feat: Add agentscope integration for tracing#147
feat: Add agentscope integration for tracing #147 — Abhijeet Prasad (AbhiPrasad) wants to merge 4 commits into main from
Conversation
32ee952 to
27dc963
Compare
| return False | ||
|
|
||
|
|
||
| async def _model_call_wrapper(wrapped: Any, instance: Any, args: Any, kwargs: dict[str, Any]) -> Any: |
There was a problem hiding this comment.
Span ends before the stream is consumed when using the current AgentScope streaming pattern. The span's metrics.end timestamp is recorded before output is logged, producing a near-zero duration even though the stream took 300ms.
Demo example:
# Reproduction for the streaming-span bug: the span's metrics.end is recorded
# before the async stream is consumed, so the logged duration is ~0ms even
# though the stream itself takes ~300ms. Output is also logged after end.
import asyncio
from contextlib import aclosing

from braintrust import logger
from braintrust.integrations.agentscope.tracing import _model_call_wrapper
from braintrust.logger import start_span
from braintrust.test_helpers import init_test_logger

init_test_logger("demo")


class FakeModel:
    # Minimal stand-in for an AgentScope model object; only the attribute
    # the wrapper reads is provided.
    model_name = "gpt-4o-mini"


async def slow_stream():
    """Yield three chunks, ~100ms apart, so real consumption takes ~300ms."""
    for i in range(3):
        await asyncio.sleep(0.1)
        yield {"content": [{"type": "text", "text": f"chunk-{i}"}]}


async def wrapped_streaming(*_args, **_kwargs):
    """Fake wrapped model call that returns an async iterator (streaming path)."""
    return slow_stream()


async def main():
    # Capture spans in memory so we can inspect the recorded metrics.
    with logger._internal_with_memory_background_logger() as bgl:
        stream = await _model_call_wrapper(
            wrapped_streaming, FakeModel(), args=([{"role": "user", "content": "hi"}],), kwargs={}
        )
        # Fully consume the stream — this is where the ~300ms is spent.
        async for _ in stream:
            pass
        spans = bgl.pop()
        span = spans[0]
        m = span.get("metrics", {})
        duration_ms = round((m.get("end", 0) - m.get("start", 0)) * 1000)
        print(f"duration={duration_ms}ms output={span.get('output')}")
        print(f"bug confirmed: span ended {duration_ms}ms after start, output logged after end")
asyncio.run(main())
There was a problem hiding this comment.
I think this might fix the issue (at least the demo file finds ~300ms instead of ~0ms).
Also apply a similar change to _toolkit_call_tool_function_wrapper.
async def _model_call_wrapper(wrapped: Any, instance: Any, args: Any, kwargs: dict[str, Any]) -> Any:
- with start_span(
- name=f"{_model_provider_name(instance)}.call",
- type=SpanTypeAttribute.LLM,
- input=_model_call_input(args, kwargs),
- metadata=_model_call_metadata(instance, kwargs),
- ) as span:
+ with contextlib.ExitStack() as stack:
+ span = stack.enter_context(
+ start_span(
+ name=f"{_model_provider_name(instance)}.call",
+ type=SpanTypeAttribute.LLM,
+ input=_model_call_input(args, kwargs),
+ metadata=_model_call_metadata(instance, kwargs),
+ )
+ )
try:
result = await wrapped(*args, **kwargs)
if _is_async_iterator(result):
+ deferred = stack.pop_all()
async def _trace():
- last_chunk = None
- async with aclosing(result) as agen:
- async for chunk in agen:
- last_chunk = chunk
- yield chunk
- if last_chunk is not None:
- span.log(output=_model_call_output(last_chunk), metrics=_extract_metrics(last_chunk))
+ with deferred:
+ last_chunk = None
+ async with aclosing(result) as agen:
+ async for chunk in agen:
+ last_chunk = chunk
+ yield chunk
+ if last_chunk is not None:
+ span.log(output=_model_call_output(last_chunk), metrics=_extract_metrics(last_chunk))
                return _trace()
fd6b14b to
8fff014
Compare
There was a problem hiding this comment.
Factory function instead of 10 almost identical classes?
There was a problem hiding this comment.
for this kind of work, I prefer against functions. Makes git diffs harder to read, and the duplication here is fine. I consider this basically "config" classes.
| return str(getattr(tool_call, "name", "unknown_tool")) | ||
|
|
||
|
|
||
| async def _agent_call_wrapper(wrapped: Any, instance: Any, args: Any, kwargs: dict[str, Any]) -> Any: |
There was a problem hiding this comment.
_agent_call_wrapper, _sequential_pipeline_wrapper and _fanout_pipeline_wrapper are almost the same, only name and metadata change, factory function?
| if _is_async_iterator(result): | ||
| deferred = stack.pop_all() | ||
|
|
||
| async def _trace(): |
There was a problem hiding this comment.
diff <(sed -n '264,274p' tracing.py) <(sed -n '223,233p' tracing.py)
9c9< span.log(output=_model_call_output(last_chunk), metrics=_extract_metrics(last_chunk))
---
> span.log(output=last_chunk)
The `async def _trace()` functions are almost identical, though with only 2 occurrences I'm not sure a common helper function is needed.
resolves #145