diff --git a/CHANGELOG.md b/CHANGELOG.md index bfed95eb4aa..7ebab5ba63e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,8 +18,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4958](https://github.com/open-telemetry/opentelemetry-python/pull/4958)) - `opentelemetry-sdk`: fix type annotations on `MetricReader` and related types ([#4938](https://github.com/open-telemetry/opentelemetry-python/pull/4938/)) -- Implement log creation metric +- `opentelemetry-sdk`: implement log creation metric ([#4935](https://github.com/open-telemetry/opentelemetry-python/pull/4935)) +- `opentelemetry-sdk`: implement metric reader metrics + ([#4970](https://github.com/open-telemetry/opentelemetry-python/pull/4970)) - `opentelemetry-sdk`: upgrade vendored OTel configuration schema from v1.0.0-rc.3 to v1.0.0 ([#4965](https://github.com/open-telemetry/opentelemetry-python/pull/4965)) - `opentelemetry-exporter-prometheus`: Fix metric name prefix diff --git a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py index 1ccc6d7291a..3748ee91964 100644 --- a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py @@ -105,6 +105,9 @@ MetricsData, Sum, ) +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OtelComponentTypeValues, +) from opentelemetry.util.types import Attributes _logger = getLogger(__name__) @@ -142,7 +145,8 @@ def __init__( ObservableCounter: AggregationTemporality.CUMULATIVE, ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, ObservableGauge: AggregationTemporality.CUMULATIVE, - } + }, + otel_component_type=OtelComponentTypeValues.PROMETHEUS_HTTP_TEXT_METRIC_EXPORTER.value, ) self._collector = _CustomCollector( disable_target_info=disable_target_info, prefix=prefix diff --git a/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py b/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py index eacbc845a32..26770c9e1f4 100644 --- a/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py +++ b/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py @@ -27,6 +27,7 @@ PrometheusMetricReader, _CustomCollector, ) +from opentelemetry.metrics import NoOpMeterProvider from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import ( AggregationTemporality, @@ -332,6 +333,8 @@ def test_check_value(self): def test_multiple_collection_calls(self): metric_reader = PrometheusMetricReader() provider = MeterProvider(metric_readers=[metric_reader]) + # Disable SDK metrics since they are not constant across collections + metric_reader._set_meter_provider(NoOpMeterProvider()) meter = provider.get_meter("getting-started", "0.1.2") counter = meter.create_counter("counter") counter.add(1) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index 1ffcd5a14c5..89bef8edbb6 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -458,6 +458,7 @@ def __init__( metric_reader._set_collect_callback( self._measurement_consumer.collect ) + metric_reader._set_meter_provider(self) def force_flush(self, timeout_millis: float = 10_000) -> bool: deadline_ns = time_ns() + timeout_millis * 10**6 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index f878755be86..d395270461d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -22,7 +22,7 @@ from os import environ, linesep from sys import stdout from threading import Event, Lock, RLock, Thread -from time import time_ns +from time import perf_counter, time_ns from typing import IO, Callable, Iterable, Optional from typing_extensions import final @@ -35,6 +35,7 @@ detach, set_value, ) +from opentelemetry.metrics import MeterProvider, NoOpMeterProvider from opentelemetry.sdk.environment_variables import ( OTEL_METRIC_EXPORT_INTERVAL, OTEL_METRIC_EXPORT_TIMEOUT, @@ -61,8 +62,13 @@ _UpDownCounter, ) from opentelemetry.sdk.metrics._internal.point import MetricsData +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OtelComponentTypeValues, +) from opentelemetry.util._once import Once +from ._metric_reader_metrics import MetricReaderMetrics + _logger = getLogger(__name__) @@ -220,6 +226,8 @@ def __init__( type, "opentelemetry.sdk.metrics.view.Aggregation" ] | None = None, + *, + otel_component_type: str | None = None, ) -> None: self._collect: Callable[ [ @@ -318,6 +326,13 @@ def __init__( else: raise Exception(f"Invalid instrument class found {typ}") + self._otel_component_type = ( + otel_component_type or type(self).__qualname__ + ) + self._metrics = MetricReaderMetrics( + self._otel_component_type, NoOpMeterProvider() + ) + @final def collect(self, timeout_millis: float = 10_000) -> None: """Collects the metrics from the internal SDK state and @@ -337,7 +352,11 @@ def collect(self, timeout_millis: float = 10_000) -> None: ) return - metrics = self._collect(self, timeout_millis=timeout_millis) + start_time = perf_counter() + try: + metrics = self._collect(self, timeout_millis=timeout_millis) + finally: + self._metrics.record_collection(perf_counter() - start_time) if metrics is not None: self._receive_metrics( @@ -368,6 +387,11 @@ def _receive_metrics( ) -> None: """Called by `MetricReader.collect` when it receives a batch of metrics""" + def _set_meter_provider(self, meter_provider: MeterProvider) -> None: + self._metrics = MetricReaderMetrics( + self._otel_component_type, meter_provider + ) + def force_flush(self, timeout_millis: float = 10_000) -> bool: self.collect(timeout_millis=timeout_millis) return True @@ -451,6 +475,7 @@ def __init__( super().__init__( preferred_temporality=exporter._preferred_temporality, preferred_aggregation=exporter._preferred_aggregation, + otel_component_type=OtelComponentTypeValues.PERIODIC_METRIC_READER.value, ) # This lock is held whenever calling self._exporter.export() to prevent concurrent diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/_metric_reader_metrics.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/_metric_reader_metrics.py new file mode 100644 index 00000000000..435d9c2da7b --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/_metric_reader_metrics.py @@ -0,0 +1,34 @@ +from collections import Counter + +from opentelemetry.metrics import MeterProvider +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OTEL_COMPONENT_NAME, + OTEL_COMPONENT_TYPE, +) +from opentelemetry.semconv._incubating.metrics.otel_metrics import ( + create_otel_sdk_metric_reader_collection_duration, +) + +_component_counter = Counter() + + +class MetricReaderMetrics: + def __init__( + self, component_type: str, meter_provider: MeterProvider + ) -> None: + meter = meter_provider.get_meter("opentelemetry-sdk") + + count = _component_counter[component_type] + _component_counter[component_type] = count + 1 + + self._standard_attrs = { + OTEL_COMPONENT_TYPE: component_type, + OTEL_COMPONENT_NAME: f"{component_type}/{count}", + } + + self._collection_duration = ( + create_otel_sdk_metric_reader_collection_duration(meter) + ) + + def record_collection(self, duration: float) -> None: + self._collection_duration.record(duration, self._standard_attrs) diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_explicit_bucket_histogram_aggregation.py b/opentelemetry-sdk/tests/metrics/integration_test/test_explicit_bucket_histogram_aggregation.py index 05ccd1469c9..8aa2321cb47 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_explicit_bucket_histogram_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_explicit_bucket_histogram_aggregation.py @@ -18,6 +18,7 @@ from pytest import mark +from opentelemetry.metrics import NoOpMeterProvider from opentelemetry.sdk.metrics import Histogram, MeterProvider from opentelemetry.sdk.metrics.export import ( AggregationTemporality, @@ -46,6 +47,9 @@ def test_synchronous_delta_temporality(self): ) provider = MeterProvider(metric_readers=[reader]) + # Disable SDK metrics + # pylint: disable=protected-access + reader._set_meter_provider(NoOpMeterProvider()) meter = provider.get_meter("name", "version") histogram = meter.create_histogram("histogram") @@ -159,7 +163,7 @@ def test_synchronous_delta_temporality(self): provider.shutdown() @mark.skipif( - system() != "Linux", + system() == "Windows", reason=( "Tests fail because Windows time_ns resolution is too low so " "two different time measurements may end up having the exact same" @@ -177,6 +181,9 @@ def test_synchronous_cumulative_temporality(self): ) provider = MeterProvider(metric_readers=[reader]) + # Disable SDK metrics + # pylint: disable=protected-access + reader._set_meter_provider(NoOpMeterProvider()) meter = provider.get_meter("name", "version") histogram = meter.create_histogram("histogram") diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_exponential_bucket_histogram.py b/opentelemetry-sdk/tests/metrics/integration_test/test_exponential_bucket_histogram.py index fa44cc6ce50..2e066e102da 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_exponential_bucket_histogram.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_exponential_bucket_histogram.py @@ -18,6 +18,7 @@ from pytest import mark +from opentelemetry.metrics import NoOpMeterProvider from opentelemetry.sdk.metrics import Histogram, MeterProvider from opentelemetry.sdk.metrics.export import ( AggregationTemporality, @@ -57,6 +58,9 @@ def test_synchronous_delta_temporality(self): ) provider = MeterProvider(metric_readers=[reader]) + # Disable SDK metrics + # pylint: disable=protected-access + reader._set_meter_provider(NoOpMeterProvider()) meter = provider.get_meter("name", "version") histogram = meter.create_histogram("histogram") @@ -191,6 +195,9 @@ def test_synchronous_cumulative_temporality(self): ) provider = MeterProvider(metric_readers=[reader]) + # Disable SDK metrics + # pylint: disable=protected-access + reader._set_meter_provider(NoOpMeterProvider()) meter = provider.get_meter("name", "version") histogram = meter.create_histogram("histogram") diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_sum_aggregation.py b/opentelemetry-sdk/tests/metrics/integration_test/test_sum_aggregation.py index b876ac99064..61371416508 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_sum_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_sum_aggregation.py @@ -21,7 +21,7 @@ from pytest import mark from opentelemetry.context import Context -from opentelemetry.metrics import Observation +from opentelemetry.metrics import NoOpMeterProvider, Observation from opentelemetry.sdk.metrics import Counter, MeterProvider, ObservableCounter from opentelemetry.sdk.metrics._internal.exemplar import AlwaysOnExemplarFilter from opentelemetry.sdk.metrics.export import ( @@ -33,7 +33,7 @@ class TestSumAggregation(TestCase): @mark.skipif( - system() != "Linux", + system() == "Windows", reason=( "Tests fail because Windows time_ns resolution is too low so " "two different time measurements may end up having the exact same" @@ -68,6 +68,9 @@ def observable_counter_callback(callback_options): ) provider = MeterProvider(metric_readers=[reader]) + # Disable SDK metrics + # pylint: disable=protected-access + reader._set_meter_provider(NoOpMeterProvider()) meter = provider.get_meter("name", "version") meter.create_observable_counter( @@ -156,7 +159,7 @@ def observable_counter_callback(callback_options): self.assertIsNone(metrics_data) @mark.skipif( - system() != "Linux", + system() == "Windows", reason=( "Tests fail because Windows time_ns resolution is too low so " "two different time measurements may end up having the exact same" @@ -191,6 +194,9 @@ def observable_counter_callback(callback_options): ) provider = MeterProvider(metric_readers=[reader]) + # Disable SDK metrics + # pylint: disable=protected-access + reader._set_meter_provider(NoOpMeterProvider()) meter = provider.get_meter("name", "version") meter.create_observable_counter( @@ -251,7 +257,7 @@ def observable_counter_callback(callback_options): self.assertIsNone(metrics_data) @mark.skipif( - system() != "Linux", + system() == "Windows", reason=( "Tests fail because Windows time_ns resolution is too low so " "two different time measurements may end up having the exact same" @@ -267,6 +273,9 @@ def test_synchronous_delta_temporality(self): ) provider = MeterProvider(metric_readers=[reader]) + # Disable SDK metrics + # pylint: disable=protected-access + reader._set_meter_provider(NoOpMeterProvider()) meter = provider.get_meter("name", "version") counter = meter.create_counter("counter") @@ -378,7 +387,7 @@ def test_synchronous_delta_temporality(self): provider.shutdown() @mark.skipif( - system() != "Linux", + system() == "Windows", reason=( "Tests fail because Windows time_ns resolution is too low so " "two different time measurements may end up having the exact same" @@ -394,6 +403,9 @@ def test_synchronous_cumulative_temporality(self): ) provider = MeterProvider(metric_readers=[reader]) + # Disable SDK metrics + # pylint: disable=protected-access + reader._set_meter_provider(NoOpMeterProvider()) meter = provider.get_meter("name", "version") counter = meter.create_counter("counter") diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index 2caa2fad11f..3e47e577689 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -19,14 +19,19 @@ import weakref from logging import WARNING from time import sleep, time_ns -from typing import Optional +from typing import Optional, cast from unittest.mock import Mock import pytest -from opentelemetry.sdk.metrics import Counter, MetricsTimeoutError +from opentelemetry.sdk.metrics import ( + Counter, + MeterProvider, + MetricsTimeoutError, +) from opentelemetry.sdk.metrics._internal import _Counter from opentelemetry.sdk.metrics._internal.point import ( + HistogramDataPoint, MetricsData, ResourceMetrics, ScopeMetrics, @@ -309,3 +314,49 @@ def test_metric_exporer_gc(self): weak_ref(), "The PeriodicExportingMetricReader object created by this test wasn't garbage collected", ) + + def test_metric_reader_metrics(self): + exporter = FakeMetricsExporter() + pmr = PeriodicExportingMetricReader( + exporter, export_interval_millis=100000 + ) + mp = MeterProvider(metric_readers=[pmr]) + + counter = mp.get_meter("test").create_counter("test_counter") + counter.add(1) + + mp.force_flush() + self.assertEqual(len(exporter.metrics), 1) + # Need a second collection to get the metric we recorded during first collection + exporter.metrics.clear() + mp.force_flush() + self.assertEqual(len(exporter.metrics), 1) + metric_data = exporter.metrics[0] + + scope_metrics = [ + sm + for sm in metric_data.resource_metrics[0].scope_metrics + if sm.scope.name == "opentelemetry-sdk" + ] + self.assertEqual(len(scope_metrics), 1) + reader_metrics = [ + m + for m in scope_metrics[0].metrics + if m.name == "otel.sdk.metric_reader.collection.duration" + ] + self.assertEqual(len(reader_metrics), 1) + metric = reader_metrics[0] + + point = metric.data.data_points[0] + histogram = cast(HistogramDataPoint, point) + self.assertEqual(histogram.count, 1) + attrs = histogram.attributes + assert attrs is not None + self.assertEqual( + attrs["otel.component.type"], "periodic_metric_reader" + ) + name = attrs["otel.component.name"] + assert isinstance(name, str) + self.assertTrue(name.startswith("periodic_metric_reader/")) + + mp.shutdown()