From df0baae56ec4655f57ed75bfccadb43dd25ffff8 Mon Sep 17 00:00:00 2001 From: Achal Date: Sun, 15 Mar 2026 20:16:58 +0530 Subject: [PATCH 1/3] fix: respect timeout_millis in BatchProcessor.force_flush (#4568) --- .../sdk/_shared_internal/__init__.py | 15 ++-- .../shared_internal/test_batch_processor.py | 69 +++++++++++++++++++ 2 files changed, 80 insertions(+), 4 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py index d18acfd029d..26265480668 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py @@ -169,12 +169,15 @@ def worker(self): self._worker_awaken.clear() self._export(BatchExportStrategy.EXPORT_ALL) - def _export(self, batch_strategy: BatchExportStrategy) -> None: + def _export(self, batch_strategy: BatchExportStrategy, flush_should_end: Optional[float] = None) -> bool: + # Returns True if all batches were exported, False if flush_should_end was reached. with self._export_lock: iteration = 0 # We could see concurrent export calls from worker and force_flush. We call _should_export_batch # once the lock is obtained to see if we still need to make the requested export. while self._should_export_batch(batch_strategy, iteration): + if flush_should_end is not None and time.time() >= flush_should_end: + return False iteration += 1 token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) try: @@ -195,6 +198,7 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None: "Exception while exporting %s.", self._exporting ) detach(token) + return True def emit(self, data: Telemetry) -> None: if self._shutdown: @@ -236,10 +240,13 @@ def shutdown(self, timeout_millis: int = 30000): # call is ongoing and the thread isn't finished. In this case we will return instead of waiting on # the thread to finish. - # TODO: Fix force flush so the timeout is used https://github.com/open-telemetry/opentelemetry-python/issues/4568. def force_flush(self, timeout_millis: Optional[int] = None) -> bool: if self._shutdown: return False + flush_should_end = ( + time.time() + (timeout_millis / 1000) + if timeout_millis is not None + else None + ) # Blocking call to export. - self._export(BatchExportStrategy.EXPORT_ALL) - return True + return self._export(BatchExportStrategy.EXPORT_ALL, flush_should_end) diff --git a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py index f5ed15110b9..08e81d6f886 100644 --- a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py +++ b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py @@ -171,6 +171,75 @@ def test_force_flush_flushes_telemetry( exporter.export.assert_called_once_with([telemetry for _ in range(10)]) batch_processor.shutdown() + # pylint: disable=no-self-use + def test_force_flush_returns_true_when_all_exported( + self, batch_processor_class, telemetry + ): + exporter = Mock() + batch_processor = batch_processor_class( + exporter, + max_queue_size=15, + max_export_batch_size=15, + schedule_delay_millis=30000, + export_timeout_millis=500, + ) + for _ in range(10): + batch_processor._batch_processor.emit(telemetry) + result = batch_processor.force_flush(timeout_millis=5000) + assert result is True + exporter.export.assert_called_once() + batch_processor.shutdown() + + # pylint: disable=no-self-use + def test_force_flush_returns_false_when_timeout_exceeded( + self, batch_processor_class, telemetry + ): + call_count = 0 + + def slow_export(batch): + nonlocal call_count + call_count += 1 + # Sleep long enough that the deadline is exceeded after first batch. + time.sleep(0.2) + + exporter = Mock() + exporter.export.side_effect = slow_export + batch_processor = batch_processor_class( + exporter, + max_queue_size=50, + max_export_batch_size=1, + schedule_delay_millis=30000, + export_timeout_millis=500, + ) + for _ in range(10): + batch_processor._batch_processor.emit(telemetry) + # 100ms timeout, each export takes 200ms, so deadline is hit after first batch. + result = batch_processor.force_flush(timeout_millis=100) + assert result is False + # Exporter was called at least once but not for all batches. + assert 1 <= call_count < 10 + batch_processor.shutdown() + + # pylint: disable=no-self-use + def test_force_flush_returns_false_when_shutdown( + self, batch_processor_class, telemetry + ): + exporter = Mock() + batch_processor = batch_processor_class( + exporter, + max_queue_size=15, + max_export_batch_size=15, + schedule_delay_millis=30000, + export_timeout_millis=500, + ) + batch_processor.shutdown() + for _ in range(10): + batch_processor._batch_processor.emit(telemetry) + result = batch_processor.force_flush(timeout_millis=5000) + assert result is False + # Nothing should have been exported after shutdown. + exporter.export.assert_not_called() + @unittest.skipUnless( hasattr(os, "fork"), "needs *nix", From 45f8937de197fc889ed997b3ed2f07cff75406d6 Mon Sep 17 00:00:00 2001 From: Achal Date: Sun, 15 Mar 2026 20:41:47 +0530 Subject: [PATCH 2/3] fix: increase queue size in test_force_flush_returns_false_when_timeout_exceeded to avoid flakiness on fast CI runners Fixes #4568. --- .../opentelemetry/sdk/_shared_internal/__init__.py | 11 +++++++++-- .../tests/shared_internal/test_batch_processor.py | 7 ++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py index 26265480668..9a3cec87040 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py @@ -169,14 +169,21 @@ def worker(self): self._worker_awaken.clear() self._export(BatchExportStrategy.EXPORT_ALL) - def _export(self, batch_strategy: BatchExportStrategy, flush_should_end: Optional[float] = None) -> bool: + def _export( + self, + batch_strategy: BatchExportStrategy, + flush_should_end: Optional[float] = None, + ) -> bool: # Returns True if all batches were exported, False if flush_should_end was reached. with self._export_lock: iteration = 0 # We could see concurrent export calls from worker and force_flush. We call _should_export_batch # once the lock is obtained to see if we still need to make the requested export. while self._should_export_batch(batch_strategy, iteration): - if flush_should_end is not None and time.time() >= flush_should_end: + if ( + flush_should_end is not None + and time.time() >= flush_should_end + ): return False iteration += 1 token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) diff --git a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py index 08e81d6f886..47f5a7a7d3d 100644 --- a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py +++ b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py @@ -206,18 +206,19 @@ def slow_export(batch): exporter.export.side_effect = slow_export batch_processor = batch_processor_class( exporter, - max_queue_size=50, + max_queue_size=200, max_export_batch_size=1, + # Long enough that the worker thread won't wake up during the test. schedule_delay_millis=30000, export_timeout_millis=500, ) - for _ in range(10): + for _ in range(50): batch_processor._batch_processor.emit(telemetry) # 100ms timeout, each export takes 200ms, so deadline is hit after first batch. result = batch_processor.force_flush(timeout_millis=100) assert result is False # Exporter was called at least once but not for all batches. - assert 1 <= call_count < 10 + assert 1 <= call_count < 50 batch_processor.shutdown() # pylint: disable=no-self-use From fa685923ab1e97266b3c4ad8baacf3ef50da2011 Mon Sep 17 00:00:00 2001 From: Achal Date: Sun, 15 Mar 2026 20:41:47 +0530 Subject: [PATCH 3/3] fix: respect timeout_millis in BatchProcessor.force_flush Fixes #4568. --- CHANGELOG.md | 2 ++ .../sdk/_shared_internal/__init__.py | 16 +++++------- .../shared_internal/test_batch_processor.py | 26 +++++++------------ 3 files changed, 19 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ccde93fb14e..a60f6c081c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- `opentelemetry-sdk`: fix `BatchProcessor.force_flush` to respect `timeout_millis`, previously the timeout was ignored and the flush would block until all telemetry was exported + ([#4982](https://github.com/open-telemetry/opentelemetry-python/pull/4982)) - `opentelemetry-sdk`: Add file configuration support with YAML/JSON loading, environment variable substitution, and schema validation against the vendored OTel config JSON schema ([#4898](https://github.com/open-telemetry/opentelemetry-python/pull/4898)) - Fix intermittent CI failures in `getting-started` and `tracecontext` jobs caused by GitHub git CDN SHA propagation lag by installing contrib packages from the already-checked-out local copy instead of a second git clone diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py index 9a3cec87040..2d82b067e4c 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py @@ -18,6 +18,7 @@ import enum import inspect import logging +import math import os import threading import time @@ -172,18 +173,15 @@ def worker(self): def _export( self, batch_strategy: BatchExportStrategy, - flush_should_end: Optional[float] = None, + deadline: float = math.inf, ) -> bool: - # Returns True if all batches were exported, False if flush_should_end was reached. + # Returns True if all batches were exported, False if deadline was reached. with self._export_lock: iteration = 0 # We could see concurrent export calls from worker and force_flush. We call _should_export_batch # once the lock is obtained to see if we still need to make the requested export. while self._should_export_batch(batch_strategy, iteration): - if ( - flush_should_end is not None - and time.time() >= flush_should_end - ): + if time.time() >= deadline: return False iteration += 1 token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) @@ -250,10 +248,10 @@ def shutdown(self, timeout_millis: int = 30000): def force_flush(self, timeout_millis: Optional[int] = None) -> bool: if self._shutdown: return False - flush_should_end = ( + deadline = ( time.time() + (timeout_millis / 1000) if timeout_millis is not None - else None + else math.inf ) # Blocking call to export. - return self._export(BatchExportStrategy.EXPORT_ALL, flush_should_end) + return self._export(BatchExportStrategy.EXPORT_ALL, deadline) diff --git a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py index 47f5a7a7d3d..c9fe11536f9 100644 --- a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py +++ b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py @@ -15,11 +15,13 @@ # pylint: disable=protected-access import gc import logging +import math import multiprocessing import os import threading import time import unittest +import unittest.mock import weakref from platform import system from typing import Any @@ -194,31 +196,23 @@ def test_force_flush_returns_true_when_all_exported( def test_force_flush_returns_false_when_timeout_exceeded( self, batch_processor_class, telemetry ): - call_count = 0 - - def slow_export(batch): - nonlocal call_count - call_count += 1 - # Sleep long enough that the deadline is exceeded after first batch. - time.sleep(0.2) - exporter = Mock() - exporter.export.side_effect = slow_export batch_processor = batch_processor_class( exporter, - max_queue_size=200, + max_queue_size=15, max_export_batch_size=1, - # Long enough that the worker thread won't wake up during the test. schedule_delay_millis=30000, export_timeout_millis=500, ) - for _ in range(50): + for _ in range(3): batch_processor._batch_processor.emit(telemetry) - # 100ms timeout, each export takes 200ms, so deadline is hit after first batch. - result = batch_processor.force_flush(timeout_millis=100) + # Mock time.time() to always return math.inf, simulating deadline already exceeded. + with unittest.mock.patch( + "opentelemetry.sdk._shared_internal.time" + ) as mock_time: + mock_time.time.return_value = math.inf + result = batch_processor.force_flush(timeout_millis=100) assert result is False - # Exporter was called at least once but not for all batches. - assert 1 <= call_count < 50 batch_processor.shutdown() # pylint: disable=no-self-use