Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/cli/activity-commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ mcpproxy activity list [flags]

| Flag | Short | Default | Description |
|------|-------|---------|-------------|
| `--type` | `-t` | | Filter by type (comma-separated for multiple): `tool_call`, `system_start`, `system_stop`, `internal_tool_call`, `config_change`, `policy_decision`, `quarantine_change`, `server_change` |
| `--type` | `-t` | | Filter by type (comma-separated for multiple): `tool_call`, `system_start`, `system_stop`, `internal_tool_call`, `config_change`, `policy_decision`, `quarantine_change`, `server_change`, `credential_broker` |
| `--server` | `-s` | | Filter by server name |
| `--tool` | | | Filter by tool name |
| `--status` | | | Filter by status: `success`, `error`, `blocked` |
Expand Down Expand Up @@ -175,7 +175,7 @@ mcpproxy activity watch [flags]

| Flag | Short | Default | Description |
|------|-------|---------|-------------|
| `--type` | `-t` | | Filter by type (comma-separated for multiple): `tool_call`, `system_start`, `system_stop`, `internal_tool_call`, `config_change`, `policy_decision`, `quarantine_change`, `server_change` |
| `--type` | `-t` | | Filter by type (comma-separated for multiple): `tool_call`, `system_start`, `system_stop`, `internal_tool_call`, `config_change`, `policy_decision`, `quarantine_change`, `server_change`, `credential_broker` |
| `--server` | `-s` | | Filter by server name |

### Examples
Expand Down
3 changes: 2 additions & 1 deletion docs/features/activity-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The activity log captures:
| `policy_decision` | Tool calls blocked by policy rules |
| `quarantine_change` | Server quarantine/unquarantine events |
| `server_change` | Server enable/disable/restart events |
| `credential_broker` | Per-user credential brokering events (acquire/refresh/inject/connect) — server edition only |

### System Lifecycle Events

Expand Down Expand Up @@ -284,7 +285,7 @@ GET /api/v1/activity

| Parameter | Type | Description |
|-----------|------|-------------|
| `type` | string | Filter by type (comma-separated for multiple): `tool_call`, `system_start`, `system_stop`, `internal_tool_call`, `config_change`, `policy_decision`, `quarantine_change`, `server_change` |
| `type` | string | Filter by type (comma-separated for multiple): `tool_call`, `system_start`, `system_stop`, `internal_tool_call`, `config_change`, `policy_decision`, `quarantine_change`, `server_change`, `credential_broker` |
| `server` | string | Filter by server name |
| `tool` | string | Filter by tool name |
| `session_id` | string | Filter by MCP session ID |
Expand Down
71 changes: 71 additions & 0 deletions internal/serveredition/api/broker_audit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//go:build server

package api

import (
"context"

"go.uber.org/zap"

"github.com/smart-mcp-proxy/mcpproxy-go/internal/serveredition/broker"
"github.com/smart-mcp-proxy/mcpproxy-go/internal/storage"
)

// activityAuditSink persists per-user credential-brokering audit events to the
// existing activity log (spec 074 T10). It implements broker.AuditSink by writing
// an ActivityRecord of type ActivityTypeCredentialBroker. Writes are async so the
// broker's request/connect path is never blocked on storage.
//
// The record carries attribution (UserID, ServerName, RequestID) and the
// method/action/outcome metadata only — there is no field, here or in the source
// AuditEvent, able to hold a token or secret value (FR-028/FR-029).
type activityAuditSink struct {
storage *storage.Manager
logger *zap.SugaredLogger
}

// NewActivityAuditSink builds an audit sink backed by the activity log. It
// returns nil when no storage manager is available, so callers fall back to the
// broker's internal no-op sink and brokering still works (auditing is best-effort).
func NewActivityAuditSink(sm *storage.Manager, logger *zap.SugaredLogger) broker.AuditSink {
if sm == nil {
return nil
}
if logger == nil {
logger = zap.NewNop().Sugar()
}
return &activityAuditSink{storage: sm, logger: logger}
}

// RecordBrokerEvent maps a broker.AuditEvent onto an ActivityRecord and stores it
// asynchronously. It never copies token/secret material (FR-029).
func (s *activityAuditSink) RecordBrokerEvent(_ context.Context, ev broker.AuditEvent) {
if s == nil || s.storage == nil {
return
}

status := "success"
if ev.Outcome == broker.AuditOutcomeFailure {
status = "error"
}

record := &storage.ActivityRecord{
Type: storage.ActivityTypeCredentialBroker,
Source: storage.ActivitySourceInternal,
ServerName: ev.ServerName,
Status: status,
UserID: ev.UserID,
RequestID: ev.RequestID,
Metadata: map[string]interface{}{
"broker_method": ev.Method,
"broker_action": ev.Action,
},
}
if ev.Outcome == broker.AuditOutcomeFailure && ev.Reason != "" {
// The reason is a coarse, secret-free label produced by the broker.
record.ErrorMessage = ev.Reason
record.Metadata["reason"] = ev.Reason
}

s.storage.SaveActivityAsync(record)
}
124 changes: 124 additions & 0 deletions internal/serveredition/api/broker_audit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
//go:build server

package api

import (
"context"
"testing"
"time"

"go.uber.org/zap"

"github.com/smart-mcp-proxy/mcpproxy-go/internal/serveredition/broker"
"github.com/smart-mcp-proxy/mcpproxy-go/internal/storage"
)

// waitForActivities polls the activity log until at least n records of the given
// type exist or the deadline passes (SaveActivityAsync writes in a goroutine).
func waitForActivities(t *testing.T, m *storage.Manager, n int) []*storage.ActivityRecord {
t.Helper()
deadline := time.Now().Add(2 * time.Second)
for {
recs, _, err := m.ListActivities(storage.ActivityFilter{
Types: []string{string(storage.ActivityTypeCredentialBroker)},
Limit: 100,
})
if err != nil {
t.Fatalf("ListActivities: %v", err)
}
if len(recs) >= n {
return recs
}
if time.Now().After(deadline) {
t.Fatalf("timed out waiting for %d credential_broker records, got %d", n, len(recs))
}
time.Sleep(10 * time.Millisecond)
}
}

func findByAction(recs []*storage.ActivityRecord, action string) *storage.ActivityRecord {
for _, r := range recs {
if r.Metadata != nil && r.Metadata["broker_action"] == action {
return r
}
}
return nil
}

func TestActivityAuditSink_PersistsAttributionNoSecret(t *testing.T) {
mgr, err := storage.NewManager(t.TempDir(), zap.NewNop().Sugar())
if err != nil {
t.Fatalf("NewManager: %v", err)
}
defer mgr.Close()

sink := NewActivityAuditSink(mgr, zap.NewNop().Sugar())
if sink == nil {
t.Fatal("expected non-nil sink for a real storage manager")
}

// A successful acquisition and a failed connect.
sink.RecordBrokerEvent(context.Background(), broker.AuditEvent{
UserID: "alice",
ServerName: "grafana",
Method: broker.AuditMethodTokenExchange,
Action: broker.AuditActionAcquire,
Outcome: broker.AuditOutcomeSuccess,
RequestID: "req-abc",
})
sink.RecordBrokerEvent(context.Background(), broker.AuditEvent{
UserID: "bob",
ServerName: "github",
Method: broker.AuditMethodConnect,
Action: broker.AuditActionConnect,
Outcome: broker.AuditOutcomeFailure,
Reason: "token endpoint exchange failed",
RequestID: "req-def",
})

recs := waitForActivities(t, mgr, 2)

acq := findByAction(recs, broker.AuditActionAcquire)
if acq == nil {
t.Fatal("acquire record not found")
}
if acq.UserID != "alice" || acq.ServerName != "grafana" {
t.Fatalf("missing attribution: user=%q server=%q", acq.UserID, acq.ServerName)
}
if acq.RequestID != "req-abc" {
t.Fatalf("request_id not persisted: %q", acq.RequestID)
}
if acq.Status != "success" {
t.Fatalf("expected success status, got %q", acq.Status)
}
if acq.Metadata["broker_method"] != broker.AuditMethodTokenExchange {
t.Fatalf("method metadata missing: %v", acq.Metadata["broker_method"])
}

conn := findByAction(recs, broker.AuditActionConnect)
if conn == nil {
t.Fatal("connect record not found")
}
if conn.Status != "error" {
t.Fatalf("expected error status, got %q", conn.Status)
}
if conn.ErrorMessage != "token endpoint exchange failed" {
t.Fatalf("expected coarse error message, got %q", conn.ErrorMessage)
}

// No record may carry token/secret material in any visible field.
for _, r := range recs {
if r.Arguments != nil {
t.Fatalf("credential_broker record must not carry arguments: %v", r.Arguments)
}
if r.Response != "" {
t.Fatalf("credential_broker record must not carry a response: %q", r.Response)
}
}
}

func TestNewActivityAuditSink_NilStorageReturnsNil(t *testing.T) {
if sink := NewActivityAuditSink(nil, zap.NewNop().Sugar()); sink != nil {
t.Fatalf("expected nil sink for nil storage, got %T", sink)
}
}
9 changes: 6 additions & 3 deletions internal/serveredition/api/connector_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,23 @@ import (
type connectorProvider struct {
store broker.CredentialStore
logger *zap.Logger
audit broker.AuditSink // connect-flow audit sink (spec 074 T10); nil = no-op

mu sync.Mutex
baseURL string // gateway public origin, e.g. "https://gw.example.com"
cache map[string]*broker.OAuthConnector
}

// newConnectorProvider constructs an empty provider.
func newConnectorProvider(store broker.CredentialStore, logger *zap.Logger) *connectorProvider {
// newConnectorProvider constructs an empty provider. A nil audit sink disables
// connect-flow audit emission.
func newConnectorProvider(store broker.CredentialStore, logger *zap.Logger, audit broker.AuditSink) *connectorProvider {
if logger == nil {
logger = zap.NewNop()
}
return &connectorProvider{
store: store,
logger: logger,
audit: audit,
cache: make(map[string]*broker.OAuthConnector),
}
}
Expand Down Expand Up @@ -88,7 +91,7 @@ func (p *connectorProvider) connector(server *config.ServerConfig) (*broker.OAut
RedirectURI: p.callbackURLLocked(server.Name),
Resource: ab.Resource,
}
conn, err := broker.NewOAuthConnector(p.store, cfg, p.logger)
conn, err := broker.NewOAuthConnector(p.store, cfg, p.logger, p.audit)
if err != nil {
return nil, err
}
Expand Down
8 changes: 5 additions & 3 deletions internal/serveredition/api/credential_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,17 @@ type CredentialHandlers struct {
}

// NewCredentialHandlers builds the handlers over a credential store and the set
// of shared servers (only those carrying an auth_broker block are brokered).
func NewCredentialHandlers(store broker.CredentialStore, sharedServers []*config.ServerConfig, logger *zap.SugaredLogger) *CredentialHandlers {
// of shared servers (only those carrying an auth_broker block are brokered). The
// audit sink (spec 074 T10) records per-user connect-flow events to the activity
// log; a nil sink disables audit emission.
func NewCredentialHandlers(store broker.CredentialStore, sharedServers []*config.ServerConfig, audit broker.AuditSink, logger *zap.SugaredLogger) *CredentialHandlers {
if logger == nil {
logger = zap.NewNop().Sugar()
}
return &CredentialHandlers{
store: store,
brokerServers: sharedServers,
connectors: newConnectorProvider(store, logger.Desugar()),
connectors: newConnectorProvider(store, logger.Desugar(), audit),
logger: logger,
}
}
Expand Down
22 changes: 11 additions & 11 deletions internal/serveredition/api/credential_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestCredentialsList_RedactsSecrets(t *testing.T) {
ObtainedVia: "token_exchange",
}))

h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar())
h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar())
r := credRouter(h, defaultAuthContext())

req := httptest.NewRequest(http.MethodGet, "/api/v1/user/credentials", http.NoBody)
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestCredentialsList_Statuses(t *testing.T) {
Type: "oauth2", AccessToken: "b", ExpiresAt: time.Now().Add(-time.Hour),
}))

h := NewCredentialHandlers(store, []*config.ServerConfig{connected, expired, fresh}, zap.NewNop().Sugar())
h := NewCredentialHandlers(store, []*config.ServerConfig{connected, expired, fresh}, nil, zap.NewNop().Sugar())
r := credRouter(h, defaultAuthContext())

req := httptest.NewRequest(http.MethodGet, "/api/v1/user/credentials", http.NoBody)
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestCredentialsList_StoreDisabled(t *testing.T) {
require.False(t, store.Enabled())

srv := brokerHTTPServer("shared-gh", config.AuthBrokerModeTokenExchange)
h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar())
h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar())
r := credRouter(h, defaultAuthContext())

req := httptest.NewRequest(http.MethodGet, "/api/v1/user/credentials", http.NoBody)
Expand All @@ -186,7 +186,7 @@ func TestCredentialsDelete_Removes(t *testing.T) {
Type: "oauth2", AccessToken: "a", ExpiresAt: time.Now().Add(time.Hour),
}))

h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar())
h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar())
r := credRouter(h, defaultAuthContext())

req := httptest.NewRequest(http.MethodDelete, "/api/v1/user/credentials/shared-gh", http.NoBody)
Expand All @@ -201,7 +201,7 @@ func TestCredentialsDelete_Removes(t *testing.T) {
func TestCredentialsDelete_UnknownServer404(t *testing.T) {
store := credTestStore(t)
srv := brokerHTTPServer("shared-gh", config.AuthBrokerModeTokenExchange)
h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar())
h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar())
r := credRouter(h, defaultAuthContext())

req := httptest.NewRequest(http.MethodDelete, "/api/v1/user/credentials/does-not-exist", http.NoBody)
Expand All @@ -219,7 +219,7 @@ func TestCredentials_CrossUserIsolation(t *testing.T) {
Type: "oauth2", AccessToken: "B-SECRET", ExpiresAt: time.Now().Add(time.Hour),
}))

h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar())
h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar())
// Act as user A.
r := credRouter(h, auth.UserContext(testUserID, "a@example.com", "A", "google"))

Expand Down Expand Up @@ -248,7 +248,7 @@ func TestCredentials_CrossUserIsolation(t *testing.T) {
func TestCredentialsConnect_Redirects(t *testing.T) {
store := credTestStore(t)
srv := brokerHTTPServer("connect-srv", config.AuthBrokerModeOAuthConnect)
h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar())
h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar())
r := credRouter(h, defaultAuthContext())

req := httptest.NewRequest(http.MethodGet, "/api/v1/user/credentials/connect-srv/connect", http.NoBody)
Expand All @@ -270,7 +270,7 @@ func TestCredentialsConnect_Redirects(t *testing.T) {
func TestCredentialsConnect_NonConnectMode400(t *testing.T) {
store := credTestStore(t)
srv := brokerHTTPServer("xchg-srv", config.AuthBrokerModeTokenExchange)
h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar())
h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar())
r := credRouter(h, defaultAuthContext())

req := httptest.NewRequest(http.MethodGet, "/api/v1/user/credentials/xchg-srv/connect", http.NoBody)
Expand All @@ -292,7 +292,7 @@ func TestCredentialsConnectCallback_StoresCredential(t *testing.T) {
srv.AuthBroker.TokenEndpoint = ts.URL
sk := serverKeyFor(srv)

h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar())
h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar())
r := credRouter(h, defaultAuthContext())

// Step 1: connect → capture state from the redirect.
Expand Down Expand Up @@ -325,7 +325,7 @@ func TestCredentialsCallback_DeniedByUpstream(t *testing.T) {
store := credTestStore(t)
srv := brokerHTTPServer("connect-srv", config.AuthBrokerModeOAuthConnect)
sk := serverKeyFor(srv)
h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar())
h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar())
r := credRouter(h, defaultAuthContext())

// Begin a flow to register a state.
Expand All @@ -350,7 +350,7 @@ func TestCredentialsCallback_DeniedByUpstream(t *testing.T) {
func TestCredentials_Unauthenticated(t *testing.T) {
store := credTestStore(t)
srv := brokerHTTPServer("shared-gh", config.AuthBrokerModeTokenExchange)
h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, zap.NewNop().Sugar())
h := NewCredentialHandlers(store, []*config.ServerConfig{srv}, nil, zap.NewNop().Sugar())
// Empty auth context → unauthenticated.
r := credRouter(h, &auth.AuthContext{})

Expand Down
Loading
Loading