Enhance purge with parallel batch deletes and partial purge timeout #1321
YunchuWang wants to merge 9 commits into `main`
Conversation
- Add `TimeSpan? Timeout` to `PurgeInstanceFilter` for partial purge support
- Add `bool? IsComplete` to `PurgeHistoryResult` to indicate completion status
- Add new `PurgeInstanceHistoryAsync` overload with `TimeSpan timeout` parameter
- Use CancellationToken-based timeout (linked CTS) in `DeleteHistoryAsync`
- Already-dispatched deletions complete before returning partial results
- Backward compatible: no timeout = original behavior (`IsComplete = null`)
- Forward `IsComplete` through `ToCorePurgeHistoryResult` to `PurgeResult`
- Add scenario tests for partial purge timeout, generous timeout, and compat
- Always cap timeout to 30s max, even if not specified or exceeding 30s
- Pass `effectiveToken` into `DeleteAllDataForOrchestrationInstance` so in-flight deletes are also cancelled on timeout
- Catch `OperationCanceledException` from `Task.WhenAll` for timed-out in-flight deletes
- External `cancellationToken` cancellation still propagates normally
Pull request overview
Improves purge scalability and robustness for DurableTask’s Azure Storage backend by adding parallelized table batch deletes, optional timeout-based partial purging, better 404/idempotency handling, and expanded test coverage.
Changes:
- Add optional purge timeout (`PurgeInstanceFilter.Timeout`) and propagate completion status via `IsComplete` into core `PurgeResult`.
- Implement parallel table batch deletion with 404 fallback to per-entity deletes.
- Add scenario + unit tests for partial purge behavior, blob cleanup, and parallel batch delete behavior.
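For orientation, the parallel batch delete with 404 fallback described above could be sketched roughly as follows. This is a simplified illustration, not the PR's actual code: the method name, the chunk size of 100 (Azure Tables' per-transaction operation limit), and the exception handling details are assumptions.

```csharp
// Sketch only: chunk deletes into <=100-entity transactions, submit them in
// parallel, and fall back to per-entity deletes when a batch fails with 404
// because a concurrent purge already removed some rows (idempotency).
async Task DeleteBatchParallelSketchAsync(
    TableClient table, IReadOnlyList<ITableEntity> entities, CancellationToken cancellationToken)
{
    IEnumerable<ITableEntity[]> chunks = entities
        .Select((entity, index) => (entity, index))
        .GroupBy(x => x.index / 100)            // Azure Table transactions cap at 100 operations
        .Select(g => g.Select(x => x.entity).ToArray());

    await Task.WhenAll(chunks.Select(async chunk =>
    {
        try
        {
            await table.SubmitTransactionAsync(
                chunk.Select(e => new TableTransactionAction(TableTransactionActionType.Delete, e)),
                cancellationToken);
        }
        catch (TableTransactionFailedException ex) when (ex.Status == 404)
        {
            // Fallback: delete individually, ignoring rows that are already gone.
            foreach (ITableEntity entity in chunk)
            {
                try
                {
                    await table.DeleteEntityAsync(
                        entity.PartitionKey, entity.RowKey, cancellationToken: cancellationToken);
                }
                catch (RequestFailedException rfe) when (rfe.Status == 404)
                {
                    // Already deleted by a concurrent purge; treat as success.
                }
            }
        }
    }));
}
```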
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| test/DurableTask.AzureStorage.Tests/TestOrchestrationClient.cs | Adds helper API to invoke the new timed purge overload in tests. |
| test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs | Adds new purge/partial-purge scenario tests and validation for large-message blob cleanup. |
| src/DurableTask.Core/PurgeInstanceFilter.cs | Introduces optional Timeout for partial purge. |
| src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs | Extends purge-by-time signature to include an optional timeout. |
| src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs | Extends tracking store purge API contract to include optional timeout. |
| src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs | Implements timeout-aware, parallel purge-by-time behavior and uses parallel batch delete. |
| src/DurableTask.AzureStorage/Storage/Table.cs | Adds DeleteBatchParallelAsync with transactional chunking and 404 fallback. |
| src/DurableTask.AzureStorage/PurgeHistoryResult.cs | Adds IsComplete and forwards it to core PurgeResult. |
| src/DurableTask.AzureStorage/MessageManager.cs | Improves 404 handling for large-message blob deletion by relying on list/delete with exception handling. |
| src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs | Adds timed purge overload and wires PurgeInstanceFilter.Timeout into the call path. |
| Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs | Adds unit tests validating parallel batch delete chunking, fallback, and cancellation behavior. |
```csharp
// Cap the timeout to a maximum of 30 seconds. If no timeout is specified
// or the specified timeout exceeds 30s, default to 30s to prevent long-running purges.
TimeSpan maxTimeout = TimeSpan.FromSeconds(30);
TimeSpan effectiveTimeout = timeout.HasValue && timeout.Value <= maxTimeout
    ? timeout.Value
    : maxTimeout;

using CancellationTokenSource timeoutCts = new CancellationTokenSource(effectiveTimeout);
using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token);
CancellationToken effectiveToken = linkedCts.Token;
```
```csharp
{
    try
    {
        PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, effectiveToken);
```
```csharp
TableTransactionResults[] allResults = await Task.WhenAll(
    chunks.Select(chunk => this.ExecuteBatchWithFallbackAsync(chunk, cancellationToken)));

foreach (TableTransactionResults result in allResults)
{
    resultsBuilder.Add(result);
```
```csharp
// Purge all instances by time period
await clients[0].PurgeInstanceHistoryByTimePeriod(
    startDateTime,
    DateTime.UtcNow,
```
```csharp
public async Task PartialPurge_WithoutTimeout_ReturnsNullIsComplete()
{
    // Validates backward compatibility: existing callers that don't use maxInstanceCount
    // get IsComplete = null (unknown), preserving old behavior.
    using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false))
    {
        await host.StartAsync();
        DateTime startDateTime = DateTime.UtcNow;

        string instanceId = $"compat-purge-{Guid.NewGuid():N}";
        TestOrchestrationClient client = await host.StartOrchestrationAsync(
            typeof(Orchestrations.Factorial), 10, instanceId);
        await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60));

        // Use the old API (no maxInstanceCount) — should still work exactly as before
        await client.PurgeInstanceHistoryByTimePeriod(
            startDateTime, DateTime.UtcNow, new List<OrchestrationStatus> { OrchestrationStatus.Completed });
```
```csharp
/// <param name="createdTimeFrom">CreatedTime of orchestrations. Purges history greater than this value.</param>
/// <param name="createdTimeTo">CreatedTime of orchestrations. Purges history less than this value.</param>
/// <param name="runtimeStatus">RuntimeStatus of orchestrations. You can specify several statuses.</param>
/// <param name="timeout">Maximum time to spend purging. Already-started deletions will complete before the method returns.</param>
```
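For illustration, a caller-side sketch of the timed overload documented above. The parameter names follow the doc comment; the exact signature and result type in the PR may differ slightly.

```csharp
// Sketch: purge in bounded slices until the backend reports completion.
// Assumes the overload returns a PurgeHistoryResult carrying IsComplete.
PurgeHistoryResult result;
do
{
    result = await service.PurgeInstanceHistoryAsync(
        createdTimeFrom: DateTime.UtcNow.AddDays(-30),
        createdTimeTo: DateTime.UtcNow,
        runtimeStatus: new[] { OrchestrationStatus.Completed },
        timeout: TimeSpan.FromSeconds(25));
}
while (result.IsComplete == false); // null means "unknown" (no timeout was applied)
```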
```csharp
        pendingTasks.Add(Task.Run(async () =>
        {
            try
            {
                PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, effectiveToken);
                Interlocked.Add(ref instancesDeleted, statisticsFromDeletion.InstancesDeleted);
                Interlocked.Add(ref storageRequests, statisticsFromDeletion.StorageRequests);
                Interlocked.Add(ref rowsDeleted, statisticsFromDeletion.RowsDeleted);
            }
            finally
            {
                throttle.Release();
            }
        }));
    }
}
```
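For context, the acquire side that pairs with the `throttle.Release()` in the hunk above would look roughly like this. This is a sketch: the surrounding loop is not shown in the diff, so the instance enumeration and variable names are assumptions.

```csharp
// Sketch of the full throttling pattern around the hunk above: acquire a
// semaphore slot per instance, fan out deletions, then await them all.
var throttle = new SemaphoreSlim(MaxPurgeInstanceConcurrency);
var pendingTasks = new List<Task>();

foreach (OrchestrationInstance instance in instances)
{
    // Blocks when MaxPurgeInstanceConcurrency deletions are already in flight.
    await throttle.WaitAsync(effectiveToken);
    pendingTasks.Add(Task.Run(async () =>
    {
        try
        {
            await this.DeleteAllDataForOrchestrationInstance(instance, effectiveToken);
        }
        finally
        {
            throttle.Release(); // always free the slot, even on failure or cancellation
        }
    }));
}

await Task.WhenAll(pendingTasks);
```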
- Hard-code 30s CancellationToken-based timeout in `DeleteHistoryAsync`
- Remove configurable `Timeout` from `PurgeInstanceFilter` (not needed)
- Remove timeout overload from `AzureStorageOrchestrationService`
- `IsComplete = true` when all purged within 30s, `false` when timed out
- Callers loop until `IsComplete = true` for large-scale purge
- Add `TimeSpan? Timeout` property to `PurgeInstanceFilter` (opt-in, default null)
- When null: unbounded purge, `IsComplete = null` (backward compat, no behavior change)
- When set: CancellationToken-based timeout, `IsComplete = true/false`
- Thread `Timeout` through the `IOrchestrationServicePurgeClient` path
- Zero breaking changes: existing callers unaffected
Pull request overview
This PR enhances the Azure Storage purge pipeline to improve throughput and reliability for large purges by introducing parallelized batch deletes, a timeout-driven partial purge mechanism, and forwarding completion status back to the core purge result shape.
Changes:
- Added `PurgeInstanceFilter.Timeout` and plumbed timeout support into Azure Storage tracking-store purging.
- Implemented `Table.DeleteBatchParallelAsync` with 404/idempotency fallback and updated purge to use it.
- Added/updated purge-related tests and extended purge result types to carry `IsComplete`.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs | Adds new purge scenario tests for scalability/idempotency/large-blob cleanup and a test intended to validate completion semantics. |
| src/DurableTask.Core/PurgeInstanceFilter.cs | Adds Timeout option to the core purge filter contract. |
| src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs | Extends time-range purge signature to accept optional timeout. |
| src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs | Extends tracking store purge API with an optional timeout parameter. |
| src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs | Implements timeout-aware, parallel instance purging and returns IsComplete based on timeout. |
| src/DurableTask.AzureStorage/Storage/Table.cs | Adds DeleteBatchParallelAsync with parallel transactions and 404 fallback to individual deletes. |
| src/DurableTask.AzureStorage/PurgeHistoryResult.cs | Adds IsComplete to AzureStorage purge result and forwards it to DurableTask.Core.PurgeResult. |
| src/DurableTask.AzureStorage/MessageManager.cs | Improves 404 handling for large message blob cleanup by relying on try/catch rather than container existence checks. |
| src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs | Wires PurgeInstanceFilter.Timeout into the tracking-store purge path used by IOrchestrationServicePurgeClient. |
| Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs | Adds unit tests for DeleteBatchParallelAsync (but currently placed outside the referenced test project directory). |
```csharp
using CancellationTokenSource timeoutCts = timeout.HasValue
    ? new CancellationTokenSource(timeout.Value)
    : null;
using CancellationTokenSource linkedCts = timeout.HasValue
    ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token)
    : null;
```
```csharp
{
    try
    {
        PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, effectiveToken);
```
```csharp
var throttle = new SemaphoreSlim(MaxPurgeInstanceConcurrency);
var pendingTasks = new List<Task>();
```
```csharp
/// The maximum amount of time to spend purging instances in a single call.
/// If <c>null</c> (default), all matching instances are purged with no time limit.
/// When set, the purge stops accepting new instances after this duration elapses
/// and returns with <see cref="PurgeResult.IsComplete"/> set to <c>false</c>.
/// Already-started instance deletions will complete before the method returns.
```
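Based on that doc comment, opting in would presumably look like the following sketch. The `PurgeInstanceFilter` constructor arguments shown are illustrative; only the `Timeout` property is what this PR adds.

```csharp
// Sketch: opt in to partial purge by setting the optional Timeout.
var filter = new PurgeInstanceFilter(
    createdTimeFrom: DateTime.UtcNow.AddDays(-30),
    createdTimeTo: DateTime.UtcNow,
    runtimeStatus: new List<OrchestrationStatus> { OrchestrationStatus.Completed })
{
    Timeout = TimeSpan.FromSeconds(25), // stop accepting new instances after 25s
};
```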
```csharp
// Purge all instances by time period
await clients[0].PurgeInstanceHistoryByTimePeriod(
    startDateTime,
```
```csharp
[TestMethod]
public async Task PurgeReturnsIsComplete()
{
    // Validates that purge returns IsComplete = true when all instances are purged
    // within the built-in 30-second timeout.
    using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false))
    {
        await host.StartAsync();
        DateTime startDateTime = DateTime.UtcNow;

        const int totalInstances = 5;
        var clients = new List<TestOrchestrationClient>();
        for (int i = 0; i < totalInstances; i++)
        {
            string instanceId = $"purge-complete-{Guid.NewGuid():N}";
            TestOrchestrationClient client = await host.StartOrchestrationAsync(
                typeof(Orchestrations.Factorial), 10, instanceId);
            clients.Add(client);
        }

        foreach (var client in clients)
        {
            var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60));
            Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);
        }

        DateTime endDateTime = DateTime.UtcNow;
        var statuses = new List<OrchestrationStatus> { OrchestrationStatus.Completed };

        // Purge should complete within the 30s built-in timeout for a small number of instances
        await clients[0].PurgeInstanceHistoryByTimePeriod(
            startDateTime, endDateTime, statuses);
```
```csharp
namespace DurableTask.AzureStorage.Tests.Storage
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using Azure;
    using Azure.Data.Tables;
    using DurableTask.AzureStorage.Storage;
    using Microsoft.VisualStudio.TestTools.UnitTesting;
    using Moq;

    [TestClass]
    public class TableDeleteBatchParallelTests
    {
        const string ConnectionString = "UseDevelopmentStorage=true";
        const string TableName = "TestTable";
```
Summary
Enhance the Azure Storage purge implementation with parallel batch deletes, CancellationToken-based partial purge timeout, improved error handling, and comprehensive tests.
Motivation
Purging large numbers of orchestration instances (100K+) with the current implementation causes:
- `DeleteBatchAsync` fails with 404 when entities are already deleted (race condition)

Changes
Core (DurableTask.Core)
- `PurgeInstanceFilter.Timeout` (`TimeSpan?`): Optional timeout for partial purge
- `PurgeResult.IsComplete` (`bool?`): Already existed, now properly populated

Azure Storage (DurableTask.AzureStorage)
- `PurgeHistoryResult.IsComplete`: New property + constructor overload, forwarded via `ToCorePurgeHistoryResult()`
- `AzureStorageOrchestrationService.PurgeInstanceHistoryAsync(..., TimeSpan timeout)`: New overload
- `AzureTableTrackingStore.DeleteHistoryAsync`: CancellationToken-based timeout using a linked `CancellationTokenSource`
- `Table.DeleteBatchParallelAsync`: New parallel batch delete with concurrent transactions and 404 fallback
- `MessageManager.DeleteLargeMessageBlobs`: Fixed 404 handling with try/catch instead of `ExistsAsync` + delete
- `SemaphoreSlim(100)` for instance-level parallelism

Behavior
When `Timeout` is set:
- A `CancellationTokenSource(timeout)` is linked with the caller's `CancellationToken`
- New instance deletions are not started once the token signals (`ThrowIfCancellationRequested`)
- On `OperationCanceledException`, waits for in-flight deletions, returns `IsComplete = false`
- In-flight deletions observe only the caller's `CancellationToken` (not the timeout token) — they always complete

When `Timeout` is not set:
- Unbounded purge (`IsComplete = null` for backward compatibility)

Benchmark Results
100K Instances (EP1, separate ASPs/storage)
500K Instances (EP1, isolated worker SDK path with 25s timeout)
Breaking Changes
None. All changes are additive:
- New `Timeout` property on `PurgeInstanceFilter`
- New `IsComplete` property on `PurgeHistoryResult`
- New `PurgeInstanceHistoryAsync` overload (original method unchanged)

Tests Added
- `PartialPurge_TimesOutThenCompletesOnRetry`
- `PartialPurge_GenerousTimeout_CompletesAll`
- `PartialPurge_WithoutTimeout_ReturnsNullIsComplete`
- `PurgeMultipleInstancesHistoryByTimePeriod_ScalabilityValidation`
- `PurgeSingleInstanceWithIdempotency`
- `PurgeSingleInstance_WithLargeBlobs_CleansUpBlobs`
- `PurgeInstance_WithManyHistoryRows_DeletesAll`
- Unit tests for `DeleteBatchParallelAsync`

Related PRs