Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions src/agents/tracing/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def __init__(
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self._shutdown_event = threading.Event()

# Keep a client open for connection pooling across multiple export calls
self._client = httpx.Client(timeout=httpx.Timeout(timeout=60, connect=5.0))
Expand Down Expand Up @@ -213,8 +214,12 @@ def _timeout_for_deadline(self, deadline: float | None) -> httpx.Timeout | None:

def _sleep_before_retry(self, sleep_time: float, deadline: float | None) -> bool:
if deadline is None:
time.sleep(sleep_time)
return True
if self._shutdown_event.wait(sleep_time):
logger.warning(
"[non-fatal] Tracing: shutdown requested during retry backoff, giving up."
)
return False
return not self._shutdown_event.is_set()

remaining = deadline - time.monotonic()
if remaining <= 0:
Expand Down Expand Up @@ -510,6 +515,9 @@ def close(self):
"""Close the underlying HTTP client."""
self._client.close()

def _request_shutdown(self) -> None:
    """Signal any in-flight export retry loop to stop waiting.

    Sets the exporter's shutdown event; `_sleep_before_retry` observes this
    event (via `wait`/`is_set`) and returns False, ending retry backoff early.
    """
    self._shutdown_event.set()


class BatchTraceProcessor(TracingProcessor):
"""Some implementation notes:
Expand Down Expand Up @@ -598,6 +606,11 @@ def shutdown(self, timeout: float | None = None):
Called when the application stops. We signal our thread to stop, then join it.
"""
self._shutdown_event.set()
if timeout is not None:
request_exporter_shutdown = getattr(self._exporter, "_request_shutdown", None)
if callable(request_exporter_shutdown):
request_exporter_shutdown()

deadline = None if timeout is None else time.monotonic() + timeout
self._shutdown_deadline = deadline

Expand Down
127 changes: 101 additions & 26 deletions tests/test_trace_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,16 +394,6 @@ def test_get_trace_provider_force_flush_flushes_default_processor(mocked_exporte
processor.shutdown()


@pytest.fixture
def patched_time_sleep():
"""
Fixture to replace time.sleep with a no-op to speed up tests
that rely on retry/backoff logic.
"""
with patch("time.sleep") as mock_sleep:
yield mock_sleep


def mock_processor():
processor = MagicMock()
processor.on_trace_start = MagicMock()
Expand Down Expand Up @@ -463,42 +453,109 @@ def test_backend_span_exporter_4xx_client_error(mock_client):


@patch("httpx.Client")
def test_backend_span_exporter_5xx_retry(mock_client, patched_time_sleep):
def test_backend_span_exporter_5xx_retry(mock_client):
mock_response = MagicMock()
mock_response.status_code = 500

# Make post() return 500 every time
mock_client.return_value.post.return_value = mock_response

exporter = BackendSpanExporter(api_key="test_key", max_retries=3, base_delay=0.1, max_delay=0.2)
exporter.export([get_span(mock_processor())])
with patch.object(exporter._shutdown_event, "wait", return_value=False) as wait_for_retry:
exporter.export([get_span(mock_processor())])

# Should retry up to max_retries times
assert mock_client.return_value.post.call_count == 3
assert patched_time_sleep.call_count == 2
assert wait_for_retry.call_count == 2

exporter.close()


@patch("httpx.Client")
def test_backend_span_exporter_deadline_stops_during_5xx_retry_backoff(
mock_client,
patched_time_sleep,
):
def test_backend_span_exporter_deadline_stops_during_5xx_retry_backoff(mock_client):
mock_response = MagicMock()
mock_response.status_code = 504
mock_client.return_value.post.return_value = mock_response

exporter = BackendSpanExporter(api_key="test_key", max_retries=3, base_delay=1.0)
exporter._export_with_deadline([get_span(mock_processor())], deadline=time.monotonic() + 0.01)
with patch("time.sleep") as sleep_for_retry:
exporter._export_with_deadline(
[get_span(mock_processor())], deadline=time.monotonic() + 0.01
)

assert mock_client.return_value.post.call_count == 1
patched_time_sleep.assert_called_once()
assert patched_time_sleep.call_args.args[0] <= 0.1
sleep_for_retry.assert_called_once()
assert sleep_for_retry.call_args.args[0] <= 0.1

exporter.close()


@patch("httpx.Client")
def test_batch_trace_processor_shutdown_interrupts_exporter_retry_backoff(mock_client):
    """A timed shutdown must interrupt the exporter's retry backoff sleep.

    The exporter is configured with a very large retry budget (100 retries,
    10s backoff) so that, absent the shutdown signal, the worker thread would
    be stuck sleeping; shutdown(timeout=1.0) must still return promptly.
    """
    post_called = threading.Event()
    mock_response = MagicMock()
    mock_response.status_code = 504  # retryable server error

    def post(**kwargs: Any) -> Any:
        # Record that an export attempt actually reached the HTTP layer.
        post_called.set()
        return mock_response

    mock_client.return_value.post.side_effect = post

    exporter = BackendSpanExporter(
        api_key="test_key",
        max_retries=100,
        base_delay=10.0,
        max_delay=10.0,
    )
    processor = BatchTraceProcessor(
        exporter=exporter,
        max_queue_size=1,
        max_batch_size=1,
        schedule_delay=60.0,
        export_trigger_ratio=1.0,
    )

    # Enqueue one span; the tiny queue/batch sizes trigger an immediate export.
    processor.on_span_end(get_span(processor))
    assert post_called.wait(timeout=2.0)

    start = time.monotonic()
    processor.shutdown(timeout=1.0)
    elapsed = time.monotonic() - start

    # Shutdown must cut the 10s backoff short, well inside the 1s timeout.
    assert elapsed < 0.5
    assert processor._worker_thread is not None
    assert not processor._worker_thread.is_alive()
    # Only the initial POST happened; no further retries after shutdown.
    assert mock_client.return_value.post.call_count == 1

    exporter.close()


@patch("httpx.Client")
def test_batch_trace_processor_shutdown_without_timeout_preserves_export_retries(mock_client):
    """shutdown(timeout=None) must NOT abort in-flight export retries.

    Without a deadline, the exporter should exhaust its normal retry budget
    while the processor flushes: max_retries=3 means 3 POST attempts with
    2 backoff waits between them.
    """
    mock_response = MagicMock()
    mock_response.status_code = 504  # retryable server error
    mock_client.return_value.post.return_value = mock_response

    exporter = BackendSpanExporter(
        api_key="test_key",
        max_retries=3,
        base_delay=0.1,
        max_delay=0.2,
    )
    processor = BatchTraceProcessor(exporter=exporter)
    processor._queue.put_nowait(get_span(processor))

    # Stub the backoff wait so the test does not actually sleep; returning
    # False emulates "no shutdown requested during the wait".
    with patch.object(exporter._shutdown_event, "wait", return_value=False) as wait_for_retry:
        processor.shutdown(timeout=None)

    # All 3 attempts ran, with one backoff wait between consecutive attempts.
    assert mock_client.return_value.post.call_count == 3
    assert wait_for_retry.call_count == 2

    exporter.close()


@pytest.mark.serial
def test_tracing_atexit_cleanup_timeout_preserves_process_exit_code_on_504() -> None:
request_seen = threading.Event()

Expand Down Expand Up @@ -544,6 +601,19 @@ def log_message(self, format: str, *args: Any) -> None:
)
provider = DefaultTraceProvider()
provider.register_processor(processor)
original_shutdown = provider.shutdown

def timed_shutdown(*args, **kwargs):
shutdown_started = time.monotonic()
try:
return original_shutdown(*args, **kwargs)
finally:
print(
f"shutdown_elapsed={{time.monotonic() - shutdown_started:.6f}}",
flush=True,
)

provider.shutdown = timed_shutdown
tracing_setup.set_trace_provider(provider)

with trace("probe"):
Expand All @@ -555,35 +625,40 @@ def log_message(self, format: str, *args: Any) -> None:
"""
)

start = time.monotonic()
try:
result = subprocess.run(
[sys.executable, "-c", script],
check=False,
capture_output=True,
text=True,
timeout=3.0,
timeout=10.0,
)
elapsed = time.monotonic() - start
finally:
server.shutdown()
server.server_close()

assert request_seen.is_set()
assert result.returncode == 7
assert elapsed < 2.8
shutdown_elapsed_prefix = "shutdown_elapsed="
shutdown_elapsed_lines = [
line for line in result.stdout.splitlines() if line.startswith(shutdown_elapsed_prefix)
]
assert len(shutdown_elapsed_lines) == 1
assert float(shutdown_elapsed_lines[0][len(shutdown_elapsed_prefix) :]) < 0.5


@patch("httpx.Client")
def test_backend_span_exporter_request_error(mock_client, patched_time_sleep):
def test_backend_span_exporter_request_error(mock_client):
# Make post() raise a RequestError each time
mock_client.return_value.post.side_effect = httpx.RequestError("Network error")

exporter = BackendSpanExporter(api_key="test_key", max_retries=2, base_delay=0.1, max_delay=0.2)
exporter.export([get_span(mock_processor())])
with patch.object(exporter._shutdown_event, "wait", return_value=False) as wait_for_retry:
exporter.export([get_span(mock_processor())])

# Should retry up to max_retries times
assert mock_client.return_value.post.call_count == 2
wait_for_retry.assert_called_once()

exporter.close()

Expand Down
Loading