-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathConcurrentTimerTests.cs
More file actions
104 lines (86 loc) · 4 KB
/
ConcurrentTimerTests.cs
File metadata and controls
104 lines (86 loc) · 4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Testing;
using Microsoft.DurableTask.Worker;
using Xunit;
namespace InProcessTestHost.Tests;
/// <summary>
/// Tests to verify that multiple orchestrations with identical timer FireAt timestamps
/// all complete correctly without any being dropped.
/// </summary>
public class ConcurrentTimerTests
{
[Fact]
// Test that multiple orchestrations with the same timer that fire at the same time
// can all complete correctly.
public async Task MultipleOrchestrations_WithSameTimerFireAt_AllComplete()
{
const int orchestrationCount = 10;
const string orchestratorName = "TimerOrchestrator";
await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks =>
{
tasks.AddOrchestratorFunc<DateTime, string>(orchestratorName, async (ctx, fireAt) =>
{
await ctx.CreateTimer(fireAt, CancellationToken.None);
return $"done:{ctx.InstanceId}";
});
});
DateTime sharedFireAt = DateTime.UtcNow.AddSeconds(5);
string[] instanceIds = new string[orchestrationCount];
for (int i = 0; i < orchestrationCount; i++)
{
instanceIds[i] = await host.Client.ScheduleNewOrchestrationInstanceAsync(
orchestratorName, sharedFireAt);
}
using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30));
Task<OrchestrationMetadata>[] waitTasks = instanceIds
.Select(id => host.Client.WaitForInstanceCompletionAsync(
id, getInputsAndOutputs: true, cts.Token))
.ToArray();
OrchestrationMetadata[] results = await Task.WhenAll(waitTasks);
for (int i = 0; i < orchestrationCount; i++)
{
Assert.NotNull(results[i]);
Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus);
string output = results[i].ReadOutputAs<string>()!;
Assert.Equal($"done:{instanceIds[i]}", output);
}
}
[Fact]
// Test that fan-out sub-orchestrations with timers that all fire at the same time
// can all complete correctly.
public async Task SubOrchestrations_WithIdenticalTimers_AllComplete()
{
const int subOrchestrationCount = 10;
const string parentName = "ParentOrchestrator";
const string childName = "ChildTimerOrchestrator";
await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks =>
{
tasks.AddOrchestratorFunc<int>(parentName, async ctx =>
{
DateTime sharedFireAt = ctx.CurrentUtcDateTime.AddSeconds(2);
// A parent orchestration will schedule 10 sub-orchestrations which has a timer
// fires at the same time.
Task<string>[] childTasks = Enumerable.Range(0, subOrchestrationCount)
.Select(i => ctx.CallSubOrchestratorAsync<string>(childName, sharedFireAt))
.ToArray();
string[] results = await Task.WhenAll(childTasks);
return results.Length;
});
tasks.AddOrchestratorFunc<DateTime, string>(childName, async (ctx, fireAt) =>
{
await ctx.CreateTimer(fireAt, CancellationToken.None);
return $"child-done:{ctx.InstanceId}";
});
});
string instanceId = await host.Client.ScheduleNewOrchestrationInstanceAsync(parentName);
using CancellationTokenSource cts = new(TimeSpan.FromSeconds(60));
OrchestrationMetadata metadata = await host.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true, cts.Token);
Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
Assert.Equal(subOrchestrationCount, metadata.ReadOutputAs<int>());
}
}