diff --git a/src/cli.py b/src/cli.py index 9441f73..67e5ae1 100644 --- a/src/cli.py +++ b/src/cli.py @@ -620,6 +620,15 @@ def build_parser() -> argparse.ArgumentParser: "output/audit-report--*.json warehouse file (requires a prior audit run)" ), ) + parser.add_argument( + "--portfolio-truth-include-security", + action="store_true", + help=( + "Overlay the security.* GHAS alert counts on each project from the latest " + "output/ghas-alerts--*.json file, feeding the active-high-severity-alerts " + "risk factor (requires a prior `audit report --ghas-alerts` run)" + ), + ) parser.add_argument( "--portfolio-context-recovery", action="store_true", @@ -5187,6 +5196,52 @@ def _load_release_count_by_name(*, output_dir: Path, username: str) -> dict[str, return result +def _load_security_alerts_by_name(*, output_dir: Path, username: str) -> dict[str, dict] | None: + """Load per-repo GHAS alert counts from the latest output/ghas-alerts--*.json. + + The file is already keyed by display name in the shape SecurityFields expects + ({name: {"dependabot": {...}, "code_scanning": {...}, "secret_scanning": {...}}}), + so the overlay needs no transformation. Returns None (with a warning) if no GHAS + report is found — the security overlay is then skipped, leaving counts at zero. + """ + import logging + + _log = logging.getLogger(__name__) + + ghas_files = sorted( + output_dir.glob(f"ghas-alerts-{username}-*.json"), + key=lambda p: p.stat().st_mtime, + ) + if not ghas_files: + _log.warning( + "--portfolio-truth-include-security requires a prior `audit report --ghas-alerts` " + "run; no ghas-alerts-%s-*.json found in %s — skipping security overlay", + username, + output_dir, + ) + return None + + ghas_path = ghas_files[-1] + try: + with ghas_path.open() as fh: + data = json.load(fh) + except Exception as exc: # noqa: BLE001 + _log.warning( + "--portfolio-truth-include-security: could not read %s: %s — skipping", + ghas_path, + exc, + ) + return None + + if not isinstance(data, dict): + _log.warning( + "--portfolio-truth-include-security: %s is not a name-keyed object — skipping", + ghas_path, + ) + return None + return {name: entry for name, entry in data.items() if isinstance(entry, dict)} + + def _run_portfolio_truth_mode(args) -> None: from src.portfolio_truth_publish import publish_portfolio_truth @@ -5211,6 +5266,13 @@ def _run_portfolio_truth_mode(args) -> None: username=args.username, ) + security_alerts_by_name: dict[str, dict] | None = None + if getattr(args, "portfolio_truth_include_security", False): + security_alerts_by_name = _load_security_alerts_by_name( + output_dir=output_dir, + username=args.username, + ) + result = publish_portfolio_truth( workspace_root=workspace_root, output_dir=output_dir, @@ -5220,6 +5282,7 @@ def _run_portfolio_truth_mode(args) -> None: legacy_registry_path=legacy_registry_path, include_notion=True, release_count_by_name=release_count_by_name, + security_alerts_by_name=security_alerts_by_name, ) print_info(f"Portfolio truth snapshot: {result.latest_path}") print_info(f"Portfolio truth history snapshot: {result.snapshot_path}") @@ -5381,7 +5444,9 @@ def _run_context_triage_mode(args) -> None: project_key = identity.get("project_key") or "" name = identity.get("display_name") or project.get("name", "") repo_keys.extend(key for key in (project_key, name) if key) - catalog_scores = validate_catalog(catalog_path, sorted(set(repo_keys))) if catalog_path.exists() else {} + catalog_scores = ( + validate_catalog(catalog_path, sorted(set(repo_keys))) if catalog_path.exists() else {} + ) enriched: list[dict] = [] for project in projects: diff --git a/src/portfolio_risk.py b/src/portfolio_risk.py index da01535..638aa24 100644 --- a/src/portfolio_risk.py +++ b/src/portfolio_risk.py @@ -27,6 +27,7 @@ "missing-doctor-standard": "doctor standard not declared", "no-run-instructions": "run instructions missing", "undocumented-risks": "known risks not documented", + "active-high-severity-alerts": "open high/critical security alerts", } _DEFERRED_ARCHIVED = { @@ -36,6 +37,7 @@ "doctor_gap": False, "context_risk": False, "path_risk": False, + "security_risk": False, } _DEFERRED_STALE = { @@ -45,6 +47,7 @@ "doctor_gap": False, "context_risk": False, "path_risk": False, + "security_risk": False, } @@ -61,6 +64,8 @@ def build_risk_entry( doctor_standard: str, known_risks_present: bool, run_instructions_present: bool, + security_high_alerts: int = 0, + security_critical_alerts: int = 0, ) -> dict[str, Any]: # Short-circuit deferred: archived or archive-path if registry_status == "archived" or operating_path == "archive": @@ -91,9 +96,20 @@ def build_risk_entry( if criticality in {"high", "critical"} and not known_risks_present: factors.append("undocumented-risks") + # Active repo carrying open high- or critical-severity Dependabot alerts. + # High alerts contribute one normal factor toward the 3+ elevation threshold; + # an open critical alert force-elevates on its own (see is_elevated below) — a + # lone unpatched critical CVE cannot hide in an otherwise-clean repo. + active = activity_status in ACTIVE_STATUSES + if active and (security_high_alerts > 0 or security_critical_alerts > 0): + factors.append("active-high-severity-alerts") + # Derive tier - is_elevated = len(factors) >= 3 or ( - "weak-context-active" in factors and "investigate-override" in factors + security_forces_elevated = active and security_critical_alerts > 0 + is_elevated = ( + len(factors) >= 3 + or ("weak-context-active" in factors and "investigate-override" in factors) + or security_forces_elevated ) if is_elevated: risk_tier = "elevated" @@ -106,6 +122,7 @@ def build_risk_entry( doctor_gap = "missing-doctor-standard" in factors context_risk = "weak-context-active" in factors path_risk = "investigate-override" in factors or "missing-operating-path" in factors + security_risk = "active-high-severity-alerts" in factors # Build summary if not factors: @@ -121,6 +138,7 @@ def build_risk_entry( "doctor_gap": doctor_gap, "context_risk": context_risk, "path_risk": path_risk, + "security_risk": security_risk, } diff --git a/src/portfolio_truth_publish.py b/src/portfolio_truth_publish.py index 337a3d2..ac60051 100644 --- a/src/portfolio_truth_publish.py +++ b/src/portfolio_truth_publish.py @@ -36,6 +36,7 @@ def publish_portfolio_truth( legacy_registry_path: Path | None = None, include_notion: bool = True, release_count_by_name: dict[str, int] | None = None, + security_alerts_by_name: dict[str, dict] | None = None, ) -> PortfolioTruthPublishResult: validate_publish_targets( workspace_root=workspace_root, @@ -49,6 +50,7 @@ def publish_portfolio_truth( legacy_registry_path=legacy_registry_path, include_notion=include_notion, release_count_by_name=release_count_by_name, + security_alerts_by_name=security_alerts_by_name, ) validate_truth_snapshot(build_result.snapshot) diff --git a/src/portfolio_truth_reconcile.py b/src/portfolio_truth_reconcile.py index 743b652..d98ed29 100644 --- a/src/portfolio_truth_reconcile.py +++ b/src/portfolio_truth_reconcile.py @@ -28,6 +28,7 @@ PortfolioTruthProject, PortfolioTruthSnapshot, RiskFields, + SecurityFields, ) from src.registry_parser import _normalize @@ -180,6 +181,7 @@ def build_portfolio_truth_snapshot( include_notion: bool = True, now: datetime | None = None, release_count_by_name: dict[str, int] | None = None, + security_alerts_by_name: dict[str, dict] | None = None, ) -> PortfolioTruthBuildResult: now = now or datetime.now(timezone.utc) catalog_data = load_portfolio_catalog(catalog_path) @@ -199,6 +201,7 @@ def build_portfolio_truth_snapshot( notion_context=notion_context, now=now, release_count_by_name=release_count_by_name, + security_alerts_by_name=security_alerts_by_name, ) for raw_project in workspace_projects ] @@ -259,21 +262,55 @@ def _unresolved_duplicate_display_names(projects: list[PortfolioTruthProject]) - return sorted( name for name, members in grouped.items() - if len(members) > 1 - and any(not _has_path_catalog_contract(project) for project in members) + if len(members) > 1 and any(not _has_path_catalog_contract(project) for project in members) ) def _has_path_catalog_contract(project: PortfolioTruthProject) -> bool: for source in project.provenance.values(): - if ( - source.get("source") == "catalog_repo" - and source.get("detail") == project.identity.path - ): + if source.get("source") == "catalog_repo" and source.get("detail") == project.identity.path: return True return False +def _build_security_fields(ghas_entry: dict[str, Any] | None) -> SecurityFields: + """Map a per-repo GHAS alert entry (from output/ghas-alerts--*.json) + into SecurityFields. A missing/None entry yields all-zero counts with + alerts_available=False (the repo was not scanned) — distinct from a clean scan, + and keeps the security overlay strictly opt-in (no entry → no security signal).""" + if not ghas_entry: + return SecurityFields() + dependabot = ghas_entry.get("dependabot") or {} + code_scanning = ghas_entry.get("code_scanning") or {} + secret_scanning = ghas_entry.get("secret_scanning") or {} + + def _count(source: dict[str, Any], key: str) -> int: + value = source.get(key) + return value if isinstance(value, int) and value >= 0 else 0 + + return SecurityFields( + alerts_available=bool(dependabot.get("available", False)), + dependabot_critical=_count(dependabot, "critical"), + dependabot_high=_count(dependabot, "high"), + dependabot_medium=_count(dependabot, "medium"), + dependabot_low=_count(dependabot, "low"), + code_scanning_critical=_count(code_scanning, "critical"), + code_scanning_high=_count(code_scanning, "high"), + secret_scanning_open=_count(secret_scanning, "open"), + ) + + +def _select_security_entry( + lookup: dict[str, dict], repo_full_name: str | None, display_name: str +) -> dict | None: + """Join a project to its GHAS overlay entry. The overlay is keyed by GitHub repo + name, but the local dir display_name often differs (e.g. "Signal & Noise" vs + "signal-noise"), so match on the repo name from repo_full_name first and fall back + to display_name only when repo_full_name is absent or unmatched.""" + repo_name = (repo_full_name or "").rsplit("/", 1)[-1] + return lookup.get(repo_name) or lookup.get(display_name) + + def _build_truth_project( raw_project: dict[str, Any], *, @@ -282,6 +319,7 @@ def _build_truth_project( notion_context: dict[str, dict[str, str]], now: datetime, release_count_by_name: dict[str, int] | None = None, + security_alerts_by_name: dict[str, dict] | None = None, ) -> PortfolioTruthProject: relative_path = raw_project["path"] group_entry = group_entry_for_path(relative_path, catalog_data) @@ -393,6 +431,16 @@ def _build_truth_project( "detail": "derived", } + security_entry = _select_security_entry( + security_alerts_by_name or {}, + raw_project.get("repo_full_name"), + raw_project["name"], + ) + security = _build_security_fields(security_entry) + + # Only Dependabot high/critical counts drive the risk tier today. Code-scanning + # and secret-scanning counts are captured in SecurityFields for visibility but do + # not yet feed the active-high-severity-alerts factor (Dependabot-only scope). risk_entry = build_risk_entry( display_name=raw_project["name"], operating_path=path_entry.get("operating_path", ""), @@ -405,6 +453,8 @@ def _build_truth_project( doctor_standard=declared_values["doctor_standard"], known_risks_present=bool(raw_project["known_risks_present"]), run_instructions_present=bool(raw_project["run_instructions_present"]), + security_high_alerts=security.dependabot_high, + security_critical_alerts=security.dependabot_critical, ) declared = DeclaredFields( @@ -516,6 +566,7 @@ def _build_truth_project( doctor_gap=risk_entry["doctor_gap"], context_risk=risk_entry["context_risk"], path_risk=risk_entry["path_risk"], + security_risk=risk_entry["security_risk"], ) provenance["risk.risk_tier"] = {"source": "derived", "detail": risk_entry["risk_tier"]} provenance["risk.doctor_gap"] = { @@ -527,6 +578,7 @@ def _build_truth_project( declared=declared, derived=derived, risk=risk, + security=security, advisory=advisory, provenance=provenance, warnings=warnings, diff --git a/src/portfolio_truth_types.py b/src/portfolio_truth_types.py index dc33438..40b1aca 100644 --- a/src/portfolio_truth_types.py +++ b/src/portfolio_truth_types.py @@ -5,7 +5,7 @@ from datetime import datetime from typing import Any -SCHEMA_VERSION = "0.4.0" +SCHEMA_VERSION = "0.5.0" VALID_CONTEXT_QUALITY = {"full", "standard", "minimum-viable", "boilerplate", "none"} VALID_ACTIVITY_STATUS = {"active", "recent", "stale", "archived"} @@ -124,6 +124,32 @@ class RiskFields: doctor_gap: bool = False context_risk: bool = False path_risk: bool = False + security_risk: bool = False + + def to_dict(self) -> dict[str, Any]: + return dataclasses.asdict(self) + + +@dataclass(frozen=True) +class SecurityFields: + """Live GitHub Advanced Security alert counts, overlaid opt-in from the latest + output/ghas-alerts--*.json. When alerts_available is False the repo was + not scanned (no token / GHAS disabled / not fetched) — distinct from a clean scan + with zero open alerts, so consumers don't mislabel an unscanned repo as secure.""" + + alerts_available: bool = False + dependabot_critical: int = 0 + dependabot_high: int = 0 + dependabot_medium: int = 0 + dependabot_low: int = 0 + code_scanning_critical: int = 0 + code_scanning_high: int = 0 + secret_scanning_open: int = 0 + + @property + def open_high_critical(self) -> int: + """Dependabot high + critical — the security-risk-factor trigger surface.""" + return self.dependabot_high + self.dependabot_critical def to_dict(self) -> dict[str, Any]: return dataclasses.asdict(self) @@ -135,6 +161,7 @@ class PortfolioTruthProject: declared: DeclaredFields derived: DerivedFields risk: RiskFields = field(default_factory=RiskFields) + security: SecurityFields = field(default_factory=SecurityFields) advisory: AdvisoryFields = field(default_factory=AdvisoryFields) provenance: dict[str, dict[str, str]] = field(default_factory=dict) warnings: list[str] = field(default_factory=list) @@ -145,6 +172,7 @@ def to_dict(self) -> dict[str, Any]: "declared": self.declared.to_dict(), "derived": self.derived.to_dict(), "risk": self.risk.to_dict(), + "security": self.security.to_dict(), "advisory": self.advisory.to_dict(), "provenance": self.provenance, "warnings": list(self.warnings), diff --git a/src/weekly_command_center.py b/src/weekly_command_center.py index fecdfb0..912e65b 100644 --- a/src/weekly_command_center.py +++ b/src/weekly_command_center.py @@ -12,6 +12,7 @@ MAX_PATH_ATTENTION_ITEMS = 5 MAX_REPO_BRIEFINGS = 3 MAX_RISK_ATTENTION_ITEMS = 5 +MAX_SECURITY_ATTENTION_ITEMS = 5 def _safe_text(value: Any) -> str: @@ -94,6 +95,10 @@ def build_weekly_command_center_digest( "risk_tier_counts": truth_summary.get("risk_tier_counts", {}), "top_elevated": _build_risk_attention_items(truth), }, + "security_posture": { + **_build_security_summary(truth), + "top_alerts": _build_security_attention_items(truth), + }, "section_digest": _build_section_digest(weekly_story), "top_repo_briefings": [ { @@ -119,6 +124,7 @@ def render_weekly_command_center_markdown(digest: dict[str, Any]) -> str: portfolio_truth = _mapping(digest.get("portfolio_truth")) risk_posture = _mapping(digest.get("risk_posture")) tier_counts = _mapping(risk_posture.get("risk_tier_counts")) + security_posture = _mapping(digest.get("security_posture")) lines = [ f"# Weekly Command Center: {_safe_text(digest.get('username')) or 'unknown'}", "", @@ -133,6 +139,7 @@ def render_weekly_command_center_markdown(digest: dict[str, Any]) -> str: f"- Operating Paths: {_safe_text(digest.get('operating_paths_summary')) or 'No operating-path summary is recorded yet.'}", f"- Portfolio Truth: {portfolio_truth.get('project_count', 0)} projects, {portfolio_truth.get('active_project_count', 0)} active, {portfolio_truth.get('investigate_override_count', 0)} with investigate override, {portfolio_truth.get('low_confidence_path_count', 0)} low-confidence paths", f"- Risk Posture: {risk_posture.get('elevated_count', 0)} elevated, {tier_counts.get('moderate', 0)} moderate, {tier_counts.get('baseline', 0)} baseline", + f"- Security Posture: {security_posture.get('scanned_count', 0)} scanned, {security_posture.get('repos_with_open_high_critical', 0)} with open high/critical Dependabot alerts ({security_posture.get('total_open_critical', 0)} critical, {security_posture.get('total_open_high', 0)} high)", "", "## Path Attention", ] @@ -154,6 +161,26 @@ def render_weekly_command_center_markdown(digest: dict[str, Any]) -> str: for item in risk_items: lines.append(f"- **{item['repo']}** [{item['risk_tier']}]: {item['risk_summary']}") + lines.extend(["", "## Security Posture"]) + security_items = list(security_posture.get("top_alerts") or []) + scanned_count = int(security_posture.get("scanned_count", 0) or 0) + if security_items: + for item in security_items: + lines.append( + f"- **{item['repo']}** [{item['risk_tier']}]: " + f"{item['dependabot_critical']} critical, {item['dependabot_high']} high " + "open Dependabot alerts" + ) + elif scanned_count > 0: + lines.append( + f"- All {scanned_count} scanned repos are clear of open high/critical Dependabot alerts." + ) + else: + lines.append( + "- Security overlay not run for this snapshot " + "(re-run with `--portfolio-truth-include-security`)." + ) + lines.extend(["", "## Weekly Sections"]) for section in list(digest.get("section_digest") or []): lines.append( @@ -303,6 +330,64 @@ def _build_risk_attention_items(portfolio_truth: dict[str, Any]) -> list[dict[st return items[:MAX_RISK_ATTENTION_ITEMS] +def _build_security_summary(portfolio_truth: dict[str, Any]) -> dict[str, Any]: + """Aggregate the opt-in security overlay across scanned repos. scanned_count is + repos with alerts_available=True (the security overlay ran for them); a scanned + repo with zero open alerts is genuinely clear, distinct from an unscanned one.""" + projects = list(portfolio_truth.get("projects") or []) + scanned = 0 + repos_with_open = 0 + total_critical = 0 + total_high = 0 + for project in projects: + security = _mapping(project.get("security")) + if not security.get("alerts_available"): + continue + scanned += 1 + critical = int(security.get("dependabot_critical") or 0) + high = int(security.get("dependabot_high") or 0) + total_critical += critical + total_high += high + if critical > 0 or high > 0: + repos_with_open += 1 + return { + "scanned_count": scanned, + "repos_with_open_high_critical": repos_with_open, + "total_open_critical": total_critical, + "total_open_high": total_high, + } + + +def _build_security_attention_items(portfolio_truth: dict[str, Any]) -> list[dict[str, Any]]: + """Top scanned repos carrying open high/critical Dependabot alerts, critical-first.""" + projects = list(portfolio_truth.get("projects") or []) + items: list[dict[str, Any]] = [] + for project in projects: + security = _mapping(project.get("security")) + if not security.get("alerts_available"): + continue + critical = int(security.get("dependabot_critical") or 0) + high = int(security.get("dependabot_high") or 0) + if critical <= 0 and high <= 0: + continue + identity = _mapping(project.get("identity")) + risk = _mapping(project.get("risk")) + repo = _safe_text(identity.get("display_name")) + items.append( + { + "repo": repo, + "dependabot_critical": critical, + "dependabot_high": high, + "risk_tier": _safe_text(risk.get("risk_tier")) or "baseline", + "_sort_key": (-critical, -high, repo), + } + ) + items.sort(key=lambda x: x["_sort_key"]) + for item in items: + del item["_sort_key"] + return items[:MAX_SECURITY_ATTENTION_ITEMS] + + def _build_section_digest(weekly_story: dict[str, Any]) -> list[dict[str, Any]]: sections = list(weekly_story.get("sections") or []) return [ diff --git a/tests/test_portfolio_risk.py b/tests/test_portfolio_risk.py index f1f7275..6cdd12e 100644 --- a/tests/test_portfolio_risk.py +++ b/tests/test_portfolio_risk.py @@ -124,6 +124,80 @@ def test_doctor_gap_false_for_non_strategic(): assert "missing-doctor-standard" not in result["risk_factors"] +def test_security_high_alert_adds_single_factor_moderate(): + # An open high-severity Dependabot alert on an active, otherwise-healthy repo + # contributes exactly one factor — moderate, not elevated, on its own. + result = build_risk_entry(**_baseline_kwargs(security_high_alerts=2)) + assert result["risk_tier"] == "moderate" + assert result["risk_factors"] == ["active-high-severity-alerts"] + assert result["security_risk"] is True + assert "open high/critical security alerts" in result["risk_summary"] + + +def test_security_critical_alert_force_elevates(): + # A lone open critical alert force-elevates even on an otherwise-clean repo — + # an unpatched critical CVE cannot hide behind good context/path hygiene. + result = build_risk_entry(**_baseline_kwargs(security_critical_alerts=1)) + assert result["risk_tier"] == "elevated" + assert "active-high-severity-alerts" in result["risk_factors"] + assert result["security_risk"] is True + + +def test_security_no_alerts_leaves_security_risk_false(): + result = build_risk_entry(**_baseline_kwargs()) + assert result["security_risk"] is False + assert "active-high-severity-alerts" not in result["risk_factors"] + + +def test_security_alerts_ignored_when_not_active(): + # Alerts on a stale (non-active) repo on the maintain path do not fire the + # factor or force elevation — the factor is gated on active status, like the others. + result = build_risk_entry( + **_baseline_kwargs( + activity_status="stale", + operating_path="maintain", + security_critical_alerts=3, + security_high_alerts=5, + ) + ) + assert result["risk_tier"] != "elevated" + assert result["security_risk"] is False + assert "active-high-severity-alerts" not in result["risk_factors"] + + +def test_security_alerts_do_not_override_deferred_short_circuit(): + # A stale, non-maintain repo short-circuits to deferred BEFORE any factor is + # evaluated — even open critical alerts cannot pull it out of deferred, and the + # deferred constant carries security_risk=False. + result = build_risk_entry( + **_baseline_kwargs( + activity_status="stale", + operating_path="experiment", + security_critical_alerts=4, + security_high_alerts=2, + ) + ) + assert result["risk_tier"] == "deferred" + assert result["security_risk"] is False + assert result["risk_factors"] == [] + + +def test_security_high_alert_stacks_toward_three_factor_elevation(): + # A high alert counts toward the existing 3+ = elevated threshold alongside + # weak-context-active and no-run-instructions. + result = build_risk_entry( + **_baseline_kwargs( + context_quality="boilerplate", + run_instructions_present=False, + activity_status="active", + security_high_alerts=1, + ) + ) + assert result["risk_tier"] == "elevated" + assert "active-high-severity-alerts" in result["risk_factors"] + assert len(result["risk_factors"]) >= 3 + + def test_portfolio_risk_summary_aggregates_tiers(): projects = [ {"risk": {"risk_tier": "elevated"}}, diff --git a/tests/test_portfolio_truth.py b/tests/test_portfolio_truth.py index 89250b8..da82578 100644 --- a/tests/test_portfolio_truth.py +++ b/tests/test_portfolio_truth.py @@ -201,7 +201,151 @@ def test_truth_snapshot_respects_declared_and_derived_fields( assert gamma.identity.section_marker == "iOS Projects" assert gamma.derived.stack == ["Swift"] - assert result.snapshot.schema_version == "0.4.0" + assert result.snapshot.schema_version == "0.5.0" + + +def test_build_security_fields_maps_ghas_entry() -> None: + from src.portfolio_truth_reconcile import _build_security_fields + + fields = _build_security_fields( + { + "dependabot": {"critical": 2, "high": 3, "medium": 4, "low": 5, "available": True}, + "code_scanning": {"critical": 1, "high": 6, "available": True}, + "secret_scanning": {"open": 7, "available": True}, + } + ) + assert fields.alerts_available is True + assert fields.dependabot_critical == 2 + assert fields.dependabot_high == 3 + assert fields.dependabot_medium == 4 + assert fields.dependabot_low == 5 + assert fields.code_scanning_critical == 1 + assert fields.code_scanning_high == 6 + assert fields.secret_scanning_open == 7 + assert fields.open_high_critical == 5 + + +def test_build_security_fields_none_is_unscanned() -> None: + from src.portfolio_truth_reconcile import _build_security_fields + + fields = _build_security_fields(None) + assert fields.alerts_available is False + assert fields.open_high_critical == 0 + assert fields.dependabot_critical == 0 + + +def test_build_security_fields_unavailable_dependabot_is_not_available() -> None: + from src.portfolio_truth_reconcile import _build_security_fields + + fields = _build_security_fields( + {"dependabot": {"available": False}, "secret_scanning": {"open": 0, "available": False}} + ) + assert fields.alerts_available is False + assert fields.dependabot_high == 0 + + +def test_build_security_fields_scanned_clean_is_available_with_zero_counts() -> None: + # A repo whose Dependabot scan succeeded with zero open alerts must read as + # scanned-and-clean (available=True), distinct from an unscanned repo. + from src.portfolio_truth_reconcile import _build_security_fields + + fields = _build_security_fields({"dependabot": {"available": True}}) + assert fields.alerts_available is True + assert fields.dependabot_high == 0 + assert fields.dependabot_critical == 0 + assert fields.open_high_critical == 0 + + +def test_security_overlay_populates_and_force_elevates( + portfolio_workspace: Path, + portfolio_catalog: Path, + legacy_registry: Path, +) -> None: + now = datetime.fromtimestamp(1_700_200_000, tz=timezone.utc) + security = { + "Alpha": { + "dependabot": {"critical": 1, "high": 0, "medium": 0, "low": 2, "available": True}, + "code_scanning": {"critical": 0, "high": 0, "available": True}, + "secret_scanning": {"open": 0, "available": True}, + } + } + result = build_portfolio_truth_snapshot( + workspace_root=portfolio_workspace, + catalog_path=portfolio_catalog, + legacy_registry_path=legacy_registry, + include_notion=False, + now=now, + security_alerts_by_name=security, + ) + projects = {p.identity.display_name: p for p in result.snapshot.projects} + alpha = projects["Alpha"] + assert alpha.security.alerts_available is True + assert alpha.security.dependabot_critical == 1 + assert alpha.risk.security_risk is True + assert alpha.risk.risk_tier == "elevated" + assert "active-high-severity-alerts" in alpha.risk.risk_factors + + # A repo with no security entry stays unscanned (overlay is strictly opt-in). + calibrate = projects["Calibrate"] + assert calibrate.security.alerts_available is False + assert calibrate.security.dependabot_critical == 0 + assert calibrate.risk.security_risk is False + + # Serialized snapshot carries the security block. + alpha_dict = alpha.to_dict() + assert "security" in alpha_dict + assert alpha_dict["security"]["dependabot_critical"] == 1 + + +def test_security_overlay_absent_leaves_repos_unscanned( + portfolio_workspace: Path, + portfolio_catalog: Path, + legacy_registry: Path, +) -> None: + now = datetime.fromtimestamp(1_700_200_000, tz=timezone.utc) + result = build_portfolio_truth_snapshot( + workspace_root=portfolio_workspace, + catalog_path=portfolio_catalog, + legacy_registry_path=legacy_registry, + include_notion=False, + now=now, + ) + for project in result.snapshot.projects: + assert project.security.alerts_available is False + assert project.security.open_high_critical == 0 + assert project.risk.security_risk is False + + +def test_select_security_entry_joins_by_repo_name_when_display_differs() -> None: + # GHAS is keyed by repo name ("signal-noise"); the local dir is "Signal & Noise". + from src.portfolio_truth_reconcile import _select_security_entry + + entry = {"dependabot": {"high": 9, "available": True}} + lookup = {"signal-noise": entry} + assert _select_security_entry(lookup, "saagpatel/signal-noise", "Signal & Noise") is entry + + +def test_select_security_entry_falls_back_to_display_name() -> None: + from src.portfolio_truth_reconcile import _select_security_entry + + entry = {"dependabot": {"high": 1, "available": True}} + # No repo_full_name (local-only repo) → must fall back to display_name. + assert _select_security_entry({"Alpha": entry}, None, "Alpha") is entry + + +def test_select_security_entry_prefers_repo_name_over_display() -> None: + from src.portfolio_truth_reconcile import _select_security_entry + + by_repo = {"dependabot": {"high": 2, "available": True}} + by_display = {"dependabot": {"high": 5, "available": True}} + lookup = {"the-repo": by_repo, "DisplayName": by_display} + assert _select_security_entry(lookup, "owner/the-repo", "DisplayName") is by_repo + + +def test_select_security_entry_returns_none_when_unmatched() -> None: + from src.portfolio_truth_reconcile import _select_security_entry + + assert _select_security_entry({"other": {}}, "owner/missing", "AlsoMissing") is None def test_truth_snapshot_matches_repo_contracts_by_full_name( diff --git a/tests/test_portfolio_truth_strict_signals.py b/tests/test_portfolio_truth_strict_signals.py index ede513d..ae251de 100644 --- a/tests/test_portfolio_truth_strict_signals.py +++ b/tests/test_portfolio_truth_strict_signals.py @@ -237,3 +237,64 @@ def test_release_count_absent_for_missing_project(tmp_path: Path) -> None: assert result is not None assert "UnknownRepo" not in result assert result.get("KnownRepo") == 5 + + +def _make_ghas_json(tmp_path: Path, *, username: str, entries: dict) -> Path: + """Write a minimal output/ghas-alerts--.json fixture.""" + path = tmp_path / f"ghas-alerts-{username}-2026-05-31.json" + path.write_text(json.dumps(entries, indent=2)) + return path + + +def test_security_alerts_loaded_from_ghas_json(tmp_path: Path) -> None: + """--portfolio-truth-include-security with a valid GHAS JSON → name-keyed dict.""" + from src.cli import _load_security_alerts_by_name + + _make_ghas_json( + tmp_path, + username="saagpatel", + entries={ + "MyRepo": { + "dependabot": {"critical": 1, "high": 2, "available": True}, + "code_scanning": {"critical": 0, "high": 0, "available": True}, + "secret_scanning": {"open": 0, "available": True}, + } + }, + ) + result = _load_security_alerts_by_name(output_dir=tmp_path, username="saagpatel") + assert result is not None + assert result["MyRepo"]["dependabot"]["critical"] == 1 + + +def test_security_alerts_no_ghas_json_returns_none( + tmp_path: Path, caplog: pytest.LogCaptureFixture +) -> None: + """--portfolio-truth-include-security with no GHAS JSON → None, warning logged.""" + from src.cli import _load_security_alerts_by_name + + with caplog.at_level(logging.WARNING): + result = _load_security_alerts_by_name(output_dir=tmp_path, username="saagpatel") + + assert result is None + assert any("audit report --ghas-alerts" in record.message for record in caplog.records) + + +def test_security_alerts_picks_latest_by_mtime(tmp_path: Path) -> None: + """When multiple GHAS files exist, the most recently modified one wins.""" + from src.cli import _load_security_alerts_by_name + + older = tmp_path / "ghas-alerts-saagpatel-2026-05-01.json" + older.write_text(json.dumps({"MyRepo": {"dependabot": {"high": 9, "available": True}}})) + import os + + os.utime(older, (1_690_000_000, 1_690_000_000)) + newer = _make_ghas_json( + tmp_path, + username="saagpatel", + entries={"MyRepo": {"dependabot": {"high": 1, "available": True}}}, + ) + os.utime(newer, (1_700_000_000, 1_700_000_000)) + + result = _load_security_alerts_by_name(output_dir=tmp_path, username="saagpatel") + assert result is not None + assert result["MyRepo"]["dependabot"]["high"] == 1 diff --git a/tests/test_weekly_command_center.py b/tests/test_weekly_command_center.py index d24ac59..4d74960 100644 --- a/tests/test_weekly_command_center.py +++ b/tests/test_weekly_command_center.py @@ -131,3 +131,122 @@ def test_build_weekly_command_center_digest_surfaces_truth_and_guardrails() -> N assert "## Risk Posture" in rendered_md assert "GithubRepoAuditor" in rendered_md assert "JobCommandCenter" in rendered_md + + +def _sec(available: bool, critical: int = 0, high: int = 0) -> dict: + return { + "alerts_available": available, + "dependabot_critical": critical, + "dependabot_high": high, + "dependabot_medium": 0, + "dependabot_low": 0, + "code_scanning_critical": 0, + "code_scanning_high": 0, + "secret_scanning_open": 0, + } + + +def _security_project(name: str, tier: str, security: dict, factors: list | None = None) -> dict: + return { + "identity": {"display_name": name}, + "declared": {"operating_path": "maintain"}, + "derived": { + "registry_status": "active", + "activity_status": "active", + "path_override": "", + "path_confidence": "high", + "context_quality": "standard", + }, + "risk": { + "risk_tier": tier, + "risk_factors": factors or [], + "risk_summary": f"{name} risk.", + "doctor_gap": False, + "context_risk": False, + "path_risk": False, + "security_risk": bool( + security.get("dependabot_high") or security.get("dependabot_critical") + ), + }, + "security": security, + } + + +def _digest_for(portfolio_truth: dict) -> dict: + report_data = { + "username": "testuser", + "generated_at": "2026-04-14T12:00:00+00:00", + "operator_summary": {"decision_quality_v1": {}}, + "audits": [], + } + snapshot = {"operator_summary": report_data["operator_summary"], "operator_queue": []} + return build_weekly_command_center_digest( + report_data, + snapshot, + portfolio_truth=portfolio_truth, + generated_at="2026-04-14T12:00:00+00:00", + ) + + +def test_security_posture_surfaces_open_alerts_critical_first() -> None: + portfolio_truth = { + "projects": [ + _security_project( + "CriticalRepo", + "elevated", + _sec(True, critical=2, high=1), + ["active-high-severity-alerts"], + ), + _security_project( + "HighRepo", + "moderate", + _sec(True, critical=0, high=3), + ["active-high-severity-alerts"], + ), + _security_project("CleanRepo", "baseline", _sec(True, 0, 0)), + _security_project("UnscannedRepo", "baseline", _sec(False, 0, 0)), + ] + } + digest = _digest_for(portfolio_truth) + posture = digest["security_posture"] + + # Only repos with alerts_available are scanned; UnscannedRepo is excluded. + assert posture["scanned_count"] == 3 + assert posture["repos_with_open_high_critical"] == 2 + assert posture["total_open_critical"] == 2 + assert posture["total_open_high"] == 4 + + top = posture["top_alerts"] + assert [item["repo"] for item in top] == ["CriticalRepo", "HighRepo"] + assert top[0]["dependabot_critical"] == 2 + + rendered = render_weekly_command_center_markdown(digest) + assert "## Security Posture" in rendered + assert "CriticalRepo" in rendered + assert "2 critical, 1 high" in rendered + + +def test_security_posture_reports_clean_when_scanned_and_no_open_alerts() -> None: + portfolio_truth = { + "projects": [ + _security_project("CleanA", "baseline", _sec(True, 0, 0)), + _security_project("CleanB", "baseline", _sec(True, 0, 0)), + ] + } + digest = _digest_for(portfolio_truth) + assert digest["security_posture"]["scanned_count"] == 2 + assert digest["security_posture"]["top_alerts"] == [] + + rendered = render_weekly_command_center_markdown(digest) + assert "All 2 scanned repos are clear" in rendered + + +def test_security_posture_reports_not_run_when_no_overlay() -> None: + # The existing fixture has no security blocks → overlay was not run. + digest = _digest_for(_make_portfolio_truth()) + assert digest["security_posture"]["scanned_count"] == 0 + assert digest["security_posture"]["top_alerts"] == [] + + rendered = render_weekly_command_center_markdown(digest) + assert "## Security Posture" in rendered + assert "Security overlay not run" in rendered