From 4d3a4f3fa8a1a0d1c09b3e0a43e7945da8571a27 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 8 Jun 2026 17:06:16 +0000 Subject: [PATCH 1/6] fix race condition in reads --- .../runners/worker/statesampler_fast.pyx | 6 ++- .../runners/worker/statesampler_test.py | 41 +++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx index 45700a0b0f81..19b41a2f1a6f 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx +++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx @@ -217,7 +217,11 @@ cdef class ScopedState(object): @property def nsecs(self): - return self._nsecs + cdef int64_t val + pythread.PyThread_acquire_lock(self.sampler.lock, pythread.WAIT_LOCK) + val = self._nsecs + pythread.PyThread_release_lock(self.sampler.lock) + return val def sampled_seconds(self): return 1e-9 * self.nsecs diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 0d0ce1d2c8dc..04838ed7d66e 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -312,6 +312,47 @@ def test_do_operation_process_timer_with_exception(self, mock_get_dofn_specs): actual_value, state_duration_ms * (1.0 - margin_of_error)) _LOGGER.info("Exception test finished successfully.") + def test_concurrent_nsecs_reads_without_deadlock(self): + """Verify that concurrent reads of nsecs do not cause deadlocks. + + This test runs state transitions on the main thread and reads `nsecs` properties + from a secondary Python thread, while the background sampler thread is concurrently + acquiring the sampler lock and updating counter states. + """ + import threading + if not statesampler.FAST_SAMPLER: + self.skipTest('Requires FAST_SAMPLER') + + counter_factory = CounterFactory() + sampler = statesampler.StateSampler( + 'concurrent', counter_factory, sampling_period_ms=1) + + sampler.start() + state_a = sampler.scoped_state('step1', 'statea') + state_b = sampler.scoped_state('step1', 'stateb') + + stop_signal = False + + def read_nsecs_loop(): + while not stop_signal: + _ = state_a.nsecs + _ = state_b.nsecs + time.sleep(0.001) + + reader_thread = threading.Thread(target=read_nsecs_loop) + reader_thread.start() + + try: + for _ in range(100): + with state_a: + time.sleep(0.001) + with state_b: + time.sleep(0.001) + finally: + stop_signal = True + reader_thread.join() + sampler.stop() + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From 8378c6c5e067f1f7da07e82f33644529b8ead374 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 8 Jun 2026 17:24:55 +0000 Subject: [PATCH 2/6] rename test case - doc string --- sdks/python/apache_beam/runners/worker/statesampler_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 04838ed7d66e..45456356fc7b 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -312,12 +312,12 @@ def test_do_operation_process_timer_with_exception(self, mock_get_dofn_specs): actual_value, state_duration_ms * (1.0 - margin_of_error)) _LOGGER.info("Exception test finished successfully.") - def test_concurrent_nsecs_reads_without_deadlock(self): - """Verify that concurrent reads of nsecs do not cause deadlocks. + def test_concurrent_nsecs_reads(self): + """Verify that concurrent reads of nsecs behave correctly under thread contention. This test runs state transitions on the main thread and reads `nsecs` properties from a secondary Python thread, while the background sampler thread is concurrently - acquiring the sampler lock and updating counter states. + updating counter states. """ import threading if not statesampler.FAST_SAMPLER: From d0e188317d2b69fb3899dd3f2b80d8822925c35e Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 8 Jun 2026 17:26:14 +0000 Subject: [PATCH 3/6] move import to top --- sdks/python/apache_beam/runners/worker/statesampler_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 45456356fc7b..23ad9a7bf807 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -19,6 +19,7 @@ # pytype: skip-file import logging +import threading import time import unittest from unittest import mock @@ -319,7 +320,6 @@ def test_concurrent_nsecs_reads(self): from a secondary Python thread, while the background sampler thread is concurrently updating counter states. """ - import threading if not statesampler.FAST_SAMPLER: self.skipTest('Requires FAST_SAMPLER') From 7a3d8f51022876a21032d31b6b3e90870c05c874 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 8 Jun 2026 22:46:38 +0000 Subject: [PATCH 4/6] address gemini --- .../apache_beam/runners/worker/statesampler_fast.pyx | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx index 19b41a2f1a6f..64b0996afe33 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx +++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx @@ -218,9 +218,10 @@ cdef class ScopedState(object): @property def nsecs(self): cdef int64_t val - pythread.PyThread_acquire_lock(self.sampler.lock, pythread.WAIT_LOCK) - val = self._nsecs - pythread.PyThread_release_lock(self.sampler.lock) + with nogil: + pythread.PyThread_acquire_lock(self.sampler.lock, pythread.WAIT_LOCK) + val = self._nsecs + pythread.PyThread_release_lock(self.sampler.lock) return val def sampled_seconds(self): From 41e4ece98182a60485411b525a186f2d6f3f63c7 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 9 Jun 2026 11:47:58 +0000 Subject: [PATCH 5/6] address gemini comments --- .../runners/worker/statesampler_fast.pyx | 5 +-- .../runners/worker/statesampler_test.py | 32 ++++++++++--------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx index 64b0996afe33..7075ef47017d 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx +++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx @@ -217,11 +217,12 @@ cdef class ScopedState(object): @property def nsecs(self): + cdef pythread.PyThread_type_lock lock = self.sampler.lock cdef int64_t val with nogil: - pythread.PyThread_acquire_lock(self.sampler.lock, pythread.WAIT_LOCK) + pythread.PyThread_acquire_lock(lock, pythread.WAIT_LOCK) val = self._nsecs - pythread.PyThread_release_lock(self.sampler.lock) + pythread.PyThread_release_lock(lock) return val def sampled_seconds(self): diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 23ad9a7bf807..0d51c1e52e83 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -321,36 +321,38 @@ def test_concurrent_nsecs_reads(self): updating counter states. """ if not statesampler.FAST_SAMPLER: - self.skipTest('Requires FAST_SAMPLER') + self.skipTest('test_concurrent_nsecs_reads requires FAST_SAMPLER') counter_factory = CounterFactory() sampler = statesampler.StateSampler( - 'concurrent', counter_factory, sampling_period_ms=1) + 'basic', counter_factory, sampling_period_ms=1) sampler.start() - state_a = sampler.scoped_state('step1', 'statea') - state_b = sampler.scoped_state('step1', 'stateb') + reader_thread = None + try: + state_a = sampler.scoped_state('step1', 'statea') + state_b = sampler.scoped_state('step1', 'stateb') - stop_signal = False + stop_signal = False - def read_nsecs_loop(): - while not stop_signal: - _ = state_a.nsecs - _ = state_b.nsecs - time.sleep(0.001) + def read_nsecs_loop(): + while not stop_signal: + _ = state_a.nsecs + _ = state_b.nsecs + time.sleep(0.001) - reader_thread = threading.Thread(target=read_nsecs_loop) - reader_thread.start() + reader_thread = threading.Thread(target=read_nsecs_loop) + reader_thread.start() - try: for _ in range(100): with state_a: time.sleep(0.001) with state_b: time.sleep(0.001) finally: - stop_signal = True - reader_thread.join() + if reader_thread is not None: + stop_signal = True + reader_thread.join() sampler.stop() From 9ab9aecd904c21932c23065d791613a77d30fcf7 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 9 Jun 2026 11:48:15 +0000 Subject: [PATCH 6/6] address gemini comments --- sdks/python/apache_beam/runners/worker/statesampler_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 0d51c1e52e83..0495dc507da0 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -325,7 +325,7 @@ def test_concurrent_nsecs_reads(self): counter_factory = CounterFactory() sampler = statesampler.StateSampler( - 'basic', counter_factory, sampling_period_ms=1) + 'concurrent', counter_factory, sampling_period_ms=1) sampler.start() reader_thread = None