diff --git a/iamscope/pipeline.py b/iamscope/pipeline.py index 96b41cd..5e87805 100644 --- a/iamscope/pipeline.py +++ b/iamscope/pipeline.py @@ -583,6 +583,34 @@ def _get_account_session( ) +def _org_membership_resolution_context( + org_data: OrgData, + all_account_data: list[AccountData], + config: PipelineConfig, +) -> tuple[set[str], bool]: + """Return known accounts and whether absence proves non-membership. + + Trust-policy synthetic principals need a tri-state org-membership signal: + member, non_member, or unknown. Known active org accounts and directly + collected account IDs are members. Absence proves non-membership only when + the run covers the full active org account set. Standalone, account-filtered, + skipped, or otherwise partial runs leave absent accounts unknown instead of + silently asserting they are external/non-members. + """ + active_org_accounts = set(org_data.active_account_ids) + collected_accounts = {acct.account_id for acct in all_account_data if acct.account_id} + known_accounts = active_org_accounts | collected_accounts + + org_collection_complete = ( + not config.standalone + and config.account_filter is None + and not config.skip_accounts + and bool(active_org_accounts) + and active_org_accounts <= collected_accounts + ) + return known_accounts, org_collection_complete + + def _run_resolution( org_data: OrgData, all_account_data: list[AccountData], @@ -657,8 +685,14 @@ def _add_edges(new_edges: list[Edge]) -> None: ec2_mode=config.ec2_mode, ) - # Known account IDs for synthetic node resolution - known_accounts = org_data.active_account_ids + # Known account IDs for synthetic node resolution. Absence from this set + # proves non-membership only when collection covered the full active org; + # partial/standalone runs keep absent accounts explicitly unknown. + known_accounts, org_collection_complete = _org_membership_resolution_context( + org_data, + all_account_data, + config, + ) # NF-1 fix (S06): construct a real NoiseFilter from config and pass its # edge filter function to build_trust_edges. Pre-S06 this was dead code @@ -704,7 +738,11 @@ def _add_edges(new_edges: list[Edge]) -> None: all_nodes.extend(hyperedge_nodes) # Resolve synthetic nodes (external accounts, wildcards, services) - synthetic_nodes = resolve_synthetic_nodes(all_trust_results, known_account_ids=known_accounts) + synthetic_nodes = resolve_synthetic_nodes( + all_trust_results, + known_account_ids=known_accounts, + org_collection_complete=org_collection_complete, + ) all_nodes.extend(synthetic_nodes) # Add Lambda/EC2 service nodes and edges diff --git a/iamscope/resolver/cross_account.py b/iamscope/resolver/cross_account.py index 02ba17d..d91a083 100644 --- a/iamscope/resolver/cross_account.py +++ b/iamscope/resolver/cross_account.py @@ -37,10 +37,15 @@ from iamscope.models import ControlRef, Edge, Node, NodeRef, TrustParseResult from iamscope.resolver.naked_trust import classify_naked_trust +ORG_MEMBERSHIP_MEMBER = "member" +ORG_MEMBERSHIP_NON_MEMBER = "non_member" +ORG_MEMBERSHIP_UNKNOWN = "unknown" + def resolve_synthetic_nodes( trust_results: list[TrustParseResult], known_account_ids: set[str] | None = None, + org_collection_complete: bool = False, ) -> list[Node]: """Create synthetic nodes for principals referenced in trust policies. @@ -51,6 +56,11 @@ def resolve_synthetic_nodes( trust_results: Parsed trust policy results from all roles. known_account_ids: Account IDs collected in this run (used to mark external vs. internal accounts). + org_collection_complete: True when the account/org collection scope + is complete enough that absence from known accounts + can be treated as a confirmed non-member. False + preserves uncertainty for partial or standalone + collection. Returns: Sorted list of deduplicated synthetic Node objects. @@ -64,7 +74,11 @@ def resolve_synthetic_nodes( if key in seen: continue - node = _create_synthetic_node(tr, known) + node = _create_synthetic_node( + tr, + known, + org_collection_complete=org_collection_complete, + ) if node is not None: seen[key] = node @@ -75,6 +89,7 @@ def resolve_synthetic_nodes( def _create_synthetic_node( tr: TrustParseResult, known_account_ids: set[str], + org_collection_complete: bool, ) -> Node | None: """Create a single synthetic node from a trust parse result. @@ -87,6 +102,7 @@ def _create_synthetic_node( if node_type == NODE_TYPE_WILDCARD_PRINCIPAL: properties["description"] = "Any AWS principal (Principal: *)" + properties["org_membership_status"] = ORG_MEMBERSHIP_NON_MEMBER properties["org_member"] = False return Node( provider=PROVIDER_AWS, @@ -98,10 +114,14 @@ def _create_synthetic_node( if node_type == NODE_TYPE_ACCOUNT_ROOT: account_id = _extract_account_from_arn(provider_id) - is_org_member = account_id in known_account_ids if account_id else False properties["account_id"] = account_id or "" - properties["is_external"] = not is_org_member - properties["org_member"] = is_org_member + properties.update( + _org_membership_properties( + account_id, + known_account_ids, + org_collection_complete=org_collection_complete, + ) + ) return Node( provider=PROVIDER_AWS, node_type=NODE_TYPE_ACCOUNT_ROOT, @@ -142,7 +162,9 @@ def _create_synthetic_node( if node_type == NODE_TYPE_EXTERNAL_ACCOUNT: properties["raw_principal"] = provider_id - properties["org_member"] = False + properties["org_membership_status"] = ORG_MEMBERSHIP_UNKNOWN + properties["org_member"] = None + properties["is_external"] = None return Node( provider=PROVIDER_AWS, node_type=NODE_TYPE_EXTERNAL_ACCOUNT, @@ -156,10 +178,14 @@ def _create_synthetic_node( if node_type in (NODE_TYPE_IAM_ROLE, NODE_TYPE_IAM_USER): if tr.cross_account: account_id = _extract_account_from_arn(provider_id) - is_org_member = account_id in known_account_ids if account_id else False properties["account_id"] = account_id or "" - properties["is_external"] = not is_org_member - properties["org_member"] = is_org_member + properties.update( + _org_membership_properties( + account_id, + known_account_ids, + org_collection_complete=org_collection_complete, + ) + ) return Node( provider=PROVIDER_AWS, node_type=node_type, @@ -173,6 +199,56 @@ def _create_synthetic_node( return None +def _org_membership_properties( + account_id: str | None, + known_account_ids: set[str], + *, + org_collection_complete: bool, +) -> dict[str, Any]: + """Return tri-state org-membership properties for synthetic principals. + + Compatibility choice: keep the legacy `org_member` and `is_external` + keys, but set both to None when membership is unknown. This preserves + existing field presence for consumers while avoiding a false + non-member/external assertion when collection scope is partial. + """ + status = _org_membership_status( + account_id, + known_account_ids, + org_collection_complete=org_collection_complete, + ) + if status == ORG_MEMBERSHIP_MEMBER: + return { + "org_membership_status": status, + "org_member": True, + "is_external": False, + } + if status == ORG_MEMBERSHIP_NON_MEMBER: + return { + "org_membership_status": status, + "org_member": False, + "is_external": True, + } + return { + "org_membership_status": ORG_MEMBERSHIP_UNKNOWN, + "org_member": None, + "is_external": None, + } + + +def _org_membership_status( + account_id: str | None, + known_account_ids: set[str], + *, + org_collection_complete: bool, +) -> str: + if account_id and account_id in known_account_ids: + return ORG_MEMBERSHIP_MEMBER + if org_collection_complete and account_id: + return ORG_MEMBERSHIP_NON_MEMBER + return ORG_MEMBERSHIP_UNKNOWN + + def build_trust_edges( trust_results: list[TrustParseResult], role_node: Node, diff --git a/tests/resolver/test_org_membership_uncertainty.py b/tests/resolver/test_org_membership_uncertainty.py new file mode 100644 index 0000000..c26bd76 --- /dev/null +++ b/tests/resolver/test_org_membership_uncertainty.py @@ -0,0 +1,218 @@ +"""Pipeline-shaped tests for synthetic org-membership uncertainty.""" + +from __future__ import annotations + +import json + +from iamscope.collector.account import AccountData +from iamscope.constants import ( + NODE_TYPE_ACCOUNT_ROOT, + NODE_TYPE_IAM_ROLE, + PROVIDER_AWS, + REGION_GLOBAL, + TRUST_SCOPE_ACCOUNT_ROOT, + TRUST_SCOPE_SPECIFIC_ROLE, +) +from iamscope.models import AccountInfo, Node, OrgData, ScenarioMetadata, TrustParseResult +from iamscope.output.scenario_json import emit_scenario +from iamscope.pipeline import PipelineConfig, _run_resolution + +MEMBER_ACCOUNT = "1" * 12 +OTHER_ACCOUNT = "2" * 12 +SKIPPED_ACCOUNT = "3" * 12 + + +def _role_arn(account_id: str, role_name: str = "TargetRole") -> str: + return f"arn:aws:iam::{account_id}:role/{role_name}" + + +def _root_arn(account_id: str) -> str: + return f"arn:aws:iam::{account_id}:root" + + +def _role_node(account_id: str = MEMBER_ACCOUNT) -> Node: + arn = _role_arn(account_id) + return Node( + provider=PROVIDER_AWS, + node_type=NODE_TYPE_IAM_ROLE, + provider_id=arn, + region=REGION_GLOBAL, + properties={"account_id": account_id, "is_synthetic": False, "path": "/"}, + ) + + +def _trust_result( + principal_value: str, + resolved_node_type: str = NODE_TYPE_ACCOUNT_ROOT, + trust_scope: str = TRUST_SCOPE_ACCOUNT_ROOT, + cross_account: bool = True, +) -> TrustParseResult: + return TrustParseResult( + statement_index=0, + effect="Allow", + action="sts:AssumeRole", + principal_type="AWS", + principal_value=principal_value, + resolved_node_type=resolved_node_type, + trust_scope=trust_scope, + raw_conditions={}, + cross_account=cross_account, + ) + + +def _account_data(role_node: Node, trust_result: TrustParseResult) -> AccountData: + return AccountData( + account_id=role_node.properties["account_id"], + nodes=[role_node], + trust_results=[(role_node, trust_result)], + permission_results=[], + role_arns=[role_node.provider_id], + ) + + +def _org_data(account_ids: list[str]) -> OrgData: + return OrgData( + org_id="o-example", + root_id="r-root", + accounts=[ + AccountInfo( + account_id=account_id, + name=f"Account{index}", + email=f"account{index}@example.com", + status="ACTIVE", + parent_id="r-root", + ) + for index, account_id in enumerate(account_ids) + ], + ) + + +def _resolve_single_trust( + org_account_ids: list[str], + trust_result: TrustParseResult, + *, + config: PipelineConfig | None = None, +) -> tuple[list[Node], bytes, str]: + role_node = _role_node() + nodes, edges, constraints, edge_constraints, _budget = _run_resolution( + org_data=_org_data(org_account_ids), + all_account_data=[_account_data(role_node, trust_result)], + config=config or PipelineConfig(), + ) + scenario_bytes, scenario_hash = emit_scenario( + nodes=nodes, + edges=edges, + constraints=constraints, + edge_constraints=edge_constraints, + metadata=ScenarioMetadata(), + ) + return nodes, scenario_bytes, scenario_hash + + +def _node_by_provider_id(nodes: list[Node], provider_id: str) -> Node: + return next(node for node in nodes if node.provider_id == provider_id) + + +def test_pipeline_known_account_root_synthetic_node_is_member() -> None: + """Known org accounts stay member even when their account data is absent.""" + trust_result = _trust_result(_root_arn(OTHER_ACCOUNT)) + + nodes, _scenario_bytes, _scenario_hash = _resolve_single_trust( + [MEMBER_ACCOUNT, OTHER_ACCOUNT], + trust_result, + ) + + source_node = _node_by_provider_id(nodes, _root_arn(OTHER_ACCOUNT)) + assert source_node.properties["org_membership_status"] == "member" + assert source_node.properties["org_member"] is True + assert source_node.properties["is_external"] is False + + +def test_pipeline_complete_org_absent_account_is_non_member() -> None: + """Complete org collection can classify absent accounts as non-members.""" + trust_result = _trust_result(_root_arn(OTHER_ACCOUNT)) + + nodes, _scenario_bytes, _scenario_hash = _resolve_single_trust( + [MEMBER_ACCOUNT], + trust_result, + ) + + source_node = _node_by_provider_id(nodes, _root_arn(OTHER_ACCOUNT)) + assert source_node.properties["org_membership_status"] == "non_member" + assert source_node.properties["org_member"] is False + assert source_node.properties["is_external"] is True + + +def test_pipeline_partial_org_absent_account_remains_unknown() -> None: + """Skipped or partial collection avoids false non-member classification.""" + trust_result = _trust_result(_root_arn(OTHER_ACCOUNT)) + + nodes, _scenario_bytes, _scenario_hash = _resolve_single_trust( + [MEMBER_ACCOUNT, SKIPPED_ACCOUNT], + trust_result, + ) + + source_node = _node_by_provider_id(nodes, _root_arn(OTHER_ACCOUNT)) + assert source_node.properties["org_membership_status"] == "unknown" + assert source_node.properties["org_member"] is None + assert source_node.properties["is_external"] is None + + +def test_pipeline_cross_account_role_uses_same_membership_logic() -> None: + """Cross-account IAMRole synthetics carry the same tri-state signal.""" + source_role_arn = _role_arn(OTHER_ACCOUNT, "SourceRole") + trust_result = _trust_result( + source_role_arn, + resolved_node_type=NODE_TYPE_IAM_ROLE, + trust_scope=TRUST_SCOPE_SPECIFIC_ROLE, + cross_account=True, + ) + + nodes, _scenario_bytes, _scenario_hash = _resolve_single_trust( + [MEMBER_ACCOUNT, SKIPPED_ACCOUNT], + trust_result, + ) + + source_node = _node_by_provider_id(nodes, source_role_arn) + assert source_node.properties["org_membership_status"] == "unknown" + assert source_node.properties["org_member"] is None + assert source_node.properties["is_external"] is None + + +def test_pipeline_standalone_own_account_synthetic_is_member() -> None: + """Standalone own-account synthetics are not marked external.""" + trust_result = _trust_result( + _root_arn(MEMBER_ACCOUNT), + cross_account=False, + ) + + nodes, _scenario_bytes, _scenario_hash = _resolve_single_trust( + [], + trust_result, + config=PipelineConfig(standalone=True), + ) + + source_node = _node_by_provider_id(nodes, _root_arn(MEMBER_ACCOUNT)) + assert source_node.properties["org_membership_status"] == "member" + assert source_node.properties["org_member"] is True + assert source_node.properties["is_external"] is False + + +def test_pipeline_scenario_json_membership_status_is_deterministic() -> None: + """Scenario JSON deterministically emits the tri-state membership field.""" + trust_result = _trust_result(_root_arn(OTHER_ACCOUNT)) + + _nodes_a, scenario_a, hash_a = _resolve_single_trust( + [MEMBER_ACCOUNT, SKIPPED_ACCOUNT], + trust_result, + ) + _nodes_b, scenario_b, hash_b = _resolve_single_trust( + [MEMBER_ACCOUNT, SKIPPED_ACCOUNT], + trust_result, + ) + + assert scenario_a == scenario_b + assert hash_a == hash_b + scenario = json.loads(scenario_a) + source_node = next(node for node in scenario["nodes"] if node["provider_id"] == _root_arn(OTHER_ACCOUNT)) + assert source_node["properties"]["org_membership_status"] == "unknown" diff --git a/tests/test_cross_account.py b/tests/test_cross_account.py index f23a4d5..9ff1c1c 100644 --- a/tests/test_cross_account.py +++ b/tests/test_cross_account.py @@ -22,7 +22,9 @@ NAKED_INTRA_ACCOUNT, NODE_TYPE_ACCOUNT_ROOT, NODE_TYPE_AWS_SERVICE, + NODE_TYPE_EXTERNAL_ACCOUNT, NODE_TYPE_IAM_ROLE, + NODE_TYPE_IAM_USER, NODE_TYPE_OIDC_PROVIDER, NODE_TYPE_SAML_PROVIDER, NODE_TYPE_WILDCARD_PRINCIPAL, @@ -81,8 +83,37 @@ def test_creates_account_root_node(self) -> None: assert n.provider_id == "arn:aws:iam::222222\u003222222:root" assert n.properties["is_synthetic"] is True assert n.properties["account_id"] == "222222\u003222222" - assert n.properties["is_external"] is True - assert n.properties["org_member"] is False + assert n.properties["org_membership_status"] == "unknown" + assert n.properties["is_external"] is None + assert n.properties["org_member"] is None + + def test_unknown_account_complete_org_is_non_member(self) -> None: + """Complete org collection can classify absent accounts as non-members.""" + tr = _make_trust_result() + nodes = resolve_synthetic_nodes( + [tr], + {"111111\u003111111"}, + org_collection_complete=True, + ) + + assert len(nodes) == 1 + assert nodes[0].properties["org_membership_status"] == "non_member" + assert nodes[0].properties["is_external"] is True + assert nodes[0].properties["org_member"] is False + + def test_unknown_account_partial_org_remains_unknown(self) -> None: + """Partial org collection must not falsely confirm non-membership.""" + tr = _make_trust_result() + nodes = resolve_synthetic_nodes( + [tr], + {"111111\u003111111"}, + org_collection_complete=False, + ) + + assert len(nodes) == 1 + assert nodes[0].properties["org_membership_status"] == "unknown" + assert nodes[0].properties["is_external"] is None + assert nodes[0].properties["org_member"] is None def test_deduplicates_same_principal(self) -> None: """Identical principals from multiple statements produce one node.""" @@ -104,6 +135,8 @@ def test_creates_wildcard_node(self) -> None: assert len(nodes) == 1 assert nodes[0].node_type == NODE_TYPE_WILDCARD_PRINCIPAL assert nodes[0].provider_id == "*" + assert nodes[0].properties["org_membership_status"] == "non_member" + assert nodes[0].properties["org_member"] is False def test_creates_service_node(self) -> None: """Service principal creates AWSService synthetic node.""" @@ -162,9 +195,68 @@ def test_creates_cross_account_role_synthetic(self) -> None: assert len(nodes) == 1 assert nodes[0].node_type == NODE_TYPE_IAM_ROLE assert nodes[0].properties["is_synthetic"] is True + assert nodes[0].properties["org_membership_status"] == "unknown" + assert nodes[0].properties["is_external"] is None + assert nodes[0].properties["org_member"] is None + + def test_cross_account_role_complete_org_is_non_member(self) -> None: + """Cross-account IAMRole uses the same complete-org membership logic.""" + tr = _make_trust_result( + principal_value="arn:aws:iam::222222\u003222222:role/CrossRole", + resolved_node_type=NODE_TYPE_IAM_ROLE, + trust_scope=TRUST_SCOPE_SPECIFIC_ROLE, + cross_account=True, + ) + nodes = resolve_synthetic_nodes( + [tr], + {"111111\u003111111"}, + org_collection_complete=True, + ) + + assert len(nodes) == 1 + assert nodes[0].properties["org_membership_status"] == "non_member" assert nodes[0].properties["is_external"] is True assert nodes[0].properties["org_member"] is False + def test_cross_account_user_known_account_is_member(self) -> None: + """Cross-account IAMUser synthetic nodes are member when account is known.""" + tr = _make_trust_result( + principal_value="arn:aws:iam::222222\u003222222:user/CrossUser", + resolved_node_type=NODE_TYPE_IAM_USER, + trust_scope=TRUST_SCOPE_SPECIFIC_ROLE, + cross_account=True, + ) + nodes = resolve_synthetic_nodes( + [tr], + {"111111\u003111111", "222222\u003222222"}, + ) + + assert len(nodes) == 1 + assert nodes[0].node_type == NODE_TYPE_IAM_USER + assert nodes[0].properties["org_membership_status"] == "member" + assert nodes[0].properties["is_external"] is False + assert nodes[0].properties["org_member"] is True + + def test_external_account_membership_is_unknown(self) -> None: + """Unrecognized external principals do not assert non-membership.""" + tr = _make_trust_result( + principal_value="AIDAABCDEFGHIJKLMNOP", + resolved_node_type=NODE_TYPE_EXTERNAL_ACCOUNT, + trust_scope=TRUST_SCOPE_ACCOUNT_ROOT, + cross_account=True, + ) + nodes = resolve_synthetic_nodes( + [tr], + {"111111\u003111111"}, + org_collection_complete=True, + ) + + assert len(nodes) == 1 + assert nodes[0].node_type == NODE_TYPE_EXTERNAL_ACCOUNT + assert nodes[0].properties["org_membership_status"] == "unknown" + assert nodes[0].properties["is_external"] is None + assert nodes[0].properties["org_member"] is None + def test_skips_same_account_role(self) -> None: """Same-account IAMRole does NOT create synthetic node (will be collected).""" tr = _make_trust_result( @@ -185,6 +277,7 @@ def test_marks_known_account_as_internal(self) -> None: nodes = resolve_synthetic_nodes([tr], {"111111\u003111111", "222222\u003222222"}) assert len(nodes) == 1 + assert nodes[0].properties["org_membership_status"] == "member" assert nodes[0].properties["is_external"] is False assert nodes[0].properties["org_member"] is True