From ab1d074c4d2a0e49ab39c74e3c2ffc5b97f31936 Mon Sep 17 00:00:00 2001 From: nbayati <99771966+nbayati@users.noreply.github.com> Date: Wed, 11 Mar 2026 18:15:36 -0700 Subject: [PATCH 1/2] feat: implement updated design for regional access boundary Make the fetching async and non blocking. Implement proactive refresh every 6 hours. Centralize the logic in a new class. Remove no-op signal and checks. Refactor to Regional Access Boundary name. --- .../google-auth/google/auth/_constants.py | 20 +- packages/google-auth/google/auth/_helpers.py | 40 --- .../auth/_regional_access_boundary_utils.py | 263 ++++++++++++++ .../google/auth/compute_engine/credentials.py | 97 +++-- .../google-auth/google/auth/credentials.py | 194 +++++----- .../google/auth/external_account.py | 52 ++- .../auth/external_account_authorized_user.py | 38 +- .../google-auth/google/auth/identity_pool.py | 1 - .../google/auth/impersonated_credentials.py | 49 ++- packages/google-auth/google/oauth2/_client.py | 75 ++-- .../google/oauth2/_service_account_async.py | 15 + .../google-auth/google/oauth2/credentials.py | 7 +- .../google/oauth2/service_account.py | 49 +-- .../tests/compute_engine/test_credentials.py | 330 +----------------- .../google-auth/tests/oauth2/test__client.py | 104 +++--- .../tests/oauth2/test_service_account.py | 265 +------------- packages/google-auth/tests/test__helpers.py | 29 +- .../test__regional_access_boundary_utils.py | 326 +++++++++++++++++ packages/google-auth/tests/test_aws.py | 5 - .../google-auth/tests/test_credentials.py | 144 ++------ .../tests/test_external_account.py | 307 +--------------- .../test_external_account_authorized_user.py | 76 +--- .../google-auth/tests/test_identity_pool.py | 7 - .../tests/test_impersonated_credentials.py | 314 +---------------- packages/google-auth/tests/test_pluggable.py | 4 - 25 files changed, 1044 insertions(+), 1767 deletions(-) create mode 100644 packages/google-auth/google/auth/_regional_access_boundary_utils.py create mode 100644 packages/google-auth/tests/test__regional_access_boundary_utils.py diff --git a/packages/google-auth/google/auth/_constants.py b/packages/google-auth/google/auth/_constants.py index 28e47025fc4d..7611fd63d9ff 100644 --- a/packages/google-auth/google/auth/_constants.py +++ b/packages/google-auth/google/auth/_constants.py @@ -1,5 +1,19 @@ +# Copyright 2026 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + """Shared constants.""" -_SERVICE_ACCOUNT_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/projects/-/serviceAccounts/{service_account_email}/allowedLocations" -_WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/locations/global/workforcePools/{pool_id}/allowedLocations" -_WORKLOAD_IDENTITY_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/projects/{project_number}/locations/global/workloadIdentityPools/{pool_id}/allowedLocations" +_SERVICE_ACCOUNT_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{service_account_email}/allowedLocations" +_WORKFORCE_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.googleapis.com/v1/locations/global/workforcePools/{pool_id}/allowedLocations" +_WORKLOAD_IDENTITY_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.googleapis.com/v1/projects/{project_number}/locations/global/workloadIdentityPools/{pool_id}/allowedLocations" diff --git a/packages/google-auth/google/auth/_helpers.py b/packages/google-auth/google/auth/_helpers.py index 750631aa5fc8..19aabf51f826 100644 --- a/packages/google-auth/google/auth/_helpers.py +++ b/packages/google-auth/google/auth/_helpers.py @@ -308,46 +308,6 @@ def unpadded_urlsafe_b64encode(value): return base64.urlsafe_b64encode(value).rstrip(b"=") -def get_bool_from_env(variable_name, default=False): - """Gets a boolean value from an environment variable. - - The environment variable is interpreted as a boolean with the following - (case-insensitive) rules: - - "true", "1" are considered true. - - "false", "0" are considered false. - Any other values will raise an exception. - - Args: - variable_name (str): The name of the environment variable. - default (bool): The default value if the environment variable is not - set. - - Returns: - bool: The boolean value of the environment variable. - - Raises: - google.auth.exceptions.InvalidValue: If the environment variable is - set to a value that can not be interpreted as a boolean. - """ - value = os.environ.get(variable_name) - - if value is None: - return default - - value = value.lower() - - if value in ("true", "1"): - return True - elif value in ("false", "0"): - return False - else: - raise exceptions.InvalidValue( - 'Environment variable "{}" must be one of "true", "false", "1", or "0".'.format( - variable_name - ) - ) - - def is_python_3(): """Check if the Python interpreter is Python 2 or 3. diff --git a/packages/google-auth/google/auth/_regional_access_boundary_utils.py b/packages/google-auth/google/auth/_regional_access_boundary_utils.py new file mode 100644 index 000000000000..41f210d53593 --- /dev/null +++ b/packages/google-auth/google/auth/_regional_access_boundary_utils.py @@ -0,0 +1,263 @@ +# Copyright 2026 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Utilities for Regional Access Boundary management.""" + +import datetime +import functools +import logging +import os +import threading +from typing import NamedTuple, Optional, Mapping, Any, MutableMapping + +from google.auth import _helpers +from google.auth import environment_vars + +_LOGGER = logging.getLogger(__name__) + + +@functools.lru_cache() +def is_regional_access_boundary_enabled(): + """Checks if Regional Access Boundary is enabled via environment variable. + + The environment variable is interpreted as a boolean with the following + (case-insensitive) rules: + - "true", "1" are considered true. + - Any other value (or unset) is considered false. + + Returns: + bool: True if Regional Access Boundary is enabled, False otherwise. + """ + value = os.environ.get(environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED) + if value is None: + return False + + return value.lower() in ("true", "1") + + +# The default lifetime for a cached Regional Access Boundary. +DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL = datetime.timedelta(hours=6) + +# The period of time prior to the boundary's expiration when a background refresh +# is proactively triggered. +REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD = datetime.timedelta(hours=1) + +# The initial cooldown period for a failed Regional Access Boundary lookup. +DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(minutes=15) + +# The maximum cooldown period for a failed Regional Access Boundary lookup. +MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(hours=6) + + +# The header key used for Regional Access Boundaries. +_REGIONAL_ACCESS_BOUNDARY_HEADER = "x-allowed-locations" + + +class _RegionalAccessBoundaryData(NamedTuple): + """Data container for a Regional Access Boundary snapshot. + + Attributes: + encoded_locations (Optional[str]): The encoded Regional Access Boundary string. + expiry (Optional[datetime.datetime]): The hard expiration time of the boundary data. + cooldown_expiry (Optional[datetime.datetime]): The time until which further lookups are skipped. + cooldown_duration (datetime.timedelta): The current duration for the exponential cooldown. + """ + + encoded_locations: Optional[str] + expiry: Optional[datetime.datetime] + cooldown_expiry: Optional[datetime.datetime] + cooldown_duration: datetime.timedelta + + +class _RegionalAccessBoundaryManager(object): + """Manages the Regional Access Boundary state and its background refresh. + + This class provides a stable container for the Regional Access Boundary state, + allowing cloned credentials to share the same underlying state and refresh mechanism. + The actual data is held in an immutable `_RegionalAccessBoundaryData` object + and is swapped atomically to ensure thread-safe, lock-free reads. + """ + + def __init__(self): + self._data = _RegionalAccessBoundaryData( + encoded_locations=None, + expiry=None, + cooldown_expiry=None, + cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN, + ) + self.refresh_manager = _RegionalAccessBoundaryRefreshManager() + self._update_lock = threading.Lock() + + def apply_headers(self, headers): + """Applies the Regional Access Boundary header to the provided dictionary. + + If the boundary is valid, the 'x-allowed-locations' header is added + or updated. Otherwise, the header is removed to ensure no stale + data is sent. + + Args: + headers (MutableMapping[str, str]): The headers dictionary to update. + """ + rab_data = self._data + + if rab_data.encoded_locations is not None and ( + rab_data.expiry is not None and _helpers.utcnow() < rab_data.expiry + ): + headers[_REGIONAL_ACCESS_BOUNDARY_HEADER] = rab_data.encoded_locations + else: + headers.pop(_REGIONAL_ACCESS_BOUNDARY_HEADER, None) + + def maybe_start_refresh(self, credentials, request): + """Starts a background thread to refresh the Regional Access Boundary if needed. + + Args: + credentials (google.auth.credentials.Credentials): The credentials to refresh. + request (google.auth.transport.Request): The object used to make HTTP requests. + """ + rab_data = self._data + + # Don't start a new refresh if the Regional Access Boundary info is still fresh. + if ( + rab_data.encoded_locations + and rab_data.expiry + and _helpers.utcnow() + < (rab_data.expiry - REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD) + ): + return + + # Don't start a new refresh if the cooldown is still in effect. + if rab_data.cooldown_expiry and _helpers.utcnow() < rab_data.cooldown_expiry: + return + + # If all checks pass, start the background refresh. + self.refresh_manager.start_refresh(credentials, request, self) + + +class _RegionalAccessBoundaryRefreshThread(threading.Thread): + """Thread for background refreshing of the Regional Access Boundary.""" + + def __init__(self, credentials, request, rab_manager): + super(_RegionalAccessBoundaryRefreshThread, self).__init__() + self.daemon = True + self._credentials = credentials + self._request = request + self._rab_manager = rab_manager + + def run(self): + """ + Performs the Regional Access Boundary lookup and updates the state. + + This method is run in a separate thread. It delegates the actual lookup + to the credentials object's `_lookup_regional_access_boundary` method. + Based on the lookup's outcome (success or complete failure after retries), + it updates the cached Regional Access Boundary information, + its expiry, its cooldown expiry, and its exponential cooldown duration. + """ + # Catch exceptions (e.g., from the underlying transport) to prevent the + # background thread from crashing. This ensures we can gracefully enter + # an exponential cooldown state on failure. + try: + regional_access_boundary_info = ( + self._credentials._lookup_regional_access_boundary(self._request) + ) + except Exception as e: + if _helpers.is_logging_enabled(_LOGGER): + _LOGGER.warning( + "Asynchronous Regional Access Boundary lookup raised an exception: %s", + e, + exc_info=True, + ) + regional_access_boundary_info = None + + with self._rab_manager._update_lock: + # Capture the current state before calculating updates. + current_data = self._rab_manager._data + + if regional_access_boundary_info: + # On success, update the boundary and its expiry, and clear any cooldown. + encoded_locations = regional_access_boundary_info.get( + "encodedLocations" + ) + updated_data = _RegionalAccessBoundaryData( + encoded_locations=encoded_locations, + expiry=_helpers.utcnow() + DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL, + cooldown_expiry=None, + cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN, + ) + if _helpers.is_logging_enabled(_LOGGER): + _LOGGER.debug( + "Asynchronous Regional Access Boundary lookup successful." + ) + else: + # On failure, calculate cooldown and update state. + if _helpers.is_logging_enabled(_LOGGER): + _LOGGER.warning( + "Asynchronous Regional Access Boundary lookup failed. Entering cooldown." + ) + + next_cooldown_expiry = ( + _helpers.utcnow() + current_data.cooldown_duration + ) + next_cooldown_duration = min( + current_data.cooldown_duration * 2, + MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN, + ) + + # If the refresh failed, we keep reusing the existing data unless + # it has reached its hard expiration time. + if current_data.expiry and _helpers.utcnow() > current_data.expiry: + next_encoded_locations = None + next_expiry = None + else: + next_encoded_locations = current_data.encoded_locations + next_expiry = current_data.expiry + + updated_data = _RegionalAccessBoundaryData( + encoded_locations=next_encoded_locations, + expiry=next_expiry, + cooldown_expiry=next_cooldown_expiry, + cooldown_duration=next_cooldown_duration, + ) + + # Perform the atomic swap of the state object. + self._rab_manager._data = updated_data + + +class _RegionalAccessBoundaryRefreshManager(object): + """Manages a thread for background refreshing of the Regional Access Boundary.""" + + def __init__(self): + self._lock = threading.Lock() + self._worker = None + + def start_refresh(self, credentials, request, rab_manager): + """ + Starts a background thread to refresh the Regional Access Boundary if one is not already running. + + Args: + credentials (CredentialsWithRegionalAccessBoundary): The credentials + to refresh. + request (google.auth.transport.Request): The object used to make + HTTP requests. + rab_manager (_RegionalAccessBoundaryManager): The manager container to update. + """ + with self._lock: + if self._worker and self._worker.is_alive(): + # A refresh is already in progress. + return + + self._worker = _RegionalAccessBoundaryRefreshThread( + credentials, request, rab_manager + ) + self._worker.start() diff --git a/packages/google-auth/google/auth/compute_engine/credentials.py b/packages/google-auth/google/auth/compute_engine/credentials.py index 9507e837fbff..795f46663543 100644 --- a/packages/google-auth/google/auth/compute_engine/credentials.py +++ b/packages/google-auth/google/auth/compute_engine/credentials.py @@ -20,7 +20,9 @@ """ import datetime +import logging +from google.auth import _constants from google.auth import _helpers from google.auth import credentials from google.auth import exceptions @@ -30,16 +32,14 @@ from google.auth.compute_engine import _metadata from google.oauth2 import _client -_TRUST_BOUNDARY_LOOKUP_ENDPOINT = ( - "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations" -) +_LOGGER = logging.getLogger(__name__) class Credentials( credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.CredentialsWithUniverseDomain, - credentials.CredentialsWithTrustBoundary, + credentials.CredentialsWithRegionalAccessBoundary, ): """Compute Engine Credentials. @@ -93,7 +93,13 @@ def __init__( if universe_domain: self._universe_domain = universe_domain self._universe_domain_cached = True - self._trust_boundary = trust_boundary + + if trust_boundary is not None: + warnings.warn( + "The trust_boundary parameter is deprecated and has no effect.", + DeprecationWarning, + stacklevel=2, + ) def _retrieve_info(self, request): """Retrieve information about the service account. @@ -146,35 +152,38 @@ def _perform_refresh_token(self, request): new_exc = exceptions.RefreshError(caught_exc) raise new_exc from caught_exc - def _build_trust_boundary_lookup_url(self): - """Builds and returns the URL for the trust boundary lookup API for GCE.""" + def _build_regional_access_boundary_lookup_url(self, request=None): + """Builds and returns the URL for the regional access boundary lookup API for GCE.""" # If the service account email is 'default', we need to get the # actual email address from the metadata server. if self._service_account_email == "default": - from google.auth.transport import requests as google_auth_requests + if request is None: + from google.auth.transport import requests as google_auth_requests - request = google_auth_requests.Request() + request = google_auth_requests.Request() try: info = _metadata.get_service_account_info(request, "default") if not info or "email" not in info: - raise exceptions.RefreshError( + _LOGGER.error( "Unexpected response from metadata server: " - "service account info is missing 'email' field." + "service account info is missing 'email' field. Cannot build Regional Access Boundary lookup URL." ) + return None self._service_account_email = info["email"] except exceptions.TransportError as e: # If fetching the service account email fails due to a transport error, - # it means we cannot build the trust boundary lookup URL. - # Wrap this in a RefreshError so it's caught by _refresh_trust_boundary. - raise exceptions.RefreshError( - "Failed to get service account email for trust boundary lookup: {}".format( - e - ) - ) from e + # it means we cannot build the regional access boundary lookup URL. + _LOGGER.error( + "Failed to get service account email to build Regional Access Boundary lookup URL: %s", + e, + ) + return None - return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - self.universe_domain, self.service_account_email + return ( + _constants._SERVICE_ACCOUNT_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( + service_account_email=self.service_account_email + ) ) @property @@ -211,17 +220,22 @@ def get_cred_info(self): "principal": self.service_account_email, } - @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) - def with_quota_project(self, quota_project_id): + def _make_copy(self): creds = self.__class__( service_account_email=self._service_account_email, - quota_project_id=quota_project_id, + quota_project_id=self._quota_project_id, scopes=self._scopes, default_scopes=self._default_scopes, universe_domain=self._universe_domain, - trust_boundary=self._trust_boundary, ) creds._universe_domain_cached = self._universe_domain_cached + self._copy_regional_access_boundary_manager(creds) + return creds + + @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) + def with_quota_project(self, quota_project_id): + creds = self._make_copy() + creds._quota_project_id = quota_project_id return creds @_helpers.copy_docstring(credentials.Scoped) @@ -229,39 +243,16 @@ def with_scopes(self, scopes, default_scopes=None): # Compute Engine credentials can not be scoped (the metadata service # ignores the scopes parameter). App Engine, Cloud Run and Flex support # requesting scopes. - creds = self.__class__( - scopes=scopes, - default_scopes=default_scopes, - service_account_email=self._service_account_email, - quota_project_id=self._quota_project_id, - universe_domain=self._universe_domain, - trust_boundary=self._trust_boundary, - ) - creds._universe_domain_cached = self._universe_domain_cached + creds = self._make_copy() + creds._scopes = scopes + creds._default_scopes = default_scopes return creds @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain) def with_universe_domain(self, universe_domain): - return self.__class__( - scopes=self._scopes, - default_scopes=self._default_scopes, - service_account_email=self._service_account_email, - quota_project_id=self._quota_project_id, - trust_boundary=self._trust_boundary, - universe_domain=universe_domain, - ) - - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) - def with_trust_boundary(self, trust_boundary): - creds = self.__class__( - service_account_email=self._service_account_email, - quota_project_id=self._quota_project_id, - scopes=self._scopes, - default_scopes=self._default_scopes, - universe_domain=self._universe_domain, - trust_boundary=trust_boundary, - ) - creds._universe_domain_cached = self._universe_domain_cached + creds = self._make_copy() + creds._universe_domain = universe_domain + creds._universe_domain_cached = True return creds diff --git a/packages/google-auth/google/auth/credentials.py b/packages/google-auth/google/auth/credentials.py index cdb20653277b..9905bf073b50 100644 --- a/packages/google-auth/google/auth/credentials.py +++ b/packages/google-auth/google/auth/credentials.py @@ -19,17 +19,16 @@ from enum import Enum import logging import os -from typing import List +from urllib.parse import urlparse from google.auth import _helpers, environment_vars +from google.auth import _regional_access_boundary_utils from google.auth import exceptions from google.auth import metrics from google.auth._credentials_base import _BaseCredentials from google.auth._refresh_worker import RefreshThreadManager DEFAULT_UNIVERSE_DOMAIN = "googleapis.com" -NO_OP_TRUST_BOUNDARY_LOCATIONS: List[str] = [] -NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0" _LOGGER = logging.getLogger("google.auth._default") @@ -290,8 +289,14 @@ def with_universe_domain(self, universe_domain): ) -class CredentialsWithTrustBoundary(Credentials): - """Abstract base for credentials supporting ``with_trust_boundary`` factory""" +class CredentialsWithRegionalAccessBoundary(Credentials): + """Abstract base for credentials supporting regional access boundary configuration.""" + + def __init__(self, *args, **kwargs): + super(CredentialsWithRegionalAccessBoundary, self).__init__(*args, **kwargs) + self._rab_manager = ( + _regional_access_boundary_utils._RegionalAccessBoundaryManager() + ) @abc.abstractmethod def _perform_refresh_token(self, request): @@ -308,140 +313,148 @@ def _perform_refresh_token(self, request): raise NotImplementedError("_perform_refresh_token must be implemented") def with_trust_boundary(self, trust_boundary): - """Returns a copy of these credentials with a modified trust boundary. + """Returns a copy of these credentials. + + .. deprecated:: + Manual Regional Access Boundary overrides are not supported. + This method is maintained for backwards compatibility and + returns a copy of the credentials without modifying the + Regional Access Boundary state. Args: - trust_boundary Mapping[str, str]: The trust boundary to use for the - credential. This should be a map with a "locations" key that maps to - a list of GCP regions, and a "encodedLocations" key that maps to a - hex string. + trust_boundary (Mapping[str, str]): Ignored. Returns: google.auth.credentials.Credentials: A new credentials instance. """ - raise NotImplementedError("This credential does not support trust boundaries.") + import warnings + + warnings.warn( + "with_trust_boundary is deprecated and has no effect.", + DeprecationWarning, + stacklevel=2, + ) + return self._make_copy() + + def _copy_regional_access_boundary_manager(self, target): + """Copies the regional access boundary manager to another instance.""" + target._rab_manager = self._rab_manager + + def _maybe_start_regional_access_boundary_refresh(self, request, url): + """ + Starts a background thread to refresh the Regional Access Boundary if needed. + + This method checks if a refresh is necessary and if one is not already + in progress or in a cooldown period. If so, it starts a background + thread to perform the lookup. + + Args: + request (google.auth.transport.Request): The object used to make + HTTP requests. + url (str): The URL of the request. + """ + try: + # Do not perform a lookup if the request is for a regional endpoint. + hostname = urlparse(url).hostname + if hostname and ( + hostname.endswith(".rep.googleapis.com") + or hostname.endswith(".rep.sandbox.googleapis.com") + ): + return + except (ValueError, TypeError): + # If the URL is malformed, proceed with the default lookup behavior. + pass + + # A refresh is only needed if the feature is enabled. + if not self._is_regional_access_boundary_lookup_required(): + return + + # Start the background refresh if needed. + self._rab_manager.maybe_start_refresh(self, request) - def _is_trust_boundary_lookup_required(self): - """Checks if a trust boundary lookup is required. + def _is_regional_access_boundary_lookup_required(self): + """Checks if a Regional Access Boundary lookup is required. A lookup is required if the feature is enabled via an environment - variable, the universe domain is supported, and a no-op boundary - is not already cached. + variable and the universe domain is supported. Returns: - bool: True if a trust boundary lookup is required, False otherwise. + bool: True if a Regional Access Boundary lookup is required, False otherwise. """ - # 1. Check if the feature is enabled via environment variable. - if not _helpers.get_bool_from_env( - environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, default=False - ): + # 1. Check if the feature is enabled. + if not _regional_access_boundary_utils.is_regional_access_boundary_enabled(): return False - # 2. Skip trust boundary flow for non-default universe domains. + # 2. Skip for non-default universe domains. if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN: return False - # 3. Do not trigger refresh if credential has a cached no-op trust boundary. - return not self._has_no_op_trust_boundary() - - def _get_trust_boundary_header(self): - if self._trust_boundary is not None: - if self._has_no_op_trust_boundary(): - # STS expects an empty string if the trust boundary value is no-op. - return {"x-allowed-locations": ""} - else: - return {"x-allowed-locations": self._trust_boundary["encodedLocations"]} - return {} + return True def apply(self, headers, token=None): """Apply the token to the authentication header.""" super().apply(headers, token) - headers.update(self._get_trust_boundary_header()) + self._rab_manager.apply_headers(headers) - def refresh(self, request): - """Refreshes the access token and the trust boundary. - - This method calls the subclass's token refresh logic and then - refreshes the trust boundary if applicable. + def before_request(self, request, method, url, headers): + """Refreshes the access token and triggers the Regional Access Boundary + lookup if necessary. """ - self._perform_refresh_token(request) - self._refresh_trust_boundary(request) - - def _refresh_trust_boundary(self, request): - """Triggers a refresh of the trust boundary and updates the cache if necessary. + super(CredentialsWithRegionalAccessBoundary, self).before_request( + request, method, url, headers + ) + self._maybe_start_regional_access_boundary_refresh(request, url) - Args: - request (google.auth.transport.Request): The object used to make - HTTP requests. + def refresh(self, request): + """Refreshes the access token. - Raises: - google.auth.exceptions.RefreshError: If the trust boundary could - not be refreshed and no cached value is available. + This method calls the subclass's token refresh logic. The Regional + Access Boundary is refreshed separately in a non-blocking way. """ - if not self._is_trust_boundary_lookup_required(): - return - try: - self._trust_boundary = self._lookup_trust_boundary(request) - except exceptions.RefreshError as error: - # If the call to the lookup API failed, check if there is a trust boundary - # already cached. If there is, do nothing. If not, then throw the error. - if self._trust_boundary is None: - raise error - if _helpers.is_logging_enabled(_LOGGER): - _LOGGER.debug( - "Using cached trust boundary due to refresh error: %s", error - ) - return + self._perform_refresh_token(request) - def _lookup_trust_boundary(self, request): - """Calls the trust boundary lookup API to refresh the trust boundary cache. + def _lookup_regional_access_boundary(self, request): + """Calls the Regional Access Boundary lookup API to retrieve the Regional Access Boundary information. Args: request (google.auth.transport.Request): The object used to make HTTP requests. Returns: - trust_boundary (dict): The trust boundary object returned by the lookup API. - - Raises: - google.auth.exceptions.RefreshError: If the trust boundary could not be - retrieved. + Optional[dict]: The Regional Access Boundary information returned by the lookup API, or None if the lookup failed. """ from google.oauth2 import _client - url = self._build_trust_boundary_lookup_url() + url = self._build_regional_access_boundary_lookup_url(request=request) if not url: - raise exceptions.InvalidValue("Failed to build trust boundary lookup URL.") + _LOGGER.error("Failed to build Regional Access Boundary lookup URL.") + return None headers = {} self._apply(headers) - headers.update(self._get_trust_boundary_header()) - return _client._lookup_trust_boundary(request, url, headers=headers) + self._rab_manager.apply_headers(headers) + return _client._lookup_regional_access_boundary(request, url, headers=headers) @abc.abstractmethod - def _build_trust_boundary_lookup_url(self): + def _build_regional_access_boundary_lookup_url(self, request=None): """ - Builds and returns the URL for the trust boundary lookup API. + Builds and returns the URL for the Regional Access Boundary lookup API. This method should be implemented by subclasses to provide the specific URL based on the credential type and its properties. + Args: + request (Optional[google.auth.transport.Request]): The object used + to make HTTP requests. In some subclasses, this may be used to + make an initial network call to resolve required metadata for the + URL. + Returns: - str: The URL for the trust boundary lookup endpoint, or None + str: The URL for the Regional Access Boundary lookup endpoint, or None if lookup should be skipped (e.g., for non-applicable universe domains). """ raise NotImplementedError( - "_build_trust_boundary_lookup_url must be implemented" - ) - - def _has_no_op_trust_boundary(self): - # A no-op trust boundary is indicated by encodedLocations being "0x0". - # The "locations" list may or may not be present as an empty list. - if self._trust_boundary is None: - return False - return ( - self._trust_boundary.get("encodedLocations") - == NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS + "_build_regional_access_boundary_lookup_url must be implemented" ) @@ -665,3 +678,8 @@ class TokenState(Enum): FRESH = 1 STALE = 2 INVALID = 3 + + +# For backwards compatibility with the previus feature name. +# Use CredentialsWithRegionalAccessBoundary instead. +CredentialsWithTrustBoundary = CredentialsWithRegionalAccessBoundary diff --git a/packages/google-auth/google/auth/external_account.py b/packages/google-auth/google/auth/external_account.py index 05874eda7fb1..b77215195af4 100644 --- a/packages/google-auth/google/auth/external_account.py +++ b/packages/google-auth/google/auth/external_account.py @@ -34,6 +34,7 @@ import functools import io import json +import logging import re from google.auth import _constants @@ -45,6 +46,8 @@ from google.oauth2 import sts from google.oauth2 import utils +_LOGGER = logging.getLogger(__name__) + # External account JSON type identifier. _EXTERNAL_ACCOUNT_JSON_TYPE = "external_account" # The token exchange grant_type used for exchanging credentials. @@ -82,7 +85,7 @@ class Credentials( credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.CredentialsWithTokenUri, - credentials.CredentialsWithTrustBoundary, + credentials.CredentialsWithRegionalAccessBoundary, metaclass=abc.ABCMeta, ): """Base class for all external account credentials. @@ -202,6 +205,13 @@ def __init__( "credentials" ) + if trust_boundary is not None: + warnings.warn( + "The trust_boundary parameter is deprecated and has no effect.", + DeprecationWarning, + stacklevel=2, + ) + @property def info(self): """Generates the dictionary representation of the current credentials. @@ -349,6 +359,7 @@ def with_scopes(self, scopes, default_scopes=None): scoped = self.__class__(**kwargs) scoped._cred_file_path = self._cred_file_path scoped._metrics_options = self._metrics_options + self._copy_regional_access_boundary_manager(scoped) return scoped @abc.abstractmethod @@ -417,20 +428,9 @@ def refresh(self, request): """Refreshes the access token. For impersonated credentials, this method will refresh the underlying - source credentials and the impersonated credentials. For non-impersonated - credentials, it will refresh the access token and the trust boundary. + source credentials and the impersonated credentials. """ self._perform_refresh_token(request) - self._handle_trust_boundary(request) - - def _handle_trust_boundary(self, request): - # If we are impersonating, the trust boundary is handled by the - # impersonated credentials object. We need to get it from there. - if self._service_account_impersonation_url: - self._trust_boundary = self._impersonated_credentials._trust_boundary - else: - # Otherwise, refresh the trust boundary for the external account. - self._refresh_trust_boundary(request) def _perform_refresh_token(self, request, cert_fingerprint=None): scopes = self._scopes if self._scopes is not None else self._default_scopes @@ -486,8 +486,8 @@ def _perform_refresh_token(self, request, cert_fingerprint=None): self.expiry = now + lifetime - def _build_trust_boundary_lookup_url(self): - """Builds and returns the URL for the trust boundary lookup API.""" + def _build_regional_access_boundary_lookup_url(self, request=None): + """Builds and returns the URL for the Regional Access Boundary lookup API.""" url = None # Try to parse as a workload identity pool. # Audience format: //iam.googleapis.com/projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID @@ -497,8 +497,7 @@ def _build_trust_boundary_lookup_url(self): ) if workload_match: project_number, pool_id = workload_match.groups() - url = _constants._WORKLOAD_IDENTITY_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - universe_domain=self._universe_domain, + url = _constants._WORKLOAD_IDENTITY_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( project_number=project_number, pool_id=pool_id, ) @@ -510,21 +509,26 @@ def _build_trust_boundary_lookup_url(self): ) if workforce_match: pool_id = workforce_match.groups()[0] - url = _constants._WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - universe_domain=self._universe_domain, pool_id=pool_id + url = _constants._WORKFORCE_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( + pool_id=pool_id ) if url: return url else: # If both fail, the audience format is invalid. - raise exceptions.InvalidValue("Invalid audience format.") + _LOGGER.error( + "Invalid audience format for Regional Access Boundary lookup: %s", + self._audience, + ) + return None def _make_copy(self): kwargs = self._constructor_args() new_cred = self.__class__(**kwargs) new_cred._cred_file_path = self._cred_file_path new_cred._metrics_options = self._metrics_options + self._copy_regional_access_boundary_manager(new_cred) return new_cred @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) @@ -546,12 +550,6 @@ def with_universe_domain(self, universe_domain): cred._universe_domain = universe_domain return cred - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) - def with_trust_boundary(self, trust_boundary): - cred = self._make_copy() - cred._trust_boundary = trust_boundary - return cred - def _should_initialize_impersonated_credentials(self): return ( self._service_account_impersonation_url is not None @@ -600,7 +598,6 @@ def _initialize_impersonated_credentials(self): lifetime=self._service_account_impersonation_options.get( "token_lifetime_seconds" ), - trust_boundary=self._trust_boundary, ) def _create_default_metrics_options(self): @@ -687,7 +684,6 @@ def from_info(cls, info, **kwargs): universe_domain=info.get( "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN ), - trust_boundary=info.get("trust_boundary"), **kwargs ) diff --git a/packages/google-auth/google/auth/external_account_authorized_user.py b/packages/google-auth/google/auth/external_account_authorized_user.py index 680fce628e2c..ac562d689df1 100644 --- a/packages/google-auth/google/auth/external_account_authorized_user.py +++ b/packages/google-auth/google/auth/external_account_authorized_user.py @@ -36,6 +36,7 @@ import datetime import io import json +import logging import re from google.auth import _constants @@ -45,6 +46,8 @@ from google.oauth2 import sts from google.oauth2 import utils +_LOGGER = logging.getLogger(__name__) + _EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE = "external_account_authorized_user" @@ -52,7 +55,7 @@ class Credentials( credentials.CredentialsWithQuotaProject, credentials.ReadOnlyScoped, credentials.CredentialsWithTokenUri, - credentials.CredentialsWithTrustBoundary, + credentials.CredentialsWithRegionalAccessBoundary, ): """Credentials for External Account Authorized Users. @@ -87,7 +90,6 @@ def __init__( scopes=None, quota_project_id=None, universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ): """Instantiates a external account authorized user credentials object. @@ -113,7 +115,6 @@ def __init__( create the credentials. universe_domain (Optional[str]): The universe domain. The default value is googleapis.com. - trust_boundary (Mapping[str,str]): A credential trust boundary. Returns: google.auth.external_account_authorized_user.Credentials: The @@ -134,7 +135,6 @@ def __init__( self._scopes = scopes self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN self._cred_file_path = None - self._trust_boundary = trust_boundary if not self.valid and not self.can_refresh: raise exceptions.InvalidOperation( @@ -182,7 +182,6 @@ def constructor_args(self): "scopes": self._scopes, "quota_project_id": self._quota_project_id, "universe_domain": self._universe_domain, - "trust_boundary": self._trust_boundary, } @property @@ -308,18 +307,29 @@ def _perform_refresh_token(self, request): if "refresh_token" in response_data: self._refresh_token = response_data["refresh_token"] - def _build_trust_boundary_lookup_url(self): - """Builds and returns the URL for the trust boundary lookup API.""" + def _build_regional_access_boundary_lookup_url(self, request=None): + """Builds and returns the URL for the Regional Access Boundary lookup API. + + Returns: + Optional[str]: The URL for the Regional Access Boundary lookup endpoint, or None + if the URL cannot be built due to an invalid workforce pool audience format. + """ # Audience format: //iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID match = re.search(r"locations/[^/]+/workforcePools/([^/]+)", self._audience) if not match: - raise exceptions.InvalidValue("Invalid workforce pool audience format.") + _LOGGER.error( + "Invalid workforce pool audience format for Regional Access Boundary lookup: %s", + self._audience, + ) + return None pool_id = match.groups()[0] - return _constants._WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - universe_domain=self._universe_domain, pool_id=pool_id + return ( + _constants._WORKFORCE_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( + pool_id=pool_id + ) ) def revoke(self, request): @@ -359,6 +369,7 @@ def _make_copy(self): kwargs = self.constructor_args() cred = self.__class__(**kwargs) cred._cred_file_path = self._cred_file_path + self._copy_regional_access_boundary_manager(cred) return cred @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) @@ -379,12 +390,6 @@ def with_universe_domain(self, universe_domain): cred._universe_domain = universe_domain return cred - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) - def with_trust_boundary(self, trust_boundary): - cred = self._make_copy() - cred._trust_boundary = trust_boundary - return cred - @classmethod def from_info(cls, info, **kwargs): """Creates a Credentials instance from parsed external account info. @@ -429,7 +434,6 @@ def from_info(cls, info, **kwargs): universe_domain=info.get( "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN ), - trust_boundary=info.get("trust_boundary"), **kwargs ) diff --git a/packages/google-auth/google/auth/identity_pool.py b/packages/google-auth/google/auth/identity_pool.py index 50b2a83e4356..30819ef0485a 100644 --- a/packages/google-auth/google/auth/identity_pool.py +++ b/packages/google-auth/google/auth/identity_pool.py @@ -572,4 +572,3 @@ def refresh(self, request): ) self._perform_refresh_token(request, cert_fingerprint=cert_fingerprint) - self._handle_trust_boundary(request) diff --git a/packages/google-auth/google/auth/impersonated_credentials.py b/packages/google-auth/google/auth/impersonated_credentials.py index 304f0606ed85..0eb77da34348 100644 --- a/packages/google-auth/google/auth/impersonated_credentials.py +++ b/packages/google-auth/google/auth/impersonated_credentials.py @@ -30,7 +30,9 @@ from datetime import datetime import http.client as http_client import json +import logging +from google.auth import _constants from google.auth import _exponential_backoff from google.auth import _helpers from google.auth import credentials @@ -40,15 +42,14 @@ from google.auth import metrics from google.oauth2 import _client +_LOGGER = logging.getLogger(__name__) _REFRESH_ERROR = "Unable to acquire impersonated credentials" _DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds _GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" -_TRUST_BOUNDARY_LOOKUP_ENDPOINT = ( - "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations" -) + _SOURCE_CREDENTIAL_AUTHORIZED_USER_TYPE = "authorized_user" _SOURCE_CREDENTIAL_SERVICE_ACCOUNT_TYPE = "service_account" @@ -123,7 +124,7 @@ class Credentials( credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.Signing, - credentials.CredentialsWithTrustBoundary, + credentials.CredentialsWithRegionalAccessBoundary, ): """This module defines impersonated credentials which are essentially impersonated identities. @@ -267,7 +268,13 @@ def __init__( self._quota_project_id = quota_project_id self._iam_endpoint_override = iam_endpoint_override self._cred_file_path = None - self._trust_boundary = trust_boundary + + if trust_boundary is not None: + warnings.warn( + "The trust_boundary parameter is deprecated and has no effect.", + DeprecationWarning, + stacklevel=2, + ) def _metric_header_for_usage(self): return metrics.CRED_TYPE_SA_IMPERSONATE @@ -344,26 +351,26 @@ def _perform_refresh_token(self, request): iam_endpoint_override=self._iam_endpoint_override, ) - def _build_trust_boundary_lookup_url(self): - """Builds and returns the URL for the trust boundary lookup API. + def _build_regional_access_boundary_lookup_url(self, request=None): + """Builds and returns the URL for the Regional Access Boundary lookup API. This method constructs the specific URL for the IAM Credentials API's `allowedLocations` endpoint, using the credential's universe domain and service account email. - Raises: - ValueError: If `self.service_account_email` is None or an empty - string, as it's required to form the URL. - Returns: - str: The URL for the trust boundary lookup endpoint. + Optional[str]: The URL for the Regional Access Boundary lookup endpoint, or None + if the service account email is missing. """ if not self.service_account_email: - raise ValueError( - "Service account email is required to build the trust boundary lookup URL." + _LOGGER.error( + "Service account email is required to build the Regional Access Boundary lookup URL for impersonated credentials." + ) + return None + return ( + _constants._SERVICE_ACCOUNT_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( + service_account_email=self.service_account_email ) - return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - self.universe_domain, self.service_account_email ) def sign_bytes(self, message): @@ -435,15 +442,9 @@ def _make_copy(self): lifetime=self._lifetime, quota_project_id=self._quota_project_id, iam_endpoint_override=self._iam_endpoint_override, - trust_boundary=self._trust_boundary, ) cred._cred_file_path = self._cred_file_path - return cred - - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) - def with_trust_boundary(self, trust_boundary): - cred = self._make_copy() - cred._trust_boundary = trust_boundary + self._copy_regional_access_boundary_manager(cred) return cred @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) @@ -527,7 +528,6 @@ def from_impersonated_service_account_info(cls, info, scopes=None): delegates = info.get("delegates") quota_project_id = info.get("quota_project_id") scopes = scopes or info.get("scopes") - trust_boundary = info.get("trust_boundary") return cls( source_credentials, @@ -535,7 +535,6 @@ def from_impersonated_service_account_info(cls, info, scopes=None): scopes, delegates, quota_project_id=quota_project_id, - trust_boundary=trust_boundary, ) diff --git a/packages/google-auth/google/oauth2/_client.py b/packages/google-auth/google/oauth2/_client.py index d4db420070ef..4d76ed9a9a2b 100644 --- a/packages/google-auth/google/oauth2/_client.py +++ b/packages/google-auth/google/oauth2/_client.py @@ -26,6 +26,7 @@ import datetime import http.client as http_client import json +import logging import urllib from google.auth import _exponential_backoff @@ -36,6 +37,8 @@ from google.auth import metrics from google.auth import transport +_LOGGER = logging.getLogger(__name__) + _URLENCODED_CONTENT_TYPE = "application/x-www-form-urlencoded" _JSON_CONTENT_TYPE = "application/json" _JWT_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:jwt-bearer" @@ -514,18 +517,18 @@ def refresh_grant( return _handle_refresh_grant_response(response_data, refresh_token) -def _lookup_trust_boundary(request, url, headers=None): - """Implements the global lookup of a credential trust boundary. +def _lookup_regional_access_boundary(request, url, headers=None): + """Implements the global lookup of a credential Regional Access Boundary. For the lookup, we send a request to the global lookup endpoint and then parse the response. Service account credentials, workload identity - pools and workforce pools implementation may have trust boundaries configured. + pools and workforce pools implementation may have Regional Access Boundaries configured. Args: request (google.auth.transport.Request): A callable used to make HTTP requests. - url (str): The trust boundary lookup url. + url (str): The Regional Access Boundary lookup url. headers (Optional[Mapping[str, str]]): The headers for the request. Returns: - Mapping[str,list|str]: A dictionary containing + Optional[Mapping[str,list|str]]: A dictionary containing "locations" as a list of allowed locations as strings and "encodedLocations" as a hex string. e.g: @@ -535,61 +538,66 @@ def _lookup_trust_boundary(request, url, headers=None): ], "encodedLocations": "0xA30" } - If the credential is not set up with explicit trust boundaries, a trust boundary - of "all" will be returned as a default response. - { - "locations": [], - "encodedLocations": "0x0" - } - Raises: - exceptions.RefreshError: If the response status code is not 200. - exceptions.MalformedError: If the response is not in a valid format. """ - response_data = _lookup_trust_boundary_request(request, url, headers=headers) - # In case of no-op response, the "locations" list may or may not be present as an empty list. + response_data = _lookup_regional_access_boundary_request( + request, url, headers=headers + ) + if response_data is None: + # Error was already logged by _lookup_regional_access_boundary_request + return None + if "encodedLocations" not in response_data: - raise exceptions.MalformedError( - "Invalid trust boundary info: {}".format(response_data) + _LOGGER.error( + "Regional Access Boundary response malformed: missing 'encodedLocations' key in %s", + response_data, ) + return None return response_data -def _lookup_trust_boundary_request(request, url, can_retry=True, headers=None): - """Makes a request to the trust boundary lookup endpoint. +def _lookup_regional_access_boundary_request( + request, url, can_retry=True, headers=None +): + """Makes a request to the Regional Access Boundary lookup endpoint. Args: request (google.auth.transport.Request): A callable used to make HTTP requests. - url (str): The trust boundary lookup url. + url (str): The Regional Access Boundary lookup url. can_retry (bool): Enable or disable request retry behavior. Defaults to true. headers (Optional[Mapping[str, str]]): The headers for the request. Returns: - Mapping[str, str]: The JSON-decoded response data. - - Raises: - google.auth.exceptions.RefreshError: If the token endpoint returned - an error. + Optional[Mapping[str, str]]: The JSON-decoded response data on success, or None on failure. """ ( response_status_ok, response_data, retryable_error, - ) = _lookup_trust_boundary_request_no_throw(request, url, can_retry, headers) + ) = _lookup_regional_access_boundary_request_no_throw( + request, url, can_retry, headers + ) if not response_status_ok: - _handle_error_response(response_data, retryable_error) + _LOGGER.warning( + "Regional Access Boundary HTTP request failed after retries: response_data=%s, retryable_error=%s", + response_data, + retryable_error, + ) + return None return response_data -def _lookup_trust_boundary_request_no_throw(request, url, can_retry=True, headers=None): - """Makes a request to the trust boundary lookup endpoint. This +def _lookup_regional_access_boundary_request_no_throw( + request, url, can_retry=True, headers=None +): + """Makes a request to the Regional Access Boundary lookup endpoint. This function doesn't throw on response errors. Args: request (google.auth.transport.Request): A callable used to make HTTP requests. - url (str): The trust boundary lookup url. + url (str): The Regional Access Boundary lookup url. can_retry (bool): Enable or disable request retry behavior. Defaults to true. headers (Optional[Mapping[str, str]]): The headers for the request. @@ -603,7 +611,7 @@ def _lookup_trust_boundary_request_no_throw(request, url, can_retry=True, header response_data = {} retryable_error = False - retries = _exponential_backoff.ExponentialBackoff() + retries = _exponential_backoff.ExponentialBackoff(total_attempts=6) for _ in retries: response = request(method="GET", url=url, headers=headers) response_body = ( @@ -624,6 +632,9 @@ def _lookup_trust_boundary_request_no_throw(request, url, can_retry=True, header retryable_error = _can_retry( status_code=response.status, response_data=response_data ) + # Add 502 (Bad Gateway) as a retryable error for RAB lookups. + if response.status == http_client.BAD_GATEWAY: + retryable_error = True if not can_retry or not retryable_error: return False, response_data, retryable_error diff --git a/packages/google-auth/google/oauth2/_service_account_async.py b/packages/google-auth/google/oauth2/_service_account_async.py index cfd315a7ff1f..fa6cfb7b7d7a 100644 --- a/packages/google-auth/google/oauth2/_service_account_async.py +++ b/packages/google-auth/google/oauth2/_service_account_async.py @@ -75,6 +75,13 @@ async def refresh(self, request): self.token = access_token self.expiry = expiry + @_helpers.copy_docstring(credentials_async.Credentials) + async def before_request(self, request, method, url, headers): + # Explicit override to bypass synchronous CredentialsWithRegionalAccessBoundary. + await credentials_async.Credentials.before_request( + self, request, method, url, headers + ) + class IDTokenCredentials( service_account.IDTokenCredentials, @@ -130,3 +137,11 @@ async def refresh(self, request): ) self.token = access_token self.expiry = expiry + + @_helpers.copy_docstring(credentials_async.Credentials) + async def before_request(self, request, method, url, headers): + # Explicit override to bypass synchronous CredentialsWithRegionalAccessBoundary + # and disable Regional Access Boundary refresh for async credentials. + await credentials_async.Credentials.before_request( + self, request, method, url, headers + ) diff --git a/packages/google-auth/google/oauth2/credentials.py b/packages/google-auth/google/oauth2/credentials.py index ae60223b455e..f729f0938223 100644 --- a/packages/google-auth/google/oauth2/credentials.py +++ b/packages/google-auth/google/oauth2/credentials.py @@ -154,7 +154,12 @@ def __init__( self._rapt_token = rapt_token self.refresh_handler = refresh_handler self._enable_reauth_refresh = enable_reauth_refresh - self._trust_boundary = trust_boundary + if trust_boundary is not None: + warnings.warn( + "The trust_boundary parameter is deprecated and has no effect.", + DeprecationWarning, + stacklevel=2, + ) self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN self._account = account or "" self._cred_file_path = None diff --git a/packages/google-auth/google/oauth2/service_account.py b/packages/google-auth/google/oauth2/service_account.py index f897b3b75d51..9600a0670361 100644 --- a/packages/google-auth/google/oauth2/service_account.py +++ b/packages/google-auth/google/oauth2/service_account.py @@ -72,6 +72,7 @@ import copy import datetime +import logging from google.auth import _constants from google.auth import _helpers @@ -83,6 +84,8 @@ from google.auth import metrics from google.oauth2 import _client +_LOGGER = logging.getLogger(__name__) + _DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds _GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" @@ -92,7 +95,7 @@ class Credentials( credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.CredentialsWithTokenUri, - credentials.CredentialsWithTrustBoundary, + credentials.CredentialsWithRegionalAccessBoundary, ): """Service account credentials @@ -196,7 +199,15 @@ def __init__( self._additional_claims = additional_claims else: self._additional_claims = {} - self._trust_boundary = trust_boundary + + if trust_boundary is not None: + import warnings + + warnings.warn( + "The trust_boundary parameter is deprecated and has no effect.", + DeprecationWarning, + stacklevel=2, + ) @classmethod def _from_signer_and_info(cls, signer, info, **kwargs): @@ -222,7 +233,6 @@ def _from_signer_and_info(cls, signer, info, **kwargs): universe_domain=info.get( "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN ), - trust_boundary=info.get("trust_boundary"), **kwargs, ) @@ -296,9 +306,9 @@ def _make_copy(self): additional_claims=self._additional_claims.copy(), always_use_jwt_access=self._always_use_jwt_access, universe_domain=self._universe_domain, - trust_boundary=self._trust_boundary, ) cred._cred_file_path = self._cred_file_path + self._copy_regional_access_boundary_manager(cred) return cred @_helpers.copy_docstring(credentials.Scoped) @@ -384,12 +394,6 @@ def with_token_uri(self, token_uri): cred._token_uri = token_uri return cred - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) - def with_trust_boundary(self, trust_boundary): - cred = self._make_copy() - cred._trust_boundary = trust_boundary - return cred - def _make_authorization_grant_assertion(self): """Create the OAuth 2.0 assertion. @@ -433,7 +437,7 @@ def _metric_header_for_usage(self): return metrics.CRED_TYPE_SA_JWT return metrics.CRED_TYPE_SA_ASSERTION - @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) + @_helpers.copy_docstring(credentials.CredentialsWithRegionalAccessBoundary) def _perform_refresh_token(self, request): if self._always_use_jwt_access and not self._jwt_credentials: # If self signed jwt should be used but jwt credential is not @@ -499,27 +503,26 @@ def _create_self_signed_jwt(self, audience): self, audience ) - def _build_trust_boundary_lookup_url(self): - """Builds and returns the URL for the trust boundary lookup API. + def _build_regional_access_boundary_lookup_url(self, request=None): + """Builds and returns the URL for the Regional Access Boundary lookup API. This method constructs the specific URL for the IAM Credentials API's `allowedLocations` endpoint, using the credential's universe domain and service account email. - Raises: - ValueError: If `self.service_account_email` is None or an empty - string, as it's required to form the URL. - Returns: - str: The URL for the trust boundary lookup endpoint. + Optional[str]: The URL for the Regional Access Boundary lookup endpoint, or None + if the service account email is missing. """ if not self.service_account_email: - raise ValueError( - "Service account email is required to build the trust boundary lookup URL." + _LOGGER.error( + "Service account email is required to build the Regional Access Boundary lookup URL for service account credentials." + ) + return None + return ( + _constants._SERVICE_ACCOUNT_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT.format( + service_account_email=self._service_account_email, ) - return _constants._SERVICE_ACCOUNT_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - universe_domain=self._universe_domain, - service_account_email=self._service_account_email, ) @_helpers.copy_docstring(credentials.Signing) diff --git a/packages/google-auth/tests/compute_engine/test_credentials.py b/packages/google-auth/tests/compute_engine/test_credentials.py index ab7d45e0486c..1c833c85bcaf 100644 --- a/packages/google-auth/tests/compute_engine/test_credentials.py +++ b/packages/google-auth/tests/compute_engine/test_credentials.py @@ -26,7 +26,6 @@ from google.auth import transport from google.auth.compute_engine import credentials from google.auth.transport import requests -from google.oauth2 import _client SAMPLE_ID_TOKEN_EXP = 1584393400 @@ -62,9 +61,6 @@ class TestCredentials(object): credentials = None credentials_with_all_fields = None - VALID_TRUST_BOUNDARY = {"encodedLocations": "valid-encoded-locations"} - NO_OP_TRUST_BOUNDARY = {"encodedLocations": ""} - EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/default/allowedLocations" @pytest.fixture(autouse=True) def credentials_fixture(self): @@ -258,18 +254,6 @@ def test_with_universe_domain(self): assert creds.universe_domain == "universe_domain" assert creds._universe_domain_cached - def test_with_trust_boundary(self): - creds = self.credentials_with_all_fields - new_boundary = {"encodedLocations": "new_boundary"} - new_creds = creds.with_trust_boundary(new_boundary) - - assert new_creds is not creds - assert new_creds._trust_boundary == new_boundary - assert new_creds._service_account_email == creds._service_account_email - assert new_creds._quota_project_id == creds._quota_project_id - assert new_creds._scopes == creds._scopes - assert new_creds._default_scopes == creds._default_scopes - def test_token_usage_metrics(self): self.credentials.token = "token" self.credentials.expiry = None @@ -309,275 +293,23 @@ def test_user_provided_universe_domain(self, get_universe_domain): # domain endpoint. get_universe_domain.assert_not_called() - @mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True) - @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) - def test_refresh_trust_boundary_lookup_skipped_if_env_var_not_true( - self, mock_metadata_get, mock_lookup_tb - ): - creds = self.credentials - request = mock.Mock() - - mock_metadata_get.side_effect = [ - # from _retrieve_info - {"email": "default", "scopes": ["scope1"]}, - # from get_service_account_token - {"access_token": "mock_token", "expires_in": 3600}, - ] - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "false"} - ): - creds.refresh(request) - - mock_lookup_tb.assert_not_called() - assert creds._trust_boundary is None - - @mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True) - @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) - def test_refresh_trust_boundary_lookup_skipped_if_env_var_missing( - self, mock_metadata_get, mock_lookup_tb - ): - creds = self.credentials - request = mock.Mock() - - mock_metadata_get.side_effect = [ - # from _retrieve_info - {"email": "default", "scopes": ["scope1"]}, - # from get_service_account_token - {"access_token": "mock_token", "expires_in": 3600}, - ] - - with mock.patch.dict(os.environ, clear=True): - creds.refresh(request) - - mock_lookup_tb.assert_not_called() - assert creds._trust_boundary is None - - @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) - @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) - def test_refresh_trust_boundary_lookup_success( - self, mock_metadata_get, mock_lookup_tb - ): - mock_lookup_tb.return_value = { - "locations": ["us-central1"], - "encodedLocations": "0xABC", - } - creds = self.credentials - request = mock.Mock() - - # The first call to _metadata.get is for service account info, the second - # for the access token, and the third for the universe domain. - mock_metadata_get.side_effect = [ - # from _retrieve_info - {"email": "resolved-email@example.com", "scopes": ["scope1"]}, - # from get_service_account_token - {"access_token": "mock_token", "expires_in": 3600}, - # from get_universe_domain - "", - ] - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - creds.refresh(request) - - # Verify _metadata.get was called three times. - assert mock_metadata_get.call_count == 3 - # Verify lookup_trust_boundary was called with correct URL and token - expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/resolved-email@example.com/allowedLocations" - mock_lookup_tb.assert_called_once_with( - request, expected_url, headers={"authorization": "Bearer mock_token"} - ) - # Verify trust boundary was set - assert creds._trust_boundary == { - "locations": ["us-central1"], - "encodedLocations": "0xABC", - } - - # Verify x-allowed-locations header is set by apply() - headers_applied = {} - creds.apply(headers_applied) - assert headers_applied["x-allowed-locations"] == "0xABC" - - @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) - @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) - def test_refresh_trust_boundary_lookup_fails_no_cache( - self, mock_lookup_tb, mock_metadata_get - ): - mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed") - creds = self.credentials - request = mock.Mock() - - # Mock metadata calls for token, universe domain, and service account info - mock_metadata_get.side_effect = [ - # from _retrieve_info - {"email": "resolved-email@example.com", "scopes": ["scope1"]}, - # from get_service_account_token - {"access_token": "mock_token", "expires_in": 3600}, - # from get_universe_domain - "", - ] - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - with pytest.raises(exceptions.RefreshError, match="Lookup failed"): - creds.refresh(request) - - assert creds._trust_boundary is None - assert mock_metadata_get.call_count == 3 - mock_lookup_tb.assert_called_once() - - @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) - @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) - def test_refresh_trust_boundary_lookup_fails_with_cached_data( - self, mock_lookup_tb, mock_metadata_get - ): - # First refresh: Successfully fetch a valid trust boundary. - mock_lookup_tb.return_value = { - "locations": ["us-central1"], - "encodedLocations": "0xABC", - } - mock_metadata_get.side_effect = [ - # from _retrieve_info - {"email": "resolved-email@example.com", "scopes": ["scope1"]}, - # from get_service_account_token - {"access_token": "mock_token_1", "expires_in": 3600}, - # from get_universe_domain - "", - ] - creds = self.credentials - request = mock.Mock() - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - creds.refresh(request) - - assert creds._trust_boundary == { - "locations": ["us-central1"], - "encodedLocations": "0xABC", - } - mock_lookup_tb.assert_called_once() - - # Second refresh: Mock lookup to fail, but expect cached data to be preserved. - mock_lookup_tb.reset_mock() - mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed") - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - # This refresh should not raise an error because a cached value exists. - mock_metadata_get.reset_mock() - mock_metadata_get.side_effect = [ - # from _retrieve_info - {"email": "resolved-email@example.com", "scopes": ["scope1"]}, - # from get_service_account_token - {"access_token": "mock_token_2", "expires_in": 3600}, - # from get_universe_domain - "", - ] - creds.refresh(request) - - assert creds._trust_boundary == { - "locations": ["us-central1"], - "encodedLocations": "0xABC", - } - mock_lookup_tb.assert_called_once() - - @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) - @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) - def test_refresh_fetches_no_op_trust_boundary( - self, mock_lookup_tb, mock_metadata_get - ): - mock_lookup_tb.return_value = {"locations": [], "encodedLocations": "0x0"} - creds = self.credentials - request = mock.Mock() - - mock_metadata_get.side_effect = [ - # from _retrieve_info - {"email": "resolved-email@example.com", "scopes": ["scope1"]}, - # from get_service_account_token - {"access_token": "mock_token", "expires_in": 3600}, - # from get_universe_domain - "", - ] - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - creds.refresh(request) - - assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"} - assert mock_metadata_get.call_count == 3 - expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/resolved-email@example.com/allowedLocations" - mock_lookup_tb.assert_called_once_with( - request, expected_url, headers={"authorization": "Bearer mock_token"} - ) - # Verify that an empty header was added. - headers_applied = {} - creds.apply(headers_applied) - assert headers_applied["x-allowed-locations"] == "" - - @mock.patch("google.auth.compute_engine._metadata.get", autospec=True) - @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True) - def test_refresh_starts_with_no_op_trust_boundary_skips_lookup( - self, mock_lookup_tb, mock_metadata_get - ): - creds = self.credentials - # Use pre-cache universe domain to avoid an extra metadata call. - creds._universe_domain_cached = True - creds._trust_boundary = {"locations": [], "encodedLocations": "0x0"} - request = mock.Mock() - - mock_metadata_get.side_effect = [ - # from _retrieve_info - {"email": "resolved-email@example.com", "scopes": ["scope1"]}, - # from get_service_account_token - {"access_token": "mock_token", "expires_in": 3600}, - ] - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - creds.refresh(request) - - # Verify trust boundary remained NO_OP - assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"} - # Lookup should be skipped - mock_lookup_tb.assert_not_called() - # Two metadata calls for token refresh should have happened. - assert mock_metadata_get.call_count == 2 - - # Verify that an empty header was added. - headers_applied = {} - creds.apply(headers_applied) - assert headers_applied["x-allowed-locations"] == "" @mock.patch( "google.auth.compute_engine._metadata.get_service_account_info", autospec=True ) - @mock.patch( - "google.auth.compute_engine._metadata.get_universe_domain", autospec=True - ) - def test_build_trust_boundary_lookup_url_default_email( - self, mock_get_universe_domain, mock_get_service_account_info + def test_build_regional_access_boundary_lookup_url_default_email( + self, mock_get_service_account_info ): - # Test with default service account email, which needs resolution - creds = self.credentials - creds._service_account_email = "default" mock_get_service_account_info.return_value = { "email": "resolved-email@example.com" } - mock_get_universe_domain.return_value = "googleapis.com" - - url = creds._build_trust_boundary_lookup_url() + creds = self.credentials + creds._universe_domain_cached = True + url = creds._build_regional_access_boundary_lookup_url() mock_get_service_account_info.assert_called_once_with(mock.ANY, "default") - mock_get_universe_domain.assert_called_once_with(mock.ANY) - assert url == ( - "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/resolved-email@example.com/allowedLocations" - ) + expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/resolved-email@example.com/allowedLocations" + assert url == expected_url @mock.patch( "google.auth.compute_engine._metadata.get_service_account_info", autospec=True @@ -585,7 +317,7 @@ def test_build_trust_boundary_lookup_url_default_email( @mock.patch( "google.auth.compute_engine._metadata.get_universe_domain", autospec=True ) - def test_build_trust_boundary_lookup_url_explicit_email( + def test_build_regional_access_boundary_lookup_url_explicit_email( self, mock_get_universe_domain, mock_get_service_account_info ): # Test with an explicit service account email, no resolution needed @@ -593,58 +325,34 @@ def test_build_trust_boundary_lookup_url_explicit_email( creds._service_account_email = FAKE_SERVICE_ACCOUNT_EMAIL mock_get_universe_domain.return_value = "googleapis.com" - url = creds._build_trust_boundary_lookup_url() + url = creds._build_regional_access_boundary_lookup_url() mock_get_service_account_info.assert_not_called() - mock_get_universe_domain.assert_called_once_with(mock.ANY) assert url == ( "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/foo@bar.com/allowedLocations" ) - @mock.patch( - "google.auth.compute_engine._metadata.get_service_account_info", autospec=True - ) @mock.patch( "google.auth.compute_engine._metadata.get_universe_domain", autospec=True ) - def test_build_trust_boundary_lookup_url_non_default_universe( - self, mock_get_universe_domain, mock_get_service_account_info - ): - # Test with a non-default universe domain - creds = self.credentials_with_all_fields - - url = creds._build_trust_boundary_lookup_url() - - # Universe domain is cached and email is explicit, so no metadata calls needed. - mock_get_service_account_info.assert_not_called() - mock_get_universe_domain.assert_not_called() - assert url == ( - "https://iamcredentials.fake-universe-domain/v1/projects/-/serviceAccounts/foo@bar.com/allowedLocations" - ) - @mock.patch( "google.auth.compute_engine._metadata.get_service_account_info", autospec=True ) - def test_build_trust_boundary_lookup_url_get_service_account_info_fails( - self, mock_get_service_account_info + def test_build_regional_access_boundary_lookup_url_get_service_account_info_fails( + self, mock_get_service_account_info, mock_get_universe_domain ): - # Test scenario where get_service_account_info fails mock_get_service_account_info.side_effect = exceptions.TransportError( - "Failed to get info" + "Metadata server error" ) creds = self.credentials - creds._service_account_email = "default" - - with pytest.raises( - exceptions.RefreshError, - match=r"Failed to get service account email for trust boundary lookup: .*", - ): - creds._build_trust_boundary_lookup_url() + url = creds._build_regional_access_boundary_lookup_url() + assert url is None + mock_get_service_account_info.assert_called_once() @mock.patch( "google.auth.compute_engine._metadata.get_service_account_info", autospec=True ) - def test_build_trust_boundary_lookup_url_no_email( + def test_build_regional_access_boundary_lookup_url_no_email( self, mock_get_service_account_info ): # Test with default service account email, which needs resolution, but metadata @@ -653,10 +361,8 @@ def test_build_trust_boundary_lookup_url_no_email( creds._service_account_email = "default" mock_get_service_account_info.return_value = {"scopes": ["one", "two"]} - with pytest.raises(exceptions.RefreshError) as excinfo: - creds._build_trust_boundary_lookup_url() - - assert excinfo.match(r"missing 'email' field") + url = creds._build_regional_access_boundary_lookup_url() + assert url is None @mock.patch("google.auth.compute_engine._metadata.get") @mock.patch("google.auth._agent_identity_utils.get_agent_identity_certificate_path") diff --git a/packages/google-auth/tests/oauth2/test__client.py b/packages/google-auth/tests/oauth2/test__client.py index 6a01b1bacecb..ad510f903d55 100644 --- a/packages/google-auth/tests/oauth2/test__client.py +++ b/packages/google-auth/tests/oauth2/test__client.py @@ -632,7 +632,7 @@ def test__token_endpoint_request_no_throw_with_retry(can_retry): assert mock_request.call_count == 1 -def test_lookup_trust_boundary(): +def test_lookup_regional_access_boundary(): response_data = { "locations": ["us-central1", "us-east1"], "encodedLocations": "0x80080000000000", @@ -647,7 +647,9 @@ def test_lookup_trust_boundary(): url = "http://example.com" headers = {"Authorization": "Bearer access_token"} - response = _client._lookup_trust_boundary(mock_request, url, headers=headers) + response = _client._lookup_regional_access_boundary( + mock_request, url, headers=headers + ) assert response["encodedLocations"] == "0x80080000000000" assert response["locations"] == ["us-central1", "us-east1"] @@ -655,47 +657,7 @@ def test_lookup_trust_boundary(): mock_request.assert_called_once_with(method="GET", url=url, headers=headers) -def test_lookup_trust_boundary_no_op_response_without_locations(): - response_data = {"encodedLocations": "0x0"} - - mock_response = mock.create_autospec(transport.Response, instance=True) - mock_response.status = http_client.OK - mock_response.data = json.dumps(response_data).encode("utf-8") - - mock_request = mock.create_autospec(transport.Request) - mock_request.return_value = mock_response - - url = "http://example.com" - headers = {"Authorization": "Bearer access_token"} - # for the response to be valid, we only need encodedLocations to be present. - response = _client._lookup_trust_boundary(mock_request, url, headers=headers) - assert response["encodedLocations"] == "0x0" - assert "locations" not in response - - mock_request.assert_called_once_with(method="GET", url=url, headers=headers) - - -def test_lookup_trust_boundary_no_op_response(): - response_data = {"locations": [], "encodedLocations": "0x0"} - - mock_response = mock.create_autospec(transport.Response, instance=True) - mock_response.status = http_client.OK - mock_response.data = json.dumps(response_data).encode("utf-8") - - mock_request = mock.create_autospec(transport.Request) - mock_request.return_value = mock_response - - url = "http://example.com" - headers = {"Authorization": "Bearer access_token"} - response = _client._lookup_trust_boundary(mock_request, url, headers=headers) - - assert response["encodedLocations"] == "0x0" - assert response["locations"] == [] - - mock_request.assert_called_once_with(method="GET", url=url, headers=headers) - - -def test_lookup_trust_boundary_error(): +def test_lookup_regional_access_boundary_error(): mock_response = mock.create_autospec(transport.Response, instance=True) mock_response.status = http_client.INTERNAL_SERVER_ERROR mock_response.data = "this is an error message" @@ -705,33 +667,40 @@ def test_lookup_trust_boundary_error(): url = "http://example.com" headers = {"Authorization": "Bearer access_token"} - with pytest.raises(exceptions.RefreshError) as excinfo: - _client._lookup_trust_boundary(mock_request, url, headers=headers) - assert excinfo.match("this is an error message") + result = _client._lookup_regional_access_boundary( + mock_request, url, headers=headers + ) + assert result is None mock_request.assert_called_with(method="GET", url=url, headers=headers) -def test_lookup_trust_boundary_missing_encoded_locations(): - response_data = {"locations": [], "bad_field": "0x0"} - +@pytest.mark.parametrize( + "status_code", + [ + http_client.NOT_FOUND, + http_client.FORBIDDEN, + ], +) +def test_lookup_regional_access_boundary_non_retryable_error(status_code): mock_response = mock.create_autospec(transport.Response, instance=True) - mock_response.status = http_client.OK - mock_response.data = json.dumps(response_data).encode("utf-8") + mock_response.status = status_code + mock_response.data = "Error" mock_request = mock.create_autospec(transport.Request) mock_request.return_value = mock_response url = "http://example.com" headers = {"Authorization": "Bearer access_token"} - with pytest.raises(exceptions.MalformedError) as excinfo: - _client._lookup_trust_boundary(mock_request, url, headers=headers) - assert excinfo.match("Invalid trust boundary info") - + result = _client._lookup_regional_access_boundary( + mock_request, url, headers=headers + ) + assert result is None + # Non-retryable errors should only be called once. mock_request.assert_called_once_with(method="GET", url=url, headers=headers) -def test_lookup_trust_boundary_internal_failure_and_retry_failure_error(): +def test_lookup_regional_access_boundary_internal_failure_and_retry_failure_error(): retryable_error = mock.create_autospec(transport.Response, instance=True) retryable_error.status = http_client.BAD_REQUEST retryable_error.data = json.dumps({"error_description": "internal_failure"}).encode( @@ -749,10 +718,10 @@ def test_lookup_trust_boundary_internal_failure_and_retry_failure_error(): request.side_effect = [retryable_error, retryable_error, unretryable_error] headers = {"Authorization": "Bearer access_token"} - with pytest.raises(exceptions.RefreshError): - _client._lookup_trust_boundary_request( - request, "http://example.com", headers=headers - ) + result = _client._lookup_regional_access_boundary_request( + request, "http://example.com", headers=headers + ) + assert result is None # request should be called three times. Two retryable errors and one # unretryable error to break the retry loop. assert request.call_count == 3 @@ -760,14 +729,17 @@ def test_lookup_trust_boundary_internal_failure_and_retry_failure_error(): assert call[1]["headers"] == headers -def test_lookup_trust_boundary_internal_failure_and_retry_succeeds(): +def test_lookup_regional_access_boundary_internal_failure_and_retry_succeeds(): retryable_error = mock.create_autospec(transport.Response, instance=True) retryable_error.status = http_client.BAD_REQUEST retryable_error.data = json.dumps({"error_description": "internal_failure"}).encode( "utf-8" ) - response_data = {"locations": [], "encodedLocations": "0x0"} + response_data = { + "locations": ["us-central1", "us-east1"], + "encodedLocations": "0xVALIDHEX", + } response = mock.create_autospec(transport.Response, instance=True) response.status = http_client.OK response.data = json.dumps(response_data).encode("utf-8") @@ -777,7 +749,7 @@ def test_lookup_trust_boundary_internal_failure_and_retry_succeeds(): headers = {"Authorization": "Bearer access_token"} request.side_effect = [retryable_error, response] - _ = _client._lookup_trust_boundary_request( + _ = _client._lookup_regional_access_boundary_request( request, "http://example.com", headers=headers ) @@ -786,7 +758,7 @@ def test_lookup_trust_boundary_internal_failure_and_retry_succeeds(): assert call[1]["headers"] == headers -def test_lookup_trust_boundary_with_headers(): +def test_lookup_regional_access_boundary_with_headers(): response_data = { "locations": ["us-central1", "us-east1"], "encodedLocations": "0x80080000000000", @@ -800,7 +772,9 @@ def test_lookup_trust_boundary_with_headers(): mock_request.return_value = mock_response headers = {"Authorization": "Bearer access_token", "x-test-header": "test-value"} - _client._lookup_trust_boundary(mock_request, "http://example.com", headers=headers) + _client._lookup_regional_access_boundary( + mock_request, "http://example.com", headers=headers + ) mock_request.assert_called_once_with( method="GET", url="http://example.com", headers=headers diff --git a/packages/google-auth/tests/oauth2/test_service_account.py b/packages/google-auth/tests/oauth2/test_service_account.py index 788f52263bce..f0d8f0759e50 100644 --- a/packages/google-auth/tests/oauth2/test_service_account.py +++ b/packages/google-auth/tests/oauth2/test_service_account.py @@ -20,9 +20,7 @@ import pytest # type: ignore from google.auth import _helpers -from google.auth import credentials from google.auth import crypt -from google.auth import environment_vars from google.auth import exceptions from google.auth import iam from google.auth import jwt @@ -60,10 +58,6 @@ class TestCredentials(object): SERVICE_ACCOUNT_EMAIL = "service-account@example.com" TOKEN_URI = "https://example.com/oauth2/token" - NO_OP_TRUST_BOUNDARY = { - "locations": credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS, - "encodedLocations": credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS, - } VALID_TRUST_BOUNDARY = { "locations": ["us-central1", "us-east1"], "encodedLocations": "0xVALIDHEXSA", @@ -77,14 +71,12 @@ class TestCredentials(object): def make_credentials( cls, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, # Align with Credentials class default ): return service_account.Credentials( SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI, universe_domain=universe_domain, - trust_boundary=trust_boundary, ) def test_get_cred_info(self): @@ -236,6 +228,16 @@ def test_with_quota_project(self): new_credentials.apply(hdrs, token="tok") assert "x-goog-user-project" in hdrs + def test_build_regional_access_boundary_lookup_url(self): + credentials = self.make_credentials() + expected_url = ( + "https://iamcredentials.googleapis.com/v1/projects/-/" + "serviceAccounts/{}/allowedLocations".format( + credentials.service_account_email + ) + ) + assert credentials._build_regional_access_boundary_lookup_url() == expected_url + def test_with_token_uri(self): credentials = self.make_credentials() new_token_uri = "https://example2.com/oauth2/token" @@ -270,18 +272,6 @@ def test__with_always_use_jwt_access_non_default_universe_domain(self): "always_use_jwt_access should be True for non-default universe domain" ) - def test_with_trust_boundary(self): - credentials = self.make_credentials() - new_boundary = {"encodedLocations": "new_boundary"} - new_credentials = credentials.with_trust_boundary(new_boundary) - - assert new_credentials is not credentials - assert new_credentials._trust_boundary == new_boundary - assert new_credentials._signer == credentials._signer - assert ( - new_credentials.service_account_email == credentials.service_account_email - ) - def test__make_authorization_grant_assertion(self): credentials = self.make_credentials() token = credentials._make_authorization_grant_assertion() @@ -529,39 +519,6 @@ def test_refresh_success(self, jwt_grant): # Check that the credentials are valid (have a token and are not expired). assert credentials.valid - # Trust boundary should be None since env var is not set and no initial - # boundary was provided. - assert credentials._trust_boundary is None - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - @mock.patch("google.oauth2._client.jwt_grant", autospec=True) - def test_refresh_skips_trust_boundary_lookup_non_default_universe( - self, mock_jwt_grant, mock_lookup_trust_boundary - ): - # Create credentials with a non-default universe domain - credentials = self.make_credentials(universe_domain=FAKE_UNIVERSE_DOMAIN) - token = "token" - mock_jwt_grant.return_value = ( - token, - _helpers.utcnow() + datetime.timedelta(seconds=500), - {}, - ) - request = mock.create_autospec(transport.Request, instance=True) - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - # Ensure jwt_grant was called (token refresh happened) - mock_jwt_grant.assert_called_once() - # Ensure trust boundary lookup was not called - mock_lookup_trust_boundary.assert_not_called() - # Verify that x-allowed-locations header is not set by apply() - headers_applied = {} - credentials.apply(headers_applied) - assert "x-allowed-locations" not in headers_applied - @mock.patch("google.oauth2._client.jwt_grant", autospec=True) def test_before_request_refreshes(self, jwt_grant): credentials = self.make_credentials() @@ -670,208 +627,6 @@ def test_refresh_non_gdu_domain_wide_delegation_not_supported(self): credentials.refresh(None) assert excinfo.match("domain wide delegation is not supported") - @mock.patch("google.oauth2._client._lookup_trust_boundary") - @mock.patch("google.oauth2._client.jwt_grant", autospec=True) - def test_refresh_success_with_valid_trust_boundary( - self, mock_jwt_grant, mock_lookup_trust_boundary - ): - # Start with no boundary. - credentials = self.make_credentials(trust_boundary=None) - token = "token" - mock_jwt_grant.return_value = ( - token, - _helpers.utcnow() + datetime.timedelta(seconds=500), - {}, - ) - request = mock.create_autospec(transport.Request, instance=True) - - # Mock the trust boundary lookup to return a valid boundary. - mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - assert credentials.valid - assert credentials.token == token - - # Verify trust boundary was set. - assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY - - # Verify the mock was called with the correct URL. - mock_lookup_trust_boundary.assert_called_once_with( - request, - self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, - headers={"authorization": "Bearer token"}, - ) - - # Verify x-allowed-locations header is set correctly by apply(). - headers_applied = {} - credentials.apply(headers_applied) - assert ( - headers_applied["x-allowed-locations"] - == self.VALID_TRUST_BOUNDARY["encodedLocations"] - ) - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - @mock.patch("google.oauth2._client.jwt_grant", autospec=True) - def test_refresh_fetches_no_op_trust_boundary( - self, mock_jwt_grant, mock_lookup_trust_boundary - ): - # Start with no trust boundary - credentials = self.make_credentials(trust_boundary=None) - token = "token" - mock_jwt_grant.return_value = ( - token, - _helpers.utcnow() + datetime.timedelta(seconds=500), - {}, - ) - request = mock.create_autospec(transport.Request, instance=True) - - mock_lookup_trust_boundary.return_value = self.NO_OP_TRUST_BOUNDARY - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - assert credentials.valid - assert credentials.token == token - assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY - mock_lookup_trust_boundary.assert_called_once_with( - request, - self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, - headers={"authorization": "Bearer token"}, - ) - headers_applied = {} - credentials.apply(headers_applied) - assert headers_applied["x-allowed-locations"] == "" - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - @mock.patch("google.oauth2._client.jwt_grant", autospec=True) - def test_refresh_starts_with_no_op_trust_boundary_skips_lookup( - self, mock_jwt_grant, mock_lookup_trust_boundary - ): - credentials = self.make_credentials( - trust_boundary=self.NO_OP_TRUST_BOUNDARY - ) # Start with NO_OP - token = "token" - mock_jwt_grant.return_value = ( - token, - _helpers.utcnow() + datetime.timedelta(seconds=500), - {}, - ) - request = mock.create_autospec(transport.Request, instance=True) - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - assert credentials.valid - assert credentials.token == token - # Verify trust boundary remained NO_OP - assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY - - # Lookup should be skipped - mock_lookup_trust_boundary.assert_not_called() - - # Verify that an empty header was added. - headers_applied = {} - credentials.apply(headers_applied) - assert headers_applied["x-allowed-locations"] == "" - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - @mock.patch("google.oauth2._client.jwt_grant", autospec=True) - def test_refresh_trust_boundary_lookup_fails_no_cache( - self, mock_jwt_grant, mock_lookup_trust_boundary - ): - # Start with no trust boundary - credentials = self.make_credentials(trust_boundary=None) - mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( - "Lookup failed" - ) - mock_jwt_grant.return_value = ( - "mock_access_token", - _helpers.utcnow() + datetime.timedelta(seconds=3600), - {}, - ) - request = mock.create_autospec(transport.Request, instance=True) - - # Mock the trust boundary lookup to raise an error - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ), pytest.raises(exceptions.RefreshError, match="Lookup failed"): - credentials.refresh(request) - - assert credentials._trust_boundary is None - mock_lookup_trust_boundary.assert_called_once() - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - @mock.patch("google.oauth2._client.jwt_grant", autospec=True) - def test_refresh_trust_boundary_lookup_fails_with_cached_data( - self, mock_jwt_grant, mock_lookup_trust_boundary - ): - # Initial setup: Credentials with no trust boundary. - credentials = self.make_credentials(trust_boundary=None) - token = "token" - mock_jwt_grant.return_value = ( - token, - _helpers.utcnow() + datetime.timedelta(seconds=500), - {}, - ) - request = mock.create_autospec(transport.Request, instance=True) - - # First refresh: Successfully fetch a valid trust boundary. - mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - assert credentials.valid - assert credentials.token == token - assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY - mock_lookup_trust_boundary.assert_called_once_with( - request, - self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, - headers={"authorization": "Bearer token"}, - ) - - # Second refresh: Mock lookup to fail, but expect cached data to be preserved. - mock_lookup_trust_boundary.reset_mock() - mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( - "Lookup failed" - ) - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) # This should NOT raise an exception - - assert credentials.valid # Credentials should still be valid - assert ( - credentials._trust_boundary == self.VALID_TRUST_BOUNDARY - ) # Cached data should be preserved - mock_lookup_trust_boundary.assert_called_once_with( - request, - self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, - headers={ - "authorization": "Bearer token", - "x-allowed-locations": self.VALID_TRUST_BOUNDARY["encodedLocations"], - }, - ) # Lookup should have been attempted again - - def test_build_trust_boundary_lookup_url_no_email(self): - credentials = self.make_credentials() - credentials._service_account_email = None - - with pytest.raises(ValueError) as excinfo: - credentials._build_trust_boundary_lookup_url() - - assert "Service account email is required" in str(excinfo.value) - class TestIDTokenCredentials(object): SERVICE_ACCOUNT_EMAIL = "service-account@example.com" diff --git a/packages/google-auth/tests/test__helpers.py b/packages/google-auth/tests/test__helpers.py index 4d12abd86c67..e2d8e7b20c03 100644 --- a/packages/google-auth/tests/test__helpers.py +++ b/packages/google-auth/tests/test__helpers.py @@ -20,7 +20,7 @@ import pytest # type: ignore -from google.auth import _helpers, exceptions +from google.auth import _helpers # _MOCK_BASE_LOGGER_NAME is the base logger namespace used for testing. _MOCK_BASE_LOGGER_NAME = "foogle" @@ -234,33 +234,6 @@ def test_unpadded_urlsafe_b64encode(): assert _helpers.unpadded_urlsafe_b64encode(case) == expected -def test_get_bool_from_env(monkeypatch): - # Test default value when environment variable is not set. - assert _helpers.get_bool_from_env("TEST_VAR") is False - assert _helpers.get_bool_from_env("TEST_VAR", default=True) is True - - # Test true values (case-insensitive) - for true_value in ("true", "True", "TRUE", "1"): - monkeypatch.setenv("TEST_VAR", true_value) - assert _helpers.get_bool_from_env("TEST_VAR") is True - - # Test false values (case-insensitive) - for false_value in ("false", "False", "FALSE", "0"): - monkeypatch.setenv("TEST_VAR", false_value) - assert _helpers.get_bool_from_env("TEST_VAR") is False - - # Test invalid value - monkeypatch.setenv("TEST_VAR", "invalid_value") - with pytest.raises(exceptions.InvalidValue) as excinfo: - _helpers.get_bool_from_env("TEST_VAR") - assert 'must be one of "true", "false", "1", or "0"' in str(excinfo.value) - - # Test empty string value - monkeypatch.setenv("TEST_VAR", "") - with pytest.raises(exceptions.InvalidValue): - _helpers.get_bool_from_env("TEST_VAR") - - def test_hash_sensitive_info_basic(): test_data = { "expires_in": 3599, diff --git a/packages/google-auth/tests/test__regional_access_boundary_utils.py b/packages/google-auth/tests/test__regional_access_boundary_utils.py new file mode 100644 index 000000000000..37194136ae24 --- /dev/null +++ b/packages/google-auth/tests/test__regional_access_boundary_utils.py @@ -0,0 +1,326 @@ +# Copyright 2026 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import os +from unittest import mock + +import pytest # type: ignore + +from google.auth import _helpers +from google.auth import _regional_access_boundary_utils +from google.auth import credentials +from google.auth import environment_vars +from google.auth import exceptions + + +class CredentialsImpl(credentials.CredentialsWithRegionalAccessBoundary): + def __init__(self, universe_domain=None): + super(CredentialsImpl, self).__init__() + if universe_domain: + self._universe_domain = universe_domain + + def _perform_refresh_token(self, request): + self.token = "refreshed-token" + self.expiry = ( + datetime.datetime.utcnow() + + _helpers.REFRESH_THRESHOLD + + datetime.timedelta(seconds=5) + ) + + def with_quota_project(self, quota_project_id): + raise NotImplementedError() + + def _build_regional_access_boundary_lookup_url(self): + # Using self.token here to make the URL dynamic for testing purposes + return "http://mock.url/lookup_for_{}".format(self.token) + + def _make_copy(self): + new_credentials = self.__class__() + self._copy_regional_access_boundary_manager(new_credentials) + return new_credentials + + +@pytest.fixture(autouse=True) +def clear_rab_cache(): + """Clears the Regional Access Boundary enablement cache before every test.""" + _regional_access_boundary_utils.is_regional_access_boundary_enabled.cache_clear() + + +class TestCredentialsWithRegionalAccessBoundary(object): + def test_is_regional_access_boundary_enabled_cached(self, monkeypatch): + # Set to true + monkeypatch.setenv( + environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, "true" + ) + assert _regional_access_boundary_utils.is_regional_access_boundary_enabled() is True + + # Change env var to false, but it should still return True due to caching + monkeypatch.setenv( + environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, "false" + ) + assert _regional_access_boundary_utils.is_regional_access_boundary_enabled() is True + + # Clear cache and it should now reflect the new value + _regional_access_boundary_utils.is_regional_access_boundary_enabled.cache_clear() + assert _regional_access_boundary_utils.is_regional_access_boundary_enabled() is False + + @mock.patch( + "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" + ) + def test_maybe_start_refresh_is_skipped_if_env_var_not_set( + self, mock_start_refresh + ): + creds = CredentialsImpl() + with mock.patch.dict(os.environ, clear=True): + creds._maybe_start_regional_access_boundary_refresh( + mock.Mock(), "http://example.com" + ) + mock_start_refresh.assert_not_called() + + @mock.patch( + "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" + ) + def test_maybe_start_refresh_is_skipped_if_not_expired(self, mock_start_refresh): + creds = CredentialsImpl() + creds._rab_manager._data = _regional_access_boundary_utils._RegionalAccessBoundaryData( + encoded_locations="0xABC", + expiry=_helpers.utcnow() + datetime.timedelta(hours=2), + cooldown_expiry=None, + cooldown_duration=_regional_access_boundary_utils.DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN, + ) + with mock.patch.dict( + os.environ, + {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, + ): + creds._maybe_start_regional_access_boundary_refresh( + mock.Mock(), "http://example.com" + ) + mock_start_refresh.assert_not_called() + + @mock.patch( + "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" + ) + def test_maybe_start_refresh_triggered_if_soft_expired(self, mock_start_refresh): + creds = CredentialsImpl() + creds._rab_manager._data = _regional_access_boundary_utils._RegionalAccessBoundaryData( + encoded_locations="0xABC", + expiry=_helpers.utcnow() + datetime.timedelta(minutes=30), + cooldown_expiry=None, + cooldown_duration=_regional_access_boundary_utils.DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN, + ) + request = mock.Mock() + with mock.patch.dict( + os.environ, + {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, + ): + creds._maybe_start_regional_access_boundary_refresh( + request, "http://example.com" + ) + mock_start_refresh.assert_called_once_with(creds, request, creds._rab_manager) + + @mock.patch( + "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" + ) + def test_maybe_start_refresh_is_skipped_if_cooldown_active( + self, mock_start_refresh + ): + creds = CredentialsImpl() + creds._rab_manager._data = _regional_access_boundary_utils._RegionalAccessBoundaryData( + encoded_locations=None, + expiry=None, + cooldown_expiry=_helpers.utcnow() + datetime.timedelta(minutes=5), + cooldown_duration=_regional_access_boundary_utils.DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN, + ) + with mock.patch.dict( + os.environ, + {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, + ): + creds._maybe_start_regional_access_boundary_refresh( + mock.Mock(), "http://example.com" + ) + mock_start_refresh.assert_not_called() + + @mock.patch( + "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" + ) + def test_maybe_start_refresh_is_skipped_for_regional_endpoint( + self, mock_start_refresh + ): + creds = CredentialsImpl() + with mock.patch.dict( + os.environ, + {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, + ): + creds._maybe_start_regional_access_boundary_refresh( + mock.Mock(), "https://my-service.us-east1.rep.googleapis.com" + ) + mock_start_refresh.assert_not_called() + + @mock.patch( + "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" + ) + def test_maybe_start_refresh_is_triggered(self, mock_start_refresh): + creds = CredentialsImpl() + request = mock.Mock() + with mock.patch.dict( + os.environ, + {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, + ): + creds._maybe_start_regional_access_boundary_refresh( + request, "http://example.com" + ) + mock_start_refresh.assert_called_once_with(creds, request, creds._rab_manager) + + def test_apply_headers(self): + creds = CredentialsImpl() + creds._rab_manager._data = _regional_access_boundary_utils._RegionalAccessBoundaryData( + encoded_locations="0xABC", + expiry=_helpers.utcnow() + datetime.timedelta(hours=1), + cooldown_expiry=None, + cooldown_duration=_regional_access_boundary_utils.DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN, + ) + headers = {} + creds._rab_manager.apply_headers(headers) + assert headers == {"x-allowed-locations": "0xABC"} + + creds._rab_manager._data = creds._rab_manager._data._replace( + encoded_locations=None + ) + headers = {"x-allowed-locations": "0xABC"} + creds._rab_manager.apply_headers(headers) + assert headers == {} + + def test_copy_regional_access_boundary_state(self): + source_creds = CredentialsImpl() + snapshot = _regional_access_boundary_utils._RegionalAccessBoundaryData( + encoded_locations="0xABC", + expiry=_helpers.utcnow(), + cooldown_expiry=_helpers.utcnow(), + cooldown_duration=_regional_access_boundary_utils.DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN, + ) + source_creds._rab_manager._data = snapshot + + target_creds = CredentialsImpl() + source_creds._copy_regional_access_boundary_manager(target_creds) + + assert target_creds._rab_manager is source_creds._rab_manager + assert ( + target_creds._rab_manager._data.encoded_locations + == snapshot.encoded_locations + ) + assert target_creds._rab_manager._data.expiry == snapshot.expiry + assert ( + target_creds._rab_manager._data.cooldown_expiry == snapshot.cooldown_expiry + ) + + @mock.patch( + "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" + ) + def test_maybe_start_refresh_is_skipped_if_non_default_universe_domain( + self, mock_start_refresh + ): + creds = CredentialsImpl(universe_domain="not.googleapis.com") + with mock.patch.dict( + os.environ, + {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, + ): + creds._maybe_start_regional_access_boundary_refresh( + mock.Mock(), "http://example.com" + ) + mock_start_refresh.assert_not_called() + + @mock.patch( + "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" + ) + @mock.patch("urllib.parse.urlparse") + def test_maybe_start_refresh_handles_url_parse_errors( + self, mock_urlparse, mock_start_refresh + ): + from urllib.parse import urlparse + mock_urlparse.side_effect = ValueError("Malformed URL") + creds = CredentialsImpl() + request = mock.Mock() + with mock.patch.dict( + os.environ, + {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}, + ): + creds._maybe_start_regional_access_boundary_refresh( + request, "http://malformed-url" + ) + mock_start_refresh.assert_called_once_with(creds, request, creds._rab_manager) + + @mock.patch("google.oauth2._client._lookup_regional_access_boundary") + @mock.patch.object(CredentialsImpl, "_build_regional_access_boundary_lookup_url") + def test_lookup_regional_access_boundary_success( + self, mock_build_url, mock_lookup_rab + ): + creds = CredentialsImpl() + creds.token = "token" + request = mock.Mock() + mock_build_url.return_value = "http://rab.example.com" + mock_lookup_rab.return_value = {"encodedLocations": "success"} + + result = creds._lookup_regional_access_boundary(request) + + mock_build_url.assert_called_once() + mock_lookup_rab.assert_called_once_with( + request, "http://rab.example.com", headers={"authorization": "Bearer token"} + ) + assert result == {"encodedLocations": "success"} + + @mock.patch("google.oauth2._client._lookup_regional_access_boundary") + @mock.patch.object(CredentialsImpl, "_build_regional_access_boundary_lookup_url") + def test_lookup_regional_access_boundary_failure( + self, mock_build_url, mock_lookup_rab + ): + creds = CredentialsImpl() + creds.token = "token" + request = mock.Mock() + mock_build_url.return_value = "http://rab.example.com" + mock_lookup_rab.return_value = None + + result = creds._lookup_regional_access_boundary(request) + + mock_build_url.assert_called_once() + mock_lookup_rab.assert_called_once_with( + request, "http://rab.example.com", headers={"authorization": "Bearer token"} + ) + assert result is None + + @mock.patch("google.oauth2._client._lookup_regional_access_boundary") + @mock.patch.object(CredentialsImpl, "_build_regional_access_boundary_lookup_url") + def test_lookup_regional_access_boundary_null_url( + self, mock_build_url, mock_lookup_rab + ): + creds = CredentialsImpl() + creds.token = "token" + request = mock.Mock() + mock_build_url.return_value = None + + result = creds._lookup_regional_access_boundary(request) + + mock_build_url.assert_called_once() + mock_lookup_rab.assert_not_called() + assert result is None + + def test_credentials_with_regional_access_boundary_initialization(self): + creds = CredentialsImpl() + assert creds._rab_manager._data.encoded_locations is None + assert creds._rab_manager._data.expiry is None + assert creds._rab_manager._data.cooldown_expiry is None + assert creds._rab_manager._data.cooldown_duration == ( + credentials._regional_access_boundary_utils.DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN + ) + assert creds._rab_manager._update_lock is not None diff --git a/packages/google-auth/tests/test_aws.py b/packages/google-auth/tests/test_aws.py index b6b1ca2319ed..477847525894 100644 --- a/packages/google-auth/tests/test_aws.py +++ b/packages/google-auth/tests/test_aws.py @@ -973,7 +973,6 @@ def test_from_info_full_options(self, mock_init): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(aws.Credentials, "__init__", return_value=None) @@ -1003,7 +1002,6 @@ def test_from_info_required_options_only(self, mock_init): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(aws.Credentials, "__init__", return_value=None) @@ -1035,7 +1033,6 @@ def test_from_info_supplier(self, mock_init): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(aws.Credentials, "__init__", return_value=None) @@ -1073,7 +1070,6 @@ def test_from_file_full_options(self, mock_init, tmpdir): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(aws.Credentials, "__init__", return_value=None) @@ -1104,7 +1100,6 @@ def test_from_file_required_options_only(self, mock_init, tmpdir): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) def test_constructor_invalid_credential_source(self): diff --git a/packages/google-auth/tests/test_credentials.py b/packages/google-auth/tests/test_credentials.py index 0d64f0e0ad2b..0cc29e4a66fc 100644 --- a/packages/google-auth/tests/test_credentials.py +++ b/packages/google-auth/tests/test_credentials.py @@ -23,10 +23,14 @@ from google.auth import credentials from google.auth import environment_vars from google.auth import exceptions -from google.oauth2 import _client -class CredentialsImpl(credentials.CredentialsWithTrustBoundary): +class CredentialsImpl(credentials.CredentialsWithRegionalAccessBoundary): + def __init__(self, universe_domain=None): + super(CredentialsImpl, self).__init__() + if universe_domain: + self._universe_domain = universe_domain + def _perform_refresh_token(self, request): self.token = "refreshed-token" self.expiry = ( @@ -38,10 +42,15 @@ def _perform_refresh_token(self, request): def with_quota_project(self, quota_project_id): raise NotImplementedError() - def _build_trust_boundary_lookup_url(self): + def _build_regional_access_boundary_lookup_url(self): # Using self.token here to make the URL dynamic for testing purposes return "http://mock.url/lookup_for_{}".format(self.token) + def _make_copy(self): + new_credentials = self.__class__() + self._copy_regional_access_boundary_manager(new_credentials) + return new_credentials + class CredentialsImplWithMetrics(credentials.Credentials): def refresh(self, request): @@ -119,17 +128,22 @@ def test_before_request(): assert "x-allowed-locations" not in headers -def test_before_request_with_trust_boundary(): +def test_before_request_with_regional_access_boundary(): DUMMY_BOUNDARY = "0xA30" - credentials = CredentialsImpl() - credentials._trust_boundary = {"locations": [], "encodedLocations": DUMMY_BOUNDARY} + creds = CredentialsImpl() + creds._rab_manager._data = credentials._regional_access_boundary_utils._RegionalAccessBoundaryData( + encoded_locations=DUMMY_BOUNDARY, + expiry=_helpers.utcnow() + datetime.timedelta(hours=1), + cooldown_expiry=None, + cooldown_duration=credentials._regional_access_boundary_utils.DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN, + ) request = mock.Mock() headers = {} # First call should call refresh, setting the token. - credentials.before_request(request, "http://example.com", "GET", headers) - assert credentials.valid - assert credentials.token == "refreshed-token" + creds.before_request(request, "http://example.com", "GET", headers) + assert creds.valid + assert creds.token == "refreshed-token" assert headers["authorization"] == "Bearer refreshed-token" assert headers["x-allowed-locations"] == DUMMY_BOUNDARY @@ -137,7 +151,7 @@ def test_before_request_with_trust_boundary(): headers = {} # Second call shouldn't call refresh. - credentials.before_request(request, "http://example.com", "GET", headers) + creds.before_request(request, "http://example.com", "GET", headers) assert credentials.valid assert credentials.token == "refreshed-token" assert headers["authorization"] == "Bearer refreshed-token" @@ -366,113 +380,3 @@ def test_token_state_no_expiry(): c.before_request(request, "http://example.com", "GET", {}) -class TestCredentialsWithTrustBoundary(object): - @mock.patch.object(_client, "_lookup_trust_boundary") - def test_lookup_trust_boundary_env_var_not_true(self, mock_lookup_tb): - creds = CredentialsImpl() - request = mock.Mock() - - # Ensure env var is not "true" - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "false"} - ): - result = creds._refresh_trust_boundary(request) - - assert result is None - mock_lookup_tb.assert_not_called() - - @mock.patch.object(_client, "_lookup_trust_boundary") - def test_lookup_trust_boundary_env_var_missing(self, mock_lookup_tb): - creds = CredentialsImpl() - request = mock.Mock() - - # Ensure env var is missing - with mock.patch.dict(os.environ, clear=True): - result = creds._refresh_trust_boundary(request) - - assert result is None - mock_lookup_tb.assert_not_called() - - @mock.patch.object(_client, "_lookup_trust_boundary") - def test_lookup_trust_boundary_non_default_universe(self, mock_lookup_tb): - creds = CredentialsImpl() - creds._universe_domain = "my.universe.com" # Non-GDU - request = mock.Mock() - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - result = creds._refresh_trust_boundary(request) - - assert result is None - mock_lookup_tb.assert_not_called() - - @mock.patch.object(_client, "_lookup_trust_boundary") - def test_lookup_trust_boundary_calls_client_and_build_url(self, mock_lookup_tb): - creds = CredentialsImpl() - creds.token = "test_token" # For _build_trust_boundary_lookup_url - request = mock.Mock() - expected_url = "http://mock.url/lookup_for_test_token" - expected_boundary_info = {"encodedLocations": "0xABC"} - mock_lookup_tb.return_value = expected_boundary_info - - # Mock _build_trust_boundary_lookup_url to ensure it's called. - mock_build_url = mock.Mock(return_value=expected_url) - creds._build_trust_boundary_lookup_url = mock_build_url - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - result = creds._lookup_trust_boundary(request) - - assert result == expected_boundary_info - mock_build_url.assert_called_once() - expected_headers = {"authorization": "Bearer test_token"} - mock_lookup_tb.assert_called_once_with( - request, expected_url, headers=expected_headers - ) - - @mock.patch.object(_client, "_lookup_trust_boundary") - def test_lookup_trust_boundary_build_url_returns_none(self, mock_lookup_tb): - creds = CredentialsImpl() - request = mock.Mock() - - # Mock _build_trust_boundary_lookup_url to return None - mock_build_url = mock.Mock(return_value=None) - creds._build_trust_boundary_lookup_url = mock_build_url - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - with pytest.raises( - exceptions.InvalidValue, - match="Failed to build trust boundary lookup URL.", - ): - creds._lookup_trust_boundary(request) - - mock_build_url.assert_called_once() # Ensure _build_trust_boundary_lookup_url was called - mock_lookup_tb.assert_not_called() # Ensure _client.lookup_trust_boundary was not called - - @mock.patch("google.auth.credentials._LOGGER") - @mock.patch("google.auth._helpers.is_logging_enabled", return_value=True) - @mock.patch.object(_client, "_lookup_trust_boundary") - def test_refresh_trust_boundary_fails_with_cached_data_and_logging( - self, mock_lookup_tb, mock_is_logging_enabled, mock_logger - ): - creds = CredentialsImpl() - creds._trust_boundary = {"encodedLocations": "0xABC"} - request = mock.Mock() - - refresh_error = exceptions.RefreshError("Lookup failed") - mock_lookup_tb.side_effect = refresh_error - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - creds.refresh(request) - - mock_lookup_tb.assert_called_once() - mock_is_logging_enabled.assert_called_once_with(mock_logger) - mock_logger.debug.assert_called_once_with( - "Using cached trust boundary due to refresh error: %s", refresh_error - ) diff --git a/packages/google-auth/tests/test_external_account.py b/packages/google-auth/tests/test_external_account.py index e50a1b8c0c5c..50d3e4dd36e0 100644 --- a/packages/google-auth/tests/test_external_account.py +++ b/packages/google-auth/tests/test_external_account.py @@ -15,14 +15,12 @@ import datetime import http.client as http_client import json -import os from unittest import mock import urllib import pytest # type: ignore from google.auth import _helpers -from google.auth import environment_vars from google.auth import exceptions from google.auth import external_account from google.auth import transport @@ -128,11 +126,6 @@ class TestCredentials(object): "status": "INVALID_ARGUMENT", } } - NO_OP_TRUST_BOUNDARY = {"locations": [], "encodedLocations": "0x0"} - VALID_TRUST_BOUNDARY = { - "locations": ["us-central1", "us-east1"], - "encodedLocations": "0xVALIDHEXSA", - } PROJECT_ID = "my-proj-id" CLOUD_RESOURCE_MANAGER_URL = ( "https://cloudresourcemanager.googleapis.com/v1/projects/" @@ -158,7 +151,6 @@ def make_credentials( service_account_impersonation_url=None, service_account_impersonation_options={}, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ): return CredentialsImpl( audience=cls.AUDIENCE, @@ -174,7 +166,6 @@ def make_credentials( scopes=scopes, default_scopes=default_scopes, universe_domain=universe_domain, - trust_boundary=trust_boundary, ) @classmethod @@ -187,7 +178,6 @@ def make_workforce_pool_credentials( default_scopes=None, service_account_impersonation_url=None, workforce_pool_user_project=None, - trust_boundary=None, ): return CredentialsImpl( audience=cls.WORKFORCE_AUDIENCE, @@ -201,7 +191,6 @@ def make_workforce_pool_credentials( scopes=scopes, default_scopes=default_scopes, workforce_pool_user_project=workforce_pool_user_project, - trust_boundary=trust_boundary, ) @classmethod @@ -719,24 +708,6 @@ def test_refresh_without_client_auth_success( assert not credentials.expired assert credentials.token == response["access_token"] - @mock.patch("google.auth.external_account.Credentials._lookup_trust_boundary") - def test_refresh_skips_trust_boundary_lookup_when_disabled( - self, mock_lookup_trust_boundary - ): - credentials = self.make_credentials() - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - - credentials.refresh(request) - - assert credentials.valid - assert credentials.token == self.SUCCESS_RESPONSE["access_token"] - mock_lookup_trust_boundary.assert_not_called() - headers_applied = {} - credentials.apply(headers_applied) - assert "x-allowed-locations" not in headers_applied - def test_perform_refresh_token_with_cert_fingerprint(self): credentials = self.make_credentials() credentials._sts_client = mock.MagicMock() @@ -755,240 +726,6 @@ def test_perform_refresh_token_with_cert_fingerprint(self): _, kwargs = credentials._sts_client.exchange_token.call_args assert kwargs["additional_options"]["bindCertFingerprint"] == "my-fingerprint" - def test_refresh_skips_sending_allowed_locations_header_with_trust_boundary(self): - # This test verifies that the x-allowed-locations header is not sent with - # the STS request even if a trust boundary is cached. - trust_boundary_value = {"encodedLocations": "0x12345"} - headers = { - "Content-Type": "application/x-www-form-urlencoded", - "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false", - } - request_data = { - "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", - "audience": self.AUDIENCE, - "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", - "subject_token": "subject_token_0", - "subject_token_type": self.SUBJECT_TOKEN_TYPE, - } - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - credentials = self.make_credentials() - # Set a cached trust boundary. - credentials._trust_boundary = trust_boundary_value - - with mock.patch( - "google.auth.metrics.python_and_auth_lib_version", - return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, - ): - credentials.refresh(request) - - self.assert_token_request_kwargs(request.call_args[1], headers, request_data) - - def test_refresh_on_impersonated_credential_skips_parent_trust_boundary_lookup( - self, - ): - # This test verifies that the top-level impersonating credential - # does not perform a trust boundary lookup. - request = self.make_mock_request( - status=http_client.OK, - data=self.SUCCESS_RESPONSE, - impersonation_status=http_client.OK, - impersonation_data={ - "accessToken": "SA_ACCESS_TOKEN", - "expireTime": "2025-01-01T00:00:00Z", - }, - ) - credentials = self.make_credentials( - service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL - ) - - with mock.patch.object( - credentials, "_refresh_trust_boundary", autospec=True - ) as mock_refresh_trust_boundary: - credentials.refresh(request) - - mock_refresh_trust_boundary.assert_not_called() - - def test_refresh_fetches_no_op_trust_boundary(self): - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - credentials = self.make_credentials() - - with mock.patch.object( - credentials, - "_lookup_trust_boundary", - return_value=self.NO_OP_TRUST_BOUNDARY, - ) as mock_lookup, mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - mock_lookup.assert_called_once() - headers = {} - credentials.apply(headers) - assert headers["x-allowed-locations"] == "" - - def test_refresh_skips_lookup_with_cached_no_op_boundary(self): - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - credentials = self.make_credentials() - credentials._trust_boundary = self.NO_OP_TRUST_BOUNDARY - - with mock.patch.object( - credentials, "_lookup_trust_boundary" - ) as mock_lookup, mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - mock_lookup.assert_not_called() - headers = {} - credentials.apply(headers) - assert headers["x-allowed-locations"] == "" - - def test_refresh_fails_on_lookup_failure_with_no_cache(self): - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - credentials = self.make_credentials() - - with mock.patch.object( - credentials, - "_lookup_trust_boundary", - side_effect=exceptions.RefreshError("Lookup failed"), - ) as mock_lookup, mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ), pytest.raises( - exceptions.RefreshError, match="Lookup failed" - ): - credentials.refresh(request) - - mock_lookup.assert_called_once() - - def test_refresh_uses_cached_boundary_on_lookup_failure(self): - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - credentials = self.make_credentials() - credentials._trust_boundary = {"encodedLocations": "0x123"} - - with mock.patch.object( - credentials, - "_lookup_trust_boundary", - side_effect=exceptions.RefreshError("Lookup failed"), - ) as mock_lookup, mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - mock_lookup.assert_called_once() - headers = {} - credentials.apply(headers) - assert headers["x-allowed-locations"] == "0x123" - - def test_refresh_propagates_trust_boundary_to_impersonated_credential(self): - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - credentials = self.make_credentials( - service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, - trust_boundary=self.VALID_TRUST_BOUNDARY, - ) - impersonated_creds_mock = mock.Mock() - impersonated_creds_mock._trust_boundary = self.VALID_TRUST_BOUNDARY - - with mock.patch( - "google.auth.external_account.impersonated_credentials.Credentials", - return_value=impersonated_creds_mock, - ) as mock_impersonated_creds, mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - mock_impersonated_creds.assert_called_once_with( - source_credentials=mock.ANY, - target_principal=mock.ANY, - target_scopes=mock.ANY, - quota_project_id=mock.ANY, - iam_endpoint_override=mock.ANY, - lifetime=mock.ANY, - trust_boundary=self.VALID_TRUST_BOUNDARY, - ) - assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY - - def test_build_trust_boundary_lookup_url_workload(self): - credentials = self.make_credentials() - expected_url = "https://iamcredentials.googleapis.com/v1/projects/123456/locations/global/workloadIdentityPools/POOL_ID/allowedLocations" - assert credentials._build_trust_boundary_lookup_url() == expected_url - - def test_build_trust_boundary_lookup_url_workforce(self): - credentials = self.make_workforce_pool_credentials() - expected_url = "https://iamcredentials.googleapis.com/v1/locations/global/workforcePools/POOL_ID/allowedLocations" - assert credentials._build_trust_boundary_lookup_url() == expected_url - - @pytest.mark.parametrize( - "audience", - [ - "invalid", - "//iam.googleapis.com/projects/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID", - "//iam.googleapis.com/locations/global/workforcsePools//providers/provider-id", - ], - ) - def test_build_trust_boundary_lookup_url_invalid_audience(self, audience): - credentials = self.make_credentials() - credentials._audience = audience - with pytest.raises(exceptions.InvalidValue, match="Invalid audience format."): - credentials._build_trust_boundary_lookup_url() - - def test_refresh_fetches_trust_boundary_workload(self): - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - credentials = self.make_credentials() - - with mock.patch.object( - credentials, - "_lookup_trust_boundary", - return_value=self.VALID_TRUST_BOUNDARY, - ) as mock_lookup, mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - mock_lookup.assert_called_once() - headers = {} - credentials.apply(headers) - assert ( - headers["x-allowed-locations"] - == self.VALID_TRUST_BOUNDARY["encodedLocations"] - ) - - def test_refresh_fetches_trust_boundary_workforce(self): - request = self.make_mock_request( - status=http_client.OK, data=self.SUCCESS_RESPONSE - ) - credentials = self.make_workforce_pool_credentials() - - with mock.patch.object( - credentials, - "_lookup_trust_boundary", - return_value=self.VALID_TRUST_BOUNDARY, - ) as mock_lookup, mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - mock_lookup.assert_called_once() - headers = {} - credentials.apply(headers) - assert ( - headers["x-allowed-locations"] - == self.VALID_TRUST_BOUNDARY["encodedLocations"] - ) - @mock.patch( "google.auth.metrics.python_and_auth_lib_version", return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, @@ -1990,34 +1727,28 @@ def test_before_request_expired(self, utcnow): "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]) } - def test_refresh_impersonation_trust_boundary(self): - request = self.make_mock_request( - status=http_client.OK, - data=self.SUCCESS_RESPONSE, - impersonation_status=http_client.OK, - impersonation_data={ - "accessToken": "SA_ACCESS_TOKEN", - "expireTime": "2025-01-01T00:00:00Z", - }, - ) - credentials = self.make_credentials( - service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL - ) - impersonated_creds_mock = mock.Mock() - impersonated_creds_mock._trust_boundary = self.VALID_TRUST_BOUNDARY - - with mock.patch( - "google.auth.external_account.impersonated_credentials.Credentials", - return_value=impersonated_creds_mock, - ): - credentials.refresh(request) + def test_build_regional_access_boundary_lookup_url_workload(self): + credentials = self.make_credentials() + expected_url = "https://iamcredentials.googleapis.com/v1/projects/123456/locations/global/workloadIdentityPools/POOL_ID/allowedLocations" + assert credentials._build_regional_access_boundary_lookup_url() == expected_url - assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + def test_build_regional_access_boundary_lookup_url_workforce(self): + credentials = self.make_workforce_pool_credentials() + expected_url = "https://iamcredentials.googleapis.com/v1/locations/global/workforcePools/POOL_ID/allowedLocations" + assert credentials._build_regional_access_boundary_lookup_url() == expected_url - def test_with_trust_boundary(self): + @pytest.mark.parametrize( + "audience", + [ + "invalid", + "//iam.googleapis.com/projects/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID", + "//iam.googleapis.com/locations/global/workforcsePools//providers/provider-id", + ], + ) + def test_build_regional_access_boundary_lookup_url_invalid_audience(self, audience): credentials = self.make_credentials() - new_credentials = credentials.with_trust_boundary(self.VALID_TRUST_BOUNDARY) - assert new_credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + credentials._audience = audience + assert credentials._build_regional_access_boundary_lookup_url() is None @mock.patch("google.auth._helpers.utcnow") def test_before_request_impersonation_expired(self, utcnow): diff --git a/packages/google-auth/tests/test_external_account_authorized_user.py b/packages/google-auth/tests/test_external_account_authorized_user.py index b023ddf3dc36..648966d924bf 100644 --- a/packages/google-auth/tests/test_external_account_authorized_user.py +++ b/packages/google-auth/tests/test_external_account_authorized_user.py @@ -15,12 +15,10 @@ import datetime import http.client as http_client import json -import os from unittest import mock import pytest # type: ignore -from google.auth import environment_vars from google.auth import exceptions from google.auth import external_account_authorized_user from google.auth import transport @@ -559,26 +557,6 @@ def test_with_universe_domain(self): assert new_creds._quota_project_id == QUOTA_PROJECT_ID assert new_creds.universe_domain == FAKE_UNIVERSE_DOMAIN - def test_with_trust_boundary(self): - creds = self.make_credentials( - token=ACCESS_TOKEN, - expiry=NOW, - revoke_url=REVOKE_URL, - quota_project_id=QUOTA_PROJECT_ID, - ) - new_creds = creds.with_trust_boundary({"encodedLocations": "new_boundary"}) - assert new_creds._audience == creds._audience - assert new_creds._refresh_token == creds.refresh_token - assert new_creds._token_url == creds._token_url - assert new_creds._token_info_url == creds._token_info_url - assert new_creds._client_id == creds._client_id - assert new_creds._client_secret == creds._client_secret - assert new_creds.token == creds.token - assert new_creds.expiry == creds.expiry - assert new_creds._revoke_url == creds._revoke_url - assert new_creds._quota_project_id == QUOTA_PROJECT_ID - assert new_creds._trust_boundary == {"encodedLocations": "new_boundary"} - def test_from_file_required_options_only(self, tmpdir): from_creds = self.make_credentials() config_file = tmpdir.join("config.json") @@ -623,64 +601,18 @@ def test_from_file_full_options(self, tmpdir): assert creds._revoke_url == REVOKE_URL assert creds._quota_project_id == QUOTA_PROJECT_ID - def test_refresh_fetches_trust_boundary(self): - request = self.make_mock_request( - status=http_client.OK, - data={"access_token": ACCESS_TOKEN, "expires_in": 3600}, - ) - credentials = self.make_credentials() - - with mock.patch.object( - credentials, - "_lookup_trust_boundary", - return_value={"encodedLocations": "0x123"}, - ) as mock_lookup, mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - mock_lookup.assert_called_once() - headers = {} - credentials.apply(headers) - assert headers["x-allowed-locations"] == "0x123" - - def test_refresh_skips_trust_boundary_lookup_when_disabled(self): - request = self.make_mock_request( - status=http_client.OK, - data={"access_token": ACCESS_TOKEN, "expires_in": 3600}, - ) - credentials = self.make_credentials() - - with mock.patch.object( - credentials, "_lookup_trust_boundary" - ) as mock_lookup, mock.patch.dict(os.environ, {}, clear=True): - credentials.refresh(request) - - mock_lookup.assert_not_called() - headers = {} - credentials.apply(headers) - assert "x-allowed-locations" not in headers - - def test_build_trust_boundary_lookup_url(self): + def test_build_regional_access_boundary_lookup_url(self): credentials = self.make_credentials() expected_url = "https://iamcredentials.googleapis.com/v1/locations/global/workforcePools/POOL_ID/allowedLocations" - assert credentials._build_trust_boundary_lookup_url() == expected_url + assert credentials._build_regional_access_boundary_lookup_url() == expected_url @pytest.mark.parametrize( "audience", [ "invalid", - "//iam.googleapis.com/locations/global/workforcePools/", "//iam.googleapis.com/locations/global/providers/", - "//iam.googleapis.com/workforcePools/POOL_ID/providers/PROVIDER_ID", ], ) - def test_build_trust_boundary_lookup_url_invalid_audience(self, audience): + def test_build_regional_access_boundary_lookup_url_invalid_audience(self, audience): credentials = self.make_credentials(audience=audience) - with pytest.raises(exceptions.InvalidValue): - credentials._build_trust_boundary_lookup_url() - - def test_build_trust_boundary_lookup_url_different_universe(self): - credentials = self.make_credentials(universe_domain=FAKE_UNIVERSE_DOMAIN) - expected_url = "https://iamcredentials.fake-universe-domain/v1/locations/global/workforcePools/POOL_ID/allowedLocations" - assert credentials._build_trust_boundary_lookup_url() == expected_url + assert credentials._build_regional_access_boundary_lookup_url() is None diff --git a/packages/google-auth/tests/test_identity_pool.py b/packages/google-auth/tests/test_identity_pool.py index c68fac64708d..2ffae6b8d2fd 100644 --- a/packages/google-auth/tests/test_identity_pool.py +++ b/packages/google-auth/tests/test_identity_pool.py @@ -508,7 +508,6 @@ def test_from_info_full_options(self, mock_init): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -538,7 +537,6 @@ def test_from_info_required_options_only(self, mock_init): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -570,7 +568,6 @@ def test_from_info_supplier(self, mock_init): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -601,7 +598,6 @@ def test_from_info_workforce_pool(self, mock_init): quota_project_id=None, workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -638,7 +634,6 @@ def test_from_file_full_options(self, mock_init, tmpdir): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -669,7 +664,6 @@ def test_from_file_required_options_only(self, mock_init, tmpdir): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -701,7 +695,6 @@ def test_from_file_workforce_pool(self, mock_init, tmpdir): quota_project_id=None, workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) def test_constructor_nonworkforce_with_workforce_pool_user_project(self): diff --git a/packages/google-auth/tests/test_impersonated_credentials.py b/packages/google-auth/tests/test_impersonated_credentials.py index 3ff7281a672d..d2310c370b27 100644 --- a/packages/google-auth/tests/test_impersonated_credentials.py +++ b/packages/google-auth/tests/test_impersonated_credentials.py @@ -22,7 +22,6 @@ import pytest # type: ignore from google.auth import _helpers -from google.auth import credentials as auth_credentials from google.auth import crypt from google.auth import environment_vars from google.auth import exceptions @@ -128,21 +127,9 @@ class TestImpersonatedCredentials(object): # Because Python 2.7: DELEGATES = [] # type: ignore LIFETIME = 3600 - NO_OP_TRUST_BOUNDARY = { - "locations": auth_credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS, - "encodedLocations": auth_credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS, - } - VALID_TRUST_BOUNDARY = { - "locations": ["us-central1", "us-east1"], - "encodedLocations": "0xVALIDHEX", - } - EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = ( - "https://iamcredentials.googleapis.com/v1/projects/-" - "/serviceAccounts/impersonated@project.iam.gserviceaccount.com/allowedLocations" - ) - FAKE_UNIVERSE_DOMAIN = "universe.foo" + SOURCE_CREDENTIALS = service_account.Credentials( - SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI, trust_boundary=NO_OP_TRUST_BOUNDARY + SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI ) USER_SOURCE_CREDENTIALS = credentials.Credentials(token="ABCDE") IAM_ENDPOINT_OVERRIDE = ( @@ -157,7 +144,6 @@ def make_credentials( target_principal=TARGET_PRINCIPAL, subject=None, iam_endpoint_override=None, - trust_boundary=None, # Align with Credentials class default ): return Credentials( source_credentials=source_credentials, @@ -167,7 +153,6 @@ def make_credentials( lifetime=lifetime, subject=subject, iam_endpoint_override=iam_endpoint_override, - trust_boundary=trust_boundary, ) def test_from_impersonated_service_account_info(self): @@ -178,17 +163,6 @@ def test_from_impersonated_service_account_info(self): ) assert isinstance(credentials, impersonated_credentials.Credentials) - def test_from_impersonated_service_account_info_with_trust_boundary(self): - info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO) - info["trust_boundary"] = self.VALID_TRUST_BOUNDARY - credentials = ( - impersonated_credentials.Credentials.from_impersonated_service_account_info( - info - ) - ) - assert isinstance(credentials, impersonated_credentials.Credentials) - assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY - def test_from_impersonated_service_account_info_with_invalid_source_credentials_type( self, ): @@ -311,12 +285,8 @@ def test_token_usage_metrics(self): assert headers["x-goog-api-client"] == "cred-type/imp" @pytest.mark.parametrize("use_data_bytes", [True, False]) - @mock.patch("google.oauth2._client._lookup_trust_boundary") - def test_refresh_success( - self, mock_lookup_trust_boundary, use_data_bytes, mock_donor_credentials - ): - # Start with no boundary. - credentials = self.make_credentials(lifetime=None, trust_boundary=None) + def test_refresh_success(self, use_data_bytes, mock_donor_credentials): + credentials = self.make_credentials(lifetime=None) token = "token" expire_time = ( @@ -330,12 +300,7 @@ def test_refresh_success( use_data_bytes=use_data_bytes, ) - # Mock the trust boundary lookup to return a valid value. - mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ), mock.patch( + with mock.patch( "google.auth.metrics.token_request_access_token_impersonate", return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, ): @@ -348,239 +313,6 @@ def test_refresh_success( == ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE ) - # Verify that the x-allowed-locations header from the source credential - # was applied. The source credential has a NO_OP boundary, so the - # header should be an empty string. - request_kwargs = request.call_args[1] - assert "headers" in request_kwargs - assert "x-allowed-locations" in request_kwargs["headers"] - assert request_kwargs["headers"]["x-allowed-locations"] == "" - - # Verify trust boundary was set. - assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY - - # Verify the mock was called with the correct URL. - mock_lookup_trust_boundary.assert_called_once_with( - request, - self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, - headers={"authorization": "Bearer token"}, - ) - - # Verify x-allowed-locations header is set correctly by apply(). - headers_applied = {} - credentials.apply(headers_applied) - assert ( - headers_applied["x-allowed-locations"] - == self.VALID_TRUST_BOUNDARY["encodedLocations"] - ) - - def test_refresh_source_creds_no_trust_boundary(self): - # Use a source credential that does not support trust boundaries. - source_credentials = credentials.Credentials(token="source_token") - creds = self.make_credentials(source_credentials=source_credentials) - token = "impersonated_token" - - expire_time = ( - _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) - ).isoformat("T") + "Z" - response_body = {"accessToken": token, "expireTime": expire_time} - - request = self.make_request( - data=json.dumps(response_body), status=http_client.OK - ) - - creds.refresh(request) - - # Verify that the x-allowed-locations header was NOT applied because - # the source credential does not support trust boundaries. - request_kwargs = request.call_args[1] - assert "x-allowed-locations" not in request_kwargs["headers"] - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - def test_refresh_trust_boundary_lookup_fails_no_cache( - self, mock_lookup_trust_boundary, mock_donor_credentials - ): - # Start with no trust boundary - credentials = self.make_credentials(lifetime=None, trust_boundary=None) - token = "token" - - expire_time = ( - _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) - ).isoformat("T") + "Z" - response_body = {"accessToken": token, "expireTime": expire_time} - - request = self.make_request( - data=json.dumps(response_body), status=http_client.OK - ) - - # Mock the trust boundary lookup to raise an error - mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( - "Lookup failed" - ) - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ), pytest.raises(exceptions.RefreshError) as excinfo: - credentials.refresh(request) - - assert "Lookup failed" in str(excinfo.value) - assert credentials._trust_boundary is None # Still no trust boundary - mock_lookup_trust_boundary.assert_called_once() - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - def test_refresh_fetches_no_op_trust_boundary( - self, mock_lookup_trust_boundary, mock_donor_credentials - ): - # Start with no trust boundary - credentials = self.make_credentials(lifetime=None, trust_boundary=None) - token = "token" - expire_time = ( - _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) - ).isoformat("T") + "Z" - response_body = {"accessToken": token, "expireTime": expire_time} - request = self.make_request( - data=json.dumps(response_body), status=http_client.OK - ) - - mock_lookup_trust_boundary.return_value = ( - self.NO_OP_TRUST_BOUNDARY - ) # Mock returns NO_OP - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ), mock.patch( - "google.auth.metrics.token_request_access_token_impersonate", - return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - ): - credentials.refresh(request) - - assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY - mock_lookup_trust_boundary.assert_called_once_with( - request, - self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, - headers={"authorization": "Bearer token"}, - ) - headers_applied = {} - credentials.apply(headers_applied) - assert headers_applied["x-allowed-locations"] == "" - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - def test_refresh_skips_trust_boundary_lookup_non_default_universe( - self, mock_lookup_trust_boundary - ): - # Create source credentials with a non-default universe domain - source_credentials = service_account.Credentials( - SIGNER, - "some@email.com", - TOKEN_URI, - universe_domain=self.FAKE_UNIVERSE_DOMAIN, - ) - # Create impersonated credentials using the non-default source credentials - credentials = self.make_credentials(source_credentials=source_credentials) - - # Mock the IAM credentials API call for generateAccessToken - token = "token" - expire_time = ( - _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) - ).isoformat("T") + "Z" - response_body = {"accessToken": token, "expireTime": expire_time} - request = self.make_request( - data=json.dumps(response_body), status=http_client.OK - ) - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - # Ensure trust boundary lookup was not called - mock_lookup_trust_boundary.assert_not_called() - # Verify that x-allowed-locations header is not set by apply() - headers_applied = {} - credentials.apply(headers_applied) - assert "x-allowed-locations" not in headers_applied - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - def test_refresh_starts_with_no_op_trust_boundary_skips_lookup( - self, mock_lookup_trust_boundary, mock_donor_credentials - ): - credentials = self.make_credentials( - lifetime=None, trust_boundary=self.NO_OP_TRUST_BOUNDARY - ) # Start with NO_OP - token = "token" - expire_time = ( - _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) - ).isoformat("T") + "Z" - response_body = {"accessToken": token, "expireTime": expire_time} - request = self.make_request( - data=json.dumps(response_body), status=http_client.OK - ) - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ), mock.patch( - "google.auth.metrics.token_request_access_token_impersonate", - return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - ): - credentials.refresh(request) - - # Verify trust boundary remained NO_OP - assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY - - # Lookup should be skipped - mock_lookup_trust_boundary.assert_not_called() - - # Verify that an empty header was added. - headers_applied = {} - credentials.apply(headers_applied) - assert headers_applied["x-allowed-locations"] == "" - - @mock.patch("google.oauth2._client._lookup_trust_boundary") - def test_refresh_trust_boundary_lookup_fails_with_cached_data2( - self, mock_lookup_trust_boundary, mock_donor_credentials - ): - # Start with no trust boundary - credentials = self.make_credentials(lifetime=None, trust_boundary=None) - token = "token" - - expire_time = ( - _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) - ).isoformat("T") + "Z" - response_body = {"accessToken": token, "expireTime": expire_time} - - request = self.make_request( - data=json.dumps(response_body), status=http_client.OK - ) - - # First refresh: Successfully fetch a valid trust boundary. - mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ), mock.patch( - "google.auth.metrics.token_request_access_token_impersonate", - return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - ): - credentials.refresh(request) - - assert credentials.valid - # Verify trust boundary was set. - assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY - mock_lookup_trust_boundary.assert_called_once() - - # Second refresh: Mock lookup to fail, but expect cached data to be preserved. - mock_lookup_trust_boundary.reset_mock() - mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( - "Lookup failed" - ) - - with mock.patch.dict( - os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} - ): - credentials.refresh(request) - - assert credentials.valid - assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY - mock_lookup_trust_boundary.assert_called_once() - @pytest.mark.parametrize("use_data_bytes", [True, False]) def test_refresh_with_subject_success(self, use_data_bytes, mock_dwd_credentials): credentials = self.make_credentials(subject="test@email.com", lifetime=None) @@ -981,36 +713,18 @@ def test_with_scopes(self): assert credentials.requires_scopes is False assert credentials._target_scopes == ["fake_scope1", "fake_scope2"] - def test_with_trust_boundary(self): - credentials = self.make_credentials() - new_boundary = {"encodedLocations": "new_boundary"} - new_credentials = credentials.with_trust_boundary(new_boundary) - - assert new_credentials is not credentials - assert new_credentials._trust_boundary == new_boundary - # The source credentials should be a copy, not the same object. - # But they should be functionally equivalent. - assert ( - new_credentials._source_credentials is not credentials._source_credentials - ) - - assert ( - new_credentials._source_credentials.service_account_email - == credentials._source_credentials.service_account_email - ) - assert ( - new_credentials._source_credentials._signer - == credentials._source_credentials._signer - ) - assert new_credentials._target_principal == credentials._target_principal - - def test_build_trust_boundary_lookup_url_no_email(self): + def test_build_regional_access_boundary_lookup_url_no_email(self): credentials = self.make_credentials(target_principal=None) - with pytest.raises(ValueError) as excinfo: - credentials._build_trust_boundary_lookup_url() + assert credentials._build_regional_access_boundary_lookup_url() is None - assert "Service account email is required" in str(excinfo.value) + def test_build_regional_access_boundary_lookup_url_success(self): + credentials = self.make_credentials() + # Ensure service_account_email is properly set by default mock + expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}/allowedLocations".format( + credentials.service_account_email + ) + assert credentials._build_regional_access_boundary_lookup_url() == expected_url def test_with_scopes_provide_default_scopes(self): credentials = self.make_credentials() diff --git a/packages/google-auth/tests/test_pluggable.py b/packages/google-auth/tests/test_pluggable.py index b2764361bd5d..70e8c188a0c4 100644 --- a/packages/google-auth/tests/test_pluggable.py +++ b/packages/google-auth/tests/test_pluggable.py @@ -274,7 +274,6 @@ def test_from_info_full_options(self, mock_init): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) @@ -303,7 +302,6 @@ def test_from_info_required_options_only(self, mock_init): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) @@ -339,7 +337,6 @@ def test_from_file_full_options(self, mock_init, tmpdir): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) @@ -369,7 +366,6 @@ def test_from_file_required_options_only(self, mock_init, tmpdir): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, - trust_boundary=None, ) def test_constructor_invalid_options(self): From 1b570652e9c07366d29b4f3071add5dc95a2c91b Mon Sep 17 00:00:00 2001 From: nbayati <99771966+nbayati@users.noreply.github.com> Date: Thu, 12 Mar 2026 10:27:46 -0700 Subject: [PATCH 2/2] Fix linting issues --- packages/google-auth/google/auth/_helpers.py | 1 - .../auth/_regional_access_boundary_utils.py | 2 +- .../google/auth/compute_engine/credentials.py | 1 + .../google/auth/external_account.py | 1 + .../google/auth/impersonated_credentials.py | 1 + .../tests/compute_engine/test_credentials.py | 3 --- .../test__regional_access_boundary_utils.py | 21 +++++++++++-------- .../google-auth/tests/test_credentials.py | 10 ++------- .../tests/test_impersonated_credentials.py | 1 - 9 files changed, 18 insertions(+), 23 deletions(-) diff --git a/packages/google-auth/google/auth/_helpers.py b/packages/google-auth/google/auth/_helpers.py index 19aabf51f826..08146221503e 100644 --- a/packages/google-auth/google/auth/_helpers.py +++ b/packages/google-auth/google/auth/_helpers.py @@ -21,7 +21,6 @@ import hashlib import json import logging -import os import sys from typing import Any, Dict, Mapping, Optional, Union import urllib diff --git a/packages/google-auth/google/auth/_regional_access_boundary_utils.py b/packages/google-auth/google/auth/_regional_access_boundary_utils.py index 41f210d53593..b17f1f912d94 100644 --- a/packages/google-auth/google/auth/_regional_access_boundary_utils.py +++ b/packages/google-auth/google/auth/_regional_access_boundary_utils.py @@ -19,7 +19,7 @@ import logging import os import threading -from typing import NamedTuple, Optional, Mapping, Any, MutableMapping +from typing import NamedTuple, Optional from google.auth import _helpers from google.auth import environment_vars diff --git a/packages/google-auth/google/auth/compute_engine/credentials.py b/packages/google-auth/google/auth/compute_engine/credentials.py index 795f46663543..5e9e36ea404b 100644 --- a/packages/google-auth/google/auth/compute_engine/credentials.py +++ b/packages/google-auth/google/auth/compute_engine/credentials.py @@ -21,6 +21,7 @@ import datetime import logging +import warnings from google.auth import _constants from google.auth import _helpers diff --git a/packages/google-auth/google/auth/external_account.py b/packages/google-auth/google/auth/external_account.py index b77215195af4..fee7acf24644 100644 --- a/packages/google-auth/google/auth/external_account.py +++ b/packages/google-auth/google/auth/external_account.py @@ -36,6 +36,7 @@ import json import logging import re +import warnings from google.auth import _constants from google.auth import _helpers diff --git a/packages/google-auth/google/auth/impersonated_credentials.py b/packages/google-auth/google/auth/impersonated_credentials.py index 0eb77da34348..3228ab3d867c 100644 --- a/packages/google-auth/google/auth/impersonated_credentials.py +++ b/packages/google-auth/google/auth/impersonated_credentials.py @@ -31,6 +31,7 @@ import http.client as http_client import json import logging +import warnings from google.auth import _constants from google.auth import _exponential_backoff diff --git a/packages/google-auth/tests/compute_engine/test_credentials.py b/packages/google-auth/tests/compute_engine/test_credentials.py index 1c833c85bcaf..6aaa7d67a640 100644 --- a/packages/google-auth/tests/compute_engine/test_credentials.py +++ b/packages/google-auth/tests/compute_engine/test_credentials.py @@ -13,14 +13,12 @@ # limitations under the License. import base64 import datetime -import os from unittest import mock import pytest # type: ignore import responses # type: ignore from google.auth import _helpers -from google.auth import environment_vars from google.auth import exceptions from google.auth import jwt from google.auth import transport @@ -293,7 +291,6 @@ def test_user_provided_universe_domain(self, get_universe_domain): # domain endpoint. get_universe_domain.assert_not_called() - @mock.patch( "google.auth.compute_engine._metadata.get_service_account_info", autospec=True ) diff --git a/packages/google-auth/tests/test__regional_access_boundary_utils.py b/packages/google-auth/tests/test__regional_access_boundary_utils.py index 37194136ae24..bd9778bdb17f 100644 --- a/packages/google-auth/tests/test__regional_access_boundary_utils.py +++ b/packages/google-auth/tests/test__regional_access_boundary_utils.py @@ -22,7 +22,6 @@ from google.auth import _regional_access_boundary_utils from google.auth import credentials from google.auth import environment_vars -from google.auth import exceptions class CredentialsImpl(credentials.CredentialsWithRegionalAccessBoundary): @@ -61,20 +60,25 @@ def clear_rab_cache(): class TestCredentialsWithRegionalAccessBoundary(object): def test_is_regional_access_boundary_enabled_cached(self, monkeypatch): # Set to true - monkeypatch.setenv( - environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, "true" + monkeypatch.setenv(environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, "true") + assert ( + _regional_access_boundary_utils.is_regional_access_boundary_enabled() + is True ) - assert _regional_access_boundary_utils.is_regional_access_boundary_enabled() is True # Change env var to false, but it should still return True due to caching - monkeypatch.setenv( - environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, "false" + monkeypatch.setenv(environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, "false") + assert ( + _regional_access_boundary_utils.is_regional_access_boundary_enabled() + is True ) - assert _regional_access_boundary_utils.is_regional_access_boundary_enabled() is True # Clear cache and it should now reflect the new value _regional_access_boundary_utils.is_regional_access_boundary_enabled.cache_clear() - assert _regional_access_boundary_utils.is_regional_access_boundary_enabled() is False + assert ( + _regional_access_boundary_utils.is_regional_access_boundary_enabled() + is False + ) @mock.patch( "google.auth._regional_access_boundary_utils._RegionalAccessBoundaryRefreshManager.start_refresh" @@ -248,7 +252,6 @@ def test_maybe_start_refresh_is_skipped_if_non_default_universe_domain( def test_maybe_start_refresh_handles_url_parse_errors( self, mock_urlparse, mock_start_refresh ): - from urllib.parse import urlparse mock_urlparse.side_effect = ValueError("Malformed URL") creds = CredentialsImpl() request = mock.Mock() diff --git a/packages/google-auth/tests/test_credentials.py b/packages/google-auth/tests/test_credentials.py index 0cc29e4a66fc..3cd33bb37db4 100644 --- a/packages/google-auth/tests/test_credentials.py +++ b/packages/google-auth/tests/test_credentials.py @@ -13,16 +13,12 @@ # limitations under the License. import datetime -import os from unittest import mock - import pytest # type: ignore from google.auth import _helpers from google.auth import credentials -from google.auth import environment_vars -from google.auth import exceptions class CredentialsImpl(credentials.CredentialsWithRegionalAccessBoundary): @@ -152,8 +148,8 @@ def test_before_request_with_regional_access_boundary(): # Second call shouldn't call refresh. creds.before_request(request, "http://example.com", "GET", headers) - assert credentials.valid - assert credentials.token == "refreshed-token" + assert creds.valid + assert creds.token == "refreshed-token" assert headers["authorization"] == "Bearer refreshed-token" assert headers["x-allowed-locations"] == DUMMY_BOUNDARY @@ -378,5 +374,3 @@ def test_token_state_no_expiry(): assert c.token_state == credentials.TokenState.FRESH c.before_request(request, "http://example.com", "GET", {}) - - diff --git a/packages/google-auth/tests/test_impersonated_credentials.py b/packages/google-auth/tests/test_impersonated_credentials.py index d2310c370b27..500209f663d7 100644 --- a/packages/google-auth/tests/test_impersonated_credentials.py +++ b/packages/google-auth/tests/test_impersonated_credentials.py @@ -23,7 +23,6 @@ from google.auth import _helpers from google.auth import crypt -from google.auth import environment_vars from google.auth import exceptions from google.auth import impersonated_credentials from google.auth import transport