From 4f25eb486e1bd3502239e500cce538f7e30c2ea1 Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Tue, 17 Mar 2026 19:38:27 -0700 Subject: [PATCH 1/4] initial commit --- src/InProcessTestHost/InProcessTestHost.csproj | 2 +- .../Sidecar/InMemoryOrchestrationService.cs | 9 +++------ 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/InProcessTestHost/InProcessTestHost.csproj b/src/InProcessTestHost/InProcessTestHost.csproj index 239338158..9cdc750c0 100644 --- a/src/InProcessTestHost/InProcessTestHost.csproj +++ b/src/InProcessTestHost/InProcessTestHost.csproj @@ -5,7 +5,7 @@ Microsoft.DurableTask.Testing Microsoft.DurableTask.InProcessTestHost Microsoft.DurableTask.InProcessTestHost - 0.2.0-preview.1 + 0.2.1-preview.1 $(NoWarn);CA1848 diff --git a/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs b/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs index 40691bd91..a4dd987e9 100644 --- a/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs +++ b/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs @@ -879,7 +879,7 @@ public PurgeResult PurgeInstanceState(PurgeInstanceFilter purgeInstanceFilter) class ReadyToRunQueue { readonly Channel readyToRunQueue = Channel.CreateUnbounded(); - readonly Dictionary readyInstances = new(StringComparer.OrdinalIgnoreCase); + readonly ConcurrentDictionary readyInstances = new(StringComparer.OrdinalIgnoreCase); public void Reset() { @@ -893,7 +893,7 @@ public async ValueTask TakeNextAsync(CancellationToken SerializedInstanceState state = await this.readyToRunQueue.Reader.ReadAsync(ct); lock (state) { - if (this.readyInstances.Remove(state.InstanceId)) + if (this.readyInstances.TryRemove(state.InstanceId, out _)) { if (state.IsLoaded) { @@ -909,12 +909,9 @@ public async ValueTask TakeNextAsync(CancellationToken public void Schedule(SerializedInstanceState state) { - // TODO: There is a race condition here. If another thread is calling TakeNextAsync - // and removed the queue item before updating the dictionary, then we'll fail - // to update the readyToRunQueue and the orchestration will get stuck. if (this.readyInstances.TryAdd(state.InstanceId, state)) { - if (!this.readyToRunQueue.Writer.TryWrite(state)) + if (!this.readyToRunQueue.Writer.TryWrite(state)) { throw new InvalidOperationException($"unable to write to queue for {state.InstanceId}"); } From 45cf7e0eab80d3e13200c8ae06ab7ed3d728527a Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Tue, 17 Mar 2026 19:48:42 -0700 Subject: [PATCH 2/4] add test --- .../ConcurrentTimerTests.cs | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 test/InProcessTestHost.Tests/ConcurrentTimerTests.cs diff --git a/test/InProcessTestHost.Tests/ConcurrentTimerTests.cs b/test/InProcessTestHost.Tests/ConcurrentTimerTests.cs new file mode 100644 index 000000000..12fbc4488 --- /dev/null +++ b/test/InProcessTestHost.Tests/ConcurrentTimerTests.cs @@ -0,0 +1,104 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Testing; +using Microsoft.DurableTask.Worker; +using Xunit; + +namespace InProcessTestHost.Tests; + +/// +/// Tests to verify that multiple orchestrations with identical timer FireAt timestamps +/// all complete correctly without any being dropped. +/// +public class ConcurrentTimerTests +{ + [Fact] + // Test that multi orchestrations with the same timer that fire at the same time + // can all complete correctly. + public async Task MultipleOrchestrations_WithSameTimerFireAt_AllComplete() + { + const int orchestrationCount = 10; + const string orchestratorName = "TimerOrchestrator"; + + await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks => + { + tasks.AddOrchestratorFunc(orchestratorName, async (ctx, fireAt) => + { + await ctx.CreateTimer(fireAt, CancellationToken.None); + return $"done:{ctx.InstanceId}"; + }); + }); + + DateTime sharedFireAt = DateTime.UtcNow.AddSeconds(10); + + string[] instanceIds = new string[orchestrationCount]; + for (int i = 0; i < orchestrationCount; i++) + { + instanceIds[i] = await host.Client.ScheduleNewOrchestrationInstanceAsync( + orchestratorName, sharedFireAt); + } + + using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30)); + + Task[] waitTasks = instanceIds + .Select(id => host.Client.WaitForInstanceCompletionAsync( + id, getInputsAndOutputs: true, cts.Token)) + .ToArray(); + + OrchestrationMetadata[] results = await Task.WhenAll(waitTasks); + + for (int i = 0; i < orchestrationCount; i++) + { + Assert.NotNull(results[i]); + Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus); + string output = results[i].ReadOutputAs()!; + Assert.Equal($"done:{instanceIds[i]}", output); + } + } + + [Fact] + // Test that fan-out sub-orchestrations with the same timer fire at time + // can all complete correctly. + public async Task SubOrchestrations_WithIdenticalTimers_AllComplete() + { + const int subOrchestrationCount = 10; + const string parentName = "ParentOrchestrator"; + const string childName = "ChildTimerOrchestrator"; + + await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks => + { + tasks.AddOrchestratorFunc(parentName, async ctx => + { + DateTime sharedFireAt = ctx.CurrentUtcDateTime.AddSeconds(2); + + // A parent orchestration will schedule 10 sub-orchestrations which has a timer + // fires at the same time. + Task[] childTasks = Enumerable.Range(0, subOrchestrationCount) + .Select(i => ctx.CallSubOrchestratorAsync(childName, sharedFireAt)) + .ToArray(); + + string[] results = await Task.WhenAll(childTasks); + return results.Length; + }); + + tasks.AddOrchestratorFunc(childName, async (ctx, fireAt) => + { + await ctx.CreateTimer(fireAt, CancellationToken.None); + return $"child-done:{ctx.InstanceId}"; + }); + }); + + string instanceId = await host.Client.ScheduleNewOrchestrationInstanceAsync(parentName); + + using CancellationTokenSource cts = new(TimeSpan.FromSeconds(60)); + OrchestrationMetadata metadata = await host.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, cts.Token); + + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal(subOrchestrationCount, metadata.ReadOutputAs()); + } +} From 5290fe50985ea26125cde011b19be6bda6b8cba2 Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Tue, 17 Mar 2026 19:52:31 -0700 Subject: [PATCH 3/4] address copilot comment --- src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs b/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs index a4dd987e9..17d69c5e3 100644 --- a/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs +++ b/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs @@ -879,7 +879,7 @@ public PurgeResult PurgeInstanceState(PurgeInstanceFilter purgeInstanceFilter) class ReadyToRunQueue { readonly Channel readyToRunQueue = Channel.CreateUnbounded(); - readonly ConcurrentDictionary readyInstances = new(StringComparer.OrdinalIgnoreCase); + readonly ConcurrentDictionary readyInstances = new(StringComparer.OrdinalIgnoreCase); public void Reset() { From 9e942b2adee6f73561c9618ce89e59d682aa471a Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Tue, 17 Mar 2026 20:37:55 -0700 Subject: [PATCH 4/4] address copilot comment --- test/InProcessTestHost.Tests/ConcurrentTimerTests.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/InProcessTestHost.Tests/ConcurrentTimerTests.cs b/test/InProcessTestHost.Tests/ConcurrentTimerTests.cs index 12fbc4488..75a7ef3e3 100644 --- a/test/InProcessTestHost.Tests/ConcurrentTimerTests.cs +++ b/test/InProcessTestHost.Tests/ConcurrentTimerTests.cs @@ -16,7 +16,7 @@ namespace InProcessTestHost.Tests; public class ConcurrentTimerTests { [Fact] - // Test that multi orchestrations with the same timer that fire at the same time + // Test that multiple orchestrations with the same timer that fire at the same time // can all complete correctly. public async Task MultipleOrchestrations_WithSameTimerFireAt_AllComplete() { @@ -32,7 +32,7 @@ public async Task MultipleOrchestrations_WithSameTimerFireAt_AllComplete() }); }); - DateTime sharedFireAt = DateTime.UtcNow.AddSeconds(10); + DateTime sharedFireAt = DateTime.UtcNow.AddSeconds(5); string[] instanceIds = new string[orchestrationCount]; for (int i = 0; i < orchestrationCount; i++) @@ -60,7 +60,7 @@ public async Task MultipleOrchestrations_WithSameTimerFireAt_AllComplete() } [Fact] - // Test that fan-out sub-orchestrations with the same timer fire at time + // Test that fan-out sub-orchestrations with timers that all fire at the same time // can all complete correctly. public async Task SubOrchestrations_WithIdenticalTimers_AllComplete() {