Skip to content

feat: partial turn preservation and cooperative stream cancellation#279

Open
cpsievert wants to merge 19 commits intomainfrom
feat/stream-cancellation
Open

feat: partial turn preservation and cooperative stream cancellation#279
cpsievert wants to merge 19 commits intomainfrom
feat/stream-cancellation

Conversation

@cpsievert
Copy link
Copy Markdown
Collaborator

@cpsievert cpsievert commented Apr 2, 2026

Summary

PR 2 in the streaming improvements series (after #276). Adds:

  • Partial turn preservation: When a stream is interrupted (closed early, cancelled), the accumulated content so far is saved as a partial AssistantTurn with partial_reason set, so conversation state isn't lost
  • StreamController: A cooperative cancellation mechanism for stream() and stream_async() — callers can request the stream stop cleanly via controller.cancel(), which triggers the partial turn preservation path
  • Display improvements: Partial turns show [interrupted] in the Chat repr; partial turns are excluded from token accounting and cost calculations

Changes

  • chatlas/_turn.py: Added partial_reason field to AssistantTurn and merge_content_text helper
  • chatlas/_stream_controller.py: New StreamController class for cooperative cancellation
  • chatlas/_chat.py: stream()/stream_async() accept optional controller parameter; _submit_turns/_submit_turns_async wrap streaming in try/finally to preserve partial turns on interruption
  • chatlas/__init__.py: Export StreamController

Test plan

  • make check-types passes (0 errors)
  • VCR tests for partial turn preservation (sync + async)
  • VCR tests for StreamController cancellation (sync + async)
  • Snapshot test for [interrupted] display in Chat repr
  • Existing tests still pass (465 passed, 4 skipped)

🤖 Generated with Claude Code

cpsievert and others added 11 commits April 2, 2026 12:45
Add partial_reason field and is_partial property to AssistantTurn for
marking incomplete turns on stream interruption. Add merge_content_text()
helper to combine adjacent ContentText/ContentThinking fragments.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Restructure _submit_turns and _submit_turns_async to eagerly append a
partial AssistantTurn to self._turns before streaming begins. On each
chunk, content is appended to the partial turn in-place. On normal
completion, the partial turn is replaced with the full turn. On
interruption (GeneratorExit, KeyboardInterrupt, CancelledError), the
finally block merges adjacent content fragments via merge_content_text().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Partial turns (from interrupted streams) have no token or cost data.
Filter them out in get_cost() and get_tokens() to avoid errors.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Partial assistant turns now show their partial_reason (e.g. [interrupted])
instead of token counts. Token/cost totals in the Chat header exclude
partial turns.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Cast Content to ContentUnion for list append compatibility and
merge_content_text results. Use isinstance check in finally block
instead of accessing is_partial on Turn base type.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
StreamController provides a simple cancel/reset/cancelled/reason API
for cooperatively cancelling streaming responses. Exported from chatlas.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Thread StreamController through stream → _chat_impl → _submit_turns
(and async equivalents). When controller.cancelled is True, the
streaming loop breaks and the partial turn's reason is set from the
controller. Also skips tool invocation when cancelled.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Both chat() and chat_async() now create an internal StreamController
and thread it through _chat_impl. This ensures the try/finally partial
turn machinery is always active, even for non-streaming chat calls.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Capture all content types (not just text) in partial turns so
  ContentToolRequest etc. aren't silently dropped on interruption
- Default-create StreamController when none provided, eliminating
  all `if controller is not None` guards
- Add comments explaining for/else + GeneratorExit interaction
- Add thread-safety comment on StreamController.cancel() ordering
- Return list[ContentUnion] from merge_content_text to avoid casts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

This comment was marked as outdated.

cpsievert and others added 7 commits April 2, 2026 14:24
Introduces TurnAccumulator in chatlas/_turn_accumulator.py mirroring
ellmer's R6 class, along with merge_content_text helper and full test
coverage in tests/test_turn_accumulator.py.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Replace assert with RuntimeError for precondition checks
- Narrow update_turn param to ContentUnion (removes cast)
- Use model_construct for ContentThinking merge (consistency)
- Remove unused ContentToolRequest import from tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ator

Delegates partial-turn lifecycle management to TurnAccumulator, replacing
the inline for/else + partial turn index tracking with clean begin/update/
complete/finalize calls. Also closes the HTTP response in finally, drops the
local merge_content_text (now in _turn_accumulator.py), and updates the test
import accordingly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tion

Four copies of the validate-type/compute-tokens/compute-cost/log pattern
(sync/async × streaming/non-streaming) consolidated into one function.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…turn filtering

- Add _ensure_ready() to StreamController that warns and auto-resets
  if already cancelled (aligns with ellmer's as_controller() behavior)
- Add _as_controller() helper, replacing redundant StreamController()
  creation at 6 call sites with one consistent pattern
- Widen TurnAccumulator.update_turn to accept Content, removing 2 cast
  sites and the ContentUnion import from _chat.py
- Fix get_tokens() to filter partial turns at any position in history,
  not just trailing (aligns with ellmer's discard approach)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@cpsievert cpsievert requested a review from Copilot April 2, 2026 20:45
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (2)

chatlas/_chat.py:1271

  • stream() wraps _chat_impl() but doesn’t explicitly close the underlying generator if the caller closes the wrapper early. Because the partial-turn preservation relies on generator finalization (finally in _submit_turns), it’s safer to ensure generator.close() is called in a finally block inside wrapper() so the partial turn (and provider response) are finalized deterministically (especially on non-refcounted Python implementations).
            controller=controller,
        )

        def wrapper() -> Generator[
            str | ContentThinking | ContentToolRequest | ContentToolResult, None, None

chatlas/_chat.py:1386

  • Similar to the sync path: stream_async()’s wrapper doesn’t explicitly ensure the underlying async generator is closed when the wrapper is closed early. Adding a try/finally that awaits the inner generator’s aclose() (when available) would make partial-turn preservation and transport cleanup deterministic.
        controller = _as_controller(controller)

        async def wrapper() -> AsyncGenerator[
            str | ContentThinking | ContentToolRequest | ContentToolResult, None
        ]:
            with display:
                async for chunk in self._chat_impl_async(

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +15 to +20
if isinstance(last, ContentText) and isinstance(item, ContentText):
merged[-1] = ContentText.model_construct(text=last.text + item.text)
elif isinstance(last, ContentThinking) and isinstance(item, ContentThinking):
merged[-1] = ContentThinking.model_construct(
thinking=last.thinking + item.thinking
)
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When merging adjacent ContentThinking fragments, the new object is constructed with only thinking=... and drops any extra metadata that may be present on either fragment. Providers appear to rely on ContentThinking.extra (e.g., signatures / original blocks) when serializing back into API message formats, so this merge can silently lose important information. Consider preserving extra (e.g., keep the first/last non-None extra, or merge dicts when both present) or avoid merging thinking blocks when extra differs.

Copilot uses AI. Check for mistakes.
Comment on lines +2805 to +2806
turn = resolve_assistant_turn(self.provider, turn)
self._turns.extend([user_turn, turn])
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this logic now indented into the else block?

def _ensure_ready(self) -> None:
"""Auto-reset if already cancelled (prevents stale controller bugs)."""
if self._cancelled:
import warnings
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to top level import.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants