Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions code_review_graph/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,35 @@ def _fts_search(
return []


def _fts_prefix_search(
conn: sqlite3.Connection,
prefix: str,
limit: int = 20,
) -> list[tuple[int, float]]:
"""Run an FTS5 prefix search for autocomplete suggestions.

Uses the `*` wildcard suffix to match all terms starting with `prefix`.
Returns list of ``(node_id, score)`` tuples.
"""
if not prefix or len(prefix) < 1:
return []

# Escape special FTS5 characters and add wildcard
escaped = prefix.replace('"', '""')
safe_query = f'"{escaped}"*'

try:
rows = conn.execute(
"SELECT rowid, rank FROM nodes_fts WHERE nodes_fts MATCH ? "
"ORDER BY rank LIMIT ?",
(safe_query, limit),
).fetchall()
return [(row[0], -row[1]) for row in rows]
except sqlite3.OperationalError as e:
logger.warning("FTS5 prefix search failed: %s", e)
return []


# ---------------------------------------------------------------------------
# Embedding search (optional)
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -391,3 +420,93 @@ def hybrid_search(
})

return results


# ---------------------------------------------------------------------------
# Autocomplete / prefix search
# ---------------------------------------------------------------------------


def autocomplete(
store: GraphStore,
prefix: str,
kind: Optional[str] = None,
limit: int = 10,
) -> list[dict[str, Any]]:
"""Provide autocomplete suggestions based on a name prefix.

Uses FTS5 prefix search for fast prefix matching, with additional
scoring based on match quality (exact prefix, camelCase, etc.).

Args:
store: The graph store.
prefix: The prefix string to match against function/class names.
kind: Optional node kind filter (e.g. ``"Function"``, ``"Class"``).
limit: Maximum suggestions to return (default 10).

Returns:
List of dicts with name, qualified_name, kind, file_path, and match_score.
"""
if not prefix or len(prefix) < 1:
return []

conn = store._conn

prefix_lower = prefix.lower()
has_upper = any(c.isupper() for c in prefix)
is_snake = "_" in prefix
is_pascal = has_upper and prefix[0].isupper()

fts_results = _fts_prefix_search(conn, prefix, limit=limit * 2)

if not fts_results:
return []

node_ids = [node_id for node_id, _ in fts_results]
placeholders = ",".join("?" for _ in node_ids)
rows = conn.execute(
f"SELECT * FROM nodes WHERE id IN ({placeholders})", # nosec B608
node_ids,
).fetchall()

scored: list[tuple[int, dict[str, Any]]] = []
for row in rows:
node_kind = row["kind"]
if kind and node_kind != kind:
continue

name = row["name"]
name_lower = name.lower()

match_score = 0.0

if name_lower == prefix_lower:
match_score = 100.0
elif name_lower.startswith(prefix_lower):
match_score = 80.0
elif name_lower.replace("_", "").startswith(prefix_lower.replace("_", "")):
match_score = 70.0
elif is_pascal:
if prefix == "".join(c for c in name if c.isupper() or c == "_"):
match_score = 60.0
elif is_snake:
parts = name_lower.split("_")
if any(p.startswith(prefix_lower) for p in parts):
match_score = 50.0

if match_score > 0:
scored.append((match_score, {
"name": _sanitize_name(name),
"qualified_name": _sanitize_name(row["qualified_name"]),
"kind": node_kind,
"file_path": row["file_path"],
"line_start": row["line_start"],
"line_end": row["line_end"],
"language": row["language"] or "",
"params": row["params"],
"return_type": row["return_type"],
"match_score": round(match_score, 2),
}))

scored.sort(key=lambda x: x[0], reverse=True)
return [item for _, item in scored[:limit]]
Loading