diff --git a/pyproject.toml b/pyproject.toml index fdc313f..95076cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,8 @@ dependencies = [ "websockets", "markdown", "anthropic", + "httpx>=0.27", + "cryptography>=41.0", ] [project.optional-dependencies] @@ -30,6 +32,9 @@ dev = [ "pytest-asyncio", "ruff", ] +spiffe = [ + "pyspiffe>=0.8", +] [project.scripts] trellis = "trellis.cli:app" diff --git a/tests/test_identity.py b/tests/test_identity.py new file mode 100644 index 0000000..52c04b7 --- /dev/null +++ b/tests/test_identity.py @@ -0,0 +1,565 @@ +"""Tests for trellis/core/identity — SPIFFE identity federation and forge token exchange.""" + +from __future__ import annotations + +import asyncio +import base64 +import json +import os +import re +import stat +import tempfile +import time +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from trellis.core.identity.forge import ( + ForgeCredential, + ForgeTokenExchanger, + GitHubTokenExchanger, + GitLabTokenExchanger, + ForgejoTokenExchanger, + _validate_branch_pattern, + _validate_forge_url, + _validate_repos, + resolve_forge_exchanger, +) +from trellis.core.identity.manager import ForgeTokenManager, create_token_manager +from trellis.core.identity.provider import ( + IdentityProvider, + SpiffeIdentityProvider, + _parse_jwt_exp, + resolve_identity_provider, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_jwt(claims: dict, header: dict | None = None) -> str: + """Build a minimal unsigned JWT for testing.""" + hdr = header or {"alg": "none", "typ": "JWT"} + parts = [] + for payload in (hdr, claims): + encoded = base64.urlsafe_b64encode( + json.dumps(payload).encode() + ).rstrip(b"=").decode() + parts.append(encoded) + parts.append("") # empty signature + return ".".join(parts) + + +class MockIdentityProvider(IdentityProvider): + """Test double for IdentityProvider.""" + + def __init__(self, token: str = "mock-oidc-jwt"): + self._token = token + self.call_count = 0 + + async def get_token(self, audience: str) -> str: + self.call_count += 1 + return self._token + + def provider_name(self) -> str: + return "mock" + + +class MockForgeExchanger(ForgeTokenExchanger): + """Test double for ForgeTokenExchanger.""" + + def __init__(self, token: str = "ghp_mock_token_1234", ttl: float = 3600): + self._token = token + self._ttl = ttl + self.call_count = 0 + self.last_oidc_token: str | None = None + self.last_repos: list[str] | None = None + self.last_permissions: dict[str, str] | None = None + + async def exchange( + self, + oidc_token: str, + repos: list[str] | None = None, + permissions: dict[str, str] | None = None, + ) -> ForgeCredential: + self.call_count += 1 + self.last_oidc_token = oidc_token + self.last_repos = repos + self.last_permissions = permissions + return ForgeCredential( + token=self._token, + expires_at=time.time() + self._ttl, + scopes=list((permissions or {}).keys()), + forge_type="github", + ) + + +# =================================================================== +# Provider tests +# =================================================================== + +class TestParseJwtExp: + def test_valid_jwt(self): + exp = time.time() + 3600 + token = _make_jwt({"exp": exp, "sub": "test"}) + assert _parse_jwt_exp(token) == pytest.approx(exp) + + def test_missing_exp(self): + token = _make_jwt({"sub": "test"}) + assert _parse_jwt_exp(token) is None + + def test_invalid_jwt(self): + assert _parse_jwt_exp("not-a-jwt") is None + + def test_empty_string(self): + assert _parse_jwt_exp("") is None + + +class TestSpiffeIdentityProvider: + def test_validate_trust_domain_rejects_slashes(self): + with pytest.raises(ValueError, match="path separators"): + SpiffeIdentityProvider( + socket_path="/tmp/nonexistent.sock", + trust_domain="../../etc/passwd", + ) + + def test_validate_trust_domain_rejects_wildcards(self): + with pytest.raises(ValueError, match="wildcards"): + SpiffeIdentityProvider( + socket_path="/tmp/nonexistent.sock", + trust_domain="*.example.com", + ) + + def test_validate_trust_domain_rejects_empty(self): + with pytest.raises(ValueError, match="must not be empty"): + SpiffeIdentityProvider( + socket_path="/tmp/nonexistent.sock", + trust_domain="", + ) + + def test_validate_socket_path_rejects_regular_file(self): + with tempfile.NamedTemporaryFile() as f: + with pytest.raises(ValueError, match="not a Unix domain socket"): + SpiffeIdentityProvider( + socket_path=f.name, + trust_domain="trellis.local", + ) + + def test_validate_socket_path_rejects_empty(self): + with pytest.raises(ValueError, match="must not be empty"): + SpiffeIdentityProvider(socket_path="", trust_domain="trellis.local") + + def test_nonexistent_socket_ok_at_init(self): + # Socket may not exist yet (SPIRE agent starts later) + provider = SpiffeIdentityProvider( + socket_path="/tmp/definitely-not-a-real-socket-path-xyz", + trust_domain="trellis.local", + ) + assert provider.provider_name() == "spiffe/trellis.local" + + async def test_caches_token(self): + provider = SpiffeIdentityProvider( + socket_path="/tmp/nonexistent.sock", + trust_domain="trellis.local", + skew_seconds=5.0, + ) + exp = time.time() + 300 + mock_token = _make_jwt({"exp": exp, "aud": "test"}) + + with patch.object(provider, "_fetch_jwt_svid", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = mock_token + t1 = await provider.get_token("test-audience") + t2 = await provider.get_token("test-audience") + assert t1 == t2 + assert mock_fetch.call_count == 1 # only one fetch, second was cached + + async def test_refreshes_expired_token(self): + provider = SpiffeIdentityProvider( + socket_path="/tmp/nonexistent.sock", + trust_domain="trellis.local", + skew_seconds=5.0, + ) + # Token that expires very soon + exp_soon = time.time() + 2 + token_soon = _make_jwt({"exp": exp_soon, "aud": "test"}) + exp_later = time.time() + 300 + token_later = _make_jwt({"exp": exp_later, "aud": "test"}) + + with patch.object(provider, "_fetch_jwt_svid", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = token_soon + t1 = await provider.get_token("test") + assert t1 == token_soon + + # Token is within skew window, should refetch + mock_fetch.return_value = token_later + t2 = await provider.get_token("test") + assert t2 == token_later + assert mock_fetch.call_count == 2 + + +class TestResolveIdentityProvider: + def test_none_returns_none(self): + assert resolve_identity_provider(provider_type="none") is None + + def test_auto_no_socket_returns_none(self): + result = resolve_identity_provider( + provider_type="auto", + spiffe_endpoint_socket="/tmp/nonexistent-socket-xyz", + ) + assert result is None + + def test_spiffe_missing_socket_raises(self): + with pytest.raises(FileNotFoundError): + resolve_identity_provider( + provider_type="spiffe", + spiffe_endpoint_socket="/tmp/nonexistent-socket-xyz", + ) + + def test_unknown_type_raises(self): + with pytest.raises(ValueError, match="Unknown identity provider"): + resolve_identity_provider(provider_type="kubernetes") + + +# =================================================================== +# Forge exchanger tests +# =================================================================== + +class TestForgeCredential: + def test_repr_masks_token(self): + cred = ForgeCredential( + token="ghp_abcdef1234567890", + expires_at=time.time() + 3600, + scopes=["contents"], + forge_type="github", + ) + r = repr(cred) + assert "ghp_abcdef1234567890" not in r + assert "****7890" in r + + def test_str_masks_token(self): + cred = ForgeCredential( + token="glpat-abcdef1234567890", + expires_at=time.time() + 3600, + scopes=[], + forge_type="gitlab", + ) + assert "glpat-abcdef1234567890" not in str(cred) + + def test_fingerprint(self): + cred = ForgeCredential( + token="ghp_abc123xyz", + expires_at=time.time() + 3600, + scopes=[], + forge_type="github", + ) + assert cred.fingerprint == "****3xyz" + + def test_is_expired(self): + cred = ForgeCredential( + token="tok", expires_at=time.time() - 10, + scopes=[], forge_type="github", + ) + assert cred.is_expired + + def test_not_expired(self): + cred = ForgeCredential( + token="tok", expires_at=time.time() + 3600, + scopes=[], forge_type="github", + ) + assert not cred.is_expired + + +class TestValidation: + def test_forge_url_rejects_http(self): + with pytest.raises(ValueError, match="HTTPS"): + _validate_forge_url("http://github.com") + + def test_forge_url_accepts_https(self): + _validate_forge_url("https://github.com") # no exception + + def test_repos_rejects_wildcards(self): + with pytest.raises(ValueError, match="wildcards"): + _validate_repos(["org/*"]) + + def test_repos_rejects_bad_format(self): + with pytest.raises(ValueError, match="owner/repo"): + _validate_repos(["just-a-repo"]) + + def test_repos_accepts_valid(self): + _validate_repos(["org/repo", "other/repo2"]) # no exception + + def test_branch_pattern_rejects_invalid_regex(self): + with pytest.raises(ValueError, match="Invalid branch pattern"): + _validate_branch_pattern("[invalid") + + def test_branch_pattern_accepts_valid(self): + _validate_branch_pattern(r"agent/implementation/.*") # no exception + + def test_branch_pattern_accepts_empty(self): + _validate_branch_pattern("") # no exception + + def test_branch_pattern_uses_fullmatch_semantics(self): + """Pattern 'main' should not match 'main-evil' with fullmatch.""" + pattern = "main" + assert re.fullmatch(pattern, "main") is not None + assert re.fullmatch(pattern, "main-evil") is None + + +class TestGitHubTokenExchanger: + def test_rejects_http_url(self): + with pytest.raises(ValueError, match="HTTPS"): + GitHubTokenExchanger( + forge_url="http://github.com", + app_id="123", + installation_id="456", + private_key_path="/tmp/nonexistent.pem", + ) + + def test_validates_pem_permissions(self): + with tempfile.NamedTemporaryFile(suffix=".pem", delete=False) as f: + f.write(b"fake-key") + f.flush() + # Make world-readable + os.chmod(f.name, 0o644) + try: + with pytest.raises(PermissionError, match="group/world readable"): + GitHubTokenExchanger( + forge_url="https://github.com", + app_id="123", + installation_id="456", + private_key_path=f.name, + ) + finally: + os.unlink(f.name) + + def test_missing_pem_raises(self): + with pytest.raises(FileNotFoundError): + GitHubTokenExchanger( + forge_url="https://github.com", + app_id="123", + installation_id="456", + private_key_path="/tmp/nonexistent-pem-xyz.pem", + ) + + +class TestResolveForgeExchanger: + def test_empty_type_returns_none(self): + assert resolve_forge_exchanger(forge_type="", forge_url="") is None + + def test_unknown_type_raises(self): + with pytest.raises(ValueError, match="Unknown forge type"): + resolve_forge_exchanger(forge_type="bitbucket", forge_url="https://bb.com") + + def test_github_missing_config_raises(self): + with pytest.raises(ValueError, match="github_app_id"): + resolve_forge_exchanger( + forge_type="github", + forge_url="https://github.com", + ) + + def test_forgejo_missing_url_raises(self): + with pytest.raises(ValueError, match="explicit forge_url"): + resolve_forge_exchanger( + forge_type="forgejo", + forge_url="", + github_app_id="1", + github_app_installation_id="2", + github_app_private_key_path="/tmp/nonexistent.pem", + ) + + def test_gitlab_creates_exchanger(self): + exchanger = resolve_forge_exchanger( + forge_type="gitlab", + forge_url="https://gitlab.com", + ) + assert isinstance(exchanger, GitLabTokenExchanger) + + +# =================================================================== +# Token Manager tests +# =================================================================== + +class TestForgeTokenManager: + async def test_get_token_end_to_end(self): + provider = MockIdentityProvider(token="oidc-jwt") + exchanger = MockForgeExchanger(token="ghp_test123", ttl=3600) + manager = ForgeTokenManager(provider, exchanger, audience="github") + + cred = await manager.get_forge_token(repos=["org/repo"]) + assert cred is not None + assert cred.token == "ghp_test123" + assert exchanger.last_oidc_token == "oidc-jwt" + assert exchanger.last_repos == ["org/repo"] + + async def test_caches_forge_token(self): + provider = MockIdentityProvider() + exchanger = MockForgeExchanger(ttl=3600) + manager = ForgeTokenManager(provider, exchanger) + + c1 = await manager.get_forge_token() + c2 = await manager.get_forge_token() + assert c1 is c2 # same object from cache + assert exchanger.call_count == 1 + + async def test_refreshes_at_80_percent_ttl(self): + provider = MockIdentityProvider() + exchanger = MockForgeExchanger(ttl=100) + manager = ForgeTokenManager(provider, exchanger, skew_seconds=5) + + c1 = await manager.get_forge_token() + assert exchanger.call_count == 1 + + # Manipulate cache to simulate 80% of 100s TTL elapsed: + # fetched_at = 85s ago, expires_at = 15s from now (100s total TTL) + key = manager._cache_key(None, None) + entry = manager._cache[key] + now = time.time() + entry.fetched_at = now - 85 + entry.credential.expires_at = now + 15 # 85/100 = 85% elapsed + + c2 = await manager.get_forge_token() + assert exchanger.call_count == 2 # refreshed + + async def test_handles_provider_failure(self): + provider = MockIdentityProvider() + provider.get_token = AsyncMock(side_effect=RuntimeError("SPIRE down")) + exchanger = MockForgeExchanger() + manager = ForgeTokenManager(provider, exchanger) + + result = await manager.get_forge_token() + assert result is None # graceful failure + + async def test_handles_exchanger_failure_with_stale_cache(self): + provider = MockIdentityProvider() + exchanger = MockForgeExchanger(ttl=3600) + manager = ForgeTokenManager(provider, exchanger, skew_seconds=5) + + # First call succeeds + c1 = await manager.get_forge_token() + assert c1 is not None + + # Force refresh condition + key = manager._cache_key(None, None) + manager._cache[key].fetched_at = time.time() - 3600 + + # Exchanger now fails + exchanger.exchange = AsyncMock(side_effect=RuntimeError("GitHub 500")) + + # Should return stale but still valid cached token + c2 = await manager.get_forge_token() + assert c2 is not None + assert c2.token == c1.token + + async def test_concurrent_access(self): + provider = MockIdentityProvider() + exchanger = MockForgeExchanger(ttl=3600) + manager = ForgeTokenManager(provider, exchanger) + + results = await asyncio.gather(*[ + manager.get_forge_token() for _ in range(10) + ]) + # All should succeed + assert all(r is not None for r in results) + # Only one exchange should have happened (lock prevents duplicates) + # Note: the first concurrent call wins the lock; others find cache populated + assert exchanger.call_count == 1 + + async def test_clock_skew_buffer(self): + provider = MockIdentityProvider() + exchanger = MockForgeExchanger(ttl=25) # 25s TTL + manager = ForgeTokenManager(provider, exchanger, skew_seconds=30) + + c1 = await manager.get_forge_token() + assert c1 is not None + + # Token has 25s TTL but skew buffer is 30s, so it's already "expired" + # The manager should try to refresh on next call + c2 = await manager.get_forge_token() + assert exchanger.call_count == 2 # had to refresh + + async def test_invalidate_clears_cache(self): + provider = MockIdentityProvider() + exchanger = MockForgeExchanger(ttl=3600) + manager = ForgeTokenManager(provider, exchanger) + + await manager.get_forge_token() + assert exchanger.call_count == 1 + manager.invalidate() + await manager.get_forge_token() + assert exchanger.call_count == 2 # cache was cleared + + +class TestCreateTokenManager: + async def test_returns_none_if_no_provider(self): + exchanger = MockForgeExchanger() + result = await create_token_manager(provider=None, exchanger=exchanger) + assert result is None + + async def test_returns_none_if_no_exchanger(self): + provider = MockIdentityProvider() + result = await create_token_manager(provider=provider, exchanger=None) + assert result is None + + async def test_returns_manager_when_both_present(self): + provider = MockIdentityProvider() + exchanger = MockForgeExchanger() + result = await create_token_manager(provider=provider, exchanger=exchanger) + assert isinstance(result, ForgeTokenManager) + + +# =================================================================== +# Registry roundtrip tests +# =================================================================== + +class TestRegistryForgeFields: + def test_agent_config_defaults(self): + from trellis.core.registry import AgentConfig + config = AgentConfig(name="test", description="t") + assert config.forge_repos == [] + assert config.forge_permissions == {} + assert config.forge_branch_pattern == "" + + def test_registry_roundtrip(self, tmp_path): + from trellis.core.registry import AgentConfig, Registry, load_registry + + config = AgentConfig( + name="test", + description="test agent", + forge_repos=["org/repo1", "org/repo2"], + forge_permissions={"contents": "write", "pull_requests": "read"}, + forge_branch_pattern=r"agent/test/.*", + ) + registry = Registry(agents={"test": config}) + path = tmp_path / "registry.yaml" + registry.save(path) + + loaded = load_registry(path) + agent = loaded.get_agent("test") + assert agent is not None + assert agent.forge_repos == ["org/repo1", "org/repo2"] + assert agent.forge_permissions == {"contents": "write", "pull_requests": "read"} + assert agent.forge_branch_pattern == r"agent/test/.*" + + +# =================================================================== +# Config tests +# =================================================================== + +class TestConfigForgeSettings: + def test_default_settings(self): + from trellis.config import Settings + s = Settings( + _env_file=None, + project_root=Path("/tmp"), + blackboard_dir=Path("/tmp/bb"), + workspace_dir=Path("/tmp/ws"), + registry_path=Path("/tmp/reg.yaml"), + ) + assert s.identity_provider == "auto" + assert s.forge_type == "" + assert s.spiffe_endpoint_socket == "/tmp/spire-agent/public/api.sock" + assert s.spiffe_trust_domain == "trellis.local" + assert s.github_app_id == "" + assert s.forge_token_audience == "" diff --git a/trellis/config.py b/trellis/config.py index cab7c80..4136e27 100644 --- a/trellis/config.py +++ b/trellis/config.py @@ -80,6 +80,20 @@ class Settings(BaseSettings): web_host: str = "0.0.0.0" web_port: int = 8000 + # Identity Federation (SPIFFE/SPIRE → Git forge) + identity_provider: str = "auto" # "auto", "spiffe", "none" + spiffe_endpoint_socket: str = "/tmp/spire-agent/public/api.sock" + spiffe_trust_domain: str = "trellis.local" + + # Forge federation + forge_type: str = "" # "github", "gitlab", "forgejo", or empty to disable + forge_url: str = "" # e.g. "https://github.com" or self-hosted URL + github_app_id: str = "" + github_app_installation_id: str = "" + github_app_private_key_path: str = "" # path to PEM file + gitlab_token_exchange_url: str = "" + forge_token_audience: str = "" # OIDC audience for token requests + def get_settings() -> Settings: return Settings() diff --git a/trellis/core/agent.py b/trellis/core/agent.py index 725b678..8beeaf1 100644 --- a/trellis/core/agent.py +++ b/trellis/core/agent.py @@ -434,6 +434,71 @@ def _setup_sandbox_env(self, env: dict, idea_id: str) -> Path | None: ) return None + async def _setup_forge_identity(self, env: dict) -> None: + """Resolve OIDC identity and obtain a forge token, injecting into env. + + This is a no-op if forge federation is not configured. Failures are + logged but never prevent the agent from running — forge access is + best-effort. Mutates *env* in-place. + """ + try: + from trellis.config import get_settings + from trellis.core.identity import ( + create_token_manager, + resolve_forge_exchanger, + resolve_identity_provider, + ) + + settings = get_settings() + if not settings.forge_type: + return + + provider = resolve_identity_provider( + provider_type=settings.identity_provider, + spiffe_endpoint_socket=settings.spiffe_endpoint_socket, + spiffe_trust_domain=settings.spiffe_trust_domain, + ) + exchanger = resolve_forge_exchanger( + forge_type=settings.forge_type, + forge_url=settings.forge_url, + github_app_id=settings.github_app_id, + github_app_installation_id=settings.github_app_installation_id, + github_app_private_key_path=settings.github_app_private_key_path, + gitlab_token_exchange_url=settings.gitlab_token_exchange_url, + ) + manager = await create_token_manager( + provider=provider, + exchanger=exchanger, + audience=settings.forge_token_audience, + ) + if manager is None: + return + + cred = await manager.get_forge_token( + repos=self.config.forge_repos or None, + permissions=self.config.forge_permissions or None, + ) + if cred is None: + logger.warning( + "Forge token exchange returned None for agent '%s'", + self.config.name, + ) + return + + env["FORGE_TOKEN"] = cred.token + env["FORGE_TYPE"] = settings.forge_type + env["FORGE_URL"] = settings.forge_url + logger.info( + "Forge token injected for agent '%s': type=%s, fingerprint=%s", + self.config.name, settings.forge_type, cred.fingerprint, + ) + except Exception as e: + logger.warning( + "Forge identity setup failed for agent '%s': %s. " + "Agent will run without forge access.", + self.config.name, e, + ) + async def _run_global(self, max_turns_override: int | None = None, deadline: datetime | None = None) -> AgentResult: """Run in global mode (phase='*') — iterate over all ideas, no idea-specific context.""" bb_server = create_blackboard_mcp_server(self.blackboard, "__all__") @@ -458,6 +523,7 @@ async def _run_global(self, max_turns_override: int | None = None, deadline: dat if self.config.env: env.update(self.config.env) + await self._setup_forge_identity(env) cli_path = self._setup_sandbox_env(env, "__all__") options = ClaudeAgentOptions( @@ -636,6 +702,7 @@ async def _run_inner(self, idea_id: str, max_turns_override: int | None = None, # which account to look up in the keychain. _ensure_agent_auth(agent_config, self.project_root) + await self._setup_forge_identity(env) cli_path = self._setup_sandbox_env(env, idea_id) if cli_path: logger.info("Using nono wrapper at: %s (exists: %s)", cli_path, cli_path.exists()) diff --git a/trellis/core/identity/__init__.py b/trellis/core/identity/__init__.py new file mode 100644 index 0000000..2c53e51 --- /dev/null +++ b/trellis/core/identity/__init__.py @@ -0,0 +1,34 @@ +"""Secretless identity federation for Git forge access. + +Provides OIDC token sourcing (via SPIFFE/SPIRE) and exchange with +Git forges (GitHub, GitLab, Forgejo) for short-lived access tokens. +""" + +from trellis.core.identity.forge import ( + ForgeCredential, + ForgeTokenExchanger, + ForgejoTokenExchanger, + GitHubTokenExchanger, + GitLabTokenExchanger, + resolve_forge_exchanger, +) +from trellis.core.identity.manager import ForgeTokenManager, create_token_manager +from trellis.core.identity.provider import ( + IdentityProvider, + SpiffeIdentityProvider, + resolve_identity_provider, +) + +__all__ = [ + "ForgeCredential", + "ForgeTokenExchanger", + "ForgeTokenManager", + "ForgejoTokenExchanger", + "GitHubTokenExchanger", + "GitLabTokenExchanger", + "IdentityProvider", + "SpiffeIdentityProvider", + "create_token_manager", + "resolve_forge_exchanger", + "resolve_identity_provider", +] diff --git a/trellis/core/identity/forge.py b/trellis/core/identity/forge.py new file mode 100644 index 0000000..1348464 --- /dev/null +++ b/trellis/core/identity/forge.py @@ -0,0 +1,443 @@ +"""Forge token exchangers — trade OIDC JWTs for Git forge access tokens. + +Each exchanger implements the OIDC-to-forge token exchange flow for a +specific forge type (GitHub, GitLab, Forgejo). +""" + +from __future__ import annotations + +import logging +import random +import re +import time +from abc import ABC, abstractmethod +from dataclasses import dataclass +from datetime import datetime, timezone +from urllib.parse import urlparse + +import httpx + +logger = logging.getLogger(__name__) + +# Token fingerprint: last 4 chars for audit logging without exposing secrets +_TOKEN_MASK_LEN = 4 + +# Retry configuration +_MAX_RETRIES = 3 +_BASE_BACKOFF = 1.0 # seconds +_MAX_BACKOFF = 16.0 + + +@dataclass +class ForgeCredential: + """A short-lived forge access token with metadata.""" + + token: str + expires_at: float # Unix timestamp + scopes: list[str] + forge_type: str + + @property + def fingerprint(self) -> str: + """Last few chars of the token, safe for logging.""" + if len(self.token) > _TOKEN_MASK_LEN: + return f"****{self.token[-_TOKEN_MASK_LEN:]}" + return "****" + + @property + def is_expired(self) -> bool: + return time.time() >= self.expires_at + + def __repr__(self) -> str: + return ( + f"ForgeCredential(token={self.fingerprint!r}, " + f"expires_at={self.expires_at}, " + f"scopes={self.scopes!r}, forge_type={self.forge_type!r})" + ) + + def __str__(self) -> str: + return repr(self) + + def __del__(self) -> None: + # Best-effort memory cleanup — not a security guarantee + try: + if hasattr(self, "token") and isinstance(self.token, str): + # Can't truly zero a Python str, but replace the reference + object.__setattr__(self, "token", "x" * len(self.token)) + except Exception: + pass + + +def _validate_forge_url(url: str) -> None: + """Enforce HTTPS-only forge URLs.""" + parsed = urlparse(url) + if parsed.scheme != "https": + raise ValueError( + f"Forge URL must use HTTPS, got {parsed.scheme!r}: {url}" + ) + + +def _validate_repos(repos: list[str]) -> None: + """Validate repo entries are owner/repo format, no wildcards.""" + for repo in repos: + if "*" in repo or "?" in repo: + raise ValueError(f"Forge repo must not contain wildcards: {repo!r}") + if "/" not in repo or repo.count("/") != 1: + raise ValueError( + f"Forge repo must be in owner/repo format: {repo!r}" + ) + + +def _validate_branch_pattern(pattern: str) -> None: + """Validate that branch pattern is a valid regex.""" + if not pattern: + return + try: + re.compile(pattern) + except re.error as e: + raise ValueError(f"Invalid branch pattern regex {pattern!r}: {e}") from e + + +class ForgeTokenExchanger(ABC): + """Abstract base for forge-specific OIDC token exchange.""" + + @abstractmethod + async def exchange( + self, + oidc_token: str, + repos: list[str] | None = None, + permissions: dict[str, str] | None = None, + ) -> ForgeCredential: + """Exchange an OIDC token for a forge access token.""" + + +async def _request_with_retry( + client: httpx.AsyncClient, + method: str, + url: str, + **kwargs, +) -> httpx.Response: + """HTTP request with exponential backoff on 429/5xx.""" + last_exc: Exception | None = None + for attempt in range(_MAX_RETRIES + 1): + try: + resp = await client.request(method, url, **kwargs) + if resp.status_code == 429 or resp.status_code >= 500: + if attempt < _MAX_RETRIES: + retry_after = resp.headers.get("Retry-After") + if retry_after: + try: + delay = float(retry_after) + except ValueError: + delay = _BASE_BACKOFF * (2**attempt) + else: + delay = _BASE_BACKOFF * (2**attempt) + delay = min(delay, _MAX_BACKOFF) + # Add jitter + delay *= 0.5 + random.random() + logger.warning( + "Forge API returned %d, retrying in %.1fs (attempt %d/%d)", + resp.status_code, delay, attempt + 1, _MAX_RETRIES, + ) + import asyncio + await asyncio.sleep(delay) + continue + return resp + except httpx.TransportError as e: + last_exc = e + if attempt < _MAX_RETRIES: + delay = _BASE_BACKOFF * (2**attempt) * (0.5 + random.random()) + logger.warning( + "Forge API transport error: %s, retrying in %.1fs", e, delay, + ) + import asyncio + await asyncio.sleep(delay) + raise last_exc or RuntimeError("Request failed after retries") + + +def _build_httpx_client(forge_url: str) -> httpx.AsyncClient: + """Build an httpx client with security-hardened defaults.""" + return httpx.AsyncClient( + base_url=forge_url, + timeout=httpx.Timeout(connect=10.0, read=30.0, write=10.0, pool=10.0), + follow_redirects=False, + verify=True, + ) + + +class GitHubTokenExchanger(ForgeTokenExchanger): + """Exchange OIDC tokens for GitHub App installation access tokens. + + Flow: + 1. Sign a JWT assertion using the GitHub App private key + 2. POST /app/installations/{id}/access_tokens with the assertion + 3. Receive a scoped installation token + """ + + def __init__( + self, + forge_url: str, + app_id: str, + installation_id: str, + private_key_path: str, + ) -> None: + _validate_forge_url(forge_url) + self._forge_url = forge_url + self._app_id = app_id + self._installation_id = installation_id + self._private_key_path = private_key_path + self._validate_private_key_permissions() + self._api_url = forge_url.replace("https://github.com", "https://api.github.com") + + def _validate_private_key_permissions(self) -> None: + """Ensure the PEM file isn't world or group readable.""" + import os + import stat as stat_mod + if not os.path.exists(self._private_key_path): + raise FileNotFoundError( + f"GitHub App private key not found: {self._private_key_path}" + ) + mode = os.stat(self._private_key_path).st_mode + if mode & (stat_mod.S_IRGRP | stat_mod.S_IROTH): + raise PermissionError( + f"GitHub App private key is group/world readable " + f"(mode {oct(mode)}): {self._private_key_path}. " + f"Run: chmod 600 {self._private_key_path}" + ) + + def _create_jwt_assertion(self) -> str: + """Create a GitHub App JWT signed with the private key.""" + import json + import base64 + from cryptography.hazmat.primitives import hashes, serialization + from cryptography.hazmat.primitives.asymmetric import padding as asym_padding + from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey + from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature + import struct + + with open(self._private_key_path, "rb") as f: + private_key = serialization.load_pem_private_key(f.read(), password=None) + + now = int(time.time()) + # GitHub allows up to 10 minute expiry for App JWTs + payload = { + "iat": now - 60, # issued 60s ago to account for clock skew + "exp": now + 600, # 10 minute expiry + "iss": self._app_id, + } + + # Build JWT manually to avoid PyJWT dependency + header = {"alg": "RS256", "typ": "JWT"} + segments = [] + for part in (header, payload): + encoded = base64.urlsafe_b64encode( + json.dumps(part, separators=(",", ":")).encode() + ).rstrip(b"=") + segments.append(encoded) + + signing_input = b".".join(segments) + signature = private_key.sign(signing_input, asym_padding.PKCS1v15(), hashes.SHA256()) + segments.append(base64.urlsafe_b64encode(signature).rstrip(b"=")) + return b".".join(segments).decode() + + async def exchange( + self, + oidc_token: str, + repos: list[str] | None = None, + permissions: dict[str, str] | None = None, + ) -> ForgeCredential: + if repos: + _validate_repos(repos) + + jwt_assertion = self._create_jwt_assertion() + + body: dict = {} + if repos: + # GitHub expects just the repo names, not owner/repo + body["repositories"] = [r.split("/", 1)[1] for r in repos] + if permissions: + body["permissions"] = permissions + + async with _build_httpx_client(self._api_url) as client: + resp = await _request_with_retry( + client, + "POST", + f"/app/installations/{self._installation_id}/access_tokens", + json=body if body else None, + headers={ + "Authorization": f"Bearer {jwt_assertion}", + "Accept": "application/vnd.github+json", + "X-GitHub-Api-Version": "2022-11-28", + }, + ) + resp.raise_for_status() + data = resp.json() + + token = data["token"] + # Parse expiry — GitHub returns ISO 8601 + expires_str = data.get("expires_at", "") + if expires_str: + expires_at = datetime.fromisoformat( + expires_str.replace("Z", "+00:00") + ).timestamp() + else: + # Default 1 hour if not provided + expires_at = time.time() + 3600 + + scopes = list((data.get("permissions") or {}).keys()) + + cred = ForgeCredential( + token=token, + expires_at=expires_at, + scopes=scopes, + forge_type="github", + ) + logger.info( + "GitHub token exchanged: %s, expires=%s, scopes=%s", + cred.fingerprint, + datetime.fromtimestamp(expires_at, tz=timezone.utc).isoformat(), + scopes, + ) + return cred + + +class GitLabTokenExchanger(ForgeTokenExchanger): + """Exchange OIDC tokens for GitLab access via token exchange. + + Uses the RFC 8693 token exchange flow: + POST /oauth/token with grant_type=urn:ietf:params:oauth:grant-type:token-exchange + """ + + def __init__(self, forge_url: str, token_exchange_url: str = "") -> None: + _validate_forge_url(forge_url) + self._forge_url = forge_url + self._token_exchange_url = token_exchange_url or f"{forge_url}/oauth/token" + + async def exchange( + self, + oidc_token: str, + repos: list[str] | None = None, + permissions: dict[str, str] | None = None, + ) -> ForgeCredential: + if repos: + _validate_repos(repos) + + body = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "subject_token": oidc_token, + "subject_token_type": "urn:ietf:params:oauth:token-type:jwt", + "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", + } + + async with _build_httpx_client(self._forge_url) as client: + resp = await _request_with_retry( + client, + "POST", + self._token_exchange_url, + data=body, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + resp.raise_for_status() + data = resp.json() + + token = data["access_token"] + expires_in = int(data.get("expires_in", 3600)) + expires_at = time.time() + expires_in + + cred = ForgeCredential( + token=token, + expires_at=expires_at, + scopes=data.get("scope", "").split() if data.get("scope") else [], + forge_type="gitlab", + ) + logger.info( + "GitLab token exchanged: %s, expires_in=%ds", + cred.fingerprint, expires_in, + ) + return cred + + +class ForgejoTokenExchanger(ForgeTokenExchanger): + """Exchange OIDC tokens for Forgejo access tokens. + + Forgejo mirrors the GitHub App installation token API, so the flow + is identical to GitHubTokenExchanger. + """ + + def __init__( + self, + forge_url: str, + app_id: str, + installation_id: str, + private_key_path: str, + ) -> None: + _validate_forge_url(forge_url) + self._inner = GitHubTokenExchanger( + forge_url=forge_url, + app_id=app_id, + installation_id=installation_id, + private_key_path=private_key_path, + ) + # Forgejo API is at the same base URL (not a separate api. subdomain) + self._inner._api_url = forge_url + + async def exchange( + self, + oidc_token: str, + repos: list[str] | None = None, + permissions: dict[str, str] | None = None, + ) -> ForgeCredential: + cred = await self._inner.exchange(oidc_token, repos, permissions) + # Re-tag as forgejo + cred.forge_type = "forgejo" + return cred + + +def resolve_forge_exchanger( + *, + forge_type: str, + forge_url: str, + github_app_id: str = "", + github_app_installation_id: str = "", + github_app_private_key_path: str = "", + gitlab_token_exchange_url: str = "", +) -> ForgeTokenExchanger | None: + """Create the appropriate forge exchanger based on configuration.""" + if not forge_type: + return None + + if forge_type == "github": + if not all([github_app_id, github_app_installation_id, github_app_private_key_path]): + raise ValueError( + "GitHub forge requires github_app_id, " + "github_app_installation_id, and github_app_private_key_path" + ) + return GitHubTokenExchanger( + forge_url=forge_url or "https://github.com", + app_id=github_app_id, + installation_id=github_app_installation_id, + private_key_path=github_app_private_key_path, + ) + + if forge_type == "gitlab": + return GitLabTokenExchanger( + forge_url=forge_url or "https://gitlab.com", + token_exchange_url=gitlab_token_exchange_url, + ) + + if forge_type == "forgejo": + if not all([github_app_id, github_app_installation_id, github_app_private_key_path]): + raise ValueError( + "Forgejo forge requires github_app_id, " + "github_app_installation_id, and github_app_private_key_path" + ) + if not forge_url: + raise ValueError("Forgejo forge requires an explicit forge_url") + return ForgejoTokenExchanger( + forge_url=forge_url, + app_id=github_app_id, + installation_id=github_app_installation_id, + private_key_path=github_app_private_key_path, + ) + + raise ValueError(f"Unknown forge type: {forge_type!r}") diff --git a/trellis/core/identity/manager.py b/trellis/core/identity/manager.py new file mode 100644 index 0000000..5f18c81 --- /dev/null +++ b/trellis/core/identity/manager.py @@ -0,0 +1,161 @@ +"""Token lifecycle manager — orchestrates identity providers and forge exchangers. + +Combines an IdentityProvider (SPIFFE) with a ForgeTokenExchanger (GitHub/GitLab/ +Forgejo) to manage the full OIDC-to-forge token lifecycle including caching, +proactive refresh, and concurrent access safety. +""" + +from __future__ import annotations + +import asyncio +import logging +import time +from dataclasses import dataclass, field + +from trellis.core.identity.forge import ( + ForgeCredential, + ForgeTokenExchanger, + _validate_branch_pattern, +) +from trellis.core.identity.provider import IdentityProvider + +logger = logging.getLogger(__name__) + +# Refresh when 80% of TTL has elapsed +_REFRESH_THRESHOLD = 0.80 + +# Minimum seconds of validity to consider a cached token usable +_MIN_VALIDITY_SECONDS = 30.0 + + +@dataclass +class _CacheEntry: + credential: ForgeCredential + fetched_at: float + + +class ForgeTokenManager: + """Manages forge token acquisition, caching, and refresh. + + Thread-safe via asyncio.Lock — multiple concurrent agents can share + a single manager without duplicating token exchange requests. + """ + + def __init__( + self, + provider: IdentityProvider, + exchanger: ForgeTokenExchanger, + *, + audience: str = "", + skew_seconds: float = 30.0, + ) -> None: + self._provider = provider + self._exchanger = exchanger + self._audience = audience + self._skew_seconds = skew_seconds + self._cache: dict[str, _CacheEntry] = {} + self._lock = asyncio.Lock() + + def _cache_key(self, repos: list[str] | None, permissions: dict[str, str] | None) -> str: + """Build a deterministic cache key from request parameters.""" + repo_part = ",".join(sorted(repos or [])) + perm_part = ",".join(f"{k}={v}" for k, v in sorted((permissions or {}).items())) + return f"{repo_part}|{perm_part}" + + def _is_usable(self, entry: _CacheEntry) -> bool: + """Check if a cached credential is still valid (with skew buffer).""" + remaining = entry.credential.expires_at - time.time() + return remaining > self._skew_seconds + + def _should_refresh(self, entry: _CacheEntry) -> bool: + """Check if a cached credential should be proactively refreshed.""" + total_ttl = entry.credential.expires_at - entry.fetched_at + if total_ttl <= 0: + return True + elapsed = time.time() - entry.fetched_at + return (elapsed / total_ttl) >= _REFRESH_THRESHOLD + + async def get_forge_token( + self, + repos: list[str] | None = None, + permissions: dict[str, str] | None = None, + ) -> ForgeCredential | None: + """Obtain a forge credential, using cache when possible. + + Returns None if the identity provider or forge exchanger fails. + Logs errors but does not raise — callers should handle None + gracefully (agent continues without forge access). + """ + key = self._cache_key(repos, permissions) + + # Fast path: check cache without lock + entry = self._cache.get(key) + if entry and self._is_usable(entry) and not self._should_refresh(entry): + return entry.credential + + # Slow path: acquire lock and refresh + async with self._lock: + # Double-check after acquiring lock + entry = self._cache.get(key) + if entry and self._is_usable(entry) and not self._should_refresh(entry): + return entry.credential + + try: + cred = await self._exchange(repos, permissions) + self._cache[key] = _CacheEntry( + credential=cred, + fetched_at=time.time(), + ) + return cred + except Exception: + logger.exception("Failed to obtain forge token") + # Return stale-but-valid cached token if available + if entry and self._is_usable(entry): + logger.warning( + "Using stale cached token (fingerprint=%s)", + entry.credential.fingerprint, + ) + return entry.credential + return None + + async def _exchange( + self, + repos: list[str] | None, + permissions: dict[str, str] | None, + ) -> ForgeCredential: + """Execute the full OIDC → forge exchange flow.""" + logger.info( + "Exchanging OIDC token via %s for forge access (repos=%s)", + self._provider.provider_name(), + repos, + ) + oidc_token = await self._provider.get_token(self._audience) + cred = await self._exchanger.exchange(oidc_token, repos, permissions) + logger.info( + "Forge token obtained: fingerprint=%s, expires_in=%.0fs", + cred.fingerprint, + cred.expires_at - time.time(), + ) + return cred + + def invalidate(self) -> None: + """Clear all cached tokens (e.g. on permission change).""" + self._cache.clear() + + +async def create_token_manager( + *, + provider: IdentityProvider | None, + exchanger: ForgeTokenExchanger | None, + audience: str = "", + skew_seconds: float = 30.0, +) -> ForgeTokenManager | None: + """Create a ForgeTokenManager if both provider and exchanger are available.""" + if provider is None or exchanger is None: + return None + return ForgeTokenManager( + provider=provider, + exchanger=exchanger, + audience=audience, + skew_seconds=skew_seconds, + ) diff --git a/trellis/core/identity/provider.py b/trellis/core/identity/provider.py new file mode 100644 index 0000000..947a585 --- /dev/null +++ b/trellis/core/identity/provider.py @@ -0,0 +1,236 @@ +"""Identity providers for OIDC token sourcing. + +Provides an abstract IdentityProvider interface and a concrete SPIFFE/SPIRE +implementation that fetches JWT-SVIDs from the SPIRE Agent Workload API. +""" + +from __future__ import annotations + +import base64 +import json +import logging +import os +import stat +import time +from abc import ABC, abstractmethod +from dataclasses import dataclass + +logger = logging.getLogger(__name__) + + +class IdentityProvider(ABC): + """Abstract base for OIDC identity token sources.""" + + @abstractmethod + async def get_token(self, audience: str) -> str: + """Return a JWT suitable for OIDC federation with the given audience.""" + + @abstractmethod + def provider_name(self) -> str: + """Human-readable name for logging and diagnostics.""" + + +def _parse_jwt_exp(token: str) -> float | None: + """Extract the ``exp`` claim from a JWT without verifying the signature. + + We only need to know when the token expires so we can refresh proactively. + The relying party (Git forge) is responsible for full verification. + """ + try: + parts = token.split(".") + if len(parts) != 3: + return None + # JWT base64url padding + payload = parts[1] + payload += "=" * (-len(payload) % 4) + claims = json.loads(base64.urlsafe_b64decode(payload)) + return float(claims["exp"]) + except Exception: + return None + + +@dataclass +class _CachedToken: + token: str + exp: float # Unix timestamp + + +class SpiffeIdentityProvider(IdentityProvider): + """Fetches JWT-SVIDs from the SPIRE Agent Workload API. + + Connects via Unix domain socket to the local SPIRE agent and requests + a JWT-SVID for the given audience. Tokens are cached until they approach + expiry (controlled by ``skew_seconds``). + """ + + def __init__( + self, + socket_path: str = "/tmp/spire-agent/public/api.sock", + trust_domain: str = "trellis.local", + spiffe_id_prefix: str | None = None, + skew_seconds: float = 30.0, + ) -> None: + self._socket_path = socket_path + self._trust_domain = trust_domain + self._spiffe_id_prefix = spiffe_id_prefix + self._skew_seconds = skew_seconds + self._cache: dict[str, _CachedToken] = {} + self._validate_socket_path(socket_path) + self._validate_trust_domain(trust_domain) + + # ------------------------------------------------------------------ + # Validation helpers + # ------------------------------------------------------------------ + + @staticmethod + def _validate_socket_path(path: str) -> None: + """Reject paths that clearly are not Unix domain sockets.""" + if not path: + raise ValueError("SPIFFE endpoint socket path must not be empty") + # We only validate at construction time if the file exists. + # At runtime the socket may appear later (SPIRE agent restart). + if os.path.exists(path): + mode = os.stat(path).st_mode + if not stat.S_ISSOCK(mode): + raise ValueError( + f"SPIFFE endpoint path is not a Unix domain socket: {path}" + ) + + @staticmethod + def _validate_trust_domain(domain: str) -> None: + if not domain: + raise ValueError("SPIFFE trust domain must not be empty") + if "/" in domain or "\\" in domain: + raise ValueError( + f"SPIFFE trust domain must not contain path separators: {domain!r}" + ) + if "*" in domain or "?" in domain: + raise ValueError( + f"SPIFFE trust domain must not contain wildcards: {domain!r}" + ) + + # ------------------------------------------------------------------ + # IdentityProvider interface + # ------------------------------------------------------------------ + + def provider_name(self) -> str: + return f"spiffe/{self._trust_domain}" + + async def get_token(self, audience: str) -> str: + """Return a cached or freshly-fetched JWT-SVID for *audience*.""" + cached = self._cache.get(audience) + now = time.time() + if cached and (cached.exp - self._skew_seconds) > now: + return cached.token + + token = await self._fetch_jwt_svid(audience) + exp = _parse_jwt_exp(token) + if exp is not None: + self._cache[audience] = _CachedToken(token=token, exp=exp) + return token + + # ------------------------------------------------------------------ + # SPIRE Workload API + # ------------------------------------------------------------------ + + async def _fetch_jwt_svid(self, audience: str) -> str: + """Call the SPIRE Agent Workload API to mint a JWT-SVID. + + Uses pyspiffe if available; otherwise falls back to a minimal + gRPC-free HTTP implementation against the Workload API. + """ + try: + return await self._fetch_via_pyspiffe(audience) + except ImportError: + logger.debug("pyspiffe not installed, using direct Workload API call") + return await self._fetch_via_http(audience) + + async def _fetch_via_pyspiffe(self, audience: str) -> str: + """Use the pyspiffe library for Workload API access.""" + from pyspiffe.spiffe_id.spiffe_id import SpiffeId + from pyspiffe.workloadapi.default_jwt_source import DefaultJwtSource + from pyspiffe.workloadapi.workload_api_client import WorkloadApiClient + + import anyio + + def _sync_fetch() -> str: + with WorkloadApiClient( + spiffe_endpoint_socket=f"unix://{self._socket_path}" + ) as client: + svid_set = client.fetch_jwt_svids( + audiences=[audience], + hint=self._spiffe_id_prefix or "", + ) + if not svid_set: + raise RuntimeError("SPIRE returned no JWT-SVIDs") + return svid_set[0].token + + return await anyio.to_thread.run_sync(_sync_fetch) + + async def _fetch_via_http(self, audience: str) -> str: + """Minimal direct call to the SPIRE Workload API over Unix socket. + + The SPIRE Workload API exposes an HTTP endpoint on a Unix domain + socket. We issue a POST to /v1/auth/jwt-svids with the audience + list. This avoids requiring pyspiffe/gRPC as a hard dependency. + """ + import httpx + + transport = httpx.AsyncHTTPTransport(uds=self._socket_path) + async with httpx.AsyncClient( + transport=transport, + base_url="http://localhost", + timeout=httpx.Timeout(connect=5.0, read=10.0, write=5.0, pool=5.0), + ) as client: + resp = await client.post( + "/v1/auth/jwt-svids", + json={"audience": [audience]}, + headers={"Content-Type": "application/json"}, + ) + resp.raise_for_status() + data = resp.json() + svids = data.get("svids", []) + if not svids: + raise RuntimeError("SPIRE returned no JWT-SVIDs") + return svids[0]["svid"] + + +def resolve_identity_provider( + *, + provider_type: str = "auto", + spiffe_endpoint_socket: str = "/tmp/spire-agent/public/api.sock", + spiffe_trust_domain: str = "trellis.local", + spiffe_id_prefix: str | None = None, + skew_seconds: float = 30.0, +) -> IdentityProvider | None: + """Resolve the identity provider based on configuration and environment. + + Returns None if no provider is available or configured. + """ + if provider_type == "none": + return None + + if provider_type in ("auto", "spiffe"): + if os.path.exists(spiffe_endpoint_socket): + try: + return SpiffeIdentityProvider( + socket_path=spiffe_endpoint_socket, + trust_domain=spiffe_trust_domain, + spiffe_id_prefix=spiffe_id_prefix, + skew_seconds=skew_seconds, + ) + except ValueError as e: + logger.warning("SPIFFE provider unavailable: %s", e) + if provider_type == "spiffe": + raise + return None + elif provider_type == "spiffe": + raise FileNotFoundError( + f"SPIFFE endpoint socket not found: {spiffe_endpoint_socket}" + ) + + if provider_type not in ("auto", "none"): + raise ValueError(f"Unknown identity provider type: {provider_type!r}") + + logger.info("No identity provider available (auto-detect found nothing)") + return None diff --git a/trellis/core/registry.py b/trellis/core/registry.py index 046968c..f583bfc 100644 --- a/trellis/core/registry.py +++ b/trellis/core/registry.py @@ -43,6 +43,11 @@ class AgentConfig: sandbox_profile: str = "claude-code" # Base nono profile sandbox_verify_attestations: bool = False # Require signed instruction files + # Identity federation (per-agent forge access) + forge_repos: list[str] = field(default_factory=list) # repos agent can access + forge_permissions: dict[str, str] = field(default_factory=dict) # e.g. {"contents": "write"} + forge_branch_pattern: str = "" # branch restriction regex + @dataclass class Registry: