Skip to content

Commit 646bb87

Browse files
sophiatev (Sophia Tevosyan)
authored
Fix race condition with continue-as-new (#1303)
* moved placement of session.UpdatedRuntimeState call to be before we commit any outbound messages * removed the unnecessary changes from the orchestration service, due to initial misdiagnosis, and changed the IsOutOfOrder logic instead * fixed the endlessly abandoning nonexistent instances bug * updated implementation and added tests * updated the tests a bit, resolved some copilot comments * restored a usings, added a comment * updated a potentially flaky test * changed instance/execution IDs in tests to be random --------- Co-authored-by: Sophia Tevosyan <stevosyan@microsoft.com>
1 parent 06d542d commit 646bb87

4 files changed

Lines changed: 538 additions & 298 deletions

File tree

src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1821,7 +1821,7 @@ public async Task SendTaskOrchestrationMessageAsync(TaskMessage message)
18211821
await this.SendTaskOrchestrationMessageInternalAsync(EmptySourceInstance, controlQueue, message);
18221822
}
18231823

1824-
Task<MessageData> SendTaskOrchestrationMessageInternalAsync(
1824+
internal Task<MessageData> SendTaskOrchestrationMessageInternalAsync(
18251825
OrchestrationInstance sourceInstance,
18261826
ControlQueue controlQueue,
18271827
TaskMessage message)

src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -164,18 +164,6 @@ public bool IsOutOfOrderMessage(MessageData message)
164164
return false;
165165
}
166166

167-
if (this.LastCheckpointTime > message.TaskMessage.Event.Timestamp)
168-
{
169-
// LastCheckpointTime represents the time at which the most recent history checkpoint completed.
170-
// The checkpoint is written to the history table only *after* all queue messages are sent.
171-
// A message is out of order when its timestamp *precedes* the most recent checkpoint timestamp.
172-
// In this case, we see that the checkpoint came *after* the message, so there is no out-of-order
173-
// concern. Note that this logic only applies for messages sent by orchestrations to themselves.
174-
// The next check considers the other cases (activities, sub-orchestrations, etc.).
175-
// Orchestration checkpoint time information was added only after v1.6.4.
176-
return false;
177-
}
178-
179167
if (Utils.TryGetTaskScheduledId(message.TaskMessage.Event, out int taskScheduledId))
180168
{
181169
// This message is a response to a task. Search the history to make sure that we've recorded the fact that
@@ -193,7 +181,7 @@ public bool IsOutOfOrderMessage(MessageData message)
193181
var requestId = ((EventRaisedEvent)message.TaskMessage.Event).Name;
194182
if (requestId != null)
195183
{
196-
HistoryEvent mostRecentTaskEvent = this.RuntimeState.Events.FirstOrDefault(e => e.EventType == EventType.EventSent && FindRequestId(((EventSentEvent)e).Input)?.ToString() == requestId);
184+
HistoryEvent mostRecentTaskEvent = this.RuntimeState.Events.LastOrDefault(e => e.EventType == EventType.EventSent && FindRequestId(((EventSentEvent)e).Input)?.ToString() == requestId);
197185
if (mostRecentTaskEvent != null)
198186
{
199187
return false;

test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs

Lines changed: 0 additions & 275 deletions
Original file line numberDiff line numberDiff line change
@@ -643,281 +643,6 @@ await TestHelpers.WaitFor(
643643
}
644644
}
645645

646-
/// <summary>
647-
/// Confirm that:
648-
/// 1. If <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/> is true, and a worker attempts to update the instance table with a stale
649-
/// etag upon completing a work item, a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has the correct status code
650-
/// (precondition failed).
651-
/// The specific scenario tested is if the worker stalled after updating the history table but before updating the instance table. When it attempts to update
652-
/// the instance table with a stale etag, it will fail.
653-
/// 2. If <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/> is false for the above scenario, then the call to update the instance table
654-
/// will go through, and the instance table will be updated with a "stale" status.
655-
/// </summary>
656-
/// <remarks>
657-
/// Since it is impossible to force stalling, we simulate the above scenario by manually updating the instance table before the worker
658-
/// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker
659-
/// has since updated" the instance table.
660-
/// </remarks>
661-
/// <param name="useInstanceEtag">The value to use for <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/></param>
662-
/// <returns></returns>
663-
[DataTestMethod]
664-
[DataRow(true)]
665-
[DataRow(false)]
666-
public async Task WorkerAttemptingToUpdateInstanceTableAfterStalling(bool useInstanceEtag)
667-
{
668-
AzureStorageOrchestrationService service = null;
669-
try
670-
{
671-
var orchestrationInstance = new OrchestrationInstance
672-
{
673-
InstanceId = "instance_id",
674-
ExecutionId = "execution_id",
675-
};
676-
677-
ExecutionStartedEvent startedEvent = new(-1, string.Empty)
678-
{
679-
Name = "orchestration",
680-
Version = string.Empty,
681-
OrchestrationInstance = orchestrationInstance,
682-
ScheduledStartTime = DateTime.UtcNow,
683-
};
684-
685-
var settings = new AzureStorageOrchestrationServiceSettings
686-
{
687-
PartitionCount = 1,
688-
StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()),
689-
TaskHubName = TestHelpers.GetTestTaskHubName(),
690-
ExtendedSessionsEnabled = false,
691-
UseInstanceTableEtag = useInstanceEtag
692-
};
693-
this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe);
694-
695-
service = new AzureStorageOrchestrationService(settings);
696-
await service.CreateAsync();
697-
await service.StartAsync();
698-
699-
// Create the orchestration and get the first work item and start "working" on it
700-
await service.CreateTaskOrchestrationAsync(
701-
new TaskMessage()
702-
{
703-
OrchestrationInstance = orchestrationInstance,
704-
Event = startedEvent
705-
});
706-
var workItem = await service.LockNextTaskOrchestrationWorkItemAsync(
707-
TimeSpan.FromMinutes(5),
708-
CancellationToken.None);
709-
var runtimeState = workItem.OrchestrationRuntimeState;
710-
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
711-
runtimeState.AddEvent(startedEvent);
712-
runtimeState.AddEvent(new TaskScheduledEvent(0));
713-
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
714-
715-
AzureStorageClient azureStorageClient = new AzureStorageClient(settings);
716-
717-
// Now manually update the instance to have status "Completed"
718-
Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName);
719-
TableEntity entity = new(orchestrationInstance.InstanceId, "")
720-
{
721-
["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"),
722-
};
723-
await instanceTable.MergeEntityAsync(entity, Azure.ETag.All);
724-
725-
if (useInstanceEtag)
726-
{
727-
// Confirm an exception is thrown due to the etag mismatch for the instance table when the worker attempts to complete the work item
728-
SessionAbortedException exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () =>
729-
await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null)
730-
);
731-
Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException));
732-
DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException;
733-
Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode);
734-
}
735-
else
736-
{
737-
await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);
738-
739-
var queryCondition = new OrchestrationInstanceStatusQueryCondition
740-
{
741-
InstanceId = "instance_id",
742-
FetchInput = false,
743-
};
744-
745-
ODataCondition odata = queryCondition.ToOData();
746-
OrchestrationInstanceStatus instanceTableEntity = await instanceTable
747-
.ExecuteQueryAsync<OrchestrationInstanceStatus>(odata.Filter, 1, odata.Select, CancellationToken.None)
748-
.FirstOrDefaultAsync();
749-
750-
// Confirm the instance table was updated with a "stale" status
751-
Assert.IsNotNull(instanceTableEntity);
752-
Assert.AreEqual(OrchestrationStatus.Running.ToString(), instanceTableEntity.RuntimeStatus);
753-
}
754-
}
755-
finally
756-
{
757-
await service?.StopAsync(isForced: true);
758-
}
759-
}
760-
761-
/// <summary>
762-
/// Confirm that:
763-
/// 1. If <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/> is true, and a worker attempts to update the instance table with a stale
764-
/// etag upon completing a work item for a suborchestration, a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has
765-
/// the correct status code (conflict).
766-
/// The specific scenario tested is if the worker stalled after updating the history table but before updating the instance table for the first work item
767-
/// for a suborchestration. When it attempts to insert a new entity into the instance table for the suborchestration (since for a suborchestration,
768-
/// the instance entity is only created upon completion of the first work item), it will fail.
769-
/// 2. If <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/> is false for the above scenario, then the call to update the instance table
770-
/// will go through, and the instance table will be updated with a "stale" status.
771-
/// </summary>
772-
/// <remarks>
773-
/// Since it is impossible to force stalling, we simulate the above scenario by manually updating the instance table before the worker
774-
/// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker
775-
/// has since updated" the instance table.
776-
/// </remarks>
777-
/// <param name="useInstanceEtag">The value to use for <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/></param>
778-
/// <returns></returns>
779-
[DataTestMethod]
780-
[DataRow(true)]
781-
[DataRow(false)]
782-
public async Task WorkerAttemptingToUpdateInstanceTableAfterStallingForSubOrchestration(bool useInstanceEtag)
783-
{
784-
AzureStorageOrchestrationService service = null;
785-
try
786-
{
787-
var orchestrationInstance = new OrchestrationInstance
788-
{
789-
InstanceId = "instance_id",
790-
ExecutionId = "execution_id",
791-
};
792-
793-
ExecutionStartedEvent startedEvent = new(-1, string.Empty)
794-
{
795-
Name = "orchestration",
796-
Version = string.Empty,
797-
OrchestrationInstance = orchestrationInstance,
798-
ScheduledStartTime = DateTime.UtcNow,
799-
};
800-
801-
var settings = new AzureStorageOrchestrationServiceSettings
802-
{
803-
PartitionCount = 1,
804-
StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()),
805-
TaskHubName = TestHelpers.GetTestTaskHubName(),
806-
ExtendedSessionsEnabled = false,
807-
UseInstanceTableEtag = useInstanceEtag
808-
};
809-
this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe);
810-
811-
service = new AzureStorageOrchestrationService(settings);
812-
await service.CreateAsync();
813-
await service.StartAsync();
814-
815-
// Create the orchestration and get the first work item and start "working" on it
816-
await service.CreateTaskOrchestrationAsync(
817-
new TaskMessage()
818-
{
819-
OrchestrationInstance = orchestrationInstance,
820-
Event = startedEvent
821-
});
822-
var workItem = await service.LockNextTaskOrchestrationWorkItemAsync(
823-
TimeSpan.FromMinutes(5),
824-
CancellationToken.None);
825-
var runtimeState = workItem.OrchestrationRuntimeState;
826-
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
827-
runtimeState.AddEvent(startedEvent);
828-
runtimeState.AddEvent(new SubOrchestrationInstanceCreatedEvent(0)
829-
{
830-
Name = "suborchestration",
831-
InstanceId = "sub_instance_id"
832-
});
833-
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
834-
835-
// Create the task message to start the suborchestration
836-
var subOrchestrationExecutionStartedEvent = new ExecutionStartedEvent(-1, string.Empty)
837-
{
838-
OrchestrationInstance = new OrchestrationInstance
839-
{
840-
InstanceId = "sub_instance_id",
841-
ExecutionId = Guid.NewGuid().ToString("N")
842-
},
843-
ParentInstance = new ParentInstance
844-
{
845-
OrchestrationInstance = runtimeState.OrchestrationInstance,
846-
Name = runtimeState.Name,
847-
Version = runtimeState.Version,
848-
TaskScheduleId = 0,
849-
},
850-
Name = "suborchestration"
851-
};
852-
List<TaskMessage> orchestratorMessages =
853-
new() {
854-
new TaskMessage()
855-
{
856-
OrchestrationInstance = subOrchestrationExecutionStartedEvent.OrchestrationInstance,
857-
Event = subOrchestrationExecutionStartedEvent,
858-
}
859-
};
860-
861-
// Complete the first work item, which will send the execution started message for the suborchestration
862-
await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List<TaskMessage>(), orchestratorMessages, new List<TaskMessage>(), null, null);
863-
864-
// Now get the work item for the suborchestration and "work" on it
865-
workItem = await service.LockNextTaskOrchestrationWorkItemAsync(
866-
TimeSpan.FromMinutes(5),
867-
CancellationToken.None);
868-
runtimeState = workItem.OrchestrationRuntimeState;
869-
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
870-
runtimeState.AddEvent(subOrchestrationExecutionStartedEvent);
871-
runtimeState.AddEvent(new TaskScheduledEvent(0));
872-
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
873-
874-
AzureStorageClient azureStorageClient = new(settings);
875-
Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName);
876-
// Now manually update the suborchestration to have status "Completed"
877-
TableEntity entity = new("sub_instance_id", "")
878-
{
879-
["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"),
880-
};
881-
await instanceTable.InsertEntityAsync(entity);
882-
883-
if (useInstanceEtag)
884-
{
885-
// Confirm an exception is thrown because the worker attempts to insert a new entity for the suborchestration into the instance table
886-
// when one already exists
887-
SessionAbortedException exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () =>
888-
await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null)
889-
);
890-
Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException));
891-
DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException;
892-
Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode);
893-
}
894-
else
895-
{
896-
await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);
897-
898-
var queryCondition = new OrchestrationInstanceStatusQueryCondition
899-
{
900-
InstanceId = "sub_instance_id",
901-
FetchInput = false,
902-
};
903-
904-
ODataCondition odata = queryCondition.ToOData();
905-
OrchestrationInstanceStatus instanceTableEntity = await instanceTable
906-
.ExecuteQueryAsync<OrchestrationInstanceStatus>(odata.Filter, 1, odata.Select, CancellationToken.None)
907-
.FirstOrDefaultAsync();
908-
909-
// Confirm the instance table was updated with a "stale" status
910-
Assert.IsNotNull(instanceTableEntity);
911-
Assert.AreEqual(OrchestrationStatus.Running.ToString(), instanceTableEntity.RuntimeStatus);
912-
}
913-
914-
}
915-
finally
916-
{
917-
await service?.StopAsync(isForced: true);
918-
}
919-
}
920-
921646
[TestMethod]
922647
public async Task MonitorIdleTaskHubDisconnected()
923648
{

0 commit comments

Comments
 (0)