feat: Implement synchronous workflow execution API #11255
Conversation
- Add workflow API endpoints (POST /workflow, GET /workflow, POST /workflow/stop)
- Implement developer API protection with settings check
- Add comprehensive workflow schema models with proper validation
- Create extensive unit test suite covering all scenarios
- Apply Ruff linting standards and fix all code quality issues
- Support API key authentication for all workflow endpoints
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
Walkthrough: Adds a V2 converters module for mapping V1 RunResponse objects to V2 WorkflowExecutionResponse, extends the workflow API with synchronous execution, timeout enforcement, and structured error responses, introduces workflow-specific exceptions, and adds comprehensive unit tests covering conversions and execution error cases.
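The walkthrough mentions timeout enforcement with structured error responses. A minimal sketch of that pattern, assuming FastAPI and asyncio.wait_for; the route shape, the execute_graph helper, and the response fields below are illustrative assumptions, not the PR's actual code:

```python
import asyncio

from fastapi import APIRouter

router = APIRouter()


async def execute_graph(flow_id: str) -> dict:
    """Placeholder standing in for the real graph execution coroutine."""
    await asyncio.sleep(0.1)
    return {"ChatOutput": "hello"}


@router.post("/workflow/{flow_id}/run")
async def run_workflow(flow_id: str, timeout_seconds: float = 60.0) -> dict:
    try:
        # Bound the synchronous execution with a wall-clock timeout.
        outputs = await asyncio.wait_for(execute_graph(flow_id), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        # Return a structured error body instead of letting the request hang.
        return {
            "status": "failed",
            "errors": [{"type": "timeout", "detail": f"execution exceeded {timeout_seconds}s"}],
        }
    return {"status": "completed", "outputs": outputs}
```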
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant WorkflowAPI
    participant Parser
    participant GraphBuilder
    participant GraphExecutor
    participant Converter
    participant ResponseBuilder
    Client->>WorkflowAPI: execute_workflow(workflow_request)
    WorkflowAPI->>Parser: parse_flat_inputs(inputs)
    Parser-->>WorkflowAPI: tweaks, session_id
    WorkflowAPI->>GraphBuilder: build Graph(flow_data, tweaks)
    GraphBuilder-->>WorkflowAPI: graph
    WorkflowAPI->>GraphExecutor: execute graph
    GraphExecutor-->>WorkflowAPI: run_response
    WorkflowAPI->>Converter: run_response_to_workflow_response(run_response, flow_id, job_id, workflow_request, graph)
    Converter->>Converter: derive terminal nodes, extract contents, build metadata
    Converter->>ResponseBuilder: construct WorkflowExecutionResponse
    ResponseBuilder-->>WorkflowAPI: response
    WorkflowAPI-->>Client: WorkflowExecutionResponse
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Pre-merge checks: ✅ 7 passed
Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>
This PR is now faster! 🚀 Janardan Singh Kavia accepted my code suggestion above.
```python
# First pass: collect all terminal vertices and check for duplicate display_names
terminal_vertices = [graph.get_vertex(vertex_id) for vertex_id in terminal_node_ids]
```
⚡️Codeflash found 60% (0.60x) speedup for run_response_to_workflow_response in src/backend/base/langflow/api/v2/converters.py
⏱️ Runtime : 2.82 milliseconds → 1.76 milliseconds (best of 24 runs)
📝 Explanation and details
The optimized code achieves a roughly 60% speedup (from 2.82 ms to 1.76 ms) by building an id-to-vertex dictionary once and replacing repeated graph.get_vertex() calls with O(1) lookups.
Key Optimization
What changed: The code now builds a single vertex_by_id mapping upfront instead of calling graph.get_vertex() for each terminal node:
```python
# Original: O(n*m) complexity - repeated linear searches
terminal_vertices = [graph.get_vertex(vertex_id) for vertex_id in terminal_node_ids]
```

```python
# Optimized: O(n+m) complexity - single dict creation + O(1) lookups
vertex_by_id = {v.id: v for v in vertices_list}
terminal_vertices = [vertex_by_id[vid] for vid in terminal_node_ids if vid in vertex_by_id]
```

Why it's faster: the line profiler shows the original list comprehension consumed 61.7% of total runtime (26.1 ms out of 42.3 ms). Each graph.get_vertex() call likely performs a linear search through all vertices. With multiple terminal nodes this becomes O(n×m), where n is the number of terminal nodes and m the total number of vertices. The optimized version creates one dictionary (O(m)) and then does O(1) lookups (O(n)), reducing the complexity to O(n+m).
Impact: The optimization reduces this operation's share from 61.7% to just ~7% of runtime in run_response_to_workflow_response, which is the primary driver of the ~60% overall speedup.
Performance Characteristics
Based on the annotated tests:
- Best case: large-scale scenarios with many terminal vertices (e.g., test_large_scale_many_vertices_performance_and_correct_count with 300 vertices) benefit most, since the O(n×m) vs O(n+m) difference becomes significant
- Typical case: workflows with multiple terminal nodes (a common pattern shown in tests with 2-4 terminal nodes) see a consistent speedup
- Edge case: single terminal node scenarios see minimal benefit but no regression
The optimization is particularly valuable if run_response_to_workflow_response is called frequently in workflow execution paths, as even the modest 1ms savings per call can accumulate significantly in high-throughput scenarios.
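To see the asymptotic difference outside langflow, here is a small self-contained benchmark; the Vertex stub and the sizes are made up for illustration and only mimic the lookup pattern described above:

```python
import time


class Vertex:
    def __init__(self, vid: str):
        self.id = vid


vertices = [Vertex(f"v{i}") for i in range(5000)]
terminal_ids = [f"v{i}" for i in range(0, 5000, 10)]


def linear_lookup():
    # Mimics repeated graph.get_vertex(): one linear scan per terminal id -> O(n*m)
    return [next(v for v in vertices if v.id == vid) for vid in terminal_ids]


def dict_lookup():
    # Build the id->vertex map once, then O(1) lookups -> O(n+m)
    vertex_by_id = {v.id: v for v in vertices}
    return [vertex_by_id[vid] for vid in terminal_ids if vid in vertex_by_id]


for fn in (linear_lookup, dict_lookup):
    start = time.perf_counter()
    fn()
    print(f"{fn.__name__}: {(time.perf_counter() - start) * 1000:.2f} ms")
```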
✅ Correctness verification report:
| Test | Status |
|---|---|
| ⚙️ Existing Unit Tests | ✅ 82 Passed |
| 🌀 Generated Regression Tests | ✅ 7 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 95.8% |
🌀 Generated Regression Tests:
```python
from __future__ import annotations

# imports
import sys
import time
import types
from typing import Any

import pytest  # used for our unit tests
from langflow.api.v1.schemas import RunResponse
from langflow.api.v2.converters import run_response_to_workflow_response
from lfx.graph.graph.base import Graph
from lfx.schema.workflow import (ComponentOutput, JobStatus,
                                 WorkflowExecutionRequest,
                                 WorkflowExecutionResponse)


class RunResponse:
    """Minimal RunResponse container used by the function under test."""

    def __init__(self, outputs=None):
        # outputs is expected to be an iterable of run_output objects (or None)
        self.outputs = outputs


class Vertex:
    """Vertex representation used by the Graph class in tests."""

    def __init__(self, id, outputs=None, vertex_type="", is_output=False, display_name=None):
        # outputs: list of dicts where each dict may have a 'types' key (list)
        self.id = id
        self.outputs = outputs or []
        self.vertex_type = vertex_type
        self.is_output = is_output
        self.display_name = display_name


class Graph:
    """Simple Graph implementation that mimics the minimal behaviour expected by the function."""

    def __init__(self, vertices=None, successor_map=None, provide_get_terminal_nodes=True):
        # vertices: list of Vertex instances
        self.vertices = vertices or []
        # successor_map: mapping vertex.id -> list of successor ids
        self.successor_map = successor_map or {}
        # Whether to expose get_terminal_nodes() or not (to force fallback)
        self._provide_get_terminal_nodes = provide_get_terminal_nodes

    def get_vertex(self, vertex_id):
        for v in self.vertices:
            if v.id == vertex_id:
                return v
        raise KeyError(f"Vertex {vertex_id} not found")

    # Optionally provide get_terminal_nodes method. Some tests will create Graph objects
    # that do not expose this attribute to force the fallback path in the implementation.
    def get_terminal_nodes(self):
        if not self._provide_get_terminal_nodes:
            # Simulate object without this method by raising AttributeError to mimic access error
            raise AttributeError("get_terminal_nodes not available")
        # Terminal nodes are those with no successors in successor_map
        result = []
        for v in self.vertices:
            if not self.successor_map.get(v.id):
                result.append(v.id)
        return result


class ComponentOutput:
    """Represents output for a single component in V2 schema."""

    def __init__(self, type, component_id, status, content, metadata):
        self.type = type
        self.component_id = component_id
        self.status = status
        self.content = content
        self.metadata = metadata

    def __eq__(self, other):
        if not isinstance(other, ComponentOutput):
            return False
        return (
            self.type == other.type
            and self.component_id == other.component_id
            and self.status == other.status
            and self.content == other.content
            and self.metadata == other.metadata
        )

    def __repr__(self):
        return f"ComponentOutput(type={self.type!r}, id={self.component_id!r}, status={self.status!r}, content={self.content!r}, metadata={self.metadata!r})"


class JobStatus:
    """Simple enum-like container for statuses."""

    COMPLETED = "completed"
    FAILED = "failed"


class WorkflowExecutionRequest:
    """Represents an incoming workflow execution request."""

    def __init__(self, inputs=None):
        # inputs is a dict echoed back in the response
        self.inputs = inputs or {}


class WorkflowExecutionResponse:
    """Represents the V2 response created by the function under test."""

    def __init__(self, flow_id, job_id, object, created_timestamp, status, errors, inputs, outputs, metadata):
        self.flow_id = flow_id
        self.job_id = job_id
        self.object = object
        self.created_timestamp = created_timestamp
        self.status = status
        self.errors = errors
        self.inputs = inputs
        self.outputs = outputs
        self.metadata = metadata

    def __repr__(self):
        return f"WorkflowExecutionResponse(flow_id={self.flow_id!r}, job_id={self.job_id!r}, outputs={list(self.outputs.keys())!r})"


# The original implementation references a few helper functions that are not included in the snippet:
# _extract_text_from_message, _extract_nested_value, _extract_model_source, _extract_file_path
# We define them here with simple, deterministic behaviour that matches the expectations
# of the test inputs below.
def _extract_text_from_message(content):
    """Extract textual content from message-like dicts.

    Expected strategies used by tests:
    - If content contains {'message': {'text': <str>}} -> return that text
    - If content contains {'message': {'content': <str>}} -> return that content
    - If content contains {'text': <str>} -> return that text
    - Otherwise return None to indicate no extraction possible
    """
    if not isinstance(content, dict):
        return None
    # Common nested variants
    message = content.get("message")
    if isinstance(message, dict):
        if "text" in message and isinstance(message["text"], str):
            return message["text"]
        if "content" in message and isinstance(message["content"], str):
            return message["content"]
    if "text" in content and isinstance(content["text"], str):
        return content["text"]
    return None


def _extract_nested_value(content, key1, key2):
    """Try to fetch content[key1][key2], returning None if any access fails."""
    if not isinstance(content, dict):
        return None
    nested = content.get(key1)
    if isinstance(nested, dict):
        return nested.get(key2)
    return None


def _extract_model_source(raw_content, vertex_id, vertex_display_name):
    """Return a lightweight source description if model info is present."""
    if not isinstance(raw_content, dict):
        return None
    # Accept model info under keys 'model' or 'source'
    if "model" in raw_content:
        return {"model": raw_content["model"]}
    if "source" in raw_content:
        return {"model": raw_content["source"]}
    return None


def _extract_file_path(raw_content, vertex_type):
    """Return file path if present in raw_content under 'file_path'."""
    if not isinstance(raw_content, dict):
        return None
    return raw_content.get("file_path")


# Now that sys.modules has the expected modules and helper functions available in globals,
# we can include the original function implementation exactly as provided by the user.
# (We must not alter the function signature or logic.)
def _get_raw_content(vertex_output_data: Any) -> Any:
    """Extract raw content from vertex output data.

    Tries multiple fields in order: outputs, results, messages.
    Note: Uses 'is not None' checks to avoid treating empty collections as missing.

    Args:
        vertex_output_data: The output data from RunResponse

    Returns:
        Raw content or None
    """
    if hasattr(vertex_output_data, "outputs") and vertex_output_data.outputs is not None:
        return vertex_output_data.outputs
    if hasattr(vertex_output_data, "results") and vertex_output_data.results is not None:
        return vertex_output_data.results
    if hasattr(vertex_output_data, "messages") and vertex_output_data.messages is not None:
        return vertex_output_data.messages
    if isinstance(vertex_output_data, dict):
        # Check for 'results' first, then 'content' if results is None
        if "results" in vertex_output_data:
            return vertex_output_data["results"]
        if "content" in vertex_output_data:
            return vertex_output_data["content"]
    return vertex_output_data


def _simplify_output_content(content: Any, output_type: str) -> Any:
    """Simplify output content for output nodes.

    For message types, extracts plain text from nested structures.
    For data/dataframe types, extracts the actual data value.
    For other types, returns content as-is.

    Args:
        content: The raw content
        output_type: The output type

    Returns:
        Simplified content
    """
    if not isinstance(content, dict):
        return content
    if output_type in {"message", "text"}:
        text = _extract_text_from_message(content)
        return text if text is not None else content
    if output_type == "data":
        # For data types, try multiple path combinations in order
        # This allows flexibility for different component output structures
        data_paths = [
            ("result", "message"),  # Standard: {'result': {'message': {...}}}
            ("results", "message"),  # Plural variant: {'results': {'message': {...}}}
        ]
        for path in data_paths:
            result_data = _extract_nested_value(content, *path)
            if result_data is not None:
                return result_data
    # TODO: Future scope - Add dataframe-specific extraction logic
    # The following code is commented out pending further requirements analysis:
    if output_type == "dataframe":
        # For dataframe types, try multiple path combinations in order
        dataframe_paths = [
            ("results", "message"),  # Plural: {'results': {'message': {...}}}
            ("result", "message"),  # Singular fallback: {'result': {'message': {...}}}
            ("run_sql_query", "message"),  # SQL component specific
        ]
        for path in dataframe_paths:
            dataframe_data = _extract_nested_value(content, *path)
            if dataframe_data is not None:
                return dataframe_data
    return content


def _build_metadata_for_non_output(
    raw_content: Any, vertex_id: str, vertex_display_name: str, vertex_type: str, output_type: str
) -> dict[str, Any]:
    """Build metadata for non-output terminal nodes.

    Extracts:
    - source: Model information for LLM components
    - file_path: File path for SaveToFile components

    Args:
        raw_content: The raw output data
        vertex_id: Vertex ID
        vertex_display_name: Vertex display name
        vertex_type: Vertex type
        output_type: Output type

    Returns:
        Metadata dict
    """
    metadata: dict[str, Any] = {}
    if output_type != "message" or not isinstance(raw_content, dict):
        return metadata
    # Extract model source for LLM components
    source_info = _extract_model_source(raw_content, vertex_id, vertex_display_name)
    if source_info:
        metadata["source"] = source_info
    # Extract file path for SaveToFile components
    file_path = _extract_file_path(raw_content, vertex_type)
    if file_path:
        metadata["file_path"] = file_path
    return metadata


def _process_terminal_vertex(
    vertex: Any,
    output_data_map: dict[str, Any],
    display_name_counts: dict[str, int],
) -> tuple[str, ComponentOutput]:
    """Process a single terminal vertex and return (output_key, component_output).

    Args:
        vertex: The vertex to process
        output_data_map: Map of component_id to output data
        display_name_counts: Count of each display_name for duplicate detection

    Returns:
        Tuple of (output_key, ComponentOutput)
    """
    # Get output data by vertex.id (component_id)
    vertex_output_data = output_data_map.get(vertex.id)
    # Determine output type from vertex
    output_type = "unknown"
    if vertex.outputs and len(vertex.outputs) > 0:
        types = vertex.outputs[0].get("types", [])
        if types:
            output_type = types[0].lower()
    if output_type == "unknown" and vertex.vertex_type:
        output_type = vertex.vertex_type.lower()
    # Initialize metadata with component_type
    metadata: dict[str, Any] = {"component_type": vertex.vertex_type}
    # Extract content
    content = None
    if vertex_output_data:
        raw_content = _get_raw_content(vertex_output_data)
        if vertex.is_output and raw_content is not None:
            # Output nodes: simplify content
            content = _simplify_output_content(raw_content, output_type)
        elif not vertex.is_output and raw_content is not None:
            # Non-output nodes:
            # - For data types: extract and show content
            # - For message types: extract metadata only (source, file_path)
            # TODO: Future scope - Add support for "dataframe" output type
            if output_type in ["data", "dataframe"]:
                # Show data content for non-output data nodes
                content = _simplify_output_content(raw_content, output_type)
            else:
                # For message types, extract metadata only
                extra_metadata = _build_metadata_for_non_output(
                    raw_content,
                    vertex.id,
                    vertex.display_name or vertex.vertex_type,
                    vertex.vertex_type,
                    output_type,
                )
                metadata.update(extra_metadata)
        # Add any additional metadata from result data
        if hasattr(vertex_output_data, "metadata") and vertex_output_data.metadata:
            metadata.update(vertex_output_data.metadata)
        elif isinstance(vertex_output_data, dict) and "metadata" in vertex_output_data:
            result_metadata = vertex_output_data.get("metadata")
            if isinstance(result_metadata, dict):
                metadata.update(result_metadata)
    # Build ComponentOutput
    component_output = ComponentOutput(
        type=output_type,
        component_id=vertex.id,
        status=JobStatus.COMPLETED,
        content=content,
        metadata=metadata,
    )
    # Determine output key: use display_name if unique, otherwise use id
    display_name = vertex.display_name or vertex.id
    if display_name_counts.get(display_name, 0) > 1:
        # Duplicate display_name detected, use id instead
        output_key = vertex.id
        # Store the display_name in metadata for reference
        if vertex.display_name and vertex.display_name != vertex.id:
            metadata["display_name"] = vertex.display_name
    else:
        # Unique display_name, use it as key
        output_key = display_name
    return output_key, component_output


from langflow.api.v2.converters import run_response_to_workflow_response

# unit tests

# Helper lightweight containers used to assemble run_response.outputs structure expected by the function
class ResultDataObj:
    """Represents a single result_data element inside run_output.outputs."""

    def __init__(self, component_id=None, outputs=None, results=None, messages=None, metadata=None):
        # These attributes are accessed by the function using hasattr checks
        self.component_id = component_id
        self.outputs = outputs
        self.results = results
        self.messages = messages
        self.metadata = metadata


class RunOutputContainer:
    """Represents an element in RunResponse.outputs with an 'outputs' attribute."""

    def __init__(self, outputs=None):
        self.outputs = outputs or []


def test_basic_message_output_shows_simplified_text():
    # Basic scenario: single terminal node which is an output node producing a message.
    # The function should expose simplified textual content (extracted by _extract_text_from_message).
    # Create a terminal vertex that is an output node with declared message type
    v = Vertex(id="v1", outputs=[{"types": ["message"]}], vertex_type="ChatOutput", is_output=True, display_name="Chat")
    # Create a run_response where the single result_data has outputs containing a 'message' dict
    message_payload = {"message": {"content": "Hello, world!"}}
    result_data = ResultDataObj(component_id="v1", outputs=message_payload)
    run_output = RunOutputContainer(outputs=[result_data])
    rr = RunResponse(outputs=[run_output])
    # Create graph with the single terminal vertex
    g = Graph(vertices=[v], successor_map={}, provide_get_terminal_nodes=True)
    # Build workflow request
    req = WorkflowExecutionRequest(inputs={"x": 1})
    # Execute transformation
    codeflash_output = run_response_to_workflow_response(rr, flow_id="flow1", job_id="job1", workflow_request=req, graph=g); resp = codeflash_output
    comp_out = resp.outputs["Chat"]


def test_non_output_llm_node_exposes_only_metadata():
    # Edge scenario: a non-output LLM node (is_output=False, type=message) should not expose textual content,
    # but should expose metadata like source and file_path extracted from the raw content.
    v = Vertex(id="llm1", outputs=[{"types": ["message"]}], vertex_type="LLM", is_output=False, display_name="LLM_Model")
    # Provide raw_content that contains a model and file_path so metadata extraction can pick them up
    raw = {"message": {"some": "info"}, "model": "gpt-test", "file_path": "/tmp/model.info"}
    result_data = ResultDataObj(component_id="llm1", outputs=raw)
    rr = RunResponse(outputs=[RunOutputContainer(outputs=[result_data])])
    g = Graph(vertices=[v], successor_map={}, provide_get_terminal_nodes=True)
    req = WorkflowExecutionRequest(inputs={})
    codeflash_output = run_response_to_workflow_response(rr, flow_id="flowX", job_id="jobY", workflow_request=req, graph=g); resp = codeflash_output
    comp_out = resp.outputs["LLM_Model"]


def test_data_node_non_output_exposes_data_content():
    # Non-output node with type data should expose content: we expect the function to extract nested
    # data via ('result', 'message') or ('results','message') paths.
    v = Vertex(id="data1", outputs=[{"types": ["data"]}], vertex_type="DataNode", is_output=False, display_name="DataNode")
    # Create nested structure that matches ('result','message')
    nested = {"result": {"message": {"value": [1, 2, 3]}}}
    result_data = ResultDataObj(component_id="data1", outputs=nested)
    rr = RunResponse(outputs=[RunOutputContainer(outputs=[result_data])])
    g = Graph(vertices=[v], successor_map={}, provide_get_terminal_nodes=True)
    req = WorkflowExecutionRequest()
    codeflash_output = run_response_to_workflow_response(rr, flow_id="flow-data", job_id="job-data", workflow_request=req, graph=g); resp = codeflash_output
    comp_out = resp.outputs["DataNode"]


def test_duplicate_display_names_uses_ids_and_preserves_display_name_in_metadata():
    # Edge case: two terminal nodes share the same display_name. The function must use their ids as keys
    # and include the display_name in metadata for each entry.
    v1 = Vertex(id="v_dup1", outputs=[{"types": ["text"]}], vertex_type="T", is_output=True, display_name="DupName")
    v2 = Vertex(id="v_dup2", outputs=[{"types": ["text"]}], vertex_type="T", is_output=True, display_name="DupName")
    # Provide outputs for both components
    r1 = ResultDataObj(component_id="v_dup1", outputs={"text": {"content": "one"}})
    r2 = ResultDataObj(component_id="v_dup2", outputs={"text": {"content": "two"}})
    rr = RunResponse(outputs=[RunOutputContainer(outputs=[r1, r2])])
    g = Graph(vertices=[v1, v2], successor_map={}, provide_get_terminal_nodes=True)
    req = WorkflowExecutionRequest()
    codeflash_output = run_response_to_workflow_response(rr, flow_id="flow-d", job_id="job-d", workflow_request=req, graph=g); resp = codeflash_output
    co1 = resp.outputs["v_dup1"]
    co2 = resp.outputs["v_dup2"]


def test_missing_get_terminal_nodes_fallback_and_empty_run_outputs():
    # Edge scenario: graph does not provide get_terminal_nodes method (AttributeError path).
    # Also test run_response.outputs is empty/None and ensure outputs still created with no content.
    # This ensures the fallback path using successor_map is used.
    # Two vertices with no successors
    v1 = Vertex(id="a", outputs=[{"types": ["message"]}], vertex_type="A", is_output=False, display_name="AName")
    v2 = Vertex(id="b", outputs=[{"types": ["data"]}], vertex_type="B", is_output=False, display_name="BName")
    # Graph that will trigger AttributeError for get_terminal_nodes by setting provide_get_terminal_nodes=False
    g = Graph(vertices=[v1, v2], successor_map={}, provide_get_terminal_nodes=False)
    # RunResponse with outputs None (no data)
    rr = RunResponse(outputs=None)
    req = WorkflowExecutionRequest(inputs={"k": "v"})
    codeflash_output = run_response_to_workflow_response(rr, flow_id="f", job_id="j", workflow_request=req, graph=g); resp = codeflash_output


def test_result_data_as_plain_dict_merges_metadata_and_supports_results_key():
    # Edge scenario: run_output.outputs may contain plain dicts (not objects).
    # The function should accept dicts and use the 'results' key and merge metadata if present.
    v = Vertex(id="plain1", outputs=[{"types": ["message"]}], vertex_type="Plain", is_output=True, display_name="PlainOut")
    # result represented as dict with 'results' and 'metadata'
    result_dict = {
        "component_id": "plain1",
        "results": {"message": {"content": "plain"}},
        "metadata": {"mkey": "mval"},
    }
    rr = RunResponse(outputs=[RunOutputContainer(outputs=[result_dict])])
    g = Graph(vertices=[v], successor_map={}, provide_get_terminal_nodes=True)
    req = WorkflowExecutionRequest()
    codeflash_output = run_response_to_workflow_response(rr, flow_id="flow-plain", job_id="job-plain", workflow_request=req, graph=g); resp = codeflash_output
    comp = resp.outputs["PlainOut"]


def test_large_scale_many_vertices_performance_and_correct_count():
    # Large scale test: create many terminal vertices (within the 1000 elements limit).
    # The function should process them all and provide an output entry per terminal vertex.
    n = 300  # well under the 1000 limit requested by the user
    vertices = []
    run_output_objs = []
    # We'll create N vertices, half will have corresponding run outputs, the rest won't.
    for i in range(n):
        vid = f"v{i}"
        # Alternate types to exercise the different branches ('message' vs 'data')
        if i % 2 == 0:
            v = Vertex(id=vid, outputs=[{"types": ["message"]}], vertex_type="Msg", is_output=(i % 4 == 0), display_name=f"Name{i}")
            # Provide run output for even indices
            payload = {"message": {"content": f"text-{i}"}}
        else:
            v = Vertex(id=vid, outputs=[{"types": ["data"]}], vertex_type="Data", is_output=False, display_name=f"Name{i}")
            # Provide run output for odd indices as well to exercise data extraction path
            payload = {"result": {"message": {"value": i}}}
        vertices.append(v)
        # Create result_data for every vertex so outputs are present for all - this stresses the mapping logic
        rd = ResultDataObj(component_id=vid, outputs=payload, metadata={"idx": i})
        run_output_objs.append(rd)
    rr = RunResponse(outputs=[RunOutputContainer(outputs=run_output_objs)])
    # Build successor_map empty so all vertices are terminal
    succ = {}
    g = Graph(vertices=vertices, successor_map=succ, provide_get_terminal_nodes=True)
    req = WorkflowExecutionRequest(inputs={"flag": True})
    codeflash_output = run_response_to_workflow_response(rr, flow_id="flow-many", job_id="job-many", workflow_request=req, graph=g); resp = codeflash_output
    # Spot check a few entries for correct content extraction and metadata merging
    sample_even = resp.outputs["Name0"]  # even index, possibly is_output True for i=0
    # For an odd index (data path) ensure nested value extracted successfully
    sample_odd = resp.outputs["Name1"]
    # codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

# ------------------------------------------------
from typing import Any
from unittest.mock import MagicMock, Mock

import pytest
from langflow.api.v1.schemas import RunResponse
from langflow.api.v2.converters import run_response_to_workflow_response
from lfx.graph.graph.base import Graph
from lfx.schema.workflow import (ComponentOutput, JobStatus,
                                 WorkflowExecutionRequest,
                                 WorkflowExecutionResponse)
```

To test or edit this optimization locally: git merge codeflash/optimize-pr11255-2026-01-20T00.36.50
Click to see suggested changes

```diff
-# First pass: collect all terminal vertices and check for duplicate display_names
-terminal_vertices = [graph.get_vertex(vertex_id) for vertex_id in terminal_node_ids]
+# Build terminal_vertices efficiently by creating a single id->vertex map
+# This avoids repeated graph.get_vertex() calls which can be expensive
+vertices_list = getattr(graph, "vertices", None)
+if vertices_list is not None:
+    vertex_by_id = {v.id: v for v in vertices_list}
+    terminal_vertices = [vertex_by_id[vid] for vid in terminal_node_ids if vid in vertex_by_id]
+else:
+    # Fallback if vertices attribute is not available
+    terminal_vertices = [graph.get_vertex(vertex_id) for vertex_id in terminal_node_ids]
+# First pass: collect all terminal vertices and check for duplicate display_names
```
For posterity: logging will be added in a follow-up pull request once this functionality is merged and testable.
# Conflicts:
#   src/lfx/src/lfx/_assets/component_index.json
* feat: Create controller shell and schema model for new workflow API - Add workflow API endpoints (POST /workflow, GET /workflow, POST /workflow/stop) - Implement developer API protection with settings check - Add comprehensive workflow schema models with proper validation - Create extensive unit test suite covering all scenarios - Apply Ruff linting standards and fix all code quality issues - Support API key authentication for all workflow endpoints
* fix: Move developer API check to router-level dependency
* [autofix.ci] apply automated fixes
* [autofix.ci] apply automated fixes (attempt 2/3)
* fix: Remove response model as its automatically configured. Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
* [autofix.ci] apply automated fixes
* feat: Implement synchronous workflow execution API
* [autofix.ci] apply automated fixes
* [autofix.ci] apply automated fixes (attempt 2/3)
* Apply suggestion from @codeflash-ai[bot] Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>
* Apply suggestion from @codeflash-ai[bot] Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>
* Apply suggestion from @codeflash-ai[bot] Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>
* [autofix.ci] apply automated fixes
* Clean up V2 Workflow API code and add unit tests
* [autofix.ci] apply automated fixes
* [autofix.ci] apply automated fixes (attempt 2/3)
* fix: CodeRabbit review fixes
* fix: improve V2 workflow API test quality and code structure
* Fix ruff whitespace issues in v2 API test files
* [autofix.ci] apply automated fixes
* Fix mypy type errors in v2 API files
* [autofix.ci] apply automated fixes
* refactor: improve data extraction with flexible path combinations and prevent flow data mutation
* fix: resolve component index merge conflict after sync
* [autofix.ci] apply automated fixes
* [autofix.ci] apply automated fixes (attempt 2/3)
* refactor: move ComponentOutput construction to avoid dict change after construction
* [autofix.ci] apply automated fixes

---------

Co-authored-by: Janardan S Kavia <janardanskavia@Janardans-MacBook-Pro.local>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
Co-authored-by: Janardan S Kavia <janardanskavia@mac.war.can.ibm.com>
Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>
Overview
Adds synchronous workflow execution endpoint at /api/v2/workflow/{flow_id}/run with simplified input structure and complete structured responses.
Key Features
Flat Input Structure
Uses dot notation instead of nested tweaks:
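A sketch of the difference; the component IDs and field names below are hypothetical, not taken from the PR:

```python
# Nested tweaks style (V1): one nested dict per component
v1_payload = {
    "tweaks": {
        "ChatInput-abc123": {"input_value": "Hello"},
        "OpenAIModel-def456": {"temperature": 0.2},
    }
}

# Flat dot-notation style (V2): "Component.field" keys at the top level
v2_payload = {
    "inputs": {
        "ChatInput-abc123.input_value": "Hello",
        "OpenAIModel-def456.temperature": 0.2,
    }
}
```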
Complete Response Format
Returns all terminal node outputs with:
- Simplified content for output nodes (ChatOutput, TextOutput)
- Full metadata for debugging (component types, model sources, file paths)
- Structured format for easy parsing
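For orientation, an illustrative response shape assembled from the fields exercised by the tests above (all values are made up):

```python
example_response = {
    "flow_id": "flow1",
    "job_id": "job1",
    "status": "completed",
    "inputs": {"ChatInput-abc123.input_value": "Hello"},
    "outputs": {
        # Keyed by display_name when unique, otherwise by component id
        "Chat": {
            "type": "message",
            "component_id": "v1",
            "status": "completed",
            "content": "Hello, world!",
            "metadata": {"component_type": "ChatOutput"},
        },
    },
}
```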
Changes
New:
src/backend/base/langflow/api/v2/converters.py - Input parsing and output conversion utilities (a sketch of the input-parsing idea follows this list)
Modified:
src/backend/base/langflow/api/v2/workflow.py - Added synchronous execution endpoint
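The sequence diagram above shows parse_flat_inputs turning flat inputs into tweaks plus a session_id. A minimal sketch of that idea, assuming dot-delimited keys; this helper is an illustration, not the module's actual code:

```python
def parse_flat_inputs(inputs: dict) -> tuple[dict, str | None]:
    """Split flat 'Component.field' keys into nested tweaks; pass session_id through."""
    tweaks: dict[str, dict] = {}
    session_id = None
    for key, value in inputs.items():
        if key == "session_id":
            session_id = value
        elif "." in key:
            # "ChatInput-abc123.input_value" -> component "ChatInput-abc123", field "input_value"
            component, field = key.split(".", 1)
            tweaks.setdefault(component, {})[field] = value
    return tweaks, session_id


tweaks, session_id = parse_flat_inputs(
    {"ChatInput-abc123.input_value": "Hello", "session_id": "s-1"}
)
# tweaks == {"ChatInput-abc123": {"input_value": "Hello"}}, session_id == "s-1"
```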
Tested
Response includes all terminal outputs with proper metadata.
Summary by CodeRabbit
New Features
Bug Fixes / Error Handling
Tests