From 896768b0e30f747f65abe4f6ca86d4412a8a939e Mon Sep 17 00:00:00 2001 From: skullcmd Date: Tue, 28 Apr 2026 17:13:36 +0000 Subject: [PATCH] feat(ec2-worker): scale launch path beyond 8 ENIs to c6in.metal max of 15 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds opt-in multi-ENI attach to ec2_worker_manager. ANYSCAN_MAX_ENIS unset preserves the legacy single-NIC RunInstances payload shape; set to N (recommended 15 on c6in.metal) attaches min(N, hw_cap) ENIs at launch, spread across NetworkCards via DescribeInstanceTypes so a 15-ENI launch on c6in.metal lands 5/4/3/3 across the four physical cards instead of hard-failing on card 0's 5-slot limit. Also documents in plans/2026-04-27-portscan-afxdp-plan-v1.md §6.1 that ANYSCAN_RATE_MAX_CONCURRENT_SUBPROCESSES=4 stays the per-host CPU sweet spot regardless of NIC count — the additional NICs unlock a kernel- upgrade ceiling, not a userland cap one (anygpt-4 8/8 collapse data). - tools/ec2_worker_manager.py: new helpers (parse_max_enis, compute_target_eni_count, distribute_enis_across_cards, build_network_interfaces, eni_cap_from_describe_response, network_cards_from_describe_response); ManagerConfig.max_enis + eni_subnet_ids; recreate_instance() routes through NetworkInterfaces when opted in, falls back to single-NIC on Describe denial / missing subnet so existing fleets are unaffected. - tools/test_ec2_worker_manager.py: 40 unit tests covering env parsing, hardware-cap detection, per-card distribution, and the run_instances launch payload (boto3 stubbed at import). - tools/test-install-worker-bundle-eni-discovery.sh: bash sanity test confirms tune-scanner-host.sh + reserve-control-bandwidth.sh comma- list iterators handle 15 ENIs and the install-worker-bundle.sh multi-NIC gate fires for a 15-entry candidate list (regression guard against any future hardcoded N=8 cap). 
- plans/2026-04-27-portscan-afxdp-plan-v1.md: new §6.1 risk-register sub-section explaining why subproc cap stays at 4 when scaling NICs past 8. Default behavior unchanged when ANYSCAN_MAX_ENIS is unset. Co-Authored-By: Claude Opus 4.7 (1M context) --- plans/2026-04-27-portscan-afxdp-plan-v1.md | 23 + tools/ec2_worker_manager.py | 326 ++++++++++- ...est-install-worker-bundle-eni-discovery.sh | 92 +++ tools/test_ec2_worker_manager.py | 529 ++++++++++++++++++ 4 files changed, 965 insertions(+), 5 deletions(-) create mode 100755 tools/test-install-worker-bundle-eni-discovery.sh create mode 100644 tools/test_ec2_worker_manager.py diff --git a/plans/2026-04-27-portscan-afxdp-plan-v1.md b/plans/2026-04-27-portscan-afxdp-plan-v1.md index 2a7e348..b9693c7 100644 --- a/plans/2026-04-27-portscan-afxdp-plan-v1.md +++ b/plans/2026-04-27-portscan-afxdp-plan-v1.md @@ -340,6 +340,29 @@ AF_XDP requires `CAP_NET_RAW` and `CAP_BPF` (kernel ≥ 5.8) or `CAP_SYS_ADMIN` | Kernel < 5.10 on some legacy worker hosts | Low | High (if hit) | Runtime probe gates AF_XDP off and logs; falls back to AF_PACKET cleanly. | | systemd unit missing `CAP_BPF` | High (will hit on first prod run) | High | Phase 2 PR includes systemd unit edit; Phase 1 just records the dependency. | | Phase 2 LOC estimate slips | Medium | Low | Subdivide Phase 2 into the four PRs in §8 so a slip in one doesn't block the others. | +| Operator scales NIC count past CPU sweet spot | Medium | High | Subprocess concurrency cap stays at **`ANYSCAN_RATE_MAX_CONCURRENT_SUBPROCESSES=4`** regardless of NIC count — see §6.1 below. | + +### 6.1 Multi-NIC subprocess cap stays at 4 regardless of NIC count + +The c6in.xlarge → c6in.metal upgrade path lets operators attach up to 15 ENIs (anygpt-45 wires the launch path; PR sequence: PR #64 ENI auto-discovery → PR #65 AF_XDP plan → anygpt-45 launch-path scaling). 
Every additional ENI in AF_XDP `drv+copy` mode (the only mode ENA exposes on kernel ≤6.12.74) adds ~3M pps linearly until CPU contention saturates, so the temptation is to also raise the per-host *subprocess* cap — `ANYSCAN_RATE_MAX_CONCURRENT_SUBPROCESSES` (defaulted to 4 by `install-worker-bundle.sh:323-324`) — proportionally with NIC count. + +**Don't.** anygpt-4's c6in.metal bench data already settled this: + +| NIC count | Subproc cap | Aggregate pps | +|-----------|-------------|-------------------| +| 4 | 4 | 12.8 M (kernel TX peak) | +| 8 | 8 | 1.3 M (CPU-thrash collapse) | +| 8 | 4 | ~12 M (parity with 4-NIC) | + +The 8/8 row is the load-bearing data point: shards 5-8 CPU-starved shards 1-4, NAPI softirqs queued behind userland sender loops, and the aggregate dropped by an order of magnitude. **The host-CPU sweet spot is `cap=4` and that does not move when you add NICs** — it's the kernel TX-batch + AIMD reaction-time interaction, not a NIC-count function. + +What more NICs *do* unlock, when paired with `cap=4`: + +- **AF_PACKET (today):** the per-socket TX-lock keeps each shard near 3M pps. `cap=4` × 4 NICs = ~12M aggregate. Adding NICs 5-15 buys nothing in this mode because `cap=4` only ever drives 4 of them. +- **AF_XDP `drv+copy` (Phase 2 of this plan, kernel ≤6.12.74):** each shard rises to ~3-5.5M pps × `cap=4` = ~12-22M aggregate (per the anyscan_afxdp_ena_constraint memory). Adding NICs 5-15 still buys nothing **at `cap=4`**. +- **AF_XDP `zerocopy` (kernel >6.12.74, separate worker):** the per-shard ceiling rises into the 8-12M pps range, and *now* multiple sender loops on different NICs can run concurrently because the syscall path is mostly gone. At that point a higher cap becomes worth measuring — but that's a kernel-upgrade story, gated on this plan's §4.3 probe returning a `zerocopy`-capable host. + +So: **leave `ANYSCAN_RATE_MAX_CONCURRENT_SUBPROCESSES=4`** 
The remaining 11 NICs are headroom for the kernel-upgrade worker that flips ENA from `drv+copy` to `zerocopy`; they are not free pps on the existing kernel. Any operator who raises the cap should re-bench on a fresh c6in.metal box first — anygpt-4's 8/8 collapse is the stop sign. --- diff --git a/tools/ec2_worker_manager.py b/tools/ec2_worker_manager.py index a3e58c3..86f6a78 100644 --- a/tools/ec2_worker_manager.py +++ b/tools/ec2_worker_manager.py @@ -62,6 +62,217 @@ def split_csv(value: str | None) -> list[str]: return parts +def parse_max_enis(value: str | None) -> int | None: + """Parse ANYSCAN_MAX_ENIS. Empty/unset preserves single-NIC behavior. + + Multi-ENI attach engages only when the operator opts in by setting + ANYSCAN_MAX_ENIS to a positive integer. The recommended value on + c6in.metal (and any instance type whose `MaximumNetworkInterfaces` + reaches 15) is 15; the manager will clamp the requested count down + to whatever the hardware actually exposes via DescribeInstanceTypes. + Non-positive or non-integer inputs raise SystemExit so a typo in the + operator's environment fails loudly rather than silently dropping + back to single-NIC. + """ + if value is None: + return None + raw = value.strip() + if not raw: + return None + try: + parsed = int(raw) + except ValueError as exc: + raise SystemExit( + f"ANYSCAN_MAX_ENIS must be a positive integer, got {value!r}" + ) from exc + if parsed <= 0: + raise SystemExit( + f"ANYSCAN_MAX_ENIS must be a positive integer, got {parsed}" + ) + return parsed + + +def compute_target_eni_count(hardware_cap: int, max_enis: int | None) -> int: + """Return how many ENIs to attach at launch. + + When max_enis is None the manager keeps the legacy single-NIC launch + path. When set, the attach count is clamped to min(hardware_cap, + max_enis) — and to at least 1 even if the AWS describe call returned + a clearly-bogus zero, so the launch never goes out with an empty + NetworkInterfaces list. 
+ """ + if max_enis is None: + return 1 + cap = max(1, hardware_cap) + return max(1, min(cap, max_enis)) + + +def distribute_enis_across_cards( + target_count: int, network_cards: list[dict[str, Any]] | None +) -> list[tuple[int, int]]: + """Place `target_count` ENIs across the instance's network cards. + + Returns a list of (NetworkCardIndex, per-card-DeviceIndex) tuples in + the order ENIs should appear in the RunInstances NetworkInterfaces + array. Round-robin across cards is preferred over packing one card: + every additional card unlocks an independent PCIe queue/IRQ tree, + which is the whole point of attaching more ENIs at this scale. + + On c6in.metal DescribeInstanceTypes returns NetworkCards = [ + {NetworkCardIndex:0, MaximumNetworkInterfaces:5}, (primary) + {NetworkCardIndex:1, MaximumNetworkInterfaces:4}, + {NetworkCardIndex:2, MaximumNetworkInterfaces:3}, + {NetworkCardIndex:3, MaximumNetworkInterfaces:3}, + ] — total 15. Without per-card distribution, attaching 15 ENIs to + card 0 (the default) hard-fails RunInstances because card 0 only + has 5 slots. + + When `network_cards` is None or empty (single-card instance types, + or a stripped-down DescribeInstanceTypes payload), all ENIs land on + card 0 with sequential DeviceIndex values — the legacy single-card + behavior every pre-c6in instance type uses. + """ + if target_count < 1: + raise ValueError("target_count must be >= 1") + + if not network_cards: + return [(0, idx) for idx in range(target_count)] + + cards: list[tuple[int, int]] = [] + for card in network_cards: + if not isinstance(card, dict): + continue + index = card.get("NetworkCardIndex") + capacity = card.get("MaximumNetworkInterfaces") + if not isinstance(index, int) or not isinstance(capacity, int): + continue + if capacity < 1: + continue + cards.append((index, capacity)) + if not cards: + return [(0, idx) for idx in range(target_count)] + cards.sort(key=lambda entry: entry[0]) + + # Per-card next-DeviceIndex counter. 
The primary ENI must land on + # the primary card (NetworkCardIndex 0) at DeviceIndex 0, and each + # subsequent ENI takes the next free slot on the next card in + # round-robin order. + used: dict[int, int] = {index: 0 for index, _ in cards} + capacity_by_card = dict(cards) + placement: list[tuple[int, int]] = [] + + # Force ENI 0 onto card 0 if card 0 exists; AWS rejects a primary + # ENI on a non-zero card. + primary_card = cards[0][0] + placement.append((primary_card, 0)) + used[primary_card] = 1 + if target_count == 1: + return placement + + card_order = [index for index, _ in cards] + cursor = 1 # next card index in round-robin + placed = 1 + while placed < target_count: + # Advance through cards looking for the next one with a free + # slot. If every card is full we silently truncate; the caller + # has already clamped target_count to MaximumNetworkInterfaces + # so this branch should be unreachable, but it keeps the helper + # total instead of raising mid-launch. + attempts = 0 + while attempts < len(card_order): + card_index = card_order[cursor % len(card_order)] + cursor += 1 + attempts += 1 + if used[card_index] < capacity_by_card[card_index]: + placement.append((card_index, used[card_index])) + used[card_index] += 1 + placed += 1 + break + else: + break + return placement + + +def build_network_interfaces( + *, + target_count: int, + subnet_ids: list[str], + security_group_ids: list[str], + network_cards: list[dict[str, Any]] | None = None, +) -> list[dict[str, Any]]: + """Construct the NetworkInterfaces parameter for ec2:RunInstances. + + Subnets rotate round-robin through `subnet_ids` so an operator with + a multi-AZ-incompatible /28 primary subnet can supply a comma list + in ANYSCAN_EC2_ENI_SUBNET_IDS to spread secondaries across larger + subnets in the same AZ. Single-subnet operators are unaffected: + every ENI lands in `subnet_ids[0]`. 
+ + `network_cards` (when supplied from DescribeInstanceTypes' NetworkInfo + payload) drives ENI placement across physical network cards via + `distribute_enis_across_cards` — required on multi-card instance + types like c6in.metal where the primary card holds only 5 of the 15 + available ENI slots. + """ + if target_count < 1: + raise ValueError("target_count must be >= 1") + if not subnet_ids: + raise ValueError("subnet_ids must contain at least one subnet") + placements = distribute_enis_across_cards(target_count, network_cards) + interfaces: list[dict[str, Any]] = [] + for sequence_index, (card_index, device_index) in enumerate(placements): + spec: dict[str, Any] = { + "DeviceIndex": device_index, + "SubnetId": subnet_ids[sequence_index % len(subnet_ids)], + } + # Only emit NetworkCardIndex when we actually had per-card data + # to act on. Single-card instance types (the entire pre-c6in + # fleet) keep the legacy payload shape; that matters because + # some older instance types reject NetworkCardIndex outright. + if network_cards: + spec["NetworkCardIndex"] = card_index + if security_group_ids: + spec["Groups"] = list(security_group_ids) + interfaces.append(spec) + return interfaces + + +def eni_cap_from_describe_response(response: dict[str, Any]) -> int | None: + """Pull MaximumNetworkInterfaces out of DescribeInstanceTypes payload. + + Returns None when the field is missing/malformed; callers fall back + to a safe single-NIC launch in that case rather than guessing a + higher number. + """ + types = response.get("InstanceTypes") or [] + if not types: + return None + network = (types[0] or {}).get("NetworkInfo") or {} + cap = network.get("MaximumNetworkInterfaces") + if not isinstance(cap, int) or cap < 1: + return None + return cap + + +def network_cards_from_describe_response( + response: dict[str, Any], +) -> list[dict[str, Any]]: + """Pull the NetworkCards list out of DescribeInstanceTypes payload. 
+ + Returns an empty list when the payload is missing the field — the + caller treats that as "single-card instance type" and skips the + NetworkCardIndex assignment. + """ + types = response.get("InstanceTypes") or [] + if not types: + return [] + network = (types[0] or {}).get("NetworkInfo") or {} + cards = network.get("NetworkCards") or [] + if not isinstance(cards, list): + return [] + return [card for card in cards if isinstance(card, dict)] + + @dataclass class ManagerConfig: region: str @@ -88,6 +299,15 @@ class ManagerConfig: bootstrap_grace_seconds: int worker_stale_seconds: int loop_interval_seconds: int + # Multi-ENI launch knobs. When `max_enis` is None the manager preserves + # the legacy single-NIC launch path (top-level SubnetId/SecurityGroupIds + # passed to RunInstances). When `max_enis` is set, RunInstances is + # invoked with an explicit NetworkInterfaces array sized to + # min(hw_cap_from_describe_instance_types, max_enis); each ENI rotates + # through `eni_subnet_ids` (defaults to `[subnet_id]`) and shares the + # primary ENI's security group. + max_enis: int | None + eni_subnet_ids: list[str] @classmethod def from_env(cls) -> "ManagerConfig": @@ -143,6 +363,8 @@ def from_env(cls) -> "ManagerConfig": "ANYSCAN_EC2_WORKER_STALE_SECONDS", DEFAULT_WORKER_STALE_SECONDS ), loop_interval_seconds=env_int("ANYSCAN_EC2_LOOP_INTERVAL_SECONDS", 60), + max_enis=parse_max_enis(env_string("ANYSCAN_MAX_ENIS")), + eni_subnet_ids=split_csv(env_string("ANYSCAN_EC2_ENI_SUBNET_IDS")), ) @@ -617,6 +839,48 @@ def build_user_data(self) -> str: ) return "\n".join(lines) + "\n" + def _describe_instance_type_network_info( + self, + ) -> tuple[int | None, list[dict[str, Any]]]: + """Look up ENI cap + NetworkCards layout for the configured type. + + Returns (cap, network_cards). 
`cap` is None on any failure path + so callers can fall back to a single-NIC launch instead of + guessing — the cost of guessing too high is a hard RunInstances + failure that takes the whole worker offline. NetworkCards is a + list (possibly empty) of {NetworkCardIndex, MaximumNetworkInterfaces} + dicts copied from the DescribeInstanceTypes payload; consumed by + `distribute_enis_across_cards` so 15-ENI launches on c6in.metal + spread correctly across the 4 physical cards. + """ + try: + response = self.ec2.describe_instance_types( + InstanceTypes=[self.config.instance_type] + ) + except botocore.exceptions.ClientError: + return None, [] + except Exception: + return None, [] + return ( + eni_cap_from_describe_response(response), + network_cards_from_describe_response(response), + ) + + def _resolve_eni_subnet_pool(self) -> list[str]: + """Return the ordered subnet pool ENI specs round-robin through. + + Defaults to the single primary subnet so single-subnet operators + get min(N, hw_cap) ENIs in one subnet without extra config. When + the operator sets ANYSCAN_EC2_ENI_SUBNET_IDS the pool is taken + from there verbatim; the primary subnet (for legacy single-NIC + launches) is left as ANYSCAN_EC2_SUBNET_ID and unaffected. 
+ """ + if self.config.eni_subnet_ids: + return list(self.config.eni_subnet_ids) + if self.config.subnet_id: + return [self.config.subnet_id] + return [] + def recreate_instance(self) -> dict[str, Any]: current_id = self.current_instance_id() terminated = None @@ -642,12 +906,59 @@ def recreate_instance(self) -> dict[str, Any]: } if self.config.key_name: launch_args["KeyName"] = self.config.key_name - if self.config.security_group_id: - launch_args["SecurityGroupIds"] = [self.config.security_group_id] - if self.config.subnet_id: - launch_args["SubnetId"] = self.config.subnet_id if self.config.instance_profile_arn: launch_args["IamInstanceProfile"] = {"Arn": self.config.instance_profile_arn} + + # Multi-ENI launch path: only engages when the operator opts in + # via ANYSCAN_MAX_ENIS. Single-NIC behavior is preserved when the + # env var is unset so existing fleets are unaffected. + eni_attach_info: dict[str, Any] = {"requested": self.config.max_enis} + target_count = 1 + network_cards: list[dict[str, Any]] = [] + if self.config.max_enis is not None: + hw_cap, network_cards = self._describe_instance_type_network_info() + eni_attach_info["hardware_cap"] = hw_cap + eni_attach_info["network_cards"] = network_cards + if hw_cap is None: + # Describe failed — fall back to single-NIC rather than + # guessing 15 and failing RunInstances. The operator can + # raise the IAM permission and retry; the worker stays + # up on one NIC in the meantime. 
+ eni_attach_info["fallback_reason"] = ( + "DescribeInstanceTypes returned no usable cap; falling back to single-NIC launch" + ) + else: + target_count = compute_target_eni_count(hw_cap, self.config.max_enis) + + if target_count > 1: + subnet_pool = self._resolve_eni_subnet_pool() + if not subnet_pool: + eni_attach_info["fallback_reason"] = ( + "no subnet configured (ANYSCAN_EC2_SUBNET_ID and ANYSCAN_EC2_ENI_SUBNET_IDS both empty); falling back to single-NIC launch" + ) + target_count = 1 + else: + security_group_ids = ( + [self.config.security_group_id] if self.config.security_group_id else [] + ) + launch_args["NetworkInterfaces"] = build_network_interfaces( + target_count=target_count, + subnet_ids=subnet_pool, + security_group_ids=security_group_ids, + network_cards=network_cards, + ) + eni_attach_info["attached"] = target_count + eni_attach_info["subnet_pool"] = subnet_pool + + if target_count == 1: + # Legacy single-NIC path: top-level SubnetId/SecurityGroupIds + # are mutually exclusive with NetworkInterfaces, so we only + # set them on this branch. 
+ if self.config.security_group_id: + launch_args["SecurityGroupIds"] = [self.config.security_group_id] + if self.config.subnet_id: + launch_args["SubnetId"] = self.config.subnet_id + response = self.ec2.run_instances(**launch_args) instance = response["Instances"][0] self.set_instance_id(instance["InstanceId"]) @@ -659,7 +970,12 @@ def recreate_instance(self) -> dict[str, Any]: except Exception: pass self._save_state() - return {"terminated": terminated, "launched": instance, "ssh_rule": ssh_rule} + return { + "terminated": terminated, + "launched": instance, + "ssh_rule": ssh_rule, + "eni_attach": eni_attach_info, + } def run_once(self) -> dict[str, Any]: snapshot = self.health_snapshot() diff --git a/tools/test-install-worker-bundle-eni-discovery.sh b/tools/test-install-worker-bundle-eni-discovery.sh new file mode 100755 index 0000000..8fe4f53 --- /dev/null +++ b/tools/test-install-worker-bundle-eni-discovery.sh @@ -0,0 +1,92 @@ +#!/usr/bin/env bash +# Sanity-check that the per-NIC iteration paths in install-worker-bundle.sh, +# tune-scanner-host.sh, and reserve-control-bandwidth.sh handle a c6in.metal +# fleet of 15 ENIs without truncating or capping the list at 8 (the prior +# AF_PACKET-bench shape from PR #64). +# +# This is a static sanity test: we don't actually attach 15 NICs to the +# CI runner. We assert the comma-list iterators terminate and emit every +# entry by feeding them a synthetic 15-iface input. If a future change +# adds a hardcoded N=8 cap, this test surfaces it with a non-zero exit. + +set -euo pipefail + +REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." 
&& pwd)" +TUNE_SH="$REPO_ROOT/tools/tune-scanner-host.sh" +RESERVE_SH="$REPO_ROOT/reserve-control-bandwidth.sh" +INSTALL_SH="$REPO_ROOT/install-worker-bundle.sh" + +fail() { + printf 'FAIL: %s\n' "$1" >&2 + exit 1 +} + +check_exists() { + [ -r "$1" ] || fail "missing $1" +} +check_exists "$TUNE_SH" +check_exists "$RESERVE_SH" +check_exists "$INSTALL_SH" + +# Build a 15-iface comma list mirroring what +# detect_host_scanner_eni_candidates emits on c6in.metal once 15 ENIs are +# attached. +synthetic="" +for n in 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14; do + if [ -z "$synthetic" ]; then + synthetic="ens$n" + else + synthetic="$synthetic,ens$n" + fi +done + +# Source resolve_managed_interfaces from reserve-control-bandwidth.sh in +# a subshell with the synthetic input. We expect 15 distinct iface names +# on stdout, one per line. +count="$( + ANYSCAN_RESERVE_INTERFACES="$synthetic" \ + bash -c ' + set -eu + # shellcheck source=/dev/null + # Pull only the helper definitions we need; running the whole + # script would try to apply tc. + sed -n "/^command_exists/,/^}$/p; /^detect_default_interface/,/^}$/p; /^resolve_managed_interfaces/,/^}$/p" "'"$RESERVE_SH"'" > /tmp/reserve-helpers.sh + # shellcheck source=/dev/null + . /tmp/reserve-helpers.sh + resolve_managed_interfaces | sort -u | wc -l + ' +)" +[ "$count" = "15" ] || fail "reserve-control-bandwidth.sh resolve_managed_interfaces returned $count entries, expected 15" +printf 'PASS: resolve_managed_interfaces handles 15 ENIs (got %s)\n' "$count" + +# Same for tune-scanner-host.sh resolve_ifaces. +count="$( + ANYSCAN_TUNE_INTERFACES="$synthetic" \ + bash -c ' + set -eu + sed -n "/^command_exists/,/^}$/p; /^detect_default_interface/,/^}$/p; /^resolve_ifaces/,/^}$/p" "'"$TUNE_SH"'" > /tmp/tune-helpers.sh + # shellcheck source=/dev/null + . 
/tmp/tune-helpers.sh + resolve_ifaces | sort -u | wc -l + ' +)" +[ "$count" = "15" ] || fail "tune-scanner-host.sh resolve_ifaces returned $count entries, expected 15" +printf 'PASS: tune-scanner-host.sh resolve_ifaces handles 15 ENIs (got %s)\n' "$count" + +# Confirm install-worker-bundle.sh's "more than one entry" gate triggers +# correctly for a 15-entry list (regression guard for the comma-strip +# trick at lines 314-316). +multi="$( + bash -c ' + cands="'"$synthetic"'" + if [ -n "$cands" ] && [ "$cands" != "${cands%,*}" ]; then + echo yes + else + echo no + fi + ' +)" +[ "$multi" = "yes" ] || fail "install-worker-bundle.sh multi-NIC gate did not trigger for 15-entry list" +printf 'PASS: install-worker-bundle.sh multi-NIC gate triggers for 15-entry list\n' + +printf '\nAll multi-ENI iteration sanity checks passed.\n' diff --git a/tools/test_ec2_worker_manager.py b/tools/test_ec2_worker_manager.py new file mode 100644 index 0000000..9a523a0 --- /dev/null +++ b/tools/test_ec2_worker_manager.py @@ -0,0 +1,529 @@ +"""Unit tests for the multi-ENI launch path in ec2_worker_manager.py. + +Covers env-var parsing, hardware-cap detection, ENI distribution across +network cards, and the recreate_instance launch payload. boto3 is stubbed +out at import time so the tests run without AWS credentials. 
+ +Run from the anyscan repo root: + + python3 -m unittest tools.test_ec2_worker_manager -v +""" + +from __future__ import annotations + +import importlib.util +import os +import sys +import unittest +from pathlib import Path +from typing import Any +from unittest import mock + + +REPO_ROOT = Path(__file__).resolve().parent.parent +MODULE_PATH = REPO_ROOT / "tools" / "ec2_worker_manager.py" + + +class _StubClientError(Exception): + """Stand-in for botocore.exceptions.ClientError used by the module.""" + + def __init__(self, response: dict[str, Any]) -> None: + super().__init__(response.get("Error", {}).get("Message", "stub")) + self.response = response + + +def _load_module(): + if "ec2_worker_manager" in sys.modules: + del sys.modules["ec2_worker_manager"] + boto3_stub = mock.MagicMock() + botocore_stub = mock.MagicMock() + botocore_exc = mock.MagicMock() + botocore_exc.ClientError = _StubClientError + sys.modules["boto3"] = boto3_stub + sys.modules["botocore"] = botocore_stub + sys.modules["botocore.exceptions"] = botocore_exc + botocore_stub.exceptions = botocore_exc + + spec = importlib.util.spec_from_file_location( + "ec2_worker_manager", MODULE_PATH + ) + assert spec is not None and spec.loader is not None + module = importlib.util.module_from_spec(spec) + sys.modules["ec2_worker_manager"] = module + spec.loader.exec_module(module) + return module + + +m = _load_module() + + +def _c6in_metal_describe() -> dict[str, Any]: + """The DescribeInstanceTypes payload AWS returns for c6in.metal.""" + return { + "InstanceTypes": [ + { + "InstanceType": "c6in.metal", + "NetworkInfo": { + "MaximumNetworkInterfaces": 15, + "MaximumNetworkCards": 4, + "NetworkCards": [ + {"NetworkCardIndex": 0, "MaximumNetworkInterfaces": 5}, + {"NetworkCardIndex": 1, "MaximumNetworkInterfaces": 4}, + {"NetworkCardIndex": 2, "MaximumNetworkInterfaces": 3}, + {"NetworkCardIndex": 3, "MaximumNetworkInterfaces": 3}, + ], + }, + } + ] + } + + +def _c6in_xlarge_describe() -> dict[str, Any]: + 
return { + "InstanceTypes": [ + { + "InstanceType": "c6in.xlarge", + "NetworkInfo": { + "MaximumNetworkInterfaces": 4, + "NetworkCards": [ + {"NetworkCardIndex": 0, "MaximumNetworkInterfaces": 4} + ], + }, + } + ] + } + + +class ParseMaxEnisTests(unittest.TestCase): + """ANYSCAN_MAX_ENIS env parsing — explicit opt-in is the only signal.""" + + def test_unset_returns_none(self): + self.assertIsNone(m.parse_max_enis(None)) + + def test_empty_string_returns_none(self): + self.assertIsNone(m.parse_max_enis("")) + self.assertIsNone(m.parse_max_enis(" ")) + + def test_positive_integer_parses(self): + self.assertEqual(m.parse_max_enis("15"), 15) + self.assertEqual(m.parse_max_enis(" 4 "), 4) + + def test_zero_raises(self): + with self.assertRaises(SystemExit): + m.parse_max_enis("0") + + def test_negative_raises(self): + with self.assertRaises(SystemExit): + m.parse_max_enis("-5") + + def test_non_integer_raises(self): + with self.assertRaises(SystemExit): + m.parse_max_enis("fifteen") + + +class ComputeTargetEniCountTests(unittest.TestCase): + """target = 1 unless operator opted in; otherwise min(hw_cap, max_enis).""" + + def test_unset_max_enis_keeps_single_nic(self): + self.assertEqual(m.compute_target_eni_count(15, None), 1) + + def test_clamped_to_hardware_cap(self): + self.assertEqual(m.compute_target_eni_count(4, 15), 4) + + def test_below_hardware_cap_passes_through(self): + self.assertEqual(m.compute_target_eni_count(15, 8), 8) + + def test_equal_to_hardware_cap_passes_through(self): + self.assertEqual(m.compute_target_eni_count(15, 15), 15) + + def test_zero_or_negative_hardware_cap_floors_at_one(self): + self.assertEqual(m.compute_target_eni_count(0, 15), 1) + self.assertEqual(m.compute_target_eni_count(-3, 15), 1) + + +class EniCapFromDescribeResponseTests(unittest.TestCase): + """eni_cap_from_describe_response handles AWS payload edge cases.""" + + def test_c6in_metal_payload(self): + self.assertEqual(m.eni_cap_from_describe_response(_c6in_metal_describe()), 
15) + + def test_c6in_xlarge_payload(self): + self.assertEqual(m.eni_cap_from_describe_response(_c6in_xlarge_describe()), 4) + + def test_missing_instance_types_returns_none(self): + self.assertIsNone(m.eni_cap_from_describe_response({})) + self.assertIsNone(m.eni_cap_from_describe_response({"InstanceTypes": []})) + + def test_missing_network_info_returns_none(self): + self.assertIsNone(m.eni_cap_from_describe_response({"InstanceTypes": [{}]})) + + def test_non_integer_cap_returns_none(self): + payload = {"InstanceTypes": [{"NetworkInfo": {"MaximumNetworkInterfaces": "15"}}]} + self.assertIsNone(m.eni_cap_from_describe_response(payload)) + + def test_zero_cap_returns_none(self): + payload = {"InstanceTypes": [{"NetworkInfo": {"MaximumNetworkInterfaces": 0}}]} + self.assertIsNone(m.eni_cap_from_describe_response(payload)) + + +class NetworkCardsFromDescribeResponseTests(unittest.TestCase): + def test_c6in_metal_payload(self): + cards = m.network_cards_from_describe_response(_c6in_metal_describe()) + self.assertEqual(len(cards), 4) + self.assertEqual( + sum(c["MaximumNetworkInterfaces"] for c in cards), 15 + ) + + def test_missing_returns_empty(self): + self.assertEqual(m.network_cards_from_describe_response({}), []) + self.assertEqual( + m.network_cards_from_describe_response({"InstanceTypes": [{}]}), [] + ) + + def test_drops_non_dict_entries(self): + payload = { + "InstanceTypes": [ + {"NetworkInfo": {"NetworkCards": [None, "garbage", {"NetworkCardIndex": 0, "MaximumNetworkInterfaces": 1}]}} + ] + } + cards = m.network_cards_from_describe_response(payload) + self.assertEqual(cards, [{"NetworkCardIndex": 0, "MaximumNetworkInterfaces": 1}]) + + +class DistributeEnisAcrossCardsTests(unittest.TestCase): + """ENI distribution preserves per-card capacity and round-robins across cards.""" + + def test_no_card_data_lays_out_sequentially_on_card_zero(self): + self.assertEqual( + m.distribute_enis_across_cards(3, []), + [(0, 0), (0, 1), (0, 2)], + ) + + def 
test_none_card_data_lays_out_sequentially_on_card_zero(self): + self.assertEqual( + m.distribute_enis_across_cards(2, None), + [(0, 0), (0, 1)], + ) + + def test_c6in_metal_15_eni_layout_respects_per_card_caps(self): + cards = _c6in_metal_describe()["InstanceTypes"][0]["NetworkInfo"][ + "NetworkCards" + ] + placement = m.distribute_enis_across_cards(15, cards) + self.assertEqual(len(placement), 15) + per_card: dict[int, list[int]] = {} + for card_idx, dev_idx in placement: + per_card.setdefault(card_idx, []).append(dev_idx) + # Each card should be at exactly its declared capacity for the + # full-15 case; the round-robin policy fills evenly. + self.assertEqual(per_card[0], [0, 1, 2, 3, 4]) + self.assertEqual(per_card[1], [0, 1, 2, 3]) + self.assertEqual(per_card[2], [0, 1, 2]) + self.assertEqual(per_card[3], [0, 1, 2]) + + def test_primary_eni_lands_on_card_zero(self): + cards = _c6in_metal_describe()["InstanceTypes"][0]["NetworkInfo"][ + "NetworkCards" + ] + placement = m.distribute_enis_across_cards(1, cards) + self.assertEqual(placement, [(0, 0)]) + + def test_partial_count_round_robins_across_cards(self): + cards = _c6in_metal_describe()["InstanceTypes"][0]["NetworkInfo"][ + "NetworkCards" + ] + # 4 ENIs across 4 cards: one per card. 
+ placement = m.distribute_enis_across_cards(4, cards) + self.assertEqual( + sorted(card_idx for card_idx, _ in placement), + [0, 1, 2, 3], + ) + + def test_zero_target_raises(self): + with self.assertRaises(ValueError): + m.distribute_enis_across_cards(0, None) + + +class BuildNetworkInterfacesTests(unittest.TestCase): + """The boto3 NetworkInterfaces parameter shape.""" + + def test_legacy_single_card_path_omits_network_card_index(self): + ifs = m.build_network_interfaces( + target_count=2, + subnet_ids=["subnet-aaa"], + security_group_ids=["sg-1"], + network_cards=None, + ) + self.assertEqual(len(ifs), 2) + for spec in ifs: + self.assertNotIn("NetworkCardIndex", spec) + self.assertEqual(spec["SubnetId"], "subnet-aaa") + self.assertEqual(spec["Groups"], ["sg-1"]) + self.assertEqual([s["DeviceIndex"] for s in ifs], [0, 1]) + + def test_multi_card_path_emits_network_card_index(self): + cards = _c6in_metal_describe()["InstanceTypes"][0]["NetworkInfo"][ + "NetworkCards" + ] + ifs = m.build_network_interfaces( + target_count=15, + subnet_ids=["subnet-aaa"], + security_group_ids=["sg-1"], + network_cards=cards, + ) + self.assertEqual(len(ifs), 15) + for spec in ifs: + self.assertIn("NetworkCardIndex", spec) + + def test_subnet_pool_round_robins(self): + ifs = m.build_network_interfaces( + target_count=4, + subnet_ids=["subnet-A", "subnet-B"], + security_group_ids=["sg-1"], + ) + self.assertEqual( + [s["SubnetId"] for s in ifs], + ["subnet-A", "subnet-B", "subnet-A", "subnet-B"], + ) + + def test_empty_subnet_pool_raises(self): + with self.assertRaises(ValueError): + m.build_network_interfaces( + target_count=2, + subnet_ids=[], + security_group_ids=["sg-1"], + ) + + def test_no_security_groups_omits_groups_key(self): + ifs = m.build_network_interfaces( + target_count=1, + subnet_ids=["subnet-aaa"], + security_group_ids=[], + ) + self.assertNotIn("Groups", ifs[0]) + + +def _make_config(**overrides) -> Any: + base = dict( + region="us-east-1", + 
instance_type="c6in.metal", + ami_id="ami-fake", + key_name="anyscan-ec2-us-east-1", + ssh_private_key_path=Path("/tmp/anyscan-fake.key"), + subnet_id="subnet-primary", + security_group_id="sg-1", + instance_profile_arn=None, + worker_name="anyscan-ec2-worker", + worker_pool=None, + worker_tags=["anyscan-ec2-managed"], + control_plane_url="http://127.0.0.1:8088", + control_plane_username="admin", + control_plane_password=None, + install_url="https://example.invalid/install.sh", + google_healthcheck_url="https://example.invalid/healthz", + state_file=Path("/tmp/anyscan-fake-state.json"), + instance_id=None, + ssh_username="admin", + ssh_source_cidr=None, + auto_authorize_ssh=False, + bootstrap_grace_seconds=900, + worker_stale_seconds=90, + loop_interval_seconds=60, + max_enis=None, + eni_subnet_ids=[], + ) + base.update(overrides) + return m.ManagerConfig(**base) + + +class _FakeEc2Client: + """Minimal in-memory EC2 stub recording RunInstances calls.""" + + def __init__(self, describe_payload: dict[str, Any] | None = None) -> None: + self.describe_payload = describe_payload + self.run_instances_calls: list[dict[str, Any]] = [] + self.terminate_calls: list[list[str]] = [] + + def describe_instance_types(self, *, InstanceTypes): + if self.describe_payload is None: + raise _StubClientError( + {"Error": {"Code": "UnauthorizedOperation", "Message": "no IAM"}} + ) + return self.describe_payload + + def run_instances(self, **kwargs): + self.run_instances_calls.append(kwargs) + return {"Instances": [{"InstanceId": "i-fake-123"}]} + + def terminate_instances(self, *, InstanceIds): + self.terminate_calls.append(list(InstanceIds)) + return {"TerminatingInstances": [{"InstanceId": InstanceIds[0]}]} + + # describe_instances + status calls happen in current_instance_id / + # _describe_instance; the tests below stub current_instance_id() to + # return None so those paths never fire. 
def _make_manager(config: Any, ec2_client: _FakeEc2Client) -> Any:
    """Construct an Ec2WorkerManager bypassing __init__ side-effects.

    ``__new__`` skips ``__init__`` (which would build real boto3 clients and
    load state from disk); every collaborator except the EC2 stub is replaced
    with a MagicMock so only the launch-path logic under test runs.
    """
    manager = m.Ec2WorkerManager.__new__(m.Ec2WorkerManager)
    manager.config = config
    manager.state = {}
    manager.ec2 = ec2_client
    manager.session = mock.MagicMock()
    manager.ec2_resource = mock.MagicMock()
    manager.sts = mock.MagicMock()
    manager.api = mock.MagicMock()
    manager._save_state = mock.MagicMock()
    manager._load_state = mock.MagicMock(return_value={})
    return manager


class RecreateInstanceLaunchPathTests(unittest.TestCase):
    """Multi-ENI vs single-NIC RunInstances payload selection."""

    def _launch(self, manager):
        """Run recreate_instance() with its side-effecting collaborators stubbed.

        Stubs ``current_instance_id`` (no pre-existing instance to tear down),
        ``ensure_ssh_access`` (no security-group mutation), and
        ``build_user_data`` (no install-script templating) so each test
        exercises only the RunInstances payload construction.  Returns
        recreate_instance()'s result dict.
        """
        with mock.patch.object(manager, "current_instance_id", return_value=None), \
                mock.patch.object(manager, "ensure_ssh_access", return_value=None), \
                mock.patch.object(manager, "build_user_data", return_value="#!fake"):
            return manager.recreate_instance()

    def test_unset_preserves_legacy_single_nic_payload(self):
        """ANYSCAN_MAX_ENIS unset → legacy SubnetId/SecurityGroupIds shape."""
        config = _make_config(max_enis=None)
        ec2 = _FakeEc2Client(describe_payload=_c6in_metal_describe())
        manager = _make_manager(config, ec2)

        # Spy with wraps= rather than replace: if the code under test *does*
        # call Describe the real stub still answers, but we can assert
        # afterwards that the opt-out path never reached for it.  (The
        # previous assertion compared describe_payload against a fresh
        # fixture, which is vacuously true whether or not Describe ran.)
        with mock.patch.object(
            ec2, "describe_instance_types", wraps=ec2.describe_instance_types
        ) as describe_spy:
            result = self._launch(manager)

        self.assertEqual(len(ec2.run_instances_calls), 1)
        call = ec2.run_instances_calls[0]
        self.assertNotIn("NetworkInterfaces", call)
        self.assertEqual(call.get("SubnetId"), "subnet-primary")
        self.assertEqual(call.get("SecurityGroupIds"), ["sg-1"])
        self.assertIsNone(result["eni_attach"]["requested"])
        # Describe must not be called when the operator hasn't opted in.
        describe_spy.assert_not_called()

    def test_max_enis_15_on_c6in_metal_emits_15_network_interfaces(self):
        config = _make_config(max_enis=15, instance_type="c6in.metal")
        ec2 = _FakeEc2Client(describe_payload=_c6in_metal_describe())
        manager = _make_manager(config, ec2)

        result = self._launch(manager)

        call = ec2.run_instances_calls[0]
        # Multi-ENI launches move everything into NetworkInterfaces; the
        # top-level SubnetId/SecurityGroupIds keys must not coexist with it.
        self.assertNotIn("SubnetId", call)
        self.assertNotIn("SecurityGroupIds", call)
        self.assertIn("NetworkInterfaces", call)
        self.assertEqual(len(call["NetworkInterfaces"]), 15)
        # Every NIC carries the security group on its Groups key and a
        # card placement from the Describe-derived layout.
        for nic in call["NetworkInterfaces"]:
            self.assertEqual(nic["Groups"], ["sg-1"])
            self.assertEqual(nic["SubnetId"], "subnet-primary")
            self.assertIn("NetworkCardIndex", nic)
        self.assertEqual(result["eni_attach"]["attached"], 15)
        self.assertEqual(result["eni_attach"]["hardware_cap"], 15)

    def test_max_enis_15_on_c6in_xlarge_clamps_to_4(self):
        config = _make_config(max_enis=15, instance_type="c6in.xlarge")
        ec2 = _FakeEc2Client(describe_payload=_c6in_xlarge_describe())
        manager = _make_manager(config, ec2)

        result = self._launch(manager)

        call = ec2.run_instances_calls[0]
        self.assertEqual(len(call["NetworkInterfaces"]), 4)
        self.assertEqual(result["eni_attach"]["attached"], 4)
        self.assertEqual(result["eni_attach"]["hardware_cap"], 4)

    def test_describe_failure_falls_back_to_single_nic(self):
        """Describe denial → single-NIC launch + reason recorded."""
        config = _make_config(max_enis=15, instance_type="c6in.metal")
        ec2 = _FakeEc2Client(describe_payload=None)  # raises ClientError
        manager = _make_manager(config, ec2)

        result = self._launch(manager)

        call = ec2.run_instances_calls[0]
        self.assertNotIn("NetworkInterfaces", call)
        self.assertEqual(call.get("SubnetId"), "subnet-primary")
        self.assertIn("fallback_reason", result["eni_attach"])

    def test_max_enis_set_but_no_subnet_falls_back_to_single_nic(self):
        config = _make_config(
            max_enis=15, instance_type="c6in.metal", subnet_id=None, eni_subnet_ids=[]
        )
        ec2 = _FakeEc2Client(describe_payload=_c6in_metal_describe())
        manager = _make_manager(config, ec2)

        result = self._launch(manager)

        call = ec2.run_instances_calls[0]
        self.assertNotIn("NetworkInterfaces", call)
        self.assertIn("fallback_reason", result["eni_attach"])

    def test_eni_subnet_ids_pool_round_robins(self):
        config = _make_config(
            max_enis=4,
            instance_type="c6in.metal",
            eni_subnet_ids=["subnet-X", "subnet-Y"],
        )
        ec2 = _FakeEc2Client(describe_payload=_c6in_metal_describe())
        manager = _make_manager(config, ec2)

        self._launch(manager)

        call = ec2.run_instances_calls[0]
        subnet_sequence = [nic["SubnetId"] for nic in call["NetworkInterfaces"]]
        self.assertEqual(
            subnet_sequence, ["subnet-X", "subnet-Y", "subnet-X", "subnet-Y"]
        )


class FromEnvIntegrationTests(unittest.TestCase):
    """ManagerConfig.from_env honors ANYSCAN_MAX_ENIS / ENI_SUBNET_IDS."""

    def setUp(self) -> None:
        # Snapshot and fully restore the process environment so these tests
        # cannot leak state into each other or into other test classes.
        self._snapshot = dict(os.environ)
        # Ensure required env is set so from_env doesn't SystemExit on AMI lookup.
        os.environ["ANYSCAN_EC2_REGION"] = "us-east-1"
        os.environ["ANYSCAN_EC2_AMI_ID"] = "ami-fake"

    def tearDown(self) -> None:
        os.environ.clear()
        os.environ.update(self._snapshot)

    def test_unset_max_enis(self):
        os.environ.pop("ANYSCAN_MAX_ENIS", None)
        cfg = m.ManagerConfig.from_env()
        self.assertIsNone(cfg.max_enis)
        self.assertEqual(cfg.eni_subnet_ids, [])

    def test_set_max_enis(self):
        os.environ["ANYSCAN_MAX_ENIS"] = "15"
        # Whitespace around the comma must be tolerated by the parser.
        os.environ["ANYSCAN_EC2_ENI_SUBNET_IDS"] = "subnet-A, subnet-B"
        cfg = m.ManagerConfig.from_env()
        self.assertEqual(cfg.max_enis, 15)
        self.assertEqual(cfg.eni_subnet_ids, ["subnet-A", "subnet-B"])

    def test_invalid_max_enis_exits(self):
        os.environ["ANYSCAN_MAX_ENIS"] = "0"
        with self.assertRaises(SystemExit):
            m.ManagerConfig.from_env()


if __name__ == "__main__":
    unittest.main()