From 767d0b9d178d37595cd31d2d7096a50c4587e9c5 Mon Sep 17 00:00:00 2001 From: Nigel Jones Date: Fri, 22 May 2026 11:05:23 +0100 Subject: [PATCH 1/2] fix(streaming): replace sleep gate with _started event for cancel tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit test_external_task_cancellation_releases_consumers and test_external_cancellation_acomplete_raise_once both slept 10 ms before calling cancel() on the orchestration task. On a fast runner the mock backend (sleep(0) between tokens) can drain the entire stream within that window, making cancel() a no-op on an already-done task — no CancelledError is ever raised and the test fails. Add StreamChunkingResult._started (asyncio.Event), set at the very top of _orchestrate_streaming before its first suspension point. Tests now await _started before cancelling: after _started fires the orchestration is live but suspended, and cancel() is called without yielding back to the event loop, so the task cannot complete between the two calls. Fixes #1132 Assisted-by: Claude Code Signed-off-by: Nigel Jones --- mellea/stdlib/streaming.py | 8 ++++++++ test/stdlib/test_streaming.py | 10 ++++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/mellea/stdlib/streaming.py b/mellea/stdlib/streaming.py index ee3d9b76f..7b5f0ee58 100644 --- a/mellea/stdlib/streaming.py +++ b/mellea/stdlib/streaming.py @@ -244,6 +244,9 @@ def __init__(self, mot: ModelOutputThunk, ctx: Context) -> None: self._event_queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue() self._orchestration_task: asyncio.Task[None] | None = None self._done = asyncio.Event() + # Set by _orchestrate_streaming before its first suspension point so + # tests can wait for the task to be live before cancelling externally. + self._started = asyncio.Event() # Stashed so acomplete() surfaces orchestrator failures even when the # consumer never iterates astream(). Cleared once consumed by # whichever of the two reads it first. @@ -448,6 +451,11 @@ async def _orchestrate_streaming( chunking: ChunkingStrategy, val_backend: Backend, ) -> None: + # Signal that the coroutine body is executing before the first suspension. + # Tests that cancel the task externally wait on this to avoid a race where + # cancel() fires after the task has already completed (no-op on a done task). + result._started.set() + accumulated = "" emitted_end = 0 # byte offset in accumulated after the last emitted chunk prev_chunk_count = 0 diff --git a/test/stdlib/test_streaming.py b/test/stdlib/test_streaming.py index f5e3af3c1..3e5e98f40 100644 --- a/test/stdlib/test_streaming.py +++ b/test/stdlib/test_streaming.py @@ -936,9 +936,11 @@ async def test_external_task_cancellation_releases_consumers() -> None: ) assert result._orchestration_task is not None - # Yield once so the orchestration task enters its main loop before we - # cancel it. - await asyncio.sleep(0.01) + # Wait until the orchestration coroutine has started and hit its first + # suspension point. Using _started rather than a wall-clock sleep avoids + # the race where a fast runner drains the whole stream within the sleep + # window, making cancel() a no-op on an already-done task. + await asyncio.wait_for(result._started.wait(), timeout=2.0) # Same mechanism asyncio.wait_for uses on timeout. result._orchestration_task.cancel() @@ -971,7 +973,7 @@ async def test_external_cancellation_acomplete_raise_once() -> None: ) assert result._orchestration_task is not None - await asyncio.sleep(0.01) + await asyncio.wait_for(result._started.wait(), timeout=2.0) result._orchestration_task.cancel() await asyncio.wait_for(result._done.wait(), timeout=2.0) From 3dd9173a0a2e9c36642a80c1435119d4593090f2 Mon Sep 17 00:00:00 2001 From: Nigel Jones Date: Fri, 22 May 2026 11:17:12 +0100 Subject: [PATCH 2/2] =?UTF-8?q?refactor(streaming):=20rename=20=5Fstarted?= =?UTF-8?q?=20=E2=86=92=20=5Forchestration=5Fstarted;=20add=20done()=20gua?= =?UTF-8?q?rd?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three improvements from code review: - Rename _started to _orchestration_started for consistency with the existing _orchestration_task and _orchestration_exception fields. - Expand the attribute comment to make explicit that the event is set synchronously before any await, and that the object is single-use. - Add `assert not result._orchestration_task.done()` after the _orchestration_started.wait() in both cancellation tests as a regression guard: if a future change makes the orchestration synchronous to its first yield, the tests would silently regress to vacuous-pass without this assertion. Assisted-by: Claude Code Signed-off-by: Nigel Jones --- mellea/stdlib/streaming.py | 15 +++++++++------ test/stdlib/test_streaming.py | 14 +++++++++----- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/mellea/stdlib/streaming.py b/mellea/stdlib/streaming.py index 7b5f0ee58..c2804fe88 100644 --- a/mellea/stdlib/streaming.py +++ b/mellea/stdlib/streaming.py @@ -244,9 +244,11 @@ def __init__(self, mot: ModelOutputThunk, ctx: Context) -> None: self._event_queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue() self._orchestration_task: asyncio.Task[None] | None = None self._done = asyncio.Event() - # Set by _orchestrate_streaming before its first suspension point so - # tests can wait for the task to be live before cancelling externally. - self._started = asyncio.Event() + # Set synchronously at the very top of _orchestrate_streaming, before + # any await, so external coordinators (e.g. cancellation tests) can wait + # until the task is live and suspended at its first I/O point. + # Single-use: not reset between runs; this object is not re-entrant. + self._orchestration_started = asyncio.Event() # Stashed so acomplete() surfaces orchestrator failures even when the # consumer never iterates astream(). Cleared once consumed by # whichever of the two reads it first. @@ -452,9 +454,10 @@ async def _orchestrate_streaming( val_backend: Backend, ) -> None: # Signal that the coroutine body is executing before the first suspension. - # Tests that cancel the task externally wait on this to avoid a race where - # cancel() fires after the task has already completed (no-op on a done task). - result._started.set() + # External coordinators waiting on _orchestration_started are guaranteed to + # resume only after this task has yielded at its first real await, so a + # subsequent cancel() always lands on a live, non-done task. + result._orchestration_started.set() accumulated = "" emitted_end = 0 # byte offset in accumulated after the last emitted chunk diff --git a/test/stdlib/test_streaming.py b/test/stdlib/test_streaming.py index 3e5e98f40..ee4d67f8f 100644 --- a/test/stdlib/test_streaming.py +++ b/test/stdlib/test_streaming.py @@ -937,10 +937,13 @@ async def test_external_task_cancellation_releases_consumers() -> None: assert result._orchestration_task is not None # Wait until the orchestration coroutine has started and hit its first - # suspension point. Using _started rather than a wall-clock sleep avoids - # the race where a fast runner drains the whole stream within the sleep - # window, making cancel() a no-op on an already-done task. - await asyncio.wait_for(result._started.wait(), timeout=2.0) + # suspension point. Using _orchestration_started rather than a wall-clock + # sleep avoids the race where a fast runner drains the whole stream within + # the sleep window, making cancel() a no-op on an already-done task. + # _orchestration_started.wait() must precede cancel() — cancelling before + # the first scheduling means the event is never set. + await asyncio.wait_for(result._orchestration_started.wait(), timeout=2.0) + assert not result._orchestration_task.done() # guard: task must still be live # Same mechanism asyncio.wait_for uses on timeout. result._orchestration_task.cancel() @@ -973,7 +976,8 @@ async def test_external_cancellation_acomplete_raise_once() -> None: ) assert result._orchestration_task is not None - await asyncio.wait_for(result._started.wait(), timeout=2.0) + await asyncio.wait_for(result._orchestration_started.wait(), timeout=2.0) + assert not result._orchestration_task.done() # guard: task must still be live result._orchestration_task.cancel() await asyncio.wait_for(result._done.wait(), timeout=2.0)