Skip to content

Commit f0bf780

Browse files
author
Andrei Bratu
authored
Merge pull request #9 from humanloop/flow-complete-fix
Flow complete fix
2 parents d3dbafa + 8feeed5 commit f0bf780

8 files changed

Lines changed: 254 additions & 297 deletions

File tree

package.json

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,56 +12,56 @@
1212
"test": "jest"
1313
},
1414
"dependencies": {
15-
"url-join": "4.0.1",
16-
"form-data": "^4.0.0",
17-
"formdata-node": "^6.0.3",
18-
"node-fetch": "2.7.0",
19-
"qs": "6.11.2",
20-
"readable-stream": "^4.5.2",
21-
"form-data-encoder": "^4.0.2",
2215
"@opentelemetry/api": "1.9.0",
2316
"@opentelemetry/auto-instrumentations-node": "0.53.0",
2417
"@opentelemetry/sdk-metrics": "1.28.0",
2518
"@opentelemetry/sdk-node": "0.55.0",
2619
"@opentelemetry/sdk-trace-node": "1.28.0",
20+
"@traceloop/ai-semantic-conventions": "0.11.6",
2721
"@traceloop/instrumentation-anthropic": "0.11.1",
2822
"@traceloop/instrumentation-cohere": "0.11.1",
2923
"@traceloop/instrumentation-openai": "0.11.3",
30-
"@traceloop/ai-semantic-conventions": "0.11.6",
31-
"uuid": "11.0.3",
32-
"nanoid": "5.0.9",
3324
"cli-progress": "^3.12.0",
25+
"form-data": "^4.0.0",
26+
"form-data-encoder": "^4.0.2",
27+
"formdata-node": "^6.0.3",
3428
"lodash": "4.17.21",
29+
"nanoid": "5.0.9",
30+
"node-fetch": "2.7.0",
3531
"p-map": "7.0.3",
36-
"stable-hash": "0.0.4"
32+
"qs": "6.11.2",
33+
"readable-stream": "^4.5.2",
34+
"stable-hash": "0.0.4",
35+
"url-join": "4.0.1",
36+
"uuid": "11.0.3"
3737
},
3838
"devDependencies": {
39-
"@types/url-join": "4.0.1",
40-
"@types/qs": "6.9.8",
39+
"@anthropic-ai/sdk": "^0.32.1",
40+
"@babel/core": "^7.26.0",
41+
"@babel/plugin-transform-modules-commonjs": "^7.26.3",
42+
"@babel/preset-env": "^7.26.0",
43+
"@trivago/prettier-plugin-sort-imports": "4.3.0",
44+
"@types/cli-progress": "^3.11.6",
45+
"@types/jest": "29.5.5",
46+
"@types/lodash": "4.14.74",
47+
"@types/node": "17.0.33",
4148
"@types/node-fetch": "2.6.9",
49+
"@types/qs": "6.9.8",
4250
"@types/readable-stream": "^4.0.15",
51+
"@types/url-join": "4.0.1",
52+
"babel-jest": "^29.7.0",
53+
"cohere-ai": "^7.15.0",
54+
"dotenv": "^16.4.6",
4355
"fetch-mock-jest": "^1.5.1",
44-
"webpack": "^5.94.0",
45-
"ts-loader": "^9.3.1",
4656
"jest": "29.7.0",
47-
"@types/jest": "29.5.5",
48-
"ts-jest": "29.1.1",
4957
"jest-environment-jsdom": "29.7.0",
50-
"@types/node": "17.0.33",
58+
"jsonschema": "^1.4.1",
59+
"openai": "^4.74.0",
5160
"prettier": "^3.4.2",
61+
"ts-jest": "29.1.1",
62+
"ts-loader": "^9.3.1",
5263
"typescript": "4.6.4",
53-
"openai": "^4.74.0",
54-
"@anthropic-ai/sdk": "^0.32.1",
55-
"cohere-ai": "^7.15.0",
56-
"dotenv": "^16.4.6",
57-
"jsonschema": "^1.4.1",
58-
"@types/cli-progress": "^3.11.6",
59-
"babel-jest": "^29.7.0",
60-
"@babel/core": "^7.26.0",
61-
"@babel/plugin-transform-modules-commonjs": "^7.26.3",
62-
"@babel/preset-env": "^7.26.0",
63-
"@types/lodash": "4.14.74",
64-
"@trivago/prettier-plugin-sort-imports": "4.3.0"
64+
"webpack": "^5.94.0"
6565
},
6666
"browser": {
6767
"fs": false,

src/otel/constants.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,9 @@ export const HUMANLOOP_LOG_KEY = "humanloop.log";
55
export const HUMANLOOP_FILE_TYPE_KEY = "humanloop.file_type";
66
export const HUMANLOOP_PATH_KEY = "humanloop.file.path";
77
export const HUMANLOOP_META_FUNCTION_NAME = "humanloop.meta.function_name";
8-
export const HUMANLOOP_PARENT_SPAN_CTX_KEY = "humanloop.context.parentSpanId";
9-
export const HUMANLOOP_TRACE_FLOW_CTX_KEY = "humanloop.context.traceFlow";
8+
export const HUMANLOOP_FLOW_PREREQUISITES_KEY = "humanloop.flow.prerequisites";
9+
10+
export const HUMANLOOP_SPAN_PREFIX = "humanloop.";
11+
export const HUMANLOOP_FLOW_SPAN_NAME = `${HUMANLOOP_SPAN_PREFIX}flow`;
12+
export const HUMANLOOP_PROMPT_SPAN_NAME = `${HUMANLOOP_SPAN_PREFIX}prompt`;
13+
export const HUMANLOOP_TOOL_SPAN_NAME = `${HUMANLOOP_SPAN_PREFIX}tool`;

src/otel/exporter.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { HumanloopClient } from "../humanloop.client";
66
import {
77
HUMANLOOP_FILE_KEY,
88
HUMANLOOP_FILE_TYPE_KEY,
9+
HUMANLOOP_FLOW_PREREQUISITES_KEY,
910
HUMANLOOP_LOG_KEY,
1011
HUMANLOOP_PATH_KEY,
1112
} from "./constants";
@@ -30,13 +31,17 @@ export class HumanloopSpanExporter implements SpanExporter {
3031
private shutdownFlag: boolean;
3132
private readonly uploadPromises: Promise<void>[];
3233
private readonly exportedSpans: ReadableSpan[];
34+
// List of spans that must be uploaded before completing the Flow log
35+
// This maps [flow log span ID] -> [set of child span IDs]
36+
private readonly prerequisites: Map<string, Set<string>>;
3337

3438
constructor(client: HumanloopClient) {
3539
this.client = client;
3640
this.spanIdToUploadedLogId = new Map();
3741
this.shutdownFlag = false;
3842
this.uploadPromises = [];
3943
this.exportedSpans = [];
44+
this.prerequisites = new Map();
4045
}
4146

4247
export(spans: ReadableSpan[]): ExportResult {
@@ -69,6 +74,32 @@ export class HumanloopSpanExporter implements SpanExporter {
6974
await this.shutdown();
7075
}
7176

77+
/**
78+
* Mark a span as uploaded to the Humanloop.
79+
*
80+
* A Log might be contained inside a Flow trace, which must be marked as complete
81+
* when all its children are uploaded. Each Flow Log span contains a
82+
* 'humanloop.flow.prerequisites' attribute, which is a list of all spans that must
83+
* be uploaded before the Flow Log is marked as complete.
84+
*
85+
* This method finds the trace the Span belongs to and removes the Span from the list.
86+
* Once all prerequisites are uploaded, the method marks the Flow Log as complete.
87+
*
88+
* @param spanId - The ID of the span that has been uploaded.
89+
*/
90+
private markSpanCompleted(spanId: string) {
91+
for (const [flowLogSpanId, flowChildrenSpanIds] of this.prerequisites) {
92+
if (flowChildrenSpanIds.has(spanId)) {
93+
flowChildrenSpanIds.delete(spanId);
94+
if (flowChildrenSpanIds.size === 0) {
95+
const flowLogId = this.spanIdToUploadedLogId.get(flowLogSpanId)!;
96+
this.client.flows.updateLog(flowLogId, { traceStatus: "complete" });
97+
}
98+
break;
99+
}
100+
}
101+
}
102+
72103
private async exportSpanDispatch(span: ReadableSpan): Promise<void> {
73104
const fileType = span.attributes[HUMANLOOP_FILE_TYPE_KEY];
74105
const parentSpanId = span.parentSpanId;
@@ -130,6 +161,7 @@ export class HumanloopSpanExporter implements SpanExporter {
130161
} catch (error) {
131162
console.error(`Error exporting prompt: ${error}`);
132163
}
164+
this.markSpanCompleted(span.spanContext().spanId);
133165
}
134166

135167
private async exportTool(span: ReadableSpan): Promise<void> {
@@ -158,6 +190,7 @@ export class HumanloopSpanExporter implements SpanExporter {
158190
} catch (error) {
159191
console.error(`Error exporting tool: ${error}`);
160192
}
193+
this.markSpanCompleted(span.spanContext().spanId);
161194
}
162195

163196
private async exportFlow(span: ReadableSpan): Promise<void> {
@@ -168,6 +201,18 @@ export class HumanloopSpanExporter implements SpanExporter {
168201
logObject.startTime = hrTimeToDate(span.startTime);
169202
logObject.endTime = hrTimeToDate(span.endTime);
170203
logObject.createdAt = hrTimeToDate(span.endTime);
204+
// Spans that must be uploaded before the Flow Span is completed
205+
let prerequisites: string[] | undefined = undefined;
206+
try {
207+
prerequisites = readFromOpenTelemetrySpan(
208+
span,
209+
HUMANLOOP_FLOW_PREREQUISITES_KEY,
210+
) as unknown as string[];
211+
} catch (error) {
212+
prerequisites = [];
213+
}
214+
215+
this.prerequisites.set(span.spanContext().spanId, new Set(prerequisites));
171216

172217
const spanParentId = span.parentSpanId;
173218
const traceParentId = spanParentId
@@ -188,5 +233,6 @@ export class HumanloopSpanExporter implements SpanExporter {
188233
} catch (error) {
189234
console.error("Error exporting flow: ", error, span.spanContext().spanId);
190235
}
236+
this.markSpanCompleted(span.spanContext().spanId);
191237
}
192238
}

src/otel/helpers.ts

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import { AttributeValue, SpanKind } from "@opentelemetry/api";
22
import { ReadableSpan } from "@opentelemetry/sdk-trace-base";
3-
import { v4 as uuidv4 } from "uuid";
43

54
// Constants for Humanloop attributes
6-
import { HUMANLOOP_FILE_TYPE_KEY } from "./constants";
5+
import { HUMANLOOP_SPAN_PREFIX } from "./constants";
76

87
export type NestedDict = { [key: string]: NestedDict | AttributeValue };
98
export type NestedList = Array<NestedDict | AttributeValue>;
@@ -34,7 +33,7 @@ function _listToOtelFormat(lst: NestedList): NestedDict {
3433
*/
3534
export function writeToOpenTelemetrySpan(
3635
span: ReadableSpan,
37-
value: NestedDict | NestedList | AttributeValue,
36+
value: NestedDict | NestedList | AttributeValue | any[],
3837
key: string,
3938
): void {
4039
let toWriteCopy: NestedDict;
@@ -191,16 +190,7 @@ export function isLLMProviderCall(span: ReadableSpan): boolean {
191190
* @returns True if the span was created by the Humanloop SDK, false otherwise
192191
*/
193192
export function isHumanloopSpan(span: ReadableSpan): boolean {
194-
return span.attributes[HUMANLOOP_FILE_TYPE_KEY] !== undefined;
195-
}
196-
197-
/**
198-
* Generates a unique span ID.
199-
*
200-
* @returns A UUID string
201-
*/
202-
export function generateSpanId(): string {
203-
return uuidv4();
193+
return span.name.startsWith(HUMANLOOP_SPAN_PREFIX);
204194
}
205195

206196
/**

src/otel/processor.ts

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { PromptKernelRequest } from "../api/types/PromptKernelRequest";
1313
import {
1414
HUMANLOOP_FILE_KEY,
1515
HUMANLOOP_FILE_TYPE_KEY,
16+
HUMANLOOP_FLOW_SPAN_NAME,
1617
HUMANLOOP_LOG_KEY,
1718
HUMANLOOP_META_FUNCTION_NAME,
1819
} from "./constants";
@@ -36,15 +37,37 @@ interface CompletableSpan {
3637
export class HumanloopSpanProcessor implements SpanProcessor {
3738
private spanExporter: SpanExporter;
3839
private children: Map<string, CompletableSpan[]>;
40+
// List of all span IDs that are contained in a Flow trace
41+
// They are passed to the Exporter as a span attribute
42+
// so the Exporter knows when to complete a trace
43+
private prerequisites: Map<string, string[]>;
3944

4045
constructor(exporter: SpanExporter) {
4146
this.spanExporter = exporter;
4247
this.children = new Map();
48+
this.prerequisites = new Map();
4349
}
4450

4551
async forceFlush(): Promise<void> {}
4652

4753
onStart(span: Span, _: Context): void {
54+
const spanId = span.spanContext().spanId;
55+
const parentSpanId = span.parentSpanId;
56+
if (span.name === HUMANLOOP_FLOW_SPAN_NAME) {
57+
this.prerequisites.set(spanId, []);
58+
}
59+
if (parentSpanId !== undefined && isHumanloopSpan(span)) {
60+
for (const [traceHead, allTraceNodes] of this.prerequisites) {
61+
if (
62+
parentSpanId === traceHead ||
63+
allTraceNodes.includes(parentSpanId)
64+
) {
65+
allTraceNodes.push(spanId);
66+
this.prerequisites.set(traceHead, allTraceNodes);
67+
break;
68+
}
69+
}
70+
}
4871
// Handle stream case: when Prompt instrumented function calls a provider with streaming: true
4972
// The instrumentor span will end only when the ChunksResponse is consumed, which can happen
5073
// after the span created by the Prompt utility finishes. To handle this, we register all instrumentor
@@ -66,6 +89,7 @@ export class HumanloopSpanProcessor implements SpanProcessor {
6689
*/
6790
onEnd(span: ReadableSpan): void {
6891
if (isHumanloopSpan(span)) {
92+
// Wait for children to complete asynchronously
6993
new Promise<void>((resolve) => {
7094
const checkChildrenSpans = () => {
7195
const childrenSpans = this.children.get(span.spanContext().spanId);
@@ -79,15 +103,28 @@ export class HumanloopSpanProcessor implements SpanProcessor {
79103
};
80104
checkChildrenSpans();
81105
}).then((_) => {
82-
// All children/ instrumentor spans have arrived, we can process the
106+
// All instrumentor spans have arrived, we can process the
83107
// Humanloop parent span owning them
108+
if (span.name === HUMANLOOP_FLOW_SPAN_NAME) {
109+
// If the span if a Flow Log, add attribute with all span IDs it
110+
// needs to wait before completion
111+
writeToOpenTelemetrySpan(
112+
span,
113+
this.prerequisites.get(span.spanContext().spanId) || [],
114+
HUMANLOOP_LOG_KEY,
115+
);
116+
this.prerequisites.delete(span.spanContext().spanId);
117+
}
118+
84119
this.processSpanDispatch(
85120
span,
86121
this.children.get(span.spanContext().spanId) || [],
87122
);
123+
88124
// Release references
89125
this.children.delete(span.spanContext().spanId);
90-
// Export the Humanloop span
126+
127+
// Pass Humanloop span to Exporter
91128
this.spanExporter.export([span], (result: ExportResult) => {
92129
if (result.code !== ExportResultCode.SUCCESS) {
93130
console.error("Failed to export span:", result.error);
@@ -182,7 +219,7 @@ export class HumanloopSpanProcessor implements SpanProcessor {
182219
// Placeholder for processing other file types
183220
break;
184221
default:
185-
console.error("Unknown Humanloop File Span", span);
222+
console.error("Unknown Humanloop File span", span);
186223
}
187224
}
188225

0 commit comments

Comments
 (0)