Skip to content

Commit e98fcfb

Browse files
committed
feat(otel): W3C traceparent transport-header propagation — Python SDK (ADR-0028)
Mirrors the Go reference: a dependency-free header seam (publish_with_headers + headers_from_context surfacing a delivered message's headers) and W3C traceparent inject/extract in otel.py (consumer span becomes a child of the producer's remote parent; no header falls back to the v0.1 trace_id behaviour). Per-transport carriers: Redis __bq_frame (bare-value back-compat), AMQP header table, SQS MessageAttributes, in-memory reference. Envelope frozen (GR-1), otel stays the [otel] extra (GR-7), trace_id preserved (GR-4). Also bump to 1.11.0 and reconcile the drifted __version__ (was 1.6.0) to the package version.
1 parent dc6d828 commit e98fcfb

15 files changed

Lines changed: 1003 additions & 41 deletions

CHANGELOG.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,31 @@ The envelope wire format is versioned separately by `meta.schema_version`
99

1010
## [Unreleased]
1111

12+
## [1.11.0] - 2026-06-21
13+
14+
### Added
15+
- **W3C `traceparent` span-context propagation** (OpenTelemetry v0.2, ADR-0028) — the optional
16+
`babelqueue.otel` module now carries true cross-hop **span** parent-child linkage, not just
17+
shared-`trace_id` correlation. On publish, `otel.publish` injects the active span context as a
18+
W3C `traceparent` **transport header** (and still stamps `trace_id` for the v0.1 fallback); on
19+
consume, `otel.wrap_handler` extracts it and starts the CONSUMER span as a true **child** of the
20+
producer span. With no `traceparent` present it falls back to the v0.1 `trace_id`-derived parent,
21+
so it is a strict, backward-compatible upgrade — no regression. The header rides **out of band**
22+
via a new dependency-free core seam — `BabelQueue.publish_with_headers(urn, data, headers, …)`
23+
(produce side) and `babelqueue.headers_from_context()` (consume side, surfaced by the runtime) —
24+
so the wire envelope stays **frozen** (`schema_version: 1`, GR-1) and the core stays
25+
zero-dependency (OTel remains the optional `[otel]` extra, GR-7). `traceparent` is carried on the
26+
**in-memory** (reference), **Redis** (a transport-owned `__bq_frame` JSON frame with bare-value
27+
back-compat, so cross-version queues interoperate; degrades to a bare publish in Laravel-compat
28+
mode), **RabbitMQ** (native AMQP header table, beside the contract `x-*` headers) and **SQS**
29+
(native `MessageAttributes`, beside the contract `bq-*` attributes) transports; where a transport
30+
can't carry headers, propagation degrades cleanly to v0.1 `trace_id` correlation with no error.
31+
A plain `publish` is byte-identical to before. Unit-tested without a broker (frame round-trip +
32+
bare back-compat, header merge/extract per transport, and an in-memory producer→consumer
33+
parent-child end-to-end with the OTel SDK's `InMemorySpanExporter`); broker-gated integration
34+
tests assert a published `traceparent` arrives on the consumed message's headers beside the
35+
unchanged body. The envelope is unchanged; this is purely additive. Ships as a MINOR.
36+
1237
## [1.6.0] - 2026-06-14
1338

1439
### Added

README.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,42 @@ def on_created(data, meta): ...
182182
python manage.py babelqueue_worker --queue orders # run the consumer
183183
```
184184

185+
## OpenTelemetry tracing (optional)
186+
187+
`pip install "babelqueue[otel]"` adds the optional `babelqueue.otel` module — the core never
188+
imports OpenTelemetry, so it stays zero-dependency. It emits a PRODUCER span per publish and a
189+
CONSUMER span per handled message, correlated across every hop and SDK, at two layered levels:
190+
191+
- **`trace_id` correlation** (v0.1): the envelope's `trace_id` maps 1:1 to an OTel trace id, so
192+
every hop that shares a `trace_id` shares one trace — with **zero** wire/transport change.
193+
- **W3C `traceparent` span linkage** (v0.2): the producer also injects its active span context as
194+
a `traceparent` **transport header** (beside the frozen envelope, never in it), so the consumer
195+
starts its span as a true **child** of the producer span — real cross-hop parent-child linkage.
196+
With no `traceparent` present it falls back to the v0.1 `trace_id` behaviour, so enabling it is a
197+
strict, backward-compatible upgrade.
198+
199+
```python
200+
from opentelemetry import trace
201+
from babelqueue import BabelQueue, otel
202+
203+
tracer = trace.get_tracer("orders")
204+
app = BabelQueue("redis://localhost:6379/0", queue="orders")
205+
206+
# consumer: wrap_handler starts a CONSUMER span (child of the producer span when a
207+
# traceparent rode along; else in the trace_id-derived trace)
208+
app.register("urn:babel:orders:created", otel.wrap_handler(tracer, on_order_created))
209+
210+
# producer: otel.publish starts a PRODUCER span and carries traceparent + trace_id
211+
otel.publish(tracer, app, "urn:babel:orders:created", {"order_id": 1042})
212+
```
213+
214+
The `traceparent` rides the out-of-band transport-header seam (`publish_with_headers` /
215+
`headers_from_context`) — the same seam the replay-bypass marker uses — so the envelope stays
216+
frozen (`schema_version: 1`). It is carried on the in-memory, Redis (a transport-owned JSON frame,
217+
with bare-value back-compat), RabbitMQ (AMQP header table) and SQS (`MessageAttributes`)
218+
transports; where a transport can't carry it, propagation degrades cleanly to v0.1 `trace_id`
219+
correlation with no error.
220+
185221
## What's here
186222

187223
The codec/contracts/dead-letter (zero-dep core), the `BabelQueue` runtime

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "babelqueue"
7-
version = "1.10.0"
7+
version = "1.11.0"
88
description = "Polyglot Queues, Simplified — the Python core: the canonical BabelQueue wire-envelope codec, contracts and dead-letter helpers."
99
readme = "README.md"
1010
requires-python = ">=3.9"

src/babelqueue/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,18 @@
1111

1212
from __future__ import annotations
1313

14-
from . import dead_letter, idempotency, redrive, replay
14+
from . import dead_letter, headers, idempotency, redrive, replay
1515
from .app import BabelQueue
1616
from .codec import SCHEMA_VERSION, SOURCE_LANG, EnvelopeCodec
1717
from .contracts import HasTraceId, PolyglotMessage
18+
from .headers import headers_from_context
1819
from .idempotency import IdempotencyStore, InMemoryStore
1920
from .exceptions import BabelQueueError, UnknownUrnError
2021
from .replay import HEADER_REPLAY_BYPASS, bypass_external_effects, is_replay
2122
from .routing import UnknownUrnStrategy
2223
from .transport import HeaderPublisher, InMemoryTransport, ReceivedMessage, Transport
2324

24-
__version__ = "1.6.0"
25+
__version__ = "1.11.0"
2526

2627
__all__ = [
2728
"BabelQueue",
@@ -38,11 +39,13 @@
3839
"BabelQueueError",
3940
"UnknownUrnError",
4041
"dead_letter",
42+
"headers",
4143
"idempotency",
4244
"redrive",
4345
"replay",
4446
"is_replay",
4547
"bypass_external_effects",
48+
"headers_from_context",
4649
"HEADER_REPLAY_BYPASS",
4750
"IdempotencyStore",
4851
"InMemoryStore",

src/babelqueue/app.py

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ def on_order_created(data, meta):
2525
from . import dead_letter
2626
from .codec import EnvelopeCodec
2727
from .exceptions import UnknownUrnError
28+
from .headers import _headers_scope
2829
from .replay import HEADER_REPLAY_BYPASS, _replay_scope
2930
from .routing import UnknownUrnStrategy
30-
from .transport import ReceivedMessage, Transport, make_transport
31+
from .transport import HeaderPublisher, ReceivedMessage, Transport, make_transport
3132

3233
Handler = Callable[..., None]
3334

@@ -70,6 +71,37 @@ def publish(
7071
self.transport.publish(target, EnvelopeCodec.encode(envelope))
7172
return envelope["meta"]["id"]
7273

74+
def publish_with_headers(
75+
self,
76+
urn: str,
77+
data: Mapping[str, Any],
78+
headers: Mapping[str, str],
79+
*,
80+
queue: Optional[str] = None,
81+
trace_id: Optional[str] = None,
82+
) -> str:
83+
"""Publish a message together with out-of-band transport ``headers``; returns its id.
84+
85+
The headers ride **beside** the frozen envelope (GR-1) on the transport's per-message
86+
metadata channel — e.g. a W3C ``traceparent`` for cross-hop span linkage (ADR-0028) —
87+
never inside it. It is the produce-side counterpart of the headers the runtime surfaces
88+
to a handler via :func:`~babelqueue.headers.headers_from_context`.
89+
90+
When the transport implements :class:`~babelqueue.transport.HeaderPublisher` and
91+
``headers`` is non-empty, the headers are propagated; otherwise it transparently falls
92+
back to a plain :meth:`publish` (the headers are dropped — no error, no regression),
93+
exactly as :func:`~babelqueue.redrive.redrive` degrades. Passing empty headers is
94+
equivalent to :meth:`publish`, so callers need not branch on transport capability.
95+
"""
96+
target = queue or self.queue
97+
envelope = EnvelopeCodec.make(urn, data, queue=target, trace_id=trace_id)
98+
body = EnvelopeCodec.encode(envelope)
99+
if headers and isinstance(self.transport, HeaderPublisher):
100+
self.transport.publish_with_headers(target, body, dict(headers))
101+
else:
102+
self.transport.publish(target, body)
103+
return envelope["meta"]["id"]
104+
73105
# -- Register handlers --------------------------------------------------
74106

75107
def handler(self, urn: str) -> Callable[[Handler], Handler]:
@@ -116,8 +148,16 @@ def consume(
116148
run = consume
117149

118150
def dispatch(self, received: ReceivedMessage) -> None:
119-
"""Route one reserved message to its handler and acknowledge it."""
120-
with _replay_scope(bool(received.headers.get(HEADER_REPLAY_BYPASS))):
151+
"""Route one reserved message to its handler and acknowledge it.
152+
153+
The delivered message's out-of-band transport headers are surfaced onto the context for
154+
the span of this dispatch (:func:`~babelqueue.headers.headers_from_context`), so a handler
155+
or an optional wrapper (e.g. the ``otel`` module reading a W3C ``traceparent``, ADR-0028)
156+
can read metadata that travels beside the frozen envelope (GR-1).
157+
"""
158+
with _headers_scope(received.headers), _replay_scope(
159+
bool(received.headers.get(HEADER_REPLAY_BYPASS))
160+
):
121161
envelope = EnvelopeCodec.decode(received.body)
122162
urn = str(envelope.get("job") or envelope.get("urn") or "")
123163
handler = self._handlers.get(urn) if urn else None

src/babelqueue/headers.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
"""Consume-side out-of-band transport headers (the Python mirror of Go's ``headers.go``).
2+
3+
The runtime surfaces a delivered message's :attr:`~babelqueue.transport.ReceivedMessage.headers`
4+
onto a :class:`contextvars.ContextVar` for the span of one dispatch, so a handler — or an
5+
optional wrapper such as the ``otel`` module — can read per-message metadata that travels
6+
**beside** the frozen envelope (GR-1), never in it. It is the consume-side counterpart of
7+
:class:`~babelqueue.transport.HeaderPublisher`.
8+
9+
This is the same out-of-band seam the replay-bypass marker rides (ADR-0027); ADR-0028's W3C
10+
``traceparent`` (for cross-hop span parent-child linkage) is the second rider on it. The header
11+
map is read-only — treat it as immutable. Adds only :mod:`contextvars` + a plain ``dict`` (no
12+
dependency), exactly like :mod:`babelqueue.replay`.
13+
"""
14+
15+
from __future__ import annotations
16+
17+
import contextlib
18+
import contextvars
19+
from typing import Dict, Iterator, Mapping, Optional
20+
21+
#: The delivered message's out-of-band transport headers, for the span of one dispatch.
22+
#: Defaults to an empty mapping so :func:`headers_from_context` is always nil-safe.
23+
_headers_var: contextvars.ContextVar[Mapping[str, str]] = contextvars.ContextVar(
24+
"babelqueue_headers", default={}
25+
)
26+
27+
28+
def headers_from_context() -> Mapping[str, str]:
29+
"""Return the out-of-band transport headers that arrived with the message currently being
30+
handled, or an empty mapping when none were carried (or the transport surfaces none).
31+
32+
The returned mapping is read-only — do not mutate it. It is the consume-side counterpart of
33+
:class:`~babelqueue.transport.HeaderPublisher`: a handler or an optional wrapper (e.g. the
34+
``otel`` module's :func:`~babelqueue.otel.wrap_handler`) reads per-message metadata that
35+
travels beside the frozen envelope, never in it (GR-1).
36+
"""
37+
return _headers_var.get()
38+
39+
40+
@contextlib.contextmanager
41+
def _headers_scope(headers: Optional[Mapping[str, str]]) -> Iterator[None]:
42+
"""Internal: surface ``headers`` on the context for the span of one dispatch, then reset.
43+
44+
A nil/empty map is fine; reads stay nil-safe. The runtime calls this in
45+
:meth:`~babelqueue.app.BabelQueue.dispatch` so wrappers can read the delivered headers.
46+
"""
47+
scoped: Mapping[str, str] = headers or {}
48+
token = _headers_var.set(scoped)
49+
try:
50+
yield
51+
finally:
52+
_headers_var.reset(token)
53+
54+
55+
def merge_headers(*sources: Optional[Mapping[str, str]]) -> Dict[str, str]:
56+
"""Combine header maps into a single ``dict[str, str]``, dropping blank keys and blank values.
57+
58+
Later sources win a key collision. Returns a fresh dict (callers may mutate it freely). Used
59+
to merge an injected ``traceparent`` onto a transport's contract headers without clobbering
60+
them — the contract keys are passed *last* so they win (mirrors the Go merge-not-clobber).
61+
"""
62+
out: Dict[str, str] = {}
63+
for source in sources:
64+
if not source:
65+
continue
66+
for key, value in source.items():
67+
if not key or value is None or value == "":
68+
continue
69+
out[str(key)] = str(value)
70+
return out

0 commit comments

Comments
 (0)