From 793123b14e3c2292caf059d526a02dc854edaae7 Mon Sep 17 00:00:00 2001 From: lea konvalinka Date: Fri, 12 Jun 2026 14:59:01 +0200 Subject: [PATCH 1/4] fix: thread-safety issues Signed-off-by: Lea Konvalinka --- openfeature/_event_support.py | 17 +- openfeature/api.py | 1 - openfeature/client.py | 12 +- openfeature/hook/__init__.py | 15 +- openfeature/provider/__init__.py | 5 +- openfeature/provider/_registry.py | 3 +- openfeature/transaction_context/__init__.py | 17 +- tests/provider/test_registry_concurrency.py | 261 ++++++++++++++++++++ tests/test_client.py | 58 ++++- tests/test_locks.py | 126 ++++++++++ 10 files changed, 490 insertions(+), 25 deletions(-) create mode 100644 tests/provider/test_registry_concurrency.py create mode 100644 tests/test_locks.py diff --git a/openfeature/_event_support.py b/openfeature/_event_support.py index 3928be3e..afa6855e 100644 --- a/openfeature/_event_support.py +++ b/openfeature/_event_support.py @@ -61,6 +61,16 @@ def add_client_handler( handlers = _client_handlers[client][event] handlers.append(handler) + # Immediate handler fires outside the lock intentionally: the status check calls + # client.get_provider_status() which acquires the registry lock, and the registry + # holds its lock when calling run_handlers_for_provider → _client_lock, so checking + # under _client_lock would deadlock. As a consequence, a narrow double-fire is + # possible if run_handlers_for_provider fires between the append and this check. + # + # double-fire case: some thread calls add_handler(handler1), which adds the handler and runs it immediately, and + # after adding it, the lock is released and a second thread that was waiting on the lock aquires it to run + # run_handlers_for_providers which then calls every handler for this client its handler list (including the one + # that was just added) _run_immediate_handler(client, event, handler) @@ -78,6 +88,7 @@ def add_global_handler(event: ProviderEvent, handler: EventHandler) -> None: from openfeature.api import get_client # noqa: PLC0415 + # See comment in add_client_handler for why this runs outside the lock. _run_immediate_handler(get_client(), event, handler) @@ -134,6 +145,6 @@ def _run_handler(handler: EventHandler, details: EventDetails) -> None: def clear() -> None: with _global_lock: - _global_handlers.clear() - with _client_lock: - _client_handlers.clear() + with _client_lock: + _global_handlers.clear() + _client_handlers.clear() diff --git a/openfeature/api.py b/openfeature/api.py index 4585e50e..80fa2434 100644 --- a/openfeature/api.py +++ b/openfeature/api.py @@ -62,7 +62,6 @@ def set_provider_and_wait(provider: FeatureProvider, domain: str | None = None) def clear_providers() -> None: provider_registry.clear_providers() - _event_support.clear() def get_provider_metadata(domain: str | None = None) -> Metadata: diff --git a/openfeature/client.py b/openfeature/client.py index 95dc5b6d..17ba8adb 100644 --- a/openfeature/client.py +++ b/openfeature/client.py @@ -1,4 +1,5 @@ import logging +import threading import typing from collections.abc import Awaitable, Mapping, Sequence from dataclasses import dataclass @@ -86,6 +87,7 @@ def __init__( self.version = version self.context = context or EvaluationContext() self.hooks = hooks or [] + self._hooks_lock = threading.Lock() @property def provider(self) -> FeatureProvider: @@ -98,7 +100,8 @@ def get_metadata(self) -> ClientMetadata: return ClientMetadata(domain=self.domain) def add_hooks(self, hooks: list[Hook]) -> None: - self.hooks = self.hooks + hooks + with self._hooks_lock: + self.hooks = self.hooks + hooks def get_boolean_value( self, @@ -468,8 +471,9 @@ def _establish_hooks_and_provider( def _assert_provider_status( self, + provider: FeatureProvider, ) -> OpenFeatureError | None: - status = self.get_provider_status() + status = provider_registry.get_provider_status(provider) if status == ProviderStatus.NOT_READY: return ProviderNotReadyError() if status == ProviderStatus.FATAL: @@ -589,7 +593,7 @@ async def evaluate_flag_details_async( ) try: - if provider_err := self._assert_provider_status(): + if provider_err := self._assert_provider_status(provider): error_hooks( flag_type, provider_err, @@ -765,7 +769,7 @@ def evaluate_flag_details( ) try: - if provider_err := self._assert_provider_status(): + if provider_err := self._assert_provider_status(provider): error_hooks( flag_type, provider_err, diff --git a/openfeature/hook/__init__.py b/openfeature/hook/__init__.py index 247d316b..e822f093 100644 --- a/openfeature/hook/__init__.py +++ b/openfeature/hook/__init__.py @@ -1,5 +1,6 @@ from __future__ import annotations +import threading import typing from collections.abc import Mapping, MutableMapping, Sequence from datetime import datetime @@ -24,6 +25,7 @@ ] _hooks: list[Hook] = [] +_hooks_lock = threading.Lock() # https://openfeature.dev/specification/sections/hooks/#requirement-461 @@ -150,15 +152,20 @@ def supports_flag_value_type(self, flag_type: FlagType) -> bool: """ return True +# while the lock guarantees safety, even without them there was never a loss within 50.000 runs (with the default GIL +# switch interval of 5ms). only when the switch interval was significantly shortened to 0.1 microseconds, losses were +# observed without locks every now and then. def add_hooks(hooks: list[Hook]) -> None: - global _hooks - _hooks = _hooks + hooks + with _hooks_lock: + global _hooks + _hooks = _hooks + hooks def clear_hooks() -> None: - global _hooks - _hooks = [] + with _hooks_lock: + global _hooks + _hooks = [] def get_hooks() -> list[Hook]: diff --git a/openfeature/provider/__init__.py b/openfeature/provider/__init__.py index 1b2b5206..e02aab78 100644 --- a/openfeature/provider/__init__.py +++ b/openfeature/provider/__init__.py @@ -261,5 +261,6 @@ def emit_provider_stale(self, details: ProviderEventDetails) -> None: self.emit(ProviderEvent.PROVIDER_STALE, details) def emit(self, event: ProviderEvent, details: ProviderEventDetails) -> None: - if hasattr(self, "_on_emit"): - self._on_emit(self, event, details) + on_emit = getattr(self, "_on_emit", None) + if on_emit is not None: + on_emit(self, event, details) diff --git a/openfeature/provider/_registry.py b/openfeature/provider/_registry.py index e46caadd..41fa4d96 100644 --- a/openfeature/provider/_registry.py +++ b/openfeature/provider/_registry.py @@ -1,6 +1,6 @@ import threading -from openfeature._event_support import run_handlers_for_provider +from openfeature._event_support import run_handlers_for_provider, clear as clear_event_handlers from openfeature.evaluation_context import EvaluationContext, get_evaluation_context from openfeature.event import ( ProviderEvent, @@ -93,6 +93,7 @@ def clear_providers(self) -> None: self._provider_status = { self._default_provider: ProviderStatus.READY, } + clear_event_handlers() def shutdown(self) -> None: with self._lock: diff --git a/openfeature/transaction_context/__init__.py b/openfeature/transaction_context/__init__.py index 15ac7e01..e44f6135 100644 --- a/openfeature/transaction_context/__init__.py +++ b/openfeature/transaction_context/__init__.py @@ -1,3 +1,5 @@ +import threading + from openfeature.evaluation_context import EvaluationContext from openfeature.transaction_context.context_var_transaction_context_propagator import ( ContextVarsTransactionContextPropagator, @@ -21,13 +23,15 @@ _evaluation_transaction_context_propagator: TransactionContextPropagator = ( NoOpTransactionContextPropagator() ) +_propagator_lock = threading.Lock() def set_transaction_context_propagator( transaction_context_propagator: TransactionContextPropagator, ) -> None: global _evaluation_transaction_context_propagator - _evaluation_transaction_context_propagator = transaction_context_propagator + with _propagator_lock: + _evaluation_transaction_context_propagator = transaction_context_propagator def clear_transaction_context_propagator() -> None: @@ -35,11 +39,12 @@ def clear_transaction_context_propagator() -> None: def get_transaction_context() -> EvaluationContext: - return _evaluation_transaction_context_propagator.get_transaction_context() + with _propagator_lock: + propagator = _evaluation_transaction_context_propagator + return propagator.get_transaction_context() def set_transaction_context(evaluation_context: EvaluationContext) -> None: - global _evaluation_transaction_context_propagator - _evaluation_transaction_context_propagator.set_transaction_context( - evaluation_context - ) + with _propagator_lock: + propagator = _evaluation_transaction_context_propagator + propagator.set_transaction_context(evaluation_context) diff --git a/tests/provider/test_registry_concurrency.py b/tests/provider/test_registry_concurrency.py new file mode 100644 index 00000000..1953d695 --- /dev/null +++ b/tests/provider/test_registry_concurrency.py @@ -0,0 +1,261 @@ +""" +Barrier-based concurrency regression tests for ProviderRegistry. + +Each test forces the exact thread interleaving that caused the documented race +condition before the registry lock was introduced. The threading.Barrier ensures +both threads reach the entry point of the critical section simultaneously, maximising +the chance of hitting the bad interleaving on every run. + +These tests would have produced one of the following symptoms without the lock: + - AssertionError (e.g. shutdown called twice) + - KeyError (concurrent del on the same dict key) + - Undetected lost update (initialize called twice, silently) +""" + +import threading +from unittest.mock import Mock, call + +from openfeature.provider._registry import ProviderRegistry + + +def _make_provider(name: str = "") -> Mock: + """Return a Mock that satisfies the FeatureProvider protocol well enough for the registry.""" + p = Mock() + p.get_metadata.return_value = Mock(name=name) + return p + + +# --------------------------------------------------------------------------- +# set_provider races +# --------------------------------------------------------------------------- + + +def test_concurrent_replacement_of_same_domain_shuts_down_old_provider_exactly_once(): + """ + Race: two threads both call set_provider("domain", new_X) while an old provider + is registered. Without the lock both threads read old_provider, delete the key, + and call _shutdown_provider — double-shutdown or KeyError on the second del. + + Forced interleaving: + Thread A ──► barrier.wait() ──► set_provider("domain", new1) ──► + Thread B ──► barrier.wait() ──► set_provider("domain", new2) ──► + + Invariant: old_provider.shutdown called exactly once. + """ + registry = ProviderRegistry() + old_provider = _make_provider("old") + registry.set_provider("domain", old_provider) + + barrier = threading.Barrier(2) + errors: list[Exception] = [] + + def replace(new_provider: Mock) -> None: + try: + barrier.wait() + registry.set_provider("domain", new_provider) + except Exception as e: + errors.append(e) + + new1, new2 = _make_provider("new1"), _make_provider("new2") + threads = [ + threading.Thread(target=replace, args=(new1,)), + threading.Thread(target=replace, args=(new2,)), + ] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors, f"Exception(s) raised in threads: {errors}" + old_provider.shutdown.assert_called_once() + + +def test_concurrent_replacement_of_same_domain_does_not_leave_orphaned_initialized_provider(): + """ + Race: two threads both call set_provider("domain", new_X) while an old provider + is registered. + + Invariant: any newly introduced provider that gets initialized is either: + - the provider currently registered for the domain, or + - subsequently shut down and no longer READY. + """ + from openfeature.provider import ProviderStatus + + registry = ProviderRegistry() + registry.set_provider("domain", _make_provider("old")) + + barrier = threading.Barrier(2) + errors: list[Exception] = [] + + def replace(new_provider: Mock) -> None: + try: + barrier.wait() + registry.set_provider("domain", new_provider) + except Exception as e: + errors.append(e) + + new1, new2 = _make_provider("new1"), _make_provider("new2") + threads = [ + threading.Thread(target=replace, args=(new1,)), + threading.Thread(target=replace, args=(new2,)), + ] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors, f"Exception(s) raised in threads: {errors}" + + registered = registry.get_provider("domain") + + for candidate in (new1, new2): + was_initialized = candidate.initialize.call_count > 0 + is_registered = candidate is registered + was_shutdown = candidate.shutdown.call_count > 0 + status = registry.get_provider_status(candidate) + + if was_initialized and not is_registered: + assert was_shutdown, "Initialized provider became orphaned without shutdown" + assert status == ProviderStatus.NOT_READY + +# without locks in registry.py, this fails like 25 times out of 5000 +# with locks all tests pass +def test_concurrent_registration_of_same_provider_to_different_domains_initializes_exactly_once(): + """ + Race: two threads call set_provider with the *same* provider object but different + domains. Without the lock both threads check `provider not in providers.values()` + simultaneously (True for both, since neither has inserted yet) and both call + _initialize_provider — double-initialize. + + Forced interleaving: + Thread A ──► barrier.wait() ──► set_provider("domain1", provider) ──► + Thread B ──► barrier.wait() ──► set_provider("domain2", provider) ──► + + Invariant: provider.initialize called exactly once. + """ + registry = ProviderRegistry() + provider = _make_provider("shared") + barrier = threading.Barrier(2) + errors: list[Exception] = [] + + def register(domain: str) -> None: + try: + barrier.wait() + registry.set_provider(domain, provider) + except Exception as e: + errors.append(e) + + threads = [ + threading.Thread(target=register, args=("domain1",)), + threading.Thread(target=register, args=("domain2",)), + ] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors, f"Exception(s) raised in threads: {errors}" + provider.initialize.assert_called_once() + + +# --------------------------------------------------------------------------- +# set_default_provider race +# --------------------------------------------------------------------------- + +# without locks in registry.py, this fails like 20 times out of 5000 +# with locks all tests pass +def test_concurrent_set_default_provider_shuts_down_old_default_exactly_once(): + """ + Race: two threads both call set_default_provider concurrently while the same + old default is active. Without the lock both threads read the same old default, + pass the `not in providers.values()` check, and both call _shutdown_provider — + double-shutdown. + + Forced interleaving: + Thread A ──► barrier.wait() ──► set_default_provider(new1) ──► + Thread B ──► barrier.wait() ──► set_default_provider(new2) ──► + + Invariant: old_default.shutdown called exactly once. + """ + registry = ProviderRegistry() + old_default = _make_provider("old_default") + registry.set_default_provider(old_default) + + barrier = threading.Barrier(2) + errors: list[Exception] = [] + + def replace(new_provider: Mock) -> None: + try: + barrier.wait() + registry.set_default_provider(new_provider) + except Exception as e: + errors.append(e) + + new1, new2 = _make_provider("new1"), _make_provider("new2") + threads = [ + threading.Thread(target=replace, args=(new1,)), + threading.Thread(target=replace, args=(new2,)), + ] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors, f"Exception(s) raised in threads: {errors}" + old_default.shutdown.assert_called_once() + + +# --------------------------------------------------------------------------- +# clear_providers race +# --------------------------------------------------------------------------- + + +def test_concurrent_get_provider_during_clear_never_returns_shut_down_provider(): + """ + Race: one thread calls clear_providers() while another repeatedly calls + get_provider(). Without the lock there was a window after shutdown() but + before _providers.clear() where get_provider returned a provider that had + already been shut down. + + Forced interleaving: + Thread A ──► barrier.wait() ──► clear_providers() ──► + Thread B ──► barrier.wait() ──► get_provider() x N ──► + + Invariant: every provider returned by get_provider is either the pre-clear + provider (READY) or the post-clear NoOp (READY). It must never be a provider + whose status is NOT_READY (i.e. shut down but still visible in the registry). + """ + from openfeature.provider import ProviderStatus + from openfeature.provider.no_op_provider import NoOpProvider + + registry = ProviderRegistry() + provider = _make_provider("active") + registry.set_provider("domain", provider) + + barrier = threading.Barrier(2) + bad_observations: list[str] = [] + + def clear() -> None: + barrier.wait() + registry.clear_providers() + + def read() -> None: + barrier.wait() + for _ in range(200): + #p = registry.get_provider("domain") + status = registry.get_provider_status(registry.get_provider("domain")) + if status == ProviderStatus.NOT_READY: + bad_observations.append( + f"get_provider returned shut-down provider " + ) + + threads = [ + threading.Thread(target=clear), + threading.Thread(target=read), + ] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not bad_observations, "\n".join(bad_observations) diff --git a/tests/test_client.py b/tests/test_client.py index 44b49e5f..7be709a4 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -4,11 +4,12 @@ import types import uuid from concurrent.futures import ThreadPoolExecutor -from unittest.mock import MagicMock +from unittest.mock import MagicMock, Mock import pytest from openfeature import _event_support, api +from openfeature import client as client_module from openfeature.api import ( add_hooks, clear_hooks, @@ -20,7 +21,7 @@ from openfeature.client import OpenFeatureClient, _typecheck_flag_value from openfeature.evaluation_context import EvaluationContext from openfeature.event import EventDetails, ProviderEvent, ProviderEventDetails -from openfeature.exception import ErrorCode, OpenFeatureError +from openfeature.exception import ErrorCode, OpenFeatureError, ProviderFatalError from openfeature.flag_evaluation import FlagResolutionDetails, FlagType, Reason from openfeature.hook import Hook from openfeature.provider import FeatureProvider, ProviderStatus @@ -292,8 +293,12 @@ async def test_should_shortcircuit_if_provider_is_not_ready( no_op_provider_client, monkeypatch ): # Given + from openfeature.provider._registry import provider_registry + monkeypatch.setattr( - no_op_provider_client, "get_provider_status", lambda: ProviderStatus.NOT_READY + provider_registry, + "get_provider_status", + lambda provider: ProviderStatus.NOT_READY, ) spy_hook = MagicMock(spec=Hook) no_op_provider_client.add_hooks([spy_hook]) @@ -322,8 +327,12 @@ async def test_should_shortcircuit_if_provider_is_in_irrecoverable_error_state( no_op_provider_client, monkeypatch ): # Given + from openfeature.provider._registry import provider_registry + monkeypatch.setattr( - no_op_provider_client, "get_provider_status", lambda: ProviderStatus.FATAL + provider_registry, + "get_provider_status", + lambda provider: ProviderStatus.FATAL, ) spy_hook = MagicMock(spec=Hook) no_op_provider_client.add_hooks([spy_hook]) @@ -768,3 +777,44 @@ def test_should_noop_if_provider_does_not_support_tracking(monkeypatch): set_provider(provider) client = get_client() client.track(tracking_event_name="test") + + +def test_assert_provider_status_uses_passed_provider_not_current_registry_state(): + """ + Regression: the flag evaluation pipeline captures a provider reference + at the start of _establish_hooks_and_provider and uses that same + reference for the actual resolution call. The status assertion must + use that captured reference too — otherwise a concurrent provider swap + between capture and status-check would cause the check to run against + a *different* provider than the one actually evaluated, e.g. allowing + a FATAL provider's flag to be evaluated as if it were READY. + + This is a direct invariant test: when called with provider X while + self.provider returns Y, _assert_provider_status must report X's status. + """ + fatal_provider = NoOpProvider() + ready_provider = NoOpProvider() + + registry_mock = Mock() + registry_mock.get_provider_status.side_effect = lambda p: ( + ProviderStatus.FATAL if p is fatal_provider else ProviderStatus.READY + ) + registry_mock.get_provider.return_value = ready_provider + + original = client_module.provider_registry + client_module.provider_registry = registry_mock + try: + c = OpenFeatureClient(domain=None, version=None) + assert c.provider is ready_provider, ( + "test setup: self.provider should resolve via the patched registry" + ) + + err = c._assert_provider_status(fatal_provider) + assert isinstance(err, ProviderFatalError), ( + "status check used self.provider (READY) instead of the captured " + "fatal_provider — TOCTOU regression" + ) + registry_mock.get_provider_status.assert_any_call(fatal_provider) + assert c._assert_provider_status(ready_provider) is None + finally: + client_module.provider_registry = original diff --git a/tests/test_locks.py b/tests/test_locks.py new file mode 100644 index 00000000..6a3bd184 --- /dev/null +++ b/tests/test_locks.py @@ -0,0 +1,126 @@ +from __future__ import annotations + +import threading +from unittest.mock import Mock + +from openfeature import _event_support +from openfeature import api as api_module +from openfeature import hook as hook_module +from openfeature.client import OpenFeatureClient + +from openfeature.evaluation_context import EvaluationContext +from openfeature.event import ProviderEvent, ProviderEventDetails +from openfeature.flag_evaluation import FlagResolutionDetails +from openfeature.hook import Hook, add_hooks, clear_hooks, get_hooks +from openfeature.provider import AbstractProvider +from openfeature.provider import _registry +from openfeature.transaction_context import ( + clear_transaction_context_propagator, + get_transaction_context, + set_transaction_context_propagator, +) +from openfeature.transaction_context.no_op_transaction_context_propagator import ( + NoOpTransactionContextPropagator, +) + + +class _StubProvider(AbstractProvider): + """Minimal concrete provider for use in concurrency tests.""" + + def get_metadata(self): + m = Mock() + m.name = "stub" + return m + + def resolve_boolean_details(self, flag_key, default_value, evaluation_context=None): + return FlagResolutionDetails(value=default_value) + + def resolve_string_details(self, flag_key, default_value, evaluation_context=None): + return FlagResolutionDetails(value=default_value) + + def resolve_integer_details(self, flag_key, default_value, evaluation_context=None): + return FlagResolutionDetails(value=default_value) + + def resolve_float_details(self, flag_key, default_value, evaluation_context=None): + return FlagResolutionDetails(value=default_value) + + def resolve_object_details(self, flag_key, default_value, evaluation_context=None): + return FlagResolutionDetails(value=default_value) + + + +def test_clear_providers_does_not_fire_handler_against_removed_provider(): + """ + Forced interleaving: + Clearer ──► clear_providers() ──► [registry emptied] ──► barrier ──► clear handlers + Dispatcher ──► barrier ──► dispatch_event(provider, ...) + + Invariant: the handler is never invoked for a provider that is no longer + present in the registry. + """ + registry = _registry.provider_registry + + api_module.clear_providers() + provider = _StubProvider() + api_module.set_provider(provider) + + fired_against_removed_provider: list[bool] = [] + + def handler(details) -> None: + registered = { + registry.get_default_provider(), + *registry._providers.values(), + } + if provider not in registered: + fired_against_removed_provider.append(True) + + api_module.add_handler(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, handler) + + window = threading.Barrier(2) + original_clear = _registry.clear_event_handlers + + def clear_event_handlers_in_window() -> None: + # Reached only after the registry has already been emptied; rendezvous + # here so the dispatcher enters precisely the providers-gone / + # handlers-still-present window the old code exposed. + window.wait(timeout=5.0) + original_clear() + + errors: list[Exception] = [] + + def clearer() -> None: + try: + api_module.clear_providers() + except Exception as e: # pragma: no cover + errors.append(e) + + def dispatcher() -> None: + try: + window.wait(timeout=5.0) + registry.dispatch_event( + provider, + ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, + ProviderEventDetails(), + ) + except Exception as e: # pragma: no cover + errors.append(e) + + _registry.clear_event_handlers = clear_event_handlers_in_window + threads = [threading.Thread(target=clearer), threading.Thread(target=dispatcher)] + try: + for t in threads: + t.start() + for t in threads: + t.join(timeout=10.0) + assert not [t for t in threads if t.is_alive()], ( + "threads did not complete — possible deadlock" + ) + assert not errors, f"Exception(s) raised in threads: {errors}" + assert not fired_against_removed_provider, ( + "event handler fired for a provider already removed from the " + "registry: clear_providers() exposed the gap between clearing " + "providers and clearing event handlers" + ) + finally: + _registry.clear_event_handlers = original_clear + api_module.clear_providers() \ No newline at end of file From d1e88b79d237c4341b979f5a8af7e77da76c9176 Mon Sep 17 00:00:00 2001 From: lea konvalinka Date: Thu, 18 Jun 2026 15:10:36 +0200 Subject: [PATCH 2/4] minor corrections Signed-off-by: Lea Konvalinka --- openfeature/_event_support.py | 17 +++++++---------- openfeature/client.py | 2 ++ openfeature/hook/__init__.py | 4 ++-- openfeature/provider/_registry.py | 14 +++++++++----- 4 files changed, 20 insertions(+), 17 deletions(-) diff --git a/openfeature/_event_support.py b/openfeature/_event_support.py index afa6855e..41fc28f0 100644 --- a/openfeature/_event_support.py +++ b/openfeature/_event_support.py @@ -61,16 +61,13 @@ def add_client_handler( handlers = _client_handlers[client][event] handlers.append(handler) - # Immediate handler fires outside the lock intentionally: the status check calls - # client.get_provider_status() which acquires the registry lock, and the registry - # holds its lock when calling run_handlers_for_provider → _client_lock, so checking - # under _client_lock would deadlock. As a consequence, a narrow double-fire is - # possible if run_handlers_for_provider fires between the append and this check. - # - # double-fire case: some thread calls add_handler(handler1), which adds the handler and runs it immediately, and - # after adding it, the lock is released and a second thread that was waiting on the lock aquires it to run - # run_handlers_for_providers which then calls every handler for this client its handler list (including the one - # that was just added) + # outside the lock intentionally: the immediate-fire status check acquires the registry lock, so calling it + # under _client_lock risks lock-order inversion against run_handlers_for_provider (registry lock → _client_lock). + # As a consequence, a narrow double-fire is possible: if dispatch_event(client's event) runs concurrently, it + # sets the matching provider status (enabling the immediate fire below) and then re-runs every handler for this + # client. If _run_immediate_handler lands after that status set but before dispatch snapshots the handler list, + # the handler fires twice — once here, once from dispatch. Only happens when the registered event matches the event + # being dispatched; otherwise the immediate fire is a no-op. _run_immediate_handler(client, event, handler) diff --git a/openfeature/client.py b/openfeature/client.py index 17ba8adb..9bf7f513 100644 --- a/openfeature/client.py +++ b/openfeature/client.py @@ -100,6 +100,8 @@ def get_metadata(self) -> ClientMetadata: return ClientMetadata(domain=self.domain) def add_hooks(self, hooks: list[Hook]) -> None: + # Guards the read-concat-store against a lost update; this practically never races under the default 5ms GIL + # switch interval, but is essential under a no-GIL build. with self._hooks_lock: self.hooks = self.hooks + hooks diff --git a/openfeature/hook/__init__.py b/openfeature/hook/__init__.py index e822f093..b0f142cc 100644 --- a/openfeature/hook/__init__.py +++ b/openfeature/hook/__init__.py @@ -152,9 +152,9 @@ def supports_flag_value_type(self, flag_type: FlagType) -> bool: """ return True -# while the lock guarantees safety, even without them there was never a loss within 50.000 runs (with the default GIL +# while the lock guarantees safety, even without it there was never a loss within 50.000 runs (with the default GIL # switch interval of 5ms). only when the switch interval was significantly shortened to 0.1 microseconds, losses were -# observed without locks every now and then. +# observed without locks every now and then. with a no-GIL python, the lock would be essential def add_hooks(hooks: list[Hook]) -> None: with _hooks_lock: diff --git a/openfeature/provider/_registry.py b/openfeature/provider/_registry.py index 41fa4d96..8e0a1874 100644 --- a/openfeature/provider/_registry.py +++ b/openfeature/provider/_registry.py @@ -54,9 +54,10 @@ def set_provider( self._shutdown_if_unused(old_provider) def get_provider(self, domain: str | None) -> FeatureProvider: - if domain is None: - return self._default_provider - return self._providers.get(domain, self._default_provider) + with self._lock: + if domain is None: + return self._default_provider + return self._providers.get(domain, self._default_provider) def set_default_provider( self, provider: FeatureProvider, wait_for_init: bool = False @@ -83,7 +84,8 @@ def set_default_provider( self._shutdown_if_unused(old_provider) def get_default_provider(self) -> FeatureProvider: - return self._default_provider + with self._lock: + return self._default_provider def clear_providers(self) -> None: self.shutdown() @@ -99,6 +101,7 @@ def shutdown(self) -> None: with self._lock: providers = {self._default_provider, *self._providers.values()} + # do we want to move this inside the lock? it allows a narrow double-shutdown window for provider in providers: self._shutdown_provider(provider) @@ -215,7 +218,8 @@ def _shutdown_provider( provider.detach() def get_provider_status(self, provider: FeatureProvider) -> ProviderStatus: - return self._provider_status.get(provider, ProviderStatus.NOT_READY) + with self._lock: + return self._provider_status.get(provider, ProviderStatus.NOT_READY) def dispatch_event( self, From 67b5fc498b395c3c58bfa59d28552539f2e5547c Mon Sep 17 00:00:00 2001 From: lea konvalinka Date: Fri, 19 Jun 2026 09:18:39 +0200 Subject: [PATCH 3/4] add explanation comment Signed-off-by: Lea Konvalinka --- openfeature/provider/_registry.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/openfeature/provider/_registry.py b/openfeature/provider/_registry.py index 8e0a1874..63fa6ce3 100644 --- a/openfeature/provider/_registry.py +++ b/openfeature/provider/_registry.py @@ -54,6 +54,12 @@ def set_provider( self._shutdown_if_unused(old_provider) def get_provider(self, domain: str | None) -> FeatureProvider: + # defensive lock under the GIL as the op is basically atomic + # but we might want to keep it so a provider that's about + # to be shut down isn't returned + # however it contributes to a potential deadlock that is currently + # still in place (clear_providers: registry's lock -> _event_support's lock; + # run_handlers_for_provider: _event_support's lock -> registry's lock) with self._lock: if domain is None: return self._default_provider @@ -218,6 +224,10 @@ def _shutdown_provider( provider.detach() def get_provider_status(self, provider: FeatureProvider) -> ProviderStatus: + # defensive lock under the GIL as the op is basically atomic + # but we might want to keep it so a provider that's about + # to be shut down isn't returned + # however, removing it would enable moving _run_immediate_handler into the lock i think with self._lock: return self._provider_status.get(provider, ProviderStatus.NOT_READY) From 5d11dbcad8735f46876480ba32e0fd41461a7ba7 Mon Sep 17 00:00:00 2001 From: lea konvalinka Date: Fri, 19 Jun 2026 12:46:44 +0200 Subject: [PATCH 4/4] remove leftover files Signed-off-by: Lea Konvalinka --- tests/provider/test_registry_concurrency.py | 261 -------------------- tests/test_client.py | 18 -- tests/test_locks.py | 126 ---------- 3 files changed, 405 deletions(-) delete mode 100644 tests/provider/test_registry_concurrency.py delete mode 100644 tests/test_locks.py diff --git a/tests/provider/test_registry_concurrency.py b/tests/provider/test_registry_concurrency.py deleted file mode 100644 index 1953d695..00000000 --- a/tests/provider/test_registry_concurrency.py +++ /dev/null @@ -1,261 +0,0 @@ -""" -Barrier-based concurrency regression tests for ProviderRegistry. - -Each test forces the exact thread interleaving that caused the documented race -condition before the registry lock was introduced. The threading.Barrier ensures -both threads reach the entry point of the critical section simultaneously, maximising -the chance of hitting the bad interleaving on every run. - -These tests would have produced one of the following symptoms without the lock: - - AssertionError (e.g. shutdown called twice) - - KeyError (concurrent del on the same dict key) - - Undetected lost update (initialize called twice, silently) -""" - -import threading -from unittest.mock import Mock, call - -from openfeature.provider._registry import ProviderRegistry - - -def _make_provider(name: str = "") -> Mock: - """Return a Mock that satisfies the FeatureProvider protocol well enough for the registry.""" - p = Mock() - p.get_metadata.return_value = Mock(name=name) - return p - - -# --------------------------------------------------------------------------- -# set_provider races -# --------------------------------------------------------------------------- - - -def test_concurrent_replacement_of_same_domain_shuts_down_old_provider_exactly_once(): - """ - Race: two threads both call set_provider("domain", new_X) while an old provider - is registered. Without the lock both threads read old_provider, delete the key, - and call _shutdown_provider — double-shutdown or KeyError on the second del. - - Forced interleaving: - Thread A ──► barrier.wait() ──► set_provider("domain", new1) ──► - Thread B ──► barrier.wait() ──► set_provider("domain", new2) ──► - - Invariant: old_provider.shutdown called exactly once. - """ - registry = ProviderRegistry() - old_provider = _make_provider("old") - registry.set_provider("domain", old_provider) - - barrier = threading.Barrier(2) - errors: list[Exception] = [] - - def replace(new_provider: Mock) -> None: - try: - barrier.wait() - registry.set_provider("domain", new_provider) - except Exception as e: - errors.append(e) - - new1, new2 = _make_provider("new1"), _make_provider("new2") - threads = [ - threading.Thread(target=replace, args=(new1,)), - threading.Thread(target=replace, args=(new2,)), - ] - for t in threads: - t.start() - for t in threads: - t.join() - - assert not errors, f"Exception(s) raised in threads: {errors}" - old_provider.shutdown.assert_called_once() - - -def test_concurrent_replacement_of_same_domain_does_not_leave_orphaned_initialized_provider(): - """ - Race: two threads both call set_provider("domain", new_X) while an old provider - is registered. - - Invariant: any newly introduced provider that gets initialized is either: - - the provider currently registered for the domain, or - - subsequently shut down and no longer READY. - """ - from openfeature.provider import ProviderStatus - - registry = ProviderRegistry() - registry.set_provider("domain", _make_provider("old")) - - barrier = threading.Barrier(2) - errors: list[Exception] = [] - - def replace(new_provider: Mock) -> None: - try: - barrier.wait() - registry.set_provider("domain", new_provider) - except Exception as e: - errors.append(e) - - new1, new2 = _make_provider("new1"), _make_provider("new2") - threads = [ - threading.Thread(target=replace, args=(new1,)), - threading.Thread(target=replace, args=(new2,)), - ] - for t in threads: - t.start() - for t in threads: - t.join() - - assert not errors, f"Exception(s) raised in threads: {errors}" - - registered = registry.get_provider("domain") - - for candidate in (new1, new2): - was_initialized = candidate.initialize.call_count > 0 - is_registered = candidate is registered - was_shutdown = candidate.shutdown.call_count > 0 - status = registry.get_provider_status(candidate) - - if was_initialized and not is_registered: - assert was_shutdown, "Initialized provider became orphaned without shutdown" - assert status == ProviderStatus.NOT_READY - -# without locks in registry.py, this fails like 25 times out of 5000 -# with locks all tests pass -def test_concurrent_registration_of_same_provider_to_different_domains_initializes_exactly_once(): - """ - Race: two threads call set_provider with the *same* provider object but different - domains. Without the lock both threads check `provider not in providers.values()` - simultaneously (True for both, since neither has inserted yet) and both call - _initialize_provider — double-initialize. - - Forced interleaving: - Thread A ──► barrier.wait() ──► set_provider("domain1", provider) ──► - Thread B ──► barrier.wait() ──► set_provider("domain2", provider) ──► - - Invariant: provider.initialize called exactly once. - """ - registry = ProviderRegistry() - provider = _make_provider("shared") - barrier = threading.Barrier(2) - errors: list[Exception] = [] - - def register(domain: str) -> None: - try: - barrier.wait() - registry.set_provider(domain, provider) - except Exception as e: - errors.append(e) - - threads = [ - threading.Thread(target=register, args=("domain1",)), - threading.Thread(target=register, args=("domain2",)), - ] - for t in threads: - t.start() - for t in threads: - t.join() - - assert not errors, f"Exception(s) raised in threads: {errors}" - provider.initialize.assert_called_once() - - -# --------------------------------------------------------------------------- -# set_default_provider race -# --------------------------------------------------------------------------- - -# without locks in registry.py, this fails like 20 times out of 5000 -# with locks all tests pass -def test_concurrent_set_default_provider_shuts_down_old_default_exactly_once(): - """ - Race: two threads both call set_default_provider concurrently while the same - old default is active. Without the lock both threads read the same old default, - pass the `not in providers.values()` check, and both call _shutdown_provider — - double-shutdown. - - Forced interleaving: - Thread A ──► barrier.wait() ──► set_default_provider(new1) ──► - Thread B ──► barrier.wait() ──► set_default_provider(new2) ──► - - Invariant: old_default.shutdown called exactly once. - """ - registry = ProviderRegistry() - old_default = _make_provider("old_default") - registry.set_default_provider(old_default) - - barrier = threading.Barrier(2) - errors: list[Exception] = [] - - def replace(new_provider: Mock) -> None: - try: - barrier.wait() - registry.set_default_provider(new_provider) - except Exception as e: - errors.append(e) - - new1, new2 = _make_provider("new1"), _make_provider("new2") - threads = [ - threading.Thread(target=replace, args=(new1,)), - threading.Thread(target=replace, args=(new2,)), - ] - for t in threads: - t.start() - for t in threads: - t.join() - - assert not errors, f"Exception(s) raised in threads: {errors}" - old_default.shutdown.assert_called_once() - - -# --------------------------------------------------------------------------- -# clear_providers race -# --------------------------------------------------------------------------- - - -def test_concurrent_get_provider_during_clear_never_returns_shut_down_provider(): - """ - Race: one thread calls clear_providers() while another repeatedly calls - get_provider(). Without the lock there was a window after shutdown() but - before _providers.clear() where get_provider returned a provider that had - already been shut down. - - Forced interleaving: - Thread A ──► barrier.wait() ──► clear_providers() ──► - Thread B ──► barrier.wait() ──► get_provider() x N ──► - - Invariant: every provider returned by get_provider is either the pre-clear - provider (READY) or the post-clear NoOp (READY). It must never be a provider - whose status is NOT_READY (i.e. shut down but still visible in the registry). - """ - from openfeature.provider import ProviderStatus - from openfeature.provider.no_op_provider import NoOpProvider - - registry = ProviderRegistry() - provider = _make_provider("active") - registry.set_provider("domain", provider) - - barrier = threading.Barrier(2) - bad_observations: list[str] = [] - - def clear() -> None: - barrier.wait() - registry.clear_providers() - - def read() -> None: - barrier.wait() - for _ in range(200): - #p = registry.get_provider("domain") - status = registry.get_provider_status(registry.get_provider("domain")) - if status == ProviderStatus.NOT_READY: - bad_observations.append( - f"get_provider returned shut-down provider " - ) - - threads = [ - threading.Thread(target=clear), - threading.Thread(target=read), - ] - for t in threads: - t.start() - for t in threads: - t.join() - - assert not bad_observations, "\n".join(bad_observations) diff --git a/tests/test_client.py b/tests/test_client.py index 7be709a4..64598936 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -292,9 +292,6 @@ def test_provider_should_return_error_status_if_failed(): async def test_should_shortcircuit_if_provider_is_not_ready( no_op_provider_client, monkeypatch ): - # Given - from openfeature.provider._registry import provider_registry - monkeypatch.setattr( provider_registry, "get_provider_status", @@ -326,9 +323,6 @@ async def test_should_shortcircuit_if_provider_is_not_ready( async def test_should_shortcircuit_if_provider_is_in_irrecoverable_error_state( no_op_provider_client, monkeypatch ): - # Given - from openfeature.provider._registry import provider_registry - monkeypatch.setattr( provider_registry, "get_provider_status", @@ -780,18 +774,6 @@ def test_should_noop_if_provider_does_not_support_tracking(monkeypatch): def test_assert_provider_status_uses_passed_provider_not_current_registry_state(): - """ - Regression: the flag evaluation pipeline captures a provider reference - at the start of _establish_hooks_and_provider and uses that same - reference for the actual resolution call. The status assertion must - use that captured reference too — otherwise a concurrent provider swap - between capture and status-check would cause the check to run against - a *different* provider than the one actually evaluated, e.g. allowing - a FATAL provider's flag to be evaluated as if it were READY. - - This is a direct invariant test: when called with provider X while - self.provider returns Y, _assert_provider_status must report X's status. - """ fatal_provider = NoOpProvider() ready_provider = NoOpProvider() diff --git a/tests/test_locks.py b/tests/test_locks.py deleted file mode 100644 index 6a3bd184..00000000 --- a/tests/test_locks.py +++ /dev/null @@ -1,126 +0,0 @@ -from __future__ import annotations - -import threading -from unittest.mock import Mock - -from openfeature import _event_support -from openfeature import api as api_module -from openfeature import hook as hook_module -from openfeature.client import OpenFeatureClient - -from openfeature.evaluation_context import EvaluationContext -from openfeature.event import ProviderEvent, ProviderEventDetails -from openfeature.flag_evaluation import FlagResolutionDetails -from openfeature.hook import Hook, add_hooks, clear_hooks, get_hooks -from openfeature.provider import AbstractProvider -from openfeature.provider import _registry -from openfeature.transaction_context import ( - clear_transaction_context_propagator, - get_transaction_context, - set_transaction_context_propagator, -) -from openfeature.transaction_context.no_op_transaction_context_propagator import ( - NoOpTransactionContextPropagator, -) - - -class _StubProvider(AbstractProvider): - """Minimal concrete provider for use in concurrency tests.""" - - def get_metadata(self): - m = Mock() - m.name = "stub" - return m - - def resolve_boolean_details(self, flag_key, default_value, evaluation_context=None): - return FlagResolutionDetails(value=default_value) - - def resolve_string_details(self, flag_key, default_value, evaluation_context=None): - return FlagResolutionDetails(value=default_value) - - def resolve_integer_details(self, flag_key, default_value, evaluation_context=None): - return FlagResolutionDetails(value=default_value) - - def resolve_float_details(self, flag_key, default_value, evaluation_context=None): - return FlagResolutionDetails(value=default_value) - - def resolve_object_details(self, flag_key, default_value, evaluation_context=None): - return FlagResolutionDetails(value=default_value) - - - -def test_clear_providers_does_not_fire_handler_against_removed_provider(): - """ - Forced interleaving: - Clearer ──► clear_providers() ──► [registry emptied] ──► barrier ──► clear handlers - Dispatcher ──► barrier ──► dispatch_event(provider, ...) - - Invariant: the handler is never invoked for a provider that is no longer - present in the registry. - """ - registry = _registry.provider_registry - - api_module.clear_providers() - provider = _StubProvider() - api_module.set_provider(provider) - - fired_against_removed_provider: list[bool] = [] - - def handler(details) -> None: - registered = { - registry.get_default_provider(), - *registry._providers.values(), - } - if provider not in registered: - fired_against_removed_provider.append(True) - - api_module.add_handler(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, handler) - - window = threading.Barrier(2) - original_clear = _registry.clear_event_handlers - - def clear_event_handlers_in_window() -> None: - # Reached only after the registry has already been emptied; rendezvous - # here so the dispatcher enters precisely the providers-gone / - # handlers-still-present window the old code exposed. - window.wait(timeout=5.0) - original_clear() - - errors: list[Exception] = [] - - def clearer() -> None: - try: - api_module.clear_providers() - except Exception as e: # pragma: no cover - errors.append(e) - - def dispatcher() -> None: - try: - window.wait(timeout=5.0) - registry.dispatch_event( - provider, - ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, - ProviderEventDetails(), - ) - except Exception as e: # pragma: no cover - errors.append(e) - - _registry.clear_event_handlers = clear_event_handlers_in_window - threads = [threading.Thread(target=clearer), threading.Thread(target=dispatcher)] - try: - for t in threads: - t.start() - for t in threads: - t.join(timeout=10.0) - assert not [t for t in threads if t.is_alive()], ( - "threads did not complete — possible deadlock" - ) - assert not errors, f"Exception(s) raised in threads: {errors}" - assert not fired_against_removed_provider, ( - "event handler fired for a provider already removed from the " - "registry: clear_providers() exposed the gap between clearing " - "providers and clearing event handlers" - ) - finally: - _registry.clear_event_handlers = original_clear - api_module.clear_providers() \ No newline at end of file