Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/DurableTask.Core/Logging/EventIds.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ static class EventIds
public const int ProcessWorkItemStarting = 27;
public const int ProcessWorkItemCompleted = 28;
public const int ProcessWorkItemFailed = 29;
public const int DispatcherLoopFailed = 30;

public const int SchedulingOrchestration = 40;
public const int RaisingEvent = 41;
Expand Down
31 changes: 31 additions & 0 deletions src/DurableTask.Core/Logging/LogEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,37 @@ void IEventSourceEvent.WriteEventSource() =>
StructuredEventSource.Log.DispatcherStopped(this.Dispatcher, Utils.AppName, Utils.PackageVersion);
}

internal class DispatcherLoopFailed : StructuredLogEvent, IEventSourceEvent
{
public DispatcherLoopFailed(WorkItemDispatcherContext context, Exception exception)
{
this.Dispatcher = context.GetDisplayName();
this.Details = exception.ToString();
}

[StructuredLogField]
public string Dispatcher { get; }

[StructuredLogField]
public string Details { get; }

public override EventId EventId => new EventId(
EventIds.DispatcherLoopFailed,
nameof(EventIds.DispatcherLoopFailed));

public override LogLevel Level => LogLevel.Error;

protected override string CreateLogMessage() =>
$"{this.Dispatcher}: Unhandled exception in dispatch loop. Will retry after backoff. Details: {this.Details}";

void IEventSourceEvent.WriteEventSource() =>
StructuredEventSource.Log.DispatcherLoopFailed(
this.Dispatcher,
this.Details,
Utils.AppName,
Utils.PackageVersion);
}

internal class DispatchersStopping : StructuredLogEvent, IEventSourceEvent
{
public DispatchersStopping(string name, string id, int concurrentWorkItemCount, int activeFetchers)
Expand Down
15 changes: 15 additions & 0 deletions src/DurableTask.Core/Logging/LogHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,21 @@ internal void DispatcherStopped(WorkItemDispatcherContext context)
}
}

/// <summary>
/// Logs that a work item dispatch loop encountered an unhandled exception.
/// </summary>
/// <param name="context">The context of the dispatcher that failed.</param>
/// <param name="exception">The unhandled exception.</param>
internal void DispatcherLoopFailed(WorkItemDispatcherContext context, Exception exception)
{
if (this.IsStructuredLoggingEnabled)
{
this.WriteStructuredLog(
new LogEvents.DispatcherLoopFailed(context, exception),
exception);
}
}

/// <summary>
/// Logs that the work item dispatcher is watching for individual dispatch loops to finish stopping.
/// </summary>
Expand Down
9 changes: 9 additions & 0 deletions src/DurableTask.Core/Logging/StructuredEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,15 @@ internal void DispatcherStopped(string Dispatcher, string AppName, string Extens
}
}

[Event(EventIds.DispatcherLoopFailed, Level = EventLevel.Error, Version = 1)]
internal void DispatcherLoopFailed(string Dispatcher, string Details, string AppName, string ExtensionVersion)
{
if (this.IsEnabled(EventLevel.Error))
{
this.WriteEvent(EventIds.DispatcherLoopFailed, Dispatcher, Details, AppName, ExtensionVersion);
}
}

[Event(EventIds.DispatchersStopping, Level = EventLevel.Verbose, Version = 1)]
internal void DispatchersStopping(
string Dispatcher,
Expand Down
240 changes: 143 additions & 97 deletions src/DurableTask.Core/WorkItemDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,14 @@ public async Task StartAsync()

// We just want this to Run we intentionally don't wait
#pragma warning disable 4014
Task.Run(() => this.DispatchAsync(context));
Task.Run(() => this.DispatchAsync(context)).ContinueWith(t =>
{
TraceHelper.TraceException(
TraceEventType.Critical,
"WorkItemDispatcherDispatch-FatalTermination",
t.Exception,
$"Dispatch loop for '{this.name}' terminated fatally!");
}, TaskContinuationOptions.OnlyOnFaulted);
#pragma warning restore 4014
}
}
Expand Down Expand Up @@ -224,128 +231,167 @@ async Task DispatchAsync(WorkItemDispatcherContext context)
bool logThrottle = true;
while (this.isStarted)
{
if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5)))
var semaphoreAcquired = false;
var scheduledWorkItem = false;
try
{
if (logThrottle)
if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5)))
{
// This can happen frequently under heavy load.
// To avoid log spam, we log just once until we can proceed.
this.LogHelper.FetchingThrottled(
context,
this.concurrentWorkItemCount,
this.MaxConcurrentWorkItems);
TraceHelper.Trace(
TraceEventType.Warning,
"WorkItemDispatcherDispatch-MaxOperations",
this.GetFormattedLog(dispatcherId, $"Max concurrent operations ({this.concurrentWorkItemCount}) are already in progress. Still waiting for next accept."));

logThrottle = false;
if (logThrottle)
{
// This can happen frequently under heavy load.
// To avoid log spam, we log just once until we can proceed.
this.LogHelper.FetchingThrottled(
context,
this.concurrentWorkItemCount,
this.MaxConcurrentWorkItems);
TraceHelper.Trace(
TraceEventType.Warning,
"WorkItemDispatcherDispatch-MaxOperations",
this.GetFormattedLog(dispatcherId, $"Max concurrent operations ({this.concurrentWorkItemCount}) are already in progress. Still waiting for next accept."));

logThrottle = false;
}

continue;
}

continue;
}
semaphoreAcquired = true;
logThrottle = true;

logThrottle = true;
var delaySecs = 0;
T workItem = default(T);
try
{
Interlocked.Increment(ref this.activeFetchers);
this.LogHelper.FetchWorkItemStarting(context, DefaultReceiveTimeout, this.concurrentWorkItemCount, this.MaxConcurrentWorkItems);
TraceHelper.Trace(
TraceEventType.Verbose,
"WorkItemDispatcherDispatch-StartFetch",
this.GetFormattedLog(dispatcherId, $"Starting fetch with timeout of {DefaultReceiveTimeout} ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));

var delaySecs = 0;
T workItem = default(T);
try
{
Interlocked.Increment(ref this.activeFetchers);
this.LogHelper.FetchWorkItemStarting(context, DefaultReceiveTimeout, this.concurrentWorkItemCount, this.MaxConcurrentWorkItems);
TraceHelper.Trace(
TraceEventType.Verbose,
"WorkItemDispatcherDispatch-StartFetch",
this.GetFormattedLog(dispatcherId, $"Starting fetch with timeout of {DefaultReceiveTimeout} ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
Stopwatch timer = Stopwatch.StartNew();
workItem = await this.FetchWorkItem(DefaultReceiveTimeout, this.shutdownCancellationTokenSource.Token);

Stopwatch timer = Stopwatch.StartNew();
workItem = await this.FetchWorkItem(DefaultReceiveTimeout, this.shutdownCancellationTokenSource.Token);
if (!IsNull(workItem))
{
string workItemId = this.workItemIdentifier(workItem);
this.LogHelper.FetchWorkItemCompleted(
context,
workItemId,
timer.Elapsed,
this.concurrentWorkItemCount,
this.MaxConcurrentWorkItems);
}

if (!IsNull(workItem))
TraceHelper.Trace(
TraceEventType.Verbose,
"WorkItemDispatcherDispatch-EndFetch",
this.GetFormattedLog(dispatcherId, $"After fetch ({timer.ElapsedMilliseconds} ms) ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
}
catch (TimeoutException)
{
string workItemId = this.workItemIdentifier(workItem);
this.LogHelper.FetchWorkItemCompleted(
context,
workItemId,
timer.Elapsed,
this.concurrentWorkItemCount,
this.MaxConcurrentWorkItems);
delaySecs = 0;
}

TraceHelper.Trace(
TraceEventType.Verbose,
"WorkItemDispatcherDispatch-EndFetch",
this.GetFormattedLog(dispatcherId, $"After fetch ({timer.ElapsedMilliseconds} ms) ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
}
catch (TimeoutException)
{
delaySecs = 0;
}
catch (TaskCanceledException exception)
{
TraceHelper.Trace(
TraceEventType.Information,
"WorkItemDispatcherDispatch-TaskCanceledException",
this.GetFormattedLog(dispatcherId, $"TaskCanceledException while fetching workItem, should be harmless: {exception.Message}"));
delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
}
catch (Exception exception)
{
if (!this.isStarted)
catch (TaskCanceledException exception)
{
TraceHelper.Trace(
TraceEventType.Information,
"WorkItemDispatcherDispatch-HarmlessException",
this.GetFormattedLog(dispatcherId, $"Harmless exception while fetching workItem after Stop(): {exception.Message}"));
TraceEventType.Information,
"WorkItemDispatcherDispatch-TaskCanceledException",
this.GetFormattedLog(dispatcherId, $"TaskCanceledException while fetching workItem, should be harmless: {exception.Message}"));
delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
}
else
catch (Exception exception)
{
this.LogHelper.FetchWorkItemFailure(context, exception);
// TODO : dump full node context here
TraceHelper.TraceException(
TraceEventType.Warning,
"WorkItemDispatcherDispatch-Exception",
exception,
this.GetFormattedLog(dispatcherId, $"Exception while fetching workItem: {exception.Message}"));
delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
if (!this.isStarted)
{
TraceHelper.Trace(
TraceEventType.Information,
"WorkItemDispatcherDispatch-HarmlessException",
this.GetFormattedLog(dispatcherId, $"Harmless exception while fetching workItem after Stop(): {exception.Message}"));
}
else
{
this.LogHelper.FetchWorkItemFailure(context, exception);
// TODO : dump full node context here
TraceHelper.TraceException(
TraceEventType.Warning,
"WorkItemDispatcherDispatch-Exception",
exception,
this.GetFormattedLog(dispatcherId, $"Exception while fetching workItem: {exception.Message}"));
delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
}
}
finally
{
Interlocked.Decrement(ref this.activeFetchers);
}
}
finally
{
Interlocked.Decrement(ref this.activeFetchers);
}

var scheduledWorkItem = false;
if (!IsNull(workItem))
{
if (!this.isStarted)
if (!IsNull(workItem))
{
if (this.SafeReleaseWorkItem != null)
if (!this.isStarted)
{
await this.SafeReleaseWorkItem(workItem);
if (this.SafeReleaseWorkItem != null)
{
await this.SafeReleaseWorkItem(workItem);
}
}
else
{
Interlocked.Increment(ref this.concurrentWorkItemCount);
// We just want this to Run we intentionally don't wait
#pragma warning disable 4014
Task.Run(() => this.ProcessWorkItemAsync(context, workItem));
#pragma warning restore 4014

scheduledWorkItem = true;
}
}
else

delaySecs = Math.Max(this.delayOverrideSecs, delaySecs);
if (delaySecs > 0)
{
Interlocked.Increment(ref this.concurrentWorkItemCount);
// We just want this to Run we intentionally don't wait
#pragma warning disable 4014
Task.Run(() => this.ProcessWorkItemAsync(context, workItem));
#pragma warning restore 4014
await Task.Delay(TimeSpan.FromSeconds(delaySecs));
}

scheduledWorkItem = true;
if (!scheduledWorkItem)
{
this.concurrencyLock.Release();
}
}

delaySecs = Math.Max(this.delayOverrideSecs, delaySecs);
if (delaySecs > 0)
catch (Exception exception) when (!Utils.IsFatal(exception))
{
await Task.Delay(TimeSpan.FromSeconds(delaySecs));
}
// Catch-all for any unhandled exception in the dispatch loop body.
// Without this, the dispatch loop would silently terminate because
// DispatchAsync runs as a fire-and-forget Task.Run.
this.LogHelper.DispatcherLoopFailed(context, exception);
TraceHelper.TraceException(
TraceEventType.Error,
"WorkItemDispatcherDispatch-UnhandledException",
exception,
this.GetFormattedLog(dispatcherId,
$"Unhandled exception in dispatch loop. Will retry after backoff."));

// Release the semaphore if we acquired it but never handed it off
// to ProcessWorkItemAsync, to avoid permanently reducing concurrency.
if (semaphoreAcquired && !scheduledWorkItem)
{
try { this.concurrencyLock.Release(); } catch { /* best effort */ }
}

if (!scheduledWorkItem)
{
this.concurrencyLock.Release();
try
{
await Task.Delay(TimeSpan.FromSeconds(BackOffIntervalOnInvalidOperationSecs), this.shutdownCancellationTokenSource.Token);
}
catch (OperationCanceledException)
{
// Shutdown requested during backoff; exit promptly.
}
catch (ObjectDisposedException)
{
// CancellationTokenSource was disposed (e.g., Dispose called
// shortly after StopAsync); treat as shutdown.
}
}
}

Expand Down
Loading
Loading