Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions iamscope/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1424,6 +1424,10 @@ def _run_reasoners_and_emit(
time.gmtime(),
),
reasoning_duration_seconds=reasoning_duration,
collection_context_source={
"collection_failures": [failure.to_dict() for failure in result.collection_failures],
"policy_parse_failures": [failure.to_dict() for failure in result.policy_parse_failures],
},
)

return (
Expand Down
123 changes: 122 additions & 1 deletion iamscope/output/findings_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

from __future__ import annotations

import json
import re
from typing import Any

from iamscope.constants import ID_ALGORITHM, JSON_TRAILING_NEWLINE
Expand Down Expand Up @@ -95,6 +97,7 @@ def emit_findings(
reasoning_timestamp: str = "",
reasoning_duration_seconds: float = 0.0,
source_tool_version: str = DEFAULT_SOURCE_TOOL_VERSION,
collection_context_source: dict[str, Any] | None = None,
) -> tuple[bytes, str]:
"""Emit findings.json as canonical bytes with deterministic hash.

Expand Down Expand Up @@ -125,6 +128,10 @@ def emit_findings(
source_tool_version: Override for the iamscope version string
written to top-level `source_tool_version` and
`metadata.collector_version`.
collection_context_source: Optional scenario/PipelineResult metadata
carrying `collection_failures` and `policy_parse_failures`.
The emitter maps these records onto each finding without
changing verdict semantics.

Returns:
Tuple of (findings_json_bytes, canonical_hash_hex). The bytes
Expand Down Expand Up @@ -154,7 +161,7 @@ def emit_findings(
# property compute on each Finding, populating the cache. Subsequent
# calls to `f.finding_id` (e.g., for the verdict breakdown) hit the
# cache.
finding_dicts = [_finding_to_dict(f, reasoners_used) for f in sorted_findings]
finding_dicts = [_finding_to_dict(f, reasoners_used, collection_context_source) for f in sorted_findings]

# Verdict breakdown — always populated with all four enum values,
# zero entries for verdicts not present. This makes the field shape
Expand Down Expand Up @@ -242,6 +249,7 @@ def _compute_verdict_breakdown(findings: list[Finding]) -> dict[str, int]:
def _finding_to_dict(
finding: Finding,
reasoners_used: dict[str, dict[str, str]],
collection_context_source: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Convert a Finding to its §3.6 JSON dict shape.

Expand All @@ -264,6 +272,7 @@ def _finding_to_dict(
return {
"assumptions": [_assumption_to_dict(a) for a in finding.assumptions],
"blockers_observed": [_blocker_to_dict(b) for b in finding.blockers_observed],
"collection_context": _collection_context_for_finding(finding, collection_context_source),
"evidence": _evidence_to_dict(finding.evidence),
"finding_id": finding.finding_id,
"finding_key": finding.finding_key,
Expand All @@ -281,6 +290,118 @@ def _finding_to_dict(
}


def _collection_context_for_finding(
finding: Finding,
collection_context_source: dict[str, Any] | None,
) -> dict[str, Any]:
"""Build deterministic per-finding partial-collection context.

Reasoners should not know about collection coverage. The emitter has
enough presentation context to attach scenario-level collection and
parse failures to each finding by source/target account or exact
source/target ARN while preserving verdict semantics.
"""
source = collection_context_source or {}
collection_failures = _normalise_failure_records(source.get("collection_failures", []))
policy_parse_failures = _normalise_failure_records(source.get("policy_parse_failures", []))
finding_accounts = sorted(
{
account
for account in (
_extract_aws_account_id(finding.source.provider_id),
_extract_aws_account_id(finding.target.provider_id),
)
if account
}
)
finding_provider_ids = {finding.source.provider_id, finding.target.provider_id}

related_collection_failures = [
failure for failure in collection_failures if str(failure.get("account_id", "")) in finding_accounts
]
related_policy_parse_failures: list[dict[str, Any]] = []
account_level_policy_match = False
for failure in policy_parse_failures:
source_arn = str(failure.get("source_arn", ""))
failure_account = _extract_aws_account_id(source_arn) or _extract_aws_account_id(
str(failure.get("policy_arn", ""))
)
if source_arn in finding_provider_ids:
related_policy_parse_failures.append(failure)
elif failure_account and failure_account in finding_accounts:
related_policy_parse_failures.append(failure)
account_level_policy_match = True

related_collection_failures = _sort_failure_records(related_collection_failures)
related_policy_parse_failures = _sort_failure_records(related_policy_parse_failures)

coverage_notes: list[str] = []
if collection_failures and not related_collection_failures:
coverage_notes.append("collection was partial; no direct account match found for this finding")
if policy_parse_failures and not related_policy_parse_failures:
coverage_notes.append(
"policy parsing was partial; no direct source/target account match found for this finding"
)
if account_level_policy_match:
coverage_notes.append("policy parse failure related by account, not exact source/target ARN")

affected_accounts = sorted(
{
str(failure.get("account_id", ""))
for failure in related_collection_failures
if str(failure.get("account_id", ""))
}
| {
account
for failure in related_policy_parse_failures
for account in (
_extract_aws_account_id(str(failure.get("source_arn", ""))),
_extract_aws_account_id(str(failure.get("policy_arn", ""))),
)
if account
}
)

has_collection_failures = bool(collection_failures)
has_policy_parse_failures = bool(policy_parse_failures)
return {
"affected_accounts": affected_accounts,
"coverage_notes": sorted(set(coverage_notes)),
"graph_collection_complete": not (has_collection_failures or has_policy_parse_failures),
"has_collection_failures": has_collection_failures,
"has_policy_parse_failures": has_policy_parse_failures,
"related_collection_failures": related_collection_failures,
"related_policy_parse_failures": related_policy_parse_failures,
}


def _normalise_failure_records(value: Any) -> list[dict[str, Any]]:
if not isinstance(value, list):
return []
records: list[dict[str, Any]] = []
for item in value:
if isinstance(item, dict):
records.append({str(key): _safe_context_value(val) for key, val in item.items()})
return _sort_failure_records(records)


def _sort_failure_records(records: list[dict[str, Any]]) -> list[dict[str, Any]]:
return sorted(records, key=lambda record: json.dumps(record, sort_keys=True, separators=(",", ":")))


def _safe_context_value(value: Any) -> Any:
if isinstance(value, str):
return value.replace("\r", " ").replace("\n", " ")
if value is None or isinstance(value, (bool, int, float)):
return value
return str(value).replace("\r", " ").replace("\n", " ")


def _extract_aws_account_id(provider_id: str) -> str:
match = re.match(r"^arn:[^:]*:[^:]*::([0-9]{12}):", provider_id)
return match.group(1) if match else ""


def _check_to_dict(check: Check) -> dict[str, Any]:
"""Convert a Check to its §3.6 dict shape.

Expand Down
3 changes: 3 additions & 0 deletions iamscope/reasoner/replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def run_reasoners_on_frozen_artifacts(
binding_metadata_path=binding_metadata_path,
probe_overlay_path=probe_overlay_path,
)
scenario = json.loads(Path(scenario_path).read_bytes())
scenario_metadata = dict(scenario.get("metadata", {}))
registry = Registry()
for instance in reasoner_instances:
registry.register(instance)
Expand Down Expand Up @@ -117,6 +119,7 @@ def run_reasoners_on_frozen_artifacts(
reasoners_skipped=reasoners_skipped or None,
reasoning_timestamp=reasoning_timestamp,
reasoning_duration_seconds=duration,
collection_context_source=scenario_metadata,
)
return ReplayResult(
findings=tuple(findings),
Expand Down

Large diffs are not rendered by default.

Loading