Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions mellea/stdlib/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ def __init__(self, mot: ModelOutputThunk, ctx: Context) -> None:
self._event_queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue()
self._orchestration_task: asyncio.Task[None] | None = None
self._done = asyncio.Event()
# Set synchronously at the very top of _orchestrate_streaming, before
# any await, so external coordinators (e.g. cancellation tests) can wait
# until the task is live and suspended at its first I/O point.
# Single-use: not reset between runs; this object is not re-entrant.
self._orchestration_started = asyncio.Event()
# Stashed so acomplete() surfaces orchestrator failures even when the
# consumer never iterates astream(). Cleared once consumed by
# whichever of the two reads it first.
Expand Down Expand Up @@ -448,6 +453,12 @@ async def _orchestrate_streaming(
chunking: ChunkingStrategy,
val_backend: Backend,
) -> None:
# Signal that the coroutine body is executing before the first suspension.
# External coordinators waiting on _orchestration_started are guaranteed to
# resume only after this task has yielded at its first real await, so a
# subsequent cancel() always lands on a live, non-done task.
result._orchestration_started.set()

accumulated = ""
emitted_end = 0 # byte offset in accumulated after the last emitted chunk
prev_chunk_count = 0
Expand Down
14 changes: 10 additions & 4 deletions test/stdlib/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -936,9 +936,14 @@ async def test_external_task_cancellation_releases_consumers() -> None:
)

assert result._orchestration_task is not None
# Yield once so the orchestration task enters its main loop before we
# cancel it.
await asyncio.sleep(0.01)
# Wait until the orchestration coroutine has started and hit its first
# suspension point. Using _orchestration_started rather than a wall-clock
# sleep avoids the race where a fast runner drains the whole stream within
# the sleep window, making cancel() a no-op on an already-done task.
# _orchestration_started.wait() must precede cancel() — cancelling before
# the first scheduling means the event is never set.
await asyncio.wait_for(result._orchestration_started.wait(), timeout=2.0)
assert not result._orchestration_task.done() # guard: task must still be live

# Same mechanism asyncio.wait_for uses on timeout.
result._orchestration_task.cancel()
Expand Down Expand Up @@ -971,7 +976,8 @@ async def test_external_cancellation_acomplete_raise_once() -> None:
)

assert result._orchestration_task is not None
await asyncio.sleep(0.01)
await asyncio.wait_for(result._orchestration_started.wait(), timeout=2.0)
assert not result._orchestration_task.done() # guard: task must still be live
result._orchestration_task.cancel()
await asyncio.wait_for(result._done.wait(), timeout=2.0)

Expand Down
Loading