Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions iamscope/resolver/permission_boundary.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
similar to SCPs but scoped per-principal.

Binding rules:
- Trust edges: bind to edges where dst has the boundary
- Permission edges: bind to edges where src has the boundary
- Trust edges: do not bind. Permission boundaries constrain a principal's
effective permissions; they do not constrain who can assume that principal
through a trust policy.

BND-1 post-fix behaviour (S03):
- If the constraint's `parse_status` is "complete", compare the edge's
Expand Down Expand Up @@ -95,9 +97,10 @@ def bind_permission_boundaries(
) -> list[EdgeConstraint]:
"""Bind permission boundary constraints to edges.

A boundary applies to any edge involving the constrained principal:
- Trust edges where dst has the boundary
- Permission edges where src has the boundary
A boundary applies to permission edges where the edge source is the
constrained principal. It intentionally does not bind to trust edges:
permission boundaries constrain what a principal can do after credentials
exist, not who can assume a role through its trust policy.

Post-BND-1 (S03): the binding now computes action intersection against
the boundary's `allowed_actions`. See the module docstring for the
Expand Down Expand Up @@ -135,15 +138,11 @@ def bind_permission_boundaries(
edge_constraints: list[EdgeConstraint] = []

for edge in edges:
# Identify which end of the edge the boundary applies to, based on layer.
if "_trust" in edge.edge_type:
constrained_id = edge.dst.provider_id
side = "dst"
elif "_permission" in edge.edge_type:
constrained_id = edge.src.provider_id
side = "src"
else:
continue # service edges etc. — boundaries don't apply
if "_permission" not in edge.edge_type:
continue

constrained_id = edge.src.provider_id
side = "src"

boundary_arn = boundary_by_principal.get(constrained_id, "")
if not boundary_arn or boundary_arn not in constraint_by_boundary:
Expand Down Expand Up @@ -247,13 +246,11 @@ def _extract_edge_action(edge_type: str) -> str | None:
"""Return the action portion of an edge_type string.

Edge types follow the pattern `<action>_<layer>` where layer is
`trust` or `permission` (e.g. `sts:AssumeRole_trust`,
`iam:PassRole_permission`). Returns None if neither suffix matches.
`permission` (e.g. `iam:PassRole_permission`). Returns None if
the suffix does not match.
"""
if edge_type.endswith("_permission"):
return edge_type[: -len("_permission")]
if edge_type.endswith("_trust"):
return edge_type[: -len("_trust")]
return None


Expand Down
41 changes: 40 additions & 1 deletion tests/integration/test_full_pipeline_reasoner_verdicts.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,13 +402,52 @@ def test_cross_account_trust_ignores_non_scp_bindings_for_scp_check() -> None:
}

assert CONSTRAINT_TYPE_TRUST_CONDITION in bound_constraint_types
assert CONSTRAINT_TYPE_PERMISSION_BOUNDARY in bound_constraint_types
assert CONSTRAINT_TYPE_PERMISSION_BOUNDARY not in bound_constraint_types
assert CONSTRAINT_TYPE_SCP not in bound_constraint_types
assert scp_check.state is CheckState.PASS
assert not any(blocker.kind == "scp" for blocker in finding.blockers_observed)
assert trust_edge.edge_id in finding.evidence.edge_refs


def test_cross_account_trust_defensively_ignores_malformed_non_scp_trust_binding() -> None:
bundle = _cross_account_bundle()
target = _role_arn(_account("2"), "PipelineExternalIdTrustTarget")
trust_edge = next(
edge for edge in bundle.edges if edge.dst.provider_id == target and _edge_action(edge) == "sts:AssumeRole"
)
boundary_constraint = next(
constraint
for constraint in bundle.constraints
if constraint.constraint_type == CONSTRAINT_TYPE_PERMISSION_BOUNDARY
)
malformed_boundary_binding = EdgeConstraint(
edge_id=trust_edge.edge_id,
constraint_id=boundary_constraint.constraint_id,
governance_confidence="complete",
likely_blocking=True,
binding_reason="malformed legacy boundary binding on trust edge",
)
facts = _facts(
bundle.nodes,
bundle.edges,
bundle.constraints,
bundle.edge_constraints + (malformed_boundary_binding,),
)

findings = CrossAccountTrustReasoner().run(facts)
finding = _single_finding(findings, pattern_id="cross_account_trust", target=target)
scp_check = _check(finding, "no_scp_blocks_sts_assumerole")
bound_constraint_types = {
facts.constraint_by_id(binding.constraint_id).constraint_type
for binding in facts.bindings_for_edge(trust_edge.edge_id)
}

assert CONSTRAINT_TYPE_PERMISSION_BOUNDARY in bound_constraint_types
assert CONSTRAINT_TYPE_SCP not in bound_constraint_types
assert scp_check.state is CheckState.PASS
assert not any(blocker.kind == "scp" for blocker in finding.blockers_observed)


def test_cross_account_trust_real_scp_binding_blocks_scp_check() -> None:
source_account = _account("1")
bundle = _cross_account_bundle(scp_scope_account=source_account)
Expand Down
202 changes: 202 additions & 0 deletions tests/resolver/test_permission_boundary_binder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
"""Permission-boundary binder tests."""

from __future__ import annotations

from iamscope.collector.account import AccountData
from iamscope.constants import (
CONSTRAINT_TYPE_PERMISSION_BOUNDARY,
EDGE_LAYER_PERMISSION,
EDGE_LAYER_TRUST,
NODE_TYPE_ACCOUNT_ROOT,
NODE_TYPE_IAM_ROLE,
NODE_TYPE_S3_BUCKET,
PROVIDER_AWS,
REGION_GLOBAL,
)
from iamscope.models import AccountInfo, Edge, Node, OrgData
from iamscope.parser.permission_policy import parse_permission_policy
from iamscope.parser.trust_policy import parse_trust_policy
from iamscope.pipeline import PipelineConfig, _run_resolution
from iamscope.resolver.permission_boundary import bind_permission_boundaries, build_permission_boundary_constraints

TARGET_ACCOUNT = "1" * 12
SOURCE_ACCOUNT = "2" * 12


def _role_arn(account_id: str = TARGET_ACCOUNT) -> str:
return f"arn:aws:iam::{account_id}:role/BoundaryTarget"


def _root_arn(account_id: str = SOURCE_ACCOUNT) -> str:
return f"arn:aws:iam::{account_id}:root"


def _boundary_arn(account_id: str = TARGET_ACCOUNT) -> str:
return f"arn:aws:iam::{account_id}:policy/BoundaryPolicy"


def _bucket_arn() -> str:
return "arn:aws:s3:::boundary-demo-bucket"


def _role_node() -> Node:
return Node(
provider=PROVIDER_AWS,
node_type=NODE_TYPE_IAM_ROLE,
provider_id=_role_arn(),
region=REGION_GLOBAL,
properties={
"account_id": TARGET_ACCOUNT,
"path": "/",
"permission_boundary_arn": _boundary_arn(),
},
)


def _source_root_node() -> Node:
return Node(
provider=PROVIDER_AWS,
node_type=NODE_TYPE_ACCOUNT_ROOT,
provider_id=_root_arn(),
region=REGION_GLOBAL,
properties={"account_id": SOURCE_ACCOUNT, "is_synthetic": True},
)


def _bucket_node() -> Node:
return Node(
provider=PROVIDER_AWS,
node_type=NODE_TYPE_S3_BUCKET,
provider_id=_bucket_arn(),
region="us-east-1",
properties={"account_id": TARGET_ACCOUNT},
)


def _boundary_policy() -> dict:
return {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:ListBucket", "lambda:CreateFunction"],
"Resource": "*",
}
],
}


def test_permission_boundary_binds_permission_edge_not_trust_edge() -> None:
"""Boundaries constrain source-principal permissions, not trust admission."""
source = _source_root_node()
role = _role_node()
bucket = _bucket_node()
trust_edge = Edge(
edge_type=f"sts:AssumeRole_{EDGE_LAYER_TRUST}",
src=source.to_ref(),
dst=role.to_ref(),
region=REGION_GLOBAL,
features={"layer": EDGE_LAYER_TRUST},
)
permission_edge = Edge(
edge_type=f"s3:ListBucket_{EDGE_LAYER_PERMISSION}",
src=role.to_ref(),
dst=bucket.to_ref(),
region=REGION_GLOBAL,
features={"layer": EDGE_LAYER_PERMISSION},
)
constraints = build_permission_boundary_constraints({_boundary_arn(): _boundary_policy()})

edge_constraints = bind_permission_boundaries(
[trust_edge, permission_edge],
[source, role, bucket],
constraints,
)

assert len(edge_constraints) == 1
assert edge_constraints[0].edge_id == permission_edge.edge_id
assert edge_constraints[0].likely_blocking is False
assert edge_constraints[0].governance_confidence == "complete"
assert "src has permission boundary" in edge_constraints[0].binding_reason


def test_run_resolution_does_not_attach_permission_boundary_to_trust_edge() -> None:
"""Pipeline output has boundary sidecars only on permission edges."""
role = _role_node()
trust_result = parse_trust_policy(
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": _root_arn()},
"Action": "sts:AssumeRole",
}
],
},
role_arn=role.provider_id,
role_account_id=TARGET_ACCOUNT,
)[0]
permission_results = parse_permission_policy(
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "lambda:CreateFunction",
"Resource": "*",
}
],
},
source_arn=role.provider_id,
source_node_type=NODE_TYPE_IAM_ROLE,
source_account_id=TARGET_ACCOUNT,
policy_source="inline",
policy_name="BoundaryTargetPolicy",
)
account_data = AccountData(
account_id=TARGET_ACCOUNT,
nodes=[role, _bucket_node()],
trust_results=[(role, trust_result)],
permission_results=permission_results,
role_arns=[role.provider_id],
permission_boundary_policies={_boundary_arn(): _boundary_policy()},
)
org_data = OrgData(
org_id="o-boundary",
root_id="r-root",
accounts=[
AccountInfo(
account_id=TARGET_ACCOUNT,
name="Boundary",
email="boundary@example.invalid",
status="ACTIVE",
parent_id="r-root",
)
],
)

_nodes, edges, constraints, edge_constraints, _budget = _run_resolution(
org_data,
[account_data],
PipelineConfig(),
)

boundary_constraint_ids = {
constraint.constraint_id
for constraint in constraints
if constraint.constraint_type == CONSTRAINT_TYPE_PERMISSION_BOUNDARY
}
assert boundary_constraint_ids
trust_edge_ids = {edge.edge_id for edge in edges if edge.edge_type.endswith(f"_{EDGE_LAYER_TRUST}")}
permission_edge_ids = {edge.edge_id for edge in edges if edge.edge_type.endswith(f"_{EDGE_LAYER_PERMISSION}")}

boundary_binding_edge_ids = {
edge_constraint.edge_id
for edge_constraint in edge_constraints
if edge_constraint.constraint_id in boundary_constraint_ids
}

assert boundary_binding_edge_ids
assert boundary_binding_edge_ids <= permission_edge_ids
assert boundary_binding_edge_ids.isdisjoint(trust_edge_ids)
23 changes: 8 additions & 15 deletions tests/test_permission_boundary.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

Tests cover:
- build_permission_boundary_constraints from policy docs
- bind_permission_boundaries to trust and permission edges
- bind_permission_boundaries to permission edges only
- permission boundaries do not bind to trust edges
- Empty policies produce no constraints
- Multiple boundaries deduplication
- Pipeline integration with boundary-attached role
Expand Down Expand Up @@ -123,8 +124,8 @@ def test_deny_statements_excluded(self) -> None:
class TestBindBoundaries:
"""Tests for bind_permission_boundaries."""

def test_trust_edge_dst_has_boundary(self) -> None:
"""Trust edge bound when dst role has the boundary."""
def test_trust_edge_dst_has_boundary_does_not_bind(self) -> None:
"""Trust edge is not bound when dst role has the boundary."""
dst_node = _make_node("arn:role/Target", boundary_arn="arn:boundary")
edge = _make_edge("sts:AssumeRole_trust", "arn:src", "arn:role/Target")

Expand All @@ -138,9 +139,7 @@ def test_trust_edge_dst_has_boundary(self) -> None:
}
)
ecs = bind_permission_boundaries([edge], [dst_node], constraints)
assert len(ecs) == 1
assert ecs[0].edge_id == edge.edge_id
assert ecs[0].constraint_id == constraints[0].constraint_id
assert ecs == []

def test_permission_edge_src_has_boundary(self) -> None:
"""Permission edge bound when src principal has the boundary."""
Expand Down Expand Up @@ -518,17 +517,11 @@ def test_boundary_explicit_deny_blocks_even_with_allow(self) -> None:
assert ecs[0].governance_confidence == "complete"
assert "explicit boundary Deny" in ecs[0].binding_reason

def test_trust_edge_also_action_intersected(self) -> None:
"""BND-1 fix applies symmetrically to _trust edges (dst-constrained).

Not in the plan's 8-test list; added as a regression guard because the
binding logic handles both edge layers and both should be exercised.
"""
def test_trust_edge_is_not_boundary_intersected(self) -> None:
"""Permission-boundary action intersection is permission-edge-only."""
ecs = self._bind(
boundary_arn="arn:aws:iam::222222\u003222222:policy/AssumeRoleOnly",
allowed_actions=["sts:AssumeRole"],
edge_type="sts:AssumeRole_trust",
)
assert len(ecs) == 1
assert ecs[0].likely_blocking is False
assert ecs[0].governance_confidence == "complete"
assert ecs == []