diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua index 9603eae468f3..ec57dc7cd5ec 100644 --- a/apisix/cli/ngx_tpl.lua +++ b/apisix/cli/ngx_tpl.lua @@ -829,6 +829,14 @@ http { set $llm_model ''; set $llm_prompt_tokens '0'; set $llm_completion_tokens '0'; + set $llm_total_tokens '0'; + set $llm_stream 'false'; + set $llm_has_tool_calls 'false'; + set $llm_tool_count '0'; + set $llm_end_user_id ''; + set $llm_cache_read_input_tokens '0'; + set $llm_cache_creation_input_tokens '0'; + set $llm_reasoning_tokens '0'; {% if use_apisix_base then %} diff --git a/apisix/core/ctx.lua b/apisix/core/ctx.lua index 92a57629dc18..bbb0eb2242f9 100644 --- a/apisix/core/ctx.lua +++ b/apisix/core/ctx.lua @@ -204,6 +204,14 @@ do llm_model = true, llm_prompt_tokens = true, llm_completion_tokens = true, + llm_total_tokens = true, + llm_stream = true, + llm_has_tool_calls = true, + llm_tool_count = true, + llm_end_user_id = true, + llm_cache_read_input_tokens = true, + llm_cache_creation_input_tokens = true, + llm_reasoning_tokens = true, upstream_mirror_host = true, upstream_mirror_uri = true, diff --git a/apisix/plugins/ai-protocols/anthropic-messages.lua b/apisix/plugins/ai-protocols/anthropic-messages.lua index a78d0c26e553..a777f5136228 100644 --- a/apisix/plugins/ai-protocols/anthropic-messages.lua +++ b/apisix/plugins/ai-protocols/anthropic-messages.lua @@ -66,6 +66,14 @@ function _M.parse_sse_event(event, ctx, state) end return { type = "skip" } + elseif event.type == "content_block_start" then + local data = core.json.decode(event.data, { null_as_nil = true }) + if data and type(data.content_block) == "table" + and data.content_block.type == "tool_use" then + return { type = "skip", has_tool_call = true } + end + return { type = "skip" } + elseif event.type == "message_delta" then local data, err = core.json.decode(event.data, { null_as_nil = true }) if not data then @@ -102,6 +110,9 @@ function _M.parse_sse_event(event, ctx, state) prompt_tokens = usage.input_tokens or 0, completion_tokens = usage.output_tokens or 0, total_tokens = (usage.input_tokens or 0) + (usage.output_tokens or 0), + cache_read_input_tokens = usage.cache_read_input_tokens or 0, + cache_creation_input_tokens = usage.cache_creation_input_tokens or 0, + reasoning_tokens = 0, }, raw_usage = usage, } @@ -169,10 +180,40 @@ function _M.extract_usage(res_body) prompt_tokens = prompt, completion_tokens = completion, total_tokens = prompt + completion, + cache_read_input_tokens = raw.cache_read_input_tokens or 0, + cache_creation_input_tokens = raw.cache_creation_input_tokens or 0, + reasoning_tokens = 0, }, raw end +--- Detect whether a non-streaming response contains tool calls. +function _M.has_tool_call(res_body) + if type(res_body) ~= "table" or type(res_body.content) ~= "table" then + return false + end + for _, block in ipairs(res_body.content) do + if type(block) == "table" and block.type == "tool_use" then + return true + end + end + return false +end + + +--- Extract the end-user identifier from a request body. +function _M.extract_end_user_id(body) + if type(body) ~= "table" then + return nil + end + local meta = body.metadata + if type(meta) == "table" and type(meta.user_id) == "string" then + return meta.user_id + end + return nil +end + + --- Extract all text content from a request body for moderation. function _M.extract_request_content(body) local contents = {} diff --git a/apisix/plugins/ai-protocols/openai-chat.lua b/apisix/plugins/ai-protocols/openai-chat.lua index faa48f6654c6..096af76dfe34 100644 --- a/apisix/plugins/ai-protocols/openai-chat.lua +++ b/apisix/plugins/ai-protocols/openai-chat.lua @@ -79,14 +79,18 @@ function _M.parse_sse_event(event, ctx, state) local result = { type = "delta", data = data } - -- Extract text content from choices + -- Extract text content and detect tool calls from choices if type(data.choices) == "table" and #data.choices > 0 then local texts = {} for _, choice in ipairs(data.choices) do - if type(choice) == "table" - and type(choice.delta) == "table" - and type(choice.delta.content) == "string" then - core.table.insert(texts, choice.delta.content) + if type(choice) == "table" and type(choice.delta) == "table" then + if type(choice.delta.content) == "string" then + core.table.insert(texts, choice.delta.content) + end + if type(choice.delta.tool_calls) == "table" + and #choice.delta.tool_calls > 0 then + result.has_tool_call = true + end end end if #texts > 0 then @@ -96,13 +100,20 @@ function _M.parse_sse_event(event, ctx, state) -- Extract usage (null for non-final chunks; cjson decodes null as userdata) if type(data.usage) == "table" then + local u = data.usage + local pd = type(u.prompt_tokens_details) == "table" and u.prompt_tokens_details + local cd = type(u.completion_tokens_details) == "table" and u.completion_tokens_details result.type = "usage" result.usage = { - prompt_tokens = data.usage.prompt_tokens or 0, - completion_tokens = data.usage.completion_tokens or 0, - total_tokens = data.usage.total_tokens or 0, + prompt_tokens = u.prompt_tokens or 0, + completion_tokens = u.completion_tokens or 0, + total_tokens = u.total_tokens or 0, + cache_read_input_tokens = pd and pd.cached_tokens + or u.prompt_cache_hit_tokens or 0, + cache_creation_input_tokens = pd and pd.cache_creation_input_tokens or 0, + reasoning_tokens = cd and cd.reasoning_tokens or 0, } - result.raw_usage = data.usage + result.raw_usage = u end return result @@ -160,14 +171,53 @@ function _M.extract_usage(res_body) return nil, nil end local raw = res_body.usage + local pdetails = type(raw.prompt_tokens_details) == "table" and raw.prompt_tokens_details + local cdetails = type(raw.completion_tokens_details) == "table" + and raw.completion_tokens_details + -- OpenAI uses prompt_tokens_details.cached_tokens; DeepSeek uses prompt_cache_hit_tokens + local cache_read = pdetails and pdetails.cached_tokens or raw.prompt_cache_hit_tokens or 0 return { prompt_tokens = raw.prompt_tokens or 0, completion_tokens = raw.completion_tokens or 0, total_tokens = raw.total_tokens or (raw.prompt_tokens or 0) + (raw.completion_tokens or 0), + cache_read_input_tokens = cache_read, + cache_creation_input_tokens = pdetails and pdetails.cache_creation_input_tokens or 0, + reasoning_tokens = cdetails and cdetails.reasoning_tokens or 0, }, raw end +--- Detect whether a non-streaming response contains tool calls. +function _M.has_tool_call(res_body) + if type(res_body) ~= "table" or type(res_body.choices) ~= "table" then + return false + end + for _, choice in ipairs(res_body.choices) do + if type(choice) == "table" and type(choice.message) == "table" + and type(choice.message.tool_calls) == "table" + and #choice.message.tool_calls > 0 then + return true + end + end + return false +end + + +--- Extract the end-user identifier from a request body. +function _M.extract_end_user_id(body) + if type(body) ~= "table" then + return nil + end + if type(body.safety_identifier) == "string" then + return body.safety_identifier + end + if type(body.user) == "string" then + return body.user + end + return nil +end + + --- Extract all text content from a request body for moderation. function _M.extract_request_content(body) local contents = {} diff --git a/apisix/plugins/ai-protocols/openai-responses.lua b/apisix/plugins/ai-protocols/openai-responses.lua index 9e05e99a7eba..9e78b3fb5979 100644 --- a/apisix/plugins/ai-protocols/openai-responses.lua +++ b/apisix/plugins/ai-protocols/openai-responses.lua @@ -68,16 +68,30 @@ function _M.parse_sse_event(event, ctx, state) core.log.warn("failed to decode response.completed SSE data: ", err) return result end - if type(data.response) == "table" - and type(data.response.usage) == "table" then - local usage = data.response.usage - result.type = "usage_and_done" - result.usage = { - prompt_tokens = usage.input_tokens or 0, - completion_tokens = usage.output_tokens or 0, - total_tokens = usage.total_tokens or 0, - } - result.raw_usage = usage + if type(data.response) == "table" then + local resp = data.response + if type(resp.usage) == "table" then + local usage = resp.usage + result.type = "usage_and_done" + result.usage = { + prompt_tokens = usage.input_tokens or 0, + completion_tokens = usage.output_tokens or 0, + total_tokens = usage.total_tokens or 0, + cache_read_input_tokens = type(usage.input_tokens_details) == "table" + and usage.input_tokens_details.cached_tokens or 0, + reasoning_tokens = type(usage.output_tokens_details) == "table" + and usage.output_tokens_details.reasoning_tokens or 0, + } + result.raw_usage = usage + end + if type(resp.output) == "table" then + for _, item in ipairs(resp.output) do + if type(item) == "table" and item.type == "function_call" then + result.has_tool_call = true + break + end + end + end end return result @@ -135,17 +149,50 @@ function _M.extract_usage(res_body) return nil, nil end local raw = res_body.usage - -- Responses API uses input_tokens / output_tokens + local idetails = type(raw.input_tokens_details) == "table" and raw.input_tokens_details + local odetails = type(raw.output_tokens_details) == "table" and raw.output_tokens_details local prompt = raw.input_tokens or 0 local completion = raw.output_tokens or 0 return { prompt_tokens = prompt, completion_tokens = completion, total_tokens = raw.total_tokens or (prompt + completion), + cache_read_input_tokens = idetails and idetails.cached_tokens or 0, + cache_creation_input_tokens = 0, + reasoning_tokens = odetails and odetails.reasoning_tokens or 0, }, raw end +--- Detect whether a non-streaming response contains tool calls. +function _M.has_tool_call(res_body) + if type(res_body) ~= "table" or type(res_body.output) ~= "table" then + return false + end + for _, item in ipairs(res_body.output) do + if type(item) == "table" and item.type == "function_call" then + return true + end + end + return false +end + + +--- Extract the end-user identifier from a request body. +function _M.extract_end_user_id(body) + if type(body) ~= "table" then + return nil + end + if type(body.safety_identifier) == "string" then + return body.safety_identifier + end + if type(body.user) == "string" then + return body.user + end + return nil +end + + --- Extract all text content from a request body for moderation. function _M.extract_request_content(body) local contents = {} diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua index 407fe0bb8fbb..227889f58a02 100644 --- a/apisix/plugins/ai-providers/base.lua +++ b/apisix/plugins/ai-providers/base.lua @@ -77,6 +77,13 @@ local function merge_usage(ctx, parsed) ctx.ai_token_usage[k] = v end end + -- Recompute total from accumulated parts (handles split events, e.g. Anthropic + -- message_start carries input tokens and message_delta carries output tokens) + local computed = (ctx.ai_token_usage.prompt_tokens or 0) + + (ctx.ai_token_usage.completion_tokens or 0) + if computed > (ctx.ai_token_usage.total_tokens or 0) then + ctx.ai_token_usage.total_tokens = computed + end end local raw = parsed.raw_usage or parsed.usage @@ -396,12 +403,22 @@ function _M.parse_response(self, ctx, res, client_proto, converter, conf) end ctx.var.llm_prompt_tokens = ctx.ai_token_usage.prompt_tokens or 0 ctx.var.llm_completion_tokens = ctx.ai_token_usage.completion_tokens or 0 + ctx.var.llm_total_tokens = ctx.ai_token_usage.total_tokens or 0 + ctx.var.llm_cache_read_input_tokens = ctx.ai_token_usage.cache_read_input_tokens or 0 + ctx.var.llm_cache_creation_input_tokens = ctx.ai_token_usage.cache_creation_input_tokens or 0 + ctx.var.llm_reasoning_tokens = ctx.ai_token_usage.reasoning_tokens or 0 local response_text = client_proto.extract_response_text(res_body) if response_text then ctx.var.llm_response_text = response_text end + -- Detect tool calls (mirrors the streaming path, which sets this from + -- parse_sse_event.has_tool_call). Each client protocol knows its own shape. + if client_proto.has_tool_call and client_proto.has_tool_call(res_body) then + ctx.var.llm_has_tool_calls = "true" + end + plugin.lua_response_filter(ctx, headers, raw_res_body) return res_body end @@ -446,13 +463,10 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co local bytes_read = 0 -- streaming_flush_interval_ms controls both flush strategy and the thread: - -- == 0 : no thread; lua_response_filter flushes synchronously - -- per chunk via ngx.flush(true), guaranteeing immediate - -- client delivery. - -- > 0 (default: 10): background thread calls ngx.flush(false) every N ms; - -- lua_response_filter skips per-chunk flush for maximum - -- throughput. Useful when the upstream bursts multiple - -- tokens at once. + -- == 0 (default): no thread; lua_response_filter flushes synchronously + -- per chunk, guaranteeing immediate client delivery. + -- > 0 : background thread handles periodic flushing; + -- lua_response_filter skips flush for maximum throughput. local flush_interval_ms = conf and conf.streaming_flush_interval_ms or 0 -- async_flush: true when the interval thread is responsible for flushing local async_flush = flush_interval_ms > 0 @@ -591,6 +605,9 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co for _, event in ipairs(events) do -- Target protocol parses the provider's SSE format local parsed = target_proto.parse_sse_event(event, ctx, sse_state) + if parsed and parsed.has_tool_call then + ctx.var.llm_has_tool_calls = "true" + end if not parsed or parsed.type == "skip" then goto CONTINUE end @@ -618,6 +635,12 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co merge_usage(ctx, parsed) ctx.var.llm_prompt_tokens = ctx.ai_token_usage.prompt_tokens ctx.var.llm_completion_tokens = ctx.ai_token_usage.completion_tokens + ctx.var.llm_total_tokens = ctx.ai_token_usage.total_tokens or 0 + ctx.var.llm_cache_read_input_tokens = + ctx.ai_token_usage.cache_read_input_tokens or 0 + ctx.var.llm_cache_creation_input_tokens = + ctx.ai_token_usage.cache_creation_input_tokens or 0 + ctx.var.llm_reasoning_tokens = ctx.ai_token_usage.reasoning_tokens or 0 ctx.var.llm_response_text = table.concat(contents, "") end @@ -702,15 +725,11 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co return status, limit_hit .. " exceeded" end - -- WORKAROUND, not a real fix: yield to the nginx scheduler so other - -- coroutines on this worker (health checks, concurrent requests) can - -- run. body_reader() and ngx.flush() do not yield when the upstream - -- socket already has data buffered or the downstream client drains - -- immediately, so under bursty SSE upstreams this loop can monopolize - -- the worker CPU. ngx.sleep(0) only prevents a single request from - -- monopolizing the worker; it does not bound per-stream CPU time, add - -- backpressure, or time out stalled streams. See #13256 for a proper - -- solution. + -- Yield to the nginx scheduler so other coroutines on this worker + -- (health checks, concurrent requests) can run. body_reader() and + -- ngx.flush() do not yield when the upstream socket already has data + -- buffered or the downstream client drains immediately, so under + -- bursty SSE upstreams this loop can monopolize the worker CPU. ngx.sleep(0) end diff --git a/apisix/plugins/ai-proxy/base.lua b/apisix/plugins/ai-proxy/base.lua index 8ce5ef9de737..1f4003f427f9 100644 --- a/apisix/plugins/ai-proxy/base.lua +++ b/apisix/plugins/ai-proxy/base.lua @@ -32,6 +32,21 @@ local apisix_upstream = require("resty.apisix.upstream") local _M = {} +-- Count tools in the final upstream request body. +-- OpenAI Chat/Responses: body.tools array +-- Anthropic Messages: body.tools array +local function count_request_tools(body) + if type(body) ~= "table" then + return 0 + end + local tools = body.tools + if type(tools) == "table" then + return #tools + end + return 0 +end + + local function resolve_cap(cap_entry, key, conf, ctx) local val = cap_entry and cap_entry[key] if type(val) == "function" then @@ -48,6 +63,7 @@ function _M.set_logging(ctx, summaries, payloads) duration = ctx.var.llm_time_to_first_token, prompt_tokens = ctx.var.llm_prompt_tokens, completion_tokens = ctx.var.llm_completion_tokens, + total_tokens = ctx.var.llm_total_tokens, upstream_response_time = ctx.var.apisix_upstream_response_time, } end @@ -172,6 +188,7 @@ function _M.before_proxy(conf, ctx, on_error) end ctx.ai_converter = converter ctx.ai_target_protocol = target_proto + local target_proto_module = protocols.get(target_proto) -- Step 2: Extract model from request local request_model = request_body.model @@ -208,6 +225,18 @@ function _M.before_proxy(conf, ctx, on_error) return 500, body end + -- Compute built-in AI log fields from the final upstream request + local final_body = params.body + local is_stream = ctx.var.request_type == "ai_stream" + ctx.var.llm_stream = is_stream and "true" or "false" + ctx.var.llm_tool_count = count_request_tools(final_body) + if target_proto_module and target_proto_module.extract_end_user_id then + local end_user = target_proto_module.extract_end_user_id(final_body) + if end_user then + ctx.var.llm_end_user_id = end_user + end + end + core.log.info("sending request to LLM server: ", core.json.delay_encode(log_sanitize.redact_params(params), true)) @@ -301,14 +330,16 @@ function _M.before_proxy(conf, ctx, on_error) "application/vnd.amazon.eventstream", 1, true) ) if is_streaming_resp then - local target_proto_module = protocols.get(target_proto) if not target_proto_module then core.log.error("no protocol module for streaming target: ", target_proto) return 500 end + code, body = ai_provider:parse_streaming_response( ctx, res, target_proto_module, converter, conf) else + -- Non-streaming: parse_response sets all llm_* token/tool vars + -- via the client protocol adapter. local _, parse_err, parse_status = ai_provider:parse_response( ctx, res, client_proto, converter, conf) if parse_err then diff --git a/t/APISIX.pm b/t/APISIX.pm index 945305068f67..b100d00d660f 100644 --- a/t/APISIX.pm +++ b/t/APISIX.pm @@ -717,7 +717,7 @@ _EOC_ require("apisix").http_exit_worker() } - log_format main escape=default '\$remote_addr - \$remote_user [\$time_local] \$http_host "\$request_line" \$status \$body_bytes_sent \$request_time "\$http_referer" "\$http_user_agent" \$upstream_addr \$upstream_status \$apisix_upstream_response_time "\$upstream_scheme://\$upstream_host\$upstream_uri" \$request_llm_model \$llm_model \$llm_time_to_first_token \$llm_prompt_tokens \$llm_completion_tokens "\$rate_limiting_info"'; + log_format main escape=default '\$remote_addr - \$remote_user [\$time_local] \$http_host "\$request_line" \$status \$body_bytes_sent \$request_time "\$http_referer" "\$http_user_agent" \$upstream_addr \$upstream_status \$apisix_upstream_response_time "\$upstream_scheme://\$upstream_host\$upstream_uri" \$request_llm_model \$llm_model \$llm_time_to_first_token \$llm_prompt_tokens \$llm_completion_tokens \$llm_total_tokens \$llm_stream \$llm_has_tool_calls \$llm_tool_count \$llm_end_user_id \$llm_cache_read_input_tokens \$llm_cache_creation_input_tokens \$llm_reasoning_tokens "\$rate_limiting_info"'; # fake server, only for test server { @@ -922,6 +922,14 @@ _EOC_ set \$llm_model ''; set \$llm_prompt_tokens '0'; set \$llm_completion_tokens '0'; + set \$llm_total_tokens '0'; + set \$llm_stream 'false'; + set \$llm_has_tool_calls 'false'; + set \$llm_tool_count '0'; + set \$llm_end_user_id ''; + set \$llm_cache_read_input_tokens '0'; + set \$llm_cache_creation_input_tokens '0'; + set \$llm_reasoning_tokens '0'; set \$apisix_upstream_response_time \$upstream_response_time; access_log $apisix_home/t/servroot/logs/access.log main; diff --git a/t/fixtures/anthropic/messages-streaming-with-tool-use.sse b/t/fixtures/anthropic/messages-streaming-with-tool-use.sse new file mode 100644 index 000000000000..425626a0728f --- /dev/null +++ b/t/fixtures/anthropic/messages-streaming-with-tool-use.sse @@ -0,0 +1,14 @@ +event: message_start +data: {"type":"message_start","message":{"id":"msg_123","type":"message","role":"assistant","content":[],"usage":{"input_tokens":20,"output_tokens":0}}} + +event: content_block_start +data: {"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"toolu_1","name":"get_weather"}} + +event: content_block_stop +data: {"type":"content_block_stop","index":0} + +event: message_delta +data: {"type":"message_delta","delta":{"stop_reason":"tool_use"},"usage":{"output_tokens":5}} + +event: message_stop +data: {} diff --git a/t/fixtures/openai/chat-streaming-with-cache.sse b/t/fixtures/openai/chat-streaming-with-cache.sse new file mode 100644 index 000000000000..11757f64553e --- /dev/null +++ b/t/fixtures/openai/chat-streaming-with-cache.sse @@ -0,0 +1,6 @@ +data: {"id":"chatcmpl-cache1","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant","content":"Hi"},"index":0,"finish_reason":null}],"usage":null} + +data: {"id":"chatcmpl-cache1","object":"chat.completion.chunk","choices":[{"delta":{},"index":0,"finish_reason":"stop"}],"usage":{"prompt_tokens":30,"completion_tokens":15,"total_tokens":45,"prompt_tokens_details":{"cached_tokens":10,"cache_creation_input_tokens":5},"completion_tokens_details":{"reasoning_tokens":7}}} + +data: [DONE] + diff --git a/t/fixtures/openai/chat-streaming-with-tool-calls.sse b/t/fixtures/openai/chat-streaming-with-tool-calls.sse new file mode 100644 index 000000000000..e17710263d15 --- /dev/null +++ b/t/fixtures/openai/chat-streaming-with-tool-calls.sse @@ -0,0 +1,7 @@ +data: {"id":"chatcmpl-1","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant","tool_calls":[{"index":0,"id":"call_1","type":"function","function":{"name":"get_weather","arguments":""}}]},"index":0,"finish_reason":null}],"usage":null} + +data: {"id":"chatcmpl-1","object":"chat.completion.chunk","choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{}"}}]},"index":0,"finish_reason":"tool_calls"}],"usage":null} + +data: {"id":"chatcmpl-1","object":"chat.completion.chunk","choices":[{"delta":{},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":15,"completion_tokens":10,"total_tokens":25}} + +data: [DONE] diff --git a/t/fixtures/openai/responses-streaming-with-cache.sse b/t/fixtures/openai/responses-streaming-with-cache.sse new file mode 100644 index 000000000000..67953da93ac0 --- /dev/null +++ b/t/fixtures/openai/responses-streaming-with-cache.sse @@ -0,0 +1,6 @@ +event: response.output_text.delta +data: {"type":"response.output_text.delta","delta":"Hello"} + +event: response.completed +data: {"type":"response.completed","response":{"output":[{"type":"message","content":[{"type":"output_text","text":"Hello"}]}],"usage":{"input_tokens":20,"output_tokens":5,"total_tokens":25,"input_tokens_details":{"cached_tokens":10},"output_tokens_details":{"reasoning_tokens":3}}}} + diff --git a/t/fixtures/openai/responses-streaming-with-tool-call.sse b/t/fixtures/openai/responses-streaming-with-tool-call.sse new file mode 100644 index 000000000000..4c9dbc125ee6 --- /dev/null +++ b/t/fixtures/openai/responses-streaming-with-tool-call.sse @@ -0,0 +1,3 @@ +event: response.completed +data: {"type":"response.completed","response":{"output":[{"type":"function_call","id":"call_1","name":"get_weather","arguments":"{}"}],"usage":{"input_tokens":20,"output_tokens":5,"total_tokens":25}}} + diff --git a/t/fixtures/openai/responses-with-cache.json b/t/fixtures/openai/responses-with-cache.json new file mode 100644 index 000000000000..39c72846fd1e --- /dev/null +++ b/t/fixtures/openai/responses-with-cache.json @@ -0,0 +1,22 @@ +{ + "id": "resp_cache1", + "object": "response", + "created_at": 1723780938, + "model": "{{model}}", + "output": [ + { + "type": "message", + "role": "assistant", + "content": [ + { "type": "output_text", "text": "Hello" } + ] + } + ], + "usage": { + "input_tokens": 40, + "output_tokens": 20, + "total_tokens": 60, + "input_tokens_details": { "cached_tokens": 12 }, + "output_tokens_details": { "reasoning_tokens": 8 } + } +} diff --git a/t/plugin/ai-proxy-anthropic.t b/t/plugin/ai-proxy-anthropic.t index e5912d192499..67e435bbb883 100644 --- a/t/plugin/ai-proxy-anthropic.t +++ b/t/plugin/ai-proxy-anthropic.t @@ -36,6 +36,7 @@ add_block_preprocessor(sub { my $user_yaml_config = <<_EOC_; plugins: + - ai-proxy - ai-proxy-multi _EOC_ $block->set_value("extra_yaml_config", $user_yaml_config); @@ -1666,3 +1667,91 @@ OK OK --- no_error_log [error] + + + +=== TEST 47: set route for native Anthropic protocol built-in var tests +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "uri": "/v1/messages", + "plugins": { + "ai-proxy": { + "provider": "anthropic", + "auth": { + "header": { + "x-api-key": "test-key" + } + }, + "options": { + "model": "claude-3-5-sonnet-20241022" + }, + "override": { + "endpoint": "http://127.0.0.1:1980" + }, + "ssl_verify": false + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 48: Anthropic streaming accumulates total_tokens from split message_start and message_delta events +--- request +POST /v1/messages +{"messages":[{"role":"user","content":"Hello"}],"model":"claude-3-5-sonnet-20241022","max_tokens":1024,"stream":true} +--- more_headers +X-AI-Fixture: anthropic/messages-streaming.sse +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" claude-3-5-sonnet-20241022 claude-3-5-sonnet-20241022 [\d.]+ 10 8 18 true false 0 / + + + +=== TEST 49: Anthropic streaming detects tool_use in content_block_start as tool call +--- request +POST /v1/messages +{"messages":[{"role":"user","content":"What is the weather?"}],"model":"claude-3-5-sonnet-20241022","max_tokens":1024,"stream":true} +--- more_headers +X-AI-Fixture: anthropic/messages-streaming-with-tool-use.sse +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" claude-3-5-sonnet-20241022 claude-3-5-sonnet-20241022 [\d.]+ 20 5 25 true true 0 / + + + +=== TEST 50: Anthropic non-streaming writes cache tokens to access log +--- request +POST /v1/messages +{"messages":[{"role":"user","content":"Hello"}],"model":"claude-3-5-sonnet-20241022","max_tokens":1024} +--- more_headers +X-AI-Fixture: anthropic/messages-with-cache.json +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" claude-3-5-sonnet-20241022 claude-3-5-sonnet-20241022 [\d.]+ 50 30 80 false false 0 \S* 200 100 0/ + + + +=== TEST 51: Anthropic streaming writes cache tokens to access log +--- request +POST /v1/messages +{"messages":[{"role":"user","content":"Hello"}],"model":"claude-3-5-sonnet-20241022","max_tokens":1024,"stream":true} +--- more_headers +X-AI-Fixture: anthropic/messages-streaming-with-cache.sse +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" claude-3-5-sonnet-20241022 claude-3-5-sonnet-20241022 [\d.]+ 50 30 80 true false 0 \S* 200 100 0/ diff --git a/t/plugin/ai-proxy.t b/t/plugin/ai-proxy.t index e73e41a86a4d..1e3b2ecd8d16 100644 --- a/t/plugin/ai-proxy.t +++ b/t/plugin/ai-proxy.t @@ -15,7 +15,6 @@ # limitations under the License. # - BEGIN { $ENV{TEST_ENABLE_CONTROL_API_V1} = "0"; } @@ -382,8 +381,8 @@ qr/path override works/ content_by_lua_block { local t = require("lib.test_admin").test local code, body = t('/apisix/admin/routes/1', - ngx.HTTP_PUT, - [[{ + ngx.HTTP_PUT, + [[{ "uri": "/anything", "plugins": { "ai-proxy": { @@ -615,7 +614,7 @@ POST /post "temperature": 1.0 }, "override": { - "endpoint": "http://localhost:6724" + "endpoint": "http://127.0.0.1:1980" }, "ssl_verify": false } @@ -635,20 +634,6 @@ passed === TEST 20: send request ---- http_config - server { - server_name openai; - listen 6724; - - default_type 'application/json'; - - location /v1/chat/completions { - content_by_lua_block { - local json = require("cjson.safe") - ngx.say(json.encode(ngx.req.get_headers())) - } - } - } --- request POST /anything { "messages": [ { "role": "system", "content": "You are a mathematician" }, { "role": "user", "content": "What is 1+1?"} ] } @@ -683,7 +668,7 @@ qr/"test-type":"header_forwarding"/ "temperature": 1.0 }, "override": { - "endpoint": "http://localhost:6724" + "endpoint": "http://127.0.0.1:1980" }, "ssl_verify": false }, @@ -705,20 +690,6 @@ passed === TEST 22: send request ---- http_config - server { - server_name openai; - listen 6724; - - default_type 'application/json'; - - location /v1/chat/completions { - content_by_lua_block { - local json = require("cjson.safe") - ngx.say(json.encode(ngx.req.get_headers())) - } - } - } --- request POST /anything { "messages": [ { "role": "system", "content": "You are a mathematician" }, { "role": "user", "content": "What is 1+1?"} ] } @@ -731,44 +702,17 @@ qr/"x-request-id":"[\d\w-]+"/ === TEST 23: send request with Authorization header ---- http_config - server { - server_name openai; - listen 6724; - - default_type 'application/json'; - - location /v1/chat/completions { - content_by_lua_block { - ngx.status = 200 - ngx.say("{}") - } - } - } --- request POST /anything { "messages": [ { "role": "system", "content": "You are a mathematician" }, { "role": "user", "content": "What is 1+1?"} ] } --- more_headers Authorization: Bearer wrong token +X-AI-Fixture: openai/chat-basic.json --- error_code: 200 === TEST 24b: Accept-Encoding header should be stripped before forwarding to provider ---- http_config - server { - server_name openai; - listen 6724; - - default_type 'application/json'; - - location /v1/chat/completions { - content_by_lua_block { - local json = require("cjson.safe") - ngx.say(json.encode(ngx.req.get_headers())) - } - } - } --- request POST /anything { "messages": [ { "role": "system", "content": "You are a mathematician" }, { "role": "user", "content": "What is 1+1?"} ] } diff --git a/t/plugin/ai-proxy3.t b/t/plugin/ai-proxy3.t index 3110ee04b258..501c849522a6 100644 --- a/t/plugin/ai-proxy3.t +++ b/t/plugin/ai-proxy3.t @@ -23,15 +23,6 @@ no_long_string(); no_root_location(); -my $resp_file = 't/assets/ai-proxy-response.json'; -open(my $fh, '<', $resp_file) or die "Could not open file '$resp_file' $!"; -my $resp = do { local $/; <$fh> }; -close($fh); - -print "Hello, World!\n"; -print $resp; - - add_block_preprocessor(sub { my ($block) = @_; @@ -96,7 +87,7 @@ X-AI-Fixture: openai/chat-basic.json --- response_body eval qr/.*completion_tokens.*/ --- access_log eval -qr/127\.0\.0\.1:1980 200 [\d.]+ \"http:\/\/127\.0\.0\.1\/v1\/chat\/completions\" gpt-4 gpt-3.5-turbo [\d.]+ 23 8.*/ +qr/127\.0\.0\.1:1980 200 [\d.]+ \"http:\/\/\S+\/v1\/chat\/completions\" gpt-4 gpt-3.5-turbo [\d.]+ 23 8.*/ @@ -256,4 +247,210 @@ passed --- response_body_like eval qr/6data: \[DONE\]\n\n/ --- access_log eval -qr/localhost:7737 200 [\d.]+ \"http:\/\/localhost\/v1\/chat\/completions\" gpt-4 gpt-3.5-turbo 2\d\d 15 20.*/ +qr/localhost:7737 200 [\d.]+ \"http:\/\/\S+\/v1\/chat\/completions\" gpt-4 gpt-3.5-turbo 2\d\d 15 20.*/ + + + +=== TEST 7: set route for built-in access log variable test +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/2', + ngx.HTTP_PUT, + [[{ + "uri": "/log-vars", + "plugins": { + "ai-proxy": { + "provider": "openai", + "auth": { + "header": { + "Authorization": "Bearer test-key" + } + }, + "options": { + "model": "gpt-4o" + }, + "override": { + "endpoint": "http://127.0.0.1:1980" + }, + "ssl_verify": false + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 8: non-streaming request writes llm built-in vars to access log +--- request +POST /log-vars +{"messages":[{"role":"user","content":"What is 1+1?"}],"model":"gpt-4o","user":"alice"} +--- more_headers +X-AI-Fixture: openai/chat-basic.json +--- error_code: 200 +--- response_body eval +qr/.*completion_tokens.*/ +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 23 8 31 false false 0 alice/ + + + +=== TEST 9: streaming request writes llm built-in vars to access log +--- request +POST /log-vars +{"messages":[{"role":"user","content":"Hello"}],"model":"gpt-4o","stream":true} +--- more_headers +X-AI-Fixture: openai/chat-multi-chunk.sse +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 10 2 12 true false 0 / + + + +=== TEST 10: response with cached tokens writes llm_cache_read_input_tokens to access log +--- request +POST /log-vars +{"messages":[{"role":"user","content":"Solve this"}],"model":"gpt-4o"} +--- more_headers +X-AI-Fixture: openai/chat-with-reasoning.json +--- error_code: 200 +--- response_body eval +qr/.*completion_tokens.*/ +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 30 15 45 false false 0 \S* 10 5 0/ + + + +=== TEST 11: response with tool calls sets llm_has_tool_calls=true and llm_tool_count +--- request +POST /log-vars +{"messages":[{"role":"user","content":"What is the weather?"}],"model":"gpt-4o","tools":[{"type":"function","function":{"name":"get_weather","parameters":{}}}]} +--- more_headers +X-AI-Fixture: openai/chat-with-tool-calls.json +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 50 20 70 false true 1 / + + + +=== TEST 12: safety_identifier field is used as llm_end_user_id +--- request +POST /log-vars +{"messages":[{"role":"user","content":"Hello"}],"model":"gpt-4o","safety_identifier":"user-xyz"} +--- more_headers +X-AI-Fixture: openai/chat-basic.json +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 23 8 31 false false 0 user-xyz/ + + + +=== TEST 13: OpenAI Chat streaming detects tool_calls delta and sets llm_has_tool_calls=true +--- request +POST /log-vars +{"messages":[{"role":"user","content":"What is the weather?"}],"model":"gpt-4o","stream":true} +--- more_headers +X-AI-Fixture: openai/chat-streaming-with-tool-calls.sse +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 15 10 25 true true 0 / + + + +=== TEST 14: set route for Responses API built-in var tests +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/3', + ngx.HTTP_PUT, + [[{ + "uri": "/ai/v1/responses", + "plugins": { + "ai-proxy": { + "provider": "openai", + "auth": { + "header": { + "Authorization": "Bearer test-key" + } + }, + "options": { + "model": "gpt-4o-mini" + }, + "override": { + "endpoint": "http://127.0.0.1:1980" + }, + "ssl_verify": false + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 15: Responses API streaming sets llm_cache_read_input_tokens and llm_reasoning_tokens +--- request +POST /ai/v1/responses +{"input":"Hello","model":"gpt-4o-mini","stream":true} +--- more_headers +X-AI-Fixture: openai/responses-streaming-with-cache.sse +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o-mini gpt-4o-mini [\d.]+ 20 5 25 true false 0 \S* 10 0 3/ + + + +=== TEST 16: Responses API streaming detects function_call in response.output as tool call +--- request +POST /ai/v1/responses +{"input":"What is the weather?","model":"gpt-4o-mini","stream":true} +--- more_headers +X-AI-Fixture: openai/responses-streaming-with-tool-call.sse +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o-mini gpt-4o-mini [\d.]+ 20 5 25 true true 0 / + + + +=== TEST 17: OpenAI Chat streaming writes cache and reasoning tokens to access log +--- request +POST /log-vars +{"messages":[{"role":"user","content":"Solve this"}],"model":"gpt-4o","stream":true} +--- more_headers +X-AI-Fixture: openai/chat-streaming-with-cache.sse +--- error_code: 200 +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o gpt-4o [\d.]+ 30 15 45 true false 0 \S* 10 5 7/ + + + +=== TEST 18: Responses API non-streaming writes cache and reasoning tokens to access log +--- request +POST /ai/v1/responses +{"input":"Solve this","model":"gpt-4o-mini"} +--- more_headers +X-AI-Fixture: openai/responses-with-cache.json +--- error_code: 200 +--- response_body eval +qr/.*output.*/ +--- access_log eval +qr/127\.0\.0\.1:1980 200 [\d.]+ \"\S+\" gpt-4o-mini gpt-4o-mini [\d.]+ 40 20 60 false false 0 \S* 12 0 8/