From 694cb0bdca2831095ec75a1886971ac58172c7b1 Mon Sep 17 00:00:00 2001 From: Bartek Wolowiec Date: Tue, 17 Mar 2026 14:38:00 +0000 Subject: [PATCH] EventQueue: enqueue items in child queues without blocking --- src/a2a/server/events/event_queue.py | 38 ++++++++++++++++++++---- tests/server/events/test_event_queue.py | 39 +++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 5 deletions(-) diff --git a/src/a2a/server/events/event_queue.py b/src/a2a/server/events/event_queue.py index d216d7eb..1044eb82 100644 --- a/src/a2a/server/events/event_queue.py +++ b/src/a2a/server/events/event_queue.py @@ -41,10 +41,11 @@ def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None: self._children: list[EventQueue] = [] self._is_closed = False self._lock = asyncio.Lock() + self._bg_tasks: set[asyncio.Task[None]] = set() logger.debug('EventQueue initialized.') async def enqueue_event(self, event: Event) -> None: - """Enqueues an event to this queue and all its children. + """Enqueues an event to this queue and propagates it to all child queues. Args: event: The event object to enqueue. @@ -59,7 +60,12 @@ async def enqueue_event(self, event: Event) -> None: # Make sure to use put instead of put_nowait to avoid blocking the event loop. await self.queue.put(event) for child in self._children: - await child.enqueue_event(event) + # We use a background task to enqueue to children to avoid blocking + # the parent queue if a child queue is full (e.g. slow consumer). + # This prevents deadlocks where a slow consumer blocks the producer. + task = asyncio.create_task(child.enqueue_event(event)) + self._bg_tasks.add(task) + task.add_done_callback(self._bg_tasks.discard) async def dequeue_event(self, no_wait: bool = False) -> Event: """Dequeues an event from the queue. @@ -132,6 +138,17 @@ def tap(self) -> 'EventQueue': self._children.append(queue) return queue + async def flush(self) -> None: + """Waits for all pending background propagation tasks to complete recursively.""" + while self._bg_tasks: + # Copy the set to avoid "Set changed size during iteration" + tasks = list(self._bg_tasks) + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + + if self._children: + await asyncio.gather(*(child.flush() for child in self._children)) + async def close(self, immediate: bool = False) -> None: """Closes the queue for future push events and also closes all child queues. @@ -161,6 +178,12 @@ async def close(self, immediate: bool = False) -> None: return if not self._is_closed: self._is_closed = True + + if immediate: + # Cancel all pending background propagation tasks + for task in self._bg_tasks: + task.cancel() + # If using python 3.13 or higher, use shutdown but match <3.13 semantics if sys.version_info >= (3, 13): if immediate: @@ -170,10 +193,12 @@ async def close(self, immediate: bool = False) -> None: for child in self._children: await child.close(True) return - # Graceful: prevent further gets/puts via shutdown, then wait for drain and children + # Graceful: prevent further gets/puts via shutdown, then wait for drain, propagation and children self.queue.shutdown(False) await asyncio.gather( - self.queue.join(), *(child.close() for child in self._children) + self.queue.join(), + self.flush(), + *(child.close() for child in self._children), ) # Otherwise, join the queue else: @@ -182,8 +207,11 @@ async def close(self, immediate: bool = False) -> None: for child in self._children: await child.close(immediate) return + # Graceful: wait for drain, propagation and children await asyncio.gather( - self.queue.join(), *(child.close() for child in self._children) + self.queue.join(), + self.flush(), + *(child.close() for child in self._children), ) def is_closed(self) -> bool: diff --git a/tests/server/events/test_event_queue.py b/tests/server/events/test_event_queue.py index 686a90b3..ed4a46c8 100644 --- a/tests/server/events/test_event_queue.py +++ b/tests/server/events/test_event_queue.py @@ -160,6 +160,9 @@ async def test_enqueue_event_propagates_to_children( await event_queue.enqueue_event(event1) await event_queue.enqueue_event(event2) + # Wait for all background tasks to complete + await event_queue.flush() + # Check parent queue assert await event_queue.dequeue_event(no_wait=True) == event1 assert await event_queue.dequeue_event(no_wait=True) == event2 @@ -203,6 +206,36 @@ async def test_enqueue_event_when_closed( await child_queue.dequeue_event(no_wait=True) +@pytest.mark.asyncio +async def test_event_queue_slow_consumer_does_not_block_parent( + event_queue: EventQueue, +) -> None: + """Test that a slow or blocked consumer on a tapped queue doesn't block the parent queue.""" + child_queue = event_queue.tap() + + # Artificially limit the child queue to a size of 1 so it fills up instantly + child_queue.queue = asyncio.Queue(maxsize=1) + + # Enqueue first event. It should fit in the child queue. + event1 = create_sample_message('1') + await event_queue.enqueue_event(event1) + + # Enqueue second event. The child queue is now full. + # If the parent blocks on `await child_queue.enqueue_event()`, this will hang. + event2 = create_sample_message('2') + try: + # Give it a short timeout. If it hangs, it means the parent is blocked. + await asyncio.wait_for(event_queue.enqueue_event(event2), timeout=0.1) + except asyncio.TimeoutError: + pytest.fail( + 'Parent EventQueue was blocked by a full child queue (slow consumer)!' + ) + + # Clean up to prevent background tasks from leaking or complaining + await child_queue.dequeue_event() + await child_queue.dequeue_event() + + @pytest.fixture def expected_queue_closed_exception() -> type[Exception]: if sys.version_info < (3, 13): @@ -420,6 +453,9 @@ async def test_close_immediate_propagates_to_children( event = create_sample_message() await event_queue.enqueue_event(event) + # Wait for background propagation to finish + await event_queue.flush() + assert child_queue.is_closed() is False assert child_queue.queue.empty() is False @@ -440,6 +476,9 @@ async def test_clear_events_current_queue_only(event_queue: EventQueue) -> None: await event_queue.enqueue_event(event1) await event_queue.enqueue_event(event2) + # Wait for all background tasks to complete + await event_queue.flush() + # Clear only parent queue await event_queue.clear_events(clear_child_queues=False)