diff --git a/CHANGELOG.md b/CHANGELOG.md index afe82e55b2..6da573fd94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#5135](https://github.com/open-telemetry/opentelemetry-python/pull/5135)) - ci: wait for tracecontext server readiness instead of a fixed sleep in `scripts/tracecontext-integration-test.sh` ([#5149](https://github.com/open-telemetry/opentelemetry-python/pull/5149)) +- `opentelemetry-exporter-otlp-proto-http`: Generalize OTLP HTTP exporters to allow for alternate HTTP client usage + ([#5169](https://github.com/open-telemetry/opentelemetry-python/pull/5169)) ## Version 1.41.0/0.62b0 (2026-04-09) diff --git a/docs-requirements.txt b/docs-requirements.txt index b9efb88906..4359694a39 100644 --- a/docs-requirements.txt +++ b/docs-requirements.txt @@ -27,3 +27,4 @@ wrapt>=1.0.0,<2.0.0 markupsafe~=2.0 protobuf==5.29.6 prometheus-client~=0.22.1 +requests~=2.7 diff --git a/docs/conf.py b/docs/conf.py index 8ca6a83b5b..3b1b5a80ab 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -87,6 +87,7 @@ "wrapt": ("https://wrapt.readthedocs.io/en/latest/", None), "pymongo": ("https://pymongo.readthedocs.io/en/stable/", None), "grpc": ("https://grpc.github.io/grpc/python/", None), + "requests": ("https://requests.readthedocs.io/en/latest/", None), } # http://www.sphinx-doc.org/en/master/config.html#confval-nitpicky @@ -132,6 +133,10 @@ "py:class", "opentelemetry.exporter.otlp.proto.grpc.exporter.OTLPExporterMixin", ), + ( + "py:class", + "opentelemetry.exporter.otlp.proto.http._transport.BaseHTTPTransport", + ), ( "py:class", "opentelemetry.proto.collector.trace.v1.trace_service_pb2.ExportTraceServiceRequest", diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml b/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml index 719a2cd84f..b13b0107dc 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml +++ 
b/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml @@ -31,8 +31,8 @@ dependencies = [ "opentelemetry-proto == 1.42.0.dev", "opentelemetry-sdk ~= 1.42.0.dev", "opentelemetry-exporter-otlp-proto-common == 1.42.0.dev", - "requests ~= 2.7", "typing-extensions >= 4.5.0", + "urllib3 >= 1.11" ] [project.entry-points.opentelemetry_traces_exporter] @@ -52,6 +52,9 @@ Repository = "https://github.com/open-telemetry/opentelemetry-python" gcp-auth = [ "opentelemetry-exporter-credential-provider-gcp >= 0.59b0", ] +requests = [ + "requests ~= 2.7" +] [tool.hatch.version] path = "src/opentelemetry/exporter/otlp/proto/http/version/__init__.py" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/__init__.py index 745a60424e..e60b3bf2eb 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/__init__.py @@ -71,10 +71,13 @@ """ import enum +from typing import Final from .version import __version__ -_OTLP_HTTP_HEADERS = { +_CONTENT_ENCODING_HEADER: Final[str] = "Content-Encoding" + +_OTLP_HTTP_HEADERS: Final[dict[str, str]] = { "Content-Type": "application/x-protobuf", "User-Agent": "OTel-OTLP-Exporter-Python/" + __version__, } diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py index 1bdb7d228c..43935dc97d 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py @@ -11,33 +11,77 @@ # WITHOUT 
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations from os import environ -from typing import Literal, Optional - -import requests +from typing import TYPE_CHECKING, Final, Literal +from opentelemetry.exporter.otlp.proto.http import Compression +from opentelemetry.exporter.otlp.proto.http._transport import BaseHTTPTransport +from opentelemetry.exporter.otlp.proto.http._transport._requests import ( + RequestsHTTPTransport, +) +from opentelemetry.exporter.otlp.proto.http._transport._urllib3 import ( + Urllib3HTTPTransport, +) from opentelemetry.sdk.environment_variables import ( _OTEL_PYTHON_EXPORTER_OTLP_HTTP_CREDENTIAL_PROVIDER, + OTEL_EXPORTER_OTLP_COMPRESSION, + OTEL_EXPORTER_OTLP_ENDPOINT, ) from opentelemetry.util._importlib_metadata import entry_points +if TYPE_CHECKING: + import requests + +_DEFAULT_ENDPOINT: Final[str] = "http://localhost:4318" +DEFAULT_COMPRESSION: Final[Compression] = Compression.NoCompression + + +def _compression_from_env( + signal_compression_envvar: Literal[ + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION", + "OTEL_EXPORTER_OTLP_METRICS_COMPRESSION", + "OTEL_EXPORTER_OTLP_TRACES_COMPRESSION", + ], +) -> Compression: + compression = ( + environ.get( + signal_compression_envvar, + environ.get( + OTEL_EXPORTER_OTLP_COMPRESSION, DEFAULT_COMPRESSION.value + ), + ) + .lower() + .strip() + ) + return Compression(compression) + -def _is_retryable(resp: requests.Response) -> bool: - if resp.status_code == 408: - return True - if resp.status_code >= 500 and resp.status_code <= 599: - return True - return False +def _endpoint_from_env( + signal_endpoint_envvar: Literal[ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT", + "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", + "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", + ], + default_signal_path: Literal["v1/logs", "v1/metrics", "v1/traces"], +) -> str: + base = ( + environ.get( + 
OTEL_EXPORTER_OTLP_ENDPOINT, _DEFAULT_ENDPOINT + ).removesuffix("/") + + "/" + ) + return environ.get(signal_endpoint_envvar, base + default_signal_path) -def _load_session_from_envvar( +def _session_from_env( cred_envvar: Literal[ "OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER", "OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER", "OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER", ], -) -> Optional[requests.Session]: +) -> requests.Session | None: _credential_env = environ.get( _OTEL_PYTHON_EXPORTER_OTLP_HTTP_CREDENTIAL_PROVIDER ) or environ.get(cred_envvar) @@ -56,6 +100,8 @@ def _load_session_from_envvar( f"Requested component '{_credential_env}' not found in " f"entry point 'opentelemetry_otlp_credential_provider'" ) + import requests # noqa: PLC0415 + if isinstance(maybe_session, requests.Session): return maybe_session else: @@ -64,3 +110,13 @@ def _load_session_from_envvar( f" must be of type `requests.Session`." ) return None + + +def _transport_from_args( + session: requests.Session | None, + verify: bool | str, + cert: str | tuple[str, str] | None, +) -> BaseHTTPTransport: + if session is not None: + return RequestsHTTPTransport(verify=verify, cert=cert, session=session) + return Urllib3HTTPTransport(verify=verify, cert=cert) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index 6032433dd1..80cb766551 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -12,31 +12,42 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import gzip +from __future__ import annotations + import logging -import random import threading -import zlib -from io import BytesIO from os import environ -from time import time -from typing import Dict, Optional, Sequence +from typing import ( + TYPE_CHECKING, + Dict, + Final, + Literal, + Optional, + Sequence, + overload, +) from urllib.parse import urlparse -import requests -from requests.exceptions import ConnectionError - from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( ExporterMetrics, ) from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs from opentelemetry.exporter.otlp.proto.http import ( + _CONTENT_ENCODING_HEADER, _OTLP_HTTP_HEADERS, Compression, ) from opentelemetry.exporter.otlp.proto.http._common import ( - _is_retryable, - _load_session_from_envvar, + _compression_from_env, + _endpoint_from_env, + _session_from_env, + _transport_from_args, +) +from opentelemetry.exporter.otlp.proto.http._otlp_client import ( + OTLPHTTPClient, +) +from opentelemetry.exporter.otlp.proto.http._transport import ( + BaseHTTPTransport, ) from opentelemetry.metrics import MeterProvider from opentelemetry.sdk._logs import ReadableLogRecord @@ -50,8 +61,6 @@ OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_KEY, - OTEL_EXPORTER_OTLP_COMPRESSION, - OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE, OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE, @@ -70,19 +79,51 @@ ) from opentelemetry.util.re import parse_env_headers +if TYPE_CHECKING: + import requests + _logger = logging.getLogger(__name__) # This prevents logs generated when a log fails to be written to generate another log which fails to be written etc. etc. 
_logger.addFilter(DuplicateFilter()) -DEFAULT_COMPRESSION = Compression.NoCompression -DEFAULT_ENDPOINT = "http://localhost:4318/" -DEFAULT_LOGS_EXPORT_PATH = "v1/logs" -DEFAULT_TIMEOUT = 10 # in seconds -_MAX_RETRYS = 6 +DEFAULT_TIMEOUT: Final[int] = 10 # in seconds +DEFAULT_LOGS_EXPORT_PATH: Final[Literal["v1/logs"]] = "v1/logs" class OTLPLogExporter(LogRecordExporter): + @overload + def __init__( + self, + endpoint: str | None = ..., + certificate_file: str | None = ..., + client_key_file: str | None = ..., + client_certificate_file: str | None = ..., + headers: dict[str, str] | None = ..., + timeout: float | None = ..., + compression: Compression | None = ..., + session: requests.Session | None = ..., + *, + meter_provider: MeterProvider | None = ..., + _transport: None = ..., + ) -> None: ... + + @overload + def __init__( + self, + endpoint: str | None = ..., + certificate_file: str | None = ..., + client_key_file: str | None = ..., + client_certificate_file: str | None = ..., + headers: dict[str, str] | None = ..., + timeout: float | None = ..., + compression: Compression | None = ..., + session: None = ..., + *, + meter_provider: MeterProvider | None = ..., + _transport: BaseHTTPTransport, + ) -> None: ... 
+ def __init__( self, endpoint: Optional[str] = None, @@ -95,13 +136,11 @@ def __init__( session: Optional[requests.Session] = None, *, meter_provider: Optional[MeterProvider] = None, + _transport: Optional[BaseHTTPTransport] = None, ): - self._shutdown_is_occuring = threading.Event() - self._endpoint = endpoint or environ.get( - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, - _append_logs_path( - environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT) - ), + self._shutdown_event = threading.Event() + self._endpoint = endpoint or _endpoint_from_env( + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, DEFAULT_LOGS_EXPORT_PATH ) # Keeping these as instance variables because they are used in tests self._certificate_file = certificate_file or environ.get( @@ -134,22 +173,35 @@ def __init__( environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), ) ) - self._compression = compression or _compression_from_env() - self._session = ( + self._compression = compression or _compression_from_env( + OTEL_EXPORTER_OTLP_LOGS_COMPRESSION + ) + + transport = _transport or _transport_from_args( session - or _load_session_from_envvar( + or _session_from_env( _OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER - ) - or requests.Session() + ), + self._certificate_file, + self._client_cert, ) - self._session.headers.update(self._headers) - self._session.headers.update(_OTLP_HTTP_HEADERS) - # let users override our defaults - self._session.headers.update(self._headers) + + client_headers: dict[str, str] = { + **_OTLP_HTTP_HEADERS, + **self._headers, + } if self._compression is not Compression.NoCompression: - self._session.headers.update( - {"Content-Encoding": self._compression.value} - ) + client_headers[_CONTENT_ENCODING_HEADER] = self._compression.value + + self._client = OTLPHTTPClient( + transport=transport, + endpoint=self._endpoint, + timeout=self._timeout, + compression=self._compression, + shutdown_event=self._shutdown_event, + headers=client_headers, + kind="logs", + ) self._shutdown = False 
self._metrics = ExporterMetrics( @@ -159,43 +211,6 @@ def __init__( meter_provider, ) - def _export( - self, serialized_data: bytes, timeout_sec: Optional[float] = None - ): - data = serialized_data - if self._compression == Compression.Gzip: - gzip_data = BytesIO() - with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: - gzip_stream.write(serialized_data) - data = gzip_data.getvalue() - elif self._compression == Compression.Deflate: - data = zlib.compress(serialized_data) - - if timeout_sec is None: - timeout_sec = self._timeout - - # By default, keep-alive is enabled in Session's request - # headers. Backends may choose to close the connection - # while a post happens which causes an unhandled - # exception. This try/except will retry the post on such exceptions - try: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - except ConnectionError: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - return resp - def export( self, batch: Sequence[ReadableLogRecord] ) -> LogRecordExportResult: @@ -205,66 +220,15 @@ def export( with self._metrics.export_operation(len(batch)) as result: serialized_data = encode_logs(batch).SerializeToString() - deadline_sec = time() + self._timeout - for retry_num in range(_MAX_RETRYS): - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. 
- backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - export_error: Optional[Exception] = None - try: - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - return LogRecordExportResult.SUCCESS - except requests.exceptions.RequestException as error: - reason = error - export_error = error - retryable = isinstance(error, ConnectionError) - status_code = None - else: - reason = resp.reason - retryable = _is_retryable(resp) - status_code = resp.status_code - - if not retryable: - _logger.error( - "Failed to export logs batch code: %s, reason: %s", - status_code, - reason, - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return LogRecordExportResult.FAILURE - - if ( - retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - or self._shutdown - ): - _logger.error( - "Failed to export logs batch due to timeout, " - "max retries or shutdown." 
- ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return LogRecordExportResult.FAILURE - _logger.warning( - "Transient error %s encountered while exporting logs batch, retrying in %.2fs.", - reason, - backoff_seconds, - ) - shutdown = self._shutdown_is_occuring.wait(backoff_seconds) - if shutdown: - _logger.warning("Shutdown in progress, aborting retry.") - break + export_result = self._client.export(serialized_data) + if export_result.success: + return LogRecordExportResult.SUCCESS + + result.error = export_result.error + if export_result.status_code is not None: + result.error_attrs = { + HTTP_RESPONSE_STATUS_CODE: export_result.status_code + } return LogRecordExportResult.FAILURE def force_flush(self, timeout_millis: float = 10_000) -> bool: @@ -276,23 +240,5 @@ def shutdown(self): _logger.warning("Exporter already shutdown, ignoring call") return self._shutdown = True - self._shutdown_is_occuring.set() - self._session.close() - - -def _compression_from_env() -> Compression: - compression = ( - environ.get( - OTEL_EXPORTER_OTLP_LOGS_COMPRESSION, - environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none"), - ) - .lower() - .strip() - ) - return Compression(compression) - - -def _append_logs_path(endpoint: str) -> str: - if endpoint.endswith("/"): - return endpoint + DEFAULT_LOGS_EXPORT_PATH - return endpoint + f"/{DEFAULT_LOGS_EXPORT_PATH}" + self._shutdown_event.set() + self._client.close() diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_otlp_client.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_otlp_client.py new file mode 100644 index 0000000000..0d3e94ca8b --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_otlp_client.py @@ -0,0 +1,192 @@ +# Copyright The OpenTelemetry 
Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gzip +import logging +import random +import threading +import time +import zlib +from dataclasses import dataclass +from http import HTTPStatus +from io import BytesIO +from typing import Final, Literal, Mapping + +from opentelemetry.exporter.otlp.proto.http import Compression +from opentelemetry.exporter.otlp.proto.http._transport import ( + BaseHTTPResult, + BaseHTTPTransport, +) + +_logger = logging.getLogger(__name__) + +_MAX_RETRIES: Final[int] = 6 + + +def _is_retryable(status_code: int | None) -> bool: + if status_code is None: + return False + if status_code == HTTPStatus.REQUEST_TIMEOUT.value: + return True + if 500 <= status_code <= 599: + return True + return False + + +@dataclass(slots=True, frozen=True) +class ExportResult: + """Outcome of an OTLP export attempt, including retry exhaustion.""" + + success: bool + status_code: int | None + reason: str | None + error: Exception | None + + +class OTLPHTTPClient: + """Sends serialized OTLP payloads over HTTP with retry logic. + + Compression, backoff, and connection-error recovery are handled internally. + Callers interact through the :meth:`export` and :meth:`close` methods. 
+ """ + + def __init__( + self, + transport: BaseHTTPTransport, + endpoint: str, + timeout: float, + compression: Compression, + shutdown_event: threading.Event, + headers: Mapping[str, str], + kind: Literal["spans", "logs", "metrics"], + jitter: float = 0.2, + ) -> None: + self._transport = transport + self._endpoint = endpoint + self._timeout = timeout + self._compression = compression + self._shutdown_event = shutdown_event + self._headers = dict(headers) + self._kind = kind + self._jitter = min(max(jitter, 0.0), 1.0) + + def _compute_backoff(self, retry: int) -> float: + return 2**retry * random.uniform(1 - self._jitter, 1 + self._jitter) + + def _compress(self, serialized_data: bytes) -> bytes: + if self._compression is Compression.Gzip: + buf = BytesIO() + with gzip.GzipFile(fileobj=buf, mode="w") as gz: + gz.write(serialized_data) + return buf.getvalue() + if self._compression is Compression.Deflate: + return zlib.compress(serialized_data) + return serialized_data + + def _submit(self, data: bytes, timeout: float) -> BaseHTTPResult: + deadline = time.time() + timeout + result = self._transport.request( + "POST", + self._endpoint, + headers=self._headers, + data=data, + timeout=timeout, + ) + if ( + result.error is not None + and result.is_connection_error() + and (remaining := deadline - time.time()) > 0 + ): + # Immediately retry connection errors once without backoff. These + # usually indicate a stale pooled connection that the transport will + # reestablish on the next attempt. + result = self._transport.request( + "POST", + self._endpoint, + headers=self._headers, + data=data, + timeout=remaining, + ) + return result + + def export(self, data: bytes) -> ExportResult: + """Export a serialized payload, retrying on transient failures. + + :param data: Serialized bytes to send. + :returns: An :class:`ExportResult` indicating success or the reason for failure. 
+ """ + data = self._compress(data) + deadline = time.time() + self._timeout + + for retry in range(_MAX_RETRIES): + backoff = self._compute_backoff(retry) + status_code: int | None = None + reason: str | None = None + export_error: Exception | None + retryable: bool + + try: + result = self._submit(data, max(deadline - time.time(), 0.0)) + except Exception as error: + export_error = error + retryable = False + else: + status_code = result.status_code + reason = result.reason + if status_code is not None and 200 <= status_code < 400: + return ExportResult(True, status_code, reason, None) + export_error = result.error + retryable = ( + _is_retryable(status_code) + if status_code + else result.is_connection_error() + ) + + if not retryable: + _logger.error( + "Failed to export %s batch code: %s, reason: %s", + self._kind, + status_code, + reason or export_error or "unknown", + ) + return ExportResult(False, status_code, reason, export_error) + + if ( + retry + 1 == _MAX_RETRIES + or backoff > (deadline - time.time()) + or self._shutdown_event.is_set() + ): + _logger.error( + "Failed to export %s batch due to timeout, " + "max retries or shutdown.", + self._kind, + ) + return ExportResult(False, status_code, reason, export_error) + + _logger.warning( + "Transient error %s encountered while exporting %s batch, retrying in %.2fs.", + reason or export_error, + self._kind, + backoff, + ) + shutdown = self._shutdown_event.wait(backoff) + if shutdown: + _logger.warning("Shutdown in progress, aborting retry.") + break + + return ExportResult(False, None, None, None) + + def close(self) -> None: + """Close the underlying transport and release its resources.""" + self._transport.close() diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_transport/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_transport/__init__.py new file mode 100644 index 0000000000..68681e5756 
--- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_transport/__init__.py @@ -0,0 +1,64 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from dataclasses import dataclass + + +@dataclass(frozen=True, slots=True) +class BaseHTTPResult(ABC): + """Outcome of a single HTTP request made by a :class:`BaseHTTPTransport`. + + Either ``status_code`` and ``reason`` are populated (server responded), + or ``error`` is set (request failed before a response was received). + """ + + status_code: int | None = None + reason: str | None = None + error: Exception | None = None + + @abstractmethod + def is_connection_error(self) -> bool: + """Return ``True`` if the failure is a transport-level connection error.""" + ... + + +class BaseHTTPTransport(ABC): + """Abstract HTTP transport interface used by OTLP HTTP exporters.""" + + @abstractmethod + def request( + self, + method: str, + url: str, + *, + headers: dict[str, str] | None = None, + timeout: float | None = None, + data: bytes | None = None, + ) -> BaseHTTPResult: + """Send an HTTP request and return the result. + + :param method: HTTP method (e.g. ``"POST"``). + :param url: Target URL. + :param headers: Optional HTTP headers to include in the request. + :param timeout: Optional request timeout in seconds. + :param data: Optional request body. 
+ :returns: A :class:`BaseHTTPResult` describing the outcome. + """ + ... + + @abstractmethod + def close(self) -> None: + """Release any resources held by the transport.""" + ... diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_transport/_requests.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_transport/_requests.py new file mode 100644 index 0000000000..ce8b691df9 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_transport/_requests.py @@ -0,0 +1,102 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +import functools +import warnings +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from opentelemetry.exporter.otlp.proto.http._transport import ( + BaseHTTPResult, + BaseHTTPTransport, +) + +if TYPE_CHECKING: + import requests + + +@functools.cache +def _get_connection_error_types() -> tuple[type[Exception], ...]: + import requests.exceptions # noqa: PLC0415 + + return ( + requests.exceptions.ConnectionError, + requests.exceptions.ConnectTimeout, + requests.exceptions.ReadTimeout, + requests.exceptions.Timeout, + requests.exceptions.SSLError, + requests.exceptions.ProxyError, + ) + + +@dataclass(frozen=True, slots=True) +class RequestsHTTPResult(BaseHTTPResult): + def is_connection_error(self) -> bool: + if self.error is None: + return False + return isinstance(self.error, _get_connection_error_types()) + + +class RequestsHTTPTransport(BaseHTTPTransport): + def __init__( + self, + *, + verify: bool | str = True, + cert: str | tuple[str, str] | None = None, + session: requests.Session | None = None, + ) -> None: + import requests # noqa: PLC0415 + + self._session = session if session is not None else requests.Session() + self._session.verify = verify + if cert is not None: + self._session.cert = cert + + if verify is False: + from urllib3.exceptions import ( # noqa: PLC0415 + InsecureRequestWarning, + ) + + warnings.filterwarnings("ignore", category=InsecureRequestWarning) + + def request( + self, + method: str, + url: str, + *, + headers: dict[str, str] | None = None, + timeout: float | None = None, + data: bytes | None = None, + ) -> BaseHTTPResult: + try: + response = self._session.request( + method=method, + url=url, + headers=headers, + data=data, + timeout=timeout, + allow_redirects=False, + ) + except Exception as exc: + return RequestsHTTPResult(error=exc) + + return RequestsHTTPResult( + status_code=response.status_code, + reason=response.reason, + ) + + def close(self) -> None: + 
self._session.close() diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_transport/_urllib3.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_transport/_urllib3.py new file mode 100644 index 0000000000..97114ba626 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_transport/_urllib3.py @@ -0,0 +1,116 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import functools +import warnings +from dataclasses import dataclass + +from opentelemetry.exporter.otlp.proto.http._transport import ( + BaseHTTPResult, + BaseHTTPTransport, +) + + +@functools.cache +def _get_connection_error_types() -> tuple[type[Exception], ...]: + import urllib3.exceptions # noqa: PLC0415 + + types: list[type[Exception]] = [ + urllib3.exceptions.TimeoutError, + urllib3.exceptions.NewConnectionError, + urllib3.exceptions.ConnectTimeoutError, + urllib3.exceptions.MaxRetryError, + urllib3.exceptions.ProtocolError, + ] + + # NameResolutionError was added in urllib3 2.0 + name_resolution_error = getattr( + urllib3.exceptions, "NameResolutionError", None + ) + if name_resolution_error is not None: + types.append(name_resolution_error) + + return tuple(types) + + +@dataclass(frozen=True, slots=True) +class Urllib3HTTPResult(BaseHTTPResult): + def is_connection_error(self) -> bool: + if self.error is None: + return False + return isinstance(self.error, _get_connection_error_types()) + + +class Urllib3HTTPTransport(BaseHTTPTransport): + def __init__( + self, + *, + verify: bool | str = True, + cert: str | tuple[str, str] | None = None, + ) -> None: + import urllib3 # noqa: PLC0415 + + pool_kwargs: dict[str, object] = { + "retries": urllib3.Retry(0, redirect=False), + } + if verify is False: + pool_kwargs["cert_reqs"] = "CERT_NONE" + warnings.filterwarnings( + "ignore", + category=urllib3.exceptions.InsecureRequestWarning, + ) + else: + pool_kwargs["cert_reqs"] = "CERT_REQUIRED" + if isinstance(verify, str): + pool_kwargs["ca_certs"] = verify + if isinstance(cert, tuple): + pool_kwargs["cert_file"] = cert[0] + pool_kwargs["key_file"] = cert[1] + elif isinstance(cert, str): + pool_kwargs["cert_file"] = cert + + self._pool = urllib3.PoolManager(**pool_kwargs) # type: ignore + + def request( + self, + method: str, + url: str, + *, + headers: dict[str, str] | None = None, + timeout: float | None = None, + data: bytes | None = None, + ) -> 
BaseHTTPResult: + import urllib3 # noqa: PLC0415 + + try: + response = self._pool.request( + method=method, + url=url, + headers=headers, + body=data, + timeout=urllib3.Timeout(total=timeout) + if timeout is not None + else None, + preload_content=True, + ) + except Exception as exc: + return Urllib3HTTPResult(error=exc) + + return Urllib3HTTPResult( + status_code=response.status, + reason=response.reason, + ) + + def close(self) -> None: + self._pool.clear() diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index efd63b4543..9919a15de5 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -12,26 +12,23 @@ # limitations under the License. 
from __future__ import annotations -import gzip import logging -import random import threading -import zlib -from io import BytesIO from os import environ -from time import time from typing import ( # noqa: F401 + TYPE_CHECKING, Any, Callable, Dict, + Final, Iterable, List, + Literal, Optional, + overload, ) from urllib.parse import urlparse -import requests -from requests.exceptions import ConnectionError from typing_extensions import deprecated from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( @@ -47,12 +44,21 @@ encode_metrics, ) from opentelemetry.exporter.otlp.proto.http import ( + _CONTENT_ENCODING_HEADER, _OTLP_HTTP_HEADERS, Compression, ) from opentelemetry.exporter.otlp.proto.http._common import ( - _is_retryable, - _load_session_from_envvar, + _compression_from_env, + _endpoint_from_env, + _session_from_env, + _transport_from_args, +) +from opentelemetry.exporter.otlp.proto.http._otlp_client import ( + OTLPHTTPClient, +) +from opentelemetry.exporter.otlp.proto.http._transport import ( + BaseHTTPTransport, ) from opentelemetry.metrics import MeterProvider from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( # noqa: F401 @@ -75,8 +81,6 @@ OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_KEY, - OTEL_EXPORTER_OTLP_COMPRESSION, - OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE, OTEL_EXPORTER_OTLP_METRICS_CLIENT_CERTIFICATE, @@ -108,17 +112,56 @@ ) from opentelemetry.util.re import parse_env_headers +if TYPE_CHECKING: + import requests + + _logger = logging.getLogger(__name__) -DEFAULT_COMPRESSION = Compression.NoCompression -DEFAULT_ENDPOINT = "http://localhost:4318/" -DEFAULT_METRICS_EXPORT_PATH = "v1/metrics" -DEFAULT_TIMEOUT = 10 # in seconds -_MAX_RETRYS = 6 +DEFAULT_TIMEOUT: Final[int] = 10 # in seconds +DEFAULT_METRICS_EXPORT_PATH: Final[Literal["v1/metrics"]] = "v1/metrics" class OTLPMetricExporter(MetricExporter, 
OTLPMetricExporterMixin): + @overload + def __init__( + self, + endpoint: str | None = ..., + certificate_file: str | None = ..., + client_key_file: str | None = ..., + client_certificate_file: str | None = ..., + headers: dict[str, str] | None = ..., + timeout: float | None = ..., + compression: Compression | None = ..., + session: requests.Session | None = ..., + preferred_temporality: dict[type, AggregationTemporality] | None = ..., + preferred_aggregation: dict[type, Aggregation] | None = ..., + max_export_batch_size: int | None = ..., + *, + meter_provider: MeterProvider | None = ..., + _transport: None = ..., + ) -> None: ... + + @overload + def __init__( + self, + endpoint: str | None = ..., + certificate_file: str | None = ..., + client_key_file: str | None = ..., + client_certificate_file: str | None = ..., + headers: dict[str, str] | None = ..., + timeout: float | None = ..., + compression: Compression | None = ..., + session: None = ..., + preferred_temporality: dict[type, AggregationTemporality] | None = ..., + preferred_aggregation: dict[type, Aggregation] | None = ..., + max_export_batch_size: int | None = ..., + *, + meter_provider: MeterProvider | None = ..., + _transport: BaseHTTPTransport, + ) -> None: ... + def __init__( self, endpoint: str | None = None, @@ -135,6 +178,7 @@ def __init__( max_export_batch_size: int | None = None, *, meter_provider: Optional[MeterProvider] = None, + _transport: BaseHTTPTransport | None = None, ): """OTLP HTTP metrics exporter @@ -157,12 +201,9 @@ def __init__( If not set there is no limit to the number of data points in a request. If it is set and the number of data points exceeds the max, the request will be split. 
""" - self._shutdown_in_progress = threading.Event() - self._endpoint = endpoint or environ.get( - OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, - _append_metrics_path( - environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT) - ), + self._shutdown_event = threading.Event() + self._endpoint = endpoint or _endpoint_from_env( + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, DEFAULT_METRICS_EXPORT_PATH ) self._certificate_file = certificate_file or environ.get( OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE, @@ -194,22 +235,35 @@ def __init__( environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), ) ) - self._compression = compression or _compression_from_env() - self._session = ( + self._compression = compression or _compression_from_env( + OTEL_EXPORTER_OTLP_METRICS_COMPRESSION + ) + + transport = _transport or _transport_from_args( session - or _load_session_from_envvar( + or _session_from_env( _OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER - ) - or requests.Session() + ), + self._certificate_file, + self._client_cert, ) - self._session.headers.update(self._headers) - self._session.headers.update(_OTLP_HTTP_HEADERS) - # let users override our defaults - self._session.headers.update(self._headers) + + client_headers: dict[str, str] = { + **_OTLP_HTTP_HEADERS, + **self._headers, + } if self._compression is not Compression.NoCompression: - self._session.headers.update( - {"Content-Encoding": self._compression.value} - ) + client_headers[_CONTENT_ENCODING_HEADER] = self._compression.value + + self._client = OTLPHTTPClient( + transport=transport, + endpoint=self._endpoint, + timeout=self._timeout, + compression=self._compression, + shutdown_event=self._shutdown_event, + headers=client_headers, + kind="metrics", + ) self._common_configuration( preferred_temporality, preferred_aggregation @@ -224,120 +278,30 @@ def __init__( meter_provider, ) - def _export( - self, serialized_data: bytes, timeout_sec: Optional[float] = None - ): - data = serialized_data - if self._compression == 
Compression.Gzip: - gzip_data = BytesIO() - with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: - gzip_stream.write(serialized_data) - data = gzip_data.getvalue() - elif self._compression == Compression.Deflate: - data = zlib.compress(serialized_data) - - if timeout_sec is None: - timeout_sec = self._timeout - - # By default, keep-alive is enabled in Session's request - # headers. Backends may choose to close the connection - # while a post happens which causes an unhandled - # exception. This try/except will retry the post on such exceptions - try: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - except ConnectionError: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - return resp - def _export_with_retries( self, export_request: ExportMetricsServiceRequest, - deadline_sec: float, num_items: int, ) -> MetricExportResult: - """Export serialized data with retry logic until success, non-transient error, or exponential backoff maxed out. + """Export serialized data via the shared OTLPHTTPClient. Args: export_request: ExportMetricsServiceRequest object containing metrics data to export - deadline_sec: timestamp deadline for the export Returns: MetricExportResult: SUCCESS if export succeeded, FAILURE otherwise """ with self._metrics.export_operation(num_items) as result: serialized_data = export_request.SerializeToString() - deadline_sec = time() + self._timeout - for retry_num in range(_MAX_RETRYS): - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. 
- backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - export_error: Optional[Exception] = None - try: - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - return MetricExportResult.SUCCESS - except requests.exceptions.RequestException as error: - reason = error - export_error = error - retryable = isinstance(error, ConnectionError) - status_code = None - else: - reason = resp.reason - retryable = _is_retryable(resp) - status_code = resp.status_code - - if not retryable: - _logger.error( - "Failed to export metrics batch code: %s, reason: %s", - status_code, - reason, - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return MetricExportResult.FAILURE - if ( - retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - or self._shutdown - ): - _logger.error( - "Failed to export metrics batch due to timeout, " - "max retries or shutdown." 
- ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return MetricExportResult.FAILURE - - _logger.warning( - "Transient error %s encountered while exporting metrics batch, retrying in %.2fs.", - reason, - backoff_seconds, - ) - shutdown = self._shutdown_in_progress.wait(backoff_seconds) - if shutdown: - _logger.warning("Shutdown in progress, aborting retry.") - break + export_result = self._client.export(serialized_data) + if export_result.success: + return MetricExportResult.SUCCESS + + result.error = export_result.error + if export_result.status_code is not None: + result.error_attrs = { + HTTP_RESPONSE_STATUS_CODE: export_result.status_code + } return MetricExportResult.FAILURE def export( @@ -357,13 +321,10 @@ def export( num_items += len(metric.data.data_points) export_request = encode_metrics(metrics_data) - deadline_sec = time() + self._timeout # If no batch size configured, export as single batch with retries as configured if self._max_export_batch_size is None: - return self._export_with_retries( - export_request, deadline_sec, num_items - ) + return self._export_with_retries(export_request, num_items) # Else, export in batches of configured size batched_export_requests = _split_metrics_data( @@ -373,7 +334,6 @@ def export( for split_metrics_data in batched_export_requests: export_result = self._export_with_retries( split_metrics_data, - deadline_sec, num_items, ) if export_result != MetricExportResult.SUCCESS: @@ -387,8 +347,8 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: _logger.warning("Exporter already shutdown, ignoring call") return self._shutdown = True - self._shutdown_in_progress.set() - self._session.close() + self._shutdown_event.set() + self._client.close() @property def _exporting(self) -> str: @@ -725,21 +685,3 @@ def get_resource_data( name: str, ) -> List[PB2Resource]: return 
_get_resource_data(sdk_resource_scope_data, resource_class, name) - - -def _compression_from_env() -> Compression: - compression = ( - environ.get( - OTEL_EXPORTER_OTLP_METRICS_COMPRESSION, - environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none"), - ) - .lower() - .strip() - ) - return Compression(compression) - - -def _append_metrics_path(endpoint: str) -> str: - if endpoint.endswith("/"): - return endpoint + DEFAULT_METRICS_EXPORT_PATH - return endpoint + f"/{DEFAULT_METRICS_EXPORT_PATH}" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index 018d89df1e..fa37af26ed 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -12,20 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import gzip +from __future__ import annotations + import logging -import random import threading -import zlib -from io import BytesIO from os import environ -from time import time -from typing import Dict, Optional, Sequence +from typing import ( + TYPE_CHECKING, + Dict, + Final, + Literal, + Optional, + Sequence, + overload, +) from urllib.parse import urlparse -import requests -from requests.exceptions import ConnectionError - from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( ExporterMetrics, ) @@ -33,12 +35,21 @@ encode_spans, ) from opentelemetry.exporter.otlp.proto.http import ( + _CONTENT_ENCODING_HEADER, _OTLP_HTTP_HEADERS, Compression, ) from opentelemetry.exporter.otlp.proto.http._common import ( - _is_retryable, - _load_session_from_envvar, + _compression_from_env, + _endpoint_from_env, + _session_from_env, + _transport_from_args, +) +from opentelemetry.exporter.otlp.proto.http._otlp_client import ( + OTLPHTTPClient, +) +from opentelemetry.exporter.otlp.proto.http._transport import ( + BaseHTTPTransport, ) from opentelemetry.metrics import MeterProvider from opentelemetry.sdk.environment_variables import ( @@ -46,8 +57,6 @@ OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_KEY, - OTEL_EXPORTER_OTLP_COMPRESSION, - OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TIMEOUT, OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE, @@ -68,17 +77,49 @@ ) from opentelemetry.util.re import parse_env_headers +if TYPE_CHECKING: + import requests + _logger = logging.getLogger(__name__) -DEFAULT_COMPRESSION = Compression.NoCompression -DEFAULT_ENDPOINT = "http://localhost:4318/" -DEFAULT_TRACES_EXPORT_PATH = "v1/traces" -DEFAULT_TIMEOUT = 10 # in seconds -_MAX_RETRYS = 6 +DEFAULT_TIMEOUT: Final[int] = 10 # in seconds +DEFAULT_TRACES_EXPORT_PATH: Final[Literal["v1/traces"]] = "v1/traces" class OTLPSpanExporter(SpanExporter): + @overload + def __init__( + self, + endpoint: str | None = ..., 
+ certificate_file: str | None = ..., + client_key_file: str | None = ..., + client_certificate_file: str | None = ..., + headers: dict[str, str] | None = ..., + timeout: float | None = ..., + compression: Compression | None = ..., + session: requests.Session | None = ..., + *, + meter_provider: MeterProvider | None = ..., + _transport: None = ..., + ) -> None: ... + + @overload + def __init__( + self, + endpoint: str | None = ..., + certificate_file: str | None = ..., + client_key_file: str | None = ..., + client_certificate_file: str | None = ..., + headers: dict[str, str] | None = ..., + timeout: float | None = ..., + compression: Compression | None = ..., + session: None = ..., + *, + meter_provider: MeterProvider | None = ..., + _transport: BaseHTTPTransport, + ) -> None: ... + def __init__( self, endpoint: Optional[str] = None, @@ -91,13 +132,11 @@ def __init__( session: Optional[requests.Session] = None, *, meter_provider: Optional[MeterProvider] = None, + _transport: Optional[BaseHTTPTransport] = None, ): - self._shutdown_in_progress = threading.Event() - self._endpoint = endpoint or environ.get( - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, - _append_trace_path( - environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT) - ), + self._shutdown_event = threading.Event() + self._endpoint = endpoint or _endpoint_from_env( + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, DEFAULT_TRACES_EXPORT_PATH ) self._certificate_file = certificate_file or environ.get( OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE, @@ -129,22 +168,35 @@ def __init__( environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), ) ) - self._compression = compression or _compression_from_env() - self._session = ( + self._compression = compression or _compression_from_env( + OTEL_EXPORTER_OTLP_TRACES_COMPRESSION + ) + + transport = _transport or _transport_from_args( session - or _load_session_from_envvar( + or _session_from_env( _OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER - ) - or requests.Session() + ), + 
self._certificate_file, + self._client_cert, ) - self._session.headers.update(self._headers) - self._session.headers.update(_OTLP_HTTP_HEADERS) - # let users override our defaults - self._session.headers.update(self._headers) + + client_headers: dict[str, str] = { + **_OTLP_HTTP_HEADERS, + **self._headers, + } if self._compression is not Compression.NoCompression: - self._session.headers.update( - {"Content-Encoding": self._compression.value} - ) + client_headers[_CONTENT_ENCODING_HEADER] = self._compression.value + + self._client = OTLPHTTPClient( + transport=transport, + endpoint=self._endpoint, + timeout=self._timeout, + compression=self._compression, + shutdown_event=self._shutdown_event, + headers=client_headers, + kind="spans", + ) self._shutdown = False self._metrics = ExporterMetrics( @@ -154,43 +206,6 @@ def __init__( meter_provider, ) - def _export( - self, serialized_data: bytes, timeout_sec: Optional[float] = None - ): - data = serialized_data - if self._compression == Compression.Gzip: - gzip_data = BytesIO() - with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: - gzip_stream.write(serialized_data) - data = gzip_data.getvalue() - elif self._compression == Compression.Deflate: - data = zlib.compress(serialized_data) - - if timeout_sec is None: - timeout_sec = self._timeout - - # By default, keep-alive is enabled in Session's request - # headers. Backends may choose to close the connection - # while a post happens which causes an unhandled - # exception. 
This try/except will retry the post on such exceptions - try: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - except ConnectionError: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - return resp - def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: if self._shutdown: _logger.warning("Exporter already shutdown, ignoring batch") @@ -198,66 +213,15 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: with self._metrics.export_operation(len(spans)) as result: serialized_data = encode_spans(spans).SerializePartialToString() - deadline_sec = time() + self._timeout - for retry_num in range(_MAX_RETRYS): - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. - backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - export_error: Optional[Exception] = None - try: - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - return SpanExportResult.SUCCESS - except requests.exceptions.RequestException as error: - reason = error - export_error = error - retryable = isinstance(error, ConnectionError) - status_code = None - else: - reason = resp.reason - retryable = _is_retryable(resp) - status_code = resp.status_code - - if not retryable: - _logger.error( - "Failed to export span batch code: %s, reason: %s", - status_code, - reason, - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return SpanExportResult.FAILURE - - if ( - retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - or self._shutdown - ): - _logger.error( - "Failed to export span batch due to timeout, " - "max retries or shutdown." 
- ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return SpanExportResult.FAILURE - _logger.warning( - "Transient error %s encountered while exporting span batch, retrying in %.2fs.", - reason, - backoff_seconds, - ) - shutdown = self._shutdown_in_progress.wait(backoff_seconds) - if shutdown: - _logger.warning("Shutdown in progress, aborting retry.") - break + export_result = self._client.export(serialized_data) + if export_result.success: + return SpanExportResult.SUCCESS + + result.error = export_result.error + if export_result.status_code is not None: + result.error_attrs = { + HTTP_RESPONSE_STATUS_CODE: export_result.status_code + } return SpanExportResult.FAILURE def shutdown(self): @@ -265,27 +229,9 @@ def shutdown(self): _logger.warning("Exporter already shutdown, ignoring call") return self._shutdown = True - self._shutdown_in_progress.set() - self._session.close() + self._shutdown_event.set() + self._client.close() def force_flush(self, timeout_millis: int = 30000) -> bool: """Nothing is buffered in this exporter, so this method does nothing.""" return True - - -def _compression_from_env() -> Compression: - compression = ( - environ.get( - OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, - environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none"), - ) - .lower() - .strip() - ) - return Compression(compression) - - -def _append_trace_path(endpoint: str) -> str: - if endpoint.endswith("/"): - return endpoint + DEFAULT_TRACES_EXPORT_PATH - return endpoint + f"/{DEFAULT_TRACES_EXPORT_PATH}" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 5f7ae2afa9..5484e751b9 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ 
b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -19,7 +19,7 @@ from os import environ from typing import List from unittest import TestCase -from unittest.mock import ANY, MagicMock, Mock, patch +from unittest.mock import ANY, MagicMock, patch import requests from requests import Session @@ -30,9 +30,21 @@ encode_metrics, ) from opentelemetry.exporter.otlp.proto.http import Compression -from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( +from opentelemetry.exporter.otlp.proto.http._common import ( + _DEFAULT_ENDPOINT, DEFAULT_COMPRESSION, - DEFAULT_ENDPOINT, +) +from opentelemetry.exporter.otlp.proto.http._otlp_client import ( + ExportResult, + OTLPHTTPClient, +) +from opentelemetry.exporter.otlp.proto.http._transport._requests import ( + RequestsHTTPTransport, +) +from opentelemetry.exporter.otlp.proto.http._transport._urllib3 import ( + Urllib3HTTPTransport, +) +from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( DEFAULT_METRICS_EXPORT_PATH, DEFAULT_TIMEOUT, OTLPMetricExporter, @@ -143,7 +155,8 @@ def test_constructor_default(self): exporter = OTLPMetricExporter() self.assertEqual( - exporter._endpoint, DEFAULT_ENDPOINT + DEFAULT_METRICS_EXPORT_PATH + exporter._endpoint, + f"{_DEFAULT_ENDPOINT}/{DEFAULT_METRICS_EXPORT_PATH}", ) self.assertEqual(exporter._certificate_file, True) self.assertEqual(exporter._client_certificate_file, None) @@ -151,16 +164,18 @@ def test_constructor_default(self): self.assertEqual(exporter._timeout, DEFAULT_TIMEOUT) self.assertIs(exporter._compression, DEFAULT_COMPRESSION) self.assertEqual(exporter._headers, {}) - self.assertIsInstance(exporter._session, Session) - self.assertIn("User-Agent", exporter._session.headers) + self.assertIn("User-Agent", exporter._client._headers) self.assertEqual( - exporter._session.headers.get("Content-Type"), + exporter._client._headers.get("Content-Type"), "application/x-protobuf", ) self.assertEqual( - 
exporter._session.headers.get("User-Agent"), + exporter._client._headers.get("User-Agent"), "OTel-OTLP-Exporter-Python/" + __version__, ) + self.assertIsInstance( + exporter._client._transport, Urllib3HTTPTransport + ) @patch.dict( "os.environ", @@ -211,13 +226,17 @@ def f(): "user-agent": "metrics-user-agent", }, ) - self.assertIsInstance(exporter._session, Session) + self.assertIsInstance( + exporter._client._transport, RequestsHTTPTransport + ) + self.assertIs(exporter._client._transport._session, credential) + self.assertIsInstance(exporter._client._transport._session, Session) self.assertEqual( - exporter._session.headers.get("User-Agent"), + exporter._client._headers.get("user-agent"), "metrics-user-agent", ) self.assertEqual( - exporter._session.headers.get("Content-Type"), + exporter._client._headers.get("Content-Type"), "application/x-protobuf", ) @@ -235,6 +254,7 @@ def f(): }, ) def test_exporter_constructor_take_priority(self): + session = Session() exporter = OTLPMetricExporter( endpoint="example.com/1234", certificate_file="path/to/service.crt", @@ -243,7 +263,7 @@ def test_exporter_constructor_take_priority(self): headers={"testHeader1": "value1", "testHeader2": "value2"}, timeout=20, compression=Compression.NoCompression, - session=Session(), + session=session, ) self.assertEqual(exporter._endpoint, "example.com/1234") @@ -258,7 +278,11 @@ def test_exporter_constructor_take_priority(self): exporter._headers, {"testHeader1": "value1", "testHeader2": "value2"}, ) - self.assertIsInstance(exporter._session, Session) + self.assertIsInstance( + exporter._client._transport, RequestsHTTPTransport + ) + self.assertIs(exporter._client._transport._session, session) + self.assertIsInstance(exporter._client._transport._session, Session) @patch.dict( "os.environ", @@ -334,13 +358,13 @@ def test_headers_parse_from_env(self): ), ) - @patch.object(Session, "post") + @patch.object(Session, "request") def test_success(self, mock_post): resp = Response() 
resp.status_code = 200 mock_post.return_value = resp - exporter = OTLPMetricExporter() + exporter = OTLPMetricExporter(session=Session()) exporter.set_meter_provider(self.meter_provider) self.assertEqual( @@ -372,13 +396,13 @@ def test_success(self, mock_post): metrics[2].data.data_points[0].attributes ) - @patch.object(Session, "post") + @patch.object(Session, "request") def test_failure(self, mock_post): resp = Response() resp.status_code = 401 mock_post.return_value = resp - exporter = OTLPMetricExporter() + exporter = OTLPMetricExporter(session=Session()) exporter.set_meter_provider(self.meter_provider) self.assertEqual( @@ -433,13 +457,13 @@ def test_failure(self, mock_post): 401, ) - @patch.object(Session, "post") - def test_serialization(self, mock_post): + @patch.object(Session, "request") + def test_serialization(self, mock_request): resp = Response() resp.status_code = 200 - mock_post.return_value = resp + mock_request.return_value = resp - exporter = OTLPMetricExporter() + exporter = OTLPMetricExporter(session=Session()) self.assertEqual( exporter.export(self.metrics["sum_int"]), @@ -447,12 +471,13 @@ def test_serialization(self, mock_post): ) serialized_data = encode_metrics(self.metrics["sum_int"]) - mock_post.assert_called_once_with( + mock_request.assert_called_once_with( + method="POST", url=exporter._endpoint, + headers=ANY, data=serialized_data.SerializeToString(), - verify=exporter._certificate_file, timeout=ANY, # Timeout is a float based on real time, can't put an exact value here. 
- cert=exporter._client_cert, + allow_redirects=False, ) def test_split_metrics_data_many_data_points(self): @@ -952,7 +977,7 @@ def _create_metrics_data_multiple_data_points( ] ) - @patch.object(Session, "post") + @patch.object(Session, "request") def test_export_max_export_batch_size_single_batch_integration( self, mock_post ): @@ -964,7 +989,9 @@ def test_export_max_export_batch_size_single_batch_integration( metrics_data = ( TestOTLPMetricExporter._create_metrics_data_multiple_data_points(2) ) - exporter = OTLPMetricExporter(max_export_batch_size=3) + exporter = OTLPMetricExporter( + session=Session(), max_export_batch_size=3 + ) result = exporter.export(metrics_data) self.assertEqual(result, MetricExportResult.SUCCESS) @@ -974,9 +1001,6 @@ def test_export_max_export_batch_size_single_batch_integration( call_args = mock_post.call_args self.assertEqual(call_args.kwargs["url"], exporter._endpoint) self.assertIsInstance(call_args.kwargs["data"], bytes) - self.assertEqual( - call_args.kwargs["verify"], exporter._certificate_file - ) batch_data = call_args.kwargs["data"] request = ExportMetricsServiceRequest() request.ParseFromString(batch_data) @@ -986,7 +1010,7 @@ def test_export_max_export_batch_size_single_batch_integration( metric_names = {metric.name for metric in metrics} self.assertEqual(metric_names, {"sum_int_0", "sum_int_1"}) - @patch.object(Session, "post") + @patch.object(Session, "request") def test_export_max_export_batch_size_multiple_batches_integration( self, mock_post ): @@ -998,7 +1022,9 @@ def test_export_max_export_batch_size_multiple_batches_integration( metrics_data = ( TestOTLPMetricExporter._create_metrics_data_multiple_data_points(3) ) - exporter = OTLPMetricExporter(max_export_batch_size=2) + exporter = OTLPMetricExporter( + session=Session(), max_export_batch_size=2 + ) result = exporter.export(metrics_data) self.assertEqual(result, MetricExportResult.SUCCESS) @@ -1007,9 +1033,6 @@ def 
test_export_max_export_batch_size_multiple_batches_integration( for call_args in mock_post.call_args_list: self.assertEqual(call_args.kwargs["url"], exporter._endpoint) self.assertIsInstance(call_args.kwargs["data"], bytes) - self.assertEqual( - call_args.kwargs["verify"], exporter._certificate_file - ) self.assertEqual(len(mock_post.call_args_list), 2) # First batch should contain sum_int_0 and sum_int_1 @@ -1035,7 +1058,7 @@ def test_export_max_export_batch_size_multiple_batches_integration( self.assertEqual(len(second_metrics), 1) self.assertEqual(second_metrics[0].name, "sum_int_2") - @patch.object(Session, "post") + @patch.object(Session, "request") def test_export_max_export_batch_size_retry_scenarios_integration( self, mock_post ): @@ -1051,7 +1074,9 @@ def test_export_max_export_batch_size_retry_scenarios_integration( metrics_data = ( TestOTLPMetricExporter._create_metrics_data_multiple_data_points(3) ) - exporter = OTLPMetricExporter(max_export_batch_size=2) + exporter = OTLPMetricExporter( + session=Session(), max_export_batch_size=2 + ) # Export should fail when second batch fails result = exporter.export(metrics_data) @@ -1070,7 +1095,7 @@ def test_export_max_export_batch_size_retry_scenarios_integration( first_metric_names = {metric.name for metric in first_metrics} self.assertEqual(first_metric_names, {"sum_int_0", "sum_int_1"}) - @patch.object(Session, "post") + @patch.object(Session, "request") def test_export_max_export_batch_size_retryable_failure_integration( self, mock_post ): @@ -1089,7 +1114,9 @@ def test_export_max_export_batch_size_retryable_failure_integration( metrics_data = ( TestOTLPMetricExporter._create_metrics_data_multiple_data_points(3) ) - exporter = OTLPMetricExporter(max_export_batch_size=2, timeout=2.0) + exporter = OTLPMetricExporter( + session=Session(), max_export_batch_size=2, timeout=2.0 + ) # Export should eventually succeed after retry result = exporter.export(metrics_data) @@ -1265,8 +1292,12 @@ def 
test_exponential_explicit_bucket_histogram(self): ExplicitBucketHistogramAggregation, ) - @patch.object(OTLPMetricExporter, "_export", return_value=Mock(ok=True)) - def test_2xx_status_code(self, mock_otlp_metric_exporter): + @patch.object( + OTLPHTTPClient, + "export", + return_value=ExportResult(True, 200, "OK", None), + ) + def test_2xx_status_code(self, mock_client_export): """ Test that any HTTP 2XX code returns a successful result """ @@ -1291,10 +1322,10 @@ def test_preferred_aggregation_override(self): exporter._preferred_aggregation[Histogram], histogram_aggregation ) - @patch.object(Session, "post") + @patch.object(Session, "request") def test_retry_timeout(self, mock_post): exporter = OTLPMetricExporter( - timeout=1.5, meter_provider=self.meter_provider + session=Session(), timeout=1.5, meter_provider=self.meter_provider ) resp = Response() @@ -1365,9 +1396,9 @@ def test_retry_timeout(self, mock_post): 503, ) - @patch.object(Session, "post") + @patch.object(Session, "request") def test_export_no_collector_available_retryable(self, mock_post): - exporter = OTLPMetricExporter(timeout=1.5) + exporter = OTLPMetricExporter(session=Session(), timeout=1.5) msg = "Server not available." 
mock_post.side_effect = ConnectionError(msg) with self.assertLogs(level=WARNING) as warning: @@ -1383,9 +1414,9 @@ def test_export_no_collector_available_retryable(self, mock_post): warning.records[0].message, ) - @patch.object(Session, "post") + @patch.object(Session, "request") def test_export_no_collector_available(self, mock_post): - exporter = OTLPMetricExporter(timeout=1.5) + exporter = OTLPMetricExporter(session=Session(), timeout=1.5) mock_post.side_effect = requests.exceptions.RequestException() with self.assertLogs(level=WARNING) as warning: @@ -1399,7 +1430,7 @@ def test_export_no_collector_available(self, mock_post): warning.records[0].message, ) - @patch.object(Session, "post") + @patch.object(Session, "request") def test_timeout_set_correctly(self, mock_post): resp = Response() resp.status_code = 200 @@ -1410,12 +1441,12 @@ def export_side_effect(*args, **kwargs): return resp mock_post.side_effect = export_side_effect - exporter = OTLPMetricExporter(timeout=0.4) + exporter = OTLPMetricExporter(session=Session(), timeout=0.4) exporter.export(self.metrics["sum_int"]) - @patch.object(Session, "post") + @patch.object(Session, "request") def test_shutdown_interrupts_retry_backoff(self, mock_post): - exporter = OTLPMetricExporter(timeout=1.5) + exporter = OTLPMetricExporter(session=Session(), timeout=1.5) resp = Response() resp.status_code = 503 diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_otlp_http_client.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_otlp_http_client.py new file mode 100644 index 0000000000..dffc4a03f3 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_otlp_http_client.py @@ -0,0 +1,280 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gzip +import threading +import unittest +import zlib +from dataclasses import dataclass +from unittest.mock import Mock, patch + +from opentelemetry.exporter.otlp.proto.http import Compression +from opentelemetry.exporter.otlp.proto.http._otlp_client import ( + OTLPHTTPClient, +) +from opentelemetry.exporter.otlp.proto.http._transport import ( + BaseHTTPResult, + BaseHTTPTransport, +) + + +@dataclass(frozen=True, slots=True) +class _TestHTTPResult(BaseHTTPResult): + connection_error: bool = False + + def is_connection_error(self) -> bool: + return self.connection_error + + +class _TestHTTPTransport(BaseHTTPTransport): + def __init__(self, *results): + self.results = list(results) + self.requests = [] + self.closed = False + + def request( + self, + method, + url, + *, + headers=None, + timeout=None, + data=None, + ): + self.requests.append( + { + "method": method, + "url": url, + "headers": headers, + "timeout": timeout, + "data": data, + } + ) + result = self.results.pop(0) + if isinstance(result, Exception): + raise result + return result + + def close(self): + self.closed = True + + +class TestOTLPHTTPClient(unittest.TestCase): + @staticmethod + def _client( + transport, + *, + timeout=5.0, + compression=Compression.NoCompression, + shutdown_event=None, + jitter=0.0, + ): + return OTLPHTTPClient( + transport=transport, + endpoint="http://example.test/v1/traces", + timeout=timeout, + compression=compression, + shutdown_event=shutdown_event or threading.Event(), + headers={"content-type": "application/x-protobuf"}, + kind="spans", + 
jitter=jitter, + ) + + def test_export_returns_success_for_success_status_codes(self): + cases = ( + (200, "OK"), + (204, "No Content"), + (302, "Found"), + ) + + for status_code, reason in cases: + with self.subTest(status_code=status_code): + transport = _TestHTTPTransport( + _TestHTTPResult(status_code=status_code, reason=reason) + ) + client = self._client(transport) + + result = client.export(b"payload") + + self.assertTrue(result.success) + self.assertEqual(result.status_code, status_code) + self.assertEqual(result.reason, reason) + self.assertIsNone(result.error) + + @patch( + "opentelemetry.exporter.otlp.proto.http._otlp_client.time.time", + side_effect=(100.0, 100.0, 100.0), + ) + def test_export_sends_request_arguments(self, mock_time): + transport = _TestHTTPTransport( + _TestHTTPResult(status_code=200, reason="OK") + ) + client = self._client(transport, timeout=3.0) + + client.export(b"payload") + + self.assertEqual(len(transport.requests), 1) + self.assertEqual( + transport.requests[0], + { + "method": "POST", + "url": "http://example.test/v1/traces", + "headers": {"content-type": "application/x-protobuf"}, + "timeout": 3.0, + "data": b"payload", + }, + ) + self.assertEqual(mock_time.call_count, 3) + + def test_export_compresses_payload(self): + cases = ( + ( + Compression.NoCompression, + lambda data: data, + ), + ( + Compression.Gzip, + gzip.decompress, + ), + ( + Compression.Deflate, + zlib.decompress, + ), + ) + + for compression, decompress in cases: + with self.subTest(compression=compression): + transport = _TestHTTPTransport( + _TestHTTPResult(status_code=200, reason="OK") + ) + client = self._client(transport, compression=compression) + + result = client.export(b"payload") + + self.assertTrue(result.success) + self.assertEqual( + decompress(transport.requests[0]["data"]), b"payload" + ) + + def test_export_retries_retryable_status_codes(self): + cases = ( + (408, "Request Timeout"), + (500, "Internal Server Error"), + (503, "Service 
Unavailable"), + ) + + for status_code, reason in cases: + with self.subTest(status_code=status_code): + shutdown_event = Mock(spec=threading.Event) + shutdown_event.is_set.return_value = False + shutdown_event.wait.return_value = False + transport = _TestHTTPTransport( + _TestHTTPResult( + status_code=status_code, + reason=reason, + ), + _TestHTTPResult(status_code=200, reason="OK"), + ) + client = self._client( + transport, + shutdown_event=shutdown_event, + ) + + result = client.export(b"payload") + + self.assertTrue(result.success) + self.assertEqual(len(transport.requests), 2) + shutdown_event.wait.assert_called_once_with(1.0) + + def test_export_retries_connection_errors_immediately(self): + error = RuntimeError("connection failed") + transport = _TestHTTPTransport( + _TestHTTPResult(error=error, connection_error=True), + _TestHTTPResult(status_code=200, reason="OK"), + ) + client = self._client(transport) + + result = client.export(b"payload") + + self.assertTrue(result.success) + self.assertEqual(len(transport.requests), 2) + self.assertAlmostEqual(transport.requests[0]["timeout"], 5.0, 2) + self.assertLessEqual( + transport.requests[1]["timeout"], + transport.requests[0]["timeout"], + ) + self.assertGreater(transport.requests[1]["timeout"], 0.0) + + def test_export_returns_failure_for_non_retryable_errors(self): + exception = RuntimeError("request failed") + cases = ( + ( + _TestHTTPResult(status_code=400, reason="Bad Request"), + 400, + "Bad Request", + None, + ), + ( + _TestHTTPResult(error=exception), + None, + None, + exception, + ), + ( + exception, + None, + None, + exception, + ), + ) + + for ( + response, + expected_status_code, + expected_reason, + expected_error, + ) in cases: + with self.subTest(response=type(response).__name__): + transport = _TestHTTPTransport(response) + client = self._client(transport) + + result = client.export(b"payload") + + self.assertFalse(result.success) + self.assertEqual(result.status_code, expected_status_code) + 
self.assertEqual(result.reason, expected_reason) + self.assertIs(result.error, expected_error) + + def test_export_returns_failure_when_shutdown_blocks_retry(self): + shutdown_event = Mock(spec=threading.Event) + shutdown_event.is_set.return_value = True + transport = _TestHTTPTransport( + _TestHTTPResult(status_code=503, reason="Service Unavailable") + ) + client = self._client(transport, shutdown_event=shutdown_event) + + result = client.export(b"payload") + + self.assertFalse(result.success) + self.assertEqual(result.status_code, 503) + self.assertEqual(result.reason, "Service Unavailable") + shutdown_event.wait.assert_not_called() + + def test_close_closes_transport(self): + transport = _TestHTTPTransport() + client = self._client(transport) + + client.close() + + self.assertTrue(transport.closed) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_otlp_http_transport.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_otlp_http_transport.py new file mode 100644 index 0000000000..27c4e53471 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_otlp_http_transport.py @@ -0,0 +1,293 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest +from unittest.mock import Mock, patch + +import requests +import urllib3 +from requests.models import Response + +from opentelemetry.exporter.otlp.proto.http._common import ( + _transport_from_args, +) +from opentelemetry.exporter.otlp.proto.http._transport._requests import ( + RequestsHTTPResult, + RequestsHTTPTransport, +) +from opentelemetry.exporter.otlp.proto.http._transport._urllib3 import ( + Urllib3HTTPResult, + Urllib3HTTPTransport, +) + + +# pylint: disable=protected-access +class TestRequestsHTTPTransport(unittest.TestCase): + def test_constructor_configures_session(self): + session = requests.Session() + + transport = RequestsHTTPTransport( + verify="certificate.pem", + cert=("client-cert.pem", "client-key.pem"), + session=session, + ) + + self.assertIs(transport._session, session) + self.assertEqual(session.verify, "certificate.pem") + self.assertEqual(session.cert, ("client-cert.pem", "client-key.pem")) + + @patch.object(requests.Session, "request") + def test_request_returns_response_result(self, mock_request): + response = Response() + response.status_code = 202 + response.reason = "Accepted" + mock_request.return_value = response + transport = RequestsHTTPTransport(session=requests.Session()) + + result = transport.request( + "POST", + "http://example.test/v1/traces", + headers={"content-type": "application/x-protobuf"}, + data=b"payload", + timeout=1.5, + ) + + self.assertEqual(result.status_code, 202) + self.assertEqual(result.reason, "Accepted") + self.assertIsNone(result.error) + mock_request.assert_called_once_with( + method="POST", + url="http://example.test/v1/traces", + headers={"content-type": "application/x-protobuf"}, + data=b"payload", + timeout=1.5, + allow_redirects=False, + ) + + def test_request_returns_error_result(self): + session = Mock(spec=requests.Session) + error = requests.exceptions.RequestException("request failed") + session.request.side_effect = error + transport = 
RequestsHTTPTransport(session=session) + + result = transport.request("POST", "http://example.test/v1/traces") + + self.assertIsNone(result.status_code) + self.assertIsNone(result.reason) + self.assertIs(result.error, error) + + def test_result_identifies_connection_errors(self): + cases = ( + (requests.exceptions.ConnectionError("connection"), True), + (requests.exceptions.ConnectTimeout("connect timeout"), True), + (requests.exceptions.ReadTimeout("read timeout"), True), + (requests.exceptions.Timeout("timeout"), True), + (requests.exceptions.SSLError("ssl"), True), + (requests.exceptions.ProxyError("proxy"), True), + (requests.exceptions.RequestException("request"), False), + (None, False), + ) + + for error, expected in cases: + with self.subTest( + error=type(error).__name__ if error else None, + expected=expected, + ): + self.assertEqual( + RequestsHTTPResult(error=error).is_connection_error(), + expected, + ) + + def test_close_closes_session(self): + session = Mock(spec=requests.Session) + transport = RequestsHTTPTransport(session=session) + + self.assertIs(transport._session, session) + transport.close() + + session.close.assert_called_once_with() + + +class TestUrllib3HTTPTransport(unittest.TestCase): + @patch("urllib3.PoolManager") + def test_constructor_configures_pool_manager(self, mock_pool_manager): + cases = ( + ( + {"verify": True, "cert": None}, + {"cert_reqs": "CERT_REQUIRED"}, + ), + ( + {"verify": False, "cert": None}, + {"cert_reqs": "CERT_NONE"}, + ), + ( + {"verify": "certificate.pem", "cert": None}, + { + "cert_reqs": "CERT_REQUIRED", + "ca_certs": "certificate.pem", + }, + ), + ( + {"verify": True, "cert": "client-cert.pem"}, + { + "cert_reqs": "CERT_REQUIRED", + "cert_file": "client-cert.pem", + }, + ), + ( + { + "verify": True, + "cert": ("client-cert.pem", "client-key.pem"), + }, + { + "cert_reqs": "CERT_REQUIRED", + "cert_file": "client-cert.pem", + "key_file": "client-key.pem", + }, + ), + ) + + for kwargs, expected_pool_kwargs in 
cases: + with self.subTest(**kwargs): + mock_pool_manager.reset_mock() + + Urllib3HTTPTransport(**kwargs) + + mock_pool_manager.assert_called_once() + pool_kwargs = mock_pool_manager.call_args.kwargs + retries = pool_kwargs.pop("retries") + self.assertIsInstance(retries, urllib3.Retry) + self.assertEqual(retries.total, 0) + self.assertFalse(retries.redirect) + self.assertEqual(pool_kwargs, expected_pool_kwargs) + + @patch("urllib3.PoolManager") + def test_request_returns_response_result(self, mock_pool_manager): + pool = mock_pool_manager.return_value + pool.request.return_value = Mock(status=204, reason="No Content") + transport = Urllib3HTTPTransport() + + result = transport.request( + "POST", + "http://example.test/v1/metrics", + headers={"content-type": "application/x-protobuf"}, + data=b"payload", + timeout=2.5, + ) + + self.assertEqual(result.status_code, 204) + self.assertEqual(result.reason, "No Content") + self.assertIsNone(result.error) + pool.request.assert_called_once() + self.assertEqual(pool.request.call_args.kwargs["method"], "POST") + self.assertEqual( + pool.request.call_args.kwargs["url"], + "http://example.test/v1/metrics", + ) + self.assertEqual( + pool.request.call_args.kwargs["headers"], + {"content-type": "application/x-protobuf"}, + ) + self.assertEqual(pool.request.call_args.kwargs["body"], b"payload") + self.assertTrue(pool.request.call_args.kwargs["preload_content"]) + timeout = pool.request.call_args.kwargs["timeout"] + self.assertIsInstance(timeout, urllib3.Timeout) + self.assertEqual(timeout.total, 2.5) + + @patch("urllib3.PoolManager") + def test_request_without_timeout_uses_no_timeout(self, mock_pool_manager): + pool = mock_pool_manager.return_value + pool.request.return_value = Mock(status=200, reason="OK") + transport = Urllib3HTTPTransport() + + result = transport.request("POST", "http://example.test/v1/logs") + + self.assertEqual(result.status_code, 200) + self.assertIsNone(pool.request.call_args.kwargs["timeout"]) + + 
@patch("urllib3.PoolManager") + def test_request_returns_error_result(self, mock_pool_manager): + error = urllib3.exceptions.HTTPError("request failed") + pool = mock_pool_manager.return_value + pool.request.side_effect = error + transport = Urllib3HTTPTransport() + + result = transport.request("POST", "http://example.test/v1/logs") + + self.assertIsNone(result.status_code) + self.assertIsNone(result.reason) + self.assertIs(result.error, error) + + def test_result_identifies_connection_errors(self): + cases = ( + (urllib3.exceptions.ConnectionError("connection"), True), + ( + urllib3.exceptions.NewConnectionError(None, "new connection"), + True, + ), + ( + urllib3.exceptions.ConnectTimeoutError( + None, "http://example.test", "connect timeout" + ), + True, + ), + ( + urllib3.exceptions.MaxRetryError( + None, "http://example.test", "max retry" + ), + True, + ), + (urllib3.exceptions.ProtocolError("protocol"), True), + (urllib3.exceptions.HTTPError("request"), False), + (None, False), + ) + + for error, expected in cases: + with self.subTest( + error=type(error).__name__ if error else None, + expected=expected, + ): + self.assertEqual( + Urllib3HTTPResult(error=error).is_connection_error(), + expected, + ) + + @patch("urllib3.PoolManager") + def test_close_clears_pool(self, mock_pool_manager): + pool = mock_pool_manager.return_value + transport = Urllib3HTTPTransport() + + self.assertIs(transport._pool, pool) + transport.close() + + pool.clear.assert_called_once_with() + + +class TestTransportFromArgs(unittest.TestCase): + def test_returns_requests_transport_for_session(self): + session = requests.Session() + + transport = _transport_from_args(session, True, None) + + self.assertIsInstance(transport, RequestsHTTPTransport) + self.assertIs(transport._session, session) + + @patch("urllib3.PoolManager") + def test_returns_urllib3_transport_without_session( + self, mock_pool_manager + ): + transport = _transport_from_args(None, True, None) + + 
self.assertIsInstance(transport, Urllib3HTTPTransport) + mock_pool_manager.assert_called_once() diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index 7981b0bc82..6cfe39dd5a 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -19,7 +19,7 @@ import unittest from logging import WARNING from typing import List -from unittest.mock import MagicMock, Mock, patch +from unittest.mock import MagicMock, patch import requests from google.protobuf.json_format import MessageToDict @@ -29,13 +29,25 @@ from opentelemetry._logs import LogRecord, SeverityNumber from opentelemetry.exporter.otlp.proto.http import Compression -from opentelemetry.exporter.otlp.proto.http._log_exporter import ( +from opentelemetry.exporter.otlp.proto.http._common import ( + _DEFAULT_ENDPOINT, DEFAULT_COMPRESSION, - DEFAULT_ENDPOINT, +) +from opentelemetry.exporter.otlp.proto.http._log_exporter import ( DEFAULT_LOGS_EXPORT_PATH, DEFAULT_TIMEOUT, OTLPLogExporter, ) +from opentelemetry.exporter.otlp.proto.http._otlp_client import ( + ExportResult, + OTLPHTTPClient, +) +from opentelemetry.exporter.otlp.proto.http._transport._requests import ( + RequestsHTTPTransport, +) +from opentelemetry.exporter.otlp.proto.http._transport._urllib3 import ( + Urllib3HTTPTransport, +) from opentelemetry.exporter.otlp.proto.http.version import __version__ from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( ExportLogsServiceRequest, @@ -90,7 +102,8 @@ def test_constructor_default(self): exporter = OTLPLogExporter() self.assertEqual( - exporter._endpoint, DEFAULT_ENDPOINT + DEFAULT_LOGS_EXPORT_PATH + exporter._endpoint, + f"{_DEFAULT_ENDPOINT}/{DEFAULT_LOGS_EXPORT_PATH}", ) self.assertEqual(exporter._certificate_file, True) 
self.assertEqual(exporter._client_certificate_file, None) @@ -98,16 +111,18 @@ def test_constructor_default(self): self.assertEqual(exporter._timeout, DEFAULT_TIMEOUT) self.assertIs(exporter._compression, DEFAULT_COMPRESSION) self.assertEqual(exporter._headers, {}) - self.assertIsInstance(exporter._session, requests.Session) - self.assertIn("User-Agent", exporter._session.headers) + self.assertIn("User-Agent", exporter._client._headers) self.assertEqual( - exporter._session.headers.get("Content-Type"), + exporter._client._headers.get("Content-Type"), "application/x-protobuf", ) self.assertEqual( - exporter._session.headers.get("User-Agent"), + exporter._client._headers.get("User-Agent"), "OTel-OTLP-Exporter-Python/" + __version__, ) + self.assertIsInstance( + exporter._client._transport, Urllib3HTTPTransport + ) @patch.dict( "os.environ", @@ -158,15 +173,12 @@ def f(): "user-agent": "LogsUserAgent", }, ) - self.assertIs(exporter._session, credential) - self.assertIsInstance(exporter._session, requests.Session) - self.assertEqual( - exporter._session.headers.get("User-Agent"), - "LogsUserAgent", + self.assertIsInstance( + exporter._client._transport, RequestsHTTPTransport ) - self.assertEqual( - exporter._session.headers.get("Content-Type"), - "application/x-protobuf", + self.assertIs(exporter._client._transport._session, credential) + self.assertIsInstance( + exporter._client._transport._session, requests.Session ) @patch.dict( @@ -211,7 +223,7 @@ def test_exception_raised_when_entrypoint_does_not_exist(self): }, ) def test_exporter_constructor_take_priority(self): - sess = MagicMock() + session = Session() exporter = OTLPLogExporter( endpoint="endpoint.local:69/logs", certificate_file="/hello.crt", @@ -220,7 +232,7 @@ def test_exporter_constructor_take_priority(self): headers={"testHeader1": "value1", "testHeader2": "value2"}, timeout=70, compression=Compression.NoCompression, - session=sess(), + session=session, ) self.assertEqual(exporter._endpoint, 
"endpoint.local:69/logs") @@ -233,7 +245,10 @@ def test_exporter_constructor_take_priority(self): exporter._headers, {"testHeader1": "value1", "testHeader2": "value2"}, ) - self.assertTrue(sess.called) + self.assertIsInstance( + exporter._client._transport, RequestsHTTPTransport + ) + self.assertIs(exporter._client._transport._session, session) @patch.dict( "os.environ", @@ -268,14 +283,17 @@ def test_exporter_env(self): "user-agent": "Overridden", }, ) - self.assertIsInstance(exporter._session, requests.Session) @staticmethod def export_log_and_deserialize(log): - with patch("requests.Session.post") as mock_post: - exporter = OTLPLogExporter() + resp = Response() + resp.status_code = 200 + with patch( + "requests.Session.request", return_value=resp + ) as mock_request: + exporter = OTLPLogExporter(session=Session()) exporter.export([log]) - request_body = mock_post.call_args[1]["data"] + request_body = mock_request.call_args[1]["data"] request = ExportLogsServiceRequest() request.ParseFromString(request_body) request_dict = MessageToDict(request) @@ -456,8 +474,12 @@ def _get_sdk_log_data() -> List[ReadWriteLogRecord]: return [log1, log2, log3, log4] - @patch.object(OTLPLogExporter, "_export", return_value=Mock(ok=True)) - def test_2xx_status_code(self, mock_otlp_metric_exporter): + @patch.object( + OTLPHTTPClient, + "export", + return_value=ExportResult(True, 200, "OK", None), + ) + def test_2xx_status_code(self, mock_client_export): """ Test that any HTTP 2XX code returns a successful result """ @@ -467,16 +489,16 @@ def test_2xx_status_code(self, mock_otlp_metric_exporter): LogRecordExportResult.SUCCESS, ) - @patch.object(Session, "post") - def test_retry_timeout(self, mock_post): + @patch.object(Session, "request") + def test_retry_timeout(self, mock_request): exporter = OTLPLogExporter( - timeout=1.5, meter_provider=self.meter_provider + session=Session(), timeout=1.5, meter_provider=self.meter_provider ) resp = Response() resp.status_code = 503 resp.reason = 
"UNAVAILABLE" - mock_post.return_value = resp + mock_request.return_value = resp with self.assertLogs(level=WARNING) as warning: before = time.time() # Set timeout to 1.5 seconds @@ -486,7 +508,7 @@ def test_retry_timeout(self, mock_post): ) after = time.time() # First call at time 0, second at time 1, then an early return before the second backoff sleep b/c it would exceed timeout. - self.assertEqual(mock_post.call_count, 2) + self.assertEqual(mock_request.call_count, 2) # There's a +/-20% jitter on each backoff. self.assertTrue(0.75 < after - before < 1.25) self.assertIn( @@ -537,11 +559,11 @@ def test_retry_timeout(self, mock_post): 503, ) - @patch.object(Session, "post") - def test_export_no_collector_available_retryable(self, mock_post): - exporter = OTLPLogExporter(timeout=1.5) + @patch.object(Session, "request") + def test_export_no_collector_available_retryable(self, mock_request): + exporter = OTLPLogExporter(session=Session(), timeout=1.5) msg = "Server not available." - mock_post.side_effect = ConnectionError(msg) + mock_request.side_effect = ConnectionError(msg) with self.assertLogs(level=WARNING) as warning: self.assertEqual( exporter.export(self._get_sdk_log_data()), @@ -549,25 +571,25 @@ def test_export_no_collector_available_retryable(self, mock_post): ) # Check for greater 2 because the request is on each retry # done twice at the moment. 
- self.assertGreater(mock_post.call_count, 2) + self.assertGreater(mock_request.call_count, 2) self.assertIn( f"Transient error {msg} encountered while exporting logs batch, retrying in", warning.records[0].message, ) - @patch.object(Session, "post") - def test_export_no_collector_available(self, mock_post): + @patch.object(Session, "request") + def test_export_no_collector_available(self, mock_request): exporter = OTLPLogExporter( - timeout=1.5, meter_provider=self.meter_provider + session=Session(), timeout=1.5, meter_provider=self.meter_provider ) - mock_post.side_effect = requests.exceptions.RequestException() + mock_request.side_effect = requests.exceptions.RequestException() with self.assertLogs(level=WARNING) as warning: self.assertEqual( exporter.export(self._get_sdk_log_data()), LogRecordExportResult.FAILURE, ) - self.assertEqual(mock_post.call_count, 1) + self.assertEqual(mock_request.call_count, 1) self.assertIn( "Failed to export logs batch code", warning.records[0].message, @@ -616,8 +638,8 @@ def test_export_no_collector_available(self, mock_post): metrics[2].data.data_points[0].attributes, ) - @patch.object(Session, "post") - def test_timeout_set_correctly(self, mock_post): + @patch.object(Session, "request") + def test_timeout_set_correctly(self, mock_request): resp = Response() resp.status_code = 200 @@ -626,18 +648,18 @@ def export_side_effect(*args, **kwargs): self.assertAlmostEqual(0.4, kwargs["timeout"], 2) return resp - mock_post.side_effect = export_side_effect - exporter = OTLPLogExporter(timeout=0.4) + mock_request.side_effect = export_side_effect + exporter = OTLPLogExporter(session=Session(), timeout=0.4) exporter.export(self._get_sdk_log_data()) - @patch.object(Session, "post") - def test_shutdown_interrupts_retry_backoff(self, mock_post): - exporter = OTLPLogExporter(timeout=1.5) + @patch.object(Session, "request") + def test_shutdown_interrupts_retry_backoff(self, mock_request): + exporter = OTLPLogExporter(session=Session(), 
timeout=1.5) resp = Response() resp.status_code = 503 resp.reason = "UNAVAILABLE" - mock_post.return_value = resp + mock_request.return_value = resp thread = threading.Thread( target=exporter.export, args=(self._get_sdk_log_data(),) ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py index 0df471aa69..487dfcc603 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py @@ -24,9 +24,21 @@ from requests.models import Response from opentelemetry.exporter.otlp.proto.http import Compression -from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( +from opentelemetry.exporter.otlp.proto.http._common import ( + _DEFAULT_ENDPOINT, DEFAULT_COMPRESSION, - DEFAULT_ENDPOINT, +) +from opentelemetry.exporter.otlp.proto.http._otlp_client import ( + ExportResult, + OTLPHTTPClient, +) +from opentelemetry.exporter.otlp.proto.http._transport._requests import ( + RequestsHTTPTransport, +) +from opentelemetry.exporter.otlp.proto.http._transport._urllib3 import ( + Urllib3HTTPTransport, +) +from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( DEFAULT_TIMEOUT, DEFAULT_TRACES_EXPORT_PATH, OTLPSpanExporter, @@ -85,7 +97,8 @@ def test_constructor_default(self): exporter = OTLPSpanExporter() self.assertEqual( - exporter._endpoint, DEFAULT_ENDPOINT + DEFAULT_TRACES_EXPORT_PATH + exporter._endpoint, + f"{_DEFAULT_ENDPOINT}/{DEFAULT_TRACES_EXPORT_PATH}", ) self.assertEqual(exporter._certificate_file, True) self.assertEqual(exporter._client_certificate_file, None) @@ -93,16 +106,18 @@ def test_constructor_default(self): self.assertEqual(exporter._timeout, DEFAULT_TIMEOUT) self.assertIs(exporter._compression, DEFAULT_COMPRESSION) self.assertEqual(exporter._headers, {}) - self.assertIsInstance(exporter._session, 
requests.Session) - self.assertIn("User-Agent", exporter._session.headers) + self.assertIn("User-Agent", exporter._client._headers) self.assertEqual( - exporter._session.headers.get("Content-Type"), + exporter._client._headers.get("Content-Type"), "application/x-protobuf", ) self.assertEqual( - exporter._session.headers.get("User-Agent"), + exporter._client._headers.get("User-Agent"), "OTel-OTLP-Exporter-Python/" + __version__, ) + self.assertIsInstance( + exporter._client._transport, Urllib3HTTPTransport + ) @patch.dict( "os.environ", @@ -153,15 +168,12 @@ def f(): "user-agent": "TraceUserAgent", }, ) - self.assertIs(exporter._session, credential) - self.assertIsInstance(exporter._session, requests.Session) - self.assertEqual( - exporter._session.headers.get("Content-Type"), - "application/x-protobuf", + self.assertIsInstance( + exporter._client._transport, RequestsHTTPTransport ) - self.assertEqual( - exporter._session.headers.get("User-Agent"), - "TraceUserAgent", + self.assertIs(exporter._client._transport._session, credential) + self.assertIsInstance( + exporter._client._transport._session, requests.Session ) @patch.dict( @@ -178,6 +190,7 @@ def f(): }, ) def test_exporter_constructor_take_priority(self): + session = requests.Session() exporter = OTLPSpanExporter( endpoint="example.com/1234", certificate_file="path/to/service.crt", @@ -186,7 +199,7 @@ def test_exporter_constructor_take_priority(self): headers={"testHeader1": "value1", "testHeader2": "value2"}, timeout=20, compression=Compression.NoCompression, - session=requests.Session(), + session=session, ) self.assertEqual(exporter._endpoint, "example.com/1234") @@ -201,7 +214,13 @@ def test_exporter_constructor_take_priority(self): exporter._headers, {"testHeader1": "value1", "testHeader2": "value2"}, ) - self.assertIsInstance(exporter._session, requests.Session) + self.assertIsInstance( + exporter._client._transport, RequestsHTTPTransport + ) + self.assertIs(exporter._client._transport._session, session) 
+ self.assertIsInstance( + exporter._client._transport._session, requests.Session + ) @patch.dict( "os.environ", @@ -277,8 +296,12 @@ def test_headers_parse_from_env(self): ), ) - @patch.object(OTLPSpanExporter, "_export", return_value=Mock(ok=True)) - def test_2xx_status_code(self, mock_otlp_metric_exporter): + @patch.object( + OTLPHTTPClient, + "export", + return_value=ExportResult(True, 200, "OK", None), + ) + def test_2xx_status_code(self, mock_client_export): """ Test that any HTTP 2XX code returns a successful result """ @@ -287,16 +310,16 @@ def test_2xx_status_code(self, mock_otlp_metric_exporter): OTLPSpanExporter().export(MagicMock()), SpanExportResult.SUCCESS ) - @patch.object(Session, "post") - def test_retry_timeout(self, mock_post): + @patch.object(Session, "request") + def test_retry_timeout(self, mock_request): exporter = OTLPSpanExporter( - timeout=1.5, meter_provider=self.meter_provider + session=Session(), timeout=1.5, meter_provider=self.meter_provider ) resp = Response() resp.status_code = 503 resp.reason = "UNAVAILABLE" - mock_post.return_value = resp + mock_request.return_value = resp with self.assertLogs(level=WARNING) as warning: before = time.time() # Set timeout to 1.5 seconds @@ -306,11 +329,11 @@ def test_retry_timeout(self, mock_post): ) after = time.time() # First call at time 0, second at time 1, then an early return before the second backoff sleep b/c it would exceed timeout. - self.assertEqual(mock_post.call_count, 2) + self.assertEqual(mock_request.call_count, 2) # There's a +/-20% jitter on each backoff. 
self.assertTrue(0.75 < after - before < 1.25) self.assertIn( - "Transient error UNAVAILABLE encountered while exporting span batch, retrying in", + "Transient error UNAVAILABLE encountered while exporting spans batch, retrying in", warning.records[0].message, ) @@ -357,11 +380,11 @@ def test_retry_timeout(self, mock_post): metrics[2].data.data_points[0].attributes, ) - @patch.object(Session, "post") - def test_export_no_collector_available_retryable(self, mock_post): - exporter = OTLPSpanExporter(timeout=1.5) + @patch.object(Session, "request") + def test_export_no_collector_available_retryable(self, mock_request): + exporter = OTLPSpanExporter(session=Session(), timeout=1.5) msg = "Server not available." - mock_post.side_effect = ConnectionError(msg) + mock_request.side_effect = ConnectionError(msg) with self.assertLogs(level=WARNING) as warning: self.assertEqual( exporter.export([BASIC_SPAN]), @@ -369,27 +392,27 @@ def test_export_no_collector_available_retryable(self, mock_post): ) # Check for greater 2 because the request is on each retry # done twice at the moment. 
- self.assertGreater(mock_post.call_count, 2) + self.assertGreater(mock_request.call_count, 2) self.assertIn( - f"Transient error {msg} encountered while exporting span batch, retrying in", + f"Transient error {msg} encountered while exporting spans batch, retrying in", warning.records[0].message, ) - @patch.object(Session, "post") - def test_export_no_collector_available(self, mock_post): + @patch.object(Session, "request") + def test_export_no_collector_available(self, mock_request): exporter = OTLPSpanExporter( - timeout=1.5, meter_provider=self.meter_provider + session=Session(), timeout=1.5, meter_provider=self.meter_provider ) - mock_post.side_effect = requests.exceptions.RequestException() + mock_request.side_effect = requests.exceptions.RequestException() with self.assertLogs(level=WARNING) as warning: self.assertEqual( exporter.export([BASIC_SPAN]), SpanExportResult.FAILURE, ) - self.assertEqual(mock_post.call_count, 1) + self.assertEqual(mock_request.call_count, 1) self.assertIn( - "Failed to export span batch code", + "Failed to export spans batch code", warning.records[0].message, ) @@ -436,8 +459,8 @@ def test_export_no_collector_available(self, mock_post): metrics[2].data.data_points[0].attributes, ) - @patch.object(Session, "post") - def test_timeout_set_correctly(self, mock_post): + @patch.object(Session, "request") + def test_timeout_set_correctly(self, mock_request): resp = Response() resp.status_code = 200 @@ -446,18 +469,18 @@ def export_side_effect(*args, **kwargs): self.assertAlmostEqual(0.4, kwargs["timeout"], 2) return resp - mock_post.side_effect = export_side_effect - exporter = OTLPSpanExporter(timeout=0.4) + mock_request.side_effect = export_side_effect + exporter = OTLPSpanExporter(session=Session(), timeout=0.4) exporter.export([BASIC_SPAN]) - @patch.object(Session, "post") - def test_shutdown_interrupts_retry_backoff(self, mock_post): - exporter = OTLPSpanExporter(timeout=1.5) + @patch.object(Session, "request") + def 
test_shutdown_interrupts_retry_backoff(self, mock_request): + exporter = OTLPSpanExporter(session=Session(), timeout=1.5) resp = Response() resp.status_code = 503 resp.reason = "UNAVAILABLE" - mock_post.return_value = resp + mock_request.return_value = resp thread = threading.Thread(target=exporter.export, args=([BASIC_SPAN],)) with self.assertLogs(level=WARNING) as warning: before = time.time() @@ -469,7 +492,7 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post): thread.join() after = time.time() self.assertIn( - "Transient error UNAVAILABLE encountered while exporting span batch, retrying in", + "Transient error UNAVAILABLE encountered while exporting spans batch, retrying in", warning.records[0].message, ) self.assertIn( diff --git a/pyproject.toml b/pyproject.toml index 4e4257ceac..7f054475d9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ dependencies = [ "opentelemetry-proto-json", "opentelemetry-test-utils", "opentelemetry-exporter-otlp-proto-grpc", - "opentelemetry-exporter-otlp-proto-http", + "opentelemetry-exporter-otlp-proto-http[requests]", "opentelemetry-exporter-otlp-proto-common", "opentelemetry-exporter-zipkin-json", "opentelemetry-exporter-prometheus", diff --git a/uv.lock b/uv.lock index ab6be2bcb2..17b7e9f2f4 100644 --- a/uv.lock +++ b/uv.lock @@ -920,14 +920,17 @@ dependencies = [ { name = "opentelemetry-exporter-otlp-proto-common" }, { name = "opentelemetry-proto" }, { name = "opentelemetry-sdk" }, - { name = "requests" }, { name = "typing-extensions" }, + { name = "urllib3" }, ] [package.optional-dependencies] gcp-auth = [ { name = "opentelemetry-exporter-credential-provider-gcp" }, ] +requests = [ + { name = "requests" }, +] [package.metadata] requires-dist = [ @@ -937,10 +940,11 @@ requires-dist = [ { name = "opentelemetry-exporter-otlp-proto-common", editable = "exporter/opentelemetry-exporter-otlp-proto-common" }, { name = "opentelemetry-proto", editable = "opentelemetry-proto" }, { name = "opentelemetry-sdk", 
editable = "opentelemetry-sdk" }, - { name = "requests", specifier = "~=2.7" }, + { name = "requests", marker = "extra == 'requests'", specifier = "~=2.7" }, { name = "typing-extensions", specifier = ">=4.5.0" }, + { name = "urllib3", specifier = ">=1.11" }, ] -provides-extras = ["gcp-auth"] +provides-extras = ["gcp-auth", "requests"] [[package]] name = "opentelemetry-exporter-prometheus" @@ -1021,7 +1025,7 @@ dependencies = [ { name = "opentelemetry-codegen-json" }, { name = "opentelemetry-exporter-otlp-proto-common" }, { name = "opentelemetry-exporter-otlp-proto-grpc" }, - { name = "opentelemetry-exporter-otlp-proto-http" }, + { name = "opentelemetry-exporter-otlp-proto-http", extra = ["requests"] }, { name = "opentelemetry-exporter-prometheus" }, { name = "opentelemetry-exporter-zipkin-json" }, { name = "opentelemetry-propagator-b3" }, @@ -1047,7 +1051,7 @@ requires-dist = [ { name = "opentelemetry-codegen-json", editable = "codegen/opentelemetry-codegen-json" }, { name = "opentelemetry-exporter-otlp-proto-common", editable = "exporter/opentelemetry-exporter-otlp-proto-common" }, { name = "opentelemetry-exporter-otlp-proto-grpc", editable = "exporter/opentelemetry-exporter-otlp-proto-grpc" }, - { name = "opentelemetry-exporter-otlp-proto-http", editable = "exporter/opentelemetry-exporter-otlp-proto-http" }, + { name = "opentelemetry-exporter-otlp-proto-http", extras = ["requests"], editable = "exporter/opentelemetry-exporter-otlp-proto-http" }, { name = "opentelemetry-exporter-prometheus", editable = "exporter/opentelemetry-exporter-prometheus" }, { name = "opentelemetry-exporter-zipkin-json", editable = "exporter/opentelemetry-exporter-zipkin-json" }, { name = "opentelemetry-propagator-b3", editable = "propagator/opentelemetry-propagator-b3" },