From f51fe443f5fb0a0029ab6d6cad27aa2a3c9a073e Mon Sep 17 00:00:00 2001 From: Stephen Belanger Date: Fri, 27 Mar 2026 22:32:27 +0800 Subject: [PATCH 1/2] fix(anthropic): preserve all streaming content block types (#1660, #1661, #1621) - Extend `AnthropicOutputContentBlock` with `thinking` and `citations` variants - Add `AnthropicCitation` interface to vendor types - Add `thinking_delta`, `citations_delta`, and `signature_delta` to stream event delta union - Replace flat `string[]` accumulator with structured `{ textDeltas, citations }` per block - Handle `thinking_delta` events (accumulated like text) - Handle `citations_delta` events (accumulated separately and merged into finalized text block) - Make `finalizeContentBlock()` forward-compatible: unknown block types are now preserved as-is instead of being silently deleted, fixing `server_tool_use`, `web_search_tool_result` and any future Anthropic content block types - Keep structured output when any text block carries citations - Add unit tests for all new block types and mixed content scenarios Co-Authored-By: Claude Sonnet 4.6 --- .../plugins/anthropic-plugin.test.ts | 224 ++++++++++++++++++ .../plugins/anthropic-plugin.ts | 91 +++++-- js/src/vendor-sdk-types/anthropic.ts | 14 +- 3 files changed, 304 insertions(+), 25 deletions(-) diff --git a/js/src/instrumentation/plugins/anthropic-plugin.test.ts b/js/src/instrumentation/plugins/anthropic-plugin.test.ts index 889f83d08..35112328e 100644 --- a/js/src/instrumentation/plugins/anthropic-plugin.test.ts +++ b/js/src/instrumentation/plugins/anthropic-plugin.test.ts @@ -471,6 +471,230 @@ describe("aggregateAnthropicStreamChunks", () => { expect(result.output).toBe("Hi"); }); + + it("should capture thinking content blocks from thinking_delta events", () => { + const chunks = [ + { + type: "content_block_start", + index: 0, + content_block: { type: "thinking" }, + }, + { + type: "content_block_delta", + index: 0, + delta: { type: "thinking_delta", thinking: "Let me think..." }, + }, + { + type: "content_block_delta", + index: 0, + delta: { type: "thinking_delta", thinking: " Yes, I understand." }, + }, + { type: "content_block_stop", index: 0 }, + { + type: "content_block_start", + index: 1, + content_block: { type: "text" }, + }, + { + type: "content_block_delta", + index: 1, + delta: { type: "text_delta", text: "The answer is 42." }, + }, + { type: "content_block_stop", index: 1 }, + ]; + + const result = aggregateAnthropicStreamChunks(chunks); + + expect(result.output).toEqual({ + content: [ + { type: "thinking", thinking: "Let me think... Yes, I understand." }, + { type: "text", text: "The answer is 42." }, + ], + }); + }); + + it("should capture citations from citations_delta events and attach to text blocks", () => { + const citation1 = { + type: "char_location", + cited_text: "source text", + document_index: 0, + start_char_index: 0, + end_char_index: 11, + }; + const chunks = [ + { + type: "content_block_start", + index: 0, + content_block: { type: "text" }, + }, + { + type: "content_block_delta", + index: 0, + delta: { type: "text_delta", text: "According to the document." }, + }, + { + type: "content_block_delta", + index: 0, + delta: { type: "citations_delta", citation: citation1 }, + }, + { type: "content_block_stop", index: 0 }, + ]; + + const result = aggregateAnthropicStreamChunks(chunks); + + expect(result.output).toEqual({ + content: [ + { + type: "text", + text: "According to the document.", + citations: [citation1], + }, + ], + }); + }); + + it("should preserve server_tool_use blocks without deltas", () => { + const chunks = [ + { + type: "content_block_start", + index: 0, + content_block: { + type: "server_tool_use", + id: "srvtoolu_abc123", + name: "web_search", + input: { query: "braintrust" }, + }, + }, + { type: "content_block_stop", index: 0 }, + ]; + + const result = aggregateAnthropicStreamChunks(chunks); + + expect(result.output).toEqual({ + content: [ + { + type: "server_tool_use", + id: "srvtoolu_abc123", + name: "web_search", + input: { query: "braintrust" }, + }, + ], + }); + }); + + it("should preserve web_search_tool_result blocks without deltas", () => { + const chunks = [ + { + type: "content_block_start", + index: 0, + content_block: { + type: "web_search_tool_result", + tool_use_id: "srvtoolu_abc123", + content: [ + { + type: "web_search_result", + url: "https://example.com", + title: "Example", + }, + ], + }, + }, + { type: "content_block_stop", index: 0 }, + ]; + + const result = aggregateAnthropicStreamChunks(chunks); + + expect(result.output).toEqual({ + content: [ + { + type: "web_search_tool_result", + tool_use_id: "srvtoolu_abc123", + content: [ + { + type: "web_search_result", + url: "https://example.com", + title: "Example", + }, + ], + }, + ], + }); + }); + + it("should preserve unknown future content block types", () => { + const chunks = [ + { + type: "content_block_start", + index: 0, + content_block: { type: "some_future_type", data: "value" }, + }, + { type: "content_block_stop", index: 0 }, + ]; + + const result = aggregateAnthropicStreamChunks(chunks); + + expect(result.output).toEqual({ + content: [{ type: "some_future_type", data: "value" }], + }); + }); + + it("should handle mixed content: thinking + text + tool_use", () => { + const chunks = [ + { + type: "content_block_start", + index: 0, + content_block: { type: "thinking" }, + }, + { + type: "content_block_delta", + index: 0, + delta: { type: "thinking_delta", thinking: "Reasoning..." }, + }, + { type: "content_block_stop", index: 0 }, + { + type: "content_block_start", + index: 1, + content_block: { type: "text" }, + }, + { + type: "content_block_delta", + index: 1, + delta: { type: "text_delta", text: "I'll use a tool." }, + }, + { type: "content_block_stop", index: 1 }, + { + type: "content_block_start", + index: 2, + content_block: { + type: "tool_use", + id: "tu_123", + name: "calc", + input: {}, + }, + }, + { + type: "content_block_delta", + index: 2, + delta: { type: "input_json_delta", partial_json: '{"x":' }, + }, + { + type: "content_block_delta", + index: 2, + delta: { type: "input_json_delta", partial_json: "1}" }, + }, + { type: "content_block_stop", index: 2 }, + ]; + + const result = aggregateAnthropicStreamChunks(chunks); + + expect(result.output).toEqual({ + content: [ + { type: "thinking", thinking: "Reasoning..." }, + { type: "text", text: "I'll use a tool." }, + { type: "tool_use", id: "tu_123", name: "calc", input: { x: 1 } }, + ], + }); + }); }); describe("processAttachmentsInInput", () => { diff --git a/js/src/instrumentation/plugins/anthropic-plugin.ts b/js/src/instrumentation/plugins/anthropic-plugin.ts index 862bf7583..c20b1b61d 100644 --- a/js/src/instrumentation/plugins/anthropic-plugin.ts +++ b/js/src/instrumentation/plugins/anthropic-plugin.ts @@ -7,6 +7,7 @@ import { finalizeAnthropicTokens } from "../../wrappers/anthropic-tokens-util"; import { anthropicChannels } from "./anthropic-channels"; import type { AnthropicBase64Source, + AnthropicCitation, AnthropicCreateParams, AnthropicInputMessage, AnthropicMessage, @@ -136,6 +137,11 @@ export function parseMetricsFromUsage( * - message_delta: Final usage stats and metadata * - message_stop: End of stream */ +type ContentBlockAccumulator = { + textDeltas: string[]; + citations: AnthropicCitation[]; +}; + export function aggregateAnthropicStreamChunks( chunks: AnthropicStreamEvent[], ): { @@ -145,7 +151,7 @@ export function aggregateAnthropicStreamChunks( } { const fallbackTextDeltas: string[] = []; const contentBlocks: Record = {}; - const contentBlockDeltas: Record = {}; + const contentBlockDeltas: Record = {}; let metrics: Record = {}; let metadata: Record = {}; let role: string | undefined; @@ -166,32 +172,42 @@ export function aggregateAnthropicStreamChunks( case "content_block_start": if (event.content_block) { contentBlocks[event.index] = event.content_block; - contentBlockDeltas[event.index] = []; + contentBlockDeltas[event.index] = { textDeltas: [], citations: [] }; } break; - case "content_block_delta": - if (event.delta?.type === "text_delta") { - const text = event.delta.text; + case "content_block_delta": { + const acc = contentBlockDeltas[event.index]; + const delta = event.delta; + if (delta?.type === "text_delta") { + const text = delta.text; if (text) { - if ( - contentBlocks[event.index] !== undefined || - contentBlockDeltas[event.index] !== undefined - ) { - contentBlockDeltas[event.index] ??= []; - contentBlockDeltas[event.index].push(text); + if (acc !== undefined) { + acc.textDeltas.push(text); } else { fallbackTextDeltas.push(text); } } - } else if (event.delta?.type === "input_json_delta") { - const partialJson = event.delta.partial_json; + } else if (delta?.type === "input_json_delta") { + const partialJson = delta.partial_json; if (partialJson) { - contentBlockDeltas[event.index] ??= []; - contentBlockDeltas[event.index].push(partialJson); + if (acc !== undefined) { + acc.textDeltas.push(partialJson); + } + } + } else if (delta?.type === "thinking_delta") { + const thinking = delta.thinking; + if (thinking && acc !== undefined) { + acc.textDeltas.push(thinking); + } + } else if (delta?.type === "citations_delta") { + if (acc !== undefined) { + acc.citations.push(delta.citation); } } + // signature_delta: ignored (not needed for tracing) break; + } case "content_block_stop": finalizeContentBlock( @@ -227,7 +243,10 @@ export function aggregateAnthropicStreamChunks( let output: unknown = fallbackTextDeltas.join(""); if (orderedContent.length > 0) { - if (orderedContent.every(isTextContentBlock)) { + if ( + orderedContent.every(isTextContentBlock) && + orderedContent.every((block) => !block.citations?.length) + ) { output = orderedContent.map((block) => block.text).join(""); } else { output = { @@ -255,7 +274,7 @@ export function aggregateAnthropicStreamChunks( function finalizeContentBlock( index: number, contentBlocks: Record, - contentBlockDeltas: Record, + contentBlockDeltas: Record, fallbackTextDeltas: string[], ): void { const contentBlock = contentBlocks[index]; @@ -263,7 +282,8 @@ function finalizeContentBlock( return; } - const text = contentBlockDeltas[index]?.join("") ?? ""; + const acc = contentBlockDeltas[index]; + const text = acc?.textDeltas.join("") ?? ""; if (isToolUseContentBlock(contentBlock)) { if (!text) { @@ -288,17 +308,36 @@ function finalizeContentBlock( return; } + const updated: AnthropicOutputContentBlock = { ...contentBlock, text }; + if (acc?.citations.length) { + ( + updated as { + type: "text"; + text: string; + citations?: AnthropicCitation[]; + } + ).citations = acc.citations; + } + contentBlocks[index] = updated; + return; + } + + if (isThinkingContentBlock(contentBlock)) { + if (!text) { + delete contentBlocks[index]; + return; + } + contentBlocks[index] = { ...contentBlock, - text, + thinking: text, }; return; } - if (text) { - fallbackTextDeltas.push(text); - } - delete contentBlocks[index]; + // Forward-compatible default: preserve unrecognized blocks as-is rather than deleting. + // This ensures future Anthropic content block types (server_tool_use, web_search_tool_result, etc.) + // are not silently dropped from traces. } function isTextContentBlock( @@ -313,6 +352,12 @@ function isToolUseContentBlock( return contentBlock.type === "tool_use"; } +function isThinkingContentBlock( + contentBlock: AnthropicOutputContentBlock, +): contentBlock is Extract { + return contentBlock.type === "thinking"; +} + function isAnthropicBase64ContentBlock( input: Record, ): input is Record & { diff --git a/js/src/vendor-sdk-types/anthropic.ts b/js/src/vendor-sdk-types/anthropic.ts index 440251148..d661471c1 100644 --- a/js/src/vendor-sdk-types/anthropic.ts +++ b/js/src/vendor-sdk-types/anthropic.ts @@ -82,14 +82,20 @@ export interface AnthropicMessage { stop_sequence?: string | null; } +export interface AnthropicCitation { + type: string; + [key: string]: unknown; +} + export type AnthropicOutputContentBlock = - | { type: "text"; text: string } + | { type: "text"; text: string; citations?: AnthropicCitation[] } | { type: "tool_use"; id: string; name: string; input: Record; } + | { type: "thinking"; thinking: string } | { type: string }; export interface AnthropicUsage { @@ -111,7 +117,11 @@ export type AnthropicStreamEvent = index: number; delta: | { type: "text_delta"; text: string } - | { type: "input_json_delta"; partial_json: string }; + | { type: "input_json_delta"; partial_json: string } + | { type: "thinking_delta"; thinking: string } + | { type: "citations_delta"; citation: AnthropicCitation } + | { type: "signature_delta"; signature: string } + | { type: string }; } | { type: "content_block_stop"; index: number } | { From f937a6401fda2d2d81110a3351a0419378f4752d Mon Sep 17 00:00:00 2001 From: Stephen Belanger Date: Fri, 27 Mar 2026 22:48:04 +0800 Subject: [PATCH 2/2] fix: use 'in' guards instead of type narrowing for catch-all delta union TypeScript cannot narrow a discriminated union member when a catch-all { type: string } overlaps with specific literal types. Use runtime 'in' checks + casts to safely extract fields from known delta types while still allowing unknown future delta types to fall through. Co-Authored-By: Claude Sonnet 4.6 --- .../plugins/anthropic-plugin.ts | 35 +++++++++++-------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/js/src/instrumentation/plugins/anthropic-plugin.ts b/js/src/instrumentation/plugins/anthropic-plugin.ts index c20b1b61d..3632c0b80 100644 --- a/js/src/instrumentation/plugins/anthropic-plugin.ts +++ b/js/src/instrumentation/plugins/anthropic-plugin.ts @@ -179,8 +179,9 @@ export function aggregateAnthropicStreamChunks( case "content_block_delta": { const acc = contentBlockDeltas[event.index]; const delta = event.delta; - if (delta?.type === "text_delta") { - const text = delta.text; + if (!delta) break; + if (delta.type === "text_delta" && "text" in delta) { + const text = (delta as { type: string; text: string }).text; if (text) { if (acc !== undefined) { acc.textDeltas.push(text); @@ -188,24 +189,30 @@ export function aggregateAnthropicStreamChunks( fallbackTextDeltas.push(text); } } - } else if (delta?.type === "input_json_delta") { - const partialJson = delta.partial_json; - if (partialJson) { - if (acc !== undefined) { - acc.textDeltas.push(partialJson); - } + } else if ( + delta.type === "input_json_delta" && + "partial_json" in delta + ) { + const partialJson = (delta as { type: string; partial_json: string }) + .partial_json; + if (partialJson && acc !== undefined) { + acc.textDeltas.push(partialJson); } - } else if (delta?.type === "thinking_delta") { - const thinking = delta.thinking; + } else if (delta.type === "thinking_delta" && "thinking" in delta) { + const thinking = (delta as { type: string; thinking: string }) + .thinking; if (thinking && acc !== undefined) { acc.textDeltas.push(thinking); } - } else if (delta?.type === "citations_delta") { - if (acc !== undefined) { - acc.citations.push(delta.citation); + } else if (delta.type === "citations_delta" && "citation" in delta) { + const citation = ( + delta as { type: string; citation: AnthropicCitation } + ).citation; + if (citation && acc !== undefined) { + acc.citations.push(citation); } } - // signature_delta: ignored (not needed for tracing) + // signature_delta and unknown future delta types: ignored break; }