From 66fca0fa675c171b03ca1476a23357148fcd4fb8 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Mon, 11 May 2026 12:37:26 -0700 Subject: [PATCH 1/5] fix(mothership): reconcile stuck conversation_id against Redis lock to clear stuck-yellow task tiles copilot_chats.conversation_id has no TTL/heartbeat, so when a stream process dies before the clear path runs (pod OOM, SIGKILL, uncaught throw, deploy mid-stream) the column is orphaned and the task tile renders yellow forever. The Redis lock at copilot:chat-stream-lock: is the canonical liveness signal and self-heals via 60s TTL + 20s heartbeat, but the mothership APIs weren't consulting it. Adds read-time reconciliation: a batched MGET helper checks whether each persisted conversation_id still has a live Redis lock, and both GET /api/mothership/chats and GET /api/mothership/chats/[chatId] rewrite the marker to null when the lock has expired. No DB writes; stuck rows self-heal on next fetch. --- .../mothership/chats/[chatId]/route.test.ts | 201 ++++++++++++++++++ .../api/mothership/chats/[chatId]/route.ts | 30 ++- .../app/api/mothership/chats/route.test.ts | 168 +++++++++++++++ apps/sim/app/api/mothership/chats/route.ts | 15 +- .../lib/copilot/request/session/abort.test.ts | 54 ++++- apps/sim/lib/copilot/request/session/abort.ts | 37 ++++ 6 files changed, 494 insertions(+), 11 deletions(-) create mode 100644 apps/sim/app/api/mothership/chats/[chatId]/route.test.ts create mode 100644 apps/sim/app/api/mothership/chats/route.test.ts diff --git a/apps/sim/app/api/mothership/chats/[chatId]/route.test.ts b/apps/sim/app/api/mothership/chats/[chatId]/route.test.ts new file mode 100644 index 00000000000..8606f1c4e04 --- /dev/null +++ b/apps/sim/app/api/mothership/chats/[chatId]/route.test.ts @@ -0,0 +1,201 @@ +/** + * @vitest-environment node + */ +import { copilotHttpMock, copilotHttpMockFns } from '@sim/testing' +import { NextRequest } from 'next/server' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { + mockGetAccessibleCopilotChat, + mockGetActiveChatStreamIds, + mockReadEvents, + mockReadFilePreviewSessions, + mockGetLatestRunForStream, +} = vi.hoisted(() => ({ + mockGetAccessibleCopilotChat: vi.fn(), + mockGetActiveChatStreamIds: vi.fn(), + mockReadEvents: vi.fn(), + mockReadFilePreviewSessions: vi.fn(), + mockGetLatestRunForStream: vi.fn(), +})) + +vi.mock('@sim/db', () => ({ db: {} })) + +vi.mock('@sim/db/schema', () => ({ + copilotChats: { + id: 'copilotChats.id', + userId: 'copilotChats.userId', + type: 'copilotChats.type', + updatedAt: 'copilotChats.updatedAt', + lastSeenAt: 'copilotChats.lastSeenAt', + }, +})) + +vi.mock('drizzle-orm', () => ({ + and: vi.fn((...conditions: unknown[]) => ({ type: 'and', conditions })), + eq: vi.fn((field: unknown, value: unknown) => ({ type: 'eq', field, value })), + sql: Object.assign( + vi.fn((strings: TemplateStringsArray, ...values: unknown[]) => ({ + type: 'sql', + strings, + values, + })), + { raw: vi.fn() } + ), +})) + +vi.mock('@/lib/copilot/request/http', () => copilotHttpMock) + +vi.mock('@/lib/copilot/chat/lifecycle', () => ({ + getAccessibleCopilotChat: mockGetAccessibleCopilotChat, +})) + +vi.mock('@/lib/copilot/request/session/abort', () => ({ + getActiveChatStreamIds: mockGetActiveChatStreamIds, +})) + +vi.mock('@/lib/copilot/request/session/buffer', () => ({ + readEvents: mockReadEvents, +})) + +vi.mock('@/lib/copilot/request/session/file-preview-session', () => ({ + readFilePreviewSessions: mockReadFilePreviewSessions, +})) + +vi.mock('@/lib/copilot/async-runs/repository', () => ({ + getLatestRunForStream: mockGetLatestRunForStream, +})) + +vi.mock('@/lib/copilot/request/session/types', () => ({ + toStreamBatchEvent: (e: unknown) => e, +})) + +vi.mock('@/lib/copilot/chat/effective-transcript', () => ({ + buildEffectiveChatTranscript: ({ messages }: { messages: unknown[] }) => messages, +})) + +vi.mock('@/lib/copilot/chat/persisted-message', () => ({ + normalizeMessage: (m: unknown) => m, +})) + +vi.mock('@/lib/copilot/tasks', () => ({ + taskPubSub: { publishStatusChanged: vi.fn() }, +})) + +vi.mock('@/lib/posthog/server', () => ({ + captureServerEvent: vi.fn(), +})) + +import { GET } from '@/app/api/mothership/chats/[chatId]/route' + +function makeContext(chatId: string) { + return { params: Promise.resolve({ chatId }) } +} + +function createRequest(chatId: string) { + return new NextRequest(`http://localhost:3000/api/mothership/chats/${chatId}`, { + method: 'GET', + }) +} + +describe('GET /api/mothership/chats/[chatId]', () => { + beforeEach(() => { + vi.clearAllMocks() + copilotHttpMockFns.mockAuthenticateCopilotRequestSessionOnly.mockResolvedValue({ + userId: 'user-1', + isAuthenticated: true, + }) + mockGetActiveChatStreamIds.mockResolvedValue(new Set()) + mockReadEvents.mockResolvedValue([]) + mockReadFilePreviewSessions.mockResolvedValue([]) + mockGetLatestRunForStream.mockResolvedValue(null) + }) + + it('clears conversationId when the redis lock has expired (stuck-yellow bug)', async () => { + mockGetAccessibleCopilotChat.mockResolvedValueOnce({ + id: 'chat-stuck', + type: 'mothership', + title: 'Stuck', + messages: [], + resources: [], + conversationId: 'stream-orphaned', + createdAt: new Date('2026-05-11T12:00:00Z'), + updatedAt: new Date('2026-05-11T12:00:00Z'), + }) + mockGetActiveChatStreamIds.mockResolvedValueOnce(new Set()) + + const response = await GET(createRequest('chat-stuck'), makeContext('chat-stuck')) + expect(response.status).toBe(200) + const body = await response.json() + + expect(mockGetActiveChatStreamIds).toHaveBeenCalledWith(['chat-stuck']) + expect(body.success).toBe(true) + expect(body.chat.conversationId).toBeNull() + expect(body.chat.streamSnapshot).toBeUndefined() + expect(mockReadEvents).not.toHaveBeenCalled() + }) + + it('returns the live conversationId when redis confirms the lock', async () => { + mockGetAccessibleCopilotChat.mockResolvedValueOnce({ + id: 'chat-live', + type: 'mothership', + title: 'Live', + messages: [], + resources: [], + conversationId: 'stream-live', + createdAt: new Date('2026-05-11T12:00:00Z'), + updatedAt: new Date('2026-05-11T12:00:00Z'), + }) + mockGetActiveChatStreamIds.mockResolvedValueOnce(new Set(['chat-live'])) + mockGetLatestRunForStream.mockResolvedValueOnce({ status: 'active' }) + + const response = await GET(createRequest('chat-live'), makeContext('chat-live')) + expect(response.status).toBe(200) + const body = await response.json() + + expect(body.chat.conversationId).toBe('stream-live') + expect(mockReadEvents).toHaveBeenCalledWith('stream-live', '0') + expect(body.chat.streamSnapshot).toBeDefined() + expect(body.chat.streamSnapshot.status).toBe('active') + }) + + it('skips the reconciliation lookup when conversationId is already null', async () => { + mockGetAccessibleCopilotChat.mockResolvedValueOnce({ + id: 'chat-idle', + type: 'mothership', + title: 'Idle', + messages: [], + resources: [], + conversationId: null, + createdAt: new Date('2026-05-11T12:00:00Z'), + updatedAt: new Date('2026-05-11T12:00:00Z'), + }) + + const response = await GET(createRequest('chat-idle'), makeContext('chat-idle')) + expect(response.status).toBe(200) + + expect(mockGetActiveChatStreamIds).not.toHaveBeenCalled() + const body = await response.json() + expect(body.chat.conversationId).toBeNull() + }) + + it('returns 404 when the chat does not exist', async () => { + mockGetAccessibleCopilotChat.mockResolvedValueOnce(null) + + const response = await GET(createRequest('chat-missing'), makeContext('chat-missing')) + expect(response.status).toBe(404) + expect(mockGetActiveChatStreamIds).not.toHaveBeenCalled() + }) + + it('returns 401 when unauthenticated', async () => { + copilotHttpMockFns.mockAuthenticateCopilotRequestSessionOnly.mockResolvedValueOnce({ + userId: null, + isAuthenticated: false, + }) + + const response = await GET(createRequest('chat-x'), makeContext('chat-x')) + expect(response.status).toBe(401) + expect(mockGetAccessibleCopilotChat).not.toHaveBeenCalled() + expect(mockGetActiveChatStreamIds).not.toHaveBeenCalled() + }) +}) diff --git a/apps/sim/app/api/mothership/chats/[chatId]/route.ts b/apps/sim/app/api/mothership/chats/[chatId]/route.ts index 62b7fd46181..2849a79bb4e 100644 --- a/apps/sim/app/api/mothership/chats/[chatId]/route.ts +++ b/apps/sim/app/api/mothership/chats/[chatId]/route.ts @@ -20,6 +20,7 @@ import { createUnauthorizedResponse, } from '@/lib/copilot/request/http' import type { FilePreviewSession } from '@/lib/copilot/request/session' +import { getActiveChatStreamIds } from '@/lib/copilot/request/session/abort' import { readEvents } from '@/lib/copilot/request/session/buffer' import { readFilePreviewSessions } from '@/lib/copilot/request/session/file-preview-session' import { type StreamBatchEvent, toStreamBatchEvent } from '@/lib/copilot/request/session/types' @@ -52,23 +53,34 @@ export const GET = withRouteHandler( status: string } | null = null - if (chat.conversationId) { + // Reconcile the persisted stream marker against the canonical Redis + // lock. If `conversation_id` is set but no lock is held, the stream + // is no longer running (process died before finalize) — treat the + // marker as null so the client doesn't try to reconnect to a dead + // stream. Mirrors the same reconciliation in the task list route. + const activeIds = chat.conversationId + ? await getActiveChatStreamIds([chat.id]) + : new Set() + const liveConversationId = + chat.conversationId && activeIds.has(chat.id) ? chat.conversationId : null + + if (liveConversationId) { try { const [events, previewSessions] = await Promise.all([ - readEvents(chat.conversationId, '0'), - readFilePreviewSessions(chat.conversationId).catch((error) => { + readEvents(liveConversationId, '0'), + readFilePreviewSessions(liveConversationId).catch((error) => { logger.warn('Failed to read preview sessions for mothership chat', { chatId, - conversationId: chat.conversationId, + conversationId: liveConversationId, error: toError(error).message, }) return [] }), ]) - const run = await getLatestRunForStream(chat.conversationId, userId).catch((error) => { + const run = await getLatestRunForStream(liveConversationId, userId).catch((error) => { logger.warn('Failed to fetch latest run for mothership chat snapshot', { chatId, - conversationId: chat.conversationId, + conversationId: liveConversationId, error: toError(error).message, }) return null @@ -87,7 +99,7 @@ export const GET = withRouteHandler( } catch (error) { logger.warn('Failed to read stream snapshot for mothership chat', { chatId, - conversationId: chat.conversationId, + conversationId: liveConversationId, error: toError(error).message, }) } @@ -100,7 +112,7 @@ export const GET = withRouteHandler( : [] const effectiveMessages = buildEffectiveChatTranscript({ messages: normalizedMessages, - activeStreamId: chat.conversationId || null, + activeStreamId: liveConversationId || null, ...(streamSnapshot ? { streamSnapshot } : {}), }) @@ -110,7 +122,7 @@ export const GET = withRouteHandler( id: chat.id, title: chat.title, messages: effectiveMessages, - conversationId: chat.conversationId || null, + conversationId: liveConversationId || null, resources: Array.isArray(chat.resources) ? chat.resources : [], createdAt: chat.createdAt, updatedAt: chat.updatedAt, diff --git a/apps/sim/app/api/mothership/chats/route.test.ts b/apps/sim/app/api/mothership/chats/route.test.ts new file mode 100644 index 00000000000..1ef159ee000 --- /dev/null +++ b/apps/sim/app/api/mothership/chats/route.test.ts @@ -0,0 +1,168 @@ +/** + * @vitest-environment node + */ +import { copilotHttpMock, copilotHttpMockFns, permissionsMock } from '@sim/testing' +import { NextRequest } from 'next/server' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { mockSelect, mockFrom, mockWhere, mockOrderBy, mockGetActiveChatStreamIds } = vi.hoisted( + () => ({ + mockSelect: vi.fn(), + mockFrom: vi.fn(), + mockWhere: vi.fn(), + mockOrderBy: vi.fn(), + mockGetActiveChatStreamIds: vi.fn(), + }) +) + +vi.mock('@sim/db', () => ({ + db: { + select: mockSelect, + }, +})) + +vi.mock('@sim/db/schema', () => ({ + copilotChats: { + id: 'copilotChats.id', + title: 'copilotChats.title', + userId: 'copilotChats.userId', + workspaceId: 'copilotChats.workspaceId', + type: 'copilotChats.type', + updatedAt: 'copilotChats.updatedAt', + conversationId: 'copilotChats.conversationId', + lastSeenAt: 'copilotChats.lastSeenAt', + }, +})) + +vi.mock('drizzle-orm', () => ({ + and: vi.fn((...conditions: unknown[]) => ({ type: 'and', conditions })), + desc: vi.fn((field: unknown) => ({ type: 'desc', field })), + eq: vi.fn((field: unknown, value: unknown) => ({ type: 'eq', field, value })), +})) + +vi.mock('@/lib/copilot/request/http', () => copilotHttpMock) +vi.mock('@/lib/workspaces/permissions/utils', () => permissionsMock) + +vi.mock('@/lib/copilot/request/session/abort', () => ({ + getActiveChatStreamIds: mockGetActiveChatStreamIds, +})) + +vi.mock('@/lib/copilot/tasks', () => ({ + taskPubSub: { publishStatusChanged: vi.fn() }, +})) + +vi.mock('@/lib/posthog/server', () => ({ + captureServerEvent: vi.fn(), +})) + +import { GET } from '@/app/api/mothership/chats/route' + +function createRequest(workspaceId: string) { + return new NextRequest(`http://localhost:3000/api/mothership/chats?workspaceId=${workspaceId}`, { + method: 'GET', + }) +} + +describe('GET /api/mothership/chats', () => { + beforeEach(() => { + vi.clearAllMocks() + + copilotHttpMockFns.mockAuthenticateCopilotRequestSessionOnly.mockResolvedValue({ + userId: 'user-1', + isAuthenticated: true, + }) + + mockOrderBy.mockResolvedValue([]) + mockWhere.mockReturnValue({ orderBy: mockOrderBy }) + mockFrom.mockReturnValue({ where: mockWhere }) + mockSelect.mockReturnValue({ from: mockFrom }) + + mockGetActiveChatStreamIds.mockResolvedValue(new Set()) + }) + + it('clears activeStreamId on chats whose redis lock has expired (stuck-yellow bug)', async () => { + const now = new Date('2026-05-11T12:00:00Z') + mockOrderBy.mockResolvedValueOnce([ + { + id: 'chat-stuck', + title: 'Stuck chat', + updatedAt: now, + activeStreamId: 'stream-orphaned', + lastSeenAt: null, + }, + { + id: 'chat-live', + title: 'Live chat', + updatedAt: now, + activeStreamId: 'stream-live', + lastSeenAt: null, + }, + { + id: 'chat-idle', + title: 'Idle chat', + updatedAt: now, + activeStreamId: null, + lastSeenAt: null, + }, + ]) + mockGetActiveChatStreamIds.mockResolvedValueOnce(new Set(['chat-live'])) + + const response = await GET(createRequest('ws-1')) + expect(response.status).toBe(200) + const body = await response.json() + + expect(mockGetActiveChatStreamIds).toHaveBeenCalledWith(['chat-stuck', 'chat-live']) + expect(body.success).toBe(true) + expect(body.data).toEqual([ + expect.objectContaining({ id: 'chat-stuck', activeStreamId: null }), + expect.objectContaining({ id: 'chat-live', activeStreamId: 'stream-live' }), + expect.objectContaining({ id: 'chat-idle', activeStreamId: null }), + ]) + }) + + it('skips the reconciliation lookup when no chat has a stream marker set', async () => { + const now = new Date('2026-05-11T12:00:00Z') + mockOrderBy.mockResolvedValueOnce([ + { id: 'chat-1', title: null, updatedAt: now, activeStreamId: null, lastSeenAt: null }, + { id: 'chat-2', title: null, updatedAt: now, activeStreamId: null, lastSeenAt: null }, + ]) + + const response = await GET(createRequest('ws-1')) + expect(response.status).toBe(200) + + expect(mockGetActiveChatStreamIds).toHaveBeenCalledWith([]) + const body = await response.json() + expect( + body.data.every((c: { activeStreamId: string | null }) => c.activeStreamId === null) + ).toBe(true) + }) + + it('leaves activeStreamId untouched when redis confirms every lock is live', async () => { + const now = new Date('2026-05-11T12:00:00Z') + mockOrderBy.mockResolvedValueOnce([ + { id: 'chat-a', title: null, updatedAt: now, activeStreamId: 'stream-a', lastSeenAt: null }, + { id: 'chat-b', title: null, updatedAt: now, activeStreamId: 'stream-b', lastSeenAt: null }, + ]) + mockGetActiveChatStreamIds.mockResolvedValueOnce(new Set(['chat-a', 'chat-b'])) + + const response = await GET(createRequest('ws-1')) + const body = await response.json() + + expect(body.data).toEqual([ + expect.objectContaining({ id: 'chat-a', activeStreamId: 'stream-a' }), + expect.objectContaining({ id: 'chat-b', activeStreamId: 'stream-b' }), + ]) + }) + + it('returns 401 when unauthenticated', async () => { + copilotHttpMockFns.mockAuthenticateCopilotRequestSessionOnly.mockResolvedValueOnce({ + userId: null, + isAuthenticated: false, + }) + + const response = await GET(createRequest('ws-1')) + expect(response.status).toBe(401) + expect(mockSelect).not.toHaveBeenCalled() + expect(mockGetActiveChatStreamIds).not.toHaveBeenCalled() + }) +}) diff --git a/apps/sim/app/api/mothership/chats/route.ts b/apps/sim/app/api/mothership/chats/route.ts index 3e114d16532..1b61c85b04d 100644 --- a/apps/sim/app/api/mothership/chats/route.ts +++ b/apps/sim/app/api/mothership/chats/route.ts @@ -13,6 +13,7 @@ import { createInternalServerErrorResponse, createUnauthorizedResponse, } from '@/lib/copilot/request/http' +import { getActiveChatStreamIds } from '@/lib/copilot/request/session/abort' import { taskPubSub } from '@/lib/copilot/tasks' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { captureServerEvent } from '@/lib/posthog/server' @@ -55,7 +56,19 @@ export const GET = withRouteHandler(async (request: NextRequest) => { ) .orderBy(desc(copilotChats.updatedAt)) - return NextResponse.json({ success: true, data: chats }) + // Reconcile the persisted stream marker against the canonical Redis + // lock. `conversation_id` is set when a stream starts and cleared on + // the finalize/stop paths — but if those never run (pod crash, OOM, + // throw before callback), the column is orphaned and the task renders + // yellow forever. The Redis lock self-heals via its 60s TTL, so a + // missing lock means the stream is no longer running. + const candidateIds = chats.filter((c) => c.activeStreamId !== null).map((c) => c.id) + const activeIds = await getActiveChatStreamIds(candidateIds) + const reconciled = chats.map((c) => + c.activeStreamId !== null && !activeIds.has(c.id) ? { ...c, activeStreamId: null } : c + ) + + return NextResponse.json({ success: true, data: reconciled }) } catch (error) { logger.error('Error fetching mothership chats:', error) return createInternalServerErrorResponse('Failed to fetch chats') diff --git a/apps/sim/lib/copilot/request/session/abort.test.ts b/apps/sim/lib/copilot/request/session/abort.test.ts index bdfd5d39cbb..609a494649d 100644 --- a/apps/sim/lib/copilot/request/session/abort.test.ts +++ b/apps/sim/lib/copilot/request/session/abort.test.ts @@ -22,7 +22,7 @@ vi.mock('@/lib/copilot/request/otel', () => ({ fn({ setAttribute: vi.fn() }), })) -import { startAbortPoller } from '@/lib/copilot/request/session/abort' +import { getActiveChatStreamIds, startAbortPoller } from '@/lib/copilot/request/session/abort' describe('startAbortPoller heartbeat', () => { beforeEach(() => { @@ -159,3 +159,55 @@ describe('startAbortPoller heartbeat', () => { } }) }) + +describe('getActiveChatStreamIds', () => { + beforeEach(() => { + vi.clearAllMocks() + redisConfigMockFns.mockGetRedisClient.mockReturnValue(null) + }) + + it('returns an empty set when no chat ids are provided', async () => { + const active = await getActiveChatStreamIds([]) + expect(active.size).toBe(0) + }) + + it('reports a chat as active when redis returns a lock owner', async () => { + const mget = vi.fn().mockResolvedValue(['stream-1', null, 'stream-3']) + redisConfigMockFns.mockGetRedisClient.mockReturnValue({ mget } as never) + + const active = await getActiveChatStreamIds(['chat-1', 'chat-2', 'chat-3']) + + expect(mget).toHaveBeenCalledWith([ + 'copilot:chat-stream-lock:chat-1', + 'copilot:chat-stream-lock:chat-2', + 'copilot:chat-stream-lock:chat-3', + ]) + expect(active).toEqual(new Set(['chat-1', 'chat-3'])) + }) + + it('reports no chats as active when every lock has expired in redis', async () => { + const mget = vi.fn().mockResolvedValue([null, null]) + redisConfigMockFns.mockGetRedisClient.mockReturnValue({ mget } as never) + + const active = await getActiveChatStreamIds(['chat-stuck-1', 'chat-stuck-2']) + + expect(active.size).toBe(0) + }) + + it('returns an empty set when redis is unavailable', async () => { + redisConfigMockFns.mockGetRedisClient.mockReturnValue(null) + + const active = await getActiveChatStreamIds(['chat-1', 'chat-2']) + + expect(active.size).toBe(0) + }) + + it('falls back to an empty set without throwing when mget rejects', async () => { + const mget = vi.fn().mockRejectedValue(new Error('redis down')) + redisConfigMockFns.mockGetRedisClient.mockReturnValue({ mget } as never) + + const active = await getActiveChatStreamIds(['chat-1', 'chat-2']) + + expect(active.size).toBe(0) + }) +}) diff --git a/apps/sim/lib/copilot/request/session/abort.ts b/apps/sim/lib/copilot/request/session/abort.ts index ce508690361..74101075b27 100644 --- a/apps/sim/lib/copilot/request/session/abort.ts +++ b/apps/sim/lib/copilot/request/session/abort.ts @@ -123,6 +123,43 @@ export async function getPendingChatStreamId(chatId: string): Promise> { + const active = new Set() + if (chatIds.length === 0) return active + + for (const chatId of chatIds) { + if (pendingChatStreams.has(chatId)) active.add(chatId) + } + + const redis = getRedisClient() + if (!redis) return active + + try { + const keys = chatIds.map(getChatStreamLockKey) + const values = await redis.mget(keys) + for (let i = 0; i < chatIds.length; i++) { + if (values[i]) active.add(chatIds[i]) + } + } catch (error) { + logger.warn('Failed to load chat stream lock owners (batch)', { + count: chatIds.length, + error: toError(error).message, + }) + } + + return active +} + export async function releasePendingChatStream(chatId: string, streamId: string): Promise { try { await releaseLock(getChatStreamLockKey(chatId), streamId) From fca291bb27aee729f7dbdb9eb70fee4edebbe301 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Mon, 11 May 2026 12:44:02 -0700 Subject: [PATCH 2/5] test(mothership): clarify test name to reflect that getActiveChatStreamIds is called with empty candidateIds --- apps/sim/app/api/mothership/chats/route.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/sim/app/api/mothership/chats/route.test.ts b/apps/sim/app/api/mothership/chats/route.test.ts index 1ef159ee000..79ef053566c 100644 --- a/apps/sim/app/api/mothership/chats/route.test.ts +++ b/apps/sim/app/api/mothership/chats/route.test.ts @@ -120,7 +120,7 @@ describe('GET /api/mothership/chats', () => { ]) }) - it('skips the reconciliation lookup when no chat has a stream marker set', async () => { + it('issues no Redis MGET when no chat has a stream marker set (empty candidateIds)', async () => { const now = new Date('2026-05-11T12:00:00Z') mockOrderBy.mockResolvedValueOnce([ { id: 'chat-1', title: null, updatedAt: now, activeStreamId: null, lastSeenAt: null }, From 9406a3b677a9568359cd0e273be9cddae566a676 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Tue, 12 May 2026 18:08:48 -0700 Subject: [PATCH 3/5] address comments --- .../mothership/chats/[chatId]/route.test.ts | 68 ++++++-- .../api/mothership/chats/[chatId]/route.ts | 17 +- .../app/api/mothership/chats/route.test.ts | 85 ++++++++-- apps/sim/app/api/mothership/chats/route.ts | 19 +-- .../[workspaceId]/home/hooks/use-chat.ts | 24 +-- .../lib/copilot/chat/stream-liveness.test.ts | 156 ++++++++++++++++++ apps/sim/lib/copilot/chat/stream-liveness.ts | 125 ++++++++++++++ .../lib/copilot/request/session/abort.test.ts | 77 +++++++-- apps/sim/lib/copilot/request/session/abort.ts | 42 +++-- apps/sim/lib/copilot/request/session/index.ts | 2 + 10 files changed, 523 insertions(+), 92 deletions(-) create mode 100644 apps/sim/lib/copilot/chat/stream-liveness.test.ts create mode 100644 apps/sim/lib/copilot/chat/stream-liveness.ts diff --git a/apps/sim/app/api/mothership/chats/[chatId]/route.test.ts b/apps/sim/app/api/mothership/chats/[chatId]/route.test.ts index 8606f1c4e04..d9088e436df 100644 --- a/apps/sim/app/api/mothership/chats/[chatId]/route.test.ts +++ b/apps/sim/app/api/mothership/chats/[chatId]/route.test.ts @@ -7,13 +7,13 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' const { mockGetAccessibleCopilotChat, - mockGetActiveChatStreamIds, + mockReconcileChatStreamMarkers, mockReadEvents, mockReadFilePreviewSessions, mockGetLatestRunForStream, } = vi.hoisted(() => ({ mockGetAccessibleCopilotChat: vi.fn(), - mockGetActiveChatStreamIds: vi.fn(), + mockReconcileChatStreamMarkers: vi.fn(), mockReadEvents: vi.fn(), mockReadFilePreviewSessions: vi.fn(), mockGetLatestRunForStream: vi.fn(), @@ -50,8 +50,8 @@ vi.mock('@/lib/copilot/chat/lifecycle', () => ({ getAccessibleCopilotChat: mockGetAccessibleCopilotChat, })) -vi.mock('@/lib/copilot/request/session/abort', () => ({ - getActiveChatStreamIds: mockGetActiveChatStreamIds, +vi.mock('@/lib/copilot/chat/stream-liveness', () => ({ + reconcileChatStreamMarkers: mockReconcileChatStreamMarkers, })) vi.mock('@/lib/copilot/request/session/buffer', () => ({ @@ -105,7 +105,19 @@ describe('GET /api/mothership/chats/[chatId]', () => { userId: 'user-1', isAuthenticated: true, }) - mockGetActiveChatStreamIds.mockResolvedValue(new Set()) + mockReconcileChatStreamMarkers.mockImplementation( + async (candidates: Array<{ chatId: string; streamId: string | null }>) => + new Map( + candidates.map((candidate) => [ + candidate.chatId, + { + chatId: candidate.chatId, + streamId: candidate.streamId, + status: candidate.streamId ? 'active' : 'inactive', + }, + ]) + ) + ) mockReadEvents.mockResolvedValue([]) mockReadFilePreviewSessions.mockResolvedValue([]) mockGetLatestRunForStream.mockResolvedValue(null) @@ -122,13 +134,18 @@ describe('GET /api/mothership/chats/[chatId]', () => { createdAt: new Date('2026-05-11T12:00:00Z'), updatedAt: new Date('2026-05-11T12:00:00Z'), }) - mockGetActiveChatStreamIds.mockResolvedValueOnce(new Set()) + mockReconcileChatStreamMarkers.mockResolvedValueOnce( + new Map([['chat-stuck', { chatId: 'chat-stuck', streamId: null, status: 'inactive' }]]) + ) const response = await GET(createRequest('chat-stuck'), makeContext('chat-stuck')) expect(response.status).toBe(200) const body = await response.json() - expect(mockGetActiveChatStreamIds).toHaveBeenCalledWith(['chat-stuck']) + expect(mockReconcileChatStreamMarkers).toHaveBeenCalledWith( + [{ chatId: 'chat-stuck', streamId: 'stream-orphaned' }], + { repairVerifiedStaleMarkers: true } + ) expect(body.success).toBe(true) expect(body.chat.conversationId).toBeNull() expect(body.chat.streamSnapshot).toBeUndefined() @@ -146,7 +163,6 @@ describe('GET /api/mothership/chats/[chatId]', () => { createdAt: new Date('2026-05-11T12:00:00Z'), updatedAt: new Date('2026-05-11T12:00:00Z'), }) - mockGetActiveChatStreamIds.mockResolvedValueOnce(new Set(['chat-live'])) mockGetLatestRunForStream.mockResolvedValueOnce({ status: 'active' }) const response = await GET(createRequest('chat-live'), makeContext('chat-live')) @@ -159,7 +175,32 @@ describe('GET /api/mothership/chats/[chatId]', () => { expect(body.chat.streamSnapshot.status).toBe('active') }) - it('skips the reconciliation lookup when conversationId is already null', async () => { + it('uses the Redis lock owner when it differs from a stale conversationId', async () => { + mockGetAccessibleCopilotChat.mockResolvedValueOnce({ + id: 'chat-mismatch', + type: 'mothership', + title: 'Mismatch', + messages: [], + resources: [], + conversationId: 'stream-stale', + createdAt: new Date('2026-05-11T12:00:00Z'), + updatedAt: new Date('2026-05-11T12:00:00Z'), + }) + mockReconcileChatStreamMarkers.mockResolvedValueOnce( + new Map([ + ['chat-mismatch', { chatId: 'chat-mismatch', streamId: 'stream-live', status: 'active' }], + ]) + ) + + const response = await GET(createRequest('chat-mismatch'), makeContext('chat-mismatch')) + expect(response.status).toBe(200) + const body = await response.json() + + expect(body.chat.conversationId).toBe('stream-live') + expect(mockReadEvents).toHaveBeenCalledWith('stream-live', '0') + }) + + it('returns null when conversationId is already null', async () => { mockGetAccessibleCopilotChat.mockResolvedValueOnce({ id: 'chat-idle', type: 'mothership', @@ -174,7 +215,10 @@ describe('GET /api/mothership/chats/[chatId]', () => { const response = await GET(createRequest('chat-idle'), makeContext('chat-idle')) expect(response.status).toBe(200) - expect(mockGetActiveChatStreamIds).not.toHaveBeenCalled() + expect(mockReconcileChatStreamMarkers).toHaveBeenCalledWith( + [{ chatId: 'chat-idle', streamId: null }], + { repairVerifiedStaleMarkers: true } + ) const body = await response.json() expect(body.chat.conversationId).toBeNull() }) @@ -184,7 +228,7 @@ describe('GET /api/mothership/chats/[chatId]', () => { const response = await GET(createRequest('chat-missing'), makeContext('chat-missing')) expect(response.status).toBe(404) - expect(mockGetActiveChatStreamIds).not.toHaveBeenCalled() + expect(mockReconcileChatStreamMarkers).not.toHaveBeenCalled() }) it('returns 401 when unauthenticated', async () => { @@ -196,6 +240,6 @@ describe('GET /api/mothership/chats/[chatId]', () => { const response = await GET(createRequest('chat-x'), makeContext('chat-x')) expect(response.status).toBe(401) expect(mockGetAccessibleCopilotChat).not.toHaveBeenCalled() - expect(mockGetActiveChatStreamIds).not.toHaveBeenCalled() + expect(mockReconcileChatStreamMarkers).not.toHaveBeenCalled() }) }) diff --git a/apps/sim/app/api/mothership/chats/[chatId]/route.ts b/apps/sim/app/api/mothership/chats/[chatId]/route.ts index 2849a79bb4e..0a74b5b2ed2 100644 --- a/apps/sim/app/api/mothership/chats/[chatId]/route.ts +++ b/apps/sim/app/api/mothership/chats/[chatId]/route.ts @@ -14,13 +14,13 @@ import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository' import { buildEffectiveChatTranscript } from '@/lib/copilot/chat/effective-transcript' import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle' import { normalizeMessage } from '@/lib/copilot/chat/persisted-message' +import { reconcileChatStreamMarkers } from '@/lib/copilot/chat/stream-liveness' import { authenticateCopilotRequestSessionOnly, createInternalServerErrorResponse, createUnauthorizedResponse, } from '@/lib/copilot/request/http' import type { FilePreviewSession } from '@/lib/copilot/request/session' -import { getActiveChatStreamIds } from '@/lib/copilot/request/session/abort' import { readEvents } from '@/lib/copilot/request/session/buffer' import { readFilePreviewSessions } from '@/lib/copilot/request/session/file-preview-session' import { type StreamBatchEvent, toStreamBatchEvent } from '@/lib/copilot/request/session/types' @@ -53,16 +53,11 @@ export const GET = withRouteHandler( status: string } | null = null - // Reconcile the persisted stream marker against the canonical Redis - // lock. If `conversation_id` is set but no lock is held, the stream - // is no longer running (process died before finalize) — treat the - // marker as null so the client doesn't try to reconnect to a dead - // stream. Mirrors the same reconciliation in the task list route. - const activeIds = chat.conversationId - ? await getActiveChatStreamIds([chat.id]) - : new Set() - const liveConversationId = - chat.conversationId && activeIds.has(chat.id) ? chat.conversationId : null + const reconciledMarkers = await reconcileChatStreamMarkers( + [{ chatId: chat.id, streamId: chat.conversationId }], + { repairVerifiedStaleMarkers: true } + ) + const liveConversationId = reconciledMarkers.get(chat.id)?.streamId ?? null if (liveConversationId) { try { diff --git a/apps/sim/app/api/mothership/chats/route.test.ts b/apps/sim/app/api/mothership/chats/route.test.ts index 79ef053566c..5851d1b45df 100644 --- a/apps/sim/app/api/mothership/chats/route.test.ts +++ b/apps/sim/app/api/mothership/chats/route.test.ts @@ -5,13 +5,13 @@ import { copilotHttpMock, copilotHttpMockFns, permissionsMock } from '@sim/testi import { NextRequest } from 'next/server' import { beforeEach, describe, expect, it, vi } from 'vitest' -const { mockSelect, mockFrom, mockWhere, mockOrderBy, mockGetActiveChatStreamIds } = vi.hoisted( +const { mockSelect, mockFrom, mockWhere, mockOrderBy, mockReconcileChatStreamMarkers } = vi.hoisted( () => ({ mockSelect: vi.fn(), mockFrom: vi.fn(), mockWhere: vi.fn(), mockOrderBy: vi.fn(), - mockGetActiveChatStreamIds: vi.fn(), + mockReconcileChatStreamMarkers: vi.fn(), }) ) @@ -43,8 +43,8 @@ vi.mock('drizzle-orm', () => ({ vi.mock('@/lib/copilot/request/http', () => copilotHttpMock) vi.mock('@/lib/workspaces/permissions/utils', () => permissionsMock) -vi.mock('@/lib/copilot/request/session/abort', () => ({ - getActiveChatStreamIds: mockGetActiveChatStreamIds, +vi.mock('@/lib/copilot/chat/stream-liveness', () => ({ + reconcileChatStreamMarkers: mockReconcileChatStreamMarkers, })) vi.mock('@/lib/copilot/tasks', () => ({ @@ -77,7 +77,19 @@ describe('GET /api/mothership/chats', () => { mockFrom.mockReturnValue({ where: mockWhere }) mockSelect.mockReturnValue({ from: mockFrom }) - mockGetActiveChatStreamIds.mockResolvedValue(new Set()) + mockReconcileChatStreamMarkers.mockImplementation( + async (candidates: Array<{ chatId: string; streamId: string | null }>) => + new Map( + candidates.map((candidate) => [ + candidate.chatId, + { + chatId: candidate.chatId, + streamId: candidate.streamId, + status: candidate.streamId ? 'active' : 'inactive', + }, + ]) + ) + ) }) it('clears activeStreamId on chats whose redis lock has expired (stuck-yellow bug)', async () => { @@ -105,13 +117,26 @@ describe('GET /api/mothership/chats', () => { lastSeenAt: null, }, ]) - mockGetActiveChatStreamIds.mockResolvedValueOnce(new Set(['chat-live'])) + mockReconcileChatStreamMarkers.mockResolvedValueOnce( + new Map([ + ['chat-stuck', { chatId: 'chat-stuck', streamId: null, status: 'inactive' }], + ['chat-live', { chatId: 'chat-live', streamId: 'stream-live', status: 'active' }], + ['chat-idle', { chatId: 'chat-idle', streamId: null, status: 'inactive' }], + ]) + ) const response = await GET(createRequest('ws-1')) expect(response.status).toBe(200) const body = await response.json() - expect(mockGetActiveChatStreamIds).toHaveBeenCalledWith(['chat-stuck', 'chat-live']) + expect(mockReconcileChatStreamMarkers).toHaveBeenCalledWith( + [ + { chatId: 'chat-stuck', streamId: 'stream-orphaned' }, + { chatId: 'chat-live', streamId: 'stream-live' }, + { chatId: 'chat-idle', streamId: null }, + ], + { repairVerifiedStaleMarkers: true } + ) expect(body.success).toBe(true) expect(body.data).toEqual([ expect.objectContaining({ id: 'chat-stuck', activeStreamId: null }), @@ -120,7 +145,7 @@ describe('GET /api/mothership/chats', () => { ]) }) - it('issues no Redis MGET when no chat has a stream marker set (empty candidateIds)', async () => { + it('preserves chats when no chat has a stream marker set', async () => { const now = new Date('2026-05-11T12:00:00Z') mockOrderBy.mockResolvedValueOnce([ { id: 'chat-1', title: null, updatedAt: now, activeStreamId: null, lastSeenAt: null }, @@ -130,11 +155,18 @@ describe('GET /api/mothership/chats', () => { const response = await GET(createRequest('ws-1')) expect(response.status).toBe(200) - expect(mockGetActiveChatStreamIds).toHaveBeenCalledWith([]) + expect(mockReconcileChatStreamMarkers).toHaveBeenCalledWith( + [ + { chatId: 'chat-1', streamId: null }, + { chatId: 'chat-2', streamId: null }, + ], + { repairVerifiedStaleMarkers: true } + ) const body = await response.json() - expect( - body.data.every((c: { activeStreamId: string | null }) => c.activeStreamId === null) - ).toBe(true) + expect(body.data).toEqual([ + expect.objectContaining({ id: 'chat-1', activeStreamId: null }), + expect.objectContaining({ id: 'chat-2', activeStreamId: null }), + ]) }) it('leaves activeStreamId untouched when redis confirms every lock is live', async () => { @@ -143,7 +175,6 @@ describe('GET /api/mothership/chats', () => { { id: 'chat-a', title: null, updatedAt: now, activeStreamId: 'stream-a', lastSeenAt: null }, { id: 'chat-b', title: null, updatedAt: now, activeStreamId: 'stream-b', lastSeenAt: null }, ]) - mockGetActiveChatStreamIds.mockResolvedValueOnce(new Set(['chat-a', 'chat-b'])) const response = await GET(createRequest('ws-1')) const body = await response.json() @@ -154,6 +185,32 @@ describe('GET /api/mothership/chats', () => { ]) }) + it('uses Redis lock owner when it differs from a stale activeStreamId', async () => { + const now = new Date('2026-05-11T12:00:00Z') + mockOrderBy.mockResolvedValueOnce([ + { + id: 'chat-mismatch', + title: null, + updatedAt: now, + activeStreamId: 'stream-stale', + lastSeenAt: null, + }, + ]) + mockReconcileChatStreamMarkers.mockResolvedValueOnce( + new Map([ + ['chat-mismatch', { chatId: 'chat-mismatch', streamId: 'stream-live', status: 'active' }], + ]) + ) + + const response = await GET(createRequest('ws-1')) + expect(response.status).toBe(200) + const body = await response.json() + + expect(body.data).toEqual([ + expect.objectContaining({ id: 'chat-mismatch', activeStreamId: 'stream-live' }), + ]) + }) + it('returns 401 when unauthenticated', async () => { copilotHttpMockFns.mockAuthenticateCopilotRequestSessionOnly.mockResolvedValueOnce({ userId: null, @@ -163,6 +220,6 @@ describe('GET /api/mothership/chats', () => { const response = await GET(createRequest('ws-1')) expect(response.status).toBe(401) expect(mockSelect).not.toHaveBeenCalled() - expect(mockGetActiveChatStreamIds).not.toHaveBeenCalled() + expect(mockReconcileChatStreamMarkers).not.toHaveBeenCalled() }) }) diff --git a/apps/sim/app/api/mothership/chats/route.ts b/apps/sim/app/api/mothership/chats/route.ts index 1b61c85b04d..b0a068fabc1 100644 --- a/apps/sim/app/api/mothership/chats/route.ts +++ b/apps/sim/app/api/mothership/chats/route.ts @@ -8,12 +8,12 @@ import { listMothershipChatsContract, } from '@/lib/api/contracts/mothership-tasks' import { parseRequest } from '@/lib/api/server' +import { reconcileChatStreamMarkers } from '@/lib/copilot/chat/stream-liveness' import { authenticateCopilotRequestSessionOnly, createInternalServerErrorResponse, createUnauthorizedResponse, } from '@/lib/copilot/request/http' -import { getActiveChatStreamIds } from '@/lib/copilot/request/session/abort' import { taskPubSub } from '@/lib/copilot/tasks' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { captureServerEvent } from '@/lib/posthog/server' @@ -56,17 +56,14 @@ export const GET = withRouteHandler(async (request: NextRequest) => { ) .orderBy(desc(copilotChats.updatedAt)) - // Reconcile the persisted stream marker against the canonical Redis - // lock. `conversation_id` is set when a stream starts and cleared on - // the finalize/stop paths — but if those never run (pod crash, OOM, - // throw before callback), the column is orphaned and the task renders - // yellow forever. The Redis lock self-heals via its 60s TTL, so a - // missing lock means the stream is no longer running. - const candidateIds = chats.filter((c) => c.activeStreamId !== null).map((c) => c.id) - const activeIds = await getActiveChatStreamIds(candidateIds) - const reconciled = chats.map((c) => - c.activeStreamId !== null && !activeIds.has(c.id) ? { ...c, activeStreamId: null } : c + const streamMarkers = await reconcileChatStreamMarkers( + chats.map((c) => ({ chatId: c.id, streamId: c.activeStreamId })), + { repairVerifiedStaleMarkers: true } ) + const reconciled = chats.map((c) => { + const activeStreamId = streamMarkers.get(c.id)?.streamId ?? null + return activeStreamId === c.activeStreamId ? c : { ...c, activeStreamId } + }) return NextResponse.json({ success: true, data: reconciled }) } catch (error) { diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 4e7d5138e69..0fbcffe2994 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -3495,11 +3495,11 @@ export function useChat( processSSEStreamRef.current = processSSEStream const getActiveStreamIdForChat = useCallback( - async (chatId: string, signal?: AbortSignal): Promise => { + async ( + chatId: string, + signal?: AbortSignal + ): Promise<{ loaded: boolean; streamId: string | null }> => { const cached = queryClient.getQueryData(taskKeys.detail(chatId)) - if (cached?.activeStreamId) { - return cached.activeStreamId - } try { const fetchSignal = combineAbortSignals( @@ -3507,15 +3507,15 @@ export function useChat( createTimeoutSignal(CHAT_HISTORY_RECOVERY_TIMEOUT_MS) ) const history = await fetchChatHistory(chatId, fetchSignal) - if (signal?.aborted || fetchSignal?.aborted) return null + if (signal?.aborted || fetchSignal?.aborted) return { loaded: false, streamId: null } queryClient.setQueryData(taskKeys.detail(chatId), history) - return history.activeStreamId ?? null + return { loaded: true, streamId: history.activeStreamId ?? null } } catch (error) { logger.warn('Failed to load chat history while recovering stream', { chatId, error: toError(error).message, }) - return null + return { loaded: false, streamId: cached?.activeStreamId ?? null } } }, [queryClient] @@ -4028,12 +4028,12 @@ export function useChat( !recoveryController.signal.aborted const cached = queryClient.getQueryData(taskKeys.detail(chatId)) - let streamId = + const fallbackStreamId = streamIdRef.current ?? activeTurnRef.current?.userMessageId ?? cached?.activeStreamId - if (!streamId) { - streamId = - (await getActiveStreamIdForChat(chatId, recoveryController.signal)) ?? undefined - } + const loadedStream = await getActiveStreamIdForChat(chatId, recoveryController.signal) + const streamId = loadedStream.loaded + ? (loadedStream.streamId ?? undefined) + : fallbackStreamId if ( !isSameRecoverySubject() || streamGenRef.current !== observedGeneration || diff --git a/apps/sim/lib/copilot/chat/stream-liveness.test.ts b/apps/sim/lib/copilot/chat/stream-liveness.test.ts new file mode 100644 index 00000000000..b8743ad7b9d --- /dev/null +++ b/apps/sim/lib/copilot/chat/stream-liveness.test.ts @@ -0,0 +1,156 @@ +/** + * @vitest-environment node + */ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { mockAnd, mockEq, mockGetChatStreamLockOwners, mockSet, mockUpdate, mockWhere } = vi.hoisted( + () => ({ + mockAnd: vi.fn((...conditions: unknown[]) => ({ type: 'and', conditions })), + mockEq: vi.fn((field: unknown, value: unknown) => ({ type: 'eq', field, value })), + mockGetChatStreamLockOwners: vi.fn(), + mockSet: vi.fn(), + mockUpdate: vi.fn(), + mockWhere: vi.fn(), + }) +) + +vi.mock('@sim/db', () => ({ + db: { update: mockUpdate }, +})) + +vi.mock('@sim/db/schema', () => ({ + copilotChats: { + id: 'copilotChats.id', + conversationId: 'copilotChats.conversationId', + }, +})) + +vi.mock('drizzle-orm', () => ({ + and: mockAnd, + eq: mockEq, +})) + +vi.mock('@/lib/copilot/request/session', () => ({ + getChatStreamLockOwners: mockGetChatStreamLockOwners, +})) + +import { reconcileChatStreamMarkers } from '@/lib/copilot/chat/stream-liveness' + +describe('reconcileChatStreamMarkers', () => { + beforeEach(() => { + vi.clearAllMocks() + mockSet.mockReturnValue({ where: mockWhere }) + mockUpdate.mockReturnValue({ set: mockSet }) + mockWhere.mockResolvedValue(undefined) + mockGetChatStreamLockOwners.mockResolvedValue({ + status: 'verified', + ownersByChatId: new Map(), + }) + }) + + it('clears a persisted stream marker when Redis verifies no lock owner exists', async () => { + const markers = await reconcileChatStreamMarkers([ + { chatId: 'chat-stuck', streamId: 'stream-orphaned' }, + ]) + + expect(mockGetChatStreamLockOwners).toHaveBeenCalledWith(['chat-stuck']) + expect(markers.get('chat-stuck')).toEqual({ + chatId: 'chat-stuck', + streamId: null, + status: 'inactive', + }) + }) + + it('repairs a verified stale persisted stream marker when requested', async () => { + await reconcileChatStreamMarkers([{ chatId: 'chat-stuck', streamId: 'stream-orphaned' }], { + repairVerifiedStaleMarkers: true, + }) + + expect(mockUpdate).toHaveBeenCalled() + expect(mockSet).toHaveBeenCalledWith({ conversationId: null }) + expect(mockWhere).toHaveBeenCalledWith( + mockAnd( + mockEq('copilotChats.id', 'chat-stuck'), + mockEq('copilotChats.conversationId', 'stream-orphaned') + ) + ) + }) + + it('uses the canonical Redis owner when the persisted stream marker is stale', async () => { + mockGetChatStreamLockOwners.mockResolvedValueOnce({ + status: 'verified', + ownersByChatId: new Map([['chat-mismatch', 'stream-live']]), + }) + + const markers = await reconcileChatStreamMarkers([ + { chatId: 'chat-mismatch', streamId: 'stream-stale' }, + ]) + + expect(markers.get('chat-mismatch')).toEqual({ + chatId: 'chat-mismatch', + streamId: 'stream-live', + status: 'active', + }) + }) + + it('preserves persisted stream markers when Redis state is unknown', async () => { + mockGetChatStreamLockOwners.mockResolvedValueOnce({ + status: 'unknown', + ownersByChatId: new Map(), + }) + + const markers = await reconcileChatStreamMarkers([ + { chatId: 'chat-remote', streamId: 'stream-remote' }, + ]) + + expect(markers.get('chat-remote')).toEqual({ + chatId: 'chat-remote', + streamId: 'stream-remote', + status: 'unknown', + }) + }) + + it('preserves a persisted marker when unknown local owner differs', async () => { + mockGetChatStreamLockOwners.mockResolvedValueOnce({ + status: 'unknown', + ownersByChatId: new Map([['chat-mismatch', 'stream-local']]), + }) + + const markers = await reconcileChatStreamMarkers([ + { chatId: 'chat-mismatch', streamId: 'stream-persisted' }, + ]) + + expect(markers.get('chat-mismatch')).toEqual({ + chatId: 'chat-mismatch', + streamId: 'stream-persisted', + status: 'unknown', + }) + }) + + it('detects a live lock before the persisted marker has been written', async () => { + mockGetChatStreamLockOwners.mockResolvedValueOnce({ + status: 'verified', + ownersByChatId: new Map([['chat-starting', 'stream-starting']]), + }) + + const markers = await reconcileChatStreamMarkers([{ chatId: 'chat-starting', streamId: null }]) + + expect(mockGetChatStreamLockOwners).toHaveBeenCalledWith(['chat-starting']) + expect(markers.get('chat-starting')).toEqual({ + chatId: 'chat-starting', + streamId: 'stream-starting', + status: 'active', + }) + }) + + it('queries locks even when no persisted markers exist', async () => { + const markers = await reconcileChatStreamMarkers([{ chatId: 'chat-idle', streamId: null }]) + + expect(mockGetChatStreamLockOwners).toHaveBeenCalledWith(['chat-idle']) + expect(markers.get('chat-idle')).toEqual({ + chatId: 'chat-idle', + streamId: null, + status: 'inactive', + }) + }) +}) diff --git a/apps/sim/lib/copilot/chat/stream-liveness.ts b/apps/sim/lib/copilot/chat/stream-liveness.ts new file mode 100644 index 00000000000..1c8de2a3e07 --- /dev/null +++ b/apps/sim/lib/copilot/chat/stream-liveness.ts @@ -0,0 +1,125 @@ +import { db } from '@sim/db' +import { copilotChats } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { toError } from '@sim/utils/errors' +import { and, eq } from 'drizzle-orm' +import { getChatStreamLockOwners } from '@/lib/copilot/request/session' + +const logger = createLogger('ChatStreamLiveness') + +export interface ChatStreamMarkerCandidate { + chatId: string + streamId: string | null +} + +export interface ReconciledChatStreamMarker { + chatId: string + streamId: string | null + status: 'active' | 'inactive' | 'unknown' +} + +interface ReconcileChatStreamMarkersOptions { + repairVerifiedStaleMarkers?: boolean +} + +/** + * Reconciles persisted chat stream markers against the runtime stream lock. + * + * Redis lock ownership is the canonical live-stream signal. When the lookup is + * verified, missing owners clear stale persisted markers and present owners win + * over stale DB values. When Redis state is unknown, persisted markers are + * preserved so a transient Redis failure in a multi-pod deployment does not + * incorrectly hide a live stream owned by another pod. + */ +export async function reconcileChatStreamMarkers( + candidates: ChatStreamMarkerCandidate[], + options: ReconcileChatStreamMarkersOptions = {} +): Promise> { + const results = new Map() + + for (const candidate of candidates) { + results.set(candidate.chatId, { + chatId: candidate.chatId, + streamId: candidate.streamId, + status: candidate.streamId ? 'unknown' : 'inactive', + }) + } + + if (candidates.length === 0) { + return results + } + + const { status, ownersByChatId } = await getChatStreamLockOwners( + candidates.map((candidate) => candidate.chatId) + ) + + for (const candidate of candidates) { + const owner = ownersByChatId.get(candidate.chatId) + if ( + owner && + (status === 'verified' || owner === candidate.streamId || candidate.streamId === null) + ) { + results.set(candidate.chatId, { + chatId: candidate.chatId, + streamId: owner, + status: 'active', + }) + continue + } + + if (status === 'verified' && candidate.streamId !== null) { + results.set(candidate.chatId, { + chatId: candidate.chatId, + streamId: null, + status: 'inactive', + }) + continue + } + + results.set(candidate.chatId, { + chatId: candidate.chatId, + streamId: candidate.streamId, + status: candidate.streamId ? 'unknown' : 'inactive', + }) + } + + if (options.repairVerifiedStaleMarkers) { + await repairVerifiedStaleMarkers(candidates, results) + } + + return results +} + +async function repairVerifiedStaleMarkers( + candidates: ChatStreamMarkerCandidate[], + results: Map +): Promise { + const staleCandidates = candidates.filter((candidate) => { + const result = results.get(candidate.chatId) + return candidate.streamId !== null && result?.status === 'inactive' && result.streamId === null + }) + + if (staleCandidates.length === 0) return + + await Promise.all( + staleCandidates.map(async (candidate) => { + try { + await db + .update(copilotChats) + .set({ conversationId: null }) + .where( + and( + eq(copilotChats.id, candidate.chatId), + eq(copilotChats.conversationId, candidate.streamId) + ) + ) + } catch (error) { + logger.warn('Failed to repair stale chat stream marker', { + chatId: candidate.chatId, + streamId: candidate.streamId, + error: toError(error).message, + }) + } + }) + ) +} diff --git a/apps/sim/lib/copilot/request/session/abort.test.ts b/apps/sim/lib/copilot/request/session/abort.test.ts index 609a494649d..9e8e2feae82 100644 --- a/apps/sim/lib/copilot/request/session/abort.test.ts +++ b/apps/sim/lib/copilot/request/session/abort.test.ts @@ -22,7 +22,12 @@ vi.mock('@/lib/copilot/request/otel', () => ({ fn({ setAttribute: vi.fn() }), })) -import { getActiveChatStreamIds, startAbortPoller } from '@/lib/copilot/request/session/abort' +import { + acquirePendingChatStream, + getChatStreamLockOwners, + releasePendingChatStream, + startAbortPoller, +} from '@/lib/copilot/request/session/abort' describe('startAbortPoller heartbeat', () => { beforeEach(() => { @@ -160,54 +165,92 @@ describe('startAbortPoller heartbeat', () => { }) }) -describe('getActiveChatStreamIds', () => { +describe('getChatStreamLockOwners', () => { beforeEach(() => { vi.clearAllMocks() redisConfigMockFns.mockGetRedisClient.mockReturnValue(null) }) - it('returns an empty set when no chat ids are provided', async () => { - const active = await getActiveChatStreamIds([]) - expect(active.size).toBe(0) + it('returns a verified empty owner map when no chat ids are provided', async () => { + const result = await getChatStreamLockOwners([]) + expect(result.status).toBe('verified') + expect(result.ownersByChatId.size).toBe(0) }) - it('reports a chat as active when redis returns a lock owner', async () => { + it('returns Redis lock owners keyed by chat id', async () => { const mget = vi.fn().mockResolvedValue(['stream-1', null, 'stream-3']) redisConfigMockFns.mockGetRedisClient.mockReturnValue({ mget } as never) - const active = await getActiveChatStreamIds(['chat-1', 'chat-2', 'chat-3']) + const result = await getChatStreamLockOwners(['chat-1', 'chat-2', 'chat-3']) expect(mget).toHaveBeenCalledWith([ 'copilot:chat-stream-lock:chat-1', 'copilot:chat-stream-lock:chat-2', 'copilot:chat-stream-lock:chat-3', ]) - expect(active).toEqual(new Set(['chat-1', 'chat-3'])) + expect(result.status).toBe('verified') + expect(result.ownersByChatId).toEqual( + new Map([ + ['chat-1', 'stream-1'], + ['chat-3', 'stream-3'], + ]) + ) }) - it('reports no chats as active when every lock has expired in redis', async () => { + it('returns a verified empty map when every lock has expired in Redis', async () => { const mget = vi.fn().mockResolvedValue([null, null]) redisConfigMockFns.mockGetRedisClient.mockReturnValue({ mget } as never) - const active = await getActiveChatStreamIds(['chat-stuck-1', 'chat-stuck-2']) + const result = await getChatStreamLockOwners(['chat-stuck-1', 'chat-stuck-2']) - expect(active.size).toBe(0) + expect(result.status).toBe('verified') + expect(result.ownersByChatId.size).toBe(0) }) - it('returns an empty set when redis is unavailable', async () => { + it('trusts verified Redis null over a process-local pending stream', async () => { + const mget = vi.fn().mockResolvedValue([null]) + redisConfigMockFns.mockGetRedisClient.mockReturnValue({ mget } as never) + await acquirePendingChatStream('chat-local', 'stream-local') + + try { + const result = await getChatStreamLockOwners(['chat-local']) + + expect(result.status).toBe('verified') + expect(result.ownersByChatId.size).toBe(0) + } finally { + await releasePendingChatStream('chat-local', 'stream-local') + } + }) + + it('returns unknown status when Redis is unavailable', async () => { redisConfigMockFns.mockGetRedisClient.mockReturnValue(null) - const active = await getActiveChatStreamIds(['chat-1', 'chat-2']) + const result = await getChatStreamLockOwners(['chat-1', 'chat-2']) - expect(active.size).toBe(0) + expect(result.status).toBe('unknown') + expect(result.ownersByChatId.size).toBe(0) + }) + + it('preserves local pending stream owners when Redis is unavailable', async () => { + await acquirePendingChatStream('chat-local', 'stream-local') + + try { + const result = await getChatStreamLockOwners(['chat-local', 'chat-remote']) + + expect(result.status).toBe('unknown') + expect(result.ownersByChatId).toEqual(new Map([['chat-local', 'stream-local']])) + } finally { + await releasePendingChatStream('chat-local', 'stream-local') + } }) - it('falls back to an empty set without throwing when mget rejects', async () => { + it('returns unknown status without throwing when mget rejects', async () => { const mget = vi.fn().mockRejectedValue(new Error('redis down')) redisConfigMockFns.mockGetRedisClient.mockReturnValue({ mget } as never) - const active = await getActiveChatStreamIds(['chat-1', 'chat-2']) + const result = await getChatStreamLockOwners(['chat-1', 'chat-2']) - expect(active.size).toBe(0) + expect(result.status).toBe('unknown') + expect(result.ownersByChatId.size).toBe(0) }) }) diff --git a/apps/sim/lib/copilot/request/session/abort.ts b/apps/sim/lib/copilot/request/session/abort.ts index 74101075b27..b081044f8eb 100644 --- a/apps/sim/lib/copilot/request/session/abort.ts +++ b/apps/sim/lib/copilot/request/session/abort.ts @@ -35,6 +35,11 @@ const CHAT_STREAM_LOCK_TTL_SECONDS = 60 */ const CHAT_STREAM_LOCK_HEARTBEAT_INTERVAL_MS = 20_000 +export interface ChatStreamLockOwnersResult { + status: 'verified' | 'unknown' + ownersByChatId: Map +} + function registerPendingChatStream(chatId: string, streamId: string): void { let resolve!: () => void const promise = new Promise((r) => { @@ -124,40 +129,47 @@ export async function getPendingChatStreamId(chatId: string): Promise> { - const active = new Set() - if (chatIds.length === 0) return active +export async function getChatStreamLockOwners( + chatIds: string[] +): Promise { + const localOwnersByChatId = new Map() + if (chatIds.length === 0) { + return { status: 'verified', ownersByChatId: localOwnersByChatId } + } for (const chatId of chatIds) { - if (pendingChatStreams.has(chatId)) active.add(chatId) + const entry = pendingChatStreams.get(chatId) + if (entry?.streamId) localOwnersByChatId.set(chatId, entry.streamId) } const redis = getRedisClient() - if (!redis) return active + if (!redis) { + return { status: 'unknown', ownersByChatId: localOwnersByChatId } + } try { const keys = chatIds.map(getChatStreamLockKey) const values = await redis.mget(keys) + const redisOwnersByChatId = new Map() for (let i = 0; i < chatIds.length; i++) { - if (values[i]) active.add(chatIds[i]) + const owner = values[i] + if (owner) redisOwnersByChatId.set(chatIds[i], owner) } + return { status: 'verified', ownersByChatId: redisOwnersByChatId } } catch (error) { logger.warn('Failed to load chat stream lock owners (batch)', { count: chatIds.length, error: toError(error).message, }) + return { status: 'unknown', ownersByChatId: localOwnersByChatId } } - - return active } export async function releasePendingChatStream(chatId: string, streamId: string): Promise { diff --git a/apps/sim/lib/copilot/request/session/index.ts b/apps/sim/lib/copilot/request/session/index.ts index a09a194c788..c0ecb7a1716 100644 --- a/apps/sim/lib/copilot/request/session/index.ts +++ b/apps/sim/lib/copilot/request/session/index.ts @@ -1,9 +1,11 @@ +export type { ChatStreamLockOwnersResult } from './abort' export { AbortReason, type AbortReasonValue, abortActiveStream, acquirePendingChatStream, cleanupAbortMarker, + getChatStreamLockOwners, getPendingChatStreamId, isExplicitStopReason, registerActiveStream, From ee39dead8563c6509ad35306a8fecb317b6c71c3 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Tue, 12 May 2026 18:32:38 -0700 Subject: [PATCH 4/5] fix state machine issue --- .../lib/copilot/chat/stream-liveness.test.ts | 10 +++---- apps/sim/lib/copilot/chat/stream-liveness.ts | 26 ++++++++++++------- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/apps/sim/lib/copilot/chat/stream-liveness.test.ts b/apps/sim/lib/copilot/chat/stream-liveness.test.ts index b8743ad7b9d..f7294b4c6ec 100644 --- a/apps/sim/lib/copilot/chat/stream-liveness.test.ts +++ b/apps/sim/lib/copilot/chat/stream-liveness.test.ts @@ -127,7 +127,7 @@ describe('reconcileChatStreamMarkers', () => { }) }) - it('detects a live lock before the persisted marker has been written', async () => { + it('treats a null persisted marker as inactive even when Redis still holds a lock (post-completion teardown window)', async () => { mockGetChatStreamLockOwners.mockResolvedValueOnce({ status: 'verified', ownersByChatId: new Map([['chat-starting', 'stream-starting']]), @@ -135,18 +135,16 @@ describe('reconcileChatStreamMarkers', () => { const markers = await reconcileChatStreamMarkers([{ chatId: 'chat-starting', streamId: null }]) - expect(mockGetChatStreamLockOwners).toHaveBeenCalledWith(['chat-starting']) expect(markers.get('chat-starting')).toEqual({ chatId: 'chat-starting', - streamId: 'stream-starting', - status: 'active', + streamId: null, + status: 'inactive', }) }) - it('queries locks even when no persisted markers exist', async () => { + it('does not query locks when no chats have persisted stream markers', async () => { const markers = await reconcileChatStreamMarkers([{ chatId: 'chat-idle', streamId: null }]) - expect(mockGetChatStreamLockOwners).toHaveBeenCalledWith(['chat-idle']) expect(markers.get('chat-idle')).toEqual({ chatId: 'chat-idle', streamId: null, diff --git a/apps/sim/lib/copilot/chat/stream-liveness.ts b/apps/sim/lib/copilot/chat/stream-liveness.ts index 1c8de2a3e07..f1f4ba2e9ed 100644 --- a/apps/sim/lib/copilot/chat/stream-liveness.ts +++ b/apps/sim/lib/copilot/chat/stream-liveness.ts @@ -38,27 +38,33 @@ export async function reconcileChatStreamMarkers( const results = new Map() for (const candidate of candidates) { + if (candidate.streamId === null) { + results.set(candidate.chatId, { + chatId: candidate.chatId, + streamId: null, + status: 'inactive', + }) + continue + } results.set(candidate.chatId, { chatId: candidate.chatId, streamId: candidate.streamId, - status: candidate.streamId ? 'unknown' : 'inactive', + status: 'unknown', }) } - if (candidates.length === 0) { + const candidatesWithMarkers = candidates.filter((candidate) => candidate.streamId !== null) + if (candidatesWithMarkers.length === 0) { return results } const { status, ownersByChatId } = await getChatStreamLockOwners( - candidates.map((candidate) => candidate.chatId) + candidatesWithMarkers.map((candidate) => candidate.chatId) ) - for (const candidate of candidates) { + for (const candidate of candidatesWithMarkers) { const owner = ownersByChatId.get(candidate.chatId) - if ( - owner && - (status === 'verified' || owner === candidate.streamId || candidate.streamId === null) - ) { + if (owner && (status === 'verified' || owner === candidate.streamId)) { results.set(candidate.chatId, { chatId: candidate.chatId, streamId: owner, @@ -67,7 +73,7 @@ export async function reconcileChatStreamMarkers( continue } - if (status === 'verified' && candidate.streamId !== null) { + if (status === 'verified') { results.set(candidate.chatId, { chatId: candidate.chatId, streamId: null, @@ -79,7 +85,7 @@ export async function reconcileChatStreamMarkers( results.set(candidate.chatId, { chatId: candidate.chatId, streamId: candidate.streamId, - status: candidate.streamId ? 'unknown' : 'inactive', + status: 'unknown', }) } From 9a1b16e84a850cb672e906808ee82f4cf69ed5a1 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Tue, 12 May 2026 18:43:39 -0700 Subject: [PATCH 5/5] cleanup code and fix types --- .../mothership/chats/[chatId]/route.test.ts | 16 +++++++-------- .../api/mothership/chats/[chatId]/route.ts | 20 +++++++++---------- apps/sim/hooks/queries/tasks.test.ts | 4 ++-- apps/sim/hooks/queries/tasks.ts | 18 +++++++---------- .../sim/lib/api/contracts/mothership-tasks.ts | 2 +- apps/sim/lib/copilot/chat/stream-liveness.ts | 12 +++++++---- 6 files changed, 36 insertions(+), 36 deletions(-) diff --git a/apps/sim/app/api/mothership/chats/[chatId]/route.test.ts b/apps/sim/app/api/mothership/chats/[chatId]/route.test.ts index d9088e436df..ad0efd9a946 100644 --- a/apps/sim/app/api/mothership/chats/[chatId]/route.test.ts +++ b/apps/sim/app/api/mothership/chats/[chatId]/route.test.ts @@ -123,7 +123,7 @@ describe('GET /api/mothership/chats/[chatId]', () => { mockGetLatestRunForStream.mockResolvedValue(null) }) - it('clears conversationId when the redis lock has expired (stuck-yellow bug)', async () => { + it('clears activeStreamId when the redis lock has expired (stuck-yellow bug)', async () => { mockGetAccessibleCopilotChat.mockResolvedValueOnce({ id: 'chat-stuck', type: 'mothership', @@ -147,12 +147,12 @@ describe('GET /api/mothership/chats/[chatId]', () => { { repairVerifiedStaleMarkers: true } ) expect(body.success).toBe(true) - expect(body.chat.conversationId).toBeNull() + expect(body.chat.activeStreamId).toBeNull() expect(body.chat.streamSnapshot).toBeUndefined() expect(mockReadEvents).not.toHaveBeenCalled() }) - it('returns the live conversationId when redis confirms the lock', async () => { + it('returns the live activeStreamId when redis confirms the lock', async () => { mockGetAccessibleCopilotChat.mockResolvedValueOnce({ id: 'chat-live', type: 'mothership', @@ -169,13 +169,13 @@ describe('GET /api/mothership/chats/[chatId]', () => { expect(response.status).toBe(200) const body = await response.json() - expect(body.chat.conversationId).toBe('stream-live') + expect(body.chat.activeStreamId).toBe('stream-live') expect(mockReadEvents).toHaveBeenCalledWith('stream-live', '0') expect(body.chat.streamSnapshot).toBeDefined() expect(body.chat.streamSnapshot.status).toBe('active') }) - it('uses the Redis lock owner when it differs from a stale conversationId', async () => { + it('uses the Redis lock owner when it differs from a stale persisted streamId', async () => { mockGetAccessibleCopilotChat.mockResolvedValueOnce({ id: 'chat-mismatch', type: 'mothership', @@ -196,11 +196,11 @@ describe('GET /api/mothership/chats/[chatId]', () => { expect(response.status).toBe(200) const body = await response.json() - expect(body.chat.conversationId).toBe('stream-live') + expect(body.chat.activeStreamId).toBe('stream-live') expect(mockReadEvents).toHaveBeenCalledWith('stream-live', '0') }) - it('returns null when conversationId is already null', async () => { + it('returns null when the persisted stream marker is already null', async () => { mockGetAccessibleCopilotChat.mockResolvedValueOnce({ id: 'chat-idle', type: 'mothership', @@ -220,7 +220,7 @@ describe('GET /api/mothership/chats/[chatId]', () => { { repairVerifiedStaleMarkers: true } ) const body = await response.json() - expect(body.chat.conversationId).toBeNull() + expect(body.chat.activeStreamId).toBeNull() }) it('returns 404 when the chat does not exist', async () => { diff --git a/apps/sim/app/api/mothership/chats/[chatId]/route.ts b/apps/sim/app/api/mothership/chats/[chatId]/route.ts index 0a74b5b2ed2..b3a86bc8a2e 100644 --- a/apps/sim/app/api/mothership/chats/[chatId]/route.ts +++ b/apps/sim/app/api/mothership/chats/[chatId]/route.ts @@ -57,25 +57,25 @@ export const GET = withRouteHandler( [{ chatId: chat.id, streamId: chat.conversationId }], { repairVerifiedStaleMarkers: true } ) - const liveConversationId = reconciledMarkers.get(chat.id)?.streamId ?? null + const liveStreamId = reconciledMarkers.get(chat.id)?.streamId ?? null - if (liveConversationId) { + if (liveStreamId) { try { const [events, previewSessions] = await Promise.all([ - readEvents(liveConversationId, '0'), - readFilePreviewSessions(liveConversationId).catch((error) => { + readEvents(liveStreamId, '0'), + readFilePreviewSessions(liveStreamId).catch((error) => { logger.warn('Failed to read preview sessions for mothership chat', { chatId, - conversationId: liveConversationId, + streamId: liveStreamId, error: toError(error).message, }) return [] }), ]) - const run = await getLatestRunForStream(liveConversationId, userId).catch((error) => { + const run = await getLatestRunForStream(liveStreamId, userId).catch((error) => { logger.warn('Failed to fetch latest run for mothership chat snapshot', { chatId, - conversationId: liveConversationId, + streamId: liveStreamId, error: toError(error).message, }) return null @@ -94,7 +94,7 @@ export const GET = withRouteHandler( } catch (error) { logger.warn('Failed to read stream snapshot for mothership chat', { chatId, - conversationId: liveConversationId, + streamId: liveStreamId, error: toError(error).message, }) } @@ -107,7 +107,7 @@ export const GET = withRouteHandler( : [] const effectiveMessages = buildEffectiveChatTranscript({ messages: normalizedMessages, - activeStreamId: liveConversationId || null, + activeStreamId: liveStreamId, ...(streamSnapshot ? { streamSnapshot } : {}), }) @@ -117,7 +117,7 @@ export const GET = withRouteHandler( id: chat.id, title: chat.title, messages: effectiveMessages, - conversationId: liveConversationId || null, + activeStreamId: liveStreamId, resources: Array.isArray(chat.resources) ? chat.resources : [], createdAt: chat.createdAt, updatedAt: chat.updatedAt, diff --git a/apps/sim/hooks/queries/tasks.test.ts b/apps/sim/hooks/queries/tasks.test.ts index 0edae3bb93b..39b2c88b7e8 100644 --- a/apps/sim/hooks/queries/tasks.test.ts +++ b/apps/sim/hooks/queries/tasks.test.ts @@ -100,7 +100,7 @@ describe('tasks query boundary parsing', () => { id: 'chat-1', title: 'Task history', messages: [], - conversationId: 'stream-1', + activeStreamId: 'stream-1', resources: [{ type: 'file', id: 'file-1', title: 'Spec.md' }], streamSnapshot: { events: [], @@ -144,7 +144,7 @@ describe('tasks query boundary parsing', () => { ) await expect(fetchChatHistory('chat-1')).rejects.toThrow( - 'Invalid copilot chat response: chat.resources[0].type is invalid' + 'Invalid chat response: chat.resources[0].type is invalid' ) }) diff --git a/apps/sim/hooks/queries/tasks.ts b/apps/sim/hooks/queries/tasks.ts index edba5ff6f1e..d167270f922 100644 --- a/apps/sim/hooks/queries/tasks.ts +++ b/apps/sim/hooks/queries/tasks.ts @@ -57,8 +57,6 @@ export const taskKeys = { detail: (chatId: string | undefined) => [...taskKeys.details(), chatId ?? ''] as const, } -type ChatHistorySource = 'copilot' | 'mothership' - function isRecord(value: unknown): value is Record { return Boolean(value) && typeof value === 'object' && !Array.isArray(value) } @@ -150,30 +148,28 @@ function parseStrictStreamSnapshot( return snapshot } -function parseChatHistory(value: unknown, source: ChatHistorySource): TaskChatHistory { - const responseContext = `Invalid ${source} chat response` +function parseChatHistory(value: unknown): TaskChatHistory { + const responseContext = 'Invalid chat response' const chatContext = `${responseContext}: chat` assertValid(isRecord(value), `${responseContext}: body must be an object`) assertValid(isRecord(value.chat), `${chatContext} must be an object`) const chat = value.chat - const activeStreamField = source === 'mothership' ? 'conversationId' : 'activeStreamId' - const activeStreamId = chat[activeStreamField] assertValid(typeof chat.id === 'string', `${chatContext}.id must be a string`) assertValid(isNullableString(chat.title), `${chatContext}.title must be a string or null`) assertValid(Array.isArray(chat.messages), `${chatContext}.messages must be an array`) assertValid( - isNullableString(activeStreamId), - `${chatContext}.${activeStreamField} must be a string or null` + isNullableString(chat.activeStreamId), + `${chatContext}.activeStreamId must be a string or null` ) return { id: chat.id, title: chat.title, messages: normalizeMessages(chat.messages), - activeStreamId, + activeStreamId: chat.activeStreamId, resources: parseResources(chat.resources, `${chatContext}.resources`), streamSnapshot: parseStrictStreamSnapshot(chat.streamSnapshot, `${chatContext}.streamSnapshot`), } @@ -233,7 +229,7 @@ export async function fetchChatHistory( params: { chatId }, signal, }) - return parseChatHistory(data, 'mothership') + return parseChatHistory(data) } catch (error) { if (!isApiClientError(error)) throw error // Fall through to the legacy copilot-shape alias on any HTTP error (typically 404 @@ -251,7 +247,7 @@ export async function fetchChatHistory( throw new Error('Failed to load chat') } - return parseChatHistory(await copilotRes.json(), 'copilot') + return parseChatHistory(await copilotRes.json()) } /** diff --git a/apps/sim/lib/api/contracts/mothership-tasks.ts b/apps/sim/lib/api/contracts/mothership-tasks.ts index fe8c0a182b7..5ecbb278ba7 100644 --- a/apps/sim/lib/api/contracts/mothership-tasks.ts +++ b/apps/sim/lib/api/contracts/mothership-tasks.ts @@ -283,7 +283,7 @@ export const getMothershipChatResponseSchema = z.object({ id: z.string(), title: z.string().nullable(), messages: z.array(z.unknown()), - conversationId: z.string().nullable(), + activeStreamId: z.string().nullable(), resources: z.array(z.unknown()), createdAt: z.union([z.string(), z.date()]).nullable().optional(), updatedAt: z.union([z.string(), z.date()]).nullable().optional(), diff --git a/apps/sim/lib/copilot/chat/stream-liveness.ts b/apps/sim/lib/copilot/chat/stream-liveness.ts index f1f4ba2e9ed..82a92acbd24 100644 --- a/apps/sim/lib/copilot/chat/stream-liveness.ts +++ b/apps/sim/lib/copilot/chat/stream-liveness.ts @@ -100,10 +100,14 @@ async function repairVerifiedStaleMarkers( candidates: ChatStreamMarkerCandidate[], results: Map ): Promise { - const staleCandidates = candidates.filter((candidate) => { - const result = results.get(candidate.chatId) - return candidate.streamId !== null && result?.status === 'inactive' && result.streamId === null - }) + const staleCandidates = candidates.filter( + (candidate): candidate is { chatId: string; streamId: string } => { + const result = results.get(candidate.chatId) + return ( + candidate.streamId !== null && result?.status === 'inactive' && result.streamId === null + ) + } + ) if (staleCandidates.length === 0) return