diff --git a/fixtures/fixtures.go b/fixtures/fixtures.go index eeb1e6ae..c731e0fb 100644 --- a/fixtures/fixtures.go +++ b/fixtures/fixtures.go @@ -92,7 +92,7 @@ var ( OaiResponsesBlockingConversation []byte //go:embed openai/responses/blocking/http_error.txtar - OaiResponsesBlockingHttpErr []byte + OaiResponsesBlockingHTTPErr []byte //go:embed openai/responses/blocking/prev_response_id.txtar OaiResponsesBlockingPrevResponseID []byte @@ -139,7 +139,7 @@ var ( OaiResponsesStreamingConversation []byte //go:embed openai/responses/streaming/http_error.txtar - OaiResponsesStreamingHttpErr []byte + OaiResponsesStreamingHTTPErr []byte //go:embed openai/responses/streaming/prev_response_id.txtar OaiResponsesStreamingPrevResponseID []byte diff --git a/intercept/apidump/apidump.go b/intercept/apidump/apidump.go index bcc0cef0..d7a48cd3 100644 --- a/intercept/apidump/apidump.go +++ b/intercept/apidump/apidump.go @@ -130,21 +130,21 @@ func (d *dumper) dumpResponse(resp *http.Response) error { return xerrors.Errorf("write response header terminator: %w", err) } - // Wrap the response body to capture it as it streams - if resp.Body != nil { - resp.Body = &streamingBodyDumper{ - body: resp.Body, - dumpPath: dumpPath, - headerData: headerBuf.Bytes(), - logger: func(err error) { - d.logger.Named("apidump").Warn(context.Background(), "failed to initialize response dump", slog.Error(err)) - }, - } - } else { + if resp.Body == nil { // No body, just write headers return os.WriteFile(dumpPath, headerBuf.Bytes(), 0o644) } + // Wrap the response body to capture it as it streams + resp.Body = &streamingBodyDumper{ + body: resp.Body, + dumpPath: dumpPath, + headerData: headerBuf.Bytes(), + logger: func(err error) { + d.logger.Named("apidump").Warn(context.Background(), "failed to initialize response dump", slog.Error(err)) + }, + } + return nil } diff --git a/intercept/chatcompletions/base.go b/intercept/chatcompletions/base.go index e77c257c..a5dc34c9 100644 --- a/intercept/chatcompletions/base.go +++ b/intercept/chatcompletions/base.go @@ -115,7 +115,7 @@ func (i *interceptionBase) Model() string { return "coder-aibridge-unknown" } - return string(i.req.Model) + return i.req.Model } func (i *interceptionBase) newErrorResponse(err error) map[string]any { diff --git a/intercept/chatcompletions/paramswrap_test.go b/intercept/chatcompletions/paramswrap_test.go index 230664e4..eac38baa 100644 --- a/intercept/chatcompletions/paramswrap_test.go +++ b/intercept/chatcompletions/paramswrap_test.go @@ -144,7 +144,7 @@ func generatePayload(messageCount int) []byte { } // Use realistic message content size content := fmt.Sprintf("This is message number %d with some realistic content that might appear in a conversation.", i+1) - messages = append(messages, fmt.Sprintf(`{"role": "%s", "content": "%s"}`, role, content)) + messages = append(messages, fmt.Sprintf(`{"role": %q, "content": %q}`, role, content)) } return []byte(fmt.Sprintf(`{ diff --git a/intercept/chatcompletions/streaming.go b/intercept/chatcompletions/streaming.go index 97bf2161..ba685dda 100644 --- a/intercept/chatcompletions/streaming.go +++ b/intercept/chatcompletions/streaming.go @@ -190,16 +190,14 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re }) toolCall = nil - } else { + } else if stream.Err() == nil { // When the provider responds with only tool calls (no text content), // no chunks are relayed to the client, so the stream is not yet // initiated. Initiate it here so the SSE headers are sent and the // ping ticker is started, preventing client timeout during tool invocation. // Only initiate if no stream error, if there's an error, we'll return // an HTTP error response instead of starting an SSE stream. - if stream.Err() == nil { - events.InitiateStream(w) - } + events.InitiateStream(w) } } @@ -232,43 +230,43 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re }) } - if events.IsStreaming() { - // Check if the stream encountered any errors. - if streamErr := stream.Err(); streamErr != nil { - if eventstream.IsUnrecoverableError(streamErr) { - logger.Debug(ctx, "stream terminated", slog.Error(streamErr)) - // We can't reflect an error back if there's a connection error or the request context was canceled. - } else if oaiErr := getErrorResponse(streamErr); oaiErr != nil { - logger.Warn(ctx, "openai stream error", slog.Error(streamErr)) - interceptionErr = oaiErr - } else { - logger.Warn(ctx, "unknown error", slog.Error(streamErr)) - // Unfortunately, the OpenAI SDK does not support parsing errors received in the stream - // into known types (i.e. [shared.OverloadedError]). - // See https://github.com/openai/openai-go/blob/v2.7.0/packages/ssestream/ssestream.go#L171 - // All it does is wrap the payload in an error - which is all we can return, currently. - interceptionErr = newErrorResponse(xerrors.Errorf("unknown stream error: %w", streamErr)) - } - } else if lastErr != nil { - // Otherwise check if any logical errors occurred during processing. - logger.Warn(ctx, "stream failed", slog.Error(lastErr)) - interceptionErr = newErrorResponse(xerrors.Errorf("processing error: %w", lastErr)) - } - - if interceptionErr != nil { - payload, err := i.marshalErr(interceptionErr) - if err != nil { - logger.Warn(ctx, "failed to marshal error", slog.Error(err), slog.F("error_payload", slog.F("%+v", interceptionErr))) - } else if err := events.Send(streamCtx, payload); err != nil { - logger.Warn(ctx, "failed to relay error", slog.Error(err), slog.F("payload", payload)) - } - } - } else { + if !events.IsStreaming() { // response/downstream Stream has not started yet; write error response and exit. i.writeUpstreamError(w, getErrorResponse(stream.Err())) return stream.Err() } + // Check if the stream encountered any errors. + if streamErr := stream.Err(); streamErr != nil { + if eventstream.IsUnrecoverableError(streamErr) { + logger.Debug(ctx, "stream terminated", slog.Error(streamErr)) + // We can't reflect an error back if there's a connection error or the request context was canceled. + } else if oaiErr := getErrorResponse(streamErr); oaiErr != nil { + logger.Warn(ctx, "openai stream error", slog.Error(streamErr)) + interceptionErr = oaiErr + } else { + logger.Warn(ctx, "unknown error", slog.Error(streamErr)) + // Unfortunately, the OpenAI SDK does not support parsing errors received in the stream + // into known types (i.e. [shared.OverloadedError]). + // See https://github.com/openai/openai-go/blob/v2.7.0/packages/ssestream/ssestream.go#L171 + // All it does is wrap the payload in an error - which is all we can return, currently. + interceptionErr = newErrorResponse(xerrors.Errorf("unknown stream error: %w", streamErr)) + } + } else if lastErr != nil { + // Otherwise check if any logical errors occurred during processing. + logger.Warn(ctx, "stream failed", slog.Error(lastErr)) + interceptionErr = newErrorResponse(xerrors.Errorf("processing error: %w", lastErr)) + } + + if interceptionErr != nil { + payload, err := i.marshalErr(interceptionErr) + if err != nil { + logger.Warn(ctx, "failed to marshal error", slog.Error(err), slog.F("error_payload", slog.F("%+v", interceptionErr))) + } else if err := events.Send(streamCtx, payload); err != nil { + logger.Warn(ctx, "failed to relay error", slog.Error(err), slog.F("payload", payload)) + } + } + // No tool call, nothing more to do. if toolCall == nil { break diff --git a/intercept/eventstream/eventstream.go b/intercept/eventstream/eventstream.go index 562e385c..6eda409e 100644 --- a/intercept/eventstream/eventstream.go +++ b/intercept/eventstream/eventstream.go @@ -240,8 +240,7 @@ func flush(w http.ResponseWriter) (err error) { } defer func() { - if r := recover(); r != nil { - // Likely a broken connection, don't spam the logs. + if r := recover(); r != nil { //nolint:revive // Intentionally swallowed; likely a broken connection. } }() diff --git a/intercept/messages/base.go b/intercept/messages/base.go index b3aa40cc..5f1d3d22 100644 --- a/intercept/messages/base.go +++ b/intercept/messages/base.go @@ -183,17 +183,15 @@ func (i *interceptionBase) extractModelThoughts(msg *anthropic.Message) []*recor var thoughtRecords []*recorder.ModelThoughtRecord for _, block := range msg.Content { - switch variant := block.AsAny().(type) { - case anthropic.ThinkingBlock: - if variant.Thinking == "" { - continue - } - thoughtRecords = append(thoughtRecords, &recorder.ModelThoughtRecord{ - Content: variant.Thinking, - Metadata: recorder.Metadata{"source": recorder.ThoughtSourceThinking}, - }) - } // anthropic.RedactedThinkingBlock also exists, but there's nothing useful we can capture. + variant, ok := block.AsAny().(anthropic.ThinkingBlock) + if !ok || variant.Thinking == "" { + continue + } + thoughtRecords = append(thoughtRecords, &recorder.ModelThoughtRecord{ + Content: variant.Thinking, + Metadata: recorder.Metadata{"source": recorder.ThoughtSourceThinking}, + }) } return thoughtRecords } diff --git a/intercept/messages/streaming.go b/intercept/messages/streaming.go index 30f76274..3f2e9b1b 100644 --- a/intercept/messages/streaming.go +++ b/intercept/messages/streaming.go @@ -179,8 +179,7 @@ newStream: // Tool-related handling. switch event.Type { case string(constant.ValueOf[constant.ContentBlockStart]()): - switch block := event.AsContentBlockStart().ContentBlock.AsAny().(type) { - case anthropic.ToolUseBlock: + if block, ok := event.AsContentBlockStart().ContentBlock.AsAny().(anthropic.ToolUseBlock); ok { lastToolName = block.Name if i.mcpProxy != nil && i.mcpProxy.GetTool(block.Name) != nil { @@ -307,8 +306,7 @@ newStream: foundTools int ) for _, block := range message.Content { - switch variant := block.AsAny().(type) { - case anthropic.ToolUseBlock: + if variant, ok := block.AsAny().(anthropic.ToolUseBlock); ok { foundTools++ if variant.Name == name { input = variant.Input @@ -431,24 +429,23 @@ newStream: // Causes a new stream to be run with updated messages. isFirst = false continue newStream - } else { - // Find all the non-injected tools and track their uses. - for _, block := range message.Content { - switch variant := block.AsAny().(type) { - case anthropic.ToolUseBlock: - if i.mcpProxy != nil && i.mcpProxy.GetTool(variant.Name) != nil { - continue - } + } - _ = i.recorder.RecordToolUsage(streamCtx, &recorder.ToolUsageRecord{ - InterceptionID: i.ID().String(), - MsgID: message.ID, - ToolCallID: variant.ID, - Tool: variant.Name, - Args: variant.Input, - Injected: false, - }) + // Find all the non-injected tools and track their uses. + for _, block := range message.Content { + if variant, ok := block.AsAny().(anthropic.ToolUseBlock); ok { + if i.mcpProxy != nil && i.mcpProxy.GetTool(variant.Name) != nil { + continue } + + _ = i.recorder.RecordToolUsage(streamCtx, &recorder.ToolUsageRecord{ + InterceptionID: i.ID().String(), + MsgID: message.ID, + ToolCallID: variant.ID, + Tool: variant.Name, + Args: variant.Input, + Injected: false, + }) } } } @@ -464,11 +461,10 @@ newStream: if eventstream.IsUnrecoverableError(err) { logger.Debug(ctx, "processing terminated", slog.Error(err)) break // Stop processing if client disconnected or context canceled. - } else { - logger.Warn(ctx, "failed to relay event", slog.Error(err)) - lastErr = xerrors.Errorf("relay event: %w", err) - break } + logger.Warn(ctx, "failed to relay event", slog.Error(err)) + lastErr = xerrors.Errorf("relay event: %w", err) + break } } diff --git a/intercept/responses/base.go b/intercept/responses/base.go index 7d659507..f73e118b 100644 --- a/intercept/responses/base.go +++ b/intercept/responses/base.go @@ -222,15 +222,15 @@ func (i *responsesInterceptionBase) recordNonInjectedToolUsage(ctx context.Conte func (i *responsesInterceptionBase) parseFunctionCallJSONArgs(ctx context.Context, raw string) recorder.ToolArgs { trimmed := strings.TrimSpace(raw) - if trimmed != "" { - var args recorder.ToolArgs - if err := json.Unmarshal([]byte(trimmed), &args); err != nil { - i.logger.Warn(ctx, "failed to unmarshal tool args", slog.Error(err)) - } else { - return args - } + if trimmed == "" { + return trimmed + } + var args recorder.ToolArgs + if err := json.Unmarshal([]byte(trimmed), &args); err != nil { + i.logger.Warn(ctx, "failed to unmarshal tool args", slog.Error(err)) + return trimmed } - return trimmed + return args } func (i *responsesInterceptionBase) recordTokenUsage(ctx context.Context, response *responses.Response) { diff --git a/internal/integrationtest/circuit_breaker_test.go b/internal/integrationtest/circuit_breaker_test.go index 35beaee6..7072d1b0 100644 --- a/internal/integrationtest/circuit_breaker_test.go +++ b/internal/integrationtest/circuit_breaker_test.go @@ -28,11 +28,11 @@ const ( ) func anthropicSuccessResponse(model string) string { - return fmt.Sprintf(`{"id":"msg_01","type":"message","role":"assistant","content":[{"type":"text","text":"Hello!"}],"model":"%s","stop_reason":"end_turn","usage":{"input_tokens":10,"output_tokens":5}}`, model) + return fmt.Sprintf(`{"id":"msg_01","type":"message","role":"assistant","content":[{"type":"text","text":"Hello!"}],"model":%q,"stop_reason":"end_turn","usage":{"input_tokens":10,"output_tokens":5}}`, model) } func openAISuccessResponse(model string) string { - return fmt.Sprintf(`{"id":"chatcmpl-123","object":"chat.completion","created":1677652288,"model":"%s","choices":[{"index":0,"message":{"role":"assistant","content":"Hello!"},"finish_reason":"stop"}],"usage":{"prompt_tokens":9,"completion_tokens":12,"total_tokens":21}}`, model) + return fmt.Sprintf(`{"id":"chatcmpl-123","object":"chat.completion","created":1677652288,"model":%q,"choices":[{"index":0,"message":{"role":"assistant","content":"Hello!"},"finish_reason":"stop"}],"usage":{"prompt_tokens":9,"completion_tokens":12,"total_tokens":21}}`, model) } // TestCircuitBreaker_FullRecoveryCycle tests the complete circuit breaker lifecycle: @@ -555,7 +555,7 @@ func TestCircuitBreaker_PerModelIsolation(t *testing.T) { ) doRequest := func(model string) *http.Response { - body := fmt.Sprintf(`{"model":"%s","max_tokens":1024,"messages":[{"role":"user","content":"hi"}]}`, model) + body := fmt.Sprintf(`{"model":%q,"max_tokens":1024,"messages":[{"role":"user","content":"hi"}]}`, model) resp := bridgeServer.makeRequest(t, http.MethodPost, pathAnthropicMessages, []byte(body), http.Header{ "x-api-key": {"test"}, "anthropic-version": {"2023-06-01"}, diff --git a/internal/integrationtest/metrics_test.go b/internal/integrationtest/metrics_test.go index 6f1dae08..7e5bf2df 100644 --- a/internal/integrationtest/metrics_test.go +++ b/internal/integrationtest/metrics_test.go @@ -104,7 +104,7 @@ func TestMetrics_Interception(t *testing.T) { }, { name: "oai_responses_blocking_error", - fixture: fixtures.OaiResponsesBlockingHttpErr, + fixture: fixtures.OaiResponsesBlockingHTTPErr, path: pathOpenAIResponses, headers: http.Header{"User-Agent": []string{"codex/1.0.0"}}, expectStatus: metrics.InterceptionCountStatusFailed, @@ -127,7 +127,7 @@ func TestMetrics_Interception(t *testing.T) { }, { name: "oai_responses_streaming_error", - fixture: fixtures.OaiResponsesStreamingHttpErr, + fixture: fixtures.OaiResponsesStreamingHTTPErr, path: pathOpenAIResponses, headers: http.Header{"Originator": []string{"roo-code"}}, expectStatus: metrics.InterceptionCountStatusFailed, diff --git a/internal/integrationtest/trace_test.go b/internal/integrationtest/trace_test.go index b19707a0..913e4ed3 100644 --- a/internal/integrationtest/trace_test.go +++ b/internal/integrationtest/trace_test.go @@ -647,7 +647,7 @@ func TestTraceOpenAIErr(t *testing.T) { }, { name: "trace_openai_responses_streaming_http_error", - fixture: fixtures.OaiResponsesStreamingHttpErr, + fixture: fixtures.OaiResponsesStreamingHTTPErr, streaming: true, allowOverflow: true, // 429 error causes retries @@ -664,7 +664,7 @@ func TestTraceOpenAIErr(t *testing.T) { }, { name: "trace_openai_responses_blocking_http_error", - fixture: fixtures.OaiResponsesBlockingHttpErr, + fixture: fixtures.OaiResponsesBlockingHTTPErr, streaming: false, path: pathOpenAIResponses, diff --git a/internal/testutil/mockprovider.go b/internal/testutil/mockprovider.go index 6ef9175f..ebbd8370 100644 --- a/internal/testutil/mockprovider.go +++ b/internal/testutil/mockprovider.go @@ -11,17 +11,17 @@ import ( ) type MockProvider struct { - Name_ string + NameStr string URL string Bridged []string Passthrough []string InterceptorFunc func(w http.ResponseWriter, r *http.Request, tracer trace.Tracer) (intercept.Interceptor, error) } -func (m *MockProvider) Type() string { return m.Name_ } -func (m *MockProvider) Name() string { return m.Name_ } +func (m *MockProvider) Type() string { return m.NameStr } +func (m *MockProvider) Name() string { return m.NameStr } func (m *MockProvider) BaseURL() string { return m.URL } -func (m *MockProvider) RoutePrefix() string { return fmt.Sprintf("/%s", m.Name_) } +func (m *MockProvider) RoutePrefix() string { return fmt.Sprintf("/%s", m.NameStr) } func (m *MockProvider) BridgedRoutes() []string { return m.Bridged } func (m *MockProvider) PassthroughRoutes() []string { return m.Passthrough } func (m *MockProvider) AuthHeader() string { return "Authorization" } diff --git a/mcp/tool.go b/mcp/tool.go index 846928d8..70845c01 100644 --- a/mcp/tool.go +++ b/mcp/tool.go @@ -59,15 +59,15 @@ func (t *Tool) Call(ctx context.Context, input any, tracer trace.Tracer) (_ *mcp ctx, span := tracer.Start(ctx, "Intercept.ProcessRequest.ToolCall", trace.WithAttributes(spanAttrs...)) defer tracing.EndSpanErr(span, &outErr) - inputJson, err := json.Marshal(input) + inputJSON, err := json.Marshal(input) if err != nil { t.Logger.Warn(ctx, "failed to marshal tool input, will be omitted from span attrs", slog.Error(err)) } else { - strJson := string(inputJson) - if len(strJson) > maxSpanInputAttrLen { - strJson = strJson[:maxSpanInputAttrLen] + strJSON := string(inputJSON) + if len(strJSON) > maxSpanInputAttrLen { + strJSON = strJSON[:maxSpanInputAttrLen] } - span.SetAttributes(attribute.String(tracing.MCPInput, strJson)) + span.SetAttributes(attribute.String(tracing.MCPInput, strJSON)) } start := time.Now() @@ -88,7 +88,7 @@ func (t *Tool) Call(ctx context.Context, input any, tracer trace.Tracer) (_ *mcp logFn(ctx, "injected tool invoked", slog.F("name", t.Name), slog.F("server", t.ServerName), - slog.F("input", inputJson), + slog.F("input", inputJSON), slog.F("duration_sec", time.Since(start).Seconds()), slog.Error(outErr), ) diff --git a/metrics/metrics.go b/metrics/metrics.go index 6d14ab9d..ec2d182f 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -5,7 +5,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) -var baseLabels []string = []string{"provider", "model"} +var baseLabels = []string{"provider", "model"} const ( InterceptionCountStatusFailed = "failed" diff --git a/provider/anthropic.go b/provider/anthropic.go index e5132880..6a38f696 100644 --- a/provider/anthropic.go +++ b/provider/anthropic.go @@ -104,64 +104,63 @@ func (p *Anthropic) CreateInterceptor(w http.ResponseWriter, r *http.Request, tr defer tracing.EndSpanErr(span, &outErr) path := strings.TrimPrefix(r.URL.Path, p.RoutePrefix()) - switch path { - case routeMessages: - payload, err := io.ReadAll(r.Body) - if err != nil { - return nil, xerrors.Errorf("read body: %w", err) - } - - reqPayload, err := messages.NewMessagesRequestPayload(payload) - if err != nil { - return nil, xerrors.Errorf("unmarshal request body: %w", err) - } + if path != routeMessages { + span.SetStatus(codes.Error, "unknown route: "+r.URL.Path) + return nil, UnknownRoute + } - cfg := p.cfg - cfg.ExtraHeaders = extractAnthropicHeaders(r) - - // At this point the request contains only LLM provider headers. - // Any Coder-specific authentication has already been stripped. - // - // In centralized mode neither Authorization nor X-Api-Key is - // present, so cfg keeps the centralized key unchanged. - // - // In BYOK mode the user's LLM credentials survive intact. - // If X-Api-Key is present the user has a personal API key; - // overwrite the centralized key with it. If Authorization is - // present the user authenticated directly with provider; - // set BYOKBearerToken and clear the centralized key. - // When both are present, X-Api-Key takes priority to match - // claude-code behavior. - credKind := intercept.CredentialKindCentralized - credSecret := cfg.Key - authHeaderName := p.AuthHeader() - if apiKey := r.Header.Get("X-Api-Key"); apiKey != "" { - cfg.Key = apiKey - authHeaderName = "X-Api-Key" - credKind = intercept.CredentialKindBYOK - credSecret = apiKey - } else if token := utils.ExtractBearerToken(r.Header.Get("Authorization")); token != "" { - cfg.BYOKBearerToken = token - cfg.Key = "" - authHeaderName = "Authorization" - credKind = intercept.CredentialKindBYOK - credSecret = token - } + payload, err := io.ReadAll(r.Body) + if err != nil { + return nil, xerrors.Errorf("read body: %w", err) + } - cred := intercept.NewCredentialInfo(credKind, credSecret) + reqPayload, err := messages.NewMessagesRequestPayload(payload) + if err != nil { + return nil, xerrors.Errorf("unmarshal request body: %w", err) + } - var interceptor intercept.Interceptor - if reqPayload.Stream() { - interceptor = messages.NewStreamingInterceptor(id, reqPayload, p.Name(), cfg, p.bedrockCfg, r.Header, authHeaderName, tracer, cred) - } else { - interceptor = messages.NewBlockingInterceptor(id, reqPayload, p.Name(), cfg, p.bedrockCfg, r.Header, authHeaderName, tracer, cred) - } - span.SetAttributes(interceptor.TraceAttributes(r)...) - return interceptor, nil + cfg := p.cfg + cfg.ExtraHeaders = extractAnthropicHeaders(r) + + // At this point the request contains only LLM provider headers. + // Any Coder-specific authentication has already been stripped. + // + // In centralized mode neither Authorization nor X-Api-Key is + // present, so cfg keeps the centralized key unchanged. + // + // In BYOK mode the user's LLM credentials survive intact. + // If X-Api-Key is present the user has a personal API key; + // overwrite the centralized key with it. If Authorization is + // present the user authenticated directly with provider; + // set BYOKBearerToken and clear the centralized key. + // When both are present, X-Api-Key takes priority to match + // claude-code behavior. + credKind := intercept.CredentialKindCentralized + credSecret := cfg.Key + authHeaderName := p.AuthHeader() + if apiKey := r.Header.Get("X-Api-Key"); apiKey != "" { + cfg.Key = apiKey + authHeaderName = "X-Api-Key" + credKind = intercept.CredentialKindBYOK + credSecret = apiKey + } else if token := utils.ExtractBearerToken(r.Header.Get("Authorization")); token != "" { + cfg.BYOKBearerToken = token + cfg.Key = "" + authHeaderName = "Authorization" + credKind = intercept.CredentialKindBYOK + credSecret = token } - span.SetStatus(codes.Error, "unknown route: "+r.URL.Path) - return nil, UnknownRoute + cred := intercept.NewCredentialInfo(credKind, credSecret) + + var interceptor intercept.Interceptor + if reqPayload.Stream() { + interceptor = messages.NewStreamingInterceptor(id, reqPayload, p.Name(), cfg, p.bedrockCfg, r.Header, authHeaderName, tracer, cred) + } else { + interceptor = messages.NewBlockingInterceptor(id, reqPayload, p.Name(), cfg, p.bedrockCfg, r.Header, authHeaderName, tracer, cred) + } + span.SetAttributes(interceptor.TraceAttributes(r)...) + return interceptor, nil } func (p *Anthropic) BaseURL() string {