Skip to content

Commit 12e3143

Browse files
committed
fix: handle chatcompletions streaming tool calls with no text preamble and non-zero indices
1 parent a127009 commit 12e3143

5 files changed

Lines changed: 157 additions & 1 deletion

File tree

bridge_integration_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,78 @@ func TestOpenAIChatCompletions(t *testing.T) {
437437
})
438438
}
439439
})
440+
441+
// Regression test: when the provider's very first streamed chunk is a tool
// call (no text preamble), the bridge must still initiate the stream and
// invoke the injected MCP tool instead of returning early with no chunks.
t.Run("streaming injected tool call only", func(t *testing.T) {
	t.Parallel()

	arc := txtar.Parse(fixtures.OaiChatStreamingToolCallOnly)
	t.Logf("%s: %s", t.Name(), arc.Comment)

	files := filesMap(arc)
	require.Len(t, files, 2)
	require.Contains(t, files, fixtureRequest)
	require.Contains(t, files, fixtureStreamingResponse)

	reqBody := files[fixtureRequest]

	// Add the stream param to the request.
	newBody, err := setJSON(reqBody, "stream", true)
	require.NoError(t, err)
	reqBody = newBody

	ctx, cancel := context.WithTimeout(t.Context(), time.Second*30)
	t.Cleanup(cancel)
	srv := newMockServer(ctx, t, files, nil)
	t.Cleanup(srv.Close)

	recorderClient := &testutil.MockRecorder{}

	// Setup MCP proxies with the tool from the fixture.
	mcpProxiers, mcpCalls := setupMCPServerProxiesForTest(t, testTracer)
	mcpMgr := mcp.NewServerProxyManager(mcpProxiers, testTracer)
	require.NoError(t, mcpMgr.Init(ctx))

	logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: false}).Leveled(slog.LevelDebug)
	providers := []aibridge.Provider{provider.NewOpenAI(openaiCfg(srv.URL, apiKey))}
	b, err := aibridge.NewRequestBridge(t.Context(), providers, recorderClient, mcpMgr, logger, nil, testTracer)
	require.NoError(t, err)

	mockSrv := httptest.NewUnstartedServer(b)
	t.Cleanup(mockSrv.Close)
	mockSrv.Config.BaseContext = func(_ net.Listener) context.Context {
		return aibcontext.AsActor(ctx, userID, nil)
	}
	mockSrv.Start()

	req := createOpenAIChatCompletionsReq(t, mockSrv.URL, reqBody)

	client := &http.Client{}
	resp, err := client.Do(req)
	require.NoError(t, err)
	// Register the close before any assertion can t.Fatal, so the body is
	// never leaked when an assertion below fails.
	defer resp.Body.Close()
	require.Equal(t, http.StatusOK, resp.StatusCode)

	// Verify the MCP tool was actually invoked.
	invocations := mcpCalls.getCallsByTool(mockToolName)
	require.Len(t, invocations, 1, "expected MCP tool to be invoked")

	// Verify tool was invoked with the args from the fixture.
	expected, err := json.Marshal(map[string]any{"owner": "me"})
	require.NoError(t, err)
	actual, err := json.Marshal(invocations[0])
	require.NoError(t, err)
	require.EqualValues(t, expected, actual)

	// Verify tool usage was recorded.
	toolUsages := recorderClient.RecordedToolUsages()
	require.Len(t, toolUsages, 1)
	assert.Equal(t, mockToolName, toolUsages[0].Tool)

	recorderClient.VerifyAllInterceptionsEnded(t)
})
440512
}
441513

442514
func TestSimple(t *testing.T) {

fixtures/fixtures.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ var (
4545

4646
//go:embed openai/chatcompletions/non_stream_error.txtar
4747
OaiChatNonStreamError []byte
48+
49+
//go:embed openai/chatcompletions/tool_call_only.txtar
50+
OaiChatStreamingToolCallOnly []byte
4851
)
4952

5053
var (
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
First chunk as a tool call.
2+
Copilot error: `Execution failed: Error: request ended without sending any chunks`
3+
4+
-- request --
5+
{
6+
"messages": [
7+
{
8+
"content": "<current_datetime>2026-01-22T18:35:17.612Z</current_datetime>\n\nlist all my coder workspaces",
9+
"role": "user"
10+
}
11+
],
12+
"model": "claude-haiku-4.5",
13+
"n": 1,
14+
"temperature": 1,
15+
"parallel_tool_calls": false,
16+
"stream_options": {
17+
"include_usage": true
18+
},
19+
"stream": true
20+
}
21+
22+
-- streaming --
23+
data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"name":"bmcp_coder_coder_list_workspaces"},"id":"toolu_vrtx_01CvBi1d4qpKTG2PCuc9wDbZ","index":0,"type":"function"}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"}
24+
25+
data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":""},"index":0}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"}
26+
27+
data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":"{\"own"},"index":0}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"}
28+
29+
data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":"er\": \"me\"}"},"index":0}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"}
30+
31+
data: {"choices":[{"finish_reason":"tool_calls","index":0,"delta":{"content":null}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","usage":{"completion_tokens":65,"prompt_tokens":25716,"prompt_tokens_details":{"cached_tokens":20470},"total_tokens":25781},"model":"claude-haiku-4.5"}
32+
33+
data: [DONE]

intercept/chatcompletions/streaming.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"errors"
88
"fmt"
99
"net/http"
10+
"slices"
1011
"strings"
1112
"time"
1213

@@ -160,6 +161,28 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re
160161
toolCall = nil
161162
}
162163

164+
if toolCall != nil {
165+
// Builtin tools are not intercepted.
166+
if i.getInjectedToolByName(toolCall.Name) == nil {
167+
// Builtin tool - record usage and clear toolCall
168+
_ = i.recorder.RecordToolUsage(streamCtx, &recorder.ToolUsageRecord{
169+
InterceptionID: i.ID().String(),
170+
MsgID: processor.getMsgID(),
171+
Tool: toolCall.Name,
172+
Args: i.unmarshalArgs(toolCall.Arguments),
173+
Injected: false,
174+
})
175+
toolCall = nil
176+
} else {
177+
// Injected tools mark the stream as initiated so we continue to tool invocation.
178+
// When the provider responds with a tool call as the first chunk (no text
179+
// preamble), no chunks are relayed to the client. Marking as initiated
180+
// ensures we continue to tool invocation instead of returning early.
181+
events.MarkInitiated()
182+
183+
}
184+
}
185+
163186
if prompt != nil {
164187
_ = i.recorder.RecordPromptUsage(streamCtx, &recorder.PromptUsageRecord{
165188
InterceptionID: i.ID().String(),
@@ -239,7 +262,13 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re
239262

240263
// Invoke the injected tool, and use the tool result to make a subsequent request to the upstream.
241264
// Append the completion from this stream as context.
242-
i.req.Messages = append(i.req.Messages, processor.getLastCompletion().ToParam())
265+
// Some providers may return tool calls with non-zero starting indices,
266+
// resulting in nil entries in the array that must be removed.
267+
completion := processor.getLastCompletion()
268+
if completion != nil {
269+
compactToolCalls(completion)
270+
i.req.Messages = append(i.req.Messages, completion.ToParam())
271+
}
243272

244273
id := toolCall.ID
245274
args := i.unmarshalArgs(toolCall.Arguments)
@@ -486,3 +515,13 @@ func (s *streamProcessor) getLastUsage() openai.CompletionUsage {
486515
// getCumulativeUsage returns the usage stored in s.cumulativeUsage —
// presumably the token totals accumulated across all processed stream
// chunks (the write site is not visible here; confirm where it is summed).
func (s *streamProcessor) getCumulativeUsage() openai.CompletionUsage {
487516
return s.cumulativeUsage
488517
}
518+
519+
// compactToolCalls removes nil/empty tool call entries (without an ID).
520+
func compactToolCalls(msg *openai.ChatCompletionMessage) {
521+
if msg == nil || len(msg.ToolCalls) == 0 {
522+
return
523+
}
524+
msg.ToolCalls = slices.DeleteFunc(msg.ToolCalls, func(tc openai.ChatCompletionMessageToolCallUnion) bool {
525+
return tc.ID == ""
526+
})
527+
}

intercept/eventstream/eventstream.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,15 @@ func (s *EventStream) IsStreaming() bool {
199199
return s.initiated.Load() || len(s.eventsCh) > 0
200200
}
201201

202+
// MarkInitiated marks the stream as initiated, even if no events have been
203+
// sent to the client yet. A stream is considered initiated when processing
204+
// injected tool calls that don't relay chunks to the client.
205+
func (s *EventStream) MarkInitiated() {
206+
// NOTE(review): this fires the shared initiateOnce. Presumably the normal
// event-send path uses the same Once only to set this flag — confirm it
// does no other one-time work that calling Do here would suppress.
s.initiateOnce.Do(func() {
207+
s.initiated.Store(true)
208+
})
209+
}
210+
202211
// IsConnError checks if an error is related to client disconnection or context cancellation.
203212
func IsConnError(err error) bool {
204213
if err == nil {

0 commit comments

Comments
 (0)