-
Notifications
You must be signed in to change notification settings - Fork 391
EventQueue: enqueue items in child queues without blocking #860
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 1.0-dev
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,10 +41,11 @@ def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None: | |
| self._children: list[EventQueue] = [] | ||
| self._is_closed = False | ||
| self._lock = asyncio.Lock() | ||
| self._bg_tasks: set[asyncio.Task[None]] = set() | ||
| logger.debug('EventQueue initialized.') | ||
|
|
||
| async def enqueue_event(self, event: Event) -> None: | ||
| """Enqueues an event to this queue and all its children. | ||
| """Enqueues an event to this queue and propagates it to all child queues. | ||
|
|
||
| Args: | ||
| event: The event object to enqueue. | ||
|
|
@@ -59,7 +60,12 @@ async def enqueue_event(self, event: Event) -> None: | |
| # Make sure to use put instead of put_nowait to avoid blocking the event loop. | ||
| await self.queue.put(event) | ||
| for child in self._children: | ||
| await child.enqueue_event(event) | ||
| # We use a background task to enqueue to children to avoid blocking | ||
| # the parent queue if a child queue is full (e.g. slow consumer). | ||
| # This prevents deadlocks where a slow consumer blocks the producer. | ||
| task = asyncio.create_task(child.enqueue_event(event)) | ||
| self._bg_tasks.add(task) | ||
| task.add_done_callback(self._bg_tasks.discard) | ||
|
|
||
| async def dequeue_event(self, no_wait: bool = False) -> Event: | ||
| """Dequeues an event from the queue. | ||
|
|
@@ -132,6 +138,17 @@ def tap(self) -> 'EventQueue': | |
| self._children.append(queue) | ||
| return queue | ||
|
|
||
| async def flush(self) -> None: | ||
| """Waits for all pending background propagation tasks to complete recursively.""" | ||
| while self._bg_tasks: | ||
| # Copy the set to avoid "Set changed size during iteration" | ||
| tasks = list(self._bg_tasks) | ||
| if tasks: | ||
| await asyncio.gather(*tasks, return_exceptions=True) | ||
|
|
||
| if self._children: | ||
| await asyncio.gather(*(child.flush() for child in self._children)) | ||
|
|
||
| async def close(self, immediate: bool = False) -> None: | ||
| """Closes the queue for future push events and also closes all child queues. | ||
|
|
||
|
|
@@ -161,6 +178,12 @@ async def close(self, immediate: bool = False) -> None: | |
| return | ||
| if not self._is_closed: | ||
| self._is_closed = True | ||
|
|
||
| if immediate: | ||
| # Cancel all pending background propagation tasks | ||
| for task in self._bg_tasks: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| task.cancel() | ||
|
|
||
| # If using python 3.13 or higher, use shutdown but match <3.13 semantics | ||
| if sys.version_info >= (3, 13): | ||
| if immediate: | ||
|
|
@@ -170,10 +193,12 @@ async def close(self, immediate: bool = False) -> None: | |
| for child in self._children: | ||
| await child.close(True) | ||
| return | ||
| # Graceful: prevent further gets/puts via shutdown, then wait for drain and children | ||
| # Graceful: prevent further gets/puts via shutdown, then wait for drain, propagation and children | ||
| self.queue.shutdown(False) | ||
| await asyncio.gather( | ||
| self.queue.join(), *(child.close() for child in self._children) | ||
| self.queue.join(), | ||
| self.flush(), | ||
| *(child.close() for child in self._children), | ||
| ) | ||
| # Otherwise, join the queue | ||
| else: | ||
|
|
@@ -182,8 +207,11 @@ async def close(self, immediate: bool = False) -> None: | |
| for child in self._children: | ||
| await child.close(immediate) | ||
| return | ||
| # Graceful: wait for drain, propagation and children | ||
| await asyncio.gather( | ||
| self.queue.join(), *(child.close() for child in self._children) | ||
| self.queue.join(), | ||
| self.flush(), | ||
| *(child.close() for child in self._children), | ||
| ) | ||
|
|
||
| def is_closed(self) -> bool: | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -160,6 +160,9 @@ async def test_enqueue_event_propagates_to_children( | |||||||||||||||
| await event_queue.enqueue_event(event1) | ||||||||||||||||
| await event_queue.enqueue_event(event2) | ||||||||||||||||
|
|
||||||||||||||||
| # Wait for all background tasks to complete | ||||||||||||||||
| await event_queue.flush() | ||||||||||||||||
|
|
||||||||||||||||
| # Check parent queue | ||||||||||||||||
| assert await event_queue.dequeue_event(no_wait=True) == event1 | ||||||||||||||||
| assert await event_queue.dequeue_event(no_wait=True) == event2 | ||||||||||||||||
|
|
@@ -203,6 +206,36 @@ async def test_enqueue_event_when_closed( | |||||||||||||||
| await child_queue.dequeue_event(no_wait=True) | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| @pytest.mark.asyncio | ||||||||||||||||
| async def test_event_queue_slow_consumer_does_not_block_parent( | ||||||||||||||||
| event_queue: EventQueue, | ||||||||||||||||
| ) -> None: | ||||||||||||||||
| """Test that a slow or blocked consumer on a tapped queue doesn't block the parent queue.""" | ||||||||||||||||
| child_queue = event_queue.tap() | ||||||||||||||||
|
|
||||||||||||||||
| # Artificially limit the child queue to a size of 1 so it fills up instantly | ||||||||||||||||
| child_queue.queue = asyncio.Queue(maxsize=1) | ||||||||||||||||
|
|
||||||||||||||||
| # Enqueue first event. It should fit in the child queue. | ||||||||||||||||
| event1 = create_sample_message('1') | ||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||
| await event_queue.enqueue_event(event1) | ||||||||||||||||
|
|
||||||||||||||||
| # Enqueue second event. The child queue is now full. | ||||||||||||||||
| # If the parent blocks on `await child_queue.enqueue_event()`, this will hang. | ||||||||||||||||
| event2 = create_sample_message('2') | ||||||||||||||||
| try: | ||||||||||||||||
| # Give it a short timeout. If it hangs, it means the parent is blocked. | ||||||||||||||||
| await asyncio.wait_for(event_queue.enqueue_event(event2), timeout=0.1) | ||||||||||||||||
|
Comment on lines
+225
to
+228
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||||||||||||
| except asyncio.TimeoutError: | ||||||||||||||||
| pytest.fail( | ||||||||||||||||
| 'Parent EventQueue was blocked by a full child queue (slow consumer)!' | ||||||||||||||||
| ) | ||||||||||||||||
|
|
||||||||||||||||
| # Clean up to prevent background tasks from leaking or complaining | ||||||||||||||||
| await child_queue.dequeue_event() | ||||||||||||||||
| await child_queue.dequeue_event() | ||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current cleanup logic might be racy. After the first
Suggested change
|
||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| @pytest.fixture | ||||||||||||||||
| def expected_queue_closed_exception() -> type[Exception]: | ||||||||||||||||
| if sys.version_info < (3, 13): | ||||||||||||||||
|
|
@@ -420,6 +453,9 @@ async def test_close_immediate_propagates_to_children( | |||||||||||||||
| event = create_sample_message() | ||||||||||||||||
| await event_queue.enqueue_event(event) | ||||||||||||||||
|
|
||||||||||||||||
| # Wait for background propagation to finish | ||||||||||||||||
| await event_queue.flush() | ||||||||||||||||
|
|
||||||||||||||||
| assert child_queue.is_closed() is False | ||||||||||||||||
| assert child_queue.queue.empty() is False | ||||||||||||||||
|
|
||||||||||||||||
|
|
@@ -440,6 +476,9 @@ async def test_clear_events_current_queue_only(event_queue: EventQueue) -> None: | |||||||||||||||
| await event_queue.enqueue_event(event1) | ||||||||||||||||
| await event_queue.enqueue_event(event2) | ||||||||||||||||
|
|
||||||||||||||||
| # Wait for all background tasks to complete | ||||||||||||||||
| await event_queue.flush() | ||||||||||||||||
|
|
||||||||||||||||
| # Clear only parent queue | ||||||||||||||||
| await event_queue.clear_events(clear_child_queues=False) | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
if tasks:check is redundant because thewhile self._bg_tasks:loop on line 143 already ensures thattaskswill be a non-empty list here. You can remove this conditional and un-indent the following line for simplification.