Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions iamscope/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,34 @@ def _get_account_session(
)


def _org_membership_resolution_context(
org_data: OrgData,
all_account_data: list[AccountData],
config: PipelineConfig,
) -> tuple[set[str], bool]:
"""Return known accounts and whether absence proves non-membership.

Trust-policy synthetic principals need a tri-state org-membership signal:
member, non_member, or unknown. Known active org accounts and directly
collected account IDs are members. Absence proves non-membership only when
the run covers the full active org account set. Standalone, account-filtered,
skipped, or otherwise partial runs leave absent accounts unknown instead of
silently asserting they are external/non-members.
"""
active_org_accounts = set(org_data.active_account_ids)
collected_accounts = {acct.account_id for acct in all_account_data if acct.account_id}
known_accounts = active_org_accounts | collected_accounts

org_collection_complete = (
not config.standalone
and config.account_filter is None
and not config.skip_accounts
and bool(active_org_accounts)
and active_org_accounts <= collected_accounts
)
return known_accounts, org_collection_complete


def _run_resolution(
org_data: OrgData,
all_account_data: list[AccountData],
Expand Down Expand Up @@ -657,8 +685,14 @@ def _add_edges(new_edges: list[Edge]) -> None:
ec2_mode=config.ec2_mode,
)

# Known account IDs for synthetic node resolution
known_accounts = org_data.active_account_ids
# Known account IDs for synthetic node resolution. Absence from this set
# proves non-membership only when collection covered the full active org;
# partial/standalone runs keep absent accounts explicitly unknown.
known_accounts, org_collection_complete = _org_membership_resolution_context(
org_data,
all_account_data,
config,
)

# NF-1 fix (S06): construct a real NoiseFilter from config and pass its
# edge filter function to build_trust_edges. Pre-S06 this was dead code
Expand Down Expand Up @@ -704,7 +738,11 @@ def _add_edges(new_edges: list[Edge]) -> None:
all_nodes.extend(hyperedge_nodes)

# Resolve synthetic nodes (external accounts, wildcards, services)
synthetic_nodes = resolve_synthetic_nodes(all_trust_results, known_account_ids=known_accounts)
synthetic_nodes = resolve_synthetic_nodes(
all_trust_results,
known_account_ids=known_accounts,
org_collection_complete=org_collection_complete,
)
all_nodes.extend(synthetic_nodes)

# Add Lambda/EC2 service nodes and edges
Expand Down
92 changes: 84 additions & 8 deletions iamscope/resolver/cross_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,15 @@
from iamscope.models import ControlRef, Edge, Node, NodeRef, TrustParseResult
from iamscope.resolver.naked_trust import classify_naked_trust

ORG_MEMBERSHIP_MEMBER = "member"
ORG_MEMBERSHIP_NON_MEMBER = "non_member"
ORG_MEMBERSHIP_UNKNOWN = "unknown"


def resolve_synthetic_nodes(
trust_results: list[TrustParseResult],
known_account_ids: set[str] | None = None,
org_collection_complete: bool = False,
) -> list[Node]:
"""Create synthetic nodes for principals referenced in trust policies.

Expand All @@ -51,6 +56,11 @@ def resolve_synthetic_nodes(
trust_results: Parsed trust policy results from all roles.
known_account_ids: Account IDs collected in this run (used to
mark external vs. internal accounts).
org_collection_complete: True when the account/org collection scope
is complete enough that absence from known accounts
can be treated as a confirmed non-member. False
preserves uncertainty for partial or standalone
collection.

Returns:
Sorted list of deduplicated synthetic Node objects.
Expand All @@ -64,7 +74,11 @@ def resolve_synthetic_nodes(
if key in seen:
continue

node = _create_synthetic_node(tr, known)
node = _create_synthetic_node(
tr,
known,
org_collection_complete=org_collection_complete,
)
if node is not None:
seen[key] = node

Expand All @@ -75,6 +89,7 @@ def resolve_synthetic_nodes(
def _create_synthetic_node(
tr: TrustParseResult,
known_account_ids: set[str],
org_collection_complete: bool,
) -> Node | None:
"""Create a single synthetic node from a trust parse result.

Expand All @@ -87,6 +102,7 @@ def _create_synthetic_node(

if node_type == NODE_TYPE_WILDCARD_PRINCIPAL:
properties["description"] = "Any AWS principal (Principal: *)"
properties["org_membership_status"] = ORG_MEMBERSHIP_NON_MEMBER
properties["org_member"] = False
return Node(
provider=PROVIDER_AWS,
Expand All @@ -98,10 +114,14 @@ def _create_synthetic_node(

if node_type == NODE_TYPE_ACCOUNT_ROOT:
account_id = _extract_account_from_arn(provider_id)
is_org_member = account_id in known_account_ids if account_id else False
properties["account_id"] = account_id or ""
properties["is_external"] = not is_org_member
properties["org_member"] = is_org_member
properties.update(
_org_membership_properties(
account_id,
known_account_ids,
org_collection_complete=org_collection_complete,
)
)
return Node(
provider=PROVIDER_AWS,
node_type=NODE_TYPE_ACCOUNT_ROOT,
Expand Down Expand Up @@ -142,7 +162,9 @@ def _create_synthetic_node(

if node_type == NODE_TYPE_EXTERNAL_ACCOUNT:
properties["raw_principal"] = provider_id
properties["org_member"] = False
properties["org_membership_status"] = ORG_MEMBERSHIP_UNKNOWN
properties["org_member"] = None
properties["is_external"] = None
return Node(
provider=PROVIDER_AWS,
node_type=NODE_TYPE_EXTERNAL_ACCOUNT,
Expand All @@ -156,10 +178,14 @@ def _create_synthetic_node(
if node_type in (NODE_TYPE_IAM_ROLE, NODE_TYPE_IAM_USER):
if tr.cross_account:
account_id = _extract_account_from_arn(provider_id)
is_org_member = account_id in known_account_ids if account_id else False
properties["account_id"] = account_id or ""
properties["is_external"] = not is_org_member
properties["org_member"] = is_org_member
properties.update(
_org_membership_properties(
account_id,
known_account_ids,
org_collection_complete=org_collection_complete,
)
)
return Node(
provider=PROVIDER_AWS,
node_type=node_type,
Expand All @@ -173,6 +199,56 @@ def _create_synthetic_node(
return None


def _org_membership_properties(
account_id: str | None,
known_account_ids: set[str],
*,
org_collection_complete: bool,
) -> dict[str, Any]:
"""Return tri-state org-membership properties for synthetic principals.

Compatibility choice: keep the legacy `org_member` and `is_external`
keys, but set both to None when membership is unknown. This preserves
existing field presence for consumers while avoiding a false
non-member/external assertion when collection scope is partial.
"""
status = _org_membership_status(
account_id,
known_account_ids,
org_collection_complete=org_collection_complete,
)
if status == ORG_MEMBERSHIP_MEMBER:
return {
"org_membership_status": status,
"org_member": True,
"is_external": False,
}
if status == ORG_MEMBERSHIP_NON_MEMBER:
return {
"org_membership_status": status,
"org_member": False,
"is_external": True,
}
return {
"org_membership_status": ORG_MEMBERSHIP_UNKNOWN,
"org_member": None,
"is_external": None,
}


def _org_membership_status(
account_id: str | None,
known_account_ids: set[str],
*,
org_collection_complete: bool,
) -> str:
if account_id and account_id in known_account_ids:
return ORG_MEMBERSHIP_MEMBER
if org_collection_complete and account_id:
return ORG_MEMBERSHIP_NON_MEMBER
return ORG_MEMBERSHIP_UNKNOWN


def build_trust_edges(
trust_results: list[TrustParseResult],
role_node: Node,
Expand Down
Loading