-
Notifications
You must be signed in to change notification settings - Fork 3
Wait for spans to become available before we run getThread/getSpans #1670
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
0b53895
7caf063
ca98214
f8d54f7
68d28ca
4c54559
b5ccbd5
dd775b7
b929009
b2cb1a2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -67,6 +67,93 @@ export type BaseExperiment< | |
| name?: string; | ||
| }; | ||
|
|
||
| const ENSURE_SPANS_FLUSH_INITIAL_BACKOFF_MS = 100; | ||
| const ENSURE_SPANS_FLUSH_MAX_BACKOFF_MS = 2_000; | ||
| const ENSURE_SPANS_FLUSH_TIMEOUT_MS = 30_000; | ||
|
|
||
| async function waitForLogs3XactIngestion(args: { | ||
| state: BraintrustState; | ||
| objectType: "experiment" | "project_logs" | "playground_logs"; | ||
| objectId: string; | ||
| rootSpanId: string; | ||
| xactId: string; | ||
| initialBackoffMs?: number; | ||
| maxBackoffMs?: number; | ||
| timeoutMs?: number; | ||
| }): Promise<void> { | ||
| const { | ||
| state, | ||
| objectType, | ||
| objectId, | ||
| rootSpanId, | ||
| xactId, | ||
| initialBackoffMs = ENSURE_SPANS_FLUSH_INITIAL_BACKOFF_MS, | ||
| maxBackoffMs = ENSURE_SPANS_FLUSH_MAX_BACKOFF_MS, | ||
| timeoutMs = ENSURE_SPANS_FLUSH_TIMEOUT_MS, | ||
| } = args; | ||
|
|
||
| await state.login({}); | ||
|
|
||
| const startedAt = Date.now(); | ||
| let backoffMs = initialBackoffMs; | ||
|
|
||
| while (true) { | ||
| const response = await state.apiConn().post( | ||
| "btql", | ||
| { | ||
| query: { | ||
| select: [ | ||
| { | ||
| alias: "id", | ||
| expr: { op: "ident", name: ["id"] }, | ||
| }, | ||
| ], | ||
| from: { | ||
| op: "function", | ||
| name: { | ||
| op: "ident", | ||
| name: [objectType], | ||
| }, | ||
| args: [{ op: "literal", value: objectId }], | ||
| }, | ||
| filter: { | ||
| op: "and", | ||
| children: [ | ||
| { | ||
| op: "eq", | ||
| left: { op: "ident", name: ["root_span_id"] }, | ||
| right: { op: "literal", value: rootSpanId }, | ||
| }, | ||
| { | ||
| op: "eq", | ||
| left: { op: "ident", name: ["_xact_id"] }, | ||
| right: { op: "literal", value: xactId }, | ||
| }, | ||
| ], | ||
| }, | ||
| limit: 1, | ||
| }, | ||
| brainstore_realtime: false, | ||
| query_source: `sdk_ensure_spans_flushed_de15bf`, | ||
| }, | ||
| { headers: { "Accept-Encoding": "gzip" } }, | ||
| ); | ||
| const result = await response.json(); | ||
| if (Array.isArray(result.data) && result.data.length > 0) { | ||
| return; | ||
| } | ||
|
|
||
| if (Date.now() - startedAt >= timeoutMs) { | ||
| throw new Error( | ||
| `Timed out waiting for logs3 xact ${xactId} to become queryable`, | ||
| ); | ||
| } | ||
|
|
||
| await new Promise((resolve) => setTimeout(resolve, backoffMs)); | ||
| backoffMs = Math.min(backoffMs * 2, maxBackoffMs); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Use this to specify that the dataset should actually be the data from a previous (base) experiment. | ||
| * If you do not specify a name, Braintrust will automatically figure out the best base experiment to | ||
|
|
@@ -1041,7 +1128,20 @@ async function runEvaluatorInternal( | |
| }; | ||
|
|
||
| const callback = async (rootSpan: Span) => { | ||
| const state = evaluator.state ?? _internalGetGlobalState(); | ||
| const state = | ||
| experiment?.loggingState ?? | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this now modifies the state variable so we should make sure that it works properly in the downstream callsites for |
||
| evaluator.state ?? | ||
| _internalGetGlobalState(); | ||
| const parentStr = state.currentParent.getStore(); | ||
| const parentComponents = parentStr | ||
| ? SpanComponentsV3.fromStr(parentStr) | ||
| : null; | ||
| const traceObjectType = parentComponents | ||
| ? spanObjectTypeV3ToTypedString(parentComponents.data.object_type) | ||
| : "experiment"; | ||
| const traceObjectId = | ||
| parentComponents?.data.object_id ?? | ||
| (experimentIdPromise ? ((await experimentIdPromise) ?? "") : ""); | ||
| const ensureSpansFlushed = async () => { | ||
| // Flush native Braintrust spans | ||
| if (experiment) { | ||
|
|
@@ -1056,25 +1156,23 @@ async function runEvaluatorInternal( | |
| if (state) { | ||
| await state.flushOtel(); | ||
| } | ||
| }; | ||
|
|
||
| const parentStr = state.currentParent.getStore(); | ||
| const parentComponents = parentStr | ||
| ? SpanComponentsV3.fromStr(parentStr) | ||
| : null; | ||
| const xactId = state?.bgLogger().lastFlushedXactId(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. want to double check this works in otel land
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It does not :( |
||
| if (state && xactId && traceObjectId) { | ||
| await waitForLogs3XactIngestion({ | ||
CLowbrow marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| state, | ||
| objectType: traceObjectType, | ||
| objectId: traceObjectId, | ||
| rootSpanId: rootSpan.rootSpanId, | ||
| xactId, | ||
| }); | ||
| } | ||
| }; | ||
|
|
||
| const trace = state | ||
| ? new LocalTrace({ | ||
| objectType: parentComponents | ||
| ? spanObjectTypeV3ToTypedString( | ||
| parentComponents.data.object_type, | ||
| ) | ||
| : "experiment", | ||
| objectId: | ||
| parentComponents?.data.object_id ?? | ||
| (experimentIdPromise | ||
| ? ((await experimentIdPromise) ?? "") | ||
| : ""), | ||
| objectType: traceObjectType, | ||
| objectId: traceObjectId, | ||
| rootSpanId: rootSpan.rootSpanId, | ||
| ensureSpansFlushed, | ||
| state, | ||
|
|
@@ -1717,3 +1815,7 @@ const defaultReporter: ReporterDef<boolean> = { | |
| return evalReports.every((r) => r); | ||
| }, | ||
| }; | ||
|
|
||
| export const _exportsForTestingOnly = { | ||
| waitForLogs3XactIngestion, | ||
| }; | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably want to verify this work for playgrounds also
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Works in playgrounds locally, we probably want to push this to staging to check though