Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,30 @@ def activate(
for index, job_set in enumerate(job_sets):
if not job_set:
continue
# Separate local-activity resolutions from other jobs.
# Local activity results may arrive in non-deterministic
# order (completion order varies between first-execution
# and replay). Sort them by sequence number so coroutines
# always resume in scheduling order, ensuring deterministic
# interleaving regardless of execution mode.
local_activity_jobs = []
other_jobs = []
for job in job_set:
# Let errors bubble out of these to the caller to fail the task
if (
job.HasField("resolve_activity")
and job.resolve_activity.is_local
):
local_activity_jobs.append(job)
else:
other_jobs.append(job)

# Apply non-local-activity jobs first
for job in other_jobs:
self._apply(job)

# Apply local activity resolutions sorted by seq number.
local_activity_jobs.sort(key=lambda j: j.resolve_activity.seq)
for job in local_activity_jobs:
self._apply(job)

# Run one iteration of the loop. We do not allow conditions to
Expand Down
93 changes: 93 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
from temporalio.worker import (
ExecuteWorkflowInput,
HandleSignalInput,
Replayer,
UnsandboxedWorkflowRunner,
Worker,
WorkflowInstance,
Expand Down Expand Up @@ -892,6 +893,98 @@ async def test_workflow_simple_local_activity(client: Client):
assert result == "Hello, Temporal!"


@activity.defn
async def local_activity_slow(index: int) -> None:
    """Pause for 50 ms when ``index`` is even; return at once when it is odd."""
    if index % 2:
        # Odd index: nothing to wait for.
        return
    await asyncio.sleep(0.05)


@activity.defn
async def local_activity_fast(index: int) -> None:
    """Complete immediately without doing any work; ``index`` is ignored."""


@activity.defn
async def local_activity_gate() -> None:
    """Sleep for 50 ms before completing."""
    delay_seconds = 0.05
    await asyncio.sleep(delay_seconds)


@workflow.defn
class ConcurrentLocalActivityReplayWorkflow:
    """Run pairs of concurrent coroutines that each chain local activities.

    Regression workflow for a replay nondeterminism bug. During first
    execution the local activities take real time to finish; during replay
    they all resolve instantly from recorded markers, which can reorder
    coroutine scheduling and produce a different command sequence.
    """

    @workflow.run
    async def run(self) -> list[int]:
        """Execute 20 rounds of two concurrent lifecycles.

        Returns:
            The ids returned by every round, in round order; within a round
            the even id from ``lifecycle_a`` precedes the odd id from
            ``lifecycle_b``.
        """
        # Every local activity in this workflow uses the same timeout.
        timeout = timedelta(seconds=5)

        async def lifecycle_a(index: int) -> int:
            # Even-numbered participant: slow step followed by fast step.
            even_id = index * 2
            for step in (local_activity_slow, local_activity_fast):
                await workflow.execute_local_activity(
                    step,
                    args=[even_id],
                    start_to_close_timeout=timeout,
                )
            return even_id

        async def lifecycle_b(index: int) -> int:
            # Odd-numbered participant: runs the gate activity first, then
            # the same slow/fast sequence as lifecycle_a.
            odd_id = index * 2 + 1
            await workflow.execute_local_activity(
                local_activity_gate,
                start_to_close_timeout=timeout,
            )
            for step in (local_activity_slow, local_activity_fast):
                await workflow.execute_local_activity(
                    step,
                    args=[odd_id],
                    start_to_close_timeout=timeout,
                )
            return odd_id

        collected: list[int] = []
        for round_index in range(20):
            # gather() returns results in argument order: a's id, then b's.
            pair = await asyncio.gather(
                lifecycle_a(round_index),
                lifecycle_b(round_index),
            )
            collected.extend(pair)
        return collected


async def test_workflow_concurrent_local_activity_replay(client: Client):
    """Check that concurrent local activities replay deterministically.

    Runs a workflow whose two concurrent coroutines each issue several local
    activities, then replays the recorded history. Without the fix, replay
    fails with NondeterminismError because the local activity command order
    diverges from the recorded marker order.
    """
    local_activities = [
        local_activity_slow,
        local_activity_fast,
        local_activity_gate,
    ]
    async with new_worker(
        client,
        ConcurrentLocalActivityReplayWorkflow,
        activities=local_activities,
    ) as worker:
        workflow_id = f"workflow-{uuid.uuid4()}"
        handle = await client.start_workflow(
            ConcurrentLocalActivityReplayWorkflow.run,
            id=workflow_id,
            task_queue=worker.task_queue,
        )
        # Each of the 20 rounds yields (2 * i, 2 * i + 1), i.e. 0..39 in order.
        expected = list(range(40))
        outcome = await handle.result()
        assert outcome == expected

        history = await handle.fetch_history()

    # Replay needs only the recorded history.
    replayer = Replayer(
        workflows=[ConcurrentLocalActivityReplayWorkflow],
    )
    await replayer.replay_workflow(history)


@activity.defn
async def wait_cancel() -> str:
try:
Expand Down