Skip to content

Commit 8dd9eab

Browse files
committed
Add celery_app parameter to job_queue_size for priority queue support
Fixes RabbitMQ PRECONDITION_FAILED error when querying queues configured with custom arguments like x-max-priority. The celery_app parameter allows extracting queue arguments from the app's task_queues configuration and passing them to queue_declare(). The celery_app and broker_url parameters are mutually exclusive - a ValueError is raised if both are provided.
1 parent 0923bed commit 8dd9eab

2 files changed

Lines changed: 225 additions & 37 deletions

File tree

hirefire_resource/macro/celery.py

Lines changed: 94 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,26 @@ class ChannelError(Exception):
2525
from hirefire_resource.errors import MissingQueueError
2626

2727

28+
def _get_queue_arguments_from_app(app, queues):
29+
"""
30+
Extract queue arguments from Celery app configuration for specified queues.
31+
32+
Args:
33+
app: Celery app instance with task_queues configuration
34+
queues: List of queue names to extract arguments for
35+
36+
Returns:
37+
dict: Mapping of queue name to queue_arguments dict
38+
"""
39+
queue_args = {}
40+
task_queues = getattr(app.conf, "task_queues", None) or []
41+
for q in task_queues:
42+
queue_name = getattr(q, "name", None)
43+
if queue_name and queue_name in queues:
44+
queue_args[queue_name] = getattr(q, "queue_arguments", None)
45+
return queue_args
46+
47+
2848
def mitigate_connection_reset_error(retries=10, delay=1):
2949
"""
3050
Decorator to retry a function when ConnectionResetError occurs.
@@ -210,7 +230,7 @@ async def async_job_queue_latency(*queues, broker_url=None):
210230

211231

212232
@mitigate_connection_reset_error()
213-
def job_queue_size(*queues, broker_url=None):
233+
def job_queue_size(*queues, broker_url=None, celery_app=None):
214234
"""
215235
Calculates the total job queue size across the specified queues using Celery with either Redis
216236
or RabbitMQ (AMQP) as the broker.
@@ -229,22 +249,31 @@ def job_queue_size(*queues, broker_url=None):
229249
using a workaround, such as a separate queue for scheduled tasks that forwards tasks ready
230250
to run to the relevant regular queues. When using RabbitMQ (AMQP), consider using the
231251
Delayed Message Plugin.
252+
- For RabbitMQ queues with custom arguments (e.g., x-max-priority for priority queues),
253+
pass your configured Celery app via the `celery_app` parameter. This allows the function
254+
to extract and use the correct queue arguments when querying RabbitMQ.
232255
233256
Args:
234257
*queues (str): Names of the queues for size measurement.
235-
broker_url (str, optional): The broker URL. Defaults in the following order:
258+
broker_url (str, optional): The broker URL. Cannot be used together with `celery_app`.
259+
Defaults in the following order:
236260
- Passed argument `broker_url`.
237261
- Environment variables `AMQP_URL`, `RABBITMQ_URL`, `RABBITMQ_BIGWIG_URL`,
238262
`CLOUDAMQP_URL`, `REDIS_TLS_URL`, `REDIS_URL`, `REDISTOGO_URL`, `REDISCLOUD_URL`,
239263
`OPENREDIS_URL`.
240264
- "amqp://guest:guest@localhost:5672" if AMQP is available, otherwise
241265
"redis://localhost:6379/0".
266+
celery_app (Celery, optional): A configured Celery app instance. Cannot be used together
267+
with `broker_url`. When provided, the function uses this app's connection and extracts
268+
queue arguments from celery_app.conf.task_queues. This is required for RabbitMQ queues
269+
with custom arguments like x-max-priority.
242270
243271
Returns:
244272
int: The cumulative job queue size across the specified queues.
245273
246274
Raises:
247275
MissingQueueError: If no queue names are provided.
276+
ValueError: If both `broker_url` and `celery_app` are provided.
248277
249278
Examples:
250279
>>> job_queue_size("celery")
@@ -255,43 +284,57 @@ def job_queue_size(*queues, broker_url=None):
255284
42
256285
>>> job_queue_size("celery", broker_url="redis://localhost:6379/0")
257286
42
287+
>>> # For priority queues, pass your configured Celery app:
288+
>>> job_queue_size("celery", celery_app=celery_app)
289+
42
258290
"""
259291
if not queues:
260292
raise MissingQueueError()
261293

262-
broker_url = (
263-
broker_url
264-
or os.environ.get("AMQP_URL")
265-
or os.environ.get("RABBITMQ_URL")
266-
or os.environ.get("RABBITMQ_BIGWIG_URL")
267-
or os.environ.get("CLOUDAMQP_URL")
268-
or os.environ.get("REDIS_TLS_URL")
269-
or os.environ.get("REDIS_URL")
270-
or os.environ.get("REDISTOGO_URL")
271-
or os.environ.get("REDISCLOUD_URL")
272-
or os.environ.get("OPENREDIS_URL")
273-
)
274-
275-
if not broker_url:
276-
if AMQP_AVAILABLE:
277-
broker_url = "amqp://guest:guest@localhost:5672"
278-
else:
279-
broker_url = "redis://localhost:6379/0"
280-
281-
app = Celery(broker=broker_url)
294+
if celery_app is not None and broker_url is not None:
295+
raise ValueError(
296+
"Cannot specify both 'celery_app' and 'broker_url'. "
297+
"Use 'celery_app' to pass your configured Celery app (recommended for priority queues), "
298+
"or 'broker_url' for simple setups."
299+
)
300+
301+
if celery_app is None:
302+
broker_url = (
303+
broker_url
304+
or os.environ.get("AMQP_URL")
305+
or os.environ.get("RABBITMQ_URL")
306+
or os.environ.get("RABBITMQ_BIGWIG_URL")
307+
or os.environ.get("CLOUDAMQP_URL")
308+
or os.environ.get("REDIS_TLS_URL")
309+
or os.environ.get("REDIS_URL")
310+
or os.environ.get("REDISTOGO_URL")
311+
or os.environ.get("REDISCLOUD_URL")
312+
or os.environ.get("OPENREDIS_URL")
313+
)
314+
315+
if not broker_url:
316+
if AMQP_AVAILABLE:
317+
broker_url = "amqp://guest:guest@localhost:5672"
318+
else:
319+
broker_url = "redis://localhost:6379/0"
320+
321+
celery_app = Celery(broker=broker_url)
322+
323+
# Extract queue arguments from app configuration (for priority queues, etc.)
324+
queue_args = _get_queue_arguments_from_app(celery_app, queues)
282325

283326
try:
284-
with app.connection_or_acquire() as connection:
327+
with celery_app.connection_or_acquire() as connection:
285328
with connection.channel() as channel:
286-
worker_task_count = _job_queue_size_worker(app, queues)
287-
broker_task_count = _job_queue_size_broker(channel, queues)
329+
worker_task_count = _job_queue_size_worker(celery_app, queues)
330+
broker_task_count = _job_queue_size_broker(channel, queues, queue_args)
288331
return worker_task_count + broker_task_count
289332

290333
except OperationalError:
291334
return 0
292335

293336

294-
async def async_job_queue_size(*queues, broker_url=None):
337+
async def async_job_queue_size(*queues, broker_url=None, celery_app=None):
295338
"""
296339
Asynchronously calculates the total job queue size across the specified queues using Celery with
297340
either Redis or RabbitMQ (AMQP) as the broker.
@@ -311,22 +354,29 @@ async def async_job_queue_size(*queues, broker_url=None):
311354
using a workaround, such as a separate queue for scheduled tasks that forwards tasks ready
312355
to run to the relevant regular queues. When using RabbitMQ (AMQP), consider using the
313356
Delayed Message Plugin.
357+
- For RabbitMQ queues with custom arguments (e.g., x-max-priority for priority queues),
358+
pass your configured Celery app via the `celery_app` parameter.
314359
315360
Args:
316361
*queues (str): Names of the queues for size measurement.
317-
broker_url (str, optional): The broker URL. Defaults in the following order:
362+
broker_url (str, optional): The broker URL. Cannot be used together with `celery_app`.
363+
Defaults in the following order:
318364
- Passed argument `broker_url`.
319365
- Environment variables `AMQP_URL`, `RABBITMQ_URL`, `RABBITMQ_BIGWIG_URL`,
320366
`CLOUDAMQP_URL`, `REDIS_TLS_URL`, `REDIS_URL`, `REDISTOGO_URL`, `REDISCLOUD_URL`,
321367
`OPENREDIS_URL`.
322368
- "amqp://guest:guest@localhost:5672" if AMQP is available, otherwise
323369
"redis://localhost:6379/0".
370+
celery_app (Celery, optional): A configured Celery app instance. Cannot be used together
371+
with `broker_url`. When provided, the function uses this app's connection and extracts
372+
queue arguments from celery_app.conf.task_queues.
324373
325374
Returns:
326375
int: The cumulative job queue size across the specified queues.
327376
328377
Raises:
329378
MissingQueueError: If no queue names are provided.
379+
ValueError: If both `broker_url` and `celery_app` are provided.
330380
331381
Examples:
332382
>>> await async_job_queue_size("celery")
@@ -335,11 +385,13 @@ async def async_job_queue_size(*queues, broker_url=None):
335385
85
336386
>>> await async_job_queue_size("celery", broker_url="amqp://user:password@host:5672")
337387
42
338-
>>> await async_job_queue_size("celery", broker_url="redis://localhost:6379/0")
388+
>>> await async_job_queue_size("celery", celery_app=celery_app)
339389
42
340390
"""
341391
loop = asyncio.get_event_loop()
342-
func = functools.partial(job_queue_size, *queues, broker_url=broker_url)
392+
func = functools.partial(
393+
job_queue_size, *queues, broker_url=broker_url, celery_app=celery_app
394+
)
343395
return await loop.run_in_executor(None, func)
344396

345397

@@ -399,22 +451,28 @@ def _job_queue_size_worker(app, queues):
399451
return sum(worker_data.get(queue, 0) for queue in queues)
400452

401453

402-
def _job_queue_size_broker(channel, queues, queue_args=None):
    """
    Sum the broker-side message counts across the given queues.

    Dispatches on the transport type: Redis virtual channels expose a
    ``_size`` attribute; any other channel is treated as RabbitMQ (AMQP).

    Args:
        channel: An open broker channel.
        queues: Iterable of queue names to measure.
        queue_args (dict, optional): Mapping of queue name to the
            ``queue_arguments`` the queue was declared with (e.g.
            ``{"x-max-priority": 10}``), forwarded to the RabbitMQ passive
            declare. Defaults to an empty mapping.

    Returns:
        int: Total message count across the queues.
    """
    if queue_args is None:
        queue_args = {}

    # Redis transport channels carry ``_size``; AMQP channels do not.
    if hasattr(channel, "_size"):
        return sum(_job_queue_size_redis(channel, queue) for queue in queues)
    return sum(
        _job_queue_size_rabbitmq(channel, queue, queue_args.get(queue))
        for queue in queues
    )
409465

410466

411467
def _job_queue_size_redis(channel, queue):
412468
return channel.client.llen(queue)
413469

414470

415-
def _job_queue_size_rabbitmq(channel, queue):
471+
def _job_queue_size_rabbitmq(channel, queue, arguments=None):
416472
try:
417-
return channel.queue_declare(queue=queue, passive=True).message_count
473+
return channel.queue_declare(
474+
queue=queue, passive=True, arguments=arguments
475+
).message_count
418476
except ChannelError:
419477
return 0
420478

tests/hirefire_resource/macro/test_celery.py

Lines changed: 131 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import math
22
from datetime import datetime, timedelta, timezone
3-
from unittest.mock import patch
43

54
import pytest
65
from celery import Celery
6+
from kombu import Queue
77

88
from hirefire_resource.errors import MissingQueueError
99
from hirefire_resource.macro.celery import (
@@ -215,3 +215,133 @@ async def test_job_queue_size_with_jobs_async(celery_app):
215215
)
216216
== 10
217217
)
218+
219+
220+
# Tests for priority queues (RabbitMQ only)
221+
222+
223+
@pytest.fixture
def priority_celery_app():
    """Return a Celery app configured with a RabbitMQ priority queue."""
    broker_url = "amqp://guest:guest@localhost:5672"
    app = Celery(broker=broker_url)

    # Declare the queue with x-max-priority so RabbitMQ treats it as a
    # priority queue, and mirror that in the task-level priority settings.
    queue_arguments = {"x-max-priority": 10}
    app.conf.task_queues = [
        Queue("priority_queue", queue_arguments=queue_arguments),
    ]
    app.conf.task_queue_max_priority = 10
    app.conf.task_default_priority = 5

    return app
238+
239+
240+
@pytest.fixture
def setup_priority_queue(priority_celery_app):
    """Create (and afterwards delete) the priority queue in RabbitMQ."""
    with priority_celery_app.connection_or_acquire() as connection:
        channel = connection.default_channel
        # Delete any leftover queue so every test starts from a clean slate.
        channel.queue_delete(queue="priority_queue")
        # Re-create the queue WITH the x-max-priority argument.
        channel.queue_declare(
            queue="priority_queue",
            durable=True,
            auto_delete=False,
            arguments={"x-max-priority": 10},
        )

    yield priority_celery_app

    # Cleanup: remove the queue so later tests are unaffected.
    with priority_celery_app.connection_or_acquire() as connection:
        channel = connection.default_channel
        channel.queue_delete(queue="priority_queue")
261+
262+
263+
def test_job_queue_size_priority_queue_with_broker_url(setup_priority_queue):
    """
    Measure a priority queue using only ``broker_url``.

    Note: in this test environment (RabbitMQ 4.2.2, py-amqp 5.3.1) the
    passive declare succeeds without queue arguments, but some RabbitMQ
    versions or configurations may return PRECONDITION_FAILED. For guaranteed
    compatibility with priority queues, use the ``celery_app`` parameter.
    """
    priority_celery_app = setup_priority_queue
    broker_url = priority_celery_app.conf.broker_url

    # Enqueue two tasks on the priority queue.
    priority_celery_app.send_task("test_task", queue="priority_queue")
    priority_celery_app.send_task("test_task", queue="priority_queue")

    # Query via broker_url only (no queue arguments are passed through).
    result = job_queue_size("priority_queue", broker_url=broker_url)

    # Works here, but may fail in other environments (see docstring).
    assert result == 2
285+
286+
287+
def test_job_queue_size_priority_queue_with_celery_app_returns_correct_count(
    setup_priority_queue,
):
    """
    Passing the configured Celery app returns the correct count for a queue
    declared with x-max-priority: the queue arguments are extracted from
    ``celery_app.conf.task_queues`` and forwarded to ``queue_declare``.

    This is the recommended approach for priority queues.
    """
    priority_celery_app = setup_priority_queue

    # Enqueue three tasks on the priority queue.
    for _ in range(3):
        priority_celery_app.send_task("test_task", queue="priority_queue")

    # Recommended: pass the celery_app so queue arguments are extracted.
    result = job_queue_size("priority_queue", celery_app=priority_celery_app)

    assert result == 3
309+
310+
311+
def test_job_queue_size_raises_error_when_both_broker_url_and_celery_app_provided(
    setup_priority_queue,
):
    """``broker_url`` and ``celery_app`` are mutually exclusive for job_queue_size."""
    priority_celery_app = setup_priority_queue

    with pytest.raises(ValueError) as exc_info:
        job_queue_size(
            "priority_queue",
            broker_url="amqp://guest:guest@localhost:5672",
            celery_app=priority_celery_app,
        )

    assert "Cannot specify both" in str(exc_info.value)
328+
329+
330+
@pytest.mark.asyncio
async def test_async_job_queue_size_raises_error_when_both_broker_url_and_celery_app_provided(
    setup_priority_queue,
):
    """The async variant also rejects ``broker_url`` and ``celery_app`` together."""
    priority_celery_app = setup_priority_queue

    with pytest.raises(ValueError) as exc_info:
        await async_job_queue_size(
            "priority_queue",
            broker_url="amqp://guest:guest@localhost:5672",
            celery_app=priority_celery_app,
        )

    assert "Cannot specify both" in str(exc_info.value)

0 commit comments

Comments
 (0)