diff --git a/js/src/framework.test.ts b/js/src/framework.test.ts index 6379905f..b1157c96 100644 --- a/js/src/framework.test.ts +++ b/js/src/framework.test.ts @@ -8,13 +8,14 @@ import { vi, } from "vitest"; import { + _exportsForTestingOnly as frameworkExportsForTestingOnly, defaultErrorScoreHandler, Eval, EvalScorer, runEvaluator, } from "./framework"; import { - _exportsForTestingOnly, + _exportsForTestingOnly as loggerExportsForTestingOnly, BraintrustState, initLogger, TestBackgroundLogger, @@ -33,6 +34,60 @@ class NoopProgressReporter implements ProgressReporter { public increment() {} } +test("waitForLogs3XactIngestion polls btql until the xact is queryable", async () => { + const post = vi + .fn() + .mockResolvedValueOnce({ + json: async () => ({ data: [] }), + }) + .mockResolvedValueOnce({ + json: async () => ({ data: [{ id: "span-1" }] }), + }); + const state = { + login: vi.fn().mockResolvedValue(undefined), + apiConn: () => ({ post }), + } as unknown as BraintrustState; + + await frameworkExportsForTestingOnly.waitForLogs3XactIngestion({ + state, + objectType: "experiment", + objectId: "exp-123", + rootSpanId: "root-456", + xactId: "xact-789", + initialBackoffMs: 0, + maxBackoffMs: 0, + timeoutMs: 100, + }); + + expect(state.login).toHaveBeenCalledWith({}); + expect(post).toHaveBeenCalledTimes(2); + expect(post).toHaveBeenNthCalledWith( + 1, + "btql", + expect.objectContaining({ + query: expect.objectContaining({ + select: [{ op: "literal", value: 1 }], + filter: { + op: "and", + children: [ + { + op: "eq", + left: { op: "ident", name: ["root_span_id"] }, + right: { op: "literal", value: "root-456" }, + }, + { + op: "eq", + left: { op: "ident", name: ["_xact_id"] }, + right: { op: "literal", value: "xact-789" }, + }, + ], + }, + }), + }), + { headers: { "Accept-Encoding": "gzip" } }, + ); +}); + test("meta (write) is passed to task", async () => { const metadata = { bar: "baz", @@ -559,7 +614,7 @@ test("trialIndex with multiple inputs", async () => { }); test("Eval with noSendLogs: true runs locally without creating experiment", async () => { - const memoryLogger = _exportsForTestingOnly.useTestBackgroundLogger(); + const memoryLogger = loggerExportsForTestingOnly.useTestBackgroundLogger(); const result = await Eval( "test-no-logs", @@ -677,10 +732,10 @@ test("Eval with returnResults: true collects all results", async () => { }); test("tags can be appended and logged to root span", async () => { - await _exportsForTestingOnly.simulateLoginForTests(); - const memoryLogger = _exportsForTestingOnly.useTestBackgroundLogger(); + await loggerExportsForTestingOnly.simulateLoginForTests(); + const memoryLogger = loggerExportsForTestingOnly.useTestBackgroundLogger(); const experiment = - _exportsForTestingOnly.initTestExperiment("js-tags-append"); + loggerExportsForTestingOnly.initTestExperiment("js-tags-append"); const initialTags = ["cookies n cream"]; const appendedTags = ["chocolate", "vanilla", "strawberry"]; @@ -736,9 +791,10 @@ test.each([ expectedTags: ["chocolate", "vanilla", "strawberry"], }, ])("$title", async ({ providedTags, expectedTags }) => { - await _exportsForTestingOnly.simulateLoginForTests(); - const memoryLogger = _exportsForTestingOnly.useTestBackgroundLogger(); - const experiment = _exportsForTestingOnly.initTestExperiment("js-tags-list"); + await loggerExportsForTestingOnly.simulateLoginForTests(); + const memoryLogger = loggerExportsForTestingOnly.useTestBackgroundLogger(); + const experiment = + loggerExportsForTestingOnly.initTestExperiment("js-tags-list"); const result = await runEvaluator( experiment, @@ -769,9 +825,10 @@ test.each([ }); test("tags are persisted with a failing scorer", async () => { - await _exportsForTestingOnly.simulateLoginForTests(); - const memoryLogger = _exportsForTestingOnly.useTestBackgroundLogger(); - const experiment = _exportsForTestingOnly.initTestExperiment("js-tags-list"); + await loggerExportsForTestingOnly.simulateLoginForTests(); + const memoryLogger = loggerExportsForTestingOnly.useTestBackgroundLogger(); + const experiment = + loggerExportsForTestingOnly.initTestExperiment("js-tags-list"); const expectedTags = ["chocolate", "vanilla", "strawberry"]; @@ -809,10 +866,10 @@ test("tags are persisted with a failing scorer", async () => { }); test("tags remain empty when not set", async () => { - await _exportsForTestingOnly.simulateLoginForTests(); - const memoryLogger = _exportsForTestingOnly.useTestBackgroundLogger(); + await loggerExportsForTestingOnly.simulateLoginForTests(); + const memoryLogger = loggerExportsForTestingOnly.useTestBackgroundLogger(); const experiment = - _exportsForTestingOnly.initTestExperiment("js-tags-append"); + loggerExportsForTestingOnly.initTestExperiment("js-tags-append"); const result = await runEvaluator( experiment, @@ -842,10 +899,10 @@ test("tags remain empty when not set", async () => { }); test("scorer spans have purpose='scorer' attribute", async () => { - await _exportsForTestingOnly.simulateLoginForTests(); - const memoryLogger = _exportsForTestingOnly.useTestBackgroundLogger(); + await loggerExportsForTestingOnly.simulateLoginForTests(); + const memoryLogger = loggerExportsForTestingOnly.useTestBackgroundLogger(); const experiment = - _exportsForTestingOnly.initTestExperiment("js-scorer-purpose"); + loggerExportsForTestingOnly.initTestExperiment("js-scorer-purpose"); const result = await runEvaluator( experiment, @@ -892,8 +949,8 @@ test("scorer spans have purpose='scorer' attribute", async () => { expect((span as any).span_attributes?.purpose).not.toBe("scorer"); } - _exportsForTestingOnly.clearTestBackgroundLogger(); - _exportsForTestingOnly.simulateLogoutForTests(); + loggerExportsForTestingOnly.clearTestBackgroundLogger(); + loggerExportsForTestingOnly.simulateLogoutForTests(); }); // ========== framework2 metadata tests ========== @@ -1486,7 +1543,7 @@ describe("framework2 metadata support", () => { }); test("Eval with enableCache: false does not use span cache", async () => { - await _exportsForTestingOnly.simulateLoginForTests(); + await loggerExportsForTestingOnly.simulateLoginForTests(); const state = new BraintrustState({ apiKey: "test-api-key", appUrl: "https://example.com", @@ -1511,7 +1568,7 @@ test("Eval with enableCache: false does not use span cache", async () => { }); test("Eval with enableCache: true (default) uses span cache", async () => { - await _exportsForTestingOnly.simulateLoginForTests(); + await loggerExportsForTestingOnly.simulateLoginForTests(); const state = new BraintrustState({ apiKey: "test-api-key", appUrl: "https://example.com", @@ -1536,9 +1593,9 @@ test("Eval with enableCache: true (default) uses span cache", async () => { }); test("Eval with parent flushes evaluator state, not global state", async () => { - await _exportsForTestingOnly.simulateLoginForTests(); + await loggerExportsForTestingOnly.simulateLoginForTests(); - _exportsForTestingOnly.useTestBackgroundLogger(); + loggerExportsForTestingOnly.useTestBackgroundLogger(); const evaluatorState = new BraintrustState({ apiKey: "test-api-key", @@ -1567,6 +1624,6 @@ test("Eval with parent flushes evaluator state, not global state", async () => { expect(evaluatorFlushSpy).toHaveBeenCalled(); - _exportsForTestingOnly.clearTestBackgroundLogger(); - _exportsForTestingOnly.simulateLogoutForTests(); + loggerExportsForTestingOnly.clearTestBackgroundLogger(); + loggerExportsForTestingOnly.simulateLogoutForTests(); }); diff --git a/js/src/framework.ts b/js/src/framework.ts index cfdd62bd..4a2a3a7b 100644 --- a/js/src/framework.ts +++ b/js/src/framework.ts @@ -67,6 +67,93 @@ export type BaseExperiment< name?: string; }; +const ENSURE_SPANS_FLUSH_INITIAL_BACKOFF_MS = 100; +const ENSURE_SPANS_FLUSH_MAX_BACKOFF_MS = 2_000; +const ENSURE_SPANS_FLUSH_TIMEOUT_MS = 30_000; + +async function waitForLogs3XactIngestion(args: { + state: BraintrustState; + objectType: "experiment" | "project_logs" | "playground_logs"; + objectId: string; + rootSpanId: string; + xactId: string; + initialBackoffMs?: number; + maxBackoffMs?: number; + timeoutMs?: number; +}): Promise { + const { + state, + objectType, + objectId, + rootSpanId, + xactId, + initialBackoffMs = ENSURE_SPANS_FLUSH_INITIAL_BACKOFF_MS, + maxBackoffMs = ENSURE_SPANS_FLUSH_MAX_BACKOFF_MS, + timeoutMs = ENSURE_SPANS_FLUSH_TIMEOUT_MS, + } = args; + + await state.login({}); + + const startedAt = Date.now(); + let backoffMs = initialBackoffMs; + + while (true) { + const response = await state.apiConn().post( + "btql", + { + query: { + select: [ + { + alias: "id", + expr: { op: "ident", name: ["id"] }, + }, + ], + from: { + op: "function", + name: { + op: "ident", + name: [objectType], + }, + args: [{ op: "literal", value: objectId }], + }, + filter: { + op: "and", + children: [ + { + op: "eq", + left: { op: "ident", name: ["root_span_id"] }, + right: { op: "literal", value: rootSpanId }, + }, + { + op: "eq", + left: { op: "ident", name: ["_xact_id"] }, + right: { op: "literal", value: xactId }, + }, + ], + }, + limit: 1, + }, + brainstore_realtime: false, + query_source: `sdk_ensure_spans_flushed_de15bf`, + }, + { headers: { "Accept-Encoding": "gzip" } }, + ); + const result = await response.json(); + if (Array.isArray(result.data) && result.data.length > 0) { + return; + } + + if (Date.now() - startedAt >= timeoutMs) { + throw new Error( + `Timed out waiting for logs3 xact ${xactId} to become queryable`, + ); + } + + await new Promise((resolve) => setTimeout(resolve, backoffMs)); + backoffMs = Math.min(backoffMs * 2, maxBackoffMs); + } +} + /** * Use this to specify that the dataset should actually be the data from a previous (base) experiment. * If you do not specify a name, Braintrust will automatically figure out the best base experiment to @@ -1041,7 +1128,20 @@ async function runEvaluatorInternal( }; const callback = async (rootSpan: Span) => { - const state = evaluator.state ?? _internalGetGlobalState(); + const state = + experiment?.loggingState ?? + evaluator.state ?? + _internalGetGlobalState(); + const parentStr = state.currentParent.getStore(); + const parentComponents = parentStr + ? SpanComponentsV3.fromStr(parentStr) + : null; + const traceObjectType = parentComponents + ? spanObjectTypeV3ToTypedString(parentComponents.data.object_type) + : "experiment"; + const traceObjectId = + parentComponents?.data.object_id ?? + (experimentIdPromise ? ((await experimentIdPromise) ?? "") : ""); const ensureSpansFlushed = async () => { // Flush native Braintrust spans if (experiment) { @@ -1056,25 +1156,23 @@ async function runEvaluatorInternal( if (state) { await state.flushOtel(); } - }; - const parentStr = state.currentParent.getStore(); - const parentComponents = parentStr - ? SpanComponentsV3.fromStr(parentStr) - : null; + const xactId = state?.bgLogger().lastFlushedXactId(); + if (state && xactId && traceObjectId) { + await waitForLogs3XactIngestion({ + state, + objectType: traceObjectType, + objectId: traceObjectId, + rootSpanId: rootSpan.rootSpanId, + xactId, + }); + } + }; const trace = state ? new LocalTrace({ - objectType: parentComponents - ? spanObjectTypeV3ToTypedString( - parentComponents.data.object_type, - ) - : "experiment", - objectId: - parentComponents?.data.object_id ?? - (experimentIdPromise - ? ((await experimentIdPromise) ?? "") - : ""), + objectType: traceObjectType, + objectId: traceObjectId, rootSpanId: rootSpan.rootSpanId, ensureSpansFlushed, state, @@ -1717,3 +1815,7 @@ const defaultReporter: ReporterDef = { return evalReports.every((r) => r); }, }; + +export const _exportsForTestingOnly = { + waitForLogs3XactIngestion, +}; diff --git a/js/src/logger.test.ts b/js/src/logger.test.ts index fc4f14f7..f91cb896 100644 --- a/js/src/logger.test.ts +++ b/js/src/logger.test.ts @@ -27,6 +27,26 @@ import { SpanComponentsV3 } from "../util/span_identifier_v3"; configureNode(); +test("extractLastXactIdFromLogs3Response returns the last xact id", () => { + expect( + _exportsForTestingOnly.extractLastXactIdFromLogs3Response({ + rows: [{ _xact_id: "xact-1" }, { _xact_id: "xact-2" }], + }), + ).toBe("xact-2"); + + expect( + _exportsForTestingOnly.extractLastXactIdFromLogs3Response({ + xact_ids: ["xact-1", "xact-2"], + }), + ).toBe("xact-2"); +}); + +test("maxXactId keeps the numerically largest xact id", () => { + expect(_exportsForTestingOnly.maxXactId("10", "9")).toBe("10"); + expect(_exportsForTestingOnly.maxXactId("10", "11")).toBe("11"); + expect(_exportsForTestingOnly.maxXactId(null, "11")).toBe("11"); +}); + test("renderMessage with file content parts", () => { const message = { role: "user" as const, diff --git a/js/src/logger.ts b/js/src/logger.ts index 658d47ae..be3ab72d 100644 --- a/js/src/logger.ts +++ b/js/src/logger.ts @@ -2509,6 +2509,74 @@ function constructLogs3Data(items: LogItemWithMeta[]) { return `{"rows": ${constructJsonArray(items.map((i) => i.str))}, "api_version": 2}`; } +function extractLastXactIdFromLogs3Response(response: unknown): string | null { + if (typeof response === "string") { + return null; + } + + if (Array.isArray(response)) { + for (let i = response.length - 1; i >= 0; i--) { + const xactId = extractLastXactIdFromLogs3Response(response[i]); + if (xactId) { + return xactId; + } + } + return null; + } + + if (!response || typeof response !== "object") { + return null; + } + + const record = response as Record; + for (const field of ["_xact_id", "xact_id", "transaction_id"] as const) { + const value = record[field]; + if (typeof value === "string") { + return value; + } + } + + for (const field of ["_xact_ids", "xact_ids"] as const) { + const value = record[field]; + if (Array.isArray(value)) { + for (let i = value.length - 1; i >= 0; i--) { + if (typeof value[i] === "string") { + return value[i]; + } + } + } + } + + for (const field of ["data", "rows"] as const) { + const xactId = extractLastXactIdFromLogs3Response(record[field]); + if (xactId) { + return xactId; + } + } + + return null; +} + +function maxXactId( + currentXactId: string | null, + candidateXactId: string | null, +): string | null { + if (!candidateXactId) { + return currentXactId; + } + if (!currentXactId) { + return candidateXactId; + } + + try { + return BigInt(candidateXactId) > BigInt(currentXactId) + ? candidateXactId + : currentXactId; + } catch { + return currentXactId; + } +} + export function constructLogs3OverflowRequest(key: string) { return { rows: { @@ -2627,6 +2695,7 @@ const DEFAULT_FLUSH_BACKPRESSURE_BYTES = 10 * 1024 * 1024; // 10 MB interface BackgroundLogger { log(items: LazyValue[]): void; flush(): Promise; + lastFlushedXactId(): string | null; pendingFlushBytes(): number; flushBackpressureBytes(): number; setMaskingFunction( @@ -2652,6 +2721,10 @@ export class TestBackgroundLogger implements BackgroundLogger { return Promise.resolve(); } + lastFlushedXactId(): string | null { + return null; + } + pendingFlushBytes(): number { return 0; } @@ -2745,6 +2818,7 @@ class HTTPBackgroundLogger implements BackgroundLogger { public failedPublishPayloadsDir: string | undefined = undefined; public allPublishPayloadsDir: string | undefined = undefined; private _flushBackpressureBytes: number = DEFAULT_FLUSH_BACKPRESSURE_BYTES; + private _lastFlushedXactId: string | null = null; private _pendingBytes: number = 0; @@ -2848,6 +2922,10 @@ class HTTPBackgroundLogger implements BackgroundLogger { return this._pendingBytes; } + lastFlushedXactId(): string | null { + return this._lastFlushedXactId; + } + flushBackpressureBytes(): number { return this._flushBackpressureBytes; } @@ -3168,6 +3246,7 @@ class HTTPBackgroundLogger implements BackgroundLogger { const startTime = now(); let error: unknown = undefined; try { + let response: unknown; if (overflowRows) { if (!overflowUpload) { const currentUpload = await this.requestLogs3OverflowUpload(conn, { @@ -3181,12 +3260,19 @@ class HTTPBackgroundLogger implements BackgroundLogger { ); overflowUpload = currentUpload; } - await conn.post_json( + response = await conn.post_json( "logs3", constructLogs3OverflowRequest(overflowUpload.key), ); } else { - await conn.post_json("logs3", dataStr); + response = await conn.post_json("logs3", dataStr); + } + const lastXactId = extractLastXactIdFromLogs3Response(response); + if (lastXactId) { + this._lastFlushedXactId = maxXactId( + this._lastFlushedXactId, + lastXactId, + ); } } catch (e) { error = e; @@ -5771,9 +5857,7 @@ export class ObjectFetcher implements AsyncIterable< let maxVersion: string | undefined = undefined; for await (const record of this.fetch(options)) { const xactId = String(record[TRANSACTION_ID_FIELD] ?? "0"); - if (maxVersion === undefined || xactId > maxVersion) { - maxVersion = xactId; - } + maxVersion = maxXactId(maxVersion ?? null, xactId) ?? undefined; } return maxVersion; } @@ -8051,6 +8135,8 @@ function resetIdGenStateForTests() { export const _exportsForTestingOnly = { extractAttachments, + extractLastXactIdFromLogs3Response, + maxXactId, deepCopyEvent, useTestBackgroundLogger, clearTestBackgroundLogger,