diff --git a/packages/server/src/utils/buildAgentflow.ts b/packages/server/src/utils/buildAgentflow.ts index e38ccbec092..8d9d66c2dfa 100644 --- a/packages/server/src/utils/buildAgentflow.ts +++ b/packages/server/src/utils/buildAgentflow.ts @@ -2029,67 +2029,156 @@ export const executeAgentFlow = async ({ throw new Error('Maximum iteration limit reached') } - const currentNode = nodeExecutionQueue.shift() - if (!currentNode) continue + // Check for abort signal early in the loop + if (abortController?.signal?.aborted) { + throw new Error('Aborted') + } - const reactFlowNode = nodes.find((nd) => nd.id === currentNode.nodeId) - if (!reactFlowNode || reactFlowNode === undefined || reactFlowNode.data.name === 'stickyNoteAgentflow') continue + // Dequeue all nodes that are ready at this topological level. + // Independent nodes (e.g. parallel LLM branches) all become ready + // at the same time after their shared parent finishes, so we can + // execute them concurrently with Promise.all. + const currentBatch = nodeExecutionQueue.splice(0, nodeExecutionQueue.length) + + interface IBatchResult { + currentNode: INodeQueue + reactFlowNode: IReactFlowNode | undefined + executionResult: any + error: any + } + + // Snapshot shared mutable state so each parallel branch starts from + // the same baseline. executeNode mutates agentflowRuntime and + // agentFlowExecutedData in-place, so each parallel invocation must + // receive its own deep clone. + const batchRuntime = cloneDeep(agentflowRuntime) + const batchExecutedData = cloneDeep(agentFlowExecutedData) + + const batchResults: IBatchResult[] = await Promise.all( + currentBatch.map( + async (currentNode): Promise => { + const reactFlowNode = nodes.find((nd) => nd.id === currentNode.nodeId) + if (!reactFlowNode || reactFlowNode === undefined || reactFlowNode.data.name === 'stickyNoteAgentflow') { + return { currentNode, reactFlowNode: undefined, executionResult: null, error: null } + } - let nodeResult - try { - // Check for abort signal early in the loop - if (abortController?.signal?.aborted) { - throw new Error('Aborted') - } + try { + logger.debug(` 🎯 Executing node: ${reactFlowNode?.data.label}`) + + const executionResult = await executeNode({ + nodeId: currentNode.nodeId, + reactFlowNode, + nodes, + edges, + graph, + reversedGraph, + incomingInput, + chatflow, + chatId, + sessionId, + apiMessageId, + evaluationRunId, + parentExecutionId, + isInternal, + pastChatHistory, + prependedChatHistory, + appDataSource, + usageCacheManager, + telemetry, + componentNodes, + cachePool, + sseStreamer, + baseURL, + overrideConfig, + apiOverrideStatus, + nodeOverrides, + variableOverrides, + uploadedFilesContent, + fileUploads, + humanInput: currentHumanInput, + agentFlowExecutedData: cloneDeep(batchExecutedData), + agentflowRuntime: cloneDeep(batchRuntime), + abortController, + parentTraceIds, + analyticHandlers, + isRecursive, + iterationContext, + loopCounts, + orgId, + workspaceId, + subscriptionId, + productId + }) + + return { currentNode, reactFlowNode, executionResult, error: null } + } catch (error) { + return { currentNode, reactFlowNode, executionResult: null, error } + } + } + ) + ) - logger.debug(` 🎯 Executing node: ${reactFlowNode?.data.label}`) + // Sort results by nodeId for deterministic post-processing order. + batchResults.sort((a, b) => a.currentNode.nodeId.localeCompare(b.currentNode.nodeId)) - // Execute current node - const executionResult = await executeNode({ + // Check for errors first — if any node in the batch failed we + // report the first error and stop, matching the original + // sequential behaviour. + const firstError = batchResults.find((r) => r.error !== null) + if (firstError) { + const { currentNode, reactFlowNode, error } = firstError + const isAborted = getErrorMessage(error).includes('Aborted') + const errorStatus = isAborted ? 'TERMINATED' : 'ERROR' + const errorMessage = isAborted ? 'Flow execution was cancelled' : getErrorMessage(error) + + status = errorStatus + + // Add error info to execution data + agentFlowExecutedData.push({ nodeId: currentNode.nodeId, - reactFlowNode, - nodes, - edges, - graph, - reversedGraph, - incomingInput, - chatflow, - chatId, - sessionId, - apiMessageId, - evaluationRunId, - parentExecutionId, - isInternal, - pastChatHistory, - prependedChatHistory, - appDataSource, - usageCacheManager, - telemetry, - componentNodes, - cachePool, - sseStreamer, - baseURL, - overrideConfig, - apiOverrideStatus, - nodeOverrides, - variableOverrides, - uploadedFilesContent, - fileUploads, - humanInput: currentHumanInput, - agentFlowExecutedData, - agentflowRuntime, - abortController, - parentTraceIds, - analyticHandlers, - isRecursive, - iterationContext, - loopCounts, - orgId, - workspaceId, - subscriptionId, - productId + nodeLabel: reactFlowNode?.data?.label ?? currentNode.nodeId, + previousNodeIds: reversedGraph[currentNode.nodeId] || [], + data: { + id: currentNode.nodeId, + name: reactFlowNode?.data?.name ?? '', + error: errorMessage + }, + status: errorStatus + }) + + // Stream events to client + sseStreamer?.streamNextAgentFlowEvent(chatId, { + nodeId: currentNode.nodeId, + nodeLabel: reactFlowNode?.data?.label ?? currentNode.nodeId, + status: errorStatus, + error: isAborted ? undefined : errorMessage }) + // Only update execution record if this is not a recursive call + if (!isRecursive) { + sseStreamer?.streamAgentFlowExecutedDataEvent(chatId, agentFlowExecutedData) + + await updateExecution(appDataSource, newExecution.id, workspaceId, { + executionData: JSON.stringify(agentFlowExecutedData), + state: errorStatus + }) + + sseStreamer?.streamAgentFlowEvent(chatId, errorStatus) + } + + if (parentTraceIds && analyticHandlers) { + await analyticHandlers.onChainError(parentTraceIds, errorMessage, true) + } + + throw new Error(errorMessage) + } + + // Process successful results sequentially for deterministic + // state merging and queue management. + for (const batchResult of batchResults) { + const { currentNode, reactFlowNode, executionResult } = batchResult + if (!reactFlowNode || !executionResult) continue + if (executionResult.agentFlowExecutedData) { agentFlowExecutedData = executionResult.agentFlowExecutedData } @@ -2104,7 +2193,7 @@ export const executeAgentFlow = async ({ break } - nodeResult = executionResult.result + const nodeResult = executionResult.result // Add execution data agentFlowExecutedData.push({ @@ -2123,9 +2212,12 @@ export const executeAgentFlow = async ({ if (!isRecursive) sseStreamer?.streamAgentFlowExecutedDataEvent(chatId, agentFlowExecutedData) - // Add to agentflow runtime state + // Merge runtime state from the parallel branch back into the + // shared runtime. Independent branches may each produce + // additive state changes; we merge them so downstream nodes + // see the combined picture. if (nodeResult && nodeResult.state) { - agentflowRuntime.state = nodeResult.state + agentflowRuntime.state = { ...agentflowRuntime.state, ...nodeResult.state } } if (nodeResult && nodeResult.chatHistory) { @@ -2160,53 +2252,11 @@ export const executeAgentFlow = async ({ if (processResult.humanInput !== currentHumanInput) { currentHumanInput = processResult.humanInput } - } catch (error) { - const isAborted = getErrorMessage(error).includes('Aborted') - const errorStatus = isAborted ? 'TERMINATED' : 'ERROR' - const errorMessage = isAborted ? 'Flow execution was cancelled' : getErrorMessage(error) - - status = errorStatus - - // Add error info to execution data - agentFlowExecutedData.push({ - nodeId: currentNode.nodeId, - nodeLabel: reactFlowNode.data.label, - previousNodeIds: reversedGraph[currentNode.nodeId] || [], - data: { - id: currentNode.nodeId, - name: reactFlowNode.data.name, - error: errorMessage - }, - status: errorStatus - }) - - // Stream events to client - sseStreamer?.streamNextAgentFlowEvent(chatId, { - nodeId: currentNode.nodeId, - nodeLabel: reactFlowNode.data.label, - status: errorStatus, - error: isAborted ? undefined : errorMessage - }) - - // Only update execution record if this is not a recursive call - if (!isRecursive) { - sseStreamer?.streamAgentFlowExecutedDataEvent(chatId, agentFlowExecutedData) - - await updateExecution(appDataSource, newExecution.id, workspaceId, { - executionData: JSON.stringify(agentFlowExecutedData), - state: errorStatus - }) - - sseStreamer?.streamAgentFlowEvent(chatId, errorStatus) - } - - if (parentTraceIds && analyticHandlers) { - await analyticHandlers.onChainError(parentTraceIds, errorMessage, true) - } - - throw new Error(errorMessage) } + // If we hit a STOPPED status, break out of the loop + if (status === 'STOPPED') break + logger.debug(`/////////////////////////////////////////////////////////////////////////////`) }