From bb028af265061659e093456fb7c6595e50a687de Mon Sep 17 00:00:00 2001 From: Achal Date: Mon, 9 Mar 2026 13:34:04 +0530 Subject: [PATCH 1/4] Add concurrency tests for meter/instrument methods --- .../tests/metrics/test_instruments.py | 44 +++++++++++++- opentelemetry-api/tests/metrics/test_meter.py | 59 ++++++++++++++++++- .../tests/metrics/test_meter_provider.py | 34 ++++++++++- 3 files changed, 131 insertions(+), 6 deletions(-) diff --git a/opentelemetry-api/tests/metrics/test_instruments.py b/opentelemetry-api/tests/metrics/test_instruments.py index 982cb6b6112..9c2371ad85d 100644 --- a/opentelemetry-api/tests/metrics/test_instruments.py +++ b/opentelemetry-api/tests/metrics/test_instruments.py @@ -13,6 +13,7 @@ # limitations under the License. # type: ignore +import threading from inspect import Signature, isabstract, signature from unittest import TestCase @@ -32,8 +33,6 @@ _Gauge, ) -# FIXME Test that the instrument methods can be called concurrently safely. - class ChildInstrument(Instrument): # pylint: disable=useless-parent-delegation @@ -724,3 +723,44 @@ def test_description_check(self): ], "", ) + + +class TestConcurrency(TestCase): + def _run_concurrently(self, fn, num_threads=10, calls_per_thread=100): + """Helper: run fn concurrently across threads and assert no exceptions.""" + errors = [] + + def worker(): + try: + for _ in range(calls_per_thread): + fn() + except Exception as exc: # pylint: disable=broad-except + errors.append(exc) + + threads = [threading.Thread(target=worker) for _ in range(num_threads)] + for t in threads: + t.start() + for t in threads: + t.join() + + self.assertEqual([], errors) + + def test_counter_add_concurrent(self): + """Test that Counter.add can be called concurrently safely.""" + counter = NoOpCounter("name") + self._run_concurrently(lambda: counter.add(1)) + + def test_up_down_counter_add_concurrent(self): + """Test that UpDownCounter.add can be called concurrently safely.""" + up_down_counter = NoOpUpDownCounter("name") + self._run_concurrently(lambda: up_down_counter.add(1)) + + def test_histogram_record_concurrent(self): + """Test that Histogram.record can be called concurrently safely.""" + histogram = NoOpHistogram("name") + self._run_concurrently(lambda: histogram.record(1)) + + def test_gauge_set_concurrent(self): + """Test that Gauge.set can be called concurrently safely.""" + gauge = NoOpMeter("name").create_gauge("name") + self._run_concurrently(lambda: gauge.set(1)) diff --git a/opentelemetry-api/tests/metrics/test_meter.py b/opentelemetry-api/tests/metrics/test_meter.py index 5a7ef3bc8b2..64b5c971309 100644 --- a/opentelemetry-api/tests/metrics/test_meter.py +++ b/opentelemetry-api/tests/metrics/test_meter.py @@ -13,14 +13,13 @@ # limitations under the License. # type: ignore +import threading from logging import WARNING from unittest import TestCase from unittest.mock import Mock, patch from opentelemetry.metrics import Meter, NoOpMeter -# FIXME Test that the meter methods can be called concurrently safely. - class ChildMeter(Meter): # pylint: disable=signature-differs @@ -195,3 +194,59 @@ def test_create_observable_up_down_counter(self): self.assertTrue( Meter.create_observable_up_down_counter.__isabstractmethod__ ) + + +class TestConcurrency(TestCase): + def _run_concurrently(self, fn, num_threads=10, calls_per_thread=100): + """Helper: run fn concurrently across threads and assert no exceptions.""" + errors = [] + + def worker(): + try: + for _ in range(calls_per_thread): + fn() + except Exception as exc: # pylint: disable=broad-except + errors.append(exc) + + threads = [threading.Thread(target=worker) for _ in range(num_threads)] + for t in threads: + t.start() + for t in threads: + t.join() + + self.assertEqual([], errors) + + def test_create_counter_concurrent(self): + """Test that Meter.create_counter can be called concurrently safely.""" + meter = NoOpMeter("name") + self._run_concurrently(lambda: meter.create_counter("counter")) + + def test_create_up_down_counter_concurrent(self): + """Test that Meter.create_up_down_counter can be called concurrently safely.""" + meter = NoOpMeter("name") + self._run_concurrently(lambda: meter.create_up_down_counter("up_down_counter")) + + def test_create_observable_counter_concurrent(self): + """Test that Meter.create_observable_counter can be called concurrently safely.""" + meter = NoOpMeter("name") + self._run_concurrently(lambda: meter.create_observable_counter("observable_counter", lambda options: [])) + + def test_create_histogram_concurrent(self): + """Test that Meter.create_histogram can be called concurrently safely.""" + meter = NoOpMeter("name") + self._run_concurrently(lambda: meter.create_histogram("histogram")) + + def test_create_gauge_concurrent(self): + """Test that Meter.create_gauge can be called concurrently safely.""" + meter = NoOpMeter("name") + self._run_concurrently(lambda: meter.create_gauge("gauge")) + + def test_create_observable_gauge_concurrent(self): + """Test that Meter.create_observable_gauge can be called concurrently safely.""" + meter = NoOpMeter("name") + self._run_concurrently(lambda: meter.create_observable_gauge("observable_gauge", lambda options: [])) + + def test_create_observable_up_down_counter_concurrent(self): + """Test that Meter.create_observable_up_down_counter can be called concurrently safely.""" + meter = NoOpMeter("name") + self._run_concurrently(lambda: meter.create_observable_up_down_counter("observable_up_down_counter", lambda options: [])) diff --git a/opentelemetry-api/tests/metrics/test_meter_provider.py b/opentelemetry-api/tests/metrics/test_meter_provider.py index dfaf94bcec2..56dd9f6b022 100644 --- a/opentelemetry-api/tests/metrics/test_meter_provider.py +++ b/opentelemetry-api/tests/metrics/test_meter_provider.py @@ -15,6 +15,7 @@ # pylint: disable=protected-access +import threading from unittest import TestCase from unittest.mock import Mock, patch @@ -48,8 +49,6 @@ reset_metrics_globals, ) -# FIXME Test that the instrument methods can be called concurrently safely. - @fixture def reset_meter_provider(): @@ -165,6 +164,37 @@ def test_get_meter_wrapper(self): self.assertEqual(meter.schema_url, "schema_url") +class TestConcurrency(TestCase): + def _run_concurrently(self, fn, num_threads=10, calls_per_thread=100): + """Helper: run fn concurrently across threads and assert no exceptions.""" + errors = [] + + def worker(): + try: + for _ in range(calls_per_thread): + fn() + except Exception as exc: # pylint: disable=broad-except + errors.append(exc) + + threads = [threading.Thread(target=worker) for _ in range(num_threads)] + for t in threads: + t.start() + for t in threads: + t.join() + + self.assertEqual([], errors) + + def test_no_op_meter_provider_get_meter_concurrent(self): + """Test that NoOpMeterProvider.get_meter can be called concurrently safely.""" + meter_provider = NoOpMeterProvider() + self._run_concurrently(lambda: meter_provider.get_meter("name")) + + def test_proxy_meter_provider_get_meter_concurrent(self): + """Test that _ProxyMeterProvider.get_meter can be called concurrently safely.""" + meter_provider = _ProxyMeterProvider() + self._run_concurrently(lambda: meter_provider.get_meter("name")) + + class TestProxy(MetricsGlobalsTest, TestCase): def test_global_proxy_meter_provider(self): # Global get_meter_provider() should initially be a _ProxyMeterProvider From 45c47b41e19b47455b375b59b08489099784ecdb Mon Sep 17 00:00:00 2001 From: Achal Date: Mon, 9 Mar 2026 14:17:10 +0530 Subject: [PATCH 2/4] Fix formatting issues with ruff-pre-commit --- opentelemetry-api/tests/metrics/test_meter.py | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/opentelemetry-api/tests/metrics/test_meter.py b/opentelemetry-api/tests/metrics/test_meter.py index 64b5c971309..95f15eb9138 100644 --- a/opentelemetry-api/tests/metrics/test_meter.py +++ b/opentelemetry-api/tests/metrics/test_meter.py @@ -224,12 +224,18 @@ def test_create_counter_concurrent(self): def test_create_up_down_counter_concurrent(self): """Test that Meter.create_up_down_counter can be called concurrently safely.""" meter = NoOpMeter("name") - self._run_concurrently(lambda: meter.create_up_down_counter("up_down_counter")) + self._run_concurrently( + lambda: meter.create_up_down_counter("up_down_counter") + ) def test_create_observable_counter_concurrent(self): """Test that Meter.create_observable_counter can be called concurrently safely.""" meter = NoOpMeter("name") - self._run_concurrently(lambda: meter.create_observable_counter("observable_counter", lambda options: [])) + self._run_concurrently( + lambda: meter.create_observable_counter( + "observable_counter", lambda options: [] + ) + ) def test_create_histogram_concurrent(self): """Test that Meter.create_histogram can be called concurrently safely.""" @@ -244,9 +250,17 @@ def test_create_gauge_concurrent(self): def test_create_observable_gauge_concurrent(self): """Test that Meter.create_observable_gauge can be called concurrently safely.""" meter = NoOpMeter("name") - self._run_concurrently(lambda: meter.create_observable_gauge("observable_gauge", lambda options: [])) + self._run_concurrently( + lambda: meter.create_observable_gauge( + "observable_gauge", lambda options: [] + ) + ) def test_create_observable_up_down_counter_concurrent(self): """Test that Meter.create_observable_up_down_counter can be called concurrently safely.""" meter = NoOpMeter("name") - self._run_concurrently(lambda: meter.create_observable_up_down_counter("observable_up_down_counter", lambda options: [])) + self._run_concurrently( + lambda: meter.create_observable_up_down_counter( + "observable_up_down_counter", lambda options: [] + ) + ) From d71b5d83d47bd2f85f547ec91be42f354e9e4783 Mon Sep 17 00:00:00 2001 From: Achal Date: Mon, 9 Mar 2026 14:33:07 +0530 Subject: [PATCH 3/4] Fix pylint invalid-name for thread variables --- opentelemetry-api/tests/metrics/test_instruments.py | 8 ++++---- opentelemetry-api/tests/metrics/test_meter.py | 8 ++++---- opentelemetry-api/tests/metrics/test_meter_provider.py | 8 ++++---- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/opentelemetry-api/tests/metrics/test_instruments.py b/opentelemetry-api/tests/metrics/test_instruments.py index 9c2371ad85d..a21ab76923f 100644 --- a/opentelemetry-api/tests/metrics/test_instruments.py +++ b/opentelemetry-api/tests/metrics/test_instruments.py @@ -738,10 +738,10 @@ def worker(): errors.append(exc) threads = [threading.Thread(target=worker) for _ in range(num_threads)] - for t in threads: - t.start() - for t in threads: - t.join() + for thread in threads: + thread.start() + for thread in threads: + thread.join() self.assertEqual([], errors) diff --git a/opentelemetry-api/tests/metrics/test_meter.py b/opentelemetry-api/tests/metrics/test_meter.py index 95f15eb9138..cb79ab462f4 100644 --- a/opentelemetry-api/tests/metrics/test_meter.py +++ b/opentelemetry-api/tests/metrics/test_meter.py @@ -209,10 +209,10 @@ def worker(): errors.append(exc) threads = [threading.Thread(target=worker) for _ in range(num_threads)] - for t in threads: - t.start() - for t in threads: - t.join() + for thread in threads: + thread.start() + for thread in threads: + thread.join() self.assertEqual([], errors) diff --git a/opentelemetry-api/tests/metrics/test_meter_provider.py b/opentelemetry-api/tests/metrics/test_meter_provider.py index 56dd9f6b022..b8b78412ace 100644 --- a/opentelemetry-api/tests/metrics/test_meter_provider.py +++ b/opentelemetry-api/tests/metrics/test_meter_provider.py @@ -177,10 +177,10 @@ def worker(): errors.append(exc) threads = [threading.Thread(target=worker) for _ in range(num_threads)] - for t in threads: - t.start() - for t in threads: - t.join() + for thread in threads: + thread.start() + for thread in threads: + thread.join() self.assertEqual([], errors) From 3fdef73f6da506c58428fa75fc8e1befe1b37b9e Mon Sep 17 00:00:00 2001 From: Achal Date: Fri, 13 Mar 2026 18:27:32 +0530 Subject: [PATCH 4/4] Update concurrency tests to use ConcurrencyTestBase and add result validation --- .../tests/metrics/test_instruments.py | 39 ++++------ opentelemetry-api/tests/metrics/test_meter.py | 71 +++++++++++-------- .../tests/metrics/test_meter_provider.py | 35 ++++----- 3 files changed, 68 insertions(+), 77 deletions(-) diff --git a/opentelemetry-api/tests/metrics/test_instruments.py b/opentelemetry-api/tests/metrics/test_instruments.py index a21ab76923f..4eaf004cb5f 100644 --- a/opentelemetry-api/tests/metrics/test_instruments.py +++ b/opentelemetry-api/tests/metrics/test_instruments.py @@ -13,7 +13,6 @@ # limitations under the License. # type: ignore -import threading from inspect import Signature, isabstract, signature from unittest import TestCase @@ -32,6 +31,7 @@ UpDownCounter, _Gauge, ) +from opentelemetry.test.concurrency_test import ConcurrencyTestBase class ChildInstrument(Instrument): @@ -725,42 +725,31 @@ def test_description_check(self): ) -class TestConcurrency(TestCase): - def _run_concurrently(self, fn, num_threads=10, calls_per_thread=100): - """Helper: run fn concurrently across threads and assert no exceptions.""" - errors = [] - - def worker(): - try: - for _ in range(calls_per_thread): - fn() - except Exception as exc: # pylint: disable=broad-except - errors.append(exc) - - threads = [threading.Thread(target=worker) for _ in range(num_threads)] - for thread in threads: - thread.start() - for thread in threads: - thread.join() - - self.assertEqual([], errors) - +class TestConcurrency(ConcurrencyTestBase): def test_counter_add_concurrent(self): """Test that Counter.add can be called concurrently safely.""" counter = NoOpCounter("name") - self._run_concurrently(lambda: counter.add(1)) + results = self.run_with_many_threads(lambda: counter.add(1)) + self.assertEqual(len(results), 100) + self.assertTrue(all(result is None for result in results)) def test_up_down_counter_add_concurrent(self): """Test that UpDownCounter.add can be called concurrently safely.""" up_down_counter = NoOpUpDownCounter("name") - self._run_concurrently(lambda: up_down_counter.add(1)) + results = self.run_with_many_threads(lambda: up_down_counter.add(1)) + self.assertEqual(len(results), 100) + self.assertTrue(all(result is None for result in results)) def test_histogram_record_concurrent(self): """Test that Histogram.record can be called concurrently safely.""" histogram = NoOpHistogram("name") - self._run_concurrently(lambda: histogram.record(1)) + results = self.run_with_many_threads(lambda: histogram.record(1)) + self.assertEqual(len(results), 100) + self.assertTrue(all(result is None for result in results)) def test_gauge_set_concurrent(self): """Test that Gauge.set can be called concurrently safely.""" gauge = NoOpMeter("name").create_gauge("name") - self._run_concurrently(lambda: gauge.set(1)) + results = self.run_with_many_threads(lambda: gauge.set(1)) + self.assertEqual(len(results), 100) + self.assertTrue(all(result is None for result in results)) diff --git a/opentelemetry-api/tests/metrics/test_meter.py b/opentelemetry-api/tests/metrics/test_meter.py index cb79ab462f4..e21b221efbe 100644 --- a/opentelemetry-api/tests/metrics/test_meter.py +++ b/opentelemetry-api/tests/metrics/test_meter.py @@ -13,12 +13,22 @@ # limitations under the License. # type: ignore -import threading from logging import WARNING from unittest import TestCase from unittest.mock import Mock, patch -from opentelemetry.metrics import Meter, NoOpMeter +from opentelemetry.metrics import ( + Counter, + Histogram, + Meter, + NoOpMeter, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, + _Gauge, +) +from opentelemetry.test.concurrency_test import ConcurrencyTestBase class ChildMeter(Meter): @@ -196,71 +206,74 @@ def test_create_observable_up_down_counter(self): ) -class TestConcurrency(TestCase): - def _run_concurrently(self, fn, num_threads=10, calls_per_thread=100): - """Helper: run fn concurrently across threads and assert no exceptions.""" - errors = [] - - def worker(): - try: - for _ in range(calls_per_thread): - fn() - except Exception as exc: # pylint: disable=broad-except - errors.append(exc) - - threads = [threading.Thread(target=worker) for _ in range(num_threads)] - for thread in threads: - thread.start() - for thread in threads: - thread.join() - - self.assertEqual([], errors) - +class TestConcurrency(ConcurrencyTestBase): def test_create_counter_concurrent(self): """Test that Meter.create_counter can be called concurrently safely.""" meter = NoOpMeter("name") - self._run_concurrently(lambda: meter.create_counter("counter")) + results = self.run_with_many_threads( + lambda: meter.create_counter("counter") + ) + self.assertEqual(len(results), 100) + self.assertTrue(all(isinstance(r, Counter) for r in results)) def test_create_up_down_counter_concurrent(self): """Test that Meter.create_up_down_counter can be called concurrently safely.""" meter = NoOpMeter("name") - self._run_concurrently( + results = self.run_with_many_threads( lambda: meter.create_up_down_counter("up_down_counter") ) + self.assertEqual(len(results), 100) + self.assertTrue(all(isinstance(r, UpDownCounter) for r in results)) def test_create_observable_counter_concurrent(self): """Test that Meter.create_observable_counter can be called concurrently safely.""" meter = NoOpMeter("name") - self._run_concurrently( + results = self.run_with_many_threads( lambda: meter.create_observable_counter( "observable_counter", lambda options: [] ) ) + self.assertEqual(len(results), 100) + self.assertTrue(all(isinstance(r, ObservableCounter) for r in results)) def test_create_histogram_concurrent(self): """Test that Meter.create_histogram can be called concurrently safely.""" meter = NoOpMeter("name") - self._run_concurrently(lambda: meter.create_histogram("histogram")) + results = self.run_with_many_threads( + lambda: meter.create_histogram("histogram") + ) + self.assertEqual(len(results), 100) + self.assertTrue(all(isinstance(r, Histogram) for r in results)) def test_create_gauge_concurrent(self): """Test that Meter.create_gauge can be called concurrently safely.""" meter = NoOpMeter("name") - self._run_concurrently(lambda: meter.create_gauge("gauge")) + results = self.run_with_many_threads( + lambda: meter.create_gauge("gauge") + ) + self.assertEqual(len(results), 100) + self.assertTrue(all(isinstance(r, _Gauge) for r in results)) def test_create_observable_gauge_concurrent(self): """Test that Meter.create_observable_gauge can be called concurrently safely.""" meter = NoOpMeter("name") - self._run_concurrently( + results = self.run_with_many_threads( lambda: meter.create_observable_gauge( "observable_gauge", lambda options: [] ) ) + self.assertEqual(len(results), 100) + self.assertTrue(all(isinstance(r, ObservableGauge) for r in results)) def test_create_observable_up_down_counter_concurrent(self): """Test that Meter.create_observable_up_down_counter can be called concurrently safely.""" meter = NoOpMeter("name") - self._run_concurrently( + results = self.run_with_many_threads( lambda: meter.create_observable_up_down_counter( "observable_up_down_counter", lambda options: [] ) ) + self.assertEqual(len(results), 100) + self.assertTrue( + all(isinstance(r, ObservableUpDownCounter) for r in results) + ) diff --git a/opentelemetry-api/tests/metrics/test_meter_provider.py b/opentelemetry-api/tests/metrics/test_meter_provider.py index b8b78412ace..171911fd7de 100644 --- a/opentelemetry-api/tests/metrics/test_meter_provider.py +++ b/opentelemetry-api/tests/metrics/test_meter_provider.py @@ -15,7 +15,6 @@ # pylint: disable=protected-access -import threading from unittest import TestCase from unittest.mock import Mock, patch @@ -44,6 +43,7 @@ _ProxyObservableUpDownCounter, _ProxyUpDownCounter, ) +from opentelemetry.test.concurrency_test import ConcurrencyTestBase from opentelemetry.test.globals_test import ( MetricsGlobalsTest, reset_metrics_globals, @@ -164,35 +164,24 @@ def test_get_meter_wrapper(self): self.assertEqual(meter.schema_url, "schema_url") -class TestConcurrency(TestCase): - def _run_concurrently(self, fn, num_threads=10, calls_per_thread=100): - """Helper: run fn concurrently across threads and assert no exceptions.""" - errors = [] - - def worker(): - try: - for _ in range(calls_per_thread): - fn() - except Exception as exc: # pylint: disable=broad-except - errors.append(exc) - - threads = [threading.Thread(target=worker) for _ in range(num_threads)] - for thread in threads: - thread.start() - for thread in threads: - thread.join() - - self.assertEqual([], errors) - +class TestConcurrency(ConcurrencyTestBase): def test_no_op_meter_provider_get_meter_concurrent(self): """Test that NoOpMeterProvider.get_meter can be called concurrently safely.""" meter_provider = NoOpMeterProvider() - self._run_concurrently(lambda: meter_provider.get_meter("name")) + results = self.run_with_many_threads( + lambda: meter_provider.get_meter("name") + ) + self.assertEqual(len(results), 100) + self.assertTrue(all(isinstance(r, NoOpMeter) for r in results)) def test_proxy_meter_provider_get_meter_concurrent(self): """Test that _ProxyMeterProvider.get_meter can be called concurrently safely.""" meter_provider = _ProxyMeterProvider() - self._run_concurrently(lambda: meter_provider.get_meter("name")) + results = self.run_with_many_threads( + lambda: meter_provider.get_meter("name") + ) + self.assertEqual(len(results), 100) + self.assertTrue(all(isinstance(r, _ProxyMeter) for r in results)) class TestProxy(MetricsGlobalsTest, TestCase):