diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 70c76d2..70fdff5 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -38,6 +38,8 @@ jobs: run: uv sync --all-extras --dev - name: Run tests + # Smoke CLI tests intentionally disable subprocess coverage collection + # to avoid runner-specific flakiness while keeping parent-process coverage strict. run: uv run pytest --cov=codeclone --cov-report=term-missing --cov-fail-under=98 - name: Verify baseline exists @@ -46,7 +48,7 @@ jobs: - name: Check for new clones vs baseline if: ${{ matrix.python-version == '3.13' }} - run: uv run codeclone . --fail-on-new --no-progress + run: uv run codeclone . --ci lint: runs-on: ubuntu-latest diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8b0f864..8609d0a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,9 +1,31 @@ repos: -- repo: local + - repo: local hooks: - - id: codeclone + - id: ruff-check + name: Ruff (lint) + entry: ruff check . + language: system + pass_filenames: false + types: [ python ] + + - id: ruff-format + name: Ruff (format) + entry: ruff format . + language: system + pass_filenames: false + types: [ python ] + + - id: mypy + name: Mypy + entry: mypy . + language: system + pass_filenames: false + types: [ python ] + + - id: codeclone name: CodeClone entry: codeclone - language: python - args: [".", "--fail-on-new"] - types: [python] + language: system + pass_filenames: false + args: [ ".", "--ci" ] + types: [ python ] \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index e86fef2..48213a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,79 @@ # Changelog +## [1.3.0] - 2026-02-08 + +### Overview + +This release improves detection precision, determinism, and auditability, adds +segment-level reporting, refreshes the HTML report UI, and hardens baseline/cache +contracts for CI usage. + +**Breaking (CI):** baseline contract checks are stricter. Legacy or mismatched baselines +must be regenerated. + +### Detection Engine + +- Safe normalization upgrades: local logical equivalence, proven-domain commutative + canonicalization, and preserved symbolic call targets. +- Internal CFG metadata markers were moved to the `__CC_META__::...` namespace and emitted + as synthetic AST names to prevent collisions with user string literals. +- CFG precision upgrades: short-circuit micro-CFG, selective `try/except` raise-linking, + loop `break`/`continue` jump semantics, `for/while ... else`, and ordered `match`/`except`. +- Deterministic traversal and ordering improvements for stable clone grouping/report output. +- Segment-level internal detection added with strict candidate->hash confirmation; remains + report-only (not part of baseline/CI fail criteria). +- Segment report noise reduction: overlapping windows are merged and boilerplate-only groups + are suppressed using deterministic AST criteria. + +### Baseline & CI + +- Baseline format is versioned (`baseline_version`, `schema_version`) and legacy baselines + fail fast with regeneration guidance. +- Added tamper-evident baseline integrity for v1.3+ (`generator`, `payload_sha256`). +- Added configurable size guards: `--max-baseline-size-mb`, `--max-cache-size-mb`. +- Behavioral hardening: in normal mode, untrusted baseline states are ignored with warning + and compared as empty; in `--fail-on-new` / `--ci`, they fail fast with deterministic exit codes. + +Update baseline after upgrade: + +```bash +codeclone . 
--update-baseline +``` + +### CLI & Reports + +- Added `--version`, `--cache-path` (legacy alias: `--cache-dir`), and `--ci` preset. +- Added strict output extension validation for `--html/.html`, `--json/.json`, `--text/.txt`. +- Summary output was redesigned for deterministic, cache-aware metrics across standard and CI modes. +- User-facing CLI messages were centralized in `codeclone/ui_messages.py`. +- HTML/TXT/JSON reports now include consistent provenance metadata (baseline/cache status fields). +- Clone group/report ordering is deterministic and aligned across HTML/TXT/JSON outputs. + +### HTML UI + +- Refreshed layout with improved navigation and dashboard widgets. +- Added command palette and keyboard shortcuts. +- Replaced emoji icons with inline SVG icons. +- Hardened escaping (text + attribute context) and snippet fallback behavior. + +### Cache & Security + +- Cache default moved to `/.cache/codeclone/cache.json` with legacy path warning. +- Cache schema was extended to include segment data (`CACHE_VERSION=1.1`). +- Cache integrity uses constant-time signature checks and deep schema validation. +- Invalid/oversized cache is ignored deterministically and rebuilt from source. +- Added security regressions for traversal safety, report escaping, baseline/cache integrity, + and deterministic report ordering across formats. +- Fixed POSIX parser CPU guard to avoid lowering `RLIMIT_CPU` hard limit. + +### Documentation & Packaging + +- Updated README and docs (`architecture`, `cfg`, `SECURITY`, `CONTRIBUTING`) to reflect + current contracts and behaviors. +- Removed an invalid PyPI classifier from package metadata. + +--- + ## [1.2.1] - 2026-02-02 ### Overview diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7987e01..47ccfdf 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -30,6 +30,7 @@ We especially welcome contributions in the following areas: - Control Flow Graph (CFG) construction and semantics - AST normalization improvements +- Segment-level clone detection and reporting - False-positive reduction - HTML report UX improvements - Performance optimizations @@ -83,6 +84,25 @@ Such changes often require design-level discussion and may be staged across vers --- +## Security & Safety Expectations + +- Assume **untrusted input** (paths and source code). +- Add **negative tests** for any normalization or CFG change. +- Changes must preserve determinism and avoid new false positives. + +--- + +## Baseline & CI + +- Baselines are **versioned**. Regenerate with `codeclone . --update-baseline` + when detection logic or CodeClone version changes. +- Baselines in 1.3+ are tamper-evident (`generator`, `payload_sha256`). +- Baseline verification must use the same Python `major.minor` version. +- In `--fail-on-new` / `--ci`, untrusted baseline states fail fast. Outside gating + mode, baseline is ignored with warning and comparison proceeds against an empty baseline. + +--- + ## Development Setup ```bash @@ -96,15 +116,15 @@ pip install -e .[dev] Run tests: ```bash -pytest +uv run pytest ``` Static checks: ```bash -mypy -ruff check . -ruff format . +uv run mypy . +uv run ruff check . +uv run ruff format . ``` --- @@ -128,6 +148,9 @@ CodeClone follows **semantic versioning**: - **MINOR**: new detection capabilities (for example, CFG improvements) - **PATCH**: bug fixes, performance improvements, and UI/UX polish +Baselines are versioned. Any change to detection behavior must include documentation +and tests, and may require baseline regeneration. 
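+
+A minimal end-to-end sketch of that flow (the flags are the documented CLI options):
+
+```bash
+# Re-run detection and regenerate the versioned baseline
+codeclone . --update-baseline
+
+# Verify the CI gate passes against the fresh baseline
+codeclone . --ci
+```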
+ --- ## License diff --git a/README.md b/README.md index 21b7cc9..7dea74c 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,12 @@ # CodeClone -[![PyPI](https://img.shields.io/pypi/v/codeclone.svg)](https://pypi.org/project/codeclone/) -[![Downloads](https://img.shields.io/pypi/dm/codeclone.svg)](https://pypi.org/project/codeclone/) -[![tests](https://github.com/orenlab/codeclone/actions/workflows/tests.yml/badge.svg?branch=main)](https://github.com/orenlab/codeclone/actions/workflows/tests.yml) -[![Python](https://img.shields.io/pypi/pyversions/codeclone.svg)](https://pypi.org/project/codeclone/) -[![License](https://img.shields.io/pypi/l/codeclone.svg)](LICENSE) +[![PyPI](https://img.shields.io/pypi/v/codeclone.svg?style=flat-square)](https://pypi.org/project/codeclone/) +[![Downloads](https://img.shields.io/pypi/dm/codeclone.svg?style=flat-square)](https://pypi.org/project/codeclone/) +[![tests](https://github.com/orenlab/codeclone/actions/workflows/tests.yml/badge.svg?branch=main&style=flat-square)](https://github.com/orenlab/codeclone/actions/workflows/tests.yml) +[![Python](https://img.shields.io/pypi/pyversions/codeclone.svg?style=flat-square)](https://pypi.org/project/codeclone/) +![CI First](https://img.shields.io/badge/CI-first-green?style=flat-square) +![Baseline](https://img.shields.io/badge/baseline-versioned-green?style=flat-square) +[![License](https://img.shields.io/pypi/l/codeclone.svg?style=flat-square)](LICENSE) **CodeClone** is a Python code clone detector based on **normalized Python AST and Control Flow Graphs (CFG)**. It helps teams discover architectural duplication and prevent new copy-paste from entering the codebase via CI. @@ -63,6 +65,12 @@ Typical use cases: - no `__init__` noise, - size and statement-count thresholds. +### Segment-level internal clone detection + +- Detects repeated **segment windows** inside the same function. +- Uses a two-step deterministic match (candidate signature → strict hash). +- Included in reports for explainability, **not** in baseline/CI failure logic. + ### Control-Flow Awareness (CFG v1) - Each function is converted into a **Control Flow Graph**. @@ -74,7 +82,11 @@ Typical use cases: - `with` / `async with` - `match` / `case` (Python 3.10+) - Current CFG semantics (v1): - - `break` and `continue` are treated as statements (no jump targets), + - `and` / `or` are modeled as short-circuit micro-CFG branches, + - `try/except` links only from statements that may raise, + - `break` / `continue` are modeled as terminating loop transitions with explicit targets, + - `for/while ... else` semantics are preserved structurally, + - `match case` and `except` handler order is preserved structurally, - after-blocks are explicit and always present, - focus is on **structural similarity**, not precise runtime semantics. @@ -86,6 +98,7 @@ This design keeps clone detection **stable, deterministic, and low-noise**. - Conservative defaults tuned for real-world Python projects. - Explicit thresholds for size and statement count. - No probabilistic scoring or heuristic similarity thresholds. +- Safe commutative normalization and local logical equivalences only. - Focus on *architectural duplication*, not micro-similarities. ### CI-friendly baseline mode @@ -102,9 +115,7 @@ This design keeps clone detection **stable, deterministic, and low-noise**. pip install codeclone ``` -Python **3.10+** is required. - ---- +Python 3.10+ is required. ## Quick Start @@ -135,11 +146,41 @@ Generate an HTML report: codeclone . 
--html .cache/codeclone/report.html ``` +Check version: + +```bash +codeclone --version +``` + +--- + +## Reports and Metadata + +All report formats include provenance metadata for auditability: + +`codeclone_version`, `python_version`, `baseline_path`, `baseline_version`, +`baseline_schema_version`, `baseline_python_version`, `baseline_loaded`, +`baseline_status` (and cache metadata when available). + +baseline_status values: + +- `ok` +- `missing` +- `legacy` +- `invalid` +- `mismatch_version` +- `mismatch_schema` +- `mismatch_python` +- `generator_mismatch` +- `integrity_missing` +- `integrity_failed` +- `too_large` + --- ## Baseline Workflow (Recommended) -### 1. Create a baseline +1. Create a baseline Run once on your current codebase: @@ -149,21 +190,73 @@ codeclone . --update-baseline Commit the generated baseline file to the repository. -### 2. Use in CI +Baselines are versioned. If CodeClone is upgraded, regenerate the baseline to keep +CI deterministic and explainable. + +Baseline format in 1.3+ is tamper-evident (generator, payload_sha256) and validated +before baseline comparison. + +2. Trusted vs untrusted baseline behavior + +Baseline states considered untrusted: + +- `invalid` +- `too_large` +- `generator_mismatch` +- `integrity_missing` +- `integrity_failed` + +Behavior: + +- in normal mode, untrusted baseline is ignored with a warning (comparison falls back to empty baseline); +- in `--fail-on-new` / `--ci`, untrusted baseline fails fast (exit code 2). + +3. Use in CI + +```bash +codeclone . --ci +``` + +or: ```bash -codeclone . --fail-on-new --no-progress +codeclone . --ci --html .cache/codeclone/report.html ``` +`--ci` is equivalent to `--fail-on-new --no-color --quiet`. + Behavior: - existing clones are allowed, -- the build fails if *new* clones appear, +- the build fails if new clones appear, - refactoring that removes duplication is always allowed. -`--fail-on-new` exits with a non-zero code when new clones are detected. +`--fail-on-new` / `--ci` exits with a non-zero code when new clones are detected. + +--- + +### Cache + +By default, CodeClone stores the cache per project at: + +```bash +/.cache/codeclone/cache.json +``` + +You can override this path with `--cache-path` (`--cache-dir` is a legacy alias). -### Python Version Consistency for Baseline Checks +If you used an older version of CodeClone, delete the legacy cache file at +`~/.cache/codeclone/cache.json` and add `.cache/` to `.gitignore`. + +Cache integrity checks are strict: signature mismatch or oversized cache files are ignored +with an explicit warning, then rebuilt from source. + +Cache entries are validated against expected structure/types; invalid entries are ignored +deterministically. + +--- + +## Python Version Consistency for Baseline Checks Due to inherent differences in Python’s AST between interpreter versions, baseline generation and verification must be performed using the same Python version. @@ -184,8 +277,9 @@ repos: - id: codeclone name: CodeClone entry: codeclone - language: python - args: [ ".", "--fail-on-new" ] + language: system + pass_filenames: false + args: [ ".", "--ci" ] types: [ python ] ``` @@ -193,30 +287,29 @@ repos: ## What CodeClone Is (and Is Not) -### CodeClone **is** +### CodeClone Is - an architectural analysis tool, - a duplication radar, - a CI guard against copy-paste, - a control-flow-aware clone detector. -### CodeClone **is not** +### CodeClone Is Not - a linter, - a formatter, - a semantic equivalence prover, - a runtime analyzer. 
---- - ## How It Works (High Level) 1. Parse Python source into AST. 2. Normalize AST (names, constants, attributes, annotations). -3. Build a **Control Flow Graph (CFG)** per function. +3. Build a Control Flow Graph (CFG) per function. 4. Compute stable CFG fingerprints. -5. Detect function-level and block-level clones. -6. Apply conservative filters to suppress noise. +5. Extract segment windows for internal clone discovery. +6. Detect function-level, block-level, and segment-level clones. +7. Apply conservative filters to suppress noise. See the architectural overview: @@ -226,10 +319,10 @@ See the architectural overview: ## Control Flow Graph (CFG) -Starting from **version 1.1.0**, CodeClone uses a **Control Flow Graph (CFG)** +Starting from version 1.1.0, CodeClone uses a Control Flow Graph (CFG) to improve structural clone detection robustness. -The CFG is a **structural abstraction**, not a runtime execution model. +The CFG is a structural abstraction, not a runtime execution model. See full design and semantics: @@ -237,6 +330,32 @@ See full design and semantics: --- +## CLI Options + +| Option | Description | Default | +|-------------------------------|----------------------------------------------------------------------|--------------------------------------| +| `root` | Project root directory to scan | `.` | +| `--version` | Print CodeClone version and exit | - | +| `--min-loc` | Minimum function LOC to analyze | `15` | +| `--min-stmt` | Minimum AST statements to analyze | `6` | +| `--processes` | Number of worker processes | `4` | +| `--cache-path FILE` | Cache file path | `/.cache/codeclone/cache.json` | +| `--cache-dir FILE` | Legacy alias for `--cache-path` | - | +| `--max-cache-size-mb MB` | Max cache size before ignore + warning | `50` | +| `--baseline FILE` | Baseline file path | `codeclone.baseline.json` | +| `--max-baseline-size-mb MB` | Max baseline size; untrusted baseline fails in CI, ignored otherwise | `5` | +| `--update-baseline` | Regenerate baseline from current results | `False` | +| `--fail-on-new` | Fail if new function/block clone groups appear vs baseline | `False` | +| `--fail-threshold MAX_CLONES` | Fail if total clone groups (`function + block`) exceed threshold | `-1` (disabled) | +| `--ci` | CI preset: `--fail-on-new --no-color --quiet` | `False` | +| `--html FILE` | Write HTML report (`.html`) | - | +| `--json FILE` | Write JSON report (`.json`) | - | +| `--text FILE` | Write text report (`.txt`) | - | +| `--no-progress` | Disable progress bar output | `False` | +| `--no-color` | Disable ANSI colors | `False` | +| `--quiet` | Minimize output (warnings/errors still shown) | `False` | +| `--verbose` | Show hash details for new clone groups in fail output | `False` | + ## License MIT License diff --git a/SECURITY.md b/SECURITY.md index 533843a..080e1ef 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -9,7 +9,8 @@ The following versions currently receive security updates: | Version | Supported | |---------|-----------| -| 1.2.x | Yes | +| 1.3.x | Yes | +| 1.2.x | No | | 1.1.x | No | | 1.0.x | No | @@ -33,6 +34,20 @@ Potential risk areas include: These areas are explicitly tested and hardened, but are still the primary focus of ongoing security review. +Additional safeguards: + +- HTML report content is escaped in both text and attribute contexts to prevent script injection. +- Reports are static and do not execute analyzed code. +- Scanner traversal is root-confined and prevents symlink-based path escape. 
+- Baseline files are schema/type validated with size limits and tamper-evident integrity fields + (`generator`, `payload_sha256` for v1.3+). +- Baseline integrity is tamper-evident (audit signal), not tamper-proof cryptographic signing. + An actor who can rewrite baseline content and recompute `payload_sha256` can still alter it. +- In `--fail-on-new` / `--ci`, untrusted baseline states fail fast; otherwise baseline is ignored + with explicit warning and comparison proceeds against an empty baseline. +- Cache files are HMAC-signed (constant-time comparison), size-limited, and ignored on mismatch. +- Cache secrets are stored next to the cache (`.cache_secret`) and must not be committed. + --- ## Reporting a Vulnerability diff --git a/codeclone.baseline.json b/codeclone.baseline.json index ce9a169..7dafea0 100644 --- a/codeclone.baseline.json +++ b/codeclone.baseline.json @@ -1,8 +1,10 @@ { - "functions": [ - "23353998d062bbdf37c345cbe5256b3f5686d956|0-19", - "7d573fa56fb11050f1642f18ca4bb3225e11e194|0-19" - ], + "functions": [], "blocks": [], - "python_version": "3.13" + "python_version": "3.13", + "baseline_version": "1.3.0", + "schema_version": 1, + "generator": "codeclone", + "payload_sha256": "92e80b05c857b796bb452de9e62985a1568874da468bc671998133975c94397a", + "created_at": "2026-02-08T09:54:31+00:00" } \ No newline at end of file diff --git a/codeclone/_cli_args.py b/codeclone/_cli_args.py new file mode 100644 index 0000000..49c0ad5 --- /dev/null +++ b/codeclone/_cli_args.py @@ -0,0 +1,161 @@ +""" +CodeClone — AST and CFG-based code clone detector for Python +focused on architectural duplication. + +Copyright (c) 2026 Den Rozhnovskiy +Licensed under the MIT License. +""" + +from __future__ import annotations + +import argparse +from typing import cast + +from . 
import ui_messages as ui + + +class _HelpFormatter(argparse.ArgumentDefaultsHelpFormatter): + def _get_help_string(self, action: argparse.Action) -> str: + if action.dest == "cache_path": + return action.help or "" + return cast(str, super()._get_help_string(action)) + + +def build_parser(version: str) -> argparse.ArgumentParser: + ap = argparse.ArgumentParser( + prog="codeclone", + description="AST and CFG-based code clone detector for Python.", + formatter_class=_HelpFormatter, + ) + ap.add_argument( + "--version", + action="version", + version=ui.version_output(version), + help=ui.HELP_VERSION, + ) + + core_group = ap.add_argument_group("Target") + core_group.add_argument( + "root", + nargs="?", + default=".", + help=ui.HELP_ROOT, + ) + + tune_group = ap.add_argument_group("Analysis Tuning") + tune_group.add_argument( + "--min-loc", + type=int, + default=15, + help=ui.HELP_MIN_LOC, + ) + tune_group.add_argument( + "--min-stmt", + type=int, + default=6, + help=ui.HELP_MIN_STMT, + ) + tune_group.add_argument( + "--processes", + type=int, + default=4, + help=ui.HELP_PROCESSES, + ) + tune_group.add_argument( + "--cache-path", + dest="cache_path", + metavar="FILE", + default=None, + help=ui.HELP_CACHE_PATH, + ) + tune_group.add_argument( + "--cache-dir", + dest="cache_path", + metavar="FILE", + default=None, + help=ui.HELP_CACHE_DIR_LEGACY, + ) + tune_group.add_argument( + "--max-cache-size-mb", + type=int, + default=50, + metavar="MB", + help=ui.HELP_MAX_CACHE_SIZE_MB, + ) + + ci_group = ap.add_argument_group("Baseline & CI/CD") + ci_group.add_argument( + "--baseline", + default="codeclone.baseline.json", + help=ui.HELP_BASELINE, + ) + ci_group.add_argument( + "--max-baseline-size-mb", + type=int, + default=5, + metavar="MB", + help=ui.HELP_MAX_BASELINE_SIZE_MB, + ) + ci_group.add_argument( + "--update-baseline", + action="store_true", + help=ui.HELP_UPDATE_BASELINE, + ) + ci_group.add_argument( + "--fail-on-new", + action="store_true", + help=ui.HELP_FAIL_ON_NEW, + ) + ci_group.add_argument( + "--fail-threshold", + type=int, + default=-1, + metavar="MAX_CLONES", + help=ui.HELP_FAIL_THRESHOLD, + ) + ci_group.add_argument( + "--ci", + action="store_true", + help=ui.HELP_CI, + ) + + out_group = ap.add_argument_group("Reporting") + out_group.add_argument( + "--html", + dest="html_out", + metavar="FILE", + help=ui.HELP_HTML, + ) + out_group.add_argument( + "--json", + dest="json_out", + metavar="FILE", + help=ui.HELP_JSON, + ) + out_group.add_argument( + "--text", + dest="text_out", + metavar="FILE", + help=ui.HELP_TEXT, + ) + out_group.add_argument( + "--no-progress", + action="store_true", + help=ui.HELP_NO_PROGRESS, + ) + out_group.add_argument( + "--no-color", + action="store_true", + help=ui.HELP_NO_COLOR, + ) + out_group.add_argument( + "--quiet", + action="store_true", + help=ui.HELP_QUIET, + ) + out_group.add_argument( + "--verbose", + action="store_true", + help=ui.HELP_VERBOSE, + ) + return ap diff --git a/codeclone/_cli_meta.py b/codeclone/_cli_meta.py new file mode 100644 index 0000000..fe6a04e --- /dev/null +++ b/codeclone/_cli_meta.py @@ -0,0 +1,43 @@ +""" +CodeClone — AST and CFG-based code clone detector for Python +focused on architectural duplication. + +Copyright (c) 2026 Den Rozhnovskiy +Licensed under the MIT License. 
+""" + +from __future__ import annotations + +import sys +from pathlib import Path +from typing import Any + +from .baseline import Baseline + + +def _current_python_version() -> str: + return f"{sys.version_info.major}.{sys.version_info.minor}" + + +def _build_report_meta( + *, + codeclone_version: str, + baseline_path: Path, + baseline: Baseline, + baseline_loaded: bool, + baseline_status: str, + cache_path: Path, + cache_used: bool, +) -> dict[str, Any]: + return { + "codeclone_version": codeclone_version, + "python_version": _current_python_version(), + "baseline_path": str(baseline_path), + "baseline_version": baseline.baseline_version, + "baseline_schema_version": baseline.schema_version, + "baseline_python_version": baseline.python_version, + "baseline_loaded": baseline_loaded, + "baseline_status": baseline_status, + "cache_path": str(cache_path), + "cache_used": cache_used, + } diff --git a/codeclone/_cli_paths.py b/codeclone/_cli_paths.py new file mode 100644 index 0000000..4dcd72f --- /dev/null +++ b/codeclone/_cli_paths.py @@ -0,0 +1,36 @@ +""" +CodeClone — AST and CFG-based code clone detector for Python +focused on architectural duplication. + +Copyright (c) 2026 Den Rozhnovskiy +Licensed under the MIT License. +""" + +from __future__ import annotations + +import sys +from collections.abc import Callable +from pathlib import Path + +from rich.console import Console + + +def expand_path(p: str) -> Path: + return Path(p).expanduser().resolve() + + +def _validate_output_path( + path: str, + *, + expected_suffix: str, + label: str, + console: Console, + invalid_message: Callable[..., str], +) -> Path: + out = Path(path).expanduser() + if out.suffix.lower() != expected_suffix: + console.print( + invalid_message(label=label, path=out, expected_suffix=expected_suffix) + ) + sys.exit(2) + return out.resolve() diff --git a/codeclone/_cli_summary.py b/codeclone/_cli_summary.py new file mode 100644 index 0000000..40df388 --- /dev/null +++ b/codeclone/_cli_summary.py @@ -0,0 +1,115 @@ +""" +CodeClone — AST and CFG-based code clone detector for Python +focused on architectural duplication. + +Copyright (c) 2026 Den Rozhnovskiy +Licensed under the MIT License. +""" + +from __future__ import annotations + +from rich.console import Console +from rich.table import Table +from rich.text import Text + +from . 
import ui_messages as ui + + +def _summary_value_style(*, label: str, value: int) -> str: + if value == 0: + return "dim" + if label == ui.SUMMARY_LABEL_NEW_BASELINE: + return "bold red" + if label == ui.SUMMARY_LABEL_SUPPRESSED: + return "yellow" + return "bold green" + + +def _build_summary_rows( + *, + files_found: int, + files_analyzed: int, + cache_hits: int, + files_skipped: int, + func_clones_count: int, + block_clones_count: int, + segment_clones_count: int, + suppressed_segment_groups: int, + new_clones_count: int, +) -> list[tuple[str, int]]: + return [ + (ui.SUMMARY_LABEL_FILES_FOUND, files_found), + (ui.SUMMARY_LABEL_FILES_ANALYZED, files_analyzed), + (ui.SUMMARY_LABEL_CACHE_HITS, cache_hits), + (ui.SUMMARY_LABEL_FILES_SKIPPED, files_skipped), + (ui.SUMMARY_LABEL_FUNCTION, func_clones_count), + (ui.SUMMARY_LABEL_BLOCK, block_clones_count), + (ui.SUMMARY_LABEL_SEGMENT, segment_clones_count), + (ui.SUMMARY_LABEL_SUPPRESSED, suppressed_segment_groups), + (ui.SUMMARY_LABEL_NEW_BASELINE, new_clones_count), + ] + + +def _build_summary_table(rows: list[tuple[str, int]]) -> Table: + summary_table = Table(title=ui.SUMMARY_TITLE, show_header=True) + summary_table.add_column("Metric") + summary_table.add_column("Value", justify="right") + for label, value in rows: + summary_table.add_row( + label, + Text(str(value), style=_summary_value_style(label=label, value=value)), + ) + return summary_table + + +def _print_summary( + *, + console: Console, + quiet: bool, + files_found: int, + files_analyzed: int, + cache_hits: int, + files_skipped: int, + func_clones_count: int, + block_clones_count: int, + segment_clones_count: int, + suppressed_segment_groups: int, + new_clones_count: int, +) -> None: + invariant_ok = files_found == (files_analyzed + cache_hits + files_skipped) + rows = _build_summary_rows( + files_found=files_found, + files_analyzed=files_analyzed, + cache_hits=cache_hits, + files_skipped=files_skipped, + func_clones_count=func_clones_count, + block_clones_count=block_clones_count, + segment_clones_count=segment_clones_count, + suppressed_segment_groups=suppressed_segment_groups, + new_clones_count=new_clones_count, + ) + + if quiet: + console.print(ui.SUMMARY_TITLE) + console.print( + ui.fmt_summary_compact_input( + found=files_found, + analyzed=files_analyzed, + cache_hits=cache_hits, + skipped=files_skipped, + ) + ) + console.print( + ui.fmt_summary_compact_clones( + function=func_clones_count, + block=block_clones_count, + segment=segment_clones_count, + suppressed=suppressed_segment_groups, + new=new_clones_count, + ) + ) + else: + console.print(_build_summary_table(rows)) + + if not invariant_ok: + console.print(f"[warning]{ui.WARN_SUMMARY_ACCOUNTING_MISMATCH}[/warning]") diff --git a/codeclone/_html_escape.py b/codeclone/_html_escape.py new file mode 100644 index 0000000..16f7ae4 --- /dev/null +++ b/codeclone/_html_escape.py @@ -0,0 +1,35 @@ +""" +CodeClone — AST and CFG-based code clone detector for Python +focused on architectural duplication. + +Copyright (c) 2026 Den Rozhnovskiy +Licensed under the MIT License. +""" + +from __future__ import annotations + +import html +from typing import Any + + +def _escape_html(v: Any) -> str: + text = html.escape("" if v is None else str(v), quote=True) + text = text.replace("`", "`") + text = text.replace("\u2028", "
").replace("\u2029", "
") + return text + + +def _escape_attr(v: Any) -> str: + text = html.escape("" if v is None else str(v), quote=True) + text = text.replace("`", "`") + text = text.replace("\u2028", "
").replace("\u2029", "
") + return text + + +def _meta_display(v: Any) -> str: + if isinstance(v, bool): + return "true" if v else "false" + if v is None: + return "n/a" + text = str(v).strip() + return text if text else "n/a" diff --git a/codeclone/_html_snippets.py b/codeclone/_html_snippets.py new file mode 100644 index 0000000..915cb1d --- /dev/null +++ b/codeclone/_html_snippets.py @@ -0,0 +1,208 @@ +""" +CodeClone — AST and CFG-based code clone detector for Python +focused on architectural duplication. + +Copyright (c) 2026 Den Rozhnovskiy +Licensed under the MIT License. +""" + +from __future__ import annotations + +import html +import importlib +import itertools +from collections.abc import Iterable +from dataclasses import dataclass +from functools import lru_cache +from typing import Any, NamedTuple, cast + +from .errors import FileProcessingError + + +def pairwise(iterable: Iterable[Any]) -> Iterable[tuple[Any, Any]]: + a, b = itertools.tee(iterable) + next(b, None) + return zip(a, b, strict=False) + + +@dataclass(slots=True) +class _Snippet: + filepath: str + start_line: int + end_line: int + code_html: str + + +class _FileCache: + __slots__ = ("_get_lines_impl", "maxsize") + + def __init__(self, maxsize: int = 128) -> None: + self.maxsize = maxsize + self._get_lines_impl = lru_cache(maxsize=maxsize)(self._read_file_range) + + @staticmethod + def _read_file_range( + filepath: str, start_line: int, end_line: int + ) -> tuple[str, ...]: + if start_line < 1: + start_line = 1 + if end_line < start_line: + return () + + try: + + def _read_with_errors(errors: str) -> tuple[str, ...]: + lines: list[str] = [] + with open(filepath, encoding="utf-8", errors=errors) as f: + for lineno, line in enumerate(f, start=1): + if lineno < start_line: + continue + if lineno > end_line: + break + lines.append(line.rstrip("\n")) + return tuple(lines) + + try: + return _read_with_errors("strict") + except UnicodeDecodeError: + return _read_with_errors("replace") + except OSError as e: + raise FileProcessingError(f"Cannot read {filepath}: {e}") from e + + def get_lines_range( + self, filepath: str, start_line: int, end_line: int + ) -> tuple[str, ...]: + return self._get_lines_impl(filepath, start_line, end_line) + + class _CacheInfo(NamedTuple): + hits: int + misses: int + maxsize: int | None + currsize: int + + def cache_info(self) -> _CacheInfo: + return cast(_FileCache._CacheInfo, self._get_lines_impl.cache_info()) + + +def _try_pygments(code: str) -> str | None: + try: + pygments = importlib.import_module("pygments") + formatters = importlib.import_module("pygments.formatters") + lexers = importlib.import_module("pygments.lexers") + except ImportError: + return None + + highlight = pygments.highlight + formatter_cls = formatters.HtmlFormatter + lexer_cls = lexers.PythonLexer + result = highlight(code, lexer_cls(), formatter_cls(nowrap=True)) + return result if isinstance(result, str) else None + + +def _pygments_css(style_name: str) -> str: + """ + Returns CSS for pygments tokens. Scoped to `.codebox` to avoid leaking styles. + If Pygments is not available or style missing, returns "". 
+ """ + try: + formatters = importlib.import_module("pygments.formatters") + except ImportError: + return "" + + try: + formatter_cls = formatters.HtmlFormatter + fmt = formatter_cls(style=style_name) + except Exception: + try: + fmt = formatter_cls() + except Exception: + return "" + + try: + css = fmt.get_style_defs(".codebox") + return css if isinstance(css, str) else "" + except Exception: + return "" + + +def _prefix_css(css: str, prefix: str) -> str: + """ + Prefix every selector block with `prefix `. + Safe enough for pygments CSS which is mostly selector blocks and comments. + """ + out_lines: list[str] = [] + for line in css.splitlines(): + stripped = line.strip() + if not stripped: + out_lines.append(line) + continue + if stripped.startswith(("/*", "*", "*/")): + out_lines.append(line) + continue + if "{" in line: + before, after = line.split("{", 1) + sel = before.strip() + if sel: + out_lines.append(f"{prefix} {sel} {{ {after}".rstrip()) + else: + out_lines.append(line) + else: + out_lines.append(line) + return "\n".join(out_lines) + + +def _render_code_block( + *, + filepath: str, + start_line: int, + end_line: int, + file_cache: _FileCache, + context: int, + max_lines: int, +) -> _Snippet: + s = max(1, start_line - context) + e = end_line + context + + if e - s + 1 > max_lines: + e = s + max_lines - 1 + + try: + lines = file_cache.get_lines_range(filepath, s, e) + except FileProcessingError: + missing = ( + '
<pre class="codebox">' +            '<span class="line">Source file unavailable</span>' +            "</pre>
" + ) + return _Snippet( + filepath=filepath, + start_line=start_line, + end_line=end_line, + code_html=missing, + ) + + numbered: list[tuple[bool, str]] = [] + for lineno, line in enumerate(lines, start=s): + hit = start_line <= lineno <= end_line + numbered.append((hit, f"{lineno:>5} | {line.rstrip()}")) + + raw = "\n".join(text for _, text in numbered) + highlighted = _try_pygments(raw) + + if highlighted is None: + rendered: list[str] = [] + for hit, text in numbered: + cls = "hitline" if hit else "line" + rendered.append( + f'
<span class="{cls}">{html.escape(text, quote=False)}</span>
' + ) + body = "\n".join(rendered) + else: + body = highlighted + + return _Snippet( + filepath=filepath, + start_line=start_line, + end_line=end_line, + code_html=f'
<pre class="codebox">{body}</pre>
', + ) diff --git a/codeclone/_report_grouping.py b/codeclone/_report_grouping.py new file mode 100644 index 0000000..3ad44ab --- /dev/null +++ b/codeclone/_report_grouping.py @@ -0,0 +1,64 @@ +""" +CodeClone — AST and CFG-based code clone detector for Python +focused on architectural duplication. + +Copyright (c) 2026 Den Rozhnovskiy +Licensed under the MIT License. +""" + +from __future__ import annotations + +from ._report_types import GroupItem, GroupMap + + +def build_groups(units: list[GroupItem]) -> GroupMap: + groups: GroupMap = {} + for u in units: + key = f"{u['fingerprint']}|{u['loc_bucket']}" + groups.setdefault(key, []).append(u) + return {k: v for k, v in groups.items() if len(v) > 1} + + +def build_block_groups(blocks: list[GroupItem], min_functions: int = 2) -> GroupMap: + groups: GroupMap = {} + for b in blocks: + groups.setdefault(b["block_hash"], []).append(b) + + filtered: GroupMap = {} + for h, items in groups.items(): + functions = {i["qualname"] for i in items} + if len(functions) >= min_functions: + filtered[h] = items + + return filtered + + +def build_segment_groups( + segments: list[GroupItem], min_occurrences: int = 2 +) -> GroupMap: + sig_groups: GroupMap = {} + for s in segments: + sig_groups.setdefault(s["segment_sig"], []).append(s) + + confirmed: GroupMap = {} + for items in sig_groups.values(): + if len(items) < min_occurrences: + continue + + hash_groups: GroupMap = {} + for item in items: + hash_groups.setdefault(item["segment_hash"], []).append(item) + + for segment_hash, hash_items in hash_groups.items(): + if len(hash_items) < min_occurrences: + continue + + by_func: GroupMap = {} + for it in hash_items: + by_func.setdefault(it["qualname"], []).append(it) + + for qualname, q_items in by_func.items(): + if len(q_items) >= min_occurrences: + confirmed[f"{segment_hash}|{qualname}"] = q_items + + return confirmed diff --git a/codeclone/_report_segments.py b/codeclone/_report_segments.py new file mode 100644 index 0000000..bd985cb --- /dev/null +++ b/codeclone/_report_segments.py @@ -0,0 +1,247 @@ +""" +CodeClone — AST and CFG-based code clone detector for Python +focused on architectural duplication. + +Copyright (c) 2026 Den Rozhnovskiy +Licensed under the MIT License. 
+""" + +from __future__ import annotations + +import ast +from dataclasses import dataclass +from pathlib import Path + +from ._report_types import GroupItem, GroupMap + +SEGMENT_MIN_UNIQUE_STMT_TYPES = 2 + +_CONTROL_FLOW_STMTS = ( + ast.If, + ast.For, + ast.While, + ast.Try, + ast.With, + ast.Match, + ast.AsyncFor, + ast.AsyncWith, +) +_FORBIDDEN_STMTS = (ast.Return, ast.Raise, ast.Assert) + + +@dataclass(frozen=True, slots=True) +class _SegmentAnalysis: + unique_stmt_types: int + has_control_flow: bool + is_boilerplate: bool + + +class _QualnameCollector(ast.NodeVisitor): + __slots__ = ("funcs", "stack") + + def __init__(self) -> None: + self.stack: list[str] = [] + self.funcs: dict[str, ast.FunctionDef | ast.AsyncFunctionDef] = {} + + def visit_ClassDef(self, node: ast.ClassDef) -> None: + self.stack.append(node.name) + self.generic_visit(node) + self.stack.pop() + + def visit_FunctionDef(self, node: ast.FunctionDef) -> None: + name = ".".join([*self.stack, node.name]) if self.stack else node.name + self.funcs[name] = node + + def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None: + name = ".".join([*self.stack, node.name]) if self.stack else node.name + self.funcs[name] = node + + +def _merge_segment_items(items: list[GroupItem]) -> list[GroupItem]: + if not items: + return [] + + items_sorted = sorted( + items, + key=lambda i: ( + i.get("filepath", ""), + i.get("qualname", ""), + int(i.get("start_line", 0)), + int(i.get("end_line", 0)), + ), + ) + + merged: list[GroupItem] = [] + current: GroupItem | None = None + + for item in items_sorted: + start = int(item.get("start_line", 0)) + end = int(item.get("end_line", 0)) + if start <= 0 or end <= 0: + continue + + if current is None: + current = dict(item) + current["start_line"] = start + current["end_line"] = end + current["size"] = max(1, end - start + 1) + continue + + same_owner = current.get("filepath") == item.get("filepath") and current.get( + "qualname" + ) == item.get("qualname") + if same_owner and start <= int(current["end_line"]) + 1: + current["end_line"] = max(int(current["end_line"]), end) + current["size"] = max( + 1, int(current["end_line"]) - int(current["start_line"]) + 1 + ) + continue + + merged.append(current) + current = dict(item) + current["start_line"] = start + current["end_line"] = end + current["size"] = max(1, end - start + 1) + + if current is not None: + merged.append(current) + + return merged + + +def _collect_file_functions( + filepath: str, +) -> dict[str, ast.FunctionDef | ast.AsyncFunctionDef] | None: + try: + source = Path(filepath).read_text("utf-8") + except OSError: + return None + try: + tree = ast.parse(source) + except SyntaxError: + return None + + collector = _QualnameCollector() + collector.visit(tree) + return collector.funcs + + +def _segment_statements( + func_node: ast.FunctionDef | ast.AsyncFunctionDef, start_line: int, end_line: int +) -> list[ast.stmt]: + body = getattr(func_node, "body", None) + if not isinstance(body, list): + return [] + stmts: list[ast.stmt] = [] + for stmt in body: + lineno = getattr(stmt, "lineno", None) + end = getattr(stmt, "end_lineno", None) + if lineno is None or end is None: + continue + if lineno >= start_line and end <= end_line: + stmts.append(stmt) + return stmts + + +def _assign_targets_attribute_only(stmt: ast.stmt) -> bool: + if isinstance(stmt, ast.Assign): + return all(isinstance(t, ast.Attribute) for t in stmt.targets) + if isinstance(stmt, ast.AnnAssign): + return isinstance(stmt.target, ast.Attribute) + return False + + +def 
_analyze_segment_statements(stmts: list[ast.stmt]) -> _SegmentAnalysis | None: + if not stmts: + return None + + unique_types = {type(s) for s in stmts} + has_control_flow = any(isinstance(s, _CONTROL_FLOW_STMTS) for s in stmts) + has_forbidden = any(isinstance(s, _FORBIDDEN_STMTS) for s in stmts) + has_call_stmt = any( + isinstance(s, ast.Expr) and isinstance(s.value, ast.Call) for s in stmts + ) + + assign_stmts = [s for s in stmts if isinstance(s, (ast.Assign, ast.AnnAssign))] + assign_ratio = len(assign_stmts) / len(stmts) + assign_attr_only = all(_assign_targets_attribute_only(s) for s in assign_stmts) + + is_boilerplate = ( + assign_ratio >= 0.8 + and assign_attr_only + and not has_control_flow + and not has_forbidden + and not has_call_stmt + ) + + return _SegmentAnalysis( + unique_stmt_types=len(unique_types), + has_control_flow=has_control_flow, + is_boilerplate=is_boilerplate, + ) + + +def prepare_segment_report_groups( + segment_groups: GroupMap, +) -> tuple[GroupMap, int]: + """ + Merge overlapping segment windows and suppress low-value boilerplate groups + for reporting. Detection hashes remain unchanged. + """ + suppressed = 0 + filtered: GroupMap = {} + file_cache: dict[str, dict[str, ast.FunctionDef | ast.AsyncFunctionDef] | None] = {} + + for key, items in segment_groups.items(): + merged_items = _merge_segment_items(items) + if not merged_items: + continue + + analyses: list[_SegmentAnalysis] = [] + unknown = False + for item in merged_items: + filepath = str(item.get("filepath", "")) + qualname = str(item.get("qualname", "")) + start_line = int(item.get("start_line", 0)) + end_line = int(item.get("end_line", 0)) + if not filepath or not qualname or start_line <= 0 or end_line <= 0: + unknown = True + break + + if filepath not in file_cache: + file_cache[filepath] = _collect_file_functions(filepath) + funcs = file_cache[filepath] + if not funcs: + unknown = True + break + + local_name = qualname.split(":", 1)[1] if ":" in qualname else qualname + func_node = funcs.get(local_name) + if func_node is None: + unknown = True + break + + stmts = _segment_statements(func_node, start_line, end_line) + analysis = _analyze_segment_statements(stmts) + if analysis is None: + unknown = True + break + analyses.append(analysis) + + if unknown: + filtered[key] = merged_items + continue + + all_boilerplate = all(a.is_boilerplate for a in analyses) + all_too_simple = all( + (not a.has_control_flow) + and (a.unique_stmt_types < SEGMENT_MIN_UNIQUE_STMT_TYPES) + for a in analyses + ) + if all_boilerplate or all_too_simple: + suppressed += 1 + continue + + filtered[key] = merged_items + + return filtered, suppressed diff --git a/codeclone/_report_serialize.py b/codeclone/_report_serialize.py new file mode 100644 index 0000000..54dcef5 --- /dev/null +++ b/codeclone/_report_serialize.py @@ -0,0 +1,160 @@ +""" +CodeClone — AST and CFG-based code clone detector for Python +focused on architectural duplication. + +Copyright (c) 2026 Den Rozhnovskiy +Licensed under the MIT License. 
+""" + +from __future__ import annotations + +import json +from collections.abc import Mapping +from typing import Any + +from ._report_types import GroupItem, GroupMap + + +def to_json(groups: GroupMap) -> str: + def _sorted_items(items: list[GroupItem]) -> list[GroupItem]: + return sorted( + items, + key=lambda item: ( + str(item.get("filepath", "")), + int(item.get("start_line", 0)), + int(item.get("end_line", 0)), + str(item.get("qualname", "")), + ), + ) + + return json.dumps( + { + "group_count": len(groups), + "groups": [ + {"key": k, "count": len(v), "items": _sorted_items(v)} + for k, v in sorted( + groups.items(), + key=lambda kv: (-len(kv[1]), kv[0]), + ) + ], + }, + ensure_ascii=False, + indent=2, + ) + + +def to_json_report( + func_groups: GroupMap, + block_groups: GroupMap, + segment_groups: GroupMap, + meta: Mapping[str, Any] | None = None, +) -> str: + def _sorted_items(items: list[GroupItem]) -> list[GroupItem]: + return sorted( + items, + key=lambda item: ( + str(item.get("filepath", "")), + int(item.get("start_line", 0)), + int(item.get("end_line", 0)), + str(item.get("qualname", "")), + ), + ) + + def _sorted_group_map(groups: GroupMap) -> GroupMap: + return { + k: _sorted_items(v) + for k, v in sorted(groups.items(), key=lambda kv: (-len(kv[1]), kv[0])) + } + + meta_payload = dict(meta or {}) + func_sorted = _sorted_group_map(func_groups) + block_sorted = _sorted_group_map(block_groups) + segment_sorted = _sorted_group_map(segment_groups) + return json.dumps( + { + "meta": meta_payload, + "function_clones": func_sorted, + "block_clones": block_sorted, + "segment_clones": segment_sorted, + # Backward-compatible keys. + "functions": func_sorted, + "blocks": block_sorted, + "segments": segment_sorted, + }, + ensure_ascii=False, + indent=2, + ) + + +def to_text(groups: GroupMap) -> str: + lines: list[str] = [] + for i, (_, v) in enumerate( + sorted(groups.items(), key=lambda kv: (-len(kv[1]), kv[0])) + ): + items = sorted( + v, + key=lambda item: ( + str(item.get("filepath", "")), + int(item.get("start_line", 0)), + int(item.get("end_line", 0)), + str(item.get("qualname", "")), + ), + ) + lines.append(f"\n=== Clone group #{i + 1} (count={len(v)}) ===") + lines.extend( + [ + f"- {item['qualname']} " + f"{item['filepath']}:{item['start_line']}-{item['end_line']} " + f"loc={item.get('loc', item.get('size'))}" + for item in items + ] + ) + return "\n".join(lines).strip() + "\n" + + +def _format_meta_text_value(value: Any) -> str: + if isinstance(value, bool): + return "true" if value else "false" + if value is None: + return "n/a" + text = str(value).strip() + return text if text else "n/a" + + +def to_text_report( + *, + meta: Mapping[str, Any], + func_groups: GroupMap, + block_groups: GroupMap, + segment_groups: GroupMap, +) -> str: + lines = [ + "REPORT METADATA", + f"CodeClone version: {_format_meta_text_value(meta.get('codeclone_version'))}", + f"Python version: {_format_meta_text_value(meta.get('python_version'))}", + f"Baseline path: {_format_meta_text_value(meta.get('baseline_path'))}", + f"Baseline version: {_format_meta_text_value(meta.get('baseline_version'))}", + "Baseline schema version: " + f"{_format_meta_text_value(meta.get('baseline_schema_version'))}", + "Baseline Python version: " + f"{_format_meta_text_value(meta.get('baseline_python_version'))}", + f"Baseline loaded: {_format_meta_text_value(meta.get('baseline_loaded'))}", + f"Baseline status: {_format_meta_text_value(meta.get('baseline_status'))}", + ] + if "cache_path" in meta: + lines.append(f"Cache 
path: {_format_meta_text_value(meta.get('cache_path'))}") + if "cache_used" in meta: + lines.append(f"Cache used: {_format_meta_text_value(meta.get('cache_used'))}") + + sections = [ + ("FUNCTION CLONES", func_groups), + ("BLOCK CLONES", block_groups), + ("SEGMENT CLONES", segment_groups), + ] + for title, groups in sections: + lines.append("") + lines.append(title) + block = to_text(groups).rstrip() + lines.append(block if block else "(none)") + + return "\n".join(lines).rstrip() + "\n" diff --git a/codeclone/_report_types.py b/codeclone/_report_types.py new file mode 100644 index 0000000..6fbe632 --- /dev/null +++ b/codeclone/_report_types.py @@ -0,0 +1,14 @@ +""" +CodeClone — AST and CFG-based code clone detector for Python +focused on architectural duplication. + +Copyright (c) 2026 Den Rozhnovskiy +Licensed under the MIT License. +""" + +from __future__ import annotations + +from typing import Any + +GroupItem = dict[str, Any] +GroupMap = dict[str, list[GroupItem]] diff --git a/codeclone/baseline.py b/codeclone/baseline.py index 74f2030..4e0894f 100644 --- a/codeclone/baseline.py +++ b/codeclone/baseline.py @@ -8,58 +8,155 @@ from __future__ import annotations +import hashlib +import hmac import json from collections.abc import Mapping +from datetime import datetime, timezone from pathlib import Path from typing import Any +from . import __version__ +from .errors import BaselineValidationError + +BASELINE_SCHEMA_VERSION = 1 +MAX_BASELINE_SIZE_BYTES = 5 * 1024 * 1024 +BASELINE_GENERATOR = "codeclone" + class Baseline: - __slots__ = ("blocks", "functions", "path", "python_version") + __slots__ = ( + "baseline_version", + "blocks", + "created_at", + "functions", + "generator", + "path", + "payload_sha256", + "python_version", + "schema_version", + ) def __init__(self, path: str | Path): self.path = Path(path) self.functions: set[str] = set() self.blocks: set[str] = set() self.python_version: str | None = None + self.baseline_version: str | None = None + self.schema_version: int | None = None + self.generator: str | None = None + self.payload_sha256: str | None = None + self.created_at: str | None = None - def load(self) -> None: + def load(self, *, max_size_bytes: int | None = None) -> None: if not self.path.exists(): return + size_limit = ( + MAX_BASELINE_SIZE_BYTES if max_size_bytes is None else max_size_bytes + ) try: - data = json.loads(self.path.read_text("utf-8")) - self.functions = set(data.get("functions", [])) - self.blocks = set(data.get("blocks", [])) - python_version = data.get("python_version") - self.python_version = ( - python_version if isinstance(python_version, str) else None + size = self.path.stat().st_size + except OSError as e: + raise BaselineValidationError( + f"Cannot stat baseline file at {self.path}: {e}" + ) from e + if size > size_limit: + raise BaselineValidationError( + "Baseline file is too large " + f"({size} bytes, max {size_limit} bytes) at {self.path}", + status="too_large", ) + + try: + data = json.loads(self.path.read_text("utf-8")) except json.JSONDecodeError as e: - raise ValueError(f"Corrupted baseline file at {self.path}: {e}") from e + raise BaselineValidationError( + f"Corrupted baseline file at {self.path}: {e}" + ) from e + + if not isinstance(data, dict): + raise BaselineValidationError( + f"Baseline payload must be an object at {self.path}" + ) + + functions = _require_str_list(data, "functions", path=self.path) + blocks = _require_str_list(data, "blocks", path=self.path) + python_version = _optional_str(data, "python_version", 
path=self.path) + baseline_version = _optional_str(data, "baseline_version", path=self.path) + schema_version = _optional_int(data, "schema_version", path=self.path) + generator = _optional_str_loose(data, "generator") + payload_sha256 = _optional_str_loose(data, "payload_sha256") + created_at = _optional_str(data, "created_at", path=self.path) + + self.functions = set(functions) + self.blocks = set(blocks) + self.python_version = python_version + self.baseline_version = baseline_version + self.schema_version = schema_version + self.generator = generator + self.payload_sha256 = payload_sha256 + self.created_at = created_at def save(self) -> None: self.path.parent.mkdir(parents=True, exist_ok=True) + now_utc = datetime.now(timezone.utc).replace(microsecond=0).isoformat() self.path.write_text( json.dumps( - _baseline_payload(self.functions, self.blocks, self.python_version), + _baseline_payload( + self.functions, + self.blocks, + self.python_version, + self.baseline_version, + self.schema_version, + self.generator, + now_utc, + ), indent=2, ensure_ascii=False, ), "utf-8", ) + def is_legacy_format(self) -> bool: + return self.baseline_version is None or self.schema_version is None + + def verify_integrity(self) -> None: + if self.is_legacy_format(): + return + if self.generator != BASELINE_GENERATOR: + raise BaselineValidationError( + "Baseline generator mismatch: expected 'codeclone'.", + status="generator_mismatch", + ) + if not isinstance(self.payload_sha256, str): + raise BaselineValidationError( + "Baseline integrity payload hash is missing.", + status="integrity_missing", + ) + expected = _compute_payload_sha256(self.functions, self.blocks) + if not hmac.compare_digest(self.payload_sha256, expected): + raise BaselineValidationError( + "Baseline integrity check failed: payload_sha256 mismatch.", + status="integrity_failed", + ) + @staticmethod def from_groups( func_groups: Mapping[str, object], block_groups: Mapping[str, object], path: str | Path = "", python_version: str | None = None, + baseline_version: str | None = None, + schema_version: int | None = None, ) -> Baseline: bl = Baseline(path) bl.functions = set(func_groups.keys()) bl.blocks = set(block_groups.keys()) bl.python_version = python_version + bl.baseline_version = baseline_version + bl.schema_version = schema_version + bl.generator = BASELINE_GENERATOR return bl def diff( @@ -74,11 +171,75 @@ def _baseline_payload( functions: set[str], blocks: set[str], python_version: str | None, + baseline_version: str | None, + schema_version: int | None, + generator: str | None, + created_at: str | None, ) -> dict[str, Any]: - payload: dict[str, Any] = { - "functions": sorted(functions), - "blocks": sorted(blocks), - } + payload: dict[str, Any] = _canonical_payload(functions, blocks) if python_version: payload["python_version"] = python_version + payload["baseline_version"] = baseline_version or __version__ + payload["schema_version"] = ( + schema_version if schema_version is not None else BASELINE_SCHEMA_VERSION + ) + payload["generator"] = generator or BASELINE_GENERATOR + payload["payload_sha256"] = _compute_payload_sha256(functions, blocks) + if created_at: + payload["created_at"] = created_at return payload + + +def _canonical_payload(functions: set[str], blocks: set[str]) -> dict[str, list[str]]: + return { + "functions": sorted(functions), + "blocks": sorted(blocks), + } + + +def _compute_payload_sha256(functions: set[str], blocks: set[str]) -> str: + serialized = json.dumps( + _canonical_payload(functions, blocks), + 
sort_keys=True, + separators=(",", ":"), + ensure_ascii=False, + ) + return hashlib.sha256(serialized.encode("utf-8")).hexdigest() + + +def _require_str_list(data: dict[str, Any], key: str, *, path: Path) -> list[str]: + value = data.get(key) + if not isinstance(value, list) or not all(isinstance(v, str) for v in value): + raise BaselineValidationError( + f"Invalid baseline schema at {path}: '{key}' must be list[str]" + ) + return value + + +def _optional_str(data: dict[str, Any], key: str, *, path: Path) -> str | None: + value = data.get(key) + if value is None: + return None + if not isinstance(value, str): + raise BaselineValidationError( + f"Invalid baseline schema at {path}: '{key}' must be string" + ) + return value + + +def _optional_int(data: dict[str, Any], key: str, *, path: Path) -> int | None: + value = data.get(key) + if value is None: + return None + if not isinstance(value, int): + raise BaselineValidationError( + f"Invalid baseline schema at {path}: '{key}' must be integer" + ) + return value + + +def _optional_str_loose(data: dict[str, Any], key: str) -> str | None: + value = data.get(key) + if isinstance(value, str): + return value + return None diff --git a/codeclone/blocks.py b/codeclone/blocks.py index 551d243..3469361 100644 --- a/codeclone/blocks.py +++ b/codeclone/blocks.py @@ -12,6 +12,7 @@ from dataclasses import dataclass from .blockhash import stmt_hash +from .fingerprint import sha1 from .normalize import NormalizationConfig @@ -25,6 +26,17 @@ class BlockUnit: size: int +@dataclass(frozen=True, slots=True) +class SegmentUnit: + segment_hash: str + segment_sig: str + filepath: str + qualname: str + start_line: int + end_line: int + size: int + + def extract_blocks( func_node: ast.AST, *, @@ -72,3 +84,48 @@ def extract_blocks( break return blocks + + +def extract_segments( + func_node: ast.AST, + *, + filepath: str, + qualname: str, + cfg: NormalizationConfig, + window_size: int, + max_segments: int, +) -> list[SegmentUnit]: + body = getattr(func_node, "body", None) + if not isinstance(body, list) or len(body) < window_size: + return [] + + stmt_hashes = [stmt_hash(stmt, cfg) for stmt in body] + + segments: list[SegmentUnit] = [] + + for i in range(len(stmt_hashes) - window_size + 1): + start = getattr(body[i], "lineno", None) + end = getattr(body[i + window_size - 1], "end_lineno", None) + if not start or not end: + continue + + window = stmt_hashes[i : i + window_size] + segment_hash = sha1("|".join(window)) + segment_sig = sha1("|".join(sorted(window))) + + segments.append( + SegmentUnit( + segment_hash=segment_hash, + segment_sig=segment_sig, + filepath=filepath, + qualname=qualname, + start_line=start, + end_line=end, + size=window_size, + ) + ) + + if len(segments) >= max_segments: + break + + return segments diff --git a/codeclone/cache.py b/codeclone/cache.py index f652d17..566e82e 100644 --- a/codeclone/cache.py +++ b/codeclone/cache.py @@ -19,11 +19,14 @@ from typing import TYPE_CHECKING, Any, TypedDict, cast if TYPE_CHECKING: - from .blocks import BlockUnit + from .blocks import BlockUnit, SegmentUnit from .extractor import Unit from .errors import CacheError +OS_NAME = os.name +MAX_CACHE_SIZE_BYTES = 50 * 1024 * 1024 + class FileStat(TypedDict): mtime_ns: int @@ -50,10 +53,21 @@ class BlockDict(TypedDict): size: int +class SegmentDict(TypedDict): + segment_hash: str + segment_sig: str + filepath: str + qualname: str + start_line: int + end_line: int + size: int + + class CacheEntry(TypedDict): stat: FileStat units: list[UnitDict] blocks: 
list[BlockDict] + segments: list[SegmentDict] class CacheData(TypedDict): @@ -62,14 +76,17 @@ class CacheData(TypedDict): class Cache: - __slots__ = ("data", "load_warning", "path", "secret") - CACHE_VERSION = "1.0" + __slots__ = ("data", "load_warning", "max_size_bytes", "path", "secret") + CACHE_VERSION = "1.1" - def __init__(self, path: str | Path): + def __init__(self, path: str | Path, *, max_size_bytes: int | None = None): self.path = Path(path) self.data: CacheData = {"version": self.CACHE_VERSION, "files": {}} self.secret = self._load_secret() self.load_warning: str | None = None + self.max_size_bytes = ( + MAX_CACHE_SIZE_BYTES if max_size_bytes is None else max_size_bytes + ) def _load_secret(self) -> bytes: """Load or create cache signing secret.""" @@ -85,7 +102,7 @@ def _load_secret(self) -> bytes: self.path.parent.mkdir(parents=True, exist_ok=True) secret_path.write_bytes(secret) # Set restrictive permissions on secret file (Unix only) - if os.name == "posix": + if OS_NAME == "posix": secret_path.chmod(0o600) except OSError: pass @@ -102,6 +119,15 @@ def load(self) -> None: return try: + size = self.path.stat().st_size + if size > self.max_size_bytes: + self.load_warning = ( + "Cache file too large " + f"({size} bytes, max {self.max_size_bytes}); ignoring cache." + ) + self.data = {"version": self.CACHE_VERSION, "files": {}} + return + raw = json.loads(self.path.read_text("utf-8")) stored_sig = raw.get("_signature") @@ -110,7 +136,10 @@ def load(self) -> None: # Verify signature expected_sig = self._sign_data(data) - if stored_sig != expected_sig: + if not ( + isinstance(stored_sig, str) + and hmac.compare_digest(stored_sig, expected_sig) + ): self.load_warning = "Cache signature mismatch; ignoring cache." self.data = {"version": self.CACHE_VERSION, "files": {}} return @@ -129,7 +158,7 @@ def load(self) -> None: self.data = {"version": self.CACHE_VERSION, "files": {}} return - self.data = cast(CacheData, data) + self.data = cast(CacheData, cast(object, data)) self.load_warning = None except (json.JSONDecodeError, ValueError): @@ -159,10 +188,22 @@ def get_file_entry(self, filepath: str) -> CacheEntry | None: if not isinstance(entry, dict): return None - required = {"stat", "units", "blocks"} + required = {"stat", "units", "blocks", "segments"} if not required.issubset(entry.keys()): return None + stat = entry.get("stat") + units = entry.get("units") + blocks = entry.get("blocks") + segments = entry.get("segments") + if not ( + _is_file_stat_dict(stat) + and _is_unit_list(units) + and _is_block_list(blocks) + and _is_segment_list(segments) + ): + return None + return entry def put_file_entry( @@ -171,11 +212,15 @@ def put_file_entry( stat_sig: FileStat, units: list[Unit], blocks: list[BlockUnit], + segments: list[SegmentUnit], ) -> None: self.data["files"][filepath] = { "stat": stat_sig, "units": cast(list[UnitDict], cast(object, [asdict(u) for u in units])), "blocks": cast(list[BlockDict], cast(object, [asdict(b) for b in blocks])), + "segments": cast( + list[SegmentDict], cast(object, [asdict(s) for s in segments]) + ), } @@ -185,3 +230,56 @@ def file_stat_signature(path: str) -> FileStat: "mtime_ns": st.st_mtime_ns, "size": st.st_size, } + + +def _is_file_stat_dict(value: object) -> bool: + if not isinstance(value, dict): + return False + return isinstance(value.get("mtime_ns"), int) and isinstance(value.get("size"), int) + + +def _is_unit_dict(value: object) -> bool: + if not isinstance(value, dict): + return False + string_keys = ("qualname", "filepath", "fingerprint", 
"loc_bucket") + int_keys = ("start_line", "end_line", "loc", "stmt_count") + return _has_typed_fields(value, string_keys=string_keys, int_keys=int_keys) + + +def _is_block_dict(value: object) -> bool: + if not isinstance(value, dict): + return False + string_keys = ("block_hash", "filepath", "qualname") + int_keys = ("start_line", "end_line", "size") + return _has_typed_fields(value, string_keys=string_keys, int_keys=int_keys) + + +def _is_segment_dict(value: object) -> bool: + if not isinstance(value, dict): + return False + string_keys = ("segment_hash", "segment_sig", "filepath", "qualname") + int_keys = ("start_line", "end_line", "size") + return _has_typed_fields(value, string_keys=string_keys, int_keys=int_keys) + + +def _is_unit_list(value: object) -> bool: + return isinstance(value, list) and all(_is_unit_dict(item) for item in value) + + +def _is_block_list(value: object) -> bool: + return isinstance(value, list) and all(_is_block_dict(item) for item in value) + + +def _is_segment_list(value: object) -> bool: + return isinstance(value, list) and all(_is_segment_dict(item) for item in value) + + +def _has_typed_fields( + value: dict[str, object], + *, + string_keys: tuple[str, ...], + int_keys: tuple[str, ...], +) -> bool: + return all(isinstance(value.get(key), str) for key in string_keys) and all( + isinstance(value.get(key), int) for key in int_keys + ) diff --git a/codeclone/cfg.py b/codeclone/cfg.py index 9235a7f..625f1f8 100644 --- a/codeclone/cfg.py +++ b/codeclone/cfg.py @@ -10,9 +10,11 @@ import ast from collections.abc import Iterable +from dataclasses import dataclass from typing import Protocol, cast from .cfg_model import CFG, Block +from .meta_markers import CFG_META_PREFIX __all__ = ["CFG", "CFGBuilder"] @@ -26,17 +28,28 @@ class _TryLike(Protocol): finalbody: list[ast.stmt] +@dataclass(slots=True) +class _LoopContext: + continue_target: Block + break_target: Block + + +def _meta_expr(value: str) -> ast.Expr: + return ast.Expr(value=ast.Name(id=f"{CFG_META_PREFIX}{value}", ctx=ast.Load())) + + # ========================= # CFG Builder # ========================= class CFGBuilder: - __slots__ = ("cfg", "current") + __slots__ = ("_loop_stack", "cfg", "current") def __init__(self) -> None: self.cfg: CFG self.current: Block + self._loop_stack: list[_LoopContext] = [] def build( self, @@ -73,6 +86,12 @@ def _visit(self, stmt: ast.stmt) -> None: self.current.is_terminated = True self.current.add_successor(self.cfg.exit) + case ast.Break(): + self._visit_break(stmt) + + case ast.Continue(): + self._visit_continue(stmt) + case ast.If(): self._visit_if(stmt) @@ -88,7 +107,7 @@ def _visit(self, stmt: ast.stmt) -> None: case ast.Try(): self._visit_try(cast(_TryLike, stmt)) case _ if TryStar is not None and isinstance(stmt, TryStar): - self._visit_try(cast(_TryLike, stmt)) + self._visit_try(cast(_TryLike, cast(object, stmt))) case ast.With() | ast.AsyncWith(): self._visit_with(stmt) @@ -102,14 +121,11 @@ def _visit(self, stmt: ast.stmt) -> None: # ---------- Control Flow ---------- def _visit_if(self, stmt: ast.If) -> None: - self.current.statements.append(ast.Expr(value=stmt.test)) - then_block = self.cfg.create_block() else_block = self.cfg.create_block() after_block = self.cfg.create_block() - self.current.add_successor(then_block) - self.current.add_successor(else_block) + self._emit_condition(stmt.test, then_block, else_block) self.current = then_block self._visit_statements(stmt.body) @@ -126,25 +142,36 @@ def _visit_if(self, stmt: ast.If) -> None: def _visit_while(self, 
stmt: ast.While) -> None: cond_block = self.cfg.create_block() body_block = self.cfg.create_block() + else_block = self.cfg.create_block() if stmt.orelse else None after_block = self.cfg.create_block() self.current.add_successor(cond_block) self.current = cond_block - self.current.statements.append(ast.Expr(value=stmt.test)) - self.current.add_successor(body_block) - self.current.add_successor(after_block) + false_target = else_block if else_block is not None else after_block + self._emit_condition(stmt.test, body_block, false_target) + self._loop_stack.append( + _LoopContext(continue_target=cond_block, break_target=after_block) + ) self.current = body_block self._visit_statements(stmt.body) if not self.current.is_terminated: self.current.add_successor(cond_block) + self._loop_stack.pop() + + if else_block is not None: + self.current = else_block + self._visit_statements(stmt.orelse) + if not self.current.is_terminated: + self.current.add_successor(after_block) self.current = after_block def _visit_for(self, stmt: ast.For | ast.AsyncFor) -> None: iter_block = self.cfg.create_block() body_block = self.cfg.create_block() + else_block = self.cfg.create_block() if stmt.orelse else None after_block = self.cfg.create_block() self.current.add_successor(iter_block) @@ -152,12 +179,24 @@ def _visit_for(self, stmt: ast.For | ast.AsyncFor) -> None: self.current = iter_block self.current.statements.append(ast.Expr(value=stmt.iter)) self.current.add_successor(body_block) - self.current.add_successor(after_block) + self.current.add_successor( + else_block if else_block is not None else after_block + ) + self._loop_stack.append( + _LoopContext(continue_target=iter_block, break_target=after_block) + ) self.current = body_block self._visit_statements(stmt.body) if not self.current.is_terminated: self.current.add_successor(iter_block) + self._loop_stack.pop() + + if else_block is not None: + self.current = else_block + self._visit_statements(stmt.orelse) + if not self.current.is_terminated: + self.current.add_successor(after_block) self.current = after_block @@ -193,19 +232,36 @@ def _visit_try(self, stmt: _TryLike) -> None: self.current.add_successor(try_entry) self.current = try_entry - handlers_blocks = [self.cfg.create_block() for _ in stmt.handlers] + handler_test_blocks = [self.cfg.create_block() for _ in stmt.handlers] + handler_body_blocks = [self.cfg.create_block() for _ in stmt.handlers] else_block = self.cfg.create_block() if stmt.orelse else None final_block = self.cfg.create_block() + for idx, (handler, test_block, body_block) in enumerate( + zip(stmt.handlers, handler_test_blocks, handler_body_blocks, strict=True) + ): + test_block.statements.append(_meta_expr(f"TRY_HANDLER_INDEX:{idx}")) + if handler.type is not None: + type_repr = ast.dump(handler.type, annotate_fields=False) + test_block.statements.append( + _meta_expr(f"TRY_HANDLER_TYPE:{type_repr}") + ) + else: + test_block.statements.append(_meta_expr("TRY_HANDLER_TYPE:BARE")) + test_block.add_successor(body_block) + if idx + 1 < len(handler_test_blocks): + test_block.add_successor(handler_test_blocks[idx + 1]) + else: + test_block.add_successor(final_block) + # Process each statement in try body - # Link each to exception handlers + # Link only statements that can raise to exception handlers for stmt_node in stmt.body: if self.current.is_terminated: break - # Current statement could raise exception - for h_block in handlers_blocks: - self.current.add_successor(h_block) + if _stmt_can_raise(stmt_node) and handler_test_blocks: + 
self.current.add_successor(handler_test_blocks[0]) self._visit(stmt_node) @@ -217,11 +273,8 @@ def _visit_try(self, stmt: _TryLike) -> None: self.current.add_successor(final_block) # Process handlers - for handler, h_block in zip(stmt.handlers, handlers_blocks, strict=True): - self.current = h_block - if handler.type: - self.current.statements.append(ast.Expr(value=handler.type)) - + for handler, body_block in zip(stmt.handlers, handler_body_blocks, strict=True): + self.current = body_block self._visit_statements(handler.body) if not self.current.is_terminated: self.current.add_successor(final_block) @@ -241,23 +294,117 @@ def _visit_try(self, stmt: _TryLike) -> None: def _visit_match(self, stmt: ast.Match) -> None: self.current.statements.append(ast.Expr(value=stmt.subject)) - subject_block = self.current + previous_test_block: Block | None = None after_block = self.cfg.create_block() - for case_ in stmt.cases: - case_block = self.cfg.create_block() - subject_block.add_successor(case_block) + for idx, case_ in enumerate(stmt.cases): + case_test_block = self.cfg.create_block() + case_body_block = self.cfg.create_block() + + if previous_test_block is None: + self.current.add_successor(case_test_block) + else: + previous_test_block.add_successor(case_test_block) - self.current = case_block + case_test_block.statements.append(_meta_expr(f"MATCH_CASE_INDEX:{idx}")) # Record pattern structure pattern_repr = ast.dump(case_.pattern, annotate_fields=False) - self.current.statements.append( - ast.Expr(value=ast.Constant(value=f"PATTERN:{pattern_repr}")) + case_test_block.statements.append( + _meta_expr(f"MATCH_PATTERN:{pattern_repr}") ) + if case_.guard is not None: + case_test_block.statements.append(ast.Expr(value=case_.guard)) + + case_test_block.add_successor(case_body_block) + self.current = case_body_block self._visit_statements(case_.body) if not self.current.is_terminated: self.current.add_successor(after_block) + previous_test_block = case_test_block + + if previous_test_block is not None: + previous_test_block.add_successor(after_block) + self.current = after_block + + def _emit_condition( + self, test: ast.expr, true_block: Block, false_block: Block + ) -> None: + if isinstance(test, ast.BoolOp) and isinstance(test.op, (ast.And, ast.Or)): + self._emit_boolop(test, true_block, false_block) + return + + self.current.statements.append(ast.Expr(value=test)) + self.current.add_successor(true_block) + self.current.add_successor(false_block) + + def _emit_boolop( + self, test: ast.BoolOp, true_block: Block, false_block: Block + ) -> None: + values = test.values + op = test.op + current = self.current + + for idx, value in enumerate(values): + current.statements.append(ast.Expr(value=value)) + is_last = idx == len(values) - 1 + + if isinstance(op, ast.And): + if is_last: + current.add_successor(true_block) + current.add_successor(false_block) + else: + next_block = self.cfg.create_block() + current.add_successor(next_block) + current.add_successor(false_block) + current = next_block + else: + if is_last: + current.add_successor(true_block) + current.add_successor(false_block) + else: + next_block = self.cfg.create_block() + current.add_successor(true_block) + current.add_successor(next_block) + current = next_block + + self.current = current + + def _visit_break(self, stmt: ast.Break) -> None: + self.current.statements.append(stmt) + self.current.is_terminated = True + if self._loop_stack: + self.current.add_successor(self._loop_stack[-1].break_target) + return + 
self.current.add_successor(self.cfg.exit) + + def _visit_continue(self, stmt: ast.Continue) -> None: + self.current.statements.append(stmt) + self.current.is_terminated = True + if self._loop_stack: + self.current.add_successor(self._loop_stack[-1].continue_target) + return + self.current.add_successor(self.cfg.exit) + + +def _stmt_can_raise(stmt: ast.stmt) -> bool: + if isinstance(stmt, ast.Raise): + return True + + for node in ast.walk(stmt): + if isinstance( + node, + ( + ast.Call, + ast.Attribute, + ast.Subscript, + ast.Await, + ast.YieldFrom, + ), + ): + return True + + return False diff --git a/codeclone/cli.py b/codeclone/cli.py index 677dbf8..0ef5832 100644 --- a/codeclone/cli.py +++ b/codeclone/cli.py @@ -1,6 +1,5 @@ from __future__ import annotations -import argparse import os import sys from concurrent.futures import ProcessPoolExecutor, as_completed @@ -17,16 +16,33 @@ TextColumn, TimeElapsedColumn, ) -from rich.table import Table from rich.theme import Theme -from .baseline import Baseline +from . import __version__ +from . import ui_messages as ui +from ._cli_args import build_parser +from ._cli_meta import _build_report_meta as _build_report_meta_impl +from ._cli_meta import _current_python_version as _current_python_version_impl +from ._cli_paths import _validate_output_path as _validate_output_path_impl +from ._cli_paths import expand_path as _expand_path_impl +from ._cli_summary import _build_summary_rows as _build_summary_rows_impl +from ._cli_summary import _build_summary_table as _build_summary_table_impl +from ._cli_summary import _print_summary as _print_summary_impl +from ._cli_summary import _summary_value_style as _summary_value_style_impl +from .baseline import BASELINE_SCHEMA_VERSION, Baseline from .cache import Cache, CacheEntry, FileStat, file_stat_signature -from .errors import CacheError +from .errors import BaselineValidationError, CacheError from .extractor import extract_units_from_source from .html_report import build_html_report from .normalize import NormalizationConfig -from .report import build_block_groups, build_groups, to_json_report, to_text +from .report import ( + build_block_groups, + build_groups, + build_segment_groups, + prepare_segment_report_groups, + to_json_report, + to_text_report, +) from .scanner import iter_py_files, module_name_from_path # Custom theme for Rich @@ -39,10 +55,39 @@ "dim": "dim", } ) -console = Console(theme=custom_theme, width=200) + + +LEGACY_CACHE_PATH = Path("~/.cache/codeclone/cache.json").expanduser() + + +def _make_console(*, no_color: bool) -> Console: + return Console(theme=custom_theme, width=200, no_color=no_color) + + +console = _make_console(no_color=False) MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB BATCH_SIZE = 100 +_VALID_BASELINE_STATUSES = { + "ok", + "missing", + "legacy", + "invalid", + "mismatch_version", + "mismatch_schema", + "mismatch_python", + "generator_mismatch", + "integrity_missing", + "integrity_failed", + "too_large", +} +_UNTRUSTED_BASELINE_STATUSES = { + "invalid", + "too_large", + "generator_mismatch", + "integrity_missing", + "integrity_failed", +} @dataclass(slots=True) @@ -54,11 +99,12 @@ class ProcessingResult: error: str | None = None units: list[Any] | None = None blocks: list[Any] | None = None + segments: list[Any] | None = None stat: FileStat | None = None def expand_path(p: str) -> Path: - return Path(p).expanduser().resolve() + return _expand_path_impl(p) def process_file( @@ -108,7 +154,7 @@ def process_file( stat = file_stat_signature(filepath) module_name = 
module_name_from_path(root, filepath) - units, blocks = extract_units_from_source( + units, blocks, segments = extract_units_from_source( source=source, filepath=filepath, module_name=module_name, @@ -122,6 +168,7 @@ def process_file( success=True, units=units, blocks=blocks, + segments=segments, stat=stat, ) @@ -136,133 +183,193 @@ def process_file( def print_banner() -> None: console.print( Panel.fit( - "[bold white]CodeClone[/bold white] [dim]v1.2.1[/dim]\n" - "[italic]Architectural duplication detector[/italic]", + ui.banner_title(__version__), border_style="blue", padding=(0, 2), ) ) -def main() -> None: - ap = argparse.ArgumentParser( - prog="codeclone", - description="AST and CFG-based code clone detector for Python.", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, +def _validate_output_path(path: str, *, expected_suffix: str, label: str) -> Path: + return _validate_output_path_impl( + path, + expected_suffix=expected_suffix, + label=label, + console=console, + invalid_message=ui.fmt_invalid_output_extension, ) - # Core Arguments - core_group = ap.add_argument_group("Target") - core_group.add_argument( - "root", - nargs="?", - default=".", - help="Project root directory to scan.", - ) - # Tuning - tune_group = ap.add_argument_group("Analysis Tuning") - tune_group.add_argument( - "--min-loc", - type=int, - default=15, - help="Minimum Lines of Code (LOC) to consider.", - ) - tune_group.add_argument( - "--min-stmt", - type=int, - default=6, - help="Minimum AST statements to consider.", - ) - tune_group.add_argument( - "--processes", - type=int, - default=4, - help="Number of parallel worker processes.", - ) - tune_group.add_argument( - "--cache-dir", - default="~/.cache/codeclone/cache.json", - help="Path to the cache file to speed up subsequent runs.", +def _current_python_version() -> str: + return _current_python_version_impl() + + +def _build_report_meta( + *, + baseline_path: Path, + baseline: Baseline, + baseline_loaded: bool, + baseline_status: str, + cache_path: Path, + cache_used: bool, +) -> dict[str, Any]: + return _build_report_meta_impl( + codeclone_version=__version__, + baseline_path=baseline_path, + baseline=baseline, + baseline_loaded=baseline_loaded, + baseline_status=baseline_status, + cache_path=cache_path, + cache_used=cache_used, ) - # Baseline & CI - ci_group = ap.add_argument_group("Baseline & CI/CD") - ci_group.add_argument( - "--baseline", - default="codeclone.baseline.json", - help="Path to the baseline file (stored in repo).", - ) - ci_group.add_argument( - "--update-baseline", - action="store_true", - help="Overwrite the baseline file with current results.", - ) - ci_group.add_argument( - "--fail-on-new", - action="store_true", - help="Exit with error if NEW clones (not in baseline) are detected.", - ) - ci_group.add_argument( - "--fail-threshold", - type=int, - default=-1, - metavar="MAX_CLONES", - help="Exit with error if total clone groups exceed this number.", - ) - # Output - out_group = ap.add_argument_group("Reporting") - out_group.add_argument( - "--html", - dest="html_out", - metavar="FILE", - help="Generate an HTML report to FILE.", - ) - out_group.add_argument( - "--json", - dest="json_out", - metavar="FILE", - help="Generate a JSON report to FILE.", - ) - out_group.add_argument( - "--text", - dest="text_out", - metavar="FILE", - help="Generate a text report to FILE.", +def _summary_value_style(*, label: str, value: int) -> str: + return _summary_value_style_impl(label=label, value=value) + + +def _build_summary_rows( + *, + 
files_found: int, + files_analyzed: int, + cache_hits: int, + files_skipped: int, + func_clones_count: int, + block_clones_count: int, + segment_clones_count: int, + suppressed_segment_groups: int, + new_clones_count: int, +) -> list[tuple[str, int]]: + return _build_summary_rows_impl( + files_found=files_found, + files_analyzed=files_analyzed, + cache_hits=cache_hits, + files_skipped=files_skipped, + func_clones_count=func_clones_count, + block_clones_count=block_clones_count, + segment_clones_count=segment_clones_count, + suppressed_segment_groups=suppressed_segment_groups, + new_clones_count=new_clones_count, ) - out_group.add_argument( - "--no-progress", - action="store_true", - help="Disable the progress bar (recommended for CI logs).", + + +def _build_summary_table(rows: list[tuple[str, int]]) -> Any: + return _build_summary_table_impl(rows) + + +def _print_summary( + *, + quiet: bool, + files_found: int, + files_analyzed: int, + cache_hits: int, + files_skipped: int, + func_clones_count: int, + block_clones_count: int, + segment_clones_count: int, + suppressed_segment_groups: int, + new_clones_count: int, +) -> None: + _print_summary_impl( + console=console, + quiet=quiet, + files_found=files_found, + files_analyzed=files_analyzed, + cache_hits=cache_hits, + files_skipped=files_skipped, + func_clones_count=func_clones_count, + block_clones_count=block_clones_count, + segment_clones_count=segment_clones_count, + suppressed_segment_groups=suppressed_segment_groups, + new_clones_count=new_clones_count, ) + +def main() -> None: + ap = build_parser(__version__) + + cache_path_from_args = any( + arg in {"--cache-dir", "--cache-path"} + or arg.startswith(("--cache-dir=", "--cache-path=")) + for arg in sys.argv + ) args = ap.parse_args() - print_banner() + if args.ci: + args.fail_on_new = True + args.no_color = True + args.quiet = True + + if args.quiet: + args.no_progress = True + + global console + console = _make_console(no_color=args.no_color) + + if args.max_baseline_size_mb < 0 or args.max_cache_size_mb < 0: + console.print("[error]Size limits must be non-negative integers (MB).[/error]") + sys.exit(1) + + if not args.quiet: + print_banner() try: root_path = Path(args.root).resolve() if not root_path.exists(): - console.print(f"[error]Root path does not exist: {root_path}[/error]") + console.print(ui.ERR_ROOT_NOT_FOUND.format(path=root_path)) sys.exit(1) except Exception as e: - console.print(f"[error]Invalid root path: {e}[/error]") + console.print(ui.ERR_INVALID_ROOT_PATH.format(error=e)) sys.exit(1) - console.print(f"[info]Scanning root:[/info] {root_path}") + if not args.quiet: + console.print(ui.fmt_scanning_root(root_path)) + + html_out_path: Path | None = None + json_out_path: Path | None = None + text_out_path: Path | None = None + if args.html_out: + html_out_path = _validate_output_path( + args.html_out, expected_suffix=".html", label="HTML" + ) + if args.json_out: + json_out_path = _validate_output_path( + args.json_out, expected_suffix=".json", label="JSON" + ) + if args.text_out: + text_out_path = _validate_output_path( + args.text_out, expected_suffix=".txt", label="text" + ) # Initialize Cache cfg = NormalizationConfig() - cache_path = Path(args.cache_dir).expanduser() - cache = Cache(cache_path) + if cache_path_from_args and args.cache_path: + cache_path = Path(args.cache_path).expanduser() + else: + cache_path = root_path / ".cache" / "codeclone" / "cache.json" + if LEGACY_CACHE_PATH.exists(): + try: + legacy_resolved = LEGACY_CACHE_PATH.resolve() + except OSError: + 
legacy_resolved = LEGACY_CACHE_PATH + if legacy_resolved != cache_path: + console.print( + ui.fmt_legacy_cache_warning( + legacy_path=legacy_resolved, new_path=cache_path + ) + ) + cache = Cache(cache_path, max_size_bytes=args.max_cache_size_mb * 1024 * 1024) cache.load() if cache.load_warning: console.print(f"[warning]{cache.load_warning}[/warning]") all_units: list[dict[str, Any]] = [] all_blocks: list[dict[str, Any]] = [] - changed_files_count = 0 + all_segments: list[dict[str, Any]] = [] + files_found = 0 + files_analyzed = 0 + cache_hits = 0 + files_skipped = 0 files_to_process: list[str] = [] def _get_cached_entry( @@ -271,7 +378,7 @@ def _get_cached_entry( try: stat = file_stat_signature(fp) except OSError as e: - return None, None, f"[warning]Skipping file {fp}: {e}[/warning]" + return None, None, ui.fmt_skipping_file(fp, e) cached = cache.get_file_entry(fp) return stat, cached, None @@ -285,7 +392,7 @@ def _safe_process_file(fp: str) -> ProcessingResult | None: args.min_stmt, ) except Exception as e: - console.print(f"[warning]Worker failed: {e}[/warning]") + console.print(ui.fmt_worker_failed(e)) return None def _safe_future_result(future: Any) -> tuple[ProcessingResult | None, str | None]: @@ -295,14 +402,17 @@ def _safe_future_result(future: Any) -> tuple[ProcessingResult | None, str | Non return None, str(e) # Discovery phase - with console.status("[bold green]Discovering Python files...", spinner="dots"): - try: + try: + if args.quiet: for fp in iter_py_files(str(root_path)): + files_found += 1 stat, cached, warn = _get_cached_entry(fp) if warn: console.print(warn) + files_skipped += 1 continue if cached and cached.get("stat") == stat: + cache_hits += 1 all_units.extend( cast( list[dict[str, Any]], @@ -315,11 +425,48 @@ def _safe_future_result(future: Any) -> tuple[ProcessingResult | None, str | Non cast(object, cached.get("blocks", [])), ) ) + all_segments.extend( + cast( + list[dict[str, Any]], + cast(object, cached.get("segments", [])), + ) + ) else: files_to_process.append(fp) - except Exception as e: - console.print(f"[error]Scan failed: {e}[/error]") - sys.exit(1) + else: + with console.status(ui.STATUS_DISCOVERING, spinner="dots"): + for fp in iter_py_files(str(root_path)): + files_found += 1 + stat, cached, warn = _get_cached_entry(fp) + if warn: + console.print(warn) + files_skipped += 1 + continue + if cached and cached.get("stat") == stat: + cache_hits += 1 + all_units.extend( + cast( + list[dict[str, Any]], + cast(object, cached.get("units", [])), + ) + ) + all_blocks.extend( + cast( + list[dict[str, Any]], + cast(object, cached.get("blocks", [])), + ) + ) + all_segments.extend( + cast( + list[dict[str, Any]], + cast(object, cached.get("segments", [])), + ) + ) + else: + files_to_process.append(fp) + except Exception as e: + console.print(ui.ERR_SCAN_FAILED.format(error=e)) + sys.exit(1) total_files = len(files_to_process) failed_files = [] @@ -328,23 +475,28 @@ def _safe_future_result(future: Any) -> tuple[ProcessingResult | None, str | Non if total_files > 0: def handle_result(result: ProcessingResult) -> None: - nonlocal changed_files_count + nonlocal files_analyzed, files_skipped if result.success and result.stat: cache.put_file_entry( result.filepath, result.stat, result.units or [], result.blocks or [], + result.segments or [], ) - changed_files_count += 1 + files_analyzed += 1 if result.units: all_units.extend([asdict(u) for u in result.units]) if result.blocks: all_blocks.extend([asdict(b) for b in result.blocks]) + if result.segments: + 
all_segments.extend([asdict(s) for s in result.segments]) else: + files_skipped += 1 failed_files.append(f"{result.filepath}: {result.error}") def process_sequential(with_progress: bool) -> None: + nonlocal files_skipped if with_progress: with Progress( SpinnerColumn(), @@ -361,20 +513,26 @@ def process_sequential(with_progress: bool) -> None: result = _safe_process_file(fp) if result is not None: handle_result(result) + else: + files_skipped += 1 + failed_files.append(f"{fp}: worker failed") progress.advance(task) else: - console.print(f"[info]Processing {total_files} changed files...[/info]") + if not args.quiet: + console.print(ui.fmt_processing_changed(total_files)) for fp in files_to_process: result = _safe_process_file(fp) if result is not None: handle_result(result) + else: + files_skipped += 1 + failed_files.append(f"{fp}: worker failed") try: with ProcessPoolExecutor(max_workers=args.processes) as executor: if args.no_progress: - console.print( - f"[info]Processing {total_files} changed files...[/info]" - ) + if not args.quiet: + console.print(ui.fmt_processing_changed(total_files)) # Process in batches to manage memory for i in range(0, total_files, BATCH_SIZE): @@ -390,16 +548,22 @@ def process_sequential(with_progress: bool) -> None: ) for fp in batch ] + future_to_fp = { + id(fut): fp for fut, fp in zip(futures, batch, strict=True) + } for future in as_completed(futures): + fp = future_to_fp[id(future)] result, err = _safe_future_result(future) if result is not None: handle_result(result) elif err is not None: - console.print( - "[warning]Failed to process batch item: " - f"{err}[/warning]" - ) + files_skipped += 1 + reason = err + failed_files.append(f"{fp}: {reason}") + console.print(ui.fmt_batch_item_failed(reason)) + else: + files_skipped += 1 else: with Progress( @@ -428,82 +592,164 @@ def process_sequential(with_progress: bool) -> None: ) for fp in batch ] + future_to_fp = { + id(fut): fp + for fut, fp in zip(futures, batch, strict=True) + } for future in as_completed(futures): + fp = future_to_fp[id(future)] result, err = _safe_future_result(future) if result is not None: handle_result(result) elif err is not None: + files_skipped += 1 + reason = err + failed_files.append(f"{fp}: {reason}") # Should rarely happen due to try/except # in process_file. - console.print( - f"[warning]Worker failed: {err}[/warning]" - ) + console.print(ui.fmt_worker_failed(reason)) + else: + files_skipped += 1 progress.advance(task) except (OSError, RuntimeError, PermissionError) as e: - console.print( - "[warning]Parallel processing unavailable, " - f"falling back to sequential: {e}[/warning]" - ) + console.print(ui.fmt_parallel_fallback(e)) process_sequential(with_progress=not args.no_progress) if failed_files: - console.print( - f"\n[warning]⚠ {len(failed_files)} files failed to process:[/warning]" - ) + console.print(ui.fmt_failed_files_header(len(failed_files))) for failure in failed_files[:10]: console.print(f" • {failure}") if len(failed_files) > 10: console.print(f" ... 
and {len(failed_files) - 10} more") # Analysis phase - with console.status("[bold green]Grouping clones...", spinner="dots"): + suppressed_segment_groups = 0 + if args.quiet: func_groups = build_groups(all_units) block_groups = build_block_groups(all_blocks) + segment_groups = build_segment_groups(all_segments) + segment_groups, suppressed_segment_groups = prepare_segment_report_groups( + segment_groups + ) try: cache.save() except CacheError as e: - console.print(f"[warning]Failed to save cache: {e}[/warning]") + console.print(ui.fmt_cache_save_failed(e)) + else: + with console.status(ui.STATUS_GROUPING, spinner="dots"): + func_groups = build_groups(all_units) + block_groups = build_block_groups(all_blocks) + segment_groups = build_segment_groups(all_segments) + segment_groups, suppressed_segment_groups = prepare_segment_report_groups( + segment_groups + ) + try: + cache.save() + except CacheError as e: + console.print(ui.fmt_cache_save_failed(e)) # Reporting func_clones_count = len(func_groups) block_clones_count = len(block_groups) + segment_clones_count = len(segment_groups) # Baseline Logic baseline_path = Path(args.baseline).expanduser().resolve() - # If user didn't specify path and default logic applies, baseline_path - # is now ./codeclone_baseline.json + # If user didn't specify path, the default is ./codeclone.baseline.json. baseline = Baseline(baseline_path) baseline_exists = baseline_path.exists() + baseline_loaded = False + baseline_status = "missing" + baseline_failure_code: int | None = None + baseline_trusted_for_diff = False if baseline_exists: - baseline.load() - if not args.update_baseline and baseline.python_version: - current_version = f"{sys.version_info.major}.{sys.version_info.minor}" - if baseline.python_version != current_version: - console.print( - "[warning]Baseline Python version mismatch.[/warning]\n" - f"Baseline was generated with Python {baseline.python_version}.\n" - f"Current interpreter: Python {current_version}." - ) + try: + baseline.load(max_size_bytes=args.max_baseline_size_mb * 1024 * 1024) + except BaselineValidationError as e: + baseline_status = ( + e.status if e.status in _VALID_BASELINE_STATUSES else "invalid" + ) + if not args.update_baseline: + console.print(ui.fmt_invalid_baseline(e)) if args.fail_on_new: - console.print( - "[error]Baseline checks require the same Python version to " - "ensure deterministic results. 
Please regenerate the baseline " - "using the current interpreter.[/error]" - ) - sys.exit(2) + baseline_failure_code = 2 + else: + console.print(ui.WARN_BASELINE_IGNORED) + else: + baseline_loaded = True + baseline_status = "ok" + baseline_trusted_for_diff = True + if not args.update_baseline: + if baseline.is_legacy_format(): + baseline_status = "legacy" + console.print(ui.fmt_baseline_version_missing(__version__)) + baseline_failure_code = 2 + baseline_trusted_for_diff = False + else: + if baseline.baseline_version != __version__: + assert baseline.baseline_version is not None + baseline_status = "mismatch_version" + console.print( + ui.fmt_baseline_version_mismatch( + baseline_version=baseline.baseline_version, + current_version=__version__, + ) + ) + baseline_failure_code = 2 + baseline_trusted_for_diff = False + if baseline.schema_version != BASELINE_SCHEMA_VERSION: + assert baseline.schema_version is not None + if baseline_status == "ok": + baseline_status = "mismatch_schema" + console.print( + ui.fmt_baseline_schema_mismatch( + baseline_schema=baseline.schema_version, + current_schema=BASELINE_SCHEMA_VERSION, + ) + ) + baseline_failure_code = 2 + baseline_trusted_for_diff = False + if baseline.python_version: + current_version = _current_python_version() + if baseline.python_version != current_version: + if baseline_status == "ok": + baseline_status = "mismatch_python" + console.print( + ui.fmt_baseline_python_mismatch( + baseline_python=baseline.python_version, + current_python=current_version, + ) + ) + if args.fail_on_new: + console.print(ui.ERR_BASELINE_SAME_PYTHON_REQUIRED) + baseline_failure_code = 2 + baseline_trusted_for_diff = False + if baseline_status == "ok": + try: + baseline.verify_integrity() + except BaselineValidationError as e: + status = ( + e.status + if e.status in _VALID_BASELINE_STATUSES + else "invalid" + ) + baseline_status = status + console.print(ui.fmt_invalid_baseline(e)) + baseline_trusted_for_diff = False + if args.fail_on_new: + baseline_failure_code = 2 + else: + console.print(ui.WARN_BASELINE_IGNORED) + if baseline_status in _UNTRUSTED_BASELINE_STATUSES: + baseline_loaded = False + baseline_trusted_for_diff = False else: if not args.update_baseline: - console.print( - "[warning]Baseline file not found at: [bold]" - f"{baseline_path}" - "[/bold][/warning]\n" - "[dim]Comparing against an empty baseline. " - "Use --update-baseline to create it.[/dim]" - ) + console.print(ui.fmt_path(ui.WARN_BASELINE_MISSING, baseline_path)) if args.update_baseline: new_baseline = Baseline.from_groups( @@ -511,92 +757,138 @@ def process_sequential(with_progress: bool) -> None: block_groups, path=baseline_path, python_version=f"{sys.version_info.major}.{sys.version_info.minor}", + baseline_version=__version__, + schema_version=BASELINE_SCHEMA_VERSION, ) new_baseline.save() - console.print(f"[success]✔ Baseline updated:[/success] {baseline_path}") + console.print(ui.fmt_path(ui.SUCCESS_BASELINE_UPDATED, baseline_path)) + baseline = new_baseline + baseline_loaded = True + baseline_status = "ok" + baseline_trusted_for_diff = True # When updating, we don't fail on new, we just saved the new state. # But we might still want to print the summary. 
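+    # Provenance is assembled after baseline resolution so that the HTML,
+    # JSON, and text reports all record the final baseline_status rather
+    # than an intermediate pre-validation state.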
+ report_meta = _build_report_meta( + baseline_path=baseline_path, + baseline=baseline, + baseline_loaded=baseline_loaded, + baseline_status=baseline_status, + cache_path=cache_path.resolve(), + cache_used=cache.load_warning is None, + ) + # Diff - new_func, new_block = baseline.diff(func_groups, block_groups) + baseline_for_diff = ( + baseline if baseline_trusted_for_diff else Baseline(baseline_path) + ) + new_func, new_block = baseline_for_diff.diff(func_groups, block_groups) new_clones_count = len(new_func) + len(new_block) - # Summary Table - table = Table(title="Analysis Summary", border_style="blue") - table.add_column("Metric", style="cyan") - table.add_column("Value", style="bold white") - - table.add_row("Files Processed", str(changed_files_count)) - table.add_row("Total Function Clones", str(func_clones_count)) - table.add_row("Total Block Clones", str(block_clones_count)) - - if baseline_exists: - style = "error" if new_clones_count > 0 else "success" - table.add_row( - "New Clones (vs Baseline)", f"[{style}]{new_clones_count}[/{style}]" - ) - - console.print(table) + _print_summary( + quiet=args.quiet, + files_found=files_found, + files_analyzed=files_analyzed, + cache_hits=cache_hits, + files_skipped=files_skipped, + func_clones_count=func_clones_count, + block_clones_count=block_clones_count, + segment_clones_count=segment_clones_count, + suppressed_segment_groups=suppressed_segment_groups, + new_clones_count=new_clones_count, + ) # Outputs - if args.html_out: - out = Path(args.html_out).expanduser().resolve() + html_report_path: str | None = None + output_notice_printed = False + + def _print_output_notice(message: str) -> None: + nonlocal output_notice_printed + if args.quiet: + return + if not output_notice_printed: + console.print("") + output_notice_printed = True + console.print(message) + + if html_out_path: + out = html_out_path out.parent.mkdir(parents=True, exist_ok=True) out.write_text( build_html_report( func_groups=func_groups, block_groups=block_groups, + segment_groups=segment_groups, + report_meta=report_meta, title="CodeClone Report", context_lines=3, max_snippet_lines=220, ), "utf-8", ) - console.print(f"[info]HTML report saved:[/info] {out}") + html_report_path = str(out) + _print_output_notice(ui.fmt_path(ui.INFO_HTML_REPORT_SAVED, out)) - if args.json_out: - out = Path(args.json_out).expanduser().resolve() + if json_out_path: + out = json_out_path out.parent.mkdir(parents=True, exist_ok=True) out.write_text( - to_json_report(func_groups, block_groups), + to_json_report(func_groups, block_groups, segment_groups, report_meta), "utf-8", ) - console.print(f"[info]JSON report saved:[/info] {out}") + _print_output_notice(ui.fmt_path(ui.INFO_JSON_REPORT_SAVED, out)) - if args.text_out: - out = Path(args.text_out).expanduser().resolve() + if text_out_path: + out = text_out_path out.parent.mkdir(parents=True, exist_ok=True) out.write_text( - "FUNCTION CLONES\n" - + to_text(func_groups) - + "\nBLOCK CLONES\n" - + to_text(block_groups), + to_text_report( + meta=report_meta, + func_groups=func_groups, + block_groups=block_groups, + segment_groups=segment_groups, + ), "utf-8", ) - console.print(f"[info]Text report saved:[/info] {out}") + _print_output_notice(ui.fmt_path(ui.INFO_TEXT_REPORT_SAVED, out)) + + if baseline_failure_code is not None: + sys.exit(baseline_failure_code) # Exit Codes if args.fail_on_new and (new_func or new_block): - console.print("\n[error]❌ FAILED: New code clones detected![/error]") - if new_func: - console.print(f" New Functions: {', 
'.join(sorted(new_func))}") - if new_block: - console.print(f" New Blocks: {', '.join(sorted(new_block))}") + default_report = Path(".cache/codeclone/report.html") + if html_report_path is None and default_report.exists(): + html_report_path = str(default_report) + + console.print(f"\n{ui.FAIL_NEW_TITLE}") + console.print(f"\n{ui.FAIL_NEW_SUMMARY_TITLE}") + console.print(ui.FAIL_NEW_FUNCTION.format(count=len(new_func))) + console.print(ui.FAIL_NEW_BLOCK.format(count=len(new_block))) + if html_report_path: + console.print(f"\n{ui.FAIL_NEW_REPORT_TITLE}") + console.print(f" {html_report_path}") + console.print(f"\n{ui.FAIL_NEW_ACCEPT_TITLE}") + console.print(ui.FAIL_NEW_ACCEPT_COMMAND) + + if args.verbose: + if new_func: + console.print(f"\n{ui.FAIL_NEW_DETAIL_FUNCTION}") + for h in sorted(new_func): + console.print(f"- {h}") + if new_block: + console.print(f"\n{ui.FAIL_NEW_DETAIL_BLOCK}") + for h in sorted(new_block): + console.print(f"- {h}") sys.exit(3) if 0 <= args.fail_threshold < (func_clones_count + block_clones_count): total = func_clones_count + block_clones_count - console.print( - f"\n[error]❌ FAILED: Total clones ({total}) " - f"exceed threshold ({args.fail_threshold})![/error]" - ) + console.print(ui.fmt_fail_threshold(total=total, threshold=args.fail_threshold)) sys.exit(2) if not args.update_baseline and not args.fail_on_new and new_clones_count > 0: - console.print( - "\n[warning]New clones detected but --fail-on-new not set.[/warning]\n" - "Run with --update-baseline to accept them as technical debt." - ) + console.print(ui.WARN_NEW_CLONES_WITHOUT_FAIL) if __name__ == "__main__": diff --git a/codeclone/errors.py b/codeclone/errors.py index c2ab463..11e32b8 100644 --- a/codeclone/errors.py +++ b/codeclone/errors.py @@ -25,3 +25,17 @@ class ValidationError(CodeCloneError): class CacheError(CodeCloneError): """Cache operation failed.""" + + +class BaselineSchemaError(CodeCloneError): + """Baseline file structure is invalid.""" + + +class BaselineValidationError(BaselineSchemaError): + """Baseline validation error with machine-readable status.""" + + __slots__ = ("status",) + + def __init__(self, message: str, *, status: str = "invalid") -> None: + super().__init__(message) + self.status = status diff --git a/codeclone/extractor.py b/codeclone/extractor.py index 02f9730..d0a6236 100644 --- a/codeclone/extractor.py +++ b/codeclone/extractor.py @@ -15,7 +15,7 @@ from contextlib import contextmanager from dataclasses import dataclass -from .blocks import BlockUnit, extract_blocks +from .blocks import BlockUnit, SegmentUnit, extract_blocks, extract_segments from .cfg import CFGBuilder from .errors import ParseError from .fingerprint import bucket_loc, sha1 @@ -70,15 +70,14 @@ def _timeout_handler(_signum: int, _frame: object) -> None: old_limits = resource.getrlimit(resource.RLIMIT_CPU) soft, hard = old_limits - new_soft = ( - min(timeout_s, soft) if soft != resource.RLIM_INFINITY else timeout_s - ) - new_hard = ( - min(timeout_s + 1, hard) - if hard != resource.RLIM_INFINITY - else timeout_s + 1 - ) - resource.setrlimit(resource.RLIMIT_CPU, (new_soft, new_hard)) + hard_ceiling = timeout_s if hard == resource.RLIM_INFINITY else max(1, hard) + if soft == resource.RLIM_INFINITY: + new_soft = min(timeout_s, hard_ceiling) + else: + new_soft = min(timeout_s, soft, hard_ceiling) + # Never lower hard limit: raising it back may be disallowed for + # unprivileged processes and can lead to process termination later. 
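+            # POSIX allows an unprivileged process to lower its limits at
+            # will, but a lowered hard limit cannot be raised again without
+            # elevated privileges (CAP_SYS_RESOURCE on Linux), so only the
+            # soft limit is clamped here and the hard limit passes through.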
+ resource.setrlimit(resource.RLIMIT_CPU, (new_soft, hard)) except Exception: # If resource is unavailable or cannot be set, rely on alarm only. pass @@ -189,7 +188,7 @@ def extract_units_from_source( cfg: NormalizationConfig, min_loc: int, min_stmt: int, -) -> tuple[list[Unit], list[BlockUnit]]: +) -> tuple[list[Unit], list[BlockUnit], list[SegmentUnit]]: try: tree = _parse_with_limits(source, PARSE_TIMEOUT_SECONDS) except SyntaxError as e: @@ -200,6 +199,7 @@ def extract_units_from_source( units: list[Unit] = [] block_units: list[BlockUnit] = [] + segment_units: list[SegmentUnit] = [] for local_name, node in qb.units: start = getattr(node, "lineno", None) @@ -243,4 +243,16 @@ def extract_units_from_source( ) block_units.extend(blocks) - return units, block_units + # Segment-level units (windows within functions, for internal clones) + if loc >= 30 and stmt_count >= 12: + segments = extract_segments( + node, + filepath=filepath, + qualname=qualname, + cfg=cfg, + window_size=6, + max_segments=60, + ) + segment_units.extend(segments) + + return units, block_units, segment_units diff --git a/codeclone/html_report.py b/codeclone/html_report.py index eb23f87..c7ddf36 100644 --- a/codeclone/html_report.py +++ b/codeclone/html_report.py @@ -8,247 +8,62 @@ from __future__ import annotations -import html -import importlib -import itertools -from collections.abc import Iterable -from dataclasses import dataclass -from functools import lru_cache -from typing import Any, NamedTuple, cast - -from codeclone import __version__ -from codeclone.errors import FileProcessingError - +from typing import Any + +from . import __version__ +from ._html_escape import _escape_attr, _escape_html, _meta_display +from ._html_snippets import ( + _FileCache, + _prefix_css, + _pygments_css, + _render_code_block, + _try_pygments, + pairwise, +) from .templates import FONT_CSS_URL, REPORT_TEMPLATE -# ============================ -# Pairwise -# ============================ - - -def pairwise(iterable: Iterable[Any]) -> Iterable[tuple[Any, Any]]: - a, b = itertools.tee(iterable) - next(b, None) - return zip(a, b, strict=False) - - -# ============================ -# Code snippet infrastructure -# ============================ - - -@dataclass(slots=True) -class _Snippet: - filepath: str - start_line: int - end_line: int - code_html: str - - -class _FileCache: - __slots__ = ("_get_lines_impl", "maxsize") - - def __init__(self, maxsize: int = 128) -> None: - self.maxsize = maxsize - # Create a bound method with lru_cache - # We need to cache on the method to have instance-level caching if we wanted - # different caches per instance. But lru_cache on method actually caches - # on the function object (class level) if not careful, - # or we use a wrapper. - # However, for this script, we usually have one reporter. - # To be safe and cleaner, we can use a method that delegates to a cached - # function, OR just use lru_cache on a method (which requires 'self' to be - # hashable, which it is by default id). - # But 'self' changes if we create new instances. - # Let's use the audit's pattern: cache the implementation. 
- - self._get_lines_impl = lru_cache(maxsize=maxsize)(self._read_file_range) - - def _read_file_range( - self, filepath: str, start_line: int, end_line: int - ) -> tuple[str, ...]: - if start_line < 1: - start_line = 1 - if end_line < start_line: - return () - - try: - - def _read_with_errors(errors: str) -> tuple[str, ...]: - lines: list[str] = [] - with open(filepath, encoding="utf-8", errors=errors) as f: - for lineno, line in enumerate(f, start=1): - if lineno < start_line: - continue - if lineno > end_line: - break - lines.append(line.rstrip("\n")) - return tuple(lines) - - try: - return _read_with_errors("strict") - except UnicodeDecodeError: - return _read_with_errors("replace") - except OSError as e: - raise FileProcessingError(f"Cannot read {filepath}: {e}") from e - - def get_lines_range( - self, filepath: str, start_line: int, end_line: int - ) -> tuple[str, ...]: - return self._get_lines_impl(filepath, start_line, end_line) - - class _CacheInfo(NamedTuple): - hits: int - misses: int - maxsize: int | None - currsize: int - - def cache_info(self) -> _CacheInfo: - return cast(_FileCache._CacheInfo, self._get_lines_impl.cache_info()) - - -def _try_pygments(code: str) -> str | None: - try: - pygments = importlib.import_module("pygments") - formatters = importlib.import_module("pygments.formatters") - lexers = importlib.import_module("pygments.lexers") - except Exception: - return None - - highlight = pygments.highlight - formatter_cls = formatters.HtmlFormatter - lexer_cls = lexers.PythonLexer - result = highlight(code, lexer_cls(), formatter_cls(nowrap=True)) - return result if isinstance(result, str) else None - - -def _pygments_css(style_name: str) -> str: - """ - Returns CSS for pygments tokens. Scoped to `.codebox` to avoid leaking styles. - If Pygments is not available or style missing, returns "". - """ - try: - formatters = importlib.import_module("pygments.formatters") - except Exception: - return "" - - try: - formatter_cls = formatters.HtmlFormatter - fmt = formatter_cls(style=style_name) - except Exception: - try: - fmt = formatter_cls() - except Exception: - return "" - - try: - # `.codebox` scope: pygments will emit selectors like `.codebox .k { ... }` - css = fmt.get_style_defs(".codebox") - return css if isinstance(css, str) else "" - except Exception: - return "" - - -def _prefix_css(css: str, prefix: str) -> str: - """ - Prefix every selector block with `prefix `. - Safe enough for pygments CSS which is mostly selector blocks and comments. 
- """ - out_lines: list[str] = [] - for line in css.splitlines(): - stripped = line.strip() - if not stripped: - out_lines.append(line) - continue - if stripped.startswith(("/*", "*", "*/")): - out_lines.append(line) - continue - # Selector lines usually end with `{ - if "{" in line: - # naive prefix: split at "{", prefix selector part - before, after = line.split("{", 1) - sel = before.strip() - if sel: - out_lines.append(f"{prefix} {sel} {{ {after}".rstrip()) - else: - out_lines.append(line) - else: - out_lines.append(line) - return "\n".join(out_lines) - - -def _render_code_block( - *, - filepath: str, - start_line: int, - end_line: int, - file_cache: _FileCache, - context: int, - max_lines: int, -) -> _Snippet: - s = max(1, start_line - context) - e = end_line + context - - if e - s + 1 > max_lines: - e = s + max_lines - 1 - - lines = file_cache.get_lines_range(filepath, s, e) - - numbered: list[tuple[bool, str]] = [] - for lineno, line in enumerate(lines, start=s): - hit = start_line <= lineno <= end_line - numbered.append((hit, f"{lineno:>5} | {line.rstrip()}")) - - raw = "\n".join(text for _, text in numbered) - highlighted = _try_pygments(raw) - - if highlighted is None: - rendered: list[str] = [] - for hit, text in numbered: - cls = "hitline" if hit else "line" - rendered.append(f'
{html.escape(text)}')
-        body = "\n".join(rendered)
-    else:
-        body = highlighted
-
-    return _Snippet(
-        filepath=filepath,
-        start_line=start_line,
-        end_line=end_line,
-        code_html=f'{body}',
-    )
-
 
+__all__ = [
+    "_FileCache",
+    "_prefix_css",
+    "_pygments_css",
+    "_render_code_block",
+    "_try_pygments",
+    "build_html_report",
+    "pairwise",
+]
 
 # ============================
 # HTML report builder
 # ============================
 
 
-def _escape(v: Any) -> str:
-    return html.escape("" if v is None else str(v))
-
-
-def _group_sort_key(items: list[dict[str, Any]]) -> tuple[int, int]:
-    return (
-        -len(items),
-        -max(int(i.get("loc") or i.get("size") or 0) for i in items),
-    )
+def _group_sort_key(items: list[dict[str, Any]]) -> tuple[int]:
+    return (-len(items),)
 
 
 def build_html_report(
     *,
     func_groups: dict[str, list[dict[str, Any]]],
     block_groups: dict[str, list[dict[str, Any]]],
+    segment_groups: dict[str, list[dict[str, Any]]],
+    report_meta: dict[str, Any] | None = None,
     title: str = "CodeClone Report",
    context_lines: int = 3,
     max_snippet_lines: int = 220,
 ) -> str:
     file_cache = _FileCache()
 
-    func_sorted = sorted(func_groups.items(), key=lambda kv: _group_sort_key(kv[1]))
-    block_sorted = sorted(block_groups.items(), key=lambda kv: _group_sort_key(kv[1]))
+    func_sorted = sorted(
+        func_groups.items(), key=lambda kv: (*_group_sort_key(kv[1]), kv[0])
+    )
+    block_sorted = sorted(
+        block_groups.items(), key=lambda kv: (*_group_sort_key(kv[1]), kv[0])
+    )
+    segment_sorted = sorted(
+        segment_groups.items(), key=lambda kv: (*_group_sort_key(kv[1]), kv[0])
+    )
 
-    has_any = bool(func_sorted) or bool(block_sorted)
+    has_any = bool(func_sorted) or bool(block_sorted) or bool(segment_sorted)
 
     # Pygments CSS (scoped). Use modern GitHub-like styles when available.
     # We scope per theme to support toggle without reloading.
@@ -265,64 +80,60 @@
     # ============================
     # Icons (Inline SVG)
     # ============================
-    ICON_SEARCH = (
-        ''
-        ''
-        ''
-        ""
-    )
-    ICON_X = (
-        ''
-        ''
-        ''
-        ""
-    )
-    ICON_CHEV_DOWN = (
-        ''
-        ''
-        ""
-    )
-    # ICON_CHEV_RIGHT = (
-    #     ''
-    #     ''
-    #     ""
-    # )
-    ICON_THEME = (
-        ''
-        ''
-        ""
-    )
-    ICON_CHECK = (
-        ''
-        ''
-        ""
-    )
-    ICON_PREV = (
-        ''
-        ''
-        ""
-    )
-    ICON_NEXT = (
-        ''
-        ''
-        ""
-    )
+    def _svg_icon(size: int, stroke_width: str, body: str) -> str:
+        return (
+            f''
+            f"{body}"
+        )
+
+    ICONS = {
+        "search": _svg_icon(
+            16,
+            "2.5",
+            ''
+            '',
+        ),
+        "clear": _svg_icon(
+            16,
+            "2.5",
+            ''
+            '',
+        ),
+        "chev_down": _svg_icon(
+            16,
+            "2.5",
+            '',
+        ),
+        # ICON_CHEV_RIGHT = (
+        #     ''
+        #     ''
+        #     ""
+        # )
+        "theme": _svg_icon(
+            16,
+            "2",
+            '',
+        ),
+        "check": _svg_icon(
+            48,
+            "2",
+            '',
+        ),
+        "prev": _svg_icon(
+            16,
+            "2",
+            '',
+        ),
+        "next": _svg_icon(
+            16,
+            "2",
+            '',
+        ),
+    }
 
     # ----------------------------
     # Section renderer
     # ----------------------------
@@ -341,16 +152,16 @@ def render_section(
     out: list[str] = [
         f'',
         '',
-        f"{_escape(section_title)} "
+        f"{_escape_html(section_title)} "
         f''
         f"{len(groups)} groups",
         f"""
@@ -970,8 +1377,21 @@
 v${version}