Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 209 additions & 0 deletions code_review_graph/changes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@

from __future__ import annotations

import hashlib
import logging
import os
import re
import subprocess
from pathlib import Path
from typing import Any

from .constants import SECURITY_KEYWORDS as _SECURITY_KEYWORDS
from .flows import get_affected_flows
from .graph import GraphNode, GraphStore, _sanitize_name, node_to_dict
from .parser import CodeParser

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -293,3 +296,209 @@ def analyze_changes(
"test_gaps": test_gaps,
"review_priorities": review_priorities,
}


# ---------------------------------------------------------------------------
# 5. Breaking change detection
# ---------------------------------------------------------------------------


def _get_file_from_git(repo_root: str, file_path: str, ref: str) -> bytes | None:
"""Get the content of a file at a specific git ref."""
if not _SAFE_GIT_REF.match(ref):
return None
try:
result = subprocess.run(
["git", "show", f"{ref}:{file_path}"],
capture_output=True,
cwd=repo_root,
timeout=_GIT_TIMEOUT,
)
if result.returncode == 0:
return result.stdout
except (OSError, subprocess.SubprocessError) as exc:
logger.warning("git show failed: %s", exc)
return None


def _parse_params(params_str: str | None) -> list[str]:
"""Parse parameter string into a list of parameter names."""
if not params_str:
return []
params = []
for part in params_str.split(","):
part = part.strip()
if not part:
continue
name = part.split(":")[0].split("=")[0].strip()
name = name.replace("*", "").strip()
if name and name not in ("self", "cls"):
params.append(name)
return params


def _detect_signature_change(
old_node: GraphNode,
new_node: GraphNode,
) -> dict[str, Any] | None:
"""Detect if there's a breaking signature change between old and new node."""
changes: list[dict[str, str]] = []

old_params = _parse_params(old_node.params)
new_params = _parse_params(new_node.params)

if old_params != new_params:
if len(old_params) != len(new_params):
changes.append({
"type": "param_count",
"severity": "high",
"detail": f"Parameter count changed from {len(old_params)} to {len(new_params)}",
})
else:
for i, (old_p, new_p) in enumerate(zip(old_params, new_params)):
if old_p != new_p:
changes.append({
"type": "param_renamed",
"severity": "medium",
"detail": f"Parameter {i}: '{old_p}' renamed to '{new_p}'",
})

if old_node.return_type != new_node.return_type:
if old_node.return_type and new_node.return_type:
changes.append({
"type": "return_type",
"severity": "high",
"detail": f"Return type changed from '{old_node.return_type}' to '{new_node.return_type}'",
})
elif new_node.return_type is None:
changes.append({
"type": "return_type_removed",
"severity": "high",
"detail": "Return type annotation removed",
})

if changes:
return {
"qualified_name": old_node.qualified_name,
"file": old_node.file_path,
"line": old_node.line_start,
"changes": changes,
}
return None


def _is_public_api(node: GraphNode) -> bool:
"""Return True if the node represents a public API (not starting with _)."""
if node.name.startswith("_") and not node.name.startswith("__"):
return False
return node.kind in ("Function", "Class", "Type")


def detect_breaking_changes(
store: GraphStore,
repo_root: str,
changed_files: list[str],
base: str = "HEAD~1",
) -> dict[str, Any]:
"""Detect breaking API changes between the current state and a git ref.

Args:
store: The graph store (current state).
repo_root: Repository root path.
changed_files: List of changed file paths.
base: Git ref to compare against (default: HEAD~1).

Returns:
Dict with breaking_changes, signature_changes, removed_apis, and summary.
"""
parser = CodeParser()
breaking_changes: list[dict[str, Any]] = []
signature_changes: list[dict[str, Any]] = []
removed_apis: list[dict[str, Any]] = []
potential_issues: list[dict[str, Any]] = []

for file_path in changed_files:
old_source = _get_file_from_git(repo_root, file_path, base)
if old_source is None:
continue

old_nodes: list[GraphNode] = []
try:
nodes, _ = parser.parse_bytes(Path(file_path), old_source)
for n in nodes:
node = store.get_node(n.qualified_name)
if node:
old_nodes.append(node)
except Exception as exc:
logger.warning("Failed to parse old version of %s: %s", file_path, exc)
continue

new_nodes = store.get_nodes_by_file(file_path)
old_by_qn = {n.qualified_name: n for n in old_nodes if _is_public_api(n)}
new_by_qn = {n.qualified_name: n for n in new_nodes if _is_public_api(n)}

for qn, old_node in old_by_qn.items():
if qn not in new_by_qn:
removed_apis.append({
"qualified_name": qn,
"file": file_path,
"line": old_node.line_start,
"kind": old_node.kind,
"severity": "high",
"detail": f"Public {old_node.kind.lower()} '{old_node.name}' was removed",
})
breaking_changes.append({
"qualified_name": qn,
"file": file_path,
"severity": "high",
"change_type": "removed",
})

for qn, new_node in new_by_qn.items():
if qn not in old_by_qn:
continue

old_node = old_by_qn[qn]
sig_change = _detect_signature_change(old_node, new_node)
if sig_change:
signature_changes.append(sig_change)
severity = max(c["severity"] for c in sig_change["changes"])
breaking_changes.append({
"qualified_name": qn,
"file": file_path,
"severity": severity,
"change_type": "signature",
"changes": sig_change["changes"],
})

if signature_changes:
for sc in signature_changes:
for change in sc.get("changes", []):
if change["type"] == "return_type_removed":
potential_issues.append({
"type": "type_erasure",
"severity": "medium",
"detail": f"'{sc['qualified_name']}' lost type information",
"file": sc["file"],
"line": sc["line"],
})

summary_parts = [f"Analyzed {len(changed_files)} changed file(s) for breaking changes:"]
if breaking_changes:
high_severity = [c for c in breaking_changes if c["severity"] == "high"]
summary_parts.append(f" - {len(breaking_changes)} breaking change(s) detected")
summary_parts.append(f" - {len(high_severity)} high severity")
summary_parts.append(f" - {len(removed_apis)} removed public API(s)")
summary_parts.append(f" - {len(signature_changes)} signature change(s)")
else:
summary_parts.append(" - No breaking changes detected")

return {
"summary": "\n".join(summary_parts),
"breaking_changes": breaking_changes,
"removed_apis": removed_apis,
"signature_changes": signature_changes,
"potential_issues": potential_issues,
"total_breaking": len(breaking_changes),
"high_severity_count": len([c for c in breaking_changes if c["severity"] == "high"]),
}
Loading