
Commit f299e00

fix(cli): process queued messages one at a time
Add a ref-based lock to prevent a race condition where React state batching caused all queued messages to be sent simultaneously when the stream ended.
1 parent 852e3e3 commit f299e00
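
The fix hinges on the lock living in a ref rather than React state: a ref is read and written synchronously, so when several "stream ended" updates are batched into one render they cannot all observe the lock as free. Below is a minimal sketch of the idea; the hook shape and names (useMessageQueue, processNext, isProcessingQueueRef, isQueuePausedRef) are assumptions taken from identifiers in this diff, not the actual implementation.

// Sketch only: hypothetical hook internals illustrating the ref-based lock.
import { useCallback, useRef, useState } from 'react'

type QueuedMessage = { id: string; content: string }

export function useMessageQueue(send: (message: QueuedMessage) => void) {
  const [queue, setQueue] = useState<QueuedMessage[]>([])
  const isQueuePausedRef = useRef(false)
  // Ref-based lock: reads and writes are synchronous, so two "stream ended"
  // updates batched into one render cannot both see the lock as free.
  const isProcessingQueueRef = useRef(false)

  const processNext = useCallback(() => {
    if (isProcessingQueueRef.current || isQueuePausedRef.current) return
    const next = queue[0]
    if (!next) return
    isProcessingQueueRef.current = true // released when the stream finishes, aborts, or errors
    setQueue((prev) => prev.slice(1))
    send(next)
  }, [queue, send])

  const resumeQueue = useCallback(() => {
    isQueuePausedRef.current = false
  }, [])

  return { queue, processNext, resumeQueue, isQueuePausedRef, isProcessingQueueRef }
}

The diffs below thread this ref from the queue hook in chat.tsx into the send-message helpers, and add tests asserting that the lock is released on abort, on error, and when a run finalizes.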

File tree

6 files changed: +358 -46 lines changed

cli/src/chat.tsx

Lines changed: 2 additions & 0 deletions
@@ -585,6 +585,7 @@ export const Chat = ({
     resumeQueue,
     clearQueue,
     isQueuePausedRef,
+    isProcessingQueueRef,
   } = useMessageQueue(
     (message: QueuedMessage) =>
       sendMessageRef.current?.({
@@ -682,6 +683,7 @@ export const Chat = ({
     scrollToLatest,
     onTimerEvent: () => {}, // No-op for now
     isQueuePausedRef,
+    isProcessingQueueRef,
     resumeQueue,
     continueChat,
     continueChatId,

cli/src/hooks/helpers/__tests__/send-message.test.ts

Lines changed: 216 additions & 1 deletion
@@ -28,7 +28,7 @@ ensureEnv()
 
 const { useChatStore } = await import('../../../state/chat-store')
 const { createStreamController } = await import('../../stream-state')
-const { setupStreamingContext, handleRunError } = await import(
+const { setupStreamingContext, handleRunError, finalizeQueueState } = await import(
   '../send-message'
 )
 const { createBatchedMessageUpdater } = await import(
@@ -172,6 +172,94 @@ describe('setupStreamingContext', () => {
     expect(canProcessQueue).toBe(false)
   })
 
+  test('abort resets isProcessingQueueRef to false', () => {
+    let messages = createBaseMessages()
+    const streamRefs = createStreamController()
+    const timerController = createMockTimerController()
+    const abortControllerRef = { current: null as AbortController | null }
+    const isProcessingQueueRef = { current: true }
+
+    const { abortController } = setupStreamingContext({
+      aiMessageId: 'ai-1',
+      timerController,
+      setMessages: (fn: any) => {
+        messages = fn(messages)
+      },
+      streamRefs,
+      abortControllerRef,
+      setStreamStatus: () => {},
+      setCanProcessQueue: () => {},
+      isProcessingQueueRef,
+      updateChainInProgress: () => {},
+      setIsRetrying: () => {},
+    })
+
+    // Verify ref starts as true
+    expect(isProcessingQueueRef.current).toBe(true)
+
+    // Trigger abort
+    abortController.abort()
+
+    // Verify isProcessingQueueRef is reset to false after abort
+    expect(isProcessingQueueRef.current).toBe(false)
+  })
+
+  test('abort with both isProcessingQueueRef and isQueuePausedRef handles correctly', () => {
+    let messages = createBaseMessages()
+    const streamRefs = createStreamController()
+    const timerController = createMockTimerController()
+    const abortControllerRef = { current: null as AbortController | null }
+    const isProcessingQueueRef = { current: true }
+    const isQueuePausedRef = { current: true }
+    let streamStatus = 'streaming' as StreamStatus
+    let canProcessQueue = true
+    let chainInProgress = true
+    let isRetrying = true
+
+    const { abortController } = setupStreamingContext({
+      aiMessageId: 'ai-1',
+      timerController,
+      setMessages: (fn: any) => {
+        messages = fn(messages)
+      },
+      streamRefs,
+      abortControllerRef,
+      setStreamStatus: (status) => {
+        streamStatus = status
+      },
+      setCanProcessQueue: (can) => {
+        canProcessQueue = can
+      },
+      isQueuePausedRef,
+      isProcessingQueueRef,
+      updateChainInProgress: (value) => {
+        chainInProgress = value
+      },
+      setIsRetrying: (value) => {
+        isRetrying = value
+      },
+    })
+
+    // Sanity check initial state
+    expect(isProcessingQueueRef.current).toBe(true)
+    expect(isQueuePausedRef.current).toBe(true)
+    expect(streamStatus).toBe('streaming')
+    expect(canProcessQueue).toBe(true)
+    expect(chainInProgress).toBe(true)
+    expect(isRetrying).toBe(true)
+
+    // Trigger abort
+    abortController.abort()
+
+    // After abort, lock should be released, queue should respect pause state,
+    // chain and retry flags should be cleared, and stream should be idle.
+    expect(isProcessingQueueRef.current).toBe(false)
+    expect(canProcessQueue).toBe(false)
+    expect(chainInProgress).toBe(false)
+    expect(isRetrying).toBe(false)
+    expect(streamStatus).toBe('idle')
+  })
+
   test('abort handler stores abortController in ref', () => {
     let messages = createBaseMessages()
     const streamRefs = createStreamController()
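
The two new abort tests above pin down how aborting a stream must restore queue state: the processing lock is released, the chain and retry flags are cleared, the stream goes idle, and a paused queue stays paused. A minimal sketch of an abort listener with that behaviour follows; it is derived from the assertions, not the real send-message source, and setupStreamingContext attaching something equivalent to its AbortController is an assumption.

// Sketch only: the abort behaviour asserted above; names and types beyond
// those appearing in the tests are assumptions.
type MutableRef<T> = { current: T }

function registerAbortHandler(args: {
  abortController: AbortController
  setStreamStatus: (status: 'idle' | 'streaming') => void
  setCanProcessQueue: (can: boolean) => void
  updateChainInProgress: (value: boolean) => void
  setIsRetrying: (value: boolean) => void
  isQueuePausedRef?: MutableRef<boolean>
  isProcessingQueueRef?: MutableRef<boolean>
}): void {
  args.abortController.signal.addEventListener('abort', () => {
    // Release the one-at-a-time lock and clear in-flight flags.
    if (args.isProcessingQueueRef) args.isProcessingQueueRef.current = false
    args.updateChainInProgress(false)
    args.setIsRetrying(false)
    // A paused queue stays paused after an abort.
    args.setCanProcessQueue(!args.isQueuePausedRef?.current)
    args.setStreamStatus('idle')
  })
}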
@@ -230,6 +318,61 @@ describe('setupStreamingContext', () => {
   })
 })
 
+describe('finalizeQueueState', () => {
+  test('sets stream status to idle and resets queue state', () => {
+    let streamStatus = 'streaming' as StreamStatus
+    let canProcessQueue = false
+    let chainInProgress = true
+    const isProcessingQueueRef = { current: true }
+
+    finalizeQueueState({
+      setStreamStatus: (status) => { streamStatus = status },
+      setCanProcessQueue: (can) => { canProcessQueue = can },
+      updateChainInProgress: (value) => { chainInProgress = value },
+      isProcessingQueueRef,
+    })
+
+    expect(streamStatus).toBe('idle')
+    expect(canProcessQueue).toBe(true)
+    expect(chainInProgress).toBe(false)
+    expect(isProcessingQueueRef.current).toBe(false)
+  })
+
+  test('calls resumeQueue instead of setCanProcessQueue when provided', () => {
+    let streamStatus = 'streaming' as StreamStatus
+    let canProcessQueueCalled = false
+    let resumeQueueCalled = false
+    let chainInProgress = true
+
+    finalizeQueueState({
+      setStreamStatus: (status) => { streamStatus = status },
+      setCanProcessQueue: () => { canProcessQueueCalled = true },
+      updateChainInProgress: (value) => { chainInProgress = value },
+      resumeQueue: () => { resumeQueueCalled = true },
+    })
+
+    expect(streamStatus).toBe('idle')
+    expect(resumeQueueCalled).toBe(true)
+    expect(canProcessQueueCalled).toBe(false)
+    expect(chainInProgress).toBe(false)
+  })
+
+  test('respects isQueuePausedRef when no resumeQueue provided', () => {
+    let canProcessQueue = true
+    const isQueuePausedRef = { current: true }
+
+    finalizeQueueState({
+      setStreamStatus: () => {},
+      setCanProcessQueue: (can) => { canProcessQueue = can },
+      updateChainInProgress: () => {},
+      isQueuePausedRef,
+    })
+
+    // When queue is paused, canProcessQueue should be false
+    expect(canProcessQueue).toBe(false)
+  })
+})
+
 describe('handleRunError', () => {
   let originalGetState: typeof useChatStore.getState
 
@@ -376,6 +519,78 @@ describe('handleRunError', () => {
     expect(setInputModeMock).not.toHaveBeenCalled()
   })
 
+  test('resets isProcessingQueueRef to false on error', () => {
+    let messages: ChatMessage[] = [
+      {
+        id: 'ai-1',
+        variant: 'ai',
+        content: '',
+        blocks: [],
+        timestamp: 'now',
+      },
+    ]
+
+    const timerController = createMockTimerController()
+    const updater = createBatchedMessageUpdater('ai-1', (fn: any) => {
+      messages = fn(messages)
+    })
+    const isProcessingQueueRef = { current: true }
+
+    // Verify ref starts as true
+    expect(isProcessingQueueRef.current).toBe(true)
+
+    handleRunError({
+      error: new Error('Some error'),
+      aiMessageId: 'ai-1',
+      timerController,
+      updater,
+      setIsRetrying: () => {},
+      setStreamStatus: () => {},
+      setCanProcessQueue: () => {},
+      updateChainInProgress: () => {},
+      isProcessingQueueRef,
+    })
+
+    // Verify isProcessingQueueRef is reset to false
+    expect(isProcessingQueueRef.current).toBe(false)
+  })
+
+  test('respects isQueuePausedRef when setting canProcessQueue on error', () => {
+    let messages: ChatMessage[] = [
+      {
+        id: 'ai-1',
+        variant: 'ai',
+        content: '',
+        blocks: [],
+        timestamp: 'now',
+      },
+    ]
+
+    const timerController = createMockTimerController()
+    const updater = createBatchedMessageUpdater('ai-1', (fn: any) => {
+      messages = fn(messages)
+    })
+    const isQueuePausedRef = { current: true }
+    let canProcessQueue = true
+
+    handleRunError({
+      error: new Error('Some error'),
+      aiMessageId: 'ai-1',
+      timerController,
+      updater,
+      setIsRetrying: () => {},
+      setStreamStatus: () => {},
+      setCanProcessQueue: (can: boolean) => {
+        canProcessQueue = can
+      },
+      updateChainInProgress: () => {},
+      isQueuePausedRef,
+    })
+
+    // When queue is paused, canProcessQueue should be false
+    expect(canProcessQueue).toBe(false)
+  })
+
   test('Payment required error (402) uses setError, invalidates queries, and switches input mode', () => {
     let messages: ChatMessage[] = [
       {
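
Taken together, the describe('finalizeQueueState') tests above fix the helper's contract: mark the stream idle, clear the chain-in-progress flag, release the processing lock, and re-enable the queue through resumeQueue when it is provided, otherwise through setCanProcessQueue while keeping a paused queue paused. The handleRunError tests assert the same lock release and pause handling on the error path. A minimal sketch satisfying those assertions follows; argument types beyond the names used in the tests are assumptions, not the actual source.

// Sketch only: reconstructed from the assertions above, not the real
// implementation in cli/src/hooks/helpers/send-message.ts.
type QueueRef<T> = { current: T }

interface FinalizeQueueStateArgs {
  setStreamStatus: (status: 'idle' | 'streaming') => void
  setCanProcessQueue: (can: boolean) => void
  updateChainInProgress: (value: boolean) => void
  resumeQueue?: () => void
  isQueuePausedRef?: QueueRef<boolean>
  isProcessingQueueRef?: QueueRef<boolean>
}

export function finalizeQueueState({
  setStreamStatus,
  setCanProcessQueue,
  updateChainInProgress,
  resumeQueue,
  isQueuePausedRef,
  isProcessingQueueRef,
}: FinalizeQueueStateArgs): void {
  setStreamStatus('idle')
  updateChainInProgress(false)
  // Release the one-at-a-time lock so the next queued message can be sent.
  if (isProcessingQueueRef) isProcessingQueueRef.current = false
  if (resumeQueue) {
    // Let the queue hook decide how to resume.
    resumeQueue()
  } else {
    // Fall back to the flag, but keep a paused queue paused.
    setCanProcessQueue(!isQueuePausedRef?.current)
  }
}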
