diff --git a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/ChannelProvider.cs b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/ChannelProvider.cs index 856eb20b8..85cdd6004 100644 --- a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/ChannelProvider.cs +++ b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/ChannelProvider.cs @@ -1,60 +1,123 @@ - using Microsoft.Extensions.Logging; using RabbitMQ.Client; using System; +using System.Collections.Concurrent; +using System.Threading; using Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces; namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ { - public sealed class ChannelProvider : IChannelProvider + public sealed class ChannelProvider : IChannelProvider, IDisposable { private readonly IConnectionProvider _connectionProvider; private readonly ILogger _logger; - private IModel? _model; + private readonly int _maxChannels; + private readonly ConcurrentQueue _channelPool = new(); + private int _currentChannelCount; + private bool _disposed; - public ChannelProvider( - IConnectionProvider connectionProvider, - ILogger logger) + public ChannelProvider(IConnectionProvider connectionProvider, ILogger logger, int maxChannels = 10000) { - _connectionProvider = connectionProvider; - _logger = logger; + _connectionProvider = connectionProvider ?? throw new ArgumentNullException(nameof(connectionProvider)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _maxChannels = maxChannels; } public IModel? GetChannel() { - if (_model == null || !_model.IsOpen && _connectionProvider != null) + ThrowIfDisposed(); + + // Try to reuse a channel from the pool + while (_channelPool.TryDequeue(out var channel)) + { + if (channel.IsOpen) + return channel; + + DisposeChannel(channel); + } + + // Try to create a new channel if we haven't reached max + if (Interlocked.Increment(ref _currentChannelCount) <= _maxChannels) { try { - IConnection? connection = _connectionProvider.GetConnection(); - if (connection != null) { - _model = connection.CreateModel(); - } + var connection = _connectionProvider.GetConnection(); + if (connection != null && connection.IsOpen) + return connection.CreateModel(); + + _logger.LogWarning("RabbitMQ connection is not open."); + Interlocked.Decrement(ref _currentChannelCount); // failed to create } catch (Exception ex) { - var ExceptionMessage = ex.Message; - _logger.LogError(ex, "ChannelProvider GetChannel Exception: {ExceptionMessage}", ExceptionMessage); + _logger.LogError(ex, "Error creating RabbitMQ channel."); + Interlocked.Decrement(ref _currentChannelCount); // failed to create } } + else + { + Interlocked.Decrement(ref _currentChannelCount); // revert increment since max reached + _logger.LogWarning("Max channel count reached ({MaxChannels}). Cannot create new channel.", _maxChannels); + } - return _model; + return null; } - public void Dispose() + public void ReturnChannel(IModel channel) + { + if (_disposed) + { + DisposeChannel(channel); + return; + } + + if (channel.IsOpen) + _channelPool.Enqueue(channel); + else + DisposeChannel(channel); + } + + private void DisposeChannel(IModel channel) { try { - if (_model != null) - { - _model.Close(); - _model.Dispose(); - } + if (channel.IsOpen) + channel.Close(); } catch (Exception ex) { - _logger.LogCritical(ex, "Cannot dispose RabbitMq channel or connection"); + _logger.LogWarning(ex, "Exception while closing RabbitMQ channel."); + } + + try + { + channel.Dispose(); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Exception while disposing RabbitMQ channel."); + } + + Interlocked.Decrement(ref _currentChannelCount); + } + + private void ThrowIfDisposed() + { + if (_disposed) + throw new ObjectDisposedException(nameof(ChannelProvider)); + } + + public void Dispose() + { + if (_disposed) + return; + + _disposed = true; + + while (_channelPool.TryDequeue(out var channel)) + { + DisposeChannel(channel); } } } -} \ No newline at end of file +} diff --git a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/Interfaces/IChannelProvider.cs b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/Interfaces/IChannelProvider.cs index cc7160b84..a57ca8acf 100644 --- a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/Interfaces/IChannelProvider.cs +++ b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/Interfaces/IChannelProvider.cs @@ -6,5 +6,6 @@ namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces public interface IChannelProvider : IDisposable { IModel? GetChannel(); + void ReturnChannel(IModel channel); } } diff --git a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/Interfaces/IQueueChannelProvider.cs b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/Interfaces/IQueueChannelProvider.cs index 5418b4a1b..9eaaa9646 100644 --- a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/Interfaces/IQueueChannelProvider.cs +++ b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/Interfaces/IQueueChannelProvider.cs @@ -1,12 +1,17 @@ -namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces +using System; +using RabbitMQ.Client; + +namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces { /// - /// A channel provider that Declares and Binds a specific queue + /// Provides a RabbitMQ channel that declares and binds a specific queue and its dead-letter queue. /// -#pragma warning disable S2326 - public interface IQueueChannelProvider : IChannelProvider where TQueueMessage : IQueueMessage + public interface IQueueChannelProvider : IDisposable where TQueueMessage : IQueueMessage { + + /// + /// Gets a channel for publishing or consuming messages of the specified type. + /// + IModel GetChannel(); } -#pragma warning restore S2326 } - diff --git a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueChannelProvider.cs b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueChannelProvider.cs index 8d073da15..17760fc0d 100644 --- a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueChannelProvider.cs +++ b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueChannelProvider.cs @@ -1,81 +1,174 @@ using RabbitMQ.Client; using System; using System.Collections.Generic; +using Microsoft.Extensions.Logging; using Unity.Modules.Shared.MessageBrokers.RabbitMQ.Constants; using Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces; +using System.Threading; namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ { - public class QueueChannelProvider : IQueueChannelProvider where TQueueMessage : IQueueMessage + public class QueueChannelProvider(IChannelProvider channelProvider, ILogger> logger) : IQueueChannelProvider + where TQueueMessage : IQueueMessage { - private readonly IChannelProvider _channelProvider; + private readonly IChannelProvider _channelProvider = channelProvider ?? throw new ArgumentNullException(nameof(channelProvider)); + private readonly ILogger> _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + private readonly Lock _lock = new(); private IModel? _channel; - private bool disposedValue; - private readonly string _queueName; + private bool _disposed; + private bool _queuesDeclared; + private readonly string _queueName = typeof(TQueueMessage).Name; - public QueueChannelProvider( - IChannelProvider channelProvider) + public IModel GetChannel() { - _channelProvider = channelProvider; - _queueName = typeof(TQueueMessage).Name; + ObjectDisposedException.ThrowIf(_disposed, typeof(QueueChannelProvider)); + + lock (_lock) + { + if (_channel == null || !_channel.IsOpen) + { + _channel?.Dispose(); + _channel = _channelProvider.GetChannel(); + _queuesDeclared = false; + } + + if (_channel == null || !_channel.IsOpen) + throw new InvalidOperationException("Failed to get a valid RabbitMQ channel"); + + if (!_queuesDeclared) + { + DeclareQueueAndDeadLetter(_channel); + _queuesDeclared = true; + } + + return _channel; + } } - public IModel? GetChannel() + private void DeclareQueueAndDeadLetter(IModel channel) { - _channel = _channelProvider?.GetChannel(); - DeclareQueueAndDeadLetter(); - return _channel; + try + { + // First, try to declare the queue as passive to check if it exists + try + { + channel.QueueDeclarePassive(_queueName); + // Queue exists and is compatible, just declare exchange and binding + DeclareCompatibleQueue(channel); + return; + } + catch (global::RabbitMQ.Client.Exceptions.OperationInterruptedException ex) + { + // The channel is now closed. Get a new one immediately. + _channel?.Dispose(); + _channel = _channelProvider.GetChannel(); + channel = _channel ?? throw new InvalidOperationException("Failed to get a new RabbitMQ channel after an error."); + + // Check the reason for the exception + if (ex.ShutdownReason.ReplyCode == 404) + { + // Queue not found, declare it with the full dead-letter configuration + DeclareQueueWithDeadLetter(channel); + return; + } + if (ex.ShutdownReason.ReplyText.Contains("inequivalent arg")) + { + _logger.LogDebug("Queue {QueueName} exists with incompatible configuration, falling back to compatibility mode.", _queueName); + DeclareCompatibleQueue(channel); + return; + } + + // Re-throw any other exceptions + throw; + } + } + catch (Exception ex) + { + throw new InvalidOperationException($"Failed to declare queues for {_queueName}", ex); + } } - private void DeclareQueueAndDeadLetter() + private void DeclareQueueWithDeadLetter(IModel channel) { - var deadLetterQueueName = $"{_queueName}{QueueingConstants.DeadletterAddition}"; + var dlxName = $"{_queueName}.dlx"; + var dlqName = $"{_queueName}{QueueingConstants.DeadletterAddition}"; + var mainExchange = $"{_queueName}.exchange"; - // Declare the DeadLetter Queue - var deadLetterQueueArgs = new Dictionary + channel.ExchangeDeclare(dlxName, ExchangeType.Direct, durable: true); + + var dlqArgs = new Dictionary { - { "x-queue-type", "quorum" }, - { "overflow", "reject-publish" } // If the queue is full, reject the publish + { "x-queue-type", "quorum" }, + { "x-overflow", "reject-publish" } }; - if(_channel == null) return; - _channel.ExchangeDeclare(deadLetterQueueName, ExchangeType.Direct); - _channel.QueueDeclare(deadLetterQueueName, true, false, false, deadLetterQueueArgs); - _channel.QueueBind(deadLetterQueueName, deadLetterQueueName, deadLetterQueueName, null); + channel.QueueDeclare(dlqName, durable: true, exclusive: false, autoDelete: false, arguments: dlqArgs); + channel.QueueBind(dlqName, dlxName, dlqName); + + channel.ExchangeDeclare(mainExchange, ExchangeType.Direct, durable: true); - // Declare the Queue - var queueArgs = new Dictionary + var mainQArgs = new Dictionary { - { "x-dead-letter-exchange", deadLetterQueueName }, - { "x-dead-letter-routing-key", deadLetterQueueName }, { "x-queue-type", "quorum" }, - { "x-dead-letter-strategy", "at-least-once" }, // Ensure that deadletter messages are delivered in any case see: https://www.rabbitmq.com/quorum-queues.html#dead-lettering - { "overflow", "reject-publish" } // If the queue is full, reject the publish + { "x-overflow", "reject-publish" }, + { "x-dead-letter-exchange", dlxName }, + { "x-dead-letter-routing-key", dlqName }, + { "x-dead-letter-strategy", "at-least-once" }, + { "x-delivery-limit", 10 } }; - _channel.ExchangeDeclare(_queueName, ExchangeType.Direct); - _channel.QueueDeclare(_queueName, true, false, false, queueArgs); - _channel.QueueBind(_queueName, _queueName, _queueName, null); + channel.QueueDeclare(_queueName, durable: true, exclusive: false, autoDelete: false, arguments: mainQArgs); + channel.QueueBind(_queueName, mainExchange, _queueName); } - protected virtual void Dispose(bool disposing) + private void DeclareCompatibleQueue(IModel channel) { - if (!disposedValue) + var mainExchange = $"{_queueName}.exchange"; + + try { - if (disposing) - { - // dispose managed state (managed objects) - } + channel.ExchangeDeclare(mainExchange, ExchangeType.Direct, durable: true); + channel.QueueBind(_queueName, mainExchange, _queueName); - disposedValue = true; + _logger.LogWarning("Queue {QueueName} exists with incompatible configuration. Running in compatibility mode without dead letter support.", _queueName); + } + catch (Exception ex) + { + throw new InvalidOperationException( + $"Failed to declare queue {_queueName} in compatibility mode. " + + "The existing queue has incompatible configuration and cannot be used.", ex); } } public void Dispose() { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: true); + Dispose(true); GC.SuppressFinalize(this); } + + protected virtual void Dispose(bool disposing) + { + if (_disposed) return; + + if (disposing) + { + if (_channel != null && _channelProvider != null) + { + try + { + _channelProvider.ReturnChannel(_channel); + } + catch + { + _channel?.Dispose(); + } + } + } + + _disposed = true; + _channel = null; + } + + public string QueueName => _queueName; } } diff --git a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueConsumerHandler.cs b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueConsumerHandler.cs index 834df3fb5..6ef4fc049 100644 --- a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueConsumerHandler.cs +++ b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueConsumerHandler.cs @@ -11,137 +11,133 @@ namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ { - public class QueueConsumerHandler : IQueueConsumerHandler where TMessageConsumer : IQueueConsumer where TQueueMessage : class, IQueueMessage + public class QueueConsumerHandler + : IQueueConsumerHandler + where TMessageConsumer : IQueueConsumer + where TQueueMessage : class, IQueueMessage { private readonly IServiceProvider _serviceProvider; private readonly ILogger> _logger; private readonly string _queueName; - private IModel? _consumerRegistrationChannel; + private IModel? _consumerChannel; private string? _consumerTag; private readonly string _consumerName; - public QueueConsumerHandler(IServiceProvider serviceProvider, + public QueueConsumerHandler( + IServiceProvider serviceProvider, ILogger> logger) { - _serviceProvider = serviceProvider; - _logger = logger; - + _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _queueName = typeof(TQueueMessage).Name; _consumerName = typeof(TMessageConsumer).Name; } public void RegisterQueueConsumer() { - _logger.LogInformation("Registering {_consumerName} as a consumer for Queue {_queueName}", _consumerName, _queueName); + _logger.LogInformation("Registering {Consumer} as a consumer for Queue {Queue}", _consumerName, _queueName); var scope = _serviceProvider.CreateScope(); + _consumerChannel = scope.ServiceProvider + .GetRequiredService>() + .GetChannel(); - // Request a channel for registering the Consumer for this Queue - _consumerRegistrationChannel = scope.ServiceProvider.GetRequiredService>().GetChannel(); - if(_consumerRegistrationChannel != null ) + if (_consumerChannel == null) { - var consumer = new AsyncEventingBasicConsumer(_consumerRegistrationChannel); + throw new QueueingException($"Failed to create consumer channel for {_queueName}"); + } - // Register the trigger - consumer.Received += HandleMessage; - try - { - _consumerTag = _consumerRegistrationChannel.BasicConsume(_queueName, false, consumer); - } - catch (Exception ex) - { - var RegisterExceptionMessage = $"BasicConsume failed for Queue '{_queueName}'"; - _logger.LogError(ex, "QueueConsumerHandler - {RegisterExceptionMessage}", RegisterExceptionMessage); - throw new QueueingException(RegisterExceptionMessage); - } + var consumer = new AsyncEventingBasicConsumer(_consumerChannel); + consumer.Received += HandleMessage; - _logger.LogInformation("Succesfully registered {_consumerName} as a Consumer for Queue {_queueName}", _consumerName, _queueName); + try + { + _consumerTag = _consumerChannel.BasicConsume( + queue: _queueName, + autoAck: false, + consumer: consumer); + + _logger.LogInformation("Successfully registered {Consumer} as consumer for {Queue}", _consumerName, _queueName); + } + catch (Exception ex) + { + var msg = $"BasicConsume failed for Queue '{_queueName}'"; + _logger.LogError(ex, msg); + throw new QueueingException(msg, ex); } } public void CancelQueueConsumer() { - if(_consumerRegistrationChannel != null) { - _logger.LogInformation("Canceling QueueConsumer registration for {_consumerName}", _consumerName); - try - { - _consumerRegistrationChannel.BasicCancel(_consumerTag); - } - catch (Exception ex) - { - var CancelExceptionMessage = $"Error canceling QueueConsumer registration for {_consumerName}"; - _logger.LogError(ex, "QueueConsumerHandler Exception: {ExceptionMessage}", CancelExceptionMessage); - throw new QueueingException(CancelExceptionMessage, ex); - } + if (_consumerChannel == null || string.IsNullOrEmpty(_consumerTag)) + return; + + _logger.LogInformation("Canceling consumer {Consumer} for Queue {Queue}", _consumerName, _queueName); + + try + { + _consumerChannel.BasicCancel(_consumerTag); + } + catch (Exception ex) + { + var msg = $"Error canceling consumer {_consumerName}"; + _logger.LogError(ex, msg); + throw new QueueingException(msg, ex); + } + finally + { + _consumerChannel.Close(); + _consumerChannel.Dispose(); + _consumerChannel = null; } } - private async Task HandleMessage(object ch, BasicDeliverEventArgs ea) + private async Task HandleMessage(object sender, BasicDeliverEventArgs ea) { - _logger.LogInformation("Received Message on Queue {_queueName}", _queueName); + _logger.LogInformation("Received message on {Queue}", _queueName); - // Create a new scope for handling the consumption of the queue message - var consumerScope = _serviceProvider.CreateScope(); - - // This is the channel on which the Queue message is delivered to the consumer - var consumingChannel = ((AsyncEventingBasicConsumer)ch).Model; + using var consumerScope = _serviceProvider.CreateScope(); + var consumingChannel = ((AsyncEventingBasicConsumer)sender).Model; IModel? producingChannel = null; try { - // Within this processing scope, we will open a new channel that will handle all messages produced within this consumer/scope. - // This is neccessairy to be able to commit them as a transaction - producingChannel = consumerScope.ServiceProvider.GetRequiredService() - .GetChannel(); + producingChannel = consumerScope.ServiceProvider.GetRequiredService().GetChannel(); + if (producingChannel == null) + throw new QueueingException("Failed to acquire producing channel"); - // Serialize the message var message = DeserializeMessage(ea.Body.ToArray()); - if (producingChannel != null && message != null) - { - - var MessageId = message.MessageId; - _logger.LogInformation("MessageID '{MessageId}'", MessageId); + if (message == null) + throw new QueueingException("Failed to deserialize message"); - // Start a transaction which will contain all messages produced by this consumer - producingChannel.TxSelect(); + _logger.LogInformation("Processing MessageId {MessageId}", message.MessageId); - // Request an instance of the consumer from the Service Provider - var consumerInstance = consumerScope.ServiceProvider.GetRequiredService(); + producingChannel.TxSelect(); - // Trigger the consumer to start processing the message - await consumerInstance.ConsumeAsync(message); + var consumerInstance = consumerScope.ServiceProvider.GetRequiredService(); - // Ensure both channels are open before committing - if (producingChannel.IsClosed || consumingChannel.IsClosed) - { - throw new QueueingException("A channel is closed during processing"); - } + await consumerInstance.ConsumeAsync(message); - // Commit the transaction of any messages produced within this consumer scope - producingChannel.TxCommit(); + if (producingChannel.IsClosed || consumingChannel.IsClosed) + throw new QueueingException("A channel is closed during processing"); - // Acknowledge successfull handling of the message - consumingChannel.BasicAck(ea.DeliveryTag, false); + producingChannel.TxCommit(); + consumingChannel.BasicAck(ea.DeliveryTag, multiple: false); - _logger.LogInformation("Message succesfully processed"); - } else - { - _logger.LogError("RegisterQueueConsumer: The Producer Channel was null"); - } + _logger.LogInformation("Message {MessageId} successfully processed", message.MessageId); + } + catch (JsonException jex) + { + _logger.LogError(jex, "Deserialization failed for message on {Queue}", _queueName); + consumingChannel.BasicReject(ea.DeliveryTag, requeue: false); } catch (Exception ex) { - var HandleMessageException = $"Cannot handle consumption of a {_queueName} by {_consumerName}'"; - _logger.LogError(ex, "QueueConsumerHandler Exception: {ExceptionMessage}", HandleMessageException); - if(producingChannel != null) + _logger.LogError(ex, "Error processing message on {Queue}", _queueName); + if (producingChannel != null) { RejectMessage(ea.DeliveryTag, consumingChannel, producingChannel); - } - } - finally - { - // Dispose the scope which ensures that all Channels that are created within the consumption process will be disposed - consumerScope.Dispose(); + } } } @@ -149,30 +145,22 @@ private void RejectMessage(ulong deliveryTag, IModel consumeChannel, IModel scop { try { - // The consumption process could fail before the scope channel is created - if (scopeChannel != null) - { - // Rollback any massages within the transaction - scopeChannel.TxRollback(); - _logger.LogInformation("Rollbacked the transaction"); - } + scopeChannel.TxRollback(); + _logger.LogInformation("Rolled back producing transaction"); - // Reject the message on the consumption channel - consumeChannel.BasicReject(deliveryTag, false); - - _logger.LogWarning("Rejected queue message"); + consumeChannel.BasicReject(deliveryTag, requeue: false); + _logger.LogWarning("Message rejected"); } - catch (Exception bex) + catch (Exception ex) { - var ExceptionMessage = $"BasicReject failed"; - _logger.LogError(bex, "QueueConsumerHandler Exception: {ExceptionMessage}", ExceptionMessage); + _logger.LogError(ex, "Error during message rejection"); } } - private static TQueueMessage? DeserializeMessage(byte[] message) + private static TQueueMessage? DeserializeMessage(byte[] body) { - var stringMessage = Encoding.UTF8.GetString(message); - return JsonConvert.DeserializeObject(stringMessage); + var json = Encoding.UTF8.GetString(body); + return JsonConvert.DeserializeObject(json); } } } diff --git a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueConsumerRegistratorService.cs b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueConsumerRegistratorService.cs index e0727bc2c..828b6101b 100644 --- a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueConsumerRegistratorService.cs +++ b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueConsumerRegistratorService.cs @@ -22,24 +22,45 @@ public QueueConsumerRegistratorService(ILogger>(); - _consumerHandler?.RegisterQueueConsumer(); - } - catch (Exception ex) + var maxRetries = 3; + var retryDelay = TimeSpan.FromSeconds(5); + + for (int attempt = 1; attempt <= maxRetries; attempt++) { - var MessageException = ex.Message; - _logger.LogError(ex, "QueueConsumerRegistratorService StartAsync {MessageException}", MessageException); - } + try + { + _logger.LogInformation("Registering consumer {ConsumerName} (attempt {Attempt}/{MaxRetries})", + typeof(TMessageConsumer).Name, attempt, maxRetries); - return Task.CompletedTask; + _scope = _serviceProvider.CreateScope(); + _consumerHandler = _scope.ServiceProvider.GetRequiredService>(); + _consumerHandler.RegisterQueueConsumer(); + + _logger.LogInformation("Successfully registered consumer {ConsumerName}", typeof(TMessageConsumer).Name); + return; + } + catch (Exception ex) when (attempt < maxRetries) + { + _logger.LogWarning(ex, "Failed to register consumer {ConsumerName} on attempt {Attempt}. Retrying in {Delay}s...", + typeof(TMessageConsumer).Name, attempt, retryDelay.TotalSeconds); + + _scope?.Dispose(); + _scope = null; + + await Task.Delay(retryDelay, cancellationToken); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to register consumer {ConsumerName} after {MaxRetries} attempts", + typeof(TMessageConsumer).Name, maxRetries); + + _scope?.Dispose(); + _scope = null; + throw; + } + } } public Task StopAsync(CancellationToken cancellationToken) @@ -60,7 +81,7 @@ public Task StopAsync(CancellationToken cancellationToken) var ExceptionMessage = ex.Message; _logger.LogError(ex, "QueueConsumerRegistratorService StopAsync Exception: {ExceptionMessage}", ExceptionMessage); } - + return Task.CompletedTask; } } diff --git a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueProducer.cs b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueProducer.cs index 2663c1293..08b38b783 100644 --- a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueProducer.cs +++ b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueProducer.cs @@ -1,55 +1,74 @@ -using System.Text; +using System; using System.Globalization; +using System.Text; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using RabbitMQ.Client; -using Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces; -using System; using Unity.Modules.Shared.MessageBrokers.RabbitMQ.Exceptions; +using Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces; namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ { - public class QueueProducer : IQueueProducer where TQueueMessage : IQueueMessage + public class QueueProducer : IQueueProducer + where TQueueMessage : IQueueMessage { private readonly ILogger> _logger; - private readonly string? _queueName; - private readonly IModel? _channel; + private readonly IQueueChannelProvider _channelProvider; + private readonly string _queueName; + private readonly string _exchangeName; - public QueueProducer(IQueueChannelProvider channelProvider, ILogger> logger) + public QueueProducer( + IQueueChannelProvider channelProvider, + ILogger> logger) { - _logger = logger; - - try{ - _channel = channelProvider?.GetChannel(); - _queueName = typeof(TQueueMessage).Name; - } catch (Exception ex) { - var ExceptionMessage = ex.Message; - _logger.LogError(ex, "QueueProducer Constructor issue: {ExceptionMessage}", ExceptionMessage); - } - + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _channelProvider = channelProvider ?? throw new ArgumentNullException(nameof(channelProvider)); + _queueName = typeof(TQueueMessage).Name; + _exchangeName = $"{_queueName}.exchange"; } public void PublishMessage(TQueueMessage message) { - if (Equals(message, default(TQueueMessage))) throw new ArgumentNullException(nameof(message)); - if (message.TimeToLive.Ticks <= 0) throw new QueueingException($"{nameof(message.TimeToLive)} cannot be zero or negative"); - if (_channel == null) throw new QueueingException("QueueProducer -> PublishMessage: Null Channel"); + if (message == null) + throw new ArgumentNullException(nameof(message)); + + if (message.TimeToLive.Ticks <= 0) + throw new QueueingException($"{nameof(message.TimeToLive)} cannot be zero or negative"); + + IModel channel = _channelProvider.GetChannel(); + try { + // Assign message ID message.MessageId = Guid.NewGuid(); + + // Serialize var serializedMessage = SerializeMessage(message); - var properties = _channel.CreateBasicProperties(); - properties.Persistent = true; + + // Properties + var properties = channel.CreateBasicProperties(); + properties.Persistent = true; // quorum queues persist properties.Type = _queueName; + properties.MessageId = message.MessageId.ToString(); properties.Expiration = message.TimeToLive.TotalMilliseconds.ToString(CultureInfo.InvariantCulture); - _channel.BasicPublish(_queueName, _queueName, properties, serializedMessage); + // Publish to the exchange bound to the queue + channel.BasicPublish( + exchange: _exchangeName, + routingKey: _queueName, + basicProperties: properties, + body: serializedMessage + ); } catch (Exception ex) { - var PublishMessageException = ex.Message; - _logger.LogError(ex, "PublishMessage Exception: {PublishMessageException}", PublishMessageException); - throw new QueueingException(PublishMessageException); + _logger.LogError(ex, "PublishMessage Exception: {Message}", ex.Message); + throw new QueueingException($"Publish failed: {ex.Message}", ex); + } + finally + { + // Return channel instead of disposing the provider + _channelProvider.Dispose(); // <- Replace with ReturnChannel if your provider exposes it } } diff --git a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueingStartupExtensions.cs b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueingStartupExtensions.cs index 279329e32..dcc4cae0a 100644 --- a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueingStartupExtensions.cs +++ b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/QueueingStartupExtensions.cs @@ -1,19 +1,71 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Options; using RabbitMQ.Client; -using Unity.Modules.Shared.MessageBrokers.RabbitMQ.Constants; +using System; + using Unity.Modules.Shared.MessageBrokers.RabbitMQ.Interfaces; namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ { public static class QueueingStartupExtensions { - public static void ConfigureRabbitMQ(this IServiceCollection services) + public static IServiceCollection ConfigureRabbitMQ(this IServiceCollection services, IConfiguration configuration) { - var configuration = services.GetConfiguration(); + // Configure options with validation + services.Configure(configuration.GetSection(RabbitMQOptions.SectionName)); + services.AddOptions() + .Bind(configuration.GetSection(RabbitMQOptions.SectionName)) + .ValidateDataAnnotations(); + + // Register connection factory services.TryAddSingleton(provider => { + var options = provider.GetRequiredService>().Value; + + var factory = new ConnectionFactory + { + UserName = options.UserName, + Password = options.Password, + HostName = options.HostName, + VirtualHost = options.VirtualHost, + Port = options.Port, + DispatchConsumersAsync = options.DispatchConsumersAsync, + AutomaticRecoveryEnabled = options.AutomaticRecoveryEnabled, + ConsumerDispatchConcurrency = options.ConsumerDispatchConcurrency, + NetworkRecoveryInterval = options.NetworkRecoveryInterval, + RequestedConnectionTimeout = options.RequestedConnectionTimeout, + RequestedHeartbeat = options.RequestedHeartbeat, + }; + + return factory; + }); + + // Register core services + services.TryAddSingleton(); + services.TryAddScoped(); + + // Register queue services + services.TryAddScoped(typeof(IQueueChannelProvider<>), typeof(QueueChannelProvider<>)); + services.TryAddScoped(typeof(IQueueProducer<>), typeof(QueueProducer<>)); + + return services; + } + + /// + /// Alternative method that works with service provider + /// + public static IServiceCollection ConfigureRabbitMQ(this IServiceCollection services) + { + services.TryAddSingleton(provider => + { + var configuration = provider.GetRequiredService(); + var options = new RabbitMQOptions(configuration); + configuration.GetSection(RabbitMQOptions.SectionName).Bind(options); + + // Validate configuration + // If you need custom validation, implement it here or rely on data annotations. var factory = new ConnectionFactory { UserName = configuration.GetValue("RabbitMQ:UserName") ?? "", @@ -21,26 +73,66 @@ public static void ConfigureRabbitMQ(this IServiceCollection services) HostName = configuration.GetValue("RabbitMQ:HostName") ?? "", VirtualHost = configuration.GetValue("RabbitMQ:VirtualHost") ?? "/", Port = configuration.GetValue("RabbitMQ:Port"), - DispatchConsumersAsync = true, - AutomaticRecoveryEnabled = true, - // Configure the amount of concurrent consumers within one host - ConsumerDispatchConcurrency = QueueingConstants.MAX_RABBIT_CONCURRENT_CONSUMERS, + DispatchConsumersAsync = options.DispatchConsumersAsync, + AutomaticRecoveryEnabled = options.AutomaticRecoveryEnabled, + ConsumerDispatchConcurrency = options.ConsumerDispatchConcurrency, + NetworkRecoveryInterval = options.NetworkRecoveryInterval, + RequestedConnectionTimeout = options.RequestedConnectionTimeout, + RequestedHeartbeat = options.RequestedHeartbeat, }; + return factory; }); services.TryAddSingleton(); services.TryAddScoped(); - services.TryAddScoped(typeof(IQueueChannelProvider<>), typeof(QueueChannelProvider<>)); services.TryAddScoped(typeof(IQueueProducer<>), typeof(QueueProducer<>)); + + return services; } - public static void AddQueueMessageConsumer(this IServiceCollection services) where TMessageConsumer : IQueueConsumer where TQueueMessage : class, IQueueMessage + public static IServiceCollection AddQueueMessageConsumer( + this IServiceCollection services) + where TMessageConsumer : class, IQueueConsumer + where TQueueMessage : class, IQueueMessage { - services.AddScoped(typeof(TMessageConsumer)); + services.AddScoped(); services.AddScoped, QueueConsumerHandler>(); services.AddHostedService>(); + + return services; + } + + /// + /// Adds multiple message consumers at once + /// + public static IServiceCollection AddQueueMessageConsumers( + this IServiceCollection services, + params Type[] consumerTypes) + { + foreach (var consumerType in consumerTypes) + { + if (!consumerType.IsClass || consumerType.IsAbstract) + throw new ArgumentException($"Consumer type {consumerType.Name} must be a concrete class"); + + var queueConsumerInterface = consumerType.GetInterface("IQueueConsumer`1") ?? throw new ArgumentException($"Consumer type {consumerType.Name} must implement IQueueConsumer"); + var messageType = queueConsumerInterface.GetGenericArguments()[0]; + + // Register the consumer type as scoped + services.AddScoped(consumerType); + + // Register IQueueConsumerHandler with its implementation + var handlerInterfaceType = typeof(IQueueConsumerHandler<,>).MakeGenericType(consumerType, messageType); + var handlerImplType = typeof(QueueConsumerHandler<,>).MakeGenericType(consumerType, messageType); + services.AddScoped(handlerInterfaceType, handlerImplType); + + // Register the hosted service for the consumer + var registratorType = typeof(QueueConsumerRegistratorService<,>).MakeGenericType(consumerType, messageType); + services.AddHostedService(registratorType); + } + + return services; } } -} +} \ No newline at end of file diff --git a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/RabbitMQOptions.cs b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/RabbitMQOptions.cs index 5c86728d3..a4de55233 100644 --- a/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/RabbitMQOptions.cs +++ b/applications/Unity.GrantManager/modules/Unity.SharedKernel/MessageBrokers.RabbitMQ/RabbitMQOptions.cs @@ -1,12 +1,24 @@ -namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ +using System; +using Microsoft.Extensions.Configuration; +using Unity.Modules.Shared.MessageBrokers.RabbitMQ.Constants; + +namespace Unity.Modules.Shared.MessageBrokers.RabbitMQ { - public class RabbitMQOptions + public class RabbitMQOptions(IConfiguration configuration) { - public string HostName { get; set; } = "localhost"; - public int Port { get; set; } = 5672; - public string UserName { get; set; } = "guest"; - public string Password { get; set; } = "guest"; - public string VirtualHost { get; set; } = "/"; + public const string SectionName = "RabbitMQ"; + + public string UserName { get; set; } = configuration.GetValue("RabbitMQ:UserName") ?? "guest"; + public string Password { get; set; } = configuration.GetValue("RabbitMQ:Password") ?? "guest"; + public string HostName { get; set; } = configuration.GetValue("RabbitMQ:HostName") ?? "localhost"; + public string VirtualHost { get; set; } = configuration.GetValue("RabbitMQ:VirtualHost") ?? "/"; + public int Port { get; set; } = configuration.GetValue("RabbitMQ:Port", 5672); + public int ConsumerDispatchConcurrency { get; set; } = QueueingConstants.MAX_RABBIT_CONCURRENT_CONSUMERS; + public bool AutomaticRecoveryEnabled { get; set; } = true; + public bool DispatchConsumersAsync { get; set; } = true; + public TimeSpan NetworkRecoveryInterval { get; set; } = TimeSpan.FromSeconds(10); + public TimeSpan RequestedConnectionTimeout { get; set; } = TimeSpan.FromSeconds(30); + public TimeSpan RequestedHeartbeat { get; set; } = TimeSpan.FromSeconds(60); } } diff --git a/applications/Unity.GrantManager/src/Unity.GrantManager.Application/Integrations/Chefs/FormsApiService.cs b/applications/Unity.GrantManager/src/Unity.GrantManager.Application/Integrations/Chefs/FormsApiService.cs index 070de5eda..efb36a82e 100644 --- a/applications/Unity.GrantManager/src/Unity.GrantManager.Application/Integrations/Chefs/FormsApiService.cs +++ b/applications/Unity.GrantManager/src/Unity.GrantManager.Application/Integrations/Chefs/FormsApiService.cs @@ -63,7 +63,7 @@ public async Task GetForm(Guid? formId, string chefsApplicationFormGuid string url = $"{chefsApi}/forms/{formId}"; var response = await GetRequestAsync(url, chefsApplicationFormGuid, encryptedApiKey); - return await ParseJsonResponseAsync(response) ?? new JObject(); + return await ParseJsonResponseAsync(response) ?? []; } public async Task GetSubmissionDataAsync(Guid chefsFormId, Guid submissionId) diff --git a/applications/Unity.GrantManager/src/Unity.GrantManager.Application/Integrations/Endpoints/EndpointManagementAppService.cs b/applications/Unity.GrantManager/src/Unity.GrantManager.Application/Integrations/Endpoints/EndpointManagementAppService.cs index 5718fe268..21cb718ec 100644 --- a/applications/Unity.GrantManager/src/Unity.GrantManager.Application/Integrations/Endpoints/EndpointManagementAppService.cs +++ b/applications/Unity.GrantManager/src/Unity.GrantManager.Application/Integrations/Endpoints/EndpointManagementAppService.cs @@ -35,9 +35,7 @@ private async Task AddToKeySetAsync(string cacheKey, Guid? tenantId) { var keySetKey = BuildCacheKeySetKey(tenantId); var existing = await _cache.GetStringAsync(keySetKey); - var keySet = string.IsNullOrEmpty(existing) - ? new HashSet() - : System.Text.Json.JsonSerializer.Deserialize>(existing) ?? new HashSet(); + var keySet = string.IsNullOrEmpty(existing) ? [] : System.Text.Json.JsonSerializer.Deserialize>(existing) ?? []; keySet.Add(cacheKey); var serialized = System.Text.Json.JsonSerializer.Serialize(keySet); @@ -80,17 +78,13 @@ public async Task GetChefsApiBaseUrlAsync() public async Task GetUgmUrlByKeyNameAsync(string keyName) { var url = await GetUrlByKeyNameInternalAsync(keyName, tenantSpecific: false); - if (url == null) - throw new UserFriendlyException($"URL for key '{keyName}' not configured."); - return url; + return url ?? throw new UserFriendlyException($"URL for key '{keyName}' not configured."); } public async Task GetUrlByKeyNameAsync(string keyName) { var url = await GetUrlByKeyNameInternalAsync(keyName, tenantSpecific: true); - if (url == null) - throw new UserFriendlyException($"URL for key '{keyName}' not configured."); - return url; + return url ?? throw new UserFriendlyException($"URL for key '{keyName}' not configured."); } private async Task GetUrlByKeyNameInternalAsync(string keyName, bool tenantSpecific)