diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcStepTracer.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcStepTracer.cs
index 0affd47f00..2b71a483f8 100644
--- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcStepTracer.cs
+++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcStepTracer.cs
@@ -31,7 +31,7 @@ internal sealed class InProcStepTracer : IStepTracer
/// The Step Number of the last SuperStep. Note that Step Numbers are 0-indexed.
public void Reload(int lastStepNumber = 0) => this._nextStepNumber = lastStepNumber + 1;
- public SuperStepStartedEvent Advance(StepContext step)
+ public SuperStepStartedEvent Advance(StepContext step, CheckpointInfo? startCheckpoint = null)
{
this._nextStepNumber++;
this.Activated.Clear();
@@ -57,7 +57,8 @@ public SuperStepStartedEvent Advance(StepContext step)
return new SuperStepStartedEvent(this.StepNumber, new SuperStepStartInfo(sendingExecutors)
{
- HasExternalMessages = hasExternalMessages
+ HasExternalMessages = hasExternalMessages,
+ Checkpoint = startCheckpoint
});
}
diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs
index 2a61f80ced..56047a036b 100644
--- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs
+++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs
@@ -244,7 +244,11 @@ await executor.ExecuteCoreAsync(
private async ValueTask RunSuperstepAsync(StepContext currentStep, CancellationToken cancellationToken)
{
- await this.RaiseWorkflowEventAsync(this.StepTracer.Advance(currentStep)).ConfigureAwait(false);
+ // Save a checkpoint before the superstep executes, capturing the pre-delivery state.
+ await this.CheckpointAsync(currentStep, cancellationToken).ConfigureAwait(false);
+ CheckpointInfo? startCheckpoint = this.StepTracer.Checkpoint;
+
+ await this.RaiseWorkflowEventAsync(this.StepTracer.Advance(currentStep, startCheckpoint)).ConfigureAwait(false);
// Deliver the messages and queue the next step
List receiverTasks =
@@ -278,7 +282,7 @@ await this.RaiseWorkflowEventAsync(this.StepTracer.Complete(this.RunContext.Next
private WorkflowInfo? _workflowInfoCache;
private CheckpointInfo? _lastCheckpointInfo;
private readonly List _checkpoints = [];
- internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = default)
+ internal async ValueTask CheckpointAsync(StepContext? queuedMessagesOverride, CancellationToken cancellationToken = default)
{
this.RunContext.CheckEnded();
if (this.CheckpointManager is null)
@@ -299,7 +303,7 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d
await prepareTask.ConfigureAwait(false);
await this.RunContext.StateManager.PublishUpdatesAsync(this.StepTracer).ConfigureAwait(false);
- RunnerStateData runnerData = await this.RunContext.ExportStateAsync().ConfigureAwait(false);
+ RunnerStateData runnerData = await this.RunContext.ExportStateAsync(queuedMessagesOverride).ConfigureAwait(false);
Dictionary stateData = await this.RunContext.StateManager.ExportStateAsync().ConfigureAwait(false);
Checkpoint checkpoint = new(this.StepTracer.StepNumber, this._workflowInfoCache, runnerData, stateData, edgeData, this._lastCheckpointInfo);
@@ -308,6 +312,9 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d
this._checkpoints.Add(this._lastCheckpointInfo);
}
+ internal ValueTask CheckpointAsync(CancellationToken cancellationToken = default)
+ => this.CheckpointAsync(null, cancellationToken);
+
public async ValueTask RestoreCheckpointAsync(CheckpointInfo checkpointInfo, CancellationToken cancellationToken = default)
{
this.RunContext.CheckEnded();
diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs
index eda7b90a80..e3156715ce 100644
--- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs
+++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs
@@ -379,11 +379,13 @@ async Task InvokeCheckpointRestoredAsync(Task executorTask)
}
}
- internal ValueTask ExportStateAsync()
+ internal ValueTask ExportStateAsync(StepContext? queuedMessagesOverride = null)
{
this.CheckEnded();
- Dictionary> queuedMessages = this._nextStep.ExportMessages();
+ StepContext queuedMessagesSource = queuedMessagesOverride ?? this._nextStep;
+ Dictionary> queuedMessages = queuedMessagesSource.ExportMessages();
+
RunnerStateData result = new(instantiatedExecutors: [.. this._executors.Keys],
queuedMessages,
outstandingRequests: [.. this._externalRequests.Values]);
diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs
index 94ccb06dd1..a6fb85a35d 100644
--- a/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs
+++ b/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs
@@ -18,4 +18,10 @@ public sealed class SuperStepStartInfo(HashSet? sendingExecutors = null)
/// Gets a value indicating whether there are any external messages queued during the previous SuperStep.
///
public bool HasExternalMessages { get; init; }
+
+ ///
+ /// Gets the corresponding to the checkpoint created at the start of this SuperStep, if any.
+ /// if checkpointing was not enabled when the run was started.
+ ///
+ public CheckpointInfo? Checkpoint { get; init; }
}
diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs
index 0ecf3c993f..1f8e79fbb7 100644
--- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs
+++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs
@@ -32,13 +32,17 @@ internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(Executi
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
// Act
- StreamingRun run =
+ await using StreamingRun run =
await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello");
List checkpoints = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
- if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
+ if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
+ {
+ checkpoints.Add(startCp);
+ }
+ else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
checkpoints.Add(cp);
}
@@ -51,6 +55,14 @@ internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(Executi
Checkpoint storedFirst = await ((ICheckpointManager)checkpointManager)
.LookupCheckpointAsync(firstCheckpoint.SessionId, firstCheckpoint);
storedFirst.Parent.Should().BeNull("the first checkpoint should have no parent");
+
+ // Assert: The second checkpoint should have 1 parent, the first checkpoint.
+ checkpoints.Should().HaveCountGreaterThanOrEqualTo(2, "multiple checkpoints should have been created, and we can't verify parent checkpoint without at least 2 checkpoints present.");
+ CheckpointInfo secondCheckpoint = checkpoints[1];
+ Checkpoint storedSecond = await ((ICheckpointManager)checkpointManager)
+ .LookupCheckpointAsync(secondCheckpoint.SessionId, secondCheckpoint);
+ storedSecond.Parent.Should().NotBeNull("the second checkpoint should have a parent");
+ storedSecond.Parent.Should().Be(firstCheckpoint, "the second checkpoint's parent should be the first checkpoint");
}
[Theory]
@@ -79,13 +91,18 @@ internal async Task Checkpoint_SubsequentCheckpoints_ShouldChainParentsAsync(Exe
await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token))
{
- if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
+ if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
+ {
+ checkpoints.Add(startCp);
+ }
+ else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
checkpoints.Add(cp);
- if (checkpoints.Count >= 3)
- {
- cts.Cancel();
- }
+ }
+
+ if (checkpoints.Count >= 3)
+ {
+ cts.Cancel();
}
}
@@ -126,19 +143,24 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
// First run: collect a checkpoint to resume from
- await using StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello");
+ StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello");
List firstRunCheckpoints = [];
using CancellationTokenSource cts = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token))
{
- if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
+ if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
+ {
+ firstRunCheckpoints.Add(startCp);
+ }
+ else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
firstRunCheckpoints.Add(cp);
- if (firstRunCheckpoints.Count >= 2)
- {
- cts.Cancel();
- }
+ }
+
+ if (firstRunCheckpoints.Count >= 2)
+ {
+ cts.Cancel();
}
}
@@ -149,19 +171,24 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs
await run.DisposeAsync();
// Act: Resume from the first checkpoint
- StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint);
+ await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint);
List resumedCheckpoints = [];
using CancellationTokenSource cts2 = new();
await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(cts2.Token))
{
- if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
+ if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
+ {
+ resumedCheckpoints.Add(startCp);
+ }
+ else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
resumedCheckpoints.Add(cp);
- if (resumedCheckpoints.Count >= 1)
- {
- cts2.Cancel();
- }
+ }
+
+ if (resumedCheckpoints.Count >= 1)
+ {
+ cts2.Cancel();
}
}
@@ -172,4 +199,118 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs
storedResumed.Parent.Should().NotBeNull("checkpoint created after resume should have a parent");
storedResumed.Parent.Should().Be(resumePoint, "checkpoint after resume should reference the checkpoint we resumed from");
}
+
+ [Theory]
+ [InlineData(ExecutionEnvironment.InProcess_Lockstep)]
+ [InlineData(ExecutionEnvironment.InProcess_OffThread)]
+ internal async Task Checkpoint_AfterResumeFromSuperstepStart_CountCheckpointsEmittedAsync(ExecutionEnvironment environment)
+ {
+ // Arrange: A basic workflow with 3 executor stages
+ ForwardMessageExecutor executorA = new("A");
+ ForwardMessageExecutor executorB = new("B");
+ ForwardMessageExecutor executorC = new("C");
+
+ Workflow workflow = new WorkflowBuilder(executorA)
+ .AddEdge(executorA, executorB)
+ .AddEdge(executorB, executorC)
+ .Build();
+
+ CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
+ InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
+
+ // First run: collect a checkpoint to resume from
+ StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello");
+
+ List firstRunCheckpoints = [];
+ await foreach (WorkflowEvent evt in run.WatchStreamAsync())
+ {
+ if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
+ {
+ firstRunCheckpoints.Add(startCp);
+ }
+ }
+
+ firstRunCheckpoints.Should().HaveCount(3);
+ CheckpointInfo resumePoint = firstRunCheckpoints[1];
+
+ // Dispose the first run to release workflow ownership before resuming.
+ await run.DisposeAsync();
+
+ // Act: Resume from the second checkpoint
+ await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint);
+
+ List resumedCheckpoints = [];
+ await foreach (WorkflowEvent evt in resumed.WatchStreamAsync())
+ {
+ if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
+ {
+ resumedCheckpoints.Add(startCp);
+ }
+ else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
+ {
+ resumedCheckpoints.Add(cp);
+ }
+ }
+
+ // Assert: The workflow should save the right number of checkpoints on re-run.
+ resumedCheckpoints.Should().NotBeEmpty();
+ resumedCheckpoints.Should().HaveCount(4, "the resumed workflow has 2 executors to run, each generating 2 checkpoints");
+ }
+
+ [Theory]
+ [InlineData(ExecutionEnvironment.InProcess_Lockstep)]
+ [InlineData(ExecutionEnvironment.InProcess_OffThread)]
+ internal async Task Checkpoint_AfterResumeFromSuperstepCompleted_CountCheckpointsEmittedAsync(ExecutionEnvironment environment)
+ {
+ // Arrange: A basic workflow with 3 executor stages
+ ForwardMessageExecutor executorA = new("A");
+ ForwardMessageExecutor executorB = new("B");
+ ForwardMessageExecutor executorC = new("C");
+
+ Workflow workflow = new WorkflowBuilder(executorA)
+ .AddEdge(executorA, executorB)
+ .AddEdge(executorB, executorC)
+ .Build();
+
+ CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
+ InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
+
+ // First run: collect a checkpoint to resume from
+ StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello");
+
+ List firstRunCheckpoints = [];
+ await foreach (WorkflowEvent evt in run.WatchStreamAsync())
+ {
+ if (evt is SuperStepCompletedEvent completedEvent && completedEvent.CompletionInfo?.Checkpoint is { } completedCp)
+ {
+ firstRunCheckpoints.Add(completedCp);
+ }
+ }
+
+ firstRunCheckpoints.Should().HaveCount(3);
+ CheckpointInfo resumePoint = firstRunCheckpoints[1];
+
+ // Dispose the first run to release workflow ownership before resuming.
+ await run.DisposeAsync();
+
+ // Act: Resume from the second checkpoint
+ await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint);
+
+ List resumedCheckpoints = [];
+ await foreach (WorkflowEvent evt in resumed.WatchStreamAsync())
+ {
+ if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
+ {
+ resumedCheckpoints.Add(startCp);
+ }
+ else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
+ {
+ resumedCheckpoints.Add(cp);
+ }
+ }
+
+ // Assert: The workflow should save the right number of checkpoints on re-run.
+ resumedCheckpoints.Should().NotBeEmpty();
+ resumedCheckpoints.Should().HaveCount(2, "the resumed workflow has 1 executor to run, generating 2 checkpoints");
+ }
}
diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessStateTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessStateTests.cs
index acffbdd336..d93c654730 100644
--- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessStateTests.cs
+++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessStateTests.cs
@@ -133,7 +133,7 @@ public async Task InProcessRun_StateShouldPersist_CheckpointedAsync()
Run checkpointed = await InProcessExecution.RunAsync(workflow, new(), CheckpointManager.Default);
- checkpointed.Checkpoints.Should().HaveCount(4);
+ checkpointed.Checkpoints.Should().HaveCount(8);
RunStatus status = await checkpointed.GetStatusAsync();
status.Should().Be(RunStatus.Idle);