Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions MessageBusReader/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class Program
private static ServiceBusClient _client;
private static ServiceBusProcessor _processor;

private const string QueueName = "error";
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we call this new variable errorQueueName please?


Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we have several different error queues I though we might benefit from a global variable to switch which queue we are processing.

private static TaskCompletionSource<int> _taskCompletionSource;
private static Task<int> _loopTask;
private static int _completeCounter = 0;
Expand Down Expand Up @@ -54,7 +56,7 @@ static async Task MainAsync()
ReceiveMode = ServiceBusReceiveMode.PeekLock,
};

_processor = _client.CreateProcessor("error", options);
_processor = _client.CreateProcessor(QueueName, options);

_processor.ProcessMessageAsync += ProcessMessagesAsync;
_processor.ProcessErrorAsync += ExceptionReceivedHandler;
Expand Down Expand Up @@ -105,6 +107,17 @@ private static async Task ProcessMessagesAsync(ProcessMessageEventArgs args)

string type = typeValue.ToString();

// Academy class/chapter create race condition.
if (type == "Edrington.Academy.Contracts.Events.AcademyCourseInteracted, Edrington.Academy.Contracts")
{
if (message.ContainsError("VALIDATION_ERROR"))
{
await ReturnToSource(args, _delay);
_delay++;
return;
}
}

// SynchroniseConsentPreferences
if (type == "Edrington.Data.Consumer.Commands.SynchroniseConsentPreferences, Edrington.Data")
{
Expand Down Expand Up @@ -256,11 +269,9 @@ static async Task MoveDeadletter()
SubQueue = SubQueue.DeadLetter
};

string queueName = "error";

_processor = _client.CreateProcessor(queueName, options);
_processor = _client.CreateProcessor(QueueName, options);

_processor.ProcessMessageAsync += args => ReturnDeadletterAsync(args, queueName);
_processor.ProcessMessageAsync += args => ReturnDeadletterAsync(args, QueueName);
_processor.ProcessErrorAsync += ExceptionReceivedHandler;

_taskCompletionSource = new TaskCompletionSource<int>();
Expand Down