diff --git a/src/InProcessTestHost/InProcessTestHost.csproj b/src/InProcessTestHost/InProcessTestHost.csproj index 239338158..9cdc750c0 100644 --- a/src/InProcessTestHost/InProcessTestHost.csproj +++ b/src/InProcessTestHost/InProcessTestHost.csproj @@ -5,7 +5,7 @@ Microsoft.DurableTask.Testing Microsoft.DurableTask.InProcessTestHost Microsoft.DurableTask.InProcessTestHost - 0.2.0-preview.1 + 0.2.1-preview.1 $(NoWarn);CA1848 diff --git a/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs b/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs index 40691bd91..17d69c5e3 100644 --- a/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs +++ b/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs @@ -879,7 +879,7 @@ public PurgeResult PurgeInstanceState(PurgeInstanceFilter purgeInstanceFilter) class ReadyToRunQueue { readonly Channel readyToRunQueue = Channel.CreateUnbounded(); - readonly Dictionary readyInstances = new(StringComparer.OrdinalIgnoreCase); + readonly ConcurrentDictionary readyInstances = new(StringComparer.OrdinalIgnoreCase); public void Reset() { @@ -893,7 +893,7 @@ public async ValueTask TakeNextAsync(CancellationToken SerializedInstanceState state = await this.readyToRunQueue.Reader.ReadAsync(ct); lock (state) { - if (this.readyInstances.Remove(state.InstanceId)) + if (this.readyInstances.TryRemove(state.InstanceId, out _)) { if (state.IsLoaded) { @@ -909,12 +909,9 @@ public async ValueTask TakeNextAsync(CancellationToken public void Schedule(SerializedInstanceState state) { - // TODO: There is a race condition here. If another thread is calling TakeNextAsync - // and removed the queue item before updating the dictionary, then we'll fail - // to update the readyToRunQueue and the orchestration will get stuck. if (this.readyInstances.TryAdd(state.InstanceId, state)) { - if (!this.readyToRunQueue.Writer.TryWrite(state)) + if (!this.readyToRunQueue.Writer.TryWrite(state)) { throw new InvalidOperationException($"unable to write to queue for {state.InstanceId}"); } diff --git a/test/InProcessTestHost.Tests/ConcurrentTimerTests.cs b/test/InProcessTestHost.Tests/ConcurrentTimerTests.cs new file mode 100644 index 000000000..75a7ef3e3 --- /dev/null +++ b/test/InProcessTestHost.Tests/ConcurrentTimerTests.cs @@ -0,0 +1,104 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Testing; +using Microsoft.DurableTask.Worker; +using Xunit; + +namespace InProcessTestHost.Tests; + +/// +/// Tests to verify that multiple orchestrations with identical timer FireAt timestamps +/// all complete correctly without any being dropped. +/// +public class ConcurrentTimerTests +{ + [Fact] + // Test that multiple orchestrations with the same timer that fire at the same time + // can all complete correctly. + public async Task MultipleOrchestrations_WithSameTimerFireAt_AllComplete() + { + const int orchestrationCount = 10; + const string orchestratorName = "TimerOrchestrator"; + + await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks => + { + tasks.AddOrchestratorFunc(orchestratorName, async (ctx, fireAt) => + { + await ctx.CreateTimer(fireAt, CancellationToken.None); + return $"done:{ctx.InstanceId}"; + }); + }); + + DateTime sharedFireAt = DateTime.UtcNow.AddSeconds(5); + + string[] instanceIds = new string[orchestrationCount]; + for (int i = 0; i < orchestrationCount; i++) + { + instanceIds[i] = await host.Client.ScheduleNewOrchestrationInstanceAsync( + orchestratorName, sharedFireAt); + } + + using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30)); + + Task[] waitTasks = instanceIds + .Select(id => host.Client.WaitForInstanceCompletionAsync( + id, getInputsAndOutputs: true, cts.Token)) + .ToArray(); + + OrchestrationMetadata[] results = await Task.WhenAll(waitTasks); + + for (int i = 0; i < orchestrationCount; i++) + { + Assert.NotNull(results[i]); + Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus); + string output = results[i].ReadOutputAs()!; + Assert.Equal($"done:{instanceIds[i]}", output); + } + } + + [Fact] + // Test that fan-out sub-orchestrations with timers that all fire at the same time + // can all complete correctly. + public async Task SubOrchestrations_WithIdenticalTimers_AllComplete() + { + const int subOrchestrationCount = 10; + const string parentName = "ParentOrchestrator"; + const string childName = "ChildTimerOrchestrator"; + + await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks => + { + tasks.AddOrchestratorFunc(parentName, async ctx => + { + DateTime sharedFireAt = ctx.CurrentUtcDateTime.AddSeconds(2); + + // A parent orchestration will schedule 10 sub-orchestrations which has a timer + // fires at the same time. + Task[] childTasks = Enumerable.Range(0, subOrchestrationCount) + .Select(i => ctx.CallSubOrchestratorAsync(childName, sharedFireAt)) + .ToArray(); + + string[] results = await Task.WhenAll(childTasks); + return results.Length; + }); + + tasks.AddOrchestratorFunc(childName, async (ctx, fireAt) => + { + await ctx.CreateTimer(fireAt, CancellationToken.None); + return $"child-done:{ctx.InstanceId}"; + }); + }); + + string instanceId = await host.Client.ScheduleNewOrchestrationInstanceAsync(parentName); + + using CancellationTokenSource cts = new(TimeSpan.FromSeconds(60)); + OrchestrationMetadata metadata = await host.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, cts.Token); + + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal(subOrchestrationCount, metadata.ReadOutputAs()); + } +}