diff --git a/api/analyzers/source_analyzer.py b/api/analyzers/source_analyzer.py index 9046abcf..e112adec 100644 --- a/api/analyzers/source_analyzer.py +++ b/api/analyzers/source_analyzer.py @@ -122,13 +122,38 @@ def first_pass(self, path: Path, files: list[Path], ignore: list[str], graph: Gr def second_pass(self, graph: Graph, files: list[Path], path: Path) -> None: """ - Recursively analyze the contents of a directory. - - Args: - base (str): The base directory for analysis. - root (str): The current directory being analyzed. - executor (concurrent.futures.Executor): The executor to run tasks concurrently. + Resolve symbol references across the codebase via LSP and write the + resulting edges (CALLS / EXTENDS / IMPLEMENTS / RETURNS / PARAMETERS) + into the graph. + + Symbol resolution dominates index wall-time on large repos: every + file's entities trigger several `lsp.request_definition` calls and + most of them are I/O-bound waiting on the language server. + multilspy's SyncLanguageServer schedules each request onto a single + asyncio loop running in a daemon thread (via + `asyncio.run_coroutine_threadsafe`), which makes concurrent calls + from multiple worker threads safe and lets us pipeline them. + + We therefore split second_pass into two phases: + + A. Parallel resolution. A bounded thread pool processes files in + parallel, calling `entity.resolved_symbol(...)` per entity so + each `Symbol.resolved_symbol` set gets populated. No graph + writes happen here. + + B. Serial edge writes. The main thread iterates the same files + in their original order and emits the EXTENDS / CALLS / ... + edges. Keeping graph writes on one thread avoids contending on + FalkorDB MERGE locks and produces a deterministic edge order + matching the pre-parallel implementation. + + Pool size is controlled by `CODE_GRAPH_INDEX_WORKERS` (default 4). + Set it to 1 to fall back to the historical fully-serial behavior + (useful for debugging or for hosts where multilspy/jedi misbehaves + under concurrency). """ + import os + from concurrent.futures import ThreadPoolExecutor, as_completed logger = MultilspyLogger() logger.logger.setLevel(logging.ERROR) @@ -153,17 +178,71 @@ def second_pass(self, graph: Graph, files: list[Path], path: Path) -> None: lsps[".kts"] = NullLanguageServer() lsps[".js"] = NullLanguageServer() with lsps[".java"].start_server(), lsps[".py"].start_server(), lsps[".cs"].start_server(), lsps[".js"].start_server(), lsps[".kt"].start_server(), lsps[".kts"].start_server(): - files_len = len(self.files) - for i, file_path in enumerate(files): + try: + n_workers = max(1, int(os.environ.get("CODE_GRAPH_INDEX_WORKERS", "4"))) + except ValueError: + n_workers = 4 + + # Drop files we don't actually have an entry for and skip files + # whose language has no real LSP (NullLanguageServer provides + # no symbol info, so resolution would be a no-op). + resolvable: list[Path] = [] + for file_path in files: if file_path not in self.files: continue - # Skip symbol resolution when no real LSP is available if isinstance(lsps.get(file_path.suffix), NullLanguageServer): continue + resolvable.append(file_path) + + total = len(resolvable) + logging.info( + "second_pass: resolving symbols in %d files with %d worker(s)", + total, n_workers, + ) + + def _resolve_file(file_path: Path) -> Path: + # Populate Symbol.resolved_symbol sets for every entity in + # this file. Pure LSP work, safe to run from worker threads + # because SyncLanguageServer multiplexes requests through a + # single asyncio loop. + file = self.files[file_path] + for _, entity in file.entities.items(): + entity.resolved_symbol( + lambda key, symbol, fp=file_path: analyzers[fp.suffix].resolve_symbol( + self.files, lsps[fp.suffix], fp, path, key, symbol + ) + ) + return file_path + + done = 0 + log_every = max(1, total // 50) if total else 1 + if n_workers == 1: + for fp in resolvable: + _resolve_file(fp) + done += 1 + if done % log_every == 0 or done == total: + logging.info("second_pass: resolved %d/%d files", done, total) + else: + with ThreadPoolExecutor(max_workers=n_workers, thread_name_prefix="sa-resolve") as ex: + futures = {ex.submit(_resolve_file, fp): fp for fp in resolvable} + for fut in as_completed(futures): + fp = futures[fut] + try: + fut.result() + except Exception as exc: + logging.warning( + "second_pass: resolution failed for %s: %s", + fp, exc, + ) + done += 1 + if done % log_every == 0 or done == total: + logging.info("second_pass: resolved %d/%d files", done, total) + + # Phase B: serial edge writes, in the original file order so + # the graph is bit-identical to the single-threaded path. + for file_path in resolvable: file = self.files[file_path] - logging.info(f'Processing file ({i + 1}/{files_len}): {file_path}') for _, entity in file.entities.items(): - entity.resolved_symbol(lambda key, symbol, fp=file_path: analyzers[fp.suffix].resolve_symbol(self.files, lsps[fp.suffix], fp, path, key, symbol)) for key, resolved_set in entity.resolved_symbols.items(): for resolved in resolved_set: if key == "base_class":