From 400eb18d92bbddba9a40e9ec900a69c50d1b5485 Mon Sep 17 00:00:00 2001 From: Josef Viehhauser Date: Tue, 28 Apr 2026 14:46:25 +0200 Subject: [PATCH] Add --revoke-sso-token flag to export-credentials When set, after credentials are emitted, the IAM Identity Center access token used to resolve them is server-side revoked via sso.Logout and its on-disk cache file is removed. The flag is opt-in and surgical: only the cache file for the active profile's sso_session (or legacy sso_start_url) is touched. Other profiles and concurrent SSO sessions are untouched. The cache key is derived identically to botocore's SSOTokenLoader so the file we look at is always the file aws sso login wrote. Best-effort error handling: credentials have already been emitted when revoke runs, so any failure must not change the exit status. Both the narrow ClientError path inside revoke_sso_token (which is also used by aws sso logout, with unchanged behavior) and broader transport-level errors (BotoCoreError, EndpointResolutionError, etc.) are swallowed. Use case: when aws configure export-credentials runs as a credential_process, the SSO token's on-disk lifetime collapses from the IAM Identity Center session duration (8h default) to the duration of a single STS exchange. Each refresh requires a new aws sso login flow. The existing sso.Logout call site in awscli/customizations/sso/ logout.py is refactored into a reusable helper revoke_sso_token. aws sso logout's behavior is unchanged. Tests pin the cache filename contract to botocore's SSOTokenLoader._generate_cache_key directly, so any drift between this command's lookup logic and botocore's storage logic surfaces as a test failure rather than a silent file-path mismatch. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../enhancement-configure-41258.json | 5 + .../customizations/configure/exportcreds.py | 70 ++++++ awscli/customizations/sso/logout.py | 33 ++- .../configure/test_exportcreds.py | 211 ++++++++++++++++++ 4 files changed, 309 insertions(+), 10 deletions(-) create mode 100644 .changes/next-release/enhancement-configure-41258.json diff --git a/.changes/next-release/enhancement-configure-41258.json b/.changes/next-release/enhancement-configure-41258.json new file mode 100644 index 000000000000..5481dff29bf8 --- /dev/null +++ b/.changes/next-release/enhancement-configure-41258.json @@ -0,0 +1,5 @@ +{ + "type": "enhancement", + "category": "configure", + "description": "Add ``--revoke-sso-token`` flag to ``aws configure export-credentials`` that, after exporting credentials, server-side revokes the IAM Identity Center access token used to mint them and removes its on-disk cache file." +} diff --git a/awscli/customizations/configure/exportcreds.py b/awscli/customizations/configure/exportcreds.py index 32f23d6f2a3c..cdca1c081253 100644 --- a/awscli/customizations/configure/exportcreds.py +++ b/awscli/customizations/configure/exportcreds.py @@ -11,6 +11,7 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. import csv +import hashlib import io import json import os @@ -20,6 +21,8 @@ from awscli.customizations.commands import BasicCommand from awscli.customizations.exceptions import ConfigurationError +from awscli.customizations.sso.logout import revoke_sso_token +from awscli.customizations.sso.utils import SSO_TOKEN_DIR # Takes botocore's ReadOnlyCredentials and exposes an expiry_time. Credentials = namedtuple( @@ -199,6 +202,21 @@ class ConfigureExportCredentialsCommand(BasicCommand): 'choices': list(SUPPORTED_FORMATS), 'default': CredentialProcessFormatter.FORMAT, }, + { + 'name': 'revoke-sso-token', + 'action': 'store_true', + 'default': False, + 'help_text': ( + 'After exporting credentials, server-side revoke the AWS ' + 'IAM Identity Center access token used to mint them and ' + 'remove its on-disk cache file. This is a no-op for ' + 'profiles that do not resolve credentials through IAM ' + 'Identity Center. Use this to minimize the on-disk ' + 'lifetime of the SSO access token. Note: when used in a ' + '``credential_process`` configuration, every credential ' + 'refresh will require a new ``aws sso login`` flow.' + ), + }, ] _RECURSION_VAR = '_AWS_CLI_PROFILE_CHAIN' # Two levels is reasonable because you might explicitly run @@ -280,3 +298,55 @@ def _run_main(self, parsed_args, parsed_globals): creds_with_expiry = convert_botocore_credentials(creds) formatter = SUPPORTED_FORMATS[parsed_args.format](self._out_stream) formatter.display_credentials(creds_with_expiry) + if parsed_args.revoke_sso_token: + self._revoke_sso_token_for_profile(parsed_globals) + + def _revoke_sso_token_for_profile(self, parsed_globals): + # Best-effort: credentials have already been emitted at this point, + # so any failure here must not change the exit status. We catch + # broadly to cover transport-level errors from the sso.Logout call + # (BotoCoreError, EndpointResolutionError, etc.) in addition to + # the ``ClientError`` path that ``revoke_sso_token`` already logs. + try: + self._do_revoke_sso_token_for_profile(parsed_globals) + except Exception: + pass + + def _do_revoke_sso_token_for_profile(self, parsed_globals): + cache_key = self._sso_cache_key_for_current_profile() + if cache_key is None: + return + cache_path = os.path.join(SSO_TOKEN_DIR, cache_key + '.json') + try: + with open(cache_path) as f: + contents = json.load(f) + except (OSError, ValueError): + return + access_token = contents.get('accessToken') + sso_region = contents.get('region') + if not access_token or not sso_region: + return + revoke_sso_token( + self._session, sso_region, access_token, parsed_globals + ) + try: + os.remove(cache_path) + except OSError: + pass + + def _sso_cache_key_for_current_profile(self): + # Returns the SSO token cache key (sha1 digest) for the active + # profile, or None if the profile has no IAM Identity Center + # configuration. The cache key is computed identically to + # ``botocore.utils.SSOTokenLoader``: sha1 of the sso_session name + # when present, otherwise sha1 of the legacy sso_start_url. + scoped_config = self._session.get_scoped_config() + sso_session = scoped_config.get('sso_session') + if sso_session: + cache_input = sso_session + else: + sso_start_url = scoped_config.get('sso_start_url') + if not sso_start_url: + return None + cache_input = sso_start_url + return hashlib.sha1(cache_input.encode('utf-8')).hexdigest() diff --git a/awscli/customizations/sso/logout.py b/awscli/customizations/sso/logout.py index 1738acc6ae01..a1fe101ade97 100644 --- a/awscli/customizations/sso/logout.py +++ b/awscli/customizations/sso/logout.py @@ -22,6 +22,24 @@ LOG = logging.getLogger(__name__) +def revoke_sso_token(session, sso_region, access_token, parsed_globals): + """Server-side revoke an SSO access token via the sso.Logout API. + + Errors from the Logout call are caught and logged; this is best-effort. + """ + sso = session.create_client( + 'sso', + region_name=sso_region, + verify=parsed_globals.verify_ssl, + ) + try: + sso.logout(accessToken=access_token) + except ClientError: + # The token may already be expired or otherwise invalid. If we + # get a client error on logout just log and continue on. + LOG.debug('Failed to call logout API:', exc_info=True) + + class LogoutCommand(BasicCommand): NAME = 'logout' DESCRIPTION = ( @@ -85,17 +103,12 @@ def _before_deletion(self, contents): # and invoke the logout api to invalidate the token before deleting it. sso_region = contents.get('region') if sso_region: - sso = self._session.create_client( - 'sso', - region_name=sso_region, - verify=self._parsed_globals.verify_ssl, + revoke_sso_token( + self._session, + sso_region, + contents['accessToken'], + self._parsed_globals, ) - try: - sso.logout(accessToken=contents['accessToken']) - except ClientError: - # The token may already be expired or otherwise invalid. If we - # get a client error on logout just log and continue on - LOG.debug('Failed to call logout API:', exc_info=True) class SSOCredentialSweeper(BaseCredentialSweeper): diff --git a/tests/unit/customizations/configure/test_exportcreds.py b/tests/unit/customizations/configure/test_exportcreds.py index 53cfa493d65b..2b1390bd7693 100644 --- a/tests/unit/customizations/configure/test_exportcreds.py +++ b/tests/unit/customizations/configure/test_exportcreds.py @@ -12,14 +12,18 @@ # language governing permissions and limitations under the License. import io import json +import os from datetime import datetime, timedelta import pytest from botocore.credentials import Credentials as StaticCredentials from botocore.credentials import ReadOnlyCredentials, RefreshableCredentials +from botocore.exceptions import ClientError from botocore.session import Session +from botocore.utils import SSOTokenLoader from dateutil.tz import tzutc +from awscli.customizations.configure import exportcreds as exportcreds_mod from awscli.customizations.configure.exportcreds import ( BashEnvVarFormatter, BashNoExportEnvFormatter, @@ -350,3 +354,210 @@ def test_max_recursion_limit(self): 'Maximum recursive credential process resolution reached', str(excinfo), ) + + +class TestRevokeSSOTokenFlag(unittest.TestCase): + def setUp(self): + self.session = mock.Mock(spec=Session) + self.session.emit_first_non_none_response.return_value = None + self.session.get_config_variable.return_value = 'default' + self.session.get_credentials.return_value = StaticCredentials( + 'access_key', 'secret_key', 'token' + ) + self.out_stream = io.StringIO() + self.err_stream = io.StringIO() + self.os_env = {} + self.global_args = mock.Mock() + self.tmpdir = mock.MagicMock() + + # Patch SSO_TOKEN_DIR to a temp dir for the duration of the test. + self._tmp_token_dir = os.path.join( + os.path.dirname(__file__), '_tmp_sso_cache_test' + ) + os.makedirs(self._tmp_token_dir, exist_ok=True) + self._token_dir_patcher = mock.patch.object( + exportcreds_mod, 'SSO_TOKEN_DIR', self._tmp_token_dir + ) + self._token_dir_patcher.start() + + self.export_creds_cmd = ConfigureExportCredentialsCommand( + self.session, self.out_stream, self.err_stream, env=self.os_env + ) + + def tearDown(self): + self._token_dir_patcher.stop() + for name in os.listdir(self._tmp_token_dir): + os.remove(os.path.join(self._tmp_token_dir, name)) + os.rmdir(self._tmp_token_dir) + + def _write_token_cache(self, start_url, session_name, contents): + # Compute the cache filename via botocore's SSOTokenLoader, the + # authority for where SSO tokens are written by ``aws sso login``. + # Pinning the test to that contract — rather than to the command's + # own cache-key helper — ensures we catch drift between this + # command's lookup logic and botocore's storage logic. + cache_key = SSOTokenLoader()._generate_cache_key( + start_url, session_name + ) + path = os.path.join(self._tmp_token_dir, cache_key + '.json') + with open(path, 'w') as f: + json.dump(contents, f) + return path + + def _sso_session_profile_config(self): + return { + 'scoped': {'sso_session': 'my-session'}, + 'full': { + 'sso_sessions': { + 'my-session': { + 'sso_region': 'us-east-1', + 'sso_start_url': 'https://example.awsapps.com/start', + } + } + }, + } + + def _legacy_profile_config(self): + return { + 'scoped': { + 'sso_region': 'us-east-1', + 'sso_start_url': 'https://example.awsapps.com/start', + }, + 'full': {}, + } + + def _wire_profile(self, profile_config): + self.session.get_scoped_config.return_value = profile_config['scoped'] + self.session.full_config = profile_config['full'] + + def test_revokes_token_for_sso_session_profile(self): + self._wire_profile(self._sso_session_profile_config()) + cache_path = self._write_token_cache( + start_url='https://example.awsapps.com/start', + session_name='my-session', + contents={'accessToken': 'abc123', 'region': 'us-east-1'}, + ) + sso_client = mock.Mock() + self.session.create_client.return_value = sso_client + + rc = self.export_creds_cmd( + args=['--revoke-sso-token'], parsed_globals=self.global_args + ) + + self.assertEqual(rc, 0) + self.session.create_client.assert_called_once_with( + 'sso', + region_name='us-east-1', + verify=self.global_args.verify_ssl, + ) + sso_client.logout.assert_called_once_with(accessToken='abc123') + self.assertFalse(os.path.exists(cache_path)) + + def test_revokes_token_for_legacy_sso_profile(self): + self._wire_profile(self._legacy_profile_config()) + cache_path = self._write_token_cache( + start_url='https://example.awsapps.com/start', + session_name=None, + contents={'accessToken': 'legacy-token', 'region': 'us-east-1'}, + ) + sso_client = mock.Mock() + self.session.create_client.return_value = sso_client + + rc = self.export_creds_cmd( + args=['--revoke-sso-token'], parsed_globals=self.global_args + ) + + self.assertEqual(rc, 0) + sso_client.logout.assert_called_once_with(accessToken='legacy-token') + self.assertFalse(os.path.exists(cache_path)) + + def test_no_op_for_non_sso_profile(self): + self._wire_profile({'scoped': {}, 'full': {}}) + rc = self.export_creds_cmd( + args=['--revoke-sso-token'], parsed_globals=self.global_args + ) + self.assertEqual(rc, 0) + self.session.create_client.assert_not_called() + + def test_no_op_when_cache_file_missing(self): + self._wire_profile(self._sso_session_profile_config()) + # No cache file written. + rc = self.export_creds_cmd( + args=['--revoke-sso-token'], parsed_globals=self.global_args + ) + self.assertEqual(rc, 0) + self.session.create_client.assert_not_called() + + def test_swallows_non_client_error_from_revoke_call(self): + # ``revoke_sso_token`` only catches ClientError. Non-ClientError + # transport-level exceptions (BotoCoreError, EndpointResolutionError, + # network failures, etc.) must not crash the export command, since + # credentials have already been emitted. + self._wire_profile(self._sso_session_profile_config()) + self._write_token_cache( + start_url='https://example.awsapps.com/start', + session_name='my-session', + contents={'accessToken': 'abc123', 'region': 'us-east-1'}, + ) + self.session.create_client.side_effect = RuntimeError( + 'simulated transport failure' + ) + + rc = self.export_creds_cmd( + args=['--revoke-sso-token'], parsed_globals=self.global_args + ) + + self.assertEqual(rc, 0) + # Credentials must still have been emitted to stdout. + emitted = json.loads(self.out_stream.getvalue()) + self.assertEqual(emitted['AccessKeyId'], 'access_key') + + def test_swallows_logout_api_error_and_still_removes_cache(self): + self._wire_profile(self._sso_session_profile_config()) + cache_path = self._write_token_cache( + start_url='https://example.awsapps.com/start', + session_name='my-session', + contents={'accessToken': 'abc123', 'region': 'us-east-1'}, + ) + sso_client = mock.Mock() + sso_client.logout.side_effect = ClientError( + {'Error': {'Code': 'UnauthorizedException', 'Message': 'expired'}}, + 'Logout', + ) + self.session.create_client.return_value = sso_client + + rc = self.export_creds_cmd( + args=['--revoke-sso-token'], parsed_globals=self.global_args + ) + + self.assertEqual(rc, 0) + sso_client.logout.assert_called_once() + self.assertFalse(os.path.exists(cache_path)) + + def test_credentials_still_emitted_when_revoke_fails(self): + self._wire_profile(self._sso_session_profile_config()) + # Cache contents missing required keys -> revoke is a no-op, but + # output must still be produced. + self._write_token_cache( + start_url='https://example.awsapps.com/start', + session_name='my-session', + contents={'accessToken': ''}, + ) + rc = self.export_creds_cmd( + args=['--revoke-sso-token'], parsed_globals=self.global_args + ) + self.assertEqual(rc, 0) + emitted = json.loads(self.out_stream.getvalue()) + self.assertEqual(emitted['AccessKeyId'], 'access_key') + self.assertEqual(emitted['SecretAccessKey'], 'secret_key') + + def test_flag_off_does_not_revoke(self): + self._wire_profile(self._sso_session_profile_config()) + self._write_token_cache( + start_url='https://example.awsapps.com/start', + session_name='my-session', + contents={'accessToken': 'abc123', 'region': 'us-east-1'}, + ) + rc = self.export_creds_cmd(args=[], parsed_globals=self.global_args) + self.assertEqual(rc, 0) + self.session.create_client.assert_not_called()