Skip to content

Commit feec842

Browse files
smoreinisstainless-app[bot]declan-scalemichael-chou359
authored
perf(tracing): span queue linger + per-loop httpx keepalive (#362)
Co-authored-by: stainless-app[bot] <142633134+stainless-app[bot]@users.noreply.github.com> Co-authored-by: Declan Brady <declan.brady@scale.com> Co-authored-by: Michael Chou <michael.chou@scale.com>
1 parent 6f1c14f commit feec842

9 files changed

Lines changed: 967 additions & 92 deletions

File tree

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "code",
5+
"execution_count": null,
6+
"id": "0",
7+
"metadata": {},
8+
"outputs": [],
9+
"source": [
10+
"from agentex import Agentex\n",
11+
"\n",
12+
"client = Agentex(base_url=\"http://localhost:5003\")"
13+
]
14+
},
15+
{
16+
"cell_type": "code",
17+
"execution_count": null,
18+
"id": "1",
19+
"metadata": {},
20+
"outputs": [],
21+
"source": [
22+
"AGENT_NAME = \"s010-multiturn\""
23+
]
24+
},
25+
{
26+
"cell_type": "code",
27+
"execution_count": null,
28+
"id": "2",
29+
"metadata": {},
30+
"outputs": [],
31+
"source": [
32+
"# # (Optional) Create a new task. If you don't create a new task, each message will be sent to a new task. The server will create the task for you.\n",
33+
"\n",
34+
"# import uuid\n",
35+
"\n",
36+
"# TASK_ID = str(uuid.uuid4())[:8]\n",
37+
"\n",
38+
"# rpc_response = client.agents.rpc_by_name(\n",
39+
"# agent_name=AGENT_NAME,\n",
40+
"# method=\"task/create\",\n",
41+
"# params={\n",
42+
"# \"name\": f\"{TASK_ID}-task\",\n",
43+
"# \"params\": {}\n",
44+
"# }\n",
45+
"# )\n",
46+
"\n",
47+
"# task = rpc_response.result\n",
48+
"# print(task)"
49+
]
50+
},
51+
{
52+
"cell_type": "code",
53+
"execution_count": null,
54+
"id": "3",
55+
"metadata": {},
56+
"outputs": [],
57+
"source": [
58+
"# Test non streaming response\n",
59+
"from agentex.types import TextContent\n",
60+
"\n",
61+
"# The response is expected to be a list of TaskMessage objects, which is a union of the following types:\n",
62+
"# - TextContent: A message with just text content \n",
63+
"# - DataContent: A message with JSON-serializable data content\n",
64+
"# - ToolRequestContent: A message with a tool request, which contains a JSON-serializable request to call a tool\n",
65+
"# - ToolResponseContent: A message with a tool response, which contains response object from a tool call in its content\n",
66+
"\n",
67+
"# When processing the message/send response, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n",
68+
"\n",
69+
"rpc_response = client.agents.send_message(\n",
70+
" agent_name=AGENT_NAME,\n",
71+
" params={\n",
72+
" \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n",
73+
" \"stream\": False\n",
74+
" }\n",
75+
")\n",
76+
"\n",
77+
"if not rpc_response or not rpc_response.result:\n",
78+
" raise ValueError(\"No result in response\")\n",
79+
"\n",
80+
"# Extract and print just the text content from the response\n",
81+
"for task_message in rpc_response.result:\n",
82+
" content = task_message.content\n",
83+
" if isinstance(content, TextContent):\n",
84+
" text = content.content\n",
85+
" print(text)\n"
86+
]
87+
},
88+
{
89+
"cell_type": "code",
90+
"execution_count": null,
91+
"id": "4",
92+
"metadata": {},
93+
"outputs": [],
94+
"source": [
95+
"# Test streaming response\n",
96+
"from agentex.types.text_delta import TextDelta\n",
97+
"from agentex.types.task_message_update import StreamTaskMessageFull, StreamTaskMessageDelta\n",
98+
"\n",
99+
"# The result object of message/send will be a TaskMessageUpdate which is a union of the following types:\n",
100+
"# - StreamTaskMessageStart: \n",
101+
"# - An indicator that a streaming message was started, doesn't contain any useful content\n",
102+
"# - StreamTaskMessageDelta: \n",
103+
"# - A delta of a streaming message, contains the text delta to aggregate\n",
104+
"# - StreamTaskMessageDone: \n",
105+
"# - An indicator that a streaming message was done, doesn't contain any useful content\n",
106+
"# - StreamTaskMessageFull: \n",
107+
"# - A non-streaming message, there is nothing to aggregate, since this contains the full message, not deltas\n",
108+
"\n",
109+
"# When processing StreamTaskMessageDelta, if you are expecting more than TextDeltas, such as DataDelta, ToolRequestDelta, or ToolResponseDelta, you can process them as well\n",
110+
"# When processing StreamTaskMessageFull, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n",
111+
"\n",
112+
"for agent_rpc_response_chunk in client.agents.send_message_stream(\n",
113+
" agent_name=AGENT_NAME,\n",
114+
" params={\n",
115+
" \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n",
116+
" \"stream\": True\n",
117+
" }\n",
118+
"):\n",
119+
" # We know that the result of the message/send when stream is set to True will be a TaskMessageUpdate\n",
120+
" task_message_update = agent_rpc_response_chunk.result\n",
121+
" # Print only the text deltas as they arrive or any full messages\n",
122+
" if isinstance(task_message_update, StreamTaskMessageDelta):\n",
123+
" delta = task_message_update.delta\n",
124+
" if isinstance(delta, TextDelta):\n",
125+
" print(delta.text_delta, end=\"\", flush=True)\n",
126+
" else:\n",
127+
" print(f\"Found non-text {type(task_message)} object in streaming message.\")\n",
128+
" elif isinstance(task_message_update, StreamTaskMessageFull):\n",
129+
" content = task_message_update.content\n",
130+
" if isinstance(content, TextContent):\n",
131+
" print(content.content)\n",
132+
" else:\n",
133+
" print(f\"Found non-text {type(task_message)} object in full message.\")\n"
134+
]
135+
},
136+
{
137+
"cell_type": "code",
138+
"execution_count": null,
139+
"id": "5",
140+
"metadata": {},
141+
"outputs": [],
142+
"source": []
143+
}
144+
],
145+
"metadata": {
146+
"kernelspec": {
147+
"display_name": ".venv",
148+
"language": "python",
149+
"name": "python3"
150+
},
151+
"language_info": {
152+
"codemirror_mode": {
153+
"name": "ipython",
154+
"version": 3
155+
},
156+
"file_extension": ".py",
157+
"mimetype": "text/x-python",
158+
"name": "python",
159+
"nbconvert_exporter": "python",
160+
"pygments_lexer": "ipython3",
161+
"version": "3.12.9"
162+
}
163+
},
164+
"nbformat": 4,
165+
"nbformat_minor": 5
166+
}

examples/tutorials/00_sync/010_multiturn/dev.ipynb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@
144144
],
145145
"metadata": {
146146
"kernelspec": {
147-
"display_name": ".venv",
147+
"display_name": "Python 3 (ipykernel)",
148148
"language": "python",
149149
"name": "python3"
150150
},
@@ -158,7 +158,7 @@
158158
"name": "python",
159159
"nbconvert_exporter": "python",
160160
"pygments_lexer": "ipython3",
161-
"version": "3.12.9"
161+
"version": "3.14.2"
162162
}
163163
},
164164
"nbformat": 4,

src/agentex/lib/adk/_modules/tracing.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,13 @@ def _tracing_service(self) -> TracingService:
6767
if self._tracing_service_lazy is None or (loop_id is not None and loop_id != self._bound_loop_id):
6868
import httpx
6969

70-
# Disable keepalive so each span HTTP call gets a fresh TCP
71-
# connection. Reused connections carry asyncio primitives bound
72-
# to the event loop that created them; in sync-ACP / streaming
73-
# contexts the loop context can shift between calls, causing
74-
# "bound to a different event loop" RuntimeErrors.
70+
# Keepalive ON: connections are reused within a single event
71+
# loop, eliminating the TLS-handshake-per-span penalty under
72+
# load. Cross-loop safety is preserved by rebuilding the
73+
# client whenever loop_id changes (the conditional above).
7574
agentex_client = create_async_agentex_client(
7675
http_client=httpx.AsyncClient(
77-
limits=httpx.Limits(max_keepalive_connections=0),
76+
limits=httpx.Limits(max_keepalive_connections=20),
7877
),
7978
)
8079
tracer = AsyncTracer(agentex_client)

src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
from typing import Any, Dict, override
1+
import asyncio
2+
import weakref
3+
from typing import TYPE_CHECKING, Any, Dict, override
24

35
from agentex import Agentex
46
from agentex.types.span import Span
@@ -9,6 +11,9 @@
911
AsyncTracingProcessor,
1012
)
1113

14+
if TYPE_CHECKING:
15+
from agentex import AsyncAgentex
16+
1217

1318
class AgentexSyncTracingProcessor(SyncTracingProcessor):
1419
def __init__(self, config: AgentexTracingProcessorConfig): # noqa: ARG002
@@ -67,19 +72,40 @@ def shutdown(self) -> None:
6772

6873
class AgentexAsyncTracingProcessor(AsyncTracingProcessor):
6974
def __init__(self, config: AgentexTracingProcessorConfig): # noqa: ARG002
75+
# Per-event-loop client cache. httpx.AsyncClient is bound to the
76+
# loop that created it, so in sync-ACP / streaming contexts (where
77+
# the active loop can change between requests) we keep one client
78+
# per loop instead of disabling keepalive entirely. The cache is a
79+
# WeakKeyDictionary so a GC'd loop and its client are evicted
80+
# automatically — using id() as a key would reuse entries when
81+
# CPython recycles a freed loop's memory address.
82+
self._clients_by_loop: weakref.WeakKeyDictionary[
83+
asyncio.AbstractEventLoop, "AsyncAgentex"
84+
] = weakref.WeakKeyDictionary()
85+
86+
def _build_client(self) -> "AsyncAgentex":
7087
import httpx
7188

72-
# Disable keepalive so each span HTTP call gets a fresh TCP connection.
73-
# Reused connections carry asyncio primitives bound to the event loop
74-
# that created them; in sync-ACP / streaming contexts the loop context
75-
# can shift between calls, causing "bound to a different event loop"
76-
# RuntimeErrors.
77-
self.client = create_async_agentex_client(
89+
# Keepalive ON: connections are reused within a single event loop,
90+
# eliminating the TLS-handshake-per-span penalty under load.
91+
return create_async_agentex_client(
7892
http_client=httpx.AsyncClient(
79-
limits=httpx.Limits(max_keepalive_connections=0),
93+
limits=httpx.Limits(max_keepalive_connections=20),
8094
),
8195
)
8296

97+
@property
98+
def client(self) -> "AsyncAgentex":
99+
try:
100+
loop = asyncio.get_running_loop()
101+
except RuntimeError:
102+
return self._build_client()
103+
client = self._clients_by_loop.get(loop)
104+
if client is None:
105+
client = self._build_client()
106+
self._clients_by_loop[loop] = client
107+
return client
108+
83109
# TODO(AGX1-199): Add batch create/update endpoints to Agentex API and use
84110
# them here instead of one HTTP call per span.
85111
# https://linear.app/scale-epd/issue/AGX1-199/add-agentex-batch-endpoint-for-traces

src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py

Lines changed: 53 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import annotations
22

3+
import asyncio
4+
import weakref
35
from typing import cast, override
46

57
import scale_gp_beta.lib.tracing as tracing
@@ -92,23 +94,50 @@ def shutdown(self) -> None:
9294
class SGPAsyncTracingProcessor(AsyncTracingProcessor):
9395
def __init__(self, config: SGPTracingProcessorConfig):
9496
self.disabled = config.sgp_api_key == "" or config.sgp_account_id == ""
97+
self._config = config
98+
# Per-event-loop client cache. httpx.AsyncClient ties its connection
99+
# pool to the loop it was created on; in sync-ACP / streaming contexts
100+
# the active loop can change between requests. Caching per loop lets
101+
# us keep keepalive on within each loop while staying safe across
102+
# loops. The cache is a WeakKeyDictionary so a GC'd loop and its
103+
# client are evicted automatically — using id() as a key would reuse
104+
# entries when CPython recycles a freed loop's memory address.
105+
self._clients_by_loop: weakref.WeakKeyDictionary[
106+
asyncio.AbstractEventLoop, AsyncSGPClient
107+
] = weakref.WeakKeyDictionary()
108+
self.env_vars = EnvironmentVariables.refresh()
109+
110+
def _build_client(self) -> AsyncSGPClient:
95111
import httpx
96112

97-
# Disable keepalive so each HTTP call gets a fresh TCP connection,
98-
# avoiding "bound to a different event loop" errors in sync-ACP.
99-
self.sgp_async_client = (
100-
AsyncSGPClient(
101-
api_key=config.sgp_api_key,
102-
account_id=config.sgp_account_id,
103-
base_url=config.sgp_base_url,
104-
http_client=httpx.AsyncClient(
105-
limits=httpx.Limits(max_keepalive_connections=0),
106-
),
107-
)
108-
if not self.disabled
109-
else None
113+
return AsyncSGPClient(
114+
api_key=self._config.sgp_api_key,
115+
account_id=self._config.sgp_account_id,
116+
base_url=self._config.sgp_base_url,
117+
# Keepalive ON: connections are reused within a single event loop,
118+
# which removes the TLS-handshake-per-span penalty observed under
119+
# load. Cross-loop safety is preserved by the per-loop cache.
120+
http_client=httpx.AsyncClient(
121+
limits=httpx.Limits(max_keepalive_connections=20),
122+
),
110123
)
111-
self.env_vars = EnvironmentVariables.refresh()
124+
125+
def _get_client(self) -> AsyncSGPClient | None:
126+
"""Return the AsyncSGPClient bound to the current event loop, creating
127+
one on first use. Returns None when the processor is disabled."""
128+
if self.disabled:
129+
return None
130+
try:
131+
loop = asyncio.get_running_loop()
132+
except RuntimeError:
133+
# Called from outside an event loop — should not happen on the
134+
# hot path, but build a one-off client rather than crashing.
135+
return self._build_client()
136+
client = self._clients_by_loop.get(loop)
137+
if client is None:
138+
client = self._build_client()
139+
self._clients_by_loop[loop] = client
140+
return client
112141

113142
@override
114143
async def on_span_start(self, span: Span) -> None:
@@ -123,31 +152,29 @@ async def on_spans_start(self, spans: list[Span]) -> None:
123152
if not spans:
124153
return
125154

126-
sgp_spans = [_build_sgp_span(span, self.env_vars) for span in spans]
127-
128-
if self.disabled:
155+
client = self._get_client()
156+
if client is None:
129157
logger.warning("SGP is disabled, skipping span upsert")
130158
return
131-
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
132-
items=[s.to_request_params() for s in sgp_spans]
133-
)
159+
160+
sgp_spans = [_build_sgp_span(span, self.env_vars) for span in spans]
161+
await client.spans.upsert_batch(items=[s.to_request_params() for s in sgp_spans])
134162

135163
@override
136164
async def on_spans_end(self, spans: list[Span]) -> None:
137165
if not spans:
138166
return
139167

168+
client = self._get_client()
169+
if client is None:
170+
return
171+
140172
sgp_spans: list[SGPSpan] = []
141173
for span in spans:
142174
sgp_span = _build_sgp_span(span, self.env_vars)
143175
sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr]
144176
sgp_spans.append(sgp_span)
145-
146-
if self.disabled:
147-
return
148-
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
149-
items=[s.to_request_params() for s in sgp_spans]
150-
)
177+
await client.spans.upsert_batch(items=[s.to_request_params() for s in sgp_spans])
151178

152179
@override
153180
async def shutdown(self) -> None:

0 commit comments

Comments
 (0)