Description
What happened?
In ResultAggregator.consume_and_break_on_interrupt(), when a non-blocking message/send request interrupts early (e.g., blocking=False), the remaining event consumption is delegated to a background task via asyncio.create_task(). However, the returned Task object is not saved, making it eligible for garbage collection.
# src/a2a/server/tasks/result_aggregator.py, line 162
asyncio.create_task(  # noqa: RUF006
    self._continue_consuming(event_stream, event_callback)
)
Per the Python docs:
Important: Save a reference to the result of this function, to avoid a task disappearing mid-execution. The event loop only keeps weak references to tasks. A task that isn't referenced elsewhere may get garbage collected at any time, even before it's done.
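The pattern the asyncio docs recommend is to hold a strong reference in a container and discard it once the task finishes. A minimal standalone sketch of that pattern (names here are illustrative, not from the SDK):

```python
import asyncio

# Strong references keep tasks alive; the event loop itself only holds weak ones.
background_tasks: set[asyncio.Task] = set()


async def work() -> str:
    await asyncio.sleep(0.01)
    return 'done'


async def main() -> str:
    task = asyncio.create_task(work())
    background_tasks.add(task)  # strong reference prevents GC
    task.add_done_callback(background_tasks.discard)  # drop it once finished
    return await task


print(asyncio.run(main()))  # -> done
```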
When this task is garbage collected:
- The event_stream async generator is finalized
- Remaining events (artifacts, completed/failed status updates) are never processed
- The event_callback (push notification sender) never fires for the final task state
- The task remains stuck in the working state permanently from the client's perspective
Steps to Reproduce
- Configure an A2A server with push notifications enabled
- Send a message/send request with MessageSendConfiguration(blocking=False, push_notification_config=...)
- Have the agent executor take a non-trivial amount of time (>10 seconds) to complete
- Observe that the task is acknowledged with working status but never transitions to completed
- The push notification webhook for the final state is never sent
The longer the agent takes, the more likely it is that the GC collects the task. For agents running 60+ seconds, this is essentially 100% reproducible.
Expected Behavior
The background _continue_consuming task should complete, processing all remaining events and firing the push notification callback for each, including the terminal state.
Root Cause
The noqa: RUF006 comment on line 162 suppresses Ruff's asyncio-dangling-task rule, which specifically warns about this pattern. The TODO on line 161 also acknowledges the issue:
# TODO: We should track all outstanding tasks to ensure they eventually complete.
DefaultRequestHandler already has a _background_tasks: set[asyncio.Task] and _track_background_task() method for exactly this purpose, but ResultAggregator doesn't use it.
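For illustration, a sketch of that tracking-helper pattern (the class and method bodies here are assumptions mirroring the names the issue cites; the actual DefaultRequestHandler implementation may differ):

```python
import asyncio


class RequestHandlerSketch:
    """Illustrative only: mirrors the _track_background_task pattern
    the issue attributes to DefaultRequestHandler."""

    def __init__(self) -> None:
        self._background_tasks: set[asyncio.Task] = set()

    def _track_background_task(self, task: asyncio.Task) -> None:
        # Holding the task in a set prevents garbage collection;
        # the done callback removes it once it completes.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)


async def main() -> int:
    handler = RequestHandlerSketch()
    task = asyncio.create_task(asyncio.sleep(0))
    handler._track_background_task(task)
    tracked = len(handler._background_tasks)  # task is held while running
    await task
    await asyncio.sleep(0)  # let the done callback run and discard it
    return tracked


print(asyncio.run(main()))  # -> 1
```

If ResultAggregator surfaced its background task to the handler, this existing mechanism could track it with no new machinery.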
Suggested Fix
Return the background task from consume_and_break_on_interrupt() so the caller can track it, or accept a task-tracking callback. For example:
# Return the background task in the tuple
async def consume_and_break_on_interrupt(
    self, consumer, blocking=True, event_callback=None
) -> tuple[Task | Message | None, bool, asyncio.Task | None]:
    ...
    bg_task = None
    if should_interrupt:
        bg_task = asyncio.create_task(
            self._continue_consuming(event_stream, event_callback)
        )
        interrupted = True
        break
    ...
    return await self.task_manager.get_task(), interrupted, bg_task
Then in DefaultRequestHandler.on_message_send:
result, interrupted_or_non_blocking, bg_task = await result_aggregator.consume_and_break_on_interrupt(...)
if bg_task:
    bg_task.set_name(f'continue_consuming:{task_id}')
    self._track_background_task(bg_task)
Environment
- a2a-sdk version: 0.3.22 (also present on latest main)
- Python: 3.12.12
- OS: macOS (ARM64)
Related
- PR "fix: non-blocking send_message server handler not invoke push notification" #394 introduced the event_callback mechanism but also introduced this untracked task
- Issue "[Feat]: Non blocking with intermediary responses (via push notification)" #239 (original non-blocking push notification bug)