Skip to content

Commit ef41ddf

Browse files
google-genai-botcopybara-github
authored andcommitted
feat: Fixing the spans produced by agent calls to have the right parent spans
This change fixes an issue where OpenTelemetry spans generated during agent executions were not correctly associated with their parent spans, leading to fragmented traces. ### Core Changes * **Explicit Context Propagation**: Instead of relying on `Context.current()`, which can be unreliable in asynchronous RxJava streams, the OTel `Context` is now explicitly captured at entry points (like `Runner.runAsync`) and passed down through method signatures in `BaseAgent`, `BaseLlmFlow`, and `Functions`. * **RxJava Context Support**: Introduced `Tracing.withContext(Context)`, a new set of transformers (`FlowableTransformer`, etc.) that re-activates a captured OTel context during the subscription of reactive streams. This ensures that any work done inside `flatMap` or `concatMap` remains within the correct trace. * **Synchronous Scope Management**: Wrapped direct calls to plugins and tool callbacks in `try-with-resources` blocks using `context.makeCurrent()` to ensure the tracing context is active during synchronous execution. * **Tracing Enhancements**: * Updated `TracerProvider` to propagate the agent name via a new `AGENT_NAME_CONTEXT_KEY`. * Improved agent name retrieval from spans using reflection (supporting `ReadableSpan`) when it's not explicitly available in the context. * Modified span lifecycle management to start the span immediately upon subscription setup (via `Flowable.defer`) rather than waiting for `doOnSubscribe`. ### Impacted Areas * **`BaseAgent` & `BaseLlmFlow`**: Now strictly pass the parent context to all internal stages (preprocessing, model calls, postprocessing, and callbacks). * **`Runner`**: Entry points for `runAsync` and `runLive` are now consistently wrapped in an `"invocation"` span that serves as the root for the agent's work. * **`PluginManager`**: Ensures that plugin-provided callbacks are executed within the trace context captured when the callback was triggered. * **`Functions`**: Tool execution, including before/after callbacks and response event building, is now correctly parented. ### Note on Reviewer Comments The reviewer noted a `ComplianceLint` warning regarding pending files in `Functions.java` that needs to be addressed by taking a snapshot. PiperOrigin-RevId: 879355958
1 parent 11ce49e commit ef41ddf

6 files changed

Lines changed: 519 additions & 281 deletions

File tree

core/src/main/java/com/google/adk/agents/BaseAgent.java

Lines changed: 60 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.google.errorprone.annotations.CanIgnoreReturnValue;
3030
import com.google.errorprone.annotations.DoNotCall;
3131
import com.google.genai.types.Content;
32+
import io.opentelemetry.context.Context;
3233
import io.reactivex.rxjava3.core.Completable;
3334
import io.reactivex.rxjava3.core.Flowable;
3435
import io.reactivex.rxjava3.core.Maybe;
@@ -312,38 +313,46 @@ public Flowable<Event> runAsync(InvocationContext parentContext) {
312313
private Flowable<Event> run(
313314
InvocationContext parentContext,
314315
Function<InvocationContext, Flowable<Event>> runImplementation) {
316+
Context otelParentContext = Context.current();
317+
InvocationContext invocationContext = createInvocationContext(parentContext);
318+
315319
return Flowable.defer(
316-
() -> {
317-
InvocationContext invocationContext = createInvocationContext(parentContext);
318-
319-
return callCallback(
320-
beforeCallbacksToFunctions(
321-
invocationContext.pluginManager(), beforeAgentCallback),
322-
invocationContext)
323-
.flatMapPublisher(
324-
beforeEventOpt -> {
325-
if (invocationContext.endInvocation()) {
326-
return Flowable.fromOptional(beforeEventOpt);
327-
}
328-
329-
Flowable<Event> beforeEvents = Flowable.fromOptional(beforeEventOpt);
330-
Flowable<Event> mainEvents =
331-
Flowable.defer(() -> runImplementation.apply(invocationContext));
332-
Flowable<Event> afterEvents =
333-
Flowable.defer(
334-
() ->
335-
callCallback(
336-
afterCallbacksToFunctions(
337-
invocationContext.pluginManager(), afterAgentCallback),
338-
invocationContext)
339-
.flatMapPublisher(Flowable::fromOptional));
340-
341-
return Flowable.concat(beforeEvents, mainEvents, afterEvents);
342-
})
343-
.compose(
344-
Tracing.traceAgent(
345-
"invoke_agent " + name(), name(), description(), invocationContext));
346-
});
320+
() -> {
321+
return callCallback(
322+
beforeCallbacksToFunctions(
323+
invocationContext.pluginManager(), beforeAgentCallback),
324+
invocationContext,
325+
otelParentContext)
326+
.flatMapPublisher(
327+
beforeEventOpt -> {
328+
if (invocationContext.endInvocation()) {
329+
return Flowable.fromOptional(beforeEventOpt);
330+
}
331+
332+
Flowable<Event> beforeEvents = Flowable.fromOptional(beforeEventOpt);
333+
Flowable<Event> mainEvents =
334+
runImplementation
335+
.apply(invocationContext)
336+
.compose(Tracing.withContext(otelParentContext));
337+
Flowable<Event> afterEvents =
338+
callCallback(
339+
afterCallbacksToFunctions(
340+
invocationContext.pluginManager(), afterAgentCallback),
341+
invocationContext,
342+
otelParentContext)
343+
.flatMapPublisher(Flowable::fromOptional)
344+
.compose(Tracing.withContext(otelParentContext));
345+
346+
return Flowable.concat(beforeEvents, mainEvents, afterEvents);
347+
});
348+
})
349+
.compose(
350+
Tracing.traceAgent(
351+
otelParentContext,
352+
"invoke_agent " + name(),
353+
name(),
354+
description(),
355+
invocationContext));
347356
}
348357

349358
/**
@@ -387,7 +396,8 @@ private <T> ImmutableList<Function<CallbackContext, Maybe<Content>>> callbacksTo
387396
*/
388397
private Single<Optional<Event>> callCallback(
389398
List<Function<CallbackContext, Maybe<Content>>> agentCallbacks,
390-
InvocationContext invocationContext) {
399+
InvocationContext invocationContext,
400+
Context otelParentContext) {
391401
if (agentCallbacks.isEmpty()) {
392402
return Single.just(Optional.empty());
393403
}
@@ -397,25 +407,24 @@ private Single<Optional<Event>> callCallback(
397407

398408
return Flowable.fromIterable(agentCallbacks)
399409
.concatMap(
400-
callback -> {
401-
Maybe<Content> maybeContent = callback.apply(callbackContext);
402-
403-
return maybeContent
404-
.map(
405-
content -> {
406-
invocationContext.setEndInvocation(true);
407-
return Optional.of(
408-
Event.builder()
409-
.id(Event.generateEventId())
410-
.invocationId(invocationContext.invocationId())
411-
.author(name())
412-
.branch(invocationContext.branch())
413-
.actions(callbackContext.eventActions())
414-
.content(content)
415-
.build());
416-
})
417-
.toFlowable();
418-
})
410+
callback ->
411+
callback
412+
.apply(callbackContext)
413+
.map(
414+
content -> {
415+
invocationContext.setEndInvocation(true);
416+
return Optional.of(
417+
Event.builder()
418+
.id(Event.generateEventId())
419+
.invocationId(invocationContext.invocationId())
420+
.author(name())
421+
.branch(invocationContext.branch())
422+
.actions(callbackContext.eventActions())
423+
.content(content)
424+
.build());
425+
})
426+
.toFlowable()
427+
.compose(Tracing.withContext(otelParentContext)))
419428
.firstElement()
420429
.switchIfEmpty(
421430
Single.defer(

0 commit comments

Comments
 (0)