From 26734db3e2439978f65ecc5eaa6baad02872c515 Mon Sep 17 00:00:00 2001
From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com>
Date: Fri, 13 Mar 2026 15:33:27 -0700
Subject: [PATCH 1/8] enhance purge

---
 .../PurgeScalabilityTests.cs                  | 446 ++++++++++++++++++
 .../Storage/TableDeleteBatchParallelTests.cs  | 372 +++++++++++++++
 .../MessageManager.cs                         |  23 +-
 src/DurableTask.AzureStorage/Storage/Table.cs | 120 +++++
 .../Tracking/AzureTableTrackingStore.cs       |  61 ++-
 .../AzureStorageScenarioTests.cs              |  86 ++++
 6 files changed, 1093 insertions(+), 15 deletions(-)
 create mode 100644 Test/DurableTask.AzureStorage.Tests/PurgeScalabilityTests.cs
 create mode 100644 Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs

diff --git a/Test/DurableTask.AzureStorage.Tests/PurgeScalabilityTests.cs b/Test/DurableTask.AzureStorage.Tests/PurgeScalabilityTests.cs
new file mode 100644
index 000000000..065946ece
--- /dev/null
+++ b/Test/DurableTask.AzureStorage.Tests/PurgeScalabilityTests.cs
@@ -0,0 +1,446 @@
+// ----------------------------------------------------------------------------------
+// Copyright Microsoft Corporation
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ----------------------------------------------------------------------------------
+#nullable enable
+namespace DurableTask.AzureStorage.Tests
+{
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Azure;
+    using Azure.Data.Tables;
+    using DurableTask.AzureStorage.Monitoring;
+    using DurableTask.AzureStorage.Storage;
+    using DurableTask.AzureStorage.Tracking;
+    using DurableTask.Core;
+    using Microsoft.VisualStudio.TestTools.UnitTesting;
+    using Moq;
+
+    [TestClass]
+    public class PurgeScalabilityTests
+    {
+        const string ConnectionString = "UseDevelopmentStorage=true";
+        const string InstancesTableName = "TestInstances";
+        const string HistoryTableName = "TestHistory";
+
+        /// <summary>
+        /// Verifies that the DeleteHistoryAsync pipeline processes every queried instance
+        /// and accumulates InstancesDeleted and RowsDeleted across instances.
+        /// NOTE: StorageRequests is not asserted here, so the rowsDeleted-as-storageRequests bug is not pinned by this test.
+        /// </summary>
+        [TestMethod]
+        public async Task DeleteHistoryAsync_ViaPublicApi_AccumulatesStatistics()
+        {
+            // Arrange - create a tracking store with mocked tables
+            var (trackingStore, instancesTableClient, historyTableClient) = CreateTrackingStoreWithMockedTables();
+
+            // Setup: Instances table query returns 2 instances across 1 page
+            var instance1 = new OrchestrationInstanceStatus
+            {
+                PartitionKey = "instance1",
+                RowKey = "",
+                RuntimeStatus = "Completed",
+                CreatedTime = DateTime.UtcNow.AddHours(-2),
+            };
+            var instance2 = new OrchestrationInstanceStatus
+            {
+                PartitionKey = "instance2",
+                RowKey = "",
+                RuntimeStatus = "Completed",
+                CreatedTime = DateTime.UtcNow.AddHours(-1),
+            };
+
+            instancesTableClient
+                .Setup(t => t.QueryAsync<OrchestrationInstanceStatus>(
+                    It.IsAny<string>(),
+                    It.IsAny<int?>(),
+                    It.IsAny<IEnumerable<string>>(),
+                    It.IsAny<CancellationToken>()))
+                .Returns(AsyncPageable<OrchestrationInstanceStatus>.FromPages(new[]
+                {
+                    Page<OrchestrationInstanceStatus>.FromValues(
+                        new List<OrchestrationInstanceStatus> { instance1, instance2 },
+                        null,
+                        new Mock<Response>().Object)
+                }));
+
+            // History table query: each instance has 5 history rows
+            historyTableClient
+                .Setup(t => t.QueryAsync<TableEntity>(
+                    It.IsAny<string>(),
+                    It.IsAny<int?>(),
+                    It.IsAny<IEnumerable<string>>(),
+                    It.IsAny<CancellationToken>()))
+                .Returns((string filter, int? maxPerPage, IEnumerable<string>? select, CancellationToken ct) =>
+                {
+                    string pk = filter.Contains("instance1") ? "instance1" : "instance2";
+                    var entities = Enumerable.Range(0, 5)
+                        .Select(i => new TableEntity(pk, $"rk_{i}") { ETag = ETag.All })
+                        .ToList();
+                    return AsyncPageable<TableEntity>.FromPages(new[]
+                    {
+                        Page<TableEntity>.FromValues(entities, null, new Mock<Response>().Object)
+                    });
+                });
+
+            // History batch delete succeeds
+            historyTableClient
+                .Setup(t => t.SubmitTransactionAsync(
+                    It.IsAny<IEnumerable<TableTransactionAction>>(),
+                    It.IsAny<CancellationToken>()))
+                .ReturnsAsync((IEnumerable<TableTransactionAction> actions, CancellationToken _) =>
+                {
+                    int count = actions.Count();
+                    var responses = Enumerable.Range(0, count).Select(_ => new Mock<Response>().Object).ToList();
+                    return Response.FromValue<IReadOnlyList<Response>>(responses, new Mock<Response>().Object);
+                });
+
+            // Instances table delete succeeds
+            instancesTableClient
+                .Setup(t => t.DeleteEntityAsync(
+                    It.IsAny<string>(), It.IsAny<string>(), It.IsAny<ETag>(), It.IsAny<CancellationToken>()))
+                .ReturnsAsync(new Mock<Response>().Object);
+
+            // Act
+            PurgeHistoryResult result = await trackingStore.PurgeInstanceHistoryAsync(
+                createdTimeFrom: DateTime.UtcNow.AddHours(-3),
+                createdTimeTo: DateTime.UtcNow,
+                runtimeStatus: new[] { OrchestrationStatus.Completed });
+
+            // Assert
+            Assert.AreEqual(2, result.InstancesDeleted);
+            Assert.AreEqual(10, result.RowsDeleted); // 5 history rows per instance × 2 instances
+        }
+
+        /// <summary>
+        /// Verifies that the purge pipeline limits concurrent instance purges
+        /// to a bounded number (MaxPurgeInstanceConcurrency = 100).
+        /// Uses 150 instances (more than the limit) so that the throttle is actually exercised.
+        /// </summary>
+        [TestMethod]
+        public async Task DeleteHistoryAsync_RespectsMaxConcurrency()
+        {
+            // Arrange
+            int concurrentCount = 0;
+            int maxObservedConcurrency = 0;
+
+            var (trackingStore, instancesTableClient, historyTableClient) = CreateTrackingStoreWithMockedTables();
+
+            // 150 instances to delete: more than the concurrency limit, so the assertion below can actually fail
+            var instances = Enumerable.Range(0, 150).Select(i =>
+                new OrchestrationInstanceStatus
+                {
+                    PartitionKey = $"instance{i}",
+                    RowKey = "",
+                    RuntimeStatus = "Completed",
+                    CreatedTime = DateTime.UtcNow.AddHours(-1),
+                }).ToList();
+
+            instancesTableClient
+                .Setup(t => t.QueryAsync<OrchestrationInstanceStatus>(
+                    It.IsAny<string>(), It.IsAny<int?>(), It.IsAny<IEnumerable<string>>(), It.IsAny<CancellationToken>()))
+                .Returns(AsyncPageable<OrchestrationInstanceStatus>.FromPages(new[]
+                {
+                    Page<OrchestrationInstanceStatus>.FromValues(instances, null, new Mock<Response>().Object)
+                }));
+
+            // History: empty for each instance (no rows to delete)
+            historyTableClient
+                .Setup(t => t.QueryAsync<TableEntity>(
+                    It.IsAny<string>(), It.IsAny<int?>(), It.IsAny<IEnumerable<string>>(), It.IsAny<CancellationToken>()))
+                .Returns(AsyncPageable<TableEntity>.FromPages(new[]
+                {
+                    Page<TableEntity>.FromValues(new List<TableEntity>(), null, new Mock<Response>().Object)
+                }));
+
+            // Instances delete: track concurrency
+            instancesTableClient
+                .Setup(t => t.DeleteEntityAsync(
+                    It.IsAny<string>(), It.IsAny<string>(), It.IsAny<ETag>(), It.IsAny<CancellationToken>()))
+                .Returns(async (string pk, string rk, ETag etag, CancellationToken ct) =>
+                {
+                    int current = Interlocked.Increment(ref concurrentCount);
+                    int snapshot;
+                    do
+                    {
+                        snapshot = Volatile.Read(ref maxObservedConcurrency);
+                    }
+                    while (current > snapshot && Interlocked.CompareExchange(ref maxObservedConcurrency, current, snapshot) != snapshot);
+
+                    await Task.Delay(30); // Simulate latency
+                    Interlocked.Decrement(ref concurrentCount);
+                    return new Mock<Response>().Object;
+                });
+
+            // Act
+            PurgeHistoryResult result = await trackingStore.PurgeInstanceHistoryAsync(
+                createdTimeFrom: DateTime.UtcNow.AddHours(-3),
+                createdTimeTo: DateTime.UtcNow,
+                runtimeStatus: new[] { OrchestrationStatus.Completed });
+
+            // Assert
+            Assert.AreEqual(150, result.InstancesDeleted);
+            // 150 instances were queued, yet no more than MaxPurgeInstanceConcurrency (100) deletes may overlap
+            Assert.IsTrue(
+                maxObservedConcurrency <= 100,
+                $"Max observed concurrency ({maxObservedConcurrency}) should not exceed MaxPurgeInstanceConcurrency (100)");
+        }
+
+        /// <summary>
+        /// Verifies that single-instance purge removes the TimestampProperty from the projection columns.
+        /// </summary>
+        [TestMethod]
+        public async Task PurgeInstanceHistory_ProjectsOnlyPKAndRK()
+        {
+            // Arrange
+            var (trackingStore, instancesTableClient, historyTableClient) = CreateTrackingStoreWithMockedTables();
+
+            // Setup instances table to return one instance
+            var instance = new OrchestrationInstanceStatus
+            {
+                PartitionKey = "testInstance",
+                RowKey = "",
+                RuntimeStatus = "Completed",
+            };
+
+            instancesTableClient
+                .Setup(t => t.QueryAsync<OrchestrationInstanceStatus>(
+                    It.IsAny<string>(), It.IsAny<int?>(), It.IsAny<IEnumerable<string>>(), It.IsAny<CancellationToken>()))
+                .Returns(AsyncPageable<OrchestrationInstanceStatus>.FromPages(new[]
+                {
+                    Page<OrchestrationInstanceStatus>.FromValues(
+                        new List<OrchestrationInstanceStatus> { instance }, null, new Mock<Response>().Object)
+                }));
+
+            // Capture the actual select columns passed to the history query
+            IEnumerable<string>? capturedSelect = null;
+            historyTableClient
+                .Setup(t => t.QueryAsync<TableEntity>(
+                    It.IsAny<string>(), It.IsAny<int?>(), It.IsAny<IEnumerable<string>>(), It.IsAny<CancellationToken>()))
+                .Callback((string filter, int? maxPerPage, IEnumerable<string>? select, CancellationToken ct) =>
+                {
+                    capturedSelect = select;
+                })
+                .Returns(AsyncPageable<TableEntity>.FromPages(new[]
+                {
+                    Page<TableEntity>.FromValues(new List<TableEntity>(), null, new Mock<Response>().Object)
+                }));
+
+            instancesTableClient
+                .Setup(t => t.DeleteEntityAsync(
+                    It.IsAny<string>(), It.IsAny<string>(), It.IsAny<ETag>(), It.IsAny<CancellationToken>()))
+                .ReturnsAsync(new Mock<Response>().Object);
+
+            // Act
+            await trackingStore.PurgeInstanceHistoryAsync("testInstance");
+
+            // Assert: Projection should NOT include Timestamp
+            Assert.IsNotNull(capturedSelect, "Select projection was not provided");
+            var selectList = capturedSelect!.ToList();
+            Assert.IsTrue(selectList.Contains("PartitionKey"), "Should project PartitionKey");
+            Assert.IsTrue(selectList.Contains("RowKey"), "Should project RowKey");
+            Assert.IsFalse(selectList.Contains("Timestamp"), "Should NOT project Timestamp");
+        }
+
+        /// <summary>
+        /// Verifies that a single-instance purge deletes the instance row from the instances table.
+        /// </summary>
+        [TestMethod]
+        public async Task PurgeInstanceHistory_ByInstanceId_DeletesInstance()
+        {
+            // Arrange
+            var (trackingStore, instancesTableClient, historyTableClient) = CreateTrackingStoreWithMockedTables();
+
+            var instance = new OrchestrationInstanceStatus
+            {
+                PartitionKey = "myInstance",
+                RowKey = "",
+                RuntimeStatus = "Completed",
+            };
+
+            instancesTableClient
+                .Setup(t => t.QueryAsync<OrchestrationInstanceStatus>(
+                    It.IsAny<string>(), It.IsAny<int?>(), It.IsAny<IEnumerable<string>>(), It.IsAny<CancellationToken>()))
+                .Returns(AsyncPageable<OrchestrationInstanceStatus>.FromPages(new[]
+                {
+                    Page<OrchestrationInstanceStatus>.FromValues(
+                        new List<OrchestrationInstanceStatus> { instance }, null, new Mock<Response>().Object)
+                }));
+
+            historyTableClient
+                .Setup(t => t.QueryAsync<TableEntity>(
+                    It.IsAny<string>(), It.IsAny<int?>(), It.IsAny<IEnumerable<string>>(), It.IsAny<CancellationToken>()))
+                .Returns(AsyncPageable<TableEntity>.FromPages(new[]
+                {
+                    Page<TableEntity>.FromValues(new List<TableEntity>(), null, new Mock<Response>().Object)
+                }));
+
+            instancesTableClient
+                .Setup(t => t.DeleteEntityAsync(
+                    It.IsAny<string>(), It.IsAny<string>(), It.IsAny<ETag>(), It.IsAny<CancellationToken>()))
+                .ReturnsAsync(new Mock<Response>().Object);
+
+            // Act
+            PurgeHistoryResult result = await trackingStore.PurgeInstanceHistoryAsync("myInstance");
+
+            // Assert
+            Assert.AreEqual(1, result.InstancesDeleted);
+            instancesTableClient.Verify(
+                t => t.DeleteEntityAsync("myInstance", string.Empty, ETag.All, It.IsAny<CancellationToken>()),
+                Times.Once);
+        }
+
+        /// <summary>
+        /// Verifies that purging a non-existent instance returns zero.
+        /// </summary>
+        [TestMethod]
+        public async Task PurgeInstanceHistory_InstanceNotFound_ReturnsZero()
+        {
+            // Arrange
+            var (trackingStore, instancesTableClient, _) = CreateTrackingStoreWithMockedTables();
+
+            instancesTableClient
+                .Setup(t => t.QueryAsync<OrchestrationInstanceStatus>(
+                    It.IsAny<string>(), It.IsAny<int?>(), It.IsAny<IEnumerable<string>>(), It.IsAny<CancellationToken>()))
+                .Returns(AsyncPageable<OrchestrationInstanceStatus>.FromPages(new[]
+                {
+                    Page<OrchestrationInstanceStatus>.FromValues(
+                        new List<OrchestrationInstanceStatus>(), null, new Mock<Response>().Object)
+                }));
+
+            // Act
+            PurgeHistoryResult result = await trackingStore.PurgeInstanceHistoryAsync("nonexistent");
+
+            // Assert
+            Assert.AreEqual(0, result.InstancesDeleted);
+            Assert.AreEqual(0, result.RowsDeleted);
+        }
+
+        /// <summary>
+        /// Verifies that a large history (250 rows) is purged in batches of at most 100 entities (3 transactions).
+        /// </summary>
+        [TestMethod]
+        public async Task PurgeInstanceHistory_WithManyHistoryRows_UsesParallelBatchDelete()
+        {
+            // Arrange
+            var (trackingStore, instancesTableClient, historyTableClient) = CreateTrackingStoreWithMockedTables();
+
+            var instance = new OrchestrationInstanceStatus
+            {
+                PartitionKey = "testPK",
+                RowKey = "",
+                RuntimeStatus = "Completed",
+            };
+
+            instancesTableClient
+                .Setup(t => t.QueryAsync<OrchestrationInstanceStatus>(
+                    It.IsAny<string>(), It.IsAny<int?>(), It.IsAny<IEnumerable<string>>(), It.IsAny<CancellationToken>()))
+                .Returns(AsyncPageable<OrchestrationInstanceStatus>.FromPages(new[]
+                {
+                    Page<OrchestrationInstanceStatus>.FromValues(
+                        new List<OrchestrationInstanceStatus> { instance }, null, new Mock<Response>().Object)
+                }));
+
+            // Return 250 history entities (which should be split into 3 batches: 100+100+50)
+            var historyEntities = Enumerable.Range(0, 250)
+                .Select(i => new TableEntity("testPK", $"rk_{i:D5}") { ETag = ETag.All })
+                .ToList();
+
+            historyTableClient
+                .Setup(t => t.QueryAsync<TableEntity>(
+                    It.IsAny<string>(), It.IsAny<int?>(), It.IsAny<IEnumerable<string>>(), It.IsAny<CancellationToken>()))
+                .Returns(AsyncPageable<TableEntity>.FromPages(new[]
+                {
+                    Page<TableEntity>.FromValues(historyEntities, null, new Mock<Response>().Object)
+                }));
+
+            // Track batch deletes
+            int batchDeleteCallCount = 0;
+            historyTableClient
+                .Setup(t => t.SubmitTransactionAsync(
+                    It.IsAny<IEnumerable<TableTransactionAction>>(),
+                    It.IsAny<CancellationToken>()))
+                .ReturnsAsync((IEnumerable<TableTransactionAction> actions, CancellationToken _) =>
+                {
+                    Interlocked.Increment(ref batchDeleteCallCount);
+                    int count = actions.Count();
+                    var responses = Enumerable.Range(0, count).Select(_ => new Mock<Response>().Object).ToList();
+                    return Response.FromValue<IReadOnlyList<Response>>(responses, new Mock<Response>().Object);
+                });
+
+            instancesTableClient
+                .Setup(t => t.DeleteEntityAsync(
+                    It.IsAny<string>(), It.IsAny<string>(), It.IsAny<ETag>(), It.IsAny<CancellationToken>()))
+                .ReturnsAsync(new Mock<Response>().Object);
+
+            // Act
+            PurgeHistoryResult result = await trackingStore.PurgeInstanceHistoryAsync("testPK");
+
+            // Assert
+            Assert.AreEqual(1, result.InstancesDeleted);
+            Assert.AreEqual(250, result.RowsDeleted);
+            // Should have made 3 batch delete calls (100 + 100 + 50)
+            Assert.AreEqual(3, batchDeleteCallCount, "Expected 3 batch transactions for 250 entities");
+        }
+
+        #region Helper Methods
+
+        static (AzureTableTrackingStore trackingStore, Mock<TableClient> instancesTableClient, Mock<TableClient> historyTableClient)
+            CreateTrackingStoreWithMockedTables(Action<AzureStorageOrchestrationServiceSettings>? modifySettings = null)
+        {
+            var settings = new AzureStorageOrchestrationServiceSettings
+            {
+                StorageAccountClientProvider = new StorageAccountClientProvider(ConnectionString),
+                FetchLargeMessageDataEnabled = false,
+            };
+
+            modifySettings?.Invoke(settings);
+
+            var azureStorageClient = new AzureStorageClient(settings);
+
+            // Create mocked instances table
+            var instancesServiceClient = new Mock<TableServiceClient>(MockBehavior.Strict, ConnectionString);
+            var instancesTableClient = new Mock<TableClient>(MockBehavior.Loose, ConnectionString, InstancesTableName);
+            instancesTableClient.Setup(t => t.Name).Returns(InstancesTableName);
+            instancesServiceClient.Setup(t => t.GetTableClient(InstancesTableName)).Returns(instancesTableClient.Object);
+            var instancesTable = new Table(azureStorageClient, instancesServiceClient.Object, InstancesTableName);
+
+            // Create mocked history table
+            var historyServiceClient = new Mock<TableServiceClient>(MockBehavior.Strict, ConnectionString);
+            var historyTableClient = new Mock<TableClient>(MockBehavior.Loose, ConnectionString, HistoryTableName);
+            historyTableClient.Setup(t => t.Name).Returns(HistoryTableName);
+            historyServiceClient.Setup(t => t.GetTableClient(HistoryTableName)).Returns(historyTableClient.Object);
+            var historyTable = new Table(azureStorageClient, historyServiceClient.Object, HistoryTableName);
+
+            // Create mock message manager that returns 1 storage operation (no blobs to delete)
+            var messageManager = new Mock<MessageManager>(
+                settings, azureStorageClient, "test-largemessages") { CallBase = false };
+            messageManager
+                .Setup(m => m.DeleteLargeMessageBlobs(It.IsAny<string>(), It.IsAny<CancellationToken>()))
+                .ReturnsAsync(1);
+
+            // Create tracking store using the internal test constructor
+            var trackingStore = new AzureTableTrackingStore(
+                azureStorageClient,
+                historyTable,
+                instancesTable,
+                messageManager.Object);
+
+            return (trackingStore, instancesTableClient, historyTableClient);
+        }
+
+        #endregion
+    }
+}
\ No newline at end of file
diff --git 
a/Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs b/Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs
new file mode 100644
index 000000000..6c49679ca
--- /dev/null
+++ b/Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs
@@ -0,0 +1,372 @@
+// ----------------------------------------------------------------------------------
+// Copyright Microsoft Corporation
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ----------------------------------------------------------------------------------
+#nullable enable
+namespace DurableTask.AzureStorage.Tests.Storage
+{
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Azure;
+    using Azure.Data.Tables;
+    using DurableTask.AzureStorage.Storage;
+    using Microsoft.VisualStudio.TestTools.UnitTesting;
+    using Moq;
+
+    [TestClass]
+    public class TableDeleteBatchParallelTests
+    {
+        const string ConnectionString = "UseDevelopmentStorage=true";
+        const string TableName = "TestTable";
+
+        [TestMethod]
+        public async Task DeleteBatchParallelAsync_EmptyBatch_ReturnsEmptyResults()
+        {
+            // Arrange
+            Table table = CreateTableWithMockedClient(out _, out _);
+            var entities = new List<TableEntity>();
+
+            // Act
+            TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4);
+
+            // Assert
+            Assert.AreEqual(0, results.Responses.Count);
+            Assert.AreEqual(0, results.RequestCount);
+        }
+
+        [TestMethod]
+        public async Task DeleteBatchParallelAsync_SingleBatch_SubmitsOneTransaction()
+        {
+            // Arrange
+            var entities = CreateTestEntities("pk", count: 50);
+
+            Table table = CreateTableWithMockedClient(out _, out Mock<TableClient> tableClient);
+
+            var mockResponses = CreateMockBatchResponse(50);
+            tableClient
+                .Setup(t => t.SubmitTransactionAsync(
+                    It.Is<IEnumerable<TableTransactionAction>>(a => a.Count() == 50),
+                    It.IsAny<CancellationToken>()))
+                .ReturnsAsync(mockResponses);
+
+            // Act
+            TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4);
+
+            // Assert
+            Assert.AreEqual(50, results.Responses.Count);
+            tableClient.Verify(
+                t => t.SubmitTransactionAsync(It.IsAny<IEnumerable<TableTransactionAction>>(), It.IsAny<CancellationToken>()),
+                Times.Once);
+        }
+
+        [TestMethod]
+        public async Task DeleteBatchParallelAsync_MultipleBatches_SplitsIntoChunksOf100()
+        {
+            // Arrange
+            var entities = CreateTestEntities("pk", count: 250);
+
+            Table table = CreateTableWithMockedClient(out _, out Mock<TableClient> tableClient);
+
+            tableClient
+                .Setup(t => t.SubmitTransactionAsync(
+                    It.IsAny<IEnumerable<TableTransactionAction>>(),
+                    It.IsAny<CancellationToken>()))
+                .ReturnsAsync((IEnumerable<TableTransactionAction> batch, CancellationToken _) =>
+                    CreateMockBatchResponse(batch.Count()));
+
+            // Act
+            TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 10);
+
+            // Assert: 250 entities = 3 batches (100 + 100 + 50)
+            Assert.AreEqual(250, results.Responses.Count);
+            tableClient.Verify(
+                t => t.SubmitTransactionAsync(It.IsAny<IEnumerable<TableTransactionAction>>(), It.IsAny<CancellationToken>()),
+                Times.Exactly(3));
+        }
+
+        [TestMethod]
+        public async Task DeleteBatchParallelAsync_RespectsMaxParallelism()
+        {
+            // Arrange
+            int maxParallelism = 2;
+            var entities = CreateTestEntities("pk", count: 500); // 5 batches of 100
+            int concurrentCount = 0;
+            int maxConcurrent = 0;
+
+            Table table = CreateTableWithMockedClient(out _, out Mock<TableClient> tableClient);
+
+            tableClient
+                .Setup(t => t.SubmitTransactionAsync(
+                    It.IsAny<IEnumerable<TableTransactionAction>>(),
+                    It.IsAny<CancellationToken>()))
+                .Returns(async (IEnumerable<TableTransactionAction> batch, CancellationToken _) =>
+                {
+                    int current = Interlocked.Increment(ref concurrentCount);
+                    int snapshot;
+                    do
+                    {
+                        snapshot = Volatile.Read(ref maxConcurrent);
+                    }
+                    while (current > snapshot && Interlocked.CompareExchange(ref maxConcurrent, current, snapshot) != snapshot);
+
+                    await Task.Delay(50); // Simulate some latency
+                    Interlocked.Decrement(ref concurrentCount);
+
+                    return CreateMockBatchResponse(batch.Count());
+                });
+
+            // Act
+            await table.DeleteBatchParallelAsync(entities, maxParallelism: maxParallelism);
+
+            // Assert
+            Assert.IsTrue(
+                maxConcurrent <= maxParallelism,
+                $"Max concurrent batches ({maxConcurrent}) exceeded maxParallelism ({maxParallelism})");
+        }
+
+        [TestMethod]
+        public async Task DeleteBatchParallelAsync_BatchFails404_FallsBackToIndividualDeletes()
+        {
+            // Arrange
+            var entities = CreateTestEntities("pk", count: 3);
+
+            Table table = CreateTableWithMockedClient(out _, out Mock<TableClient> tableClient);
+
+            // First batch call fails with 404 (e.g., one entity already deleted)
+            tableClient
+                .Setup(t => t.SubmitTransactionAsync(
+                    It.IsAny<IEnumerable<TableTransactionAction>>(),
+                    It.IsAny<CancellationToken>()))
+                .ThrowsAsync(new RequestFailedException(404, "Entity not found"));
+
+            // Individual deletes succeed
+            var mockResponse = new Mock<Response>();
+            tableClient
+                .Setup(t => t.DeleteEntityAsync(
+                    It.IsAny<string>(),
+                    It.IsAny<string>(),
+                    It.IsAny<ETag>(),
+                    It.IsAny<CancellationToken>()))
+                .ReturnsAsync(mockResponse.Object);
+
+            // Act
+            TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4);
+
+            // Assert: should fall back to individual deletes
+            Assert.AreEqual(3, results.Responses.Count);
+            tableClient.Verify(
+                t => t.DeleteEntityAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<ETag>(), It.IsAny<CancellationToken>()),
+                Times.Exactly(3));
+        }
+
+        [TestMethod]
+        public async Task DeleteBatchParallelAsync_IndividualDeleteSkips404()
+        {
+            // Arrange
+            var entities = CreateTestEntities("pk", count: 3);
+
+            Table table = CreateTableWithMockedClient(out _, out Mock<TableClient> tableClient);
+
+            // Batch fails with 404
+            tableClient
+                .Setup(t => t.SubmitTransactionAsync(
+                    It.IsAny<IEnumerable<TableTransactionAction>>(),
+                    It.IsAny<CancellationToken>()))
+                .ThrowsAsync(new RequestFailedException(404, "Entity not found"));
+
+            // Individual delete: first succeeds, second returns 404, third succeeds
+            int callCount = 0;
+            var mockResponse = new Mock<Response>();
+            tableClient
+                .Setup(t => t.DeleteEntityAsync(
+                    It.IsAny<string>(),
+                    It.IsAny<string>(),
+                    It.IsAny<ETag>(),
+                    It.IsAny<CancellationToken>()))
+                .Returns((string pk, string rk, ETag ifMatch, CancellationToken ct) =>
+                {
+                    int call = Interlocked.Increment(ref callCount);
+                    if (call == 2)
+                    {
+                        throw new RequestFailedException(404, "Entity already deleted");
+                    }
+                    return Task.FromResult(mockResponse.Object);
+                });
+
+            // Act
+            TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4);
+
+            // Assert: only 2 responses (the 404 was skipped)
+            Assert.AreEqual(2, results.Responses.Count);
+            Assert.AreEqual(3, results.RequestCount); // Still counted 3 requests
+        }
+
+        [TestMethod]
+        public async Task DeleteBatchParallelAsync_ExactlyOneBatch_NoBoundaryIssues()
+        {
+            // Arrange: exactly 100 entities = 1 batch
+            var entities = CreateTestEntities("pk", count: 100);
+
+            Table table = CreateTableWithMockedClient(out _, out Mock<TableClient> tableClient);
+
+            tableClient
+                .Setup(t => t.SubmitTransactionAsync(
+                    It.Is<IEnumerable<TableTransactionAction>>(a => a.Count() == 100),
+                    It.IsAny<CancellationToken>()))
+                .ReturnsAsync(CreateMockBatchResponse(100));
+
+            // Act
+            TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4);
+
+            // Assert
+            Assert.AreEqual(100, results.Responses.Count);
+            tableClient.Verify(
+                t => t.SubmitTransactionAsync(It.IsAny<IEnumerable<TableTransactionAction>>(), It.IsAny<CancellationToken>()),
+                Times.Once);
+        }
+
+        [TestMethod]
+        public async Task DeleteBatchParallelAsync_101Entities_CreatesTwoBatches()
+        {
+            // Arrange: 101 entities = 2 batches (100 + 1)
+            var entities = CreateTestEntities("pk", count: 101);
+
+            Table table = CreateTableWithMockedClient(out _, out Mock<TableClient> tableClient);
+
+            tableClient
+                .Setup(t => t.SubmitTransactionAsync(
+                    It.IsAny<IEnumerable<TableTransactionAction>>(),
+                    It.IsAny<CancellationToken>()))
+                .ReturnsAsync((IEnumerable<TableTransactionAction> batch, CancellationToken _) =>
+                    CreateMockBatchResponse(batch.Count()));
+
+            // Act
+            TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4);
+
+            // Assert
+            Assert.AreEqual(101, results.Responses.Count);
+            tableClient.Verify(
+                t => t.SubmitTransactionAsync(It.IsAny<IEnumerable<TableTransactionAction>>(), It.IsAny<CancellationToken>()),
+                Times.Exactly(2));
+        }
+
+        [TestMethod]
+        public async Task DeleteBatchParallelAsync_CancellationToken_IsPropagated()
+        {
+            // Arrange
+            var entities = CreateTestEntities("pk", count: 200);
+            using var cts = new CancellationTokenSource();
+
+            Table table = CreateTableWithMockedClient(out _, out Mock<TableClient> tableClient);
+
+            int batchesSubmitted = 0;
+            tableClient
+                .Setup(t => t.SubmitTransactionAsync(
+                    It.IsAny<IEnumerable<TableTransactionAction>>(),
+                    It.IsAny<CancellationToken>()))
+                .Returns(async (IEnumerable<TableTransactionAction> batch, CancellationToken ct) =>
+                {
+                    int count = Interlocked.Increment(ref batchesSubmitted);
+                    if (count == 1)
+                    {
+                        // Cancel after first batch starts
+                        cts.Cancel();
+                    }
+                    ct.ThrowIfCancellationRequested();
+                    return CreateMockBatchResponse(batch.Count());
+                });
+
+            // Act & Assert
+            await Assert.ThrowsExceptionAsync<OperationCanceledException>(
+                () => table.DeleteBatchParallelAsync(entities, maxParallelism: 1, cts.Token));
+        }
+
+        [TestMethod]
+        public async Task DeleteBatchParallelAsync_MaxParallelismOne_ExecutesSequentially()
+        {
+            // Arrange
+            var entities = CreateTestEntities("pk", count: 300); // 3 batches
+            var observedInFlight = new ConcurrentBag<int>();
+            int inFlight = 0;
+
+            Table table = CreateTableWithMockedClient(out _, out Mock<TableClient> tableClient);
+
+            tableClient
+                .Setup(t => t.SubmitTransactionAsync(
+                    It.IsAny<IEnumerable<TableTransactionAction>>(),
+                    It.IsAny<CancellationToken>()))
+                .Returns(async (IEnumerable<TableTransactionAction> batch, CancellationToken _) =>
+                {
+                    observedInFlight.Add(Interlocked.Increment(ref inFlight));
+                    await Task.Delay(10);
+                    Interlocked.Decrement(ref inFlight);
+                    return CreateMockBatchResponse(batch.Count());
+                });
+
+            // Act
+            TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 1);
+
+            // Assert
+            Assert.AreEqual(300, results.Responses.Count);
+            CollectionAssert.AreEqual(new[] { 1, 1, 1 }, observedInFlight.ToArray(), "Batches overlapped despite maxParallelism: 1");
+        }
+
+        #region Helper Methods
+
+        static Table CreateTableWithMockedClient(
+            out Mock<TableServiceClient> tableServiceClient,
+            out Mock<TableClient> tableClient)
+        {
+            var settings = new AzureStorageOrchestrationServiceSettings
+            {
+                StorageAccountClientProvider = new StorageAccountClientProvider(ConnectionString),
+            };
+
+            var azureStorageClient = new AzureStorageClient(settings);
+
+            tableServiceClient = new Mock<TableServiceClient>(MockBehavior.Strict, ConnectionString);
+            tableClient = new Mock<TableClient>(MockBehavior.Loose, ConnectionString, TableName);
+            tableClient.Setup(t => t.Name).Returns(TableName);
+            tableServiceClient.Setup(t => t.GetTableClient(TableName)).Returns(tableClient.Object);
+
+            return new Table(azureStorageClient, tableServiceClient.Object, TableName);
+        }
+
+        static List<TableEntity> CreateTestEntities(string partitionKey, int count)
+        {
+            var entities = new List<TableEntity>(count);
+            for (int i = 0; i < count; i++)
+            {
+                entities.Add(new TableEntity(partitionKey, $"rk_{i:D5}")
+                {
+                    ETag = ETag.All,
+                });
+            }
+            return entities;
+        }
+
+        static Response<IReadOnlyList<Response>> CreateMockBatchResponse(int count)
+        {
+            var responses = new List<Response>();
+            for (int i = 0; i < count; i++)
+            {
+                responses.Add(new Mock<Response>().Object);
+            }
+            return Response.FromValue<IReadOnlyList<Response>>(responses, new Mock<Response>().Object);
+        }
+
+        #endregion
+    }
+}
diff --git a/src/DurableTask.AzureStorage/MessageManager.cs b/src/DurableTask.AzureStorage/MessageManager.cs
index ff48d507f..6e122c556 100644
--- a/src/DurableTask.AzureStorage/MessageManager.cs
+++ b/src/DurableTask.AzureStorage/MessageManager.cs
@@ -316,10 +316,11 @@ public string GetNewLargeMessageBlobName(MessageData message)
             return $"{instanceId}/message-{activityId}-{eventType}.json.gz";
         }
 
-        public async Task<int> DeleteLargeMessageBlobs(string sanitizedInstanceId, CancellationToken cancellationToken = default)
+        public virtual async Task<int> DeleteLargeMessageBlobs(string sanitizedInstanceId, CancellationToken cancellationToken = default)
         {
-            int 
storageOperationCount = 1;
-            if (await this.blobContainer.ExistsAsync(cancellationToken))
+            int storageOperationCount = 0;
+
+            try
             {
                 await foreach (Page<Blob> page in this.blobContainer.ListBlobsAsync(sanitizedInstanceId, cancellationToken).AsPages())
                 {
@@ -329,6 +330,22 @@ public async Task<int> DeleteLargeMessageBlobs(string sanitizedInstanceId, Cance
 
                     storageOperationCount += page.Values.Count;
                 }
+
+                // Count the list operation even if no blobs found (the initial list request still happened)
+                if (storageOperationCount == 0)
+                {
+                    storageOperationCount = 1;
+                }
+            }
+            catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == 404)
+            {
+                // Container does not exist; nothing to delete.
+                storageOperationCount = 1;
             }
+            catch (Azure.RequestFailedException ex) when (ex.Status == 404)
+            {
+                // Container does not exist; nothing to delete.
+                storageOperationCount = 1;
+            }
 
             return storageOperationCount;
diff --git a/src/DurableTask.AzureStorage/Storage/Table.cs b/src/DurableTask.AzureStorage/Storage/Table.cs
index 7d726e1d7..4c41c0a85 100644
--- a/src/DurableTask.AzureStorage/Storage/Table.cs
+++ b/src/DurableTask.AzureStorage/Storage/Table.cs
@@ -117,6 +117,126 @@ public async Task<TableTransactionResults> DeleteBatchAsync<T>(IEnumerable<T> en
             return await this.ExecuteBatchAsync(entityBatch, item => new TableTransactionAction(TableTransactionActionType.Delete, item), cancellationToken: cancellationToken);
         }
 
+        /// <summary>
+        /// Deletes entities in parallel batches of up to 100. Each batch is an atomic transaction,
+        /// but multiple batches are submitted concurrently (at most maxParallelism, minimum 1, at a time).
+        /// If a batch fails because an entity was already deleted (404/EntityNotFound),
+        /// it falls back to individual deletes for that batch, skipping already-deleted entities.
+        /// </summary>
+        public async Task<TableTransactionResults> DeleteBatchParallelAsync<T>(
+            IReadOnlyList<T> entityBatch,
+            int maxParallelism,
+            CancellationToken cancellationToken = default) where T : ITableEntity
+        {
+            if (entityBatch.Count == 0)
+            {
+                return new TableTransactionResults(Array.Empty<Response>(), TimeSpan.Zero, 0);
+            }
+
+            const int batchSize = 100;
+            int chunkCount = (entityBatch.Count + batchSize - 1) / batchSize;
+            var chunks = new List<List<TableTransactionAction>>(chunkCount);
+
+            var currentChunk = new List<TableTransactionAction>(batchSize);
+            foreach (T entity in entityBatch)
+            {
+                currentChunk.Add(new TableTransactionAction(TableTransactionActionType.Delete, entity));
+                if (currentChunk.Count == batchSize)
+                {
+                    chunks.Add(currentChunk);
+                    currentChunk = new List<TableTransactionAction>(batchSize);
+                }
+            }
+
+            if (currentChunk.Count > 0)
+            {
+                chunks.Add(currentChunk);
+            }
+
+            var resultsBuilder = new TableTransactionResultsBuilder();
+            using var semaphore = new SemaphoreSlim(Math.Max(1, maxParallelism));
+
+            var tasks = chunks.Select(async chunk =>
+            {
+                await semaphore.WaitAsync(cancellationToken);
+                try
+                {
+                    return await this.ExecuteBatchWithFallbackAsync(chunk, cancellationToken);
+                }
+                finally
+                {
+                    semaphore.Release();
+                }
+            });
+
+            TableTransactionResults[] allResults = await Task.WhenAll(tasks);
+            foreach (TableTransactionResults result in allResults)
+            {
+                resultsBuilder.Add(result);
+            }
+
+            return resultsBuilder.ToResults();
+        }
+
+        /// <summary>
+        /// Executes a batch transaction. If it fails due to an entity not found (404),
+        /// falls back to individual delete operations, skipping entities that are already gone.
+        /// </summary>
+        async Task<TableTransactionResults> ExecuteBatchWithFallbackAsync(
+            List<TableTransactionAction> batch,
+            CancellationToken cancellationToken)
+        {
+            try
+            {
+                return await this.ExecuteBatchAsync(batch, cancellationToken);
+            }
+            catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == 404)
+            {
+                // One or more entities in the batch were already deleted.
+                // Fall back to individual deletes, skipping 404s.
+                return await this.DeleteEntitiesIndividuallyAsync(batch, cancellationToken);
+            }
+            catch (RequestFailedException ex) when (ex.Status == 404)
+            {
+                return await this.DeleteEntitiesIndividuallyAsync(batch, cancellationToken);
+            }
+        }
+
+        async Task<TableTransactionResults> DeleteEntitiesIndividuallyAsync(
+            List<TableTransactionAction> batch,
+            CancellationToken cancellationToken)
+        {
+            var responses = new List<Response>();
+            var stopwatch = Stopwatch.StartNew();
+            int requestCount = 0;
+
+            foreach (TableTransactionAction action in batch)
+            {
+                requestCount++;
+                try
+                {
+                    Response response = await this.tableClient.DeleteEntityAsync(
+                        action.Entity.PartitionKey,
+                        action.Entity.RowKey,
+                        ETag.All,
+                        cancellationToken).DecorateFailure();
+                    responses.Add(response);
+                    this.stats.TableEntitiesWritten.Increment();
+                }
+                catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == 404)
+                {
+                    // Entity already deleted; the request is still counted but no response is recorded.
+                }
+                catch (RequestFailedException ex) when (ex.Status == 404)
+                {
+                    // Entity already deleted; the request is still counted but no response is recorded.
+                }
+            }
+
+            stopwatch.Stop();
+            return new TableTransactionResults(responses, stopwatch.Elapsed, requestCount);
+        }
+
         public async Task<TableTransactionResults> InsertOrMergeBatchAsync<T>(IEnumerable<T> entityBatch, CancellationToken cancellationToken = default) where T : ITableEntity
         {
             TableTransactionResults results = await this.ExecuteBatchAsync(entityBatch, item => new TableTransactionAction(TableTransactionActionType.UpsertMerge, item), cancellationToken: cancellationToken);
diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
index f1e67a0bf..c82605f65 100644
--- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
+++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
@@ -118,6 +118,25 @@ internal AzureTableTrackingStore(
             };
         }
 
+        // For testing purge functionality where HistoryTable and MessageManager are needed
+        internal AzureTableTrackingStore(
+            AzureStorageClient azureStorageClient,
+            Table historyTable,
+            Table instancesTable,
+            MessageManager messageManager)
+        {
+            this.azureStorageClient = azureStorageClient;
+            this.settings = azureStorageClient.Settings;
+            this.stats = azureStorageClient.Stats;
+            this.storageAccountName = azureStorageClient.TableAccountName;
+            this.taskHubName = this.settings.TaskHubName;
+            this.HistoryTable = historyTable;
+            this.InstancesTable = instancesTable;
+            this.messageManager = messageManager;
+
+            this.settings.FetchLargeMessageDataEnabled = false;
+        }
+
         internal Table HistoryTable { get; }
 
         internal Table InstancesTable { get; }
@@ -568,25 +587,42 @@ async Task<PurgeHistoryResult> DeleteHistoryAsync(
 
             ODataCondition odata = condition.ToOData();
 
-            // Limit to batches of 100 to avoid excessive memory usage and table storage scanning
             int storageRequests = 0;
             int instancesDeleted = 0;
             int rowsDeleted = 0;
 
-            var options = new ParallelOptions { MaxDegreeOfParallelism = this.settings.MaxStorageOperationConcurrency };
+            // Limit concurrent instance purges to avoid overwhelming storage with too many parallel operations.
+            // Each instance purge internally spawns multiple parallel storage operations, so this should be
+            // kept moderate. Using 100 to match the original implicit concurrency from pageSizeHint.
+            const int MaxPurgeInstanceConcurrency = 100;
+            var throttle = new SemaphoreSlim(MaxPurgeInstanceConcurrency);
+            var pendingTasks = new List<Task>();
+
             AsyncPageable<OrchestrationInstanceStatus> entitiesPageable = this.InstancesTable.ExecuteQueryAsync<OrchestrationInstanceStatus>(odata.Filter, select: odata.Select, cancellationToken: cancellationToken);
-            await foreach (Page<OrchestrationInstanceStatus> page in entitiesPageable.AsPages(pageSizeHint: 100))
+            await foreach (Page<OrchestrationInstanceStatus> page in entitiesPageable.AsPages(pageSizeHint: MaxPurgeInstanceConcurrency))
             {
-                // The underlying client throttles
-                await Task.WhenAll(page.Values.Select(async instance =>
+                foreach (OrchestrationInstanceStatus instance in page.Values)
                 {
-                    PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, cancellationToken);
-                    Interlocked.Add(ref instancesDeleted, statisticsFromDeletion.InstancesDeleted);
-                    Interlocked.Add(ref storageRequests, statisticsFromDeletion.RowsDeleted);
-                    Interlocked.Add(ref rowsDeleted, statisticsFromDeletion.RowsDeleted);
-                }));
+                    await throttle.WaitAsync(cancellationToken);
+                    pendingTasks.Add(Task.Run(async () =>
+                    {
+                        try
+                        {
+                            PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, cancellationToken);
+                            Interlocked.Add(ref instancesDeleted, statisticsFromDeletion.InstancesDeleted);
+                            Interlocked.Add(ref storageRequests, statisticsFromDeletion.StorageRequests);
+                            Interlocked.Add(ref rowsDeleted, statisticsFromDeletion.RowsDeleted);
+                        }
+                        finally
+                        {
+                            throttle.Release();
+                        }
+                    }));
+                }
             }
 
+            await Task.WhenAll(pendingTasks);
+
             return new PurgeHistoryResult(storageRequests, instancesDeleted, rowsDeleted);
         }
 
@@ -601,13 +637,14 @@ async Task<PurgeHistoryResult> DeleteAllDataForOrchestrationInstance(Orchestrati
                 .GetHistoryEntitiesResponseInfoAsync(
                     instanceId: sanitizedInstanceId,
                     expectedExecutionId: null,
-                    projectionColumns: new[] { RowKeyProperty, PartitionKeyProperty, TimestampProperty },
+                    projectionColumns: new[] { RowKeyProperty, PartitionKeyProperty },
                     cancellationToken)
.GetResultsAsync(cancellationToken: cancellationToken); storageRequests += results.RequestCount; IReadOnlyList historyEntities = results.Entities; + int maxParallelism = Math.Max(1, this.settings.MaxStorageOperationConcurrency); var tasks = new List { @@ -618,7 +655,7 @@ async Task DeleteAllDataForOrchestrationInstance(Orchestrati }), Task.Run(async () => { - var deletedEntitiesResponseInfo = await this.HistoryTable.DeleteBatchAsync(historyEntities, cancellationToken); + var deletedEntitiesResponseInfo = await this.HistoryTable.DeleteBatchParallelAsync(historyEntities, maxParallelism, cancellationToken); Interlocked.Add(ref rowsDeleted, deletedEntitiesResponseInfo.Responses.Count); Interlocked.Add(ref storageRequests, deletedEntitiesResponseInfo.RequestCount); }), diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index 502e5fab6..f07a79016 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -560,6 +560,92 @@ private async Task GetBlobCount(string containerName, string directoryName) } + [TestMethod] + public async Task PurgeMultipleInstancesHistoryByTimePeriod_ScalabilityValidation() + { + // This test validates scale improvements: parallel batch delete and pipelined page processing. + // Runs multiple concurrent orchestrations, then purges all of them by time period. 
+ using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + await host.StartAsync(); + DateTime startDateTime = DateTime.Now; + + // Create multiple orchestration instances concurrently + const int instanceCount = 5; + var clients = new List(); + var instanceIds = new List(); + + for (int i = 0; i < instanceCount; i++) + { + string instanceId = $"purge-scale-{Guid.NewGuid():N}"; + instanceIds.Add(instanceId); + TestOrchestrationClient client = await host.StartOrchestrationAsync( + typeof(Orchestrations.Factorial), 10, instanceId); + clients.Add(client); + } + + // Wait for all orchestrations to complete + foreach (var client in clients) + { + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + } + + // Verify all instances have history + foreach (string instanceId in instanceIds) + { + List historyEvents = await clients[0].GetOrchestrationHistoryAsync(instanceId); + Assert.IsTrue(historyEvents.Count > 0, $"Instance {instanceId} should have history events"); + } + + // Purge all instances by time period + await clients[0].PurgeInstanceHistoryByTimePeriod( + startDateTime, + DateTime.UtcNow, + new List { OrchestrationStatus.Completed }); + + // Verify all history is purged + foreach (string instanceId in instanceIds) + { + List historyEvents = await clients[0].GetOrchestrationHistoryAsync(instanceId); + Assert.AreEqual(0, historyEvents.Count, $"Instance {instanceId} should have no history after purge"); + + IList stateList = await clients[0].GetStateAsync(instanceId); + Assert.AreEqual(1, stateList.Count); + Assert.IsNull(stateList[0], $"Instance {instanceId} state should be null after purge"); + } + + await host.StopAsync(); + } + } + + [TestMethod] + public async Task PurgeSingleInstanceWithIdempotency() + { + // This test validates that purging the same instance twice doesn't cause errors + // (testing the 
idempotent batch delete fallback). + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + string instanceId = Guid.NewGuid().ToString(); + await host.StartAsync(); + TestOrchestrationClient client = await host.StartOrchestrationAsync( + typeof(Orchestrations.Factorial), 110, instanceId); + await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + + // First purge should succeed + await client.PurgeInstanceHistory(); + + List historyEvents = await client.GetOrchestrationHistoryAsync(instanceId); + Assert.AreEqual(0, historyEvents.Count); + + // Second purge of the same instance should not throw + // (the instance row is already gone, so PurgeInstanceHistoryAsync returns 0) + await client.PurgeInstanceHistory(); + + await host.StopAsync(); + } + } + [TestMethod] public async Task PurgeInstanceHistoryForTimePeriodDeletePartially() { From 8a886b4dfb2ee98cdbff54b4bf3e8b524d45e1e4 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Fri, 13 Mar 2026 16:05:14 -0700 Subject: [PATCH 2/8] revert timestamp change --- .../PurgeScalabilityTests.cs | 8 ++++---- .../Tracking/AzureTableTrackingStore.cs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Test/DurableTask.AzureStorage.Tests/PurgeScalabilityTests.cs b/Test/DurableTask.AzureStorage.Tests/PurgeScalabilityTests.cs index 065946ece..e6bb3197f 100644 --- a/Test/DurableTask.AzureStorage.Tests/PurgeScalabilityTests.cs +++ b/Test/DurableTask.AzureStorage.Tests/PurgeScalabilityTests.cs @@ -199,10 +199,10 @@ public async Task DeleteHistoryAsync_RespectsMaxConcurrency() } /// - /// Verifies that single-instance purge removes the TimestampProperty from the projection columns. + /// Verifies that single-instance purge projects PK, RK, and Timestamp for history query. 
/// [TestMethod] - public async Task PurgeInstanceHistory_ProjectsOnlyPKAndRK() + public async Task PurgeInstanceHistory_ProjectsPKRKAndTimestamp() { // Arrange var (trackingStore, instancesTableClient, historyTableClient) = CreateTrackingStoreWithMockedTables(); @@ -246,12 +246,12 @@ public async Task PurgeInstanceHistory_ProjectsOnlyPKAndRK() // Act await trackingStore.PurgeInstanceHistoryAsync("testInstance"); - // Assert: Projection should NOT include Timestamp + // Assert: Projection should include PK, RK, and Timestamp Assert.IsNotNull(capturedSelect, "Select projection was not provided"); var selectList = capturedSelect!.ToList(); Assert.IsTrue(selectList.Contains("PartitionKey"), "Should project PartitionKey"); Assert.IsTrue(selectList.Contains("RowKey"), "Should project RowKey"); - Assert.IsFalse(selectList.Contains("Timestamp"), "Should NOT project Timestamp"); + Assert.IsTrue(selectList.Contains("Timestamp"), "Should project Timestamp"); } /// diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index c82605f65..c9a850d6b 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -637,7 +637,7 @@ async Task DeleteAllDataForOrchestrationInstance(Orchestrati .GetHistoryEntitiesResponseInfoAsync( instanceId: sanitizedInstanceId, expectedExecutionId: null, - projectionColumns: new[] { RowKeyProperty, PartitionKeyProperty }, + projectionColumns: new[] { RowKeyProperty, PartitionKeyProperty, TimestampProperty }, cancellationToken) .GetResultsAsync(cancellationToken: cancellationToken); From db2cb4d8cc9422e634fc7b2c9092bb05f927975f Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Fri, 13 Mar 2026 16:46:57 -0700 Subject: [PATCH 3/8] Refactor purge functionality and add tests for large message cleanup --- 
.../PurgeScalabilityTests.cs | 446 ------------------ .../MessageManager.cs | 2 +- .../Tracking/AzureTableTrackingStore.cs | 19 - .../AzureStorageScenarioTests.cs | 68 +++ 4 files changed, 69 insertions(+), 466 deletions(-) delete mode 100644 Test/DurableTask.AzureStorage.Tests/PurgeScalabilityTests.cs diff --git a/Test/DurableTask.AzureStorage.Tests/PurgeScalabilityTests.cs b/Test/DurableTask.AzureStorage.Tests/PurgeScalabilityTests.cs deleted file mode 100644 index e6bb3197f..000000000 --- a/Test/DurableTask.AzureStorage.Tests/PurgeScalabilityTests.cs +++ /dev/null @@ -1,446 +0,0 @@ -// ---------------------------------------------------------------------------------- -// Copyright Microsoft Corporation -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// http://www.apache.org/licenses/LICENSE-2.0 -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
-// ---------------------------------------------------------------------------------- -#nullable enable -namespace DurableTask.AzureStorage.Tests -{ - using System; - using System.Collections.Concurrent; - using System.Collections.Generic; - using System.Linq; - using System.Threading; - using System.Threading.Tasks; - using Azure; - using Azure.Data.Tables; - using DurableTask.AzureStorage.Monitoring; - using DurableTask.AzureStorage.Storage; - using DurableTask.AzureStorage.Tracking; - using DurableTask.Core; - using Microsoft.VisualStudio.TestTools.UnitTesting; - using Moq; - - [TestClass] - public class PurgeScalabilityTests - { - const string ConnectionString = "UseDevelopmentStorage=true"; - const string InstancesTableName = "TestInstances"; - const string HistoryTableName = "TestHistory"; - - /// - /// Verifies that the DeleteHistoryAsync pipeline processes instances - /// and that the storageRequests counter accumulates correctly. - /// Previously there was a bug where rowsDeleted was counted as storageRequests. 
- /// - [TestMethod] - public async Task DeleteHistoryAsync_ViaPublicApi_AccumulatesStatistics() - { - // Arrange - create a tracking store with mocked tables - var (trackingStore, instancesTableClient, historyTableClient) = CreateTrackingStoreWithMockedTables(); - - // Setup: Instances table query returns 2 instances across 1 page - var instance1 = new OrchestrationInstanceStatus - { - PartitionKey = "instance1", - RowKey = "", - RuntimeStatus = "Completed", - CreatedTime = DateTime.UtcNow.AddHours(-2), - }; - var instance2 = new OrchestrationInstanceStatus - { - PartitionKey = "instance2", - RowKey = "", - RuntimeStatus = "Completed", - CreatedTime = DateTime.UtcNow.AddHours(-1), - }; - - instancesTableClient - .Setup(t => t.QueryAsync( - It.IsAny(), - It.IsAny(), - It.IsAny>(), - It.IsAny())) - .Returns(AsyncPageable.FromPages(new[] - { - Page.FromValues( - new List { instance1, instance2 }, - null, - new Mock().Object) - })); - - // History table query: each instance has 5 history rows - historyTableClient - .Setup(t => t.QueryAsync( - It.IsAny(), - It.IsAny(), - It.IsAny>(), - It.IsAny())) - .Returns((string filter, int? maxPerPage, IEnumerable? select, CancellationToken ct) => - { - string pk = filter.Contains("instance1") ? 
"instance1" : "instance2"; - var entities = Enumerable.Range(0, 5) - .Select(i => new TableEntity(pk, $"rk_{i}") { ETag = ETag.All }) - .ToList(); - return AsyncPageable.FromPages(new[] - { - Page.FromValues(entities, null, new Mock().Object) - }); - }); - - // History batch delete succeeds - historyTableClient - .Setup(t => t.SubmitTransactionAsync( - It.IsAny>(), - It.IsAny())) - .ReturnsAsync((IEnumerable actions, CancellationToken _) => - { - int count = actions.Count(); - var responses = Enumerable.Range(0, count).Select(_ => new Mock().Object).ToList(); - return Response.FromValue>(responses, new Mock().Object); - }); - - // Instances table delete succeeds - instancesTableClient - .Setup(t => t.DeleteEntityAsync( - It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .ReturnsAsync(new Mock().Object); - - // Act - PurgeHistoryResult result = await trackingStore.PurgeInstanceHistoryAsync( - createdTimeFrom: DateTime.UtcNow.AddHours(-3), - createdTimeTo: DateTime.UtcNow, - runtimeStatus: new[] { OrchestrationStatus.Completed }); - - // Assert - Assert.AreEqual(2, result.InstancesDeleted); - Assert.AreEqual(10, result.RowsDeleted); // 5 history rows per instance × 2 instances - } - - /// - /// Verifies that the purge pipeline limits concurrent instance purges - /// to a bounded number (MaxPurgeInstanceConcurrency = 100). - /// With only 10 instances, all should run concurrently but not exceed the limit. 
- /// - [TestMethod] - public async Task DeleteHistoryAsync_RespectsMaxConcurrency() - { - // Arrange - int concurrentCount = 0; - int maxObservedConcurrency = 0; - - var (trackingStore, instancesTableClient, historyTableClient) = CreateTrackingStoreWithMockedTables(); - - // 10 instances to delete - var instances = Enumerable.Range(0, 10).Select(i => - new OrchestrationInstanceStatus - { - PartitionKey = $"instance{i}", - RowKey = "", - RuntimeStatus = "Completed", - CreatedTime = DateTime.UtcNow.AddHours(-1), - }).ToList(); - - instancesTableClient - .Setup(t => t.QueryAsync( - It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny())) - .Returns(AsyncPageable.FromPages(new[] - { - Page.FromValues(instances, null, new Mock().Object) - })); - - // History: empty for each instance (no rows to delete) - historyTableClient - .Setup(t => t.QueryAsync( - It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny())) - .Returns(AsyncPageable.FromPages(new[] - { - Page.FromValues(new List(), null, new Mock().Object) - })); - - // Instances delete: track concurrency - instancesTableClient - .Setup(t => t.DeleteEntityAsync( - It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns(async (string pk, string rk, ETag etag, CancellationToken ct) => - { - int current = Interlocked.Increment(ref concurrentCount); - int snapshot; - do - { - snapshot = Volatile.Read(ref maxObservedConcurrency); - } - while (current > snapshot && Interlocked.CompareExchange(ref maxObservedConcurrency, current, snapshot) != snapshot); - - await Task.Delay(30); // Simulate latency - Interlocked.Decrement(ref concurrentCount); - return new Mock().Object; - }); - - // Act - PurgeHistoryResult result = await trackingStore.PurgeInstanceHistoryAsync( - createdTimeFrom: DateTime.UtcNow.AddHours(-3), - createdTimeTo: DateTime.UtcNow, - runtimeStatus: new[] { OrchestrationStatus.Completed }); - - // Assert - Assert.AreEqual(10, result.InstancesDeleted); - // With 10 instances and MaxPurgeInstanceConcurrency=100, all 10 should 
be able to run concurrently - Assert.IsTrue( - maxObservedConcurrency <= 100, - $"Max observed concurrency ({maxObservedConcurrency}) should not exceed MaxPurgeInstanceConcurrency (100)"); - } - - /// - /// Verifies that single-instance purge projects PK, RK, and Timestamp for history query. - /// - [TestMethod] - public async Task PurgeInstanceHistory_ProjectsPKRKAndTimestamp() - { - // Arrange - var (trackingStore, instancesTableClient, historyTableClient) = CreateTrackingStoreWithMockedTables(); - - // Setup instances table to return one instance - var instance = new OrchestrationInstanceStatus - { - PartitionKey = "testInstance", - RowKey = "", - RuntimeStatus = "Completed", - }; - - instancesTableClient - .Setup(t => t.QueryAsync( - It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny())) - .Returns(AsyncPageable.FromPages(new[] - { - Page.FromValues( - new List { instance }, null, new Mock().Object) - })); - - // Capture the actual select columns passed to the history query - IEnumerable? capturedSelect = null; - historyTableClient - .Setup(t => t.QueryAsync( - It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny())) - .Callback((string filter, int? maxPerPage, IEnumerable? 
select, CancellationToken ct) => - { - capturedSelect = select; - }) - .Returns(AsyncPageable.FromPages(new[] - { - Page.FromValues(new List(), null, new Mock().Object) - })); - - instancesTableClient - .Setup(t => t.DeleteEntityAsync( - It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .ReturnsAsync(new Mock().Object); - - // Act - await trackingStore.PurgeInstanceHistoryAsync("testInstance"); - - // Assert: Projection should include PK, RK, and Timestamp - Assert.IsNotNull(capturedSelect, "Select projection was not provided"); - var selectList = capturedSelect!.ToList(); - Assert.IsTrue(selectList.Contains("PartitionKey"), "Should project PartitionKey"); - Assert.IsTrue(selectList.Contains("RowKey"), "Should project RowKey"); - Assert.IsTrue(selectList.Contains("Timestamp"), "Should project Timestamp"); - } - - /// - /// Verifies that a single-instance purge deletes instance from instances table. - /// - [TestMethod] - public async Task PurgeInstanceHistory_ByInstanceId_DeletesInstance() - { - // Arrange - var (trackingStore, instancesTableClient, historyTableClient) = CreateTrackingStoreWithMockedTables(); - - var instance = new OrchestrationInstanceStatus - { - PartitionKey = "myInstance", - RowKey = "", - RuntimeStatus = "Completed", - }; - - instancesTableClient - .Setup(t => t.QueryAsync( - It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny())) - .Returns(AsyncPageable.FromPages(new[] - { - Page.FromValues( - new List { instance }, null, new Mock().Object) - })); - - historyTableClient - .Setup(t => t.QueryAsync( - It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny())) - .Returns(AsyncPageable.FromPages(new[] - { - Page.FromValues(new List(), null, new Mock().Object) - })); - - instancesTableClient - .Setup(t => t.DeleteEntityAsync( - It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .ReturnsAsync(new Mock().Object); - - // Act - PurgeHistoryResult result = await trackingStore.PurgeInstanceHistoryAsync("myInstance"); - - // Assert - Assert.AreEqual(1, 
result.InstancesDeleted); - instancesTableClient.Verify( - t => t.DeleteEntityAsync("myInstance", string.Empty, ETag.All, It.IsAny()), - Times.Once); - } - - /// - /// Verifies that purging a non-existent instance returns zero. - /// - [TestMethod] - public async Task PurgeInstanceHistory_InstanceNotFound_ReturnsZero() - { - // Arrange - var (trackingStore, instancesTableClient, _) = CreateTrackingStoreWithMockedTables(); - - instancesTableClient - .Setup(t => t.QueryAsync( - It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny())) - .Returns(AsyncPageable.FromPages(new[] - { - Page.FromValues( - new List(), null, new Mock().Object) - })); - - // Act - PurgeHistoryResult result = await trackingStore.PurgeInstanceHistoryAsync("nonexistent"); - - // Assert - Assert.AreEqual(0, result.InstancesDeleted); - Assert.AreEqual(0, result.RowsDeleted); - } - - /// - /// Verifies that history batch delete uses parallel execution (via DeleteBatchParallelAsync). - /// - [TestMethod] - public async Task PurgeInstanceHistory_WithManyHistoryRows_UsesParallelBatchDelete() - { - // Arrange - var (trackingStore, instancesTableClient, historyTableClient) = CreateTrackingStoreWithMockedTables(); - - var instance = new OrchestrationInstanceStatus - { - PartitionKey = "testPK", - RowKey = "", - RuntimeStatus = "Completed", - }; - - instancesTableClient - .Setup(t => t.QueryAsync( - It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny())) - .Returns(AsyncPageable.FromPages(new[] - { - Page.FromValues( - new List { instance }, null, new Mock().Object) - })); - - // Return 250 history entities (which should be split into 3 batches: 100+100+50) - var historyEntities = Enumerable.Range(0, 250) - .Select(i => new TableEntity("testPK", $"rk_{i:D5}") { ETag = ETag.All }) - .ToList(); - - historyTableClient - .Setup(t => t.QueryAsync( - It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny())) - .Returns(AsyncPageable.FromPages(new[] - { - Page.FromValues(historyEntities, null, new Mock().Object) - })); - - // Track 
batch deletes - int batchDeleteCallCount = 0; - historyTableClient - .Setup(t => t.SubmitTransactionAsync( - It.IsAny>(), - It.IsAny())) - .ReturnsAsync((IEnumerable actions, CancellationToken _) => - { - Interlocked.Increment(ref batchDeleteCallCount); - int count = actions.Count(); - var responses = Enumerable.Range(0, count).Select(_ => new Mock().Object).ToList(); - return Response.FromValue>(responses, new Mock().Object); - }); - - instancesTableClient - .Setup(t => t.DeleteEntityAsync( - It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .ReturnsAsync(new Mock().Object); - - // Act - PurgeHistoryResult result = await trackingStore.PurgeInstanceHistoryAsync("testPK"); - - // Assert - Assert.AreEqual(1, result.InstancesDeleted); - Assert.AreEqual(250, result.RowsDeleted); - // Should have made 3 batch delete calls (100 + 100 + 50) - Assert.AreEqual(3, batchDeleteCallCount, "Expected 3 batch transactions for 250 entities"); - } - - #region Helper Methods - - static (AzureTableTrackingStore trackingStore, Mock instancesTableClient, Mock historyTableClient) - CreateTrackingStoreWithMockedTables(Action? 
modifySettings = null) - { - var settings = new AzureStorageOrchestrationServiceSettings - { - StorageAccountClientProvider = new StorageAccountClientProvider(ConnectionString), - FetchLargeMessageDataEnabled = false, - }; - - modifySettings?.Invoke(settings); - - var azureStorageClient = new AzureStorageClient(settings); - - // Create mocked instances table - var instancesServiceClient = new Mock(MockBehavior.Strict, ConnectionString); - var instancesTableClient = new Mock(MockBehavior.Loose, ConnectionString, InstancesTableName); - instancesTableClient.Setup(t => t.Name).Returns(InstancesTableName); - instancesServiceClient.Setup(t => t.GetTableClient(InstancesTableName)).Returns(instancesTableClient.Object); - var instancesTable = new Table(azureStorageClient, instancesServiceClient.Object, InstancesTableName); - - // Create mocked history table - var historyServiceClient = new Mock(MockBehavior.Strict, ConnectionString); - var historyTableClient = new Mock(MockBehavior.Loose, ConnectionString, HistoryTableName); - historyTableClient.Setup(t => t.Name).Returns(HistoryTableName); - historyServiceClient.Setup(t => t.GetTableClient(HistoryTableName)).Returns(historyTableClient.Object); - var historyTable = new Table(azureStorageClient, historyServiceClient.Object, HistoryTableName); - - // Create mock message manager that returns 1 storage operation (no blobs to delete) - var messageManager = new Mock( - settings, azureStorageClient, "test-largemessages") { CallBase = false }; - messageManager - .Setup(m => m.DeleteLargeMessageBlobs(It.IsAny(), It.IsAny())) - .ReturnsAsync(1); - - // Create tracking store using the internal test constructor - var trackingStore = new AzureTableTrackingStore( - azureStorageClient, - historyTable, - instancesTable, - messageManager.Object); - - return (trackingStore, instancesTableClient, historyTableClient); - } - - #endregion - } -} \ No newline at end of file diff --git a/src/DurableTask.AzureStorage/MessageManager.cs 
b/src/DurableTask.AzureStorage/MessageManager.cs index 6e122c556..48f46ffac 100644 --- a/src/DurableTask.AzureStorage/MessageManager.cs +++ b/src/DurableTask.AzureStorage/MessageManager.cs @@ -316,7 +316,7 @@ public string GetNewLargeMessageBlobName(MessageData message) return $"{instanceId}/message-{activityId}-{eventType}.json.gz"; } - public virtual async Task DeleteLargeMessageBlobs(string sanitizedInstanceId, CancellationToken cancellationToken = default) + public async Task DeleteLargeMessageBlobs(string sanitizedInstanceId, CancellationToken cancellationToken = default) { int storageOperationCount = 0; diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index c9a850d6b..67e697589 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -118,25 +118,6 @@ internal AzureTableTrackingStore( }; } - // For testing purge functionality where HistoryTable and MessageManager are needed - internal AzureTableTrackingStore( - AzureStorageClient azureStorageClient, - Table historyTable, - Table instancesTable, - MessageManager messageManager) - { - this.azureStorageClient = azureStorageClient; - this.settings = azureStorageClient.Settings; - this.stats = azureStorageClient.Stats; - this.storageAccountName = azureStorageClient.TableAccountName; - this.taskHubName = this.settings.TaskHubName; - this.HistoryTable = historyTable; - this.InstancesTable = instancesTable; - this.messageManager = messageManager; - - this.settings.FetchLargeMessageDataEnabled = false; - } - internal Table HistoryTable { get; } internal Table InstancesTable { get; } diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index 4d8c779d2..05a448fec 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs 
+++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -647,6 +647,74 @@ public async Task PurgeSingleInstanceWithIdempotency() } } + [TestMethod] + public async Task PurgeSingleInstance_WithLargeBlobs_CleansUpBlobs() + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + await host.StartAsync(); + + string instanceId = Guid.NewGuid().ToString(); + // Generate a payload large enough to be stored as a blob (>60KB threshold) + string largeMessage = new string('x', 70 * 1024); + + TestOrchestrationClient client = await host.StartOrchestrationAsync( + typeof(Orchestrations.Echo), largeMessage, instanceId); + OrchestrationState status = await client.WaitForCompletionAsync(TimeSpan.FromMinutes(2)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + + // Verify blobs exist before purge + int blobCount = await this.GetBlobCount("test-largemessages", instanceId); + Assert.IsTrue(blobCount > 0, "Should have large message blobs before purge"); + + // Purge + await client.PurgeInstanceHistory(); + + // Verify blobs are cleaned up + blobCount = await this.GetBlobCount("test-largemessages", instanceId); + Assert.AreEqual(0, blobCount, "All large message blobs should be deleted after purge"); + + // Verify history is gone + List historyEvents = await client.GetOrchestrationHistoryAsync(instanceId); + Assert.AreEqual(0, historyEvents.Count); + + await host.StopAsync(); + } + } + + [TestMethod] + public async Task PurgeInstance_WithManyHistoryRows_DeletesAll() + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + await host.StartAsync(); + + string instanceId = Guid.NewGuid().ToString(); + // FanOutFanIn with 50 parallel activities creates 100+ history rows + TestOrchestrationClient client = await host.StartOrchestrationAsync( + typeof(Orchestrations.FanOutFanIn), 50, instanceId); + OrchestrationState status = 
await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + + // Verify lots of history exists + List historyEvents = await client.GetOrchestrationHistoryAsync(instanceId); + Assert.IsTrue(historyEvents.Count > 50, $"Expected many history events, got {historyEvents.Count}"); + + // Purge + await client.PurgeInstanceHistory(); + + // Verify clean + historyEvents = await client.GetOrchestrationHistoryAsync(instanceId); + Assert.AreEqual(0, historyEvents.Count); + + IList stateList = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, stateList.Count); + Assert.IsNull(stateList[0]); + + await host.StopAsync(); + } + } + [TestMethod] public async Task PurgeInstanceHistoryForTimePeriodDeletePartially() { From 1603320dbeadbc430d9d3a12942f84ac122bedf2 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Fri, 13 Mar 2026 17:51:32 -0700 Subject: [PATCH 4/8] remove semaphore --- .../Storage/TableDeleteBatchParallelTests.cs | 101 +++--------------- src/DurableTask.AzureStorage/Storage/Table.cs | 18 +--- .../Tracking/AzureTableTrackingStore.cs | 3 +- 3 files changed, 20 insertions(+), 102 deletions(-) diff --git a/Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs b/Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs index 6c49679ca..c87180758 100644 --- a/Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs +++ b/Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs @@ -34,14 +34,11 @@ public class TableDeleteBatchParallelTests [TestMethod] public async Task DeleteBatchParallelAsync_EmptyBatch_ReturnsEmptyResults() { - // Arrange Table table = CreateTableWithMockedClient(out _, out _); var entities = new List(); - // Act - TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4); + 
TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); - // Assert Assert.AreEqual(0, results.Responses.Count); Assert.AreEqual(0, results.RequestCount); } @@ -49,22 +46,17 @@ public async Task DeleteBatchParallelAsync_EmptyBatch_ReturnsEmptyResults() [TestMethod] public async Task DeleteBatchParallelAsync_SingleBatch_SubmitsOneTransaction() { - // Arrange var entities = CreateTestEntities("pk", count: 50); - Table table = CreateTableWithMockedClient(out _, out Mock tableClient); - var mockResponses = CreateMockBatchResponse(50); tableClient .Setup(t => t.SubmitTransactionAsync( It.Is>(a => a.Count() == 50), It.IsAny())) - .ReturnsAsync(mockResponses); + .ReturnsAsync(CreateMockBatchResponse(50)); - // Act - TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4); + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); - // Assert Assert.AreEqual(50, results.Responses.Count); tableClient.Verify( t => t.SubmitTransactionAsync(It.IsAny>(), It.IsAny()), @@ -74,9 +66,7 @@ public async Task DeleteBatchParallelAsync_SingleBatch_SubmitsOneTransaction() [TestMethod] public async Task DeleteBatchParallelAsync_MultipleBatches_SplitsIntoChunksOf100() { - // Arrange var entities = CreateTestEntities("pk", count: 250); - Table table = CreateTableWithMockedClient(out _, out Mock tableClient); tableClient @@ -86,10 +76,8 @@ public async Task DeleteBatchParallelAsync_MultipleBatches_SplitsIntoChunksOf100 .ReturnsAsync((IEnumerable batch, CancellationToken _) => CreateMockBatchResponse(batch.Count())); - // Act - TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 10); + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); - // Assert: 250 entities = 3 batches (100 + 100 + 50) Assert.AreEqual(250, results.Responses.Count); tableClient.Verify( t => t.SubmitTransactionAsync(It.IsAny>(), It.IsAny()), @@ -97,10 
+85,8 @@ public async Task DeleteBatchParallelAsync_MultipleBatches_SplitsIntoChunksOf100 } [TestMethod] - public async Task DeleteBatchParallelAsync_RespectsMaxParallelism() + public async Task DeleteBatchParallelAsync_SubmitsBatchesConcurrently() { - // Arrange - int maxParallelism = 2; var entities = CreateTestEntities("pk", count: 500); // 5 batches of 100 int concurrentCount = 0; int maxConcurrent = 0; @@ -121,37 +107,32 @@ public async Task DeleteBatchParallelAsync_RespectsMaxParallelism() } while (current > snapshot && Interlocked.CompareExchange(ref maxConcurrent, current, snapshot) != snapshot); - await Task.Delay(50); // Simulate some latency + await Task.Delay(50); Interlocked.Decrement(ref concurrentCount); return CreateMockBatchResponse(batch.Count()); }); - // Act - await table.DeleteBatchParallelAsync(entities, maxParallelism: maxParallelism); + await table.DeleteBatchParallelAsync(entities); - // Assert + // All 5 batches should run concurrently since there's no internal semaphore Assert.IsTrue( - maxConcurrent <= maxParallelism, - $"Max concurrent batches ({maxConcurrent}) exceeded maxParallelism ({maxParallelism})"); + maxConcurrent > 1, + $"Expected concurrent execution, but max concurrent was {maxConcurrent}"); } [TestMethod] public async Task DeleteBatchParallelAsync_BatchFails404_FallsBackToIndividualDeletes() { - // Arrange var entities = CreateTestEntities("pk", count: 3); - Table table = CreateTableWithMockedClient(out _, out Mock tableClient); - // First batch call fails with 404 (e.g., one entity already deleted) tableClient .Setup(t => t.SubmitTransactionAsync( It.IsAny>(), It.IsAny())) .ThrowsAsync(new RequestFailedException(404, "Entity not found")); - // Individual deletes succeed var mockResponse = new Mock(); tableClient .Setup(t => t.DeleteEntityAsync( @@ -161,10 +142,8 @@ public async Task DeleteBatchParallelAsync_BatchFails404_FallsBackToIndividualDe It.IsAny())) .ReturnsAsync(mockResponse.Object); - // Act - 
TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4); + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); - // Assert: should fall back to individual deletes Assert.AreEqual(3, results.Responses.Count); tableClient.Verify( t => t.DeleteEntityAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), @@ -174,19 +153,15 @@ public async Task DeleteBatchParallelAsync_BatchFails404_FallsBackToIndividualDe [TestMethod] public async Task DeleteBatchParallelAsync_IndividualDeleteSkips404() { - // Arrange var entities = CreateTestEntities("pk", count: 3); - Table table = CreateTableWithMockedClient(out _, out Mock tableClient); - // Batch fails with 404 tableClient .Setup(t => t.SubmitTransactionAsync( It.IsAny>(), It.IsAny())) .ThrowsAsync(new RequestFailedException(404, "Entity not found")); - // Individual delete: first succeeds, second returns 404, third succeeds int callCount = 0; var mockResponse = new Mock(); tableClient @@ -205,20 +180,16 @@ public async Task DeleteBatchParallelAsync_IndividualDeleteSkips404() return Task.FromResult(mockResponse.Object); }); - // Act - TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4); + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); - // Assert: only 2 responses (the 404 was skipped) Assert.AreEqual(2, results.Responses.Count); - Assert.AreEqual(3, results.RequestCount); // Still counted 3 requests + Assert.AreEqual(3, results.RequestCount); } [TestMethod] public async Task DeleteBatchParallelAsync_ExactlyOneBatch_NoBoundaryIssues() { - // Arrange: exactly 100 entities = 1 batch var entities = CreateTestEntities("pk", count: 100); - Table table = CreateTableWithMockedClient(out _, out Mock tableClient); tableClient @@ -227,10 +198,8 @@ public async Task DeleteBatchParallelAsync_ExactlyOneBatch_NoBoundaryIssues() It.IsAny())) .ReturnsAsync(CreateMockBatchResponse(100)); - // 
Act - TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4); + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); - // Assert Assert.AreEqual(100, results.Responses.Count); tableClient.Verify( t => t.SubmitTransactionAsync(It.IsAny>(), It.IsAny()), @@ -240,9 +209,7 @@ public async Task DeleteBatchParallelAsync_ExactlyOneBatch_NoBoundaryIssues() [TestMethod] public async Task DeleteBatchParallelAsync_101Entities_CreatesTwoBatches() { - // Arrange: 101 entities = 2 batches (100 + 1) var entities = CreateTestEntities("pk", count: 101); - Table table = CreateTableWithMockedClient(out _, out Mock tableClient); tableClient @@ -252,10 +219,8 @@ public async Task DeleteBatchParallelAsync_101Entities_CreatesTwoBatches() .ReturnsAsync((IEnumerable batch, CancellationToken _) => CreateMockBatchResponse(batch.Count())); - // Act - TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 4); + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); - // Assert Assert.AreEqual(101, results.Responses.Count); tableClient.Verify( t => t.SubmitTransactionAsync(It.IsAny>(), It.IsAny()), @@ -265,10 +230,8 @@ public async Task DeleteBatchParallelAsync_101Entities_CreatesTwoBatches() [TestMethod] public async Task DeleteBatchParallelAsync_CancellationToken_IsPropagated() { - // Arrange var entities = CreateTestEntities("pk", count: 200); using var cts = new CancellationTokenSource(); - Table table = CreateTableWithMockedClient(out _, out Mock tableClient); int batchesSubmitted = 0; @@ -281,46 +244,14 @@ public async Task DeleteBatchParallelAsync_CancellationToken_IsPropagated() int count = Interlocked.Increment(ref batchesSubmitted); if (count == 1) { - // Cancel after first batch starts cts.Cancel(); } ct.ThrowIfCancellationRequested(); return CreateMockBatchResponse(batch.Count()); }); - // Act & Assert await Assert.ThrowsExceptionAsync( - () 
=> table.DeleteBatchParallelAsync(entities, maxParallelism: 1, cts.Token)); - } - - [TestMethod] - public async Task DeleteBatchParallelAsync_MaxParallelismOne_ExecutesSequentially() - { - // Arrange - var entities = CreateTestEntities("pk", count: 300); // 3 batches - var batchOrder = new ConcurrentBag(); - int batchIndex = 0; - - Table table = CreateTableWithMockedClient(out _, out Mock tableClient); - - tableClient - .Setup(t => t.SubmitTransactionAsync( - It.IsAny>(), - It.IsAny())) - .Returns(async (IEnumerable batch, CancellationToken _) => - { - int idx = Interlocked.Increment(ref batchIndex); - batchOrder.Add(idx); - await Task.Delay(10); - return CreateMockBatchResponse(batch.Count()); - }); - - // Act - TableTransactionResults results = await table.DeleteBatchParallelAsync(entities, maxParallelism: 1); - - // Assert - Assert.AreEqual(300, results.Responses.Count); - Assert.AreEqual(3, batchOrder.Count); + () => table.DeleteBatchParallelAsync(entities, cts.Token)); } #region Helper Methods diff --git a/src/DurableTask.AzureStorage/Storage/Table.cs b/src/DurableTask.AzureStorage/Storage/Table.cs index 4c41c0a85..33542c64e 100644 --- a/src/DurableTask.AzureStorage/Storage/Table.cs +++ b/src/DurableTask.AzureStorage/Storage/Table.cs @@ -120,12 +120,12 @@ public async Task DeleteBatchAsync(IEnumerable en /// /// Deletes entities in parallel batches of up to 100. Each batch is an atomic transaction, /// but multiple batches are submitted concurrently for improved throughput. + /// Concurrency is controlled by the global . /// If a batch fails because an entity was already deleted (404/EntityNotFound), /// it falls back to individual deletes for that batch, skipping already-deleted entities. 
/// public async Task DeleteBatchParallelAsync( IReadOnlyList entityBatch, - int maxParallelism, CancellationToken cancellationToken = default) where T : ITableEntity { if (entityBatch.Count == 0) @@ -154,22 +154,10 @@ public async Task DeleteBatchParallelAsync( } var resultsBuilder = new TableTransactionResultsBuilder(); - var semaphore = new SemaphoreSlim(Math.Max(1, maxParallelism)); - var tasks = chunks.Select(async chunk => - { - await semaphore.WaitAsync(cancellationToken); - try - { - return await this.ExecuteBatchWithFallbackAsync(chunk, cancellationToken); - } - finally - { - semaphore.Release(); - } - }); + TableTransactionResults[] allResults = await Task.WhenAll( + chunks.Select(chunk => this.ExecuteBatchWithFallbackAsync(chunk, cancellationToken))); - TableTransactionResults[] allResults = await Task.WhenAll(tasks); foreach (TableTransactionResults result in allResults) { resultsBuilder.Add(result); diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 67e697589..f06a9a558 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -625,7 +625,6 @@ async Task DeleteAllDataForOrchestrationInstance(Orchestrati storageRequests += results.RequestCount; IReadOnlyList historyEntities = results.Entities; - int maxParallelism = Math.Max(1, this.settings.MaxStorageOperationConcurrency); var tasks = new List { @@ -636,7 +635,7 @@ async Task DeleteAllDataForOrchestrationInstance(Orchestrati }), Task.Run(async () => { - var deletedEntitiesResponseInfo = await this.HistoryTable.DeleteBatchParallelAsync(historyEntities, maxParallelism, cancellationToken); + var deletedEntitiesResponseInfo = await this.HistoryTable.DeleteBatchParallelAsync(historyEntities, cancellationToken); Interlocked.Add(ref rowsDeleted, deletedEntitiesResponseInfo.Responses.Count); Interlocked.Add(ref 
storageRequests, deletedEntitiesResponseInfo.RequestCount); }), From 473cb65198e6c9c04fea175c3b0904cde7787f62 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Mon, 16 Mar 2026 09:00:38 -0700 Subject: [PATCH 5/8] Add partial purge with timeout support - Add TimeSpan? Timeout to PurgeInstanceFilter for partial purge support - Add bool? IsComplete to PurgeHistoryResult to indicate completion status - Add new PurgeInstanceHistoryAsync overload with TimeSpan timeout parameter - Use CancellationToken-based timeout (linked CTS) in DeleteHistoryAsync - Already-dispatched deletions complete before returning partial results - Backward compatible: no timeout = original behavior (IsComplete = null) - Forward IsComplete through ToCorePurgeHistoryResult to PurgeResult - Add scenario tests for partial purge timeout, generous timeout, and compat --- .../AzureStorageOrchestrationService.cs | 29 ++++- .../PurgeHistoryResult.cs | 23 +++- .../Tracking/AzureTableTrackingStore.cs | 67 +++++++--- .../Tracking/ITrackingStore.cs | 3 +- .../Tracking/TrackingStoreBase.cs | 2 +- src/DurableTask.Core/PurgeInstanceFilter.cs | 9 ++ .../AzureStorageScenarioTests.cs | 119 ++++++++++++++++++ .../TestOrchestrationClient.cs | 8 ++ 8 files changed, 235 insertions(+), 25 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 9ad814c43..68b263c46 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -2030,6 +2030,21 @@ public Task PurgeInstanceHistoryAsync(DateTime createdTimeFr return this.trackingStore.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus); } + /// + /// Purge history for orchestrations that match the specified parameters, with a timeout. 
+ /// Use this overload to perform partial purges and avoid timeouts when there are many instances. + /// Check to determine if more purging is needed. + /// + /// CreatedTime of orchestrations. Purges history grater than this value. + /// CreatedTime of orchestrations. Purges history less than this value. + /// RuntimeStatus of orchestrations. You can specify several status. + /// Maximum time to spend purging. Already-started deletions will complete before the method returns. + /// Class containing number of storage requests sent, along with instances and rows deleted/purged + public Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, TimeSpan timeout) + { + return this.trackingStore.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus, timeout); + } + /// async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(string instanceId) { @@ -2040,10 +2055,16 @@ async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync /// async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter) { - PurgeHistoryResult storagePurgeHistoryResult = await this.PurgeInstanceHistoryAsync( - purgeInstanceFilter.CreatedTimeFrom, - purgeInstanceFilter.CreatedTimeTo, - purgeInstanceFilter.RuntimeStatus); + PurgeHistoryResult storagePurgeHistoryResult = purgeInstanceFilter.Timeout.HasValue + ? 
await this.PurgeInstanceHistoryAsync( + purgeInstanceFilter.CreatedTimeFrom, + purgeInstanceFilter.CreatedTimeTo, + purgeInstanceFilter.RuntimeStatus, + purgeInstanceFilter.Timeout.Value) + : await this.PurgeInstanceHistoryAsync( + purgeInstanceFilter.CreatedTimeFrom, + purgeInstanceFilter.CreatedTimeTo, + purgeInstanceFilter.RuntimeStatus); return storagePurgeHistoryResult.ToCorePurgeHistoryResult(); } #nullable enable diff --git a/src/DurableTask.AzureStorage/PurgeHistoryResult.cs b/src/DurableTask.AzureStorage/PurgeHistoryResult.cs index d42ce9d45..3ed7b3e18 100644 --- a/src/DurableTask.AzureStorage/PurgeHistoryResult.cs +++ b/src/DurableTask.AzureStorage/PurgeHistoryResult.cs @@ -33,6 +33,19 @@ public PurgeHistoryResult(int storageRequests, int instancesDeleted, int rowsDel this.RowsDeleted = rowsDeleted; } + /// + /// Constructor for purge history statistics with completion status. + /// + /// Requests sent to storage + /// Number of instances deleted + /// Number of rows deleted + /// Whether the purge operation completed all matching instances. + public PurgeHistoryResult(int storageRequests, int instancesDeleted, int rowsDeleted, bool? isComplete) + : this(storageRequests, instancesDeleted, rowsDeleted) + { + this.IsComplete = isComplete; + } + /// /// Number of requests sent to Storage during this execution of purge history /// @@ -48,12 +61,20 @@ public PurgeHistoryResult(int storageRequests, int instancesDeleted, int rowsDel /// public int RowsDeleted { get; } + /// + /// Gets a value indicating whether the purge operation is complete. + /// true if all matching instances were purged; + /// false if more instances remain and purge should be called again; + /// null if completion status is unknown. + /// + public bool? 
IsComplete { get; } + /// /// Converts from AzureStorage.PurgeHistoryResult to Core.PurgeResult type /// public PurgeResult ToCorePurgeHistoryResult() { - return new PurgeResult(this.InstancesDeleted); + return new PurgeResult(this.InstancesDeleted, this.IsComplete); } } } diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index f06a9a558..fc83c6d6c 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -557,6 +557,7 @@ async Task DeleteHistoryAsync( DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, + TimeSpan? timeout, CancellationToken cancellationToken) { var condition = OrchestrationInstanceStatusQueryCondition.Parse( @@ -572,6 +573,16 @@ async Task DeleteHistoryAsync( int instancesDeleted = 0; int rowsDeleted = 0; + // Create a timeout token that cancels after the specified duration. + // If no timeout is specified, use CancellationToken.None so behavior is unchanged. + using CancellationTokenSource timeoutCts = timeout.HasValue + ? new CancellationTokenSource(timeout.Value) + : null; + using CancellationTokenSource linkedCts = timeout.HasValue + ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token) + : null; + CancellationToken effectiveToken = linkedCts?.Token ?? cancellationToken; + // Limit concurrent instance purges to avoid overwhelming storage with too many parallel operations. // Each instance purge internally spawns multiple parallel storage operations, so this should be // kept moderate. Using 100 to match the original implicit concurrency from pageSizeHint. 
@@ -579,32 +590,51 @@ async Task DeleteHistoryAsync( var throttle = new SemaphoreSlim(MaxPurgeInstanceConcurrency); var pendingTasks = new List(); - AsyncPageable entitiesPageable = this.InstancesTable.ExecuteQueryAsync(odata.Filter, select: odata.Select, cancellationToken: cancellationToken); - await foreach (Page page in entitiesPageable.AsPages(pageSizeHint: MaxPurgeInstanceConcurrency)) + bool timedOut = false; + + try { - foreach (OrchestrationInstanceStatus instance in page.Values) + AsyncPageable entitiesPageable = this.InstancesTable.ExecuteQueryAsync(odata.Filter, select: odata.Select, cancellationToken: effectiveToken); + await foreach (Page page in entitiesPageable.AsPages(pageSizeHint: MaxPurgeInstanceConcurrency)) { - await throttle.WaitAsync(cancellationToken); - pendingTasks.Add(Task.Run(async () => + foreach (OrchestrationInstanceStatus instance in page.Values) { - try - { - PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, cancellationToken); - Interlocked.Add(ref instancesDeleted, statisticsFromDeletion.InstancesDeleted); - Interlocked.Add(ref storageRequests, statisticsFromDeletion.StorageRequests); - Interlocked.Add(ref rowsDeleted, statisticsFromDeletion.RowsDeleted); - } - finally + effectiveToken.ThrowIfCancellationRequested(); + + await throttle.WaitAsync(effectiveToken); + pendingTasks.Add(Task.Run(async () => { - throttle.Release(); - } - })); + try + { + PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, cancellationToken); + Interlocked.Add(ref instancesDeleted, statisticsFromDeletion.InstancesDeleted); + Interlocked.Add(ref storageRequests, statisticsFromDeletion.StorageRequests); + Interlocked.Add(ref rowsDeleted, statisticsFromDeletion.RowsDeleted); + } + finally + { + throttle.Release(); + } + })); + } } } + catch (OperationCanceledException) when (timeoutCts != null && timeoutCts.IsCancellationRequested) + { + // Timeout reached 
— stop accepting new instances but let already-dispatched ones finish. + timedOut = true; + } + // Wait for all dispatched deletions to finish before returning. await Task.WhenAll(pendingTasks); - return new PurgeHistoryResult(storageRequests, instancesDeleted, rowsDeleted); + // Determine completion status: + // - If a timeout was specified and fired, more instances may remain (isComplete = false). + // - If a timeout was specified and didn't fire, all instances were purged (isComplete = true). + // - If no timeout was specified, we purge everything (isComplete = null for backward compat). + bool? isComplete = timeout.HasValue ? !timedOut : (bool?)null; + + return new PurgeHistoryResult(storageRequests, instancesDeleted, rowsDeleted, isComplete); } async Task DeleteAllDataForOrchestrationInstance(OrchestrationInstanceStatus orchestrationInstanceStatus, CancellationToken cancellationToken) @@ -694,6 +724,7 @@ public override async Task PurgeInstanceHistoryAsync( DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, + TimeSpan? 
timeout = null, CancellationToken cancellationToken = default) { Stopwatch stopwatch = Stopwatch.StartNew(); @@ -703,7 +734,7 @@ public override async Task PurgeInstanceHistoryAsync( status == OrchestrationStatus.Canceled || status == OrchestrationStatus.Failed).ToList(); - PurgeHistoryResult result = await this.DeleteHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatusList, cancellationToken); + PurgeHistoryResult result = await this.DeleteHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatusList, timeout, cancellationToken); this.settings.Logger.PurgeInstanceHistory( this.storageAccountName, diff --git a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs index a1fc52e9f..73f9583a8 100644 --- a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs @@ -194,8 +194,9 @@ interface ITrackingStore /// Start creation time for querying instances for purging /// End creation time for querying instances for purging /// List of runtime status for querying instances for purging. Only Completed, Terminated, or Failed will be processed + /// Maximum time to spend purging. If null, all matching instances are purged. /// The token to monitor for cancellation requests. The default value is . /// Class containing number of storage requests sent, along with instances and rows deleted/purged - Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, CancellationToken cancellationToken = default); + Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, TimeSpan? 
timeout = null, CancellationToken cancellationToken = default); } } diff --git a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs index d02a729c0..6eb14aeed 100644 --- a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs +++ b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs @@ -86,7 +86,7 @@ public virtual Task PurgeInstanceHistoryAsync(string instanc } /// - public virtual Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, CancellationToken cancellationToken = default) + public virtual Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, TimeSpan? timeout = null, CancellationToken cancellationToken = default) { throw new NotSupportedException(); } diff --git a/src/DurableTask.Core/PurgeInstanceFilter.cs b/src/DurableTask.Core/PurgeInstanceFilter.cs index 30f68a23f..ab4fef6d9 100644 --- a/src/DurableTask.Core/PurgeInstanceFilter.cs +++ b/src/DurableTask.Core/PurgeInstanceFilter.cs @@ -48,5 +48,14 @@ public PurgeInstanceFilter(DateTime createdTimeFrom, DateTime? createdTimeTo, IE /// The runtime status of the orchestrations to purge. /// public IEnumerable? RuntimeStatus { get; } + + /// + /// The maximum amount of time to spend purging instances in a single call. + /// If null, all matching instances are purged regardless of how long it takes. + /// When set, the purge stops accepting new instances after this duration elapses + /// and returns with set to false. + /// Already-started instance deletions will complete before the method returns. + /// + public TimeSpan? 
Timeout { get; set; } } } \ No newline at end of file diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index 05a448fec..f9509dd57 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -715,6 +715,125 @@ public async Task PurgeInstance_WithManyHistoryRows_DeletesAll() } } + [TestMethod] + public async Task PartialPurge_TimesOutThenCompletesOnRetry() + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + await host.StartAsync(); + DateTime startDateTime = DateTime.UtcNow; + + // Create several orchestration instances + const int totalInstances = 5; + var clients = new List(); + for (int i = 0; i < totalInstances; i++) + { + string instanceId = $"partial-purge-{Guid.NewGuid():N}"; + TestOrchestrationClient client = await host.StartOrchestrationAsync( + typeof(Orchestrations.Factorial), 10, instanceId); + clients.Add(client); + } + + foreach (var client in clients) + { + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + } + + DateTime endDateTime = DateTime.UtcNow; + var statuses = new List { OrchestrationStatus.Completed }; + + // Partial purge with a very short timeout — may not finish all instances + PurgeHistoryResult firstResult = await clients[0].PurgeInstanceHistoryByTimePeriodPartial( + startDateTime, endDateTime, statuses, timeout: TimeSpan.FromMilliseconds(1)); + + // With such a tiny timeout, IsComplete should be non-null (timeout was specified) + Assert.IsNotNull(firstResult.IsComplete, "IsComplete should be non-null when timeout is specified"); + + if (firstResult.IsComplete == false) + { + // More work to do — purge the rest with a generous timeout + PurgeHistoryResult secondResult = await 
clients[0].PurgeInstanceHistoryByTimePeriodPartial( + startDateTime, endDateTime, statuses, timeout: TimeSpan.FromMinutes(2)); + + Assert.AreEqual(true, secondResult.IsComplete, "Should complete with generous timeout"); + Assert.AreEqual(totalInstances, firstResult.InstancesDeleted + secondResult.InstancesDeleted, + "Total purged across both calls should equal total instances"); + } + else + { + // All instances purged in the first call (possible if storage is fast) + Assert.AreEqual(totalInstances, firstResult.InstancesDeleted); + } + + await host.StopAsync(); + } + } + + [TestMethod] + public async Task PartialPurge_GenerousTimeout_CompletesAll() + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + await host.StartAsync(); + DateTime startDateTime = DateTime.UtcNow; + + const int totalInstances = 5; + var clients = new List(); + for (int i = 0; i < totalInstances; i++) + { + string instanceId = $"full-purge-{Guid.NewGuid():N}"; + TestOrchestrationClient client = await host.StartOrchestrationAsync( + typeof(Orchestrations.Factorial), 10, instanceId); + clients.Add(client); + } + + foreach (var client in clients) + { + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + } + + DateTime endDateTime = DateTime.UtcNow; + var statuses = new List { OrchestrationStatus.Completed }; + + // Generous timeout — should purge all instances and return complete + PurgeHistoryResult result = await clients[0].PurgeInstanceHistoryByTimePeriodPartial( + startDateTime, endDateTime, statuses, timeout: TimeSpan.FromMinutes(2)); + + Assert.AreEqual(totalInstances, result.InstancesDeleted); + Assert.AreEqual(true, result.IsComplete, "Should be complete with generous timeout"); + + await host.StopAsync(); + } + } + + [TestMethod] + public async Task PartialPurge_WithoutTimeout_ReturnsNullIsComplete() + { + // Validates 
backward compatibility: existing callers that don't use maxInstanceCount + // get IsComplete = null (unknown), preserving old behavior. + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + await host.StartAsync(); + DateTime startDateTime = DateTime.UtcNow; + + string instanceId = $"compat-purge-{Guid.NewGuid():N}"; + TestOrchestrationClient client = await host.StartOrchestrationAsync( + typeof(Orchestrations.Factorial), 10, instanceId); + await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + + // Use the old API (no maxInstanceCount) — should still work exactly as before + await client.PurgeInstanceHistoryByTimePeriod( + startDateTime, DateTime.UtcNow, new List { OrchestrationStatus.Completed }); + + List historyEvents = await client.GetOrchestrationHistoryAsync(instanceId); + Assert.AreEqual(0, historyEvents.Count, "History should be purged"); + + await host.StopAsync(); + } + } + [TestMethod] public async Task PurgeInstanceHistoryForTimePeriodDeletePartially() { diff --git a/test/DurableTask.AzureStorage.Tests/TestOrchestrationClient.cs b/test/DurableTask.AzureStorage.Tests/TestOrchestrationClient.cs index 92d018eed..7f6048b4a 100644 --- a/test/DurableTask.AzureStorage.Tests/TestOrchestrationClient.cs +++ b/test/DurableTask.AzureStorage.Tests/TestOrchestrationClient.cs @@ -200,6 +200,14 @@ public Task PurgeInstanceHistoryByTimePeriod(DateTime createdTimeFrom, DateTime? return service.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus); } + public Task PurgeInstanceHistoryByTimePeriodPartial(DateTime createdTimeFrom, DateTime? 
createdTimeTo, IEnumerable runtimeStatus, TimeSpan timeout) + { + Trace.TraceInformation($"Purging history (partial, timeout {timeout}) from {createdTimeFrom} to {createdTimeTo}"); + + AzureStorageOrchestrationService service = (AzureStorageOrchestrationService)this.client.ServiceClient; + return service.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus, timeout); + } + public async Task> GetOrchestrationHistoryAsync(string instanceId) { Trace.TraceInformation($"Getting history for instance with id - {this.instanceId}"); From 972357a8bb802b69bcb292cd6f9f233ba9e08e03 Mon Sep 17 00:00:00 2001 From: wangbill Date: Mon, 16 Mar 2026 14:01:25 -0700 Subject: [PATCH 6/8] Enforce 30s timeout cap and use effectiveToken for in-flight deletes - Always cap timeout to 30s max, even if not specified or exceeds 30s - Pass effectiveToken into DeleteAllDataForOrchestrationInstance so in-flight deletes are also cancelled on timeout - Catch OperationCanceledException from Task.WhenAll for timed-out in-flight deletes - External cancellationToken cancellation still propagates normally --- .../Tracking/AzureTableTrackingStore.cs | 44 +++++++++++-------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index fc83c6d6c..86b1ee663 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -573,15 +573,16 @@ async Task DeleteHistoryAsync( int instancesDeleted = 0; int rowsDeleted = 0; - // Create a timeout token that cancels after the specified duration. - // If no timeout is specified, use CancellationToken.None so behavior is unchanged. - using CancellationTokenSource timeoutCts = timeout.HasValue - ? new CancellationTokenSource(timeout.Value) - : null; - using CancellationTokenSource linkedCts = timeout.HasValue - ? 
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token) - : null; - CancellationToken effectiveToken = linkedCts?.Token ?? cancellationToken; + // Cap the timeout to a maximum of 30 seconds. If no timeout is specified + // or the specified timeout exceeds 30s, default to 30s to prevent long-running purges. + TimeSpan maxTimeout = TimeSpan.FromSeconds(30); + TimeSpan effectiveTimeout = timeout.HasValue && timeout.Value <= maxTimeout + ? timeout.Value + : maxTimeout; + + using CancellationTokenSource timeoutCts = new CancellationTokenSource(effectiveTimeout); + using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token); + CancellationToken effectiveToken = linkedCts.Token; // Limit concurrent instance purges to avoid overwhelming storage with too many parallel operations. // Each instance purge internally spawns multiple parallel storage operations, so this should be @@ -606,7 +607,7 @@ async Task DeleteHistoryAsync( { try { - PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, cancellationToken); + PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, effectiveToken); Interlocked.Add(ref instancesDeleted, statisticsFromDeletion.InstancesDeleted); Interlocked.Add(ref storageRequests, statisticsFromDeletion.StorageRequests); Interlocked.Add(ref rowsDeleted, statisticsFromDeletion.RowsDeleted); @@ -619,20 +620,27 @@ async Task DeleteHistoryAsync( } } } - catch (OperationCanceledException) when (timeoutCts != null && timeoutCts.IsCancellationRequested) + catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested) { - // Timeout reached — stop accepting new instances but let already-dispatched ones finish. + // Timeout reached — stop accepting new instances. timedOut = true; } - // Wait for all dispatched deletions to finish before returning. 
- await Task.WhenAll(pendingTasks); + // Wait for all dispatched deletions to finish or be cancelled by the timeout. + try + { + await Task.WhenAll(pendingTasks); + } + catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested) + { + // In-flight deletes were cancelled by the timeout — expected. + timedOut = true; + } // Determine completion status: - // - If a timeout was specified and fired, more instances may remain (isComplete = false). - // - If a timeout was specified and didn't fire, all instances were purged (isComplete = true). - // - If no timeout was specified, we purge everything (isComplete = null for backward compat). - bool? isComplete = timeout.HasValue ? !timedOut : (bool?)null; + // - If the timeout fired, more instances may remain (isComplete = false). + // - If everything completed within the timeout, all instances were purged (isComplete = true). + bool? isComplete = !timedOut; return new PurgeHistoryResult(storageRequests, instancesDeleted, rowsDeleted, isComplete); } From 1ef227d4b4ca20b6fb288627dea92d6f2b43be92 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Wed, 18 Mar 2026 18:48:27 -0700 Subject: [PATCH 7/8] Enforce 30s purge timeout and return IsComplete - Hard-code 30s CancellationToken-based timeout in DeleteHistoryAsync - Remove configurable Timeout from PurgeInstanceFilter (not needed) - Remove timeout overload from AzureStorageOrchestrationService - IsComplete = true when all purged within 30s, false when timed out - Callers loop until IsComplete = true for large-scale purge --- .../AzureStorageOrchestrationService.cs | 23 +---- .../Tracking/AzureTableTrackingStore.cs | 16 ++-- .../Tracking/ITrackingStore.cs | 3 +- .../Tracking/TrackingStoreBase.cs | 2 +- src/DurableTask.Core/PurgeInstanceFilter.cs | 9 -- .../AzureStorageScenarioTests.cs | 96 +++---------------- .../TestOrchestrationClient.cs | 8 -- 7 files changed, 20 insertions(+), 137 deletions(-) diff --git 
a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 68b263c46..c62748494 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -2030,21 +2030,6 @@ public Task PurgeInstanceHistoryAsync(DateTime createdTimeFr return this.trackingStore.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus); } - /// - /// Purge history for orchestrations that match the specified parameters, with a timeout. - /// Use this overload to perform partial purges and avoid timeouts when there are many instances. - /// Check to determine if more purging is needed. - /// - /// CreatedTime of orchestrations. Purges history grater than this value. - /// CreatedTime of orchestrations. Purges history less than this value. - /// RuntimeStatus of orchestrations. You can specify several status. - /// Maximum time to spend purging. Already-started deletions will complete before the method returns. - /// Class containing number of storage requests sent, along with instances and rows deleted/purged - public Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, TimeSpan timeout) - { - return this.trackingStore.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus, timeout); - } - /// async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(string instanceId) { @@ -2055,13 +2040,7 @@ async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync /// async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter) { - PurgeHistoryResult storagePurgeHistoryResult = purgeInstanceFilter.Timeout.HasValue - ? 
await this.PurgeInstanceHistoryAsync( - purgeInstanceFilter.CreatedTimeFrom, - purgeInstanceFilter.CreatedTimeTo, - purgeInstanceFilter.RuntimeStatus, - purgeInstanceFilter.Timeout.Value) - : await this.PurgeInstanceHistoryAsync( + PurgeHistoryResult storagePurgeHistoryResult = await this.PurgeInstanceHistoryAsync( purgeInstanceFilter.CreatedTimeFrom, purgeInstanceFilter.CreatedTimeTo, purgeInstanceFilter.RuntimeStatus); diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 86b1ee663..3c482ee92 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -557,7 +557,6 @@ async Task DeleteHistoryAsync( DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, - TimeSpan? timeout, CancellationToken cancellationToken) { var condition = OrchestrationInstanceStatusQueryCondition.Parse( @@ -573,14 +572,12 @@ async Task DeleteHistoryAsync( int instancesDeleted = 0; int rowsDeleted = 0; - // Cap the timeout to a maximum of 30 seconds. If no timeout is specified - // or the specified timeout exceeds 30s, default to 30s to prevent long-running purges. - TimeSpan maxTimeout = TimeSpan.FromSeconds(30); - TimeSpan effectiveTimeout = timeout.HasValue && timeout.Value <= maxTimeout - ? timeout.Value - : maxTimeout; + // Enforce a 30-second timeout to prevent long-running purge operations that can + // cause gRPC deadline timeouts in the isolated worker model. The caller should + // check IsComplete and loop if more instances remain. 
+ TimeSpan purgeTimeout = TimeSpan.FromSeconds(30); - using CancellationTokenSource timeoutCts = new CancellationTokenSource(effectiveTimeout); + using CancellationTokenSource timeoutCts = new CancellationTokenSource(purgeTimeout); using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token); CancellationToken effectiveToken = linkedCts.Token; @@ -732,7 +729,6 @@ public override async Task PurgeInstanceHistoryAsync( DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, - TimeSpan? timeout = null, CancellationToken cancellationToken = default) { Stopwatch stopwatch = Stopwatch.StartNew(); @@ -742,7 +738,7 @@ public override async Task PurgeInstanceHistoryAsync( status == OrchestrationStatus.Canceled || status == OrchestrationStatus.Failed).ToList(); - PurgeHistoryResult result = await this.DeleteHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatusList, timeout, cancellationToken); + PurgeHistoryResult result = await this.DeleteHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatusList, cancellationToken); this.settings.Logger.PurgeInstanceHistory( this.storageAccountName, diff --git a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs index 73f9583a8..a1fc52e9f 100644 --- a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs @@ -194,9 +194,8 @@ interface ITrackingStore /// Start creation time for querying instances for purging /// End creation time for querying instances for purging /// List of runtime status for querying instances for purging. Only Completed, Terminated, or Failed will be processed - /// Maximum time to spend purging. If null, all matching instances are purged. /// The token to monitor for cancellation requests. The default value is . 
/// Class containing number of storage requests sent, along with instances and rows deleted/purged - Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, TimeSpan? timeout = null, CancellationToken cancellationToken = default); + Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, CancellationToken cancellationToken = default); } } diff --git a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs index 6eb14aeed..d02a729c0 100644 --- a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs +++ b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs @@ -86,7 +86,7 @@ public virtual Task PurgeInstanceHistoryAsync(string instanc } /// - public virtual Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, TimeSpan? timeout = null, CancellationToken cancellationToken = default) + public virtual Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, CancellationToken cancellationToken = default) { throw new NotSupportedException(); } diff --git a/src/DurableTask.Core/PurgeInstanceFilter.cs b/src/DurableTask.Core/PurgeInstanceFilter.cs index ab4fef6d9..30f68a23f 100644 --- a/src/DurableTask.Core/PurgeInstanceFilter.cs +++ b/src/DurableTask.Core/PurgeInstanceFilter.cs @@ -48,14 +48,5 @@ public PurgeInstanceFilter(DateTime createdTimeFrom, DateTime? createdTimeTo, IE /// The runtime status of the orchestrations to purge. /// public IEnumerable? RuntimeStatus { get; } - - /// - /// The maximum amount of time to spend purging instances in a single call. - /// If null, all matching instances are purged regardless of how long it takes. - /// When set, the purge stops accepting new instances after this duration elapses - /// and returns with set to false. 
- /// Already-started instance deletions will complete before the method returns. - /// - public TimeSpan? Timeout { get; set; } } } \ No newline at end of file diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index f9509dd57..cc3067233 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -716,19 +716,20 @@ public async Task PurgeInstance_WithManyHistoryRows_DeletesAll() } [TestMethod] - public async Task PartialPurge_TimesOutThenCompletesOnRetry() + public async Task PurgeReturnsIsComplete() { + // Validates that purge returns IsComplete = true when all instances are purged + // within the built-in 30-second timeout. using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) { await host.StartAsync(); DateTime startDateTime = DateTime.UtcNow; - // Create several orchestration instances const int totalInstances = 5; var clients = new List(); for (int i = 0; i < totalInstances; i++) { - string instanceId = $"partial-purge-{Guid.NewGuid():N}"; + string instanceId = $"purge-complete-{Guid.NewGuid():N}"; TestOrchestrationClient client = await host.StartOrchestrationAsync( typeof(Orchestrations.Factorial), 10, instanceId); clients.Add(client); @@ -743,93 +744,18 @@ public async Task PartialPurge_TimesOutThenCompletesOnRetry() DateTime endDateTime = DateTime.UtcNow; var statuses = new List { OrchestrationStatus.Completed }; - // Partial purge with a very short timeout — may not finish all instances - PurgeHistoryResult firstResult = await clients[0].PurgeInstanceHistoryByTimePeriodPartial( - startDateTime, endDateTime, statuses, timeout: TimeSpan.FromMilliseconds(1)); - - // With such a tiny timeout, IsComplete should be non-null (timeout was specified) - Assert.IsNotNull(firstResult.IsComplete, "IsComplete should be non-null when 
timeout is specified"); - - if (firstResult.IsComplete == false) - { - // More work to do — purge the rest with a generous timeout - PurgeHistoryResult secondResult = await clients[0].PurgeInstanceHistoryByTimePeriodPartial( - startDateTime, endDateTime, statuses, timeout: TimeSpan.FromMinutes(2)); - - Assert.AreEqual(true, secondResult.IsComplete, "Should complete with generous timeout"); - Assert.AreEqual(totalInstances, firstResult.InstancesDeleted + secondResult.InstancesDeleted, - "Total purged across both calls should equal total instances"); - } - else - { - // All instances purged in the first call (possible if storage is fast) - Assert.AreEqual(totalInstances, firstResult.InstancesDeleted); - } - - await host.StopAsync(); - } - } - - [TestMethod] - public async Task PartialPurge_GenerousTimeout_CompletesAll() - { - using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) - { - await host.StartAsync(); - DateTime startDateTime = DateTime.UtcNow; - - const int totalInstances = 5; - var clients = new List(); - for (int i = 0; i < totalInstances; i++) - { - string instanceId = $"full-purge-{Guid.NewGuid():N}"; - TestOrchestrationClient client = await host.StartOrchestrationAsync( - typeof(Orchestrations.Factorial), 10, instanceId); - clients.Add(client); - } + // Purge should complete within the 30s built-in timeout for a small number of instances + await clients[0].PurgeInstanceHistoryByTimePeriod( + startDateTime, endDateTime, statuses); + // Verify all history is purged foreach (var client in clients) { - var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); - Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + List historyEvents = await client.GetOrchestrationHistoryAsync( + client.InstanceId); + Assert.AreEqual(0, historyEvents.Count, "History should be purged"); } - DateTime endDateTime = DateTime.UtcNow; - var statuses = new List { 
OrchestrationStatus.Completed }; - - // Generous timeout — should purge all instances and return complete - PurgeHistoryResult result = await clients[0].PurgeInstanceHistoryByTimePeriodPartial( - startDateTime, endDateTime, statuses, timeout: TimeSpan.FromMinutes(2)); - - Assert.AreEqual(totalInstances, result.InstancesDeleted); - Assert.AreEqual(true, result.IsComplete, "Should be complete with generous timeout"); - - await host.StopAsync(); - } - } - - [TestMethod] - public async Task PartialPurge_WithoutTimeout_ReturnsNullIsComplete() - { - // Validates backward compatibility: existing callers that don't use maxInstanceCount - // get IsComplete = null (unknown), preserving old behavior. - using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) - { - await host.StartAsync(); - DateTime startDateTime = DateTime.UtcNow; - - string instanceId = $"compat-purge-{Guid.NewGuid():N}"; - TestOrchestrationClient client = await host.StartOrchestrationAsync( - typeof(Orchestrations.Factorial), 10, instanceId); - await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); - - // Use the old API (no maxInstanceCount) — should still work exactly as before - await client.PurgeInstanceHistoryByTimePeriod( - startDateTime, DateTime.UtcNow, new List { OrchestrationStatus.Completed }); - - List historyEvents = await client.GetOrchestrationHistoryAsync(instanceId); - Assert.AreEqual(0, historyEvents.Count, "History should be purged"); - await host.StopAsync(); } } diff --git a/test/DurableTask.AzureStorage.Tests/TestOrchestrationClient.cs b/test/DurableTask.AzureStorage.Tests/TestOrchestrationClient.cs index 7f6048b4a..92d018eed 100644 --- a/test/DurableTask.AzureStorage.Tests/TestOrchestrationClient.cs +++ b/test/DurableTask.AzureStorage.Tests/TestOrchestrationClient.cs @@ -200,14 +200,6 @@ public Task PurgeInstanceHistoryByTimePeriod(DateTime createdTimeFrom, DateTime? 
return service.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus); } - public Task PurgeInstanceHistoryByTimePeriodPartial(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, TimeSpan timeout) - { - Trace.TraceInformation($"Purging history (partial, timeout {timeout}) from {createdTimeFrom} to {createdTimeTo}"); - - AzureStorageOrchestrationService service = (AzureStorageOrchestrationService)this.client.ServiceClient; - return service.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus, timeout); - } - public async Task> GetOrchestrationHistoryAsync(string instanceId) { Trace.TraceInformation($"Getting history for instance with id - {this.instanceId}"); From 0362969ca9212a792413afab21096ef9b1497e7f Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Wed, 18 Mar 2026 22:38:12 -0700 Subject: [PATCH 8/8] Add opt-in Timeout on PurgeInstanceFilter for partial purge - Add TimeSpan? 
Timeout property to PurgeInstanceFilter (opt-in, default null) - When null: unbounded purge, IsComplete=null (backward compat, no behavior change) - When set: CancellationToken-based timeout, IsComplete=true/false - Thread Timeout through IOrchestrationServicePurgeClient path - Zero breaking changes: existing callers unaffected --- .../AzureStorageOrchestrationService.cs | 5 +-- .../Tracking/AzureTableTrackingStore.cs | 32 +++++++++++-------- .../Tracking/ITrackingStore.cs | 3 +- .../Tracking/TrackingStoreBase.cs | 2 +- src/DurableTask.Core/PurgeInstanceFilter.cs | 9 ++++++ 5 files changed, 33 insertions(+), 18 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index c62748494..e26b45451 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -2040,10 +2040,11 @@ async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync /// async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter) { - PurgeHistoryResult storagePurgeHistoryResult = await this.PurgeInstanceHistoryAsync( + PurgeHistoryResult storagePurgeHistoryResult = await this.trackingStore.PurgeInstanceHistoryAsync( purgeInstanceFilter.CreatedTimeFrom, purgeInstanceFilter.CreatedTimeTo, - purgeInstanceFilter.RuntimeStatus); + purgeInstanceFilter.RuntimeStatus, + purgeInstanceFilter.Timeout); return storagePurgeHistoryResult.ToCorePurgeHistoryResult(); } #nullable enable diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 3c482ee92..b1c3791ab 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -557,6 +557,7 @@ async Task DeleteHistoryAsync( DateTime 
createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, + TimeSpan? timeout, CancellationToken cancellationToken) { var condition = OrchestrationInstanceStatusQueryCondition.Parse( @@ -572,14 +573,15 @@ async Task DeleteHistoryAsync( int instancesDeleted = 0; int rowsDeleted = 0; - // Enforce a 30-second timeout to prevent long-running purge operations that can - // cause gRPC deadline timeouts in the isolated worker model. The caller should - // check IsComplete and loop if more instances remain. - TimeSpan purgeTimeout = TimeSpan.FromSeconds(30); - - using CancellationTokenSource timeoutCts = new CancellationTokenSource(purgeTimeout); - using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token); - CancellationToken effectiveToken = linkedCts.Token; + // Create a timeout CTS if a timeout was specified. When no timeout is specified, + // the purge runs unbounded (original behavior). + using CancellationTokenSource timeoutCts = timeout.HasValue + ? new CancellationTokenSource(timeout.Value) + : null; + using CancellationTokenSource linkedCts = timeout.HasValue + ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token) + : null; + CancellationToken effectiveToken = linkedCts?.Token ?? cancellationToken; // Limit concurrent instance purges to avoid overwhelming storage with too many parallel operations. // Each instance purge internally spawns multiple parallel storage operations, so this should be @@ -617,7 +619,7 @@ async Task DeleteHistoryAsync( } } } - catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested) + catch (OperationCanceledException) when (timeoutCts != null && timeoutCts.IsCancellationRequested) { // Timeout reached — stop accepting new instances. 
timedOut = true; @@ -628,16 +630,17 @@ async Task DeleteHistoryAsync( { await Task.WhenAll(pendingTasks); } - catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested) + catch (OperationCanceledException) when (timeoutCts != null && timeoutCts.IsCancellationRequested) { // In-flight deletes were cancelled by the timeout — expected. timedOut = true; } // Determine completion status: - // - If the timeout fired, more instances may remain (isComplete = false). - // - If everything completed within the timeout, all instances were purged (isComplete = true). - bool? isComplete = !timedOut; + // - If a timeout was specified and fired, more instances may remain (isComplete = false). + // - If a timeout was specified and didn't fire, all instances were purged (isComplete = true). + // - If no timeout was specified, we purge everything (isComplete = null for backward compat). + bool? isComplete = timeout.HasValue ? !timedOut : (bool?)null; return new PurgeHistoryResult(storageRequests, instancesDeleted, rowsDeleted, isComplete); } @@ -729,6 +732,7 @@ public override async Task PurgeInstanceHistoryAsync( DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, + TimeSpan? 
timeout = null, CancellationToken cancellationToken = default) { Stopwatch stopwatch = Stopwatch.StartNew(); @@ -738,7 +742,7 @@ public override async Task PurgeInstanceHistoryAsync( status == OrchestrationStatus.Canceled || status == OrchestrationStatus.Failed).ToList(); - PurgeHistoryResult result = await this.DeleteHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatusList, cancellationToken); + PurgeHistoryResult result = await this.DeleteHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatusList, timeout, cancellationToken); this.settings.Logger.PurgeInstanceHistory( this.storageAccountName, diff --git a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs index a1fc52e9f..263f329f7 100644 --- a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs @@ -194,8 +194,9 @@ interface ITrackingStore /// Start creation time for querying instances for purging /// End creation time for querying instances for purging /// List of runtime status for querying instances for purging. Only Completed, Terminated, or Failed will be processed + /// Maximum time to spend purging. If null, all matching instances are purged with no time limit. /// The token to monitor for cancellation requests. The default value is . /// Class containing number of storage requests sent, along with instances and rows deleted/purged - Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, CancellationToken cancellationToken = default); + Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, TimeSpan? 
timeout = null, CancellationToken cancellationToken = default); } } diff --git a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs index d02a729c0..6eb14aeed 100644 --- a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs +++ b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs @@ -86,7 +86,7 @@ public virtual Task PurgeInstanceHistoryAsync(string instanc } /// - public virtual Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, CancellationToken cancellationToken = default) + public virtual Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, TimeSpan? timeout = null, CancellationToken cancellationToken = default) { throw new NotSupportedException(); } diff --git a/src/DurableTask.Core/PurgeInstanceFilter.cs b/src/DurableTask.Core/PurgeInstanceFilter.cs index 30f68a23f..dcad059e8 100644 --- a/src/DurableTask.Core/PurgeInstanceFilter.cs +++ b/src/DurableTask.Core/PurgeInstanceFilter.cs @@ -48,5 +48,14 @@ public PurgeInstanceFilter(DateTime createdTimeFrom, DateTime? createdTimeTo, IE /// The runtime status of the orchestrations to purge. /// public IEnumerable? RuntimeStatus { get; } + + /// + /// The maximum amount of time to spend purging instances in a single call. + /// If null (default), all matching instances are purged with no time limit. + /// When set, the purge stops accepting new instances after this duration elapses + /// and returns with IsComplete set to false. + /// In-flight instance deletions are cancelled when the timeout elapses. + /// + public TimeSpan? Timeout { get; set; } } } \ No newline at end of file