-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathConsumerAsyncBase.cs
More file actions
119 lines (107 loc) · 4.3 KB
/
ConsumerAsyncBase.cs
File metadata and controls
119 lines (107 loc) · 4.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client.Core.Options;
using RabbitMQ.Client.Events;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace RabbitMQ.Client.Core.Abstractions
{
///<summary>
/// Rabbit MQ Async Consumer Abstraction Wrapper
///</summary>
public abstract class ConsumerAsyncBase<TConsumer> : BackgroundService where TConsumer : class
{
protected ConnectionFactory ConnectionFactory;
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly QueueOptions<TConsumer> _queueOptions;
private readonly ILogger<TConsumer> _logger;
protected string Id;
public ConsumerAsyncBase(
IOptions<RabbitMQConnectionOptions> connectionOptions,
IOptions<QueueOptions<TConsumer>> queueOptions,
ILogger<TConsumer> logger)
{
ConnectionFactory = new ConnectionFactory
{
HostName = connectionOptions.Value.HostName,
Port = connectionOptions.Value.Port,
UserName = connectionOptions.Value.UserName,
Password = connectionOptions.Value.Password,
VirtualHost = connectionOptions.Value.VirtualHost,
DispatchConsumersAsync = true
};
_connection = ConnectionFactory.CreateConnection();
_channel = _connection.CreateModel();
_queueOptions = queueOptions.Value;
Id = $"{Environment.UserName}-{typeof(TConsumer).Name}@{Guid.NewGuid()}";
_logger = logger;
Setup();
}
protected void Setup()
{
_logger.LogDebug($"Declaring queue {_queueOptions.QueueName}");
_channel.QueueDeclare(
queue: _queueOptions.QueueName,
durable: _queueOptions.Durable,
exclusive: _queueOptions.Exclusive,
autoDelete: _queueOptions.AutoDelete
);
_logger.LogDebug($"Queue declared and channel is open? {_channel.IsOpen}");
SetupExchangeBindings(_channel, _queueOptions);
}
///<summary>
/// Initialize Queue consumption
///</summary>
public void Initialize()
{
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += OnReceived;
consumer.Registered += ConsumerRegistered;
consumer.Unregistered += ConsumerUnregistered;
_channel.BasicConsume(
_queueOptions.QueueName,
_queueOptions.AutoAck,
consumer
);
_logger.LogDebug($"Consumer {Id} intialized!");
}
///<summary>
/// Execute consumption and keeps consumer attached to queue
///</summary>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
Initialize();
while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromMilliseconds(1));
}
}
private Task ConsumerUnregistered(object sender, ConsumerEventArgs e)
{
_logger.LogDebug($"Unregister {Id} -> {e.ConsumerTags.Aggregate((a, b) => $"{a};{b}")}");
return Task.CompletedTask;
}
private Task ConsumerRegistered(object sender, ConsumerEventArgs e)
{
_logger.LogDebug($"Register {Id} -> {e.ConsumerTags.Aggregate((a, b) => $"{a};{b}")}");
return Task.CompletedTask;
}
protected void SetupExchangeBindings(IModel channel, QueueOptions<TConsumer> queueOptions)
{
foreach (var exchangeToBind in queueOptions.Bindings)
{
_logger.LogDebug($"Binding {queueOptions.QueueName} to exchange {exchangeToBind.Exchange} with routing key {exchangeToBind.RoutingKey}");
_channel.QueueBind(
exchange: exchangeToBind.Exchange,
queue: queueOptions.QueueName,
routingKey: exchangeToBind.RoutingKey
);
}
}
public abstract Task OnReceived(object sender, BasicDeliverEventArgs e);
}
}