feat: partial turn preservation and cooperative stream cancellation#279
feat: partial turn preservation and cooperative stream cancellation#279
Conversation
Add partial_reason field and is_partial property to AssistantTurn for marking incomplete turns on stream interruption. Add merge_content_text() helper to combine adjacent ContentText/ContentThinking fragments. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Restructure _submit_turns and _submit_turns_async to eagerly append a partial AssistantTurn to self._turns before streaming begins. On each chunk, content is appended to the partial turn in-place. On normal completion, the partial turn is replaced with the full turn. On interruption (GeneratorExit, KeyboardInterrupt, CancelledError), the finally block merges adjacent content fragments via merge_content_text(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Partial turns (from interrupted streams) have no token or cost data. Filter them out in get_cost() and get_tokens() to avoid errors. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Partial assistant turns now show their partial_reason (e.g. [interrupted]) instead of token counts. Token/cost totals in the Chat header exclude partial turns. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Cast Content to ContentUnion for list append compatibility and merge_content_text results. Use isinstance check in finally block instead of accessing is_partial on Turn base type. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
StreamController provides a simple cancel/reset/cancelled/reason API for cooperatively cancelling streaming responses. Exported from chatlas. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Thread StreamController through stream → _chat_impl → _submit_turns (and async equivalents). When controller.cancelled is True, the streaming loop breaks and the partial turn's reason is set from the controller. Also skips tool invocation when cancelled. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Both chat() and chat_async() now create an internal StreamController and thread it through _chat_impl. This ensures the try/finally partial turn machinery is always active, even for non-streaming chat calls. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Capture all content types (not just text) in partial turns so ContentToolRequest etc. aren't silently dropped on interruption - Default-create StreamController when none provided, eliminating all `if controller is not None` guards - Add comments explaining for/else + GeneratorExit interaction - Add thread-safety comment on StreamController.cancel() ordering - Return list[ContentUnion] from merge_content_text to avoid casts Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Introduces TurnAccumulator in chatlas/_turn_accumulator.py mirroring ellmer's R6 class, along with merge_content_text helper and full test coverage in tests/test_turn_accumulator.py. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Replace assert with RuntimeError for precondition checks - Narrow update_turn param to ContentUnion (removes cast) - Use model_construct for ContentThinking merge (consistency) - Remove unused ContentToolRequest import from tests Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ator Delegates partial-turn lifecycle management to TurnAccumulator, replacing the inline for/else + partial turn index tracking with clean begin/update/ complete/finalize calls. Also closes the HTTP response in finally, drops the local merge_content_text (now in _turn_accumulator.py), and updates the test import accordingly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tion Four copies of the validate-type/compute-tokens/compute-cost/log pattern (sync/async × streaming/non-streaming) consolidated into one function. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…turn filtering - Add _ensure_ready() to StreamController that warns and auto-resets if already cancelled (aligns with ellmer's as_controller() behavior) - Add _as_controller() helper, replacing redundant StreamController() creation at 6 call sites with one consistent pattern - Widen TurnAccumulator.update_turn to accept Content, removing 2 cast sites and the ContentUnion import from _chat.py - Fix get_tokens() to filter partial turns at any position in history, not just trailing (aligns with ellmer's discard approach) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (2)
chatlas/_chat.py:1271
stream()wraps_chat_impl()but doesn’t explicitly close the underlying generator if the caller closes the wrapper early. Because the partial-turn preservation relies on generator finalization (finallyin_submit_turns), it’s safer to ensuregenerator.close()is called in afinallyblock insidewrapper()so the partial turn (and provider response) are finalized deterministically (especially on non-refcounted Python implementations).
controller=controller,
)
def wrapper() -> Generator[
str | ContentThinking | ContentToolRequest | ContentToolResult, None, None
chatlas/_chat.py:1386
- Similar to the sync path:
stream_async()’s wrapper doesn’t explicitly ensure the underlying async generator is closed when the wrapper is closed early. Adding atry/finallythat awaits the inner generator’saclose()(when available) would make partial-turn preservation and transport cleanup deterministic.
controller = _as_controller(controller)
async def wrapper() -> AsyncGenerator[
str | ContentThinking | ContentToolRequest | ContentToolResult, None
]:
with display:
async for chunk in self._chat_impl_async(
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if isinstance(last, ContentText) and isinstance(item, ContentText): | ||
| merged[-1] = ContentText.model_construct(text=last.text + item.text) | ||
| elif isinstance(last, ContentThinking) and isinstance(item, ContentThinking): | ||
| merged[-1] = ContentThinking.model_construct( | ||
| thinking=last.thinking + item.thinking | ||
| ) |
There was a problem hiding this comment.
When merging adjacent ContentThinking fragments, the new object is constructed with only thinking=... and drops any extra metadata that may be present on either fragment. Providers appear to rely on ContentThinking.extra (e.g., signatures / original blocks) when serializing back into API message formats, so this merge can silently lose important information. Consider preserving extra (e.g., keep the first/last non-None extra, or merge dicts when both present) or avoid merging thinking blocks when extra differs.
| turn = resolve_assistant_turn(self.provider, turn) | ||
| self._turns.extend([user_turn, turn]) |
There was a problem hiding this comment.
Why is this logic now indented into the else block?
| def _ensure_ready(self) -> None: | ||
| """Auto-reset if already cancelled (prevents stale controller bugs).""" | ||
| if self._cancelled: | ||
| import warnings |
There was a problem hiding this comment.
Move to top level import.
Summary
PR 2 in the streaming improvements series (after #276). Adds:
AssistantTurnwithpartial_reasonset, so conversation state isn't lostStreamController: A cooperative cancellation mechanism forstream()andstream_async()— callers can request the stream stop cleanly viacontroller.cancel(), which triggers the partial turn preservation path[interrupted]in the Chat repr; partial turns are excluded from token accounting and cost calculationsChanges
chatlas/_turn.py: Addedpartial_reasonfield toAssistantTurnandmerge_content_texthelperchatlas/_stream_controller.py: NewStreamControllerclass for cooperative cancellationchatlas/_chat.py:stream()/stream_async()accept optionalcontrollerparameter;_submit_turns/_submit_turns_asyncwrap streaming in try/finally to preserve partial turns on interruptionchatlas/__init__.py: ExportStreamControllerTest plan
make check-typespasses (0 errors)[interrupted]display in Chat repr🤖 Generated with Claude Code