diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcStepTracer.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcStepTracer.cs index 0affd47f00..2b71a483f8 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcStepTracer.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcStepTracer.cs @@ -31,7 +31,7 @@ internal sealed class InProcStepTracer : IStepTracer /// The Step Number of the last SuperStep. Note that Step Numbers are 0-indexed. public void Reload(int lastStepNumber = 0) => this._nextStepNumber = lastStepNumber + 1; - public SuperStepStartedEvent Advance(StepContext step) + public SuperStepStartedEvent Advance(StepContext step, CheckpointInfo? startCheckpoint = null) { this._nextStepNumber++; this.Activated.Clear(); @@ -57,7 +57,8 @@ public SuperStepStartedEvent Advance(StepContext step) return new SuperStepStartedEvent(this.StepNumber, new SuperStepStartInfo(sendingExecutors) { - HasExternalMessages = hasExternalMessages + HasExternalMessages = hasExternalMessages, + Checkpoint = startCheckpoint }); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs index 2a61f80ced..56047a036b 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs @@ -244,7 +244,11 @@ await executor.ExecuteCoreAsync( private async ValueTask RunSuperstepAsync(StepContext currentStep, CancellationToken cancellationToken) { - await this.RaiseWorkflowEventAsync(this.StepTracer.Advance(currentStep)).ConfigureAwait(false); + // Save a checkpoint before the superstep executes, capturing the pre-delivery state. + await this.CheckpointAsync(currentStep, cancellationToken).ConfigureAwait(false); + CheckpointInfo? 
startCheckpoint = this.StepTracer.Checkpoint; + + await this.RaiseWorkflowEventAsync(this.StepTracer.Advance(currentStep, startCheckpoint)).ConfigureAwait(false); // Deliver the messages and queue the next step List receiverTasks = @@ -278,7 +282,7 @@ await this.RaiseWorkflowEventAsync(this.StepTracer.Complete(this.RunContext.Next private WorkflowInfo? _workflowInfoCache; private CheckpointInfo? _lastCheckpointInfo; private readonly List _checkpoints = []; - internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = default) + internal async ValueTask CheckpointAsync(StepContext? queuedMessagesOverride, CancellationToken cancellationToken = default) { this.RunContext.CheckEnded(); if (this.CheckpointManager is null) @@ -299,7 +303,7 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d await prepareTask.ConfigureAwait(false); await this.RunContext.StateManager.PublishUpdatesAsync(this.StepTracer).ConfigureAwait(false); - RunnerStateData runnerData = await this.RunContext.ExportStateAsync().ConfigureAwait(false); + RunnerStateData runnerData = await this.RunContext.ExportStateAsync(queuedMessagesOverride).ConfigureAwait(false); Dictionary stateData = await this.RunContext.StateManager.ExportStateAsync().ConfigureAwait(false); Checkpoint checkpoint = new(this.StepTracer.StepNumber, this._workflowInfoCache, runnerData, stateData, edgeData, this._lastCheckpointInfo); @@ -308,6 +312,9 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d this._checkpoints.Add(this._lastCheckpointInfo); } + internal ValueTask CheckpointAsync(CancellationToken cancellationToken = default) + => this.CheckpointAsync(null, cancellationToken); + public async ValueTask RestoreCheckpointAsync(CheckpointInfo checkpointInfo, CancellationToken cancellationToken = default) { this.RunContext.CheckEnded(); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs 
b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs index eda7b90a80..e3156715ce 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs @@ -379,11 +379,13 @@ async Task InvokeCheckpointRestoredAsync(Task executorTask) } } - internal ValueTask ExportStateAsync() + internal ValueTask ExportStateAsync(StepContext? queuedMessagesOverride = null) { this.CheckEnded(); - Dictionary> queuedMessages = this._nextStep.ExportMessages(); + StepContext queuedMessagesSource = queuedMessagesOverride ?? this._nextStep; + Dictionary> queuedMessages = queuedMessagesSource.ExportMessages(); + RunnerStateData result = new(instantiatedExecutors: [.. this._executors.Keys], queuedMessages, outstandingRequests: [.. this._externalRequests.Values]); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs index 94ccb06dd1..a6fb85a35d 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs @@ -18,4 +18,10 @@ public sealed class SuperStepStartInfo(HashSet? sendingExecutors = null) /// Gets a value indicating whether there are any external messages queued during the previous SuperStep. /// </summary> public bool HasExternalMessages { get; init; } + + /// <summary> + /// Gets the <see cref="CheckpointInfo"/> corresponding to the checkpoint created at the start of this SuperStep, if any. + /// <see langword="null"/> if checkpointing was not enabled when the run was started. + /// </summary> + public CheckpointInfo? 
Checkpoint { get; init; } } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs index 0ecf3c993f..1f8e79fbb7 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs @@ -32,13 +32,17 @@ internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(Executi InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment(); // Act - StreamingRun run = + await using StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"); List checkpoints = []; await foreach (WorkflowEvent evt in run.WatchStreamAsync()) { - if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) + if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) + { + checkpoints.Add(startCp); + } + else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) { checkpoints.Add(cp); } @@ -51,6 +55,14 @@ internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(Executi Checkpoint storedFirst = await ((ICheckpointManager)checkpointManager) .LookupCheckpointAsync(firstCheckpoint.SessionId, firstCheckpoint); storedFirst.Parent.Should().BeNull("the first checkpoint should have no parent"); + + // Assert: The second checkpoint should have 1 parent, the first checkpoint. 
+ checkpoints.Should().HaveCountGreaterThanOrEqualTo(2, "multiple checkpoints should have been created, and we can't verify parent checkpoint without at least 2 checkpoints present."); + CheckpointInfo secondCheckpoint = checkpoints[1]; + Checkpoint storedSecond = await ((ICheckpointManager)checkpointManager) + .LookupCheckpointAsync(secondCheckpoint.SessionId, secondCheckpoint); + storedSecond.Parent.Should().NotBeNull("the second checkpoint should have a parent"); + storedSecond.Parent.Should().Be(firstCheckpoint, "the second checkpoint's parent should be the first checkpoint"); } [Theory] @@ -79,13 +91,18 @@ internal async Task Checkpoint_SubsequentCheckpoints_ShouldChainParentsAsync(Exe await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token)) { - if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) + if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) + { + checkpoints.Add(startCp); + } + else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) { checkpoints.Add(cp); - if (checkpoints.Count >= 3) - { - cts.Cancel(); - } + } + + if (checkpoints.Count >= 3) + { + cts.Cancel(); } } @@ -126,19 +143,24 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment(); // First run: collect a checkpoint to resume from - await using StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"); + StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"); List firstRunCheckpoints = []; using CancellationTokenSource cts = new(); await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token)) { - if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) + if (evt is SuperStepStartedEvent superStepStartEvt 
&& superStepStartEvt.StartInfo?.Checkpoint is { } startCp) + { + firstRunCheckpoints.Add(startCp); + } + else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) { firstRunCheckpoints.Add(cp); - if (firstRunCheckpoints.Count >= 2) - { - cts.Cancel(); - } + } + + if (firstRunCheckpoints.Count >= 2) + { + cts.Cancel(); } } @@ -149,19 +171,24 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs await run.DisposeAsync(); // Act: Resume from the first checkpoint - StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); + await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); List resumedCheckpoints = []; using CancellationTokenSource cts2 = new(); await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(cts2.Token)) { - if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) + if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) + { + resumedCheckpoints.Add(startCp); + } + else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) { resumedCheckpoints.Add(cp); - if (resumedCheckpoints.Count >= 1) - { - cts2.Cancel(); - } + } + + if (resumedCheckpoints.Count >= 1) + { + cts2.Cancel(); } } @@ -172,4 +199,118 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs storedResumed.Parent.Should().NotBeNull("checkpoint created after resume should have a parent"); storedResumed.Parent.Should().Be(resumePoint, "checkpoint after resume should reference the checkpoint we resumed from"); } + + [Theory] + [InlineData(ExecutionEnvironment.InProcess_Lockstep)] + [InlineData(ExecutionEnvironment.InProcess_OffThread)] + internal async Task Checkpoint_AfterResumeFromSuperstepStart_CountCheckpointsEmittedAsync(ExecutionEnvironment 
environment) + { + // Arrange: A basic workflow with 3 executor stages + ForwardMessageExecutor executorA = new("A"); + ForwardMessageExecutor executorB = new("B"); + ForwardMessageExecutor executorC = new("C"); + + Workflow workflow = new WorkflowBuilder(executorA) + .AddEdge(executorA, executorB) + .AddEdge(executorB, executorC) + .Build(); + + CheckpointManager checkpointManager = CheckpointManager.CreateInMemory(); + InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment(); + + // First run: collect a checkpoint to resume from + StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"); + + List firstRunCheckpoints = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) + { + firstRunCheckpoints.Add(startCp); + } + } + + firstRunCheckpoints.Should().HaveCount(3); + CheckpointInfo resumePoint = firstRunCheckpoints[1]; + + // Dispose the first run to release workflow ownership before resuming. + await run.DisposeAsync(); + + // Act: Resume from the second checkpoint + await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); + + List resumedCheckpoints = []; + await foreach (WorkflowEvent evt in resumed.WatchStreamAsync()) + { + if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) + { + resumedCheckpoints.Add(startCp); + } + else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) + { + resumedCheckpoints.Add(cp); + } + } + + // Assert: The workflow should save the right number of checkpoints on re-run. 
+ resumedCheckpoints.Should().NotBeEmpty(); + resumedCheckpoints.Should().HaveCount(4, "the resumed workflow has 2 executors to run, each generating 2 checkpoints"); + } + + [Theory] + [InlineData(ExecutionEnvironment.InProcess_Lockstep)] + [InlineData(ExecutionEnvironment.InProcess_OffThread)] + internal async Task Checkpoint_AfterResumeFromSuperstepCompleted_CountCheckpointsEmittedAsync(ExecutionEnvironment environment) + { + // Arrange: A basic workflow with 3 executor stages + ForwardMessageExecutor executorA = new("A"); + ForwardMessageExecutor executorB = new("B"); + ForwardMessageExecutor executorC = new("C"); + + Workflow workflow = new WorkflowBuilder(executorA) + .AddEdge(executorA, executorB) + .AddEdge(executorB, executorC) + .Build(); + + CheckpointManager checkpointManager = CheckpointManager.CreateInMemory(); + InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment(); + + // First run: collect a checkpoint to resume from + StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"); + + List firstRunCheckpoints = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + if (evt is SuperStepCompletedEvent completedEvent && completedEvent.CompletionInfo?.Checkpoint is { } completedCp) + { + firstRunCheckpoints.Add(completedCp); + } + } + + firstRunCheckpoints.Should().HaveCount(3); + CheckpointInfo resumePoint = firstRunCheckpoints[1]; + + // Dispose the first run to release workflow ownership before resuming. 
+ await run.DisposeAsync(); + + // Act: Resume from the second checkpoint + await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); + + List resumedCheckpoints = []; + await foreach (WorkflowEvent evt in resumed.WatchStreamAsync()) + { + if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) + { + resumedCheckpoints.Add(startCp); + } + else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) + { + resumedCheckpoints.Add(cp); + } + } + + // Assert: The workflow should save the right number of checkpoints on re-run. + resumedCheckpoints.Should().NotBeEmpty(); + resumedCheckpoints.Should().HaveCount(2, "the resumed workflow has 1 executor to run, generating 2 checkpoints"); + } } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessStateTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessStateTests.cs index acffbdd336..d93c654730 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessStateTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessStateTests.cs @@ -133,7 +133,7 @@ public async Task InProcessRun_StateShouldPersist_CheckpointedAsync() Run checkpointed = await InProcessExecution.RunAsync(workflow, new(), CheckpointManager.Default); - checkpointed.Checkpoints.Should().HaveCount(4); + checkpointed.Checkpoints.Should().HaveCount(8); RunStatus status = await checkpointed.GetStatusAsync(); status.Should().Be(RunStatus.Idle);