diff --git a/CHANGELOG.md b/CHANGELOG.md index abf32976e9..3b1127a91e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- `opentelemetry-sdk`: fix multi-processor `force_flush` skipping remaining processors when one returns `None` + ([#5179](https://github.com/open-telemetry/opentelemetry-python/pull/5179)) - `opentelemetry-api`: Enforce W3C Baggage size limits on outbound propagation in `W3CBaggagePropagator.inject()`. Previously only inbound extraction enforced limits; now inject also caps entries at 180, individual pairs at 4096 bytes, and total header at 8192 bytes per the W3C Baggage spec. The extract path max_pairs limit now counts all size-valid entries rather than only successfully parsed ones. ([#5163](https://github.com/open-telemetry/opentelemetry-python/pull/5163)) - `opentelemetry-sdk`: add `additional_properties` support to generated config models via custom `datamodel-codegen` template, enabling plugin/custom component names to flow through typed dataclasses diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py index 956d9f28bd..9e8007fc41 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py @@ -415,15 +415,16 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: False otherwise. """ deadline_ns = time_ns() + timeout_millis * 1000000 + all_flushed = True for lp in self._log_record_processors: current_ts = time_ns() if current_ts >= deadline_ns: return False - if not lp.force_flush((deadline_ns - current_ts) // 1000000): - return False + if lp.force_flush((deadline_ns - current_ts) // 1000000) is False: + all_flushed = False - return True + return all_flushed class ConcurrentMultiLogRecordProcessor(LogRecordProcessor): @@ -497,7 +498,7 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: return False for future in done_futures: - if not future.result(): + if future.result() is False: return False return True diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index 18fced7061..f3a2410f25 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -141,7 +141,7 @@ def on_end(self, span: "ReadableSpan") -> None: def shutdown(self) -> None: """Called when a :class:`opentelemetry.sdk.trace.TracerProvider` is shutdown.""" - def force_flush(self, timeout_millis: int = 30000) -> bool: # type: ignore[reportReturnType] + def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=no-self-use """Export all ended spans to the configured Exporter that have not yet been exported. @@ -152,6 +152,7 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: # type: ignore[repo Returns: False if the timeout is exceeded, True otherwise. """ + return True # Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved @@ -214,15 +215,19 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: given timeout, False otherwise. """ deadline_ns = time_ns() + timeout_millis * 1000000 + all_flushed = True for sp in self._span_processors: current_time_ns = time_ns() if current_time_ns >= deadline_ns: return False - if not sp.force_flush((deadline_ns - current_time_ns) // 1000000): - return False + if ( + sp.force_flush((deadline_ns - current_time_ns) // 1000000) + is False + ): + all_flushed = False - return True + return all_flushed class ConcurrentMultiSpanProcessor(SpanProcessor): @@ -325,7 +330,7 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: return False for future in done_futures: - if not future.result(): + if future.result() is False: return False return True diff --git a/opentelemetry-sdk/tests/logs/test_multi_log_processor.py b/opentelemetry-sdk/tests/logs/test_multi_log_processor.py index 05fc0fe03d..5642d35885 100644 --- a/opentelemetry-sdk/tests/logs/test_multi_log_processor.py +++ b/opentelemetry-sdk/tests/logs/test_multi_log_processor.py @@ -172,6 +172,36 @@ def delay(_): self.assertEqual(mock_processor1.force_flush.call_count, 1) self.assertEqual(mock_processor2.force_flush.call_count, 0) + def test_force_flush_processor_returns_false(self): + multi_log_record_processor = SynchronousMultiLogRecordProcessor() + + mock_processor1 = Mock(spec=LogRecordProcessor) + mock_processor1.force_flush = Mock(return_value=False) + multi_log_record_processor.add_log_record_processor(mock_processor1) + mock_processor2 = Mock(spec=LogRecordProcessor) + mock_processor2.force_flush = Mock(return_value=True) + multi_log_record_processor.add_log_record_processor(mock_processor2) + + ret_value = multi_log_record_processor.force_flush(50) + self.assertFalse(ret_value) + self.assertEqual(mock_processor1.force_flush.call_count, 1) + self.assertEqual(mock_processor2.force_flush.call_count, 1) + + def test_force_flush_processor_returns_none(self): + multi_log_record_processor = SynchronousMultiLogRecordProcessor() + + mock_processor1 = Mock(spec=LogRecordProcessor) + mock_processor1.force_flush = Mock(return_value=None) + multi_log_record_processor.add_log_record_processor(mock_processor1) + mock_processor2 = Mock(spec=LogRecordProcessor) + mock_processor2.force_flush = Mock(return_value=True) + multi_log_record_processor.add_log_record_processor(mock_processor2) + + ret_value = multi_log_record_processor.force_flush(50) + self.assertTrue(ret_value) + self.assertEqual(mock_processor1.force_flush.call_count, 1) + self.assertEqual(mock_processor2.force_flush.call_count, 1) + class TestConcurrentMultiLogRecordProcessor( MultiLogRecordProcessorTestBase, unittest.TestCase @@ -200,3 +230,39 @@ def delay(_): for mock in mocks: self.assertEqual(1, mock.force_flush.call_count) multi_log_record_processor.shutdown() + + def test_force_flush_processor_returns_false(self): + multi_log_record_processor = ConcurrentMultiLogRecordProcessor() + + false_mock = Mock(spec=LogRecordProcessor) + false_mock.force_flush = Mock(return_value=False) + mocks = [Mock(spec=LogRecordProcessor) for _ in range(4)] + mocks.insert(0, false_mock) + + for mock_processor in mocks: + multi_log_record_processor.add_log_record_processor(mock_processor) + + ret_value = multi_log_record_processor.force_flush() + + self.assertFalse(ret_value) + for mock in mocks: + self.assertEqual(1, mock.force_flush.call_count) + multi_log_record_processor.shutdown() + + def test_force_flush_processor_returns_none(self): + multi_log_record_processor = ConcurrentMultiLogRecordProcessor() + + none_mock = Mock(spec=LogRecordProcessor) + none_mock.force_flush = Mock(return_value=None) + mocks = [Mock(spec=LogRecordProcessor) for _ in range(4)] + mocks.insert(0, none_mock) + + for mock_processor in mocks: + multi_log_record_processor.add_log_record_processor(mock_processor) + + ret_value = multi_log_record_processor.force_flush() + + self.assertTrue(ret_value) + for mock in mocks: + self.assertEqual(1, mock.force_flush.call_count) + multi_log_record_processor.shutdown() diff --git a/opentelemetry-sdk/tests/trace/test_span_processor.py b/opentelemetry-sdk/tests/trace/test_span_processor.py index c751ab96d9..be1f239fa2 100644 --- a/opentelemetry-sdk/tests/trace/test_span_processor.py +++ b/opentelemetry-sdk/tests/trace/test_span_processor.py @@ -435,7 +435,35 @@ def test_force_flush_late_by_span_processor(self): flushed = multi_processor.force_flush(50) self.assertFalse(flushed) self.assertEqual(1, mock_processor1.force_flush.call_count) - self.assertEqual(0, mock_processor2.force_flush.call_count) + self.assertEqual(1, mock_processor2.force_flush.call_count) + + def test_force_flush_processor_returns_none(self): + multi_processor = trace.SynchronousMultiSpanProcessor() + + mock_processor1 = mock.Mock(spec=trace.SpanProcessor) + mock_processor1.force_flush = mock.Mock(return_value=None) + multi_processor.add_span_processor(mock_processor1) + mock_processor2 = mock.Mock(spec=trace.SpanProcessor) + mock_processor2.force_flush = mock.Mock(return_value=True) + multi_processor.add_span_processor(mock_processor2) + + flushed = multi_processor.force_flush(50) + self.assertTrue(flushed) + self.assertEqual(1, mock_processor1.force_flush.call_count) + self.assertEqual(1, mock_processor2.force_flush.call_count) + + def test_force_flush_default_processor(self): + multi_processor = trace.SynchronousMultiSpanProcessor() + + default_processor = trace.SpanProcessor() + multi_processor.add_span_processor(default_processor) + mock_processor = mock.Mock(spec=trace.SpanProcessor) + mock_processor.force_flush = mock.Mock(return_value=True) + multi_processor.add_span_processor(mock_processor) + + flushed = multi_processor.force_flush(50) + self.assertTrue(flushed) + self.assertEqual(1, mock_processor.force_flush.call_count) class TestConcurrentMultiSpanProcessor( @@ -488,6 +516,24 @@ def test_force_flush_late_by_span_processor(self): self.assertEqual(1, mock_processor.force_flush.call_count) multi_processor.shutdown() + def test_force_flush_processor_returns_none(self): + multi_processor = trace.ConcurrentMultiSpanProcessor(5) + + none_mock = mock.Mock(spec=trace.SpanProcessor) + none_mock.force_flush = mock.Mock(return_value=None) + mocks = [mock.Mock(spec=trace.SpanProcessor) for _ in range(0, 4)] + mocks.insert(0, none_mock) + + for mock_processor in mocks: + multi_processor.add_span_processor(mock_processor) + + flushed = multi_processor.force_flush() + + self.assertTrue(flushed) + for mock_processor in mocks: + self.assertEqual(1, mock_processor.force_flush.call_count) + multi_processor.shutdown() + def test_processor_gc(self): multi_processor = trace.ConcurrentMultiSpanProcessor(5) weak_ref = weakref.ref(multi_processor)