diff --git a/apps/sim/executor/dag/builder.test.ts b/apps/sim/executor/dag/builder.test.ts index 32bf14d0f1..39df0681a0 100644 --- a/apps/sim/executor/dag/builder.test.ts +++ b/apps/sim/executor/dag/builder.test.ts @@ -24,6 +24,71 @@ function createBlock(id: string, metadataId: string): SerializedBlock { } } +describe('DAGBuilder disabled subflow validation', () => { + it('skips validation for disabled loops with no blocks inside', () => { + const workflow: SerializedWorkflow = { + version: '1', + blocks: [ + createBlock('start', BlockType.STARTER), + { ...createBlock('loop-block', BlockType.FUNCTION), enabled: false }, + ], + connections: [], + loops: { + 'loop-1': { + id: 'loop-1', + nodes: [], // Empty loop - would normally throw + iterations: 3, + }, + }, + } + + const builder = new DAGBuilder() + // Should not throw even though loop has no blocks inside + expect(() => builder.build(workflow)).not.toThrow() + }) + + it('skips validation for disabled parallels with no blocks inside', () => { + const workflow: SerializedWorkflow = { + version: '1', + blocks: [createBlock('start', BlockType.STARTER)], + connections: [], + loops: {}, + parallels: { + 'parallel-1': { + id: 'parallel-1', + nodes: [], // Empty parallel - would normally throw + }, + }, + } + + const builder = new DAGBuilder() + // Should not throw even though parallel has no blocks inside + expect(() => builder.build(workflow)).not.toThrow() + }) + + it('skips validation for loops where all inner blocks are disabled', () => { + const workflow: SerializedWorkflow = { + version: '1', + blocks: [ + createBlock('start', BlockType.STARTER), + { ...createBlock('inner-block', BlockType.FUNCTION), enabled: false }, + ], + connections: [], + loops: { + 'loop-1': { + id: 'loop-1', + nodes: ['inner-block'], // Has node but it's disabled + iterations: 3, + }, + }, + } + + const builder = new DAGBuilder() + // Should not throw - loop is effectively disabled since all inner blocks are disabled + expect(() => builder.build(workflow)).not.toThrow() + }) +}) + describe('DAGBuilder human-in-the-loop transformation', () => { it('creates trigger nodes and rewires edges for pause blocks', () => { const workflow: SerializedWorkflow = { diff --git a/apps/sim/executor/dag/builder.ts b/apps/sim/executor/dag/builder.ts index b072faaede..5465df6342 100644 --- a/apps/sim/executor/dag/builder.ts +++ b/apps/sim/executor/dag/builder.ts @@ -136,17 +136,18 @@ export class DAGBuilder { nodes: string[] | undefined, type: 'Loop' | 'Parallel' ): void { + const sentinelStartId = + type === 'Loop' ? buildSentinelStartId(id) : buildParallelSentinelStartId(id) + const sentinelStartNode = dag.nodes.get(sentinelStartId) + + if (!sentinelStartNode) return + if (!nodes || nodes.length === 0) { throw new Error( `${type} has no blocks inside. Add at least one block to the ${type.toLowerCase()}.` ) } - const sentinelStartId = - type === 'Loop' ? buildSentinelStartId(id) : buildParallelSentinelStartId(id) - const sentinelStartNode = dag.nodes.get(sentinelStartId) - if (!sentinelStartNode) return - const hasConnections = Array.from(sentinelStartNode.outgoingEdges.values()).some((edge) => nodes.includes(extractBaseBlockId(edge.target)) ) diff --git a/apps/sim/executor/execution/engine.test.ts b/apps/sim/executor/execution/engine.test.ts index f93ebc2068..880dc77c86 100644 --- a/apps/sim/executor/execution/engine.test.ts +++ b/apps/sim/executor/execution/engine.test.ts @@ -554,6 +554,413 @@ describe('ExecutionEngine', () => { }) }) + describe('Error handling in execution', () => { + it('should fail execution when a single node throws an error', async () => { + const startNode = createMockNode('start', 'starter') + const errorNode = createMockNode('error-node', 'function') + startNode.outgoingEdges.set('edge1', { target: 'error-node' }) + + const dag = createMockDAG([startNode, errorNode]) + const context = createMockContext() + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'start') return ['error-node'] + return [] + }) + + const nodeOrchestrator = { + executionCount: 0, + executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => { + if (nodeId === 'error-node') { + throw new Error('Block execution failed') + } + return { nodeId, output: {}, isFinalOutput: false } + }), + handleNodeCompletion: vi.fn(), + } as unknown as MockNodeOrchestrator + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + + await expect(engine.run('start')).rejects.toThrow('Block execution failed') + }) + + it('should stop parallel branches when one branch throws an error', async () => { + const startNode = createMockNode('start', 'starter') + const parallelNodes = Array.from({ length: 5 }, (_, i) => + createMockNode(`parallel${i}`, 'function') + ) + + parallelNodes.forEach((_, i) => { + startNode.outgoingEdges.set(`edge${i}`, { target: `parallel${i}` }) + }) + + const dag = createMockDAG([startNode, ...parallelNodes]) + const context = createMockContext() + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'start') return parallelNodes.map((_, i) => `parallel${i}`) + return [] + }) + + const executedNodes: string[] = [] + const nodeOrchestrator = { + executionCount: 0, + executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => { + executedNodes.push(nodeId) + if (nodeId === 'parallel0') { + await new Promise((resolve) => setTimeout(resolve, 10)) + throw new Error('Parallel branch failed') + } + await new Promise((resolve) => setTimeout(resolve, 100)) + return { nodeId, output: {}, isFinalOutput: false } + }), + handleNodeCompletion: vi.fn(), + } as unknown as MockNodeOrchestrator + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + + await expect(engine.run('start')).rejects.toThrow('Parallel branch failed') + }) + + it('should capture only the first error when multiple parallel branches fail', async () => { + const startNode = createMockNode('start', 'starter') + const parallelNodes = Array.from({ length: 3 }, (_, i) => + createMockNode(`parallel${i}`, 'function') + ) + + parallelNodes.forEach((_, i) => { + startNode.outgoingEdges.set(`edge${i}`, { target: `parallel${i}` }) + }) + + const dag = createMockDAG([startNode, ...parallelNodes]) + const context = createMockContext() + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'start') return parallelNodes.map((_, i) => `parallel${i}`) + return [] + }) + + const nodeOrchestrator = { + executionCount: 0, + executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => { + if (nodeId === 'parallel0') { + await new Promise((resolve) => setTimeout(resolve, 10)) + throw new Error('First error') + } + if (nodeId === 'parallel1') { + await new Promise((resolve) => setTimeout(resolve, 20)) + throw new Error('Second error') + } + if (nodeId === 'parallel2') { + await new Promise((resolve) => setTimeout(resolve, 30)) + throw new Error('Third error') + } + return { nodeId, output: {}, isFinalOutput: false } + }), + handleNodeCompletion: vi.fn(), + } as unknown as MockNodeOrchestrator + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + + await expect(engine.run('start')).rejects.toThrow('First error') + }) + + it('should wait for ongoing executions to complete before throwing error', async () => { + const startNode = createMockNode('start', 'starter') + const fastErrorNode = createMockNode('fast-error', 'function') + const slowNode = createMockNode('slow', 'function') + + startNode.outgoingEdges.set('edge1', { target: 'fast-error' }) + startNode.outgoingEdges.set('edge2', { target: 'slow' }) + + const dag = createMockDAG([startNode, fastErrorNode, slowNode]) + const context = createMockContext() + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'start') return ['fast-error', 'slow'] + return [] + }) + + let slowNodeCompleted = false + const nodeOrchestrator = { + executionCount: 0, + executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => { + if (nodeId === 'fast-error') { + await new Promise((resolve) => setTimeout(resolve, 10)) + throw new Error('Fast error') + } + if (nodeId === 'slow') { + await new Promise((resolve) => setTimeout(resolve, 50)) + slowNodeCompleted = true + return { nodeId, output: {}, isFinalOutput: false } + } + return { nodeId, output: {}, isFinalOutput: false } + }), + handleNodeCompletion: vi.fn(), + } as unknown as MockNodeOrchestrator + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + + await expect(engine.run('start')).rejects.toThrow('Fast error') + + expect(slowNodeCompleted).toBe(true) + }) + + it('should not queue new nodes after an error occurs', async () => { + const startNode = createMockNode('start', 'starter') + const errorNode = createMockNode('error-node', 'function') + const afterErrorNode = createMockNode('after-error', 'function') + + startNode.outgoingEdges.set('edge1', { target: 'error-node' }) + errorNode.outgoingEdges.set('edge2', { target: 'after-error' }) + + const dag = createMockDAG([startNode, errorNode, afterErrorNode]) + const context = createMockContext() + + const queuedNodes: string[] = [] + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'start') { + queuedNodes.push('error-node') + return ['error-node'] + } + if (node.id === 'error-node') { + queuedNodes.push('after-error') + return ['after-error'] + } + return [] + }) + + const executedNodes: string[] = [] + const nodeOrchestrator = { + executionCount: 0, + executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => { + executedNodes.push(nodeId) + if (nodeId === 'error-node') { + throw new Error('Node error') + } + return { nodeId, output: {}, isFinalOutput: false } + }), + handleNodeCompletion: vi.fn(), + } as unknown as MockNodeOrchestrator + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + + await expect(engine.run('start')).rejects.toThrow('Node error') + + expect(executedNodes).not.toContain('after-error') + }) + + it('should populate error result with metadata when execution fails', async () => { + const startNode = createMockNode('start', 'starter') + const errorNode = createMockNode('error-node', 'function') + startNode.outgoingEdges.set('edge1', { target: 'error-node' }) + + const dag = createMockDAG([startNode, errorNode]) + const context = createMockContext() + context.blockLogs.push({ + blockId: 'start', + blockName: 'Start', + blockType: 'starter', + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + durationMs: 10, + success: true, + }) + + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'start') return ['error-node'] + return [] + }) + + const nodeOrchestrator = { + executionCount: 0, + executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => { + if (nodeId === 'error-node') { + const error = new Error('Execution failed') as any + error.executionResult = { + success: false, + output: { partial: 'data' }, + logs: context.blockLogs, + metadata: context.metadata, + } + throw error + } + return { nodeId, output: {}, isFinalOutput: false } + }), + handleNodeCompletion: vi.fn(), + } as unknown as MockNodeOrchestrator + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + + try { + await engine.run('start') + expect.fail('Should have thrown') + } catch (error: any) { + expect(error.executionResult).toBeDefined() + expect(error.executionResult.metadata.endTime).toBeDefined() + expect(error.executionResult.metadata.duration).toBeDefined() + } + }) + + it('should prefer cancellation status over error when both occur', async () => { + const abortController = new AbortController() + + const startNode = createMockNode('start', 'starter') + const errorNode = createMockNode('error-node', 'function') + startNode.outgoingEdges.set('edge1', { target: 'error-node' }) + + const dag = createMockDAG([startNode, errorNode]) + const context = createMockContext({ abortSignal: abortController.signal }) + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'start') return ['error-node'] + return [] + }) + + const nodeOrchestrator = { + executionCount: 0, + executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => { + if (nodeId === 'error-node') { + abortController.abort() + throw new Error('Node error') + } + return { nodeId, output: {}, isFinalOutput: false } + }), + handleNodeCompletion: vi.fn(), + } as unknown as MockNodeOrchestrator + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + const result = await engine.run('start') + + expect(result.status).toBe('cancelled') + expect(result.success).toBe(false) + }) + + it('should stop loop iteration when error occurs in loop body', async () => { + const loopStartNode = createMockNode('loop-start', 'loop_sentinel') + loopStartNode.metadata = { isSentinel: true, sentinelType: 'start', loopId: 'loop1' } + + const loopBodyNode = createMockNode('loop-body', 'function') + loopBodyNode.metadata = { isLoopNode: true, loopId: 'loop1' } + + const loopEndNode = createMockNode('loop-end', 'loop_sentinel') + loopEndNode.metadata = { isSentinel: true, sentinelType: 'end', loopId: 'loop1' } + + const afterLoopNode = createMockNode('after-loop', 'function') + + loopStartNode.outgoingEdges.set('edge1', { target: 'loop-body' }) + loopBodyNode.outgoingEdges.set('edge2', { target: 'loop-end' }) + loopEndNode.outgoingEdges.set('loop_continue', { + target: 'loop-start', + sourceHandle: 'loop_continue', + }) + loopEndNode.outgoingEdges.set('loop_complete', { + target: 'after-loop', + sourceHandle: 'loop_complete', + }) + + const dag = createMockDAG([loopStartNode, loopBodyNode, loopEndNode, afterLoopNode]) + const context = createMockContext() + + let iterationCount = 0 + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'loop-start') return ['loop-body'] + if (node.id === 'loop-body') return ['loop-end'] + if (node.id === 'loop-end') { + iterationCount++ + if (iterationCount < 5) return ['loop-start'] + return ['after-loop'] + } + return [] + }) + + const nodeOrchestrator = { + executionCount: 0, + executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => { + if (nodeId === 'loop-body' && iterationCount >= 2) { + throw new Error('Loop body error on iteration 3') + } + return { nodeId, output: {}, isFinalOutput: false } + }), + handleNodeCompletion: vi.fn(), + } as unknown as MockNodeOrchestrator + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + + await expect(engine.run('loop-start')).rejects.toThrow('Loop body error on iteration 3') + + expect(iterationCount).toBeLessThanOrEqual(3) + }) + + it('should handle error that is not an Error instance', async () => { + const startNode = createMockNode('start', 'starter') + const errorNode = createMockNode('error-node', 'function') + startNode.outgoingEdges.set('edge1', { target: 'error-node' }) + + const dag = createMockDAG([startNode, errorNode]) + const context = createMockContext() + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'start') return ['error-node'] + return [] + }) + + const nodeOrchestrator = { + executionCount: 0, + executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => { + if (nodeId === 'error-node') { + throw 'String error message' + } + return { nodeId, output: {}, isFinalOutput: false } + }), + handleNodeCompletion: vi.fn(), + } as unknown as MockNodeOrchestrator + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + + await expect(engine.run('start')).rejects.toThrow('String error message') + }) + + it('should preserve partial output when error occurs after some blocks complete', async () => { + const startNode = createMockNode('start', 'starter') + const successNode = createMockNode('success', 'function') + const errorNode = createMockNode('error-node', 'function') + + startNode.outgoingEdges.set('edge1', { target: 'success' }) + successNode.outgoingEdges.set('edge2', { target: 'error-node' }) + + const dag = createMockDAG([startNode, successNode, errorNode]) + const context = createMockContext() + const edgeManager = createMockEdgeManager((node) => { + if (node.id === 'start') return ['success'] + if (node.id === 'success') return ['error-node'] + return [] + }) + + const nodeOrchestrator = { + executionCount: 0, + executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => { + if (nodeId === 'success') { + return { nodeId, output: { successData: 'preserved' }, isFinalOutput: false } + } + if (nodeId === 'error-node') { + throw new Error('Late error') + } + return { nodeId, output: {}, isFinalOutput: false } + }), + handleNodeCompletion: vi.fn(), + } as unknown as MockNodeOrchestrator + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + + try { + await engine.run('start') + expect.fail('Should have thrown') + } catch (error: any) { + // Verify the error was thrown + expect(error.message).toBe('Late error') + // The partial output should be available in executionResult if attached + if (error.executionResult) { + expect(error.executionResult.output).toBeDefined() + } + } + }) + }) + describe('Cancellation flag behavior', () => { it('should set cancelledFlag when abort signal fires', async () => { const abortController = new AbortController() diff --git a/apps/sim/executor/execution/engine.ts b/apps/sim/executor/execution/engine.ts index 7c2317b047..58792ef2b8 100644 --- a/apps/sim/executor/execution/engine.ts +++ b/apps/sim/executor/execution/engine.ts @@ -25,6 +25,8 @@ export class ExecutionEngine { private pausedBlocks: Map = new Map() private allowResumeTriggers: boolean private cancelledFlag = false + private errorFlag = false + private executionError: Error | null = null private lastCancellationCheck = 0 private readonly useRedisCancellation: boolean private readonly CANCELLATION_CHECK_INTERVAL_MS = 500 @@ -103,7 +105,7 @@ export class ExecutionEngine { this.initializeQueue(triggerBlockId) while (this.hasWork()) { - if (await this.checkCancellation()) { + if ((await this.checkCancellation()) || this.errorFlag) { break } await this.processQueue() @@ -113,6 +115,11 @@ export class ExecutionEngine { await this.waitForAllExecutions() } + // Rethrow the captured error so it's handled by the catch block + if (this.errorFlag && this.executionError) { + throw this.executionError + } + if (this.pausedBlocks.size > 0) { return this.buildPausedResult(startTime) } @@ -196,11 +203,17 @@ export class ExecutionEngine { } private trackExecution(promise: Promise): void { - this.executing.add(promise) - promise.catch(() => {}) - promise.finally(() => { - this.executing.delete(promise) - }) + const trackedPromise = promise + .catch((error) => { + if (!this.errorFlag) { + this.errorFlag = true + this.executionError = error instanceof Error ? error : new Error(String(error)) + } + }) + .finally(() => { + this.executing.delete(trackedPromise) + }) + this.executing.add(trackedPromise) } private async waitForAnyExecution(): Promise { @@ -315,7 +328,7 @@ export class ExecutionEngine { private async processQueue(): Promise { while (this.readyQueue.length > 0) { - if (await this.checkCancellation()) { + if ((await this.checkCancellation()) || this.errorFlag) { break } const nodeId = this.dequeue() @@ -324,7 +337,7 @@ export class ExecutionEngine { this.trackExecution(promise) } - if (this.executing.size > 0 && !this.cancelledFlag) { + if (this.executing.size > 0 && !this.cancelledFlag && !this.errorFlag) { await this.waitForAnyExecution() } } diff --git a/apps/sim/executor/handlers/workflow/workflow-handler.test.ts b/apps/sim/executor/handlers/workflow/workflow-handler.test.ts index bf304c786d..21d9cd3fdb 100644 --- a/apps/sim/executor/handlers/workflow/workflow-handler.test.ts +++ b/apps/sim/executor/handlers/workflow/workflow-handler.test.ts @@ -204,26 +204,21 @@ describe('WorkflowBlockHandler', () => { }) }) - it('should map failed child output correctly', () => { + it('should throw error for failed child output so BlockExecutor can check error port', () => { const childResult = { success: false, error: 'Child workflow failed', } - const result = (handler as any).mapChildOutputToParent( - childResult, - 'child-id', - 'Child Workflow', - 100 - ) + expect(() => + (handler as any).mapChildOutputToParent(childResult, 'child-id', 'Child Workflow', 100) + ).toThrow('Error in child workflow "Child Workflow": Child workflow failed') - expect(result).toEqual({ - success: false, - childWorkflowName: 'Child Workflow', - result: {}, - error: 'Child workflow failed', - childTraceSpans: [], - }) + try { + ;(handler as any).mapChildOutputToParent(childResult, 'child-id', 'Child Workflow', 100) + } catch (error: any) { + expect(error.childTraceSpans).toEqual([]) + } }) it('should handle nested response structures', () => { diff --git a/apps/sim/executor/handlers/workflow/workflow-handler.ts b/apps/sim/executor/handlers/workflow/workflow-handler.ts index 94f0908897..d8f7ced358 100644 --- a/apps/sim/executor/handlers/workflow/workflow-handler.ts +++ b/apps/sim/executor/handlers/workflow/workflow-handler.ts @@ -144,6 +144,11 @@ export class WorkflowBlockHandler implements BlockHandler { const workflowMetadata = workflows[workflowId] const childWorkflowName = workflowMetadata?.name || workflowId + const originalError = error.message || 'Unknown error' + const wrappedError = new Error( + `Error in child workflow "${childWorkflowName}": ${originalError}` + ) + if (error.executionResult?.logs) { const executionResult = error.executionResult as ExecutionResult @@ -159,28 +164,12 @@ export class WorkflowBlockHandler implements BlockHandler { ) logger.info(`Captured ${childTraceSpans.length} child trace spans from failed execution`) - - return { - success: false, - childWorkflowName, - result: {}, - error: error.message || 'Child workflow execution failed', - childTraceSpans: childTraceSpans, - } as Record + ;(wrappedError as any).childTraceSpans = childTraceSpans + } else if (error.childTraceSpans && Array.isArray(error.childTraceSpans)) { + ;(wrappedError as any).childTraceSpans = error.childTraceSpans } - if (error.childTraceSpans && Array.isArray(error.childTraceSpans)) { - return { - success: false, - childWorkflowName, - result: {}, - error: error.message || 'Child workflow execution failed', - childTraceSpans: error.childTraceSpans, - } as Record - } - - const originalError = error.message || 'Unknown error' - throw new Error(`Error in child workflow "${childWorkflowName}": ${originalError}`) + throw wrappedError } } @@ -452,17 +441,13 @@ export class WorkflowBlockHandler implements BlockHandler { if (!success) { logger.warn(`Child workflow ${childWorkflowName} failed`) - // Return failure with child trace spans so they can be displayed - return { - success: false, - childWorkflowName, - result, - error: childResult.error || 'Child workflow execution failed', - childTraceSpans: childTraceSpans || [], - } as Record + const error = new Error( + `Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}` + ) + ;(error as any).childTraceSpans = childTraceSpans || [] + throw error } - // Success case return { success: true, childWorkflowName,