diff --git a/anyscan_rate_controller.py b/anyscan_rate_controller.py index 9a8acaa..3766ea4 100644 --- a/anyscan_rate_controller.py +++ b/anyscan_rate_controller.py @@ -21,6 +21,7 @@ import json import os +import re import subprocess import sys import threading @@ -39,9 +40,34 @@ DEFAULT_ACHIEVED_RATIO_FLOOR = 0.9 DEFAULT_CALIBRATION_PATH = Path("/var/lib/agentd/rate-calibration.json") DEFAULT_TERMINATE_GRACE_SECONDS = 5 +# Loadavg / vcpu ratio above which we treat the host as CPU-saturated. +# A 1-min loadavg per vCPU >= 0.8 means the run-queue is sized close to +# the available CPUs; combined with a heartbeat-jitter signal that means +# our process is starved, not the NIC. +DEFAULT_CPU_LOAD_THRESHOLD = 0.8 +# Below this drop ratio we treat the few packets the kernel dropped as +# noise rather than evidence of NIC saturation, so when CPU pressure is +# also present we attribute the slip to CPU. 0.001 = 0.1% of tx_packets. +DEFAULT_DROP_RATIO_THRESHOLD = 0.001 +# Default DMI path Linux uses to expose the system product name. On AWS +# bare-metal hosts (c6in.metal et al) this often contains the actual +# instance type; on EC2 VMs it usually contains "Amazon EC2" and we have +# to fall back to IMDS for the type. +DEFAULT_DMI_PRODUCT_PATH = "/sys/devices/virtual/dmi/id/product_name" +DEFAULT_IMDS_TIMEOUT_SECONDS = 1.0 CLEAN = "clean" -SLIP = "slip" +# Network-side slip: kernel TX queue overflow, rate-limit overhead, or +# under-achieved rate without a CPU explanation. Response: shrink rate. +SLIP_NETWORK = "slip_network" +# Host-CPU starvation: heartbeat slip co-occurs with high loadavg/vcpu. +# Response: leave the rate alone — shrinking it does not free CPU and +# wastes the headroom we already converged to. +SLIP_CPU = "slip_cpu" +# Backwards-compatibility alias. The pre-PR controller only emitted a +# single SLIP value (semantically equivalent to SLIP_NETWORK), and +# external callers + tests still compare against it. +SLIP = SLIP_NETWORK @dataclass(frozen=True) @@ -55,6 +81,8 @@ class AimdPolicy: achieved_ratio_floor: float = DEFAULT_ACHIEVED_RATIO_FLOOR heartbeat_latency_threshold_ms: int = DEFAULT_HEARTBEAT_LATENCY_THRESHOLD_MS window_seconds: int = DEFAULT_WINDOW_SECONDS + cpu_load_threshold: float = DEFAULT_CPU_LOAD_THRESHOLD + drop_ratio_threshold: float = DEFAULT_DROP_RATIO_THRESHOLD def __post_init__(self) -> None: if self.floor <= 0: @@ -71,6 +99,10 @@ def __post_init__(self) -> None: raise ValueError("heartbeat_latency_threshold_ms must be > 0") if self.window_seconds <= 0: raise ValueError("window_seconds must be > 0") + if self.cpu_load_threshold <= 0: + raise ValueError("cpu_load_threshold must be > 0") + if not 0.0 <= self.drop_ratio_threshold <= 1.0: + raise ValueError("drop_ratio_threshold must be in [0, 1]") @dataclass(frozen=True) @@ -108,44 +140,89 @@ def __init__(self, exit_code: int, *, window_index: int, set_rate: int) -> None: def classify_window( measurement: WindowMeasurement, policy: AimdPolicy, + *, + system_load: Optional["SystemLoad"] = None, ) -> str: - """Return ``CLEAN`` or ``SLIP`` for the supplied window measurement. + """Return ``CLEAN``, ``SLIP_NETWORK``, or ``SLIP_CPU`` for a window. A window is clean iff the kernel is not dropping packets, the host can still service its own scheduler (no heartbeat slip), and the scanner is actually consuming the rate budget we set. The achieved-rate floor is skipped when the scanner finished naturally inside the window because that means it ran out of targets, not throttle. + + When ``system_load`` is supplied we further distinguish CPU-caused + slips (heartbeat slip co-occurring with a saturated run-queue) from + network-caused slips (kernel TX drops or rate-limit overhead). On CPU + slips the controller leaves the rate alone — halving it would not + free any CPU and would just waste the headroom we already learned. + When ``system_load`` is None we keep the legacy "any slip is network" + behavior so existing call sites and bundles without the loadavg + reader keep their pre-improvement semantics. """ - if measurement.tx_dropped_delta > 0: - return SLIP - if measurement.heartbeat_max_latency_ms > policy.heartbeat_latency_threshold_ms: - return SLIP - if measurement.scanner_finished_naturally: + network_drops = measurement.tx_dropped_delta > 0 + rate_starved = ( + not measurement.scanner_finished_naturally + and measurement.achieved_pps + 1e-9 + < policy.achieved_ratio_floor * float(measurement.set_rate) + ) + heartbeat_slip = ( + measurement.heartbeat_max_latency_ms > policy.heartbeat_latency_threshold_ms + ) + + if system_load is None: + if network_drops or heartbeat_slip or rate_starved: + return SLIP_NETWORK return CLEAN - threshold = policy.achieved_ratio_floor * float(measurement.set_rate) - if measurement.achieved_pps + 1e-9 < threshold: - return SLIP - return CLEAN + + cpu_saturated = system_load.load_average_per_vcpu > policy.cpu_load_threshold + cpu_pressure = cpu_saturated and heartbeat_slip + network_pressure = network_drops or rate_starved or (heartbeat_slip and not cpu_saturated) + + if not cpu_pressure and not network_pressure: + return CLEAN + if cpu_pressure and not network_pressure: + return SLIP_CPU + if network_pressure and not cpu_pressure: + return SLIP_NETWORK + + # Both signals firing — pick the dominant cause. A non-trivial drop + # ratio means the NIC is genuinely overrun (shrink rate); pure + # rate-starvation with no drops on a saturated host points at CPU + # contention starving the scanner threads, not the wire. + drop_ratio = ( + measurement.tx_dropped_delta / measurement.tx_packets_delta + if measurement.tx_packets_delta > 0 + else 0.0 + ) + if drop_ratio > policy.drop_ratio_threshold: + return SLIP_NETWORK + return SLIP_CPU def compute_next_rate( policy: AimdPolicy, current_rate: int, measurement: WindowMeasurement, + *, + system_load: Optional["SystemLoad"] = None, ) -> int: """Return the rate to use for the next window. - Clean -> additive bump capped at ceiling. Slip -> multiplicative shrink - floored at policy.floor. Clamped on both sides regardless of the - starting point so a misconfigured ``current_rate`` cannot escape the - bounds. + Clean -> additive bump capped at ceiling. SLIP_NETWORK -> multiplicative + shrink floored at policy.floor. SLIP_CPU -> rate is held; the right + response is to shed subprocess concurrency (handled by the multi-NIC + parent), not to crater the rate we already learned. Clamped on both + sides regardless of the starting point so a misconfigured + ``current_rate`` cannot escape the bounds. """ - classification = classify_window(measurement, policy) + classification = classify_window(measurement, policy, system_load=system_load) if classification == CLEAN: proposed = current_rate + policy.additive_step + elif classification == SLIP_CPU: + proposed = current_rate else: proposed = int(current_rate * policy.multiplicative_factor) return clamp_rate(proposed, policy) @@ -350,6 +427,254 @@ def _read_counter(self, name: str) -> int: return 0 +# --------------------------------------------------------------------------- +# System load (for CPU-vs-network slip distinction) +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class SystemLoad: + """Snapshot of the host's run-queue pressure relative to its vCPU count. + + Sampled once per AIMD window so :func:`classify_window` can tell a + CPU-starved process apart from a NIC-saturated one. Both can produce + heartbeat slip; only the network case responds to shrinking the rate. + """ + + load_average_1min: float + vcpu_count: int + + @property + def load_average_per_vcpu(self) -> float: + if self.vcpu_count <= 0: + return self.load_average_1min + return self.load_average_1min / float(self.vcpu_count) + + +class SystemLoadReader: + """Reads /proc/loadavg and reports loadavg-per-vcpu. + + The vCPU count is captured once at construction time (default + ``os.cpu_count()``) because the topology is stable for the lifetime + of a worker process; the loadavg field is re-read on every + :meth:`read` call so the controller observes contention on a + per-window basis. Tests inject ``loadavg_path`` and ``vcpu_count`` + so they don't depend on the host's actual /proc. + """ + + def __init__( + self, + *, + loadavg_path: os.PathLike[str] | str = "/proc/loadavg", + vcpu_count: Optional[int] = None, + ) -> None: + self._loadavg_path = Path(loadavg_path) + self._vcpu_count = ( + vcpu_count if vcpu_count is not None and vcpu_count > 0 else max(1, os.cpu_count() or 1) + ) + + @property + def vcpu_count(self) -> int: + return self._vcpu_count + + def read(self) -> SystemLoad: + try: + raw = self._loadavg_path.read_text().strip() + except OSError: + return SystemLoad(0.0, self._vcpu_count) + parts = raw.split() + if not parts: + return SystemLoad(0.0, self._vcpu_count) + try: + load = float(parts[0]) + except ValueError: + return SystemLoad(0.0, self._vcpu_count) + if load < 0: + load = 0.0 + return SystemLoad(load, self._vcpu_count) + + +# --------------------------------------------------------------------------- +# Per-instance defaults +# +# Different AWS instance classes have wildly different natural pps ceilings +# — c6in.xlarge tops out around 1.7M, c6in.metal delivers 12M+ at 1-NIC +# (anygpt-4 bench data). Starting every host at the conservative 500k seed +# means c6in.metal wastes 4-6 windows ramping; pinning the seed and +# ceiling per-class skips that ramp and lets the controller settle into +# the correct band on window 1. +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class InstanceDefaults: + """Per-class starting rate and AIMD bounds. + + ``starting_rate`` seeds the controller when there is no per-interface + calibration to reuse. ``floor`` and ``ceiling`` widen / clamp the + AIMD band so the controller can both ramp up to the host's natural + ceiling and shrink down to a useful floor on slip. + """ + + starting_rate: int + floor: int + ceiling: int + + +# Conservative table; ceilings are kernel-TX bounds we have measured (or +# estimated linearly within a class), not ENA spec maxima. Anything not +# listed falls through to DEFAULT_FLOOR/DEFAULT_CEILING and the +# operator-supplied SCANNER_DEFAULT_RATE. +INSTANCE_TYPE_DEFAULTS: dict[str, InstanceDefaults] = { + "m5.xlarge": InstanceDefaults(200_000, 100_000, 1_000_000), + "m5.2xlarge": InstanceDefaults(400_000, 100_000, 1_500_000), + "c6in.xlarge": InstanceDefaults(500_000, 100_000, 2_000_000), + "c6in.2xlarge": InstanceDefaults(1_000_000, 100_000, 4_000_000), + "c6in.4xlarge": InstanceDefaults(1_500_000, 200_000, 6_000_000), + "c6in.8xlarge": InstanceDefaults(3_000_000, 500_000, 8_000_000), + "c6in.16xlarge": InstanceDefaults(3_500_000, 500_000, 10_000_000), + "c6in.32xlarge": InstanceDefaults(4_000_000, 1_000_000, 12_000_000), + "c6in.metal": InstanceDefaults(4_000_000, 1_000_000, 12_000_000), +} + + +_INSTANCE_TYPE_RE = re.compile(r"^[a-z0-9]+\.[a-z0-9]+$") + + +def _looks_like_instance_type(value: str) -> bool: + return bool(_INSTANCE_TYPE_RE.match(value)) + + +def detect_instance_type( + *, + env: Optional[Mapping[str, str]] = None, + dmi_path: os.PathLike[str] | str = DEFAULT_DMI_PRODUCT_PATH, + imds_fetcher: Optional[Callable[[], Optional[str]]] = None, +) -> Optional[str]: + """Best-effort detection of the EC2 instance type running this worker. + + Resolution order, first hit wins: + 1. ``ANYSCAN_INSTANCE_TYPE`` from ``env`` — explicit override for + tests, dev runs, and to skip IMDS round-trips on subsequent + child processes (the parent caches its detection there). + 2. ``/sys/devices/virtual/dmi/id/product_name``: on AWS bare-metal + hosts this often exposes the actual instance type. On VMs it + is usually ``Amazon EC2`` and we fall through. + 3. ``imds_fetcher()`` (defaults to a real IMDSv2 round-trip): the + authoritative source for VM instance types. Tightly bounded + timeout so a missing IMDS does not delay scanner startup. + + Returns ``None`` when no source produces a recognizable type. The + fetcher hook exists so unit tests do not have to monkey-patch + urllib. + """ + + env = env if env is not None else os.environ + explicit = env.get("ANYSCAN_INSTANCE_TYPE") + if explicit and explicit.strip(): + candidate = explicit.strip() + if _looks_like_instance_type(candidate): + return candidate + + try: + product = Path(dmi_path).read_text().strip() + except OSError: + product = "" + if product and _looks_like_instance_type(product): + return product + + fetcher = imds_fetcher if imds_fetcher is not None else _default_imds_fetcher + try: + fetched = fetcher() + except Exception: # noqa: BLE001 - best-effort detection + fetched = None + if fetched and _looks_like_instance_type(fetched.strip()): + return fetched.strip() + return None + + +def _default_imds_fetcher() -> Optional[str]: + """Real-world IMDSv2 round-trip used when no test fetcher is supplied.""" + + import urllib.error + import urllib.request + + timeout = DEFAULT_IMDS_TIMEOUT_SECONDS + try: + token_req = urllib.request.Request( + "http://169.254.169.254/latest/api/token", + method="PUT", + headers={"X-aws-ec2-metadata-token-ttl-seconds": "60"}, + ) + with urllib.request.urlopen(token_req, timeout=timeout) as resp: + token = resp.read().decode("ascii", errors="replace").strip() + except (urllib.error.URLError, OSError, ValueError): + return None + try: + type_req = urllib.request.Request( + "http://169.254.169.254/latest/meta-data/instance-type", + headers={"X-aws-ec2-metadata-token": token}, + ) + with urllib.request.urlopen(type_req, timeout=timeout) as resp: + return resp.read().decode("ascii", errors="replace").strip() + except (urllib.error.URLError, OSError, ValueError): + return None + + +def lookup_instance_defaults(instance_type: Optional[str]) -> Optional[InstanceDefaults]: + if not instance_type: + return None + return INSTANCE_TYPE_DEFAULTS.get(instance_type.strip()) + + +def apply_instance_defaults( + *, + policy: AimdPolicy, + fallback_rate: int, + instance_type: Optional[str], + env: Optional[Mapping[str, str]] = None, +) -> tuple[AimdPolicy, int]: + """Layer the per-instance defaults under any explicit env overrides. + + The contract is: env-supplied knobs always win. We only fill in floor, + ceiling, and starting (fallback) rate when the corresponding env var + is missing. This means an operator who pins ANYSCAN_RATE_CEILING by + hand still gets exactly that value on c6in.metal, while a stock host + silently picks up the table's c6in.metal ceiling. + """ + + defaults = lookup_instance_defaults(instance_type) + if defaults is None: + return policy, fallback_rate + + env = env if env is not None else os.environ + new_floor = ( + defaults.floor + if not _env_value_present(env, "ANYSCAN_RATE_FLOOR") + else policy.floor + ) + new_ceiling = ( + defaults.ceiling + if not _env_value_present(env, "ANYSCAN_RATE_CEILING") + else policy.ceiling + ) + if new_ceiling < new_floor: + new_ceiling = new_floor + + new_policy = replace(policy, floor=new_floor, ceiling=new_ceiling) + + if _env_value_present(env, "SCANNER_DEFAULT_RATE"): + new_starting = fallback_rate + else: + new_starting = defaults.starting_rate + return new_policy, new_starting + + +def _env_value_present(env: Mapping[str, str], key: str) -> bool: + raw = env.get(key) + return raw is not None and raw.strip() != "" + + # --------------------------------------------------------------------------- # Telemetry # --------------------------------------------------------------------------- @@ -396,6 +721,12 @@ class ControllerOptions: calibration: Optional[RateCalibrationStore] persist_on_clean: bool = True terminate_grace_seconds: float = DEFAULT_TERMINATE_GRACE_SECONDS + # Optional CPU-pressure source. Sampled once per window and passed + # into ``classify_window`` so the controller can hold rate when the + # slip is CPU-caused. When ``None`` (e.g. older bundles or test + # harnesses) the controller falls back to the legacy + # any-slip-is-network classification. + system_load_reader: Optional["SystemLoadReader"] = None class WindowRunner: @@ -436,43 +767,52 @@ def run(self) -> list[WindowReport]: rate = clamp_rate(self._options.starting_rate, self._options.policy) reports: list[WindowReport] = [] max_clean_rate = 0 + last_persisted_rate = 0 idx = 0 - while True: - idx += 1 - if self._max_windows is not None and idx > self._max_windows: - break - measurement = self._runner.run( - rate=rate, - window_seconds=self._options.window_seconds, - is_first_window=idx == 1, - ) - # Distinguish "scanner finished and exited cleanly" from - # "scanner crashed mid-window". The former is success and stops - # the loop; the latter is a hard failure that must propagate so - # we don't quietly respawn into the same broken state. - if ( - not measurement.scanner_finished_naturally - and measurement.scanner_exit_code != 0 - and measurement.elapsed_seconds < self._options.window_seconds - ): - raise ScannerWindowError( - measurement.scanner_exit_code, - window_index=idx, + try: + while True: + idx += 1 + if self._max_windows is not None and idx > self._max_windows: + break + measurement = self._runner.run( + rate=rate, + window_seconds=self._options.window_seconds, + is_first_window=idx == 1, + ) + # Distinguish "scanner finished and exited cleanly" from + # "scanner crashed mid-window". The former is success and stops + # the loop; the latter is a hard failure that must propagate so + # we don't quietly respawn into the same broken state. + if ( + not measurement.scanner_finished_naturally + and measurement.scanner_exit_code != 0 + and measurement.elapsed_seconds < self._options.window_seconds + ): + raise ScannerWindowError( + measurement.scanner_exit_code, + window_index=idx, + set_rate=rate, + ) + system_load = ( + self._options.system_load_reader.read() + if self._options.system_load_reader is not None + else None + ) + classification = classify_window( + measurement, self._options.policy, system_load=system_load + ) + next_rate = compute_next_rate( + self._options.policy, rate, measurement, system_load=system_load + ) + report = WindowReport( + index=idx, set_rate=rate, + measurement=measurement, + classification=classification, + next_rate=next_rate, ) - classification = classify_window(measurement, self._options.policy) - next_rate = compute_next_rate(self._options.policy, rate, measurement) - report = WindowReport( - index=idx, - set_rate=rate, - measurement=measurement, - classification=classification, - next_rate=next_rate, - ) - reports.append(report) - emit_metric( - "rate_adjustment", - { + reports.append(report) + metric_payload: dict[str, object] = { "window": idx, "set_rate": rate, "achieved_pps": int(measurement.achieved_pps), @@ -482,30 +822,62 @@ def run(self) -> list[WindowReport]: "classification": classification, "next_rate": next_rate, "interface": self._options.interface, - }, - sink=self._log_sink, - ) - if classification == CLEAN and rate > max_clean_rate: - max_clean_rate = rate - if measurement.scanner_finished_naturally: - break - rate = next_rate + } + if system_load is not None: + metric_payload["loadavg_per_vcpu"] = round( + system_load.load_average_per_vcpu, 3 + ) + metric_payload["vcpu_count"] = system_load.vcpu_count + emit_metric("rate_adjustment", metric_payload, sink=self._log_sink) + if classification == CLEAN and rate > max_clean_rate: + max_clean_rate = rate + # Persist on every clean window where we set a new + # high-water mark. PR #58 only wrote at end-of-run, so + # a scanner crash mid-loop dropped the calibration on + # the floor; this guarantees the learned rate + # survives even partial windows. + last_persisted_rate = self._maybe_persist_calibration( + max_clean_rate, last_persisted_rate + ) + if measurement.scanner_finished_naturally: + break + rate = next_rate + finally: + # Terminal persist regardless of how the loop exited (natural + # finish, max_windows cap, ScannerWindowError). Idempotent + # against the per-window writes above so we don't spam the + # store on a clean exit. + if max_clean_rate > 0: + self._maybe_persist_calibration(max_clean_rate, last_persisted_rate) + return reports + + def _maybe_persist_calibration( + self, learned_rate: int, last_persisted_rate: int + ) -> int: + """Best-effort write of ``learned_rate`` to the calibration store. - if ( + Returns the rate now reflected on disk (either the new write or + ``last_persisted_rate`` when the write was a no-op or failed). + Never raises; persistence is always best-effort. + """ + + if not ( self._options.persist_on_clean and self._options.calibration is not None and self._options.interface - and max_clean_rate > 0 + and learned_rate > last_persisted_rate ): - try: - self._options.calibration.store(self._options.interface, max_clean_rate) - except Exception as error: # noqa: BLE001 - best-effort persistence - emit_metric( - "calibration_persist_failed", - {"interface": self._options.interface, "error": str(error)}, - sink=self._log_sink, - ) - return reports + return last_persisted_rate + try: + self._options.calibration.store(self._options.interface, learned_rate) + return learned_rate + except Exception as error: # noqa: BLE001 - best-effort persistence + emit_metric( + "calibration_persist_failed", + {"interface": self._options.interface, "error": str(error)}, + sink=self._log_sink, + ) + return last_persisted_rate # --------------------------------------------------------------------------- @@ -637,6 +1009,16 @@ def policy_from_env(env: Mapping[str, str]) -> AimdPolicy: DEFAULT_HEARTBEAT_LATENCY_THRESHOLD_MS, ), window_seconds=_env_int(env, "ANYSCAN_RATE_WINDOW_SECONDS", DEFAULT_WINDOW_SECONDS), + cpu_load_threshold=_env_float( + env, + "ANYSCAN_CPU_LOAD_THRESHOLD", + DEFAULT_CPU_LOAD_THRESHOLD, + ), + drop_ratio_threshold=_env_float( + env, + "ANYSCAN_DROP_RATIO_THRESHOLD", + DEFAULT_DROP_RATIO_THRESHOLD, + ), ) @@ -695,6 +1077,13 @@ def resolve_starting_rate( "JitterMonitor", "NicStatsReader", "NicCounters", + "SystemLoad", + "SystemLoadReader", + "InstanceDefaults", + "INSTANCE_TYPE_DEFAULTS", + "detect_instance_type", + "lookup_instance_defaults", + "apply_instance_defaults", "SubprocessWindowRunner", "WindowRunner", "compute_next_rate", @@ -712,6 +1101,11 @@ def resolve_starting_rate( "DEFAULT_HEARTBEAT_LATENCY_THRESHOLD_MS", "DEFAULT_ACHIEVED_RATIO_FLOOR", "DEFAULT_CALIBRATION_PATH", + "DEFAULT_CPU_LOAD_THRESHOLD", + "DEFAULT_DROP_RATIO_THRESHOLD", + "DEFAULT_DMI_PRODUCT_PATH", "CLEAN", "SLIP", + "SLIP_NETWORK", + "SLIP_CPU", ] diff --git a/install-worker-bundle.sh b/install-worker-bundle.sh index da98675..d36cc8a 100755 --- a/install-worker-bundle.sh +++ b/install-worker-bundle.sh @@ -315,6 +315,14 @@ apply_host_resource_defaults() { && [ "$scanner_eni_candidates" != "${scanner_eni_candidates%,*}" ]; then upsert_env_value "ANYSCAN_SCANNER_INTERFACES" "$scanner_eni_candidates" "$RUNTIME_ENV_FILE" fi + # Subprocess concurrency cap for the multi-NIC parent. anygpt-4 + # c6in.metal bench data: 4-NIC sustained 12.8M aggregate pps; + # 8-NIC regressed to 1.3M because shards 5-8 CPU-starved the + # others. 4 is the kernel TX sweet spot regardless of NIC count. + # Operators can raise/lower per host class after install. + if [ -z "$(env_value "ANYSCAN_RATE_MAX_CONCURRENT_SUBPROCESSES" "$RUNTIME_ENV_FILE" || true)" ]; then + upsert_env_value "ANYSCAN_RATE_MAX_CONCURRENT_SUBPROCESSES" "4" "$RUNTIME_ENV_FILE" + fi fi # Defaults for the egress bandwidth reservation that ExecStartPre installs. diff --git a/runtime.worker.env.template b/runtime.worker.env.template index 0095825..7f285ba 100644 --- a/runtime.worker.env.template +++ b/runtime.worker.env.template @@ -65,6 +65,16 @@ POLL_INTERVAL_SECONDS=15 # in the installer environment to suppress auto-discovery. # ANYSCAN_SCANNER_INTERFACES=eth0,eth1,eth2,eth3,eth4,eth5,eth6,eth7 +# Cap on simultaneously active shard subprocesses spawned by the multi-NIC +# parent. anygpt-4 c6in.metal data: 4-NIC sustained 12.8M pps aggregate, +# 8-NIC regressed to 1.3M because shards 5-8 CPU-starved the others into +# AIMD-cratering on every window. The cap truncates ANYSCAN_SCANNER_INTERFACES +# to its first N entries so the adapter only fans out to N NICs even when +# more are attached; the remaining NICs sit idle for the scan but the +# kernel TX sweet spot is preserved. Set 0 or negative to disable the cap +# (legacy unbounded fan-out, NOT recommended on 8+ NIC hosts). +# ANYSCAN_RATE_MAX_CONCURRENT_SUBPROCESSES=4 + # Dynamic AIMD port-scan rate adjustment. The adapter respawns the bundled # scanner per window via its existing checkpoint+resume model, measures # achieved pps from /sys/class/net/$IFACE/statistics/tx_packets and @@ -81,6 +91,26 @@ POLL_INTERVAL_SECONDS=15 # ANYSCAN_RATE_ACHIEVED_RATIO_FLOOR=0.9 # achieved/set must clear this for clean # ANYSCAN_HEARTBEAT_LATENCY_THRESHOLD_MS=5000 # ANYSCAN_RATE_CALIBRATION_PATH=/var/lib/agentd/rate-calibration.json +# CPU-vs-network slip distinction. Heartbeat slip is ambiguous between +# CPU starvation (host pegged, scanner thread didn't get scheduled) and +# kernel TX overrun (NIC saturated). When loadavg/vcpu exceeds the +# threshold AND heartbeat slips, we attribute the slip to CPU and +# *hold* the rate instead of halving it — shrinking the rate doesn't +# free CPU and just wastes the headroom we already learned. The cap on +# concurrent shards (above) is the response that actually frees CPU. +# Both knobs default to values measured to keep us in the right +# classification at c6in.metal-class load profiles. +# ANYSCAN_CPU_LOAD_THRESHOLD=0.8 # loadavg/vcpu beyond which "CPU pressure" +# ANYSCAN_DROP_RATIO_THRESHOLD=0.001 # drop_ratio above this picks network as dominant +# Per-instance starting-rate / floor / ceiling table. Detected from +# /sys/devices/virtual/dmi/id/product_name first; on EC2 VMs that +# usually returns "Amazon EC2" so the adapter falls back to IMDSv2 +# (http://169.254.169.254/latest/meta-data/instance-type) with a +# 1s timeout. Set ANYSCAN_INSTANCE_TYPE to skip detection — the +# multi-NIC parent does this automatically so children inherit the +# resolved type without redoing IMDS. Operator overrides (any +# ANYSCAN_RATE_* knob set explicitly) always win over the table. +# ANYSCAN_INSTANCE_TYPE=c6in.metal # Egress bandwidth reservation (ExecStartPre runs reserve-control-bandwidth.sh). # Reserves a guaranteed slice of the NIC for control-plane traffic so a busy diff --git a/test_anyscan_rate_controller.py b/test_anyscan_rate_controller.py index 2fed5a9..f0e3631 100644 --- a/test_anyscan_rate_controller.py +++ b/test_anyscan_rate_controller.py @@ -453,5 +453,436 @@ def test_resolve_starting_rate_clamps_calibration_into_band(self) -> None: self.assertEqual(resolved, 4_000_000) +class CpuVsNetworkSlipTests(unittest.TestCase): + """Verify CPU-pressure scenarios stop cratering the rate (improvement #1).""" + + def setUp(self) -> None: + self.policy = rc.AimdPolicy() + + def _load(self, *, load: float, vcpu: int = 8) -> rc.SystemLoad: + return rc.SystemLoad(load_average_1min=load, vcpu_count=vcpu) + + def test_cpu_pressure_holds_rate_instead_of_halving(self) -> None: + # 8 vCPUs, load 8.5 (load/vcpu=1.06 > 0.8) AND heartbeat slip, + # tx_dropped=0, achieved == set: pure CPU starvation. + measurement = make_measurement( + set_rate=4_000_000, + achieved_pps=3_900_000, + tx_dropped_delta=0, + heartbeat_max_latency_ms=6_000, + ) + load = self._load(load=8.5, vcpu=8) + self.assertEqual( + rc.classify_window(measurement, self.policy, system_load=load), + rc.SLIP_CPU, + ) + self.assertEqual( + rc.compute_next_rate(self.policy, 4_000_000, measurement, system_load=load), + 4_000_000, # held, not halved + ) + + def test_network_pressure_still_halves_rate_with_low_load(self) -> None: + # tx_dropped > 0, load low (no CPU pressure): pure network slip. + measurement = make_measurement( + set_rate=4_000_000, + achieved_pps=3_900_000, + tx_dropped_delta=10_000, + heartbeat_max_latency_ms=200, + ) + load = self._load(load=2.0, vcpu=8) # 0.25 per vcpu, well under threshold + self.assertEqual( + rc.classify_window(measurement, self.policy, system_load=load), + rc.SLIP_NETWORK, + ) + self.assertEqual( + rc.compute_next_rate(self.policy, 4_000_000, measurement, system_load=load), + 2_000_000, # halved + ) + + def test_both_pressure_picks_dominant_via_drop_ratio(self) -> None: + # Significant drop ratio: NIC genuinely overrun, network is dominant + # cause even though CPU is also pegged. Halve. + big_drops = make_measurement( + set_rate=4_000_000, + achieved_pps=3_900_000, + tx_dropped_delta=200_000, # 200k of ~117M packets — 0.17% > 0.1% threshold + heartbeat_max_latency_ms=6_000, + ) + load = self._load(load=8.5, vcpu=8) + self.assertEqual( + rc.classify_window(big_drops, self.policy, system_load=load), + rc.SLIP_NETWORK, + ) + # Trivial drop ratio: drops are noise from the AIMD probe; CPU is + # actually the dominant cause. Hold rate. + tiny_drops = make_measurement( + set_rate=4_000_000, + achieved_pps=3_900_000, + tx_dropped_delta=10, # 10 of 117M = 8.5e-8 << threshold + heartbeat_max_latency_ms=6_000, + ) + self.assertEqual( + rc.classify_window(tiny_drops, self.policy, system_load=load), + rc.SLIP_CPU, + ) + + def test_legacy_classify_when_no_system_load_supplied(self) -> None: + # Drops + heartbeat, no system_load -> legacy SLIP/SLIP_NETWORK. + # Ensures bundles without the loadavg reader keep their pre-PR + # behavior. Asserted both via the constant alias and the new name. + measurement = make_measurement( + set_rate=2_000_000, + achieved_pps=1_900_000, + tx_dropped_delta=42, + ) + self.assertEqual(rc.classify_window(measurement, self.policy), rc.SLIP) + self.assertEqual(rc.classify_window(measurement, self.policy), rc.SLIP_NETWORK) + self.assertEqual( + rc.compute_next_rate(self.policy, 2_000_000, measurement), + 1_000_000, + ) + + def test_low_load_with_heartbeat_jitter_classified_network(self) -> None: + # Load is low but heartbeat slipped — unusual, but legacy behavior + # treats this as network-style slip (rate-side response). Preserve. + measurement = make_measurement( + set_rate=2_000_000, + achieved_pps=1_900_000, + heartbeat_max_latency_ms=6_000, + ) + load = self._load(load=1.0, vcpu=8) # 0.125 per vcpu + self.assertEqual( + rc.classify_window(measurement, self.policy, system_load=load), + rc.SLIP_NETWORK, + ) + + def test_drops_with_low_load_classified_network(self) -> None: + # Drops with no CPU pressure: classic NIC saturation, halve. + measurement = make_measurement( + set_rate=2_000_000, + achieved_pps=1_950_000, + tx_dropped_delta=500, + heartbeat_max_latency_ms=200, + ) + load = self._load(load=1.0, vcpu=8) + self.assertEqual( + rc.classify_window(measurement, self.policy, system_load=load), + rc.SLIP_NETWORK, + ) + + def test_clean_window_with_load_info_still_clean(self) -> None: + # No drops, no jitter, achieved == set, even on a busy box: clean. + measurement = make_measurement(set_rate=1_000_000, achieved_pps=970_000) + load = self._load(load=10.0, vcpu=8) # busy but not slipping + self.assertEqual( + rc.classify_window(measurement, self.policy, system_load=load), + rc.CLEAN, + ) + + def test_invalid_policy_rejects_bad_thresholds(self) -> None: + with self.assertRaises(ValueError): + rc.AimdPolicy(cpu_load_threshold=0.0) + with self.assertRaises(ValueError): + rc.AimdPolicy(drop_ratio_threshold=-0.1) + with self.assertRaises(ValueError): + rc.AimdPolicy(drop_ratio_threshold=1.5) + + +class SystemLoadReaderTests(unittest.TestCase): + def test_reads_one_minute_load_from_synthetic_proc(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + path = Path(tmpdir) / "loadavg" + path.write_text("8.50 6.10 5.00 1/3000 1234\n") + reader = rc.SystemLoadReader(loadavg_path=path, vcpu_count=8) + load = reader.read() + self.assertAlmostEqual(load.load_average_1min, 8.5) + self.assertEqual(load.vcpu_count, 8) + self.assertAlmostEqual(load.load_average_per_vcpu, 8.5 / 8) + + def test_missing_proc_returns_zero_load(self) -> None: + reader = rc.SystemLoadReader(loadavg_path="/nonexistent/loadavg", vcpu_count=4) + load = reader.read() + self.assertEqual(load.load_average_1min, 0.0) + self.assertEqual(load.vcpu_count, 4) + + def test_corrupt_proc_returns_zero_load(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + path = Path(tmpdir) / "loadavg" + path.write_text("not-a-number\n") + reader = rc.SystemLoadReader(loadavg_path=path, vcpu_count=2) + self.assertEqual(reader.read().load_average_1min, 0.0) + + def test_per_vcpu_handles_zero_vcpu_gracefully(self) -> None: + # Defense-in-depth: vcpu_count=0 must not divide-by-zero. + load = rc.SystemLoad(load_average_1min=4.0, vcpu_count=0) + self.assertEqual(load.load_average_per_vcpu, 4.0) + + +class InstanceDefaultsTests(unittest.TestCase): + def test_lookup_known_classes(self) -> None: + metal = rc.lookup_instance_defaults("c6in.metal") + self.assertEqual(metal.starting_rate, 4_000_000) + self.assertEqual(metal.ceiling, 12_000_000) + small = rc.lookup_instance_defaults("c6in.xlarge") + self.assertEqual(small.starting_rate, 500_000) + self.assertEqual(small.ceiling, 2_000_000) + m5 = rc.lookup_instance_defaults("m5.xlarge") + self.assertEqual(m5.starting_rate, 200_000) + + def test_lookup_unknown_returns_none(self) -> None: + self.assertIsNone(rc.lookup_instance_defaults("alien.42xlarge")) + self.assertIsNone(rc.lookup_instance_defaults("")) + self.assertIsNone(rc.lookup_instance_defaults(None)) + + def test_apply_fills_unset_knobs_for_metal(self) -> None: + policy = rc.AimdPolicy() # defaults + new_policy, new_starting = rc.apply_instance_defaults( + policy=policy, + fallback_rate=500_000, + instance_type="c6in.metal", + env={}, + ) + self.assertEqual(new_policy.floor, 1_000_000) + self.assertEqual(new_policy.ceiling, 12_000_000) + self.assertEqual(new_starting, 4_000_000) + + def test_apply_preserves_explicit_env_overrides(self) -> None: + policy = rc.policy_from_env( + { + "ANYSCAN_RATE_FLOOR": "200000", + "ANYSCAN_RATE_CEILING": "5000000", + } + ) + new_policy, new_starting = rc.apply_instance_defaults( + policy=policy, + fallback_rate=750_000, + instance_type="c6in.metal", + env={ + "ANYSCAN_RATE_FLOOR": "200000", + "ANYSCAN_RATE_CEILING": "5000000", + "SCANNER_DEFAULT_RATE": "750000", + }, + ) + self.assertEqual(new_policy.floor, 200_000) + self.assertEqual(new_policy.ceiling, 5_000_000) + self.assertEqual(new_starting, 750_000) + + def test_apply_partial_overrides_only_fills_missing(self) -> None: + # Operator pinned floor but not ceiling: floor stays at env value, + # ceiling gets the metal default. + policy = rc.policy_from_env({"ANYSCAN_RATE_FLOOR": "300000"}) + new_policy, new_starting = rc.apply_instance_defaults( + policy=policy, + fallback_rate=500_000, + instance_type="c6in.metal", + env={"ANYSCAN_RATE_FLOOR": "300000"}, + ) + self.assertEqual(new_policy.floor, 300_000) + self.assertEqual(new_policy.ceiling, 12_000_000) + self.assertEqual(new_starting, 4_000_000) + + def test_apply_unknown_instance_type_is_no_op(self) -> None: + policy = rc.AimdPolicy() + new_policy, new_starting = rc.apply_instance_defaults( + policy=policy, + fallback_rate=500_000, + instance_type=None, + env={}, + ) + self.assertEqual(new_policy, policy) + self.assertEqual(new_starting, 500_000) + + def test_detect_via_dmi_when_product_name_is_real(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + dmi = Path(tmpdir) / "product_name" + dmi.write_text("c6in.metal\n") + detected = rc.detect_instance_type( + env={}, + dmi_path=dmi, + imds_fetcher=lambda: None, + ) + self.assertEqual(detected, "c6in.metal") + + def test_detect_skips_dmi_amazon_ec2_falls_back_to_imds(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + dmi = Path(tmpdir) / "product_name" + dmi.write_text("Amazon EC2\n") + detected = rc.detect_instance_type( + env={}, + dmi_path=dmi, + imds_fetcher=lambda: "c6in.xlarge", + ) + self.assertEqual(detected, "c6in.xlarge") + + def test_detect_env_override_short_circuits(self) -> None: + # The IMDS fetcher MUST NOT run when env supplies the type. + called: list[bool] = [] + + def fetcher() -> str: + called.append(True) + return "c6in.xlarge" + + detected = rc.detect_instance_type( + env={"ANYSCAN_INSTANCE_TYPE": "c6in.metal"}, + dmi_path="/nonexistent", + imds_fetcher=fetcher, + ) + self.assertEqual(detected, "c6in.metal") + self.assertEqual(called, []) + + def test_detect_returns_none_when_all_sources_fail(self) -> None: + detected = rc.detect_instance_type( + env={}, + dmi_path="/nonexistent/path", + imds_fetcher=lambda: None, + ) + self.assertIsNone(detected) + + def test_detect_rejects_garbage_strings(self) -> None: + # Random product strings (e.g. on-prem hardware) must not match. + with tempfile.TemporaryDirectory() as tmpdir: + dmi = Path(tmpdir) / "product_name" + dmi.write_text("PowerEdge R740xd\n") + detected = rc.detect_instance_type( + env={}, + dmi_path=dmi, + imds_fetcher=lambda: None, + ) + self.assertIsNone(detected) + + +class PartialWindowCalibrationTests(unittest.TestCase): + """Verify calibration is persisted on every clean window AND on terminal.""" + + def _stub_runner( + self, + rates_then_crash: list[tuple[int, str]], + ) -> tuple[rc.WindowRunner, list[int]]: + """Build a runner that emits exactly the given (rate, classification) + pairs and then raises a mid-window crash on the next call.""" + + seen_rates: list[int] = [] + cursor = {"i": 0} + + class ScriptedRunner(rc.WindowRunner): + def run(self, *, rate, window_seconds, is_first_window): + seen_rates.append(rate) + idx = cursor["i"] + cursor["i"] += 1 + if idx >= len(rates_then_crash): + # Simulate scanner crash mid-window. + return rc.WindowMeasurement( + set_rate=rate, + elapsed_seconds=1.0, + tx_packets_delta=0, + tx_dropped_delta=0, + heartbeat_max_latency_ms=0, + scanner_finished_naturally=False, + scanner_exit_code=139, + ) + _expected_rate, kind = rates_then_crash[idx] + if kind == "clean": + return make_measurement( + set_rate=rate, achieved_pps=rate * 0.97 + ) + if kind == "slip": + return make_measurement( + set_rate=rate, + achieved_pps=rate * 0.95, + tx_dropped_delta=42, + ) + raise AssertionError(f"unknown kind: {kind}") + + return ScriptedRunner(), seen_rates + + def test_persists_on_each_clean_window(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + calib = rc.RateCalibrationStore(Path(tmpdir) / "rate-calibration.json") + policy = rc.AimdPolicy(window_seconds=30) + # Clean window 1 at 500k -> bumps to 700k. Clean window 2 at 700k. + # Then a mid-window crash on window 3. + runner, seen = self._stub_runner( + [ + (500_000, "clean"), + (700_000, "clean"), + ] + ) + controller = rc.RateController( + options=rc.ControllerOptions( + policy=policy, + window_seconds=float(policy.window_seconds), + interface="eth0", + starting_rate=500_000, + calibration=calib, + ), + runner=runner, + log_sink=io.StringIO(), + ) + with self.assertRaises(rc.ScannerWindowError): + controller.run() + self.assertEqual(seen, [500_000, 700_000, 900_000]) + # Calibration must reflect the highest CLEAN rate seen (700_000), + # NOT 500_000 (older snapshot would persist that) and NOT + # 900_000 (the crashed rate). + entry = calib.lookup("eth0") + self.assertIsNotNone(entry) + self.assertEqual(entry.learned_rate, 700_000) + + def test_persists_when_crash_strikes_before_any_clean_window(self) -> None: + # If the scanner crashes on window 1, max_clean_rate is 0 and we + # must NOT persist anything (regression guard for not stomping + # the prior calibration with 0). + with tempfile.TemporaryDirectory() as tmpdir: + calib = rc.RateCalibrationStore(Path(tmpdir) / "rate-calibration.json") + calib.store("eth0", 1_500_000, now_iso="2026-04-26T00:00:00Z") + runner, _ = self._stub_runner([]) + controller = rc.RateController( + options=rc.ControllerOptions( + policy=rc.AimdPolicy(window_seconds=30), + window_seconds=30.0, + interface="eth0", + starting_rate=500_000, + calibration=calib, + ), + runner=runner, + log_sink=io.StringIO(), + ) + with self.assertRaises(rc.ScannerWindowError): + controller.run() + entry = calib.lookup("eth0") + self.assertIsNotNone(entry) + self.assertEqual(entry.learned_rate, 1_500_000) # untouched + + def test_persists_after_natural_finish(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + calib = rc.RateCalibrationStore(Path(tmpdir) / "rate-calibration.json") + policy = rc.AimdPolicy(window_seconds=30) + scenario = StubWindow( + achieved_pps_for_set={500_000: 480_000, 700_000: 680_000}, + drops_for_set={}, + heartbeat_for_set={}, + natural_finish_after_window=2, + ) + runner = StubRunner(scenario) + controller = rc.RateController( + options=rc.ControllerOptions( + policy=policy, + window_seconds=float(policy.window_seconds), + interface="eth1", + starting_rate=500_000, + calibration=calib, + ), + runner=runner, + log_sink=io.StringIO(), + max_windows=5, + ) + controller.run() + entry = calib.lookup("eth1") + self.assertIsNotNone(entry) + # Window 1: clean at 500k. Window 2: clean at 700k AND scanner + # finished naturally. Highest clean rate = 700k. + self.assertEqual(entry.learned_rate, 700_000) + + if __name__ == "__main__": unittest.main() diff --git a/test_vulnscanner_adapter_multinic.py b/test_vulnscanner_adapter_multinic.py index d5d46eb..8425fd1 100644 --- a/test_vulnscanner_adapter_multinic.py +++ b/test_vulnscanner_adapter_multinic.py @@ -440,5 +440,103 @@ def test_two_interfaces_fan_out_and_merge(self) -> None: self.assertIn("10.0.0.5:80", result.stdout) +class CapConcurrentSubprocessesTests(unittest.TestCase): + """Verify the multi-NIC adapter caps concurrent shards (improvement #2).""" + + def test_cap_truncates_to_first_n_interfaces(self) -> None: + ifaces = ["eth0", "eth1", "eth2", "eth3", "eth4", "eth5", "eth6", "eth7"] + capped = adapter.cap_concurrent_subprocesses(ifaces, max_concurrent=4) + self.assertEqual(capped, ["eth0", "eth1", "eth2", "eth3"]) + + def test_cap_below_count_returns_full_list(self) -> None: + capped = adapter.cap_concurrent_subprocesses(["eth0", "eth1"], max_concurrent=4) + self.assertEqual(capped, ["eth0", "eth1"]) + + def test_cap_zero_or_negative_disables_cap(self) -> None: + ifaces = ["eth0", "eth1", "eth2"] + self.assertEqual(adapter.cap_concurrent_subprocesses(ifaces, max_concurrent=0), ifaces) + self.assertEqual(adapter.cap_concurrent_subprocesses(ifaces, max_concurrent=-1), ifaces) + + def test_cap_does_not_mutate_input(self) -> None: + ifaces = ["eth0", "eth1", "eth2", "eth3", "eth4"] + original = list(ifaces) + adapter.cap_concurrent_subprocesses(ifaces, max_concurrent=2) + self.assertEqual(ifaces, original) + + +class MultiNicSubprocessCapIntegrationTests(unittest.TestCase): + """End-to-end: pass 8 interfaces, observe 4 spawn calls (default cap=4).""" + + def _run_with_interfaces( + self, interfaces: list[str], *, env_max: str | None = None + ) -> tuple[int, list[str]]: + invocation = { + "target_range": "10.0.0.0-10.0.0.255", + "ports": "80", + "rate_limit": 0, + } + spawn_calls: list[str] = [] + + class StubChild: + def __init__(self, iface: str, shard_output: Path) -> None: + self._iface = iface + self._shard_output = shard_output + self.pid = 100000 + len(spawn_calls) + self.returncode = 0 + + def wait(self) -> int: + self._shard_output.write_text("") + return 0 + + def poll(self) -> int: + return 0 + + def fake_spawn(invocation_dict, *, interface, stderr_log): + shard_output = Path(invocation_dict["output_path"]) + shard_output.parent.mkdir(parents=True, exist_ok=True) + stderr_log.parent.mkdir(parents=True, exist_ok=True) + stderr_log.write_text("") + spawn_calls.append(interface) + return StubChild(interface, shard_output) + + with tempfile.TemporaryDirectory() as tmp: + output_path = Path(tmp) / "merged.out" + output_path.touch() + env_patch: dict[str, str] = {} + if env_max is not None: + env_patch["ANYSCAN_RATE_MAX_CONCURRENT_SUBPROCESSES"] = env_max + with mock.patch.object(adapter, "_spawn_shard_adapter", side_effect=fake_spawn), \ + mock.patch.dict(os.environ, env_patch, clear=False): + exit_code = adapter.run_multi_nic_scanner( + invocation, output_path, interfaces + ) + return exit_code, spawn_calls + + def test_eight_interfaces_capped_to_four_by_default(self) -> None: + ifaces = [f"eth{i}" for i in range(8)] + exit_code, spawned = self._run_with_interfaces(ifaces) + self.assertEqual(exit_code, 0) + self.assertEqual(spawned, ["eth0", "eth1", "eth2", "eth3"]) + + def test_env_override_raises_cap(self) -> None: + ifaces = [f"eth{i}" for i in range(8)] + exit_code, spawned = self._run_with_interfaces(ifaces, env_max="6") + self.assertEqual(exit_code, 0) + self.assertEqual(spawned, ["eth0", "eth1", "eth2", "eth3", "eth4", "eth5"]) + + def test_env_override_zero_disables_cap(self) -> None: + ifaces = [f"eth{i}" for i in range(8)] + exit_code, spawned = self._run_with_interfaces(ifaces, env_max="0") + self.assertEqual(exit_code, 0) + self.assertEqual(spawned, ifaces) + + def test_three_interfaces_unchanged_under_default_cap(self) -> None: + # Cap=4 with 3 NICs is a no-op — common path on smaller boxes. + ifaces = ["eth0", "eth1", "eth2"] + exit_code, spawned = self._run_with_interfaces(ifaces) + self.assertEqual(exit_code, 0) + self.assertEqual(spawned, ifaces) + + if __name__ == "__main__": unittest.main() diff --git a/vulnscanner-zmap-adapter.py b/vulnscanner-zmap-adapter.py index 142c743..42211fd 100755 --- a/vulnscanner-zmap-adapter.py +++ b/vulnscanner-zmap-adapter.py @@ -37,6 +37,15 @@ # c6in.xlarge confirms 1/2/4 receivers all hit the same send ceiling. DEFAULT_RECEIVER_THREADS = 1 DEFAULT_COOLDOWN_SECONDS = 5 +# Concurrency cap for the multi-NIC parent. anygpt-4 bench data shows the +# kernel TX path on c6in.metal-class boxes saturates around 4 concurrent +# scanner processes regardless of how many ENAs are attached: 4-NIC peaks +# at 12.8M aggregate, 8-NIC regressed to 1.3M because the 5th-8th +# children CPU-starved the first four into AIMD-cratering on every +# window. Keeping at most 4 active shards leaves headroom for agentd + +# other system processes and keeps the per-shard AIMD loops in their +# converged band. Override via ANYSCAN_RATE_MAX_CONCURRENT_SUBPROCESSES. +DEFAULT_MAX_CONCURRENT_SUBPROCESSES = 4 CURRENT_CHILD: subprocess.Popen[bytes] | None = None PROGRESS_LINE_RE = re.compile( r"(?P\d+):(?P\d+)\s+(?P\d+)%;\s+send:\s+" @@ -351,6 +360,25 @@ def parse_scanner_interfaces(value: str | None) -> list[str]: return result +def cap_concurrent_subprocesses( + interfaces: list[str], *, max_concurrent: int +) -> list[str]: + """Truncate the per-shard interface list to ``max_concurrent`` entries. + + The kernel TX path saturates well below the per-NIC ENA spec on + c6in.metal — anygpt-4 saw 4-NIC sustain 12.8M aggregate while 8-NIC + cratered to 1.3M because the extra shards CPU-starved the originals. + Capping the active shard count below the available NIC count keeps + the working set inside the kernel sweet spot. ``max_concurrent`` of 0 + or negative disables the cap entirely so test harnesses can exercise + the unbounded path. + """ + + if max_concurrent <= 0: + return list(interfaces) + return list(interfaces[:max_concurrent]) + + def format_target_range(start: int, end: int) -> str: """Render an IPv4 [start, end] inclusive range using the scanner's accepted forms. @@ -551,7 +579,23 @@ def run_dynamic_scanner( nic_reader = ( rate_controller.NicStatsReader(interface) if interface is not None else None ) + system_load_reader = rate_controller.SystemLoadReader() fallback_rate = resolve_rate_limit(invocation) + # Layer per-instance defaults underneath any explicit env knobs. The + # detection short-circuits on ANYSCAN_INSTANCE_TYPE so a multi-NIC + # parent that already detected once doesn't redo IMDS in every + # child. apply_instance_defaults preserves operator overrides — env + # knobs always win — and only fills in floor/ceiling/starting_rate + # when the operator hasn't pinned them. + instance_type = rate_controller.detect_instance_type(env=env) + if instance_type: + os.environ.setdefault("ANYSCAN_INSTANCE_TYPE", instance_type) + policy, fallback_rate = rate_controller.apply_instance_defaults( + policy=policy, + fallback_rate=fallback_rate, + instance_type=instance_type, + env=env, + ) calibration_path = env_string("ANYSCAN_RATE_CALIBRATION_PATH") or str( rate_controller.DEFAULT_CALIBRATION_PATH ) @@ -567,6 +611,7 @@ def run_dynamic_scanner( "controller_started", { "interface": interface, + "instance_type": instance_type, "starting_rate": starting_rate, "fallback_rate": fallback_rate, "policy_floor": policy.floor, @@ -575,6 +620,9 @@ def run_dynamic_scanner( "multiplicative_factor": policy.multiplicative_factor, "window_seconds": policy.window_seconds, "heartbeat_threshold_ms": policy.heartbeat_latency_threshold_ms, + "cpu_load_threshold": policy.cpu_load_threshold, + "drop_ratio_threshold": policy.drop_ratio_threshold, + "vcpu_count": system_load_reader.vcpu_count, "calibration_path": str(calibration.path), }, ) @@ -640,6 +688,7 @@ def command_for_rate(rate: int, is_first_window: bool) -> list[str]: interface=interface, starting_rate=starting_rate, calibration=calibration, + system_load_reader=system_load_reader, ), runner=runner, ) @@ -792,6 +841,14 @@ def run_multi_nic_scanner( operators see all rate-controller telemetry in one journal stream. """ + requested_interfaces = list(interfaces) + max_concurrent = env_int( + "ANYSCAN_RATE_MAX_CONCURRENT_SUBPROCESSES", + DEFAULT_MAX_CONCURRENT_SUBPROCESSES, + ) + interfaces = cap_concurrent_subprocesses( + requested_interfaces, max_concurrent=max_concurrent + ) target_range = require_string(invocation, "target_range") shards = split_target_range_for_shards(target_range, len(interfaces)) if len(shards) < len(interfaces): @@ -800,12 +857,24 @@ def run_multi_nic_scanner( # rather than spawning empty children. interfaces = interfaces[: len(shards)] + # Detect once at the parent so each shard child inherits the resolved + # type via ANYSCAN_INSTANCE_TYPE without redoing the IMDS round-trip. + if rate_controller is not None: + instance_type = rate_controller.detect_instance_type(env=os.environ) + if instance_type: + os.environ.setdefault("ANYSCAN_INSTANCE_TYPE", instance_type) + else: + instance_type = None + if rate_controller is not None: rate_controller.emit_metric( "multi_nic_orchestration_started", { "interfaces": interfaces, + "requested_interfaces": requested_interfaces, "shard_count": len(shards), + "max_concurrent": max_concurrent, + "instance_type": instance_type, "target_range": target_range, }, )