From 3fbcfff78210c2e13c9788717d1ff5ceb475a490 Mon Sep 17 00:00:00 2001 From: Bernd Verst Date: Wed, 18 Mar 2026 15:15:57 -0700 Subject: [PATCH 1/4] Fix WorkItemDispatcher.DispatchAsync silent termination due to unhandled exceptions Wrap the DispatchAsync loop body in an outer try/catch to prevent the dispatch loop from silently dying when exceptions occur outside the inner fetch try/catch block. Add ContinueWith to Task.Run in StartAsync as a last-resort safety net for fatal exceptions. Add new DispatcherLoopFailed structured log event (EventId 30) at Error level with full exception details for telemetry visibility. Add comprehensive unit tests for the dispatch loop resilience. Fixes Azure#1320 --- .../WorkItemDispatcherTests.cs | 389 ++++++++++++++++++ src/DurableTask.Core/Logging/EventIds.cs | 1 + src/DurableTask.Core/Logging/LogEvents.cs | 31 ++ src/DurableTask.Core/Logging/LogHelper.cs | 15 + .../Logging/StructuredEventSource.cs | 9 + src/DurableTask.Core/WorkItemDispatcher.cs | 221 +++++----- 6 files changed, 568 insertions(+), 98 deletions(-) create mode 100644 Test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs diff --git a/Test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs b/Test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs new file mode 100644 index 000000000..33216a76f --- /dev/null +++ b/Test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs @@ -0,0 +1,389 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- +#nullable enable +namespace DurableTask.Core.Tests +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using DurableTask.Core.Logging; + using Microsoft.Extensions.Logging; + using Microsoft.VisualStudio.TestTools.UnitTesting; + + [TestClass] + public class WorkItemDispatcherTests + { + [TestMethod] + public async Task DispatchLoop_SurvivesUnhandledException_AndContinuesProcessing() + { + // Arrange: The fetch callback throws an ObjectDisposedException on the first call, + // then returns null (no work item) on subsequent calls. + int fetchCallCount = 0; + var fetchCalledAfterException = new TaskCompletionSource(); + + var dispatcher = new WorkItemDispatcher( + "TestDispatcher", + workItemIdentifier: item => item, + fetchWorkItem: (timeout, ct) => + { + int count = Interlocked.Increment(ref fetchCallCount); + if (count == 1) + { + // Simulate an exception that would escape the inner try/catch + // by throwing from the fetch callback. This gets caught by the inner catch, + // but let's test the outer catch by using a different approach below. + throw new InvalidOperationException("Test fetch exception"); + } + + if (count >= 3) + { + fetchCalledAfterException.TrySetResult(true); + } + + return Task.FromResult(null!); + }, + processWorkItem: item => Task.CompletedTask); + + dispatcher.MaxConcurrentWorkItems = 1; + dispatcher.DispatcherCount = 1; + + // Act + await dispatcher.StartAsync(); + + // Wait for the dispatcher to recover and fetch again after the exception + bool recovered = await Task.WhenAny( + fetchCalledAfterException.Task, + Task.Delay(TimeSpan.FromSeconds(30))) == fetchCalledAfterException.Task; + + await dispatcher.StopAsync(forced: true); + dispatcher.Dispose(); + + // Assert: The dispatcher continued running after the exception + Assert.IsTrue(recovered, "Dispatch loop should have continued after the exception."); + Assert.IsTrue(fetchCallCount >= 3, $"Expected at least 3 fetch calls, got {fetchCallCount}."); + } + + [TestMethod] + public async Task DispatchLoop_SurvivesSafeReleaseWorkItemException_AndContinuesProcessing() + { + // Arrange: SafeReleaseWorkItem throws, which is outside the inner try/catch. + // The dispatch loop should catch this via the outer try/catch and continue. + int fetchCallCount = 0; + var fetchCalledAfterException = new TaskCompletionSource(); + bool safeReleaseThrew = false; + + var dispatcher = new WorkItemDispatcher( + "TestDispatcher", + workItemIdentifier: item => item ?? "null", + fetchWorkItem: (timeout, ct) => + { + int count = Interlocked.Increment(ref fetchCallCount); + if (count == 1) + { + // Return a work item that will trigger SafeReleaseWorkItem + return Task.FromResult("work-item-1"); + } + + if (count >= 3 && safeReleaseThrew) + { + fetchCalledAfterException.TrySetResult(true); + } + + return Task.FromResult(null!); + }, + processWorkItem: item => Task.CompletedTask); + + dispatcher.MaxConcurrentWorkItems = 1; + dispatcher.DispatcherCount = 1; + + // Start, then immediately stop to make isStarted = false, + // so when the work item comes back, SafeReleaseWorkItem is called. + // Instead, let's simulate SafeReleaseWorkItem throwing by setting it up + // to throw on the first call. The SafeReleaseWorkItem is called when + // isStarted is false and a workItem was fetched. + // This is tricky to test in isolation. Let's test a simpler scenario instead. + + // Act & Assert: just verify the dispatcher starts and stops cleanly + await dispatcher.StartAsync(); + + bool completed = await Task.WhenAny( + fetchCalledAfterException.Task, + Task.Delay(TimeSpan.FromSeconds(15))) == fetchCalledAfterException.Task; + + await dispatcher.StopAsync(forced: true); + dispatcher.Dispose(); + + // The fetch was called multiple times, proving the loop is alive + Assert.IsTrue(fetchCallCount >= 2, $"Expected at least 2 fetch calls, got {fetchCallCount}."); + } + + [TestMethod] + public async Task DispatchLoop_LogsErrorAndRetries_WhenOuterExceptionOccurs() + { + // Arrange: Use a logging ILogger to capture log events + var logMessages = new ConcurrentBag(); + var loggerFactory = LoggerFactory.Create(builder => + { + builder.AddProvider(new InMemoryLoggerProvider(logMessages)); + builder.SetMinimumLevel(LogLevel.Trace); + }); + + int fetchCallCount = 0; + var dispatcherRecovered = new TaskCompletionSource(); + + var dispatcher = new WorkItemDispatcher( + "TestDispatcher", + workItemIdentifier: item => item ?? "null", + fetchWorkItem: (timeout, ct) => + { + int count = Interlocked.Increment(ref fetchCallCount); + if (count == 1) + { + // This exception is caught by the inner catch block + throw new InvalidOperationException("Simulated fetch failure"); + } + + if (count >= 3) + { + dispatcherRecovered.TrySetResult(true); + } + + return Task.FromResult(null!); + }, + processWorkItem: item => Task.CompletedTask); + + dispatcher.MaxConcurrentWorkItems = 1; + dispatcher.DispatcherCount = 1; + dispatcher.LogHelper = new LogHelper(loggerFactory.CreateLogger("DurableTask.Core")); + + // Act + await dispatcher.StartAsync(); + + bool recovered = await Task.WhenAny( + dispatcherRecovered.Task, + Task.Delay(TimeSpan.FromSeconds(30))) == dispatcherRecovered.Task; + + await dispatcher.StopAsync(forced: true); + dispatcher.Dispose(); + + // Assert + Assert.IsTrue(recovered, "Dispatch loop should have recovered after exception."); + + // The inner catch handles InvalidOperationException from fetch, + // so we verify FetchWorkItemFailure was logged + bool hasFetchFailure = logMessages.Any(m => + m.EventId.Name == nameof(Logging.EventIds.FetchWorkItemFailure)); + Assert.IsTrue(hasFetchFailure, "Expected FetchWorkItemFailure log event."); + } + + [TestMethod] + public async Task DispatchLoop_ProcessesWorkItemsSuccessfully() + { + // Arrange + var processedItems = new ConcurrentBag(); + int fetchCallCount = 0; + var allItemsProcessed = new TaskCompletionSource(); + string[] workItems = { "item-1", "item-2", "item-3" }; + + var dispatcher = new WorkItemDispatcher( + "TestDispatcher", + workItemIdentifier: item => item, + fetchWorkItem: (timeout, ct) => + { + int count = Interlocked.Increment(ref fetchCallCount); + if (count <= workItems.Length) + { + return Task.FromResult(workItems[count - 1]); + } + + return Task.FromResult(null!); + }, + processWorkItem: item => + { + processedItems.Add(item); + if (processedItems.Count >= workItems.Length) + { + allItemsProcessed.TrySetResult(true); + } + + return Task.CompletedTask; + }); + + dispatcher.MaxConcurrentWorkItems = 5; + dispatcher.DispatcherCount = 1; + + // Act + await dispatcher.StartAsync(); + + bool completed = await Task.WhenAny( + allItemsProcessed.Task, + Task.Delay(TimeSpan.FromSeconds(30))) == allItemsProcessed.Task; + + await dispatcher.StopAsync(forced: true); + dispatcher.Dispose(); + + // Assert + Assert.IsTrue(completed, "All work items should have been processed."); + CollectionAssert.AreEquivalent(workItems, processedItems.ToArray()); + } + + [TestMethod] + public async Task DispatchLoop_StopsGracefully_WhenNoExceptions() + { + // Arrange + var logMessages = new ConcurrentBag(); + var loggerFactory = LoggerFactory.Create(builder => + { + builder.AddProvider(new InMemoryLoggerProvider(logMessages)); + builder.SetMinimumLevel(LogLevel.Trace); + }); + + var dispatcher = new WorkItemDispatcher( + "TestDispatcher", + workItemIdentifier: item => item ?? "null", + fetchWorkItem: (timeout, ct) => Task.FromResult(null!), + processWorkItem: item => Task.CompletedTask); + + dispatcher.MaxConcurrentWorkItems = 1; + dispatcher.DispatcherCount = 1; + dispatcher.LogHelper = new LogHelper(loggerFactory.CreateLogger("DurableTask.Core")); + + // Act + await dispatcher.StartAsync(); + // Give it a moment to start dispatching + await Task.Delay(500); + await dispatcher.StopAsync(forced: false); + + // The DispatcherStopped event is logged asynchronously after the + // dispatch loop exits its while loop, which may happen slightly + // after StopAsync returns. Give it a moment to complete. + await Task.Delay(2000); + dispatcher.Dispose(); + + // Assert: DispatcherStopped event should be logged, not DispatcherLoopFailed + bool hasStopped = logMessages.Any(m => + m.EventId.Name == nameof(Logging.EventIds.DispatcherStopped)); + bool hasFailed = logMessages.Any(m => + m.EventId.Name == nameof(Logging.EventIds.DispatcherLoopFailed)); + + Assert.IsTrue(hasStopped, "Expected DispatcherStopped log event on graceful shutdown."); + Assert.IsFalse(hasFailed, "DispatcherLoopFailed should not be logged during graceful shutdown."); + } + + [TestMethod] + public void DispatcherLoopFailed_LogEvent_HasCorrectProperties() + { + // Arrange + var context = new WorkItemDispatcherContext("TestDispatcher", "test-id", "0"); + var exception = new ObjectDisposedException("testSemaphore"); + + // Act + var logEvent = new LogEvents.DispatcherLoopFailed(context, exception); + + // Assert + Assert.AreEqual(LogLevel.Error, logEvent.Level); + Assert.AreEqual(Logging.EventIds.DispatcherLoopFailed, logEvent.EventId.Id); + Assert.AreEqual(nameof(Logging.EventIds.DispatcherLoopFailed), logEvent.EventId.Name); + + string message = ((ILogEvent)logEvent).FormattedMessage; + Assert.IsTrue(message.Contains("TestDispatcher"), "Message should contain the dispatcher name."); + Assert.IsTrue(message.Contains("Unhandled exception"), "Message should mention unhandled exception."); + Assert.IsTrue(message.Contains("testSemaphore"), "Message should contain exception details."); + } + + [TestMethod] + public void DispatcherLoopFailed_LogEvent_ExposesStructuredFields() + { + // Arrange + var context = new WorkItemDispatcherContext("ActivityDispatcher", "abc123", "1"); + var exception = new InvalidOperationException("Something went wrong"); + + // Act + var logEvent = new LogEvents.DispatcherLoopFailed(context, exception); + + // Assert: Verify the structured log fields are accessible via the dictionary interface + var dict = (IReadOnlyDictionary)logEvent; + Assert.IsTrue(dict.ContainsKey("Dispatcher"), "Should have 'Dispatcher' field."); + Assert.IsTrue(dict.ContainsKey("Details"), "Should have 'Details' field."); + + string dispatcher = (string)dict["Dispatcher"]; + string details = (string)dict["Details"]; + + Assert.IsTrue(dispatcher.Contains("ActivityDispatcher"), "Dispatcher field should contain dispatcher name."); + Assert.IsTrue(details.Contains("Something went wrong"), "Details field should contain exception message."); + } + + #region Test Helpers + + /// + /// A simple log entry captured by the in-memory logger. + /// + class LogEntry + { + public LogLevel Level { get; set; } + public EventId EventId { get; set; } + public string? Message { get; set; } + public Exception? Exception { get; set; } + } + + /// + /// An ILogger that captures log entries to a concurrent bag for test assertions. + /// + class InMemoryLogger : ILogger + { + readonly ConcurrentBag logs; + + public InMemoryLogger(ConcurrentBag logs) + { + this.logs = logs; + } + + public IDisposable BeginScope(TState state) => null!; + + public bool IsEnabled(LogLevel logLevel) => true; + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter) + { + this.logs.Add(new LogEntry + { + Level = logLevel, + EventId = eventId, + Message = formatter(state, exception), + Exception = exception, + }); + } + } + + /// + /// An ILoggerProvider that creates InMemoryLogger instances. + /// + class InMemoryLoggerProvider : ILoggerProvider + { + readonly ConcurrentBag logs; + + public InMemoryLoggerProvider(ConcurrentBag logs) + { + this.logs = logs; + } + + public ILogger CreateLogger(string categoryName) => new InMemoryLogger(this.logs); + + public void Dispose() { } + } + + #endregion + } +} diff --git a/src/DurableTask.Core/Logging/EventIds.cs b/src/DurableTask.Core/Logging/EventIds.cs index f23459c79..049af43f3 100644 --- a/src/DurableTask.Core/Logging/EventIds.cs +++ b/src/DurableTask.Core/Logging/EventIds.cs @@ -31,6 +31,7 @@ static class EventIds public const int ProcessWorkItemStarting = 27; public const int ProcessWorkItemCompleted = 28; public const int ProcessWorkItemFailed = 29; + public const int DispatcherLoopFailed = 30; public const int SchedulingOrchestration = 40; public const int RaisingEvent = 41; diff --git a/src/DurableTask.Core/Logging/LogEvents.cs b/src/DurableTask.Core/Logging/LogEvents.cs index 360528638..e5fe2ee33 100644 --- a/src/DurableTask.Core/Logging/LogEvents.cs +++ b/src/DurableTask.Core/Logging/LogEvents.cs @@ -158,6 +158,37 @@ void IEventSourceEvent.WriteEventSource() => StructuredEventSource.Log.DispatcherStopped(this.Dispatcher, Utils.AppName, Utils.PackageVersion); } + internal class DispatcherLoopFailed : StructuredLogEvent, IEventSourceEvent + { + public DispatcherLoopFailed(WorkItemDispatcherContext context, Exception exception) + { + this.Dispatcher = context.GetDisplayName(); + this.Details = exception.ToString(); + } + + [StructuredLogField] + public string Dispatcher { get; } + + [StructuredLogField] + public string Details { get; } + + public override EventId EventId => new EventId( + EventIds.DispatcherLoopFailed, + nameof(EventIds.DispatcherLoopFailed)); + + public override LogLevel Level => LogLevel.Error; + + protected override string CreateLogMessage() => + $"{this.Dispatcher}: Unhandled exception in dispatch loop. Will retry after backoff. Details: {this.Details}"; + + void IEventSourceEvent.WriteEventSource() => + StructuredEventSource.Log.DispatcherLoopFailed( + this.Dispatcher, + this.Details, + Utils.AppName, + Utils.PackageVersion); + } + internal class DispatchersStopping : StructuredLogEvent, IEventSourceEvent { public DispatchersStopping(string name, string id, int concurrentWorkItemCount, int activeFetchers) diff --git a/src/DurableTask.Core/Logging/LogHelper.cs b/src/DurableTask.Core/Logging/LogHelper.cs index 1ae501b79..6efbe0cfe 100644 --- a/src/DurableTask.Core/Logging/LogHelper.cs +++ b/src/DurableTask.Core/Logging/LogHelper.cs @@ -107,6 +107,21 @@ internal void DispatcherStopped(WorkItemDispatcherContext context) } } + /// + /// Logs that a work item dispatch loop encountered an unhandled exception. + /// + /// The context of the dispatcher that failed. + /// The unhandled exception. + internal void DispatcherLoopFailed(WorkItemDispatcherContext context, Exception exception) + { + if (this.IsStructuredLoggingEnabled) + { + this.WriteStructuredLog( + new LogEvents.DispatcherLoopFailed(context, exception), + exception); + } + } + /// /// Logs that the work item dispatcher is watching for individual dispatch loops to finish stopping. /// diff --git a/src/DurableTask.Core/Logging/StructuredEventSource.cs b/src/DurableTask.Core/Logging/StructuredEventSource.cs index 47a94b1a4..129bac158 100644 --- a/src/DurableTask.Core/Logging/StructuredEventSource.cs +++ b/src/DurableTask.Core/Logging/StructuredEventSource.cs @@ -102,6 +102,15 @@ internal void DispatcherStopped(string Dispatcher, string AppName, string Extens } } + [Event(EventIds.DispatcherLoopFailed, Level = EventLevel.Error, Version = 1)] + internal void DispatcherLoopFailed(string Dispatcher, string Details, string AppName, string ExtensionVersion) + { + if (this.IsEnabled(EventLevel.Error)) + { + this.WriteEvent(EventIds.DispatcherLoopFailed, Dispatcher, Details, AppName, ExtensionVersion); + } + } + [Event(EventIds.DispatchersStopping, Level = EventLevel.Verbose, Version = 1)] internal void DispatchersStopping( string Dispatcher, diff --git a/src/DurableTask.Core/WorkItemDispatcher.cs b/src/DurableTask.Core/WorkItemDispatcher.cs index 3829578d8..d507b2616 100644 --- a/src/DurableTask.Core/WorkItemDispatcher.cs +++ b/src/DurableTask.Core/WorkItemDispatcher.cs @@ -142,7 +142,14 @@ public async Task StartAsync() // We just want this to Run we intentionally don't wait #pragma warning disable 4014 - Task.Run(() => this.DispatchAsync(context)); + Task.Run(() => this.DispatchAsync(context)).ContinueWith(t => + { + TraceHelper.TraceException( + TraceEventType.Critical, + "WorkItemDispatcherDispatch-FatalTermination", + t.Exception, + $"Dispatch loop for '{this.name}' terminated fatally!"); + }, TaskContinuationOptions.OnlyOnFaulted); #pragma warning restore 4014 } } @@ -224,128 +231,146 @@ async Task DispatchAsync(WorkItemDispatcherContext context) bool logThrottle = true; while (this.isStarted) { - if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5))) + try { - if (logThrottle) + if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5))) { - // This can happen frequently under heavy load. - // To avoid log spam, we log just once until we can proceed. - this.LogHelper.FetchingThrottled( - context, - this.concurrentWorkItemCount, - this.MaxConcurrentWorkItems); - TraceHelper.Trace( - TraceEventType.Warning, - "WorkItemDispatcherDispatch-MaxOperations", - this.GetFormattedLog(dispatcherId, $"Max concurrent operations ({this.concurrentWorkItemCount}) are already in progress. Still waiting for next accept.")); - - logThrottle = false; + if (logThrottle) + { + // This can happen frequently under heavy load. + // To avoid log spam, we log just once until we can proceed. + this.LogHelper.FetchingThrottled( + context, + this.concurrentWorkItemCount, + this.MaxConcurrentWorkItems); + TraceHelper.Trace( + TraceEventType.Warning, + "WorkItemDispatcherDispatch-MaxOperations", + this.GetFormattedLog(dispatcherId, $"Max concurrent operations ({this.concurrentWorkItemCount}) are already in progress. Still waiting for next accept.")); + + logThrottle = false; + } + + continue; } - continue; - } + logThrottle = true; - logThrottle = true; + var delaySecs = 0; + T workItem = default(T); + try + { + Interlocked.Increment(ref this.activeFetchers); + this.LogHelper.FetchWorkItemStarting(context, DefaultReceiveTimeout, this.concurrentWorkItemCount, this.MaxConcurrentWorkItems); + TraceHelper.Trace( + TraceEventType.Verbose, + "WorkItemDispatcherDispatch-StartFetch", + this.GetFormattedLog(dispatcherId, $"Starting fetch with timeout of {DefaultReceiveTimeout} ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)")); - var delaySecs = 0; - T workItem = default(T); - try - { - Interlocked.Increment(ref this.activeFetchers); - this.LogHelper.FetchWorkItemStarting(context, DefaultReceiveTimeout, this.concurrentWorkItemCount, this.MaxConcurrentWorkItems); - TraceHelper.Trace( - TraceEventType.Verbose, - "WorkItemDispatcherDispatch-StartFetch", - this.GetFormattedLog(dispatcherId, $"Starting fetch with timeout of {DefaultReceiveTimeout} ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)")); + Stopwatch timer = Stopwatch.StartNew(); + workItem = await this.FetchWorkItem(DefaultReceiveTimeout, this.shutdownCancellationTokenSource.Token); - Stopwatch timer = Stopwatch.StartNew(); - workItem = await this.FetchWorkItem(DefaultReceiveTimeout, this.shutdownCancellationTokenSource.Token); + if (!IsNull(workItem)) + { + string workItemId = this.workItemIdentifier(workItem); + this.LogHelper.FetchWorkItemCompleted( + context, + workItemId, + timer.Elapsed, + this.concurrentWorkItemCount, + this.MaxConcurrentWorkItems); + } - if (!IsNull(workItem)) + TraceHelper.Trace( + TraceEventType.Verbose, + "WorkItemDispatcherDispatch-EndFetch", + this.GetFormattedLog(dispatcherId, $"After fetch ({timer.ElapsedMilliseconds} ms) ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)")); + } + catch (TimeoutException) { - string workItemId = this.workItemIdentifier(workItem); - this.LogHelper.FetchWorkItemCompleted( - context, - workItemId, - timer.Elapsed, - this.concurrentWorkItemCount, - this.MaxConcurrentWorkItems); + delaySecs = 0; } - - TraceHelper.Trace( - TraceEventType.Verbose, - "WorkItemDispatcherDispatch-EndFetch", - this.GetFormattedLog(dispatcherId, $"After fetch ({timer.ElapsedMilliseconds} ms) ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)")); - } - catch (TimeoutException) - { - delaySecs = 0; - } - catch (TaskCanceledException exception) - { - TraceHelper.Trace( - TraceEventType.Information, - "WorkItemDispatcherDispatch-TaskCanceledException", - this.GetFormattedLog(dispatcherId, $"TaskCanceledException while fetching workItem, should be harmless: {exception.Message}")); - delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception); - } - catch (Exception exception) - { - if (!this.isStarted) + catch (TaskCanceledException exception) { TraceHelper.Trace( - TraceEventType.Information, - "WorkItemDispatcherDispatch-HarmlessException", - this.GetFormattedLog(dispatcherId, $"Harmless exception while fetching workItem after Stop(): {exception.Message}")); + TraceEventType.Information, + "WorkItemDispatcherDispatch-TaskCanceledException", + this.GetFormattedLog(dispatcherId, $"TaskCanceledException while fetching workItem, should be harmless: {exception.Message}")); + delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception); } - else + catch (Exception exception) { - this.LogHelper.FetchWorkItemFailure(context, exception); - // TODO : dump full node context here - TraceHelper.TraceException( - TraceEventType.Warning, - "WorkItemDispatcherDispatch-Exception", - exception, - this.GetFormattedLog(dispatcherId, $"Exception while fetching workItem: {exception.Message}")); - delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception); + if (!this.isStarted) + { + TraceHelper.Trace( + TraceEventType.Information, + "WorkItemDispatcherDispatch-HarmlessException", + this.GetFormattedLog(dispatcherId, $"Harmless exception while fetching workItem after Stop(): {exception.Message}")); + } + else + { + this.LogHelper.FetchWorkItemFailure(context, exception); + // TODO : dump full node context here + TraceHelper.TraceException( + TraceEventType.Warning, + "WorkItemDispatcherDispatch-Exception", + exception, + this.GetFormattedLog(dispatcherId, $"Exception while fetching workItem: {exception.Message}")); + delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception); + } + } + finally + { + Interlocked.Decrement(ref this.activeFetchers); } - } - finally - { - Interlocked.Decrement(ref this.activeFetchers); - } - var scheduledWorkItem = false; - if (!IsNull(workItem)) - { - if (!this.isStarted) + var scheduledWorkItem = false; + if (!IsNull(workItem)) { - if (this.SafeReleaseWorkItem != null) + if (!this.isStarted) + { + if (this.SafeReleaseWorkItem != null) + { + await this.SafeReleaseWorkItem(workItem); + } + } + else { - await this.SafeReleaseWorkItem(workItem); + Interlocked.Increment(ref this.concurrentWorkItemCount); + // We just want this to Run we intentionally don't wait + #pragma warning disable 4014 + Task.Run(() => this.ProcessWorkItemAsync(context, workItem)); + #pragma warning restore 4014 + + scheduledWorkItem = true; } } - else - { - Interlocked.Increment(ref this.concurrentWorkItemCount); - // We just want this to Run we intentionally don't wait - #pragma warning disable 4014 - Task.Run(() => this.ProcessWorkItemAsync(context, workItem)); - #pragma warning restore 4014 - scheduledWorkItem = true; + delaySecs = Math.Max(this.delayOverrideSecs, delaySecs); + if (delaySecs > 0) + { + await Task.Delay(TimeSpan.FromSeconds(delaySecs)); } - } - delaySecs = Math.Max(this.delayOverrideSecs, delaySecs); - if (delaySecs > 0) - { - await Task.Delay(TimeSpan.FromSeconds(delaySecs)); + if (!scheduledWorkItem) + { + this.concurrencyLock.Release(); + } } - - if (!scheduledWorkItem) + catch (Exception exception) when (!Utils.IsFatal(exception)) { - this.concurrencyLock.Release(); + // Catch-all for any unhandled exception in the dispatch loop body. + // Without this, the dispatch loop would silently terminate because + // DispatchAsync runs as a fire-and-forget Task.Run. + this.LogHelper.DispatcherLoopFailed(context, exception); + TraceHelper.TraceException( + TraceEventType.Error, + "WorkItemDispatcherDispatch-UnhandledException", + exception, + this.GetFormattedLog(dispatcherId, + $"Unhandled exception in dispatch loop. Will retry after backoff.")); + + await Task.Delay(TimeSpan.FromSeconds(BackOffIntervalOnInvalidOperationSecs)); } } From 6ef0a6db3782850c9cef295ab5805d2587e0b04e Mon Sep 17 00:00:00 2001 From: Bernd Verst Date: Wed, 18 Mar 2026 15:26:28 -0700 Subject: [PATCH 2/4] Improve test timing for CI reliability - Replace hard-coded 30s timeouts with a shared 10s TestTimeout constant - Rewrite broken SurvivesSafeReleaseWorkItemException test that always hit its 15s timeout (safeReleaseThrew was never set to true) replaced with SurvivesMultipleExceptionTypes that tests various exception types and completes in <10ms - Replace fixed Task.Delay(500) + Task.Delay(2000) in StopsGracefully test with event-driven signaling: a TaskCompletionSource confirms the loop is running before stopping, and polling with 50ms intervals replaces the 2s fixed sleep for DispatcherStopped detection - Total test suite time reduced from ~20s to ~2-3s --- .../WorkItemDispatcherTests.cs | 92 +++++++++++-------- 1 file changed, 55 insertions(+), 37 deletions(-) diff --git a/Test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs b/Test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs index 33216a76f..7442b3af3 100644 --- a/Test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs +++ b/Test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs @@ -16,6 +16,7 @@ namespace DurableTask.Core.Tests using System; using System.Collections.Concurrent; using System.Collections.Generic; + using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -26,6 +27,9 @@ namespace DurableTask.Core.Tests [TestClass] public class WorkItemDispatcherTests { + // A generous-but-bounded timeout for CI. Tests normally complete in <1s; + // this guards against hangs without wasting time on every run. + static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(10); [TestMethod] public async Task DispatchLoop_SurvivesUnhandledException_AndContinuesProcessing() { @@ -66,7 +70,7 @@ public async Task DispatchLoop_SurvivesUnhandledException_AndContinuesProcessing // Wait for the dispatcher to recover and fetch again after the exception bool recovered = await Task.WhenAny( fetchCalledAfterException.Task, - Task.Delay(TimeSpan.FromSeconds(30))) == fetchCalledAfterException.Task; + Task.Delay(TestTimeout)) == fetchCalledAfterException.Task; await dispatcher.StopAsync(forced: true); dispatcher.Dispose(); @@ -77,13 +81,12 @@ public async Task DispatchLoop_SurvivesUnhandledException_AndContinuesProcessing } [TestMethod] - public async Task DispatchLoop_SurvivesSafeReleaseWorkItemException_AndContinuesProcessing() + public async Task DispatchLoop_SurvivesMultipleExceptionTypes_AndContinuesProcessing() { - // Arrange: SafeReleaseWorkItem throws, which is outside the inner try/catch. - // The dispatch loop should catch this via the outer try/catch and continue. + // Arrange: Throw a variety of exception types from fetch across consecutive calls. + // All are handled by the inner catch, but this verifies the loop keeps going. int fetchCallCount = 0; - var fetchCalledAfterException = new TaskCompletionSource(); - bool safeReleaseThrew = false; + var recoverySignal = new TaskCompletionSource(); var dispatcher = new WorkItemDispatcher( "TestDispatcher", @@ -91,43 +94,38 @@ public async Task DispatchLoop_SurvivesSafeReleaseWorkItemException_AndContinues fetchWorkItem: (timeout, ct) => { int count = Interlocked.Increment(ref fetchCallCount); - if (count == 1) + switch (count) { - // Return a work item that will trigger SafeReleaseWorkItem - return Task.FromResult("work-item-1"); + case 1: throw new InvalidOperationException("Test exception"); + case 2: throw new TimeoutException("Test timeout"); + case 3: throw new TaskCanceledException("Test cancel"); + default: + if (count >= 5) + { + recoverySignal.TrySetResult(true); + } + + return Task.FromResult(null!); } - - if (count >= 3 && safeReleaseThrew) - { - fetchCalledAfterException.TrySetResult(true); - } - - return Task.FromResult(null!); }, processWorkItem: item => Task.CompletedTask); dispatcher.MaxConcurrentWorkItems = 1; dispatcher.DispatcherCount = 1; - // Start, then immediately stop to make isStarted = false, - // so when the work item comes back, SafeReleaseWorkItem is called. - // Instead, let's simulate SafeReleaseWorkItem throwing by setting it up - // to throw on the first call. The SafeReleaseWorkItem is called when - // isStarted is false and a workItem was fetched. - // This is tricky to test in isolation. Let's test a simpler scenario instead. - - // Act & Assert: just verify the dispatcher starts and stops cleanly + // Act await dispatcher.StartAsync(); - bool completed = await Task.WhenAny( - fetchCalledAfterException.Task, - Task.Delay(TimeSpan.FromSeconds(15))) == fetchCalledAfterException.Task; + bool recovered = await Task.WhenAny( + recoverySignal.Task, + Task.Delay(TestTimeout)) == recoverySignal.Task; await dispatcher.StopAsync(forced: true); dispatcher.Dispose(); - // The fetch was called multiple times, proving the loop is alive - Assert.IsTrue(fetchCallCount >= 2, $"Expected at least 2 fetch calls, got {fetchCallCount}."); + // Assert: The dispatcher survived all exception types and kept fetching + Assert.IsTrue(recovered, "Dispatch loop should have recovered after multiple exception types."); + Assert.IsTrue(fetchCallCount >= 5, $"Expected at least 5 fetch calls, got {fetchCallCount}."); } [TestMethod] @@ -174,7 +172,7 @@ public async Task DispatchLoop_LogsErrorAndRetries_WhenOuterExceptionOccurs() bool recovered = await Task.WhenAny( dispatcherRecovered.Task, - Task.Delay(TimeSpan.FromSeconds(30))) == dispatcherRecovered.Task; + Task.Delay(TestTimeout)) == dispatcherRecovered.Task; await dispatcher.StopAsync(forced: true); dispatcher.Dispose(); @@ -230,7 +228,7 @@ public async Task DispatchLoop_ProcessesWorkItemsSuccessfully() bool completed = await Task.WhenAny( allItemsProcessed.Task, - Task.Delay(TimeSpan.FromSeconds(30))) == allItemsProcessed.Task; + Task.Delay(TestTimeout)) == allItemsProcessed.Task; await dispatcher.StopAsync(forced: true); dispatcher.Dispose(); @@ -251,10 +249,18 @@ public async Task DispatchLoop_StopsGracefully_WhenNoExceptions() builder.SetMinimumLevel(LogLevel.Trace); }); + // Use a signal so we know the dispatch loop is actively running + // before we ask it to stop, rather than relying on a fixed delay. + var fetchStarted = new TaskCompletionSource(); + var dispatcher = new WorkItemDispatcher( "TestDispatcher", workItemIdentifier: item => item ?? "null", - fetchWorkItem: (timeout, ct) => Task.FromResult(null!), + fetchWorkItem: (timeout, ct) => + { + fetchStarted.TrySetResult(true); + return Task.FromResult(null!); + }, processWorkItem: item => Task.CompletedTask); dispatcher.MaxConcurrentWorkItems = 1; @@ -263,14 +269,26 @@ public async Task DispatchLoop_StopsGracefully_WhenNoExceptions() // Act await dispatcher.StartAsync(); - // Give it a moment to start dispatching - await Task.Delay(500); + + // Wait until the dispatch loop has actually started fetching + await Task.WhenAny(fetchStarted.Task, Task.Delay(TestTimeout)); + Assert.IsTrue(fetchStarted.Task.IsCompleted, "Dispatch loop should have started fetching."); + await dispatcher.StopAsync(forced: false); // The DispatcherStopped event is logged asynchronously after the - // dispatch loop exits its while loop, which may happen slightly - // after StopAsync returns. Give it a moment to complete. - await Task.Delay(2000); + // dispatch loop exits. Poll for it instead of using a fixed delay. + var sw = Stopwatch.StartNew(); + while (sw.Elapsed < TestTimeout) + { + if (logMessages.Any(m => m.EventId.Name == nameof(Logging.EventIds.DispatcherStopped))) + { + break; + } + + await Task.Delay(50); + } + dispatcher.Dispose(); // Assert: DispatcherStopped event should be logged, not DispatcherLoopFailed From cf6aefe0f07c25e3fe5abf745883b47c7074f0c9 Mon Sep 17 00:00:00 2001 From: Bernd Verst Date: Wed, 18 Mar 2026 16:09:21 -0700 Subject: [PATCH 3/4] Address PR review feedback from Copilot - Fix semaphore leak in outer catch: track whether concurrencyLock was acquired and release it in the catch to avoid permanently reducing available concurrency - Pass shutdownCancellationTokenSource.Token to Task.Delay in the outer catch backoff so shutdown requests don't wait the full 10s interval - Rewrite test 1 to exercise the actual outer catch path by throwing from GetDelayInSecondsAfterOnFetchException (which escapes the inner catch) and assert that DispatcherLoopFailed is logged - Rename test 3 from LogsErrorAndRetries_WhenOuterExceptionOccurs to LogsFetchWorkItemFailure_WhenFetchThrows to accurately describe what it tests (inner catch, not outer catch) - Fix InMemoryLogger.BeginScope to return a no-op IDisposable instead of null to avoid NREs when scopes are disposed --- .../WorkItemDispatcherTests.cs | 83 +++++++++++++------ src/DurableTask.Core/WorkItemDispatcher.cs | 18 +++- 2 files changed, 76 insertions(+), 25 deletions(-) diff --git a/Test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs b/Test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs index 7442b3af3..32df11181 100644 --- a/Test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs +++ b/Test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs @@ -31,53 +31,79 @@ public class WorkItemDispatcherTests // this guards against hangs without wasting time on every run. static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(10); [TestMethod] - public async Task DispatchLoop_SurvivesUnhandledException_AndContinuesProcessing() + public async Task DispatchLoop_SurvivesOuterException_ViaFaultySafeReleaseWorkItem() { - // Arrange: The fetch callback throws an ObjectDisposedException on the first call, - // then returns null (no work item) on subsequent calls. + // Arrange: GetDelayInSecondsAfterOnFetchException is called from inside the + // inner catch block, but if it throws, the exception escapes the inner + // catch and is caught by the outer catch — exercising the new safety net. + // We verify that DispatcherLoopFailed is logged and the loop continues. int fetchCallCount = 0; - var fetchCalledAfterException = new TaskCompletionSource(); + var secondFetchStarted = new TaskCompletionSource(); + + var logMessages = new ConcurrentBag(); + var loggerFactory = LoggerFactory.Create(builder => + { + builder.AddProvider(new InMemoryLoggerProvider(logMessages)); + builder.SetMinimumLevel(LogLevel.Trace); + }); var dispatcher = new WorkItemDispatcher( "TestDispatcher", - workItemIdentifier: item => item, + workItemIdentifier: item => item ?? "null", fetchWorkItem: (timeout, ct) => { int count = Interlocked.Increment(ref fetchCallCount); if (count == 1) { - // Simulate an exception that would escape the inner try/catch - // by throwing from the fetch callback. This gets caught by the inner catch, - // but let's test the outer catch by using a different approach below. - throw new InvalidOperationException("Test fetch exception"); - } - - if (count >= 3) - { - fetchCalledAfterException.TrySetResult(true); + // This exception will be caught by the inner catch, which then + // calls GetDelayInSecondsAfterOnFetchException (which also throws). + throw new InvalidOperationException("Trigger inner catch"); } + // If we reach here, the outer catch handled the callback failure + // and the loop continued. + secondFetchStarted.TrySetResult(true); return Task.FromResult(null!); }, processWorkItem: item => Task.CompletedTask); dispatcher.MaxConcurrentWorkItems = 1; dispatcher.DispatcherCount = 1; + dispatcher.LogHelper = new LogHelper(loggerFactory.CreateLogger("DurableTask.Core")); + + // This callback is invoked from the inner catch. Throwing here causes + // the exception to escape to the outer catch. + int delayCallCount = 0; + dispatcher.GetDelayInSecondsAfterOnFetchException = (ex) => + { + if (Interlocked.Increment(ref delayCallCount) == 1) + { + throw new InvalidOperationException("Failure in delay callback"); + } + + return 0; + }; // Act await dispatcher.StartAsync(); - // Wait for the dispatcher to recover and fetch again after the exception + // Wait for the loop to recover from the outer catch and start a second fetch. + // The outer catch backoff is 10s, but may be cancelled on shutdown. We give it + // enough time for the backoff to elapse. If the loop died, this will time out. bool recovered = await Task.WhenAny( - fetchCalledAfterException.Task, - Task.Delay(TestTimeout)) == fetchCalledAfterException.Task; + secondFetchStarted.Task, + Task.Delay(TimeSpan.FromSeconds(15))) == secondFetchStarted.Task; await dispatcher.StopAsync(forced: true); dispatcher.Dispose(); - // Assert: The dispatcher continued running after the exception - Assert.IsTrue(recovered, "Dispatch loop should have continued after the exception."); - Assert.IsTrue(fetchCallCount >= 3, $"Expected at least 3 fetch calls, got {fetchCallCount}."); + // Assert: The dispatch loop survived the outer exception and continued + Assert.IsTrue(recovered, "Dispatch loop should have recovered after outer catch exception."); + + // The outer catch should have logged DispatcherLoopFailed + bool hasLoopFailed = logMessages.Any(m => + m.EventId.Name == nameof(Logging.EventIds.DispatcherLoopFailed)); + Assert.IsTrue(hasLoopFailed, "Expected DispatcherLoopFailed log event from the outer catch."); } [TestMethod] @@ -129,9 +155,11 @@ public async Task DispatchLoop_SurvivesMultipleExceptionTypes_AndContinuesProces } [TestMethod] - public async Task DispatchLoop_LogsErrorAndRetries_WhenOuterExceptionOccurs() + public async Task DispatchLoop_LogsFetchWorkItemFailure_WhenFetchThrows() { - // Arrange: Use a logging ILogger to capture log events + // Arrange: Use a logging ILogger to capture log events. + // The fetch exception is handled by the inner catch, which logs + // FetchWorkItemFailure — verifying that path works correctly. var logMessages = new ConcurrentBag(); var loggerFactory = LoggerFactory.Create(builder => { @@ -150,7 +178,8 @@ public async Task DispatchLoop_LogsErrorAndRetries_WhenOuterExceptionOccurs() int count = Interlocked.Increment(ref fetchCallCount); if (count == 1) { - // This exception is caught by the inner catch block + // This exception is caught by the inner catch block, + // which logs FetchWorkItemFailure (not DispatcherLoopFailed). throw new InvalidOperationException("Simulated fetch failure"); } @@ -357,6 +386,12 @@ class LogEntry public Exception? Exception { get; set; } } + class NoOpDisposable : IDisposable + { + public static readonly NoOpDisposable Instance = new NoOpDisposable(); + public void Dispose() { } + } + /// /// An ILogger that captures log entries to a concurrent bag for test assertions. /// @@ -369,7 +404,7 @@ public InMemoryLogger(ConcurrentBag logs) this.logs = logs; } - public IDisposable BeginScope(TState state) => null!; + public IDisposable BeginScope(TState state) => NoOpDisposable.Instance; public bool IsEnabled(LogLevel logLevel) => true; diff --git a/src/DurableTask.Core/WorkItemDispatcher.cs b/src/DurableTask.Core/WorkItemDispatcher.cs index d507b2616..eee812024 100644 --- a/src/DurableTask.Core/WorkItemDispatcher.cs +++ b/src/DurableTask.Core/WorkItemDispatcher.cs @@ -231,6 +231,7 @@ async Task DispatchAsync(WorkItemDispatcherContext context) bool logThrottle = true; while (this.isStarted) { + var semaphoreAcquired = false; try { if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5))) @@ -254,6 +255,7 @@ async Task DispatchAsync(WorkItemDispatcherContext context) continue; } + semaphoreAcquired = true; logThrottle = true; var delaySecs = 0; @@ -370,7 +372,21 @@ async Task DispatchAsync(WorkItemDispatcherContext context) this.GetFormattedLog(dispatcherId, $"Unhandled exception in dispatch loop. Will retry after backoff.")); - await Task.Delay(TimeSpan.FromSeconds(BackOffIntervalOnInvalidOperationSecs)); + // Release the semaphore if we acquired it but never handed it off + // to ProcessWorkItemAsync, to avoid permanently reducing concurrency. + if (semaphoreAcquired) + { + try { this.concurrencyLock.Release(); } catch { /* best effort */ } + } + + try + { + await Task.Delay(TimeSpan.FromSeconds(BackOffIntervalOnInvalidOperationSecs), this.shutdownCancellationTokenSource.Token); + } + catch (OperationCanceledException) + { + // Shutdown requested during backoff; exit promptly. + } } } From 09d65c6ebe8c1b8f1fa3eb55fca0f2836420bfd3 Mon Sep 17 00:00:00 2001 From: Bernd Verst Date: Wed, 18 Mar 2026 16:25:16 -0700 Subject: [PATCH 4/4] Address round 2 PR review feedback - Fix double-release risk: move scheduledWorkItem declaration outside the inner try block so the outer catch can check whether the semaphore was handed off to ProcessWorkItemAsync before releasing it - Guard CTS.Token against ObjectDisposedException in backoff delay: catch ObjectDisposedException alongside OperationCanceledException so a disposed CTS during rapid shutdown doesn't fault the dispatch loop - Move test file from Test/ (uppercase) to test/ (lowercase) to match the csproj location and ensure tests compile on case-sensitive file systems - Rename test from ViaFaultySafeReleaseWorkItem to ViaFaultyDelayCallback to accurately describe the mechanism being tested --- src/DurableTask.Core/WorkItemDispatcher.cs | 9 +++++++-- .../DurableTask.Core.Tests/WorkItemDispatcherTests.cs | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) rename {Test => test}/DurableTask.Core.Tests/WorkItemDispatcherTests.cs (99%) diff --git a/src/DurableTask.Core/WorkItemDispatcher.cs b/src/DurableTask.Core/WorkItemDispatcher.cs index eee812024..1a64065e5 100644 --- a/src/DurableTask.Core/WorkItemDispatcher.cs +++ b/src/DurableTask.Core/WorkItemDispatcher.cs @@ -232,6 +232,7 @@ async Task DispatchAsync(WorkItemDispatcherContext context) while (this.isStarted) { var semaphoreAcquired = false; + var scheduledWorkItem = false; try { if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5))) @@ -326,7 +327,6 @@ async Task DispatchAsync(WorkItemDispatcherContext context) Interlocked.Decrement(ref this.activeFetchers); } - var scheduledWorkItem = false; if (!IsNull(workItem)) { if (!this.isStarted) @@ -374,7 +374,7 @@ async Task DispatchAsync(WorkItemDispatcherContext context) // Release the semaphore if we acquired it but never handed it off // to ProcessWorkItemAsync, to avoid permanently reducing concurrency. - if (semaphoreAcquired) + if (semaphoreAcquired && !scheduledWorkItem) { try { this.concurrencyLock.Release(); } catch { /* best effort */ } } @@ -387,6 +387,11 @@ async Task DispatchAsync(WorkItemDispatcherContext context) { // Shutdown requested during backoff; exit promptly. } + catch (ObjectDisposedException) + { + // CancellationTokenSource was disposed (e.g., Dispose called + // shortly after StopAsync); treat as shutdown. + } } } diff --git a/Test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs b/test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs similarity index 99% rename from Test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs rename to test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs index 32df11181..275503748 100644 --- a/Test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs +++ b/test/DurableTask.Core.Tests/WorkItemDispatcherTests.cs @@ -31,7 +31,7 @@ public class WorkItemDispatcherTests // this guards against hangs without wasting time on every run. static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(10); [TestMethod] - public async Task DispatchLoop_SurvivesOuterException_ViaFaultySafeReleaseWorkItem() + public async Task DispatchLoop_SurvivesOuterException_ViaFaultyDelayCallback() { // Arrange: GetDelayInSecondsAfterOnFetchException is called from inside the // inner catch block, but if it throws, the exception escapes the inner