diff --git a/src/DurableTask.Core/Logging/EventIds.cs b/src/DurableTask.Core/Logging/EventIds.cs
index f23459c79..049af43f3 100644
--- a/src/DurableTask.Core/Logging/EventIds.cs
+++ b/src/DurableTask.Core/Logging/EventIds.cs
@@ -31,6 +31,7 @@ static class EventIds
         public const int ProcessWorkItemStarting = 27;
         public const int ProcessWorkItemCompleted = 28;
         public const int ProcessWorkItemFailed = 29;
+        public const int DispatcherLoopFailed = 30;
 
         public const int SchedulingOrchestration = 40;
         public const int RaisingEvent = 41;
diff --git a/src/DurableTask.Core/Logging/LogEvents.cs b/src/DurableTask.Core/Logging/LogEvents.cs
index 360528638..e5fe2ee33 100644
--- a/src/DurableTask.Core/Logging/LogEvents.cs
+++ b/src/DurableTask.Core/Logging/LogEvents.cs
@@ -158,6 +158,37 @@ void IEventSourceEvent.WriteEventSource() =>
                 StructuredEventSource.Log.DispatcherStopped(this.Dispatcher, Utils.AppName, Utils.PackageVersion);
         }
 
+        internal class DispatcherLoopFailed : StructuredLogEvent, IEventSourceEvent
+        {
+            public DispatcherLoopFailed(WorkItemDispatcherContext context, Exception exception)
+            {
+                this.Dispatcher = context.GetDisplayName();
+                this.Details = exception.ToString();
+            }
+
+            [StructuredLogField]
+            public string Dispatcher { get; }
+
+            [StructuredLogField]
+            public string Details { get; }
+
+            public override EventId EventId => new EventId(
+                EventIds.DispatcherLoopFailed,
+                nameof(EventIds.DispatcherLoopFailed));
+
+            public override LogLevel Level => LogLevel.Error;
+
+            protected override string CreateLogMessage() =>
+                $"{this.Dispatcher}: Unhandled exception in dispatch loop. Will retry after backoff. Details: {this.Details}";
+
+            void IEventSourceEvent.WriteEventSource() =>
+                StructuredEventSource.Log.DispatcherLoopFailed(
+                    this.Dispatcher,
+                    this.Details,
+                    Utils.AppName,
+                    Utils.PackageVersion);
+        }
+
         internal class DispatchersStopping : StructuredLogEvent, IEventSourceEvent
         {
             public DispatchersStopping(string name, string id, int concurrentWorkItemCount, int activeFetchers)
diff --git a/src/DurableTask.Core/Logging/LogHelper.cs b/src/DurableTask.Core/Logging/LogHelper.cs
index 1ae501b79..6efbe0cfe 100644
--- a/src/DurableTask.Core/Logging/LogHelper.cs
+++ b/src/DurableTask.Core/Logging/LogHelper.cs
@@ -107,6 +107,21 @@ internal void DispatcherStopped(WorkItemDispatcherContext context)
             }
         }
 
+        /// <summary>
+        /// Logs that a work item dispatch loop encountered an unhandled exception.
+        /// </summary>
+        /// <param name="context">The context of the dispatcher that failed.</param>
+        /// <param name="exception">The unhandled exception.</param>
+        internal void DispatcherLoopFailed(WorkItemDispatcherContext context, Exception exception)
+        {
+            if (this.IsStructuredLoggingEnabled)
+            {
+                this.WriteStructuredLog(
+                    new LogEvents.DispatcherLoopFailed(context, exception),
+                    exception);
+            }
+        }
+
         /// <summary>
         /// Logs that the work item dispatcher is watching for individual dispatch loops to finish stopping.
/// diff --git a/src/DurableTask.Core/Logging/StructuredEventSource.cs b/src/DurableTask.Core/Logging/StructuredEventSource.cs index 47a94b1a4..129bac158 100644 --- a/src/DurableTask.Core/Logging/StructuredEventSource.cs +++ b/src/DurableTask.Core/Logging/StructuredEventSource.cs @@ -102,6 +102,15 @@ internal void DispatcherStopped(string Dispatcher, string AppName, string Extens } } + [Event(EventIds.DispatcherLoopFailed, Level = EventLevel.Error, Version = 1)] + internal void DispatcherLoopFailed(string Dispatcher, string Details, string AppName, string ExtensionVersion) + { + if (this.IsEnabled(EventLevel.Error)) + { + this.WriteEvent(EventIds.DispatcherLoopFailed, Dispatcher, Details, AppName, ExtensionVersion); + } + } + [Event(EventIds.DispatchersStopping, Level = EventLevel.Verbose, Version = 1)] internal void DispatchersStopping( string Dispatcher, diff --git a/src/DurableTask.Core/WorkItemDispatcher.cs b/src/DurableTask.Core/WorkItemDispatcher.cs index 3829578d8..1a64065e5 100644 --- a/src/DurableTask.Core/WorkItemDispatcher.cs +++ b/src/DurableTask.Core/WorkItemDispatcher.cs @@ -142,7 +142,14 @@ public async Task StartAsync() // We just want this to Run we intentionally don't wait #pragma warning disable 4014 - Task.Run(() => this.DispatchAsync(context)); + Task.Run(() => this.DispatchAsync(context)).ContinueWith(t => + { + TraceHelper.TraceException( + TraceEventType.Critical, + "WorkItemDispatcherDispatch-FatalTermination", + t.Exception, + $"Dispatch loop for '{this.name}' terminated fatally!"); + }, TaskContinuationOptions.OnlyOnFaulted); #pragma warning restore 4014 } } @@ -224,128 +231,167 @@ async Task DispatchAsync(WorkItemDispatcherContext context) bool logThrottle = true; while (this.isStarted) { - if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5))) + var semaphoreAcquired = false; + var scheduledWorkItem = false; + try { - if (logThrottle) + if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5))) { - // This can 
happen frequently under heavy load. - // To avoid log spam, we log just once until we can proceed. - this.LogHelper.FetchingThrottled( - context, - this.concurrentWorkItemCount, - this.MaxConcurrentWorkItems); - TraceHelper.Trace( - TraceEventType.Warning, - "WorkItemDispatcherDispatch-MaxOperations", - this.GetFormattedLog(dispatcherId, $"Max concurrent operations ({this.concurrentWorkItemCount}) are already in progress. Still waiting for next accept.")); - - logThrottle = false; + if (logThrottle) + { + // This can happen frequently under heavy load. + // To avoid log spam, we log just once until we can proceed. + this.LogHelper.FetchingThrottled( + context, + this.concurrentWorkItemCount, + this.MaxConcurrentWorkItems); + TraceHelper.Trace( + TraceEventType.Warning, + "WorkItemDispatcherDispatch-MaxOperations", + this.GetFormattedLog(dispatcherId, $"Max concurrent operations ({this.concurrentWorkItemCount}) are already in progress. Still waiting for next accept.")); + + logThrottle = false; + } + + continue; } - continue; - } + semaphoreAcquired = true; + logThrottle = true; - logThrottle = true; + var delaySecs = 0; + T workItem = default(T); + try + { + Interlocked.Increment(ref this.activeFetchers); + this.LogHelper.FetchWorkItemStarting(context, DefaultReceiveTimeout, this.concurrentWorkItemCount, this.MaxConcurrentWorkItems); + TraceHelper.Trace( + TraceEventType.Verbose, + "WorkItemDispatcherDispatch-StartFetch", + this.GetFormattedLog(dispatcherId, $"Starting fetch with timeout of {DefaultReceiveTimeout} ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)")); - var delaySecs = 0; - T workItem = default(T); - try - { - Interlocked.Increment(ref this.activeFetchers); - this.LogHelper.FetchWorkItemStarting(context, DefaultReceiveTimeout, this.concurrentWorkItemCount, this.MaxConcurrentWorkItems); - TraceHelper.Trace( - TraceEventType.Verbose, - "WorkItemDispatcherDispatch-StartFetch", - this.GetFormattedLog(dispatcherId, $"Starting fetch with 
timeout of {DefaultReceiveTimeout} ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)")); + Stopwatch timer = Stopwatch.StartNew(); + workItem = await this.FetchWorkItem(DefaultReceiveTimeout, this.shutdownCancellationTokenSource.Token); - Stopwatch timer = Stopwatch.StartNew(); - workItem = await this.FetchWorkItem(DefaultReceiveTimeout, this.shutdownCancellationTokenSource.Token); + if (!IsNull(workItem)) + { + string workItemId = this.workItemIdentifier(workItem); + this.LogHelper.FetchWorkItemCompleted( + context, + workItemId, + timer.Elapsed, + this.concurrentWorkItemCount, + this.MaxConcurrentWorkItems); + } - if (!IsNull(workItem)) + TraceHelper.Trace( + TraceEventType.Verbose, + "WorkItemDispatcherDispatch-EndFetch", + this.GetFormattedLog(dispatcherId, $"After fetch ({timer.ElapsedMilliseconds} ms) ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)")); + } + catch (TimeoutException) { - string workItemId = this.workItemIdentifier(workItem); - this.LogHelper.FetchWorkItemCompleted( - context, - workItemId, - timer.Elapsed, - this.concurrentWorkItemCount, - this.MaxConcurrentWorkItems); + delaySecs = 0; } - - TraceHelper.Trace( - TraceEventType.Verbose, - "WorkItemDispatcherDispatch-EndFetch", - this.GetFormattedLog(dispatcherId, $"After fetch ({timer.ElapsedMilliseconds} ms) ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)")); - } - catch (TimeoutException) - { - delaySecs = 0; - } - catch (TaskCanceledException exception) - { - TraceHelper.Trace( - TraceEventType.Information, - "WorkItemDispatcherDispatch-TaskCanceledException", - this.GetFormattedLog(dispatcherId, $"TaskCanceledException while fetching workItem, should be harmless: {exception.Message}")); - delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception); - } - catch (Exception exception) - { - if (!this.isStarted) + catch (TaskCanceledException exception) { TraceHelper.Trace( - TraceEventType.Information, - 
"WorkItemDispatcherDispatch-HarmlessException", - this.GetFormattedLog(dispatcherId, $"Harmless exception while fetching workItem after Stop(): {exception.Message}")); + TraceEventType.Information, + "WorkItemDispatcherDispatch-TaskCanceledException", + this.GetFormattedLog(dispatcherId, $"TaskCanceledException while fetching workItem, should be harmless: {exception.Message}")); + delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception); } - else + catch (Exception exception) { - this.LogHelper.FetchWorkItemFailure(context, exception); - // TODO : dump full node context here - TraceHelper.TraceException( - TraceEventType.Warning, - "WorkItemDispatcherDispatch-Exception", - exception, - this.GetFormattedLog(dispatcherId, $"Exception while fetching workItem: {exception.Message}")); - delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception); + if (!this.isStarted) + { + TraceHelper.Trace( + TraceEventType.Information, + "WorkItemDispatcherDispatch-HarmlessException", + this.GetFormattedLog(dispatcherId, $"Harmless exception while fetching workItem after Stop(): {exception.Message}")); + } + else + { + this.LogHelper.FetchWorkItemFailure(context, exception); + // TODO : dump full node context here + TraceHelper.TraceException( + TraceEventType.Warning, + "WorkItemDispatcherDispatch-Exception", + exception, + this.GetFormattedLog(dispatcherId, $"Exception while fetching workItem: {exception.Message}")); + delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception); + } + } + finally + { + Interlocked.Decrement(ref this.activeFetchers); } - } - finally - { - Interlocked.Decrement(ref this.activeFetchers); - } - var scheduledWorkItem = false; - if (!IsNull(workItem)) - { - if (!this.isStarted) + if (!IsNull(workItem)) { - if (this.SafeReleaseWorkItem != null) + if (!this.isStarted) { - await this.SafeReleaseWorkItem(workItem); + if (this.SafeReleaseWorkItem != null) + { + await this.SafeReleaseWorkItem(workItem); + } + } + else + { + 
Interlocked.Increment(ref this.concurrentWorkItemCount); + // We just want this to Run we intentionally don't wait + #pragma warning disable 4014 + Task.Run(() => this.ProcessWorkItemAsync(context, workItem)); + #pragma warning restore 4014 + + scheduledWorkItem = true; } } - else + + delaySecs = Math.Max(this.delayOverrideSecs, delaySecs); + if (delaySecs > 0) { - Interlocked.Increment(ref this.concurrentWorkItemCount); - // We just want this to Run we intentionally don't wait - #pragma warning disable 4014 - Task.Run(() => this.ProcessWorkItemAsync(context, workItem)); - #pragma warning restore 4014 + await Task.Delay(TimeSpan.FromSeconds(delaySecs)); + } - scheduledWorkItem = true; + if (!scheduledWorkItem) + { + this.concurrencyLock.Release(); } } - - delaySecs = Math.Max(this.delayOverrideSecs, delaySecs); - if (delaySecs > 0) + catch (Exception exception) when (!Utils.IsFatal(exception)) { - await Task.Delay(TimeSpan.FromSeconds(delaySecs)); - } + // Catch-all for any unhandled exception in the dispatch loop body. + // Without this, the dispatch loop would silently terminate because + // DispatchAsync runs as a fire-and-forget Task.Run. + this.LogHelper.DispatcherLoopFailed(context, exception); + TraceHelper.TraceException( + TraceEventType.Error, + "WorkItemDispatcherDispatch-UnhandledException", + exception, + this.GetFormattedLog(dispatcherId, + $"Unhandled exception in dispatch loop. Will retry after backoff.")); + + // Release the semaphore if we acquired it but never handed it off + // to ProcessWorkItemAsync, to avoid permanently reducing concurrency. 
+ if (semaphoreAcquired && !scheduledWorkItem) + { + try { this.concurrencyLock.Release(); } catch { /* best effort */ } + } - if (!scheduledWorkItem) - { - this.concurrencyLock.Release(); + try + { + await Task.Delay(TimeSpan.FromSeconds(BackOffIntervalOnInvalidOperationSecs), this.shutdownCancellationTokenSource.Token); + } + catch (OperationCanceledException) + { + // Shutdown requested during backoff; exit promptly. + } + catch (ObjectDisposedException) + { + // CancellationTokenSource was disposed (e.g., Dispose called + // shortly after StopAsync); treat as shutdown. + } } } diff --git a/test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs b/test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs new file mode 100644 index 000000000..275503748 --- /dev/null +++ b/test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs @@ -0,0 +1,442 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ----------------------------------------------------------------------------------
+#nullable enable
+namespace DurableTask.Core.Tests
+{
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using DurableTask.Core.Logging;
+    using Microsoft.Extensions.Logging;
+    using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+    [TestClass]
+    public class WorkItemDispatcherTests
+    {
+        // A generous-but-bounded timeout for CI. Most tests complete in <1s; the
+        // outer-catch recovery test waits out a backoff and uses its own longer limit.
+        static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(10);
+        [TestMethod]
+        public async Task DispatchLoop_SurvivesOuterException_ViaFaultyDelayCallback()
+        {
+            // Arrange: GetDelayInSecondsAfterOnFetchException is called from inside the
+            // inner catch block, but if it throws, the exception escapes the inner
+            // catch and is caught by the outer catch — exercising the new safety net.
+            // We verify that DispatcherLoopFailed is logged and the loop continues.
+            int fetchCallCount = 0;
+            var secondFetchStarted = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+            var logMessages = new ConcurrentBag<LogEntry>();
+            using var loggerFactory = LoggerFactory.Create(builder =>
+            {
+                builder.AddProvider(new InMemoryLoggerProvider(logMessages));
+                builder.SetMinimumLevel(LogLevel.Trace);
+            });
+
+            var dispatcher = new WorkItemDispatcher<string>(
+                "TestDispatcher",
+                workItemIdentifier: item => item ?? "null",
+                fetchWorkItem: (timeout, ct) =>
+                {
+                    int count = Interlocked.Increment(ref fetchCallCount);
+                    if (count == 1)
+                    {
+                        // This exception will be caught by the inner catch, which then
+                        // calls GetDelayInSecondsAfterOnFetchException (which also throws).
+                        throw new InvalidOperationException("Trigger inner catch");
+                    }
+
+                    // If we reach here, the outer catch handled the callback failure
+                    // and the loop continued.
+                    secondFetchStarted.TrySetResult(true);
+                    return Task.FromResult<string>(null!);
+                },
+                processWorkItem: item => Task.CompletedTask);
+
+            dispatcher.MaxConcurrentWorkItems = 1;
+            dispatcher.DispatcherCount = 1;
+            dispatcher.LogHelper = new LogHelper(loggerFactory.CreateLogger("DurableTask.Core"));
+
+            // This callback is invoked from the inner catch. Throwing here causes
+            // the exception to escape to the outer catch.
+            int delayCallCount = 0;
+            dispatcher.GetDelayInSecondsAfterOnFetchException = (ex) =>
+            {
+                if (Interlocked.Increment(ref delayCallCount) == 1)
+                {
+                    throw new InvalidOperationException("Failure in delay callback");
+                }
+
+                return 0;
+            };
+
+            // Act
+            await dispatcher.StartAsync();
+
+            // Wait for the loop to recover from the outer catch and start a second fetch.
+            // The outer catch backoff is 10s, but may be cancelled on shutdown. We give it
+            // enough time for the backoff to elapse. If the loop died, this will time out.
+            bool recovered = await Task.WhenAny(
+                secondFetchStarted.Task,
+                Task.Delay(TimeSpan.FromSeconds(15))) == secondFetchStarted.Task;
+
+            await dispatcher.StopAsync(forced: true);
+            dispatcher.Dispose();
+
+            // Assert: The dispatch loop survived the outer exception and continued
+            Assert.IsTrue(recovered, "Dispatch loop should have recovered after outer catch exception.");
+
+            // The outer catch should have logged DispatcherLoopFailed
+            bool hasLoopFailed = logMessages.Any(m =>
+                m.EventId.Name == nameof(Logging.EventIds.DispatcherLoopFailed));
+            Assert.IsTrue(hasLoopFailed, "Expected DispatcherLoopFailed log event from the outer catch.");
+        }
+
+        [TestMethod]
+        public async Task DispatchLoop_SurvivesMultipleExceptionTypes_AndContinuesProcessing()
+        {
+            // Arrange: Throw a variety of exception types from fetch across consecutive calls.
+            // All are handled by the inner catch, but this verifies the loop keeps going.
+            int fetchCallCount = 0;
+            var recoverySignal = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+            var dispatcher = new WorkItemDispatcher<string>(
+                "TestDispatcher",
+                workItemIdentifier: item => item ?? "null",
+                fetchWorkItem: (timeout, ct) =>
+                {
+                    int count = Interlocked.Increment(ref fetchCallCount);
+                    switch (count)
+                    {
+                        case 1: throw new InvalidOperationException("Test exception");
+                        case 2: throw new TimeoutException("Test timeout");
+                        case 3: throw new TaskCanceledException("Test cancel");
+                        default:
+                            if (count >= 5)
+                            {
+                                recoverySignal.TrySetResult(true);
+                            }
+
+                            return Task.FromResult<string>(null!);
+                    }
+                },
+                processWorkItem: item => Task.CompletedTask);
+
+            dispatcher.MaxConcurrentWorkItems = 1;
+            dispatcher.DispatcherCount = 1;
+
+            // Act
+            await dispatcher.StartAsync();
+
+            bool recovered = await Task.WhenAny(
+                recoverySignal.Task,
+                Task.Delay(TestTimeout)) == recoverySignal.Task;
+
+            await dispatcher.StopAsync(forced: true);
+            dispatcher.Dispose();
+
+            // Assert: The dispatcher survived all exception types and kept fetching
+            Assert.IsTrue(recovered, "Dispatch loop should have recovered after multiple exception types.");
+            Assert.IsTrue(fetchCallCount >= 5, $"Expected at least 5 fetch calls, got {fetchCallCount}.");
+        }
+
+        [TestMethod]
+        public async Task DispatchLoop_LogsFetchWorkItemFailure_WhenFetchThrows()
+        {
+            // Arrange: Use a logging ILogger to capture log events.
+            // The fetch exception is handled by the inner catch, which logs
+            // FetchWorkItemFailure — verifying that path works correctly.
+            var logMessages = new ConcurrentBag<LogEntry>();
+            using var loggerFactory = LoggerFactory.Create(builder =>
+            {
+                builder.AddProvider(new InMemoryLoggerProvider(logMessages));
+                builder.SetMinimumLevel(LogLevel.Trace);
+            });
+
+            int fetchCallCount = 0;
+            var dispatcherRecovered = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+            var dispatcher = new WorkItemDispatcher<string>(
+                "TestDispatcher",
+                workItemIdentifier: item => item ?? "null",
+                fetchWorkItem: (timeout, ct) =>
+                {
+                    int count = Interlocked.Increment(ref fetchCallCount);
+                    if (count == 1)
+                    {
+                        // This exception is caught by the inner catch block,
+                        // which logs FetchWorkItemFailure (not DispatcherLoopFailed).
+                        throw new InvalidOperationException("Simulated fetch failure");
+                    }
+
+                    if (count >= 3)
+                    {
+                        dispatcherRecovered.TrySetResult(true);
+                    }
+
+                    return Task.FromResult<string>(null!);
+                },
+                processWorkItem: item => Task.CompletedTask);
+
+            dispatcher.MaxConcurrentWorkItems = 1;
+            dispatcher.DispatcherCount = 1;
+            dispatcher.LogHelper = new LogHelper(loggerFactory.CreateLogger("DurableTask.Core"));
+
+            // Act
+            await dispatcher.StartAsync();
+
+            bool recovered = await Task.WhenAny(
+                dispatcherRecovered.Task,
+                Task.Delay(TestTimeout)) == dispatcherRecovered.Task;
+
+            await dispatcher.StopAsync(forced: true);
+            dispatcher.Dispose();
+
+            // Assert
+            Assert.IsTrue(recovered, "Dispatch loop should have recovered after exception.");
+
+            // The inner catch handles InvalidOperationException from fetch,
+            // so we verify FetchWorkItemFailure was logged
+            bool hasFetchFailure = logMessages.Any(m =>
+                m.EventId.Name == nameof(Logging.EventIds.FetchWorkItemFailure));
+            Assert.IsTrue(hasFetchFailure, "Expected FetchWorkItemFailure log event.");
+        }
+
+        [TestMethod]
+        public async Task DispatchLoop_ProcessesWorkItemsSuccessfully()
+        {
+            // Arrange
+            var processedItems = new ConcurrentBag<string>();
+            int fetchCallCount = 0;
+            var allItemsProcessed = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+            string[] workItems = { "item-1", "item-2", "item-3" };
+
+            var dispatcher = new WorkItemDispatcher<string>(
+                "TestDispatcher",
+                workItemIdentifier: item => item,
+                fetchWorkItem: (timeout, ct) =>
+                {
+                    int count = Interlocked.Increment(ref fetchCallCount);
+                    if (count <= workItems.Length)
+                    {
+                        return Task.FromResult(workItems[count - 1]);
+                    }
+
+                    return Task.FromResult<string>(null!);
+                },
+                processWorkItem: item =>
+                {
+                    processedItems.Add(item);
+                    if (processedItems.Count >= workItems.Length)
+                    {
+                        allItemsProcessed.TrySetResult(true);
+                    }
+
+                    return Task.CompletedTask;
+                });
+
+            dispatcher.MaxConcurrentWorkItems = 5;
+            dispatcher.DispatcherCount = 1;
+
+            // Act
+            await dispatcher.StartAsync();
+
+            bool completed = await Task.WhenAny(
+                allItemsProcessed.Task,
+                Task.Delay(TestTimeout)) == allItemsProcessed.Task;
+
+            await dispatcher.StopAsync(forced: true);
+            dispatcher.Dispose();
+
+            // Assert
+            Assert.IsTrue(completed, "All work items should have been processed.");
+            CollectionAssert.AreEquivalent(workItems, processedItems.ToArray());
+        }
+
+        [TestMethod]
+        public async Task DispatchLoop_StopsGracefully_WhenNoExceptions()
+        {
+            // Arrange
+            var logMessages = new ConcurrentBag<LogEntry>();
+            using var loggerFactory = LoggerFactory.Create(builder =>
+            {
+                builder.AddProvider(new InMemoryLoggerProvider(logMessages));
+                builder.SetMinimumLevel(LogLevel.Trace);
+            });
+
+            // Use a signal so we know the dispatch loop is actively running
+            // before we ask it to stop, rather than relying on a fixed delay.
+            var fetchStarted = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+            var dispatcher = new WorkItemDispatcher<string>(
+                "TestDispatcher",
+                workItemIdentifier: item => item ?? "null",
+                fetchWorkItem: (timeout, ct) =>
+                {
+                    fetchStarted.TrySetResult(true);
+                    return Task.FromResult<string>(null!);
+                },
+                processWorkItem: item => Task.CompletedTask);
+
+            dispatcher.MaxConcurrentWorkItems = 1;
+            dispatcher.DispatcherCount = 1;
+            dispatcher.LogHelper = new LogHelper(loggerFactory.CreateLogger("DurableTask.Core"));
+
+            // Act
+            await dispatcher.StartAsync();
+
+            // Wait until the dispatch loop has actually started fetching
+            await Task.WhenAny(fetchStarted.Task, Task.Delay(TestTimeout));
+            Assert.IsTrue(fetchStarted.Task.IsCompleted, "Dispatch loop should have started fetching.");
+
+            await dispatcher.StopAsync(forced: false);
+
+            // The DispatcherStopped event is logged asynchronously after the
+            // dispatch loop exits. Poll for it instead of using a fixed delay.
+            var sw = Stopwatch.StartNew();
+            while (sw.Elapsed < TestTimeout)
+            {
+                if (logMessages.Any(m => m.EventId.Name == nameof(Logging.EventIds.DispatcherStopped)))
+                {
+                    break;
+                }
+
+                await Task.Delay(50);
+            }
+
+            dispatcher.Dispose();
+
+            // Assert: DispatcherStopped event should be logged, not DispatcherLoopFailed
+            bool hasStopped = logMessages.Any(m =>
+                m.EventId.Name == nameof(Logging.EventIds.DispatcherStopped));
+            bool hasFailed = logMessages.Any(m =>
+                m.EventId.Name == nameof(Logging.EventIds.DispatcherLoopFailed));
+
+            Assert.IsTrue(hasStopped, "Expected DispatcherStopped log event on graceful shutdown.");
+            Assert.IsFalse(hasFailed, "DispatcherLoopFailed should not be logged during graceful shutdown.");
+        }
+
+        [TestMethod]
+        public void DispatcherLoopFailed_LogEvent_HasCorrectProperties()
+        {
+            // Arrange
+            var context = new WorkItemDispatcherContext("TestDispatcher", "test-id", "0");
+            var exception = new ObjectDisposedException("testSemaphore");
+
+            // Act
+            var logEvent = new LogEvents.DispatcherLoopFailed(context, exception);
+
+            // Assert
+            Assert.AreEqual(LogLevel.Error, logEvent.Level);
+            Assert.AreEqual(Logging.EventIds.DispatcherLoopFailed, logEvent.EventId.Id);
+            Assert.AreEqual(nameof(Logging.EventIds.DispatcherLoopFailed), logEvent.EventId.Name);
+
+            string message = ((ILogEvent)logEvent).FormattedMessage;
+            Assert.IsTrue(message.Contains("TestDispatcher"), "Message should contain the dispatcher name.");
+            Assert.IsTrue(message.Contains("Unhandled exception"), "Message should mention unhandled exception.");
+            Assert.IsTrue(message.Contains("testSemaphore"), "Message should contain exception details.");
+        }
+
+        [TestMethod]
+        public void DispatcherLoopFailed_LogEvent_ExposesStructuredFields()
+        {
+            // Arrange
+            var context = new WorkItemDispatcherContext("ActivityDispatcher", "abc123", "1");
+            var exception = new InvalidOperationException("Something went wrong");
+
+            // Act
+            var logEvent = new LogEvents.DispatcherLoopFailed(context, exception);
+
+            // Assert: Verify the structured log fields are accessible via the dictionary interface
+            var dict = (IReadOnlyDictionary<string, object>)logEvent;
+            Assert.IsTrue(dict.ContainsKey("Dispatcher"), "Should have 'Dispatcher' field.");
+            Assert.IsTrue(dict.ContainsKey("Details"), "Should have 'Details' field.");
+
+            string dispatcher = (string)dict["Dispatcher"];
+            string details = (string)dict["Details"];
+
+            Assert.IsTrue(dispatcher.Contains("ActivityDispatcher"), "Dispatcher field should contain dispatcher name.");
+            Assert.IsTrue(details.Contains("Something went wrong"), "Details field should contain exception message.");
+        }
+
+        #region Test Helpers
+
+        /// <summary>
+        /// A simple log entry captured by the in-memory logger.
+        /// </summary>
+        class LogEntry
+        {
+            public LogLevel Level { get; set; }
+            public EventId EventId { get; set; }
+            public string? Message { get; set; }
+            public Exception? Exception { get; set; }
+        }
+
+        class NoOpDisposable : IDisposable
+        {
+            public static readonly NoOpDisposable Instance = new NoOpDisposable();
+            public void Dispose() { }
+        }
+
+        /// <summary>
+        /// An ILogger that captures log entries to a concurrent bag for test assertions.
+        /// </summary>
+        class InMemoryLogger : ILogger
+        {
+            readonly ConcurrentBag<LogEntry> logs;
+
+            public InMemoryLogger(ConcurrentBag<LogEntry> logs)
+            {
+                this.logs = logs;
+            }
+
+            public IDisposable BeginScope<TState>(TState state) => NoOpDisposable.Instance;
+
+            public bool IsEnabled(LogLevel logLevel) => true;
+
+            public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func<TState, Exception?, string> formatter)
+            {
+                this.logs.Add(new LogEntry
+                {
+                    Level = logLevel,
+                    EventId = eventId,
+                    Message = formatter(state, exception),
+                    Exception = exception,
+                });
+            }
+        }
+
+        /// <summary>
+        /// An ILoggerProvider that creates InMemoryLogger instances.
+        /// </summary>
+        class InMemoryLoggerProvider : ILoggerProvider
+        {
+            readonly ConcurrentBag<LogEntry> logs;
+
+            public InMemoryLoggerProvider(ConcurrentBag<LogEntry> logs)
+            {
+                this.logs = logs;
+            }
+
+            public ILogger CreateLogger(string categoryName) => new InMemoryLogger(this.logs);
+
+            public void Dispose() { }
+        }
+
+        #endregion
+    }
+}