diff --git a/js/src/auto-instrumentations/configs/anthropic.ts b/js/src/auto-instrumentations/configs/anthropic.ts index 04873047..a0ae7d31 100644 --- a/js/src/auto-instrumentations/configs/anthropic.ts +++ b/js/src/auto-instrumentations/configs/anthropic.ts @@ -57,4 +57,71 @@ export const anthropicConfigs: InstrumentationConfig[] = [ kind: "Async", }, }, + + // Message Batches API (>=0.39.0 layout) + { + channelName: anthropicChannels.messagesBatchesCreate.channelName, + module: { + name: "@anthropic-ai/sdk", + versionRange: ">=0.39.0", + filePath: "resources/messages/batches.mjs", + }, + functionQuery: { + className: "Batches", + methodName: "create", + kind: "Async", + }, + }, + { + channelName: anthropicChannels.messagesBatchesRetrieve.channelName, + module: { + name: "@anthropic-ai/sdk", + versionRange: ">=0.39.0", + filePath: "resources/messages/batches.mjs", + }, + functionQuery: { + className: "Batches", + methodName: "retrieve", + kind: "Async", + }, + }, + { + channelName: anthropicChannels.messagesBatchesList.channelName, + module: { + name: "@anthropic-ai/sdk", + versionRange: ">=0.39.0", + filePath: "resources/messages/batches.mjs", + }, + functionQuery: { + className: "Batches", + methodName: "list", + kind: "Async", + }, + }, + { + channelName: anthropicChannels.messagesBatchesCancel.channelName, + module: { + name: "@anthropic-ai/sdk", + versionRange: ">=0.39.0", + filePath: "resources/messages/batches.mjs", + }, + functionQuery: { + className: "Batches", + methodName: "cancel", + kind: "Async", + }, + }, + { + channelName: anthropicChannels.messagesBatchesDelete.channelName, + module: { + name: "@anthropic-ai/sdk", + versionRange: ">=0.39.0", + filePath: "resources/messages/batches.mjs", + }, + functionQuery: { + className: "Batches", + methodName: "delete", + kind: "Async", + }, + }, ]; diff --git a/js/src/instrumentation/plugins/anthropic-channels.ts b/js/src/instrumentation/plugins/anthropic-channels.ts index 48a069f7..bc5c3c0f 100644 --- a/js/src/instrumentation/plugins/anthropic-channels.ts +++ b/js/src/instrumentation/plugins/anthropic-channels.ts @@ -1,7 +1,10 @@ import { channel, defineChannels } from "../core/channel-definitions"; import type { + AnthropicBatchCreateParams, + AnthropicBatchListParams, AnthropicCreateParams, AnthropicMessage, + AnthropicMessageBatch, AnthropicStreamEvent, } from "../../vendor-sdk-types/anthropic"; @@ -26,4 +29,40 @@ export const anthropicChannels = defineChannels("@anthropic-ai/sdk", { channelName: "beta.messages.create", kind: "async", }), + messagesBatchesCreate: channel< + [AnthropicBatchCreateParams], + AnthropicMessageBatch, + Record + >({ + channelName: "messages.batches.create", + kind: "async", + }), + messagesBatchesRetrieve: channel< + [string], + AnthropicMessageBatch, + Record + >({ + channelName: "messages.batches.retrieve", + kind: "async", + }), + messagesBatchesList: channel< + [AnthropicBatchListParams?], + unknown, + Record + >({ + channelName: "messages.batches.list", + kind: "async", + }), + messagesBatchesCancel: channel< + [string], + AnthropicMessageBatch, + Record + >({ + channelName: "messages.batches.cancel", + kind: "async", + }), + messagesBatchesDelete: channel<[string], unknown, Record>({ + channelName: "messages.batches.delete", + kind: "async", + }), }); diff --git a/js/src/instrumentation/plugins/anthropic-plugin.test.ts b/js/src/instrumentation/plugins/anthropic-plugin.test.ts index 889f83d0..91206951 100644 --- a/js/src/instrumentation/plugins/anthropic-plugin.test.ts +++ b/js/src/instrumentation/plugins/anthropic-plugin.test.ts @@ -93,8 +93,8 @@ describe("AnthropicPlugin", () => { plugin.enable(); plugin.disable(); - // Should have subscribed twice - expect(mockChannel.subscribe).toHaveBeenCalledTimes(4); // 2 channels × 2 enables + // Should have subscribed twice (7 channels × 2 enables) + expect(mockChannel.subscribe).toHaveBeenCalledTimes(14); // 7 channels × 2 enables }); }); }); diff --git a/js/src/instrumentation/plugins/anthropic-plugin.ts b/js/src/instrumentation/plugins/anthropic-plugin.ts index 862bf758..73c41387 100644 --- a/js/src/instrumentation/plugins/anthropic-plugin.ts +++ b/js/src/instrumentation/plugins/anthropic-plugin.ts @@ -1,5 +1,9 @@ import { BasePlugin } from "../core"; -import { traceStreamingChannel, unsubscribeAll } from "../core/channel-tracing"; +import { + traceAsyncChannel, + traceStreamingChannel, + unsubscribeAll, +} from "../core/channel-tracing"; import { Attachment } from "../../logger"; import { SpanTypeAttribute, isObject } from "../../../util/index"; import { filterFrom, getCurrentUnixTimestamp } from "../../util"; @@ -95,6 +99,75 @@ export class AnthropicPlugin extends BasePlugin { name: "anthropic.messages.create", }), ); + + // Message Batches API + this.unsubscribers.push( + traceAsyncChannel(anthropicChannels.messagesBatchesCreate, { + name: "anthropic.messages.batches.create", + type: SpanTypeAttribute.LLM, + extractInput: ([params]) => ({ + input: params.requests, + metadata: { + ...filterFrom(params, ["requests"]), + provider: "anthropic", + }, + }), + extractOutput: (result) => result ?? null, + extractMetrics: () => ({}), + }), + ); + + this.unsubscribers.push( + traceAsyncChannel(anthropicChannels.messagesBatchesRetrieve, { + name: "anthropic.messages.batches.retrieve", + type: SpanTypeAttribute.LLM, + extractInput: ([batchId]) => ({ + input: batchId, + metadata: { provider: "anthropic" }, + }), + extractOutput: (result) => result ?? null, + extractMetrics: () => ({}), + }), + ); + + this.unsubscribers.push( + traceAsyncChannel(anthropicChannels.messagesBatchesList, { + name: "anthropic.messages.batches.list", + type: SpanTypeAttribute.LLM, + extractInput: ([params]) => ({ + input: params ?? null, + metadata: { provider: "anthropic" }, + }), + extractOutput: (result) => result ?? null, + extractMetrics: () => ({}), + }), + ); + + this.unsubscribers.push( + traceAsyncChannel(anthropicChannels.messagesBatchesCancel, { + name: "anthropic.messages.batches.cancel", + type: SpanTypeAttribute.LLM, + extractInput: ([batchId]) => ({ + input: batchId, + metadata: { provider: "anthropic" }, + }), + extractOutput: (result) => result ?? null, + extractMetrics: () => ({}), + }), + ); + + this.unsubscribers.push( + traceAsyncChannel(anthropicChannels.messagesBatchesDelete, { + name: "anthropic.messages.batches.delete", + type: SpanTypeAttribute.LLM, + extractInput: ([batchId]) => ({ + input: batchId, + metadata: { provider: "anthropic" }, + }), + extractOutput: (result) => result ?? null, + extractMetrics: () => ({}), + }), + ); } } diff --git a/js/src/vendor-sdk-types/anthropic.ts b/js/src/vendor-sdk-types/anthropic.ts index 44025114..bd20d867 100644 --- a/js/src/vendor-sdk-types/anthropic.ts +++ b/js/src/vendor-sdk-types/anthropic.ts @@ -23,6 +23,51 @@ export interface AnthropicMessages { ) => AnthropicAPIPromise< AnthropicMessage | AsyncIterable >; + batches: AnthropicBatches; +} + +// Batches API + +export interface AnthropicBatches { + create: ( + params: AnthropicBatchCreateParams, + ) => AnthropicAPIPromise; + retrieve: (batchId: string) => AnthropicAPIPromise; + list: (params?: AnthropicBatchListParams) => AnthropicAPIPromise; + cancel: (batchId: string) => AnthropicAPIPromise; + delete: (batchId: string) => AnthropicAPIPromise; +} + +export interface AnthropicBatchCreateParams { + requests: AnthropicBatchRequest[]; + [key: string]: unknown; +} + +export interface AnthropicBatchRequest { + custom_id: string; + params: AnthropicCreateParams; +} + +export interface AnthropicBatchListParams { + [key: string]: unknown; +} + +export interface AnthropicMessageBatch { + id: string; + type: "message_batch"; + processing_status: "in_progress" | "canceling" | "ended" | string; + request_counts: { + processing: number; + succeeded: number; + errored: number; + canceled: number; + expired: number; + }; + created_at: string; + expires_at: string; + archived_at?: string | null; + cancel_initiated_at?: string | null; + results_url?: string | null; } export interface AnthropicAPIPromise extends Promise { diff --git a/js/src/wrappers/anthropic.ts b/js/src/wrappers/anthropic.ts index 7048e5f4..bdb9fac6 100644 --- a/js/src/wrappers/anthropic.ts +++ b/js/src/wrappers/anthropic.ts @@ -1,6 +1,7 @@ import { anthropicChannels } from "../instrumentation/plugins/anthropic-channels"; import { TypedApplyProxy } from "../typed-instrumentation-helpers"; import type { + AnthropicBatches, AnthropicBeta, AnthropicClient, AnthropicMessages, @@ -79,11 +80,86 @@ function messagesProxy( return createProxy(target.create, channel); } + if (prop === "batches" && target.batches) { + return batchesProxy(target.batches); + } + return Reflect.get(target, prop, receiver); }, }); } +function batchesProxy(batches: AnthropicBatches): AnthropicBatches { + return new Proxy(batches, { + get(target, prop, receiver) { + switch (prop) { + case "create": + return new TypedApplyProxy(target.create, { + apply(fn, thisArg, args) { + return anthropicChannels.messagesBatchesCreate.tracePromise( + () => + Reflect.apply(fn, thisArg, args) as ReturnType< + AnthropicBatches["create"] + >, + { arguments: args }, + ); + }, + }); + case "retrieve": + return new TypedApplyProxy(target.retrieve, { + apply(fn, thisArg, args) { + return anthropicChannels.messagesBatchesRetrieve.tracePromise( + () => + Reflect.apply(fn, thisArg, args) as ReturnType< + AnthropicBatches["retrieve"] + >, + { arguments: args }, + ); + }, + }); + case "list": + return new TypedApplyProxy(target.list, { + apply(fn, thisArg, args) { + return anthropicChannels.messagesBatchesList.tracePromise( + () => + Reflect.apply(fn, thisArg, args) as ReturnType< + AnthropicBatches["list"] + >, + { arguments: args }, + ); + }, + }); + case "cancel": + return new TypedApplyProxy(target.cancel, { + apply(fn, thisArg, args) { + return anthropicChannels.messagesBatchesCancel.tracePromise( + () => + Reflect.apply(fn, thisArg, args) as ReturnType< + AnthropicBatches["cancel"] + >, + { arguments: args }, + ); + }, + }); + case "delete": + return new TypedApplyProxy(target.delete, { + apply(fn, thisArg, args) { + return anthropicChannels.messagesBatchesDelete.tracePromise( + () => + Reflect.apply(fn, thisArg, args) as ReturnType< + AnthropicBatches["delete"] + >, + { arguments: args }, + ); + }, + }); + default: + return Reflect.get(target, prop, receiver); + } + }, + }); +} + function createProxy( create: AnthropicMessages["create"], channel: