feat: add parallel node execution in AgentFlow 2 engine#6483
Conversation
Refactors executeAgentFlow to execute independent nodes at the same topological level concurrently using Promise.all, while maintaining sequential execution between dependent levels. Closes FlowiseAI#4673 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
There was a problem hiding this comment.
Code Review
This pull request refactors executeAgentFlow to execute independent nodes concurrently using Promise.all rather than sequentially. While this parallelization improves performance, a critical issue was identified where agentFlowExecutedData gets overwritten during the sequential processing of parallel batch results, leading to lost execution history. It is recommended to selectively append new execution data items for parallel batches to preserve the complete history.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| if (executionResult.agentFlowExecutedData) { | ||
| agentFlowExecutedData = executionResult.agentFlowExecutedData | ||
| } |
There was a problem hiding this comment.
When executing multiple nodes in parallel (currentBatch.length > 1), reassigning agentFlowExecutedData = executionResult.agentFlowExecutedData causes the execution data from previously processed parallel branches in the same batch to be completely overwritten and lost.
To preserve the execution history of all parallel branches, we should only overwrite agentFlowExecutedData directly when there is a single node in the batch (e.g., during human input resumption). For parallel batches, we should extract and append only the new execution data items produced by each branch.
if (executionResult.agentFlowExecutedData) {
if (currentBatch.length === 1) {
agentFlowExecutedData = executionResult.agentFlowExecutedData
} else {
const newExecutedData = executionResult.agentFlowExecutedData.slice(batchExecutedData.length)
agentFlowExecutedData.push(...newExecutedData)
}
}
Enables concurrent execution of independent nodes in AgentFlow 2.
Problem
When multiple LLM nodes are arranged in parallel within the flowchart, the UI implies they should execute concurrently. However, the execution engine processes them sequentially because the main while loop in
executeAgentFlowdequeues one node at a time withnodeExecutionQueue.shift()and awaits each individually.Solution
Refactors the main execution loop to:
Dequeue all ready nodes at the current topological level using
splice(0, queue.length)— all nodes that become ready at the same time are independent and can run concurrently.Execute them in parallel via
Promise.all— each parallel branch receives a deep-cloned copy of the shared mutable state (agentflowRuntime,agentFlowExecutedData) to avoid race conditions.Merge results deterministically — post-processing (state merging, execution-data tracking, SSE events) runs sequentially sorted by
nodeIdfor consistent, reproducible behavior.Preserve the waiting-node dependency mechanism — nodes that depend on multiple parallel branches still wait until all inputs arrive, so the merge point works correctly.
For sequential flows (single node per level), the behavior is identical to before — a single node dequeues, executes, and the next level proceeds.
Changes
packages/server/src/utils/buildAgentflow.ts: Refactored thewhileloop inexecuteAgentFlowto batch-execute same-level nodes withPromise.all.Closes #4673
🤖 Generated with Claude Code