Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,13 @@ cdef class ScopedState(object):

@property
def nsecs(self):
return self._nsecs
cdef pythread.PyThread_type_lock lock = self.sampler.lock
cdef int64_t val
with nogil:
pythread.PyThread_acquire_lock(lock, pythread.WAIT_LOCK)
val = self._nsecs
pythread.PyThread_release_lock(lock)
return val
Comment thread
derrickaw marked this conversation as resolved.
Comment thread
derrickaw marked this conversation as resolved.

def sampled_seconds(self):
return 1e-9 * self.nsecs
Expand Down
43 changes: 43 additions & 0 deletions sdks/python/apache_beam/runners/worker/statesampler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
# pytype: skip-file

import logging
import threading
import time
import unittest
from unittest import mock
Expand Down Expand Up @@ -312,6 +313,48 @@ def test_do_operation_process_timer_with_exception(self, mock_get_dofn_specs):
actual_value, state_duration_ms * (1.0 - margin_of_error))
_LOGGER.info("Exception test finished successfully.")

def test_concurrent_nsecs_reads(self):
"""Verify that concurrent reads of nsecs behave correctly under thread contention.

This test runs state transitions on the main thread and reads `nsecs` properties
from a secondary Python thread, while the background sampler thread is concurrently
updating counter states.
"""
if not statesampler.FAST_SAMPLER:
self.skipTest('test_concurrent_nsecs_reads requires FAST_SAMPLER')

counter_factory = CounterFactory()
sampler = statesampler.StateSampler(
'concurrent', counter_factory, sampling_period_ms=1)

sampler.start()
reader_thread = None
try:
state_a = sampler.scoped_state('step1', 'statea')
state_b = sampler.scoped_state('step1', 'stateb')

stop_signal = False

def read_nsecs_loop():
while not stop_signal:
_ = state_a.nsecs
_ = state_b.nsecs
time.sleep(0.001)

reader_thread = threading.Thread(target=read_nsecs_loop)
reader_thread.start()

for _ in range(100):
with state_a:
time.sleep(0.001)
with state_b:
time.sleep(0.001)
finally:
if reader_thread is not None:
stop_signal = True
reader_thread.join()
sampler.stop()
Comment thread
derrickaw marked this conversation as resolved.


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
Loading