Experimental Native Protocol Rewrite #162
Captures the experimental branch workflow, protocol architecture goals, transform logging requirements, field-cache rules, provider priorities, routing, retry, usage, streaming, and config direction. Documents that every phase must be freshly planned in conversation, written as planning docs, reviewed by explore and explore-heavy agents, and reported to the user without committing reports by default.
Introduces protocol-neutral request, response, stream event, content, tool, reasoning, usage, cost, and context dataclasses with JSON-safe serialization for future transform tracing. Adds the override-friendly ProtocolAdapter base and auto-discovery registry with alias handling, duplicate detection, shared stateless instances, and tests for serialization, default preservation, registration, aliases, and protocol errors. Tests: python -m pytest tests/test_protocol_registry.py
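The registry behavior described in this commit (aliases, duplicate detection, shared stateless instances) can be sketched as follows. This is a minimal illustration only: `ProtocolRegistry`, `ProtocolRegistryError`, and the method names are hypothetical and are not taken from the branch.

```python
class ProtocolRegistryError(ValueError):
    """Hypothetical error type for duplicate or unknown protocol names."""


class ProtocolRegistry:
    """Maps canonical names and aliases to one shared adapter instance each."""

    def __init__(self):
        self._adapters = {}  # canonical name -> shared adapter instance
        self._aliases = {}   # alias -> canonical name

    def register(self, name, adapter, aliases=()):
        keys = [name, *aliases]
        for key in keys:
            # Reject collisions inside this call and against earlier registrations.
            if keys.count(key) > 1 or key in self._adapters or key in self._aliases:
                raise ProtocolRegistryError(f"duplicate protocol name or alias: {key!r}")
        self._adapters[name] = adapter
        for alias in aliases:
            self._aliases[alias] = name

    def get(self, name):
        canonical = self._aliases.get(name, name)
        if canonical not in self._adapters:
            raise ProtocolRegistryError(f"unknown protocol: {name!r}")
        return self._adapters[canonical]
```

Because lookups by alias and by canonical name return the same object, adapters in this sketch need to stay stateless, which matches the "shared stateless instances" wording above.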
Adds the explicit LiteLLM fallback protocol marker and a native OpenAI Chat Completions adapter for request parsing/building, response parsing/formatting, usage and provider-reported cost extraction, reasoning preservation, tool calls, multimodal content blocks, and SSE chunk parsing. The adapter remains isolated from runtime execution and preserves unknown extension fields for future adapter, field-cache, and transform logging phases. Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py
Adds a native Anthropic Messages protocol adapter for request parsing/building, response formatting, stream event parsing, tool_use/tool_result blocks, thinking and redacted-thinking signature preservation, and cache usage normalization. The existing compatibility routes remain untouched; this adapter is an isolated base for later native provider execution, field-cache rules, and transform logging. Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py
Adds a native Gemini generateContent adapter for request parsing/building, response formatting, stream event parsing, content parts, function calls/responses, thought signatures, generation config, safety settings, tools, and Gemini usage metadata. The adapter preserves raw Gemini-native fields and remains isolated from runtime execution so provider migration can happen in later checkpoints. Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_gemini.py
Adds a native Responses protocol adapter for request parsing/building, response formatting, event-stream parsing, previous_response_id preservation, input and output item handling, reasoning items, function calls, usage details, provider-reported costs, and a WebSocket-ready transport capability flag. Routes, storage, and runtime wiring remain deferred to later checkpoints; this commit only adds the reusable protocol base and tests. Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_gemini.py tests/test_protocol_responses.py
Addresses Phase 1 review findings by treating raw payloads as provenance instead of stale formatting authority in native adapters, tightening registry alias/name collision handling, adding JSON-safe serialization fallbacks, and avoiding default reasoning-token double counting. Adds nested raw preservation for tool, result, and reasoning structures, exposes WebSocket as a future Responses transport seam rather than current formatting support, expands Gemini tool declaration parsing, and switches protocol tests to the public package import path through a local test path fixture. Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_gemini.py tests/test_protocol_responses.py Tests: python -m pytest tests/test_session_tracking.py tests/test_selection_engine.py
Preserves Anthropic system block shape and metadata during rebuilds, keeps unknown Responses output items while still applying unified-message mutations, and groups Gemini multi-declaration tools back into their original native tool container. Adds tests for Anthropic system cache metadata, Responses future output-item preservation, and Gemini multi-declaration rebuild fidelity. Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_gemini.py tests/test_protocol_responses.py Tests: python -m pytest tests/test_session_tracking.py tests/test_selection_engine.py
Adds the Phase 2 plan for additive transform-pass transaction logging, including trace entry shape, writer behavior, request/response/stream pass names, sanitization, TransactionLogger and ProviderLogger integration, tests, risks, and review checkpoints. The Phase 1 report remains uncommitted for user-facing review only.
Introduces transform trace entries, a local-sequence JSONL/snapshot writer, recursive key-based redaction, filesystem-safe snapshot names, and JSON-safe payload serialization for future protocol and adapter pass logging. The trace writer is observability-only and isolated from runtime transaction logging in this checkpoint. Tests: python -m pytest tests/test_transform_trace.py
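As a rough illustration of recursive key-based redaction and filesystem-safe snapshot names, consider the sketch below. The key list, the `[REDACTED]` placeholder, and the snapshot naming scheme are assumptions made for the example, not the values used by the trace writer.

```python
import re

# Assumed set of sensitive keys; the real list is not shown in this PR text.
SENSITIVE_KEYS = {"authorization", "api_key", "x-api-key", "cookie", "password"}
REDACTED = "[REDACTED]"


def redact(value):
    """Return a copy of value with sensitive dict keys masked at any depth."""
    if isinstance(value, dict):
        return {
            key: REDACTED if str(key).lower() in SENSITIVE_KEYS else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, (list, tuple)):
        return [redact(item) for item in value]
    return value


def safe_snapshot_name(sequence, pass_name):
    """Build a snapshot file name that contains no path separators."""
    slug = re.sub(r"[^A-Za-z0-9._-]+", "_", pass_name).strip("._") or "pass"
    return f"{sequence:04d}_{slug}.json"
```

Matching on keys rather than values keeps the redaction cheap and predictable, at the cost of missing secrets embedded in free text.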
Wires the transform trace writer into TransactionLogger and ProviderLogger while preserving legacy request, transformed request, response, streaming chunk, metadata, and provider log files. Adds trace entries for raw client requests, prepared provider requests, raw and parsed stream chunks, assembled stream responses, final client responses, provider request payloads, provider raw stream chunks, provider final responses, and provider errors. Includes transaction logger tests for legacy compatibility, redaction, equality-skipped transformed requests, provider traces, streaming wrapper traces, and disabled logging. Tests: python -m pytest tests/test_transform_trace.py tests/test_transaction_logger_transform_trace.py Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_gemini.py tests/test_protocol_responses.py Tests: python -m pytest tests/test_session_tracking.py tests/test_selection_engine.py
Hardens Phase 2 tracing after review by adding request, session, scope, classifier, exact model, and credential correlation to trace entries where available. Expands redaction for cookies and credential-bearing headers, extracts structured fields from SDK-like objects before repr fallback, scrubs header-like secrets from provider error text, and adds a standardized transform_log_error helper. Prevents provider snapshot collisions by namespacing provider writer snapshots while keeping stream chunks in JSONL only. Tests: python -m pytest tests/test_transform_trace.py tests/test_transaction_logger_transform_trace.py Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_gemini.py tests/test_protocol_responses.py Tests: python -m pytest tests/test_session_tracking.py tests/test_selection_engine.py
Adds the Phase 3 plan for the adapter registry, built-in adapter bases, field-cache rule schema, path engine, store abstractions, scoped key behavior, transform trace integration, tests, risks, and review checkpoints. Reports remain uncommitted for user-facing review only.
Adds the Phase 3 adapter foundation with an override-friendly async base adapter, adapter context, ordered chain runner, auto-discovered registry, aliases, duplicate collision checks, and built-in base adapters for no-op, model override, developer-role suppression, and reasoning content normalization. Runtime request execution is not wired to the adapter chain yet; this checkpoint keeps behavior unchanged while establishing the extension point for native protocols and providers. Tests: python -m pytest tests/test_adapter_registry.py
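An ordered async adapter chain of the kind described here might look like the sketch below. The class names, the `transform_request` hook, and the `run_chain` helper are hypothetical; only the no-op and model-override behaviors are taken from the commit text.

```python
import asyncio
import copy


class BaseAdapter:
    """Override-friendly base: the default hook passes the payload through."""

    async def transform_request(self, payload, context):
        return payload


class ModelOverrideAdapter(BaseAdapter):
    def __init__(self, model):
        self.model = model

    async def transform_request(self, payload, context):
        # Return a new dict rather than mutating the input payload.
        return {**payload, "model": self.model}


async def run_chain(adapters, payload, context=None):
    """Apply adapters in declaration order and return the final payload."""
    current = copy.deepcopy(payload)
    for adapter in adapters:
        current = await adapter.transform_request(current, context)
    return current
```

A call such as `asyncio.run(run_chain([BaseAdapter(), ModelOverrideAdapter("m2")], {"model": "m1"}))` would leave the caller's dict untouched and return a payload whose model is `"m2"`.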
Adds field-cache rule and injection dataclasses, cache context scope values, default provider/model/classifier/session scoping, and a small JSON-path-like engine for extraction and predictable injection. The path helper supports dict keys, list indexes, wildcard extraction, tail indexes, missing-path no-ops, and explicit errors for malformed paths or wildcard injection. Tests: python -m pytest tests/test_field_cache_paths.py
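The extraction half of such a path helper can be sketched as below. The dot-separated syntax, the `*` wildcard, and negative numbers as tail indexes are assumptions chosen for the example; the commit does not spell out the actual path grammar.

```python
def parse_path(path):
    """Split a dot-separated path, rejecting empty segments as malformed."""
    parts = path.split(".") if path else []
    if not parts or any(part == "" for part in parts):
        raise ValueError(f"malformed path: {path!r}")
    return parts


def extract(payload, path):
    """Return every value matching path; a missing path yields an empty list."""
    current = [payload]
    for part in parse_path(path):
        matched = []
        for node in current:
            if part == "*":
                if isinstance(node, dict):
                    matched.extend(node.values())
                elif isinstance(node, list):
                    matched.extend(node)
            elif isinstance(node, dict):
                if part in node:
                    matched.append(node[part])
            elif isinstance(node, list):
                try:
                    matched.append(node[int(part)])  # "-1" selects the tail item
                except (ValueError, IndexError):
                    pass  # missing index: no-op
        current = matched
    return current
```

Returning a list in every case keeps wildcard and single-value paths uniform, so callers that want "last" semantics can simply take the final element.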
Adds async field-cache stores, a ProviderCache-backed wrapper, scoped cache key construction, and the extraction/injection engine for last, all, turn-compatible, stream-event, and per-tool-call-validated rules. The engine copies payloads by default, isolates values by provider/model/session/classifier/credential scope, skips missing required session scope, and emits transform trace metadata when a transaction logger is supplied. Tests: python -m pytest tests/test_field_cache_engine.py tests/test_field_cache_paths.py
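Scoped key construction with a required session scope could be sketched like this. The scope names follow the commit text, but the key layout, the hashing, and the choice of required scopes are assumptions for illustration.

```python
import hashlib

SCOPE_ORDER = ("provider", "model", "classifier", "session", "credential")


def build_cache_key(rule_name, scope, required=("provider", "model", "session")):
    """Return a scoped cache key, or None when a required scope is missing."""
    if any(not scope.get(name) for name in required):
        return None  # caller skips extraction/injection for this rule
    parts = [f"{name}={scope.get(name) or ''}" for name in SCOPE_ORDER]
    # Hash the scope so credential identifiers never appear in the key itself.
    digest = hashlib.sha256("\x1f".join(parts).encode("utf-8")).hexdigest()[:16]
    return f"field-cache:{rule_name}:{digest}"
```

Two requests that differ in any scope value get different keys, which is what keeps cached values from leaking across sessions or credentials in this sketch.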
Adds the missing before_field_cache_extraction and before_field_cache_injection trace passes so field-cache operations now emit both before and after states. Adds trace-focused tests for adapter chains, field-cache extraction/injection, rule metadata, cache hits, mutation flags, and transform_log_error emission on failed injection. Tests: python -m pytest tests/test_field_cache_trace.py tests/test_field_cache_engine.py tests/test_adapter_registry.py
Adds optional provider declarations for native protocol name, ordered adapter names, adapter config, and field-cache rules, all defaulting to empty/no-op behavior so existing providers remain on the current execution path until they opt in. These methods are the Phase 3 bridge that later provider work will use to attach native protocols, adapter chains, and provider-specific field-cache rules per model. Tests: python -m pytest tests/test_provider_protocol_declarations.py tests/test_adapter_registry.py tests/test_field_cache_engine.py tests/test_field_cache_paths.py tests/test_field_cache_trace.py
Adds the planned field_rename adapter, fixes field-cache trace direction for stream-sourced request injection, caps trace sample values, and documents the current limits of turn/tool-cache modes. Expands coverage for credential/provider scope isolation, stream-sourced injection trace direction, large sample truncation, field_rename behavior, and plain provider no-op protocol defaults. Tests: python -m pytest tests/test_adapter_registry.py tests/test_field_cache_paths.py tests/test_field_cache_engine.py tests/test_field_cache_trace.py tests/test_provider_protocol_declarations.py Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_gemini.py tests/test_protocol_responses.py tests/test_transform_trace.py tests/test_transaction_logger_transform_trace.py tests/test_session_tracking.py tests/test_selection_engine.py
Adds the Phase 4 plan for Responses routes, response storage, previous_response_id continuation, bridge execution through the current client path, HTTP SSE conversion, WebSocket extension seams, tests, risks, and review checkpoints. Reports remain uncommitted for user-facing review only.
Adds the Phase 4 Responses storage foundation with StoredResponse, local response ID generation, an in-memory store, and a ProviderCache-backed wrapper that accepts an injected cache instead of constructing one globally. The store supports save, get, delete, and input item listing, preserves JSON-safe response metadata for previous_response_id continuation, and avoids SQLite or new persistence dependencies. Tests: python -m pytest tests/test_responses_store.py
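A minimal in-memory store with the four operations named above might look like the following. It is synchronous for brevity, and the class name, method names, and the `resp_` ID prefix are hypothetical rather than taken from the branch.

```python
import copy
import uuid


class InMemoryResponseStore:
    """Keeps responses and their input items in a plain dict."""

    def __init__(self):
        self._records = {}

    @staticmethod
    def new_response_id():
        return f"resp_{uuid.uuid4().hex}"

    def save(self, response, input_items):
        response_id = response.get("id") or self.new_response_id()
        self._records[response_id] = {
            "response": {**copy.deepcopy(response), "id": response_id},
            "input_items": copy.deepcopy(list(input_items)),
        }
        return response_id

    def get(self, response_id):
        record = self._records.get(response_id)
        return copy.deepcopy(record["response"]) if record else None

    def list_input_items(self, response_id):
        record = self._records.get(response_id)
        return copy.deepcopy(record["input_items"]) if record else []

    def delete(self, response_id):
        return self._records.pop(response_id, None) is not None
```

Deep copies on the way in and out keep later mutations by callers from changing stored lineage, which matters once `previous_response_id` continuations read it back.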
Adds the temporary Responses-to-chat bridge for Phase 4, converting parsed Responses requests into current chat-completions kwargs and converting chat-completion responses back into Responses objects. The bridge preserves previous_response_id metadata, parent response messages, tool definitions, generation parameters, and unsupported extension fields for trace/debugging until native provider execution is wired in later phases. Tests: python -m pytest tests/test_responses_bridge.py tests/test_responses_store.py
Adds the non-streaming Responses service around the protocol adapter, bridge, and response store with validation, previous_response_id loading, get/delete/input-items helpers, and transform trace passes. The service keeps Phase 4 runtime conservative by bridging through the existing chat completion client path while preserving response storage and lineage metadata for later native provider work. Tests: python -m pytest tests/test_responses_service.py tests/test_responses_bridge.py tests/test_responses_store.py
Adds FastAPI routes for POST /v1/responses, GET /v1/responses/{id}, DELETE /v1/responses/{id}, and GET /v1/responses/{id}/input_items using the Phase 4 ResponsesService.
The create route currently handles non-streaming requests through the bridge and returns a documented 501 for streaming until the SSE checkpoint lands next.
Tests: python -m pytest tests/test_responses_routes.py tests/test_responses_service.py tests/test_responses_bridge.py tests/test_responses_store.py
Adds Responses HTTP SSE formatting, chat-stream conversion, streamed response accumulation/storage, response.failed events on stream errors, and a WebSocket formatter seam that is explicit but not exposed as a runtime route. Updates POST /v1/responses to return text/event-stream for stream=true while preserving the existing non-stream route behavior. Tests: python -m pytest tests/test_responses_streaming.py tests/test_responses_routes.py tests/test_responses_service.py tests/test_responses_bridge.py tests/test_responses_store.py
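The SSE framing and the `response.failed` behavior on stream errors can be illustrated roughly as below. The helper names are hypothetical and the event payloads are heavily simplified compared with real Responses events.

```python
import json


def format_sse(event, data):
    """Serialize one server-sent event frame (event name plus JSON data)."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


def stream_as_sse(text_chunks):
    """Convert text deltas into SSE frames, ending in completed or failed."""
    try:
        for chunk in text_chunks:
            yield format_sse(
                "response.output_text.delta",
                {"type": "response.output_text.delta", "delta": chunk},
            )
        yield format_sse("response.completed", {"type": "response.completed"})
    except Exception as exc:
        # Surface upstream errors as a terminal event instead of a broken stream.
        yield format_sse(
            "response.failed",
            {"type": "response.failed", "error": {"message": str(exc)}},
        )
```

Emitting a terminal `response.failed` frame lets clients distinguish an upstream failure from a dropped connection.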
Wires Responses routes into the transform trace logger when request logging is enabled, adds coverage for unsupported Responses fields preserved in bridge metadata, and strengthens streaming tests to assert SSE event order. Tests: python -m pytest tests/test_responses_store.py tests/test_responses_bridge.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_streaming.py Tests: python -m pytest tests/test_protocol_registry.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_gemini.py tests/test_protocol_responses.py tests/test_transform_trace.py tests/test_transaction_logger_transform_trace.py tests/test_adapter_registry.py tests/test_field_cache_paths.py tests/test_field_cache_engine.py tests/test_field_cache_trace.py tests/test_provider_protocol_declarations.py tests/test_session_tracking.py tests/test_selection_engine.py
Adds the Phase 5 plan for native provider execution, provider declarations, HTTP and streaming seams, priority provider order, Antigravity restoration constraints, Gemini CLI parity review, fallback policy, transform tracing, field-cache rules, tests, and review checkpoints. Reports remain uncommitted for user-facing review only.
Adds the Phase 5 native provider foundation with execution context, HTTP transport wrapper, and non-streaming executor that runs protocol selection, adapter chains, field-cache injection/extraction, provider HTTP calls, and transform tracing. The foundation is not wired into live request execution yet, preserving current provider behavior while giving priority provider work a testable native path. Tests: python -m pytest tests/test_native_provider_executor.py tests/test_responses_store.py tests/test_responses_bridge.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_streaming.py
Adds opt-in native provider streaming support with streaming-capable transport seam, raw chunk tracing, protocol stream parsing, field-cache stream extraction, formatted client stream events, and transform error logging. The streaming foundation remains isolated from live provider routing so existing providers keep current behavior while Phase 5 provider implementations gain a mocked native stream path. Tests: python -m pytest tests/test_native_provider_executor.py tests/test_native_provider_streaming.py tests/test_responses_store.py tests/test_responses_bridge.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_streaming.py tests/test_protocol_responses.py tests/test_transform_trace.py tests/test_transaction_logger_transform_trace.py
Adds the first priority Phase 5 provider as an explicit native integration skeleton with Anthropic Messages protocol declaration, adapter config, thinking-signature field-cache rule, native header/endpoint helpers, and mock-friendly model discovery. This does not assume undocumented live behavior or wire the provider into the runtime native executor yet; it establishes a tested provider declaration path for later native routing. Tests: python -m pytest tests/test_claude_code_provider.py tests/test_provider_protocol_declarations.py tests/test_native_provider_executor.py tests/test_native_provider_streaming.py tests/test_protocol_anthropic_messages.py tests/test_adapter_registry.py tests/test_field_cache_engine.py
Extends non-streaming response usage normalization to dict responses so final_client_response always reflects post-normalization usage, matching the Phase 2c trace contract. Tests: pytest tests/test_executor_usage_accounting.py tests/test_anthropic_transform_tracing.py tests/test_transaction_logger_transform_trace.py tests/test_responses_streaming.py tests/test_request_executor_stream_metrics.py
Adds the Phase 3c corrective plan for unified field-cache runtime sources and targets, native adapter trace safety, and credential-scope fail-closed behavior. Tests: not run (planning document only)
Executes unified request/response/stream-event field-cache sources, supports metadata and unified-request injection targets in native execution, suppresses unsafe generic native adapter traces, and fails closed on missing credential scope. Tests: pytest tests/test_field_cache_engine.py tests/test_field_cache_trace.py tests/test_native_provider_executor.py tests/test_native_provider_streaming.py tests/test_adapter_registry.py tests/test_request_executor_native_routing.py tests/test_protocol_openai_chat.py tests/test_responses_streaming.py tests/test_transform_trace.py
Extracts request-source field-cache rules in native execution and redacts configured metadata-injection paths in native traces. Tests: pytest tests/test_field_cache_engine.py tests/test_field_cache_trace.py tests/test_native_provider_executor.py tests/test_native_provider_streaming.py tests/test_adapter_registry.py tests/test_request_executor_native_routing.py tests/test_protocol_openai_chat.py tests/test_responses_streaming.py tests/test_transform_trace.py
Adds the Phase 4c corrective plan for configurable Responses storage, continuation lineage replay, and top-level route error bodies. Tests: not run (planning document only)
Adds configurable provider-cache-backed Responses storage, wires app startup to the configured store, replays parent input/output lineage for continuations, and returns top-level Responses error bodies. Tests: pytest tests/test_experimental_config.py tests/test_responses_store.py tests/test_responses_bridge.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_streaming.py tests/test_protocol_responses.py tests/test_responses_usage_accounting.py tests/test_stream_transport.py
Replays parent Responses function-call output items as chat tool calls during previous_response_id continuations and documents durable Responses store env vars. Tests: pytest tests/test_experimental_config.py tests/test_responses_store.py tests/test_responses_bridge.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_streaming.py tests/test_protocol_responses.py tests/test_responses_usage_accounting.py tests/test_stream_transport.py
Converts stored Responses function_call_output inputs into Chat tool messages during continuation lineage replay so tool-use continuations keep both calls and results. Tests: pytest tests/test_experimental_config.py tests/test_responses_store.py tests/test_responses_bridge.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_streaming.py tests/test_protocol_responses.py tests/test_responses_usage_accounting.py tests/test_stream_transport.py
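The lineage replay for tool use can be sketched as a conversion from stored Responses items to Chat Completions messages. The function name is hypothetical, and the sketch handles only `function_call` and `function_call_output` items, ignoring every other item type.

```python
def replay_items_as_chat_messages(items):
    """Turn Responses function-call items into Chat tool calls and tool messages."""
    messages = []
    for item in items:
        kind = item.get("type")
        if kind == "function_call":
            call = {
                "id": item["call_id"],
                "type": "function",
                "function": {"name": item["name"], "arguments": item.get("arguments", "{}")},
            }
            last = messages[-1] if messages else None
            if last and last.get("role") == "assistant" and "tool_calls" in last:
                last["tool_calls"].append(call)  # group consecutive calls
            else:
                messages.append({"role": "assistant", "content": None, "tool_calls": [call]})
        elif kind == "function_call_output":
            messages.append(
                {"role": "tool", "tool_call_id": item["call_id"], "content": str(item.get("output", ""))}
            )
    return messages
```

Keeping both the call and its result in the replayed history matters because Chat Completions backends generally reject a `tool` message whose `tool_call_id` has no matching assistant tool call.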
Adds the Phase 5c corrective plan for client-protocol native responses, provider native contract hooks, Claude Code hardening, Antigravity alias preservation, and centralized native streaming fail-closed behavior. Tests: not run (planning document only)
Carries a client protocol through native execution so chat-completion routes receive OpenAI Chat responses from Anthropic, Responses, and Gemini native provider calls. Adds explicit provider native endpoint/header/operation hooks, Claude Code max_tokens/auth hardening, and safer Antigravity alias/thinking metadata handling. Tests: pytest tests/test_request_executor_native_routing.py tests/test_native_provider_executor.py tests/test_native_provider_streaming.py tests/test_native_streaming_transport_seam.py tests/test_claude_code_provider.py tests/test_codex_provider.py tests/test_copilot_provider.py tests/test_antigravity_provider_restore.py tests/test_provider_protocol_declarations.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_responses.py tests/test_protocol_gemini.py
Applies Antigravity thinking aliases to upstream request behavior, honors provider native opt-out hooks in auto mode, and formats cross-protocol native stream events as client OpenAI Chat SSE instead of raw provider chunks. Tests: pytest tests/test_request_executor_native_routing.py tests/test_native_provider_executor.py tests/test_native_provider_streaming.py tests/test_native_streaming_transport_seam.py tests/test_claude_code_provider.py tests/test_codex_provider.py tests/test_copilot_provider.py tests/test_antigravity_provider_restore.py tests/test_provider_protocol_declarations.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_responses.py tests/test_protocol_gemini.py
Uses the fail-closed native streaming support helper in live routing, extracts provider-native response cache fields before client-protocol formatting, and scopes Antigravity quota groups by model family instead of broad Gemini/Claude buckets. Tests: pytest tests/test_request_executor_native_routing.py tests/test_native_provider_executor.py tests/test_native_provider_streaming.py tests/test_native_streaming_transport_seam.py tests/test_claude_code_provider.py tests/test_codex_provider.py tests/test_copilot_provider.py tests/test_antigravity_provider_restore.py tests/test_provider_protocol_declarations.py tests/test_protocol_openai_chat.py tests/test_protocol_anthropic_messages.py tests/test_protocol_responses.py tests/test_protocol_gemini.py
Removes ambiguous Gemini 3 Pro preview from reverse alias normalization so plain preview requests stay preview while explicit low/high aliases still carry thinking behavior. Tests: pytest tests/test_antigravity_provider_restore.py tests/test_request_executor_native_routing.py tests/test_native_provider_executor.py tests/test_native_provider_streaming.py tests/test_provider_protocol_declarations.py tests/test_protocol_openai_chat.py tests/test_protocol_gemini.py
Adds the Phase 6c corrective plan for stale fallback tests, group target promotion, streaming execution-mode parity, native config hard stops, route-error aliases, and target namespace adjustment. Tests: not run (planning document only)
Replaces stale fallback-group tests with current routing coverage, promotes requested models within fallback groups, aligns streaming execution-mode precedence with non-streaming behavior, hard-stops native config errors, expands structured route-error aliases, and rewrites target session namespaces during fallback cloning. Tests: pytest tests/test_fallback_groups.py tests/test_fallback_resolver.py tests/test_fallback_policy.py tests/test_fallback_attempt_runner.py tests/test_request_executor_fallback_groups.py tests/test_request_executor_fallback_error_summary.py tests/test_routing_config.py tests/test_config_routing_json.py tests/test_routing_attempts.py tests/test_request_builder_routing.py tests/test_request_executor_native_routing.py tests/test_streaming_fallback_policy.py tests/test_retry_policy.py tests/test_cooldown_activation.py
Makes streaming auto-native selection call the same provider opt-out hook as non-streaming execution and rewrites fallback session namespaces with the target usage scope instead of preserving the first target scope prefix. Tests: pytest tests/test_fallback_groups.py tests/test_fallback_resolver.py tests/test_fallback_policy.py tests/test_fallback_attempt_runner.py tests/test_request_executor_fallback_groups.py tests/test_request_executor_fallback_error_summary.py tests/test_routing_config.py tests/test_config_routing_json.py tests/test_routing_attempts.py tests/test_request_builder_routing.py tests/test_request_executor_native_routing.py tests/test_streaming_fallback_policy.py tests/test_retry_policy.py tests/test_cooldown_activation.py
Adds the Phase 7c corrective plan for cooldown budget fail-fast behavior, transient backoff thresholds, success reset, structured routing attempt history, and model-scoped cooldown isolation. Tests: not run (planning document only)
Fails fast when active cooldowns exceed the request budget, prevents single generic transients from starting provider-wide cooldowns, records skipped transient failures for bounded backoff, clears failure history on success, and populates sanitized routing attempt history. Tests: pytest tests/test_retry_policy.py tests/test_cooldown_activation.py tests/test_request_executor_fallback_groups.py tests/test_streaming_fallback_policy.py tests/test_fallback_policy.py tests/test_fallback_groups.py tests/test_fallback_resolver.py tests/test_request_executor_fallback_error_summary.py tests/test_routing_attempts.py tests/test_streaming_error_handler.py tests/test_streaming_usage_accounting.py tests/test_request_executor_stream_metrics.py
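The fail-fast rule for cooldowns can be illustrated with a small helper. All names here are hypothetical, and the sketch assumes a simple model in which each credential has a single cooldown end time and the request has a single deadline.

```python
class CooldownBudgetExceeded(RuntimeError):
    """Hypothetical error raised when waiting would outlast the request budget."""


def cooldown_wait(cooldown_until, now, deadline):
    """Return seconds to wait for the soonest credential, or raise to fail fast."""
    if not cooldown_until:
        return 0.0
    remaining = [max(0.0, until - now) for until in cooldown_until.values()]
    wait = min(remaining)
    if now + wait > deadline:
        # Every candidate stays cooled down past the deadline: do not sleep at all.
        raise CooldownBudgetExceeded(
            f"shortest cooldown {wait:.1f}s exceeds remaining budget {deadline - now:.1f}s"
        )
    return wait
```

Raising immediately avoids holding a client connection open for a wait that cannot succeed within the budget.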
Adds the Phase 8c corrective plan for Responses runtime stream settings, heartbeat frames, upstream close behavior, timeout handling, disconnect handling, and Anthropic iterator close safety. Tests: not run (planning document only)
Applies stream runtime settings to Responses streaming, adds non-visible heartbeat formatting, enforces TTFB and stall timeout handling, closes upstream streams on timeout or disconnect, and closes Anthropic compatibility streams when only the iterator exposes close. Tests: pytest tests/test_responses_streaming.py tests/test_responses_routes.py tests/test_anthropic_transform_tracing.py tests/test_streaming_error_handler.py tests/test_streaming_usage_accounting.py tests/test_request_executor_stream_metrics.py tests/test_native_provider_streaming.py tests/test_native_streaming_transport_seam.py tests/test_stream_policy.py tests/test_stream_transport.py tests/test_config_stream_settings.py tests/test_stream_metrics.py tests/test_stream_events.py
Keeps pending upstream/acquisition tasks alive across heartbeat frames, enforces TTFB across acquisition and first-chunk waits, awaits pending task cancellation before upstream close, and adds combined heartbeat-timeout coverage. Tests: pytest tests/test_responses_streaming.py tests/test_responses_routes.py tests/test_anthropic_transform_tracing.py tests/test_streaming_error_handler.py tests/test_streaming_usage_accounting.py tests/test_request_executor_stream_metrics.py tests/test_native_provider_streaming.py tests/test_native_streaming_transport_seam.py tests/test_stream_policy.py tests/test_stream_transport.py tests/test_config_stream_settings.py tests/test_stream_metrics.py tests/test_stream_events.py
Prevents heartbeat-yielded upstream and acquisition tasks from being overwritten before their results are consumed, and cancels pending acquisition/read tasks during final stream cleanup before closing upstream. Tests: pytest tests/test_responses_streaming.py tests/test_responses_routes.py tests/test_anthropic_transform_tracing.py tests/test_streaming_error_handler.py tests/test_streaming_usage_accounting.py tests/test_request_executor_stream_metrics.py tests/test_native_provider_streaming.py tests/test_native_streaming_transport_seam.py tests/test_stream_policy.py tests/test_stream_transport.py tests/test_config_stream_settings.py tests/test_stream_metrics.py tests/test_stream_events.py
Uses one Responses TTFB deadline across stream acquisition and first chunk, and closes streams returned by completed acquisition tasks when the generator exits after a heartbeat. Tests: pytest tests/test_responses_streaming.py tests/test_responses_routes.py tests/test_anthropic_transform_tracing.py tests/test_streaming_error_handler.py tests/test_streaming_usage_accounting.py tests/test_request_executor_stream_metrics.py tests/test_native_provider_streaming.py tests/test_native_streaming_transport_seam.py tests/test_stream_policy.py tests/test_stream_transport.py tests/test_config_stream_settings.py tests/test_stream_metrics.py tests/test_stream_events.py
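The idea of a single TTFB deadline shared by stream acquisition and the first chunk can be sketched with `asyncio`. The function name and the shape of `acquire` (an async callable returning an async generator) are assumptions for the example.

```python
import asyncio


async def acquire_first_chunk(acquire, ttfb_seconds):
    """Acquire a stream and read its first chunk under one shared deadline."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + ttfb_seconds
    stream = await asyncio.wait_for(acquire(), timeout=ttfb_seconds)
    try:
        # Only the time left after acquisition is available for the first chunk.
        remaining = max(0.0, deadline - loop.time())
        first = await asyncio.wait_for(stream.__anext__(), timeout=remaining)
    except BaseException:
        await stream.aclose()  # never leak an acquired upstream stream
        raise
    return stream, first
```

With two separate timeouts, a slow acquisition followed by a slow first chunk could take nearly twice the configured TTFB; sharing one deadline closes that gap.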
Adds the Phase 9c corrective plan for top-level provider costs, cache-write double-count prevention, Responses SSE cost metadata, native streaming cost traces, and stream cost visibility policy. Tests: not run (planning document only)
Preserves top-level provider-reported costs with nested usage, avoids OpenAI cache-write double counting, sums structured provider cost breakdowns, carries Responses SSE cost metadata through streamed completions, treats cost events as non-visible metadata, and adds native streaming usage/cost summary traces. Tests: pytest tests/test_usage_accounting.py tests/test_usage_costs.py tests/test_usage_quota_snapshots.py tests/test_responses_streaming.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_stream_policy.py tests/test_native_provider_streaming.py tests/test_native_provider_executor.py tests/test_streaming_usage_accounting.py tests/test_executor_usage_accounting.py tests/test_responses_usage_accounting.py tests/test_native_usage_accounting.py
Adds request_cost_usd support for SSE cost comments and protocol usage, preserves top-level stream cost siblings in live streaming and Responses bridge paths, expands structured breakdown summing, treats scalar cost events as retry-safe metadata, and preserves earlier native stream cost when later token usage arrives. Tests: pytest tests/test_usage_accounting.py tests/test_usage_costs.py tests/test_usage_quota_snapshots.py tests/test_responses_streaming.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_bridge.py tests/test_stream_policy.py tests/test_native_provider_streaming.py tests/test_native_provider_executor.py tests/test_streaming_usage_accounting.py tests/test_executor_usage_accounting.py tests/test_responses_usage_accounting.py tests/test_native_usage_accounting.py tests/test_protocol_openai_chat.py tests/test_protocol_responses.py
Merges top-level provider cost siblings for dict streaming chunks, broadens reference cost breakdown aliases, and lets later raw native token usage replace cost-only records while preserving earlier provider-reported cost. Tests: pytest tests/test_usage_accounting.py tests/test_usage_costs.py tests/test_usage_quota_snapshots.py tests/test_responses_streaming.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_bridge.py tests/test_stream_policy.py tests/test_native_provider_streaming.py tests/test_native_provider_executor.py tests/test_streaming_usage_accounting.py tests/test_executor_usage_accounting.py tests/test_responses_usage_accounting.py tests/test_native_usage_accounting.py tests/test_protocol_openai_chat.py tests/test_protocol_responses.py
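The merge rule described here, where later token usage replaces a cost-only record without discarding the earlier provider-reported cost, can be sketched as follows. The function name and the `cost` field name are placeholders for whatever the accounting code actually uses.

```python
def merge_stream_usage(previous, incoming):
    """Overlay incoming usage on previous, ignoring fields the provider left unset."""
    merged = dict(previous or {})
    for key, value in incoming.items():
        if value is not None:
            merged[key] = value  # later non-null values win
    return merged
```

For example, merging `{"cost": 0.0123}` with a later `{"prompt_tokens": 10, "completion_tokens": 5, "cost": None}` keeps the cost and adds the token counts.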
Converts cost_in_usd_ticks to USD before summing, preserves top-level estimated_cost across accounting and streaming/Responses paths, and adds regression coverage for estimated-cost cost metadata. Tests: pytest tests/test_usage_accounting.py tests/test_usage_costs.py tests/test_usage_quota_snapshots.py tests/test_responses_streaming.py tests/test_responses_service.py tests/test_responses_routes.py tests/test_responses_bridge.py tests/test_stream_policy.py tests/test_native_provider_streaming.py tests/test_native_provider_executor.py tests/test_streaming_usage_accounting.py tests/test_executor_usage_accounting.py tests/test_responses_usage_accounting.py tests/test_native_usage_accounting.py tests/test_protocol_openai_chat.py tests/test_protocol_responses.py
Starting my review of the third-pass audit remediation changes, covering protocol output correctness, native operation guardrails, streaming hardening, Responses runtime improvements, cost accounting, routing parity, and much more across ~100 files. I'll analyze each subsystem and report back with a bundled review.
Overall Assessment
This is a massive third-pass audit remediation that addresses findings from the explore and explore-heavy reviewers across all 10 phases of the experimental rewrite. The changeset spans ~9,000 lines across 100 files and is impressively well-scoped — each change maps back to a specific finding in third-pass-audit-findings.md.
The code quality is high. The streaming hardening (TTFB/stall timeouts, heartbeats, upstream close safety, CancelledError handling) is thorough and consistent across both the chat-completions StreamingHandler and the Responses stream_events path. The protocol output correctness fixes (OpenAI Chat/Responses usage formatting, legacy function_call handling, Ollama format_response, native operation guardrails) are all well-tested. The cost accounting improvements (provider-reported cost preservation, cache-write double-count fix, SSE cost comment parsing, structured cost breakdowns) are comprehensive and correctly prioritized.
Status of Previous Feedback
All 12 issues from the initial review and 5 suggestions from the follow-up have been addressed in this changeset:
- `service.py` CancelledError: Now caught by the Responses `finally` block and the Anthropic wrapper's `finally` block.
- Streaming adapter chain gap: The `NativeProviderExecutor.stream()` now runs stream-event adapter chains and extracts usage/cost.
- Bare `except Exception: pass`: Replaced with structured error handling and `_can_retry_stream_after_error` with explicit `emitted_output` tracking.
- Multi-turn tool use: `_parent_output_to_messages` now handles `function_call` and `custom_tool_call` items, and `_message_to_chat` handles `tool` role messages.
- Auth header mismatch: `ClaudeCodeProvider` now supports `auto`/`x-api-key`/`bearer` modes via `CLAUDE_CODE_AUTH_HEADER`.
- Hard litellm dependency: Not directly addressed in this diff, but the cost system now has provider-reported cost that bypasses litellm pricing; worth verifying this is tracked.
Key Suggestions
- Model-scoped cooldown bypasses failure-history threshold: contradicts Phase 7c plan (see inline)
- `finally` block recovery can raise: `acquire_task.result()` can replace the original exception (see inline)
- Polling logic duplication: `next_upstream_chunk` and `acquire_upstream_stream` share ~90% of their structure (see inline)
- Disconnect task leak: if `is_disconnected()` raises, the task is never cleaned up (see inline)
- Gemini cache-write double-count inconsistency: OpenAI path fixed but Gemini path not (see inline)
- Private `_save_to_disk` coupling: fragile dependency on provider cache internals (see inline)
Architectural Notes
The new streaming runtime settings system (get_stream_runtime_settings()) is well-designed — lazy-loaded via _retry_settings() to avoid import cycles, with env overriding JSON. The routing attempt history (_append_routing_attempt_history) is a solid observability addition that correctly sanitizes error types and tracks timing. The native provider cross-protocol formatting (client_protocol_name on NativeProviderContext) is a clean solution to the "responses formatted in provider protocol" blocker.
One architectural observation: the Responses streaming path now has its own independent polling/timeout/heartbeat implementation rather than reusing the StreamingHandler infrastructure. This is understandable given the different event model (Responses events vs SSE chunks), but it means streaming policy changes must be applied in two places.
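As a rough illustration of the "env overriding JSON" behavior described for the streaming runtime settings, the sketch below resolves each field from an environment mapping first and falls back to a JSON document. The field names come from the diff (`ttfb_timeout_seconds`, `stall_timeout_seconds`, `heartbeat_seconds`); the environment variable names and the `load_stream_settings` helper are assumptions made for this example, not the project's actual API.

```python
import json
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class StreamRuntimeSettings:
    ttfb_timeout_seconds: Optional[float] = None
    stall_timeout_seconds: Optional[float] = None
    heartbeat_seconds: Optional[float] = None


# Hypothetical environment variable names, chosen only for this sketch.
_ENV_NAMES = {
    "ttfb_timeout_seconds": "STREAM_TTFB_TIMEOUT_SECONDS",
    "stall_timeout_seconds": "STREAM_STALL_TIMEOUT_SECONDS",
    "heartbeat_seconds": "STREAM_HEARTBEAT_SECONDS",
}


def load_stream_settings(
    json_text: str = "{}",
    env: Optional[Mapping[str, str]] = None,
) -> StreamRuntimeSettings:
    """Resolve settings with environment values taking precedence over JSON."""
    env = os.environ if env is None else env
    json_values = dict(json.loads(json_text))
    resolved = {}
    for field_name, env_name in _ENV_NAMES.items():
        # The environment wins; the JSON value is only a fallback.
        raw = env.get(env_name, json_values.get(field_name))
        resolved[field_name] = None if raw in (None, "") else float(raw)
    return StreamRuntimeSettings(**resolved)
```

Keeping one resolver like this would also give both streaming paths a single place to read policy from, even while their polling loops stay separate.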
This review was generated by an AI assistant.
| if error_type in {"server_error", "api_connection"} and default_duration >= provider_cooldown_min_seconds: | ||
| backoff_level = 0 | ||
| duration = int(default_duration) | ||
| if scope == "model": | ||
| return ProviderCooldownDecision(True, duration=duration, reason="model_capacity_cooldown", scope=scope, model=model, backoff_level=backoff_level) | ||
| if failure_history is None: | ||
| return ProviderCooldownDecision(False, reason="missing_failure_history", scope=scope, model=model if scope == "model" else None) | ||
| backoff = failure_history.backoff_for(provider=provider, error_type=error_type, scope=scope, model=model if scope == "model" else None, default_duration=duration) |
This hard early return for scope == "model" bypasses the failure-history threshold check entirely:
    if scope == "model":
        return ProviderCooldownDecision(True, duration=duration, reason="model_capacity_cooldown", scope=scope, model=model, backoff_level=backoff_level)

This means a single server_error without retry_after will always start a model-scoped cooldown immediately, even if it's the first transient failure. The Phase 7c plan states: "No-retry_after server_error and api_connection should start cooldown only after FailureHistory.backoff_for() crosses the configured threshold." The provider-scoped path (below) correctly checks the threshold, but the model-scoped path does not.
If this is intentional (model-capacity errors are more actionable than generic transients), consider adding a comment explaining the policy difference. Otherwise, the model-scoped path should also check backoff_level > 0 before returning True.
| if scope == "model": | |
| if backoff_level <= 0 and failure_history is not None: | |
| return ProviderCooldownDecision(False, reason="transient_backoff_threshold_not_met", scope=scope, model=model) | |
| return ProviderCooldownDecision(True, duration=duration, reason="model_capacity_cooldown", scope=scope, model=model, backoff_level=backoff_level) |
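The policy the Phase 7c quote describes can be sketched as a single decision function in which both scopes pass through the same threshold gate. This is a simplified stand-in, not the project's `ProviderCooldownDecision` logic: the `CooldownDecision` dataclass, the `decide_transient_cooldown` name, and the `provider_transient_cooldown` reason string are hypothetical, and `backoff_level=None` is used here to model a missing failure history.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class CooldownDecision:
    start: bool
    reason: str
    duration: int = 0
    backoff_level: int = 0


def decide_transient_cooldown(
    *, scope: str, default_duration: int, backoff_level: Optional[int]
) -> CooldownDecision:
    """Gate both model and provider scopes on the failure-history threshold."""
    if backoff_level is None:
        # No failure history available: never start a cooldown blindly.
        return CooldownDecision(False, "missing_failure_history")
    if backoff_level <= 0:
        # First transient failure(s): threshold not crossed yet.
        return CooldownDecision(False, "transient_backoff_threshold_not_met")
    reason = (
        "model_capacity_cooldown"
        if scope == "model"
        else "provider_transient_cooldown"  # hypothetical reason label
    )
    return CooldownDecision(
        True, reason, duration=default_duration, backoff_level=backoff_level
    )
```

If the immediate model-scoped cooldown is intentional, the same shape still helps: the exception becomes one explicit branch with a comment rather than an early return that skips the gate.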
| yield ResponsesStreamEvent("response.failed", failed) | ||
| self._trace(transaction_logger, "stream_done_event", {"raw": "done"}, direction="stream", stage="final", metadata={"transport": transport, "failed": True}) | ||
| yield ResponsesStreamEvent("done", {}, terminal=True) | ||
| finally: | ||
| if chat_stream is None and acquire_task is not None and acquire_task.done() and not acquire_task.cancelled(): | ||
| try: | ||
| chat_stream = acquire_task.result() | ||
| stream_iterator = chat_stream.__aiter__() | ||
| except Exception: | ||
| chat_stream = None | ||
| await cancel_task(pending_next_task) |
This recovery path in the finally block can itself raise an exception, replacing the original one:
    if chat_stream is None and acquire_task is not None and acquire_task.done() and not acquire_task.cancelled():
        chat_stream = acquire_task.result()  # Can raise!

If acquire_task completed with a ResponsesServiceError (e.g., from a TTFB timeout), calling .result() will re-raise that error inside the finally block, masking whatever exception triggered the finally in the first place.
| if chat_stream is None and acquire_task is not None and acquire_task.done() and not acquire_task.cancelled(): | |
| try: | |
| chat_stream = acquire_task.result() | |
| stream_iterator = chat_stream.__aiter__() | |
| except Exception: | |
| chat_stream = None |
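An alternative to wrapping `.result()` in `try`/`except` is to inspect the task with `Task.exception()` first, which returns the stored exception instead of raising it. The helper below is a minimal sketch of that idea; the name `completed_result_or_none` is hypothetical and not part of the reviewed code.

```python
import asyncio
from typing import Any, Optional


def completed_result_or_none(task: Optional["asyncio.Task[Any]"]) -> Any:
    """Return a finished task's result, or None if it is unusable.

    Safe to call from a ``finally`` block: it never re-raises the task's
    own exception, so it cannot mask the exception already in flight.
    """
    if task is None or not task.done() or task.cancelled():
        return None
    # exception() returns the stored exception (or None) without raising,
    # and also marks the exception as retrieved.
    if task.exception() is not None:
        return None
    return task.result()
```

In the `finally` block this would reduce the recovery step to `chat_stream = completed_result_or_none(acquire_task)` followed by the existing cleanup.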
| usage = None | ||
| item_started = False | ||
| monitor = StreamMonitor(clock=time.monotonic) | ||
| stream_settings = get_stream_runtime_settings() | ||
| chat_stream = None | ||
| stream_iterator = None | ||
| upstream_closed = False | ||
| pending_next_task = None | ||
| pending_next_started_at = None | ||
| pending_next_last_heartbeat_at = None | ||
| acquire_task = None | ||
| acquire_started_at = None | ||
| acquire_last_heartbeat_at = None | ||
| ttfb_started_at = time.monotonic() | ||
| async def cancel_task(task: Any) -> None: | ||
| """Cancel and await an in-flight stream task before closing its source.""" | ||
| if task is None or task.done(): | ||
| return | ||
| task.cancel() | ||
| try: | ||
| await task | ||
| except (asyncio.CancelledError, StopAsyncIteration): | ||
| return | ||
| except Exception: | ||
| return | ||
| async def close_upstream(reason: str) -> None: | ||
| """Best-effort close for upstream Responses bridge streams.""" | ||
| nonlocal upstream_closed | ||
| if upstream_closed: | ||
| return | ||
| attempted = False | ||
| for candidate in (stream_iterator, chat_stream): | ||
| if candidate is None: | ||
| continue | ||
| attempted = True | ||
| try: | ||
| closer = getattr(candidate, "aclose", None) | ||
| if callable(closer): | ||
| await closer() | ||
| upstream_closed = True | ||
| self._trace(transaction_logger, "responses_stream_upstream_closed", {"reason": reason}, direction="stream", stage="provider", metadata={"transport": transport}) | ||
| return | ||
| closer = getattr(candidate, "close", None) | ||
| if callable(closer): | ||
| closer() | ||
| upstream_closed = True | ||
| self._trace(transaction_logger, "responses_stream_upstream_closed", {"reason": reason}, direction="stream", stage="provider", metadata={"transport": transport}) | ||
| return | ||
| except Exception as exc: | ||
| self._trace(transaction_logger, "responses_stream_upstream_close_failed", {"reason": reason, "error_type": type(exc).__name__}, direction="stream", stage="provider", metadata={"transport": transport}) | ||
| continue | ||
| if attempted: | ||
| self._trace(transaction_logger, "responses_stream_upstream_close_failed", {"reason": reason, "error_type": "no_close_method"}, direction="stream", stage="provider", metadata={"transport": transport}) | ||
| async def next_upstream_chunk(*, first: bool) -> tuple[str, Any]: | ||
| """Return the next upstream chunk or a control marker.""" | ||
| timeout = stream_settings.ttfb_timeout_seconds if first else stream_settings.stall_timeout_seconds | ||
| heartbeat = stream_settings.heartbeat_seconds | ||
| nonlocal pending_next_task, pending_next_started_at, pending_next_last_heartbeat_at | ||
| if pending_next_task is None: | ||
| pending_next_task = asyncio.create_task(stream_iterator.__anext__()) | ||
| pending_next_started_at = time.monotonic() | ||
| pending_next_last_heartbeat_at = pending_next_started_at | ||
| next_task = pending_next_task | ||
| started_at = ttfb_started_at if first else (pending_next_started_at or time.monotonic()) | ||
| while True: | ||
| if next_task.done(): | ||
| pending_next_task = None | ||
| pending_next_started_at = None | ||
| pending_next_last_heartbeat_at = None | ||
| return "chunk", next_task.result() | ||
| if request is not None and await request.is_disconnected(): | ||
| self._trace(transaction_logger, "responses_stream_disconnected", {"reason": "client_disconnected"}, direction="stream", stage="client", metadata={"transport": transport}) | ||
| if stream_settings.cancel_upstream_on_disconnect: | ||
| await cancel_task(next_task) | ||
| pending_next_task = None | ||
| await close_upstream("client_disconnected") | ||
| return "disconnect", None | ||
| elapsed = time.monotonic() - started_at | ||
| waits = [] | ||
| if timeout is not None: | ||
| remaining_timeout = timeout - elapsed | ||
| if remaining_timeout <= 0: | ||
| await cancel_task(next_task) | ||
| pending_next_task = None | ||
| await close_upstream("ttfb_timeout" if first else "stall_timeout") | ||
| raise ResponsesServiceError( | ||
| f"Responses stream {'TTFB' if first else 'stall'} timeout", | ||
| status_code=504, | ||
| error_type="api_connection", | ||
| ) | ||
| waits.append(remaining_timeout) | ||
| if heartbeat is not None: | ||
| last_heartbeat_at = pending_next_last_heartbeat_at or started_at | ||
| remaining_heartbeat = heartbeat - (time.monotonic() - last_heartbeat_at) | ||
| if remaining_heartbeat <= 0: | ||
| pending_next_last_heartbeat_at = time.monotonic() | ||
| return "heartbeat", None | ||
| waits.append(remaining_heartbeat) | ||
| wait_timeout = min(waits) if waits else None | ||
| if wait_timeout is None: | ||
| chunk = await next_task | ||
| pending_next_task = None | ||
| pending_next_started_at = None | ||
| pending_next_last_heartbeat_at = None | ||
| return "chunk", chunk | ||
| done, _ = await asyncio.wait({next_task}, timeout=wait_timeout) | ||
| if done: | ||
| pending_next_task = None | ||
| pending_next_started_at = None | ||
| pending_next_last_heartbeat_at = None | ||
| return "chunk", next_task.result() | ||
| last_heartbeat_at = pending_next_last_heartbeat_at or started_at | ||
| if heartbeat is not None and time.monotonic() - last_heartbeat_at >= heartbeat: | ||
| pending_next_last_heartbeat_at = time.monotonic() | ||
| return "heartbeat", None | ||
| async def acquire_upstream_stream() -> tuple[str, Any]: | ||
| """Acquire the upstream stream under the same TTFB/disconnect policy.""" | ||
| nonlocal acquire_task, acquire_started_at, acquire_last_heartbeat_at | ||
| if acquire_task is None: | ||
| acquire_task = asyncio.create_task(client.acompletion(request=request, **chat_kwargs)) | ||
| acquire_started_at = ttfb_started_at | ||
| acquire_last_heartbeat_at = acquire_started_at | ||
| task = acquire_task | ||
| started_at = acquire_started_at or time.monotonic() | ||
| timeout = stream_settings.ttfb_timeout_seconds | ||
| heartbeat = stream_settings.heartbeat_seconds | ||
| while True: | ||
| if task.done(): | ||
| acquire_task = None | ||
| acquire_started_at = None | ||
| acquire_last_heartbeat_at = None | ||
| return "stream", task.result() | ||
| if request is not None and await request.is_disconnected(): | ||
| self._trace(transaction_logger, "responses_stream_disconnected", {"reason": "client_disconnected", "phase": "acquire"}, direction="stream", stage="client", metadata={"transport": transport}) | ||
| await cancel_task(task) | ||
| acquire_task = None | ||
| acquire_started_at = None | ||
| acquire_last_heartbeat_at = None | ||
| return "disconnect", None | ||
| waits = [] | ||
| elapsed = time.monotonic() - started_at | ||
| if timeout is not None: | ||
| remaining_timeout = timeout - elapsed | ||
| if remaining_timeout <= 0: | ||
| await cancel_task(task) | ||
| acquire_task = None | ||
| acquire_started_at = None | ||
| acquire_last_heartbeat_at = None | ||
| raise ResponsesServiceError("Responses stream TTFB timeout", status_code=504, error_type="api_connection") | ||
| waits.append(remaining_timeout) | ||
| if heartbeat is not None: | ||
| last_heartbeat_at = acquire_last_heartbeat_at or started_at | ||
| remaining_heartbeat = heartbeat - (time.monotonic() - last_heartbeat_at) | ||
| if remaining_heartbeat <= 0: | ||
| acquire_last_heartbeat_at = time.monotonic() | ||
| return "heartbeat", None | ||
| waits.append(remaining_heartbeat) | ||
| wait_timeout = min(waits) if waits else None | ||
| if wait_timeout is None: | ||
| stream = await task | ||
| acquire_task = None | ||
| acquire_started_at = None | ||
| acquire_last_heartbeat_at = None | ||
| return "stream", stream | ||
| done, _ = await asyncio.wait({task}, timeout=wait_timeout) | ||
| if done: | ||
| acquire_task = None | ||
| acquire_started_at = None | ||
| acquire_last_heartbeat_at = None | ||
| return "stream", task.result() | ||
| last_heartbeat_at = acquire_last_heartbeat_at or started_at | ||
| if heartbeat is not None and time.monotonic() - last_heartbeat_at >= heartbeat: | ||
| acquire_last_heartbeat_at = time.monotonic() | ||
| return "heartbeat", None | ||
| if transaction_logger: | ||
| self._trace( | ||
| transaction_logger, | ||
| "stream_started", | ||
| {"event": StreamEvent("started", protocol="responses").to_dict(), "metrics": monitor.metrics.to_dict()}, | ||
| direction="stream", | ||
| stage="client", |
next_upstream_chunk (~90 lines) and acquire_upstream_stream (~55 lines) share nearly identical polling/disconnect/timeout/heartbeat logic. The structural pattern is the same: create task -> loop with asyncio.wait -> check disconnect -> check timeout -> emit heartbeat -> get result.
Any future change to the polling pattern (e.g., adding a new policy check) must be applied in both places. Consider extracting a shared _poll_upstream_async(task_factory, *, first: bool) helper that both call sites use, with the task factory being the only difference (stream_iterator.__anext__ vs client.acompletion).
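One possible shape for the shared helper is sketched below. It is deliberately smaller than the suggested `_poll_upstream_async(task_factory, *, first: bool)`: it takes an already-created task and leaves task creation, cancellation, upstream closing, and state reset to the caller. The name `poll_task`, its parameters, and the use of the built-in `TimeoutError` (in place of `ResponsesServiceError`) are assumptions for illustration.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Optional, Tuple


async def poll_task(
    task: "asyncio.Task[Any]",
    *,
    started_at: float,
    last_heartbeat_at: float,
    timeout: Optional[float],
    heartbeat: Optional[float],
    is_disconnected: Optional[Callable[[], Awaitable[bool]]] = None,
) -> Tuple[str, Any]:
    """Wait on ``task`` under one disconnect/timeout/heartbeat policy.

    Returns ("result", value), ("heartbeat", None) or ("disconnect", None);
    raises TimeoutError when ``timeout`` seconds have passed since
    ``started_at``. The caller owns cancellation and cleanup of ``task``.
    """
    while True:
        if task.done():
            return "result", task.result()
        if is_disconnected is not None and await is_disconnected():
            return "disconnect", None
        now = time.monotonic()
        waits = []
        if timeout is not None:
            remaining = timeout - (now - started_at)
            if remaining <= 0:
                raise TimeoutError("upstream timeout")
            waits.append(remaining)
        if heartbeat is not None:
            remaining = heartbeat - (now - last_heartbeat_at)
            if remaining <= 0:
                return "heartbeat", None
            waits.append(remaining)
        # Sleep until the task finishes or the nearest deadline arrives.
        await asyncio.wait({task}, timeout=min(waits) if waits else None)
```

With this split, `next_upstream_chunk` would pass a task wrapping `stream_iterator.__anext__()` and `acquire_upstream_stream` a task wrapping `client.acompletion(...)`, each mapping `"result"` to its own `"chunk"` or `"stream"` marker.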
| done, _ = await asyncio.wait(wait_tasks, timeout=wait_seconds) | ||
| if disconnect_task is not None: | ||
| if disconnect_task in done and disconnect_task.result(): | ||
| stream_cancelled = True | ||
| next_task.cancel() | ||
| with contextlib.suppress(asyncio.CancelledError, StopAsyncIteration): | ||
| await next_task | ||
| await close_upstream("client_disconnect") | ||
| return | ||
| if not disconnect_task.done(): | ||
| disconnect_task.cancel() | ||
| with contextlib.suppress(asyncio.CancelledError): | ||
| await disconnect_task | ||
| if next_task in done: | ||
| chunk = next_task.result() | ||
| break | ||
| timeout_error = _stream_timeout_error(monitor, stream_settings) | ||
| if timeout_error: | ||
| next_task.cancel() | ||
| with contextlib.suppress(asyncio.CancelledError, StopAsyncIteration): | ||
| await next_task | ||
| await close_upstream(timeout_error[0], force=True) |
If request.is_disconnected() raises an unexpected exception, the disconnect_task created via asyncio.create_task(request.is_disconnected()) will never be awaited or cancelled. asyncio then logs "Task exception was never retrieved" when the failed task is garbage-collected, and "Task was destroyed but it is pending!" if a still-pending task is destroyed.
Consider wrapping the disconnect task creation and cleanup in a try/finally pattern within the wait loop, or cancelling it explicitly in all exit paths (including the except Exception re-raise below).
| disconnect_task = None | |
| try: | |
| if request is not None: | |
| disconnect_task = asyncio.create_task(request.is_disconnected()) | |
| wait_tasks.add(disconnect_task) | |
| done, _ = await asyncio.wait(wait_tasks, timeout=wait_seconds) | |
| if disconnect_task is not None: | |
| if disconnect_task in done and disconnect_task.result(): | |
| stream_cancelled = True | |
| next_task.cancel() | |
| with contextlib.suppress(asyncio.CancelledError, StopAsyncIteration): | |
| await next_task | |
| await close_upstream("client_disconnect") | |
| return | |
| if not disconnect_task.done(): | |
| disconnect_task.cancel() | |
| with contextlib.suppress(asyncio.CancelledError): | |
| await disconnect_task | |
| finally: | |
| if disconnect_task is not None and not disconnect_task.done(): | |
| disconnect_task.cancel() | |
| with contextlib.suppress(asyncio.CancelledError): | |
| await disconnect_task |
| ) | ||
| if reasoning and completion_tokens >= reasoning: | ||
| completion_tokens -= reasoning |
The OpenAI-like path now correctly subtracts both cache_read and cache_write from prompt_tokens to avoid double-counting:
    input_tokens = max(0, prompt_tokens - cache_read - cache_write)

However, _from_gemini_usage still only subtracts cache_read:

    input_tokens=max(0, prompt_tokens - cache_read)

If a Gemini provider starts reporting cache_write_tokens (or cachedWriteContentTokens), those tokens would be double-counted in the normalized total. The Gemini protocol already parses cache_write_tokens from the response, so this is a latent consistency gap rather than a hypothetical.
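To make the double-counting concrete, the sketch below normalizes usage under the assumption that the provider's `prompt_tokens` already includes both cached buckets. The `NormalizedUsage` dataclass and `normalize_usage` function are hypothetical names for this example; only the subtraction itself mirrors the reviewed code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class NormalizedUsage:
    input_tokens: int
    cache_read_tokens: int
    cache_write_tokens: int

    @property
    def total_prompt_side(self) -> int:
        # Sum of the three disjoint buckets.
        return self.input_tokens + self.cache_read_tokens + self.cache_write_tokens


def normalize_usage(
    prompt_tokens: int, cache_read: int = 0, cache_write: int = 0
) -> NormalizedUsage:
    """Split prompt tokens into uncached, cache-read and cache-write buckets.

    Assumes prompt_tokens includes both cached buckets, so each cached
    token is subtracted once and then counted once in its own bucket.
    """
    uncached = max(0, prompt_tokens - cache_read - cache_write)
    return NormalizedUsage(uncached, cache_read, cache_write)
```

For example, with 1000 prompt tokens of which 200 are cache reads and 300 are cache writes, subtracting only `cache_read` would leave 800 input tokens and a prompt-side total of 1300, whereas subtracting both keeps the total at 1000.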
| async def save(self, response: StoredResponse) -> None: | ||
| await self._cache.store_async(self._key(response.id), json.dumps(response.to_dict(), ensure_ascii=False)) | ||
| flush = getattr(self._cache, "_save_to_disk", None) | ||
| if callable(flush): | ||
| await flush() | ||
This directly accesses a private method on the provider cache:
    flush = getattr(self._cache, "_save_to_disk", None)
    if callable(flush):
        await flush()

If the provider cache implementation refactors _save_to_disk to a different name (e.g., _flush_to_disk, _persist), saves will silently stop flushing to disk, and the Responses store will appear to lose data between restarts.
Consider either: (a) adding a public flush() or sync() method to the cache interface that this code calls, or (b) adding a comment documenting the coupling so future maintainers know to update this call site.
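Option (a) could look roughly like the sketch below, where the store depends on a small public interface rather than probing for a private method. Only `store_async` appears in the reviewed code; the `FlushableCache` protocol, the `flush()` method, the in-memory cache, and the `responses:` key prefix are assumptions made for illustration.

```python
import asyncio
import json
from typing import Any, Dict, Protocol


class FlushableCache(Protocol):
    """Public contract the Responses store relies on (hypothetical)."""

    async def store_async(self, key: str, value: str) -> None: ...

    async def flush(self) -> None: ...


class InMemoryCache:
    """Toy cache used only to exercise the sketch."""

    def __init__(self) -> None:
        self.pending: Dict[str, str] = {}
        self.persisted: Dict[str, str] = {}

    async def store_async(self, key: str, value: str) -> None:
        self.pending[key] = value

    async def flush(self) -> None:
        # Stand-in for writing buffered entries to disk.
        self.persisted.update(self.pending)
        self.pending.clear()


class ResponseStore:
    def __init__(self, cache: FlushableCache) -> None:
        self._cache = cache

    async def save(self, response_id: str, payload: Dict[str, Any]) -> None:
        key = f"responses:{response_id}"  # hypothetical key format
        await self._cache.store_async(key, json.dumps(payload, ensure_ascii=False))
        # A missing flush() now fails loudly instead of being skipped silently.
        await self._cache.flush()
```

The trade-off is that a cache without `flush()` raises `AttributeError` on save, which is usually preferable to data that quietly never reaches disk.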
Experimental Native Protocol Roadmap
This branch is for a long-running experimental rewrite that makes native protocol support the first-class extension point of `rotator_library`, while preserving the existing credential rotation, quota, fair-cycle, session tracking, and provider plugin strengths.
Operating Rules
- `experimental` branch.
- `C:\Projects\test\LLM-API-Key-Proxy` and child paths.
- `docs/experimental/` are committed.
- `docs/experimental/phase-N-*.md`.
- `explore` and `explore-heavy` agents to review the work against the phase plan, external reference areas, and current proxy behavior. Fix findings and re-review as needed.
Strategic Goal
The target architecture is:
Providers should be able to declare an existing protocol and only override the parts that are genuinely provider-specific. A custom provider should usually be configurable through protocol choice, adapters, field-cache rules, auth strategy, and model options rather than requiring a large bespoke provider implementation.
Priority Order
- `.env` and optional JSON. No SQLite dependency for now.
Non-Goals For This Branch
- `UsageManager`, fair-cycle, custom caps, or evidence-based `SessionTracker`.
Current Strengths To Preserve
Reference Gateway Ideas To Import Carefully
Phase Index
Each phase may be subdivided if implementation scope becomes too large.
Completeness Matrix
This matrix exists so the branch does not lose any requested scope while phases evolve. The phase plans are still refreshed before implementation, but every item below must remain accounted for.
- `litellm_fallback` protocol path; later providers should prefer native protocols and use LiteLLM only for unsupported coverage.
- `previous_response_id`, storage, SSE, and WebSocket-ready transport shape.
- `last`, `all`, `last_user_turn`, `last_assistant_turn`, and `per_tool_call`.
- `src/rotator_library/providers/_retired/`.
- `explore` and `explore-heavy`.
- `06-phase-workflow.md` says planning docs are committed, but phase reports are not committed by default.
Code Quality Expectations