Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4935](https://github.com/open-telemetry/opentelemetry-python/pull/4935))
- `opentelemetry-sdk`: upgrade vendored OTel configuration schema from v1.0.0-rc.3 to v1.0.0
([#4965](https://github.com/open-telemetry/opentelemetry-python/pull/4965))
- `opentelemetry-sdk`: implement exporter metrics
([#4976](https://github.com/open-telemetry/opentelemetry-python/pull/4976))
- `opentelemetry-exporter-prometheus`: Fix metric name prefix
([#4895](https://github.com/open-telemetry/opentelemetry-python/pull/4895))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from collections import Counter
from time import perf_counter
from typing import TYPE_CHECKING, Callable

from opentelemetry.metrics import MeterProvider, get_meter_provider
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
OTEL_COMPONENT_NAME,
OTEL_COMPONENT_TYPE,
)
from opentelemetry.semconv._incubating.metrics.otel_metrics import (
create_otel_sdk_exporter_log_exported,
create_otel_sdk_exporter_log_inflight,
create_otel_sdk_exporter_metric_data_point_exported,
create_otel_sdk_exporter_metric_data_point_inflight,
create_otel_sdk_exporter_operation_duration,
create_otel_sdk_exporter_span_exported,
create_otel_sdk_exporter_span_inflight,
)
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
from opentelemetry.semconv.attributes.server_attributes import (
SERVER_ADDRESS,
SERVER_PORT,
)

if TYPE_CHECKING:
from typing import Literal
from urllib.parse import ParseResult as UrlParseResult

from opentelemetry.util.types import Attributes, AttributeValue

_component_counter = Counter()


class ExporterMetrics:
def __init__(
self,
component_type: str,
signal: Literal["span", "log", "metric_data_point"],
endpoint: UrlParseResult,
meter_provider: MeterProvider | None,
) -> None:
if signal == "span":
create_exported = create_otel_sdk_exporter_span_exported
create_inflight = create_otel_sdk_exporter_span_inflight
elif signal == "log":
create_exported = create_otel_sdk_exporter_log_exported
create_inflight = create_otel_sdk_exporter_log_inflight
else:
create_exported = (
create_otel_sdk_exporter_metric_data_point_exported
)
create_inflight = (
create_otel_sdk_exporter_metric_data_point_inflight
)

port = endpoint.port
if port is None:
if endpoint.scheme == "https":
port = 443
elif endpoint.scheme == "http":
port = 80

count = _component_counter[component_type]
_component_counter[component_type] = count + 1
self._standard_attrs: dict[str, AttributeValue] = {
OTEL_COMPONENT_TYPE: component_type,
OTEL_COMPONENT_NAME: f"{component_type}/{count}",
}
if endpoint.hostname:
self._standard_attrs[SERVER_ADDRESS] = endpoint.hostname
if port is not None:
self._standard_attrs[SERVER_PORT] = port

meter_provider = meter_provider or get_meter_provider()
meter = meter_provider.get_meter("opentelemetry-sdk")
self._inflight = create_inflight(meter)
self._exported = create_exported(meter)
self._duration = create_otel_sdk_exporter_operation_duration(meter)

def start_export(
self, num_items: int
) -> Callable[[Exception | None, Attributes], None]:
start_time = perf_counter()
self._inflight.add(num_items, self._standard_attrs)

def finish_export(
error: Exception | None,
error_attrs: Attributes,
):
end_time = perf_counter()
self._inflight.add(-num_items, self._standard_attrs)
exported_attrs = (
{**self._standard_attrs, ERROR_TYPE: type(error).__qualname__}
if error
else self._standard_attrs
)
self._exported.add(num_items, exported_attrs)
duration_attrs = (
{**exported_attrs, **error_attrs}
if error_attrs
else exported_attrs
)
self._duration.record(end_time - start_time, duration_attrs)

return finish_export
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
_get_credentials,
environ_to_compression,
)
from opentelemetry.metrics import MeterProvider
from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import (
ExportLogsServiceRequest,
)
Expand All @@ -44,6 +45,9 @@
OTEL_EXPORTER_OTLP_LOGS_INSECURE,
OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
)
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
OtelComponentTypeValues,
)


class OTLPLogExporter(
Expand All @@ -66,6 +70,8 @@ def __init__(
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
channel_options: Optional[Tuple[Tuple[str, str]]] = None,
*,
meter_provider: Optional[MeterProvider] = None,
):
insecure_logs = environ.get(OTEL_EXPORTER_OTLP_LOGS_INSECURE)
if insecure is None and insecure_logs is not None:
Expand Down Expand Up @@ -105,13 +111,19 @@ def __init__(
stub=LogsServiceStub,
result=LogRecordExportResult,
channel_options=channel_options,
component_type=OtelComponentTypeValues.OTLP_GRPC_LOG_EXPORTER.value,
signal="log",
meter_provider=meter_provider,
)

def _translate_data(
self, data: Sequence[ReadableLogRecord]
) -> ExportLogsServiceRequest:
return encode_logs(data)

def _count_data(self, data: Sequence[ReadableLogRecord]):
return len(data)

def export( # type: ignore [reportIncompatibleMethodOverride]
self,
batch: Sequence[ReadableLogRecord],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,16 @@
secure_channel,
ssl_channel_credentials,
)
from opentelemetry.exporter.otlp.proto.common._exporter_metrics import (
ExporterMetrics,
)
from opentelemetry.exporter.otlp.proto.common._internal import (
_get_resource_data,
)
from opentelemetry.exporter.otlp.proto.grpc import (
_OTLP_GRPC_CHANNEL_OPTIONS,
)
from opentelemetry.metrics import MeterProvider
from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import (
ExportLogsServiceRequest,
)
Expand Down Expand Up @@ -104,6 +108,9 @@
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExportResult
from opentelemetry.semconv._incubating.attributes.rpc_attributes import (
RPC_RESPONSE_STATUS_CODE,
)
from opentelemetry.util._importlib_metadata import entry_points
from opentelemetry.util.re import parse_env_headers

Expand Down Expand Up @@ -299,6 +306,10 @@ def __init__(
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
channel_options: Optional[Tuple[Tuple[str, str]]] = None,
*,
component_type: Optional[str] = None,
signal: Literal["span", "log", "metric_data_point"] = "span",
meter_provider: Optional[MeterProvider] = None,
):
super().__init__()
self._result = result
Expand Down Expand Up @@ -372,6 +383,16 @@ def __init__(
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
)

self._component_type = component_type or type(self).__qualname__
self._signal: Literal["span", "log", "metric_data_point"] = signal
self._parsed_url = parsed_url
self._metrics = ExporterMetrics(
self._component_type,
signal,
parsed_url,
meter_provider,
)

self._initialize_channel_and_stub()

def _initialize_channel_and_stub(self):
Expand Down Expand Up @@ -404,6 +425,13 @@ def _translate_data(
) -> ExportServiceRequestT:
pass

@abstractmethod
def _count_data(
self,
data: SDKDataT,
) -> int:
pass

def _export(
self,
data: SDKDataT,
Expand All @@ -412,6 +440,8 @@ def _export(
logger.warning("Exporter already shutdown, ignoring batch")
return self._result.FAILURE # type: ignore [reportReturnType]

finish_export = self._metrics.start_export(self._count_data(data))

# FIXME remove this check if the export type for traces
# gets updated to a class that represents the proto
# TracesData and use the code below instead.
Expand All @@ -425,6 +455,7 @@ def _export(
metadata=self._headers,
timeout=deadline_sec - time(),
)
finish_export(None, None)
return self._result.SUCCESS # type: ignore [reportReturnType]
except RpcError as error:
retry_info_bin = dict(error.trailing_metadata()).get( # type: ignore [reportAttributeAccessIssue]
Expand Down Expand Up @@ -472,6 +503,9 @@ def _export(
error.code(), # type: ignore [reportAttributeAccessIssue]
exc_info=error.code() == StatusCode.UNKNOWN, # type: ignore [reportAttributeAccessIssue]
)
finish_export(
error, {RPC_RESPONSE_STATUS_CODE: error.code().name}
)
return self._result.FAILURE # type: ignore [reportReturnType]
logger.warning(
"Transient error %s encountered while exporting %s to %s, retrying in %.2fs.",
Expand Down Expand Up @@ -510,3 +544,11 @@ def _exporting(self) -> str:
warning messages.
"""
pass

def _set_meter_provider(self, meter_provider: MeterProvider) -> None:
self._metrics = ExporterMetrics(
self._component_type,
self._signal,
self._parsed_url,
meter_provider,
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from dataclasses import replace
from logging import getLogger
from os import environ
from typing import Iterable, List, Tuple, Union
from typing import Iterable, List, Optional, Tuple, Union
from typing import Sequence as TypingSequence

from grpc import ChannelCredentials, Compression
Expand All @@ -32,6 +32,7 @@
environ_to_compression,
get_resource_data,
)
from opentelemetry.metrics import MeterProvider
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
ExportMetricsServiceRequest,
)
Expand Down Expand Up @@ -72,6 +73,9 @@
from opentelemetry.sdk.metrics.export import ( # noqa: F401
Histogram as HistogramType,
)
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
OtelComponentTypeValues,
)

_logger = getLogger(__name__)

Expand Down Expand Up @@ -109,6 +113,8 @@ def __init__(
preferred_aggregation: dict[type, Aggregation] | None = None,
max_export_batch_size: int | None = None,
channel_options: Tuple[Tuple[str, str]] | None = None,
*,
meter_provider: Optional[MeterProvider] = None,
):
insecure_metrics = environ.get(OTEL_EXPORTER_OTLP_METRICS_INSECURE)
if insecure is None and insecure_metrics is not None:
Expand Down Expand Up @@ -153,6 +159,9 @@ def __init__(
timeout=timeout or environ_timeout,
compression=compression,
channel_options=channel_options,
component_type=OtelComponentTypeValues.OTLP_GRPC_METRIC_EXPORTER.value,
signal="metric_data_point",
meter_provider=meter_provider,
)

self._max_export_batch_size: int | None = max_export_batch_size
Expand All @@ -162,6 +171,16 @@ def _translate_data( # type: ignore [reportIncompatibleMethodOverride]
) -> ExportMetricsServiceRequest:
return encode_metrics(data)

def _count_data(self, data: MetricsData):
num_items = 0

for resource_metrics in data.resource_metrics:
for scope_metrics in resource_metrics.scope_metrics:
for metric in scope_metrics.metrics:
num_items += len(metric.data.data_points)

return num_items

def export(
self,
metrics_data: MetricsData,
Expand Down Expand Up @@ -268,6 +287,9 @@ def _split_metrics_data(
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis)

def set_meter_provider(self, meter_provider: MeterProvider):
return self._set_meter_provider(meter_provider)

@property
def _exporting(self) -> str:
return "metrics"
Expand Down
Loading
Loading