
⚡️ Speed up method CallGraph.ancestors by 37% in PR #1660 (unstructured-inference) #1834

Closed
codeflash-ai[bot] wants to merge 2 commits into unstructured-inference from codeflash/optimize-pr1660-2026-03-15T02.05.53

Conversation


@codeflash-ai bot commented on Mar 15, 2026

⚡️ This pull request contains optimizations for PR #1660

If you approve this dependent PR, these changes will be merged into the original PR branch unstructured-inference.

This PR will be automatically closed if the original PR is merged.


📄 37% (0.37x) speedup for `CallGraph.ancestors` in `codeflash/models/call_graph.py`

⏱️ Runtime: 67.5 milliseconds → 49.1 milliseconds (best of 10 runs)

📝 Explanation and details

The optimization replaces the per-iteration `max_depth is not None and depth >= max_depth` check with a single upfront branch that runs two specialized BFS variants: one without depth tracking (storing plain `FunctionNode` in the queue) when `max_depth` is `None`, and one with depth tracking (storing `tuple[FunctionNode, int]`) when a limit is set. This eliminates tuple packing/unpacking and a conditional check on every loop iteration in the common unlimited-depth case.

Line profiling showed the original `for edge in self.callers_of(current)` loop accounted for 91% of runtime; the optimized code caches `self.reverse` once and calls `reverse_map.get(current, [])` inline, avoiding 8,309 redundant dictionary lookups. The trade-off is slightly longer code due to the two-path structure, but runtime improves 37%, with negligible regressions in a few edge cases (`max_depth=0` is 66% slower, but these are rare micro-benchmarks with sub-microsecond absolute deltas).
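The two-path structure described above can be sketched as follows. This is a minimal stand-in, not the actual code from `codeflash/models/call_graph.py`: it assumes `CallGraph` maintains a reverse adjacency map (callee → list of callers) and uses plain hashable values as nodes for illustration.

```python
from collections import deque

class CallGraphSketch:
    """Illustrative stand-in for CallGraph; names are hypothetical."""

    def __init__(self, edges):
        # edges: iterable of (caller, callee) pairs
        self.reverse = {}
        for caller, callee in edges:
            self.reverse.setdefault(callee, []).append(caller)

    def ancestors(self, node, max_depth=None):
        reverse_map = self.reverse  # cache the attribute lookup once
        visited = set()
        if max_depth is None:
            # Fast path: plain nodes in the queue, no depth bookkeeping.
            queue = deque(reverse_map.get(node, []))
            while queue:
                current = queue.popleft()
                if current in visited:
                    continue
                visited.add(current)
                queue.extend(reverse_map.get(current, []))
        else:
            # Limited path: (node, depth) tuples enforce max_depth.
            queue = deque((n, 1) for n in reverse_map.get(node, []))
            while queue:
                current, depth = queue.popleft()
                if current in visited or depth > max_depth:
                    continue
                visited.add(current)
                queue.extend(
                    (n, depth + 1) for n in reverse_map.get(current, [])
                )
        return visited
```

The sketch reproduces the behaviors the regression tests exercise: `max_depth=0` (and negative values) yields an empty set, and the `visited` set terminates traversal on cycles and self-calls.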

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 39 Passed
🌀 Generated Regression Tests 99 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 100.0%
⚙️ Click to see Existing Unit Tests
Test File::Test Function Original ⏱️ Optimized ⏱️ Speedup
test_call_graph.py::TestAncestors.test_empty_for_root 9.22μs 6.90μs 33.5%✅
test_call_graph.py::TestAncestors.test_max_depth_limits_traversal 17.0μs 13.0μs 30.9%✅
test_call_graph.py::TestAncestors.test_transitive_ancestors 19.0μs 16.6μs 14.7%✅
🌀 Click to see Generated Regression Tests
# imports
from pathlib import Path

# Import the real classes from the module under test
# (codeflash/models/call_graph.py).
from codeflash.models.call_graph import CallEdge, CallGraph, FunctionNode

def test_single_edge_basic_ancestors():
    a = FunctionNode(Path("file.py"), "A")
    b = FunctionNode(Path("file.py"), "B")

    edge = CallEdge(a, b, False)

    cg = CallGraph(edges=[edge])

    result = cg.ancestors(b) # 12.5μs -> 10.8μs (15.7% faster)
    assert isinstance(result, set)
    assert result == {a} # 1.23μs -> 1.17μs (5.12% faster)

    assert cg.ancestors(a) == set()

def test_max_depth_limits_traversal_linear_chain():
    a = FunctionNode(Path("file.py"), "A")
    b = FunctionNode(Path("file.py"), "B")
    c = FunctionNode(Path("file.py"), "C")

    edges = [CallEdge(a, b, False), CallEdge(b, c, False)]
    cg = CallGraph(edges=edges)

    assert cg.ancestors(c) == {a, b} # 12.2μs -> 10.9μs (12.2% faster)

    assert cg.ancestors(c, max_depth=1) == {b} # 2.22μs -> 2.48μs (10.1% slower)

    assert cg.ancestors(c, max_depth=0) == set() # 761ns -> 1.03μs (26.2% slower)

def test_multiple_callers_and_missing_node():
    a = FunctionNode(Path("file.py"), "A")
    b = FunctionNode(Path("file.py"), "B")
    x = FunctionNode(Path("file.py"), "X")
    other = FunctionNode(Path("file.py"), "OTHER")

    edges = [CallEdge(a, x, False), CallEdge(b, x, False)]
    cg = CallGraph(edges=edges)

    assert cg.ancestors(x) == {a, b} # 12.2μs -> 10.7μs (14.0% faster)

    assert cg.ancestors(other) == set() # 2.62μs -> 2.56μs (2.35% faster)

def test_self_call_is_included():
    a = FunctionNode(Path("file.py"), "A")
    edge = CallEdge(a, a, False)
    cg = CallGraph(edges=[edge])

    assert cg.ancestors(a) == {a} # 8.01μs -> 7.16μs (11.8% faster)

def test_cycle_detection_does_not_loop_forever_and_respects_depth():
    a = FunctionNode(Path("file.py"), "A")
    b = FunctionNode(Path("file.py"), "B")
    c = FunctionNode(Path("file.py"), "C")

    edges = [CallEdge(a, b, False), CallEdge(b, c, False), CallEdge(c, a, False)]
    cg = CallGraph(edges=edges)

    ancestors_of_a = cg.ancestors(a) # 12.9μs -> 10.5μs (23.5% faster)
    assert ancestors_of_a == {a, b, c}

    assert cg.ancestors(a, max_depth=2) == {c, b} # 2.73μs -> 2.75μs (0.364% slower)

def test_negative_max_depth_behaves_as_zero():
    a = FunctionNode(Path("file.py"), "A")
    b = FunctionNode(Path("file.py"), "B")
    edges = [CallEdge(a, b, False)]
    cg = CallGraph(edges=edges)

    assert cg.ancestors(b, max_depth=-1) == set() # 1.93μs -> 6.09μs (68.2% slower)

def test_query_node_not_in_graph_returns_empty_set():
    a = FunctionNode(Path("file.py"), "A")
    b = FunctionNode(Path("file.py"), "B")
    c = FunctionNode(Path("file.py"), "C")
    edges = [CallEdge(a, b, False)]
    cg = CallGraph(edges=edges)

    assert cg.ancestors(c) == set() # 9.41μs -> 7.49μs (25.5% faster)

def test_large_linear_chain_full_traversal():
    N = 1000
    nodes = [FunctionNode(Path("file.py"), "n{}".format(i)) for i in range(N)]

    edges = [CallEdge(nodes[i], nodes[i + 1], False) for i in range(N - 1)]
    cg = CallGraph(edges=edges)

    target = nodes[-1]

    anc = cg.ancestors(target) # 2.19ms -> 1.61ms (36.0% faster)
    assert len(anc) == N - 1
    assert nodes[0] in anc
    assert nodes[-2] in anc
    assert target not in anc

def test_large_chain_with_limited_depth():
    N = 1000
    nodes = [FunctionNode(Path("file.py"), "m{}".format(i)) for i in range(N)]
    edges = [CallEdge(nodes[i], nodes[i + 1], False) for i in range(N - 1)]
    cg = CallGraph(edges=edges)

    target = nodes[-1]
    anc_10 = cg.ancestors(target, max_depth=10) # 1.56ms -> 1.20ms (30.8% faster)

    assert len(anc_10) == 10
    assert nodes[-2] in anc_10
    assert nodes[-11] in anc_10
    assert nodes[-12] not in anc_10
# imports
from pathlib import Path

from codeflash.models.call_graph import CallEdge, CallGraph, FunctionNode

def test_empty_call_graph_returns_empty_set():
    """Test that an empty call graph returns no ancestors."""
    # Create a simple call graph with no edges
    graph = CallGraph(edges=[])
    node_a = FunctionNode(file_path=Path("test.py"), qualified_name="func_a")
    
    # Query ancestors of a node in empty graph
    result = graph.ancestors(node_a) # 6.46μs -> 5.85μs (10.4% faster)
    
    # Should return an empty set since there are no callers
    assert result == set()
    assert isinstance(result, set)

def test_single_node_no_callers():
    """Test that a node with no incoming edges has no ancestors."""
    # Create a graph with one edge that doesn't involve our query node
    node_a = FunctionNode(file_path=Path("test.py"), qualified_name="func_a")
    node_b = FunctionNode(file_path=Path("test.py"), qualified_name="func_b")
    node_c = FunctionNode(file_path=Path("test.py"), qualified_name="func_c")
    edges = [CallEdge(caller=node_a, callee=node_b, is_cross_file=False)]
    graph = CallGraph(edges=edges)
    
    # Query ancestors of node_c which has no callers
    result = graph.ancestors(node_c) # 9.80μs -> 7.68μs (27.5% faster)
    
    # Should be empty since node_c is not called by anyone
    assert result == set()

def test_single_direct_caller():
    """Test finding a single direct caller (depth 1)."""
    # Create a simple call relationship: a calls b
    node_a = FunctionNode(file_path=Path("test.py"), qualified_name="func_a")
    node_b = FunctionNode(file_path=Path("test.py"), qualified_name="func_b")
    edges = [CallEdge(caller=node_a, callee=node_b, is_cross_file=False)]
    graph = CallGraph(edges=edges)
    
    # Query ancestors of b
    result = graph.ancestors(node_b) # 9.17μs -> 8.18μs (12.1% faster)
    
    # Should find a as the only ancestor
    assert result == {node_a}
    assert len(result) == 1

def test_linear_call_chain():
    """Test ancestor traversal through a linear call chain (a calls b, b calls c)."""
    # Create a linear chain: a -> b -> c
    node_a = FunctionNode(file_path=Path("test.py"), qualified_name="func_a")
    node_b = FunctionNode(file_path=Path("test.py"), qualified_name="func_b")
    node_c = FunctionNode(file_path=Path("test.py"), qualified_name="func_c")
    edges = [
        CallEdge(caller=node_a, callee=node_b, is_cross_file=False),
        CallEdge(caller=node_b, callee=node_c, is_cross_file=False),
    ]
    graph = CallGraph(edges=edges)
    
    # Query ancestors of c
    result = graph.ancestors(node_c) # 12.0μs -> 10.1μs (18.9% faster)
    
    # Should find both a and b as ancestors (transitive closure)
    assert result == {node_a, node_b}
    assert len(result) == 2

def test_multiple_direct_callers():
    """Test that multiple direct callers are all found."""
    # Create multiple callers: a, b, and c all call d
    node_a = FunctionNode(file_path=Path("test.py"), qualified_name="func_a")
    node_b = FunctionNode(file_path=Path("test.py"), qualified_name="func_b")
    node_c = FunctionNode(file_path=Path("test.py"), qualified_name="func_c")
    node_d = FunctionNode(file_path=Path("test.py"), qualified_name="func_d")
    edges = [
        CallEdge(caller=node_a, callee=node_d, is_cross_file=False),
        CallEdge(caller=node_b, callee=node_d, is_cross_file=False),
        CallEdge(caller=node_c, callee=node_d, is_cross_file=False),
    ]
    graph = CallGraph(edges=edges)
    
    # Query ancestors of d
    result = graph.ancestors(node_d) # 14.0μs -> 11.7μs (20.3% faster)
    
    # Should find all three as direct callers
    assert result == {node_a, node_b, node_c}
    assert len(result) == 3

def test_diamond_call_pattern():
    """Test ancestor traversal with a diamond pattern (a->c, b->c, c->d)."""
    # Diamond: a and b both call c, and c calls d
    # Query d should find a, b, c
    node_a = FunctionNode(file_path=Path("test.py"), qualified_name="func_a")
    node_b = FunctionNode(file_path=Path("test.py"), qualified_name="func_b")
    node_c = FunctionNode(file_path=Path("test.py"), qualified_name="func_c")
    node_d = FunctionNode(file_path=Path("test.py"), qualified_name="func_d")
    edges = [
        CallEdge(caller=node_a, callee=node_c, is_cross_file=False),
        CallEdge(caller=node_b, callee=node_c, is_cross_file=False),
        CallEdge(caller=node_c, callee=node_d, is_cross_file=False),
    ]
    graph = CallGraph(edges=edges)
    
    # Query ancestors of d
    result = graph.ancestors(node_d) # 14.3μs -> 11.8μs (20.4% faster)
    
    # Should find a, b, c (direct caller c and its callers a, b)
    assert result == {node_a, node_b, node_c}

def test_max_depth_zero_returns_empty():
    """Test that max_depth=0 prevents any traversal."""
    # Create a call chain: a -> b
    node_a = FunctionNode(file_path=Path("test.py"), qualified_name="func_a")
    node_b = FunctionNode(file_path=Path("test.py"), qualified_name="func_b")
    edges = [CallEdge(caller=node_a, callee=node_b, is_cross_file=False)]
    graph = CallGraph(edges=edges)
    
    # Query ancestors with max_depth=0 (depth 0 should be skipped)
    result = graph.ancestors(node_b, max_depth=0) # 2.04μs -> 6.12μs (66.6% slower)
    
    # Should be empty since we can't traverse any depth
    assert result == set()

def test_max_depth_one_finds_direct_callers_only():
    """Test that max_depth=1 limits traversal to direct callers."""
    # Create chain: a -> b -> c
    node_a = FunctionNode(file_path=Path("test.py"), qualified_name="func_a")
    node_b = FunctionNode(file_path=Path("test.py"), qualified_name="func_b")
    node_c = FunctionNode(file_path=Path("test.py"), qualified_name="func_c")
    edges = [
        CallEdge(caller=node_a, callee=node_b, is_cross_file=False),
        CallEdge(caller=node_b, callee=node_c, is_cross_file=False),
    ]
    graph = CallGraph(edges=edges)
    
    # Query ancestors of c with max_depth=1 (should only find direct caller b)
    result = graph.ancestors(node_c, max_depth=1) # 11.1μs -> 8.44μs (31.8% faster)
    
    # Should only find b, not a
    assert result == {node_b}
    assert node_a not in result

def test_max_depth_two_limits_traversal():
    """Test that max_depth=2 limits traversal depth appropriately."""
    # Create chain: a -> b -> c -> d
    node_a = FunctionNode(file_path=Path("test.py"), qualified_name="func_a")
    node_b = FunctionNode(file_path=Path("test.py"), qualified_name="func_b")
    node_c = FunctionNode(file_path=Path("test.py"), qualified_name="func_c")
    node_d = FunctionNode(file_path=Path("test.py"), qualified_name="func_d")
    edges = [
        CallEdge(caller=node_a, callee=node_b, is_cross_file=False),
        CallEdge(caller=node_b, callee=node_c, is_cross_file=False),
        CallEdge(caller=node_c, callee=node_d, is_cross_file=False),
    ]
    graph = CallGraph(edges=edges)
    
    # Query ancestors of d with max_depth=2
    result = graph.ancestors(node_d, max_depth=2) # 13.4μs -> 10.2μs (31.0% faster)
    
    # Should find b (depth 1) and c (depth 1), but not a (depth 2)
    assert result == {node_b, node_c}
    assert node_a not in result

def test_self_referencing_node():
    """Test that self-referencing calls don't cause infinite loops."""
    # Create a self-referencing edge: a calls itself
    node_a = FunctionNode(file_path=Path("test.py"), qualified_name="func_a")
    edges = [CallEdge(caller=node_a, callee=node_a, is_cross_file=False)]
    graph = CallGraph(edges=edges)
    
    # Query ancestors of a
    result = graph.ancestors(node_a) # 7.78μs -> 6.79μs (14.6% faster)
    
    # Should find a itself once (visited set prevents duplicates)
    assert result == {node_a}
    assert len(result) == 1

def test_circular_call_pattern():
    """Test that circular calls are handled correctly without infinite loops."""
    # Create circular pattern: a -> b -> c -> a
    node_a = FunctionNode(file_path=Path("test.py"), qualified_name="func_a")
    node_b = FunctionNode(file_path=Path("test.py"), qualified_name="func_b")
    node_c = FunctionNode(file_path=Path("test.py"), qualified_name="func_c")
    edges = [
        CallEdge(caller=node_a, callee=node_b, is_cross_file=False),
        CallEdge(caller=node_b, callee=node_c, is_cross_file=False),
        CallEdge(caller=node_c, callee=node_a, is_cross_file=False),
    ]
    graph = CallGraph(edges=edges)
    
    # Query ancestors of a
    result = graph.ancestors(node_a) # 12.8μs -> 10.3μs (24.1% faster)
    
    # Should find b and c (all nodes in the cycle)
    assert result == {node_b, node_c}

def test_node_not_in_graph():
    """Test querying ancestors of a node that doesn't exist in the graph."""
    # Create a graph with some edges
    node_a = FunctionNode(file_path=Path("test.py"), qualified_name="func_a")
    node_b = FunctionNode(file_path=Path("test.py"), qualified_name="func_b")
    edges = [CallEdge(caller=node_a, callee=node_b, is_cross_file=False)]
    graph = CallGraph(edges=edges)
    
    # Create a node that's not in the graph
    node_x = FunctionNode(file_path=Path("other.py"), qualified_name="func_x")
    
    # Query ancestors of the non-existent node
    result = graph.ancestors(node_x) # 9.07μs -> 7.04μs (28.7% faster)
    
    # Should return empty set
    assert result == set()

def test_nodes_with_same_name_different_module():
    """Test that nodes with same name but different files are treated separately."""
    # Create two nodes with same name but different files
    node_a_file1 = FunctionNode(file_path=Path("module1.py"), qualified_name="func")
    node_a_file2 = FunctionNode(file_path=Path("module2.py"), qualified_name="func")
    node_b = FunctionNode(file_path=Path("caller.py"), qualified_name="caller")
    
    edges = [
        CallEdge(caller=node_b, callee=node_a_file1, is_cross_file=True),
        CallEdge(caller=node_a_file1, callee=node_a_file2, is_cross_file=True),
    ]
    graph = CallGraph(edges=edges)
    
    # Query ancestors of node_a_file2
    result = graph.ancestors(node_a_file2) # 11.8μs -> 9.86μs (19.6% faster)
    
    # Should find both node_b and node_a_file1
    assert node_a_file1 in result
    assert node_b in result
    assert node_a_file2 not in result

def test_multiple_edges_same_pair():
    """Test that multiple edges between same caller-callee pair don't duplicate results."""
    # Create multiple edges from a to b (shouldn't happen in real code but test robustness)
    node_a = FunctionNode(file_path=Path("test.py"), qualified_name="func_a")
    node_b = FunctionNode(file_path=Path("test.py"), qualified_name="func_b")
    edges = [
        CallEdge(caller=node_a, callee=node_b, is_cross_file=False),
        CallEdge(caller=node_a, callee=node_b, is_cross_file=False),
        CallEdge(caller=node_a, callee=node_b, is_cross_file=False),
    ]
    graph = CallGraph(edges=edges)
    
    # Query ancestors of b
    result = graph.ancestors(node_b) # 10.5μs -> 8.74μs (19.8% faster)
    
    # Should find a only once (visited set prevents duplicates)
    assert result == {node_a}
    assert len(result) == 1

def test_max_depth_with_circular_calls():
    """Test that max_depth works correctly with circular call patterns."""
    # Create cycle: a -> b -> c -> a with additional edge d -> a
    node_a = FunctionNode(file_path=Path("test.py"), qualified_name="func_a")
    node_b = FunctionNode(file_path=Path("test.py"), qualified_name="func_b")
    node_c = FunctionNode(file_path=Path("test.py"), qualified_name="func_c")
    node_d = FunctionNode(file_path=Path("test.py"), qualified_name="func_d")
    edges = [
        CallEdge(caller=node_a, callee=node_b, is_cross_file=False),
        CallEdge(caller=node_b, callee=node_c, is_cross_file=False),
        CallEdge(caller=node_c, callee=node_a, is_cross_file=False),
        CallEdge(caller=node_d, callee=node_a, is_cross_file=False),
    ]
    graph = CallGraph(edges=edges)
    
    # Query ancestors of a with max_depth=1
    result = graph.ancestors(node_a, max_depth=1) # 13.7μs -> 11.8μs (16.4% faster)
    
    # Should only find direct callers: c and d
    assert result == {node_c, node_d}
    assert node_a not in result
    assert node_b not in result

def test_none_max_depth():
    """Test that max_depth=None allows unlimited traversal."""
    # Create a deep chain: a -> b -> c -> d -> e
    nodes = [FunctionNode(file_path=Path("test.py"), qualified_name=f"func_{chr(97+i)}") 
             for i in range(5)]
    edges = [CallEdge(caller=nodes[i], callee=nodes[i+1], is_cross_file=False) 
             for i in range(4)]
    graph = CallGraph(edges=edges)
    
    # Query ancestors with max_depth=None
    result = graph.ancestors(nodes[4], max_depth=None) # 16.8μs -> 13.9μs (20.4% faster)
    
    # Should find all ancestors: a, b, c, d
    assert result == set(nodes[:4])
    assert len(result) == 4

def test_large_linear_chain():
    """Test performance with a large linear call chain (100 nodes)."""
    num_nodes = 100
    nodes = [FunctionNode(file_path=Path("test.py"), qualified_name=f"func_{i}") 
             for i in range(num_nodes)]
    edges = [CallEdge(caller=nodes[i], callee=nodes[i+1], is_cross_file=False) 
             for i in range(num_nodes - 1)]
    graph = CallGraph(edges=edges)
    
    # Query ancestors of the last node with no limit
    result = graph.ancestors(nodes[num_nodes - 1]) # 219μs -> 165μs (32.1% faster)
    assert len(result) == num_nodes - 1
    assert result == set(nodes[:num_nodes - 1])
    
    # Query with max_depth limit at various depths
    for depth in [10, 25, 50, 75]:
        result_limited = graph.ancestors(nodes[num_nodes - 1], max_depth=depth)
        assert len(result_limited) == depth
        # Verify it includes the closest ancestors
        for i in range(depth):
            assert nodes[num_nodes - 1 - i] in result_limited
    
    # Query from middle nodes to test various starting positions
    mid_idx = num_nodes // 2
    result_mid = graph.ancestors(nodes[mid_idx]) # 25.0μs -> 17.3μs (44.3% faster)
    assert len(result_mid) == mid_idx # 5.61μs -> 4.10μs (36.9% faster)
    
    # Query from near the beginning
    result_early = graph.ancestors(nodes[10])
    assert len(result_early) == 10

def test_large_branching_graph():
    """Test performance with a large branching call graph."""
    num_branches = 10
    nodes_level_0 = [FunctionNode(file_path=Path("test.py"), qualified_name="root")]
    nodes_level_1 = [FunctionNode(file_path=Path("test.py"), qualified_name=f"level1_{i}") 
                     for i in range(num_branches)]
    nodes_level_2 = [FunctionNode(file_path=Path("test.py"), qualified_name=f"level2_{i}_{j}") 
                     for i in range(num_branches) for j in range(num_branches)]
    
    edges = []
    for node in nodes_level_1:
        edges.append(CallEdge(caller=nodes_level_0[0], callee=node, is_cross_file=False))
    
    for i, node_l1 in enumerate(nodes_level_1):
        for j in range(num_branches):
            node_l2 = nodes_level_2[i * num_branches + j]
            edges.append(CallEdge(caller=node_l1, callee=node_l2, is_cross_file=False))
    
    graph = CallGraph(edges=edges)
    
    # Query multiple nodes at level 2 to test different positions
    for level2_idx in [0, num_branches // 2, num_branches - 1, num_branches * (num_branches - 1)]:
        test_node = nodes_level_2[level2_idx]
        result = graph.ancestors(test_node) # 188μs -> 147μs (27.9% faster)
        assert nodes_level_1[level2_idx // num_branches] in result
        assert nodes_level_0[0] in result
        assert len(result) == 2
    
    # Query nodes at level 1
    for level1_idx in [0, num_branches // 2, num_branches - 1]:
        test_node = nodes_level_1[level1_idx]
        result = graph.ancestors(test_node) # 3.85μs -> 3.16μs (21.9% faster)
        assert nodes_level_0[0] in result
        assert len(result) == 1
    
    # Query with depth limits on level 2 nodes
    test_node = nodes_level_2[0]
    result_depth_1 = graph.ancestors(test_node, max_depth=1) # 1.76μs -> 1.89μs (6.92% slower)
    assert len(result_depth_1) == 1
    assert nodes_level_1[0] in result_depth_1

def test_large_fully_connected_graph():
    """Test performance with a large fully connected graph."""
    num_nodes = 50
    nodes = [FunctionNode(file_path=Path("test.py"), qualified_name=f"func_{i}") 
             for i in range(num_nodes)]
    edges = []
    
    for i in range(num_nodes):
        for j in range(i + 1, num_nodes):
            edges.append(CallEdge(caller=nodes[i], callee=nodes[j], is_cross_file=False))
    
    graph = CallGraph(edges=edges)
    
    # Query ancestors of multiple nodes
    for test_idx in [num_nodes - 1, num_nodes // 2, 10, 0]:
        result = graph.ancestors(nodes[test_idx]) # 693μs -> 370μs (86.9% faster)
        # Nodes 0 to test_idx-1 should all be ancestors
        assert len(result) == test_idx
        for i in range(test_idx):
            assert nodes[i] in result
    
    # Query with depth limits
    result_depth_limit = graph.ancestors(nodes[num_nodes - 1], max_depth=5) # 130μs -> 121μs (7.05% faster)
    assert len(result_depth_limit) == 5

def test_large_graph_with_depth_limit():
    """Test performance of depth-limited traversal on a large graph."""
    num_levels = 10
    edges = []
    node_map = {}
    
    for level in range(num_levels):
        nodes_at_level = 3 ** level
        for i in range(nodes_at_level):
            node_map[(level, i)] = FunctionNode(file_path=Path("test.py"), 
                                                 qualified_name=f"level{level}_node{i}")
    
    for level in range(num_levels - 1):
        for i in range(3 ** level):
            for j in range(3):
                caller = node_map[(level, i)]
                callee = node_map[(level + 1, i * 3 + j)]
                edges.append(CallEdge(caller=caller, callee=callee, is_cross_file=False))
    
    graph = CallGraph(edges=edges)
    
    # Query leaf nodes at different positions
    leaf_positions = [0, 1, 2]
    for leaf_pos in leaf_positions:
        leaf_node = node_map[(num_levels - 1, leaf_pos)]
        result = graph.ancestors(leaf_node) # 54.6ms -> 39.8ms (37.1% faster)
        assert len(result) > 0
    
    # Query with various depth limits on the same leaf node
    test_leaf = node_map[(num_levels - 1, 0)]
    for depth in [1, 2, 3, 5]:
        result = graph.ancestors(test_leaf, max_depth=depth) # 10.4μs -> 9.46μs (10.5% faster)
        assert len(result) == min(depth, num_levels - 1)
    
    # Query intermediate nodes at different levels
    for query_level in [num_levels - 3, num_levels - 5, num_levels - 7]:
        if query_level > 0:
            node_at_level = node_map[(query_level, 0)]
            result = graph.ancestors(node_at_level)
            assert len(result) > 0

def test_large_graph_with_many_cycles():
    """Test performance with a large graph containing many cycles."""
    grid_size = 20
    nodes = [[FunctionNode(file_path=Path("test.py"), qualified_name=f"node_{i}_{j}") 
              for j in range(grid_size)] 
             for i in range(grid_size)]
    edges = []
    
    for i in range(grid_size):
        for j in range(grid_size):
            edges.append(CallEdge(caller=nodes[i][j], 
                                callee=nodes[i][(j + 1) % grid_size],
                                is_cross_file=False))
            edges.append(CallEdge(caller=nodes[i][j], 
                                callee=nodes[(i + 1) % grid_size][j],
                                is_cross_file=False))
    
    graph = CallGraph(edges=edges)
    
    # Query multiple nodes at different grid positions
    test_positions = [(10, 10), (0, 0), (grid_size - 1, grid_size - 1), 
                      (grid_size // 2, grid_size // 2), (5, 15)]
    for row, col in test_positions:
        result = graph.ancestors(nodes[row][col]) # 2.16ms -> 1.51ms (43.6% faster)
        assert len(result) > 0
        assert isinstance(result, set)
    
    # Query with depth limits
    result_limited = graph.ancestors(nodes[10][10], max_depth=3) # 6.61μs -> 5.97μs (10.8% faster)
    assert len(result_limited) > 0
    assert len(result_limited) <= 3 * 2  # Each position has 2 possible predecessors

def test_large_graph_memory_efficiency():
    """Test that the function doesn't leak memory with large result sets."""
    root_node = FunctionNode(file_path=Path("test.py"), qualified_name="root")
    nodes = [FunctionNode(file_path=Path("test.py"), qualified_name=f"func_{i}") 
             for i in range(500)]
    edges = []
    
    for node in nodes:
        edges.append(CallEdge(caller=node, callee=root_node, is_cross_file=False))
    
    graph = CallGraph(edges=edges)
    
    # Query ancestors of root (should find all 500 callers)
    result = graph.ancestors(root_node) # 1.05ms -> 759μs (37.7% faster)
    assert len(result) == 500
    assert isinstance(result, set)
    assert len(result) == len(list(result))
    
    # Create additional intermediate nodes to test varied graph structures
    intermediate_node = FunctionNode(file_path=Path("test.py"), qualified_name="intermediate")
    edges_with_intermediate = []
    
    # Half of the nodes call intermediate, intermediate calls root
    for i in range(250):
        edges_with_intermediate.append(CallEdge(caller=nodes[i], callee=intermediate_node, is_cross_file=False))
    edges_with_intermediate.append(CallEdge(caller=intermediate_node, callee=root_node, is_cross_file=False))
    
    # Other half calls root directly
    for i in range(250, 500):
        edges_with_intermediate.append(CallEdge(caller=nodes[i], callee=root_node, is_cross_file=False))
    
    graph2 = CallGraph(edges=edges_with_intermediate)
    result2 = graph2.ancestors(root_node) # 121μs -> 81.7μs (48.5% faster)
    assert len(result2) == 501  # 500 original nodes + intermediate
    
    # Query intermediate node to test that path
    result_intermediate = graph2.ancestors(intermediate_node)
    assert len(result_intermediate) == 250

def test_very_deep_chain():
    """Test performance with a very deep linear call chain (500 nodes)."""
    num_nodes = 500
    nodes = [FunctionNode(file_path=Path("test.py"), qualified_name=f"func_{i}") 
             for i in range(num_nodes)]
    edges = [CallEdge(caller=nodes[i], callee=nodes[i+1], is_cross_file=False) 
             for i in range(num_nodes - 1)]
    graph = CallGraph(edges=edges)
    
    # Query from the deepest node with no limit
    result = graph.ancestors(nodes[num_nodes - 1]) # 1.06ms -> 809μs (31.0% faster)
    assert len(result) == num_nodes - 1
    
    # Query from various depths in the chain
    test_indices = [num_nodes - 1, num_nodes // 2, 100, 50, 10]
    for idx in test_indices:
        result = graph.ancestors(nodes[idx]) # 465μs -> 309μs (50.5% faster)
        assert len(result) == idx
    
    # Test with different depth limits on the same deep node
    for max_depth in [50, 100, 200, 400]:
        result_limited = graph.ancestors(nodes[num_nodes - 1], max_depth=max_depth) # 385μs -> 283μs (36.0% faster)
        assert len(result_limited) == max_depth

def test_large_graph_random_queries():
    """Test multiple ancestor queries on the same large graph."""
    # Create a moderately complex graph
    num_nodes = 100
    nodes = [FunctionNode(file_path=Path("test.py"), qualified_name=f"func_{i}") 
             for i in range(num_nodes)]
    edges = []
    
    # Create edges with a pattern: node i calls nodes i+1 and i+2 (with wrapping)
    for i in range(num_nodes):
        edges.append(CallEdge(caller=nodes[i], callee=nodes[(i+1) % num_nodes], is_cross_file=False))
        edges.append(CallEdge(caller=nodes[i], callee=nodes[(i+2) % num_nodes], is_cross_file=False))
    
    graph = CallGraph(edges=edges)
    
    # Perform multiple queries
    results = []
    for i in range(0, num_nodes, 10):
        result = graph.ancestors(nodes[i]) # 807μs -> 586μs (37.7% faster)
        results.append(result)
        assert isinstance(result, set)
    
    # All results should be non-empty due to the cyclic nature
    assert all(len(r) > 0 for r in results)

def test_large_graph_with_isolated_components():
    """Test performance with a large graph containing isolated connected components."""
    # Create 10 separate call graphs (components)
    components = []
    edges = []
    
    for component_id in range(10):
        # Each component has a chain of 50 nodes
        comp_nodes = [FunctionNode(file_path=Path(f"comp{component_id}.py"), 
                                   qualified_name=f"func_{i}") 
                     for i in range(50)]
        components.append(comp_nodes)
        
        # Create edges within the component
        for i in range(49):
            edges.append(CallEdge(caller=comp_nodes[i], callee=comp_nodes[i+1], is_cross_file=False))
    
    graph = CallGraph(edges=edges)
    
    # Query ancestors from a node in the middle of each component
    for component in components:
        result = graph.ancestors(component[25]) # 934μs -> 724μs (29.0% faster)
        # Should find all ancestors within the component (25 predecessors)
        assert len(result) == 25
        # Should not find nodes from other components
        for other_component in components:
            if other_component is not component:
                assert not any(node in result for node in other_component)

To edit these changes, run `git checkout codeflash/optimize-pr1660-2026-03-15T02.05.53` and push.


@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Mar 15, 2026
@claude
Contributor

claude bot commented Mar 15, 2026

Claude finished @codeflash-ai[bot]'s task in 7s


PR Review Summary

Prek Checks

Fixed: Ruff detected F811 (redefinition of unused `reverse` from line 67). The optimization PR introduced a duplicate `reverse` property at the bottom of `CallGraph` that shadowed the original, leaving it as unreachable dead code. Also fixed two W293 blank-line-with-whitespace issues.

Committed fix: fix: remove duplicate reverse property and dead _build_reverse method (pushed to this branch).

The final diff after the fix now cleanly shows only the intended ancestors() optimization with no lint issues.

Code Review

No bugs. The ancestors() optimization is correct and well-structured:

  • Dual-path BFS (codeflash/models/call_graph.py:99-122): Splitting into an unlimited-depth path (plain FunctionNode in queue, no tuple packing/unpacking) and a depth-limited path is a valid optimization. The common case (max_depth=None) avoids per-iteration tuple construction and depth comparison.
  • Cached reverse_map (call_graph.py:101): self.reverse is a cached property; caching it into a local variable avoids repeated property lookups inside the hot loop.
  • Behavioral note: The PR description mentions max_depth=0 and max_depth=-1 are ~66-68% slower. This is expected and acceptable — these are degenerate cases with sub-microsecond absolute times that are not performance-critical paths.
  • descendants() is not symmetrically optimized (call_graph.py:86-97), but this PR only targets ancestors per its stated scope. No issue.
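For illustration, the dual-path BFS described in the review above can be sketched roughly as follows. The `FunctionNode`, `CallEdge`, and `CallGraph` shapes here are simplified stand-ins based on the PR description, not the actual definitions in `codeflash/models/call_graph.py`:

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass


# Simplified stand-ins for the real types (assumptions, not the actual code).
@dataclass(frozen=True)
class FunctionNode:
    qualified_name: str


@dataclass(frozen=True)
class CallEdge:
    caller: FunctionNode
    callee: FunctionNode


class CallGraph:
    def __init__(self, edges: list[CallEdge]) -> None:
        # Reverse adjacency: callee -> list of edges whose callee it is.
        self.reverse: dict[FunctionNode, list[CallEdge]] = {}
        for edge in edges:
            self.reverse.setdefault(edge.callee, []).append(edge)

    def ancestors(self, node: FunctionNode, max_depth: int | None = None) -> set[FunctionNode]:
        result: set[FunctionNode] = set()
        reverse_map = self.reverse  # cache the lookup once, outside the hot loop
        if max_depth is None:
            # Fast path: plain nodes in the queue, no tuple packing or depth checks.
            queue: deque[FunctionNode] = deque([node])
            while queue:
                current = queue.popleft()
                for edge in reverse_map.get(current, []):
                    caller = edge.caller
                    if caller not in result:
                        result.add(caller)
                        queue.append(caller)
        else:
            # Depth-limited path: queue carries (node, depth) tuples.
            dq: deque[tuple[FunctionNode, int]] = deque([(node, 0)])
            while dq:
                current, depth = dq.popleft()
                if depth >= max_depth:
                    continue
                for edge in reverse_map.get(current, []):
                    caller = edge.caller
                    if caller not in result:
                        result.add(caller)
                        dq.append((caller, depth + 1))
        return result
```

With a chain `a -> b -> c`, `ancestors(c)` yields `{a, b}` via the unlimited fast path, while `ancestors(c, max_depth=1)` stops at `{b}` in the depth-tracking path.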

Duplicate Detection

No duplicates detected. ancestors() and descendants() are symmetric mirror methods (caller/callee traversal) and appropriately differ in their implementation.

Test Coverage

| File | Statements | Miss | Coverage |
| --- | --- | --- | --- |
| codeflash/models/call_graph.py | 162 | 1 | 99% |

All 39 existing unit tests pass.


Last updated: 2026-03-15
| Branch: codeflash/optimize-pr1660-2026-03-15T02.05.53

The optimizer added a second 'reverse' property at the bottom of CallGraph
that shadowed the existing one (F811), leaving the original as dead code.
Remove the duplicate and the unnecessary _build_reverse helper; the existing
reverse property via _build_adjacency is sufficient.
@KRRT7
Collaborator

KRRT7 commented Mar 15, 2026

Applied directly in 77576ba — optimized `descendants`, `ancestors`, and `topological_order` together with the same pattern (local dict cache, split `max_depth` fast path, inlined `dict.get`).
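As a rough illustration of that pattern (not the actual code in 77576ba), a `descendants`-style traversal with the adjacency dict cached in a local, `dict.get` inlined in the loop, and the `max_depth` fast path split out might look like this; passing `forward_map` in directly stands in for caching a `self.forward` property:

```python
from collections import deque


def descendants(forward_map, node, max_depth=None):
    """BFS over a caller -> [callee, ...] adjacency dict (hypothetical shape)."""
    result = set()
    if max_depth is None:
        # Fast path: no depth bookkeeping at all.
        queue = deque([node])
        while queue:
            current = queue.popleft()
            for callee in forward_map.get(current, []):  # inlined dict.get
                if callee not in result:
                    result.add(callee)
                    queue.append(callee)
    else:
        # Depth-limited path: (node, depth) tuples in the queue.
        queue = deque([(node, 0)])
        while queue:
            current, depth = queue.popleft()
            if depth >= max_depth:
                continue
            for callee in forward_map.get(current, []):
                if callee not in result:
                    result.add(callee)
                    queue.append((callee, depth + 1))
    return result
```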

@KRRT7 KRRT7 closed this Mar 15, 2026
@codeflash-ai codeflash-ai bot deleted the codeflash/optimize-pr1660-2026-03-15T02.05.53 branch March 15, 2026 03:10
