Skip to content

Commit 213f250

Browse files
committed
Refactor rabbit provider by breaking out helper method for reuse.
1 parent cb26cd2 commit 213f250

1 file changed

Lines changed: 25 additions & 28 deletions

File tree

src/OpenDDD/Infrastructure/Events/RabbitMq/RabbitMqMessagingProvider.cs

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -45,22 +45,11 @@ public async Task<ISubscription> SubscribeAsync(string topic, string consumerGro
4545

4646
if (_channel is null) throw new InvalidOperationException("RabbitMQ channel is not available.");
4747

48-
bool exchangeExists = await ExchangeExistsAsync(topic, cancellationToken);
49-
50-
if (!exchangeExists)
48+
if (_autoCreateTopics)
5149
{
52-
if (_autoCreateTopics)
53-
{
54-
await _channel.ExchangeDeclareAsync(topic, ExchangeType.Fanout, durable: true, autoDelete: false, cancellationToken: cancellationToken);
55-
_logger.LogInformation("Auto-created exchange (topic): {Topic}", topic);
56-
}
57-
else
58-
{
59-
_logger.LogError("Cannot subscribe to non-existent topic: {Topic}", topic);
60-
throw new InvalidOperationException($"Topic '{topic}' does not exist.");
61-
}
50+
await CreateTopicIfNotExistsAsync(topic, cancellationToken);
6251
}
63-
52+
6453
await _channel.ExchangeDeclareAsync(topic, ExchangeType.Fanout, durable: true, autoDelete: false, cancellationToken: cancellationToken);
6554
var queueName = $"{consumerGroup}.{topic}";
6655
await _channel.QueueDeclareAsync(queueName, durable: true, exclusive: false, autoDelete: false, cancellationToken: cancellationToken);
@@ -76,7 +65,7 @@ public async Task<ISubscription> SubscribeAsync(string topic, string consumerGro
7665

7766
return subscription;
7867
}
79-
68+
8069
public async Task UnsubscribeAsync(ISubscription subscription, CancellationToken cancellationToken = default)
8170
{
8271
if (subscription == null)
@@ -107,20 +96,9 @@ public async Task PublishAsync(string topic, string message, CancellationToken c
10796

10897
if (_channel is null) throw new InvalidOperationException("RabbitMQ channel is not available.");
10998

110-
bool exchangeExists = await ExchangeExistsAsync(topic, cancellationToken);
111-
112-
if (!exchangeExists)
99+
if (_autoCreateTopics)
113100
{
114-
if (_autoCreateTopics)
115-
{
116-
await _channel.ExchangeDeclareAsync(topic, ExchangeType.Fanout, durable: true, autoDelete: false, cancellationToken: cancellationToken);
117-
_logger.LogInformation("Auto-created exchange (topic): {Topic}", topic);
118-
}
119-
else
120-
{
121-
_logger.LogError("Cannot publish to non-existent topic: {Topic}", topic);
122-
throw new InvalidOperationException($"Topic '{topic}' does not exist.");
123-
}
101+
await CreateTopicIfNotExistsAsync(topic, cancellationToken);
124102
}
125103

126104
var body = Encoding.UTF8.GetBytes(message);
@@ -137,6 +115,25 @@ private async Task EnsureConnectedAsync(CancellationToken cancellationToken)
137115
_channel = await _connection.CreateChannelAsync(null, cancellationToken);
138116
}
139117

118+
private async Task CreateTopicIfNotExistsAsync(string topic, CancellationToken cancellationToken)
119+
{
120+
bool exchangeExists = await ExchangeExistsAsync(topic, cancellationToken);
121+
122+
if (!exchangeExists)
123+
{
124+
if (_autoCreateTopics)
125+
{
126+
await _channel.ExchangeDeclareAsync(topic, ExchangeType.Fanout, durable: true, autoDelete: false, cancellationToken: cancellationToken);
127+
_logger.LogInformation("Auto-created exchange (topic): {Topic}", topic);
128+
}
129+
else
130+
{
131+
_logger.LogError("Cannot subscribe to non-existent topic: {Topic}", topic);
132+
throw new InvalidOperationException($"Topic '{topic}' does not exist.");
133+
}
134+
}
135+
}
136+
140137
private async Task<bool> ExchangeExistsAsync(string exchange, CancellationToken cancellationToken)
141138
{
142139
try

0 commit comments

Comments
 (0)