-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathqueue.py
More file actions
73 lines (58 loc) · 2.35 KB
/
queue.py
File metadata and controls
73 lines (58 loc) · 2.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import enum
from dataclasses import dataclass, field
from pamqp.common import FieldTable
class QueueType(str, enum.Enum):
"""Enum representing different types of RabbitMQ queues."""
QUORUM = "quorum"
CLASSIC = "classic"
STREAM = "stream"
@dataclass(frozen=True)
class Queue:
"""
Represents a RabbitMQ queue configuration.
Attributes:
name: The name of the queue.
type: The type of the queue (quorum, classic, stream).
declare: Whether to declare the queue on startup.
durable: Whether the queue should survive broker restarts.
exclusive: Whether the queue is exclusive to the connection.
passive: Whether to check if the queue exists without creating it.
auto_delete: Whether the queue should be auto-deleted.
max_priority: The maximum priority for the queue.
arguments: Additional arguments for the queue declaration.
timeout: Timeout for queue declaration.
routing_key: The routing key for the queue.
bind_arguments: Arguments for binding the queue.
bind_timeout: Timeout for binding the queue.
consumer_arguments: Arguments for the consumer.
"""
declare: bool = True
# will be passed as arguments
type: QueueType = QueueType.QUORUM
# from declare_queue arguments
name: str = "taskiq"
durable: bool = True
exclusive: bool = False
passive: bool = False
auto_delete: bool = False
max_priority: int | None = None
arguments: FieldTable = field(default_factory=dict)
timeout: int | float | None = None
# will be used during binding to tasks exchange
routing_key: str | None = None
bind_arguments: FieldTable = field(default_factory=dict)
bind_timeout: int | float | None = None
# will be used during message consumption
consumer_arguments: FieldTable = field(default_factory=dict)
@property
def delay_queue_name(self) -> str:
"""Return the name of the delay queue for this queue."""
return f"{self.name}.delay"
@property
def delay_queue_routing_key(self) -> str:
"""Return the routing key used to publish messages to the delay queue."""
return self.delay_queue_name
@property
def queue_routing_key(self) -> str:
"""Return the effective routing key for this queue."""
return self.routing_key or self.name