diff --git a/.claude/skills/check-all/SKILL.md b/.claude/skills/check-all/SKILL.md new file mode 100644 index 0000000..d55372a --- /dev/null +++ b/.claude/skills/check-all/SKILL.md @@ -0,0 +1,11 @@ +--- +name: check-all +description: Run the full default nox session suite (lint, test, security scan). Equivalent to CI pipeline checks. +allowed-tools: Bash +--- + +Run the full default nox session suite: black lint check, pytest, semgrep security scan, mypy, pyflakes, and pylint. + +This is equivalent to the CI pipeline checks. + +Run: `nox` diff --git a/.claude/skills/format/SKILL.md b/.claude/skills/format/SKILL.md new file mode 100644 index 0000000..78000ad --- /dev/null +++ b/.claude/skills/format/SKILL.md @@ -0,0 +1,11 @@ +--- +name: format +description: Auto-format code using Black via nox. +allowed-tools: Bash +--- + +Auto-format code using Black via nox. + +Run: `nox -s black_format` + +Black is configured in pyproject.toml with line-length=119. diff --git a/.claude/skills/lint/SKILL.md b/.claude/skills/lint/SKILL.md new file mode 100644 index 0000000..47c1edf --- /dev/null +++ b/.claude/skills/lint/SKILL.md @@ -0,0 +1,9 @@ +--- +name: lint +description: Run all linting checks (black, pyflakes, pylint, mypy) using nox. +allowed-tools: Bash +--- + +Run all linting checks using nox. This runs black (formatting check), pyflakes, pylint, and mypy. + +Run: `nox -s black_lint pyflakes_src pyflakes_examples pyflakes_tests pylint_src pylint_examples pylint_tests mypy` diff --git a/.claude/skills/security-scan/SKILL.md b/.claude/skills/security-scan/SKILL.md new file mode 100644 index 0000000..39cff6a --- /dev/null +++ b/.claude/skills/security-scan/SKILL.md @@ -0,0 +1,9 @@ +--- +name: security-scan +description: Run semgrep security scan on source code using nox. +allowed-tools: Bash +--- + +Run semgrep security scan on the source code using nox. + +Run: `nox -s semgrep_src` diff --git a/.claude/skills/test/SKILL.md b/.claude/skills/test/SKILL.md new file mode 100644 index 0000000..aba07a8 --- /dev/null +++ b/.claude/skills/test/SKILL.md @@ -0,0 +1,17 @@ +--- +name: test +description: Run unit tests using nox. Use when the user wants to run tests. +allowed-tools: Bash +--- + +Run unit tests using nox. Pass any additional arguments through to pytest via nox's posargs. + +Examples: +- Full unit test suite: `nox -s pytest` +- Tests matching a keyword: `nox -s pytest -- -k "test_name"` +- Specific test file: `nox -s pytest -- tests/test_planet_auth/unit/path/to/test_file.py` + +Run: `nox -s pytest -- $ARGUMENTS` + +Note: When `-k` is used, nox automatically disables coverage (`--no-cov`). +Default test paths are configured in pyproject.toml and include `tests/test_planet_auth/unit` and `tests/test_planet_auth_utils/unit`. diff --git a/docs/changelog.md b/docs/changelog.md index b25e09d..9353000 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -1,5 +1,10 @@ # Changelog +## 2.4.0 - 2026-03-30 +- `OidcMultiIssuerValidator` now models trust as explicit (issuer, audience) pairs, + allowing the same issuer to appear with different audiences. +- Minor fixes + ## 2.3.1 - 2025-12-10 - Fix a bug where sops protected files would be rewritten without preserving their sops protection. diff --git a/docs/examples-service.md b/docs/examples-service.md index 0a5d5b5..966a760 100644 --- a/docs/examples-service.md +++ b/docs/examples-service.md @@ -16,9 +16,10 @@ When a service is acting on behalf of one of its clients... ## Verifying OAuth Clients The [planet_auth.OidcMultiIssuerValidator][] class is provided to assist with common OAuth client authentication scenarios. This class can be configured -with a single authority for normal operations, and may optionally be configured -with a secondary authorities. This allows for complex deployments such as -the seamless migration between auth servers over time. +with one or more trusted issuing authorities, each represented as an +(issuer, audience) pair. This allows for complex deployments such as +the seamless migration between auth servers over time, or accepting tokens +minted for different audiences by the same authorization server. This utility class may be configured for entirely local token validation, or may be configured to check token validity against the OAuth token inspection diff --git a/docs/examples/service/flask--oidc-multi-issuer--local-and-remote-validation.py b/docs/examples/service/flask--oidc-multi-issuer--local-and-remote-validation.py index 213af24..660ee81 100644 --- a/docs/examples/service/flask--oidc-multi-issuer--local-and-remote-validation.py +++ b/docs/examples/service/flask--oidc-multi-issuer--local-and-remote-validation.py @@ -48,6 +48,15 @@ # secret) that is permitted client credentials OAuth flow and grant type. # This is not the only possibility. +# Trust is established as (issuer, audience) pairs. Each entry must specify +# exactly one audience. To trust multiple audiences from the same issuer, +# provide separate entries for each (issuer, audience) pair. +# +# Note: The "audiences" config field is a list because the underlying client +# config schema is shared with OAuth clients that request tokens, where +# multiple audiences can be meaningful. For services validating tokens, +# each trust entry must map to exactly one (issuer, audience) pair. + # TODO: we should have an example of how to use a built-in provider to provide # named application server trust environments through use of the # planet_auth_utils.PlanetAuthFactory.initialize_resource_server_validator diff --git a/docs/examples/service/flask--oidc-multi-issuer--local-only-validation.py b/docs/examples/service/flask--oidc-multi-issuer--local-only-validation.py index 628f1dd..4ccc9cd 100644 --- a/docs/examples/service/flask--oidc-multi-issuer--local-only-validation.py +++ b/docs/examples/service/flask--oidc-multi-issuer--local-only-validation.py @@ -49,6 +49,15 @@ # Do not cross the streams. # Seriously. Don't do it. +# Trust is established as (issuer, audience) pairs. Each entry must specify +# exactly one audience. The same issuer may appear multiple times with +# different audiences if the auth server issues tokens for more than one +# audience that this service should accept. +# +# Note: The "audiences" config field is a list because the underlying client +# config schema is shared with OAuth clients that request tokens, where +# multiple audiences can be meaningful. For services validating tokens, +# each trust entry must map to exactly one (issuer, audience) pair. auth_validator = planet_auth.OidcMultiIssuerValidator.from_auth_server_configs( trusted_auth_server_configs=[ { diff --git a/noxfile.py b/noxfile.py index 49b7c24..4e793b2 100644 --- a/noxfile.py +++ b/noxfile.py @@ -3,8 +3,8 @@ import nox -nox.options.stop_on_first_error = True -nox.options.reuse_existing_virtualenvs = False +nox.options.stop_on_first_error = False +nox.options.reuse_existing_virtualenvs = True # Default sessions - all tests, but not packaging nox.options.sessions = [ diff --git a/pyproject.toml b/pyproject.toml index 7d1f0a5..6377938 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,7 +58,7 @@ examples = [ # "planet-auth-config >= 2.0.0" ] test = [ - "black", + "black < 26.0.0", "coverage[toml]", "freezegun", "mypy", diff --git a/src/planet_auth/__init__.py b/src/planet_auth/__init__.py index 667fcc7..8e946a1 100644 --- a/src/planet_auth/__init__.py +++ b/src/planet_auth/__init__.py @@ -112,7 +112,7 @@ class exists. TokenValidatorException, UnknownSigningKeyTokenException, ) -from .oidc.multi_validator import OidcMultiIssuerValidator +from .oidc.multi_validator import OidcMultiIssuerValidator, TrustEntry from .planet_legacy.auth_client import PlanetLegacyAuthClientConfig, PlanetLegacyAuthClient from .static_api_key.auth_client import ( StaticApiKeyAuthClientConfig, @@ -186,6 +186,7 @@ class exists. "OidcClientValidatorAuthClientConfig", "OidcClientValidatorAuthClient", "OidcMultiIssuerValidator", + "TrustEntry", "NoOpAuthClient", "NoOpAuthClientConfig", "PlanetLegacyAuthClient", diff --git a/src/planet_auth/oidc/auth_client.py b/src/planet_auth/oidc/auth_client.py index 6e2dda5..25ddcd9 100644 --- a/src/planet_auth/oidc/auth_client.py +++ b/src/planet_auth/oidc/auth_client.py @@ -564,7 +564,7 @@ def validate_access_token_local( ) if len(conf_audiences) != 1: raise AuthClientException( - message="When using the auth client config's audiences as the source for required token audience during validaiton, only one audience may be specified." + message="When using the auth client config's audiences as the source for required token audience during validation, only one audience may be specified." ) required_audience = conf_audiences[0] diff --git a/src/planet_auth/oidc/multi_validator.py b/src/planet_auth/oidc/multi_validator.py index a140a23..0d194f4 100644 --- a/src/planet_auth/oidc/multi_validator.py +++ b/src/planet_auth/oidc/multi_validator.py @@ -13,7 +13,7 @@ # limitations under the License. import jwt -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, NamedTuple, Optional, Tuple import planet_auth.logging.auth_logger from planet_auth import ExpiredTokenException, TokenValidatorException, InvalidArgumentException @@ -24,6 +24,14 @@ from planet_auth.oidc.api_clients.introspect_api_client import IntrospectionApiClient from planet_auth.oidc.auth_client import OidcAuthClient + +class TrustEntry(NamedTuple): + """An (issuer, audience) pair representing a trusted token authority.""" + + issuer: str + audience: str + + auth_logger = planet_auth.logging.auth_logger.getAuthLogger() @@ -34,6 +42,11 @@ class OidcMultiIssuerValidator: operating mode for most services. This was developed to support migration use cases. + Trust is configured as a set of (issuer, audience) pairs. The same + issuer may appear multiple times with different audiences, allowing + a service to accept tokens from the same authorization server that + were issued for different audiences. + This is a higher level utility class, and is built on top of [planet_auth.AuthClient][] classes. For a lower level utilities, see [planet_auth.TokenValidator][], or see the validation @@ -67,16 +80,17 @@ def __init__( Create a new multi issuer validator using the provided auth clients. The auth clients are expected to be any Auth that implements OIDC functionality. - Since the expected audience could be different for each issuer, - the expectation is that the provided Auth contexts will be configured - with an audience, even though that is an optional configuration parameter - for most implementing classes. + Trust is established as (issuer, audience) pairs. Each provided Auth + context must be configured with an audience. The same issuer may + appear multiple times as long as the audience differs, enabling + services to accept tokens from one authorization server that were + minted for different audiences. Parameters: trusted: log_result: """ - self._trusted: Dict[str, Auth] = {} + self._trusted: Dict[TrustEntry, Auth] = {} self._log_result = log_result def _check_auth_client(auth_client: AuthClient) -> OidcAuthClient: @@ -87,23 +101,38 @@ def _check_auth_client(auth_client: AuthClient) -> OidcAuthClient: raise AuthException( message="Auth Providers used for OIDC token validation must have the audiences configuration value set." ) + if len(auth_client._oidc_client_config.audiences()) > 1: + raise AuthException( + message=( + "Each trusted auth provider must be configured with exactly one audience" + " when used for token validation. To trust multiple audiences from the" + " same issuer, provide separate entries for each (issuer, audience) pair." + " The 'audiences' configuration field is a list because the underlying" + " client configuration schema is shared with OAuth clients that request" + " tokens, where multiple audiences can be meaningful." + ) + ) return auth_client for auth_provider in trusted: if auth_provider: auth_client = _check_auth_client(auth_provider.auth_client()) issuer = auth_client._issuer() - if issuer in self._trusted: + audience = auth_client._oidc_client_config.audiences()[0] + trust_key = TrustEntry(issuer=issuer, audience=audience) + if trust_key in self._trusted: raise AuthException( - message="Cannot configure multiple auth providers for the same issuer '{}'".format(issuer) + message="Cannot configure multiple auth providers for the same issuer '{}' and audience '{}'".format( + issuer, audience + ) ) - self._trusted[issuer] = auth_provider + self._trusted[trust_key] = auth_provider # TODO: we should probably deprecate this method... @staticmethod def from_auth_server_urls( trusted_auth_server_urls: List[str], - audience: str = None, + audience: str, log_result: bool = True, ): """ @@ -124,27 +153,83 @@ def from_auth_server_urls( by a network or configuration problem impacting a single auth server. The multi-validator requires foreknowledge of all the issuers for - proper initialization. So, without this assumption it would be unavoidable + proper initialization. So, without this assumption, it would be unavoidable to introduce network risk into the constructor. This assumption allows us to push all network errors to runtime, and avoids possible initialization time errors. """ - trusted = [] - for auth_server_url in trusted_auth_server_urls: - if auth_server_url: - trusted.append( + return OidcMultiIssuerValidator.from_issuer_audience_pairs( + trusted=[TrustEntry(issuer=url, audience=audience) for url in trusted_auth_server_urls if url], + log_result=log_result, + ) + + @staticmethod + def from_issuer_audience_pairs( + trusted: List["TrustEntry"], + log_result: bool = True, + ): + """ + Create a new multi issuer validator from a list of + [planet_auth.TrustEntry][] named tuples. This is the simplest way + to configure the multi-validator for local-only token validation, + and makes the trust model explicit. + + Each entry represents a single trusted (issuer, audience) pair. + The same issuer may appear multiple times with different audiences, + enabling a service to accept tokens from one authorization server + that were minted for different audiences. + + For advanced use cases that require remote token validation (e.g. + revocation checks), use ``from_auth_server_configs`` instead, which + allows specifying client credentials and other auth client + configuration. + + Warning: + This method assumes that the auth server URL and the issuer + (as burned into signed access tokens) are the same. This is + normally true, but not universally required. See + ``from_auth_server_urls`` for a discussion of the implications. + + Parameters: + trusted: A list of [planet_auth.TrustEntry][] named tuples. + Each entry represents a single trusted authority. Falsy + entries are ignored. + log_result: Control whether successful token validations should + be logged. + + Example: + ```python + from planet_auth import OidcMultiIssuerValidator, TrustEntry + + auth_validator = OidcMultiIssuerValidator.from_issuer_audience_pairs( + trusted=[ + TrustEntry(issuer="https://oauth_server.example.com/oauth2/server_id", audience="https://api.example.com/"), + # Same issuer, different audience: + TrustEntry(issuer="https://oauth_server.example.com/oauth2/server_id", audience="https://internal-api.example.com/"), + # Different issuer: + TrustEntry(issuer="https://other-oauth.example.com/oauth2/server_id", audience="https://api.example.com/"), + ], + ) + ``` + """ + auth_providers = [] + for entry in trusted: + if entry: + if not entry.issuer or not entry.audience: + raise TypeError("TrustEntry must have non-empty issuer and audience values.") + auth_providers.append( Auth.initialize_from_config_dict( client_config={ "client_type": "oidc_client_validator", - "auth_server": auth_server_url, - "issuer": auth_server_url, - "audiences": [audience], + "auth_server": entry.issuer, + "issuer": entry.issuer, + "audiences": [entry.audience], } ) ) return OidcMultiIssuerValidator( - trusted=trusted, + trusted=auth_providers, log_result=log_result, ) @@ -162,8 +247,11 @@ def from_auth_server_configs( Unless remote validation is required, the configuration dictionaries may be sparse, containing only the `auth_server` and `audiences` properties. `auth_server` is expected to be a single string, containing the URL - of the OAuth issuer. `audiences` is expected to be an array, and contain - a list of supported audiences. + of the OAuth issuer. `audiences` is expected to be an array containing + a single audience string. The same auth server may appear in multiple + configuration entries with different audiences, enabling a service to + accept tokens from one authorization server that were minted for + different audiences. log_result: Control whether successful token validations against trusted auth servers should be logged. @@ -175,6 +263,11 @@ def from_auth_server_configs( "auth_server": "https://oauth_server.example.com/oauth2/auth_server_id", "audiences": ["https://api.example.com/"], }, + # Same issuer, different audience: + { + "auth_server": "https://oauth_server.example.com/oauth2/auth_server_id", + "audiences": ["https://internal-api.example.com/"], + }, ], ) ``` @@ -222,6 +315,22 @@ def _check_access_token( return local_validation, remote_validation + @staticmethod + def _get_token_audiences(unverified_decoded_token: dict) -> List[str]: + """ + Extract audiences from an unverified token. Per the OIDC/OAuth2 spec, + the ``aud`` claim may be a single string or a list of strings. + Returns a list in either case, or an empty list when ``aud`` is absent. + """ + aud = unverified_decoded_token.get("aud") + if aud is None: + return [] + if isinstance(aud, str): + return [aud] + if isinstance(aud, list): + return [a for a in aud if isinstance(a, str)] + return [] + def _select_validator(self, token) -> Auth: # WARNING: Treat unverified token claims like toxic waste. # Nothing can be trusted until the token is verified. @@ -236,11 +345,16 @@ def _select_validator(self, token) -> Auth: message=f"Issuer claim ('iss') must be a of string type. '{type(issuer).__name__}' type was detected." ) - validator = self._trusted.get(issuer) - if validator: - return validator + token_audiences = self._get_token_audiences(unverified_decoded_token) + for aud in token_audiences: + validator = self._trusted.get(TrustEntry(issuer=issuer, audience=aud)) + if validator: + return validator + raise AuthException( - message="Rejecting token from an unrecognized issuer '{}'".format(issuer), + message="Rejecting token from an unrecognized issuer/audience combination. issuer='{}' audiences={}".format( + issuer, token_audiences + ), event=AuthEvent.TOKEN_INVALID_BAD_ISSUER, ) diff --git a/src/planet_auth/oidc/token_validator.py b/src/planet_auth/oidc/token_validator.py index 44b6f19..57ed9ba 100644 --- a/src/planet_auth/oidc/token_validator.py +++ b/src/planet_auth/oidc/token_validator.py @@ -224,12 +224,14 @@ def validate_token( # Invalid tokens never get this far. To get this far, all the basics of authentication # have been passed, and it's a question of the proper grants not having been given. if scopes_anyof: - if validated_claims.get(_SCOPE_CLAIM_RFC8693): + rfc8693_scopes = validated_claims.get(_SCOPE_CLAIM_RFC8693) + okta_scopes = validated_claims.get(_SCOPE_CLAIM_OKTA) + if rfc8693_scopes: # RFC 8693 places scopes in a space delimited string. - token_scopes = validated_claims.get(_SCOPE_CLAIM_RFC8693).split() - elif validated_claims.get(_SCOPE_CLAIM_OKTA): + token_scopes = rfc8693_scopes.split() + elif okta_scopes: # No split. Okta places a list of strings in the token. - token_scopes = validated_claims.get(_SCOPE_CLAIM_OKTA) + token_scopes = okta_scopes else: raise InvalidTokenException( message="No OAuth2 Scopes claim could be found in the access token", diff --git a/tests/test_planet_auth/unit/auth/auth_clients/oidc/test_multi_validator.py b/tests/test_planet_auth/unit/auth/auth_clients/oidc/test_multi_validator.py index 3c87876..9cbb09e 100644 --- a/tests/test_planet_auth/unit/auth/auth_clients/oidc/test_multi_validator.py +++ b/tests/test_planet_auth/unit/auth/auth_clients/oidc/test_multi_validator.py @@ -13,6 +13,7 @@ # limitations under the License. import inspect +import json import jwt.utils import secrets import time @@ -28,7 +29,7 @@ from planet_auth.oidc.auth_clients.client_validator import OidcClientValidatorAuthClient from planet_auth.oidc.auth_clients.client_credentials_flow import ClientCredentialsClientSecretAuthClient from planet_auth.auth_exception import AuthException, InvalidTokenException -from planet_auth.oidc.multi_validator import OidcMultiIssuerValidator +from planet_auth.oidc.multi_validator import OidcMultiIssuerValidator, TrustEntry from tests.test_planet_auth.unit.auth.util import StubOidcAuthClient, StubOidcClientConfig, FakeTokenBuilder from tests.test_planet_auth.util import tdata_resource_file_path @@ -46,6 +47,7 @@ TEST_UNTRUSTED_PUB_KEY = tdata_resource_file_path("keys/keypair3_pub_jwk.json") TEST_AUDIENCE = "test_audience" +TEST_AUDIENCE_ALT = "test_audience_alt" TEST_TOKEN_TTL = 60 @@ -137,6 +139,17 @@ def validate_access_token_local( } secondary_issuer_config = StubOidcClientConfig(**secondary_issuer_config_dict) +primary_issuer_alt_audience_config_dict = { + "auth_server": TEST_PRIMARY_ISSUER, + "scopes": ["test_scp0", "test_scp1"], + "audiences": [TEST_AUDIENCE_ALT], + "stub_authority_ttl": TEST_TOKEN_TTL, + "stub_authority_access_token_audience": TEST_AUDIENCE_ALT, + "stub_authority_signing_key_file": TEST_PRIMARY_SIGNING_KEY, + "stub_authority_pub_key_file": TEST_PRIMARY_PUB_KEY, +} +primary_issuer_alt_audience_config = StubOidcClientConfig(**primary_issuer_alt_audience_config_dict) + untrusted_issuer_config_dict = { "auth_server": TEST_UNTRUSTED_ISSUER, "scopes": ["test_scp0", "test_scp2"], @@ -159,7 +172,9 @@ def validate_access_token_local( } bad_config_1 = StubOidcClientConfig(**bad_config_1_dict) + primary_issuer = StubOidcAuthClient(primary_issuer_config) +primary_issuer_alt_audience = StubOidcAuthClient(primary_issuer_alt_audience_config) secondary_issuer = StubOidcAuthClient(secondary_issuer_config) untrusted_issuer = StubOidcAuthClient(untrusted_issuer_config) bad_validator_1 = StubOidcAuthClient(bad_config_1) @@ -190,11 +205,11 @@ class TestMultiValidator: # This would also let us maybe handle other auth mechanisms beyond oauth? @staticmethod def _patch_primary(under_test: OidcMultiIssuerValidator, patched_auth_client: AuthClient): - under_test._trusted[TEST_PRIMARY_ISSUER]._auth_client = patched_auth_client + under_test._trusted[(TEST_PRIMARY_ISSUER, TEST_AUDIENCE)]._auth_client = patched_auth_client @staticmethod def _patch_secondary(under_test: OidcMultiIssuerValidator, patched_auth_client: AuthClient): - under_test._trusted[TEST_SECONDARY_ISSUER]._auth_client = patched_auth_client + under_test._trusted[(TEST_SECONDARY_ISSUER, TEST_AUDIENCE)]._auth_client = patched_auth_client def under_test__only_primary_validator(self, log_primary=True): under_test = OidcMultiIssuerValidator( @@ -287,6 +302,7 @@ def test_malformed_token_4(self): "fake_claim_1": "test claim value", "fake_claim_2": "test claim value", "iss": primary_issuer.token_builder.issuer, + "aud": TEST_AUDIENCE, }, header={ "alg": primary_issuer.token_builder.signing_key_algorithm, @@ -322,16 +338,22 @@ def test_malformed_token_5_iss_liars(self): under_test.validate_access_token(token=test_jwt) # No throw # TC 1 - # Use the real signing key, but make the iss an invalid type. - # You can make the argument this is still valid, because the signature is still - # from a trusted issuer. But, we reject it based on bad structure. - token_body["iss"] = [primary_issuer.token_builder.issuer, untrusted_issuer.token_builder.issuer] - test_jwt = primary_issuer.token_builder.encode(body=token_body, extra_headers=token_header) + # Token with iss set to an invalid type (list instead of string). + # Newer versions of PyJWT prevent encoding non-string iss claims, + # so we use FakeTokenBuilder to craft the malformed token directly. + # The issuer type check happens before signature verification. + fake_jwt = FakeTokenBuilder.fake_token( + body={ + **token_body, + "iss": [primary_issuer.token_builder.issuer, untrusted_issuer.token_builder.issuer], + }, + header=token_header, + ) with pytest.raises( InvalidTokenException, match=re.escape("Issuer claim ('iss') must be a of string type. 'list' type was detected."), ): - under_test.validate_access_token(token=test_jwt) + under_test.validate_access_token(token=fake_jwt) # TC 2 # Liar token. Untrusted issuer signing key claiming to be valid issuer @@ -344,13 +366,22 @@ def test_malformed_token_5_iss_liars(self): # TC 3 # Double-talk liar. Using the untrusted signing key, claiming to be ourselves and the trusted issuer. - token_body["iss"] = [primary_issuer.token_builder.issuer, untrusted_issuer.token_builder.issuer] - test_jwt = untrusted_issuer.token_builder.encode(body=token_body, extra_headers=token_header) + # We encode a valid token with the untrusted key, then tamper the payload + # post-signing to inject a list iss. This produces a token with a real + # (but untrusted) signature that no longer matches the modified payload — + # a more realistic attack than pure garbage signatures. + legit_jwt = untrusted_issuer.token_builder.encode(body=token_body, extra_headers=token_header) + header_b64, payload_b64, signature_b64 = legit_jwt.split(".") + payload_bytes = jwt.utils.base64url_decode(payload_b64) + payload = json.loads(payload_bytes) + payload["iss"] = [primary_issuer.token_builder.issuer, untrusted_issuer.token_builder.issuer] + tampered_payload_b64 = jwt.utils.base64url_encode(json.dumps(payload).encode("utf-8")).decode("utf-8") + tampered_jwt = f"{header_b64}.{tampered_payload_b64}.{signature_b64}" with pytest.raises( InvalidTokenException, match=re.escape("Issuer claim ('iss') must be a of string type. 'list' type was detected."), ): - under_test.validate_access_token(token=test_jwt) + under_test.validate_access_token(token=tampered_jwt) def test_missing_signature(self): # QE TC11 - JWT without a signature @@ -560,7 +591,8 @@ def test_check_auth_client_types(self): ], ) - def test_no_repeat_issuers(self): + def test_no_repeat_issuer_audience_pairs(self): + # Same issuer + same audience should be rejected with pytest.raises(AuthException): OidcMultiIssuerValidator( trusted=[Auth.initialize_from_client(primary_issuer), Auth.initialize_from_client(primary_issuer)], @@ -574,6 +606,65 @@ def test_no_repeat_issuers(self): ], ) + def test_same_issuer_different_audience_allowed(self): + # Same issuer with different audiences should be allowed + under_test = OidcMultiIssuerValidator( + trusted=[ + Auth.initialize_from_client(primary_issuer), + Auth.initialize_from_client(primary_issuer_alt_audience), + ], + ) + assert len(under_test._trusted) == 2 + assert (TEST_PRIMARY_ISSUER, TEST_AUDIENCE) in under_test._trusted + assert (TEST_PRIMARY_ISSUER, TEST_AUDIENCE_ALT) in under_test._trusted + + def test_same_issuer_different_audience_routes_correctly(self): + # Tokens minted for different audiences by the same issuer + # should each be validated by the correct auth provider. + test_case_name = inspect.currentframe().f_code.co_name + test_username = self.username(test_case_name) + + under_test = OidcMultiIssuerValidator( + trusted=[ + Auth.initialize_from_client(primary_issuer), + Auth.initialize_from_client(primary_issuer_alt_audience), + ], + ) + + # Token for the primary audience + access_token = primary_issuer.login(username=test_username, extra_claims={"test_case": test_case_name}) + local_validation, remote_validation = under_test.validate_access_token( + access_token.access_token(), do_remote_revocation_check=False + ) + assert TEST_AUDIENCE == local_validation.get("aud") + assert TEST_PRIMARY_ISSUER == local_validation.get("iss") + + # Token for the alt audience + access_token_alt = primary_issuer_alt_audience.login( + username=test_username, extra_claims={"test_case": test_case_name} + ) + local_validation_alt, remote_validation_alt = under_test.validate_access_token( + access_token_alt.access_token(), do_remote_revocation_check=False + ) + assert TEST_AUDIENCE_ALT == local_validation_alt.get("aud") + assert TEST_PRIMARY_ISSUER == local_validation_alt.get("iss") + + def test_same_issuer_wrong_audience_rejected(self): + # When trust is configured for issuer+audience_A, a token from + # the same issuer but for audience_B should be rejected. + test_case_name = inspect.currentframe().f_code.co_name + test_username = self.username(test_case_name) + + # Only trust primary issuer with the primary audience (not alt) + under_test = self.under_test__only_primary_validator() + + # Issue a token for the alt audience from the same issuer + access_token_alt = primary_issuer_alt_audience.login( + username=test_username, extra_claims={"test_case": test_case_name} + ) + with pytest.raises(AuthException): + under_test.validate_access_token(access_token_alt.access_token(), do_remote_revocation_check=False) + def test_ignore_falsy_issuers_in_direct_construction(self): under_test = OidcMultiIssuerValidator( trusted=["", Auth.initialize_from_client(primary_issuer), None], @@ -619,6 +710,55 @@ def test_ignore_falsy_issuers_in_configdict_construction(self): ) assert len(under_test._trusted) == 2 + def test_issuer_audience_pairs_construction_happy(self): + under_test = OidcMultiIssuerValidator.from_issuer_audience_pairs( + trusted=[ + TrustEntry(issuer=TEST_PRIMARY_ISSUER, audience=TEST_AUDIENCE), + TrustEntry(issuer=TEST_SECONDARY_ISSUER, audience=TEST_AUDIENCE), + ], + ) + assert len(under_test._trusted) == 2 + assert TrustEntry(issuer=TEST_PRIMARY_ISSUER, audience=TEST_AUDIENCE) in under_test._trusted + assert TrustEntry(issuer=TEST_SECONDARY_ISSUER, audience=TEST_AUDIENCE) in under_test._trusted + + def test_issuer_audience_pairs_same_issuer_different_audiences(self): + under_test = OidcMultiIssuerValidator.from_issuer_audience_pairs( + trusted=[ + TrustEntry(issuer=TEST_PRIMARY_ISSUER, audience=TEST_AUDIENCE), + TrustEntry(issuer=TEST_PRIMARY_ISSUER, audience=TEST_AUDIENCE_ALT), + ], + ) + assert len(under_test._trusted) == 2 + assert TrustEntry(issuer=TEST_PRIMARY_ISSUER, audience=TEST_AUDIENCE) in under_test._trusted + assert TrustEntry(issuer=TEST_PRIMARY_ISSUER, audience=TEST_AUDIENCE_ALT) in under_test._trusted + + def test_issuer_audience_pairs_rejects_duplicate(self): + with pytest.raises(AuthException): + OidcMultiIssuerValidator.from_issuer_audience_pairs( + trusted=[ + TrustEntry(issuer=TEST_PRIMARY_ISSUER, audience=TEST_AUDIENCE), + TrustEntry(issuer=TEST_PRIMARY_ISSUER, audience=TEST_AUDIENCE), + ], + ) + + def test_issuer_audience_pairs_ignores_falsy(self): + under_test = OidcMultiIssuerValidator.from_issuer_audience_pairs( + trusted=["", TrustEntry(issuer=TEST_PRIMARY_ISSUER, audience=TEST_AUDIENCE), None], + ) + assert len(under_test._trusted) == 1 + + def test_issuer_audience_pairs_rejects_empty_issuer(self): + with pytest.raises(TypeError, match="non-empty"): + OidcMultiIssuerValidator.from_issuer_audience_pairs( + trusted=[TrustEntry(issuer="", audience=TEST_AUDIENCE)], + ) + + def test_issuer_audience_pairs_rejects_empty_audience(self): + with pytest.raises(TypeError, match="non-empty"): + OidcMultiIssuerValidator.from_issuer_audience_pairs( + trusted=[TrustEntry(issuer=TEST_PRIMARY_ISSUER, audience="")], + ) + def test_reject_unknown_issuer(self): # QE TC15 test_case_name = inspect.currentframe().f_code.co_name @@ -639,6 +779,34 @@ def test_fail_bad_config_1(self): with pytest.raises(AuthException): self.under_test__bad_client_1() + def test_reject_multiple_audiences_in_config_dict(self): + # Each trust entry must have exactly one audience. + # Multiple audiences in a single entry should be rejected. + test_conf_dict = { + "auth_server": TEST_PRIMARY_ISSUER, + "audiences": [TEST_AUDIENCE, TEST_AUDIENCE_ALT], + } + with pytest.raises(AuthException): + OidcMultiIssuerValidator.from_auth_server_configs( + trusted_auth_server_configs=[test_conf_dict], + ) + + def test_reject_multiple_audiences_mixed_with_valid(self): + # A valid single-audience entry alongside an invalid multi-audience + # entry should still be rejected. + valid_conf_dict = { + "auth_server": TEST_PRIMARY_ISSUER, + "audiences": [TEST_AUDIENCE], + } + invalid_conf_dict = { + "auth_server": TEST_SECONDARY_ISSUER, + "audiences": [TEST_AUDIENCE, TEST_AUDIENCE_ALT], + } + with pytest.raises(AuthException): + OidcMultiIssuerValidator.from_auth_server_configs( + trusted_auth_server_configs=[valid_conf_dict, invalid_conf_dict], + ) + def test_config_dict_construction_happy(self): test_conf_dict_1 = { "auth_server": TEST_PRIMARY_ISSUER, @@ -663,9 +831,12 @@ def test_config_dict_construction_sets_client_type(self): } under_test = OidcMultiIssuerValidator.from_auth_server_configs(trusted_auth_server_configs=[test_conf_dict]) assert len(under_test._trusted) == 1 - assert isinstance(under_test._trusted[TEST_PRIMARY_ISSUER].auth_client(), OidcClientValidatorAuthClient) + assert isinstance( + under_test._trusted[(TEST_PRIMARY_ISSUER, TEST_AUDIENCE)].auth_client(), OidcClientValidatorAuthClient + ) assert ( - under_test._trusted[TEST_PRIMARY_ISSUER].auth_client()._oidc_client_config.issuer() == TEST_PRIMARY_ISSUER + under_test._trusted[(TEST_PRIMARY_ISSUER, TEST_AUDIENCE)].auth_client()._oidc_client_config.issuer() + == TEST_PRIMARY_ISSUER ) test_conf_dict["client_type"] = "oidc_client_credentials_secret" @@ -674,8 +845,14 @@ def test_config_dict_construction_sets_client_type(self): test_conf_dict["issuer"] = "__test_dummy__" under_test = OidcMultiIssuerValidator.from_auth_server_configs(trusted_auth_server_configs=[test_conf_dict]) assert len(under_test._trusted) == 1 - assert isinstance(under_test._trusted["__test_dummy__"].auth_client(), ClientCredentialsClientSecretAuthClient) - assert under_test._trusted["__test_dummy__"].auth_client()._oidc_client_config.issuer() == "__test_dummy__" + assert isinstance( + under_test._trusted[("__test_dummy__", TEST_AUDIENCE)].auth_client(), + ClientCredentialsClientSecretAuthClient, + ) + assert ( + under_test._trusted[("__test_dummy__", TEST_AUDIENCE)].auth_client()._oidc_client_config.issuer() + == "__test_dummy__" + ) @mock.patch("planet_auth.logging.auth_logger.AuthLogger.warning") @mock.patch("planet_auth.logging.auth_logger.AuthLogger.info") diff --git a/tests/test_planet_auth_utils/unit/auth_utils/test_plauth_factory.py b/tests/test_planet_auth_utils/unit/auth_utils/test_plauth_factory.py index edd09d8..4f29691 100644 --- a/tests/test_planet_auth_utils/unit/auth_utils/test_plauth_factory.py +++ b/tests/test_planet_auth_utils/unit/auth_utils/test_plauth_factory.py @@ -433,9 +433,12 @@ def test_custom_valid_configs(self): # Check if the issuers match the passed custom configuration for priority, config in zip(ISSUER_PRIORITIES, [VALID_PRIMARY_CONFIGS, VALID_SECONDARY_CONFIGS]): - issuers = list(test_validator_vars[priority].keys()) - self.assertEqual(len(issuers), 1) - self.assertEqual(issuers[0], config[0]["auth_server"]) + trust_keys = list(test_validator_vars[priority].keys()) + self.assertEqual(len(trust_keys), 1) + self.assertEqual( + trust_keys[0], + planet_auth.TrustEntry(issuer=config[0]["auth_server"], audience=config[0]["audiences"][0]), + ) def test_custom_invalid_configs(self): with self.assertRaises(planet_auth.AuthException) as context: diff --git a/version.txt b/version.txt index 2bf1c1c..197c4d5 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -2.3.1 +2.4.0