From 26734db3e2439978f65ecc5eaa6baad02872c515 Mon Sep 17 00:00:00 2001
From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com>
Date: Fri, 13 Mar 2026 15:33:27 -0700
Subject: [PATCH 1/8] enhance purge
---
.../PurgeScalabilityTests.cs | 446 ++++++++++++++++++
.../Storage/TableDeleteBatchParallelTests.cs | 372 +++++++++++++++
.../MessageManager.cs | 23 +-
src/DurableTask.AzureStorage/Storage/Table.cs | 120 +++++
.../Tracking/AzureTableTrackingStore.cs | 61 ++-
.../AzureStorageScenarioTests.cs | 86 ++++
6 files changed, 1093 insertions(+), 15 deletions(-)
create mode 100644 Test/DurableTask.AzureStorage.Tests/PurgeScalabilityTests.cs
create mode 100644 Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs
diff --git a/Test/DurableTask.AzureStorage.Tests/PurgeScalabilityTests.cs b/Test/DurableTask.AzureStorage.Tests/PurgeScalabilityTests.cs
new file mode 100644
index 000000000..065946ece
--- /dev/null
+++ b/Test/DurableTask.AzureStorage.Tests/PurgeScalabilityTests.cs
@@ -0,0 +1,446 @@
+// ----------------------------------------------------------------------------------
+// Copyright Microsoft Corporation
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ----------------------------------------------------------------------------------
+#nullable enable
+namespace DurableTask.AzureStorage.Tests
+{
+ using System;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Azure;
+ using Azure.Data.Tables;
+ using DurableTask.AzureStorage.Monitoring;
+ using DurableTask.AzureStorage.Storage;
+ using DurableTask.AzureStorage.Tracking;
+ using DurableTask.Core;
+ using Microsoft.VisualStudio.TestTools.UnitTesting;
+ using Moq;
+
+ [TestClass]
+ public class PurgeScalabilityTests
+ {
+ const string ConnectionString = "UseDevelopmentStorage=true";
+ const string InstancesTableName = "TestInstances";
+ const string HistoryTableName = "TestHistory";
+
+        /// <summary>
+        /// Verifies that the DeleteHistoryAsync pipeline processes instances
+        /// and that the storageRequests counter accumulates correctly.
+        /// Previously there was a bug where rowsDeleted was counted as storageRequests.
+        /// </summary>
+ [TestMethod]
+ public async Task DeleteHistoryAsync_ViaPublicApi_AccumulatesStatistics()
+ {
+ // Arrange - create a tracking store with mocked tables
+ var (trackingStore, instancesTableClient, historyTableClient) = CreateTrackingStoreWithMockedTables();
+
+ // Setup: Instances table query returns 2 instances across 1 page
+ var instance1 = new OrchestrationInstanceStatus
+ {
+ PartitionKey = "instance1",
+ RowKey = "",
+ RuntimeStatus = "Completed",
+ CreatedTime = DateTime.UtcNow.AddHours(-2),
+ };
+ var instance2 = new OrchestrationInstanceStatus
+ {
+ PartitionKey = "instance2",
+ RowKey = "",
+ RuntimeStatus = "Completed",
+ CreatedTime = DateTime.UtcNow.AddHours(-1),
+ };
+
+ instancesTableClient
+ .Setup(t => t.QueryAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny>(),
+ It.IsAny()))
+ .Returns(AsyncPageable.FromPages(new[]
+ {
+ Page.FromValues(
+ new List { instance1, instance2 },
+ null,
+ new Mock().Object)
+ }));
+
+ // History table query: each instance has 5 history rows
+ historyTableClient
+ .Setup(t => t.QueryAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny>(),
+ It.IsAny()))
+ .Returns((string filter, int? maxPerPage, IEnumerable? select, CancellationToken ct) =>
+ {
+ string pk = filter.Contains("instance1") ? "instance1" : "instance2";
+ var entities = Enumerable.Range(0, 5)
+ .Select(i => new TableEntity(pk, $"rk_{i}") { ETag = ETag.All })
+ .ToList();
+ return AsyncPageable.FromPages(new[]
+ {
+ Page.FromValues(entities, null, new Mock().Object)
+ });
+ });
+
+ // History batch delete succeeds
+ historyTableClient
+ .Setup(t => t.SubmitTransactionAsync(
+ It.IsAny>(),
+ It.IsAny()))
+ .ReturnsAsync((IEnumerable actions, CancellationToken _) =>
+ {
+ int count = actions.Count();
+ var responses = Enumerable.Range(0, count).Select(_ => new Mock().Object).ToList();
+ return Response.FromValue>(responses, new Mock().Object);
+ });
+
+ // Instances table delete succeeds
+ instancesTableClient
+ .Setup(t => t.DeleteEntityAsync(
+ It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()))
+ .ReturnsAsync(new Mock().Object);
+
+ // Act
+ PurgeHistoryResult result = await trackingStore.PurgeInstanceHistoryAsync(
+ createdTimeFrom: DateTime.UtcNow.AddHours(-3),
+ createdTimeTo: DateTime.UtcNow,
+ runtimeStatus: new[] { OrchestrationStatus.Completed });
+
+ // Assert
+ Assert.AreEqual(2, result.InstancesDeleted);
+ Assert.AreEqual(10, result.RowsDeleted); // 5 history rows per instance × 2 instances
+ }
+
+        /// <summary>
+        /// Verifies that the purge pipeline limits concurrent instance purges
+        /// to a bounded number (MaxPurgeInstanceConcurrency = 100).
+        /// With only 10 instances, all should run concurrently but not exceed the limit.
+        /// </summary>
+ [TestMethod]
+ public async Task DeleteHistoryAsync_RespectsMaxConcurrency()
+ {
+ // Arrange
+ int concurrentCount = 0;
+ int maxObservedConcurrency = 0;
+
+ var (trackingStore, instancesTableClient, historyTableClient) = CreateTrackingStoreWithMockedTables();
+
+ // 10 instances to delete
+ var instances = Enumerable.Range(0, 10).Select(i =>
+ new OrchestrationInstanceStatus
+ {
+ PartitionKey = $"instance{i}",
+ RowKey = "",
+ RuntimeStatus = "Completed",
+ CreatedTime = DateTime.UtcNow.AddHours(-1),
+ }).ToList();
+
+ instancesTableClient
+ .Setup(t => t.QueryAsync(
+ It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny()))
+ .Returns(AsyncPageable.FromPages(new[]
+ {
+ Page.FromValues(instances, null, new Mock().Object)
+ }));
+
+ // History: empty for each instance (no rows to delete)
+ historyTableClient
+ .Setup(t => t.QueryAsync(
+ It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny()))
+ .Returns(AsyncPageable.FromPages(new[]
+ {
+ Page.FromValues(new List(), null, new Mock().Object)
+ }));
+
+ // Instances delete: track concurrency
+ instancesTableClient
+ .Setup(t => t.DeleteEntityAsync(
+ It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()))
+ .Returns(async (string pk, string rk, ETag etag, CancellationToken ct) =>
+ {
+ int current = Interlocked.Increment(ref concurrentCount);
+ int snapshot;
+ do
+ {
+ snapshot = Volatile.Read(ref maxObservedConcurrency);
+ }
+ while (current > snapshot && Interlocked.CompareExchange(ref maxObservedConcurrency, current, snapshot) != snapshot);
+
+ await Task.Delay(30); // Simulate latency
+ Interlocked.Decrement(ref concurrentCount);
+ return new Mock().Object;
+ });
+
+ // Act
+ PurgeHistoryResult result = await trackingStore.PurgeInstanceHistoryAsync(
+ createdTimeFrom: DateTime.UtcNow.AddHours(-3),
+ createdTimeTo: DateTime.UtcNow,
+ runtimeStatus: new[] { OrchestrationStatus.Completed });
+
+ // Assert
+ Assert.AreEqual(10, result.InstancesDeleted);
+ // With 10 instances and MaxPurgeInstanceConcurrency=100, all 10 should be able to run concurrently
+ Assert.IsTrue(
+ maxObservedConcurrency <= 100,
+ $"Max observed concurrency ({maxObservedConcurrency}) should not exceed MaxPurgeInstanceConcurrency (100)");
+ }
+
+        /// <summary>
+        /// Verifies that the single-instance purge history query projects only PartitionKey and RowKey, and not Timestamp.
+        /// </summary>
+ [TestMethod]
+ public async Task PurgeInstanceHistory_ProjectsOnlyPKAndRK()
+ {
+ // Arrange
+ var (trackingStore, instancesTableClient, historyTableClient) = CreateTrackingStoreWithMockedTables();
+
+ // Setup instances table to return one instance
+ var instance = new OrchestrationInstanceStatus
+ {
+ PartitionKey = "testInstance",
+ RowKey = "",
+ RuntimeStatus = "Completed",
+ };
+
+ instancesTableClient
+ .Setup(t => t.QueryAsync(
+ It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny()))
+ .Returns(AsyncPageable.FromPages(new[]
+ {
+ Page.FromValues(
+ new List { instance }, null, new Mock().Object)
+ }));
+
+ // Capture the actual select columns passed to the history query
+ IEnumerable? capturedSelect = null;
+ historyTableClient
+ .Setup(t => t.QueryAsync(
+ It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny()))
+ .Callback((string filter, int? maxPerPage, IEnumerable? select, CancellationToken ct) =>
+ {
+ capturedSelect = select;
+ })
+ .Returns(AsyncPageable.FromPages(new[]
+ {
+ Page.FromValues(new List(), null, new Mock().Object)
+ }));
+
+ instancesTableClient
+ .Setup(t => t.DeleteEntityAsync(
+ It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()))
+ .ReturnsAsync(new Mock().Object);
+
+ // Act
+ await trackingStore.PurgeInstanceHistoryAsync("testInstance");
+
+ // Assert: Projection should NOT include Timestamp
+ Assert.IsNotNull(capturedSelect, "Select projection was not provided");
+ var selectList = capturedSelect!.ToList();
+ Assert.IsTrue(selectList.Contains("PartitionKey"), "Should project PartitionKey");
+ Assert.IsTrue(selectList.Contains("RowKey"), "Should project RowKey");
+ Assert.IsFalse(selectList.Contains("Timestamp"), "Should NOT project Timestamp");
+ }
+
+        /// <summary>
+        /// Verifies that a single-instance purge deletes the instance from the instances table.
+        /// </summary>
+ [TestMethod]
+ public async Task PurgeInstanceHistory_ByInstanceId_DeletesInstance()
+ {
+ // Arrange
+ var (trackingStore, instancesTableClient, historyTableClient) = CreateTrackingStoreWithMockedTables();
+
+ var instance = new OrchestrationInstanceStatus
+ {
+ PartitionKey = "myInstance",
+ RowKey = "",
+ RuntimeStatus = "Completed",
+ };
+
+ instancesTableClient
+ .Setup(t => t.QueryAsync(
+ It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny()))
+ .Returns(AsyncPageable.FromPages(new[]
+ {
+ Page.FromValues(
+ new List { instance }, null, new Mock().Object)
+ }));
+
+ historyTableClient
+ .Setup(t => t.QueryAsync(
+ It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny()))
+ .Returns(AsyncPageable.FromPages(new[]
+ {
+ Page.FromValues(new List(), null, new Mock().Object)
+ }));
+
+ instancesTableClient
+ .Setup(t => t.DeleteEntityAsync(
+ It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()))
+ .ReturnsAsync(new Mock().Object);
+
+ // Act
+ PurgeHistoryResult result = await trackingStore.PurgeInstanceHistoryAsync("myInstance");
+
+ // Assert
+ Assert.AreEqual(1, result.InstancesDeleted);
+ instancesTableClient.Verify(
+ t => t.DeleteEntityAsync("myInstance", string.Empty, ETag.All, It.IsAny()),
+ Times.Once);
+ }
+
+        /// <summary>
+        /// Verifies that purging a non-existent instance returns zero.
+        /// </summary>
+ [TestMethod]
+ public async Task PurgeInstanceHistory_InstanceNotFound_ReturnsZero()
+ {
+ // Arrange
+ var (trackingStore, instancesTableClient, _) = CreateTrackingStoreWithMockedTables();
+
+ instancesTableClient
+ .Setup(t => t.QueryAsync(
+ It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny()))
+ .Returns(AsyncPageable.FromPages(new[]
+ {
+ Page.FromValues(
+ new List(), null, new Mock().Object)
+ }));
+
+ // Act
+ PurgeHistoryResult result = await trackingStore.PurgeInstanceHistoryAsync("nonexistent");
+
+ // Assert
+ Assert.AreEqual(0, result.InstancesDeleted);
+ Assert.AreEqual(0, result.RowsDeleted);
+ }
+
+        /// <summary>
+        /// Verifies that history batch delete uses parallel execution (via DeleteBatchParallelAsync).
+        /// </summary>
+ [TestMethod]
+ public async Task PurgeInstanceHistory_WithManyHistoryRows_UsesParallelBatchDelete()
+ {
+ // Arrange
+ var (trackingStore, instancesTableClient, historyTableClient) = CreateTrackingStoreWithMockedTables();
+
+ var instance = new OrchestrationInstanceStatus
+ {
+ PartitionKey = "testPK",
+ RowKey = "",
+ RuntimeStatus = "Completed",
+ };
+
+ instancesTableClient
+ .Setup(t => t.QueryAsync(
+ It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny()))
+ .Returns(AsyncPageable.FromPages(new[]
+ {
+ Page.FromValues(
+ new List { instance }, null, new Mock().Object)
+ }));
+
+ // Return 250 history entities (which should be split into 3 batches: 100+100+50)
+ var historyEntities = Enumerable.Range(0, 250)
+ .Select(i => new TableEntity("testPK", $"rk_{i:D5}") { ETag = ETag.All })
+ .ToList();
+
+ historyTableClient
+ .Setup(t => t.QueryAsync(
+ It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny()))
+ .Returns(AsyncPageable.FromPages(new[]
+ {
+ Page.FromValues(historyEntities, null, new Mock().Object)
+ }));
+
+ // Track batch deletes
+ int batchDeleteCallCount = 0;
+ historyTableClient
+ .Setup(t => t.SubmitTransactionAsync(
+ It.IsAny>(),
+ It.IsAny()))
+ .ReturnsAsync((IEnumerable actions, CancellationToken _) =>
+ {
+ Interlocked.Increment(ref batchDeleteCallCount);
+ int count = actions.Count();
+ var responses = Enumerable.Range(0, count).Select(_ => new Mock().Object).ToList();
+ return Response.FromValue>(responses, new Mock().Object);
+ });
+
+ instancesTableClient
+ .Setup(t => t.DeleteEntityAsync(
+ It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()))
+ .ReturnsAsync(new Mock().Object);
+
+ // Act
+ PurgeHistoryResult result = await trackingStore.PurgeInstanceHistoryAsync("testPK");
+
+ // Assert
+ Assert.AreEqual(1, result.InstancesDeleted);
+ Assert.AreEqual(250, result.RowsDeleted);
+ // Should have made 3 batch delete calls (100 + 100 + 50)
+ Assert.AreEqual(3, batchDeleteCallCount, "Expected 3 batch transactions for 250 entities");
+ }
+
+ #region Helper Methods
+
+ static (AzureTableTrackingStore trackingStore, Mock instancesTableClient, Mock historyTableClient)
+ CreateTrackingStoreWithMockedTables(Action? modifySettings = null)
+ {
+ var settings = new AzureStorageOrchestrationServiceSettings
+ {
+ StorageAccountClientProvider = new StorageAccountClientProvider(ConnectionString),
+ FetchLargeMessageDataEnabled = false,
+ };
+
+ modifySettings?.Invoke(settings);
+
+ var azureStorageClient = new AzureStorageClient(settings);
+
+ // Create mocked instances table
+ var instancesServiceClient = new Mock(MockBehavior.Strict, ConnectionString);
+ var instancesTableClient = new Mock(MockBehavior.Loose, ConnectionString, InstancesTableName);
+ instancesTableClient.Setup(t => t.Name).Returns(InstancesTableName);
+ instancesServiceClient.Setup(t => t.GetTableClient(InstancesTableName)).Returns(instancesTableClient.Object);
+ var instancesTable = new Table(azureStorageClient, instancesServiceClient.Object, InstancesTableName);
+
+ // Create mocked history table
+ var historyServiceClient = new Mock(MockBehavior.Strict, ConnectionString);
+ var historyTableClient = new Mock(MockBehavior.Loose, ConnectionString, HistoryTableName);
+ historyTableClient.Setup(t => t.Name).Returns(HistoryTableName);
+ historyServiceClient.Setup(t => t.GetTableClient(HistoryTableName)).Returns(historyTableClient.Object);
+ var historyTable = new Table(azureStorageClient, historyServiceClient.Object, HistoryTableName);
+
+ // Create mock message manager that returns 1 storage operation (no blobs to delete)
+ var messageManager = new Mock(
+ settings, azureStorageClient, "test-largemessages") { CallBase = false };
+ messageManager
+ .Setup(m => m.DeleteLargeMessageBlobs(It.IsAny(), It.IsAny()))
+ .ReturnsAsync(1);
+
+ // Create tracking store using the internal test constructor
+ var trackingStore = new AzureTableTrackingStore(
+ azureStorageClient,
+ historyTable,
+ instancesTable,
+ messageManager.Object);
+
+ return (trackingStore, instancesTableClient, historyTableClient);
+ }
+
+ #endregion
+ }
+}
\ No newline at end of file
diff --git a/Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs b/Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs
new file mode 100644
index 000000000..6c49679ca
--- /dev/null
+++ b/Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs
@@ -0,0 +1,372 @@
+// ----------------------------------------------------------------------------------
+// Copyright Microsoft Corporation
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ----------------------------------------------------------------------------------
+#nullable enable
+namespace DurableTask.AzureStorage.Tests.Storage
+{
+ using System;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Azure;
+ using Azure.Data.Tables;
+ using DurableTask.AzureStorage.Storage;
+ using Microsoft.VisualStudio.TestTools.UnitTesting;
+ using Moq;
+
+ [TestClass]
+ public class TableDeleteBatchParallelTests
+ {
+ const string ConnectionString = "UseDevelopmentStorage=true";
+ const string TableName = "TestTable";
+
+ [TestMethod]
+ public async Task DeleteBatchParallelAsync_EmptyBatch_ReturnsEmptyResults()
+ {
+ // Arrange
+ Table table = CreateTableWithMockedClient(out _, out _);
+ var entities = new List();
+
+ // Act
+ TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4);
+
+ // Assert
+ Assert.AreEqual(0, results.Responses.Count);
+ Assert.AreEqual(0, results.RequestCount);
+ }
+
+ [TestMethod]
+ public async Task DeleteBatchParallelAsync_SingleBatch_SubmitsOneTransaction()
+ {
+ // Arrange
+ var entities = CreateTestEntities("pk", count: 50);
+
+ Table table = CreateTableWithMockedClient(out _, out Mock tableClient);
+
+ var mockResponses = CreateMockBatchResponse(50);
+ tableClient
+ .Setup(t => t.SubmitTransactionAsync(
+ It.Is>(a => a.Count() == 50),
+ It.IsAny()))
+ .ReturnsAsync(mockResponses);
+
+ // Act
+ TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4);
+
+ // Assert
+ Assert.AreEqual(50, results.Responses.Count);
+ tableClient.Verify(
+ t => t.SubmitTransactionAsync(It.IsAny>(), It.IsAny()),
+ Times.Once);
+ }
+
+ [TestMethod]
+ public async Task DeleteBatchParallelAsync_MultipleBatches_SplitsIntoChunksOf100()
+ {
+ // Arrange
+ var entities = CreateTestEntities("pk", count: 250);
+
+ Table table = CreateTableWithMockedClient(out _, out Mock tableClient);
+
+ tableClient
+ .Setup(t => t.SubmitTransactionAsync(
+ It.IsAny>(),
+ It.IsAny()))
+ .ReturnsAsync((IEnumerable batch, CancellationToken _) =>
+ CreateMockBatchResponse(batch.Count()));
+
+ // Act
+ TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 10);
+
+ // Assert: 250 entities = 3 batches (100 + 100 + 50)
+ Assert.AreEqual(250, results.Responses.Count);
+ tableClient.Verify(
+ t => t.SubmitTransactionAsync(It.IsAny>(), It.IsAny()),
+ Times.Exactly(3));
+ }
+
+ [TestMethod]
+ public async Task DeleteBatchParallelAsync_RespectsMaxParallelism()
+ {
+ // Arrange
+ int maxParallelism = 2;
+ var entities = CreateTestEntities("pk", count: 500); // 5 batches of 100
+ int concurrentCount = 0;
+ int maxConcurrent = 0;
+
+ Table table = CreateTableWithMockedClient(out _, out Mock tableClient);
+
+ tableClient
+ .Setup(t => t.SubmitTransactionAsync(
+ It.IsAny>(),
+ It.IsAny()))
+ .Returns(async (IEnumerable batch, CancellationToken _) =>
+ {
+ int current = Interlocked.Increment(ref concurrentCount);
+ int snapshot;
+ do
+ {
+ snapshot = Volatile.Read(ref maxConcurrent);
+ }
+ while (current > snapshot && Interlocked.CompareExchange(ref maxConcurrent, current, snapshot) != snapshot);
+
+ await Task.Delay(50); // Simulate some latency
+ Interlocked.Decrement(ref concurrentCount);
+
+ return CreateMockBatchResponse(batch.Count());
+ });
+
+ // Act
+ await table.DeleteBatchParallelAsync(entities, maxParallelism: maxParallelism);
+
+ // Assert
+ Assert.IsTrue(
+ maxConcurrent <= maxParallelism,
+ $"Max concurrent batches ({maxConcurrent}) exceeded maxParallelism ({maxParallelism})");
+ }
+
+ [TestMethod]
+ public async Task DeleteBatchParallelAsync_BatchFails404_FallsBackToIndividualDeletes()
+ {
+ // Arrange
+ var entities = CreateTestEntities("pk", count: 3);
+
+ Table table = CreateTableWithMockedClient(out _, out Mock tableClient);
+
+ // First batch call fails with 404 (e.g., one entity already deleted)
+ tableClient
+ .Setup(t => t.SubmitTransactionAsync(
+ It.IsAny>(),
+ It.IsAny()))
+ .ThrowsAsync(new RequestFailedException(404, "Entity not found"));
+
+ // Individual deletes succeed
+ var mockResponse = new Mock();
+ tableClient
+ .Setup(t => t.DeleteEntityAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .ReturnsAsync(mockResponse.Object);
+
+ // Act
+ TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4);
+
+ // Assert: should fall back to individual deletes
+ Assert.AreEqual(3, results.Responses.Count);
+ tableClient.Verify(
+ t => t.DeleteEntityAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()),
+ Times.Exactly(3));
+ }
+
+ [TestMethod]
+ public async Task DeleteBatchParallelAsync_IndividualDeleteSkips404()
+ {
+ // Arrange
+ var entities = CreateTestEntities("pk", count: 3);
+
+ Table table = CreateTableWithMockedClient(out _, out Mock tableClient);
+
+ // Batch fails with 404
+ tableClient
+ .Setup(t => t.SubmitTransactionAsync(
+ It.IsAny>(),
+ It.IsAny()))
+ .ThrowsAsync(new RequestFailedException(404, "Entity not found"));
+
+ // Individual delete: first succeeds, second returns 404, third succeeds
+ int callCount = 0;
+ var mockResponse = new Mock();
+ tableClient
+ .Setup(t => t.DeleteEntityAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .Returns((string pk, string rk, ETag ifMatch, CancellationToken ct) =>
+ {
+ int call = Interlocked.Increment(ref callCount);
+ if (call == 2)
+ {
+ throw new RequestFailedException(404, "Entity already deleted");
+ }
+ return Task.FromResult(mockResponse.Object);
+ });
+
+ // Act
+ TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4);
+
+ // Assert: only 2 responses (the 404 was skipped)
+ Assert.AreEqual(2, results.Responses.Count);
+ Assert.AreEqual(3, results.RequestCount); // Still counted 3 requests
+ }
+
+ [TestMethod]
+ public async Task DeleteBatchParallelAsync_ExactlyOneBatch_NoBoundaryIssues()
+ {
+ // Arrange: exactly 100 entities = 1 batch
+ var entities = CreateTestEntities("pk", count: 100);
+
+ Table table = CreateTableWithMockedClient(out _, out Mock tableClient);
+
+ tableClient
+ .Setup(t => t.SubmitTransactionAsync(
+ It.Is>(a => a.Count() == 100),
+ It.IsAny()))
+ .ReturnsAsync(CreateMockBatchResponse(100));
+
+ // Act
+ TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4);
+
+ // Assert
+ Assert.AreEqual(100, results.Responses.Count);
+ tableClient.Verify(
+ t => t.SubmitTransactionAsync(It.IsAny>(), It.IsAny()),
+ Times.Once);
+ }
+
+ [TestMethod]
+ public async Task DeleteBatchParallelAsync_101Entities_CreatesTwoBatches()
+ {
+ // Arrange: 101 entities = 2 batches (100 + 1)
+ var entities = CreateTestEntities("pk", count: 101);
+
+ Table table = CreateTableWithMockedClient(out _, out Mock tableClient);
+
+ tableClient
+ .Setup(t => t.SubmitTransactionAsync(
+ It.IsAny>(),
+ It.IsAny()))
+ .ReturnsAsync((IEnumerable batch, CancellationToken _) =>
+ CreateMockBatchResponse(batch.Count()));
+
+ // Act
+ TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4);
+
+ // Assert
+ Assert.AreEqual(101, results.Responses.Count);
+ tableClient.Verify(
+ t => t.SubmitTransactionAsync(It.IsAny>(), It.IsAny()),
+ Times.Exactly(2));
+ }
+
+ [TestMethod]
+ public async Task DeleteBatchParallelAsync_CancellationToken_IsPropagated()
+ {
+ // Arrange
+ var entities = CreateTestEntities("pk", count: 200);
+ using var cts = new CancellationTokenSource();
+
+ Table table = CreateTableWithMockedClient(out _, out Mock tableClient);
+
+ int batchesSubmitted = 0;
+ tableClient
+ .Setup(t => t.SubmitTransactionAsync(
+ It.IsAny>(),
+ It.IsAny()))
+ .Returns(async (IEnumerable batch, CancellationToken ct) =>
+ {
+ int count = Interlocked.Increment(ref batchesSubmitted);
+ if (count == 1)
+ {
+ // Cancel after first batch starts
+ cts.Cancel();
+ }
+ ct.ThrowIfCancellationRequested();
+ return CreateMockBatchResponse(batch.Count());
+ });
+
+ // Act & Assert
+ await Assert.ThrowsExceptionAsync(
+ () => table.DeleteBatchParallelAsync(entities, maxParallelism: 1, cts.Token));
+ }
+
+ [TestMethod]
+ public async Task DeleteBatchParallelAsync_MaxParallelismOne_ExecutesSequentially()
+ {
+ // Arrange
+ var entities = CreateTestEntities("pk", count: 300); // 3 batches
+ var batchOrder = new ConcurrentBag();
+ int batchIndex = 0;
+
+ Table table = CreateTableWithMockedClient(out _, out Mock tableClient);
+
+ tableClient
+ .Setup(t => t.SubmitTransactionAsync(
+ It.IsAny>(),
+ It.IsAny()))
+ .Returns(async (IEnumerable batch, CancellationToken _) =>
+ {
+ int idx = Interlocked.Increment(ref batchIndex);
+ batchOrder.Add(idx);
+ await Task.Delay(10);
+ return CreateMockBatchResponse(batch.Count());
+ });
+
+ // Act
+ TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 1);
+
+ // Assert
+ Assert.AreEqual(300, results.Responses.Count);
+ Assert.AreEqual(3, batchOrder.Count);
+ }
+
+ #region Helper Methods
+
+ static Table CreateTableWithMockedClient(
+ out Mock tableServiceClient,
+ out Mock tableClient)
+ {
+ var settings = new AzureStorageOrchestrationServiceSettings
+ {
+ StorageAccountClientProvider = new StorageAccountClientProvider(ConnectionString),
+ };
+
+ var azureStorageClient = new AzureStorageClient(settings);
+
+ tableServiceClient = new Mock(MockBehavior.Strict, ConnectionString);
+ tableClient = new Mock(MockBehavior.Loose, ConnectionString, TableName);
+ tableClient.Setup(t => t.Name).Returns(TableName);
+ tableServiceClient.Setup(t => t.GetTableClient(TableName)).Returns(tableClient.Object);
+
+ return new Table(azureStorageClient, tableServiceClient.Object, TableName);
+ }
+
+ static List CreateTestEntities(string partitionKey, int count)
+ {
+ var entities = new List(count);
+ for (int i = 0; i < count; i++)
+ {
+ entities.Add(new TableEntity(partitionKey, $"rk_{i:D5}")
+ {
+ ETag = ETag.All,
+ });
+ }
+ return entities;
+ }
+
+ static Response> CreateMockBatchResponse(int count)
+ {
+ var responses = new List();
+ for (int i = 0; i < count; i++)
+ {
+ responses.Add(new Mock().Object);
+ }
+ return Response.FromValue>(responses, new Mock().Object);
+ }
+
+ #endregion
+ }
+}
diff --git a/src/DurableTask.AzureStorage/MessageManager.cs b/src/DurableTask.AzureStorage/MessageManager.cs
index ff48d507f..6e122c556 100644
--- a/src/DurableTask.AzureStorage/MessageManager.cs
+++ b/src/DurableTask.AzureStorage/MessageManager.cs
@@ -316,10 +316,11 @@ public string GetNewLargeMessageBlobName(MessageData message)
return $"{instanceId}/message-{activityId}-{eventType}.json.gz";
}
- public async Task DeleteLargeMessageBlobs(string sanitizedInstanceId, CancellationToken cancellationToken = default)
+ public virtual async Task DeleteLargeMessageBlobs(string sanitizedInstanceId, CancellationToken cancellationToken = default)
{
- int storageOperationCount = 1;
- if (await this.blobContainer.ExistsAsync(cancellationToken))
+ int storageOperationCount = 0;
+
+ try
{
await foreach (Page page in this.blobContainer.ListBlobsAsync(sanitizedInstanceId, cancellationToken).AsPages())
{
@@ -329,6 +330,22 @@ public async Task DeleteLargeMessageBlobs(string sanitizedInstanceId, Cance
storageOperationCount += page.Values.Count;
}
+
+ // Count the list operation even if no blobs found (the initial list request still happened)
+ if (storageOperationCount == 0)
+ {
+ storageOperationCount = 1;
+ }
+ }
+ catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == 404)
+ {
+ // Container does not exist; nothing to delete.
+ storageOperationCount = 1;
+ }
+ catch (Azure.RequestFailedException ex) when (ex.Status == 404)
+ {
+ // Container does not exist; nothing to delete.
+ storageOperationCount = 1;
}
return storageOperationCount;
diff --git a/src/DurableTask.AzureStorage/Storage/Table.cs b/src/DurableTask.AzureStorage/Storage/Table.cs
index 7d726e1d7..4c41c0a85 100644
--- a/src/DurableTask.AzureStorage/Storage/Table.cs
+++ b/src/DurableTask.AzureStorage/Storage/Table.cs
@@ -117,6 +117,126 @@ public async Task DeleteBatchAsync(IEnumerable en
return await this.ExecuteBatchAsync(entityBatch, item => new TableTransactionAction(TableTransactionActionType.Delete, item), cancellationToken: cancellationToken);
}
+        /// <summary>
+        /// Deletes entities in parallel batches of up to 100. Each batch is an atomic transaction,
+        /// but multiple batches are submitted concurrently for improved throughput.
+        /// If a batch fails because an entity was already deleted (404/EntityNotFound),
+        /// it falls back to individual deletes for that batch, skipping already-deleted entities.
+        /// </summary>
+        public async Task<TableTransactionResults> DeleteBatchParallelAsync<T>(
+            IReadOnlyList<T> entityBatch,
+            int maxParallelism,
+            CancellationToken cancellationToken = default) where T : ITableEntity
+        {
+            if (entityBatch.Count == 0)
+            {
+                return new TableTransactionResults(Array.Empty<Response>(), TimeSpan.Zero, 0);
+            }
+
+            const int batchSize = 100; // Azure Table storage allows at most 100 actions per transaction
+            int chunkCount = (entityBatch.Count + batchSize - 1) / batchSize;
+            var chunks = new List<List<TableTransactionAction>>(chunkCount);
+
+            var currentChunk = new List<TableTransactionAction>(batchSize);
+            foreach (T entity in entityBatch)
+            {
+                currentChunk.Add(new TableTransactionAction(TableTransactionActionType.Delete, entity));
+                if (currentChunk.Count == batchSize)
+                {
+                    chunks.Add(currentChunk);
+                    currentChunk = new List<TableTransactionAction>(batchSize);
+                }
+            }
+
+            if (currentChunk.Count > 0)
+            {
+                chunks.Add(currentChunk);
+            }
+
+            var resultsBuilder = new TableTransactionResultsBuilder();
+            var semaphore = new SemaphoreSlim(Math.Max(1, maxParallelism)); // bounds concurrent transactions
+
+            var tasks = chunks.Select(async chunk =>
+            {
+                await semaphore.WaitAsync(cancellationToken);
+                try
+                {
+                    return await this.ExecuteBatchWithFallbackAsync(chunk, cancellationToken);
+                }
+                finally
+                {
+                    semaphore.Release();
+                }
+            });
+
+            TableTransactionResults[] allResults = await Task.WhenAll(tasks);
+            foreach (TableTransactionResults result in allResults)
+            {
+                resultsBuilder.Add(result);
+            }
+
+            return resultsBuilder.ToResults();
+        }
+
+        /// <summary>
+        /// Executes a batch transaction. If it fails due to an entity not found (404),
+        /// falls back to individual delete operations, skipping entities that are already gone.
+        /// </summary>
+        async Task<TableTransactionResults> ExecuteBatchWithFallbackAsync(
+            List<TableTransactionAction> batch,
+            CancellationToken cancellationToken)
+        {
+            try
+            {
+                return await this.ExecuteBatchAsync(batch, cancellationToken);
+            }
+            catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == 404)
+            {
+                // One or more entities in the batch were already deleted.
+                // Fall back to individual deletes, skipping 404s.
+                return await this.DeleteEntitiesIndividuallyAsync(batch, cancellationToken);
+            }
+            catch (RequestFailedException ex) when (ex.Status == 404)
+            {
+                return await this.DeleteEntitiesIndividuallyAsync(batch, cancellationToken);
+            }
+        }
+
+        async Task<TableTransactionResults> DeleteEntitiesIndividuallyAsync(
+            List<TableTransactionAction> batch,
+            CancellationToken cancellationToken)
+        {
+            var responses = new List<Response>();
+            var stopwatch = Stopwatch.StartNew();
+            int requestCount = 0;
+
+            foreach (TableTransactionAction action in batch)
+            {
+                requestCount++; // counted even when the delete is skipped below
+                try
+                {
+                    Response response = await this.tableClient.DeleteEntityAsync(
+                        action.Entity.PartitionKey,
+                        action.Entity.RowKey,
+                        ETag.All,
+                        cancellationToken).DecorateFailure();
+                    responses.Add(response);
+                    this.stats.TableEntitiesWritten.Increment();
+                }
+                catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == 404)
+                {
+                    // Entity already deleted; skip.
+                }
+                catch (RequestFailedException ex) when (ex.Status == 404)
+                {
+                    // Entity already deleted; skip.
+                }
+            }
+
+            stopwatch.Stop();
+            return new TableTransactionResults(responses, stopwatch.Elapsed, requestCount);
+        }
+
public async Task InsertOrMergeBatchAsync(IEnumerable entityBatch, CancellationToken cancellationToken = default) where T : ITableEntity
{
TableTransactionResults results = await this.ExecuteBatchAsync(entityBatch, item => new TableTransactionAction(TableTransactionActionType.UpsertMerge, item), cancellationToken: cancellationToken);
diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
index f1e67a0bf..c82605f65 100644
--- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
+++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
@@ -118,6 +118,25 @@ internal AzureTableTrackingStore(
};
}
+ // Test-only constructor: allows injecting the HistoryTable, InstancesTable, and MessageManager directly when exercising purge functionality.
+ internal AzureTableTrackingStore(
+ AzureStorageClient azureStorageClient,
+ Table historyTable,
+ Table instancesTable,
+ MessageManager messageManager)
+ {
+ this.azureStorageClient = azureStorageClient;
+ this.settings = azureStorageClient.Settings;
+ this.stats = azureStorageClient.Stats;
+ this.storageAccountName = azureStorageClient.TableAccountName;
+ this.taskHubName = this.settings.TaskHubName;
+ this.HistoryTable = historyTable;
+ this.InstancesTable = instancesTable;
+ this.messageManager = messageManager;
+
+ this.settings.FetchLargeMessageDataEnabled = false;
+ }
+
internal Table HistoryTable { get; }
internal Table InstancesTable { get; }
@@ -568,25 +587,42 @@ async Task DeleteHistoryAsync(
ODataCondition odata = condition.ToOData();
- // Limit to batches of 100 to avoid excessive memory usage and table storage scanning
int storageRequests = 0;
int instancesDeleted = 0;
int rowsDeleted = 0;
- var options = new ParallelOptions { MaxDegreeOfParallelism = this.settings.MaxStorageOperationConcurrency };
+ // Limit concurrent instance purges to avoid overwhelming storage with too many parallel operations.
+ // Each instance purge internally spawns multiple parallel storage operations, so this should be
+ // kept moderate. Using 100 to match the original implicit concurrency from pageSizeHint.
+ const int MaxPurgeInstanceConcurrency = 100;
+ var throttle = new SemaphoreSlim(MaxPurgeInstanceConcurrency);
+ var pendingTasks = new List();
+
AsyncPageable entitiesPageable = this.InstancesTable.ExecuteQueryAsync(odata.Filter, select: odata.Select, cancellationToken: cancellationToken);
- await foreach (Page page in entitiesPageable.AsPages(pageSizeHint: 100))
+ await foreach (Page page in entitiesPageable.AsPages(pageSizeHint: MaxPurgeInstanceConcurrency))
{
- // The underlying client throttles
- await Task.WhenAll(page.Values.Select(async instance =>
+ foreach (OrchestrationInstanceStatus instance in page.Values)
{
- PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, cancellationToken);
- Interlocked.Add(ref instancesDeleted, statisticsFromDeletion.InstancesDeleted);
- Interlocked.Add(ref storageRequests, statisticsFromDeletion.RowsDeleted);
- Interlocked.Add(ref rowsDeleted, statisticsFromDeletion.RowsDeleted);
- }));
+ await throttle.WaitAsync(cancellationToken);
+ pendingTasks.Add(Task.Run(async () =>
+ {
+ try
+ {
+ PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, cancellationToken);
+ Interlocked.Add(ref instancesDeleted, statisticsFromDeletion.InstancesDeleted);
+ Interlocked.Add(ref storageRequests, statisticsFromDeletion.StorageRequests);
+ Interlocked.Add(ref rowsDeleted, statisticsFromDeletion.RowsDeleted);
+ }
+ finally
+ {
+ throttle.Release();
+ }
+ }));
+ }
}
+ await Task.WhenAll(pendingTasks);
+
return new PurgeHistoryResult(storageRequests, instancesDeleted, rowsDeleted);
}
@@ -601,13 +637,14 @@ async Task DeleteAllDataForOrchestrationInstance(Orchestrati
.GetHistoryEntitiesResponseInfoAsync(
instanceId: sanitizedInstanceId,
expectedExecutionId: null,
- projectionColumns: new[] { RowKeyProperty, PartitionKeyProperty, TimestampProperty },
+ projectionColumns: new[] { RowKeyProperty, PartitionKeyProperty },
cancellationToken)
.GetResultsAsync(cancellationToken: cancellationToken);
storageRequests += results.RequestCount;
IReadOnlyList historyEntities = results.Entities;
+ int maxParallelism = Math.Max(1, this.settings.MaxStorageOperationConcurrency);
var tasks = new List
{
@@ -618,7 +655,7 @@ async Task DeleteAllDataForOrchestrationInstance(Orchestrati
}),
Task.Run(async () =>
{
- var deletedEntitiesResponseInfo = await this.HistoryTable.DeleteBatchAsync(historyEntities, cancellationToken);
+ var deletedEntitiesResponseInfo = await this.HistoryTable.DeleteBatchParallelAsync(historyEntities, maxParallelism, cancellationToken);
Interlocked.Add(ref rowsDeleted, deletedEntitiesResponseInfo.Responses.Count);
Interlocked.Add(ref storageRequests, deletedEntitiesResponseInfo.RequestCount);
}),
diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
index 502e5fab6..f07a79016 100644
--- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
+++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
@@ -560,6 +560,92 @@ private async Task GetBlobCount(string containerName, string directoryName)
}
+ [TestMethod]
+ public async Task PurgeMultipleInstancesHistoryByTimePeriod_ScalabilityValidation()
+ {
+ // This test validates scale improvements: parallel batch delete and pipelined page processing.
+ // Runs multiple concurrent orchestrations, then purges all of them by time period.
+ using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false))
+ {
+ await host.StartAsync();
+ DateTime startDateTime = DateTime.Now;
+
+ // Create multiple orchestration instances concurrently
+ const int instanceCount = 5;
+ var clients = new List();
+ var instanceIds = new List();
+
+ for (int i = 0; i < instanceCount; i++)
+ {
+ string instanceId = $"purge-scale-{Guid.NewGuid():N}";
+ instanceIds.Add(instanceId);
+ TestOrchestrationClient client = await host.StartOrchestrationAsync(
+ typeof(Orchestrations.Factorial), 10, instanceId);
+ clients.Add(client);
+ }
+
+ // Wait for all orchestrations to complete
+ foreach (var client in clients)
+ {
+ var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60));
+ Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);
+ }
+
+ // Verify all instances have history
+ foreach (string instanceId in instanceIds)
+ {
+ List historyEvents = await clients[0].GetOrchestrationHistoryAsync(instanceId);
+ Assert.IsTrue(historyEvents.Count > 0, $"Instance {instanceId} should have history events");
+ }
+
+ // Purge all instances by time period
+ await clients[0].PurgeInstanceHistoryByTimePeriod(
+ startDateTime,
+ DateTime.UtcNow,
+ new List { OrchestrationStatus.Completed });
+
+ // Verify all history is purged
+ foreach (string instanceId in instanceIds)
+ {
+ List historyEvents = await clients[0].GetOrchestrationHistoryAsync(instanceId);
+ Assert.AreEqual(0, historyEvents.Count, $"Instance {instanceId} should have no history after purge");
+
+ IList stateList = await clients[0].GetStateAsync(instanceId);
+ Assert.AreEqual(1, stateList.Count);
+ Assert.IsNull(stateList[0], $"Instance {instanceId} state should be null after purge");
+ }
+
+ await host.StopAsync();
+ }
+ }
+
+ [TestMethod]
+ public async Task PurgeSingleInstanceWithIdempotency()
+ {
+ // This test validates that purging the same instance twice doesn't cause errors
+ // (testing the idempotent batch delete fallback).
+ using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false))
+ {
+ string instanceId = Guid.NewGuid().ToString();
+ await host.StartAsync();
+ TestOrchestrationClient client = await host.StartOrchestrationAsync(
+ typeof(Orchestrations.Factorial), 110, instanceId);
+ await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60));
+
+ // First purge should succeed
+ await client.PurgeInstanceHistory();
+
+ List