Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
b1fac81
fix(api): realigning the signature API path (#284)
guzmud Jun 10, 2026
23ec1fa
fix(signature): SignatureTarget comeback (#284)
guzmud Jun 10, 2026
86570b6
fix(signature): withdrawing external injector type and related elemen…
guzmud Jun 17, 2026
71fb0ba
feat(signature): adding InjectionExecutionActions type mirroring OAEV…
guzmud Jun 17, 2026
61d916a
fix(signature): updating SignatureCallbackPayload and creating Signat…
guzmud Jun 17, 2026
e73edd4
feat(signature): adding a ExecutionDetails model for non-signature el…
guzmud Jun 18, 2026
247bf69
fix(signature): use the new models to build the payload and withdraw …
guzmud Jun 18, 2026
b3643af
✅ test(signature): update transmission constraints for envelope-split…
Kakudou Jun 19, 2026
e6efd4e
✅ test(signature): update transmission step implementations for envel…
Kakudou Jun 19, 2026
70de05c
✨ feat(signature): implement envelope-split chunking in SignatureApiM…
Kakudou Jun 19, 2026
50405ed
♻️ refactor(signature_manager): wire send_signatures to typed models …
Kakudou Jun 19, 2026
270c137
🐛 fix(signature_type): correct match_score type annotation to int | None
Kakudou Jun 19, 2026
427f3b6
🐛 fix(models): use Field default_factory for UTC-aware start_time
Kakudou Jun 19, 2026
f9b5643
🐛 fix(models): replace set() with ordered dedup list in normalize_sig…
Kakudou Jun 19, 2026
277da77
🐛 fix(models): apply math.ceil with None guard to execution_duration
Kakudou Jun 19, 2026
f1d2753
🐛 fix(models): return empty dict instead of None from get_extra
Kakudou Jun 19, 2026
40f558e
🐛 fix(signature_manager): guard extra_signatures None in build_payload
Kakudou Jun 19, 2026
cde6120
🐛 fix(signature_manager): exclude unknown sentinel from IPv4 cache hit
Kakudou Jun 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 108 additions & 159 deletions pyoaev/apis/signature.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
from pyoaev import exceptions as exc
from pyoaev.base import RESTManager, RESTObject
from pyoaev.exceptions import SignatureTransmissionError
from pyoaev.signatures.models import SignatureCallbackPayload
from pyoaev.signatures.models import (
ExecutionDetails,
SignatureCallbackPayload,
SignatureOutputStructure,
)


class Signature(RESTObject):
Expand All @@ -22,7 +26,7 @@ class Signature(RESTObject):
class SignatureApiManager(RESTManager):
"""Manage signature callback transport to the OpenAEV backend.

Handles payload validation, auto-chunking, and retry with exponential backoff.
Handles payload validation and retry with exponential backoff.
"""

_path = "/injects"
Expand All @@ -32,10 +36,6 @@ class SignatureApiManager(RESTManager):
MAX_RETRIES = 3
RETRY_DELAYS = (1, 2, 4)

_CHUNK_METADATA_RESERVE = len(
',"chunk_index":99999,"total_chunks":99999,"phase":"execution_complete_extended"'
)

def __init__(self, openaev: "Any", parent: "Any" = None) -> None:
"""Initialize the signature API manager.

Expand Down Expand Up @@ -68,47 +68,66 @@ def logger(self, value: logging.Logger) -> None:
def send_signatures(
self,
inject_id: str,
phase: str,
signatures: dict[str, Any],
signatures: SignatureOutputStructure,
execution_details: ExecutionDetails,
max_payload_size: int | None = None,
logger: logging.Logger | None = None,
) -> None:
"""Send compiled signatures to the inject callback endpoint.

Auto-chunks payloads exceeding max_payload_size and retries on 5xx errors.

Args:
inject_id: Inject UUID.
phase: Execution phase (e.g. 'execution_complete').
signatures: Full signatures dict (canonical or flat, grouped on the fly).
execution_details:

Raises:
SignatureTransmissionError: Validation failed, 4xx hit, or retries exhausted.
"""
self._logger.debug("send_signatures inject_id=%s phase=%s", inject_id, phase)
signatures = self._normalize_signature_payload(signatures)
payload = self._build_callback_payload(signatures, phase=phase)
effective_max_size = (
max_payload_size if max_payload_size is not None else self._max_payload_size
)
effective_logger = logger if logger is not None else self._logger

serialized = json.dumps(payload, separators=(",", ":")).encode()
effective_logger.debug(
"send_signatures inject_id=%s, execution_status=%s, execution_action=%s",
inject_id,
execution_details.execution_status,
execution_details.execution_action,
)
signatures.normalize_signature_payload()
payload = self._build_callback_payload(
signatures=signatures, execution_details=execution_details
)
payload_size = len(json.dumps(payload).encode("utf-8"))

if len(serialized) <= self._max_payload_size:
self._send_with_retry(inject_id, payload)
else:
self._send_chunked(inject_id, payload["expectation_signature"], phase=phase)
if payload_size <= effective_max_size:
self._send_with_retry(inject_id, payload, logger=effective_logger)
return

sig_data = json.loads(payload["execution_output_structured"])
targets = sig_data["signatures"]["targets"]
envelopes = self._split_into_envelopes(
payload,
sig_data,
targets,
max_payload_size=effective_max_size,
logger=effective_logger,
)
for envelope in envelopes:
self._send_with_retry(inject_id, envelope, logger=effective_logger)

def _build_callback_payload(
self,
signatures: dict[str, Any],
*,
phase: str | None = None,
chunk_index: int | None = None,
total_chunks: int | None = None,
signatures: SignatureOutputStructure,
execution_details: ExecutionDetails,
) -> dict[str, Any]:
"""Validate and wrap signatures in the strict callback envelope.

Args:
signatures: The inner signatures body, already normalised.
phase: Execution phase string (e.g. 'execution_complete').
chunk_index: 0-based index when chunking, None for single POSTs.
total_chunks: Chunk count when chunking, None for single POSTs.
execution_details: The execution metadata to be stored next to the signatures in the payload.

Returns:
The validated dict ready for wire transmission.
Expand All @@ -117,150 +136,76 @@ def _build_callback_payload(
SignatureTransmissionError: Envelope failed Pydantic validation.
"""
try:
envelope = SignatureCallbackPayload.model_validate(
{
"expectation_signature": signatures,
"phase": phase,
"chunk_index": chunk_index,
"total_chunks": total_chunks,
}
envelope = SignatureCallbackPayload.build_from_models(
signatures, execution_details
)
except ValidationError as ve:
raise SignatureTransmissionError(
error_message=f"Invalid signatures payload: {ve}",
) from ve
return envelope.model_dump(mode="json", exclude_none=True)

def _normalize_signature_payload(
self, signatures: dict[str, Any]
) -> dict[str, Any]:
"""Regroup signature_values by expectation_type within each target.
envelope_dict = envelope.model_dump(mode="json", exclude_none=True)
SignatureCallbackPayload.model_validate(envelope_dict)
return envelope_dict

Accepts flat or pre-grouped input and returns canonical grouped form.
def _split_into_envelopes(
self,
base_payload: dict[str, Any],
sig_data: dict[str, Any],
targets: list[dict[str, Any]],
max_payload_size: int | None = None,
logger: logging.Logger | None = None,
) -> list[dict[str, Any]]:
effective_max = (
max_payload_size if max_payload_size is not None else self._max_payload_size
)
effective_logger = logger if logger is not None else self._logger

Args:
signatures: Raw signatures dict with any mix of flat and grouped entries.
envelopes: list[dict[str, Any]] = []
current_targets: list[dict[str, Any]] = []

Returns:
New dict where every signature_values list is in canonical grouped form.
"""
targets = signatures.get("targets")
if not targets:
return signatures

normalized_targets: list[dict[str, Any]] = []
for target in targets:
sig_values = target.get("signature_values")
if not sig_values:
normalized_targets.append(target)
continue

grouped: dict[str, list[dict[str, Any]]] = {}
order: list[str] = []

for entry in sig_values:
etype = entry.get("expectation_type")
if etype not in grouped:
grouped[etype] = []
order.append(etype)

if "values" in entry and isinstance(entry["values"], list):
grouped[etype].extend(entry["values"])
trial_targets = current_targets + [target]
trial_envelope = self._build_envelope(base_payload, sig_data, trial_targets)
trial_size = len(json.dumps(trial_envelope).encode("utf-8"))

if trial_size > effective_max:
if current_targets:
envelopes.append(
self._build_envelope(base_payload, sig_data, current_targets)
)
current_targets = [target]
else:
grouped[etype].append(
{k: v for k, v in entry.items() if k != "expectation_type"}
effective_logger.warning(
"Single target exceeds max_payload_size (%d bytes > %d limit). Sending oversized envelope.",
trial_size,
effective_max,
)
envelopes.append(trial_envelope)
current_targets = []
else:
current_targets = trial_targets

if current_targets:
envelopes.append(
self._build_envelope(base_payload, sig_data, current_targets)
)

normalized_target = dict(target)
normalized_target["signature_values"] = [
{"expectation_type": etype, "values": grouped[etype]} for etype in order
]
normalized_targets.append(normalized_target)

normalized = dict(signatures)
normalized["targets"] = normalized_targets
return normalized

def _send_chunked(
self, inject_id: str, signatures: dict[str, Any], phase: str | None = None
) -> None:
"""Split targets across sequential POSTs, each tagged with chunk metadata.

Args:
inject_id: Inject UUID for the callback path.
signatures: Normalised inner signatures body to partition.
phase: Execution phase forwarded to each chunk envelope.

Raises:
SignatureTransmissionError: A single target alone exceeds max_payload_size.
"""
targets = signatures.get("targets", [])
if not targets:
payload = self._build_callback_payload(signatures, phase=phase)
size = len(json.dumps(payload, separators=(",", ":")).encode())
if size > self._max_payload_size:
self._logger.warning(
"Payload of %d bytes exceeds max_payload_size %d but has no "
"'targets' key to chunk on; sending unchunked",
size,
self._max_payload_size,
)
self._send_with_retry(inject_id, payload)
return

budget = max(self._max_payload_size - self._CHUNK_METADATA_RESERVE, 0)
chunks: list[list[Any]] = []
current_chunk: list[Any] = []
return envelopes

for target in targets:
candidate = current_chunk + [target]
size = len(
json.dumps(
{"expectation_signature": {"targets": candidate}},
separators=(",", ":"),
).encode()
)
def _build_envelope(
self,
base_payload: dict[str, Any],
sig_data: dict[str, Any],
targets_subset: list[dict[str, Any]],
) -> dict[str, Any]:
subset_sig = dict(sig_data)
subset_sig["signatures"] = dict(sig_data["signatures"])
subset_sig["signatures"]["targets"] = targets_subset

if size <= budget:
current_chunk.append(target)
continue

if not current_chunk:
raise SignatureTransmissionError(
error_message=(
f"Single target payload of {size} bytes exceeds "
f"max_payload_size {self._max_payload_size}; cannot chunk further"
),
)

chunks.append(current_chunk)
current_chunk = [target]
solo_size = len(
json.dumps(
{"expectation_signature": {"targets": [target]}},
separators=(",", ":"),
).encode()
)
if solo_size > budget:
raise SignatureTransmissionError(
error_message=(
f"Single target payload of {solo_size} bytes exceeds "
f"max_payload_size {self._max_payload_size}; cannot chunk further"
),
)

if current_chunk:
chunks.append(current_chunk)

total_chunks = len(chunks)
for idx, chunk_targets in enumerate(chunks):
chunk_payload = self._build_callback_payload(
{"targets": chunk_targets},
phase=phase,
chunk_index=idx,
total_chunks=total_chunks,
)
self._send_with_retry(inject_id, chunk_payload)
envelope = dict(base_payload)
envelope["execution_output_structured"] = json.dumps(subset_sig)
SignatureCallbackPayload.model_validate(envelope)
return envelope

@exc.on_http_error(exc.OpenAEVUpdateError)
def callback(
Expand All @@ -276,12 +221,15 @@ def callback(
Returns:
The parsed response from the backend.
"""
path = f"{self.path}/{inject_id}/callback"
path = f"{self.path}/execution/callback/{inject_id}"
result = self.openaev.http_post(path, post_data=data, **kwargs)
return result

def _send_with_retry(
self, inject_id: str, payload: dict[str, Any]
self,
inject_id: str,
payload: dict[str, Any],
logger: logging.Logger | None = None,
) -> dict[str, Any]:
"""Retry callback() with exponential backoff on 5xx, immediate raise on 4xx.

Expand All @@ -297,6 +245,7 @@ def _send_with_retry(
"""
from pyoaev.exceptions import OpenAEVError

effective_logger = logger if logger is not None else self._logger
last_error: Exception | None = None

for attempt in range(self.MAX_RETRIES + 1):
Expand All @@ -308,7 +257,7 @@ def _send_with_retry(
body_str = ""
if ex.response_body:
body_str = ex.response_body.decode(errors="replace")
self._logger.error(
effective_logger.error(
"Client error %d sending signatures: %s",
status,
body_str or ex.error_message,
Expand All @@ -322,7 +271,7 @@ def _send_with_retry(
last_error = ex
if attempt < self.MAX_RETRIES:
delay = self.RETRY_DELAYS[attempt]
self._logger.warning(
effective_logger.warning(
"Retry %d/%d after %ds (HTTP %s): %s",
attempt + 1,
self.MAX_RETRIES,
Expand Down
Loading
Loading