From 400bb36a539a7ba09c797c0bfc5a98af04e53ddd Mon Sep 17 00:00:00 2001 From: c <37263590+Aphroq@users.noreply.github.com> Date: Mon, 11 May 2026 12:39:17 +0000 Subject: [PATCH] Interrupt tracing retry backoff on shutdown --- src/agents/tracing/processors.py | 17 ++++- tests/test_trace_processor.py | 127 ++++++++++++++++++++++++------- 2 files changed, 116 insertions(+), 28 deletions(-) diff --git a/src/agents/tracing/processors.py b/src/agents/tracing/processors.py index ec51659b0f..6711bc92de 100644 --- a/src/agents/tracing/processors.py +++ b/src/agents/tracing/processors.py @@ -73,6 +73,7 @@ def __init__( self.max_retries = max_retries self.base_delay = base_delay self.max_delay = max_delay + self._shutdown_event = threading.Event() # Keep a client open for connection pooling across multiple export calls self._client = httpx.Client(timeout=httpx.Timeout(timeout=60, connect=5.0)) @@ -213,8 +214,12 @@ def _timeout_for_deadline(self, deadline: float | None) -> httpx.Timeout | None: def _sleep_before_retry(self, sleep_time: float, deadline: float | None) -> bool: if deadline is None: - time.sleep(sleep_time) - return True + if self._shutdown_event.wait(sleep_time): + logger.warning( + "[non-fatal] Tracing: shutdown requested during retry backoff, giving up." + ) + return False + return not self._shutdown_event.is_set() remaining = deadline - time.monotonic() if remaining <= 0: @@ -510,6 +515,9 @@ def close(self): """Close the underlying HTTP client.""" self._client.close() + def _request_shutdown(self) -> None: + self._shutdown_event.set() + class BatchTraceProcessor(TracingProcessor): """Some implementation notes: @@ -598,6 +606,11 @@ def shutdown(self, timeout: float | None = None): Called when the application stops. We signal our thread to stop, then join it. """ self._shutdown_event.set() + if timeout is not None: + request_exporter_shutdown = getattr(self._exporter, "_request_shutdown", None) + if callable(request_exporter_shutdown): + request_exporter_shutdown() + deadline = None if timeout is None else time.monotonic() + timeout self._shutdown_deadline = deadline diff --git a/tests/test_trace_processor.py b/tests/test_trace_processor.py index 94b1d01ebf..c0d8898599 100644 --- a/tests/test_trace_processor.py +++ b/tests/test_trace_processor.py @@ -394,16 +394,6 @@ def test_get_trace_provider_force_flush_flushes_default_processor(mocked_exporte processor.shutdown() -@pytest.fixture -def patched_time_sleep(): - """ - Fixture to replace time.sleep with a no-op to speed up tests - that rely on retry/backoff logic. - """ - with patch("time.sleep") as mock_sleep: - yield mock_sleep - - def mock_processor(): processor = MagicMock() processor.on_trace_start = MagicMock() @@ -463,7 +453,7 @@ def test_backend_span_exporter_4xx_client_error(mock_client): @patch("httpx.Client") -def test_backend_span_exporter_5xx_retry(mock_client, patched_time_sleep): +def test_backend_span_exporter_5xx_retry(mock_client): mock_response = MagicMock() mock_response.status_code = 500 @@ -471,34 +461,101 @@ def test_backend_span_exporter_5xx_retry(mock_client, patched_time_sleep): mock_client.return_value.post.return_value = mock_response exporter = BackendSpanExporter(api_key="test_key", max_retries=3, base_delay=0.1, max_delay=0.2) - exporter.export([get_span(mock_processor())]) + with patch.object(exporter._shutdown_event, "wait", return_value=False) as wait_for_retry: + exporter.export([get_span(mock_processor())]) # Should retry up to max_retries times assert mock_client.return_value.post.call_count == 3 - assert patched_time_sleep.call_count == 2 + assert wait_for_retry.call_count == 2 exporter.close() @patch("httpx.Client") -def test_backend_span_exporter_deadline_stops_during_5xx_retry_backoff( - mock_client, - patched_time_sleep, -): +def test_backend_span_exporter_deadline_stops_during_5xx_retry_backoff(mock_client): mock_response = MagicMock() mock_response.status_code = 504 mock_client.return_value.post.return_value = mock_response exporter = BackendSpanExporter(api_key="test_key", max_retries=3, base_delay=1.0) - exporter._export_with_deadline([get_span(mock_processor())], deadline=time.monotonic() + 0.01) + with patch("time.sleep") as sleep_for_retry: + exporter._export_with_deadline( + [get_span(mock_processor())], deadline=time.monotonic() + 0.01 + ) assert mock_client.return_value.post.call_count == 1 - patched_time_sleep.assert_called_once() - assert patched_time_sleep.call_args.args[0] <= 0.1 + sleep_for_retry.assert_called_once() + assert sleep_for_retry.call_args.args[0] <= 0.1 exporter.close() +@patch("httpx.Client") +def test_batch_trace_processor_shutdown_interrupts_exporter_retry_backoff(mock_client): + post_called = threading.Event() + mock_response = MagicMock() + mock_response.status_code = 504 + + def post(**kwargs: Any) -> Any: + post_called.set() + return mock_response + + mock_client.return_value.post.side_effect = post + + exporter = BackendSpanExporter( + api_key="test_key", + max_retries=100, + base_delay=10.0, + max_delay=10.0, + ) + processor = BatchTraceProcessor( + exporter=exporter, + max_queue_size=1, + max_batch_size=1, + schedule_delay=60.0, + export_trigger_ratio=1.0, + ) + + processor.on_span_end(get_span(processor)) + assert post_called.wait(timeout=2.0) + + start = time.monotonic() + processor.shutdown(timeout=1.0) + elapsed = time.monotonic() - start + + assert elapsed < 0.5 + assert processor._worker_thread is not None + assert not processor._worker_thread.is_alive() + assert mock_client.return_value.post.call_count == 1 + + exporter.close() + + +@patch("httpx.Client") +def test_batch_trace_processor_shutdown_without_timeout_preserves_export_retries(mock_client): + mock_response = MagicMock() + mock_response.status_code = 504 + mock_client.return_value.post.return_value = mock_response + + exporter = BackendSpanExporter( + api_key="test_key", + max_retries=3, + base_delay=0.1, + max_delay=0.2, + ) + processor = BatchTraceProcessor(exporter=exporter) + processor._queue.put_nowait(get_span(processor)) + + with patch.object(exporter._shutdown_event, "wait", return_value=False) as wait_for_retry: + processor.shutdown(timeout=None) + + assert mock_client.return_value.post.call_count == 3 + assert wait_for_retry.call_count == 2 + + exporter.close() + + +@pytest.mark.serial def test_tracing_atexit_cleanup_timeout_preserves_process_exit_code_on_504() -> None: request_seen = threading.Event() @@ -544,6 +601,19 @@ def log_message(self, format: str, *args: Any) -> None: ) provider = DefaultTraceProvider() provider.register_processor(processor) + original_shutdown = provider.shutdown + + def timed_shutdown(*args, **kwargs): + shutdown_started = time.monotonic() + try: + return original_shutdown(*args, **kwargs) + finally: + print( + f"shutdown_elapsed={{time.monotonic() - shutdown_started:.6f}}", + flush=True, + ) + + provider.shutdown = timed_shutdown tracing_setup.set_trace_provider(provider) with trace("probe"): @@ -555,35 +625,40 @@ def log_message(self, format: str, *args: Any) -> None: """ ) - start = time.monotonic() try: result = subprocess.run( [sys.executable, "-c", script], check=False, capture_output=True, text=True, - timeout=3.0, + timeout=10.0, ) - elapsed = time.monotonic() - start finally: server.shutdown() server.server_close() assert request_seen.is_set() assert result.returncode == 7 - assert elapsed < 2.8 + shutdown_elapsed_prefix = "shutdown_elapsed=" + shutdown_elapsed_lines = [ + line for line in result.stdout.splitlines() if line.startswith(shutdown_elapsed_prefix) + ] + assert len(shutdown_elapsed_lines) == 1 + assert float(shutdown_elapsed_lines[0][len(shutdown_elapsed_prefix) :]) < 0.5 @patch("httpx.Client") -def test_backend_span_exporter_request_error(mock_client, patched_time_sleep): +def test_backend_span_exporter_request_error(mock_client): # Make post() raise a RequestError each time mock_client.return_value.post.side_effect = httpx.RequestError("Network error") exporter = BackendSpanExporter(api_key="test_key", max_retries=2, base_delay=0.1, max_delay=0.2) - exporter.export([get_span(mock_processor())]) + with patch.object(exporter._shutdown_event, "wait", return_value=False) as wait_for_retry: + exporter.export([get_span(mock_processor())]) # Should retry up to max_retries times assert mock_client.return_value.post.call_count == 2 + wait_for_retry.assert_called_once() exporter.close()