#!/usr/bin/env python3
# File: scripts/group_inconclusive_uncertainty.py
"""Group inconclusive IAMScope findings by uncertainty class for reporting only."""

from __future__ import annotations

import argparse
import json
import os
import sys
from collections import OrderedDict
from pathlib import Path
from typing import Any

# Parent of the directory that contains this script (the script lives in scripts/).
REPO_ROOT = Path(__file__).resolve().parents[1]
# When this environment variable equals "1", writing inside REPO_ROOT is permitted.
TEST_OVERRIDE_ENV = "IAMSCOPE_ALLOW_REPO_OUTPUT_FOR_TESTS"


def _load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return its top-level object.

    Raises:
        ValueError: if the document's top-level value is not a JSON object
            (``json.JSONDecodeError`` for malformed JSON is a ``ValueError`` too).
        OSError: if the file cannot be opened.
    """
    with path.open(encoding="utf-8") as handle:
        payload = json.load(handle)
    if not isinstance(payload, dict):
        raise ValueError(f"expected JSON object in {path}")
    return payload


def _is_relative_to(path: Path, parent: Path) -> bool:
    """Return True when *path* is *parent* or lies underneath it.

    The comparison is purely lexical, so callers should pass resolved paths.
    """
    try:
        path.relative_to(parent)
    except ValueError:
        return False
    return True


def _reviewer_actions(expected_groups: Path | None) -> dict[str, str]:
    """Build an ``uncertainty_class -> reviewer_action`` map from an expected-groups file.

    The file is only used to enrich the report, so parsing is lenient: a missing,
    null, or non-array ``groups`` value yields no actions, and entries that are not
    objects or lack string ``uncertainty_class`` / ``reviewer_action`` fields are
    skipped. Returns an empty dict when *expected_groups* is None.
    """
    if expected_groups is None:
        return {}
    groups = _load_json(expected_groups).get("groups")
    # Iterating a null/scalar value would raise a bare TypeError; iterating an
    # object or string would only yield skippable items. Treat all of them as
    # "no enrichment available".
    if not isinstance(groups, list):
        return {}
    actions: dict[str, str] = {}
    for group in groups:
        if not isinstance(group, dict):
            continue
        uncertainty_class = group.get("uncertainty_class")
        reviewer_action = group.get("reviewer_action")
        if isinstance(uncertainty_class, str) and isinstance(reviewer_action, str):
            actions[uncertainty_class] = reviewer_action
    return actions


def group_inconclusive_uncertainty(
    findings_payload: dict[str, Any],
    *,
    reviewer_actions: dict[str, str] | None = None,
) -> dict[str, Any]:
    """Return report-only grouping for inconclusive findings.

    This function does not mutate findings, change verdicts, infer exploitability,
    or make replay-equivalence claims.

    Only entries whose ``verdict`` is ``"inconclusive"`` are considered. An entry
    without a non-empty string ``uncertainty_class`` is filed under
    ``"uncertainty_class_missing"``. An entry without a non-empty string
    ``finding_id`` is skipped, so it is not reflected in any count.

    Args:
        findings_payload: Parsed findings document; its ``findings`` value is
            expected to be an array of finding objects. A missing or null
            ``findings`` value is treated as "no findings".
        reviewer_actions: Optional ``uncertainty_class -> reviewer_action`` map
            used to add a ``reviewer_action`` field to matching groups.

    Returns:
        A dict with per-class counts (``groups``), per-class details in first-seen
        order (``group_details``), the largest class and its size (the first such
        class wins a tie; ``None`` / ``0`` when there are no groups), and the
        fixed ``non_claims`` block.

    Raises:
        ValueError: if ``findings`` is present but is not an array. Without this
            check an object or string would silently produce an empty report.
    """
    findings = findings_payload.get("findings")
    if findings is None:
        findings = []
    elif not isinstance(findings, (list, tuple)):
        raise ValueError(
            f"expected 'findings' to be a JSON array, got {type(findings).__name__}"
        )

    actions = reviewer_actions or {}
    grouped: OrderedDict[str, list[str]] = OrderedDict()
    for finding in findings:
        if not isinstance(finding, dict):
            continue
        if finding.get("verdict") != "inconclusive":
            continue
        uncertainty_class = finding.get("uncertainty_class")
        finding_id = finding.get("finding_id")
        if not isinstance(uncertainty_class, str) or not uncertainty_class:
            uncertainty_class = "uncertainty_class_missing"
        if not isinstance(finding_id, str) or not finding_id:
            continue
        grouped.setdefault(uncertainty_class, []).append(finding_id)

    group_details: list[dict[str, Any]] = []
    counts: dict[str, int] = {}
    for uncertainty_class, finding_ids in grouped.items():
        detail: dict[str, Any] = {
            "uncertainty_class": uncertainty_class,
            "count": len(finding_ids),
            "finding_ids": finding_ids,
        }
        if uncertainty_class in actions:
            detail["reviewer_action"] = actions[uncertainty_class]
        group_details.append(detail)
        counts[uncertainty_class] = len(finding_ids)

    top_class = None
    top_count = 0
    if group_details:
        # max() returns the first maximal item, so ties resolve to the class
        # that appeared first in the findings.
        top = max(group_details, key=lambda item: item["count"])
        top_class = top["uncertainty_class"]
        top_count = top["count"]

    return {
        "fixture_id": findings_payload.get("fixture_id"),
        "report_only": True,
        "groups": counts,
        "group_details": group_details,
        "top_uncertainty_class": top_class,
        "top_uncertainty_count": top_count,
        "non_claims": {
            "does_not_mutate_findings": True,
            "does_not_change_verdicts": True,
            "does_not_infer_exploitability": True,
            "does_not_claim_replay_equivalence": True,
            "requires_aws_credentials": False,
        },
    }


def _write_output(output_path: Path, text: str) -> None:
    """Write *text* to *output_path* as UTF-8, creating parent directories.

    Raises:
        ValueError: if the resolved path lies inside ``REPO_ROOT`` and the
            ``TEST_OVERRIDE_ENV`` environment variable is not set to ``"1"``.
    """
    output_abs = output_path.resolve()
    repo_abs = REPO_ROOT.resolve()
    if _is_relative_to(output_abs, repo_abs) and os.environ.get(TEST_OVERRIDE_ENV) != "1":
        raise ValueError(
            f"refusing to write uncertainty grouping output inside repository tree: {output_abs}"
        )
    output_abs.parent.mkdir(parents=True, exist_ok=True)
    output_abs.write_text(text, encoding="utf-8")


def main(argv: list[str] | None = None) -> int:
    """Command-line entry point.

    Parses ``--findings``, ``--expected-groups`` and ``--out``, builds the grouping
    report, and writes it as sorted, indented JSON to ``--out`` or to stdout.

    Returns:
        0 on success; 1 when any step fails (the message is printed to stderr).
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--findings", required=True, help="Path to findings.json")
    parser.add_argument(
        "--expected-groups",
        default=None,
        help="Optional expected groups JSON used only to enrich reviewer_action fields",
    )
    parser.add_argument("--out", default=None, help="Optional output JSON path; stdout if omitted")
    args = parser.parse_args(argv)

    findings_path = Path(args.findings)
    expected_path = Path(args.expected_groups) if args.expected_groups else None
    try:
        payload = group_inconclusive_uncertainty(
            _load_json(findings_path),
            reviewer_actions=_reviewer_actions(expected_path),
        )
        text = json.dumps(payload, indent=2, sort_keys=True) + "\n"
        if args.out:
            _write_output(Path(args.out), text)
        else:
            print(text, end="")
    except Exception as exc:
        # Top-level CLI boundary: report any failure as a one-line error and a
        # non-zero exit status instead of a traceback.
        print(f"error: {exc}", file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
"session_policy_context_missing": 1, +} + + +def _run_helper(*args: str) -> subprocess.CompletedProcess[str]: + return subprocess.run( + [str(HELPER), *args], + cwd=REPO_ROOT, + check=False, + text=True, + capture_output=True, + ) + + +def _load(path: Path) -> dict: + return json.loads(path.read_text()) + + +def test_groups_only_inconclusive_findings() -> None: + findings = _load(FINDINGS) + result = group_inconclusive_uncertainty(findings) + inconclusive_ids = { + finding["finding_id"] for finding in findings["findings"] if finding["verdict"] == "inconclusive" + } + grouped_ids = {finding_id for group in result["group_details"] for finding_id in group["finding_ids"]} + assert grouped_ids == inconclusive_ids + assert len(grouped_ids) == 11 + + +def test_demo_fixture_group_counts_and_finding_ids() -> None: + result = group_inconclusive_uncertainty(_load(FINDINGS)) + expected = _load(EXPECTED_GROUPS) + expected_ids = {group["uncertainty_class"]: group["finding_ids"] for group in expected["groups"]} + actual_ids = {group["uncertainty_class"]: group["finding_ids"] for group in result["group_details"]} + assert result["groups"] == EXPECTED_COUNTS + assert actual_ids == expected_ids + assert result["top_uncertainty_class"] == "shared_passrole_target_resource_scope_unknown" + assert result["top_uncertainty_count"] == 8 + + +def test_expected_groups_enriches_reviewer_actions() -> None: + expected = _load(EXPECTED_GROUPS) + actions = {group["uncertainty_class"]: group["reviewer_action"] for group in expected["groups"]} + result = group_inconclusive_uncertainty(_load(FINDINGS), reviewer_actions=actions) + actual_actions = {group["uncertainty_class"]: group["reviewer_action"] for group in result["group_details"]} + assert actual_actions == actions + + +def test_helper_does_not_mutate_input_findings() -> None: + before = FINDINGS.read_text() + group_inconclusive_uncertainty(_load(FINDINGS)) + assert FINDINGS.read_text() == before + + +def test_helper_prints_json_to_stdout() -> 
None: + result = _run_helper("--findings", str(FINDINGS), "--expected-groups", str(EXPECTED_GROUPS)) + assert result.returncode == 0, result.stderr + payload = json.loads(result.stdout) + assert payload["groups"] == EXPECTED_COUNTS + assert payload["non_claims"]["does_not_mutate_findings"] is True + assert payload["non_claims"]["does_not_change_verdicts"] is True + assert payload["non_claims"]["does_not_infer_exploitability"] is True + assert payload["non_claims"]["does_not_claim_replay_equivalence"] is True + assert payload["non_claims"]["requires_aws_credentials"] is False + + +def test_helper_writes_to_temp_output_path(tmp_path: Path) -> None: + output = tmp_path / "uncertainty-groups.json" + result = _run_helper( + "--findings", + str(FINDINGS), + "--expected-groups", + str(EXPECTED_GROUPS), + "--out", + str(output), + ) + assert result.returncode == 0, result.stderr + assert json.loads(output.read_text())["groups"] == EXPECTED_COUNTS + + +def test_helper_refuses_repository_output_path() -> None: + output = REPO_ROOT / "uncertainty-groups.json" + if output.exists(): + output.unlink() + result = _run_helper("--findings", str(FINDINGS), "--out", str(output)) + assert result.returncode == 1 + assert "refusing to write uncertainty grouping output inside repository tree" in result.stderr + assert not output.exists()