-
-
Notifications
You must be signed in to change notification settings - Fork 96
Expand file tree
/
Copy pathSendAndReceive.cs
More file actions
84 lines (70 loc) · 3.25 KB
/
SendAndReceive.cs
File metadata and controls
84 lines (70 loc) · 3.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
using Eventuous.Azure.ServiceBus.Producers;
using Eventuous.Azure.ServiceBus.Subscriptions;
using Eventuous.Producers;
namespace Eventuous.Tests.Azure.ServiceBus;
[NotInParallel]
[TopicAndQueueSource]
public class SendAndReceive {
static CancellationToken TestCancellationToken => TestContext.Current!.CancellationToken;
ServiceBusProducer _producer = null!;
ServiceBusSubscription _subscription = null!;
readonly string _correlationId;
readonly Metadata _metadata;
readonly TestEventHandler _handler = new();
readonly AzureServiceBusFixture _fixture;
readonly StreamName _streamName;
readonly ServiceBusProducerOptions _serviceBusProducerOptions;
readonly ServiceBusSubscriptionOptions _serviceBusSubscriptionOptions;
public SendAndReceive(AzureServiceBusFixture fixture, ServiceBusProducerOptions producerOptions, ServiceBusSubscriptionOptions subscriptionOptions) {
_streamName = new(producerOptions.QueueOrTopicName);
_correlationId = Guid.NewGuid().ToString();
_metadata = new Metadata().With(MetaTags.CorrelationId, _correlationId);
_serviceBusProducerOptions = producerOptions;
_serviceBusSubscriptionOptions = subscriptionOptions;
_fixture = fixture;
}
[Test]
[Retry(3)]
public async Task SingleMessage() {
await _producer.Produce(_streamName, SomeEvent.Create(), _metadata, cancellationToken: TestCancellationToken);
// Assert
await _handler.AssertThat()
.Timebox(TimeSpan.FromSeconds(5))
.Single()
.Match(evt => evt is SomeEvent)
.Validate(TestCancellationToken);
}
[Test]
[Retry(3)]
public async Task LoadsOfMessages() {
const int count = 200;
var events = Enumerable.Range(0, count).Select(SomeEvent.Create).ToList();
await _producer.Produce(_streamName, events, _metadata, cancellationToken: TestCancellationToken);
// Assert
await _handler.AssertThat()
.Timebox(TimeSpan.FromSeconds(20))
.Exactly(count)
.Match(evt => evt is SomeEvent)
.Validate(TestCancellationToken);
var handledMessageIds = _handler.Messages
.OfType<SomeEvent>()
.Select(m => m.Id)
.Order()
.ToList();
await Assert.That(handledMessageIds).IsEquivalentTo(events.Select(e => e.Id));
}
[After(Test)]
public async ValueTask CleanUpProducerAndSubscription() {
await _producer.StopAsync(TestCancellationToken);
await _subscription.Unsubscribe(_ => { }, TestCancellationToken);
await _subscription.DisposeAsync();
await _producer.DisposeAsync();
}
[Before(Test)]
public async Task StartProducerAndSubscription() {
_producer = _fixture.CreateProducer(_serviceBusProducerOptions);
_subscription = _fixture.CreateSubscription(_serviceBusSubscriptionOptions, _handler, _correlationId);
await _producer.StartAsync(TestCancellationToken);
await _subscription.Subscribe(_ => { }, (_, _, _) => { }, TestCancellationToken);
}
}