diff --git a/.env.example b/.env.example index 72351421..afc378ed 100644 --- a/.env.example +++ b/.env.example @@ -52,6 +52,13 @@ # --- Chutes --- #CHUTES_API_KEY_1="YOUR_CHUTES_API_KEY" +# --- Hatz AI --- +# Hatz uses the Responses API internally. The proxy translates Chat Completions +# requests to Hatz's /v1/openai/responses endpoint automatically. +# Authentication uses X-API-Key header (not Bearer token). +#HATZ_API_KEY_1="YOUR_HATZ_API_KEY" +#HATZ_API_KEY_2="YOUR_HATZ_API_KEY_2" + # ------------------------------------------------------------------------------ # | [OAUTH] Provider OAuth 2.0 Credentials | # ------------------------------------------------------------------------------ diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index f7ebbde5..e78a2005 100644 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -237,6 +237,48 @@ The manager supports loading credentials from two sources, with a clear priority - This is the key to "Stateless Deployment" for platforms like Railway, Render, Heroku - Credentials are referenced internally using `env://` URIs (e.g., `env://gemini_cli/1`) +#### 2.6.4. Remote Host Authentication (SSH Port Forwarding) + +When the proxy is deployed on a remote host (VPS, cloud server, etc.), OAuth authentication requires special handling because OAuth callbacks are sent to `localhost`, which on the remote server refers to the server itself, not your local machine. + +**The Problem:** + +- Proxy runs on remote VPS at `your-vps-ip` +- You attempt to add OAuth credentials using the credential tool on the VPS +- OAuth provider redirects to `http://localhost:PORT/callback` +- On the VPS, `localhost` points to the VPS's localhost, not your local browser +- The callback fails because your browser cannot connect to the VPS's localhost + +**The Solution: SSH Port Forwarding** + +Create an SSH tunnel to forward OAuth callback ports from the VPS to your local machine: + +```bash +# Single provider examples +ssh -L 8085:localhost:8085 user@your-vps-ip # Gemini CLI +ssh -L 51121:localhost:51121 user@your-vps-ip # Antigravity +ssh -L 11451:localhost:11451 user@your-vps-ip # iFlow + +# Multiple providers simultaneously +ssh -L 8085:localhost:8085 -L 51121:localhost:51121 -L 11451:localhost:11451 user@your-vps-ip +``` + +**Workflow:** + +1. **Establish SSH tunnel** (keep this connection open) +2. **Run credential tool on VPS** (in separate SSH session) +3. **Complete browser-based OAuth** - callbacks are forwarded via tunnel +4. **Close SSH tunnel** after authentication completes + +**Alternative Approach: Local Authentication + Export** + +If SSH port forwarding is not feasible: +1. Complete OAuth flows locally on your machine +2. Export credentials to environment variables using credential tool's export feature +3. Deploy `.env` file to remote server + +This approach uses the credential tool's export functionality to generate environment variable representations of OAuth credentials, which can then be deployed to stateless environments without requiring SSH tunnels. + **Gemini CLI Environment Variables:** Single credential (legacy format): diff --git a/Deployment guide.md b/Deployment guide.md index 44c7e033..6a921b9d 100644 --- a/Deployment guide.md +++ b/Deployment guide.md @@ -523,13 +523,13 @@ SSH tunnels forward ports from your local machine to the remote VPS, allowing yo From your **local machine**, open a terminal and run: ```bash -# Forward all OAuth callback ports at once -ssh -L 51121:localhost:51121 -L 8085:localhost:8085 -L 11451:localhost:11451 user@your-vps-ip +# Forward all OAuth callback ports at once (recommended) +ssh -L 8085:localhost:8085 -L 51121:localhost:51121 -L 11451:localhost:11451 user@your-vps-ip # Alternative: Forward ports individually as needed -ssh -L 51121:localhost:51121 user@your-vps-ip # For Antigravity ssh -L 8085:localhost:8085 user@your-vps-ip # For Gemini CLI -ssh -L 11451:localhost:11451 user@your-vps-ip # For iFlow +ssh -L 51121:localhost:51121 user@your-vps-ip # For Antigravity +ssh -L 11451:localhost:11451 user@your-vps-ip # For iFlow ``` **Keep this SSH session open** during the entire authentication process. diff --git a/README.md b/README.md index a7c3c438..9532cfea 100644 --- a/README.md +++ b/README.md @@ -774,6 +774,76 @@ For platforms without file persistence (Railway, Render, Vercel): +
+Remote Host Deployment (SSH Port Forwarding) + +When the proxy is running on a remote host (VPS, cloud server, etc.), OAuth token authentication requires SSH port forwarding. This is because the OAuth callback URL is sent to `localhost`, which on the remote server points to the server itself, not your local machine. + +**The Problem:** + +- You run the proxy on a remote VPS +- You try to add OAuth credentials using the credential tool +- The OAuth provider redirects to `http://localhost:PORT/callback` +- On the VPS, `localhost` refers to the VPS, not your local machine +- The callback fails because your browser can't reach the VPS's localhost + +**The Solution: SSH Port Forwarding** + +Use SSH to tunnel the OAuth callback ports from the VPS back to your local machine. You only need to do this when adding OAuth credentials. + +**Single Provider Examples:** + +```bash +# Gemini CLI (port 8085) +ssh -L 8085:localhost:8085 user@your-vps-ip + +# Antigravity (port 51121) +ssh -L 51121:localhost:51121 user@your-vps-ip + +# iFlow (port 11451) +ssh -L 11451:localhost:11451 user@your-vps-ip +``` + +**Multiple Providers at Once:** + +```bash +# Forward all three OAuth ports simultaneously +ssh -L 8085:localhost:8085 -L 51121:localhost:51121 -L 11451:localhost:11451 user@your-vps-ip +``` + +**Complete Workflow:** + +1. **Establish SSH tunnel** (keep this connection open): + ```bash + ssh -L 8085:localhost:8085 -L 51121:localhost:51121 user@your-vps-ip + ``` + +2. **Run the credential tool on the VPS** (in a separate terminal or SSH session): + ```bash + ssh user@your-vps-ip + cd /path/to/LLM-API-Key-Proxy + python -m rotator_library.credential_tool + ``` + +3. **Complete OAuth authentication**: + - The credential tool will open a browser window + - Because of the SSH tunnel, the callback will be forwarded to your local machine + - Complete the authentication flow as normal + +4. **Close SSH tunnel** after authentication is complete + +**Alternative: Local Authentication + Deploy Credentials** + +If you prefer not to use SSH port forwarding: + +1. Complete OAuth flows locally on your machine +2. Export credentials to environment variables using the credential tool +3. Deploy the `.env` file to your remote server + +See the "Stateless Deployment" section above for details on exporting credentials. + +
+
OAuth Callback Port Configuration @@ -930,11 +1000,13 @@ For OAuth providers (Antigravity, Gemini CLI, etc.), you must authenticate local ```bash # Forward callback ports through SSH -ssh -L 51121:localhost:51121 -L 8085:localhost:8085 user@your-vps +ssh -L 8085:localhost:8085 -L 51121:localhost:51121 -L 11451:localhost:11451 user@your-vps-ip -# Then run credential tool on the VPS +# Then run credential tool on the VPS in a separate terminal ``` +This creates a tunnel that forwards OAuth callback ports from the VPS to your local machine, allowing the browser-based authentication to complete successfully. + **Systemd Service:** ```ini @@ -953,6 +1025,7 @@ WantedBy=multi-user.target ``` See [VPS Deployment](Deployment%20guide.md#appendix-deploying-to-a-custom-vps) for complete guide. +See the [Remote Host Deployment (SSH Port Forwarding)](#remote-host-deployment-ssh-port-forwarding) section above for detailed OAuth setup instructions.
@@ -967,6 +1040,7 @@ See [VPS Deployment](Deployment%20guide.md#appendix-deploying-to-a-custom-vps) f | All keys on cooldown | All keys failed recently; check `logs/detailed_logs/` for upstream errors | | Model not found | Verify format is `provider/model_name` (e.g., `gemini/gemini-2.5-flash`) | | OAuth callback failed | Ensure callback port (8085, 51121, 11451) isn't blocked by firewall | +| OAuth callback failed on remote VPS | Use SSH port forwarding: `ssh -L 8085:localhost:8085 -L 51121:localhost:51121 user@your-vps-ip` | | Streaming hangs | Increase `TIMEOUT_READ_STREAMING`; check provider status | **Detailed Logs:** diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index 12014bdc..2209ae2d 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -703,6 +703,9 @@ async def verify_anthropic_api_key( Dependency to verify API key for Anthropic endpoints. Accepts either x-api-key header (Anthropic style) or Authorization Bearer (OpenAI style). """ + # If PROXY_API_KEY is not set or empty, skip verification (open access) + if not PROXY_API_KEY: + return x_api_key or auth # Check x-api-key first (Anthropic style) if x_api_key and x_api_key == PROXY_API_KEY: return x_api_key diff --git a/src/rotator_library/providers/antigravity_provider.py b/src/rotator_library/providers/antigravity_provider.py index 8f3d48f0..38bbb07f 100644 --- a/src/rotator_library/providers/antigravity_provider.py +++ b/src/rotator_library/providers/antigravity_provider.py @@ -747,16 +747,19 @@ def _clean_claude_schema(schema: Any, for_gemini: bool = False) -> Any: "$id", "$ref", "$defs", - "$schema", # Rejected by 'parameters' key + "$schema", + "$comment", + "$vocabulary", + "$dynamicRef", + "$dynamicAnchor", "definitions", - "default", # Rejected by 'parameters' key - "examples", # Rejected by 'parameters' key + "default", # Rejected by 'parameters' key, sometimes + "examples", # Rejected by 'parameters' key, sometimes "title", # May cause issues in nested objects } - # Validation keywords - only remove at schema-definition level, - # NOT when they appear as property names under "properties" - # Note: These are common property names that could be used by tools: + # Validation keywords to strip ONLY for Claude (Gemini accepts these) + # These are common property names that could be used by tools: # - "pattern" (glob, grep, regex tools) # - "format" (export, date/time tools) # - "minimum"/"maximum" (range tools) @@ -765,27 +768,55 @@ def _clean_claude_schema(schema: Any, for_gemini: bool = False) -> Any: # but we now use 'parameters' key which may silently ignore some): # Note: $schema, default, examples, title moved to meta_keywords (always stripped) validation_keywords_claude_only = { + # Array validation - Gemini accepts "minItems", "maxItems", - "uniqueItems", + # String validation - Gemini accepts "pattern", "minLength", "maxLength", + "format", + # Number validation - Gemini accepts "minimum", "maximum", + # Object validation - Gemini accepts + "minProperties", + "maxProperties", + # Composition - Gemini accepts + "not", + "prefixItems", + } + + # Validation keywords to strip for ALL models (Gemini and Claude) + validation_keywords_all_models = { + # Number validation - Gemini rejects "exclusiveMinimum", "exclusiveMaximum", "multipleOf", - "format", - "minProperties", - "maxProperties", + # Array validation - Gemini rejects + "uniqueItems", + "contains", + "minContains", + "maxContains", + "unevaluatedItems", + # Object validation - Gemini rejects "propertyNames", + "unevaluatedProperties", + "dependentRequired", + "dependentSchemas", + # Content validation - Gemini rejects "contentEncoding", "contentMediaType", "contentSchema", + # Meta annotations - Gemini rejects + "examples", "deprecated", "readOnly", "writeOnly", + # Conditional - Gemini rejects + "if", + "then", + "else", } # Handle 'anyOf', 'oneOf', and 'allOf' for Claude @@ -891,6 +922,10 @@ def _clean_claude_schema(schema: Any, for_gemini: bool = False) -> Any: # For Claude: skip - not supported continue + # Strip keywords unsupported by ALL models (both Gemini and Claude) + if key in validation_keywords_all_models: + continue + # Special handling for additionalProperties: # For Gemini: pass through as-is (Gemini accepts {}, true, false, typed schemas) # For Claude: normalize permissive values ({} or true) to true diff --git a/src/rotator_library/providers/hatz_provider.py b/src/rotator_library/providers/hatz_provider.py new file mode 100644 index 00000000..4b26716d --- /dev/null +++ b/src/rotator_library/providers/hatz_provider.py @@ -0,0 +1,857 @@ +# SPDX-License-Identifier: LGPL-3.0-only + +""" +Hatz AI Provider - Responses API Translation Layer + +Provider implementation for the Hatz AI API (https://ai.hatz.ai). +Translates between OpenAI Chat Completions format (client-facing) and +Hatz's Responses API format (backend). + +Request flow: + Client (OpenAI Chat Completions) -> translate -> Hatz Responses API + Hatz Responses API SSE -> translate -> OpenAI Chat Completions SSE + +Authentication: X-API-Key header +Responses endpoint: POST /v1/openai/responses +Models endpoint: GET /v1/chat/models + +Environment variables: + HATZ_API_BASE: API base URL (default: https://ai.hatz.ai/v1) + HATZ_API_KEY_1, HATZ_API_KEY_2, ...: API keys for rotation +""" + +import json +import time +import os +import httpx +import logging +from typing import Union, AsyncGenerator, List, Dict, Any, Optional +from .provider_interface import ProviderInterface +from ..model_definitions import ModelDefinitions +from ..timeout_config import TimeoutConfig +from ..transaction_logger import ProviderLogger +import litellm +from litellm.exceptions import RateLimitError, AuthenticationError + +lib_logger = logging.getLogger("rotator_library") + + +class HatzProvider(ProviderInterface): + """ + Provider for the Hatz AI Responses API. + + Accepts OpenAI Chat Completions requests from clients, translates them + to Hatz Responses API format, and translates the streaming/non-streaming + responses back to OpenAI Chat Completions format. + """ + + skip_cost_calculation = True + + def __init__(self): + self.api_base = os.environ.get("HATZ_API_BASE", "https://ai.hatz.ai/v1") + self.model_definitions = ModelDefinitions() + + def has_custom_logic(self) -> bool: + return True + + async def get_models( + self, api_key: str, client: httpx.AsyncClient + ) -> List[str]: + """ + Fetch available models from Hatz's /chat/models endpoint. + + Hatz returns {"data": [{"name": "model-id", ...}]}. + Combines with static model definitions from HATZ_MODELS env var. + """ + models = [] + env_var_ids = set() + + # Source 1: Static model definitions from HATZ_MODELS env var + static_models = self.model_definitions.get_all_provider_models("hatz") + if static_models: + for model in static_models: + model_name = model.split("/")[-1] if "/" in model else model + models.append(model if "/" in model else f"hatz/{model}") + env_var_ids.add(model_name) + lib_logger.info( + f"Loaded {len(static_models)} static models for hatz" + ) + + # Source 2: Dynamic discovery from Hatz API + try: + models_url = f"{self.api_base.rstrip('/')}/chat/models" + response = await client.get( + models_url, + headers={"X-API-Key": api_key}, + ) + response.raise_for_status() + data = response.json().get("data", []) + + for model_info in data: + model_id = model_info.get("name", "") + if model_id and model_id not in env_var_ids: + models.append(f"hatz/{model_id}") + + lib_logger.info( + f"Discovered {len(data)} models from Hatz API " + f"({len(models)} total after dedup)" + ) + except httpx.RequestError as e: + lib_logger.warning(f"Failed to fetch Hatz models: {e}") + except Exception as e: + lib_logger.warning(f"Failed to parse Hatz models response: {e}") + + return models + + async def get_auth_header( + self, credential_identifier: str + ) -> Dict[str, str]: + """Return X-API-Key header for Hatz authentication.""" + return {"X-API-Key": credential_identifier} + + # ========================================================================= + # Request Translation: OpenAI Chat Completions -> Hatz Responses API + # ========================================================================= + + def _convert_messages_to_input( + self, messages: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """ + Convert OpenAI Chat Completions messages to Hatz Responses API input items. + + The Responses API accepts simple role-based messages (user, assistant, system) + but does NOT accept: + - role: "tool" (tool results) -> must become function_call_output items + - assistant messages with tool_calls -> must be split into text message + + separate function_call items + + Conversions: + {"role": "user/system", "content": "..."} -> pass through as-is + {"role": "assistant", "content": "..."} -> pass through (text only) + {"role": "assistant", "tool_calls": [...]} -> split into: + - {"role": "assistant", "content": "..."} (if content exists) + - {"type": "function_call", "id": "...", "call_id": "...", + "name": "...", "arguments": "..."} for each tool_call + {"role": "tool", "tool_call_id": "...", "content": "..."} -> + {"type": "function_call_output", "call_id": "...", "output": "..."} + """ + input_items: List[Dict[str, Any]] = [] + + for msg in messages: + role = msg.get("role", "") + + if role == "tool": + # Tool result -> function_call_output + call_id = msg.get("tool_call_id", "") + content = msg.get("content", "") + # Handle content that may be a list of content parts + if isinstance(content, list): + # Join text parts for the output string + text_parts = [] + for part in content: + if isinstance(part, dict): + text_parts.append(part.get("text", str(part))) + else: + text_parts.append(str(part)) + content = "\n".join(text_parts) + + input_items.append({ + "type": "function_call_output", + "call_id": call_id, + "output": content if isinstance(content, str) else str(content), + }) + + elif role == "assistant": + tool_calls = msg.get("tool_calls") + + if tool_calls: + # Assistant message with tool calls: emit text (if any), then + # each tool call as a separate function_call item + content = msg.get("content") + if content: + input_items.append({ + "type": "message", + "role": "assistant", + "content": content, + }) + + for tc in tool_calls: + func = tc.get("function", {}) + tc_id = tc.get("id", "") + input_items.append({ + "type": "function_call", + "id": f"fc_{tc_id}", + "call_id": tc_id, + "name": func.get("name", ""), + "arguments": func.get("arguments", "{}"), + }) + else: + # Simple assistant text message - pass through + item: Dict[str, Any] = {"role": "assistant"} + content = msg.get("content") + if content is not None: + item["content"] = content + # Preserve reasoning_content if present (for thinking models) + if msg.get("reasoning_content"): + item["reasoning_content"] = msg["reasoning_content"] + input_items.append(item) + + else: + # user, system, developer, etc. - pass through as-is + # Only include known safe fields to avoid sending unsupported + # OpenAI-specific fields (like thinking_signature) + item = {"role": role} + content = msg.get("content") + if content is not None: + item["content"] = content + # Pass through 'name' if present (for multi-user scenarios) + if msg.get("name"): + item["name"] = msg["name"] + input_items.append(item) + + return input_items + + def _translate_tools( + self, tools: Optional[List[Dict[str, Any]]] + ) -> Optional[List[Dict[str, Any]]]: + """ + Convert OpenAI Chat Completions tool format to Responses API format. + + OpenAI: {"type": "function", "function": {"name": "fn", "description": "...", "parameters": {...}}} + Responses: {"type": "function", "name": "fn", "description": "...", "parameters": {...}} + """ + if not tools: + return None + + converted = [] + for tool in tools: + if tool.get("type") == "function" and "function" in tool: + func = tool["function"] + resp_tool: Dict[str, Any] = {"type": "function"} + if "name" in func: + resp_tool["name"] = func["name"] + if "description" in func: + resp_tool["description"] = func["description"] + if "parameters" in func: + resp_tool["parameters"] = func["parameters"] + converted.append(resp_tool) + else: + # Pass through unknown tool types as-is + converted.append(tool) + + return converted if converted else None + + def _build_responses_payload(self, **kwargs) -> Dict[str, Any]: + """ + Translate an OpenAI Chat Completions request into a Hatz Responses API payload. + + Mappings: + messages -> input + model -> model (strip hatz/ prefix) + tools -> tools (flatten function wrapper) + max_tokens -> max_output_tokens + reasoning_effort -> reasoning.effort + temperature, top_p, stop, seed, stream -> pass through + """ + payload: Dict[str, Any] = {} + + # Model (already stripped of hatz/ prefix by caller) + if "model" in kwargs: + payload["model"] = kwargs["model"] + + # Messages -> input (convert from OpenAI Chat format to Responses API format) + if "messages" in kwargs: + payload["input"] = self._convert_messages_to_input(kwargs["messages"]) + + # Stream + payload["stream"] = True # Always stream internally + + # Tools: convert from Chat Completions format to Responses API format + tools = self._translate_tools(kwargs.get("tools")) + if tools is not None: + payload["tools"] = tools + + # max_tokens -> max_output_tokens + # Hatz requires max_output_tokens >= 4096; smaller values cause errors. + # Only send if the value is >= 4096 (otherwise let Hatz use its default). + if "max_tokens" in kwargs and kwargs["max_tokens"] is not None: + max_tokens = kwargs["max_tokens"] + if max_tokens >= 4096: + payload["max_output_tokens"] = max_tokens + + # reasoning_effort -> reasoning.effort + if "reasoning_effort" in kwargs and kwargs["reasoning_effort"] is not None: + payload["reasoning"] = {"effort": kwargs["reasoning_effort"]} + + # Pass-through parameters + for param in ("temperature", "top_p", "stop", "seed", "tool_choice"): + if param in kwargs and kwargs[param] is not None: + payload[param] = kwargs[param] + + return payload + + # ========================================================================= + # Response Translation: Hatz Responses API -> OpenAI Chat Completions + # ========================================================================= + + def _make_openai_chunk( + self, + response_id: str, + model: str, + created: int, + delta: Dict[str, Any], + finish_reason: Optional[str] = None, + usage: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + """Build an OpenAI Chat Completions streaming chunk dict.""" + chunk: Dict[str, Any] = { + "id": response_id, + "object": "chat.completion.chunk", + "created": created, + "model": model, + } + + if usage is not None: + # Usage-only chunk has empty choices + chunk["choices"] = [] + chunk["usage"] = usage + else: + chunk["choices"] = [ + { + "index": 0, + "delta": delta, + "finish_reason": finish_reason, + } + ] + + return chunk + + def _convert_responses_event( + self, + event: Dict[str, Any], + model: str, + state: Dict[str, Any], + ) -> List[Dict[str, Any]]: + """ + Convert a single Hatz Responses API SSE event to OpenAI Chat Completions chunks. + + Args: + event: Parsed JSON from a Responses API SSE line + model: The model name to use in output (with hatz/ prefix) + state: Mutable state dict tracking response_id, created, tool indices, etc. + + Returns: + List of OpenAI-format chunk dicts (may be empty, one, or multiple) + """ + event_type = event.get("type", "") + chunks: List[Dict[str, Any]] = [] + + resp_id = state.get("response_id", "chatcmpl-hatz") + created = state.get("created", int(time.time())) + + # --- response.created: extract IDs and emit initial role chunk --- + if event_type == "response.created": + resp_data = event.get("response", {}) + state["response_id"] = resp_data.get("id", resp_id) + state["created"] = resp_data.get("created_at", created) + resp_id = state["response_id"] + created = state["created"] + + chunks.append(self._make_openai_chunk( + resp_id, model, created, + delta={"role": "assistant"}, + )) + + # --- response.output_text.delta: text content --- + elif event_type == "response.output_text.delta": + text_delta = event.get("delta", "") + if text_delta: + chunks.append(self._make_openai_chunk( + resp_id, model, created, + delta={"content": text_delta}, + )) + + # --- response.output_item.added: start of a new output item --- + elif event_type == "response.output_item.added": + item = event.get("item", {}) + output_index = event.get("output_index", 0) + + if item.get("type") == "function_call": + # Assign a tool_call index (0-based, only counting function_calls) + tc_index = state.get("next_tool_call_index", 0) + state["next_tool_call_index"] = tc_index + 1 + # Map output_index -> tool_call_index for argument deltas + state.setdefault("output_to_tc_index", {})[output_index] = tc_index + + # Extract the call_id for OpenAI's tool_call id field + call_id = item.get("call_id", item.get("id", f"call_{tc_index}")) + func_name = item.get("name", "") + + # Store the call_id mapping for later reference + state.setdefault("tc_call_ids", {})[output_index] = call_id + + chunks.append(self._make_openai_chunk( + resp_id, model, created, + delta={ + "tool_calls": [{ + "index": tc_index, + "id": call_id, + "type": "function", + "function": {"name": func_name, "arguments": ""}, + }] + }, + )) + + # --- response.function_call_arguments.delta: tool call argument streaming --- + elif event_type == "response.function_call_arguments.delta": + output_index = event.get("output_index", 0) + arg_delta = event.get("delta", "") + tc_index = state.get("output_to_tc_index", {}).get(output_index) + + if tc_index is not None and arg_delta: + chunks.append(self._make_openai_chunk( + resp_id, model, created, + delta={ + "tool_calls": [{ + "index": tc_index, + "function": {"arguments": arg_delta}, + }] + }, + )) + + # --- response.completed: emit finish + usage chunk --- + elif event_type == "response.completed": + resp_data = event.get("response", {}) + + # Determine finish reason + has_tool_calls = state.get("next_tool_call_index", 0) > 0 + finish_reason = "tool_calls" if has_tool_calls else "stop" + + # Map Responses API usage to Chat Completions usage + usage_data = resp_data.get("usage", {}) + prompt_tokens = usage_data.get("input_tokens", 0) + completion_tokens = usage_data.get("output_tokens", 0) + total_tokens = usage_data.get("total_tokens", 0) + + # Ensure completion_tokens >= 1 so the proxy's streaming wrapper + # recognizes this as the final chunk and emits finish_reason. + # Hatz Responses API often returns 0 tokens in streaming mode. + if completion_tokens == 0: + completion_tokens = 1 + total_tokens = max(total_tokens, prompt_tokens + 1) + + # Emit a single combined finish + usage chunk. + # The proxy's _safe_streaming_wrapper requires completion_tokens > 0 + # on the same chunk as finish_reason to emit it correctly. + chunk: Dict[str, Any] = { + "id": resp_id, + "object": "chat.completion.chunk", + "created": created, + "model": model, + "choices": [{ + "index": 0, + "delta": {}, + "finish_reason": finish_reason, + }], + "usage": { + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": total_tokens, + }, + } + chunks.append(chunk) + + # --- response.incomplete: handle truncated responses --- + elif event_type == "response.incomplete": + resp_data = event.get("response", {}) + + # Emit finish chunk with "length" reason (truncated) + usage_data = resp_data.get("usage", {}) + prompt_tokens = usage_data.get("input_tokens", 0) + completion_tokens = usage_data.get("output_tokens", 0) + total_tokens = usage_data.get("total_tokens", 0) + if completion_tokens == 0: + completion_tokens = 1 + total_tokens = max(total_tokens, prompt_tokens + 1) + + chunk: Dict[str, Any] = { + "id": resp_id, + "object": "chat.completion.chunk", + "created": created, + "model": model, + "choices": [{ + "index": 0, + "delta": {}, + "finish_reason": "length", + }], + "usage": { + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": total_tokens, + }, + } + chunks.append(chunk) + + # Other event types (output_item.done, etc.) are ignored + return chunks + + def _convert_non_streaming_response( + self, response_data: Dict[str, Any], model: str + ) -> Dict[str, Any]: + """ + Convert a non-streaming Hatz Responses API response to OpenAI Chat Completions format. + + Maps output items (message content + function_calls) to choices[0].message. + """ + content_parts = [] + tool_calls = [] + tc_index = 0 + + for item in response_data.get("output", []): + item_type = item.get("type", "") + + if item_type == "message": + # Extract text from content blocks + for block in item.get("content", []): + if block.get("type") == "output_text": + content_parts.append(block.get("text", "")) + + elif item_type == "function_call": + call_id = item.get("call_id", item.get("id", f"call_{tc_index}")) + tool_calls.append({ + "id": call_id, + "type": "function", + "function": { + "name": item.get("name", ""), + "arguments": item.get("arguments", ""), + }, + }) + tc_index += 1 + + # Build message + message: Dict[str, Any] = {"role": "assistant"} + content = "".join(content_parts) + message["content"] = content if content else None + + if tool_calls: + message["tool_calls"] = tool_calls + + # Determine finish reason + finish_reason = "tool_calls" if tool_calls else "stop" + + # Map usage + raw_usage = response_data.get("usage", {}) + usage = { + "prompt_tokens": raw_usage.get("input_tokens", 0), + "completion_tokens": raw_usage.get("output_tokens", 0), + "total_tokens": raw_usage.get("total_tokens", 0), + } + + return { + "id": response_data.get("id", "chatcmpl-hatz"), + "object": "chat.completion", + "created": response_data.get("created_at", int(time.time())), + "model": model, + "choices": [{ + "index": 0, + "message": message, + "finish_reason": finish_reason, + }], + "usage": usage, + } + + # ========================================================================= + # Stream-to-Completion Reassembly + # ========================================================================= + + def _stream_to_completion_response( + self, chunks: List[litellm.ModelResponse] + ) -> litellm.ModelResponse: + """ + Reassemble streaming OpenAI chunks into a single completion response. + Used for non-streaming mode (which internally uses streaming). + """ + if not chunks: + return litellm.ModelResponse( + id="chatcmpl-hatz-empty", + object="chat.completion", + model="unknown", + choices=[{ + "index": 0, + "message": {"role": "assistant", "content": ""}, + "finish_reason": "stop", + }], + usage={"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}, + ) + + first_chunk = chunks[0] + content_parts = [] + tool_calls_by_index: Dict[int, Dict[str, Any]] = {} + usage_data = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} + chunk_finish_reason = None + reasoning_content_parts = [] + + def _get(obj, key, default=None): + """Get a value from either a dict or an object attribute.""" + if isinstance(obj, dict): + return obj.get(key, default) + return getattr(obj, key, default) + + for chunk in chunks: + for choice in chunk.choices: + delta = _get(choice, "delta") + if not delta: + continue + + # Accumulate text content + content = _get(delta, "content") + if content: + content_parts.append(content) + + # Accumulate reasoning content + reasoning = _get(delta, "reasoning_content") + if reasoning: + reasoning_content_parts.append(reasoning) + + # Accumulate tool calls + tc_list = _get(delta, "tool_calls") + if tc_list: + for tc in tc_list: + idx = _get(tc, "index", 0) + if idx not in tool_calls_by_index: + tool_calls_by_index[idx] = { + "id": _get(tc, "id", f"call_{idx}"), + "type": "function", + "function": {"name": "", "arguments": ""}, + } + tc_id = _get(tc, "id") + if tc_id: + tool_calls_by_index[idx]["id"] = tc_id + tc_func = _get(tc, "function") + if tc_func: + tc_name = _get(tc_func, "name") + if tc_name: + tool_calls_by_index[idx]["function"]["name"] = tc_name + tc_args = _get(tc_func, "arguments") + if tc_args: + tool_calls_by_index[idx]["function"][ + "arguments" + ] += tc_args + + # Track finish reason + fr = _get(choice, "finish_reason") + if fr: + chunk_finish_reason = fr + + # Track usage if present + if hasattr(chunk, "usage") and chunk.usage: + usage = chunk.usage + pt = _get(usage, "prompt_tokens", 0) + ct = _get(usage, "completion_tokens", 0) + tt = _get(usage, "total_tokens", 0) + if pt: + usage_data["prompt_tokens"] = pt + if ct: + usage_data["completion_tokens"] = ct + if tt: + usage_data["total_tokens"] = tt + + # Build final message + final_message: Dict[str, Any] = { + "role": "assistant", + "content": "".join(content_parts) if content_parts else None, + } + + if reasoning_content_parts: + final_message["reasoning_content"] = "".join(reasoning_content_parts) + + aggregated_tool_calls = [ + tool_calls_by_index[idx] + for idx in sorted(tool_calls_by_index.keys()) + ] + if aggregated_tool_calls: + final_message["tool_calls"] = aggregated_tool_calls + + # Determine finish_reason + if aggregated_tool_calls: + finish_reason = "tool_calls" + elif chunk_finish_reason: + finish_reason = chunk_finish_reason + else: + finish_reason = "stop" + + final_response_data = { + "id": first_chunk.id, + "object": "chat.completion", + "created": first_chunk.created, + "model": first_chunk.model, + "choices": [{ + "index": 0, + "message": final_message, + "finish_reason": finish_reason, + }], + "usage": usage_data, + } + + return litellm.ModelResponse(**final_response_data) + + # ========================================================================= + # Main Entry Point + # ========================================================================= + + async def acompletion( + self, client: httpx.AsyncClient, **kwargs + ) -> Union[litellm.ModelResponse, AsyncGenerator[litellm.ModelResponse, None]]: + """ + Execute a chat completion request against Hatz's Responses API. + + Translates OpenAI Chat Completions requests to Hatz Responses API format, + streams the response, and translates SSE events back to OpenAI format. + + For non-streaming requests, internally uses streaming and reassembles. + """ + credential = kwargs.pop("credential_identifier") + transaction_context = kwargs.pop("transaction_context", None) + model = kwargs["model"] + + # Create provider logger from transaction context + file_logger = ProviderLogger(transaction_context) + + async def make_request(): + """Prepare and send the API request to Hatz Responses API.""" + # Strip provider prefix: "hatz/anthropic.claude-opus-4-6" -> "anthropic.claude-opus-4-6" + model_name = model.split("/")[-1] if "/" in model else model + kwargs_with_stripped_model = {**kwargs, "model": model_name} + + # Build Responses API payload from Chat Completions params + payload = self._build_responses_payload(**kwargs_with_stripped_model) + + headers = { + "X-API-Key": credential, + "Content-Type": "application/json", + "Accept": "text/event-stream", + } + + # Responses API endpoint + url = f"{self.api_base.rstrip('/')}/openai/responses" + + # Log request + file_logger.log_request(payload) + lib_logger.debug(f"Hatz Responses API Request URL: {url}") + + return client.stream( + "POST", + url, + headers=headers, + json=payload, + timeout=TimeoutConfig.streaming(), + ) + + async def stream_handler(response_stream): + """Handle the Responses API SSE stream, translate to OpenAI chunks.""" + # Mutable state for tracking across SSE events + state: Dict[str, Any] = { + "response_id": "chatcmpl-hatz", + "created": int(time.time()), + "next_tool_call_index": 0, + "output_to_tc_index": {}, + "tc_call_ids": {}, + } + + try: + async with response_stream as response: + # Check for HTTP errors + if response.status_code >= 400: + error_text = await response.aread() + error_text = ( + error_text.decode("utf-8") + if isinstance(error_text, bytes) + else error_text + ) + + if response.status_code == 401: + raise AuthenticationError( + f"Hatz authentication failed: {error_text}", + llm_provider="hatz", + model=model, + response=response, + ) + elif response.status_code == 429: + raise RateLimitError( + f"Hatz rate limit exceeded: {error_text}", + llm_provider="hatz", + model=model, + response=response, + ) + else: + error_msg = ( + f"Hatz HTTP {response.status_code} error: {error_text}" + ) + file_logger.log_error(error_msg) + raise httpx.HTTPStatusError( + f"HTTP {response.status_code}: {error_text}", + request=response.request, + response=response, + ) + + # Process Responses API SSE events + async for line in response.aiter_lines(): + file_logger.log_response_chunk(line) + + if not line.startswith("data:"): + continue + + # Handle both "data:" and "data: " formats + if line.startswith("data: "): + data_str = line[6:] + else: + data_str = line[5:] + + if data_str.strip() == "[DONE]": + break + + try: + event = json.loads(data_str) + except json.JSONDecodeError: + lib_logger.warning( + f"Could not decode JSON from Hatz: {line}" + ) + continue + + # Translate Responses API event -> OpenAI chunks + openai_chunks = self._convert_responses_event( + event, model, state + ) + for chunk_dict in openai_chunks: + yield litellm.ModelResponse(**chunk_dict) + + except httpx.HTTPStatusError: + raise + except Exception as e: + file_logger.log_error(f"Error during Hatz stream processing: {e}") + lib_logger.error( + f"Error during Hatz stream processing: {e}", exc_info=True + ) + raise + + async def logging_stream_wrapper(): + """Wrap the stream to log the final reassembled response.""" + openai_chunks = [] + try: + async for chunk in stream_handler(await make_request()): + openai_chunks.append(chunk) + yield chunk + finally: + if openai_chunks: + final_response = self._stream_to_completion_response(openai_chunks) + file_logger.log_final_response(final_response.dict()) + + if kwargs.get("stream"): + return logging_stream_wrapper() + else: + + async def non_stream_wrapper(): + chunks = [chunk async for chunk in logging_stream_wrapper()] + return self._stream_to_completion_response(chunks) + + return await non_stream_wrapper()