From 66cdaaf7792561c3ae2e4fbc48c4086164e5cb08 Mon Sep 17 00:00:00 2001 From: Adam Dangoor Date: Thu, 21 May 2026 15:49:45 +0100 Subject: [PATCH] Improve Model Target auth fidelity --- docs/source/differences-to-vws.rst | 3 +- newsfragments/3192.change | 1 + src/mock_vws/_flask_server/vws.py | 2 +- src/mock_vws/_model_target_web_api.py | 125 +++++++++++++--- tests/mock_vws/test_flask_app_usage.py | 2 +- tests/mock_vws/test_model_target_web_api.py | 154 ++++++++++++++++---- tests/mock_vws/test_requests_mock_usage.py | 6 +- tests/mock_vws/test_respx_mock_usage.py | 4 +- 8 files changed, 240 insertions(+), 57 deletions(-) create mode 100644 newsfragments/3192.change diff --git a/docs/source/differences-to-vws.rst b/docs/source/differences-to-vws.rst index c2ae9a2c8..2a1359df4 100644 --- a/docs/source/differences-to-vws.rst +++ b/docs/source/differences-to-vws.rst @@ -115,7 +115,8 @@ Model Target datasets The Model Target Web API mock supports OAuth2 token requests, standard and advanced dataset creation, status polling, dataset downloads, and deletion. The generated dataset download is a small valid zip file containing request metadata, not a real Vuforia Engine Model Target dataset. -Model Target API routes accept any non-empty bearer token. +Model Target API routes require a syntactically JSON Web Token-shaped bearer token, such as the token returned by the mock OAuth2 route. +The mock does not verify token signatures, claims, expiry, or revocation. Header cases ------------ diff --git a/newsfragments/3192.change b/newsfragments/3192.change new file mode 100644 index 000000000..6cbbc6a99 --- /dev/null +++ b/newsfragments/3192.change @@ -0,0 +1 @@ +Improve Model Target Web API mock authentication failure responses. diff --git a/src/mock_vws/_flask_server/vws.py b/src/mock_vws/_flask_server/vws.py index 9704e6cad..451da248d 100644 --- a/src/mock_vws/_flask_server/vws.py +++ b/src/mock_vws/_flask_server/vws.py @@ -136,7 +136,7 @@ def _flask_request_data() -> RequestData: method=request.method, path=request.path, headers=dict(request.headers), - body=request.data, + body=request.get_data(parse_form_data=False), ) diff --git a/src/mock_vws/_model_target_web_api.py b/src/mock_vws/_model_target_web_api.py index 899d0bc6d..515a2756d 100644 --- a/src/mock_vws/_model_target_web_api.py +++ b/src/mock_vws/_model_target_web_api.py @@ -16,6 +16,9 @@ _ResponseType = tuple[int, dict[str, str], str | bytes] _MAX_ADVANCED_MODEL_COUNT = 20 +_JWT_DOT_COUNT = 2 +_MOCK_MODEL_TARGET_CLIENT_ID = "client-id" +_MOCK_MODEL_TARGET_CLIENT_SECRET = "client-secret" # noqa: S105 @beartype @@ -57,6 +60,16 @@ def _error_response( ) +@beartype +def _oauth2_error_response( + *, + status_code: HTTPStatus, + body: dict[str, str], +) -> _ResponseType: + """Return an OAuth2 error response.""" + return _json_response(status_code=status_code, body=body) + + @beartype def _get_header(request: RequestData, name: str) -> str | None: """Return a request header, case-insensitively.""" @@ -67,6 +80,28 @@ def _get_header(request: RequestData, name: str) -> str | None: return None +@beartype +def _basic_auth_credentials(auth_header: str | None) -> tuple[str, str] | None: + """Return HTTP Basic credentials from an authorization header.""" + if auth_header is None or not auth_header.startswith("Basic "): + return None + + encoded_credentials = auth_header.removeprefix("Basic ").strip() + try: + decoded_credentials = base64.b64decode( + s=encoded_credentials, + validate=True, + ).decode(encoding="utf-8") + except ValueError: + return None + + client_id, separator, client_secret = decoded_credentials.partition(":") + if not separator: + return None + + return client_id, client_secret + + @beartype def _require_bearer_token(request: RequestData) -> _ResponseType | None: """Return an error response if the request has no bearer token.""" @@ -78,47 +113,95 @@ def _require_bearer_token(request: RequestData) -> _ResponseType | None: message="no Bearer token", target="jwt", ) - if not auth_header.removeprefix("Bearer ").strip(): + bearer_token = auth_header.removeprefix("Bearer ").strip() + if not bearer_token: + return _error_response( + status_code=HTTPStatus.UNAUTHORIZED, + code="401", + message="no Bearer token", + target="jwt", + ) + if bearer_token.count(".") != _JWT_DOT_COUNT: return _error_response( status_code=HTTPStatus.UNAUTHORIZED, code="401", - message="invalid Bearer token", + message="Invalid JWT serialization: Missing dot delimiter(s)", target="jwt", ) return None +@beartype +def _fake_jwt(*, token_source: bytes) -> str: + """Return a deterministic bearer token for the mock.""" + + def encode_part(value: dict[str, Any]) -> str: + """Return a base64url-encoded token part.""" + raw_part = json.dumps( + obj=value, + sort_keys=True, + separators=(",", ":"), + ).encode(encoding="utf-8") + return ( + base64.urlsafe_b64encode(s=raw_part) + .decode( + encoding="ascii", + ) + .rstrip("=") + ) + + header = encode_part(value={"alg": "mock", "typ": "JWT"}) + payload = encode_part( + value={ + "aud": "vuforia-model-target", + "src": base64.urlsafe_b64encode(s=token_source) + .decode( + encoding="ascii", + ) + .rstrip("="), + }, + ) + return f"{header}.{payload}.mock-signature" + + @beartype def oauth2_token(request: RequestData) -> _ResponseType: """Return a fake OAuth2 access token.""" auth_header = _get_header(request=request, name="Authorization") form = parse_qs(qs=request.body.decode(encoding="utf-8")) - grant_type = form.get("grant_type", [""])[0] - has_basic_auth = auth_header is not None and auth_header.startswith( - "Basic ", - ) - has_password_credentials = all( - form.get(field, [""])[0] for field in ("username", "password") - ) - if grant_type not in {"", "client_credentials", "password"} or ( - not has_basic_auth and not has_password_credentials - ): - return _error_response( + grant_type = form.get("grant_type", ["client_credentials"])[0] + if grant_type != "client_credentials": + return _oauth2_error_response( status_code=HTTPStatus.BAD_REQUEST, - code="BAD_REQUEST", - message="Invalid OAuth2 token request.", - target="grant_type", + body={"error": "unsupported_grant_type"}, + ) + + basic_credentials = _basic_auth_credentials(auth_header=auth_header) + if basic_credentials is None: + return _oauth2_error_response( + status_code=HTTPStatus.UNAUTHORIZED, + body={ + "error": "invalid_request", + "error_description": ( + "Missing or invalid authorization header" + ), + }, + ) + + if basic_credentials != ( + _MOCK_MODEL_TARGET_CLIENT_ID, + _MOCK_MODEL_TARGET_CLIENT_SECRET, + ): + return _oauth2_error_response( + status_code=HTTPStatus.UNAUTHORIZED, + body={"error": "invalid_client"}, ) token_source = request.body or (auth_header or "").encode() - access_token = base64.urlsafe_b64encode(s=token_source).decode( - encoding="ascii", - ) - access_token = access_token.rstrip("=") or "mock-vuforia-access-token" return _json_response( status_code=HTTPStatus.OK, body={ - "access_token": access_token, + "access_token": _fake_jwt(token_source=token_source), "token_type": "bearer", "expires_in": 3600, }, diff --git a/tests/mock_vws/test_flask_app_usage.py b/tests/mock_vws/test_flask_app_usage.py index 24388e60c..18423c59b 100644 --- a/tests/mock_vws/test_flask_app_usage.py +++ b/tests/mock_vws/test_flask_app_usage.py @@ -96,7 +96,7 @@ class TestProcessingTime: # There is a race condition in this test type - if tests start to # fail, consider increasing the leeway. - LEEWAY = 0.5 + LEEWAY = 1.0 def test_default( self, diff --git a/tests/mock_vws/test_model_target_web_api.py b/tests/mock_vws/test_model_target_web_api.py index fdd9059a5..b67cc6649 100644 --- a/tests/mock_vws/test_model_target_web_api.py +++ b/tests/mock_vws/test_model_target_web_api.py @@ -1,5 +1,6 @@ """Verified fake tests for the Model Target Web API.""" +import base64 import json from http import HTTPMethod, HTTPStatus from typing import Any @@ -17,6 +18,7 @@ _VWS_HOST = "https://vws.vuforia.com" _DATASET_UUID = "0b12466eee5d49409a440927006ff5d8" +_MOCK_BEARER_TOKEN = "mock.header.signature" def _dataset_request(*, cad_data_url: str) -> dict[str, Any]: @@ -114,6 +116,17 @@ def _assert_model_target_error( } +def _assert_oauth2_error( + *, + response: requests.Response, + status_code: HTTPStatus, + body: dict[str, str], +) -> None: + """Assert an OAuth2 error response.""" + assert response.status_code == status_code + assert response.json() == body + + @pytest.mark.usefixtures("verify_model_target_mock_vuforia") class TestAuthentication: """Tests for Model Target Web API authentication.""" @@ -195,44 +208,126 @@ def test_missing_bearer_token( }, } + @staticmethod + @pytest.mark.parametrize( + argnames=("authorization", "message"), + argvalues=[ + pytest.param("Bearer ", "no Bearer token", id="blank"), + pytest.param( + "Bearer invalid-token", + "Invalid JWT serialization: Missing dot delimiter(s)", + id="malformed", + ), + ], + ) + def test_invalid_bearer_token( + *, + authorization: str, + message: str, + ) -> None: + """Invalid bearer tokens are rejected.""" + response = requests.get( + url=f"{_VWS_HOST}/modeltargets/datasets/{_DATASET_UUID}/status", + headers={"Authorization": authorization}, + timeout=30, + ) -class TestMockErrors: - """Tests for mock-only Model Target Web API error paths.""" + _assert_model_target_error( + response=response, + status_code=HTTPStatus.UNAUTHORIZED, + code="401", + message=message, + target="jwt", + ) @staticmethod - def test_invalid_oauth2_token_request() -> None: + @pytest.mark.parametrize( + argnames=("auth", "data", "status_code", "body"), + argvalues=[ + pytest.param( + None, + {"grant_type": "client_credentials"}, + HTTPStatus.UNAUTHORIZED, + { + "error": "invalid_request", + "error_description": ( + "Missing or invalid authorization header" + ), + }, + id="missing-basic-auth", + ), + pytest.param( + ("invalid-client-id", "invalid-client-secret"), + {"grant_type": "client_credentials"}, + HTTPStatus.UNAUTHORIZED, + {"error": "invalid_client"}, + id="invalid-client", + ), + pytest.param( + ("invalid-client-id", "invalid-client-secret"), + {"grant_type": "unsupported"}, + HTTPStatus.BAD_REQUEST, + {"error": "unsupported_grant_type"}, + id="unsupported-grant-type", + ), + ], + ) + def test_invalid_oauth2_token_request( + *, + auth: tuple[str, str] | None, + data: dict[str, str], + status_code: HTTPStatus, + body: dict[str, str], + ) -> None: """Invalid OAuth2 token requests are rejected.""" - with MockVWS(): - response = requests.post( - url=f"{_VWS_HOST}/oauth2/token", - data={"grant_type": "unsupported"}, - timeout=30, - ) + response = requests.post( + url=f"{_VWS_HOST}/oauth2/token", + auth=auth, + data=data, + timeout=30, + ) - _assert_model_target_error( + _assert_oauth2_error( response=response, - status_code=HTTPStatus.BAD_REQUEST, - code="BAD_REQUEST", - message="Invalid OAuth2 token request.", - target="grant_type", + status_code=status_code, + body=body, ) + +class TestMockErrors: + """Tests for mock-only Model Target Web API error paths.""" + @staticmethod - def test_blank_bearer_token() -> None: - """A blank bearer token is rejected.""" + @pytest.mark.parametrize( + argnames="authorization", + argvalues=[ + pytest.param("Basic not-base64!", id="invalid-base64"), + pytest.param( + ( + "Basic " + + base64.b64encode(s=b"client-id-without-secret").decode() + ), + id="missing-separator", + ), + ], + ) + def test_invalid_basic_auth_header(*, authorization: str) -> None: + """Malformed OAuth2 Basic auth headers are rejected.""" with MockVWS(): - response = requests.get( - url=f"{_VWS_HOST}/modeltargets/datasets/{_DATASET_UUID}/status", - headers={"Authorization": "Bearer "}, + response = requests.post( + url=f"{_VWS_HOST}/oauth2/token", + headers={"Authorization": authorization}, + data={"grant_type": "client_credentials"}, timeout=30, ) - _assert_model_target_error( + _assert_oauth2_error( response=response, status_code=HTTPStatus.UNAUTHORIZED, - code="401", - message="invalid Bearer token", - target="jwt", + body={ + "error": "invalid_request", + "error_description": "Missing or invalid authorization header", + }, ) @staticmethod @@ -266,7 +361,10 @@ def test_invalid_request_body( with MockVWS(): response = requests.post( url=f"{_VWS_HOST}/modeltargets/datasets", - headers={"Authorization": "Bearer token", **headers}, + headers={ + "Authorization": f"Bearer {_MOCK_BEARER_TOKEN}", + **headers, + }, data=body, timeout=30, ) @@ -337,7 +435,7 @@ def test_invalid_dataset_request( with MockVWS(): response = requests.post( url=f"{_VWS_HOST}{path}", - headers={"Authorization": "Bearer token"}, + headers={"Authorization": f"Bearer {_MOCK_BEARER_TOKEN}"}, json=body, timeout=30, ) @@ -381,7 +479,7 @@ def test_unknown_dataset( response = requests.request( method=method, url=f"{_VWS_HOST}{path}", - headers={"Authorization": "Bearer token"}, + headers={"Authorization": f"Bearer {_MOCK_BEARER_TOKEN}"}, timeout=30, ) @@ -399,7 +497,7 @@ def test_processing_dataset_cannot_be_downloaded() -> None: with MockVWS(processing_time_seconds=60): create_response = requests.post( url=f"{_VWS_HOST}/modeltargets/datasets", - headers={"Authorization": "Bearer token"}, + headers={"Authorization": f"Bearer {_MOCK_BEARER_TOKEN}"}, json=_UNAUTHENTICATED_DATASET_REQUEST, timeout=30, ) @@ -408,7 +506,7 @@ def test_processing_dataset_cannot_be_downloaded() -> None: f"{_VWS_HOST}/modeltargets/datasets/" f"{create_response.json()['uuid']}/dataset" ), - headers={"Authorization": "Bearer token"}, + headers={"Authorization": f"Bearer {_MOCK_BEARER_TOKEN}"}, timeout=30, ) diff --git a/tests/mock_vws/test_requests_mock_usage.py b/tests/mock_vws/test_requests_mock_usage.py index 34d6d8aa5..98b485a5b 100644 --- a/tests/mock_vws/test_requests_mock_usage.py +++ b/tests/mock_vws/test_requests_mock_usage.py @@ -272,7 +272,7 @@ class TestProcessingTime: # There is a race condition in this test type - if tests start to # fail, consider increasing the leeway. - LEEWAY = 0.5 + LEEWAY = 1.0 def test_default(self, image_file_failed_state: io.BytesIO) -> None: """By default, targets in the mock takes 2 seconds to be processed.""" @@ -1132,7 +1132,7 @@ def test_advanced_dataset_workflow() -> None: with MockVWS(processing_time_seconds=0): response = requests.post( url="https://vws.vuforia.com/modeltargets/advancedDatasets", - headers={"Authorization": "Bearer token"}, + headers={"Authorization": "Bearer mock.header.signature"}, json=_MODEL_TARGET_DATASET_REQUEST, timeout=30, ) @@ -1142,7 +1142,7 @@ def test_advanced_dataset_workflow() -> None: "https://vws.vuforia.com/modeltargets/" f"advancedDatasets/{dataset_uuid}/status" ), - headers={"Authorization": "Bearer token"}, + headers={"Authorization": "Bearer mock.header.signature"}, timeout=30, ) diff --git a/tests/mock_vws/test_respx_mock_usage.py b/tests/mock_vws/test_respx_mock_usage.py index 467becf3f..cc7fd461f 100644 --- a/tests/mock_vws/test_respx_mock_usage.py +++ b/tests/mock_vws/test_respx_mock_usage.py @@ -191,7 +191,7 @@ def test_standard_dataset_status() -> None: with MockVWS(processing_time_seconds=0): create_response = httpx.post( url="https://vws.vuforia.com/modeltargets/datasets", - headers={"Authorization": "Bearer token"}, + headers={"Authorization": "Bearer mock.header.signature"}, json=_MODEL_TARGET_DATASET_REQUEST, timeout=30, ) @@ -201,7 +201,7 @@ def test_standard_dataset_status() -> None: "https://vws.vuforia.com/modeltargets/datasets/" f"{dataset_uuid}/status" ), - headers={"Authorization": "Bearer token"}, + headers={"Authorization": "Bearer mock.header.signature"}, timeout=30, )