Skip to content

Commit ffba94b

Browse files
committed
feat(outbox): transactional outbox helper — Python core (ADR-0029)
Ports the producer-side transactional outbox as babelqueue.outbox: Outbox.write encodes the frozen envelope and persists it via a caller-bound OutboxStore inside the caller's own DB transaction (no dual-write), and OutboxRelay.flush/drain forwards stored rows verbatim through the transport — mark_published only after publish returns, a raising publish -> mark_failed + linear backoff (row stays pending), one poison row never blocks the batch. At-least-once handoff; consumers dedupe on meta.id (the Idempotent mirror). Frozen bytes ride verbatim (GR-1/4/5); OutboxStore is a Protocol so the core stays stdlib-only (GR-7). v1.12.0.
1 parent e98fcfb commit ffba94b

6 files changed

Lines changed: 707 additions & 3 deletions

File tree

CHANGELOG.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,31 @@ The envelope wire format is versioned separately by `meta.schema_version`
99

1010
## [Unreleased]
1111

12+
## [1.12.0] - 2026-06-21
13+
14+
### Added
15+
- **Transactional outbox helper** (ADR-0029) — the optional `babelqueue.outbox` module ports the
16+
PHP `BabelQueue\Outbox` helper to Python, removing the producer **dual write**: the message is
17+
persisted **into your database, in the same transaction** as the business data (so it commits or
18+
rolls back atomically with it), and a separate **relay** publishes the durable rows afterwards.
19+
No distributed transaction; exactly-once *handoff* into the broker, then at-least-once on the wire
20+
(the consumer dedupes on `meta.id` via the idempotency helper, the consumer-side mirror, ADR-0022).
21+
The core stays **stdlib-only** (GR-7): `OutboxStore` is an abstract `typing.Protocol` the caller
22+
binds to their own DB — the module ships only the in-memory `InMemoryOutboxStore` reference and
23+
pulls in **no** DB driver. `Outbox.write(envelope)` encodes via the frozen `EnvelopeCodec` and
24+
delegates to `OutboxStore.save` **inside the transaction the caller already opened** — it does not
25+
begin/commit anything (the caller owns the transaction boundary). `OutboxRelay.flush()` publishes
26+
one batch through the existing publish-only `Transport`, marking each row published **only after**
27+
the transport accepts it, or failed (caught → `mark_failed`, row left pending, with a bounded
28+
linear backoff via an injectable sleeper) so one poison row never blocks the batch;
29+
`OutboxRelay.drain()` loops while a pass makes progress, with a safety ceiling. The relay
30+
publishes the **stored bytes verbatim** — it never decodes, rebuilds or re-encodes the envelope —
31+
so `trace_id` is preserved end-to-end and the body is byte-compatible across SDKs (GR-1/GR-4/GR-5).
32+
Unit-tested without a broker (write stores the encoded envelope byte-identical; relay publishes via
33+
a fake `Transport` + marks published; a raising publish → `mark_failed`, row still pending, batch
34+
continues; `drain` loops to empty and stops on no-progress; backoff grows linearly and caps via the
35+
injected sleeper). The envelope is unchanged (`schema_version: 1`); this is purely additive.
36+
1237
## [1.11.0] - 2026-06-21
1338

1439
### Added

README.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,47 @@ def on_created(data, meta): ...
182182
python manage.py babelqueue_worker --queue orders # run the consumer
183183
```
184184

185+
## Transactional outbox (optional)
186+
187+
The `babelqueue.outbox` helper (ADR-0029) removes the producer **dual write**: "commit the
188+
business row" and "publish to the broker" are two systems that can disagree on a crash. Instead the
189+
message is persisted **into your database, in the same transaction** as the business data — so it
190+
commits or rolls back atomically with it — and a separate **relay** publishes the durable rows
191+
afterwards. No distributed transaction; exactly-once *handoff* into the broker, then at-least-once
192+
on the wire (the consumer dedupes on `meta.id` — see the idempotency helper, the mirror of this).
193+
194+
The core stays **stdlib-only**: `OutboxStore` is an abstract `Protocol` you bind to **your own DB**
195+
(the core ships no driver). The stored value is the `EnvelopeCodec`-encoded envelope **byte-for-byte
196+
unchanged** (frozen, `schema_version: 1`); the relay publishes those exact bytes — it never decodes,
197+
rebuilds or re-encodes — so `trace_id` is preserved end-to-end.
198+
199+
```python
200+
from babelqueue import BabelQueue, EnvelopeCodec
201+
from babelqueue.outbox import Outbox, OutboxRelay, InMemoryOutboxStore
202+
203+
store = InMemoryOutboxStore() # production: your own OutboxStore adapter, DB-backed
204+
outbox = Outbox(store)
205+
206+
# write side — YOU own the transaction boundary (this is the whole point):
207+
with db.transaction(): # your own open transaction
208+
db.insert_order(order) # the business write
209+
envelope = EnvelopeCodec.make("urn:babel:orders:created", {"order_id": 1042}, queue="orders")
210+
outbox.write(envelope) # same connection, same tx — both, or neither
211+
212+
# read/publish side — run on a short interval, after the business tx commits:
213+
app = BabelQueue("redis://localhost:6379/0", queue="orders")
214+
relay = OutboxRelay(app.transport, store)
215+
relay.drain() # publish all pending rows; flush() does one batch
216+
```
217+
218+
`Outbox.write` only encodes and calls `OutboxStore.save` — it does **not** begin or commit anything.
219+
A `save` runs inside the transaction you already opened; you commit both together. `OutboxRelay`
220+
marks a row published only **after** the transport accepts it; a publish that raises is recorded via
221+
`mark_failed` (with a bounded, injectable-sleeper backoff) and left pending for a later pass, so one
222+
poison row never blocks the batch. Implement `OutboxStore` over your DB (claim rows oldest-first,
223+
ideally with `SELECT … FOR UPDATE SKIP LOCKED` so two relays don't double-publish); `InMemoryOutboxStore`
224+
is the reference for tests and single-process demos (no real transaction).
225+
185226
## OpenTelemetry tracing (optional)
186227

187228
`pip install "babelqueue[otel]"` adds the optional `babelqueue.otel` module — the core never

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "babelqueue"
7-
version = "1.11.0"
7+
version = "1.12.0"
88
description = "Polyglot Queues, Simplified — the Python core: the canonical BabelQueue wire-envelope codec, contracts and dead-letter helpers."
99
readme = "README.md"
1010
requires-python = ">=3.9"

src/babelqueue/__init__.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,26 @@
1111

1212
from __future__ import annotations
1313

14-
from . import dead_letter, headers, idempotency, redrive, replay
14+
from . import dead_letter, headers, idempotency, outbox, redrive, replay
1515
from .app import BabelQueue
1616
from .codec import SCHEMA_VERSION, SOURCE_LANG, EnvelopeCodec
1717
from .contracts import HasTraceId, PolyglotMessage
1818
from .headers import headers_from_context
1919
from .idempotency import IdempotencyStore, InMemoryStore
20+
from .outbox import (
21+
InMemoryOutboxStore,
22+
Outbox,
23+
OutboxRecord,
24+
OutboxRelay,
25+
OutboxRelayResult,
26+
OutboxStore,
27+
)
2028
from .exceptions import BabelQueueError, UnknownUrnError
2129
from .replay import HEADER_REPLAY_BYPASS, bypass_external_effects, is_replay
2230
from .routing import UnknownUrnStrategy
2331
from .transport import HeaderPublisher, InMemoryTransport, ReceivedMessage, Transport
2432

25-
__version__ = "1.11.0"
33+
__version__ = "1.12.0"
2634

2735
__all__ = [
2836
"BabelQueue",
@@ -41,6 +49,13 @@
4149
"dead_letter",
4250
"headers",
4351
"idempotency",
52+
"outbox",
53+
"Outbox",
54+
"OutboxStore",
55+
"OutboxRecord",
56+
"OutboxRelay",
57+
"OutboxRelayResult",
58+
"InMemoryOutboxStore",
4459
"redrive",
4560
"replay",
4661
"is_replay",

0 commit comments

Comments
 (0)