From 5bca88b01284fe03946f58f737b4389a1544c8f3 Mon Sep 17 00:00:00 2001 From: Kim Gustyr Date: Fri, 26 Jun 2026 18:52:49 +0100 Subject: [PATCH 1/3] fix: stop background threads leaking across tests The polling and streaming worker threads could outlive the test that created them. Once leaked, they raced with pyfakefs (which is not thread-safe) during another test's filesystem patching, corrupting it, and pytest 9 turns the resulting unhandled thread exceptions into test errors. This surfaced as flaky failures on the pytest 7->9 bump (#228). - polling_manager: wait on the stop event instead of time.sleep so stop() interrupts the interval immediately and the thread joins promptly, and guard update_environment so an unexpected error never kills the thread. - streaming_manager: catch any exception in the run loop rather than a fixed tuple, so the thread never propagates an unhandled exception. - tests: join (not just stop) the threads on teardown so none outlive their test, and keep the realtime test's stream worker off the network. beep boop --- flagsmith/polling_manager.py | 12 +++++++++--- flagsmith/streaming_manager.py | 4 +++- tests/conftest.py | 15 +++++++++++---- tests/test_flagsmith.py | 9 ++++++++- 4 files changed, 31 insertions(+), 9 deletions(-) diff --git a/flagsmith/polling_manager.py b/flagsmith/polling_manager.py index b152e68..23eb4de 100644 --- a/flagsmith/polling_manager.py +++ b/flagsmith/polling_manager.py @@ -2,7 +2,6 @@ import logging import threading -import time import typing if typing.TYPE_CHECKING: @@ -25,9 +24,16 @@ def __init__( self.refresh_interval_seconds = refresh_interval_seconds def run(self) -> None: + # Wait on the stop event rather than sleeping so stop() interrupts + # the interval immediately and the thread can be joined promptly. while not self._stop_event.is_set(): - self.main.update_environment() - time.sleep(self.refresh_interval_seconds) + try: + self.main.update_environment() + except Exception: + # Never let an unexpected error kill the polling thread; log + # it and try again on the next interval. + logger.exception("Error updating environment") + self._stop_event.wait(self.refresh_interval_seconds) def stop(self) -> None: self._stop_event.set() diff --git a/flagsmith/streaming_manager.py b/flagsmith/streaming_manager.py index c4f2c6c..6f8cbb0 100644 --- a/flagsmith/streaming_manager.py +++ b/flagsmith/streaming_manager.py @@ -40,7 +40,9 @@ def run(self) -> None: for event in sse_client.events(): self.on_event(map_sse_event_to_stream_event(event)) - except (requests.RequestException, ValueError, TypeError): + except Exception: + # Never let an unexpected error kill the stream thread; log it + # and reconnect on the next loop iteration. logger.exception("Error opening or reading from the event stream") def stop(self) -> None: diff --git a/tests/conftest.py b/tests/conftest.py index 4631958..730e421 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -40,11 +40,18 @@ def tracking_init(self: Flagsmith, *args: typing.Any, **kwargs: typing.Any) -> N monkeypatch.setattr(Flagsmith, "__init__", tracking_init) yield + # Join, not just stop: pyfakefs is not thread-safe, so a thread that is + # still winding down races with the filesystem patching the next test + # performs (and, under pytest>=8.1, surfaces as a thread-exception error). for flagsmith in instances: - if getattr(flagsmith, "environment_data_polling_manager_thread", None): - flagsmith.environment_data_polling_manager_thread.stop() - if getattr(flagsmith, "event_stream_thread", None): - flagsmith.event_stream_thread.stop() + if polling := getattr( + flagsmith, "environment_data_polling_manager_thread", None + ): + polling.stop() + polling.join(timeout=5) + if stream := getattr(flagsmith, "event_stream_thread", None): + stream.stop() + stream.join(timeout=5) if flagsmith._event_processor: flagsmith._event_processor.stop() diff --git a/tests/test_flagsmith.py b/tests/test_flagsmith.py index 95c9605..0db48e2 100644 --- a/tests/test_flagsmith.py +++ b/tests/test_flagsmith.py @@ -795,8 +795,15 @@ def test_stream_not_used_by_default( def test_stream_used_when_enable_realtime_updates_is_true( - requests_session_response_ok: None, server_api_key: str + requests_session_response_ok: None, server_api_key: str, mocker: MockerFixture ) -> None: + # Given + # Keep the stream worker off the network so it stays idle until torn down. + mocker.patch( + "flagsmith.streaming_manager.requests.get", + side_effect=requests.exceptions.ReadTimeout(), + ) + # When flagsmith = Flagsmith( environment_key=server_api_key, From d424282be8d96256d3096e3e05f18c64b1fabc78 Mon Sep 17 00:00:00 2001 From: Kim Gustyr Date: Fri, 26 Jun 2026 19:03:11 +0100 Subject: [PATCH 2/3] chore: drop explanatory comments beep boop --- flagsmith/polling_manager.py | 4 ---- flagsmith/streaming_manager.py | 2 -- tests/conftest.py | 3 --- tests/test_flagsmith.py | 1 - 4 files changed, 10 deletions(-) diff --git a/flagsmith/polling_manager.py b/flagsmith/polling_manager.py index 23eb4de..7462f52 100644 --- a/flagsmith/polling_manager.py +++ b/flagsmith/polling_manager.py @@ -24,14 +24,10 @@ def __init__( self.refresh_interval_seconds = refresh_interval_seconds def run(self) -> None: - # Wait on the stop event rather than sleeping so stop() interrupts - # the interval immediately and the thread can be joined promptly. while not self._stop_event.is_set(): try: self.main.update_environment() except Exception: - # Never let an unexpected error kill the polling thread; log - # it and try again on the next interval. logger.exception("Error updating environment") self._stop_event.wait(self.refresh_interval_seconds) diff --git a/flagsmith/streaming_manager.py b/flagsmith/streaming_manager.py index 6f8cbb0..6ce793c 100644 --- a/flagsmith/streaming_manager.py +++ b/flagsmith/streaming_manager.py @@ -41,8 +41,6 @@ def run(self) -> None: self.on_event(map_sse_event_to_stream_event(event)) except Exception: - # Never let an unexpected error kill the stream thread; log it - # and reconnect on the next loop iteration. logger.exception("Error opening or reading from the event stream") def stop(self) -> None: diff --git a/tests/conftest.py b/tests/conftest.py index 730e421..1858d8c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -40,9 +40,6 @@ def tracking_init(self: Flagsmith, *args: typing.Any, **kwargs: typing.Any) -> N monkeypatch.setattr(Flagsmith, "__init__", tracking_init) yield - # Join, not just stop: pyfakefs is not thread-safe, so a thread that is - # still winding down races with the filesystem patching the next test - # performs (and, under pytest>=8.1, surfaces as a thread-exception error). for flagsmith in instances: if polling := getattr( flagsmith, "environment_data_polling_manager_thread", None diff --git a/tests/test_flagsmith.py b/tests/test_flagsmith.py index 0db48e2..ce63dbf 100644 --- a/tests/test_flagsmith.py +++ b/tests/test_flagsmith.py @@ -798,7 +798,6 @@ def test_stream_used_when_enable_realtime_updates_is_true( requests_session_response_ok: None, server_api_key: str, mocker: MockerFixture ) -> None: # Given - # Keep the stream worker off the network so it stays idle until torn down. mocker.patch( "flagsmith.streaming_manager.requests.get", side_effect=requests.exceptions.ReadTimeout(), From 5717a1b9f20c4696de22f458eded890c27b5bc4f Mon Sep 17 00:00:00 2001 From: Kim Gustyr Date: Fri, 26 Jun 2026 19:04:51 +0100 Subject: [PATCH 3/3] fix: back off after stream errors to avoid a busy loop beep boop --- flagsmith/streaming_manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flagsmith/streaming_manager.py b/flagsmith/streaming_manager.py index 6ce793c..dc402d0 100644 --- a/flagsmith/streaming_manager.py +++ b/flagsmith/streaming_manager.py @@ -42,6 +42,7 @@ def run(self) -> None: except Exception: logger.exception("Error opening or reading from the event stream") + self._stop_event.wait(1) def stop(self) -> None: self._stop_event.set()