Skip to content

Experimental Native Protocol Rewrite#162

Draft
Mirrowel wants to merge 182 commits into
devfrom
experimental
Draft

Experimental Native Protocol Rewrite#162
Mirrowel wants to merge 182 commits into
devfrom
experimental

Conversation

@Mirrowel
Copy link
Copy Markdown
Owner

Experimental Native Protocol Roadmap

This branch is for a long-running experimental rewrite that makes native protocol support the first-class extension point of rotator_library, while preserving the existing credential rotation, quota, fair-cycle, session tracking, and provider plugin strengths.

Operating Rules

  • Work only on the experimental branch.
  • Keep all repository work inside C:\Projects\test\LLM-API-Key-Proxy and child paths.
  • Treat commits as checkpoints. A phase may contain many commits.
  • Commit messages must include a body describing what changed, why, tests run, and follow-up considerations.
  • Do not commit phase reports written for the user unless explicitly requested. Planning docs under docs/experimental/ are committed.
  • Before each phase implementation, first produce a fresh exhaustive phase plan in conversation text, based on the current code state. Only after that plan is settled should it be written to docs/experimental/phase-N-*.md.
  • After each phase implementation, call both explore and explore-heavy agents to review the work against the phase plan, external reference areas, and current proxy behavior. Fix findings and re-review as needed.
  • Keep LiteLLM as a fallback path for protocols/providers that are not natively covered yet. Native protocol support should be preferred when available.

Strategic Goal

The target architecture is:

client API request
  -> protocol parse into unified representation
  -> field-cache injection
  -> adapter chain
  -> provider override hooks
  -> provider-native request build
  -> provider execution and credential rotation
  -> provider-native response/stream parse
  -> field-cache extraction
  -> adapter chain
  -> protocol formatting for the client
  -> transaction logging for every transform state

Providers should be able to declare an existing protocol and only override the parts that are genuinely provider-specific. A custom provider should usually be configurable through protocol choice, adapters, field-cache rules, auth strategy, and model options rather than requiring a large bespoke provider implementation.

Priority Order

  1. Native protocol foundations, unified types, transformers, adapters, and field-cache rules.
  2. OpenAI Responses API support, including future WebSocket extension points.
  3. Provider work following the protocol layer: Claude Code, Codex, Copilot, Antigravity, and Gemini CLI parity review.
  4. Routing and fallback groups, with optional target-group selectors later.
  5. Retry, provider/model cooldown, and failover cleanup.
  6. Protocol-aware quota, usage, and cost normalization.
  7. Streaming library hardening: SSE now, WebSocket-ready later.
  8. Config polish using .env and optional JSON. No SQLite dependency for now.
  9. Extensive staged tests and review-agent verification.

Non-Goals For This Branch

  • Do not make the proxy a full multi-user admin product yet.
  • Do not require SQLite or Postgres for the main feature set.
  • Do not remove LiteLLM before native coverage exists.
  • Do not replace the existing UsageManager, fair-cycle, custom caps, or evidence-based SessionTracker.
  • Do not port frontend/UI work from the external reference gateway.

Current Strengths To Preserve

  • Credential-level rotation and priority-aware selection.
  • Fair cycle and custom caps.
  • Windowed quota tracking and quota groups.
  • Evidence-based session tracking with compaction handling.
  • Provider plugin discovery.
  • Gemini CLI provider behavior unless a reviewed change is clearly better.
  • Resilient file/JSON state writing.
  • Dynamic OpenAI-compatible provider discovery.

Reference Gateway Ideas To Import Carefully

  • Unified protocol/transformer style.
  • Adapter registry and configurable provider/model adapters.
  • Target groups and direct routing syntax, adapted into fallback-first routing.
  • Responses API transformer and storage concepts.
  • Stream TTFB/stall detection concepts, implemented with Python-native async primitives.
  • Provider/model cooldown and retry-history concepts.
  • Usage/cost normalization and provider-reported cost extraction.
  • Broader provider support patterns for Claude Code, Codex, Copilot, and Antigravity.

Phase Index

  1. Protocol Core.
  2. Transform Pass Logging.
  3. Adapter and Field Cache System.
  4. Responses API and WebSocket-Ready Transport Shape.
  5. Provider Protocol Overhaul.
  6. Routing and Fallback Groups.
  7. Retry/Cooldown/Failover Cleanup.
  8. Streaming Library Upgrade.
  9. Usage, Quota, and Cost Accuracy.
  10. Config Polish.

Each phase may be subdivided if implementation scope becomes too large.

Completeness Matrix

This matrix exists so the branch does not lose any requested scope while phases evolve. The phase plans are still refreshed before implementation, but every item below must remain accounted for.

Requested area Planned coverage
Protocols are priority #1 Phases 1 and 4 create native protocol foundations and Responses support before provider work.
Protocols are bases, not gospel Phase 1 requires override-friendly protocol methods, subclassing, copy/mutate registration, and provider-specific overrides.
Move away from LiteLLM Phase 1 adds a litellm_fallback protocol path; later providers should prefer native protocols and use LiteLLM only for unsupported coverage.
Add protocols automatically like providers Phase 1 adds protocol auto-discovery and registry behavior modeled after provider discovery.
Cover current providers and reference providers Phase 1 protocols must cover shapes used by current providers; Phase 5 covers Claude Code, Codex, Copilot, Antigravity, and Gemini CLI parity.
Responses API is very needed Phase 4 is dedicated to Responses, previous_response_id, storage, SSE, and WebSocket-ready transport shape.
WebSocket support later Phases 1, 4, and 8 require transport separation so WebSocket can be added without rewriting protocol logic.
Adapters/transformers tied to protocols Phases 1, 2, and 3 define protocol parse/build plus transform tracing, adapter registry, and field-cache rules.
Cache and return provider fields Phase 3 implements configurable extraction/injection rules for request, response, and stream fields with scope and mode controls.
Reasoning content and similar fields Phase 3 explicitly covers reasoning content, thinking signatures, prompt cache keys, response IDs, and provider session IDs.
Return all possible or last user/assistant use Phase 3 modes include last, all, last_user_turn, last_assistant_turn, and per_tool_call.
Per-model custom provider behavior Phases 3, 5, and 10 cover provider/model field cache rules, adapters, model options, and optional JSON config.
Transaction logging after every transform Phase 2 adds ordered request, response, and stream transform trace passes and integrates them with transaction logging.
Comments, docstrings, and key decisions All implementation phases require docstrings for public abstractions and comments for non-obvious transform, protocol, and future-extension decisions.
Providers are priority #2 Phase 5 follows protocol foundations with Claude Code, Codex, Copilot, Antigravity, and Gemini CLI parity review.
Antigravity comparison Phase 5 explicitly compares the reference Antigravity behavior against src/rotator_library/providers/_retired/.
Routing is interesting Phase 6 implements fallback chains first, with target-group selectors later if useful.
Fallback groups preferred over target groups Phase 6 starts with ordered fallback groups and only adds target-group-style selectors after that base works.
Retry/cooldown/failover cleanup Phase 7 makes provider/model cooldown real, adds retry history, backoff, retry-after precedence, and success reset.
Quota/usage/cost improvements Phase 9 adds protocol-aware normalizers, provider-reported cost extraction, structured cost fields, and checker abstractions while keeping existing usage engines.
Streaming as library capability Phase 8 hardens streaming below the proxy route layer with TTFB, TTFT, stall detection, cancellation, and transport-aware stream events.
Config via env/json, no SQLite Phase 10 adds optional JSON config with env overrides and validation. SQLite remains out of scope.
Multi-user proxy later The branch keeps multi-user/admin features as a future expansion and only preserves extension points where natural.
Exhaustive tests in stages Every phase requires tests alongside implementation and phase-end review by both explore and explore-heavy.
Reports are for the user, not git 06-phase-workflow.md says planning docs are committed, but phase reports are not committed by default.

Code Quality Expectations

  • Public protocol, adapter, transport, field-cache, and provider-extension classes must have docstrings that explain intent, override points, and future expansion hooks.
  • Non-obvious transformations must have comments explaining why data is changed, preserved, reordered, or intentionally dropped.
  • Lossy protocol conversions must be documented at the conversion site.
  • Future WebSocket, target-group, and multi-user extension seams should be noted in comments where they affect today's design.
  • Tests should prefer golden fixtures for protocol shapes and focused unit tests for transform edge cases.

Mirrowel added 30 commits May 30, 2026 22:35
Captures the experimental branch workflow, protocol architecture goals, transform logging requirements, field-cache rules, provider priorities, routing, retry, usage, streaming, and config direction.

Documents that every phase must be freshly planned in conversation, written as planning docs, reviewed by explore and explore-heavy agents, and reported to the user without committing reports by default.
Introduces protocol-neutral request, response, stream event, content, tool, reasoning, usage, cost, and context dataclasses with JSON-safe serialization for future transform tracing.

Adds the override-friendly ProtocolAdapter base and auto-discovery registry with alias handling, duplicate detection, shared stateless instances, and tests for serialization, default preservation, registration, aliases, and protocol errors.

Tests: python -m pytest tests/test_protocol_registry.py
Adds the explicit LiteLLM fallback protocol marker and a native OpenAI Chat Completions adapter for request parsing/building, response parsing/formatting, usage and provider-reported cost extraction, reasoning preservation, tool calls, multimodal content blocks, and SSE chunk parsing.

The adapter remains isolated from runtime execution and preserves unknown extension fields for future adapter, field-cache, and transform logging phases.

Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py
Adds a native Anthropic Messages protocol adapter for request parsing/building, response formatting, stream event parsing, tool_use/tool_result blocks, thinking and redacted-thinking signature preservation, and cache usage normalization.

The existing compatibility routes remain untouched; this adapter is an isolated base for later native provider execution, field-cache rules, and transform logging.

Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py
Adds a native Gemini generateContent adapter for request parsing/building, response formatting, stream event parsing, content parts, function calls/responses, thought signatures, generation config, safety settings, tools, and Gemini usage metadata.

The adapter preserves raw Gemini-native fields and remains isolated from runtime execution so provider migration can happen in later checkpoints.

Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_gemini.py
Adds a native Responses protocol adapter for request parsing/building, response formatting, event-stream parsing, previous_response_id preservation, input and output item handling, reasoning items, function calls, usage details, provider-reported costs, and a WebSocket-ready transport capability flag.

Routes, storage, and runtime wiring remain deferred to later checkpoints; this commit only adds the reusable protocol base and tests.

Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_gemini.py tests/test_protocol_responses.py
Addresses Phase 1 review findings by treating raw payloads as provenance instead of stale formatting authority in native adapters, tightening registry alias/name collision handling, adding JSON-safe serialization fallbacks, and avoiding default reasoning-token double counting.

Adds nested raw preservation for tool, result, and reasoning structures, exposes WebSocket as a future Responses transport seam rather than current formatting support, expands Gemini tool declaration parsing, and switches protocol tests to the public package import path through a local test path fixture.

Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_gemini.py tests/test_protocol_responses.py

Tests: python -m pytest tests/test_session_tracking.py tests/test_selection_engine.py
Preserves Anthropic system block shape and metadata during rebuilds, keeps unknown Responses output items while still applying unified-message mutations, and groups Gemini multi-declaration tools back into their original native tool container.

Adds tests for Anthropic system cache metadata, Responses future output-item preservation, and Gemini multi-declaration rebuild fidelity.

Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_gemini.py tests/test_protocol_responses.py

Tests: python -m pytest tests/test_session_tracking.py tests/test_selection_engine.py
Adds the Phase 2 plan for additive transform-pass transaction logging, including trace entry shape, writer behavior, request/response/stream pass names, sanitization, TransactionLogger and ProviderLogger integration, tests, risks, and review checkpoints.

The Phase 1 report remains uncommitted for user-facing review only.
Introduces transform trace entries, a local-sequence JSONL/snapshot writer, recursive key-based redaction, filesystem-safe snapshot names, and JSON-safe payload serialization for future protocol and adapter pass logging.

The trace writer is observability-only and isolated from runtime transaction logging in this checkpoint.

Tests: python -m pytest tests/test_transform_trace.py
Wires the transform trace writer into TransactionLogger and ProviderLogger while preserving legacy request, transformed request, response, streaming chunk, metadata, and provider log files.

Adds trace entries for raw client requests, prepared provider requests, raw and parsed stream chunks, assembled stream responses, final client responses, provider request payloads, provider raw stream chunks, provider final responses, and provider errors.

Includes transaction logger tests for legacy compatibility, redaction, equality-skipped transformed requests, provider traces, streaming wrapper traces, and disabled logging.

Tests: python -m pytest tests/test_transform_trace.py tests/test_transaction_logger_transform_trace.py

Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_gemini.py tests/test_protocol_responses.py

Tests: python -m pytest tests/test_session_tracking.py tests/test_selection_engine.py
Hardens Phase 2 tracing after review by adding request, session, scope, classifier, exact model, and credential correlation to trace entries where available.

Expands redaction for cookies and credential-bearing headers, extracts structured fields from SDK-like objects before repr fallback, scrubs header-like secrets from provider error text, and adds a standardized transform_log_error helper.

Prevents provider snapshot collisions by namespacing provider writer snapshots while keeping stream chunks in JSONL only.

Tests: python -m pytest tests/test_transform_trace.py tests/test_transaction_logger_transform_trace.py

Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_gemini.py tests/test_protocol_responses.py

Tests: python -m pytest tests/test_session_tracking.py tests/test_selection_engine.py
Adds the Phase 3 plan for the adapter registry, built-in adapter bases, field-cache rule schema, path engine, store abstractions, scoped key behavior, transform trace integration, tests, risks, and review checkpoints.

Reports remain uncommitted for user-facing review only.
Adds the Phase 3 adapter foundation with an override-friendly async base adapter, adapter context, ordered chain runner, auto-discovered registry, aliases, duplicate collision checks, and built-in base adapters for no-op, model override, developer-role suppression, and reasoning content normalization.

Runtime request execution is not wired to the adapter chain yet; this checkpoint keeps behavior unchanged while establishing the extension point for native protocols and providers.

Tests: python -m pytest tests/test_adapter_registry.py
Adds field-cache rule and injection dataclasses, cache context scope values, default provider/model/classifier/session scoping, and a small JSON-path-like engine for extraction and predictable injection.

The path helper supports dict keys, list indexes, wildcard extraction, tail indexes, missing-path no-ops, and explicit errors for malformed paths or wildcard injection.

Tests: python -m pytest tests/test_field_cache_paths.py
Adds async field-cache stores, a ProviderCache-backed wrapper, scoped cache key construction, and the extraction/injection engine for last, all, turn-compatible, stream-event, and per-tool-call-validated rules.

The engine copies payloads by default, isolates values by provider/model/session/classifier/credential scope, skips missing required session scope, and emits transform trace metadata when a transaction logger is supplied.

Tests: python -m pytest tests/test_field_cache_engine.py tests/test_field_cache_paths.py
Adds the missing before_field_cache_extraction and before_field_cache_injection trace passes so field-cache operations now emit both before and after states.

Adds trace-focused tests for adapter chains, field-cache extraction/injection, rule metadata, cache hits, mutation flags, and transform_log_error emission on failed injection.

Tests: python -m pytest tests/test_field_cache_trace.py tests/test_field_cache_engine.py tests/test_adapter_registry.py
Adds optional provider declarations for native protocol name, ordered adapter names, adapter config, and field-cache rules, all defaulting to empty/no-op behavior so existing providers remain on the current execution path until they opt in.

These methods are the Phase 3 bridge that later provider work will use to attach native protocols, adapter chains, and provider-specific field-cache rules per model.

Tests: python -m pytest tests/test_provider_protocol_declarations.py tests/test_adapter_registry.py tests/test_field_cache_engine.py tests/test_field_cache_paths.py tests/test_field_cache_trace.py
Adds the planned field_rename adapter, fixes field-cache trace direction for stream-sourced request injection, caps trace sample values, and documents the current limits of turn/tool-cache modes.

Expands coverage for credential/provider scope isolation, stream-sourced injection trace direction, large sample truncation, field_rename behavior, and plain provider no-op protocol defaults.

Tests: python -m pytest tests/test_adapter_registry.py tests/test_field_cache_paths.py tests/test_field_cache_engine.py tests/test_field_cache_trace.py tests/test_provider_protocol_declarations.py

Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_gemini.py tests/test_protocol_responses.py tests/test_transform_trace.py tests/test_transaction_logger_transform_trace.py tests/test_session_tracking.py tests/test_selection_engine.py
Adds the Phase 4 plan for Responses routes, response storage, previous_response_id continuation, bridge execution through the current client path, HTTP SSE conversion, WebSocket extension seams, tests, risks, and review checkpoints.

Reports remain uncommitted for user-facing review only.
Adds the Phase 4 Responses storage foundation with StoredResponse, local response ID generation, an in-memory store, and a ProviderCache-backed wrapper that accepts an injected cache instead of constructing one globally.

The store supports save, get, delete, and input item listing, preserves JSON-safe response metadata for previous_response_id continuation, and avoids SQLite or new persistence dependencies.

Tests: python -m pytest tests/test_responses_store.py
Adds the temporary Responses-to-chat bridge for Phase 4, converting parsed Responses requests into current chat-completions kwargs and converting chat-completion responses back into Responses objects.

The bridge preserves previous_response_id metadata, parent response messages, tool definitions, generation parameters, and unsupported extension fields for trace/debugging until native provider execution is wired in later phases.

Tests: python -m pytest tests/test_responses_bridge.py tests/test_responses_store.py
Adds the non-streaming Responses service around the protocol adapter, bridge, and response store with validation, previous_response_id loading, get/delete/input-items helpers, and transform trace passes.

The service keeps Phase 4 runtime conservative by bridging through the existing chat completion client path while preserving response storage and lineage metadata for later native provider work.

Tests: python -m pytest tests/test_responses_service.py tests/test_responses_bridge.py tests/test_responses_store.py
Adds FastAPI routes for POST /v1/responses, GET /v1/responses/{id}, DELETE /v1/responses/{id}, and GET /v1/responses/{id}/input_items using the Phase 4 ResponsesService.

The create route currently handles non-streaming requests through the bridge and returns a documented 501 for streaming until the SSE checkpoint lands next.

Tests: python -m pytest tests/test_responses_routes.py tests/test_responses_service.py tests/test_responses_bridge.py tests/test_responses_store.py
Adds Responses HTTP SSE formatting, chat-stream conversion, streamed response accumulation/storage, response.failed events on stream errors, and a WebSocket formatter seam that is explicit but not exposed as a runtime route.

Updates POST /v1/responses to return text/event-stream for stream=true while preserving the existing non-stream route behavior.

Tests: python -m pytest tests/test_responses_streaming.py tests/test_responses_routes.py tests/test_responses_service.py tests/test_responses_bridge.py tests/test_responses_store.py
Wires Responses routes into the transform trace logger when request logging is enabled, adds coverage for unsupported Responses fields preserved in bridge metadata, and strengthens streaming tests to assert SSE event order.

Tests: python -m pytest tests/test_responses_store.py tests/test_responses_bridge.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_streaming.py

Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_gemini.py tests/test_protocol_responses.py tests/test_transform_trace.py tests/test_transaction_logger_transform_trace.py tests/test_adapter_registry.py tests/test_field_cache_paths.py tests/test_field_cache_engine.py tests/test_field_cache_trace.py tests/test_provider_protocol_declarations.py tests/test_session_tracking.py tests/test_selection_engine.py
Adds the Phase 5 plan for native provider execution, provider declarations, HTTP and streaming seams, priority provider order, Antigravity restoration constraints, Gemini CLI parity review, fallback policy, transform tracing, field-cache rules, tests, and review checkpoints.

Reports remain uncommitted for user-facing review only.
Adds the Phase 5 native provider foundation with execution context, HTTP transport wrapper, and non-streaming executor that runs protocol selection, adapter chains, field-cache injection/extraction, provider HTTP calls, and transform tracing.

The foundation is not wired into live request execution yet, preserving current provider behavior while giving priority provider work a testable native path.

Tests: python -m pytest tests/test_native_provider_executor.py tests/test_responses_store.py tests/test_responses_bridge.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_streaming.py
Adds opt-in native provider streaming support with streaming-capable transport seam, raw chunk tracing, protocol stream parsing, field-cache stream extraction, formatted client stream events, and transform error logging.

The streaming foundation remains isolated from live provider routing so existing providers keep current behavior while Phase 5 provider implementations gain a mocked native stream path.

Tests: python -m pytest tests/test_native_provider_executor.py tests/test_native_provider_streaming.py tests/test_responses_store.py tests/test_responses_bridge.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_streaming.py tests/test_protocol_responses.py tests/test_transform_trace.py tests/test_transaction_logger_transform_trace.py
Adds the first priority Phase 5 provider as an explicit native integration skeleton with Anthropic Messages protocol declaration, adapter config, thinking-signature field-cache rule, native header/endpoint helpers, and mock-friendly model discovery.

This does not assume undocumented live behavior or wire the provider into the runtime native executor yet; it establishes a tested provider declaration path for later native routing.

Tests: python -m pytest tests/test_claude_code_provider.py tests/test_provider_protocol_declarations.py tests/test_native_provider_executor.py tests/test_native_provider_streaming.py tests/test_protocol_anthropic_messages.py tests/test_adapter_registry.py tests/test_field_cache_engine.py
Mirrowel added 28 commits May 31, 2026 19:50
Extends non-streaming response usage normalization to dict responses so final_client_response always reflects post-normalization usage, matching the Phase 2c trace contract.

Tests: pytest tests/test_executor_usage_accounting.py tests/test_anthropic_transform_tracing.py tests/test_transaction_logger_transform_trace.py tests/test_responses_streaming.py tests/test_request_executor_stream_metrics.py
Adds the Phase 3c corrective plan for unified field-cache runtime sources and targets, native adapter trace safety, and credential-scope fail-closed behavior.

Tests: not run (planning document only)
Executes unified request/response/stream-event field-cache sources, supports metadata and unified-request injection targets in native execution, suppresses unsafe generic native adapter traces, and fails closed on missing credential scope.

Tests: pytest tests/test_field_cache_engine.py tests/test_field_cache_trace.py tests/test_native_provider_executor.py tests/test_native_provider_streaming.py tests/test_adapter_registry.py tests/test_request_executor_native_routing.py tests/test_protocol_openai_chat.py tests/test_responses_streaming.py tests/test_transform_trace.py
Extracts request-source field-cache rules in native execution and redacts configured metadata-injection paths in native traces.

Tests: pytest tests/test_field_cache_engine.py tests/test_field_cache_trace.py tests/test_native_provider_executor.py tests/test_native_provider_streaming.py tests/test_adapter_registry.py tests/test_request_executor_native_routing.py tests/test_protocol_openai_chat.py tests/test_responses_streaming.py tests/test_transform_trace.py
Adds the Phase 4c corrective plan for configurable Responses storage, continuation lineage replay, and top-level route error bodies.

Tests: not run (planning document only)
Adds configurable provider-cache-backed Responses storage, wires app startup to the configured store, replays parent input/output lineage for continuations, and returns top-level Responses error bodies.

Tests: pytest tests/test_experimental_config.py tests/test_responses_store.py tests/test_responses_bridge.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_streaming.py tests/test_protocol_responses.py tests/test_responses_usage_accounting.py tests/test_stream_transport.py
Replays parent Responses function-call output items as chat tool calls during previous_response_id continuations and documents durable Responses store env vars.

Tests: pytest tests/test_experimental_config.py tests/test_responses_store.py tests/test_responses_bridge.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_streaming.py tests/test_protocol_responses.py tests/test_responses_usage_accounting.py tests/test_stream_transport.py
Converts stored Responses function_call_output inputs into Chat tool messages during continuation lineage replay so tool-use continuations keep both calls and results.

Tests: pytest tests/test_experimental_config.py tests/test_responses_store.py tests/test_responses_bridge.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_streaming.py tests/test_protocol_responses.py tests/test_responses_usage_accounting.py tests/test_stream_transport.py
Adds the Phase 5c corrective plan for client-protocol native responses, provider native contract hooks, Claude Code hardening, Antigravity alias preservation, and centralized native streaming fail-closed behavior.

Tests: not run (planning document only)
Carries a client protocol through native execution so chat-completion routes receive OpenAI Chat responses from Anthropic, Responses, and Gemini native provider calls. Adds explicit provider native endpoint/header/operation hooks, Claude Code max_tokens/auth hardening, and safer Antigravity alias/thinking metadata handling.

Tests: pytest tests/test_request_executor_native_routing.py tests/test_native_provider_executor.py tests/test_native_provider_streaming.py tests/test_native_streaming_transport_seam.py tests/test_claude_code_provider.py tests/test_codex_provider.py tests/test_copilot_provider.py tests/test_antigravity_provider_restore.py tests/test_provider_protocol_declarations.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_responses.py tests/test_protocol_gemini.py
Applies Antigravity thinking aliases to upstream request behavior, honors provider native opt-out hooks in auto mode, and formats cross-protocol native stream events as client OpenAI Chat SSE instead of raw provider chunks.

Tests: pytest tests/test_request_executor_native_routing.py tests/test_native_provider_executor.py tests/test_native_provider_streaming.py tests/test_native_streaming_transport_seam.py tests/test_claude_code_provider.py tests/test_codex_provider.py tests/test_copilot_provider.py tests/test_antigravity_provider_restore.py tests/test_provider_protocol_declarations.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_responses.py tests/test_protocol_gemini.py
Uses the fail-closed native streaming support helper in live routing, extracts provider-native response cache fields before client-protocol formatting, and scopes Antigravity quota groups by model family instead of broad Gemini/Claude buckets.

Tests: pytest tests/test_request_executor_native_routing.py tests/test_native_provider_executor.py tests/test_native_provider_streaming.py tests/test_native_streaming_transport_seam.py tests/test_claude_code_provider.py tests/test_codex_provider.py tests/test_copilot_provider.py tests/test_antigravity_provider_restore.py tests/test_provider_protocol_declarations.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_responses.py tests/test_protocol_gemini.py
Removes ambiguous Gemini 3 Pro preview from reverse alias normalization so plain preview requests stay preview while explicit low/high aliases still carry thinking behavior.

Tests: pytest tests/test_antigravity_provider_restore.py tests/test_request_executor_native_routing.py tests/test_native_provider_executor.py tests/test_native_provider_streaming.py tests/test_provider_protocol_declarations.py tests/test_protocol_openai_chat.py tests/test_protocol_gemini.py
Adds the Phase 6c corrective plan for stale fallback tests, group target promotion, streaming execution-mode parity, native config hard stops, route-error aliases, and target namespace adjustment.

Tests: not run (planning document only)
Replaces stale fallback-group tests with current routing coverage, promotes requested models within fallback groups, aligns streaming execution-mode precedence with non-streaming behavior, hard-stops native config errors, expands structured route-error aliases, and rewrites target session namespaces during fallback cloning.

Tests: pytest tests/test_fallback_groups.py tests/test_fallback_resolver.py tests/test_fallback_policy.py tests/test_fallback_attempt_runner.py tests/test_request_executor_fallback_groups.py tests/test_request_executor_fallback_error_summary.py tests/test_routing_config.py tests/test_config_routing_json.py tests/test_routing_attempts.py tests/test_request_builder_routing.py tests/test_request_executor_native_routing.py tests/test_streaming_fallback_policy.py tests/test_retry_policy.py tests/test_cooldown_activation.py
Makes streaming auto-native selection call the same provider opt-out hook as non-streaming execution and rewrites fallback session namespaces with the target usage scope instead of preserving the first target scope prefix.

Tests: pytest tests/test_fallback_groups.py tests/test_fallback_resolver.py tests/test_fallback_policy.py tests/test_fallback_attempt_runner.py tests/test_request_executor_fallback_groups.py tests/test_request_executor_fallback_error_summary.py tests/test_routing_config.py tests/test_config_routing_json.py tests/test_routing_attempts.py tests/test_request_builder_routing.py tests/test_request_executor_native_routing.py tests/test_streaming_fallback_policy.py tests/test_retry_policy.py tests/test_cooldown_activation.py
Adds the Phase 7c corrective plan for cooldown budget fail-fast behavior, transient backoff thresholds, success reset, structured routing attempt history, and model-scoped cooldown isolation.

Tests: not run (planning document only)
Fails fast when active cooldowns exceed the request budget, prevents single generic transients from starting provider-wide cooldowns, records skipped transient failures for bounded backoff, clears failure history on success, and populates sanitized routing attempt history.

Tests: pytest tests/test_retry_policy.py tests/test_cooldown_activation.py tests/test_request_executor_fallback_groups.py tests/test_streaming_fallback_policy.py tests/test_fallback_policy.py tests/test_fallback_groups.py tests/test_fallback_resolver.py tests/test_request_executor_fallback_error_summary.py tests/test_routing_attempts.py tests/test_streaming_error_handler.py tests/test_streaming_usage_accounting.py tests/test_request_executor_stream_metrics.py
Adds the Phase 8c corrective plan for Responses runtime stream settings, heartbeat frames, upstream close behavior, timeout handling, disconnect handling, and Anthropic iterator close safety.

Tests: not run (planning document only)
Applies stream runtime settings to Responses streaming, adds non-visible heartbeat formatting, enforces TTFB and stall timeout handling, closes upstream streams on timeout or disconnect, and closes Anthropic compatibility streams when only the iterator exposes close.

Tests: pytest tests/test_responses_streaming.py tests/test_responses_routes.py tests/test_anthropic_transform_tracing.py tests/test_streaming_error_handler.py tests/test_streaming_usage_accounting.py tests/test_request_executor_stream_metrics.py tests/test_native_provider_streaming.py tests/test_native_streaming_transport_seam.py tests/test_stream_policy.py tests/test_stream_transport.py tests/test_config_stream_settings.py tests/test_stream_metrics.py tests/test_stream_events.py
Keeps pending upstream/acquisition tasks alive across heartbeat frames, enforces TTFB across acquisition and first-chunk waits, awaits pending task cancellation before upstream close, and adds combined heartbeat-timeout coverage.

Tests: pytest tests/test_responses_streaming.py tests/test_responses_routes.py tests/test_anthropic_transform_tracing.py tests/test_streaming_error_handler.py tests/test_streaming_usage_accounting.py tests/test_request_executor_stream_metrics.py tests/test_native_provider_streaming.py tests/test_native_streaming_transport_seam.py tests/test_stream_policy.py tests/test_stream_transport.py tests/test_config_stream_settings.py tests/test_stream_metrics.py tests/test_stream_events.py
Prevents heartbeat-yielded upstream and acquisition tasks from being overwritten before their results are consumed, and cancels pending acquisition/read tasks during final stream cleanup before closing upstream.

Tests: pytest tests/test_responses_streaming.py tests/test_responses_routes.py tests/test_anthropic_transform_tracing.py tests/test_streaming_error_handler.py tests/test_streaming_usage_accounting.py tests/test_request_executor_stream_metrics.py tests/test_native_provider_streaming.py tests/test_native_streaming_transport_seam.py tests/test_stream_policy.py tests/test_stream_transport.py tests/test_config_stream_settings.py tests/test_stream_metrics.py tests/test_stream_events.py
Uses one Responses TTFB deadline across stream acquisition and first chunk, and closes streams returned by completed acquisition tasks when the generator exits after a heartbeat.

Tests: pytest tests/test_responses_streaming.py tests/test_responses_routes.py tests/test_anthropic_transform_tracing.py tests/test_streaming_error_handler.py tests/test_streaming_usage_accounting.py tests/test_request_executor_stream_metrics.py tests/test_native_provider_streaming.py tests/test_native_streaming_transport_seam.py tests/test_stream_policy.py tests/test_stream_transport.py tests/test_config_stream_settings.py tests/test_stream_metrics.py tests/test_stream_events.py
Adds the Phase 9c corrective plan for top-level provider costs, cache-write double-count prevention, Responses SSE cost metadata, native streaming cost traces, and stream cost visibility policy.

Tests: not run (planning document only)
Preserves top-level provider-reported costs with nested usage, avoids OpenAI cache-write double counting, sums structured provider cost breakdowns, carries Responses SSE cost metadata through streamed completions, treats cost events as non-visible metadata, and adds native streaming usage/cost summary traces.

Tests: pytest tests/test_usage_accounting.py tests/test_usage_costs.py tests/test_usage_quota_snapshots.py tests/test_responses_streaming.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_stream_policy.py tests/test_native_provider_streaming.py tests/test_native_provider_executor.py tests/test_streaming_usage_accounting.py tests/test_executor_usage_accounting.py tests/test_responses_usage_accounting.py tests/test_native_usage_accounting.py
Adds request_cost_usd support for SSE cost comments and protocol usage, preserves top-level stream cost siblings in live streaming and Responses bridge paths, expands structured breakdown summing, treats scalar cost events as retry-safe metadata, and preserves earlier native stream cost when later token usage arrives.

Tests: pytest tests/test_usage_accounting.py tests/test_usage_costs.py tests/test_usage_quota_snapshots.py tests/test_responses_streaming.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_bridge.py tests/test_stream_policy.py tests/test_native_provider_streaming.py tests/test_native_provider_executor.py tests/test_streaming_usage_accounting.py tests/test_executor_usage_accounting.py tests/test_responses_usage_accounting.py tests/test_native_usage_accounting.py tests/test_protocol_openai_chat.py tests/test_protocol_responses.py
Merges top-level provider cost siblings for dict streaming chunks, broadens reference cost breakdown aliases, and lets later raw native token usage replace cost-only records while preserving earlier provider-reported cost.

Tests: pytest tests/test_usage_accounting.py tests/test_usage_costs.py tests/test_usage_quota_snapshots.py tests/test_responses_streaming.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_bridge.py tests/test_stream_policy.py tests/test_native_provider_streaming.py tests/test_native_provider_executor.py tests/test_streaming_usage_accounting.py tests/test_executor_usage_accounting.py tests/test_responses_usage_accounting.py tests/test_native_usage_accounting.py tests/test_protocol_openai_chat.py tests/test_protocol_responses.py
Converts cost_in_usd_ticks to USD before summing, preserves top-level estimated_cost across accounting and streaming/Responses paths, and adds regression coverage for estimated-cost cost metadata.

Tests: pytest tests/test_usage_accounting.py tests/test_usage_costs.py tests/test_usage_quota_snapshots.py tests/test_responses_streaming.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_bridge.py tests/test_stream_policy.py tests/test_native_provider_streaming.py tests/test_native_provider_executor.py tests/test_streaming_usage_accounting.py tests/test_executor_usage_accounting.py tests/test_responses_usage_accounting.py tests/test_native_usage_accounting.py tests/test_protocol_openai_chat.py tests/test_protocol_responses.py
@mirrobot-agent
Copy link
Copy Markdown
Contributor

Starting my review of the third-pass audit remediation changes — covering protocol output correctness, native operation guardrails, streaming hardening, Responses runtime improvements, cost accounting, routing parity, and much more across ~100 files. I'll analyze each subsystem and report back with a bundled review.

Copy link
Copy Markdown
Contributor

@mirrobot-agent mirrobot-agent Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall Assessment

This is a massive third-pass audit remediation that addresses findings from the explore and explore-heavy reviewers across all 10 phases of the experimental rewrite. The changeset spans ~9,000 lines across 100 files and is impressively well-scoped — each change maps back to a specific finding in third-pass-audit-findings.md.

The code quality is high. The streaming hardening (TTFB/stall timeouts, heartbeats, upstream close safety, CancelledError handling) is thorough and consistent across both the chat-completions StreamingHandler and the Responses stream_events path. The protocol output correctness fixes (OpenAI Chat/Responses usage formatting, legacy function_call handling, Ollama format_response, native operation guardrails) are all well-tested. The cost accounting improvements (provider-reported cost preservation, cache-write double-count fix, SSE cost comment parsing, structured cost breakdowns) are comprehensive and correctly prioritized.

Status of Previous Feedback

All 12 issues from the initial review and 5 suggestions from the follow-up have been addressed in this changeset:

  • service.py CancelledError: Now caught by the Responses finally block and the Anthropic wrapper's finally block.
  • Streaming adapter chain gap: The NativeProviderExecutor.stream() now runs stream-event adapter chains and extracts usage/cost.
  • Bare except Exception: pass: Replaced with structured error handling and _can_retry_stream_after_error with explicit emitted_output tracking.
  • Multi-turn tool use: _parent_output_to_messages now handles function_call and custom_tool_call items, and _message_to_chat handles tool role messages.
  • Auth header mismatch: ClaudeCodeProvider now supports auto/x-api-key/bearer modes via CLAUDE_CODE_AUTH_HEADER.
  • Hard litellm dependency: Not directly addressed in this diff, but the cost system now has provider-reported cost that bypasses litellm pricing — worth verifying this is tracked.

Key Suggestions

  • Model-scoped cooldown bypasses failure-history threshold — contradicts Phase 7c plan (see inline)
  • finally block recovery can raiseacquire_task.result() can replace the original exception (see inline)
  • Polling logic duplicationnext_upstream_chunk and acquire_upstream_stream share ~90% of their structure (see inline)
  • Disconnect task leak — if is_disconnected() raises, the task is never cleaned up (see inline)
  • Gemini cache-write double-count inconsistency — OpenAI path fixed but Gemini path not (see inline)
  • Private _save_to_disk coupling — fragile dependency on provider cache internals (see inline)

Architectural Notes

The new streaming runtime settings system (get_stream_runtime_settings()) is well-designed — lazy-loaded via _retry_settings() to avoid import cycles, with env overriding JSON. The routing attempt history (_append_routing_attempt_history) is a solid observability addition that correctly sanitizes error types and tracks timing. The native provider cross-protocol formatting (client_protocol_name on NativeProviderContext) is a clean solution to the "responses formatted in provider protocol" blocker.

One architectural observation: the Responses streaming path now has its own independent polling/timeout/heartbeat implementation rather than reusing the StreamingHandler infrastructure. This is understandable given the different event model (Responses events vs SSE chunks), but it means streaming policy changes must be applied in two places.

This review was generated by an AI assistant.

Comment on lines +113 to +120
if error_type in {"server_error", "api_connection"} and default_duration >= provider_cooldown_min_seconds:
backoff_level = 0
duration = int(default_duration)
if scope == "model":
return ProviderCooldownDecision(True, duration=duration, reason="model_capacity_cooldown", scope=scope, model=model, backoff_level=backoff_level)
if failure_history is None:
return ProviderCooldownDecision(False, reason="missing_failure_history", scope=scope, model=model if scope == "model" else None)
backoff = failure_history.backoff_for(provider=provider, error_type=error_type, scope=scope, model=model if scope == "model" else None, default_duration=duration)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This hard early return for scope == "model" bypasses the failure-history threshold check entirely:

if scope == "model":
    return ProviderCooldownDecision(True, duration=duration, reason="model_capacity_cooldown", scope=scope, model=model, backoff_level=backoff_level)

This means a single server_error without retry_after will always start a model-scoped cooldown immediately, even if it's the first transient failure. The Phase 7c plan states: "No-retry_after server_error and api_connection should start cooldown only after FailureHistory.backoff_for() crosses the configured threshold." The provider-scoped path (below) correctly checks the threshold, but the model-scoped path does not.

If this is intentional (model-capacity errors are more actionable than generic transients), consider adding a comment explaining the policy difference. Otherwise, the model-scoped path should also check backoff_level > 0 before returning True.

Suggested change
if error_type in {"server_error", "api_connection"} and default_duration >= provider_cooldown_min_seconds:
backoff_level = 0
duration = int(default_duration)
if scope == "model":
return ProviderCooldownDecision(True, duration=duration, reason="model_capacity_cooldown", scope=scope, model=model, backoff_level=backoff_level)
if failure_history is None:
return ProviderCooldownDecision(False, reason="missing_failure_history", scope=scope, model=model if scope == "model" else None)
backoff = failure_history.backoff_for(provider=provider, error_type=error_type, scope=scope, model=model if scope == "model" else None, default_duration=duration)
if scope == "model":
if backoff_level <= 0 and failure_history is not None:
return ProviderCooldownDecision(False, reason="transient_backoff_threshold_not_met", scope=scope, model=model)
return ProviderCooldownDecision(True, duration=duration, reason="model_capacity_cooldown", scope=scope, model=model, backoff_level=backoff_level)

Comment on lines +570 to +580
yield ResponsesStreamEvent("response.failed", failed)
self._trace(transaction_logger, "stream_done_event", {"raw": "done"}, direction="stream", stage="final", metadata={"transport": transport, "failed": True})
yield ResponsesStreamEvent("done", {}, terminal=True)
finally:
if chat_stream is None and acquire_task is not None and acquire_task.done() and not acquire_task.cancelled():
try:
chat_stream = acquire_task.result()
stream_iterator = chat_stream.__aiter__()
except Exception:
chat_stream = None
await cancel_task(pending_next_task)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This recovery path in the finally block can itself raise an exception, replacing the original one:

if chat_stream is None and acquire_task is not None and acquire_task.done() and not acquire_task.cancelled():
    chat_stream = acquire_task.result()  # Can raise!

If acquire_task completed with a ResponsesServiceError (e.g., from a TTFB timeout), calling .result() will re-raise that error inside the finally block, masking whatever exception triggered the finally in the first place.

Suggested change
yield ResponsesStreamEvent("response.failed", failed)
self._trace(transaction_logger, "stream_done_event", {"raw": "done"}, direction="stream", stage="final", metadata={"transport": transport, "failed": True})
yield ResponsesStreamEvent("done", {}, terminal=True)
finally:
if chat_stream is None and acquire_task is not None and acquire_task.done() and not acquire_task.cancelled():
try:
chat_stream = acquire_task.result()
stream_iterator = chat_stream.__aiter__()
except Exception:
chat_stream = None
await cancel_task(pending_next_task)
if chat_stream is None and acquire_task is not None and acquire_task.done() and not acquire_task.cancelled():
try:
chat_stream = acquire_task.result()
stream_iterator = chat_stream.__aiter__()
except Exception:
chat_stream = None

Comment on lines +231 to +420
usage = None
item_started = False
monitor = StreamMonitor(clock=time.monotonic)
stream_settings = get_stream_runtime_settings()
chat_stream = None
stream_iterator = None
upstream_closed = False
pending_next_task = None
pending_next_started_at = None
pending_next_last_heartbeat_at = None
acquire_task = None
acquire_started_at = None
acquire_last_heartbeat_at = None
ttfb_started_at = time.monotonic()

async def cancel_task(task: Any) -> None:
"""Cancel and await an in-flight stream task before closing its source."""

if task is None or task.done():
return
task.cancel()
try:
await task
except (asyncio.CancelledError, StopAsyncIteration):
return
except Exception:
return

async def close_upstream(reason: str) -> None:
"""Best-effort close for upstream Responses bridge streams."""

nonlocal upstream_closed
if upstream_closed:
return
attempted = False
for candidate in (stream_iterator, chat_stream):
if candidate is None:
continue
attempted = True
try:
closer = getattr(candidate, "aclose", None)
if callable(closer):
await closer()
upstream_closed = True
self._trace(transaction_logger, "responses_stream_upstream_closed", {"reason": reason}, direction="stream", stage="provider", metadata={"transport": transport})
return
closer = getattr(candidate, "close", None)
if callable(closer):
closer()
upstream_closed = True
self._trace(transaction_logger, "responses_stream_upstream_closed", {"reason": reason}, direction="stream", stage="provider", metadata={"transport": transport})
return
except Exception as exc:
self._trace(transaction_logger, "responses_stream_upstream_close_failed", {"reason": reason, "error_type": type(exc).__name__}, direction="stream", stage="provider", metadata={"transport": transport})
continue
if attempted:
self._trace(transaction_logger, "responses_stream_upstream_close_failed", {"reason": reason, "error_type": "no_close_method"}, direction="stream", stage="provider", metadata={"transport": transport})

async def next_upstream_chunk(*, first: bool) -> tuple[str, Any]:
"""Return the next upstream chunk or a control marker."""

timeout = stream_settings.ttfb_timeout_seconds if first else stream_settings.stall_timeout_seconds
heartbeat = stream_settings.heartbeat_seconds
nonlocal pending_next_task, pending_next_started_at, pending_next_last_heartbeat_at
if pending_next_task is None:
pending_next_task = asyncio.create_task(stream_iterator.__anext__())
pending_next_started_at = time.monotonic()
pending_next_last_heartbeat_at = pending_next_started_at
next_task = pending_next_task
started_at = ttfb_started_at if first else (pending_next_started_at or time.monotonic())
while True:
if next_task.done():
pending_next_task = None
pending_next_started_at = None
pending_next_last_heartbeat_at = None
return "chunk", next_task.result()
if request is not None and await request.is_disconnected():
self._trace(transaction_logger, "responses_stream_disconnected", {"reason": "client_disconnected"}, direction="stream", stage="client", metadata={"transport": transport})
if stream_settings.cancel_upstream_on_disconnect:
await cancel_task(next_task)
pending_next_task = None
await close_upstream("client_disconnected")
return "disconnect", None
elapsed = time.monotonic() - started_at
waits = []
if timeout is not None:
remaining_timeout = timeout - elapsed
if remaining_timeout <= 0:
await cancel_task(next_task)
pending_next_task = None
await close_upstream("ttfb_timeout" if first else "stall_timeout")
raise ResponsesServiceError(
f"Responses stream {'TTFB' if first else 'stall'} timeout",
status_code=504,
error_type="api_connection",
)
waits.append(remaining_timeout)
if heartbeat is not None:
last_heartbeat_at = pending_next_last_heartbeat_at or started_at
remaining_heartbeat = heartbeat - (time.monotonic() - last_heartbeat_at)
if remaining_heartbeat <= 0:
pending_next_last_heartbeat_at = time.monotonic()
return "heartbeat", None
waits.append(remaining_heartbeat)
wait_timeout = min(waits) if waits else None
if wait_timeout is None:
chunk = await next_task
pending_next_task = None
pending_next_started_at = None
pending_next_last_heartbeat_at = None
return "chunk", chunk
done, _ = await asyncio.wait({next_task}, timeout=wait_timeout)
if done:
pending_next_task = None
pending_next_started_at = None
pending_next_last_heartbeat_at = None
return "chunk", next_task.result()
last_heartbeat_at = pending_next_last_heartbeat_at or started_at
if heartbeat is not None and time.monotonic() - last_heartbeat_at >= heartbeat:
pending_next_last_heartbeat_at = time.monotonic()
return "heartbeat", None

async def acquire_upstream_stream() -> tuple[str, Any]:
"""Acquire the upstream stream under the same TTFB/disconnect policy."""

nonlocal acquire_task, acquire_started_at, acquire_last_heartbeat_at
if acquire_task is None:
acquire_task = asyncio.create_task(client.acompletion(request=request, **chat_kwargs))
acquire_started_at = ttfb_started_at
acquire_last_heartbeat_at = acquire_started_at
task = acquire_task
started_at = acquire_started_at or time.monotonic()
timeout = stream_settings.ttfb_timeout_seconds
heartbeat = stream_settings.heartbeat_seconds
while True:
if task.done():
acquire_task = None
acquire_started_at = None
acquire_last_heartbeat_at = None
return "stream", task.result()
if request is not None and await request.is_disconnected():
self._trace(transaction_logger, "responses_stream_disconnected", {"reason": "client_disconnected", "phase": "acquire"}, direction="stream", stage="client", metadata={"transport": transport})
await cancel_task(task)
acquire_task = None
acquire_started_at = None
acquire_last_heartbeat_at = None
return "disconnect", None
waits = []
elapsed = time.monotonic() - started_at
if timeout is not None:
remaining_timeout = timeout - elapsed
if remaining_timeout <= 0:
await cancel_task(task)
acquire_task = None
acquire_started_at = None
acquire_last_heartbeat_at = None
raise ResponsesServiceError("Responses stream TTFB timeout", status_code=504, error_type="api_connection")
waits.append(remaining_timeout)
if heartbeat is not None:
last_heartbeat_at = acquire_last_heartbeat_at or started_at
remaining_heartbeat = heartbeat - (time.monotonic() - last_heartbeat_at)
if remaining_heartbeat <= 0:
acquire_last_heartbeat_at = time.monotonic()
return "heartbeat", None
waits.append(remaining_heartbeat)
wait_timeout = min(waits) if waits else None
if wait_timeout is None:
stream = await task
acquire_task = None
acquire_started_at = None
acquire_last_heartbeat_at = None
return "stream", stream
done, _ = await asyncio.wait({task}, timeout=wait_timeout)
if done:
acquire_task = None
acquire_started_at = None
acquire_last_heartbeat_at = None
return "stream", task.result()
last_heartbeat_at = acquire_last_heartbeat_at or started_at
if heartbeat is not None and time.monotonic() - last_heartbeat_at >= heartbeat:
acquire_last_heartbeat_at = time.monotonic()
return "heartbeat", None

if transaction_logger:
self._trace(
transaction_logger,
"stream_started",
{"event": StreamEvent("started", protocol="responses").to_dict(), "metrics": monitor.metrics.to_dict()},
direction="stream",
stage="client",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

next_upstream_chunk (~90 lines) and acquire_upstream_stream (~55 lines) share nearly identical polling/disconnect/timeout/heartbeat logic. The structural pattern is the same: create task -> loop with asyncio.wait -> check disconnect -> check timeout -> emit heartbeat -> get result.

Any future change to the polling pattern (e.g., adding a new policy check) must be applied in both places. Consider extracting a shared _poll_upstream_async(task_factory, *, first: bool) helper that both call sites use, with the task factory being the only difference (stream_iterator.__anext__ vs client.acompletion).

Comment on lines +160 to +182
done, _ = await asyncio.wait(wait_tasks, timeout=wait_seconds)
if disconnect_task is not None:
if disconnect_task in done and disconnect_task.result():
stream_cancelled = True
next_task.cancel()
with contextlib.suppress(asyncio.CancelledError, StopAsyncIteration):
await next_task
await close_upstream("client_disconnect")
return
if not disconnect_task.done():
disconnect_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await disconnect_task
if next_task in done:
chunk = next_task.result()
break

timeout_error = _stream_timeout_error(monitor, stream_settings)
if timeout_error:
next_task.cancel()
with contextlib.suppress(asyncio.CancelledError, StopAsyncIteration):
await next_task
await close_upstream(timeout_error[0], force=True)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If request.is_disconnected() raises an unexpected exception, the disconnect_task created via asyncio.create_task(request.is_disconnected()) will never be awaited or cancelled. Python 3.9+ warns about unretrieved task exceptions, and Python 3.12+ logs Task was destroyed but it is pending! warnings.

Consider wrapping the disconnect task creation and cleanup in a try/finally pattern within the wait loop, or cancelling it explicitly in all exit paths (including the except Exception re-raise below).

Suggested change
done, _ = await asyncio.wait(wait_tasks, timeout=wait_seconds)
if disconnect_task is not None:
if disconnect_task in done and disconnect_task.result():
stream_cancelled = True
next_task.cancel()
with contextlib.suppress(asyncio.CancelledError, StopAsyncIteration):
await next_task
await close_upstream("client_disconnect")
return
if not disconnect_task.done():
disconnect_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await disconnect_task
if next_task in done:
chunk = next_task.result()
break
timeout_error = _stream_timeout_error(monitor, stream_settings)
if timeout_error:
next_task.cancel()
with contextlib.suppress(asyncio.CancelledError, StopAsyncIteration):
await next_task
await close_upstream(timeout_error[0], force=True)
disconnect_task = None
try:
if request is not None:
disconnect_task = asyncio.create_task(request.is_disconnected())
wait_tasks.add(disconnect_task)
done, _ = await asyncio.wait(wait_tasks, timeout=wait_seconds)
if disconnect_task is not None:
if disconnect_task in done and disconnect_task.result():
stream_cancelled = True
next_task.cancel()
with contextlib.suppress(asyncio.CancelledError, StopAsyncIteration):
await next_task
await close_upstream("client_disconnect")
return
if not disconnect_task.done():
disconnect_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await disconnect_task
finally:
if disconnect_task is not None and not disconnect_task.done():
disconnect_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await disconnect_task

Comment on lines +209 to +211
)
if reasoning and completion_tokens >= reasoning:
completion_tokens -= reasoning
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The OpenAI-like path now correctly subtracts both cache_read and cache_write from prompt_tokens to avoid double-counting:

input_tokens = max(0, prompt_tokens - cache_read - cache_write)

However, _from_gemini_usage still only subtracts cache_read:

input_tokens=max(0, prompt_tokens - cache_read)

If a Gemini provider starts reporting cache_write_tokens (or cachedWriteContentTokens), those tokens would be double-counted in the normalized total. The Gemini protocol already parses cache_write_tokens from the response, so this is a latent consistency gap rather than a hypothetical.

Comment on lines +86 to +92

async def save(self, response: StoredResponse) -> None:
await self._cache.store_async(self._key(response.id), json.dumps(response.to_dict(), ensure_ascii=False))
flush = getattr(self._cache, "_save_to_disk", None)
if callable(flush):
await flush()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This directly accesses a private method on the provider cache:

flush = getattr(self._cache, "_save_to_disk", None)
if callable(flush):
    await flush()

If the provider cache implementation refactors _save_to_disk to a different name (e.g., _flush_to_disk, _persist), saves will silently stop flushing to disk, and the Responses store will appear to lose data between restarts.

Consider either: (a) adding a public flush() or sync() method to the cache interface that this code calls, or (b) adding a comment documenting the coupling so future maintainers know to update this call site.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Agent Monitored Monitored for AI Agent to review PR's and commits enhancement New feature or request Priority

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant