Skip to content

Enhance purge with parallel batch deletes and partial purge timeout#1321

Open
YunchuWang wants to merge 9 commits intomainfrom
wangbill/enpurge
Open

Enhance purge with parallel batch deletes and partial purge timeout#1321
YunchuWang wants to merge 9 commits intomainfrom
wangbill/enpurge

Conversation

@YunchuWang
Copy link
Member

Summary

Enhance the Azure Storage purge implementation with parallel batch deletes, CancellationToken-based partial purge timeout, improved error handling, and comprehensive tests.

Motivation

Purging large numbers of orchestration instances (100K+) with the current implementation causes:

  1. Timeouts: Sequential batch deletes are too slow, causing gRPC deadline timeouts in isolated worker
  2. Storage errors: DeleteBatchAsync fails with 404 when entities are already deleted (race condition)
  3. Silent data loss: gRPC cancellation kills the response but not the in-flight storage operations — caller has no visibility into progress
  4. No progress tracking: No way to know how many instances remain

Changes

Core (DurableTask.Core)

  • PurgeInstanceFilter.Timeout (TimeSpan?): Optional timeout for partial purge
  • PurgeResult.IsComplete (bool?): Already existed, now properly populated

Azure Storage (DurableTask.AzureStorage)

  • PurgeHistoryResult.IsComplete: New property + constructor overload, forwarded via ToCorePurgeHistoryResult()
  • AzureStorageOrchestrationService.PurgeInstanceHistoryAsync(..., TimeSpan timeout): New overload
  • AzureTableTrackingStore.DeleteHistoryAsync: CancellationToken-based timeout using linked CancellationTokenSource
  • Table.DeleteBatchParallelAsync: New parallel batch delete with concurrent transactions and 404 fallback
  • MessageManager.DeleteLargeMessageBlobs: Fixed 404 handling with try/catch instead of ExistsAsync + delete
  • Concurrency control: SemaphoreSlim(100) for instance-level parallelism

Behavior

When Timeout is set:

  • Creates a CancellationTokenSource(timeout) linked with the caller's CancellationToken
  • Passes the effective token to table queries, throttle waits, and ThrowIfCancellationRequested
  • On timeout: catches OperationCanceledException, waits for in-flight deletions, returns IsComplete = false
  • Already-dispatched instance deletions use the original CancellationToken (not timeout token) — they always complete

When Timeout is not set:

  • Existing behavior unchanged (IsComplete = null for backward compatibility)

Benchmark Results

100K Instances (EP1, separate ASPs/storage)

Metric Baseline (stock) Optimized Delta
Total Deleted 28,702 99,949 3.5x
Purge Rate 48.7 inst/s 336.5 inst/s 6.9x
Errors 16 0 Error-free

500K Instances (EP1, isolated worker SDK path with 25s timeout)

Metric Baseline (no timeout) Optimized (25s timeout) Delta
Reported Deleted 17,402 (3.5%) 499,560 (99.9%) 28.7x
Purge Rate 12.3 inst/s 318.1 inst/s 25.9x
Errors 41 (95%) 0 Error-free

Breaking Changes

None. All changes are additive:

  • New optional Timeout property on PurgeInstanceFilter
  • New constructor overload on PurgeHistoryResult
  • New PurgeInstanceHistoryAsync overload (original method unchanged)
  • Internal interface/base class changes are non-public

Tests Added

  • PartialPurge_TimesOutThenCompletesOnRetry
  • PartialPurge_GenerousTimeout_CompletesAll
  • PartialPurge_WithoutTimeout_ReturnsNullIsComplete
  • PurgeMultipleInstancesHistoryByTimePeriod_ScalabilityValidation
  • PurgeSingleInstanceWithIdempotency
  • PurgeSingleInstance_WithLargeBlobs_CleansUpBlobs
  • PurgeInstance_WithManyHistoryRows_DeletesAll
  • 9 unit tests for DeleteBatchParallelAsync

Related PRs

YunchuWang and others added 7 commits March 13, 2026 15:33
- Add TimeSpan? Timeout to PurgeInstanceFilter for partial purge support
- Add bool? IsComplete to PurgeHistoryResult to indicate completion status
- Add new PurgeInstanceHistoryAsync overload with TimeSpan timeout parameter
- Use CancellationToken-based timeout (linked CTS) in DeleteHistoryAsync
- Already-dispatched deletions complete before returning partial results
- Backward compatible: no timeout = original behavior (IsComplete = null)
- Forward IsComplete through ToCorePurgeHistoryResult to PurgeResult
- Add scenario tests for partial purge timeout, generous timeout, and compat
- Always cap timeout to 30s max, even if not specified or exceeds 30s
- Pass effectiveToken into DeleteAllDataForOrchestrationInstance so in-flight deletes are also cancelled on timeout
- Catch OperationCanceledException from Task.WhenAll for timed-out in-flight deletes
- External cancellationToken cancellation still propagates normally
Copilot AI review requested due to automatic review settings March 18, 2026 19:24
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Improves purge scalability and robustness for DurableTask’s Azure Storage backend by adding parallelized table batch deletes, optional timeout-based partial purging, better 404/idempotency handling, and expanded test coverage.

Changes:

  • Add optional purge timeout (PurgeInstanceFilter.Timeout) and propagate completion status via IsComplete into core PurgeResult.
  • Implement parallel table batch deletion with 404 fallback to per-entity deletes.
  • Add scenario + unit tests for partial purge behavior, blob cleanup, and parallel batch delete behavior.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
test/DurableTask.AzureStorage.Tests/TestOrchestrationClient.cs Adds helper API to invoke the new timed purge overload in tests.
test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs Adds new purge/partial-purge scenario tests and validation for large-message blob cleanup.
src/DurableTask.Core/PurgeInstanceFilter.cs Introduces optional Timeout for partial purge.
src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs Extends purge-by-time signature to include an optional timeout.
src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs Extends tracking store purge API contract to include optional timeout.
src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs Implements timeout-aware, parallel purge-by-time behavior and uses parallel batch delete.
src/DurableTask.AzureStorage/Storage/Table.cs Adds DeleteBatchParallelAsync with transactional chunking and 404 fallback.
src/DurableTask.AzureStorage/PurgeHistoryResult.cs Adds IsComplete and forwards it to core PurgeResult.
src/DurableTask.AzureStorage/MessageManager.cs Improves 404 handling for large-message blob deletion by relying on list/delete with exception handling.
src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs Adds timed purge overload and wires PurgeInstanceFilter.Timeout into the call path.
Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs Adds unit tests validating parallel batch delete chunking, fallback, and cancellation behavior.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

Comment on lines +576 to +585
// Cap the timeout to a maximum of 30 seconds. If no timeout is specified
// or the specified timeout exceeds 30s, default to 30s to prevent long-running purges.
TimeSpan maxTimeout = TimeSpan.FromSeconds(30);
TimeSpan effectiveTimeout = timeout.HasValue && timeout.Value <= maxTimeout
? timeout.Value
: maxTimeout;

using CancellationTokenSource timeoutCts = new CancellationTokenSource(effectiveTimeout);
using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token);
CancellationToken effectiveToken = linkedCts.Token;
{
try
{
PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, effectiveToken);
Comment on lines +158 to +163
TableTransactionResults[] allResults = await Task.WhenAll(
chunks.Select(chunk => this.ExecuteBatchWithFallbackAsync(chunk, cancellationToken)));

foreach (TableTransactionResults result in allResults)
{
resultsBuilder.Add(result);
// Purge all instances by time period
await clients[0].PurgeInstanceHistoryByTimePeriod(
startDateTime,
DateTime.UtcNow,
Comment on lines +812 to +828
public async Task PartialPurge_WithoutTimeout_ReturnsNullIsComplete()
{
// Validates backward compatibility: existing callers that don't use maxInstanceCount
// get IsComplete = null (unknown), preserving old behavior.
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false))
{
await host.StartAsync();
DateTime startDateTime = DateTime.UtcNow;

string instanceId = $"compat-purge-{Guid.NewGuid():N}";
TestOrchestrationClient client = await host.StartOrchestrationAsync(
typeof(Orchestrations.Factorial), 10, instanceId);
await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60));

// Use the old API (no maxInstanceCount) — should still work exactly as before
await client.PurgeInstanceHistoryByTimePeriod(
startDateTime, DateTime.UtcNow, new List<OrchestrationStatus> { OrchestrationStatus.Completed });
Comment on lines +2038 to +2041
/// <param name="createdTimeFrom">CreatedTime of orchestrations. Purges history grater than this value.</param>
/// <param name="createdTimeTo">CreatedTime of orchestrations. Purges history less than this value.</param>
/// <param name="runtimeStatus">RuntimeStatus of orchestrations. You can specify several status.</param>
/// <param name="timeout">Maximum time to spend purging. Already-started deletions will complete before the method returns.</param>
Comment on lines +606 to +621
pendingTasks.Add(Task.Run(async () =>
{
try
{
PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, effectiveToken);
Interlocked.Add(ref instancesDeleted, statisticsFromDeletion.InstancesDeleted);
Interlocked.Add(ref storageRequests, statisticsFromDeletion.StorageRequests);
Interlocked.Add(ref rowsDeleted, statisticsFromDeletion.RowsDeleted);
}
finally
{
throttle.Release();
}
}));
}
}
- Hard-code 30s CancellationToken-based timeout in DeleteHistoryAsync
- Remove configurable Timeout from PurgeInstanceFilter (not needed)
- Remove timeout overload from AzureStorageOrchestrationService
- IsComplete = true when all purged within 30s, false when timed out
- Callers loop until IsComplete = true for large-scale purge
Comment on lines +752 to +757
foreach (var client in clients)
{
List<HistoryStateEvent> historyEvents = await client.GetOrchestrationHistoryAsync(
client.InstanceId);
Assert.AreEqual(0, historyEvents.Count, "History should be purged");
}
- Add TimeSpan? Timeout property to PurgeInstanceFilter (opt-in, default null)
- When null: unbounded purge, IsComplete=null (backward compat, no behavior change)
- When set: CancellationToken-based timeout, IsComplete=true/false
- Thread Timeout through IOrchestrationServicePurgeClient path
- Zero breaking changes: existing callers unaffected
// Each instance purge internally spawns multiple parallel storage operations, so this should be
// kept moderate. Using 100 to match the original implicit concurrency from pageSizeHint.
const int MaxPurgeInstanceConcurrency = 100;
var throttle = new SemaphoreSlim(MaxPurgeInstanceConcurrency);
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR enhances the Azure Storage purge pipeline to improve throughput and reliability for large purges by introducing parallelized batch deletes, a timeout-driven partial purge mechanism, and forwarding completion status back to the core purge result shape.

Changes:

  • Added PurgeInstanceFilter.Timeout and plumbed timeout support into Azure Storage tracking-store purging.
  • Implemented Table.DeleteBatchParallelAsync with 404/idempotency fallback and updated purge to use it.
  • Added/updated purge-related tests and extended purge result types to carry IsComplete.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs Adds new purge scenario tests for scalability/idempotency/large-blob cleanup and a test intended to validate completion semantics.
src/DurableTask.Core/PurgeInstanceFilter.cs Adds Timeout option to the core purge filter contract.
src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs Extends time-range purge signature to accept optional timeout.
src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs Extends tracking store purge API with an optional timeout parameter.
src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs Implements timeout-aware, parallel instance purging and returns IsComplete based on timeout.
src/DurableTask.AzureStorage/Storage/Table.cs Adds DeleteBatchParallelAsync with parallel transactions and 404 fallback to individual deletes.
src/DurableTask.AzureStorage/PurgeHistoryResult.cs Adds IsComplete to AzureStorage purge result and forwards it to DurableTask.Core.PurgeResult.
src/DurableTask.AzureStorage/MessageManager.cs Improves 404 handling for large message blob cleanup by relying on try/catch rather than container existence checks.
src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs Wires PurgeInstanceFilter.Timeout into the tracking-store purge path used by IOrchestrationServicePurgeClient.
Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs Adds unit tests for DeleteBatchParallelAsync (but currently placed outside the referenced test project directory).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

Comment on lines +578 to +583
using CancellationTokenSource timeoutCts = timeout.HasValue
? new CancellationTokenSource(timeout.Value)
: null;
using CancellationTokenSource linkedCts = timeout.HasValue
? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token)
: null;
{
try
{
PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, effectiveToken);
Comment on lines +590 to +592
var throttle = new SemaphoreSlim(MaxPurgeInstanceConcurrency);
var pendingTasks = new List<Task>();

Comment on lines +53 to +57
/// The maximum amount of time to spend purging instances in a single call.
/// If <c>null</c> (default), all matching instances are purged with no time limit.
/// When set, the purge stops accepting new instances after this duration elapses
/// and returns with <see cref="PurgeResult.IsComplete"/> set to <c>false</c>.
/// Already-started instance deletions will complete before the method returns.

// Purge all instances by time period
await clients[0].PurgeInstanceHistoryByTimePeriod(
startDateTime,
Comment on lines +718 to +750
[TestMethod]
public async Task PurgeReturnsIsComplete()
{
// Validates that purge returns IsComplete = true when all instances are purged
// within the built-in 30-second timeout.
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false))
{
await host.StartAsync();
DateTime startDateTime = DateTime.UtcNow;

const int totalInstances = 5;
var clients = new List<TestOrchestrationClient>();
for (int i = 0; i < totalInstances; i++)
{
string instanceId = $"purge-complete-{Guid.NewGuid():N}";
TestOrchestrationClient client = await host.StartOrchestrationAsync(
typeof(Orchestrations.Factorial), 10, instanceId);
clients.Add(client);
}

foreach (var client in clients)
{
var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60));
Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);
}

DateTime endDateTime = DateTime.UtcNow;
var statuses = new List<OrchestrationStatus> { OrchestrationStatus.Completed };

// Purge should complete within the 30s built-in timeout for a small number of instances
await clients[0].PurgeInstanceHistoryByTimePeriod(
startDateTime, endDateTime, statuses);

Comment on lines +14 to +33
namespace DurableTask.AzureStorage.Tests.Storage
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Data.Tables;
using DurableTask.AzureStorage.Storage;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;

[TestClass]
public class TableDeleteBatchParallelTests
{
const string ConnectionString = "UseDevelopmentStorage=true";
const string TableName = "TestTable";

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants