-
Notifications
You must be signed in to change notification settings - Fork 47
Expand file tree
/
Copy pathTestQueueTestFixture.cs
More file actions
103 lines (86 loc) · 3.12 KB
/
TestQueueTestFixture.cs
File metadata and controls
103 lines (86 loc) · 3.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
namespace Gofer.NET.Tests
{
public class TaskQueueTestFixture
{
private static readonly ReaderWriterLock Locker = new ReaderWriterLock();
public static string SemaphoreText => "completed";
public TaskQueue TaskQueue { get; }
public static string RedisConnectionString => "localhost:6379";
private readonly string _semaphoreFile;
public static TaskQueue UniqueRedisTaskQueue(string prefix = null)
{
var taskQueueName = $"{prefix ?? nameof(TaskQueueTestFixture)}::{Guid.NewGuid().ToString()}";
return TaskQueue.Redis(RedisConnectionString, taskQueueName);
}
public TaskQueueTestFixture(string uniqueId, TaskQueue taskQueue = null)
{
_semaphoreFile = Path.Combine(AppContext.BaseDirectory, uniqueId, Path.GetTempFileName());
var testQueueName = uniqueId + "::TestQueue";
TaskQueue = taskQueue ?? TaskQueueTestFixture.UniqueRedisTaskQueue(uniqueId);
// Clear out the queue
while (TaskQueue.Dequeue().Result != null) { }
}
public async Task PushPopExecuteWriteSemaphore()
{
await TaskQueue.Enqueue(() => WriteSemaphore(_semaphoreFile));
var dequeuedTaskInfo = await TaskQueue.Dequeue();
await dequeuedTaskInfo.ExecuteTask();
}
public void EnsureSemaphoreDoesntExist()
{
File.Delete(_semaphoreFile);
File.Exists(_semaphoreFile).Should().Be(false);
}
public void EnsureSemaphore()
{
EnsureSemaphore(_semaphoreFile);
}
public static void EnsureSemaphore(string semaphoreFile)
{
try
{
Locker.AcquireReaderLock(30000);
File.ReadAllText(semaphoreFile).Should().Be(SemaphoreText);
}
finally
{
Locker.ReleaseReaderLock();
}
}
public static void WriteSemaphore(string semaphoreFile)
{
WriteSemaphoreValue(semaphoreFile, SemaphoreText);
}
public static async Task WaitForTaskClientCancellationAndWriteSemaphore(string semaphoreFile, TimeSpan timeout)
{
var token = TaskClient.GetListenCancellation();
if (!token.CanBeCanceled)
throw new InvalidOperationException("This method must be called from a task client callback");
try
{
await Task.Delay(timeout, token);
}
catch (OperationCanceledException)
{
}
WriteSemaphore(semaphoreFile);
}
public static void WriteSemaphoreValue(string semaphoreFile, object value)
{
try
{
Locker.AcquireWriterLock(30000);
File.AppendAllText(semaphoreFile, value?.ToString() ?? "null");
}
finally
{
Locker.ReleaseWriterLock();
}
}
}
}