Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- `opentelemetry-sdk`: Add fork-safety to metrics `SynchronousMeasurementConsumer` by registering a post-fork child hook, lazily reinitializing metric reader storages on first use in the child process, and clearing asynchronous instruments to avoid duplicated state after `fork()`
([#4767](https://github.com/open-telemetry/opentelemetry-python/pull/4767))
- Fix incorrect code example in `create_tracer()` docstring
([#5072](https://github.com/open-telemetry/opentelemetry-python/issues/5072))
- `opentelemetry-sdk`: add `load_entry_point` shared utility to declarative file configuration for loading plugins via entry points; refactor propagator loading to use it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

# pylint: disable=unused-import

import os
from abc import ABC, abstractmethod
from threading import Lock
from time import time_ns
Expand Down Expand Up @@ -76,8 +77,23 @@ def __init__(
self._async_instruments: List[
opentelemetry.sdk.metrics._internal.instrument._Asynchronous
] = []
self._needs_storage_reinit = False
if hasattr(os, "register_at_fork"):
os.register_at_fork(after_in_child=self._at_fork_reinit)

def _at_fork_reinit(self):
"""Reinitialize lock in child process after fork"""
self._lock = Lock()
# Lazy reinitialization of storages on first use post fork. This is
# done to avoid the overhead of reinitializing the storages on
# every fork.
self._needs_storage_reinit = True
self._async_instruments.clear()

def consume_measurement(self, measurement: Measurement) -> None:
if self._needs_storage_reinit:
self._reinit_storages()

should_sample_exemplar = (
self._sdk_config.exemplar_filter.should_sample(
measurement.value,
Expand Down Expand Up @@ -105,6 +121,9 @@ def collect(
metric_reader: "opentelemetry.sdk.metrics.export.MetricReader",
timeout_millis: float = 10_000,
) -> Optional[MetricsData]:
if self._needs_storage_reinit:
self._reinit_storages()

with self._lock:
metric_reader_storage = self._reader_storages[metric_reader]
# for now, just use the defaults
Expand Down Expand Up @@ -143,3 +162,14 @@ def collect(
result = self._reader_storages[metric_reader].collect()

return result

def _reinit_storages(self):
# Reinitialize the storages after a fork to avoid duplicate data points.
# The flag is cleared inside the lock so concurrent callers only run
# reinit once.
with self._lock:
if not self._needs_storage_reinit:
return
for storage in self._reader_storages.values():
storage._at_fork_reinit()
self._needs_storage_reinit = False
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ def __init__(
self._instrument_class_temporality = instrument_class_temporality
self._instrument_class_aggregation = instrument_class_aggregation

def _at_fork_reinit(self) -> None:
"""Reinitialize locks and clear state in the child process after fork."""
self._lock = RLock()
self._instrument_view_instrument_matches.clear()

def _get_or_init_view_instrument_match(
self, instrument: _Instrument
) -> List[_ViewInstrumentMatch]:
Expand Down
Loading
Loading