diff --git a/CHANGELOG.md b/CHANGELOG.md index ccde93fb14e..a60f6c081c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- `opentelemetry-sdk`: fix `BatchProcessor.force_flush` to respect `timeout_millis`; previously the timeout was ignored and the flush would block until all telemetry was exported + ([#4982](https://github.com/open-telemetry/opentelemetry-python/pull/4982)) - `opentelemetry-sdk`: Add file configuration support with YAML/JSON loading, environment variable substitution, and schema validation against the vendored OTel config JSON schema ([#4898](https://github.com/open-telemetry/opentelemetry-python/pull/4898)) - Fix intermittent CI failures in `getting-started` and `tracecontext` jobs caused by GitHub git CDN SHA propagation lag by installing contrib packages from the already-checked-out local copy instead of a second git clone diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py index d18acfd029d..2d82b067e4c 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py @@ -18,6 +18,7 @@ import enum import inspect import logging +import math import os import threading import time @@ -169,12 +170,19 @@ def worker(self): self._worker_awaken.clear() self._export(BatchExportStrategy.EXPORT_ALL) - def _export(self, batch_strategy: BatchExportStrategy) -> None: + def _export( + self, + batch_strategy: BatchExportStrategy, + deadline: float = math.inf, + ) -> bool: + # Returns True if all batches were exported, False if deadline was reached. with self._export_lock: iteration = 0 # We could see concurrent export calls from worker and force_flush. We call _should_export_batch # once the lock is obtained to see if we still need to make the requested export. 
while self._should_export_batch(batch_strategy, iteration): + if time.time() >= deadline: + return False iteration += 1 token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) try: @@ -195,6 +203,7 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None: "Exception while exporting %s.", self._exporting ) detach(token) + return True def emit(self, data: Telemetry) -> None: if self._shutdown: @@ -236,10 +245,13 @@ def shutdown(self, timeout_millis: int = 30000): # call is ongoing and the thread isn't finished. In this case we will return instead of waiting on # the thread to finish. - # TODO: Fix force flush so the timeout is used https://github.com/open-telemetry/opentelemetry-python/issues/4568. def force_flush(self, timeout_millis: Optional[int] = None) -> bool: if self._shutdown: return False + deadline = ( + time.time() + (timeout_millis / 1000) + if timeout_millis is not None + else math.inf + ) # Blocking call to export. - self._export(BatchExportStrategy.EXPORT_ALL) - return True + return self._export(BatchExportStrategy.EXPORT_ALL, deadline) diff --git a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py index f5ed15110b9..c9fe11536f9 100644 --- a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py +++ b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py @@ -15,11 +15,13 @@ # pylint: disable=protected-access import gc import logging +import math import multiprocessing import os import threading import time import unittest +import unittest.mock import weakref from platform import system from typing import Any @@ -171,6 +173,68 @@ def test_force_flush_flushes_telemetry( exporter.export.assert_called_once_with([telemetry for _ in range(10)]) batch_processor.shutdown() + # pylint: disable=no-self-use + def test_force_flush_returns_true_when_all_exported( + self, batch_processor_class, telemetry + ): + exporter = Mock() + batch_processor = 
batch_processor_class( + exporter, + max_queue_size=15, + max_export_batch_size=15, + schedule_delay_millis=30000, + export_timeout_millis=500, + ) + for _ in range(10): + batch_processor._batch_processor.emit(telemetry) + result = batch_processor.force_flush(timeout_millis=5000) + assert result is True + exporter.export.assert_called_once() + batch_processor.shutdown() + + # pylint: disable=no-self-use + def test_force_flush_returns_false_when_timeout_exceeded( + self, batch_processor_class, telemetry + ): + exporter = Mock() + batch_processor = batch_processor_class( + exporter, + max_queue_size=15, + max_export_batch_size=1, + schedule_delay_millis=30000, + export_timeout_millis=500, + ) + for _ in range(3): + batch_processor._batch_processor.emit(telemetry) + # Mock time.time() to always return math.inf, simulating deadline already exceeded. + with unittest.mock.patch( + "opentelemetry.sdk._shared_internal.time" + ) as mock_time: + mock_time.time.return_value = math.inf + result = batch_processor.force_flush(timeout_millis=100) + assert result is False + batch_processor.shutdown() + + # pylint: disable=no-self-use + def test_force_flush_returns_false_when_shutdown( + self, batch_processor_class, telemetry + ): + exporter = Mock() + batch_processor = batch_processor_class( + exporter, + max_queue_size=15, + max_export_batch_size=15, + schedule_delay_millis=30000, + export_timeout_millis=500, + ) + batch_processor.shutdown() + for _ in range(10): + batch_processor._batch_processor.emit(telemetry) + result = batch_processor.force_flush(timeout_millis=5000) + assert result is False + # Nothing should have been exported after shutdown. + exporter.export.assert_not_called() + @unittest.skipUnless( hasattr(os, "fork"), "needs *nix",