diff --git a/Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs b/Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs new file mode 100644 index 000000000..c87180758 --- /dev/null +++ b/Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs @@ -0,0 +1,303 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- +#nullable enable +namespace DurableTask.AzureStorage.Tests.Storage +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using Azure; + using Azure.Data.Tables; + using DurableTask.AzureStorage.Storage; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + + [TestClass] + public class TableDeleteBatchParallelTests + { + const string ConnectionString = "UseDevelopmentStorage=true"; + const string TableName = "TestTable"; + + [TestMethod] + public async Task DeleteBatchParallelAsync_EmptyBatch_ReturnsEmptyResults() + { + Table table = CreateTableWithMockedClient(out _, out _); + var entities = new List(); + + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); + + Assert.AreEqual(0, results.Responses.Count); + Assert.AreEqual(0, results.RequestCount); + } + + [TestMethod] + public async Task DeleteBatchParallelAsync_SingleBatch_SubmitsOneTransaction() + { + var entities = CreateTestEntities("pk", count: 50); + Table table = CreateTableWithMockedClient(out _, out Mock tableClient); + + tableClient + .Setup(t => t.SubmitTransactionAsync( + It.Is>(a => a.Count() == 50), + It.IsAny())) + .ReturnsAsync(CreateMockBatchResponse(50)); + + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); + + Assert.AreEqual(50, results.Responses.Count); + tableClient.Verify( + t => t.SubmitTransactionAsync(It.IsAny>(), It.IsAny()), + Times.Once); + } + + [TestMethod] + public async Task DeleteBatchParallelAsync_MultipleBatches_SplitsIntoChunksOf100() + { + var entities = CreateTestEntities("pk", count: 250); + Table table = CreateTableWithMockedClient(out _, out Mock tableClient); + + tableClient + .Setup(t => t.SubmitTransactionAsync( + It.IsAny>(), + It.IsAny())) + .ReturnsAsync((IEnumerable batch, CancellationToken _) => + CreateMockBatchResponse(batch.Count())); + + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); + + Assert.AreEqual(250, results.Responses.Count); + tableClient.Verify( + t => t.SubmitTransactionAsync(It.IsAny>(), It.IsAny()), + Times.Exactly(3)); + } + + [TestMethod] + public async Task DeleteBatchParallelAsync_SubmitsBatchesConcurrently() + { + var entities = CreateTestEntities("pk", count: 500); // 5 batches of 100 + int concurrentCount = 0; + int maxConcurrent = 0; + + Table table = CreateTableWithMockedClient(out _, out Mock tableClient); + + tableClient + .Setup(t => t.SubmitTransactionAsync( + It.IsAny>(), + It.IsAny())) + .Returns(async (IEnumerable batch, CancellationToken _) => + { + int current = Interlocked.Increment(ref concurrentCount); + int snapshot; + do + { + snapshot = Volatile.Read(ref maxConcurrent); + } + while (current > snapshot && Interlocked.CompareExchange(ref maxConcurrent, current, snapshot) != snapshot); + + await Task.Delay(50); + Interlocked.Decrement(ref concurrentCount); + + return CreateMockBatchResponse(batch.Count()); + }); + + await table.DeleteBatchParallelAsync(entities); + + // All 5 batches should run concurrently since there's no internal semaphore + Assert.IsTrue( + maxConcurrent > 1, + $"Expected concurrent execution, but max concurrent was {maxConcurrent}"); + } + + [TestMethod] + public async Task DeleteBatchParallelAsync_BatchFails404_FallsBackToIndividualDeletes() + { + var entities = CreateTestEntities("pk", count: 3); + Table table = CreateTableWithMockedClient(out _, out Mock tableClient); + + tableClient + .Setup(t => t.SubmitTransactionAsync( + It.IsAny>(), + It.IsAny())) + .ThrowsAsync(new RequestFailedException(404, "Entity not found")); + + var mockResponse = new Mock(); + tableClient + .Setup(t => t.DeleteEntityAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(mockResponse.Object); + + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); + + Assert.AreEqual(3, results.Responses.Count); + tableClient.Verify( + t => t.DeleteEntityAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), + Times.Exactly(3)); + } + + [TestMethod] + public async Task DeleteBatchParallelAsync_IndividualDeleteSkips404() + { + var entities = CreateTestEntities("pk", count: 3); + Table table = CreateTableWithMockedClient(out _, out Mock tableClient); + + tableClient + .Setup(t => t.SubmitTransactionAsync( + It.IsAny>(), + It.IsAny())) + .ThrowsAsync(new RequestFailedException(404, "Entity not found")); + + int callCount = 0; + var mockResponse = new Mock(); + tableClient + .Setup(t => t.DeleteEntityAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns((string pk, string rk, ETag ifMatch, CancellationToken ct) => + { + int call = Interlocked.Increment(ref callCount); + if (call == 2) + { + throw new RequestFailedException(404, "Entity already deleted"); + } + return Task.FromResult(mockResponse.Object); + }); + + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); + + Assert.AreEqual(2, results.Responses.Count); + Assert.AreEqual(3, results.RequestCount); + } + + [TestMethod] + public async Task DeleteBatchParallelAsync_ExactlyOneBatch_NoBoundaryIssues() + { + var entities = CreateTestEntities("pk", count: 100); + Table table = CreateTableWithMockedClient(out _, out Mock tableClient); + + tableClient + .Setup(t => t.SubmitTransactionAsync( + It.Is>(a => a.Count() == 100), + It.IsAny())) + .ReturnsAsync(CreateMockBatchResponse(100)); + + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); + + Assert.AreEqual(100, results.Responses.Count); + tableClient.Verify( + t => t.SubmitTransactionAsync(It.IsAny>(), It.IsAny()), + Times.Once); + } + + [TestMethod] + public async Task DeleteBatchParallelAsync_101Entities_CreatesTwoBatches() + { + var entities = CreateTestEntities("pk", count: 101); + Table table = CreateTableWithMockedClient(out _, out Mock tableClient); + + tableClient + .Setup(t => t.SubmitTransactionAsync( + It.IsAny>(), + It.IsAny())) + .ReturnsAsync((IEnumerable batch, CancellationToken _) => + CreateMockBatchResponse(batch.Count())); + + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); + + Assert.AreEqual(101, results.Responses.Count); + tableClient.Verify( + t => t.SubmitTransactionAsync(It.IsAny>(), It.IsAny()), + Times.Exactly(2)); + } + + [TestMethod] + public async Task DeleteBatchParallelAsync_CancellationToken_IsPropagated() + { + var entities = CreateTestEntities("pk", count: 200); + using var cts = new CancellationTokenSource(); + Table table = CreateTableWithMockedClient(out _, out Mock tableClient); + + int batchesSubmitted = 0; + tableClient + .Setup(t => t.SubmitTransactionAsync( + It.IsAny>(), + It.IsAny())) + .Returns(async (IEnumerable batch, CancellationToken ct) => + { + int count = Interlocked.Increment(ref batchesSubmitted); + if (count == 1) + { + cts.Cancel(); + } + ct.ThrowIfCancellationRequested(); + return CreateMockBatchResponse(batch.Count()); + }); + + await Assert.ThrowsExceptionAsync( + () => table.DeleteBatchParallelAsync(entities, cts.Token)); + } + + #region Helper Methods + + static Table CreateTableWithMockedClient( + out Mock tableServiceClient, + out Mock tableClient) + { + var settings = new AzureStorageOrchestrationServiceSettings + { + StorageAccountClientProvider = new StorageAccountClientProvider(ConnectionString), + }; + + var azureStorageClient = new AzureStorageClient(settings); + + tableServiceClient = new Mock(MockBehavior.Strict, ConnectionString); + tableClient = new Mock(MockBehavior.Loose, ConnectionString, TableName); + tableClient.Setup(t => t.Name).Returns(TableName); + tableServiceClient.Setup(t => t.GetTableClient(TableName)).Returns(tableClient.Object); + + return new Table(azureStorageClient, tableServiceClient.Object, TableName); + } + + static List CreateTestEntities(string partitionKey, int count) + { + var entities = new List(count); + for (int i = 0; i < count; i++) + { + entities.Add(new TableEntity(partitionKey, $"rk_{i:D5}") + { + ETag = ETag.All, + }); + } + return entities; + } + + static Response> CreateMockBatchResponse(int count) + { + var responses = new List(); + for (int i = 0; i < count; i++) + { + responses.Add(new Mock().Object); + } + return Response.FromValue>(responses, new Mock().Object); + } + + #endregion + } +} diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 9ad814c43..e26b45451 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -2040,10 +2040,11 @@ async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync /// async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter) { - PurgeHistoryResult storagePurgeHistoryResult = await this.PurgeInstanceHistoryAsync( - purgeInstanceFilter.CreatedTimeFrom, - purgeInstanceFilter.CreatedTimeTo, - purgeInstanceFilter.RuntimeStatus); + PurgeHistoryResult storagePurgeHistoryResult = await this.trackingStore.PurgeInstanceHistoryAsync( + purgeInstanceFilter.CreatedTimeFrom, + purgeInstanceFilter.CreatedTimeTo, + purgeInstanceFilter.RuntimeStatus, + purgeInstanceFilter.Timeout); return storagePurgeHistoryResult.ToCorePurgeHistoryResult(); } #nullable enable diff --git a/src/DurableTask.AzureStorage/MessageManager.cs b/src/DurableTask.AzureStorage/MessageManager.cs index ff48d507f..48f46ffac 100644 --- a/src/DurableTask.AzureStorage/MessageManager.cs +++ b/src/DurableTask.AzureStorage/MessageManager.cs @@ -318,8 +318,9 @@ public string GetNewLargeMessageBlobName(MessageData message) public async Task DeleteLargeMessageBlobs(string sanitizedInstanceId, CancellationToken cancellationToken = default) { - int storageOperationCount = 1; - if (await this.blobContainer.ExistsAsync(cancellationToken)) + int storageOperationCount = 0; + + try { await foreach (Page page in this.blobContainer.ListBlobsAsync(sanitizedInstanceId, cancellationToken).AsPages()) { @@ -329,6 +330,22 @@ public async Task DeleteLargeMessageBlobs(string sanitizedInstanceId, Cance storageOperationCount += page.Values.Count; } + + // Count the list operation even if no blobs found (the initial list request still happened) + if (storageOperationCount == 0) + { + storageOperationCount = 1; + } + } + catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == 404) + { + // Container does not exist; nothing to delete. + storageOperationCount = 1; + } + catch (Azure.RequestFailedException ex) when (ex.Status == 404) + { + // Container does not exist; nothing to delete. + storageOperationCount = 1; } return storageOperationCount; diff --git a/src/DurableTask.AzureStorage/PurgeHistoryResult.cs b/src/DurableTask.AzureStorage/PurgeHistoryResult.cs index d42ce9d45..3ed7b3e18 100644 --- a/src/DurableTask.AzureStorage/PurgeHistoryResult.cs +++ b/src/DurableTask.AzureStorage/PurgeHistoryResult.cs @@ -33,6 +33,19 @@ public PurgeHistoryResult(int storageRequests, int instancesDeleted, int rowsDel this.RowsDeleted = rowsDeleted; } + /// + /// Constructor for purge history statistics with completion status. + /// + /// Requests sent to storage + /// Number of instances deleted + /// Number of rows deleted + /// Whether the purge operation completed all matching instances. + public PurgeHistoryResult(int storageRequests, int instancesDeleted, int rowsDeleted, bool? isComplete) + : this(storageRequests, instancesDeleted, rowsDeleted) + { + this.IsComplete = isComplete; + } + /// /// Number of requests sent to Storage during this execution of purge history /// @@ -48,12 +61,20 @@ public PurgeHistoryResult(int storageRequests, int instancesDeleted, int rowsDel /// public int RowsDeleted { get; } + /// + /// Gets a value indicating whether the purge operation is complete. + /// true if all matching instances were purged; + /// false if more instances remain and purge should be called again; + /// null if completion status is unknown. + /// + public bool? IsComplete { get; } + /// /// Converts from AzureStorage.PurgeHistoryResult to Core.PurgeResult type /// public PurgeResult ToCorePurgeHistoryResult() { - return new PurgeResult(this.InstancesDeleted); + return new PurgeResult(this.InstancesDeleted, this.IsComplete); } } } diff --git a/src/DurableTask.AzureStorage/Storage/Table.cs b/src/DurableTask.AzureStorage/Storage/Table.cs index 7d726e1d7..33542c64e 100644 --- a/src/DurableTask.AzureStorage/Storage/Table.cs +++ b/src/DurableTask.AzureStorage/Storage/Table.cs @@ -117,6 +117,114 @@ public async Task DeleteBatchAsync(IEnumerable en return await this.ExecuteBatchAsync(entityBatch, item => new TableTransactionAction(TableTransactionActionType.Delete, item), cancellationToken: cancellationToken); } + /// + /// Deletes entities in parallel batches of up to 100. Each batch is an atomic transaction, + /// but multiple batches are submitted concurrently for improved throughput. + /// Concurrency is controlled by the global . + /// If a batch fails because an entity was already deleted (404/EntityNotFound), + /// it falls back to individual deletes for that batch, skipping already-deleted entities. + /// + public async Task DeleteBatchParallelAsync( + IReadOnlyList entityBatch, + CancellationToken cancellationToken = default) where T : ITableEntity + { + if (entityBatch.Count == 0) + { + return new TableTransactionResults(Array.Empty(), TimeSpan.Zero, 0); + } + + const int batchSize = 100; + int chunkCount = (entityBatch.Count + batchSize - 1) / batchSize; + var chunks = new List>(chunkCount); + + var currentChunk = new List(batchSize); + foreach (T entity in entityBatch) + { + currentChunk.Add(new TableTransactionAction(TableTransactionActionType.Delete, entity)); + if (currentChunk.Count == batchSize) + { + chunks.Add(currentChunk); + currentChunk = new List(batchSize); + } + } + + if (currentChunk.Count > 0) + { + chunks.Add(currentChunk); + } + + var resultsBuilder = new TableTransactionResultsBuilder(); + + TableTransactionResults[] allResults = await Task.WhenAll( + chunks.Select(chunk => this.ExecuteBatchWithFallbackAsync(chunk, cancellationToken))); + + foreach (TableTransactionResults result in allResults) + { + resultsBuilder.Add(result); + } + + return resultsBuilder.ToResults(); + } + + /// + /// Executes a batch transaction. If it fails due to an entity not found (404), + /// falls back to individual delete operations, skipping entities that are already gone. + /// + async Task ExecuteBatchWithFallbackAsync( + List batch, + CancellationToken cancellationToken) + { + try + { + return await this.ExecuteBatchAsync(batch, cancellationToken); + } + catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == 404) + { + // One or more entities in the batch were already deleted. + // Fall back to individual deletes, skipping 404s. + return await this.DeleteEntitiesIndividuallyAsync(batch, cancellationToken); + } + catch (RequestFailedException ex) when (ex.Status == 404) + { + return await this.DeleteEntitiesIndividuallyAsync(batch, cancellationToken); + } + } + + async Task DeleteEntitiesIndividuallyAsync( + List batch, + CancellationToken cancellationToken) + { + var responses = new List(); + var stopwatch = Stopwatch.StartNew(); + int requestCount = 0; + + foreach (TableTransactionAction action in batch) + { + requestCount++; + try + { + Response response = await this.tableClient.DeleteEntityAsync( + action.Entity.PartitionKey, + action.Entity.RowKey, + ETag.All, + cancellationToken).DecorateFailure(); + responses.Add(response); + this.stats.TableEntitiesWritten.Increment(); + } + catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == 404) + { + // Entity already deleted; skip. + } + catch (RequestFailedException ex) when (ex.Status == 404) + { + // Entity already deleted; skip. + } + } + + stopwatch.Stop(); + return new TableTransactionResults(responses, stopwatch.Elapsed, requestCount); + } + public async Task InsertOrMergeBatchAsync(IEnumerable entityBatch, CancellationToken cancellationToken = default) where T : ITableEntity { TableTransactionResults results = await this.ExecuteBatchAsync(entityBatch, item => new TableTransactionAction(TableTransactionActionType.UpsertMerge, item), cancellationToken: cancellationToken); diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index f1e67a0bf..b1c3791ab 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -557,6 +557,7 @@ async Task DeleteHistoryAsync( DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, + TimeSpan? timeout, CancellationToken cancellationToken) { var condition = OrchestrationInstanceStatusQueryCondition.Parse( @@ -568,26 +569,80 @@ async Task DeleteHistoryAsync( ODataCondition odata = condition.ToOData(); - // Limit to batches of 100 to avoid excessive memory usage and table storage scanning int storageRequests = 0; int instancesDeleted = 0; int rowsDeleted = 0; - var options = new ParallelOptions { MaxDegreeOfParallelism = this.settings.MaxStorageOperationConcurrency }; - AsyncPageable entitiesPageable = this.InstancesTable.ExecuteQueryAsync(odata.Filter, select: odata.Select, cancellationToken: cancellationToken); - await foreach (Page page in entitiesPageable.AsPages(pageSizeHint: 100)) + // Create a timeout CTS if a timeout was specified. When no timeout is specified, + // the purge runs unbounded (original behavior). + using CancellationTokenSource timeoutCts = timeout.HasValue + ? new CancellationTokenSource(timeout.Value) + : null; + using CancellationTokenSource linkedCts = timeout.HasValue + ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token) + : null; + CancellationToken effectiveToken = linkedCts?.Token ?? cancellationToken; + + // Limit concurrent instance purges to avoid overwhelming storage with too many parallel operations. + // Each instance purge internally spawns multiple parallel storage operations, so this should be + // kept moderate. Using 100 to match the original implicit concurrency from pageSizeHint. + const int MaxPurgeInstanceConcurrency = 100; + var throttle = new SemaphoreSlim(MaxPurgeInstanceConcurrency); + var pendingTasks = new List(); + + bool timedOut = false; + + try { - // The underlying client throttles - await Task.WhenAll(page.Values.Select(async instance => + AsyncPageable entitiesPageable = this.InstancesTable.ExecuteQueryAsync(odata.Filter, select: odata.Select, cancellationToken: effectiveToken); + await foreach (Page page in entitiesPageable.AsPages(pageSizeHint: MaxPurgeInstanceConcurrency)) { - PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, cancellationToken); - Interlocked.Add(ref instancesDeleted, statisticsFromDeletion.InstancesDeleted); - Interlocked.Add(ref storageRequests, statisticsFromDeletion.RowsDeleted); - Interlocked.Add(ref rowsDeleted, statisticsFromDeletion.RowsDeleted); - })); + foreach (OrchestrationInstanceStatus instance in page.Values) + { + effectiveToken.ThrowIfCancellationRequested(); + + await throttle.WaitAsync(effectiveToken); + pendingTasks.Add(Task.Run(async () => + { + try + { + PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, effectiveToken); + Interlocked.Add(ref instancesDeleted, statisticsFromDeletion.InstancesDeleted); + Interlocked.Add(ref storageRequests, statisticsFromDeletion.StorageRequests); + Interlocked.Add(ref rowsDeleted, statisticsFromDeletion.RowsDeleted); + } + finally + { + throttle.Release(); + } + })); + } + } + } + catch (OperationCanceledException) when (timeoutCts != null && timeoutCts.IsCancellationRequested) + { + // Timeout reached — stop accepting new instances. + timedOut = true; + } + + // Wait for all dispatched deletions to finish or be cancelled by the timeout. + try + { + await Task.WhenAll(pendingTasks); } + catch (OperationCanceledException) when (timeoutCts != null && timeoutCts.IsCancellationRequested) + { + // In-flight deletes were cancelled by the timeout — expected. + timedOut = true; + } + + // Determine completion status: + // - If a timeout was specified and fired, more instances may remain (isComplete = false). + // - If a timeout was specified and didn't fire, all instances were purged (isComplete = true). + // - If no timeout was specified, we purge everything (isComplete = null for backward compat). + bool? isComplete = timeout.HasValue ? !timedOut : (bool?)null; - return new PurgeHistoryResult(storageRequests, instancesDeleted, rowsDeleted); + return new PurgeHistoryResult(storageRequests, instancesDeleted, rowsDeleted, isComplete); } async Task DeleteAllDataForOrchestrationInstance(OrchestrationInstanceStatus orchestrationInstanceStatus, CancellationToken cancellationToken) @@ -618,7 +673,7 @@ async Task DeleteAllDataForOrchestrationInstance(Orchestrati }), Task.Run(async () => { - var deletedEntitiesResponseInfo = await this.HistoryTable.DeleteBatchAsync(historyEntities, cancellationToken); + var deletedEntitiesResponseInfo = await this.HistoryTable.DeleteBatchParallelAsync(historyEntities, cancellationToken); Interlocked.Add(ref rowsDeleted, deletedEntitiesResponseInfo.Responses.Count); Interlocked.Add(ref storageRequests, deletedEntitiesResponseInfo.RequestCount); }), @@ -677,6 +732,7 @@ public override async Task PurgeInstanceHistoryAsync( DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, + TimeSpan? timeout = null, CancellationToken cancellationToken = default) { Stopwatch stopwatch = Stopwatch.StartNew(); @@ -686,7 +742,7 @@ public override async Task PurgeInstanceHistoryAsync( status == OrchestrationStatus.Canceled || status == OrchestrationStatus.Failed).ToList(); - PurgeHistoryResult result = await this.DeleteHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatusList, cancellationToken); + PurgeHistoryResult result = await this.DeleteHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatusList, timeout, cancellationToken); this.settings.Logger.PurgeInstanceHistory( this.storageAccountName, diff --git a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs index a1fc52e9f..263f329f7 100644 --- a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs @@ -194,8 +194,9 @@ interface ITrackingStore /// Start creation time for querying instances for purging /// End creation time for querying instances for purging /// List of runtime status for querying instances for purging. Only Completed, Terminated, or Failed will be processed + /// Maximum time to spend purging. If null, all matching instances are purged with no time limit. /// The token to monitor for cancellation requests. The default value is . /// Class containing number of storage requests sent, along with instances and rows deleted/purged - Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, CancellationToken cancellationToken = default); + Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, TimeSpan? timeout = null, CancellationToken cancellationToken = default); } } diff --git a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs index d02a729c0..6eb14aeed 100644 --- a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs +++ b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs @@ -86,7 +86,7 @@ public virtual Task PurgeInstanceHistoryAsync(string instanc } /// - public virtual Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, CancellationToken cancellationToken = default) + public virtual Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, TimeSpan? timeout = null, CancellationToken cancellationToken = default) { throw new NotSupportedException(); } diff --git a/src/DurableTask.Core/PurgeInstanceFilter.cs b/src/DurableTask.Core/PurgeInstanceFilter.cs index 30f68a23f..dcad059e8 100644 --- a/src/DurableTask.Core/PurgeInstanceFilter.cs +++ b/src/DurableTask.Core/PurgeInstanceFilter.cs @@ -48,5 +48,14 @@ public PurgeInstanceFilter(DateTime createdTimeFrom, DateTime? createdTimeTo, IE /// The runtime status of the orchestrations to purge. /// public IEnumerable? RuntimeStatus { get; } + + /// + /// The maximum amount of time to spend purging instances in a single call. + /// If null (default), all matching instances are purged with no time limit. + /// When set, the purge stops accepting new instances after this duration elapses + /// and returns with set to false. + /// Already-started instance deletions will complete before the method returns. + /// + public TimeSpan? Timeout { get; set; } } } \ No newline at end of file diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index 3a48ee5ff..cc3067233 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -561,6 +561,205 @@ private async Task GetBlobCount(string containerName, string directoryName) } + [TestMethod] + public async Task PurgeMultipleInstancesHistoryByTimePeriod_ScalabilityValidation() + { + // This test validates scale improvements: parallel batch delete and pipelined page processing. + // Runs multiple concurrent orchestrations, then purges all of them by time period. + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + await host.StartAsync(); + DateTime startDateTime = DateTime.Now; + + // Create multiple orchestration instances concurrently + const int instanceCount = 5; + var clients = new List(); + var instanceIds = new List(); + + for (int i = 0; i < instanceCount; i++) + { + string instanceId = $"purge-scale-{Guid.NewGuid():N}"; + instanceIds.Add(instanceId); + TestOrchestrationClient client = await host.StartOrchestrationAsync( + typeof(Orchestrations.Factorial), 10, instanceId); + clients.Add(client); + } + + // Wait for all orchestrations to complete + foreach (var client in clients) + { + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + } + + // Verify all instances have history + foreach (string instanceId in instanceIds) + { + List historyEvents = await clients[0].GetOrchestrationHistoryAsync(instanceId); + Assert.IsTrue(historyEvents.Count > 0, $"Instance {instanceId} should have history events"); + } + + // Purge all instances by time period + await clients[0].PurgeInstanceHistoryByTimePeriod( + startDateTime, + DateTime.UtcNow, + new List { OrchestrationStatus.Completed }); + + // Verify all history is purged + foreach (string instanceId in instanceIds) + { + List historyEvents = await clients[0].GetOrchestrationHistoryAsync(instanceId); + Assert.AreEqual(0, historyEvents.Count, $"Instance {instanceId} should have no history after purge"); + + IList stateList = await clients[0].GetStateAsync(instanceId); + Assert.AreEqual(1, stateList.Count); + Assert.IsNull(stateList[0], $"Instance {instanceId} state should be null after purge"); + } + + await host.StopAsync(); + } + } + + [TestMethod] + public async Task PurgeSingleInstanceWithIdempotency() + { + // This test validates that purging the same instance twice doesn't cause errors + // (testing the idempotent batch delete fallback). + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + string instanceId = Guid.NewGuid().ToString(); + await host.StartAsync(); + TestOrchestrationClient client = await host.StartOrchestrationAsync( + typeof(Orchestrations.Factorial), 110, instanceId); + await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + + // First purge should succeed + await client.PurgeInstanceHistory(); + + List historyEvents = await client.GetOrchestrationHistoryAsync(instanceId); + Assert.AreEqual(0, historyEvents.Count); + + // Second purge of the same instance should not throw + // (the instance row is already gone, so PurgeInstanceHistoryAsync returns 0) + await client.PurgeInstanceHistory(); + + await host.StopAsync(); + } + } + + [TestMethod] + public async Task PurgeSingleInstance_WithLargeBlobs_CleansUpBlobs() + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + await host.StartAsync(); + + string instanceId = Guid.NewGuid().ToString(); + // Generate a payload large enough to be stored as a blob (>60KB threshold) + string largeMessage = new string('x', 70 * 1024); + + TestOrchestrationClient client = await host.StartOrchestrationAsync( + typeof(Orchestrations.Echo), largeMessage, instanceId); + OrchestrationState status = await client.WaitForCompletionAsync(TimeSpan.FromMinutes(2)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + + // Verify blobs exist before purge + int blobCount = await this.GetBlobCount("test-largemessages", instanceId); + Assert.IsTrue(blobCount > 0, "Should have large message blobs before purge"); + + // Purge + await client.PurgeInstanceHistory(); + + // Verify blobs are cleaned up + blobCount = await this.GetBlobCount("test-largemessages", instanceId); + Assert.AreEqual(0, blobCount, "All large message blobs should be deleted after purge"); + + // Verify history is gone + List historyEvents = await client.GetOrchestrationHistoryAsync(instanceId); + Assert.AreEqual(0, historyEvents.Count); + + await host.StopAsync(); + } + } + + [TestMethod] + public async Task PurgeInstance_WithManyHistoryRows_DeletesAll() + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + await host.StartAsync(); + + string instanceId = Guid.NewGuid().ToString(); + // FanOutFanIn with 50 parallel activities creates 100+ history rows + TestOrchestrationClient client = await host.StartOrchestrationAsync( + typeof(Orchestrations.FanOutFanIn), 50, instanceId); + OrchestrationState status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + + // Verify lots of history exists + List historyEvents = await client.GetOrchestrationHistoryAsync(instanceId); + Assert.IsTrue(historyEvents.Count > 50, $"Expected many history events, got {historyEvents.Count}"); + + // Purge + await client.PurgeInstanceHistory(); + + // Verify clean + historyEvents = await client.GetOrchestrationHistoryAsync(instanceId); + Assert.AreEqual(0, historyEvents.Count); + + IList stateList = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, stateList.Count); + Assert.IsNull(stateList[0]); + + await host.StopAsync(); + } + } + + [TestMethod] + public async Task PurgeReturnsIsComplete() + { + // Validates that purge returns IsComplete = true when all instances are purged + // within the built-in 30-second timeout. + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + await host.StartAsync(); + DateTime startDateTime = DateTime.UtcNow; + + const int totalInstances = 5; + var clients = new List(); + for (int i = 0; i < totalInstances; i++) + { + string instanceId = $"purge-complete-{Guid.NewGuid():N}"; + TestOrchestrationClient client = await host.StartOrchestrationAsync( + typeof(Orchestrations.Factorial), 10, instanceId); + clients.Add(client); + } + + foreach (var client in clients) + { + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + } + + DateTime endDateTime = DateTime.UtcNow; + var statuses = new List { OrchestrationStatus.Completed }; + + // Purge should complete within the 30s built-in timeout for a small number of instances + await clients[0].PurgeInstanceHistoryByTimePeriod( + startDateTime, endDateTime, statuses); + + // Verify all history is purged + foreach (var client in clients) + { + List historyEvents = await client.GetOrchestrationHistoryAsync( + client.InstanceId); + Assert.AreEqual(0, historyEvents.Count, "History should be purged"); + } + + await host.StopAsync(); + } + } + [TestMethod] public async Task PurgeInstanceHistoryForTimePeriodDeletePartially() {