From 0b53895b04d7d19dc5f5ef4151bfddb648e0c75a Mon Sep 17 00:00:00 2001 From: Alex Z Date: Wed, 25 Mar 2026 12:07:32 -0700 Subject: [PATCH 01/10] slop --- js/src/framework.test.ts | 106 +++++++++++++++++++++++-------- js/src/framework.ts | 130 ++++++++++++++++++++++++++++++++++----- js/src/logger.test.ts | 14 +++++ js/src/logger.ts | 68 +++++++++++++++++++- 4 files changed, 275 insertions(+), 43 deletions(-) diff --git a/js/src/framework.test.ts b/js/src/framework.test.ts index 6379905f..1cdca1c5 100644 --- a/js/src/framework.test.ts +++ b/js/src/framework.test.ts @@ -8,13 +8,14 @@ import { vi, } from "vitest"; import { + _exportsForTestingOnly as frameworkExportsForTestingOnly, defaultErrorScoreHandler, Eval, EvalScorer, runEvaluator, } from "./framework"; import { - _exportsForTestingOnly, + _exportsForTestingOnly as loggerExportsForTestingOnly, BraintrustState, initLogger, TestBackgroundLogger, @@ -33,6 +34,59 @@ class NoopProgressReporter implements ProgressReporter { public increment() {} } +test("waitForLogs3XactIngestion polls btql until the xact is queryable", async () => { + const post = vi + .fn() + .mockResolvedValueOnce({ + json: async () => ({ data: [] }), + }) + .mockResolvedValueOnce({ + json: async () => ({ data: [{ id: "span-1" }] }), + }); + const state = { + login: vi.fn().mockResolvedValue(undefined), + apiConn: () => ({ post }), + } as unknown as BraintrustState; + + await frameworkExportsForTestingOnly.waitForLogs3XactIngestion({ + state, + objectType: "experiment", + objectId: "exp-123", + rootSpanId: "root-456", + xactId: "xact-789", + initialBackoffMs: 0, + maxBackoffMs: 0, + timeoutMs: 100, + }); + + expect(state.login).toHaveBeenCalledWith({}); + expect(post).toHaveBeenCalledTimes(2); + expect(post).toHaveBeenNthCalledWith( + 1, + "btql", + expect.objectContaining({ + query: expect.objectContaining({ + filter: { + op: "and", + children: [ + { + op: "eq", + left: { op: "ident", name: ["root_span_id"] }, + right: { op: "literal", value: "root-456" }, + }, + { + op: "eq", + left: { op: "ident", name: ["_xact_id"] }, + right: { op: "literal", value: "xact-789" }, + }, + ], + }, + }), + }), + { headers: { "Accept-Encoding": "gzip" } }, + ); +}); + test("meta (write) is passed to task", async () => { const metadata = { bar: "baz", @@ -559,7 +613,7 @@ test("trialIndex with multiple inputs", async () => { }); test("Eval with noSendLogs: true runs locally without creating experiment", async () => { - const memoryLogger = _exportsForTestingOnly.useTestBackgroundLogger(); + const memoryLogger = loggerExportsForTestingOnly.useTestBackgroundLogger(); const result = await Eval( "test-no-logs", @@ -677,10 +731,10 @@ test("Eval with returnResults: true collects all results", async () => { }); test("tags can be appended and logged to root span", async () => { - await _exportsForTestingOnly.simulateLoginForTests(); - const memoryLogger = _exportsForTestingOnly.useTestBackgroundLogger(); + await loggerExportsForTestingOnly.simulateLoginForTests(); + const memoryLogger = loggerExportsForTestingOnly.useTestBackgroundLogger(); const experiment = - _exportsForTestingOnly.initTestExperiment("js-tags-append"); + loggerExportsForTestingOnly.initTestExperiment("js-tags-append"); const initialTags = ["cookies n cream"]; const appendedTags = ["chocolate", "vanilla", "strawberry"]; @@ -736,9 +790,10 @@ test.each([ expectedTags: ["chocolate", "vanilla", "strawberry"], }, ])("$title", async ({ providedTags, expectedTags }) => { - await _exportsForTestingOnly.simulateLoginForTests(); - const memoryLogger = _exportsForTestingOnly.useTestBackgroundLogger(); - const experiment = _exportsForTestingOnly.initTestExperiment("js-tags-list"); + await loggerExportsForTestingOnly.simulateLoginForTests(); + const memoryLogger = loggerExportsForTestingOnly.useTestBackgroundLogger(); + const experiment = + loggerExportsForTestingOnly.initTestExperiment("js-tags-list"); const result = await runEvaluator( experiment, @@ -769,9 +824,10 @@ test.each([ }); test("tags are persisted with a failing scorer", async () => { - await _exportsForTestingOnly.simulateLoginForTests(); - const memoryLogger = _exportsForTestingOnly.useTestBackgroundLogger(); - const experiment = _exportsForTestingOnly.initTestExperiment("js-tags-list"); + await loggerExportsForTestingOnly.simulateLoginForTests(); + const memoryLogger = loggerExportsForTestingOnly.useTestBackgroundLogger(); + const experiment = + loggerExportsForTestingOnly.initTestExperiment("js-tags-list"); const expectedTags = ["chocolate", "vanilla", "strawberry"]; @@ -809,10 +865,10 @@ test("tags are persisted with a failing scorer", async () => { }); test("tags remain empty when not set", async () => { - await _exportsForTestingOnly.simulateLoginForTests(); - const memoryLogger = _exportsForTestingOnly.useTestBackgroundLogger(); + await loggerExportsForTestingOnly.simulateLoginForTests(); + const memoryLogger = loggerExportsForTestingOnly.useTestBackgroundLogger(); const experiment = - _exportsForTestingOnly.initTestExperiment("js-tags-append"); + loggerExportsForTestingOnly.initTestExperiment("js-tags-append"); const result = await runEvaluator( experiment, @@ -842,10 +898,10 @@ test("tags remain empty when not set", async () => { }); test("scorer spans have purpose='scorer' attribute", async () => { - await _exportsForTestingOnly.simulateLoginForTests(); - const memoryLogger = _exportsForTestingOnly.useTestBackgroundLogger(); + await loggerExportsForTestingOnly.simulateLoginForTests(); + const memoryLogger = loggerExportsForTestingOnly.useTestBackgroundLogger(); const experiment = - _exportsForTestingOnly.initTestExperiment("js-scorer-purpose"); + loggerExportsForTestingOnly.initTestExperiment("js-scorer-purpose"); const result = await runEvaluator( experiment, @@ -892,8 +948,8 @@ test("scorer spans have purpose='scorer' attribute", async () => { expect((span as any).span_attributes?.purpose).not.toBe("scorer"); } - _exportsForTestingOnly.clearTestBackgroundLogger(); - _exportsForTestingOnly.simulateLogoutForTests(); + loggerExportsForTestingOnly.clearTestBackgroundLogger(); + loggerExportsForTestingOnly.simulateLogoutForTests(); }); // ========== framework2 metadata tests ========== @@ -1486,7 +1542,7 @@ describe("framework2 metadata support", () => { }); test("Eval with enableCache: false does not use span cache", async () => { - await _exportsForTestingOnly.simulateLoginForTests(); + await loggerExportsForTestingOnly.simulateLoginForTests(); const state = new BraintrustState({ apiKey: "test-api-key", appUrl: "https://example.com", @@ -1511,7 +1567,7 @@ test("Eval with enableCache: false does not use span cache", async () => { }); test("Eval with enableCache: true (default) uses span cache", async () => { - await _exportsForTestingOnly.simulateLoginForTests(); + await loggerExportsForTestingOnly.simulateLoginForTests(); const state = new BraintrustState({ apiKey: "test-api-key", appUrl: "https://example.com", @@ -1536,9 +1592,9 @@ test("Eval with enableCache: true (default) uses span cache", async () => { }); test("Eval with parent flushes evaluator state, not global state", async () => { - await _exportsForTestingOnly.simulateLoginForTests(); + await loggerExportsForTestingOnly.simulateLoginForTests(); - _exportsForTestingOnly.useTestBackgroundLogger(); + loggerExportsForTestingOnly.useTestBackgroundLogger(); const evaluatorState = new BraintrustState({ apiKey: "test-api-key", @@ -1567,6 +1623,6 @@ test("Eval with parent flushes evaluator state, not global state", async () => { expect(evaluatorFlushSpy).toHaveBeenCalled(); - _exportsForTestingOnly.clearTestBackgroundLogger(); - _exportsForTestingOnly.simulateLogoutForTests(); + loggerExportsForTestingOnly.clearTestBackgroundLogger(); + loggerExportsForTestingOnly.simulateLogoutForTests(); }); diff --git a/js/src/framework.ts b/js/src/framework.ts index cfdd62bd..a2681e88 100644 --- a/js/src/framework.ts +++ b/js/src/framework.ts @@ -67,6 +67,89 @@ export type BaseExperiment< name?: string; }; +const ENSURE_SPANS_FLUSH_INITIAL_BACKOFF_MS = 100; +const ENSURE_SPANS_FLUSH_MAX_BACKOFF_MS = 2_000; +const ENSURE_SPANS_FLUSH_TIMEOUT_MS = 30_000; + +async function waitForLogs3XactIngestion(args: { + state: BraintrustState; + objectType: "experiment" | "project_logs" | "playground_logs"; + objectId: string; + rootSpanId: string; + xactId: string; + initialBackoffMs?: number; + maxBackoffMs?: number; + timeoutMs?: number; +}): Promise { + const { + state, + objectType, + objectId, + rootSpanId, + xactId, + initialBackoffMs = ENSURE_SPANS_FLUSH_INITIAL_BACKOFF_MS, + maxBackoffMs = ENSURE_SPANS_FLUSH_MAX_BACKOFF_MS, + timeoutMs = ENSURE_SPANS_FLUSH_TIMEOUT_MS, + } = args; + + await state.login({}); + + const startedAt = Date.now(); + let backoffMs = initialBackoffMs; + + while (true) { + const response = await state.apiConn().post( + "btql", + { + query: { + select: [{ op: "star" }], + from: { + op: "function", + name: { + op: "ident", + name: [objectType], + }, + args: [{ op: "literal", value: objectId }], + }, + filter: { + op: "and", + children: [ + { + op: "eq", + left: { op: "ident", name: ["root_span_id"] }, + right: { op: "literal", value: rootSpanId }, + }, + { + op: "eq", + left: { op: "ident", name: ["_xact_id"] }, + right: { op: "literal", value: xactId }, + }, + ], + }, + limit: 1, + }, + use_columnstore: false, + brainstore_realtime: true, + query_source: `js_sdk_ensure_spans_flushed_${objectType}`, + }, + { headers: { "Accept-Encoding": "gzip" } }, + ); + const result = await response.json(); + if (Array.isArray(result.data) && result.data.length > 0) { + return; + } + + if (Date.now() - startedAt >= timeoutMs) { + throw new Error( + `Timed out waiting for logs3 xact ${xactId} to become queryable`, + ); + } + + await new Promise((resolve) => setTimeout(resolve, backoffMs)); + backoffMs = Math.min(backoffMs * 2, maxBackoffMs); + } +} + /** * Use this to specify that the dataset should actually be the data from a previous (base) experiment. * If you do not specify a name, Braintrust will automatically figure out the best base experiment to @@ -1041,7 +1124,20 @@ async function runEvaluatorInternal( }; const callback = async (rootSpan: Span) => { - const state = evaluator.state ?? _internalGetGlobalState(); + const state = + experiment?.loggingState ?? + evaluator.state ?? + _internalGetGlobalState(); + const parentStr = state.currentParent.getStore(); + const parentComponents = parentStr + ? SpanComponentsV3.fromStr(parentStr) + : null; + const traceObjectType = parentComponents + ? spanObjectTypeV3ToTypedString(parentComponents.data.object_type) + : "experiment"; + const traceObjectId = + parentComponents?.data.object_id ?? + (experimentIdPromise ? ((await experimentIdPromise) ?? "") : ""); const ensureSpansFlushed = async () => { // Flush native Braintrust spans if (experiment) { @@ -1056,25 +1152,23 @@ async function runEvaluatorInternal( if (state) { await state.flushOtel(); } - }; - const parentStr = state.currentParent.getStore(); - const parentComponents = parentStr - ? SpanComponentsV3.fromStr(parentStr) - : null; + const xactId = state?.bgLogger().lastFlushedXactId(); + if (state && xactId && traceObjectId) { + await waitForLogs3XactIngestion({ + state, + objectType: traceObjectType, + objectId: traceObjectId, + rootSpanId: rootSpan.rootSpanId, + xactId, + }); + } + }; const trace = state ? new LocalTrace({ - objectType: parentComponents - ? spanObjectTypeV3ToTypedString( - parentComponents.data.object_type, - ) - : "experiment", - objectId: - parentComponents?.data.object_id ?? - (experimentIdPromise - ? ((await experimentIdPromise) ?? "") - : ""), + objectType: traceObjectType, + objectId: traceObjectId, rootSpanId: rootSpan.rootSpanId, ensureSpansFlushed, state, @@ -1717,3 +1811,7 @@ const defaultReporter: ReporterDef = { return evalReports.every((r) => r); }, }; + +export const _exportsForTestingOnly = { + waitForLogs3XactIngestion, +}; diff --git a/js/src/logger.test.ts b/js/src/logger.test.ts index fc4f14f7..c7720408 100644 --- a/js/src/logger.test.ts +++ b/js/src/logger.test.ts @@ -27,6 +27,20 @@ import { SpanComponentsV3 } from "../util/span_identifier_v3"; configureNode(); +test("extractLastXactIdFromLogs3Response returns the last xact id", () => { + expect( + _exportsForTestingOnly.extractLastXactIdFromLogs3Response({ + rows: [{ _xact_id: "xact-1" }, { _xact_id: "xact-2" }], + }), + ).toBe("xact-2"); + + expect( + _exportsForTestingOnly.extractLastXactIdFromLogs3Response({ + xact_ids: ["xact-1", "xact-2"], + }), + ).toBe("xact-2"); +}); + test("renderMessage with file content parts", () => { const message = { role: "user" as const, diff --git a/js/src/logger.ts b/js/src/logger.ts index 658d47ae..f7da0bab 100644 --- a/js/src/logger.ts +++ b/js/src/logger.ts @@ -2509,6 +2509,54 @@ function constructLogs3Data(items: LogItemWithMeta[]) { return `{"rows": ${constructJsonArray(items.map((i) => i.str))}, "api_version": 2}`; } +function extractLastXactIdFromLogs3Response(response: unknown): string | null { + if (typeof response === "string") { + return null; + } + + if (Array.isArray(response)) { + for (let i = response.length - 1; i >= 0; i--) { + const xactId = extractLastXactIdFromLogs3Response(response[i]); + if (xactId) { + return xactId; + } + } + return null; + } + + if (!response || typeof response !== "object") { + return null; + } + + const record = response as Record; + for (const field of ["_xact_id", "xact_id", "transaction_id"] as const) { + const value = record[field]; + if (typeof value === "string") { + return value; + } + } + + for (const field of ["_xact_ids", "xact_ids"] as const) { + const value = record[field]; + if (Array.isArray(value)) { + for (let i = value.length - 1; i >= 0; i--) { + if (typeof value[i] === "string") { + return value[i]; + } + } + } + } + + for (const field of ["data", "rows"] as const) { + const xactId = extractLastXactIdFromLogs3Response(record[field]); + if (xactId) { + return xactId; + } + } + + return null; +} + export function constructLogs3OverflowRequest(key: string) { return { rows: { @@ -2627,6 +2675,7 @@ const DEFAULT_FLUSH_BACKPRESSURE_BYTES = 10 * 1024 * 1024; // 10 MB interface BackgroundLogger { log(items: LazyValue[]): void; flush(): Promise; + lastFlushedXactId(): string | null; pendingFlushBytes(): number; flushBackpressureBytes(): number; setMaskingFunction( @@ -2652,6 +2701,10 @@ export class TestBackgroundLogger implements BackgroundLogger { return Promise.resolve(); } + lastFlushedXactId(): string | null { + return null; + } + pendingFlushBytes(): number { return 0; } @@ -2745,6 +2798,7 @@ class HTTPBackgroundLogger implements BackgroundLogger { public failedPublishPayloadsDir: string | undefined = undefined; public allPublishPayloadsDir: string | undefined = undefined; private _flushBackpressureBytes: number = DEFAULT_FLUSH_BACKPRESSURE_BYTES; + private _lastFlushedXactId: string | null = null; private _pendingBytes: number = 0; @@ -2848,6 +2902,10 @@ class HTTPBackgroundLogger implements BackgroundLogger { return this._pendingBytes; } + lastFlushedXactId(): string | null { + return this._lastFlushedXactId; + } + flushBackpressureBytes(): number { return this._flushBackpressureBytes; } @@ -3168,6 +3226,7 @@ class HTTPBackgroundLogger implements BackgroundLogger { const startTime = now(); let error: unknown = undefined; try { + let response: unknown; if (overflowRows) { if (!overflowUpload) { const currentUpload = await this.requestLogs3OverflowUpload(conn, { @@ -3181,12 +3240,16 @@ class HTTPBackgroundLogger implements BackgroundLogger { ); overflowUpload = currentUpload; } - await conn.post_json( + response = await conn.post_json( "logs3", constructLogs3OverflowRequest(overflowUpload.key), ); } else { - await conn.post_json("logs3", dataStr); + response = await conn.post_json("logs3", dataStr); + } + const lastXactId = extractLastXactIdFromLogs3Response(response); + if (lastXactId) { + this._lastFlushedXactId = lastXactId; } } catch (e) { error = e; @@ -8051,6 +8114,7 @@ function resetIdGenStateForTests() { export const _exportsForTestingOnly = { extractAttachments, + extractLastXactIdFromLogs3Response, deepCopyEvent, useTestBackgroundLogger, clearTestBackgroundLogger, From 7caf063ffff815a2fb8fd07a4c2cffe75975c6c8 Mon Sep 17 00:00:00 2001 From: Alex Z Date: Wed, 25 Mar 2026 14:14:55 -0700 Subject: [PATCH 02/10] a few fixes --- js/src/framework.test.ts | 1 + js/src/framework.ts | 2 +- js/src/logger.test.ts | 6 ++++++ js/src/logger.ts | 26 +++++++++++++++++++++++++- 4 files changed, 33 insertions(+), 2 deletions(-) diff --git a/js/src/framework.test.ts b/js/src/framework.test.ts index 1cdca1c5..b1157c96 100644 --- a/js/src/framework.test.ts +++ b/js/src/framework.test.ts @@ -66,6 +66,7 @@ test("waitForLogs3XactIngestion polls btql until the xact is queryable", async ( "btql", expect.objectContaining({ query: expect.objectContaining({ + select: [{ op: "literal", value: 1 }], filter: { op: "and", children: [ diff --git a/js/src/framework.ts b/js/src/framework.ts index a2681e88..811960f9 100644 --- a/js/src/framework.ts +++ b/js/src/framework.ts @@ -102,7 +102,7 @@ async function waitForLogs3XactIngestion(args: { "btql", { query: { - select: [{ op: "star" }], + select: [{ op: "literal", value: 1 }], from: { op: "function", name: { diff --git a/js/src/logger.test.ts b/js/src/logger.test.ts index c7720408..f91cb896 100644 --- a/js/src/logger.test.ts +++ b/js/src/logger.test.ts @@ -41,6 +41,12 @@ test("extractLastXactIdFromLogs3Response returns the last xact id", () => { ).toBe("xact-2"); }); +test("maxXactId keeps the numerically largest xact id", () => { + expect(_exportsForTestingOnly.maxXactId("10", "9")).toBe("10"); + expect(_exportsForTestingOnly.maxXactId("10", "11")).toBe("11"); + expect(_exportsForTestingOnly.maxXactId(null, "11")).toBe("11"); +}); + test("renderMessage with file content parts", () => { const message = { role: "user" as const, diff --git a/js/src/logger.ts b/js/src/logger.ts index f7da0bab..1aabc273 100644 --- a/js/src/logger.ts +++ b/js/src/logger.ts @@ -2557,6 +2557,26 @@ function extractLastXactIdFromLogs3Response(response: unknown): string | null { return null; } +function maxXactId( + currentXactId: string | null, + candidateXactId: string | null, +): string | null { + if (!candidateXactId) { + return currentXactId; + } + if (!currentXactId) { + return candidateXactId; + } + + try { + return BigInt(candidateXactId) > BigInt(currentXactId) + ? candidateXactId + : currentXactId; + } catch { + return currentXactId; + } +} + export function constructLogs3OverflowRequest(key: string) { return { rows: { @@ -3249,7 +3269,10 @@ class HTTPBackgroundLogger implements BackgroundLogger { } const lastXactId = extractLastXactIdFromLogs3Response(response); if (lastXactId) { - this._lastFlushedXactId = lastXactId; + this._lastFlushedXactId = maxXactId( + this._lastFlushedXactId, + lastXactId, + ); } } catch (e) { error = e; @@ -8115,6 +8138,7 @@ function resetIdGenStateForTests() { export const _exportsForTestingOnly = { extractAttachments, extractLastXactIdFromLogs3Response, + maxXactId, deepCopyEvent, useTestBackgroundLogger, clearTestBackgroundLogger, From ca982146f05345c710185d1ddd259f7e6baeeb0c Mon Sep 17 00:00:00 2001 From: Alex Z Date: Wed, 25 Mar 2026 14:18:00 -0700 Subject: [PATCH 03/10] standarize --- js/src/logger.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/js/src/logger.ts b/js/src/logger.ts index 1aabc273..a6fe8f8f 100644 --- a/js/src/logger.ts +++ b/js/src/logger.ts @@ -5857,9 +5857,7 @@ export class ObjectFetcher implements AsyncIterable< let maxVersion: string | undefined = undefined; for await (const record of this.fetch(options)) { const xactId = String(record[TRANSACTION_ID_FIELD] ?? "0"); - if (maxVersion === undefined || xactId > maxVersion) { - maxVersion = xactId; - } + maxVersion = maxXactId(maxVersion, xactId) ?? undefined; } return maxVersion; } From f8d54f74319ed9fe7dca565276ff12c6ac129e3f Mon Sep 17 00:00:00 2001 From: Alex Z Date: Wed, 25 Mar 2026 14:34:53 -0700 Subject: [PATCH 04/10] better query id --- js/src/framework.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js/src/framework.ts b/js/src/framework.ts index 811960f9..2b35413d 100644 --- a/js/src/framework.ts +++ b/js/src/framework.ts @@ -130,7 +130,7 @@ async function waitForLogs3XactIngestion(args: { }, use_columnstore: false, brainstore_realtime: true, - query_source: `js_sdk_ensure_spans_flushed_${objectType}`, + query_source: `sdk_ensure_spans_flushed_de15bf`, }, { headers: { "Accept-Encoding": "gzip" } }, ); From 68d28ca9c48329a41a644f6a295a5364e6cfe346 Mon Sep 17 00:00:00 2001 From: Alex Z Date: Wed, 25 Mar 2026 14:56:12 -0700 Subject: [PATCH 05/10] types --- js/src/logger.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js/src/logger.ts b/js/src/logger.ts index a6fe8f8f..be3ab72d 100644 --- a/js/src/logger.ts +++ b/js/src/logger.ts @@ -5857,7 +5857,7 @@ export class ObjectFetcher implements AsyncIterable< let maxVersion: string | undefined = undefined; for await (const record of this.fetch(options)) { const xactId = String(record[TRANSACTION_ID_FIELD] ?? "0"); - maxVersion = maxXactId(maxVersion, xactId) ?? undefined; + maxVersion = maxXactId(maxVersion ?? null, xactId) ?? undefined; } return maxVersion; } From 4c54559c373c8e1d85eecd80aadf439a0004c57c Mon Sep 17 00:00:00 2001 From: Alex Z Date: Thu, 26 Mar 2026 11:17:27 -0700 Subject: [PATCH 06/10] types --- js/src/framework.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/js/src/framework.ts b/js/src/framework.ts index 2b35413d..e10a3f94 100644 --- a/js/src/framework.ts +++ b/js/src/framework.ts @@ -102,7 +102,12 @@ async function waitForLogs3XactIngestion(args: { "btql", { query: { - select: [{ op: "literal", value: 1 }], + select: [ + { + alias: "id", + expr: { op: "ident", name: ["id"] }, + }, + ], from: { op: "function", name: { From b5ccbd5f5142e4962ec25810a6fecde104708a4e Mon Sep 17 00:00:00 2001 From: Alex Z Date: Thu, 26 Mar 2026 11:57:07 -0700 Subject: [PATCH 07/10] types --- js/src/framework.ts | 55 +++++++++------------------------------------ 1 file changed, 11 insertions(+), 44 deletions(-) diff --git a/js/src/framework.ts b/js/src/framework.ts index e10a3f94..6a57f695 100644 --- a/js/src/framework.ts +++ b/js/src/framework.ts @@ -83,9 +83,7 @@ async function waitForLogs3XactIngestion(args: { }): Promise { const { state, - objectType, objectId, - rootSpanId, xactId, initialBackoffMs = ENSURE_SPANS_FLUSH_INITIAL_BACKOFF_MS, maxBackoffMs = ENSURE_SPANS_FLUSH_MAX_BACKOFF_MS, @@ -96,51 +94,20 @@ async function waitForLogs3XactIngestion(args: { const startedAt = Date.now(); let backoffMs = initialBackoffMs; + const targetXactId = BigInt(xactId); while (true) { - const response = await state.apiConn().post( - "btql", - { - query: { - select: [ - { - alias: "id", - expr: { op: "ident", name: ["id"] }, - }, - ], - from: { - op: "function", - name: { - op: "ident", - name: [objectType], - }, - args: [{ op: "literal", value: objectId }], - }, - filter: { - op: "and", - children: [ - { - op: "eq", - left: { op: "ident", name: ["root_span_id"] }, - right: { op: "literal", value: rootSpanId }, - }, - { - op: "eq", - left: { op: "ident", name: ["_xact_id"] }, - right: { op: "literal", value: xactId }, - }, - ], - }, - limit: 1, - }, - use_columnstore: false, - brainstore_realtime: true, - query_source: `sdk_ensure_spans_flushed_de15bf`, - }, - { headers: { "Accept-Encoding": "gzip" } }, + const result = await state.apiConn().get_json( + `brainstore/backfill/status/object/${objectId}`, ); - const result = await response.json(); - if (Array.isArray(result.data) && result.data.length > 0) { + + if ( + result && + typeof result === "object" && + "last_processed_xact_id" in result && + typeof result.last_processed_xact_id === "string" && + BigInt(result.last_processed_xact_id) >= targetXactId + ) { return; } From dd775b730d42c02505be65e9d3a2e5946fd7b22d Mon Sep 17 00:00:00 2001 From: Alex Z Date: Thu, 26 Mar 2026 12:02:48 -0700 Subject: [PATCH 08/10] fix api --- js/src/framework.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/js/src/framework.ts b/js/src/framework.ts index 6a57f695..bfbb8209 100644 --- a/js/src/framework.ts +++ b/js/src/framework.ts @@ -83,6 +83,7 @@ async function waitForLogs3XactIngestion(args: { }): Promise { const { state, + objectType, objectId, xactId, initialBackoffMs = ENSURE_SPANS_FLUSH_INITIAL_BACKOFF_MS, @@ -95,10 +96,11 @@ async function waitForLogs3XactIngestion(args: { const startedAt = Date.now(); let backoffMs = initialBackoffMs; const targetXactId = BigInt(xactId); + const brainstoreObjectId = `${objectType}:${objectId}`; while (true) { const result = await state.apiConn().get_json( - `brainstore/backfill/status/object/${objectId}`, + `brainstore/backfill/status/object/${brainstoreObjectId}`, ); if ( From b9290090d652ddabdbf686ddc9456ebaa093b164 Mon Sep 17 00:00:00 2001 From: Alex Z Date: Thu, 26 Mar 2026 12:38:47 -0700 Subject: [PATCH 09/10] rewind --- js/src/framework.ts | 55 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 43 insertions(+), 12 deletions(-) diff --git a/js/src/framework.ts b/js/src/framework.ts index bfbb8209..e10a3f94 100644 --- a/js/src/framework.ts +++ b/js/src/framework.ts @@ -85,6 +85,7 @@ async function waitForLogs3XactIngestion(args: { state, objectType, objectId, + rootSpanId, xactId, initialBackoffMs = ENSURE_SPANS_FLUSH_INITIAL_BACKOFF_MS, maxBackoffMs = ENSURE_SPANS_FLUSH_MAX_BACKOFF_MS, @@ -95,21 +96,51 @@ async function waitForLogs3XactIngestion(args: { const startedAt = Date.now(); let backoffMs = initialBackoffMs; - const targetXactId = BigInt(xactId); - const brainstoreObjectId = `${objectType}:${objectId}`; while (true) { - const result = await state.apiConn().get_json( - `brainstore/backfill/status/object/${brainstoreObjectId}`, + const response = await state.apiConn().post( + "btql", + { + query: { + select: [ + { + alias: "id", + expr: { op: "ident", name: ["id"] }, + }, + ], + from: { + op: "function", + name: { + op: "ident", + name: [objectType], + }, + args: [{ op: "literal", value: objectId }], + }, + filter: { + op: "and", + children: [ + { + op: "eq", + left: { op: "ident", name: ["root_span_id"] }, + right: { op: "literal", value: rootSpanId }, + }, + { + op: "eq", + left: { op: "ident", name: ["_xact_id"] }, + right: { op: "literal", value: xactId }, + }, + ], + }, + limit: 1, + }, + use_columnstore: false, + brainstore_realtime: true, + query_source: `sdk_ensure_spans_flushed_de15bf`, + }, + { headers: { "Accept-Encoding": "gzip" } }, ); - - if ( - result && - typeof result === "object" && - "last_processed_xact_id" in result && - typeof result.last_processed_xact_id === "string" && - BigInt(result.last_processed_xact_id) >= targetXactId - ) { + const result = await response.json(); + if (Array.isArray(result.data) && result.data.length > 0) { return; } From b2cb1a2fe7b6b62a9fe0b517f74941a1e7721ed9 Mon Sep 17 00:00:00 2001 From: Alex Z Date: Thu, 26 Mar 2026 15:31:57 -0700 Subject: [PATCH 10/10] turn realtime off --- js/src/framework.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/js/src/framework.ts b/js/src/framework.ts index e10a3f94..4a2a3a7b 100644 --- a/js/src/framework.ts +++ b/js/src/framework.ts @@ -133,8 +133,7 @@ async function waitForLogs3XactIngestion(args: { }, limit: 1, }, - use_columnstore: false, - brainstore_realtime: true, + brainstore_realtime: false, query_source: `sdk_ensure_spans_flushed_de15bf`, }, { headers: { "Accept-Encoding": "gzip" } },