Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import * as Sentry from '@sentry/node';
import { loggingTransport } from '@sentry-internal/node-integration-tests';

Sentry.init({
dsn: 'https://public@dsn.ingest.sentry.io/1337',
release: '1.0',
tracesSampleRate: 1.0,
sendDefaultPii: true,
transport: loggingTransport,
traceLifecycle: 'stream',
integrations: [
Sentry.anthropicAIIntegration({
enableTruncation: true,
}),
],
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import * as Sentry from '@sentry/node';
import { loggingTransport } from '@sentry-internal/node-integration-tests';

Sentry.init({
dsn: 'https://public@dsn.ingest.sentry.io/1337',
release: '1.0',
tracesSampleRate: 1.0,
sendDefaultPii: true,
transport: loggingTransport,
traceLifecycle: 'stream',
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import Anthropic from '@anthropic-ai/sdk';
import * as Sentry from '@sentry/node';
import express from 'express';

function startMockAnthropicServer() {
const app = express();
app.use(express.json({ limit: '10mb' }));

app.post('/anthropic/v1/messages', (req, res) => {
res.send({
id: 'msg_streaming_test',
type: 'message',
model: req.body.model,
role: 'assistant',
content: [{ type: 'text', text: 'Response' }],
stop_reason: 'end_turn',
stop_sequence: null,
usage: { input_tokens: 10, output_tokens: 5 },
});
});

return new Promise(resolve => {
const server = app.listen(0, () => {
resolve(server);
});
});
}

async function run() {
const server = await startMockAnthropicServer();

await Sentry.startSpan({ op: 'function', name: 'main' }, async () => {
const client = new Anthropic({
apiKey: 'mock-api-key',
baseURL: `http://localhost:${server.address().port}/anthropic`,
});

// Long content that would normally be truncated
const longContent = 'A'.repeat(50_000);
await client.messages.create({
model: 'claude-3-haiku-20240307',
max_tokens: 100,
messages: [{ role: 'user', content: longContent }],
});
});

// Flush is required when span streaming is enabled to ensure streamed spans are sent before the process exits
await Sentry.flush(2000);

server.close();
}

run();
Original file line number Diff line number Diff line change
Expand Up @@ -844,4 +844,52 @@ describe('Anthropic integration', () => {
});
},
);

const streamingLongContent = 'A'.repeat(50_000);

createEsmAndCjsTests(__dirname, 'scenario-streaming.mjs', 'instrument-streaming.mjs', (createRunner, test) => {
test('automatically disables truncation when span streaming is enabled', async () => {
await createRunner()
.expect({
span: container => {
const spans = container.items;

const chatSpan = spans.find(s =>
s.attributes?.[GEN_AI_INPUT_MESSAGES_ATTRIBUTE]?.value?.includes(streamingLongContent),
);
expect(chatSpan).toBeDefined();
},
})
.start()
.completed();
});
});

createEsmAndCjsTests(
__dirname,
'scenario-streaming.mjs',
'instrument-streaming-with-truncation.mjs',
(createRunner, test) => {
test('respects explicit enableTruncation: true even when span streaming is enabled', async () => {
await createRunner()
.expect({
span: container => {
const spans = container.items;

// With explicit enableTruncation: true, content should be truncated despite streaming.
// Find the chat span by matching the start of the truncated content (the 'A' repeated messages).
const chatSpan = spans.find(s =>
s.attributes?.[GEN_AI_INPUT_MESSAGES_ATTRIBUTE]?.value?.startsWith('[{"role":"user","content":"AAAA'),
);
expect(chatSpan).toBeDefined();
expect(chatSpan!.attributes[GEN_AI_INPUT_MESSAGES_ATTRIBUTE].value.length).toBeLessThan(
streamingLongContent.length,
);
},
})
.start()
.completed();
});
},
);
});
11 changes: 11 additions & 0 deletions packages/core/src/tracing/ai/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/
import { captureException } from '../../exports';
import { getClient } from '../../currentScopes';
import { hasSpanStreamingEnabled } from '../spans/hasSpanStreamingEnabled';
import type { Span } from '../../types-hoist/span';
import { isThenable } from '../../utils/is';
import {
Expand Down Expand Up @@ -56,6 +57,16 @@ export function resolveAIRecordingOptions<T extends AIRecordingOptions>(options?
} as T & Required<AIRecordingOptions>;
}

/**
* Resolves whether truncation should be enabled.
* If the user explicitly set `enableTruncation`, that value is used.
* Otherwise, truncation is disabled when span streaming is active.
*/
export function shouldEnableTruncation(enableTruncation: boolean | undefined): boolean {
const client = getClient();
return enableTruncation ?? !(client && hasSpanStreamingEnabled(client));
}

/**
* Build method path from current traversal
*/
Expand Down
7 changes: 4 additions & 3 deletions packages/core/src/tracing/anthropic-ai/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
buildMethodPath,
resolveAIRecordingOptions,
setTokenUsageAttributes,
shouldEnableTruncation,
wrapPromiseWithMethods,
} from '../ai/utils';
import { ANTHROPIC_METHOD_REGISTRY } from './constants';
Expand Down Expand Up @@ -206,7 +207,7 @@ function handleStreamingRequest<T extends unknown[], R>(
originalResult = originalMethod.apply(context, args) as Promise<R>;

if (options.recordInputs && params) {
addPrivateRequestAttributes(span, params, options.enableTruncation ?? true);
addPrivateRequestAttributes(span, params, shouldEnableTruncation(options.enableTruncation));
}

return (async () => {
Expand All @@ -228,7 +229,7 @@ function handleStreamingRequest<T extends unknown[], R>(
return startSpanManual(spanConfig, span => {
try {
if (options.recordInputs && params) {
addPrivateRequestAttributes(span, params, options.enableTruncation ?? true);
addPrivateRequestAttributes(span, params, shouldEnableTruncation(options.enableTruncation));
}
const messageStream = target.apply(context, args);
return instrumentMessageStream(messageStream, span, options.recordOutputs ?? false);
Expand Down Expand Up @@ -289,7 +290,7 @@ function instrumentMethod<T extends unknown[], R>(
originalResult = target.apply(context, args) as Promise<R>;

if (options.recordInputs && params) {
addPrivateRequestAttributes(span, params, options.enableTruncation ?? true);
addPrivateRequestAttributes(span, params, shouldEnableTruncation(options.enableTruncation));
}

return originalResult.then(
Expand Down
Loading