Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- `opentelemetry-sdk`: fix `BatchProcessor.force_flush` to respect `timeout_millis`, previously the timeout was ignored and the flush would block until all telemetry was exported
([#4982](https://github.com/open-telemetry/opentelemetry-python/pull/4982))
- `opentelemetry-sdk`: Add file configuration support with YAML/JSON loading, environment variable substitution, and schema validation against the vendored OTel config JSON schema
([#4898](https://github.com/open-telemetry/opentelemetry-python/pull/4898))
- Fix intermittent CI failures in `getting-started` and `tracecontext` jobs caused by GitHub git CDN SHA propagation lag by installing contrib packages from the already-checked-out local copy instead of a second git clone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import enum
import inspect
import logging
import math
import os
import threading
import time
Expand Down Expand Up @@ -169,12 +170,19 @@ def worker(self):
self._worker_awaken.clear()
self._export(BatchExportStrategy.EXPORT_ALL)

def _export(self, batch_strategy: BatchExportStrategy) -> None:
def _export(
self,
batch_strategy: BatchExportStrategy,
deadline: float = math.inf,
) -> bool:
# Returns True if all batches were exported, False if deadline was reached.
with self._export_lock:
iteration = 0
# We could see concurrent export calls from worker and force_flush. We call _should_export_batch
# once the lock is obtained to see if we still need to make the requested export.
while self._should_export_batch(batch_strategy, iteration):
if time.time() >= deadline:
return False
iteration += 1
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
Expand All @@ -195,6 +203,7 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
"Exception while exporting %s.", self._exporting
)
detach(token)
return True

def emit(self, data: Telemetry) -> None:
if self._shutdown:
Expand Down Expand Up @@ -236,10 +245,13 @@ def shutdown(self, timeout_millis: int = 30000):
# call is ongoing and the thread isn't finished. In this case we will return instead of waiting on
# the thread to finish.

def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
    """Export all queued telemetry, giving up once the deadline passes.

    Args:
        timeout_millis: Maximum time (in milliseconds) to spend flushing.
            ``None`` means block until every pending batch is exported.

    Returns:
        True if all pending batches were exported before the deadline,
        False if the processor is already shut down or the deadline was
        reached before the queue drained.
    """
    if self._shutdown:
        return False
    # Convert the millisecond budget into an absolute wall-clock deadline.
    # math.inf preserves the historical "block until fully flushed"
    # behavior when no timeout is supplied.
    deadline = (
        time.time() + (timeout_millis / 1000)
        if timeout_millis is not None
        else math.inf
    )
    # Blocking call to export; _export reports whether it drained
    # everything or bailed out at the deadline.
    return self._export(BatchExportStrategy.EXPORT_ALL, deadline)
64 changes: 64 additions & 0 deletions opentelemetry-sdk/tests/shared_internal/test_batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
# pylint: disable=protected-access
import gc
import logging
import math
import multiprocessing
import os
import threading
import time
import unittest
import unittest.mock
import weakref
from platform import system
from typing import Any
Expand Down Expand Up @@ -171,6 +173,68 @@ def test_force_flush_flushes_telemetry(
exporter.export.assert_called_once_with([telemetry for _ in range(10)])
batch_processor.shutdown()

# pylint: disable=no-self-use
def test_force_flush_returns_true_when_all_exported(
    self, batch_processor_class, telemetry
):
    """force_flush drains the queue and reports success within the deadline."""
    mock_exporter = Mock()
    processor = batch_processor_class(
        mock_exporter,
        max_queue_size=15,
        max_export_batch_size=15,
        schedule_delay_millis=30000,
        export_timeout_millis=500,
    )
    # Ten items fit inside one batch, so a single export call drains them all.
    for _ in range(10):
        processor._batch_processor.emit(telemetry)
    assert processor.force_flush(timeout_millis=5000) is True
    mock_exporter.export.assert_called_once()
    processor.shutdown()

# pylint: disable=no-self-use
def test_force_flush_returns_false_when_timeout_exceeded(
    self, batch_processor_class, telemetry
):
    """force_flush reports failure when the deadline expires mid-flush."""
    exporter = Mock()
    batch_processor = batch_processor_class(
        exporter,
        max_queue_size=15,
        max_export_batch_size=1,
        schedule_delay_millis=30000,
        export_timeout_millis=500,
    )
    # batch size of 1 forces multiple export iterations, each of which
    # re-checks the deadline.
    for _ in range(3):
        batch_processor._batch_processor.emit(telemetry)
    # Mock time.time() to always return math.inf, simulating deadline already exceeded.
    with unittest.mock.patch(
        "opentelemetry.sdk._shared_internal.time"
    ) as mock_time:
        mock_time.time.return_value = math.inf
        result = batch_processor.force_flush(timeout_millis=100)
        assert result is False
    batch_processor.shutdown()

# pylint: disable=no-self-use
def test_force_flush_returns_false_when_shutdown(
    self, batch_processor_class, telemetry
):
    """After shutdown, force_flush refuses to export and returns False."""
    mock_exporter = Mock()
    processor = batch_processor_class(
        mock_exporter,
        max_queue_size=15,
        max_export_batch_size=15,
        schedule_delay_millis=30000,
        export_timeout_millis=500,
    )
    processor.shutdown()
    # Emissions after shutdown are dropped, so flushing has nothing to send.
    for _ in range(10):
        processor._batch_processor.emit(telemetry)
    assert processor.force_flush(timeout_millis=5000) is False
    # Nothing should have been exported after shutdown.
    mock_exporter.export.assert_not_called()

@unittest.skipUnless(
hasattr(os, "fork"),
"needs *nix",
Expand Down
Loading