Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/InProcessTestHost/InProcessTestHost.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<RootNamespace>Microsoft.DurableTask.Testing</RootNamespace>
<AssemblyName>Microsoft.DurableTask.InProcessTestHost</AssemblyName>
<PackageId>Microsoft.DurableTask.InProcessTestHost</PackageId>
<Version>0.2.0-preview.1</Version>
<Version>0.2.1-preview.1</Version>

<!-- Suppress CA1848: Use LoggerMessage delegates for high-performance logging scenarios -->
<NoWarn>$(NoWarn);CA1848</NoWarn>
Expand Down
9 changes: 3 additions & 6 deletions src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ public PurgeResult PurgeInstanceState(PurgeInstanceFilter purgeInstanceFilter)
class ReadyToRunQueue
{
readonly Channel<SerializedInstanceState> readyToRunQueue = Channel.CreateUnbounded<SerializedInstanceState>();
readonly Dictionary<string, object> readyInstances = new(StringComparer.OrdinalIgnoreCase);
readonly ConcurrentDictionary<string, SerializedInstanceState> readyInstances = new(StringComparer.OrdinalIgnoreCase);

public void Reset()
{
Expand All @@ -893,7 +893,7 @@ public async ValueTask<SerializedInstanceState> TakeNextAsync(CancellationToken
SerializedInstanceState state = await this.readyToRunQueue.Reader.ReadAsync(ct);
lock (state)
{
if (this.readyInstances.Remove(state.InstanceId))
if (this.readyInstances.TryRemove(state.InstanceId, out _))
{
if (state.IsLoaded)
{
Expand All @@ -909,12 +909,9 @@ public async ValueTask<SerializedInstanceState> TakeNextAsync(CancellationToken

public void Schedule(SerializedInstanceState state)
{
// TODO: There is a race condition here. If another thread is calling TakeNextAsync
// and removed the queue item before updating the dictionary, then we'll fail
// to update the readyToRunQueue and the orchestration will get stuck.
if (this.readyInstances.TryAdd(state.InstanceId, state))
{
if (!this.readyToRunQueue.Writer.TryWrite(state))
if (!this.readyToRunQueue.Writer.TryWrite(state))
{
throw new InvalidOperationException($"unable to write to queue for {state.InstanceId}");
}
Expand Down
104 changes: 104 additions & 0 deletions test/InProcessTestHost.Tests/ConcurrentTimerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Testing;
using Microsoft.DurableTask.Worker;
using Xunit;

namespace InProcessTestHost.Tests;

/// <summary>
/// Tests to verify that multiple orchestrations with identical timer FireAt timestamps
/// all complete correctly without any being dropped.
/// </summary>
public class ConcurrentTimerTests
{
[Fact]
// Test that multiple orchestrations with the same timer that fire at the same time
// can all complete correctly.
public async Task MultipleOrchestrations_WithSameTimerFireAt_AllComplete()
{
const int orchestrationCount = 10;
const string orchestratorName = "TimerOrchestrator";

await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks =>
{
tasks.AddOrchestratorFunc<DateTime, string>(orchestratorName, async (ctx, fireAt) =>
{
await ctx.CreateTimer(fireAt, CancellationToken.None);
return $"done:{ctx.InstanceId}";
});
});

DateTime sharedFireAt = DateTime.UtcNow.AddSeconds(5);

string[] instanceIds = new string[orchestrationCount];
for (int i = 0; i < orchestrationCount; i++)
{
instanceIds[i] = await host.Client.ScheduleNewOrchestrationInstanceAsync(
orchestratorName, sharedFireAt);
}

using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30));

Task<OrchestrationMetadata>[] waitTasks = instanceIds
.Select(id => host.Client.WaitForInstanceCompletionAsync(
id, getInputsAndOutputs: true, cts.Token))
.ToArray();

OrchestrationMetadata[] results = await Task.WhenAll(waitTasks);

for (int i = 0; i < orchestrationCount; i++)
{
Assert.NotNull(results[i]);
Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus);
string output = results[i].ReadOutputAs<string>()!;
Assert.Equal($"done:{instanceIds[i]}", output);
}
}

[Fact]
// Test that fan-out sub-orchestrations with timers that all fire at the same time
// can all complete correctly.
public async Task SubOrchestrations_WithIdenticalTimers_AllComplete()
{
const int subOrchestrationCount = 10;
const string parentName = "ParentOrchestrator";
const string childName = "ChildTimerOrchestrator";

await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks =>
{
tasks.AddOrchestratorFunc<int>(parentName, async ctx =>
{
DateTime sharedFireAt = ctx.CurrentUtcDateTime.AddSeconds(2);

// A parent orchestration will schedule 10 sub-orchestrations which has a timer
// fires at the same time.
Task<string>[] childTasks = Enumerable.Range(0, subOrchestrationCount)
.Select(i => ctx.CallSubOrchestratorAsync<string>(childName, sharedFireAt))
.ToArray();

string[] results = await Task.WhenAll(childTasks);
return results.Length;
});

tasks.AddOrchestratorFunc<DateTime, string>(childName, async (ctx, fireAt) =>
{
await ctx.CreateTimer(fireAt, CancellationToken.None);
return $"child-done:{ctx.InstanceId}";
});
});

string instanceId = await host.Client.ScheduleNewOrchestrationInstanceAsync(parentName);

using CancellationTokenSource cts = new(TimeSpan.FromSeconds(60));
OrchestrationMetadata metadata = await host.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true, cts.Token);

Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
Assert.Equal(subOrchestrationCount, metadata.ReadOutputAs<int>());
}
}
Loading