From f6f26ce860b3248347bbf86a1a0dc1717db7fd5f Mon Sep 17 00:00:00 2001 From: Saivedant Hava Date: Sat, 16 May 2026 21:04:50 -0400 Subject: [PATCH 1/2] Stop SSE filter from leaking tools/list on undecodable lines processSSEResponse used to write the entire raw upstream payload and return whenever a single SSE data line failed jsonrpc2.DecodeMessage or decoded to a non-Response message (e.g. a notifications/* frame). On a tools/list reply that meant every subsequent data line, including the real Response, reached the client unfiltered, silently bypassing the cedar authorization filter and producing the superfluous WriteHeader warning at response_filter.go:191. Treat undecodable or non-Response data lines as pass-through for that line only: fall through to the existing line writer instead of dumping rawResponse. The explicit WriteHeader calls go away with them, which also removes the double-header warning that surfaced the bug. Skipped filtering on tools/list now emits a WARN so future bypasses are visible in audit logs. Adds a table-driven regression test covering both branches (decode error and non-Response). It fails on the old code with the unfiltered admin_tool entry reaching the recorder. Closes #5257 --- pkg/authz/response_filter.go | 63 ++++++++------- pkg/authz/response_filter_test.go | 123 ++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+), 28 deletions(-) diff --git a/pkg/authz/response_filter.go b/pkg/authz/response_filter.go index e00dceadf5..6265ebda64 100644 --- a/pkg/authz/response_filter.go +++ b/pkg/authz/response_filter.go @@ -187,35 +187,42 @@ func (rfw *ResponseFilteringWriter) processSSEResponse(rawResponse []byte) error var written bool if data, ok := bytes.CutPrefix(line, []byte("data:")); ok { message, err := jsonrpc2.DecodeMessage(data) - if err != nil { - rfw.ResponseWriter.WriteHeader(rfw.statusCode) - _, err := rfw.ResponseWriter.Write(rawResponse) - return err - } - - response, ok := message.(*jsonrpc2.Response) - if !ok { - rfw.ResponseWriter.WriteHeader(rfw.statusCode) - _, err := rfw.ResponseWriter.Write(rawResponse) - return err - } - - filteredResponse, err := rfw.filterListResponse(response) - if err != nil { - return rfw.writeErrorResponse(response.ID, err) - } - - filteredData, err := jsonrpc2.EncodeMessage(filteredResponse) - if err != nil { - return rfw.writeErrorResponse(response.ID, err) + switch { + case err != nil: + // Pass this line through unfiltered. Earlier revisions wrote + // rawResponse and returned here, which leaked every subsequent + // data line on the stream past the filter (issue #5257). + if rfw.method == string(mcp.MethodToolsList) { + slog.Warn("SSE data line could not be decoded as JSON-RPC; passing through unfiltered", + "method", rfw.method, "error", err) + } + default: + if response, ok := message.(*jsonrpc2.Response); ok { + filteredResponse, err := rfw.filterListResponse(response) + if err != nil { + return rfw.writeErrorResponse(response.ID, err) + } + + filteredData, err := jsonrpc2.EncodeMessage(filteredResponse) + if err != nil { + return rfw.writeErrorResponse(response.ID, err) + } + + _, err = rfw.ResponseWriter.Write([]byte("data: " + string(filteredData) + "\n")) + if err != nil { + return fmt.Errorf("%w: %w", errBug, err) + } + + written = true + } else if rfw.method == string(mcp.MethodToolsList) { + // Non-Response message (e.g. a notifications/* frame + // interleaved on the stream). Pass through unfiltered for + // this line only; the next data line may still be the real + // tools/list response and must reach the filter. + slog.Warn("SSE data line was not a JSON-RPC Response; passing through unfiltered", + "method", rfw.method) + } } - - _, err = rfw.ResponseWriter.Write([]byte("data: " + string(filteredData) + "\n")) - if err != nil { - return fmt.Errorf("%w: %w", errBug, err) - } - - written = true } if !written { diff --git a/pkg/authz/response_filter_test.go b/pkg/authz/response_filter_test.go index f36c70e772..668761c9c2 100644 --- a/pkg/authz/response_filter_test.go +++ b/pkg/authz/response_filter_test.go @@ -863,3 +863,126 @@ func TestOptimizerPassThroughToolsInResponseFilter(t *testing.T) { "admin_tool has no permit policy and is not a pass-through tool") }) } + +// TestResponseFilteringWriter_SSE_PerLineFallthrough is a regression test for +// issue #5257: when an SSE upstream interleaves a non-Response data line (e.g. +// an MCP notification) or an undecodable data line with a real tools/list +// response, the filter previously wrote the entire raw upstream payload and +// returned, leaking the unfiltered tools/list past Cedar. It must instead pass +// only the offending line through and continue filtering the rest of the +// stream. +func TestResponseFilteringWriter_SSE_PerLineFallthrough(t *testing.T) { + t.Parallel() + + authorizer, err := cedar.NewCedarAuthorizer(cedar.ConfigOptions{ + Policies: []string{ + `permit(principal, action == Action::"call_tool", resource == Tool::"weather");`, + }, + EntitiesJSON: `[]`, + }, "") + require.NoError(t, err) + + identity := &auth.Identity{PrincipalInfo: auth.PrincipalInfo{ + Subject: "user1", + Claims: map[string]interface{}{"sub": "user1"}, + }} + + // Real tools/list response. The caller is only authorized for "weather", + // so a working filter must drop "admin_tool" from the response. + toolsResult := mcp.ListToolsResult{ + Tools: []mcp.Tool{ + {Name: "weather", Description: "Get weather information"}, + {Name: "admin_tool", Description: "Sensitive admin operations"}, + }, + } + resultJSON, err := json.Marshal(toolsResult) + require.NoError(t, err) + encodedResp, err := jsonrpc2.EncodeMessage(&jsonrpc2.Response{ + ID: jsonrpc2.Int64ID(1), + Result: json.RawMessage(resultJSON), + }) + require.NoError(t, err) + respLine := "data: " + string(encodedResp) + + testCases := []struct { + name string + precedingLines []string + }{ + { + name: "non-response data line before tools/list", + precedingLines: []string{ + // A notifications/* frame is a valid JSON-RPC notification + // (no id), so jsonrpc2.DecodeMessage returns a non-Response + // message. The buggy path treated this as a signal to dump + // rawResponse and return. + `data: {"jsonrpc":"2.0","method":"notifications/message","params":{"level":"info","data":"warming up"}}`, + }, + }, + { + name: "undecodable data line before tools/list", + precedingLines: []string{ + `data: this is not json at all`, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + req, err := http.NewRequest(http.MethodPost, "/messages", nil) + require.NoError(t, err) + req = req.WithContext(auth.WithIdentity(req.Context(), identity)) + + rr := httptest.NewRecorder() + rfw := NewResponseFilteringWriter(rr, authorizer, req, string(mcp.MethodToolsList), nil, nil) + rfw.ResponseWriter.Header().Set("Content-Type", "text/event-stream") + + body := strings.Join(append(tc.precedingLines, respLine, ""), "\n") + _, err = rfw.Write([]byte(body)) + require.NoError(t, err) + + require.NoError(t, rfw.FlushAndFilter()) + + out := rr.Body.String() + + // Each preceding line must still appear verbatim; pass-through + // is the whole point of the fix. + for _, pl := range tc.precedingLines { + assert.Contains(t, out, pl, "non-response/undecodable preceding line must pass through unchanged") + } + + // The real tools/list response must have been filtered. Pull the + // last data line out and decode it. + var filteredLine string + for _, line := range strings.Split(out, "\n") { + if strings.HasPrefix(line, "data: {\"jsonrpc\"") && strings.Contains(line, `"result"`) { + filteredLine = line + } + } + require.NotEmpty(t, filteredLine, "no JSON-RPC Response data line found in output") + + payload := strings.TrimPrefix(filteredLine, "data: ") + msg, err := jsonrpc2.DecodeMessage([]byte(payload)) + require.NoError(t, err) + resp, ok := msg.(*jsonrpc2.Response) + require.True(t, ok) + + var filtered mcp.ListToolsResult + require.NoError(t, json.Unmarshal(resp.Result, &filtered)) + + names := make([]string, len(filtered.Tools)) + for i, tool := range filtered.Tools { + names[i] = tool.Name + } + assert.Contains(t, names, "weather", "authorized tool must be retained") + assert.NotContains(t, names, "admin_tool", + "unauthorized tool must be filtered; presence indicates the cedar bypass from #5257 is back") + + // And the raw unfiltered payload (the bug used to dump it) + // must not appear in the wire output. + assert.NotContains(t, out, `"admin_tool"`, + "unfiltered tools/list payload leaked into SSE output") + }) + } +} From 3984262a521cc93dbf9fa42b773a0d3a1e71597c Mon Sep 17 00:00:00 2001 From: Saivedant Hava Date: Wed, 20 May 2026 09:41:19 -0400 Subject: [PATCH 2/2] Broaden SSE filter logging and tests across list methods Address review feedback on #5304: The WARN log paths in processSSEResponse were gated on the tools/list method, but the underlying bypass applies to every method routed through requiresResponseFiltering (tools/list, prompts/list, resources/list, find_tool). Drop the gating so the WARN fires for any of them. The regression test only exercised tools/list. Restructure it into a method-by-preceding-line table so the same notification and undecodable line cases run against prompts/list and resources/list as well. Verified the WARN now logs for all three methods and the unauthorized entry never reaches the recorder. --- pkg/authz/response_filter.go | 16 +- pkg/authz/response_filter_test.go | 242 +++++++++++++++++++----------- 2 files changed, 167 insertions(+), 91 deletions(-) diff --git a/pkg/authz/response_filter.go b/pkg/authz/response_filter.go index 6265ebda64..02ff1d76d9 100644 --- a/pkg/authz/response_filter.go +++ b/pkg/authz/response_filter.go @@ -191,11 +191,12 @@ func (rfw *ResponseFilteringWriter) processSSEResponse(rawResponse []byte) error case err != nil: // Pass this line through unfiltered. Earlier revisions wrote // rawResponse and returned here, which leaked every subsequent - // data line on the stream past the filter (issue #5257). - if rfw.method == string(mcp.MethodToolsList) { - slog.Warn("SSE data line could not be decoded as JSON-RPC; passing through unfiltered", - "method", rfw.method, "error", err) - } + // data line on the stream past the filter (issue #5257). The + // WARN fires for every filtered method (tools/list, + // prompts/list, resources/list, find_tool) because the bypass + // applies equally to all of them. + slog.Warn("SSE data line could not be decoded as JSON-RPC; passing through unfiltered", + "method", rfw.method, "error", err) default: if response, ok := message.(*jsonrpc2.Response); ok { filteredResponse, err := rfw.filterListResponse(response) @@ -214,11 +215,12 @@ func (rfw *ResponseFilteringWriter) processSSEResponse(rawResponse []byte) error } written = true - } else if rfw.method == string(mcp.MethodToolsList) { + } else { // Non-Response message (e.g. a notifications/* frame // interleaved on the stream). Pass through unfiltered for // this line only; the next data line may still be the real - // tools/list response and must reach the filter. + // response and must reach the filter. Logs at WARN for + // every filtered method, not just tools/list. slog.Warn("SSE data line was not a JSON-RPC Response; passing through unfiltered", "method", rfw.method) } diff --git a/pkg/authz/response_filter_test.go b/pkg/authz/response_filter_test.go index 668761c9c2..d70986ddb8 100644 --- a/pkg/authz/response_filter_test.go +++ b/pkg/authz/response_filter_test.go @@ -866,17 +866,22 @@ func TestOptimizerPassThroughToolsInResponseFilter(t *testing.T) { // TestResponseFilteringWriter_SSE_PerLineFallthrough is a regression test for // issue #5257: when an SSE upstream interleaves a non-Response data line (e.g. -// an MCP notification) or an undecodable data line with a real tools/list -// response, the filter previously wrote the entire raw upstream payload and -// returned, leaking the unfiltered tools/list past Cedar. It must instead pass -// only the offending line through and continue filtering the rest of the -// stream. +// an MCP notification) or an undecodable data line with a real list response, +// the filter previously wrote the entire raw upstream payload and returned, +// leaking the unfiltered list past Cedar. It must instead pass only the +// offending line through and continue filtering the rest of the stream. +// +// The same code path runs for every method covered by +// requiresResponseFiltering, so each of tools/list, prompts/list, and +// resources/list is exercised below. func TestResponseFilteringWriter_SSE_PerLineFallthrough(t *testing.T) { t.Parallel() authorizer, err := cedar.NewCedarAuthorizer(cedar.ConfigOptions{ Policies: []string{ `permit(principal, action == Action::"call_tool", resource == Tool::"weather");`, + `permit(principal, action == Action::"get_prompt", resource == Prompt::"greeting");`, + `permit(principal, action == Action::"read_resource", resource == Resource::"data");`, }, EntitiesJSON: `[]`, }, "") @@ -887,102 +892,171 @@ func TestResponseFilteringWriter_SSE_PerLineFallthrough(t *testing.T) { Claims: map[string]interface{}{"sub": "user1"}, }} - // Real tools/list response. The caller is only authorized for "weather", - // so a working filter must drop "admin_tool" from the response. - toolsResult := mcp.ListToolsResult{ - Tools: []mcp.Tool{ - {Name: "weather", Description: "Get weather information"}, - {Name: "admin_tool", Description: "Sensitive admin operations"}, - }, + // encodeListResponse marshals a list result type into a JSON-RPC Response + // data line. + encodeListResponse := func(t *testing.T, result interface{}) string { + t.Helper() + resultJSON, err := json.Marshal(result) + require.NoError(t, err) + encoded, err := jsonrpc2.EncodeMessage(&jsonrpc2.Response{ + ID: jsonrpc2.Int64ID(1), + Result: json.RawMessage(resultJSON), + }) + require.NoError(t, err) + return "data: " + string(encoded) } - resultJSON, err := json.Marshal(toolsResult) - require.NoError(t, err) - encodedResp, err := jsonrpc2.EncodeMessage(&jsonrpc2.Response{ - ID: jsonrpc2.Int64ID(1), - Result: json.RawMessage(resultJSON), - }) - require.NoError(t, err) - respLine := "data: " + string(encodedResp) - testCases := []struct { - name string - precedingLines []string - }{ + // methodCase describes how to build a filterable response for one MCP + // list method and how to read the filtered names out of the wire output. + type methodCase struct { + name string + method string + respLine string + authorizedName string + unauthorizedName string + extractNames func(t *testing.T, result json.RawMessage) []string + } + + methodCases := []methodCase{ { - name: "non-response data line before tools/list", - precedingLines: []string{ - // A notifications/* frame is a valid JSON-RPC notification - // (no id), so jsonrpc2.DecodeMessage returns a non-Response - // message. The buggy path treated this as a signal to dump - // rawResponse and return. - `data: {"jsonrpc":"2.0","method":"notifications/message","params":{"level":"info","data":"warming up"}}`, + name: "tools/list", + method: string(mcp.MethodToolsList), + respLine: encodeListResponse(t, mcp.ListToolsResult{ + Tools: []mcp.Tool{ + {Name: "weather", Description: "Get weather information"}, + {Name: "admin_tool", Description: "Sensitive admin operations"}, + }, + }), + authorizedName: "weather", + unauthorizedName: "admin_tool", + extractNames: func(t *testing.T, result json.RawMessage) []string { + t.Helper() + var r mcp.ListToolsResult + require.NoError(t, json.Unmarshal(result, &r)) + names := make([]string, len(r.Tools)) + for i, tool := range r.Tools { + names[i] = tool.Name + } + return names + }, + }, + { + name: "prompts/list", + method: string(mcp.MethodPromptsList), + respLine: encodeListResponse(t, mcp.ListPromptsResult{ + Prompts: []mcp.Prompt{ + {Name: "greeting", Description: "Generate greetings"}, + {Name: "admin_prompt", Description: "Sensitive admin prompt"}, + }, + }), + authorizedName: "greeting", + unauthorizedName: "admin_prompt", + extractNames: func(t *testing.T, result json.RawMessage) []string { + t.Helper() + var r mcp.ListPromptsResult + require.NoError(t, json.Unmarshal(result, &r)) + names := make([]string, len(r.Prompts)) + for i, p := range r.Prompts { + names[i] = p.Name + } + return names }, }, { - name: "undecodable data line before tools/list", - precedingLines: []string{ - `data: this is not json at all`, + name: "resources/list", + method: string(mcp.MethodResourcesList), + respLine: encodeListResponse(t, mcp.ListResourcesResult{ + Resources: []mcp.Resource{ + {URI: "data", Name: "Data Resource"}, + {URI: "secret", Name: "Sensitive Resource"}, + }, + }), + authorizedName: "data", + unauthorizedName: "secret", + extractNames: func(t *testing.T, result json.RawMessage) []string { + t.Helper() + var r mcp.ListResourcesResult + require.NoError(t, json.Unmarshal(result, &r)) + names := make([]string, len(r.Resources)) + for i, res := range r.Resources { + names[i] = res.URI + } + return names }, }, } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - req, err := http.NewRequest(http.MethodPost, "/messages", nil) - require.NoError(t, err) - req = req.WithContext(auth.WithIdentity(req.Context(), identity)) - - rr := httptest.NewRecorder() - rfw := NewResponseFilteringWriter(rr, authorizer, req, string(mcp.MethodToolsList), nil, nil) - rfw.ResponseWriter.Header().Set("Content-Type", "text/event-stream") + precedingLineCases := []struct { + name string + line string + }{ + { + name: "non-response data line", + // A notifications/* frame is a valid JSON-RPC notification + // (no id), so jsonrpc2.DecodeMessage returns a non-Response + // message. The buggy path treated this as a signal to dump + // rawResponse and return. + line: `data: {"jsonrpc":"2.0","method":"notifications/message","params":{"level":"info","data":"warming up"}}`, + }, + { + name: "undecodable data line", + line: `data: this is not json at all`, + }, + } - body := strings.Join(append(tc.precedingLines, respLine, ""), "\n") - _, err = rfw.Write([]byte(body)) - require.NoError(t, err) + for _, mc := range methodCases { + for _, plc := range precedingLineCases { + mc, plc := mc, plc + t.Run(mc.name+"/"+plc.name, func(t *testing.T) { + t.Parallel() - require.NoError(t, rfw.FlushAndFilter()) + req, err := http.NewRequest(http.MethodPost, "/messages", nil) + require.NoError(t, err) + req = req.WithContext(auth.WithIdentity(req.Context(), identity)) - out := rr.Body.String() + rr := httptest.NewRecorder() + rfw := NewResponseFilteringWriter(rr, authorizer, req, mc.method, nil, nil) + rfw.ResponseWriter.Header().Set("Content-Type", "text/event-stream") - // Each preceding line must still appear verbatim; pass-through - // is the whole point of the fix. - for _, pl := range tc.precedingLines { - assert.Contains(t, out, pl, "non-response/undecodable preceding line must pass through unchanged") - } + body := strings.Join([]string{plc.line, mc.respLine, ""}, "\n") + _, err = rfw.Write([]byte(body)) + require.NoError(t, err) - // The real tools/list response must have been filtered. Pull the - // last data line out and decode it. - var filteredLine string - for _, line := range strings.Split(out, "\n") { - if strings.HasPrefix(line, "data: {\"jsonrpc\"") && strings.Contains(line, `"result"`) { - filteredLine = line - } - } - require.NotEmpty(t, filteredLine, "no JSON-RPC Response data line found in output") + require.NoError(t, rfw.FlushAndFilter()) - payload := strings.TrimPrefix(filteredLine, "data: ") - msg, err := jsonrpc2.DecodeMessage([]byte(payload)) - require.NoError(t, err) - resp, ok := msg.(*jsonrpc2.Response) - require.True(t, ok) + out := rr.Body.String() - var filtered mcp.ListToolsResult - require.NoError(t, json.Unmarshal(resp.Result, &filtered)) + // The preceding line must still appear verbatim; pass-through + // is the whole point of the fix. + assert.Contains(t, out, plc.line, + "non-response/undecodable preceding line must pass through unchanged") - names := make([]string, len(filtered.Tools)) - for i, tool := range filtered.Tools { - names[i] = tool.Name - } - assert.Contains(t, names, "weather", "authorized tool must be retained") - assert.NotContains(t, names, "admin_tool", - "unauthorized tool must be filtered; presence indicates the cedar bypass from #5257 is back") - - // And the raw unfiltered payload (the bug used to dump it) - // must not appear in the wire output. - assert.NotContains(t, out, `"admin_tool"`, - "unfiltered tools/list payload leaked into SSE output") - }) + // The real list response must have been filtered. Pull the + // last JSON-RPC Response data line out and decode it. + var filteredLine string + for _, line := range strings.Split(out, "\n") { + if strings.HasPrefix(line, "data: {\"jsonrpc\"") && strings.Contains(line, `"result"`) { + filteredLine = line + } + } + require.NotEmpty(t, filteredLine, "no JSON-RPC Response data line found in output") + + payload := strings.TrimPrefix(filteredLine, "data: ") + msg, err := jsonrpc2.DecodeMessage([]byte(payload)) + require.NoError(t, err) + resp, ok := msg.(*jsonrpc2.Response) + require.True(t, ok) + + names := mc.extractNames(t, resp.Result) + assert.Contains(t, names, mc.authorizedName, "authorized entry must be retained") + assert.NotContains(t, names, mc.unauthorizedName, + "unauthorized entry must be filtered; presence indicates the cedar bypass from #5257 is back") + + // And the raw unfiltered payload (the bug used to dump it) + // must not appear in the wire output. + assert.NotContains(t, out, `"`+mc.unauthorizedName+`"`, + "unfiltered list payload leaked into SSE output") + }) + } } }