diff --git a/README.md b/README.md
index 91264da..236d872 100644
--- a/README.md
+++ b/README.md
@@ -26,32 +26,37 @@ This does require a paid ChatGPT account.
#### GUI Application
-If you're on **macOS**, you can download the GUI app from the [GitHub releases](https://github.com/RayBytes/ChatMock/releases).
+If you're on **macOS**, you can download the GUI app from the [GitHub releases](https://github.com/RayBytes/ChatMock/releases).
+
> **Note:** Since ChatMock isn't signed with an Apple Developer ID, you may need to run the following command in your terminal to open the app:
>
> ```bash
> xattr -dr com.apple.quarantine /Applications/ChatMock.app
> ```
>
-> *[More info here.](https://github.com/deskflow/deskflow/wiki/Running-on-macOS)*
+> _[More info here.](https://github.com/deskflow/deskflow/wiki/Running-on-macOS)_
#### Command Line (Homebrew)
You can also install ChatMock as a command-line tool using [Homebrew](https://brew.sh/):
+
```
brew tap RayBytes/chatmock
brew install chatmock
```
### Python
+
If you wish to simply run this as a Python Flask server, you're welcome to do that too.
Clone or download this repository, then cd into the project directory and follow the instructions listed below.
1. Sign in with your ChatGPT account and follow the prompts
+
```bash
python chatmock.py login
```
+
You can make sure this worked by running `python chatmock.py info`
2. After the login completes successfully, you can simply start the local server
@@ -59,6 +64,7 @@ You can make sure this worked by running `python chatmock.py info`
```bash
python chatmock.py serve
```
+
Then, you can use the address and port as the baseURL wherever you need it (http://127.0.0.1:8000 by default)
**Reminder:** When setting a baseURL in other applications, make sure you include /v1/ at the end of the URL if you're using this as an OpenAI-compatible endpoint (e.g. http://127.0.0.1:8000/v1)
@@ -69,7 +75,7 @@ Read [the docker instrunctions here](https://github.com/RayBytes/ChatMock/blob/m
# Examples
-### Python
+### Python
```python
from openai import OpenAI
@@ -101,7 +107,7 @@ curl http://127.0.0.1:8000/v1/chat/completions \
# What's supported
-- Tool/Function calling
+- Tool/Function calling
- Vision/Image understanding
- Thinking summaries (through thinking tags)
- Thinking effort
@@ -109,15 +115,18 @@ curl http://127.0.0.1:8000/v1/chat/completions \
## Notes & Limits
- Requires an active, paid ChatGPT account.
-- Some context length might be taken up by internal instructions (but they dont seem to degrade the model)
+- Some context length might be taken up by internal instructions (but they don't seem to degrade the model)
- Use responsibly and at your own risk. This project is not affiliated with OpenAI and is an educational exercise.
# Supported models
+
- `gpt-5`
- `gpt-5.1`
- `gpt-5.2`
+- `gpt-5.3`
- `gpt-5-codex`
- `gpt-5.2-codex`
+- `gpt-5.3-codex`
- `gpt-5.1-codex`
- `gpt-5.1-codex-max`
- `gpt-5.1-codex-mini`
@@ -128,30 +137,31 @@ curl http://127.0.0.1:8000/v1/chat/completions \
### Thinking effort
- `--reasoning-effort` (choice of minimal,low,medium,high,xhigh)
-GPT-5 has a configurable amount of "effort" it can put into thinking, which may cause it to take more time for a response to return, but may overall give a smarter answer. Applying this parameter after `serve` forces the server to use this reasoning effort by default, unless overrided by the API request with a different effort set. The default reasoning effort without setting this parameter is `medium`.
- The `gpt-5.1` family (including codex) supports `low`, `medium`, and `high` while `gpt-5.1-codex-max` adds `xhigh`. The `gpt-5.2` family (including codex) supports `low`, `medium`, `high`, and `xhigh`.
+ GPT-5 has a configurable amount of "effort" it can put into thinking; more effort may make a response take longer to return, but can give a smarter answer overall. Applying this parameter after `serve` makes the server use this reasoning effort by default, unless the API request overrides it with a different effort. The default reasoning effort without this parameter is `medium`.
+ The `gpt-5.1` family (including codex) supports `low`, `medium`, and `high` while `gpt-5.1-codex-max` adds `xhigh`. The `gpt-5.2` family (including codex) supports `low`, `medium`, `high`, and `xhigh`.
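+
+ For example, to have the server default to a higher effort (an illustrative command using the flag described above):
+
+ ```bash
+ python chatmock.py serve --reasoning-effort high
+ ```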
### Thinking summaries
- `--reasoning-summary` (choice of auto,concise,detailed,none)
-Models like GPT-5 do not return raw thinking content, but instead return thinking summaries. These can also be customised by you.
+ Models like GPT-5 do not return raw thinking content; instead, they return thinking summaries, which you can customise with this parameter.
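+
+ For example, to request detailed summaries by default (an illustrative command using the flag described above):
+
+ ```bash
+ python chatmock.py serve --reasoning-summary detailed
+ ```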
### OpenAI Tools
- `--enable-web-search`
-You can also access OpenAI tools through this project. Currently, only web search is available.
-You can enable it by starting the server with this parameter, which will allow OpenAI to determine when a request requires a web search, or you can use the following parameters during a request to the API to enable web search:
-
-`responses_tools`: supports `[{"type":"web_search"}]` / `{ "type": "web_search_preview" }`
-`responses_tool_choice`: `"auto"` or `"none"`
+ You can also access OpenAI tools through this project. Currently, only web search is available.
+ You can enable it by starting the server with this parameter, which lets OpenAI decide when a request requires a web search, or you can enable it per request with the following parameters:
+
+ `responses_tools`: supports `[{"type":"web_search"}]` / `{ "type": "web_search_preview" }`
+ `responses_tool_choice`: `"auto"` or `"none"`
#### Example usage
+
```json
{
"model": "gpt-5",
- "messages": [{"role":"user","content":"Find current METAR rules"}],
+ "messages": [{ "role": "user", "content": "Find current METAR rules" }],
"stream": true,
- "responses_tools": [{"type": "web_search"}],
+ "responses_tools": [{ "type": "web_search" }],
"responses_tool_choice": "auto"
}
```
@@ -159,18 +169,16 @@ You can enable it by starting the server with this parameter, which will allow O
### Expose reasoning models
- `--expose-reasoning-models`
-If your preferred app doesn’t support selecting reasoning effort, or you just want a simpler approach, this parameter exposes each reasoning level as a separate, queryable model. Each reasoning level also appears individually under /v1/models, so model pickers in your favorite chat apps will list all reasoning options as distinct models you can switch between.
+ If your preferred app doesn’t support selecting reasoning effort, or you just want a simpler approach, this parameter exposes each reasoning level as a separate, queryable model. Each reasoning level also appears individually under /v1/models, so model pickers in your favorite chat apps will list all reasoning options as distinct models you can switch between.
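+
+ For example (a sketch using the flag and the default address documented above), start the server with the flag and list the variants:
+
+ ```bash
+ python chatmock.py serve --expose-reasoning-models
+ curl http://127.0.0.1:8000/v1/models
+ ```
+
+ The returned model list then includes per-effort entries such as `gpt-5-high` and `gpt-5-low` that chat apps can select directly.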
## Notes
+
If you wish to have the fastest responses, I'd recommend setting `--reasoning-effort` to low and `--reasoning-summary` to none.
All parameters and choices can be seen by running `python chatmock.py serve --h`
The context size of this route is also larger than what you get access to in the regular ChatGPT app.
When the model returns a thinking summary, the model will send back thinking tags to make it compatible with chat apps. **If you don't like this behavior, you can instead set `--reasoning-compat` to legacy, and reasoning will be set in the reasoning tag instead of being returned in the actual response text.**
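+
+For example, the fastest-response setup described above would be started like this (an illustrative command combining the flags from this section):
+
+```bash
+python chatmock.py serve --reasoning-effort low --reasoning-summary none
+```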
-
## Star History
[![Star History Chart](https://api.star-history.com/svg?repos=RayBytes/ChatMock&type=Timeline)](https://www.star-history.com/#RayBytes/ChatMock&Timeline)
-
-
diff --git a/chatmock/reasoning.py b/chatmock/reasoning.py
index 5b04ac2..98a0ffd 100644
--- a/chatmock/reasoning.py
+++ b/chatmock/reasoning.py
@@ -11,6 +11,8 @@ def allowed_efforts_for_model(model: str | None) -> Set[str]:
if not base:
return DEFAULT_REASONING_EFFORTS
normalized = base.split(":", 1)[0]
+    if normalized.startswith("gpt-5.3"):
+ return {"low", "medium", "high", "xhigh"}
if normalized.startswith("gpt-5.2"):
return {"low", "medium", "high", "xhigh"}
if normalized.startswith("gpt-5.1-codex-max"):
@@ -73,6 +75,19 @@ def apply_reasoning_to_message(
message["reasoning"] = {"content": [{"type": "text", "text": rtxt}]}
return message
+ if compat == "copilot":
+ # Send reasoning via reasoning_text field for the Copilot
+ # Chat extension's thinking content parser.
+ rtxt_parts: list[str] = []
+ if isinstance(reasoning_summary_text, str) and reasoning_summary_text.strip():
+ rtxt_parts.append(reasoning_summary_text)
+ if isinstance(reasoning_full_text, str) and reasoning_full_text.strip():
+ rtxt_parts.append(reasoning_full_text)
+ rtxt = "\n\n".join([p for p in rtxt_parts if p])
+ if rtxt:
+ message["reasoning_text"] = rtxt
+ return message
+
if compat in ("legacy", "current"):
if reasoning_summary_text:
message["reasoning_summary"] = reasoning_summary_text
diff --git a/chatmock/routes_ollama.py b/chatmock/routes_ollama.py
index 413adff..c03cbe4 100644
--- a/chatmock/routes_ollama.py
+++ b/chatmock/routes_ollama.py
@@ -5,7 +5,15 @@
import time
from typing import Any, Dict, List
-from flask import Blueprint, Response, current_app, jsonify, make_response, request, stream_with_context
+from flask import (
+ Blueprint,
+ Response,
+ current_app,
+ jsonify,
+ make_response,
+ request,
+ stream_with_context,
+)
from .config import BASE_INSTRUCTIONS, GPT5_CODEX_INSTRUCTIONS
from .limits import record_rate_limits_from_response
@@ -17,7 +25,11 @@
)
from .transform import convert_ollama_messages, normalize_ollama_tools
from .upstream import normalize_model_name, start_upstream_request
-from .utils import convert_chat_messages_to_responses_input, convert_tools_chat_to_responses
+from .utils import (
+ convert_chat_messages_to_responses_input,
+ convert_tools_chat_to_responses,
+ derive_copilot_tools_dynamically,
+)
ollama_bp = Blueprint("ollama", __name__)
@@ -71,8 +83,15 @@ def ollama_version() -> Response:
def _instructions_for_model(model: str) -> str:
base = current_app.config.get("BASE_INSTRUCTIONS", BASE_INSTRUCTIONS)
- if model.startswith("gpt-5-codex") or model.startswith("gpt-5.1-codex") or model.startswith("gpt-5.2-codex"):
- codex = current_app.config.get("GPT5_CODEX_INSTRUCTIONS") or GPT5_CODEX_INSTRUCTIONS
+ if (
+ model.startswith("gpt-5-codex")
+ or model.startswith("gpt-5.1-codex")
+ or model.startswith("gpt-5.2-codex")
+ or model.startswith("gpt-5.3-codex")
+ ):
+ codex = (
+ current_app.config.get("GPT5_CODEX_INSTRUCTIONS") or GPT5_CODEX_INSTRUCTIONS
+ )
if isinstance(codex, str) and codex.strip():
return codex
return base
@@ -93,17 +112,22 @@ def ollama_tags() -> Response:
if bool(current_app.config.get("VERBOSE")):
print("IN GET /api/tags")
expose_variants = bool(current_app.config.get("EXPOSE_REASONING_MODELS"))
- model_ids = [
- "gpt-5",
- "gpt-5.1",
- "gpt-5.2",
- "gpt-5-codex",
- "gpt-5.2-codex",
- "gpt-5.1-codex",
- "gpt-5.1-codex-max",
- "gpt-5.1-codex-mini",
- "codex-mini",
+ _MODEL_DEFS = [
+ ("gpt-5", 128000, 128000, ["completion", "tools", "vision"]),
+ ("gpt-5.1", 128000, 64000, ["completion", "tools", "vision"]),
+ ("gpt-5.2", 128000, 64000, ["completion", "tools", "vision"]),
+ ("gpt-5.3", 128000, 64000, ["completion", "tools", "vision"]),
+ ("gpt-5-codex", 128000, 128000, ["completion", "tools", "vision"]),
+ ("gpt-5.2-codex", 272000, 128000, ["completion", "tools", "vision"]),
+ ("gpt-5.3-codex", 272000, 128000, ["completion", "tools", "vision"]),
+ ("gpt-5.1-codex", 128000, 128000, ["completion", "tools", "vision"]),
+ ("gpt-5.1-codex-max", 128000, 128000, ["completion", "tools", "vision"]),
+ ("gpt-5.1-codex-mini", 128000, 128000, ["completion", "tools", "vision"]),
+ ("codex-mini", 128000, 128000, ["completion", "tools"]),
]
+ model_ids = []
+ for base, _ctx_in, _ctx_out, _caps in _MODEL_DEFS:
+ model_ids.append(base)
if expose_variants:
model_ids.extend(
[
@@ -118,6 +142,10 @@ def ollama_tags() -> Response:
"gpt-5.2-high",
"gpt-5.2-medium",
"gpt-5.2-low",
+ "gpt-5.3-xhigh",
+ "gpt-5.3-high",
+ "gpt-5.3-medium",
+ "gpt-5.3-low",
"gpt-5-codex-high",
"gpt-5-codex-medium",
"gpt-5-codex-low",
@@ -125,6 +153,10 @@ def ollama_tags() -> Response:
"gpt-5.2-codex-high",
"gpt-5.2-codex-medium",
"gpt-5.2-codex-low",
+ "gpt-5.3-codex-xhigh",
+ "gpt-5.3-codex-high",
+ "gpt-5.3-codex-medium",
+ "gpt-5.3-codex-low",
"gpt-5.1-codex-high",
"gpt-5.1-codex-medium",
"gpt-5.1-codex-low",
@@ -134,8 +166,27 @@ def ollama_tags() -> Response:
"gpt-5.1-codex-max-low",
]
)
+ # Build lookup for context/capabilities from _MODEL_DEFS
+ _model_info_map = {
+ base: (ctx_in, ctx_out, caps) for base, ctx_in, ctx_out, caps in _MODEL_DEFS
+ }
models = []
for model_id in model_ids:
+ # Find matching base model for context/caps lookup
+ info = _model_info_map.get(model_id)
+ if not info:
+            # Prefer the longest matching base so a variant like
+            # "gpt-5.2-codex-high" resolves to "gpt-5.2-codex", not "gpt-5".
+            for base, ctx_in, ctx_out, caps in sorted(
+                _MODEL_DEFS, key=lambda d: len(d[0]), reverse=True
+            ):
+                if model_id.startswith(base):
+                    info = (ctx_in, ctx_out, caps)
+                    break
+ ctx_in, ctx_out, caps = (
+ info if info else (128000, 128000, ["completion", "tools", "vision"])
+ )
+ # Compute input/output token splits the same way the
+ # Copilot extension does: maxOutput = min(4096, ctx/2),
+ # maxInput = ctx - maxOutput.
+ max_output = ctx_in // 2 if ctx_in < 4096 else 4096
+ max_input = ctx_in - max_output
models.append(
{
"name": model_id,
@@ -151,6 +202,21 @@ def ollama_tags() -> Response:
"parameter_size": "8.0B",
"quantization_level": "Q4_0",
},
+ "capabilities": caps,
+ "model_info": {
+ "general.architecture": "llama",
+ "llama.context_length": ctx_in,
+ },
+ # -----------------------------------------------------------
+ # Extra fields read by the VS Code Copilot extension's
+ # Ollama provider (Out → WQ path). Without these the
+ # Language-Models panel shows empty Context Size /
+ # Capabilities columns.
+ # -----------------------------------------------------------
+ "maxInputTokens": max_input,
+ "maxOutputTokens": max_output,
+ "toolCalling": "tools" in caps,
+ "vision": "vision" in caps,
}
)
payload = {"models": models}
@@ -172,7 +238,9 @@ def ollama_show() -> Response:
except Exception:
pass
try:
- payload = json.loads(raw_body) if raw_body else (request.get_json(silent=True) or {})
+ payload = (
+ json.loads(raw_body) if raw_body else (request.get_json(silent=True) or {})
+ )
except Exception:
payload = request.get_json(silent=True) or {}
model = payload.get("model")
@@ -181,9 +249,32 @@ def ollama_show() -> Response:
if verbose:
_log_json("OUT POST /api/show", err)
return jsonify(err), 400
+
+ # Model-specific context sizes
+ _SHOW_DEFS = {
+ "gpt-5": (128000, ["completion", "tools", "vision", "thinking"]),
+ "gpt-5.1": (128000, ["completion", "tools", "vision", "thinking"]),
+ "gpt-5.2": (128000, ["completion", "tools", "vision", "thinking"]),
+ "gpt-5.3": (128000, ["completion", "tools", "vision", "thinking"]),
+ "gpt-5-codex": (128000, ["completion", "tools", "vision", "thinking"]),
+ "gpt-5.2-codex": (272000, ["completion", "tools", "vision", "thinking"]),
+ "gpt-5.3-codex": (272000, ["completion", "tools", "vision", "thinking"]),
+ "gpt-5.1-codex": (128000, ["completion", "tools", "vision", "thinking"]),
+ "gpt-5.1-codex-max": (128000, ["completion", "tools", "vision", "thinking"]),
+ "gpt-5.1-codex-mini": (128000, ["completion", "tools", "vision", "thinking"]),
+ "codex-mini": (128000, ["completion", "tools"]),
+ }
+ ctx_len = 128000
+ caps = ["completion", "tools", "vision", "thinking"]
+    # Match the longest base first so e.g. "gpt-5.2-codex" does not
+    # fall through to the shorter "gpt-5" entry.
+    for base, (cl, c) in sorted(
+        _SHOW_DEFS.items(), key=lambda kv: len(kv[0]), reverse=True
+    ):
+        if model.strip().lower().startswith(base):
+            ctx_len = cl
+            caps = c
+            break
+
v1_show_response = {
- "modelfile": "# Modelfile generated by \"ollama show\"\n# To build a new Modelfile based on this one, replace the FROM line with:\n# FROM llava:latest\n\nFROM /models/blobs/sha256:placeholder\nTEMPLATE \"\"\"{{ .System }}\nUSER: {{ .Prompt }}\nASSISTANT: \"\"\"\nPARAMETER num_ctx 100000\nPARAMETER stop \"\"\nPARAMETER stop \"USER:\"\nPARAMETER stop \"ASSISTANT:\"",
- "parameters": "num_keep 24\nstop \"<|start_header_id|>\"\nstop \"<|end_header_id|>\"\nstop \"<|eot_id|>\"",
+ "modelfile": '# Modelfile generated by "ollama show"\n# To build a new Modelfile based on this one, replace the FROM line with:\n# FROM llava:latest\n\nFROM /models/blobs/sha256:placeholder\nTEMPLATE """{{ .System }}\nUSER: {{ .Prompt }}\nASSISTANT: """\nPARAMETER num_ctx 100000\nPARAMETER stop ""\nPARAMETER stop "USER:"\nPARAMETER stop "ASSISTANT:"',
+ "parameters": 'num_keep 24\nstop "<|start_header_id|>"\nstop "<|end_header_id|>"\nstop "<|eot_id|>"',
"template": "{{ if .System }}<|start_header_id|>system<|end_header_id|>\n\n{{ .System }}<|eot_id|>{{ end }}{{ if .Prompt }}<|start_header_id|>user<|end_header_id|>\n\n{{ .Prompt }}<|eot_id|>{{ end }}<|start_header_id|>assistant<|end_header_id|>\n\n{{ .Response }}<|eot_id|>",
"details": {
"parent_model": "",
@@ -196,9 +287,9 @@ def ollama_show() -> Response:
"model_info": {
"general.architecture": "llama",
"general.file_type": 2,
- "llama.context_length": 2000000,
+ "llama.context_length": ctx_len,
},
- "capabilities": ["completion", "vision", "tools", "thinking"],
+ "capabilities": caps,
}
if verbose:
_log_json("OUT POST /api/show", v1_show_response)
@@ -229,14 +320,32 @@ def ollama_chat() -> Response:
model = payload.get("model")
raw_messages = payload.get("messages")
messages = convert_ollama_messages(
- raw_messages, payload.get("images") if isinstance(payload.get("images"), list) else None
+ raw_messages,
+ payload.get("images") if isinstance(payload.get("images"), list) else None,
)
+ # Extract the client's system message (if any) for use as the
+ # Responses API ``instructions`` parameter. When a client such as
+ # VS Code Copilot sends its own system prompt it already describes
+ # the available tools and constraints. Injecting the default
+ # Codex-CLI instructions on top conflicts and causes the model to
+ # output raw text simulating tool calls instead of real ones.
+ client_system_instructions: str | None = None
if isinstance(messages, list):
- sys_idx = next((i for i, m in enumerate(messages) if isinstance(m, dict) and m.get("role") == "system"), None)
+ sys_idx = next(
+ (
+ i
+ for i, m in enumerate(messages)
+ if isinstance(m, dict) and m.get("role") == "system"
+ ),
+ None,
+ )
if isinstance(sys_idx, int):
sys_msg = messages.pop(sys_idx)
content = sys_msg.get("content") if isinstance(sys_msg, dict) else ""
- messages.insert(0, {"role": "user", "content": content})
+ if isinstance(content, str) and content.strip():
+ client_system_instructions = content
+ else:
+ messages.insert(0, {"role": "user", "content": content})
stream_req = payload.get("stream")
if stream_req is None:
stream_req = True
@@ -249,13 +358,19 @@ def ollama_chat() -> Response:
# Passthrough Responses API tools (web_search) via ChatMock extension fields
extra_tools: List[Dict[str, Any]] = []
had_responses_tools = False
- rt_payload = payload.get("responses_tools") if isinstance(payload.get("responses_tools"), list) else []
+ rt_payload = (
+ payload.get("responses_tools")
+ if isinstance(payload.get("responses_tools"), list)
+ else []
+ )
if isinstance(rt_payload, list):
for _t in rt_payload:
if not (isinstance(_t, dict) and isinstance(_t.get("type"), str)):
continue
if _t.get("type") not in ("web_search", "web_search_preview"):
- err = {"error": "Only web_search/web_search_preview are supported in responses_tools"}
+ err = {
+ "error": "Only web_search/web_search_preview are supported in responses_tools"
+ }
if verbose:
_log_json("OUT POST /api/chat", err)
return jsonify(err), 400
@@ -266,6 +381,7 @@ def ollama_chat() -> Response:
extra_tools = [{"type": "web_search"}]
if extra_tools:
import json as _json
+
MAX_TOOLS_BYTES = 32768
try:
size = len(_json.dumps(extra_tools))
@@ -291,12 +407,29 @@ def ollama_chat() -> Response:
input_items = convert_chat_messages_to_responses_input(messages)
+ if not tools_responses:
+ tools_responses = derive_copilot_tools_dynamically(
+ messages,
+ client_system_instructions,
+ payload.get("input"),
+ )
+ if verbose and tools_responses:
+ print(
+ f"[Ollama fallback] Derived {len(tools_responses)} tool schemas "
+ "from prompt/history (no tools in request body)"
+ )
+
model_reasoning = extract_reasoning_from_model_name(model)
normalized_model = normalize_model_name(model)
+ effective_instructions = (
+ client_system_instructions
+ if client_system_instructions
+ else _instructions_for_model(normalized_model)
+ )
upstream, error_resp = start_upstream_request(
normalized_model,
input_items,
- instructions=_instructions_for_model(normalized_model),
+ instructions=effective_instructions,
tools=tools_responses,
tool_choice=tool_choice,
parallel_tool_calls=parallel_tool_calls,
@@ -325,13 +458,21 @@ def ollama_chat() -> Response:
if upstream.status_code >= 400:
try:
- err_body = json.loads(upstream.content.decode("utf-8", errors="ignore")) if upstream.content else {"raw": upstream.text}
+ err_body = (
+ json.loads(upstream.content.decode("utf-8", errors="ignore"))
+ if upstream.content
+ else {"raw": upstream.text}
+ )
except Exception:
err_body = {"raw": upstream.text}
if had_responses_tools:
if verbose:
- print("[Passthrough] Upstream rejected tools; retrying without extras (args redacted)")
- base_tools_only = convert_tools_chat_to_responses(normalize_ollama_tools(tools_req))
+ print(
+ "[Passthrough] Upstream rejected tools; retrying without extras (args redacted)"
+ )
+ base_tools_only = convert_tools_chat_to_responses(
+ normalize_ollama_tools(tools_req)
+ )
safe_choice = payload.get("tool_choice", "auto")
upstream2, err2 = start_upstream_request(
normalize_model_name(model),
@@ -351,14 +492,34 @@ def ollama_chat() -> Response:
if err2 is None and upstream2 is not None and upstream2.status_code < 400:
upstream = upstream2
else:
- err = {"error": {"message": (err_body.get("error", {}) or {}).get("message", "Upstream error"), "code": "RESPONSES_TOOLS_REJECTED"}}
+ err = {
+ "error": {
+ "message": (err_body.get("error", {}) or {}).get(
+ "message", "Upstream error"
+ ),
+ "code": "RESPONSES_TOOLS_REJECTED",
+ }
+ }
if verbose:
_log_json("OUT POST /api/chat", err)
- return jsonify(err), (upstream2.status_code if upstream2 is not None else upstream.status_code)
+ return jsonify(err), (
+ upstream2.status_code
+ if upstream2 is not None
+ else upstream.status_code
+ )
else:
if verbose:
- print("/api/chat upstream error status=", upstream.status_code, " body:", json.dumps(err_body)[:2000])
- err = {"error": (err_body.get("error", {}) or {}).get("message", "Upstream error")}
+ print(
+ "/api/chat upstream error status=",
+ upstream.status_code,
+ " body:",
+ json.dumps(err_body)[:2000],
+ )
+ err = {
+ "error": (err_body.get("error", {}) or {}).get(
+ "message", "Upstream error"
+ )
+ }
if verbose:
_log_json("OUT POST /api/chat", err)
return jsonify(err), upstream.status_code
@@ -367,21 +528,34 @@ def ollama_chat() -> Response:
model_out = model if isinstance(model, str) and model.strip() else normalized_model
if stream_req:
+
def _gen():
- compat = (current_app.config.get("REASONING_COMPAT", "think-tags") or "think-tags").strip().lower()
+ compat = (
+ (
+ current_app.config.get("REASONING_COMPAT", "think-tags")
+ or "think-tags"
+ )
+ .strip()
+ .lower()
+ )
think_open = False
think_closed = False
saw_any_summary = False
pending_summary_paragraph = False
full_parts: List[str] = []
+ tool_calls: List[Dict[str, Any]] = []
try:
for raw_line in upstream.iter_lines(decode_unicode=False):
if not raw_line:
continue
- line = raw_line.decode("utf-8", errors="ignore") if isinstance(raw_line, (bytes, bytearray)) else raw_line
+ line = (
+ raw_line.decode("utf-8", errors="ignore")
+ if isinstance(raw_line, (bytes, bytearray))
+ else raw_line
+ )
if not line.startswith("data: "):
continue
- data = line[len("data: "):].strip()
+ data = line[len("data: ") :].strip()
if not data:
continue
if data == "[DONE]":
@@ -397,16 +571,25 @@ def _gen():
pending_summary_paragraph = True
else:
saw_any_summary = True
- elif kind in ("response.reasoning_summary_text.delta", "response.reasoning_text.delta"):
+ elif kind in (
+ "response.reasoning_summary_text.delta",
+ "response.reasoning_text.delta",
+ ):
delta_txt = evt.get("delta") or ""
if compat == "o3":
- if kind == "response.reasoning_summary_text.delta" and pending_summary_paragraph:
+ if (
+ kind == "response.reasoning_summary_text.delta"
+ and pending_summary_paragraph
+ ):
yield (
json.dumps(
{
"model": model_out,
"created_at": created_at,
- "message": {"role": "assistant", "content": "\n"},
+ "message": {
+ "role": "assistant",
+ "content": "\n",
+ },
"done": False,
}
)
@@ -420,7 +603,10 @@ def _gen():
{
"model": model_out,
"created_at": created_at,
- "message": {"role": "assistant", "content": delta_txt},
+ "message": {
+ "role": "assistant",
+ "content": delta_txt,
+ },
"done": False,
}
)
@@ -434,7 +620,10 @@ def _gen():
{
"model": model_out,
"created_at": created_at,
- "message": {"role": "assistant", "content": ""},
+ "message": {
+ "role": "assistant",
+ "content": "",
+ },
"done": False,
}
)
@@ -443,13 +632,19 @@ def _gen():
full_parts.append("")
think_open = True
if think_open and not think_closed:
- if kind == "response.reasoning_summary_text.delta" and pending_summary_paragraph:
+ if (
+ kind == "response.reasoning_summary_text.delta"
+ and pending_summary_paragraph
+ ):
yield (
json.dumps(
{
"model": model_out,
"created_at": created_at,
- "message": {"role": "assistant", "content": "\n"},
+ "message": {
+ "role": "assistant",
+ "content": "\n",
+ },
"done": False,
}
)
@@ -463,7 +658,10 @@ def _gen():
{
"model": model_out,
"created_at": created_at,
- "message": {"role": "assistant", "content": delta_txt},
+ "message": {
+ "role": "assistant",
+ "content": delta_txt,
+ },
"done": False,
}
)
@@ -480,7 +678,10 @@ def _gen():
{
"model": model_out,
"created_at": created_at,
- "message": {"role": "assistant", "content": ""},
+ "message": {
+ "role": "assistant",
+ "content": "",
+ },
"done": False,
}
)
@@ -495,13 +696,43 @@ def _gen():
{
"model": model_out,
"created_at": created_at,
- "message": {"role": "assistant", "content": delta},
+ "message": {
+ "role": "assistant",
+ "content": delta,
+ },
"done": False,
}
)
+ "\n"
)
full_parts.append(delta)
+ elif kind == "response.output_item.done":
+ item = evt.get("item") or {}
+ if (
+ isinstance(item, dict)
+ and item.get("type") == "function_call"
+ ):
+ call_id = item.get("call_id") or item.get("id") or ""
+ name = item.get("name") or ""
+ args_raw = item.get("arguments") or ""
+ if isinstance(args_raw, str):
+ try:
+ args_obj = json.loads(args_raw) if args_raw else {}
+ except (json.JSONDecodeError, ValueError):
+ args_obj = {"raw": args_raw}
+ elif isinstance(args_raw, dict):
+ args_obj = args_raw
+ else:
+ args_obj = {}
+ if isinstance(call_id, str) and isinstance(name, str):
+ tool_calls.append(
+ {
+ "function": {
+ "name": name,
+ "arguments": args_obj,
+ },
+ }
+ )
elif kind == "response.completed":
break
finally:
@@ -519,14 +750,19 @@ def _gen():
+ "\n"
)
full_parts.append("")
+ done_msg: Dict[str, Any] = {"role": "assistant", "content": ""}
+ if tool_calls:
+ done_msg["tool_calls"] = tool_calls
done_obj = {
"model": model_out,
"created_at": created_at,
- "message": {"role": "assistant", "content": ""},
+ "message": done_msg,
"done": True,
+ "done_reason": "stop",
}
done_obj.update(_OLLAMA_FAKE_EVAL)
yield json.dumps(done_obj) + "\n"
+
if verbose:
print("OUT POST /api/chat (streaming response)")
stream_iter = stream_with_context(_gen())
@@ -548,10 +784,14 @@ def _gen():
for raw in upstream.iter_lines(decode_unicode=False):
if not raw:
continue
- line = raw.decode("utf-8", errors="ignore") if isinstance(raw, (bytes, bytearray)) else raw
+ line = (
+ raw.decode("utf-8", errors="ignore")
+ if isinstance(raw, (bytes, bytearray))
+ else raw
+ )
if not line.startswith("data: "):
continue
- data = line[len("data: "):].strip()
+ data = line[len("data: ") :].strip()
if not data:
continue
if data == "[DONE]":
@@ -573,7 +813,11 @@ def _gen():
call_id = item.get("call_id") or item.get("id") or ""
name = item.get("name") or ""
args = item.get("arguments") or ""
- if isinstance(call_id, str) and isinstance(name, str) and isinstance(args, str):
+ if (
+ isinstance(call_id, str)
+ and isinstance(name, str)
+ and isinstance(args, str)
+ ):
tool_calls.append(
{
"id": call_id,
@@ -586,7 +830,9 @@ def _gen():
finally:
upstream.close()
- if (current_app.config.get("REASONING_COMPAT", "think-tags") or "think-tags").strip().lower() == "think-tags":
+ if (
+ current_app.config.get("REASONING_COMPAT", "think-tags") or "think-tags"
+ ).strip().lower() == "think-tags":
rtxt_parts = []
if isinstance(reasoning_summary_text, str) and reasoning_summary_text.strip():
rtxt_parts.append(reasoning_summary_text)
@@ -599,7 +845,11 @@ def _gen():
out_json = {
"model": normalize_model_name(model),
"created_at": created_at,
- "message": {"role": "assistant", "content": full_text, **({"tool_calls": tool_calls} if tool_calls else {})},
+ "message": {
+ "role": "assistant",
+ "content": full_text,
+ **({"tool_calls": tool_calls} if tool_calls else {}),
+ },
"done": True,
"done_reason": "stop",
}
diff --git a/chatmock/routes_openai.py b/chatmock/routes_openai.py
index c7a2c94..9d8df0c 100644
--- a/chatmock/routes_openai.py
+++ b/chatmock/routes_openai.py
@@ -19,6 +19,7 @@
from .utils import (
convert_chat_messages_to_responses_input,
convert_tools_chat_to_responses,
+ derive_copilot_tools_dynamically,
sse_translate_chat,
sse_translate_text,
)
@@ -59,14 +60,22 @@ def _gen():
def _instructions_for_model(model: str) -> str:
base = current_app.config.get("BASE_INSTRUCTIONS", BASE_INSTRUCTIONS)
- if model.startswith("gpt-5-codex") or model.startswith("gpt-5.1-codex") or model.startswith("gpt-5.2-codex"):
- codex = current_app.config.get("GPT5_CODEX_INSTRUCTIONS") or GPT5_CODEX_INSTRUCTIONS
+ if (
+ model.startswith("gpt-5-codex")
+ or model.startswith("gpt-5.1-codex")
+ or model.startswith("gpt-5.2-codex")
+ or model.startswith("gpt-5.3-codex")
+ ):
+ codex = (
+ current_app.config.get("GPT5_CODEX_INSTRUCTIONS") or GPT5_CODEX_INSTRUCTIONS
+ )
if isinstance(codex, str) and codex.strip():
return codex
return base
@openai_bp.route("/v1/chat/completions", methods=["POST"])
+@openai_bp.route("/chat/completions", methods=["POST"])
def chat_completions() -> Response:
verbose = bool(current_app.config.get("VERBOSE"))
verbose_obfuscation = bool(current_app.config.get("VERBOSE_OBFUSCATION"))
@@ -75,6 +84,12 @@ def chat_completions() -> Response:
reasoning_compat = current_app.config.get("REASONING_COMPAT", "think-tags")
debug_model = current_app.config.get("DEBUG_MODEL")
+ # Detect requests coming from the Copilot Ollama provider (non-/v1
+ # path and/or Responses-API-style "input" list) and switch to the
+    # copilot reasoning compat mode so that <think> tags are not
+ # included as raw content.
+ is_copilot_ollama = request.path == "/chat/completions"
+
raw = request.get_data(cache=True, as_text=True) or ""
if verbose:
try:
@@ -107,20 +122,48 @@ def chat_completions() -> Response:
_log_json("OUT POST /v1/chat/completions", err)
return jsonify(err), 400
+ # Extract the client's system message (if any) for use as the
+ # Responses API ``instructions`` parameter. When a client such as
+ # VS Code Copilot sends its own system prompt it already describes
+ # the available tools, personality, and constraints. Injecting the
+ # default Codex-CLI instructions on top of that would conflict (the
+ # model would see two contradictory tool schemas) and cause the
+ # model to generate raw text that *simulates* tool calls instead
+ # of producing real function-call output items.
+ client_system_instructions: str | None = None
if isinstance(messages, list):
- sys_idx = next((i for i, m in enumerate(messages) if isinstance(m, dict) and m.get("role") == "system"), None)
+ sys_idx = next(
+ (
+ i
+ for i, m in enumerate(messages)
+ if isinstance(m, dict) and m.get("role") == "system"
+ ),
+ None,
+ )
if isinstance(sys_idx, int):
sys_msg = messages.pop(sys_idx)
content = sys_msg.get("content") if isinstance(sys_msg, dict) else ""
- messages.insert(0, {"role": "user", "content": content})
+ if isinstance(content, str) and content.strip():
+ client_system_instructions = content
+ else:
+ messages.insert(0, {"role": "user", "content": content})
is_stream = bool(payload.get("stream"))
- stream_options = payload.get("stream_options") if isinstance(payload.get("stream_options"), dict) else {}
+ stream_options = (
+ payload.get("stream_options")
+ if isinstance(payload.get("stream_options"), dict)
+ else {}
+ )
include_usage = bool(stream_options.get("include_usage", False))
tools_responses = convert_tools_chat_to_responses(payload.get("tools"))
+
tool_choice = payload.get("tool_choice", "auto")
parallel_tool_calls = bool(payload.get("parallel_tool_calls", False))
- responses_tools_payload = payload.get("responses_tools") if isinstance(payload.get("responses_tools"), list) else []
+ responses_tools_payload = (
+ payload.get("responses_tools")
+ if isinstance(payload.get("responses_tools"), list)
+ else []
+ )
extra_tools: List[Dict[str, Any]] = []
had_responses_tools = False
if isinstance(responses_tools_payload, list):
@@ -141,18 +184,27 @@ def chat_completions() -> Response:
if not extra_tools and bool(current_app.config.get("DEFAULT_WEB_SEARCH")):
responses_tool_choice = payload.get("responses_tool_choice")
- if not (isinstance(responses_tool_choice, str) and responses_tool_choice == "none"):
+ if not (
+ isinstance(responses_tool_choice, str)
+ and responses_tool_choice == "none"
+ ):
extra_tools = [{"type": "web_search"}]
if extra_tools:
import json as _json
+
MAX_TOOLS_BYTES = 32768
try:
size = len(_json.dumps(extra_tools))
except Exception:
size = 0
if size > MAX_TOOLS_BYTES:
- err = {"error": {"message": "responses_tools too large", "code": "RESPONSES_TOOLS_TOO_LARGE"}}
+ err = {
+ "error": {
+ "message": "responses_tools too large",
+ "code": "RESPONSES_TOOLS_TOO_LARGE",
+ }
+ }
if verbose:
_log_json("OUT POST /v1/chat/completions", err)
return jsonify(err), 400
@@ -160,17 +212,55 @@ def chat_completions() -> Response:
tools_responses = (tools_responses or []) + extra_tools
responses_tool_choice = payload.get("responses_tool_choice")
- if isinstance(responses_tool_choice, str) and responses_tool_choice in ("auto", "none"):
+ if isinstance(responses_tool_choice, str) and responses_tool_choice in (
+ "auto",
+ "none",
+ ):
tool_choice = responses_tool_choice
- input_items = convert_chat_messages_to_responses_input(messages)
- if not input_items and isinstance(payload.get("prompt"), str) and payload.get("prompt").strip():
+ # If the payload already contains ``input`` as a list (Responses API
+ # format), use it directly instead of converting from chat messages.
+ raw_input = payload.get("input")
+ if not messages and isinstance(raw_input, list) and raw_input:
+ input_items = raw_input
+ else:
+ input_items = convert_chat_messages_to_responses_input(messages)
+ if (
+ not input_items
+ and isinstance(payload.get("prompt"), str)
+ and payload.get("prompt").strip()
+ ):
input_items = [
- {"type": "message", "role": "user", "content": [{"type": "input_text", "text": payload.get("prompt")}]}
+ {
+ "type": "message",
+ "role": "user",
+ "content": [{"type": "input_text", "text": payload.get("prompt")}],
+ }
]
+ # ------------------------------------------------------------------
+ # Copilot Ollama BYOK fallback (dynamic): if no structured tools are
+ # present in the request, derive schemas from the Copilot system
+ # instructions and/or historical function calls in chat/input.
+ # ------------------------------------------------------------------
+ if is_copilot_ollama and not tools_responses:
+ tools_responses = derive_copilot_tools_dynamically(
+ messages,
+ client_system_instructions,
+ raw_input,
+ )
+ if verbose and tools_responses:
+ print(
+ f"[Copilot fallback] Derived {len(tools_responses)} tool schemas "
+ "from prompt/history (no tools in request body)"
+ )
+
model_reasoning = extract_reasoning_from_model_name(requested_model)
- reasoning_overrides = payload.get("reasoning") if isinstance(payload.get("reasoning"), dict) else model_reasoning
+ reasoning_overrides = (
+ payload.get("reasoning")
+ if isinstance(payload.get("reasoning"), dict)
+ else model_reasoning
+ )
reasoning_param = build_reasoning_param(
reasoning_effort,
reasoning_summary,
@@ -178,10 +268,15 @@ def chat_completions() -> Response:
allowed_efforts=allowed_efforts_for_model(model),
)
+ effective_instructions = (
+ client_system_instructions
+ if client_system_instructions
+ else _instructions_for_model(model)
+ )
upstream, error_resp = start_upstream_request(
model,
input_items,
- instructions=_instructions_for_model(model),
+ instructions=effective_instructions,
tools=tools_responses,
tool_choice=tool_choice,
parallel_tool_calls=parallel_tool_calls,
@@ -207,12 +302,18 @@ def chat_completions() -> Response:
if upstream.status_code >= 400:
try:
raw = upstream.content
- err_body = json.loads(raw.decode("utf-8", errors="ignore")) if raw else {"raw": upstream.text}
+ err_body = (
+ json.loads(raw.decode("utf-8", errors="ignore"))
+ if raw
+ else {"raw": upstream.text}
+ )
except Exception:
err_body = {"raw": upstream.text}
if had_responses_tools:
if verbose:
- print("[Passthrough] Upstream rejected tools; retrying without extra tools (args redacted)")
+ print(
+ "[Passthrough] Upstream rejected tools; retrying without extra tools (args redacted)"
+ )
base_tools_only = convert_tools_chat_to_responses(payload.get("tools"))
safe_choice = payload.get("tool_choice", "auto")
upstream2, err2 = start_upstream_request(
@@ -230,17 +331,29 @@ def chat_completions() -> Response:
else:
err = {
"error": {
- "message": (err_body.get("error", {}) or {}).get("message", "Upstream error"),
+ "message": (err_body.get("error", {}) or {}).get(
+ "message", "Upstream error"
+ ),
"code": "RESPONSES_TOOLS_REJECTED",
}
}
if verbose:
_log_json("OUT POST /v1/chat/completions", err)
- return jsonify(err), (upstream2.status_code if upstream2 is not None else upstream.status_code)
+ return jsonify(err), (
+ upstream2.status_code
+ if upstream2 is not None
+ else upstream.status_code
+ )
else:
if verbose:
print("Upstream error status=", upstream.status_code)
- err = {"error": {"message": (err_body.get("error", {}) or {}).get("message", "Upstream error")}}
+ err = {
+ "error": {
+ "message": (err_body.get("error", {}) or {}).get(
+ "message", "Upstream error"
+ )
+ }
+ }
if verbose:
_log_json("OUT POST /v1/chat/completions", err)
return jsonify(err), upstream.status_code
@@ -248,16 +361,19 @@ def chat_completions() -> Response:
if is_stream:
if verbose:
print("OUT POST /v1/chat/completions (streaming response)")
+ effective_compat = "copilot" if is_copilot_ollama else reasoning_compat
stream_iter = sse_translate_chat(
upstream,
requested_model or model,
created,
verbose=verbose_obfuscation,
vlog=print if verbose_obfuscation else None,
- reasoning_compat=reasoning_compat,
+ reasoning_compat=effective_compat,
include_usage=include_usage,
)
- stream_iter = _wrap_stream_logging("STREAM OUT /v1/chat/completions", stream_iter, verbose)
+ stream_iter = _wrap_stream_logging(
+ "STREAM OUT /v1/chat/completions", stream_iter, verbose
+ )
resp = Response(
stream_iter,
status=upstream.status_code,
@@ -287,14 +403,19 @@ def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
return {"prompt_tokens": pt, "completion_tokens": ct, "total_tokens": tt}
except Exception:
return None
+
try:
for raw in upstream.iter_lines(decode_unicode=False):
if not raw:
continue
- line = raw.decode("utf-8", errors="ignore") if isinstance(raw, (bytes, bytearray)) else raw
+ line = (
+ raw.decode("utf-8", errors="ignore")
+ if isinstance(raw, (bytes, bytearray))
+ else raw
+ )
if not line.startswith("data: "):
continue
- data = line[len("data: "):].strip()
+ data = line[len("data: ") :].strip()
if not data:
continue
if data == "[DONE]":
@@ -307,7 +428,9 @@ def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
mu = _extract_usage(evt)
if mu:
usage_obj = mu
- if isinstance(evt.get("response"), dict) and isinstance(evt["response"].get("id"), str):
+ if isinstance(evt.get("response"), dict) and isinstance(
+ evt["response"].get("id"), str
+ ):
response_id = evt["response"].get("id") or response_id
if kind == "response.output_text.delta":
full_text += evt.get("delta") or ""
@@ -321,7 +444,11 @@ def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
call_id = item.get("call_id") or item.get("id") or ""
name = item.get("name") or ""
args = item.get("arguments") or ""
- if isinstance(call_id, str) and isinstance(name, str) and isinstance(args, str):
+ if (
+ isinstance(call_id, str)
+ and isinstance(name, str)
+ and isinstance(args, str)
+ ):
tool_calls.append(
{
"id": call_id,
@@ -330,7 +457,11 @@ def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
}
)
elif kind == "response.failed":
- error_message = evt.get("response", {}).get("error", {}).get("message", "response.failed")
+ error_message = (
+ evt.get("response", {})
+ .get("error", {})
+ .get("message", "response.failed")
+ )
elif kind == "response.completed":
break
finally:
@@ -342,10 +473,16 @@ def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
resp.headers.setdefault(k, v)
return resp
- message: Dict[str, Any] = {"role": "assistant", "content": full_text if full_text else None}
+ message: Dict[str, Any] = {
+ "role": "assistant",
+ "content": full_text if full_text else None,
+ }
if tool_calls:
message["tool_calls"] = tool_calls
- message = apply_reasoning_to_message(message, reasoning_summary_text, reasoning_full_text, reasoning_compat)
+ effective_compat_ns = "copilot" if is_copilot_ollama else reasoning_compat
+ message = apply_reasoning_to_message(
+ message, reasoning_summary_text, reasoning_full_text, effective_compat_ns
+ )
completion = {
"id": response_id or "chatcmpl",
"object": "chat.completion",
@@ -369,6 +506,7 @@ def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
@openai_bp.route("/v1/completions", methods=["POST"])
+@openai_bp.route("/completions", methods=["POST"])
def completions() -> Response:
verbose = bool(current_app.config.get("VERBOSE"))
verbose_obfuscation = bool(current_app.config.get("VERBOSE_OBFUSCATION"))
@@ -398,14 +536,22 @@ def completions() -> Response:
if not isinstance(prompt, str):
prompt = payload.get("suffix") or ""
stream_req = bool(payload.get("stream", False))
- stream_options = payload.get("stream_options") if isinstance(payload.get("stream_options"), dict) else {}
+ stream_options = (
+ payload.get("stream_options")
+ if isinstance(payload.get("stream_options"), dict)
+ else {}
+ )
include_usage = bool(stream_options.get("include_usage", False))
messages = [{"role": "user", "content": prompt or ""}]
input_items = convert_chat_messages_to_responses_input(messages)
model_reasoning = extract_reasoning_from_model_name(requested_model)
- reasoning_overrides = payload.get("reasoning") if isinstance(payload.get("reasoning"), dict) else model_reasoning
+ reasoning_overrides = (
+ payload.get("reasoning")
+ if isinstance(payload.get("reasoning"), dict)
+ else model_reasoning
+ )
reasoning_param = build_reasoning_param(
reasoning_effort,
reasoning_summary,
@@ -437,10 +583,20 @@ def completions() -> Response:
created = int(time.time())
if upstream.status_code >= 400:
try:
- err_body = json.loads(upstream.content.decode("utf-8", errors="ignore")) if upstream.content else {"raw": upstream.text}
+ err_body = (
+ json.loads(upstream.content.decode("utf-8", errors="ignore"))
+ if upstream.content
+ else {"raw": upstream.text}
+ )
except Exception:
err_body = {"raw": upstream.text}
- err = {"error": {"message": (err_body.get("error", {}) or {}).get("message", "Upstream error")}}
+ err = {
+ "error": {
+ "message": (err_body.get("error", {}) or {}).get(
+ "message", "Upstream error"
+ )
+ }
+ }
if verbose:
_log_json("OUT POST /v1/completions", err)
return jsonify(err), upstream.status_code
@@ -456,7 +612,9 @@ def completions() -> Response:
vlog=(print if verbose_obfuscation else None),
include_usage=include_usage,
)
- stream_iter = _wrap_stream_logging("STREAM OUT /v1/completions", stream_iter, verbose)
+ stream_iter = _wrap_stream_logging(
+ "STREAM OUT /v1/completions", stream_iter, verbose
+ )
resp = Response(
stream_iter,
status=upstream.status_code,
@@ -470,6 +628,7 @@ def completions() -> Response:
full_text = ""
response_id = "cmpl"
usage_obj: Dict[str, int] | None = None
+
def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
try:
usage = (evt.get("response") or {}).get("usage")
@@ -481,14 +640,19 @@ def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
return {"prompt_tokens": pt, "completion_tokens": ct, "total_tokens": tt}
except Exception:
return None
+
try:
for raw_line in upstream.iter_lines(decode_unicode=False):
if not raw_line:
continue
- line = raw_line.decode("utf-8", errors="ignore") if isinstance(raw_line, (bytes, bytearray)) else raw_line
+ line = (
+ raw_line.decode("utf-8", errors="ignore")
+ if isinstance(raw_line, (bytes, bytearray))
+ else raw_line
+ )
if not line.startswith("data: "):
continue
- data = line[len("data: "):].strip()
+ data = line[len("data: ") :].strip()
if not data or data == "[DONE]":
if data == "[DONE]":
break
@@ -497,7 +661,9 @@ def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
evt = json.loads(data)
except Exception:
continue
- if isinstance(evt.get("response"), dict) and isinstance(evt["response"].get("id"), str):
+ if isinstance(evt.get("response"), dict) and isinstance(
+ evt["response"].get("id"), str
+ ):
response_id = evt["response"].get("id") or response_id
mu = _extract_usage(evt)
if mu:
@@ -529,14 +695,17 @@ def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
@openai_bp.route("/v1/models", methods=["GET"])
+@openai_bp.route("/models", methods=["GET"])
def list_models() -> Response:
expose_variants = bool(current_app.config.get("EXPOSE_REASONING_MODELS"))
model_groups = [
("gpt-5", ["high", "medium", "low", "minimal"]),
("gpt-5.1", ["high", "medium", "low"]),
("gpt-5.2", ["xhigh", "high", "medium", "low"]),
+ ("gpt-5.3", ["xhigh", "high", "medium", "low"]),
("gpt-5-codex", ["high", "medium", "low"]),
("gpt-5.2-codex", ["xhigh", "high", "medium", "low"]),
+ ("gpt-5.3-codex", ["xhigh", "high", "medium", "low"]),
("gpt-5.1-codex", ["high", "medium", "low"]),
("gpt-5.1-codex-max", ["xhigh", "high", "medium", "low"]),
("gpt-5.1-codex-mini", []),
diff --git a/chatmock/upstream.py b/chatmock/upstream.py
index 4803954..dbe6609 100644
--- a/chatmock/upstream.py
+++ b/chatmock/upstream.py
@@ -48,6 +48,12 @@ def normalize_model_name(name: str | None, debug_model: str | None = None) -> st
"gpt5.2-codex": "gpt-5.2-codex",
"gpt-5.2-codex": "gpt-5.2-codex",
"gpt-5.2-codex-latest": "gpt-5.2-codex",
+ "gpt5.3": "gpt-5.3",
+ "gpt-5.3": "gpt-5.3",
+ "gpt-5.3-latest": "gpt-5.3",
+ "gpt5.3-codex": "gpt-5.3-codex",
+ "gpt-5.3-codex": "gpt-5.3-codex",
+ "gpt-5.3-codex-latest": "gpt-5.3-codex",
"gpt5-codex": "gpt-5-codex",
"gpt-5-codex": "gpt-5-codex",
"gpt-5-codex-latest": "gpt-5-codex",
@@ -57,6 +63,22 @@ def normalize_model_name(name: str | None, debug_model: str | None = None) -> st
"codex-mini": "codex-mini-latest",
"codex-mini-latest": "codex-mini-latest",
"gpt-5.1-codex-mini": "gpt-5.1-codex-mini",
+ # Workaround: the Copilot Chat extension's Ollama provider has a bug
+ # where it passes the raw /api/tags array to Object.entries(), causing
+ # numeric array indices to be used as model IDs instead of model names.
+ # Map these indices back to the correct model names based on the order
+ # in routes_ollama.py's /api/tags response.
+ "0": "gpt-5",
+ "1": "gpt-5.1",
+ "2": "gpt-5.2",
+ "3": "gpt-5.3",
+ "4": "gpt-5-codex",
+ "5": "gpt-5.2-codex",
+ "6": "gpt-5.3-codex",
+ "7": "gpt-5.1-codex",
+ "8": "gpt-5.1-codex-max",
+ "9": "gpt-5.1-codex-mini",
+ "10": "codex-mini-latest",
}
return mapping.get(base, base)
@@ -104,10 +126,18 @@ def start_upstream_request(
responses_payload = {
"model": model,
- "instructions": instructions if isinstance(instructions, str) and instructions.strip() else instructions,
+ "instructions": (
+ instructions
+ if isinstance(instructions, str) and instructions.strip()
+ else instructions
+ ),
"input": input_items,
"tools": tools or [],
- "tool_choice": tool_choice if tool_choice in ("auto", "none") or isinstance(tool_choice, dict) else "auto",
+ "tool_choice": (
+ tool_choice
+ if tool_choice in ("auto", "none") or isinstance(tool_choice, dict)
+ else "auto"
+ ),
"parallel_tool_calls": bool(parallel_tool_calls),
"store": False,
"stream": True,
@@ -145,7 +175,10 @@ def start_upstream_request(
timeout=600,
)
except requests.RequestException as e:
- resp = make_response(jsonify({"error": {"message": f"Upstream ChatGPT request failed: {e}"}}), 502)
+ resp = make_response(
+ jsonify({"error": {"message": f"Upstream ChatGPT request failed: {e}"}}),
+ 502,
+ )
for k, v in build_cors_headers().items():
resp.headers.setdefault(k, v)
return None, resp
diff --git a/chatmock/utils.py b/chatmock/utils.py
index 79703a5..651bffa 100644
--- a/chatmock/utils.py
+++ b/chatmock/utils.py
@@ -5,6 +5,7 @@
import hashlib
import json
import os
+import re
import secrets
import sys
from typing import Any, Dict, List, Optional, Tuple
@@ -85,7 +86,9 @@ def generate_pkce() -> "PkceCodes":
return PkceCodes(code_verifier=code_verifier, code_challenge=code_challenge)
-def convert_chat_messages_to_responses_input(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+def convert_chat_messages_to_responses_input(
+ messages: List[Dict[str, Any]],
+) -> List[Dict[str, Any]]:
def _normalize_image_data_url(url: str) -> str:
try:
if not isinstance(url, str):
@@ -152,7 +155,11 @@ def _normalize_image_data_url(url: str) -> str:
fn = tc.get("function") if isinstance(tc.get("function"), dict) else {}
name = fn.get("name") if isinstance(fn, dict) else None
args = fn.get("arguments") if isinstance(fn, dict) else None
- if isinstance(call_id, str) and isinstance(name, str) and isinstance(args, str):
+ if (
+ isinstance(call_id, str)
+ and isinstance(name, str)
+ and isinstance(args, str)
+ ):
input_items.append(
{
"type": "function_call",
@@ -178,7 +185,12 @@ def _normalize_image_data_url(url: str) -> str:
image = part.get("image_url")
url = image.get("url") if isinstance(image, dict) else image
if isinstance(url, str) and url:
- content_items.append({"type": "input_image", "image_url": _normalize_image_data_url(url)})
+ content_items.append(
+ {
+ "type": "input_image",
+ "image_url": _normalize_image_data_url(url),
+ }
+ )
elif isinstance(content, str) and content:
kind = "output_text" if role == "assistant" else "input_text"
content_items.append({"type": kind, "text": content})
@@ -186,7 +198,9 @@ def _normalize_image_data_url(url: str) -> str:
if not content_items:
continue
role_out = "assistant" if role == "assistant" else "user"
- input_items.append({"type": "message", "role": role_out, "content": content_items})
+ input_items.append(
+ {"type": "message", "role": role_out, "content": content_items}
+ )
return input_items
@@ -219,7 +233,328 @@ def convert_tools_chat_to_responses(tools: Any) -> List[Dict[str, Any]]:
return out
-def load_chatgpt_tokens(ensure_fresh: bool = True) -> tuple[str | None, str | None, str | None]:
+def extract_copilot_tools_from_system_instructions(
+ instructions: str,
+) -> List[Dict[str, Any]]:
+ """Extract Copilot tool schemas from system instructions text.
+
+ Copilot often serializes available tools using TypeScript-style blocks:
+
+ // Tool description
+ type tool_name = (_: {
+ // Property description
+ prop: string,
+ optionalProp?: number,
+ }) => any;
+
+ This parser converts those declarations into Responses-API function tools.
+ """
+ if not isinstance(instructions, str) or not instructions.strip():
+ return []
+
+ out: List[Dict[str, Any]] = []
+ seen: set[str] = set()
+
+ tool_block_re = re.compile(
+ r"(?P(?:\s*//[^\n]*\n)*)\s*type\s+"
+ r"(?P[A-Za-z_][A-Za-z0-9_-]*)\s*=\s*\(_:\s*\{"
+ r"(?P.*?)\}\)\s*=>\s*any;",
+ re.DOTALL,
+ )
+ prop_re = re.compile(
+ r"^\s*(?P[A-Za-z_][A-Za-z0-9_]*)"
+ r"(?P\?)?\s*:\s*(?P[^,]+),?\s*$"
+ )
+
+ def _json_schema(ts_type: str) -> Dict[str, Any] | None:
+ """Convert a TypeScript type annotation into a JSON Schema dict."""
+ t = (ts_type or "").strip()
+ if not t:
+ return None
+ if t in ("string",):
+ return {"type": "string"}
+ if t in ("number", "integer"):
+ return {"type": "number"}
+ if t in ("boolean",):
+ return {"type": "boolean"}
+        # Array<...> or Foo[]
+ if t.startswith("Array<") and t.endswith(">"):
+ inner = t[6:-1].strip()
+ child = _json_schema(inner)
+ return {"type": "array", "items": child or {}}
+ if t.endswith("[]"):
+ inner = t[:-2].strip()
+ child = _json_schema(inner)
+ return {"type": "array", "items": child or {}}
+ if t.startswith("{") or t.startswith("Record<"):
+ return {"type": "object"}
+ if t in ("object",):
+ return {"type": "object"}
+ return None
+
+ for match in tool_block_re.finditer(instructions):
+ name = match.group("name")
+ if not isinstance(name, str) or not name or name in seen:
+ continue
+
+ raw_comments = match.group("comments") or ""
+ description_lines = []
+ for c in raw_comments.splitlines():
+ c = c.strip()
+ if c.startswith("//"):
+ text = c[2:].strip()
+ if text:
+ description_lines.append(text)
+ tool_description = " ".join(description_lines).strip() or f"Tool {name}"
+
+ properties: Dict[str, Any] = {}
+ required: List[str] = []
+ pending_comments: List[str] = []
+
+ body = match.group("body") or ""
+ for line in body.splitlines():
+ stripped = line.strip()
+ if not stripped:
+ continue
+ if stripped.startswith("//"):
+ text = stripped[2:].strip()
+ if text:
+ pending_comments.append(text)
+ continue
+
+ prop_match = prop_re.match(line)
+ if not prop_match:
+ pending_comments = []
+ continue
+
+ key = prop_match.group("key")
+ optional = bool(prop_match.group("optional"))
+ typ = prop_match.group("typ") or ""
+ type_schema = _json_schema(typ)
+
+ schema: Dict[str, Any] = {}
+ if type_schema is not None:
+ schema.update(type_schema)
+ if pending_comments:
+ schema["description"] = " ".join(pending_comments)
+
+ properties[key] = schema
+ if not optional:
+ required.append(key)
+ pending_comments = []
+
+ out.append(
+ {
+ "type": "function",
+ "name": name,
+ "description": tool_description,
+ "strict": False,
+ "parameters": {
+ "type": "object",
+ "properties": properties,
+ "required": required,
+ "additionalProperties": True,
+ },
+ }
+ )
+ seen.add(name)
+
+ return out
+
+
+def infer_tools_from_chat_history(
+ messages: List[Dict[str, Any]],
+ input_items: List[Dict[str, Any]] | None = None,
+) -> List[Dict[str, Any]]:
+ """Infer tool schemas from historical function calls.
+
+ This is a best-effort fallback when the client omitted ``tools``.
+ It inspects prior assistant tool calls and Responses-API input
+ function_call items to recover names and argument shapes.
+ """
+ examples_by_name: Dict[str, List[Dict[str, Any]]] = {}
+
+ def _record(name: Any, args: Any) -> None:
+ if not isinstance(name, str) or not name:
+ return
+ if isinstance(args, str):
+ try:
+ args = json.loads(args)
+ except Exception:
+ return
+ if not isinstance(args, dict):
+ return
+ examples_by_name.setdefault(name, []).append(args)
+
+ if isinstance(messages, list):
+ for message in messages:
+ if not isinstance(message, dict):
+ continue
+ if message.get("role") != "assistant":
+ continue
+ tool_calls = message.get("tool_calls")
+ if not isinstance(tool_calls, list):
+ continue
+ for tc in tool_calls:
+ if not isinstance(tc, dict):
+ continue
+ fn = tc.get("function") if isinstance(tc.get("function"), dict) else {}
+ _record(fn.get("name"), fn.get("arguments"))
+
+ if isinstance(input_items, list):
+ for item in input_items:
+ if not isinstance(item, dict):
+ continue
+ if item.get("type") != "function_call":
+ continue
+ _record(item.get("name"), item.get("arguments"))
+
+ if not examples_by_name:
+ return []
+
+ def _infer_schema(value: Any) -> Dict[str, Any] | None:
+ """Infer a JSON Schema fragment from a Python value."""
+ if isinstance(value, bool):
+ return {"type": "boolean"}
+ if isinstance(value, (int, float)):
+ return {"type": "number"}
+ if isinstance(value, str):
+ return {"type": "string"}
+ if isinstance(value, list):
+ # Try to infer inner item type from first element
+ if value:
+ inner = _infer_schema(value[0])
+ return {"type": "array", "items": inner or {}}
+ return {"type": "array", "items": {}}
+ if isinstance(value, dict):
+ return {"type": "object"}
+ return None
+
+ tools: List[Dict[str, Any]] = []
+ for name, rows in examples_by_name.items():
+ if not rows:
+ continue
+
+ key_counts: Dict[str, int] = {}
+ total = len(rows)
+
+ key_schemas: Dict[str, Dict[str, Any]] = {}
+
+ for row in rows:
+ for key, value in row.items():
+ key_counts[key] = key_counts.get(key, 0) + 1
+ s = _infer_schema(value)
+ if s:
+ # Keep the first schema seen for each key; only upgrade when a
+ # later example adds array item information the stored one lacks
+ if key not in key_schemas:
+ key_schemas[key] = s
+ elif s.get("items") and not key_schemas[key].get("items"):
+ key_schemas[key] = s
+
+ properties: Dict[str, Any] = {}
+ required: List[str] = []
+ for key in sorted(key_counts.keys()):
+ schema = key_schemas.get(key, {})
+ properties[key] = schema
+ if key_counts[key] == total:
+ required.append(key)
+
+ tools.append(
+ {
+ "type": "function",
+ "name": name,
+ "description": f"Tool {name}",
+ "strict": False,
+ "parameters": {
+ "type": "object",
+ "properties": properties,
+ "required": required,
+ "additionalProperties": True,
+ },
+ }
+ )
+
+ return tools
+
+
+def derive_copilot_tools_dynamically(
+ messages: List[Dict[str, Any]],
+ client_system_instructions: str | None,
+ raw_input_items: Any,
+) -> List[Dict[str, Any]]:
+ """Build tool definitions without hardcoded static lists.
+
+ Priority order:
+ 1) Structured declarations parsed from Copilot system instructions
+ 2) Historical function_call argument shapes inferred from chat/input
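+
+ Example (illustrative call; variable names are hypothetical)::
+
+     tools = derive_copilot_tools_dynamically(messages, system_text, raw_input)
+
+ When both sources describe the same tool, the declaration parsed from the
+ system instructions wins; the inferred schema only contributes properties
+ and required keys that the declaration is missing.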
+ """
+ extracted = extract_copilot_tools_from_system_instructions(
+ client_system_instructions or ""
+ )
+ inferred = infer_tools_from_chat_history(
+ messages,
+ raw_input_items if isinstance(raw_input_items, list) else None,
+ )
+
+ by_name: Dict[str, Dict[str, Any]] = {}
+ for t in extracted:
+ name = t.get("name")
+ if isinstance(name, str) and name:
+ by_name[name] = t
+
+ for t in inferred:
+ name = t.get("name")
+ if not (isinstance(name, str) and name):
+ continue
+ if name not in by_name:
+ by_name[name] = t
+ continue
+ params_existing = (
+ by_name[name].get("parameters")
+ if isinstance(by_name[name].get("parameters"), dict)
+ else {}
+ )
+ params_inferred = (
+ t.get("parameters") if isinstance(t.get("parameters"), dict) else {}
+ )
+ props_existing = (
+ params_existing.get("properties")
+ if isinstance(params_existing.get("properties"), dict)
+ else {}
+ )
+ props_inferred = (
+ params_inferred.get("properties")
+ if isinstance(params_inferred.get("properties"), dict)
+ else {}
+ )
+ for key, val in props_inferred.items():
+ if key not in props_existing:
+ props_existing[key] = val
+ req_existing = (
+ params_existing.get("required")
+ if isinstance(params_existing.get("required"), list)
+ else []
+ )
+ req_inferred = (
+ params_inferred.get("required")
+ if isinstance(params_inferred.get("required"), list)
+ else []
+ )
+ for key in req_inferred:
+ if isinstance(key, str) and key not in req_existing:
+ req_existing.append(key)
+ params_existing["properties"] = props_existing
+ params_existing["required"] = req_existing
+ params_existing.setdefault("type", "object")
+ params_existing.setdefault("additionalProperties", True)
+ by_name[name]["parameters"] = params_existing
+
+ return list(by_name.values())
+
+
+def load_chatgpt_tokens(
+ ensure_fresh: bool = True,
+) -> tuple[str | None, str | None, str | None]:
auth = read_auth_file()
if not isinstance(auth, dict):
return None, None, None
@@ -231,7 +566,12 @@ def load_chatgpt_tokens(ensure_fresh: bool = True) -> tuple[str | None, str | No
refresh_token: Optional[str] = tokens.get("refresh_token")
last_refresh = auth.get("last_refresh")
- if ensure_fresh and isinstance(refresh_token, str) and refresh_token and CLIENT_ID_DEFAULT:
+ if (
+ ensure_fresh
+ and isinstance(refresh_token, str)
+ and refresh_token
+ and CLIENT_ID_DEFAULT
+ ):
needs_refresh = _should_refresh_access_token(access_token, last_refresh)
if needs_refresh or not (isinstance(access_token, str) and access_token):
refreshed = _refresh_chatgpt_tokens(refresh_token, CLIENT_ID_DEFAULT)
@@ -260,13 +600,17 @@ def load_chatgpt_tokens(ensure_fresh: bool = True) -> tuple[str | None, str | No
if not isinstance(account_id, str) or not account_id:
account_id = _derive_account_id(id_token)
- access_token = access_token if isinstance(access_token, str) and access_token else None
+ access_token = (
+ access_token if isinstance(access_token, str) and access_token else None
+ )
id_token = id_token if isinstance(id_token, str) and id_token else None
account_id = account_id if isinstance(account_id, str) and account_id else None
return access_token, account_id, id_token
-def _should_refresh_access_token(access_token: Optional[str], last_refresh: Any) -> bool:
+def _should_refresh_access_token(
+ access_token: Optional[str], last_refresh: Any
+) -> bool:
if not isinstance(access_token, str) or not access_token:
return True
@@ -288,7 +632,9 @@ def _should_refresh_access_token(access_token: Optional[str], last_refresh: Any)
return False
-def _refresh_chatgpt_tokens(refresh_token: str, client_id: str) -> Optional[Dict[str, Optional[str]]]:
+def _refresh_chatgpt_tokens(
+ refresh_token: str, client_id: str
+) -> Optional[Dict[str, Optional[str]]]:
payload = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
@@ -320,7 +666,11 @@ def _refresh_chatgpt_tokens(refresh_token: str, client_id: str) -> Optional[Dict
return None
account_id = _derive_account_id(id_token)
- new_refresh_token = new_refresh_token if isinstance(new_refresh_token, str) and new_refresh_token else refresh_token
+ new_refresh_token = (
+ new_refresh_token
+ if isinstance(new_refresh_token, str) and new_refresh_token
+ else refresh_token
+ )
return {
"id_token": id_token,
"access_token": access_token,
@@ -329,7 +679,9 @@ def _refresh_chatgpt_tokens(refresh_token: str, client_id: str) -> Optional[Dict
}
-def _persist_refreshed_auth(auth: Dict[str, Any], updated_tokens: Dict[str, Any]) -> Optional[Tuple[Dict[str, Any], Dict[str, Any]]]:
+def _persist_refreshed_auth(
+ auth: Dict[str, Any], updated_tokens: Dict[str, Any]
+) -> Optional[Tuple[Dict[str, Any], Dict[str, Any]]]:
updated_auth = dict(auth)
updated_auth["tokens"] = updated_tokens
updated_auth["last_refresh"] = _now_iso8601()
@@ -343,7 +695,9 @@ def _derive_account_id(id_token: Optional[str]) -> Optional[str]:
if not isinstance(id_token, str) or not id_token:
return None
claims = parse_jwt_claims(id_token) or {}
- auth_claims = claims.get("https://api.openai.com/auth") if isinstance(claims, dict) else None
+ auth_claims = (
+ claims.get("https://api.openai.com/auth") if isinstance(claims, dict) else None
+ )
if isinstance(auth_claims, dict):
account_id = auth_claims.get("chatgpt_account_id")
if isinstance(account_id, str) and account_id:
@@ -364,7 +718,9 @@ def _parse_iso8601(value: str) -> Optional[datetime.datetime]:
def _now_iso8601() -> str:
- return datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")
+ return (
+ datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")
+ )
def get_effective_chatgpt_auth() -> tuple[str | None, str | None]:
@@ -396,14 +752,14 @@ def sse_translate_chat(
ws_state: dict[str, Any] = {}
ws_index: dict[str, int] = {}
ws_next_index: int = 0
-
+
def _serialize_tool_args(eff_args: Any) -> str:
"""
Serialize tool call arguments with proper JSON handling.
-
+
Args:
eff_args: Arguments to serialize (dict, list, str, or other)
-
+
Returns:
JSON string representation of the arguments
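+
+ Example (illustrative values)::
+
+     _serialize_tool_args({"query": "python"})  # -> '{"query": "python"}'
+     _serialize_tool_args('{"limit": 5}')       # -> '{"limit": 5}'
+     _serialize_tool_args("plain text")         # -> '{"query": "plain text"}'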
"""
@@ -413,14 +769,14 @@ def _serialize_tool_args(eff_args: Any) -> str:
try:
parsed = json.loads(eff_args)
if isinstance(parsed, (dict, list)):
- return json.dumps(parsed)
+ return json.dumps(parsed)
else:
- return json.dumps({"query": eff_args})
+ return json.dumps({"query": eff_args})
except (json.JSONDecodeError, ValueError):
return json.dumps({"query": eff_args})
else:
return "{}"
-
+
def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
try:
usage = (evt.get("response") or {}).get("usage")
@@ -432,6 +788,7 @@ def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
return {"prompt_tokens": pt, "completion_tokens": ct, "total_tokens": tt}
except Exception:
return None
+
try:
try:
line_iterator = upstream.iter_lines(decode_unicode=False)
@@ -474,7 +831,9 @@ def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
yield b"data: [DONE]\n\n"
return
kind = evt.get("type")
- if isinstance(evt.get("response"), dict) and isinstance(evt["response"].get("id"), str):
+ if isinstance(evt.get("response"), dict) and isinstance(
+ evt["response"].get("id"), str
+ ):
response_id = evt["response"].get("id") or response_id
if isinstance(kind, str) and ("web_search_call" in kind):
@@ -482,25 +841,44 @@ def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
call_id = evt.get("item_id") or "ws_call"
if verbose and vlog:
try:
- vlog(f"CM_TOOLS {kind} id={call_id} -> tool_calls(web_search)")
+ vlog(
+ f"CM_TOOLS {kind} id={call_id} -> tool_calls(web_search)"
+ )
except Exception:
pass
- item = evt.get('item') if isinstance(evt.get('item'), dict) else {}
- params_dict = ws_state.setdefault(call_id, {}) if isinstance(ws_state.get(call_id), dict) else {}
+ item = evt.get("item") if isinstance(evt.get("item"), dict) else {}
+ params_dict = (
+ ws_state.setdefault(call_id, {})
+ if isinstance(ws_state.get(call_id), dict)
+ else {}
+ )
+
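+ # Fold web_search parameters from an upstream event or item into
+ # params_dict, tolerating several alias keys: whole-dict containers
+ # ("parameters", "args", "arguments", "input"), "query"/"q",
+ # recency hints ("recency", "time_range", "days"), domain filters
+ # ("domains", "include_domains", "include") and result caps
+ # ("max_results", "topn", "limit").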
def _merge_from(src):
if not isinstance(src, dict):
return
- for whole in ('parameters','args','arguments','input'):
+ for whole in ("parameters", "args", "arguments", "input"):
if isinstance(src.get(whole), dict):
params_dict.update(src.get(whole))
- if isinstance(src.get('query'), str): params_dict.setdefault('query', src.get('query'))
- if isinstance(src.get('q'), str): params_dict.setdefault('query', src.get('q'))
- for rk in ('recency','time_range','days'):
- if src.get(rk) is not None and rk not in params_dict: params_dict[rk] = src.get(rk)
- for dk in ('domains','include_domains','include'):
- if isinstance(src.get(dk), list) and 'domains' not in params_dict: params_dict['domains'] = src.get(dk)
- for mk in ('max_results','topn','limit'):
- if src.get(mk) is not None and 'max_results' not in params_dict: params_dict['max_results'] = src.get(mk)
+ if isinstance(src.get("query"), str):
+ params_dict.setdefault("query", src.get("query"))
+ if isinstance(src.get("q"), str):
+ params_dict.setdefault("query", src.get("q"))
+ for rk in ("recency", "time_range", "days"):
+ if src.get(rk) is not None and rk not in params_dict:
+ params_dict[rk] = src.get(rk)
+ for dk in ("domains", "include_domains", "include"):
+ if (
+ isinstance(src.get(dk), list)
+ and "domains" not in params_dict
+ ):
+ params_dict["domains"] = src.get(dk)
+ for mk in ("max_results", "topn", "limit"):
+ if (
+ src.get(mk) is not None
+ and "max_results" not in params_dict
+ ):
+ params_dict["max_results"] = src.get(mk)
+
_merge_from(item)
_merge_from(evt if isinstance(evt, dict) else None)
params = params_dict if params_dict else None
@@ -509,7 +887,9 @@ def _merge_from(src):
ws_state.setdefault(call_id, {}).update(params)
except Exception:
pass
- eff_params = ws_state.get(call_id, params if isinstance(params, (dict, list, str)) else {})
+ eff_params = ws_state.get(
+ call_id, params if isinstance(params, (dict, list, str)) else {}
+ )
args_str = _serialize_tool_args(eff_params)
if call_id not in ws_index:
ws_index[call_id] = ws_next_index
@@ -529,7 +909,10 @@ def _merge_from(src):
"index": _idx,
"id": call_id,
"type": "function",
- "function": {"name": "web_search", "arguments": args_str},
+ "function": {
+ "name": "web_search",
+ "arguments": args_str,
+ },
}
]
},
@@ -549,6 +932,7 @@ def _merge_from(src):
],
}
yield f"data: {json.dumps(finish_chunk)}\n\n".encode("utf-8")
+ sent_stop_chunk = True
except Exception:
pass
@@ -560,7 +944,13 @@ def _merge_from(src):
"object": "chat.completion.chunk",
"created": created,
"model": model,
- "choices": [{"index": 0, "delta": {"content": ""}, "finish_reason": None}],
+ "choices": [
+ {
+ "index": 0,
+ "delta": {"content": ""},
+ "finish_reason": None,
+ }
+ ],
}
yield f"data: {json.dumps(close_chunk)}\n\n".encode("utf-8")
think_open = False
@@ -571,35 +961,51 @@ def _merge_from(src):
"object": "chat.completion.chunk",
"created": created,
"model": model,
- "choices": [{"index": 0, "delta": {"content": delta}, "finish_reason": None}],
+ "choices": [
+ {"index": 0, "delta": {"content": delta}, "finish_reason": None}
+ ],
}
yield f"data: {json.dumps(chunk)}\n\n".encode("utf-8")
elif kind == "response.output_item.done":
item = evt.get("item") or {}
- if isinstance(item, dict) and (item.get("type") == "function_call" or item.get("type") == "web_search_call"):
+ if isinstance(item, dict) and (
+ item.get("type") == "function_call"
+ or item.get("type") == "web_search_call"
+ ):
call_id = item.get("call_id") or item.get("id") or ""
- name = item.get("name") or ("web_search" if item.get("type") == "web_search_call" else "")
+ name = item.get("name") or (
+ "web_search" if item.get("type") == "web_search_call" else ""
+ )
raw_args = item.get("arguments") or item.get("parameters")
if isinstance(raw_args, dict):
try:
ws_state.setdefault(call_id, {}).update(raw_args)
except Exception:
pass
- eff_args = ws_state.get(call_id, raw_args if isinstance(raw_args, (dict, list, str)) else {})
+ eff_args = ws_state.get(
+ call_id,
+ raw_args if isinstance(raw_args, (dict, list, str)) else {},
+ )
try:
args = _serialize_tool_args(eff_args)
except Exception:
args = "{}"
if item.get("type") == "web_search_call" and verbose and vlog:
try:
- vlog(f"CM_TOOLS response.output_item.done web_search_call id={call_id} has_args={bool(args)}")
+ vlog(
+ f"CM_TOOLS response.output_item.done web_search_call id={call_id} has_args={bool(args)}"
+ )
except Exception:
pass
if call_id not in ws_index:
ws_index[call_id] = ws_next_index
ws_next_index += 1
_idx = ws_index.get(call_id, 0)
- if isinstance(call_id, str) and isinstance(name, str) and isinstance(args, str):
+ if (
+ isinstance(call_id, str)
+ and isinstance(name, str)
+ and isinstance(args, str)
+ ):
delta_chunk = {
"id": response_id,
"object": "chat.completion.chunk",
@@ -614,7 +1020,10 @@ def _merge_from(src):
"index": _idx,
"id": call_id,
"type": "function",
- "function": {"name": name, "arguments": args},
+ "function": {
+ "name": name,
+ "arguments": args,
+ },
}
]
},
@@ -629,19 +1038,30 @@ def _merge_from(src):
"object": "chat.completion.chunk",
"created": created,
"model": model,
- "choices": [{"index": 0, "delta": {}, "finish_reason": "tool_calls"}],
+ "choices": [
+ {"index": 0, "delta": {}, "finish_reason": "tool_calls"}
+ ],
}
yield f"data: {json.dumps(finish_chunk)}\n\n".encode("utf-8")
+ sent_stop_chunk = True
elif kind == "response.reasoning_summary_part.added":
- if compat in ("think-tags", "o3"):
+ if compat in ("think-tags", "o3", "copilot"):
if saw_any_summary:
pending_summary_paragraph = True
else:
saw_any_summary = True
- elif kind in ("response.reasoning_summary_text.delta", "response.reasoning_text.delta"):
+ elif kind in (
+ "response.reasoning_summary_text.delta",
+ "response.reasoning_text.delta",
+ ):
delta_txt = evt.get("delta") or ""
- if compat == "o3":
- if kind == "response.reasoning_summary_text.delta" and pending_summary_paragraph:
+ if compat == "copilot":
+ # Send reasoning via reasoning_text field so the Copilot
+ # Chat extension picks it up as proper thinking content.
+ if (
+ kind == "response.reasoning_summary_text.delta"
+ and pending_summary_paragraph
+ ):
nl_chunk = {
"id": response_id,
"object": "chat.completion.chunk",
@@ -650,7 +1070,7 @@ def _merge_from(src):
"choices": [
{
"index": 0,
- "delta": {"reasoning": {"content": [{"type": "text", "text": "\n"}]}},
+ "delta": {"reasoning_text": "\n"},
"finish_reason": None,
}
],
@@ -665,7 +1085,49 @@ def _merge_from(src):
"choices": [
{
"index": 0,
- "delta": {"reasoning": {"content": [{"type": "text", "text": delta_txt}]}},
+ "delta": {"reasoning_text": delta_txt},
+ "finish_reason": None,
+ }
+ ],
+ }
+ yield f"data: {json.dumps(chunk)}\n\n".encode("utf-8")
+ elif compat == "o3":
+ if (
+ kind == "response.reasoning_summary_text.delta"
+ and pending_summary_paragraph
+ ):
+ nl_chunk = {
+ "id": response_id,
+ "object": "chat.completion.chunk",
+ "created": created,
+ "model": model,
+ "choices": [
+ {
+ "index": 0,
+ "delta": {
+ "reasoning": {
+ "content": [{"type": "text", "text": "\n"}]
+ }
+ },
+ "finish_reason": None,
+ }
+ ],
+ }
+ yield f"data: {json.dumps(nl_chunk)}\n\n".encode("utf-8")
+ pending_summary_paragraph = False
+ chunk = {
+ "id": response_id,
+ "object": "chat.completion.chunk",
+ "created": created,
+ "model": model,
+ "choices": [
+ {
+ "index": 0,
+ "delta": {
+ "reasoning": {
+ "content": [{"type": "text", "text": delta_txt}]
+ }
+ },
"finish_reason": None,
}
],
@@ -678,18 +1140,33 @@ def _merge_from(src):
"object": "chat.completion.chunk",
"created": created,
"model": model,
- "choices": [{"index": 0, "delta": {"content": ""}, "finish_reason": None}],
+ "choices": [
+ {
+ "index": 0,
+ "delta": {"content": ""},
+ "finish_reason": None,
+ }
+ ],
}
yield f"data: {json.dumps(open_chunk)}\n\n".encode("utf-8")
think_open = True
if think_open and not think_closed:
- if kind == "response.reasoning_summary_text.delta" and pending_summary_paragraph:
+ if (
+ kind == "response.reasoning_summary_text.delta"
+ and pending_summary_paragraph
+ ):
nl_chunk = {
"id": response_id,
"object": "chat.completion.chunk",
"created": created,
"model": model,
- "choices": [{"index": 0, "delta": {"content": "\n"}, "finish_reason": None}],
+ "choices": [
+ {
+ "index": 0,
+ "delta": {"content": "\n"},
+ "finish_reason": None,
+ }
+ ],
}
yield f"data: {json.dumps(nl_chunk)}\n\n".encode("utf-8")
pending_summary_paragraph = False
@@ -698,7 +1175,13 @@ def _merge_from(src):
"object": "chat.completion.chunk",
"created": created,
"model": model,
- "choices": [{"index": 0, "delta": {"content": delta_txt}, "finish_reason": None}],
+ "choices": [
+ {
+ "index": 0,
+ "delta": {"content": delta_txt},
+ "finish_reason": None,
+ }
+ ],
}
yield f"data: {json.dumps(content_chunk)}\n\n".encode("utf-8")
else:
@@ -711,7 +1194,10 @@ def _merge_from(src):
"choices": [
{
"index": 0,
- "delta": {"reasoning_summary": delta_txt, "reasoning": delta_txt},
+ "delta": {
+ "reasoning_summary": delta_txt,
+ "reasoning": delta_txt,
+ },
"finish_reason": None,
}
],
@@ -724,7 +1210,11 @@ def _merge_from(src):
"created": created,
"model": model,
"choices": [
- {"index": 0, "delta": {"reasoning": delta_txt}, "finish_reason": None}
+ {
+ "index": 0,
+ "delta": {"reasoning": delta_txt},
+ "finish_reason": None,
+ }
],
}
yield f"data: {json.dumps(chunk)}\n\n".encode("utf-8")
@@ -741,7 +1231,11 @@ def _merge_from(src):
yield f"data: {json.dumps(chunk)}\n\n".encode("utf-8")
sent_stop_chunk = True
elif kind == "response.failed":
- err = evt.get("response", {}).get("error", {}).get("message", "response.failed")
+ err = (
+ evt.get("response", {})
+ .get("error", {})
+ .get("message", "response.failed")
+ )
chunk = {"error": {"message": err}}
yield f"data: {json.dumps(chunk)}\n\n".encode("utf-8")
elif kind == "response.completed":
@@ -754,7 +1248,13 @@ def _merge_from(src):
"object": "chat.completion.chunk",
"created": created,
"model": model,
- "choices": [{"index": 0, "delta": {"content": ""}, "finish_reason": None}],
+ "choices": [
+ {
+ "index": 0,
+ "delta": {"content": ""},
+ "finish_reason": None,
+ }
+ ],
}
yield f"data: {json.dumps(close_chunk)}\n\n".encode("utf-8")
think_open = False
@@ -777,7 +1277,9 @@ def _merge_from(src):
"object": "chat.completion.chunk",
"created": created,
"model": model,
- "choices": [{"index": 0, "delta": {}, "finish_reason": None}],
+ "choices": [
+ {"index": 0, "delta": {}, "finish_reason": None}
+ ],
"usage": upstream_usage,
}
yield f"data: {json.dumps(usage_chunk)}\n\n".encode("utf-8")
@@ -789,10 +1291,18 @@ def _merge_from(src):
upstream.close()
-def sse_translate_text(upstream, model: str, created: int, verbose: bool = False, vlog=None, *, include_usage: bool = False):
+def sse_translate_text(
+ upstream,
+ model: str,
+ created: int,
+ verbose: bool = False,
+ vlog=None,
+ *,
+ include_usage: bool = False,
+):
response_id = "cmpl-stream"
upstream_usage = None
-
+
def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
try:
usage = (evt.get("response") or {}).get("usage")
@@ -804,16 +1314,21 @@ def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
return {"prompt_tokens": pt, "completion_tokens": ct, "total_tokens": tt}
except Exception:
return None
+
try:
for raw_line in upstream.iter_lines(decode_unicode=False):
if not raw_line:
continue
- line = raw_line.decode("utf-8", errors="ignore") if isinstance(raw_line, (bytes, bytearray)) else raw_line
+ line = (
+ raw_line.decode("utf-8", errors="ignore")
+ if isinstance(raw_line, (bytes, bytearray))
+ else raw_line
+ )
if verbose and vlog:
vlog(line)
if not line.startswith("data: "):
continue
- data = line[len("data: "):].strip()
+ data = line[len("data: ") :].strip()
if not data or data == "[DONE]":
if data == "[DONE]":
chunk = {
@@ -830,7 +1345,9 @@ def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
except Exception:
continue
kind = evt.get("type")
- if isinstance(evt.get("response"), dict) and isinstance(evt["response"].get("id"), str):
+ if isinstance(evt.get("response"), dict) and isinstance(
+ evt["response"].get("id"), str
+ ):
response_id = evt["response"].get("id") or response_id
if kind == "response.output_text.delta":
delta_text = evt.get("delta") or ""
@@ -839,7 +1356,9 @@ def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
"object": "text_completion.chunk",
"created": created,
"model": model,
- "choices": [{"index": 0, "text": delta_text, "finish_reason": None}],
+ "choices": [
+ {"index": 0, "text": delta_text, "finish_reason": None}
+ ],
}
yield f"data: {json.dumps(chunk)}\n\n".encode("utf-8")
elif kind == "response.output_text.done":
@@ -862,7 +1381,9 @@ def _extract_usage(evt: Dict[str, Any]) -> Dict[str, int] | None:
"object": "text_completion.chunk",
"created": created,
"model": model,
- "choices": [{"index": 0, "text": "", "finish_reason": None}],
+ "choices": [
+ {"index": 0, "text": "", "finish_reason": None}
+ ],
"usage": upstream_usage,
}
yield f"data: {json.dumps(usage_chunk)}\n\n".encode("utf-8")