
feat: Implement synchronous workflow execution API#11255

Merged
Jkavia merged 38 commits into main from developer-api
Jan 21, 2026

Conversation


@Jkavia Jkavia commented Jan 9, 2026

Overview

Adds synchronous workflow execution endpoint at /api/v2/workflow/{flow_id}/run with simplified input structure and complete structured responses.

Key Features
Flat Input Structure
Uses dot notation instead of nested tweaks:


{
  "inputs": {
    "ChatInput-abc.input_value": "What is the tallest building?",
    "Prompt-xyz.template": "Rewrite as a philosopher would"
  }
}
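A minimal sketch of how such flat keys can be expanded into per-component tweaks. The real `parse_flat_inputs` lives in `src/backend/base/langflow/api/v2/converters.py`; this standalone version is an illustrative assumption, not the actual implementation:

```python
def parse_flat_inputs(inputs: dict) -> dict:
    """Expand flat "Component-id.param" keys into a nested tweaks dict.

    Illustrative stand-in for the converters-module helper of the same name.
    """
    tweaks: dict = {}
    for key, value in inputs.items():
        # Split only on the first dot so parameter names may contain dots
        component_id, sep, param = key.partition(".")
        if not sep or not param:
            raise ValueError(f"expected 'component.param' format, got {key!r}")
        tweaks.setdefault(component_id, {})[param] = value
    return tweaks

flat = {
    "ChatInput-abc.input_value": "What is the tallest building?",
    "Prompt-xyz.template": "Rewrite as a philosopher would",
}
print(parse_flat_inputs(flat))
# {'ChatInput-abc': {'input_value': 'What is the tallest building?'},
#  'Prompt-xyz': {'template': 'Rewrite as a philosopher would'}}
```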

Complete Response Format
Returns all terminal node outputs with:

• Simplified content for output nodes (ChatOutput, TextOutput)
• Full metadata for debugging (component types, model sources, file paths)
• Structured format for easy parsing

Changes

New:
src/backend/base/langflow/api/v2/converters.py - Input parsing and output conversion utilities
Modified:
src/backend/base/langflow/api/v2/workflow.py - Added synchronous execution endpoint

Tested

curl -X POST http://localhost:7860/api/v2/workflow \
  -H 'Content-Type: application/json' \
  -d '{"flow_id": "...", "inputs": {"ChatInput-x.input_value": "test"}}'


Response includes all terminal outputs with proper metadata.


Summary by CodeRabbit

  • New Features

    • Synchronous workflow execution now enforces a timeout and returns clear 504 on timeouts.
    • API inputs now use flat component-specific keys (e.g., "component.param") in execution requests.
    • Background and streaming execution paths currently return Not Implemented (501).
  • Bug Fixes / Error Handling

    • Richer structured error responses with explicit error codes and clearer messages for missing flows, invalid data, DB errors, and disabled developer API (403).
  • Tests

    • Expanded unit tests covering converters, execution paths, and error scenarios.
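The structured error responses summarized above could take a shape like the following. `create_error_response` is a real function name from the PR's converters module, but the body and field layout here are assumptions for illustration only:

```python
from datetime import datetime, timezone

def create_error_response(error_code: str, message: str, status_code: int) -> dict:
    """Build a structured error body; the field layout here is illustrative."""
    return {
        "status_code": status_code,
        "error": {
            "code": error_code,            # e.g. FLOW_NOT_FOUND, EXECUTION_TIMEOUT
            "message": message,            # human-readable detail
            "timestamp": datetime.now(timezone.utc).isoformat(),
        },
    }

resp = create_error_response("FLOW_NOT_FOUND", "Flow does not exist", 404)
print(resp["status_code"], resp["error"]["code"])  # 404 FLOW_NOT_FOUND
```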


Janardan S Kavia and others added 12 commits January 2, 2026 12:29
- Add workflow API endpoints (POST /workflow, GET /workflow, POST /workflow/stop)
- Implement developer API protection with settings check
- Add comprehensive workflow schema models with proper validation
- Create extensive unit test suite covering all scenarios
- Apply Ruff linting standards and fix all code quality issues
- Support API key authentication for all workflow endpoints
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>

coderabbitai Bot commented Jan 9, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

Adds a V2 converters module for mapping V1 RunResponse objects to V2 WorkflowExecutionResponse, extends the workflow API with synchronous execution, timeout enforcement, and structured error responses, introduces workflow-specific exceptions, and adds comprehensive unit tests covering conversions and execution error cases.

Changes

Cohort / File(s) Summary
Core Converter Implementation
src/backend/base/langflow/api/v2/converters.py
New module providing parse_flat_inputs, run_response_to_workflow_response, create_job_response, create_error_response, and multiple helpers for nested extraction, content simplification, metadata building, and mapping V1 run outputs to V2 schemas.
Workflow Execution Layer
src/backend/base/langflow/api/v2/workflow.py
Added EXECUTION_TIMEOUT, execute_sync_workflow_with_timeout, execute_sync_workflow, improved validation and structured HTTP error mapping (FLOW_NOT_FOUND, INVALID_FLOW_DATA, EXECUTION_TIMEOUT, etc.), router dependency for developer API checks, and placeholders returning 501 for background/streaming modes.
Exception Hierarchy
src/backend/base/langflow/exceptions/api.py
New exceptions: WorkflowExecutionError, WorkflowTimeoutError, WorkflowValidationError for categorizing execution-related failures.
Schema Update
src/lfx/src/lfx/schema/workflow.py
Removed GlobalInputs; WorkflowExecutionRequest.inputs now described as flat component-specific inputs and model_config updated with extra="forbid".
Converter Tests
src/backend/tests/unit/api/v2/test_converters.py
New, extensive unit tests validating parse_flat_inputs, all helper functions, run_response_to_workflow_response, create_job_response, create_error_response, and many edge/error cases across message/data formats and terminal node types.
Workflow Endpoint Tests
src/backend/tests/unit/api/v2/test_workflow.py
Expanded tests to expect 403 DEVELOPER_API_DISABLED, FLOW_NOT_FOUND, DATABASE_ERROR (503), INVALID_FLOW_DATA (500), EXECUTION_TIMEOUT (504), and 501 for non-implemented modes; adds sync execution and error-handling scenarios.
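The timeout behavior described for `workflow.py` can be illustrated with a minimal asyncio sketch. `EXECUTION_TIMEOUT` and the 504 mapping are named in the PR; the function body, default value, and error shape below are assumptions, not the actual implementation:

```python
import asyncio

EXECUTION_TIMEOUT = 300  # seconds; placeholder, the PR's actual constant may differ

async def execute_with_timeout(coro, timeout: float = EXECUTION_TIMEOUT):
    """Await a workflow coroutine, mapping a timeout to a 504-style error body."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        # Mirrors the PR's EXECUTION_TIMEOUT error code / HTTP 504 mapping
        return {"status_code": 504, "error_code": "EXECUTION_TIMEOUT"}

async def slow_workflow():
    await asyncio.sleep(1)  # stands in for graph execution
    return {"status_code": 200}

result = asyncio.run(execute_with_timeout(slow_workflow(), timeout=0.05))
print(result)  # {'status_code': 504, 'error_code': 'EXECUTION_TIMEOUT'}
```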

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant WorkflowAPI
    participant Parser
    participant GraphBuilder
    participant GraphExecutor
    participant Converter
    participant ResponseBuilder

    Client->>WorkflowAPI: execute_workflow(workflow_request)
    WorkflowAPI->>Parser: parse_flat_inputs(inputs)
    Parser-->>WorkflowAPI: tweaks, session_id
    WorkflowAPI->>GraphBuilder: build Graph(flow_data, tweaks)
    GraphBuilder-->>WorkflowAPI: graph
    WorkflowAPI->>GraphExecutor: execute graph
    GraphExecutor-->>WorkflowAPI: run_response
    WorkflowAPI->>Converter: run_response_to_workflow_response(run_response, flow_id, job_id, workflow_request, graph)
    Converter->>Converter: derive terminal nodes, extract contents, build metadata
    Converter->>ResponseBuilder: construct WorkflowExecutionResponse
    ResponseBuilder-->>WorkflowAPI: response
    WorkflowAPI-->>Client: WorkflowExecutionResponse

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 7
✅ Passed checks (7 passed)
• Description Check: ✅ Passed. Check skipped; CodeRabbit's high-level summary is enabled.
• Title Check: ✅ Passed. The title 'feat: Implement synchronous workflow execution API' directly and accurately summarizes the main change: adding a synchronous workflow execution endpoint with full integration into the V2 API.
• Docstring Coverage: ✅ Passed. Docstring coverage is 100.00%, which meets the required threshold of 80.00%.
• Test Coverage For New Implementations: ✅ Passed. test_converters.py and test_workflow.py provide comprehensive coverage, with 139 and 123 assertions respectively, testing all 11 new functions and workflow functionality across 80+ test methods covering edge cases.
• Test Quality And Coverage: ✅ Passed. The PR includes comprehensive test coverage with 1,900+ lines across two test files, testing 11 converter functions and API endpoints with extensive edge cases and error scenarios.
• Test File Naming And Structure: ✅ Passed. Test files follow the correct naming convention, use proper pytest structure, are logically organized into focused classes, and have descriptive test names covering edge cases and error conditions.
• Excessive Mock Usage Warning: ✅ Passed. Test files demonstrate appropriate mock usage with minimal mocks per test and high assertion-to-mock ratios, indicating substantive testing focused on core logic rather than scaffolding.



@github-actions github-actions Bot added the enhancement New feature or request label Jan 9, 2026
Comment thread src/backend/base/langflow/api/v2/converters.py
Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>

codeflash-ai Bot commented Jan 9, 2026

This PR is now faster! 🚀 Janardan Singh Kavia accepted my code suggestion above.

Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>
Comment on lines +465 to +466
# First pass: collect all terminal vertices and check for duplicate display_names
terminal_vertices = [graph.get_vertex(vertex_id) for vertex_id in terminal_node_ids]

⚡️Codeflash found 60% (0.60x) speedup for run_response_to_workflow_response in src/backend/base/langflow/api/v2/converters.py

⏱️ Runtime: 2.82 milliseconds → 1.76 milliseconds (best of 24 runs)

📝 Explanation and details

The optimized code achieves a 59% speedup (from 2.82ms to 1.76ms) by replacing repeated graph.get_vertex() calls with a single dictionary lookup operation.

Key Optimization

What changed: The code now builds a single vertex_by_id mapping upfront instead of calling graph.get_vertex() for each terminal node:

# Original: O(n*m) complexity - repeated linear searches
terminal_vertices = [graph.get_vertex(vertex_id) for vertex_id in terminal_node_ids]

# Optimized: O(n+m) complexity - single dict creation + O(1) lookups
vertex_by_id = {v.id: v for v in vertices_list}
terminal_vertices = [vertex_by_id[vid] for vid in terminal_node_ids if vid in vertex_by_id]

Why it's faster: The line profiler shows the original list comprehension consumed 61.7% of total runtime (26.1ms out of 42.3ms). Each graph.get_vertex() call likely performs a linear search through all vertices. With multiple terminal nodes, this becomes O(n×m) where n is terminal nodes and m is total vertices. The optimized version creates one dictionary (O(m)) then does O(1) lookups (O(n)), reducing complexity to O(n+m).

Impact: The optimization reduces this operation's share from 61.7% to just ~7% of runtime in run_response_to_workflow_response, which is the primary driver of the 59% overall speedup.

Performance Characteristics

Based on the annotated tests:

  • Best case: Large-scale scenarios with many terminal vertices (e.g., test_large_scale_many_vertices_performance_and_correct_count with 300 vertices) benefit most since the O(n×m) vs O(n+m) difference becomes significant
  • Typical case: Workflows with multiple terminal nodes (common pattern shown in tests with 2-4 terminal nodes) see consistent speedup
  • Edge case: Single terminal node scenarios see minimal benefit but no regression

The optimization is particularly valuable if run_response_to_workflow_response is called frequently in workflow execution paths, as even the modest 1ms savings per call can accumulate significantly in high-throughput scenarios.
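The lookup-table pattern behind this speedup can be reproduced standalone. The `Graph` and `Vertex` classes below are simplified stand-ins for the real langflow types, used only to show the O(n×m) vs O(n+m) difference:

```python
class Vertex:
    def __init__(self, vid):
        self.id = vid

class Graph:
    def __init__(self, vertices):
        self.vertices = vertices

    def get_vertex(self, vid):
        # Linear scan: O(m) per call, mimicking the pre-optimization behavior
        for v in self.vertices:
            if v.id == vid:
                return v
        raise KeyError(vid)

graph = Graph([Vertex(f"v{i}") for i in range(300)])
terminal_ids = [f"v{i}" for i in range(0, 300, 3)]

# Original approach: repeated linear searches, O(n*m) overall
slow = [graph.get_vertex(vid) for vid in terminal_ids]

# Optimized approach: one O(m) dict build, then O(1) lookups, O(n+m) overall
vertex_by_id = {v.id: v for v in graph.vertices}
fast = [vertex_by_id[vid] for vid in terminal_ids if vid in vertex_by_id]

print(len(fast), slow[0] is fast[0])  # 100 True
```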

Correctness verification report:

• ⚙️ Existing Unit Tests: 82 passed
• 🌀 Generated Regression Tests: 7 passed
• ⏪ Replay Tests: 🔘 none found
• 🔎 Concolic Coverage Tests: 🔘 none found
• 📊 Test Coverage: 95.8%
🌀 Generated Regression Tests:
from __future__ import annotations

# imports
import sys
import time
import types
from typing import Any

import pytest  # used for our unit tests
from langflow.api.v2.converters import run_response_to_workflow_response

# NOTE: RunResponse, Vertex, Graph, and the V2 schema classes are defined as
# local stubs below so these tests stay self-contained; importing the real
# langflow/lfx classes here would only be shadowed by those stub definitions.


class RunResponse:
    """Minimal RunResponse container used by the function under test."""

    def __init__(self, outputs=None):
        # outputs is expected to be an iterable of run_output objects (or None)
        self.outputs = outputs


class Vertex:
    """Vertex representation used by the Graph class in tests."""

    def __init__(self, id, outputs=None, vertex_type="", is_output=False, display_name=None):
        # outputs: list of dicts where each dict may have a 'types' key (list)
        self.id = id
        self.outputs = outputs or []
        self.vertex_type = vertex_type
        self.is_output = is_output
        self.display_name = display_name


class Graph:
    """Simple Graph implementation that mimics the minimal behaviour expected by the function."""

    def __init__(self, vertices=None, successor_map=None, provide_get_terminal_nodes=True):
        # vertices: list of Vertex instances
        self.vertices = vertices or []
        # successor_map: mapping vertex.id -> list of successor ids
        self.successor_map = successor_map or {}
        # Whether to expose get_terminal_nodes() or not (to force fallback)
        self._provide_get_terminal_nodes = provide_get_terminal_nodes

    def get_vertex(self, vertex_id):
        for v in self.vertices:
            if v.id == vertex_id:
                return v
        raise KeyError(f"Vertex {vertex_id} not found")

    # Optionally provide get_terminal_nodes method. Some tests will create Graph objects
    # that do not expose this attribute to force the fallback path in the implementation.
    def get_terminal_nodes(self):
        if not self._provide_get_terminal_nodes:
            # Simulate object without this method by raising AttributeError to mimic access error
            raise AttributeError("get_terminal_nodes not available")
        # Terminal nodes are those with no successors in successor_map
        result = []
        for v in self.vertices:
            if not self.successor_map.get(v.id):
                result.append(v.id)
        return result


class ComponentOutput:
    """Represents output for a single component in V2 schema."""

    def __init__(self, type, component_id, status, content, metadata):
        self.type = type
        self.component_id = component_id
        self.status = status
        self.content = content
        self.metadata = metadata

    def __eq__(self, other):
        if not isinstance(other, ComponentOutput):
            return False
        return (
            self.type == other.type
            and self.component_id == other.component_id
            and self.status == other.status
            and self.content == other.content
            and self.metadata == other.metadata
        )

    def __repr__(self):
        return f"ComponentOutput(type={self.type!r}, id={self.component_id!r}, status={self.status!r}, content={self.content!r}, metadata={self.metadata!r})"


class JobStatus:
    """Simple enum-like container for statuses."""
    COMPLETED = "completed"
    FAILED = "failed"


class WorkflowExecutionRequest:
    """Represents an incoming workflow execution request."""

    def __init__(self, inputs=None):
        # inputs is a dict echoed back in the response
        self.inputs = inputs or {}


class WorkflowExecutionResponse:
    """Represents the V2 response created by the function under test."""

    def __init__(self, flow_id, job_id, object, created_timestamp, status, errors, inputs, outputs, metadata):
        self.flow_id = flow_id
        self.job_id = job_id
        self.object = object
        self.created_timestamp = created_timestamp
        self.status = status
        self.errors = errors
        self.inputs = inputs
        self.outputs = outputs
        self.metadata = metadata

    def __repr__(self):
        return f"WorkflowExecutionResponse(flow_id={self.flow_id!r}, job_id={self.job_id!r}, outputs={list(self.outputs.keys())!r})"

# The original implementation references a few helper functions that are not included in the snippet:
# _extract_text_from_message, _extract_nested_value, _extract_model_source, _extract_file_path
# We define them here with simple, deterministic behaviour that matches the expectations
# of the test inputs below.

def _extract_text_from_message(content):
    """Extract textual content from message-like dicts.

    Expected strategies used by tests:
    - If content contains {'message': {'text': <str>}} -> return that text
    - If content contains {'message': {'content': <str>}} -> return that content
    - If content contains {'text': <str>} -> return that text
    - Otherwise return None to indicate no extraction possible
    """
    if not isinstance(content, dict):
        return None
    # Common nested variants
    message = content.get("message")
    if isinstance(message, dict):
        if "text" in message and isinstance(message["text"], str):
            return message["text"]
        if "content" in message and isinstance(message["content"], str):
            return message["content"]
    if "text" in content and isinstance(content["text"], str):
        return content["text"]
    return None


def _extract_nested_value(content, key1, key2):
    """Try to fetch content[key1][key2], returning None if any access fails."""
    if not isinstance(content, dict):
        return None
    nested = content.get(key1)
    if isinstance(nested, dict):
        return nested.get(key2)
    return None


def _extract_model_source(raw_content, vertex_id, vertex_display_name):
    """Return a lightweight source description if model info is present."""
    if not isinstance(raw_content, dict):
        return None
    # Accept model info under keys 'model' or 'source'
    if "model" in raw_content:
        return {"model": raw_content["model"]}
    if "source" in raw_content:
        return {"model": raw_content["source"]}
    return None


def _extract_file_path(raw_content, vertex_type):
    """Return file path if present in raw_content under 'file_path'."""
    if not isinstance(raw_content, dict):
        return None
    return raw_content.get("file_path")


# Now that sys.modules has the expected modules and helper functions available in globals,
# we can include the original function implementation exactly as provided by the user.
# (We must not alter the function signature or logic.)


def _get_raw_content(vertex_output_data: Any) -> Any:
    """Extract raw content from vertex output data.

    Tries multiple fields in order: outputs, results, messages.
    Note: Uses 'is not None' checks to avoid treating empty collections as missing.

    Args:
        vertex_output_data: The output data from RunResponse

    Returns:
        Raw content or None
    """
    if hasattr(vertex_output_data, "outputs") and vertex_output_data.outputs is not None:
        return vertex_output_data.outputs
    if hasattr(vertex_output_data, "results") and vertex_output_data.results is not None:
        return vertex_output_data.results
    if hasattr(vertex_output_data, "messages") and vertex_output_data.messages is not None:
        return vertex_output_data.messages
    if isinstance(vertex_output_data, dict):
        # Check for 'results' first, then 'content' if results is None
        if "results" in vertex_output_data:
            return vertex_output_data["results"]
        if "content" in vertex_output_data:
            return vertex_output_data["content"]
    return vertex_output_data


def _simplify_output_content(content: Any, output_type: str) -> Any:
    """Simplify output content for output nodes.

    For message types, extracts plain text from nested structures.
    For data/dataframe types, extracts the actual data value.
    For other types, returns content as-is.

    Args:
        content: The raw content
        output_type: The output type

    Returns:
        Simplified content
    """
    if not isinstance(content, dict):
        return content

    if output_type in {"message", "text"}:
        text = _extract_text_from_message(content)
        return text if text is not None else content

    if output_type == "data":
        # For data types, try multiple path combinations in order
        # This allows flexibility for different component output structures
        data_paths = [
            ("result", "message"),  # Standard: {'result': {'message': {...}}}
            ("results", "message"),  # Plural variant: {'results': {'message': {...}}}
        ]
        for path in data_paths:
            result_data = _extract_nested_value(content, *path)
            if result_data is not None:
                return result_data
    # TODO: Future scope - refine the dataframe-specific extraction below
    # as requirements for additional component output shapes are analyzed:
    if output_type == "dataframe":
        # For dataframe types, try multiple path combinations in order
        dataframe_paths = [
            ("results", "message"),  # Plural: {'results': {'message': {...}}}
            ("result", "message"),  # Singular fallback: {'result': {'message': {...}}}
            ("run_sql_query", "message"),  # SQL component specific
        ]
        for path in dataframe_paths:
            dataframe_data = _extract_nested_value(content, *path)
            if dataframe_data is not None:
                return dataframe_data

    return content


def _build_metadata_for_non_output(
    raw_content: Any, vertex_id: str, vertex_display_name: str, vertex_type: str, output_type: str
) -> dict[str, Any]:
    """Build metadata for non-output terminal nodes.

    Extracts:
    - source: Model information for LLM components
    - file_path: File path for SaveToFile components

    Args:
        raw_content: The raw output data
        vertex_id: Vertex ID
        vertex_display_name: Vertex display name
        vertex_type: Vertex type
        output_type: Output type

    Returns:
        Metadata dict
    """
    metadata: dict[str, Any] = {}

    if output_type != "message" or not isinstance(raw_content, dict):
        return metadata

    # Extract model source for LLM components
    source_info = _extract_model_source(raw_content, vertex_id, vertex_display_name)
    if source_info:
        metadata["source"] = source_info

    # Extract file path for SaveToFile components
    file_path = _extract_file_path(raw_content, vertex_type)
    if file_path:
        metadata["file_path"] = file_path

    return metadata


def _process_terminal_vertex(
    vertex: Any,
    output_data_map: dict[str, Any],
    display_name_counts: dict[str, int],
) -> tuple[str, ComponentOutput]:
    """Process a single terminal vertex and return (output_key, component_output).

    Args:
        vertex: The vertex to process
        output_data_map: Map of component_id to output data
        display_name_counts: Count of each display_name for duplicate detection

    Returns:
        Tuple of (output_key, ComponentOutput)
    """
    # Get output data by vertex.id (component_id)
    vertex_output_data = output_data_map.get(vertex.id)

    # Determine output type from vertex
    output_type = "unknown"
    if vertex.outputs and len(vertex.outputs) > 0:
        types = vertex.outputs[0].get("types", [])
        if types:
            output_type = types[0].lower()
    if output_type == "unknown" and vertex.vertex_type:
        output_type = vertex.vertex_type.lower()

    # Initialize metadata with component_type
    metadata: dict[str, Any] = {"component_type": vertex.vertex_type}

    # Extract content
    content = None
    if vertex_output_data:
        raw_content = _get_raw_content(vertex_output_data)

        if vertex.is_output and raw_content is not None:
            # Output nodes: simplify content
            content = _simplify_output_content(raw_content, output_type)
        elif not vertex.is_output and raw_content is not None:
            # Non-output nodes:
            # - For data types: extract and show content
            # - For message types: extract metadata only (source, file_path)
            # TODO: Future scope - Add support for "dataframe" output type
            if output_type in ["data", "dataframe"]:
                # Show data content for non-output data nodes
                content = _simplify_output_content(raw_content, output_type)
            else:
                # For message types, extract metadata only
                extra_metadata = _build_metadata_for_non_output(
                    raw_content,
                    vertex.id,
                    vertex.display_name or vertex.vertex_type,
                    vertex.vertex_type,
                    output_type,
                )
                metadata.update(extra_metadata)

        # Add any additional metadata from result data
        if hasattr(vertex_output_data, "metadata") and vertex_output_data.metadata:
            metadata.update(vertex_output_data.metadata)
        elif isinstance(vertex_output_data, dict) and "metadata" in vertex_output_data:
            result_metadata = vertex_output_data.get("metadata")
            if isinstance(result_metadata, dict):
                metadata.update(result_metadata)

    # Build ComponentOutput
    component_output = ComponentOutput(
        type=output_type,
        component_id=vertex.id,
        status=JobStatus.COMPLETED,
        content=content,
        metadata=metadata,
    )

    # Determine output key: use display_name if unique, otherwise use id
    display_name = vertex.display_name or vertex.id
    if display_name_counts.get(display_name, 0) > 1:
        # Duplicate display_name detected, use id instead
        output_key = vertex.id
        # Store the display_name in metadata for reference
        if vertex.display_name and vertex.display_name != vertex.id:
            metadata["display_name"] = vertex.display_name
    else:
        # Unique display_name, use it as key
        output_key = display_name

    return output_key, component_output

# unit tests

# Helper lightweight containers used to assemble run_response.outputs structure expected by the function
class ResultDataObj:
    """Represents a single result_data element inside run_output.outputs."""

    def __init__(self, component_id=None, outputs=None, results=None, messages=None, metadata=None):
        # These attributes are accessed by the function using hasattr checks
        self.component_id = component_id
        self.outputs = outputs
        self.results = results
        self.messages = messages
        self.metadata = metadata


class RunOutputContainer:
    """Represents an element in RunResponse.outputs with an 'outputs' attribute."""

    def __init__(self, outputs=None):
        self.outputs = outputs or []


def test_basic_message_output_shows_simplified_text():
    # Basic scenario: single terminal node which is an output node producing a message.
    # The function should expose simplified textual content (extracted by _extract_text_from_message).

    # Create a terminal vertex that is an output node with declared message type
    v = Vertex(id="v1", outputs=[{"types": ["message"]}], vertex_type="ChatOutput", is_output=True, display_name="Chat")

    # Create a run_response where the single result_data has outputs containing a 'message' dict
    message_payload = {"message": {"content": "Hello, world!"}}
    result_data = ResultDataObj(component_id="v1", outputs=message_payload)
    run_output = RunOutputContainer(outputs=[result_data])
    rr = RunResponse(outputs=[run_output])

    # Create graph with the single terminal vertex
    g = Graph(vertices=[v], successor_map={}, provide_get_terminal_nodes=True)

    # Build workflow request
    req = WorkflowExecutionRequest(inputs={"x": 1})

    # Execute transformation
    codeflash_output = run_response_to_workflow_response(rr, flow_id="flow1", job_id="job1", workflow_request=req, graph=g); resp = codeflash_output
    comp_out = resp.outputs["Chat"]


def test_non_output_llm_node_exposes_only_metadata():
    # Edge scenario: a non-output LLM node (is_output=False, type=message) should not expose textual content,
    # but should expose metadata like source and file_path extracted from the raw content.

    v = Vertex(id="llm1", outputs=[{"types": ["message"]}], vertex_type="LLM", is_output=False, display_name="LLM_Model")

    # Provide raw_content that contains a model and file_path so metadata extraction can pick them up
    raw = {"message": {"some": "info"}, "model": "gpt-test", "file_path": "/tmp/model.info"}
    result_data = ResultDataObj(component_id="llm1", outputs=raw)
    rr = RunResponse(outputs=[RunOutputContainer(outputs=[result_data])])

    g = Graph(vertices=[v], successor_map={}, provide_get_terminal_nodes=True)
    req = WorkflowExecutionRequest(inputs={})

    codeflash_output = run_response_to_workflow_response(rr, flow_id="flowX", job_id="jobY", workflow_request=req, graph=g); resp = codeflash_output
    comp_out = resp.outputs["LLM_Model"]


def test_data_node_non_output_exposes_data_content():
    # Non-output node with type data should expose content: we expect the function to extract nested
    # data via ('result', 'message') or ('results','message') paths.

    v = Vertex(id="data1", outputs=[{"types": ["data"]}], vertex_type="DataNode", is_output=False, display_name="DataNode")

    # Create nested structure that matches ('result','message')
    nested = {"result": {"message": {"value": [1, 2, 3]}}}
    result_data = ResultDataObj(component_id="data1", outputs=nested)
    rr = RunResponse(outputs=[RunOutputContainer(outputs=[result_data])])

    g = Graph(vertices=[v], successor_map={}, provide_get_terminal_nodes=True)
    req = WorkflowExecutionRequest()

    codeflash_output = run_response_to_workflow_response(rr, flow_id="flow-data", job_id="job-data", workflow_request=req, graph=g); resp = codeflash_output
    comp_out = resp.outputs["DataNode"]


def test_duplicate_display_names_uses_ids_and_preserves_display_name_in_metadata():
    # Edge case: two terminal nodes share the same display_name. The function must use their ids as keys
    # and include the display_name in metadata for each entry.

    v1 = Vertex(id="v_dup1", outputs=[{"types": ["text"]}], vertex_type="T", is_output=True, display_name="DupName")
    v2 = Vertex(id="v_dup2", outputs=[{"types": ["text"]}], vertex_type="T", is_output=True, display_name="DupName")

    # Provide outputs for both components
    r1 = ResultDataObj(component_id="v_dup1", outputs={"text": {"content": "one"}})
    r2 = ResultDataObj(component_id="v_dup2", outputs={"text": {"content": "two"}})
    rr = RunResponse(outputs=[RunOutputContainer(outputs=[r1, r2])])

    g = Graph(vertices=[v1, v2], successor_map={}, provide_get_terminal_nodes=True)
    req = WorkflowExecutionRequest()

    codeflash_output = run_response_to_workflow_response(rr, flow_id="flow-d", job_id="job-d", workflow_request=req, graph=g); resp = codeflash_output
    co1 = resp.outputs["v_dup1"]
    co2 = resp.outputs["v_dup2"]
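The key-selection rule this test pins down (use display_name as the output key unless it collides, then fall back to vertex ids) can be sketched with a frequency count. `choose_output_keys` is a hypothetical helper, not the converter's real API:

```python
from collections import Counter

def choose_output_keys(vertices):
    # vertices: list of (vertex_id, display_name) pairs.
    counts = Counter(name for _, name in vertices)
    keys = {}
    for vid, name in vertices:
        # Fall back to the unique id whenever a display_name is shared.
        keys[vid] = vid if counts[name] > 1 else name
    return keys

keys = choose_output_keys([("v_dup1", "DupName"), ("v_dup2", "DupName"), ("v3", "Solo")])
print(keys)  # {'v_dup1': 'v_dup1', 'v_dup2': 'v_dup2', 'v3': 'Solo'}
```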


def test_missing_get_terminal_nodes_fallback_and_empty_run_outputs():
    # Edge scenario: graph does not provide get_terminal_nodes method (AttributeError path).
    # Also test run_response.outputs is empty/None and ensure outputs still created with no content.
    # This ensures the fallback path using successor_map is used.

    # Two vertices with no successors
    v1 = Vertex(id="a", outputs=[{"types": ["message"]}], vertex_type="A", is_output=False, display_name="AName")
    v2 = Vertex(id="b", outputs=[{"types": ["data"]}], vertex_type="B", is_output=False, display_name="BName")

    # Graph that will trigger AttributeError for get_terminal_nodes by setting provide_get_terminal_nodes=False
    g = Graph(vertices=[v1, v2], successor_map={}, provide_get_terminal_nodes=False)

    # RunResponse with outputs None (no data)
    rr = RunResponse(outputs=None)
    req = WorkflowExecutionRequest(inputs={"k": "v"})

    codeflash_output = run_response_to_workflow_response(rr, flow_id="f", job_id="j", workflow_request=req, graph=g); resp = codeflash_output
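The successor_map fallback this test forces (when `get_terminal_nodes` raises AttributeError) reduces to "a node is terminal if it has no recorded successors". A sketch, with `terminal_ids_from_successor_map` as an illustrative name only:

```python
def terminal_ids_from_successor_map(vertex_ids, successor_map):
    # Terminal = no successors recorded (missing key or empty list).
    return [vid for vid in vertex_ids if not successor_map.get(vid)]

print(terminal_ids_from_successor_map(["a", "b", "c"], {"a": ["b"]}))  # ['b', 'c']
```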


def test_result_data_as_plain_dict_merges_metadata_and_supports_results_key():
    # Edge scenario: run_output.outputs may contain plain dicts (not objects).
    # The function should accept dicts and use the 'results' key and merge metadata if present.

    v = Vertex(id="plain1", outputs=[{"types": ["message"]}], vertex_type="Plain", is_output=True, display_name="PlainOut")

    # result represented as dict with 'results' and 'metadata'
    result_dict = {
        "component_id": "plain1",
        "results": {"message": {"content": "plain"}},
        "metadata": {"mkey": "mval"},
    }
    rr = RunResponse(outputs=[RunOutputContainer(outputs=[result_dict])])
    g = Graph(vertices=[v], successor_map={}, provide_get_terminal_nodes=True)
    req = WorkflowExecutionRequest()

    codeflash_output = run_response_to_workflow_response(rr, flow_id="flow-plain", job_id="job-plain", workflow_request=req, graph=g); resp = codeflash_output
    comp = resp.outputs["PlainOut"]
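Accepting both plain dicts (with `results`/`metadata` keys) and result objects (with `.outputs`/`.metadata` attributes), as this test requires, comes down to one normalization step. `normalize_result` is a sketch under those assumptions, not the module's actual function:

```python
def normalize_result(result):
    # Plain dict: read the 'results' and 'metadata' keys.
    if isinstance(result, dict):
        return result.get("results", {}), result.get("metadata", {})
    # Object: fall back to attributes, coercing None to empty dicts.
    return getattr(result, "outputs", {}) or {}, getattr(result, "metadata", {}) or {}

class Obj:
    def __init__(self):
        self.outputs = {"text": {"content": "hi"}}
        self.metadata = None

print(normalize_result({"results": {"m": 1}, "metadata": {"k": "v"}}))  # ({'m': 1}, {'k': 'v'})
print(normalize_result(Obj()))                                          # ({'text': {'content': 'hi'}}, {})
```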


def test_large_scale_many_vertices_performance_and_correct_count():
    # Large scale test: create many terminal vertices (within the 1000 elements limit).
    # The function should process them all and provide an output entry per terminal vertex.
    n = 300  # well under the 1000 limit requested by the user
    vertices = []
    run_output_objs = []

    # We'll create N vertices, half will have corresponding run outputs, the rest won't.
    for i in range(n):
        vid = f"v{i}"
        # Alternate types to exercise the different branches ('message' vs 'data')
        if i % 2 == 0:
            v = Vertex(id=vid, outputs=[{"types": ["message"]}], vertex_type="Msg", is_output=(i % 4 == 0), display_name=f"Name{i}")
            # Provide run output for even indices
            payload = {"message": {"content": f"text-{i}"}}
        else:
            v = Vertex(id=vid, outputs=[{"types": ["data"]}], vertex_type="Data", is_output=False, display_name=f"Name{i}")
            # Provide run output for odd indices as well to exercise data extraction path
            payload = {"result": {"message": {"value": i}}}
        vertices.append(v)
        # Create result_data for every vertex so outputs are present for all - this stresses the mapping logic
        rd = ResultDataObj(component_id=vid, outputs=payload, metadata={"idx": i})
        run_output_objs.append(rd)

    rr = RunResponse(outputs=[RunOutputContainer(outputs=run_output_objs)])
    # Build successor_map empty so all vertices are terminal
    succ = {}
    g = Graph(vertices=vertices, successor_map=succ, provide_get_terminal_nodes=True)
    req = WorkflowExecutionRequest(inputs={"flag": True})

    codeflash_output = run_response_to_workflow_response(rr, flow_id="flow-many", job_id="job-many", workflow_request=req, graph=g); resp = codeflash_output
    # Spot check a few entries for correct content extraction and metadata merging
    sample_even = resp.outputs["Name0"]  # even index, possibly is_output True for i=0
    # For an odd index (data path) ensure nested value extracted successfully
    sample_odd = resp.outputs["Name1"]
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
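The flat input structure this endpoint accepts (`"ComponentId-xyz.param"` keys, per the PR description) maps onto nested per-component tweaks with one split per key. `flat_inputs_to_tweaks` is a hypothetical helper sketching that conversion, not the converter module's real name:

```python
def flat_inputs_to_tweaks(inputs):
    # Split each "component.param" key on the first dot; dots inside the
    # parameter name are not handled in this sketch.
    tweaks = {}
    for key, value in inputs.items():
        component_id, _, param = key.partition(".")
        if not param:
            raise ValueError(f"Expected 'component.param' key, got: {key!r}")
        tweaks.setdefault(component_id, {})[param] = value
    return tweaks

tweaks = flat_inputs_to_tweaks({
    "ChatInput-abc.input_value": "What is the tallest building?",
    "Prompt-xyz.template": "Rewrite as a philosopher would",
})
print(tweaks)
# {'ChatInput-abc': {'input_value': 'What is the tallest building?'},
#  'Prompt-xyz': {'template': 'Rewrite as a philosopher would'}}
```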
#------------------------------------------------
from typing import Any
from unittest.mock import MagicMock, Mock

import pytest
from langflow.api.v1.schemas import RunResponse
from langflow.api.v2.converters import run_response_to_workflow_response
from lfx.graph.graph.base import Graph
from lfx.schema.workflow import (ComponentOutput, JobStatus,
                                 WorkflowExecutionRequest,
                                 WorkflowExecutionResponse)

To test or edit this optimization locally: `git merge codeflash/optimize-pr11255-2026-01-20T00.36.50`


Click to see suggested changes

Suggested change:

    -    # First pass: collect all terminal vertices and check for duplicate display_names
    -    terminal_vertices = [graph.get_vertex(vertex_id) for vertex_id in terminal_node_ids]
    +    # Build terminal_vertices efficiently by creating a single id->vertex map.
    +    # This avoids repeated graph.get_vertex() calls, which can be expensive.
    +    vertices_list = getattr(graph, "vertices", None)
    +    if vertices_list is not None:
    +        vertex_by_id = {v.id: v for v in vertices_list}
    +        terminal_vertices = [vertex_by_id[vid] for vid in terminal_node_ids if vid in vertex_by_id]
    +    else:
    +        # Fallback if the vertices attribute is not available
    +        terminal_vertices = [graph.get_vertex(vertex_id) for vertex_id in terminal_node_ids]
    +    # First pass: collect all terminal vertices and check for duplicate display_names
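The suggestion above can be exercised standalone. A minimal sketch of why the single id->vertex map wins: `FakeVertex` and `FakeGraph` are stand-ins for this demo, not Langflow classes:

```python
class FakeVertex:
    def __init__(self, vid):
        self.id = vid

class FakeGraph:
    def __init__(self, vertices):
        self.vertices = vertices

    def get_vertex(self, vid):
        # O(n) linear scan per call, the worst case the suggestion avoids.
        for v in self.vertices:
            if v.id == vid:
                return v
        raise KeyError(vid)

graph = FakeGraph([FakeVertex(f"v{i}") for i in range(1000)])
terminal_node_ids = [f"v{i}" for i in range(0, 1000, 10)]

# Build the map once (O(n)), then each lookup is O(1).
vertex_by_id = {v.id: v for v in graph.vertices}
terminal_vertices = [vertex_by_id[vid] for vid in terminal_node_ids if vid in vertex_by_id]
print(len(terminal_vertices))  # 100
```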

@dkaushik94 (Member) commented:

For posterity. Will add logging in a follow up pull request once functionality is merged and testable.

@github-actions Bot added the enhancement (New feature or request) label Jan 21, 2026
@github-actions Bot added the lgtm (This PR has been approved by a maintainer) label Jan 21, 2026
@dkaushik94 (Member) approved these changes:

Looks good.

@Jkavia Jkavia added this pull request to the merge queue Jan 21, 2026
Merged via the queue into main with commit fa59cc5 Jan 21, 2026
49 of 50 checks passed
@Jkavia Jkavia deleted the developer-api branch January 21, 2026 21:52
cfchase pushed a commit to cfchase/langflow that referenced this pull request Feb 26, 2026
* feat: Create controller shell and schema model for new workflow API

- Add workflow API endpoints (POST /workflow, GET /workflow, POST /workflow/stop)
- Implement developer API protection with settings check
- Add comprehensive workflow schema models with proper validation
- Create extensive unit test suite covering all scenarios
- Apply Ruff linting standards and fix all code quality issues
- Support API key authentication for all workflow endpoints

* fix: Move developer API check to router-level dependency

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* fix: Remove response model as its automatically configured.

Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>

* [autofix.ci] apply automated fixes

* feat: Implement synchronous workflow execution API

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* Apply suggestion from @codeflash-ai[bot]

Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>

* Apply suggestion from @codeflash-ai[bot]

Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>

* Apply suggestion from @codeflash-ai[bot]

Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>

* [autofix.ci] apply automated fixes

* Clean up V2 Workflow API code and add unit tests

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* fix: CodeRabbit review fixes

* fix: improve V2 workflow API test quality and code structure

* Fix ruff whitespace issues in v2 API test files

* [autofix.ci] apply automated fixes

* Fix mypy type errors in v2 API files

* [autofix.ci] apply automated fixes

* refactor: improve data extraction with flexible path combinations and prevent flow data mutation

* fix: resolve component index merge conflict after sync

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* refactor: move ComponentOutput construction to avoid dict change after construction

* [autofix.ci] apply automated fixes

---------

Co-authored-by: Janardan S Kavia <janardanskavia@Janardans-MacBook-Pro.local>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
Co-authored-by: Janardan S Kavia <janardanskavia@mac.war.can.ibm.com>
Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>
Labels: enhancement (New feature or request), lgtm (This PR has been approved by a maintainer)