Skip to content

Commit 8328a98

Browse files
committed
fix(mothership): queue supersede crash
1 parent f16d17b commit 8328a98

6 files changed

Lines changed: 156 additions & 19 deletions

File tree

apps/sim/lib/copilot/request/go/stream.test.ts

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,24 @@ import {
1010
MothershipStreamV1ToolOutcome,
1111
MothershipStreamV1ToolPhase,
1212
} from '@/lib/copilot/generated/mothership-stream-v1'
13+
14+
vi.mock('@/lib/copilot/request/session', async () => {
15+
const actual = await vi.importActual<typeof import('@/lib/copilot/request/session')>(
16+
'@/lib/copilot/request/session'
17+
)
18+
return {
19+
...actual,
20+
hasAbortMarker: vi.fn().mockResolvedValue(false),
21+
}
22+
})
23+
1324
import {
1425
buildPreviewContentUpdate,
1526
decodeJsonStringPrefix,
1627
extractEditContent,
1728
runStreamLoop,
1829
} from '@/lib/copilot/request/go/stream'
19-
import { createEvent } from '@/lib/copilot/request/session'
30+
import { createEvent, hasAbortMarker } from '@/lib/copilot/request/session'
2031
import { RequestTraceV1Outcome, TraceCollector } from '@/lib/copilot/request/trace'
2132
import type { ExecutionContext, StreamingContext } from '@/lib/copilot/request/types'
2233

@@ -285,6 +296,71 @@ describe('copilot go stream helpers', () => {
285296
).toBe(true)
286297
})
287298

299+
it('reclassifies as aborted when the body closes without terminal but the abort marker is set', async () => {
300+
const textEvent = createEvent({
301+
streamId: 'stream-1',
302+
cursor: '1',
303+
seq: 1,
304+
requestId: 'req-1',
305+
type: MothershipStreamV1EventType.text,
306+
payload: {
307+
channel: 'assistant',
308+
text: 'partial response',
309+
},
310+
})
311+
312+
vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent]))
313+
vi.mocked(hasAbortMarker).mockResolvedValueOnce(true)
314+
315+
const context = createStreamingContext()
316+
const execContext: ExecutionContext = {
317+
userId: 'user-1',
318+
workflowId: 'workflow-1',
319+
}
320+
321+
await runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
322+
timeout: 1000,
323+
})
324+
325+
expect(hasAbortMarker).toHaveBeenCalledWith(context.messageId)
326+
expect(context.wasAborted).toBe(true)
327+
expect(
328+
context.errors.some((message) =>
329+
message.includes('Copilot backend stream ended before a terminal event')
330+
)
331+
).toBe(false)
332+
})
333+
334+
it('still fails closed when the body closes without terminal and the abort marker check throws', async () => {
335+
const textEvent = createEvent({
336+
streamId: 'stream-1',
337+
cursor: '1',
338+
seq: 1,
339+
requestId: 'req-1',
340+
type: MothershipStreamV1EventType.text,
341+
payload: {
342+
channel: 'assistant',
343+
text: 'partial response',
344+
},
345+
})
346+
347+
vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent]))
348+
vi.mocked(hasAbortMarker).mockRejectedValueOnce(new Error('redis unavailable'))
349+
350+
const context = createStreamingContext()
351+
const execContext: ExecutionContext = {
352+
userId: 'user-1',
353+
workflowId: 'workflow-1',
354+
}
355+
356+
await expect(
357+
runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
358+
timeout: 1000,
359+
})
360+
).rejects.toThrow('Copilot backend stream ended before a terminal event')
361+
expect(context.wasAborted).toBe(false)
362+
})
363+
288364
it('fails closed when the shared stream receives an invalid event', async () => {
289365
vi.mocked(fetch).mockResolvedValueOnce(
290366
createSseResponse([

apps/sim/lib/copilot/request/go/stream.ts

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import {
3131
import { getCopilotTracer } from '@/lib/copilot/request/otel'
3232
import {
3333
eventToStreamEvent,
34+
hasAbortMarker,
3435
isSubagentSpanStreamEvent,
3536
parsePersistedStreamEventEnvelope,
3637
} from '@/lib/copilot/request/session'
@@ -436,16 +437,31 @@ export async function runStreamLoop(
436437
})
437438

438439
if (!context.streamComplete && !abortSignal?.aborted && !context.wasAborted) {
439-
const streamPath = new URL(fetchUrl).pathname
440-
const message = `Copilot backend stream ended before a terminal event on ${streamPath}`
441-
context.errors.push(message)
442-
logger.error('Copilot backend stream ended before a terminal event', {
443-
path: streamPath,
444-
requestId: context.requestId,
445-
messageId: context.messageId,
446-
})
447-
endedOn = CopilotSseCloseReason.ClosedNoTerminal
448-
throw new CopilotBackendError(message, { status: 503 })
440+
let abortRequested = false
441+
try {
442+
abortRequested = await hasAbortMarker(context.messageId)
443+
} catch (error) {
444+
logger.warn('Failed to read abort marker at body close', {
445+
streamId: context.messageId,
446+
error: error instanceof Error ? error.message : String(error),
447+
})
448+
}
449+
450+
if (abortRequested) {
451+
context.wasAborted = true
452+
endedOn = CopilotSseCloseReason.Aborted
453+
} else {
454+
const streamPath = new URL(fetchUrl).pathname
455+
const message = `Copilot backend stream ended before a terminal event on ${streamPath}`
456+
context.errors.push(message)
457+
logger.error('Copilot backend stream ended before a terminal event', {
458+
path: streamPath,
459+
requestId: context.requestId,
460+
messageId: context.messageId,
461+
})
462+
endedOn = CopilotSseCloseReason.ClosedNoTerminal
463+
throw new CopilotBackendError(message, { status: 503 })
464+
}
449465
}
450466
} catch (error) {
451467
if (error instanceof FatalSseEventError && !context.errors.includes(error.message)) {

apps/sim/lib/copilot/request/lifecycle/headless.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,12 @@ export async function runHeadlessCopilotLifecycle(
5353
simRequestId,
5454
otelContext,
5555
})
56-
outcome = options.abortSignal?.aborted
57-
? RequestTraceV1Outcome.cancelled
58-
: result.success
59-
? RequestTraceV1Outcome.success
60-
: RequestTraceV1Outcome.error
56+
outcome =
57+
options.abortSignal?.aborted || result.cancelled
58+
? RequestTraceV1Outcome.cancelled
59+
: result.success
60+
? RequestTraceV1Outcome.success
61+
: RequestTraceV1Outcome.error
6162
return result
6263
} catch (error) {
6364
outcome = options.abortSignal?.aborted

apps/sim/lib/copilot/request/lifecycle/start.test.ts

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ import { propagation, trace } from '@opentelemetry/api'
66
import { W3CTraceContextPropagator } from '@opentelemetry/core'
77
import { BasicTracerProvider } from '@opentelemetry/sdk-trace-base'
88
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
9-
import { MothershipStreamV1EventType } from '@/lib/copilot/generated/mothership-stream-v1'
9+
import {
10+
MothershipStreamV1CompletionStatus,
11+
MothershipStreamV1EventType,
12+
} from '@/lib/copilot/generated/mothership-stream-v1'
1013

1114
const {
1215
runCopilotLifecycle,
@@ -60,6 +63,7 @@ vi.mock('@/lib/copilot/request/session', () => ({
6063
registerActiveStream: vi.fn(),
6164
unregisterActiveStream: vi.fn(),
6265
startAbortPoller: vi.fn().mockReturnValue(setInterval(() => {}, 999999)),
66+
isExplicitStopReason: vi.fn().mockReturnValue(false),
6367
SSE_RESPONSE_HEADERS: {},
6468
StreamWriter: vi.fn().mockImplementation(() => ({
6569
attach: vi.fn().mockImplementation((ctrl: ReadableStreamDefaultController) => {
@@ -211,6 +215,46 @@ describe('createSSEStream terminal error handling', () => {
211215
expect(scheduleBufferCleanup).toHaveBeenCalledWith('stream-1')
212216
})
213217

218+
it('publishes a cancelled completion (not an error) when the orchestrator reports cancelled without abortSignal aborted', async () => {
219+
runCopilotLifecycle.mockResolvedValue({
220+
success: false,
221+
cancelled: true,
222+
content: '',
223+
contentBlocks: [],
224+
toolCalls: [],
225+
})
226+
227+
const stream = createSSEStream({
228+
requestPayload: { message: 'hello' },
229+
userId: 'user-1',
230+
streamId: 'stream-1',
231+
executionId: 'exec-1',
232+
runId: 'run-1',
233+
currentChat: null,
234+
isNewChat: false,
235+
message: 'hello',
236+
titleModel: 'gpt-5.4',
237+
requestId: 'req-cancelled',
238+
orchestrateOptions: {},
239+
})
240+
241+
await drainStream(stream)
242+
243+
expect(appendEvent).not.toHaveBeenCalledWith(
244+
expect.objectContaining({
245+
type: MothershipStreamV1EventType.error,
246+
})
247+
)
248+
expect(appendEvent).toHaveBeenCalledWith(
249+
expect.objectContaining({
250+
type: MothershipStreamV1EventType.complete,
251+
payload: expect.objectContaining({
252+
status: MothershipStreamV1CompletionStatus.cancelled,
253+
}),
254+
})
255+
)
256+
})
257+
214258
it('passes an OTel context into the streaming lifecycle', async () => {
215259
let lifecycleTraceparent = ''
216260
runCopilotLifecycle.mockImplementation(async (_payload, options) => {

apps/sim/lib/copilot/request/lifecycle/start.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
266266
// 3. Otherwise → error.
267267
outcome = result.success
268268
? RequestTraceV1Outcome.success
269-
: abortController.signal.aborted || publisher.clientDisconnected
269+
: result.cancelled || abortController.signal.aborted || publisher.clientDisconnected
270270
? RequestTraceV1Outcome.cancelled
271271
: RequestTraceV1Outcome.error
272272
if (outcome === RequestTraceV1Outcome.cancelled) {

apps/sim/lib/copilot/request/session/abort.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ const pendingChatStreams = new Map<
1717
{ promise: Promise<void>; resolve: () => void; streamId: string }
1818
>()
1919

20-
const DEFAULT_ABORT_POLL_MS = 1000
20+
const DEFAULT_ABORT_POLL_MS = 250
2121

2222
/**
2323
* TTL for the per-chat stream lock. Kept short so that if the Sim pod

0 commit comments

Comments
 (0)