113 changes: 113 additions & 0 deletions bridge_integration_test.go
@@ -437,6 +437,119 @@ func TestOpenAIChatCompletions(t *testing.T) {
})
}
})

t.Run("streaming injected tool call edge cases", func(t *testing.T) {
t.Parallel()

cases := []struct {
name string
fixture []byte
expectedArgs map[string]any
}{
{
name: "tool call no preamble",
fixture: fixtures.OaiChatStreamingInjectedToolNoPreamble,
expectedArgs: map[string]any{"owner": "me"},
},
{
name: "tool call with non-zero index",
fixture: fixtures.OaiChatStreamingInjectedToolNonzeroIndex,
expectedArgs: nil, // No arguments in this fixture
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

arc := txtar.Parse(tc.fixture)
t.Logf("%s: %s", t.Name(), arc.Comment)

files := filesMap(arc)
require.Len(t, files, 3)
require.Contains(t, files, fixtureRequest)
require.Contains(t, files, fixtureStreamingResponse)
require.Contains(t, files, fixtureStreamingToolResponse)

reqBody := files[fixtureRequest]

// Add the stream param to the request.
newBody, err := setJSON(reqBody, "stream", true)
require.NoError(t, err)
reqBody = newBody

ctx, cancel := context.WithTimeout(t.Context(), time.Second*30)
t.Cleanup(cancel)

// Setup mock server with response mutator for multi-turn interaction.
srv := newMockServer(ctx, t, files, func(reqCount uint32, resp []byte) []byte {
if reqCount == 1 {
// First request gets the tool call response
return resp
}
// Second request gets final response
return files[fixtureStreamingToolResponse]
})
t.Cleanup(srv.Close)

recorderClient := &testutil.MockRecorder{}

// Setup MCP proxies with the tool from the fixture
mcpProxiers, mcpCalls := setupMCPServerProxiesForTest(t, testTracer)
mcpMgr := mcp.NewServerProxyManager(mcpProxiers, testTracer)
require.NoError(t, mcpMgr.Init(ctx))

logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: false}).Leveled(slog.LevelDebug)
providers := []aibridge.Provider{provider.NewOpenAI(openaiCfg(srv.URL, apiKey))}
b, err := aibridge.NewRequestBridge(t.Context(), providers, recorderClient, mcpMgr, logger, nil, testTracer)
require.NoError(t, err)

mockSrv := httptest.NewUnstartedServer(b)
t.Cleanup(mockSrv.Close)
mockSrv.Config.BaseContext = func(_ net.Listener) context.Context {
return aibcontext.AsActor(ctx, userID, nil)
}
mockSrv.Start()

req := createOpenAIChatCompletionsReq(t, mockSrv.URL, reqBody)

client := &http.Client{}
resp, err := client.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)

// Verify SSE headers are sent correctly
require.Equal(t, "text/event-stream", resp.Header.Get("Content-Type"))
require.Equal(t, "no-cache", resp.Header.Get("Cache-Control"))
require.Equal(t, "keep-alive", resp.Header.Get("Connection"))

// Consume the full response body to ensure the interception completes
_, err = io.ReadAll(resp.Body)
require.NoError(t, err)
resp.Body.Close()

// Verify the MCP tool was actually invoked
invocations := mcpCalls.getCallsByTool(mockToolName)
require.Len(t, invocations, 1, "expected MCP tool to be invoked")

// Verify tool was invoked with the expected args (if specified)
if tc.expectedArgs != nil {
expected, err := json.Marshal(tc.expectedArgs)
require.NoError(t, err)
actual, err := json.Marshal(invocations[0])
require.NoError(t, err)
require.EqualValues(t, expected, actual)
}
Contributor: missing else check?

Contributor Author: No, not missing: for the non-zero index fixture, expectedArgs is nil because the fixture doesn't include arguments (the owner parameter is optional and Coder defaults to "me"; additionally, providers don't always include optional arguments). The test only validates arguments for the "tool call no preamble" fixture, which does include an argument.

Contributor: I meant to ask whether something not being set should be checked in an else case.

Contributor Author: No. In this if case we only check the tool call arguments; if the fixture has a tool call without arguments, we don't check anything.


// Verify tool usage was recorded
toolUsages := recorderClient.RecordedToolUsages()
require.Len(t, toolUsages, 1)
assert.Equal(t, mockToolName, toolUsages[0].Tool)

recorderClient.VerifyAllInterceptionsEnded(t)
})
}
})
}

func TestSimple(t *testing.T) {
6 changes: 6 additions & 0 deletions fixtures/fixtures.go
@@ -45,6 +45,12 @@ var (

//go:embed openai/chatcompletions/non_stream_error.txtar
OaiChatNonStreamError []byte

//go:embed openai/chatcompletions/streaming_injected_tool_no_preamble.txtar
OaiChatStreamingInjectedToolNoPreamble []byte

//go:embed openai/chatcompletions/streaming_injected_tool_nonzero_index.txtar
OaiChatStreamingInjectedToolNonzeroIndex []byte
)

var (
73 changes: 73 additions & 0 deletions fixtures/openai/chatcompletions/streaming_injected_tool_no_preamble.txtar
@@ -0,0 +1,73 @@
Streaming response where the provider returns an injected tool call as the first chunk with no text preamble.
This test ensures tool invocation continues even when no chunks are relayed to the client.

-- request --
{
"messages": [
{
"content": "<current_datetime>2026-01-22T18:35:17.612Z</current_datetime>\n\nlist all my coder workspaces",
"role": "user"
}
],
"model": "claude-haiku-4.5",
"n": 1,
"temperature": 1,
"parallel_tool_calls": false,
"stream_options": {
"include_usage": true
},
"stream": true
}

-- streaming --
data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"name":"bmcp_coder_coder_list_workspaces"},"id":"toolu_vrtx_01CvBi1d4qpKTG2PCuc9wDbZ","index":0,"type":"function"}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":""},"index":0}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":"{\"own"},"index":0}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":"er\": \"me\"}"},"index":0}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"}

data: {"choices":[{"finish_reason":"tool_calls","index":0,"delta":{"content":null}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","usage":{"completion_tokens":65,"prompt_tokens":25716,"prompt_tokens_details":{"cached_tokens":20470},"total_tokens":25781},"model":"claude-haiku-4.5"}

data: [DONE]


-- streaming/tool-call --
data: {"choices":[{"index":0,"delta":{"content":"You","role":"assistant"}}],"created":1769198061,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" have one","role":"assistant"}}],"created":1769198061,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" Coder workspace:","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"\n\n**test-scf** (","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"ID: a174a2e5","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"-5050-445d-89","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"ff-dd720e5b442","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"e)\n- Template: docker","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"\n- Template Version","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" ID","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":": ad1b5ab1-","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"fc18-4792-84f","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"7-797787607d30","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"\n- Status","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":": Up","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" to date","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"finish_reason":"stop","index":0,"delta":{"content":null}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","usage":{"completion_tokens":85,"prompt_tokens":25989,"prompt_tokens_details":{"cached_tokens":0},"total_tokens":26074},"model":"claude-haiku-4.5"}

data: [DONE]
Contributor: Streaming fixtures may need two newlines at the end for the stream to be properly parsed by the SDK. See: https://github.com/coder/aibridge/pull/123/changes#r2698610345

Collaborator: Good catch; we should add a linter for this.

Contributor Author: I had no problems with the tests, but added the newline: 734ddda
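As a rough illustration of what such a check could look like, here is a minimal test-style sketch, assuming the fixtures live under an openai/ directory next to the test file; the file name, package, and walk root are hypothetical and not the project's actual linter.

// fixtures_lint_test.go: hypothetical check that every txtar fixture ends
// with a blank line, so streaming SSE bodies terminate with "\n\n".
package fixtures_test

import (
	"os"
	"path/filepath"
	"strings"
	"testing"
)

func TestFixturesEndWithBlankLine(t *testing.T) {
	err := filepath.WalkDir("openai", func(path string, d os.DirEntry, walkErr error) error {
		if walkErr != nil || d.IsDir() || !strings.HasSuffix(path, ".txtar") {
			return walkErr
		}
		data, err := os.ReadFile(path)
		if err != nil {
			return err
		}
		if !strings.HasSuffix(string(data), "\n\n") {
			t.Errorf("%s: fixture should end with two trailing newlines", path)
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}
}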



72 changes: 72 additions & 0 deletions fixtures/openai/chatcompletions/streaming_injected_tool_nonzero_index.txtar
@@ -0,0 +1,72 @@
Streaming response where the provider returns text content followed by an injected tool call at index 1 (instead of index 0).
This can happen when the provider incorrectly continues indexing from a previous response.
This tests that nil entries caused by non-zero starting indices are removed from the tool calls array.

-- request --
{
"messages": [
{
"content": "<current_datetime>2026-01-23T20:22:43.781Z</current_datetime>\n\nI want you to do to this in order:\n1) create a file in my current directory with name \"test.txt\"\n2) list all my coder workspaces",
"role": "user"
}
],
"model": "claude-haiku-4.5",
"n": 1,
"temperature": 1,
"parallel_tool_calls": false,
"stream_options": {
"include_usage": true
},
"stream": true
}

-- streaming --
data: {"choices":[{"index":0,"delta":{"content":"Now","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" listing","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" your","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" C","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"oder workspaces:","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"name":"bmcp_coder_coder_list_workspaces"},"id":"toolu_vrtx_01DbFqUgk6aAtJ4nDBqzFWDF","index":1,"type":"function"}]}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":""},"index":1}]}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}

data: {"choices":[{"finish_reason":"tool_calls","index":0,"delta":{"content":null}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","usage":{"completion_tokens":58,"prompt_tokens":25939,"prompt_tokens_details":{"cached_tokens":25429},"total_tokens":25997},"model":"claude-haiku-4.5"}

data: [DONE]


-- streaming/tool-call --
data: {"choices":[{"index":0,"delta":{"content":"Done","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"! I create","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"d `","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"test.txt` in","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" your current directory.","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" You","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" have","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" 1","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" ","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"Coder workspace:\n\n-","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" **test-scf** (docker","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" template)","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"finish_reason":"stop","index":0,"delta":{"content":null}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","usage":{"completion_tokens":39,"prompt_tokens":26166,"prompt_tokens_details":{"cached_tokens":25934},"total_tokens":26205},"model":"claude-haiku-4.5"}

data: [DONE]


51 changes: 40 additions & 11 deletions intercept/chatcompletions/streaming.go
@@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"net/http"
"slices"
"strings"
"time"

@@ -148,16 +149,28 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re
}
}

// Builtin tools are not intercepted.
if toolCall != nil && i.getInjectedToolByName(toolCall.Name) == nil {
_ = i.recorder.RecordToolUsage(streamCtx, &recorder.ToolUsageRecord{
InterceptionID: i.ID().String(),
MsgID: processor.getMsgID(),
Tool: toolCall.Name,
Args: i.unmarshalArgs(toolCall.Arguments),
Injected: false,
})
toolCall = nil
if toolCall != nil {
// Builtin tools are not intercepted.
if i.getInjectedToolByName(toolCall.Name) == nil {
_ = i.recorder.RecordToolUsage(streamCtx, &recorder.ToolUsageRecord{
InterceptionID: i.ID().String(),
MsgID: processor.getMsgID(),
Tool: toolCall.Name,
Args: i.unmarshalArgs(toolCall.Arguments),
Injected: false,
})
toolCall = nil
} else {
// When the provider responds with only tool calls (no text content),
// no chunks are relayed to the client, so the stream is not yet
// initiated. Initiate it here so the SSE headers are sent and the
// ping ticker is started, preventing client timeout during tool invocation.
// Only initiate if there is no stream error; if there's an error, we'll return
// an HTTP error response instead of starting an SSE stream.
if stream.Err() == nil {
events.MarkInitiated(w)
}
}
}

if prompt != nil {
@@ -239,7 +252,13 @@

// Invoke the injected tool, and use the tool result to make a subsequent request to the upstream.
// Append the completion from this stream as context.
i.req.Messages = append(i.req.Messages, processor.getLastCompletion().ToParam())
// Some providers may return tool calls with non-zero starting indices,
// resulting in nil entries in the array that must be removed.
Collaborator (on lines +255 to +256): Nit: Move this comment above compactToolCalls to be clearer.

completion := processor.getLastCompletion()
if completion != nil {
compactToolCalls(completion)
i.req.Messages = append(i.req.Messages, completion.ToParam())
}

id := toolCall.ID
args := i.unmarshalArgs(toolCall.Arguments)
@@ -486,3 +505,13 @@ func (s *streamProcessor) getLastUsage() openai.CompletionUsage {
func (s *streamProcessor) getCumulativeUsage() openai.CompletionUsage {
return s.cumulativeUsage
}

// compactToolCalls removes nil/empty tool call entries (without an ID).
func compactToolCalls(msg *openai.ChatCompletionMessage) {
if msg == nil || len(msg.ToolCalls) == 0 {
return
}
msg.ToolCalls = slices.DeleteFunc(msg.ToolCalls, func(tc openai.ChatCompletionMessageToolCallUnion) bool {
return tc.ID == ""
})
}
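For illustration only, a minimal sketch of the compaction behavior: the zero-valued first entry stands in for what a tool call accumulated at index 1 leaves behind, and the ID value is copied from the fixture above rather than produced by real accumulation.

// Hypothetical example: accumulating a tool call at index 1 leaves a
// zero-valued entry at index 0, which compactToolCalls drops because it
// has no ID.
msg := &openai.ChatCompletionMessage{
	ToolCalls: []openai.ChatCompletionMessageToolCallUnion{
		{}, // empty slot produced by the non-zero starting index
		{ID: "toolu_vrtx_01DbFqUgk6aAtJ4nDBqzFWDF"},
	},
}
compactToolCalls(msg)
// len(msg.ToolCalls) == 1: only the entry with an ID remains.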