From 0d61d14b6b5ae306090ad7146a4cb3648c1c7098 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 10 Apr 2026 12:31:46 +0000 Subject: [PATCH 1/3] Initial plan From d182cf54dd229cdeceb9d064a7529a846039672e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 10 Apr 2026 12:54:54 +0000 Subject: [PATCH 2/3] refactor: consolidate small files and split gateway_logs.go per semantic clustering analysis Agent-Logs-Url: https://github.com/github/gh-aw/sessions/5f151eb8-d057-48b0-9cb8-9b407588e984 Co-authored-by: pelikhan <4175913+pelikhan@users.noreply.github.com> --- pkg/cli/gateway_logs.go | 1332 ----------------- pkg/cli/gateway_logs_metrics.go | 179 +++ pkg/cli/gateway_logs_parser.go | 329 ++++ pkg/cli/gateway_logs_render.go | 667 +++++++++ pkg/cli/gateway_logs_types.go | 207 +++ pkg/workflow/dependabot_wasm.go | 4 + pkg/workflow/docker_validation_wasm.go | 4 + pkg/workflow/git_helpers_wasm.go | 4 + pkg/workflow/github_cli_wasm.go | 4 + pkg/workflow/{add_labels.go => labels.go} | 24 + pkg/workflow/missing_data.go | 11 - pkg/workflow/missing_issue_reporting.go | 32 + pkg/workflow/missing_tool.go | 11 - pkg/workflow/npm_validation_wasm.go | 4 + pkg/workflow/pip_validation_wasm.go | 4 + pkg/workflow/remove_labels.go | 29 - pkg/workflow/report_incomplete.go | 27 - .../repository_features_validation_wasm.go | 4 + 18 files changed, 1466 insertions(+), 1410 deletions(-) delete mode 100644 pkg/cli/gateway_logs.go create mode 100644 pkg/cli/gateway_logs_metrics.go create mode 100644 pkg/cli/gateway_logs_parser.go create mode 100644 pkg/cli/gateway_logs_render.go create mode 100644 pkg/cli/gateway_logs_types.go rename pkg/workflow/{add_labels.go => labels.go} (51%) delete mode 100644 pkg/workflow/missing_data.go delete mode 100644 pkg/workflow/missing_tool.go delete mode 100644 pkg/workflow/remove_labels.go delete mode 100644 pkg/workflow/report_incomplete.go diff --git a/pkg/cli/gateway_logs.go b/pkg/cli/gateway_logs.go deleted file mode 100644 index 3bab3f8a0e9..00000000000 --- a/pkg/cli/gateway_logs.go +++ /dev/null @@ -1,1332 +0,0 @@ -// This file provides command-line interface functionality for gh-aw. -// This file (gateway_logs.go) contains functions for parsing and analyzing -// MCP gateway logs from gateway.jsonl or rpc-messages.jsonl files. -// -// Key responsibilities: -// - Parsing gateway.jsonl JSONL format logs (preferred) -// - Parsing rpc-messages.jsonl JSONL format logs (canonical fallback) -// - Extracting server and tool usage metrics -// - Aggregating gateway statistics -// - Rendering gateway metrics tables - -package cli - -import ( - "bufio" - "encoding/json" - "errors" - "fmt" - "os" - "path/filepath" - "sort" - "strconv" - "strings" - "time" - - "github.com/github/gh-aw/pkg/console" - "github.com/github/gh-aw/pkg/logger" - "github.com/github/gh-aw/pkg/sliceutil" - "github.com/github/gh-aw/pkg/timeutil" -) - -var gatewayLogsLog = logger.New("cli:gateway_logs") - -// maxScannerBufferSize is the maximum scanner buffer for large JSONL payloads (1 MB). -const maxScannerBufferSize = 1024 * 1024 - -// GatewayLogEntry represents a single log entry from gateway.jsonl -type GatewayLogEntry struct { - Timestamp string `json:"timestamp"` - Level string `json:"level"` - Type string `json:"type"` - Event string `json:"event"` - ServerName string `json:"server_name,omitempty"` - ServerID string `json:"server_id,omitempty"` // used by DIFC_FILTERED events - ToolName string `json:"tool_name,omitempty"` - Method string `json:"method,omitempty"` - Duration float64 `json:"duration,omitempty"` // in milliseconds - InputSize int `json:"input_size,omitempty"` - OutputSize int `json:"output_size,omitempty"` - Status string `json:"status,omitempty"` - Error string `json:"error,omitempty"` - Message string `json:"message,omitempty"` - Description string `json:"description,omitempty"` - Reason string `json:"reason,omitempty"` - SecrecyTags []string `json:"secrecy_tags,omitempty"` - IntegrityTags []string `json:"integrity_tags,omitempty"` - AuthorAssociation string `json:"author_association,omitempty"` - AuthorLogin string `json:"author_login,omitempty"` - HTMLURL string `json:"html_url,omitempty"` - Number string `json:"number,omitempty"` -} - -// DifcFilteredEvent represents a DIFC_FILTERED log entry from gateway.jsonl. -// These events occur when a tool call is blocked by DIFC integrity or secrecy checks. -type DifcFilteredEvent struct { - Timestamp string `json:"timestamp"` - ServerID string `json:"server_id"` - ToolName string `json:"tool_name"` - Description string `json:"description,omitempty"` - Reason string `json:"reason"` - SecrecyTags []string `json:"secrecy_tags,omitempty"` - IntegrityTags []string `json:"integrity_tags,omitempty"` - AuthorAssociation string `json:"author_association,omitempty"` - AuthorLogin string `json:"author_login,omitempty"` - HTMLURL string `json:"html_url,omitempty"` - Number string `json:"number,omitempty"` -} - -// Guard policy error codes from MCP Gateway. -// These JSON-RPC error codes indicate guard policy enforcement decisions. -const ( - guardPolicyErrorCodeAccessDenied = -32001 // General access denied - guardPolicyErrorCodeRepoNotAllowed = -32002 // Repository not in allowlist (repos) - guardPolicyErrorCodeInsufficientPerms = -32003 // Insufficient permissions (roles) - guardPolicyErrorCodePrivateRepoDenied = -32004 // Private repository access denied - guardPolicyErrorCodeBlockedUser = -32005 // Content from blocked user - guardPolicyErrorCodeIntegrityBelowMin = -32006 // Content integrity below minimum threshold (min-integrity) -) - -// GuardPolicyEvent represents a guard policy enforcement decision from the MCP Gateway. -// These events are extracted from JSON-RPC error responses with specific error codes -// (-32001 to -32006) in rpc-messages.jsonl. -type GuardPolicyEvent struct { - Timestamp string `json:"timestamp"` - ServerID string `json:"server_id"` - ToolName string `json:"tool_name"` - ErrorCode int `json:"error_code"` - Reason string `json:"reason"` // e.g., "repository_not_allowed", "min_integrity" - Message string `json:"message"` // Error message from JSON-RPC response - Details string `json:"details,omitempty"` // Additional details from error data - Repository string `json:"repository,omitempty"` // Repository involved (for repo scope blocks) -} - -// isGuardPolicyErrorCode returns true if the JSON-RPC error code indicates a -// guard policy enforcement decision. -func isGuardPolicyErrorCode(code int) bool { - return code >= guardPolicyErrorCodeIntegrityBelowMin && code <= guardPolicyErrorCodeAccessDenied -} - -// guardPolicyReasonFromCode returns a human-readable reason string for a guard policy error code. -func guardPolicyReasonFromCode(code int) string { - switch code { - case guardPolicyErrorCodeAccessDenied: - return "access_denied" - case guardPolicyErrorCodeRepoNotAllowed: - return "repo_not_allowed" - case guardPolicyErrorCodeInsufficientPerms: - return "insufficient_permissions" - case guardPolicyErrorCodePrivateRepoDenied: - return "private_repo_denied" - case guardPolicyErrorCodeBlockedUser: - return "blocked_user" - case guardPolicyErrorCodeIntegrityBelowMin: - return "integrity_below_minimum" - default: - return "unknown" - } -} - -// GatewayServerMetrics represents usage metrics for a single MCP server -type GatewayServerMetrics struct { - ServerName string - RequestCount int - ToolCallCount int - TotalDuration float64 // in milliseconds - ErrorCount int - FilteredCount int // number of DIFC_FILTERED events for this server - GuardPolicyBlocked int // number of tool calls blocked by guard policies for this server - Tools map[string]*GatewayToolMetrics -} - -// GatewayToolMetrics represents usage metrics for a specific tool -type GatewayToolMetrics struct { - ToolName string - CallCount int - TotalDuration float64 // in milliseconds - AvgDuration float64 // in milliseconds - MaxDuration float64 // in milliseconds - MinDuration float64 // in milliseconds - ErrorCount int - TotalInputSize int - TotalOutputSize int -} - -// GatewayMetrics represents aggregated metrics from gateway logs -type GatewayMetrics struct { - TotalRequests int - TotalToolCalls int - TotalErrors int - TotalFiltered int // number of DIFC_FILTERED events - TotalGuardBlocked int // number of tool calls blocked by guard policies - Servers map[string]*GatewayServerMetrics - FilteredEvents []DifcFilteredEvent - GuardPolicyEvents []GuardPolicyEvent - StartTime time.Time - EndTime time.Time - TotalDuration float64 // in milliseconds -} - -// RPCMessageEntry represents a single entry from rpc-messages.jsonl. -// This file is written by the Copilot CLI and contains raw JSON-RPC protocol messages -// exchanged between the AI engine and MCP servers, as well as DIFC_FILTERED events. -type RPCMessageEntry struct { - Timestamp string `json:"timestamp"` - Direction string `json:"direction"` // "IN" = received from server, "OUT" = sent to server; empty for DIFC_FILTERED - Type string `json:"type"` // "REQUEST", "RESPONSE", or "DIFC_FILTERED" - ServerID string `json:"server_id"` - Payload json.RawMessage `json:"payload"` - // Fields populated only for DIFC_FILTERED entries - ToolName string `json:"tool_name,omitempty"` - Description string `json:"description,omitempty"` - Reason string `json:"reason,omitempty"` - SecrecyTags []string `json:"secrecy_tags,omitempty"` - IntegrityTags []string `json:"integrity_tags,omitempty"` - AuthorAssociation string `json:"author_association,omitempty"` - AuthorLogin string `json:"author_login,omitempty"` - HTMLURL string `json:"html_url,omitempty"` - Number string `json:"number,omitempty"` -} - -// rpcRequestPayload represents the JSON-RPC request payload fields we care about. -type rpcRequestPayload struct { - Method string `json:"method"` - ID any `json:"id"` - Params json.RawMessage `json:"params"` -} - -// rpcToolCallParams represents the params for a tools/call request. -type rpcToolCallParams struct { - Name string `json:"name"` -} - -// rpcResponsePayload represents the JSON-RPC response payload fields we care about. -type rpcResponsePayload struct { - ID any `json:"id"` - Error *rpcError `json:"error,omitempty"` -} - -// rpcError represents a JSON-RPC error object. -type rpcError struct { - Code int `json:"code"` - Message string `json:"message"` - Data *rpcErrorData `json:"data,omitempty"` -} - -// rpcErrorData represents the optional data field in a JSON-RPC error, used by -// guard policy enforcement to communicate the reason and context for a denial. -type rpcErrorData struct { - Reason string `json:"reason,omitempty"` - Repository string `json:"repository,omitempty"` - Details string `json:"details,omitempty"` -} - -// rpcPendingRequest tracks an in-flight tool call for duration calculation. -type rpcPendingRequest struct { - ServerID string - ToolName string - Timestamp time.Time -} - -// parseRPCMessages parses a rpc-messages.jsonl file and extracts GatewayMetrics. -// This is the canonical fallback when gateway.jsonl is not available. -func parseRPCMessages(logPath string, verbose bool) (*GatewayMetrics, error) { - gatewayLogsLog.Printf("Parsing rpc-messages.jsonl from: %s", logPath) - - file, err := os.Open(logPath) - if err != nil { - return nil, fmt.Errorf("failed to open rpc-messages.jsonl: %w", err) - } - defer file.Close() - - metrics := &GatewayMetrics{ - Servers: make(map[string]*GatewayServerMetrics), - } - - // Track pending requests by (serverID, id) for duration calculation. - // Key format: "/" - pendingRequests := make(map[string]*rpcPendingRequest) - - scanner := bufio.NewScanner(file) - // Increase scanner buffer for large payloads - buf := make([]byte, maxScannerBufferSize) - scanner.Buffer(buf, maxScannerBufferSize) - lineNum := 0 - - for scanner.Scan() { - lineNum++ - line := strings.TrimSpace(scanner.Text()) - if line == "" { - continue - } - - var entry RPCMessageEntry - if err := json.Unmarshal([]byte(line), &entry); err != nil { - gatewayLogsLog.Printf("Failed to parse rpc-messages.jsonl line %d: %v", lineNum, err) - if verbose { - fmt.Fprintln(os.Stderr, console.FormatWarningMessage( - fmt.Sprintf("Failed to parse rpc-messages.jsonl line %d: %v", lineNum, err))) - } - continue - } - - // Update time range - if entry.Timestamp != "" { - if t, err := time.Parse(time.RFC3339Nano, entry.Timestamp); err == nil { - if metrics.StartTime.IsZero() || t.Before(metrics.StartTime) { - metrics.StartTime = t - } - if metrics.EndTime.IsZero() || t.After(metrics.EndTime) { - metrics.EndTime = t - } - } - } - - if entry.ServerID == "" { - continue - } - - switch { - case entry.Type == "DIFC_FILTERED": - // DIFC integrity/secrecy filter event — not a REQUEST or RESPONSE - metrics.TotalFiltered++ - server := getOrCreateServer(metrics, entry.ServerID) - server.FilteredCount++ - metrics.FilteredEvents = append(metrics.FilteredEvents, DifcFilteredEvent{ - Timestamp: entry.Timestamp, - ServerID: entry.ServerID, - ToolName: entry.ToolName, - Description: entry.Description, - Reason: entry.Reason, - SecrecyTags: entry.SecrecyTags, - IntegrityTags: entry.IntegrityTags, - AuthorAssociation: entry.AuthorAssociation, - AuthorLogin: entry.AuthorLogin, - HTMLURL: entry.HTMLURL, - Number: entry.Number, - }) - - case entry.Direction == "OUT" && entry.Type == "REQUEST": - // Outgoing request from AI engine to MCP server - var req rpcRequestPayload - if err := json.Unmarshal(entry.Payload, &req); err != nil { - continue - } - if req.Method != "tools/call" { - continue - } - - // Extract tool name - var params rpcToolCallParams - if err := json.Unmarshal(req.Params, ¶ms); err != nil || params.Name == "" { - continue - } - - metrics.TotalRequests++ - server := getOrCreateServer(metrics, entry.ServerID) - server.RequestCount++ - metrics.TotalToolCalls++ - server.ToolCallCount++ - - tool := getOrCreateTool(server, params.Name) - tool.CallCount++ - - // Store pending request for duration calculation - if req.ID != nil && entry.Timestamp != "" { - if t, err := time.Parse(time.RFC3339Nano, entry.Timestamp); err == nil { - key := fmt.Sprintf("%s/%v", entry.ServerID, req.ID) - pendingRequests[key] = &rpcPendingRequest{ - ServerID: entry.ServerID, - ToolName: params.Name, - Timestamp: t, - } - } - } - - case entry.Direction == "IN" && entry.Type == "RESPONSE": - // Incoming response from MCP server to AI engine - var resp rpcResponsePayload - if err := json.Unmarshal(entry.Payload, &resp); err != nil { - continue - } - - // Track errors and detect guard policy blocks - if resp.Error != nil { - metrics.TotalErrors++ - server := getOrCreateServer(metrics, entry.ServerID) - server.ErrorCount++ - - // Detect guard policy enforcement errors - if isGuardPolicyErrorCode(resp.Error.Code) { - metrics.TotalGuardBlocked++ - server.GuardPolicyBlocked++ - - // Determine tool name from pending request if available - toolName := "" - if resp.ID != nil { - key := fmt.Sprintf("%s/%v", entry.ServerID, resp.ID) - if pending, ok := pendingRequests[key]; ok { - toolName = pending.ToolName - } - } - - reason := guardPolicyReasonFromCode(resp.Error.Code) - if resp.Error.Data != nil && resp.Error.Data.Reason != "" { - reason = resp.Error.Data.Reason - } - - evt := GuardPolicyEvent{ - Timestamp: entry.Timestamp, - ServerID: entry.ServerID, - ToolName: toolName, - ErrorCode: resp.Error.Code, - Reason: reason, - Message: resp.Error.Message, - } - if resp.Error.Data != nil { - evt.Details = resp.Error.Data.Details - evt.Repository = resp.Error.Data.Repository - } - metrics.GuardPolicyEvents = append(metrics.GuardPolicyEvents, evt) - } - } - - // Calculate duration by matching with pending request - if resp.ID != nil && entry.Timestamp != "" { - key := fmt.Sprintf("%s/%v", entry.ServerID, resp.ID) - if pending, ok := pendingRequests[key]; ok { - delete(pendingRequests, key) - if t, err := time.Parse(time.RFC3339Nano, entry.Timestamp); err == nil { - durationMs := float64(t.Sub(pending.Timestamp).Milliseconds()) - if durationMs >= 0 { - server := getOrCreateServer(metrics, entry.ServerID) - server.TotalDuration += durationMs - metrics.TotalDuration += durationMs - - tool := getOrCreateTool(server, pending.ToolName) - tool.TotalDuration += durationMs - if tool.MaxDuration == 0 || durationMs > tool.MaxDuration { - tool.MaxDuration = durationMs - } - if tool.MinDuration == 0 || durationMs < tool.MinDuration { - tool.MinDuration = durationMs - } - - if resp.Error != nil { - tool.ErrorCount++ - } - } - } - } - } - } - } - - if err := scanner.Err(); err != nil { - return nil, fmt.Errorf("error reading rpc-messages.jsonl: %w", err) - } - - calculateGatewayAggregates(metrics) - - gatewayLogsLog.Printf("Successfully parsed rpc-messages.jsonl: %d servers, %d total requests", - len(metrics.Servers), metrics.TotalRequests) - - return metrics, nil -} - -// findRPCMessagesPath returns the path to rpc-messages.jsonl if it exists, or "" if not found. -func findRPCMessagesPath(logDir string) string { - // Check mcp-logs subdirectory (standard location) - mcpLogsPath := filepath.Join(logDir, "mcp-logs", "rpc-messages.jsonl") - if _, err := os.Stat(mcpLogsPath); err == nil { - return mcpLogsPath - } - // Check root directory as fallback - rootPath := filepath.Join(logDir, "rpc-messages.jsonl") - if _, err := os.Stat(rootPath); err == nil { - return rootPath - } - return "" -} - -// parseGatewayLogs parses a gateway.jsonl file and extracts metrics. -// Falls back to rpc-messages.jsonl (canonical fallback) when gateway.jsonl is not present. -func parseGatewayLogs(logDir string, verbose bool) (*GatewayMetrics, error) { - // Try root directory first (for older logs where gateway.jsonl was in the root) - gatewayLogPath := filepath.Join(logDir, "gateway.jsonl") - - // Check if gateway.jsonl exists in root - if _, err := os.Stat(gatewayLogPath); os.IsNotExist(err) { - // Try mcp-logs subdirectory (new path after artifact download) - // Gateway logs are uploaded from /tmp/gh-aw/mcp-logs/gateway.jsonl and the common parent - // /tmp/gh-aw/ is stripped during artifact upload, resulting in mcp-logs/gateway.jsonl after download - mcpLogsPath := filepath.Join(logDir, "mcp-logs", "gateway.jsonl") - if _, err := os.Stat(mcpLogsPath); os.IsNotExist(err) { - // Fall back to rpc-messages.jsonl (canonical fallback when gateway.jsonl is missing) - rpcPath := findRPCMessagesPath(logDir) - if rpcPath != "" { - gatewayLogsLog.Printf("gateway.jsonl not found; falling back to rpc-messages.jsonl: %s", rpcPath) - return parseRPCMessages(rpcPath, verbose) - } - gatewayLogsLog.Printf("gateway.jsonl not found at: %s or %s", gatewayLogPath, mcpLogsPath) - return nil, errors.New("gateway.jsonl not found") - } - gatewayLogPath = mcpLogsPath - gatewayLogsLog.Printf("Found gateway.jsonl in mcp-logs subdirectory") - } - - gatewayLogsLog.Printf("Parsing gateway.jsonl from: %s", gatewayLogPath) - - file, err := os.Open(gatewayLogPath) - if err != nil { - return nil, fmt.Errorf("failed to open gateway.jsonl: %w", err) - } - defer file.Close() - - metrics := &GatewayMetrics{ - Servers: make(map[string]*GatewayServerMetrics), - } - - scanner := bufio.NewScanner(file) - buf := make([]byte, maxScannerBufferSize) - scanner.Buffer(buf, maxScannerBufferSize) - lineNum := 0 - - for scanner.Scan() { - lineNum++ - line := strings.TrimSpace(scanner.Text()) - - // Skip empty lines - if line == "" { - continue - } - - var entry GatewayLogEntry - if err := json.Unmarshal([]byte(line), &entry); err != nil { - gatewayLogsLog.Printf("Failed to parse line %d: %v", lineNum, err) - if verbose { - fmt.Fprintln(os.Stderr, console.FormatWarningMessage(fmt.Sprintf("Failed to parse gateway.jsonl line %d: %v", lineNum, err))) - } - continue - } - - // Process the entry based on its type/event - processGatewayLogEntry(&entry, metrics, verbose) - } - - if err := scanner.Err(); err != nil { - return nil, fmt.Errorf("error reading gateway.jsonl: %w", err) - } - - // Calculate aggregate statistics - calculateGatewayAggregates(metrics) - - gatewayLogsLog.Printf("Successfully parsed gateway.jsonl: %d servers, %d total requests", - len(metrics.Servers), metrics.TotalRequests) - - return metrics, nil -} - -// processGatewayLogEntry processes a single log entry and updates metrics -func processGatewayLogEntry(entry *GatewayLogEntry, metrics *GatewayMetrics, verbose bool) { - // Parse timestamp for time range (supports both RFC3339 and RFC3339Nano) - if entry.Timestamp != "" { - t, err := time.Parse(time.RFC3339Nano, entry.Timestamp) - if err != nil { - t, err = time.Parse(time.RFC3339, entry.Timestamp) - } - if err == nil { - if metrics.StartTime.IsZero() || t.Before(metrics.StartTime) { - metrics.StartTime = t - } - if metrics.EndTime.IsZero() || t.After(metrics.EndTime) { - metrics.EndTime = t - } - } - } - - // Handle DIFC_FILTERED events - if entry.Type == "DIFC_FILTERED" { - metrics.TotalFiltered++ - // DIFC_FILTERED events use server_id; fall back to server_name for compatibility - serverKey := entry.ServerID - if serverKey == "" { - serverKey = entry.ServerName - } - if serverKey != "" { - server := getOrCreateServer(metrics, serverKey) - server.FilteredCount++ - } - metrics.FilteredEvents = append(metrics.FilteredEvents, DifcFilteredEvent{ - Timestamp: entry.Timestamp, - ServerID: serverKey, - ToolName: entry.ToolName, - Description: entry.Description, - Reason: entry.Reason, - SecrecyTags: entry.SecrecyTags, - IntegrityTags: entry.IntegrityTags, - AuthorAssociation: entry.AuthorAssociation, - AuthorLogin: entry.AuthorLogin, - HTMLURL: entry.HTMLURL, - Number: entry.Number, - }) - return - } - - // Handle GUARD_POLICY_BLOCKED events from gateway.jsonl - if entry.Type == "GUARD_POLICY_BLOCKED" { - metrics.TotalGuardBlocked++ - serverKey := entry.ServerID - if serverKey == "" { - serverKey = entry.ServerName - } - if serverKey != "" { - server := getOrCreateServer(metrics, serverKey) - server.GuardPolicyBlocked++ - } - metrics.GuardPolicyEvents = append(metrics.GuardPolicyEvents, GuardPolicyEvent{ - Timestamp: entry.Timestamp, - ServerID: serverKey, - ToolName: entry.ToolName, - Reason: entry.Reason, - Message: entry.Message, - Details: entry.Description, - }) - return - } - - // Track errors - if entry.Status == "error" || entry.Error != "" { - metrics.TotalErrors++ - if entry.ServerName != "" { - server := getOrCreateServer(metrics, entry.ServerName) - server.ErrorCount++ - - if entry.ToolName != "" { - tool := getOrCreateTool(server, entry.ToolName) - tool.ErrorCount++ - } - } - } - - // Process based on event type - switch entry.Event { - case "request", "tool_call", "rpc_call": - metrics.TotalRequests++ - - if entry.ServerName != "" { - server := getOrCreateServer(metrics, entry.ServerName) - server.RequestCount++ - - if entry.Duration > 0 { - server.TotalDuration += entry.Duration - metrics.TotalDuration += entry.Duration - } - - // Track tool calls - if entry.ToolName != "" || entry.Method != "" { - toolName := entry.ToolName - if toolName == "" { - toolName = entry.Method - } - - metrics.TotalToolCalls++ - server.ToolCallCount++ - - tool := getOrCreateTool(server, toolName) - tool.CallCount++ - - if entry.Duration > 0 { - tool.TotalDuration += entry.Duration - if tool.MaxDuration == 0 || entry.Duration > tool.MaxDuration { - tool.MaxDuration = entry.Duration - } - if tool.MinDuration == 0 || entry.Duration < tool.MinDuration { - tool.MinDuration = entry.Duration - } - } - - if entry.InputSize > 0 { - tool.TotalInputSize += entry.InputSize - } - if entry.OutputSize > 0 { - tool.TotalOutputSize += entry.OutputSize - } - } - } - } -} - -// getOrCreateServer gets or creates a server metrics entry -func getOrCreateServer(metrics *GatewayMetrics, serverName string) *GatewayServerMetrics { - if server, exists := metrics.Servers[serverName]; exists { - return server - } - - server := &GatewayServerMetrics{ - ServerName: serverName, - Tools: make(map[string]*GatewayToolMetrics), - } - metrics.Servers[serverName] = server - return server -} - -// getOrCreateTool gets or creates a tool metrics entry -func getOrCreateTool(server *GatewayServerMetrics, toolName string) *GatewayToolMetrics { - if tool, exists := server.Tools[toolName]; exists { - return tool - } - - tool := &GatewayToolMetrics{ - ToolName: toolName, - } - server.Tools[toolName] = tool - return tool -} - -// calculateGatewayAggregates calculates aggregate statistics -func calculateGatewayAggregates(metrics *GatewayMetrics) { - for _, server := range metrics.Servers { - for _, tool := range server.Tools { - if tool.CallCount > 0 { - tool.AvgDuration = tool.TotalDuration / float64(tool.CallCount) - } - } - } -} - -// renderGatewayMetricsTable renders gateway metrics as a console table -func renderGatewayMetricsTable(metrics *GatewayMetrics, verbose bool) string { - if metrics == nil || len(metrics.Servers) == 0 { - return "" - } - - var output strings.Builder - - output.WriteString("\n") - output.WriteString(console.FormatInfoMessage("MCP Gateway Metrics")) - output.WriteString("\n\n") - - // Summary statistics - fmt.Fprintf(&output, "Total Requests: %d\n", metrics.TotalRequests) - fmt.Fprintf(&output, "Total Tool Calls: %d\n", metrics.TotalToolCalls) - fmt.Fprintf(&output, "Total Errors: %d\n", metrics.TotalErrors) - if metrics.TotalFiltered > 0 { - fmt.Fprintf(&output, "Total DIFC Filtered: %d\n", metrics.TotalFiltered) - } - if metrics.TotalGuardBlocked > 0 { - fmt.Fprintf(&output, "Total Guard Policy Blocked: %d\n", metrics.TotalGuardBlocked) - } - fmt.Fprintf(&output, "Servers: %d\n", len(metrics.Servers)) - - if !metrics.StartTime.IsZero() && !metrics.EndTime.IsZero() { - duration := metrics.EndTime.Sub(metrics.StartTime) - fmt.Fprintf(&output, "Time Range: %s\n", duration.Round(time.Second)) - } - - output.WriteString("\n") - - // Server metrics table - if len(metrics.Servers) > 0 { - // Sort servers by request count - serverNames := getSortedServerNames(metrics) - - hasFiltered := metrics.TotalFiltered > 0 - hasGuardPolicy := metrics.TotalGuardBlocked > 0 - serverRows := make([][]string, 0, len(serverNames)) - for _, serverName := range serverNames { - server := metrics.Servers[serverName] - avgTime := 0.0 - if server.RequestCount > 0 { - avgTime = server.TotalDuration / float64(server.RequestCount) - } - row := []string{ - serverName, - strconv.Itoa(server.RequestCount), - strconv.Itoa(server.ToolCallCount), - fmt.Sprintf("%.0fms", avgTime), - strconv.Itoa(server.ErrorCount), - } - if hasFiltered { - row = append(row, strconv.Itoa(server.FilteredCount)) - } - if hasGuardPolicy { - row = append(row, strconv.Itoa(server.GuardPolicyBlocked)) - } - serverRows = append(serverRows, row) - } - - headers := []string{"Server", "Requests", "Tool Calls", "Avg Time", "Errors"} - if hasFiltered { - headers = append(headers, "Filtered") - } - if hasGuardPolicy { - headers = append(headers, "Guard Blocked") - } - output.WriteString(console.RenderTable(console.TableConfig{ - Title: "Server Usage", - Headers: headers, - Rows: serverRows, - })) - } - - // DIFC filtered events table - if len(metrics.FilteredEvents) > 0 { - output.WriteString("\n") - filteredRows := make([][]string, 0, len(metrics.FilteredEvents)) - for _, fe := range metrics.FilteredEvents { - reason := fe.Reason - if len(reason) > 80 { - reason = reason[:77] + "..." - } - filteredRows = append(filteredRows, []string{ - fe.ServerID, - fe.ToolName, - fe.AuthorLogin, - reason, - }) - } - output.WriteString(console.RenderTable(console.TableConfig{ - Title: "DIFC Filtered Events", - Headers: []string{"Server", "Tool", "User", "Reason"}, - Rows: filteredRows, - })) - } - - // Guard policy events table - if len(metrics.GuardPolicyEvents) > 0 { - output.WriteString("\n") - guardRows := make([][]string, 0, len(metrics.GuardPolicyEvents)) - for _, gpe := range metrics.GuardPolicyEvents { - message := gpe.Message - if len(message) > 60 { - message = message[:57] + "..." - } - repo := gpe.Repository - if repo == "" { - repo = "-" - } - guardRows = append(guardRows, []string{ - gpe.ServerID, - gpe.ToolName, - gpe.Reason, - message, - repo, - }) - } - output.WriteString(console.RenderTable(console.TableConfig{ - Title: "Guard Policy Blocked Events", - Headers: []string{"Server", "Tool", "Reason", "Message", "Repository"}, - Rows: guardRows, - })) - } - - // Tool metrics table (if verbose) - if verbose { - output.WriteString("\n") - output.WriteString("Tool Usage Details:\n") - - for _, serverName := range getSortedServerNames(metrics) { - server := metrics.Servers[serverName] - if len(server.Tools) == 0 { - continue - } - - // Sort tools by call count - toolNames := sliceutil.MapToSlice(server.Tools) - sort.Slice(toolNames, func(i, j int) bool { - return server.Tools[toolNames[i]].CallCount > server.Tools[toolNames[j]].CallCount - }) - - toolRows := make([][]string, 0, len(toolNames)) - for _, toolName := range toolNames { - tool := server.Tools[toolName] - toolRows = append(toolRows, []string{ - toolName, - strconv.Itoa(tool.CallCount), - fmt.Sprintf("%.0fms", tool.AvgDuration), - fmt.Sprintf("%.0fms", tool.MaxDuration), - strconv.Itoa(tool.ErrorCount), - }) - } - - output.WriteString(console.RenderTable(console.TableConfig{ - Title: serverName, - Headers: []string{"Tool", "Calls", "Avg Time", "Max Time", "Errors"}, - Rows: toolRows, - })) - } - } - - return output.String() -} - -// getSortedServerNames returns server names sorted by request count -func getSortedServerNames(metrics *GatewayMetrics) []string { - names := sliceutil.MapToSlice(metrics.Servers) - sort.Slice(names, func(i, j int) bool { - return metrics.Servers[names[i]].RequestCount > metrics.Servers[names[j]].RequestCount - }) - return names -} - -// buildToolCallsFromRPCMessages reads rpc-messages.jsonl and builds MCPToolCall records. -// Duration is computed by pairing outgoing requests with incoming responses. -// Input/output sizes are not available in rpc-messages.jsonl and will be 0. -func buildToolCallsFromRPCMessages(logPath string) ([]MCPToolCall, error) { - file, err := os.Open(logPath) - if err != nil { - return nil, fmt.Errorf("failed to open rpc-messages.jsonl: %w", err) - } - defer file.Close() - - type pendingCall struct { - serverID string - toolName string - timestamp time.Time - } - pending := make(map[string]*pendingCall) // key: "/" - - // Collect requests first to pair with responses - type rawEntry struct { - entry RPCMessageEntry - req rpcRequestPayload - resp rpcResponsePayload - valid bool - } - var entries []rawEntry - - scanner := bufio.NewScanner(file) - buf := make([]byte, maxScannerBufferSize) - scanner.Buffer(buf, maxScannerBufferSize) - - for scanner.Scan() { - line := strings.TrimSpace(scanner.Text()) - if line == "" { - continue - } - var e RPCMessageEntry - if err := json.Unmarshal([]byte(line), &e); err != nil { - continue - } - entries = append(entries, rawEntry{entry: e, valid: true}) - } - if err := scanner.Err(); err != nil { - return nil, fmt.Errorf("error reading rpc-messages.jsonl: %w", err) - } - - // Second pass: build MCPToolCall records. - // Declared before first pass so requests without IDs can be appended immediately. - var toolCalls []MCPToolCall - processedKeys := make(map[string]bool) - - // First pass: index outgoing tool-call requests by (serverID, id) - for i := range entries { - e := &entries[i] - if e.entry.Direction != "OUT" || e.entry.Type != "REQUEST" { - continue - } - if err := json.Unmarshal(e.entry.Payload, &e.req); err != nil || e.req.Method != "tools/call" { - continue - } - var params rpcToolCallParams - if err := json.Unmarshal(e.req.Params, ¶ms); err != nil || params.Name == "" { - continue - } - if e.req.ID == nil { - // Requests without an ID cannot be matched to responses. - // Emit the tool call immediately with "unknown" status so it appears - // in the tool_calls list (same as parseRPCMessages counts it in the summary). - toolCalls = append(toolCalls, MCPToolCall{ - Timestamp: e.entry.Timestamp, - ServerName: e.entry.ServerID, - ToolName: params.Name, - Status: "unknown", - }) - continue - } - t, err := time.Parse(time.RFC3339Nano, e.entry.Timestamp) - if err != nil { - continue - } - key := fmt.Sprintf("%s/%v", e.entry.ServerID, e.req.ID) - pending[key] = &pendingCall{ - serverID: e.entry.ServerID, - toolName: params.Name, - timestamp: t, - } - } - - // Second pass: pair responses with pending requests to compute durations - - for i := range entries { - e := &entries[i] - switch { - case e.entry.Direction == "OUT" && e.entry.Type == "REQUEST": - // Outgoing tool-call request – we'll emit the record when we see the response - // (or after if no response found) - case e.entry.Direction == "IN" && e.entry.Type == "RESPONSE": - if err := json.Unmarshal(e.entry.Payload, &e.resp); err != nil { - continue - } - if e.resp.ID == nil { - continue - } - key := fmt.Sprintf("%s/%v", e.entry.ServerID, e.resp.ID) - p, ok := pending[key] - if !ok { - continue - } - processedKeys[key] = true - - call := MCPToolCall{ - Timestamp: p.timestamp.Format(time.RFC3339Nano), - ServerName: p.serverID, - ToolName: p.toolName, - Status: "success", - } - if e.resp.Error != nil { - call.Status = "error" - call.Error = e.resp.Error.Message - } - if t, err := time.Parse(time.RFC3339Nano, e.entry.Timestamp); err == nil { - d := t.Sub(p.timestamp) - if d >= 0 { - call.Duration = timeutil.FormatDuration(d) - } - } - toolCalls = append(toolCalls, call) - } - } - - // Emit any requests that never received a response - for key, p := range pending { - if !processedKeys[key] { - toolCalls = append(toolCalls, MCPToolCall{ - Timestamp: p.timestamp.Format(time.RFC3339Nano), - ServerName: p.serverID, - ToolName: p.toolName, - Status: "unknown", - }) - } - } - - return toolCalls, nil -} - -// extractMCPToolUsageData creates detailed MCP tool usage data from gateway metrics -func extractMCPToolUsageData(logDir string, verbose bool) (*MCPToolUsageData, error) { - // Parse gateway logs (falls back to rpc-messages.jsonl automatically) - gatewayMetrics, err := parseGatewayLogs(logDir, verbose) - if err != nil { - // Return nil if no log file exists (not an error for workflows without MCP) - if strings.Contains(err.Error(), "not found") { - return nil, nil - } - return nil, fmt.Errorf("failed to parse gateway logs: %w", err) - } - - if gatewayMetrics == nil || len(gatewayMetrics.Servers) == 0 { - return nil, nil - } - - mcpData := &MCPToolUsageData{ - Summary: []MCPToolSummary{}, - ToolCalls: []MCPToolCall{}, - Servers: []MCPServerStats{}, - FilteredEvents: gatewayMetrics.FilteredEvents, - } - - // Build guard policy summary if there are guard policy events - if len(gatewayMetrics.GuardPolicyEvents) > 0 { - mcpData.GuardPolicySummary = buildGuardPolicySummary(gatewayMetrics) - } - - // Read the log file again to get individual tool call records. - // Prefer gateway.jsonl; fall back to rpc-messages.jsonl when not available. - gatewayLogPath := filepath.Join(logDir, "gateway.jsonl") - usingRPCMessages := false - - if _, err := os.Stat(gatewayLogPath); os.IsNotExist(err) { - mcpLogsPath := filepath.Join(logDir, "mcp-logs", "gateway.jsonl") - if _, err := os.Stat(mcpLogsPath); os.IsNotExist(err) { - // Fall back to rpc-messages.jsonl - rpcPath := findRPCMessagesPath(logDir) - if rpcPath == "" { - return nil, errors.New("gateway.jsonl not found") - } - gatewayLogPath = rpcPath - usingRPCMessages = true - } else { - gatewayLogPath = mcpLogsPath - } - } - - if usingRPCMessages { - // Build tool call records from rpc-messages.jsonl - toolCalls, err := buildToolCallsFromRPCMessages(gatewayLogPath) - if err != nil { - return nil, fmt.Errorf("failed to read rpc-messages.jsonl: %w", err) - } - mcpData.ToolCalls = toolCalls - } else { - file, err := os.Open(gatewayLogPath) - if err != nil { - return nil, fmt.Errorf("failed to open gateway.jsonl: %w", err) - } - defer file.Close() - - scanner := bufio.NewScanner(file) - buf := make([]byte, maxScannerBufferSize) - scanner.Buffer(buf, maxScannerBufferSize) - for scanner.Scan() { - line := strings.TrimSpace(scanner.Text()) - if line == "" { - continue - } - - var entry GatewayLogEntry - if err := json.Unmarshal([]byte(line), &entry); err != nil { - continue // Skip malformed lines - } - - // Only process tool call events - if entry.Event == "tool_call" || entry.Event == "rpc_call" || entry.Event == "request" { - toolName := entry.ToolName - if toolName == "" { - toolName = entry.Method - } - - // Skip entries without tool information - if entry.ServerName == "" || toolName == "" { - continue - } - - // Create individual tool call record - toolCall := MCPToolCall{ - Timestamp: entry.Timestamp, - ServerName: entry.ServerName, - ToolName: toolName, - Method: entry.Method, - InputSize: entry.InputSize, - OutputSize: entry.OutputSize, - Status: entry.Status, - Error: entry.Error, - } - - if entry.Duration > 0 { - toolCall.Duration = timeutil.FormatDuration(time.Duration(entry.Duration * float64(time.Millisecond))) - } - - mcpData.ToolCalls = append(mcpData.ToolCalls, toolCall) - } - } - - if err := scanner.Err(); err != nil { - return nil, fmt.Errorf("error reading gateway.jsonl: %w", err) - } - } - - // Build summary statistics from aggregated metrics - for serverName, serverMetrics := range gatewayMetrics.Servers { - // Server-level stats - serverStats := MCPServerStats{ - ServerName: serverName, - RequestCount: serverMetrics.RequestCount, - ToolCallCount: serverMetrics.ToolCallCount, - TotalInputSize: 0, - TotalOutputSize: 0, - ErrorCount: serverMetrics.ErrorCount, - } - - if serverMetrics.RequestCount > 0 { - avgDur := serverMetrics.TotalDuration / float64(serverMetrics.RequestCount) - serverStats.AvgDuration = timeutil.FormatDuration(time.Duration(avgDur * float64(time.Millisecond))) - } - - // Tool-level stats - for toolName, toolMetrics := range serverMetrics.Tools { - summary := MCPToolSummary{ - ServerName: serverName, - ToolName: toolName, - CallCount: toolMetrics.CallCount, - TotalInputSize: toolMetrics.TotalInputSize, - TotalOutputSize: toolMetrics.TotalOutputSize, - MaxInputSize: 0, // Will be calculated below - MaxOutputSize: 0, // Will be calculated below - ErrorCount: toolMetrics.ErrorCount, - } - - if toolMetrics.AvgDuration > 0 { - summary.AvgDuration = timeutil.FormatDuration(time.Duration(toolMetrics.AvgDuration * float64(time.Millisecond))) - } - if toolMetrics.MaxDuration > 0 { - summary.MaxDuration = timeutil.FormatDuration(time.Duration(toolMetrics.MaxDuration * float64(time.Millisecond))) - } - - // Calculate max input/output sizes from individual tool calls - for _, tc := range mcpData.ToolCalls { - if tc.ServerName == serverName && tc.ToolName == toolName { - if tc.InputSize > summary.MaxInputSize { - summary.MaxInputSize = tc.InputSize - } - if tc.OutputSize > summary.MaxOutputSize { - summary.MaxOutputSize = tc.OutputSize - } - } - } - - mcpData.Summary = append(mcpData.Summary, summary) - - // Update server totals - serverStats.TotalInputSize += toolMetrics.TotalInputSize - serverStats.TotalOutputSize += toolMetrics.TotalOutputSize - } - - mcpData.Servers = append(mcpData.Servers, serverStats) - } - - // Sort summaries by server name, then tool name - sort.Slice(mcpData.Summary, func(i, j int) bool { - if mcpData.Summary[i].ServerName != mcpData.Summary[j].ServerName { - return mcpData.Summary[i].ServerName < mcpData.Summary[j].ServerName - } - return mcpData.Summary[i].ToolName < mcpData.Summary[j].ToolName - }) - - // Sort servers by name - sort.Slice(mcpData.Servers, func(i, j int) bool { - return mcpData.Servers[i].ServerName < mcpData.Servers[j].ServerName - }) - - return mcpData, nil -} - -// buildGuardPolicySummary creates a GuardPolicySummary from GatewayMetrics. -func buildGuardPolicySummary(metrics *GatewayMetrics) *GuardPolicySummary { - summary := &GuardPolicySummary{ - TotalBlocked: metrics.TotalGuardBlocked, - Events: metrics.GuardPolicyEvents, - BlockedToolCounts: make(map[string]int), - BlockedServerCounts: make(map[string]int), - } - - for _, evt := range metrics.GuardPolicyEvents { - // Categorize by error code - switch evt.ErrorCode { - case guardPolicyErrorCodeIntegrityBelowMin: - summary.IntegrityBlocked++ - case guardPolicyErrorCodeRepoNotAllowed: - summary.RepoScopeBlocked++ - case guardPolicyErrorCodeAccessDenied: - summary.AccessDenied++ - case guardPolicyErrorCodeBlockedUser: - summary.BlockedUserDenied++ - case guardPolicyErrorCodeInsufficientPerms: - summary.PermissionDenied++ - case guardPolicyErrorCodePrivateRepoDenied: - summary.PrivateRepoDenied++ - } - - // Track per-tool blocked counts - if evt.ToolName != "" { - summary.BlockedToolCounts[evt.ToolName]++ - } - - // Track per-server blocked counts - if evt.ServerID != "" { - summary.BlockedServerCounts[evt.ServerID]++ - } - } - - return summary -} - -// displayAggregatedGatewayMetrics aggregates and displays gateway metrics across all processed runs -func displayAggregatedGatewayMetrics(processedRuns []ProcessedRun, outputDir string, verbose bool) { - // Aggregate gateway metrics from all runs - aggregated := &GatewayMetrics{ - Servers: make(map[string]*GatewayServerMetrics), - } - - runCount := 0 - for _, pr := range processedRuns { - runDir := pr.Run.LogsPath - if runDir == "" { - continue - } - - // Try to parse gateway.jsonl from this run - runMetrics, err := parseGatewayLogs(runDir, false) - if err != nil { - // Skip runs without gateway.jsonl (this is normal for runs without MCP gateway) - continue - } - - runCount++ - - // Merge metrics from this run into aggregated metrics - aggregated.TotalRequests += runMetrics.TotalRequests - aggregated.TotalToolCalls += runMetrics.TotalToolCalls - aggregated.TotalErrors += runMetrics.TotalErrors - aggregated.TotalFiltered += runMetrics.TotalFiltered - aggregated.TotalGuardBlocked += runMetrics.TotalGuardBlocked - aggregated.TotalDuration += runMetrics.TotalDuration - aggregated.FilteredEvents = append(aggregated.FilteredEvents, runMetrics.FilteredEvents...) - aggregated.GuardPolicyEvents = append(aggregated.GuardPolicyEvents, runMetrics.GuardPolicyEvents...) - - // Merge server metrics - for serverName, serverMetrics := range runMetrics.Servers { - aggServer := getOrCreateServer(aggregated, serverName) - aggServer.RequestCount += serverMetrics.RequestCount - aggServer.ToolCallCount += serverMetrics.ToolCallCount - aggServer.TotalDuration += serverMetrics.TotalDuration - aggServer.ErrorCount += serverMetrics.ErrorCount - aggServer.FilteredCount += serverMetrics.FilteredCount - aggServer.GuardPolicyBlocked += serverMetrics.GuardPolicyBlocked - - // Merge tool metrics - for toolName, toolMetrics := range serverMetrics.Tools { - aggTool := getOrCreateTool(aggServer, toolName) - aggTool.CallCount += toolMetrics.CallCount - aggTool.TotalDuration += toolMetrics.TotalDuration - aggTool.ErrorCount += toolMetrics.ErrorCount - aggTool.TotalInputSize += toolMetrics.TotalInputSize - aggTool.TotalOutputSize += toolMetrics.TotalOutputSize - - // Update max/min durations - if toolMetrics.MaxDuration > aggTool.MaxDuration { - aggTool.MaxDuration = toolMetrics.MaxDuration - } - if aggTool.MinDuration == 0 || (toolMetrics.MinDuration > 0 && toolMetrics.MinDuration < aggTool.MinDuration) { - aggTool.MinDuration = toolMetrics.MinDuration - } - } - } - - // Update time range - if aggregated.StartTime.IsZero() || (!runMetrics.StartTime.IsZero() && runMetrics.StartTime.Before(aggregated.StartTime)) { - aggregated.StartTime = runMetrics.StartTime - } - if aggregated.EndTime.IsZero() || (!runMetrics.EndTime.IsZero() && runMetrics.EndTime.After(aggregated.EndTime)) { - aggregated.EndTime = runMetrics.EndTime - } - } - - // Only display if we found gateway metrics - if runCount == 0 || len(aggregated.Servers) == 0 { - return - } - - // Recalculate averages for aggregated data - calculateGatewayAggregates(aggregated) - - // Display the aggregated metrics - if metricsOutput := renderGatewayMetricsTable(aggregated, verbose); metricsOutput != "" { - fmt.Fprint(os.Stderr, metricsOutput) - if runCount > 1 { - fmt.Fprintf(os.Stderr, "\n%s\n", - console.FormatInfoMessage(fmt.Sprintf("Gateway metrics aggregated from %d runs", runCount))) - } - } -} diff --git a/pkg/cli/gateway_logs_metrics.go b/pkg/cli/gateway_logs_metrics.go new file mode 100644 index 00000000000..0411f41a257 --- /dev/null +++ b/pkg/cli/gateway_logs_metrics.go @@ -0,0 +1,179 @@ +// This file provides metrics computation functions for MCP gateway logs. +// +// It processes individual log entries and aggregates statistics across servers and tools. +// +// Type definitions are in gateway_logs_types.go. +// Log file parsing is in gateway_logs_parser.go. +// Rendering functions are in gateway_logs_render.go. + +package cli + +import "time" + +// processGatewayLogEntry processes a single log entry and updates metrics +func processGatewayLogEntry(entry *GatewayLogEntry, metrics *GatewayMetrics, verbose bool) { + // Parse timestamp for time range (supports both RFC3339 and RFC3339Nano) + if entry.Timestamp != "" { + t, err := time.Parse(time.RFC3339Nano, entry.Timestamp) + if err != nil { + t, err = time.Parse(time.RFC3339, entry.Timestamp) + } + if err == nil { + if metrics.StartTime.IsZero() || t.Before(metrics.StartTime) { + metrics.StartTime = t + } + if metrics.EndTime.IsZero() || t.After(metrics.EndTime) { + metrics.EndTime = t + } + } + } + + // Handle DIFC_FILTERED events + if entry.Type == "DIFC_FILTERED" { + metrics.TotalFiltered++ + // DIFC_FILTERED events use server_id; fall back to server_name for compatibility + serverKey := entry.ServerID + if serverKey == "" { + serverKey = entry.ServerName + } + if serverKey != "" { + server := getOrCreateServer(metrics, serverKey) + server.FilteredCount++ + } + metrics.FilteredEvents = append(metrics.FilteredEvents, DifcFilteredEvent{ + Timestamp: entry.Timestamp, + ServerID: serverKey, + ToolName: entry.ToolName, + Description: entry.Description, + Reason: entry.Reason, + SecrecyTags: entry.SecrecyTags, + IntegrityTags: entry.IntegrityTags, + AuthorAssociation: entry.AuthorAssociation, + AuthorLogin: entry.AuthorLogin, + HTMLURL: entry.HTMLURL, + Number: entry.Number, + }) + return + } + + // Handle GUARD_POLICY_BLOCKED events from gateway.jsonl + if entry.Type == "GUARD_POLICY_BLOCKED" { + metrics.TotalGuardBlocked++ + serverKey := entry.ServerID + if serverKey == "" { + serverKey = entry.ServerName + } + if serverKey != "" { + server := getOrCreateServer(metrics, serverKey) + server.GuardPolicyBlocked++ + } + metrics.GuardPolicyEvents = append(metrics.GuardPolicyEvents, GuardPolicyEvent{ + Timestamp: entry.Timestamp, + ServerID: serverKey, + ToolName: entry.ToolName, + Reason: entry.Reason, + Message: entry.Message, + Details: entry.Description, + }) + return + } + + // Track errors + if entry.Status == "error" || entry.Error != "" { + metrics.TotalErrors++ + if entry.ServerName != "" { + server := getOrCreateServer(metrics, entry.ServerName) + server.ErrorCount++ + + if entry.ToolName != "" { + tool := getOrCreateTool(server, entry.ToolName) + tool.ErrorCount++ + } + } + } + + // Process based on event type + switch entry.Event { + case "request", "tool_call", "rpc_call": + metrics.TotalRequests++ + + if entry.ServerName != "" { + server := getOrCreateServer(metrics, entry.ServerName) + server.RequestCount++ + + if entry.Duration > 0 { + server.TotalDuration += entry.Duration + metrics.TotalDuration += entry.Duration + } + + // Track tool calls + if entry.ToolName != "" || entry.Method != "" { + toolName := entry.ToolName + if toolName == "" { + toolName = entry.Method + } + + metrics.TotalToolCalls++ + server.ToolCallCount++ + + tool := getOrCreateTool(server, toolName) + tool.CallCount++ + + if entry.Duration > 0 { + tool.TotalDuration += entry.Duration + if tool.MaxDuration == 0 || entry.Duration > tool.MaxDuration { + tool.MaxDuration = entry.Duration + } + if tool.MinDuration == 0 || entry.Duration < tool.MinDuration { + tool.MinDuration = entry.Duration + } + } + + if entry.InputSize > 0 { + tool.TotalInputSize += entry.InputSize + } + if entry.OutputSize > 0 { + tool.TotalOutputSize += entry.OutputSize + } + } + } + } +} + +// getOrCreateServer gets or creates a server metrics entry +func getOrCreateServer(metrics *GatewayMetrics, serverName string) *GatewayServerMetrics { + if server, exists := metrics.Servers[serverName]; exists { + return server + } + + server := &GatewayServerMetrics{ + ServerName: serverName, + Tools: make(map[string]*GatewayToolMetrics), + } + metrics.Servers[serverName] = server + return server +} + +// getOrCreateTool gets or creates a tool metrics entry +func getOrCreateTool(server *GatewayServerMetrics, toolName string) *GatewayToolMetrics { + if tool, exists := server.Tools[toolName]; exists { + return tool + } + + tool := &GatewayToolMetrics{ + ToolName: toolName, + } + server.Tools[toolName] = tool + return tool +} + +// calculateGatewayAggregates calculates aggregate statistics +func calculateGatewayAggregates(metrics *GatewayMetrics) { + for _, server := range metrics.Servers { + for _, tool := range server.Tools { + if tool.CallCount > 0 { + tool.AvgDuration = tool.TotalDuration / float64(tool.CallCount) + } + } + } +} diff --git a/pkg/cli/gateway_logs_parser.go b/pkg/cli/gateway_logs_parser.go new file mode 100644 index 00000000000..d4c15bb20cd --- /dev/null +++ b/pkg/cli/gateway_logs_parser.go @@ -0,0 +1,329 @@ +// This file provides log parsing functions for MCP gateway log files. +// +// It handles parsing of: +// - gateway.jsonl (preferred format, written by MCP Gateway) +// - rpc-messages.jsonl (canonical fallback, written by Copilot CLI) +// +// Type definitions used here are in gateway_logs_types.go. +// Metrics computation helpers are in gateway_logs_metrics.go. +// Rendering functions are in gateway_logs_render.go. + +package cli + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/github/gh-aw/pkg/console" + "github.com/github/gh-aw/pkg/logger" +) + +var gatewayLogsLog = logger.New("cli:gateway_logs") + +// maxScannerBufferSize is the maximum scanner buffer for large JSONL payloads (1 MB). +const maxScannerBufferSize = 1024 * 1024 + +// parseRPCMessages parses a rpc-messages.jsonl file and extracts GatewayMetrics. +// This is the canonical fallback when gateway.jsonl is not available. +func parseRPCMessages(logPath string, verbose bool) (*GatewayMetrics, error) { + gatewayLogsLog.Printf("Parsing rpc-messages.jsonl from: %s", logPath) + + file, err := os.Open(logPath) + if err != nil { + return nil, fmt.Errorf("failed to open rpc-messages.jsonl: %w", err) + } + defer file.Close() + + metrics := &GatewayMetrics{ + Servers: make(map[string]*GatewayServerMetrics), + } + + // Track pending requests by (serverID, id) for duration calculation. + // Key format: "/" + pendingRequests := make(map[string]*rpcPendingRequest) + + scanner := bufio.NewScanner(file) + // Increase scanner buffer for large payloads + buf := make([]byte, maxScannerBufferSize) + scanner.Buffer(buf, maxScannerBufferSize) + lineNum := 0 + + for scanner.Scan() { + lineNum++ + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + + var entry RPCMessageEntry + if err := json.Unmarshal([]byte(line), &entry); err != nil { + gatewayLogsLog.Printf("Failed to parse rpc-messages.jsonl line %d: %v", lineNum, err) + if verbose { + fmt.Fprintln(os.Stderr, console.FormatWarningMessage( + fmt.Sprintf("Failed to parse rpc-messages.jsonl line %d: %v", lineNum, err))) + } + continue + } + + // Update time range + if entry.Timestamp != "" { + if t, err := time.Parse(time.RFC3339Nano, entry.Timestamp); err == nil { + if metrics.StartTime.IsZero() || t.Before(metrics.StartTime) { + metrics.StartTime = t + } + if metrics.EndTime.IsZero() || t.After(metrics.EndTime) { + metrics.EndTime = t + } + } + } + + if entry.ServerID == "" { + continue + } + + switch { + case entry.Type == "DIFC_FILTERED": + // DIFC integrity/secrecy filter event — not a REQUEST or RESPONSE + metrics.TotalFiltered++ + server := getOrCreateServer(metrics, entry.ServerID) + server.FilteredCount++ + metrics.FilteredEvents = append(metrics.FilteredEvents, DifcFilteredEvent{ + Timestamp: entry.Timestamp, + ServerID: entry.ServerID, + ToolName: entry.ToolName, + Description: entry.Description, + Reason: entry.Reason, + SecrecyTags: entry.SecrecyTags, + IntegrityTags: entry.IntegrityTags, + AuthorAssociation: entry.AuthorAssociation, + AuthorLogin: entry.AuthorLogin, + HTMLURL: entry.HTMLURL, + Number: entry.Number, + }) + + case entry.Direction == "OUT" && entry.Type == "REQUEST": + // Outgoing request from AI engine to MCP server + var req rpcRequestPayload + if err := json.Unmarshal(entry.Payload, &req); err != nil { + continue + } + if req.Method != "tools/call" { + continue + } + + // Extract tool name + var params rpcToolCallParams + if err := json.Unmarshal(req.Params, ¶ms); err != nil || params.Name == "" { + continue + } + + metrics.TotalRequests++ + server := getOrCreateServer(metrics, entry.ServerID) + server.RequestCount++ + metrics.TotalToolCalls++ + server.ToolCallCount++ + + tool := getOrCreateTool(server, params.Name) + tool.CallCount++ + + // Store pending request for duration calculation + if req.ID != nil && entry.Timestamp != "" { + if t, err := time.Parse(time.RFC3339Nano, entry.Timestamp); err == nil { + key := fmt.Sprintf("%s/%v", entry.ServerID, req.ID) + pendingRequests[key] = &rpcPendingRequest{ + ServerID: entry.ServerID, + ToolName: params.Name, + Timestamp: t, + } + } + } + + case entry.Direction == "IN" && entry.Type == "RESPONSE": + // Incoming response from MCP server to AI engine + var resp rpcResponsePayload + if err := json.Unmarshal(entry.Payload, &resp); err != nil { + continue + } + + // Track errors and detect guard policy blocks + if resp.Error != nil { + metrics.TotalErrors++ + server := getOrCreateServer(metrics, entry.ServerID) + server.ErrorCount++ + + // Detect guard policy enforcement errors + if isGuardPolicyErrorCode(resp.Error.Code) { + metrics.TotalGuardBlocked++ + server.GuardPolicyBlocked++ + + // Determine tool name from pending request if available + toolName := "" + if resp.ID != nil { + key := fmt.Sprintf("%s/%v", entry.ServerID, resp.ID) + if pending, ok := pendingRequests[key]; ok { + toolName = pending.ToolName + } + } + + reason := guardPolicyReasonFromCode(resp.Error.Code) + if resp.Error.Data != nil && resp.Error.Data.Reason != "" { + reason = resp.Error.Data.Reason + } + + evt := GuardPolicyEvent{ + Timestamp: entry.Timestamp, + ServerID: entry.ServerID, + ToolName: toolName, + ErrorCode: resp.Error.Code, + Reason: reason, + Message: resp.Error.Message, + } + if resp.Error.Data != nil { + evt.Details = resp.Error.Data.Details + evt.Repository = resp.Error.Data.Repository + } + metrics.GuardPolicyEvents = append(metrics.GuardPolicyEvents, evt) + } + } + + // Calculate duration by matching with pending request + if resp.ID != nil && entry.Timestamp != "" { + key := fmt.Sprintf("%s/%v", entry.ServerID, resp.ID) + if pending, ok := pendingRequests[key]; ok { + delete(pendingRequests, key) + if t, err := time.Parse(time.RFC3339Nano, entry.Timestamp); err == nil { + durationMs := float64(t.Sub(pending.Timestamp).Milliseconds()) + if durationMs >= 0 { + server := getOrCreateServer(metrics, entry.ServerID) + server.TotalDuration += durationMs + metrics.TotalDuration += durationMs + + tool := getOrCreateTool(server, pending.ToolName) + tool.TotalDuration += durationMs + if tool.MaxDuration == 0 || durationMs > tool.MaxDuration { + tool.MaxDuration = durationMs + } + if tool.MinDuration == 0 || durationMs < tool.MinDuration { + tool.MinDuration = durationMs + } + + if resp.Error != nil { + tool.ErrorCount++ + } + } + } + } + } + } + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error reading rpc-messages.jsonl: %w", err) + } + + calculateGatewayAggregates(metrics) + + gatewayLogsLog.Printf("Successfully parsed rpc-messages.jsonl: %d servers, %d total requests", + len(metrics.Servers), metrics.TotalRequests) + + return metrics, nil +} + +// findRPCMessagesPath returns the path to rpc-messages.jsonl if it exists, or "" if not found. +func findRPCMessagesPath(logDir string) string { + // Check mcp-logs subdirectory (standard location) + mcpLogsPath := filepath.Join(logDir, "mcp-logs", "rpc-messages.jsonl") + if _, err := os.Stat(mcpLogsPath); err == nil { + return mcpLogsPath + } + // Check root directory as fallback + rootPath := filepath.Join(logDir, "rpc-messages.jsonl") + if _, err := os.Stat(rootPath); err == nil { + return rootPath + } + return "" +} + +// parseGatewayLogs parses a gateway.jsonl file and extracts metrics. +// Falls back to rpc-messages.jsonl (canonical fallback) when gateway.jsonl is not present. +func parseGatewayLogs(logDir string, verbose bool) (*GatewayMetrics, error) { + // Try root directory first (for older logs where gateway.jsonl was in the root) + gatewayLogPath := filepath.Join(logDir, "gateway.jsonl") + + // Check if gateway.jsonl exists in root + if _, err := os.Stat(gatewayLogPath); os.IsNotExist(err) { + // Try mcp-logs subdirectory (new path after artifact download) + // Gateway logs are uploaded from /tmp/gh-aw/mcp-logs/gateway.jsonl and the common parent + // /tmp/gh-aw/ is stripped during artifact upload, resulting in mcp-logs/gateway.jsonl after download + mcpLogsPath := filepath.Join(logDir, "mcp-logs", "gateway.jsonl") + if _, err := os.Stat(mcpLogsPath); os.IsNotExist(err) { + // Fall back to rpc-messages.jsonl (canonical fallback when gateway.jsonl is missing) + rpcPath := findRPCMessagesPath(logDir) + if rpcPath != "" { + gatewayLogsLog.Printf("gateway.jsonl not found; falling back to rpc-messages.jsonl: %s", rpcPath) + return parseRPCMessages(rpcPath, verbose) + } + gatewayLogsLog.Printf("gateway.jsonl not found at: %s or %s", gatewayLogPath, mcpLogsPath) + return nil, errors.New("gateway.jsonl not found") + } + gatewayLogPath = mcpLogsPath + gatewayLogsLog.Printf("Found gateway.jsonl in mcp-logs subdirectory") + } + + gatewayLogsLog.Printf("Parsing gateway.jsonl from: %s", gatewayLogPath) + + file, err := os.Open(gatewayLogPath) + if err != nil { + return nil, fmt.Errorf("failed to open gateway.jsonl: %w", err) + } + defer file.Close() + + metrics := &GatewayMetrics{ + Servers: make(map[string]*GatewayServerMetrics), + } + + scanner := bufio.NewScanner(file) + buf := make([]byte, maxScannerBufferSize) + scanner.Buffer(buf, maxScannerBufferSize) + lineNum := 0 + + for scanner.Scan() { + lineNum++ + line := strings.TrimSpace(scanner.Text()) + + // Skip empty lines + if line == "" { + continue + } + + var entry GatewayLogEntry + if err := json.Unmarshal([]byte(line), &entry); err != nil { + gatewayLogsLog.Printf("Failed to parse line %d: %v", lineNum, err) + if verbose { + fmt.Fprintln(os.Stderr, console.FormatWarningMessage(fmt.Sprintf("Failed to parse gateway.jsonl line %d: %v", lineNum, err))) + } + continue + } + + // Process the entry based on its type/event + processGatewayLogEntry(&entry, metrics, verbose) + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error reading gateway.jsonl: %w", err) + } + + // Calculate aggregate statistics + calculateGatewayAggregates(metrics) + + gatewayLogsLog.Printf("Successfully parsed gateway.jsonl: %d servers, %d total requests", + len(metrics.Servers), metrics.TotalRequests) + + return metrics, nil +} diff --git a/pkg/cli/gateway_logs_render.go b/pkg/cli/gateway_logs_render.go new file mode 100644 index 00000000000..6df6c05f766 --- /dev/null +++ b/pkg/cli/gateway_logs_render.go @@ -0,0 +1,667 @@ +// This file provides rendering and display functions for MCP gateway metrics. +// +// It handles: +// - Rendering gateway metrics as console tables (renderGatewayMetricsTable) +// - Building individual tool call records (buildToolCallsFromRPCMessages) +// - Extracting MCP tool usage data for the audit report (extractMCPToolUsageData) +// - Building guard policy summaries (buildGuardPolicySummary) +// - Displaying aggregated metrics across multiple runs (displayAggregatedGatewayMetrics) +// +// Type definitions are in gateway_logs_types.go. +// Log file parsing is in gateway_logs_parser.go. +// Metrics computation helpers are in gateway_logs_metrics.go. + +package cli + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "time" + + "github.com/github/gh-aw/pkg/console" + "github.com/github/gh-aw/pkg/sliceutil" + "github.com/github/gh-aw/pkg/timeutil" +) + +// renderGatewayMetricsTable renders gateway metrics as a console table +func renderGatewayMetricsTable(metrics *GatewayMetrics, verbose bool) string { + if metrics == nil || len(metrics.Servers) == 0 { + return "" + } + + var output strings.Builder + + output.WriteString("\n") + output.WriteString(console.FormatInfoMessage("MCP Gateway Metrics")) + output.WriteString("\n\n") + + // Summary statistics + fmt.Fprintf(&output, "Total Requests: %d\n", metrics.TotalRequests) + fmt.Fprintf(&output, "Total Tool Calls: %d\n", metrics.TotalToolCalls) + fmt.Fprintf(&output, "Total Errors: %d\n", metrics.TotalErrors) + if metrics.TotalFiltered > 0 { + fmt.Fprintf(&output, "Total DIFC Filtered: %d\n", metrics.TotalFiltered) + } + if metrics.TotalGuardBlocked > 0 { + fmt.Fprintf(&output, "Total Guard Policy Blocked: %d\n", metrics.TotalGuardBlocked) + } + fmt.Fprintf(&output, "Servers: %d\n", len(metrics.Servers)) + + if !metrics.StartTime.IsZero() && !metrics.EndTime.IsZero() { + duration := metrics.EndTime.Sub(metrics.StartTime) + fmt.Fprintf(&output, "Time Range: %s\n", duration.Round(time.Second)) + } + + output.WriteString("\n") + + // Server metrics table + if len(metrics.Servers) > 0 { + // Sort servers by request count + serverNames := getSortedServerNames(metrics) + + hasFiltered := metrics.TotalFiltered > 0 + hasGuardPolicy := metrics.TotalGuardBlocked > 0 + serverRows := make([][]string, 0, len(serverNames)) + for _, serverName := range serverNames { + server := metrics.Servers[serverName] + avgTime := 0.0 + if server.RequestCount > 0 { + avgTime = server.TotalDuration / float64(server.RequestCount) + } + row := []string{ + serverName, + strconv.Itoa(server.RequestCount), + strconv.Itoa(server.ToolCallCount), + fmt.Sprintf("%.0fms", avgTime), + strconv.Itoa(server.ErrorCount), + } + if hasFiltered { + row = append(row, strconv.Itoa(server.FilteredCount)) + } + if hasGuardPolicy { + row = append(row, strconv.Itoa(server.GuardPolicyBlocked)) + } + serverRows = append(serverRows, row) + } + + headers := []string{"Server", "Requests", "Tool Calls", "Avg Time", "Errors"} + if hasFiltered { + headers = append(headers, "Filtered") + } + if hasGuardPolicy { + headers = append(headers, "Guard Blocked") + } + output.WriteString(console.RenderTable(console.TableConfig{ + Title: "Server Usage", + Headers: headers, + Rows: serverRows, + })) + } + + // DIFC filtered events table + if len(metrics.FilteredEvents) > 0 { + output.WriteString("\n") + filteredRows := make([][]string, 0, len(metrics.FilteredEvents)) + for _, fe := range metrics.FilteredEvents { + reason := fe.Reason + if len(reason) > 80 { + reason = reason[:77] + "..." + } + filteredRows = append(filteredRows, []string{ + fe.ServerID, + fe.ToolName, + fe.AuthorLogin, + reason, + }) + } + output.WriteString(console.RenderTable(console.TableConfig{ + Title: "DIFC Filtered Events", + Headers: []string{"Server", "Tool", "User", "Reason"}, + Rows: filteredRows, + })) + } + + // Guard policy events table + if len(metrics.GuardPolicyEvents) > 0 { + output.WriteString("\n") + guardRows := make([][]string, 0, len(metrics.GuardPolicyEvents)) + for _, gpe := range metrics.GuardPolicyEvents { + message := gpe.Message + if len(message) > 60 { + message = message[:57] + "..." + } + repo := gpe.Repository + if repo == "" { + repo = "-" + } + guardRows = append(guardRows, []string{ + gpe.ServerID, + gpe.ToolName, + gpe.Reason, + message, + repo, + }) + } + output.WriteString(console.RenderTable(console.TableConfig{ + Title: "Guard Policy Blocked Events", + Headers: []string{"Server", "Tool", "Reason", "Message", "Repository"}, + Rows: guardRows, + })) + } + + // Tool metrics table (if verbose) + if verbose { + output.WriteString("\n") + output.WriteString("Tool Usage Details:\n") + + for _, serverName := range getSortedServerNames(metrics) { + server := metrics.Servers[serverName] + if len(server.Tools) == 0 { + continue + } + + // Sort tools by call count + toolNames := sliceutil.MapToSlice(server.Tools) + sort.Slice(toolNames, func(i, j int) bool { + return server.Tools[toolNames[i]].CallCount > server.Tools[toolNames[j]].CallCount + }) + + toolRows := make([][]string, 0, len(toolNames)) + for _, toolName := range toolNames { + tool := server.Tools[toolName] + toolRows = append(toolRows, []string{ + toolName, + strconv.Itoa(tool.CallCount), + fmt.Sprintf("%.0fms", tool.AvgDuration), + fmt.Sprintf("%.0fms", tool.MaxDuration), + strconv.Itoa(tool.ErrorCount), + }) + } + + output.WriteString(console.RenderTable(console.TableConfig{ + Title: serverName, + Headers: []string{"Tool", "Calls", "Avg Time", "Max Time", "Errors"}, + Rows: toolRows, + })) + } + } + + return output.String() +} + +// getSortedServerNames returns server names sorted by request count +func getSortedServerNames(metrics *GatewayMetrics) []string { + names := sliceutil.MapToSlice(metrics.Servers) + sort.Slice(names, func(i, j int) bool { + return metrics.Servers[names[i]].RequestCount > metrics.Servers[names[j]].RequestCount + }) + return names +} + +// buildToolCallsFromRPCMessages reads rpc-messages.jsonl and builds MCPToolCall records. +// Duration is computed by pairing outgoing requests with incoming responses. +// Input/output sizes are not available in rpc-messages.jsonl and will be 0. +func buildToolCallsFromRPCMessages(logPath string) ([]MCPToolCall, error) { + file, err := os.Open(logPath) + if err != nil { + return nil, fmt.Errorf("failed to open rpc-messages.jsonl: %w", err) + } + defer file.Close() + + type pendingCall struct { + serverID string + toolName string + timestamp time.Time + } + pending := make(map[string]*pendingCall) // key: "/" + + // Collect requests first to pair with responses + type rawEntry struct { + entry RPCMessageEntry + req rpcRequestPayload + resp rpcResponsePayload + valid bool + } + var entries []rawEntry + + scanner := bufio.NewScanner(file) + buf := make([]byte, maxScannerBufferSize) + scanner.Buffer(buf, maxScannerBufferSize) + + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + var e RPCMessageEntry + if err := json.Unmarshal([]byte(line), &e); err != nil { + continue + } + entries = append(entries, rawEntry{entry: e, valid: true}) + } + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error reading rpc-messages.jsonl: %w", err) + } + + // toolCalls and processedKeys are declared here so requests without IDs can be + // appended during the first pass (index pass) below, before the second pass runs. + var toolCalls []MCPToolCall + processedKeys := make(map[string]bool) + + // First pass: index outgoing tool-call requests by (serverID, id) + for i := range entries { + e := &entries[i] + if e.entry.Direction != "OUT" || e.entry.Type != "REQUEST" { + continue + } + if err := json.Unmarshal(e.entry.Payload, &e.req); err != nil || e.req.Method != "tools/call" { + continue + } + var params rpcToolCallParams + if err := json.Unmarshal(e.req.Params, ¶ms); err != nil || params.Name == "" { + continue + } + if e.req.ID == nil { + // Requests without an ID cannot be matched to responses. + // Emit the tool call immediately with "unknown" status so it appears + // in the tool_calls list (same as parseRPCMessages counts it in the summary). + toolCalls = append(toolCalls, MCPToolCall{ + Timestamp: e.entry.Timestamp, + ServerName: e.entry.ServerID, + ToolName: params.Name, + Status: "unknown", + }) + continue + } + t, err := time.Parse(time.RFC3339Nano, e.entry.Timestamp) + if err != nil { + continue + } + key := fmt.Sprintf("%s/%v", e.entry.ServerID, e.req.ID) + pending[key] = &pendingCall{ + serverID: e.entry.ServerID, + toolName: params.Name, + timestamp: t, + } + } + + // Second pass: match incoming responses with pending requests to compute durations + + for i := range entries { + e := &entries[i] + switch { + case e.entry.Direction == "OUT" && e.entry.Type == "REQUEST": + // Outgoing tool-call request – we'll emit the record when we see the response + // (or after if no response found) + case e.entry.Direction == "IN" && e.entry.Type == "RESPONSE": + if err := json.Unmarshal(e.entry.Payload, &e.resp); err != nil { + continue + } + if e.resp.ID == nil { + continue + } + key := fmt.Sprintf("%s/%v", e.entry.ServerID, e.resp.ID) + p, ok := pending[key] + if !ok { + continue + } + processedKeys[key] = true + + call := MCPToolCall{ + Timestamp: p.timestamp.Format(time.RFC3339Nano), + ServerName: p.serverID, + ToolName: p.toolName, + Status: "success", + } + if e.resp.Error != nil { + call.Status = "error" + call.Error = e.resp.Error.Message + } + if t, err := time.Parse(time.RFC3339Nano, e.entry.Timestamp); err == nil { + d := t.Sub(p.timestamp) + if d >= 0 { + call.Duration = timeutil.FormatDuration(d) + } + } + toolCalls = append(toolCalls, call) + } + } + + // Emit any requests that never received a response + for key, p := range pending { + if !processedKeys[key] { + toolCalls = append(toolCalls, MCPToolCall{ + Timestamp: p.timestamp.Format(time.RFC3339Nano), + ServerName: p.serverID, + ToolName: p.toolName, + Status: "unknown", + }) + } + } + + return toolCalls, nil +} + +// extractMCPToolUsageData creates detailed MCP tool usage data from gateway metrics +func extractMCPToolUsageData(logDir string, verbose bool) (*MCPToolUsageData, error) { + // Parse gateway logs (falls back to rpc-messages.jsonl automatically) + gatewayMetrics, err := parseGatewayLogs(logDir, verbose) + if err != nil { + // Return nil if no log file exists (not an error for workflows without MCP) + if strings.Contains(err.Error(), "not found") { + return nil, nil + } + return nil, fmt.Errorf("failed to parse gateway logs: %w", err) + } + + if gatewayMetrics == nil || len(gatewayMetrics.Servers) == 0 { + return nil, nil + } + + mcpData := &MCPToolUsageData{ + Summary: []MCPToolSummary{}, + ToolCalls: []MCPToolCall{}, + Servers: []MCPServerStats{}, + FilteredEvents: gatewayMetrics.FilteredEvents, + } + + // Build guard policy summary if there are guard policy events + if len(gatewayMetrics.GuardPolicyEvents) > 0 { + mcpData.GuardPolicySummary = buildGuardPolicySummary(gatewayMetrics) + } + + // Read the log file again to get individual tool call records. + // Prefer gateway.jsonl; fall back to rpc-messages.jsonl when not available. + gatewayLogPath := filepath.Join(logDir, "gateway.jsonl") + usingRPCMessages := false + + if _, err := os.Stat(gatewayLogPath); os.IsNotExist(err) { + mcpLogsPath := filepath.Join(logDir, "mcp-logs", "gateway.jsonl") + if _, err := os.Stat(mcpLogsPath); os.IsNotExist(err) { + // Fall back to rpc-messages.jsonl + rpcPath := findRPCMessagesPath(logDir) + if rpcPath == "" { + return nil, errors.New("gateway.jsonl not found") + } + gatewayLogPath = rpcPath + usingRPCMessages = true + } else { + gatewayLogPath = mcpLogsPath + } + } + + if usingRPCMessages { + // Build tool call records from rpc-messages.jsonl + toolCalls, err := buildToolCallsFromRPCMessages(gatewayLogPath) + if err != nil { + return nil, fmt.Errorf("failed to read rpc-messages.jsonl: %w", err) + } + mcpData.ToolCalls = toolCalls + } else { + file, err := os.Open(gatewayLogPath) + if err != nil { + return nil, fmt.Errorf("failed to open gateway.jsonl: %w", err) + } + defer file.Close() + + scanner := bufio.NewScanner(file) + buf := make([]byte, maxScannerBufferSize) + scanner.Buffer(buf, maxScannerBufferSize) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + + var entry GatewayLogEntry + if err := json.Unmarshal([]byte(line), &entry); err != nil { + continue // Skip malformed lines + } + + // Only process tool call events + if entry.Event == "tool_call" || entry.Event == "rpc_call" || entry.Event == "request" { + toolName := entry.ToolName + if toolName == "" { + toolName = entry.Method + } + + // Skip entries without tool information + if entry.ServerName == "" || toolName == "" { + continue + } + + // Create individual tool call record + toolCall := MCPToolCall{ + Timestamp: entry.Timestamp, + ServerName: entry.ServerName, + ToolName: toolName, + Method: entry.Method, + InputSize: entry.InputSize, + OutputSize: entry.OutputSize, + Status: entry.Status, + Error: entry.Error, + } + + if entry.Duration > 0 { + toolCall.Duration = timeutil.FormatDuration(time.Duration(entry.Duration * float64(time.Millisecond))) + } + + mcpData.ToolCalls = append(mcpData.ToolCalls, toolCall) + } + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error reading gateway.jsonl: %w", err) + } + } + + // Build summary statistics from aggregated metrics + for serverName, serverMetrics := range gatewayMetrics.Servers { + // Server-level stats + serverStats := MCPServerStats{ + ServerName: serverName, + RequestCount: serverMetrics.RequestCount, + ToolCallCount: serverMetrics.ToolCallCount, + TotalInputSize: 0, + TotalOutputSize: 0, + ErrorCount: serverMetrics.ErrorCount, + } + + if serverMetrics.RequestCount > 0 { + avgDur := serverMetrics.TotalDuration / float64(serverMetrics.RequestCount) + serverStats.AvgDuration = timeutil.FormatDuration(time.Duration(avgDur * float64(time.Millisecond))) + } + + // Tool-level stats + for toolName, toolMetrics := range serverMetrics.Tools { + summary := MCPToolSummary{ + ServerName: serverName, + ToolName: toolName, + CallCount: toolMetrics.CallCount, + TotalInputSize: toolMetrics.TotalInputSize, + TotalOutputSize: toolMetrics.TotalOutputSize, + MaxInputSize: 0, // Will be calculated below + MaxOutputSize: 0, // Will be calculated below + ErrorCount: toolMetrics.ErrorCount, + } + + if toolMetrics.AvgDuration > 0 { + summary.AvgDuration = timeutil.FormatDuration(time.Duration(toolMetrics.AvgDuration * float64(time.Millisecond))) + } + if toolMetrics.MaxDuration > 0 { + summary.MaxDuration = timeutil.FormatDuration(time.Duration(toolMetrics.MaxDuration * float64(time.Millisecond))) + } + + // Calculate max input/output sizes from individual tool calls + for _, tc := range mcpData.ToolCalls { + if tc.ServerName == serverName && tc.ToolName == toolName { + if tc.InputSize > summary.MaxInputSize { + summary.MaxInputSize = tc.InputSize + } + if tc.OutputSize > summary.MaxOutputSize { + summary.MaxOutputSize = tc.OutputSize + } + } + } + + mcpData.Summary = append(mcpData.Summary, summary) + + // Update server totals + serverStats.TotalInputSize += toolMetrics.TotalInputSize + serverStats.TotalOutputSize += toolMetrics.TotalOutputSize + } + + mcpData.Servers = append(mcpData.Servers, serverStats) + } + + // Sort summaries by server name, then tool name + sort.Slice(mcpData.Summary, func(i, j int) bool { + if mcpData.Summary[i].ServerName != mcpData.Summary[j].ServerName { + return mcpData.Summary[i].ServerName < mcpData.Summary[j].ServerName + } + return mcpData.Summary[i].ToolName < mcpData.Summary[j].ToolName + }) + + // Sort servers by name + sort.Slice(mcpData.Servers, func(i, j int) bool { + return mcpData.Servers[i].ServerName < mcpData.Servers[j].ServerName + }) + + return mcpData, nil +} + +// buildGuardPolicySummary creates a GuardPolicySummary from GatewayMetrics. +func buildGuardPolicySummary(metrics *GatewayMetrics) *GuardPolicySummary { + summary := &GuardPolicySummary{ + TotalBlocked: metrics.TotalGuardBlocked, + Events: metrics.GuardPolicyEvents, + BlockedToolCounts: make(map[string]int), + BlockedServerCounts: make(map[string]int), + } + + for _, evt := range metrics.GuardPolicyEvents { + // Categorize by error code + switch evt.ErrorCode { + case guardPolicyErrorCodeIntegrityBelowMin: + summary.IntegrityBlocked++ + case guardPolicyErrorCodeRepoNotAllowed: + summary.RepoScopeBlocked++ + case guardPolicyErrorCodeAccessDenied: + summary.AccessDenied++ + case guardPolicyErrorCodeBlockedUser: + summary.BlockedUserDenied++ + case guardPolicyErrorCodeInsufficientPerms: + summary.PermissionDenied++ + case guardPolicyErrorCodePrivateRepoDenied: + summary.PrivateRepoDenied++ + } + + // Track per-tool blocked counts + if evt.ToolName != "" { + summary.BlockedToolCounts[evt.ToolName]++ + } + + // Track per-server blocked counts + if evt.ServerID != "" { + summary.BlockedServerCounts[evt.ServerID]++ + } + } + + return summary +} + +// displayAggregatedGatewayMetrics aggregates and displays gateway metrics across all processed runs +func displayAggregatedGatewayMetrics(processedRuns []ProcessedRun, outputDir string, verbose bool) { + // Aggregate gateway metrics from all runs + aggregated := &GatewayMetrics{ + Servers: make(map[string]*GatewayServerMetrics), + } + + runCount := 0 + for _, pr := range processedRuns { + runDir := pr.Run.LogsPath + if runDir == "" { + continue + } + + // Try to parse gateway.jsonl from this run + runMetrics, err := parseGatewayLogs(runDir, false) + if err != nil { + // Skip runs without gateway.jsonl (this is normal for runs without MCP gateway) + continue + } + + runCount++ + + // Merge metrics from this run into aggregated metrics + aggregated.TotalRequests += runMetrics.TotalRequests + aggregated.TotalToolCalls += runMetrics.TotalToolCalls + aggregated.TotalErrors += runMetrics.TotalErrors + aggregated.TotalFiltered += runMetrics.TotalFiltered + aggregated.TotalGuardBlocked += runMetrics.TotalGuardBlocked + aggregated.TotalDuration += runMetrics.TotalDuration + aggregated.FilteredEvents = append(aggregated.FilteredEvents, runMetrics.FilteredEvents...) + aggregated.GuardPolicyEvents = append(aggregated.GuardPolicyEvents, runMetrics.GuardPolicyEvents...) + + // Merge server metrics + for serverName, serverMetrics := range runMetrics.Servers { + aggServer := getOrCreateServer(aggregated, serverName) + aggServer.RequestCount += serverMetrics.RequestCount + aggServer.ToolCallCount += serverMetrics.ToolCallCount + aggServer.TotalDuration += serverMetrics.TotalDuration + aggServer.ErrorCount += serverMetrics.ErrorCount + aggServer.FilteredCount += serverMetrics.FilteredCount + aggServer.GuardPolicyBlocked += serverMetrics.GuardPolicyBlocked + + // Merge tool metrics + for toolName, toolMetrics := range serverMetrics.Tools { + aggTool := getOrCreateTool(aggServer, toolName) + aggTool.CallCount += toolMetrics.CallCount + aggTool.TotalDuration += toolMetrics.TotalDuration + aggTool.ErrorCount += toolMetrics.ErrorCount + aggTool.TotalInputSize += toolMetrics.TotalInputSize + aggTool.TotalOutputSize += toolMetrics.TotalOutputSize + + // Update max/min durations + if toolMetrics.MaxDuration > aggTool.MaxDuration { + aggTool.MaxDuration = toolMetrics.MaxDuration + } + if aggTool.MinDuration == 0 || (toolMetrics.MinDuration > 0 && toolMetrics.MinDuration < aggTool.MinDuration) { + aggTool.MinDuration = toolMetrics.MinDuration + } + } + } + + // Update time range + if aggregated.StartTime.IsZero() || (!runMetrics.StartTime.IsZero() && runMetrics.StartTime.Before(aggregated.StartTime)) { + aggregated.StartTime = runMetrics.StartTime + } + if aggregated.EndTime.IsZero() || (!runMetrics.EndTime.IsZero() && runMetrics.EndTime.After(aggregated.EndTime)) { + aggregated.EndTime = runMetrics.EndTime + } + } + + // Only display if we found gateway metrics + if runCount == 0 || len(aggregated.Servers) == 0 { + return + } + + // Recalculate averages for aggregated data + calculateGatewayAggregates(aggregated) + + // Display the aggregated metrics + if metricsOutput := renderGatewayMetricsTable(aggregated, verbose); metricsOutput != "" { + fmt.Fprint(os.Stderr, metricsOutput) + if runCount > 1 { + fmt.Fprintf(os.Stderr, "\n%s\n", + console.FormatInfoMessage(fmt.Sprintf("Gateway metrics aggregated from %d runs", runCount))) + } + } +} diff --git a/pkg/cli/gateway_logs_types.go b/pkg/cli/gateway_logs_types.go new file mode 100644 index 00000000000..f9c2ee934c0 --- /dev/null +++ b/pkg/cli/gateway_logs_types.go @@ -0,0 +1,207 @@ +// This file provides type definitions for gateway log parsing and metrics. +// +// Types defined here are used across the gateway log sub-files: +// - gateway_logs_parser.go — parseRPCMessages, findRPCMessagesPath, parseGatewayLogs +// - gateway_logs_metrics.go — processGatewayLogEntry, getOrCreateServer, calculateGatewayAggregates +// - gateway_logs_render.go — renderGatewayMetricsTable, displayAggregatedGatewayMetrics + +package cli + +import ( + "encoding/json" + "time" +) + +// GatewayLogEntry represents a single log entry from gateway.jsonl +type GatewayLogEntry struct { + Timestamp string `json:"timestamp"` + Level string `json:"level"` + Type string `json:"type"` + Event string `json:"event"` + ServerName string `json:"server_name,omitempty"` + ServerID string `json:"server_id,omitempty"` // used by DIFC_FILTERED events + ToolName string `json:"tool_name,omitempty"` + Method string `json:"method,omitempty"` + Duration float64 `json:"duration,omitempty"` // in milliseconds + InputSize int `json:"input_size,omitempty"` + OutputSize int `json:"output_size,omitempty"` + Status string `json:"status,omitempty"` + Error string `json:"error,omitempty"` + Message string `json:"message,omitempty"` + Description string `json:"description,omitempty"` + Reason string `json:"reason,omitempty"` + SecrecyTags []string `json:"secrecy_tags,omitempty"` + IntegrityTags []string `json:"integrity_tags,omitempty"` + AuthorAssociation string `json:"author_association,omitempty"` + AuthorLogin string `json:"author_login,omitempty"` + HTMLURL string `json:"html_url,omitempty"` + Number string `json:"number,omitempty"` +} + +// DifcFilteredEvent represents a DIFC_FILTERED log entry from gateway.jsonl. +// These events occur when a tool call is blocked by DIFC integrity or secrecy checks. +type DifcFilteredEvent struct { + Timestamp string `json:"timestamp"` + ServerID string `json:"server_id"` + ToolName string `json:"tool_name"` + Description string `json:"description,omitempty"` + Reason string `json:"reason"` + SecrecyTags []string `json:"secrecy_tags,omitempty"` + IntegrityTags []string `json:"integrity_tags,omitempty"` + AuthorAssociation string `json:"author_association,omitempty"` + AuthorLogin string `json:"author_login,omitempty"` + HTMLURL string `json:"html_url,omitempty"` + Number string `json:"number,omitempty"` +} + +// Guard policy error codes from MCP Gateway. +// These JSON-RPC error codes indicate guard policy enforcement decisions. +const ( + guardPolicyErrorCodeAccessDenied = -32001 // General access denied + guardPolicyErrorCodeRepoNotAllowed = -32002 // Repository not in allowlist (repos) + guardPolicyErrorCodeInsufficientPerms = -32003 // Insufficient permissions (roles) + guardPolicyErrorCodePrivateRepoDenied = -32004 // Private repository access denied + guardPolicyErrorCodeBlockedUser = -32005 // Content from blocked user + guardPolicyErrorCodeIntegrityBelowMin = -32006 // Content integrity below minimum threshold (min-integrity) +) + +// GuardPolicyEvent represents a guard policy enforcement decision from the MCP Gateway. +// These events are extracted from JSON-RPC error responses with specific error codes +// (-32001 to -32006) in rpc-messages.jsonl. +type GuardPolicyEvent struct { + Timestamp string `json:"timestamp"` + ServerID string `json:"server_id"` + ToolName string `json:"tool_name"` + ErrorCode int `json:"error_code"` + Reason string `json:"reason"` // e.g., "repository_not_allowed", "min_integrity" + Message string `json:"message"` // Error message from JSON-RPC response + Details string `json:"details,omitempty"` // Additional details from error data + Repository string `json:"repository,omitempty"` // Repository involved (for repo scope blocks) +} + +// isGuardPolicyErrorCode returns true if the JSON-RPC error code indicates a +// guard policy enforcement decision. +func isGuardPolicyErrorCode(code int) bool { + return code >= guardPolicyErrorCodeIntegrityBelowMin && code <= guardPolicyErrorCodeAccessDenied +} + +// guardPolicyReasonFromCode returns a human-readable reason string for a guard policy error code. +func guardPolicyReasonFromCode(code int) string { + switch code { + case guardPolicyErrorCodeAccessDenied: + return "access_denied" + case guardPolicyErrorCodeRepoNotAllowed: + return "repo_not_allowed" + case guardPolicyErrorCodeInsufficientPerms: + return "insufficient_permissions" + case guardPolicyErrorCodePrivateRepoDenied: + return "private_repo_denied" + case guardPolicyErrorCodeBlockedUser: + return "blocked_user" + case guardPolicyErrorCodeIntegrityBelowMin: + return "integrity_below_minimum" + default: + return "unknown" + } +} + +// GatewayServerMetrics represents usage metrics for a single MCP server +type GatewayServerMetrics struct { + ServerName string + RequestCount int + ToolCallCount int + TotalDuration float64 // in milliseconds + ErrorCount int + FilteredCount int // number of DIFC_FILTERED events for this server + GuardPolicyBlocked int // number of tool calls blocked by guard policies for this server + Tools map[string]*GatewayToolMetrics +} + +// GatewayToolMetrics represents usage metrics for a specific tool +type GatewayToolMetrics struct { + ToolName string + CallCount int + TotalDuration float64 // in milliseconds + AvgDuration float64 // in milliseconds + MaxDuration float64 // in milliseconds + MinDuration float64 // in milliseconds + ErrorCount int + TotalInputSize int + TotalOutputSize int +} + +// GatewayMetrics represents aggregated metrics from gateway logs +type GatewayMetrics struct { + TotalRequests int + TotalToolCalls int + TotalErrors int + TotalFiltered int // number of DIFC_FILTERED events + TotalGuardBlocked int // number of tool calls blocked by guard policies + Servers map[string]*GatewayServerMetrics + FilteredEvents []DifcFilteredEvent + GuardPolicyEvents []GuardPolicyEvent + StartTime time.Time + EndTime time.Time + TotalDuration float64 // in milliseconds +} + +// RPCMessageEntry represents a single entry from rpc-messages.jsonl. +// This file is written by the Copilot CLI and contains raw JSON-RPC protocol messages +// exchanged between the AI engine and MCP servers, as well as DIFC_FILTERED events. +type RPCMessageEntry struct { + Timestamp string `json:"timestamp"` + Direction string `json:"direction"` // "IN" = received from server, "OUT" = sent to server; empty for DIFC_FILTERED + Type string `json:"type"` // "REQUEST", "RESPONSE", or "DIFC_FILTERED" + ServerID string `json:"server_id"` + Payload json.RawMessage `json:"payload"` + // Fields populated only for DIFC_FILTERED entries + ToolName string `json:"tool_name,omitempty"` + Description string `json:"description,omitempty"` + Reason string `json:"reason,omitempty"` + SecrecyTags []string `json:"secrecy_tags,omitempty"` + IntegrityTags []string `json:"integrity_tags,omitempty"` + AuthorAssociation string `json:"author_association,omitempty"` + AuthorLogin string `json:"author_login,omitempty"` + HTMLURL string `json:"html_url,omitempty"` + Number string `json:"number,omitempty"` +} + +// rpcRequestPayload represents the JSON-RPC request payload fields we care about. +type rpcRequestPayload struct { + Method string `json:"method"` + ID any `json:"id"` + Params json.RawMessage `json:"params"` +} + +// rpcToolCallParams represents the params for a tools/call request. +type rpcToolCallParams struct { + Name string `json:"name"` +} + +// rpcResponsePayload represents the JSON-RPC response payload fields we care about. +type rpcResponsePayload struct { + ID any `json:"id"` + Error *rpcError `json:"error,omitempty"` +} + +// rpcError represents a JSON-RPC error object. +type rpcError struct { + Code int `json:"code"` + Message string `json:"message"` + Data *rpcErrorData `json:"data,omitempty"` +} + +// rpcErrorData represents the optional data field in a JSON-RPC error, used by +// guard policy enforcement to communicate the reason and context for a denial. +type rpcErrorData struct { + Reason string `json:"reason,omitempty"` + Repository string `json:"repository,omitempty"` + Details string `json:"details,omitempty"` +} + +// rpcPendingRequest tracks an in-flight tool call for duration calculation. +type rpcPendingRequest struct { + ServerID string + ToolName string + Timestamp time.Time +} diff --git a/pkg/workflow/dependabot_wasm.go b/pkg/workflow/dependabot_wasm.go index 5003a80cad5..476ee40ce32 100644 --- a/pkg/workflow/dependabot_wasm.go +++ b/pkg/workflow/dependabot_wasm.go @@ -1,5 +1,9 @@ //go:build js || wasm +// This file provides WASM/JS no-op stubs for Dependabot manifest generation functions. +// The canonical (non-WASM) implementations live in dependabot.go. +// If any function signatures change in dependabot.go, this file must be updated to match. + package workflow func (c *Compiler) GenerateDependabotManifests(workflowDataList []*WorkflowData, workflowDir string, forceOverwrite bool) error { diff --git a/pkg/workflow/docker_validation_wasm.go b/pkg/workflow/docker_validation_wasm.go index 86926b543dd..2ddba0c79df 100644 --- a/pkg/workflow/docker_validation_wasm.go +++ b/pkg/workflow/docker_validation_wasm.go @@ -1,5 +1,9 @@ //go:build js || wasm +// This file provides WASM/JS no-op stubs for Docker validation functions. +// The canonical (non-WASM) implementations live in docker_validation.go. +// If any function signatures change in docker_validation.go, this file must be updated to match. + package workflow func validateDockerImage(image string, verbose bool) error { diff --git a/pkg/workflow/git_helpers_wasm.go b/pkg/workflow/git_helpers_wasm.go index 6c18ac251f5..b28d69d165e 100644 --- a/pkg/workflow/git_helpers_wasm.go +++ b/pkg/workflow/git_helpers_wasm.go @@ -1,5 +1,9 @@ //go:build js || wasm +// This file provides WASM/JS no-op stubs for git helper functions. +// The canonical (non-WASM) implementations live in git_helpers.go. +// If any function signatures change in git_helpers.go, this file must be updated to match. + package workflow import ( diff --git a/pkg/workflow/github_cli_wasm.go b/pkg/workflow/github_cli_wasm.go index 0b836037e5e..f38f1899e34 100644 --- a/pkg/workflow/github_cli_wasm.go +++ b/pkg/workflow/github_cli_wasm.go @@ -1,5 +1,9 @@ //go:build js || wasm +// This file provides WASM/JS no-op stubs for GitHub CLI functions. +// The canonical (non-WASM) implementations live in github_cli.go. +// If any function signatures change in github_cli.go, this file must be updated to match. + package workflow import ( diff --git a/pkg/workflow/add_labels.go b/pkg/workflow/labels.go similarity index 51% rename from pkg/workflow/add_labels.go rename to pkg/workflow/labels.go index 74b95d4fce1..27439996734 100644 --- a/pkg/workflow/add_labels.go +++ b/pkg/workflow/labels.go @@ -27,3 +27,27 @@ func (c *Compiler) parseAddLabelsConfig(outputMap map[string]any) *AddLabelsConf } return config } + +var removeLabelsLog = logger.New("workflow:remove_labels") + +// RemoveLabelsConfig holds configuration for removing labels from issues/PRs from agent output +type RemoveLabelsConfig struct { + BaseSafeOutputConfig `yaml:",inline"` + SafeOutputTargetConfig `yaml:",inline"` + Allowed []string `yaml:"allowed,omitempty"` // Optional list of allowed labels to remove. If omitted, any labels can be removed. + Blocked []string `yaml:"blocked,omitempty"` // Optional list of blocked label patterns (supports glob patterns like "~*", "*[bot]"). Labels matching these patterns will be rejected. +} + +// parseRemoveLabelsConfig handles remove-labels configuration +func (c *Compiler) parseRemoveLabelsConfig(outputMap map[string]any) *RemoveLabelsConfig { + config := parseConfigScaffold(outputMap, "remove-labels", removeLabelsLog, func(err error) *RemoveLabelsConfig { + removeLabelsLog.Printf("Failed to unmarshal config: %v", err) + // Handle null case: create empty config (allows any labels) + removeLabelsLog.Print("Using empty configuration (allows any labels)") + return &RemoveLabelsConfig{} + }) + if config != nil { + removeLabelsLog.Printf("Parsed configuration: allowed_count=%d, blocked_count=%d, target=%s", len(config.Allowed), len(config.Blocked), config.Target) + } + return config +} diff --git a/pkg/workflow/missing_data.go b/pkg/workflow/missing_data.go deleted file mode 100644 index c81eb32a155..00000000000 --- a/pkg/workflow/missing_data.go +++ /dev/null @@ -1,11 +0,0 @@ -package workflow - -import ( - "github.com/github/gh-aw/pkg/logger" -) - -var missingDataLog = logger.New("workflow:missing_data") - -func (c *Compiler) parseMissingDataConfig(outputMap map[string]any) *MissingDataConfig { - return c.parseIssueReportingConfig(outputMap, "missing-data", "[missing data]", missingDataLog) -} diff --git a/pkg/workflow/missing_issue_reporting.go b/pkg/workflow/missing_issue_reporting.go index 4abf71fcefd..f4c3a028b11 100644 --- a/pkg/workflow/missing_issue_reporting.go +++ b/pkg/workflow/missing_issue_reporting.go @@ -19,6 +19,38 @@ type IssueReportingConfig struct { type MissingDataConfig = IssueReportingConfig type MissingToolConfig = IssueReportingConfig +// ReportIncompleteConfig holds configuration for the report_incomplete safe output. +// report_incomplete is a structured signal that the agent could not complete its +// assigned task due to an infrastructure or tool failure (e.g., MCP server crash, +// missing authentication, inaccessible repository). +// +// When an agent emits report_incomplete, gh-aw activates failure handling even +// when the agent process exits 0 and other safe outputs were also emitted. +// This prevents semantically-empty outputs (e.g., a comment describing tool +// failures) from being classified as a successful result. +// +// ReportIncompleteConfig is a type alias for IssueReportingConfig so that it +// supports the same create-issue, title-prefix, and labels configuration fields +// as missing-tool and missing-data. +type ReportIncompleteConfig = IssueReportingConfig + +var missingToolLog = logger.New("workflow:missing_tool") +var missingDataLog = logger.New("workflow:missing_data") +var reportIncompleteLog = logger.New("workflow:report_incomplete") + +func (c *Compiler) parseMissingToolConfig(outputMap map[string]any) *MissingToolConfig { + return c.parseIssueReportingConfig(outputMap, "missing-tool", "[missing tool]", missingToolLog) +} + +func (c *Compiler) parseMissingDataConfig(outputMap map[string]any) *MissingDataConfig { + return c.parseIssueReportingConfig(outputMap, "missing-data", "[missing data]", missingDataLog) +} + +// parseReportIncompleteConfig handles report_incomplete configuration. +func (c *Compiler) parseReportIncompleteConfig(outputMap map[string]any) *ReportIncompleteConfig { + return c.parseIssueReportingConfig(outputMap, "report-incomplete", "[incomplete]", reportIncompleteLog) +} + func (c *Compiler) parseIssueReportingConfig(outputMap map[string]any, yamlKey, defaultTitle string, log *logger.Logger) *IssueReportingConfig { configData, exists := outputMap[yamlKey] if !exists { diff --git a/pkg/workflow/missing_tool.go b/pkg/workflow/missing_tool.go deleted file mode 100644 index 196644f53ea..00000000000 --- a/pkg/workflow/missing_tool.go +++ /dev/null @@ -1,11 +0,0 @@ -package workflow - -import ( - "github.com/github/gh-aw/pkg/logger" -) - -var missingToolLog = logger.New("workflow:missing_tool") - -func (c *Compiler) parseMissingToolConfig(outputMap map[string]any) *MissingToolConfig { - return c.parseIssueReportingConfig(outputMap, "missing-tool", "[missing tool]", missingToolLog) -} diff --git a/pkg/workflow/npm_validation_wasm.go b/pkg/workflow/npm_validation_wasm.go index 3ee567f6e92..971acf7b148 100644 --- a/pkg/workflow/npm_validation_wasm.go +++ b/pkg/workflow/npm_validation_wasm.go @@ -1,5 +1,9 @@ //go:build js || wasm +// This file provides WASM/JS no-op stubs for npm validation functions. +// The canonical (non-WASM) implementations live in npm_validation.go. +// If any function signatures change in npm_validation.go, this file must be updated to match. + package workflow import "errors" diff --git a/pkg/workflow/pip_validation_wasm.go b/pkg/workflow/pip_validation_wasm.go index c52e55e1fd0..286abd008e1 100644 --- a/pkg/workflow/pip_validation_wasm.go +++ b/pkg/workflow/pip_validation_wasm.go @@ -1,5 +1,9 @@ //go:build js || wasm +// This file provides WASM/JS no-op stubs for pip/uv validation functions. +// The canonical (non-WASM) implementations live in pip_validation.go. +// If any function signatures change in pip_validation.go, this file must be updated to match. + package workflow func (c *Compiler) validatePythonPackagesWithPip(packages []string, packageType string, pipCmd string) { diff --git a/pkg/workflow/remove_labels.go b/pkg/workflow/remove_labels.go deleted file mode 100644 index 1369d893ae7..00000000000 --- a/pkg/workflow/remove_labels.go +++ /dev/null @@ -1,29 +0,0 @@ -package workflow - -import ( - "github.com/github/gh-aw/pkg/logger" -) - -var removeLabelsLog = logger.New("workflow:remove_labels") - -// RemoveLabelsConfig holds configuration for removing labels from issues/PRs from agent output -type RemoveLabelsConfig struct { - BaseSafeOutputConfig `yaml:",inline"` - SafeOutputTargetConfig `yaml:",inline"` - Allowed []string `yaml:"allowed,omitempty"` // Optional list of allowed labels to remove. If omitted, any labels can be removed. - Blocked []string `yaml:"blocked,omitempty"` // Optional list of blocked label patterns (supports glob patterns like "~*", "*[bot]"). Labels matching these patterns will be rejected. -} - -// parseRemoveLabelsConfig handles remove-labels configuration -func (c *Compiler) parseRemoveLabelsConfig(outputMap map[string]any) *RemoveLabelsConfig { - config := parseConfigScaffold(outputMap, "remove-labels", removeLabelsLog, func(err error) *RemoveLabelsConfig { - removeLabelsLog.Printf("Failed to unmarshal config: %v", err) - // Handle null case: create empty config (allows any labels) - removeLabelsLog.Print("Using empty configuration (allows any labels)") - return &RemoveLabelsConfig{} - }) - if config != nil { - removeLabelsLog.Printf("Parsed configuration: allowed_count=%d, blocked_count=%d, target=%s", len(config.Allowed), len(config.Blocked), config.Target) - } - return config -} diff --git a/pkg/workflow/report_incomplete.go b/pkg/workflow/report_incomplete.go deleted file mode 100644 index 478ba3f0fe9..00000000000 --- a/pkg/workflow/report_incomplete.go +++ /dev/null @@ -1,27 +0,0 @@ -package workflow - -import ( - "github.com/github/gh-aw/pkg/logger" -) - -var reportIncompleteLog = logger.New("workflow:report_incomplete") - -// ReportIncompleteConfig holds configuration for the report_incomplete safe output. -// report_incomplete is a structured signal that the agent could not complete its -// assigned task due to an infrastructure or tool failure (e.g., MCP server crash, -// missing authentication, inaccessible repository). -// -// When an agent emits report_incomplete, gh-aw activates failure handling even -// when the agent process exits 0 and other safe outputs were also emitted. -// This prevents semantically-empty outputs (e.g., a comment describing tool -// failures) from being classified as a successful result. -// -// ReportIncompleteConfig is a type alias for IssueReportingConfig so that it -// supports the same create-issue, title-prefix, and labels configuration fields -// as missing-tool and missing-data. -type ReportIncompleteConfig = IssueReportingConfig - -// parseReportIncompleteConfig handles report_incomplete configuration. -func (c *Compiler) parseReportIncompleteConfig(outputMap map[string]any) *ReportIncompleteConfig { - return c.parseIssueReportingConfig(outputMap, "report-incomplete", "[incomplete]", reportIncompleteLog) -} diff --git a/pkg/workflow/repository_features_validation_wasm.go b/pkg/workflow/repository_features_validation_wasm.go index 124b220c9db..19f05fde8c0 100644 --- a/pkg/workflow/repository_features_validation_wasm.go +++ b/pkg/workflow/repository_features_validation_wasm.go @@ -1,5 +1,9 @@ //go:build js || wasm +// This file provides WASM/JS no-op stubs for repository features validation functions. +// The canonical (non-WASM) implementations live in repository_features_validation.go. +// If any function signatures change in repository_features_validation.go, this file must be updated to match. + package workflow type RepositoryFeatures struct { From e95fd4f16460a73a0692ab7379b1ce710899bfaa Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 10 Apr 2026 14:42:58 +0000 Subject: [PATCH 3/3] docs(adr): add draft ADR-0001 for semantic function clustering file organization Generated by the Design Decision Gate workflow (run #24248236762). This draft ADR documents the architectural decision to organize Go source files by semantic responsibility cluster, covering the gateway_logs.go decomposition and pkg/workflow file consolidations in PR #25638. Requires author review, completion, and linking in the PR body before merge. --- ...clustering-go-package-file-organization.md | 81 +++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 docs/adr/0001-semantic-function-clustering-go-package-file-organization.md diff --git a/docs/adr/0001-semantic-function-clustering-go-package-file-organization.md b/docs/adr/0001-semantic-function-clustering-go-package-file-organization.md new file mode 100644 index 00000000000..7460958772b --- /dev/null +++ b/docs/adr/0001-semantic-function-clustering-go-package-file-organization.md @@ -0,0 +1,81 @@ +# ADR-0001: Semantic Function Clustering for Go Package File Organization + +**Date**: 2026-04-10 +**Status**: Draft +**Deciders**: [TODO: verify — pelikhan and Copilot per PR #25638] + +--- + +## Part 1 — Narrative (Human-Friendly) + +### Context + +The `pkg/workflow` and `pkg/cli` packages in this repository accumulated two opposing structural problems over time: some concerns were spread too thinly (single-function files of ~11 lines each), while at least one file (`gateway_logs.go`, 1,332 lines) had grown into a monolith containing types, parsing, metrics computation, and rendering all in a single file. An automated semantic clustering analysis of the packages surfaced both patterns as maintainability risks — small files create noise and obscure shared infrastructure, while the large file creates high cognitive load and merge-conflict surface area. The goal was to apply a consistent file-organization principle across both cases without changing any logic. + +### Decision + +We will organize Go source files within a package by **semantic responsibility cluster**: files that share a single cohesive concern (a data layer, a processing stage, a set of parallel operations) are co-located, while files that have grown beyond a single concern are split at clean semantic boundaries. Concretely, this means merging files whose only content is a forwarding call to shared infrastructure (e.g., `missing_tool.go`, `missing_data.go`, `report_incomplete.go` → `missing_issue_reporting.go`), consolidating structurally parallel files that always change together (e.g., `add_labels.go` + `remove_labels.go` → `labels.go`), and decomposing monolithic files along their natural layer boundaries (types / parsing / metrics / rendering). WASM no-op stubs are annotated with explicit cross-reference comments pointing at the canonical non-WASM implementation to prevent signature drift. + +### Alternatives Considered + +#### Alternative 1: One File per Exported Symbol (Strict Single-Responsibility) + +Each exported function or type lives in its own file, named after the symbol. This is common in some Go codebases and makes it trivial to find a specific function. It was rejected because it exacerbates the exact problem the clustering analysis found in `pkg/workflow`: a proliferation of tiny files with no shared context, making package-level navigation harder rather than easier. It also makes it impossible to see relationships between closely related functions at a glance. + +#### Alternative 2: Single File per Package (Maximum Consolidation) + +The entire package lives in one file. This is practical only for very small packages and was not seriously considered for packages already above ~500 lines. A single file would recreate the `gateway_logs.go` monolith problem at the package level, with even more severe merge-conflict and navigation costs. + +#### Alternative 3: Maintain the Status Quo (No Reorganization) + +Leave files as-is and tolerate both the tiny-file and monolith problems until they cause a concrete bug or blocked review. This was rejected because the semantic clustering analysis provided an objective, reproducible signal that the existing organization was sub-optimal, and the refactoring cost was low (zero logic changes required). Deferring incurs ongoing maintenance friction with no offsetting benefit. + +### Consequences + +#### Positive +- Maintainers navigating `pkg/cli` now find types, parsing logic, metrics, and rendering in separate files — each file has a clear, single purpose. +- Related parallel operations (add/remove labels; missing-tool/missing-data/report-incomplete reporting) are co-located, making it easier to keep them consistent. +- WASM stub files carry explicit cross-reference comments, reducing the risk of stub signatures drifting from their canonical counterparts. +- Smaller, focused files reduce per-file merge-conflict surface area. + +#### Negative +- The split of `gateway_logs.go` into four files means cross-cutting concerns (e.g., a type used in both parsing and metrics) must be placed in `gateway_logs_types.go` even if the type is only incidentally shared; over time this file may accumulate types that no longer share a clear affinity. +- The file-naming convention (`gateway_logs_types.go`, `gateway_logs_parser.go`, etc.) relies on consistent prefix naming; violations of this convention are not enforced by the Go toolchain. +- Reviewers unfamiliar with the prior organization must read multiple files to trace the full gateway logs flow; the original monolith was self-contained. + +#### Neutral +- This ADR establishes an implicit convention for future decomposition of other large files in the repository. Teams should treat it as a reference pattern rather than a strict rule requiring ADRs for every subsequent file reorganization of similar scale. +- No build system, import, or API surface changes are required; this is a pure file-layout reorganization. + +--- + +## Part 2 — Normative Specification (RFC 2119) + +> The key words **MUST**, **MUST NOT**, **REQUIRED**, **SHALL**, **SHALL NOT**, **SHOULD**, **SHOULD NOT**, **RECOMMENDED**, **MAY**, and **OPTIONAL** in this section are to be interpreted as described in [RFC 2119](https://www.rfc-editor.org/rfc/rfc2119). + +### File Granularity + +1. A Go source file **MUST NOT** contain functions or types from more than one semantic responsibility cluster (e.g., a file **MUST NOT** mix data-type definitions with rendering logic). +2. A Go source file dedicated to a single forwarding call or delegating wrapper (where the file contains only a logger variable and one function that calls shared infrastructure) **SHOULD** be merged into the file that contains the shared infrastructure it wraps. +3. Structurally parallel files that always change together and share a common abstraction **SHOULD** be consolidated into a single file named after their shared concern. + +### Large File Decomposition + +1. A Go source file that exceeds 500 lines **SHOULD** be reviewed for decomposition along semantic layer boundaries. +2. When decomposing a large file, the resulting files **MUST** be named with a consistent shared prefix followed by a suffix that identifies the layer (e.g., `_types.go`, `_parser.go`, `_metrics.go`, `_render.go`). +3. All type definitions shared across the decomposed files **MUST** be placed in the `_types.go` file for that feature. +4. Decomposition **MUST NOT** change any function signatures, exported types, or observable behavior — it **MUST** be a pure reorganization. + +### WASM Stub Maintenance + +1. Every `*_wasm.go` stub file **MUST** contain a header comment that identifies the canonical (non-WASM) implementation file by name. +2. The header comment **SHOULD** include an explicit maintenance note stating that if function signatures change in the canonical file, the stub file must be updated to match. +3. WASM stub function signatures **MUST** exactly match the corresponding signatures in the canonical implementation. + +### Conformance + +An implementation is considered conformant with this ADR if it satisfies all **MUST** and **MUST NOT** requirements above. Specifically: files do not mix semantic clusters; decomposed file sets use consistent prefixed naming; type definitions are centralized in `_types.go` files; WASM stubs carry cross-reference comments and have matching signatures; and all reorganizations are pure (no logic changes). Failure to meet any **MUST** or **MUST NOT** requirement constitutes non-conformance. + +--- + +*This is a DRAFT ADR generated by the [Design Decision Gate](https://github.com/github/gh-aw/actions/runs/24248236762) workflow. The PR author must review, complete, and finalize this document before the PR can merge.*