23 changes: 23 additions & 0 deletions plans/2026-04-27-portscan-afxdp-plan-v1.md
@@ -340,6 +340,29 @@ AF_XDP requires `CAP_NET_RAW` and `CAP_BPF` (kernel ≥ 5.8) or `CAP_SYS_ADMIN`
| Kernel < 5.10 on some legacy worker hosts | Low | High (if hit) | Runtime probe gates AF_XDP off and logs; falls back to AF_PACKET cleanly. |
| systemd unit missing `CAP_BPF` | High (will hit on first prod run) | High | Phase 2 PR includes systemd unit edit; Phase 1 just records the dependency. |
| Phase 2 LOC estimate slips | Medium | Low | Subdivide Phase 2 into the four PRs in §8 so a slip in one doesn't block the others. |
| Operator scales NIC count past CPU sweet spot | Medium | High | Subprocess concurrency cap stays at **`ANYSCAN_RATE_MAX_CONCURRENT_SUBPROCESSES=4`** regardless of NIC count — see §6.1 below. |

### 6.1 Multi-NIC subprocess cap stays at 4 regardless of NIC count

The c6in.xlarge → c6in.metal upgrade path lets operators attach up to 15 ENIs (anygpt-45 wires the launch path; PR sequence: PR #64 ENI auto-discovery → PR #65 AF_XDP plan → anygpt-45 launch-path scaling). Every additional ENI in AF_XDP `drv+copy` mode (the only mode ENA exposes on kernel ≤6.12.74) adds ~3M pps linearly until CPU contention saturates, so the temptation is to also raise the per-host *subprocess* cap — `ANYSCAN_RATE_MAX_CONCURRENT_SUBPROCESSES`, which defaults to 4 via `install-worker-bundle.sh:323-324` — proportionally with NIC count.

**Don't.** anygpt-4's c6in.metal bench data already settled this:

| NIC count | Subproc cap | Aggregate pps |
|-----------|-------------|-------------------|
| 4 | 4 | 12.8 M (kernel TX peak) |
| 8 | 8 | 1.3 M (CPU-thrash collapse) |
| 8 | 4 | ~12 M (parity with 4-NIC) |

The 8/8 row is the load-bearing data point: shards 5-8 CPU-starved shards 1-4, NAPI softirqs queued behind userland sender loops, and the aggregate dropped by an order of magnitude. **The host-CPU sweet spot is `cap=4` and that does not move when you add NICs** — it's the kernel TX-batch + AIMD reaction-time interaction, not a NIC-count function.

What more NICs *do* unlock, when paired with `cap=4`:

- **AF_PACKET (today):** the per-socket TX-lock keeps each shard near 3M pps. `cap=4` × 4 NICs = ~12M aggregate. Adding NICs 5-15 buys nothing in this mode because `cap=4` only ever drives 4 of them.
- **AF_XDP `drv+copy` (Phase 2 of this plan, kernel ≤6.12.74):** each shard rises past ~3M pps, putting `cap=4` at ~12-22M aggregate (per the anyscan_afxdp_ena_constraint memory). Adding NICs 5-15 still buys nothing **at `cap=4`**.
- **AF_XDP `zerocopy` (kernel >6.12.74, separate worker):** the per-shard ceiling rises into the 8-12M pps range, and *now* multiple sender loops on different NICs can run concurrently because the syscall path is mostly gone. At that point a higher cap becomes worth measuring — but that's a kernel-upgrade story, gated on this plan's §4.3 probe returning a `zerocopy`-capable host.

So: **leave `ANYSCAN_RATE_MAX_CONCURRENT_SUBPROCESSES=4`** when scaling beyond 8 ENIs. The remaining 11 NICs are headroom for the kernel-upgrade worker that flips ENA from `drv+copy` to `zerocopy`; they are not free pps on the existing kernel. Any operator who raises the cap should re-bench on a fresh c6in.metal box first — anygpt-4's 8/8 collapse is the stop sign.
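
To make the claim concrete, here is a minimal back-of-envelope model of the bench table above. It is a sketch only: the per-shard rates are this section's cited figures (with the zerocopy rate assumed at the midpoint of its 8-12M range), and the function is illustrative, not shipped code.

```python
# Sketch only: per-shard rates are the figures cited in this section;
# the zerocopy rate is an assumed midpoint of the 8-12M pps range.
PER_SHARD_PPS = {
    "af_packet": 3.0e6,        # per-socket TX-lock ceiling
    "afxdp_drv_copy": 3.0e6,   # ENA on kernel <= 6.12.74
    "afxdp_zerocopy": 10.0e6,  # assumed midpoint, kernel > 6.12.74
}

def aggregate_pps(mode: str, nic_count: int, cap: int = 4) -> float:
    # Only min(cap, nic_count) shards ever send concurrently, so NICs
    # beyond the subprocess cap contribute nothing. Deliberately does
    # NOT model the cap>4 CPU-thrash collapse (the 8/8 bench row).
    return PER_SHARD_PPS[mode] * min(cap, nic_count)

assert aggregate_pps("af_packet", 4) == 12.0e6        # matches the 4/4 row
assert aggregate_pps("afxdp_drv_copy", 15) == 12.0e6  # NICs 5-15 buy nothing
```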

---

326 changes: 321 additions & 5 deletions tools/ec2_worker_manager.py
@@ -62,6 +62,217 @@ def split_csv(value: str | None) -> list[str]:
return parts


def parse_max_enis(value: str | None) -> int | None:
"""Parse ANYSCAN_MAX_ENIS. Empty/unset preserves single-NIC behavior.

Multi-ENI attach engages only when the operator opts in by setting
ANYSCAN_MAX_ENIS to a positive integer. The recommended value on
c6in.metal (and any instance type whose `MaximumNetworkInterfaces`
reaches 15) is 15; the manager will clamp the requested count down
to whatever the hardware actually exposes via DescribeInstanceTypes.
Non-positive or non-integer inputs raise SystemExit so a typo in the
operator's environment fails loudly rather than silently dropping
back to single-NIC.
"""
if value is None:
return None
raw = value.strip()
if not raw:
return None
try:
parsed = int(raw)
except ValueError as exc:
raise SystemExit(
f"ANYSCAN_MAX_ENIS must be a positive integer, got {value!r}"
) from exc
if parsed <= 0:
raise SystemExit(
f"ANYSCAN_MAX_ENIS must be a positive integer, got {parsed}"
)
return parsed
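
# Illustrative behavior (doctest-style, not part of the module):
#   parse_max_enis(None) -> None        (multi-ENI stays opted out)
#   parse_max_enis(" 15 ") -> 15
#   parse_max_enis("0") -> SystemExit   (fails loudly, per the docstring)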


def compute_target_eni_count(hardware_cap: int, max_enis: int | None) -> int:
"""Return how many ENIs to attach at launch.

When max_enis is None the manager keeps the legacy single-NIC launch
path. When set, the attach count is clamped to min(hardware_cap,
max_enis) — and to at least 1 even if the AWS describe call returned
a clearly-bogus zero, so the launch never goes out with an empty
NetworkInterfaces list.
"""
if max_enis is None:
return 1
cap = max(1, hardware_cap)
return max(1, min(cap, max_enis))
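
# Illustrative clamping (not part of the module):
#   compute_target_eni_count(15, None) -> 1   (legacy single-NIC path)
#   compute_target_eni_count(15, 99) -> 15    (clamped to the hardware cap)
#   compute_target_eni_count(0, 15) -> 1      (bogus describe payload still launches)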


def distribute_enis_across_cards(
target_count: int, network_cards: list[dict[str, Any]] | None
) -> list[tuple[int, int]]:
"""Place `target_count` ENIs across the instance's network cards.

Returns a list of (NetworkCardIndex, per-card-DeviceIndex) tuples in
the order ENIs should appear in the RunInstances NetworkInterfaces
array. Round-robin across cards is preferred over packing one card:
every additional card unlocks an independent PCIe queue/IRQ tree,
which is the whole point of attaching more ENIs at this scale.

On c6in.metal DescribeInstanceTypes returns NetworkCards = [
{NetworkCardIndex:0, MaximumNetworkInterfaces:5}, (primary)
{NetworkCardIndex:1, MaximumNetworkInterfaces:4},
{NetworkCardIndex:2, MaximumNetworkInterfaces:3},
{NetworkCardIndex:3, MaximumNetworkInterfaces:3},
] — total 15. Without per-card distribution, attaching 15 ENIs to
card 0 (the default) hard-fails RunInstances because card 0 only
has 5 slots.

When `network_cards` is None or empty (single-card instance types,
or a stripped-down DescribeInstanceTypes payload), all ENIs land on
card 0 with sequential DeviceIndex values — the legacy single-card
behavior every pre-c6in instance type uses.
"""
if target_count < 1:
raise ValueError("target_count must be >= 1")

if not network_cards:
return [(0, idx) for idx in range(target_count)]

cards: list[tuple[int, int]] = []
for card in network_cards:
if not isinstance(card, dict):
continue
index = card.get("NetworkCardIndex")
capacity = card.get("MaximumNetworkInterfaces")
if not isinstance(index, int) or not isinstance(capacity, int):
continue
if capacity < 1:
continue
cards.append((index, capacity))
if not cards:
return [(0, idx) for idx in range(target_count)]
cards.sort(key=lambda entry: entry[0])

# Per-card next-DeviceIndex counter. The primary ENI must land on
# the primary card (NetworkCardIndex 0) at DeviceIndex 0, and each
# subsequent ENI takes the next free slot on the next card in
# round-robin order.
used: dict[int, int] = {index: 0 for index, _ in cards}
capacity_by_card = dict(cards)
placement: list[tuple[int, int]] = []

# Force ENI 0 onto card 0 if card 0 exists; AWS rejects a primary
# ENI on a non-zero card.
primary_card = cards[0][0]
placement.append((primary_card, 0))
used[primary_card] = 1
if target_count == 1:
return placement

card_order = [index for index, _ in cards]
cursor = 1 # next card index in round-robin
placed = 1
while placed < target_count:
# Advance through cards looking for the next one with a free
# slot. If every card is full we silently truncate; the caller
# has already clamped target_count to MaximumNetworkInterfaces
# so this branch should be unreachable, but it keeps the helper
# total instead of raising mid-launch.
attempts = 0
while attempts < len(card_order):
card_index = card_order[cursor % len(card_order)]
cursor += 1
attempts += 1
if used[card_index] < capacity_by_card[card_index]:
placement.append((card_index, used[card_index]))
used[card_index] += 1
placed += 1
break
else:
break
return placement
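
# Illustrative placement on the c6in.metal card layout from the docstring
# (not part of the module):
#   distribute_enis_across_cards(6, [
#       {"NetworkCardIndex": 0, "MaximumNetworkInterfaces": 5},
#       {"NetworkCardIndex": 1, "MaximumNetworkInterfaces": 4},
#       {"NetworkCardIndex": 2, "MaximumNetworkInterfaces": 3},
#       {"NetworkCardIndex": 3, "MaximumNetworkInterfaces": 3},
#   ])
#   -> [(0, 0), (1, 0), (2, 0), (3, 0), (0, 1), (1, 1)]
# Primary ENI pinned to card 0, then round-robin across cards, with
# DeviceIndex counted per card rather than globally.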


def build_network_interfaces(
*,
target_count: int,
subnet_ids: list[str],
security_group_ids: list[str],
network_cards: list[dict[str, Any]] | None = None,
) -> list[dict[str, Any]]:
"""Construct the NetworkInterfaces parameter for ec2:RunInstances.

Subnets rotate round-robin through `subnet_ids` so an operator with
a multi-AZ-incompatible /28 primary subnet can supply a comma list
in ANYSCAN_EC2_ENI_SUBNET_IDS to spread secondaries across larger
subnets in the same AZ. Single-subnet operators are unaffected:
every ENI lands in `subnet_ids[0]`.

`network_cards` (when supplied from DescribeInstanceTypes' NetworkInfo
payload) drives ENI placement across physical network cards via
`distribute_enis_across_cards` — required on multi-card instance
types like c6in.metal where the primary card holds only 5 of the 15
available ENI slots.
"""
if target_count < 1:
raise ValueError("target_count must be >= 1")
if not subnet_ids:
raise ValueError("subnet_ids must contain at least one subnet")
placements = distribute_enis_across_cards(target_count, network_cards)
interfaces: list[dict[str, Any]] = []
for sequence_index, (card_index, device_index) in enumerate(placements):
spec: dict[str, Any] = {
"DeviceIndex": device_index,
"SubnetId": subnet_ids[sequence_index % len(subnet_ids)],
}
# Only emit NetworkCardIndex when we actually had per-card data
# to act on. Single-card instance types (the entire pre-c6in
# fleet) keep the legacy payload shape; that matters because
# some older instance types reject NetworkCardIndex outright.
if network_cards:
spec["NetworkCardIndex"] = card_index
if security_group_ids:
spec["Groups"] = list(security_group_ids)
interfaces.append(spec)
return interfaces
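
# Illustrative payload for a single-card type with a two-subnet pool
# (not part of the module; the IDs are placeholders):
#   build_network_interfaces(
#       target_count=3,
#       subnet_ids=["subnet-aaa", "subnet-bbb"],
#       security_group_ids=["sg-123"],
#   )
#   -> [{"DeviceIndex": 0, "SubnetId": "subnet-aaa", "Groups": ["sg-123"]},
#       {"DeviceIndex": 1, "SubnetId": "subnet-bbb", "Groups": ["sg-123"]},
#       {"DeviceIndex": 2, "SubnetId": "subnet-aaa", "Groups": ["sg-123"]}]
# No NetworkCardIndex key is emitted because network_cards was omitted.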


def eni_cap_from_describe_response(response: dict[str, Any]) -> int | None:
"""Pull MaximumNetworkInterfaces out of DescribeInstanceTypes payload.

Returns None when the field is missing/malformed; callers fall back
to a safe single-NIC launch in that case rather than guessing a
higher number.
"""
types = response.get("InstanceTypes") or []
if not types:
return None
network = (types[0] or {}).get("NetworkInfo") or {}
cap = network.get("MaximumNetworkInterfaces")
if not isinstance(cap, int) or cap < 1:
return None
return cap


def network_cards_from_describe_response(
response: dict[str, Any],
) -> list[dict[str, Any]]:
"""Pull the NetworkCards list out of DescribeInstanceTypes payload.

Returns an empty list when the payload is missing the field — the
caller treats that as "single-card instance type" and skips the
NetworkCardIndex assignment.
"""
types = response.get("InstanceTypes") or []
if not types:
return []
network = (types[0] or {}).get("NetworkInfo") or {}
cards = network.get("NetworkCards") or []
if not isinstance(cards, list):
return []
return [card for card in cards if isinstance(card, dict)]
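
# Both parsers against one trimmed DescribeInstanceTypes payload
# (illustrative; the real response carries many more NetworkInfo fields):
#   response = {"InstanceTypes": [{"NetworkInfo": {
#       "MaximumNetworkInterfaces": 15,
#       "NetworkCards": [{"NetworkCardIndex": 0, "MaximumNetworkInterfaces": 5}],
#   }}]}
#   eni_cap_from_describe_response(response)        -> 15
#   network_cards_from_describe_response(response)  -> the one-card list
#   eni_cap_from_describe_response({})              -> None (caller goes single-NIC)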


@dataclass
class ManagerConfig:
region: str
@@ -88,6 +299,15 @@ class ManagerConfig:
bootstrap_grace_seconds: int
worker_stale_seconds: int
loop_interval_seconds: int
# Multi-ENI launch knobs. When `max_enis` is None the manager preserves
# the legacy single-NIC launch path (top-level SubnetId/SecurityGroupIds
# passed to RunInstances). When `max_enis` is set, RunInstances is
# invoked with an explicit NetworkInterfaces array sized to
# min(hw_cap_from_describe_instance_types, max_enis); each ENI rotates
# through `eni_subnet_ids` (defaults to `[subnet_id]`) and shares the
# primary ENI's security group.
max_enis: int | None
eni_subnet_ids: list[str]

@classmethod
def from_env(cls) -> "ManagerConfig":
@@ -143,6 +363,8 @@ def from_env(cls) -> "ManagerConfig":
"ANYSCAN_EC2_WORKER_STALE_SECONDS", DEFAULT_WORKER_STALE_SECONDS
),
loop_interval_seconds=env_int("ANYSCAN_EC2_LOOP_INTERVAL_SECONDS", 60),
max_enis=parse_max_enis(env_string("ANYSCAN_MAX_ENIS")),
eni_subnet_ids=split_csv(env_string("ANYSCAN_EC2_ENI_SUBNET_IDS")),
)


@@ -617,6 +839,48 @@ def build_user_data(self) -> str:
)
return "\n".join(lines) + "\n"

def _describe_instance_type_network_info(
self,
) -> tuple[int | None, list[dict[str, Any]]]:
"""Look up ENI cap + NetworkCards layout for the configured type.

Returns (cap, network_cards). `cap` is None on any failure path
so callers can fall back to a single-NIC launch instead of
guessing — the cost of guessing too high is a hard RunInstances
failure that takes the whole worker offline. NetworkCards is a
list (possibly empty) of {NetworkCardIndex, MaximumNetworkInterfaces}
dicts copied from the DescribeInstanceTypes payload; consumed by
`distribute_enis_across_cards` so 15-ENI launches on c6in.metal
spread correctly across the 4 physical cards.
"""
try:
response = self.ec2.describe_instance_types(
InstanceTypes=[self.config.instance_type]
)
except botocore.exceptions.ClientError:
return None, []
except Exception:
return None, []
return (
eni_cap_from_describe_response(response),
network_cards_from_describe_response(response),
)

def _resolve_eni_subnet_pool(self) -> list[str]:
"""Return the ordered subnet pool ENI specs round-robin through.

Defaults to the single primary subnet so single-subnet operators
get min(N, hw_cap) ENIs in one subnet without extra config. When
the operator sets ANYSCAN_EC2_ENI_SUBNET_IDS the pool is taken
from there verbatim; the primary subnet (for legacy single-NIC
launches) is left as ANYSCAN_EC2_SUBNET_ID and unaffected.
"""
if self.config.eni_subnet_ids:
return list(self.config.eni_subnet_ids)
if self.config.subnet_id:
return [self.config.subnet_id]
return []

def recreate_instance(self) -> dict[str, Any]:
current_id = self.current_instance_id()
terminated = None
@@ -642,12 +906,59 @@ def recreate_instance(self) -> dict[str, Any]:
}
if self.config.key_name:
launch_args["KeyName"] = self.config.key_name
if self.config.instance_profile_arn:
launch_args["IamInstanceProfile"] = {"Arn": self.config.instance_profile_arn}

# Multi-ENI launch path: only engages when the operator opts in
# via ANYSCAN_MAX_ENIS. Single-NIC behavior is preserved when the
# env var is unset so existing fleets are unaffected.
eni_attach_info: dict[str, Any] = {"requested": self.config.max_enis}
target_count = 1
network_cards: list[dict[str, Any]] = []
if self.config.max_enis is not None:
hw_cap, network_cards = self._describe_instance_type_network_info()
eni_attach_info["hardware_cap"] = hw_cap
eni_attach_info["network_cards"] = network_cards
if hw_cap is None:
# Describe failed — fall back to single-NIC rather than
# guessing 15 and failing RunInstances. The operator can
# raise the IAM permission and retry; the worker stays
# up on one NIC in the meantime.
eni_attach_info["fallback_reason"] = (
"DescribeInstanceTypes returned no usable cap; falling back to single-NIC launch"
)
else:
target_count = compute_target_eni_count(hw_cap, self.config.max_enis)

if target_count > 1:
subnet_pool = self._resolve_eni_subnet_pool()
if not subnet_pool:
eni_attach_info["fallback_reason"] = (
"no subnet configured (ANYSCAN_EC2_SUBNET_ID and ANYSCAN_EC2_ENI_SUBNET_IDS both empty); falling back to single-NIC launch"
)
target_count = 1
else:
security_group_ids = (
[self.config.security_group_id] if self.config.security_group_id else []
)
launch_args["NetworkInterfaces"] = build_network_interfaces(
target_count=target_count,
subnet_ids=subnet_pool,
security_group_ids=security_group_ids,
network_cards=network_cards,
)
eni_attach_info["attached"] = target_count
eni_attach_info["subnet_pool"] = subnet_pool

if target_count == 1:
# Legacy single-NIC path: top-level SubnetId/SecurityGroupIds
# are mutually exclusive with NetworkInterfaces, so we only
# set them on this branch.
if self.config.security_group_id:
launch_args["SecurityGroupIds"] = [self.config.security_group_id]
if self.config.subnet_id:
launch_args["SubnetId"] = self.config.subnet_id
Comment on lines +959 to +960

**P1:** Preserve configured subnet pool on single-NIC fallback

When `ANYSCAN_MAX_ENIS` is set and `DescribeInstanceTypes` fails, `target_count` falls back to 1 and this branch only copies `self.config.subnet_id` into `run_instances`. If an operator relies on `ANYSCAN_EC2_ENI_SUBNET_IDS` (with `ANYSCAN_EC2_SUBNET_ID` unset), the launch request is sent without any subnet even though valid subnets were configured, so the EC2 launch fails in accounts without a default VPC. Because `recreate_instance()` may terminate the current instance before launching the replacement, this can turn a transient describe-permission issue into worker downtime.



response = self.ec2.run_instances(**launch_args)
instance = response["Instances"][0]
self.set_instance_id(instance["InstanceId"])
@@ -659,7 +970,12 @@ def recreate_instance(self) -> dict[str, Any]:
except Exception:
pass
self._save_state()
return {"terminated": terminated, "launched": instance, "ssh_rule": ssh_rule}
return {
"terminated": terminated,
"launched": instance,
"ssh_rule": ssh_rule,
"eni_attach": eni_attach_info,
}
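
# Shape of the eni_attach audit entry after a successful multi-ENI
# launch (illustrative values, not a literal log line):
#   {"requested": 15, "hardware_cap": 15, "network_cards": [...],
#    "attached": 15, "subnet_pool": ["subnet-aaa", "subnet-bbb"]}
# On any fallback the dict instead carries a "fallback_reason" string
# alongside "requested", so operators can see why only one NIC attached.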

def run_once(self) -> dict[str, Any]:
snapshot = self.health_snapshot()