-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexporter.ts
More file actions
238 lines (212 loc) · 8.82 KB
/
exporter.ts
File metadata and controls
238 lines (212 loc) · 8.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import { ExportResult, ExportResultCode } from "@opentelemetry/core";
import { ReadableSpan, SpanExporter } from "@opentelemetry/sdk-trace-base";
import { FlowKernelRequest, PromptKernelRequest, ToolKernelRequest } from "../api";
import { HumanloopClient } from "../humanloop.client";
import {
HUMANLOOP_FILE_KEY,
HUMANLOOP_FILE_TYPE_KEY,
HUMANLOOP_FLOW_PREREQUISITES_KEY,
HUMANLOOP_LOG_KEY,
HUMANLOOP_PATH_KEY,
} from "./constants";
import { isHumanloopSpan, readFromOpenTelemetrySpan } from "./helpers";
/**
 * Converts a high-resolution time tuple to a JavaScript Date object.
 *
 * The result has millisecond resolution; any sub-millisecond part of the
 * nanosecond component is truncated.
 *
 * @param hrTime - A tuple containing the high-resolution time, where the first element is the number of seconds
 * and the second element is the number of nanoseconds.
 * @returns A Date object representing the high-resolution time.
 */
function hrTimeToDate(hrTime: [number, number]): Date {
  const [seconds, nanoseconds] = hrTime;
  // Combine the two parts as whole milliseconds. Adding fractional seconds to an
  // epoch-sized number first loses low-order bits and can push a timestamp that
  // sits just below a millisecond boundary up into the next millisecond.
  const wholeMilliseconds = seconds * 1000 + Math.floor(nanoseconds / 1e6);
  return new Date(wholeMilliseconds);
}
/**
 * OpenTelemetry span exporter that uploads Humanloop spans as Logs through the
 * Humanloop client.
 *
 * Spans accepted by `isHumanloopSpan` are uploaded asynchronously; every span
 * handed to `export` is additionally retained and exposed via
 * `getExportedSpans`. A span that has a parent is held back until the parent
 * has either been uploaded or has failed, so that the parent's Log ID can be
 * sent as `traceParentId`.
 */
export class HumanloopSpanExporter implements SpanExporter {
  /** Delay, in milliseconds, between checks while waiting for a parent span. */
  private static readonly PARENT_POLL_INTERVAL_MS = 100;

  private readonly client: HumanloopClient;
  // Span ID -> ID of the uploaded Log, or null when the span finished
  // processing without producing a Log (upload failed or span was invalid).
  private readonly spanIdToUploadedLogId: Map<string, string | null>;
  private shutdownFlag: boolean;
  private readonly uploadPromises: Promise<void>[];
  private readonly exportedSpans: ReadableSpan[];
  // List of spans that must be uploaded before completing the Flow log
  // This maps [flow log span ID] -> [set of child span IDs]
  private readonly prerequisites: Map<string, Set<string>>;
  // IDs of every Humanloop span accepted for upload. Used to tell a parent that
  // is merely still in progress from one that will never be uploaded.
  private readonly acceptedSpanIds: Set<string>;

  /**
   * @param client - Humanloop client used to create Prompt, Tool and Flow Logs.
   */
  constructor(client: HumanloopClient) {
    this.client = client;
    this.spanIdToUploadedLogId = new Map();
    this.shutdownFlag = false;
    this.uploadPromises = [];
    this.exportedSpans = [];
    this.prerequisites = new Map();
    this.acceptedSpanIds = new Set();
  }

  /**
   * Queues the Humanloop spans in `spans` for upload and records all spans.
   *
   * The result only reports whether the spans were accepted; uploads continue
   * in the background and are awaited by `forceFlush` / `shutdown`.
   *
   * @param spans - Finished spans handed over by the span processor.
   * @param resultCallback - Optional callback from the OpenTelemetry
   * `SpanExporter` contract; invoked with the same value that is returned.
   * @returns `FAILED` once the exporter has been shut down, otherwise `SUCCESS`.
   */
  export(
    spans: ReadableSpan[],
    resultCallback?: (result: ExportResult) => void,
  ): ExportResult {
    let result: ExportResult;
    if (this.shutdownFlag) {
      result = {
        code: ExportResultCode.FAILED,
        error: new Error("Exporter is shutting down"),
      };
    } else {
      const humanloopSpans = spans.filter((span) => isHumanloopSpan(span));
      // Register the whole batch before dispatching so that a child never
      // mistakes a parent from the same batch for one that will not arrive.
      for (const span of humanloopSpans) {
        this.acceptedSpanIds.add(span.spanContext().spanId);
      }
      for (const span of humanloopSpans) {
        this.uploadPromises.push(this.exportSpanDispatch(span));
      }
      this.exportedSpans.push(...spans);
      result = {
        code: ExportResultCode.SUCCESS,
      };
    }
    if (resultCallback) {
      resultCallback(result);
    }
    return result;
  }

  /**
   * Stops accepting new spans and waits for all queued uploads to settle.
   */
  async shutdown(): Promise<void> {
    this.shutdownFlag = true;
    await Promise.all(this.uploadPromises);
  }

  /**
   * Waits for all queued uploads to settle. The exporter stays usable
   * afterwards; only `shutdown` stops it from accepting spans.
   */
  async forceFlush(): Promise<void> {
    await Promise.all(this.uploadPromises);
  }

  /**
   * Returns every span passed to `export` while the exporter was running,
   * including spans that are not Humanloop spans.
   *
   * @returns The exporter's internal list (not a copy).
   */
  public getExportedSpans(): ReadableSpan[] {
    return this.exportedSpans;
  }

  /**
   * Mark a span as uploaded to the Humanloop.
   *
   * A Log might be contained inside a Flow trace, which must be marked as complete
   * when all its children are uploaded. Each Flow Log span contains a
   * 'humanloop.flow.prerequisites' attribute, which is a list of all spans that must
   * be uploaded before the Flow Log is marked as complete.
   *
   * This method finds the trace the Span belongs to and removes the Span from the list.
   * Once all prerequisites are uploaded, the method marks the Flow Log as complete.
   * The update is awaited so that `shutdown` / `forceFlush` do not resolve before
   * it has been sent; a failed update is logged rather than thrown.
   *
   * @param spanId - The ID of the span that has been uploaded.
   */
  private async markSpanCompleted(spanId: string): Promise<void> {
    for (const [flowLogSpanId, flowChildrenSpanIds] of this.prerequisites) {
      if (!flowChildrenSpanIds.has(spanId)) {
        continue;
      }
      flowChildrenSpanIds.delete(spanId);
      if (flowChildrenSpanIds.size === 0) {
        // Nothing left to wait for; the bookkeeping entry is no longer needed.
        this.prerequisites.delete(flowLogSpanId);
        const flowLogId = this.spanIdToUploadedLogId.get(flowLogSpanId);
        // The Flow Log ID is missing when the Flow upload itself failed; there
        // is then no Log to mark as complete.
        if (typeof flowLogId === "string") {
          try {
            await this.client.flows.updateLog(flowLogId, { logStatus: "complete" });
          } catch (error) {
            console.error(`Failed to mark flow log as complete: ${error}`);
          }
        }
      }
      // A span is listed as a prerequisite of at most one Flow.
      break;
    }
  }

  /**
   * Waits until the parent span has finished processing, so its Log ID (if any)
   * is available to the child.
   *
   * Returns immediately for spans without a parent. The wait is abandoned once
   * shutdown has been requested and the parent was never accepted for upload:
   * no further spans are accepted after shutdown, so such a parent can never
   * be uploaded.
   *
   * @param parentSpanId - ID of the parent span, if the span has one.
   */
  private async waitForParent(parentSpanId: string | undefined): Promise<void> {
    if (!parentSpanId) {
      return;
    }
    while (!this.spanIdToUploadedLogId.has(parentSpanId)) {
      if (this.shutdownFlag && !this.acceptedSpanIds.has(parentSpanId)) {
        return;
      }
      await new Promise<void>((resolve) =>
        setTimeout(resolve, HumanloopSpanExporter.PARENT_POLL_INTERVAL_MS),
      );
    }
  }

  /**
   * Uploads one Humanloop span, choosing the upload routine from the span's
   * file-type attribute. Never rejects: failures are logged.
   *
   * @param span - A span for which `isHumanloopSpan` returned true.
   */
  private async exportSpanDispatch(span: ReadableSpan): Promise<void> {
    const spanId = span.spanContext().spanId;
    try {
      await this.waitForParent(span.parentSpanId);
      const fileType = span.attributes[HUMANLOOP_FILE_TYPE_KEY];
      switch (fileType) {
        case "prompt":
          await this.exportPrompt(span);
          break;
        case "tool":
          await this.exportTool(span);
          break;
        case "flow":
          await this.exportFlow(span);
          break;
        default:
          throw new Error(`Unknown span type: ${fileType}`);
      }
    } catch (error) {
      console.error(`Failed to export span: ${error}`);
    } finally {
      // Record the outcome even on failure so children waiting on this span
      // are released instead of polling forever.
      if (!this.spanIdToUploadedLogId.has(spanId)) {
        this.spanIdToUploadedLogId.set(spanId, null);
      }
    }
  }

  /**
   * Reads the Log payload stored on the span and stamps it with the span's
   * timing: `startTime`, `endTime`, and `createdAt` (set to the end time).
   *
   * @param span - Span carrying the serialized Log attributes.
   * @returns The Log fields to spread into the upload request.
   */
  private buildLogObject(span: ReadableSpan): { [key: string]: unknown } {
    const logObject = readFromOpenTelemetrySpan(span, HUMANLOOP_LOG_KEY) as {
      [key: string]: unknown;
    };
    logObject.startTime = hrTimeToDate(span.startTime);
    logObject.endTime = hrTimeToDate(span.endTime);
    logObject.createdAt = hrTimeToDate(span.endTime);
    return logObject;
  }

  /**
   * Looks up the Log ID uploaded for the span's parent.
   *
   * @param span - Span whose parent should be resolved.
   * @returns The parent's Log ID, or `undefined` when the span has no parent or
   * the parent produced no Log.
   */
  private resolveTraceParentId(span: ReadableSpan): string | undefined {
    const parentSpanId = span.parentSpanId;
    if (!parentSpanId) {
      return undefined;
    }
    const parentLogId = this.spanIdToUploadedLogId.get(parentSpanId);
    return typeof parentLogId === "string" ? parentLogId : undefined;
  }

  /**
   * Uploads a Prompt span as a Prompt Log and records the resulting Log ID.
   * Upload errors are logged; the span is marked completed either way.
   *
   * @param span - Span whose file type is "prompt".
   */
  private async exportPrompt(span: ReadableSpan): Promise<void> {
    const spanId = span.spanContext().spanId;
    const fileObject = readFromOpenTelemetrySpan(span, HUMANLOOP_FILE_KEY);
    const logObject = this.buildLogObject(span);
    const path = span.attributes[HUMANLOOP_PATH_KEY] as string;
    const traceParentId = this.resolveTraceParentId(span);
    // Falls back to an empty Prompt definition when the span carries none.
    const prompt: PromptKernelRequest = (fileObject.prompt ||
      {}) as unknown as PromptKernelRequest;
    try {
      const response = await this.client.prompts.log({
        path: path,
        prompt,
        traceParentId,
        ...logObject,
      });
      this.spanIdToUploadedLogId.set(spanId, response.id);
    } catch (error) {
      console.error(`Error exporting prompt: ${error}`);
    }
    await this.markSpanCompleted(spanId);
  }

  /**
   * Uploads a Tool span as a Tool Log and records the resulting Log ID.
   * Upload errors are logged; the span is marked completed either way.
   *
   * @param span - Span whose file type is "tool".
   */
  private async exportTool(span: ReadableSpan): Promise<void> {
    const spanId = span.spanContext().spanId;
    const fileObject = readFromOpenTelemetrySpan(span, HUMANLOOP_FILE_KEY);
    const logObject = this.buildLogObject(span);
    const path = span.attributes[HUMANLOOP_PATH_KEY] as string;
    const traceParentId = this.resolveTraceParentId(span);
    try {
      const response = await this.client.tools.log({
        path: path,
        tool: fileObject.tool as ToolKernelRequest,
        traceParentId,
        ...logObject,
      });
      this.spanIdToUploadedLogId.set(spanId, response.id);
    } catch (error) {
      console.error(`Error exporting tool: ${error}`);
    }
    await this.markSpanCompleted(spanId);
  }

  /**
   * Uploads a Flow span as a Flow Log, records the resulting Log ID, and
   * registers the spans that must be uploaded before the Flow Log can be
   * marked complete.
   *
   * @param span - Span whose file type is "flow".
   */
  private async exportFlow(span: ReadableSpan): Promise<void> {
    const spanId = span.spanContext().spanId;
    const fileObject = readFromOpenTelemetrySpan(span, HUMANLOOP_FILE_KEY);
    const logObject = this.buildLogObject(span);
    // Spans that must be uploaded before the Flow Span is completed
    let prerequisites: string[];
    try {
      prerequisites = readFromOpenTelemetrySpan(
        span,
        HUMANLOOP_FLOW_PREREQUISITES_KEY,
      ) as unknown as string[];
    } catch (error) {
      // Treat a span whose prerequisites cannot be read as having none.
      prerequisites = [];
    }
    this.prerequisites.set(spanId, new Set(prerequisites));
    const traceParentId = this.resolveTraceParentId(span);
    const path = span.attributes[HUMANLOOP_PATH_KEY] as string;
    try {
      const response = await this.client.flows.log({
        path: path as string,
        // Falls back to a Flow definition with empty attributes when the span
        // carries none.
        flow: (fileObject.flow as unknown as FlowKernelRequest) || {
          attributes: {},
        },
        traceParentId,
        ...logObject,
      });
      this.spanIdToUploadedLogId.set(spanId, response.id);
    } catch (error) {
      console.error("Error exporting flow: ", error, spanId);
    }
    await this.markSpanCompleted(spanId);
  }
}