Skip to content

Commit cef351f

Browse files
fix(terminal): terminal console update for child spans + hitl state machine (#4450)
* fix(terminal): terminal console update for child spans" * address comments * fix hitl state machine * address comments * address greptile
1 parent d721dc3 commit cef351f

34 files changed

Lines changed: 4535 additions & 937 deletions

File tree

apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import { generateRequestId } from '@/lib/core/utils/request'
1010
import { SSE_HEADERS } from '@/lib/core/utils/sse'
1111
import { getBaseUrl } from '@/lib/core/utils/urls'
1212
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
13-
import { setExecutionMeta } from '@/lib/execution/event-buffer'
1413
import { preprocessExecution } from '@/lib/execution/preprocessing'
1514
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
1615
import { createStreamingResponse } from '@/lib/workflows/streaming/streaming'
@@ -157,12 +156,6 @@ export const POST = withRouteHandler(
157156
})
158157
}
159158

160-
await setExecutionMeta(enqueueResult.resumeExecutionId, {
161-
status: 'active',
162-
userId,
163-
workflowId,
164-
})
165-
166159
const resumeArgs = {
167160
resumeEntryId: enqueueResult.resumeEntryId,
168161
resumeExecutionId: enqueueResult.resumeExecutionId,
@@ -249,6 +242,14 @@ export const POST = withRouteHandler(
249242
error: toError(dispatchError).message,
250243
resumeExecutionId: enqueueResult.resumeExecutionId,
251244
})
245+
await PauseResumeManager.markResumeAttemptFailed({
246+
resumeEntryId: enqueueResult.resumeEntryId,
247+
pausedExecutionId: enqueueResult.pausedExecution.id,
248+
parentExecutionId: executionId,
249+
contextId: enqueueResult.contextId,
250+
failureReason: 'Failed to queue async resume execution',
251+
})
252+
await PauseResumeManager.processQueuedResumes(executionId)
252253
return NextResponse.json(
253254
{ error: 'Failed to queue resume execution. Please try again.' },
254255
{ status: 503 }

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 146 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,12 @@ import {
2525
SIM_VIA_HEADER,
2626
validateCallChain,
2727
} from '@/lib/execution/call-chain'
28-
import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer'
28+
import {
29+
createExecutionEventWriter,
30+
flushExecutionStreamReplayBuffer,
31+
initializeExecutionStreamMeta,
32+
type TerminalExecutionStreamStatus,
33+
} from '@/lib/execution/event-buffer'
2934
import { processInputFileFields } from '@/lib/execution/files'
3035
import {
3136
registerManualExecutionAborter,
@@ -868,11 +873,17 @@ async function handleExecutePost(
868873
let isManualAbortRegistered = false
869874

870875
const eventWriter = createExecutionEventWriter(executionId)
871-
setExecutionMeta(executionId, {
872-
status: 'active',
876+
const metaInitialized = await initializeExecutionStreamMeta(executionId, {
873877
userId: actorUserId,
874878
workflowId,
875-
}).catch(() => {})
879+
})
880+
if (!metaInitialized) {
881+
timeoutController.cleanup()
882+
return NextResponse.json(
883+
{ error: 'Run buffer temporarily unavailable' },
884+
{ status: 503, headers: { 'X-Execution-Id': executionId } }
885+
)
886+
}
876887

877888
const stream = new ReadableStream<Uint8Array>({
878889
async start(controller) {
@@ -881,12 +892,18 @@ async function handleExecutePost(
881892
registerManualExecutionAborter(executionId, timeoutController.abort)
882893
isManualAbortRegistered = true
883894

884-
let localEventSeq = 0
885-
const sendEvent = (event: ExecutionEvent) => {
895+
let terminalEventPublished = false
896+
const sendEvent = async (
897+
event: ExecutionEvent,
898+
terminalStatus?: TerminalExecutionStreamStatus
899+
) => {
886900
const isBuffered = event.type !== 'stream:chunk' && event.type !== 'stream:done'
887901
if (isBuffered) {
888-
localEventSeq++
889-
event.eventId = localEventSeq
902+
const entry = terminalStatus
903+
? await eventWriter.writeTerminal(event, terminalStatus)
904+
: await eventWriter.write(event)
905+
event.eventId = entry.eventId
906+
terminalEventPublished ||= Boolean(terminalStatus)
890907
}
891908
if (!isStreamClosed) {
892909
try {
@@ -895,15 +912,12 @@ async function handleExecutePost(
895912
isStreamClosed = true
896913
}
897914
}
898-
if (isBuffered) {
899-
eventWriter.write(event).catch(() => {})
900-
}
901915
}
902916

903917
try {
904918
const startTime = new Date()
905919

906-
sendEvent({
920+
await sendEvent({
907921
type: 'execution:started',
908922
timestamp: startTime.toISOString(),
909923
executionId,
@@ -922,7 +936,7 @@ async function handleExecutePost(
922936
childWorkflowContext?: ChildWorkflowContext
923937
) => {
924938
reqLogger.info('onBlockStart called', { blockId, blockName, blockType })
925-
sendEvent({
939+
await sendEvent({
926940
type: 'block:started',
927941
timestamp: new Date().toISOString(),
928942
executionId,
@@ -976,7 +990,7 @@ async function handleExecutePost(
976990
blockType,
977991
error: callbackData.output.error,
978992
})
979-
sendEvent({
993+
await sendEvent({
980994
type: 'block:error',
981995
timestamp: new Date().toISOString(),
982996
executionId,
@@ -1010,7 +1024,7 @@ async function handleExecutePost(
10101024
blockName,
10111025
blockType,
10121026
})
1013-
sendEvent({
1027+
await sendEvent({
10141028
type: 'block:completed',
10151029
timestamp: new Date().toISOString(),
10161030
executionId,
@@ -1053,7 +1067,7 @@ async function handleExecutePost(
10531067
if (done) break
10541068

10551069
const chunk = decoder.decode(value, { stream: true })
1056-
sendEvent({
1070+
await sendEvent({
10571071
type: 'stream:chunk',
10581072
timestamp: new Date().toISOString(),
10591073
executionId,
@@ -1062,7 +1076,7 @@ async function handleExecutePost(
10621076
})
10631077
}
10641078

1065-
sendEvent({
1079+
await sendEvent({
10661080
type: 'stream:done',
10671081
timestamp: new Date().toISOString(),
10681082
executionId,
@@ -1107,13 +1121,14 @@ async function handleExecutePost(
11071121
selectedOutputs
11081122
)
11091123

1110-
const onChildWorkflowInstanceReady = (
1124+
const onChildWorkflowInstanceReady = async (
11111125
blockId: string,
11121126
childWorkflowInstanceId: string,
11131127
iterationContext?: IterationContext,
1114-
executionOrder?: number
1128+
executionOrder?: number,
1129+
childWorkflowContext?: ChildWorkflowContext
11151130
) => {
1116-
sendEvent({
1131+
await sendEvent({
11171132
type: 'block:childWorkflowStarted',
11181133
timestamp: new Date().toISOString(),
11191134
executionId,
@@ -1123,7 +1138,16 @@ async function handleExecutePost(
11231138
childWorkflowInstanceId,
11241139
...(iterationContext && {
11251140
iterationCurrent: iterationContext.iterationCurrent,
1141+
iterationTotal: iterationContext.iterationTotal,
1142+
iterationType: iterationContext.iterationType,
11261143
iterationContainerId: iterationContext.iterationContainerId,
1144+
...(iterationContext.parentIterations?.length && {
1145+
parentIterations: iterationContext.parentIterations,
1146+
}),
1147+
}),
1148+
...(childWorkflowContext && {
1149+
childWorkflowBlockId: childWorkflowContext.parentBlockId,
1150+
childWorkflowName: childWorkflowContext.workflowName,
11271151
}),
11281152
...(executionOrder !== undefined && { executionOrder }),
11291153
},
@@ -1157,32 +1181,38 @@ async function handleExecutePost(
11571181

11581182
await loggingSession.markAsFailed(timeoutErrorMessage)
11591183

1160-
sendEvent({
1161-
type: 'execution:error',
1162-
timestamp: new Date().toISOString(),
1163-
executionId,
1164-
workflowId,
1165-
data: {
1166-
error: timeoutErrorMessage,
1167-
duration: result.metadata?.duration || 0,
1168-
finalBlockLogs: result.logs,
1169-
},
1170-
})
11711184
finalMetaStatus = 'error'
1185+
await sendEvent(
1186+
{
1187+
type: 'execution:error',
1188+
timestamp: new Date().toISOString(),
1189+
executionId,
1190+
workflowId,
1191+
data: {
1192+
error: timeoutErrorMessage,
1193+
duration: result.metadata?.duration || 0,
1194+
finalBlockLogs: result.logs,
1195+
},
1196+
},
1197+
'error'
1198+
)
11721199
} else {
11731200
reqLogger.info('Workflow execution was cancelled')
11741201

1175-
sendEvent({
1176-
type: 'execution:cancelled',
1177-
timestamp: new Date().toISOString(),
1178-
executionId,
1179-
workflowId,
1180-
data: {
1181-
duration: result.metadata?.duration || 0,
1182-
finalBlockLogs: result.logs,
1183-
},
1184-
})
11851202
finalMetaStatus = 'cancelled'
1203+
await sendEvent(
1204+
{
1205+
type: 'execution:cancelled',
1206+
timestamp: new Date().toISOString(),
1207+
executionId,
1208+
workflowId,
1209+
data: {
1210+
duration: result.metadata?.duration || 0,
1211+
finalBlockLogs: result.logs,
1212+
},
1213+
},
1214+
'cancelled'
1215+
)
11861216
}
11871217
return
11881218
}
@@ -1196,35 +1226,43 @@ async function handleExecutePost(
11961226
: result.output
11971227

11981228
if (result.status === 'paused') {
1199-
sendEvent({
1200-
type: 'execution:paused',
1201-
timestamp: new Date().toISOString(),
1202-
executionId,
1203-
workflowId,
1204-
data: {
1205-
output: sseOutput,
1206-
duration: result.metadata?.duration || 0,
1207-
startTime: result.metadata?.startTime || startTime.toISOString(),
1208-
endTime: result.metadata?.endTime || new Date().toISOString(),
1229+
finalMetaStatus = 'complete'
1230+
await sendEvent(
1231+
{
1232+
type: 'execution:paused',
1233+
timestamp: new Date().toISOString(),
1234+
executionId,
1235+
workflowId,
1236+
data: {
1237+
output: sseOutput,
1238+
duration: result.metadata?.duration || 0,
1239+
startTime: result.metadata?.startTime || startTime.toISOString(),
1240+
endTime: result.metadata?.endTime || new Date().toISOString(),
1241+
finalBlockLogs: result.logs,
1242+
},
12091243
},
1210-
})
1244+
'complete'
1245+
)
12111246
} else {
1212-
sendEvent({
1213-
type: 'execution:completed',
1214-
timestamp: new Date().toISOString(),
1215-
executionId,
1216-
workflowId,
1217-
data: {
1218-
success: result.success,
1219-
output: sseOutput,
1220-
duration: result.metadata?.duration || 0,
1221-
startTime: result.metadata?.startTime || startTime.toISOString(),
1222-
endTime: result.metadata?.endTime || new Date().toISOString(),
1223-
finalBlockLogs: result.logs,
1247+
finalMetaStatus = 'complete'
1248+
await sendEvent(
1249+
{
1250+
type: 'execution:completed',
1251+
timestamp: new Date().toISOString(),
1252+
executionId,
1253+
workflowId,
1254+
data: {
1255+
success: result.success,
1256+
output: sseOutput,
1257+
duration: result.metadata?.duration || 0,
1258+
startTime: result.metadata?.startTime || startTime.toISOString(),
1259+
endTime: result.metadata?.endTime || new Date().toISOString(),
1260+
finalBlockLogs: result.logs,
1261+
},
12241262
},
1225-
})
1263+
'complete'
1264+
)
12261265
}
1227-
finalMetaStatus = 'complete'
12281266
} catch (error: unknown) {
12291267
const isTimeout = isTimeoutError(error) || timeoutController.isTimedOut()
12301268
const errorMessage = isTimeout
@@ -1237,32 +1275,55 @@ async function handleExecutePost(
12371275

12381276
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
12391277

1240-
sendEvent({
1241-
type: 'execution:error',
1242-
timestamp: new Date().toISOString(),
1243-
executionId,
1244-
workflowId,
1245-
data: {
1246-
error: executionResult?.error || errorMessage,
1247-
duration: executionResult?.metadata?.duration || 0,
1248-
finalBlockLogs: executionResult?.logs,
1249-
},
1250-
})
12511278
finalMetaStatus = 'error'
1279+
await sendEvent(
1280+
{
1281+
type: 'execution:error',
1282+
timestamp: new Date().toISOString(),
1283+
executionId,
1284+
workflowId,
1285+
data: {
1286+
error: executionResult?.error || errorMessage,
1287+
duration: executionResult?.metadata?.duration || 0,
1288+
finalBlockLogs: executionResult?.logs,
1289+
},
1290+
},
1291+
'error'
1292+
)
12521293
} finally {
12531294
if (isManualAbortRegistered) {
12541295
unregisterManualExecutionAborter(executionId)
12551296
isManualAbortRegistered = false
12561297
}
1257-
try {
1258-
await eventWriter.close()
1259-
} catch (closeError) {
1260-
reqLogger.warn('Failed to close event writer', {
1261-
error: toError(closeError).message,
1298+
if (finalMetaStatus && !terminalEventPublished) {
1299+
const replayBufferFlushed = await flushExecutionStreamReplayBuffer(
1300+
executionId,
1301+
eventWriter
1302+
)
1303+
reqLogger.error('Failed to publish terminal execution event durably', {
1304+
executionId,
1305+
status: finalMetaStatus,
1306+
replayBufferFlushed,
12621307
})
1263-
}
1264-
if (finalMetaStatus) {
1265-
setExecutionMeta(executionId, { status: finalMetaStatus }).catch(() => {})
1308+
if (!isStreamClosed) {
1309+
controller.error(new Error('Run buffer terminal event publish failed'))
1310+
isStreamClosed = true
1311+
}
1312+
} else if (terminalEventPublished) {
1313+
await eventWriter.close().catch((closeError) => {
1314+
reqLogger.warn('Failed to close execution event writer after terminal publish', {
1315+
executionId,
1316+
error: closeError instanceof Error ? closeError.message : String(closeError),
1317+
})
1318+
})
1319+
} else {
1320+
try {
1321+
await eventWriter.close()
1322+
} catch (closeError) {
1323+
reqLogger.warn('Failed to close event writer', {
1324+
error: toError(closeError).message,
1325+
})
1326+
}
12661327
}
12671328
timeoutController.cleanup()
12681329
if (executionId) {

0 commit comments

Comments
 (0)