diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py
index 16c3483d8..cac240e86 100644
--- a/temporalio/worker/_workflow_instance.py
+++ b/temporalio/worker/_workflow_instance.py
@@ -461,8 +461,35 @@ def activate(
             for index, job_set in enumerate(job_sets):
                 if not job_set:
                     continue
+                # Separate local-activity resolutions from other jobs.
+                # Local activity results may arrive in non-deterministic
+                # order (completion order varies between first-execution
+                # and replay). Sort them by sequence number so coroutines
+                # always resume in scheduling order, ensuring deterministic
+                # interleaving regardless of execution mode.
+                # NOTE(review): this also moves local-activity resolutions
+                # behind every other job in the same set; confirm histories
+                # recorded before this change still replay cleanly.
+                local_activity_jobs = []
+                other_jobs = []
                 for job in job_set:
+                    if (
+                        job.HasField("resolve_activity")
+                        and job.resolve_activity.is_local
+                    ):
+                        local_activity_jobs.append(job)
+                    else:
+                        other_jobs.append(job)
+
+                # Apply non-local-activity jobs first, in their original order.
+                for job in other_jobs:
+                    # Let errors bubble out of these to the caller to fail the task
+                    self._apply(job)
+
+                # Apply local activity resolutions sorted by seq number.
+                local_activity_jobs.sort(key=lambda j: j.resolve_activity.seq)
+                for job in local_activity_jobs:
                     # Let errors bubble out of these to the caller to fail the task
                     self._apply(job)
 
                 # Run one iteration of the loop. We do not allow conditions to
diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py
index cb5bb8067..50cec1499 100644
--- a/tests/worker/test_workflow.py
+++ b/tests/worker/test_workflow.py
@@ -114,6 +114,7 @@
 from temporalio.worker import (
     ExecuteWorkflowInput,
     HandleSignalInput,
+    Replayer,
     UnsandboxedWorkflowRunner,
     Worker,
     WorkflowInstance,
@@ -892,6 +893,112 @@ async def test_workflow_simple_local_activity(client: Client):
     assert result == "Hello, Temporal!"
 
 
+# Number of sequential rounds the workflow runs; shared by the workflow and the
+# test so the expected result cannot drift from the loop bound.
+CONCURRENT_LOCAL_ACTIVITY_ROUNDS = 20
+
+
+@activity.defn
+async def local_activity_slow(index: int) -> None:
+    """Sleep briefly when ``index`` is even; return immediately when odd."""
+    if index % 2 == 0:
+        await asyncio.sleep(0.05)
+
+
+@activity.defn
+async def local_activity_fast(index: int) -> None:
+    """Return immediately; ``index`` is accepted but not used."""
+
+
+@activity.defn
+async def local_activity_gate() -> None:
+    """Sleep for the same duration as the even-index slow activity."""
+    await asyncio.sleep(0.05)
+
+
+@workflow.defn
+class ConcurrentLocalActivityReplayWorkflow:
+    """Workflow that runs two concurrent coroutines with local activities.
+
+    This reproduces a replay nondeterminism bug. On first execution the local
+    activities take real time, so their results arrive in completion order.
+    On replay every local activity resolves immediately from its recorded
+    marker, which can reorder coroutine scheduling and produce a different
+    command sequence.
+    """
+
+    @workflow.run
+    async def run(self) -> list[int]:
+        """Run the paired lifecycles round by round and collect their results.
+
+        Returns:
+            The values ``0 .. 2 * CONCURRENT_LOCAL_ACTIVITY_ROUNDS - 1`` in
+            ascending order: each round contributes its even value, then its
+            odd value.
+        """
+        timeout = timedelta(seconds=5)
+
+        async def lifecycle_a(index: int) -> int:
+            # Even input, so the slow activity sleeps before the fast one runs.
+            await workflow.execute_local_activity(
+                local_activity_slow, index * 2, start_to_close_timeout=timeout
+            )
+            await workflow.execute_local_activity(
+                local_activity_fast, index * 2, start_to_close_timeout=timeout
+            )
+            return index * 2
+
+        async def lifecycle_b(index: int) -> int:
+            # The gate sleeps while lifecycle_a's slow activity is sleeping,
+            # so the two first-step results complete at about the same time.
+            await workflow.execute_local_activity(
+                local_activity_gate, start_to_close_timeout=timeout
+            )
+            # Odd input, so the slow activity returns without sleeping.
+            await workflow.execute_local_activity(
+                local_activity_slow, index * 2 + 1, start_to_close_timeout=timeout
+            )
+            await workflow.execute_local_activity(
+                local_activity_fast, index * 2 + 1, start_to_close_timeout=timeout
+            )
+            return index * 2 + 1
+
+        results: list[int] = []
+        for index in range(CONCURRENT_LOCAL_ACTIVITY_ROUNDS):
+            results.extend(await asyncio.gather(lifecycle_a(index), lifecycle_b(index)))
+        return results
+
+
+async def test_workflow_concurrent_local_activity_replay(client: Client):
+    """Test that concurrent local activities replay deterministically.
+
+    Runs a workflow with two concurrent coroutines that each issue multiple
+    local activities, then replays the history. Without the fix, replay
+    fails with NondeterminismError because the local activity command order
+    diverges from the recorded marker order.
+    """
+    async with new_worker(
+        client,
+        ConcurrentLocalActivityReplayWorkflow,
+        activities=[local_activity_slow, local_activity_fast, local_activity_gate],
+    ) as worker:
+        handle = await client.start_workflow(
+            ConcurrentLocalActivityReplayWorkflow.run,
+            id=f"workflow-{uuid.uuid4()}",
+            task_queue=worker.task_queue,
+        )
+        expected = list(range(2 * CONCURRENT_LOCAL_ACTIVITY_ROUNDS))
+        assert await handle.result() == expected
+
+        history = await handle.fetch_history()
+
+    # Replay after the worker has shut down so only the Replayer drives the
+    # workflow; by default replay_workflow raises if replay fails.
+    await Replayer(
+        workflows=[ConcurrentLocalActivityReplayWorkflow],
+    ).replay_workflow(history)
+
+
 @activity.defn
 async def wait_cancel() -> str:
     try: