diff --git a/src/DurableTask.Core/Logging/EventIds.cs b/src/DurableTask.Core/Logging/EventIds.cs
index f23459c79..049af43f3 100644
--- a/src/DurableTask.Core/Logging/EventIds.cs
+++ b/src/DurableTask.Core/Logging/EventIds.cs
@@ -31,6 +31,7 @@ static class EventIds
public const int ProcessWorkItemStarting = 27;
public const int ProcessWorkItemCompleted = 28;
public const int ProcessWorkItemFailed = 29;
+ public const int DispatcherLoopFailed = 30;
public const int SchedulingOrchestration = 40;
public const int RaisingEvent = 41;
diff --git a/src/DurableTask.Core/Logging/LogEvents.cs b/src/DurableTask.Core/Logging/LogEvents.cs
index 360528638..e5fe2ee33 100644
--- a/src/DurableTask.Core/Logging/LogEvents.cs
+++ b/src/DurableTask.Core/Logging/LogEvents.cs
@@ -158,6 +158,37 @@ void IEventSourceEvent.WriteEventSource() =>
StructuredEventSource.Log.DispatcherStopped(this.Dispatcher, Utils.AppName, Utils.PackageVersion);
}
+ /// <summary>
+ /// Log event for an unhandled exception in a work item dispatch loop.
+ /// Logged at <see cref="LogLevel.Error"/>; the message states that the loop will retry after a backoff.
+ /// </summary>
+ internal class DispatcherLoopFailed : StructuredLogEvent, IEventSourceEvent
+ {
+ /// <summary>
+ /// Captures the dispatcher display name and the full text of the exception.
+ /// </summary>
+ /// <param name="context">Dispatcher context; its display name becomes <see cref="Dispatcher"/>.</param>
+ /// <param name="exception">The exception; its <c>ToString()</c> output becomes <see cref="Details"/>.</param>
+ public DispatcherLoopFailed(WorkItemDispatcherContext context, Exception exception)
+ {
+ this.Dispatcher = context.GetDisplayName();
+ this.Details = exception.ToString();
+ }
+
+ /// <summary>Display name of the dispatcher, taken from the dispatcher context.</summary>
+ [StructuredLogField]
+ public string Dispatcher { get; }
+
+ /// <summary>Result of calling <c>ToString()</c> on the exception.</summary>
+ [StructuredLogField]
+ public string Details { get; }
+
+ /// <summary>Event ID and name, both taken from <see cref="EventIds.DispatcherLoopFailed"/>.</summary>
+ public override EventId EventId => new EventId(
+ EventIds.DispatcherLoopFailed,
+ nameof(EventIds.DispatcherLoopFailed));
+
+ /// <summary>Always <see cref="LogLevel.Error"/>.</summary>
+ public override LogLevel Level => LogLevel.Error;
+
+ // Human-readable message: dispatcher name followed by the exception details.
+ protected override string CreateLogMessage() =>
+ $"{this.Dispatcher}: Unhandled exception in dispatch loop. Will retry after backoff. Details: {this.Details}";
+
+ // Forwards the same fields, plus app name and package version, to the structured event source.
+ void IEventSourceEvent.WriteEventSource() =>
+ StructuredEventSource.Log.DispatcherLoopFailed(
+ this.Dispatcher,
+ this.Details,
+ Utils.AppName,
+ Utils.PackageVersion);
+ }
+
internal class DispatchersStopping : StructuredLogEvent, IEventSourceEvent
{
public DispatchersStopping(string name, string id, int concurrentWorkItemCount, int activeFetchers)
diff --git a/src/DurableTask.Core/Logging/LogHelper.cs b/src/DurableTask.Core/Logging/LogHelper.cs
index 1ae501b79..6efbe0cfe 100644
--- a/src/DurableTask.Core/Logging/LogHelper.cs
+++ b/src/DurableTask.Core/Logging/LogHelper.cs
@@ -107,6 +107,21 @@ internal void DispatcherStopped(WorkItemDispatcherContext context)
}
}
+ /// <summary>
+ /// Logs that a work item dispatch loop encountered an unhandled exception.
+ /// </summary>
+ /// <param name="context">The context of the dispatcher that failed.</param>
+ /// <param name="exception">The unhandled exception.</param>
+ internal void DispatcherLoopFailed(WorkItemDispatcherContext context, Exception exception)
+ {
+ if (this.IsStructuredLoggingEnabled)
+ {
+ this.WriteStructuredLog(
+ new LogEvents.DispatcherLoopFailed(context, exception),
+ exception);
+ }
+ }
+
/// <summary>
/// Logs that the work item dispatcher is watching for individual dispatch loops to finish stopping.
/// </summary>
diff --git a/src/DurableTask.Core/Logging/StructuredEventSource.cs b/src/DurableTask.Core/Logging/StructuredEventSource.cs
index 47a94b1a4..129bac158 100644
--- a/src/DurableTask.Core/Logging/StructuredEventSource.cs
+++ b/src/DurableTask.Core/Logging/StructuredEventSource.cs
@@ -102,6 +102,15 @@ internal void DispatcherStopped(string Dispatcher, string AppName, string Extens
}
}
+ /// <summary>
+ /// Writes the DispatcherLoopFailed event to this event source.
+ /// Nothing is written unless Error-level events are enabled.
+ /// </summary>
+ [Event(EventIds.DispatcherLoopFailed, Level = EventLevel.Error, Version = 1)]
+ internal void DispatcherLoopFailed(string Dispatcher, string Details, string AppName, string ExtensionVersion)
+ {
+ if (this.IsEnabled(EventLevel.Error))
+ {
+ // Payload values are passed in the same order as the method parameters.
+ this.WriteEvent(EventIds.DispatcherLoopFailed, Dispatcher, Details, AppName, ExtensionVersion);
+ }
+ }
+
[Event(EventIds.DispatchersStopping, Level = EventLevel.Verbose, Version = 1)]
internal void DispatchersStopping(
string Dispatcher,
diff --git a/src/DurableTask.Core/WorkItemDispatcher.cs b/src/DurableTask.Core/WorkItemDispatcher.cs
index 3829578d8..1a64065e5 100644
--- a/src/DurableTask.Core/WorkItemDispatcher.cs
+++ b/src/DurableTask.Core/WorkItemDispatcher.cs
@@ -142,7 +142,14 @@ public async Task StartAsync()
// We just want this to Run we intentionally don't wait
#pragma warning disable 4014
- Task.Run(() => this.DispatchAsync(context));
+ Task.Run(() => this.DispatchAsync(context)).ContinueWith(t =>
+ {
+ TraceHelper.TraceException(
+ TraceEventType.Critical,
+ "WorkItemDispatcherDispatch-FatalTermination",
+ t.Exception,
+ $"Dispatch loop for '{this.name}' terminated fatally!");
+ }, TaskContinuationOptions.OnlyOnFaulted);
#pragma warning restore 4014
}
}
@@ -224,128 +231,167 @@ async Task DispatchAsync(WorkItemDispatcherContext context)
bool logThrottle = true;
while (this.isStarted)
{
- if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5)))
+ var semaphoreAcquired = false;
+ var scheduledWorkItem = false;
+ try
{
- if (logThrottle)
+ if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5)))
{
- // This can happen frequently under heavy load.
- // To avoid log spam, we log just once until we can proceed.
- this.LogHelper.FetchingThrottled(
- context,
- this.concurrentWorkItemCount,
- this.MaxConcurrentWorkItems);
- TraceHelper.Trace(
- TraceEventType.Warning,
- "WorkItemDispatcherDispatch-MaxOperations",
- this.GetFormattedLog(dispatcherId, $"Max concurrent operations ({this.concurrentWorkItemCount}) are already in progress. Still waiting for next accept."));
-
- logThrottle = false;
+ if (logThrottle)
+ {
+ // This can happen frequently under heavy load.
+ // To avoid log spam, we log just once until we can proceed.
+ this.LogHelper.FetchingThrottled(
+ context,
+ this.concurrentWorkItemCount,
+ this.MaxConcurrentWorkItems);
+ TraceHelper.Trace(
+ TraceEventType.Warning,
+ "WorkItemDispatcherDispatch-MaxOperations",
+ this.GetFormattedLog(dispatcherId, $"Max concurrent operations ({this.concurrentWorkItemCount}) are already in progress. Still waiting for next accept."));
+
+ logThrottle = false;
+ }
+
+ continue;
}
- continue;
- }
+ semaphoreAcquired = true;
+ logThrottle = true;
- logThrottle = true;
+ var delaySecs = 0;
+ T workItem = default(T);
+ try
+ {
+ Interlocked.Increment(ref this.activeFetchers);
+ this.LogHelper.FetchWorkItemStarting(context, DefaultReceiveTimeout, this.concurrentWorkItemCount, this.MaxConcurrentWorkItems);
+ TraceHelper.Trace(
+ TraceEventType.Verbose,
+ "WorkItemDispatcherDispatch-StartFetch",
+ this.GetFormattedLog(dispatcherId, $"Starting fetch with timeout of {DefaultReceiveTimeout} ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
- var delaySecs = 0;
- T workItem = default(T);
- try
- {
- Interlocked.Increment(ref this.activeFetchers);
- this.LogHelper.FetchWorkItemStarting(context, DefaultReceiveTimeout, this.concurrentWorkItemCount, this.MaxConcurrentWorkItems);
- TraceHelper.Trace(
- TraceEventType.Verbose,
- "WorkItemDispatcherDispatch-StartFetch",
- this.GetFormattedLog(dispatcherId, $"Starting fetch with timeout of {DefaultReceiveTimeout} ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
+ Stopwatch timer = Stopwatch.StartNew();
+ workItem = await this.FetchWorkItem(DefaultReceiveTimeout, this.shutdownCancellationTokenSource.Token);
- Stopwatch timer = Stopwatch.StartNew();
- workItem = await this.FetchWorkItem(DefaultReceiveTimeout, this.shutdownCancellationTokenSource.Token);
+ if (!IsNull(workItem))
+ {
+ string workItemId = this.workItemIdentifier(workItem);
+ this.LogHelper.FetchWorkItemCompleted(
+ context,
+ workItemId,
+ timer.Elapsed,
+ this.concurrentWorkItemCount,
+ this.MaxConcurrentWorkItems);
+ }
- if (!IsNull(workItem))
+ TraceHelper.Trace(
+ TraceEventType.Verbose,
+ "WorkItemDispatcherDispatch-EndFetch",
+ this.GetFormattedLog(dispatcherId, $"After fetch ({timer.ElapsedMilliseconds} ms) ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
+ }
+ catch (TimeoutException)
{
- string workItemId = this.workItemIdentifier(workItem);
- this.LogHelper.FetchWorkItemCompleted(
- context,
- workItemId,
- timer.Elapsed,
- this.concurrentWorkItemCount,
- this.MaxConcurrentWorkItems);
+ delaySecs = 0;
}
-
- TraceHelper.Trace(
- TraceEventType.Verbose,
- "WorkItemDispatcherDispatch-EndFetch",
- this.GetFormattedLog(dispatcherId, $"After fetch ({timer.ElapsedMilliseconds} ms) ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
- }
- catch (TimeoutException)
- {
- delaySecs = 0;
- }
- catch (TaskCanceledException exception)
- {
- TraceHelper.Trace(
- TraceEventType.Information,
- "WorkItemDispatcherDispatch-TaskCanceledException",
- this.GetFormattedLog(dispatcherId, $"TaskCanceledException while fetching workItem, should be harmless: {exception.Message}"));
- delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
- }
- catch (Exception exception)
- {
- if (!this.isStarted)
+ catch (TaskCanceledException exception)
{
TraceHelper.Trace(
- TraceEventType.Information,
- "WorkItemDispatcherDispatch-HarmlessException",
- this.GetFormattedLog(dispatcherId, $"Harmless exception while fetching workItem after Stop(): {exception.Message}"));
+ TraceEventType.Information,
+ "WorkItemDispatcherDispatch-TaskCanceledException",
+ this.GetFormattedLog(dispatcherId, $"TaskCanceledException while fetching workItem, should be harmless: {exception.Message}"));
+ delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
}
- else
+ catch (Exception exception)
{
- this.LogHelper.FetchWorkItemFailure(context, exception);
- // TODO : dump full node context here
- TraceHelper.TraceException(
- TraceEventType.Warning,
- "WorkItemDispatcherDispatch-Exception",
- exception,
- this.GetFormattedLog(dispatcherId, $"Exception while fetching workItem: {exception.Message}"));
- delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
+ if (!this.isStarted)
+ {
+ TraceHelper.Trace(
+ TraceEventType.Information,
+ "WorkItemDispatcherDispatch-HarmlessException",
+ this.GetFormattedLog(dispatcherId, $"Harmless exception while fetching workItem after Stop(): {exception.Message}"));
+ }
+ else
+ {
+ this.LogHelper.FetchWorkItemFailure(context, exception);
+ // TODO : dump full node context here
+ TraceHelper.TraceException(
+ TraceEventType.Warning,
+ "WorkItemDispatcherDispatch-Exception",
+ exception,
+ this.GetFormattedLog(dispatcherId, $"Exception while fetching workItem: {exception.Message}"));
+ delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
+ }
+ }
+ finally
+ {
+ Interlocked.Decrement(ref this.activeFetchers);
}
- }
- finally
- {
- Interlocked.Decrement(ref this.activeFetchers);
- }
- var scheduledWorkItem = false;
- if (!IsNull(workItem))
- {
- if (!this.isStarted)
+ if (!IsNull(workItem))
{
- if (this.SafeReleaseWorkItem != null)
+ if (!this.isStarted)
{
- await this.SafeReleaseWorkItem(workItem);
+ if (this.SafeReleaseWorkItem != null)
+ {
+ await this.SafeReleaseWorkItem(workItem);
+ }
+ }
+ else
+ {
+ Interlocked.Increment(ref this.concurrentWorkItemCount);
+ // We just want this to Run we intentionally don't wait
+ #pragma warning disable 4014
+ Task.Run(() => this.ProcessWorkItemAsync(context, workItem));
+ #pragma warning restore 4014
+
+ scheduledWorkItem = true;
}
}
- else
+
+ delaySecs = Math.Max(this.delayOverrideSecs, delaySecs);
+ if (delaySecs > 0)
{
- Interlocked.Increment(ref this.concurrentWorkItemCount);
- // We just want this to Run we intentionally don't wait
- #pragma warning disable 4014
- Task.Run(() => this.ProcessWorkItemAsync(context, workItem));
- #pragma warning restore 4014
+ await Task.Delay(TimeSpan.FromSeconds(delaySecs));
+ }
- scheduledWorkItem = true;
+ if (!scheduledWorkItem)
+ {
+ this.concurrencyLock.Release();
}
}
-
- delaySecs = Math.Max(this.delayOverrideSecs, delaySecs);
- if (delaySecs > 0)
+ catch (Exception exception) when (!Utils.IsFatal(exception))
{
- await Task.Delay(TimeSpan.FromSeconds(delaySecs));
- }
+ // Catch-all for any unhandled exception in the dispatch loop body.
+ // Without this, the dispatch loop would silently terminate because
+ // DispatchAsync runs as a fire-and-forget Task.Run.
+ this.LogHelper.DispatcherLoopFailed(context, exception);
+ TraceHelper.TraceException(
+ TraceEventType.Error,
+ "WorkItemDispatcherDispatch-UnhandledException",
+ exception,
+ this.GetFormattedLog(dispatcherId,
+ $"Unhandled exception in dispatch loop. Will retry after backoff."));
+
+ // Release the semaphore if we acquired it but never handed it off
+ // to ProcessWorkItemAsync, to avoid permanently reducing concurrency.
+ if (semaphoreAcquired && !scheduledWorkItem)
+ {
+ try { this.concurrencyLock.Release(); } catch { /* best effort */ }
+ }
- if (!scheduledWorkItem)
- {
- this.concurrencyLock.Release();
+ try
+ {
+ await Task.Delay(TimeSpan.FromSeconds(BackOffIntervalOnInvalidOperationSecs), this.shutdownCancellationTokenSource.Token);
+ }
+ catch (OperationCanceledException)
+ {
+ // Shutdown requested during backoff; exit promptly.
+ }
+ catch (ObjectDisposedException)
+ {
+ // CancellationTokenSource was disposed (e.g., Dispose called
+ // shortly after StopAsync); treat as shutdown.
+ }
}
}
diff --git a/test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs b/test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs
new file mode 100644
index 000000000..275503748
--- /dev/null
+++ b/test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs
@@ -0,0 +1,442 @@
+// ----------------------------------------------------------------------------------
+// Copyright Microsoft Corporation
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ----------------------------------------------------------------------------------
+#nullable enable
+namespace DurableTask.Core.Tests
+{
+ using System;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.Linq;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using DurableTask.Core.Logging;
+ using Microsoft.Extensions.Logging;
+ using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+ [TestClass]
+ public class WorkItemDispatcherTests
+ {
+ // A generous-but-bounded timeout for CI. Tests normally complete in <1s;
+ // this guards against hangs without wasting time on every run.
+ static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(10);
+        /// <summary>
+        /// Verifies that the dispatch loop keeps running, and logs DispatcherLoopFailed, when
+        /// GetDelayInSecondsAfterOnFetchException throws while a fetch failure is being handled.
+        /// </summary>
+        [TestMethod]
+        public async Task DispatchLoop_SurvivesOuterException_ViaFaultyDelayCallback()
+        {
+            // Arrange: GetDelayInSecondsAfterOnFetchException is called from inside the
+            // inner catch block, but if it throws, the exception escapes the inner
+            // catch and is caught by the outer catch — exercising the new safety net.
+            // We verify that DispatcherLoopFailed is logged and the loop continues.
+            int fetchCallCount = 0;
+
+            // RunContinuationsAsynchronously keeps the awaiting test code from resuming
+            // inline on the dispatcher thread that completes this source.
+            var secondFetchStarted = new TaskCompletionSource<bool>(
+                TaskCreationOptions.RunContinuationsAsynchronously);
+
+            var logMessages = new ConcurrentBag<LogEntry>();
+            var loggerFactory = LoggerFactory.Create(builder =>
+            {
+                builder.AddProvider(new InMemoryLoggerProvider(logMessages));
+                builder.SetMinimumLevel(LogLevel.Trace);
+            });
+
+            var dispatcher = new WorkItemDispatcher<string>(
+                "TestDispatcher",
+                workItemIdentifier: item => item ?? "null",
+                fetchWorkItem: (timeout, ct) =>
+                {
+                    int count = Interlocked.Increment(ref fetchCallCount);
+                    if (count == 1)
+                    {
+                        // This exception will be caught by the inner catch, which then
+                        // calls GetDelayInSecondsAfterOnFetchException (which also throws).
+                        throw new InvalidOperationException("Trigger inner catch");
+                    }
+
+                    // If we reach here, the outer catch handled the callback failure
+                    // and the loop continued.
+                    secondFetchStarted.TrySetResult(true);
+                    return Task.FromResult<string>(null!);
+                },
+                processWorkItem: item => Task.CompletedTask);
+
+            dispatcher.MaxConcurrentWorkItems = 1;
+            dispatcher.DispatcherCount = 1;
+            dispatcher.LogHelper = new LogHelper(loggerFactory.CreateLogger("DurableTask.Core"));
+
+            // This callback is invoked from the inner catch. Throwing here causes
+            // the exception to escape to the outer catch.
+            int delayCallCount = 0;
+            dispatcher.GetDelayInSecondsAfterOnFetchException = (ex) =>
+            {
+                if (Interlocked.Increment(ref delayCallCount) == 1)
+                {
+                    throw new InvalidOperationException("Failure in delay callback");
+                }
+
+                return 0;
+            };
+
+            // Act
+            await dispatcher.StartAsync();
+
+            // Wait for the loop to recover from the outer catch and start a second fetch.
+            // The outer catch backoff is 10s, but may be cancelled on shutdown. We give it
+            // enough time for the backoff to elapse. If the loop died, this will time out.
+            bool recovered = await Task.WhenAny(
+                secondFetchStarted.Task,
+                Task.Delay(TimeSpan.FromSeconds(15))) == secondFetchStarted.Task;
+
+            await dispatcher.StopAsync(forced: true);
+            dispatcher.Dispose();
+
+            // Assert: The dispatch loop survived the outer exception and continued
+            Assert.IsTrue(recovered, "Dispatch loop should have recovered after outer catch exception.");
+
+            // The outer catch should have logged DispatcherLoopFailed
+            bool hasLoopFailed = logMessages.Any(m =>
+                m.EventId.Name == nameof(Logging.EventIds.DispatcherLoopFailed));
+            Assert.IsTrue(hasLoopFailed, "Expected DispatcherLoopFailed log event from the outer catch.");
+        }
+
+        /// <summary>
+        /// Verifies that the dispatch loop keeps fetching after the fetch callback throws
+        /// several different exception types on consecutive calls.
+        /// </summary>
+        [TestMethod]
+        public async Task DispatchLoop_SurvivesMultipleExceptionTypes_AndContinuesProcessing()
+        {
+            // Arrange: Throw a variety of exception types from fetch across consecutive calls.
+            // All are handled by the inner catch, but this verifies the loop keeps going.
+            int fetchCallCount = 0;
+
+            // RunContinuationsAsynchronously keeps the awaiting test code from resuming
+            // inline on the dispatcher thread that completes this source.
+            var recoverySignal = new TaskCompletionSource<bool>(
+                TaskCreationOptions.RunContinuationsAsynchronously);
+
+            var dispatcher = new WorkItemDispatcher<string>(
+                "TestDispatcher",
+                workItemIdentifier: item => item ?? "null",
+                fetchWorkItem: (timeout, ct) =>
+                {
+                    int count = Interlocked.Increment(ref fetchCallCount);
+                    switch (count)
+                    {
+                        case 1: throw new InvalidOperationException("Test exception");
+                        case 2: throw new TimeoutException("Test timeout");
+                        case 3: throw new TaskCanceledException("Test cancel");
+                        default:
+                            // The fourth call succeeds silently; the fifth and later calls signal recovery.
+                            if (count >= 5)
+                            {
+                                recoverySignal.TrySetResult(true);
+                            }
+
+                            return Task.FromResult<string>(null!);
+                    }
+                },
+                processWorkItem: item => Task.CompletedTask);
+
+            dispatcher.MaxConcurrentWorkItems = 1;
+            dispatcher.DispatcherCount = 1;
+
+            // Act
+            await dispatcher.StartAsync();
+
+            bool recovered = await Task.WhenAny(
+                recoverySignal.Task,
+                Task.Delay(TestTimeout)) == recoverySignal.Task;
+
+            await dispatcher.StopAsync(forced: true);
+            dispatcher.Dispose();
+
+            // Assert: The dispatcher survived all exception types and kept fetching
+            Assert.IsTrue(recovered, "Dispatch loop should have recovered after multiple exception types.");
+            Assert.IsTrue(fetchCallCount >= 5, $"Expected at least 5 fetch calls, got {fetchCallCount}.");
+        }
+
+        /// <summary>
+        /// Verifies that an exception thrown by the fetch callback is logged as
+        /// FetchWorkItemFailure and that the dispatch loop keeps fetching afterwards.
+        /// </summary>
+        [TestMethod]
+        public async Task DispatchLoop_LogsFetchWorkItemFailure_WhenFetchThrows()
+        {
+            // Arrange: Use a logging ILogger to capture log events.
+            // The fetch exception is handled by the inner catch, which logs
+            // FetchWorkItemFailure — verifying that path works correctly.
+            var logMessages = new ConcurrentBag<LogEntry>();
+            var loggerFactory = LoggerFactory.Create(builder =>
+            {
+                builder.AddProvider(new InMemoryLoggerProvider(logMessages));
+                builder.SetMinimumLevel(LogLevel.Trace);
+            });
+
+            int fetchCallCount = 0;
+
+            // RunContinuationsAsynchronously keeps the awaiting test code from resuming
+            // inline on the dispatcher thread that completes this source.
+            var dispatcherRecovered = new TaskCompletionSource<bool>(
+                TaskCreationOptions.RunContinuationsAsynchronously);
+
+            var dispatcher = new WorkItemDispatcher<string>(
+                "TestDispatcher",
+                workItemIdentifier: item => item ?? "null",
+                fetchWorkItem: (timeout, ct) =>
+                {
+                    int count = Interlocked.Increment(ref fetchCallCount);
+                    if (count == 1)
+                    {
+                        // This exception is caught by the inner catch block,
+                        // which logs FetchWorkItemFailure (not DispatcherLoopFailed).
+                        throw new InvalidOperationException("Simulated fetch failure");
+                    }
+
+                    if (count >= 3)
+                    {
+                        dispatcherRecovered.TrySetResult(true);
+                    }
+
+                    return Task.FromResult<string>(null!);
+                },
+                processWorkItem: item => Task.CompletedTask);
+
+            dispatcher.MaxConcurrentWorkItems = 1;
+            dispatcher.DispatcherCount = 1;
+            dispatcher.LogHelper = new LogHelper(loggerFactory.CreateLogger("DurableTask.Core"));
+
+            // Act
+            await dispatcher.StartAsync();
+
+            bool recovered = await Task.WhenAny(
+                dispatcherRecovered.Task,
+                Task.Delay(TestTimeout)) == dispatcherRecovered.Task;
+
+            await dispatcher.StopAsync(forced: true);
+            dispatcher.Dispose();
+
+            // Assert
+            Assert.IsTrue(recovered, "Dispatch loop should have recovered after exception.");
+
+            // The inner catch handles InvalidOperationException from fetch,
+            // so we verify FetchWorkItemFailure was logged
+            bool hasFetchFailure = logMessages.Any(m =>
+                m.EventId.Name == nameof(Logging.EventIds.FetchWorkItemFailure));
+            Assert.IsTrue(hasFetchFailure, "Expected FetchWorkItemFailure log event.");
+        }
+
+        /// <summary>
+        /// Verifies that every work item returned by the fetch callback is handed to the
+        /// process callback.
+        /// </summary>
+        [TestMethod]
+        public async Task DispatchLoop_ProcessesWorkItemsSuccessfully()
+        {
+            // Arrange
+            var processedItems = new ConcurrentBag<string>();
+            int fetchCallCount = 0;
+
+            // RunContinuationsAsynchronously keeps the awaiting test code from resuming
+            // inline on the worker thread that completes this source.
+            var allItemsProcessed = new TaskCompletionSource<bool>(
+                TaskCreationOptions.RunContinuationsAsynchronously);
+            string[] workItems = { "item-1", "item-2", "item-3" };
+
+            var dispatcher = new WorkItemDispatcher<string>(
+                "TestDispatcher",
+                workItemIdentifier: item => item,
+                fetchWorkItem: (timeout, ct) =>
+                {
+                    // Hand out each item once, then report "no work" on every later fetch.
+                    int count = Interlocked.Increment(ref fetchCallCount);
+                    if (count <= workItems.Length)
+                    {
+                        return Task.FromResult(workItems[count - 1]);
+                    }
+
+                    return Task.FromResult<string>(null!);
+                },
+                processWorkItem: item =>
+                {
+                    processedItems.Add(item);
+                    if (processedItems.Count >= workItems.Length)
+                    {
+                        allItemsProcessed.TrySetResult(true);
+                    }
+
+                    return Task.CompletedTask;
+                });
+
+            dispatcher.MaxConcurrentWorkItems = 5;
+            dispatcher.DispatcherCount = 1;
+
+            // Act
+            await dispatcher.StartAsync();
+
+            bool completed = await Task.WhenAny(
+                allItemsProcessed.Task,
+                Task.Delay(TestTimeout)) == allItemsProcessed.Task;
+
+            await dispatcher.StopAsync(forced: true);
+            dispatcher.Dispose();
+
+            // Assert
+            Assert.IsTrue(completed, "All work items should have been processed.");
+            CollectionAssert.AreEquivalent(workItems, processedItems.ToArray());
+        }
+
+        /// <summary>
+        /// Verifies that a non-forced stop logs DispatcherStopped and never logs
+        /// DispatcherLoopFailed when nothing throws.
+        /// </summary>
+        [TestMethod]
+        public async Task DispatchLoop_StopsGracefully_WhenNoExceptions()
+        {
+            // Arrange
+            var logMessages = new ConcurrentBag<LogEntry>();
+            var loggerFactory = LoggerFactory.Create(builder =>
+            {
+                builder.AddProvider(new InMemoryLoggerProvider(logMessages));
+                builder.SetMinimumLevel(LogLevel.Trace);
+            });
+
+            // Use a signal so we know the dispatch loop is actively running
+            // before we ask it to stop, rather than relying on a fixed delay.
+            // RunContinuationsAsynchronously keeps the awaiting test code (which calls
+            // StopAsync) from resuming inline inside the fetch callback.
+            var fetchStarted = new TaskCompletionSource<bool>(
+                TaskCreationOptions.RunContinuationsAsynchronously);
+
+            var dispatcher = new WorkItemDispatcher<string>(
+                "TestDispatcher",
+                workItemIdentifier: item => item ?? "null",
+                fetchWorkItem: (timeout, ct) =>
+                {
+                    fetchStarted.TrySetResult(true);
+                    return Task.FromResult<string>(null!);
+                },
+                processWorkItem: item => Task.CompletedTask);
+
+            dispatcher.MaxConcurrentWorkItems = 1;
+            dispatcher.DispatcherCount = 1;
+            dispatcher.LogHelper = new LogHelper(loggerFactory.CreateLogger("DurableTask.Core"));
+
+            // Act
+            await dispatcher.StartAsync();
+
+            // Wait until the dispatch loop has actually started fetching
+            await Task.WhenAny(fetchStarted.Task, Task.Delay(TestTimeout));
+            Assert.IsTrue(fetchStarted.Task.IsCompleted, "Dispatch loop should have started fetching.");
+
+            await dispatcher.StopAsync(forced: false);
+
+            // The DispatcherStopped event is logged asynchronously after the
+            // dispatch loop exits. Poll for it instead of using a fixed delay.
+            var sw = Stopwatch.StartNew();
+            while (sw.Elapsed < TestTimeout)
+            {
+                if (logMessages.Any(m => m.EventId.Name == nameof(Logging.EventIds.DispatcherStopped)))
+                {
+                    break;
+                }
+
+                await Task.Delay(50);
+            }
+
+            dispatcher.Dispose();
+
+            // Assert: DispatcherStopped event should be logged, not DispatcherLoopFailed
+            bool hasStopped = logMessages.Any(m =>
+                m.EventId.Name == nameof(Logging.EventIds.DispatcherStopped));
+            bool hasFailed = logMessages.Any(m =>
+                m.EventId.Name == nameof(Logging.EventIds.DispatcherLoopFailed));
+
+            Assert.IsTrue(hasStopped, "Expected DispatcherStopped log event on graceful shutdown.");
+            Assert.IsFalse(hasFailed, "DispatcherLoopFailed should not be logged during graceful shutdown.");
+        }
+
+        /// <summary>
+        /// Checks the level, event ID, event name and formatted message of a
+        /// DispatcherLoopFailed log event.
+        /// </summary>
+        [TestMethod]
+        public void DispatcherLoopFailed_LogEvent_HasCorrectProperties()
+        {
+            // Arrange
+            var dispatcherContext = new WorkItemDispatcherContext("TestDispatcher", "test-id", "0");
+            var failure = new ObjectDisposedException("testSemaphore");
+
+            // Act
+            var loopFailed = new LogEvents.DispatcherLoopFailed(dispatcherContext, failure);
+
+            // Assert: severity and identity of the event
+            Assert.AreEqual(LogLevel.Error, loopFailed.Level);
+            Assert.AreEqual(Logging.EventIds.DispatcherLoopFailed, loopFailed.EventId.Id);
+            Assert.AreEqual(nameof(Logging.EventIds.DispatcherLoopFailed), loopFailed.EventId.Name);
+
+            // Assert: the formatted text names the dispatcher, the failure kind and the exception
+            ILogEvent asLogEvent = loopFailed;
+            string formatted = asLogEvent.FormattedMessage;
+            Assert.IsTrue(formatted.Contains("TestDispatcher"), "Message should contain the dispatcher name.");
+            Assert.IsTrue(formatted.Contains("Unhandled exception"), "Message should mention unhandled exception.");
+            Assert.IsTrue(formatted.Contains("testSemaphore"), "Message should contain exception details.");
+        }
+
+        /// <summary>
+        /// Checks that a DispatcherLoopFailed log event exposes its Dispatcher and Details
+        /// fields through the read-only dictionary interface.
+        /// </summary>
+        [TestMethod]
+        public void DispatcherLoopFailed_LogEvent_ExposesStructuredFields()
+        {
+            // Arrange
+            var context = new WorkItemDispatcherContext("ActivityDispatcher", "abc123", "1");
+            var exception = new InvalidOperationException("Something went wrong");
+
+            // Act
+            var logEvent = new LogEvents.DispatcherLoopFailed(context, exception);
+
+            // Assert: Verify the structured log fields are accessible via the dictionary interface
+            var dict = (IReadOnlyDictionary<string, object>)logEvent;
+            Assert.IsTrue(dict.ContainsKey("Dispatcher"), "Should have 'Dispatcher' field.");
+            Assert.IsTrue(dict.ContainsKey("Details"), "Should have 'Details' field.");
+
+            string dispatcher = (string)dict["Dispatcher"];
+            string details = (string)dict["Details"];
+
+            Assert.IsTrue(dispatcher.Contains("ActivityDispatcher"), "Dispatcher field should contain dispatcher name.");
+            Assert.IsTrue(details.Contains("Something went wrong"), "Details field should contain exception message.");
+        }
+
+ #region Test Helpers
+
+ /// <summary>
+ /// A simple log entry captured by the in-memory logger.
+ /// </summary>
+ class LogEntry
+ {
+ public LogLevel Level { get; set; }
+ public EventId EventId { get; set; }
+ public string? Message { get; set; }
+ public Exception? Exception { get; set; }
+ }
+
+ /// <summary>
+ /// An <see cref="IDisposable"/> whose Dispose does nothing; exposed as a single shared instance.
+ /// </summary>
+ class NoOpDisposable : IDisposable
+ {
+ public static readonly NoOpDisposable Instance = new NoOpDisposable();
+ public void Dispose() { }
+ }
+
+        /// <summary>
+        /// An ILogger that captures log entries to a concurrent bag for test assertions.
+        /// </summary>
+        class InMemoryLogger : ILogger
+        {
+            readonly ConcurrentBag<LogEntry> logs;
+
+            /// <summary>
+            /// Creates a logger that appends every entry to <paramref name="logs"/>.
+            /// </summary>
+            /// <param name="logs">The bag that receives the captured entries.</param>
+            public InMemoryLogger(ConcurrentBag<LogEntry> logs)
+            {
+                this.logs = logs;
+            }
+
+            /// <summary>Scopes are not tracked; a shared no-op disposable is returned.</summary>
+            public IDisposable BeginScope<TState>(TState state) => NoOpDisposable.Instance;
+
+            /// <summary>Every log level is enabled.</summary>
+            public bool IsEnabled(LogLevel logLevel) => true;
+
+            /// <summary>
+            /// Records the level, event ID, formatted message and exception of a log call.
+            /// </summary>
+            public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func<TState, Exception?, string> formatter)
+            {
+                this.logs.Add(new LogEntry
+                {
+                    Level = logLevel,
+                    EventId = eventId,
+                    Message = formatter(state, exception),
+                    Exception = exception,
+                });
+            }
+        }
+
+        /// <summary>
+        /// An ILoggerProvider that creates InMemoryLogger instances.
+        /// </summary>
+        class InMemoryLoggerProvider : ILoggerProvider
+        {
+            readonly ConcurrentBag<LogEntry> logs;
+
+            /// <summary>
+            /// Creates a provider whose loggers all append to <paramref name="logs"/>.
+            /// </summary>
+            /// <param name="logs">The bag shared by every logger this provider creates.</param>
+            public InMemoryLoggerProvider(ConcurrentBag<LogEntry> logs)
+            {
+                this.logs = logs;
+            }
+
+            /// <summary>Returns a new logger for the shared bag; the category name is ignored.</summary>
+            public ILogger CreateLogger(string categoryName) => new InMemoryLogger(this.logs);
+
+            /// <summary>Nothing to release.</summary>
+            public void Dispose() { }
+        }
+
+ #endregion
+ }
+}