From 62050f27ebe162f3a33b997445231cabd7394e65 Mon Sep 17 00:00:00 2001 From: dontmindaditya Date: Sun, 5 Apr 2026 04:01:48 -0700 Subject: [PATCH] asyncio --- code_review_graph/changes.py | 209 +++++++++++++++++++++++++++++++++++ 1 file changed, 209 insertions(+) diff --git a/code_review_graph/changes.py b/code_review_graph/changes.py index 35cb9e8..8be3117 100644 --- a/code_review_graph/changes.py +++ b/code_review_graph/changes.py @@ -6,15 +6,18 @@ from __future__ import annotations +import hashlib import logging import os import re import subprocess +from pathlib import Path from typing import Any from .constants import SECURITY_KEYWORDS as _SECURITY_KEYWORDS from .flows import get_affected_flows from .graph import GraphNode, GraphStore, _sanitize_name, node_to_dict +from .parser import CodeParser logger = logging.getLogger(__name__) @@ -293,3 +296,209 @@ def analyze_changes( "test_gaps": test_gaps, "review_priorities": review_priorities, } + + +# --------------------------------------------------------------------------- +# 5. Breaking change detection +# --------------------------------------------------------------------------- + + +def _get_file_from_git(repo_root: str, file_path: str, ref: str) -> bytes | None: + """Get the content of a file at a specific git ref.""" + if not _SAFE_GIT_REF.match(ref): + return None + try: + result = subprocess.run( + ["git", "show", f"{ref}:{file_path}"], + capture_output=True, + cwd=repo_root, + timeout=_GIT_TIMEOUT, + ) + if result.returncode == 0: + return result.stdout + except (OSError, subprocess.SubprocessError) as exc: + logger.warning("git show failed: %s", exc) + return None + + +def _parse_params(params_str: str | None) -> list[str]: + """Parse parameter string into a list of parameter names.""" + if not params_str: + return [] + params = [] + for part in params_str.split(","): + part = part.strip() + if not part: + continue + name = part.split(":")[0].split("=")[0].strip() + name = name.replace("*", "").strip() + if name and name not in ("self", "cls"): + params.append(name) + return params + + +def _detect_signature_change( + old_node: GraphNode, + new_node: GraphNode, +) -> dict[str, Any] | None: + """Detect if there's a breaking signature change between old and new node.""" + changes: list[dict[str, str]] = [] + + old_params = _parse_params(old_node.params) + new_params = _parse_params(new_node.params) + + if old_params != new_params: + if len(old_params) != len(new_params): + changes.append({ + "type": "param_count", + "severity": "high", + "detail": f"Parameter count changed from {len(old_params)} to {len(new_params)}", + }) + else: + for i, (old_p, new_p) in enumerate(zip(old_params, new_params)): + if old_p != new_p: + changes.append({ + "type": "param_renamed", + "severity": "medium", + "detail": f"Parameter {i}: '{old_p}' renamed to '{new_p}'", + }) + + if old_node.return_type != new_node.return_type: + if old_node.return_type and new_node.return_type: + changes.append({ + "type": "return_type", + "severity": "high", + "detail": f"Return type changed from '{old_node.return_type}' to '{new_node.return_type}'", + }) + elif new_node.return_type is None: + changes.append({ + "type": "return_type_removed", + "severity": "high", + "detail": "Return type annotation removed", + }) + + if changes: + return { + "qualified_name": old_node.qualified_name, + "file": old_node.file_path, + "line": old_node.line_start, + "changes": changes, + } + return None + + +def _is_public_api(node: GraphNode) -> bool: + """Return True if the node represents a public API (not starting with _).""" + if node.name.startswith("_") and not node.name.startswith("__"): + return False + return node.kind in ("Function", "Class", "Type") + + +def detect_breaking_changes( + store: GraphStore, + repo_root: str, + changed_files: list[str], + base: str = "HEAD~1", +) -> dict[str, Any]: + """Detect breaking API changes between the current state and a git ref. + + Args: + store: The graph store (current state). + repo_root: Repository root path. + changed_files: List of changed file paths. + base: Git ref to compare against (default: HEAD~1). + + Returns: + Dict with breaking_changes, signature_changes, removed_apis, and summary. + """ + parser = CodeParser() + breaking_changes: list[dict[str, Any]] = [] + signature_changes: list[dict[str, Any]] = [] + removed_apis: list[dict[str, Any]] = [] + potential_issues: list[dict[str, Any]] = [] + + for file_path in changed_files: + old_source = _get_file_from_git(repo_root, file_path, base) + if old_source is None: + continue + + old_nodes: list[GraphNode] = [] + try: + nodes, _ = parser.parse_bytes(Path(file_path), old_source) + for n in nodes: + node = store.get_node(n.qualified_name) + if node: + old_nodes.append(node) + except Exception as exc: + logger.warning("Failed to parse old version of %s: %s", file_path, exc) + continue + + new_nodes = store.get_nodes_by_file(file_path) + old_by_qn = {n.qualified_name: n for n in old_nodes if _is_public_api(n)} + new_by_qn = {n.qualified_name: n for n in new_nodes if _is_public_api(n)} + + for qn, old_node in old_by_qn.items(): + if qn not in new_by_qn: + removed_apis.append({ + "qualified_name": qn, + "file": file_path, + "line": old_node.line_start, + "kind": old_node.kind, + "severity": "high", + "detail": f"Public {old_node.kind.lower()} '{old_node.name}' was removed", + }) + breaking_changes.append({ + "qualified_name": qn, + "file": file_path, + "severity": "high", + "change_type": "removed", + }) + + for qn, new_node in new_by_qn.items(): + if qn not in old_by_qn: + continue + + old_node = old_by_qn[qn] + sig_change = _detect_signature_change(old_node, new_node) + if sig_change: + signature_changes.append(sig_change) + severity = max(c["severity"] for c in sig_change["changes"]) + breaking_changes.append({ + "qualified_name": qn, + "file": file_path, + "severity": severity, + "change_type": "signature", + "changes": sig_change["changes"], + }) + + if signature_changes: + for sc in signature_changes: + for change in sc.get("changes", []): + if change["type"] == "return_type_removed": + potential_issues.append({ + "type": "type_erasure", + "severity": "medium", + "detail": f"'{sc['qualified_name']}' lost type information", + "file": sc["file"], + "line": sc["line"], + }) + + summary_parts = [f"Analyzed {len(changed_files)} changed file(s) for breaking changes:"] + if breaking_changes: + high_severity = [c for c in breaking_changes if c["severity"] == "high"] + summary_parts.append(f" - {len(breaking_changes)} breaking change(s) detected") + summary_parts.append(f" - {len(high_severity)} high severity") + summary_parts.append(f" - {len(removed_apis)} removed public API(s)") + summary_parts.append(f" - {len(signature_changes)} signature change(s)") + else: + summary_parts.append(" - No breaking changes detected") + + return { + "summary": "\n".join(summary_parts), + "breaking_changes": breaking_changes, + "removed_apis": removed_apis, + "signature_changes": signature_changes, + "potential_issues": potential_issues, + "total_breaking": len(breaking_changes), + "high_severity_count": len([c for c in breaking_changes if c["severity"] == "high"]), + }