From a5cf61c4fc4a5e83d68997df710eed51cd306015 Mon Sep 17 00:00:00 2001 From: Eric Gold Date: Mon, 9 Mar 2026 16:53:37 -0700 Subject: [PATCH 01/12] Add Checkpoint member to SuperStepStartInfo --- .../Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs index 94ccb06dd1..c53613b68e 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs @@ -18,4 +18,11 @@ public sealed class SuperStepStartInfo(HashSet? sendingExecutors = null) /// Gets a value indicating whether there are any external messages queued during the previous SuperStep. /// public bool HasExternalMessages { get; init; } + + /// + /// Gets the corresponding to the checkpoint restored at the start of this SuperStep, if any. + /// if checkpointing was not enabled when the run was started. + /// + public CheckpointInfo? Checkpoint { get; init; } + } From a81c3e2961363d97cc9540b40533f19a9d1d3444 Mon Sep 17 00:00:00 2001 From: Eric Gold Date: Mon, 9 Mar 2026 17:50:08 -0700 Subject: [PATCH 02/12] initial proposed solution from Claude. Causes unit tests to fail, likely b/c the test definitions are now wrong --- .../InProc/InProcStepTracer.cs | 5 +++-- .../Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs | 6 +++++- .../src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs | 1 - 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcStepTracer.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcStepTracer.cs index 0affd47f00..2b71a483f8 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcStepTracer.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcStepTracer.cs @@ -31,7 +31,7 @@ internal sealed class InProcStepTracer : IStepTracer /// The Step Number of the last SuperStep. Note that Step Numbers are 0-indexed. public void Reload(int lastStepNumber = 0) => this._nextStepNumber = lastStepNumber + 1; - public SuperStepStartedEvent Advance(StepContext step) + public SuperStepStartedEvent Advance(StepContext step, CheckpointInfo? startCheckpoint = null) { this._nextStepNumber++; this.Activated.Clear(); @@ -57,7 +57,8 @@ public SuperStepStartedEvent Advance(StepContext step) return new SuperStepStartedEvent(this.StepNumber, new SuperStepStartInfo(sendingExecutors) { - HasExternalMessages = hasExternalMessages + HasExternalMessages = hasExternalMessages, + Checkpoint = startCheckpoint }); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs index 2a61f80ced..1c1120c62d 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs @@ -244,7 +244,11 @@ await executor.ExecuteCoreAsync( private async ValueTask RunSuperstepAsync(StepContext currentStep, CancellationToken cancellationToken) { - await this.RaiseWorkflowEventAsync(this.StepTracer.Advance(currentStep)).ConfigureAwait(false); + // Save a checkpoint before the superstep executes, capturing the pre-delivery state. + await this.CheckpointAsync(cancellationToken).ConfigureAwait(false); + CheckpointInfo? startCheckpoint = this.StepTracer.Checkpoint; + + await this.RaiseWorkflowEventAsync(this.StepTracer.Advance(currentStep, startCheckpoint)).ConfigureAwait(false); // Deliver the messages and queue the next step List receiverTasks = diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs index c53613b68e..ffa4852bd6 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs @@ -24,5 +24,4 @@ public sealed class SuperStepStartInfo(HashSet? sendingExecutors = null) /// if checkpointing was not enabled when the run was started. /// public CheckpointInfo? Checkpoint { get; init; } - } From d1b44498f44f67b04e6bbc3848803d8c95c69fa8 Mon Sep 17 00:00:00 2001 From: Eric Gold Date: Mon, 9 Mar 2026 18:07:16 -0700 Subject: [PATCH 03/12] Fix Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync definition --- .../CheckpointParentTests.cs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs index 0ecf3c993f..0f281b0d2f 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs @@ -38,7 +38,11 @@ internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(Executi List checkpoints = []; await foreach (WorkflowEvent evt in run.WatchStreamAsync()) { - if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) + if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) + { + checkpoints.Add(startCp); + } + else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) { checkpoints.Add(cp); } @@ -51,6 +55,13 @@ internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(Executi Checkpoint storedFirst = await ((ICheckpointManager)checkpointManager) .LookupCheckpointAsync(firstCheckpoint.SessionId, firstCheckpoint); storedFirst.Parent.Should().BeNull("the first checkpoint should have no parent"); + + // Assert: The second checkpoint should have 1 parent, the first checkpoint. + CheckpointInfo secondCheckpoint = checkpoints[1]; + Checkpoint storedSecond = await ((ICheckpointManager)checkpointManager) + .LookupCheckpointAsync(secondCheckpoint.SessionId, secondCheckpoint); + storedSecond.Parent.Should().NotBeNull("the second checkpoint should have a parent"); + storedSecond.Parent.Should().Be(firstCheckpoint, "the second checkpoint's parent should be the first checkpoint"); } [Theory] From 151f7db7b9888485aca4706c36752b866f5828c9 Mon Sep 17 00:00:00 2001 From: Eric Gold Date: Mon, 9 Mar 2026 18:26:17 -0700 Subject: [PATCH 04/12] fix Checkpoint_SubsequentCheckpoints_ShouldChainParentsAsync definition --- .../CheckpointParentTests.cs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs index 0f281b0d2f..086f199e6d 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs @@ -90,13 +90,18 @@ internal async Task Checkpoint_SubsequentCheckpoints_ShouldChainParentsAsync(Exe await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token)) { - if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) + if (checkpoints.Count >= 3) + { + cts.Cancel(); + } + + if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) + { + checkpoints.Add(startCp); + } + else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) { checkpoints.Add(cp); - if (checkpoints.Count >= 3) - { - cts.Cancel(); - } } } From 30380d67bd9cedca3b7e03ca7a0560ef29cdffe8 Mon Sep 17 00:00:00 2001 From: Eric Gold Date: Tue, 10 Mar 2026 11:46:41 -0700 Subject: [PATCH 05/12] update Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAsync definition. I think this is correct, but the test now hangs, likely because the first checkpoint (or it's related data structures) we're now creating aren't initialized correctly. --- .../CheckpointParentTests.cs | 40 ++++++++++++------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs index 086f199e6d..55b8102e1a 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs @@ -90,11 +90,6 @@ internal async Task Checkpoint_SubsequentCheckpoints_ShouldChainParentsAsync(Exe await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token)) { - if (checkpoints.Count >= 3) - { - cts.Cancel(); - } - if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) { checkpoints.Add(startCp); @@ -103,6 +98,11 @@ internal async Task Checkpoint_SubsequentCheckpoints_ShouldChainParentsAsync(Exe { checkpoints.Add(cp); } + + if (checkpoints.Count >= 3) + { + cts.Cancel(); + } } // Assert: We should have at least 3 checkpoints @@ -148,13 +148,18 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs using CancellationTokenSource cts = new(); await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token)) { - if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) + if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) + { + firstRunCheckpoints.Add(startCp); + } + else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) { firstRunCheckpoints.Add(cp); - if (firstRunCheckpoints.Count >= 2) - { - cts.Cancel(); - } + } + + if (firstRunCheckpoints.Count >= 2) + { + cts.Cancel(); } } @@ -171,13 +176,18 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs using CancellationTokenSource cts2 = new(); await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(cts2.Token)) { - if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) + if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) + { + resumedCheckpoints.Add(startCp); + } + else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) { resumedCheckpoints.Add(cp); - if (resumedCheckpoints.Count >= 1) - { - cts2.Cancel(); - } + } + + if (resumedCheckpoints.Count >= 1) + { + cts2.Cancel(); } } From 61fb686bceac73f78b33caf67447718ab06e9801 Mon Sep 17 00:00:00 2001 From: Eric Gold Date: Tue, 10 Mar 2026 12:17:26 -0700 Subject: [PATCH 06/12] All unit tests now passing. --- .../InProc/InProcessRunner.cs | 9 ++++++--- .../InProc/InProcessRunnerContext.cs | 6 ++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs index 1c1120c62d..56047a036b 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs @@ -245,7 +245,7 @@ await executor.ExecuteCoreAsync( private async ValueTask RunSuperstepAsync(StepContext currentStep, CancellationToken cancellationToken) { // Save a checkpoint before the superstep executes, capturing the pre-delivery state. - await this.CheckpointAsync(cancellationToken).ConfigureAwait(false); + await this.CheckpointAsync(currentStep, cancellationToken).ConfigureAwait(false); CheckpointInfo? startCheckpoint = this.StepTracer.Checkpoint; await this.RaiseWorkflowEventAsync(this.StepTracer.Advance(currentStep, startCheckpoint)).ConfigureAwait(false); @@ -282,7 +282,7 @@ await this.RaiseWorkflowEventAsync(this.StepTracer.Complete(this.RunContext.Next private WorkflowInfo? _workflowInfoCache; private CheckpointInfo? _lastCheckpointInfo; private readonly List _checkpoints = []; - internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = default) + internal async ValueTask CheckpointAsync(StepContext? queuedMessagesOverride, CancellationToken cancellationToken = default) { this.RunContext.CheckEnded(); if (this.CheckpointManager is null) @@ -303,7 +303,7 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d await prepareTask.ConfigureAwait(false); await this.RunContext.StateManager.PublishUpdatesAsync(this.StepTracer).ConfigureAwait(false); - RunnerStateData runnerData = await this.RunContext.ExportStateAsync().ConfigureAwait(false); + RunnerStateData runnerData = await this.RunContext.ExportStateAsync(queuedMessagesOverride).ConfigureAwait(false); Dictionary stateData = await this.RunContext.StateManager.ExportStateAsync().ConfigureAwait(false); Checkpoint checkpoint = new(this.StepTracer.StepNumber, this._workflowInfoCache, runnerData, stateData, edgeData, this._lastCheckpointInfo); @@ -312,6 +312,9 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d this._checkpoints.Add(this._lastCheckpointInfo); } + internal ValueTask CheckpointAsync(CancellationToken cancellationToken = default) + => this.CheckpointAsync(null, cancellationToken); + public async ValueTask RestoreCheckpointAsync(CheckpointInfo checkpointInfo, CancellationToken cancellationToken = default) { this.RunContext.CheckEnded(); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs index eda7b90a80..e3156715ce 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs @@ -379,11 +379,13 @@ async Task InvokeCheckpointRestoredAsync(Task executorTask) } } - internal ValueTask ExportStateAsync() + internal ValueTask ExportStateAsync(StepContext? queuedMessagesOverride = null) { this.CheckEnded(); - Dictionary> queuedMessages = this._nextStep.ExportMessages(); + StepContext queuedMessagesSource = queuedMessagesOverride ?? this._nextStep; + Dictionary> queuedMessages = queuedMessagesSource.ExportMessages(); + RunnerStateData result = new(instantiatedExecutors: [.. this._executors.Keys], queuedMessages, outstandingRequests: [.. this._externalRequests.Values]); From 4c137362fe9b34d0f404e6d3969c833033597dac Mon Sep 17 00:00:00 2001 From: Eric Gold Date: Tue, 10 Mar 2026 12:58:27 -0700 Subject: [PATCH 07/12] Add a couple more unit tests as well as fix failed InProcessRun_StateShouldPersist_CheckpointedAsync test definition --- .../CheckpointParentTests.cs | 118 ++++++++++++++++++ .../InProcessStateTests.cs | 2 +- 2 files changed, 119 insertions(+), 1 deletion(-) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs index 55b8102e1a..489682636b 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs @@ -198,4 +198,122 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs storedResumed.Parent.Should().NotBeNull("checkpoint created after resume should have a parent"); storedResumed.Parent.Should().Be(resumePoint, "checkpoint after resume should reference the checkpoint we resumed from"); } + + [Theory] + [InlineData(ExecutionEnvironment.InProcess_Lockstep)] + [InlineData(ExecutionEnvironment.InProcess_OffThread)] + internal async Task Checkpoint_AfterResumeFromSuperstepStart_CountCheckpointsEmittedAsync(ExecutionEnvironment environment) + { + // Arrange: A basic workflow with 3 executor stages + ForwardMessageExecutor executorA = new("A"); + ForwardMessageExecutor executorB = new("B"); + ForwardMessageExecutor executorC = new("C"); + + Workflow workflow = new WorkflowBuilder(executorA) + .AddEdge(executorA, executorB) + .AddEdge(executorB, executorC) + .Build(); + + CheckpointManager checkpointManager = CheckpointManager.CreateInMemory(); + InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment(); + + // First run: collect a checkpoint to resume from + await using StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"); + + List firstRunCheckpoints = []; + using CancellationTokenSource cts = new(); + await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token)) + { + if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) + { + firstRunCheckpoints.Add(startCp); + } + } + + firstRunCheckpoints.Should().HaveCount(3); + CheckpointInfo resumePoint = firstRunCheckpoints[1]; + + // Dispose the first run to release workflow ownership before resuming. + await run.DisposeAsync(); + + // Act: Resume from the first checkpoint + StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); + + List resumedCheckpoints = []; + using CancellationTokenSource cts2 = new(); + await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(cts2.Token)) + { + if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) + { + resumedCheckpoints.Add(startCp); + } + else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) + { + resumedCheckpoints.Add(cp); + } + } + + // Assert: The workflow should save the right number of checkpoints on re-run. + resumedCheckpoints.Should().NotBeEmpty(); + resumedCheckpoints.Should().HaveCount(4, "the resumed workflow has 2 executors to run, each generating 2 checkpoints"); + } + + [Theory] + [InlineData(ExecutionEnvironment.InProcess_Lockstep)] + [InlineData(ExecutionEnvironment.InProcess_OffThread)] + internal async Task Checkpoint_AfterResumeFromSuperstepCompleted_CountCheckpointsEmittedAsync(ExecutionEnvironment environment) + { + // Arrange: A basic workflow with 3 executor stages + ForwardMessageExecutor executorA = new("A"); + ForwardMessageExecutor executorB = new("B"); + ForwardMessageExecutor executorC = new("C"); + + Workflow workflow = new WorkflowBuilder(executorA) + .AddEdge(executorA, executorB) + .AddEdge(executorB, executorC) + .Build(); + + CheckpointManager checkpointManager = CheckpointManager.CreateInMemory(); + InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment(); + + // First run: collect a checkpoint to resume from + await using StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"); + + List firstRunCheckpoints = []; + using CancellationTokenSource cts = new(); + await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token)) + { + if (evt is SuperStepCompletedEvent completedEvent && completedEvent.CompletionInfo?.Checkpoint is { } startCp) + { + firstRunCheckpoints.Add(startCp); + } + } + + firstRunCheckpoints.Should().HaveCount(3); + CheckpointInfo resumePoint = firstRunCheckpoints[1]; + + // Dispose the first run to release workflow ownership before resuming. + await run.DisposeAsync(); + + // Act: Resume from the first checkpoint + StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); + + List resumedCheckpoints = []; + using CancellationTokenSource cts2 = new(); + await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(cts2.Token)) + { + if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) + { + resumedCheckpoints.Add(startCp); + } + else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) + { + resumedCheckpoints.Add(cp); + } + } + + // Assert: The workflow should save the right number of checkpoints on re-run. + resumedCheckpoints.Should().NotBeEmpty(); + resumedCheckpoints.Should().HaveCount(2, "the resumed workflow has 1 executor to run, generating 2 checkpoints"); + } } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessStateTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessStateTests.cs index acffbdd336..d93c654730 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessStateTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessStateTests.cs @@ -133,7 +133,7 @@ public async Task InProcessRun_StateShouldPersist_CheckpointedAsync() Run checkpointed = await InProcessExecution.RunAsync(workflow, new(), CheckpointManager.Default); - checkpointed.Checkpoints.Should().HaveCount(4); + checkpointed.Checkpoints.Should().HaveCount(8); RunStatus status = await checkpointed.GetStatusAsync(); status.Should().Be(RunStatus.Idle); From 1f859cc7a960faf3cc5c477c0be33055c30bdb16 Mon Sep 17 00:00:00 2001 From: Eric Gold Date: Tue, 10 Mar 2026 14:07:33 -0700 Subject: [PATCH 08/12] Update dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs rename local variable Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../CheckpointParentTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs index 489682636b..42eb511faa 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs @@ -283,9 +283,9 @@ internal async Task Checkpoint_AfterResumeFromSuperstepCompleted_CountCheckpoint using CancellationTokenSource cts = new(); await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token)) { - if (evt is SuperStepCompletedEvent completedEvent && completedEvent.CompletionInfo?.Checkpoint is { } startCp) + if (evt is SuperStepCompletedEvent completedEvent && completedEvent.CompletionInfo?.Checkpoint is { } completedCp) { - firstRunCheckpoints.Add(startCp); + firstRunCheckpoints.Add(completedCp); } } From 4c44228c9afca51544ef5caaf5c7c6560a514238 Mon Sep 17 00:00:00 2001 From: Eric Gold Date: Tue, 10 Mar 2026 16:35:26 -0700 Subject: [PATCH 09/12] Address GitHubCopilot's comments on CheckpointParentTest.cs. All minor changes --- .../CheckpointParentTests.cs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs index 42eb511faa..867ace65d7 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs @@ -32,7 +32,7 @@ internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(Executi InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment(); // Act - StreamingRun run = + await using StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"); List checkpoints = []; @@ -218,11 +218,10 @@ internal async Task Checkpoint_AfterResumeFromSuperstepStart_CountCheckpointsEmi InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment(); // First run: collect a checkpoint to resume from - await using StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"); + StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"); List firstRunCheckpoints = []; - using CancellationTokenSource cts = new(); - await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token)) + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) { if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) { @@ -236,12 +235,11 @@ internal async Task Checkpoint_AfterResumeFromSuperstepStart_CountCheckpointsEmi // Dispose the first run to release workflow ownership before resuming. await run.DisposeAsync(); - // Act: Resume from the first checkpoint + // Act: Resume from the second checkpoint StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); List resumedCheckpoints = []; - using CancellationTokenSource cts2 = new(); - await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(cts2.Token)) + await foreach (WorkflowEvent evt in resumed.WatchStreamAsync()) { if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) { @@ -277,11 +275,10 @@ internal async Task Checkpoint_AfterResumeFromSuperstepCompleted_CountCheckpoint InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment(); // First run: collect a checkpoint to resume from - await using StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"); + StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"); List firstRunCheckpoints = []; - using CancellationTokenSource cts = new(); - await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token)) + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) { if (evt is SuperStepCompletedEvent completedEvent && completedEvent.CompletionInfo?.Checkpoint is { } completedCp) { @@ -295,7 +292,7 @@ internal async Task Checkpoint_AfterResumeFromSuperstepCompleted_CountCheckpoint // Dispose the first run to release workflow ownership before resuming. await run.DisposeAsync(); - // Act: Resume from the first checkpoint + // Act: Resume from the second checkpoint StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); List resumedCheckpoints = []; From 9f4de9407f9ce890df1fcd2b63e57db4e7ee4df5 Mon Sep 17 00:00:00 2001 From: Eric Gold Date: Tue, 10 Mar 2026 17:29:53 -0700 Subject: [PATCH 10/12] minor suggestions from Copilot --- .../Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs | 2 +- .../CheckpointParentTests.cs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs index ffa4852bd6..a6fb85a35d 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs @@ -20,7 +20,7 @@ public sealed class SuperStepStartInfo(HashSet? sendingExecutors = null) public bool HasExternalMessages { get; init; } /// - /// Gets the corresponding to the checkpoint restored at the start of this SuperStep, if any. + /// Gets the corresponding to the checkpoint created at the start of this SuperStep, if any. /// if checkpointing was not enabled when the run was started. /// public CheckpointInfo? Checkpoint { get; init; } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs index 867ace65d7..801d8d73f0 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs @@ -142,7 +142,7 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment(); // First run: collect a checkpoint to resume from - await using StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"); + StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"); List firstRunCheckpoints = []; using CancellationTokenSource cts = new(); @@ -170,7 +170,7 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs await run.DisposeAsync(); // Act: Resume from the first checkpoint - StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); + await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); List resumedCheckpoints = []; using CancellationTokenSource cts2 = new(); @@ -236,7 +236,7 @@ internal async Task Checkpoint_AfterResumeFromSuperstepStart_CountCheckpointsEmi await run.DisposeAsync(); // Act: Resume from the second checkpoint - StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); + await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); List resumedCheckpoints = []; await foreach (WorkflowEvent evt in resumed.WatchStreamAsync()) @@ -293,7 +293,7 @@ internal async Task Checkpoint_AfterResumeFromSuperstepCompleted_CountCheckpoint await run.DisposeAsync(); // Act: Resume from the second checkpoint - StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); + await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); List resumedCheckpoints = []; using CancellationTokenSource cts2 = new(); From 386a5df8008ec22fd0447327929cf315376e7990 Mon Sep 17 00:00:00 2001 From: Eric Gold Date: Tue, 10 Mar 2026 20:58:17 -0700 Subject: [PATCH 11/12] resolve 1 more of Copilot's test comments --- .../CheckpointParentTests.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs index 801d8d73f0..f6b9186415 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs @@ -57,6 +57,7 @@ internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(Executi storedFirst.Parent.Should().BeNull("the first checkpoint should have no parent"); // Assert: The second checkpoint should have 1 parent, the first checkpoint. + checkpoints.Should().HaveCountGreaterThanOrEqualTo(2, "multiple checkpoints should have been created, and we can't verify parent checkpoint without at least 2 checkpoints present."); CheckpointInfo secondCheckpoint = checkpoints[1]; Checkpoint storedSecond = await ((ICheckpointManager)checkpointManager) .LookupCheckpointAsync(secondCheckpoint.SessionId, secondCheckpoint); From 2fcf73230feb01143a5910392f3abb720751a11f Mon Sep 17 00:00:00 2001 From: Eric Gold Date: Tue, 10 Mar 2026 21:00:37 -0700 Subject: [PATCH 12/12] Update dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../CheckpointParentTests.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs index f6b9186415..1f8e79fbb7 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs @@ -297,8 +297,7 @@ internal async Task Checkpoint_AfterResumeFromSuperstepCompleted_CountCheckpoint await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); List resumedCheckpoints = []; - using CancellationTokenSource cts2 = new(); - await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(cts2.Token)) + await foreach (WorkflowEvent evt in resumed.WatchStreamAsync()) { if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) {