Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 73 additions & 7 deletions iamscope/reasoner/cross_account_trust.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
probe_overlay_trace_entries,
)
from iamscope.reasoner.verdict import (
Assumption,
Blocker,
Check,
CheckState,
Expand Down Expand Up @@ -118,6 +119,10 @@
SEVERITY_INFO,
)

_ORG_MEMBERSHIP_STATUS_MEMBER = "member"
_ORG_MEMBERSHIP_STATUS_NON_MEMBER = "non_member"
_ORG_MEMBERSHIP_STATUS_UNKNOWN = "unknown"


class _OrgMembership(Enum):
"""Side-channel for check 3's org-membership status.
Expand All @@ -126,8 +131,8 @@ class _OrgMembership(Enum):
module docstring for the spec-ambiguity resolution rationale.
"""

EXTERNAL = "external" # source node org_member=False (truly external)
SAME_ORG = "same_org" # source node org_member=True (downgrade severity)
EXTERNAL = "external" # source node is confirmed non-member/external
SAME_ORG = "same_org" # source node is confirmed same-org/member
UNKNOWN = "unknown" # could not determine (escalates verdict to inconclusive)


Expand All @@ -149,6 +154,13 @@ def _resolve_source_org_membership(
account-root synthetic nodes do. Use that root metadata only when the
concrete node has no explicit value.
"""
status_membership = _resolve_org_membership_status(
source_node.properties.get("org_membership_status"),
"source node",
)
if status_membership is not None:
return status_membership

org_member_value = source_node.properties.get("org_member")
if org_member_value is False:
return (
Expand Down Expand Up @@ -178,6 +190,13 @@ def _resolve_source_org_membership(
f"source account root {account_root_arn} not in fact graph",
)

root_status_membership = _resolve_org_membership_status(
account_root.properties.get("org_membership_status"),
f"source account root {account_root_arn}",
)
if root_status_membership is not None:
return root_status_membership

root_org_member_value = account_root.properties.get("org_member")
if root_org_member_value is False:
return (
Expand All @@ -196,6 +215,37 @@ def _resolve_source_org_membership(
)


def _resolve_org_membership_status(
status_value: Any,
source_label: str,
) -> tuple[_OrgMembership, str] | None:
"""Resolve the explicit tri-state synthetic membership status if present."""
if status_value is None:
return None
if status_value == _ORG_MEMBERSHIP_STATUS_MEMBER:
return (
_OrgMembership.SAME_ORG,
f"{source_label} properties.org_membership_status=member",
)
if status_value == _ORG_MEMBERSHIP_STATUS_NON_MEMBER:
return (
_OrgMembership.EXTERNAL,
f"{source_label} properties.org_membership_status=non_member (confirmed external/non-member)",
)
if status_value == _ORG_MEMBERSHIP_STATUS_UNKNOWN:
return (
_OrgMembership.UNKNOWN,
(
f"{source_label} properties.org_membership_status=unknown; "
"collection may be partial, filtered, or standalone"
),
)
return (
_OrgMembership.UNKNOWN,
f"{source_label} has unrecognized org_membership_status={status_value!r}",
)


def _downgrade_severity(severity: str) -> str:
"""Drop one level on the severity ladder. info stays info."""
try:
Expand Down Expand Up @@ -601,6 +651,19 @@ def _evaluate_edge(
reasoning_trace=tuple(trace),
)

assumptions: list[Assumption] = []
if org_membership is _OrgMembership.UNKNOWN:
assumptions.append(
Assumption(
kind="org_membership_status",
detail=(
"source org membership is unknown; absence from known "
"accounts is not treated as confirmed external/non-member "
"because collection may be partial, filtered, or standalone"
),
)
)

# ---- Build the Finding.
title = self._compose_title(
naked_trust_value=naked_trust_value,
Expand All @@ -627,7 +690,7 @@ def _evaluate_edge(
title=title,
required_checks=tuple(check_results),
blockers_observed=tuple(blockers),
assumptions=(),
assumptions=tuple(assumptions),
evidence=evidence,
scenario_hash=facts.scenario_hash,
reasoner_exit_reason=exit_reason,
Expand Down Expand Up @@ -767,7 +830,7 @@ def _compute_verdict_and_severity(
return (
Verdict.INCONCLUSIVE,
SEVERITY_HIGH,
"source node org_membership unknown",
("source org_membership_status unknown; collection may be partial, filtered, or standalone"),
)

# Rule 5: All PASS → VALIDATED with severity from naked_trust class.
Expand Down Expand Up @@ -805,9 +868,12 @@ def _compose_title(
org_membership: _OrgMembership,
) -> str:
"""Build a human-readable one-line title for the finding."""
org_qualifier = (
"same-org cross-account" if org_membership is _OrgMembership.SAME_ORG else "external cross-account"
)
if org_membership is _OrgMembership.SAME_ORG:
org_qualifier = "same-org cross-account"
elif org_membership is _OrgMembership.EXTERNAL:
org_qualifier = "external cross-account"
else:
org_qualifier = "unknown-membership cross-account"
if verdict is Verdict.VALIDATED:
return f"Validated {naked_trust_value} {org_qualifier} trust grant"
if verdict is Verdict.BLOCKED:
Expand Down
176 changes: 176 additions & 0 deletions tests/integration/test_cross_account_org_membership_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
"""Cross-account trust reasoner semantics for tri-state org membership."""

from __future__ import annotations

from iamscope.collector.account import AccountData
from iamscope.constants import NODE_TYPE_IAM_ROLE, PROVIDER_AWS, REGION_GLOBAL, SEVERITY_HIGH, SEVERITY_MEDIUM
from iamscope.models import AccountInfo, Node, OrgData
from iamscope.parser.trust_policy import parse_trust_policy
from iamscope.pipeline import PipelineConfig, _run_resolution
from iamscope.reasoner import CheckState, CrossAccountTrustReasoner, FactGraph, Verdict

TARGET_ACCOUNT = "1" * 12
SOURCE_ACCOUNT = "2" * 12
SKIPPED_ACCOUNT = "3" * 12


def _role_arn(account_id: str, role_name: str) -> str:
return f"arn:aws:iam::{account_id}:role/{role_name}"


def _root_arn(account_id: str) -> str:
return f"arn:aws:iam::{account_id}:root"


def _target_role_node() -> Node:
return Node(
provider=PROVIDER_AWS,
node_type=NODE_TYPE_IAM_ROLE,
provider_id=_role_arn(TARGET_ACCOUNT, "TrustTarget"),
region=REGION_GLOBAL,
properties={"account_id": TARGET_ACCOUNT, "path": "/", "is_synthetic": False},
)


def _org_data(account_ids: list[str]) -> OrgData:
return OrgData(
org_id="o-org-membership-status",
root_id="r-root",
accounts=[
AccountInfo(
account_id=account_id,
name=f"Account{index}",
email=f"account{index}@example.invalid",
status="ACTIVE",
parent_id="r-root",
)
for index, account_id in enumerate(account_ids)
],
)


def _finding_for_principal(principal: object, org_account_ids: list[str]):
target = _target_role_node()
trust_result = parse_trust_policy(
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": principal,
"Action": "sts:AssumeRole",
}
],
},
role_arn=target.provider_id,
role_account_id=TARGET_ACCOUNT,
)[0]
account_data = AccountData(
account_id=TARGET_ACCOUNT,
nodes=[target],
trust_results=[(target, trust_result)],
permission_results=[],
role_arns=[target.provider_id],
)
nodes, edges, constraints, edge_constraints, _budget = _run_resolution(
_org_data(org_account_ids),
[account_data],
PipelineConfig(),
)
facts = FactGraph(
nodes=tuple(nodes),
edges=tuple(edges),
constraints=tuple(constraints),
edge_constraints=tuple(edge_constraints),
scenario_hash="org-membership-status-regression",
edge_budget_exhausted=False,
)
findings = CrossAccountTrustReasoner().run(facts)
assert len(findings) == 1
return findings[0], nodes


def _check(finding, name: str):
for check in finding.required_checks:
if check.name == name:
return check
raise AssertionError(f"missing check {name!r}")


def _org_trace(finding):
return next(trace for trace in finding.evidence.reasoning_trace if trace.action == "evaluate_source_org_membership")


def test_member_status_preserves_same_org_downgrade() -> None:
finding, _nodes = _finding_for_principal(
{"AWS": _root_arn(SOURCE_ACCOUNT)},
[TARGET_ACCOUNT, SOURCE_ACCOUNT],
)

assert finding.pattern_id == "cross_account_trust"
assert finding.source.provider_id == _root_arn(SOURCE_ACCOUNT)
assert finding.target.provider_id == _role_arn(TARGET_ACCOUNT, "TrustTarget")
assert finding.verdict is Verdict.VALIDATED
assert finding.severity == SEVERITY_MEDIUM
assert "same-org cross-account" in finding.title
assert "severity downgraded" in finding.reasoner_exit_reason
assert _check(finding, "source_principal_resolvable").state is CheckState.PASS
org_trace = _org_trace(finding)
assert org_trace.result == "SAME_ORG"
assert "org_membership_status=member" in org_trace.reason
assert finding.assumptions == ()


def test_non_member_status_preserves_confirmed_external_wording() -> None:
finding, _nodes = _finding_for_principal(
{"AWS": _root_arn(SOURCE_ACCOUNT)},
[TARGET_ACCOUNT],
)

assert finding.verdict is Verdict.VALIDATED
assert finding.severity == SEVERITY_HIGH
assert "external cross-account" in finding.title
assert "truly external source" in finding.reasoner_exit_reason
org_trace = _org_trace(finding)
assert org_trace.result == "EXTERNAL"
assert "org_membership_status=non_member" in org_trace.reason
assert "confirmed external/non-member" in org_trace.reason
assert finding.assumptions == ()


def test_unknown_status_is_visible_without_confirmed_external_wording() -> None:
finding, nodes = _finding_for_principal(
{"AWS": _root_arn(SOURCE_ACCOUNT)},
[TARGET_ACCOUNT, SKIPPED_ACCOUNT],
)

source_node = next(node for node in nodes if node.provider_id == _root_arn(SOURCE_ACCOUNT))
assert source_node.properties["org_membership_status"] == "unknown"
assert finding.verdict is Verdict.INCONCLUSIVE
assert finding.severity == SEVERITY_HIGH
assert "unknown-membership cross-account" in finding.title
assert "external cross-account" not in finding.title
assert "truly external source" not in finding.reasoner_exit_reason
assert "org_membership_status unknown" in finding.reasoner_exit_reason
assert _check(finding, "source_principal_resolvable").state is CheckState.PASS
org_trace = _org_trace(finding)
assert org_trace.result == "UNKNOWN"
assert "org_membership_status=unknown" in org_trace.reason
assert "partial, filtered, or standalone" in org_trace.reason
assert len(finding.assumptions) == 1
assert finding.assumptions[0].kind == "org_membership_status"
assert "not treated as confirmed external/non-member" in finding.assumptions[0].detail


def test_wildcard_principal_remains_confirmed_external() -> None:
finding, _nodes = _finding_for_principal(
"*",
[TARGET_ACCOUNT],
)

assert finding.verdict is Verdict.VALIDATED
assert "external cross-account" in finding.title
org_trace = _org_trace(finding)
assert org_trace.result == "EXTERNAL"
assert "org_membership_status=non_member" in org_trace.reason
assert finding.assumptions == ()