From 10eec4dca6c7e69fde58fa0db13f39cf9ce3d542 Mon Sep 17 00:00:00 2001 From: Algis Dumbris Date: Tue, 16 Jun 2026 11:01:45 +0300 Subject: [PATCH] feat(broker): per-user credential-brokering audit records (spec 074 T10) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Emit a secret-free activity-log record on every per-user credential operation — acquisition, refresh, injection, and connect — carrying attribution (user id, server, request id), the acquisition method, and the outcome (success/failure + coarse reason). No token/refresh/secret value is ever recorded (FR-028, FR-029, SC-006; User Story 4 payoff). - broker.AuditEvent/AuditSink: a structurally secret-free audit port (no field can hold a credential) with a default no-op sink. - CredentialResolver.Resolve emits one classified event per resolution (acquire / refresh / inject / connect), threading the action out of the single-flight acquire so success and failure are attributed correctly. - OAuthConnector.Complete/Deny emit the connect-flow event; failure reasons are fixed coarse labels, never the raw upstream token-endpoint body. - storage.ActivityTypeCredentialBroker + an activity-log-backed AuditSink adapter (SaveActivityAsync, non-blocking) that reuses the existing activity log; wired into the live connect flow via NewCredentialHandlers. TDD: resolver/connector emit correct attribution on success and failure and never leak token material; the adapter persists records with no secret. --- docs/cli/activity-commands.md | 4 +- docs/features/activity-log.md | 3 +- internal/serveredition/api/broker_audit.go | 71 +++++ .../serveredition/api/broker_audit_test.go | 124 +++++++++ .../serveredition/api/connector_provider.go | 9 +- .../serveredition/api/credential_handlers.go | 8 +- .../api/credential_handlers_test.go | 22 +- internal/serveredition/broker/audit.go | 95 +++++++ .../broker/audit_connector_test.go | 145 ++++++++++ .../broker/audit_resolver_test.go | 255 ++++++++++++++++++ .../broker/credential_resolver.go | 112 ++++++-- .../serveredition/broker/oauth_connector.go | 42 ++- .../broker/oauth_connector_test.go | 20 +- internal/serveredition/setup.go | 8 +- internal/storage/activity_models.go | 5 + 15 files changed, 873 insertions(+), 50 deletions(-) create mode 100644 internal/serveredition/api/broker_audit.go create mode 100644 internal/serveredition/api/broker_audit_test.go create mode 100644 internal/serveredition/broker/audit.go create mode 100644 internal/serveredition/broker/audit_connector_test.go create mode 100644 internal/serveredition/broker/audit_resolver_test.go diff --git a/docs/cli/activity-commands.md b/docs/cli/activity-commands.md index 8f93ed096..1d7616e17 100644 --- a/docs/cli/activity-commands.md +++ b/docs/cli/activity-commands.md @@ -54,7 +54,7 @@ mcpproxy activity list [flags] | Flag | Short | Default | Description | |------|-------|---------|-------------| -| `--type` | `-t` | | Filter by type (comma-separated for multiple): `tool_call`, `system_start`, `system_stop`, `internal_tool_call`, `config_change`, `policy_decision`, `quarantine_change`, `server_change` | +| `--type` | `-t` | | Filter by type (comma-separated for multiple): `tool_call`, `system_start`, `system_stop`, `internal_tool_call`, `config_change`, `policy_decision`, `quarantine_change`, `server_change`, `credential_broker` | | `--server` | `-s` | | Filter by server name | | `--tool` | | | Filter by tool name | | `--status` | | | Filter by status: `success`, `error`, `blocked` | @@ -175,7 +175,7 @@ mcpproxy activity watch [flags] | Flag | Short | Default | Description | |------|-------|---------|-------------| -| `--type` | `-t` | | Filter by type (comma-separated for multiple): `tool_call`, `system_start`, `system_stop`, `internal_tool_call`, `config_change`, `policy_decision`, `quarantine_change`, `server_change` | +| `--type` | `-t` | | Filter by type (comma-separated for multiple): `tool_call`, `system_start`, `system_stop`, `internal_tool_call`, `config_change`, `policy_decision`, `quarantine_change`, `server_change`, `credential_broker` | | `--server` | `-s` | | Filter by server name | ### Examples diff --git a/docs/features/activity-log.md b/docs/features/activity-log.md index edb46f379..16d402cee 100644 --- a/docs/features/activity-log.md +++ b/docs/features/activity-log.md @@ -25,6 +25,7 @@ The activity log captures: | `policy_decision` | Tool calls blocked by policy rules | | `quarantine_change` | Server quarantine/unquarantine events | | `server_change` | Server enable/disable/restart events | +| `credential_broker` | Per-user credential brokering events (acquire/refresh/inject/connect) — server edition only | ### System Lifecycle Events @@ -284,7 +285,7 @@ GET /api/v1/activity | Parameter | Type | Description | |-----------|------|-------------| -| `type` | string | Filter by type (comma-separated for multiple): `tool_call`, `system_start`, `system_stop`, `internal_tool_call`, `config_change`, `policy_decision`, `quarantine_change`, `server_change` | +| `type` | string | Filter by type (comma-separated for multiple): `tool_call`, `system_start`, `system_stop`, `internal_tool_call`, `config_change`, `policy_decision`, `quarantine_change`, `server_change`, `credential_broker` | | `server` | string | Filter by server name | | `tool` | string | Filter by tool name | | `session_id` | string | Filter by MCP session ID | diff --git a/internal/serveredition/api/broker_audit.go b/internal/serveredition/api/broker_audit.go new file mode 100644 index 000000000..44278638a --- /dev/null +++ b/internal/serveredition/api/broker_audit.go @@ -0,0 +1,71 @@ +//go:build server + +package api + +import ( + "context" + + "go.uber.org/zap" + + "github.com/smart-mcp-proxy/mcpproxy-go/internal/serveredition/broker" + "github.com/smart-mcp-proxy/mcpproxy-go/internal/storage" +) + +// activityAuditSink persists per-user credential-brokering audit events to the +// existing activity log (spec 074 T10). It implements broker.AuditSink by writing +// an ActivityRecord of type ActivityTypeCredentialBroker. Writes are async so the +// broker's request/connect path is never blocked on storage. +// +// The record carries attribution (UserID, ServerName, RequestID) and the +// method/action/outcome metadata only — there is no field, here or in the source +// AuditEvent, able to hold a token or secret value (FR-028/FR-029). +type activityAuditSink struct { + storage *storage.Manager + logger *zap.SugaredLogger +} + +// NewActivityAuditSink builds an audit sink backed by the activity log. It +// returns nil when no storage manager is available, so callers fall back to the +// broker's internal no-op sink and brokering still works (auditing is best-effort). +func NewActivityAuditSink(sm *storage.Manager, logger *zap.SugaredLogger) broker.AuditSink { + if sm == nil { + return nil + } + if logger == nil { + logger = zap.NewNop().Sugar() + } + return &activityAuditSink{storage: sm, logger: logger} +} + +// RecordBrokerEvent maps a broker.AuditEvent onto an ActivityRecord and stores it +// asynchronously. It never copies token/secret material (FR-029). +func (s *activityAuditSink) RecordBrokerEvent(_ context.Context, ev broker.AuditEvent) { + if s == nil || s.storage == nil { + return + } + + status := "success" + if ev.Outcome == broker.AuditOutcomeFailure { + status = "error" + } + + record := &storage.ActivityRecord{ + Type: storage.ActivityTypeCredentialBroker, + Source: storage.ActivitySourceInternal, + ServerName: ev.ServerName, + Status: status, + UserID: ev.UserID, + RequestID: ev.RequestID, + Metadata: map[string]interface{}{ + "broker_method": ev.Method, + "broker_action": ev.Action, + }, + } + if ev.Outcome == broker.AuditOutcomeFailure && ev.Reason != "" { + // The reason is a coarse, secret-free label produced by the broker. + record.ErrorMessage = ev.Reason + record.Metadata["reason"] = ev.Reason + } + + s.storage.SaveActivityAsync(record) +} diff --git a/internal/serveredition/api/broker_audit_test.go b/internal/serveredition/api/broker_audit_test.go new file mode 100644 index 000000000..52bc68ae8 --- /dev/null +++ b/internal/serveredition/api/broker_audit_test.go @@ -0,0 +1,124 @@ +//go:build server + +package api + +import ( + "context" + "testing" + "time" + + "go.uber.org/zap" + + "github.com/smart-mcp-proxy/mcpproxy-go/internal/serveredition/broker" + "github.com/smart-mcp-proxy/mcpproxy-go/internal/storage" +) + +// waitForActivities polls the activity log until at least n records of the given +// type exist or the deadline passes (SaveActivityAsync writes in a goroutine). +func waitForActivities(t *testing.T, m *storage.Manager, n int) []*storage.ActivityRecord { + t.Helper() + deadline := time.Now().Add(2 * time.Second) + for { + recs, _, err := m.ListActivities(storage.ActivityFilter{ + Types: []string{string(storage.ActivityTypeCredentialBroker)}, + Limit: 100, + }) + if err != nil { + t.Fatalf("ListActivities: %v", err) + } + if len(recs) >= n { + return recs + } + if time.Now().After(deadline) { + t.Fatalf("timed out waiting for %d credential_broker records, got %d", n, len(recs)) + } + time.Sleep(10 * time.Millisecond) + } +} + +func findByAction(recs []*storage.ActivityRecord, action string) *storage.ActivityRecord { + for _, r := range recs { + if r.Metadata != nil && r.Metadata["broker_action"] == action { + return r + } + } + return nil +} + +func TestActivityAuditSink_PersistsAttributionNoSecret(t *testing.T) { + mgr, err := storage.NewManager(t.TempDir(), zap.NewNop().Sugar()) + if err != nil { + t.Fatalf("NewManager: %v", err) + } + defer mgr.Close() + + sink := NewActivityAuditSink(mgr, zap.NewNop().Sugar()) + if sink == nil { + t.Fatal("expected non-nil sink for a real storage manager") + } + + // A successful acquisition and a failed connect. + sink.RecordBrokerEvent(context.Background(), broker.AuditEvent{ + UserID: "alice", + ServerName: "grafana", + Method: broker.AuditMethodTokenExchange, + Action: broker.AuditActionAcquire, + Outcome: broker.AuditOutcomeSuccess, + RequestID: "req-abc", + }) + sink.RecordBrokerEvent(context.Background(), broker.AuditEvent{ + UserID: "bob", + ServerName: "github", + Method: broker.AuditMethodConnect, + Action: broker.AuditActionConnect, + Outcome: broker.AuditOutcomeFailure, + Reason: "token endpoint exchange failed", + RequestID: "req-def", + }) + + recs := waitForActivities(t, mgr, 2) + + acq := findByAction(recs, broker.AuditActionAcquire) + if acq == nil { + t.Fatal("acquire record not found") + } + if acq.UserID != "alice" || acq.ServerName != "grafana" { + t.Fatalf("missing attribution: user=%q server=%q", acq.UserID, acq.ServerName) + } + if acq.RequestID != "req-abc" { + t.Fatalf("request_id not persisted: %q", acq.RequestID) + } + if acq.Status != "success" { + t.Fatalf("expected success status, got %q", acq.Status) + } + if acq.Metadata["broker_method"] != broker.AuditMethodTokenExchange { + t.Fatalf("method metadata missing: %v", acq.Metadata["broker_method"]) + } + + conn := findByAction(recs, broker.AuditActionConnect) + if conn == nil { + t.Fatal("connect record not found") + } + if conn.Status != "error" { + t.Fatalf("expected error status, got %q", conn.Status) + } + if conn.ErrorMessage != "token endpoint exchange failed" { + t.Fatalf("expected coarse error message, got %q", conn.ErrorMessage) + } + + // No record may carry token/secret material in any visible field. + for _, r := range recs { + if r.Arguments != nil { + t.Fatalf("credential_broker record must not carry arguments: %v", r.Arguments) + } + if r.Response != "" { + t.Fatalf("credential_broker record must not carry a response: %q", r.Response) + } + } +} + +func TestNewActivityAuditSink_NilStorageReturnsNil(t *testing.T) { + if sink := NewActivityAuditSink(nil, zap.NewNop().Sugar()); sink != nil { + t.Fatalf("expected nil sink for nil storage, got %T", sink) + } +} diff --git a/internal/serveredition/api/connector_provider.go b/internal/serveredition/api/connector_provider.go index 9a64103c8..2b5403efd 100644 --- a/internal/serveredition/api/connector_provider.go +++ b/internal/serveredition/api/connector_provider.go @@ -26,20 +26,23 @@ import ( type connectorProvider struct { store broker.CredentialStore logger *zap.Logger + audit broker.AuditSink // connect-flow audit sink (spec 074 T10); nil = no-op mu sync.Mutex baseURL string // gateway public origin, e.g. "https://gw.example.com" cache map[string]*broker.OAuthConnector } -// newConnectorProvider constructs an empty provider. -func newConnectorProvider(store broker.CredentialStore, logger *zap.Logger) *connectorProvider { +// newConnectorProvider constructs an empty provider. A nil audit sink disables +// connect-flow audit emission. +func newConnectorProvider(store broker.CredentialStore, logger *zap.Logger, audit broker.AuditSink) *connectorProvider { if logger == nil { logger = zap.NewNop() } return &connectorProvider{ store: store, logger: logger, + audit: audit, cache: make(map[string]*broker.OAuthConnector), } } @@ -88,7 +91,7 @@ func (p *connectorProvider) connector(server *config.ServerConfig) (*broker.OAut RedirectURI: p.callbackURLLocked(server.Name), Resource: ab.Resource, } - conn, err := broker.NewOAuthConnector(p.store, cfg, p.logger) + conn, err := broker.NewOAuthConnector(p.store, cfg, p.logger, p.audit) if err != nil { return nil, err } diff --git a/internal/serveredition/api/credential_handlers.go b/internal/serveredition/api/credential_handlers.go index 42a5c55ae..c7604b7eb 100644 --- a/internal/serveredition/api/credential_handlers.go +++ b/internal/serveredition/api/credential_handlers.go @@ -44,15 +44,17 @@ type CredentialHandlers struct { } // NewCredentialHandlers builds the handlers over a credential store and the set -// of shared servers (only those carrying an auth_broker block are brokered). -func NewCredentialHandlers(store broker.CredentialStore, sharedServers []*config.ServerConfig, logger *zap.SugaredLogger) *CredentialHandlers { +// of shared servers (only those carrying an auth_broker block are brokered). The +// audit sink (spec 074 T10) records per-user connect-flow events to the activity +// log; a nil sink disables audit emission. +func NewCredentialHandlers(store broker.CredentialStore, sharedServers []*config.ServerConfig, audit broker.AuditSink, logger *zap.SugaredLogger) *CredentialHandlers { if logger == nil { logger = zap.NewNop().Sugar() } return &CredentialHandlers{ store: store, brokerServers: sharedServers, - connectors: newConnectorProvider(store, logger.Desugar()), + connectors: newConnectorProvider(store, logger.Desugar(), audit), logger: logger, } } diff --git a/internal/serveredition/api/credential_handlers_test.go b/internal/serveredition/api/credential_handlers_test.go index f7cbfbe71..678605500 100644 --- a/internal/serveredition/api/credential_handlers_test.go +++ b/internal/serveredition/api/credential_handlers_test.go @@ -93,7 +93,7 @@ func TestCredentialsList_RedactsSecrets(t *testing.T) { ObtainedVia: "token_exchange", })) - h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar()) + h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar()) r := credRouter(h, defaultAuthContext()) req := httptest.NewRequest(http.MethodGet, "/api/v1/user/credentials", http.NoBody) @@ -130,7 +130,7 @@ func TestCredentialsList_Statuses(t *testing.T) { Type: "oauth2", AccessToken: "b", ExpiresAt: time.Now().Add(-time.Hour), })) - h := NewCredentialHandlers(store, []*config.ServerConfig{connected, expired, fresh}, zap.NewNop().Sugar()) + h := NewCredentialHandlers(store, []*config.ServerConfig{connected, expired, fresh}, nil, zap.NewNop().Sugar()) r := credRouter(h, defaultAuthContext()) req := httptest.NewRequest(http.MethodGet, "/api/v1/user/credentials", http.NoBody) @@ -164,7 +164,7 @@ func TestCredentialsList_StoreDisabled(t *testing.T) { require.False(t, store.Enabled()) srv := brokerHTTPServer("shared-gh", config.AuthBrokerModeTokenExchange) - h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar()) + h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar()) r := credRouter(h, defaultAuthContext()) req := httptest.NewRequest(http.MethodGet, "/api/v1/user/credentials", http.NoBody) @@ -186,7 +186,7 @@ func TestCredentialsDelete_Removes(t *testing.T) { Type: "oauth2", AccessToken: "a", ExpiresAt: time.Now().Add(time.Hour), })) - h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar()) + h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar()) r := credRouter(h, defaultAuthContext()) req := httptest.NewRequest(http.MethodDelete, "/api/v1/user/credentials/shared-gh", http.NoBody) @@ -201,7 +201,7 @@ func TestCredentialsDelete_Removes(t *testing.T) { func TestCredentialsDelete_UnknownServer404(t *testing.T) { store := credTestStore(t) srv := brokerHTTPServer("shared-gh", config.AuthBrokerModeTokenExchange) - h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar()) + h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar()) r := credRouter(h, defaultAuthContext()) req := httptest.NewRequest(http.MethodDelete, "/api/v1/user/credentials/does-not-exist", http.NoBody) @@ -219,7 +219,7 @@ func TestCredentials_CrossUserIsolation(t *testing.T) { Type: "oauth2", AccessToken: "B-SECRET", ExpiresAt: time.Now().Add(time.Hour), })) - h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar()) + h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar()) // Act as user A. r := credRouter(h, auth.UserContext(testUserID, "a@example.com", "A", "google")) @@ -248,7 +248,7 @@ func TestCredentials_CrossUserIsolation(t *testing.T) { func TestCredentialsConnect_Redirects(t *testing.T) { store := credTestStore(t) srv := brokerHTTPServer("connect-srv", config.AuthBrokerModeOAuthConnect) - h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar()) + h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar()) r := credRouter(h, defaultAuthContext()) req := httptest.NewRequest(http.MethodGet, "/api/v1/user/credentials/connect-srv/connect", http.NoBody) @@ -270,7 +270,7 @@ func TestCredentialsConnect_Redirects(t *testing.T) { func TestCredentialsConnect_NonConnectMode400(t *testing.T) { store := credTestStore(t) srv := brokerHTTPServer("xchg-srv", config.AuthBrokerModeTokenExchange) - h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar()) + h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar()) r := credRouter(h, defaultAuthContext()) req := httptest.NewRequest(http.MethodGet, "/api/v1/user/credentials/xchg-srv/connect", http.NoBody) @@ -292,7 +292,7 @@ func TestCredentialsConnectCallback_StoresCredential(t *testing.T) { srv.AuthBroker.TokenEndpoint = ts.URL sk := serverKeyFor(srv) - h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar()) + h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar()) r := credRouter(h, defaultAuthContext()) // Step 1: connect → capture state from the redirect. @@ -325,7 +325,7 @@ func TestCredentialsCallback_DeniedByUpstream(t *testing.T) { store := credTestStore(t) srv := brokerHTTPServer("connect-srv", config.AuthBrokerModeOAuthConnect) sk := serverKeyFor(srv) - h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar()) + h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar()) r := credRouter(h, defaultAuthContext()) // Begin a flow to register a state. @@ -350,7 +350,7 @@ func TestCredentialsCallback_DeniedByUpstream(t *testing.T) { func TestCredentials_Unauthenticated(t *testing.T) { store := credTestStore(t) srv := brokerHTTPServer("shared-gh", config.AuthBrokerModeTokenExchange) - h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar()) + h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar()) // Empty auth context → unauthenticated. r := credRouter(h, &auth.AuthContext{}) diff --git a/internal/serveredition/broker/audit.go b/internal/serveredition/broker/audit.go new file mode 100644 index 000000000..ecdedef32 --- /dev/null +++ b/internal/serveredition/broker/audit.go @@ -0,0 +1,95 @@ +//go:build server + +package broker + +import ( + "context" + + "github.com/smart-mcp-proxy/mcpproxy-go/internal/config" + "github.com/smart-mcp-proxy/mcpproxy-go/internal/reqcontext" +) + +// Audit vocabulary for per-user credential brokering (spec 074 T10, FR-028). +// These strings are the stable, secret-free attribution values recorded on every +// acquisition / refresh / injection / connect operation. +const ( + // AuditMethodTokenExchange is the RFC 8693 token-exchange acquisition method. + AuditMethodTokenExchange = "token_exchange" + // AuditMethodEntraOBO is the Entra ID on-behalf-of acquisition method. + AuditMethodEntraOBO = "entra_obo" + // AuditMethodConnect is the per-user OAuth connect-flow acquisition method. + AuditMethodConnect = "connect" + // AuditMethodUnknown is recorded when the broker mode is unrecognised. + AuditMethodUnknown = "unknown" + + // AuditActionAcquire is a first-time per-user credential acquisition. + AuditActionAcquire = "acquire" + // AuditActionRefresh is the renewal of a near-expiry per-user credential. + AuditActionRefresh = "refresh" + // AuditActionInject is the use of an already-valid cached credential for + // injection into a proxied request (no new acquisition occurred). + AuditActionInject = "inject" + // AuditActionConnect is the per-user OAuth connect-flow consent/callback. + AuditActionConnect = "connect" + + // AuditOutcomeSuccess marks a successful operation. + AuditOutcomeSuccess = "success" + // AuditOutcomeFailure marks a failed operation; Reason explains why. + AuditOutcomeFailure = "failure" +) + +// AuditEvent is the secret-free record of a single per-user credential-brokering +// operation. It deliberately carries NO token, refresh-token, client-secret, or +// any other credential material (FR-029): there is no field able to hold one, so +// auditing can never leak a secret. Reason is a coarse, secret-free explanation +// drawn from the broker's sentinel errors (which are themselves secret-free). +type AuditEvent struct { + // UserID is the user the credential is brokered for (attribution; FR-028). + UserID string + // ServerName is the brokered upstream's configured name. + ServerName string + // Method is the acquisition method: token_exchange | entra_obo | connect. + Method string + // Action is the operation: acquire | refresh | inject | connect. + Action string + // Outcome is success | failure. + Outcome string + // Reason is a secret-free explanation, set on failure (empty on success). + Reason string + // RequestID correlates the operation with the originating HTTP request. + RequestID string +} + +// AuditSink receives broker audit events and persists them to the activity log. +// Implementations MUST NOT block the caller's request path (use async writes) and +// MUST tolerate a nil/zero event gracefully. +type AuditSink interface { + RecordBrokerEvent(ctx context.Context, ev AuditEvent) +} + +// nopAuditSink discards every event. It is the default when no sink is wired, so +// broker code can always call the sink unconditionally. +type nopAuditSink struct{} + +func (nopAuditSink) RecordBrokerEvent(context.Context, AuditEvent) {} + +// auditMethodForMode maps a configured auth-broker mode to its audit method +// label. An unrecognised mode maps to AuditMethodUnknown rather than leaking the +// raw value. +func auditMethodForMode(mode string) string { + switch mode { + case config.AuthBrokerModeTokenExchange: + return AuditMethodTokenExchange + case config.AuthBrokerModeEntraOBO: + return AuditMethodEntraOBO + case config.AuthBrokerModeOAuthConnect: + return AuditMethodConnect + default: + return AuditMethodUnknown + } +} + +// auditRequestID extracts the correlatable request id from ctx, if present. +func auditRequestID(ctx context.Context) string { + return reqcontext.GetRequestID(ctx) +} diff --git a/internal/serveredition/broker/audit_connector_test.go b/internal/serveredition/broker/audit_connector_test.go new file mode 100644 index 000000000..ce174399e --- /dev/null +++ b/internal/serveredition/broker/audit_connector_test.go @@ -0,0 +1,145 @@ +//go:build server + +package broker + +import ( + "context" + "net/http" + "strings" + "testing" + + "go.uber.org/zap" + + "github.com/smart-mcp-proxy/mcpproxy-go/internal/reqcontext" +) + +// connectorWithSink builds a connector wired to a recording audit sink, pointed +// at the given mock token server. +func connectorWithSink(t *testing.T, m *mockTokenServer) (*OAuthConnector, *recordingSink) { + t.Helper() + sink := &recordingSink{} + c, err := NewOAuthConnector(newConnectorTestStore(t), connectorTestConfig(m.srv.URL), zap.NewNop(), sink) + if err != nil { + t.Fatalf("NewOAuthConnector: %v", err) + } + return c, sink +} + +// assertNoConnectorSecret asserts no event leaked the upstream token or the +// configured client secret (FR-029). +func assertNoConnectorSecret(t *testing.T, m *mockTokenServer, evs []AuditEvent) { + t.Helper() + secrets := []string{m.accessToken, m.refreshToken, "gateway-client-secret"} + for i, ev := range evs { + for _, val := range []string{ev.Reason, ev.Method, ev.Action, ev.Outcome, ev.ServerName, ev.UserID} { + for _, secret := range secrets { + if secret != "" && strings.Contains(val, secret) { + t.Fatalf("event %d leaked secret %q in %q", i, secret, val) + } + } + } + } +} + +func TestAuditConnect_CompleteSuccess_EmitsConnectSuccess(t *testing.T) { + m := newMockTokenServer(t) + c, sink := connectorWithSink(t, m) + + authURL, state, err := c.BuildAuthorizationURL("user-alice") + if err != nil { + t.Fatalf("BuildAuthorizationURL: %v", err) + } + _ = authURL + + ctx := reqcontext.WithRequestID(context.Background(), "req-connect-1") + if _, err := c.Complete(ctx, state, "auth-code"); err != nil { + t.Fatalf("Complete: %v", err) + } + + ev := sink.last(t) + if ev.Method != AuditMethodConnect || ev.Action != AuditActionConnect { + t.Fatalf("expected connect/connect, got %s/%s", ev.Method, ev.Action) + } + if ev.Outcome != AuditOutcomeSuccess { + t.Fatalf("expected success, got %s (reason %q)", ev.Outcome, ev.Reason) + } + if ev.UserID != "user-alice" { + t.Fatalf("expected user attribution, got %q", ev.UserID) + } + if ev.ServerName != "github-mcp" { + t.Fatalf("expected server name, got %q", ev.ServerName) + } + if ev.RequestID != "req-connect-1" { + t.Fatalf("expected request_id correlation, got %q", ev.RequestID) + } + assertNoConnectorSecret(t, m, sink.all()) +} + +func TestAuditConnect_CompleteTokenFailure_EmitsFailure_NoBody(t *testing.T) { + m := newMockTokenServer(t) + m.status = http.StatusBadRequest // AS rejects; body is {"error":"invalid_grant"} + c, sink := connectorWithSink(t, m) + + _, state, err := c.BuildAuthorizationURL("user-bob") + if err != nil { + t.Fatalf("BuildAuthorizationURL: %v", err) + } + if _, err := c.Complete(context.Background(), state, "auth-code"); err == nil { + t.Fatalf("expected Complete to fail") + } + + ev := sink.last(t) + if ev.Outcome != AuditOutcomeFailure { + t.Fatalf("expected failure, got %s", ev.Outcome) + } + if ev.UserID != "user-bob" { + t.Fatalf("expected user attribution on failure, got %q", ev.UserID) + } + // The audit reason is the fixed coarse label, never the upstream body. + if ev.Reason != "token endpoint exchange failed" { + t.Fatalf("expected coarse reason, got %q", ev.Reason) + } + if strings.Contains(ev.Reason, "invalid_grant") { + t.Fatalf("audit reason leaked upstream body: %q", ev.Reason) + } + assertNoConnectorSecret(t, m, sink.all()) +} + +func TestAuditConnect_InvalidState_EmitsFailure(t *testing.T) { + m := newMockTokenServer(t) + c, sink := connectorWithSink(t, m) + + if _, err := c.Complete(context.Background(), "bogus-state", "auth-code"); err == nil { + t.Fatalf("expected Complete to fail on unknown state") + } + ev := sink.last(t) + if ev.Outcome != AuditOutcomeFailure || ev.Action != AuditActionConnect { + t.Fatalf("expected connect/failure, got %s/%s", ev.Action, ev.Outcome) + } + if ev.UserID != "" { + t.Fatalf("unknown state has no attributable user, got %q", ev.UserID) + } +} + +func TestAuditConnect_Deny_EmitsFailure(t *testing.T) { + m := newMockTokenServer(t) + c, sink := connectorWithSink(t, m) + + _, state, err := c.BuildAuthorizationURL("user-carol") + if err != nil { + t.Fatalf("BuildAuthorizationURL: %v", err) + } + if err := c.Deny(state, "access_denied"); err != nil { + t.Fatalf("Deny: %v", err) + } + ev := sink.last(t) + if ev.Outcome != AuditOutcomeFailure || ev.Action != AuditActionConnect { + t.Fatalf("expected connect/failure, got %s/%s", ev.Action, ev.Outcome) + } + if ev.UserID != "user-carol" { + t.Fatalf("expected user attribution on deny, got %q", ev.UserID) + } + if !strings.Contains(ev.Reason, "access_denied") { + t.Fatalf("expected denial reason, got %q", ev.Reason) + } +} diff --git a/internal/serveredition/broker/audit_resolver_test.go b/internal/serveredition/broker/audit_resolver_test.go new file mode 100644 index 000000000..86725c3ab --- /dev/null +++ b/internal/serveredition/broker/audit_resolver_test.go @@ -0,0 +1,255 @@ +//go:build server + +package broker + +import ( + "context" + "errors" + "strings" + "sync" + "testing" + "time" + + "github.com/smart-mcp-proxy/mcpproxy-go/internal/config" + "github.com/smart-mcp-proxy/mcpproxy-go/internal/oauth" + "github.com/smart-mcp-proxy/mcpproxy-go/internal/reqcontext" +) + +// recordingSink captures audit events for assertions. It is concurrency-safe so +// it can be used under the resolver's single-flight without data races. +type recordingSink struct { + mu sync.Mutex + events []AuditEvent +} + +func (s *recordingSink) RecordBrokerEvent(_ context.Context, ev AuditEvent) { + s.mu.Lock() + defer s.mu.Unlock() + s.events = append(s.events, ev) +} + +func (s *recordingSink) all() []AuditEvent { + s.mu.Lock() + defer s.mu.Unlock() + out := make([]AuditEvent, len(s.events)) + copy(out, s.events) + return out +} + +func (s *recordingSink) last(t *testing.T) AuditEvent { + t.Helper() + evs := s.all() + if len(evs) == 0 { + t.Fatalf("expected at least one audit event, got none") + } + return evs[len(evs)-1] +} + +// secretToken is a sentinel access-token value asserted to never appear in any +// audit event (FR-029: no secret material in records). +const secretToken = "SUPER-SECRET-ACCESS-TOKEN-do-not-log" + +func assertNoSecret(t *testing.T, evs []AuditEvent) { + t.Helper() + for i, ev := range evs { + for field, val := range map[string]string{ + "Reason": ev.Reason, "Method": ev.Method, "Action": ev.Action, + "Outcome": ev.Outcome, "ServerName": ev.ServerName, "UserID": ev.UserID, + } { + if strings.Contains(val, secretToken) { + t.Fatalf("event %d %s leaked secret material: %q", i, field, val) + } + } + } +} + +func TestAudit_CacheHit_EmitsInjectSuccess(t *testing.T) { + store := newFakeStore() + server := httpServer("grafana", tokenExchangeBroker()) + key := oauth.GenerateServerKey(server.Name, server.URL) + store.seed("alice", key, &UpstreamCredential{AccessToken: secretToken, ExpiresAt: time.Now().Add(time.Hour)}) + + sink := &recordingSink{} + r := NewCredentialResolver(ResolverDeps{Store: store, Audit: sink}) + + ctx := reqcontext.WithRequestID(context.Background(), "req-123") + if _, err := r.Resolve(ctx, "alice", server); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + ev := sink.last(t) + if ev.UserID != "alice" || ev.ServerName != "grafana" { + t.Fatalf("missing attribution: %+v", ev) + } + if ev.Method != AuditMethodTokenExchange { + t.Fatalf("expected method %q, got %q", AuditMethodTokenExchange, ev.Method) + } + if ev.Action != AuditActionInject { + t.Fatalf("expected action %q, got %q", AuditActionInject, ev.Action) + } + if ev.Outcome != AuditOutcomeSuccess { + t.Fatalf("expected success, got %q (reason %q)", ev.Outcome, ev.Reason) + } + if ev.RequestID != "req-123" { + t.Fatalf("expected request_id correlation, got %q", ev.RequestID) + } + assertNoSecret(t, sink.all()) +} + +func TestAudit_FreshTokenExchange_EmitsAcquireSuccess(t *testing.T) { + store := newFakeStore() + server := httpServer("grafana", tokenExchangeBroker()) + ex := &fakeExchanger{cred: &UpstreamCredential{AccessToken: secretToken}} + + sink := &recordingSink{} + r := NewCredentialResolver(ResolverDeps{Store: store, Exchanger: ex, Audit: sink}) + + if _, err := r.Resolve(context.Background(), "bob", server); err != nil { + t.Fatalf("unexpected error: %v", err) + } + ev := sink.last(t) + if ev.Action != AuditActionAcquire || ev.Outcome != AuditOutcomeSuccess { + t.Fatalf("expected acquire/success, got %s/%s", ev.Action, ev.Outcome) + } + if ev.Method != AuditMethodTokenExchange { + t.Fatalf("expected token_exchange method, got %q", ev.Method) + } + assertNoSecret(t, sink.all()) +} + +func TestAudit_NearExpiry_EmitsRefreshSuccess(t *testing.T) { + store := newFakeStore() + server := httpServer("grafana", tokenExchangeBroker()) + key := oauth.GenerateServerKey(server.Name, server.URL) + store.seed("carol", key, &UpstreamCredential{AccessToken: "old", ExpiresAt: time.Now().Add(5 * time.Second)}) + ex := &fakeExchanger{cred: &UpstreamCredential{AccessToken: secretToken}} + + sink := &recordingSink{} + r := NewCredentialResolver(ResolverDeps{Store: store, Exchanger: ex, Audit: sink}) + + if _, err := r.Resolve(context.Background(), "carol", server); err != nil { + t.Fatalf("unexpected error: %v", err) + } + ev := sink.last(t) + if ev.Action != AuditActionRefresh || ev.Outcome != AuditOutcomeSuccess { + t.Fatalf("expected refresh/success, got %s/%s", ev.Action, ev.Outcome) + } + assertNoSecret(t, sink.all()) +} + +func TestAudit_EntraOBO_MethodMapping(t *testing.T) { + store := newFakeStore() + b := &config.AuthBrokerConfig{Mode: config.AuthBrokerModeEntraOBO, TokenEndpoint: "https://idp/token"} + b.ApplyDefaults() + server := httpServer("graph", b) + ex := &fakeExchanger{cred: &UpstreamCredential{AccessToken: secretToken}} + + sink := &recordingSink{} + r := NewCredentialResolver(ResolverDeps{Store: store, Exchanger: ex, Audit: sink}) + + if _, err := r.Resolve(context.Background(), "dave", server); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if ev := sink.last(t); ev.Method != AuditMethodEntraOBO { + t.Fatalf("expected entra_obo method, got %q", ev.Method) + } +} + +func TestAudit_ExchangeFailure_EmitsFailureWithReason_NoSecret(t *testing.T) { + store := newFakeStore() + server := httpServer("grafana", tokenExchangeBroker()) + // The exchanger fails; its error must not contain secret material, and the + // audit reason is derived from it. + ex := &fakeExchanger{err: errors.New("token exchange rejected by authorization server")} + + sink := &recordingSink{} + r := NewCredentialResolver(ResolverDeps{Store: store, Exchanger: ex, Audit: sink}) + + if _, err := r.Resolve(context.Background(), "erin", server); err == nil { + t.Fatalf("expected error") + } + ev := sink.last(t) + if ev.Action != AuditActionAcquire || ev.Outcome != AuditOutcomeFailure { + t.Fatalf("expected acquire/failure, got %s/%s", ev.Action, ev.Outcome) + } + if ev.Reason == "" { + t.Fatalf("failure event must carry a reason") + } + assertNoSecret(t, sink.all()) +} + +func TestAudit_PolicyDenied_EmitsInjectFailure(t *testing.T) { + store := newFakeStore() + server := httpServer("grafana", tokenExchangeBroker()) + key := oauth.GenerateServerKey(server.Name, server.URL) + store.seed("frank", key, &UpstreamCredential{AccessToken: secretToken, ExpiresAt: time.Now().Add(time.Hour)}) + + sink := &recordingSink{} + deny := PolicyHookFunc(func(_ context.Context, _ PolicyInput) (PolicyDecision, error) { + return PolicyDecision{Allow: false, Reason: "blocked by org policy"}, nil + }) + r := NewCredentialResolver(ResolverDeps{Store: store, Policy: deny, Audit: sink}) + + if _, err := r.Resolve(context.Background(), "frank", server); err == nil { + t.Fatalf("expected policy-denied error") + } + ev := sink.last(t) + if ev.Action != AuditActionInject || ev.Outcome != AuditOutcomeFailure { + t.Fatalf("expected inject/failure, got %s/%s", ev.Action, ev.Outcome) + } + if !strings.Contains(ev.Reason, "org policy") { + t.Fatalf("expected policy reason, got %q", ev.Reason) + } + assertNoSecret(t, sink.all()) +} + +func TestAudit_NotConnected_EmitsConnectFailure(t *testing.T) { + store := newFakeStore() + server := httpServer("github", connectBroker()) + key := oauth.GenerateServerKey(server.Name, server.URL) + conn := &fakeConnector{serverKey: key, authURL: "https://idp/authorize?state=xyz"} + + sink := &recordingSink{} + r := NewCredentialResolver(ResolverDeps{Store: store, Connectors: &fakeConnectorProvider{conn: conn}, Audit: sink}) + + if _, err := r.Resolve(context.Background(), "grace", server); err == nil { + t.Fatalf("expected not-connected error") + } + ev := sink.last(t) + if ev.Action != AuditActionConnect || ev.Outcome != AuditOutcomeFailure { + t.Fatalf("expected connect/failure, got %s/%s", ev.Action, ev.Outcome) + } + if ev.Method != AuditMethodConnect { + t.Fatalf("expected connect method, got %q", ev.Method) + } +} + +func TestAudit_StoreDisabled_EmitsInjectFailure(t *testing.T) { + store := newFakeStore() + store.enabled = false + server := httpServer("grafana", tokenExchangeBroker()) + + sink := &recordingSink{} + r := NewCredentialResolver(ResolverDeps{Store: store, Audit: sink}) + + if _, err := r.Resolve(context.Background(), "heidi", server); err == nil { + t.Fatalf("expected store-disabled error") + } + if ev := sink.last(t); ev.Action != AuditActionInject || ev.Outcome != AuditOutcomeFailure { + t.Fatalf("expected inject/failure, got %s/%s", ev.Action, ev.Outcome) + } +} + +func TestAudit_Unauthenticated_NoEvent(t *testing.T) { + store := newFakeStore() + server := httpServer("grafana", tokenExchangeBroker()) + sink := &recordingSink{} + r := NewCredentialResolver(ResolverDeps{Store: store, Audit: sink}) + + if _, err := r.Resolve(context.Background(), "", server); !errors.Is(err, ErrUnauthenticated) { + t.Fatalf("expected ErrUnauthenticated, got %v", err) + } + if evs := sink.all(); len(evs) != 0 { + t.Fatalf("expected no audit event for anonymous caller, got %d", len(evs)) + } +} diff --git a/internal/serveredition/broker/credential_resolver.go b/internal/serveredition/broker/credential_resolver.go index 932cc75de..bd7b0c936 100644 --- a/internal/serveredition/broker/credential_resolver.go +++ b/internal/serveredition/broker/credential_resolver.go @@ -137,13 +137,14 @@ func (e *PolicyDeniedError) Error() string { // ResolverDeps are the collaborators a CredentialResolver needs. Store and // Exchanger are required for token-exchange upstreams; Connectors is required -// only for oauth_connect upstreams. Policy and Logger are optional. +// only for oauth_connect upstreams. Policy, Logger, and Audit are optional. type ResolverDeps struct { Store CredentialStore Exchanger Exchanger Connectors ConnectorProvider Policy PolicyHook Logger *zap.Logger + Audit AuditSink RefreshThreshold time.Duration } @@ -166,11 +167,21 @@ type CredentialResolver struct { conns ConnectorProvider policy PolicyHook logger *zap.Logger + audit AuditSink refreshThreshold time.Duration group singleflight.Group } +// acquisition is the internal result of the per-(user,server) acquire flight. It +// carries the resolved credential (nil on failure) and the audit action that the +// flight performed (acquire / refresh / inject / connect) so Resolve can emit a +// single, correctly-classified audit event for the whole resolution. +type acquisition struct { + cred *UpstreamCredential + action string +} + // NewCredentialResolver constructs a resolver from its dependencies, applying // defaults for the optional fields. func NewCredentialResolver(deps ResolverDeps) *CredentialResolver { @@ -182,6 +193,10 @@ func NewCredentialResolver(deps ResolverDeps) *CredentialResolver { if policy == nil { policy = allowAllPolicy{} } + auditSink := deps.Audit + if auditSink == nil { + auditSink = nopAuditSink{} + } threshold := deps.RefreshThreshold if threshold <= 0 { threshold = defaultRefreshThreshold @@ -192,6 +207,7 @@ func NewCredentialResolver(deps ResolverDeps) *CredentialResolver { conns: deps.Connectors, policy: policy, logger: logger.Named("credential-resolver"), + audit: auditSink, refreshThreshold: threshold, } } @@ -207,7 +223,12 @@ func (r *CredentialResolver) Resolve(ctx context.Context, userID string, server if server == nil || server.AuthBroker == nil { return nil, ErrBrokerNotConfigured } + + method := auditMethodForMode(server.AuthBroker.Mode) if r.store == nil || !r.store.Enabled() { + // A brokered upstream whose store is disabled: a genuine injection attempt + // that cannot proceed — audit it so the operator sees why injection failed. + r.emitAudit(ctx, userID, server.Name, method, AuditActionInject, ErrStoreDisabled) return nil, ErrStoreDisabled } @@ -226,13 +247,23 @@ func (r *CredentialResolver) Resolve(ctx context.Context, userID string, server v, err, _ := r.group.Do(flightKey, func() (interface{}, error) { return r.acquire(context.WithoutCancel(ctx), userID, serverKey, server) }) + + // Recover the action the flight performed so the audit event is classified + // correctly (acquire / refresh / inject / connect) regardless of outcome. + action := AuditActionInject + if acq, ok := v.(*acquisition); ok && acq != nil && acq.action != "" { + action = acq.action + } if err != nil { + r.emitAudit(ctx, userID, server.Name, method, action, err) return nil, err } - cred, ok := v.(*UpstreamCredential) - if !ok || cred == nil { + acq, _ := v.(*acquisition) + if acq == nil || acq.cred == nil { + r.emitAudit(ctx, userID, server.Name, method, action, ErrNoCredential) return nil, ErrNoCredential } + cred := acq.cred // Policy-decision seam: evaluated per call, before the credential is handed // to the caller (FR-015). Default hook allows everything. @@ -243,14 +274,53 @@ func (r *CredentialResolver) Resolve(ctx context.Context, userID string, server Credential: cred, }) if perr != nil { - return nil, fmt.Errorf("credential resolver: policy evaluation failed: %w", perr) + wrapped := fmt.Errorf("credential resolver: policy evaluation failed: %w", perr) + r.emitAudit(ctx, userID, server.Name, method, AuditActionInject, wrapped) + return nil, wrapped } if !decision.Allow { - return nil, &PolicyDeniedError{ServerName: server.Name, Reason: decision.Reason} + denied := &PolicyDeniedError{ServerName: server.Name, Reason: decision.Reason} + r.emitAudit(ctx, userID, server.Name, method, AuditActionInject, denied) + return nil, denied } + + r.emitAudit(ctx, userID, server.Name, method, action, nil) return cred, nil } +// emitAudit records one secret-free credential-brokering audit event. A nil err +// is a success; any other err is recorded as a failure with a secret-free reason +// drawn from the broker's (secret-free) sentinel/actionable errors (FR-028/029). +func (r *CredentialResolver) emitAudit(ctx context.Context, userID, serverName, method, action string, err error) { + ev := AuditEvent{ + UserID: userID, + ServerName: serverName, + Method: method, + Action: action, + Outcome: AuditOutcomeSuccess, + RequestID: auditRequestID(ctx), + } + if err != nil { + ev.Outcome = AuditOutcomeFailure + ev.Reason = auditReason(err) + } + r.audit.RecordBrokerEvent(ctx, ev) +} + +// auditReason renders a secret-free failure reason. NotConnectedError carries a +// purpose-built Reason; all other broker errors are deliberately coarse and +// secret-free, so their Error() text is safe to record. +func auditReason(err error) string { + if err == nil { + return "" + } + var nc *NotConnectedError + if errors.As(err, &nc) && nc.Reason != "" { + return nc.Reason + } + return err.Error() +} + // acquire runs the per-user-only ordering for a single (user, server). It is // invoked inside the single-flight group. // @@ -258,7 +328,7 @@ func (r *CredentialResolver) Resolve(ctx context.Context, userID string, server // not trigger a redundant double acquisition. The Exchanger (T4) and Connector // (T5) persist their results into the store themselves, so the resolver never // calls store.Put — it only reads the cache via store.Get. -func (r *CredentialResolver) acquire(ctx context.Context, userID, serverKey string, server *config.ServerConfig) (*UpstreamCredential, error) { +func (r *CredentialResolver) acquire(ctx context.Context, userID, serverKey string, server *config.ServerConfig) (*acquisition, error) { cfg := server.AuthBroker // 1. Serve a still-valid, not-near-expiry cached credential directly. @@ -267,30 +337,39 @@ func (r *CredentialResolver) acquire(ctx context.Context, userID, serverKey stri switch { case hasCache: if cached.IsValid() && !cached.ExpiresWithin(r.refreshThreshold) { - return cached, nil + // No new acquisition: an existing valid credential is used for injection. + return &acquisition{cred: cached, action: AuditActionInject}, nil } // Stale / near-expiry: renewed by the per-mode path below. case errors.Is(err, ErrNotFound): // No cache: acquired by the per-mode path below. default: // Unexpected store error (not "missing"): surface it. - return nil, fmt.Errorf("credential resolver: load cached credential: %w", err) + return &acquisition{action: AuditActionInject}, + fmt.Errorf("credential resolver: load cached credential: %w", err) } switch cfg.Mode { case config.AuthBrokerModeTokenExchange, config.AuthBrokerModeEntraOBO: // 2. Token-exchange / OBO: the first-acquisition and refresh paths are // identical (re-mint from the stored IdP subject token), so a single - // Exchange call covers both the cache-miss and near-expiry cases. + // Exchange call covers both the cache-miss and near-expiry cases. A + // near-expiry cache hit is a refresh; a cache miss is a first acquisition. + action := AuditActionAcquire + if hasCache { + action = AuditActionRefresh + } if r.exchanger == nil { - return nil, fmt.Errorf("credential resolver: no token exchanger configured for mode %q", cfg.Mode) + return &acquisition{action: action}, + fmt.Errorf("credential resolver: no token exchanger configured for mode %q", cfg.Mode) } - return r.exchanger.Exchange(ctx, userID, serverKey, cfg) + cred, xerr := r.exchanger.Exchange(ctx, userID, serverKey, cfg) + return &acquisition{cred: cred, action: action}, xerr case config.AuthBrokerModeOAuthConnect: conn, cerr := r.connectorFor(server) if cerr != nil { - return nil, cerr + return &acquisition{action: AuditActionConnect}, cerr } // A cached connect-flow credential means the user already connected: // renew transparently via the stored refresh token. Only when that @@ -298,11 +377,12 @@ func (r *CredentialResolver) acquire(ctx context.Context, userID, serverKey stri if hasCache && cached.RefreshToken != "" { refreshed, rerr := conn.Refresh(ctx, userID) if rerr == nil { - return refreshed, nil + return &acquisition{cred: refreshed, action: AuditActionRefresh}, nil } r.logger.Warn("connect-flow credential refresh failed; user must reconnect", zap.String("server", server.Name), zap.Error(rerr)) - return nil, r.notConnected(conn, server, userID, "stored credential expired and refresh failed; reconnect required") + return &acquisition{action: AuditActionRefresh}, + r.notConnected(conn, server, userID, "stored credential expired and refresh failed; reconnect required") } // 3. Never connected, or connected without a usable refresh token and now // expired — both require (re)consent through the connect flow. @@ -310,11 +390,11 @@ func (r *CredentialResolver) acquire(ctx context.Context, userID, serverKey stri if hasCache { reason = "stored credential expired; reconnect required" } - return nil, r.notConnected(conn, server, userID, reason) + return &acquisition{action: AuditActionConnect}, r.notConnected(conn, server, userID, reason) default: // 4. No recognised acquisition strategy and no per-user credential. - return nil, ErrNoCredential + return &acquisition{action: AuditActionAcquire}, ErrNoCredential } } diff --git a/internal/serveredition/broker/oauth_connector.go b/internal/serveredition/broker/oauth_connector.go index 5ec89ce31..2b6c5476f 100644 --- a/internal/serveredition/broker/oauth_connector.go +++ b/internal/serveredition/broker/oauth_connector.go @@ -97,6 +97,7 @@ type OAuthConnector struct { serverKey string client *http.Client logger *zap.Logger + audit AuditSink mu sync.Mutex pending map[string]*pendingFlow @@ -107,26 +108,51 @@ type OAuthConnector struct { } // NewOAuthConnector builds a connector for one upstream. It returns an error if -// the configuration is missing fields required to run a connect flow. -func NewOAuthConnector(store CredentialStore, cfg ConnectorConfig, logger *zap.Logger) (*OAuthConnector, error) { +// the configuration is missing fields required to run a connect flow. A nil +// audit sink disables audit emission (no-op). +func NewOAuthConnector(store CredentialStore, cfg ConnectorConfig, logger *zap.Logger, audit AuditSink) (*OAuthConnector, error) { if err := cfg.validate(); err != nil { return nil, err } if logger == nil { logger = zap.NewNop() } + if audit == nil { + audit = nopAuditSink{} + } return &OAuthConnector{ store: store, cfg: cfg, serverKey: oauth.GenerateServerKey(cfg.ServerName, cfg.ServerURL), client: &http.Client{Timeout: 30 * time.Second}, logger: logger.Named("oauth-connector").With(zap.String("server", cfg.ServerName)), + audit: audit, pending: make(map[string]*pendingFlow), now: time.Now, stateTTL: defaultStateTTL, }, nil } +// emitConnect records a secret-free connect-flow audit event (spec 074 T10). An +// empty reason marks success; a non-empty reason marks failure. The reason is a +// fixed, coarse label chosen by the caller — never a raw upstream error body — +// so the audit log never captures secret material (FR-029). +func (c *OAuthConnector) emitConnect(ctx context.Context, userID, reason string) { + ev := AuditEvent{ + UserID: userID, + ServerName: c.cfg.ServerName, + Method: AuditMethodConnect, + Action: AuditActionConnect, + Outcome: AuditOutcomeSuccess, + RequestID: auditRequestID(ctx), + } + if reason != "" { + ev.Outcome = AuditOutcomeFailure + ev.Reason = reason + } + c.audit.RecordBrokerEvent(ctx, ev) +} + // ServerKey returns the store key this connector persists credentials under. func (c *OAuthConnector) ServerKey() string { return c.serverKey } @@ -184,9 +210,12 @@ func (c *OAuthConnector) BuildAuthorizationURL(userID string) (authURL, state st func (c *OAuthConnector) Complete(ctx context.Context, state, code string) (*UpstreamCredential, error) { flow, err := c.consume(state) if err != nil { + // State unknown/expired: the user is not yet identified for attribution. + c.emitConnect(ctx, "", "invalid or expired state") return nil, err } if code == "" { + c.emitConnect(ctx, flow.userID, "missing authorization code") return nil, fmt.Errorf("oauth connector: empty authorization code") } @@ -199,15 +228,18 @@ func (c *OAuthConnector) Complete(ctx context.Context, state, code string) (*Ups } tok, err := c.postToken(ctx, form) if err != nil { + c.emitConnect(ctx, flow.userID, "token endpoint exchange failed") return nil, err } cred := c.credentialFromToken(tok, "") if err := c.store.Put(flow.userID, c.serverKey, cred); err != nil { + c.emitConnect(ctx, flow.userID, "credential persistence failed") return nil, fmt.Errorf("oauth connector: persist credential: %w", err) } c.logger.Info("stored per-user upstream credential via connect flow", zap.String("user_id", flow.userID)) + c.emitConnect(ctx, flow.userID, "") return cred, nil } @@ -215,9 +247,15 @@ func (c *OAuthConnector) Complete(ctx context.Context, state, code string) (*Ups // error=access_denied). It clears the pending flow and stores nothing. func (c *OAuthConnector) Deny(state, reason string) error { c.mu.Lock() + var userID string + if f, ok := c.pending[state]; ok { + userID = f.userID + } delete(c.pending, state) c.mu.Unlock() c.logger.Info("connect flow denied by user", zap.String("reason", reason)) + // reason is the AS-supplied error code (e.g. "access_denied") — secret-free. + c.emitConnect(context.Background(), userID, "connect denied: "+reason) return nil } diff --git a/internal/serveredition/broker/oauth_connector_test.go b/internal/serveredition/broker/oauth_connector_test.go index 9195b0401..982a1bc11 100644 --- a/internal/serveredition/broker/oauth_connector_test.go +++ b/internal/serveredition/broker/oauth_connector_test.go @@ -49,7 +49,7 @@ func newConnectorTestStore(t *testing.T) CredentialStore { func newTestConnector(t *testing.T, cfg ConnectorConfig) *OAuthConnector { t.Helper() - c, err := NewOAuthConnector(newConnectorTestStore(t), cfg, zap.NewNop()) + c, err := NewOAuthConnector(newConnectorTestStore(t), cfg, zap.NewNop(), nil) if err != nil { t.Fatalf("NewOAuthConnector: %v", err) } @@ -162,7 +162,7 @@ func TestOAuthConnector_Complete_StoresEncryptedToken(t *testing.T) { m := newMockTokenServer(t) cfg := connectorTestConfig(m.srv.URL) store := newConnectorTestStore(t) - c, err := NewOAuthConnector(store, cfg, zap.NewNop()) + c, err := NewOAuthConnector(store, cfg, zap.NewNop(), nil) if err != nil { t.Fatalf("NewOAuthConnector: %v", err) } @@ -221,7 +221,7 @@ func TestOAuthConnector_Complete_StoresEncryptedToken(t *testing.T) { func TestOAuthConnector_Complete_InvalidState(t *testing.T) { m := newMockTokenServer(t) store := newConnectorTestStore(t) - c, _ := NewOAuthConnector(store, connectorTestConfig(m.srv.URL), zap.NewNop()) + c, _ := NewOAuthConnector(store, connectorTestConfig(m.srv.URL), zap.NewNop(), nil) if _, err := c.Complete(context.Background(), "bogus-state", "code"); err == nil { t.Fatal("expected error for unknown state") @@ -234,7 +234,7 @@ func TestOAuthConnector_Complete_InvalidState(t *testing.T) { func TestOAuthConnector_Complete_ExpiredState(t *testing.T) { m := newMockTokenServer(t) store := newConnectorTestStore(t) - c, _ := NewOAuthConnector(store, connectorTestConfig(m.srv.URL), zap.NewNop()) + c, _ := NewOAuthConnector(store, connectorTestConfig(m.srv.URL), zap.NewNop(), nil) base := time.Now() c.now = func() time.Time { return base } @@ -255,7 +255,7 @@ func TestOAuthConnector_Complete_ExpiredState(t *testing.T) { func TestOAuthConnector_Complete_StateIsOneTime(t *testing.T) { m := newMockTokenServer(t) - c, _ := NewOAuthConnector(newConnectorTestStore(t), connectorTestConfig(m.srv.URL), zap.NewNop()) + c, _ := NewOAuthConnector(newConnectorTestStore(t), connectorTestConfig(m.srv.URL), zap.NewNop(), nil) _, state, _ := c.BuildAuthorizationURL("user-alice") if _, err := c.Complete(context.Background(), state, "code"); err != nil { @@ -269,7 +269,7 @@ func TestOAuthConnector_Complete_StateIsOneTime(t *testing.T) { func TestOAuthConnector_Deny_StoresNothing(t *testing.T) { m := newMockTokenServer(t) store := newConnectorTestStore(t) - c, _ := NewOAuthConnector(store, connectorTestConfig(m.srv.URL), zap.NewNop()) + c, _ := NewOAuthConnector(store, connectorTestConfig(m.srv.URL), zap.NewNop(), nil) _, state, _ := c.BuildAuthorizationURL("user-alice") if err := c.Deny(state, "access_denied"); err != nil { @@ -292,7 +292,7 @@ func TestOAuthConnector_Refresh(t *testing.T) { m.accessToken = "refreshed-access-token" m.refreshToken = "" // emulate AS that does not rotate the refresh token store := newConnectorTestStore(t) - c, _ := NewOAuthConnector(store, connectorTestConfig(m.srv.URL), zap.NewNop()) + c, _ := NewOAuthConnector(store, connectorTestConfig(m.srv.URL), zap.NewNop(), nil) // Seed an existing connect-flow credential with a refresh token. seed := &UpstreamCredential{ @@ -340,7 +340,7 @@ func TestOAuthConnector_Refresh(t *testing.T) { func TestOAuthConnector_Refresh_NoRefreshToken(t *testing.T) { m := newMockTokenServer(t) store := newConnectorTestStore(t) - c, _ := NewOAuthConnector(store, connectorTestConfig(m.srv.URL), zap.NewNop()) + c, _ := NewOAuthConnector(store, connectorTestConfig(m.srv.URL), zap.NewNop(), nil) seed := &UpstreamCredential{AccessToken: "at", ObtainedVia: "connect_flow"} // no refresh token if err := store.Put("user-alice", c.ServerKey(), seed); err != nil { @@ -355,7 +355,7 @@ func TestOAuthConnector_Complete_TokenEndpointError(t *testing.T) { m := newMockTokenServer(t) m.status = http.StatusBadRequest store := newConnectorTestStore(t) - c, _ := NewOAuthConnector(store, connectorTestConfig(m.srv.URL), zap.NewNop()) + c, _ := NewOAuthConnector(store, connectorTestConfig(m.srv.URL), zap.NewNop(), nil) _, state, _ := c.BuildAuthorizationURL("user-alice") if _, err := c.Complete(context.Background(), state, "code"); err == nil { @@ -379,7 +379,7 @@ func TestNewOAuthConnector_Validation(t *testing.T) { t.Run(name, func(t *testing.T) { cfg := base mutate(&cfg) - if _, err := NewOAuthConnector(store, cfg, zap.NewNop()); err == nil { + if _, err := NewOAuthConnector(store, cfg, zap.NewNop(), nil); err == nil { t.Errorf("expected validation error for %s", name) } }) diff --git a/internal/serveredition/setup.go b/internal/serveredition/setup.go index 8201c6a34..9ae60e83d 100644 --- a/internal/serveredition/setup.go +++ b/internal/serveredition/setup.go @@ -89,8 +89,12 @@ func setupMultiUserOAuth(deps Dependencies) error { userActivityHandlers := teamsapi.NewUserActivityHandlers(nil, userStore, sharedServers, deps.Logger) // Per-user brokered-credential surfaces (spec 074 T8): list connection // status, disconnect, and the Path B connect/callback flow. Reuses the same - // credential store wired into the OAuth login handler above. - credentialHandlers := teamsapi.NewCredentialHandlers(credStore, sharedServers, deps.Logger) + // credential store wired into the OAuth login handler above. The audit sink + // (spec 074 T10) records connect/acquire/refresh/inject events to the existing + // activity log; a nil StorageManager yields a no-op sink (auditing is + // best-effort and never blocks brokering). + brokerAudit := teamsapi.NewActivityAuditSink(deps.StorageManager, deps.Logger) + credentialHandlers := teamsapi.NewCredentialHandlers(credStore, sharedServers, brokerAudit, deps.Logger) deps.Router.Group(func(r chi.Router) { r.Use(authMiddleware.Middleware()) diff --git a/internal/storage/activity_models.go b/internal/storage/activity_models.go index c4293643c..567da4d26 100644 --- a/internal/storage/activity_models.go +++ b/internal/storage/activity_models.go @@ -33,6 +33,10 @@ const ( ActivityTypeToolQuarantineChange ActivityType = "tool_quarantine_change" // ActivityTypeSecurityScan represents a security scan event (Spec 039) ActivityTypeSecurityScan ActivityType = "security_scan" + // ActivityTypeCredentialBroker represents a per-user credential brokering + // event: acquisition, refresh, injection, or connect (Spec 074 T10). It + // carries attribution (UserID, ServerName) and never any token/secret value. + ActivityTypeCredentialBroker ActivityType = "credential_broker" ) // ValidActivityTypes is the list of all valid activity types for filtering (Spec 024) @@ -47,6 +51,7 @@ var ValidActivityTypes = []string{ string(ActivityTypeConfigChange), string(ActivityTypeToolQuarantineChange), string(ActivityTypeSecurityScan), + string(ActivityTypeCredentialBroker), } // ActivitySource indicates how the activity was triggered