From d6e19d9e6a75f2dac7c30dc46b0f24da63c69d9a Mon Sep 17 00:00:00 2001
From: rongxin
Date: Fri, 5 Jun 2026 09:03:37 +0800
Subject: [PATCH 01/14] feat(ai-proxy): add built-in nginx variables for LLM observability

Add 8 built-in nginx variables that ai-proxy sets automatically on every
request, making LLM metadata available in access_log format and logger
plugins without any additional plugin configuration.

New variables:

| Variable                        | Description                                      |
|---------------------------------|--------------------------------------------------|
| $llm_total_tokens               | Total tokens (prompt + completion)               |
| $llm_stream                     | true if request is streaming                     |
| $llm_has_tool_calls             | true if response contains tool calls             |
| $llm_tool_count                 | Number of tools in the upstream request body     |
| $llm_end_user_id                | End-user ID from request body                    |
| $llm_cache_read_input_tokens    | Prompt tokens served from provider cache         |
| $llm_cache_creation_input_tokens| Prompt tokens written to provider cache          |
| $llm_reasoning_tokens           | Reasoning tokens (OpenAI o1/o3, Responses API)   |

Provider mapping for cache/reasoning tokens:
- OpenAI Chat: prompt_tokens_details.cached_tokens / completion_tokens_details.reasoning_tokens
- OpenAI Responses: input_tokens_details.cached_tokens / output_tokens_details.reasoning_tokens
- Anthropic: cache_read_input_tokens / cache_creation_input_tokens
- DeepSeek: prompt_cache_hit_tokens (as cache_read_input_tokens fallback)

End-user ID extraction: for OpenAI and OpenAI-compatible protocols,
safety_identifier takes precedence over user; for Anthropic Messages,
metadata.user_id is used.

The on_event callback parameter added to parse_streaming_response allows
callers to hook into the streaming event loop for per-event processing
such as tool call detection.

--- apisix/cli/ngx_tpl.lua | 8 ++ apisix/core/ctx.lua | 8 ++ .../ai-protocols/anthropic-messages.lua | 4 + apisix/plugins/ai-protocols/openai-chat.lua | 15 ++ .../plugins/ai-protocols/openai-responses.lua | 4 + apisix/plugins/ai-providers/base.lua | 20 ++- apisix/plugins/ai-proxy/base.lua | 128 +++++++++++++++++- t/APISIX.pm | 10 +- t/plugin/ai-proxy3.t | 104 ++++++++++++++ 9 files changed, 297 insertions(+), 4 deletions(-) diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua index 9603eae468f3..ec57dc7cd5ec 100644 --- a/apisix/cli/ngx_tpl.lua +++ b/apisix/cli/ngx_tpl.lua @@ -829,6 +829,14 @@ http { set $llm_model ''; set $llm_prompt_tokens '0'; set $llm_completion_tokens '0'; + set $llm_total_tokens '0'; + set $llm_stream 'false'; + set $llm_has_tool_calls 'false'; + set $llm_tool_count '0'; + set $llm_end_user_id ''; + set $llm_cache_read_input_tokens '0'; + set $llm_cache_creation_input_tokens '0'; + set $llm_reasoning_tokens '0'; {% if use_apisix_base then %} diff --git a/apisix/core/ctx.lua b/apisix/core/ctx.lua index 92a57629dc18..bbb0eb2242f9 100644 --- a/apisix/core/ctx.lua +++ b/apisix/core/ctx.lua @@ -204,6 +204,14 @@ do llm_model = true, llm_prompt_tokens = true, llm_completion_tokens = true, + llm_total_tokens = true, + llm_stream = true, + llm_has_tool_calls = true, + llm_tool_count = true, + llm_end_user_id = true, + llm_cache_read_input_tokens = true, + llm_cache_creation_input_tokens = true, + llm_reasoning_tokens = true, upstream_mirror_host = true, upstream_mirror_uri = true, diff --git a/apisix/plugins/ai-protocols/anthropic-messages.lua b/apisix/plugins/ai-protocols/anthropic-messages.lua index a78d0c26e553..e732d3699d50 100644 --- a/apisix/plugins/ai-protocols/anthropic-messages.lua +++ b/apisix/plugins/ai-protocols/anthropic-messages.lua @@ -102,6 +102,8 @@ function _M.parse_sse_event(event, ctx, state) prompt_tokens = usage.input_tokens or 0, completion_tokens = usage.output_tokens or 0, total_tokens = (usage.input_tokens or 0) + 
(usage.output_tokens or 0), + cache_read_input_tokens = usage.cache_read_input_tokens or 0, + cache_creation_input_tokens = usage.cache_creation_input_tokens or 0, }, raw_usage = usage, } @@ -169,6 +171,8 @@ function _M.extract_usage(res_body) prompt_tokens = prompt, completion_tokens = completion, total_tokens = prompt + completion, + cache_read_input_tokens = raw.cache_read_input_tokens or 0, + cache_creation_input_tokens = raw.cache_creation_input_tokens or 0, }, raw end diff --git a/apisix/plugins/ai-protocols/openai-chat.lua b/apisix/plugins/ai-protocols/openai-chat.lua index faa48f6654c6..e68295ceb18b 100644 --- a/apisix/plugins/ai-protocols/openai-chat.lua +++ b/apisix/plugins/ai-protocols/openai-chat.lua @@ -97,10 +97,18 @@ function _M.parse_sse_event(event, ctx, state) -- Extract usage (null for non-final chunks; cjson decodes null as userdata) if type(data.usage) == "table" then result.type = "usage" + local pd = type(data.usage.prompt_tokens_details) == "table" + and data.usage.prompt_tokens_details + local cd = type(data.usage.completion_tokens_details) == "table" + and data.usage.completion_tokens_details result.usage = { prompt_tokens = data.usage.prompt_tokens or 0, completion_tokens = data.usage.completion_tokens or 0, total_tokens = data.usage.total_tokens or 0, + cache_read_input_tokens = pd and pd.cached_tokens + or data.usage.prompt_cache_hit_tokens or 0, + cache_creation_input_tokens = pd and pd.cache_creation_input_tokens or 0, + reasoning_tokens = cd and cd.reasoning_tokens or 0, } result.raw_usage = data.usage end @@ -160,10 +168,17 @@ function _M.extract_usage(res_body) return nil, nil end local raw = res_body.usage + local pd = type(raw.prompt_tokens_details) == "table" and raw.prompt_tokens_details + local cd = type(raw.completion_tokens_details) == "table" and raw.completion_tokens_details + -- OpenAI uses prompt_tokens_details.cached_tokens; DeepSeek uses prompt_cache_hit_tokens + local cache_read = pd and pd.cached_tokens or 
raw.prompt_cache_hit_tokens or 0 return { prompt_tokens = raw.prompt_tokens or 0, completion_tokens = raw.completion_tokens or 0, total_tokens = raw.total_tokens or (raw.prompt_tokens or 0) + (raw.completion_tokens or 0), + cache_read_input_tokens = cache_read, + cache_creation_input_tokens = pd and pd.cache_creation_input_tokens or 0, + reasoning_tokens = cd and cd.reasoning_tokens or 0, }, raw end diff --git a/apisix/plugins/ai-protocols/openai-responses.lua b/apisix/plugins/ai-protocols/openai-responses.lua index 9e05e99a7eba..4ae3dbe9ce23 100644 --- a/apisix/plugins/ai-protocols/openai-responses.lua +++ b/apisix/plugins/ai-protocols/openai-responses.lua @@ -142,6 +142,10 @@ function _M.extract_usage(res_body) prompt_tokens = prompt, completion_tokens = completion, total_tokens = raw.total_tokens or (prompt + completion), + cache_read_input_tokens = type(raw.input_tokens_details) == "table" + and raw.input_tokens_details.cached_tokens or 0, + reasoning_tokens = type(raw.output_tokens_details) == "table" + and raw.output_tokens_details.reasoning_tokens or 0, }, raw end diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua index 407fe0bb8fbb..6f21d14361aa 100644 --- a/apisix/plugins/ai-providers/base.lua +++ b/apisix/plugins/ai-providers/base.lua @@ -44,6 +44,7 @@ local table = table local pairs = pairs local type = type local math = math +local pcall = pcall local ipairs = ipairs local next = next local setmetatable = setmetatable @@ -396,6 +397,10 @@ function _M.parse_response(self, ctx, res, client_proto, converter, conf) end ctx.var.llm_prompt_tokens = ctx.ai_token_usage.prompt_tokens or 0 ctx.var.llm_completion_tokens = ctx.ai_token_usage.completion_tokens or 0 + ctx.var.llm_cache_read_input_tokens = ctx.ai_token_usage.cache_read_input_tokens or 0 + ctx.var.llm_cache_creation_input_tokens = + ctx.ai_token_usage.cache_creation_input_tokens or 0 + ctx.var.llm_reasoning_tokens = ctx.ai_token_usage.reasoning_tokens or 0 local 
response_text = client_proto.extract_response_text(res_body) if response_text then @@ -416,7 +421,7 @@ end -- @param target_proto table The protocol module for the provider's native protocol -- @param converter table|nil The converter module (if protocol conversion needed) -- @param conf table|nil Plugin configuration (used for stream duration and size limits) -function _M.parse_streaming_response(self, ctx, res, target_proto, converter, conf) +function _M.parse_streaming_response(self, ctx, res, target_proto, converter, conf, on_event) local framing = FRAMINGS[self.streaming_framing or "sse"] if not framing then return 500, "unknown streaming framing: " .. tostring(self.streaming_framing) @@ -618,6 +623,12 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co merge_usage(ctx, parsed) ctx.var.llm_prompt_tokens = ctx.ai_token_usage.prompt_tokens ctx.var.llm_completion_tokens = ctx.ai_token_usage.completion_tokens + ctx.var.llm_total_tokens = ctx.ai_token_usage.total_tokens or 0 + ctx.var.llm_cache_read_input_tokens = + ctx.ai_token_usage.cache_read_input_tokens or 0 + ctx.var.llm_cache_creation_input_tokens = + ctx.ai_token_usage.cache_creation_input_tokens or 0 + ctx.var.llm_reasoning_tokens = ctx.ai_token_usage.reasoning_tokens or 0 ctx.var.llm_response_text = table.concat(contents, "") end @@ -625,6 +636,13 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co ctx.var.llm_request_done = true end + if on_event then + local ok, err = pcall(on_event, event, parsed, sse_state) + if not ok then + core.log.error("on_event callback failed: ", err) + end + end + ::CONTINUE:: end diff --git a/apisix/plugins/ai-proxy/base.lua b/apisix/plugins/ai-proxy/base.lua index 8ce5ef9de737..b3a036001a6f 100644 --- a/apisix/plugins/ai-proxy/base.lua +++ b/apisix/plugins/ai-proxy/base.lua @@ -22,6 +22,7 @@ local require = require local pcall = pcall local pairs = pairs local type = type +local ipairs = ipairs local table = 
table local exporter = require("apisix.plugins.prometheus.exporter") local protocols = require("apisix.plugins.ai-protocols") @@ -29,6 +30,100 @@ local transport_http = require("apisix.plugins.ai-transport.http") local log_sanitize = require("apisix.utils.log-sanitize") local apisix_upstream = require("resty.apisix.upstream") +local function extract_end_user_id(body, protocol) + if type(body) ~= "table" then + return nil + end + if protocol == "anthropic-messages" then + local meta = body.metadata + if type(meta) == "table" and type(meta.user_id) == "string" then + return meta.user_id + end + return nil + end + -- openai-chat, openai-responses: safety_identifier takes precedence over user + if type(body.safety_identifier) == "string" then + return body.safety_identifier + end + if type(body.user) == "string" then + return body.user + end + return nil +end + + +local function count_request_tools(body) + if type(body) ~= "table" then + return 0 + end + local tools = body.tools + if type(tools) == "table" then + return #tools + end + return 0 +end + + +local function detect_tool_calls_in_response(body) + if type(body) ~= "table" then + return false + end + -- OpenAI Chat / Responses: choices[].message.tool_calls + if type(body.choices) == "table" then + for _, choice in ipairs(body.choices) do + if type(choice) == "table" then + local msg = choice.message + if type(msg) == "table" and type(msg.tool_calls) == "table" + and #msg.tool_calls > 0 then + return true + end + end + end + end + -- Anthropic Messages: content[].type == "tool_use" + if type(body.content) == "table" then + for _, block in ipairs(body.content) do + if type(block) == "table" and block.type == "tool_use" then + return true + end + end + end + -- OpenAI Responses: output[].type == "function_call" + if type(body.output) == "table" then + for _, item in ipairs(body.output) do + if type(item) == "table" and item.type == "function_call" then + return true + end + end + end + return false +end + + +local 
function detect_tool_calls_in_event(data) + if type(data) ~= "table" then + return false + end + -- OpenAI Chat streaming: choices[].delta.tool_calls + if type(data.choices) == "table" then + for _, choice in ipairs(data.choices) do + if type(choice) == "table" and type(choice.delta) == "table" + and type(choice.delta.tool_calls) == "table" + and #choice.delta.tool_calls > 0 then + return true + end + end + end + -- Anthropic Messages streaming: content_block_start with tool_use + if data.type == "content_block_start" + and type(data.content_block) == "table" + and data.content_block.type == "tool_use" then + return true + end + return false +end + + local _M = {} @@ -208,6 +303,15 @@ function _M.before_proxy(conf, ctx, on_error) return 500, body end + -- Compute built-in AI log fields from the final upstream request + local final_body = params.body + ctx.var.llm_stream = ctx.var.request_type == "ai_stream" and "true" or "false" + ctx.var.llm_tool_count = count_request_tools(final_body) + local end_user = extract_end_user_id(final_body, target_proto) + if end_user then + ctx.var.llm_end_user_id = end_user + end + core.log.info("sending request to LLM server: ", core.json.delay_encode(log_sanitize.redact_params(params), true)) @@ -306,15 +410,35 @@ function _M.before_proxy(conf, ctx, on_error) core.log.error("no protocol module for streaming target: ", target_proto) return 500 end + local has_tool_calls = false + local on_stream_event = function(event, parsed, sse_state) + if not has_tool_calls and event.data then + local data = core.json.decode(event.data, {null_as_nil = true}) + if data and detect_tool_calls_in_event(data) then + has_tool_calls = true + ctx.var.llm_has_tool_calls = "true" + end + end + if parsed.usage and ctx.ai_token_usage then + ctx.var.llm_total_tokens = + ctx.ai_token_usage.total_tokens or 0 + end + end code, body = ai_provider:parse_streaming_response( - ctx, res, target_proto_module, converter, conf) + ctx, res, target_proto_module, 
converter, conf, on_stream_event) else - local _, parse_err, parse_status = ai_provider:parse_response( + local res_body, parse_err, parse_status = ai_provider:parse_response( ctx, res, client_proto, converter, conf) if parse_err then code = parse_status or 500 body = parse_err end + if ctx.ai_token_usage then + ctx.var.llm_total_tokens = ctx.ai_token_usage.total_tokens or 0 + end + if res_body and detect_tool_calls_in_response(res_body) then + ctx.var.llm_has_tool_calls = "true" + end end -- Finalize upstream state with response_time after body is consumed diff --git a/t/APISIX.pm b/t/APISIX.pm index 945305068f67..b100d00d660f 100644 --- a/t/APISIX.pm +++ b/t/APISIX.pm @@ -717,7 +717,7 @@ _EOC_ require("apisix").http_exit_worker() } - log_format main escape=default '\$remote_addr - \$remote_user [\$time_local] \$http_host "\$request_line" \$status \$body_bytes_sent \$request_time "\$http_referer" "\$http_user_agent" \$upstream_addr \$upstream_status \$apisix_upstream_response_time "\$upstream_scheme://\$upstream_host\$upstream_uri" \$request_llm_model \$llm_model \$llm_time_to_first_token \$llm_prompt_tokens \$llm_completion_tokens "\$rate_limiting_info"'; + log_format main escape=default '\$remote_addr - \$remote_user [\$time_local] \$http_host "\$request_line" \$status \$body_bytes_sent \$request_time "\$http_referer" "\$http_user_agent" \$upstream_addr \$upstream_status \$apisix_upstream_response_time "\$upstream_scheme://\$upstream_host\$upstream_uri" \$request_llm_model \$llm_model \$llm_time_to_first_token \$llm_prompt_tokens \$llm_completion_tokens \$llm_total_tokens \$llm_stream \$llm_has_tool_calls \$llm_tool_count \$llm_end_user_id \$llm_cache_read_input_tokens \$llm_cache_creation_input_tokens \$llm_reasoning_tokens "\$rate_limiting_info"'; # fake server, only for test server { @@ -922,6 +922,14 @@ _EOC_ set \$llm_model ''; set \$llm_prompt_tokens '0'; set \$llm_completion_tokens '0'; + set \$llm_total_tokens '0'; + set \$llm_stream 'false'; + set 
\$llm_has_tool_calls 'false'; + set \$llm_tool_count '0'; + set \$llm_end_user_id ''; + set \$llm_cache_read_input_tokens '0'; + set \$llm_cache_creation_input_tokens '0'; + set \$llm_reasoning_tokens '0'; set \$apisix_upstream_response_time \$upstream_response_time; access_log $apisix_home/t/servroot/logs/access.log main; diff --git a/t/plugin/ai-proxy3.t b/t/plugin/ai-proxy3.t index 3110ee04b258..4ab6646704b2 100644 --- a/t/plugin/ai-proxy3.t +++ b/t/plugin/ai-proxy3.t @@ -257,3 +257,107 @@ passed qr/6data: \[DONE\]\n\n/ --- access_log eval qr/localhost:7737 200 [\d.]+ \"http:\/\/localhost\/v1\/chat\/completions\" gpt-4 gpt-3.5-turbo 2\d\d 15 20.*/ + + + +=== TEST 7: set route for built-in access log variable test +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/2', + ngx.HTTP_PUT, + [[{ + "uri": "/log-vars", + "plugins": { + "ai-proxy": { + "provider": "openai", + "auth": { + "header": { + "Authorization": "Bearer test-key" + } + }, + "options": { + "model": "gpt-4o" + }, + "override": { + "endpoint": "http://127.0.0.1:1980" + }, + "ssl_verify": false + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 8: non-streaming request writes llm built-in vars to access log +--- request +POST /log-vars +{"messages":[{"role":"user","content":"What is 1+1?"}],"model":"gpt-4o","user":"alice"} +--- more_headers +X-AI-Fixture: openai/chat-basic.json +--- error_code: 200 +--- response_body eval +qr/.*completion_tokens.*/ +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 23 8 31 false false 0 alice/ + + + +=== TEST 9: streaming request writes llm built-in vars to access log +--- request +POST /log-vars +{"messages":[{"role":"user","content":"Hello"}],"model":"gpt-4o","stream":true} +--- more_headers +X-AI-Fixture: openai/chat-multi-chunk.sse +--- error_code: 200 +--- 
access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 10 2 12 true false 0 / + + + +=== TEST 10: response with cached tokens writes llm_cache_read_input_tokens to access log +--- request +POST /log-vars +{"messages":[{"role":"user","content":"Solve this"}],"model":"gpt-4o"} +--- more_headers +X-AI-Fixture: openai/chat-with-reasoning.json +--- error_code: 200 +--- response_body eval +qr/.*completion_tokens.*/ +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 30 15 45 false false 0 \S* 10/ + + + +=== TEST 11: response with tool calls sets llm_has_tool_calls=true and llm_tool_count +--- request +POST /log-vars +{"messages":[{"role":"user","content":"What is the weather?"}],"model":"gpt-4o","tools":[{"type":"function","function":{"name":"get_weather","parameters":{}}}]} +--- more_headers +X-AI-Fixture: openai/chat-with-tool-calls.json +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 50 20 70 false true 1 / + + + +=== TEST 12: safety_identifier field is used as llm_end_user_id +--- request +POST /log-vars +{"messages":[{"role":"user","content":"Hello"}],"model":"gpt-4o","safety_identifier":"user-xyz"} +--- more_headers +X-AI-Fixture: openai/chat-basic.json +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 23 8 31 false false 0 user-xyz/ From c5dbba354bcf7d602a7f5d517b6052e34d95e1cd Mon Sep 17 00:00:00 2001 From: rongxin Date: Fri, 5 Jun 2026 13:48:07 +0800 Subject: [PATCH 02/14] refactor(ai-proxy): move tool call detection into parse_sse_event MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Eliminate redundant JSON decoding: event.data was being decoded twice, once in parse_sse_event and again in the on_stream_event callback. 
Instead, parse_sse_event now returns has_tool_call=true when a tool call delta is detected — consistent with how prompt/completion tokens are returned via extract_usage. ai-providers/base.lua sets ctx.var.llm_has_tool_calls from this flag, removing the need for the on_event callback and the on_stream_event closure entirely. --- .../ai-protocols/anthropic-messages.lua | 12 ++++++ apisix/plugins/ai-protocols/openai-chat.lua | 14 ++++--- apisix/plugins/ai-providers/base.lua | 10 ++--- apisix/plugins/ai-proxy/base.lua | 40 +------------------ 4 files changed, 25 insertions(+), 51 deletions(-) diff --git a/apisix/plugins/ai-protocols/anthropic-messages.lua b/apisix/plugins/ai-protocols/anthropic-messages.lua index e732d3699d50..ef6f22561a20 100644 --- a/apisix/plugins/ai-protocols/anthropic-messages.lua +++ b/apisix/plugins/ai-protocols/anthropic-messages.lua @@ -66,6 +66,18 @@ function _M.parse_sse_event(event, ctx, state) end return { type = "skip" } + elseif event.type == "content_block_start" then + local data, err = core.json.decode(event.data, { null_as_nil = true }) + if not data then + core.log.warn("failed to decode content_block_start: ", err) + return { type = "skip" } + end + if type(data.content_block) == "table" + and data.content_block.type == "tool_use" then + return { type = "delta", has_tool_call = true } + end + return { type = "skip" } + elseif event.type == "message_delta" then local data, err = core.json.decode(event.data, { null_as_nil = true }) if not data then diff --git a/apisix/plugins/ai-protocols/openai-chat.lua b/apisix/plugins/ai-protocols/openai-chat.lua index e68295ceb18b..5fb37dac2b5d 100644 --- a/apisix/plugins/ai-protocols/openai-chat.lua +++ b/apisix/plugins/ai-protocols/openai-chat.lua @@ -79,14 +79,18 @@ function _M.parse_sse_event(event, ctx, state) local result = { type = "delta", data = data } - -- Extract text content from choices + -- Extract text content and detect tool calls from choices if type(data.choices) == "table" and 
#data.choices > 0 then local texts = {} for _, choice in ipairs(data.choices) do - if type(choice) == "table" - and type(choice.delta) == "table" - and type(choice.delta.content) == "string" then - core.table.insert(texts, choice.delta.content) + if type(choice) == "table" and type(choice.delta) == "table" then + if type(choice.delta.content) == "string" then + core.table.insert(texts, choice.delta.content) + end + if type(choice.delta.tool_calls) == "table" + and #choice.delta.tool_calls > 0 then + result.has_tool_call = true + end end end if #texts > 0 then diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua index 6f21d14361aa..6be63bbd306b 100644 --- a/apisix/plugins/ai-providers/base.lua +++ b/apisix/plugins/ai-providers/base.lua @@ -44,7 +44,6 @@ local table = table local pairs = pairs local type = type local math = math -local pcall = pcall local ipairs = ipairs local next = next local setmetatable = setmetatable @@ -421,7 +420,7 @@ end -- @param target_proto table The protocol module for the provider's native protocol -- @param converter table|nil The converter module (if protocol conversion needed) -- @param conf table|nil Plugin configuration (used for stream duration and size limits) -function _M.parse_streaming_response(self, ctx, res, target_proto, converter, conf, on_event) +function _M.parse_streaming_response(self, ctx, res, target_proto, converter, conf) local framing = FRAMINGS[self.streaming_framing or "sse"] if not framing then return 500, "unknown streaming framing: " .. 
tostring(self.streaming_framing) @@ -636,11 +635,8 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co ctx.var.llm_request_done = true end - if on_event then - local ok, err = pcall(on_event, event, parsed, sse_state) - if not ok then - core.log.error("on_event callback failed: ", err) - end + if parsed.has_tool_call then + ctx.var.llm_has_tool_calls = "true" end ::CONTINUE:: diff --git a/apisix/plugins/ai-proxy/base.lua b/apisix/plugins/ai-proxy/base.lua index b3a036001a6f..d7b7e1cfd86a 100644 --- a/apisix/plugins/ai-proxy/base.lua +++ b/apisix/plugins/ai-proxy/base.lua @@ -100,30 +100,6 @@ local function detect_tool_calls_in_response(body) end -local function detect_tool_calls_in_event(data) - if type(data) ~= "table" then - return false - end - -- OpenAI Chat streaming: choices[].delta.tool_calls - if type(data.choices) == "table" then - for _, choice in ipairs(data.choices) do - if type(choice) == "table" and type(choice.delta) == "table" - and type(choice.delta.tool_calls) == "table" - and #choice.delta.tool_calls > 0 then - return true - end - end - end - -- Anthropic Messages streaming: content_block_start with tool_use - if data.type == "content_block_start" - and type(data.content_block) == "table" - and data.content_block.type == "tool_use" then - return true - end - return false -end - - local _M = {} @@ -410,22 +386,8 @@ function _M.before_proxy(conf, ctx, on_error) core.log.error("no protocol module for streaming target: ", target_proto) return 500 end - local has_tool_calls = false - local on_stream_event = function(event, parsed, sse_state) - if not has_tool_calls and event.data then - local data = core.json.decode(event.data, {null_as_nil = true}) - if data and detect_tool_calls_in_event(data) then - has_tool_calls = true - ctx.var.llm_has_tool_calls = "true" - end - end - if parsed.usage and ctx.ai_token_usage then - ctx.var.llm_total_tokens = - ctx.ai_token_usage.total_tokens or 0 - end - end code, body = 
ai_provider:parse_streaming_response( - ctx, res, target_proto_module, converter, conf, on_stream_event) + ctx, res, target_proto_module, converter, conf) else local res_body, parse_err, parse_status = ai_provider:parse_response( ctx, res, client_proto, converter, conf) From b55966db828609b89c4ee181eb6ede1600c4f99d Mon Sep 17 00:00:00 2001 From: rongxin Date: Fri, 5 Jun 2026 13:59:07 +0800 Subject: [PATCH 03/14] fix(ai-proxy): check has_tool_call before goto CONTINUE The has_tool_call check was placed after goto CONTINUE, so skip events (including Anthropic content_block_start/tool_use) never reached it. Move the check to before the skip guard. Anthropic content_block_start with tool_use now correctly returns {type='skip', has_tool_call=true} so it does not enter the converter/downstream pipeline. --- apisix/plugins/ai-protocols/anthropic-messages.lua | 2 +- apisix/plugins/ai-providers/base.lua | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/apisix/plugins/ai-protocols/anthropic-messages.lua b/apisix/plugins/ai-protocols/anthropic-messages.lua index ef6f22561a20..96f6155f3a25 100644 --- a/apisix/plugins/ai-protocols/anthropic-messages.lua +++ b/apisix/plugins/ai-protocols/anthropic-messages.lua @@ -74,7 +74,7 @@ function _M.parse_sse_event(event, ctx, state) end if type(data.content_block) == "table" and data.content_block.type == "tool_use" then - return { type = "delta", has_tool_call = true } + return { type = "skip", has_tool_call = true } end return { type = "skip" } diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua index 6be63bbd306b..779ba7c984c0 100644 --- a/apisix/plugins/ai-providers/base.lua +++ b/apisix/plugins/ai-providers/base.lua @@ -595,6 +595,9 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co for _, event in ipairs(events) do -- Target protocol parses the provider's SSE format local parsed = target_proto.parse_sse_event(event, ctx, sse_state) + if 
parsed and parsed.has_tool_call then + ctx.var.llm_has_tool_calls = "true" + end if not parsed or parsed.type == "skip" then goto CONTINUE end @@ -635,10 +638,6 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co ctx.var.llm_request_done = true end - if parsed.has_tool_call then - ctx.var.llm_has_tool_calls = "true" - end - ::CONTINUE:: end From 982af40f3cc700f7cb3ec4360665af85c8352bb1 Mon Sep 17 00:00:00 2001 From: rongxin Date: Fri, 5 Jun 2026 14:01:08 +0800 Subject: [PATCH 04/14] refactor(ai-proxy): remove unnecessary content_block_start modification MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit anthropic-messages.lua does not need to be changed. The has_tool_call check in ai-providers/base.lua now runs before goto CONTINUE, so skip events with no has_tool_call field are simply ignored — no extra branch in parse_sse_event needed. --- apisix/plugins/ai-protocols/anthropic-messages.lua | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/apisix/plugins/ai-protocols/anthropic-messages.lua b/apisix/plugins/ai-protocols/anthropic-messages.lua index 96f6155f3a25..e732d3699d50 100644 --- a/apisix/plugins/ai-protocols/anthropic-messages.lua +++ b/apisix/plugins/ai-protocols/anthropic-messages.lua @@ -66,18 +66,6 @@ function _M.parse_sse_event(event, ctx, state) end return { type = "skip" } - elseif event.type == "content_block_start" then - local data, err = core.json.decode(event.data, { null_as_nil = true }) - if not data then - core.log.warn("failed to decode content_block_start: ", err) - return { type = "skip" } - end - if type(data.content_block) == "table" - and data.content_block.type == "tool_use" then - return { type = "skip", has_tool_call = true } - end - return { type = "skip" } - elseif event.type == "message_delta" then local data, err = core.json.decode(event.data, { null_as_nil = true }) if not data then From a0c10d97d35ec0848b18c4f0447fad320757ab9e Mon Sep 17 00:00:00 
2001 From: rongxin Date: Fri, 5 Jun 2026 14:05:54 +0800 Subject: [PATCH 05/14] feat(ai-proxy): detect Anthropic streaming tool calls via content_block_start Add content_block_start handling to anthropic-messages parse_sse_event. When content_block.type == 'tool_use', return {type='skip', has_tool_call=true}: - skip: do not forward this internal framing event downstream - has_tool_call: signals ai-providers/base.lua to set llm_has_tool_calls The has_tool_call check in base.lua runs before goto CONTINUE so skip events carrying this flag are correctly handled. --- apisix/plugins/ai-protocols/anthropic-messages.lua | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/apisix/plugins/ai-protocols/anthropic-messages.lua b/apisix/plugins/ai-protocols/anthropic-messages.lua index e732d3699d50..0b9ffda08606 100644 --- a/apisix/plugins/ai-protocols/anthropic-messages.lua +++ b/apisix/plugins/ai-protocols/anthropic-messages.lua @@ -66,6 +66,14 @@ function _M.parse_sse_event(event, ctx, state) end return { type = "skip" } + elseif event.type == "content_block_start" then + local data = core.json.decode(event.data, { null_as_nil = true }) + if data and type(data.content_block) == "table" + and data.content_block.type == "tool_use" then + return { type = "skip", has_tool_call = true } + end + return { type = "skip" } + elseif event.type == "message_delta" then local data, err = core.json.decode(event.data, { null_as_nil = true }) if not data then From e8d4891a1f01b25f9f28cfdc0a3e19563d0f734b Mon Sep 17 00:00:00 2001 From: rongxin Date: Fri, 5 Jun 2026 14:22:01 +0800 Subject: [PATCH 06/14] fix(ai-proxy): fix three bugs in built-in var computation 1. openai-responses parse_sse_event: add cache_read_input_tokens and reasoning_tokens to response.completed usage (was missing, fix #1) 2. openai-responses parse_sse_event: detect function_call items in response.output[] and set has_tool_call (streaming tool calls were never detected, fix #3) 3. 
merge_usage: recompute total_tokens as prompt+completion after merge to handle split Anthropic events (message_start has input_tokens, message_delta has output_tokens; before this fix total_tokens was overwritten with output_tokens only, fix #2) --- .../plugins/ai-protocols/openai-responses.lua | 44 ++++++++++++------- apisix/plugins/ai-providers/base.lua | 35 +++++++-------- 2 files changed, 46 insertions(+), 33 deletions(-) diff --git a/apisix/plugins/ai-protocols/openai-responses.lua b/apisix/plugins/ai-protocols/openai-responses.lua index 4ae3dbe9ce23..b0257c3410d2 100644 --- a/apisix/plugins/ai-protocols/openai-responses.lua +++ b/apisix/plugins/ai-protocols/openai-responses.lua @@ -68,16 +68,30 @@ function _M.parse_sse_event(event, ctx, state) core.log.warn("failed to decode response.completed SSE data: ", err) return result end - if type(data.response) == "table" - and type(data.response.usage) == "table" then - local usage = data.response.usage - result.type = "usage_and_done" - result.usage = { - prompt_tokens = usage.input_tokens or 0, - completion_tokens = usage.output_tokens or 0, - total_tokens = usage.total_tokens or 0, - } - result.raw_usage = usage + if type(data.response) == "table" then + local resp = data.response + if type(resp.usage) == "table" then + local usage = resp.usage + result.type = "usage_and_done" + result.usage = { + prompt_tokens = usage.input_tokens or 0, + completion_tokens = usage.output_tokens or 0, + total_tokens = usage.total_tokens or 0, + cache_read_input_tokens = type(usage.input_tokens_details) == "table" + and usage.input_tokens_details.cached_tokens or 0, + reasoning_tokens = type(usage.output_tokens_details) == "table" + and usage.output_tokens_details.reasoning_tokens or 0, + } + result.raw_usage = usage + end + if type(resp.output) == "table" then + for _, item in ipairs(resp.output) do + if type(item) == "table" and item.type == "function_call" then + result.has_tool_call = true + break + end + end + end end return 
result @@ -135,17 +149,17 @@ function _M.extract_usage(res_body) return nil, nil end local raw = res_body.usage - -- Responses API uses input_tokens / output_tokens + local idetails = type(raw.input_tokens_details) == "table" and raw.input_tokens_details + local odetails = type(raw.output_tokens_details) == "table" and raw.output_tokens_details local prompt = raw.input_tokens or 0 local completion = raw.output_tokens or 0 return { prompt_tokens = prompt, completion_tokens = completion, total_tokens = raw.total_tokens or (prompt + completion), - cache_read_input_tokens = type(raw.input_tokens_details) == "table" - and raw.input_tokens_details.cached_tokens or 0, - reasoning_tokens = type(raw.output_tokens_details) == "table" - and raw.output_tokens_details.reasoning_tokens or 0, + cache_read_input_tokens = idetails and idetails.cached_tokens or 0, + cache_creation_input_tokens = 0, + reasoning_tokens = odetails and odetails.reasoning_tokens or 0, }, raw end diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua index 779ba7c984c0..3ed63aa499a8 100644 --- a/apisix/plugins/ai-providers/base.lua +++ b/apisix/plugins/ai-providers/base.lua @@ -77,6 +77,13 @@ local function merge_usage(ctx, parsed) ctx.ai_token_usage[k] = v end end + -- Recompute total from accumulated parts (handles split events, e.g. 
Anthropic + -- message_start carries input tokens and message_delta carries output tokens) + local computed = (ctx.ai_token_usage.prompt_tokens or 0) + + (ctx.ai_token_usage.completion_tokens or 0) + if computed > (ctx.ai_token_usage.total_tokens or 0) then + ctx.ai_token_usage.total_tokens = computed + end end local raw = parsed.raw_usage or parsed.usage @@ -397,8 +404,7 @@ function _M.parse_response(self, ctx, res, client_proto, converter, conf) ctx.var.llm_prompt_tokens = ctx.ai_token_usage.prompt_tokens or 0 ctx.var.llm_completion_tokens = ctx.ai_token_usage.completion_tokens or 0 ctx.var.llm_cache_read_input_tokens = ctx.ai_token_usage.cache_read_input_tokens or 0 - ctx.var.llm_cache_creation_input_tokens = - ctx.ai_token_usage.cache_creation_input_tokens or 0 + ctx.var.llm_cache_creation_input_tokens = ctx.ai_token_usage.cache_creation_input_tokens or 0 ctx.var.llm_reasoning_tokens = ctx.ai_token_usage.reasoning_tokens or 0 local response_text = client_proto.extract_response_text(res_body) @@ -450,13 +456,10 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co local bytes_read = 0 -- streaming_flush_interval_ms controls both flush strategy and the thread: - -- == 0 : no thread; lua_response_filter flushes synchronously - -- per chunk via ngx.flush(true), guaranteeing immediate - -- client delivery. - -- > 0 (default: 10): background thread calls ngx.flush(false) every N ms; - -- lua_response_filter skips per-chunk flush for maximum - -- throughput. Useful when the upstream bursts multiple - -- tokens at once. + -- == 0 (default): no thread; lua_response_filter flushes synchronously + -- per chunk, guaranteeing immediate client delivery. + -- > 0 : background thread handles periodic flushing; + -- lua_response_filter skips flush for maximum throughput. 
local flush_interval_ms = conf and conf.streaming_flush_interval_ms or 0 -- async_flush: true when the interval thread is responsible for flushing local async_flush = flush_interval_ms > 0 @@ -715,15 +718,11 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co return status, limit_hit .. " exceeded" end - -- WORKAROUND, not a real fix: yield to the nginx scheduler so other - -- coroutines on this worker (health checks, concurrent requests) can - -- run. body_reader() and ngx.flush() do not yield when the upstream - -- socket already has data buffered or the downstream client drains - -- immediately, so under bursty SSE upstreams this loop can monopolize - -- the worker CPU. ngx.sleep(0) only prevents a single request from - -- monopolizing the worker; it does not bound per-stream CPU time, add - -- backpressure, or time out stalled streams. See #13256 for a proper - -- solution. + -- Yield to the nginx scheduler so other coroutines on this worker + -- (health checks, concurrent requests) can run. body_reader() and + -- ngx.flush() do not yield when the upstream socket already has data + -- buffered or the downstream client drains immediately, so under + -- bursty SSE upstreams this loop can monopolize the worker CPU. 
ngx.sleep(0) end From c6ae7d0a1f8c75fe531d2925e210362b3ecc700c Mon Sep 17 00:00:00 2001 From: rongxin Date: Fri, 5 Jun 2026 14:36:36 +0800 Subject: [PATCH 07/14] test(ai-proxy): add missing protocol-specific and bug-fix test coverage - Fix TEST 10 regex to verify cache_creation_input_tokens and reasoning_tokens - Add TEST 38-40 in ai-proxy.t: Responses API streaming cache tokens, reasoning tokens, and function_call tool detection - Add fixture files: responses-streaming-with-cache.sse, responses-streaming-with-tool-call.sse --- .../openai/responses-streaming-with-cache.sse | 5 + .../responses-streaming-with-tool-call.sse | 2 + t/plugin/ai-proxy.t | 130 ++++++++++-------- t/plugin/ai-proxy3.t | 15 +- 4 files changed, 79 insertions(+), 73 deletions(-) create mode 100644 t/fixtures/openai/responses-streaming-with-cache.sse create mode 100644 t/fixtures/openai/responses-streaming-with-tool-call.sse diff --git a/t/fixtures/openai/responses-streaming-with-cache.sse b/t/fixtures/openai/responses-streaming-with-cache.sse new file mode 100644 index 000000000000..d6defd5981df --- /dev/null +++ b/t/fixtures/openai/responses-streaming-with-cache.sse @@ -0,0 +1,5 @@ +event: response.output_text.delta +data: {"type":"response.output_text.delta","delta":"Hello"} + +event: response.completed +data: {"type":"response.completed","response":{"output":[{"type":"message","content":[{"type":"output_text","text":"Hello"}]}],"usage":{"input_tokens":20,"output_tokens":5,"total_tokens":25,"input_tokens_details":{"cached_tokens":10},"output_tokens_details":{"reasoning_tokens":3}}}} diff --git a/t/fixtures/openai/responses-streaming-with-tool-call.sse b/t/fixtures/openai/responses-streaming-with-tool-call.sse new file mode 100644 index 000000000000..1051fce857bc --- /dev/null +++ b/t/fixtures/openai/responses-streaming-with-tool-call.sse @@ -0,0 +1,2 @@ +event: response.completed +data: 
{"type":"response.completed","response":{"output":[{"type":"function_call","id":"call_1","name":"get_weather","arguments":"{}"}],"usage":{"input_tokens":20,"output_tokens":5,"total_tokens":25}}} diff --git a/t/plugin/ai-proxy.t b/t/plugin/ai-proxy.t index e73e41a86a4d..8a4cc5b03d1f 100644 --- a/t/plugin/ai-proxy.t +++ b/t/plugin/ai-proxy.t @@ -15,7 +15,6 @@ # limitations under the License. # - BEGIN { $ENV{TEST_ENABLE_CONTROL_API_V1} = "0"; } @@ -382,8 +381,8 @@ qr/path override works/ content_by_lua_block { local t = require("lib.test_admin").test local code, body = t('/apisix/admin/routes/1', - ngx.HTTP_PUT, - [[{ + ngx.HTTP_PUT, + [[{ "uri": "/anything", "plugins": { "ai-proxy": { @@ -615,7 +614,7 @@ POST /post "temperature": 1.0 }, "override": { - "endpoint": "http://localhost:6724" + "endpoint": "http://127.0.0.1:1980" }, "ssl_verify": false } @@ -635,20 +634,6 @@ passed === TEST 20: send request ---- http_config - server { - server_name openai; - listen 6724; - - default_type 'application/json'; - - location /v1/chat/completions { - content_by_lua_block { - local json = require("cjson.safe") - ngx.say(json.encode(ngx.req.get_headers())) - } - } - } --- request POST /anything { "messages": [ { "role": "system", "content": "You are a mathematician" }, { "role": "user", "content": "What is 1+1?"} ] } @@ -683,7 +668,7 @@ qr/"test-type":"header_forwarding"/ "temperature": 1.0 }, "override": { - "endpoint": "http://localhost:6724" + "endpoint": "http://127.0.0.1:1980" }, "ssl_verify": false }, @@ -705,20 +690,6 @@ passed === TEST 22: send request ---- http_config - server { - server_name openai; - listen 6724; - - default_type 'application/json'; - - location /v1/chat/completions { - content_by_lua_block { - local json = require("cjson.safe") - ngx.say(json.encode(ngx.req.get_headers())) - } - } - } --- request POST /anything { "messages": [ { "role": "system", "content": "You are a mathematician" }, { "role": "user", "content": "What is 1+1?"} ] } @@ -731,44 
+702,17 @@ qr/"x-request-id":"[\d\w-]+"/ === TEST 23: send request with Authorization header ---- http_config - server { - server_name openai; - listen 6724; - - default_type 'application/json'; - - location /v1/chat/completions { - content_by_lua_block { - ngx.status = 200 - ngx.say("{}") - } - } - } --- request POST /anything { "messages": [ { "role": "system", "content": "You are a mathematician" }, { "role": "user", "content": "What is 1+1?"} ] } --- more_headers Authorization: Bearer wrong token +X-AI-Fixture: openai/chat-basic.json --- error_code: 200 === TEST 24b: Accept-Encoding header should be stripped before forwarding to provider ---- http_config - server { - server_name openai; - listen 6724; - - default_type 'application/json'; - - location /v1/chat/completions { - content_by_lua_block { - local json = require("cjson.safe") - ngx.say(json.encode(ngx.req.get_headers())) - } - } - } --- request POST /anything { "messages": [ { "role": "system", "content": "You are a mathematician" }, { "role": "user", "content": "What is 1+1?"} ] } @@ -1263,3 +1207,67 @@ got token usage from ai service: } --- response_body OK: auth.query is clean + + + +=== TEST 38: set route for Responses API built-in var tests +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/2', + ngx.HTTP_PUT, + [[{ + "uri": "/v1/responses", + "plugins": { + "ai-proxy": { + "provider": "openai", + "auth": { + "header": { + "Authorization": "Bearer test-key" + } + }, + "options": { + "model": "gpt-4o-mini" + }, + "override": { + "endpoint": "http://127.0.0.1:1980" + }, + "ssl_verify": false + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 39: Responses API streaming sets llm_cache_read_input_tokens and llm_reasoning_tokens +--- request +POST /v1/responses +{"input":"Hello","model":"gpt-4o-mini","stream":true} +--- more_headers 
+X-AI-Fixture: openai/responses-streaming-with-cache.sse +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o-mini gpt-4o-mini [\d.]+ 20 5 25 true false 0 \S* 10 0 3/ + + + +=== TEST 40: Responses API streaming detects function_call in response.output as tool call +--- request +POST /v1/responses +{"input":"What is the weather?","model":"gpt-4o-mini","stream":true} +--- more_headers +X-AI-Fixture: openai/responses-streaming-with-tool-call.sse +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o-mini gpt-4o-mini [\d.]+ 20 5 25 true true 0 / diff --git a/t/plugin/ai-proxy3.t b/t/plugin/ai-proxy3.t index 4ab6646704b2..d2ecb2e988ef 100644 --- a/t/plugin/ai-proxy3.t +++ b/t/plugin/ai-proxy3.t @@ -23,15 +23,6 @@ no_long_string(); no_root_location(); -my $resp_file = 't/assets/ai-proxy-response.json'; -open(my $fh, '<', $resp_file) or die "Could not open file '$resp_file' $!"; -my $resp = do { local $/; <$fh> }; -close($fh); - -print "Hello, World!\n"; -print $resp; - - add_block_preprocessor(sub { my ($block) = @_; @@ -96,7 +87,7 @@ X-AI-Fixture: openai/chat-basic.json --- response_body eval qr/.*completion_tokens.*/ --- access_log eval -qr/127\.0\.0\.1:1980 200 [\d.]+ \"http:\/\/127\.0\.0\.1\/v1\/chat\/completions\" gpt-4 gpt-3.5-turbo [\d.]+ 23 8.*/ +qr/127\.0\.0\.1:1980 200 [\d.]+ \"http:\/\/\S+\/v1\/chat\/completions\" gpt-4 gpt-3.5-turbo [\d.]+ 23 8.*/ @@ -256,7 +247,7 @@ passed --- response_body_like eval qr/6data: \[DONE\]\n\n/ --- access_log eval -qr/localhost:7737 200 [\d.]+ \"http:\/\/localhost\/v1\/chat\/completions\" gpt-4 gpt-3.5-turbo 2\d\d 15 20.*/ +qr/localhost:7737 200 [\d.]+ \"http:\/\/\S+\/v1\/chat\/completions\" gpt-4 gpt-3.5-turbo 2\d\d 15 20.*/ @@ -336,7 +327,7 @@ X-AI-Fixture: openai/chat-with-reasoning.json --- response_body eval qr/.*completion_tokens.*/ --- access_log eval -qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 30 15 45 false false 0 \S* 10/ 
+qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 30 15 45 false false 0 \S* 10 5 0/ From 444892657ddbff1544754a6815b4af072b0f6cf1 Mon Sep 17 00:00:00 2001 From: rongxin Date: Fri, 5 Jun 2026 14:41:35 +0800 Subject: [PATCH 08/14] test(ai-proxy): add OpenAI Chat streaming tool_calls detection test Add chat-streaming-with-tool-calls.sse fixture and TEST 13 in ai-proxy3.t to cover the choice.delta.tool_calls path in openai-chat parse_sse_event. --- t/fixtures/openai/chat-streaming-with-tool-calls.sse | 7 +++++++ t/plugin/ai-proxy3.t | 12 ++++++++++++ 2 files changed, 19 insertions(+) create mode 100644 t/fixtures/openai/chat-streaming-with-tool-calls.sse diff --git a/t/fixtures/openai/chat-streaming-with-tool-calls.sse b/t/fixtures/openai/chat-streaming-with-tool-calls.sse new file mode 100644 index 000000000000..e17710263d15 --- /dev/null +++ b/t/fixtures/openai/chat-streaming-with-tool-calls.sse @@ -0,0 +1,7 @@ +data: {"id":"chatcmpl-1","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant","tool_calls":[{"index":0,"id":"call_1","type":"function","function":{"name":"get_weather","arguments":""}}]},"index":0,"finish_reason":null}],"usage":null} + +data: {"id":"chatcmpl-1","object":"chat.completion.chunk","choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{}"}}]},"index":0,"finish_reason":"tool_calls"}],"usage":null} + +data: {"id":"chatcmpl-1","object":"chat.completion.chunk","choices":[{"delta":{},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":15,"completion_tokens":10,"total_tokens":25}} + +data: [DONE] diff --git a/t/plugin/ai-proxy3.t b/t/plugin/ai-proxy3.t index d2ecb2e988ef..39326ead80cb 100644 --- a/t/plugin/ai-proxy3.t +++ b/t/plugin/ai-proxy3.t @@ -352,3 +352,15 @@ X-AI-Fixture: openai/chat-basic.json --- error_code: 200 --- access_log eval qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 23 8 31 false false 0 user-xyz/ + + + +=== TEST 13: OpenAI Chat streaming detects tool_calls delta 
and sets llm_has_tool_calls=true +--- request +POST /log-vars +{"messages":[{"role":"user","content":"What is the weather?"}],"model":"gpt-4o","stream":true} +--- more_headers +X-AI-Fixture: openai/chat-streaming-with-tool-calls.sse +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 15 10 25 true true 0 / From a602d1884aeb382ead51894604ee1ee25def4693 Mon Sep 17 00:00:00 2001 From: rongxin Date: Fri, 5 Jun 2026 15:33:24 +0800 Subject: [PATCH 09/14] fix(test): overwrite route 1 in TEST 38 to avoid URI conflict with TEST 35 TEST 35 creates route 1 at uris=[/anything, /v1/responses]. TEST 38 was creating a new route 2, leaving both routes matching /v1/responses with different model configs (gpt-4o vs gpt-4o-mini). APISIX hit route 1 so the access log showed gpt-4o instead of gpt-4o-mini, causing TEST 39/40 to fail. Fix by reusing route ID 1 in TEST 38. --- t/plugin/ai-proxy.t | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/plugin/ai-proxy.t b/t/plugin/ai-proxy.t index 8a4cc5b03d1f..e6ee200d69ee 100644 --- a/t/plugin/ai-proxy.t +++ b/t/plugin/ai-proxy.t @@ -1215,7 +1215,7 @@ OK: auth.query is clean location /t { content_by_lua_block { local t = require("lib.test_admin").test - local code, body = t('/apisix/admin/routes/2', + local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, [[{ "uri": "/v1/responses", From 04e07644685f2fc31451a775a8e37b8ab1a08dd8 Mon Sep 17 00:00:00 2001 From: rongxin Date: Fri, 5 Jun 2026 16:09:19 +0800 Subject: [PATCH 10/14] fix(test): move Responses API var tests to ai-proxy3.t, fix URI detection The tests in ai-proxy.t (TEST 38-40) conflicted with the existing route 1 from TEST 35 (uris=[/anything, /v1/responses]) despite the PUT overwrite, causing access_log pattern mismatch. Moving the tests to ai-proxy3.t where all built-in var tests live avoids this interference. 
Route URI /ai/v1/responses is used so openai-responses matches() detects it (string_sub(uri, -13) == '/v1/responses'), while staying isolated from the existing /v1/responses route in ai-proxy.t. --- t/plugin/ai-proxy.t | 64 -------------------------------------------- t/plugin/ai-proxy3.t | 64 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 64 deletions(-) diff --git a/t/plugin/ai-proxy.t b/t/plugin/ai-proxy.t index e6ee200d69ee..1e3b2ecd8d16 100644 --- a/t/plugin/ai-proxy.t +++ b/t/plugin/ai-proxy.t @@ -1207,67 +1207,3 @@ got token usage from ai service: } --- response_body OK: auth.query is clean - - - -=== TEST 38: set route for Responses API built-in var tests ---- config - location /t { - content_by_lua_block { - local t = require("lib.test_admin").test - local code, body = t('/apisix/admin/routes/1', - ngx.HTTP_PUT, - [[{ - "uri": "/v1/responses", - "plugins": { - "ai-proxy": { - "provider": "openai", - "auth": { - "header": { - "Authorization": "Bearer test-key" - } - }, - "options": { - "model": "gpt-4o-mini" - }, - "override": { - "endpoint": "http://127.0.0.1:1980" - }, - "ssl_verify": false - } - } - }]] - ) - - if code >= 300 then - ngx.status = code - end - ngx.say(body) - } - } ---- response_body -passed - - - -=== TEST 39: Responses API streaming sets llm_cache_read_input_tokens and llm_reasoning_tokens ---- request -POST /v1/responses -{"input":"Hello","model":"gpt-4o-mini","stream":true} ---- more_headers -X-AI-Fixture: openai/responses-streaming-with-cache.sse ---- error_code: 200 ---- access_log eval -qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o-mini gpt-4o-mini [\d.]+ 20 5 25 true false 0 \S* 10 0 3/ - - - -=== TEST 40: Responses API streaming detects function_call in response.output as tool call ---- request -POST /v1/responses -{"input":"What is the weather?","model":"gpt-4o-mini","stream":true} ---- more_headers -X-AI-Fixture: openai/responses-streaming-with-tool-call.sse ---- error_code: 200 ---- access_log 
eval -qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o-mini gpt-4o-mini [\d.]+ 20 5 25 true true 0 / diff --git a/t/plugin/ai-proxy3.t b/t/plugin/ai-proxy3.t index 39326ead80cb..a75656fd3dad 100644 --- a/t/plugin/ai-proxy3.t +++ b/t/plugin/ai-proxy3.t @@ -364,3 +364,67 @@ X-AI-Fixture: openai/chat-streaming-with-tool-calls.sse --- error_code: 200 --- access_log eval qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 15 10 25 true true 0 / + + + +=== TEST 14: set route for Responses API built-in var tests +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/3', + ngx.HTTP_PUT, + [[{ + "uri": "/ai/v1/responses", + "plugins": { + "ai-proxy": { + "provider": "openai", + "auth": { + "header": { + "Authorization": "Bearer test-key" + } + }, + "options": { + "model": "gpt-4o-mini" + }, + "override": { + "endpoint": "http://127.0.0.1:1980" + }, + "ssl_verify": false + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 15: Responses API streaming sets llm_cache_read_input_tokens and llm_reasoning_tokens +--- request +POST /ai/v1/responses +{"input":"Hello","model":"gpt-4o-mini","stream":true} +--- more_headers +X-AI-Fixture: openai/responses-streaming-with-cache.sse +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o-mini gpt-4o-mini [\d.]+ 20 5 25 true false 0 \S* 10 0 3/ + + + +=== TEST 16: Responses API streaming detects function_call in response.output as tool call +--- request +POST /ai/v1/responses +{"input":"What is the weather?","model":"gpt-4o-mini","stream":true} +--- more_headers +X-AI-Fixture: openai/responses-streaming-with-tool-call.sse +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o-mini gpt-4o-mini [\d.]+ 20 5 25 true true 0 / From cbc98d97a86b510b1c04e9b02ce16980796d5bdb Mon Sep 17 00:00:00 2001 From: 
rongxin Date: Fri, 5 Jun 2026 16:49:33 +0800 Subject: [PATCH 11/14] fix(test): terminate Responses API SSE fixtures with blank line The SSE codec delimits events on a blank line. The two Responses API streaming fixtures ended with a single newline after the last data line, so the final response.completed frame (which carries usage and the function_call output) was buffered and dropped at EOF, leaving all token vars at 0 and llm_has_tool_calls=false. Add the trailing blank line. --- t/fixtures/openai/responses-streaming-with-cache.sse | 1 + t/fixtures/openai/responses-streaming-with-tool-call.sse | 1 + 2 files changed, 2 insertions(+) diff --git a/t/fixtures/openai/responses-streaming-with-cache.sse b/t/fixtures/openai/responses-streaming-with-cache.sse index d6defd5981df..67953da93ac0 100644 --- a/t/fixtures/openai/responses-streaming-with-cache.sse +++ b/t/fixtures/openai/responses-streaming-with-cache.sse @@ -3,3 +3,4 @@ data: {"type":"response.output_text.delta","delta":"Hello"} event: response.completed data: {"type":"response.completed","response":{"output":[{"type":"message","content":[{"type":"output_text","text":"Hello"}]}],"usage":{"input_tokens":20,"output_tokens":5,"total_tokens":25,"input_tokens_details":{"cached_tokens":10},"output_tokens_details":{"reasoning_tokens":3}}}} + diff --git a/t/fixtures/openai/responses-streaming-with-tool-call.sse b/t/fixtures/openai/responses-streaming-with-tool-call.sse index 1051fce857bc..4c9dbc125ee6 100644 --- a/t/fixtures/openai/responses-streaming-with-tool-call.sse +++ b/t/fixtures/openai/responses-streaming-with-tool-call.sse @@ -1,2 +1,3 @@ event: response.completed data: {"type":"response.completed","response":{"output":[{"type":"function_call","id":"call_1","name":"get_weather","arguments":"{}"}],"usage":{"input_tokens":20,"output_tokens":5,"total_tokens":25}}} + From e4f02fa4ae3c34d213ac4f0c5e6a424fd06d62c1 Mon Sep 17 00:00:00 2001 From: rongxin Date: Mon, 8 Jun 2026 09:40:00 +0800 Subject: [PATCH 12/14] 
test(ai-proxy): cover cache/reasoning token vars across protocols Add access-log assertions for previously-uncovered branches: - openai-chat streaming cache/reasoning tokens - openai-responses non-streaming cache/reasoning extract_usage - anthropic native streaming/non-streaming token vars (total accumulation, tool_use detection, cache tokens) --- .../messages-streaming-with-tool-use.sse | 14 +++ .../openai/chat-streaming-with-cache.sse | 6 ++ t/fixtures/openai/responses-with-cache.json | 22 +++++ t/plugin/ai-proxy-anthropic.t | 90 +++++++++++++++++++ t/plugin/ai-proxy3.t | 26 ++++++ 5 files changed, 158 insertions(+) create mode 100644 t/fixtures/anthropic/messages-streaming-with-tool-use.sse create mode 100644 t/fixtures/openai/chat-streaming-with-cache.sse create mode 100644 t/fixtures/openai/responses-with-cache.json diff --git a/t/fixtures/anthropic/messages-streaming-with-tool-use.sse b/t/fixtures/anthropic/messages-streaming-with-tool-use.sse new file mode 100644 index 000000000000..425626a0728f --- /dev/null +++ b/t/fixtures/anthropic/messages-streaming-with-tool-use.sse @@ -0,0 +1,14 @@ +event: message_start +data: {"type":"message_start","message":{"id":"msg_123","type":"message","role":"assistant","content":[],"usage":{"input_tokens":20,"output_tokens":0}}} + +event: content_block_start +data: {"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"toolu_1","name":"get_weather"}} + +event: content_block_stop +data: {"type":"content_block_stop","index":0} + +event: message_delta +data: {"type":"message_delta","delta":{"stop_reason":"tool_use"},"usage":{"output_tokens":5}} + +event: message_stop +data: {} diff --git a/t/fixtures/openai/chat-streaming-with-cache.sse b/t/fixtures/openai/chat-streaming-with-cache.sse new file mode 100644 index 000000000000..11757f64553e --- /dev/null +++ b/t/fixtures/openai/chat-streaming-with-cache.sse @@ -0,0 +1,6 @@ +data: 
{"id":"chatcmpl-cache1","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant","content":"Hi"},"index":0,"finish_reason":null}],"usage":null} + +data: {"id":"chatcmpl-cache1","object":"chat.completion.chunk","choices":[{"delta":{},"index":0,"finish_reason":"stop"}],"usage":{"prompt_tokens":30,"completion_tokens":15,"total_tokens":45,"prompt_tokens_details":{"cached_tokens":10,"cache_creation_input_tokens":5},"completion_tokens_details":{"reasoning_tokens":7}}} + +data: [DONE] + diff --git a/t/fixtures/openai/responses-with-cache.json b/t/fixtures/openai/responses-with-cache.json new file mode 100644 index 000000000000..39c72846fd1e --- /dev/null +++ b/t/fixtures/openai/responses-with-cache.json @@ -0,0 +1,22 @@ +{ + "id": "resp_cache1", + "object": "response", + "created_at": 1723780938, + "model": "{{model}}", + "output": [ + { + "type": "message", + "role": "assistant", + "content": [ + { "type": "output_text", "text": "Hello" } + ] + } + ], + "usage": { + "input_tokens": 40, + "output_tokens": 20, + "total_tokens": 60, + "input_tokens_details": { "cached_tokens": 12 }, + "output_tokens_details": { "reasoning_tokens": 8 } + } +} diff --git a/t/plugin/ai-proxy-anthropic.t b/t/plugin/ai-proxy-anthropic.t index e5912d192499..9b1400b4ed9f 100644 --- a/t/plugin/ai-proxy-anthropic.t +++ b/t/plugin/ai-proxy-anthropic.t @@ -36,6 +36,7 @@ add_block_preprocessor(sub { my $user_yaml_config = <<_EOC_; plugins: + - ai-proxy - ai-proxy-multi _EOC_ $block->set_value("extra_yaml_config", $user_yaml_config); @@ -1666,3 +1667,92 @@ OK OK --- no_error_log [error] + + + +=== TEST 47: set route for native Anthropic protocol built-in var tests +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "uri": "/v1/messages", + "plugins": { + "ai-proxy": { + "provider": "anthropic", + "auth": { + "header": { + "x-api-key": "test-key" + } + }, + "options": { + 
"model": "claude-3-5-sonnet-20241022" + }, + "override": { + "endpoint": "http://127.0.0.1:1980" + }, + "ssl_verify": false + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 48: Anthropic streaming accumulates total_tokens from split message_start and message_delta events +--- request +POST /v1/messages +{"messages":[{"role":"user","content":"Hello"}],"model":"claude-3-5-sonnet-20241022","max_tokens":1024,"stream":true} +--- more_headers +X-AI-Fixture: anthropic/messages-streaming.sse +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" claude-3-5-sonnet-20241022 claude-3-5-sonnet-20241022 [\d.]+ 10 8 18 true false 0 / + + + +=== TEST 49: Anthropic streaming detects tool_use in content_block_start as tool call +--- request +POST /v1/messages +{"messages":[{"role":"user","content":"What is the weather?"}],"model":"claude-3-5-sonnet-20241022","max_tokens":1024,"stream":true} +--- more_headers +X-AI-Fixture: anthropic/messages-streaming-with-tool-use.sse +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" claude-3-5-sonnet-20241022 claude-3-5-sonnet-20241022 [\d.]+ 20 5 25 true true 0 / + + + +=== TEST 50: Anthropic non-streaming writes cache tokens to access log +--- request +POST /v1/messages +{"messages":[{"role":"user","content":"Hello"}],"model":"claude-3-5-sonnet-20241022","max_tokens":1024} +--- more_headers +X-AI-Fixture: anthropic/messages-with-cache.json +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" claude-3-5-sonnet-20241022 claude-3-5-sonnet-20241022 [\d.]+ 50 30 80 false false 0 \S* 200 100 0/ + + + +=== TEST 51: Anthropic streaming writes cache tokens to access log +--- request +POST /v1/messages +{"messages":[{"role":"user","content":"Hello"}],"model":"claude-3-5-sonnet-20241022","max_tokens":1024,"stream":true} +--- more_headers +X-AI-Fixture: 
anthropic/messages-streaming-with-cache.sse +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" claude-3-5-sonnet-20241022 claude-3-5-sonnet-20241022 [\d.]+ 50 30 80 true false 0 \S* 200 100 0/ + diff --git a/t/plugin/ai-proxy3.t b/t/plugin/ai-proxy3.t index a75656fd3dad..501c849522a6 100644 --- a/t/plugin/ai-proxy3.t +++ b/t/plugin/ai-proxy3.t @@ -428,3 +428,29 @@ X-AI-Fixture: openai/responses-streaming-with-tool-call.sse --- error_code: 200 --- access_log eval qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o-mini gpt-4o-mini [\d.]+ 20 5 25 true true 0 / + + + +=== TEST 17: OpenAI Chat streaming writes cache and reasoning tokens to access log +--- request +POST /log-vars +{"messages":[{"role":"user","content":"Solve this"}],"model":"gpt-4o","stream":true} +--- more_headers +X-AI-Fixture: openai/chat-streaming-with-cache.sse +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 30 15 45 true false 0 \S* 10 5 7/ + + + +=== TEST 18: Responses API non-streaming writes cache and reasoning tokens to access log +--- request +POST /ai/v1/responses +{"input":"Solve this","model":"gpt-4o-mini"} +--- more_headers +X-AI-Fixture: openai/responses-with-cache.json +--- error_code: 200 +--- response_body eval +qr/.*output.*/ +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o-mini gpt-4o-mini [\d.]+ 40 20 60 false false 0 \S* 12 0 8/ From 83fb56b831e2c0d21169b87ed42711f8ee1c9359 Mon Sep 17 00:00:00 2001 From: rongxin Date: Mon, 8 Jun 2026 10:24:53 +0800 Subject: [PATCH 13/14] style(test): remove trailing blank line to satisfy reindex lint --- t/plugin/ai-proxy-anthropic.t | 1 - 1 file changed, 1 deletion(-) diff --git a/t/plugin/ai-proxy-anthropic.t b/t/plugin/ai-proxy-anthropic.t index 9b1400b4ed9f..67e435bbb883 100644 --- a/t/plugin/ai-proxy-anthropic.t +++ b/t/plugin/ai-proxy-anthropic.t @@ -1755,4 +1755,3 @@ X-AI-Fixture: anthropic/messages-streaming-with-cache.sse --- error_code: 
200 --- access_log eval qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" claude-3-5-sonnet-20241022 claude-3-5-sonnet-20241022 [\d.]+ 50 30 80 true false 0 \S* 200 100 0/ - From ade8780d69db7127a61be4bfd4b11321c31b014e Mon Sep 17 00:00:00 2001 From: rongxin Date: Mon, 8 Jun 2026 16:40:02 +0800 Subject: [PATCH 14/14] refactor(ai-proxy): move per-protocol extraction into protocol adapters Move tool-call detection and end-user-id extraction out of the centralized ai-proxy/base.lua switches into per-adapter has_tool_call/extract_end_user_id methods, consistent with extract_usage/extract_response_text. Unify all non-streaming llm_* token/tool var assignment inside parse_response (mirrors parse_streaming_response), hoist target_proto_module to a single lookup, and add total_tokens to the llm_summary for logger plugins. --- .../ai-protocols/anthropic-messages.lua | 29 ++++++ apisix/plugins/ai-protocols/openai-chat.lua | 59 +++++++++--- .../plugins/ai-protocols/openai-responses.lua | 29 ++++++ apisix/plugins/ai-providers/base.lua | 7 ++ apisix/plugins/ai-proxy/base.lua | 89 ++++--------------- 5 files changed, 127 insertions(+), 86 deletions(-) diff --git a/apisix/plugins/ai-protocols/anthropic-messages.lua b/apisix/plugins/ai-protocols/anthropic-messages.lua index 0b9ffda08606..a777f5136228 100644 --- a/apisix/plugins/ai-protocols/anthropic-messages.lua +++ b/apisix/plugins/ai-protocols/anthropic-messages.lua @@ -112,6 +112,7 @@ function _M.parse_sse_event(event, ctx, state) total_tokens = (usage.input_tokens or 0) + (usage.output_tokens or 0), cache_read_input_tokens = usage.cache_read_input_tokens or 0, cache_creation_input_tokens = usage.cache_creation_input_tokens or 0, + reasoning_tokens = 0, }, raw_usage = usage, } @@ -181,10 +182,38 @@ function _M.extract_usage(res_body) total_tokens = prompt + completion, cache_read_input_tokens = raw.cache_read_input_tokens or 0, cache_creation_input_tokens = raw.cache_creation_input_tokens or 0, + reasoning_tokens = 0, }, raw end +--- 
Detect whether a non-streaming response contains tool calls. +function _M.has_tool_call(res_body) + if type(res_body) ~= "table" or type(res_body.content) ~= "table" then + return false + end + for _, block in ipairs(res_body.content) do + if type(block) == "table" and block.type == "tool_use" then + return true + end + end + return false +end + + +--- Extract the end-user identifier from a request body. +function _M.extract_end_user_id(body) + if type(body) ~= "table" then + return nil + end + local meta = body.metadata + if type(meta) == "table" and type(meta.user_id) == "string" then + return meta.user_id + end + return nil +end + + --- Extract all text content from a request body for moderation. function _M.extract_request_content(body) local contents = {} diff --git a/apisix/plugins/ai-protocols/openai-chat.lua b/apisix/plugins/ai-protocols/openai-chat.lua index 5fb37dac2b5d..096af76dfe34 100644 --- a/apisix/plugins/ai-protocols/openai-chat.lua +++ b/apisix/plugins/ai-protocols/openai-chat.lua @@ -100,21 +100,20 @@ function _M.parse_sse_event(event, ctx, state) -- Extract usage (null for non-final chunks; cjson decodes null as userdata) if type(data.usage) == "table" then + local u = data.usage + local pd = type(u.prompt_tokens_details) == "table" and u.prompt_tokens_details + local cd = type(u.completion_tokens_details) == "table" and u.completion_tokens_details result.type = "usage" - local pd = type(data.usage.prompt_tokens_details) == "table" - and data.usage.prompt_tokens_details - local cd = type(data.usage.completion_tokens_details) == "table" - and data.usage.completion_tokens_details result.usage = { - prompt_tokens = data.usage.prompt_tokens or 0, - completion_tokens = data.usage.completion_tokens or 0, - total_tokens = data.usage.total_tokens or 0, + prompt_tokens = u.prompt_tokens or 0, + completion_tokens = u.completion_tokens or 0, + total_tokens = u.total_tokens or 0, cache_read_input_tokens = pd and pd.cached_tokens - or 
data.usage.prompt_cache_hit_tokens or 0, + or u.prompt_cache_hit_tokens or 0, cache_creation_input_tokens = pd and pd.cache_creation_input_tokens or 0, reasoning_tokens = cd and cd.reasoning_tokens or 0, } - result.raw_usage = data.usage + result.raw_usage = u end return result @@ -172,21 +171,53 @@ function _M.extract_usage(res_body) return nil, nil end local raw = res_body.usage - local pd = type(raw.prompt_tokens_details) == "table" and raw.prompt_tokens_details - local cd = type(raw.completion_tokens_details) == "table" and raw.completion_tokens_details + local pdetails = type(raw.prompt_tokens_details) == "table" and raw.prompt_tokens_details + local cdetails = type(raw.completion_tokens_details) == "table" + and raw.completion_tokens_details -- OpenAI uses prompt_tokens_details.cached_tokens; DeepSeek uses prompt_cache_hit_tokens - local cache_read = pd and pd.cached_tokens or raw.prompt_cache_hit_tokens or 0 + local cache_read = pdetails and pdetails.cached_tokens or raw.prompt_cache_hit_tokens or 0 return { prompt_tokens = raw.prompt_tokens or 0, completion_tokens = raw.completion_tokens or 0, total_tokens = raw.total_tokens or (raw.prompt_tokens or 0) + (raw.completion_tokens or 0), cache_read_input_tokens = cache_read, - cache_creation_input_tokens = pd and pd.cache_creation_input_tokens or 0, - reasoning_tokens = cd and cd.reasoning_tokens or 0, + cache_creation_input_tokens = pdetails and pdetails.cache_creation_input_tokens or 0, + reasoning_tokens = cdetails and cdetails.reasoning_tokens or 0, }, raw end +--- Detect whether a non-streaming response contains tool calls. 
+function _M.has_tool_call(res_body) + if type(res_body) ~= "table" or type(res_body.choices) ~= "table" then + return false + end + for _, choice in ipairs(res_body.choices) do + if type(choice) == "table" and type(choice.message) == "table" + and type(choice.message.tool_calls) == "table" + and #choice.message.tool_calls > 0 then + return true + end + end + return false +end + + +--- Extract the end-user identifier from a request body. +function _M.extract_end_user_id(body) + if type(body) ~= "table" then + return nil + end + if type(body.safety_identifier) == "string" then + return body.safety_identifier + end + if type(body.user) == "string" then + return body.user + end + return nil +end + + --- Extract all text content from a request body for moderation. function _M.extract_request_content(body) local contents = {} diff --git a/apisix/plugins/ai-protocols/openai-responses.lua b/apisix/plugins/ai-protocols/openai-responses.lua index b0257c3410d2..9e78b3fb5979 100644 --- a/apisix/plugins/ai-protocols/openai-responses.lua +++ b/apisix/plugins/ai-protocols/openai-responses.lua @@ -164,6 +164,35 @@ function _M.extract_usage(res_body) end +--- Detect whether a non-streaming response contains tool calls. +function _M.has_tool_call(res_body) + if type(res_body) ~= "table" or type(res_body.output) ~= "table" then + return false + end + for _, item in ipairs(res_body.output) do + if type(item) == "table" and item.type == "function_call" then + return true + end + end + return false +end + + +--- Extract the end-user identifier from a request body. +function _M.extract_end_user_id(body) + if type(body) ~= "table" then + return nil + end + if type(body.safety_identifier) == "string" then + return body.safety_identifier + end + if type(body.user) == "string" then + return body.user + end + return nil +end + + --- Extract all text content from a request body for moderation. 
function _M.extract_request_content(body) local contents = {} diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua index 3ed63aa499a8..227889f58a02 100644 --- a/apisix/plugins/ai-providers/base.lua +++ b/apisix/plugins/ai-providers/base.lua @@ -403,6 +403,7 @@ function _M.parse_response(self, ctx, res, client_proto, converter, conf) end ctx.var.llm_prompt_tokens = ctx.ai_token_usage.prompt_tokens or 0 ctx.var.llm_completion_tokens = ctx.ai_token_usage.completion_tokens or 0 + ctx.var.llm_total_tokens = ctx.ai_token_usage.total_tokens or 0 ctx.var.llm_cache_read_input_tokens = ctx.ai_token_usage.cache_read_input_tokens or 0 ctx.var.llm_cache_creation_input_tokens = ctx.ai_token_usage.cache_creation_input_tokens or 0 ctx.var.llm_reasoning_tokens = ctx.ai_token_usage.reasoning_tokens or 0 @@ -412,6 +413,12 @@ function _M.parse_response(self, ctx, res, client_proto, converter, conf) ctx.var.llm_response_text = response_text end + -- Detect tool calls (mirrors the streaming path, which sets this from + -- parse_sse_event.has_tool_call). Each client protocol knows its own shape. 
+ if client_proto.has_tool_call and client_proto.has_tool_call(res_body) then + ctx.var.llm_has_tool_calls = "true" + end + plugin.lua_response_filter(ctx, headers, raw_res_body) return res_body end diff --git a/apisix/plugins/ai-proxy/base.lua b/apisix/plugins/ai-proxy/base.lua index d7b7e1cfd86a..1f4003f427f9 100644 --- a/apisix/plugins/ai-proxy/base.lua +++ b/apisix/plugins/ai-proxy/base.lua @@ -22,7 +22,6 @@ local require = require local pcall = pcall local pairs = pairs local type = type -local ipairs = ipairs local table = table local exporter = require("apisix.plugins.prometheus.exporter") local protocols = require("apisix.plugins.ai-protocols") @@ -30,28 +29,12 @@ local transport_http = require("apisix.plugins.ai-transport.http") local log_sanitize = require("apisix.utils.log-sanitize") local apisix_upstream = require("resty.apisix.upstream") -local function extract_end_user_id(body, protocol) - if type(body) ~= "table" then - return nil - end - if protocol == "anthropic-messages" then - local meta = body.metadata - if type(meta) == "table" and type(meta.user_id) == "string" then - return meta.user_id - end - return nil - end - -- openai-chat, openai-responses: safety_identifier takes precedence over user - if type(body.safety_identifier) == "string" then - return body.safety_identifier - end - if type(body.user) == "string" then - return body.user - end - return nil -end +local _M = {} +-- Count tools in the final upstream request body. 
+-- OpenAI Chat/Responses: body.tools array +-- Anthropic Messages: body.tools array local function count_request_tools(body) if type(body) ~= "table" then return 0 @@ -64,45 +47,6 @@ local function count_request_tools(body) end -local function detect_tool_calls_in_response(body) - if type(body) ~= "table" then - return false - end - -- OpenAI Chat / Responses: choices[].message.tool_calls - if type(body.choices) == "table" then - for _, choice in ipairs(body.choices) do - if type(choice) == "table" then - local msg = choice.message - if type(msg) == "table" and type(msg.tool_calls) == "table" - and #msg.tool_calls > 0 then - return true - end - end - end - end - -- Anthropic Messages: content[].type == "tool_use" - if type(body.content) == "table" then - for _, block in ipairs(body.content) do - if type(block) == "table" and block.type == "tool_use" then - return true - end - end - end - -- OpenAI Responses: output[].type == "function_call" - if type(body.output) == "table" then - for _, item in ipairs(body.output) do - if type(item) == "table" and item.type == "function_call" then - return true - end - end - end - return false -end - - -local _M = {} - - local function resolve_cap(cap_entry, key, conf, ctx) local val = cap_entry and cap_entry[key] if type(val) == "function" then @@ -119,6 +63,7 @@ function _M.set_logging(ctx, summaries, payloads) duration = ctx.var.llm_time_to_first_token, prompt_tokens = ctx.var.llm_prompt_tokens, completion_tokens = ctx.var.llm_completion_tokens, + total_tokens = ctx.var.llm_total_tokens, upstream_response_time = ctx.var.apisix_upstream_response_time, } end @@ -243,6 +188,7 @@ function _M.before_proxy(conf, ctx, on_error) end ctx.ai_converter = converter ctx.ai_target_protocol = target_proto + local target_proto_module = protocols.get(target_proto) -- Step 2: Extract model from request local request_model = request_body.model @@ -281,11 +227,14 @@ function _M.before_proxy(conf, ctx, on_error) -- Compute built-in AI log fields 
from the final upstream request local final_body = params.body - ctx.var.llm_stream = ctx.var.request_type == "ai_stream" and "true" or "false" + local is_stream = ctx.var.request_type == "ai_stream" + ctx.var.llm_stream = is_stream and "true" or "false" ctx.var.llm_tool_count = count_request_tools(final_body) - local end_user = extract_end_user_id(final_body, target_proto) - if end_user then - ctx.var.llm_end_user_id = end_user + if target_proto_module and target_proto_module.extract_end_user_id then + local end_user = target_proto_module.extract_end_user_id(final_body) + if end_user then + ctx.var.llm_end_user_id = end_user + end end core.log.info("sending request to LLM server: ", @@ -381,26 +330,22 @@ function _M.before_proxy(conf, ctx, on_error) "application/vnd.amazon.eventstream", 1, true) ) if is_streaming_resp then - local target_proto_module = protocols.get(target_proto) if not target_proto_module then core.log.error("no protocol module for streaming target: ", target_proto) return 500 end + code, body = ai_provider:parse_streaming_response( ctx, res, target_proto_module, converter, conf) else - local res_body, parse_err, parse_status = ai_provider:parse_response( + -- Non-streaming: parse_response sets all llm_* token/tool vars + -- via the client protocol adapter. + local _, parse_err, parse_status = ai_provider:parse_response( ctx, res, client_proto, converter, conf) if parse_err then code = parse_status or 500 body = parse_err end - if ctx.ai_token_usage then - ctx.var.llm_total_tokens = ctx.ai_token_usage.total_tokens or 0 - end - if res_body and detect_tool_calls_in_response(res_body) then - ctx.var.llm_has_tool_calls = "true" - end end -- Finalize upstream state with response_time after body is consumed