fix: In AsyncPipeline cancel inflight tasks if a PipelineRuntimeError is raised by another component#11499
fix: In AsyncPipeline cancel inflight tasks if a PipelineRuntimeError is raised by another component#11499sjrl wants to merge 1 commit into
AsyncPipeline cancel inflight tasks if a PipelineRuntimeError is raised by another component#11499Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub. 1 Skipped Deployment
|
Coverage reportClick to see where and how coverage changed
This report was generated by python-coverage-comment-action |
||||||||||||||||||||||||
Code reviewNo issues found. Checked for bugs and CLAUDE.md compliance. |
anakin87
left a comment
There was a problem hiding this comment.
Thank you. I left two comments.
Was wondering since we are leaning towards merging the two classes if it would be possible to also bring concurrency to Pipeline.run.
I would support this if feasible. Concurrency!=async and this would make things clearer.
| # A component failed. Cancel and drain the remaining in-flight tasks so they don't keep running in | ||
| # the background (and leak) after the run is aborted, then re-raise the original error. | ||
| await AsyncPipeline._cancel_in_flight_tasks(running_tasks, scheduled_components) | ||
| raise |
There was a problem hiding this comment.
we might also want to apply this cancellation logic to run_async_generator to take into account early stop in iteration
we could do this by something similar (in run_async_generator)
try:
while True:
...
finally:
if running_tasks:
await AsyncPipeline._cancel_in_flight_tasks(running_tasks, scheduled_components)WDYT?
| Fixed a task leak in ``AsyncPipeline`` when running components concurrently. Previously, if one component | ||
| raised an error while sibling components were still running, those in-flight tasks were neither awaited nor | ||
| cancelled and kept running in the background until the event loop was torn down. They are now cancelled and | ||
| drained before the original error is re-raised. |
There was a problem hiding this comment.
this seems to be only valid for native async components. Sync ones run using loop.run_in_executor and it seems not to be an easy way to cancel them. Maybe we should add a quick not on this aspect?
Related Issues
Proposed Changes:
Fixes an issue that any inflight tasks would leak if a component raised an error in
AsyncPipelinewith concurrency > 1How did you test it?
Added new unit test
Notes for the reviewer
Found this while investigating merging
PipelineandAsyncPipeline. Was wondering since we are leaning towards merging the two classes if it would be possible to also bring concurrency toPipeline.run. This is somewhat complicated by theBreakPointfeature though since the creation of aPipelineSnaphsotis only valid if there are no in-flight tasks.Checklist
fix:,feat:,build:,chore:,ci:,docs:,style:,refactor:,perf:,test:and added!in case the PR includes breaking changes.