From 738b42f172ae0f9e9f7a15ca38bb6726033bf55a Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Sat, 20 Jun 2026 12:23:31 -0700 Subject: [PATCH 1/2] fix(executor): stop HITL error edges from firing on successful resume --- .../executor/execution/edge-manager.test.ts | 111 +++++++++++++++ apps/sim/executor/execution/edge-manager.ts | 27 ++-- apps/sim/executor/execution/engine.test.ts | 132 ++++++++++++++++++ apps/sim/executor/execution/engine.ts | 63 +++++++-- 4 files changed, 305 insertions(+), 28 deletions(-) diff --git a/apps/sim/executor/execution/edge-manager.test.ts b/apps/sim/executor/execution/edge-manager.test.ts index d1c05122a6c..80bd4100d34 100644 --- a/apps/sim/executor/execution/edge-manager.test.ts +++ b/apps/sim/executor/execution/edge-manager.test.ts @@ -875,6 +875,117 @@ describe('EdgeManager', () => { }) }) + describe('deactivateResumedEdge', () => { + it('prunes a resumed pause block error edge without firing the error target', () => { + // Models the HITL-resume bug: a pause block and a regular block both feed + // an error-notifier via `error` handles. On a fully successful run the + // notifier must never run. + const pauseId = 'pause-block' + const regularId = 'regular-block' + const notifyId = 'error-notify' + + const pauseNode = createMockNode( + pauseId, + [ + { target: 'next', sourceHandle: EDGE.SOURCE }, + { target: notifyId, sourceHandle: EDGE.ERROR }, + ], + [] + ) + const regularNode = createMockNode(regularId, [ + { target: notifyId, sourceHandle: EDGE.ERROR }, + ]) + const notifyNode = createMockNode(notifyId, [], [pauseId, regularId]) + + const dag = createMockDAG( + new Map([ + [pauseId, pauseNode], + [regularId, regularNode], + [notifyId, notifyNode], + ]) + ) + const edgeManager = new EdgeManager(dag) + + // Resume releases the pause block's error edge as deactivated. + edgeManager.deactivateResumedEdge(pauseId, notifyId, EDGE.ERROR) + + expect(edgeManager.getDeactivatedEdges()).toContain( + JSON.stringify([pauseId, notifyId, EDGE.ERROR]) + ) + expect(edgeManager.getNodesWithActivatedEdge()).not.toContain(notifyId) + + // The regular block then completes successfully → its error edge deactivates too. + const readyNodes = edgeManager.processOutgoingEdges(regularNode, { result: 'ok' }) + + // With no error edge ever activated, the notifier is never scheduled. + expect(readyNodes).not.toContain(notifyId) + expect(edgeManager.getNodesWithActivatedEdge()).not.toContain(notifyId) + }) + + it('still fires the error target when a real upstream block errors', () => { + // Same topology, but here the regular block genuinely errors — the notifier + // must fire even though the pause block's error edge was pruned on resume. + const pauseId = 'pause-block' + const regularId = 'regular-block' + const notifyId = 'error-notify' + + const regularNode = createMockNode(regularId, [ + { target: notifyId, sourceHandle: EDGE.ERROR }, + ]) + const notifyNode = createMockNode(notifyId, [], [pauseId, regularId]) + + const dag = createMockDAG( + new Map([ + [regularId, regularNode], + [notifyId, notifyNode], + ]) + ) + const edgeManager = new EdgeManager(dag) + + edgeManager.deactivateResumedEdge(pauseId, notifyId, EDGE.ERROR) + + const readyNodes = edgeManager.processOutgoingEdges(regularNode, { error: 'boom' }) + + expect(readyNodes).toContain(notifyId) + expect(edgeManager.getNodesWithActivatedEdge()).toContain(notifyId) + }) + + it('leaves a convergence join ready+activated after pruning a pause error edge', () => { + // Join `C` is fed by `B.source` (succeeds) and `P.error` (pause block). + // After B activates and P's error edge is pruned on resume, C must be both + // activated and ready so the engine re-queues it. + const sourceId = 'block-b' + const pauseId = 'pause-block' + const joinId = 'join-c' + + const sourceNode = createMockNode(sourceId, [{ target: joinId, sourceHandle: EDGE.SOURCE }]) + const pauseNode = createMockNode(pauseId, [{ target: joinId, sourceHandle: EDGE.ERROR }]) + const joinNode = createMockNode(joinId, [], [sourceId, pauseId]) + + const edgeManager = new EdgeManager( + createMockDAG( + new Map([ + [sourceId, sourceNode], + [pauseId, pauseNode], + [joinId, joinNode], + ]) + ) + ) + + // Phase 1: B succeeds → activates B→C, but C still waits on the pause edge. + const readyAfterB = edgeManager.processOutgoingEdges(sourceNode, { result: 'ok' }) + expect(readyAfterB).not.toContain(joinId) + expect(edgeManager.hasActivatedEdge(joinId)).toBe(true) + expect(edgeManager.isNodeReady(joinNode)).toBe(false) + + // Resume: pause block's error edge is pruned → C is now ready (and stays + // activated), which is exactly the state the engine uses to re-queue it. + edgeManager.deactivateResumedEdge(pauseId, joinId, EDGE.ERROR) + expect(edgeManager.hasActivatedEdge(joinId)).toBe(true) + expect(edgeManager.isNodeReady(joinNode)).toBe(true) + }) + }) + describe('Diamond pattern (convergent paths)', () => { it('should handle diamond: condition splits then converges at merge point', () => { const conditionId = 'condition-1' diff --git a/apps/sim/executor/execution/edge-manager.ts b/apps/sim/executor/execution/edge-manager.ts index 86ce85f7eef..c09acd42103 100644 --- a/apps/sim/executor/execution/edge-manager.ts +++ b/apps/sim/executor/execution/edge-manager.ts @@ -134,6 +134,10 @@ export class EdgeManager { return Array.from(this.nodesWithActivatedEdge) } + hasActivatedEdge(nodeId: string): boolean { + return this.nodesWithActivatedEdge.has(nodeId) + } + restoreDeactivatedEdges(edgeKeys?: string[], activatedNodeIds?: string[]): void { this.deactivatedEdges = new Set( (edgeKeys ?? []).map((edgeKey) => this.normalizeSerializedEdgeKey(edgeKey)) @@ -145,17 +149,10 @@ export class EdgeManager { this.nodesWithActivatedEdge.add(nodeId) } - /** - * Clear deactivated edges for a set of nodes (used when restoring loop state for next iteration). - * - * Only clears edges whose SOURCE is in the provided set. Edges pointing INTO a node in the set - * whose source lives outside (e.g. an external branch whose path was cascade-deactivated) must - * remain deactivated — otherwise `countActiveIncomingEdges` would count a source that will never - * fire again, stalling the loop on its next iteration. - * - * Deactivated edge keys encode the source separately so node IDs with shared prefixes - * cannot clear each other's deactivated edges. - */ + deactivateResumedEdge(sourceId: string, targetId: string, sourceHandle?: string): void { + this.deactivateEdgeAndDescendants(sourceId, targetId, sourceHandle) + } + clearDeactivatedEdgesForNodes(nodeIds: Set): void { const edgesToRemove: string[] = [] for (const edgeKey of this.deactivatedEdges) { @@ -182,11 +179,6 @@ export class EdgeManager { return targetNode ? this.isNodeReady(targetNode) : false } - /** - * Checks if the cascade target sentinel belongs to the same subflow as the source node. - * A condition inside a loop that hits a dead-end should still allow the enclosing - * loop's sentinel to fire so the loop can continue or exit. - */ private isEnclosingSentinel(sourceNode: DAGNode, sentinelId: string): boolean { const sentinel = this.dag.nodes.get(sentinelId) if (!sentinel?.metadata.isSentinel) return false @@ -321,9 +313,6 @@ export class EdgeManager { } } - /** - * Checks if a node has any active incoming edges besides the one being excluded. - */ private hasActiveIncomingEdges(node: DAGNode, excludeEdgeKey: string): boolean { for (const incomingSourceId of node.incomingEdges) { const incomingNode = this.dag.nodes.get(incomingSourceId) diff --git a/apps/sim/executor/execution/engine.test.ts b/apps/sim/executor/execution/engine.test.ts index b9c38ed14e7..904fc5f22bb 100644 --- a/apps/sim/executor/execution/engine.test.ts +++ b/apps/sim/executor/execution/engine.test.ts @@ -112,6 +112,8 @@ function createMockEdgeManager( getDeactivatedEdges: vi.fn(() => []), getNodesWithActivatedEdge: vi.fn(() => []), markNodeWithActivatedEdge: vi.fn(), + deactivateResumedEdge: vi.fn(), + hasActivatedEdge: vi.fn(() => false), } as unknown as MockEdgeManager } @@ -231,6 +233,136 @@ describe('ExecutionEngine', () => { expect(nodeOrchestrator.executeNode).not.toHaveBeenCalled() }) + it('deactivates a resumed pause block error edge instead of firing it', async () => { + // Pause block has two outgoing edges: a `source` continuation edge and an + // `error` edge to an error-notifier. On a successful resume the error edge + // must be deactivated (never marked/queued); only the continuation fires. + const pauseNode = createMockNode('pause-block', 'human_in_the_loop') + pauseNode.outgoingEdges.set('pause-block→next-source', { + target: 'next', + sourceHandle: EDGE.SOURCE, + }) + pauseNode.outgoingEdges.set('pause-block→notify-error', { + target: 'error-notify', + sourceHandle: EDGE.ERROR, + }) + + const nextNode = createMockNode('next', 'function') + nextNode.incomingEdges.add('pause-block') + + const errorNotifyNode = createMockNode('error-notify', 'gmail') + errorNotifyNode.incomingEdges.add('pause-block') + + const dag = createMockDAG([pauseNode, nextNode, errorNotifyNode]) + const context = createMockContext({ + metadata: { + executionId: 'test-execution', + startTime: new Date().toISOString(), + pendingBlocks: [], + // remainingEdges omit sourceHandle (as persisted snapshots do), forcing + // the engine to resolve the handle from the live DAG. + remainingEdges: [ + { source: 'pause-block', target: 'next' }, + { source: 'pause-block', target: 'error-notify' }, + ], + } as any, + }) + const edgeManager = createMockEdgeManager(() => []) + const nodeOrchestrator = createMockNodeOrchestrator() + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + await engine.run() + + // Continuation edge fires; error edge is deactivated, not activated/queued. + expect(edgeManager.markNodeWithActivatedEdge).toHaveBeenCalledWith('next') + expect(edgeManager.markNodeWithActivatedEdge).not.toHaveBeenCalledWith('error-notify') + expect(edgeManager.deactivateResumedEdge).toHaveBeenCalledWith( + 'pause-block', + 'error-notify', + EDGE.ERROR + ) + // A pure error-handler target (never activated) must not be executed. + expect(nodeOrchestrator.executeNode).not.toHaveBeenCalledWith(context, 'error-notify') + }) + + it('re-queues a convergence target when a resumed pause error edge is pruned', async () => { + // The join is fed by a succeeding block's `source` edge (activated in + // phase 1) AND the pause block's `error` edge. On resume the error edge is + // pruned, but the join must still run because it already had a genuine + // activation — otherwise it would be silently stranded. + const pauseNode = createMockNode('pause-block', 'human_in_the_loop') + pauseNode.outgoingEdges.set('pause-block→join-error', { + target: 'join', + sourceHandle: EDGE.ERROR, + }) + const joinNode = createMockNode('join', 'function') + joinNode.incomingEdges.add('pause-block') + + const dag = createMockDAG([pauseNode, joinNode]) + const context = createMockContext({ + metadata: { + executionId: 'test-execution', + startTime: new Date().toISOString(), + pendingBlocks: [], + remainingEdges: [{ source: 'pause-block', target: 'join' }], + } as any, + }) + const edgeManager = createMockEdgeManager(() => []) + // Join already received a genuine activation in phase 1 and is now ready. + vi.mocked(edgeManager.hasActivatedEdge).mockReturnValue(true) + vi.mocked(edgeManager.isNodeReady).mockReturnValue(true) + const nodeOrchestrator = createMockNodeOrchestrator() + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + await engine.run() + + expect(edgeManager.deactivateResumedEdge).toHaveBeenCalledWith( + 'pause-block', + 'join', + EDGE.ERROR + ) + // Pruning is via deactivation, never force-activation... + expect(edgeManager.markNodeWithActivatedEdge).not.toHaveBeenCalledWith('join') + // ...but the already-activated convergence node still runs. + expect(nodeOrchestrator.executeNode).toHaveBeenCalledWith(context, 'join') + }) + + it('prefers the continuation handle when a pause block also errors into the same target', async () => { + // Pause block wires BOTH source and error into the same target. The error + // edge is registered first, but on a successful resume the continuation + // (source) handle must win, so the target is activated, not pruned. + const pauseNode = createMockNode('pause-block', 'human_in_the_loop') + pauseNode.outgoingEdges.set('pause-block→both-error', { + target: 'both', + sourceHandle: EDGE.ERROR, + }) + pauseNode.outgoingEdges.set('pause-block→both-source', { + target: 'both', + sourceHandle: EDGE.SOURCE, + }) + const bothNode = createMockNode('both', 'function') + bothNode.incomingEdges.add('pause-block') + + const dag = createMockDAG([pauseNode, bothNode]) + const context = createMockContext({ + metadata: { + executionId: 'test-execution', + startTime: new Date().toISOString(), + pendingBlocks: [], + remainingEdges: [{ source: 'pause-block', target: 'both' }], + } as any, + }) + const edgeManager = createMockEdgeManager(() => []) + const nodeOrchestrator = createMockNodeOrchestrator() + + const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) + await engine.run() + + expect(edgeManager.markNodeWithActivatedEdge).toHaveBeenCalledWith('both') + expect(edgeManager.deactivateResumedEdge).not.toHaveBeenCalled() + expect(nodeOrchestrator.executeNode).toHaveBeenCalledWith(context, 'both') + }) + it('should execute all nodes in a multi-node workflow', async () => { const nodes = [ createMockNode('start', 'starter'), diff --git a/apps/sim/executor/execution/engine.ts b/apps/sim/executor/execution/engine.ts index 737ad53922d..9c65779a8eb 100644 --- a/apps/sim/executor/execution/engine.ts +++ b/apps/sim/executor/execution/engine.ts @@ -287,17 +287,33 @@ export class ExecutionEngine { for (const edge of remainingEdges) { const targetNode = this.dag.nodes.get(edge.target) - if (targetNode) { - const hadEdge = targetNode.incomingEdges.has(edge.source) - targetNode.incomingEdges.delete(edge.source) - if (hadEdge) { - this.edgeManager.markNodeWithActivatedEdge(targetNode.id) - } - - if (this.edgeManager.isNodeReady(targetNode)) { - this.execLogger.info('Node became ready after edge removal', { nodeId: targetNode.id }) + if (!targetNode) continue + + const sourceHandle = this.resolveRemainingEdgeHandle(edge) + if (sourceHandle === EDGE.ERROR) { + this.edgeManager.deactivateResumedEdge(edge.source, targetNode.id, sourceHandle) + + if ( + this.edgeManager.hasActivatedEdge(targetNode.id) && + this.edgeManager.isNodeReady(targetNode) + ) { + this.execLogger.info('Convergence node ready after pruning resumed error edge', { + nodeId: targetNode.id, + }) this.addToQueue(targetNode.id) } + continue + } + + const hadEdge = targetNode.incomingEdges.has(edge.source) + targetNode.incomingEdges.delete(edge.source) + if (hadEdge) { + this.edgeManager.markNodeWithActivatedEdge(targetNode.id) + } + + if (this.edgeManager.isNodeReady(targetNode)) { + this.execLogger.info('Node became ready after edge removal', { nodeId: targetNode.id }) + this.addToQueue(targetNode.id) } } @@ -351,6 +367,35 @@ export class ExecutionEngine { } } + /** + * Resolves the source handle for an edge released during pause/resume. + * Persisted `remainingEdges` may omit the handle, so fall back to the live DAG + * edge. When a source has both a continuation and an `error` edge to the same + * target, the continuation handle wins — a successful resume must not prune it. + */ + private resolveRemainingEdgeHandle(edge: { + source: string + target: string + sourceHandle?: string + }): string | undefined { + if (edge.sourceHandle !== undefined) return edge.sourceHandle + + const sourceNode = this.dag.nodes.get(edge.source) + if (!sourceNode) return undefined + + let hasErrorEdge = false + for (const [, outgoing] of sourceNode.outgoingEdges) { + if (outgoing.target !== edge.target) continue + if (outgoing.sourceHandle === EDGE.ERROR) { + hasErrorEdge = true + continue + } + return outgoing.sourceHandle + } + + return hasErrorEdge ? EDGE.ERROR : undefined + } + private async processQueue(): Promise { while (this.readyQueue.length > 0) { if (this.checkCancellation() || this.errorFlag) { From d1c26e49abf3766d98d6429a77722ee7fdd47477 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Sat, 20 Jun 2026 12:33:46 -0700 Subject: [PATCH 2/2] add comments --- apps/sim/executor/execution/edge-manager.ts | 30 +++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/apps/sim/executor/execution/edge-manager.ts b/apps/sim/executor/execution/edge-manager.ts index c09acd42103..148d88a352e 100644 --- a/apps/sim/executor/execution/edge-manager.ts +++ b/apps/sim/executor/execution/edge-manager.ts @@ -149,10 +149,32 @@ export class EdgeManager { this.nodesWithActivatedEdge.add(nodeId) } + /** + * Deactivates the `error` edge of a successfully-resumed pause block instead of + * firing it: the block completed normally, so its error path is pruned (and any + * now-dead descendants cascaded), mirroring how a normally-succeeding block's + * error edge is handled in {@link processOutgoingEdges}. + * + * `cascadeTargets` is intentionally left undefined here (unlike the + * {@link processOutgoingEdges} call site, which passes an explicit Set): the + * resume path has no loop/parallel sentinels to queue — the pause block's + * `source` edge drives continuation — so cascade-target collection is omitted. + */ deactivateResumedEdge(sourceId: string, targetId: string, sourceHandle?: string): void { this.deactivateEdgeAndDescendants(sourceId, targetId, sourceHandle) } + /** + * Clear deactivated edges for a set of nodes (used when restoring loop state for next iteration). + * + * Only clears edges whose SOURCE is in the provided set. Edges pointing INTO a node in the set + * whose source lives outside (e.g. an external branch whose path was cascade-deactivated) must + * remain deactivated — otherwise `countActiveIncomingEdges` would count a source that will never + * fire again, stalling the loop on its next iteration. + * + * Deactivated edge keys encode the source separately so node IDs with shared prefixes + * cannot clear each other's deactivated edges. + */ clearDeactivatedEdgesForNodes(nodeIds: Set): void { const edgesToRemove: string[] = [] for (const edgeKey of this.deactivatedEdges) { @@ -179,6 +201,11 @@ export class EdgeManager { return targetNode ? this.isNodeReady(targetNode) : false } + /** + * Checks if the cascade target sentinel belongs to the same subflow as the source node. + * A condition inside a loop that hits a dead-end should still allow the enclosing + * loop's sentinel to fire so the loop can continue or exit. + */ private isEnclosingSentinel(sourceNode: DAGNode, sentinelId: string): boolean { const sentinel = this.dag.nodes.get(sentinelId) if (!sentinel?.metadata.isSentinel) return false @@ -313,6 +340,9 @@ export class EdgeManager { } } + /** + * Checks if a node has any active incoming edges besides the one being excluded. + */ private hasActiveIncomingEdges(node: DAGNode, excludeEdgeKey: string): boolean { for (const incomingSourceId of node.incomingEdges) { const incomingNode = this.dag.nodes.get(incomingSourceId)