Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions install-worker-bundle.sh
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,17 @@ apply_host_resource_defaults() {
if [ -z "$(env_value "ANYSCAN_RATE_MAX_CONCURRENT_SUBPROCESSES" "$RUNTIME_ENV_FILE" || true)" ]; then
upsert_env_value "ANYSCAN_RATE_MAX_CONCURRENT_SUBPROCESSES" "4" "$RUNTIME_ENV_FILE"
fi
# Phase 2 PR D of plans/2026-04-27-portscan-afxdp-plan-v1.md §3.7.
# The adapter forwards this value as scanner --io-engine=<value>.
# AF_PACKET is the unconditional default on fresh installs so the
# opt-in knob is explicit in /etc/agentd/runtime.env; flipping
# it to af_xdp also requires ANYSCAN_AF_XDP_AVAILABLE=true (set
# by apply_afxdp_availability below) and the CAP_BPF grant in
# anyscan-worker.service. Operators who already have a value
# pinned (in-place upgrades) keep their setting.
if [ -z "$(env_value "ANYSCAN_SCANNER_IO_ENGINE" "$RUNTIME_ENV_FILE" || true)" ]; then
upsert_env_value "ANYSCAN_SCANNER_IO_ENGINE" "af_packet" "$RUNTIME_ENV_FILE"
fi
fi

# Defaults for the egress bandwidth reservation that ExecStartPre installs.
Expand Down
17 changes: 17 additions & 0 deletions runtime.worker.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,23 @@ POLL_INTERVAL_SECONDS=15
# ANYSCAN_RESERVE_TOR_CGROUP_PATH=system.slice/agentd-tunnel.service
# ANYSCAN_RESERVE_TOR_PORTS=9001,9030,9101

# Scanner I/O engine. Phase 2 PR D of plans/2026-04-27-portscan-afxdp-plan-v1.md
# §3.7. The bundled scanner exposes --io-engine={af_packet,af_xdp}; the
# adapter reads this knob and forwards it as the flag value. AF_PACKET is
# the unconditional default and the unconditional fallback. AF_XDP is
# opt-in per worker and is only honored when:
# 1. ANYSCAN_AF_XDP_AVAILABLE=true, which install-worker-bundle.sh writes
# after probing kernel >=5.10 + libxdp.so loadable; AND
# 2. The systemd unit grants CAP_BPF (anyscan-worker.service /
# anyscan-worker-only.service already include it so the operator
# does not have to refresh the unit when flipping this knob).
# When ANYSCAN_SCANNER_IO_ENGINE=af_xdp is requested but the install-time
# probe set ANYSCAN_AF_XDP_AVAILABLE=false (or the variable is absent),
# the adapter logs a warning to stderr and falls back to af_packet so the
# scanner does not crash at startup with a libxdp dlopen error. Set to
# "af_packet" or leave unset to stay on the known-safe default.
# ANYSCAN_SCANNER_IO_ENGINE=af_packet

# Installed bundle asset locations
EXTENSION_MANIFEST_PATHS=/opt/agentd/extensions/bootstrap-provisioner.json,/opt/agentd/extensions/portscan-adapter.json
ARTIFACT_DIR=/var/lib/agentd/artifacts
Expand Down
294 changes: 294 additions & 0 deletions test_vulnscanner_adapter_io_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
"""Unit tests for ANYSCAN_SCANNER_IO_ENGINE plumbing in vulnscanner-zmap-adapter.py.

PR D of plans/2026-04-27-portscan-afxdp-plan-v1.md §3.7 wires the runtime
opt-in env knob into the adapter. AF_PACKET stays the unconditional
default and the AF_XDP request is gated on ANYSCAN_AF_XDP_AVAILABLE
(written by install-worker-bundle.sh's runtime probe). When the knob
points at af_xdp on a host where the kernel/libxdp probe failed, the
adapter must fall back to af_packet and emit a warning rather than
silently scanning at AF_PACKET speeds while the operator believes XDP
is engaged.

Run via ``python3 -m unittest test_vulnscanner_adapter_io_engine -v``
from the anyscan repo root.
"""

from __future__ import annotations

import contextlib
import importlib.util
import io
import json
import os
import shutil
import subprocess
import sys
import tempfile
import textwrap
import unittest
from pathlib import Path
from unittest import mock


REPO_ROOT = Path(__file__).resolve().parent
ADAPTER_PATH = REPO_ROOT / "vulnscanner-zmap-adapter.py"


def _load_adapter():
spec = importlib.util.spec_from_file_location("vulnscanner_zmap_adapter", ADAPTER_PATH)
assert spec is not None and spec.loader is not None
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module


adapter = _load_adapter()


IO_ENGINE_ENV_KEYS = ("ANYSCAN_SCANNER_IO_ENGINE", "ANYSCAN_AF_XDP_AVAILABLE")


def _clear_io_engine_env() -> None:
for key in IO_ENGINE_ENV_KEYS:
os.environ.pop(key, None)


class ResolveIoEngineTests(unittest.TestCase):
"""resolve_io_engine() must select af_packet/af_xdp per env + AF_XDP probe."""

def setUp(self) -> None:
self._snapshot = {key: os.environ.get(key) for key in IO_ENGINE_ENV_KEYS}
_clear_io_engine_env()

def tearDown(self) -> None:
for key, value in self._snapshot.items():
if value is None:
os.environ.pop(key, None)
else:
os.environ[key] = value

def test_unset_defaults_to_af_packet(self) -> None:
self.assertEqual(adapter.resolve_io_engine(), "af_packet")

def test_explicit_af_packet(self) -> None:
os.environ["ANYSCAN_SCANNER_IO_ENGINE"] = "af_packet"
self.assertEqual(adapter.resolve_io_engine(), "af_packet")

def test_af_packet_does_not_consult_af_xdp_available(self) -> None:
# AF_XDP availability has no bearing when the operator did not
# request the AF_XDP path; staying on af_packet must not depend
# on libxdp being installed.
os.environ["ANYSCAN_SCANNER_IO_ENGINE"] = "af_packet"
os.environ["ANYSCAN_AF_XDP_AVAILABLE"] = "false"
self.assertEqual(adapter.resolve_io_engine(), "af_packet")

def test_af_xdp_with_runtime_available(self) -> None:
os.environ["ANYSCAN_SCANNER_IO_ENGINE"] = "af_xdp"
os.environ["ANYSCAN_AF_XDP_AVAILABLE"] = "true"
captured = io.StringIO()
with contextlib.redirect_stderr(captured):
self.assertEqual(adapter.resolve_io_engine(), "af_xdp")
self.assertEqual(captured.getvalue(), "")

def test_af_xdp_request_uppercase_normalizes(self) -> None:
os.environ["ANYSCAN_SCANNER_IO_ENGINE"] = "AF_XDP"
os.environ["ANYSCAN_AF_XDP_AVAILABLE"] = "true"
self.assertEqual(adapter.resolve_io_engine(), "af_xdp")

def test_af_xdp_with_unavailable_runtime_falls_back_with_warning(self) -> None:
os.environ["ANYSCAN_SCANNER_IO_ENGINE"] = "af_xdp"
os.environ["ANYSCAN_AF_XDP_AVAILABLE"] = "false"
captured = io.StringIO()
with contextlib.redirect_stderr(captured):
self.assertEqual(adapter.resolve_io_engine(), "af_packet")
message = captured.getvalue()
self.assertIn("af_xdp", message)
self.assertIn("ANYSCAN_AF_XDP_AVAILABLE", message)

def test_af_xdp_without_availability_var_falls_back(self) -> None:
# Missing ANYSCAN_AF_XDP_AVAILABLE behaves the same as false:
# the installer always writes the value (true OR false), so a
# missing key implies an old install where libxdp probe never
# ran. Be conservative.
os.environ["ANYSCAN_SCANNER_IO_ENGINE"] = "af_xdp"
captured = io.StringIO()
with contextlib.redirect_stderr(captured):
self.assertEqual(adapter.resolve_io_engine(), "af_packet")
self.assertIn("ANYSCAN_AF_XDP_AVAILABLE", captured.getvalue())

def test_invalid_value_falls_back_to_af_packet_with_warning(self) -> None:
os.environ["ANYSCAN_SCANNER_IO_ENGINE"] = "dpdk"
captured = io.StringIO()
with contextlib.redirect_stderr(captured):
self.assertEqual(adapter.resolve_io_engine(), "af_packet")
self.assertIn("dpdk", captured.getvalue())

def test_blank_value_defaults_to_af_packet_silently(self) -> None:
os.environ["ANYSCAN_SCANNER_IO_ENGINE"] = ""
captured = io.StringIO()
with contextlib.redirect_stderr(captured):
self.assertEqual(adapter.resolve_io_engine(), "af_packet")
self.assertEqual(captured.getvalue(), "")


class BuildCommandIoEngineTests(unittest.TestCase):
"""build_command must append --io-engine=<value> as a flag."""

def setUp(self) -> None:
self._snapshot = {key: os.environ.get(key) for key in IO_ENGINE_ENV_KEYS}
_clear_io_engine_env()
self._scanner_patch = mock.patch.object(
adapter, "resolve_scanner_binary", return_value=Path("/usr/bin/scanner")
)
self._scanner_patch.start()
self.addCleanup(self._scanner_patch.stop)

def tearDown(self) -> None:
for key, value in self._snapshot.items():
if value is None:
os.environ.pop(key, None)
else:
os.environ[key] = value

def _build(self) -> list[str]:
invocation = {
"target_range": "10.0.0.0/24",
"ports": "80",
"rate_limit": 0,
}
return adapter.build_command(invocation, Path("/tmp/out"))

def test_default_appends_io_engine_af_packet(self) -> None:
cmd = self._build()
self.assertIn("--io-engine=af_packet", cmd)
# Sanity: only one --io-engine flag, regardless of form.
io_engine_args = [arg for arg in cmd if arg.startswith("--io-engine")]
self.assertEqual(len(io_engine_args), 1)

def test_af_xdp_request_with_runtime_available_appends_af_xdp(self) -> None:
os.environ["ANYSCAN_SCANNER_IO_ENGINE"] = "af_xdp"
os.environ["ANYSCAN_AF_XDP_AVAILABLE"] = "true"
cmd = self._build()
self.assertIn("--io-engine=af_xdp", cmd)
self.assertNotIn("--io-engine=af_packet", cmd)

def test_af_xdp_request_without_runtime_falls_back_to_af_packet(self) -> None:
os.environ["ANYSCAN_SCANNER_IO_ENGINE"] = "af_xdp"
os.environ["ANYSCAN_AF_XDP_AVAILABLE"] = "false"
captured = io.StringIO()
with contextlib.redirect_stderr(captured):
cmd = self._build()
self.assertIn("--io-engine=af_packet", cmd)
self.assertNotIn("--io-engine=af_xdp", cmd)
self.assertIn("ANYSCAN_AF_XDP_AVAILABLE", captured.getvalue())

def test_invalid_request_falls_back_to_af_packet(self) -> None:
os.environ["ANYSCAN_SCANNER_IO_ENGINE"] = "garbage"
captured = io.StringIO()
with contextlib.redirect_stderr(captured):
cmd = self._build()
self.assertIn("--io-engine=af_packet", cmd)


class AdapterIoEngineIntegrationTests(unittest.TestCase):
"""End-to-end: spawn the adapter and confirm the scanner gets --io-engine=."""

def setUp(self) -> None:
self._tmp = Path(tempfile.mkdtemp(prefix="adapter-io-engine-"))
self.addCleanup(shutil.rmtree, self._tmp, ignore_errors=True)
self._stub_log = self._tmp / "calls.log"
self._stub = self._tmp / "stub-scanner.sh"
self._stub.write_text(
textwrap.dedent(
"""
#!/usr/bin/env bash
printf '%%s\\n' "$*" >>"%s"
output=""
while [ $# -gt 0 ]; do
case "$1" in
--output-file) output="$2"; shift 2 ;;
*) shift ;;
esac
done
if [ -n "$output" ]; then
: >"$output"
fi
printf '0:00 100%%; send: 1 1.00p/s (1.00p/s avg); recv: 0 0p/s\\n' >&2
exit 0
"""
% str(self._stub_log)
).strip()
+ "\n"
)
os.chmod(self._stub, 0o755)
self._output_path = self._tmp / "adapter.out"

def _run_adapter(self, env_overrides: dict[str, str]) -> subprocess.CompletedProcess[str]:
invocation = {
"target_range": "10.0.0.0-10.0.0.3",
"ports": "80",
"rate_limit": 0,
"output_path": str(self._output_path),
}
env = os.environ.copy()
env["SCANNER_BIN"] = str(self._stub)
# Force the legacy static path so the AIMD respawn loop does not
# add extra invocations the test would have to model.
env["ANYSCAN_DYNAMIC_RATE_ENABLED"] = "false"
# Strip ANYSCAN_SCANNER_INTERFACES so the parent does not engage
# multi-NIC fan-out from a host inheriting it from the env.
env.pop("ANYSCAN_SCANNER_INTERFACES", None)
for key in IO_ENGINE_ENV_KEYS:
env.pop(key, None)
env.update(env_overrides)
return subprocess.run(
[sys.executable, str(ADAPTER_PATH)],
input=json.dumps(invocation),
env=env,
text=True,
capture_output=True,
timeout=30,
)

def _read_calls(self) -> list[str]:
if not self._stub_log.exists():
return []
return [line for line in self._stub_log.read_text().splitlines() if line.strip()]

def test_default_passes_io_engine_af_packet(self) -> None:
result = self._run_adapter({})
self.assertEqual(result.returncode, 0, msg=result.stderr)
calls = self._read_calls()
self.assertEqual(len(calls), 1)
self.assertIn("--io-engine=af_packet", calls[0])

def test_af_xdp_request_with_runtime_available_passes_af_xdp(self) -> None:
result = self._run_adapter(
{
"ANYSCAN_SCANNER_IO_ENGINE": "af_xdp",
"ANYSCAN_AF_XDP_AVAILABLE": "true",
}
)
self.assertEqual(result.returncode, 0, msg=result.stderr)
calls = self._read_calls()
self.assertEqual(len(calls), 1)
self.assertIn("--io-engine=af_xdp", calls[0])
self.assertNotIn("--io-engine=af_packet", calls[0])

def test_af_xdp_request_without_runtime_falls_back_to_af_packet(self) -> None:
result = self._run_adapter(
{
"ANYSCAN_SCANNER_IO_ENGINE": "af_xdp",
"ANYSCAN_AF_XDP_AVAILABLE": "false",
}
)
self.assertEqual(result.returncode, 0, msg=result.stderr)
calls = self._read_calls()
self.assertEqual(len(calls), 1)
self.assertIn("--io-engine=af_packet", calls[0])
# The warning must surface so operators see the downgrade in journal.
self.assertIn("ANYSCAN_AF_XDP_AVAILABLE", result.stderr)


if __name__ == "__main__":
unittest.main()
49 changes: 49 additions & 0 deletions vulnscanner-zmap-adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,54 @@ def env_flag(name: str, default: bool = False) -> bool:
return value.lower() in {"1", "true", "yes", "on"}


# Phase 2 PR D of plans/2026-04-27-portscan-afxdp-plan-v1.md §3.7. The
# scanner accepts --io-engine={af_packet,af_xdp}; AF_PACKET stays the
# unconditional default and AF_XDP is opt-in per worker. The install-time
# probe in install-worker-bundle.sh::probe_afxdp_runtime_available
# writes ANYSCAN_AF_XDP_AVAILABLE=true|false based on kernel >=5.10
# AND libxdp.so being loadable; we refuse to forward af_xdp when the
# probe failed because the scanner would crash at startup with a dlopen
# error and the worker has no way to recover. The fall-back warning is
# loud on purpose: silently dropping back to AF_PACKET would let an
# operator who flipped the knob keep believing they were running on the
# fast path.
SUPPORTED_IO_ENGINES = ("af_packet", "af_xdp")
DEFAULT_IO_ENGINE = "af_packet"


def resolve_io_engine() -> str:
"""Resolve the scanner --io-engine value from env, with AF_XDP gating.

Returns the engine string the adapter should pass to the scanner.
Always returns a value from ``SUPPORTED_IO_ENGINES``; unrecognized,
unset, or ungated requests collapse to ``af_packet`` so the scanner
stays on its known-safe default. Warnings are emitted to stderr
when a request is downgraded so the journal carries an audit trail.
"""

raw = env_string("ANYSCAN_SCANNER_IO_ENGINE")
if raw is None:
return DEFAULT_IO_ENGINE
requested = raw.lower()
if requested not in SUPPORTED_IO_ENGINES:
print(
f"[anyscan-adapter] unrecognized ANYSCAN_SCANNER_IO_ENGINE={raw!r}; "
f"falling back to {DEFAULT_IO_ENGINE}",
file=sys.stderr,
)
return DEFAULT_IO_ENGINE
if requested == "af_xdp" and not env_flag("ANYSCAN_AF_XDP_AVAILABLE"):
print(
"[anyscan-adapter] ANYSCAN_SCANNER_IO_ENGINE=af_xdp requested but "
"ANYSCAN_AF_XDP_AVAILABLE!=true; the install-time probe did not "
"detect a kernel/libxdp combination that supports AF_XDP. "
f"Falling back to {DEFAULT_IO_ENGINE}.",
file=sys.stderr,
)
return DEFAULT_IO_ENGINE
return requested


def resolve_scanner_binary() -> Path:
candidates: list[Path] = []
configured = env_string("SCANNER_BIN")
Expand Down Expand Up @@ -192,6 +240,7 @@ def build_command(
str(cooldown),
"--output-file",
str(output_path),
f"--io-engine={resolve_io_engine()}",
]
checkpoint_path = invocation.get("checkpoint_path")
if isinstance(checkpoint_path, str) and checkpoint_path.strip():
Expand Down