Skip to content

Commit ee39dea

Browse files
committed
fix state machine issue
1 parent 9406a3b commit ee39dea

2 files changed

Lines changed: 20 additions & 16 deletions

File tree

apps/sim/lib/copilot/chat/stream-liveness.test.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,26 +127,24 @@ describe('reconcileChatStreamMarkers', () => {
127127
})
128128
})
129129

130-
it('detects a live lock before the persisted marker has been written', async () => {
130+
it('treats a null persisted marker as inactive even when Redis still holds a lock (post-completion teardown window)', async () => {
131131
mockGetChatStreamLockOwners.mockResolvedValueOnce({
132132
status: 'verified',
133133
ownersByChatId: new Map([['chat-starting', 'stream-starting']]),
134134
})
135135

136136
const markers = await reconcileChatStreamMarkers([{ chatId: 'chat-starting', streamId: null }])
137137

138-
expect(mockGetChatStreamLockOwners).toHaveBeenCalledWith(['chat-starting'])
139138
expect(markers.get('chat-starting')).toEqual({
140139
chatId: 'chat-starting',
141-
streamId: 'stream-starting',
142-
status: 'active',
140+
streamId: null,
141+
status: 'inactive',
143142
})
144143
})
145144

146-
it('queries locks even when no persisted markers exist', async () => {
145+
it('does not query locks when no chats have persisted stream markers', async () => {
147146
const markers = await reconcileChatStreamMarkers([{ chatId: 'chat-idle', streamId: null }])
148147

149-
expect(mockGetChatStreamLockOwners).toHaveBeenCalledWith(['chat-idle'])
150148
expect(markers.get('chat-idle')).toEqual({
151149
chatId: 'chat-idle',
152150
streamId: null,

apps/sim/lib/copilot/chat/stream-liveness.ts

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,27 +38,33 @@ export async function reconcileChatStreamMarkers(
3838
const results = new Map<string, ReconciledChatStreamMarker>()
3939

4040
for (const candidate of candidates) {
41+
if (candidate.streamId === null) {
42+
results.set(candidate.chatId, {
43+
chatId: candidate.chatId,
44+
streamId: null,
45+
status: 'inactive',
46+
})
47+
continue
48+
}
4149
results.set(candidate.chatId, {
4250
chatId: candidate.chatId,
4351
streamId: candidate.streamId,
44-
status: candidate.streamId ? 'unknown' : 'inactive',
52+
status: 'unknown',
4553
})
4654
}
4755

48-
if (candidates.length === 0) {
56+
const candidatesWithMarkers = candidates.filter((candidate) => candidate.streamId !== null)
57+
if (candidatesWithMarkers.length === 0) {
4958
return results
5059
}
5160

5261
const { status, ownersByChatId } = await getChatStreamLockOwners(
53-
candidates.map((candidate) => candidate.chatId)
62+
candidatesWithMarkers.map((candidate) => candidate.chatId)
5463
)
5564

56-
for (const candidate of candidates) {
65+
for (const candidate of candidatesWithMarkers) {
5766
const owner = ownersByChatId.get(candidate.chatId)
58-
if (
59-
owner &&
60-
(status === 'verified' || owner === candidate.streamId || candidate.streamId === null)
61-
) {
67+
if (owner && (status === 'verified' || owner === candidate.streamId)) {
6268
results.set(candidate.chatId, {
6369
chatId: candidate.chatId,
6470
streamId: owner,
@@ -67,7 +73,7 @@ export async function reconcileChatStreamMarkers(
6773
continue
6874
}
6975

70-
if (status === 'verified' && candidate.streamId !== null) {
76+
if (status === 'verified') {
7177
results.set(candidate.chatId, {
7278
chatId: candidate.chatId,
7379
streamId: null,
@@ -79,7 +85,7 @@ export async function reconcileChatStreamMarkers(
7985
results.set(candidate.chatId, {
8086
chatId: candidate.chatId,
8187
streamId: candidate.streamId,
82-
status: candidate.streamId ? 'unknown' : 'inactive',
88+
status: 'unknown',
8389
})
8490
}
8591

0 commit comments

Comments
 (0)