diff --git a/pyoaev/apis/signature.py b/pyoaev/apis/signature.py index bed3e9b..6bbb125 100644 --- a/pyoaev/apis/signature.py +++ b/pyoaev/apis/signature.py @@ -10,7 +10,11 @@ from pyoaev import exceptions as exc from pyoaev.base import RESTManager, RESTObject from pyoaev.exceptions import SignatureTransmissionError -from pyoaev.signatures.models import SignatureCallbackPayload +from pyoaev.signatures.models import ( + ExecutionDetails, + SignatureCallbackPayload, + SignatureOutputStructure, +) class Signature(RESTObject): @@ -22,7 +26,7 @@ class Signature(RESTObject): class SignatureApiManager(RESTManager): """Manage signature callback transport to the OpenAEV backend. - Handles payload validation, auto-chunking, and retry with exponential backoff. + Handles payload validation and retry with exponential backoff. """ _path = "/injects" @@ -32,10 +36,6 @@ class SignatureApiManager(RESTManager): MAX_RETRIES = 3 RETRY_DELAYS = (1, 2, 4) - _CHUNK_METADATA_RESERVE = len( - ',"chunk_index":99999,"total_chunks":99999,"phase":"execution_complete_extended"' - ) - def __init__(self, openaev: "Any", parent: "Any" = None) -> None: """Initialize the signature API manager. @@ -68,8 +68,10 @@ def logger(self, value: logging.Logger) -> None: def send_signatures( self, inject_id: str, - phase: str, - signatures: dict[str, Any], + signatures: SignatureOutputStructure, + execution_details: ExecutionDetails, + max_payload_size: int | None = None, + logger: logging.Logger | None = None, ) -> None: """Send compiled signatures to the inject callback endpoint. @@ -77,38 +79,55 @@ def send_signatures( Args: inject_id: Inject UUID. - phase: Execution phase (e.g. 'execution_complete'). signatures: Full signatures dict (canonical or flat, grouped on the fly). + execution_details: Raises: SignatureTransmissionError: Validation failed, 4xx hit, or retries exhausted. """ - self._logger.debug("send_signatures inject_id=%s phase=%s", inject_id, phase) - signatures = self._normalize_signature_payload(signatures) - payload = self._build_callback_payload(signatures, phase=phase) + effective_max_size = ( + max_payload_size if max_payload_size is not None else self._max_payload_size + ) + effective_logger = logger if logger is not None else self._logger - serialized = json.dumps(payload, separators=(",", ":")).encode() + effective_logger.debug( + "send_signatures inject_id=%s, execution_status=%s, execution_action=%s", + inject_id, + execution_details.execution_status, + execution_details.execution_action, + ) + signatures.normalize_signature_payload() + payload = self._build_callback_payload( + signatures=signatures, execution_details=execution_details + ) + payload_size = len(json.dumps(payload).encode("utf-8")) - if len(serialized) <= self._max_payload_size: - self._send_with_retry(inject_id, payload) - else: - self._send_chunked(inject_id, payload["expectation_signature"], phase=phase) + if payload_size <= effective_max_size: + self._send_with_retry(inject_id, payload, logger=effective_logger) + return + + sig_data = json.loads(payload["execution_output_structured"]) + targets = sig_data["signatures"]["targets"] + envelopes = self._split_into_envelopes( + payload, + sig_data, + targets, + max_payload_size=effective_max_size, + logger=effective_logger, + ) + for envelope in envelopes: + self._send_with_retry(inject_id, envelope, logger=effective_logger) def _build_callback_payload( self, - signatures: dict[str, Any], - *, - phase: str | None = None, - chunk_index: int | None = None, - total_chunks: int | None = None, + signatures: SignatureOutputStructure, + execution_details: ExecutionDetails, ) -> dict[str, Any]: """Validate and wrap signatures in the strict callback envelope. Args: signatures: The inner signatures body, already normalised. - phase: Execution phase string (e.g. 'execution_complete'). - chunk_index: 0-based index when chunking, None for single POSTs. - total_chunks: Chunk count when chunking, None for single POSTs. + execution_details: The execution metadata to be stored next to the signatures in the payload. Returns: The validated dict ready for wire transmission. @@ -117,150 +136,76 @@ def _build_callback_payload( SignatureTransmissionError: Envelope failed Pydantic validation. """ try: - envelope = SignatureCallbackPayload.model_validate( - { - "expectation_signature": signatures, - "phase": phase, - "chunk_index": chunk_index, - "total_chunks": total_chunks, - } + envelope = SignatureCallbackPayload.build_from_models( + signatures, execution_details ) except ValidationError as ve: raise SignatureTransmissionError( error_message=f"Invalid signatures payload: {ve}", ) from ve - return envelope.model_dump(mode="json", exclude_none=True) - - def _normalize_signature_payload( - self, signatures: dict[str, Any] - ) -> dict[str, Any]: - """Regroup signature_values by expectation_type within each target. + envelope_dict = envelope.model_dump(mode="json", exclude_none=True) + SignatureCallbackPayload.model_validate(envelope_dict) + return envelope_dict - Accepts flat or pre-grouped input and returns canonical grouped form. + def _split_into_envelopes( + self, + base_payload: dict[str, Any], + sig_data: dict[str, Any], + targets: list[dict[str, Any]], + max_payload_size: int | None = None, + logger: logging.Logger | None = None, + ) -> list[dict[str, Any]]: + effective_max = ( + max_payload_size if max_payload_size is not None else self._max_payload_size + ) + effective_logger = logger if logger is not None else self._logger - Args: - signatures: Raw signatures dict with any mix of flat and grouped entries. + envelopes: list[dict[str, Any]] = [] + current_targets: list[dict[str, Any]] = [] - Returns: - New dict where every signature_values list is in canonical grouped form. - """ - targets = signatures.get("targets") - if not targets: - return signatures - - normalized_targets: list[dict[str, Any]] = [] for target in targets: - sig_values = target.get("signature_values") - if not sig_values: - normalized_targets.append(target) - continue - - grouped: dict[str, list[dict[str, Any]]] = {} - order: list[str] = [] - - for entry in sig_values: - etype = entry.get("expectation_type") - if etype not in grouped: - grouped[etype] = [] - order.append(etype) - - if "values" in entry and isinstance(entry["values"], list): - grouped[etype].extend(entry["values"]) + trial_targets = current_targets + [target] + trial_envelope = self._build_envelope(base_payload, sig_data, trial_targets) + trial_size = len(json.dumps(trial_envelope).encode("utf-8")) + + if trial_size > effective_max: + if current_targets: + envelopes.append( + self._build_envelope(base_payload, sig_data, current_targets) + ) + current_targets = [target] else: - grouped[etype].append( - {k: v for k, v in entry.items() if k != "expectation_type"} + effective_logger.warning( + "Single target exceeds max_payload_size (%d bytes > %d limit). Sending oversized envelope.", + trial_size, + effective_max, ) + envelopes.append(trial_envelope) + current_targets = [] + else: + current_targets = trial_targets + + if current_targets: + envelopes.append( + self._build_envelope(base_payload, sig_data, current_targets) + ) - normalized_target = dict(target) - normalized_target["signature_values"] = [ - {"expectation_type": etype, "values": grouped[etype]} for etype in order - ] - normalized_targets.append(normalized_target) - - normalized = dict(signatures) - normalized["targets"] = normalized_targets - return normalized - - def _send_chunked( - self, inject_id: str, signatures: dict[str, Any], phase: str | None = None - ) -> None: - """Split targets across sequential POSTs, each tagged with chunk metadata. - - Args: - inject_id: Inject UUID for the callback path. - signatures: Normalised inner signatures body to partition. - phase: Execution phase forwarded to each chunk envelope. - - Raises: - SignatureTransmissionError: A single target alone exceeds max_payload_size. - """ - targets = signatures.get("targets", []) - if not targets: - payload = self._build_callback_payload(signatures, phase=phase) - size = len(json.dumps(payload, separators=(",", ":")).encode()) - if size > self._max_payload_size: - self._logger.warning( - "Payload of %d bytes exceeds max_payload_size %d but has no " - "'targets' key to chunk on; sending unchunked", - size, - self._max_payload_size, - ) - self._send_with_retry(inject_id, payload) - return - - budget = max(self._max_payload_size - self._CHUNK_METADATA_RESERVE, 0) - chunks: list[list[Any]] = [] - current_chunk: list[Any] = [] + return envelopes - for target in targets: - candidate = current_chunk + [target] - size = len( - json.dumps( - {"expectation_signature": {"targets": candidate}}, - separators=(",", ":"), - ).encode() - ) + def _build_envelope( + self, + base_payload: dict[str, Any], + sig_data: dict[str, Any], + targets_subset: list[dict[str, Any]], + ) -> dict[str, Any]: + subset_sig = dict(sig_data) + subset_sig["signatures"] = dict(sig_data["signatures"]) + subset_sig["signatures"]["targets"] = targets_subset - if size <= budget: - current_chunk.append(target) - continue - - if not current_chunk: - raise SignatureTransmissionError( - error_message=( - f"Single target payload of {size} bytes exceeds " - f"max_payload_size {self._max_payload_size}; cannot chunk further" - ), - ) - - chunks.append(current_chunk) - current_chunk = [target] - solo_size = len( - json.dumps( - {"expectation_signature": {"targets": [target]}}, - separators=(",", ":"), - ).encode() - ) - if solo_size > budget: - raise SignatureTransmissionError( - error_message=( - f"Single target payload of {solo_size} bytes exceeds " - f"max_payload_size {self._max_payload_size}; cannot chunk further" - ), - ) - - if current_chunk: - chunks.append(current_chunk) - - total_chunks = len(chunks) - for idx, chunk_targets in enumerate(chunks): - chunk_payload = self._build_callback_payload( - {"targets": chunk_targets}, - phase=phase, - chunk_index=idx, - total_chunks=total_chunks, - ) - self._send_with_retry(inject_id, chunk_payload) + envelope = dict(base_payload) + envelope["execution_output_structured"] = json.dumps(subset_sig) + SignatureCallbackPayload.model_validate(envelope) + return envelope @exc.on_http_error(exc.OpenAEVUpdateError) def callback( @@ -276,12 +221,15 @@ def callback( Returns: The parsed response from the backend. """ - path = f"{self.path}/{inject_id}/callback" + path = f"{self.path}/execution/callback/{inject_id}" result = self.openaev.http_post(path, post_data=data, **kwargs) return result def _send_with_retry( - self, inject_id: str, payload: dict[str, Any] + self, + inject_id: str, + payload: dict[str, Any], + logger: logging.Logger | None = None, ) -> dict[str, Any]: """Retry callback() with exponential backoff on 5xx, immediate raise on 4xx. @@ -297,6 +245,7 @@ def _send_with_retry( """ from pyoaev.exceptions import OpenAEVError + effective_logger = logger if logger is not None else self._logger last_error: Exception | None = None for attempt in range(self.MAX_RETRIES + 1): @@ -308,7 +257,7 @@ def _send_with_retry( body_str = "" if ex.response_body: body_str = ex.response_body.decode(errors="replace") - self._logger.error( + effective_logger.error( "Client error %d sending signatures: %s", status, body_str or ex.error_message, @@ -322,7 +271,7 @@ def _send_with_retry( last_error = ex if attempt < self.MAX_RETRIES: delay = self.RETRY_DELAYS[attempt] - self._logger.warning( + effective_logger.warning( "Retry %d/%d after %ds (HTTP %s): %s", attempt + 1, self.MAX_RETRIES, diff --git a/pyoaev/signatures/__init__.py b/pyoaev/signatures/__init__.py index d8a60cd..dc2e061 100644 --- a/pyoaev/signatures/__init__.py +++ b/pyoaev/signatures/__init__.py @@ -1,31 +1,37 @@ from pyoaev.signatures.models import ( CloudInjectorConfig, ExpectationSignatureGroup, - ExternalInjectorConfig, ExtraSignatureData, InjectorConfig, NetworkInjectorConfig, SignatureCallbackPayload, SignaturePayload, + SignatureTarget, SignatureValue, TargetSignatures, build_network_configs, ) from pyoaev.signatures.signature_manager import SignatureManager -from pyoaev.signatures.types import ExpectationType, MatchTypes, SignatureTypes +from pyoaev.signatures.types import ( + ExpectationType, + InjectExecutionActions, + MatchTypes, + SignatureTypes, +) __all__ = [ "CloudInjectorConfig", "ExpectationSignatureGroup", - "ExternalInjectorConfig", "ExpectationType", "ExtraSignatureData", "InjectorConfig", + "InjectExecutionActions", "MatchTypes", "NetworkInjectorConfig", "SignatureCallbackPayload", "SignatureManager", "SignaturePayload", + "SignatureTarget", "SignatureTypes", "SignatureValue", "TargetSignatures", diff --git a/pyoaev/signatures/models.py b/pyoaev/signatures/models.py index 553c7e2..a133798 100644 --- a/pyoaev/signatures/models.py +++ b/pyoaev/signatures/models.py @@ -1,6 +1,9 @@ """Pydantic schemas pinning every shape SignatureManager touches.""" import ipaddress +import math +from collections import defaultdict +from datetime import datetime, timezone from typing import Any from pydantic import ( @@ -8,11 +11,13 @@ ConfigDict, Field, JsonValue, + TypeAdapter, + computed_field, field_validator, model_validator, ) -from pyoaev.signatures.types import ExpectationType +from pyoaev.signatures.types import ExpectationType, InjectExecutionActions class SignatureValue(BaseModel): @@ -46,23 +51,34 @@ class ExtraSignatureData(BaseModel): prevention: dict[str, JsonValue] | None = Field(default_factory=dict) vulnerability: dict[str, JsonValue] | None = Field(default_factory=dict) - def get_extra(self, expectation_type: str): + def get_extra(self, expectation_type: str) -> dict[str, JsonValue]: if expectation_type.lower() == "detection": - return self.detection + return self.detection or {} if expectation_type.lower() == "prevention": - return self.prevention + return self.prevention or {} if expectation_type.lower() == "vulnerability": - return self.vulnerability + return self.vulnerability or {} raise ValueError( f"Expectation type should be one of the available parameters: {list(self.model_fields.keys())}" ) +class SignatureTarget(BaseModel): + """Target identity on the wire.""" + + model_config = ConfigDict(extra="forbid") + + agent: str | None = None + asset: str | None = None + asset_group: str | None = None + + class TargetSignatures(BaseModel): """A target plus everything observed about it, grouped by expectation.""" model_config = ConfigDict(extra="allow") + signature_target: SignatureTarget signature_values: list[ExpectationSignatureGroup] @@ -74,19 +90,104 @@ class SignaturePayload(BaseModel): targets: list[TargetSignatures] +class SignatureOutputStructure(BaseModel): + """Structured output to be serialized as a str in the callback payload yet data has to follow model.""" + + model_config = ConfigDict(populate_by_name=True, extra="forbid") + + signatures: SignaturePayload + + def normalize_signature_payload(self) -> None: + """ + Regroup signature_values by expectation_type within each target. + """ + normalized_targets: list[TargetSignatures] = [] + + for target in self.signatures.targets: + if not target.signature_values: + normalized_targets.append(target) + continue + + grouped: dict[str, list[dict[str, Any]]] = defaultdict(list) + order: list[str] = [] + + for entry in target.signature_values: + if entry.expectation_type not in order: + order.append(entry.expectation_type) + grouped[entry.expectation_type].extend(entry.values) + + normalized_target = TargetSignatures( + signature_target=target.signature_target, + signature_values=[ + ExpectationSignatureGroup( + expectation_type=expectation_type, + values=grouped[expectation_type], + ) + for expectation_type in order + ], + ) + + normalized_targets.append(normalized_target) + + self.signatures.targets = normalized_targets + + +class ExecutionDetails(BaseModel): + """Helper to wrap the execution-related details for the callback payload""" + + model_config = ConfigDict(extra="forbid") + + start_time: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + end_time: datetime | None = None + + execution_status: str + execution_message: str = "" + execution_action: InjectExecutionActions | None = None + + @computed_field + @property + def execution_duration(self) -> float: + try: + return (self.end_time - self.start_time).total_seconds() + except: + return 0.0 + + class SignatureCallbackPayload(BaseModel): - """Outer POST envelope. Pure ``{signatures}`` when unchunked, plus chunk fields when split.""" + """Outer POST envelope validated by ``SignatureApiManager`` before wire transmission.""" model_config = ConfigDict(populate_by_name=True, extra="forbid") - expectation_signature: SignaturePayload - phase: str | None = None - chunk_index: int | None = None - total_chunks: int | None = None + execution_message: str + execution_output_structured: str | None = None + execution_status: str + execution_duration: int | None = None + execution_action: InjectExecutionActions | None = None + + @field_validator("execution_output_structured", mode="after") + @classmethod + def is_proper_signature_output_structure(cls, value: str) -> str: + TypeAdapter(SignatureOutputStructure).validate_json(value) + return value + + @classmethod + def build_from_models( + cls, signatures: SignatureOutputStructure, execution_details: ExecutionDetails + ): + """Producing a SignatureCallbackPayload from the data of a SignatureOutputStructure and of a ExecutionDetails.""" + return cls( + execution_message=execution_details.execution_message, + execution_output_structured=signatures.model_dump_json(exclude_none=True), + execution_status=execution_details.execution_status, + execution_duration=math.ceil(execution_details.execution_duration) + if execution_details.execution_duration is not None + else None, + execution_action=execution_details.execution_action, + ) class PreExecutionSignature(BaseModel): - """Pre-execution data dump. Field set varies by category: network, cloud, external.""" + """Pre-execution data dump. Field set varies by category: network, cloud.""" model_config = ConfigDict(extra="allow") @@ -106,9 +207,6 @@ class PreExecutionSignature(BaseModel): cloud_region: str | None = None target_service: str | None = None - # External - query: str | None = None - class PostExecutionSignature(PreExecutionSignature): """Post-execution view: pre-execution fields plus outcome, end_time, and any partial results.""" @@ -159,7 +257,7 @@ class NetworkInjectorConfig(BaseModel): def check_one(cls, data): assert ( sum( - value != None + value is not None for key, value in data.items() if key in ["target_ipv4", "target_ipv6", "target_hostname"] ) @@ -179,17 +277,7 @@ class CloudInjectorConfig(BaseModel): target_service: str | None = None -class ExternalInjectorConfig(BaseModel): - """A single external scan target (e.g. Shodan): a query against an asset.""" - - model_config = ConfigDict(extra="forbid") - - query: str - target_ipv4: str | None = None - target_hostname: str | None = None - - -InjectorConfig = NetworkInjectorConfig | CloudInjectorConfig | ExternalInjectorConfig +InjectorConfig = NetworkInjectorConfig | CloudInjectorConfig # --------------------------------------------------------------------------- @@ -245,6 +333,7 @@ def build_network_configs( __all__ = [ "SignatureValue", "ExpectationSignatureGroup", + "SignatureTarget", "TargetSignatures", "SignaturePayload", "SignatureCallbackPayload", @@ -255,7 +344,6 @@ def build_network_configs( "ToolOutput", "NetworkInjectorConfig", "CloudInjectorConfig", - "ExternalInjectorConfig", "InjectorConfig", "build_network_configs", ] diff --git a/pyoaev/signatures/signature_manager.py b/pyoaev/signatures/signature_manager.py index aaf101d..f9d0c25 100644 --- a/pyoaev/signatures/signature_manager.py +++ b/pyoaev/signatures/signature_manager.py @@ -12,14 +12,16 @@ from pyoaev.exceptions import OpenAEVError from pyoaev.signatures.models import ( CloudInjectorConfig, + ExecutionDetails, ExpectationSignatureGroup, - ExternalInjectorConfig, ExtraSignatureData, InjectorConfig, NetworkInjectorConfig, PostExecutionSignature, PreExecutionSignature, + SignatureOutputStructure, SignaturePayload, + SignatureTarget, SignatureValue, TargetSignatures, ToolOutput, @@ -32,7 +34,7 @@ class SignatureManager: """End-to-end signature pipeline: compile, merge, transmit. One class, three jobs.""" - DEFAULT_MAX_PAYLOAD_SIZE = 1_048_576 # 1 MiB + DEFAULT_MAX_PAYLOAD_SIZE = 5_242_880 # 5 MiB def __init__( self, @@ -57,8 +59,8 @@ def compile_pre_execution_signatures( """Build pre-execution signature dicts from one or more typed injector configs. The category is carried by the config type itself - (:class:`NetworkInjectorConfig`, :class:`CloudInjectorConfig`, - :class:`ExternalInjectorConfig`), so no separate ``category`` flag is needed. + (:class:`NetworkInjectorConfig`, :class:`CloudInjectorConfig`), + so no separate ``category`` flag is needed. Args: config: A single injector config or a homogeneous list of them. @@ -95,7 +97,7 @@ def _compile_one(self, config: InjectorConfig, start_time: str) -> dict[str, Any Common pipeline for every category: 1. Seed the base dict with ``start_time`` and category-specific context - (network gets resolved source IPs; cloud/external add nothing). + (network gets resolved source IPs; cloud add nothing). 2. Layer the config's own fields on top. 3. Run it through :class:`PreExecutionSignature` for validation and emit JSON-ready output stripped of ``None``\\ s. @@ -109,14 +111,14 @@ def _source_context(self, config: InjectorConfig) -> dict[str, Any]: """Return the source identity bits injected for the config's category. Only network signatures need the running container's source IPs; - cloud and external rows have no source identity to carry. + cloud rows have no source identity to carry. """ if isinstance(config, NetworkInjectorConfig): return { "source_ipv4": self.resolve_container_ip(), "source_ipv6": self._cached_ipv6, } - if isinstance(config, (CloudInjectorConfig, ExternalInjectorConfig)): + if isinstance(config, CloudInjectorConfig): return {} raise TypeError(f"unsupported injector config type: {type(config).__name__}") @@ -173,6 +175,7 @@ def _merge_post( @staticmethod def build_payload( post_signatures: dict[str, Any] | list[dict[str, Any]], + targets_meta: dict[str, str] | list[dict[str, str]], expectation_types: list[str], extra_signatures: ExtraSignatureData | None = None, ) -> dict[str, Any]: @@ -183,6 +186,7 @@ def build_payload( Args: post_signatures: A single post-execution dict or a list (multi-targets). + targets_meta: Target metadata dict(s) with keys like agent, asset, asset_group. expectation_types: The 1+ expectation type labels (e.g. ['DETECTION', 'PREVENTION']). extra_signatures: Optional mapping of expectation types to additional signature fields that will be merged into the base post_signatures. @@ -192,23 +196,20 @@ def build_payload( """ if isinstance(post_signatures, dict): post_signatures = [post_signatures] + if isinstance(targets_meta, dict): + targets_meta = [targets_meta] * len(post_signatures) targets = [] - for signature in post_signatures: + for signature, target in zip(post_signatures, targets_meta): signature_values = [] - for expectation_type in expectation_types: signature_data = signature.copy() - signature_data.update(extra_signatures.get_extra(expectation_type)) - + if extra_signatures: + signature_data.update(extra_signatures.get_extra(expectation_type)) values = [ - SignatureValue( - signature_type=key, - signature_value=value, - ) + SignatureValue(signature_type=key, signature_value=value) for key, value in signature_data.items() ] - signature_values.append( ExpectationSignatureGroup( expectation_type=expectation_type, @@ -217,6 +218,7 @@ def build_payload( ) targets.append( TargetSignatures( + signature_target=SignatureTarget(**target), signature_values=signature_values, ) ) @@ -231,19 +233,28 @@ def send_signatures( ) -> None: """Ship signatures to the callback endpoint via the Signature API manager. - Delegates transport (retry, chunking, validation) to ``client.signature``. + Constructs typed ``SignatureOutputStructure`` and ``ExecutionDetails`` + models, then delegates transport (retry, envelope splitting, validation) + to ``client.signature``. Args: inject_id: Inject UUID. - phase: Execution phase. - signatures: Full signatures dict, canonical or flat, both grouped on the fly. + phase: Execution phase (mapped to ``execution_status``). + signatures: Full signatures dict with a ``targets`` list. Raises: SignatureTransmissionError: Validation failed, 4xx hit, or retries exhausted. """ - self.client.signature.max_payload_size = self.max_payload_size - self.client.signature.logger = self.logger - self.client.signature.send_signatures(inject_id, phase, signatures) + sig_output = SignatureOutputStructure(signatures=SignaturePayload(**signatures)) + exec_details = ExecutionDetails(execution_status=phase) + + self.client.signature.send_signatures( + inject_id, + sig_output, + exec_details, + max_payload_size=self.max_payload_size, + logger=self.logger, + ) def resolve_container_ip(self) -> str: """Sniff the container's primary IPv4. Env var, hostname, then ``hostname -i``. @@ -251,7 +262,7 @@ def resolve_container_ip(self) -> str: Returns: The IPv4 string, or ``'unknown'`` with a single warning when all strategies fail. """ - if self._cached_ipv4: + if self._cached_ipv4 and self._cached_ipv4 != "unknown": return self._cached_ipv4 env_ip = os.environ.get("CONTAINER_IP") diff --git a/pyoaev/signatures/signature_type.py b/pyoaev/signatures/signature_type.py index e0b0500..bab72db 100644 --- a/pyoaev/signatures/signature_type.py +++ b/pyoaev/signatures/signature_type.py @@ -19,7 +19,7 @@ def __init__( self, label: SignatureTypes, match_type: MatchTypes = MatchTypes.MATCH_TYPE_SIMPLE, - match_score: int = None, + match_score: int | None = None, ): self.label = label self.match_policy = SignatureMatch(match_type, match_score) diff --git a/pyoaev/signatures/types.py b/pyoaev/signatures/types.py index ca2737d..0c4f78b 100644 --- a/pyoaev/signatures/types.py +++ b/pyoaev/signatures/types.py @@ -7,6 +7,17 @@ class ExpectationType(str, Enum): VULNERABILITY = "VULNERABILITY" +class InjectExecutionActions(str, Enum): + PREREQUISITE_CHECK = "prerequisite_check" + PREREQUISITE_EXECUTION = "prerequisite_execution" + CLEANUP_EXECUTION = "cleanup_execution" + COMMAND_EXECUTION = "command_execution" + DNS_RESOLUTION = "dns_resolution" + FILE_EXECUTION = "file_execution" + FILE_DROP = "file_drop" + COMPLETE = "complete" + + class MatchTypes(str, Enum): MATCH_TYPE_FUZZY = "fuzzy" MATCH_TYPE_SIMPLE = "simple" diff --git a/test/signatures/constraints/signature_manager_transmission_constraints.feature b/test/signatures/constraints/signature_manager_transmission_constraints.feature index 0fcf4f1..fabdd5c 100644 --- a/test/signatures/constraints/signature_manager_transmission_constraints.feature +++ b/test/signatures/constraints/signature_manager_transmission_constraints.feature @@ -6,14 +6,13 @@ Feature: SignatureManager transmission constraints Background: Given a SignatureManager initialised with constructor SignatureManager(client, logger) - Scenario: Payload exceeding MAX_PAYLOAD_SIZE is auto-chunked with chunk metadata + Scenario: Payload exceeding MAX_PAYLOAD_SIZE is split into multiple sequential envelopes Given a compiled payload whose serialised size exceeds MAX_PAYLOAD_SIZE by at least a factor of 2 And the backend responds with HTTP 200 When I call send_signatures for inject_id "inject-abc-001" with phase "execution_complete" - Then the payload is sent as multiple sequential POST requests to /injects/inject-abc-001/callback - And each POST request body contains chunk_index as a 0-based integer - And each POST request body contains total_chunks as a positive integer matching the total number of chunks sent - And each POST request body contains only "signatures", "chunk_index" and "total_chunks" at the top level + Then the payload is sent as multiple sequential POST requests to /injects/execution/callback/inject-abc-001 + And each POST request body is a valid self-contained envelope with the same structure as a single-send payload + And no POST request body contains chunk_index or total_chunks keys And the union of targets across all POST requests equals the original target set And no individual POST request body exceeds MAX_PAYLOAD_SIZE bytes @@ -21,7 +20,7 @@ Feature: SignatureManager transmission constraints Given a compiled post-execution payload for inject_id "inject-abc-001" And the backend responds with HTTP 503 on every attempt When I call send_signatures for inject_id "inject-abc-001" with phase "execution_complete" - Then send_signatures sends a total of 4 POST requests to /injects/inject-abc-001/callback + Then send_signatures sends a total of 4 POST requests to /injects/execution/callback/inject-abc-001 And a WARNING log message containing the retry attempt number is emitted before each of the 3 retry attempts And the wait before attempt 2 is 1 second And the wait before attempt 3 is 2 seconds @@ -32,7 +31,7 @@ Feature: SignatureManager transmission constraints Given a compiled post-execution payload for inject_id "inject-abc-001" And the backend responds with HTTP 400 and body '{"error": "bad request"}' When I call send_signatures for inject_id "inject-abc-001" with phase "execution_complete" - Then only 1 POST request is sent to /injects/inject-abc-001/callback + Then only 1 POST request is sent to /injects/execution/callback/inject-abc-001 And an ERROR log message containing status code 400 and the response body is emitted And an exception is raised immediately And no sleep or wait occurs before the exception is raised diff --git a/test/signatures/features/signature_manager_pre_execution.feature b/test/signatures/features/signature_manager_pre_execution.feature index 1ba3ed5..1ae80da 100644 --- a/test/signatures/features/signature_manager_pre_execution.feature +++ b/test/signatures/features/signature_manager_pre_execution.feature @@ -40,14 +40,6 @@ Feature: SignatureManager pre-execution signature compilation And the returned dict does not contain target_ipv4 And the returned dict does not contain target_ipv6 - Scenario: External category returns scan target fields and no source IP - Given an ExternalInjectorConfig with target_ipv4="203.0.113.5" and query="port:22 os:linux" - When I call compile_pre_execution_signatures with the config - Then the returned dict contains target_ipv4 equal to "203.0.113.5" - And the returned dict contains query equal to "port:22 os:linux" - And the returned dict contains start_time as a UTC ISO 8601 string - But the returned dict does not contain source_ipv4 - Scenario Outline: Network multi-target returns one dict per target with a shared source IP Given a list of 3 NetworkInjectorConfig with target_ipv4 "10.0.0.1", "10.0.0.2", "10.0.0.3" And the running container has a resolvable IPv4 address "172.17.0.2" diff --git a/test/signatures/features/signature_manager_transmission.feature b/test/signatures/features/signature_manager_transmission.feature index 94e34bd..2d228cb 100644 --- a/test/signatures/features/signature_manager_transmission.feature +++ b/test/signatures/features/signature_manager_transmission.feature @@ -21,7 +21,7 @@ Feature: SignatureManager signature transmission and container IP resolution Given a compiled payload with 1 target, expectation_type "DETECTION", signature_type "public_ip", signature_value "203.0.113.5" And the backend responds with HTTP 200 When I call send_signatures for inject_id "inject-abc-001" with phase "execution_complete" - Then a POST request is sent to /injects/inject-abc-001/callback + Then a POST request is sent to /injects/execution/callback/inject-abc-001 And the POST request body contains signatures.targets as a list And signatures.targets[0].signature_values[0].expectation_type equals "DETECTION" And signatures.targets[0].signature_values[0].values[0].signature_type equals "public_ip" diff --git a/test/signatures/test_signature_manager_pre_execution.py b/test/signatures/test_signature_manager_pre_execution.py index f7581dc..93db6cc 100644 --- a/test/signatures/test_signature_manager_pre_execution.py +++ b/test/signatures/test_signature_manager_pre_execution.py @@ -7,7 +7,6 @@ from pyoaev.signatures.models import ( CloudInjectorConfig, - ExternalInjectorConfig, NetworkInjectorConfig, build_network_configs, ) @@ -42,14 +41,6 @@ def test_cloud_category_required_fields(): pass -@scenario( - "features/signature_manager_pre_execution.feature", - "External category returns scan target fields and no source IP", -) -def test_external_category_fields(): - pass - - @scenario( "features/signature_manager_pre_execution.feature", "Network multi-target returns one dict per target with a shared source IP", @@ -161,16 +152,6 @@ def cloud_config_single( ) -@given( - parsers.parse( - 'an ExternalInjectorConfig with target_ipv4="{target_ipv4}" and query="{query}"' - ), - target_fixture="config", -) -def external_config_single(target_ipv4, query): - return ExternalInjectorConfig(target_ipv4=target_ipv4, query=query) - - @given( parsers.parse( "a list of 3 NetworkInjectorConfig with target_ipv4 " diff --git a/test/signatures/test_signature_manager_transmission.py b/test/signatures/test_signature_manager_transmission.py index 6519bd6..edee730 100644 --- a/test/signatures/test_signature_manager_transmission.py +++ b/test/signatures/test_signature_manager_transmission.py @@ -29,9 +29,9 @@ def test_send_signatures_posts_with_agreed_nested_schema(): @scenario( "constraints/signature_manager_transmission_constraints.feature", - "Payload exceeding MAX_PAYLOAD_SIZE is auto-chunked with chunk metadata", + "Payload exceeding MAX_PAYLOAD_SIZE is split into multiple sequential envelopes", ) -def test_payload_exceeding_max_payload_size_is_split_into_sequential_chunks(): +def test_payload_exceeding_max_payload_size_is_split_into_sequential_envelopes(): pass @@ -87,6 +87,12 @@ def context(): } +def _extract_targets(body: dict) -> list[dict]: + """Parse targets from the SignatureCallbackPayload wire format.""" + sig_data = json.loads(body["execution_output_structured"]) + return sig_data["signatures"]["targets"] + + def _build_signature_payload( signature_value="203.0.113.5", expectation_types=None, @@ -325,18 +331,25 @@ def compiled_payload_grouped_by_expectation( "signature_values": [ { "expectation_type": expectation_a, - "signature_type": "public_ip", - "signature_value": "203.0.113.5", + "values": [ + { + "signature_type": "public_ip", + "signature_value": "203.0.113.5", + }, + { + "signature_type": "hostname", + "signature_value": "host-a.internal", + }, + ], }, { "expectation_type": expectation_b, - "signature_type": "public_ip", - "signature_value": "198.51.100.10", - }, - { - "expectation_type": expectation_a, - "signature_type": "hostname", - "signature_value": "host-a.internal", + "values": [ + { + "signature_type": "public_ip", + "signature_value": "198.51.100.10", + }, + ], }, ], } @@ -379,18 +392,22 @@ def send_signatures_completes_without_exception(context): @then( parsers.parse( - "a POST request is sent to /injects/{inject_id}/callback", + "a POST request is sent to /injects/execution/callback/{inject_id}", ) ) def assert_post_request_sent_to_callback(context, inject_id): assert context["captured_calls"] - assert context["captured_calls"][-1]["path"] == f"/injects/{inject_id}/callback" + assert ( + context["captured_calls"][-1]["path"] + == f"/injects/execution/callback/{inject_id}" + ) @then("the POST request body contains signatures.targets as a list") def assert_targets_is_list(context): body = context["captured_calls"][-1]["post_data"] - assert isinstance(body["expectation_signature"]["targets"], list) + targets = _extract_targets(body) + assert isinstance(targets, list) @then( @@ -400,9 +417,8 @@ def assert_targets_is_list(context): ) def assert_expectation_type(context, expected_value): body = context["captured_calls"][-1]["post_data"] - assert body["expectation_signature"]["targets"][0]["signature_values"][0][ - "expectation_type" - ] == (expected_value) + targets = _extract_targets(body) + assert targets[0]["signature_values"][0]["expectation_type"] == expected_value @then( @@ -412,10 +428,9 @@ def assert_expectation_type(context, expected_value): ) def assert_signature_type(context, expected_value): body = context["captured_calls"][-1]["post_data"] + targets = _extract_targets(body) assert ( - body["expectation_signature"]["targets"][0]["signature_values"][0]["values"][0][ - "signature_type" - ] + targets[0]["signature_values"][0]["values"][0]["signature_type"] == expected_value ) @@ -427,10 +442,9 @@ def assert_signature_type(context, expected_value): ) def assert_signature_value(context, expected_value): body = context["captured_calls"][-1]["post_data"] + targets = _extract_targets(body) assert ( - body["expectation_signature"]["targets"][0]["signature_values"][0]["values"][0][ - "signature_value" - ] + targets[0]["signature_values"][0]["values"][0]["signature_value"] == expected_value ) @@ -438,54 +452,42 @@ def assert_signature_value(context, expected_value): @then("signatures.targets[0] contains a signature_target key") def assert_signature_target_key(context): body = context["captured_calls"][-1]["post_data"] - assert "signature_target" in body["expectation_signature"]["targets"][0] + targets = _extract_targets(body) + assert "signature_target" in targets[0] @then( parsers.parse( - "the payload is sent as multiple sequential POST requests to /injects/{inject_id}/callback", + "the payload is sent as multiple sequential POST requests to /injects/execution/callback/{inject_id}", ) ) def assert_payload_sent_as_multiple_chunks(context, inject_id): assert context["send_exception"] is None assert len(context["captured_calls"]) > 1 assert all( - call_item["path"] == f"/injects/{inject_id}/callback" + call_item["path"] == f"/injects/execution/callback/{inject_id}" for call_item in context["captured_calls"] ) -@then("each POST request body contains chunk_index as a 0-based integer") -def assert_chunk_index_present(context): - for index, call_item in enumerate(context["captured_calls"]): - post_data = call_item["post_data"] - assert isinstance(post_data["chunk_index"], int) - assert post_data["chunk_index"] == index - - @then( - "each POST request body contains total_chunks as a positive integer matching the total number of chunks sent" + "each POST request body is a valid self-contained envelope with the same structure as a single-send payload" ) -def assert_total_chunks_present(context): - total_chunks = len(context["captured_calls"]) +def assert_each_envelope_is_self_contained(context): for call_item in context["captured_calls"]: post_data = call_item["post_data"] - assert isinstance(post_data["total_chunks"], int) - assert post_data["total_chunks"] > 0 - assert post_data["total_chunks"] == total_chunks + assert "execution_output_structured" in post_data + targets = _extract_targets(post_data) + assert isinstance(targets, list) + assert len(targets) > 0 -@then( - 'each POST request body contains only "signatures", "chunk_index" and "total_chunks" at the top level' -) -def assert_chunked_envelope_is_strict(context): - expected_keys = {"expectation_signature", "chunk_index", "total_chunks", "phase"} +@then("no POST request body contains chunk_index or total_chunks keys") +def assert_no_chunk_metadata(context): for call_item in context["captured_calls"]: post_data = call_item["post_data"] - assert set(post_data.keys()) == expected_keys, ( - f"Chunked envelope must contain exactly {expected_keys}, " - f"got {set(post_data.keys())}" - ) + assert "chunk_index" not in post_data + assert "total_chunks" not in post_data @then("the union of targets across all POST requests equals the original target set") @@ -494,10 +496,10 @@ def assert_targets_union_matches_original(context): sent_targets = [ target for call_item in context["captured_calls"] - for target in call_item["post_data"]["expectation_signature"]["targets"] + for target in _extract_targets(call_item["post_data"]) ] assert len(sent_targets) == len(original_targets), ( - f"Expected {len(original_targets)} targets across all chunks, " + f"Expected {len(original_targets)} targets across all envelopes, " f"got {len(sent_targets)}" ) for original, sent in zip(original_targets, sent_targets): @@ -515,13 +517,13 @@ def assert_payload_size_per_chunk(context): @then( parsers.parse( - "send_signatures sends a total of {total_requests:d} POST requests to /injects/{inject_id}/callback" + "send_signatures sends a total of {total_requests:d} POST requests to /injects/execution/callback/{inject_id}" ) ) def assert_total_post_requests(context, total_requests, inject_id): assert len(context["captured_calls"]) == total_requests assert all( - call_item["path"] == f"/injects/{inject_id}/callback" + call_item["path"] == f"/injects/execution/callback/{inject_id}" for call_item in context["captured_calls"] ) @@ -553,12 +555,15 @@ def assert_signature_transmission_error_after_retries(context): @then( parsers.parse( - "only {request_count:d} POST request is sent to /injects/{inject_id}/callback" + "only {request_count:d} POST request is sent to /injects/execution/callback/{inject_id}" ) ) def assert_single_post_request(context, request_count, inject_id): assert len(context["captured_calls"]) == request_count - assert context["captured_calls"][0]["path"] == f"/injects/{inject_id}/callback" + assert ( + context["captured_calls"][0]["path"] + == f"/injects/execution/callback/{inject_id}" + ) @then( @@ -616,7 +621,8 @@ def assert_no_exception_from_resolve_container_ip(context): ) def assert_signature_values_nested_by_expectation_type(context): body = context["captured_calls"][-1]["post_data"] - entries = body["expectation_signature"]["targets"][0]["signature_values"] + targets = _extract_targets(body) + entries = targets[0]["signature_values"] expectation_types = {entry["expectation_type"] for entry in entries} assert expectation_types == {"DETECTION", "PREVENTION"} @@ -626,7 +632,8 @@ def assert_signature_values_nested_by_expectation_type(context): ) def assert_detection_values_grouped_correctly(context): body = context["captured_calls"][-1]["post_data"] - entries = body["expectation_signature"]["targets"][0]["signature_values"] + targets = _extract_targets(body) + entries = targets[0]["signature_values"] detection_entry = next( entry for entry in entries if entry["expectation_type"] == "DETECTION" ) @@ -640,7 +647,8 @@ def assert_detection_values_grouped_correctly(context): ) def assert_prevention_values_grouped_correctly(context): body = context["captured_calls"][-1]["post_data"] - entries = body["expectation_signature"]["targets"][0]["signature_values"] + targets = _extract_targets(body) + entries = targets[0]["signature_values"] prevention_entry = next( entry for entry in entries if entry["expectation_type"] == "PREVENTION" )