diff --git a/bridge_integration_test.go b/bridge_integration_test.go
index dae4fe8..358b0ef 100644
--- a/bridge_integration_test.go
+++ b/bridge_integration_test.go
@@ -438,6 +438,119 @@ func TestOpenAIChatCompletions(t *testing.T) {
})
}
})
+
+ t.Run("streaming injected tool call edge cases", func(t *testing.T) {
+ t.Parallel()
+
+ cases := []struct {
+ name string
+ fixture []byte
+ expectedArgs map[string]any
+ }{
+ {
+ name: "tool call no preamble",
+ fixture: fixtures.OaiChatStreamingInjectedToolNoPreamble,
+ expectedArgs: map[string]any{"owner": "me"},
+ },
+ {
+ name: "tool call with non-zero index",
+ fixture: fixtures.OaiChatStreamingInjectedToolNonzeroIndex,
+ expectedArgs: nil, // No arguments in this fixture
+ },
+ }
+
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ t.Parallel()
+
+ arc := txtar.Parse(tc.fixture)
+ t.Logf("%s: %s", t.Name(), arc.Comment)
+
+ files := filesMap(arc)
+ require.Len(t, files, 3)
+ require.Contains(t, files, fixtureRequest)
+ require.Contains(t, files, fixtureStreamingResponse)
+ require.Contains(t, files, fixtureStreamingToolResponse)
+
+ reqBody := files[fixtureRequest]
+
+ // Add the stream param to the request.
+ newBody, err := setJSON(reqBody, "stream", true)
+ require.NoError(t, err)
+ reqBody = newBody
+
+ ctx, cancel := context.WithTimeout(t.Context(), time.Second*30)
+ t.Cleanup(cancel)
+
+ // Set up the mock server with a response mutator for the multi-turn interaction.
+ srv := newMockServer(ctx, t, files, func(reqCount uint32, resp []byte) []byte {
+ if reqCount == 1 {
+ // First request gets the tool call response
+ return resp
+ }
+ // Second request gets final response
+ return files[fixtureStreamingToolResponse]
+ })
+ t.Cleanup(srv.Close)
+
+ recorderClient := &testutil.MockRecorder{}
+
+ // Set up the MCP proxies with the tool from the fixture.
+ mcpProxiers, mcpCalls := setupMCPServerProxiesForTest(t, testTracer)
+ mcpMgr := mcp.NewServerProxyManager(mcpProxiers, testTracer)
+ require.NoError(t, mcpMgr.Init(ctx))
+
+ logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: false}).Leveled(slog.LevelDebug)
+ providers := []aibridge.Provider{provider.NewOpenAI(openaiCfg(srv.URL, apiKey))}
+ b, err := aibridge.NewRequestBridge(t.Context(), providers, recorderClient, mcpMgr, logger, nil, testTracer)
+ require.NoError(t, err)
+
+ mockSrv := httptest.NewUnstartedServer(b)
+ t.Cleanup(mockSrv.Close)
+ mockSrv.Config.BaseContext = func(_ net.Listener) context.Context {
+ return aibcontext.AsActor(ctx, userID, nil)
+ }
+ mockSrv.Start()
+
+ req := createOpenAIChatCompletionsReq(t, mockSrv.URL, reqBody)
+
+ client := &http.Client{}
+ resp, err := client.Do(req)
+ require.NoError(t, err)
+ require.Equal(t, http.StatusOK, resp.StatusCode)
+
+ // Verify SSE headers are sent correctly
+ require.Equal(t, "text/event-stream", resp.Header.Get("Content-Type"))
+ require.Equal(t, "no-cache", resp.Header.Get("Cache-Control"))
+ require.Equal(t, "keep-alive", resp.Header.Get("Connection"))
+
+ // Consume the full response body to ensure the interception completes
+ _, err = io.ReadAll(resp.Body)
+ require.NoError(t, err)
+ resp.Body.Close()
+
+ // Verify the MCP tool was actually invoked
+ invocations := mcpCalls.getCallsByTool(mockToolName)
+ require.Len(t, invocations, 1, "expected MCP tool to be invoked")
+
+ // Verify tool was invoked with the expected args (if specified)
+ if tc.expectedArgs != nil {
+ expected, err := json.Marshal(tc.expectedArgs)
+ require.NoError(t, err)
+ actual, err := json.Marshal(invocations[0])
+ require.NoError(t, err)
+ require.EqualValues(t, expected, actual)
+ }
+
+ // Verify tool usage was recorded
+ toolUsages := recorderClient.RecordedToolUsages()
+ require.Len(t, toolUsages, 1)
+ assert.Equal(t, mockToolName, toolUsages[0].Tool)
+
+ recorderClient.VerifyAllInterceptionsEnded(t)
+ })
+ }
+ })
}
func TestSimple(t *testing.T) {
diff --git a/fixtures/fixtures.go b/fixtures/fixtures.go
index 243b506..2370b4e 100644
--- a/fixtures/fixtures.go
+++ b/fixtures/fixtures.go
@@ -45,6 +45,12 @@ var (
//go:embed openai/chatcompletions/non_stream_error.txtar
OaiChatNonStreamError []byte
+
+ //go:embed openai/chatcompletions/streaming_injected_tool_no_preamble.txtar
+ OaiChatStreamingInjectedToolNoPreamble []byte
+
+ //go:embed openai/chatcompletions/streaming_injected_tool_nonzero_index.txtar
+ OaiChatStreamingInjectedToolNonzeroIndex []byte
)
var (
diff --git a/fixtures/openai/chatcompletions/streaming_injected_tool_no_preamble.txtar b/fixtures/openai/chatcompletions/streaming_injected_tool_no_preamble.txtar
new file mode 100644
index 0000000..f39097c
--- /dev/null
+++ b/fixtures/openai/chatcompletions/streaming_injected_tool_no_preamble.txtar
@@ -0,0 +1,73 @@
+Streaming response where the provider returns an injected tool call as the first chunk with no text preamble.
+This test ensures tool invocation continues even when no chunks are relayed to the client.
+
+-- request --
+{
+ "messages": [
+ {
+ "content": "2026-01-22T18:35:17.612Z\n\nlist all my coder workspaces",
+ "role": "user"
+ }
+ ],
+ "model": "claude-haiku-4.5",
+ "n": 1,
+ "temperature": 1,
+ "parallel_tool_calls": false,
+ "stream_options": {
+ "include_usage": true
+ },
+ "stream": true
+}
+
+-- streaming --
+data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"name":"bmcp_coder_coder_list_workspaces"},"id":"toolu_vrtx_01CvBi1d4qpKTG2PCuc9wDbZ","index":0,"type":"function"}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":""},"index":0}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":"{\"own"},"index":0}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":"er\": \"me\"}"},"index":0}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"finish_reason":"tool_calls","index":0,"delta":{"content":null}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","usage":{"completion_tokens":65,"prompt_tokens":25716,"prompt_tokens_details":{"cached_tokens":20470},"total_tokens":25781},"model":"claude-haiku-4.5"}
+
+data: [DONE]
+
+
+-- streaming/tool-call --
+data: {"choices":[{"index":0,"delta":{"content":"You","role":"assistant"}}],"created":1769198061,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":" have one","role":"assistant"}}],"created":1769198061,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":" Coder workspace:","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":"\n\n**test-scf** (","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":"ID: a174a2e5","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":"-5050-445d-89","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":"ff-dd720e5b442","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":"e)\n- Template: docker","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":"\n- Template Version","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":" ID","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":": ad1b5ab1-","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":"fc18-4792-84f","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":"7-797787607d30","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":"\n- Status","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":": Up","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":" to date","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"finish_reason":"stop","index":0,"delta":{"content":null}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","usage":{"completion_tokens":85,"prompt_tokens":25989,"prompt_tokens_details":{"cached_tokens":0},"total_tokens":26074},"model":"claude-haiku-4.5"}
+
+data: [DONE]
+
+
diff --git a/fixtures/openai/chatcompletions/streaming_injected_tool_nonzero_index.txtar b/fixtures/openai/chatcompletions/streaming_injected_tool_nonzero_index.txtar
new file mode 100644
index 0000000..384d1ee
--- /dev/null
+++ b/fixtures/openai/chatcompletions/streaming_injected_tool_nonzero_index.txtar
@@ -0,0 +1,72 @@
+Streaming response where the provider returns text content followed by an injected tool call at index 1 (instead of index 0).
+This can happen when the provider incorrectly continues indexing from a previous response.
+This tests that nil entries in the tool calls array, caused by a non-zero starting index, are removed.
+
+-- request --
+{
+ "messages": [
+ {
+ "content": "2026-01-23T20:22:43.781Z\n\nI want you to do to this in order:\n1) create a file in my current directory with name \"test.txt\"\n2) list all my coder workspaces",
+ "role": "user"
+ }
+ ],
+ "model": "claude-haiku-4.5",
+ "n": 1,
+ "temperature": 1,
+ "parallel_tool_calls": false,
+ "stream_options": {
+ "include_usage": true
+ },
+ "stream": true
+}
+
+-- streaming --
+data: {"choices":[{"index":0,"delta":{"content":"Now","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":" listing","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":" your","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":" C","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":"oder workspaces:","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"name":"bmcp_coder_coder_list_workspaces"},"id":"toolu_vrtx_01DbFqUgk6aAtJ4nDBqzFWDF","index":1,"type":"function"}]}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":""},"index":1}]}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"finish_reason":"tool_calls","index":0,"delta":{"content":null}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","usage":{"completion_tokens":58,"prompt_tokens":25939,"prompt_tokens_details":{"cached_tokens":25429},"total_tokens":25997},"model":"claude-haiku-4.5"}
+
+data: [DONE]
+
+
+-- streaming/tool-call --
+data: {"choices":[{"index":0,"delta":{"content":"Done","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":"! I create","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":"d `","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":"test.txt` in","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":" your current directory.","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":" You","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":" have","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":" 1","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":" ","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":"Coder workspace:\n\n-","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":" **test-scf** (docker","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"index":0,"delta":{"content":" template)","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}
+
+data: {"choices":[{"finish_reason":"stop","index":0,"delta":{"content":null}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","usage":{"completion_tokens":39,"prompt_tokens":26166,"prompt_tokens_details":{"cached_tokens":25934},"total_tokens":26205},"model":"claude-haiku-4.5"}
+
+data: [DONE]
+
+
diff --git a/intercept/chatcompletions/streaming.go b/intercept/chatcompletions/streaming.go
index fcfffc4..3e48713 100644
--- a/intercept/chatcompletions/streaming.go
+++ b/intercept/chatcompletions/streaming.go
@@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"net/http"
+ "slices"
"strings"
"time"
@@ -156,16 +157,28 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re
}
}
- // Builtin tools are not intercepted.
- if toolCall != nil && i.getInjectedToolByName(toolCall.Name) == nil {
- _ = i.recorder.RecordToolUsage(streamCtx, &recorder.ToolUsageRecord{
- InterceptionID: i.ID().String(),
- MsgID: processor.getMsgID(),
- Tool: toolCall.Name,
- Args: i.unmarshalArgs(toolCall.Arguments),
- Injected: false,
- })
- toolCall = nil
+ if toolCall != nil {
+ // Builtin tools are not intercepted.
+ if i.getInjectedToolByName(toolCall.Name) == nil {
+ _ = i.recorder.RecordToolUsage(streamCtx, &recorder.ToolUsageRecord{
+ InterceptionID: i.ID().String(),
+ MsgID: processor.getMsgID(),
+ Tool: toolCall.Name,
+ Args: i.unmarshalArgs(toolCall.Arguments),
+ Injected: false,
+ })
+ toolCall = nil
+ } else {
+ // When the provider responds with only tool calls (no text content),
+ // no chunks are relayed to the client, so the stream is not yet
+ // initiated. Initiate it here so the SSE headers are sent and the
+ // ping ticker is started, preventing client timeout during tool invocation.
+ // Only initiate if there is no stream error; if there is one, we
+ // return an HTTP error response instead of starting an SSE stream.
+ if stream.Err() == nil {
+ events.InitiateStream(w)
+ }
+ }
}
if prompt != nil {
@@ -247,7 +260,13 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re
// Invoke the injected tool, and use the tool result to make a subsequent request to the upstream.
// Append the completion from this stream as context.
- i.req.Messages = append(i.req.Messages, processor.getLastCompletion().ToParam())
+ // Some providers may return tool calls with non-zero starting indices,
+ // resulting in nil entries in the array that must be removed.
+ completion := processor.getLastCompletion()
+ if completion != nil {
+ compactToolCalls(completion)
+ i.req.Messages = append(i.req.Messages, completion.ToParam())
+ }
id := toolCall.ID
args := i.unmarshalArgs(toolCall.Arguments)
@@ -494,3 +513,13 @@ func (s *streamProcessor) getLastUsage() openai.CompletionUsage {
func (s *streamProcessor) getCumulativeUsage() openai.CompletionUsage {
return s.cumulativeUsage
}
+
+// compactToolCalls removes nil/empty tool call entries (without an ID).
+func compactToolCalls(msg *openai.ChatCompletionMessage) {
+ if msg == nil || len(msg.ToolCalls) == 0 {
+ return
+ }
+ msg.ToolCalls = slices.DeleteFunc(msg.ToolCalls, func(tc openai.ChatCompletionMessageToolCallUnion) bool {
+ return tc.ID == ""
+ })
+}
diff --git a/intercept/eventstream/eventstream.go b/intercept/eventstream/eventstream.go
index 361dc21..b3ee96a 100644
--- a/intercept/eventstream/eventstream.go
+++ b/intercept/eventstream/eventstream.go
@@ -37,10 +37,18 @@ type EventStream struct {
// doneCh is closed when the start loop exits.
doneCh chan struct{}
+
+ // tick sends periodic pings to keep the connection alive.
+ tick *time.Ticker
}
// NewEventStream creates a new SSE stream, with an optional payload which is used to send pings every [pingInterval].
func NewEventStream(ctx context.Context, logger slog.Logger, pingPayload []byte) *EventStream {
+ // Send periodic pings to keep connections alive.
+ // The upstream provider may also send their own pings, but we can't rely on this.
+ tick := time.NewTicker(time.Nanosecond)
+ tick.Stop() // Ticker will start after stream initiation.
+
return &EventStream{
ctx: ctx,
logger: logger,
@@ -49,9 +57,35 @@ func NewEventStream(ctx context.Context, logger slog.Logger, pingPayload []byte)
eventsCh: make(chan event, 128), // Small buffer to unblock senders; once full, senders will block.
doneCh: make(chan struct{}),
+ tick: tick,
}
}
+// InitiateStream initiates the SSE stream by sending headers and starting the
+// ping ticker. Call it only once upstream errors are ruled out, since SSE headers
+// are inappropriate after an error. Safe to call repeatedly; only the first call has effect.
+func (s *EventStream) InitiateStream(w http.ResponseWriter) {
+ s.initiateOnce.Do(func() {
+ s.initiated.Store(true)
+ s.logger.Debug(s.ctx, "stream initiated")
+
+ // Send headers for Server-Sent Event stream.
+ w.Header().Set("Content-Type", "text/event-stream")
+ w.Header().Set("Cache-Control", "no-cache")
+ w.Header().Set("Connection", "keep-alive")
+ w.Header().Set("X-Accel-Buffering", "no")
+
+ // Send initial flush to ensure connection is established.
+ if err := flush(w); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ // Start ping ticker.
+ s.tick.Reset(pingInterval)
+ })
+}
+
// Start handles sending Server-Sent Event to the client.
func (s *EventStream) Start(w http.ResponseWriter, r *http.Request) {
// Signal completion on exit so senders don't block indefinitely after closure.
@@ -59,11 +93,7 @@ func (s *EventStream) Start(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
- // Send periodic pings to keep connections alive.
- // The upstream provider may also send their own pings, but we can't rely on this.
- tick := time.NewTicker(time.Nanosecond)
- tick.Stop() // Ticker will start after stream initiation.
- defer tick.Stop()
+ defer s.tick.Stop()
for {
var (
@@ -83,33 +113,9 @@ func (s *EventStream) Start(w http.ResponseWriter, r *http.Request) {
return
}
- // Initiate the stream once the first event is received.
- s.initiateOnce.Do(func() {
- s.initiated.Store(true)
- s.logger.Debug(ctx, "stream initiated")
-
- // Send headers for Server-Sent Event stream.
- //
- // We only send these once an event is processed because an error can occur in the upstream
- // request prior to the stream starting, in which case the SSE headers are inappropriate to
- // send to the client.
- //
- // See use of IsStreaming().
- w.Header().Set("Content-Type", "text/event-stream")
- w.Header().Set("Cache-Control", "no-cache")
- w.Header().Set("Connection", "keep-alive")
- w.Header().Set("X-Accel-Buffering", "no")
-
- // Send initial flush to ensure connection is established.
- if err := flush(w); err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
-
- // Start ping ticker.
- tick.Reset(pingInterval)
- })
- case <-tick.C:
+ // Initiate the stream on first event (if not already initiated).
+ s.InitiateStream(w)
+ case <-s.tick.C:
ev = s.pingPayload
if ev == nil {
continue
@@ -132,7 +138,7 @@ func (s *EventStream) Start(w http.ResponseWriter, r *http.Request) {
// Reset the timer once we've flushed some data to the stream, since it's already fresh.
// No need to ping in that case.
- tick.Reset(pingInterval)
+ s.tick.Reset(pingInterval)
}
}