Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 90 additions & 11 deletions api/analyzers/source_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,38 @@ def first_pass(self, path: Path, files: list[Path], ignore: list[str], graph: Gr

def second_pass(self, graph: Graph, files: list[Path], path: Path) -> None:
"""
Recursively analyze the contents of a directory.

Args:
base (str): The base directory for analysis.
root (str): The current directory being analyzed.
executor (concurrent.futures.Executor): The executor to run tasks concurrently.
Resolve symbol references across the codebase via LSP and write the
resulting edges (CALLS / EXTENDS / IMPLEMENTS / RETURNS / PARAMETERS)
into the graph.

Symbol resolution dominates index wall-time on large repos: every
file's entities trigger several `lsp.request_definition` calls and
most of them are I/O-bound waiting on the language server.
multilspy's SyncLanguageServer schedules each request onto a single
asyncio loop running in a daemon thread (via
`asyncio.run_coroutine_threadsafe`), which makes concurrent calls
from multiple worker threads safe and lets us pipeline them.

We therefore split second_pass into two phases:

A. Parallel resolution. A bounded thread pool processes files in
parallel, calling `entity.resolved_symbol(...)` per entity so
each `Symbol.resolved_symbol` set gets populated. No graph
writes happen here.

B. Serial edge writes. The main thread iterates the same files
in their original order and emits the EXTENDS / CALLS / ...
edges. Keeping graph writes on one thread avoids contending on
FalkorDB MERGE locks and produces a deterministic edge order
matching the pre-parallel implementation.

Pool size is controlled by `CODE_GRAPH_INDEX_WORKERS` (default 4).
Set it to 1 to fall back to the historical fully-serial behavior
(useful for debugging or for hosts where multilspy/jedi misbehaves
under concurrency).
"""
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = MultilspyLogger()
logger.logger.setLevel(logging.ERROR)
Expand All @@ -153,17 +178,71 @@ def second_pass(self, graph: Graph, files: list[Path], path: Path) -> None:
lsps[".kts"] = NullLanguageServer()
lsps[".js"] = NullLanguageServer()
with lsps[".java"].start_server(), lsps[".py"].start_server(), lsps[".cs"].start_server(), lsps[".js"].start_server(), lsps[".kt"].start_server(), lsps[".kts"].start_server():
files_len = len(self.files)
for i, file_path in enumerate(files):
try:
n_workers = max(1, int(os.environ.get("CODE_GRAPH_INDEX_WORKERS", "4")))
except ValueError:
n_workers = 4

# Drop files we don't actually have an entry for and skip files
# whose language has no real LSP (NullLanguageServer provides
# no symbol info, so resolution would be a no-op).
resolvable: list[Path] = []
for file_path in files:
if file_path not in self.files:
continue
# Skip symbol resolution when no real LSP is available
if isinstance(lsps.get(file_path.suffix), NullLanguageServer):
continue
resolvable.append(file_path)

total = len(resolvable)
logging.info(
"second_pass: resolving symbols in %d files with %d worker(s)",
total, n_workers,
)

def _resolve_file(file_path: Path) -> Path:
# Populate Symbol.resolved_symbol sets for every entity in
# this file. Pure LSP work, safe to run from worker threads
# because SyncLanguageServer multiplexes requests through a
# single asyncio loop.
file = self.files[file_path]
for _, entity in file.entities.items():
entity.resolved_symbol(
lambda key, symbol, fp=file_path: analyzers[fp.suffix].resolve_symbol(
self.files, lsps[fp.suffix], fp, path, key, symbol
)
)
return file_path

done = 0
log_every = max(1, total // 50) if total else 1
if n_workers == 1:
for fp in resolvable:
_resolve_file(fp)
done += 1
if done % log_every == 0 or done == total:
logging.info("second_pass: resolved %d/%d files", done, total)
else:
with ThreadPoolExecutor(max_workers=n_workers, thread_name_prefix="sa-resolve") as ex:
futures = {ex.submit(_resolve_file, fp): fp for fp in resolvable}
for fut in as_completed(futures):
fp = futures[fut]
try:
fut.result()
except Exception as exc:
logging.warning(
"second_pass: resolution failed for %s: %s",
fp, exc,
)
done += 1
if done % log_every == 0 or done == total:
logging.info("second_pass: resolved %d/%d files", done, total)

# Phase B: serial edge writes, in the original file order so
# the graph is bit-identical to the single-threaded path.
for file_path in resolvable:
Comment on lines +228 to +243
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't continue phase B after a worker failure.

Line 232 swallows any _resolve_file exception, but Lines 241-243 still emit graph edges. That makes correctness depend on CODE_GRAPH_INDEX_WORKERS: workers=1 still fails fast, while workers>1 can silently write a partial graph for the failed file. Track failed files and abort before phase B, or explicitly exclude/clear them before writing edges.

🛠️ Proposed fix
             done = 0
             log_every = max(1, total // 50) if total else 1
+            failed_files: list[Path] = []
             if n_workers == 1:
                 for fp in resolvable:
                     _resolve_file(fp)
                     done += 1
                     if done % log_every == 0 or done == total:
@@
                 with ThreadPoolExecutor(max_workers=n_workers, thread_name_prefix="sa-resolve") as ex:
                     futures = {ex.submit(_resolve_file, fp): fp for fp in resolvable}
                     for fut in as_completed(futures):
                         fp = futures[fut]
                         try:
                             fut.result()
                         except Exception:
-                            logging.warning(
-                                "second_pass: resolution failed for %s: %s",
-                                fp, exc,
-                            )
+                            failed_files.append(fp)
+                            logging.exception(
+                                "second_pass: resolution failed for %s",
+                                fp,
+                            )
                         done += 1
                         if done % log_every == 0 or done == total:
                             logging.info("second_pass: resolved %d/%d files", done, total)
+
+            if failed_files:
+                raise RuntimeError(
+                    f"second_pass aborted after symbol resolution failed for {len(failed_files)} file(s)"
+                )
 
             # Phase B: serial edge writes, in the original file order so
             # the graph is bit-identical to the single-threaded path.
             for file_path in resolvable:
🧰 Tools
🪛 Ruff (0.15.14)

[warning] 232-232: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@api/analyzers/source_analyzer.py` around lines 228 - 243, The worker loop
that calls fut.result() in the as_completed handling swallows exceptions for
_resolve_file and still proceeds to Phase B which iterates resolvable and writes
edges; modify the logic around futures and resolvable so that any file whose
future raised an exception is recorded and removed (or cause an early abort)
before Phase B starts: capture failed file paths from the except block (use the
mapping futures[fut] to get fp), store them in a failed set, and then either
raise a single aggregated exception to stop processing or filter out those paths
from the resolvable list before the serial edge-write loop to ensure no edges
are written for failed files.

file = self.files[file_path]
logging.info(f'Processing file ({i + 1}/{files_len}): {file_path}')
for _, entity in file.entities.items():
entity.resolved_symbol(lambda key, symbol, fp=file_path: analyzers[fp.suffix].resolve_symbol(self.files, lsps[fp.suffix], fp, path, key, symbol))
for key, resolved_set in entity.resolved_symbols.items():
for resolved in resolved_set:
if key == "base_class":
Expand Down
Loading