Skip to content

Commit 951bc8a

Browse files
committed
feat(kafka): Apache Kafka transport (§6, kafka:// scheme) —
value+bq- headers, process-then-commit
1 parent af5c776 commit 951bc8a

6 files changed

Lines changed: 380 additions & 3 deletions

File tree

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,6 @@ venv/
1212
.coverage
1313
coverage.xml
1414
htmlcov/
15+
16+
# CommitBrief local config and cache
17+
.commitbrief/

CHANGELOG.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77
The envelope wire format is versioned separately by `meta.schema_version`
88
(currently **1**) — see the contract at [babelqueue.com](https://babelqueue.com).
99

10+
## [1.4.0] - 2026-06-13
11+
12+
### Added
13+
- **Apache Kafka transport** (`babelqueue[kafka]`, `confluent-kafka`) — `KafkaTransport`,
14+
selected by the `kafka://` URL scheme (e.g. `kafka://host:9092`; or pass an injected
15+
`producer` + `consumer_factory`). Implements [§6 of the broker-bindings
16+
contract](https://babelqueue.com/docs/spec/1.x/broker-bindings#apache-kafka): the record
17+
**value** is the canonical envelope, projected onto native Kafka record headers (UTF-8 byte
18+
strings) — `bq-job` = URN, `bq-trace-id`, `bq-message-id`, plus `bq-schema-version`/
19+
`bq-source-lang`/`bq-attempts` — with the record timestamp mirroring `meta.created_at`.
20+
Consume is **process-then-commit** (`pop` reserves via `poll` with `enable.auto.commit=false`,
21+
`ack` commits the offset); the **`bq-attempts` header is the authoritative attempt counter**
22+
(the body's `attempts` is the fallback for non-BabelQueue producers). The projection +
23+
reconciliation + publish/pop/ack flow are unit-tested with no broker and no `confluent-kafka`
24+
(the kafka import is lazy; the transport talks to injected producer/consumer fakes). The
25+
envelope is unchanged (`schema_version: 1`); Apache Kafka is purely additive. Ships as a MINOR.
26+
1027
## [1.3.0] - 2026-06-13
1128

1229
### Added

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "babelqueue"
7-
version = "1.3.0"
7+
version = "1.4.0"
88
description = "Polyglot Queues, Simplified — the Python core: the canonical BabelQueue wire-envelope codec, contracts and dead-letter helpers."
99
readme = "README.md"
1010
requires-python = ">=3.9"
@@ -34,6 +34,7 @@ amqp = ["pika>=1.3"]
3434
sqs = ["boto3>=1.26"]
3535
azureservicebus = ["azure-servicebus>=7.11", "azure-identity>=1.15"]
3636
pulsar = ["pulsar-client>=3.4"]
37+
kafka = ["confluent-kafka>=2.3"]
3738
celery = ["celery>=5"]
3839
django = ["django>=4.2"]
3940
dev = ["pytest>=7", "pytest-cov>=4", "mypy>=1.8", "ruff>=0.5"]

src/babelqueue/kafka_transport.py

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
"""Apache Kafka transport. Requires the ``kafka`` extra:
2+
3+
pip install "babelqueue[kafka]"
4+
5+
Producing writes the canonical envelope as the record value and projects the contract
6+
envelope fields onto native Kafka record headers (UTF-8 byte strings): ``bq-job`` = URN,
7+
``bq-trace-id`` = trace_id, ``bq-message-id`` = meta.id, plus ``bq-schema-version`` /
8+
``bq-source-lang`` / ``bq-attempts`` — so a Java/.NET/... peer can route on ``bq-job``
9+
without parsing the body — with the record timestamp mirroring ``meta.created_at``.
10+
Consuming is process-then-commit: ``poll`` reserves a record (``enable.auto.commit=false``)
11+
and ``ack`` commits the offset only after the handler returns (at-least-once). Kafka has no
12+
native delivery count, so the ``bq-attempts`` header is the authoritative retry counter (the
13+
body's ``attempts`` is the fallback for non-BabelQueue producers); the runtime owns retry by
14+
republishing with attempts+1 and dead-letters to ``<queue>.dlq``.
15+
16+
This implements §6 of the broker-bindings contract. The envelope is unchanged
17+
(``schema_version`` stays 1); Apache Kafka is purely additive.
18+
19+
URL form: ``kafka://host:9092[,host2:9092]``. The default consumer group is ``babelqueue``.
20+
For a custom client, build the transport directly with
21+
``KafkaTransport(producer=..., consumer_factory=...)`` and pass it via ``BabelQueue(transport=...)``.
22+
"""
23+
24+
from __future__ import annotations
25+
26+
from typing import Any, Callable, Dict, List, Optional, Tuple
27+
28+
from .codec import EnvelopeCodec
29+
from .transport import ReceivedMessage, Transport
30+
31+
Headers = List[Tuple[str, bytes]]
32+
33+
34+
def _brokers_from_url(url: str) -> str:
35+
rest = url.split("://", 1)[1] if "://" in url else url
36+
return rest.split("/", 1)[0] or "localhost:9092"
37+
38+
39+
class KafkaTransport(Transport):
    """Transport that publishes and consumes envelopes over Apache Kafka topics.

    The queue name is used as the topic name. The producer and the per-queue consumers are
    created lazily, so constructing the transport performs no I/O and does not import
    ``confluent_kafka``; injected ``producer`` / ``consumer_factory`` objects bypass the
    import entirely.
    """

    def __init__(
        self,
        url: str = "kafka://localhost:9092",
        *,
        producer: Any = None,
        consumer_factory: Optional[Callable[[str], Any]] = None,
        group_id: str = "babelqueue",
        **client_config: Any,
    ) -> None:
        """Store the configuration; no client is created here.

        Args:
            url: ``kafka://host:port[,host2:port]`` URL; an empty value means the default.
            producer: Pre-built producer to use instead of building one on first publish.
            consumer_factory: Callable taking a queue name and returning a ready consumer;
                used instead of building (and subscribing) one.
            group_id: Consumer group id for consumers built by this transport.
            **client_config: Extra client settings merged into both the producer and the
                consumer configuration.
        """
        self._brokers = _brokers_from_url(url or "kafka://localhost:9092")
        self._group_id = group_id
        self._client_config = client_config
        self._producer = producer
        self._consumer_factory = consumer_factory
        # One consumer per queue, created on first use and reused afterwards.
        self._consumers: Dict[str, Any] = {}

    # -- helpers ------------------------------------------------------------

    def _producer_(self) -> Any:
        """Return the producer, building it on first use when none was injected."""
        if self._producer is None:
            self._producer = self._build_producer()  # pragma: no cover - needs Kafka / network
        return self._producer

    def _build_producer(self) -> Any:  # pragma: no cover - needs Kafka / network
        """Create a ``confluent_kafka.Producer`` (imported lazily) for the configured brokers."""
        from confluent_kafka import Producer

        return Producer({"bootstrap.servers": self._brokers, **self._client_config})

    def _consumer(self, queue: str) -> Any:
        """Return the cached consumer for ``queue``, creating and caching it if needed.

        The injected ``consumer_factory`` takes precedence over the built-in consumer.
        """
        consumer = self._consumers.get(queue)
        if consumer is None:
            if self._consumer_factory is not None:
                consumer = self._consumer_factory(queue)
            else:
                consumer = self._build_consumer(queue)  # pragma: no cover - needs Kafka / network
            self._consumers[queue] = consumer
        return consumer

    def _build_consumer(self, queue: str) -> Any:  # pragma: no cover - needs Kafka / network
        """Create a ``confluent_kafka.Consumer`` (imported lazily) subscribed to ``queue``.

        Auto-commit is switched off so offsets only move when ``ack`` commits them.
        ``client_config`` is merged last, so it overrides any of the defaults below.
        """
        from confluent_kafka import Consumer

        consumer = Consumer(
            {
                "bootstrap.servers": self._brokers,
                "group.id": self._group_id,
                "enable.auto.commit": False,
                "auto.offset.reset": "earliest",
                **self._client_config,
            }
        )
        consumer.subscribe([queue])
        return consumer

    @staticmethod
    def _projection(body: str) -> Headers:
        """Native Kafka record headers (UTF-8 byte values) — a redundant, routable view of the
        body: bq-job/bq-trace-id/bq-message-id + bq-schema-version/lang/attempts. §6.3.

        Returns an empty list when the body decodes to nothing. ``None`` and empty-string
        fields are omitted; ``bq-attempts`` is always present (``0`` when missing or falsy).
        """
        env = EnvelopeCodec.decode(body)
        if not env:
            return []
        meta = env.get("meta") or {}

        headers: Headers = []

        def add(key: str, value: Any) -> None:
            # Skip absent / empty fields rather than emitting an empty header.
            if value is not None and value != "":
                headers.append((key, str(value).encode("utf-8")))

        add("bq-job", env.get("job"))
        add("bq-trace-id", env.get("trace_id"))
        add("bq-message-id", meta.get("id"))
        # Unlike add(), this keeps an empty-string schema_version; only None is skipped.
        if meta.get("schema_version") is not None:
            headers.append(("bq-schema-version", str(meta["schema_version"]).encode("utf-8")))
        add("bq-source-lang", meta.get("lang"))
        headers.append(("bq-attempts", str(int(env.get("attempts", 0) or 0)).encode("utf-8")))
        return headers

    @staticmethod
    def _reconcile(body: str, headers: Any) -> str:
        """Set attempts to the authoritative bq-attempts header (falling back to the body's own
        attempts when the header is absent/unparseable — a non-BabelQueue producer). §6.5.

        Only the first ``bq-attempts`` header is considered. The body is returned untouched
        when it decodes to nothing or when the header agrees with it; otherwise the envelope
        is re-encoded with the header's value.
        """
        env = EnvelopeCodec.decode(body)
        if not env:
            return body
        attempts = int(env.get("attempts", 0) or 0)
        for key, value in headers or []:
            if key == "bq-attempts":
                raw = value.decode("utf-8") if isinstance(value, (bytes, bytearray)) else str(value)
                try:
                    attempts = int(raw)
                except (ValueError, TypeError):
                    # Unparseable header: keep the body's own counter.
                    pass
                break
        if attempts == int(env.get("attempts", 0) or 0):
            return body
        env["attempts"] = attempts
        return EnvelopeCodec.encode(env)

    @staticmethod
    def _payload(message: Any) -> str:
        """Return the record value as text: bytes are UTF-8 decoded, ``None`` becomes ``""``."""
        value = message.value()
        if isinstance(value, (bytes, bytearray)):
            return value.decode("utf-8")
        return str(value) if value is not None else ""

    # -- Transport ----------------------------------------------------------

    def publish(self, queue: str, body: str) -> None:
        """Produce ``body`` to the ``queue`` topic with the projected ``bq-*`` headers.

        The record value is the UTF-8 encoded body. When the envelope carries a truthy
        ``meta.created_at`` it is passed as the record timestamp after ``int()`` conversion
        (assumes a numeric epoch value — TODO confirm the unit against the envelope contract).
        """
        env = EnvelopeCodec.decode(body)
        # A body that decodes to nothing is still published (without headers or timestamp),
        # matching _projection, which returns no headers for it instead of failing.
        meta = (env.get("meta") or {}) if env else {}
        producer = self._producer_()
        kwargs: Dict[str, Any] = {"value": body.encode("utf-8"), "headers": self._projection(body)}
        created_at = meta.get("created_at")
        if created_at:
            kwargs["timestamp"] = int(created_at)
        producer.produce(queue, **kwargs)
        # Non-blocking poll right after enqueueing; presumably this lets the client service
        # delivery callbacks — see the confluent-kafka Producer documentation.
        producer.poll(0)

    def pop(self, queue: str, timeout: float = 1.0) -> Optional[ReceivedMessage]:
        """Poll one record from ``queue`` and return it with reconciled attempts.

        A falsy or non-positive ``timeout`` is replaced by a one-second wait. Returns ``None``
        when nothing arrives in time or when the polled record reports an error.
        NOTE(review): consumer errors are dropped silently here, so a broker-side failure
        looks the same as an idle queue to the caller.
        """
        wait = timeout if timeout and timeout > 0 else 1.0
        message = self._consumer(queue).poll(wait)
        if message is None or message.error() is not None:
            return None
        body = self._reconcile(self._payload(message), message.headers())
        # The raw record is kept as the handle so ack() can commit exactly this offset.
        return ReceivedMessage(body=body, queue=queue, handle=message)

    def ack(self, message: ReceivedMessage) -> None:
        """Commit the offset of ``message`` on its queue's consumer (blocking commit).

        Does nothing when the message carries no handle.
        """
        if message.handle is None:
            return
        self._consumer(message.queue).commit(message=message.handle, asynchronous=False)

    def close(self) -> None:  # pragma: no cover - resource cleanup
        """Flush the producer and close every cached consumer, ignoring cleanup errors.

        The consumer cache is emptied afterwards so a later ``pop``/``ack`` gets a fresh
        consumer instead of reusing a closed one.
        """
        try:
            if self._producer is not None:
                self._producer.flush()
        except Exception:  # noqa: BLE001 - best-effort cleanup
            pass
        for consumer in self._consumers.values():
            try:
                consumer.close()
            except Exception:  # noqa: BLE001 - best-effort cleanup
                pass
        self._consumers.clear()

src/babelqueue/transport.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,13 @@ def make_transport(broker_url: str) -> Transport:
9191
from .pulsar_transport import PulsarTransport
9292

9393
return PulsarTransport(broker_url)
94+
if scheme == "kafka":
95+
from .kafka_transport import KafkaTransport
96+
97+
return KafkaTransport(broker_url)
9498

9599
raise BabelQueueError(
96100
f"Unsupported broker scheme {scheme!r}. Use 'memory://', 'redis://', "
97-
"'amqp://', 'sqs://', 'sb://' or 'pulsar://', or pass your own Transport via "
98-
"BabelQueue(transport=...)."
101+
"'amqp://', 'sqs://', 'sb://', 'pulsar://' or 'kafka://', or pass your own Transport "
102+
"via BabelQueue(transport=...)."
99103
)

0 commit comments

Comments
 (0)