diff --git a/CHANGELOG.md b/CHANGELOG.md index bfed95eb4aa..7852e054d00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4935](https://github.com/open-telemetry/opentelemetry-python/pull/4935)) - `opentelemetry-sdk`: upgrade vendored OTel configuration schema from v1.0.0-rc.3 to v1.0.0 ([#4965](https://github.com/open-telemetry/opentelemetry-python/pull/4965)) +- `opentelemetry-sdk`: implement exporter metrics + ([#4976](https://github.com/open-telemetry/opentelemetry-python/pull/4976)) - `opentelemetry-exporter-prometheus`: Fix metric name prefix ([#4895](https://github.com/open-telemetry/opentelemetry-python/pull/4895)) diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py new file mode 100644 index 00000000000..c3c605f31c8 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py @@ -0,0 +1,121 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from collections import Counter +from time import perf_counter +from typing import TYPE_CHECKING, Callable + +from opentelemetry.metrics import MeterProvider, get_meter_provider +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OTEL_COMPONENT_NAME, + OTEL_COMPONENT_TYPE, +) +from opentelemetry.semconv._incubating.metrics.otel_metrics import ( + create_otel_sdk_exporter_log_exported, + create_otel_sdk_exporter_log_inflight, + create_otel_sdk_exporter_metric_data_point_exported, + create_otel_sdk_exporter_metric_data_point_inflight, + create_otel_sdk_exporter_operation_duration, + create_otel_sdk_exporter_span_exported, + create_otel_sdk_exporter_span_inflight, +) +from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE +from opentelemetry.semconv.attributes.server_attributes import ( + SERVER_ADDRESS, + SERVER_PORT, +) + +if TYPE_CHECKING: + from typing import Literal + from urllib.parse import ParseResult as UrlParseResult + + from opentelemetry.util.types import Attributes, AttributeValue + +_component_counter = Counter() + + +class ExporterMetrics: + def __init__( + self, + component_type: str, + signal: Literal["span", "log", "metric_data_point"], + endpoint: UrlParseResult, + meter_provider: MeterProvider | None, + ) -> None: + if signal == "span": + create_exported = create_otel_sdk_exporter_span_exported + create_inflight = create_otel_sdk_exporter_span_inflight + elif signal == "log": + create_exported = create_otel_sdk_exporter_log_exported + create_inflight = create_otel_sdk_exporter_log_inflight + else: + create_exported = ( + create_otel_sdk_exporter_metric_data_point_exported + ) + create_inflight = ( + create_otel_sdk_exporter_metric_data_point_inflight + ) + + port = endpoint.port + if port is None: + if endpoint.scheme == "https": + port = 443 + elif endpoint.scheme == "http": + port = 80 + + count = _component_counter[component_type] + _component_counter[component_type] = count + 1 + self._standard_attrs: dict[str, AttributeValue] = { + OTEL_COMPONENT_TYPE: component_type, + OTEL_COMPONENT_NAME: f"{component_type}/{count}", + } + if endpoint.hostname: + self._standard_attrs[SERVER_ADDRESS] = endpoint.hostname + if port is not None: + self._standard_attrs[SERVER_PORT] = port + + meter_provider = meter_provider or get_meter_provider() + meter = meter_provider.get_meter("opentelemetry-sdk") + self._inflight = create_inflight(meter) + self._exported = create_exported(meter) + self._duration = create_otel_sdk_exporter_operation_duration(meter) + + def start_export( + self, num_items: int + ) -> Callable[[Exception | None, Attributes], None]: + start_time = perf_counter() + self._inflight.add(num_items, self._standard_attrs) + + def finish_export( + error: Exception | None, + error_attrs: Attributes, + ): + end_time = perf_counter() + self._inflight.add(-num_items, self._standard_attrs) + exported_attrs = ( + {**self._standard_attrs, ERROR_TYPE: type(error).__qualname__} + if error + else self._standard_attrs + ) + self._exported.add(num_items, exported_attrs) + duration_attrs = ( + {**exported_attrs, **error_attrs} + if error_attrs + else exported_attrs + ) + self._duration.record(end_time - start_time, duration_attrs) + + return finish_export diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py index 63d8ac9cfb0..a86af8dfbca 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py @@ -22,6 +22,7 @@ _get_credentials, environ_to_compression, ) +from opentelemetry.metrics import MeterProvider from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( ExportLogsServiceRequest, ) @@ -44,6 +45,9 @@ OTEL_EXPORTER_OTLP_LOGS_INSECURE, OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, ) +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OtelComponentTypeValues, +) class OTLPLogExporter( @@ -66,6 +70,8 @@ def __init__( timeout: Optional[float] = None, compression: Optional[Compression] = None, channel_options: Optional[Tuple[Tuple[str, str]]] = None, + *, + meter_provider: Optional[MeterProvider] = None, ): insecure_logs = environ.get(OTEL_EXPORTER_OTLP_LOGS_INSECURE) if insecure is None and insecure_logs is not None: @@ -105,6 +111,9 @@ def __init__( stub=LogsServiceStub, result=LogRecordExportResult, channel_options=channel_options, + component_type=OtelComponentTypeValues.OTLP_GRPC_LOG_EXPORTER.value, + signal="log", + meter_provider=meter_provider, ) def _translate_data( @@ -112,6 +121,9 @@ def _translate_data( ) -> ExportLogsServiceRequest: return encode_logs(data) + def _count_data(self, data: Sequence[ReadableLogRecord]): + return len(data) + def export( # type: ignore [reportIncompatibleMethodOverride] self, batch: Sequence[ReadableLogRecord], diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 89c2608c30a..5a3f690141b 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -56,12 +56,16 @@ secure_channel, ssl_channel_credentials, ) +from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( + ExporterMetrics, +) from opentelemetry.exporter.otlp.proto.common._internal import ( _get_resource_data, ) from opentelemetry.exporter.otlp.proto.grpc import ( _OTLP_GRPC_CHANNEL_OPTIONS, ) +from opentelemetry.metrics import MeterProvider from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( ExportLogsServiceRequest, ) @@ -104,6 +108,9 @@ from opentelemetry.sdk.resources import Resource as SDKResource from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExportResult +from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( + RPC_RESPONSE_STATUS_CODE, +) from opentelemetry.util._importlib_metadata import entry_points from opentelemetry.util.re import parse_env_headers @@ -299,6 +306,10 @@ def __init__( timeout: Optional[float] = None, compression: Optional[Compression] = None, channel_options: Optional[Tuple[Tuple[str, str]]] = None, + *, + component_type: Optional[str] = None, + signal: Literal["span", "log", "metric_data_point"] = "span", + meter_provider: Optional[MeterProvider] = None, ): super().__init__() self._result = result @@ -372,6 +383,16 @@ def __init__( OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, ) + self._component_type = component_type or type(self).__qualname__ + self._signal: Literal["span", "log", "metric_data_point"] = signal + self._parsed_url = parsed_url + self._metrics = ExporterMetrics( + self._component_type, + signal, + parsed_url, + meter_provider, + ) + self._initialize_channel_and_stub() def _initialize_channel_and_stub(self): @@ -404,6 +425,13 @@ def _translate_data( ) -> ExportServiceRequestT: pass + @abstractmethod + def _count_data( + self, + data: SDKDataT, + ) -> int: + pass + def _export( self, data: SDKDataT, @@ -412,6 +440,8 @@ def _export( logger.warning("Exporter already shutdown, ignoring batch") return self._result.FAILURE # type: ignore [reportReturnType] + finish_export = self._metrics.start_export(self._count_data(data)) + # FIXME remove this check if the export type for traces # gets updated to a class that represents the proto # TracesData and use the code below instead. @@ -425,6 +455,7 @@ def _export( metadata=self._headers, timeout=deadline_sec - time(), ) + finish_export(None, None) return self._result.SUCCESS # type: ignore [reportReturnType] except RpcError as error: retry_info_bin = dict(error.trailing_metadata()).get( # type: ignore [reportAttributeAccessIssue] @@ -472,6 +503,9 @@ def _export( error.code(), # type: ignore [reportAttributeAccessIssue] exc_info=error.code() == StatusCode.UNKNOWN, # type: ignore [reportAttributeAccessIssue] ) + finish_export( + error, {RPC_RESPONSE_STATUS_CODE: error.code().name} + ) return self._result.FAILURE # type: ignore [reportReturnType] logger.warning( "Transient error %s encountered while exporting %s to %s, retrying in %.2fs.", @@ -510,3 +544,11 @@ def _exporting(self) -> str: warning messages. """ pass + + def _set_meter_provider(self, meter_provider: MeterProvider) -> None: + self._metrics = ExporterMetrics( + self._component_type, + self._signal, + self._parsed_url, + meter_provider, + ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py index af77f6d1239..b8d000c6b17 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py @@ -16,7 +16,7 @@ from dataclasses import replace from logging import getLogger from os import environ -from typing import Iterable, List, Tuple, Union +from typing import Iterable, List, Optional, Tuple, Union from typing import Sequence as TypingSequence from grpc import ChannelCredentials, Compression @@ -32,6 +32,7 @@ environ_to_compression, get_resource_data, ) +from opentelemetry.metrics import MeterProvider from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( ExportMetricsServiceRequest, ) @@ -72,6 +73,9 @@ from opentelemetry.sdk.metrics.export import ( # noqa: F401 Histogram as HistogramType, ) +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OtelComponentTypeValues, +) _logger = getLogger(__name__) @@ -109,6 +113,8 @@ def __init__( preferred_aggregation: dict[type, Aggregation] | None = None, max_export_batch_size: int | None = None, channel_options: Tuple[Tuple[str, str]] | None = None, + *, + meter_provider: Optional[MeterProvider] = None, ): insecure_metrics = environ.get(OTEL_EXPORTER_OTLP_METRICS_INSECURE) if insecure is None and insecure_metrics is not None: @@ -153,6 +159,9 @@ def __init__( timeout=timeout or environ_timeout, compression=compression, channel_options=channel_options, + component_type=OtelComponentTypeValues.OTLP_GRPC_METRIC_EXPORTER.value, + signal="metric_data_point", + meter_provider=meter_provider, ) self._max_export_batch_size: int | None = max_export_batch_size @@ -162,6 +171,16 @@ def _translate_data( # type: ignore [reportIncompatibleMethodOverride] ) -> ExportMetricsServiceRequest: return encode_metrics(data) + def _count_data(self, data: MetricsData): + num_items = 0 + + for resource_metrics in data.resource_metrics: + for scope_metrics in resource_metrics.scope_metrics: + for metric in scope_metrics.metrics: + num_items += len(metric.data.data_points) + + return num_items + def export( self, metrics_data: MetricsData, @@ -268,6 +287,9 @@ def _split_metrics_data( def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis) + def set_meter_provider(self, meter_provider: MeterProvider): + return self._set_meter_provider(meter_provider) + @property def _exporting(self) -> str: return "metrics" diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py index 19b189e5b9c..fdd5c9309f6 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py @@ -28,6 +28,7 @@ environ_to_compression, get_resource_data, ) +from opentelemetry.metrics import MeterProvider from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( ExportTraceServiceRequest, ) @@ -58,6 +59,9 @@ ) from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OtelComponentTypeValues, +) logger = logging.getLogger(__name__) @@ -95,6 +99,8 @@ def __init__( timeout: Optional[float] = None, compression: Optional[Compression] = None, channel_options: Optional[Tuple[Tuple[str, str]]] = None, + *, + meter_provider: Optional[MeterProvider] = None, ): insecure_spans = environ.get(OTEL_EXPORTER_OTLP_TRACES_INSECURE) if insecure is None and insecure_spans is not None: @@ -135,6 +141,9 @@ def __init__( timeout=timeout or environ_timeout, compression=compression, channel_options=channel_options, + component_type=OtelComponentTypeValues.OTLP_GRPC_SPAN_EXPORTER.value, + signal="span", + meter_provider=meter_provider, ) def _translate_data( @@ -142,6 +151,9 @@ def _translate_data( ) -> ExportTraceServiceRequest: return encode_spans(data) + def _count_data(self, data: Sequence[ReadableSpan]): + return len(data) + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: return self._export(spans) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py index 94e8cc944c3..64c6eb57714 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py @@ -402,6 +402,10 @@ def test_translate_log_data(self): expected, self.exporter._translate_data([self.log_data_1]) ) + def test_count_log_data(self): + # pylint: disable=protected-access + self.assertEqual(1, self.exporter._count_data([self.log_data_1])) + def test_translate_multiple_logs(self): expected = ExportLogsServiceRequest( resource_logs=[ @@ -539,3 +543,12 @@ def test_translate_multiple_logs(self): [self.log_data_1, self.log_data_2, self.log_data_3] ), ) + + def test_count_multiple_logs(self): + self.assertEqual( + 3, + # pylint: disable=protected-access + self.exporter._count_data( + [self.log_data_1, self.log_data_2, self.log_data_3] + ), + ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index de27d0fe792..39a85b6ea69 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -55,6 +55,8 @@ _OTEL_PYTHON_EXPORTER_OTLP_GRPC_CREDENTIAL_PROVIDER, OTEL_EXPORTER_OTLP_COMPRESSION, ) +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader from opentelemetry.sdk.trace import ReadableSpan, _Span from opentelemetry.sdk.trace.export import ( SpanExporter, @@ -78,13 +80,23 @@ class OTLPSpanExporterForTesting( ], ): def __init__(self, **kwargs): - super().__init__(TraceServiceStub, SpanExportResult, **kwargs) + super().__init__( + TraceServiceStub, + SpanExportResult, + component_type="test_span_exporter", + signal="span", + meter_provider=kwargs.pop("meter_provider", None), + **kwargs, + ) def _translate_data( self, data: Sequence[ReadableSpan] ) -> ExportTraceServiceRequest: return encode_spans(data) + def _count_data(self, data: Sequence[ReadableSpan]) -> int: + return len(data) + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: return self._export(spans) @@ -154,6 +166,7 @@ def join(self, timeout: Optional[float] = None) -> Any: return self._return +# pylint: disable=too-many-public-methods class TestOTLPExporterMixin(TestCase): def setUp(self): self.server = server(ThreadPoolExecutor(max_workers=10)) @@ -161,7 +174,14 @@ def setUp(self): self.server.add_insecure_port("127.0.0.1:4317") self.server.start() - self.exporter = OTLPSpanExporterForTesting(insecure=True) + + self.metric_reader = InMemoryMetricReader() + self.meter_provider = MeterProvider( + metric_readers=[self.metric_reader] + ) + self.exporter = OTLPSpanExporterForTesting( + insecure=True, meter_provider=self.meter_provider + ) self.span = _Span( "a", context=Mock( @@ -372,6 +392,26 @@ def test_shutdown(self): self.assertEqual( self.exporter.export([self.span]), SpanExportResult.SUCCESS ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual( + metrics[0].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertEqual(metrics[1].name, "otel.sdk.exporter.span.exported") + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertEqual(metrics[2].name, "otel.sdk.exporter.span.inflight") + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.exporter.shutdown() with self.assertLogs(level=WARNING) as warning: self.assertEqual( @@ -446,7 +486,9 @@ def test_retry_info_is_respected(self): mock_trace_service, self.server, ) - exporter = OTLPSpanExporterForTesting(insecure=True, timeout=10) + exporter = OTLPSpanExporterForTesting( + insecure=True, timeout=10, meter_provider=self.meter_provider + ) before = time.time() self.assertEqual( exporter.export([self.span]), @@ -457,6 +499,51 @@ def test_retry_info_is_respected(self): # 1 second plus wiggle room so the test passes consistently. self.assertAlmostEqual(after - before, 1, 1) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual( + metrics[0].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertEqual( + metrics[0].data.data_points[0].attributes["error.type"], + "_InactiveRpcError", + ) + self.assertEqual( + metrics[0] + .data.data_points[0] + .attributes["rpc.response.status_code"], + "UNAVAILABLE", + ) + self.assertEqual(metrics[1].name, "otel.sdk.exporter.span.exported") + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertEqual( + metrics[1].data.data_points[0].attributes["error.type"], + "_InactiveRpcError", + ) + self.assertNotIn( + "rpc.response.status_code", + metrics[1].data.data_points[0].attributes, + ) + self.assertEqual(metrics[2].name, "otel.sdk.exporter.span.inflight") + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "rpc.response.status_code", + metrics[2].data.data_points[0].attributes, + ) + @unittest.skipIf( system() == "Windows", "For gRPC + windows there's some added delay in the RPCs which breaks the assertion over amount of time passed.", @@ -548,6 +635,51 @@ def test_permanent_failure(self): "Failed to export traces to localhost:4317, error code: StatusCode.ALREADY_EXISTS", ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual( + metrics[0].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertEqual( + metrics[0].data.data_points[0].attributes["error.type"], + "_InactiveRpcError", + ) + self.assertEqual( + metrics[0] + .data.data_points[0] + .attributes["rpc.response.status_code"], + "ALREADY_EXISTS", + ) + self.assertEqual(metrics[1].name, "otel.sdk.exporter.span.exported") + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertEqual( + metrics[1].data.data_points[0].attributes["error.type"], + "_InactiveRpcError", + ) + self.assertNotIn( + "rpc.response.status_code", + metrics[1].data.data_points[0].attributes, + ) + self.assertEqual(metrics[2].name, "otel.sdk.exporter.span.inflight") + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "rpc.response.status_code", + metrics[2].data.data_points[0].attributes, + ) + def test_unavailable_reconnects(self): """Test that the exporter reconnects on UNAVAILABLE error""" add_TraceServiceServicer_to_server( @@ -571,3 +703,13 @@ def test_unavailable_reconnects(self): # must be from the reconnection logic. self.assertTrue(mock_insecure_channel.called) # Verify that reconnection enabled flag is set + + def assert_standard_metric_attrs(self, attributes): + self.assertEqual( + attributes["otel.component.type"], "test_span_exporter" + ) + self.assertTrue( + attributes["otel.component.name"].startswith("test_span_exporter/") + ) + self.assertEqual(attributes["server.address"], "localhost") + self.assertEqual(attributes["server.port"], 4317) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py index 16fd666dd1a..6799178b658 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py @@ -590,6 +590,69 @@ def test_split_metrics_data_many_resources_scopes_metrics(self): split_metrics_data, ) + def test_count_metrics_data(self): + # GIVEN + metrics_data = MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + ], + ), + _gauge( + index=2, + data_points=[ + _number_data_point(12), + ], + ), + ], + ), + _scope_metrics( + index=2, + metrics=[ + _gauge( + index=3, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + _resource_metrics( + index=2, + scope_metrics=[ + _scope_metrics( + index=3, + metrics=[ + _gauge( + index=4, + data_points=[ + _number_data_point(14), + ], + ), + ], + ), + ], + ), + ] + ) + # WHEN + # pylint: disable=protected-access + count = OTLPMetricExporter(max_export_batch_size=2)._count_data( + metrics_data, + ) + # THEN + self.assertEqual(count, 4) + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") def test_insecure_https_endpoint(self, mock_secure_channel): OTLPMetricExporter(endpoint="https://ab.c:123", insecure=True) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py index cbe6298df77..c05fc0565d3 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py @@ -481,6 +481,10 @@ def test_translate_spans(self): # pylint: disable=protected-access self.assertEqual(expected, self.exporter._translate_data([self.span])) + def test_count_spans(self): + # pylint: disable=protected-access + self.assertEqual(1, self.exporter._count_data([self.span])) + def test_translate_spans_multi(self): expected = ExportTraceServiceRequest( resource_spans=[ @@ -660,6 +664,13 @@ def test_translate_spans_multi(self): self.exporter._translate_data([self.span, self.span2, self.span3]), ) + def test_count_spans_multi(self): + self.assertEqual( + # pylint: disable=protected-access + 3, + self.exporter._count_data([self.span, self.span2, self.span3]), + ) + def _check_translated_status( self, translated: ExportTraceServiceRequest, diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index 7aea76be8d2..770eeeba037 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -21,10 +21,14 @@ from os import environ from time import time from typing import Dict, Optional, Sequence +from urllib.parse import urlparse import requests from requests.exceptions import ConnectionError +from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( + ExporterMetrics, +) from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs from opentelemetry.exporter.otlp.proto.http import ( _OTLP_HTTP_HEADERS, @@ -34,6 +38,7 @@ _is_retryable, _load_session_from_envvar, ) +from opentelemetry.metrics import MeterProvider from opentelemetry.sdk._logs import ReadableLogRecord from opentelemetry.sdk._logs.export import ( LogRecordExporter, @@ -57,6 +62,12 @@ OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, OTEL_EXPORTER_OTLP_TIMEOUT, ) +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OtelComponentTypeValues, +) +from opentelemetry.semconv.attributes.http_attributes import ( + HTTP_RESPONSE_STATUS_CODE, +) from opentelemetry.util.re import parse_env_headers _logger = logging.getLogger(__name__) @@ -82,6 +93,8 @@ def __init__( timeout: Optional[float] = None, compression: Optional[Compression] = None, session: Optional[requests.Session] = None, + *, + meter_provider: Optional[MeterProvider] = None, ): self._shutdown_is_occuring = threading.Event() self._endpoint = endpoint or environ.get( @@ -139,6 +152,13 @@ def __init__( ) self._shutdown = False + self._metrics = ExporterMetrics( + OtelComponentTypeValues.OTLP_HTTP_LOG_EXPORTER.value, + "log", + urlparse(self._endpoint), + meter_provider, + ) + def _export( self, serialized_data: bytes, timeout_sec: Optional[float] = None ): @@ -183,17 +203,22 @@ def export( _logger.warning("Exporter already shutdown, ignoring batch") return LogRecordExportResult.FAILURE + finish_export = self._metrics.start_export(len(batch)) + serialized_data = encode_logs(batch).SerializeToString() deadline_sec = time() + self._timeout for retry_num in range(_MAX_RETRYS): # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) + export_error: Optional[Exception] = None try: resp = self._export(serialized_data, deadline_sec - time()) if resp.ok: + finish_export(None, None) return LogRecordExportResult.SUCCESS except requests.exceptions.RequestException as error: reason = error + export_error = error retryable = isinstance(error, ConnectionError) status_code = None else: @@ -207,6 +232,12 @@ def export( status_code, reason, ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + finish_export(export_error, error_attrs) return LogRecordExportResult.FAILURE if ( @@ -218,6 +249,12 @@ def export( "Failed to export logs batch due to timeout, " "max retries or shutdown." ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + finish_export(export_error, error_attrs) return LogRecordExportResult.FAILURE _logger.warning( "Transient error %s encountered while exporting logs batch, retrying in %.2fs.", diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 7e08f624375..3d8250f0485 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -29,11 +29,15 @@ Optional, Sequence, ) +from urllib.parse import urlparse import requests from requests.exceptions import ConnectionError from typing_extensions import deprecated +from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( + ExporterMetrics, +) from opentelemetry.exporter.otlp.proto.common._internal import ( _get_resource_data, ) @@ -51,6 +55,7 @@ _is_retryable, _load_session_from_envvar, ) +from opentelemetry.metrics import MeterProvider from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( # noqa: F401 ExportMetricsServiceRequest, ) @@ -96,6 +101,12 @@ Histogram as HistogramType, ) from opentelemetry.sdk.resources import Resource as SDKResource +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OtelComponentTypeValues, +) +from opentelemetry.semconv.attributes.http_attributes import ( + HTTP_RESPONSE_STATUS_CODE, +) from opentelemetry.util.re import parse_env_headers _logger = logging.getLogger(__name__) @@ -122,6 +133,8 @@ def __init__( preferred_temporality: dict[type, AggregationTemporality] | None = None, preferred_aggregation: dict[type, Aggregation] | None = None, + *, + meter_provider: Optional[MeterProvider] = None, ): self._shutdown_in_progress = threading.Event() self._endpoint = endpoint or environ.get( @@ -182,6 +195,13 @@ def __init__( ) self._shutdown = False + self._metrics = ExporterMetrics( + OtelComponentTypeValues.OTLP_HTTP_METRIC_EXPORTER.value, + "metric_data_point", + urlparse(self._endpoint), + meter_provider, + ) + def _export( self, serialized_data: bytes, timeout_sec: Optional[float] = None ): @@ -228,17 +248,29 @@ def export( if self._shutdown: _logger.warning("Exporter already shutdown, ignoring batch") return MetricExportResult.FAILURE + + num_items = 0 + for resource_metrics in metrics_data.resource_metrics: + for scope_metrics in resource_metrics.scope_metrics: + for metric in scope_metrics.metrics: + num_items += len(metric.data.data_points) + + finish_export = self._metrics.start_export(num_items) + serialized_data = encode_metrics(metrics_data).SerializeToString() deadline_sec = time() + self._timeout for retry_num in range(_MAX_RETRYS): # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) + export_error: Optional[Exception] = None try: resp = self._export(serialized_data, deadline_sec - time()) if resp.ok: + finish_export(None, None) return MetricExportResult.SUCCESS except requests.exceptions.RequestException as error: reason = error + export_error = error retryable = isinstance(error, ConnectionError) status_code = None else: @@ -252,6 +284,12 @@ def export( status_code, reason, ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + finish_export(export_error, error_attrs) return MetricExportResult.FAILURE if ( retry_num + 1 == _MAX_RETRYS @@ -262,6 +300,12 @@ def export( "Failed to export metrics batch due to timeout, " "max retries or shutdown." ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + finish_export(export_error, error_attrs) return MetricExportResult.FAILURE _logger.warning( "Transient error %s encountered while exporting metrics batch, retrying in %.2fs.", @@ -290,6 +334,14 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: """Nothing is buffered in this exporter, so this method does nothing.""" return True + def set_meter_provider(self, meter_provider: MeterProvider) -> None: + self._metrics = ExporterMetrics( + OtelComponentTypeValues.OTLP_HTTP_METRIC_EXPORTER.value, + "metric_data_point", + urlparse(self._endpoint), + meter_provider, + ) + @deprecated( "Use one of the encoders from opentelemetry-exporter-otlp-proto-common instead. Deprecated since version 1.18.0.", diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index d02f94adf05..4f9f960b47f 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -21,10 +21,14 @@ from os import environ from time import time from typing import Dict, Optional, Sequence +from urllib.parse import urlparse import requests from requests.exceptions import ConnectionError +from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( + ExporterMetrics, +) from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( encode_spans, ) @@ -36,6 +40,7 @@ _is_retryable, _load_session_from_envvar, ) +from opentelemetry.metrics import MeterProvider from opentelemetry.sdk.environment_variables import ( _OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER, OTEL_EXPORTER_OTLP_CERTIFICATE, @@ -55,6 +60,12 @@ ) from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OtelComponentTypeValues, +) +from opentelemetry.semconv.attributes.http_attributes import ( + HTTP_RESPONSE_STATUS_CODE, +) from opentelemetry.util.re import parse_env_headers _logger = logging.getLogger(__name__) @@ -78,6 +89,8 @@ def __init__( timeout: Optional[float] = None, compression: Optional[Compression] = None, session: Optional[requests.Session] = None, + *, + meter_provider: Optional[MeterProvider] = None, ): self._shutdown_in_progress = threading.Event() self._endpoint = endpoint or environ.get( @@ -134,6 +147,13 @@ def __init__( ) self._shutdown = False + self._metrics = ExporterMetrics( + OtelComponentTypeValues.OTLP_HTTP_SPAN_EXPORTER.value, + "span", + urlparse(self._endpoint), + meter_provider, + ) + def _export( self, serialized_data: bytes, timeout_sec: Optional[float] = None ): @@ -176,17 +196,22 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: _logger.warning("Exporter already shutdown, ignoring batch") return SpanExportResult.FAILURE + finish_export = self._metrics.start_export(len(spans)) + serialized_data = encode_spans(spans).SerializePartialToString() deadline_sec = time() + self._timeout for retry_num in range(_MAX_RETRYS): # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) + export_error: Optional[Exception] = None try: resp = self._export(serialized_data, deadline_sec - time()) if resp.ok: + finish_export(None, None) return SpanExportResult.SUCCESS except requests.exceptions.RequestException as error: reason = error + export_error = error retryable = isinstance(error, ConnectionError) status_code = None else: @@ -200,6 +225,12 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: status_code, reason, ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + finish_export(export_error, error_attrs) return SpanExportResult.FAILURE if ( @@ -211,6 +242,12 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: "Failed to export span batch due to timeout, " "max retries or shutdown." ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + finish_export(export_error, error_attrs) return SpanExportResult.FAILURE _logger.warning( "Transient error %s encountered while exporting span batch, retrying in %.2fs.", diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 2dbbadccb9e..25839305bbc 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -58,6 +58,7 @@ from opentelemetry.sdk.metrics import ( Counter, Histogram, + MeterProvider, ObservableCounter, ObservableGauge, ObservableUpDownCounter, @@ -65,6 +66,7 @@ ) from opentelemetry.sdk.metrics.export import ( AggregationTemporality, + InMemoryMetricReader, MetricExportResult, MetricsData, ResourceMetrics, @@ -89,9 +91,13 @@ OS_ENV_TIMEOUT = "30" -# pylint: disable=protected-access +# pylint: disable=protected-access,too-many-public-methods class TestOTLPMetricExporter(TestCase): def setUp(self): + self.metric_reader = InMemoryMetricReader() + self.meter_provider = MeterProvider( + metric_readers=[self.metric_reader] + ) self.metrics = { "sum_int": MetricsData( resource_metrics=[ @@ -319,12 +325,37 @@ def test_success(self, mock_post): mock_post.return_value = resp exporter = OTLPMetricExporter() + exporter.set_meter_provider(self.meter_provider) self.assertEqual( exporter.export(self.metrics["sum_int"]), MetricExportResult.SUCCESS, ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual( + metrics[0].name, "otel.sdk.exporter.metric_data_point.exported" + ) + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertEqual( + metrics[1].name, "otel.sdk.exporter.metric_data_point.inflight" + ) + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertEqual( + metrics[2].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + @patch.object(Session, "post") def test_failure(self, mock_post): resp = Response() @@ -332,12 +363,60 @@ def test_failure(self, mock_post): mock_post.return_value = resp exporter = OTLPMetricExporter() + exporter.set_meter_provider(self.meter_provider) self.assertEqual( exporter.export(self.metrics["sum_int"]), MetricExportResult.FAILURE, ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual( + metrics[0].name, "otel.sdk.exporter.metric_data_point.exported" + ) + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[0].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[0].data.data_points[0].attributes, + ) + self.assertEqual( + metrics[1].name, "otel.sdk.exporter.metric_data_point.inflight" + ) + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[1].data.data_points[0].attributes, + ) + self.assertEqual( + metrics[2].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[2].data.data_points[0].attributes + ) + self.assertEqual( + metrics[2] + .data.data_points[0] + .attributes["http.response.status_code"], + 401, + ) + @patch.object(Session, "post") def test_serialization(self, mock_post): resp = Response() @@ -534,7 +613,9 @@ def test_preferred_aggregation_override(self): @patch.object(Session, "post") def test_retry_timeout(self, mock_post): - exporter = OTLPMetricExporter(timeout=1.5) + exporter = OTLPMetricExporter( + timeout=1.5, meter_provider=self.meter_provider + ) resp = Response() resp.status_code = 503 @@ -557,6 +638,53 @@ def test_retry_timeout(self, mock_post): warning.records[0].message, ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual( + metrics[0].name, "otel.sdk.exporter.metric_data_point.exported" + ) + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[0].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[0].data.data_points[0].attributes, + ) + self.assertEqual( + metrics[1].name, "otel.sdk.exporter.metric_data_point.inflight" + ) + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[1].data.data_points[0].attributes, + ) + self.assertEqual( + metrics[2].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[2].data.data_points[0].attributes + ) + self.assertEqual( + metrics[2] + .data.data_points[0] + .attributes["http.response.status_code"], + 503, + ) + @patch.object(Session, "post") def test_export_no_collector_available_retryable(self, mock_post): exporter = OTLPMetricExporter(timeout=1.5) @@ -635,3 +763,15 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post): ) assert after - before < 0.2 + + def assert_standard_metric_attrs(self, attributes): + self.assertEqual( + attributes["otel.component.type"], "otlp_http_metric_exporter" + ) + self.assertTrue( + attributes["otel.component.name"].startswith( + "otlp_http_metric_exporter/" + ) + ) + self.assertEqual(attributes["server.address"], "localhost") + self.assertEqual(attributes["server.port"], 4318) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index c86ac1f6ba1..7981b0bc821 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -59,6 +59,8 @@ OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, OTEL_EXPORTER_OTLP_TIMEOUT, ) +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader from opentelemetry.sdk.resources import Resource as SDKResource from opentelemetry.sdk.util.instrumentation import InstrumentationScope from opentelemetry.test.mock_test_classes import IterEntryPoint @@ -78,6 +80,12 @@ class TestOTLPHTTPLogExporter(unittest.TestCase): + def setUp(self): + self.metric_reader = InMemoryMetricReader() + self.meter_provider = MeterProvider( + metric_readers=[self.metric_reader] + ) + def test_constructor_default(self): exporter = OTLPLogExporter() @@ -461,7 +469,9 @@ def test_2xx_status_code(self, mock_otlp_metric_exporter): @patch.object(Session, "post") def test_retry_timeout(self, mock_post): - exporter = OTLPLogExporter(timeout=1.5) + exporter = OTLPLogExporter( + timeout=1.5, meter_provider=self.meter_provider + ) resp = Response() resp.status_code = 503 @@ -484,6 +494,49 @@ def test_retry_timeout(self, mock_post): warning.records[0].message, ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual(metrics[0].name, "otel.sdk.exporter.log.exported") + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[0].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[0].data.data_points[0].attributes, + ) + self.assertEqual(metrics[1].name, "otel.sdk.exporter.log.inflight") + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[1].data.data_points[0].attributes, + ) + self.assertEqual( + metrics[2].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[2].data.data_points[0].attributes + ) + self.assertEqual( + metrics[2] + .data.data_points[0] + .attributes["http.response.status_code"], + 503, + ) + @patch.object(Session, "post") def test_export_no_collector_available_retryable(self, mock_post): exporter = OTLPLogExporter(timeout=1.5) @@ -504,7 +557,9 @@ def test_export_no_collector_available_retryable(self, mock_post): @patch.object(Session, "post") def test_export_no_collector_available(self, mock_post): - exporter = OTLPLogExporter(timeout=1.5) + exporter = OTLPLogExporter( + timeout=1.5, meter_provider=self.meter_provider + ) mock_post.side_effect = requests.exceptions.RequestException() with self.assertLogs(level=WARNING) as warning: @@ -518,6 +573,49 @@ def test_export_no_collector_available(self, mock_post): warning.records[0].message, ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual(metrics[0].name, "otel.sdk.exporter.log.exported") + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertEqual( + metrics[0].data.data_points[0].attributes["error.type"], + "RequestException", + ) + self.assertNotIn( + "http.response.status_code", + metrics[0].data.data_points[0].attributes, + ) + self.assertEqual(metrics[1].name, "otel.sdk.exporter.log.inflight") + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[1].data.data_points[0].attributes, + ) + self.assertEqual( + metrics[2].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.assertEqual( + metrics[2].data.data_points[0].attributes["error.type"], + "RequestException", + ) + self.assertNotIn( + "http.response.status_code", + metrics[2].data.data_points[0].attributes, + ) + @patch.object(Session, "post") def test_timeout_set_correctly(self, mock_post): resp = Response() @@ -562,3 +660,15 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post): ) assert after - before < 0.2 + + def assert_standard_metric_attrs(self, attributes): + self.assertEqual( + attributes["otel.component.type"], "otlp_http_log_exporter" + ) + self.assertTrue( + attributes["otel.component.name"].startswith( + "otlp_http_log_exporter/" + ) + ) + self.assertEqual(attributes["server.address"], "localhost") + self.assertEqual(attributes["server.port"], 4318) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py index 5f61344bbf1..0df471aa693 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py @@ -49,6 +49,8 @@ OTEL_EXPORTER_OTLP_TRACES_HEADERS, OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, ) +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader from opentelemetry.sdk.trace import _Span from opentelemetry.sdk.trace.export import SpanExportResult from opentelemetry.test.mock_test_classes import IterEntryPoint @@ -73,6 +75,12 @@ # pylint: disable=protected-access class TestOTLPSpanExporter(unittest.TestCase): + def setUp(self): + self.metric_reader = InMemoryMetricReader() + self.meter_provider = MeterProvider( + metric_readers=[self.metric_reader] + ) + def test_constructor_default(self): exporter = OTLPSpanExporter() @@ -281,7 +289,9 @@ def test_2xx_status_code(self, mock_otlp_metric_exporter): @patch.object(Session, "post") def test_retry_timeout(self, mock_post): - exporter = OTLPSpanExporter(timeout=1.5) + exporter = OTLPSpanExporter( + timeout=1.5, meter_provider=self.meter_provider + ) resp = Response() resp.status_code = 503 @@ -304,6 +314,49 @@ def test_retry_timeout(self, mock_post): warning.records[0].message, ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual( + metrics[0].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[0].data.data_points[0].attributes + ) + self.assertEqual( + metrics[0] + .data.data_points[0] + .attributes["http.response.status_code"], + 503, + ) + self.assertEqual(metrics[1].name, "otel.sdk.exporter.span.exported") + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[1].data.data_points[0].attributes, + ) + self.assertEqual(metrics[2].name, "otel.sdk.exporter.span.inflight") + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[2].data.data_points[0].attributes, + ) + @patch.object(Session, "post") def test_export_no_collector_available_retryable(self, mock_post): exporter = OTLPSpanExporter(timeout=1.5) @@ -324,7 +377,9 @@ def test_export_no_collector_available_retryable(self, mock_post): @patch.object(Session, "post") def test_export_no_collector_available(self, mock_post): - exporter = OTLPSpanExporter(timeout=1.5) + exporter = OTLPSpanExporter( + timeout=1.5, meter_provider=self.meter_provider + ) mock_post.side_effect = requests.exceptions.RequestException() with self.assertLogs(level=WARNING) as warning: @@ -338,6 +393,49 @@ def test_export_no_collector_available(self, mock_post): warning.records[0].message, ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual( + metrics[0].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertEqual( + metrics[0].data.data_points[0].attributes["error.type"], + "RequestException", + ) + self.assertNotIn( + "http.response.status_code", + metrics[0].data.data_points[0].attributes, + ) + self.assertEqual(metrics[1].name, "otel.sdk.exporter.span.exported") + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertEqual( + metrics[1].data.data_points[0].attributes["error.type"], + "RequestException", + ) + self.assertNotIn( + "http.response.status_code", + metrics[1].data.data_points[0].attributes, + ) + self.assertEqual(metrics[2].name, "otel.sdk.exporter.span.inflight") + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[2].data.data_points[0].attributes, + ) + @patch.object(Session, "post") def test_timeout_set_correctly(self, mock_post): resp = Response() @@ -380,3 +478,15 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post): ) assert after - before < 0.2 + + def assert_standard_metric_attrs(self, attributes): + self.assertEqual( + attributes["otel.component.type"], "otlp_http_span_exporter" + ) + self.assertTrue( + attributes["otel.component.name"].startswith( + "otlp_http_span_exporter/" + ) + ) + self.assertEqual(attributes["server.address"], "localhost") + self.assertEqual(attributes["server.port"], 4318)