Typed Data Contracts

Introduction

This document describes the canonical typed data contracts used for cross-layer and cross-domain data exchange in the LLM Interactive Proxy. These contracts provide strict typing for data flowing between transport, core services, and connector layers, reducing reliance on ad hoc dict[str, Any] and Any types.

Purpose

The typed data contracts system ensures:

  • Type Safety: Cross-layer boundaries use explicit, typed contracts instead of Any or unconstrained dictionaries
  • Maintainability: Clear data shapes make the codebase easier to understand and modify
  • Debuggability: Explicit contracts improve error messages and debugging workflows
  • Consistency: Single canonical representation per concept reduces conversion overhead

Scope

This guidance applies to:

  • Cross-layer boundaries: Transport ↔ Core ↔ Connector interfaces
  • Cross-domain boundaries: Routing, failover, usage tracking, capture, and connector interfaces
  • Boundary conversion points: Explicit points where data representation changes

This guidance does not apply to:

  • Internal implementation details within a single layer
  • Test utilities and fixtures (though tests should use canonical contracts when testing boundaries)
  • Legacy compatibility shims (documented exceptions)

Canonical Contract Set v1

The following contracts are the canonical representations used for cross-layer data exchange. These are the source of truth for their respective concepts.

Request Payloads

CanonicalChatRequest

Location: src/core/domain/chat.py

Purpose: Canonical representation of chat completion requests flowing through the core processing pipeline.

Key Fields:

  • model: str - Model identifier
  • messages: list[ChatMessage] - Conversation messages
  • temperature: float | None - Sampling temperature
  • max_completion_tokens: int | None - Token limit
  • tools: list[dict[str, Any]] | None - Tool definitions
  • extra_body: dict[str, Any] | None - Protocol-specific extensions

Usage:

  • Controllers convert inbound HTTP payloads to CanonicalChatRequest before invoking core services
  • Core services and connectors receive CanonicalChatRequest as input
  • All protocol-specific request formats normalize to this canonical shape

Alias: ChatRequest is a compatibility alias for CanonicalChatRequest.

Request Context

RequestContext

Location: src/core/domain/request_context.py

Purpose: Transport-agnostic request context carrying cross-layer metadata and correlation identifiers.

Key Fields:

  • domain_request: CanonicalChatRequest | None - Canonical request payload
  • raw_body: bytes | None - Raw HTTP body bytes (for capture)
  • backend: str | None - Resolved backend identifier
  • effective_model: str | None - Effective model after resolution
  • extensions: dict[str, JsonValue] - Single extension container (see Extension-Field Policy)
  • session_id: str | None - Session correlation identifier
  • request_id: str | None - Request correlation identifier
  • original_domain_request: CanonicalChatRequest | None - Original request before mutations (provenance)

Usage:

  • Populated by transport adapters during request adaptation
  • Passed through core services for session resolution, routing, and capture
  • Preserves original request for debugging and accounting

Important: All cross-layer context data must use explicit typed fields. Dynamic attribute assignment (context.attr = value) is not allowed in boundary code.

Backend Routing

BackendTarget

Location: src/core/domain/backend_target.py

Purpose: Canonical contract for resolved backend target (backend + model + URI parameters).

Key Fields:

  • backend: str - Backend identifier (e.g., "openai", "anthropic", "gemini")
  • model: str - Model identifier (e.g., "gpt-4", "claude-3-5-sonnet")
  • uri_params: dict[str, JsonValue] - URI parameters extracted from model string

Usage:

  • Output of backend model resolver
  • Input to backend completion flow
  • Handoff between routing and completion orchestration

Compatibility: Provides from_resolved_target() and to_resolved_target() for migration from legacy ResolvedTarget NamedTuple.
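
To make the resolver handoff concrete, here is a minimal sketch of building a target from a model string. The `backend:model?key=value` syntax, the `BackendTargetSketch` dataclass, and the `parse_backend_target` helper are illustrative assumptions for this document, not the project's actual resolver or BackendTarget class:

```python
from dataclasses import dataclass, field
from urllib.parse import parse_qsl


@dataclass
class BackendTargetSketch:
    """Illustrative stand-in for BackendTarget (not the real class)."""

    backend: str
    model: str
    uri_params: dict = field(default_factory=dict)


def parse_backend_target(model_string: str, default_backend: str = "openai") -> BackendTargetSketch:
    """Parse a hypothetical 'backend:model?key=value' model string."""
    backend, _, rest = model_string.partition(":")
    if not rest:  # no explicit backend prefix; fall back to the default
        backend, rest = default_backend, model_string
    model, _, query = rest.partition("?")
    return BackendTargetSketch(backend=backend, model=model, uri_params=dict(parse_qsl(query)))


target = parse_backend_target("gemini:gemini-pro?project=demo")
# target.backend == "gemini", target.model == "gemini-pro",
# target.uri_params == {"project": "demo"}
```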

Usage and Metrics

UsageSummary

Location: src/core/domain/usage_summary.py

Purpose: Canonical contract for token usage and provider-specific usage metadata.

Key Fields:

  • prompt_tokens: int | None - Prompt token count
  • completion_tokens: int | None - Completion token count
  • total_tokens: int | None - Total token count
  • extensions: dict[str, JsonValue] - Provider-specific usage details

Usage:

  • Returned by connectors in response metadata
  • Recorded in wire capture and usage tracking
  • Merged across multiple backend calls (failover scenarios)

Compatibility: Provides from_dict() for parsing provider API responses.
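
The failover merge mentioned above can be sketched as follows. `UsageSummarySketch` and `merge_usage` are illustrative stand-ins, not the project's API; the point is that `None` ("not reported") is distinguished from `0`:

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class UsageSummarySketch:
    """Illustrative stand-in for UsageSummary (not the real class)."""

    prompt_tokens: int | None = None
    completion_tokens: int | None = None
    total_tokens: int | None = None
    extensions: dict = field(default_factory=dict)


def merge_usage(a: UsageSummarySketch, b: UsageSummarySketch) -> UsageSummarySketch:
    """Sum token counts across backend calls, treating None as 'not reported'."""

    def add(x: int | None, y: int | None) -> int | None:
        if x is None and y is None:
            return None
        return (x or 0) + (y or 0)

    return UsageSummarySketch(
        prompt_tokens=add(a.prompt_tokens, b.prompt_tokens),
        completion_tokens=add(a.completion_tokens, b.completion_tokens),
        total_tokens=add(a.total_tokens, b.total_tokens),
        extensions={**a.extensions, **b.extensions},
    )
```

Under this sketch, tokens spent on a failed first attempt still count toward the merged totals, which is what usage tracking in a failover scenario needs.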

Response Envelopes

ResponseEnvelope

Location: src/core/domain/responses.py

Purpose: Transport-agnostic container for non-streaming responses.

Key Fields:

  • content: dict[str, Any] | str | bytes | None - Response content (JSON dict, string, bytes, or None)
  • usage: UsageSummary | None - Token usage summary
  • metadata: dict[str, JsonValue] | None - Response metadata
  • headers: dict[str, str] | None - HTTP headers
  • status_code: int - HTTP status code
  • media_type: str - Content type

Usage:

  • Returned by connectors to core services
  • Adapted by transport layer to HTTP responses
  • Captured in wire capture for replay

Note: content field narrowed from Any to dict[str, Any] | str | bytes | None in Phase B+. This provides type safety while maintaining flexibility for the known response types used across the codebase.

StreamingResponseEnvelope

Location: src/core/domain/responses.py

Purpose: Transport-agnostic container for streaming responses.

Key Fields:

  • content: AsyncIterator[ProcessedResponse] | None - Stream iterator
  • metadata: dict[str, JsonValue] | None - Response metadata
  • headers: dict[str, str] | None - HTTP headers
  • cancel_callback: Callable[[], Awaitable[None]] | None - Cancellation handler

Usage:

  • Returned by connectors for streaming responses
  • Adapted by transport layer to SSE/streaming HTTP responses
  • Captured incrementally in wire capture

Streaming Chunks

StreamingContent

Location: src/core/domain/streaming/streaming_content.py

Purpose: Canonical internal representation for streaming chunks flowing through the pipeline.

Key Fields:

  • content: str | dict | bytes - Chunk content
  • metadata: dict[str, Any] - Chunk metadata
  • is_done: bool - Completion marker
  • is_empty: bool | None - Empty chunk indicator
  • stream_id: str | None - Stream correlation identifier
  • usage: dict[str, Any] | None - Token usage for this chunk

Usage:

  • Internal representation in streaming processors
  • Lightweight dataclass for performance-sensitive hot paths
  • Converted to StreamingChunk at serialization boundaries

StreamingChunk

Location: src/core/domain/streaming/contracts.py

Purpose: Typed serialization/validation contract for streaming chunks at boundaries.

Key Fields:

  • payload: StreamingPayload - Typed payload (text, opaque_json, binary, empty)
  • metadata: StreamingMetadata - Typed metadata (provider, finish_reason, tool_calls, etc.)
  • is_done: bool - Completion marker
  • is_empty: bool - Empty chunk indicator
  • is_cancellation: bool - Cancellation marker

Usage:

  • Used by SSE serializer for validation
  • Used in error envelopes and done markers
  • Provides strong schema validation at boundaries

Conversion: StreamingContent.to_typed_chunk() and StreamingContent.from_typed_chunk() provide bidirectional conversion.
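
The payload-kind mapping behind this conversion can be illustrated with a small sketch. The dataclass and the dict-shaped result below are simplifications: the real to_typed_chunk() returns a StreamingChunk, and the exact field shapes here are assumptions. Only the payload kinds (text, opaque_json, binary, empty) come from the contract above:

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class StreamingContentSketch:
    """Illustrative stand-in for StreamingContent (not the real class)."""

    content: str | dict | bytes = ""
    metadata: dict = field(default_factory=dict)
    is_done: bool = False


def to_typed_chunk(chunk: StreamingContentSketch) -> dict[str, Any]:
    """Classify internal content into the typed payload kinds at the boundary."""
    if isinstance(chunk.content, bytes):
        kind = "binary"
    elif isinstance(chunk.content, dict):
        kind = "opaque_json"
    elif chunk.content:
        kind = "text"
    else:
        kind = "empty"
    return {
        "payload": {"kind": kind, "value": chunk.content},
        "metadata": chunk.metadata,
        "is_done": chunk.is_done,
        "is_empty": kind == "empty",
    }
```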

Boundary Conversion Points

Data representation changes occur only at explicit boundary conversion points; no other code may convert between representations.

Transport ↔ Domain

Location: src/core/transport/fastapi/request_adapters.py and controller adapters

Conversions:

  • Inbound: HTTP request body → CanonicalChatRequest
  • Inbound: HTTP headers/cookies → RequestContext (with domain_request and raw_body populated)
  • Outbound: ResponseEnvelope / StreamingResponseEnvelope → HTTP response

Rules:

  • Controllers must convert to canonical contracts before invoking core services
  • Transport-specific types (FastAPI Request, Response) must not leak into core services
  • Raw body bytes must be captured in RequestContext.raw_body for wire capture

Example:

# Controller receives HTTP request
async def chat_completion(request: Request):
    # Convert to canonical contract
    domain_request = await adapt_request_to_canonical(request)
    context = RequestContext(
        headers=request.headers,
        cookies=request.cookies,
        domain_request=domain_request,
        raw_body=await request.body(),
        # ... other fields
    )
    # Pass canonical contracts to core service
    result = await request_processor.process(domain_request, context)
    # Adapt response envelope to HTTP
    return adapt_envelope_to_http(result)

Domain ↔ Connector

Location: src/core/services/backend_completion_flow/ and connector interfaces

Conversions:

  • Outbound: CanonicalChatRequest + BackendTarget → Provider-specific request format (inside connector)
  • Inbound: Provider response → ResponseEnvelope / StreamingResponseEnvelope

Rules:

  • Connectors receive canonical contracts as input
  • Provider-specific request construction happens inside connectors
  • Connectors return transport-agnostic envelopes, not provider-specific types

Example:

# Connector receives canonical contracts
async def complete(
    self,
    request: CanonicalChatRequest,
    target: BackendTarget,
    context: RequestContext,
) -> ResponseEnvelope:
    # Convert to provider format (internal to connector)
    provider_request = self._to_provider_format(request, target)
    # Call provider API
    provider_response = await self._client.chat(provider_request)
    # Convert to canonical envelope
    return self._to_envelope(provider_response)

Domain ↔ Capture/Replay

Location: src/core/simulation/capture_decoder.py and wire capture services

Conversions:

  • Capture: Canonical contracts → CBOR bytes (deterministic serialization)
  • Replay: CBOR bytes → Canonical contracts (best-effort decoding)

Rules:

  • Raw bytes are the source of truth for capture fidelity
  • Decoding failures are non-blocking (best-effort)
  • Decoded contracts are used for simulation/debugging, not as authoritative source

Example:

# Capture: contract → bytes
capture_entry = CaptureEntry(
    request=domain_request.model_dump_json(),
    response=envelope.model_dump_json(),
    # ... other fields
)
cbor_bytes = encode_cbor(capture_entry)

# Replay: bytes → contract (best-effort)
try:
    decoded = decode_cbor(cbor_bytes)
    request = CanonicalChatRequest.model_validate_json(decoded.request)
except Exception as e:
    logger.warning("Failed to decode contract: %s", e)
    # Fall back to raw bytes inspection

Serialization Utilities

Overview

The src/core/common/contract_serialization.py module provides centralized utilities for serializing canonical contracts with deterministic output and secret-safe logging. These utilities ensure consistent serialization behavior across wire capture, logging, and debugging workflows.

Requirements: This section addresses:

  • Requirement 7.3: Deterministic serialization for diff-based debugging and stable replay workflows
  • NFR4.2: Secret-safe logging that avoids emitting sensitive request/response content unless explicitly permitted

Functions

serialize_for_capture(contract: Any) -> bytes

Serializes a canonical contract for wire capture with deterministic output.

Requirement: Implements Requirement 7.3 (deterministic serialization for diff-based debugging and stable replay workflows).

Characteristics:

  • Deterministic: Identical input always produces identical output (sorted keys, compact format)
  • No Redaction: Preserves full fidelity for debugging and replay
  • CBOR-Compatible: Output is suitable for CBOR encoding

Usage:

from src.core.common.contract_serialization import serialize_for_capture

# Serialize request/response for capture
capture_bytes = serialize_for_capture(canonical_request)
# Use in wire capture services

Supported Types:

  • Pydantic models (via model_dump(mode="json"))
  • dict and list (JSON-serialized with sorted keys)
  • bytes and str (passed through)
  • Objects with __dict__ (converted to dict)

serialize_for_logging(contract: Any, *, redact: bool = True) -> str

Serializes a canonical contract for logging with optional secret redaction.

Requirements:

  • Implements Requirement 7.3 (deterministic serialization for consistent log output)
  • Implements NFR4.2 (secret-safe logging that avoids emitting sensitive content unless explicitly permitted)

Characteristics:

  • Redaction: By default, redacts sensitive fields (API keys, passwords, etc.)
  • Deterministic: Sorted keys for consistent log output
  • JSON Format: Returns JSON string suitable for log messages

Usage:

from src.core.common.contract_serialization import serialize_for_logging

# Log with redaction (default)
log_str = serialize_for_logging(request, redact=True)
logger.info("Request: %s", log_str)

# Log without redaction (for debugging)
log_str = serialize_for_logging(request, redact=False)
logger.debug("Full request: %s", log_str)

Redaction Behavior:

  • Redacts fields matching DEFAULT_REDACTED_FIELDS (e.g., api_key, password, authorization)
  • Preserves first 2 and last 2 characters for strings > 6 characters
  • Fully masks strings ≤ 6 characters
  • Recursively redacts nested dictionaries and lists

Supported Types:

  • Pydantic models (via model_dump(mode="json"))
  • dict and list (with recursive redaction)
  • Objects with __dict__ (converted to dict)

serialize_dict_for_capture(data: dict[str, Any]) -> bytes

Helper function for serializing dictionaries with deterministic formatting.

Usage:

from src.core.common.contract_serialization import serialize_dict_for_capture

# Serialize metadata dict for capture
metadata_bytes = serialize_dict_for_capture(metadata_dict)

When to Use Which Function

| Use Case | Function | Redaction |
| --- | --- | --- |
| Wire capture (CBOR/JSON) | serialize_for_capture() | No |
| Logging (error/debug messages) | serialize_for_logging() | Yes (default) |
| Logging (full-fidelity debugging) | serialize_for_logging(redact=False) | No |
| Metadata serialization | serialize_dict_for_capture() | No |

Redaction Policy

Default Redacted Fields: Defined in src/core/common/logging_utils.py:

  • api_key, apikey, api-key
  • password, passwd
  • authorization, auth
  • token, access_token, refresh_token
  • secret, secret_key
  • And others (see DEFAULT_REDACTED_FIELDS)

Customization: To redact additional fields, pass a custom redacted_fields set to redact_dict() (used internally by serialize_for_logging).

Examples

Wire Capture:

# In wire capture service
from src.core.common.contract_serialization import serialize_for_capture

capture_bytes = serialize_for_capture(request_payload)
# Write to CBOR file

Error Logging:

# In error handler
from src.core.common.contract_serialization import serialize_for_logging

if exc.details:
    redacted_details = serialize_for_logging(exc.details, redact=True)
    logger.error("Error details: %s", redacted_details)

Boundary Validation Logging:

# In boundary validation
from src.core.common.contract_serialization import serialize_for_logging

redacted_details = serialize_for_logging(details, redact=True)
logger.warning("Validation failed: %s", message, extra={"details": redacted_details})

Deterministic Serialization Policy

Requirement: Requirement 7.3 mandates deterministic serialization for diff-based debugging and stable replay workflows.

All serialization functions use deterministic JSON formatting:

  • Sorted Keys: sort_keys=True ensures consistent key ordering
  • Compact Format: separators=(",", ":") for capture, indent=None for logging
  • ASCII Handling: ensure_ascii=False preserves Unicode characters

This ensures:

  • Identical inputs produce identical outputs (critical for diff-based debugging)
  • Stable replay workflows (captured data is deterministic)
  • Consistent log output (easier to search and analyze)

Rationale: Without deterministic serialization, the same contract could produce different serialized output depending on dictionary key insertion order, making diff-based debugging unreliable and replay workflows inconsistent.
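
A minimal sketch of these settings, showing that dictionary insertion order does not affect the serialized output:

```python
import json


def serialize_deterministic(data: dict) -> bytes:
    """Deterministic, compact, Unicode-preserving JSON (capture-style settings)."""
    return json.dumps(
        data, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    ).encode("utf-8")


# Two dicts with different insertion order serialize identically:
a = serialize_deterministic({"model": "gpt-4", "temperature": 0.7})
b = serialize_deterministic({"temperature": 0.7, "model": "gpt-4"})
assert a == b  # stable output is what makes diff-based debugging reliable
```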

Extension-Field Policy

Single Extension Container Rule

Each canonical contract may have at most one explicitly named extension container:

  • RequestContext.extensions: dict[str, JsonValue]
  • UsageSummary.extensions: dict[str, JsonValue]
  • ResponseEnvelope.metadata: dict[str, JsonValue] | None

Rationale: Multiple extension containers create ambiguity about where to place new fields. A single container makes the policy clear.

JSON-Serializable Constraint

Extension values must be JSON-serializable. Use pydantic.types.JsonValue type:

from pydantic.types import JsonValue

extensions: dict[str, JsonValue] = {}
# JsonValue = str | int | float | bool | None | list[JsonValue] | dict[str, JsonValue]

Rationale:

  • Extensions must be serializable for wire capture and debugging
  • JSON serialization ensures deterministic capture metadata
  • Type checker can validate JSON-serializable constraint

When to Use Extensions

Extensions are appropriate when:

  1. Vendor/Protocol-Specific: Data is specific to a single provider or protocol
  2. Unstable: Field shape changes frequently or is experimental
  3. Low-Frequency Access: Field is rarely accessed in core logic
  4. Cross-Layer Necessity: Data must flow across layers but doesn't warrant a typed field

Extensions are not appropriate when:

  1. Stable and Frequently Used: Field is stable and accessed frequently → promote to typed field
  2. Core Semantic Data: Field is part of the core contract semantics → use typed field
  3. Type Safety Critical: Field shape affects correctness → use typed field

Examples

Good: Protocol-specific metadata

# OpenAI-specific request metadata
context.extensions["openai_service_tier"] = "priority"

# Gemini-specific generation config
request.extra_body = {"generation_config": {...}}  # extra_body is protocol-specific

Bad: Core semantic data in extensions

# BAD: Core semantic field should be typed
context.extensions["backend"] = "openai"  # Should use context.backend

# BAD: Frequently accessed field should be typed
context.extensions["session_id"] = session_id  # Should use context.session_id

Promotion Process

When an extension key becomes stable and frequently used, it should be promoted to a first-class typed field.

Promotion Criteria

An extension key should be promoted when:

  1. Stability: Field shape has been stable for multiple releases
  2. Frequency: Field is accessed in multiple places across layers
  3. Semantic Importance: Field affects core contract semantics or correctness
  4. Type Safety: Stronger typing would catch bugs or improve maintainability

Promotion Steps

  1. Add Typed Field: Add the field to the canonical contract with appropriate type
  2. Migration Period: Support both extension key and typed field during migration
  3. Update Writers: Update all code that writes the extension key to use the typed field
  4. Update Readers: Update all code that reads the extension key to use the typed field
  5. Deprecation: Mark extension key access as deprecated (if still supported)
  6. Removal: Remove extension key support after migration period

Example: Promoting backend Extension

Before (extension):

# Writers
context.extensions["backend"] = "openai"

# Readers
backend = context.extensions.get("backend")

After (typed field):

# Contract definition
class RequestContext:
    backend: str | None = None
    extensions: dict[str, JsonValue] = field(default_factory=dict)

# Writers
context.backend = "openai"

# Readers
backend = context.backend

Migration (support both):

# Writers: use typed field
context.backend = "openai"

# Readers: check typed field first, fall back to extension for compatibility
backend = context.backend or context.extensions.get("backend")

Examples

Before/After: Function Signatures

Before (ad hoc types):

async def process_request(
    request_data: dict[str, Any],
    context: Any,
    backend_info: dict[str, Any],
) -> dict[str, Any]:
    # Type checker can't validate shapes
    # Runtime errors possible from missing keys
    backend = backend_info.get("backend")
    model = backend_info.get("model")
    # ...

After (canonical contracts):

async def process_request(
    request: CanonicalChatRequest,
    context: RequestContext,
    target: BackendTarget,
) -> ResponseEnvelope:
    # Type checker validates shapes
    # IDE autocomplete works
    backend = target.backend
    model = target.model
    # ...

Before/After: Extension Usage

Before (unconstrained dict):

# No type safety
metadata: dict[str, Any] = {}
metadata["usage"] = {"tokens": 100}  # Could be anything
metadata["custom_field"] = SomeComplexObject()  # Not serializable

After (constrained extensions):

from pydantic.types import JsonValue

# Type-safe extensions
extensions: dict[str, JsonValue] = {}
extensions["usage"] = {"tokens": 100}  # Validated as JSON-serializable
# extensions["custom_field"] = SomeComplexObject()  # Type error!

Before/After: Request Context

Before (dynamic attributes):

# Dynamic attribute assignment (requires type: ignore)
context.domain_request = request  # type: ignore[attr-defined]
context.raw_body = body_bytes  # type: ignore[attr-defined]
context.backend = "openai"  # type: ignore[attr-defined]

# Readers must use getattr with defaults
backend = getattr(context, "backend", None)

After (explicit typed fields):

# Explicit typed fields
context.domain_request = request  # Type-safe
context.raw_body = body_bytes  # Type-safe
context.backend = "openai"  # Type-safe

# Readers use direct attribute access
backend = context.backend  # Type-safe, IDE autocomplete works

PR Checklist

When modifying cross-layer boundaries, verify:

  • No new Any in src/core/interfaces/ function signatures for cross-layer seams
  • No new dict[str, Any] for contract-shaped payloads; use JsonValue or a named contract
  • No new type: ignore in boundary modules (src/core/interfaces/, src/core/domain/, src/core/transport/) without documented rationale
  • Canonical contracts used at all cross-layer boundaries (transport ↔ core ↔ connector)
  • Extensions constrained to JsonValue type (not Any)
  • Single extension container per contract (not multiple extension fields)

Local Validation

Run the boundary type checker before submitting PRs:

./.venv/Scripts/python.exe dev/scripts/check_boundary_types.py

This script checks for:

  • Any in function signatures in boundary modules
  • dict[str, Any] for contract-shaped data in boundary signatures

Violations are reported only for files in the boundary enforcement scope (see dev/boundary_types_scope.json).

Enforcement Scope: The boundary type checker uses a scope configuration file (dev/boundary_types_scope.json) that defines which files are subject to boundary type enforcement. Phase 0 scope includes explicit file pinning for the highest-leverage seams (connector base API, response processor interfaces, transport adapter protocols, and canonical contract carriers).

Phase 0 Enforcement Status: Phase 0 scope is enforced; violations in its files fail the boundary type check. Files outside that scope are advisory until enforcement expands in later phases. Phase 0 focuses on signature-first enforcement of the highest-leverage boundary surfaces.

If Violations Are Necessary

If you must introduce a violation (e.g., legacy compatibility):

  1. Document rationale in code comments explaining why the violation is necessary
  2. Add follow-up task to remove the violation in a future PR
  3. Add time-bounded allowlist entry in dev/boundary_types_allowlist.json with:
    • File path and optional symbol name
    • Violation type (Any-in-signature or dict[str, Any])
    • Rationale and tracking reference (issue/spec)
    • Expiration date (RFC3339 timestamp)

Example allowlist entry:

{
  "file": "src/core/interfaces/legacy_adapter.py",
  "symbol": "legacy_compat_method",
  "violation": "Any-in-signature",
  "reason": "Legacy compatibility shim, will be removed after migration",
  "expires_at": "2025-06-30T00:00:00Z",
  "tracking": "typed-contracts-boundary-hardening Phase 1"
}

Important: Allowlist entries expire. Expired entries cause the boundary type check to fail, requiring either renewal or fixing the violation.
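
The expiry semantics can be sketched as follows; `entry_is_expired` is an illustrative helper, not the checker's actual API:

```python
from __future__ import annotations

import json
from datetime import datetime, timezone


def entry_is_expired(entry: dict, now: datetime | None = None) -> bool:
    """Compare an allowlist entry's RFC3339 expires_at against the current time."""
    now = now or datetime.now(timezone.utc)
    # fromisoformat accepts a trailing 'Z' only on Python 3.11+; normalize for older versions
    expires = datetime.fromisoformat(entry["expires_at"].replace("Z", "+00:00"))
    return now >= expires


entry = json.loads(
    '{"file": "src/core/interfaces/legacy_adapter.py", "expires_at": "2025-06-30T00:00:00Z"}'
)
```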

Related Documentation

References

  • Specification: .kiro/specs/typed-contracts-boundary-hardening/
  • Design Document: .kiro/specs/typed-contracts-boundary-hardening/design.md
  • Requirements: .kiro/specs/typed-contracts-boundary-hardening/requirements.md
  • Previous Spec: .kiro/specs/cross-layer-typed-data-contracts/ (this spec is a follow-up)