diff --git a/src/InProcessTestHost/InProcessTestHost.csproj b/src/InProcessTestHost/InProcessTestHost.csproj
index 239338158..9cdc750c0 100644
--- a/src/InProcessTestHost/InProcessTestHost.csproj
+++ b/src/InProcessTestHost/InProcessTestHost.csproj
@@ -5,7 +5,7 @@
Microsoft.DurableTask.Testing
Microsoft.DurableTask.InProcessTestHost
Microsoft.DurableTask.InProcessTestHost
- 0.2.0-preview.1
+ 0.2.1-preview.1
$(NoWarn);CA1848
diff --git a/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs b/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs
index 40691bd91..17d69c5e3 100644
--- a/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs
+++ b/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs
@@ -879,7 +879,7 @@ public PurgeResult PurgeInstanceState(PurgeInstanceFilter purgeInstanceFilter)
class ReadyToRunQueue
{
readonly Channel readyToRunQueue = Channel.CreateUnbounded();
- readonly Dictionary readyInstances = new(StringComparer.OrdinalIgnoreCase);
+ readonly ConcurrentDictionary readyInstances = new(StringComparer.OrdinalIgnoreCase);
public void Reset()
{
@@ -893,7 +893,7 @@ public async ValueTask TakeNextAsync(CancellationToken
SerializedInstanceState state = await this.readyToRunQueue.Reader.ReadAsync(ct);
lock (state)
{
- if (this.readyInstances.Remove(state.InstanceId))
+ if (this.readyInstances.TryRemove(state.InstanceId, out _))
{
if (state.IsLoaded)
{
@@ -909,12 +909,9 @@ public async ValueTask TakeNextAsync(CancellationToken
public void Schedule(SerializedInstanceState state)
{
- // TODO: There is a race condition here. If another thread is calling TakeNextAsync
- // and removed the queue item before updating the dictionary, then we'll fail
- // to update the readyToRunQueue and the orchestration will get stuck.
if (this.readyInstances.TryAdd(state.InstanceId, state))
{
- if (!this.readyToRunQueue.Writer.TryWrite(state))
+ if (!this.readyToRunQueue.Writer.TryWrite(state))
{
throw new InvalidOperationException($"unable to write to queue for {state.InstanceId}");
}
diff --git a/test/InProcessTestHost.Tests/ConcurrentTimerTests.cs b/test/InProcessTestHost.Tests/ConcurrentTimerTests.cs
new file mode 100644
index 000000000..75a7ef3e3
--- /dev/null
+++ b/test/InProcessTestHost.Tests/ConcurrentTimerTests.cs
@@ -0,0 +1,104 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using Microsoft.DurableTask;
+using Microsoft.DurableTask.Client;
+using Microsoft.DurableTask.Testing;
+using Microsoft.DurableTask.Worker;
+using Xunit;
+
+namespace InProcessTestHost.Tests;
+
+///
+/// Tests to verify that multiple orchestrations with identical timer FireAt timestamps
+/// all complete correctly without any being dropped.
+///
+public class ConcurrentTimerTests
+{
+ [Fact]
+ // Test that multiple orchestrations with the same timer that fire at the same time
+ // can all complete correctly.
+ public async Task MultipleOrchestrations_WithSameTimerFireAt_AllComplete()
+ {
+ const int orchestrationCount = 10;
+ const string orchestratorName = "TimerOrchestrator";
+
+ await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks =>
+ {
+ tasks.AddOrchestratorFunc(orchestratorName, async (ctx, fireAt) =>
+ {
+ await ctx.CreateTimer(fireAt, CancellationToken.None);
+ return $"done:{ctx.InstanceId}";
+ });
+ });
+
+ DateTime sharedFireAt = DateTime.UtcNow.AddSeconds(5);
+
+ string[] instanceIds = new string[orchestrationCount];
+ for (int i = 0; i < orchestrationCount; i++)
+ {
+ instanceIds[i] = await host.Client.ScheduleNewOrchestrationInstanceAsync(
+ orchestratorName, sharedFireAt);
+ }
+
+ using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30));
+
+ Task[] waitTasks = instanceIds
+ .Select(id => host.Client.WaitForInstanceCompletionAsync(
+ id, getInputsAndOutputs: true, cts.Token))
+ .ToArray();
+
+ OrchestrationMetadata[] results = await Task.WhenAll(waitTasks);
+
+ for (int i = 0; i < orchestrationCount; i++)
+ {
+ Assert.NotNull(results[i]);
+ Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus);
+ string output = results[i].ReadOutputAs()!;
+ Assert.Equal($"done:{instanceIds[i]}", output);
+ }
+ }
+
+ [Fact]
+ // Test that fan-out sub-orchestrations with timers that all fire at the same time
+ // can all complete correctly.
+ public async Task SubOrchestrations_WithIdenticalTimers_AllComplete()
+ {
+ const int subOrchestrationCount = 10;
+ const string parentName = "ParentOrchestrator";
+ const string childName = "ChildTimerOrchestrator";
+
+ await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks =>
+ {
+ tasks.AddOrchestratorFunc(parentName, async ctx =>
+ {
+ DateTime sharedFireAt = ctx.CurrentUtcDateTime.AddSeconds(2);
+
+ // A parent orchestration will schedule 10 sub-orchestrations which has a timer
+ // fires at the same time.
+ Task[] childTasks = Enumerable.Range(0, subOrchestrationCount)
+ .Select(i => ctx.CallSubOrchestratorAsync(childName, sharedFireAt))
+ .ToArray();
+
+ string[] results = await Task.WhenAll(childTasks);
+ return results.Length;
+ });
+
+ tasks.AddOrchestratorFunc(childName, async (ctx, fireAt) =>
+ {
+ await ctx.CreateTimer(fireAt, CancellationToken.None);
+ return $"child-done:{ctx.InstanceId}";
+ });
+ });
+
+ string instanceId = await host.Client.ScheduleNewOrchestrationInstanceAsync(parentName);
+
+ using CancellationTokenSource cts = new(TimeSpan.FromSeconds(60));
+ OrchestrationMetadata metadata = await host.Client.WaitForInstanceCompletionAsync(
+ instanceId, getInputsAndOutputs: true, cts.Token);
+
+ Assert.NotNull(metadata);
+ Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
+ Assert.Equal(subOrchestrationCount, metadata.ReadOutputAs());
+ }
+}