Skip to content

Commit 89573e8

Browse files
committed
fix execution stream attach, cleanup comments
1 parent 050de8e commit 89573e8

7 files changed

Lines changed: 287 additions & 143 deletions

File tree

apps/sim/app/workspace/[workspaceId]/home/components/message-content/message-content.tsx

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,6 @@ function parseBlocks(blocks: ContentBlock[]): MessageSegment[] {
182182
if (existing) return { group: existing, created: false }
183183
const group: AgentGroupSegment = {
184184
type: 'agent_group',
185-
// Suffix with segments.length so a later flushLanes / explicit delete
186-
// followed by re-ensure for the same key produces a fresh React key
187-
// instead of colliding with the stranded prior segment.
188185
id: `agent-${key}-${segments.length}`,
189186
agentName: name,
190187
agentLabel: resolveAgentLabel(name),
@@ -211,11 +208,6 @@ function parseBlocks(blocks: ContentBlock[]): MessageSegment[] {
211208
}
212209

213210
const flushLanes = () => {
214-
// Finalize state on the already-pushed segments before we drop them from
215-
// the map. Without this, a thinking/main text/options/stopped block landing
216-
// between a subagent's start and its end strands the lane with stale
217-
// isOpen/isDelegating flags — the later subagent_end iterates groupsByKey
218-
// and can't find it to close.
219211
for (const g of groupsByKey.values()) {
220212
g.isOpen = false
221213
g.isDelegating = false
@@ -300,13 +292,10 @@ function parseBlocks(blocks: ContentBlock[]): MessageSegment[] {
300292
}
301293
}
302294
}
303-
// Subagent lanes break the mothership lane: any mothership tool that
304-
// arrives after this subagent finishes should render below the lane,
305-
// not get back-filled into the mothership group sitting above it.
306295
groupsByKey.delete(groupKey('mothership', undefined))
307296
const { group: g } = ensureGroup(key, block.parentToolCallId)
308297
if (inheritedDelegation) g.isDelegating = true
309-
g.isOpen = block.endedAt === undefined
298+
g.isOpen = true
310299
activeGroupKey = resolveGroupKey(key, block.parentToolCallId)
311300
continue
312301
}
@@ -331,9 +320,6 @@ function parseBlocks(blocks: ContentBlock[]): MessageSegment[] {
331320
if (tc.calledBy) {
332321
const { group: g, created } = ensureGroup(tc.calledBy, block.parentToolCallId)
333322
g.isDelegating = false
334-
// Only mark the lane open when we just created it. Late tool_calls
335-
// arriving after a subagent_end (out-of-order persistence, replay,
336-
// partial state hand-off) must NOT reopen a closed lane.
337323
if (created && block.parentToolCallId) g.isOpen = true
338324
g.items.push({ type: 'tool', data: tool })
339325
activeGroupKey = resolveGroupKey(tc.calledBy, block.parentToolCallId)
@@ -363,11 +349,6 @@ function parseBlocks(blocks: ContentBlock[]): MessageSegment[] {
363349
activeGroupKey = null
364350
}
365351
} else {
366-
// Unparented end blocks come from legacy/partial persisted streams
367-
// that predate parentToolCallId. Close lanes that are themselves
368-
// legacy-keyed without touching parented lanes — this avoids
369-
// force-closing legitimate parallel parented lanes while still
370-
// releasing stranded legacy lanes in mixed/migration streams.
371352
for (const [key, g] of groupsByKey) {
372353
if (key.endsWith(':legacy') && g.agentName !== 'mothership') {
373354
g.isOpen = false

apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts

Lines changed: 154 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ import type {
131131
MothershipResource,
132132
MothershipResourceType,
133133
QueuedMessage,
134+
ToolCallInfo,
134135
} from '../types'
135136
import { ToolCallStatus } from '../types'
136137

@@ -1217,7 +1218,7 @@ export function useChat(
12171218
reader: ReadableStreamDefaultReader<Uint8Array>,
12181219
assistantId: string,
12191220
expectedGen?: number,
1220-
options?: { preserveExistingState?: boolean }
1221+
options?: { preserveExistingState?: boolean; suppressWorkflowToolStarts?: boolean }
12211222
) => Promise<{ sawStreamError: boolean; sawComplete: boolean }>
12221223
>(async () => ({ sawStreamError: false, sawComplete: false }))
12231224
const attachToExistingStreamRef = useRef<
@@ -1457,8 +1458,50 @@ export function useChat(
14571458
return
14581459
}
14591460
if (handledClientWorkflowToolIdsRef.current.has(toolCallId)) {
1461+
// #region agent log
1462+
fetch('http://127.0.0.1:1025/ingest/85045d0a-92f7-4ee2-9de1-e2f99930c6bc', {
1463+
method: 'POST',
1464+
headers: { 'Content-Type': 'application/json', 'X-Debug-Session-Id': '21c369' },
1465+
body: JSON.stringify({
1466+
sessionId: '21c369',
1467+
location: 'use-chat.ts:startClientWorkflowTool',
1468+
message: 'SKIPPED:handled',
1469+
data: { toolCallId },
1470+
timestamp: Date.now(),
1471+
}),
1472+
}).catch(() => {})
1473+
// #endregion
1474+
return
1475+
}
1476+
if (recoveringClientWorkflowToolIdsRef.current.has(toolCallId)) {
1477+
// #region agent log
1478+
fetch('http://127.0.0.1:1025/ingest/85045d0a-92f7-4ee2-9de1-e2f99930c6bc', {
1479+
method: 'POST',
1480+
headers: { 'Content-Type': 'application/json', 'X-Debug-Session-Id': '21c369' },
1481+
body: JSON.stringify({
1482+
sessionId: '21c369',
1483+
location: 'use-chat.ts:startClientWorkflowTool',
1484+
message: 'SKIPPED:recovering',
1485+
data: { toolCallId },
1486+
timestamp: Date.now(),
1487+
}),
1488+
}).catch(() => {})
1489+
// #endregion
14601490
return
14611491
}
1492+
// #region agent log
1493+
fetch('http://127.0.0.1:1025/ingest/85045d0a-92f7-4ee2-9de1-e2f99930c6bc', {
1494+
method: 'POST',
1495+
headers: { 'Content-Type': 'application/json', 'X-Debug-Session-Id': '21c369' },
1496+
body: JSON.stringify({
1497+
sessionId: '21c369',
1498+
location: 'use-chat.ts:startClientWorkflowTool',
1499+
message: 'FIRING',
1500+
data: { toolCallId, toolName },
1501+
timestamp: Date.now(),
1502+
}),
1503+
}).catch(() => {})
1504+
// #endregion
14621505
handledClientWorkflowToolIdsRef.current.add(toolCallId)
14631506

14641507
ensureWorkflowToolResource(toolArgs)
@@ -1469,41 +1512,68 @@ export function useChat(
14691512

14701513
const recoverPendingClientWorkflowTools = useCallback(
14711514
async (nextMessages: ChatMessage[]) => {
1515+
const pending: ToolCallInfo[] = []
1516+
14721517
for (const message of nextMessages) {
14731518
for (const block of message.contentBlocks ?? []) {
14741519
const toolCall = block.toolCall
1475-
if (!toolCall || !isWorkflowToolName(toolCall.name)) {
1476-
continue
1477-
}
1478-
if (toolCall.status !== 'executing') {
1479-
continue
1480-
}
1481-
1520+
if (!toolCall || !isWorkflowToolName(toolCall.name)) continue
1521+
if (toolCall.status !== 'executing') continue
14821522
if (
14831523
handledClientWorkflowToolIdsRef.current.has(toolCall.id) ||
14841524
recoveringClientWorkflowToolIdsRef.current.has(toolCall.id)
14851525
) {
14861526
continue
14871527
}
1488-
14891528
recoveringClientWorkflowToolIdsRef.current.add(toolCall.id)
1529+
pending.push(toolCall)
1530+
}
1531+
}
14901532

1491-
try {
1492-
const toolArgs = toolCall.params ?? {}
1493-
const targetWorkflowId = ensureWorkflowToolResource(toolArgs)
1494-
1495-
if (targetWorkflowId) {
1496-
const rebound = await bindRunToolToExecution(toolCall.id, targetWorkflowId)
1497-
if (rebound) {
1498-
handledClientWorkflowToolIdsRef.current.add(toolCall.id)
1499-
continue
1500-
}
1501-
}
1533+
// #region agent log
1534+
fetch('http://127.0.0.1:1025/ingest/85045d0a-92f7-4ee2-9de1-e2f99930c6bc', {
1535+
method: 'POST',
1536+
headers: { 'Content-Type': 'application/json', 'X-Debug-Session-Id': '21c369' },
1537+
body: JSON.stringify({
1538+
sessionId: '21c369',
1539+
location: 'use-chat.ts:recoverPending',
1540+
message: 'scan complete',
1541+
data: { pendingCount: pending.length, pendingIds: pending.map((t) => t.id) },
1542+
timestamp: Date.now(),
1543+
}),
1544+
}).catch(() => {})
1545+
// #endregion
15021546

1503-
startClientWorkflowTool(toolCall.id, toolCall.name, toolArgs)
1504-
} finally {
1505-
recoveringClientWorkflowToolIdsRef.current.delete(toolCall.id)
1547+
for (const toolCall of pending) {
1548+
try {
1549+
const toolArgs = toolCall.params ?? {}
1550+
const targetWorkflowId = ensureWorkflowToolResource(toolArgs)
1551+
1552+
if (targetWorkflowId) {
1553+
const rebound = await bindRunToolToExecution(toolCall.id, targetWorkflowId)
1554+
// #region agent log
1555+
fetch('http://127.0.0.1:1025/ingest/85045d0a-92f7-4ee2-9de1-e2f99930c6bc', {
1556+
method: 'POST',
1557+
headers: { 'Content-Type': 'application/json', 'X-Debug-Session-Id': '21c369' },
1558+
body: JSON.stringify({
1559+
sessionId: '21c369',
1560+
location: 'use-chat.ts:recoverPending',
1561+
message: 'bind result',
1562+
data: { toolCallId: toolCall.id, targetWorkflowId, rebound },
1563+
timestamp: Date.now(),
1564+
}),
1565+
}).catch(() => {})
1566+
// #endregion
1567+
if (rebound) {
1568+
handledClientWorkflowToolIdsRef.current.add(toolCall.id)
1569+
continue
1570+
}
15061571
}
1572+
1573+
recoveringClientWorkflowToolIdsRef.current.delete(toolCall.id)
1574+
startClientWorkflowTool(toolCall.id, toolCall.name, toolArgs)
1575+
} finally {
1576+
recoveringClientWorkflowToolIdsRef.current.delete(toolCall.id)
15071577
}
15081578
}
15091579
},
@@ -1703,7 +1773,7 @@ export function useChat(
17031773
reader: ReadableStreamDefaultReader<Uint8Array>,
17041774
assistantId: string,
17051775
expectedGen?: number,
1706-
options?: { preserveExistingState?: boolean }
1776+
options?: { preserveExistingState?: boolean; suppressWorkflowToolStarts?: boolean }
17071777
) => {
17081778
const decoder = new TextDecoder()
17091779
streamReaderRef.current = reader
@@ -1818,6 +1888,19 @@ export function useChat(
18181888
return activeSubagent
18191889
}
18201890

1891+
const resolveParentForSubagentBlock = (
1892+
subagent: string | undefined,
1893+
scopedParent: string | undefined
1894+
): string | undefined => {
1895+
if (!subagent) return undefined
1896+
if (scopedParent) return scopedParent
1897+
if (activeSubagent === subagent) return activeSubagentParentToolCallId
1898+
for (const [parent, name] of subagentByParentToolCallId) {
1899+
if (name === subagent) return parent
1900+
}
1901+
return undefined
1902+
}
1903+
18211904
const appendInlineErrorTag = (
18221905
tag: string,
18231906
subagentName?: string,
@@ -2038,9 +2121,10 @@ export function useChat(
20382121
if (chunk) {
20392122
const eventTs = typeof parsed.ts === 'string' ? parsed.ts : undefined
20402123
if (parsed.payload.channel === MothershipStreamV1TextChannel.thinking) {
2041-
const scopedParentForBlock = scopedSubagent
2042-
? (scopedParentToolCallId ?? activeSubagentParentToolCallId)
2043-
: undefined
2124+
const scopedParentForBlock = resolveParentForSubagentBlock(
2125+
scopedSubagent,
2126+
scopedParentToolCallId
2127+
)
20442128
const tb = ensureThinkingBlock(scopedSubagent, scopedParentForBlock, eventTs)
20452129
tb.content = (tb.content ?? '') + chunk
20462130
flushText()
@@ -2052,9 +2136,10 @@ export function useChat(
20522136
lastContentSource !== contentSource &&
20532137
runningText.length > 0 &&
20542138
!runningText.endsWith('\n')
2055-
const scopedParentForBlock = scopedSubagent
2056-
? (scopedParentToolCallId ?? activeSubagentParentToolCallId)
2057-
: undefined
2139+
const scopedParentForBlock = resolveParentForSubagentBlock(
2140+
scopedSubagent,
2141+
scopedParentToolCallId
2142+
)
20582143
const tb = ensureTextBlock(scopedSubagent, scopedParentForBlock, eventTs)
20592144
const normalizedChunk = needsBoundaryNewline ? `\n${chunk}` : chunk
20602145
tb.content = (tb.content ?? '') + normalizedChunk
@@ -2391,12 +2476,17 @@ export function useChat(
23912476
}
23922477
}
23932478

2394-
if (!toolMap.has(id)) {
2479+
const existingToolCall = toolMap.has(id)
2480+
? blocks[toolMap.get(id)!]?.toolCall
2481+
: undefined
2482+
const isNewToolCall = !existingToolCall
2483+
if (isNewToolCall) {
23952484
stampBlockEnd(blocks[blocks.length - 1])
23962485
toolMap.set(id, blocks.length)
2397-
const parentToolCallIdForBlock = scopedSubagent
2398-
? (scopedParentToolCallId ?? activeSubagentParentToolCallId)
2399-
: undefined
2486+
const parentToolCallIdForBlock = resolveParentForSubagentBlock(
2487+
scopedSubagent,
2488+
scopedParentToolCallId
2489+
)
24002490
blocks.push({
24012491
type: 'tool_call',
24022492
toolCall: {
@@ -2427,7 +2517,28 @@ export function useChat(
24272517
flush()
24282518

24292519
if (isWorkflowToolName(name) && !isPartial) {
2430-
startClientWorkflowTool(id, name, args ?? {})
2520+
const shouldStartWorkflowTool =
2521+
!options?.suppressWorkflowToolStarts &&
2522+
(isNewToolCall ||
2523+
(existingToolCall?.status === ToolCallStatus.executing &&
2524+
!existingToolCall.result))
2525+
if (shouldStartWorkflowTool) {
2526+
startClientWorkflowTool(id, name, args ?? {})
2527+
} else {
2528+
// #region agent log
2529+
fetch('http://127.0.0.1:1025/ingest/85045d0a-92f7-4ee2-9de1-e2f99930c6bc', {
2530+
method: 'POST',
2531+
headers: { 'Content-Type': 'application/json', 'X-Debug-Session-Id': '21c369' },
2532+
body: JSON.stringify({
2533+
sessionId: '21c369',
2534+
location: 'use-chat.ts:processSSE',
2535+
message: 'SKIPPED:replay',
2536+
data: { toolCallId: id, toolName: name },
2537+
timestamp: Date.now(),
2538+
}),
2539+
}).catch(() => {})
2540+
// #endregion
2541+
}
24312542
}
24322543
break
24332544
}
@@ -2627,9 +2738,7 @@ export function useChat(
26272738
appendInlineErrorTag(
26282739
buildInlineErrorTag(parsed.payload),
26292740
scopedSubagent,
2630-
scopedSubagent
2631-
? (scopedParentToolCallId ?? activeSubagentParentToolCallId)
2632-
: undefined,
2741+
resolveParentForSubagentBlock(scopedSubagent, scopedParentToolCallId),
26332742
typeof parsed.ts === 'string' ? parsed.ts : undefined
26342743
)
26352744
break
@@ -2734,6 +2843,7 @@ export function useChat(
27342843
let latestCursor = afterCursor
27352844
let seedEvents = opts.initialBatch?.events ?? []
27362845
let streamStatus = opts.initialBatch?.status ?? 'unknown'
2846+
let suppressSeedWorkflowStarts = seedEvents.length > 0
27372847

27382848
const isStaleReconnect = () =>
27392849
streamGenRef.current !== expectedGen || abortControllerRef.current?.signal.aborted === true
@@ -2752,11 +2862,15 @@ export function useChat(
27522862
buildReplayStream(seedEvents).getReader(),
27532863
assistantId,
27542864
expectedGen,
2755-
{ preserveExistingState: true }
2865+
{
2866+
preserveExistingState: true,
2867+
suppressWorkflowToolStarts: suppressSeedWorkflowStarts,
2868+
}
27562869
)
27572870
latestCursor = String(seedEvents[seedEvents.length - 1]?.eventId ?? latestCursor)
27582871
lastCursorRef.current = latestCursor
27592872
seedEvents = []
2873+
suppressSeedWorkflowStarts = false
27602874

27612875
if (replayResult.sawStreamError) {
27622876
return { error: true, aborted: false }
@@ -2800,7 +2914,7 @@ export function useChat(
28002914
sseRes.body.getReader(),
28012915
assistantId,
28022916
expectedGen,
2803-
{ preserveExistingState: true }
2917+
{ preserveExistingState: true, suppressWorkflowToolStarts: true }
28042918
)
28052919

28062920
if (liveResult.sawStreamError) {

0 commit comments

Comments
 (0)