|
| 1 | +"""Apache ActiveMQ Artemis transport over AMQP 1.0. Requires the ``artemis`` extra: |
| 2 | +
|
| 3 | + pip install "babelqueue[artemis]" |
| 4 | +
|
| 5 | +Artemis speaks AMQP 1.0 (not the 0-9-1 of RabbitMQ), so this transport uses the |
| 6 | +``python-qpid-proton`` blocking client rather than ``pika``. Producing sends the canonical |
| 7 | +envelope as the message body and projects the contract envelope fields onto the AMQP |
| 8 | +properties a JMS peer reads: ``correlation-id`` = trace_id (JMSCorrelationID), ``creation-time`` |
| 9 | += meta.created_at (JMSTimestamp), the ``x-opt-jms-type`` message annotation = URN (JMSType, the |
| 10 | +AMQP-JMS mapping), plus the ``bq-`` application properties (``bq-schema-version`` / |
| 11 | +``bq-source-lang`` / ``bq-attempts`` / ``bq-app-id``) — so a Java (JMS) or .NET/Node/... peer |
| 12 | +routes and correlates without parsing the body. The URN in the body's ``job`` field stays |
| 13 | +authoritative. |
| 14 | +
|
| 15 | +Consuming reserves one message at a time (``receive`` -> process -> ``accept``); the |
| 16 | +authoritative attempt count is the envelope's ``attempts`` (the body), reconciled against the |
| 17 | +broker's native AMQP ``delivery-count`` as ``attempts = max(body, delivery_count)`` — no -1, |
| 18 | +because the AMQP header counter is 0-based (0 on first delivery), counting prior failed |
| 19 | +deliveries. (The Java JMS binding reads ``JMSXDeliveryCount`` which is 1-based and subtracts 1, |
| 20 | +arriving at the same 0-based ``attempts``.) A message left un-accepted is redelivered |
| 21 | +(at-least-once). |
| 22 | +
|
| 23 | +This implements §7 of the broker-bindings contract. The envelope is unchanged |
| 24 | +(``schema_version`` stays 1); Apache ActiveMQ Artemis is purely additive. |
| 25 | +
|
| 26 | +URL form: ``artemis://host:5672`` (or ``artemis+ssl://`` for TLS) — translated to the |
| 27 | +``amqp://`` / ``amqps://`` proton speaks. For credentials or a custom connection, build the |
| 28 | +transport directly and pass it via ``BabelQueue(transport=...)`` or |
| 29 | +``ArtemisTransport(connection=...)``. |
| 30 | +""" |
| 31 | + |
| 32 | +from __future__ import annotations |
| 33 | + |
| 34 | +from typing import Any, Dict, Optional |
| 35 | + |
| 36 | +from .codec import EnvelopeCodec |
| 37 | +from .transport import ReceivedMessage, Transport |
| 38 | + |
| 39 | +JMS_TYPE_ANNOTATION = "x-opt-jms-type" |
| 40 | +APP_ID = "babelqueue" |
| 41 | + |
| 42 | + |
| 43 | +class ArtemisTransport(Transport): |
| 44 | + def __init__( |
| 45 | + self, |
| 46 | + url: str = "artemis://localhost:5672", |
| 47 | + *, |
| 48 | + connection: Any = None, |
| 49 | + credit: int = 1, |
| 50 | + receive_timeout_millis: int = 1000, |
| 51 | + **connect_options: Any, |
| 52 | + ) -> None: |
| 53 | + self._url = url or "artemis://localhost:5672" |
| 54 | + self._credit = credit |
| 55 | + self._receive_timeout_millis = receive_timeout_millis |
| 56 | + self._connect_options = connect_options |
| 57 | + self._senders: Dict[str, Any] = {} |
| 58 | + self._receivers: Dict[str, Any] = {} |
| 59 | + |
| 60 | + if connection is not None: |
| 61 | + self._connection = connection |
| 62 | + return |
| 63 | + self._connection = self._build_connection() # pragma: no cover - needs Artemis / network |
| 64 | + |
| 65 | + def _build_connection(self) -> Any: # pragma: no cover - needs Artemis / network |
| 66 | + try: |
| 67 | + from proton.utils import BlockingConnection |
| 68 | + except ImportError as exc: |
| 69 | + raise ImportError( |
| 70 | + "ArtemisTransport requires the 'python-qpid-proton' package. Install with " |
| 71 | + 'pip install "babelqueue[artemis]".' |
| 72 | + ) from exc |
| 73 | + return BlockingConnection(self._to_amqp_url(self._url), **self._connect_options) |
| 74 | + |
| 75 | + # -- helpers ------------------------------------------------------------ |
| 76 | + |
| 77 | + @staticmethod |
| 78 | + def _to_amqp_url(url: str) -> str: |
| 79 | + """``artemis://`` -> ``amqp://`` and ``artemis+ssl://`` -> ``amqps://`` (proton's schemes).""" |
| 80 | + if url.startswith("artemis+ssl://"): |
| 81 | + return "amqps://" + url[len("artemis+ssl://"):] |
| 82 | + if url.startswith("artemis://"): |
| 83 | + return "amqp://" + url[len("artemis://"):] |
| 84 | + return url |
| 85 | + |
| 86 | + def _sender(self, queue: str) -> Any: |
| 87 | + sender = self._senders.get(queue) |
| 88 | + if sender is None: |
| 89 | + sender = self._connection.create_sender(queue) |
| 90 | + self._senders[queue] = sender |
| 91 | + return sender |
| 92 | + |
| 93 | + def _receiver(self, queue: str) -> Any: |
| 94 | + receiver = self._receivers.get(queue) |
| 95 | + if receiver is None: |
| 96 | + receiver = self._connection.create_receiver(queue, credit=self._credit) |
| 97 | + self._receivers[queue] = receiver |
| 98 | + return receiver |
| 99 | + |
| 100 | + @staticmethod |
| 101 | + def _projection(body: str) -> Dict[str, str]: |
| 102 | + """AMQP application properties (string->string) — a redundant, routable view of the |
| 103 | + body: bq-schema-version/bq-source-lang/bq-attempts/bq-app-id. §7.2 (the URN is carried |
| 104 | + by the x-opt-jms-type annotation, trace_id by correlation-id).""" |
| 105 | + env = EnvelopeCodec.decode(body) |
| 106 | + if not env: |
| 107 | + return {} |
| 108 | + meta = env.get("meta") or {} |
| 109 | + |
| 110 | + props: Dict[str, str] = {} |
| 111 | + if meta.get("schema_version") is not None: |
| 112 | + props["bq-schema-version"] = str(meta["schema_version"]) |
| 113 | + if meta.get("lang"): |
| 114 | + props["bq-source-lang"] = str(meta["lang"]) |
| 115 | + props["bq-attempts"] = str(int(env.get("attempts", 0) or 0)) |
| 116 | + props["bq-app-id"] = APP_ID |
| 117 | + return props |
| 118 | + |
| 119 | + @staticmethod |
| 120 | + def _jms_type(body: str) -> str: |
| 121 | + env = EnvelopeCodec.decode(body) |
| 122 | + return str(env["job"]) if env and env.get("job") else "" |
| 123 | + |
| 124 | + @staticmethod |
| 125 | + def _correlation_id(body: str) -> str: |
| 126 | + env = EnvelopeCodec.decode(body) |
| 127 | + return str(env["trace_id"]) if env and env.get("trace_id") else "" |
| 128 | + |
| 129 | + @staticmethod |
| 130 | + def _creation_seconds(body: str) -> Optional[float]: |
| 131 | + """proton's creation_time is float seconds; the contract's created_at is epoch ms.""" |
| 132 | + env = EnvelopeCodec.decode(body) |
| 133 | + meta = (env or {}).get("meta") or {} |
| 134 | + created_at = meta.get("created_at") |
| 135 | + if created_at is None: |
| 136 | + return None |
| 137 | + try: |
| 138 | + return int(created_at) / 1000.0 |
| 139 | + except (TypeError, ValueError): # pragma: no cover - defensive |
| 140 | + return None |
| 141 | + |
| 142 | + def _message(self, body: str) -> Any: |
| 143 | + """Build the proton Message projecting the §7 JMS-readable metadata.""" |
| 144 | + from proton import Message, symbol |
| 145 | + |
| 146 | + message: Any = Message(body=body) |
| 147 | + props = self._projection(body) |
| 148 | + if props: |
| 149 | + message.properties = props |
| 150 | + correlation_id = self._correlation_id(body) |
| 151 | + if correlation_id: |
| 152 | + message.correlation_id = correlation_id |
| 153 | + creation = self._creation_seconds(body) |
| 154 | + if creation is not None: |
| 155 | + message.creation_time = creation |
| 156 | + jms_type = self._jms_type(body) |
| 157 | + if jms_type: |
| 158 | + message.annotations = {symbol(JMS_TYPE_ANNOTATION): jms_type} |
| 159 | + return message |
| 160 | + |
| 161 | + @staticmethod |
| 162 | + def _reconcile(body: str, delivery_count: Any) -> str: |
| 163 | + """Set attempts to max(current, delivery_count). The AMQP delivery-count header is |
| 164 | + 0-based (0 on first delivery), so it maps directly to attempts with no -1. The runtime |
| 165 | + retries by republishing with attempts+1 in the body (delivery-count back to 0), so a |
| 166 | + republished message must not have its higher body count lowered.""" |
| 167 | + try: |
| 168 | + dc = int(delivery_count) |
| 169 | + except (ValueError, TypeError): |
| 170 | + return body |
| 171 | + if dc <= 0: |
| 172 | + return body |
| 173 | + env = EnvelopeCodec.decode(body) |
| 174 | + if not env or dc <= int(env.get("attempts", 0) or 0): |
| 175 | + return body |
| 176 | + env["attempts"] = dc |
| 177 | + return EnvelopeCodec.encode(env) |
| 178 | + |
| 179 | + @staticmethod |
| 180 | + def _delivery_count(message: Any) -> int: |
| 181 | + value = getattr(message, "delivery_count", 0) |
| 182 | + try: |
| 183 | + return int(value) |
| 184 | + except (TypeError, ValueError): # pragma: no cover - defensive |
| 185 | + return 0 |
| 186 | + |
| 187 | + @staticmethod |
| 188 | + def _payload(message: Any) -> str: |
| 189 | + body = getattr(message, "body", None) |
| 190 | + if isinstance(body, bytes): |
| 191 | + return body.decode("utf-8") |
| 192 | + return str(body) if body is not None else "" |
| 193 | + |
| 194 | + # -- Transport ---------------------------------------------------------- |
| 195 | + |
| 196 | + def publish(self, queue: str, body: str) -> None: |
| 197 | + self._sender(queue).send(self._message(body)) |
| 198 | + |
| 199 | + def pop(self, queue: str, timeout: float = 1.0) -> Optional[ReceivedMessage]: |
| 200 | + wait = self._receive_timeout_millis / 1000.0 |
| 201 | + if timeout and timeout > 0: |
| 202 | + wait = timeout |
| 203 | + receiver = self._receiver(queue) |
| 204 | + try: |
| 205 | + message = receiver.receive(timeout=wait) |
| 206 | + except Exception as exc: # noqa: BLE001 - proton raises proton.Timeout on no message |
| 207 | + if type(exc).__name__ == "Timeout": |
| 208 | + return None |
| 209 | + raise |
| 210 | + if message is None: # pragma: no cover - defensive (real client raises instead) |
| 211 | + return None |
| 212 | + body = self._reconcile(self._payload(message), self._delivery_count(message)) |
| 213 | + return ReceivedMessage(body=body, queue=queue, handle=receiver) |
| 214 | + |
| 215 | + def ack(self, message: ReceivedMessage) -> None: |
| 216 | + if message.handle is None: |
| 217 | + return |
| 218 | + message.handle.accept() |
| 219 | + |
| 220 | + def close(self) -> None: # pragma: no cover - resource cleanup |
| 221 | + try: |
| 222 | + self._connection.close() |
| 223 | + except Exception: # noqa: BLE001 - best-effort cleanup |
| 224 | + pass |
0 commit comments