From f88504eb4157e036fccc17d31cb1b420222b01f8 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Wed, 29 Apr 2026 23:51:04 -0400 Subject: [PATCH] generalize OTLP-HTTP exporter --- .../pyproject.toml | 2 +- .../exporter/otlp/proto/http/_client.py | 182 ++++++++++++++ .../otlp/proto/http/_common/__init__.py | 75 ++++-- .../otlp/proto/http/_log_exporter/__init__.py | 184 ++++---------- .../exporter/otlp/proto/http/_session.py | 111 +++++++++ .../proto/http/metric_exporter/__init__.py | 234 ++++-------------- .../proto/http/trace_exporter/__init__.py | 181 ++++---------- 7 files changed, 476 insertions(+), 493 deletions(-) create mode 100644 exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_client.py create mode 100644 exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_session.py diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml b/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml index 719a2cd84f..90d9073265 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml +++ b/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml @@ -31,7 +31,7 @@ dependencies = [ "opentelemetry-proto == 1.42.0.dev", "opentelemetry-sdk ~= 1.42.0.dev", "opentelemetry-exporter-otlp-proto-common == 1.42.0.dev", - "requests ~= 2.7", + "urllib3 >= 2.0, < 3.0", "typing-extensions >= 4.5.0", ] diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_client.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_client.py new file mode 100644 index 0000000000..e74d9de725 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_client.py @@ -0,0 +1,182 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gzip +import logging +import random +import threading +import zlib +from dataclasses import dataclass +from io import BytesIO +from time import time +from typing import Generic, TypeVar, Final, Any, Literal, Mapping + +from opentelemetry.exporter.otlp.proto.http import Compression +from opentelemetry.exporter.otlp.proto.http._common import _is_retryable +from opentelemetry.exporter.otlp.proto.http._session import ( + HttpSession, HttpResponse, +) + +_logger = logging.getLogger(__name__) + +_MAX_RETRIES: Final[int] = 6 +_CONNECTION_ERRORS: Final[frozenset[str]] = frozenset({ + "urllib3.exceptions.ProtocolError", + "urllib3.exceptions.ProxyError", + "urllib3.exceptions.TimeoutError", + "urllib3.exceptions.ReadTimeoutError", + "urllib3.exceptions.ConnectTimeoutError", + "urllib3.exceptions.NewConnectionError", + "requests.exceptions.ConnectionError", + "requests.exceptions.ProxyError", + "requests.exceptions.ConnectTimeout", + "requests.exceptions.ReadTimeout", +}) + +T = TypeVar("T", bound=HttpSession) + + +@dataclass(slots=True, frozen=True) +class ExportResult: + success: bool + status_code: int | None + error: Exception | None + + +class OTLPHttpClient(Generic[T]): + """Shared HTTP transport used by the OTLP HTTP exporters. + + Owns compression, the POST request, and the retry loop with + exponential backoff bounded by an absolute deadline. Generic over + the :class:`HttpSession` implementation. + """ + + def __init__( + self, + session: T, + endpoint: str, + timeout: float, + compression: Compression, + shutdown_event: threading.Event, + headers: Mapping[str, str], + kind: Literal["span", "logs", "metrics"], + ) -> None: + self._session: T = session + self._endpoint = endpoint + self._timeout = timeout + self._compression = compression + self._shutdown_event = shutdown_event + self._headers = headers + self._kind = kind + + def _compress(self, serialized_data: bytes) -> bytes: + if self._compression is Compression.Gzip: + buf = BytesIO() + with gzip.GzipFile(fileobj=buf, mode="w") as gz: + gz.write(serialized_data) + return buf.getvalue() + if self._compression is Compression.Deflate: + return zlib.compress(serialized_data) + return serialized_data + + def _submit(self, data: bytes, timeout: float) -> HttpResponse: + try: + return self._session.request( + "POST", + self._endpoint, + headers=dict(self._headers), + data=data, + timeout=timeout, + ) + except Exception as error: + if _get_fqn(type(error)) not in _CONNECTION_ERRORS: + raise + # Backends may close keep-alive connections mid-flight; one + # retry covers the common stale-connection case before + # falling through to the outer backoff. + return self._session.request( + "POST", + self._endpoint, + headers=dict(self._headers), + data=data, + timeout=timeout, + ) + + def export(self, data: bytes) -> ExportResult: + data = self._compress(data) + deadline_sec = time() + self._timeout + + for retry_num in range(_MAX_RETRIES): + backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) + export_error: Exception | None = None + status_code: int | None = None + retryable: bool + reason: str | Exception | None + + try: + resp = self._submit(data, max(deadline_sec - time(), 0.0)) + except Exception as error: + export_error = error + reason = error + retryable = _get_fqn(type(error)) in _CONNECTION_ERRORS + else: + status_code = resp.status_code + if 200 <= status_code < 400: + return ExportResult(True, status_code, None) + reason = resp.reason + retryable = _is_retryable(resp) + + if not retryable: + _logger.error( + "Failed to export %s batch code: %s, reason: %s", + self._kind, + status_code, + reason, + ) + return ExportResult(False, status_code, export_error) + + if ( + retry_num + 1 == _MAX_RETRIES + or backoff_seconds > (deadline_sec - time()) + or self._shutdown_event.is_set() + ): + _logger.error( + "Failed to export %s batch due to timeout, " + "max retries or shutdown.", + self._kind, + ) + return ExportResult(False, status_code, export_error) + + _logger.warning( + "Transient error %s encountered while exporting %s batch, retrying in %.2fs.", + reason, + self._kind, + backoff_seconds, + ) + shutdown = self._shutdown_event.wait(backoff_seconds) + if shutdown: + _logger.warning("Shutdown in progress, aborting retry.") + break + + return ExportResult(False, None, None) + + def close(self) -> None: + self._session.close() + + +def _get_fqn(ty: type) -> str: + module = ty.__module__ + if module is None or module == 'builtins': + return ty.__qualname__ + return f"{module}.{ty.__qualname__}" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py index 1bdb7d228c..84bc60b121 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py @@ -11,22 +11,31 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations from os import environ -from typing import Literal, Optional - -import requests +from typing import TYPE_CHECKING, Literal, Mapping +from opentelemetry.exporter.otlp.proto.http import ( + _OTLP_HTTP_HEADERS, + Compression, +) from opentelemetry.sdk.environment_variables import ( _OTEL_PYTHON_EXPORTER_OTLP_HTTP_CREDENTIAL_PROVIDER, ) from opentelemetry.util._importlib_metadata import entry_points +if TYPE_CHECKING: + from opentelemetry.exporter.otlp.proto.http._session import ( + HttpSession, + HttpResponse, + ) + -def _is_retryable(resp: requests.Response) -> bool: +def _is_retryable(resp: HttpResponse) -> bool: if resp.status_code == 408: return True - if resp.status_code >= 500 and resp.status_code <= 599: + if 500 <= resp.status_code <= 599: return True return False @@ -37,30 +46,42 @@ def _load_session_from_envvar( "OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER", "OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER", ], -) -> Optional[requests.Session]: +) -> HttpSession | None: _credential_env = environ.get( _OTEL_PYTHON_EXPORTER_OTLP_HTTP_CREDENTIAL_PROVIDER ) or environ.get(cred_envvar) - if _credential_env: - try: - maybe_session = next( - iter( - entry_points( - group="opentelemetry_otlp_credential_provider", - name=_credential_env, - ) + if not _credential_env: + return None + try: + maybe_session = next( + iter( + entry_points( + group="opentelemetry_otlp_credential_provider", + name=_credential_env, ) - ).load()() - except StopIteration: - raise RuntimeError( - f"Requested component '{_credential_env}' not found in " - f"entry point 'opentelemetry_otlp_credential_provider'" ) - if isinstance(maybe_session, requests.Session): - return maybe_session - else: - raise RuntimeError( - f"Requested component '{_credential_env}' is of type {type(maybe_session)}" - f" must be of type `requests.Session`." - ) - return None + ).load()() + except StopIteration: + raise RuntimeError( + f"Requested component '{_credential_env}' not found in " + f"entry point 'opentelemetry_otlp_credential_provider'" + ) + if not ( + hasattr(maybe_session, "request") and hasattr(maybe_session, "close") + ): + raise RuntimeError( + f"Requested component '{_credential_env}' is of type " + f"{type(maybe_session)}; must implement the HttpSession protocol " + f"(request() and close() methods)." + ) + return maybe_session + + +def _build_default_headers( + user_headers: Mapping[str, str], compression: Compression +) -> dict[str, str]: + """Combine OTLP defaults with user headers; user values override.""" + merged = {**_OTLP_HTTP_HEADERS, **user_headers} + if compression is not Compression.NoCompression: + merged["Content-Encoding"] = compression.value + return merged diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index 6032433dd1..080bf83f9d 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -11,33 +11,28 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations -import gzip import logging -import random import threading -import zlib -from io import BytesIO from os import environ -from time import time -from typing import Dict, Optional, Sequence +from typing import Sequence from urllib.parse import urlparse -import requests -from requests.exceptions import ConnectionError - from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( ExporterMetrics, ) from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs -from opentelemetry.exporter.otlp.proto.http import ( - _OTLP_HTTP_HEADERS, - Compression, -) +from opentelemetry.exporter.otlp.proto.http import Compression +from opentelemetry.exporter.otlp.proto.http._client import OTLPHttpClient from opentelemetry.exporter.otlp.proto.http._common import ( - _is_retryable, + _build_default_headers, _load_session_from_envvar, ) +from opentelemetry.exporter.otlp.proto.http._session import ( + HttpSession, + Urllib3Session, +) from opentelemetry.metrics import MeterProvider from opentelemetry.sdk._logs import ReadableLogRecord from opentelemetry.sdk._logs.export import ( @@ -79,22 +74,21 @@ DEFAULT_ENDPOINT = "http://localhost:4318/" DEFAULT_LOGS_EXPORT_PATH = "v1/logs" DEFAULT_TIMEOUT = 10 # in seconds -_MAX_RETRYS = 6 class OTLPLogExporter(LogRecordExporter): def __init__( self, - endpoint: Optional[str] = None, - certificate_file: Optional[str] = None, - client_key_file: Optional[str] = None, - client_certificate_file: Optional[str] = None, - headers: Optional[Dict[str, str]] = None, - timeout: Optional[float] = None, - compression: Optional[Compression] = None, - session: Optional[requests.Session] = None, + endpoint: str | None = None, + certificate_file: str | None = None, + client_key_file: str | None = None, + client_certificate_file: str | None = None, + headers: dict[str, str] | None = None, + timeout: float | None = None, + compression: Compression | None = None, + session: HttpSession | None = None, *, - meter_provider: Optional[MeterProvider] = None, + meter_provider: MeterProvider | None = None, ): self._shutdown_is_occuring = threading.Event() self._endpoint = endpoint or environ.get( @@ -103,7 +97,6 @@ def __init__( environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT) ), ) - # Keeping these as instance variables because they are used in tests self._certificate_file = certificate_file or environ.get( OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE, environ.get(OTEL_EXPORTER_OTLP_CERTIFICATE, True), @@ -135,21 +128,26 @@ def __init__( ) ) self._compression = compression or _compression_from_env() - self._session = ( - session - or _load_session_from_envvar( - _OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER - ) - or requests.Session() + + session_ = session or _load_session_from_envvar( + _OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER + ) + + self._session: HttpSession = session_ or Urllib3Session( + verify=self._certificate_file, + cert=self._client_cert, + ) + self._client: OTLPHttpClient[HttpSession] = OTLPHttpClient( + session=self._session, + endpoint=self._endpoint, + timeout=self._timeout, + compression=self._compression, + shutdown_event=self._shutdown_is_occuring, + headers=_build_default_headers( + self._headers, self._compression + ), + kind="logs", ) - self._session.headers.update(self._headers) - self._session.headers.update(_OTLP_HTTP_HEADERS) - # let users override our defaults - self._session.headers.update(self._headers) - if self._compression is not Compression.NoCompression: - self._session.headers.update( - {"Content-Encoding": self._compression.value} - ) self._shutdown = False self._metrics = ExporterMetrics( @@ -159,43 +157,6 @@ def __init__( meter_provider, ) - def _export( - self, serialized_data: bytes, timeout_sec: Optional[float] = None - ): - data = serialized_data - if self._compression == Compression.Gzip: - gzip_data = BytesIO() - with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: - gzip_stream.write(serialized_data) - data = gzip_data.getvalue() - elif self._compression == Compression.Deflate: - data = zlib.compress(serialized_data) - - if timeout_sec is None: - timeout_sec = self._timeout - - # By default, keep-alive is enabled in Session's request - # headers. Backends may choose to close the connection - # while a post happens which causes an unhandled - # exception. This try/except will retry the post on such exceptions - try: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - except ConnectionError: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - return resp - def export( self, batch: Sequence[ReadableLogRecord] ) -> LogRecordExportResult: @@ -205,66 +166,15 @@ def export( with self._metrics.export_operation(len(batch)) as result: serialized_data = encode_logs(batch).SerializeToString() - deadline_sec = time() + self._timeout - for retry_num in range(_MAX_RETRYS): - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. - backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - export_error: Optional[Exception] = None - try: - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - return LogRecordExportResult.SUCCESS - except requests.exceptions.RequestException as error: - reason = error - export_error = error - retryable = isinstance(error, ConnectionError) - status_code = None - else: - reason = resp.reason - retryable = _is_retryable(resp) - status_code = resp.status_code - - if not retryable: - _logger.error( - "Failed to export logs batch code: %s, reason: %s", - status_code, - reason, - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return LogRecordExportResult.FAILURE - - if ( - retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - or self._shutdown - ): - _logger.error( - "Failed to export logs batch due to timeout, " - "max retries or shutdown." - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return LogRecordExportResult.FAILURE - _logger.warning( - "Transient error %s encountered while exporting logs batch, retrying in %.2fs.", - reason, - backoff_seconds, - ) - shutdown = self._shutdown_is_occuring.wait(backoff_seconds) - if shutdown: - _logger.warning("Shutdown in progress, aborting retry.") - break + outcome = self._client.export(serialized_data) + if outcome.success: + return LogRecordExportResult.SUCCESS + result.error = outcome.error + result.error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: outcome.status_code} + if outcome.status_code is not None + else None + ) return LogRecordExportResult.FAILURE def force_flush(self, timeout_millis: float = 10_000) -> bool: @@ -277,7 +187,7 @@ def shutdown(self): return self._shutdown = True self._shutdown_is_occuring.set() - self._session.close() + self._client.close() def _compression_from_env() -> Compression: diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_session.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_session.py new file mode 100644 index 0000000000..5f66387f61 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_session.py @@ -0,0 +1,111 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import warnings +from typing import Protocol, runtime_checkable + +import urllib3 + + +@runtime_checkable +class HttpResponse(Protocol): + status_code: int + reason: str | None + + +@runtime_checkable +class HttpSession(Protocol): + def request( + self, + method: str, + url: str, + *, + headers: dict[str, str] | None = None, + timeout: float | None = None, + data: bytes | None = None, + ) -> HttpResponse: ... + + def close(self) -> None: ... + + +class _Urllib3Response: + def __init__(self, resp: urllib3.HTTPResponse) -> None: + self._resp = resp + + @property + def status_code(self) -> int: + return self._resp.status + + @property + def reason(self) -> str | None: + return self._resp.reason + +class Urllib3Session: + """Default :class:`HttpSession` implementation backed by ``urllib3``. + + ``verify`` follows the same semantics as ``requests``: ``True`` enables + certificate verification against the system trust store, ``False`` + disables it, and a string is treated as a path to a CA bundle. ``cert`` + is either a path to a client certificate file, or a ``(cert, key)`` + tuple for mutual TLS. + """ + + def __init__( + self, + *, + verify: bool | str = True, + cert: str | tuple[str, str] | None = None, + ) -> None: + pool_kwargs: dict[str, object] = { + "retries": urllib3.Retry(0, redirect=False), + } + if verify is False: + pool_kwargs["cert_reqs"] = "CERT_NONE" + warnings.filterwarnings( + "ignore", + category=urllib3.exceptions.InsecureRequestWarning, + ) + else: + pool_kwargs["cert_reqs"] = "CERT_REQUIRED" + if isinstance(verify, str): + pool_kwargs["ca_certs"] = verify + if isinstance(cert, tuple): + pool_kwargs["cert_file"] = cert[0] + pool_kwargs["key_file"] = cert[1] + elif isinstance(cert, str): + pool_kwargs["cert_file"] = cert + + self._pool = urllib3.PoolManager(**pool_kwargs) + + def request( + self, + method: str, + url: str, + *, + headers: dict[str, str] | None = None, + timeout: float | None = None, + data: bytes | None = None, + ) -> HttpResponse: + resp = self._pool.request( + method, + url, + body=data, + headers=headers, + timeout=timeout, + ) + return _Urllib3Response(resp) + + def close(self) -> None: + self._pool.clear() diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index efd63b4543..3724b46e4c 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -12,14 +12,9 @@ # limitations under the License. from __future__ import annotations -import gzip import logging -import random import threading -import zlib -from io import BytesIO from os import environ -from time import time from typing import ( # noqa: F401 Any, Callable, @@ -30,8 +25,6 @@ ) from urllib.parse import urlparse -import requests -from requests.exceptions import ConnectionError from typing_extensions import deprecated from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( @@ -46,13 +39,14 @@ from opentelemetry.exporter.otlp.proto.common.metrics_encoder import ( encode_metrics, ) -from opentelemetry.exporter.otlp.proto.http import ( - _OTLP_HTTP_HEADERS, - Compression, -) +from opentelemetry.exporter.otlp.proto.http import Compression +from opentelemetry.exporter.otlp.proto.http._client import OTLPHttpClient from opentelemetry.exporter.otlp.proto.http._common import ( - _is_retryable, - _load_session_from_envvar, + _load_session_from_envvar, _build_default_headers, +) +from opentelemetry.exporter.otlp.proto.http._session import ( + HttpSession, + Urllib3Session, ) from opentelemetry.metrics import MeterProvider from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( # noqa: F401 @@ -115,7 +109,6 @@ DEFAULT_ENDPOINT = "http://localhost:4318/" DEFAULT_METRICS_EXPORT_PATH = "v1/metrics" DEFAULT_TIMEOUT = 10 # in seconds -_MAX_RETRYS = 6 class OTLPMetricExporter(MetricExporter, OTLPMetricExporterMixin): @@ -128,7 +121,7 @@ def __init__( headers: dict[str, str] | None = None, timeout: float | None = None, compression: Compression | None = None, - session: requests.Session | None = None, + session: HttpSession | None = None, preferred_temporality: dict[type, AggregationTemporality] | None = None, preferred_aggregation: dict[type, Aggregation] | None = None, @@ -146,7 +139,9 @@ def __init__( headers: Headers to be sent with HTTP requests at export timeout: Timeout in seconds for export compression: Compression to use; one of none, gzip, deflate - session: Requests session to use at export + session: HTTP session to use at export. Any object implementing + the :class:`HttpSession` protocol works; a ``requests.Session`` + continues to work via structural typing. preferred_temporality: Map of preferred temporality for each metric type. See `opentelemetry.sdk.metrics.export.MetricReader` for more details on what preferred temporality is. @@ -195,21 +190,24 @@ def __init__( ) ) self._compression = compression or _compression_from_env() - self._session = ( - session - or _load_session_from_envvar( - _OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER - ) - or requests.Session() + + session_ = session or _load_session_from_envvar( + _OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER + ) + + self._session: HttpSession = session_ or Urllib3Session( + verify=self._certificate_file, + cert=self._client_cert, + ) + self._client: OTLPHttpClient[HttpSession] = OTLPHttpClient( + session=self._session, + endpoint=self._endpoint, + timeout=self._timeout, + compression=self._compression, + shutdown_event=self._shutdown_in_progress, + headers=_build_default_headers(self._headers, self._compression), + kind="metrics", ) - self._session.headers.update(self._headers) - self._session.headers.update(_OTLP_HTTP_HEADERS) - # let users override our defaults - self._session.headers.update(self._headers) - if self._compression is not Compression.NoCompression: - self._session.headers.update( - {"Content-Encoding": self._compression.value} - ) self._common_configuration( preferred_temporality, preferred_aggregation @@ -224,120 +222,23 @@ def __init__( meter_provider, ) - def _export( - self, serialized_data: bytes, timeout_sec: Optional[float] = None - ): - data = serialized_data - if self._compression == Compression.Gzip: - gzip_data = BytesIO() - with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: - gzip_stream.write(serialized_data) - data = gzip_data.getvalue() - elif self._compression == Compression.Deflate: - data = zlib.compress(serialized_data) - - if timeout_sec is None: - timeout_sec = self._timeout - - # By default, keep-alive is enabled in Session's request - # headers. Backends may choose to close the connection - # while a post happens which causes an unhandled - # exception. This try/except will retry the post on such exceptions - try: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - except ConnectionError: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - return resp - def _export_with_retries( self, export_request: ExportMetricsServiceRequest, - deadline_sec: float, num_items: int, ) -> MetricExportResult: - """Export serialized data with retry logic until success, non-transient error, or exponential backoff maxed out. - - Args: - export_request: ExportMetricsServiceRequest object containing metrics data to export - deadline_sec: timestamp deadline for the export - - Returns: - MetricExportResult: SUCCESS if export succeeded, FAILURE otherwise - """ + """Export serialized data with retry logic until success, non-transient error, or exponential backoff maxed out.""" with self._metrics.export_operation(num_items) as result: serialized_data = export_request.SerializeToString() - deadline_sec = time() + self._timeout - for retry_num in range(_MAX_RETRYS): - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. - backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - export_error: Optional[Exception] = None - try: - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - return MetricExportResult.SUCCESS - except requests.exceptions.RequestException as error: - reason = error - export_error = error - retryable = isinstance(error, ConnectionError) - status_code = None - else: - reason = resp.reason - retryable = _is_retryable(resp) - status_code = resp.status_code - - if not retryable: - _logger.error( - "Failed to export metrics batch code: %s, reason: %s", - status_code, - reason, - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return MetricExportResult.FAILURE - if ( - retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - or self._shutdown - ): - _logger.error( - "Failed to export metrics batch due to timeout, " - "max retries or shutdown." - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return MetricExportResult.FAILURE - - _logger.warning( - "Transient error %s encountered while exporting metrics batch, retrying in %.2fs.", - reason, - backoff_seconds, - ) - shutdown = self._shutdown_in_progress.wait(backoff_seconds) - if shutdown: - _logger.warning("Shutdown in progress, aborting retry.") - break + outcome = self._client.export(serialized_data) + if outcome.success: + return MetricExportResult.SUCCESS + result.error = outcome.error + result.error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: outcome.status_code} + if outcome.status_code is not None + else None + ) return MetricExportResult.FAILURE def export( @@ -357,13 +258,10 @@ def export( num_items += len(metric.data.data_points) export_request = encode_metrics(metrics_data) - deadline_sec = time() + self._timeout # If no batch size configured, export as single batch with retries as configured if self._max_export_batch_size is None: - return self._export_with_retries( - export_request, deadline_sec, num_items - ) + return self._export_with_retries(export_request, num_items) # Else, export in batches of configured size batched_export_requests = _split_metrics_data( @@ -372,9 +270,7 @@ def export( for split_metrics_data in batched_export_requests: export_result = self._export_with_retries( - split_metrics_data, - deadline_sec, - num_items, + split_metrics_data, num_items ) if export_result != MetricExportResult.SUCCESS: return MetricExportResult.FAILURE @@ -388,7 +284,7 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: return self._shutdown = True self._shutdown_in_progress.set() - self._session.close() + self._client.close() @property def _exporting(self) -> str: @@ -561,53 +457,7 @@ def _split_metrics_data( def _get_split_resource_metrics_pb2( split_resource_metrics: List[Dict], ) -> List[pb2.ResourceMetrics]: - """Helper that returns a list of pb2.ResourceMetrics objects based on split_resource_metrics. - Example input: - - ```python - [ - { - "resource": , - "schema_url": "http://foo-bar", - "scope_metrics": [ - "scope": , - "schema_url": "http://foo-baz", - "metrics": [ - { - "name": "apples", - "description": "number of apples purchased", - "sum": { - "aggregation_temporality": 1, - "is_monotonic": "false", - "data_points": [ - { - start_time_unix_nano: 1000 - time_unix_nano: 1001 - exemplars { - time_unix_nano: 1002 - span_id: "foo-span" - trace_id: "foo-trace" - as_int: 5 - } - as_int: 5 - } - ] - } - }, - ], - ], - }, - ] - ``` - - Args: - split_resource_metrics: A list of dict representations of ResourceMetrics, - ScopeMetrics, Metrics, and data points. - - Returns: - List[pb2.ResourceMetrics]: A list of pb2.ResourceMetrics objects containing - pb2.ScopeMetrics, pb2.Metrics, and data points - """ + """Helper that returns a list of pb2.ResourceMetrics objects based on split_resource_metrics.""" split_resource_metrics_pb = [] for resource_metrics in split_resource_metrics: new_resource_metrics = pb2.ResourceMetrics( diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index 018d89df1e..1586e905fa 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -11,35 +11,30 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations -import gzip import logging -import random import threading -import zlib -from io import BytesIO from os import environ -from time import time -from typing import Dict, Optional, Sequence +from typing import Sequence from urllib.parse import urlparse -import requests -from requests.exceptions import ConnectionError - from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( ExporterMetrics, ) from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( encode_spans, ) -from opentelemetry.exporter.otlp.proto.http import ( - _OTLP_HTTP_HEADERS, - Compression, -) +from opentelemetry.exporter.otlp.proto.http import Compression +from opentelemetry.exporter.otlp.proto.http._client import OTLPHttpClient from opentelemetry.exporter.otlp.proto.http._common import ( - _is_retryable, + _build_default_headers, _load_session_from_envvar, ) +from opentelemetry.exporter.otlp.proto.http._session import ( + HttpSession, + Urllib3Session, +) from opentelemetry.metrics import MeterProvider from opentelemetry.sdk.environment_variables import ( _OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER, @@ -75,22 +70,21 @@ DEFAULT_ENDPOINT = "http://localhost:4318/" DEFAULT_TRACES_EXPORT_PATH = "v1/traces" DEFAULT_TIMEOUT = 10 # in seconds -_MAX_RETRYS = 6 class OTLPSpanExporter(SpanExporter): def __init__( self, - endpoint: Optional[str] = None, - certificate_file: Optional[str] = None, - client_key_file: Optional[str] = None, - client_certificate_file: Optional[str] = None, - headers: Optional[Dict[str, str]] = None, - timeout: Optional[float] = None, - compression: Optional[Compression] = None, - session: Optional[requests.Session] = None, + endpoint: str | None = None, + certificate_file: str | None = None, + client_key_file: str | None = None, + client_certificate_file: str | None = None, + headers: dict[str, str] | None = None, + timeout: float | None = None, + compression: Compression | None = None, + session: HttpSession | None = None, *, - meter_provider: Optional[MeterProvider] = None, + meter_provider: MeterProvider | None = None, ): self._shutdown_in_progress = threading.Event() self._endpoint = endpoint or environ.get( @@ -130,21 +124,24 @@ def __init__( ) ) self._compression = compression or _compression_from_env() - self._session = ( - session - or _load_session_from_envvar( - _OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER - ) - or requests.Session() + + session_ = session or _load_session_from_envvar( + _OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER + ) + + self._session: HttpSession = session_ or Urllib3Session( + verify=self._certificate_file, + cert=self._client_cert, + ) + self._client: OTLPHttpClient[HttpSession] = OTLPHttpClient( + session=self._session, + endpoint=self._endpoint, + timeout=self._timeout, + compression=self._compression, + shutdown_event=self._shutdown_in_progress, + headers=_build_default_headers(self._headers, self._compression), + kind="span", ) - self._session.headers.update(self._headers) - self._session.headers.update(_OTLP_HTTP_HEADERS) - # let users override our defaults - self._session.headers.update(self._headers) - if self._compression is not Compression.NoCompression: - self._session.headers.update( - {"Content-Encoding": self._compression.value} - ) self._shutdown = False self._metrics = ExporterMetrics( @@ -154,43 +151,6 @@ def __init__( meter_provider, ) - def _export( - self, serialized_data: bytes, timeout_sec: Optional[float] = None - ): - data = serialized_data - if self._compression == Compression.Gzip: - gzip_data = BytesIO() - with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: - gzip_stream.write(serialized_data) - data = gzip_data.getvalue() - elif self._compression == Compression.Deflate: - data = zlib.compress(serialized_data) - - if timeout_sec is None: - timeout_sec = self._timeout - - # By default, keep-alive is enabled in Session's request - # headers. Backends may choose to close the connection - # while a post happens which causes an unhandled - # exception. This try/except will retry the post on such exceptions - try: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - except ConnectionError: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - return resp - def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: if self._shutdown: _logger.warning("Exporter already shutdown, ignoring batch") @@ -198,66 +158,15 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: with self._metrics.export_operation(len(spans)) as result: serialized_data = encode_spans(spans).SerializePartialToString() - deadline_sec = time() + self._timeout - for retry_num in range(_MAX_RETRYS): - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. - backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - export_error: Optional[Exception] = None - try: - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - return SpanExportResult.SUCCESS - except requests.exceptions.RequestException as error: - reason = error - export_error = error - retryable = isinstance(error, ConnectionError) - status_code = None - else: - reason = resp.reason - retryable = _is_retryable(resp) - status_code = resp.status_code - - if not retryable: - _logger.error( - "Failed to export span batch code: %s, reason: %s", - status_code, - reason, - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return SpanExportResult.FAILURE - - if ( - retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - or self._shutdown - ): - _logger.error( - "Failed to export span batch due to timeout, " - "max retries or shutdown." - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return SpanExportResult.FAILURE - _logger.warning( - "Transient error %s encountered while exporting span batch, retrying in %.2fs.", - reason, - backoff_seconds, - ) - shutdown = self._shutdown_in_progress.wait(backoff_seconds) - if shutdown: - _logger.warning("Shutdown in progress, aborting retry.") - break + outcome = self._client.export(serialized_data) + if outcome.success: + return SpanExportResult.SUCCESS + result.error = outcome.error + result.error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: outcome.status_code} + if outcome.status_code is not None + else None + ) return SpanExportResult.FAILURE def shutdown(self): @@ -266,7 +175,7 @@ def shutdown(self): return self._shutdown = True self._shutdown_in_progress.set() - self._session.close() + self._client.close() def force_flush(self, timeout_millis: int = 30000) -> bool: """Nothing is buffered in this exporter, so this method does nothing."""