EventQueue: enqueue items in child queues without blocking #860

Draft
bartek-w wants to merge 1 commit into 1.0-dev from bartekw-event-queue

@bartek-w
Collaborator

This change allows EventQueue.enqueue_event(...) to return to the caller immediately, without blocking.

The issue it addresses: a blocked child queue used to block enqueueing to all the other child queues. With this change it no longer does, and it no longer blocks the caller either (which, in some scenarios and with the current architecture, could lead to deadlock).

Additional cost: We store all the enqueued tasks in _bg_tasks.

@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request modifies the EventQueue class to enqueue events to child queues without blocking the parent queue. It introduces background tasks for event propagation and includes mechanisms for managing and cleaning up these tasks during queue closure. This change prevents potential deadlocks and ensures that the event queue can be closed gracefully or immediately.

Highlights

  • Non-blocking Enqueue: The enqueue_event method now uses background tasks to propagate events to child queues, preventing blocking when a child queue is full.
  • Deadlock Prevention: This change avoids potential deadlocks that could occur when a slow consumer blocks the parent queue.
  • Background Task Management: Introduced _bg_tasks to track and manage background tasks, including a flush method to wait for their completion and cancellation during immediate close.
  • Flush Method: Added a flush method to ensure all background tasks for event propagation are completed before closing the queue.
  • Immediate Close: The close method now cancels pending background tasks when called with immediate=True.
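
The highlights above can be illustrated with a minimal sketch. It is not the code from this pull request: `SketchEventQueue`, its `tap` helper, and the use of plain `asyncio.Queue` objects as children are assumptions made for illustration. Only the names `enqueue_event`, `flush`, `close(immediate=...)`, and `_bg_tasks` come from the discussion.

```python
import asyncio


class SketchEventQueue:
    """Hypothetical stand-in for EventQueue; not the code from this PR."""

    def __init__(self):
        self.queue = asyncio.Queue()  # parent queue, unbounded in this sketch
        self._children = []           # child asyncio.Queue objects
        self._bg_tasks = set()        # in-flight propagation tasks

    def tap(self, maxsize=0):
        """Create and register a child queue (maxsize=0 means unbounded)."""
        child = asyncio.Queue(maxsize=maxsize)
        self._children.append(child)
        return child

    async def enqueue_event(self, event):
        self.queue.put_nowait(event)  # never blocks: the parent is unbounded
        for child in self._children:
            # put() may wait on a full child, so it runs in the background.
            task = asyncio.create_task(child.put(event))
            self._bg_tasks.add(task)
            # Each task removes itself from the set once it is done.
            task.add_done_callback(self._bg_tasks.discard)

    async def flush(self):
        """Wait until every background propagation task has finished."""
        while self._bg_tasks:
            await asyncio.gather(*list(self._bg_tasks), return_exceptions=True)

    async def close(self, immediate=False):
        if immediate:
            # Cancel whatever is still waiting on a full child queue.
            for task in list(self._bg_tasks):
                task.cancel()
        await self.flush()


async def demo():
    parent = SketchEventQueue()
    child = parent.tap(maxsize=1)
    await parent.enqueue_event("event1")
    await parent.enqueue_event("event2")  # returns although the child is full
    await asyncio.sleep(0.01)             # let the background tasks run
    in_child = child.qsize()
    await parent.close(immediate=True)    # cancels the task stuck on event2
    return in_child, len(parent._bg_tasks)
```

Calling `asyncio.run(demo())` exercises the case the PR describes: the second `enqueue_event` returns even though the child queue has room for only one item, and an immediate close leaves no tracked tasks behind.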

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a non-blocking mechanism for propagating events to child queues in EventQueue, which is a solid improvement to prevent deadlocks with slow consumers. The use of background tasks is well-implemented, and the addition of flush() and updates to close() are necessary and correctly placed. I've identified a potential race condition in the close method and a few areas for minor improvements in both the implementation and tests to enhance robustness and readability. Overall, this is a great change that improves the resilience of the event system.


if immediate:
    # Cancel all pending background propagation tasks
    for task in self._bg_tasks:

high

Iterating directly over self._bg_tasks is not safe here. If a task being cancelled finishes quickly, its done_callback will modify the set, leading to a RuntimeError: Set changed size during iteration. You should iterate over a copy of the set to prevent this.

            for task in list(self._bg_tasks):
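
A small sketch of the snapshot pattern suggested here, under stated assumptions: `cancel_all` and `demo` are hypothetical helpers, and the tasks are assumed to remove themselves from the set through a done callback, as the review describes. Taking the copy is cheap and keeps the loop independent of when those callbacks run.

```python
import asyncio


async def cancel_all(bg_tasks):
    """Cancel every task in bg_tasks and wait until they have finished."""
    pending = list(bg_tasks)  # snapshot: later set mutations cannot affect the loop
    for task in pending:
        task.cancel()
    # return_exceptions=True collects each CancelledError instead of raising it here.
    await asyncio.gather(*pending, return_exceptions=True)
    return pending


async def demo():
    bg_tasks = set()
    for _ in range(3):
        task = asyncio.create_task(asyncio.sleep(60))
        bg_tasks.add(task)
        task.add_done_callback(bg_tasks.discard)  # self-removal on completion
    cancelled = await cancel_all(bg_tasks)
    return all(t.cancelled() for t in cancelled), len(bg_tasks)
```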


# Clean up to prevent background tasks from leaking or complaining
await child_queue.dequeue_event()
await child_queue.dequeue_event()

medium

The current cleanup logic might be racy. After the first dequeue_event, the blocked background task for the second event can proceed, but it's not guaranteed to have completed before the second dequeue_event is called. This could lead to a flaky test. Using event_queue.flush() would make this more robust.

Suggested change
- await child_queue.dequeue_event()
+ await event_queue.flush()  # Wait for the background task for event2 to complete
+ await child_queue.dequeue_event()  # Dequeue event2

Comment on lines +146 to +147
if tasks:
    await asyncio.gather(*tasks, return_exceptions=True)

low

The if tasks: check is redundant because the while self._bg_tasks: loop on line 143 already ensures that tasks will be a non-empty list here. You can remove this conditional and un-indent the following line for simplification.

            await asyncio.gather(*tasks, return_exceptions=True)
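
For context, a flush loop of this shape might look like the sketch below. It is an assumption about the surrounding code, not the PR's implementation: `spawn` and the free function `flush` are hypothetical. It shows why the snapshot is never empty inside the loop body, and why a `while` loop is used rather than a single `gather`: tasks added while waiting are picked up on the next round.

```python
import asyncio


def spawn(coro, bg_tasks):
    """Start coro as a task tracked in bg_tasks until it completes."""
    task = asyncio.create_task(coro)
    bg_tasks.add(task)
    task.add_done_callback(bg_tasks.discard)
    return task


async def flush(bg_tasks):
    """Wait for all tracked tasks, including ones added while waiting."""
    rounds = 0
    while bg_tasks:
        tasks = list(bg_tasks)  # non-empty: the loop condition was just checked
        await asyncio.gather(*tasks, return_exceptions=True)
        rounds += 1
    # Assumes every task discards itself when done; otherwise this never exits.
    return rounds


async def demo():
    bg_tasks = set()
    finished = []

    async def second():
        await asyncio.sleep(0.01)
        finished.append("second")

    async def first():
        await asyncio.sleep(0.01)
        finished.append("first")
        spawn(second(), bg_tasks)  # new work appears during the flush

    spawn(first(), bg_tasks)
    rounds = await flush(bg_tasks)
    return finished, rounds, len(bg_tasks)
```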

child_queue.queue = asyncio.Queue(maxsize=1)

# Enqueue first event. It should fit in the child queue.
event1 = create_sample_message('1')

low

The event1 variable is assigned but never used. You can enqueue the event directly to improve readability.

Suggested change
- event1 = create_sample_message('1')
+ await event_queue.enqueue_event(create_sample_message('1'))

Comment on lines +225 to +228
event2 = create_sample_message('2')
try:
    # Give it a short timeout. If it hangs, it means the parent is blocked.
    await asyncio.wait_for(event_queue.enqueue_event(event2), timeout=0.1)

low

The event2 variable is assigned but never used. You can remove it and create the message directly inside the enqueue_event call to make the code cleaner.

Suggested change
- event2 = create_sample_message('2')
- try:
-     # Give it a short timeout. If it hangs, it means the parent is blocked.
-     await asyncio.wait_for(event_queue.enqueue_event(event2), timeout=0.1)
+ try:
+     # Give it a short timeout. If it hangs, it means the parent is blocked.
+     await asyncio.wait_for(event_queue.enqueue_event(create_sample_message('2')), timeout=0.1)
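
The timeout guard used in this test can be sketched in isolation. `returns_promptly` is a hypothetical helper, and a bounded `asyncio.Queue` stands in for the blocked child queue; `asyncio.wait_for` raises `asyncio.TimeoutError` when the awaited call does not finish in time, which is how the test detects a blocked parent.

```python
import asyncio


async def returns_promptly(awaitable, timeout=0.1):
    """Return True if awaitable finishes within timeout, False if it hangs."""
    try:
        await asyncio.wait_for(awaitable, timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the awaited call before raising.
        return False
    return True


async def demo():
    full = asyncio.Queue(maxsize=1)
    full.put_nowait("event1")
    blocked = await returns_promptly(full.put("event2"))  # queue is full

    roomy = asyncio.Queue(maxsize=1)
    prompt = await returns_promptly(roomy.put("event1"))  # queue has room
    return blocked, prompt, full.qsize()
```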
