diff --git a/smart_tests/args4p/command.py b/smart_tests/args4p/command.py index f1523b1ab..9a9d47bfb 100644 --- a/smart_tests/args4p/command.py +++ b/smart_tests/args4p/command.py @@ -95,12 +95,14 @@ def __call__(self, *_args: str) -> Any: if a in ["--help", "-h"]: print(invoker.command.format_help()) raise Exit(0) + has_inline = False if a.startswith("--") and '=' in a: # --long-format=value a, val = a.split('=', 1) args.insert_front(val) + has_inline = True - invoker.eat_options(a, args) + invoker.eat_options(a, args, has_inline) elif isinstance(invoker.command, Group): invoker = invoker.sub_command(a) else: @@ -613,13 +615,13 @@ def eat_arg(self, arg: str): self.kwargs[a.name] = a.append(self.kwargs.get(a.name), arg) self.nargs += 1 - def eat_options(self, option_name: str, args: ArgList): + def eat_options(self, option_name: str, args: ArgList, has_inline: bool = False): inv: _Invoker | None = self option_names = [] while inv is not None: for o in inv.command.options: if option_name in o.option_names: - inv.kwargs[o.name] = o.append(inv.kwargs.get(o.name), option_name, args) + inv.kwargs[o.name] = o.append(inv.kwargs.get(o.name), option_name, args, has_inline) return else: option_names += o.option_names diff --git a/smart_tests/args4p/decorators.py b/smart_tests/args4p/decorators.py index bcf2df65e..916f93924 100644 --- a/smart_tests/args4p/decorators.py +++ b/smart_tests/args4p/decorators.py @@ -35,7 +35,8 @@ def group(name: Optional[str] = None, help: Optional[str] = None) -> Callable[.. def option( *param_decls: str, help: str | None = None, type: type | Callable | None = None, default: Any = NO_DEFAULT, required: bool = False, - metavar: str | None = None, multiple: bool = False, hidden: bool = False + metavar: str | None = None, multiple: bool = False, hidden: bool = False, + optional_value: bool = False, flag_value: Any = None ) -> Callable: ''' See README.md for usage @@ -60,7 +61,9 @@ def decorator(f: Callable) -> Callable: required=required, metavar=metavar, multiple=multiple, - hidden=hidden) + hidden=hidden, + optional_value=optional_value, + flag_value=flag_value) return _attach(f, o) diff --git a/smart_tests/args4p/option.py b/smart_tests/args4p/option.py index 761b6995d..afbbc57a5 100644 --- a/smart_tests/args4p/option.py +++ b/smart_tests/args4p/option.py @@ -29,7 +29,9 @@ def __init__( required: bool = False, metavar: str | None = None, multiple: bool = False, - hidden: bool = False): + hidden: bool = False, + optional_value: bool = False, + flag_value: Any = None): self.name = name # type: ignore[assignment] # once properly constructed, name is never None self.option_names = option_names self.help = help @@ -39,15 +41,33 @@ def __init__( self.metavar = metavar self.multiple = multiple self.hidden = hidden + # When True, the value is taken ONLY from an inline `--opt=value` form; a bare `--opt` + # (no `=`) binds `flag_value` instead of consuming the following argument. This lets a + # single option behave both as a flag (`--opt`) and as a valued option (`--opt=value`). + self.optional_value = optional_value + self.flag_value = flag_value - def append(self, existing: Any, option_name: str, args): # args is ArgList, but typing it creates a circular import + # 'args' is ArgList, but typing it creates a circular import + def append(self, existing: Any, option_name: str, args, has_inline: bool = False): ''' Given the current value 'existing' that represents the present value to invoke the user function with, this method is called when this option was specified as 'option_name' on the command line. 'args' is pointing at the next argument after 'option_name', which may be the value for this option. + 'has_inline' is True when the option was given as `--opt=value` (the value has been pushed to + the front of 'args' by the parser). ''' - if self.type == bool or self.type == Optional[bool]: + if self.optional_value: + if has_inline: + v = args.eat(option_name) + try: + v = self.type(v) + except ValueError as e: + raise BadCmdLineException(f"Invalid value '{v}' for option '{option_name}': {str(e)}") from e + else: + # bare `--opt`: bind the configured flag value, do NOT consume the next argument. + v = self.flag_value + elif self.type == bool or self.type == Optional[bool]: v = True else: v = args.eat(option_name) diff --git a/smart_tests/args4p/typer/__init__.py b/smart_tests/args4p/typer/__init__.py index 0bbb09edc..d78845e77 100644 --- a/smart_tests/args4p/typer/__init__.py +++ b/smart_tests/args4p/typer/__init__.py @@ -9,14 +9,16 @@ def Option( *option_names: str, help: str | None = None, type: type | Callable | None = None, default: Any = NO_DEFAULT, required: bool = False, - metavar: str | None = None, multiple: bool = False, hidden: bool = False + metavar: str | None = None, multiple: bool = False, hidden: bool = False, + optional_value: bool = False, flag_value: Any = None ) -> _Option: ''' See README.md for usage ''' return _Option(name=None, option_names=list(option_names), help=help, type=type, - default=default, required=required, metavar=metavar, multiple=multiple, hidden=hidden) + default=default, required=required, metavar=metavar, multiple=multiple, hidden=hidden, + optional_value=optional_value, flag_value=flag_value) def Argument( diff --git a/smart_tests/commands/verify.py b/smart_tests/commands/verify.py index 5bc502686..3ccedc364 100644 --- a/smart_tests/commands/verify.py +++ b/smart_tests/commands/verify.py @@ -1,24 +1,35 @@ +import base64 +import binascii +import json import os import platform import re import subprocess -from typing import List +import urllib.parse +from typing import Annotated, List import click +import requests import smart_tests.args4p.typer as typer from smart_tests.utils.tracking import Tracking, TrackingClient from .. import args4p from ..app import Application -from ..utils.authentication import ensure_org_workspace, get_org_workspace +from ..utils.authentication import ensure_org_workspace, get_oidc_token, get_org_workspace from ..utils.commands import Command -from ..utils.env_keys import TOKEN_KEY +from ..utils.env_keys import OIDC_TOKEN_KEY, ORGANIZATION_KEY, TOKEN_KEY, WORKSPACE_KEY +from ..utils.http_client import DEFAULT_GET_TIMEOUT, _HttpClient from ..utils.java import get_java_command from ..utils.smart_tests_client import SmartTestsClient from ..utils.typer_types import emoji from ..version import __version__ as version +# Credential-free OIDC bootstrap endpoint. Served at /intake/oidc/verify (the app runs under the +# /intake context-path). It is intentionally NOT workspace-scoped: it self-verifies the presented +# OIDC token and resolves the org/workspace it maps to. +OIDC_VERIFY_PATH = "/intake/oidc/verify" + def parse_version(version_string: str) -> List[int]: """Parse version string and extract numeric parts. @@ -69,12 +80,37 @@ def check_java_version(javacmd: str) -> int: @args4p.command(help="Verify CLI setup and connectivity") -def verify(app_instance: Application): +def verify( + app_instance: Application, + oidc: Annotated[bool, typer.Option( + "--oidc", + help="Authenticate this pipeline with its OIDC id-token (from " + OIDC_TOKEN_KEY + ") " + "instead of an API key, and resolve the org/workspace it is registered to " + "(the credential-free bootstrap).")] = False, + oidc_fetch_issuer: Annotated[bool, typer.Option( + "--oidc-fetch-issuer", + help="Run this from INSIDE a private network to fetch the OIDC issuer's public JWKS (read " + "from the id-token in " + OIDC_TOKEN_KEY + ") and print a block an admin pastes into " + "the WebApp (Trusted OIDC issuers). Use this when Intake cannot reach the issuer " + "directly, so its keys must be registered for manual verification. This authenticates " + "nothing and contacts only the issuer.")] = False, +): # Run the verification (no subcommands in this app) # In this command, regardless of REPORT_ERROR_KEY, always report an unexpected error with full stack trace # to assist troubleshooting. `typer.BadParameter` is handled by the invoking # Click gracefully. + if oidc and oidc_fetch_issuer: + click.secho( + "Use either --oidc or --oidc-fetch-issuer, not both.", fg='red', err=True) + raise typer.Exit(2) + if oidc_fetch_issuer: + fetch_oidc_issuer(app_instance) + return + if oidc: + verify_oidc(app_instance) + return + org, workspace = get_org_workspace() tracking_client = TrackingClient(Command.VERIFY, app=app_instance) client = SmartTestsClient(tracking_client=tracking_client, app=app_instance) @@ -161,3 +197,260 @@ def verify(app_instance: Application): raise typer.Exit(1) click.secho("Your CLI configuration is successfully verified" + emoji(" \U0001f389"), fg='green') + + +def verify_oidc(app_instance: Application): + ''' + Credential-free OIDC bootstrap. Presents the pipeline's OIDC id-token to Intake's + /intake/oidc/verify endpoint and translates the 200/403/401 contract into CLI behavior: + + - 200: the subject is registered. Print `export` lines (org/workspace/oidc-token) so the + pipeline can `eval "$(smart-tests verify --oidc)"` and authenticate subsequent + commands with the same token. Exit 0. + - 403: the token verified but its subject isn't registered to any workspace yet. Show the + normalized `sub` so the user can register it from the WebApp settings. Exit 1. + - 401: the token is missing/expired/invalid. Exit 1. + ''' + tracking_client = TrackingClient(Command.VERIFY, app=app_instance) + + token = get_oidc_token() + if not token: + msg = (f"OIDC authentication requires the {OIDC_TOKEN_KEY} environment variable to hold the " + "pipeline's OIDC id-token. In Jenkins, bind an id-token credential to this variable; " + "see the OIDC pipeline-authentication setup guide.") + click.secho(msg, fg='red', err=True) + tracking_client.send_error_event( + event_name=Tracking.ErrorEvent.USER_ERROR, + stack_trace=msg, + ) + raise typer.Exit(2) + + # The endpoint is not workspace-scoped, so we bypass SmartTestsClient (which requires an + # org/workspace) and call the low-level client directly. The OIDC token is supplied as the + # bearer by authentication_headers(). + http_client = _HttpClient(app=app_instance) + try: + res = http_client.request("post", OIDC_VERIFY_PATH, payload={}) + except Exception as e: + tracking_client.send_error_event( + event_name=Tracking.ErrorEvent.INTERNAL_CLI_ERROR, + stack_trace=str(e), + api="oidc/verify", + ) + click.secho(f"Could not reach the OIDC verification endpoint: {e}", fg='red', err=True) + raise typer.Exit(1) + + if res.status_code == 401: + msg = ("OIDC authentication failed: the presented token was rejected (invalid, expired, or " + "from an unrecognized issuer).") + click.secho(msg, fg='red', err=True) + tracking_client.send_error_event( + event_name=Tracking.ErrorEvent.USER_ERROR, + stack_trace=msg, + ) + raise typer.Exit(1) + + if res.status_code == 403: + issuer = "" + sub = "" + try: + body = res.json() + issuer = body.get("issuer", "") + sub = body.get("sub", "") + except Exception: + pass + click.secho( + "This pipeline's OIDC identity is not yet registered with Smart Tests.", fg='yellow', err=True) + if issuer and sub: + # Emit a copy/paste block the user pastes verbatim into the WebApp's + # "Trusted OIDC subjects" registration form (Settings page). The keys match what the + # WebApp parses: "issuer" and "normalized-sub". + block = json.dumps({"issuer": issuer, "normalized-sub": sub}, indent=4) + click.secho( + "Please copy and paste the block below into your workspace settings " + "(Trusted OIDC subjects) to authorize this pipeline:", + fg='yellow', err=True) + click.echo("########## start ##########", err=True) + click.echo(block, err=True) + click.echo("########## end ##########", err=True) + tracking_client.send_error_event( + event_name=Tracking.ErrorEvent.USER_ERROR, + stack_trace=f"unregistered OIDC subject: {sub}", + ) + raise typer.Exit(1) + + res.raise_for_status() + + data = res.json() + org = data.get("organization") + workspace = data.get("workspace") + if not org or not workspace: + msg = "OIDC verification returned an unexpected response (missing organization/workspace)." + click.secho(msg, fg='red', err=True) + tracking_client.send_error_event( + event_name=Tracking.ErrorEvent.INTERNAL_SERVER_ERROR, + stack_trace=msg, + api="oidc/verify", + ) + raise typer.Exit(1) + + # Emit eval-able export lines so the pipeline can hydrate its environment: + # eval "$(smart-tests verify --oidc)" + # Subsequent commands then read org/workspace from these vars and present the same OIDC token + # (kept in SMART_TESTS_OIDC_TOKEN) as their bearer. + click.echo(f'export {ORGANIZATION_KEY}={_shell_quote(org)}') + click.echo(f'export {WORKSPACE_KEY}={_shell_quote(workspace)}') + click.echo(f'export {OIDC_TOKEN_KEY}={_shell_quote(token)}') + click.secho( + f"OIDC authentication verified for organization {org!r}, workspace {workspace!r}" + emoji(" \U0001f389"), + fg='green', err=True) + + +def fetch_oidc_issuer(app_instance: Application): + ''' + Helper for issuers Intake cannot reach (e.g. a private Jenkins), enabling manual verification. + This runs INSIDE the private network — where the issuer's discovery endpoint IS reachable — + reads the pipeline's OIDC id-token, extracts the `iss` claim, fetches that issuer's public JWKS + via standard OIDC discovery, and prints an `{issuer, jwks}` copy/paste block. + + An admin then pastes that block into the WebApp (Settings → Trusted OIDC issuers) to register + the issuer. This command deliberately does NOT register anything itself and never sends the JWKS + to Intake: the verification key must never travel to the credential-free `verify` endpoint on the + same channel as the token it verifies. Registration is an authenticated admin action; this is + transport only. + + A JWKS contains only PUBLIC keys, so printing/pasting it exposes no secret. + ''' + tracking_client = TrackingClient(Command.VERIFY, app=app_instance) + + token = get_oidc_token() + if not token: + msg = (f"--oidc-fetch-issuer requires the {OIDC_TOKEN_KEY} environment variable to hold the " + "pipeline's OIDC id-token so its issuer can be discovered. In Jenkins, bind an " + "id-token credential to this variable; see the OIDC pipeline-authentication setup guide.") + click.secho(msg, fg='red', err=True) + tracking_client.send_error_event( + event_name=Tracking.ErrorEvent.USER_ERROR, + stack_trace=msg, + ) + raise typer.Exit(2) + + try: + issuer = _issuer_from_jwt(token) + except ValueError as e: + msg = f"Could not read the issuer (iss) from {OIDC_TOKEN_KEY}: {e}" + click.secho(msg, fg='red', err=True) + tracking_client.send_error_event( + event_name=Tracking.ErrorEvent.USER_ERROR, + stack_trace=msg, + ) + raise typer.Exit(2) + + click.secho(f"Discovering the JWKS for issuer {issuer!r} from inside this network...", + fg='yellow', err=True) + try: + jwks = _fetch_issuer_jwks(issuer, app_instance) + except Exception as e: + msg = (f"Could not fetch the JWKS for {issuer!r}: {e}. " + "Run this from a host that can reach the issuer's OIDC discovery endpoint.") + click.secho(msg, fg='red', err=True) + tracking_client.send_error_event( + event_name=Tracking.ErrorEvent.INTERNAL_CLI_ERROR, + stack_trace=str(e), + api="oidc-discovery", + ) + raise typer.Exit(1) + + # Emit the copy/paste block. Keys match what the WebApp's parseTrustedOidcIssuerBlock() expects: + # "issuer" and "jwks" (jwks as a nested JSON object). + block = json.dumps({"issuer": issuer, "jwks": jwks}, indent=4) + click.secho( + "Copy the block below and paste it into the WebApp (Settings -> Trusted OIDC issuers). " + "NOTE: a registered issuer is a platform-wide trust anchor, so this must be done by an " + "organization admin who trusts this issuer's keys:", + fg='yellow', err=True) + click.echo("########## start ##########") + click.echo(block) + click.echo("########## end ##########") + + +def _issuer_from_jwt(token: str) -> str: + ''' + Extract the `iss` claim from an unverified JWT. We do NOT verify the signature here — this is a + local convenience to learn which issuer's JWKS to fetch; the resulting JWKS is what Intake later + uses to verify tokens for real. + ''' + parts = token.split(".") + if len(parts) != 3: + raise ValueError("not a well-formed JWT (expected three dot-separated segments)") + try: + payload_raw = base64.urlsafe_b64decode(parts[1] + "=" * (-len(parts[1]) % 4)) + payload = json.loads(payload_raw) + except (binascii.Error, ValueError) as e: + raise ValueError(f"could not decode the JWT payload: {e}") from e + issuer = payload.get("iss") + if not issuer or not isinstance(issuer, str): + raise ValueError("the token has no 'iss' claim") + return issuer + + +def _fetch_issuer_jwks(issuer: str, app_instance: Application) -> dict: + ''' + Standard OIDC discovery, mirroring the backend's HttpJwksUriDiscovery: + GET {iss}/.well-known/openid-configuration -> jwks_uri -> GET jwks_uri. + Returns the parsed JWKS document (a dict with a non-empty "keys" list). + + Talks to the issuer's OWN host (absolute URLs), not the Smart Tests base URL, so we use + `requests` directly rather than _HttpClient. Public discovery/JWKS endpoints need no auth. + Honors --skip-cert-verification and the standard HTTPS_PROXY environment variable. + ''' + verify_tls = not app_instance.skip_cert_verification + + config_url = issuer.rstrip("/") + "/.well-known/openid-configuration" + res = requests.get(config_url, timeout=DEFAULT_GET_TIMEOUT, verify=verify_tls) + res.raise_for_status() + jwks_uri = res.json().get("jwks_uri") + if not jwks_uri or not isinstance(jwks_uri, str): + raise ValueError("the issuer's discovery document has no 'jwks_uri'") + + # SSRF guard: manual mode runs inside the private network with no backend PrivateNetworkGuard, + # so a tampered discovery document could point jwks_uri at an arbitrary internal host (cloud + # metadata, another internal service). A self-hosted issuer (e.g. Jenkins) always advertises a + # same-origin jwks_uri, so require it to live on the issuer's own host/scheme rather than + # following wherever the (unverified) document points. + if not _same_origin(issuer, jwks_uri): + raise ValueError( + f"the discovery document's jwks_uri {jwks_uri!r} is not on the issuer's origin " + f"{issuer!r}; refusing to fetch keys from a different host") + + res = requests.get(jwks_uri, timeout=DEFAULT_GET_TIMEOUT, verify=verify_tls) + res.raise_for_status() + jwks = res.json() + keys = jwks.get("keys") if isinstance(jwks, dict) else None + if not keys: + raise ValueError("the fetched JWKS has no keys") + return jwks + + +def _same_origin(a: str, b: str) -> bool: + ''' + True iff URLs `a` and `b` share the same (scheme, host, port) origin. Used to require an + issuer's advertised jwks_uri to live on the issuer's own host — a self-hosted OIDC provider + always does, and it prevents a tampered discovery document from redirecting the JWKS fetch to + an arbitrary host. Comparison is case-insensitive on scheme/host and normalizes the default + port for http/https. + ''' + def origin(url: str): + p = urllib.parse.urlsplit(url) + scheme = p.scheme.lower() + host = (p.hostname or "").lower() + default_port = {"http": 80, "https": 443}.get(scheme) + port = p.port if p.port is not None else default_port + return (scheme, host, port) + + return origin(a) == origin(b) + + +def _shell_quote(value: str) -> str: + """Single-quote a value for safe use in a POSIX `export VAR=...` line.""" + return "'" + value.replace("'", "'\\''") + "'" diff --git a/smart_tests/utils/authentication.py b/smart_tests/utils/authentication.py index ff4395e7e..831c8bd77 100644 --- a/smart_tests/utils/authentication.py +++ b/smart_tests/utils/authentication.py @@ -6,7 +6,7 @@ import smart_tests.args4p.typer as typer -from .env_keys import ORGANIZATION_KEY, WORKSPACE_KEY, get_token +from .env_keys import OIDC_TOKEN_KEY, ORGANIZATION_KEY, WORKSPACE_KEY, get_token def get_org_workspace(): @@ -39,11 +39,27 @@ def ensure_org_workspace() -> Tuple[str, str]: return org, workspace +def get_oidc_token(): + ''' + Returns the CI-issued OIDC id-token (e.g. a Jenkins-minted RS256 JWT) from the environment, + or None if not set. This is the same token `smart-tests verify --oidc` exchanges for an + org/workspace, and the one subsequent workspace-scoped calls present as their bearer. + ''' + return os.getenv(OIDC_TOKEN_KEY) + + def authentication_headers(): token = get_token() if token: return {'Authorization': f'Bearer {token}'} + # A pipeline that authenticated via `verify --oidc` carries no SMART_TESTS_TOKEN; it presents + # its OIDC id-token directly. Intake routes this by `iss` (RESTAuthVerifier) to the generic OIDC + # verifier, so subsequent workspace-scoped calls authenticate with the same JWT. + oidc_token = get_oidc_token() + if oidc_token: + return {'Authorization': f'Bearer {oidc_token}'} + if os.getenv('EXPERIMENTAL_GITHUB_OIDC_TOKEN_AUTH'): req_url = os.getenv('ACTIONS_ID_TOKEN_REQUEST_URL') rt_token = os.getenv('ACTIONS_ID_TOKEN_REQUEST_TOKEN') diff --git a/smart_tests/utils/env_keys.py b/smart_tests/utils/env_keys.py index b4ad2ff9d..5d9f8e408 100644 --- a/smart_tests/utils/env_keys.py +++ b/smart_tests/utils/env_keys.py @@ -2,6 +2,10 @@ REPORT_ERROR_KEY = "SMART_TESTS_REPORT_ERROR" TOKEN_KEY = "SMART_TESTS_TOKEN" +# OIDC id-token presented by a CI pipeline (e.g. a Jenkins-minted RS256 JWT). Used by +# `smart-tests verify --oidc` for credential-free bootstrap and as the bearer for subsequent +# workspace-scoped calls. See authentication.authentication_headers(). +OIDC_TOKEN_KEY = "SMART_TESTS_OIDC_TOKEN" ORGANIZATION_KEY = "SMART_TESTS_ORGANIZATION" WORKSPACE_KEY = "SMART_TESTS_WORKSPACE" BASE_URL_KEY = "SMART_TESTS_BASE_URL" diff --git a/tests/args4p/test_command.py b/tests/args4p/test_command.py index e3de571ad..1a252112b 100644 --- a/tests/args4p/test_command.py +++ b/tests/args4p/test_command.py @@ -1,4 +1,4 @@ -from typing import Annotated, Optional +from typing import Annotated, List, Optional from unittest import TestCase import smart_tests.args4p as args4p @@ -68,6 +68,29 @@ def cli(foo: int): cli("--foo", "5") self.assertEqual(v, 5) + def test_optional_value_option(self): + """An option with optional_value binds flag_value when bare, and the inline value when given + as --opt=value, WITHOUT consuming the following argument in the bare form.""" + seen = [] + + @args4p.command() + @args4p.argument("rest", multiple=True, required=False) + @args4p.option("--mode", "mode", optional_value=True, flag_value="auto") + def cli(mode: Optional[str] = None, rest: Optional[List[str]] = None): + seen.append((mode, list(rest or []))) + + # not given at all -> default + cli() + self.assertEqual(seen[-1], (None, [])) + + # bare -> flag_value, and does NOT swallow the trailing positional + cli("--mode", "leftover") + self.assertEqual(seen[-1], ("auto", ["leftover"])) + + # inline value + cli("--mode=manual") + self.assertEqual(seen[-1], ("manual", [])) + def test_command_with_arguments(self): """Test command with positional arguments""" received_args = [] diff --git a/tests/commands/test_verify.py b/tests/commands/test_verify.py index 994c01716..a1318d037 100644 --- a/tests/commands/test_verify.py +++ b/tests/commands/test_verify.py @@ -1,3 +1,5 @@ +import base64 +import json import os from subprocess import CalledProcessError from unittest import TestCase @@ -107,3 +109,181 @@ def test_verify_fallback_when_no_display_name(self): # Should show original org/workspace names self.assertIn(f"'{self.organization}'", result.output) self.assertIn(f"'{self.workspace}'", result.output) + + +class VerifyOidcCommandTest(CliTestCase): + """Test the credential-free `verify --oidc` bootstrap flow.""" + + oidc_token = "header.payload.signature" + base_url = "http://localhost:8080" + oidc_verify_url = f"{base_url}/intake/oidc/verify" + oidc_env = {"SMART_TESTS_OIDC_TOKEN": oidc_token, "SMART_TESTS_BASE_URL": base_url} + + @responses.activate + @patch.dict(os.environ, oidc_env, clear=True) + def test_oidc_registered_emits_exports(self): + """200 → print eval-able export lines for org/workspace/token, exit 0.""" + responses.add( + responses.POST, + self.oidc_verify_url, + json={"organization": "acme", "workspace": "prod"}, + status=200, + ) + + result = self.cli("verify", "--oidc") + self.assert_success(result) + + self.assertIn("export SMART_TESTS_ORGANIZATION='acme'", result.output) + self.assertIn("export SMART_TESTS_WORKSPACE='prod'", result.output) + self.assertIn(f"export SMART_TESTS_OIDC_TOKEN='{self.oidc_token}'", result.output) + + # The token must be presented as the bearer to the verify endpoint. + self.assertEqual(responses.calls[0].request.headers["Authorization"], f"Bearer {self.oidc_token}") + + @responses.activate + @patch.dict(os.environ, oidc_env, clear=True) + def test_oidc_unregistered_shows_paste_block(self): + """403 → show the copy/paste registration block (issuer + normalized-sub), exit 1, no exports.""" + issuer = "https://jenkins.example.com/oidc" + sub = "https://jenkins.example.com/job/my-pipeline/" + responses.add( + responses.POST, + self.oidc_verify_url, + json={"issuer": issuer, "sub": sub}, + status=403, + ) + + result = self.cli("verify", "--oidc") + self.assert_exit_code(result, 1) + self.assertIn("########## start ##########", result.output) + self.assertIn("########## end ##########", result.output) + self.assertIn(issuer, result.output) + self.assertIn(sub, result.output) + self.assertIn("normalized-sub", result.output) + self.assertNotIn("export SMART_TESTS_ORGANIZATION", result.output) + + @responses.activate + @patch.dict(os.environ, oidc_env, clear=True) + def test_oidc_invalid_token_fails(self): + """401 → authentication failed, exit 1.""" + responses.add( + responses.POST, + self.oidc_verify_url, + json={"reason": "bad token"}, + status=401, + ) + + result = self.cli("verify", "--oidc") + self.assert_exit_code(result, 1) + self.assertNotIn("export SMART_TESTS_ORGANIZATION", result.output) + + @patch.dict(os.environ, {}, clear=True) + def test_oidc_missing_token(self): + """No OIDC token in the environment → usage error, exit 2.""" + result = self.cli("verify", "--oidc") + self.assert_exit_code(result, 2) + self.assertIn("SMART_TESTS_OIDC_TOKEN", result.output) + + +def _make_jwt(claims: dict) -> str: + """Build an unsigned-looking JWT (header.payload.signature) with the given claims payload.""" + def seg(obj): + raw = json.dumps(obj).encode() + return base64.urlsafe_b64encode(raw).rstrip(b"=").decode() + return f"{seg({'alg': 'RS256', 'typ': 'JWT'})}.{seg(claims)}.signature" + + +class VerifyOidcFetchIssuerCommandTest(CliTestCase): + """Test `verify --oidc-fetch-issuer`: discover the issuer's JWKS from inside the private network + and print an {issuer, jwks} paste block. It must NOT hit the credential-free verify endpoint.""" + + issuer = "http://jenkins.internal:8080/oidc" + token = _make_jwt({"iss": issuer, "sub": "http://jenkins.internal:8080/job/pipeline/"}) + jwks = {"keys": [{"kty": "RSA", "kid": "k1", "n": "AAAA", "e": "AQAB"}]} + + def _mock_discovery(self): + responses.add( + responses.GET, + f"{self.issuer}/.well-known/openid-configuration", + json={"jwks_uri": f"{self.issuer}/jwks"}, + status=200, + ) + responses.add(responses.GET, f"{self.issuer}/jwks", json=self.jwks, status=200) + + @responses.activate + @patch.dict(os.environ, {"SMART_TESTS_OIDC_TOKEN": token}, clear=True) + def test_fetch_issuer_prints_issuer_jwks_block(self): + """--oidc-fetch-issuer → discover JWKS, print {issuer, jwks} block, exit 0.""" + self._mock_discovery() + + result = self.cli("verify", "--oidc-fetch-issuer") + self.assert_success(result) + + self.assertIn("########## start ##########", result.output) + self.assertIn("########## end ##########", result.output) + + # The stdout block must parse to {issuer, jwks} with the discovered keys. + start = result.output.index("########## start ##########") + len("########## start ##########") + end = result.output.index("########## end ##########") + parsed = json.loads(result.output[start:end]) + self.assertEqual(parsed["issuer"], self.issuer) + self.assertEqual(parsed["jwks"], self.jwks) + + # It must NOT call the credential-free verify endpoint — the key never travels with the token. + for call in responses.calls: + self.assertNotIn("/oidc/verify", call.request.url) + + @patch.dict(os.environ, {}, clear=True) + def test_fetch_issuer_missing_token(self): + """No OIDC token → usage error, exit 2.""" + result = self.cli("verify", "--oidc-fetch-issuer") + self.assert_exit_code(result, 2) + self.assertIn("SMART_TESTS_OIDC_TOKEN", result.output) + + @patch.dict(os.environ, {"SMART_TESTS_OIDC_TOKEN": "not-a-jwt"}, clear=True) + def test_fetch_issuer_malformed_token(self): + """Token that isn't a well-formed JWT → exit 2, mentions iss.""" + result = self.cli("verify", "--oidc-fetch-issuer") + self.assert_exit_code(result, 2) + self.assertIn("iss", result.output) + + @responses.activate + @patch.dict(os.environ, {"SMART_TESTS_OIDC_TOKEN": token}, clear=True) + def test_fetch_issuer_unreachable_issuer_fails(self): + """Discovery endpoint unreachable/404 → exit 1, no paste block.""" + responses.add( + responses.GET, + f"{self.issuer}/.well-known/openid-configuration", + status=404, + ) + result = self.cli("verify", "--oidc-fetch-issuer") + self.assert_exit_code(result, 1) + self.assertNotIn("########## start ##########", result.output) + + @responses.activate + @patch.dict(os.environ, {"SMART_TESTS_OIDC_TOKEN": token}, clear=True) + def test_fetch_issuer_cross_origin_jwks_uri_rejected(self): + """A tampered discovery document whose jwks_uri points at a different host is rejected: + exit 1, no paste block, and the off-origin JWKS URL is never fetched.""" + evil_jwks_uri = "http://169.254.169.254/latest/meta-data/jwks" + responses.add( + responses.GET, + f"{self.issuer}/.well-known/openid-configuration", + json={"jwks_uri": evil_jwks_uri}, + status=200, + ) + responses.add(responses.GET, evil_jwks_uri, json=self.jwks, status=200) + + result = self.cli("verify", "--oidc-fetch-issuer") + self.assert_exit_code(result, 1) + self.assertNotIn("########## start ##########", result.output) + + # The off-origin jwks_uri must never be fetched. + for call in responses.calls: + self.assertNotIn("169.254.169.254", call.request.url) + + @patch.dict(os.environ, {"SMART_TESTS_OIDC_TOKEN": token}, clear=True) + def test_oidc_and_fetch_issuer_mutually_exclusive(self): + """--oidc together with --oidc-fetch-issuer → usage error, exit 2.""" + result = self.cli("verify", "--oidc", "--oidc-fetch-issuer") + self.assert_exit_code(result, 2)