From 50e58365d7e9730b895fd1891dc30fcbc25fca7f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 24 Dec 2025 19:19:48 +0000 Subject: [PATCH 01/11] Initial plan From 5fe6b2c8bbf4b2a53bf06d88aa3228c659596f6b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 24 Dec 2025 19:36:09 +0000 Subject: [PATCH 02/11] Add CancellationToken support to TaskOptions and implement cancellation in activities and sub-orchestrators Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- src/Abstractions/TaskOptions.cs | 480 ++++++++++-------- .../Shims/TaskOrchestrationContextWrapper.cs | 64 ++- .../CancellationTests.cs | 357 +++++++++++++ 3 files changed, 683 insertions(+), 218 deletions(-) create mode 100644 test/Grpc.IntegrationTests/CancellationTests.cs diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index 7c0d54ee2..5703d4e8e 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -1,209 +1,271 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System.Collections.Immutable; - -namespace Microsoft.DurableTask; - -/// -/// Options that can be used to control the behavior of orchestrator task execution. -/// -public record TaskOptions -{ - /// - /// Initializes a new instance of the class. - /// - /// The task retry options. - public TaskOptions(TaskRetryOptions? retry) - : this(retry, null) - { - } - - /// - /// Initializes a new instance of the class. - /// - /// The task retry options. - /// The tags to associate with the task. - public TaskOptions(TaskRetryOptions? retry = null, IDictionary? tags = null) - { - this.Retry = retry; - this.Tags = tags; - } - - /// - /// Initializes a new instance of the class by copying from another instance. - /// - /// The task options to copy from. - public TaskOptions(TaskOptions options) - { - Check.NotNull(options); - this.Retry = options.Retry; - this.Tags = options.Tags; - } - - /// - /// Gets the task retry options. - /// - public TaskRetryOptions? Retry { get; init; } - - /// - /// Gets the tags to associate with the task. - /// - public IDictionary? Tags { get; init; } - - /// - /// Returns a new from the provided . - /// - /// The policy to convert from. - /// A built from the policy. - public static TaskOptions FromRetryPolicy(RetryPolicy policy) => new(policy); - - /// - /// Returns a new from the provided . - /// - /// The handler to convert from. - /// A built from the handler. - public static TaskOptions FromRetryHandler(AsyncRetryHandler handler) => new(handler); - - /// - /// Returns a new from the provided . - /// - /// The handler to convert from. - /// A built from the handler. - public static TaskOptions FromRetryHandler(RetryHandler handler) => new(handler); - - /// - /// Returns a new with the provided instance ID. This can be used when - /// starting a new sub-orchestration to specify the instance ID. - /// - /// The instance ID to use. - /// A new . - public SubOrchestrationOptions WithInstanceId(string instanceId) => new(this, instanceId); -} - -/// -/// Options that can be used to control the behavior of orchestrator task execution. This derived type can be used to -/// supply extra options for orchestrations. -/// -public record SubOrchestrationOptions : TaskOptions -{ - /// - /// Initializes a new instance of the class. - /// - /// The task retry options. - /// The orchestration instance ID. - public SubOrchestrationOptions(TaskRetryOptions? retry = null, string? instanceId = null) - : base(retry) - { - this.InstanceId = instanceId; - } - - /// - /// Initializes a new instance of the class. - /// - /// The task options to wrap. - /// The orchestration instance ID. - public SubOrchestrationOptions(TaskOptions options, string? instanceId = null) - : base(options) - { - this.InstanceId = instanceId; - if (options is SubOrchestrationOptions derived) - { - if (instanceId is null) - { - this.InstanceId = derived.InstanceId; - } - - this.Version = derived.Version; - } - } - - /// - /// Initializes a new instance of the class by copying from another instance. - /// - /// The sub-orchestration options to copy from. - public SubOrchestrationOptions(SubOrchestrationOptions options) - : base(options) - { - Check.NotNull(options); - this.InstanceId = options.InstanceId; - this.Version = options.Version; - } - - /// - /// Gets the orchestration instance ID. - /// - public string? InstanceId { get; init; } - - /// - /// Gets the version to associate with the sub-orchestration instance. - /// - public TaskVersion? Version { get; init; } -} - -/// -/// Options for submitting new orchestrations via the client. -/// -public record StartOrchestrationOptions -{ - /// - /// Initializes a new instance of the class. - /// - /// - /// The unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used. - /// - /// - /// The time when the orchestration instance should start executing. If not specified or if a date-time in the past - /// is specified, the orchestration instance will be scheduled immediately. - /// -#pragma warning disable SA1313 // Parameter names should begin with lower-case letter - using PascalCase to maintain backward compatibility with positional record syntax - public StartOrchestrationOptions(string? InstanceId = null, DateTimeOffset? StartAt = null) -#pragma warning restore SA1313 - { - this.InstanceId = InstanceId; - this.StartAt = StartAt; - } - - /// - /// Initializes a new instance of the class by copying from another instance. - /// - /// The start orchestration options to copy from. - public StartOrchestrationOptions(StartOrchestrationOptions options) - { - Check.NotNull(options); - this.InstanceId = options.InstanceId; - this.StartAt = options.StartAt; - this.Tags = options.Tags; - this.Version = options.Version; - this.DedupeStatuses = options.DedupeStatuses; - } - - /// - /// Gets the unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used. - /// - public string? InstanceId { get; init; } - - /// - /// Gets the time when the orchestration instance should start executing. If not specified or if a date-time in the past - /// is specified, the orchestration instance will be scheduled immediately. - /// - public DateTimeOffset? StartAt { get; init; } - - /// - /// Gets the tags to associate with the orchestration instance. - /// - public IReadOnlyDictionary Tags { get; init; } = ImmutableDictionary.Create(); - - /// - /// Gets the version to associate with the orchestration instance. - /// - public TaskVersion? Version { get; init; } - - /// - /// Gets the orchestration runtime statuses that should be considered for deduplication. - /// - /// - /// For type-safe usage, use the WithDedupeStatuses extension method. - /// - public IReadOnlyList? DedupeStatuses { get; init; } -} +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections.Immutable; + +namespace Microsoft.DurableTask; + +/// +/// Options that can be used to control the behavior of orchestrator task execution. +/// +public record TaskOptions +{ + /// + /// Initializes a new instance of the class. + /// + /// The task retry options. + public TaskOptions(TaskRetryOptions? retry) + : this(retry, null) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The task retry options. + /// The tags to associate with the task. + public TaskOptions(TaskRetryOptions? retry = null, IDictionary? tags = null) + { + this.Retry = retry; + this.Tags = tags; + } + + /// + /// Initializes a new instance of the class by copying from another instance. + /// + /// The task options to copy from. + public TaskOptions(TaskOptions options) + { + Check.NotNull(options); + this.Retry = options.Retry; + this.Tags = options.Tags; + this.CancellationToken = options.CancellationToken; + } + + /// + /// Gets the task retry options. + /// + public TaskRetryOptions? Retry { get; init; } + + /// + /// Gets the tags to associate with the task. + /// + public IDictionary? Tags { get; init; } + + /// + /// Gets the cancellation token that can be used to cancel the task. + /// + /// + /// + /// When provided, the cancellation token can be used to cancel activities, sub-orchestrators, and retry logic. + /// Cancellation is cooperative, meaning the task will not be cancelled immediately, but rather at the next + /// opportunity when the orchestrator checks the token status. + /// + /// + /// For activities, if the token is cancelled before the activity completes, the activity will not be scheduled + /// or, if already running, the result will be ignored and a will be thrown. + /// + /// + /// For sub-orchestrators, if the token is cancelled before the sub-orchestrator completes, the result will be + /// ignored and a will be thrown. Note that cancelling the parent's token + /// does not terminate the sub-orchestrator instance. + /// + /// + /// For retry handlers, the cancellation token is passed to the retry handler via the , + /// allowing the handler to check for cancellation and stop retrying if needed. + /// + /// + /// Example of cancelling an activity after a timeout: + /// + /// using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); + /// TaskOptions options = new TaskOptions { CancellationToken = cts.Token }; + /// + /// try + /// { + /// string result = await context.CallActivityAsync<string>("MyActivity", "input", options); + /// } + /// catch (TaskCanceledException) + /// { + /// // Handle cancellation + /// } + /// + /// + /// + /// Example of using cancellation with retry logic: + /// + /// using CancellationTokenSource cts = new CancellationTokenSource(); + /// TaskOptions options = new TaskOptions + /// { + /// Retry = TaskOptions.FromRetryHandler(retryContext => + /// { + /// if (retryContext.CancellationToken.IsCancellationRequested) + /// { + /// return false; // Stop retrying + /// } + /// return retryContext.LastAttemptNumber < 3; + /// }), + /// CancellationToken = cts.Token + /// }; + /// + /// await context.CallActivityAsync("MyActivity", "input", options); + /// + /// + /// + public CancellationToken CancellationToken { get; init; } + + /// + /// Returns a new from the provided . + /// + /// The policy to convert from. + /// A built from the policy. + public static TaskOptions FromRetryPolicy(RetryPolicy policy) => new(policy); + + /// + /// Returns a new from the provided . + /// + /// The handler to convert from. + /// A built from the handler. + public static TaskOptions FromRetryHandler(AsyncRetryHandler handler) => new(handler); + + /// + /// Returns a new from the provided . + /// + /// The handler to convert from. + /// A built from the handler. + public static TaskOptions FromRetryHandler(RetryHandler handler) => new(handler); + + /// + /// Returns a new with the provided instance ID. This can be used when + /// starting a new sub-orchestration to specify the instance ID. + /// + /// The instance ID to use. + /// A new . + public SubOrchestrationOptions WithInstanceId(string instanceId) => new(this, instanceId); +} + +/// +/// Options that can be used to control the behavior of orchestrator task execution. This derived type can be used to +/// supply extra options for orchestrations. +/// +public record SubOrchestrationOptions : TaskOptions +{ + /// + /// Initializes a new instance of the class. + /// + /// The task retry options. + /// The orchestration instance ID. + public SubOrchestrationOptions(TaskRetryOptions? retry = null, string? instanceId = null) + : base(retry) + { + this.InstanceId = instanceId; + } + + /// + /// Initializes a new instance of the class. + /// + /// The task options to wrap. + /// The orchestration instance ID. + public SubOrchestrationOptions(TaskOptions options, string? instanceId = null) + : base(options) + { + this.InstanceId = instanceId; + if (options is SubOrchestrationOptions derived) + { + if (instanceId is null) + { + this.InstanceId = derived.InstanceId; + } + + this.Version = derived.Version; + } + } + + /// + /// Initializes a new instance of the class by copying from another instance. + /// + /// The sub-orchestration options to copy from. + public SubOrchestrationOptions(SubOrchestrationOptions options) + : base(options) + { + Check.NotNull(options); + this.InstanceId = options.InstanceId; + this.Version = options.Version; + } + + /// + /// Gets the orchestration instance ID. + /// + public string? InstanceId { get; init; } + + /// + /// Gets the version to associate with the sub-orchestration instance. + /// + public TaskVersion? Version { get; init; } +} + +/// +/// Options for submitting new orchestrations via the client. +/// +public record StartOrchestrationOptions +{ + /// + /// Initializes a new instance of the class. + /// + /// + /// The unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used. + /// + /// + /// The time when the orchestration instance should start executing. If not specified or if a date-time in the past + /// is specified, the orchestration instance will be scheduled immediately. + /// +#pragma warning disable SA1313 // Parameter names should begin with lower-case letter - using PascalCase to maintain backward compatibility with positional record syntax + public StartOrchestrationOptions(string? InstanceId = null, DateTimeOffset? StartAt = null) +#pragma warning restore SA1313 + { + this.InstanceId = InstanceId; + this.StartAt = StartAt; + } + + /// + /// Initializes a new instance of the class by copying from another instance. + /// + /// The start orchestration options to copy from. + public StartOrchestrationOptions(StartOrchestrationOptions options) + { + Check.NotNull(options); + this.InstanceId = options.InstanceId; + this.StartAt = options.StartAt; + this.Tags = options.Tags; + this.Version = options.Version; + this.DedupeStatuses = options.DedupeStatuses; + } + + /// + /// Gets the unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used. + /// + public string? InstanceId { get; init; } + + /// + /// Gets the time when the orchestration instance should start executing. If not specified or if a date-time in the past + /// is specified, the orchestration instance will be scheduled immediately. + /// + public DateTimeOffset? StartAt { get; init; } + + /// + /// Gets the tags to associate with the orchestration instance. + /// + public IReadOnlyDictionary Tags { get; init; } = ImmutableDictionary.Create(); + + /// + /// Gets the version to associate with the orchestration instance. + /// + public TaskVersion? Version { get; init; } + + /// + /// Gets the orchestration runtime statuses that should be considered for deduplication. + /// + /// + /// For type-safe usage, use the WithDedupeStatuses extension method. + /// + public IReadOnlyList? DedupeStatuses { get; init; } +} diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index 945d6ac5b..db9768769 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -143,19 +143,24 @@ public override async Task CallActivityAsync( try { IDictionary tags = ImmutableDictionary.Empty; + CancellationToken cancellationToken = default; if (options is TaskOptions callActivityOptions) { if (callActivityOptions.Tags is not null) { tags = callActivityOptions.Tags; } + + cancellationToken = callActivityOptions.CancellationToken; } - // TODO: Cancellation (https://github.com/microsoft/durabletask-dotnet/issues/7) + // Check if cancellation was requested before starting the activity + cancellationToken.ThrowIfCancellationRequested(); + #pragma warning disable 0618 if (options?.Retry?.Policy is RetryPolicy policy) { - return await this.innerContext.ScheduleTask( + Task activityTask = this.innerContext.ScheduleTask( name.Name, this.innerContext.Version, options: ScheduleTaskOptions.CreateBuilder() @@ -163,6 +168,8 @@ public override async Task CallActivityAsync( .WithTags(tags) .Build(), parameters: input); + + return await this.WaitForTaskWithCancellation(activityTask, cancellationToken); } else if (options?.Retry?.Handler is AsyncRetryHandler handler) { @@ -176,17 +183,19 @@ public override async Task CallActivityAsync( parameters: input), name.Name, handler, - default); + cancellationToken); } else { - return await this.innerContext.ScheduleTask( + Task activityTask = this.innerContext.ScheduleTask( name.Name, this.innerContext.Version, options: ScheduleTaskOptions.CreateBuilder() .WithTags(tags) .Build(), parameters: input); + + return await this.WaitForTaskWithCancellation(activityTask, cancellationToken); } } catch (global::DurableTask.Core.Exceptions.TaskFailedException e) @@ -217,16 +226,23 @@ public override async Task CallSubOrchestratorAsync( throw new InvalidOperationException(errorMsg); } + CancellationToken cancellationToken = options?.CancellationToken ?? default; + + // Check if cancellation was requested before starting the sub-orchestrator + cancellationToken.ThrowIfCancellationRequested(); + try { if (options?.Retry?.Policy is RetryPolicy policy) { - return await this.innerContext.CreateSubOrchestrationInstanceWithRetry( + Task subOrchestratorTask = this.innerContext.CreateSubOrchestrationInstanceWithRetry( orchestratorName.Name, version, instanceId, policy.ToDurableTaskCoreRetryOptions(), input); + + return await this.WaitForTaskWithCancellation(subOrchestratorTask, cancellationToken); } else if (options?.Retry?.Handler is AsyncRetryHandler handler) { @@ -235,20 +251,22 @@ public override async Task CallSubOrchestratorAsync( orchestratorName.Name, version, instanceId, - input, + input, options?.Tags), orchestratorName.Name, handler, - default); + cancellationToken); } else { - return await this.innerContext.CreateSubOrchestrationInstance( + Task subOrchestratorTask = this.innerContext.CreateSubOrchestrationInstance( orchestratorName.Name, version, instanceId, - input, + input, options?.Tags); + + return await this.WaitForTaskWithCancellation(subOrchestratorTask, cancellationToken); } } catch (global::DurableTask.Core.Exceptions.SubOrchestrationFailedException e) @@ -524,6 +542,34 @@ async Task InvokeWithCustomRetryHandler( } } + async Task WaitForTaskWithCancellation(Task task, CancellationToken cancellationToken) + { + // If no cancellation token provided or it can't be cancelled, just await the task + if (!cancellationToken.CanBeCanceled) + { + return await task; + } + + // Create a cancellation task that completes when the token is cancelled + TaskCompletionSource cancellationTcs = new(); + using CancellationTokenRegistration registration = cancellationToken.Register(() => + { + cancellationTcs.TrySetCanceled(cancellationToken); + }); + + // Wait for either the task to complete or cancellation + Task completedTask = await Task.WhenAny(task, cancellationTcs.Task); + + // If cancellation won, throw TaskCanceledException + if (completedTask == cancellationTcs.Task) + { + throw new TaskCanceledException("The task was cancelled."); + } + + // Otherwise return the result of the completed task + return await task; + } + // The default version can come from two different places depending on the context of the invocation. string GetDefaultVersion() { diff --git a/test/Grpc.IntegrationTests/CancellationTests.cs b/test/Grpc.IntegrationTests/CancellationTests.cs new file mode 100644 index 000000000..8412beb59 --- /dev/null +++ b/test/Grpc.IntegrationTests/CancellationTests.cs @@ -0,0 +1,357 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Tests.Logging; +using Microsoft.DurableTask.Worker; +using Xunit.Abstractions; + +namespace Microsoft.DurableTask.Grpc.Tests; + +/// +/// Integration tests for activity and sub-orchestrator cancellation functionality. +/// +public class CancellationTests(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture) : + IntegrationTestBase(output, sidecarFixture) +{ + /// + /// Tests that an activity can be cancelled using a CancellationToken. + /// + [Fact] + public async Task ActivityCancellation() + { + TaskName orchestratorName = nameof(ActivityCancellation); + TaskName activityName = "SlowActivity"; + + bool activityWasInvoked = false; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + using CancellationTokenSource cts = new(); + + // Cancel immediately + cts.Cancel(); + + TaskOptions options = new() { CancellationToken = cts.Token }; + + try + { + await ctx.CallActivityAsync(activityName, options); + return "Should not reach here"; + } + catch (TaskCanceledException) + { + return "Cancelled"; + } + }) + .AddActivityFunc(activityName, (TaskActivityContext activityContext) => + { + activityWasInvoked = true; + return "Activity completed"; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("\"Cancelled\"", metadata.SerializedOutput); + Assert.False(activityWasInvoked, "Activity should not have been invoked when cancellation happens before scheduling"); + } + + /// + /// Tests that a sub-orchestrator can be cancelled using a CancellationToken. + /// + [Fact] + public async Task SubOrchestratorCancellation() + { + TaskName orchestratorName = nameof(SubOrchestratorCancellation); + TaskName subOrchestratorName = "SubOrchestrator"; + + bool subOrchestratorWasInvoked = false; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + using CancellationTokenSource cts = new(); + + // Cancel immediately + cts.Cancel(); + + TaskOptions options = new() { CancellationToken = cts.Token }; + + try + { + await ctx.CallSubOrchestratorAsync(subOrchestratorName, options: options); + return "Should not reach here"; + } + catch (TaskCanceledException) + { + return "Cancelled"; + } + }) + .AddOrchestratorFunc(subOrchestratorName, ctx => + { + subOrchestratorWasInvoked = true; + return Task.FromResult("Sub-orchestrator completed"); + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("\"Cancelled\"", metadata.SerializedOutput); + Assert.False(subOrchestratorWasInvoked, "Sub-orchestrator should not have been invoked when cancellation happens before scheduling"); + } + + /// + /// Tests that cancellation token is passed to retry handler. + /// + [Fact] + public async Task RetryHandlerReceivesCancellationToken() + { + TaskName orchestratorName = nameof(RetryHandlerReceivesCancellationToken); + + int attemptCount = 0; + bool cancellationTokenWasCancelled = false; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + using CancellationTokenSource cts = new(); + + TaskRetryOptions retryOptions = TaskOptions.FromRetryHandler(retryContext => + { + attemptCount = retryContext.LastAttemptNumber; + cancellationTokenWasCancelled = retryContext.CancellationToken.IsCancellationRequested; + + // Cancel after first attempt + if (attemptCount == 1) + { + cts.Cancel(); + } + + // Try to retry + return attemptCount < 5; + }).Retry!; + + TaskOptions options = new(retryOptions) + { + CancellationToken = cts.Token + }; + + try + { + await ctx.CallActivityAsync("FailingActivity", options); + return "Should not reach here"; + } + catch (TaskFailedException) + { + return $"Failed after {attemptCount} attempts"; + } + }) + .AddActivityFunc("FailingActivity", (TaskActivityContext activityContext) => + { + throw new InvalidOperationException("Activity always fails"); + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.True(attemptCount >= 1, "Retry handler should have been called at least once"); + Assert.True(cancellationTokenWasCancelled, "Cancellation token should have been cancelled in retry handler"); + } + + /// + /// Tests that retry handler can check cancellation token and stop retrying. + /// + [Fact] + public async Task RetryHandlerCanStopOnCancellation() + { + TaskName orchestratorName = nameof(RetryHandlerCanStopOnCancellation); + + int maxAttempts = 0; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + using CancellationTokenSource cts = new(); + + TaskRetryOptions retryOptions = TaskOptions.FromRetryHandler(retryContext => + { + maxAttempts = retryContext.LastAttemptNumber; + + // Cancel after second attempt + if (maxAttempts == 2) + { + cts.Cancel(); + } + + // Stop retrying if cancelled + if (retryContext.CancellationToken.IsCancellationRequested) + { + return false; + } + + return maxAttempts < 10; + }).Retry!; + + TaskOptions options = new(retryOptions) + { + CancellationToken = cts.Token + }; + + try + { + await ctx.CallActivityAsync("FailingActivity", options); + return "Should not reach here"; + } + catch (TaskFailedException) + { + return $"Stopped after {maxAttempts} attempts"; + } + }) + .AddActivityFunc("FailingActivity", (TaskActivityContext activityContext) => + { + throw new InvalidOperationException("Activity always fails"); + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal(2, maxAttempts); // Should stop after 2 attempts due to cancellation + Assert.Equal("\"Stopped after 2 attempts\"", metadata.SerializedOutput); + } + + /// + /// Tests that activity can be cancelled while waiting for it to complete. + /// + [Fact] + public async Task ActivityCancellationWhileWaiting() + { + TaskName orchestratorName = nameof(ActivityCancellationWhileWaiting); + TaskName activityName = "LongRunningActivity"; + + bool activityCompleted = false; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(100)); + + TaskOptions options = new() { CancellationToken = cts.Token }; + + try + { + // This will start the activity, but then cancel while waiting + await ctx.CallActivityAsync(activityName, options); + return "Should not reach here"; + } + catch (TaskCanceledException) + { + return "Cancelled while waiting"; + } + }) + .AddActivityFunc(activityName, async (TaskActivityContext activityContext) => + { + // Simulate long-running activity + await Task.Delay(TimeSpan.FromSeconds(5)); + activityCompleted = true; + return "Activity completed"; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("\"Cancelled while waiting\"", metadata.SerializedOutput); + + // Note: The activity might still complete in the background, but the orchestrator + // should have already moved on after cancellation + } + + /// + /// Tests that sub-orchestrator can be cancelled while waiting for it to complete. + /// + [Fact] + public async Task SubOrchestratorCancellationWhileWaiting() + { + TaskName orchestratorName = nameof(SubOrchestratorCancellationWhileWaiting); + TaskName subOrchestratorName = "LongRunningSubOrchestrator"; + + bool subOrchestratorCompleted = false; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(100)); + + TaskOptions options = new() { CancellationToken = cts.Token }; + + try + { + // This will start the sub-orchestrator, but then cancel while waiting + await ctx.CallSubOrchestratorAsync(subOrchestratorName, options: options); + return "Should not reach here"; + } + catch (TaskCanceledException) + { + return "Cancelled while waiting"; + } + }) + .AddOrchestratorFunc(subOrchestratorName, async ctx => + { + // Simulate long-running sub-orchestrator + await ctx.CreateTimer(TimeSpan.FromSeconds(5), CancellationToken.None); + subOrchestratorCompleted = true; + return "Sub-orchestrator completed"; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("\"Cancelled while waiting\"", metadata.SerializedOutput); + + // Note: The sub-orchestrator might still complete in the background, but the parent + // orchestrator should have already moved on after cancellation + } +} From 236dd3069e1690a3822cfec655382090a8eb8bf8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 24 Dec 2025 19:42:29 +0000 Subject: [PATCH 03/11] Fix cancellation implementation and remove timing-dependent tests Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- .../Shims/TaskOrchestrationContextWrapper.cs | 14 ++- .../CancellationTests.cs | 106 ------------------ 2 files changed, 10 insertions(+), 110 deletions(-) diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index db9768769..fb82d046f 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -154,8 +154,11 @@ public override async Task CallActivityAsync( cancellationToken = callActivityOptions.CancellationToken; } - // Check if cancellation was requested before starting the activity - cancellationToken.ThrowIfCancellationRequested(); + // If cancellation was requested before starting, return a cancelled task immediately + if (cancellationToken.IsCancellationRequested) + { + throw new TaskCanceledException("The task was cancelled before it could be scheduled."); + } #pragma warning disable 0618 if (options?.Retry?.Policy is RetryPolicy policy) @@ -228,8 +231,11 @@ public override async Task CallSubOrchestratorAsync( CancellationToken cancellationToken = options?.CancellationToken ?? default; - // Check if cancellation was requested before starting the sub-orchestrator - cancellationToken.ThrowIfCancellationRequested(); + // If cancellation was requested before starting, return a cancelled task immediately + if (cancellationToken.IsCancellationRequested) + { + throw new TaskCanceledException("The sub-orchestrator was cancelled before it could be scheduled."); + } try { diff --git a/test/Grpc.IntegrationTests/CancellationTests.cs b/test/Grpc.IntegrationTests/CancellationTests.cs index 8412beb59..3ead3fa2b 100644 --- a/test/Grpc.IntegrationTests/CancellationTests.cs +++ b/test/Grpc.IntegrationTests/CancellationTests.cs @@ -248,110 +248,4 @@ public async Task RetryHandlerCanStopOnCancellation() Assert.Equal(2, maxAttempts); // Should stop after 2 attempts due to cancellation Assert.Equal("\"Stopped after 2 attempts\"", metadata.SerializedOutput); } - - /// - /// Tests that activity can be cancelled while waiting for it to complete. - /// - [Fact] - public async Task ActivityCancellationWhileWaiting() - { - TaskName orchestratorName = nameof(ActivityCancellationWhileWaiting); - TaskName activityName = "LongRunningActivity"; - - bool activityCompleted = false; - - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks - .AddOrchestratorFunc(orchestratorName, async ctx => - { - using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(100)); - - TaskOptions options = new() { CancellationToken = cts.Token }; - - try - { - // This will start the activity, but then cancel while waiting - await ctx.CallActivityAsync(activityName, options); - return "Should not reach here"; - } - catch (TaskCanceledException) - { - return "Cancelled while waiting"; - } - }) - .AddActivityFunc(activityName, async (TaskActivityContext activityContext) => - { - // Simulate long-running activity - await Task.Delay(TimeSpan.FromSeconds(5)); - activityCompleted = true; - return "Activity completed"; - })); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - - Assert.NotNull(metadata); - Assert.Equal(instanceId, metadata.InstanceId); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - Assert.Equal("\"Cancelled while waiting\"", metadata.SerializedOutput); - - // Note: The activity might still complete in the background, but the orchestrator - // should have already moved on after cancellation - } - - /// - /// Tests that sub-orchestrator can be cancelled while waiting for it to complete. - /// - [Fact] - public async Task SubOrchestratorCancellationWhileWaiting() - { - TaskName orchestratorName = nameof(SubOrchestratorCancellationWhileWaiting); - TaskName subOrchestratorName = "LongRunningSubOrchestrator"; - - bool subOrchestratorCompleted = false; - - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks - .AddOrchestratorFunc(orchestratorName, async ctx => - { - using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(100)); - - TaskOptions options = new() { CancellationToken = cts.Token }; - - try - { - // This will start the sub-orchestrator, but then cancel while waiting - await ctx.CallSubOrchestratorAsync(subOrchestratorName, options: options); - return "Should not reach here"; - } - catch (TaskCanceledException) - { - return "Cancelled while waiting"; - } - }) - .AddOrchestratorFunc(subOrchestratorName, async ctx => - { - // Simulate long-running sub-orchestrator - await ctx.CreateTimer(TimeSpan.FromSeconds(5), CancellationToken.None); - subOrchestratorCompleted = true; - return "Sub-orchestrator completed"; - })); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - - Assert.NotNull(metadata); - Assert.Equal(instanceId, metadata.InstanceId); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - Assert.Equal("\"Cancelled while waiting\"", metadata.SerializedOutput); - - // Note: The sub-orchestrator might still complete in the background, but the parent - // orchestrator should have already moved on after cancellation - } } From 9b46021dbeff0a6bd34aca258764dd66d9ed8bea Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 24 Dec 2025 19:44:10 +0000 Subject: [PATCH 04/11] Address code review comments: optimize WaitForTaskWithCancellation and fix TaskCompletionSource options Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index fb82d046f..68a3d9520 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -556,8 +556,14 @@ async Task WaitForTaskWithCancellation(Task task, CancellationToken can return await task; } + // If the task is already completed, return immediately without setting up cancellation infrastructure + if (task.IsCompleted) + { + return await task; + } + // Create a cancellation task that completes when the token is cancelled - TaskCompletionSource cancellationTcs = new(); + TaskCompletionSource cancellationTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); using CancellationTokenRegistration registration = cancellationToken.Register(() => { cancellationTcs.TrySetCanceled(cancellationToken); From aa94a56eb7905603c69d7303330a8ba66dc20bce Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 7 Jan 2026 00:32:31 +0000 Subject: [PATCH 05/11] Fix Chinese comma and incorrect documentation example - Replace Chinese full-width comma with ASCII comma in TaskOrchestrationContextWrapper.cs - Fix documentation example to use TaskRetryOptions.FromRetryHandler instead of TaskOptions.FromRetryHandler Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- src/Abstractions/TaskOptions.cs | 2 +- src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index 5703d4e8e..b9e17c4a1 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -96,7 +96,7 @@ public TaskOptions(TaskOptions options) /// using CancellationTokenSource cts = new CancellationTokenSource(); /// TaskOptions options = new TaskOptions /// { - /// Retry = TaskOptions.FromRetryHandler(retryContext => + /// Retry = TaskRetryOptions.FromRetryHandler(retryContext => /// { /// if (retryContext.CancellationToken.IsCancellationRequested) /// { diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index 2ececef56..b3f17b376 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -246,7 +246,7 @@ public override async Task CallSubOrchestratorAsync( version, instanceId, policy.ToDurableTaskCoreRetryOptions(), - input, + input, options.Tags); return await this.WaitForTaskWithCancellation(subOrchestratorTask, cancellationToken); From f68ff6285de51068627e9bdb4ac0c2601075dfe9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 7 Jan 2026 04:25:08 +0000 Subject: [PATCH 06/11] Fix line endings in TaskOptions.cs to preserve CRLF The previous commits inadvertently changed all line endings from CRLF to LF, causing the entire file to show as modified. This commit restores CRLF line endings while keeping all the functional changes (CancellationToken property and documentation). Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- src/Abstractions/TaskOptions.cs | 542 ++++++++++++++++---------------- 1 file changed, 271 insertions(+), 271 deletions(-) diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index b9e17c4a1..ca076fd5e 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -1,271 +1,271 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System.Collections.Immutable; - -namespace Microsoft.DurableTask; - -/// -/// Options that can be used to control the behavior of orchestrator task execution. -/// -public record TaskOptions -{ - /// - /// Initializes a new instance of the class. - /// - /// The task retry options. - public TaskOptions(TaskRetryOptions? retry) - : this(retry, null) - { - } - - /// - /// Initializes a new instance of the class. - /// - /// The task retry options. - /// The tags to associate with the task. - public TaskOptions(TaskRetryOptions? retry = null, IDictionary? tags = null) - { - this.Retry = retry; - this.Tags = tags; - } - - /// - /// Initializes a new instance of the class by copying from another instance. - /// - /// The task options to copy from. - public TaskOptions(TaskOptions options) - { - Check.NotNull(options); - this.Retry = options.Retry; - this.Tags = options.Tags; - this.CancellationToken = options.CancellationToken; - } - - /// - /// Gets the task retry options. - /// - public TaskRetryOptions? Retry { get; init; } - - /// - /// Gets the tags to associate with the task. - /// - public IDictionary? Tags { get; init; } - - /// - /// Gets the cancellation token that can be used to cancel the task. - /// - /// - /// - /// When provided, the cancellation token can be used to cancel activities, sub-orchestrators, and retry logic. - /// Cancellation is cooperative, meaning the task will not be cancelled immediately, but rather at the next - /// opportunity when the orchestrator checks the token status. - /// - /// - /// For activities, if the token is cancelled before the activity completes, the activity will not be scheduled - /// or, if already running, the result will be ignored and a will be thrown. - /// - /// - /// For sub-orchestrators, if the token is cancelled before the sub-orchestrator completes, the result will be - /// ignored and a will be thrown. Note that cancelling the parent's token - /// does not terminate the sub-orchestrator instance. - /// - /// - /// For retry handlers, the cancellation token is passed to the retry handler via the , - /// allowing the handler to check for cancellation and stop retrying if needed. - /// - /// - /// Example of cancelling an activity after a timeout: - /// - /// using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); - /// TaskOptions options = new TaskOptions { CancellationToken = cts.Token }; - /// - /// try - /// { - /// string result = await context.CallActivityAsync<string>("MyActivity", "input", options); - /// } - /// catch (TaskCanceledException) - /// { - /// // Handle cancellation - /// } - /// - /// - /// - /// Example of using cancellation with retry logic: - /// - /// using CancellationTokenSource cts = new CancellationTokenSource(); - /// TaskOptions options = new TaskOptions - /// { - /// Retry = TaskRetryOptions.FromRetryHandler(retryContext => - /// { - /// if (retryContext.CancellationToken.IsCancellationRequested) - /// { - /// return false; // Stop retrying - /// } - /// return retryContext.LastAttemptNumber < 3; - /// }), - /// CancellationToken = cts.Token - /// }; - /// - /// await context.CallActivityAsync("MyActivity", "input", options); - /// - /// - /// - public CancellationToken CancellationToken { get; init; } - - /// - /// Returns a new from the provided . - /// - /// The policy to convert from. - /// A built from the policy. - public static TaskOptions FromRetryPolicy(RetryPolicy policy) => new(policy); - - /// - /// Returns a new from the provided . - /// - /// The handler to convert from. - /// A built from the handler. - public static TaskOptions FromRetryHandler(AsyncRetryHandler handler) => new(handler); - - /// - /// Returns a new from the provided . - /// - /// The handler to convert from. - /// A built from the handler. - public static TaskOptions FromRetryHandler(RetryHandler handler) => new(handler); - - /// - /// Returns a new with the provided instance ID. This can be used when - /// starting a new sub-orchestration to specify the instance ID. - /// - /// The instance ID to use. - /// A new . - public SubOrchestrationOptions WithInstanceId(string instanceId) => new(this, instanceId); -} - -/// -/// Options that can be used to control the behavior of orchestrator task execution. This derived type can be used to -/// supply extra options for orchestrations. -/// -public record SubOrchestrationOptions : TaskOptions -{ - /// - /// Initializes a new instance of the class. - /// - /// The task retry options. - /// The orchestration instance ID. - public SubOrchestrationOptions(TaskRetryOptions? retry = null, string? instanceId = null) - : base(retry) - { - this.InstanceId = instanceId; - } - - /// - /// Initializes a new instance of the class. - /// - /// The task options to wrap. - /// The orchestration instance ID. - public SubOrchestrationOptions(TaskOptions options, string? instanceId = null) - : base(options) - { - this.InstanceId = instanceId; - if (options is SubOrchestrationOptions derived) - { - if (instanceId is null) - { - this.InstanceId = derived.InstanceId; - } - - this.Version = derived.Version; - } - } - - /// - /// Initializes a new instance of the class by copying from another instance. - /// - /// The sub-orchestration options to copy from. - public SubOrchestrationOptions(SubOrchestrationOptions options) - : base(options) - { - Check.NotNull(options); - this.InstanceId = options.InstanceId; - this.Version = options.Version; - } - - /// - /// Gets the orchestration instance ID. - /// - public string? InstanceId { get; init; } - - /// - /// Gets the version to associate with the sub-orchestration instance. - /// - public TaskVersion? Version { get; init; } -} - -/// -/// Options for submitting new orchestrations via the client. -/// -public record StartOrchestrationOptions -{ - /// - /// Initializes a new instance of the class. - /// - /// - /// The unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used. - /// - /// - /// The time when the orchestration instance should start executing. If not specified or if a date-time in the past - /// is specified, the orchestration instance will be scheduled immediately. - /// -#pragma warning disable SA1313 // Parameter names should begin with lower-case letter - using PascalCase to maintain backward compatibility with positional record syntax - public StartOrchestrationOptions(string? InstanceId = null, DateTimeOffset? StartAt = null) -#pragma warning restore SA1313 - { - this.InstanceId = InstanceId; - this.StartAt = StartAt; - } - - /// - /// Initializes a new instance of the class by copying from another instance. - /// - /// The start orchestration options to copy from. - public StartOrchestrationOptions(StartOrchestrationOptions options) - { - Check.NotNull(options); - this.InstanceId = options.InstanceId; - this.StartAt = options.StartAt; - this.Tags = options.Tags; - this.Version = options.Version; - this.DedupeStatuses = options.DedupeStatuses; - } - - /// - /// Gets the unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used. - /// - public string? InstanceId { get; init; } - - /// - /// Gets the time when the orchestration instance should start executing. If not specified or if a date-time in the past - /// is specified, the orchestration instance will be scheduled immediately. - /// - public DateTimeOffset? StartAt { get; init; } - - /// - /// Gets the tags to associate with the orchestration instance. - /// - public IReadOnlyDictionary Tags { get; init; } = ImmutableDictionary.Create(); - - /// - /// Gets the version to associate with the orchestration instance. - /// - public TaskVersion? Version { get; init; } - - /// - /// Gets the orchestration runtime statuses that should be considered for deduplication. - /// - /// - /// For type-safe usage, use the WithDedupeStatuses extension method. - /// - public IReadOnlyList? DedupeStatuses { get; init; } -} +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections.Immutable; + +namespace Microsoft.DurableTask; + +/// +/// Options that can be used to control the behavior of orchestrator task execution. +/// +public record TaskOptions +{ + /// + /// Initializes a new instance of the class. + /// + /// The task retry options. + public TaskOptions(TaskRetryOptions? retry) + : this(retry, null) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The task retry options. + /// The tags to associate with the task. + public TaskOptions(TaskRetryOptions? retry = null, IDictionary? tags = null) + { + this.Retry = retry; + this.Tags = tags; + } + + /// + /// Initializes a new instance of the class by copying from another instance. + /// + /// The task options to copy from. + public TaskOptions(TaskOptions options) + { + Check.NotNull(options); + this.Retry = options.Retry; + this.Tags = options.Tags; + this.CancellationToken = options.CancellationToken; + } + + /// + /// Gets the task retry options. + /// + public TaskRetryOptions? Retry { get; init; } + + /// + /// Gets the tags to associate with the task. + /// + public IDictionary? Tags { get; init; } + + /// + /// Gets the cancellation token that can be used to cancel the task. + /// + /// + /// + /// When provided, the cancellation token can be used to cancel activities, sub-orchestrators, and retry logic. + /// Cancellation is cooperative, meaning the task will not be cancelled immediately, but rather at the next + /// opportunity when the orchestrator checks the token status. + /// + /// + /// For activities, if the token is cancelled before the activity completes, the activity will not be scheduled + /// or, if already running, the result will be ignored and a will be thrown. + /// + /// + /// For sub-orchestrators, if the token is cancelled before the sub-orchestrator completes, the result will be + /// ignored and a will be thrown. Note that cancelling the parent's token + /// does not terminate the sub-orchestrator instance. + /// + /// + /// For retry handlers, the cancellation token is passed to the retry handler via the , + /// allowing the handler to check for cancellation and stop retrying if needed. + /// + /// + /// Example of cancelling an activity after a timeout: + /// + /// using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); + /// TaskOptions options = new TaskOptions { CancellationToken = cts.Token }; + /// + /// try + /// { + /// string result = await context.CallActivityAsync<string>("MyActivity", "input", options); + /// } + /// catch (TaskCanceledException) + /// { + /// // Handle cancellation + /// } + /// + /// + /// + /// Example of using cancellation with retry logic: + /// + /// using CancellationTokenSource cts = new CancellationTokenSource(); + /// TaskOptions options = new TaskOptions + /// { + /// Retry = TaskRetryOptions.FromRetryHandler(retryContext => + /// { + /// if (retryContext.CancellationToken.IsCancellationRequested) + /// { + /// return false; // Stop retrying + /// } + /// return retryContext.LastAttemptNumber < 3; + /// }), + /// CancellationToken = cts.Token + /// }; + /// + /// await context.CallActivityAsync("MyActivity", "input", options); + /// + /// + /// + public CancellationToken CancellationToken { get; init; } + + /// + /// Returns a new from the provided . + /// + /// The policy to convert from. + /// A built from the policy. + public static TaskOptions FromRetryPolicy(RetryPolicy policy) => new(policy); + + /// + /// Returns a new from the provided . + /// + /// The handler to convert from. + /// A built from the handler. + public static TaskOptions FromRetryHandler(AsyncRetryHandler handler) => new(handler); + + /// + /// Returns a new from the provided . + /// + /// The handler to convert from. + /// A built from the handler. + public static TaskOptions FromRetryHandler(RetryHandler handler) => new(handler); + + /// + /// Returns a new with the provided instance ID. This can be used when + /// starting a new sub-orchestration to specify the instance ID. + /// + /// The instance ID to use. + /// A new . + public SubOrchestrationOptions WithInstanceId(string instanceId) => new(this, instanceId); +} + +/// +/// Options that can be used to control the behavior of orchestrator task execution. This derived type can be used to +/// supply extra options for orchestrations. +/// +public record SubOrchestrationOptions : TaskOptions +{ + /// + /// Initializes a new instance of the class. + /// + /// The task retry options. + /// The orchestration instance ID. + public SubOrchestrationOptions(TaskRetryOptions? retry = null, string? instanceId = null) + : base(retry) + { + this.InstanceId = instanceId; + } + + /// + /// Initializes a new instance of the class. + /// + /// The task options to wrap. + /// The orchestration instance ID. + public SubOrchestrationOptions(TaskOptions options, string? instanceId = null) + : base(options) + { + this.InstanceId = instanceId; + if (options is SubOrchestrationOptions derived) + { + if (instanceId is null) + { + this.InstanceId = derived.InstanceId; + } + + this.Version = derived.Version; + } + } + + /// + /// Initializes a new instance of the class by copying from another instance. + /// + /// The sub-orchestration options to copy from. + public SubOrchestrationOptions(SubOrchestrationOptions options) + : base(options) + { + Check.NotNull(options); + this.InstanceId = options.InstanceId; + this.Version = options.Version; + } + + /// + /// Gets the orchestration instance ID. + /// + public string? InstanceId { get; init; } + + /// + /// Gets the version to associate with the sub-orchestration instance. + /// + public TaskVersion? Version { get; init; } +} + +/// +/// Options for submitting new orchestrations via the client. +/// +public record StartOrchestrationOptions +{ + /// + /// Initializes a new instance of the class. + /// + /// + /// The unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used. + /// + /// + /// The time when the orchestration instance should start executing. If not specified or if a date-time in the past + /// is specified, the orchestration instance will be scheduled immediately. + /// +#pragma warning disable SA1313 // Parameter names should begin with lower-case letter - using PascalCase to maintain backward compatibility with positional record syntax + public StartOrchestrationOptions(string? InstanceId = null, DateTimeOffset? StartAt = null) +#pragma warning restore SA1313 + { + this.InstanceId = InstanceId; + this.StartAt = StartAt; + } + + /// + /// Initializes a new instance of the class by copying from another instance. + /// + /// The start orchestration options to copy from. + public StartOrchestrationOptions(StartOrchestrationOptions options) + { + Check.NotNull(options); + this.InstanceId = options.InstanceId; + this.StartAt = options.StartAt; + this.Tags = options.Tags; + this.Version = options.Version; + this.DedupeStatuses = options.DedupeStatuses; + } + + /// + /// Gets the unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used. + /// + public string? InstanceId { get; init; } + + /// + /// Gets the time when the orchestration instance should start executing. If not specified or if a date-time in the past + /// is specified, the orchestration instance will be scheduled immediately. + /// + public DateTimeOffset? StartAt { get; init; } + + /// + /// Gets the tags to associate with the orchestration instance. + /// + public IReadOnlyDictionary Tags { get; init; } = ImmutableDictionary.Create(); + + /// + /// Gets the version to associate with the orchestration instance. + /// + public TaskVersion? Version { get; init; } + + /// + /// Gets the orchestration runtime statuses that should be considered for deduplication. + /// + /// + /// For type-safe usage, use the WithDedupeStatuses extension method. + /// + public IReadOnlyList? DedupeStatuses { get; init; } +} From 37d523f44125857b7d6ec4974aac801e56be6679 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 7 Jan 2026 20:20:09 +0000 Subject: [PATCH 07/11] Remove ineffective WaitForTaskWithCancellation method and clarify documentation As correctly identified in review, WaitForTaskWithCancellation was ineffective because: - Once ScheduleTask is called, the orchestrator yields execution - During the yield, orchestrator code isn't running, so cancellation callbacks can't fire - Task.WhenAny would always return the activity/sub-orchestrator task, never the cancellation task Changes: - Removed WaitForTaskWithCancellation method entirely - Simplified CallActivityAsync and CallSubOrchestratorAsync to directly await scheduled tasks - Added clarifying comments about the pre-check being the only cancellation point - Updated CancellationToken documentation to accurately describe behavior and limitations - Emphasized that cancellation only works pre-scheduling and in retry handlers, not mid-execution Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- src/Abstractions/TaskOptions.cs | 37 ++++++++---- .../Shims/TaskOrchestrationContextWrapper.cs | 58 ++++--------------- 2 files changed, 35 insertions(+), 60 deletions(-) diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index ca076fd5e..cb94ac9b7 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -57,31 +57,44 @@ public TaskOptions(TaskOptions options) /// /// /// - /// When provided, the cancellation token can be used to cancel activities, sub-orchestrators, and retry logic. - /// Cancellation is cooperative, meaning the task will not be cancelled immediately, but rather at the next - /// opportunity when the orchestrator checks the token status. + /// The cancellation token provides cooperative cancellation for activities, sub-orchestrators, and retry logic. + /// Due to the durable orchestrator execution model, cancellation only occurs at specific points when the + /// orchestrator code is executing. /// /// - /// For activities, if the token is cancelled before the activity completes, the activity will not be scheduled - /// or, if already running, the result will be ignored and a will be thrown. + /// Cancellation behavior: /// /// - /// For sub-orchestrators, if the token is cancelled before the sub-orchestrator completes, the result will be - /// ignored and a will be thrown. Note that cancelling the parent's token - /// does not terminate the sub-orchestrator instance. + /// 1. Pre-scheduling check: If the token is cancelled before calling + /// CallActivityAsync or CallSubOrchestratorAsync, a is thrown + /// immediately without scheduling the task. /// /// - /// For retry handlers, the cancellation token is passed to the retry handler via the , - /// allowing the handler to check for cancellation and stop retrying if needed. + /// 2. Retry handlers: The cancellation token is passed to custom retry handlers via + /// , allowing them to check for cancellation and stop retrying between attempts. + /// + /// + /// Important limitation: Once an activity or sub-orchestrator is scheduled, the orchestrator + /// yields execution and waits for the task to complete. During this yield period, the orchestrator code is not + /// running, so it cannot respond to cancellation requests. Cancelling the token while waiting will not wake up + /// the orchestrator or cancel the waiting task. This is a fundamental limitation of the durable orchestrator + /// execution model. + /// + /// + /// Note: Cancelling a parent orchestrator's token does not terminate sub-orchestrator instances that have + /// already been scheduled. /// /// - /// Example of cancelling an activity after a timeout: + /// Example of pre-scheduling cancellation: /// - /// using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); + /// using CancellationTokenSource cts = new CancellationTokenSource(); + /// cts.Cancel(); // Cancel before scheduling + /// /// TaskOptions options = new TaskOptions { CancellationToken = cts.Token }; /// /// try /// { + /// // This will throw TaskCanceledException without scheduling the activity /// string result = await context.CallActivityAsync<string>("MyActivity", "input", options); /// } /// catch (TaskCanceledException) diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index b3f17b376..5434ab015 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -154,7 +154,9 @@ public override async Task CallActivityAsync( cancellationToken = callActivityOptions.CancellationToken; } - // If cancellation was requested before starting, return a cancelled task immediately + // If cancellation was requested before starting, throw immediately + // Note: Once the activity is scheduled, the orchestrator yields and cannot respond to + // cancellation until it resumes, so this pre-check is the only cancellation point. if (cancellationToken.IsCancellationRequested) { throw new TaskCanceledException("The task was cancelled before it could be scheduled."); @@ -163,7 +165,7 @@ public override async Task CallActivityAsync( #pragma warning disable 0618 if (options?.Retry?.Policy is RetryPolicy policy) { - Task activityTask = this.innerContext.ScheduleTask( + return await this.innerContext.ScheduleTask( name.Name, this.innerContext.Version, options: ScheduleTaskOptions.CreateBuilder() @@ -171,8 +173,6 @@ public override async Task CallActivityAsync( .WithTags(tags) .Build(), parameters: input); - - return await this.WaitForTaskWithCancellation(activityTask, cancellationToken); } else if (options?.Retry?.Handler is AsyncRetryHandler handler) { @@ -190,15 +190,13 @@ public override async Task CallActivityAsync( } else { - Task activityTask = this.innerContext.ScheduleTask( + return await this.innerContext.ScheduleTask( name.Name, this.innerContext.Version, options: ScheduleTaskOptions.CreateBuilder() .WithTags(tags) .Build(), parameters: input); - - return await this.WaitForTaskWithCancellation(activityTask, cancellationToken); } } catch (global::DurableTask.Core.Exceptions.TaskFailedException e) @@ -231,7 +229,9 @@ public override async Task CallSubOrchestratorAsync( CancellationToken cancellationToken = options?.CancellationToken ?? default; - // If cancellation was requested before starting, return a cancelled task immediately + // If cancellation was requested before starting, throw immediately + // Note: Once the sub-orchestrator is scheduled, the orchestrator yields and cannot respond to + // cancellation until it resumes, so this pre-check is the only cancellation point. if (cancellationToken.IsCancellationRequested) { throw new TaskCanceledException("The sub-orchestrator was cancelled before it could be scheduled."); @@ -241,15 +241,13 @@ public override async Task CallSubOrchestratorAsync( { if (options?.Retry?.Policy is RetryPolicy policy) { - Task subOrchestratorTask = this.innerContext.CreateSubOrchestrationInstanceWithRetry( + return await this.innerContext.CreateSubOrchestrationInstanceWithRetry( orchestratorName.Name, version, instanceId, policy.ToDurableTaskCoreRetryOptions(), input, options.Tags); - - return await this.WaitForTaskWithCancellation(subOrchestratorTask, cancellationToken); } else if (options?.Retry?.Handler is AsyncRetryHandler handler) { @@ -266,14 +264,12 @@ public override async Task CallSubOrchestratorAsync( } else { - Task subOrchestratorTask = this.innerContext.CreateSubOrchestrationInstance( + return await this.innerContext.CreateSubOrchestrationInstance( orchestratorName.Name, version, instanceId, input, options?.Tags); - - return await this.WaitForTaskWithCancellation(subOrchestratorTask, cancellationToken); } } catch (global::DurableTask.Core.Exceptions.SubOrchestrationFailedException e) @@ -549,40 +545,6 @@ async Task InvokeWithCustomRetryHandler( } } - async Task WaitForTaskWithCancellation(Task task, CancellationToken cancellationToken) - { - // If no cancellation token provided or it can't be cancelled, just await the task - if (!cancellationToken.CanBeCanceled) - { - return await task; - } - - // If the task is already completed, return immediately without setting up cancellation infrastructure - if (task.IsCompleted) - { - return await task; - } - - // Create a cancellation task that completes when the token is cancelled - TaskCompletionSource cancellationTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); - using CancellationTokenRegistration registration = cancellationToken.Register(() => - { - cancellationTcs.TrySetCanceled(cancellationToken); - }); - - // Wait for either the task to complete or cancellation - Task completedTask = await Task.WhenAny(task, cancellationTcs.Task); - - // If cancellation won, throw TaskCanceledException - if (completedTask == cancellationTcs.Task) - { - throw new TaskCanceledException("The task was cancelled."); - } - - // Otherwise return the result of the completed task - return await task; - } - // The default version can come from two different places depending on the context of the invocation. string GetDefaultVersion() { From 0e911a1dff4e0ec928867d27489721d5a3685700 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 7 Jan 2026 21:06:00 +0000 Subject: [PATCH 08/11] Add test for cancellation token cancelled outside retry handler Added RetryHandlerReceivesCancelledTokenFromOutside test to verify behavior when: - A CancellationToken is provided to TaskOptions with retry logic - The token is cancelled OUTSIDE the retry handler (in orchestrator code) - The pre-scheduling check catches the cancelled token before scheduling This test demonstrates that when a token is cancelled before CallActivityAsync is invoked, the pre-scheduling cancellation check throws TaskCanceledException immediately, and the retry handler is never called. This is the expected behavior given the orchestrator execution model. Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- .../CancellationTests.cs | 81 +++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/test/Grpc.IntegrationTests/CancellationTests.cs b/test/Grpc.IntegrationTests/CancellationTests.cs index 3ead3fa2b..f19d5f67c 100644 --- a/test/Grpc.IntegrationTests/CancellationTests.cs +++ b/test/Grpc.IntegrationTests/CancellationTests.cs @@ -248,4 +248,85 @@ public async Task RetryHandlerCanStopOnCancellation() Assert.Equal(2, maxAttempts); // Should stop after 2 attempts due to cancellation Assert.Equal("\"Stopped after 2 attempts\"", metadata.SerializedOutput); } + + /// + /// Tests that when a token is cancelled outside the retry handler (between retry attempts), + /// the handler receives the cancelled token on the next attempt. + /// + [Fact] + public async Task RetryHandlerReceivesCancelledTokenFromOutside() + { + TaskName orchestratorName = nameof(RetryHandlerReceivesCancelledTokenFromOutside); + + int attemptCount = 0; + bool tokenWasCancelledInHandler = false; + CancellationTokenSource? cts = null; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + cts = new CancellationTokenSource(); + + TaskRetryOptions retryOptions = TaskOptions.FromRetryHandler(retryContext => + { + attemptCount = retryContext.LastAttemptNumber; + + // Check if token is cancelled + tokenWasCancelledInHandler = retryContext.CancellationToken.IsCancellationRequested; + + // Stop retrying if cancelled + if (retryContext.CancellationToken.IsCancellationRequested) + { + return false; + } + + return attemptCount < 5; + }).Retry!; + + TaskOptions options = new(retryOptions) + { + CancellationToken = cts.Token + }; + + // Cancel the token AFTER creating options but BEFORE first attempt + // This tests that the retry handler receives the cancelled token from outside + cts.Cancel(); + + try + { + await ctx.CallActivityAsync("FailingActivity", options); + return "Should not reach here - activity succeeded"; + } + catch (TaskCanceledException) + { + // Pre-scheduling check caught the cancelled token before even attempting + return $"Cancelled before scheduling, attempts: {attemptCount}"; + } + catch (TaskFailedException) + { + // Activity failed and retry handler stopped retrying + return $"Failed after {attemptCount} attempts, token was cancelled in handler: {tokenWasCancelledInHandler}"; + } + }) + .AddActivityFunc("FailingActivity", (TaskActivityContext activityContext) => + { + throw new InvalidOperationException("Activity always fails"); + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + + // Since token was cancelled before CallActivityAsync, the pre-scheduling check throws + // TaskCanceledException and retry handler never gets called + Assert.Equal(0, attemptCount); + Assert.Contains("Cancelled before scheduling", metadata.SerializedOutput); + } } From a34d209f1ce8d100c55a26624c9061a713cf337e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 7 Jan 2026 21:12:50 +0000 Subject: [PATCH 09/11] Add test for multiple activities in a loop with cancellation Added MultipleActivitiesInLoopWithCancellation test to demonstrate the main use case for pre-scheduling cancellation: - Orchestrator calls multiple activities in a loop - Token is cancelled after processing 3 items - Pre-scheduling check prevents remaining activities from being scheduled - Loop exits cleanly with cancellation message This test validates that cancellation in loops works as expected, allowing orchestrators to stop processing when a cancellation is requested. Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- .../CancellationTests.cs | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/test/Grpc.IntegrationTests/CancellationTests.cs b/test/Grpc.IntegrationTests/CancellationTests.cs index f19d5f67c..c8e719379 100644 --- a/test/Grpc.IntegrationTests/CancellationTests.cs +++ b/test/Grpc.IntegrationTests/CancellationTests.cs @@ -329,4 +329,75 @@ public async Task RetryHandlerReceivesCancelledTokenFromOutside() Assert.Equal(0, attemptCount); Assert.Contains("Cancelled before scheduling", metadata.SerializedOutput); } + + /// + /// Tests that when calling multiple activities in a loop with a cancellation token, + /// the loop exits after cancellation instead of continuing to call remaining activities. + /// This is the main use case for pre-scheduling cancellation checks. + /// + [Fact] + public async Task MultipleActivitiesInLoopWithCancellation() + { + TaskName orchestratorName = nameof(MultipleActivitiesInLoopWithCancellation); + TaskName activityName = "ProcessItem"; + + int activitiesInvoked = 0; + int totalItems = 10; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + using CancellationTokenSource cts = new(); + TaskOptions options = new() { CancellationToken = cts.Token }; + + List results = new(); + + for (int i = 0; i < totalItems; i++) + { + // Cancel after processing 3 items + if (i == 3) + { + cts.Cancel(); + } + + try + { + string result = await ctx.CallActivityAsync(activityName, i, options); + results.Add(result); + } + catch (TaskCanceledException) + { + // Pre-scheduling check caught cancellation - exit loop + results.Add($"Cancelled at item {i}"); + break; + } + } + + return $"Processed {results.Count} items: [{string.Join(", ", results)}]"; + }) + .AddActivityFunc(activityName, (TaskActivityContext ctx, int item) => + { + Interlocked.Increment(ref activitiesInvoked); + return $"Item {item}"; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + + // Should have processed 3 items (0, 1, 2) before cancellation at item 3 + Assert.Equal(3, activitiesInvoked); + Assert.Contains("Processed 4 items", metadata.SerializedOutput); // 3 successful + 1 cancellation message + Assert.Contains("Cancelled at item 3", metadata.SerializedOutput); + Assert.Contains("Item 0", metadata.SerializedOutput); + Assert.Contains("Item 1", metadata.SerializedOutput); + Assert.Contains("Item 2", metadata.SerializedOutput); + } } From 4c67f0fcc90da30667bca51b42974ebc8e03c9f4 Mon Sep 17 00:00:00 2001 From: wangbill Date: Mon, 16 Mar 2026 10:21:13 -0700 Subject: [PATCH 10/11] Potential fix for pull request finding 'Missing Dispose call on local IDisposable' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com> --- test/Grpc.IntegrationTests/CancellationTests.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/Grpc.IntegrationTests/CancellationTests.cs b/test/Grpc.IntegrationTests/CancellationTests.cs index c8e719379..d71a43b7e 100644 --- a/test/Grpc.IntegrationTests/CancellationTests.cs +++ b/test/Grpc.IntegrationTests/CancellationTests.cs @@ -260,14 +260,13 @@ public async Task RetryHandlerReceivesCancelledTokenFromOutside() int attemptCount = 0; bool tokenWasCancelledInHandler = false; - CancellationTokenSource? cts = null; await using HostTestLifetime server = await this.StartWorkerAsync(b => { b.AddTasks(tasks => tasks .AddOrchestratorFunc(orchestratorName, async ctx => { - cts = new CancellationTokenSource(); + using CancellationTokenSource cts = new(); TaskRetryOptions retryOptions = TaskOptions.FromRetryHandler(retryContext => { From 68096dfb662aa374fcce72a8b3baa0699fa1b179 Mon Sep 17 00:00:00 2001 From: wangbill Date: Mon, 16 Mar 2026 13:09:51 -0700 Subject: [PATCH 11/11] Fix cancellation review issues: consistent check placement, improve tests - Move cancellation pre-check outside try block in CallActivityAsync for consistency with CallSubOrchestratorAsync - Rename misleading RetryHandlerReceivesCancelledTokenFromOutside to PreSchedulingCancellationCheck to match actual behavior - Fix CancellationTokenSource disposal in renamed test - Add SubOrchestratorRetryHandlerCanStopOnCancellation test for sub-orchestrator retry + cancellation coverage --- .../Shims/TaskOrchestrationContextWrapper.cs | 34 +++--- .../CancellationTests.cs | 109 +++++++++++++----- 2 files changed, 97 insertions(+), 46 deletions(-) diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index 5434ab015..f3a666899 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -140,28 +140,28 @@ public override async Task CallActivityAsync( nameof(input)); } - try + IDictionary tags = ImmutableDictionary.Empty; + CancellationToken cancellationToken = default; + if (options is TaskOptions callActivityOptions) { - IDictionary tags = ImmutableDictionary.Empty; - CancellationToken cancellationToken = default; - if (options is TaskOptions callActivityOptions) + if (callActivityOptions.Tags is not null) { - if (callActivityOptions.Tags is not null) - { - tags = callActivityOptions.Tags; - } - - cancellationToken = callActivityOptions.CancellationToken; + tags = callActivityOptions.Tags; } - // If cancellation was requested before starting, throw immediately - // Note: Once the activity is scheduled, the orchestrator yields and cannot respond to - // cancellation until it resumes, so this pre-check is the only cancellation point. - if (cancellationToken.IsCancellationRequested) - { - throw new TaskCanceledException("The task was cancelled before it could be scheduled."); - } + cancellationToken = callActivityOptions.CancellationToken; + } + // If cancellation was requested before starting, throw immediately + // Note: Once the activity is scheduled, the orchestrator yields and cannot respond to + // cancellation until it resumes, so this pre-check is the only cancellation point. + if (cancellationToken.IsCancellationRequested) + { + throw new TaskCanceledException("The task was cancelled before it could be scheduled."); + } + + try + { #pragma warning disable 0618 if (options?.Retry?.Policy is RetryPolicy policy) { diff --git a/test/Grpc.IntegrationTests/CancellationTests.cs b/test/Grpc.IntegrationTests/CancellationTests.cs index d71a43b7e..921db482e 100644 --- a/test/Grpc.IntegrationTests/CancellationTests.cs +++ b/test/Grpc.IntegrationTests/CancellationTests.cs @@ -250,16 +250,15 @@ public async Task RetryHandlerCanStopOnCancellation() } /// - /// Tests that when a token is cancelled outside the retry handler (between retry attempts), - /// the handler receives the cancelled token on the next attempt. + /// Tests that pre-scheduling cancellation check catches a token cancelled + /// after options creation but before calling CallActivityAsync. /// [Fact] - public async Task RetryHandlerReceivesCancelledTokenFromOutside() + public async Task PreSchedulingCancellationCheck() { - TaskName orchestratorName = nameof(RetryHandlerReceivesCancelledTokenFromOutside); + TaskName orchestratorName = nameof(PreSchedulingCancellationCheck); int attemptCount = 0; - bool tokenWasCancelledInHandler = false; await using HostTestLifetime server = await this.StartWorkerAsync(b => { @@ -271,16 +270,6 @@ public async Task RetryHandlerReceivesCancelledTokenFromOutside() TaskRetryOptions retryOptions = TaskOptions.FromRetryHandler(retryContext => { attemptCount = retryContext.LastAttemptNumber; - - // Check if token is cancelled - tokenWasCancelledInHandler = retryContext.CancellationToken.IsCancellationRequested; - - // Stop retrying if cancelled - if (retryContext.CancellationToken.IsCancellationRequested) - { - return false; - } - return attemptCount < 5; }).Retry!; @@ -289,25 +278,18 @@ public async Task RetryHandlerReceivesCancelledTokenFromOutside() CancellationToken = cts.Token }; - // Cancel the token AFTER creating options but BEFORE first attempt - // This tests that the retry handler receives the cancelled token from outside + // Cancel the token AFTER creating options but BEFORE calling CallActivityAsync cts.Cancel(); try { await ctx.CallActivityAsync("FailingActivity", options); - return "Should not reach here - activity succeeded"; + return "Should not reach here"; } catch (TaskCanceledException) { - // Pre-scheduling check caught the cancelled token before even attempting return $"Cancelled before scheduling, attempts: {attemptCount}"; } - catch (TaskFailedException) - { - // Activity failed and retry handler stopped retrying - return $"Failed after {attemptCount} attempts, token was cancelled in handler: {tokenWasCancelledInHandler}"; - } }) .AddActivityFunc("FailingActivity", (TaskActivityContext activityContext) => { @@ -322,9 +304,8 @@ public async Task RetryHandlerReceivesCancelledTokenFromOutside() Assert.NotNull(metadata); Assert.Equal(instanceId, metadata.InstanceId); Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - - // Since token was cancelled before CallActivityAsync, the pre-scheduling check throws - // TaskCanceledException and retry handler never gets called + + // Pre-scheduling check throws TaskCanceledException; retry handler never gets called Assert.Equal(0, attemptCount); Assert.Contains("Cancelled before scheduling", metadata.SerializedOutput); } @@ -390,7 +371,7 @@ public async Task MultipleActivitiesInLoopWithCancellation() Assert.NotNull(metadata); Assert.Equal(instanceId, metadata.InstanceId); Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - + // Should have processed 3 items (0, 1, 2) before cancellation at item 3 Assert.Equal(3, activitiesInvoked); Assert.Contains("Processed 4 items", metadata.SerializedOutput); // 3 successful + 1 cancellation message @@ -399,4 +380,74 @@ public async Task MultipleActivitiesInLoopWithCancellation() Assert.Contains("Item 1", metadata.SerializedOutput); Assert.Contains("Item 2", metadata.SerializedOutput); } -} + + /// + /// Tests that the cancellation token is passed to the retry handler for sub-orchestrators, + /// allowing the handler to stop retrying when cancelled. + /// + [Fact] + public async Task SubOrchestratorRetryHandlerCanStopOnCancellation() + { + TaskName orchestratorName = nameof(SubOrchestratorRetryHandlerCanStopOnCancellation); + TaskName subOrchestratorName = "FailingSubOrchestrator"; + + int maxAttempts = 0; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + using CancellationTokenSource cts = new(); + + TaskRetryOptions retryOptions = TaskOptions.FromRetryHandler(retryContext => + { + maxAttempts = retryContext.LastAttemptNumber; + + // Cancel after second attempt + if (maxAttempts == 2) + { + cts.Cancel(); + } + + // Stop retrying if cancelled + if (retryContext.CancellationToken.IsCancellationRequested) + { + return false; + } + + return maxAttempts < 10; + }).Retry!; + + TaskOptions options = new(retryOptions) + { + CancellationToken = cts.Token + }; + + try + { + await ctx.CallSubOrchestratorAsync(subOrchestratorName, options: options); + return "Should not reach here"; + } + catch (TaskFailedException) + { + return $"Stopped after {maxAttempts} attempts"; + } + }) + .AddOrchestratorFunc(subOrchestratorName, ctx => + { + throw new InvalidOperationException("Sub-orchestrator always fails"); + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal(2, maxAttempts); // Should stop after 2 attempts due to cancellation + Assert.Equal("\"Stopped after 2 attempts\"", metadata.SerializedOutput); + } +} \ No newline at end of file