From e17b1a2e3e7fa8d364c2782886f52d7b2c8fe3db Mon Sep 17 00:00:00 2001 From: Eric Conklin Date: Fri, 5 Jun 2026 16:47:04 -0500 Subject: [PATCH] Respect unknown org membership in cross-account trust --- iamscope/reasoner/cross_account_trust.py | 80 +++++++- ...est_cross_account_org_membership_status.py | 176 ++++++++++++++++++ 2 files changed, 249 insertions(+), 7 deletions(-) create mode 100644 tests/integration/test_cross_account_org_membership_status.py diff --git a/iamscope/reasoner/cross_account_trust.py b/iamscope/reasoner/cross_account_trust.py index 4b72973..0e8afea 100644 --- a/iamscope/reasoner/cross_account_trust.py +++ b/iamscope/reasoner/cross_account_trust.py @@ -81,6 +81,7 @@ probe_overlay_trace_entries, ) from iamscope.reasoner.verdict import ( + Assumption, Blocker, Check, CheckState, @@ -118,6 +119,10 @@ SEVERITY_INFO, ) +_ORG_MEMBERSHIP_STATUS_MEMBER = "member" +_ORG_MEMBERSHIP_STATUS_NON_MEMBER = "non_member" +_ORG_MEMBERSHIP_STATUS_UNKNOWN = "unknown" + class _OrgMembership(Enum): """Side-channel for check 3's org-membership status. @@ -126,8 +131,8 @@ class _OrgMembership(Enum): module docstring for the spec-ambiguity resolution rationale. """ - EXTERNAL = "external" # source node org_member=False (truly external) - SAME_ORG = "same_org" # source node org_member=True (downgrade severity) + EXTERNAL = "external" # source node is confirmed non-member/external + SAME_ORG = "same_org" # source node is confirmed same-org/member UNKNOWN = "unknown" # could not determine (escalates verdict to inconclusive) @@ -149,6 +154,13 @@ def _resolve_source_org_membership( account-root synthetic nodes do. Use that root metadata only when the concrete node has no explicit value. """ + status_membership = _resolve_org_membership_status( + source_node.properties.get("org_membership_status"), + "source node", + ) + if status_membership is not None: + return status_membership + org_member_value = source_node.properties.get("org_member") if org_member_value is False: return ( @@ -178,6 +190,13 @@ def _resolve_source_org_membership( f"source account root {account_root_arn} not in fact graph", ) + root_status_membership = _resolve_org_membership_status( + account_root.properties.get("org_membership_status"), + f"source account root {account_root_arn}", + ) + if root_status_membership is not None: + return root_status_membership + root_org_member_value = account_root.properties.get("org_member") if root_org_member_value is False: return ( @@ -196,6 +215,37 @@ def _resolve_source_org_membership( ) +def _resolve_org_membership_status( + status_value: Any, + source_label: str, +) -> tuple[_OrgMembership, str] | None: + """Resolve the explicit tri-state synthetic membership status if present.""" + if status_value is None: + return None + if status_value == _ORG_MEMBERSHIP_STATUS_MEMBER: + return ( + _OrgMembership.SAME_ORG, + f"{source_label} properties.org_membership_status=member", + ) + if status_value == _ORG_MEMBERSHIP_STATUS_NON_MEMBER: + return ( + _OrgMembership.EXTERNAL, + f"{source_label} properties.org_membership_status=non_member (confirmed external/non-member)", + ) + if status_value == _ORG_MEMBERSHIP_STATUS_UNKNOWN: + return ( + _OrgMembership.UNKNOWN, + ( + f"{source_label} properties.org_membership_status=unknown; " + "collection may be partial, filtered, or standalone" + ), + ) + return ( + _OrgMembership.UNKNOWN, + f"{source_label} has unrecognized org_membership_status={status_value!r}", + ) + + def _downgrade_severity(severity: str) -> str: """Drop one level on the severity ladder. info stays info.""" try: @@ -601,6 +651,19 @@ def _evaluate_edge( reasoning_trace=tuple(trace), ) + assumptions: list[Assumption] = [] + if org_membership is _OrgMembership.UNKNOWN: + assumptions.append( + Assumption( + kind="org_membership_status", + detail=( + "source org membership is unknown; absence from known " + "accounts is not treated as confirmed external/non-member " + "because collection may be partial, filtered, or standalone" + ), + ) + ) + # ---- Build the Finding. title = self._compose_title( naked_trust_value=naked_trust_value, @@ -627,7 +690,7 @@ def _evaluate_edge( title=title, required_checks=tuple(check_results), blockers_observed=tuple(blockers), - assumptions=(), + assumptions=tuple(assumptions), evidence=evidence, scenario_hash=facts.scenario_hash, reasoner_exit_reason=exit_reason, @@ -767,7 +830,7 @@ def _compute_verdict_and_severity( return ( Verdict.INCONCLUSIVE, SEVERITY_HIGH, - "source node org_membership unknown", + ("source org_membership_status unknown; collection may be partial, filtered, or standalone"), ) # Rule 5: All PASS → VALIDATED with severity from naked_trust class. @@ -805,9 +868,12 @@ def _compose_title( org_membership: _OrgMembership, ) -> str: """Build a human-readable one-line title for the finding.""" - org_qualifier = ( - "same-org cross-account" if org_membership is _OrgMembership.SAME_ORG else "external cross-account" - ) + if org_membership is _OrgMembership.SAME_ORG: + org_qualifier = "same-org cross-account" + elif org_membership is _OrgMembership.EXTERNAL: + org_qualifier = "external cross-account" + else: + org_qualifier = "unknown-membership cross-account" if verdict is Verdict.VALIDATED: return f"Validated {naked_trust_value} {org_qualifier} trust grant" if verdict is Verdict.BLOCKED: diff --git a/tests/integration/test_cross_account_org_membership_status.py b/tests/integration/test_cross_account_org_membership_status.py new file mode 100644 index 0000000..cd42ebd --- /dev/null +++ b/tests/integration/test_cross_account_org_membership_status.py @@ -0,0 +1,176 @@ +"""Cross-account trust reasoner semantics for tri-state org membership.""" + +from __future__ import annotations + +from iamscope.collector.account import AccountData +from iamscope.constants import NODE_TYPE_IAM_ROLE, PROVIDER_AWS, REGION_GLOBAL, SEVERITY_HIGH, SEVERITY_MEDIUM +from iamscope.models import AccountInfo, Node, OrgData +from iamscope.parser.trust_policy import parse_trust_policy +from iamscope.pipeline import PipelineConfig, _run_resolution +from iamscope.reasoner import CheckState, CrossAccountTrustReasoner, FactGraph, Verdict + +TARGET_ACCOUNT = "1" * 12 +SOURCE_ACCOUNT = "2" * 12 +SKIPPED_ACCOUNT = "3" * 12 + + +def _role_arn(account_id: str, role_name: str) -> str: + return f"arn:aws:iam::{account_id}:role/{role_name}" + + +def _root_arn(account_id: str) -> str: + return f"arn:aws:iam::{account_id}:root" + + +def _target_role_node() -> Node: + return Node( + provider=PROVIDER_AWS, + node_type=NODE_TYPE_IAM_ROLE, + provider_id=_role_arn(TARGET_ACCOUNT, "TrustTarget"), + region=REGION_GLOBAL, + properties={"account_id": TARGET_ACCOUNT, "path": "/", "is_synthetic": False}, + ) + + +def _org_data(account_ids: list[str]) -> OrgData: + return OrgData( + org_id="o-org-membership-status", + root_id="r-root", + accounts=[ + AccountInfo( + account_id=account_id, + name=f"Account{index}", + email=f"account{index}@example.invalid", + status="ACTIVE", + parent_id="r-root", + ) + for index, account_id in enumerate(account_ids) + ], + ) + + +def _finding_for_principal(principal: object, org_account_ids: list[str]): + target = _target_role_node() + trust_result = parse_trust_policy( + { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": principal, + "Action": "sts:AssumeRole", + } + ], + }, + role_arn=target.provider_id, + role_account_id=TARGET_ACCOUNT, + )[0] + account_data = AccountData( + account_id=TARGET_ACCOUNT, + nodes=[target], + trust_results=[(target, trust_result)], + permission_results=[], + role_arns=[target.provider_id], + ) + nodes, edges, constraints, edge_constraints, _budget = _run_resolution( + _org_data(org_account_ids), + [account_data], + PipelineConfig(), + ) + facts = FactGraph( + nodes=tuple(nodes), + edges=tuple(edges), + constraints=tuple(constraints), + edge_constraints=tuple(edge_constraints), + scenario_hash="org-membership-status-regression", + edge_budget_exhausted=False, + ) + findings = CrossAccountTrustReasoner().run(facts) + assert len(findings) == 1 + return findings[0], nodes + + +def _check(finding, name: str): + for check in finding.required_checks: + if check.name == name: + return check + raise AssertionError(f"missing check {name!r}") + + +def _org_trace(finding): + return next(trace for trace in finding.evidence.reasoning_trace if trace.action == "evaluate_source_org_membership") + + +def test_member_status_preserves_same_org_downgrade() -> None: + finding, _nodes = _finding_for_principal( + {"AWS": _root_arn(SOURCE_ACCOUNT)}, + [TARGET_ACCOUNT, SOURCE_ACCOUNT], + ) + + assert finding.pattern_id == "cross_account_trust" + assert finding.source.provider_id == _root_arn(SOURCE_ACCOUNT) + assert finding.target.provider_id == _role_arn(TARGET_ACCOUNT, "TrustTarget") + assert finding.verdict is Verdict.VALIDATED + assert finding.severity == SEVERITY_MEDIUM + assert "same-org cross-account" in finding.title + assert "severity downgraded" in finding.reasoner_exit_reason + assert _check(finding, "source_principal_resolvable").state is CheckState.PASS + org_trace = _org_trace(finding) + assert org_trace.result == "SAME_ORG" + assert "org_membership_status=member" in org_trace.reason + assert finding.assumptions == () + + +def test_non_member_status_preserves_confirmed_external_wording() -> None: + finding, _nodes = _finding_for_principal( + {"AWS": _root_arn(SOURCE_ACCOUNT)}, + [TARGET_ACCOUNT], + ) + + assert finding.verdict is Verdict.VALIDATED + assert finding.severity == SEVERITY_HIGH + assert "external cross-account" in finding.title + assert "truly external source" in finding.reasoner_exit_reason + org_trace = _org_trace(finding) + assert org_trace.result == "EXTERNAL" + assert "org_membership_status=non_member" in org_trace.reason + assert "confirmed external/non-member" in org_trace.reason + assert finding.assumptions == () + + +def test_unknown_status_is_visible_without_confirmed_external_wording() -> None: + finding, nodes = _finding_for_principal( + {"AWS": _root_arn(SOURCE_ACCOUNT)}, + [TARGET_ACCOUNT, SKIPPED_ACCOUNT], + ) + + source_node = next(node for node in nodes if node.provider_id == _root_arn(SOURCE_ACCOUNT)) + assert source_node.properties["org_membership_status"] == "unknown" + assert finding.verdict is Verdict.INCONCLUSIVE + assert finding.severity == SEVERITY_HIGH + assert "unknown-membership cross-account" in finding.title + assert "external cross-account" not in finding.title + assert "truly external source" not in finding.reasoner_exit_reason + assert "org_membership_status unknown" in finding.reasoner_exit_reason + assert _check(finding, "source_principal_resolvable").state is CheckState.PASS + org_trace = _org_trace(finding) + assert org_trace.result == "UNKNOWN" + assert "org_membership_status=unknown" in org_trace.reason + assert "partial, filtered, or standalone" in org_trace.reason + assert len(finding.assumptions) == 1 + assert finding.assumptions[0].kind == "org_membership_status" + assert "not treated as confirmed external/non-member" in finding.assumptions[0].detail + + +def test_wildcard_principal_remains_confirmed_external() -> None: + finding, _nodes = _finding_for_principal( + "*", + [TARGET_ACCOUNT], + ) + + assert finding.verdict is Verdict.VALIDATED + assert "external cross-account" in finding.title + org_trace = _org_trace(finding) + assert org_trace.result == "EXTERNAL" + assert "org_membership_status=non_member" in org_trace.reason + assert finding.assumptions == ()