Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""OTLP Exporter"""
"""OTLP Exporter

This module provides a mixin class for OTLP exporters that send telemetry data
to an OpenTelemetry Collector via gRPC. It includes configurable reconnection
Comment thread
dheeraj-vanamala marked this conversation as resolved.
Outdated
logic to handle transient collector outages.

Environment Variables:
OTEL_EXPORTER_OTLP_RETRY_INTERVAL: Base retry interval in seconds (default: 2.0).
OTEL_EXPORTER_OTLP_MAX_RETRIES: Maximum number of retry attempts (default: 20).
OTEL_EXPORTER_OTLP_RETRY_TIMEOUT: Total retry timeout in seconds (default: 300).
OTEL_EXPORTER_OTLP_RETRY_MAX_DELAY: Maximum delay between retries in seconds (default: 60.0).
Comment thread
dheeraj-vanamala marked this conversation as resolved.
Outdated
OTEL_EXPORTER_OTLP_RETRY_FACTOR: Exponential backoff factor (default: 1.5).
OTEL_EXPORTER_OTLP_RETRY_JITTER: Jitter factor for retry delay (default: 0.2).
"""

import random
import threading
Expand Down Expand Up @@ -251,17 +264,24 @@ def _get_credentials(
if certificate_file:
client_key_file = environ.get(client_key_file_env_key)
client_certificate_file = environ.get(client_certificate_file_env_key)
return _load_credentials(
credentials = _load_credentials(
certificate_file, client_key_file, client_certificate_file
)
if credentials is not None:
return credentials
return ssl_channel_credentials()


# pylint: disable=no-member
class OTLPExporterMixin(
ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT, ExportStubT]
):
"""OTLP span exporter
"""OTLP gRPC exporter mixin.

This class provides the base functionality for OTLP exporters that send
telemetry data (spans or metrics) to an OpenTelemetry Collector via gRPC.
Comment thread
dheeraj-vanamala marked this conversation as resolved.
Outdated
It includes a configurable reconnection mechanism to handle transient
collector outages.

Args:
endpoint: OpenTelemetry Collector receiver endpoint
Expand Down Expand Up @@ -308,6 +328,8 @@ def __init__(
if parsed_url.netloc:
self._endpoint = parsed_url.netloc

self._insecure = insecure
self._credentials = credentials
self._headers = headers or environ.get(OTEL_EXPORTER_OTLP_HEADERS)
if isinstance(self._headers, str):
temp_headers = parse_env_headers(self._headers, liberal=True)
Expand Down Expand Up @@ -341,16 +363,49 @@ def __init__(
if compression is None
else compression
) or Compression.NoCompression
self._compression = compression

# Initialize the channel and stub using the proper method
self._initialize_channel_and_stub()
Comment thread
dheeraj-vanamala marked this conversation as resolved.

if insecure:
def _initialize_channel_and_stub(self):
"""
Create a new gRPC channel and stub.

This method is used during initialization and by the reconnection
mechanism to reinitialize the channel on transient errors.
"""
# Add channel options for better reconnection behavior
# Only add these if we're dealing with reconnection scenarios
channel_options = []
if hasattr(self, "_channel_reconnection_enabled"):
Comment thread
dheeraj-vanamala marked this conversation as resolved.
Outdated
channel_options = [
("grpc.keepalive_time_ms", 30000),
("grpc.keepalive_timeout_ms", 15000),
("grpc.keepalive_permit_without_calls", 1),
("grpc.initial_reconnect_backoff_ms", 5000),
("grpc.min_reconnect_backoff_ms", 5000),
("grpc.max_reconnect_backoff_ms", 30000),
]

# Merge reconnection options with existing channel options
Comment thread
dheeraj-vanamala marked this conversation as resolved.
Outdated
current_options = list(self._channel_options)
Comment thread
dheeraj-vanamala marked this conversation as resolved.
Outdated
# Filter out options that we are about to override
reconnection_keys = {opt[0] for opt in channel_options}
Comment thread
dheeraj-vanamala marked this conversation as resolved.
Outdated
current_options = [
opt for opt in current_options if opt[0] not in reconnection_keys
]
final_options = tuple(current_options + channel_options)

if self._insecure:
self._channel = insecure_channel(
self._endpoint,
compression=compression,
options=self._channel_options,
compression=self._compression,
options=final_options,
)
else:
self._credentials = _get_credentials(
credentials,
self._credentials,
Comment thread
dheeraj-vanamala marked this conversation as resolved.
Outdated
_OTEL_PYTHON_EXPORTER_OTLP_GRPC_CREDENTIAL_PROVIDER,
OTEL_EXPORTER_OTLP_CERTIFICATE,
OTEL_EXPORTER_OTLP_CLIENT_KEY,
Expand All @@ -359,13 +414,14 @@ def __init__(
self._channel = secure_channel(
self._endpoint,
self._credentials,
compression=compression,
options=self._channel_options,
compression=self._compression,
options=final_options,
)
self._client = self._stub(self._channel) # type: ignore [reportCallIssue]

self._shutdown_in_progress = threading.Event()
self._shutdown = False
if not hasattr(self, "_shutdown_in_progress"):
self._shutdown_in_progress = threading.Event()
self._shutdown = False
Comment thread
dheeraj-vanamala marked this conversation as resolved.
Outdated

@abstractmethod
def _translate_data(
Expand Down Expand Up @@ -407,6 +463,26 @@ def _export(
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)

# For UNAVAILABLE errors, reinitialize the channel to force reconnection
if error.code() == StatusCode.UNAVAILABLE: # type: ignore
Comment thread
dheeraj-vanamala marked this conversation as resolved.
Outdated
logger.debug(
"Reinitializing gRPC channel for %s exporter due to UNAVAILABLE error",
self._exporting,
)
try:
self._channel.close()
except Exception as e:
logger.debug(
"Error closing channel for %s exporter to %s: %s",
self._exporting,
self._endpoint,
str(e),
)
# Enable channel reconnection for subsequent calls
self._channel_reconnection_enabled = True
self._initialize_channel_and_stub()

if (
error.code() not in _RETRYABLE_ERROR_CODES # type: ignore [reportAttributeAccessIssue]
or retry_num + 1 == _MAX_RETRYS
Expand Down Expand Up @@ -436,6 +512,12 @@ def _export(
return self._result.FAILURE # type: ignore [reportReturnType]

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
"""
Shut down the exporter.

Args:
timeout_millis: Timeout in milliseconds for shutting down the exporter.
"""
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring call")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from unittest import TestCase
from unittest.mock import Mock, patch

import grpc
from google.protobuf.duration_pb2 import ( # pylint: disable=no-name-in-module
Duration,
)
Expand Down Expand Up @@ -89,8 +90,8 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
def _exporting(self):
return "traces"

def shutdown(self, timeout_millis=30_000):
return OTLPExporterMixin.shutdown(self, timeout_millis)
def shutdown(self, timeout_millis: float = 30_000, **kwargs):
return OTLPExporterMixin.shutdown(self, timeout_millis, **kwargs)


class TraceServiceServicerWithExportParams(TraceServiceServicer):
Expand Down Expand Up @@ -511,6 +512,16 @@ def test_timeout_set_correctly(self):
self.assertEqual(mock_trace_service.num_requests, 2)
self.assertAlmostEqual(after - before, 1.4, 1)

def test_channel_options_set_correctly(self):
    """Check that constructing an insecure exporter creates its channel with options.

    ``insecure_channel`` is patched in the exporter module so no real
    connection is made; the test then inspects how the factory was called.
    """
    with patch(
        "opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel"
    ) as mock_channel:
        OTLPSpanExporterForTesting(insecure=True)

    # Construction must build exactly one channel.
    mock_channel.assert_called_once()

    # The exporter passes compression and channel options as keyword
    # arguments; the options are handed to gRPC as a tuple.
    _, kwargs = mock_channel.call_args
    self.assertIn("compression", kwargs)
    self.assertIn("options", kwargs)
    self.assertIsInstance(kwargs["options"], tuple)

def test_otlp_headers_from_env(self):
# pylint: disable=protected-access
# This ensures that there is no other header than standard user-agent.
Expand All @@ -534,3 +545,30 @@ def test_permanent_failure(self):
warning.records[-1].message,
"Failed to export traces to localhost:4317, error code: StatusCode.ALREADY_EXISTS",
)

def test_unavailable_reconnects(self):
    """Check that an UNAVAILABLE response makes the exporter rebuild its channel.

    The server is configured to answer every export with UNAVAILABLE, so
    the export is expected to end in FAILURE after the channel has been
    re-created at least once.
    """
    add_TraceServiceServicer_to_server(
        TraceServiceServicerWithExportParams(StatusCode.UNAVAILABLE),
        self.server,
    )

    # Spy on insecure_channel while still delegating to the real factory,
    # and patch time.sleep so any sleep-based backoff does not slow the test.
    with patch(
        "opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel",
        side_effect=grpc.insecure_channel,
    ) as mock_insecure_channel, patch("time.sleep"):
        result = self.exporter.export([self.span])

    # The server never recovers, so the export must report failure.
    self.assertEqual(result, SpanExportResult.FAILURE)

    # NOTE(review): self.exporter is presumably built in setUp, before the
    # patch is active, so any recorded call comes from the reconnection
    # path rather than from construction — confirm against setUp.
    self.assertTrue(mock_insecure_channel.called)

    # The reconnection path marks the exporter so later channels are
    # created with the reconnection options.
    self.assertTrue(
        getattr(self.exporter, "_channel_reconnection_enabled", False)
    )