diff --git a/bridge_integration_test.go b/bridge_integration_test.go index dae4fe8..358b0ef 100644 --- a/bridge_integration_test.go +++ b/bridge_integration_test.go @@ -438,6 +438,119 @@ func TestOpenAIChatCompletions(t *testing.T) { }) } }) + + t.Run("streaming injected tool call edge cases", func(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + fixture []byte + expectedArgs map[string]any + }{ + { + name: "tool call no preamble", + fixture: fixtures.OaiChatStreamingInjectedToolNoPreamble, + expectedArgs: map[string]any{"owner": "me"}, + }, + { + name: "tool call with non-zero index", + fixture: fixtures.OaiChatStreamingInjectedToolNonzeroIndex, + expectedArgs: nil, // No arguments in this fixture + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + arc := txtar.Parse(tc.fixture) + t.Logf("%s: %s", t.Name(), arc.Comment) + + files := filesMap(arc) + require.Len(t, files, 3) + require.Contains(t, files, fixtureRequest) + require.Contains(t, files, fixtureStreamingResponse) + require.Contains(t, files, fixtureStreamingToolResponse) + + reqBody := files[fixtureRequest] + + // Add the stream param to the request. + newBody, err := setJSON(reqBody, "stream", true) + require.NoError(t, err) + reqBody = newBody + + ctx, cancel := context.WithTimeout(t.Context(), time.Second*30) + t.Cleanup(cancel) + + // Setup mock server with response mutator for multi-turn interaction. + srv := newMockServer(ctx, t, files, func(reqCount uint32, resp []byte) []byte { + if reqCount == 1 { + // First request gets the tool call response + return resp + } + // Second request gets final response + return files[fixtureStreamingToolResponse] + }) + t.Cleanup(srv.Close) + + recorderClient := &testutil.MockRecorder{} + + // Setup MCP proxies with the tool from the fixture + mcpProxiers, mcpCalls := setupMCPServerProxiesForTest(t, testTracer) + mcpMgr := mcp.NewServerProxyManager(mcpProxiers, testTracer) + require.NoError(t, mcpMgr.Init(ctx)) + + logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: false}).Leveled(slog.LevelDebug) + providers := []aibridge.Provider{provider.NewOpenAI(openaiCfg(srv.URL, apiKey))} + b, err := aibridge.NewRequestBridge(t.Context(), providers, recorderClient, mcpMgr, logger, nil, testTracer) + require.NoError(t, err) + + mockSrv := httptest.NewUnstartedServer(b) + t.Cleanup(mockSrv.Close) + mockSrv.Config.BaseContext = func(_ net.Listener) context.Context { + return aibcontext.AsActor(ctx, userID, nil) + } + mockSrv.Start() + + req := createOpenAIChatCompletionsReq(t, mockSrv.URL, reqBody) + + client := &http.Client{} + resp, err := client.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + // Verify SSE headers are sent correctly + require.Equal(t, "text/event-stream", resp.Header.Get("Content-Type")) + require.Equal(t, "no-cache", resp.Header.Get("Cache-Control")) + require.Equal(t, "keep-alive", resp.Header.Get("Connection")) + + // Consume the full response body to ensure the interception completes + _, err = io.ReadAll(resp.Body) + require.NoError(t, err) + resp.Body.Close() + + // Verify the MCP tool was actually invoked + invocations := mcpCalls.getCallsByTool(mockToolName) + require.Len(t, invocations, 1, "expected MCP tool to be invoked") + + // Verify tool was invoked with the expected args (if specified) + if tc.expectedArgs != nil { + expected, err := json.Marshal(tc.expectedArgs) + require.NoError(t, err) + actual, err := json.Marshal(invocations[0]) + require.NoError(t, err) + require.EqualValues(t, expected, actual) + } + + // Verify tool usage was recorded + toolUsages := recorderClient.RecordedToolUsages() + require.Len(t, toolUsages, 1) + assert.Equal(t, mockToolName, toolUsages[0].Tool) + + recorderClient.VerifyAllInterceptionsEnded(t) + }) + } + }) } func TestSimple(t *testing.T) { diff --git a/fixtures/fixtures.go b/fixtures/fixtures.go index 243b506..2370b4e 100644 --- a/fixtures/fixtures.go +++ b/fixtures/fixtures.go @@ -45,6 +45,12 @@ var ( //go:embed openai/chatcompletions/non_stream_error.txtar OaiChatNonStreamError []byte + + //go:embed openai/chatcompletions/streaming_injected_tool_no_preamble.txtar + OaiChatStreamingInjectedToolNoPreamble []byte + + //go:embed openai/chatcompletions/streaming_injected_tool_nonzero_index.txtar + OaiChatStreamingInjectedToolNonzeroIndex []byte ) var ( diff --git a/fixtures/openai/chatcompletions/streaming_injected_tool_no_preamble.txtar b/fixtures/openai/chatcompletions/streaming_injected_tool_no_preamble.txtar new file mode 100644 index 0000000..f39097c --- /dev/null +++ b/fixtures/openai/chatcompletions/streaming_injected_tool_no_preamble.txtar @@ -0,0 +1,73 @@ +Streaming response where the provider returns an injected tool call as the first chunk with no text preamble. +This test ensures tool invocation continues even when no chunks are relayed to the client. + +-- request -- +{ + "messages": [ + { + "content": "2026-01-22T18:35:17.612Z\n\nlist all my coder workspaces", + "role": "user" + } + ], + "model": "claude-haiku-4.5", + "n": 1, + "temperature": 1, + "parallel_tool_calls": false, + "stream_options": { + "include_usage": true + }, + "stream": true +} + +-- streaming -- +data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"name":"bmcp_coder_coder_list_workspaces"},"id":"toolu_vrtx_01CvBi1d4qpKTG2PCuc9wDbZ","index":0,"type":"function"}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":""},"index":0}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":"{\"own"},"index":0}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":"er\": \"me\"}"},"index":0}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"} + +data: {"choices":[{"finish_reason":"tool_calls","index":0,"delta":{"content":null}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","usage":{"completion_tokens":65,"prompt_tokens":25716,"prompt_tokens_details":{"cached_tokens":20470},"total_tokens":25781},"model":"claude-haiku-4.5"} + +data: [DONE] + + +-- streaming/tool-call -- +data: {"choices":[{"index":0,"delta":{"content":"You","role":"assistant"}}],"created":1769198061,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":" have one","role":"assistant"}}],"created":1769198061,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":" Coder workspace:","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":"\n\n**test-scf** (","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":"ID: a174a2e5","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":"-5050-445d-89","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":"ff-dd720e5b442","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":"e)\n- Template: docker","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":"\n- Template Version","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":" ID","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":": ad1b5ab1-","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":"fc18-4792-84f","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":"7-797787607d30","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":"\n- Status","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":": Up","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":" to date","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"} + +data: {"choices":[{"finish_reason":"stop","index":0,"delta":{"content":null}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","usage":{"completion_tokens":85,"prompt_tokens":25989,"prompt_tokens_details":{"cached_tokens":0},"total_tokens":26074},"model":"claude-haiku-4.5"} + +data: [DONE] + + diff --git a/fixtures/openai/chatcompletions/streaming_injected_tool_nonzero_index.txtar b/fixtures/openai/chatcompletions/streaming_injected_tool_nonzero_index.txtar new file mode 100644 index 0000000..384d1ee --- /dev/null +++ b/fixtures/openai/chatcompletions/streaming_injected_tool_nonzero_index.txtar @@ -0,0 +1,72 @@ +Streaming response where the provider returns text content followed by an injected tool call at index 1 (instead of index 0). +This can happen when the provider incorrectly continues indexing from a previous response. +This tests that nil entries are removed from the tool calls array caused by non-zero starting indices. + +-- request -- +{ + "messages": [ + { + "content": "2026-01-23T20:22:43.781Z\n\nI want you to do to this in order:\n1) create a file in my current directory with name \"test.txt\"\n2) list all my coder workspaces", + "role": "user" + } + ], + "model": "claude-haiku-4.5", + "n": 1, + "temperature": 1, + "parallel_tool_calls": false, + "stream_options": { + "include_usage": true + }, + "stream": true +} + +-- streaming -- +data: {"choices":[{"index":0,"delta":{"content":"Now","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":" listing","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":" your","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":" C","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":"oder workspaces:","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"name":"bmcp_coder_coder_list_workspaces"},"id":"toolu_vrtx_01DbFqUgk6aAtJ4nDBqzFWDF","index":1,"type":"function"}]}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":""},"index":1}]}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"} + +data: {"choices":[{"finish_reason":"tool_calls","index":0,"delta":{"content":null}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","usage":{"completion_tokens":58,"prompt_tokens":25939,"prompt_tokens_details":{"cached_tokens":25429},"total_tokens":25997},"model":"claude-haiku-4.5"} + +data: [DONE] + + +-- streaming/tool-call -- +data: {"choices":[{"index":0,"delta":{"content":"Done","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":"! I create","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":"d `","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":"test.txt` in","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":" your current directory.","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":" You","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":" have","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":" 1","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":" ","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":"Coder workspace:\n\n-","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":" **test-scf** (docker","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"} + +data: {"choices":[{"index":0,"delta":{"content":" template)","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"} + +data: {"choices":[{"finish_reason":"stop","index":0,"delta":{"content":null}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","usage":{"completion_tokens":39,"prompt_tokens":26166,"prompt_tokens_details":{"cached_tokens":25934},"total_tokens":26205},"model":"claude-haiku-4.5"} + +data: [DONE] + + diff --git a/intercept/chatcompletions/streaming.go b/intercept/chatcompletions/streaming.go index fcfffc4..3e48713 100644 --- a/intercept/chatcompletions/streaming.go +++ b/intercept/chatcompletions/streaming.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "net/http" + "slices" "strings" "time" @@ -156,16 +157,28 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re } } - // Builtin tools are not intercepted. - if toolCall != nil && i.getInjectedToolByName(toolCall.Name) == nil { - _ = i.recorder.RecordToolUsage(streamCtx, &recorder.ToolUsageRecord{ - InterceptionID: i.ID().String(), - MsgID: processor.getMsgID(), - Tool: toolCall.Name, - Args: i.unmarshalArgs(toolCall.Arguments), - Injected: false, - }) - toolCall = nil + if toolCall != nil { + // Builtin tools are not intercepted. + if i.getInjectedToolByName(toolCall.Name) == nil { + _ = i.recorder.RecordToolUsage(streamCtx, &recorder.ToolUsageRecord{ + InterceptionID: i.ID().String(), + MsgID: processor.getMsgID(), + Tool: toolCall.Name, + Args: i.unmarshalArgs(toolCall.Arguments), + Injected: false, + }) + toolCall = nil + } else { + // When the provider responds with only tool calls (no text content), + // no chunks are relayed to the client, so the stream is not yet + // initiated. Initiate it here so the SSE headers are sent and the + // ping ticker is started, preventing client timeout during tool invocation. + // Only initiate if no stream error, if there's an error, we'll return + // an HTTP error response instead of starting an SSE stream. + if stream.Err() == nil { + events.InitiateStream(w) + } + } } if prompt != nil { @@ -247,7 +260,13 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re // Invoke the injected tool, and use the tool result to make a subsequent request to the upstream. // Append the completion from this stream as context. - i.req.Messages = append(i.req.Messages, processor.getLastCompletion().ToParam()) + // Some providers may return tool calls with non-zero starting indices, + // resulting in nil entries in the array that must be removed. + completion := processor.getLastCompletion() + if completion != nil { + compactToolCalls(completion) + i.req.Messages = append(i.req.Messages, completion.ToParam()) + } id := toolCall.ID args := i.unmarshalArgs(toolCall.Arguments) @@ -494,3 +513,13 @@ func (s *streamProcessor) getLastUsage() openai.CompletionUsage { func (s *streamProcessor) getCumulativeUsage() openai.CompletionUsage { return s.cumulativeUsage } + +// compactToolCalls removes nil/empty tool call entries (without an ID). +func compactToolCalls(msg *openai.ChatCompletionMessage) { + if msg == nil || len(msg.ToolCalls) == 0 { + return + } + msg.ToolCalls = slices.DeleteFunc(msg.ToolCalls, func(tc openai.ChatCompletionMessageToolCallUnion) bool { + return tc.ID == "" + }) +} diff --git a/intercept/eventstream/eventstream.go b/intercept/eventstream/eventstream.go index 361dc21..b3ee96a 100644 --- a/intercept/eventstream/eventstream.go +++ b/intercept/eventstream/eventstream.go @@ -37,10 +37,18 @@ type EventStream struct { // doneCh is closed when the start loop exits. doneCh chan struct{} + + // tick sends periodic pings to keep the connection alive. + tick *time.Ticker } // NewEventStream creates a new SSE stream, with an optional payload which is used to send pings every [pingInterval]. func NewEventStream(ctx context.Context, logger slog.Logger, pingPayload []byte) *EventStream { + // Send periodic pings to keep connections alive. + // The upstream provider may also send their own pings, but we can't rely on this. + tick := time.NewTicker(time.Nanosecond) + tick.Stop() // Ticker will start after stream initiation. + return &EventStream{ ctx: ctx, logger: logger, @@ -49,9 +57,35 @@ func NewEventStream(ctx context.Context, logger slog.Logger, pingPayload []byte) eventsCh: make(chan event, 128), // Small buffer to unblock senders; once full, senders will block. doneCh: make(chan struct{}), + tick: tick, } } +// InitiateStream initiates the SSE stream by sending headers and starting the +// ping ticker. This is safe to call multiple times as only the first call has +// any effect. +func (s *EventStream) InitiateStream(w http.ResponseWriter) { + s.initiateOnce.Do(func() { + s.initiated.Store(true) + s.logger.Debug(s.ctx, "stream initiated") + + // Send headers for Server-Sent Event stream. + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + + // Send initial flush to ensure connection is established. + if err := flush(w); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Start ping ticker. + s.tick.Reset(pingInterval) + }) +} + // Start handles sending Server-Sent Event to the client. func (s *EventStream) Start(w http.ResponseWriter, r *http.Request) { // Signal completion on exit so senders don't block indefinitely after closure. @@ -59,11 +93,7 @@ func (s *EventStream) Start(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - // Send periodic pings to keep connections alive. - // The upstream provider may also send their own pings, but we can't rely on this. - tick := time.NewTicker(time.Nanosecond) - tick.Stop() // Ticker will start after stream initiation. - defer tick.Stop() + defer s.tick.Stop() for { var ( @@ -83,33 +113,9 @@ func (s *EventStream) Start(w http.ResponseWriter, r *http.Request) { return } - // Initiate the stream once the first event is received. - s.initiateOnce.Do(func() { - s.initiated.Store(true) - s.logger.Debug(ctx, "stream initiated") - - // Send headers for Server-Sent Event stream. - // - // We only send these once an event is processed because an error can occur in the upstream - // request prior to the stream starting, in which case the SSE headers are inappropriate to - // send to the client. - // - // See use of IsStreaming(). - w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Cache-Control", "no-cache") - w.Header().Set("Connection", "keep-alive") - w.Header().Set("X-Accel-Buffering", "no") - - // Send initial flush to ensure connection is established. - if err := flush(w); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - // Start ping ticker. - tick.Reset(pingInterval) - }) - case <-tick.C: + // Initiate the stream on first event (if not already initiated). + s.InitiateStream(w) + case <-s.tick.C: ev = s.pingPayload if ev == nil { continue @@ -132,7 +138,7 @@ func (s *EventStream) Start(w http.ResponseWriter, r *http.Request) { // Reset the timer once we've flushed some data to the stream, since it's already fresh. // No need to ping in that case. - tick.Reset(pingInterval) + s.tick.Reset(pingInterval) } }