Skip to content

Commit 4182dd8

Browse files
authored
Merge pull request #1658 from bcgov/dev
Dev
2 parents 0db3f61 + 0a76b2b commit 4182dd8

11 files changed

Lines changed: 527 additions & 239 deletions

File tree

Lines changed: 87 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,123 @@
1-
21
using Microsoft.Extensions.Logging;
32
using RabbitMQ.Client;
43
using System;
4+
using System.Collections.Concurrent;
5+
using System.Threading;
56
using Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces;
67

78
namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ
89
{
9-
public sealed class ChannelProvider : IChannelProvider
10+
public sealed class ChannelProvider : IChannelProvider, IDisposable
1011
{
1112
private readonly IConnectionProvider _connectionProvider;
1213
private readonly ILogger<ChannelProvider> _logger;
13-
private IModel? _model;
14+
private readonly int _maxChannels;
15+
private readonly ConcurrentQueue<IModel> _channelPool = new();
16+
private int _currentChannelCount;
17+
private bool _disposed;
1418

15-
public ChannelProvider(
16-
IConnectionProvider connectionProvider,
17-
ILogger<ChannelProvider> logger)
19+
public ChannelProvider(IConnectionProvider connectionProvider, ILogger<ChannelProvider> logger, int maxChannels = 10000)
1820
{
19-
_connectionProvider = connectionProvider;
20-
_logger = logger;
21+
_connectionProvider = connectionProvider ?? throw new ArgumentNullException(nameof(connectionProvider));
22+
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
23+
_maxChannels = maxChannels;
2124
}
2225

2326
public IModel? GetChannel()
2427
{
25-
if (_model == null || !_model.IsOpen && _connectionProvider != null)
28+
ThrowIfDisposed();
29+
30+
// Try to reuse a channel from the pool
31+
while (_channelPool.TryDequeue(out var channel))
32+
{
33+
if (channel.IsOpen)
34+
return channel;
35+
36+
DisposeChannel(channel);
37+
}
38+
39+
// Try to create a new channel if we haven't reached max
40+
if (Interlocked.Increment(ref _currentChannelCount) <= _maxChannels)
2641
{
2742
try
2843
{
29-
IConnection? connection = _connectionProvider.GetConnection();
30-
if (connection != null) {
31-
_model = connection.CreateModel();
32-
}
44+
var connection = _connectionProvider.GetConnection();
45+
if (connection != null && connection.IsOpen)
46+
return connection.CreateModel();
47+
48+
_logger.LogWarning("RabbitMQ connection is not open.");
49+
Interlocked.Decrement(ref _currentChannelCount); // failed to create
3350
}
3451
catch (Exception ex)
3552
{
36-
var ExceptionMessage = ex.Message;
37-
_logger.LogError(ex, "ChannelProvider GetChannel Exception: {ExceptionMessage}", ExceptionMessage);
53+
_logger.LogError(ex, "Error creating RabbitMQ channel.");
54+
Interlocked.Decrement(ref _currentChannelCount); // failed to create
3855
}
3956
}
57+
else
58+
{
59+
Interlocked.Decrement(ref _currentChannelCount); // revert increment since max reached
60+
_logger.LogWarning("Max channel count reached ({MaxChannels}). Cannot create new channel.", _maxChannels);
61+
}
4062

41-
return _model;
63+
return null;
4264
}
4365

44-
public void Dispose()
66+
public void ReturnChannel(IModel channel)
67+
{
68+
if (_disposed)
69+
{
70+
DisposeChannel(channel);
71+
return;
72+
}
73+
74+
if (channel.IsOpen)
75+
_channelPool.Enqueue(channel);
76+
else
77+
DisposeChannel(channel);
78+
}
79+
80+
private void DisposeChannel(IModel channel)
4581
{
4682
try
4783
{
48-
if (_model != null)
49-
{
50-
_model.Close();
51-
_model.Dispose();
52-
}
84+
if (channel.IsOpen)
85+
channel.Close();
5386
}
5487
catch (Exception ex)
5588
{
56-
_logger.LogCritical(ex, "Cannot dispose RabbitMq channel or connection");
89+
_logger.LogWarning(ex, "Exception while closing RabbitMQ channel.");
90+
}
91+
92+
try
93+
{
94+
channel.Dispose();
95+
}
96+
catch (Exception ex)
97+
{
98+
_logger.LogWarning(ex, "Exception while disposing RabbitMQ channel.");
99+
}
100+
101+
Interlocked.Decrement(ref _currentChannelCount);
102+
}
103+
104+
private void ThrowIfDisposed()
105+
{
106+
if (_disposed)
107+
throw new ObjectDisposedException(nameof(ChannelProvider));
108+
}
109+
110+
public void Dispose()
111+
{
112+
if (_disposed)
113+
return;
114+
115+
_disposed = true;
116+
117+
while (_channelPool.TryDequeue(out var channel))
118+
{
119+
DisposeChannel(channel);
57120
}
58121
}
59122
}
60-
}
123+
}

applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/Interfaces/IChannelProvider.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@ namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces
66
public interface IChannelProvider : IDisposable
77
{
88
IModel? GetChannel();
9+
void ReturnChannel(IModel channel);
910
}
1011
}

applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/Interfaces/IQueueChannelProvider.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
1-
namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces
1+
using System;
2+
using RabbitMQ.Client;
3+
4+
namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces
25
{
36
/// <summary>
4-
/// A channel provider that Declares and Binds a specific queue
7+
/// Provides a RabbitMQ channel that declares and binds a specific queue and its dead-letter queue.
58
/// </summary>
6-
#pragma warning disable S2326
7-
public interface IQueueChannelProvider<in TQueueMessage> : IChannelProvider where TQueueMessage : IQueueMessage
9+
public interface IQueueChannelProvider<TQueueMessage> : IDisposable where TQueueMessage : IQueueMessage
810
{
11+
12+
/// <summary>
13+
/// Gets a channel for publishing or consuming messages of the specified type.
14+
/// </summary>
15+
IModel GetChannel();
916
}
10-
#pragma warning restore S2326
1117
}
12-
Lines changed: 133 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,81 +1,174 @@
11
using RabbitMQ.Client;
22
using System;
33
using System.Collections.Generic;
4+
using Microsoft.Extensions.Logging;
45
using Unity.Modules.Shared.MessageBrokers.RabbitMQ.Constants;
56
using Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces;
7+
using System.Threading;
68

79
namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ
810
{
9-
public class QueueChannelProvider<TQueueMessage> : IQueueChannelProvider<TQueueMessage> where TQueueMessage : IQueueMessage
11+
public class QueueChannelProvider<TQueueMessage>(IChannelProvider channelProvider, ILogger<QueueChannelProvider<TQueueMessage>> logger) : IQueueChannelProvider<TQueueMessage>
12+
where TQueueMessage : IQueueMessage
1013
{
11-
private readonly IChannelProvider _channelProvider;
14+
private readonly IChannelProvider _channelProvider = channelProvider ?? throw new ArgumentNullException(nameof(channelProvider));
15+
private readonly ILogger<QueueChannelProvider<TQueueMessage>> _logger = logger ?? throw new ArgumentNullException(nameof(logger));
16+
private readonly Lock _lock = new();
1217
private IModel? _channel;
13-
private bool disposedValue;
14-
private readonly string _queueName;
18+
private bool _disposed;
19+
private bool _queuesDeclared;
20+
private readonly string _queueName = typeof(TQueueMessage).Name;
1521

16-
public QueueChannelProvider(
17-
IChannelProvider channelProvider)
22+
public IModel GetChannel()
1823
{
19-
_channelProvider = channelProvider;
20-
_queueName = typeof(TQueueMessage).Name;
24+
ObjectDisposedException.ThrowIf(_disposed, typeof(QueueChannelProvider<TQueueMessage>));
25+
26+
lock (_lock)
27+
{
28+
if (_channel == null || !_channel.IsOpen)
29+
{
30+
_channel?.Dispose();
31+
_channel = _channelProvider.GetChannel();
32+
_queuesDeclared = false;
33+
}
34+
35+
if (_channel == null || !_channel.IsOpen)
36+
throw new InvalidOperationException("Failed to get a valid RabbitMQ channel");
37+
38+
if (!_queuesDeclared)
39+
{
40+
DeclareQueueAndDeadLetter(_channel);
41+
_queuesDeclared = true;
42+
}
43+
44+
return _channel;
45+
}
2146
}
2247

23-
public IModel? GetChannel()
48+
private void DeclareQueueAndDeadLetter(IModel channel)
2449
{
25-
_channel = _channelProvider?.GetChannel();
26-
DeclareQueueAndDeadLetter();
27-
return _channel;
50+
try
51+
{
52+
// First, try to declare the queue as passive to check if it exists
53+
try
54+
{
55+
channel.QueueDeclarePassive(_queueName);
56+
// Queue exists and is compatible, just declare exchange and binding
57+
DeclareCompatibleQueue(channel);
58+
return;
59+
}
60+
catch (global::RabbitMQ.Client.Exceptions.OperationInterruptedException ex)
61+
{
62+
// The channel is now closed. Get a new one immediately.
63+
_channel?.Dispose();
64+
_channel = _channelProvider.GetChannel();
65+
channel = _channel ?? throw new InvalidOperationException("Failed to get a new RabbitMQ channel after an error.");
66+
67+
// Check the reason for the exception
68+
if (ex.ShutdownReason.ReplyCode == 404)
69+
{
70+
// Queue not found, declare it with the full dead-letter configuration
71+
DeclareQueueWithDeadLetter(channel);
72+
return;
73+
}
74+
if (ex.ShutdownReason.ReplyText.Contains("inequivalent arg"))
75+
{
76+
_logger.LogDebug("Queue {QueueName} exists with incompatible configuration, falling back to compatibility mode.", _queueName);
77+
DeclareCompatibleQueue(channel);
78+
return;
79+
}
80+
81+
// Re-throw any other exceptions
82+
throw;
83+
}
84+
}
85+
catch (Exception ex)
86+
{
87+
throw new InvalidOperationException($"Failed to declare queues for {_queueName}", ex);
88+
}
2889
}
2990

30-
private void DeclareQueueAndDeadLetter()
91+
private void DeclareQueueWithDeadLetter(IModel channel)
3192
{
32-
var deadLetterQueueName = $"{_queueName}{QueueingConstants.DeadletterAddition}";
93+
var dlxName = $"{_queueName}.dlx";
94+
var dlqName = $"{_queueName}{QueueingConstants.DeadletterAddition}";
95+
var mainExchange = $"{_queueName}.exchange";
3396

34-
// Declare the DeadLetter Queue
35-
var deadLetterQueueArgs = new Dictionary<string, object>
97+
channel.ExchangeDeclare(dlxName, ExchangeType.Direct, durable: true);
98+
99+
var dlqArgs = new Dictionary<string, object>
36100
{
37-
{ "x-queue-type", "quorum" },
38-
{ "overflow", "reject-publish" } // If the queue is full, reject the publish
101+
{ "x-queue-type", "quorum" },
102+
{ "x-overflow", "reject-publish" }
39103
};
40-
if(_channel == null) return;
41104

42-
_channel.ExchangeDeclare(deadLetterQueueName, ExchangeType.Direct);
43-
_channel.QueueDeclare(deadLetterQueueName, true, false, false, deadLetterQueueArgs);
44-
_channel.QueueBind(deadLetterQueueName, deadLetterQueueName, deadLetterQueueName, null);
105+
channel.QueueDeclare(dlqName, durable: true, exclusive: false, autoDelete: false, arguments: dlqArgs);
106+
channel.QueueBind(dlqName, dlxName, dlqName);
107+
108+
channel.ExchangeDeclare(mainExchange, ExchangeType.Direct, durable: true);
45109

46-
// Declare the Queue
47-
var queueArgs = new Dictionary<string, object>
110+
var mainQArgs = new Dictionary<string, object>
48111
{
49-
{ "x-dead-letter-exchange", deadLetterQueueName },
50-
{ "x-dead-letter-routing-key", deadLetterQueueName },
51112
{ "x-queue-type", "quorum" },
52-
{ "x-dead-letter-strategy", "at-least-once" }, // Ensure that deadletter messages are delivered in any case see: https://www.rabbitmq.com/quorum-queues.html#dead-lettering
53-
{ "overflow", "reject-publish" } // If the queue is full, reject the publish
113+
{ "x-overflow", "reject-publish" },
114+
{ "x-dead-letter-exchange", dlxName },
115+
{ "x-dead-letter-routing-key", dlqName },
116+
{ "x-dead-letter-strategy", "at-least-once" },
117+
{ "x-delivery-limit", 10 }
54118
};
55119

56-
_channel.ExchangeDeclare(_queueName, ExchangeType.Direct);
57-
_channel.QueueDeclare(_queueName, true, false, false, queueArgs);
58-
_channel.QueueBind(_queueName, _queueName, _queueName, null);
120+
channel.QueueDeclare(_queueName, durable: true, exclusive: false, autoDelete: false, arguments: mainQArgs);
121+
channel.QueueBind(_queueName, mainExchange, _queueName);
59122
}
60123

61-
protected virtual void Dispose(bool disposing)
124+
private void DeclareCompatibleQueue(IModel channel)
62125
{
63-
if (!disposedValue)
126+
var mainExchange = $"{_queueName}.exchange";
127+
128+
try
64129
{
65-
if (disposing)
66-
{
67-
// dispose managed state (managed objects)
68-
}
130+
channel.ExchangeDeclare(mainExchange, ExchangeType.Direct, durable: true);
131+
channel.QueueBind(_queueName, mainExchange, _queueName);
69132

70-
disposedValue = true;
133+
_logger.LogWarning("Queue {QueueName} exists with incompatible configuration. Running in compatibility mode without dead letter support.", _queueName);
134+
}
135+
catch (Exception ex)
136+
{
137+
throw new InvalidOperationException(
138+
$"Failed to declare queue {_queueName} in compatibility mode. " +
139+
"The existing queue has incompatible configuration and cannot be used.", ex);
71140
}
72141
}
73142

74143
public void Dispose()
75144
{
76-
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
77-
Dispose(disposing: true);
145+
Dispose(true);
78146
GC.SuppressFinalize(this);
79147
}
148+
149+
protected virtual void Dispose(bool disposing)
150+
{
151+
if (_disposed) return;
152+
153+
if (disposing)
154+
{
155+
if (_channel != null && _channelProvider != null)
156+
{
157+
try
158+
{
159+
_channelProvider.ReturnChannel(_channel);
160+
}
161+
catch
162+
{
163+
_channel?.Dispose();
164+
}
165+
}
166+
}
167+
168+
_disposed = true;
169+
_channel = null;
170+
}
171+
172+
public string QueueName => _queueName;
80173
}
81174
}

0 commit comments

Comments
 (0)