fix: In AsyncPipeline cancel inflight tasks if a PipelineRuntimeError is raised by another component#11499

Open
sjrl wants to merge 1 commit into v3 from cancel-inflight-tasks-async-pipeline

@sjrl
Contributor

@sjrl sjrl commented Jun 3, 2026

Related Issues

  • fixes #issue-number

Proposed Changes:

Fixes an issue where in-flight tasks would leak if a component raised an error in AsyncPipeline with concurrency > 1.
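
For readers unfamiliar with the pattern, here is a minimal, standalone sketch of "cancel and drain on failure" using plain asyncio. It is not the code in this PR: `cancel_and_drain` and `run_all` are hypothetical names chosen for illustration, and the sketch assumes the components are native coroutines.

```python
import asyncio


async def cancel_and_drain(tasks):
    """Cancel every unfinished task, then await them all so none is left running."""
    for task in tasks:
        if not task.done():
            task.cancel()
    # return_exceptions=True collects CancelledError results instead of raising them.
    await asyncio.gather(*tasks, return_exceptions=True)


async def run_all(coros):
    """Run coroutines concurrently; on the first failure, clean up the rest and re-raise."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    pending = set(tasks)
    try:
        while pending:
            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                task.result()  # re-raises the exception of a failed task
    except Exception:
        await cancel_and_drain(tasks)
        raise
    return [task.result() for task in tasks]


async def demo():
    state = {"slow_cancelled": False}

    async def failing():
        await asyncio.sleep(0.01)
        raise RuntimeError("component failed")

    async def slow():
        try:
            await asyncio.sleep(10)
        except asyncio.CancelledError:
            state["slow_cancelled"] = True
            raise

    try:
        await run_all([failing(), slow()])
    except RuntimeError as err:
        return str(err), state["slow_cancelled"]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Without the `except` branch in `run_all`, the `slow` task would still be scheduled on the event loop after the error propagated to the caller.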

How did you test it?

Added new unit test

Notes for the reviewer

Found this while investigating merging Pipeline and AsyncPipeline. Since we are leaning towards merging the two classes, I was wondering whether it would be possible to also bring concurrency to Pipeline.run. This is somewhat complicated by the BreakPoint feature, though, since creating a PipelineSnapshot is only valid if there are no in-flight tasks.

Checklist

  • I have read the contributors guidelines and the code of conduct.
  • I have updated the related issue with new insights and changes.
  • I have added unit tests and updated the docstrings.
  • I've used one of the conventional commit types for my PR title: fix:, feat:, build:, chore:, ci:, docs:, style:, refactor:, perf:, test: and added ! in case the PR includes breaking changes.
  • I have documented my code.
  • I have added a release note file, following the contributors guidelines.
  • I have run pre-commit hooks and fixed any issue.

@sjrl sjrl requested a review from a team as a code owner June 3, 2026 12:21
@sjrl sjrl requested review from anakin87 and removed request for a team June 3, 2026 12:21
@vercel

vercel Bot commented Jun 3, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

1 Skipped Deployment
Project Deployment Actions Updated (UTC)
haystack-docs Ignored Ignored Preview Jun 3, 2026 12:21pm

@sjrl sjrl self-assigned this Jun 3, 2026
@github-actions
Contributor

github-actions Bot commented Jun 3, 2026

Coverage report

File | Statements | Missing | Coverage | Coverage (new stmts) | Lines missing
  haystack/core/pipeline
  async_pipeline.py
Project Total

This report was generated by python-coverage-comment-action

@claude

claude Bot commented Jun 3, 2026

Code review

No issues found. Checked for bugs and CLAUDE.md compliance.

Member

@anakin87 anakin87 left a comment

Thank you. I left two comments.

Was wondering since we are leaning towards merging the two classes if it would be possible to also bring concurrency to Pipeline.run.

I would support this if feasible. Concurrency != async, and this would make things clearer.

# A component failed. Cancel and drain the remaining in-flight tasks so they don't keep running in
# the background (and leak) after the run is aborted, then re-raise the original error.
await AsyncPipeline._cancel_in_flight_tasks(running_tasks, scheduled_components)
raise
Member

We might also want to apply this cancellation logic to run_async_generator, to account for the consumer stopping iteration early.

We could do this with something similar (in run_async_generator):

              try:
                  while True:
                      ...
              finally:
                  if running_tasks:
                      await AsyncPipeline._cancel_in_flight_tasks(running_tasks, scheduled_components)

WDYT?
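
To make the early-stop case concrete, here is a rough standalone sketch of the same `try`/`finally` idea in a plain asyncio async generator. `stream_results` and `component` are hypothetical names, not Haystack APIs, and this is only an illustration of the pattern. One caveat worth keeping in mind: breaking out of an `async for` does not by itself run the generator's `finally` block promptly, so the consumer in the sketch closes the generator explicitly with `aclose()`.

```python
import asyncio


async def stream_results(coros):
    """Yield results as tasks finish; cancel whatever is still pending on exit."""
    pending = {asyncio.ensure_future(c) for c in coros}
    try:
        while pending:
            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                yield task.result()
    finally:
        # Reached on normal exhaustion, on error, and when the consumer closes
        # the generator before it is exhausted.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)


async def demo():
    cancelled = []

    async def component(name, delay):
        try:
            await asyncio.sleep(delay)
            return name
        except asyncio.CancelledError:
            cancelled.append(name)
            raise

    gen = stream_results([component("fast", 0.01), component("slow", 10)])
    try:
        first = await gen.__anext__()  # take one result, then stop early
    finally:
        await gen.aclose()  # triggers the generator's finally block
    return first, cancelled


if __name__ == "__main__":
    print(asyncio.run(demo()))
```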

Fixed a task leak in ``AsyncPipeline`` when running components concurrently. Previously, if one component
raised an error while sibling components were still running, those in-flight tasks were neither awaited nor
cancelled and kept running in the background until the event loop was torn down. They are now cancelled and
drained before the original error is re-raised.
Member

This seems to be valid only for native async components. Sync ones run via loop.run_in_executor, and there does not seem to be an easy way to cancel them. Maybe we should add a quick note on this aspect?
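
A small standalone sketch of the limitation, using only the standard library (`blocking_component` is a hypothetical stand-in for a sync component, not Haystack code): cancelling the awaitable returned by `loop.run_in_executor` marks it as cancelled on the event-loop side, but the function already running in the worker thread keeps going until it returns.

```python
import asyncio
import threading
import time


def blocking_component(finished: threading.Event) -> None:
    """Stand-in for a sync component: blocks, then records that it ran to completion."""
    time.sleep(0.2)
    finished.set()


async def demo():
    finished = threading.Event()
    loop = asyncio.get_running_loop()

    future = loop.run_in_executor(None, blocking_component, finished)
    await asyncio.sleep(0.05)  # give the worker thread time to start

    future.cancel()  # cancels the asyncio-side future only
    try:
        await future
    except asyncio.CancelledError:
        pass
    cancelled_on_loop = future.cancelled()

    # Wait (in another worker thread, so the loop is not blocked) to see whether
    # the original function still finished despite the cancellation.
    thread_still_finished = await loop.run_in_executor(None, finished.wait, 2.0)
    return cancelled_on_loop, thread_still_finished


if __name__ == "__main__":
    print(asyncio.run(demo()))
```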
