Skip to content

Commit 376d1ea

Browse files
authored
Fix anthropic streaming (#423)
1 parent 6a50633 commit 376d1ea

2 files changed

Lines changed: 132 additions & 2 deletions

File tree

packages/proxy/src/providers/anthropic.test.ts

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { it, expect, describe } from "vitest";
22
import { callProxyV1, createCapturingFetch } from "../../utils/tests";
3+
import { FetchFn } from "../proxy";
34
import {
45
OpenAIChatCompletion,
56
OpenAIChatCompletionChunk,
@@ -51,6 +52,100 @@ it("should convert OpenAI streaming request to Anthropic and back", async () =>
5152
expect(hasContent).toBe(true);
5253
});
5354

55+
// Verifies that a streaming chat completion routed to an Anthropic model sends
// `accept-encoding: identity` on the single outgoing upstream request.
it("should request identity encoding for streaming Anthropic chat completions", async () => {
  // NOTE(review): `captureOnly` presumably records the outgoing request without
  // needing a real upstream response — confirm against utils/tests.
  const { fetch, requests } = createCapturingFetch({ captureOnly: true });

  const streamingBody: OpenAIChatCompletionCreateParams = {
    model: "claude-3-haiku-20240307",
    messages: [{ role: "user", content: "Stream a short response." }],
    stream: true,
    max_tokens: 32,
  };

  await callProxyV1<OpenAIChatCompletionCreateParams, OpenAIChatCompletion>({
    body: streamingBody,
    fetch,
  });

  // Exactly one request should have been issued by the proxy.
  expect(requests).toHaveLength(1);

  const upstreamRequest = requests[0];
  expect(upstreamRequest.headers["accept-encoding"]).toBe("identity");
});
71+
72+
// Verifies that a streamed Anthropic response comes back through the proxy
// with a `cache-control` header containing `no-transform`, and that the text
// delta from the upstream stream is still delivered as an OpenAI-style chunk.
it("should mark streaming responses as no-transform", async () => {
  // Minimal Anthropic event sequence: start, one text block, one delta, stop.
  const upstreamEvents = [
    {
      type: "message_start",
      message: {
        id: "msg_test",
        type: "message",
        role: "assistant",
        content: [],
        model: "claude-3-haiku-20240307",
        stop_reason: null,
        stop_sequence: null,
        usage: { input_tokens: 4, output_tokens: 0 },
      },
    },
    {
      type: "content_block_start",
      index: 0,
      content_block: { type: "text", text: "" },
    },
    {
      type: "content_block_delta",
      index: 0,
      delta: { type: "text_delta", text: "hello" },
    },
    {
      type: "message_delta",
      delta: { stop_reason: "end_turn", stop_sequence: null },
      usage: { output_tokens: 1 },
    },
  ];

  // Pre-encode every event as one SSE `data:` frame.
  const textEncoder = new TextEncoder();
  const sseFrames = upstreamEvents.map((upstreamEvent) =>
    textEncoder.encode(`data: ${JSON.stringify(upstreamEvent)}\n\n`),
  );

  // Stub upstream: each call yields a fresh event-stream response that emits
  // all frames and then closes.
  const fetch: FetchFn = async () => {
    const body = new ReadableStream({
      start(controller) {
        for (const frame of sseFrames) {
          controller.enqueue(frame);
        }
        controller.close();
      },
    });
    return new Response(body, {
      headers: {
        "content-type": "text/event-stream; charset=utf-8",
      },
    });
  };

  const { headers, events } = await callProxyV1<
    OpenAIChatCompletionCreateParams,
    OpenAIChatCompletionChunk
  >({
    body: {
      model: "claude-3-haiku-20240307",
      messages: [{ role: "user", content: "Say hello." }],
      stream: true,
      max_tokens: 32,
    },
    fetch,
    getApiSecrets: async () => [
      {
        type: "anthropic",
        secret: "test-secret",
        name: "anthropic",
      },
    ],
  });

  expect(headers["cache-control"]).toContain("no-transform");

  // At least one translated chunk must carry the upstream text delta.
  const sawHelloDelta = events().some(
    (chunk) => chunk.data.choices[0]?.delta?.content === "hello",
  );
  expect(sawHelloDelta).toBe(true);
});
148+
54149
it("should convert OpenAI non-streaming request to Anthropic and back", async () => {
55150
const { json } = await callProxyV1<
56151
OpenAIChatCompletionCreateParams,

packages/proxy/src/proxy.ts

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ export async function proxyV1({
475475
const startTime = getCurrentUnixTimestamp();
476476
let spanType: SpanType | undefined = undefined;
477477
const isStreaming = !!bodyData?.stream;
478+
let responseCacheControl: string | undefined;
478479

479480
let stream: ReadableStream<Uint8Array> | null = null;
480481
if (readFromCache) {
@@ -498,7 +499,8 @@ export async function proxyV1({
498499
setHeader(name, value);
499500
}
500501
setHeader(CACHED_HEADER, "HIT");
501-
setHeader("cache-control", `max-age=${responseMaxAge}`);
502+
responseCacheControl = `max-age=${responseMaxAge}`;
503+
setHeader("cache-control", responseCacheControl);
502504
setHeader("age", `${age}`);
503505

504506
spanType = guessSpanType(url, bodyData?.model);
@@ -710,7 +712,8 @@ export async function proxyV1({
710712
}
711713
setHeader(CACHED_HEADER, "MISS");
712714
if (writeToCache) {
713-
setHeader("cache-control", `max-age=${cacheTTL}`);
715+
responseCacheControl = `max-age=${cacheTTL}`;
716+
setHeader("cache-control", responseCacheControl);
714717
setHeader("age", "0");
715718
}
716719

@@ -745,6 +748,14 @@ export async function proxyV1({
745748
}
746749
}
747750

751+
if (stream && isStreaming) {
752+
responseCacheControl = appendCacheControlDirective(
753+
responseCacheControl,
754+
"no-transform",
755+
);
756+
setHeader("cache-control", responseCacheControl);
757+
}
758+
748759
if (stream) {
749760
let first = true;
750761
const allChunks: Uint8Array[] = [];
@@ -2806,6 +2817,13 @@ async function fetchAnthropicChatCompletions({
28062817
}
28072818
}
28082819

2820+
if (secret.type === "anthropic" && params.stream === true) {
2821+
// Cloudflare Workers auto-negotiates compression on subrequests. For SSE,
2822+
// force identity encoding so the Anthropic stream is transformed and
2823+
// forwarded incrementally instead of being buffered behind compression.
2824+
headers["accept-encoding"] = "identity";
2825+
}
2826+
28092827
if (secret.type === "bedrock") {
28102828
return fetchBedrockAnthropic({
28112829
secret,
@@ -3429,6 +3447,23 @@ export function createEventStreamTransformer(
34293447
}
34303448
// --------------------------------------------------
34313449

3450+
/**
 * Returns a `Cache-Control` header value that contains `directive`.
 *
 * Existing directives are split on commas and compared case-insensitively
 * after trimming whitespace. If `directive` is already present, `value` is
 * returned unchanged; otherwise it is appended after `", "`.
 *
 * @param value - Current `Cache-Control` value, or `undefined` if none is set.
 * @param directive - Directive to ensure is present, e.g. `no-transform`.
 * @returns The header value including `directive`.
 */
function appendCacheControlDirective(
  value: string | undefined,
  directive: string,
): string {
  const existingDirectives = (value ?? "")
    .split(",")
    .map((part) => part.trim().toLowerCase())
    .filter((part) => part.length > 0);

  // A missing, blank, or comma-only value carries no directives. Return the
  // bare directive rather than producing an empty list element such as
  // "   , no-transform".
  if (value === undefined || existingDirectives.length === 0) {
    return directive;
  }

  if (existingDirectives.includes(directive.toLowerCase())) {
    return value;
  }

  return `${value}, ${directive}`;
}
3466+
34323467
function parseEnumHeader<T>(
34333468
headerName: string,
34343469
headerTypes: readonly T[],

0 commit comments

Comments
 (0)