Skip to content

Commit 2cb64f7

Browse files
author
Andrei Bratu
committed
added explanations on OTEL processor and exporter
1 parent 6871044 commit 2cb64f7

File tree

3 files changed

+141
-47
lines changed

3 files changed

+141
-47
lines changed

src/humanloop/otel/exporter.py

Lines changed: 63 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,25 @@
3636
class HumanloopSpanExporter(SpanExporter):
3737
"""Upload Spans created by SDK decorators to Humanloop.
3838
39-
Spans not created by Humanloop SDK decorators will be ignored.
39+
Spans not created by Humanloop SDK decorators will be dropped.
40+
41+
Each Humanloop Span contains information about the File to log against and
42+
the Log to create. We are using the .log actions that pass the kernel in the
43+
request. This allows us to create new Versions if the decorated function
44+
is changed.
45+
46+
The exporter uploads Spans top-to-bottom, where a Span is uploaded only after
47+
its parent Span has been uploaded. This is necessary for Flow Traces, where
48+
the parent Span is a Flow Log and the children are the Logs in the Trace.
49+
50+
The exporter keeps an upload queue and only uploads a Span if its direct parent has
51+
been uploaded.
4052
"""
53+
# NOTE: LLM Instrumentors will only intercept calls to the provider made via the
54+
# offical libraries e.g. import openai from openai. This is 100% the reason why
55+
# prompt call is not intercepted by the Instrumentor. The way to fix this is likely
56+
# overriding the hl_client.prompt.call utility. @James I'll do this since it will
57+
# involve looking at the EvaluationContext deep magic.
4158

4259
DEFAULT_NUMBER_THREADS = 4
4360

@@ -65,19 +82,20 @@ def __init__(
6582
for _ in range(worker_threads or self.DEFAULT_NUMBER_THREADS)
6683
]
6784
# Signals threads no more work will arrive and
68-
# they should wind down if the queue is empty
85+
# they should wind down after they empty the queue
6986
self._shutdown: bool = False
7087
for thread in self._threads:
7188
thread.start()
7289
logger.debug("Exporter Thread %s started", thread.ident)
7390
# Flow Log Span ID mapping to children Spans that must be uploaded first
74-
self._flow_log_prerequisites: dict[int, set[int]] = {}
91+
self._spans_to_complete_flow_trace: dict[int, set[int]] = {}
7592

7693
def export(self, spans: trace.Sequence[ReadableSpan]) -> SpanExportResult:
7794
def is_evaluated_file(
7895
span: ReadableSpan,
7996
evaluation_context: Optional[EvaluationContext],
8097
) -> bool:
98+
"""Check if the Span corresponds to a File evaluated by the run utility."""
8199
if evaluation_context is None:
82100
return False
83101

@@ -87,6 +105,8 @@ def is_evaluated_file(
87105
try:
88106
evaluation_context = self._client.evaluation_context_variable.get()
89107
if len(spans) > 1:
108+
# Note: the HL logging and run utilities all send a single span
109+
# export accepts multiple spans to adhere to OTEL API
90110
raise RuntimeError("HumanloopSpanExporter expected a single span when running an evaluation")
91111
if not is_evaluated_file(spans[0], evaluation_context):
92112
evaluation_context = None
@@ -98,6 +118,8 @@ def is_evaluated_file(
98118
# We pass the EvaluationContext from the eval_run utility thread to
99119
# the export thread so the .log action works as expected
100120
evaluation_context_copy = None
121+
# Deep magic: the evaluation context is thread-specific global store of data
122+
# We need to copy it to the Exporter threads so the .log action works as expected
101123
for context_var, context_var_value in contextvars.copy_context().items():
102124
if context_var.name == EVALUATION_CONTEXT_VARIABLE_NAME:
103125
evaluation_context_copy = context_var_value
@@ -125,6 +147,18 @@ def is_evaluated_file(
125147
spans[0].attributes,
126148
)
127149
# Mark the EvaluationContext as used
150+
# run utility will set an EvaluationContext per thread
151+
# we mark it as None so the run utility is notified that
152+
# the Exporter will handle logging (not guaranteed to succeed)
153+
# NOTE: This is how we avoid double logging: the run utility
154+
# calls the File.callable, then checks if the EvaluationContext
155+
# is None. If it is, the callable is not a decorated prompt, nor it
156+
# contains log statements, so run manually creates a Log.
157+
# NOTE NOTE: This is proof OTEL logic runs on same thread: run utility
158+
# knows EvaluationContext is None after calling File.callable, so it must have ran
159+
# all the way here. Wondering if the async waiting in HLProcessor breaks this assumption
160+
# thus creating the bug. Maybe run utility should wait for a bit too before checking
161+
# if the EvaluationContext is None.
128162
self._client.evaluation_context_variable.set(None)
129163
return SpanExportResult.SUCCESS
130164
else:
@@ -175,6 +209,7 @@ def _do_work(self):
175209
# not resetting the EvaluationContext in the scope of the export thread
176210
self._client.evaluation_context_variable.set(evaluation_context)
177211
except EmptyQueue:
212+
# Wait for the another span to arrive
178213
continue
179214
if span_to_export.parent is None:
180215
# Span is not part of a Flow Log
@@ -199,8 +234,13 @@ def _do_work(self):
199234
self._upload_queue.put((span_to_export, evaluation_context))
200235
self._upload_queue.task_done()
201236

202-
def _mark_span_completed(self, span_id: int) -> None:
203-
for flow_log_span_id, flow_children_span_ids in self._flow_log_prerequisites.items():
237+
def _mark_span_as_uploaded(self, span_id: int) -> None:
238+
"""Mark a Span as uploaded for Flow trace completion.
239+
240+
If this Span corresponds to the last child in the Flow trace,
241+
mark the Flow Log as complete.
242+
"""
243+
for flow_log_span_id, flow_children_span_ids in self._spans_to_complete_flow_trace.items():
204244
if span_id in flow_children_span_ids:
205245
flow_children_span_ids.remove(span_id)
206246
if len(flow_children_span_ids) == 0:
@@ -216,6 +256,7 @@ def _mark_span_completed(self, span_id: int) -> None:
216256
break
217257

218258
def _export_span_dispatch(self, span: ReadableSpan) -> None:
259+
"""Call the appropriate BaseHumanloop.X.log based on the Span type."""
219260
hl_file = read_from_opentelemetry_span(span, key=HUMANLOOP_FILE_KEY)
220261
file_type = span._attributes.get(HUMANLOOP_FILE_TYPE_KEY) # type: ignore
221262
parent_span_id = span.parent.span_id if span.parent else None
@@ -236,16 +277,16 @@ def _export_span_dispatch(self, span: ReadableSpan) -> None:
236277
)
237278

238279
if file_type == "prompt":
239-
export_func = self._export_prompt
280+
export_func = self._export_prompt_span
240281
elif file_type == "tool":
241-
export_func = self._export_tool
282+
export_func = self._export_tool_span
242283
elif file_type == "flow":
243-
export_func = self._export_flow
284+
export_func = self._export_flow_span
244285
else:
245286
raise NotImplementedError(f"Unknown span type: {hl_file}")
246287
export_func(span=span)
247288

248-
def _export_prompt(self, span: ReadableSpan) -> None:
289+
def _export_prompt_span(self, span: ReadableSpan) -> None:
249290
file_object: dict[str, Any] = read_from_opentelemetry_span(
250291
span,
251292
key=HUMANLOOP_FILE_KEY,
@@ -254,8 +295,8 @@ def _export_prompt(self, span: ReadableSpan) -> None:
254295
span,
255296
key=HUMANLOOP_LOG_KEY,
256297
)
257-
# NOTE: Due to OTel conventions, attributes with value of None are removed
258-
# If not present, instantiate as empty dictionary
298+
# NOTE: Due to OTEL conventions, attributes with value of None are removed
299+
# on write to Span. If not present, instantiate these as empty
259300
if "inputs" not in log_object:
260301
log_object["inputs"] = {}
261302
if "messages" not in log_object:
@@ -282,9 +323,9 @@ def _export_prompt(self, span: ReadableSpan) -> None:
282323
self._span_id_to_uploaded_log_id[span.context.span_id] = log_response.id
283324
except HumanloopApiError:
284325
self._span_id_to_uploaded_log_id[span.context.span_id] = None
285-
self._mark_span_completed(span_id=span.context.span_id)
326+
self._mark_span_as_uploaded(span_id=span.context.span_id)
286327

287-
def _export_tool(self, span: ReadableSpan) -> None:
328+
def _export_tool_span(self, span: ReadableSpan) -> None:
288329
file_object: dict[str, Any] = read_from_opentelemetry_span(
289330
span,
290331
key=HUMANLOOP_FILE_KEY,
@@ -301,6 +342,7 @@ def _export_tool(self, span: ReadableSpan) -> None:
301342
trace_parent_id = self._span_id_to_uploaded_log_id[span_parent_id] if span_parent_id else None
302343

303344
# API expects an empty dictionary if user does not supply attributes
345+
# NOTE: see comment in _export_prompt_span about OTEL conventions
304346
if not tool.get("attributes"):
305347
tool["attributes"] = {}
306348
if not tool.get("setup_values"):
@@ -318,9 +360,9 @@ def _export_tool(self, span: ReadableSpan) -> None:
318360
self._span_id_to_uploaded_log_id[span.context.span_id] = log_response.id
319361
except HumanloopApiError:
320362
self._span_id_to_uploaded_log_id[span.context.span_id] = None
321-
self._mark_span_completed(span_id=span.context.span_id)
363+
self._mark_span_as_uploaded(span_id=span.context.span_id)
322364

323-
def _export_flow(self, span: ReadableSpan) -> None:
365+
def _export_flow_span(self, span: ReadableSpan) -> None:
324366
file_object: dict[str, Any] = read_from_opentelemetry_span(
325367
span,
326368
key=HUMANLOOP_FILE_KEY,
@@ -330,14 +372,17 @@ def _export_flow(self, span: ReadableSpan) -> None:
330372
key=HUMANLOOP_LOG_KEY,
331373
)
332374
# Spans that must be uploaded before the Flow Span is completed
375+
# We instantiate the list of prerequisites from the attribute
376+
# passed by the Processor. Each uploaded child in the trace
377+
# will check if it's the last one and mark the Flow Log as complete
333378
try:
334379
prerequisites: list[int] = read_from_opentelemetry_span( # type: ignore
335380
span=span,
336381
key=HUMANLOOP_FLOW_PREREQUISITES_KEY,
337382
)
338-
self._flow_log_prerequisites[span.context.span_id] = set(prerequisites)
383+
self._spans_to_complete_flow_trace[span.context.span_id] = set(prerequisites)
339384
except KeyError:
340-
self._flow_log_prerequisites[span.context.span_id] = set()
385+
self._spans_to_complete_flow_trace[span.context.span_id] = set()
341386

342387
path: str = file_object["path"]
343388
flow: FlowKernelRequestParams
@@ -362,4 +407,4 @@ def _export_flow(self, span: ReadableSpan) -> None:
362407
except HumanloopApiError as e:
363408
logger.error(str(e))
364409
self._span_id_to_uploaded_log_id[span.context.span_id] = None
365-
self._mark_span_completed(span_id=span.context.span_id)
410+
self._mark_span_as_uploaded(span_id=span.context.span_id)

0 commit comments

Comments
 (0)