Skip to content

Commit 9406a3b

Browse files
committed
address comments
1 parent fca291b commit 9406a3b

10 files changed

Lines changed: 523 additions & 92 deletions

File tree

apps/sim/app/api/mothership/chats/[chatId]/route.test.ts

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'
77

88
const {
99
mockGetAccessibleCopilotChat,
10-
mockGetActiveChatStreamIds,
10+
mockReconcileChatStreamMarkers,
1111
mockReadEvents,
1212
mockReadFilePreviewSessions,
1313
mockGetLatestRunForStream,
1414
} = vi.hoisted(() => ({
1515
mockGetAccessibleCopilotChat: vi.fn(),
16-
mockGetActiveChatStreamIds: vi.fn(),
16+
mockReconcileChatStreamMarkers: vi.fn(),
1717
mockReadEvents: vi.fn(),
1818
mockReadFilePreviewSessions: vi.fn(),
1919
mockGetLatestRunForStream: vi.fn(),
@@ -50,8 +50,8 @@ vi.mock('@/lib/copilot/chat/lifecycle', () => ({
5050
getAccessibleCopilotChat: mockGetAccessibleCopilotChat,
5151
}))
5252

53-
vi.mock('@/lib/copilot/request/session/abort', () => ({
54-
getActiveChatStreamIds: mockGetActiveChatStreamIds,
53+
vi.mock('@/lib/copilot/chat/stream-liveness', () => ({
54+
reconcileChatStreamMarkers: mockReconcileChatStreamMarkers,
5555
}))
5656

5757
vi.mock('@/lib/copilot/request/session/buffer', () => ({
@@ -105,7 +105,19 @@ describe('GET /api/mothership/chats/[chatId]', () => {
105105
userId: 'user-1',
106106
isAuthenticated: true,
107107
})
108-
mockGetActiveChatStreamIds.mockResolvedValue(new Set<string>())
108+
mockReconcileChatStreamMarkers.mockImplementation(
109+
async (candidates: Array<{ chatId: string; streamId: string | null }>) =>
110+
new Map(
111+
candidates.map((candidate) => [
112+
candidate.chatId,
113+
{
114+
chatId: candidate.chatId,
115+
streamId: candidate.streamId,
116+
status: candidate.streamId ? 'active' : 'inactive',
117+
},
118+
])
119+
)
120+
)
109121
mockReadEvents.mockResolvedValue([])
110122
mockReadFilePreviewSessions.mockResolvedValue([])
111123
mockGetLatestRunForStream.mockResolvedValue(null)
@@ -122,13 +134,18 @@ describe('GET /api/mothership/chats/[chatId]', () => {
122134
createdAt: new Date('2026-05-11T12:00:00Z'),
123135
updatedAt: new Date('2026-05-11T12:00:00Z'),
124136
})
125-
mockGetActiveChatStreamIds.mockResolvedValueOnce(new Set<string>())
137+
mockReconcileChatStreamMarkers.mockResolvedValueOnce(
138+
new Map([['chat-stuck', { chatId: 'chat-stuck', streamId: null, status: 'inactive' }]])
139+
)
126140

127141
const response = await GET(createRequest('chat-stuck'), makeContext('chat-stuck'))
128142
expect(response.status).toBe(200)
129143
const body = await response.json()
130144

131-
expect(mockGetActiveChatStreamIds).toHaveBeenCalledWith(['chat-stuck'])
145+
expect(mockReconcileChatStreamMarkers).toHaveBeenCalledWith(
146+
[{ chatId: 'chat-stuck', streamId: 'stream-orphaned' }],
147+
{ repairVerifiedStaleMarkers: true }
148+
)
132149
expect(body.success).toBe(true)
133150
expect(body.chat.conversationId).toBeNull()
134151
expect(body.chat.streamSnapshot).toBeUndefined()
@@ -146,7 +163,6 @@ describe('GET /api/mothership/chats/[chatId]', () => {
146163
createdAt: new Date('2026-05-11T12:00:00Z'),
147164
updatedAt: new Date('2026-05-11T12:00:00Z'),
148165
})
149-
mockGetActiveChatStreamIds.mockResolvedValueOnce(new Set(['chat-live']))
150166
mockGetLatestRunForStream.mockResolvedValueOnce({ status: 'active' })
151167

152168
const response = await GET(createRequest('chat-live'), makeContext('chat-live'))
@@ -159,7 +175,32 @@ describe('GET /api/mothership/chats/[chatId]', () => {
159175
expect(body.chat.streamSnapshot.status).toBe('active')
160176
})
161177

162-
it('skips the reconciliation lookup when conversationId is already null', async () => {
178+
it('uses the Redis lock owner when it differs from a stale conversationId', async () => {
179+
mockGetAccessibleCopilotChat.mockResolvedValueOnce({
180+
id: 'chat-mismatch',
181+
type: 'mothership',
182+
title: 'Mismatch',
183+
messages: [],
184+
resources: [],
185+
conversationId: 'stream-stale',
186+
createdAt: new Date('2026-05-11T12:00:00Z'),
187+
updatedAt: new Date('2026-05-11T12:00:00Z'),
188+
})
189+
mockReconcileChatStreamMarkers.mockResolvedValueOnce(
190+
new Map([
191+
['chat-mismatch', { chatId: 'chat-mismatch', streamId: 'stream-live', status: 'active' }],
192+
])
193+
)
194+
195+
const response = await GET(createRequest('chat-mismatch'), makeContext('chat-mismatch'))
196+
expect(response.status).toBe(200)
197+
const body = await response.json()
198+
199+
expect(body.chat.conversationId).toBe('stream-live')
200+
expect(mockReadEvents).toHaveBeenCalledWith('stream-live', '0')
201+
})
202+
203+
it('returns null when conversationId is already null', async () => {
163204
mockGetAccessibleCopilotChat.mockResolvedValueOnce({
164205
id: 'chat-idle',
165206
type: 'mothership',
@@ -174,7 +215,10 @@ describe('GET /api/mothership/chats/[chatId]', () => {
174215
const response = await GET(createRequest('chat-idle'), makeContext('chat-idle'))
175216
expect(response.status).toBe(200)
176217

177-
expect(mockGetActiveChatStreamIds).not.toHaveBeenCalled()
218+
expect(mockReconcileChatStreamMarkers).toHaveBeenCalledWith(
219+
[{ chatId: 'chat-idle', streamId: null }],
220+
{ repairVerifiedStaleMarkers: true }
221+
)
178222
const body = await response.json()
179223
expect(body.chat.conversationId).toBeNull()
180224
})
@@ -184,7 +228,7 @@ describe('GET /api/mothership/chats/[chatId]', () => {
184228

185229
const response = await GET(createRequest('chat-missing'), makeContext('chat-missing'))
186230
expect(response.status).toBe(404)
187-
expect(mockGetActiveChatStreamIds).not.toHaveBeenCalled()
231+
expect(mockReconcileChatStreamMarkers).not.toHaveBeenCalled()
188232
})
189233

190234
it('returns 401 when unauthenticated', async () => {
@@ -196,6 +240,6 @@ describe('GET /api/mothership/chats/[chatId]', () => {
196240
const response = await GET(createRequest('chat-x'), makeContext('chat-x'))
197241
expect(response.status).toBe(401)
198242
expect(mockGetAccessibleCopilotChat).not.toHaveBeenCalled()
199-
expect(mockGetActiveChatStreamIds).not.toHaveBeenCalled()
243+
expect(mockReconcileChatStreamMarkers).not.toHaveBeenCalled()
200244
})
201245
})

apps/sim/app/api/mothership/chats/[chatId]/route.ts

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository'
1414
import { buildEffectiveChatTranscript } from '@/lib/copilot/chat/effective-transcript'
1515
import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle'
1616
import { normalizeMessage } from '@/lib/copilot/chat/persisted-message'
17+
import { reconcileChatStreamMarkers } from '@/lib/copilot/chat/stream-liveness'
1718
import {
1819
authenticateCopilotRequestSessionOnly,
1920
createInternalServerErrorResponse,
2021
createUnauthorizedResponse,
2122
} from '@/lib/copilot/request/http'
2223
import type { FilePreviewSession } from '@/lib/copilot/request/session'
23-
import { getActiveChatStreamIds } from '@/lib/copilot/request/session/abort'
2424
import { readEvents } from '@/lib/copilot/request/session/buffer'
2525
import { readFilePreviewSessions } from '@/lib/copilot/request/session/file-preview-session'
2626
import { type StreamBatchEvent, toStreamBatchEvent } from '@/lib/copilot/request/session/types'
@@ -53,16 +53,11 @@ export const GET = withRouteHandler(
5353
status: string
5454
} | null = null
5555

56-
// Reconcile the persisted stream marker against the canonical Redis
57-
// lock. If `conversation_id` is set but no lock is held, the stream
58-
// is no longer running (process died before finalize) — treat the
59-
// marker as null so the client doesn't try to reconnect to a dead
60-
// stream. Mirrors the same reconciliation in the task list route.
61-
const activeIds = chat.conversationId
62-
? await getActiveChatStreamIds([chat.id])
63-
: new Set<string>()
64-
const liveConversationId =
65-
chat.conversationId && activeIds.has(chat.id) ? chat.conversationId : null
56+
const reconciledMarkers = await reconcileChatStreamMarkers(
57+
[{ chatId: chat.id, streamId: chat.conversationId }],
58+
{ repairVerifiedStaleMarkers: true }
59+
)
60+
const liveConversationId = reconciledMarkers.get(chat.id)?.streamId ?? null
6661

6762
if (liveConversationId) {
6863
try {

apps/sim/app/api/mothership/chats/route.test.ts

Lines changed: 71 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ import { copilotHttpMock, copilotHttpMockFns, permissionsMock } from '@sim/testi
55
import { NextRequest } from 'next/server'
66
import { beforeEach, describe, expect, it, vi } from 'vitest'
77

8-
const { mockSelect, mockFrom, mockWhere, mockOrderBy, mockGetActiveChatStreamIds } = vi.hoisted(
8+
const { mockSelect, mockFrom, mockWhere, mockOrderBy, mockReconcileChatStreamMarkers } = vi.hoisted(
99
() => ({
1010
mockSelect: vi.fn(),
1111
mockFrom: vi.fn(),
1212
mockWhere: vi.fn(),
1313
mockOrderBy: vi.fn(),
14-
mockGetActiveChatStreamIds: vi.fn(),
14+
mockReconcileChatStreamMarkers: vi.fn(),
1515
})
1616
)
1717

@@ -43,8 +43,8 @@ vi.mock('drizzle-orm', () => ({
4343
vi.mock('@/lib/copilot/request/http', () => copilotHttpMock)
4444
vi.mock('@/lib/workspaces/permissions/utils', () => permissionsMock)
4545

46-
vi.mock('@/lib/copilot/request/session/abort', () => ({
47-
getActiveChatStreamIds: mockGetActiveChatStreamIds,
46+
vi.mock('@/lib/copilot/chat/stream-liveness', () => ({
47+
reconcileChatStreamMarkers: mockReconcileChatStreamMarkers,
4848
}))
4949

5050
vi.mock('@/lib/copilot/tasks', () => ({
@@ -77,7 +77,19 @@ describe('GET /api/mothership/chats', () => {
7777
mockFrom.mockReturnValue({ where: mockWhere })
7878
mockSelect.mockReturnValue({ from: mockFrom })
7979

80-
mockGetActiveChatStreamIds.mockResolvedValue(new Set<string>())
80+
mockReconcileChatStreamMarkers.mockImplementation(
81+
async (candidates: Array<{ chatId: string; streamId: string | null }>) =>
82+
new Map(
83+
candidates.map((candidate) => [
84+
candidate.chatId,
85+
{
86+
chatId: candidate.chatId,
87+
streamId: candidate.streamId,
88+
status: candidate.streamId ? 'active' : 'inactive',
89+
},
90+
])
91+
)
92+
)
8193
})
8294

8395
it('clears activeStreamId on chats whose redis lock has expired (stuck-yellow bug)', async () => {
@@ -105,13 +117,26 @@ describe('GET /api/mothership/chats', () => {
105117
lastSeenAt: null,
106118
},
107119
])
108-
mockGetActiveChatStreamIds.mockResolvedValueOnce(new Set(['chat-live']))
120+
mockReconcileChatStreamMarkers.mockResolvedValueOnce(
121+
new Map([
122+
['chat-stuck', { chatId: 'chat-stuck', streamId: null, status: 'inactive' }],
123+
['chat-live', { chatId: 'chat-live', streamId: 'stream-live', status: 'active' }],
124+
['chat-idle', { chatId: 'chat-idle', streamId: null, status: 'inactive' }],
125+
])
126+
)
109127

110128
const response = await GET(createRequest('ws-1'))
111129
expect(response.status).toBe(200)
112130
const body = await response.json()
113131

114-
expect(mockGetActiveChatStreamIds).toHaveBeenCalledWith(['chat-stuck', 'chat-live'])
132+
expect(mockReconcileChatStreamMarkers).toHaveBeenCalledWith(
133+
[
134+
{ chatId: 'chat-stuck', streamId: 'stream-orphaned' },
135+
{ chatId: 'chat-live', streamId: 'stream-live' },
136+
{ chatId: 'chat-idle', streamId: null },
137+
],
138+
{ repairVerifiedStaleMarkers: true }
139+
)
115140
expect(body.success).toBe(true)
116141
expect(body.data).toEqual([
117142
expect.objectContaining({ id: 'chat-stuck', activeStreamId: null }),
@@ -120,7 +145,7 @@ describe('GET /api/mothership/chats', () => {
120145
])
121146
})
122147

123-
it('issues no Redis MGET when no chat has a stream marker set (empty candidateIds)', async () => {
148+
it('preserves chats when no chat has a stream marker set', async () => {
124149
const now = new Date('2026-05-11T12:00:00Z')
125150
mockOrderBy.mockResolvedValueOnce([
126151
{ id: 'chat-1', title: null, updatedAt: now, activeStreamId: null, lastSeenAt: null },
@@ -130,11 +155,18 @@ describe('GET /api/mothership/chats', () => {
130155
const response = await GET(createRequest('ws-1'))
131156
expect(response.status).toBe(200)
132157

133-
expect(mockGetActiveChatStreamIds).toHaveBeenCalledWith([])
158+
expect(mockReconcileChatStreamMarkers).toHaveBeenCalledWith(
159+
[
160+
{ chatId: 'chat-1', streamId: null },
161+
{ chatId: 'chat-2', streamId: null },
162+
],
163+
{ repairVerifiedStaleMarkers: true }
164+
)
134165
const body = await response.json()
135-
expect(
136-
body.data.every((c: { activeStreamId: string | null }) => c.activeStreamId === null)
137-
).toBe(true)
166+
expect(body.data).toEqual([
167+
expect.objectContaining({ id: 'chat-1', activeStreamId: null }),
168+
expect.objectContaining({ id: 'chat-2', activeStreamId: null }),
169+
])
138170
})
139171

140172
it('leaves activeStreamId untouched when redis confirms every lock is live', async () => {
@@ -143,7 +175,6 @@ describe('GET /api/mothership/chats', () => {
143175
{ id: 'chat-a', title: null, updatedAt: now, activeStreamId: 'stream-a', lastSeenAt: null },
144176
{ id: 'chat-b', title: null, updatedAt: now, activeStreamId: 'stream-b', lastSeenAt: null },
145177
])
146-
mockGetActiveChatStreamIds.mockResolvedValueOnce(new Set(['chat-a', 'chat-b']))
147178

148179
const response = await GET(createRequest('ws-1'))
149180
const body = await response.json()
@@ -154,6 +185,32 @@ describe('GET /api/mothership/chats', () => {
154185
])
155186
})
156187

188+
it('uses Redis lock owner when it differs from a stale activeStreamId', async () => {
189+
const now = new Date('2026-05-11T12:00:00Z')
190+
mockOrderBy.mockResolvedValueOnce([
191+
{
192+
id: 'chat-mismatch',
193+
title: null,
194+
updatedAt: now,
195+
activeStreamId: 'stream-stale',
196+
lastSeenAt: null,
197+
},
198+
])
199+
mockReconcileChatStreamMarkers.mockResolvedValueOnce(
200+
new Map([
201+
['chat-mismatch', { chatId: 'chat-mismatch', streamId: 'stream-live', status: 'active' }],
202+
])
203+
)
204+
205+
const response = await GET(createRequest('ws-1'))
206+
expect(response.status).toBe(200)
207+
const body = await response.json()
208+
209+
expect(body.data).toEqual([
210+
expect.objectContaining({ id: 'chat-mismatch', activeStreamId: 'stream-live' }),
211+
])
212+
})
213+
157214
it('returns 401 when unauthenticated', async () => {
158215
copilotHttpMockFns.mockAuthenticateCopilotRequestSessionOnly.mockResolvedValueOnce({
159216
userId: null,
@@ -163,6 +220,6 @@ describe('GET /api/mothership/chats', () => {
163220
const response = await GET(createRequest('ws-1'))
164221
expect(response.status).toBe(401)
165222
expect(mockSelect).not.toHaveBeenCalled()
166-
expect(mockGetActiveChatStreamIds).not.toHaveBeenCalled()
223+
expect(mockReconcileChatStreamMarkers).not.toHaveBeenCalled()
167224
})
168225
})

apps/sim/app/api/mothership/chats/route.ts

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ import {
88
listMothershipChatsContract,
99
} from '@/lib/api/contracts/mothership-tasks'
1010
import { parseRequest } from '@/lib/api/server'
11+
import { reconcileChatStreamMarkers } from '@/lib/copilot/chat/stream-liveness'
1112
import {
1213
authenticateCopilotRequestSessionOnly,
1314
createInternalServerErrorResponse,
1415
createUnauthorizedResponse,
1516
} from '@/lib/copilot/request/http'
16-
import { getActiveChatStreamIds } from '@/lib/copilot/request/session/abort'
1717
import { taskPubSub } from '@/lib/copilot/tasks'
1818
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
1919
import { captureServerEvent } from '@/lib/posthog/server'
@@ -56,17 +56,14 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
5656
)
5757
.orderBy(desc(copilotChats.updatedAt))
5858

59-
// Reconcile the persisted stream marker against the canonical Redis
60-
// lock. `conversation_id` is set when a stream starts and cleared on
61-
// the finalize/stop paths — but if those never run (pod crash, OOM,
62-
// throw before callback), the column is orphaned and the task renders
63-
// yellow forever. The Redis lock self-heals via its 60s TTL, so a
64-
// missing lock means the stream is no longer running.
65-
const candidateIds = chats.filter((c) => c.activeStreamId !== null).map((c) => c.id)
66-
const activeIds = await getActiveChatStreamIds(candidateIds)
67-
const reconciled = chats.map((c) =>
68-
c.activeStreamId !== null && !activeIds.has(c.id) ? { ...c, activeStreamId: null } : c
59+
const streamMarkers = await reconcileChatStreamMarkers(
60+
chats.map((c) => ({ chatId: c.id, streamId: c.activeStreamId })),
61+
{ repairVerifiedStaleMarkers: true }
6962
)
63+
const reconciled = chats.map((c) => {
64+
const activeStreamId = streamMarkers.get(c.id)?.streamId ?? null
65+
return activeStreamId === c.activeStreamId ? c : { ...c, activeStreamId }
66+
})
7067

7168
return NextResponse.json({ success: true, data: reconciled })
7269
} catch (error) {

0 commit comments

Comments
 (0)