diff --git a/CHANGELOG.md b/CHANGELOG.md index afe82e55b2..3976475d46 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- `opentelemetry-exporter-otlp-proto-http`: refactor shared HTTP exporter logic into common module, extract `_setup_session`, `_export`, + `_export_with_retries`, and `_compression_from_env` from trace/log/metric exporters into `_common` + ([#2990](https://github.com/open-telemetry/opentelemetry-python/pull/5160)) - `opentelemetry-sdk`: add `additional_properties` support to generated config models via custom `datamodel-codegen` template, enabling plugin/custom component names to flow through typed dataclasses ([#5131](https://github.com/open-telemetry/opentelemetry-python/pull/5131)) - Fix incorrect code example in `create_tracer()` docstring diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py index 1bdb7d228c..af4f364f62 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py @@ -12,15 +12,55 @@ # See the License for the specific language governing permissions and # limitations under the License. +import gzip +import logging +import random +import threading +import zlib +from dataclasses import dataclass +from io import BytesIO from os import environ -from typing import Literal, Optional +from time import time +from typing import Any, Dict, Optional +from urllib.parse import urlparse import requests +from requests.exceptions import ConnectionError +from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( + ExporterMetrics, +) +from opentelemetry.exporter.otlp.proto.http import ( + _OTLP_HTTP_HEADERS, + Compression, +) +from opentelemetry.metrics import MeterProvider from opentelemetry.sdk.environment_variables import ( _OTEL_PYTHON_EXPORTER_OTLP_HTTP_CREDENTIAL_PROVIDER, + OTEL_EXPORTER_OTLP_CERTIFICATE, + OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, + OTEL_EXPORTER_OTLP_CLIENT_KEY, + OTEL_EXPORTER_OTLP_COMPRESSION, + OTEL_EXPORTER_OTLP_ENDPOINT, + OTEL_EXPORTER_OTLP_HEADERS, + OTEL_EXPORTER_OTLP_TIMEOUT, +) +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OtelComponentTypeValues, +) +from opentelemetry.semconv.attributes.http_attributes import ( + HTTP_RESPONSE_STATUS_CODE, ) from opentelemetry.util._importlib_metadata import entry_points +from opentelemetry.util.re import parse_env_headers + +_logger = logging.getLogger(__name__) + +_MAX_RETRIES = 6 + +DEFAULT_COMPRESSION = Compression.NoCompression +DEFAULT_ENDPOINT = "http://localhost:4318/" +DEFAULT_TIMEOUT = 10 # in seconds def _is_retryable(resp: requests.Response) -> bool: @@ -32,11 +72,7 @@ def _is_retryable(resp: requests.Response) -> bool: def _load_session_from_envvar( - cred_envvar: Literal[ - "OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER", - "OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER", - "OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER", - ], + cred_envvar: str, ) -> Optional[requests.Session]: _credential_env = environ.get( _OTEL_PYTHON_EXPORTER_OTLP_HTTP_CREDENTIAL_PROVIDER @@ -64,3 +100,235 @@ def _load_session_from_envvar( f" must be of type `requests.Session`." ) return None + + +def _compression_from_env(compression_envvar: str) -> Compression: + compression = ( + environ.get( + compression_envvar, + environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none"), + ) + .lower() + .strip() + ) + return Compression(compression) + + +def _append_signal_path(endpoint: str, signal_path: str) -> str: + if endpoint.endswith("/"): + return endpoint + signal_path + return endpoint + f"/{signal_path}" + + +@dataclass(frozen=True) +class _SignalConfig: + endpoint_envvar: str + certificate_envvar: str + client_key_envvar: str + client_certificate_envvar: str + headers_envvar: str + timeout_envvar: str + compression_envvar: str + credential_envvar: str + default_export_path: str + component_type: OtelComponentTypeValues + signal_name: str + + +class OTLPHttpClient: + def __init__( + self, + endpoint: Optional[str], + certificate_file: Optional[str], + client_key_file: Optional[str], + client_certificate_file: Optional[str], + headers: Optional[Dict[str, str]], + timeout: Optional[float], + compression: Optional[Compression], + session: Optional[requests.Session], + meter_provider: Optional[MeterProvider], + signal_config: _SignalConfig, + ): + self._shutdown_in_progress = threading.Event() + self._endpoint = endpoint or environ.get( + signal_config.endpoint_envvar, + _append_signal_path( + environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT), + signal_config.default_export_path, + ), + ) + self._certificate_file = certificate_file or environ.get( + signal_config.certificate_envvar, + environ.get(OTEL_EXPORTER_OTLP_CERTIFICATE, True), + ) + self._client_key_file = client_key_file or environ.get( + signal_config.client_key_envvar, + environ.get(OTEL_EXPORTER_OTLP_CLIENT_KEY, None), + ) + self._client_certificate_file = client_certificate_file or environ.get( + signal_config.client_certificate_envvar, + environ.get(OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, None), + ) + self._client_cert = ( + (self._client_certificate_file, self._client_key_file) + if self._client_certificate_file and self._client_key_file + else self._client_certificate_file + ) + headers_string = environ.get( + signal_config.headers_envvar, + environ.get(OTEL_EXPORTER_OTLP_HEADERS, ""), + ) + self._headers = headers or parse_env_headers( + headers_string, liberal=True + ) + self._timeout = timeout or float( + environ.get( + signal_config.timeout_envvar, + environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), + ) + ) + self._compression = compression or _compression_from_env( + signal_config.compression_envvar + ) + self._session = self._setup_session( + session, signal_config.credential_envvar + ) + self._shutdown = False + self._metrics = ExporterMetrics( + signal_config.component_type, + signal_config.signal_name, + urlparse(self._endpoint), + meter_provider, + ) + + def _setup_session( + self, + session: Optional[requests.Session], + cred_envvar: str, + ) -> requests.Session: + configured_session = ( + session + or _load_session_from_envvar(cred_envvar) + or requests.Session() + ) + configured_session.headers.update(self._headers) + configured_session.headers.update(_OTLP_HTTP_HEADERS) + # let users override our defaults + configured_session.headers.update(self._headers) + if self._compression is not Compression.NoCompression: + configured_session.headers.update( + {"Content-Encoding": self._compression.value} + ) + return configured_session + + def _export( + self, serialized_data: bytes, timeout_sec: float + ) -> requests.Response: + data = serialized_data + if self._compression == Compression.Gzip: + gzip_data = BytesIO() + with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: + gzip_stream.write(serialized_data) + data = gzip_data.getvalue() + elif self._compression == Compression.Deflate: + data = zlib.compress(serialized_data) + + # By default, keep-alive is enabled in Session's request + # headers. Backends may choose to close the connection + # while a post happens which causes an unhandled + # exception. This try/except will retry the post on such exceptions + try: + resp = self._session.post( + url=self._endpoint, + data=data, + verify=self._certificate_file, + timeout=timeout_sec, + cert=self._client_cert, + ) + except ConnectionError: + resp = self._session.post( + url=self._endpoint, + data=data, + verify=self._certificate_file, + timeout=timeout_sec, + cert=self._client_cert, + ) + return resp + + def _export_with_retries( + self, + serialized_data: bytes, + result: Any, + batch_name: str, + ) -> bool: + deadline_sec = time() + self._timeout + for retry_num in range(_MAX_RETRIES): + # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. + backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) + export_error: Optional[Exception] = None + try: + if ( + resp := self._export( + serialized_data, deadline_sec - time() + ) + ).ok: + return True + except requests.exceptions.RequestException as error: + reason = error + export_error = error + retryable = isinstance(error, ConnectionError) + status_code = None + else: + reason = resp.reason + retryable = _is_retryable(resp) + status_code = resp.status_code + + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + + if not retryable: + _logger.error( + "Failed to export %s batch code: %s, reason: %s", + batch_name, + status_code, + reason, + ) + result.error = export_error + result.error_attrs = error_attrs + return False + + if ( + retry_num + 1 == _MAX_RETRIES + or backoff_seconds > (deadline_sec - time()) + or self._shutdown_in_progress.is_set() + ): + _logger.error( + "Failed to export %s batch due to timeout, " + "max retries or shutdown.", + batch_name, + ) + result.error = export_error + result.error_attrs = error_attrs + return False + + _logger.warning( + "Transient error %s encountered while exporting %s batch, retrying in %.2fs.", + reason, + batch_name, + backoff_seconds, + ) + if self._shutdown_in_progress.wait(backoff_seconds): + _logger.warning("Shutdown in progress, aborting retry.") + break + return False + + def shutdown(self, timeout_millis: Optional[float] = None) -> None: + if self._shutdown: + _logger.warning("Exporter already shutdown, ignoring call") + return + self._shutdown = True + self._shutdown_in_progress.set() + self._session.close() diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index 6032433dd1..b4238ee108 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -12,31 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. -import gzip import logging -import random -import threading -import zlib -from io import BytesIO -from os import environ -from time import time from typing import Dict, Optional, Sequence -from urllib.parse import urlparse import requests -from requests.exceptions import ConnectionError -from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( - ExporterMetrics, -) from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs from opentelemetry.exporter.otlp.proto.http import ( - _OTLP_HTTP_HEADERS, Compression, ) from opentelemetry.exporter.otlp.proto.http._common import ( - _is_retryable, - _load_session_from_envvar, + OTLPHttpClient, + _SignalConfig, ) from opentelemetry.metrics import MeterProvider from opentelemetry.sdk._logs import ReadableLogRecord @@ -47,12 +34,6 @@ from opentelemetry.sdk._shared_internal import DuplicateFilter from opentelemetry.sdk.environment_variables import ( _OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER, - OTEL_EXPORTER_OTLP_CERTIFICATE, - OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, - OTEL_EXPORTER_OTLP_CLIENT_KEY, - OTEL_EXPORTER_OTLP_COMPRESSION, - OTEL_EXPORTER_OTLP_ENDPOINT, - OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE, OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE, OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY, @@ -60,29 +41,33 @@ OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, OTEL_EXPORTER_OTLP_LOGS_HEADERS, OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, - OTEL_EXPORTER_OTLP_TIMEOUT, ) from opentelemetry.semconv._incubating.attributes.otel_attributes import ( OtelComponentTypeValues, ) -from opentelemetry.semconv.attributes.http_attributes import ( - HTTP_RESPONSE_STATUS_CODE, -) -from opentelemetry.util.re import parse_env_headers _logger = logging.getLogger(__name__) # This prevents logs generated when a log fails to be written to generate another log which fails to be written etc. etc. _logger.addFilter(DuplicateFilter()) - -DEFAULT_COMPRESSION = Compression.NoCompression -DEFAULT_ENDPOINT = "http://localhost:4318/" DEFAULT_LOGS_EXPORT_PATH = "v1/logs" -DEFAULT_TIMEOUT = 10 # in seconds -_MAX_RETRYS = 6 + +_LOGS_CONFIG = _SignalConfig( + endpoint_envvar=OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, + certificate_envvar=OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE, + client_key_envvar=OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY, + client_certificate_envvar=OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE, + headers_envvar=OTEL_EXPORTER_OTLP_LOGS_HEADERS, + timeout_envvar=OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, + compression_envvar=OTEL_EXPORTER_OTLP_LOGS_COMPRESSION, + credential_envvar=_OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER, + default_export_path=DEFAULT_LOGS_EXPORT_PATH, + component_type=OtelComponentTypeValues.OTLP_HTTP_LOG_EXPORTER, + signal_name="logs", +) -class OTLPLogExporter(LogRecordExporter): +class OTLPLogExporter(OTLPHttpClient, LogRecordExporter): def __init__( self, endpoint: Optional[str] = None, @@ -96,106 +81,20 @@ def __init__( *, meter_provider: Optional[MeterProvider] = None, ): - self._shutdown_is_occuring = threading.Event() - self._endpoint = endpoint or environ.get( - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, - _append_logs_path( - environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT) - ), - ) - # Keeping these as instance variables because they are used in tests - self._certificate_file = certificate_file or environ.get( - OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE, - environ.get(OTEL_EXPORTER_OTLP_CERTIFICATE, True), - ) - self._client_key_file = client_key_file or environ.get( - OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY, - environ.get(OTEL_EXPORTER_OTLP_CLIENT_KEY, None), - ) - self._client_certificate_file = client_certificate_file or environ.get( - OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE, - environ.get(OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, None), - ) - self._client_cert = ( - (self._client_certificate_file, self._client_key_file) - if self._client_certificate_file and self._client_key_file - else self._client_certificate_file - ) - headers_string = environ.get( - OTEL_EXPORTER_OTLP_LOGS_HEADERS, - environ.get(OTEL_EXPORTER_OTLP_HEADERS, ""), - ) - self._headers = headers or parse_env_headers( - headers_string, liberal=True - ) - self._timeout = timeout or float( - environ.get( - OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, - environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), - ) - ) - self._compression = compression or _compression_from_env() - self._session = ( - session - or _load_session_from_envvar( - _OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER - ) - or requests.Session() - ) - self._session.headers.update(self._headers) - self._session.headers.update(_OTLP_HTTP_HEADERS) - # let users override our defaults - self._session.headers.update(self._headers) - if self._compression is not Compression.NoCompression: - self._session.headers.update( - {"Content-Encoding": self._compression.value} - ) - self._shutdown = False - - self._metrics = ExporterMetrics( - OtelComponentTypeValues.OTLP_HTTP_LOG_EXPORTER, - "logs", - urlparse(self._endpoint), - meter_provider, + OTLPHttpClient.__init__( + self, + endpoint=endpoint, + certificate_file=certificate_file, + client_key_file=client_key_file, + client_certificate_file=client_certificate_file, + headers=headers, + timeout=timeout, + compression=compression, + session=session, + meter_provider=meter_provider, + signal_config=_LOGS_CONFIG, ) - def _export( - self, serialized_data: bytes, timeout_sec: Optional[float] = None - ): - data = serialized_data - if self._compression == Compression.Gzip: - gzip_data = BytesIO() - with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: - gzip_stream.write(serialized_data) - data = gzip_data.getvalue() - elif self._compression == Compression.Deflate: - data = zlib.compress(serialized_data) - - if timeout_sec is None: - timeout_sec = self._timeout - - # By default, keep-alive is enabled in Session's request - # headers. Backends may choose to close the connection - # while a post happens which causes an unhandled - # exception. This try/except will retry the post on such exceptions - try: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - except ConnectionError: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - return resp - def export( self, batch: Sequence[ReadableLogRecord] ) -> LogRecordExportResult: @@ -203,96 +102,14 @@ def export( _logger.warning("Exporter already shutdown, ignoring batch") return LogRecordExportResult.FAILURE + serialized_data = encode_logs(batch).SerializeToString() with self._metrics.export_operation(len(batch)) as result: - serialized_data = encode_logs(batch).SerializeToString() - deadline_sec = time() + self._timeout - for retry_num in range(_MAX_RETRYS): - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. - backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - export_error: Optional[Exception] = None - try: - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - return LogRecordExportResult.SUCCESS - except requests.exceptions.RequestException as error: - reason = error - export_error = error - retryable = isinstance(error, ConnectionError) - status_code = None - else: - reason = resp.reason - retryable = _is_retryable(resp) - status_code = resp.status_code - - if not retryable: - _logger.error( - "Failed to export logs batch code: %s, reason: %s", - status_code, - reason, - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return LogRecordExportResult.FAILURE - - if ( - retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - or self._shutdown - ): - _logger.error( - "Failed to export logs batch due to timeout, " - "max retries or shutdown." - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return LogRecordExportResult.FAILURE - _logger.warning( - "Transient error %s encountered while exporting logs batch, retrying in %.2fs.", - reason, - backoff_seconds, - ) - shutdown = self._shutdown_is_occuring.wait(backoff_seconds) - if shutdown: - _logger.warning("Shutdown in progress, aborting retry.") - break - return LogRecordExportResult.FAILURE + return ( + LogRecordExportResult.SUCCESS + if self._export_with_retries(serialized_data, result, "logs") + else LogRecordExportResult.FAILURE + ) def force_flush(self, timeout_millis: float = 10_000) -> bool: """Nothing is buffered in this exporter, so this method does nothing.""" return True - - def shutdown(self): - if self._shutdown: - _logger.warning("Exporter already shutdown, ignoring call") - return - self._shutdown = True - self._shutdown_is_occuring.set() - self._session.close() - - -def _compression_from_env() -> Compression: - compression = ( - environ.get( - OTEL_EXPORTER_OTLP_LOGS_COMPRESSION, - environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none"), - ) - .lower() - .strip() - ) - return Compression(compression) - - -def _append_logs_path(endpoint: str) -> str: - if endpoint.endswith("/"): - return endpoint + DEFAULT_LOGS_EXPORT_PATH - return endpoint + f"/{DEFAULT_LOGS_EXPORT_PATH}" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index efd63b4543..47fb5ff6e9 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -12,14 +12,7 @@ # limitations under the License. from __future__ import annotations -import gzip import logging -import random -import threading -import zlib -from io import BytesIO -from os import environ -from time import time from typing import ( # noqa: F401 Any, Callable, @@ -31,7 +24,6 @@ from urllib.parse import urlparse import requests -from requests.exceptions import ConnectionError from typing_extensions import deprecated from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( @@ -47,12 +39,11 @@ encode_metrics, ) from opentelemetry.exporter.otlp.proto.http import ( - _OTLP_HTTP_HEADERS, Compression, ) from opentelemetry.exporter.otlp.proto.http._common import ( - _is_retryable, - _load_session_from_envvar, + OTLPHttpClient, + _SignalConfig, ) from opentelemetry.metrics import MeterProvider from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( # noqa: F401 @@ -72,12 +63,6 @@ ) from opentelemetry.sdk.environment_variables import ( _OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER, - OTEL_EXPORTER_OTLP_CERTIFICATE, - OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, - OTEL_EXPORTER_OTLP_CLIENT_KEY, - OTEL_EXPORTER_OTLP_COMPRESSION, - OTEL_EXPORTER_OTLP_ENDPOINT, - OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE, OTEL_EXPORTER_OTLP_METRICS_CLIENT_CERTIFICATE, OTEL_EXPORTER_OTLP_METRICS_CLIENT_KEY, @@ -85,7 +70,6 @@ OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, OTEL_EXPORTER_OTLP_METRICS_HEADERS, OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, - OTEL_EXPORTER_OTLP_TIMEOUT, ) from opentelemetry.sdk.metrics._internal.aggregation import Aggregation from opentelemetry.sdk.metrics.export import ( # noqa: F401 @@ -103,22 +87,29 @@ from opentelemetry.semconv._incubating.attributes.otel_attributes import ( OtelComponentTypeValues, ) -from opentelemetry.semconv.attributes.http_attributes import ( - HTTP_RESPONSE_STATUS_CODE, -) -from opentelemetry.util.re import parse_env_headers _logger = logging.getLogger(__name__) - -DEFAULT_COMPRESSION = Compression.NoCompression -DEFAULT_ENDPOINT = "http://localhost:4318/" DEFAULT_METRICS_EXPORT_PATH = "v1/metrics" -DEFAULT_TIMEOUT = 10 # in seconds -_MAX_RETRYS = 6 + +_METRICS_CONFIG = _SignalConfig( + endpoint_envvar=OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, + certificate_envvar=OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE, + client_key_envvar=OTEL_EXPORTER_OTLP_METRICS_CLIENT_KEY, + client_certificate_envvar=OTEL_EXPORTER_OTLP_METRICS_CLIENT_CERTIFICATE, + headers_envvar=OTEL_EXPORTER_OTLP_METRICS_HEADERS, + timeout_envvar=OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, + compression_envvar=OTEL_EXPORTER_OTLP_METRICS_COMPRESSION, + credential_envvar=_OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER, + default_export_path=DEFAULT_METRICS_EXPORT_PATH, + component_type=OtelComponentTypeValues.OTLP_HTTP_METRIC_EXPORTER, + signal_name="metrics", +) -class OTLPMetricExporter(MetricExporter, OTLPMetricExporterMixin): +class OTLPMetricExporter( + OTLPHttpClient, MetricExporter, OTLPMetricExporterMixin +): def __init__( self, endpoint: str | None = None, @@ -157,188 +148,23 @@ def __init__( If not set there is no limit to the number of data points in a request. If it is set and the number of data points exceeds the max, the request will be split. """ - self._shutdown_in_progress = threading.Event() - self._endpoint = endpoint or environ.get( - OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, - _append_metrics_path( - environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT) - ), - ) - self._certificate_file = certificate_file or environ.get( - OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE, - environ.get(OTEL_EXPORTER_OTLP_CERTIFICATE, True), - ) - self._client_key_file = client_key_file or environ.get( - OTEL_EXPORTER_OTLP_METRICS_CLIENT_KEY, - environ.get(OTEL_EXPORTER_OTLP_CLIENT_KEY, None), - ) - self._client_certificate_file = client_certificate_file or environ.get( - OTEL_EXPORTER_OTLP_METRICS_CLIENT_CERTIFICATE, - environ.get(OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, None), - ) - self._client_cert = ( - (self._client_certificate_file, self._client_key_file) - if self._client_certificate_file and self._client_key_file - else self._client_certificate_file - ) - headers_string = environ.get( - OTEL_EXPORTER_OTLP_METRICS_HEADERS, - environ.get(OTEL_EXPORTER_OTLP_HEADERS, ""), - ) - self._headers = headers or parse_env_headers( - headers_string, liberal=True - ) - self._timeout = timeout or float( - environ.get( - OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, - environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), - ) - ) - self._compression = compression or _compression_from_env() - self._session = ( - session - or _load_session_from_envvar( - _OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER - ) - or requests.Session() + OTLPHttpClient.__init__( + self, + endpoint=endpoint, + certificate_file=certificate_file, + client_key_file=client_key_file, + client_certificate_file=client_certificate_file, + headers=headers, + timeout=timeout, + compression=compression, + session=session, + meter_provider=meter_provider, + signal_config=_METRICS_CONFIG, ) - self._session.headers.update(self._headers) - self._session.headers.update(_OTLP_HTTP_HEADERS) - # let users override our defaults - self._session.headers.update(self._headers) - if self._compression is not Compression.NoCompression: - self._session.headers.update( - {"Content-Encoding": self._compression.value} - ) - self._common_configuration( preferred_temporality, preferred_aggregation ) self._max_export_batch_size: int | None = max_export_batch_size - self._shutdown = False - - self._metrics = ExporterMetrics( - OtelComponentTypeValues.OTLP_HTTP_METRIC_EXPORTER, - "metrics", - urlparse(self._endpoint), - meter_provider, - ) - - def _export( - self, serialized_data: bytes, timeout_sec: Optional[float] = None - ): - data = serialized_data - if self._compression == Compression.Gzip: - gzip_data = BytesIO() - with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: - gzip_stream.write(serialized_data) - data = gzip_data.getvalue() - elif self._compression == Compression.Deflate: - data = zlib.compress(serialized_data) - - if timeout_sec is None: - timeout_sec = self._timeout - - # By default, keep-alive is enabled in Session's request - # headers. Backends may choose to close the connection - # while a post happens which causes an unhandled - # exception. This try/except will retry the post on such exceptions - try: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - except ConnectionError: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - return resp - - def _export_with_retries( - self, - export_request: ExportMetricsServiceRequest, - deadline_sec: float, - num_items: int, - ) -> MetricExportResult: - """Export serialized data with retry logic until success, non-transient error, or exponential backoff maxed out. - - Args: - export_request: ExportMetricsServiceRequest object containing metrics data to export - deadline_sec: timestamp deadline for the export - - Returns: - MetricExportResult: SUCCESS if export succeeded, FAILURE otherwise - """ - with self._metrics.export_operation(num_items) as result: - serialized_data = export_request.SerializeToString() - deadline_sec = time() + self._timeout - for retry_num in range(_MAX_RETRYS): - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. - backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - export_error: Optional[Exception] = None - try: - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - return MetricExportResult.SUCCESS - except requests.exceptions.RequestException as error: - reason = error - export_error = error - retryable = isinstance(error, ConnectionError) - status_code = None - else: - reason = resp.reason - retryable = _is_retryable(resp) - status_code = resp.status_code - - if not retryable: - _logger.error( - "Failed to export metrics batch code: %s, reason: %s", - status_code, - reason, - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return MetricExportResult.FAILURE - if ( - retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - or self._shutdown - ): - _logger.error( - "Failed to export metrics batch due to timeout, " - "max retries or shutdown." - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return MetricExportResult.FAILURE - - _logger.warning( - "Transient error %s encountered while exporting metrics batch, retrying in %.2fs.", - reason, - backoff_seconds, - ) - shutdown = self._shutdown_in_progress.wait(backoff_seconds) - if shutdown: - _logger.warning("Shutdown in progress, aborting retry.") - break - return MetricExportResult.FAILURE def export( self, @@ -357,39 +183,32 @@ def export( num_items += len(metric.data.data_points) export_request = encode_metrics(metrics_data) - deadline_sec = time() + self._timeout + + def _do_export(request: ExportMetricsServiceRequest) -> bool: + serialized_data = request.SerializeToString() + with self._metrics.export_operation(num_items) as result: + return self._export_with_retries( + serialized_data, result, "metrics" + ) # If no batch size configured, export as single batch with retries as configured if self._max_export_batch_size is None: - return self._export_with_retries( - export_request, deadline_sec, num_items + return ( + MetricExportResult.SUCCESS + if _do_export(export_request) + else MetricExportResult.FAILURE ) # Else, export in batches of configured size - batched_export_requests = _split_metrics_data( + for split_request in _split_metrics_data( export_request, self._max_export_batch_size - ) - - for split_metrics_data in batched_export_requests: - export_result = self._export_with_retries( - split_metrics_data, - deadline_sec, - num_items, - ) - if export_result != MetricExportResult.SUCCESS: + ): + if not _do_export(split_request): return MetricExportResult.FAILURE # Only returns SUCCESS if all batches succeeded return MetricExportResult.SUCCESS - def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: - if self._shutdown: - _logger.warning("Exporter already shutdown, ignoring call") - return - self._shutdown = True - self._shutdown_in_progress.set() - self._session.close() - @property def _exporting(self) -> str: return "metrics" @@ -725,21 +544,3 @@ def get_resource_data( name: str, ) -> List[PB2Resource]: return _get_resource_data(sdk_resource_scope_data, resource_class, name) - - -def _compression_from_env() -> Compression: - compression = ( - environ.get( - OTEL_EXPORTER_OTLP_METRICS_COMPRESSION, - environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none"), - ) - .lower() - .strip() - ) - return Compression(compression) - - -def _append_metrics_path(endpoint: str) -> str: - if endpoint.endswith("/"): - return endpoint + DEFAULT_METRICS_EXPORT_PATH - return endpoint + f"/{DEFAULT_METRICS_EXPORT_PATH}" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index 018d89df1e..7951452990 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -12,44 +12,24 @@ # See the License for the specific language governing permissions and # limitations under the License. -import gzip import logging -import random -import threading -import zlib -from io import BytesIO -from os import environ -from time import time from typing import Dict, Optional, Sequence -from urllib.parse import urlparse import requests -from requests.exceptions import ConnectionError -from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( - ExporterMetrics, -) from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( encode_spans, ) from opentelemetry.exporter.otlp.proto.http import ( - _OTLP_HTTP_HEADERS, Compression, ) from opentelemetry.exporter.otlp.proto.http._common import ( - _is_retryable, - _load_session_from_envvar, + OTLPHttpClient, + _SignalConfig, ) from opentelemetry.metrics import MeterProvider from opentelemetry.sdk.environment_variables import ( _OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER, - OTEL_EXPORTER_OTLP_CERTIFICATE, - OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, - OTEL_EXPORTER_OTLP_CLIENT_KEY, - OTEL_EXPORTER_OTLP_COMPRESSION, - OTEL_EXPORTER_OTLP_ENDPOINT, - OTEL_EXPORTER_OTLP_HEADERS, - OTEL_EXPORTER_OTLP_TIMEOUT, OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE, OTEL_EXPORTER_OTLP_TRACES_CLIENT_CERTIFICATE, OTEL_EXPORTER_OTLP_TRACES_CLIENT_KEY, @@ -63,22 +43,27 @@ from opentelemetry.semconv._incubating.attributes.otel_attributes import ( OtelComponentTypeValues, ) -from opentelemetry.semconv.attributes.http_attributes import ( - HTTP_RESPONSE_STATUS_CODE, -) -from opentelemetry.util.re import parse_env_headers _logger = logging.getLogger(__name__) - -DEFAULT_COMPRESSION = Compression.NoCompression -DEFAULT_ENDPOINT = "http://localhost:4318/" DEFAULT_TRACES_EXPORT_PATH = "v1/traces" -DEFAULT_TIMEOUT = 10 # in seconds -_MAX_RETRYS = 6 + +_TRACES_CONFIG = _SignalConfig( + endpoint_envvar=OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, + certificate_envvar=OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE, + client_key_envvar=OTEL_EXPORTER_OTLP_TRACES_CLIENT_KEY, + client_certificate_envvar=OTEL_EXPORTER_OTLP_TRACES_CLIENT_CERTIFICATE, + headers_envvar=OTEL_EXPORTER_OTLP_TRACES_HEADERS, + timeout_envvar=OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, + compression_envvar=OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, + credential_envvar=_OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER, + default_export_path=DEFAULT_TRACES_EXPORT_PATH, + component_type=OtelComponentTypeValues.OTLP_HTTP_SPAN_EXPORTER, + signal_name="traces", +) -class OTLPSpanExporter(SpanExporter): +class OTLPSpanExporter(OTLPHttpClient, SpanExporter): def __init__( self, endpoint: Optional[str] = None, @@ -92,200 +77,33 @@ def __init__( *, meter_provider: Optional[MeterProvider] = None, ): - self._shutdown_in_progress = threading.Event() - self._endpoint = endpoint or environ.get( - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, - _append_trace_path( - environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT) - ), - ) - self._certificate_file = certificate_file or environ.get( - OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE, - environ.get(OTEL_EXPORTER_OTLP_CERTIFICATE, True), - ) - self._client_key_file = client_key_file or environ.get( - OTEL_EXPORTER_OTLP_TRACES_CLIENT_KEY, - environ.get(OTEL_EXPORTER_OTLP_CLIENT_KEY, None), - ) - self._client_certificate_file = client_certificate_file or environ.get( - OTEL_EXPORTER_OTLP_TRACES_CLIENT_CERTIFICATE, - environ.get(OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, None), - ) - self._client_cert = ( - (self._client_certificate_file, self._client_key_file) - if self._client_certificate_file and self._client_key_file - else self._client_certificate_file - ) - headers_string = environ.get( - OTEL_EXPORTER_OTLP_TRACES_HEADERS, - environ.get(OTEL_EXPORTER_OTLP_HEADERS, ""), - ) - self._headers = headers or parse_env_headers( - headers_string, liberal=True - ) - self._timeout = timeout or float( - environ.get( - OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, - environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), - ) - ) - self._compression = compression or _compression_from_env() - self._session = ( - session - or _load_session_from_envvar( - _OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER - ) - or requests.Session() - ) - self._session.headers.update(self._headers) - self._session.headers.update(_OTLP_HTTP_HEADERS) - # let users override our defaults - self._session.headers.update(self._headers) - if self._compression is not Compression.NoCompression: - self._session.headers.update( - {"Content-Encoding": self._compression.value} - ) - self._shutdown = False - - self._metrics = ExporterMetrics( - OtelComponentTypeValues.OTLP_HTTP_SPAN_EXPORTER, - "traces", - urlparse(self._endpoint), - meter_provider, + OTLPHttpClient.__init__( + self, + endpoint=endpoint, + certificate_file=certificate_file, + client_key_file=client_key_file, + client_certificate_file=client_certificate_file, + headers=headers, + timeout=timeout, + compression=compression, + session=session, + meter_provider=meter_provider, + signal_config=_TRACES_CONFIG, ) - def _export( - self, serialized_data: bytes, timeout_sec: Optional[float] = None - ): - data = serialized_data - if self._compression == Compression.Gzip: - gzip_data = BytesIO() - with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: - gzip_stream.write(serialized_data) - data = gzip_data.getvalue() - elif self._compression == Compression.Deflate: - data = zlib.compress(serialized_data) - - if timeout_sec is None: - timeout_sec = self._timeout - - # By default, keep-alive is enabled in Session's request - # headers. Backends may choose to close the connection - # while a post happens which causes an unhandled - # exception. This try/except will retry the post on such exceptions - try: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - except ConnectionError: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - return resp - def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: if self._shutdown: _logger.warning("Exporter already shutdown, ignoring batch") return SpanExportResult.FAILURE + serialized_data = encode_spans(spans).SerializePartialToString() with self._metrics.export_operation(len(spans)) as result: - serialized_data = encode_spans(spans).SerializePartialToString() - deadline_sec = time() + self._timeout - for retry_num in range(_MAX_RETRYS): - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. - backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - export_error: Optional[Exception] = None - try: - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - return SpanExportResult.SUCCESS - except requests.exceptions.RequestException as error: - reason = error - export_error = error - retryable = isinstance(error, ConnectionError) - status_code = None - else: - reason = resp.reason - retryable = _is_retryable(resp) - status_code = resp.status_code - - if not retryable: - _logger.error( - "Failed to export span batch code: %s, reason: %s", - status_code, - reason, - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return SpanExportResult.FAILURE - - if ( - retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - or self._shutdown - ): - _logger.error( - "Failed to export span batch due to timeout, " - "max retries or shutdown." - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return SpanExportResult.FAILURE - _logger.warning( - "Transient error %s encountered while exporting span batch, retrying in %.2fs.", - reason, - backoff_seconds, - ) - shutdown = self._shutdown_in_progress.wait(backoff_seconds) - if shutdown: - _logger.warning("Shutdown in progress, aborting retry.") - break - return SpanExportResult.FAILURE - - def shutdown(self): - if self._shutdown: - _logger.warning("Exporter already shutdown, ignoring call") - return - self._shutdown = True - self._shutdown_in_progress.set() - self._session.close() + return ( + SpanExportResult.SUCCESS + if self._export_with_retries(serialized_data, result, "span") + else SpanExportResult.FAILURE + ) def force_flush(self, timeout_millis: int = 30000) -> bool: """Nothing is buffered in this exporter, so this method does nothing.""" return True - - -def _compression_from_env() -> Compression: - compression = ( - environ.get( - OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, - environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none"), - ) - .lower() - .strip() - ) - return Compression(compression) - - -def _append_trace_path(endpoint: str) -> str: - if endpoint.endswith("/"): - return endpoint + DEFAULT_TRACES_EXPORT_PATH - return endpoint + f"/{DEFAULT_TRACES_EXPORT_PATH}" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 5f7ae2afa9..048f92c1b5 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -30,11 +30,13 @@ encode_metrics, ) from opentelemetry.exporter.otlp.proto.http import Compression -from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( +from opentelemetry.exporter.otlp.proto.http._common import ( DEFAULT_COMPRESSION, DEFAULT_ENDPOINT, - DEFAULT_METRICS_EXPORT_PATH, DEFAULT_TIMEOUT, +) +from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( + DEFAULT_METRICS_EXPORT_PATH, OTLPMetricExporter, _get_split_resource_metrics_pb2, _split_metrics_data, @@ -1265,7 +1267,10 @@ def test_exponential_explicit_bucket_histogram(self): ExplicitBucketHistogramAggregation, ) - @patch.object(OTLPMetricExporter, "_export", return_value=Mock(ok=True)) + @patch( + "opentelemetry.exporter.otlp.proto.http._common.OTLPHttpClient._export", + return_value=Mock(ok=True), + ) def test_2xx_status_code(self, mock_otlp_metric_exporter): """ Test that any HTTP 2XX code returns a successful result diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index 7981b0bc82..854efbca91 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -29,11 +29,13 @@ from opentelemetry._logs import LogRecord, SeverityNumber from opentelemetry.exporter.otlp.proto.http import Compression -from opentelemetry.exporter.otlp.proto.http._log_exporter import ( +from opentelemetry.exporter.otlp.proto.http._common import ( DEFAULT_COMPRESSION, DEFAULT_ENDPOINT, - DEFAULT_LOGS_EXPORT_PATH, DEFAULT_TIMEOUT, +) +from opentelemetry.exporter.otlp.proto.http._log_exporter import ( + DEFAULT_LOGS_EXPORT_PATH, OTLPLogExporter, ) from opentelemetry.exporter.otlp.proto.http.version import __version__ @@ -456,7 +458,10 @@ def _get_sdk_log_data() -> List[ReadWriteLogRecord]: return [log1, log2, log3, log4] - @patch.object(OTLPLogExporter, "_export", return_value=Mock(ok=True)) + @patch( + "opentelemetry.exporter.otlp.proto.http._common.OTLPHttpClient._export", + return_value=Mock(ok=True), + ) def test_2xx_status_code(self, mock_otlp_metric_exporter): """ Test that any HTTP 2XX code returns a successful result diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py index 0df471aa69..7881bd4315 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py @@ -24,10 +24,12 @@ from requests.models import Response from opentelemetry.exporter.otlp.proto.http import Compression -from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( +from opentelemetry.exporter.otlp.proto.http._common import ( DEFAULT_COMPRESSION, DEFAULT_ENDPOINT, DEFAULT_TIMEOUT, +) +from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( DEFAULT_TRACES_EXPORT_PATH, OTLPSpanExporter, ) @@ -277,7 +279,10 @@ def test_headers_parse_from_env(self): ), ) - @patch.object(OTLPSpanExporter, "_export", return_value=Mock(ok=True)) + @patch( + "opentelemetry.exporter.otlp.proto.http._common.OTLPHttpClient._export", + return_value=Mock(ok=True), + ) def test_2xx_status_code(self, mock_otlp_metric_exporter): """ Test that any HTTP 2XX code returns a successful result