Skip to content

feat: Allow running asyncpipeline.run in a place with an existing loop#11457

Draft
sjrl wants to merge 1 commit into
mainfrom
fix/async-pipeline-run-in-running-loop
Draft

feat: Allow running asyncpipeline.run in a place with an existing loop#11457
sjrl wants to merge 1 commit into
mainfrom
fix/async-pipeline-run-in-running-loop

Conversation

@sjrl
Copy link
Copy Markdown
Contributor

@sjrl sjrl commented Jun 1, 2026

Related Issues

The crux of the problem is if we use AsyncPipeline in a SuperComponent we run into an issue if this super component is called from Pipeline.run in an async application (e.g. Jupyter Notebook or FastAPI app). The call trace would be Pipeline.run -> HybridRetriever.run --> AsyncPipeline.run which would raise an error from AsyncPipeline.run if an existing asyncio loop already exists.

Related to PRs: deepset-ai/haystack-core-integrations#3369 and deepset-ai/haystack-core-integrations#3374

Proposed Changes:

One solution for overcoming the above stated issue by creating a new thread with its own asyncio event loop. I also propose an alternative below.

How did you test it?

New tests

Notes for the reviewer

Alternative

Merging the Pipeline and AsyncPipeline classes into one Pipeline class with a run and run_async methods (like we do for Components) would solve this problem by making run a proper sync call path and keep run_async a proper async path.

Checklist

  • I have read the contributors guidelines and the code of conduct.
  • I have updated the related issue with new insights and changes.
  • I have added unit tests and updated the docstrings.
  • I've used one of the conventional commit types for my PR title: fix:, feat:, build:, chore:, ci:, docs:, style:, refactor:, perf:, test: and added ! in case the PR includes breaking changes.
  • I have documented my code.
  • I have added a release note file, following the contributors guidelines.
  • I have run pre-commit hooks and fixed any issue.

@vercel
Copy link
Copy Markdown

vercel Bot commented Jun 1, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

1 Skipped Deployment
Project Deployment Actions Updated (UTC)
haystack-docs Ignored Ignored Jun 1, 2026 7:16am

Request Review

@sjrl sjrl self-assigned this Jun 1, 2026
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Jun 1, 2026

Coverage report

Click to see where and how coverage changed

FileStatementsMissingCoverageCoverage
(new stmts)
Lines missing
  haystack/core/pipeline
  async_pipeline.py 54-55, 62
Project Total  

This report was generated by python-coverage-comment-action

Copy link
Copy Markdown
Contributor

@mpangrazzi mpangrazzi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks already good to me, I've left a few comments!

:raises RuntimeError:
If called from within an async context. Use `run_async` instead.
"""
coro = self.run_async(data=data, include_outputs_from=include_outputs_from, concurrency_limit=concurrency_limit)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would maybe try to create this coroutine after the exact runner strategy is known

except BaseException as error: # noqa: BLE001 - captured and re-raised in the calling thread
box["error"] = error

thread = threading.Thread(target=_worker, name="haystack-async-pipeline-run", daemon=True)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why using daemon=True here? Since run() cannot return until that thread has finished, a non-daemon thread better matches the contract: the pipeline run is foreground work and should complete / raise. WDYT?

logger = logging.getLogger(__name__)


def _run_coroutine_in_new_loop(coro: Coroutine[Any, Any, Any]) -> Any:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would probably use:

T = TypeVar("T")

def _run_coroutine_in_new_loop(coro: Coroutine[Any, Any, T]) -> T:

as a small type refinement.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants