Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a5cf61c
Add Checkpoint member to SuperStepStartInfo
Mar 9, 2026
a81c3e2
initial proposed solution from Claude. Causes unit tests to fail, lik…
Mar 10, 2026
d1b4449
Fix Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync definition
Mar 10, 2026
151f7db
fix Checkpoint_SubsequentCheckpoints_ShouldChainParentsAsync definition
Mar 10, 2026
30380d6
update Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAsyn…
Mar 10, 2026
61fb686
All unit tests now passing.
Mar 10, 2026
4c13736
Add a couple more unit tests as well as fix failed InProcessRun_State…
Mar 10, 2026
1f859cc
Update dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Checkpoin…
elgold92 Mar 10, 2026
4c44228
Address GitHubCopilot's comments on CheckpointParentTest.cs. All mino…
Mar 10, 2026
71f50f0
Merge branch 'main' into ericgold/CheckpointOnSuperStepStarted
elgold92 Mar 10, 2026
9f4de94
minor suggestions from Copilot
Mar 11, 2026
90d7b08
Merge branch 'ericgold/CheckpointOnSuperStepStarted' of https://githu…
Mar 11, 2026
3bf2b1c
Merge branch 'main' into ericgold/CheckpointOnSuperStepStarted
elgold92 Mar 11, 2026
386a5df
resolve 1 more of Copilot's test comments
Mar 11, 2026
2fcf732
Update dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Checkpoin…
elgold92 Mar 11, 2026
f797794
Merge branch 'main' into ericgold/CheckpointOnSuperStepStarted
elgold92 Mar 11, 2026
3b8cb64
Merge branch 'main' into ericgold/CheckpointOnSuperStepStarted
elgold92 Mar 15, 2026
7999edc
Merge branch 'main' into ericgold/CheckpointOnSuperStepStarted
elgold92 Mar 19, 2026
894297a
Merge branch 'main' into ericgold/CheckpointOnSuperStepStarted
elgold92 Mar 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ internal sealed class InProcStepTracer : IStepTracer
/// <param name="lastStepNumber">The Step Number of the last SuperStep. Note that Step Numbers are 0-indexed.</param>
public void Reload(int lastStepNumber = 0) => this._nextStepNumber = lastStepNumber + 1;

public SuperStepStartedEvent Advance(StepContext step)
public SuperStepStartedEvent Advance(StepContext step, CheckpointInfo? startCheckpoint = null)
{
this._nextStepNumber++;
this.Activated.Clear();
Expand All @@ -57,7 +57,8 @@ public SuperStepStartedEvent Advance(StepContext step)

return new SuperStepStartedEvent(this.StepNumber, new SuperStepStartInfo(sendingExecutors)
{
HasExternalMessages = hasExternalMessages
HasExternalMessages = hasExternalMessages,
Checkpoint = startCheckpoint
});
}

Expand Down
13 changes: 10 additions & 3 deletions dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,11 @@ await executor.ExecuteCoreAsync(

private async ValueTask RunSuperstepAsync(StepContext currentStep, CancellationToken cancellationToken)
{
await this.RaiseWorkflowEventAsync(this.StepTracer.Advance(currentStep)).ConfigureAwait(false);
// Save a checkpoint before the superstep executes, capturing the pre-delivery state.
await this.CheckpointAsync(currentStep, cancellationToken).ConfigureAwait(false);
CheckpointInfo? startCheckpoint = this.StepTracer.Checkpoint;

await this.RaiseWorkflowEventAsync(this.StepTracer.Advance(currentStep, startCheckpoint)).ConfigureAwait(false);

Comment thread
elgold92 marked this conversation as resolved.
// Deliver the messages and queue the next step
List<Task> receiverTasks =
Expand Down Expand Up @@ -278,7 +282,7 @@ await this.RaiseWorkflowEventAsync(this.StepTracer.Complete(this.RunContext.Next
private WorkflowInfo? _workflowInfoCache;
private CheckpointInfo? _lastCheckpointInfo;
private readonly List<CheckpointInfo> _checkpoints = [];
internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = default)
internal async ValueTask CheckpointAsync(StepContext? queuedMessagesOverride, CancellationToken cancellationToken = default)
{
this.RunContext.CheckEnded();
if (this.CheckpointManager is null)
Expand All @@ -299,7 +303,7 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d
await prepareTask.ConfigureAwait(false);
await this.RunContext.StateManager.PublishUpdatesAsync(this.StepTracer).ConfigureAwait(false);

RunnerStateData runnerData = await this.RunContext.ExportStateAsync().ConfigureAwait(false);
RunnerStateData runnerData = await this.RunContext.ExportStateAsync(queuedMessagesOverride).ConfigureAwait(false);
Dictionary<ScopeKey, PortableValue> stateData = await this.RunContext.StateManager.ExportStateAsync().ConfigureAwait(false);

Checkpoint checkpoint = new(this.StepTracer.StepNumber, this._workflowInfoCache, runnerData, stateData, edgeData, this._lastCheckpointInfo);
Expand All @@ -308,6 +312,9 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d
this._checkpoints.Add(this._lastCheckpointInfo);
}

/// <summary>
/// Creates a checkpoint without supplying a queued-messages override; forwards to the
/// overload that accepts one, passing <see langword="null"/>.
/// </summary>
/// <param name="cancellationToken">Token forwarded unchanged to the underlying checkpoint operation.</param>
/// <returns>The <see cref="ValueTask"/> returned by the forwarded call.</returns>
internal ValueTask CheckpointAsync(CancellationToken cancellationToken = default)
    => this.CheckpointAsync(queuedMessagesOverride: null, cancellationToken: cancellationToken);

public async ValueTask RestoreCheckpointAsync(CheckpointInfo checkpointInfo, CancellationToken cancellationToken = default)
{
this.RunContext.CheckEnded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,13 @@ async Task InvokeCheckpointRestoredAsync(Task<Executor> executorTask)
}
}

internal ValueTask<RunnerStateData> ExportStateAsync()
internal ValueTask<RunnerStateData> ExportStateAsync(StepContext? queuedMessagesOverride = null)
{
this.CheckEnded();

Dictionary<string, List<PortableMessageEnvelope>> queuedMessages = this._nextStep.ExportMessages();
StepContext queuedMessagesSource = queuedMessagesOverride ?? this._nextStep;
Dictionary<string, List<PortableMessageEnvelope>> queuedMessages = queuedMessagesSource.ExportMessages();

RunnerStateData result = new(instantiatedExecutors: [.. this._executors.Keys],
queuedMessages,
outstandingRequests: [.. this._externalRequests.Values]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,10 @@ public sealed class SuperStepStartInfo(HashSet<string>? sendingExecutors = null)
/// Gets a value indicating whether there are any external messages queued during the previous SuperStep.
/// </summary>
public bool HasExternalMessages { get; init; }

/// <summary>
/// Gets the <see cref="CheckpointInfo"/> for the checkpoint saved at the start of this SuperStep, before its messages are delivered, if any.
Comment thread
elgold92 marked this conversation as resolved.
Outdated
/// <see langword="null"/> if checkpointing was not enabled when the run was started.
/// </summary>
public CheckpointInfo? Checkpoint { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(Executi
List<CheckpointInfo> checkpoints = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
Comment thread
elgold92 marked this conversation as resolved.
Outdated
{
checkpoints.Add(startCp);
}
else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
checkpoints.Add(cp);
}
Expand All @@ -51,6 +55,13 @@ internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(Executi
Checkpoint storedFirst = await ((ICheckpointManager)checkpointManager)
.LookupCheckpointAsync(firstCheckpoint.SessionId, firstCheckpoint);
storedFirst.Parent.Should().BeNull("the first checkpoint should have no parent");

// Assert: The second checkpoint should have 1 parent, the first checkpoint.
CheckpointInfo secondCheckpoint = checkpoints[1];
Checkpoint storedSecond = await ((ICheckpointManager)checkpointManager)
Comment thread
elgold92 marked this conversation as resolved.
.LookupCheckpointAsync(secondCheckpoint.SessionId, secondCheckpoint);
storedSecond.Parent.Should().NotBeNull("the second checkpoint should have a parent");
storedSecond.Parent.Should().Be(firstCheckpoint, "the second checkpoint's parent should be the first checkpoint");
}

[Theory]
Expand Down Expand Up @@ -79,13 +90,18 @@ internal async Task Checkpoint_SubsequentCheckpoints_ShouldChainParentsAsync(Exe

await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token))
{
if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
{
checkpoints.Add(startCp);
}
else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
checkpoints.Add(cp);
if (checkpoints.Count >= 3)
{
cts.Cancel();
}
}

if (checkpoints.Count >= 3)
{
cts.Cancel();
}
}

Expand Down Expand Up @@ -132,13 +148,18 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs
using CancellationTokenSource cts = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token))
{
if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
{
firstRunCheckpoints.Add(startCp);
}
else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
firstRunCheckpoints.Add(cp);
if (firstRunCheckpoints.Count >= 2)
{
cts.Cancel();
}
}

if (firstRunCheckpoints.Count >= 2)
{
cts.Cancel();
}
}

Expand All @@ -155,13 +176,18 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs
using CancellationTokenSource cts2 = new();
await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(cts2.Token))
{
if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
{
resumedCheckpoints.Add(startCp);
}
else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
resumedCheckpoints.Add(cp);
if (resumedCheckpoints.Count >= 1)
{
cts2.Cancel();
}
}

if (resumedCheckpoints.Count >= 1)
{
cts2.Cancel();
}
}

Expand All @@ -172,4 +198,122 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs
storedResumed.Parent.Should().NotBeNull("checkpoint created after resume should have a parent");
storedResumed.Parent.Should().Be(resumePoint, "checkpoint after resume should reference the checkpoint we resumed from");
}

/// <summary>
/// Resumes a three-stage workflow from a checkpoint reported on a <see cref="SuperStepStartedEvent"/>
/// and verifies how many checkpoints the resumed run reports.
/// </summary>
/// <param name="environment">The in-process execution environment under test.</param>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
internal async Task Checkpoint_AfterResumeFromSuperstepStart_CountCheckpointsEmittedAsync(ExecutionEnvironment environment)
{
    // Arrange: A basic workflow with 3 executor stages
    ForwardMessageExecutor<string> executorA = new("A");
    ForwardMessageExecutor<string> executorB = new("B");
    ForwardMessageExecutor<string> executorC = new("C");

    Workflow workflow = new WorkflowBuilder(executorA)
        .AddEdge(executorA, executorB)
        .AddEdge(executorB, executorC)
        .Build();

    CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
    InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

    // First run: let the workflow run to completion, collecting only the checkpoints
    // reported on SuperStep start events.
    await using StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello");

    List<CheckpointInfo> firstRunCheckpoints = [];
    await foreach (WorkflowEvent evt in run.WatchStreamAsync())
    {
        if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
        {
            firstRunCheckpoints.Add(startCp);
        }
    }

    firstRunCheckpoints.Should().HaveCount(3);

    // Index 1 is the second SuperStep-start checkpoint of the first run.
    CheckpointInfo resumePoint = firstRunCheckpoints[1];

    // Dispose the first run to release workflow ownership before resuming.
    await run.DisposeAsync();

    // Act: Resume from the second SuperStep-start checkpoint. The resumed run is declared
    // with 'await using' so it is disposed even when an assertion below throws.
    await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint);

    List<CheckpointInfo> resumedCheckpoints = [];
    await foreach (WorkflowEvent evt in resumed.WatchStreamAsync())
    {
        if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
        {
            resumedCheckpoints.Add(startCp);
        }
        else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
        {
            resumedCheckpoints.Add(cp);
        }
    }

    // Assert: The workflow should save the right number of checkpoints on re-run.
    resumedCheckpoints.Should().HaveCount(4, "the resumed workflow has 2 executors to run, each generating 2 checkpoints");
}

/// <summary>
/// Resumes a three-stage workflow from a checkpoint reported on a <see cref="SuperStepCompletedEvent"/>
/// and verifies how many checkpoints the resumed run reports.
/// </summary>
/// <param name="environment">The in-process execution environment under test.</param>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
internal async Task Checkpoint_AfterResumeFromSuperstepCompleted_CountCheckpointsEmittedAsync(ExecutionEnvironment environment)
{
    // Arrange: A basic workflow with 3 executor stages
    ForwardMessageExecutor<string> executorA = new("A");
    ForwardMessageExecutor<string> executorB = new("B");
    ForwardMessageExecutor<string> executorC = new("C");

    Workflow workflow = new WorkflowBuilder(executorA)
        .AddEdge(executorA, executorB)
        .AddEdge(executorB, executorC)
        .Build();

    CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
    InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

    // First run: let the workflow run to completion, collecting only the checkpoints
    // reported on SuperStep completion events.
    await using StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello");

    List<CheckpointInfo> firstRunCheckpoints = [];
    await foreach (WorkflowEvent evt in run.WatchStreamAsync())
    {
        if (evt is SuperStepCompletedEvent completedEvent && completedEvent.CompletionInfo?.Checkpoint is { } completedCp)
        {
            firstRunCheckpoints.Add(completedCp);
        }
    }

    firstRunCheckpoints.Should().HaveCount(3);

    // Index 1 is the second SuperStep-completed checkpoint of the first run.
    CheckpointInfo resumePoint = firstRunCheckpoints[1];

    // Dispose the first run to release workflow ownership before resuming.
    await run.DisposeAsync();

    // Act: Resume from the second SuperStep-completed checkpoint. The resumed run is declared
    // with 'await using' so it is disposed even when an assertion below throws.
    await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint);

    List<CheckpointInfo> resumedCheckpoints = [];
    await foreach (WorkflowEvent evt in resumed.WatchStreamAsync())
    {
        if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
        {
            resumedCheckpoints.Add(startCp);
        }
        else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
        {
            resumedCheckpoints.Add(cp);
        }
    }

    // Assert: The workflow should save the right number of checkpoints on re-run.
    resumedCheckpoints.Should().HaveCount(2, "the resumed workflow has 1 executor to run, generating 2 checkpoints");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public async Task InProcessRun_StateShouldPersist_CheckpointedAsync()

Run checkpointed = await InProcessExecution.RunAsync<TurnToken>(workflow, new(), CheckpointManager.Default);

checkpointed.Checkpoints.Should().HaveCount(4);
checkpointed.Checkpoints.Should().HaveCount(8);

RunStatus status = await checkpointed.GetStatusAsync();
status.Should().Be(RunStatus.Idle);
Expand Down
Loading