From 9ae5b54ee0c0018bab25ab4c7f3627be95bd8023 Mon Sep 17 00:00:00 2001 From: Ankur Goyal Date: Fri, 27 Mar 2026 15:48:42 -0700 Subject: [PATCH] fix --- .../proxy/src/providers/anthropic.test.ts | 95 +++++++++++++++++++ packages/proxy/src/proxy.ts | 39 +++++++- 2 files changed, 132 insertions(+), 2 deletions(-) diff --git a/packages/proxy/src/providers/anthropic.test.ts b/packages/proxy/src/providers/anthropic.test.ts index 238e8df0..0dd33b2d 100644 --- a/packages/proxy/src/providers/anthropic.test.ts +++ b/packages/proxy/src/providers/anthropic.test.ts @@ -1,5 +1,6 @@ import { it, expect, describe } from "vitest"; import { callProxyV1, createCapturingFetch } from "../../utils/tests"; +import { FetchFn } from "../proxy"; import { OpenAIChatCompletion, OpenAIChatCompletionChunk, @@ -51,6 +52,100 @@ it("should convert OpenAI streaming request to Anthropic and back", async () => expect(hasContent).toBe(true); }); +it("should request identity encoding for streaming Anthropic chat completions", async () => { + const { fetch, requests } = createCapturingFetch({ captureOnly: true }); + + await callProxyV1({ + body: { + model: "claude-3-haiku-20240307", + messages: [{ role: "user", content: "Stream a short response." }], + stream: true, + max_tokens: 32, + }, + fetch, + }); + + expect(requests).toHaveLength(1); + expect(requests[0].headers["accept-encoding"]).toBe("identity"); +}); + +it("should mark streaming responses as no-transform", async () => { + const encoder = new TextEncoder(); + const anthropicEvents = [ + { + type: "message_start", + message: { + id: "msg_test", + type: "message", + role: "assistant", + content: [], + model: "claude-3-haiku-20240307", + stop_reason: null, + stop_sequence: null, + usage: { input_tokens: 4, output_tokens: 0 }, + }, + }, + { + type: "content_block_start", + index: 0, + content_block: { type: "text", text: "" }, + }, + { + type: "content_block_delta", + index: 0, + delta: { type: "text_delta", text: "hello" }, + }, + { + type: "message_delta", + delta: { stop_reason: "end_turn", stop_sequence: null }, + usage: { output_tokens: 1 }, + }, + ]; + const fetch: FetchFn = async () => + new Response( + new ReadableStream({ + start(controller) { + for (const event of anthropicEvents) { + controller.enqueue( + encoder.encode(`data: ${JSON.stringify(event)}\n\n`), + ); + } + controller.close(); + }, + }), + { + headers: { + "content-type": "text/event-stream; charset=utf-8", + }, + }, + ); + + const { headers, events } = await callProxyV1< + OpenAIChatCompletionCreateParams, + OpenAIChatCompletionChunk + >({ + body: { + model: "claude-3-haiku-20240307", + messages: [{ role: "user", content: "Say hello." }], + stream: true, + max_tokens: 32, + }, + fetch, + getApiSecrets: async () => [ + { + type: "anthropic", + secret: "test-secret", + name: "anthropic", + }, + ], + }); + + expect(headers["cache-control"]).toContain("no-transform"); + expect( + events().some((event) => event.data.choices[0]?.delta?.content === "hello"), + ).toBe(true); +}); + it("should convert OpenAI non-streaming request to Anthropic and back", async () => { const { json } = await callProxyV1< OpenAIChatCompletionCreateParams, diff --git a/packages/proxy/src/proxy.ts b/packages/proxy/src/proxy.ts index 23e9dc04..5009d567 100644 --- a/packages/proxy/src/proxy.ts +++ b/packages/proxy/src/proxy.ts @@ -475,6 +475,7 @@ export async function proxyV1({ const startTime = getCurrentUnixTimestamp(); let spanType: SpanType | undefined = undefined; const isStreaming = !!bodyData?.stream; + let responseCacheControl: string | undefined; let stream: ReadableStream | null = null; if (readFromCache) { @@ -498,7 +499,8 @@ export async function proxyV1({ setHeader(name, value); } setHeader(CACHED_HEADER, "HIT"); - setHeader("cache-control", `max-age=${responseMaxAge}`); + responseCacheControl = `max-age=${responseMaxAge}`; + setHeader("cache-control", responseCacheControl); setHeader("age", `${age}`); spanType = guessSpanType(url, bodyData?.model); @@ -710,7 +712,8 @@ export async function proxyV1({ } setHeader(CACHED_HEADER, "MISS"); if (writeToCache) { - setHeader("cache-control", `max-age=${cacheTTL}`); + responseCacheControl = `max-age=${cacheTTL}`; + setHeader("cache-control", responseCacheControl); setHeader("age", "0"); } @@ -745,6 +748,14 @@ export async function proxyV1({ } } + if (stream && isStreaming) { + responseCacheControl = appendCacheControlDirective( + responseCacheControl, + "no-transform", + ); + setHeader("cache-control", responseCacheControl); + } + if (stream) { let first = true; const allChunks: Uint8Array[] = []; @@ -2806,6 +2817,13 @@ async function fetchAnthropicChatCompletions({ } } + if (secret.type === "anthropic" && params.stream === true) { + // Cloudflare Workers auto-negotiates compression on subrequests. For SSE, + // force identity encoding so the Anthropic stream is transformed and + // forwarded incrementally instead of being buffered behind compression. + headers["accept-encoding"] = "identity"; + } + if (secret.type === "bedrock") { return fetchBedrockAnthropic({ secret, @@ -3429,6 +3447,23 @@ export function createEventStreamTransformer( } // -------------------------------------------------- +function appendCacheControlDirective( + value: string | undefined, + directive: string, +): string { + const normalizedDirective = directive.toLowerCase(); + const existingDirectives = value + ?.split(",") + .map((part) => part.trim().toLowerCase()) + .filter((part) => part.length > 0); + + if (existingDirectives?.includes(normalizedDirective)) { + return value!; + } + + return value ? `${value}, ${directive}` : directive; +} + function parseEnumHeader( headerName: string, headerTypes: readonly T[],