Skip to content

Commit aad1f0f

Browse files
committed
feat(redrive): DLQ redrive tooling — safe replay (ADR-0026)
The Python mirror of babelqueue-go redrive: redrive(transport, dlq, *, to_queue, max, dry_run, select) reads dead-lettered messages off a DLQ and re-publishes each to its dead_letter.original_queue or to_queue, reset (strip dead_letter, attempts->0, preserve job/trace_id/data/meta). Drains-then-processes; acks only after a successful re-publish; restores undecodable bodies to the DLQ, and restores the message to the DLQ on a publish failure. Pure codec+transport, no new dependency; envelope frozen. Replay-Bypass header is a documented phase two.
1 parent 6499fc0 commit aad1f0f

3 files changed

Lines changed: 306 additions & 1 deletion

File tree

src/babelqueue/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
from __future__ import annotations
1313

14-
from . import dead_letter, idempotency
14+
from . import dead_letter, idempotency, redrive
1515
from .app import BabelQueue
1616
from .codec import SCHEMA_VERSION, SOURCE_LANG, EnvelopeCodec
1717
from .contracts import HasTraceId, PolyglotMessage
@@ -37,6 +37,7 @@
3737
"UnknownUrnError",
3838
"dead_letter",
3939
"idempotency",
40+
"redrive",
4041
"IdempotencyStore",
4142
"InMemoryStore",
4243
"__version__",

src/babelqueue/redrive.py

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
"""Optional DLQ redrive tooling (ADR-0026): safe replay off the dead-letter queue.
2+
3+
The Python mirror of the Go ``Redrive``. It reads dead-lettered messages off a DLQ and
4+
re-publishes each to its source queue (its ``dead_letter.original_queue``) or a chosen
5+
``to_queue``, **reset for reprocessing**: the ``dead_letter`` block is removed and ``attempts``
6+
reset to 0, while ``job``, ``trace_id``, ``data`` and ``meta`` are preserved verbatim. It is
7+
the operator-side counterpart to the runtime's dead-letter routing — the contract leaves
8+
redrive to tooling, and this is that tool.
9+
10+
from babelqueue import BabelQueue
11+
from babelqueue.redrive import redrive
12+
13+
app = BabelQueue("redis://localhost:6379/0")
14+
result = redrive(app.transport, "orders.dlq") # back to each source
15+
result = redrive(app.transport, "orders.dlq", to_queue="sandbox") # safe sandbox replay
16+
plan = redrive(app.transport, "orders.dlq", dry_run=True) # inspect, change nothing
17+
18+
Messages are drained from the DLQ first and then processed, so restored messages (skipped,
19+
dry-run, or undecodable) are never re-encountered in the same run; a DLQ message is
20+
acknowledged only after a successful re-publish, and an undecodable body is restored, not
21+
dropped.
22+
23+
Replay safety today is sandbox routing (``to_queue``) + ``dry_run``. The **Replay-Bypass**
24+
guard — a ``bq-replay-bypass`` transport header surfaced to handlers so a replay can skip
25+
external side-effects (don't re-charge, don't re-email) — is a documented phase two: like the
26+
OpenTelemetry ``traceparent`` follow-up, it carries out-of-band metadata as a transport header
27+
and so touches the runtime + every transport binding. Until then, sandbox routing is the
28+
safe-replay answer.
29+
"""
30+
31+
from __future__ import annotations
32+
33+
from dataclasses import dataclass, field
34+
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple
35+
36+
from .codec import EnvelopeCodec
37+
from .transport import ReceivedMessage, Transport
38+
39+
Envelope = Mapping[str, Any]
40+
Select = Callable[[Envelope], bool]
41+
42+
43+
@dataclass
class RedriveItem:
    """Per-message record of a redrive run: which message it was and what was done with it."""

    # ``meta.id`` of the envelope ("" when absent or when the body was undecodable).
    message_id: str
    # ``trace_id`` of the envelope ("" when absent or undecodable).
    trace_id: str
    # Message URN as reported by the codec ("" when undecodable).
    urn: str
    # ``dead_letter.reason`` of the envelope ("" when absent or undecodable).
    reason: str
    # The DLQ the message was read from.
    from_queue: str
    # Target queue: the plan, even on a dry run; "" when skipped/undecodable.
    to: str
    # True only when the message was actually re-published to ``to``.
    redriven: bool
54+
55+
56+
@dataclass
class RedriveResult:
    """Outcome of one redrive run: aggregate counters plus a record per message."""

    # Number of messages actually re-published to their target queue.
    redriven: int = 0
    # Number of messages put back on the DLQ instead (undecodable, unselected, dry run).
    skipped: int = 0
    # One entry per drained message; a fresh list per instance.
    items: List[RedriveItem] = field(default_factory=list)
63+
64+
65+
def redrive(
66+
transport: Transport,
67+
dlq: str,
68+
*,
69+
to_queue: Optional[str] = None,
70+
max: int = 0,
71+
dry_run: bool = False,
72+
select: Optional[Select] = None,
73+
timeout: float = 1.0,
74+
) -> RedriveResult:
75+
"""Move dead-lettered messages off ``dlq`` and replay them; see the module docstring."""
76+
# Drain up to ``max`` messages (or all available) before processing any of them.
77+
batch: List[Tuple[ReceivedMessage, Optional[Dict[str, Any]]]] = []
78+
while max == 0 or len(batch) < max:
79+
message = transport.pop(dlq, timeout)
80+
if message is None:
81+
break
82+
batch.append((message, _decoded(message.body)))
83+
84+
result = RedriveResult()
85+
for message, envelope in batch:
86+
if envelope is None:
87+
transport.publish(dlq, message.body) # restore the poison body; never drop it
88+
transport.ack(message)
89+
result.skipped += 1
90+
result.items.append(RedriveItem("", "", "", "", dlq, "", False))
91+
continue
92+
93+
meta_raw = envelope.get("meta")
94+
meta: Mapping[str, Any] = meta_raw if isinstance(meta_raw, Mapping) else {}
95+
dl_raw = envelope.get("dead_letter")
96+
dead_letter: Mapping[str, Any] = dl_raw if isinstance(dl_raw, Mapping) else {}
97+
item = RedriveItem(
98+
message_id=str(meta.get("id", "")),
99+
trace_id=str(envelope.get("trace_id", "")),
100+
urn=EnvelopeCodec.urn(envelope),
101+
reason=str(dead_letter.get("reason", "")),
102+
from_queue=dlq,
103+
to="",
104+
redriven=False,
105+
)
106+
107+
if select is not None and not select(envelope):
108+
transport.publish(dlq, message.body) # not selected: restore unchanged
109+
transport.ack(message)
110+
result.skipped += 1
111+
result.items.append(item)
112+
continue
113+
114+
target = to_queue or _source_queue_of(envelope)
115+
item.to = target
116+
117+
if dry_run:
118+
transport.publish(dlq, message.body) # report the plan; restore unchanged
119+
transport.ack(message)
120+
result.skipped += 1
121+
result.items.append(item)
122+
continue
123+
124+
reset = dict(envelope)
125+
reset.pop("dead_letter", None)
126+
reset["attempts"] = 0
127+
try:
128+
transport.publish(target, EnvelopeCodec.encode(reset))
129+
except Exception:
130+
transport.publish(dlq, message.body) # restore on a publish failure, then surface
131+
transport.ack(message)
132+
raise
133+
transport.ack(message)
134+
item.redriven = True
135+
result.redriven += 1
136+
result.items.append(item)
137+
138+
return result
139+
140+
141+
def _decoded(body: str) -> Optional[Dict[str, Any]]:
    """Return the decoded envelope for a DLQ body, or None if it cannot be redriven.

    A body is redrivable only when ``EnvelopeCodec.decode`` yields a non-empty envelope
    (it returns ``{}`` for malformed/non-object input) whose ``job`` is a string.
    """
    decoded = EnvelopeCodec.decode(body)
    if decoded and isinstance(decoded.get("job"), str):
        return decoded
    return None
151+
152+
153+
def _source_queue_of(envelope: Envelope) -> str:
154+
"""Default redrive target: ``dead_letter.original_queue``, falling back to ``meta.queue``."""
155+
dead_letter = envelope.get("dead_letter")
156+
if isinstance(dead_letter, Mapping):
157+
original = dead_letter.get("original_queue")
158+
if isinstance(original, str) and original:
159+
return original
160+
meta = envelope.get("meta")
161+
if isinstance(meta, Mapping):
162+
queue = meta.get("queue")
163+
if isinstance(queue, str):
164+
return queue
165+
return ""

tests/test_redrive.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
"""Tests for the optional DLQ redrive tooling (ADR-0026)."""
2+
3+
import pytest
4+
5+
from babelqueue import dead_letter
6+
from babelqueue.codec import EnvelopeCodec
7+
from babelqueue.redrive import redrive
8+
from babelqueue.transport import InMemoryTransport
9+
10+
11+
def _dead_letter(transport, dlq, urn, original_queue, data=None):
    """Publish a dead-lettered envelope for ``urn`` onto ``dlq`` and return that envelope."""
    payload = data or {}
    envelope = EnvelopeCodec.make(urn, payload, queue=original_queue)
    dead = dead_letter.annotate(envelope, "failed", original_queue, 3, error="boom")
    body = EnvelopeCodec.encode(dead)
    transport.publish(dlq, body)
    return dead
16+
17+
18+
def _drain(transport, queue):
    """Pop, decode and acknowledge every message on ``queue``; return the decoded list."""
    envelopes = []
    message = transport.pop(queue, 0)
    while message is not None:
        envelopes.append(EnvelopeCodec.decode(message.body))
        transport.ack(message)
        message = transport.pop(queue, 0)
    return envelopes
27+
28+
29+
def test_redrive_to_source():
    """A dead-lettered message is replayed to its original queue, reset for reprocessing."""
    transport = InMemoryTransport()
    dead = _dead_letter(
        transport, "orders.dlq", "urn:babel:orders:created", "orders", {"order_id": 1}
    )

    outcome = redrive(transport, "orders.dlq")

    assert outcome.redriven == 1
    assert outcome.skipped == 0
    replayed = _drain(transport, "orders")
    assert len(replayed) == 1
    envelope = replayed[0]
    # The dead-letter annotation is stripped and the attempt counter starts over...
    assert "dead_letter" not in envelope
    assert envelope["attempts"] == 0
    # ...while identity and payload survive untouched.
    assert envelope["trace_id"] == dead["trace_id"]
    assert envelope["data"] == {"order_id": 1}
    assert EnvelopeCodec.urn(envelope) == "urn:babel:orders:created"
    assert _drain(transport, "orders.dlq") == []
44+
45+
46+
def test_redrive_to_sandbox():
    """``to_queue`` overrides the source queue: the replay lands only in the sandbox."""
    transport = InMemoryTransport()
    _dead_letter(transport, "orders.dlq", "urn:babel:orders:created", "orders")

    outcome = redrive(transport, "orders.dlq", to_queue="sandbox")

    assert outcome.redriven == 1
    assert _drain(transport, "orders") == []
    sandboxed = _drain(transport, "sandbox")
    assert len(sandboxed) == 1
55+
56+
57+
def test_redrive_dry_run():
    """A dry run reports the planned target but leaves the message on the DLQ."""
    transport = InMemoryTransport()
    _dead_letter(transport, "orders.dlq", "urn:babel:orders:created", "orders")

    outcome = redrive(transport, "orders.dlq", dry_run=True)

    assert outcome.redriven == 0
    assert outcome.skipped == 1
    planned = outcome.items[0]
    assert planned.to == "orders"
    assert planned.redriven is False
    assert _drain(transport, "orders") == []
    remaining = _drain(transport, "orders.dlq")
    assert len(remaining) == 1
    assert "dead_letter" in remaining[0]
69+
70+
71+
def test_redrive_select():
    """Only envelopes accepted by ``select`` are replayed; the rest go back to the DLQ."""
    transport = InMemoryTransport()
    _dead_letter(transport, "dlq", "urn:babel:orders:created", "orders")
    _dead_letter(transport, "dlq", "urn:babel:emails:welcome", "emails")

    def only_orders(envelope):
        return EnvelopeCodec.urn(envelope) == "urn:babel:orders:created"

    outcome = redrive(transport, "dlq", select=only_orders)

    assert outcome.redriven == 1
    assert outcome.skipped == 1
    assert len(_drain(transport, "orders")) == 1
    assert _drain(transport, "emails") == []
    # The unselected message is restored to the DLQ.
    assert len(_drain(transport, "dlq")) == 1
82+
83+
84+
def test_redrive_max():
    """``max`` caps how many messages are taken off the DLQ in one run."""
    transport = InMemoryTransport()
    for _attempt in range(3):
        _dead_letter(transport, "dlq", "urn:babel:orders:created", "orders")

    outcome = redrive(transport, "dlq", max=2)

    assert outcome.redriven == 2
    # One of the three messages was never drained.
    left_behind = _drain(transport, "dlq")
    assert len(left_behind) == 1
93+
94+
95+
def test_redrive_no_dead_letter_falls_back_to_meta_queue():
    """Without a ``dead_letter`` block, the replay target comes from ``meta.queue``."""
    transport = InMemoryTransport()
    # A plain envelope that was never dead-lettered, placed directly on the DLQ.
    plain = EnvelopeCodec.make("urn:babel:orders:created", {}, queue="orders")
    transport.publish("dlq", EnvelopeCodec.encode(plain))

    outcome = redrive(transport, "dlq")

    assert outcome.redriven == 1
    replayed = _drain(transport, "orders")
    assert len(replayed) == 1
105+
106+
107+
class _FailOnTarget(InMemoryTransport):
    """In-memory transport whose ``publish`` raises for one configured queue name."""

    def __init__(self, fail_queue):
        super().__init__()
        # Publishing to this queue raises; every other queue behaves normally.
        self._fail_queue = fail_queue

    def publish(self, queue, body):
        if queue != self._fail_queue:
            super().publish(queue, body)
            return
        raise RuntimeError("publish refused")
118+
119+
120+
def test_redrive_publish_failure_restores():
    """A failed re-publish surfaces the error and puts the message back on the DLQ."""
    transport = _FailOnTarget("orders")
    _dead_letter(transport, "dlq", "urn:babel:orders:created", "orders")

    with pytest.raises(RuntimeError):
        redrive(transport, "dlq")

    # Restored to the DLQ, not lost...
    restored = _drain(transport, "dlq")
    assert len(restored) == 1
    # ...and nothing reached the source queue.
    assert _drain(transport, "orders") == []
129+
130+
131+
def test_redrive_undecodable_restored():
    """An undecodable body is counted as skipped and restored byte-for-byte to the DLQ."""
    poison = "not-json{{{"
    transport = InMemoryTransport()
    transport.publish("dlq", poison)

    outcome = redrive(transport, "dlq")

    assert outcome.redriven == 0
    assert outcome.skipped == 1
    restored = transport.pop("dlq", 0)
    assert restored is not None
    assert restored.body == poison

0 commit comments

Comments
 (0)