Skip to content

Commit 4739403

Browse files
committed
feat: optional idempotency (ADR-0022) and per-URN schema validation (ADR-0024)
Opt-in, dependency-free helpers; wire envelope stays frozen. Includes the vendored payload_schema cross-SDK conformance cases.
1 parent e3d4428 commit 4739403

8 files changed

Lines changed: 665 additions & 1 deletion

File tree

src/babelqueue/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@
1111

1212
from __future__ import annotations
1313

14-
from . import dead_letter
14+
from . import dead_letter, idempotency
1515
from .app import BabelQueue
1616
from .codec import SCHEMA_VERSION, SOURCE_LANG, EnvelopeCodec
1717
from .contracts import HasTraceId, PolyglotMessage
18+
from .idempotency import IdempotencyStore, InMemoryStore
1819
from .exceptions import BabelQueueError, UnknownUrnError
1920
from .routing import UnknownUrnStrategy
2021
from .transport import InMemoryTransport, ReceivedMessage, Transport
@@ -35,5 +36,8 @@
3536
"BabelQueueError",
3637
"UnknownUrnError",
3738
"dead_letter",
39+
"idempotency",
40+
"IdempotencyStore",
41+
"InMemoryStore",
3842
"__version__",
3943
]

src/babelqueue/exceptions.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,17 @@ class BabelQueueError(Exception):
99

1010
class UnknownUrnError(BabelQueueError):
1111
"""A consumed message carries a URN with no mapped handler (strategy "fail")."""
12+
13+
14+
class InvalidPayloadError(BabelQueueError):
    """A message's ``data`` does not match the JSON Schema registered for its URN (ADR-0024).

    Raised by the producer-side :func:`babelqueue.schema.validate` and the consumer-side
    :func:`babelqueue.schema.wrap`; the latter lets the runtime redeliver (and eventually
    dead-letter) a poison message.

    Attributes set on the instance:
        urn: the URN whose schema was violated.
        violation: the violation text that was passed in.
    """

    def __init__(self, urn: str, violation: str) -> None:
        super().__init__(f"data for {urn!r} does not match its URN schema: {violation}")
        self.urn = urn
        self.violation = violation

    def __reduce__(self):
        # The default exception reduction rebuilds the instance from ``self.args``, which
        # holds only the single formatted message, so re-creating it would call
        # ``__init__`` with one argument and fail. Rebuild from the two original
        # constructor arguments so pickle / copy round-trips work.
        return (type(self), (self.urn, self.violation))

src/babelqueue/idempotency.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
"""Optional idempotency helper (ADR-0022): dedupe a consume handler on ``meta.id``.
2+
3+
The Python mirror of the PHP ``BabelQueue\\Idempotency`` and Go ``idempotency``
4+
helpers. It wraps a handler so a message whose ``meta.id`` was already processed
5+
successfully is skipped instead of run again, composing with the runtime's
6+
ack-on-return / redeliver-on-raise contract::
7+
8+
from babelqueue import BabelQueue
9+
from babelqueue.idempotency import InMemoryStore, wrap
10+
11+
app = BabelQueue("redis://localhost:6379/0", queue="orders")
12+
store = InMemoryStore()
13+
app.register("urn:babel:orders:created", wrap(store, on_order_created))
14+
15+
A previously-seen id returns early (the runtime acks it, so the broker stops
16+
redelivering); a raising handler leaves the id unmarked so a redelivery runs it again
17+
(retry / dead-letter still apply); a message with no usable ``meta.id`` runs unchanged.
18+
This is "seen-set" post-success dedupe — not exactly-once and not in-flight concurrency
19+
locking; a transactional / outbox mode is a documented future direction.
20+
"""
21+
22+
from __future__ import annotations
23+
24+
import functools
25+
import threading
26+
from typing import Any, Callable, Mapping, Protocol, runtime_checkable
27+
28+
Handler = Callable[..., None]
29+
30+
31+
@runtime_checkable
class IdempotencyStore(Protocol):
    """Structural interface for a record of processed message ids (keyed on ``meta.id``).

    Any object exposing ``seen``, ``remember`` and ``forget`` satisfies the protocol; it is
    runtime-checkable, so ``isinstance`` can be used to test for those methods.
    """

    def seen(self, message_id: str) -> bool:
        """Return whether ``message_id`` has already been remembered as processed."""
        ...

    def remember(self, message_id: str) -> None:
        """Mark ``message_id`` as processed."""
        ...

    def forget(self, message_id: str) -> None:
        """Remove ``message_id`` from the store (manual eviction; a backend may also expire ids)."""
        ...
43+
44+
45+
class InMemoryStore:
    """An :class:`IdempotencyStore` backed by a plain ``set`` guarded by a lock.

    The ids live only in this process's memory: nothing is shared with other workers and
    nothing survives a restart. Intended for tests and single-process consumers — use a
    Redis- or database-backed store for production fleets.
    """

    def __init__(self) -> None:
        # Every access to ``_seen`` below happens while holding ``_lock``.
        self._lock = threading.Lock()
        self._seen: set[str] = set()

    def seen(self, message_id: str) -> bool:
        """Return True when ``message_id`` is currently remembered."""
        with self._lock:
            present = message_id in self._seen
        return present

    def remember(self, message_id: str) -> None:
        """Record ``message_id``; remembering an id twice is harmless."""
        with self._lock:
            self._seen.add(message_id)

    def forget(self, message_id: str) -> None:
        """Drop ``message_id`` if present; unknown ids are ignored."""
        with self._lock:
            self._seen.discard(message_id)
67+
68+
69+
def wrap(store: IdempotencyStore, handler: Handler) -> Handler:
    """Return ``handler`` decorated with seen-set dedupe on the message's ``meta.id``.

    The wrapper is built with :func:`functools.wraps`, so it keeps ``handler``'s signature
    for the runtime's introspection and is still passed the right number of positional
    args (``data, meta`` or ``data, meta, envelope``).

    Behaviour per call:

    * ``meta`` is the second positional argument when it is a mapping; otherwise the
      message is treated as having no id.
    * A missing, non-string or empty ``id`` cannot be deduped: the handler simply runs.
    * An id the store has already seen returns immediately without calling the handler.
    * Otherwise the handler runs and the id is remembered only after it returns; if the
      handler raises, the id stays unmarked.
    """

    @functools.wraps(handler)
    def wrapped(*args: Any) -> None:
        candidate = args[1] if len(args) > 1 else None
        message_id = candidate.get("id") if isinstance(candidate, Mapping) else None
        dedupable = isinstance(message_id, str) and message_id != ""

        # Already processed on an earlier delivery: return so the runtime acks it.
        if dedupable and store.seen(message_id):
            return

        # A raise here propagates before anything is remembered → retry/DLQ apply.
        handler(*args)

        if dedupable:
            store.remember(message_id)

    return wrapped

src/babelqueue/schema.py

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
"""Optional per-URN payload schema validation (ADR-0024).
2+
3+
The Python mirror of the Go ``schema`` package and PHP ``BabelQueue\\Schema``. A
4+
:class:`SchemaProvider` supplies a JSON Schema for a message URN — typically read from a
5+
babelqueue-registry ``registry.json`` — and the message's ``data`` is validated against it.
6+
It is opt-in: a URN with no registered schema is never validated.
7+
8+
- **Producer-side (recommended):** call :func:`validate` before publishing so invalid data
9+
never enters the queue, or :func:`check` to branch without raising::
10+
11+
from babelqueue.schema import MapProvider, validate
12+
13+
provider = MapProvider.from_json({"urn:babel:orders:created": ORDERS_SCHEMA_JSON})
14+
validate(provider, "urn:babel:orders:created", {"order_id": 7}) # raises on mismatch
15+
16+
- **Consumer-side (safety net):** wrap a handler with :func:`wrap`. Because a Python handler
17+
receives ``data`` (and ``meta``, ``envelope``) positionally rather than a message object,
18+
the URN is passed explicitly — usually the same URN you register under::
19+
20+
app.register(URN, wrap(provider, URN, on_order_created))
21+
22+
Invalid data raises :class:`~babelqueue.exceptions.InvalidPayloadError`, so the runtime
23+
redelivers (and eventually dead-letters) the poison message; a URN with no schema runs the
24+
handler unchanged.
25+
26+
The validator is a small subset of JSON Schema (draft-07) whose verdicts match the Go and
27+
PHP validators and babelqueue-registry's ``compat`` linter: ``type``, ``required``,
28+
``properties``, ``additionalProperties``, ``items``, ``enum``, ``const``, ``minLength``,
29+
``minimum``. Unknown keywords are ignored. Zero dependencies (stdlib only).
30+
"""
31+
32+
from __future__ import annotations
33+
34+
import functools
35+
import json
36+
import os
37+
import threading
38+
from typing import Any, Callable, Mapping, Optional, Protocol, runtime_checkable
39+
40+
from .exceptions import InvalidPayloadError
41+
42+
Handler = Callable[..., None]
43+
44+
45+
@runtime_checkable
class SchemaProvider(Protocol):
    """Structural interface for anything that can look up a ``data`` schema by message URN."""

    def schema_for(self, urn: str) -> Optional[Mapping[str, Any]]:
        """Return the JSON Schema registered for ``urn``, or None if there is none."""
        ...
51+
52+
53+
class MapProvider:
    """A :class:`SchemaProvider` backed by an in-memory dict.

    Useful for tests and for embedding schemas directly in code.
    """

    def __init__(self, schemas: Mapping[str, Mapping[str, Any]]) -> None:
        # Shallow-copy so later changes to the caller's top-level mapping do not leak in.
        self._schemas: dict[str, Mapping[str, Any]] = dict(schemas)

    @classmethod
    def from_json(cls, raw: Mapping[str, str]) -> "MapProvider":
        """Build a provider from a mapping of URN -> raw JSON Schema text.

        Each body is decoded with :func:`json.loads`, in iteration order.

        :raises ValueError: when a body does not decode to a JSON object (malformed JSON
            surfaces as ``json.JSONDecodeError``, itself a ``ValueError``).
        """

        def decode(urn: str, body: str) -> Mapping[str, Any]:
            parsed = json.loads(body)
            if isinstance(parsed, dict):
                return parsed
            raise ValueError(f"schema: invalid JSON schema for {urn!r}")

        return cls({urn: decode(urn, body) for urn, body in raw.items()})

    def schema_for(self, urn: str) -> Optional[Mapping[str, Any]]:
        """Return the schema stored for ``urn``, or None when the URN is unknown."""
        return self._schemas.get(urn)
72+
73+
74+
class DirProvider:
    """A :class:`SchemaProvider` driven by a babelqueue-registry manifest (``registry.json``).

    The manifest must be a JSON object; its ``"schemas"`` key holds a list of
    ``{"urn": ..., "schema": ...}`` entries mapping each URN to the schema file for its
    ``data`` block. Relative schema paths are resolved against the manifest's directory.
    Schema files are opened and decoded on first request and then served from a cache,
    under a lock. A URN that is not in the manifest yields None (skip); a URN whose schema
    file cannot be opened or does not hold a JSON object raises.
    """

    def __init__(self, manifest_path: str) -> None:
        with open(manifest_path, "r", encoding="utf-8") as fh:
            manifest = json.load(fh)
        if not isinstance(manifest, dict):
            raise ValueError(f"schema: invalid registry manifest {manifest_path!r}")

        self._lock = threading.Lock()
        self._cache: dict[str, Mapping[str, Any]] = {}
        self._files: dict[str, str] = {}
        self._dir = os.path.dirname(manifest_path)

        listed = manifest.get("schemas")
        if not isinstance(listed, list):
            # Missing or non-list "schemas": nothing is registered.
            listed = []

        for entry in listed:
            if not isinstance(entry, Mapping):
                continue
            urn, file = entry.get("urn"), entry.get("schema")
            # Entries lacking a non-empty string urn or schema path are skipped silently.
            if not (isinstance(urn, str) and urn):
                continue
            if not (isinstance(file, str) and file):
                continue
            self._files[urn] = file

    def schema_for(self, urn: str) -> Optional[Mapping[str, Any]]:
        """Return the decoded schema for ``urn``, loading and caching it on first use.

        :raises ValueError: when the schema file does not contain a JSON object.
        :raises OSError: when the schema file cannot be opened.
        """
        with self._lock:
            try:
                return self._cache[urn]
            except KeyError:
                pass

            file = self._files.get(urn)
            if file is None:
                return None

            if os.path.isabs(file):
                path = file
            else:
                path = os.path.join(self._dir, file)

            with open(path, "r", encoding="utf-8") as fh:
                decoded = json.load(fh)
            if not isinstance(decoded, dict):
                raise ValueError(f"schema: invalid schema for {urn!r} ({file})")

            self._cache[urn] = decoded
            return decoded
117+
118+
119+
def check(provider: SchemaProvider, urn: str, data: Mapping[str, Any]) -> Optional[str]:
    """Return the first violation of ``data`` against ``urn``'s schema, without raising.

    The result is None both when ``data`` is valid and when the provider has no schema for
    the URN (validation is opt-in). Meant for producer-side branching.
    """
    schema = provider.schema_for(urn)
    # ``data`` is copied into a plain dict before it is handed to the validator.
    return None if schema is None else validate_schema(schema, dict(data))
126+
127+
128+
def validate(provider: SchemaProvider, urn: str, data: Mapping[str, Any]) -> None:
    """Raise unless ``data`` satisfies the schema registered for ``urn``.

    This is the producer-side guard: call it before publishing. It returns silently when
    :func:`check` reports no violation, which includes the case of a URN with no schema.

    :raises InvalidPayloadError: when the data does not match the URN's schema.
    """
    problem = check(provider, urn, data)
    if problem is None:
        return
    raise InvalidPayloadError(urn, problem)
137+
138+
139+
def wrap(provider: SchemaProvider, urn: str, handler: Handler) -> Handler:
    """Return ``handler`` guarded by schema validation of each message's ``data``.

    On every call the first positional argument is validated against ``urn``'s schema via
    :func:`validate` before ``handler`` runs; a first argument that is absent or not a
    mapping is validated as an empty object. The wrapper is built with
    :func:`functools.wraps`, so it keeps ``handler``'s signature and the runtime still
    passes it the right number of positional args (``data, meta`` or
    ``data, meta, envelope``).
    """

    @functools.wraps(handler)
    def wrapped(*args: Any) -> None:
        payload: Mapping[str, Any] = {}
        if args and isinstance(args[0], Mapping):
            payload = args[0]
        # validate() raises on a mismatch, so the handler is never reached for bad data.
        validate(provider, urn, payload)
        handler(*args)

    return wrapped
152+
153+
154+
def validate_schema(schema: Mapping[str, Any], value: Any, path: str = "") -> Optional[str]:
    """Return the first violation of ``value`` against one (subset) JSON Schema node, or None.

    Checks run in a fixed order — ``const``, then ``enum``, then ``type`` — and the first
    failure wins. ``path`` locates the node inside the payload and prefixes the violation
    text. A ``type`` that is absent or not one of the handled names imposes no constraint.
    """
    if "const" in schema:
        if not _equal(value, schema["const"]):
            return _violation(path, "wrong_const")

    allowed = schema.get("enum")
    if isinstance(allowed, list):
        for candidate in allowed:
            if _equal(value, candidate):
                break
        else:
            return _violation(path, "not_in_enum")

    kind = schema.get("type")

    # Containers delegate to their own checkers.
    if kind == "object":
        return _check_object(schema, value, path)
    if kind == "array":
        return _check_array(schema, value, path)

    # Numeric kinds share the ``minimum`` check once the value's type is accepted.
    if kind == "integer" or kind == "number":
        if kind == "integer":
            accepted, reason = _is_integer(value), "not_an_integer"
        else:
            accepted, reason = _is_number(value), "not_a_number"
        if not accepted:
            return _violation(path, reason)
        return _check_minimum(schema, value, path)

    if kind == "string":
        if not isinstance(value, str):
            return _violation(path, "not_a_string")
        floor = schema.get("minLength")
        too_short = isinstance(floor, (int, float)) and len(value) < int(floor)
        return _violation(path, "below_min_length") if too_short else None

    if kind == "boolean":
        if isinstance(value, bool):
            return None
        return _violation(path, "not_a_boolean")

    if kind == "null":
        if value is None:
            return None
        return _violation(path, "not_null")

    return None
188+
189+
190+
def _check_object(schema: Mapping[str, Any], value: Any, path: str) -> Optional[str]:
191+
if not isinstance(value, Mapping):
192+
return _violation(path, "not_an_object")
193+
194+
required = schema.get("required")
195+
if isinstance(required, list):
196+
for key in required:
197+
if isinstance(key, str) and key not in value:
198+
return _violation(_join(path, key), "missing_required")
199+
200+
properties = schema.get("properties")
201+
properties = properties if isinstance(properties, Mapping) else {}
202+
additional_allowed = schema.get("additionalProperties") is not False
203+
204+
for key, item in value.items():
205+
name = str(key)
206+
prop = properties.get(name)
207+
if isinstance(prop, Mapping):
208+
violation = validate_schema(prop, item, _join(path, name))
209+
if violation is not None:
210+
return violation
211+
continue
212+
if not additional_allowed:
213+
return _violation(_join(path, name), "additional_not_allowed")
214+
215+
return None
216+
217+
218+
def _check_array(schema: Mapping[str, Any], value: Any, path: str) -> Optional[str]:
219+
if not isinstance(value, list):
220+
return _violation(path, "not_an_array")
221+
items = schema.get("items")
222+
if not isinstance(items, Mapping):
223+
return None
224+
for i, item in enumerate(value):
225+
violation = validate_schema(items, item, f"{path}[{i}]")
226+
if violation is not None:
227+
return violation
228+
return None
229+
230+
231+
def _check_minimum(schema: Mapping[str, Any], value: Any, path: str) -> Optional[str]:
232+
minimum = schema.get("minimum")
233+
if isinstance(minimum, (int, float)) and not isinstance(minimum, bool) and float(value) < float(minimum):
234+
return _violation(path, "below_minimum")
235+
return None
236+
237+
238+
def _is_integer(value: Any) -> bool:
239+
if isinstance(value, bool):
240+
return False
241+
if isinstance(value, int):
242+
return True
243+
return isinstance(value, float) and value.is_integer()
244+
245+
246+
def _is_number(value: Any) -> bool:
247+
return isinstance(value, (int, float)) and not isinstance(value, bool)
248+
249+
250+
def _equal(a: Any, b: Any) -> bool:
251+
# Type-aware equality so True != 1 and an integer const never matches a float value,
252+
# matching the strict comparisons in the Go and PHP validators.
253+
return type(a) is type(b) and a == b
254+
255+
256+
def _violation(path: str, reason: str) -> str:
257+
return f"{path or '<root>'}: {reason}"
258+
259+
260+
def _join(path: str, key: str) -> str:
261+
return key if path == "" else f"{path}.{key}"

0 commit comments

Comments
 (0)