Skip to content

Commit 474303e

Browse files
feat(replay): Replay-Bypass guard — skip external side-effects on a deliberate replay (ADR-0027) (#4)
* feat(replay): Replay-Bypass guard — skip external side-effects on a deliberate replay (ADR-0027) Parity with the Go reference: ReceivedMessage headers + HeaderPublisher, Redrive Bypass option stamping bq-replay-bypass, and is_replay / bypass_external_effects helpers. 160 tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * chore(release): v1.10.0 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent e16002a commit 474303e

8 files changed

Lines changed: 275 additions & 26 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "babelqueue"
7-
version = "1.9.0"
7+
version = "1.10.0"
88
description = "Polyglot Queues, Simplified — the Python core: the canonical BabelQueue wire-envelope codec, contracts and dead-letter helpers."
99
readme = "README.md"
1010
requires-python = ">=3.9"

src/babelqueue/__init__.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,15 @@
1111

1212
from __future__ import annotations
1313

14-
from . import dead_letter, idempotency, redrive
14+
from . import dead_letter, idempotency, redrive, replay
1515
from .app import BabelQueue
1616
from .codec import SCHEMA_VERSION, SOURCE_LANG, EnvelopeCodec
1717
from .contracts import HasTraceId, PolyglotMessage
1818
from .idempotency import IdempotencyStore, InMemoryStore
1919
from .exceptions import BabelQueueError, UnknownUrnError
20+
from .replay import HEADER_REPLAY_BYPASS, bypass_external_effects, is_replay
2021
from .routing import UnknownUrnStrategy
21-
from .transport import InMemoryTransport, ReceivedMessage, Transport
22+
from .transport import HeaderPublisher, InMemoryTransport, ReceivedMessage, Transport
2223

2324
__version__ = "1.6.0"
2425

@@ -33,11 +34,16 @@
3334
"Transport",
3435
"InMemoryTransport",
3536
"ReceivedMessage",
37+
"HeaderPublisher",
3638
"BabelQueueError",
3739
"UnknownUrnError",
3840
"dead_letter",
3941
"idempotency",
4042
"redrive",
43+
"replay",
44+
"is_replay",
45+
"bypass_external_effects",
46+
"HEADER_REPLAY_BYPASS",
4147
"IdempotencyStore",
4248
"InMemoryStore",
4349
"__version__",

src/babelqueue/app.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ def on_order_created(data, meta):
2525
from . import dead_letter
2626
from .codec import EnvelopeCodec
2727
from .exceptions import UnknownUrnError
28+
from .replay import HEADER_REPLAY_BYPASS, _replay_scope
2829
from .routing import UnknownUrnStrategy
2930
from .transport import ReceivedMessage, Transport, make_transport
3031

@@ -116,18 +117,19 @@ def consume(
116117

117118
def dispatch(self, received: ReceivedMessage) -> None:
118119
"""Route one reserved message to its handler and acknowledge it."""
119-
envelope = EnvelopeCodec.decode(received.body)
120-
urn = str(envelope.get("job") or envelope.get("urn") or "")
121-
handler = self._handlers.get(urn) if urn else None
122-
123-
try:
124-
if handler is None:
125-
self._route_unknown(urn, received, envelope)
126-
return
127-
self._invoke(handler, envelope)
128-
self.transport.ack(received)
129-
except Exception as exc: # noqa: BLE001 - one bad message must not kill the loop
130-
self._retry_or_dead_letter(received, envelope, exc)
120+
with _replay_scope(bool(received.headers.get(HEADER_REPLAY_BYPASS))):
121+
envelope = EnvelopeCodec.decode(received.body)
122+
urn = str(envelope.get("job") or envelope.get("urn") or "")
123+
handler = self._handlers.get(urn) if urn else None
124+
125+
try:
126+
if handler is None:
127+
self._route_unknown(urn, received, envelope)
128+
return
129+
self._invoke(handler, envelope)
130+
self.transport.ack(received)
131+
except Exception as exc: # noqa: BLE001 - one bad message must not kill the loop
132+
self._retry_or_dead_letter(received, envelope, exc)
131133

132134
# -- Internals ----------------------------------------------------------
133135

src/babelqueue/redrive.py

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@
2020
acknowledged only after a successful re-publish, and an undecodable body is restored, not
2121
dropped.
2222
23-
Replay safety today is sandbox routing (``to_queue``) + ``dry_run``. The **Replay-Bypass**
24-
guard — a ``bq-replay-bypass`` transport header surfaced to handlers so a replay can skip
25-
external side-effects (don't re-charge, don't re-email) — is a documented phase two: like the
26-
OpenTelemetry ``traceparent`` follow-up, it carries out-of-band metadata as a transport header
27-
and so touches the runtime + every transport binding. Until then, sandbox routing is the
28-
safe-replay answer.
23+
Replay safety is sandbox routing (``to_queue``) + ``dry_run``, plus the **Replay-Bypass** guard
24+
(``bypass=True``): it stamps a ``bq-replay-bypass`` transport header that the runtime surfaces to
25+
handlers via :func:`babelqueue.is_replay` / :func:`babelqueue.bypass_external_effects`, so a
26+
replay can skip external side-effects that already fired (don't re-charge, don't re-email) — see
27+
:mod:`babelqueue.replay` (ADR-0027). The header rides out of band, so the envelope stays frozen;
28+
it propagates over a real broker only once that broker's transport implements the optional
29+
:class:`~babelqueue.transport.HeaderPublisher` capability (the in-memory transport does today).
2930
"""
3031

3132
from __future__ import annotations
@@ -34,7 +35,8 @@
3435
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple
3536

3637
from .codec import EnvelopeCodec
37-
from .transport import ReceivedMessage, Transport
38+
from .replay import HEADER_REPLAY_BYPASS
39+
from .transport import HeaderPublisher, ReceivedMessage, Transport
3840

3941
Envelope = Mapping[str, Any]
4042
Select = Callable[[Envelope], bool]
@@ -51,6 +53,7 @@ class RedriveItem:
5153
from_queue: str
5254
to: str # target queue (the plan, even on a dry run; "" when skipped/undecodable)
5355
redriven: bool # True only when actually re-published to ``to``
56+
bypassed: bool = False # True when the bq-replay-bypass header was stamped on the message
5457

5558

5659
@dataclass
@@ -70,6 +73,7 @@ def redrive(
7073
max: int = 0,
7174
dry_run: bool = False,
7275
select: Optional[Select] = None,
76+
bypass: bool = False,
7377
timeout: float = 1.0,
7478
) -> RedriveResult:
7579
"""Move dead-lettered messages off ``dlq`` and replay them; see the module docstring."""
@@ -125,7 +129,9 @@ def redrive(
125129
reset.pop("dead_letter", None)
126130
reset["attempts"] = 0
127131
try:
128-
transport.publish(target, EnvelopeCodec.encode(reset))
132+
item.bypassed = _publish_redriven(
133+
transport, target, EnvelopeCodec.encode(reset), bypass
134+
)
129135
except Exception:
130136
transport.publish(dlq, message.body) # restore on a publish failure, then surface
131137
transport.ack(message)
@@ -150,6 +156,17 @@ def _decoded(body: str) -> Optional[Dict[str, Any]]:
150156
return envelope
151157

152158

159+
def _publish_redriven(transport: Transport, queue: str, body: str, bypass: bool) -> bool:
160+
"""Re-publish a reset message to ``queue``. When ``bypass`` is set and the transport is a
161+
:class:`~babelqueue.transport.HeaderPublisher`, stamp the ``bq-replay-bypass`` header and
162+
return True; otherwise publish plainly and return False."""
163+
if bypass and isinstance(transport, HeaderPublisher):
164+
transport.publish_with_headers(queue, body, {HEADER_REPLAY_BYPASS: "1"})
165+
return True
166+
transport.publish(queue, body)
167+
return False
168+
169+
153170
def _source_queue_of(envelope: Envelope) -> str:
154171
"""Default redrive target: ``dead_letter.original_queue``, falling back to ``meta.queue``."""
155172
dead_letter = envelope.get("dead_letter")

src/babelqueue/replay.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""Optional Replay-Bypass guard (ADR-0027): skip external side-effects on a deliberate replay.
2+
3+
The Python mirror of the Go ``replay.go``. It closes the loop left open by
4+
:mod:`babelqueue.redrive`: a deliberate replay re-runs the handler, and its external
5+
side-effects re-fire (a second charge, a duplicate email). :mod:`babelqueue.idempotency` stops
6+
an *accidental* duplicate; this stops the *intended* reprocess from re-firing effects that
7+
already happened.
8+
9+
``Redrive`` (with ``bypass=True``) stamps a ``bq-replay-bypass`` transport header on a redriven
10+
message; the runtime surfaces it to the handler via a :class:`contextvars.ContextVar`. A handler
11+
wraps its external, non-idempotent side in :func:`bypass_external_effects` so a replay re-runs the
12+
idempotent core but skips effects that already fired::
13+
14+
from babelqueue import is_replay, bypass_external_effects
15+
16+
@app.handler("urn:babel:orders:created")
17+
def on_order_created(data, meta):
18+
save_order(data) # idempotent core — always runs
19+
bypass_external_effects(lambda: send_email(data)) # external effect — skipped on replay
20+
21+
The marker rides **out of band** as a transport header, so the frozen envelope is untouched
22+
(GR-1). It propagates over a real broker only once that broker's concrete transport implements
23+
the optional :class:`~babelqueue.transport.HeaderPublisher` capability — a follow-up, like the
24+
broker bindings; the in-memory transport supports it today, so the path is end-to-end testable.
25+
"""
26+
27+
from __future__ import annotations
28+
29+
import contextlib
30+
import contextvars
31+
from typing import Callable, Iterator, Optional, TypeVar
32+
33+
#: The out-of-band transport header :func:`~babelqueue.redrive.redrive` stamps on a replayed
34+
#: message (with ``bypass=True``) and that the runtime surfaces as :func:`is_replay`.
35+
HEADER_REPLAY_BYPASS = "bq-replay-bypass"
36+
37+
_replay_var: contextvars.ContextVar[bool] = contextvars.ContextVar(
38+
"babelqueue_replay", default=False
39+
)
40+
41+
T = TypeVar("T")
42+
43+
44+
def is_replay() -> bool:
45+
"""Whether the message currently being handled was redriven with the replay-bypass marker.
46+
47+
True means this is a deliberate replay, so external side-effects that already happened should
48+
be skipped. Reads the flag the runtime set on the context from the
49+
:data:`HEADER_REPLAY_BYPASS` transport header.
50+
"""
51+
return _replay_var.get()
52+
53+
54+
def bypass_external_effects(fn: Callable[[], T]) -> Optional[T]:
55+
"""Run ``fn`` unless the current message is a replay (see :func:`is_replay`), in which case
56+
skip it and return ``None``.
57+
58+
Wrap the external, non-idempotent side of a handler — sending an email, charging a card,
59+
calling a third party — so a replay re-runs the idempotent core but does not re-fire effects
60+
that already happened.
61+
"""
62+
if is_replay():
63+
return None
64+
return fn()
65+
66+
67+
@contextlib.contextmanager
68+
def _replay_scope(active: bool) -> Iterator[None]:
69+
"""Internal: mark the current context as a replay (or not) for the span of one dispatch."""
70+
token = _replay_var.set(active)
71+
try:
72+
yield
73+
finally:
74+
_replay_var.reset(token)

src/babelqueue/transport.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88

99
from abc import ABC, abstractmethod
1010
from collections import defaultdict, deque
11-
from dataclasses import dataclass
12-
from typing import Any, Deque, Dict, Optional
11+
from dataclasses import dataclass, field
12+
from typing import Any, Deque, Dict, Optional, Protocol, runtime_checkable
1313

1414
from .exceptions import BabelQueueError
1515

@@ -21,6 +21,9 @@ class ReceivedMessage:
2121
body: str
2222
queue: str
2323
handle: Any = None
24+
#: Out-of-band transport headers a :class:`HeaderPublisher` carried with the message
25+
#: (e.g. the ``bq-replay-bypass`` marker). Empty for transports that don't surface them.
26+
headers: Dict[str, str] = field(default_factory=dict)
2427

2528

2629
class Transport(ABC):
@@ -42,20 +45,42 @@ def close(self) -> None: # pragma: no cover - optional
4245
"""Release any resources (override if needed)."""
4346

4447

48+
@runtime_checkable
49+
class HeaderPublisher(Protocol):
50+
"""Optional :class:`Transport` capability: publish a body together with out-of-band
51+
transport headers (e.g. the replay-bypass marker), for brokers that carry per-message
52+
metadata. A transport that does not implement it simply does not propagate headers —
53+
callers fall back to plain :meth:`Transport.publish` (ADR-0027)."""
54+
55+
def publish_with_headers(self, queue: str, body: str, headers: Dict[str, str]) -> None:
56+
"""Append ``body`` to ``queue`` along with out-of-band ``headers``."""
57+
...
58+
59+
4560
class InMemoryTransport(Transport):
4661
"""In-process transport for tests and broker-free local runs (``memory://``)."""
4762

4863
def __init__(self) -> None:
64+
# Bodies and their out-of-band headers are kept in lockstep parallel deques, so the
65+
# body storage layout stays a plain Deque[str].
4966
self._queues: Dict[str, Deque[str]] = defaultdict(deque)
67+
self._headers: Dict[str, Deque[Dict[str, str]]] = defaultdict(deque)
5068

5169
def publish(self, queue: str, body: str) -> None:
5270
self._queues[queue].append(body)
71+
self._headers[queue].append({})
72+
73+
def publish_with_headers(self, queue: str, body: str, headers: Dict[str, str]) -> None:
74+
self._queues[queue].append(body)
75+
self._headers[queue].append(dict(headers))
5376

5477
def pop(self, queue: str, timeout: float = 1.0) -> Optional[ReceivedMessage]:
5578
dq = self._queues.get(queue)
5679
if not dq:
5780
return None
58-
return ReceivedMessage(body=dq.popleft(), queue=queue)
81+
return ReceivedMessage(
82+
body=dq.popleft(), queue=queue, headers=self._headers[queue].popleft()
83+
)
5984

6085
def ack(self, message: ReceivedMessage) -> None:
6186
# Already removed on pop; nothing to do.

tests/test_redrive.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ def test_redrive_to_source():
3333
result = redrive(t, "orders.dlq")
3434

3535
assert result.redriven == 1 and result.skipped == 0
36+
assert result.items[0].bypassed is False # bypass is off by default
3637
got = _drain(t, "orders")
3738
assert len(got) == 1
3839
assert "dead_letter" not in got[0]

0 commit comments

Comments
 (0)