Skip to content

Commit a21620d

Browse files
committed
feat(asb): Azure Service Bus transport (sb:// scheme) + conformance runner
1 parent f714909 commit a21620d

8 files changed

Lines changed: 449 additions & 3 deletions

File tree

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ jobs:
4444
- name: Install (dev + all adapters for type context)
4545
run: |
4646
python -m pip install --upgrade pip
47-
pip install -e ".[dev,celery,django,redis,amqp,sqs]"
47+
pip install -e ".[dev,celery,django,redis,amqp,sqs,azureservicebus]"
4848
- name: Ruff
4949
run: ruff check src tests
5050
- name: Mypy
@@ -89,7 +89,7 @@ jobs:
8989
- name: Install (all adapters — full coverage with brokers)
9090
run: |
9191
python -m pip install --upgrade pip
92-
pip install -e ".[redis,amqp,sqs,celery,django,dev]"
92+
pip install -e ".[redis,amqp,sqs,azureservicebus,celery,django,dev]"
9393
9494
- name: Wait for ElasticMQ
9595
run: |

CHANGELOG.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77
The envelope wire format is versioned separately by `meta.schema_version`
88
(currently **1**) — see the contract at [babelqueue.com](https://babelqueue.com).
99

10+
## [Unreleased]
11+
12+
### Added
13+
- **Azure Service Bus transport** (`babelqueue[azureservicebus]`, `azure-servicebus`) —
14+
`AsbTransport`, selected by the `sb://` URL scheme (e.g.
15+
`sb://<namespace>.servicebus.windows.net`, Azure AD via `DefaultAzureCredential`; or pass
16+
`connection_string=...` / a built `client`). Implements [§4 of the broker-bindings
17+
contract](https://babelqueue.com/docs/spec/1.x/broker-bindings#azure-service-bus): the
18+
canonical envelope is the message body, projected onto native Service Bus fields
19+
(`Subject` = URN, `CorrelationId` = `trace_id`, `MessageId` = `meta.id`, plus the `bq-`
20+
application properties); PeekLock reserve → `complete_message` ack; `attempts` reconciled
21+
to the broker-authoritative `DeliveryCount − 1`. A client can be injected for tests/DI. The
22+
projection + reconciliation are unit-tested with no broker and no `azure-servicebus` (the
23+
azure import is lazy); the publish flow is covered with a fake client. The envelope is
24+
unchanged (`schema_version: 1`); Azure Service Bus is purely additive. Ships as a MINOR.
25+
1026
## [1.1.0] - 2026-06-12
1127

1228
### Added

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ dependencies = []
3232
redis = ["redis>=4"]
3333
amqp = ["pika>=1.3"]
3434
sqs = ["boto3>=1.26"]
35+
azureservicebus = ["azure-servicebus>=7.11", "azure-identity>=1.15"]
3536
celery = ["celery>=5"]
3637
django = ["django>=4.2"]
3738
dev = ["pytest>=7", "pytest-cov>=4", "mypy>=1.8", "ruff>=0.5"]

src/babelqueue/asb_transport.py

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
"""Azure Service Bus transport. Requires the ``azureservicebus`` extra:
2+
3+
pip install "babelqueue[azureservicebus]"
4+
5+
Producing sends the canonical envelope as the message body and projects the contract
6+
envelope fields onto native Service Bus fields — ``Subject`` = URN, ``CorrelationId`` =
7+
trace_id, ``MessageId`` = meta.id, plus the ``bq-`` application properties
8+
(``bq-schema-version`` / ``bq-source-lang`` / ``bq-created-at``) — so a .NET/Java/... peer
9+
can route on ``Subject`` and correlate on ``CorrelationId`` without parsing the body.
10+
Consuming uses the PeekLock reservation model (``receive_messages`` -> process ->
11+
``complete_message``); the authoritative attempt count is the broker's native
12+
``DeliveryCount`` (1-based), reconciled onto the envelope as ``attempts = DeliveryCount - 1``.
13+
A message left un-acked has its lock expire and is redelivered (at-least-once).
14+
15+
This implements §4 of the broker-bindings contract. The envelope is unchanged
16+
(``schema_version`` stays 1); Azure Service Bus is purely additive.
17+
18+
URL form: ``sb://<namespace>.servicebus.windows.net`` (Azure AD via
19+
``DefaultAzureCredential``). For connection-string auth or a custom client, build the
20+
transport directly and pass it via ``BabelQueue(transport=...)`` or
21+
``AsbTransport(connection_string=...)``.
22+
"""
23+
24+
from __future__ import annotations
25+
26+
from typing import Any, Dict, Optional
27+
from urllib.parse import urlsplit
28+
29+
from .codec import EnvelopeCodec
30+
from .transport import ReceivedMessage, Transport
31+
32+
33+
class AsbTransport(Transport):
34+
def __init__(
35+
self,
36+
url: str = "sb://",
37+
*,
38+
client: Any = None,
39+
connection_string: Optional[str] = None,
40+
credential: Any = None,
41+
max_wait_time: Optional[float] = None,
42+
) -> None:
43+
parts = urlsplit(url) if url else urlsplit("sb://")
44+
self._namespace = parts.hostname or None
45+
self._connection_string = connection_string
46+
self._credential = credential
47+
self._max_wait_time = max_wait_time
48+
self._senders: Dict[str, Any] = {}
49+
self._receivers: Dict[str, Any] = {}
50+
51+
if client is not None:
52+
self._client = client
53+
return
54+
self._client = self._build_client() # pragma: no cover - needs Azure / network
55+
56+
def _build_client(self) -> Any: # pragma: no cover - needs Azure / network
57+
try:
58+
from azure.servicebus import ServiceBusClient
59+
except ImportError as exc:
60+
raise ImportError(
61+
"AsbTransport requires the 'azure-servicebus' package. Install with "
62+
'pip install "babelqueue[azureservicebus]".'
63+
) from exc
64+
import os
65+
66+
cs = self._connection_string or os.environ.get("AZURE_SERVICEBUS_CONNECTION_STRING")
67+
if cs:
68+
return ServiceBusClient.from_connection_string(cs)
69+
if self._namespace:
70+
credential = self._credential
71+
if credential is None:
72+
from azure.identity import DefaultAzureCredential
73+
74+
credential = DefaultAzureCredential()
75+
return ServiceBusClient(self._namespace, credential)
76+
raise ValueError(
77+
"AsbTransport needs a connection string, a namespace + credential, or an injected client."
78+
)
79+
80+
# -- helpers ------------------------------------------------------------
81+
82+
def _sender(self, queue: str) -> Any:
83+
sender = self._senders.get(queue)
84+
if sender is None:
85+
sender = self._client.get_queue_sender(queue)
86+
self._senders[queue] = sender
87+
return sender
88+
89+
def _receiver(self, queue: str) -> Any:
90+
receiver = self._receivers.get(queue)
91+
if receiver is None:
92+
receiver = self._client.get_queue_receiver(queue)
93+
self._receivers[queue] = receiver
94+
return receiver
95+
96+
@staticmethod
97+
def _projection(body: str) -> Dict[str, Any]:
98+
"""Native ServiceBusMessage kwargs — Subject/CorrelationId/MessageId + the bq-
99+
application properties (a redundant, routable view of the body). §4.2–§4.3."""
100+
try:
101+
env: Dict[str, Any] = EnvelopeCodec.decode(body)
102+
except (ValueError, TypeError): # pragma: no cover - defensive
103+
return {"content_type": "application/json"}
104+
meta = env.get("meta") or {}
105+
106+
props: Dict[str, Any] = {}
107+
if meta.get("schema_version") is not None:
108+
props["bq-schema-version"] = meta["schema_version"]
109+
if meta.get("lang"):
110+
props["bq-source-lang"] = meta["lang"]
111+
if meta.get("created_at") is not None:
112+
props["bq-created-at"] = meta["created_at"]
113+
114+
kwargs: Dict[str, Any] = {"content_type": "application/json"}
115+
if env.get("job"):
116+
kwargs["subject"] = env["job"]
117+
if env.get("trace_id"):
118+
kwargs["correlation_id"] = env["trace_id"]
119+
if meta.get("id"):
120+
kwargs["message_id"] = meta["id"]
121+
if props:
122+
kwargs["application_properties"] = props
123+
return kwargs
124+
125+
@staticmethod
126+
def _reconcile(body: str, delivery_count: Any) -> str:
127+
"""Set attempts to max(current, DeliveryCount - 1) — DeliveryCount (1-based) is the
128+
broker's native redelivery floor, but the runtime retries by republishing with
129+
attempts+1 in the body, so a republished message (DeliveryCount back to 1) must not
130+
have its higher body count lowered. First delivery (DeliveryCount 1) reads 0."""
131+
try:
132+
dc = int(delivery_count)
133+
except (ValueError, TypeError):
134+
return body
135+
if dc <= 1:
136+
return body
137+
native = dc - 1
138+
env = EnvelopeCodec.decode(body)
139+
if not env or native <= int(env.get("attempts", 0)):
140+
return body
141+
env["attempts"] = native
142+
return EnvelopeCodec.encode(env)
143+
144+
# -- Transport ----------------------------------------------------------
145+
146+
def publish(self, queue: str, body: str) -> None:
147+
from azure.servicebus import ServiceBusMessage
148+
149+
message = ServiceBusMessage(body, **self._projection(body))
150+
self._sender(queue).send_messages(message)
151+
152+
def pop(self, queue: str, timeout: float = 1.0) -> Optional[ReceivedMessage]:
153+
wait = self._max_wait_time
154+
if wait is None and timeout and timeout > 0:
155+
wait = timeout
156+
messages = self._receiver(queue).receive_messages(max_message_count=1, max_wait_time=wait)
157+
if not messages:
158+
return None
159+
message = messages[0]
160+
body = self._reconcile(str(message), getattr(message, "delivery_count", None))
161+
return ReceivedMessage(body=body, queue=queue, handle=message)
162+
163+
def ack(self, message: ReceivedMessage) -> None:
164+
if message.handle is None:
165+
return
166+
self._receiver(message.queue).complete_message(message.handle)
167+
168+
def close(self) -> None: # pragma: no cover - resource cleanup
169+
for resource in (*self._senders.values(), *self._receivers.values(), self._client):
170+
try:
171+
resource.close()
172+
except Exception: # noqa: BLE001 - best-effort cleanup
173+
pass

src/babelqueue/transport.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,12 @@ def make_transport(broker_url: str) -> Transport:
8383
from .sqs_transport import SqsTransport
8484

8585
return SqsTransport(broker_url)
86+
if scheme in ("sb", "servicebus", "azureservicebus"):
87+
from .asb_transport import AsbTransport
88+
89+
return AsbTransport(broker_url)
8690

8791
raise BabelQueueError(
8892
f"Unsupported broker scheme {scheme!r}. Use 'memory://', 'redis://', "
89-
"'amqp://' or 'sqs://', or pass your own Transport via BabelQueue(transport=...)."
93+
"'amqp://', 'sqs://' or 'sb://', or pass your own Transport via BabelQueue(transport=...)."
9094
)

tests/conformance/manifest.json

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,5 +93,34 @@
9393
{ "name": "absent-count", "body_attempts": 3, "approximate_receive_count": null, "expected_attempts": 3 }
9494
]
9595
}
96+
},
97+
"asb": {
98+
"description": "Azure Service Bus binding conformance (broker-bindings.md §4). Every SDK that ships an ASB transport must satisfy these. The envelope body stays byte-identical (the 'cases' above); these lock the native projection + reconciliation the binding adds. Per-message values reuse fixtures/order-created.json so the expected projection is deterministic.",
99+
"property_projection": {
100+
"description": "On produce, the transport MUST project these native Service Bus message fields from the envelope: Subject = job (the URN), CorrelationId = trace_id, MessageId = meta.id, ContentType = application/json, plus the bq- ApplicationProperties as native AMQP-typed values (numbers stay numbers, not strings). Applies to every ASB-producing SDK.",
101+
"envelope_file": "fixtures/order-created.json",
102+
"message": {
103+
"subject": "urn:babel:orders:created",
104+
"correlation_id": "7b3f9c2a-e41d-4f88-9b2a-1c0d5e6f7a8b",
105+
"message_id": "f1e2d3c4-b5a6-4789-90ab-cdef01234567",
106+
"content_type": "application/json"
107+
},
108+
"application_properties": {
109+
"bq-schema-version": 1,
110+
"bq-source-lang": "php",
111+
"bq-created-at": 1749132727000
112+
}
113+
},
114+
"attempts_reconciliation": {
115+
"description": "On consume, attempts = max(body.attempts, DeliveryCount - 1): a first delivery (DeliveryCount 1) reads 0, a runtime-incremented body count is never lowered, and DeliveryCount <= 1 leaves the body's own count untouched (the runtime retries by republishing with attempts+1). DeliveryCount is the native 1-based ASB redelivery counter; the rule is identical across the native-consumer SDKs (.NET/Java/Node) and the Transport+App SDKs (Python/Go).",
116+
"cases": [
117+
{ "name": "first-delivery", "body_attempts": 0, "delivery_count": 1, "expected_attempts": 0 },
118+
{ "name": "third-delivery", "body_attempts": 0, "delivery_count": 3, "expected_attempts": 2 },
119+
{ "name": "native-exceeds-body", "body_attempts": 2, "delivery_count": 5, "expected_attempts": 4 },
120+
{ "name": "never-lower-runtime", "body_attempts": 5, "delivery_count": 2, "expected_attempts": 5 },
121+
{ "name": "first-delivery-keeps-body", "body_attempts": 4, "delivery_count": 1, "expected_attempts": 4 },
122+
{ "name": "zero-count-keeps-body", "body_attempts": 3, "delivery_count": 0, "expected_attempts": 3 }
123+
]
124+
}
96125
}
97126
}

tests/test_asb_conformance.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
"""Azure Service Bus binding conformance: the vendored manifest's ``asb`` block locks the
2+
§4 native projection (Subject/CorrelationId/MessageId + bq- application properties) and the
3+
``attempts = max(body, DeliveryCount - 1)`` reconciliation. No azure-servicebus, no broker —
4+
pure transport logic against golden values."""
5+
6+
from __future__ import annotations
7+
8+
import json
9+
import os
10+
import unittest
11+
12+
from babelqueue import EnvelopeCodec
13+
from babelqueue.asb_transport import AsbTransport
14+
15+
CONFORMANCE = os.path.join(os.path.dirname(__file__), "conformance")
16+
17+
18+
class AsbConformanceTest(unittest.TestCase):
19+
@classmethod
20+
def setUpClass(cls) -> None:
21+
with open(os.path.join(CONFORMANCE, "manifest.json"), encoding="utf-8") as fh:
22+
cls.asb = json.load(fh)["asb"]
23+
24+
def test_property_projection(self) -> None:
25+
projection = self.asb["property_projection"]
26+
with open(os.path.join(CONFORMANCE, projection["envelope_file"]), encoding="utf-8") as fh:
27+
body = fh.read()
28+
29+
got = AsbTransport._projection(body)
30+
message = projection["message"]
31+
32+
self.assertEqual(got.get("subject"), message["subject"])
33+
self.assertEqual(got.get("correlation_id"), message["correlation_id"])
34+
self.assertEqual(got.get("message_id"), message["message_id"])
35+
self.assertEqual(got.get("content_type"), message["content_type"])
36+
self.assertEqual(got.get("application_properties"), projection["application_properties"])
37+
38+
def test_attempts_reconciliation(self) -> None:
39+
for case in self.asb["attempts_reconciliation"]["cases"]:
40+
env = EnvelopeCodec.make("urn:babel:orders:created", {"x": 1})
41+
env["attempts"] = case["body_attempts"]
42+
body = EnvelopeCodec.encode(env)
43+
44+
out = AsbTransport._reconcile(body, case["delivery_count"])
45+
46+
self.assertEqual(
47+
EnvelopeCodec.decode(out)["attempts"],
48+
case["expected_attempts"],
49+
case["name"],
50+
)
51+
52+
53+
if __name__ == "__main__": # pragma: no cover
54+
unittest.main()

0 commit comments

Comments
 (0)