Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions packages/google-auth/google/auth/_constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Copyright 2026 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Shared constants."""

_SERVICE_ACCOUNT_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/projects/-/serviceAccounts/{service_account_email}/allowedLocations"
_WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/locations/global/workforcePools/{pool_id}/allowedLocations"
_WORKLOAD_IDENTITY_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/projects/{project_number}/locations/global/workloadIdentityPools/{pool_id}/allowedLocations"
_SERVICE_ACCOUNT_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{service_account_email}/allowedLocations"
_WORKFORCE_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.googleapis.com/v1/locations/global/workforcePools/{pool_id}/allowedLocations"
_WORKLOAD_IDENTITY_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.googleapis.com/v1/projects/{project_number}/locations/global/workloadIdentityPools/{pool_id}/allowedLocations"
40 changes: 0 additions & 40 deletions packages/google-auth/google/auth/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,46 +308,6 @@ def unpadded_urlsafe_b64encode(value):
return base64.urlsafe_b64encode(value).rstrip(b"=")


def get_bool_from_env(variable_name, default=False):
"""Gets a boolean value from an environment variable.

The environment variable is interpreted as a boolean with the following
(case-insensitive) rules:
- "true", "1" are considered true.
- "false", "0" are considered false.
Any other values will raise an exception.

Args:
variable_name (str): The name of the environment variable.
default (bool): The default value if the environment variable is not
set.

Returns:
bool: The boolean value of the environment variable.

Raises:
google.auth.exceptions.InvalidValue: If the environment variable is
set to a value that can not be interpreted as a boolean.
"""
value = os.environ.get(variable_name)

if value is None:
return default

value = value.lower()

if value in ("true", "1"):
return True
elif value in ("false", "0"):
return False
else:
raise exceptions.InvalidValue(
'Environment variable "{}" must be one of "true", "false", "1", or "0".'.format(
variable_name
)
)


def is_python_3():
"""Check if the Python interpreter is Python 2 or 3.

Expand Down
263 changes: 263 additions & 0 deletions packages/google-auth/google/auth/_regional_access_boundary_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
# Copyright 2026 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utilities for Regional Access Boundary management."""

import datetime
import functools
import logging
import os
import threading
from typing import NamedTuple, Optional, Mapping, Any, MutableMapping

from google.auth import _helpers
from google.auth import environment_vars

_LOGGER = logging.getLogger(__name__)


@functools.lru_cache()
def is_regional_access_boundary_enabled():
"""Checks if Regional Access Boundary is enabled via environment variable.

The environment variable is interpreted as a boolean with the following
(case-insensitive) rules:
- "true", "1" are considered true.
- Any other value (or unset) is considered false.

Returns:
bool: True if Regional Access Boundary is enabled, False otherwise.
"""
value = os.environ.get(environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED)
if value is None:
return False

return value.lower() in ("true", "1")


# The default lifetime for a cached Regional Access Boundary.
DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL = datetime.timedelta(hours=6)

# The period of time prior to the boundary's expiration when a background refresh
# is proactively triggered.
REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD = datetime.timedelta(hours=1)

# The initial cooldown period for a failed Regional Access Boundary lookup.
DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(minutes=15)

# The maximum cooldown period for a failed Regional Access Boundary lookup.
MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN = datetime.timedelta(hours=6)


# The header key used for Regional Access Boundaries.
_REGIONAL_ACCESS_BOUNDARY_HEADER = "x-allowed-locations"


class _RegionalAccessBoundaryData(NamedTuple):
"""Data container for a Regional Access Boundary snapshot.

Attributes:
encoded_locations (Optional[str]): The encoded Regional Access Boundary string.
expiry (Optional[datetime.datetime]): The hard expiration time of the boundary data.
cooldown_expiry (Optional[datetime.datetime]): The time until which further lookups are skipped.
cooldown_duration (datetime.timedelta): The current duration for the exponential cooldown.
"""

encoded_locations: Optional[str]
expiry: Optional[datetime.datetime]
cooldown_expiry: Optional[datetime.datetime]
cooldown_duration: datetime.timedelta


class _RegionalAccessBoundaryManager(object):
"""Manages the Regional Access Boundary state and its background refresh.

This class provides a stable container for the Regional Access Boundary state,
allowing cloned credentials to share the same underlying state and refresh mechanism.
The actual data is held in an immutable `_RegionalAccessBoundaryData` object
and is swapped atomically to ensure thread-safe, lock-free reads.
"""

def __init__(self):
self._data = _RegionalAccessBoundaryData(
encoded_locations=None,
expiry=None,
cooldown_expiry=None,
cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
)
self.refresh_manager = _RegionalAccessBoundaryRefreshManager()
self._update_lock = threading.Lock()

def apply_headers(self, headers):
"""Applies the Regional Access Boundary header to the provided dictionary.

If the boundary is valid, the 'x-allowed-locations' header is added
or updated. Otherwise, the header is removed to ensure no stale
data is sent.

Args:
headers (MutableMapping[str, str]): The headers dictionary to update.
"""
rab_data = self._data

if rab_data.encoded_locations is not None and (
rab_data.expiry is not None and _helpers.utcnow() < rab_data.expiry
):
headers[_REGIONAL_ACCESS_BOUNDARY_HEADER] = rab_data.encoded_locations
else:
headers.pop(_REGIONAL_ACCESS_BOUNDARY_HEADER, None)

def maybe_start_refresh(self, credentials, request):
"""Starts a background thread to refresh the Regional Access Boundary if needed.

Args:
credentials (google.auth.credentials.Credentials): The credentials to refresh.
request (google.auth.transport.Request): The object used to make HTTP requests.
"""
rab_data = self._data

# Don't start a new refresh if the Regional Access Boundary info is still fresh.
if (
rab_data.encoded_locations
and rab_data.expiry
and _helpers.utcnow()
< (rab_data.expiry - REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD)
):
return

# Don't start a new refresh if the cooldown is still in effect.
if rab_data.cooldown_expiry and _helpers.utcnow() < rab_data.cooldown_expiry:
return

# If all checks pass, start the background refresh.
self.refresh_manager.start_refresh(credentials, request, self)


class _RegionalAccessBoundaryRefreshThread(threading.Thread):
"""Thread for background refreshing of the Regional Access Boundary."""

def __init__(self, credentials, request, rab_manager):
super(_RegionalAccessBoundaryRefreshThread, self).__init__()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better maintainability and consistency with modern Python 3 practices, it's recommended to use the argument-less super() call.

Suggested change
super(_RegionalAccessBoundaryRefreshThread, self).__init__()
super().__init__()

self.daemon = True
self._credentials = credentials
self._request = request
self._rab_manager = rab_manager

def run(self):
"""
Performs the Regional Access Boundary lookup and updates the state.

This method is run in a separate thread. It delegates the actual lookup
to the credentials object's `_lookup_regional_access_boundary` method.
Based on the lookup's outcome (success or complete failure after retries),
it updates the cached Regional Access Boundary information,
its expiry, its cooldown expiry, and its exponential cooldown duration.
"""
# Catch exceptions (e.g., from the underlying transport) to prevent the
# background thread from crashing. This ensures we can gracefully enter
# an exponential cooldown state on failure.
try:
regional_access_boundary_info = (
self._credentials._lookup_regional_access_boundary(self._request)
)
except Exception as e:
if _helpers.is_logging_enabled(_LOGGER):
_LOGGER.warning(
"Asynchronous Regional Access Boundary lookup raised an exception: %s",
e,
exc_info=True,
)
regional_access_boundary_info = None

with self._rab_manager._update_lock:
# Capture the current state before calculating updates.
current_data = self._rab_manager._data

if regional_access_boundary_info:
# On success, update the boundary and its expiry, and clear any cooldown.
encoded_locations = regional_access_boundary_info.get(
"encodedLocations"
)
updated_data = _RegionalAccessBoundaryData(
encoded_locations=encoded_locations,
expiry=_helpers.utcnow() + DEFAULT_REGIONAL_ACCESS_BOUNDARY_TTL,
cooldown_expiry=None,
cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
)
if _helpers.is_logging_enabled(_LOGGER):
_LOGGER.debug(
"Asynchronous Regional Access Boundary lookup successful."
)
else:
# On failure, calculate cooldown and update state.
if _helpers.is_logging_enabled(_LOGGER):
_LOGGER.warning(
"Asynchronous Regional Access Boundary lookup failed. Entering cooldown."
)

next_cooldown_expiry = (
_helpers.utcnow() + current_data.cooldown_duration
)
next_cooldown_duration = min(
current_data.cooldown_duration * 2,
MAX_REGIONAL_ACCESS_BOUNDARY_COOLDOWN,
)

# If the refresh failed, we keep reusing the existing data unless
# it has reached its hard expiration time.
if current_data.expiry and _helpers.utcnow() > current_data.expiry:
next_encoded_locations = None
next_expiry = None
else:
next_encoded_locations = current_data.encoded_locations
next_expiry = current_data.expiry

updated_data = _RegionalAccessBoundaryData(
encoded_locations=next_encoded_locations,
expiry=next_expiry,
cooldown_expiry=next_cooldown_expiry,
cooldown_duration=next_cooldown_duration,
)

# Perform the atomic swap of the state object.
self._rab_manager._data = updated_data


class _RegionalAccessBoundaryRefreshManager(object):
"""Manages a thread for background refreshing of the Regional Access Boundary."""

def __init__(self):
self._lock = threading.Lock()
self._worker = None

def start_refresh(self, credentials, request, rab_manager):
"""
Starts a background thread to refresh the Regional Access Boundary if one is not already running.

Args:
credentials (CredentialsWithRegionalAccessBoundary): The credentials
to refresh.
request (google.auth.transport.Request): The object used to make
HTTP requests.
rab_manager (_RegionalAccessBoundaryManager): The manager container to update.
"""
with self._lock:
if self._worker and self._worker.is_alive():
# A refresh is already in progress.
return

self._worker = _RegionalAccessBoundaryRefreshThread(
credentials, request, rab_manager
)
self._worker.start()
Loading
Loading