Skip to content

Commit dc6d828

Browse files
committed
test: fix flaky pika integration test — poll basic_get instead of immediate get
test_publish_sets_contract_amqp_properties published then did a single immediate basic_get on a separate channel; a freshly published message is not always retrievable on the very next get, so it intermittently returned None (AssertionError: unexpectedly None). Add a short poll-with-timeout helper (_get) and use it for both basic_get sites.
1 parent e2788f6 commit dc6d828

1 file changed

Lines changed: 15 additions & 2 deletions

File tree

tests/test_pika_transport.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from __future__ import annotations
99

1010
import os
11+
import time
1112
import unittest
1213
import uuid
1314

@@ -51,6 +52,18 @@ def _depth(self, queue: str) -> int:
5152
method = self.ctl.queue_declare(queue=queue, durable=True, passive=True)
5253
return method.method.message_count
5354

55+
def _get(self, queue: str, timeout: float = 5.0):
56+
"""basic_get with a short poll. A message published on a separate
57+
channel is not always retrievable on the very next get, so wait
58+
briefly for it rather than asserting on a single immediate poll —
59+
otherwise this races and flakes under CI load."""
60+
deadline = time.monotonic() + timeout
61+
while True:
62+
frame = self.ctl.basic_get(queue=queue, auto_ack=True)
63+
if frame[0] is not None or time.monotonic() >= deadline:
64+
return frame
65+
time.sleep(0.05)
66+
5467
def test_publish_consume_round_trip_and_ack(self) -> None:
5568
app = BabelQueue(AMQP_URL, queue=self.queue)
5669
seen = {}
@@ -70,7 +83,7 @@ def test_publish_sets_contract_amqp_properties(self) -> None:
7083
app = BabelQueue(AMQP_URL, queue=self.queue)
7184
app.publish("urn:babel:orders:created", {"order_id": 1}, trace_id="trace-amqp")
7285

73-
method, props, body = self.ctl.basic_get(queue=self.queue, auto_ack=True)
86+
method, props, body = self._get(self.queue)
7487
self.assertIsNotNone(method)
7588
self.assertEqual(props.type, "urn:babel:orders:created") # route on properties.type
7689
self.assertEqual(props.correlation_id, "trace-amqp") # trace_id
@@ -89,7 +102,7 @@ def handle(data, meta): # noqa: ANN001
89102
app.consume(max_messages=2, timeout=3)
90103

91104
self.assertEqual(self._depth(f"{self.queue}.dlq"), 1)
92-
_m, _p, body = self.ctl.basic_get(queue=f"{self.queue}.dlq", auto_ack=True)
105+
_m, _p, body = self._get(f"{self.queue}.dlq")
93106
env = EnvelopeCodec.decode(body.decode("utf-8"))
94107
self.assertEqual(env["dead_letter"]["reason"], "failed")
95108

0 commit comments

Comments
 (0)