Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- `opentelemetry-sdk`: fix multi-processor `force_flush` skipping remaining processors when one returns `None`
([#5179](https://github.com/open-telemetry/opentelemetry-python/pull/5179))
- `opentelemetry-api`: Enforce W3C Baggage size limits on outbound propagation in `W3CBaggagePropagator.inject()`. Previously only inbound extraction enforced limits; now inject also caps entries at 180, individual pairs at 4096 bytes, and total header at 8192 bytes per the W3C Baggage spec. The extract path max_pairs limit now counts all size-valid entries rather than only successfully parsed ones.
([#5163](https://github.com/open-telemetry/opentelemetry-python/pull/5163))
- `opentelemetry-sdk`: add `additional_properties` support to generated config models via custom `datamodel-codegen` template, enabling plugin/custom component names to flow through typed dataclasses
Expand Down
Original file line number Diff line number Diff line change
def force_flush(self, timeout_millis: int = 30000) -> bool:
    """Force flush every registered log record processor.

    Each processor is handed the time remaining until the overall
    deadline. A processor that returns ``None`` (e.g. a no-op default
    implementation) is treated as success; only an explicit ``False``
    marks the flush as failed, and remaining processors are still run.

    Args:
        timeout_millis: Overall time budget in milliseconds shared by
            all processors.

    Returns:
        False if the deadline was exceeded before every processor ran,
        or if any processor explicitly reported failure. True otherwise.
    """
    deadline_ns = time_ns() + timeout_millis * 1000000
    all_flushed = True
    for processor in self._log_record_processors:
        now_ns = time_ns()
        if now_ns >= deadline_ns:
            # Budget exhausted before this processor could be flushed.
            return False
        remaining_millis = (deadline_ns - now_ns) // 1000000
        # Only an explicit False is a failure; None counts as success.
        if processor.force_flush(remaining_millis) is False:
            all_flushed = False
    return all_flushed


class ConcurrentMultiLogRecordProcessor(LogRecordProcessor):
Expand Down Expand Up @@ -497,7 +498,7 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
return False

for future in done_futures:
if not future.result():
if future.result() is False:
return False

return True
Expand Down
15 changes: 10 additions & 5 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def on_end(self, span: "ReadableSpan") -> None:
def shutdown(self) -> None:
    """Called when a :class:`opentelemetry.sdk.trace.TracerProvider` is shutdown."""

def force_flush(self, timeout_millis: int = 30000) -> bool:  # pylint: disable=no-self-use
    """Export all ended spans to the configured Exporter that have not yet
    been exported.

    The base implementation has nothing buffered, so it reports success
    immediately without consuming any of the timeout.

    Args:
        timeout_millis: Maximum time to wait, in milliseconds (unused here).

    Returns:
        False if the timeout is exceeded, True otherwise.
    """
    return True


# Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved
Expand Down Expand Up @@ -214,15 +215,19 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
given timeout, False otherwise.
"""
deadline_ns = time_ns() + timeout_millis * 1000000
all_flushed = True
for sp in self._span_processors:
current_time_ns = time_ns()
if current_time_ns >= deadline_ns:
return False

if not sp.force_flush((deadline_ns - current_time_ns) // 1000000):
return False
if (
sp.force_flush((deadline_ns - current_time_ns) // 1000000)
is False
):
all_flushed = False

return True
return all_flushed


class ConcurrentMultiSpanProcessor(SpanProcessor):
Expand Down Expand Up @@ -325,7 +330,7 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
return False

for future in done_futures:
if not future.result():
if future.result() is False:
return False

return True
Expand Down
66 changes: 66 additions & 0 deletions opentelemetry-sdk/tests/logs/test_multi_log_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,36 @@ def delay(_):
self.assertEqual(mock_processor1.force_flush.call_count, 1)
self.assertEqual(mock_processor2.force_flush.call_count, 0)

def test_force_flush_processor_returns_false(self):
    """One processor returning False fails the whole flush, but every
    processor must still be flushed (no early exit)."""
    failing = Mock(spec=LogRecordProcessor)
    failing.force_flush = Mock(return_value=False)
    succeeding = Mock(spec=LogRecordProcessor)
    succeeding.force_flush = Mock(return_value=True)

    multi = SynchronousMultiLogRecordProcessor()
    multi.add_log_record_processor(failing)
    multi.add_log_record_processor(succeeding)

    self.assertFalse(multi.force_flush(50))
    self.assertEqual(failing.force_flush.call_count, 1)
    self.assertEqual(succeeding.force_flush.call_count, 1)

def test_force_flush_processor_returns_none(self):
    """A processor returning None (the no-op default) must count as a
    successful flush, and subsequent processors must still run."""
    none_processor = Mock(spec=LogRecordProcessor)
    none_processor.force_flush = Mock(return_value=None)
    true_processor = Mock(spec=LogRecordProcessor)
    true_processor.force_flush = Mock(return_value=True)

    multi = SynchronousMultiLogRecordProcessor()
    multi.add_log_record_processor(none_processor)
    multi.add_log_record_processor(true_processor)

    self.assertTrue(multi.force_flush(50))
    self.assertEqual(none_processor.force_flush.call_count, 1)
    self.assertEqual(true_processor.force_flush.call_count, 1)


class TestConcurrentMultiLogRecordProcessor(
MultiLogRecordProcessorTestBase, unittest.TestCase
Expand Down Expand Up @@ -200,3 +230,39 @@ def delay(_):
for mock in mocks:
self.assertEqual(1, mock.force_flush.call_count)
multi_log_record_processor.shutdown()

def test_force_flush_processor_returns_false(self):
    """Concurrent flush: one False result fails the whole flush, yet
    every processor is still invoked exactly once."""
    failing = Mock(spec=LogRecordProcessor)
    failing.force_flush = Mock(return_value=False)
    processors = [failing] + [
        Mock(spec=LogRecordProcessor) for _ in range(4)
    ]

    multi = ConcurrentMultiLogRecordProcessor()
    for processor in processors:
        multi.add_log_record_processor(processor)

    self.assertFalse(multi.force_flush())
    for processor in processors:
        self.assertEqual(1, processor.force_flush.call_count)
    multi.shutdown()

def test_force_flush_processor_returns_none(self):
    """Concurrent flush: a None result (no-op default) is success; all
    processors are invoked exactly once."""
    none_processor = Mock(spec=LogRecordProcessor)
    none_processor.force_flush = Mock(return_value=None)
    processors = [none_processor] + [
        Mock(spec=LogRecordProcessor) for _ in range(4)
    ]

    multi = ConcurrentMultiLogRecordProcessor()
    for processor in processors:
        multi.add_log_record_processor(processor)

    self.assertTrue(multi.force_flush())
    for processor in processors:
        self.assertEqual(1, processor.force_flush.call_count)
    multi.shutdown()
48 changes: 47 additions & 1 deletion opentelemetry-sdk/tests/trace/test_span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,35 @@ def test_force_flush_late_by_span_processor(self):
flushed = multi_processor.force_flush(50)
self.assertFalse(flushed)
self.assertEqual(1, mock_processor1.force_flush.call_count)
self.assertEqual(0, mock_processor2.force_flush.call_count)
self.assertEqual(1, mock_processor2.force_flush.call_count)

def test_force_flush_processor_returns_none(self):
    """A span processor returning None counts as success, and later
    processors must still be flushed."""
    none_processor = mock.Mock(spec=trace.SpanProcessor)
    none_processor.force_flush = mock.Mock(return_value=None)
    true_processor = mock.Mock(spec=trace.SpanProcessor)
    true_processor.force_flush = mock.Mock(return_value=True)

    multi = trace.SynchronousMultiSpanProcessor()
    multi.add_span_processor(none_processor)
    multi.add_span_processor(true_processor)

    self.assertTrue(multi.force_flush(50))
    self.assertEqual(1, none_processor.force_flush.call_count)
    self.assertEqual(1, true_processor.force_flush.call_count)

def test_force_flush_default_processor(self):
    """The base SpanProcessor's no-op force_flush must not poison the
    multi-processor flush result."""
    multi = trace.SynchronousMultiSpanProcessor()
    multi.add_span_processor(trace.SpanProcessor())

    mocked = mock.Mock(spec=trace.SpanProcessor)
    mocked.force_flush = mock.Mock(return_value=True)
    multi.add_span_processor(mocked)

    self.assertTrue(multi.force_flush(50))
    self.assertEqual(1, mocked.force_flush.call_count)


class TestConcurrentMultiSpanProcessor(
Expand Down Expand Up @@ -488,6 +516,24 @@ def test_force_flush_late_by_span_processor(self):
self.assertEqual(1, mock_processor.force_flush.call_count)
multi_processor.shutdown()

def test_force_flush_processor_returns_none(self):
    """Concurrent flush: a None result is success; every processor is
    invoked exactly once."""
    none_processor = mock.Mock(spec=trace.SpanProcessor)
    none_processor.force_flush = mock.Mock(return_value=None)
    processors = [none_processor] + [
        mock.Mock(spec=trace.SpanProcessor) for _ in range(4)
    ]

    multi = trace.ConcurrentMultiSpanProcessor(5)
    for processor in processors:
        multi.add_span_processor(processor)

    self.assertTrue(multi.force_flush())
    for processor in processors:
        self.assertEqual(1, processor.force_flush.call_count)
    multi.shutdown()

def test_processor_gc(self):
multi_processor = trace.ConcurrentMultiSpanProcessor(5)
weak_ref = weakref.ref(multi_processor)
Expand Down