Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ internal sealed class InProcStepTracer : IStepTracer
/// <param name="lastStepNumber">The Step Number of the last SuperStep. Note that Step Numbers are 0-indexed.</param>
public void Reload(int lastStepNumber = 0) => this._nextStepNumber = lastStepNumber + 1;

public SuperStepStartedEvent Advance(StepContext step)
public SuperStepStartedEvent Advance(StepContext step, CheckpointInfo? startCheckpoint = null)
{
this._nextStepNumber++;
this.Activated.Clear();
Expand All @@ -57,7 +57,8 @@ public SuperStepStartedEvent Advance(StepContext step)

return new SuperStepStartedEvent(this.StepNumber, new SuperStepStartInfo(sendingExecutors)
{
HasExternalMessages = hasExternalMessages
HasExternalMessages = hasExternalMessages,
Checkpoint = startCheckpoint
});
}

Expand Down
13 changes: 10 additions & 3 deletions dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,11 @@ await executor.ExecuteCoreAsync(

private async ValueTask RunSuperstepAsync(StepContext currentStep, CancellationToken cancellationToken)
{
await this.RaiseWorkflowEventAsync(this.StepTracer.Advance(currentStep)).ConfigureAwait(false);
// Save a checkpoint before the superstep executes, capturing the pre-delivery state.
await this.CheckpointAsync(currentStep, cancellationToken).ConfigureAwait(false);
CheckpointInfo? startCheckpoint = this.StepTracer.Checkpoint;

await this.RaiseWorkflowEventAsync(this.StepTracer.Advance(currentStep, startCheckpoint)).ConfigureAwait(false);

Comment on lines +247 to 252
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The checkpoint created at the beginning of RunSuperstepAsync is stamped using StepTracer.StepNumber, but StepTracer.Advance(...) increments the step number afterward. This means the StartInfo.Checkpoint metadata will generally be associated with the previous step number (and the first start checkpoint is -1/IsInitial), which can be surprising when correlating checkpoints to SuperStepStartedEvent.StepNumber. Consider advancing the step counter before creating the start-of-step checkpoint, or allowing CheckpointAsync to accept an explicit step number to use for start checkpoints.

Suggested change
// Save a checkpoint before the superstep executes, capturing the pre-delivery state.
await this.CheckpointAsync(currentStep, cancellationToken).ConfigureAwait(false);
CheckpointInfo? startCheckpoint = this.StepTracer.Checkpoint;
await this.RaiseWorkflowEventAsync(this.StepTracer.Advance(currentStep, startCheckpoint)).ConfigureAwait(false);
// Capture the checkpoint from the previous step (if any) to correlate with the step-start event.
CheckpointInfo? previousCheckpoint = this.StepTracer.Checkpoint;
await this.RaiseWorkflowEventAsync(this.StepTracer.Advance(currentStep, previousCheckpoint)).ConfigureAwait(false);
// Save a checkpoint at the beginning of the superstep, capturing the pre-delivery state for this step.
await this.CheckpointAsync(currentStep, cancellationToken).ConfigureAwait(false);

Copilot uses AI. Check for mistakes.
// Deliver the messages and queue the next step
List<Task> receiverTasks =
Expand Down Expand Up @@ -278,7 +282,7 @@ await this.RaiseWorkflowEventAsync(this.StepTracer.Complete(this.RunContext.Next
private WorkflowInfo? _workflowInfoCache;
private CheckpointInfo? _lastCheckpointInfo;
private readonly List<CheckpointInfo> _checkpoints = [];
internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = default)
internal async ValueTask CheckpointAsync(StepContext? queuedMessagesOverride, CancellationToken cancellationToken = default)
{
this.RunContext.CheckEnded();
if (this.CheckpointManager is null)
Expand All @@ -299,7 +303,7 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d
await prepareTask.ConfigureAwait(false);
await this.RunContext.StateManager.PublishUpdatesAsync(this.StepTracer).ConfigureAwait(false);

RunnerStateData runnerData = await this.RunContext.ExportStateAsync().ConfigureAwait(false);
RunnerStateData runnerData = await this.RunContext.ExportStateAsync(queuedMessagesOverride).ConfigureAwait(false);
Dictionary<ScopeKey, PortableValue> stateData = await this.RunContext.StateManager.ExportStateAsync().ConfigureAwait(false);

Checkpoint checkpoint = new(this.StepTracer.StepNumber, this._workflowInfoCache, runnerData, stateData, edgeData, this._lastCheckpointInfo);
Expand All @@ -308,6 +312,9 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d
this._checkpoints.Add(this._lastCheckpointInfo);
}

internal ValueTask CheckpointAsync(CancellationToken cancellationToken = default)
=> this.CheckpointAsync(null, cancellationToken);

public async ValueTask RestoreCheckpointAsync(CheckpointInfo checkpointInfo, CancellationToken cancellationToken = default)
{
this.RunContext.CheckEnded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,13 @@ async Task InvokeCheckpointRestoredAsync(Task<Executor> executorTask)
}
}

internal ValueTask<RunnerStateData> ExportStateAsync()
internal ValueTask<RunnerStateData> ExportStateAsync(StepContext? queuedMessagesOverride = null)
{
this.CheckEnded();

Dictionary<string, List<PortableMessageEnvelope>> queuedMessages = this._nextStep.ExportMessages();
StepContext queuedMessagesSource = queuedMessagesOverride ?? this._nextStep;
Dictionary<string, List<PortableMessageEnvelope>> queuedMessages = queuedMessagesSource.ExportMessages();

RunnerStateData result = new(instantiatedExecutors: [.. this._executors.Keys],
queuedMessages,
outstandingRequests: [.. this._externalRequests.Values]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,10 @@ public sealed class SuperStepStartInfo(HashSet<string>? sendingExecutors = null)
/// Gets a value indicating whether there are any external messages queued during the previous SuperStep.
/// </summary>
public bool HasExternalMessages { get; init; }

/// <summary>
/// Gets the <see cref="CheckpointInfo"/> corresponding to the checkpoint restored at the start of this SuperStep, if any.
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The XML doc for SuperStepStartInfo.Checkpoint says this is the checkpoint restored at the start of the SuperStep, but this property is populated from the checkpoint created immediately before raising SuperStepStartedEvent (pre-delivery). Please update the summary to match the actual semantics (and optionally clarify whether its StepNumber corresponds to the previous completed step or the upcoming step).

Suggested change
/// Gets the <see cref="CheckpointInfo"/> corresponding to the checkpoint restored at the start of this SuperStep, if any.
/// Gets the <see cref="CheckpointInfo"/> for the checkpoint created immediately before raising
/// the <c>SuperStepStartedEvent</c> for this SuperStep (i.e., the checkpoint taken after the
/// previous SuperStep completed). The <see cref="CheckpointInfo.StepNumber"/> therefore
/// corresponds to the previously completed SuperStep, not the upcoming one.

Copilot uses AI. Check for mistakes.
/// <see langword="null"/> if checkpointing was not enabled when the run was started.
/// </summary>
public CheckpointInfo? Checkpoint { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,17 @@ internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(Executi
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

// Act
StreamingRun run =
await using StreamingRun run =
await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello");

List<CheckpointInfo> checkpoints = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
{
checkpoints.Add(startCp);
}
else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
checkpoints.Add(cp);
}
Expand All @@ -51,6 +55,13 @@ internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(Executi
Checkpoint storedFirst = await ((ICheckpointManager)checkpointManager)
.LookupCheckpointAsync(firstCheckpoint.SessionId, firstCheckpoint);
storedFirst.Parent.Should().BeNull("the first checkpoint should have no parent");

// Assert: The second checkpoint should have 1 parent, the first checkpoint.
CheckpointInfo secondCheckpoint = checkpoints[1];
Checkpoint storedSecond = await ((ICheckpointManager)checkpointManager)
.LookupCheckpointAsync(secondCheckpoint.SessionId, secondCheckpoint);
storedSecond.Parent.Should().NotBeNull("the second checkpoint should have a parent");
storedSecond.Parent.Should().Be(firstCheckpoint, "the second checkpoint's parent should be the first checkpoint");
}

[Theory]
Expand Down Expand Up @@ -79,13 +90,18 @@ internal async Task Checkpoint_SubsequentCheckpoints_ShouldChainParentsAsync(Exe

await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token))
{
if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
{
checkpoints.Add(startCp);
}
else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
checkpoints.Add(cp);
if (checkpoints.Count >= 3)
{
cts.Cancel();
}
}

if (checkpoints.Count >= 3)
{
cts.Cancel();
}
}

Expand Down Expand Up @@ -132,13 +148,18 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs
using CancellationTokenSource cts = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token))
{
if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
{
firstRunCheckpoints.Add(startCp);
}
else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
firstRunCheckpoints.Add(cp);
if (firstRunCheckpoints.Count >= 2)
{
cts.Cancel();
}
}

if (firstRunCheckpoints.Count >= 2)
{
cts.Cancel();
}
}

Expand All @@ -155,13 +176,18 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs
using CancellationTokenSource cts2 = new();
await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(cts2.Token))
{
if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
{
resumedCheckpoints.Add(startCp);
}
else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
resumedCheckpoints.Add(cp);
if (resumedCheckpoints.Count >= 1)
{
cts2.Cancel();
}
}

if (resumedCheckpoints.Count >= 1)
{
cts2.Cancel();
}
}

Expand All @@ -172,4 +198,119 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs
storedResumed.Parent.Should().NotBeNull("checkpoint created after resume should have a parent");
storedResumed.Parent.Should().Be(resumePoint, "checkpoint after resume should reference the checkpoint we resumed from");
}

[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
internal async Task Checkpoint_AfterResumeFromSuperstepStart_CountCheckpointsEmittedAsync(ExecutionEnvironment environment)
{
// Arrange: A basic workflow with 3 executor stages
ForwardMessageExecutor<string> executorA = new("A");
ForwardMessageExecutor<string> executorB = new("B");
ForwardMessageExecutor<string> executorC = new("C");

Workflow workflow = new WorkflowBuilder(executorA)
.AddEdge(executorA, executorB)
.AddEdge(executorB, executorC)
.Build();

CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

// First run: collect a checkpoint to resume from
StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello");

List<CheckpointInfo> firstRunCheckpoints = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
{
firstRunCheckpoints.Add(startCp);
}
}

firstRunCheckpoints.Should().HaveCount(3);
CheckpointInfo resumePoint = firstRunCheckpoints[1];

// Dispose the first run to release workflow ownership before resuming.
await run.DisposeAsync();

// Act: Resume from the second checkpoint
StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint);
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the new resume/count tests, StreamingRun run / StreamingRun resumed aren’t wrapped in await using (and resumed isn’t disposed at all). Other tests in this file use await using to ensure workflow ownership/resources are released even on assertion failure; using the same pattern here would avoid leaks and reduce test flakiness.

Suggested change
StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint);
await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint);

Copilot uses AI. Check for mistakes.

List<CheckpointInfo> resumedCheckpoints = [];
await foreach (WorkflowEvent evt in resumed.WatchStreamAsync())
{
if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
{
resumedCheckpoints.Add(startCp);
}
else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
resumedCheckpoints.Add(cp);
}
}

// Assert: The workflow should save the right number of checkpoints on re-run.
resumedCheckpoints.Should().NotBeEmpty();
resumedCheckpoints.Should().HaveCount(4, "the resumed workflow has 2 executors to run, each generating 2 checkpoints");
}

[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
internal async Task Checkpoint_AfterResumeFromSuperstepCompleted_CountCheckpointsEmittedAsync(ExecutionEnvironment environment)
{
// Arrange: A basic workflow with 3 executor stages
ForwardMessageExecutor<string> executorA = new("A");
ForwardMessageExecutor<string> executorB = new("B");
ForwardMessageExecutor<string> executorC = new("C");

Workflow workflow = new WorkflowBuilder(executorA)
.AddEdge(executorA, executorB)
.AddEdge(executorB, executorC)
.Build();

CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

// First run: collect a checkpoint to resume from
StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello");

List<CheckpointInfo> firstRunCheckpoints = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is SuperStepCompletedEvent completedEvent && completedEvent.CompletionInfo?.Checkpoint is { } completedCp)
{
firstRunCheckpoints.Add(completedCp);
}
}

firstRunCheckpoints.Should().HaveCount(3);
CheckpointInfo resumePoint = firstRunCheckpoints[1];

// Dispose the first run to release workflow ownership before resuming.
await run.DisposeAsync();

// Act: Resume from the second checkpoint
StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint);

List<CheckpointInfo> resumedCheckpoints = [];
using CancellationTokenSource cts2 = new();
await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(cts2.Token))
{
if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
{
resumedCheckpoints.Add(startCp);
}
else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
resumedCheckpoints.Add(cp);
}
}

// Assert: The workflow should save the right number of checkpoints on re-run.
resumedCheckpoints.Should().NotBeEmpty();
resumedCheckpoints.Should().HaveCount(2, "the resumed workflow has 1 executor to run, generating 2 checkpoints");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public async Task InProcessRun_StateShouldPersist_CheckpointedAsync()

Run checkpointed = await InProcessExecution.RunAsync<TurnToken>(workflow, new(), CheckpointManager.Default);

checkpointed.Checkpoints.Should().HaveCount(4);
checkpointed.Checkpoints.Should().HaveCount(8);

RunStatus status = await checkpointed.GetStatusAsync();
status.Should().Be(RunStatus.Idle);
Expand Down
Loading