Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/analyzers/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ def resolve(self, files: dict[Path, File], lsp: SyncLanguageServer, file_path: P
locations = lsp.request_definition(str(file_path), node.start_point.row, node.start_point.column)
return [(files[Path(self.resolve_path(location['absolutePath'], path))], files[Path(self.resolve_path(location['absolutePath'], path))].tree.root_node.descendant_for_point_range(Point(location['range']['start']['line'], location['range']['start']['character']), Point(location['range']['end']['line'], location['range']['end']['character']))) for location in locations if location and Path(self.resolve_path(location['absolutePath'], path)) in files]
except Exception as e:
import logging
logging.getLogger(__name__).warning(
"resolve() failed for %s @%d:%d: %s",
file_path, node.start_point.row, node.start_point.column, e,
)
return []

@abstractmethod
Expand Down
33 changes: 31 additions & 2 deletions api/analyzers/source_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,27 @@ def second_pass(self, graph: Graph, files: list[Path], path: Path) -> None:
else:
lsps[".java"] = NullLanguageServer()
if any(path.rglob('*.py')):
config = MultilspyConfig.from_dict({"code_language": "python", "environment_path": f"{path}/venv"})
import sys
py_venv = path / "venv"
py_dotvenv = path / ".venv"
if py_venv.is_dir() and (py_venv / "bin" / "python").exists():
env_path = str(py_venv)
elif py_dotvenv.is_dir() and (py_dotvenv / "bin" / "python").exists():
env_path = str(py_dotvenv)
else:
# Fall back to the host's Python environment so jedi has a
# valid interpreter to introspect; otherwise every
# request_definition() raises InvalidPythonEnvironment and
# we'd silently produce a graph with zero CALLS edges.
env_path = str(Path(sys.executable).resolve().parent.parent)
logging.info(
"No venv at %s; falling back to host env %s for jedi LSP",
path, env_path,
)
config = MultilspyConfig.from_dict({
"code_language": "python",
"environment_path": env_path,
})
lsps[".py"] = SyncLanguageServer.create(config, logger, str(path))
else:
lsps[".py"] = NullLanguageServer()
Expand All @@ -146,7 +166,16 @@ def second_pass(self, graph: Graph, files: list[Path], path: Path) -> None:
with lsps[".java"].start_server(), lsps[".py"].start_server(), lsps[".cs"].start_server():
files_len = len(self.files)
for i, file_path in enumerate(files):
file = self.files[file_path]
file = self.files.get(file_path)
if file is None:
# first_pass skipped this file (e.g. parse error, empty,
# or ignored after entering the candidate list). Skip
# in second_pass too instead of crashing the whole index.
logging.warning(
"second_pass: %s not in files map (first_pass skipped it); skipping",
file_path,
)
continue
logging.info(f'Processing file ({i + 1}/{files_len}): {file_path}')
for _, entity in file.entities.items():
entity.resolved_symbol(lambda key, symbol, fp=file_path: analyzers[fp.suffix].resolve_symbol(self.files, lsps[fp.suffix], fp, path, key, symbol))
Expand Down
163 changes: 163 additions & 0 deletions bench/agents/code_graph_mcp_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
"""MCP-transport adapter to cgraph-mcp for the benchmark.

Sibling of `code_graph_adapter.py` (HTTP). Where the HTTP adapter talks
to the host FastAPI service over the network, this one spawns the
`cgraph-mcp` stdio MCP server as a child process via the official MCP
Python SDK and dispatches tool calls over JSON-RPC.

This gives us a second, real-world benchmark track that exercises the
exact same transport agents (Claude Code, Cursor, …) will use in
production. Tool names match the 8-tool MCP surface
(`index_repo`, `search_code`, `get_callers`, `get_callees`,
`get_dependencies`, `impact_analysis`, `find_path`, `ask`).

Each call spawns a fresh server, runs the call, and exits. That's
~0.5-1s overhead per call but keeps the model trivially safe to call
from a bash shim (one process per invocation, no shared state).
A future optimisation could persist the server across calls via a
side-channel daemon, but per-call spawn matches how external agents
actually use MCP servers today.
"""

from __future__ import annotations

import asyncio
import json
import os
from typing import Any

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


# Default timeout, in seconds. _call_tool_async applies the timeout it is
# given to the session handshake and to the tool call separately, not to the
# two combined.
DEFAULT_TIMEOUT_SEC = 60.0


def _env_for_mcp() -> dict[str, str]:
    """Return the environment handed to the spawned cgraph-mcp process.

    The result is a plain-dict copy of the caller's environment. The
    FalkorDB coordinates are filled in with local defaults only when the
    caller has not set them; values already present always win.
    """
    fallbacks = {"FALKORDB_HOST": "127.0.0.1", "FALKORDB_PORT": "6379"}
    # Later entries override earlier ones, so anything the caller exported
    # takes precedence over the fallbacks.
    return {**fallbacks, **os.environ}


def _extract(result: Any) -> Any:
    """Turn a CallToolResult into a plain Python value.

    Resolution order:

    1. The first content chunk carrying non-empty ``text`` decides the
       outcome: its JSON-decoded value, or the raw text when it is not
       valid JSON. Later chunks are not inspected.
    2. Otherwise ``structuredContent`` is returned, with a dict whose only
       key is ``"result"`` unwrapped to the value under that key.
    3. ``None`` when the result has no ``structuredContent`` attribute.
    """
    for part in result.content:
        text = getattr(part, "text", None)
        if not text:
            # Chunk without text (or with empty text): keep looking.
            continue
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return text

    structured = getattr(result, "structuredContent", None)
    is_wrapper = (
        isinstance(structured, dict)
        and len(structured) == 1
        and "result" in structured
    )
    return structured["result"] if is_wrapper else structured


async def _call_tool_async(name: str, arguments: dict[str, Any], timeout: float) -> Any:
    """Start a cgraph-mcp stdio server, run a single tool call, and return its payload.

    ``timeout`` (seconds) bounds the session handshake and the tool call
    individually. When the result is flagged ``isError`` the normalized
    payload is returned wrapped as ``{"error": payload}``; otherwise the
    payload is returned as-is.
    """
    server = StdioServerParameters(command="cgraph-mcp", args=[], env=_env_for_mcp())
    async with stdio_client(server) as (reader, writer), ClientSession(reader, writer) as session:
        await asyncio.wait_for(session.initialize(), timeout=timeout)
        outcome = await asyncio.wait_for(session.call_tool(name, arguments), timeout=timeout)
        body = _extract(outcome)
        failed = getattr(outcome, "isError", False)
        return {"error": body} if failed else body


def call_tool(name: str, arguments: dict[str, Any], *, timeout: float | None = None) -> Any:
    """Synchronous entry point for the bash shim: one server spawn per call.

    Args:
        name: MCP tool name to invoke.
        arguments: Tool arguments, forwarded unchanged.
        timeout: Seconds allowed for the handshake and for the tool call
            (each bounded separately). ``None`` means "use the module's
            current ``DEFAULT_TIMEOUT_SEC``".

    Returns:
        Whatever ``_call_tool_async`` produces for this call.
    """
    # Look the default up at call time. A ``timeout=DEFAULT_TIMEOUT_SEC``
    # parameter default is evaluated once, when the function is defined, so
    # a caller that reassigns the module constant afterwards would otherwise
    # keep getting the stale value.
    effective = DEFAULT_TIMEOUT_SEC if timeout is None else timeout
    return asyncio.run(_call_tool_async(name, arguments, effective))


# ── Top-level convenience wrappers ─────────────────────────────────────
# Names map 1:1 onto MCP tool names (and onto bench/tools/code_graph_mcp/
# tools.yaml entries). Kwargs mirror each tool's MCP arg schema.


def index_repo(path_or_url: str, branch: str | None = None, ignore: list[str] | None = None) -> dict[str, Any]:
    """Invoke the ``index_repo`` MCP tool.

    ``path_or_url`` is always sent; ``branch`` and ``ignore`` are sent only
    when the caller passes something other than ``None``.
    """
    payload: dict[str, Any] = {"path_or_url": path_or_url}
    for key, value in (("branch", branch), ("ignore", ignore)):
        if value is not None:
            payload[key] = value
    return call_tool("index_repo", payload)


def search_code(prefix: str, project: str, branch: str | None = None, limit: int = 10) -> Any:
    """Invoke the ``search_code`` MCP tool.

    ``prefix``, ``project`` and ``limit`` are always sent; ``branch`` is
    added only when it is not ``None``.
    """
    branch_arg: dict[str, Any] = {} if branch is None else {"branch": branch}
    return call_tool(
        "search_code",
        {"prefix": prefix, "project": project, "limit": limit, **branch_arg},
    )


def _neighbors(tool: str, symbol_id: int, project: str, branch: str | None, limit: int) -> Any:
    """Shared body of the symbol-neighbourhood wrappers.

    Calls the MCP tool named ``tool`` with ``symbol_id``, ``project`` and
    ``limit``, plus ``branch`` when it is not ``None``.
    """
    request: dict[str, Any] = dict(symbol_id=symbol_id, project=project, limit=limit)
    if branch is None:
        return call_tool(tool, request)
    request["branch"] = branch
    return call_tool(tool, request)


def get_callers(symbol_id: int, project: str, branch: str | None = None, limit: int = 50) -> Any:
    """Invoke the ``get_callers`` MCP tool for ``symbol_id`` in ``project``."""
    return _neighbors(
        "get_callers",
        symbol_id=symbol_id,
        project=project,
        branch=branch,
        limit=limit,
    )


def get_callees(symbol_id: int, project: str, branch: str | None = None, limit: int = 50) -> Any:
    """Invoke the ``get_callees`` MCP tool for ``symbol_id`` in ``project``."""
    return _neighbors(
        "get_callees",
        symbol_id=symbol_id,
        project=project,
        branch=branch,
        limit=limit,
    )


def get_dependencies(symbol_id: int, project: str, branch: str | None = None, limit: int = 50) -> Any:
    """Invoke the ``get_dependencies`` MCP tool for ``symbol_id`` in ``project``."""
    return _neighbors(
        "get_dependencies",
        symbol_id=symbol_id,
        project=project,
        branch=branch,
        limit=limit,
    )


def impact_analysis(
    symbol_id: int,
    project: str,
    branch: str | None = None,
    direction: str = "IN",
    depth: int = 3,
) -> Any:
    """Invoke the ``impact_analysis`` MCP tool.

    ``symbol_id``, ``project``, ``direction`` and ``depth`` are always
    sent; ``branch`` is added only when it is not ``None``.
    """
    request: dict[str, Any] = dict(
        symbol_id=symbol_id,
        project=project,
        direction=direction,
        depth=depth,
    )
    if branch is not None:
        request.update(branch=branch)
    return call_tool("impact_analysis", request)


def find_path(source_id: int, dest_id: int, project: str, branch: str | None = None) -> Any:
    """Invoke the ``find_path`` MCP tool between ``source_id`` and ``dest_id``.

    ``branch`` is included in the request only when it is not ``None``.
    """
    endpoints: dict[str, Any] = {"source_id": source_id, "dest_id": dest_id}
    scope: dict[str, Any] = {"project": project}
    if branch is not None:
        scope["branch"] = branch
    return call_tool("find_path", {**endpoints, **scope})


def ask(question: str, project: str, branch: str | None = None) -> Any:
    """Invoke the ``ask`` MCP tool with ``question`` against ``project``.

    ``branch`` is included in the request only when it is not ``None``.
    """
    request: dict[str, Any] = {"question": question, "project": project}
    request.update({} if branch is None else {"branch": branch})
    return call_tool("ask", request)
5 changes: 5 additions & 0 deletions bench/cli/cg-mcp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/usr/bin/env bash
# Bash-callable entry point for the code-graph MCP CLI. Mirrors `cg`
# but speaks JSON-RPC over stdio to a spawned `cgraph-mcp` server
# instead of HTTP to the FastAPI service. Runner adds bench/cli to PATH.
#
# The interpreter is $BENCH_PYTHON when that is set and non-empty, otherwise
# `python3`. `exec` replaces this shell with the Python process, so the exit
# status seen by the caller is Python's, and "$@" forwards every CLI argument
# unchanged.
# NOTE(review): `-m bench.cli.cg_mcp` presumably needs the repository root to
# be importable (cwd or PYTHONPATH) — confirm the runner guarantees that.
exec "${BENCH_PYTHON:-python3}" -m bench.cli.cg_mcp "$@"
140 changes: 140 additions & 0 deletions bench/cli/cg_mcp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""`cg-mcp` — bash-callable CLI exposing code-graph's 8 MCP tools.

This is the MCP-transport sibling of `cg`. Where `cg` calls the host
FastAPI service over HTTP, `cg-mcp` spawns the `cgraph-mcp` stdio
server (via the official MCP Python SDK) for every invocation and
dispatches one tool call.

The MCP track is what external agents (Claude Code, Cursor, …) use
in production; benchmarking through it tells us how the *real-world*
integration behaves under SWE-bench, not just the in-process FastAPI
adapter.

Subcommands mirror the MCP tool names:

cg-mcp index_repo --path-or-url . [--branch B] [--ignore PAT ...]
cg-mcp search_code --project P --prefix STR [--branch B] [--limit N]
cg-mcp get_callers --project P --symbol-id ID [--branch B] [--limit N]
cg-mcp get_callees --project P --symbol-id ID [--branch B] [--limit N]
cg-mcp get_dependencies --project P --symbol-id ID [--branch B] [--limit N]
cg-mcp impact_analysis --project P --symbol-id ID [--branch B] [--direction IN|OUT] [--depth N]
cg-mcp find_path --project P --source-id ID --dest-id ID [--branch B]
cg-mcp ask --project P --question "..." [--branch B]

Output: one JSON document per call on stdout. Errors print to stderr
and exit non-zero.

Env: FALKORDB_HOST / FALKORDB_PORT are passed through to the spawned
server. Optionally set CGRAPH_MCP_TIMEOUT_SEC to override the
default 60s timeout.
"""

from __future__ import annotations

import argparse
import json
import os
import sys
from typing import Any

from bench.agents import code_graph_mcp_adapter as cgm


def _print(obj: Any) -> None:
    """Write ``obj`` to stdout as indented, key-sorted JSON plus a newline.

    Values the JSON encoder cannot handle natively are rendered via ``str``.
    """
    stream = sys.stdout
    json.dump(obj, fp=stream, default=str, sort_keys=True, indent=2)
    print(file=stream)


def _timeout() -> float:
    """Return the per-call timeout in seconds from ``CGRAPH_MCP_TIMEOUT_SEC``.

    Falls back to 60 seconds when the variable is unset, cannot be parsed
    as a number, or parses to NaN, zero or a negative value. None of those
    can serve as a usable deadline, so they are treated like any other
    invalid setting.
    """
    default = 60.0
    raw = os.getenv("CGRAPH_MCP_TIMEOUT_SEC")
    if raw is None:
        return default
    try:
        seconds = float(raw)
    except ValueError:
        return default
    # NaN compares False against everything, so this single test rejects it
    # together with zero and negative values.
    return seconds if seconds > 0 else default


def _add_project(p: argparse.ArgumentParser) -> None:
    """Register the mandatory ``--project`` and optional ``--branch`` options on ``p``."""
    option_specs = (
        ("--project", {"required": True}),
        ("--branch", {"default": None}),
    )
    for flag, settings in option_specs:
        p.add_argument(flag, **settings)


def _add_symbol(p: argparse.ArgumentParser) -> None:
    """Register the integer options ``--symbol-id`` (mandatory) and ``--limit`` (default 50) on ``p``."""
    p.add_argument("--symbol-id", dest="symbol_id", required=True, type=int)
    p.add_argument("--limit", default=50, type=int)


def main(argv: list[str] | None = None) -> int:
    """Parse the command line, run one MCP tool call, and report the outcome.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        ``0`` after the tool result has been printed to stdout as JSON.
        ``1`` when the call raised, or when the adapter reported a
        tool-level failure; the message goes to stderr in both cases.
    """
    parser = argparse.ArgumentParser(prog="cg-mcp", description=__doc__)
    sub = parser.add_subparsers(dest="cmd", required=True)

    ir = sub.add_parser("index_repo")
    ir.add_argument("--path-or-url", required=True, dest="path_or_url")
    ir.add_argument("--branch", default=None)
    ir.add_argument("--ignore", nargs="*", default=None)

    sc = sub.add_parser("search_code")
    _add_project(sc)
    sc.add_argument("--prefix", required=True)
    sc.add_argument("--limit", type=int, default=10)

    for name in ("get_callers", "get_callees", "get_dependencies"):
        p = sub.add_parser(name)
        _add_project(p)
        _add_symbol(p)

    ia = sub.add_parser("impact_analysis")
    _add_project(ia)
    ia.add_argument("--symbol-id", type=int, required=True, dest="symbol_id")
    ia.add_argument("--direction", choices=["IN", "OUT"], default="IN")
    ia.add_argument("--depth", type=int, default=3)

    fp = sub.add_parser("find_path")
    _add_project(fp)
    fp.add_argument("--source-id", type=int, required=True, dest="source_id")
    fp.add_argument("--dest-id", type=int, required=True, dest="dest_id")

    aq = sub.add_parser("ask")
    _add_project(aq)
    aq.add_argument("--question", required=True)

    args = parser.parse_args(argv)

    # Every subcommand is named after its MCP tool and every argparse dest is
    # spelled like the tool argument it feeds, so the request is simply the
    # parsed namespace minus the subcommand itself. Options left unset
    # (``None``) are omitted rather than sent as nulls.
    tool_args = {
        key: value
        for key, value in vars(args).items()
        if key != "cmd" and value is not None
    }

    try:
        # Pass the timeout explicitly instead of mutating adapter module
        # state, so the CGRAPH_MCP_TIMEOUT_SEC override is honoured no matter
        # how the adapter binds its own default.
        result = cgm.call_tool(args.cmd, tool_args, timeout=_timeout())

        # The adapter reports a tool-level failure as a single-key
        # ``{"error": payload}`` dict. Route it to stderr with a non-zero
        # exit, as the module docstring promises for errors.
        if isinstance(result, dict) and set(result) == {"error"}:
            payload = result["error"]
            detail = payload if isinstance(payload, str) else json.dumps(
                payload, sort_keys=True, default=str
            )
            print(f"cg-mcp error: {detail}", file=sys.stderr)
            return 1

        _print(result)
    except Exception as e:  # noqa: BLE001 — surface everything to the agent
        # Some exceptions (e.g. timeouts) carry no message; fall back to the
        # exception type so the agent never sees an empty error line.
        print(f"cg-mcp error: {str(e) or type(e).__name__}", file=sys.stderr)
        return 1

    return 0


# Script entry point: propagate main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Loading