1- using Microsoft . Extensions . Logging ;
2- using RabbitMQ . Client ;
31using System ;
42using System . Collections . Concurrent ;
53using System . Threading ;
4+ using Microsoft . Extensions . Logging ;
5+ using RabbitMQ . Client ;
66using Unity . Modules . Shared . MessageBrokers . RabbitMQ . Interfaces ;
77
88namespace Unity . Modules . Shared . MessageBrokers . RabbitMQ
99{
10- public sealed class ChannelProvider : IChannelProvider , IDisposable
10+ public sealed class PooledChannelProvider (
11+ IConnectionProvider connectionProvider ,
12+ ILogger < PooledChannelProvider > logger ,
13+ int maxChannels = PooledChannelProvider . DefaultMaxChannels ) : IChannelProvider
1114 {
12- private readonly IConnectionProvider _connectionProvider ;
13- private readonly ILogger < ChannelProvider > _logger ;
14- private readonly int _maxChannels ;
15+ private readonly IConnectionProvider _connectionProvider = connectionProvider ?? throw new ArgumentNullException ( nameof ( connectionProvider ) ) ;
16+ private readonly ILogger < PooledChannelProvider > _logger = logger ?? throw new ArgumentNullException ( nameof ( logger ) ) ;
17+ private readonly int _maxChannels = maxChannels ;
1518 private readonly ConcurrentQueue < IModel > _channelPool = new ( ) ;
1619 private int _currentChannelCount ;
1720 private bool _disposed ;
18- private const int DefaultMaxChannels = 10000 ;
1921
20- public ChannelProvider ( IConnectionProvider connectionProvider , ILogger < ChannelProvider > logger , int maxChannels = DefaultMaxChannels )
21- {
22- _connectionProvider = connectionProvider ?? throw new ArgumentNullException ( nameof ( connectionProvider ) ) ;
23- _logger = logger ?? throw new ArgumentNullException ( nameof ( logger ) ) ;
24- _maxChannels = maxChannels ;
25- }
22+ private const int DefaultMaxChannels = 1000 ;
2623
24+ /// <summary>
25+ /// Get a channel from the pool or create a new one if under max limit
26+ /// </summary>
2727 public IModel ? GetChannel ( )
2828 {
2929 ThrowIfDisposed ( ) ;
3030
31- // Try to reuse a channel from the pool
31+ // Try to reuse a channel
3232 while ( _channelPool . TryDequeue ( out var channel ) )
3333 {
34- if ( channel . IsOpen )
35- return channel ;
36-
34+ if ( channel . IsOpen ) return channel ;
3735 DisposeChannel ( channel ) ;
3836 }
3937
40- // Try to create a new channel if we haven't reached max
38+ // Create a new channel if we have capacity
4139 if ( Interlocked . Increment ( ref _currentChannelCount ) <= _maxChannels )
4240 {
4341 try
4442 {
4543 var connection = _connectionProvider . GetConnection ( ) ;
4644 if ( connection != null && connection . IsOpen )
45+ {
4746 return connection . CreateModel ( ) ;
47+ }
4848
4949 _logger . LogWarning ( "RabbitMQ connection is not open." ) ;
50- Interlocked . Decrement ( ref _currentChannelCount ) ; // failed to create
50+ Interlocked . Decrement ( ref _currentChannelCount ) ;
5151 }
5252 catch ( Exception ex )
5353 {
5454 _logger . LogError ( ex , "Error creating RabbitMQ channel." ) ;
55- Interlocked . Decrement ( ref _currentChannelCount ) ; // failed to create
55+ Interlocked . Decrement ( ref _currentChannelCount ) ;
5656 }
5757 }
5858 else
5959 {
60- Interlocked . Decrement ( ref _currentChannelCount ) ; // revert increment since max reached
60+ Interlocked . Decrement ( ref _currentChannelCount ) ;
6161 _logger . LogWarning ( "Max channel count reached ({MaxChannels}). Cannot create new channel." , _maxChannels ) ;
6262 }
6363
6464 return null ;
6565 }
6666
67+ /// <summary>
68+ /// Return a channel to the pool
69+ /// </summary>
6770 public void ReturnChannel ( IModel channel )
6871 {
69- if ( _disposed )
72+ if ( _disposed || channel == null )
7073 {
71- DisposeChannel ( channel ) ;
74+ if ( channel != null )
75+ DisposeChannel ( channel ) ;
7276 return ;
7377 }
7478
@@ -80,39 +84,22 @@ public void ReturnChannel(IModel channel)
8084
8185 private void DisposeChannel ( IModel channel )
8286 {
83- try
84- {
85- if ( channel . IsOpen )
86- channel . Close ( ) ;
87- }
88- catch ( Exception ex )
89- {
90- _logger . LogWarning ( ex , "Exception while closing RabbitMQ channel." ) ;
91- }
87+ if ( channel == null ) return ;
9288
93- try
94- {
95- channel . Dispose ( ) ;
96- }
97- catch ( Exception ex )
98- {
99- _logger . LogWarning ( ex , "Exception while disposing RabbitMQ channel." ) ;
100- }
89+ try { if ( channel . IsOpen ) channel . Close ( ) ; } catch ( Exception ex ) { _logger . LogWarning ( ex , "Error closing channel." ) ; }
90+ try { channel . Dispose ( ) ; } catch ( Exception ex ) { _logger . LogWarning ( ex , "Error disposing channel." ) ; }
10191
10292 Interlocked . Decrement ( ref _currentChannelCount ) ;
10393 }
10494
10595 private void ThrowIfDisposed ( )
10696 {
107- if ( _disposed )
108- throw new ObjectDisposedException ( nameof ( ChannelProvider ) ) ;
97+ ObjectDisposedException . ThrowIf ( _disposed , nameof ( PooledChannelProvider ) ) ;
10998 }
11099
111100 public void Dispose ( )
112101 {
113- if ( _disposed )
114- return ;
115-
102+ if ( _disposed ) return ;
116103 _disposed = true ;
117104
118105 while ( _channelPool . TryDequeue ( out var channel ) )
0 commit comments