Skip to content

feat(stdlib): add stream_with_chunking() with per-chunk validation (#901)#942

Open
planetf1 wants to merge 24 commits into
generative-computing:mainfrom
planetf1:feat/901-stream-with-chunking
Open

feat(stdlib): add stream_with_chunking() with per-chunk validation (#901)#942
planetf1 wants to merge 24 commits into
generative-computing:mainfrom
planetf1:feat/901-stream-with-chunking

Conversation

@planetf1
Copy link
Copy Markdown
Contributor

@planetf1 planetf1 commented Apr 27, 2026

Misc PR

Type of PR

  • Bug Fix
  • New Feature
  • Documentation
  • Other

Description

Implements stream_with_chunking() — the core streaming orchestration primitive for the streaming validation epic (#891), closing issue #901.

Part of epic #891 · Wave 3 of 4

This PR closes #901. The following are explicitly out of scope and tracked in #902 (the final wave — nothing further is planned after #902):

  • Streaming event types (ChunkEvent, QuickCheckEvent, StreamingDoneEvent, FullValidationEvent, CompletedEvent, ErrorEvent)
  • Application-level OTEL span and span events for the orchestrator
  • record_requirement_check / record_requirement_failure / record_sampling_outcome / record_error calls
  • ErrorEvent replacing the MelleaLogger.warning stopgap (see TODO(#902) in streaming.py)
  • Narrative documentation: how-to section, requirements-system concept update, end-to-end tutorial

Builds on the now-merged #925 (squash-merged as upstream commit 7912a1df), which added Requirement.stream_validate(). This branch has been rebased directly onto upstream/main; the 13 Wave 3 commits are 8128dfad..3fb501ef.

What changed

  • mellea/core/base.py — adds ModelOutputThunk.cancel_generation(error=None): cancels in-progress _generate / _generate_extra tasks, drains the internal async queue (to release any blocked put() calls), closes the open telemetry span (recording the provided error if given, else a generic RuntimeError("Generation cancelled")), and marks the MOT as computed. Uses .pop() on _meta["_telemetry_span"] to prevent KeyError if two coroutines race before _computed is set.

  • mellea/stdlib/streaming.py (new) — StreamChunkingResult and stream_with_chunking():

    • Starts a background asyncio.Task that consumes the MOT's async stream, splits accumulated text via ChunkingStrategy, and calls stream_validate() once per complete chunk, passing that single chunk (not the accumulation).
    • When an astream() iteration produces multiple new chunks, they are validated sequentially in order so early exit prevents later chunks in the same batch from being validated or emitted to the consumer.
    • Early exit: on first "fail" result, cancel_generation() is called and StreamChunkingResult.completed is set to False. The failing chunk is not emitted to the consumer; use streaming_failures to inspect what failed.
    • End-of-stream flush: when the stream ends naturally, any trailing fragment withheld by the chunker (e.g. a final sentence with no trailing whitespace) is released via the new ChunkingStrategy.flush() method and run through stream_validate on the same terms as regular chunks. Skipped on early exit (the fragment is mid-token and incomplete).
    • Final validation: after natural completion, validate() is called on all non-failed requirements. Skipped on early exit.
    • Clone-per-call: copy(req) clones each requirement before use; originals are never mutated.
    • String aliases "sentence", "word", "paragraph" resolve to the corresponding ChunkingStrategy subclasses.
    • Exception in stream_validate → orchestrator calls cancel_generation(error=exc) so backend telemetry records the real cause, then surfaces the exception to the consumer via astream() / acomplete().
    • finally block calls cancel_generation() when not already computed, so the backend producer is stopped even on external CancelledError (which bypasses except Exception).
  • mellea/stdlib/chunking.py — adds ChunkingStrategy.flush(accumulated_text) -> list[str] (default returns [] — backward-compatible for external chunkers). SentenceChunker, WordChunker, and ParagraphChunker each override to return the withheld trailing fragment.

  • mellea/stdlib/__init__.py — re-exports StreamChunkingResult and stream_with_chunking.

  • docs/examples/streaming/streaming_chunking.py (new) — end-to-end example with a stateful MaxSentencesReq showing the canonical accumulate-on-self pattern. Marked # pytest: ollama, e2e.

Spec adherence and deliberate variations

For reviewer attention. Items (a)–(d) are spec-compliant (explicit or, in (d), one of the options the spec names); items (e)–(h) are design decisions taken where the spec is silent.

(a) Chunk semantics — spec-compliant. Addresses @jakelorocco's review on #925 (now approved). stream_validate receives a single complete chunk from the chunking strategy, not the accumulated output. Matches the contract in the #891 epic and the #900 spec.

(b) Clone-per-attempt — spec-compliant. copy(req) clones each requirement before use; originals are never mutated (per #891 and #901).

(c) Early-exit cancellation — spec-compliant. cancel_generation() is called on first "fail" to stop the backend producer before it blocks on mot._async_queue (per #891).

(d) End-of-stream flush — spec-compliant design choice. The #899 ABC docstring offers two options for the trailing fragment: "pass to a final validator or discard". This PR takes the first by adding ChunkingStrategy.flush(accumulated_text) and running the returned fragment through stream_validate on the same terms as regular chunks, so the "no unvalidated content reaches the consumer" invariant extends to the trailing fragment.

(e) Exception handling in stream_validatevariation; spec-silent. The spec covers cancel_generation() on explicit "fail" but not on unhandled exceptions. We extend the same resource-leak reasoning: any orchestrator exit must stop the producer. The original exception is passed to cancel_generation(error=...) so the backend telemetry span records the real cause. If cancel_generation() itself raises, we log via MelleaLogger.warning with a TODO(#902) marker and still surface the original exception to the consumer. The finally block additionally covers external CancelledError (which bypasses except Exception) for the same reason.

(f) astream() single-consumervariation; spec-silent. #901 does not say whether re-iteration is supported. Current implementation is single-consumer (queue drained to the None sentinel). Docstring updated to state the contract explicitly.

(g) Validator latencyvariation; spec-silent. Chunks are emitted only after all active validators return for that chunk. A slow stream_validate therefore adds latency; the preserved invariant is that the consumer never sees unvalidated content. A concurrent-emission fast path may be added if a concrete use case drives it.

(h) Orchestrator-level OTEL deferred to #902 — per the epic's explicit instruction: "these event types are the equivalent observability mechanism for the streaming path". Backend-layer instrumentation (mot._meta["_telemetry_span"], token/latency/error metrics via plugins) remains intact. Not in this PR: application-level trace span for stream_with_chunking, span events for chunk lifecycle, record_requirement_check / record_requirement_failure / record_sampling_outcome / record_error calls, and the ErrorEvent that will replace the MelleaLogger.warning stopgap. All enumerated in the #902 acceptance criteria.

Testing

  • Tests added to the respective file if code was changed
    • test/stdlib/test_streaming.py (new) — 12 unit tests via StreamingMockBackend (no Ollama needed) covering normal completion, early exit on fail, clone isolation, quick_check_backend routing (asserts clone saw val_backend for every call), deadlock prevention, as_thunk correctness, astream() chunk granularity, no-requirements passthrough, per-chunk contract (exact-match capture of individual chunks), trailing-fragment flush, early exit on trailing fragment, multi-chunk batch with mid-batch fail, cancel-on-fail spy verification, and exception-in-validator cancellation.
    • test/stdlib/test_chunking.py — 13 new tests covering ChunkingStrategy.flush (ABC default + each built-in chunker's fragment logic).
    • test/core/test_stream_validate.py (on stacked feat: add stream_validate() hook to Requirement (#900) #925) — 9 tests.
    • Full related suite: 69 passed locally (uv run pytest test/stdlib/test_streaming.py test/stdlib/test_chunking.py test/core/test_astream_mock.py -q).
  • New code coverage: ~92% on streaming.py (16 tests). Remaining uncovered lines are all error-within-error cleanup paths (cancel_generation raising during exception handling, external CancelledError cleanup failing, acomplete re-raising an orchestration exception) and the TOCTOU RuntimeError race — these require fault injection beyond unit scope and are acceptable gaps.
  • Ensure existing tests and github automation passes (a maintainer will kick off the github automation when the rest of the PR is populated)
  • Integration test against local Ollama (granite4:micro) — example runs cleanly; both sentences of a non-terminated response reach the consumer via the flush path.

Attribution

  • AI coding assistants used

@github-actions github-actions Bot added the enhancement New feature or request label Apr 27, 2026
@planetf1 planetf1 force-pushed the feat/901-stream-with-chunking branch 2 times, most recently from 44025e4 to 76a3eb9 Compare April 28, 2026 16:09
planetf1 added a commit to planetf1/mellea that referenced this pull request Apr 28, 2026
Addresses issues raised by independent review on top of PR generative-computing#942.

Orchestrator (mellea/stdlib/streaming.py):
- except Exception now calls mot.cancel_generation() before surfacing
  the exception to the consumer — previously the backend producer was
  left running, eventually blocking on mot._async_queue (maxsize=20).
  Cleanup failures are logged via MelleaLogger.warning with a
  TODO(generative-computing#902) marker; generative-computing#902 replaces the log with a proper ErrorEvent.
- RuntimeError catch in the astream() loop now re-raises unless
  mot.is_computed() is true, so only the documented "already computed"
  race is swallowed.
- astream() docstring now states the single-consumer contract
  explicitly; a second iteration blocks on an empty queue with no
  sentinel to deliver.
- as_thunk docstring now flags the early-exit case: cancel_generation
  forces is_computed=True without running post_processing(), so
  generation.usage and related telemetry fields may be None.

Chunker (mellea/stdlib/chunking.py):
- SentenceChunker.flush switches from .strip() to .rstrip() with a
  comment explaining why: the loop's lstrip has already removed
  leading whitespace, and trailing whitespace on a sentence fragment
  is non-semantic (consistent with split() returning sentences
  without trailing whitespace).
- ParagraphChunker.flush adds a docstring noting the deliberate
  asymmetry: paragraph fragments are returned byte-for-byte because
  internal whitespace (e.g. trailing \n of a list item) can be
  semantically meaningful.

Tests (test/stdlib/test_streaming.py):
- test_stream_validate_receives_individual_chunks now uses exact-
  match on the captured chunk list, which directly regresses if
  someone reverts to accumulated-text semantics.
- test_multiple_chunks_in_one_batch_with_mid_batch_fail: response
  fed as one large token so split() yields 4 sentences at once;
  verifies chunk 1 emits, chunk 2 fails (not emitted), chunks 3 and
  4 are neither validated nor emitted.
- test_cancel_generation_invoked_on_fail: spies on
  ModelOutputThunk.cancel_generation and asserts it was called on
  the "fail" early-exit path.
- test_exception_in_stream_validate_cancels_generation: a requirement
  that raises must cause cancel_generation to run and the exception
  to surface via astream()/acomplete() without hanging.

Telemetry observability (orchestrator-level spans, metrics, span
events) remains deferred to generative-computing#902 per the epic, which now has the
acceptance criteria updated to cover event emission, the OTEL bridge,
and the ErrorEvent type that will replace the MelleaLogger stopgap.

Assisted-by: Claude Code
planetf1 added 12 commits April 28, 2026 20:32
Adds an async cancel_generation() method that cancels in-progress
_generate and _generate_extra tasks, drains the internal async queue
to release any blocked put() calls, closes the open telemetry span,
and sets _computed=True so the MOT is immediately usable.

Required by the stream_with_chunking() orchestrator (generative-computing#901) for clean
early-exit when a streaming requirement returns "fail".

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
…enerative-computing#901)

Adds stream_with_chunking() — the core streaming orchestration
primitive that consumes a ModelOutputThunk's async stream via a
background asyncio.Task, applies a ChunkingStrategy to produce
semantic chunks, and runs stream_validate() in parallel across all
requirements at each chunk boundary.

Key behaviours:
- Early exit: if any requirement returns "fail" during streaming,
  generation is cancelled immediately via cancel_generation() and
  StreamChunkingResult.completed is set to False.
- Final validation: after natural completion, validate() is called on
  all non-failed requirements.
- Clone-per-call: requirements are cloned (copy(req)) before each
  invocation; originals are never mutated.
- String aliases: "sentence", "word", "paragraph" map to the
  corresponding ChunkingStrategy subclasses.

StreamChunkingResult exposes:
- astream() — async iterator yielding individual validated chunks
- acomplete() — await full completion including final validation
- as_thunk — wrap full_text as a computed ModelOutputThunk
- completed, full_text, final_validations, streaming_failures

Re-exports StreamChunkingResult and stream_with_chunking from
mellea.stdlib for day-to-day use.

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
Adds test/stdlib/test_streaming.py with 9 unit tests covering:
- Normal completion: validate() called at stream end, completed=True
- Early exit on "fail": completed=False, streaming_failures populated
- Clone isolation: originals never mutated across retries
- quick_check_backend routing: validation uses alternate backend
- Deadlock prevention: early exit with asyncio.wait_for timeout
- as_thunk correctness: value=full_text, raises before acomplete()
- astream() yields individual chunks (not accumulated text)
- No requirements: streams without validation

StreamingMockBackend subclasses Backend and feeds a fixed response
string into a MOT queue char-by-char via asyncio.create_task,
following the create_manual_mock_thunk() pattern from test_astream_mock.py.

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
Adds docs/examples/streaming/streaming_chunking.py demonstrating
stream_with_chunking() end-to-end: defining a custom stream_validate()
override, consuming chunks via astream(), and awaiting acomplete() to
inspect final_validations and streaming_failures.

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
Fixes [no_class_args] CI failure — the docs build-and-validate checker
requires __init__ parameters to be documented in the class docstring (not
__init__) per Option C convention.

Assisted-by: Claude Code
Fixes second [no_raises] CI failure — stream_with_chunking raises
ValueError for unknown chunking aliases but had no Raises: section.

Assisted-by: Claude Code
…e call

Aligns the orchestrator with the chunk-at-a-time design set out in the
generative-computing#891 epic and generative-computing#900 spec. Previously _orchestrate_streaming passed the
full accumulated text to stream_validate, and called it once per batch
of new chunks rather than once per chunk. This masked the design intent
of the chunking strategy and forced stateful requirements into the
self._seen_len workaround.

Behaviour changes:
- stream_validate is called once per complete chunk produced by the
  chunking strategy (not once per astream() iteration)
- The call receives that single chunk (not the accumulated text)
- Multiple chunks from one astream() iteration are validated in order;
  early exit on a "fail" prevents later chunks in the same batch from
  being validated or emitted
- On early exit, the failing chunk is no longer emitted to the consumer;
  consumers inspect StreamChunkingResult.streaming_failures instead
  (previous behaviour emitted whatever the current batch contained)

Test changes:
- FailAfterWordsReq now maintains a running word count on self, since
  each stream_validate call sees a one-word chunk rather than the
  growing accumulation
- New test_stream_validate_receives_individual_chunks asserts the
  per-chunk contract directly by capturing the cloned requirement and
  checking the chunks it saw

Docstring updated to describe the per-chunk contract, the in-order
validation of a batch, the non-emission of failing chunks, and the MOT
single-consumer constraint.

Assisted-by: Claude Code
Two documentation fixes following the per-chunk semantics correction:

- streaming_chunking.py: MaxSentencesReq previously counted sentence-end
  punctuation in the chunk, which worked under the old accumulated-text
  behaviour but returns at most 1 per sentence under delta semantics.
  Rewritten to increment self._count once per chunk -- the canonical
  pattern for a requirement that needs context beyond a single chunk.

- stream_with_chunking docstring: add a Note that chunks are emitted to
  the consumer only after every active validator returns for that chunk.
  A slow stream_validate (e.g. an LLM-based one) therefore adds latency
  to every chunk. The invariant preserved is that the consumer never
  sees unvalidated content; a concurrent-emission fast path may be added
  in future if a concrete use case calls for it.

Assisted-by: Claude Code
ChunkingStrategy.split() withholds the trailing fragment by design
(generative-computing#899). Previously the orchestrator discarded it — it appeared in
full_text and the final validate() saw it, but it was never yielded to
astream() consumers and never seen by stream_validate. For a response
that did not end in a chunk terminator (e.g. "Sentence one. Sentence
two." with no trailing whitespace under SentenceChunker), the last
sentence silently bypassed streaming validation.

Adds ChunkingStrategy.flush(accumulated_text) -> list[str]:
- Default in the ABC returns [] (backward-compatible — external
  chunkers retain the old discard behaviour until they opt in).
- SentenceChunker, WordChunker, ParagraphChunker each override to
  return the withheld trailing fragment as a single-element list.

_orchestrate_streaming calls chunking.flush(accumulated) after the main
loop (only when the stream ended naturally, not on early exit — a
cancelled stream's trailing fragment is by definition incomplete).
Each flushed chunk goes through the same stream_validate / emit path
as regular chunks, so the "no unvalidated content reaches the consumer"
invariant extends to the trailing fragment, and a fail on the fragment
still records a streaming failure and skips final validate().

Tests:
- 13 new chunker tests covering the default-discard behaviour and each
  built-in's flush logic (empty input, fragment-present, already-
  terminated cases).
- test_trailing_fragment_is_flushed_to_consumer: stream_validate sees
  the fragment and astream yields it.
- test_early_exit_on_trailing_fragment: fail on the flushed fragment
  propagates to streaming_failures and skips final validation.

Assisted-by: Claude Code
Addresses issues raised by independent review on top of PR generative-computing#942.

Orchestrator (mellea/stdlib/streaming.py):
- except Exception now calls mot.cancel_generation() before surfacing
  the exception to the consumer — previously the backend producer was
  left running, eventually blocking on mot._async_queue (maxsize=20).
  Cleanup failures are logged via MelleaLogger.warning with a
  TODO(generative-computing#902) marker; generative-computing#902 replaces the log with a proper ErrorEvent.
- RuntimeError catch in the astream() loop now re-raises unless
  mot.is_computed() is true, so only the documented "already computed"
  race is swallowed.
- astream() docstring now states the single-consumer contract
  explicitly; a second iteration blocks on an empty queue with no
  sentinel to deliver.
- as_thunk docstring now flags the early-exit case: cancel_generation
  forces is_computed=True without running post_processing(), so
  generation.usage and related telemetry fields may be None.

Chunker (mellea/stdlib/chunking.py):
- SentenceChunker.flush switches from .strip() to .rstrip() with a
  comment explaining why: the loop's lstrip has already removed
  leading whitespace, and trailing whitespace on a sentence fragment
  is non-semantic (consistent with split() returning sentences
  without trailing whitespace).
- ParagraphChunker.flush adds a docstring noting the deliberate
  asymmetry: paragraph fragments are returned byte-for-byte because
  internal whitespace (e.g. trailing \n of a list item) can be
  semantically meaningful.

Tests (test/stdlib/test_streaming.py):
- test_stream_validate_receives_individual_chunks now uses exact-
  match on the captured chunk list, which directly regresses if
  someone reverts to accumulated-text semantics.
- test_multiple_chunks_in_one_batch_with_mid_batch_fail: response
  fed as one large token so split() yields 4 sentences at once;
  verifies chunk 1 emits, chunk 2 fails (not emitted), chunks 3 and
  4 are neither validated nor emitted.
- test_cancel_generation_invoked_on_fail: spies on
  ModelOutputThunk.cancel_generation and asserts it was called on
  the "fail" early-exit path.
- test_exception_in_stream_validate_cancels_generation: a requirement
  that raises must cause cancel_generation to run and the exception
  to surface via astream()/acomplete() without hanging.

Telemetry observability (orchestrator-level spans, metrics, span
events) remains deferred to generative-computing#902 per the epic, which now has the
acceptance criteria updated to cover event emission, the OTEL bridge,
and the ErrorEvent type that will replace the MelleaLogger stopgap.

Assisted-by: Claude Code
Three items from the second independent review:

cancel_generation(error=) — accept an optional Exception parameter.
When the orchestrator enters the except Exception path, it now passes
the caught exception to cancel_generation() so the backend telemetry
span records the real cause via set_span_error instead of a generic
RuntimeError("Generation cancelled"). The original exception still
surfaces to the consumer via astream()/acomplete(); this is purely an
OTEL accuracy fix. Backward-compatible: the default None preserves the
previous "Generation cancelled" message for the normal fail path.

stream_with_chunking docstring — the "After the stream ends (naturally
or via early exit), validate() is called" wording overstated behaviour.
The orchestrator actually skips final validate() on early exit
(test_early_exit_on_fail verifies final_validations == []). Docstring
now correctly says final validate() runs only on natural completion.

test_exception_in_stream_validate_cancels_generation docstring — the
test fails on chunk 1 so the queue never actually fills; it verifies
the cancel-on-exception path and the no-hang guarantee but does not
directly prove the worst-case "producer blocked on full queue"
scenario. Docstring now states what it actually covers and points at
test/core/ for the cancel_generation drain logic.

Assisted-by: Claude Code
The Docs CI docstring quality gate [no_class_args]-equivalent check
requires every documented method with typed params to have an Args
section and a Returns section matching the return annotation.

SentenceChunker.flush, WordChunker.flush, and ParagraphChunker.flush
all took accumulated_text and returned list[str] without the sections.
Add both to each override, documenting each flush's specific semantics
(rstrip for sentences, whitespace-split trailing fragment for words,
byte-for-byte for paragraphs).

Assisted-by: Claude Code
@planetf1 planetf1 force-pushed the feat/901-stream-with-chunking branch from cb75fa7 to 74c009d Compare April 28, 2026 19:33
- _orchestrate_streaming: add cancel_generation() in finally block so the
  backend producer is stopped even on external CancelledError (BaseException
  bypasses except Exception, leaving _generate hung on a full queue)
- cancel_generation: replace .get + del on _telemetry_span with .pop to
  prevent KeyError if two coroutines race before _computed is set
- Example and test doubles: add super().__init__() to Requirement subclasses
  so description/validation_fn/_output are always initialised
- docs/examples: fix pytest tier marker integration → e2e (Ollama example
  must be e2e per MARKERS_GUIDE; all peer examples use e2e)
- test_quick_check_backend_routing: capture clone via __copy__ intercept and
  assert all seen_backends are val_backend, not just clone-isolation check

Assisted-by: Claude Code
@planetf1 planetf1 marked this pull request as ready for review April 29, 2026 10:32
@planetf1 planetf1 requested review from a team, jakelorocco and nrfulton as code owners April 29, 2026 10:32
@planetf1
Copy link
Copy Markdown
Contributor Author

FYI @jakelorocco — next part of the streaming validation epic is ready for review.

@planetf1
Copy link
Copy Markdown
Contributor Author

planetf1 commented Apr 29, 2026

Resolved — all issues addressed across Wave 3 and Wave 4 fix rounds. Ready for review.

@planetf1 planetf1 marked this pull request as draft April 29, 2026 13:34
@planetf1 planetf1 marked this pull request as ready for review April 29, 2026 15:41
Comment thread mellea/stdlib/streaming.py Outdated
Comment thread mellea/stdlib/streaming.py Outdated
Addresses review feedback on `_orchestrate_streaming` cleanup:

- Exceptions caught by the orchestrator were only pushed to the chunk
  queue, so callers who skipped `astream()` and went straight to
  `acomplete()` saw the call return silently. Stash the exception on the
  result and raise it from `acomplete()` with raise-once semantics
  (cleared by whichever of astream/acomplete reads it first).
- The finally cleanup caught `BaseException`, silently absorbing
  CancelledError/KeyboardInterrupt/SystemExit. Narrow to `except
  Exception` and switch the terminator to `put_nowait(None)` + `set()`
  so the sync ops always run even when the task is being cancelled
  (otherwise acomplete consumers hang).

Two regression tests:
- test_acomplete_surfaces_exception_without_astream
- test_external_task_cancellation_releases_consumers

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
@planetf1
Copy link
Copy Markdown
Contributor Author

planetf1 commented May 1, 2026

Thanks for the review @psschwei - resolved both points.

Comment thread mellea/core/base.py
Comment thread mellea/stdlib/__init__.py
Comment thread mellea/stdlib/chunking.py
Comment thread test/stdlib/test_streaming.py
Adds a `_cancelled` attribute (False by default) on `ModelOutputThunk`, set
to True inside `cancel_generation()` just before `_computed = True`, exposed
via a read-only `cancelled` property. Propagated through
`StreamChunkingResult.as_thunk` so consumers that only hold the wrapped
thunk can still distinguish cancellation from a natural completion.

Addresses nrfulton's review feedback on generative-computing#942 and pre-stages the
cancel-vs-complete signal that generative-computing#902's `CompletedEvent` needs to surface.

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
planetf1 added 2 commits May 5, 2026 11:59
Adds a short note on the ChunkingStrategy class docstring stating that the ABC operates on text streams only and does not support multi-modal output (audio segments, image regions). Addresses review feedback on generative-computing#942 without expanding scope.

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
Adds test_cancelled_flag_reflects_cancellation_state covering both the early-exit path (cancelled is True, is_computed True, propagates through as_thunk) and the normal-completion path (cancelled is False). Pairs with the cancellation flag added in the prior commit. Addresses nrfulton's review feedback on generative-computing#942.

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
@planetf1
Copy link
Copy Markdown
Contributor Author

planetf1 commented May 5, 2026

@nrfulton thanks for the review - I think it's clear we really want some changes in phase 2 for handling semantics better. Are you ok to move forward with current changes (the original plan) or do you want to resolve the phase 2 design/questions first?

@planetf1 planetf1 requested a review from nrfulton May 5, 2026 11:13
Copy link
Copy Markdown
Member

@psschwei psschwei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few things found on a quick AI review. will try to do a more thorough review on monday

Comment thread mellea/stdlib/streaming.py
Comment thread mellea/stdlib/streaming.py Outdated
Comment thread mellea/stdlib/streaming.py Outdated
Comment thread mellea/stdlib/streaming.py
@planetf1
Copy link
Copy Markdown
Contributor Author

All of @psschwei's review comments are now addressed — details in the individual threads, but the short version:

  • Raise-once guard — replaced the stash-as-sentinel pattern with a dedicated _exception_surfaced flag so the two "None" cases (never set vs. already cleared) can't be confused
  • _copy_from / __copy__ / __deepcopy___cancelled is now propagated in all three paths
  • full_text on early exit — tracks emitted_text separately so it reflects only what the consumer actually received, not the raw accumulated delta
  • Indentation question — inline comment added; the outer break is targeting the while loop intentionally

All covered by regression tests. @psschwei @nrfulton — would really appreciate a re-review when you get a chance. There's a stacked PR (#958) waiting behind this one, so getting this merged would unblock that too.

Three fixes:

1. Raise-once regression (5850f92): replace the _orchestration_exception
   stash as the already-surfaced sentinel with a dedicated
   _exception_surfaced: bool flag. The previous guard conflated
   'stash never set' with 'already cleared by acomplete()' -- both show
   as None, so a subsequent astream() call could silently skip a queued
   exception item and return zero chunks with no error.

2. _copy_from / __copy__ / __deepcopy__ omitted _cancelled: the cancelled
   flag added in 4f508fd was not propagated by any of the three copy
   paths on ModelOutputThunk.

3. full_text on early exit now reflects only validated-and-emitted chunks:
   accumulated += delta ran unconditionally before chunk iteration, so
   when a multi-chunk delta contained a failing chunk, full_text (and
   as_thunk.value) included the failing chunk's text and any later chunks
   from the same delta that were never validated. A new emitted_text
   local tracks only the chunks emitted to the consumer queue; full_text
   is set from emitted_text on early exit and from accumulated on natural
   completion.

Three new regression tests cover each fix. Inline comment added on the
while-loop break to address indentation question.

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
Copy link
Copy Markdown
Member

@psschwei psschwei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

edit: sigh... comments posted on wrong lines 😢
will repost review

Copy link
Copy Markdown
Member

@psschwei psschwei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this is close and the focused streaming/chunking suite passes locally.

I found a few lifecycle issues that seem worth fixing before merge: one setup-path leak before the orchestrator exists, one validator-concurrency cleanup issue, and one HF-specific cancellation gap where the asyncio task cancellation does not necessarily stop the underlying generation thread.

result = StreamChunkingResult(mot, gen_ctx)

cloned_reqs = [copy(req) for req in (quick_check_requirements or [])]
val_backend = quick_check_backend if quick_check_backend is not None else backend
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This setup order can leak an already-started backend stream if requirement cloning fails.

generate_from_context() starts the backend _generate task, then copy(req) runs before the orchestration task/finally block exists. If a requirement’s __copy__ raises, nothing cancels or drains the MOT queue, so real backend producers can remain pending against the maxsize=20 queue.

Could we either clone requirements before starting generation, or wrap the post-generate_from_context()setup intry/exceptand callawait mot.cancel_generation(error=exc)` before re-raising?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed. The leak window is as described: backend generation is in flight from generate_from_context, copy(req) then runs with no cleanup wrapping it, and a raising __copy__ leaves the MOT feeder stuck against the maxsize=20 queue with no consumer.

Addressed in 7fc40a4 by reordering copy(req) ahead of generate_from_context, so the backend simply never starts if cloning raises. Preferred reordering over wrap-and-cancel because it eliminates the failure case rather than cleaning up after it.

The window after generate_from_context returns and before the orchestration task is armed (StreamChunkingResult(...) plus asyncio.create_task(...)) is covered by try/except BaseException; BaseException rather than Exception so loop-shutdown and cancellation paths are caught. Cleanup calls mot.cancel_generation() inside a nested try/except Exception, so a cleanup failure cannot mask the original exception — same shape as streaming.py:288–294. On a failing create_task, coro.close() suppresses the "coroutine was never awaited" warning.

Scope check: the in-tree Requirement subclasses do not override __copy__, so no current caller trips the bug. However, the base-class docstring at core/requirement.py:302 advertises __copy__ as the recommended isolation mechanism for stateful stream_validate overrides, so user-defined subclasses are the intended fix target.

Docstring changes in the same commit:

  • stream_with_chunking: the existing "before use" sentence now says "before backend generation begins" and records the no-leak guarantee; a new Note: block documents that __copy__ exceptions propagate with no backend started.
  • Requirement.stream_validate gets a sentence at the __copy__ paragraph explaining the failure contract for override authors.

Test: parametrised over _PlainReq (no override) and _RaisingCopyReq (__copy__ raises ValueError). The failure branch asserts generate_from_context_call_count == 0 — the hard invariant: backend never started.

Comment thread mellea/core/base.py
self._generate.cancel()

if self._generate_extra is not None and not self._generate_extra.done():
self._generate_extra.cancel()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cancels the asyncio task wrapper, but for the HF backend _generate_extra is created from asyncio.to_thread(model.generate, ...). Cancelling the asyncio task does not stop an already-running generation thread, so early-exit validation can return while HF/GPU generation continues in the background.

Can we add a cooperative cancellation hook for backend producers, at least for HF? For example, HF could install a StoppingCriteria backed by a cancellation event, and cancel_generation() could trigger that before awaiting/cancelling the task wrappers.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed. cancel_generation today cancels the asyncio task wrapping asyncio.to_thread(model.generate, ...), but the thread itself runs to completion — the GPU stays busy and the MOT reports cancelled=True while work is still ongoing.

Addressed in d8018dd by adding a generic cooperative-cancel hook on ModelOutputThunk and wiring HuggingFace to it via StoppingCriteria:

  • ModelOutputThunk gains _cancel_hook: Callable[[], None] | None (default None). cancel_generation calls the hook before cancelling _generate/_generate_extra, so the thread receives its stop signal first and the subsequent task cancellation then unblocks the awaiter. The hook is called inside a try/except Exception so a faulty hook cannot mask the existing cancel path.
  • HuggingFaceBackend builds a threading.Event per streaming request, attaches a private _EventStoppingCriteria (merged with any user-supplied stopping_criteria via _install_cancel_stopping_criteria()), and assigns output._cancel_hook = cancel_event.set. Both streaming paths (KV-cache and standard) are updated; the hook is armed before the task is created so a cancel racing with task creation still fires cleanly.
  • All three MOT copy sites (_copy_from, __copy__, __deepcopy__) reset _cancel_hook to None on the copy — a copied MOT is a distinct computation and must not share the original's thread signal.

Chose the generic hook rather than an HF-specific cancel event because a future litellm or vLLM streaming path has the same shape (blocking work wrapped in to_thread), and keeping the contract on MOT avoids a second round of changes to core/base.py when that lands.

Tests in test/core/test_base.py:

  • test_cancel_generation_invokes_cancel_hook_before_task_cancel: hook fires and cancel_generation() returns within 1 s when the thread only exits on event.set().
  • test_cancel_hook_not_forwarded_by_copy_methods: all three copy sites verified to reset the hook.
  • test_cancel_generation_hook_exception_is_suppressed: faulty hook does not mask cancellation.

Comment thread mellea/stdlib/streaming.py Outdated
Comment on lines +200 to +205
await asyncio.gather(
*[
req.stream_validate(c, backend=val_backend, ctx=ctx)
for _, req in active
]
)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asyncio.gather() propagates the first validator exception, but sibling validators are not cancelled. That means if one stream_validate() raises while another validator is still doing work, the orchestrator starts cleanup and surfaces the error while the sibling validator can continue running in the background.

That is especially risky if validators call an LLM or mutate external state. Could we switch this to TaskGroup, or explicitly create tasks and cancel/await the remaining tasks on the first exception? The same concern applies to the final validate() gather below.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed. Both asyncio.gather call sites (inside _validate_and_emit around line 200, and the final validate() gather around line 270) use the default return_exceptions=False behaviour: the first failing task raises and peer tasks are left detached. If a slow validator is still awaiting a backend response when a sibling fails, it runs on unobserved until it completes or the event loop shuts down.

Addressed in 7fc40a4 by replacing both gather sites with asyncio.TaskGroup. On first failure, the TaskGroup cancels every peer task, awaits their cancellations, and re-raises. Python floor is already 3.11 per pyproject.toml, so this is a direct substitution with no conditional imports.

One adjustment to the outer handler: TaskGroup wraps failures in an ExceptionGroup. ExceptionGroup inherits from Exception, so the existing except Exception as exc: still catches it, but cancel_generation(error=exc) and the downstream _chunk_queue.put(exc) are more readable with the original exception. The handler now unwraps a single-element group before forwarding. If multiple validators fail simultaneously, the suppressed siblings are logged before taking exceptions[0] so they're not invisible in production.

Observable behaviour on success is unchanged (same pvr list, same final_validations). On failure the caller still sees the first exception with the same type — the only change is that peer tasks are now awaited to completion before the handler runs, rather than leaking into the background.

Test: test_stream_with_chunking_cancels_peer_validators — one requirement raises immediately in stream_validate, the second sleeps for 5 s and sets a flag on completion. await result.acomplete() raises the first exception, and the slow sibling's completion flag is asserted to be unset afterwards, proving TaskGroup cancelled it rather than left it detached.

planetf1 added 2 commits May 13, 2026 07:52
…idators on gather failure

- Reorder copy(req) before generate_from_context so a raising __copy__
  cannot leave a backend feeder task wedged against a full _async_queue
- Add try/except BaseException around the post-generate setup window;
  on failure cancel_generation() is called and the exception re-raised
- Replace both asyncio.gather sites in _orchestrate_streaming with
  asyncio.TaskGroup so peer validators are cancelled on first failure
- Unwrap ExceptionGroup from TaskGroup in the exception handler;
  log any suppressed siblings before forwarding the first exception
- Extend Requirement.stream_validate docstring with __copy__ failure contract

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
…ding.Event

- Add _cancel_hook: Callable[[], None] | None to ModelOutputThunk; called
  before asyncio task cancellation so backends running generation in a thread
  receive a cooperative stop signal; exceptions logged and suppressed
- Reset _cancel_hook = None in all three copy sites (_copy_from, __copy__,
  __deepcopy__) — copied MOTs are distinct computations
- Add _EventStoppingCriteria and _install_cancel_stopping_criteria helpers
  in huggingface.py; both streaming paths wire output._cancel_hook = event.set
  before creating the generation task
- Move import logging to module level in base.py

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
@planetf1 planetf1 requested a review from psschwei May 13, 2026 06:53
planetf1 added 2 commits May 13, 2026 12:10
- huggingface.py: guard _install_cancel_stopping_criteria behind `if
  stream:` at both call sites (chat and non-chat paths). Previously the
  StoppingCriteria was installed unconditionally, silently wrapping any
  caller-supplied stopping_criteria and adding per-token overhead on all
  non-streaming HF calls. Non-streaming paths have no orchestrator to
  call cancel_generation(), so the hook was dead code on that path.

- base.py: in cancel_generation(), use task.cancelling() > 0 to
  distinguish a child task's CancelledError (expected after .cancel())
  from an external CancelledError injected into the outer task. Previously
  both were caught and swallowed, meaning an external cancellation of the
  orchestration task during cleanup would be silently absorbed rather
  than propagated.

- streaming.py: assert the returned MOT is not already computed before
  starting the orchestrator. A backend that silently ignores
  ModelOption.STREAM would previously cause the orchestrator loop to
  skip, produce empty full_text, and pass final validators against "".
  The new RuntimeError fails fast with an actionable message.

Assisted-by: Claude Code
- huggingface.py: replace `_cancel_event.set if stream else None` ternary
  with `if stream: output._cancel_hook = _cancel_event.set` to silence
  Pyright possibly-unbound warning and improve readability (no-op change
  since _cancel_hook defaults to None)

- base.py: correct misleading comment "Python 3.12+" → "Python 3.11+";
  Task.cancelling() was added in 3.11, the comment was wrong

- streaming.py: add RuntimeError to Raises: section of
  stream_with_chunking() docstring (already raised since bf9a62b)

- test/core/test_base.py: add regression test for the outer-cancellation
  re-raise path in cancel_generation(); exercises cur.cancelling() > 0
  branch that was added in bf9a62b but had no direct test coverage

Assisted-by: Claude Code
@planetf1
Copy link
Copy Markdown
Contributor Author

Two fix commits landed since the last review round — summary for @psschwei and @nrfulton:

bf9a62bc — three pre-merge correctness fixes

  1. HF backend scope leak (huggingface.py): _install_cancel_stopping_criteria was unconditionally called on both streaming and non-streaming paths. On the non-streaming path there is no orchestrator calling cancel_generation(), so the StoppingCriteria was dead code and silently wrapped any user-supplied stopping_criteria on every decode step. Now guarded with if stream:.

  2. Outer CancelledError absorption (core/base.py): cancel_generation() was catching all CancelledErrors with a bare pass, which silently absorbed external cancellation (e.g. asyncio.wait_for timeout). Fixed to re-raise when asyncio.current_task().cancelling() > 0, i.e. when the outer task itself is being cancelled rather than the inner _generate task we just called .cancel() on.

  3. Non-streaming MOT guard (stdlib/streaming.py): stream_with_chunking() now raises RuntimeError immediately if the backend returns an already-computed ModelOutputThunk. Previously this would cause the orchestrator loop to skip entirely, producing empty output and silently passing all final validators against an empty string.

9a715d62 — follow-up polish

  • huggingface.py: replaced _cancel_event.set if stream else None ternary with an explicit if stream: block to silence the Pyright possibly-unbound warning
  • base.py: corrected misleading comment "Python 3.12+""Python 3.11+" (Task.cancelling() was added in 3.11)
  • streaming.py: added RuntimeError to the Raises: section of stream_with_chunking()'s docstring
  • test/core/test_base.py: added test_cancel_generation_propagates_outer_cancellation — a direct regression test for the re-raise path in fix 2, which had no test coverage

Full fast test suite (1 462 tests) passes.

planetf1 added 2 commits May 13, 2026 13:05
The CancelledError re-raise path added in bf9a62b means the method
now raises asyncio.CancelledError when the calling task is being
cancelled (cur.cancelling() > 0). The docstring quality gate
(build-and-validate CI) flags any function that raises without a
Raises: section.

Assisted-by: Claude Code
After triggering the build-and-validate quality gate twice in the same
PR (missing Raises: on stream_with_chunking and cancel_generation),
add an explicit pre-push check to the self-review checklist. Any diff
that adds raise statements to library code should run the audit tool
locally before pushing.

Assisted-by: Claude Code
Copy link
Copy Markdown
Member

@psschwei psschwei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one minor question, but otherwise LGTM

Comment on lines +136 to +139
if self._orchestration_task is not None and self._orchestration_task.done():
task_exc = self._orchestration_task.exception()
if task_exc is not None:
raise task_exc
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we also need to flip _exception_surfaced here?

Copy link
Copy Markdown
Contributor

@jakelorocco jakelorocco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good; a few minor nits

Comment on lines +42 to +51
async def stream_validate(
self, chunk: str, *, backend: Backend, ctx: Context
) -> PartialValidationResult:
self._count += 1
if self._count > self._limit:
return PartialValidationResult(
"fail",
reason=f"Response exceeded {self._limit} sentence limit mid-stream",
)
return PartialValidationResult("unknown")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation means that you can only utilize this requirement with a given chunker. Is that the intention? I feel like we should encourage writing requirements that are chunker agnostic where possible.

Comment on lines +53 to +61
async def validate(
self,
backend: Backend,
ctx: Context,
*,
format: type | None = None,
model_options: dict | None = None,
) -> ValidationResult:
return ValidationResult(result=True)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think validate and stream_validate should return equivalent results for most requirements.

Comment on lines +1039 to +1055
req = FailOnNthChunkText(n=2)

result = await stream_with_chunking(
_action(), backend, _ctx(), quick_check_requirements=[req], chunking="sentence"
)
yielded: list[str] = []
async for chunk in result.astream():
yielded.append(chunk)
await result.acomplete()

assert result.completed is False
# Consumer received only chunk 1.
assert yielded == ["One."]
# full_text must match what the consumer received — not the raw delta.
assert result.full_text == "One."
# as_thunk.value must agree with full_text.
assert result.as_thunk.value == result.full_text
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't hold if you have it fail on the 3rd chunk. Individual chunks strip whitespace, but the full text has whitespace between sentences.

Comment on lines +151 to +157
On early exit, ``cancel_generation()`` forces the MOT into a
computed state without running the backend's
``post_processing()``. Telemetry fields on the returned thunk
(``generation.usage``, ``generation.ttfb_ms``, etc.) may
therefore be ``None`` or reflect the partial state at
cancellation time. ``value`` and ``streaming`` are reliable;
usage totals are not.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that post_processing doesn't run makes me a bit nervous. It means that several key chunk concatenation steps won't be done for the mot's values either.

Maybe this just requires adding some documentation that cancelled mots won't have a parsed_repr value, etc...

raise task_exc

@property
def as_thunk(self) -> ModelOutputThunk:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mots can be typed. We lose that typing information here. I wonder if we should propagate it or just force the caller to investigate the parsed type?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat(stdlib): implement stream_with_chunking() with per-chunk validation

5 participants