diff --git a/mellea/stdlib/streaming.py b/mellea/stdlib/streaming.py index ee3d9b76f..c2804fe88 100644 --- a/mellea/stdlib/streaming.py +++ b/mellea/stdlib/streaming.py @@ -244,6 +244,11 @@ def __init__(self, mot: ModelOutputThunk, ctx: Context) -> None: self._event_queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue() self._orchestration_task: asyncio.Task[None] | None = None self._done = asyncio.Event() + # Set synchronously at the very top of _orchestrate_streaming, before + # any await, so external coordinators (e.g. cancellation tests) can wait + # until the task has started running (normally suspended at its first await). + # Single-use: not reset between runs; this object is not re-entrant. + self._orchestration_started = asyncio.Event() # Stashed so acomplete() surfaces orchestrator failures even when the # consumer never iterates astream(). Cleared once consumed by # whichever of the two reads it first. @@ -448,6 +453,12 @@ async def _orchestrate_streaming( chunking: ChunkingStrategy, val_backend: Backend, ) -> None: + # Signal that the coroutine body is executing before the first suspension. + # A coordinator already waiting on _orchestration_started resumes only after + # this task next yields to the event loop. That is normally its first real + # await, but callers should still check task.done() before cancel(). + result._orchestration_started.set() + accumulated = "" emitted_end = 0 # byte offset in accumulated after the last emitted chunk prev_chunk_count = 0 diff --git a/test/stdlib/test_streaming.py b/test/stdlib/test_streaming.py index f5e3af3c1..ee4d67f8f 100644 --- a/test/stdlib/test_streaming.py +++ b/test/stdlib/test_streaming.py @@ -936,9 +936,14 @@ async def test_external_task_cancellation_releases_consumers() -> None: ) assert result._orchestration_task is not None - # Yield once so the orchestration task enters its main loop before we - # cancel it. 
- await asyncio.sleep(0.01) + # Wait until the orchestration coroutine has started and hit its first + # suspension point. Using _orchestration_started rather than a wall-clock + # sleep avoids the race where a fast runner drains the whole stream within + # the sleep window, making cancel() a no-op on an already-done task. + # _orchestration_started.wait() must precede cancel() — cancelling before + # the first scheduling means the event is never set. + await asyncio.wait_for(result._orchestration_started.wait(), timeout=2.0) + assert not result._orchestration_task.done() # guard: task must still be live # Same mechanism asyncio.wait_for uses on timeout. result._orchestration_task.cancel() @@ -971,7 +976,8 @@ async def test_external_cancellation_acomplete_raise_once() -> None: ) assert result._orchestration_task is not None - await asyncio.sleep(0.01) + await asyncio.wait_for(result._orchestration_started.wait(), timeout=2.0) + assert not result._orchestration_task.done() # guard: task must still be live result._orchestration_task.cancel() await asyncio.wait_for(result._done.wait(), timeout=2.0)