From d5f5d96dfce1a66433e173fcdc460864064b5929 Mon Sep 17 00:00:00 2001 From: Paul Cacheux Date: Tue, 2 Jun 2026 18:18:41 +0200 Subject: [PATCH 1/2] Add test for concurrent local activity replay nondeterminism Add a regression test that runs two concurrent coroutines (via asyncio.gather) each issuing multiple sequential local activities, then replays the recorded history. Without a fix, replay fails with NondeterminismError because local activity markers are consumed in a different order than the original execution. --- tests/worker/test_workflow.py | 93 +++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index cb5bb8067..50cec1499 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -114,6 +114,7 @@ from temporalio.worker import ( ExecuteWorkflowInput, HandleSignalInput, + Replayer, UnsandboxedWorkflowRunner, Worker, WorkflowInstance, @@ -892,6 +893,98 @@ async def test_workflow_simple_local_activity(client: Client): assert result == "Hello, Temporal!" +@activity.defn +async def local_activity_slow(index: int) -> None: + if index % 2 == 0: + await asyncio.sleep(0.05) + + +@activity.defn +async def local_activity_fast(index: int) -> None: + return None + + +@activity.defn +async def local_activity_gate() -> None: + await asyncio.sleep(0.05) + + +@workflow.defn +class ConcurrentLocalActivityReplayWorkflow: + """Workflow that runs two concurrent coroutines with local activities. + + This reproduces a replay nondeterminism bug: during first execution, + local activities take real time, so the coroutines interleave in + wall-clock completion order. + During replay, all local activities return instantly from markers, which + can reorder coroutine scheduling and produce a different command sequence. 
+ """ + + @workflow.run + async def run(self) -> list[int]: + async def lifecycle_a(index: int) -> int: + await workflow.execute_local_activity( + local_activity_slow, + args=[index * 2], + start_to_close_timeout=timedelta(seconds=5), + ) + await workflow.execute_local_activity( + local_activity_fast, + args=[index * 2], + start_to_close_timeout=timedelta(seconds=5), + ) + return index * 2 + + async def lifecycle_b(index: int) -> int: + await workflow.execute_local_activity( + local_activity_gate, + start_to_close_timeout=timedelta(seconds=5), + ) + await workflow.execute_local_activity( + local_activity_slow, + args=[index * 2 + 1], + start_to_close_timeout=timedelta(seconds=5), + ) + await workflow.execute_local_activity( + local_activity_fast, + args=[index * 2 + 1], + start_to_close_timeout=timedelta(seconds=5), + ) + return index * 2 + 1 + + results: list[int] = [] + for index in range(20): + results.extend(await asyncio.gather(lifecycle_a(index), lifecycle_b(index))) + return results + + +async def test_workflow_concurrent_local_activity_replay(client: Client): + """Test that concurrent local activities replay deterministically. + + Runs a workflow with two concurrent coroutines that each issue multiple + local activities, then replays the history. Without the fix, replay + fails with NondeterminismError because the local activity command order + diverges from the recorded marker order. 
+ """ + async with new_worker( + client, + ConcurrentLocalActivityReplayWorkflow, + activities=[local_activity_slow, local_activity_fast, local_activity_gate], + ) as worker: + handle = await client.start_workflow( + ConcurrentLocalActivityReplayWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + expected = [v for i in range(20) for v in (i * 2, i * 2 + 1)] + assert await handle.result() == expected + + history = await handle.fetch_history() + + await Replayer( + workflows=[ConcurrentLocalActivityReplayWorkflow], + ).replay_workflow(history) + + @activity.defn async def wait_cancel() -> str: try: From 02e2c5ef85034cb04d420cdbd94bc6c16a41396d Mon Sep 17 00:00:00 2001 From: Paul Cacheux Date: Tue, 2 Jun 2026 18:18:53 +0200 Subject: [PATCH 2/2] Fix concurrent local activity replay nondeterminism During first execution, local activity resolve_activity jobs arrive in completion order (wall-clock). During replay, they arrive in sequence number order. When multiple coroutines issue local activities concurrently, this ordering difference causes coroutines to resume in a different order, assigning different sequence numbers to subsequent commands and triggering NondeterminismError. Fix by splitting each job set into local activity resolutions and all other jobs: the other jobs are applied first, in their original order, and the local activity resolutions are applied afterwards, sorted by sequence number. This ensures coroutines always resume in scheduling order regardless of execution mode. Note that this moves local activity resolutions after every other job in the same job set. --- temporalio/worker/_workflow_instance.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 16c3483d8..cac240e86 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -461,8 +461,30 @@ def activate( for index, job_set in enumerate(job_sets): if not job_set: continue + # Separate local-activity resolutions from other jobs. 
+ # Local activity results may arrive in non-deterministic + # order (completion order varies between first-execution + # and replay). Sort them by sequence number so coroutines + # always resume in scheduling order, ensuring deterministic + # interleaving regardless of execution mode. + local_activity_jobs = [] + other_jobs = [] for job in job_set: - # Let errors bubble out of these to the caller to fail the task + if ( + job.HasField("resolve_activity") + and job.resolve_activity.is_local + ): + local_activity_jobs.append(job) + else: + other_jobs.append(job) + + # Apply non-local-activity jobs first + for job in other_jobs: + self._apply(job) + + # Apply local activity resolutions sorted by seq number. + local_activity_jobs.sort(key=lambda j: j.resolve_activity.seq) + for job in local_activity_jobs: self._apply(job) # Run one iteration of the loop. We do not allow conditions to