Skip to content

Commit 773cd84

Browse files
fix(mothership): reconcile stuck conversation_id against Redis lock to clear stuck-yellow task tiles (#4556)
* fix(mothership): reconcile stuck conversation_id against Redis lock to clear stuck-yellow task tiles copilot_chats.conversation_id has no TTL/heartbeat, so when a stream process dies before the clear path runs (pod OOM, SIGKILL, uncaught throw, deploy mid-stream) the column is orphaned and the task tile renders yellow forever. The Redis lock at copilot:chat-stream-lock:<chatId> is the canonical liveness signal and self-heals via 60s TTL + 20s heartbeat, but the mothership APIs weren't consulting it. Adds read-time reconciliation: a batched MGET helper checks whether each persisted conversation_id still has a live Redis lock, and both GET /api/mothership/chats and GET /api/mothership/chats/[chatId] rewrite the marker to null when the lock has expired. No DB writes; stuck rows self-heal on next fetch. * test(mothership): clarify test name to reflect that getActiveChatStreamIds is called with empty candidateIds * address comments * fix state machine issue * cleanup code and fix types --------- Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
1 parent d5c2ead commit 773cd84

13 files changed

Lines changed: 955 additions & 37 deletions

File tree

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import { copilotHttpMock, copilotHttpMockFns } from '@sim/testing'
5+
import { NextRequest } from 'next/server'
6+
import { beforeEach, describe, expect, it, vi } from 'vitest'
7+
8+
const {
9+
mockGetAccessibleCopilotChat,
10+
mockReconcileChatStreamMarkers,
11+
mockReadEvents,
12+
mockReadFilePreviewSessions,
13+
mockGetLatestRunForStream,
14+
} = vi.hoisted(() => ({
15+
mockGetAccessibleCopilotChat: vi.fn(),
16+
mockReconcileChatStreamMarkers: vi.fn(),
17+
mockReadEvents: vi.fn(),
18+
mockReadFilePreviewSessions: vi.fn(),
19+
mockGetLatestRunForStream: vi.fn(),
20+
}))
21+
22+
vi.mock('@sim/db', () => ({ db: {} }))
23+
24+
vi.mock('@sim/db/schema', () => ({
25+
copilotChats: {
26+
id: 'copilotChats.id',
27+
userId: 'copilotChats.userId',
28+
type: 'copilotChats.type',
29+
updatedAt: 'copilotChats.updatedAt',
30+
lastSeenAt: 'copilotChats.lastSeenAt',
31+
},
32+
}))
33+
34+
vi.mock('drizzle-orm', () => ({
35+
and: vi.fn((...conditions: unknown[]) => ({ type: 'and', conditions })),
36+
eq: vi.fn((field: unknown, value: unknown) => ({ type: 'eq', field, value })),
37+
sql: Object.assign(
38+
vi.fn((strings: TemplateStringsArray, ...values: unknown[]) => ({
39+
type: 'sql',
40+
strings,
41+
values,
42+
})),
43+
{ raw: vi.fn() }
44+
),
45+
}))
46+
47+
vi.mock('@/lib/copilot/request/http', () => copilotHttpMock)
48+
49+
vi.mock('@/lib/copilot/chat/lifecycle', () => ({
50+
getAccessibleCopilotChat: mockGetAccessibleCopilotChat,
51+
}))
52+
53+
vi.mock('@/lib/copilot/chat/stream-liveness', () => ({
54+
reconcileChatStreamMarkers: mockReconcileChatStreamMarkers,
55+
}))
56+
57+
vi.mock('@/lib/copilot/request/session/buffer', () => ({
58+
readEvents: mockReadEvents,
59+
}))
60+
61+
vi.mock('@/lib/copilot/request/session/file-preview-session', () => ({
62+
readFilePreviewSessions: mockReadFilePreviewSessions,
63+
}))
64+
65+
vi.mock('@/lib/copilot/async-runs/repository', () => ({
66+
getLatestRunForStream: mockGetLatestRunForStream,
67+
}))
68+
69+
vi.mock('@/lib/copilot/request/session/types', () => ({
70+
toStreamBatchEvent: (e: unknown) => e,
71+
}))
72+
73+
vi.mock('@/lib/copilot/chat/effective-transcript', () => ({
74+
buildEffectiveChatTranscript: ({ messages }: { messages: unknown[] }) => messages,
75+
}))
76+
77+
vi.mock('@/lib/copilot/chat/persisted-message', () => ({
78+
normalizeMessage: (m: unknown) => m,
79+
}))
80+
81+
vi.mock('@/lib/copilot/tasks', () => ({
82+
taskPubSub: { publishStatusChanged: vi.fn() },
83+
}))
84+
85+
vi.mock('@/lib/posthog/server', () => ({
86+
captureServerEvent: vi.fn(),
87+
}))
88+
89+
import { GET } from '@/app/api/mothership/chats/[chatId]/route'
90+
91+
function makeContext(chatId: string) {
92+
return { params: Promise.resolve({ chatId }) }
93+
}
94+
95+
function createRequest(chatId: string) {
96+
return new NextRequest(`http://localhost:3000/api/mothership/chats/${chatId}`, {
97+
method: 'GET',
98+
})
99+
}
100+
101+
describe('GET /api/mothership/chats/[chatId]', () => {
102+
beforeEach(() => {
103+
vi.clearAllMocks()
104+
copilotHttpMockFns.mockAuthenticateCopilotRequestSessionOnly.mockResolvedValue({
105+
userId: 'user-1',
106+
isAuthenticated: true,
107+
})
108+
mockReconcileChatStreamMarkers.mockImplementation(
109+
async (candidates: Array<{ chatId: string; streamId: string | null }>) =>
110+
new Map(
111+
candidates.map((candidate) => [
112+
candidate.chatId,
113+
{
114+
chatId: candidate.chatId,
115+
streamId: candidate.streamId,
116+
status: candidate.streamId ? 'active' : 'inactive',
117+
},
118+
])
119+
)
120+
)
121+
mockReadEvents.mockResolvedValue([])
122+
mockReadFilePreviewSessions.mockResolvedValue([])
123+
mockGetLatestRunForStream.mockResolvedValue(null)
124+
})
125+
126+
it('clears activeStreamId when the redis lock has expired (stuck-yellow bug)', async () => {
127+
mockGetAccessibleCopilotChat.mockResolvedValueOnce({
128+
id: 'chat-stuck',
129+
type: 'mothership',
130+
title: 'Stuck',
131+
messages: [],
132+
resources: [],
133+
conversationId: 'stream-orphaned',
134+
createdAt: new Date('2026-05-11T12:00:00Z'),
135+
updatedAt: new Date('2026-05-11T12:00:00Z'),
136+
})
137+
mockReconcileChatStreamMarkers.mockResolvedValueOnce(
138+
new Map([['chat-stuck', { chatId: 'chat-stuck', streamId: null, status: 'inactive' }]])
139+
)
140+
141+
const response = await GET(createRequest('chat-stuck'), makeContext('chat-stuck'))
142+
expect(response.status).toBe(200)
143+
const body = await response.json()
144+
145+
expect(mockReconcileChatStreamMarkers).toHaveBeenCalledWith(
146+
[{ chatId: 'chat-stuck', streamId: 'stream-orphaned' }],
147+
{ repairVerifiedStaleMarkers: true }
148+
)
149+
expect(body.success).toBe(true)
150+
expect(body.chat.activeStreamId).toBeNull()
151+
expect(body.chat.streamSnapshot).toBeUndefined()
152+
expect(mockReadEvents).not.toHaveBeenCalled()
153+
})
154+
155+
it('returns the live activeStreamId when redis confirms the lock', async () => {
156+
mockGetAccessibleCopilotChat.mockResolvedValueOnce({
157+
id: 'chat-live',
158+
type: 'mothership',
159+
title: 'Live',
160+
messages: [],
161+
resources: [],
162+
conversationId: 'stream-live',
163+
createdAt: new Date('2026-05-11T12:00:00Z'),
164+
updatedAt: new Date('2026-05-11T12:00:00Z'),
165+
})
166+
mockGetLatestRunForStream.mockResolvedValueOnce({ status: 'active' })
167+
168+
const response = await GET(createRequest('chat-live'), makeContext('chat-live'))
169+
expect(response.status).toBe(200)
170+
const body = await response.json()
171+
172+
expect(body.chat.activeStreamId).toBe('stream-live')
173+
expect(mockReadEvents).toHaveBeenCalledWith('stream-live', '0')
174+
expect(body.chat.streamSnapshot).toBeDefined()
175+
expect(body.chat.streamSnapshot.status).toBe('active')
176+
})
177+
178+
it('uses the Redis lock owner when it differs from a stale persisted streamId', async () => {
179+
mockGetAccessibleCopilotChat.mockResolvedValueOnce({
180+
id: 'chat-mismatch',
181+
type: 'mothership',
182+
title: 'Mismatch',
183+
messages: [],
184+
resources: [],
185+
conversationId: 'stream-stale',
186+
createdAt: new Date('2026-05-11T12:00:00Z'),
187+
updatedAt: new Date('2026-05-11T12:00:00Z'),
188+
})
189+
mockReconcileChatStreamMarkers.mockResolvedValueOnce(
190+
new Map([
191+
['chat-mismatch', { chatId: 'chat-mismatch', streamId: 'stream-live', status: 'active' }],
192+
])
193+
)
194+
195+
const response = await GET(createRequest('chat-mismatch'), makeContext('chat-mismatch'))
196+
expect(response.status).toBe(200)
197+
const body = await response.json()
198+
199+
expect(body.chat.activeStreamId).toBe('stream-live')
200+
expect(mockReadEvents).toHaveBeenCalledWith('stream-live', '0')
201+
})
202+
203+
it('returns null when the persisted stream marker is already null', async () => {
204+
mockGetAccessibleCopilotChat.mockResolvedValueOnce({
205+
id: 'chat-idle',
206+
type: 'mothership',
207+
title: 'Idle',
208+
messages: [],
209+
resources: [],
210+
conversationId: null,
211+
createdAt: new Date('2026-05-11T12:00:00Z'),
212+
updatedAt: new Date('2026-05-11T12:00:00Z'),
213+
})
214+
215+
const response = await GET(createRequest('chat-idle'), makeContext('chat-idle'))
216+
expect(response.status).toBe(200)
217+
218+
expect(mockReconcileChatStreamMarkers).toHaveBeenCalledWith(
219+
[{ chatId: 'chat-idle', streamId: null }],
220+
{ repairVerifiedStaleMarkers: true }
221+
)
222+
const body = await response.json()
223+
expect(body.chat.activeStreamId).toBeNull()
224+
})
225+
226+
it('returns 404 when the chat does not exist', async () => {
227+
mockGetAccessibleCopilotChat.mockResolvedValueOnce(null)
228+
229+
const response = await GET(createRequest('chat-missing'), makeContext('chat-missing'))
230+
expect(response.status).toBe(404)
231+
expect(mockReconcileChatStreamMarkers).not.toHaveBeenCalled()
232+
})
233+
234+
it('returns 401 when unauthenticated', async () => {
235+
copilotHttpMockFns.mockAuthenticateCopilotRequestSessionOnly.mockResolvedValueOnce({
236+
userId: null,
237+
isAuthenticated: false,
238+
})
239+
240+
const response = await GET(createRequest('chat-x'), makeContext('chat-x'))
241+
expect(response.status).toBe(401)
242+
expect(mockGetAccessibleCopilotChat).not.toHaveBeenCalled()
243+
expect(mockReconcileChatStreamMarkers).not.toHaveBeenCalled()
244+
})
245+
})

apps/sim/app/api/mothership/chats/[chatId]/route.ts

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository'
1414
import { buildEffectiveChatTranscript } from '@/lib/copilot/chat/effective-transcript'
1515
import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle'
1616
import { normalizeMessage } from '@/lib/copilot/chat/persisted-message'
17+
import { reconcileChatStreamMarkers } from '@/lib/copilot/chat/stream-liveness'
1718
import {
1819
authenticateCopilotRequestSessionOnly,
1920
createInternalServerErrorResponse,
@@ -52,23 +53,29 @@ export const GET = withRouteHandler(
5253
status: string
5354
} | null = null
5455

55-
if (chat.conversationId) {
56+
const reconciledMarkers = await reconcileChatStreamMarkers(
57+
[{ chatId: chat.id, streamId: chat.conversationId }],
58+
{ repairVerifiedStaleMarkers: true }
59+
)
60+
const liveStreamId = reconciledMarkers.get(chat.id)?.streamId ?? null
61+
62+
if (liveStreamId) {
5663
try {
5764
const [events, previewSessions] = await Promise.all([
58-
readEvents(chat.conversationId, '0'),
59-
readFilePreviewSessions(chat.conversationId).catch((error) => {
65+
readEvents(liveStreamId, '0'),
66+
readFilePreviewSessions(liveStreamId).catch((error) => {
6067
logger.warn('Failed to read preview sessions for mothership chat', {
6168
chatId,
62-
conversationId: chat.conversationId,
69+
streamId: liveStreamId,
6370
error: toError(error).message,
6471
})
6572
return []
6673
}),
6774
])
68-
const run = await getLatestRunForStream(chat.conversationId, userId).catch((error) => {
75+
const run = await getLatestRunForStream(liveStreamId, userId).catch((error) => {
6976
logger.warn('Failed to fetch latest run for mothership chat snapshot', {
7077
chatId,
71-
conversationId: chat.conversationId,
78+
streamId: liveStreamId,
7279
error: toError(error).message,
7380
})
7481
return null
@@ -87,7 +94,7 @@ export const GET = withRouteHandler(
8794
} catch (error) {
8895
logger.warn('Failed to read stream snapshot for mothership chat', {
8996
chatId,
90-
conversationId: chat.conversationId,
97+
streamId: liveStreamId,
9198
error: toError(error).message,
9299
})
93100
}
@@ -100,7 +107,7 @@ export const GET = withRouteHandler(
100107
: []
101108
const effectiveMessages = buildEffectiveChatTranscript({
102109
messages: normalizedMessages,
103-
activeStreamId: chat.conversationId || null,
110+
activeStreamId: liveStreamId,
104111
...(streamSnapshot ? { streamSnapshot } : {}),
105112
})
106113

@@ -110,7 +117,7 @@ export const GET = withRouteHandler(
110117
id: chat.id,
111118
title: chat.title,
112119
messages: effectiveMessages,
113-
conversationId: chat.conversationId || null,
120+
activeStreamId: liveStreamId,
114121
resources: Array.isArray(chat.resources) ? chat.resources : [],
115122
createdAt: chat.createdAt,
116123
updatedAt: chat.updatedAt,

0 commit comments

Comments
 (0)