Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions packages/proxy/src/providers/anthropic.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { it, expect, describe } from "vitest";
import { callProxyV1, createCapturingFetch } from "../../utils/tests";
import { FetchFn } from "../proxy";
import {
OpenAIChatCompletion,
OpenAIChatCompletionChunk,
Expand Down Expand Up @@ -51,6 +52,100 @@ it("should convert OpenAI streaming request to Anthropic and back", async () =>
expect(hasContent).toBe(true);
});

it("should request identity encoding for streaming Anthropic chat completions", async () => {
const { fetch, requests } = createCapturingFetch({ captureOnly: true });

await callProxyV1<OpenAIChatCompletionCreateParams, OpenAIChatCompletion>({
body: {
model: "claude-3-haiku-20240307",
messages: [{ role: "user", content: "Stream a short response." }],
stream: true,
max_tokens: 32,
},
fetch,
});

expect(requests).toHaveLength(1);
expect(requests[0].headers["accept-encoding"]).toBe("identity");
});

it("should mark streaming responses as no-transform", async () => {
  const encoder = new TextEncoder();
  // Minimal Anthropic SSE event sequence for a one-word ("hello") reply:
  // message_start -> content_block_start -> content_block_delta -> message_delta.
  const anthropicEvents = [
    {
      type: "message_start",
      message: {
        id: "msg_test",
        type: "message",
        role: "assistant",
        content: [],
        model: "claude-3-haiku-20240307",
        stop_reason: null,
        stop_sequence: null,
        usage: { input_tokens: 4, output_tokens: 0 },
      },
    },
    {
      type: "content_block_start",
      index: 0,
      content_block: { type: "text", text: "" },
    },
    {
      type: "content_block_delta",
      index: 0,
      delta: { type: "text_delta", text: "hello" },
    },
    {
      type: "message_delta",
      delta: { stop_reason: "end_turn", stop_sequence: null },
      usage: { output_tokens: 1 },
    },
  ];
  // Stub fetch that replays the canned events as a text/event-stream body,
  // standing in for the upstream Anthropic API. Each event is enqueued as
  // its own chunk in standard `data: ...\n\n` SSE framing.
  const fetch: FetchFn = async () =>
    new Response(
      new ReadableStream({
        start(controller) {
          for (const event of anthropicEvents) {
            controller.enqueue(
              encoder.encode(`data: ${JSON.stringify(event)}\n\n`),
            );
          }
          controller.close();
        },
      }),
      {
        headers: {
          "content-type": "text/event-stream; charset=utf-8",
        },
      },
    );

  // Drive the proxy with a streaming chat-completion request routed to the
  // stubbed Anthropic secret, capturing the response headers and the
  // translated OpenAI-format event stream.
  const { headers, events } = await callProxyV1<
    OpenAIChatCompletionCreateParams,
    OpenAIChatCompletionChunk
  >({
    body: {
      model: "claude-3-haiku-20240307",
      messages: [{ role: "user", content: "Say hello." }],
      stream: true,
      max_tokens: 32,
    },
    fetch,
    getApiSecrets: async () => [
      {
        type: "anthropic",
        secret: "test-secret",
        name: "anthropic",
      },
    ],
  });

  // The proxy must tag the streamed response with `no-transform` (so
  // intermediaries don't buffer/compress it) while still forwarding the
  // converted content deltas.
  expect(headers["cache-control"]).toContain("no-transform");
  expect(
    events().some((event) => event.data.choices[0]?.delta?.content === "hello"),
  ).toBe(true);
});

it("should convert OpenAI non-streaming request to Anthropic and back", async () => {
const { json } = await callProxyV1<
OpenAIChatCompletionCreateParams,
Expand Down
39 changes: 37 additions & 2 deletions packages/proxy/src/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ export async function proxyV1({
const startTime = getCurrentUnixTimestamp();
let spanType: SpanType | undefined = undefined;
const isStreaming = !!bodyData?.stream;
let responseCacheControl: string | undefined;

let stream: ReadableStream<Uint8Array> | null = null;
if (readFromCache) {
Expand All @@ -498,7 +499,8 @@ export async function proxyV1({
setHeader(name, value);
}
setHeader(CACHED_HEADER, "HIT");
setHeader("cache-control", `max-age=${responseMaxAge}`);
responseCacheControl = `max-age=${responseMaxAge}`;
setHeader("cache-control", responseCacheControl);
setHeader("age", `${age}`);

spanType = guessSpanType(url, bodyData?.model);
Expand Down Expand Up @@ -710,7 +712,8 @@ export async function proxyV1({
}
setHeader(CACHED_HEADER, "MISS");
if (writeToCache) {
setHeader("cache-control", `max-age=${cacheTTL}`);
responseCacheControl = `max-age=${cacheTTL}`;
setHeader("cache-control", responseCacheControl);
setHeader("age", "0");
}

Expand Down Expand Up @@ -745,6 +748,14 @@ export async function proxyV1({
}
}

if (stream && isStreaming) {
responseCacheControl = appendCacheControlDirective(
responseCacheControl,
"no-transform",
);
setHeader("cache-control", responseCacheControl);
}

if (stream) {
let first = true;
const allChunks: Uint8Array[] = [];
Expand Down Expand Up @@ -2806,6 +2817,13 @@ async function fetchAnthropicChatCompletions({
}
}

if (secret.type === "anthropic" && params.stream === true) {
// Cloudflare Workers auto-negotiates compression on subrequests. For SSE,
// force identity encoding so the Anthropic stream is transformed and
// forwarded incrementally instead of being buffered behind compression.
headers["accept-encoding"] = "identity";
}

if (secret.type === "bedrock") {
return fetchBedrockAnthropic({
secret,
Expand Down Expand Up @@ -3429,6 +3447,23 @@ export function createEventStreamTransformer(
}
// --------------------------------------------------

/**
 * Append a directive to a `Cache-Control` header value unless an equivalent
 * directive (compared case-insensitively) is already present.
 *
 * The original casing of both the existing value and the appended directive
 * is preserved; only the membership check is normalized.
 *
 * @param value - The current `Cache-Control` header value, if any.
 * @param directive - The directive to ensure is present (e.g. "no-transform").
 * @returns The header value containing the directive.
 */
function appendCacheControlDirective(
  value: string | undefined,
  directive: string,
): string {
  // No existing header (undefined or empty): the directive stands alone.
  // Early return also lets the compiler narrow `value` to a non-empty
  // string below, avoiding a non-null assertion.
  if (!value) {
    return directive;
  }

  const normalizedDirective = directive.toLowerCase();
  const existingDirectives = value
    .split(",")
    .map((part) => part.trim().toLowerCase())
    .filter((part) => part.length > 0);

  // Already present in any casing: return the value unchanged so we never
  // emit duplicate directives.
  if (existingDirectives.includes(normalizedDirective)) {
    return value;
  }

  return `${value}, ${directive}`;
}

function parseEnumHeader<T>(
headerName: string,
headerTypes: readonly T[],
Expand Down
Loading