Skip to content

Commit 32d2ae0

Browse files
committed
test(kafka): add the §6 conformance runner
1 parent b4ed227 commit 32d2ae0

1 file changed

Lines changed: 52 additions & 0 deletions

File tree

tests/test_kafka_conformance.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
"""Apache Kafka binding conformance: the vendored manifest's ``kafka`` block locks the §6
2+
header projection (bq-* UTF-8 byte strings) and the ``attempts`` reconciliation (the
3+
``bq-attempts`` header is authoritative when present, else the body — NOT a max). No
4+
confluent-kafka, no broker — pure transport logic against golden values."""
5+
6+
from __future__ import annotations
7+
8+
import json
9+
import os
10+
import unittest
11+
12+
from babelqueue import EnvelopeCodec
13+
from babelqueue.kafka_transport import KafkaTransport
14+
15+
CONFORMANCE = os.path.join(os.path.dirname(__file__), "conformance")
16+
17+
18+
class KafkaConformanceTest(unittest.TestCase):
19+
@classmethod
20+
def setUpClass(cls) -> None:
21+
with open(os.path.join(CONFORMANCE, "manifest.json"), encoding="utf-8") as fh:
22+
cls.kafka = json.load(fh)["kafka"]
23+
24+
def test_property_projection(self) -> None:
25+
projection = self.kafka["property_projection"]
26+
with open(os.path.join(CONFORMANCE, projection["envelope_file"]), encoding="utf-8") as fh:
27+
body = fh.read()
28+
29+
got = {key: value.decode("utf-8") for key, value in KafkaTransport._projection(body)}
30+
self.assertEqual(got, projection["headers"])
31+
32+
def test_attempts_reconciliation(self) -> None:
33+
for case in self.kafka["attempts_reconciliation"]["cases"]:
34+
env = EnvelopeCodec.make("urn:babel:orders:created", {"x": 1})
35+
env["attempts"] = case["body_attempts"]
36+
body = EnvelopeCodec.encode(env)
37+
38+
headers = []
39+
if case["header_attempts"] is not None:
40+
headers = [("bq-attempts", str(case["header_attempts"]).encode("utf-8"))]
41+
42+
out = KafkaTransport._reconcile(body, headers)
43+
44+
self.assertEqual(
45+
EnvelopeCodec.decode(out)["attempts"],
46+
case["expected_attempts"],
47+
case["name"],
48+
)
49+
50+
51+
if __name__ == "__main__": # pragma: no cover
52+
unittest.main()

0 commit comments

Comments
 (0)