From 1040170ae812740d6bc5ce8771b5505df9c82262 Mon Sep 17 00:00:00 2001 From: Carl Alexander Adams Date: Sun, 5 Oct 2025 08:14:59 -0700 Subject: [PATCH 1/6] cherry-picking some small incidental changes from a larger feature branch to front-run the main MR. --- .gitignore | 1 + pyproject.toml | 1 + src/planet_auth/oidc/token_validator.py | 17 +++++-- src/planet_auth/request_authenticator.py | 4 +- .../auth_clients/oidc/test_token_validator.py | 44 ++++++++++++++++--- 5 files changed, 56 insertions(+), 11 deletions(-) diff --git a/.gitignore b/.gitignore index d80596f..7829556 100644 --- a/.gitignore +++ b/.gitignore @@ -16,4 +16,5 @@ site *.venv venv* version-with-buildnum.txt +.vscode __pycache__ diff --git a/pyproject.toml b/pyproject.toml index 1f89537..7d1f0a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -106,6 +106,7 @@ build-backend = "setuptools.build_meta" source = [ "planet_auth", "planet_auth_utils", + "planet_auth_config_injection", # "tests", ] branch = true diff --git a/src/planet_auth/oidc/token_validator.py b/src/planet_auth/oidc/token_validator.py index 632bcee..986fcbb 100644 --- a/src/planet_auth/oidc/token_validator.py +++ b/src/planet_auth/oidc/token_validator.py @@ -14,7 +14,7 @@ import jwt import time -from typing import Dict, List +from typing import Any, Dict, List, Tuple import planet_auth.logging.auth_logger from planet_auth.auth_exception import AuthException, InvalidTokenException @@ -257,9 +257,18 @@ def validate_token( return validated_claims @staticmethod - def hazmat_unverified_decode(token_str): - # WARNING: Treat unverified token claims like toxic waste. - # Nothing can be trusted until the token is verified. + @InvalidArgumentException.recast(jwt.exceptions.DecodeError) + def hazmat_unverified_decode(token_str) -> Tuple[Dict, Dict, Any]: + """ + Decide a JWT without verifying the signature or any claims. + + !!! Warning + Treat unverified token claims with extreme caution. + Nothing can be trusted until the token is verified. + + Returns: + Returns the decoded JWT header, payload, and signature + """ unverified_complete = jwt.decode_complete(token_str, options={"verify_signature": False}) # nosemgrep return unverified_complete["header"], unverified_complete["payload"], unverified_complete["signature"] diff --git a/src/planet_auth/request_authenticator.py b/src/planet_auth/request_authenticator.py index d7bdbf6..b4c80f2 100644 --- a/src/planet_auth/request_authenticator.py +++ b/src/planet_auth/request_authenticator.py @@ -126,7 +126,7 @@ def update_credential_data(self, new_credential_data: Dict) -> None: self._credential.set_data(new_credential_data) self._credential.save() # Clobber old data that may be saved to disk. # Clear-out auth material when a new credential is set. - # child classes are expected to populate it JIT for auth + # Child classes are expected to populate it JIT for auth # requests. self._token_body = None @@ -135,7 +135,7 @@ def credential(self, refresh_if_needed: bool = False) -> Optional[Credential]: Return the current credential. This may not be the credential the authenticator was constructed with. - Request Authenticators are free to refresh credentials depending in the + Request Authenticators are free to refresh credentials depending on the needs of the implementation. This may happen upon this request, or may happen as a side effect of RequestAuthenticator operations. """ diff --git a/tests/test_planet_auth/unit/auth/auth_clients/oidc/test_token_validator.py b/tests/test_planet_auth/unit/auth/auth_clients/oidc/test_token_validator.py index d67881d..1543ddb 100644 --- a/tests/test_planet_auth/unit/auth/auth_clients/oidc/test_token_validator.py +++ b/tests/test_planet_auth/unit/auth/auth_clients/oidc/test_token_validator.py @@ -104,6 +104,7 @@ def test_valid_access_token(self): access_token = self.token_builder_1.construct_oidc_access_token_rfc8693( username=TEST_TOKEN_USER, requested_scopes=TEST_TOKEN_SCOPES, ttl=TEST_TOKEN_TTL ) + validated_claims = under_test.validate_token( access_token, issuer=self.token_builder_1.issuer, @@ -111,6 +112,11 @@ def test_valid_access_token(self): ) self.assertEqual(TEST_TOKEN_USER, validated_claims["sub"]) + u_header, u_body, _ = TokenValidator.hazmat_unverified_decode(access_token) + self.assertDictEqual(u_header, {"alg": "RS256", "kid": "test_keypair1", "typ": "JWT"}) + self.assertEqual(u_body.get("aud"), "test_token_audience_for_keypair1") + self.assertEqual(u_body.get("sub"), "unit_test_user") + def test_empty_access_token(self): # QE TC3, TC5 under_test = self.under_test_1 @@ -126,26 +132,36 @@ def test_empty_access_token(self): issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, ) + with self.assertRaises(InvalidArgumentException): + TokenValidator.hazmat_unverified_decode("") + with self.assertRaises(InvalidArgumentException): + TokenValidator.hazmat_unverified_decode(None) def test_malformed_token_1(self): # QE TC6 - just some random garbage. under_test = self.under_test_1 + test_token = secrets.token_bytes(2048) with self.assertRaises(InvalidTokenException): under_test.validate_token( - token_str=secrets.token_bytes(2048), + token_str=test_token, issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, ) + with self.assertRaises(InvalidArgumentException): + TokenValidator.hazmat_unverified_decode(test_token) def test_malformed_token_2(self): # QE TC6 - just some random garbage, but URL safe. under_test = self.under_test_1 + test_token = secrets.token_urlsafe(2048) with self.assertRaises(InvalidTokenException): under_test.validate_token( - token_str=secrets.token_urlsafe(2048), + token_str=test_token, issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, ) + with self.assertRaises(InvalidArgumentException): + TokenValidator.hazmat_unverified_decode(test_token) def test_malformed_token_3(self): # QE TC6 - random garbage, but has JWT three dot structure. @@ -162,11 +178,14 @@ def test_malformed_token_3(self): issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, ) + with self.assertRaises(InvalidArgumentException): + TokenValidator.hazmat_unverified_decode(fake_jwt) def test_malformed_token_4(self): # QE TC6 - random garbage, but looks more JWT like with encoded json. def _fake_token(header, body): - return "{}.{}.{}".format( + fake_sig_bytes = secrets.token_bytes(256) + fake_jwt = "{}.{}.{}".format( str( jwt.utils.base64url_encode( bytes( @@ -180,11 +199,12 @@ def _fake_token(header, body): jwt.utils.base64url_encode(bytes(json.dumps(body), "utf-8")), encoding="utf-8", ), - str(jwt.utils.base64url_encode(secrets.token_bytes(256)), encoding="utf-8"), + str(jwt.utils.base64url_encode(fake_sig_bytes), encoding="utf-8"), ) + return fake_jwt, fake_sig_bytes under_test = self.under_test_1 - fake_jwt = _fake_token( + fake_jwt, fake_sig_bytes = _fake_token( { "alg": self.token_builder_1.signing_key_algorithm, "kid": self.token_builder_1.signing_key_id, @@ -199,6 +219,13 @@ def _fake_token(header, body): audience=self.token_builder_1.audience, ) + # This is good enough for an unverified decode. It's still invalid, which is + # why you are careful with unverified decoded data. + u_header, u_body, u_sig = TokenValidator.hazmat_unverified_decode(fake_jwt) + self.assertDictEqual(u_header, {"alg": "RS256", "kid": "test_keypair1"}) + self.assertDictEqual(u_body, {"fake_claim": "test claim value"}) + self.assertEqual(u_sig, fake_sig_bytes) + def test_malformed_token_missing_issuer(self): # QE TC14 under_test = self.under_test_1 @@ -287,6 +314,11 @@ def _bad_token(header, body): audience=self.token_builder_1.audience, ) + u_header, u_body, u_sig = TokenValidator.hazmat_unverified_decode(bad_jwt) + self.assertIsNotNone(u_header) + self.assertIsNotNone(u_body) + self.assertEqual(u_sig, b"") + def test_missing_signature_2(self): # QE TC11 - JWT without a signature, and with no second "." def _bad_token(header, body): @@ -321,6 +353,8 @@ def _bad_token(header, body): issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, ) + with self.assertRaises(InvalidArgumentException): + TokenValidator.hazmat_unverified_decode(bad_jwt) def test_empty_issuer_arg(self): under_test = self.under_test_1 From 8ad43a6055d88fe0ab5eb54f2c397c08572ecf77 Mon Sep 17 00:00:00 2001 From: Carl Alexander Adams Date: Sun, 5 Oct 2025 09:48:27 -0700 Subject: [PATCH 2/6] more details test assertions for token validator --- src/planet_auth/oidc/multi_validator.py | 2 +- src/planet_auth/oidc/token_validator.py | 2 +- .../auth_clients/oidc/test_token_validator.py | 139 +++++++++++++----- 3 files changed, 103 insertions(+), 40 deletions(-) diff --git a/src/planet_auth/oidc/multi_validator.py b/src/planet_auth/oidc/multi_validator.py index 693c43b..a140a23 100644 --- a/src/planet_auth/oidc/multi_validator.py +++ b/src/planet_auth/oidc/multi_validator.py @@ -286,7 +286,7 @@ def validate_access_token( """ if not token: - raise InvalidArgumentException(message="Cannot validate empty string as a token") + raise InvalidArgumentException(message="Cannot decode empty string as a token") validator = self._select_validator(token) local_validation, remote_validation = self._check_access_token( diff --git a/src/planet_auth/oidc/token_validator.py b/src/planet_auth/oidc/token_validator.py index 986fcbb..0bf83bd 100644 --- a/src/planet_auth/oidc/token_validator.py +++ b/src/planet_auth/oidc/token_validator.py @@ -181,7 +181,7 @@ def validate_token( """ # PyJWT should enforce this, but we have unit tests in case... if not token_str: - raise InvalidArgumentException(message="Cannot validate empty string as a token") + raise InvalidArgumentException(message="Cannot decode empty string as a token") if not issuer: # PyJWT does not seem to raise if the issuer is explicitly None, even when # verify_iss was selected. diff --git a/tests/test_planet_auth/unit/auth/auth_clients/oidc/test_token_validator.py b/tests/test_planet_auth/unit/auth/auth_clients/oidc/test_token_validator.py index 1543ddb..7fb9631 100644 --- a/tests/test_planet_auth/unit/auth/auth_clients/oidc/test_token_validator.py +++ b/tests/test_planet_auth/unit/auth/auth_clients/oidc/test_token_validator.py @@ -120,48 +120,65 @@ def test_valid_access_token(self): def test_empty_access_token(self): # QE TC3, TC5 under_test = self.under_test_1 - with self.assertRaises(InvalidArgumentException): + with self.assertRaises(InvalidArgumentException) as raised1: under_test.validate_token( "", issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, ) - with self.assertRaises(InvalidArgumentException): + self.assertEqual("Cannot decode empty string as a token", str(raised1.exception)) + + with self.assertRaises(InvalidArgumentException) as raised2: under_test.validate_token( None, issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, ) - with self.assertRaises(InvalidArgumentException): + self.assertEqual("Cannot decode empty string as a token", str(raised2.exception)) + + with self.assertRaises(InvalidArgumentException) as raised3: TokenValidator.hazmat_unverified_decode("") - with self.assertRaises(InvalidArgumentException): + self.assertEqual("Not enough segments (DecodeError)", str(raised3.exception)) + + with self.assertRaises(InvalidArgumentException) as raised4: TokenValidator.hazmat_unverified_decode(None) + self.assertEqual("Invalid token type. Token must be a (DecodeError)", str(raised4.exception)) def test_malformed_token_1(self): # QE TC6 - just some random garbage. under_test = self.under_test_1 test_token = secrets.token_bytes(2048) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised1: under_test.validate_token( token_str=test_token, issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, ) - with self.assertRaises(InvalidArgumentException): + # Random garbage may throw different errors. + # self.assertEqual("TBD", str(raised1.exception)) + + with self.assertRaises(InvalidArgumentException) as raised2: TokenValidator.hazmat_unverified_decode(test_token) + # Random garbage may throw different errors + # self.assertEqual("TBD", str(raised2.exception)) def test_malformed_token_2(self): # QE TC6 - just some random garbage, but URL safe. under_test = self.under_test_1 test_token = secrets.token_urlsafe(2048) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised1: under_test.validate_token( token_str=test_token, issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, ) - with self.assertRaises(InvalidArgumentException): + # Random garbage may throw different errors. + # self.assertEqual("TBD", str(raised1.exception)) + + with self.assertRaises(InvalidArgumentException) as raised2: TokenValidator.hazmat_unverified_decode(test_token) + # Random garbage may throw different errors. + # self.assertEqual("TBD", str(raised2.exception)) def test_malformed_token_3(self): # QE TC6 - random garbage, but has JWT three dot structure. @@ -172,14 +189,19 @@ def test_malformed_token_3(self): str(jwt.utils.base64url_encode(secrets.token_bytes(256)), encoding="utf-8"), ) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised1: under_test.validate_token( token_str=fake_jwt, issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, ) - with self.assertRaises(InvalidArgumentException): + # Random garbage may throw different errors. + # self.assertEqual("TBD", str(raised1.exception)) + + with self.assertRaises(InvalidArgumentException) as raised2: TokenValidator.hazmat_unverified_decode(fake_jwt) + # Random garbage may throw different errors. + # self.assertEqual("TBD", str(raised2.exception)) def test_malformed_token_4(self): # QE TC6 - random garbage, but looks more JWT like with encoded json. @@ -212,12 +234,14 @@ def _fake_token(header, body): {"fake_claim": "test claim value"}, ) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised1: under_test.validate_token( token_str=fake_jwt, issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, ) + # Will random garbage always parse to this exact error? + self.assertEqual("Signature verification failed (InvalidSignatureError)", str(raised1.exception)) # This is good enough for an unverified decode. It's still invalid, which is # why you are careful with unverified decoded data. @@ -232,12 +256,13 @@ def test_malformed_token_missing_issuer(self): access_token = self.token_builder_1.construct_oidc_access_token_rfc8693( username=TEST_TOKEN_USER, requested_scopes=TEST_TOKEN_SCOPES, ttl=TEST_TOKEN_TTL, remove_claims=["iss"] ) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised1: under_test.validate_token( access_token, issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, ) + self.assertEqual('Token is missing the "iss" claim (MissingRequiredClaimError)', str(raised1.exception)) def test_altered_token(self): # QE TC10 @@ -272,12 +297,13 @@ def _alter_token(access_token, old_claim_value, new_claim_value): # Now, actually change the claim value, and look for the exception altered_access_token = _alter_token(access_token, "sensitive_value_A", "sensitive_value_B") - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised1: under_test.validate_token( altered_access_token, issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, ) + self.assertEqual("Signature verification failed (InvalidSignatureError)", str(raised1.exception)) def test_missing_signature_1(self): # QE TC11 - JWT without a signature, but with the "." @@ -307,12 +333,13 @@ def _bad_token(header, body): {"fake_claim": "test claim value"}, ) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised1: under_test.validate_token( token_str=bad_jwt, issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, ) + self.assertEqual("Signature verification failed (InvalidSignatureError)", str(raised1.exception)) u_header, u_body, u_sig = TokenValidator.hazmat_unverified_decode(bad_jwt) self.assertIsNotNone(u_header) @@ -347,14 +374,17 @@ def _bad_token(header, body): {"fake_claim": "test claim value"}, ) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised1: under_test.validate_token( token_str=bad_jwt, issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, ) - with self.assertRaises(InvalidArgumentException): + self.assertEqual("Not enough segments (DecodeError)", str(raised1.exception)) + + with self.assertRaises(InvalidArgumentException) as raised2: TokenValidator.hazmat_unverified_decode(bad_jwt) + self.assertEqual("Not enough segments (DecodeError)", str(raised2.exception)) def test_empty_issuer_arg(self): under_test = self.under_test_1 @@ -367,18 +397,21 @@ def test_empty_issuer_arg(self): audience=self.token_builder_1.audience, ) self.assertEqual(TEST_TOKEN_USER, validated_claims["sub"]) - with self.assertRaises(InvalidArgumentException): + with self.assertRaises(InvalidArgumentException) as raised1: validated_claims = under_test.validate_token( access_token, issuer="", audience=self.token_builder_1.audience, ) - with self.assertRaises(InvalidArgumentException): + self.assertEqual("Cannot validate token with no required issuer provided", str(raised1.exception)) + + with self.assertRaises(InvalidArgumentException) as raised2: validated_claims = under_test.validate_token( access_token, issuer=None, audience=self.token_builder_1.audience, ) + self.assertEqual("Cannot validate token with no required issuer provided", str(raised2.exception)) def test_empty_audience_arg(self): under_test = self.under_test_1 @@ -391,18 +424,21 @@ def test_empty_audience_arg(self): audience=self.token_builder_1.audience, ) self.assertEqual(TEST_TOKEN_USER, validated_claims["sub"]) - with self.assertRaises(InvalidArgumentException): + with self.assertRaises(InvalidArgumentException) as raised1: validated_claims = under_test.validate_token( access_token, issuer=self.token_builder_1.issuer, audience="", ) - with self.assertRaises(InvalidArgumentException): + self.assertEqual("Cannot validate token with no required audience provided", str(raised1.exception)) + + with self.assertRaises(InvalidArgumentException) as raised2: validated_claims = under_test.validate_token( access_token, issuer=self.token_builder_1.issuer, audience=None, ) + self.assertEqual("Cannot validate token with no required audience provided", str(raised2.exception)) def test_access_token_unknown_signing_key(self): # QE TC12 @@ -410,12 +446,13 @@ def test_access_token_unknown_signing_key(self): access_token = self.token_builder_2.construct_oidc_access_token_rfc8693( username=TEST_TOKEN_USER, requested_scopes=TEST_TOKEN_SCOPES, ttl=TEST_TOKEN_TTL ) - with self.assertRaises(UnknownSigningKeyTokenException): + with self.assertRaises(UnknownSigningKeyTokenException) as raised1: under_test.validate_token( access_token, issuer=self.token_builder_2.issuer, audience=self.token_builder_2.audience, ) + self.assertEqual("Could not find signing key for key ID test_keypair2", str(raised1.exception)) def test_access_token_issuer_mismatch(self): # QE TC15 untrusted issuer. (See also multi-validator tests) @@ -423,12 +460,13 @@ def test_access_token_issuer_mismatch(self): access_token = self.token_builder_1.construct_oidc_access_token_rfc8693( username=TEST_TOKEN_USER, requested_scopes=TEST_TOKEN_SCOPES, ttl=TEST_TOKEN_TTL ) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised1: under_test.validate_token( access_token, issuer=self.token_builder_1.issuer + "_make_it_mismatch", audience=self.token_builder_1.audience, ) + self.assertEqual("Invalid issuer (InvalidIssuerError)", str(raised1.exception)) def test_access_token_incorrect_audience(self): # QE TC17 @@ -436,12 +474,13 @@ def test_access_token_incorrect_audience(self): access_token = self.token_builder_1.construct_oidc_access_token_rfc8693( username=TEST_TOKEN_USER, requested_scopes=TEST_TOKEN_SCOPES, ttl=TEST_TOKEN_TTL ) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised1: under_test.validate_token( access_token, issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience + "_make_it_mismatch", ) + self.assertEqual("Audience doesn't match (InvalidAudienceError)", str(raised1.exception)) def test_access_token_multiple_audiences(self): # QE TC17 @@ -461,10 +500,11 @@ def test_access_token_multiple_audiences(self): under_test.validate_token( token_str=access_token, issuer=self.token_builder_1.issuer, audience="extra_audience_2" ) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised1: under_test.validate_token( token_str=access_token, issuer=self.token_builder_1.issuer, audience="extra_audience_3" ) + self.assertEqual("Audience doesn't match (InvalidAudienceError)", str(raised1.exception)) @freezegun.freeze_time(as_kwarg="frozen_time") def test_access_token_expired(self, frozen_time): @@ -474,25 +514,29 @@ def test_access_token_expired(self, frozen_time): username=TEST_TOKEN_USER, requested_scopes=TEST_TOKEN_SCOPES, ttl=3 ) frozen_time.tick(5) - with self.assertRaises(ExpiredTokenException): + with self.assertRaises(ExpiredTokenException) as raised1: under_test.validate_token( access_token, issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, ) + self.assertEqual("Signature has expired (ExpiredSignatureError)", str(raised1.exception)) def test_access_token_missing_claim(self): under_test = self.under_test_1 access_token = self.token_builder_1.construct_oidc_access_token_rfc8693( username=TEST_TOKEN_USER, requested_scopes=TEST_TOKEN_SCOPES, ttl=TEST_TOKEN_TTL ) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised1: under_test.validate_token( access_token, issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, required_claims=["missing_claim_1", "missing_claim_2"], ) + self.assertEqual( + 'Token is missing the "missing_claim_1" claim (MissingRequiredClaimError)', str(raised1.exception) + ) def test_access_token_scope_validation__no_scopes_required__rfc8692(self): under_test = self.under_test_1 @@ -548,13 +592,14 @@ def _scope_validation_assertions(self, under_test, access_token): audience=self.token_builder_1.audience, scopes_anyof=[TEST_TOKEN_SCOPE_1, TEST_TOKEN_SCOPE_2], ) - with self.assertRaises(ScopeNotGrantedTokenException): + with self.assertRaises(ScopeNotGrantedTokenException) as raised1: under_test.validate_token( access_token, issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, scopes_anyof=[TEST_TOKEN_SCOPE_3], ) + # self.assertRegexpMatches(str(raised1.exception), r"Access token did not grant sufficient scope.") def test_access_token_scope_validation__scope_required__rfc8693(self): # QE TC16 @@ -567,8 +612,9 @@ def test_access_token_scope_validation__scope_required__rfc8693(self): access_token = self.token_builder_1.construct_oidc_access_token_rfc8693( username=TEST_TOKEN_USER, requested_scopes=[], ttl=TEST_TOKEN_TTL ) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised1: self._scope_validation_assertions(under_test, access_token) + self.assertEqual("No OAuth2 Scopes claim could be found in the access token", str(raised1.exception)) def test_access_token_scope_validation__scope_required__okta(self): # QE TC16 @@ -581,8 +627,9 @@ def test_access_token_scope_validation__scope_required__okta(self): access_token = self.token_builder_1.construct_oidc_access_token_okta( username=TEST_TOKEN_USER, requested_scopes=[], ttl=TEST_TOKEN_TTL ) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised1: self._scope_validation_assertions(under_test, access_token) + self.assertEqual("No OAuth2 Scopes claim could be found in the access token", str(raised1.exception)) def test_id_token_nonce(self): under_test = self.under_test_1 @@ -596,10 +643,11 @@ def test_id_token_nonce(self): under_test.validate_id_token( id_token_with_nonce, issuer=self.token_builder_1.issuer, client_id="test_client_id", nonce="12345" ) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised1: under_test.validate_id_token( id_token_with_nonce, issuer=self.token_builder_1.issuer, client_id="test_client_id", nonce="67890" ) + self.assertEqual("Token nonce did not match expected value", str(raised1.exception)) # TODO: See comments in the class. Unsure if we should make missing # nonce check's fatal when there is a nonce. It would be very strict. @@ -607,13 +655,15 @@ def test_id_token_nonce(self): id_token_with_nonce, issuer=self.token_builder_1.issuer, client_id="test_client_id", nonce=None ) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised2: under_test.validate_id_token( id_token_without_nonce, issuer=self.token_builder_1.issuer, client_id="test_client_id", nonce="12345", ) + self.assertEqual('Token is missing the "nonce" claim (MissingRequiredClaimError)', str(raised2.exception)) + under_test.validate_id_token( id_token_with_nonce, issuer=self.token_builder_1.issuer, client_id="test_client_id" ) @@ -638,8 +688,11 @@ def test_validate_id_token_multiple_audiences(self): client_id="test_client_id", extra_claims={"aud": ["test_client_id", "extra_audience_1", "extra_audience_2"]}, ) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised1: under_test.validate_id_token(id_token, issuer=self.token_builder_1.issuer, client_id="test_client_id") + self.assertEqual( + '"azp" claim mut be present when ID token contains multiple audiences.', str(raised1.exception) + ) # azp claim doesn't contain expected value when multiple audiences # are present @@ -648,8 +701,12 @@ def test_validate_id_token_multiple_audiences(self): client_id="test_client_id", extra_claims={"aud": ["test_client_id", "extra_audience_1", "extra_audience_2"], "azp": "mismatch_azp"}, ) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised2: under_test.validate_id_token(id_token, issuer=self.token_builder_1.issuer, client_id="test_client_id") + self.assertEqual( + 'ID token "azp" claim expected to match the client ID "test_client_id", but was "mismatch_azp"', + str(raised2.exception), + ) @freezegun.freeze_time(as_kwarg="frozen_time") def test_min_jwks_fetch_interval(self, frozen_time): @@ -665,12 +722,13 @@ def test_min_jwks_fetch_interval(self, frozen_time): self.assertEqual(0, under_test._jwks_client.jwks.call_count) # t1 - key miss loads keys - with self.assertRaises(UnknownSigningKeyTokenException): + with self.assertRaises(UnknownSigningKeyTokenException) as raised1: under_test.validate_token( token_unknown_signer, issuer=self.token_builder_2.issuer, audience=self.token_builder_2.audience, ) + self.assertEqual("Could not find signing key for key ID test_keypair2", str(raised1.exception)) self.assertEqual(1, under_test._jwks_client.jwks.call_count) # t2 - key hit should pull from hot cache. @@ -684,12 +742,14 @@ def test_min_jwks_fetch_interval(self, frozen_time): # t3 - Repeated key hits and misses inside the fetch interval should # not trigger a reload of the jwks verification keys for n in range(5): - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised_repeatedly: under_test.validate_token( token_unknown_signer, issuer=self.token_builder_2.issuer, audience=self.token_builder_2.audience, ) + self.assertEqual("Could not find signing key for key ID test_keypair2", str(raised_repeatedly.exception)) + under_test.validate_token( token_known_signer, issuer=self.token_builder_1.issuer, @@ -708,12 +768,13 @@ def test_min_jwks_fetch_interval(self, frozen_time): issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, ) - with self.assertRaises(InvalidTokenException): + with self.assertRaises(InvalidTokenException) as raised3: under_test.validate_token( token_unknown_signer, issuer=self.token_builder_2.issuer, audience=self.token_builder_2.audience, ) + self.assertEqual("Could not find signing key for key ID test_keypair2", str(raised3.exception)) self.assertEqual(2, under_test._jwks_client.jwks.call_count) def test_untrusted_token_algorithm(self): @@ -722,12 +783,13 @@ def test_untrusted_token_algorithm(self): test_token = self.token_builder_3.construct_oidc_access_token_rfc8693( username=TEST_TOKEN_USER, requested_scopes=TEST_TOKEN_SCOPES, ttl=TEST_TOKEN_TTL ) - with self.assertRaises(InvalidAlgorithmTokenException): + with self.assertRaises(InvalidAlgorithmTokenException) as raised1: under_test.validate_token( test_token, issuer=self.token_builder_3.issuer, audience=self.token_builder_3.audience, ) + self.assertEqual("Unknown or unsupported token algorithm RS512", str(raised1.exception)) def test_jwks_endpoint_lists_unsupported_algorithm(self): # See CG-867 @@ -762,12 +824,13 @@ def test_jwks_endpoint_lists_unsupported_algorithm(self): access_token = self.token_builder_4.construct_oidc_access_token_rfc8693( username=TEST_TOKEN_USER, requested_scopes=TEST_TOKEN_SCOPES, ttl=TEST_TOKEN_TTL ) - with self.assertRaises(UnknownSigningKeyTokenException): + with self.assertRaises(UnknownSigningKeyTokenException) as raised1: under_test.validate_token( access_token, issuer=self.token_builder_4.issuer, audience=self.token_builder_4.audience, ) + self.assertEqual("Could not find signing key for key ID test_keypair4", str(raised1.exception)) # def test_max_jwks_age(self): # # Feature not implemented. From 6f7463664659d523e8834aed88396822bc3251ff Mon Sep 17 00:00:00 2001 From: Carl Alexander Adams Date: Sun, 5 Oct 2025 09:55:16 -0700 Subject: [PATCH 3/6] flake fixes --- .../auth_clients/oidc/test_token_validator.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/tests/test_planet_auth/unit/auth/auth_clients/oidc/test_token_validator.py b/tests/test_planet_auth/unit/auth/auth_clients/oidc/test_token_validator.py index 7fb9631..30b83fe 100644 --- a/tests/test_planet_auth/unit/auth/auth_clients/oidc/test_token_validator.py +++ b/tests/test_planet_auth/unit/auth/auth_clients/oidc/test_token_validator.py @@ -148,7 +148,7 @@ def test_malformed_token_1(self): # QE TC6 - just some random garbage. under_test = self.under_test_1 test_token = secrets.token_bytes(2048) - with self.assertRaises(InvalidTokenException) as raised1: + with self.assertRaises(InvalidTokenException): # as raised1: under_test.validate_token( token_str=test_token, issuer=self.token_builder_1.issuer, @@ -157,7 +157,7 @@ def test_malformed_token_1(self): # Random garbage may throw different errors. # self.assertEqual("TBD", str(raised1.exception)) - with self.assertRaises(InvalidArgumentException) as raised2: + with self.assertRaises(InvalidArgumentException): # as raised2: TokenValidator.hazmat_unverified_decode(test_token) # Random garbage may throw different errors # self.assertEqual("TBD", str(raised2.exception)) @@ -166,7 +166,7 @@ def test_malformed_token_2(self): # QE TC6 - just some random garbage, but URL safe. under_test = self.under_test_1 test_token = secrets.token_urlsafe(2048) - with self.assertRaises(InvalidTokenException) as raised1: + with self.assertRaises(InvalidTokenException): # as raised1: under_test.validate_token( token_str=test_token, issuer=self.token_builder_1.issuer, @@ -175,7 +175,7 @@ def test_malformed_token_2(self): # Random garbage may throw different errors. # self.assertEqual("TBD", str(raised1.exception)) - with self.assertRaises(InvalidArgumentException) as raised2: + with self.assertRaises(InvalidArgumentException): # as raised2: TokenValidator.hazmat_unverified_decode(test_token) # Random garbage may throw different errors. # self.assertEqual("TBD", str(raised2.exception)) @@ -189,7 +189,7 @@ def test_malformed_token_3(self): str(jwt.utils.base64url_encode(secrets.token_bytes(256)), encoding="utf-8"), ) - with self.assertRaises(InvalidTokenException) as raised1: + with self.assertRaises(InvalidTokenException): # as raised1: under_test.validate_token( token_str=fake_jwt, issuer=self.token_builder_1.issuer, @@ -198,7 +198,7 @@ def test_malformed_token_3(self): # Random garbage may throw different errors. # self.assertEqual("TBD", str(raised1.exception)) - with self.assertRaises(InvalidArgumentException) as raised2: + with self.assertRaises(InvalidArgumentException): # as raised2: TokenValidator.hazmat_unverified_decode(fake_jwt) # Random garbage may throw different errors. # self.assertEqual("TBD", str(raised2.exception)) @@ -592,14 +592,13 @@ def _scope_validation_assertions(self, under_test, access_token): audience=self.token_builder_1.audience, scopes_anyof=[TEST_TOKEN_SCOPE_1, TEST_TOKEN_SCOPE_2], ) - with self.assertRaises(ScopeNotGrantedTokenException) as raised1: + with self.assertRaises(ScopeNotGrantedTokenException): under_test.validate_token( access_token, issuer=self.token_builder_1.issuer, audience=self.token_builder_1.audience, scopes_anyof=[TEST_TOKEN_SCOPE_3], ) - # self.assertRegexpMatches(str(raised1.exception), r"Access token did not grant sufficient scope.") def test_access_token_scope_validation__scope_required__rfc8693(self): # QE TC16 From 61242a43e9fc88f3b6a09092612d13fdf3b809cb Mon Sep 17 00:00:00 2001 From: Carl Alexander Adams Date: Sun, 5 Oct 2025 10:19:19 -0700 Subject: [PATCH 4/6] test util improvements --- tests/test_planet_auth/unit/auth/util.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/test_planet_auth/unit/auth/util.py b/tests/test_planet_auth/unit/auth/util.py index b77fcc7..12d8f40 100644 --- a/tests/test_planet_auth/unit/auth/util.py +++ b/tests/test_planet_auth/unit/auth/util.py @@ -69,9 +69,10 @@ def _construct_oidc_access_token(self, username: str, ttl: int, extra_claims, re "sub": username, "aud": self.audience, "iat": now, - "exp": now + ttl, "jti": str(uuid.uuid4()), } + if ttl: + unsigned_jwt["exp"] = now + ttl unsigned_jwt.update(extra_claims) if remove_claims: for remove_claim in remove_claims: @@ -118,9 +119,10 @@ def construct_oidc_id_token(self, client_id: str, ttl: int, extra_claims=None, r "sub": client_id, "aud": client_id, "iat": now, - "exp": now + ttl, "jti": str(uuid.uuid4()), } + if ttl: + unsigned_jwt["exp"] = now + ttl if extra_claims: # Note: this is clobbering of the claims above! might be fine # for this test class, but be warned if you copy-paste somewhere. @@ -322,9 +324,10 @@ def _construct_oidc_credential( credential_data = { "token_type": "Bearer", - "expires_in": self._mock_client_config.stub_authority_ttl, "scope": " ".join(requested_scopes), } + if self._mock_client_config.stub_authority_ttl: + credential_data["expires_in"] = self._mock_client_config.stub_authority_ttl if get_access_token: credential_data["access_token"] = jwt_access_token if get_id_token: From 792618d9409873c1c2c06afe94d015d82065d0e4 Mon Sep 17 00:00:00 2001 From: Carl Alexander Adams Date: Sun, 5 Oct 2025 10:28:05 -0700 Subject: [PATCH 5/6] minor code de-dupe in utils --- src/planet_auth/storage_utils.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/planet_auth/storage_utils.py b/src/planet_auth/storage_utils.py index f429aa7..7b39bab 100644 --- a/src/planet_auth/storage_utils.py +++ b/src/planet_auth/storage_utils.py @@ -364,14 +364,17 @@ def update_data(self, sparse_update_data): new_data = sparse_update_data self.set_data(new_data) - def set_data(self, data): + def set_data(self, data, copy_data: bool = True): """ Set the current in memory data. The data will be checked for validity before in memory values are set. Invalid data will result in an exception being thrown and no change being made to the in memory object. """ self.check_data(data) - self._data = data.copy() + if copy_data: + self._data = data.copy() + else: + self._data = data self._load_time = int(time.time()) def check_data(self, data): @@ -459,9 +462,7 @@ def load(self): return # we now allow in memory operation. Should we raise an error if the current data is invalid? new_data = self._object_storage_provider.load_obj(self._file_path) - self.check_data(new_data) - self._data = new_data - self._load_time = int(time.time()) + self.set_data(new_data, copy_data=False) def lazy_load(self): """ From 17d1c93848b6fe81c8eac6157fd13fc54300ae99 Mon Sep 17 00:00:00 2001 From: Carl Alexander Adams Date: Sun, 5 Oct 2025 16:00:59 -0700 Subject: [PATCH 6/6] fix typo --- src/planet_auth/oidc/token_validator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/planet_auth/oidc/token_validator.py b/src/planet_auth/oidc/token_validator.py index 0bf83bd..44b6f19 100644 --- a/src/planet_auth/oidc/token_validator.py +++ b/src/planet_auth/oidc/token_validator.py @@ -260,7 +260,7 @@ def validate_token( @InvalidArgumentException.recast(jwt.exceptions.DecodeError) def hazmat_unverified_decode(token_str) -> Tuple[Dict, Dict, Any]: """ - Decide a JWT without verifying the signature or any claims. + Decode a JWT without verifying the signature or any claims. !!! Warning Treat unverified token claims with extreme caution.