-
Notifications
You must be signed in to change notification settings - Fork 425
Expand file tree
/
Copy pathevent_queue.py
More file actions
272 lines (229 loc) · 11 KB
/
event_queue.py
File metadata and controls
272 lines (229 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
import asyncio
import logging
import sys
from a2a.types.a2a_pb2 import (
Message,
Task,
TaskArtifactUpdateEvent,
TaskStatusUpdateEvent,
)
from a2a.utils.telemetry import SpanKind, trace_class
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Union of every message type that may travel through an `EventQueue`.
Event = Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent
"""Type alias for events that can be enqueued."""

# Capacity used by `EventQueue.__init__` when the caller does not pass
# `max_queue_size`.
DEFAULT_MAX_QUEUE_SIZE = 1024
@trace_class(kind=SpanKind.SERVER)
class EventQueue:
    """Event queue for A2A responses from agent.

    Acts as a buffer between the agent's asynchronous execution and the
    server's response handling (e.g., streaming via SSE). Supports tapping
    to create child queues that receive the same events.
    """

    def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None:
        """Initializes the EventQueue.

        Args:
            max_queue_size: Maximum number of events the underlying
                `asyncio.Queue` may hold. Must be greater than 0.

        Raises:
            ValueError: If `max_queue_size` is not greater than 0.
        """
        # Make sure the `asyncio.Queue` is bounded.
        # If it's unbounded (maxsize=0), then `queue.put()` never needs to wait,
        # and so the streaming won't work correctly.
        if max_queue_size <= 0:
            raise ValueError('max_queue_size must be greater than 0')

        self.queue: asyncio.Queue[Event] = asyncio.Queue(maxsize=max_queue_size)
        # Child queues created via `tap()`; each receives every event
        # enqueued on this queue after it was tapped.
        self._children: list[EventQueue] = []
        self._is_closed = False
        # Guards reads/writes of `_is_closed` and the draining loop in
        # `clear_events()`.
        self._lock = asyncio.Lock()
        # Strong references to in-flight child-propagation tasks so they are
        # not garbage collected before they finish.
        self._bg_tasks: set[asyncio.Task[None]] = set()
        logger.debug('EventQueue initialized.')

    async def enqueue_event(self, event: Event) -> None:
        """Enqueues an event to this queue and propagates it to all child queues.

        If the queue is already closed, a warning is logged and the event is
        dropped.

        Args:
            event: The event object to enqueue.
        """
        async with self._lock:
            if self._is_closed:
                logger.warning('Queue is closed. Event will not be enqueued.')
                return

        logger.debug('Enqueuing event of type: %s', type(event))

        # Make sure to use put instead of put_nowait to avoid blocking the event loop.
        await self.queue.put(event)
        for child in self._children:
            # We use a background task to enqueue to children to avoid blocking
            # the parent queue if a child queue is full (e.g. slow consumer).
            # This prevents deadlocks where a slow consumer blocks the producer.
            task = asyncio.create_task(child.enqueue_event(event))
            self._bg_tasks.add(task)
            task.add_done_callback(self._bg_tasks.discard)

    async def dequeue_event(self, no_wait: bool = False) -> Event:
        """Dequeues an event from the queue.

        This implementation expects dequeue to raise an exception when the
        queue has been closed. In Python 3.13+ this is naturally provided by
        the `QueueShutDown` exception raised when the queue has been shut down
        and the caller is awaiting `queue.get()`. On Python <= 3.12 this class
        has to manage that lifecycle itself.

        The current implementation can block if `dequeue_event` is called
        before the EventQueue has been closed while there are no events on
        the queue. Two ways to avoid this are:

        - call this with `no_wait=True`, which won't block, but makes retrying
          the caller's responsibility; or
        - use an async task management solution to cancel the `get` call if
          the queue has closed or some other condition is met. The
          EventConsumer uses a wait with a timeout to abort the
          `dequeue_event` call and retry, at which point it returns with a
          closed error.

        Args:
            no_wait: If True, retrieve an event immediately or raise `asyncio.QueueEmpty`.
                     If False (default), wait until an event is available.

        Returns:
            The next event from the queue.

        Raises:
            asyncio.QueueEmpty: If `no_wait` is True and the queue is empty,
                or (Python < 3.13) if the queue is closed and empty.
            asyncio.QueueShutDown: If the queue has been closed and is empty
                (Python >= 3.13).
        """
        async with self._lock:
            if (
                sys.version_info < (3, 13)
                and self._is_closed
                and self.queue.empty()
            ):
                # On 3.13+, skip early raise; await self.queue.get() will raise QueueShutDown after shutdown()
                logger.warning('Queue is closed. Event will not be dequeued.')
                raise asyncio.QueueEmpty('Queue is closed.')

        if no_wait:
            logger.debug('Attempting to dequeue event (no_wait=True).')
            event = self.queue.get_nowait()
            logger.debug(
                'Dequeued event (no_wait=True) of type: %s', type(event)
            )
            return event

        logger.debug('Attempting to dequeue event (waiting).')
        event = await self.queue.get()
        logger.debug('Dequeued event (waited) of type: %s', type(event))
        return event

    def task_done(self) -> None:
        """Signals that a formerly enqueued task is complete.

        Used in conjunction with `dequeue_event` to track processed items.
        """
        logger.debug('Marking task as done in EventQueue.')
        self.queue.task_done()

    def tap(self) -> 'EventQueue':
        """Taps the event queue to create a new child queue that receives all future events.

        The child is created with the default queue size.

        Returns:
            A new `EventQueue` instance that will receive all events enqueued
            to this parent queue from this point forward.
        """
        logger.debug('Tapping EventQueue to create a child queue.')
        queue = EventQueue()
        self._children.append(queue)
        return queue

    async def flush(self) -> None:
        """Waits for all pending background propagation tasks to complete recursively.

        Exceptions raised by (or cancellation of) individual propagation
        tasks are collected by `gather` and not re-raised here.
        """
        while self._bg_tasks:
            # Snapshot the set: done-callbacks mutate it while we are waiting.
            tasks = list(self._bg_tasks)
            await asyncio.gather(*tasks, return_exceptions=True)
            # Every task in the snapshot has finished at this point. Drop them
            # explicitly instead of relying solely on the `discard`
            # done-callback: gathering tasks that are all already finished can
            # complete without yielding to the event loop (CPython 3.12+), in
            # which case the callbacks would never get a chance to run and
            # this loop would spin forever.
            self._bg_tasks.difference_update(tasks)

        if self._children:
            await asyncio.gather(*(child.flush() for child in self._children))

    async def close(self, immediate: bool = False) -> None:
        """Closes the queue for future push events and also closes all child queues.

        Once closed, no new events can be enqueued. Behavior is consistent across
        Python versions:

        - Python >= 3.13: Uses `asyncio.Queue.shutdown` to stop the queue. With
          `immediate=True` the queue is shut down and pending events are cleared; with
          `immediate=False` the queue is shut down and we wait for it to drain via
          `queue.join()`.
        - Python < 3.13: Emulates the same semantics by clearing on `immediate=True`
          or awaiting `queue.join()` on `immediate=False`.

        Consumers attempting to dequeue after close on an empty queue will observe
        `asyncio.QueueShutDown` on Python >= 3.13 and `asyncio.QueueEmpty` on
        Python < 3.13.

        Args:
            immediate (bool):
                - True: Immediately closes the queue and clears all unprocessed
                  events without waiting for them to be consumed. This is
                  suitable for scenarios where you need to forcefully
                  interrupt and quickly release resources.
                - False (default): Gracefully closes the queue, waiting for
                  all queued events to be processed (i.e., the queue is
                  drained) before closing. This is suitable when you want to
                  ensure all events are handled.
        """
        logger.debug('Closing EventQueue.')
        async with self._lock:
            # If already closed, just return. An immediate close is still
            # honoured on an already-closed queue so pending events get
            # discarded.
            if self._is_closed and not immediate:
                return
            self._is_closed = True

        if immediate:
            # Cancel all pending background propagation tasks
            for task in self._bg_tasks:
                task.cancel()

        # `asyncio.Queue.shutdown` only exists on Python 3.13+; older versions
        # rely on `_is_closed` alone. Everything after this call is identical
        # on every Python version.
        if sys.version_info >= (3, 13):
            self.queue.shutdown(immediate)

        if immediate:
            # Immediate: clear any pending events, then close children.
            # The lock has been released above; `clear_events` acquires it.
            await self.clear_events(True)
            for child in self._children:
                await child.close(True)
            return

        # Graceful: wait for drain, propagation and children
        await asyncio.gather(
            self.queue.join(),
            self.flush(),
            *(child.close() for child in self._children),
        )

    def is_closed(self) -> bool:
        """Checks if the queue is closed."""
        return self._is_closed

    async def clear_events(self, clear_child_queues: bool = True) -> None:
        """Clears all events from the current queue and optionally all child queues.

        This method removes all pending events from the queue without processing them.
        Child queues can be optionally cleared based on the clear_child_queues parameter.

        Args:
            clear_child_queues: If True (default), clear all child queues as well.
                              If False, only clear the current queue, leaving child queues untouched.
        """
        logger.debug('Clearing all events from EventQueue and child queues.')

        # `asyncio.QueueShutDown` exists only on Python 3.13+, where
        # `get_nowait()` raises it once a shut-down queue is empty. Fall back
        # to `QueueEmpty` so the `except` tuple is valid on older versions.
        queue_shut_down = getattr(asyncio, 'QueueShutDown', asyncio.QueueEmpty)

        # Clear all events from the queue, even if closed
        cleared_count = 0
        async with self._lock:
            try:
                while True:
                    event = self.queue.get_nowait()
                    logger.debug(
                        'Discarding unprocessed event of type: %s, content: %s',
                        type(event),
                        event,
                    )
                    # Keep `queue.join()` accounting balanced for the
                    # discarded item.
                    self.queue.task_done()
                    cleared_count += 1
            except (asyncio.QueueEmpty, queue_shut_down):
                # Nothing left to discard.
                pass

            if cleared_count > 0:
                logger.debug(
                    'Cleared %d unprocessed events from EventQueue.',
                    cleared_count,
                )

        # Clear all child queues (lock released before awaiting child tasks)
        if clear_child_queues and self._children:
            child_tasks = [
                asyncio.create_task(child.clear_events())
                for child in self._children
            ]
            await asyncio.gather(*child_tasks, return_exceptions=True)