Skip to content

Commit 512c843

Browse files
committed
perf(tracing): bounded-concurrency span export
Span export was strictly serial: the drain loop awaited each upsert_batch to completion before sending the next, so per-pod egress was capped at ~1/request-latency (~150ms server-side ⇒ ~6-7 PUT/s/pod) regardless of CPU or backend headroom. Under load the queue backlogged and only drained after the run. The drain now dispatches each batch as its own task, bounded by AGENTEX_SPAN_QUEUE_CONCURRENCY (default 8), so multiple upsert_batch requests are in flight at once and a pod can keep up with span production. The per-span START-before-END invariant is preserved: END send-tasks snapshot the in-flight START tasks and await them before issuing. Since a span's START is always enqueued (and thus dispatched) before its END, that span's START send is either still in flight (waited on) or already done. Independent spans export fully concurrently. Setting concurrency=1 restores the old serial behavior.
1 parent feec842 commit 512c843

2 files changed

Lines changed: 216 additions & 33 deletions

File tree

src/agentex/lib/core/tracing/span_queue.py

Lines changed: 100 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@
2020
_DEFAULT_MAX_SIZE = 0
2121
# Total attempts per batch for a *transient* failure (1 == no retry).
2222
_DEFAULT_MAX_RETRIES = 1
23+
# Max number of batch-export HTTP requests in flight at once. The export
24+
# backend (EGP) processes each upsert_batch in ~150ms but serves many requests
25+
# concurrently; issuing one batch at a time caps per-pod egress at ~1/latency.
26+
# Sending several concurrently lets a pod keep up with span production under
27+
# load. ``1`` restores the old strictly-serial behavior.
28+
_DEFAULT_CONCURRENCY = 8
2329
# HTTP statuses worth retrying at the queue level. These are explicit
2430
# backpressure / transient signals; everything else (esp. 401/403/4xx auth and
2531
# validation errors) is a permanent failure that re-enqueuing cannot fix. Note
@@ -76,15 +82,23 @@ class AsyncSpanQueue:
7682
"""Background FIFO queue for async span processing.
7783
7884
Span events are enqueued synchronously (non-blocking) and drained by a
79-
background task. Items are processed in batches: all START events in a
80-
batch are flushed concurrently, then all END events, so that per-span
81-
start-before-end ordering is preserved while HTTP calls for independent
82-
spans execute in parallel.
83-
84-
Once the drain loop picks up the first item, it lingers up to
85-
``linger_ms`` waiting for more items to coalesce into the same batch.
86-
Without the linger the drain almost always returned size-1 batches under
87-
real agent workloads, because spans typically arrive a few ms apart.
85+
background task. The drain coalesces ready events into batches and
86+
*dispatches* each batch's export as its own task, so up to ``concurrency``
87+
batch requests can be in flight at once. This matters because each
88+
``upsert_batch`` HTTP call takes tens-to-hundreds of ms server-side; issuing
89+
them one at a time caps a pod's egress at ~1/latency and lets a backlog
90+
build under load.
91+
92+
Ordering guarantee: a span's START export always completes before its END
93+
export is issued. END batches wait on the START batches that were in flight
94+
when they were formed; because a span's START is always enqueued before its
95+
END, that span's START send is either still in flight (and waited on) or
96+
already finished. Independent spans export fully concurrently.
97+
98+
Once the drain loop picks up the first item, it lingers up to ``linger_ms``
99+
waiting for more items to coalesce into the same batch. Without the linger
100+
the drain almost always returned size-1 batches under real agent workloads,
101+
because spans typically arrive a few ms apart.
88102
89103
Reliability:
90104
- ``max_size`` bounds the queue. When full, new events are dropped and
@@ -101,6 +115,7 @@ def __init__(
101115
linger_ms: int | None = None,
102116
max_size: int | None = None,
103117
max_retries: int | None = None,
118+
concurrency: int | None = None,
104119
) -> None:
105120
resolved_max_size = (
106121
_read_int_env("AGENTEX_SPAN_QUEUE_MAX_SIZE", _DEFAULT_MAX_SIZE) if max_size is None else max(0, max_size)
@@ -115,6 +130,17 @@ def __init__(
115130
if max_retries is None
116131
else max(1, max_retries)
117132
)
133+
self._concurrency = (
134+
_read_int_env("AGENTEX_SPAN_QUEUE_CONCURRENCY", _DEFAULT_CONCURRENCY, minimum=1)
135+
if concurrency is None
136+
else max(1, concurrency)
137+
)
138+
# Bounds concurrent export HTTP requests.
139+
self._send_sema = asyncio.Semaphore(self._concurrency)
140+
# Outstanding dispatched send tasks, and the subset that are START
141+
# sends (END sends wait on these to preserve per-span ordering).
142+
self._inflight: set[asyncio.Task[None]] = set()
143+
self._inflight_starts: set[asyncio.Task[None]] = set()
118144
# Total spans dropped for any reason (full queue, shutdown, permanent
119145
# failure, exhausted retries). Surfaced for metrics/observability so
120146
# span loss stops being silent.
@@ -169,6 +195,11 @@ def _ensure_drain_running(self) -> None:
169195

170196
async def _drain_loop(self) -> None:
171197
while True:
198+
# Backpressure: cap the number of in-flight send tasks so the drain
199+
# does not run unboundedly ahead of the exporters.
200+
while len(self._inflight) >= self._concurrency:
201+
await asyncio.wait(set(self._inflight), return_when=asyncio.FIRST_COMPLETED)
202+
172203
# Block until at least one item is available.
173204
first = await self._queue.get()
174205
batch: list[_SpanQueueItem] = [first]
@@ -196,22 +227,43 @@ async def _drain_loop(self) -> None:
196227
except asyncio.QueueEmpty:
197228
break
198229

199-
try:
200-
# Separate START and END events. Processing all STARTs before
201-
# ENDs ensures that on_span_start completes before on_span_end
202-
# for any span whose both events land in the same batch.
203-
starts = [i for i in batch if i.event_type == SpanEventType.START]
204-
ends = [i for i in batch if i.event_type == SpanEventType.END]
205-
206-
if starts:
207-
await self._process_items(starts)
208-
if ends:
209-
await self._process_items(ends)
210-
finally:
211-
for _ in batch:
212-
self._queue.task_done()
213-
# Release span data for GC.
214-
batch.clear()
230+
# Separate START and END events and dispatch each as its own send
231+
# task. Dispatching STARTs first (so they are registered before the
232+
# END snapshot) guarantees an END never outruns a START of the same
233+
# span whose events land in this batch.
234+
starts = [i for i in batch if i.event_type == SpanEventType.START]
235+
ends = [i for i in batch if i.event_type == SpanEventType.END]
236+
if starts:
237+
self._dispatch(starts, SpanEventType.START)
238+
if ends:
239+
self._dispatch(ends, SpanEventType.END)
240+
241+
def _dispatch(self, items: list[_SpanQueueItem], event_type: SpanEventType) -> None:
242+
"""Spawn a background task to export ``items``.
243+
244+
END sends snapshot the currently in-flight START tasks and wait for them
245+
before issuing, preserving the per-span START-before-END invariant.
246+
"""
247+
barrier = tuple(self._inflight_starts) if event_type == SpanEventType.END else ()
248+
task = asyncio.create_task(self._run_send(items, barrier))
249+
self._inflight.add(task)
250+
task.add_done_callback(self._inflight.discard)
251+
if event_type == SpanEventType.START:
252+
self._inflight_starts.add(task)
253+
task.add_done_callback(self._inflight_starts.discard)
254+
255+
async def _run_send(self, items: list[_SpanQueueItem], barrier: tuple[asyncio.Task[None], ...]) -> None:
256+
try:
257+
if barrier:
258+
# Wait for the START sends this END batch depends on. Their
259+
# exceptions are irrelevant here — we only need them finished.
260+
await asyncio.gather(*barrier, return_exceptions=True)
261+
await self._process_items(items)
262+
finally:
263+
# Mark every item done so shutdown's queue.join() can complete only
264+
# once all sends (and their retries) have finished.
265+
for _ in items:
266+
self._queue.task_done()
215267

216268
async def _process_items(self, items: list[_SpanQueueItem]) -> None:
217269
"""Dispatch a batch of same-event-type items to each processor in one call.
@@ -243,10 +295,12 @@ async def _handle(
243295
) -> None:
244296
spans = [item.span for item in items]
245297
try:
246-
if event_type == SpanEventType.START:
247-
await p.on_spans_start(spans)
248-
else:
249-
await p.on_spans_end(spans)
298+
# Hold a concurrency slot only for the duration of the HTTP call.
299+
async with self._send_sema:
300+
if event_type == SpanEventType.START:
301+
await p.on_spans_start(spans)
302+
else:
303+
await p.on_spans_end(spans)
250304
except Exception as exc:
251305
self._handle_failure(p, items, event_type, exc)
252306

@@ -307,21 +361,37 @@ def _reenqueue(self, item: _SpanQueueItem, p: AsyncTracingProcessor) -> None:
307361

308362
async def shutdown(self, timeout: float = 30.0) -> None:
309363
self._stopping = True
310-
if self._queue.empty() and (self._drain_task is None or self._drain_task.done()):
364+
drain_idle = self._drain_task is None or self._drain_task.done()
365+
if self._queue.empty() and drain_idle and not self._inflight:
311366
return
367+
368+
timed_out = False
312369
try:
370+
# join() returns once every enqueued (and re-enqueued) item has been
371+
# marked done by its send task.
313372
await asyncio.wait_for(self._queue.join(), timeout=timeout)
314373
except asyncio.TimeoutError:
374+
timed_out = True
315375
logger.warning(
316376
"Span queue shutdown timed out after %.1fs with %d items remaining", timeout, self._queue.qsize()
317377
)
378+
318379
if self._drain_task is not None and not self._drain_task.done():
319380
self._drain_task.cancel()
320381
try:
321382
await self._drain_task
322383
except asyncio.CancelledError:
323384
pass
324385

386+
# Clean up any in-flight send tasks. On a clean shutdown these are
387+
# already finishing; on timeout, cancel the stragglers so we don't hang.
388+
inflight = list(self._inflight)
389+
if inflight:
390+
if timed_out:
391+
for task in inflight:
392+
task.cancel()
393+
await asyncio.gather(*inflight, return_exceptions=True)
394+
325395

326396
_default_span_queue: AsyncSpanQueue | None = None
327397

tests/lib/core/tracing/test_span_queue.py

Lines changed: 116 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -425,9 +425,10 @@ async def block_first(spans: list[Span]) -> None:
425425
proc.on_spans_start = AsyncMock(side_effect=block_first)
426426
proc.on_spans_end = AsyncMock()
427427

428-
# max_size=1, no linger: the drain pulls item-0 and blocks; item-1 fills
429-
# the queue; items 2 and 3 are dropped.
430-
queue = AsyncSpanQueue(max_size=1, linger_ms=0)
428+
# max_size=1, no linger, concurrency=1: the drain dispatches item-0 and
429+
# then blocks at the in-flight cap; item-1 fills the queue; items 2 and 3
430+
# are dropped.
431+
queue = AsyncSpanQueue(max_size=1, linger_ms=0, concurrency=1)
431432

432433
queue.enqueue(SpanEventType.START, _make_span("s0"), [proc])
433434
await asyncio.sleep(0.02) # let the drain pick up s0 and block
@@ -536,6 +537,118 @@ async def always_503(spans: list[Span]) -> None:
536537
assert queue.dropped_spans == 1
537538

538539

540+
class TestAsyncSpanQueueConcurrency:
541+
"""Span export should issue multiple batch requests concurrently (bounded),
542+
so per-pod egress isn't capped at one in-flight request — while still
543+
guaranteeing a span's START send completes before its END send.
544+
"""
545+
546+
async def test_batches_dispatched_concurrently_up_to_bound(self):
547+
current = 0
548+
max_seen = 0
549+
lock = asyncio.Lock()
550+
551+
async def slow_start(spans: list[Span]) -> None:
552+
nonlocal current, max_seen
553+
async with lock:
554+
current += 1
555+
max_seen = max(max_seen, current)
556+
await asyncio.sleep(0.05)
557+
async with lock:
558+
current -= 1
559+
560+
proc = AsyncMock()
561+
proc.on_spans_start = AsyncMock(side_effect=slow_start)
562+
proc.on_spans_end = AsyncMock()
563+
564+
# batch_size=1 → each span is its own batch/send; concurrency=4 caps
565+
# simultaneous in-flight sends.
566+
queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=4)
567+
for i in range(8):
568+
queue.enqueue(SpanEventType.START, _make_span(f"s{i}"), [proc])
569+
570+
await queue.shutdown()
571+
572+
assert proc.on_spans_start.call_count == 8
573+
assert 2 <= max_seen <= 4, f"expected bounded concurrency (2..4), saw {max_seen}"
574+
575+
async def test_concurrency_one_serializes(self):
576+
current = 0
577+
max_seen = 0
578+
lock = asyncio.Lock()
579+
580+
async def slow_start(spans: list[Span]) -> None:
581+
nonlocal current, max_seen
582+
async with lock:
583+
current += 1
584+
max_seen = max(max_seen, current)
585+
await asyncio.sleep(0.03)
586+
async with lock:
587+
current -= 1
588+
589+
proc = AsyncMock()
590+
proc.on_spans_start = AsyncMock(side_effect=slow_start)
591+
proc.on_spans_end = AsyncMock()
592+
593+
queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=1)
594+
for i in range(4):
595+
queue.enqueue(SpanEventType.START, _make_span(f"s{i}"), [proc])
596+
597+
await queue.shutdown()
598+
599+
assert max_seen == 1, f"concurrency=1 must serialize sends, saw {max_seen}"
600+
601+
async def test_concurrent_is_faster_than_serial(self):
602+
async def slow_start(spans: list[Span]) -> None:
603+
await asyncio.sleep(0.05)
604+
605+
proc = AsyncMock()
606+
proc.on_spans_start = AsyncMock(side_effect=slow_start)
607+
proc.on_spans_end = AsyncMock()
608+
609+
queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=8)
610+
for i in range(8):
611+
queue.enqueue(SpanEventType.START, _make_span(f"s{i}"), [proc])
612+
613+
start = time.monotonic()
614+
await queue.shutdown()
615+
elapsed = time.monotonic() - start
616+
617+
serial = 8 * 0.05
618+
assert elapsed < serial * 0.5, f"concurrent drain took {elapsed:.3f}s; serial would be {serial:.3f}s"
619+
620+
async def test_end_waits_for_start_of_same_span(self):
621+
"""The per-span ordering invariant: a span's END upsert must not be sent
622+
until its START upsert has completed, even with concurrency enabled."""
623+
log: list[tuple[str, str]] = []
624+
625+
async def on_start(spans: list[Span]) -> None:
626+
log.append(("start_enter", spans[0].id))
627+
await asyncio.sleep(0.05)
628+
log.append(("start_exit", spans[0].id))
629+
630+
async def on_end(spans: list[Span]) -> None:
631+
log.append(("end_enter", spans[0].id))
632+
await asyncio.sleep(0.01)
633+
log.append(("end_exit", spans[0].id))
634+
635+
proc = AsyncMock()
636+
proc.on_spans_start = AsyncMock(side_effect=on_start)
637+
proc.on_spans_end = AsyncMock(side_effect=on_end)
638+
639+
queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=4)
640+
queue.enqueue(SpanEventType.START, _make_span("A"), [proc])
641+
await asyncio.sleep(0.01) # let the START send begin (and block on sleep)
642+
queue.enqueue(SpanEventType.END, _make_span("A"), [proc])
643+
644+
await queue.shutdown()
645+
646+
# END must not enter until START has exited for the same span.
647+
start_exit = log.index(("start_exit", "A"))
648+
end_enter = log.index(("end_enter", "A"))
649+
assert start_exit < end_enter, f"END began before START completed: {log}"
650+
651+
539652
class TestAsyncSpanQueueIntegration:
540653
async def test_integration_with_async_trace(self):
541654
call_log: list[tuple[str, str]] = []

0 commit comments

Comments
 (0)