From e1e3adf9efa29ca4b6ae89c67842a48015d43dd2 Mon Sep 17 00:00:00 2001 From: Luca Forstner Date: Thu, 26 Mar 2026 12:42:56 +0100 Subject: [PATCH 1/2] ref: Use diagnostics channels for ai sdk wrapper --- .../plugins/ai-sdk-channels.ts | 28 +- .../instrumentation/plugins/ai-sdk-plugin.ts | 138 ++- js/src/wrappers/ai-sdk/ai-sdk.ts | 1091 +++-------------- 3 files changed, 259 insertions(+), 998 deletions(-) diff --git a/js/src/instrumentation/plugins/ai-sdk-channels.ts b/js/src/instrumentation/plugins/ai-sdk-channels.ts index 1c835ad7..5366fbc7 100644 --- a/js/src/instrumentation/plugins/ai-sdk-channels.ts +++ b/js/src/instrumentation/plugins/ai-sdk-channels.ts @@ -1,16 +1,24 @@ import { channel, defineChannels } from "../core/channel-definitions"; +import type { ChannelSpanInfo } from "../core/types"; import type { + AISDK, AISDKCallParams, AISDKResult, } from "../../vendor-sdk-types/ai-sdk"; type AISDKStreamResult = AISDKResult | AsyncIterable; +type AISDKChannelContext = { + aiSDK?: AISDK; + denyOutputPaths?: string[]; + self?: unknown; + span_info?: ChannelSpanInfo; +}; export const aiSDKChannels = defineChannels("ai", { generateText: channel< [AISDKCallParams], AISDKStreamResult, - Record, + AISDKChannelContext, unknown >({ channelName: "generateText", @@ -19,7 +27,7 @@ export const aiSDKChannels = defineChannels("ai", { streamText: channel< [AISDKCallParams], AISDKStreamResult, - Record, + AISDKChannelContext, unknown >({ channelName: "streamText", @@ -28,7 +36,7 @@ export const aiSDKChannels = defineChannels("ai", { streamTextSync: channel< [AISDKCallParams], AISDKResult, - Record, + AISDKChannelContext, unknown >({ channelName: "streamText.sync", @@ -37,7 +45,7 @@ export const aiSDKChannels = defineChannels("ai", { generateObject: channel< [AISDKCallParams], AISDKStreamResult, - Record, + AISDKChannelContext, unknown >({ channelName: "generateObject", @@ -46,7 +54,7 @@ export const aiSDKChannels = defineChannels("ai", { streamObject: channel< [AISDKCallParams], AISDKStreamResult, - Record, + AISDKChannelContext, unknown >({ channelName: "streamObject", @@ -55,7 +63,7 @@ export const aiSDKChannels = defineChannels("ai", { streamObjectSync: channel< [AISDKCallParams], AISDKResult, - Record, + AISDKChannelContext, unknown >({ channelName: "streamObject.sync", @@ -64,7 +72,7 @@ export const aiSDKChannels = defineChannels("ai", { agentGenerate: channel< [AISDKCallParams], AISDKStreamResult, - Record, + AISDKChannelContext, unknown >({ channelName: "Agent.generate", @@ -73,7 +81,7 @@ export const aiSDKChannels = defineChannels("ai", { agentStream: channel< [AISDKCallParams], AISDKStreamResult, - Record, + AISDKChannelContext, unknown >({ channelName: "Agent.stream", @@ -82,7 +90,7 @@ export const aiSDKChannels = defineChannels("ai", { toolLoopAgentGenerate: channel< [AISDKCallParams], AISDKStreamResult, - Record, + AISDKChannelContext, unknown >({ channelName: "ToolLoopAgent.generate", @@ -91,7 +99,7 @@ export const aiSDKChannels = defineChannels("ai", { toolLoopAgentStream: channel< [AISDKCallParams], AISDKStreamResult, - Record, + AISDKChannelContext, unknown >({ channelName: "ToolLoopAgent.stream", diff --git a/js/src/instrumentation/plugins/ai-sdk-plugin.ts b/js/src/instrumentation/plugins/ai-sdk-plugin.ts index fe6c041a..cdc30888 100644 --- a/js/src/instrumentation/plugins/ai-sdk-plugin.ts +++ b/js/src/instrumentation/plugins/ai-sdk-plugin.ts @@ -7,11 +7,12 @@ import { import { SpanTypeAttribute } from "../../../util/index"; import { getCurrentUnixTimestamp } from "../../util"; import { type Span, withCurrent } from "../../logger"; -import { processInputAttachments } from "../../wrappers/attachment-utils"; +import { processInputAttachmentsSync } from "../../wrappers/ai-sdk/ai-sdk"; import { normalizeAISDKLoggedOutput } from "../../wrappers/ai-sdk/normalize-logged-output"; import { serializeAISDKToolsForLogging } from "../../wrappers/ai-sdk/tool-serialization"; import { aiSDKChannels } from "./ai-sdk-channels"; import type { + AISDK, AISDKCallParams, AISDKLanguageModel, AISDKModel, @@ -101,7 +102,10 @@ export class AISDKPlugin extends BasePlugin { prepareAISDKInput(params, event, span, denyOutputPaths), extractOutput: (result, endEvent) => { finalizeAISDKChildTracing(endEvent as { [key: string]: unknown }); - return processAISDKOutput(result, denyOutputPaths); + return processAISDKOutput( + result, + resolveDenyOutputPaths(endEvent, denyOutputPaths), + ); }, extractMetrics: (result, _startTime, endEvent) => extractTopLevelAISDKMetrics(result, endEvent), @@ -116,13 +120,17 @@ export class AISDKPlugin extends BasePlugin { type: SpanTypeAttribute.LLM, extractInput: ([params], event, span) => prepareAISDKInput(params, event, span, denyOutputPaths), - extractOutput: (result) => processAISDKOutput(result, denyOutputPaths), + extractOutput: (result, endEvent) => + processAISDKOutput( + result, + resolveDenyOutputPaths(endEvent, denyOutputPaths), + ), extractMetrics: (result, startTime, endEvent) => extractTopLevelAISDKMetrics(result, endEvent, startTime), aggregateChunks: aggregateAISDKChunks, patchResult: ({ endEvent, result, span, startTime }) => patchAISDKStreamingResult({ - denyOutputPaths, + defaultDenyOutputPaths: denyOutputPaths, endEvent, result, span, @@ -140,7 +148,7 @@ export class AISDKPlugin extends BasePlugin { prepareAISDKInput(params, event, span, denyOutputPaths), patchResult: ({ endEvent, result, span, startTime }) => patchAISDKStreamingResult({ - denyOutputPaths, + defaultDenyOutputPaths: denyOutputPaths, endEvent, result, span, @@ -158,7 +166,10 @@ export class AISDKPlugin extends BasePlugin { prepareAISDKInput(params, event, span, denyOutputPaths), extractOutput: (result, endEvent) => { finalizeAISDKChildTracing(endEvent as { [key: string]: unknown }); - return processAISDKOutput(result, denyOutputPaths); + return processAISDKOutput( + result, + resolveDenyOutputPaths(endEvent, denyOutputPaths), + ); }, extractMetrics: (result, _startTime, endEvent) => extractTopLevelAISDKMetrics(result, endEvent), @@ -173,13 +184,17 @@ export class AISDKPlugin extends BasePlugin { type: SpanTypeAttribute.LLM, extractInput: ([params], event, span) => prepareAISDKInput(params, event, span, denyOutputPaths), - extractOutput: (result) => processAISDKOutput(result, denyOutputPaths), + extractOutput: (result, endEvent) => + processAISDKOutput( + result, + resolveDenyOutputPaths(endEvent, denyOutputPaths), + ), extractMetrics: (result, startTime, endEvent) => extractTopLevelAISDKMetrics(result, endEvent, startTime), aggregateChunks: aggregateAISDKChunks, patchResult: ({ endEvent, result, span, startTime }) => patchAISDKStreamingResult({ - denyOutputPaths, + defaultDenyOutputPaths: denyOutputPaths, endEvent, result, span, @@ -197,7 +212,7 @@ export class AISDKPlugin extends BasePlugin { prepareAISDKInput(params, event, span, denyOutputPaths), patchResult: ({ endEvent, result, span, startTime }) => patchAISDKStreamingResult({ - denyOutputPaths, + defaultDenyOutputPaths: denyOutputPaths, endEvent, result, span, @@ -215,7 +230,10 @@ export class AISDKPlugin extends BasePlugin { prepareAISDKInput(params, event, span, denyOutputPaths), extractOutput: (result, endEvent) => { finalizeAISDKChildTracing(endEvent as { [key: string]: unknown }); - return processAISDKOutput(result, denyOutputPaths); + return processAISDKOutput( + result, + resolveDenyOutputPaths(endEvent, denyOutputPaths), + ); }, extractMetrics: (result, _startTime, endEvent) => extractTopLevelAISDKMetrics(result, endEvent), @@ -230,13 +248,17 @@ export class AISDKPlugin extends BasePlugin { type: SpanTypeAttribute.LLM, extractInput: ([params], event, span) => prepareAISDKInput(params, event, span, denyOutputPaths), - extractOutput: (result) => processAISDKOutput(result, denyOutputPaths), + extractOutput: (result, endEvent) => + processAISDKOutput( + result, + resolveDenyOutputPaths(endEvent, denyOutputPaths), + ), extractMetrics: (result, startTime, endEvent) => extractTopLevelAISDKMetrics(result, endEvent, startTime), aggregateChunks: aggregateAISDKChunks, patchResult: ({ endEvent, result, span, startTime }) => patchAISDKStreamingResult({ - denyOutputPaths, + defaultDenyOutputPaths: denyOutputPaths, endEvent, result, span, @@ -254,7 +276,10 @@ export class AISDKPlugin extends BasePlugin { prepareAISDKInput(params, event, span, denyOutputPaths), extractOutput: (result, endEvent) => { finalizeAISDKChildTracing(endEvent as { [key: string]: unknown }); - return processAISDKOutput(result, denyOutputPaths); + return processAISDKOutput( + result, + resolveDenyOutputPaths(endEvent, denyOutputPaths), + ); }, extractMetrics: (result, _startTime, endEvent) => extractTopLevelAISDKMetrics(result, endEvent), @@ -269,13 +294,17 @@ export class AISDKPlugin extends BasePlugin { type: SpanTypeAttribute.LLM, extractInput: ([params], event, span) => prepareAISDKInput(params, event, span, denyOutputPaths), - extractOutput: (result) => processAISDKOutput(result, denyOutputPaths), + extractOutput: (result, endEvent) => + processAISDKOutput( + result, + resolveDenyOutputPaths(endEvent, denyOutputPaths), + ), extractMetrics: (result, startTime, endEvent) => extractTopLevelAISDKMetrics(result, endEvent, startTime), aggregateChunks: aggregateAISDKChunks, patchResult: ({ endEvent, result, span, startTime }) => patchAISDKStreamingResult({ - denyOutputPaths, + defaultDenyOutputPaths: denyOutputPaths, endEvent, result, span, @@ -286,41 +315,61 @@ export class AISDKPlugin extends BasePlugin { } } +function resolveDenyOutputPaths( + event: { denyOutputPaths?: string[] } | undefined, + defaultDenyOutputPaths: string[], +): string[] { + return event?.denyOutputPaths ?? defaultDenyOutputPaths; +} + /** * Process AI SDK input parameters, converting attachments as needed. */ -function processAISDKInput(params: AISDKCallParams): unknown { - if (!params) return params; - - // Use the attachment processing from the manual wrapper, but keep tool - // definitions in metadata to match the OpenAI wrapper convention. - const input = processInputAttachments(params); - if (!input || typeof input !== "object" || Array.isArray(input)) { - return input; - } - - const { tools: _tools, ...rest } = input as Record; - return rest; +function processAISDKInput( + params: AISDKCallParams, +): ReturnType { + return processInputAttachmentsSync(params); } function prepareAISDKInput( params: AISDKCallParams, - event: { self?: unknown; [key: string]: unknown }, + event: { + aiSDK?: AISDK; + denyOutputPaths?: string[]; + self?: unknown; + [key: string]: unknown; + }, span: Span, - denyOutputPaths: string[], + defaultDenyOutputPaths: string[], ): { input: unknown; metadata: Record; } { - const input = processAISDKInput(params); + const { input, outputPromise } = processAISDKInput(params); + if (outputPromise && input && typeof input === "object") { + outputPromise + .then((resolvedData) => { + span.log({ + input: { + ...(input as Record), + ...resolvedData, + }, + }); + }) + .catch(() => { + // Use the placeholder response_format if async resolution fails. + }); + } + const metadata = extractMetadataFromParams(params, event.self); const childTracing = prepareAISDKChildTracing( params, event.self, span, - denyOutputPaths, + defaultDenyOutputPaths, + event.aiSDK, ); - event.__braintrust_ai_sdk_model_wrapped = childTracing.modelWrapped; + event.modelWrapped = childTracing.modelWrapped; if (childTracing.cleanup) { event.__braintrust_ai_sdk_cleanup = childTracing.cleanup; } @@ -348,7 +397,10 @@ function extractTopLevelAISDKMetrics( } function hasModelChildTracing(event?: { [key: string]: unknown }): boolean { - return event?.__braintrust_ai_sdk_model_wrapped === true; + return ( + event?.modelWrapped === true || + event?.__braintrust_ai_sdk_model_wrapped === true + ); } /** @@ -401,6 +453,7 @@ function prepareAISDKChildTracing( self: unknown, parentSpan: Span, denyOutputPaths: string[], + aiSDK?: AISDK, ): { cleanup?: () => void; modelWrapped: boolean; @@ -411,7 +464,7 @@ function prepareAISDKChildTracing( let modelWrapped = false; const patchModel = (model: AISDKModel | undefined): void => { - const resolvedModel = resolveAISDKModel(model); + const resolvedModel = resolveAISDKModel(model, aiSDK); if ( !resolvedModel || typeof resolvedModel !== "object" || @@ -456,7 +509,7 @@ function prepareAISDKChildTracing( type: SpanTypeAttribute.LLM, }, event: { - input: processAISDKInput(options), + input: processAISDKInput(options).input, metadata: baseMetadata, }, }, @@ -473,7 +526,7 @@ function prepareAISDKChildTracing( type: SpanTypeAttribute.LLM, }, event: { - input: processAISDKInput(options), + input: processAISDKInput(options).input, metadata: baseMetadata, }, }); @@ -704,13 +757,13 @@ function finalizeAISDKChildTracing(event?: { [key: string]: unknown }): void { } function patchAISDKStreamingResult(args: { - denyOutputPaths: string[]; - endEvent: { [key: string]: unknown }; + defaultDenyOutputPaths: string[]; + endEvent: { denyOutputPaths?: string[]; [key: string]: unknown }; result: AISDKResult; span: Span; startTime: number; }): boolean { - const { denyOutputPaths, endEvent, result, span, startTime } = args; + const { defaultDenyOutputPaths, endEvent, result, span, startTime } = args; if (!result || typeof result !== "object") { return false; @@ -742,7 +795,7 @@ function patchAISDKStreamingResult(args: { const output = await processAISDKStreamingOutput( result, - denyOutputPaths, + resolveDenyOutputPaths(endEvent, defaultDenyOutputPaths), ); const metadata = buildResolvedMetadataPayload(result).metadata; @@ -848,6 +901,7 @@ function buildResolvedMetadataPayload(result: AISDKResult): { function resolveAISDKModel( model: AISDKModel | undefined, + aiSDK?: AISDK, ): AISDKModel | undefined { if (typeof model !== "string") { return model; @@ -860,7 +914,9 @@ function resolveAISDKModel( languageModel?: (modelId: string) => AISDKLanguageModel; }; } - ).AI_SDK_DEFAULT_PROVIDER ?? null; + ).AI_SDK_DEFAULT_PROVIDER ?? + aiSDK?.gateway ?? + null; if (provider && typeof provider.languageModel === "function") { return provider.languageModel(model); diff --git a/js/src/wrappers/ai-sdk/ai-sdk.ts b/js/src/wrappers/ai-sdk/ai-sdk.ts index d65bf3e1..c5cbd1b7 100644 --- a/js/src/wrappers/ai-sdk/ai-sdk.ts +++ b/js/src/wrappers/ai-sdk/ai-sdk.ts @@ -1,7 +1,8 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { startSpan, traced, withCurrent, Attachment } from "../../logger"; +import { Attachment } from "../../logger"; import { SpanTypeAttribute } from "../../../util"; +import { aiSDKChannels } from "../../instrumentation/plugins/ai-sdk-channels"; import { convertDataToBlob, getExtensionFromMediaType, @@ -16,15 +17,11 @@ import type { AISDKCallParams, AISDKGeneratedFile, AISDKGenerateFunction, - AISDKLanguageModel, AISDKModel, - AISDKModelStreamChunk, AISDKOutputObject, AISDKOutputResponseFormat, AISDKResult, AISDKStreamFunction, - AISDKTool, - AISDKTools, } from "../../vendor-sdk-types/ai-sdk"; // list of json paths to remove from output field @@ -208,14 +205,17 @@ const wrapAgentGenerate = ( instance: AISDKAgentInstance, options: WrapAISDKOptions = {}, ) => { - return async (params: AISDKCallParams) => + const defaultName = `${instance.constructor.name}.generate`; + return async (params: AISDKCallParams & SpanInfo) => makeGenerateTextWrapper( - `${instance.constructor.name}.generate`, + aiSDKChannels.generateText, + defaultName, + generate.bind(instance), + { + self: instance, + spanType: SpanTypeAttribute.FUNCTION, + }, options, - generate.bind(instance), // as of v5 this is just streamText under the hood - // Follows what the AI SDK does under the hood when calling generateText - undefined, - SpanTypeAttribute.FUNCTION, )({ ...instance.settings, ...params }); }; @@ -224,396 +224,66 @@ const wrapAgentStream = ( instance: AISDKAgentInstance, options: WrapAISDKOptions = {}, ) => { - return (params: AISDKCallParams) => - makeStreamTextWrapper( - `${instance.constructor.name}.stream`, + const defaultName = `${instance.constructor.name}.stream`; + return (params: AISDKCallParams & SpanInfo) => + makeStreamWrapper( + aiSDKChannels.streamTextSync, + defaultName, + stream.bind(instance), + { + self: instance, + spanType: SpanTypeAttribute.FUNCTION, + }, options, - stream.bind(instance), // as of v5 this is just streamText under the hood - undefined, // aiSDK not needed since model is already on instance - SpanTypeAttribute.FUNCTION, )({ ...instance.settings, ...params }); }; const makeGenerateTextWrapper = ( + channel: + | typeof aiSDKChannels.generateText + | typeof aiSDKChannels.generateObject, name: string, - options: WrapAISDKOptions, generateText: AISDKGenerateFunction, - aiSDK?: AISDK, - spanType: SpanTypeAttribute = SpanTypeAttribute.LLM, + contextOptions: { + aiSDK?: AISDK; + self?: unknown; + spanType?: SpanTypeAttribute; + } = {}, + options: WrapAISDKOptions = {}, ) => { const wrapper = async function (allParams: AISDKCallParams & SpanInfo) { - // Extract span_info from params (used by Braintrust-managed prompts) const { span_info, ...params } = allParams; - const { - metadata: spanInfoMetadata, - name: spanName, - spanAttributes: spanInfoAttrs, - } = span_info ?? {}; - - const { model, provider } = serializeModelWithProvider(params.model); - const serializedTools = serializeAISDKToolsForLogging(params.tools); - - // Process input attachments (including async Output.object resolution for v6) - const processedInput = await processInputAttachments(params); - - return traced( - async (span) => { - const { wrappedModel, getMetrics } = wrapModelAndGetMetrics( - params.model, - aiSDK, - ); - - const result = await generateText({ - ...params, - model: wrappedModel, - tools: wrapTools(params.tools), - }); - - // Extract resolved model/provider from gateway routing if available - const gatewayInfo = extractGatewayRoutingInfo(result); - const resolvedMetadata: Record = {}; - if (gatewayInfo?.provider) { - resolvedMetadata.provider = gatewayInfo.provider; - } - if (gatewayInfo?.model) { - resolvedMetadata.model = gatewayInfo.model; - } - - span.log({ - output: await processOutput(result, options.denyOutputPaths), - metrics: getMetrics(result), - ...(Object.keys(resolvedMetadata).length > 0 - ? { metadata: resolvedMetadata } - : {}), - }); - - return result; - }, - { - name: spanName || name, - spanAttributes: { - type: spanType, - ...spanInfoAttrs, - }, - event: { - input: processedInput, - metadata: { - ...spanInfoMetadata, - model, - ...(provider ? { provider } : {}), - ...(serializedTools ? { tools: serializedTools } : {}), - braintrust: { - integration_name: "ai-sdk", - sdk_language: "typescript", - }, - }, - }, - }, + const tracedParams = { ...params }; + + return channel.tracePromise( + () => generateText(tracedParams), + createAISDKChannelContext(tracedParams, { + aiSDK: contextOptions.aiSDK, + denyOutputPaths: options.denyOutputPaths, + self: contextOptions.self, + span_info: mergeSpanInfo(span_info, { + name, + spanType: contextOptions.spanType, + }), + }), ); }; Object.defineProperty(wrapper, "name", { value: name, writable: false }); return wrapper; }; -/** - * Resolves a model string ID to a model instance using AI SDK's global provider. - * This mirrors the internal resolveLanguageModel function in AI SDK. - */ -const resolveModel = ( - model: AISDKModel | undefined, - ai?: AISDK, -): AISDKModel | undefined => { - if (typeof model !== "string") { - return model; - } - // Use AI SDK's global provider if set, otherwise fall back to gateway - const provider = - ( - globalThis as typeof globalThis & { - AI_SDK_DEFAULT_PROVIDER?: { - languageModel?: (modelId: string) => AISDKLanguageModel; - }; - } - ).AI_SDK_DEFAULT_PROVIDER ?? - ai?.gateway ?? - null; - if (provider && typeof provider.languageModel === "function") { - return provider.languageModel(model); - } - // If no provider available, return as-is (AI SDK will resolve it) - return model; -}; - -/** - * Wraps a model's doGenerate method to create a span for each LLM call. - * This allows visibility into each step of a multi-round tool interaction. - */ -const wrapModel = ( - model: AISDKModel | undefined, - ai?: AISDK, -): AISDKModel | undefined => { - // Resolve string model IDs to model instances - const resolvedModel = resolveModel(model, ai); - - if ( - !resolvedModel || - typeof resolvedModel !== "object" || - typeof resolvedModel.doGenerate !== "function" - ) { - return model; // Return original if we can't wrap - } - - // Already wrapped - avoid double wrapping - if (resolvedModel._braintrustWrapped) { - return resolvedModel; - } - - const originalDoGenerate = resolvedModel.doGenerate.bind(resolvedModel); - const originalDoStream = resolvedModel.doStream?.bind(resolvedModel); - - const { model: modelId, provider } = - serializeModelWithProvider(resolvedModel); - - const wrappedDoGenerate = async (options: AISDKCallParams) => { - const serializedTools = serializeAISDKToolsForLogging(options.tools); - // Process input attachments (including async Output.object resolution for v6) - const processedInput = await processInputAttachments(options); - - return traced( - async (span) => { - const result = await originalDoGenerate(options); - - // Extract resolved model/provider from gateway routing if available - const gatewayInfo = extractGatewayRoutingInfo(result); - const resolvedMetadata: Record = {}; - if (gatewayInfo?.provider) { - resolvedMetadata.provider = gatewayInfo.provider; - } - if (gatewayInfo?.model) { - resolvedMetadata.model = gatewayInfo.model; - } - if (result.finishReason !== undefined) { - resolvedMetadata.finish_reason = result.finishReason; - } - - span.log({ - output: await processOutput(result), - metrics: extractTokenMetrics(result), - ...(Object.keys(resolvedMetadata).length > 0 - ? { metadata: resolvedMetadata } - : {}), - }); - - return result; - }, - { - name: "doGenerate", - spanAttributes: { - type: SpanTypeAttribute.LLM, - }, - event: { - input: processedInput, - metadata: { - model: modelId, - ...(provider ? { provider } : {}), - ...(serializedTools ? { tools: serializedTools } : {}), - braintrust: { - integration_name: "ai-sdk", - sdk_language: "typescript", - }, - }, - }, - }, - ); - }; - - const wrappedDoStream = async (options: AISDKCallParams) => { - const startTime = Date.now(); - let receivedFirst = false; - const serializedTools = serializeAISDKToolsForLogging(options.tools); - - // Process input attachments (including async Output.object resolution for v6) - const processedInput = await processInputAttachments(options); - - const span = startSpan({ - name: "doStream", - spanAttributes: { - type: SpanTypeAttribute.LLM, - }, - event: { - input: processedInput, - metadata: { - model: modelId, - ...(provider ? { provider } : {}), - ...(serializedTools ? { tools: serializedTools } : {}), - braintrust: { - integration_name: "ai-sdk", - sdk_language: "typescript", - }, - }, - }, - }); - - const result = await originalDoStream!(options); - - // Accumulate streamed content for output logging - const output: Record = {}; - let text = ""; - let reasoning = ""; - const toolCalls: unknown[] = []; - let object: unknown = undefined; // For structured output / streamObject - - // Helper to extract text from various chunk formats - const extractTextDelta = (chunk: AISDKModelStreamChunk): string => { - // Try all known property names for text deltas - if (typeof chunk.textDelta === "string") return chunk.textDelta; - if (typeof chunk.delta === "string") return chunk.delta; - if (typeof chunk.text === "string") return chunk.text; - // For content property (some providers use this) - if (typeof chunk.content === "string") return chunk.content; - return ""; - }; - - const transformStream = new TransformStream({ - async transform(chunk, controller) { - // Track time to first token on any chunk type - if (!receivedFirst) { - receivedFirst = true; - span.log({ - metrics: { - time_to_first_token: (Date.now() - startTime) / 1000, - }, - }); - } - - switch (chunk.type) { - case "text-delta": - text += extractTextDelta(chunk); - break; - case "reasoning-delta": - // Reasoning chunks use delta or text property - if (chunk.delta) { - reasoning += chunk.delta; - } else if (chunk.text) { - reasoning += chunk.text; - } - break; - case "tool-call": - toolCalls.push(chunk); - break; - case "object": - // Structured output - capture the final object - object = chunk.object; - break; - case "raw": - // Raw chunks may contain text content for structured output / JSON mode - // The rawValue often contains the delta text from the provider - if (chunk.rawValue) { - const rawVal = chunk.rawValue; - // OpenAI format: rawValue.delta.content or rawValue.choices[0].delta.content - if (rawVal.delta?.content) { - text += rawVal.delta.content; - } else if (rawVal.choices?.[0]?.delta?.content) { - text += rawVal.choices[0].delta.content; - } else if (typeof rawVal.text === "string") { - text += rawVal.text; - } else if (typeof rawVal.content === "string") { - text += rawVal.content; - } - } - break; - case "finish": - output.text = text; - output.reasoning = reasoning; - output.toolCalls = toolCalls; - output.finishReason = chunk.finishReason; - output.usage = chunk.usage; - - // Include object for structured output if captured - if (object !== undefined) { - output.object = object; - } - - // Extract resolved model/provider from gateway routing if available - const gatewayInfo = extractGatewayRoutingInfo(output); - const resolvedMetadata: Record = {}; - if (gatewayInfo?.provider) { - resolvedMetadata.provider = gatewayInfo.provider; - } - if (gatewayInfo?.model) { - resolvedMetadata.model = gatewayInfo.model; - } - if (chunk.finishReason !== undefined) { - resolvedMetadata.finish_reason = chunk.finishReason; - } - - span.log({ - output: await processOutput(output), - metrics: extractTokenMetrics(output as AISDKResult), - ...(Object.keys(resolvedMetadata).length > 0 - ? { metadata: resolvedMetadata } - : {}), - }); - span.end(); - break; - } - controller.enqueue(chunk); - }, - }); - - return { - ...result, - stream: result.stream.pipeThrough(transformStream), - }; - }; - - return new Proxy(resolvedModel, { - get(target, prop, receiver) { - if (prop === "_braintrustWrapped") { - return true; - } - if (prop === "doGenerate") { - return wrappedDoGenerate; - } - if (prop === "doStream" && originalDoStream) { - return wrappedDoStream; - } - return Reflect.get(target, prop, receiver); - }, - }); -}; - const wrapGenerateText = ( generateText: AISDKGenerateFunction, options: WrapAISDKOptions = {}, aiSDK?: AISDK, ) => { - return makeGenerateTextWrapper("generateText", options, generateText, aiSDK); -}; - -/** - * Wraps the model and returns a metrics extractor for the parent span. - * When the model is wrapped, child doGenerate/doStream spans already carry - * token/cost metrics, so the extractor returns undefined to prevent - * double-counting on the parent. - */ -const wrapModelAndGetMetrics = ( - model: AISDKModel | undefined, - aiSDK?: AISDK, -): { - wrappedModel: AISDKModel | undefined; - getMetrics: (result: AISDKResult) => Record | undefined; -} => { - const wrappedModel = wrapModel(model, aiSDK); - const modelIsWrapped = - typeof wrappedModel === "object" && - wrappedModel !== null && - wrappedModel._braintrustWrapped === true; - return { - wrappedModel, - getMetrics: (result: AISDKResult) => - modelIsWrapped ? undefined : extractTokenMetrics(result), - }; + return makeGenerateTextWrapper( + aiSDKChannels.generateText, + "generateText", + generateText, + { aiSDK }, + options, + ); }; const wrapGenerateObject = ( @@ -621,248 +291,44 @@ const wrapGenerateObject = ( options: WrapAISDKOptions = {}, aiSDK?: AISDK, ) => { - return async function generateObjectWrapper( - allParams: AISDKCallParams & SpanInfo, - ) { - // Extract span_info from params (used by Braintrust-managed prompts) - const { span_info, ...params } = allParams; - const { - metadata: spanInfoMetadata, - name: spanName, - spanAttributes: spanInfoAttrs, - } = span_info ?? {}; - - const { model, provider } = serializeModelWithProvider(params.model); - const serializedTools = serializeAISDKToolsForLogging(params.tools); - - // Process input attachments (including async Output.object resolution for v6) - const processedInput = await processInputAttachments(params); - - return traced( - async (span) => { - const { wrappedModel, getMetrics } = wrapModelAndGetMetrics( - params.model, - aiSDK, - ); - - const result = await generateObject({ - ...params, - model: wrappedModel, - tools: wrapTools(params.tools), - }); - - const output = await processOutput(result, options.denyOutputPaths); - - // Extract resolved model/provider from gateway routing if available - const gatewayInfo = extractGatewayRoutingInfo(result); - const resolvedMetadata: Record = {}; - if (gatewayInfo?.provider) { - resolvedMetadata.provider = gatewayInfo.provider; - } - if (gatewayInfo?.model) { - resolvedMetadata.model = gatewayInfo.model; - } - - span.log({ - output, - metrics: getMetrics(result), - ...(Object.keys(resolvedMetadata).length > 0 - ? { metadata: resolvedMetadata } - : {}), - }); - - return result; - }, - { - name: spanName || "generateObject", - spanAttributes: { - type: SpanTypeAttribute.LLM, - ...spanInfoAttrs, - }, - event: { - input: processedInput, - metadata: { - ...spanInfoMetadata, - model, - ...(provider ? { provider } : {}), - ...(serializedTools ? { tools: serializedTools } : {}), - braintrust: { - integration_name: "ai-sdk", - sdk_language: "typescript", - }, - }, - }, - }, - ); - }; + return makeGenerateTextWrapper( + aiSDKChannels.generateObject, + "generateObject", + generateObject, + { aiSDK }, + options, + ); }; -const makeStreamTextWrapper = ( +const makeStreamWrapper = ( + channel: + | typeof aiSDKChannels.streamTextSync + | typeof aiSDKChannels.streamObjectSync, name: string, - options: WrapAISDKOptions, streamText: AISDKStreamFunction, - aiSDK?: AISDK, - spanType: SpanTypeAttribute = SpanTypeAttribute.LLM, + contextOptions: { + aiSDK?: AISDK; + self?: unknown; + spanType?: SpanTypeAttribute; + } = {}, + options: WrapAISDKOptions = {}, ) => { - // Note: streamText returns a sync result (stream object), so we cannot make this async - // For v6, Output.object responseFormat is a Promise - we handle this by: - // 1. Processing input synchronously (v5 works, v6 gets placeholder) - // 2. Updating the span with resolved schema when Promise completes const wrapper = function (allParams: AISDKCallParams & SpanInfo) { - // Extract span_info from params (used by Braintrust-managed prompts) const { span_info, ...params } = allParams; - const { - metadata: spanInfoMetadata, - name: spanName, - spanAttributes: spanInfoAttrs, - } = span_info ?? {}; - - const { model, provider } = serializeModelWithProvider(params.model); - const serializedTools = serializeAISDKToolsForLogging(params.tools); - - // Process input attachments synchronously - // v5: responseFormat is a plain object - captured fully - // v6: responseFormat is a Promise - captured as placeholder, then updated when resolved - const { input: processedInput, outputPromise } = - processInputAttachmentsSync(params); - - const span = startSpan({ - name: spanName || name, - spanAttributes: { - type: spanType, - ...spanInfoAttrs, - }, - event: { - input: processedInput, - metadata: { - ...spanInfoMetadata, - model, - ...(provider ? { provider } : {}), - ...(serializedTools ? { tools: serializedTools } : {}), - braintrust: { - integration_name: "ai-sdk", - sdk_language: "typescript", - }, - }, - }, - }); - - // v6: Update input with resolved output schema when Promise completes - if (outputPromise) { - outputPromise - .then((resolvedData) => { - span.log({ input: { ...processedInput, ...resolvedData } }); - }) - .catch(() => { - // Silently ignore resolution errors - placeholder will be used - }); - } - - try { - const startTime = Date.now(); - let receivedFirst = false; - const { wrappedModel, getMetrics } = wrapModelAndGetMetrics( - params.model, - aiSDK, - ); - - const result = withCurrent(span, () => - streamText({ - ...params, - model: wrappedModel, - tools: wrapTools(params.tools), - onChunk: (chunk: AISDKModelStreamChunk) => { - if (!receivedFirst) { - receivedFirst = true; - span.log({ - metrics: { - time_to_first_token: (Date.now() - startTime) / 1000, - }, - }); - } - - params.onChunk?.(chunk); - }, - onFinish: async (event: AISDKResult) => { - params.onFinish?.(event); - - // Extract resolved model/provider from gateway routing if available - const gatewayInfo = extractGatewayRoutingInfo(event); - const resolvedMetadata: Record = {}; - if (gatewayInfo?.provider) { - resolvedMetadata.provider = gatewayInfo.provider; - } - if (gatewayInfo?.model) { - resolvedMetadata.model = gatewayInfo.model; - } - - span.log({ - output: await processOutput(event, options.denyOutputPaths), - metrics: getMetrics(event), - ...(Object.keys(resolvedMetadata).length > 0 - ? { metadata: resolvedMetadata } - : {}), - }); - - span.end(); - }, - onError: async (err: unknown) => { - params.onError?.(err); - - span.log({ - error: serializeError(err), - }); - - span.end(); - }, + const tracedParams = { ...params }; + + return channel.traceSync( + () => streamText(tracedParams), + createAISDKChannelContext(tracedParams, { + aiSDK: contextOptions.aiSDK, + denyOutputPaths: options.denyOutputPaths, + self: contextOptions.self, + span_info: mergeSpanInfo(span_info, { + name, + spanType: contextOptions.spanType, }), - ); - - // Use stream tee to track first token regardless of consumption method - const trackFirstToken = () => { - if (!receivedFirst) { - receivedFirst = true; - span.log({ - metrics: { - time_to_first_token: (Date.now() - startTime) / 1000, - }, - }); - } - }; - - if (result && result.baseStream) { - const [stream1, stream2] = result.baseStream.tee(); - result.baseStream = stream2; - - stream1 - .pipeThrough( - new TransformStream({ - transform(chunk, controller) { - trackFirstToken(); - controller.enqueue(chunk); - }, - }), - ) - .pipeTo( - new WritableStream({ - write() { - // Discard chunks - we only care about the side effect - }, - }), - ) - .catch(() => { - // Silently ignore errors from the tracking stream - }); - } - - return result; - } catch (error) { - span.log({ - error: serializeError(error), - }); - span.end(); - throw error; - } + }), + ); }; Object.defineProperty(wrapper, "name", { value: name, writable: false }); return wrapper; @@ -873,7 +339,13 @@ const wrapStreamText = ( options: WrapAISDKOptions = {}, aiSDK?: AISDK, ) => { - return makeStreamTextWrapper("streamText", options, streamText, aiSDK); + return makeStreamWrapper( + aiSDKChannels.streamTextSync, + "streamText", + streamText, + { aiSDK }, + options, + ); }; const wrapStreamObject = ( @@ -881,340 +353,65 @@ const wrapStreamObject = ( options: WrapAISDKOptions = {}, aiSDK?: AISDK, ) => { - // Note: streamObject returns a sync result (stream object), so we cannot make this async - // For v6, Output.object responseFormat is a Promise - we handle this by: - // 1. Processing input synchronously (v5 works, v6 gets placeholder) - // 2. Updating the span with resolved schema when Promise completes - return function streamObjectWrapper(allParams: AISDKCallParams & SpanInfo) { - // Extract span_info from params (used by Braintrust-managed prompts) - const { span_info, ...params } = allParams; - const { - metadata: spanInfoMetadata, - name: spanName, - spanAttributes: spanInfoAttrs, - } = span_info ?? {}; - - const { model, provider } = serializeModelWithProvider(params.model); - const serializedTools = serializeAISDKToolsForLogging(params.tools); - - // Process input attachments synchronously - // v5: responseFormat is a plain object - captured fully - // v6: responseFormat is a Promise - captured as placeholder, then updated when resolved - const { input: processedInput, outputPromise } = - processInputAttachmentsSync(params); - - const span = startSpan({ - name: spanName || "streamObject", - spanAttributes: { - type: SpanTypeAttribute.LLM, - ...spanInfoAttrs, - }, - event: { - input: processedInput, - metadata: { - ...spanInfoMetadata, - model, - ...(provider ? { provider } : {}), - ...(serializedTools ? { tools: serializedTools } : {}), - braintrust: { - integration_name: "ai-sdk", - sdk_language: "typescript", - }, - }, - }, - }); - - // v6: Update input with resolved output schema when Promise completes - if (outputPromise) { - outputPromise - .then((resolvedData) => { - span.log({ input: { ...processedInput, ...resolvedData } }); - }) - .catch(() => { - // Silently ignore resolution errors - placeholder will be used - }); - } - - try { - const startTime = Date.now(); - let receivedFirst = false; - const { wrappedModel, getMetrics } = wrapModelAndGetMetrics( - params.model, - aiSDK, - ); - - const result = withCurrent(span, () => - streamObject({ - ...params, - model: wrappedModel, - tools: wrapTools(params.tools), - onChunk: (chunk: AISDKModelStreamChunk) => { - if (!receivedFirst) { - receivedFirst = true; - span.log({ - metrics: { - time_to_first_token: (Date.now() - startTime) / 1000, - }, - }); - } - params.onChunk?.(chunk); - }, - onFinish: async (event: AISDKResult) => { - params.onFinish?.(event); - - // Extract resolved model/provider from gateway routing if available - const gatewayInfo = extractGatewayRoutingInfo(event); - const resolvedMetadata: Record = {}; - if (gatewayInfo?.provider) { - resolvedMetadata.provider = gatewayInfo.provider; - } - if (gatewayInfo?.model) { - resolvedMetadata.model = gatewayInfo.model; - } - - span.log({ - output: await processOutput(event, options.denyOutputPaths), - metrics: getMetrics(event), - ...(Object.keys(resolvedMetadata).length > 0 - ? { metadata: resolvedMetadata } - : {}), - }); - - span.end(); - }, - onError: async (err: unknown) => { - params.onError?.(err); - - span.log({ - error: serializeError(err), - }); - - span.end(); - }, - }), - ); - - // Use stream tee to track first token regardless of consumption method - const trackFirstToken = () => { - if (!receivedFirst) { - receivedFirst = true; - span.log({ - metrics: { - time_to_first_token: (Date.now() - startTime) / 1000, - }, - }); - } - }; - - if (result && result.baseStream) { - const [stream1, stream2] = result.baseStream.tee(); - result.baseStream = stream2; - - stream1 - .pipeThrough( - new TransformStream({ - transform(chunk, controller) { - trackFirstToken(); - controller.enqueue(chunk); - }, - }), - ) - .pipeTo( - new WritableStream({ - write() { - // Discard chunks - we only care about the side effect - }, - }), - ) - .catch(() => { - // Silently ignore errors from the tracking stream - }); - } - - return result; - } catch (error) { - span.log({ - error: serializeError(error), - }); - span.end(); - throw error; - } - }; -}; - -/** - * Wraps AI SDK tools with tracing support - * - * Supports all AI SDK versions (v3-v6): - * - Tools created with ai.tool() or tool() helper (have execute function) - * - Raw tool definitions with parameters only (v3-v4) - * - RSC tools with render function (v3-v4) - * - * Tools with execute are wrapped with tracing; others are passed through as-is. - */ -const wrapTools = (tools: AISDKTools | undefined) => { - if (!tools) return tools; - - const inferName = (tool: AISDKTool, fallback: string) => - (tool && (tool.name || tool.toolName || tool.id)) || fallback; - - if (Array.isArray(tools)) { - return tools.map((tool, idx) => { - const name = inferName(tool, `tool[${idx}]`); - return wrapToolExecute(tool, name); - }); - } - - const wrappedTools: Record = {}; - for (const [key, tool] of Object.entries(tools)) { - wrappedTools[key] = wrapToolExecute(tool, key); - } - return wrappedTools; -}; - -/** - * Checks if a value is an AsyncGenerator. - * AsyncGenerators are returned by async generator functions (async function* () {}) - * and must be iterated to consume their yielded values. - */ -const isAsyncGenerator = (value: any): value is AsyncGenerator => { - return ( - value != null && - typeof value === "object" && - typeof value[Symbol.asyncIterator] === "function" && - typeof value.next === "function" && - typeof value.return === "function" && - typeof value.throw === "function" + return makeStreamWrapper( + aiSDKChannels.streamObjectSync, + "streamObject", + streamObject, + { aiSDK }, + options, ); }; -const wrapToolExecute = (tool: AISDKTool, name: string): AISDKTool => { - // Only wrap tools that have an execute function (created with tool() helper) - // AI SDK v3-v6: tool({ description, inputSchema/parameters, execute }) +function mergeSpanInfo( + spanInfo: SpanInfo["span_info"] | undefined, + defaults: { + name?: string; + spanType?: SpanTypeAttribute; + }, +): SpanInfo["span_info"] | undefined { if ( - tool != null && - typeof tool === "object" && - "execute" in tool && - typeof tool.execute === "function" + defaults.name === undefined && + defaults.spanType === undefined && + spanInfo === undefined ) { - // Use Proxy with full transparency to wrap execute without breaking Zod schemas - // The Proxy must implement all traps to be fully transparent for object iteration - const originalExecute = tool.execute; - return new Proxy(tool, { - get(target, prop) { - if (prop === "execute") { - // Return a function that handles both regular async functions and async generators - const wrappedExecute = (...args: unknown[]) => { - const result = originalExecute.apply(target, args); - - // Check if the result is an async generator (from async function* () {}) - // AI SDK v5 supports async generator tools that yield intermediate status updates - if (isAsyncGenerator(result)) { - // Return a wrapper async generator that: - // 1. Iterates through the original generator - // 2. Yields all intermediate values (so consumers see status updates) - // 3. Tracks and logs the final yielded value as the tool output - return (async function* () { - const span = startSpan({ - name, - spanAttributes: { - type: SpanTypeAttribute.TOOL, - }, - }); - span.log({ input: args.length === 1 ? args[0] : args }); - - try { - let lastValue: unknown; - for await (const value of result) { - lastValue = value; - yield value; - } - // Log the final yielded value as the output - span.log({ output: lastValue }); - } catch (error) { - span.log({ error: serializeError(error) }); - throw error; - } finally { - span.end(); - } - })(); - } - - // For regular async functions, use traced as before - return traced( - async (span) => { - span.log({ input: args.length === 1 ? args[0] : args }); - const awaitedResult = await result; - span.log({ output: awaitedResult }); - return awaitedResult; - }, - { - name, - spanAttributes: { - type: SpanTypeAttribute.TOOL, - }, - }, - ); - }; - return wrappedExecute; - } - return target[prop]; - }, - // Implement additional traps for full transparency - has(target, prop) { - return prop in target; - }, - ownKeys(target) { - return Reflect.ownKeys(target); - }, - getOwnPropertyDescriptor(target, prop) { - return Object.getOwnPropertyDescriptor(target, prop); - }, - set(target, prop, value) { - target[prop] = value; - return true; - }, - deleteProperty(target, prop) { - delete target[prop]; - return true; - }, - defineProperty(target, prop, descriptor) { - Object.defineProperty(target, prop, descriptor); - return true; - }, - getPrototypeOf(target) { - return Object.getPrototypeOf(target); - }, - setPrototypeOf(target, proto) { - Object.setPrototypeOf(target, proto); - return true; - }, - isExtensible(target) { - return Object.isExtensible(target); - }, - preventExtensions(target) { - Object.preventExtensions(target); - return true; - }, - }); - } - // Pass through tools without execute (e.g., RSC tools with only render, raw definitions) - return tool; -}; - -const serializeError = (error: unknown) => { - if (error instanceof Error) { - return error; + return undefined; } - if (typeof error === "object" && error !== null) { - try { - return JSON.stringify(error); - } catch {} - } + return { + ...spanInfo, + ...(spanInfo?.name ? {} : defaults.name ? { name: defaults.name } : {}), + ...(defaults.spanType !== undefined || spanInfo?.spanAttributes + ? { + spanAttributes: { + ...(defaults.spanType !== undefined + ? { type: defaults.spanType } + : {}), + ...(spanInfo?.spanAttributes ?? {}), + }, + } + : {}), + }; +} - return String(error); -}; +function createAISDKChannelContext( + params: AISDKCallParams, + context: { + aiSDK?: AISDK; + denyOutputPaths?: string[]; + self?: unknown; + span_info?: SpanInfo["span_info"]; + } = {}, +) { + return { + arguments: [params] as [AISDKCallParams], + ...(context.aiSDK ? { aiSDK: context.aiSDK } : {}), + ...(context.denyOutputPaths + ? { denyOutputPaths: context.denyOutputPaths } + : {}), + ...(context.self !== undefined ? { self: context.self } : {}), + ...(context.span_info ? { span_info: context.span_info } : {}), + }; +} /** * Parses a gateway model string like "openai/gpt-5-mini" into provider and model. @@ -1243,7 +440,7 @@ function parseGatewayModelString(modelString: string): { * * @param model - Either a model object (with modelId and optional provider) or a model string */ -function serializeModelWithProvider(model: AISDKModel | undefined): { +export function serializeModelWithProvider(model: AISDKModel | undefined): { model: string | undefined; provider?: string; } { @@ -1267,7 +464,7 @@ function serializeModelWithProvider(model: AISDKModel | undefined): { * Extracts gateway routing info from the result's providerMetadata. * This provides the actual resolved provider and model used by the gateway. */ -function extractGatewayRoutingInfo(result: AISDKResult): { +export function extractGatewayRoutingInfo(result: AISDKResult): { model?: string; provider?: string; } | null { @@ -1460,7 +657,7 @@ const serializeOutputObject = ( * Result from sync input processing. * For v6, includes a Promise to resolve the async output schema. */ -interface ProcessInputSyncResult { +export interface ProcessInputSyncResult { input: AISDKCallParams; // v6: Promise that resolves to { output: { response_format: {...} } } when available outputPromise?: Promise<{ @@ -1475,7 +672,7 @@ interface ProcessInputSyncResult { * For v5: responseFormat is a plain object - captured fully * For v6: responseFormat is a Promise - returns initial input + Promise for update */ -const processInputAttachmentsSync = ( +export const processInputAttachmentsSync = ( input: AISDKCallParams, ): ProcessInputSyncResult => { if (!input) return { input }; @@ -1561,7 +758,7 @@ const processInputAttachmentsSync = ( * For v5: responseFormat is a plain object - captured fully * For v6: responseFormat is a Promise - awaited and captured fully */ -const processInputAttachments = async ( +export const processInputAttachments = async ( input: AISDKCallParams, ): Promise => { if (!input) return input; @@ -1875,7 +1072,7 @@ const extractGetterValues = ( return getterValues; }; -const processOutput = async ( +export const processOutput = async ( output: AISDKResult, denyOutputPaths?: string[], ) => { From 99a6525b3f755e8ad88e8315768c4ecf9dd2c57d Mon Sep 17 00:00:00 2001 From: Luca Forstner Date: Thu, 26 Mar 2026 17:13:47 +0100 Subject: [PATCH 2/2] fix --- .../instrumentation/core/channel-tracing.ts | 5 +- .../instrumentation/plugins/ai-sdk-plugin.ts | 261 +++++++++++++++--- js/src/wrappers/ai-sdk/ai-sdk.ts | 53 ++-- 3 files changed, 255 insertions(+), 64 deletions(-) diff --git a/js/src/instrumentation/core/channel-tracing.ts b/js/src/instrumentation/core/channel-tracing.ts index 3b2aec4b..e34a9660 100644 --- a/js/src/instrumentation/core/channel-tracing.ts +++ b/js/src/instrumentation/core/channel-tracing.ts @@ -568,12 +568,13 @@ export function traceSyncStreamChannel( const { span, startTime } = spanData; const endEvent = event as EndOf; + const result = endEvent.result; if ( config.patchResult?.({ channelName, endEvent, - result: endEvent.result, + result, span, startTime, }) @@ -581,7 +582,7 @@ export function traceSyncStreamChannel( return; } - const stream = endEvent.result; + const stream = result; if (!isSyncStreamLike>(stream)) { span.end(); diff --git a/js/src/instrumentation/plugins/ai-sdk-plugin.ts b/js/src/instrumentation/plugins/ai-sdk-plugin.ts index cdc30888..ddcaaad6 100644 --- a/js/src/instrumentation/plugins/ai-sdk-plugin.ts +++ b/js/src/instrumentation/plugins/ai-sdk-plugin.ts @@ -463,7 +463,9 @@ function prepareAISDKChildTracing( const patchedTools = new WeakSet(); let modelWrapped = false; - const patchModel = (model: AISDKModel | undefined): void => { + const patchModel = ( + model: AISDKModel | undefined, + ): AISDKModel | undefined => { const resolvedModel = resolveAISDKModel(model, aiSDK); if ( !resolvedModel || @@ -472,7 +474,7 @@ function prepareAISDKChildTracing( patchedModels.has(resolvedModel) || (resolvedModel as { [AUTO_PATCHED_MODEL]?: boolean })[AUTO_PATCHED_MODEL] ) { - return; + return resolvedModel; } patchedModels.add(resolvedModel); @@ -534,6 +536,8 @@ function prepareAISDKChildTracing( const result = await withCurrent(span, () => Reflect.apply(originalDoStream, resolvedModel, [options]), ); + const streamStartTime = getCurrentUnixTimestamp(); + let firstChunkTime: number | undefined; const output: Record = {}; let text = ""; let reasoning = ""; @@ -542,6 +546,10 @@ function prepareAISDKChildTracing( const transformStream = new TransformStream({ transform(chunk: AISDKModelStreamChunk, controller) { + if (firstChunkTime === undefined) { + firstChunkTime = getCurrentUnixTimestamp(); + } + switch (chunk.type) { case "text-delta": text += extractTextDelta(chunk); @@ -589,12 +597,20 @@ function prepareAISDKChildTracing( output.object = object; } + const metrics = extractTokenMetrics(output as AISDKResult); + if (firstChunkTime !== undefined) { + metrics.time_to_first_token = Math.max( + firstChunkTime - streamStartTime, + 1e-6, + ); + } + span.log({ output: processAISDKOutput( output as AISDKResult, denyOutputPaths, ), - metrics: extractTokenMetrics(output as AISDKResult), + metrics, ...buildResolvedMetadataPayload(output as AISDKResult), }); span.end(); @@ -620,6 +636,8 @@ function prepareAISDKChildTracing( AUTO_PATCHED_MODEL ]; }); + + return resolvedModel; }; const patchTool = (tool: AISDKTool, name: string): void => { @@ -711,7 +729,14 @@ function prepareAISDKChildTracing( }; if (params && typeof params === "object") { - patchModel(params.model); + const patchedParamModel = patchModel(params.model); + if ( + typeof params.model === "string" && + patchedParamModel && + typeof patchedParamModel === "object" + ) { + params.model = patchedParamModel; + } patchTools(params.tools); } @@ -722,12 +747,26 @@ function prepareAISDKChildTracing( }; if (selfRecord.model !== undefined) { - patchModel(selfRecord.model); + const patchedSelfModel = patchModel(selfRecord.model); + if ( + typeof selfRecord.model === "string" && + patchedSelfModel && + typeof patchedSelfModel === "object" + ) { + selfRecord.model = patchedSelfModel; + } } if (selfRecord.settings && typeof selfRecord.settings === "object") { if (selfRecord.settings.model !== undefined) { - patchModel(selfRecord.settings.model); + const patchedSettingsModel = patchModel(selfRecord.settings.model); + if ( + typeof selfRecord.settings.model === "string" && + patchedSettingsModel && + typeof patchedSettingsModel === "object" + ) { + selfRecord.settings.model = patchedSettingsModel; + } } if (selfRecord.settings.tools !== undefined) { patchTools(selfRecord.settings.tools); @@ -770,51 +809,107 @@ function patchAISDKStreamingResult(args: { } const resultRecord = result as Record; - if (!isReadableStreamLike(resultRecord.baseStream)) { + if (isReadableStreamLike(resultRecord.baseStream)) { + let firstChunkTime: number | undefined; + + const wrappedBaseStream = resultRecord.baseStream.pipeThrough( + new TransformStream({ + transform(chunk, controller) { + if (firstChunkTime === undefined) { + firstChunkTime = getCurrentUnixTimestamp(); + } + controller.enqueue(chunk); + }, + async flush() { + const metrics = extractTopLevelAISDKMetrics(result, endEvent); + if ( + metrics.time_to_first_token === undefined && + firstChunkTime !== undefined + ) { + metrics.time_to_first_token = firstChunkTime - startTime; + } + + const output = await processAISDKStreamingOutput( + result, + resolveDenyOutputPaths(endEvent, defaultDenyOutputPaths), + ); + const metadata = buildResolvedMetadataPayload(result).metadata; + + span.log({ + output, + ...(metadata ? { metadata } : {}), + metrics, + }); + + finalizeAISDKChildTracing(endEvent); + span.end(); + }, + }), + ); + + Object.defineProperty(resultRecord, "baseStream", { + configurable: true, + enumerable: true, + value: wrappedBaseStream, + writable: true, + }); + + return true; + } + + const streamField = findAsyncIterableField(resultRecord, [ + "partialObjectStream", + "textStream", + "fullStream", + "stream", + ]); + if (!streamField) { return false; } let firstChunkTime: number | undefined; + const wrappedStream = createPatchedAsyncIterable(streamField.stream, { + onChunk: () => { + if (firstChunkTime === undefined) { + firstChunkTime = getCurrentUnixTimestamp(); + } + }, + onComplete: async () => { + const metrics = extractTopLevelAISDKMetrics(result, endEvent); + if ( + metrics.time_to_first_token === undefined && + firstChunkTime !== undefined + ) { + metrics.time_to_first_token = firstChunkTime - startTime; + } - const wrappedBaseStream = resultRecord.baseStream.pipeThrough( - new TransformStream({ - transform(chunk, controller) { - if (firstChunkTime === undefined) { - firstChunkTime = getCurrentUnixTimestamp(); - } - controller.enqueue(chunk); - }, - async flush() { - const metrics = extractTopLevelAISDKMetrics(result, endEvent); - if ( - metrics.time_to_first_token === undefined && - firstChunkTime !== undefined - ) { - metrics.time_to_first_token = firstChunkTime - startTime; - } - - const output = await processAISDKStreamingOutput( - result, - resolveDenyOutputPaths(endEvent, defaultDenyOutputPaths), - ); - const metadata = buildResolvedMetadataPayload(result).metadata; - - span.log({ - output, - ...(metadata ? { metadata } : {}), - metrics, - }); + const output = await processAISDKStreamingOutput( + result, + resolveDenyOutputPaths(endEvent, defaultDenyOutputPaths), + ); + const metadata = buildResolvedMetadataPayload(result).metadata; - finalizeAISDKChildTracing(endEvent); - span.end(); - }, - }), - ); + span.log({ + output, + ...(metadata ? { metadata } : {}), + metrics, + }); + finalizeAISDKChildTracing(endEvent); + span.end(); + }, + onError: (error) => { + span.log({ + error: error.message, + }); + finalizeAISDKChildTracing(endEvent); + span.end(); + }, + }); - Object.defineProperty(resultRecord, "baseStream", { + Object.defineProperty(resultRecord, streamField.field, { configurable: true, enumerable: true, - value: wrappedBaseStream, + value: wrappedStream, writable: true, }); @@ -831,6 +926,60 @@ function isReadableStreamLike(value: unknown): value is { ); } +function isAsyncIterableLike(value: unknown): value is AsyncIterable { + return ( + value != null && + typeof value === "object" && + typeof (value as { [Symbol.asyncIterator]?: unknown })[ + Symbol.asyncIterator + ] === "function" + ); +} + +function findAsyncIterableField( + result: Record, + candidateFields: string[], +): { field: string; stream: AsyncIterable } | null { + for (const field of candidateFields) { + try { + const stream = result[field]; + if (isAsyncIterableLike(stream)) { + return { field, stream }; + } + } catch { + // Ignore getter failures. + } + } + + return null; +} + +function createPatchedAsyncIterable( + stream: AsyncIterable, + hooks: { + onChunk: (chunk: unknown) => void; + onComplete: () => Promise; + onError: (error: Error) => void; + }, +): AsyncIterable { + return { + async *[Symbol.asyncIterator]() { + try { + for await (const chunk of stream) { + hooks.onChunk(chunk); + yield chunk; + } + await hooks.onComplete(); + } catch (error) { + hooks.onError( + error instanceof Error ? error : new Error(String(error)), + ); + throw error; + } + }, + }; +} + async function processAISDKStreamingOutput( result: AISDKResult, denyOutputPaths: string[], @@ -844,8 +993,11 @@ async function processAISDKStreamingOutput( const outputRecord = output as Record; try { - if ("text" in result && typeof result.text === "string") { - outputRecord.text = result.text; + if ("text" in result) { + const resolvedText = await Promise.resolve(result.text); + if (typeof resolvedText === "string") { + outputRecord.text = resolvedText; + } } } catch { // Ignore getter failures @@ -862,6 +1014,17 @@ async function processAISDKStreamingOutput( // Ignore getter/promise failures } + try { + if ("finishReason" in result) { + const resolvedFinishReason = await Promise.resolve(result.finishReason); + if (resolvedFinishReason !== undefined) { + outputRecord.finishReason = resolvedFinishReason; + } + } + } catch { + // Ignore getter/promise failures + } + return outputRecord; } @@ -892,7 +1055,14 @@ function buildResolvedMetadataPayload(result: AISDKResult): { if (gatewayInfo?.model) { metadata.model = gatewayInfo.model; } - if (result.finishReason !== undefined) { + if ( + result.finishReason !== undefined && + !( + result.finishReason && + typeof result.finishReason === "object" && + typeof (result.finishReason as { then?: unknown }).then === "function" + ) + ) { metadata.finish_reason = result.finishReason; } @@ -1080,6 +1250,7 @@ function extractGetterValues( const getterValues: Record = {}; const getterNames = [ + "content", "text", "object", "finishReason", diff --git a/js/src/wrappers/ai-sdk/ai-sdk.ts b/js/src/wrappers/ai-sdk/ai-sdk.ts index c5cbd1b7..2dc20060 100644 --- a/js/src/wrappers/ai-sdk/ai-sdk.ts +++ b/js/src/wrappers/ai-sdk/ai-sdk.ts @@ -227,6 +227,7 @@ const wrapAgentStream = ( const defaultName = `${instance.constructor.name}.stream`; return (params: AISDKCallParams & SpanInfo) => makeStreamWrapper( + aiSDKChannels.agentStream, aiSDKChannels.streamTextSync, defaultName, stream.bind(instance), @@ -301,7 +302,12 @@ const wrapGenerateObject = ( }; const makeStreamWrapper = ( - channel: + asyncChannel: + | typeof aiSDKChannels.streamText + | typeof aiSDKChannels.streamObject + | typeof aiSDKChannels.agentStream + | typeof aiSDKChannels.toolLoopAgentStream, + syncChannel: | typeof aiSDKChannels.streamTextSync | typeof aiSDKChannels.streamObjectSync, name: string, @@ -313,22 +319,29 @@ const makeStreamWrapper = ( } = {}, options: WrapAISDKOptions = {}, ) => { + const useAsyncChannel = isAsyncFunction(streamText); + const wrapper = function (allParams: AISDKCallParams & SpanInfo) { const { span_info, ...params } = allParams; const tracedParams = { ...params }; - - return channel.traceSync( - () => streamText(tracedParams), - createAISDKChannelContext(tracedParams, { - aiSDK: contextOptions.aiSDK, - denyOutputPaths: options.denyOutputPaths, - self: contextOptions.self, - span_info: mergeSpanInfo(span_info, { - name, - spanType: contextOptions.spanType, - }), + const context = createAISDKChannelContext(tracedParams, { + aiSDK: contextOptions.aiSDK, + denyOutputPaths: options.denyOutputPaths, + self: contextOptions.self, + span_info: mergeSpanInfo(span_info, { + name, + spanType: contextOptions.spanType, }), - ); + }); + + if (useAsyncChannel) { + return asyncChannel.tracePromise( + () => Promise.resolve(streamText(tracedParams)), + context, + ); + } + + return syncChannel.traceSync(() => streamText(tracedParams), context); }; Object.defineProperty(wrapper, "name", { value: name, writable: false }); return wrapper; @@ -340,6 +353,7 @@ const wrapStreamText = ( aiSDK?: AISDK, ) => { return makeStreamWrapper( + aiSDKChannels.streamText, aiSDKChannels.streamTextSync, "streamText", streamText, @@ -354,6 +368,7 @@ const wrapStreamObject = ( aiSDK?: AISDK, ) => { return makeStreamWrapper( + aiSDKChannels.streamObject, aiSDKChannels.streamObjectSync, "streamObject", streamObject, @@ -393,6 +408,10 @@ function mergeSpanInfo( }; } +function isAsyncFunction(fn: unknown): boolean { + return typeof fn === "function" && fn.constructor?.name === "AsyncFunction"; +} + function createAISDKChannelContext( params: AISDKCallParams, context: { @@ -440,7 +459,7 @@ function parseGatewayModelString(modelString: string): { * * @param model - Either a model object (with modelId and optional provider) or a model string */ -export function serializeModelWithProvider(model: AISDKModel | undefined): { +function serializeModelWithProvider(model: AISDKModel | undefined): { model: string | undefined; provider?: string; } { @@ -464,7 +483,7 @@ export function serializeModelWithProvider(model: AISDKModel | undefined): { * Extracts gateway routing info from the result's providerMetadata. * This provides the actual resolved provider and model used by the gateway. */ -export function extractGatewayRoutingInfo(result: AISDKResult): { +function extractGatewayRoutingInfo(result: AISDKResult): { model?: string; provider?: string; } | null { @@ -758,7 +777,7 @@ export const processInputAttachmentsSync = ( * For v5: responseFormat is a plain object - captured fully * For v6: responseFormat is a Promise - awaited and captured fully */ -export const processInputAttachments = async ( +const processInputAttachments = async ( input: AISDKCallParams, ): Promise => { if (!input) return input; @@ -1072,7 +1091,7 @@ const extractGetterValues = ( return getterValues; }; -export const processOutput = async ( +const processOutput = async ( output: AISDKResult, denyOutputPaths?: string[], ) => {