Skip to content

Commit a1752e8

Browse files
committed
improvement(resolver): lazy resolution for underlying fields greater than 10MB
1 parent 3c33c20 commit a1752e8

41 files changed

Lines changed: 1569 additions & 131 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

apps/docs/content/docs/en/blocks/function.mdx

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,14 @@ plt.show()
185185

186186
## Best Practices
187187

188+
### Large Inputs and Payload Limits
189+
190+
Function blocks receive their code, parameters, resolved references, and previous block context in an internal execution request. Sim can safely reference oversized workflow outputs, such as large `loop.results` or `parallel.results`, when you select a smaller nested field like `<loop.results[0][0].id>`.
191+
192+
Avoid passing a full large object into a Function block when you only need one field. For example, prefer `<api.data.customerId>` over `<api.data>` when the API response is large. If the complete function request body is still larger than the platform limit, execution can fail before your code starts.
193+
194+
For large generated data, write the result to a file or table with `outputPath`, `outputSandboxPath`, or `outputTable` instead of returning the entire payload inline.
195+
188196
- **Keep functions focused**: Write functions that do one thing well to improve maintainability and debugging
189197
- **Handle errors gracefully**: Use try/catch blocks to handle potential errors and provide meaningful error messages
190198
- **Test edge cases**: Ensure your code handles unusual inputs, null values, and boundary conditions correctly

apps/docs/content/docs/en/execution/api-deployment.mdx

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,25 @@ while (true) {
215215
</Tab>
216216
</Tabs>
217217

218+
#### Oversized outputs
219+
220+
Workflow execution responses are capped by platform request and response limits. When an internal output, log field, streamed field, or async status payload contains a value that is too large to inline, Sim may replace that nested value with a versioned reference:
221+
222+
```json
223+
{
224+
"__simLargeValueRef": true,
225+
"version": 1,
226+
"id": "lv_abc123DEF456",
227+
"kind": "array",
228+
"size": 12582912,
229+
"key": "execution-values/workspace/workflow/execution/lv_abc123DEF456.json",
230+
"executionId": "exec_xyz",
231+
"preview": { "length": 25000 }
232+
}
233+
```
234+
235+
The `version` field is part of the external API contract. Treat the reference as an opaque placeholder for a value that could not be safely embedded in the response. `id`, `key`, and `executionId` are not fetch URLs; use `selectedOutputs` to request a smaller nested field, reduce the data passed between blocks, or return the data from a Response block when your workflow intentionally owns the HTTP response body.
236+
218237
### Asynchronous
219238

220239
For long-running workflows, async mode returns a job ID immediately so you don't need to hold the connection open. Add the `X-Execution-Mode: async` header to your request. The API returns HTTP 202 with a job ID and status URL. Poll the status URL until the job completes.

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 139 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import {
3636
registerManualExecutionAborter,
3737
unregisterManualExecutionAborter,
3838
} from '@/lib/execution/manual-cancellation'
39+
import { compactBlockLogs, compactExecutionPayload } from '@/lib/execution/payloads/serializer'
3940
import { preprocessExecution } from '@/lib/execution/preprocessing'
4041
import { LoggingSession } from '@/lib/logs/execution/logging-session'
4142
import {
@@ -65,7 +66,7 @@ import type {
6566
IterationContext,
6667
SerializableExecutionState,
6768
} from '@/executor/execution/types'
68-
import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types'
69+
import type { BlockLog, NormalizedBlockOutput, StreamingExecution } from '@/executor/types'
6970
import { getExecutionErrorStatus, hasExecutionResult } from '@/executor/utils/errors'
7071
import { Serializer } from '@/serializer'
7172
import { CORE_TRIGGER_TYPES, type CoreTriggerType } from '@/stores/logs/filters/types'
@@ -75,6 +76,20 @@ const logger = createLogger('WorkflowExecuteAPI')
7576
export const runtime = 'nodejs'
7677
export const dynamic = 'force-dynamic'
7778

79+
async function compactRoutePayload<T>(
80+
value: T,
81+
context: {
82+
workspaceId?: string
83+
workflowId?: string
84+
executionId?: string
85+
userId?: string
86+
preserveUserFileBase64?: boolean
87+
preserveRoot?: boolean
88+
}
89+
): Promise<T> {
90+
return compactExecutionPayload(value, { ...context, requireDurable: true })
91+
}
92+
7893
function resolveOutputIds(
7994
selectedOutputs: string[] | undefined,
8095
blocks: Record<string, any>
@@ -719,6 +734,14 @@ async function handleExecutePost(
719734
})
720735

721736
await handlePostExecutionPauseState({ result, workflowId, executionId, loggingSession })
737+
const compactResultOutput = await compactRoutePayload(result.output, {
738+
workspaceId,
739+
workflowId,
740+
executionId,
741+
userId: actorUserId,
742+
preserveUserFileBase64: true,
743+
preserveRoot: true,
744+
})
722745

723746
if (
724747
result.status === 'cancelled' &&
@@ -734,7 +757,7 @@ async function handleExecutePost(
734757
return NextResponse.json(
735758
{
736759
success: false,
737-
output: result.output,
760+
output: compactResultOutput,
738761
error: timeoutErrorMessage,
739762
metadata: result.metadata
740763
? {
@@ -756,16 +779,23 @@ async function handleExecutePost(
756779
})) as NormalizedBlockOutput)
757780
: result.output
758781

759-
const resultWithBase64 = { ...result, output: outputWithBase64 }
760-
761-
if (auth.authType !== AuthType.INTERNAL_JWT && workflowHasResponseBlock(resultWithBase64)) {
762-
return createHttpResponseFromBlock(resultWithBase64)
782+
if (auth.authType !== AuthType.INTERNAL_JWT && workflowHasResponseBlock(result)) {
783+
return createHttpResponseFromBlock({ ...result, output: outputWithBase64 })
763784
}
764785

786+
const compactOutput = await compactRoutePayload(outputWithBase64, {
787+
workspaceId,
788+
workflowId,
789+
executionId,
790+
userId: actorUserId,
791+
preserveUserFileBase64: true,
792+
preserveRoot: true,
793+
})
794+
765795
const filteredResult = {
766796
success: result.success,
767797
executionId,
768-
output: outputWithBase64,
798+
output: compactOutput,
769799
error: result.error,
770800
metadata: result.metadata
771801
? {
@@ -784,11 +814,21 @@ async function handleExecutePost(
784814

785815
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
786816
const status = getExecutionErrorStatus(error)
817+
const compactErrorOutput = executionResult?.output
818+
? await compactRoutePayload(executionResult.output, {
819+
workspaceId,
820+
workflowId,
821+
executionId,
822+
userId: actorUserId,
823+
preserveUserFileBase64: true,
824+
preserveRoot: true,
825+
})
826+
: undefined
787827

788828
return NextResponse.json(
789829
{
790830
success: false,
791-
output: executionResult?.output,
831+
output: compactErrorOutput,
792832
error: executionResult?.error || errorMessage || 'Execution failed',
793833
metadata: executionResult?.metadata
794834
? {
@@ -838,6 +878,9 @@ async function handleExecutePost(
838878
timeoutMs: preprocessResult.executionTimeout?.sync,
839879
},
840880
executionId,
881+
workspaceId,
882+
workflowId,
883+
userId: actorUserId,
841884
executeFn: async ({ onStream, onBlockComplete, abortSignal }) =>
842885
executeWorkflow(
843886
streamWorkflow,
@@ -872,7 +915,12 @@ async function handleExecutePost(
872915
let isStreamClosed = false
873916
let isManualAbortRegistered = false
874917

875-
const eventWriter = createExecutionEventWriter(executionId)
918+
const eventWriter = createExecutionEventWriter(executionId, {
919+
workspaceId,
920+
workflowId,
921+
userId: actorUserId,
922+
preserveUserFileBase64: includeFileBase64,
923+
})
876924
const metaInitialized = await initializeExecutionStreamMeta(executionId, {
877925
userId: actorUserId,
878926
workflowId,
@@ -898,16 +946,18 @@ async function handleExecutePost(
898946
terminalStatus?: TerminalExecutionStreamStatus
899947
) => {
900948
const isBuffered = event.type !== 'stream:chunk' && event.type !== 'stream:done'
949+
let eventToSend = event
901950
if (isBuffered) {
902951
const entry = terminalStatus
903952
? await eventWriter.writeTerminal(event, terminalStatus)
904953
: await eventWriter.write(event)
905-
event.eventId = entry.eventId
954+
eventToSend = entry.event
955+
eventToSend.eventId = entry.eventId
906956
terminalEventPublished ||= Boolean(terminalStatus)
907957
}
908958
if (!isStreamClosed) {
909959
try {
910-
controller.enqueue(encodeSSEEvent(event))
960+
controller.enqueue(encodeSSEEvent(eventToSend))
911961
} catch {
912962
isStreamClosed = true
913963
}
@@ -971,7 +1021,26 @@ async function handleExecutePost(
9711021
iterationContext?: IterationContext,
9721022
childWorkflowContext?: ChildWorkflowContext
9731023
) => {
974-
const hasError = callbackData.output?.error
1024+
const compactCallbackData = {
1025+
...callbackData,
1026+
input: await compactRoutePayload(callbackData.input, {
1027+
workspaceId,
1028+
workflowId,
1029+
executionId,
1030+
userId: actorUserId,
1031+
preserveUserFileBase64: includeFileBase64,
1032+
preserveRoot: true,
1033+
}),
1034+
output: await compactRoutePayload(callbackData.output, {
1035+
workspaceId,
1036+
workflowId,
1037+
executionId,
1038+
userId: actorUserId,
1039+
preserveUserFileBase64: includeFileBase64,
1040+
preserveRoot: true,
1041+
}),
1042+
}
1043+
const hasError = compactCallbackData.output?.error
9751044
const childWorkflowData = childWorkflowContext
9761045
? {
9771046
childWorkflowBlockId: childWorkflowContext.parentBlockId,
@@ -988,7 +1057,7 @@ async function handleExecutePost(
9881057
blockId,
9891058
blockName,
9901059
blockType,
991-
error: callbackData.output.error,
1060+
error: compactCallbackData.output.error,
9921061
})
9931062
await sendEvent({
9941063
type: 'block:error',
@@ -999,12 +1068,12 @@ async function handleExecutePost(
9991068
blockId,
10001069
blockName,
10011070
blockType,
1002-
input: callbackData.input,
1003-
error: callbackData.output.error,
1004-
durationMs: callbackData.executionTime || 0,
1005-
startedAt: callbackData.startedAt,
1006-
executionOrder: callbackData.executionOrder,
1007-
endedAt: callbackData.endedAt,
1071+
input: compactCallbackData.input,
1072+
error: compactCallbackData.output.error,
1073+
durationMs: compactCallbackData.executionTime || 0,
1074+
startedAt: compactCallbackData.startedAt,
1075+
executionOrder: compactCallbackData.executionOrder,
1076+
endedAt: compactCallbackData.endedAt,
10081077
...(iterationContext && {
10091078
iterationCurrent: iterationContext.iterationCurrent,
10101079
iterationTotal: iterationContext.iterationTotal,
@@ -1033,12 +1102,12 @@ async function handleExecutePost(
10331102
blockId,
10341103
blockName,
10351104
blockType,
1036-
input: callbackData.input,
1037-
output: callbackData.output,
1038-
durationMs: callbackData.executionTime || 0,
1039-
startedAt: callbackData.startedAt,
1040-
executionOrder: callbackData.executionOrder,
1041-
endedAt: callbackData.endedAt,
1105+
input: compactCallbackData.input,
1106+
output: compactCallbackData.output,
1107+
durationMs: compactCallbackData.executionTime || 0,
1108+
startedAt: compactCallbackData.startedAt,
1109+
executionOrder: compactCallbackData.executionOrder,
1110+
endedAt: compactCallbackData.endedAt,
10421111
...(iterationContext && {
10431112
iterationCurrent: iterationContext.iterationCurrent,
10441113
iterationTotal: iterationContext.iterationTotal,
@@ -1171,6 +1240,13 @@ async function handleExecutePost(
11711240
})
11721241

11731242
await handlePostExecutionPauseState({ result, workflowId, executionId, loggingSession })
1243+
const compactTerminalLogs = await compactBlockLogs(result.logs, {
1244+
workspaceId,
1245+
workflowId,
1246+
executionId,
1247+
userId: actorUserId,
1248+
requireDurable: true,
1249+
})
11741250

11751251
if (result.status === 'cancelled') {
11761252
if (timeoutController.isTimedOut() && timeoutController.timeoutMs) {
@@ -1191,7 +1267,7 @@ async function handleExecutePost(
11911267
data: {
11921268
error: timeoutErrorMessage,
11931269
duration: result.metadata?.duration || 0,
1194-
finalBlockLogs: result.logs,
1270+
finalBlockLogs: compactTerminalLogs,
11951271
},
11961272
},
11971273
'error'
@@ -1208,7 +1284,7 @@ async function handleExecutePost(
12081284
workflowId,
12091285
data: {
12101286
duration: result.metadata?.duration || 0,
1211-
finalBlockLogs: result.logs,
1287+
finalBlockLogs: compactTerminalLogs,
12121288
},
12131289
},
12141290
'cancelled'
@@ -1224,6 +1300,21 @@ async function handleExecutePost(
12241300
maxBytes: base64MaxBytes,
12251301
})
12261302
: result.output
1303+
const compactSseOutput = await compactRoutePayload(sseOutput, {
1304+
workspaceId,
1305+
workflowId,
1306+
executionId,
1307+
userId: actorUserId,
1308+
preserveUserFileBase64: true,
1309+
preserveRoot: true,
1310+
})
1311+
const compactFinalBlockLogs = await compactBlockLogs(result.logs, {
1312+
workspaceId,
1313+
workflowId,
1314+
executionId,
1315+
userId: actorUserId,
1316+
requireDurable: true,
1317+
})
12271318

12281319
if (result.status === 'paused') {
12291320
finalMetaStatus = 'complete'
@@ -1234,11 +1325,11 @@ async function handleExecutePost(
12341325
executionId,
12351326
workflowId,
12361327
data: {
1237-
output: sseOutput,
1328+
output: compactSseOutput,
12381329
duration: result.metadata?.duration || 0,
12391330
startTime: result.metadata?.startTime || startTime.toISOString(),
12401331
endTime: result.metadata?.endTime || new Date().toISOString(),
1241-
finalBlockLogs: result.logs,
1332+
finalBlockLogs: compactFinalBlockLogs,
12421333
},
12431334
},
12441335
'complete'
@@ -1253,11 +1344,11 @@ async function handleExecutePost(
12531344
workflowId,
12541345
data: {
12551346
success: result.success,
1256-
output: sseOutput,
1347+
output: compactSseOutput,
12571348
duration: result.metadata?.duration || 0,
12581349
startTime: result.metadata?.startTime || startTime.toISOString(),
12591350
endTime: result.metadata?.endTime || new Date().toISOString(),
1260-
finalBlockLogs: result.logs,
1351+
finalBlockLogs: compactFinalBlockLogs,
12611352
},
12621353
},
12631354
'complete'
@@ -1274,6 +1365,22 @@ async function handleExecutePost(
12741365
reqLogger.error(`SSE execution failed: ${errorMessage}`, { isTimeout })
12751366

12761367
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
1368+
let compactErrorLogs: BlockLog[] | undefined
1369+
try {
1370+
compactErrorLogs = executionResult?.logs
1371+
? await compactBlockLogs(executionResult.logs, {
1372+
workspaceId,
1373+
workflowId,
1374+
executionId,
1375+
userId: actorUserId,
1376+
requireDurable: true,
1377+
})
1378+
: undefined
1379+
} catch (compactionError) {
1380+
reqLogger.warn('Failed to compact SSE error logs, omitting oversized error details', {
1381+
error: toError(compactionError).message,
1382+
})
1383+
}
12771384

12781385
finalMetaStatus = 'error'
12791386
await sendEvent(
@@ -1285,7 +1392,7 @@ async function handleExecutePost(
12851392
data: {
12861393
error: executionResult?.error || errorMessage,
12871394
duration: executionResult?.metadata?.duration || 0,
1288-
finalBlockLogs: executionResult?.logs,
1395+
finalBlockLogs: compactErrorLogs,
12891396
},
12901397
},
12911398
'error'

0 commit comments

Comments
 (0)