Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 152 additions & 102 deletions packages/server/src/utils/buildAgentflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2029,67 +2029,156 @@ export const executeAgentFlow = async ({
throw new Error('Maximum iteration limit reached')
}

const currentNode = nodeExecutionQueue.shift()
if (!currentNode) continue
// Check for abort signal early in the loop
if (abortController?.signal?.aborted) {
throw new Error('Aborted')
}

const reactFlowNode = nodes.find((nd) => nd.id === currentNode.nodeId)
if (!reactFlowNode || reactFlowNode === undefined || reactFlowNode.data.name === 'stickyNoteAgentflow') continue
// Dequeue all nodes that are ready at this topological level.
// Independent nodes (e.g. parallel LLM branches) all become ready
// at the same time after their shared parent finishes, so we can
// execute them concurrently with Promise.all.
const currentBatch = nodeExecutionQueue.splice(0, nodeExecutionQueue.length)

interface IBatchResult {
currentNode: INodeQueue
reactFlowNode: IReactFlowNode | undefined
executionResult: any
error: any
}

// Snapshot shared mutable state so each parallel branch starts from
// the same baseline. executeNode mutates agentflowRuntime and
// agentFlowExecutedData in-place, so each parallel invocation must
// receive its own deep clone.
const batchRuntime = cloneDeep(agentflowRuntime)
const batchExecutedData = cloneDeep(agentFlowExecutedData)

const batchResults: IBatchResult[] = await Promise.all(
currentBatch.map(
async (currentNode): Promise<IBatchResult> => {
const reactFlowNode = nodes.find((nd) => nd.id === currentNode.nodeId)
if (!reactFlowNode || reactFlowNode === undefined || reactFlowNode.data.name === 'stickyNoteAgentflow') {
return { currentNode, reactFlowNode: undefined, executionResult: null, error: null }
}

let nodeResult
try {
// Check for abort signal early in the loop
if (abortController?.signal?.aborted) {
throw new Error('Aborted')
}
try {
logger.debug(` 🎯 Executing node: ${reactFlowNode?.data.label}`)

const executionResult = await executeNode({
nodeId: currentNode.nodeId,
reactFlowNode,
nodes,
edges,
graph,
reversedGraph,
incomingInput,
chatflow,
chatId,
sessionId,
apiMessageId,
evaluationRunId,
parentExecutionId,
isInternal,
pastChatHistory,
prependedChatHistory,
appDataSource,
usageCacheManager,
telemetry,
componentNodes,
cachePool,
sseStreamer,
baseURL,
overrideConfig,
apiOverrideStatus,
nodeOverrides,
variableOverrides,
uploadedFilesContent,
fileUploads,
humanInput: currentHumanInput,
agentFlowExecutedData: cloneDeep(batchExecutedData),
agentflowRuntime: cloneDeep(batchRuntime),
abortController,
parentTraceIds,
analyticHandlers,
isRecursive,
iterationContext,
loopCounts,
orgId,
workspaceId,
subscriptionId,
productId
})

return { currentNode, reactFlowNode, executionResult, error: null }
} catch (error) {
return { currentNode, reactFlowNode, executionResult: null, error }
}
}
)
)

logger.debug(` 🎯 Executing node: ${reactFlowNode?.data.label}`)
// Sort results by nodeId for deterministic post-processing order.
batchResults.sort((a, b) => a.currentNode.nodeId.localeCompare(b.currentNode.nodeId))

// Execute current node
const executionResult = await executeNode({
// Check for errors first — if any node in the batch failed we
// report the first error and stop, matching the original
// sequential behaviour.
const firstError = batchResults.find((r) => r.error !== null)
if (firstError) {
const { currentNode, reactFlowNode, error } = firstError
const isAborted = getErrorMessage(error).includes('Aborted')
const errorStatus = isAborted ? 'TERMINATED' : 'ERROR'
const errorMessage = isAborted ? 'Flow execution was cancelled' : getErrorMessage(error)

status = errorStatus

// Add error info to execution data
agentFlowExecutedData.push({
nodeId: currentNode.nodeId,
reactFlowNode,
nodes,
edges,
graph,
reversedGraph,
incomingInput,
chatflow,
chatId,
sessionId,
apiMessageId,
evaluationRunId,
parentExecutionId,
isInternal,
pastChatHistory,
prependedChatHistory,
appDataSource,
usageCacheManager,
telemetry,
componentNodes,
cachePool,
sseStreamer,
baseURL,
overrideConfig,
apiOverrideStatus,
nodeOverrides,
variableOverrides,
uploadedFilesContent,
fileUploads,
humanInput: currentHumanInput,
agentFlowExecutedData,
agentflowRuntime,
abortController,
parentTraceIds,
analyticHandlers,
isRecursive,
iterationContext,
loopCounts,
orgId,
workspaceId,
subscriptionId,
productId
nodeLabel: reactFlowNode?.data?.label ?? currentNode.nodeId,
previousNodeIds: reversedGraph[currentNode.nodeId] || [],
data: {
id: currentNode.nodeId,
name: reactFlowNode?.data?.name ?? '',
error: errorMessage
},
status: errorStatus
})

// Stream events to client
sseStreamer?.streamNextAgentFlowEvent(chatId, {
nodeId: currentNode.nodeId,
nodeLabel: reactFlowNode?.data?.label ?? currentNode.nodeId,
status: errorStatus,
error: isAborted ? undefined : errorMessage
})

// Only update execution record if this is not a recursive call
if (!isRecursive) {
sseStreamer?.streamAgentFlowExecutedDataEvent(chatId, agentFlowExecutedData)

await updateExecution(appDataSource, newExecution.id, workspaceId, {
executionData: JSON.stringify(agentFlowExecutedData),
state: errorStatus
})

sseStreamer?.streamAgentFlowEvent(chatId, errorStatus)
}

if (parentTraceIds && analyticHandlers) {
await analyticHandlers.onChainError(parentTraceIds, errorMessage, true)
}

throw new Error(errorMessage)
}

// Process successful results sequentially for deterministic
// state merging and queue management.
for (const batchResult of batchResults) {
const { currentNode, reactFlowNode, executionResult } = batchResult
if (!reactFlowNode || !executionResult) continue

if (executionResult.agentFlowExecutedData) {
agentFlowExecutedData = executionResult.agentFlowExecutedData
}
Comment on lines 2182 to 2184
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When executing multiple nodes in parallel (currentBatch.length > 1), reassigning agentFlowExecutedData = executionResult.agentFlowExecutedData causes the execution data from previously processed parallel branches in the same batch to be completely overwritten and lost.

To preserve the execution history of all parallel branches, we should only overwrite agentFlowExecutedData directly when there is a single node in the batch (e.g., during human input resumption). For parallel batches, we should extract and append only the new execution data items produced by each branch.

            if (executionResult.agentFlowExecutedData) {
                if (currentBatch.length === 1) {
                    agentFlowExecutedData = executionResult.agentFlowExecutedData
                } else {
                    const newExecutedData = executionResult.agentFlowExecutedData.slice(batchExecutedData.length)
                    agentFlowExecutedData.push(...newExecutedData)
                }
            }

Expand All @@ -2104,7 +2193,7 @@ export const executeAgentFlow = async ({
break
}

nodeResult = executionResult.result
const nodeResult = executionResult.result

// Add execution data
agentFlowExecutedData.push({
Expand All @@ -2123,9 +2212,12 @@ export const executeAgentFlow = async ({

if (!isRecursive) sseStreamer?.streamAgentFlowExecutedDataEvent(chatId, agentFlowExecutedData)

// Add to agentflow runtime state
// Merge runtime state from the parallel branch back into the
// shared runtime. Independent branches may each produce
// additive state changes; we merge them so downstream nodes
// see the combined picture.
if (nodeResult && nodeResult.state) {
agentflowRuntime.state = nodeResult.state
agentflowRuntime.state = { ...agentflowRuntime.state, ...nodeResult.state }
}

if (nodeResult && nodeResult.chatHistory) {
Expand Down Expand Up @@ -2160,53 +2252,11 @@ export const executeAgentFlow = async ({
if (processResult.humanInput !== currentHumanInput) {
currentHumanInput = processResult.humanInput
}
} catch (error) {
const isAborted = getErrorMessage(error).includes('Aborted')
const errorStatus = isAborted ? 'TERMINATED' : 'ERROR'
const errorMessage = isAborted ? 'Flow execution was cancelled' : getErrorMessage(error)

status = errorStatus

// Add error info to execution data
agentFlowExecutedData.push({
nodeId: currentNode.nodeId,
nodeLabel: reactFlowNode.data.label,
previousNodeIds: reversedGraph[currentNode.nodeId] || [],
data: {
id: currentNode.nodeId,
name: reactFlowNode.data.name,
error: errorMessage
},
status: errorStatus
})

// Stream events to client
sseStreamer?.streamNextAgentFlowEvent(chatId, {
nodeId: currentNode.nodeId,
nodeLabel: reactFlowNode.data.label,
status: errorStatus,
error: isAborted ? undefined : errorMessage
})

// Only update execution record if this is not a recursive call
if (!isRecursive) {
sseStreamer?.streamAgentFlowExecutedDataEvent(chatId, agentFlowExecutedData)

await updateExecution(appDataSource, newExecution.id, workspaceId, {
executionData: JSON.stringify(agentFlowExecutedData),
state: errorStatus
})

sseStreamer?.streamAgentFlowEvent(chatId, errorStatus)
}

if (parentTraceIds && analyticHandlers) {
await analyticHandlers.onChainError(parentTraceIds, errorMessage, true)
}

throw new Error(errorMessage)
}

// If we hit a STOPPED status, break out of the loop
if (status === 'STOPPED') break

logger.debug(`/////////////////////////////////////////////////////////////////////////////`)
}

Expand Down