feat: Allow running asyncpipeline.run in a place with an existing loop#11457
Draft
sjrl wants to merge 1 commit into
Draft
feat: Allow running asyncpipeline.run in a place with an existing loop#11457sjrl wants to merge 1 commit into
sjrl wants to merge 1 commit into
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub. 1 Skipped Deployment
|
Contributor
Coverage reportClick to see where and how coverage changed
This report was generated by python-coverage-comment-action |
||||||||||||||||||||||||
mpangrazzi
reviewed
Jun 3, 2026
Contributor
mpangrazzi
left a comment
There was a problem hiding this comment.
Looks already good to me, I've left a few comments!
| :raises RuntimeError: | ||
| If called from within an async context. Use `run_async` instead. | ||
| """ | ||
| coro = self.run_async(data=data, include_outputs_from=include_outputs_from, concurrency_limit=concurrency_limit) |
Contributor
There was a problem hiding this comment.
I would maybe try to create this coroutine after the exact runner strategy is known
| except BaseException as error: # noqa: BLE001 - captured and re-raised in the calling thread | ||
| box["error"] = error | ||
|
|
||
| thread = threading.Thread(target=_worker, name="haystack-async-pipeline-run", daemon=True) |
Contributor
There was a problem hiding this comment.
Why using daemon=True here? Since run() cannot return until that thread has finished, a non-daemon thread better matches the contract: the pipeline run is foreground work and should complete / raise. WDYT?
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| def _run_coroutine_in_new_loop(coro: Coroutine[Any, Any, Any]) -> Any: |
Contributor
There was a problem hiding this comment.
nit: I would probably use:
T = TypeVar("T")
def _run_coroutine_in_new_loop(coro: Coroutine[Any, Any, T]) -> T:as a small type refinement.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Related Issues
The crux of the problem is if we use
AsyncPipelinein aSuperComponentwe run into an issue if this super component is called fromPipeline.runin an async application (e.g. Jupyter Notebook or FastAPI app). The call trace would bePipeline.run -> HybridRetriever.run --> AsyncPipeline.runwhich would raise an error fromAsyncPipeline.runif an existing asyncio loop already exists.Related to PRs: deepset-ai/haystack-core-integrations#3369 and deepset-ai/haystack-core-integrations#3374
Proposed Changes:
One solution for overcoming the above stated issue by creating a new thread with its own asyncio event loop. I also propose an alternative below.
How did you test it?
New tests
Notes for the reviewer
Alternative
Merging the
PipelineandAsyncPipelineclasses into onePipelineclass with arunandrun_asyncmethods (like we do for Components) would solve this problem by making run a proper sync call path and keep run_async a proper async path.Checklist
fix:,feat:,build:,chore:,ci:,docs:,style:,refactor:,perf:,test:and added!in case the PR includes breaking changes.