Skip to content
Merged

Dev #1658

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,60 +1,123 @@

using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using System;
using System.Collections.Concurrent;
using System.Threading;
using Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces;

namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ
{
public sealed class ChannelProvider : IChannelProvider
public sealed class ChannelProvider : IChannelProvider, IDisposable
{
private readonly IConnectionProvider _connectionProvider;
private readonly ILogger<ChannelProvider> _logger;
private IModel? _model;
private readonly int _maxChannels;
private readonly ConcurrentQueue<IModel> _channelPool = new();
private int _currentChannelCount;
private bool _disposed;

public ChannelProvider(
IConnectionProvider connectionProvider,
ILogger<ChannelProvider> logger)
public ChannelProvider(IConnectionProvider connectionProvider, ILogger<ChannelProvider> logger, int maxChannels = 10000)
{
_connectionProvider = connectionProvider;
_logger = logger;
_connectionProvider = connectionProvider ?? throw new ArgumentNullException(nameof(connectionProvider));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_maxChannels = maxChannels;
}

public IModel? GetChannel()
{
if (_model == null || !_model.IsOpen && _connectionProvider != null)
ThrowIfDisposed();

// Try to reuse a channel from the pool
while (_channelPool.TryDequeue(out var channel))
{
if (channel.IsOpen)
return channel;

DisposeChannel(channel);
}

// Try to create a new channel if we haven't reached max
if (Interlocked.Increment(ref _currentChannelCount) <= _maxChannels)
{
try
{
IConnection? connection = _connectionProvider.GetConnection();
if (connection != null) {
_model = connection.CreateModel();
}
var connection = _connectionProvider.GetConnection();
if (connection != null && connection.IsOpen)
return connection.CreateModel();

_logger.LogWarning("RabbitMQ connection is not open.");
Interlocked.Decrement(ref _currentChannelCount); // failed to create
}
catch (Exception ex)
{
var ExceptionMessage = ex.Message;
_logger.LogError(ex, "ChannelProvider GetChannel Exception: {ExceptionMessage}", ExceptionMessage);
_logger.LogError(ex, "Error creating RabbitMQ channel.");
Interlocked.Decrement(ref _currentChannelCount); // failed to create
}
}
else
{
Interlocked.Decrement(ref _currentChannelCount); // revert increment since max reached
_logger.LogWarning("Max channel count reached ({MaxChannels}). Cannot create new channel.", _maxChannels);
}

return _model;
return null;
}

public void Dispose()
public void ReturnChannel(IModel channel)
{
if (_disposed)
{
DisposeChannel(channel);
return;
}

if (channel.IsOpen)
_channelPool.Enqueue(channel);
else
DisposeChannel(channel);
}

private void DisposeChannel(IModel channel)
{
try
{
if (_model != null)
{
_model.Close();
_model.Dispose();
}
if (channel.IsOpen)
channel.Close();
}
catch (Exception ex)
{
_logger.LogCritical(ex, "Cannot dispose RabbitMq channel or connection");
_logger.LogWarning(ex, "Exception while closing RabbitMQ channel.");
}

try
{
channel.Dispose();
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Exception while disposing RabbitMQ channel.");
}

Interlocked.Decrement(ref _currentChannelCount);
}

private void ThrowIfDisposed()
{
if (_disposed)
throw new ObjectDisposedException(nameof(ChannelProvider));
}

public void Dispose()
{
if (_disposed)
return;

_disposed = true;

while (_channelPool.TryDequeue(out var channel))
{
DisposeChannel(channel);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces
public interface IChannelProvider : IDisposable
{
IModel? GetChannel();
void ReturnChannel(IModel channel);
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces
using System;
using RabbitMQ.Client;

namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces
{
/// <summary>
/// A channel provider that Declares and Binds a specific queue
/// Provides a RabbitMQ channel that declares and binds a specific queue and its dead-letter queue.
/// </summary>
#pragma warning disable S2326
public interface IQueueChannelProvider<in TQueueMessage> : IChannelProvider where TQueueMessage : IQueueMessage
public interface IQueueChannelProvider<TQueueMessage> : IDisposable where TQueueMessage : IQueueMessage
{

/// <summary>
/// Gets a channel for publishing or consuming messages of the specified type.
/// </summary>
IModel GetChannel();
}
#pragma warning restore S2326
}

Original file line number Diff line number Diff line change
@@ -1,81 +1,174 @@
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using Unity.Modules.Shared.MessageBrokers.RabbitMQ.Constants;
using Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces;
using System.Threading;

namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ
{
public class QueueChannelProvider<TQueueMessage> : IQueueChannelProvider<TQueueMessage> where TQueueMessage : IQueueMessage
public class QueueChannelProvider<TQueueMessage>(IChannelProvider channelProvider, ILogger<QueueChannelProvider<TQueueMessage>> logger) : IQueueChannelProvider<TQueueMessage>
where TQueueMessage : IQueueMessage
{
private readonly IChannelProvider _channelProvider;
private readonly IChannelProvider _channelProvider = channelProvider ?? throw new ArgumentNullException(nameof(channelProvider));
private readonly ILogger<QueueChannelProvider<TQueueMessage>> _logger = logger ?? throw new ArgumentNullException(nameof(logger));
private readonly Lock _lock = new();
private IModel? _channel;
private bool disposedValue;
private readonly string _queueName;
private bool _disposed;
private bool _queuesDeclared;
private readonly string _queueName = typeof(TQueueMessage).Name;

public QueueChannelProvider(
IChannelProvider channelProvider)
public IModel GetChannel()
{
_channelProvider = channelProvider;
_queueName = typeof(TQueueMessage).Name;
ObjectDisposedException.ThrowIf(_disposed, typeof(QueueChannelProvider<TQueueMessage>));

lock (_lock)
{
if (_channel == null || !_channel.IsOpen)
{
_channel?.Dispose();
_channel = _channelProvider.GetChannel();
_queuesDeclared = false;
}

if (_channel == null || !_channel.IsOpen)
throw new InvalidOperationException("Failed to get a valid RabbitMQ channel");

if (!_queuesDeclared)
{
DeclareQueueAndDeadLetter(_channel);
_queuesDeclared = true;
}

return _channel;
}
}

public IModel? GetChannel()
private void DeclareQueueAndDeadLetter(IModel channel)
{
_channel = _channelProvider?.GetChannel();
DeclareQueueAndDeadLetter();
return _channel;
try
{
// First, try to declare the queue as passive to check if it exists
try
{
channel.QueueDeclarePassive(_queueName);
// Queue exists and is compatible, just declare exchange and binding
DeclareCompatibleQueue(channel);
return;
}
catch (global::RabbitMQ.Client.Exceptions.OperationInterruptedException ex)
{
// The channel is now closed. Get a new one immediately.
_channel?.Dispose();
_channel = _channelProvider.GetChannel();
channel = _channel ?? throw new InvalidOperationException("Failed to get a new RabbitMQ channel after an error.");

// Check the reason for the exception
if (ex.ShutdownReason.ReplyCode == 404)
{
// Queue not found, declare it with the full dead-letter configuration
DeclareQueueWithDeadLetter(channel);
return;
}
if (ex.ShutdownReason.ReplyText.Contains("inequivalent arg"))
{
_logger.LogDebug("Queue {QueueName} exists with incompatible configuration, falling back to compatibility mode.", _queueName);
DeclareCompatibleQueue(channel);
return;
}

// Re-throw any other exceptions
throw;
}
}
catch (Exception ex)
{
throw new InvalidOperationException($"Failed to declare queues for {_queueName}", ex);
}
}

private void DeclareQueueAndDeadLetter()
private void DeclareQueueWithDeadLetter(IModel channel)
{
var deadLetterQueueName = $"{_queueName}{QueueingConstants.DeadletterAddition}";
var dlxName = $"{_queueName}.dlx";
var dlqName = $"{_queueName}{QueueingConstants.DeadletterAddition}";
var mainExchange = $"{_queueName}.exchange";

// Declare the DeadLetter Queue
var deadLetterQueueArgs = new Dictionary<string, object>
channel.ExchangeDeclare(dlxName, ExchangeType.Direct, durable: true);

var dlqArgs = new Dictionary<string, object>
{
{ "x-queue-type", "quorum" },
{ "overflow", "reject-publish" } // If the queue is full, reject the publish
{ "x-queue-type", "quorum" },
{ "x-overflow", "reject-publish" }
};
if(_channel == null) return;

_channel.ExchangeDeclare(deadLetterQueueName, ExchangeType.Direct);
_channel.QueueDeclare(deadLetterQueueName, true, false, false, deadLetterQueueArgs);
_channel.QueueBind(deadLetterQueueName, deadLetterQueueName, deadLetterQueueName, null);
channel.QueueDeclare(dlqName, durable: true, exclusive: false, autoDelete: false, arguments: dlqArgs);
channel.QueueBind(dlqName, dlxName, dlqName);

channel.ExchangeDeclare(mainExchange, ExchangeType.Direct, durable: true);

// Declare the Queue
var queueArgs = new Dictionary<string, object>
var mainQArgs = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", deadLetterQueueName },
{ "x-dead-letter-routing-key", deadLetterQueueName },
{ "x-queue-type", "quorum" },
{ "x-dead-letter-strategy", "at-least-once" }, // Ensure that deadletter messages are delivered in any case see: https://www.rabbitmq.com/quorum-queues.html#dead-lettering
{ "overflow", "reject-publish" } // If the queue is full, reject the publish
{ "x-overflow", "reject-publish" },
{ "x-dead-letter-exchange", dlxName },
{ "x-dead-letter-routing-key", dlqName },
{ "x-dead-letter-strategy", "at-least-once" },
{ "x-delivery-limit", 10 }
};

_channel.ExchangeDeclare(_queueName, ExchangeType.Direct);
_channel.QueueDeclare(_queueName, true, false, false, queueArgs);
_channel.QueueBind(_queueName, _queueName, _queueName, null);
channel.QueueDeclare(_queueName, durable: true, exclusive: false, autoDelete: false, arguments: mainQArgs);
channel.QueueBind(_queueName, mainExchange, _queueName);
}

protected virtual void Dispose(bool disposing)
private void DeclareCompatibleQueue(IModel channel)
{
if (!disposedValue)
var mainExchange = $"{_queueName}.exchange";

try
{
if (disposing)
{
// dispose managed state (managed objects)
}
channel.ExchangeDeclare(mainExchange, ExchangeType.Direct, durable: true);
channel.QueueBind(_queueName, mainExchange, _queueName);

disposedValue = true;
_logger.LogWarning("Queue {QueueName} exists with incompatible configuration. Running in compatibility mode without dead letter support.", _queueName);
}
catch (Exception ex)
{
throw new InvalidOperationException(
$"Failed to declare queue {_queueName} in compatibility mode. " +
"The existing queue has incompatible configuration and cannot be used.", ex);
}
}

public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
Dispose(true);
GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
if (_disposed) return;

if (disposing)
{
if (_channel != null && _channelProvider != null)
{
try
{
_channelProvider.ReturnChannel(_channel);
}
catch
{
_channel?.Dispose();
}
}
}

_disposed = true;
_channel = null;
}

public string QueueName => _queueName;
}
}
Loading
Loading