From 43dd61d2505b00230b2d29020e43cb23b161b0ec Mon Sep 17 00:00:00 2001 From: Stephen Belanger Date: Sat, 28 Mar 2026 00:13:47 +0800 Subject: [PATCH 1/2] feat: Add Message Batches API instrumentation (#1691) - Add vendor types for AnthropicBatches, AnthropicMessageBatch, and batch params - Add diagnostic channels for create, retrieve, list, cancel, and delete batch ops - Add batchesProxy() in wrapAnthropic() to intercept all Batches methods - Add AnthropicPlugin subscriptions for batch channels via traceAsyncChannel - Add auto-instrumentation configs targeting Batches class methods (>=0.39.0) Co-Authored-By: Claude Sonnet 4.6 --- .../configs/anthropic.ts | 67 ++++++++++++++++ .../plugins/anthropic-channels.ts | 39 ++++++++++ .../plugins/anthropic-plugin.test.ts | 4 +- .../plugins/anthropic-plugin.ts | 77 ++++++++++++++++++- js/src/vendor-sdk-types/anthropic.ts | 45 +++++++++++ js/src/wrappers/anthropic.ts | 76 ++++++++++++++++++ 6 files changed, 305 insertions(+), 3 deletions(-) diff --git a/js/src/auto-instrumentations/configs/anthropic.ts b/js/src/auto-instrumentations/configs/anthropic.ts index 048730471..a0ae7d31d 100644 --- a/js/src/auto-instrumentations/configs/anthropic.ts +++ b/js/src/auto-instrumentations/configs/anthropic.ts @@ -57,4 +57,71 @@ export const anthropicConfigs: InstrumentationConfig[] = [ kind: "Async", }, }, + + // Message Batches API (>=0.39.0 layout) + { + channelName: anthropicChannels.messagesBatchesCreate.channelName, + module: { + name: "@anthropic-ai/sdk", + versionRange: ">=0.39.0", + filePath: "resources/messages/batches.mjs", + }, + functionQuery: { + className: "Batches", + methodName: "create", + kind: "Async", + }, + }, + { + channelName: anthropicChannels.messagesBatchesRetrieve.channelName, + module: { + name: "@anthropic-ai/sdk", + versionRange: ">=0.39.0", + filePath: "resources/messages/batches.mjs", + }, + functionQuery: { + className: "Batches", + methodName: "retrieve", + kind: "Async", + }, + }, + { + channelName: anthropicChannels.messagesBatchesList.channelName, + module: { + name: "@anthropic-ai/sdk", + versionRange: ">=0.39.0", + filePath: "resources/messages/batches.mjs", + }, + functionQuery: { + className: "Batches", + methodName: "list", + kind: "Async", + }, + }, + { + channelName: anthropicChannels.messagesBatchesCancel.channelName, + module: { + name: "@anthropic-ai/sdk", + versionRange: ">=0.39.0", + filePath: "resources/messages/batches.mjs", + }, + functionQuery: { + className: "Batches", + methodName: "cancel", + kind: "Async", + }, + }, + { + channelName: anthropicChannels.messagesBatchesDelete.channelName, + module: { + name: "@anthropic-ai/sdk", + versionRange: ">=0.39.0", + filePath: "resources/messages/batches.mjs", + }, + functionQuery: { + className: "Batches", + methodName: "delete", + kind: "Async", + }, + }, ]; diff --git a/js/src/instrumentation/plugins/anthropic-channels.ts b/js/src/instrumentation/plugins/anthropic-channels.ts index 48a069f79..bc5c3c0f0 100644 --- a/js/src/instrumentation/plugins/anthropic-channels.ts +++ b/js/src/instrumentation/plugins/anthropic-channels.ts @@ -1,7 +1,10 @@ import { channel, defineChannels } from "../core/channel-definitions"; import type { + AnthropicBatchCreateParams, + AnthropicBatchListParams, AnthropicCreateParams, AnthropicMessage, + AnthropicMessageBatch, AnthropicStreamEvent, } from "../../vendor-sdk-types/anthropic"; @@ -26,4 +29,40 @@ export const anthropicChannels = defineChannels("@anthropic-ai/sdk", { channelName: "beta.messages.create", kind: "async", }), + messagesBatchesCreate: channel< + [AnthropicBatchCreateParams], + AnthropicMessageBatch, + Record + >({ + channelName: "messages.batches.create", + kind: "async", + }), + messagesBatchesRetrieve: channel< + [string], + AnthropicMessageBatch, + Record + >({ + channelName: "messages.batches.retrieve", + kind: "async", + }), + messagesBatchesList: channel< + [AnthropicBatchListParams?], + unknown, + Record + >({ + channelName: "messages.batches.list", + kind: "async", + }), + messagesBatchesCancel: channel< + [string], + AnthropicMessageBatch, + Record + >({ + channelName: "messages.batches.cancel", + kind: "async", + }), + messagesBatchesDelete: channel<[string], unknown, Record>({ + channelName: "messages.batches.delete", + kind: "async", + }), }); diff --git a/js/src/instrumentation/plugins/anthropic-plugin.test.ts b/js/src/instrumentation/plugins/anthropic-plugin.test.ts index 889f83d08..912069517 100644 --- a/js/src/instrumentation/plugins/anthropic-plugin.test.ts +++ b/js/src/instrumentation/plugins/anthropic-plugin.test.ts @@ -93,8 +93,8 @@ describe("AnthropicPlugin", () => { plugin.enable(); plugin.disable(); - // Should have subscribed twice - expect(mockChannel.subscribe).toHaveBeenCalledTimes(4); // 2 channels × 2 enables + // Should have subscribed twice (7 channels × 2 enables) + expect(mockChannel.subscribe).toHaveBeenCalledTimes(14); // 7 channels × 2 enables }); }); }); diff --git a/js/src/instrumentation/plugins/anthropic-plugin.ts b/js/src/instrumentation/plugins/anthropic-plugin.ts index 862bf7583..d917524c7 100644 --- a/js/src/instrumentation/plugins/anthropic-plugin.ts +++ b/js/src/instrumentation/plugins/anthropic-plugin.ts @@ -1,5 +1,9 @@ import { BasePlugin } from "../core"; -import { traceStreamingChannel, unsubscribeAll } from "../core/channel-tracing"; +import { + traceAsyncChannel, + traceStreamingChannel, + unsubscribeAll, +} from "../core/channel-tracing"; import { Attachment } from "../../logger"; import { SpanTypeAttribute, isObject } from "../../../util/index"; import { filterFrom, getCurrentUnixTimestamp } from "../../util"; @@ -7,9 +11,11 @@ import { finalizeAnthropicTokens } from "../../wrappers/anthropic-tokens-util"; import { anthropicChannels } from "./anthropic-channels"; import type { AnthropicBase64Source, + AnthropicBatchCreateParams, AnthropicCreateParams, AnthropicInputMessage, AnthropicMessage, + AnthropicMessageBatch, AnthropicOutputContentBlock, AnthropicStreamEvent, AnthropicUsage, @@ -95,6 +101,75 @@ export class AnthropicPlugin extends BasePlugin { name: "anthropic.messages.create", }), ); + + // Message Batches API + this.unsubscribers.push( + traceAsyncChannel(anthropicChannels.messagesBatchesCreate, { + name: "anthropic.messages.batches.create", + type: SpanTypeAttribute.LLM, + extractInput: ([params]: [AnthropicBatchCreateParams]) => ({ + input: params.requests, + metadata: { + ...filterFrom(params, ["requests"]), + provider: "anthropic", + }, + }), + extractOutput: (result: AnthropicMessageBatch) => result ?? null, + extractMetrics: () => ({}), + }), + ); + + this.unsubscribers.push( + traceAsyncChannel(anthropicChannels.messagesBatchesRetrieve, { + name: "anthropic.messages.batches.retrieve", + type: SpanTypeAttribute.LLM, + extractInput: ([batchId]: [string]) => ({ + input: batchId, + metadata: { provider: "anthropic" }, + }), + extractOutput: (result: AnthropicMessageBatch) => result ?? null, + extractMetrics: () => ({}), + }), + ); + + this.unsubscribers.push( + traceAsyncChannel(anthropicChannels.messagesBatchesList, { + name: "anthropic.messages.batches.list", + type: SpanTypeAttribute.LLM, + extractInput: ([params]) => ({ + input: params ?? null, + metadata: { provider: "anthropic" }, + }), + extractOutput: (result) => result ?? null, + extractMetrics: () => ({}), + }), + ); + + this.unsubscribers.push( + traceAsyncChannel(anthropicChannels.messagesBatchesCancel, { + name: "anthropic.messages.batches.cancel", + type: SpanTypeAttribute.LLM, + extractInput: ([batchId]: [string]) => ({ + input: batchId, + metadata: { provider: "anthropic" }, + }), + extractOutput: (result: AnthropicMessageBatch) => result ?? null, + extractMetrics: () => ({}), + }), + ); + + this.unsubscribers.push( + traceAsyncChannel(anthropicChannels.messagesBatchesDelete, { + name: "anthropic.messages.batches.delete", + type: SpanTypeAttribute.LLM, + extractInput: ([batchId]: [string]) => ({ + input: batchId, + metadata: { provider: "anthropic" }, + }), + extractOutput: (result) => result ?? null, + extractMetrics: () => ({}), + }), + ); } } diff --git a/js/src/vendor-sdk-types/anthropic.ts b/js/src/vendor-sdk-types/anthropic.ts index 440251148..bd20d867b 100644 --- a/js/src/vendor-sdk-types/anthropic.ts +++ b/js/src/vendor-sdk-types/anthropic.ts @@ -23,6 +23,51 @@ export interface AnthropicMessages { ) => AnthropicAPIPromise< AnthropicMessage | AsyncIterable >; + batches: AnthropicBatches; +} + +// Batches API + +export interface AnthropicBatches { + create: ( + params: AnthropicBatchCreateParams, + ) => AnthropicAPIPromise; + retrieve: (batchId: string) => AnthropicAPIPromise; + list: (params?: AnthropicBatchListParams) => AnthropicAPIPromise; + cancel: (batchId: string) => AnthropicAPIPromise; + delete: (batchId: string) => AnthropicAPIPromise; +} + +export interface AnthropicBatchCreateParams { + requests: AnthropicBatchRequest[]; + [key: string]: unknown; +} + +export interface AnthropicBatchRequest { + custom_id: string; + params: AnthropicCreateParams; +} + +export interface AnthropicBatchListParams { + [key: string]: unknown; +} + +export interface AnthropicMessageBatch { + id: string; + type: "message_batch"; + processing_status: "in_progress" | "canceling" | "ended" | string; + request_counts: { + processing: number; + succeeded: number; + errored: number; + canceled: number; + expired: number; + }; + created_at: string; + expires_at: string; + archived_at?: string | null; + cancel_initiated_at?: string | null; + results_url?: string | null; } export interface AnthropicAPIPromise extends Promise { diff --git a/js/src/wrappers/anthropic.ts b/js/src/wrappers/anthropic.ts index 7048e5f40..bdb9fac67 100644 --- a/js/src/wrappers/anthropic.ts +++ b/js/src/wrappers/anthropic.ts @@ -1,6 +1,7 @@ import { anthropicChannels } from "../instrumentation/plugins/anthropic-channels"; import { TypedApplyProxy } from "../typed-instrumentation-helpers"; import type { + AnthropicBatches, AnthropicBeta, AnthropicClient, AnthropicMessages, @@ -79,11 +80,86 @@ function messagesProxy( return createProxy(target.create, channel); } + if (prop === "batches" && target.batches) { + return batchesProxy(target.batches); + } + return Reflect.get(target, prop, receiver); }, }); } +function batchesProxy(batches: AnthropicBatches): AnthropicBatches { + return new Proxy(batches, { + get(target, prop, receiver) { + switch (prop) { + case "create": + return new TypedApplyProxy(target.create, { + apply(fn, thisArg, args) { + return anthropicChannels.messagesBatchesCreate.tracePromise( + () => + Reflect.apply(fn, thisArg, args) as ReturnType< + AnthropicBatches["create"] + >, + { arguments: args }, + ); + }, + }); + case "retrieve": + return new TypedApplyProxy(target.retrieve, { + apply(fn, thisArg, args) { + return anthropicChannels.messagesBatchesRetrieve.tracePromise( + () => + Reflect.apply(fn, thisArg, args) as ReturnType< + AnthropicBatches["retrieve"] + >, + { arguments: args }, + ); + }, + }); + case "list": + return new TypedApplyProxy(target.list, { + apply(fn, thisArg, args) { + return anthropicChannels.messagesBatchesList.tracePromise( + () => + Reflect.apply(fn, thisArg, args) as ReturnType< + AnthropicBatches["list"] + >, + { arguments: args }, + ); + }, + }); + case "cancel": + return new TypedApplyProxy(target.cancel, { + apply(fn, thisArg, args) { + return anthropicChannels.messagesBatchesCancel.tracePromise( + () => + Reflect.apply(fn, thisArg, args) as ReturnType< + AnthropicBatches["cancel"] + >, + { arguments: args }, + ); + }, + }); + case "delete": + return new TypedApplyProxy(target.delete, { + apply(fn, thisArg, args) { + return anthropicChannels.messagesBatchesDelete.tracePromise( + () => + Reflect.apply(fn, thisArg, args) as ReturnType< + AnthropicBatches["delete"] + >, + { arguments: args }, + ); + }, + }); + default: + return Reflect.get(target, prop, receiver); + } + }, + }); +} + function createProxy( create: AnthropicMessages["create"], channel: From a163d9ee0a0e808ffe34e036b04f04ec580b47d8 Mon Sep 17 00:00:00 2001 From: Stephen Belanger Date: Sat, 28 Mar 2026 00:20:50 +0800 Subject: [PATCH 2/2] fix: Remove explicit type annotations that caused TS2322 in batch extractInput The destructured parameter type annotations (e.g. `([params]: [T])`) were stricter than the expected `(args: [T, ...any[]], event, span) => ...` type, causing assignability errors. Removing the annotations lets TypeScript infer the correct types from the channel's generic parameters. Co-Authored-By: Claude Sonnet 4.6 --- .../instrumentation/plugins/anthropic-plugin.ts | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/js/src/instrumentation/plugins/anthropic-plugin.ts b/js/src/instrumentation/plugins/anthropic-plugin.ts index d917524c7..73c413874 100644 --- a/js/src/instrumentation/plugins/anthropic-plugin.ts +++ b/js/src/instrumentation/plugins/anthropic-plugin.ts @@ -11,11 +11,9 @@ import { finalizeAnthropicTokens } from "../../wrappers/anthropic-tokens-util"; import { anthropicChannels } from "./anthropic-channels"; import type { AnthropicBase64Source, - AnthropicBatchCreateParams, AnthropicCreateParams, AnthropicInputMessage, AnthropicMessage, - AnthropicMessageBatch, AnthropicOutputContentBlock, AnthropicStreamEvent, AnthropicUsage, @@ -107,14 +105,14 @@ export class AnthropicPlugin extends BasePlugin { traceAsyncChannel(anthropicChannels.messagesBatchesCreate, { name: "anthropic.messages.batches.create", type: SpanTypeAttribute.LLM, - extractInput: ([params]: [AnthropicBatchCreateParams]) => ({ + extractInput: ([params]) => ({ input: params.requests, metadata: { ...filterFrom(params, ["requests"]), provider: "anthropic", }, }), - extractOutput: (result: AnthropicMessageBatch) => result ?? null, + extractOutput: (result) => result ?? null, extractMetrics: () => ({}), }), ); @@ -123,11 +121,11 @@ export class AnthropicPlugin extends BasePlugin { traceAsyncChannel(anthropicChannels.messagesBatchesRetrieve, { name: "anthropic.messages.batches.retrieve", type: SpanTypeAttribute.LLM, - extractInput: ([batchId]: [string]) => ({ + extractInput: ([batchId]) => ({ input: batchId, metadata: { provider: "anthropic" }, }), - extractOutput: (result: AnthropicMessageBatch) => result ?? null, + extractOutput: (result) => result ?? null, extractMetrics: () => ({}), }), ); @@ -149,11 +147,11 @@ export class AnthropicPlugin extends BasePlugin { traceAsyncChannel(anthropicChannels.messagesBatchesCancel, { name: "anthropic.messages.batches.cancel", type: SpanTypeAttribute.LLM, - extractInput: ([batchId]: [string]) => ({ + extractInput: ([batchId]) => ({ input: batchId, metadata: { provider: "anthropic" }, }), - extractOutput: (result: AnthropicMessageBatch) => result ?? null, + extractOutput: (result) => result ?? null, extractMetrics: () => ({}), }), ); @@ -162,7 +160,7 @@ export class AnthropicPlugin extends BasePlugin { traceAsyncChannel(anthropicChannels.messagesBatchesDelete, { name: "anthropic.messages.batches.delete", type: SpanTypeAttribute.LLM, - extractInput: ([batchId]: [string]) => ({ + extractInput: ([batchId]) => ({ input: batchId, metadata: { provider: "anthropic" }, }),