Skip to content
51 changes: 20 additions & 31 deletions src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,37 +150,29 @@ public bool IsOutOfOrderMessage(MessageData message)
return false;
}

if (this.IsNonexistantInstance() && message.OriginalQueueMessage.DequeueCount > 5)
{
// The first five times a message for a nonexistant instance is dequeued, give the message the benefit
// of the doubt and assume that the instance hasn't had its history table populated yet. After the
// fifth execution, ~30 seconds have passed and the most likely scenario is that this is a zombie event.
// This means the history table for the message's orchestration no longer exists, either due to an explicit
// PurgeHistory request or due to a ContinueAsNew call cleaning the old execution's history.
return false;
}

if (this.LastCheckpointTime > message.TaskMessage.Event.Timestamp)
{
// LastCheckpointTime represents the time at which the most recent history checkpoint completed.
// The checkpoint is written to the history table only *after* all queue messages are sent.
// A message is out of order when its timestamp *preceeds* the most recent checkpoint timestamp.
// In this case, we see that the checkpoint came *after* the message, so there is no out-of-order
// concern. Note that this logic only applies for messages sent by orchestrations to themselves.
// The next check considers the other cases (activities, sub-orchestrations, etc.).
// Orchestration checkpoint time information was added only after v1.6.4.
return false;
}

// The first five times a message for a nonexistant instance is dequeued, give the message the benefit
// of the doubt and assume that the instance hasn't had its history table populated yet. After the
// fifth execution, ~30 seconds have passed and the most likely scenario is that this is a zombie event.
// This means the history table for the message's orchestration no longer exists, either due to an explicit
// PurgeHistory request or due to a ContinueAsNew call cleaning the old execution's history.
bool nonExistentInstance = this.IsNonexistantInstance() && message.OriginalQueueMessage.DequeueCount <= 5;

// LastCheckpointTime represents the time at which the most recent history checkpoint completed.
// The checkpoint is written to the history table only *after* all queue messages are sent.
// A message is out of order when its timestamp *preceeds* the most recent checkpoint timestamp.
// In this case, we see that the checkpoint came *after* the message, so there is no out-of-order
Comment thread
sophiatev marked this conversation as resolved.
Outdated
// concern. Note that this logic only applies for messages sent by orchestrations to themselves.
// The next check considers the other cases (activities, sub-orchestrations, etc.).
// Orchestration checkpoint time information was added only after v1.6.4.
bool isStaleCheckpoint = this.LastCheckpointTime <= message.TaskMessage.Event.Timestamp;

bool triggeringTaskDoesNotExist = true;
if (Utils.TryGetTaskScheduledId(message.TaskMessage.Event, out int taskScheduledId))
{
// This message is a response to a task. Search the history to make sure that we've recorded the fact that
// this task was scheduled.
HistoryEvent mostRecentTaskEvent = this.RuntimeState.Events.LastOrDefault(e => e.EventId == taskScheduledId);
if (mostRecentTaskEvent != null)
{
return false;
}
triggeringTaskDoesNotExist = mostRecentTaskEvent == null;
}

if (message.TaskMessage.Event.EventType == EventType.EventRaised)
Expand All @@ -190,15 +182,12 @@ public bool IsOutOfOrderMessage(MessageData message)
if (requestId != null)
{
HistoryEvent mostRecentTaskEvent = this.RuntimeState.Events.FirstOrDefault(e => e.EventType == EventType.EventSent && FindRequestId(((EventSentEvent)e).Input)?.ToString() == requestId);
if (mostRecentTaskEvent != null)
{
return false;
}
triggeringTaskDoesNotExist = mostRecentTaskEvent == null;
}
}

// The message is out of order and cannot be handled by the current session.
return true;
return nonExistentInstance || isStaleCheckpoint || triggeringTaskDoesNotExist;
Comment thread
sophiatev marked this conversation as resolved.
Outdated
Comment thread
sophiatev marked this conversation as resolved.
Outdated
Comment thread
sophiatev marked this conversation as resolved.
Outdated
}

Guid? FindRequestId(string input)
Expand Down
Loading