.NET: Add checkpoint on super step started event: issue #4280 #4604
elgold92 wants to merge 8 commits into microsoft:main from
…ely b/c the test definitions are now wrong
…c definition. I think this is correct, but the test now hangs, likely because the first checkpoint (or its related data structures) we're now creating isn't initialized correctly.
…ShouldPersist_CheckpointedAsync test definition
Pull request overview
Adds support for creating/checkpointing workflow state at the SuperStepStarted boundary so runs can resume from “pre-delivery” checkpoints (addressing #4280), and updates tests accordingly.
Changes:
- Add a `CheckpointInfo?` field to `SuperStepStartInfo` and populate it on `SuperStepStartedEvent`.
- Create a checkpoint at the start of each superstep (capturing pre-delivery queued messages) by extending runner state export to accept an override `StepContext`.
- Update and expand unit tests to account for the additional checkpoints and validate parent chaining/resume behavior.
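The start-plus-end checkpointing described above can be pictured with a small, self-contained sketch. Everything in it is a hypothetical stand-in (the `Checkpoint` record, the `Run` loop, the toy "executor"); it is not the `Microsoft.Agents.AI.Workflows` implementation. It only illustrates why each superstep now yields two checkpoints, each chained to the previous one, with the start checkpoint capturing messages that have not been delivered yet. Labeling the start checkpoint with the upcoming step index is an arbitrary choice made for the sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a stored checkpoint; not the library's CheckpointInfo.
public sealed record Checkpoint(int Id, int? ParentId, string Phase, int Step, string[] Queued);

public static class SuperStepSketch
{
    public static List<Checkpoint> Run(Queue<string> inbox, int maxSteps)
    {
        List<Checkpoint> saved = new();
        int? parent = null;

        Checkpoint Save(string phase, int step)
        {
            // Snapshot the queue as it is right now and chain to the previous checkpoint.
            Checkpoint cp = new(saved.Count, parent, phase, step, inbox.ToArray());
            saved.Add(cp);
            parent = cp.Id;
            return cp;
        }

        for (int step = 0; step < maxSteps && inbox.Count > 0; step++)
        {
            Save("start", step);              // pre-delivery: messages are still queued
            string message = inbox.Dequeue(); // "deliver" one message
            if (message.Length > 1)
            {
                inbox.Enqueue(message[1..]);  // toy executor: emits a shorter follow-up message
            }
            Save("end", step);                // state after the superstep has executed
        }

        return saved;
    }

    public static void Main()
    {
        foreach (Checkpoint cp in Run(new Queue<string>(new[] { "abc" }), maxSteps: 10))
        {
            string parentText = cp.ParentId.HasValue ? cp.ParentId.Value.ToString() : "none";
            string queuedText = string.Join(",", cp.Queued);
            Console.WriteLine($"#{cp.Id} parent={parentText} {cp.Phase} step={cp.Step} queued=[{queuedText}]");
        }
    }
}
```

In this toy model, resuming from a "start" checkpoint would replay delivery of the queued messages, whereas resuming from an "end" checkpoint would pick up after they were processed.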
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InProcessStateTests.cs | Updates expected checkpoint count due to start+end checkpointing per superstep. |
| dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs | Extends tests to include checkpoints emitted on SuperStepStartedEvent and adds new resume/count assertions. |
| dotnet/src/Microsoft.Agents.AI.Workflows/SuperStepStartInfo.cs | Adds Checkpoint property to expose the checkpoint emitted at superstep start. |
| dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs | Allows exporting runner state from an alternate StepContext (pre-delivery snapshot). |
| dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs | Saves a checkpoint before superstep execution and wires it into the started event. |
| dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcStepTracer.cs | Plumbs the start-checkpoint into SuperStepStartedEvent payload. |
dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs
    public bool HasExternalMessages { get; init; }

    /// <summary>
    /// Gets the <see cref="CheckpointInfo"/> corresponding to the checkpoint restored at the start of this SuperStep, if any.
The XML doc for SuperStepStartInfo.Checkpoint says this is the checkpoint restored at the start of the SuperStep, but this property is populated from the checkpoint created immediately before raising SuperStepStartedEvent (pre-delivery). Please update the summary to match the actual semantics (and optionally clarify whether its StepNumber corresponds to the previous completed step or the upcoming step).
    - /// Gets the <see cref="CheckpointInfo"/> corresponding to the checkpoint restored at the start of this SuperStep, if any.
    + /// Gets the <see cref="CheckpointInfo"/> for the checkpoint created immediately before raising
    + /// the <c>SuperStepStartedEvent</c> for this SuperStep (i.e., the checkpoint taken after the
    + /// previous SuperStep completed). The <see cref="CheckpointInfo.StepNumber"/> therefore
    + /// corresponds to the previously completed SuperStep, not the upcoming one.
    StreamingRun run =
        await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello");

    List<CheckpointInfo> checkpoints = [];
    await foreach (WorkflowEvent evt in run.WatchStreamAsync())
    {
        if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
        if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
StreamingRun is IAsyncDisposable, but this test creates run without await using/disposing it. Disposing the run is important to release workflow ownership and underlying resources; please wrap it in await using or ensure DisposeAsync() is called in a finally block.
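The disposal pattern requested here can be shown in isolation. The sketch below uses a hypothetical `FakeRun` class in place of the real `StreamingRun` (whose construction depends on the workflow environment), so it only demonstrates how an `await using` declaration guarantees that `DisposeAsync()` runs when the enclosing scope ends.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a run that owns resources; not the real StreamingRun.
public sealed class FakeRun : IAsyncDisposable
{
    public bool Disposed { get; private set; }

    public ValueTask DisposeAsync()
    {
        Disposed = true; // a real run would release workflow ownership here
        return ValueTask.CompletedTask;
    }
}

public static class DisposalSketch
{
    public static async Task<bool> RunOnceAsync()
    {
        FakeRun observed;
        {
            await using FakeRun run = new();
            observed = run;
            // ... consume the run's event stream here ...
        } // DisposeAsync() is awaited here, even if the block above throws

        return observed.Disposed;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunOnceAsync());
    }
}
```

In a test method the explicit inner block is usually unnecessary: declaring the run with `await using` at the top of the method disposes it when the method exits. A narrower scope only matters when ownership has to be released before a later step, such as resuming from a checkpoint.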
    List<CheckpointInfo> firstRunCheckpoints = [];
    using CancellationTokenSource cts = new();
    await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token))
    {
        if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
        {
            firstRunCheckpoints.Add(startCp);
        }
    }
In Checkpoint_AfterResumeFromSuperstepStart_CountCheckpointsEmittedAsync, CancellationTokenSource cts is created and passed to WatchStreamAsync, but it is never canceled. Either remove the CTS and use WatchStreamAsync() directly, or cancel once the expected number of checkpoints has been collected to keep the test bounded and avoid potential hangs if the stream doesn’t complete.
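One way to keep such a loop bounded is sketched below. It assumes a hypothetical `WatchAsync` source that never completes on its own, standing in for `WatchStreamAsync`; the helper name `CollectAsync`, the expected count, and the timeout are illustrative choices rather than anything taken from the PR. The loop exits as soon as the expected number of items has arrived, and the `CancellationTokenSource` timeout turns a stalled stream into a failure instead of a hang.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedWatchSketch
{
    // Hypothetical event source that never completes on its own.
    public static async IAsyncEnumerable<int> WatchAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (int i = 0; ; i++)
        {
            await Task.Delay(10, cancellationToken);
            yield return i;
        }
    }

    public static async Task<List<int>> CollectAsync(int expected, TimeSpan timeout)
    {
        // The timeout bounds the wait if fewer than `expected` items ever arrive.
        using CancellationTokenSource cts = new(timeout);
        List<int> items = new();

        await foreach (int item in WatchAsync(cts.Token))
        {
            items.Add(item);
            if (items.Count == expected)
            {
                break; // leaving the loop disposes the enumerator and stops watching
            }
        }

        return items;
    }

    public static async Task Main()
    {
        List<int> items = await CollectAsync(expected: 3, timeout: TimeSpan.FromSeconds(5));
        Console.WriteLine(items.Count);
    }
}
```

If the timeout fires first, the pending `Task.Delay` throws an `OperationCanceledException` out of the `await foreach`, so a test built this way fails quickly rather than waiting indefinitely. When the stream is known to complete on its own, dropping the token entirely is the simpler option.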
    // Dispose the first run to release workflow ownership before resuming.
    await run.DisposeAsync();
run is declared with await using, but later DisposeAsync() is called explicitly as well. Since await using already disposes the run, drop the explicit DisposeAsync() (or remove await using and keep the manual dispose) to avoid redundant disposal and keep the test clearer.
    - // Dispose the first run to release workflow ownership before resuming.
    - await run.DisposeAsync();
    List<CheckpointInfo> firstRunCheckpoints = [];
    using CancellationTokenSource cts = new();
    await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token))
    {
        if (evt is SuperStepCompletedEvent completedEvent && completedEvent.CompletionInfo?.Checkpoint is { } startCp)
        {
            firstRunCheckpoints.Add(startCp);
        }
    }
In Checkpoint_AfterResumeFromSuperstepCompleted_CountCheckpointsEmittedAsync, CancellationTokenSource cts is created and passed to WatchStreamAsync, but it is never canceled. Either remove the CTS or cancel once you’ve collected the expected checkpoints so the test stays bounded if the stream doesn’t complete.
…tParentTests.cs rename local variable Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@copilot open a new pull request to apply changes based on the comments in this thread. Changes look to be generally minor improvements.
    // Dispose the first run to release workflow ownership before resuming.
    await run.DisposeAsync();

    // Act: Resume from the first checkpoint
This comment says "Resume from the first checkpoint", but resumePoint is taken from firstRunCheckpoints[1] (the second checkpoint). Please update the comment to match the code to avoid confusion when reading/fixing these tests.
    - // Act: Resume from the first checkpoint
    + // Act: Resume from the second checkpoint
    List<CheckpointInfo> firstRunCheckpoints = [];
    using CancellationTokenSource cts = new();
    await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token))
    {
This test creates a CancellationTokenSource and passes cts.Token to WatchStreamAsync, but never cancels it (the stream is consumed to completion). Consider removing the CTS and using WatchStreamAsync() directly, or canceling once the needed checkpoints have been collected to keep the test intent clear.
    // Dispose the first run to release workflow ownership before resuming.
    await run.DisposeAsync();

    // Act: Resume from the first checkpoint
This comment says "Resume from the first checkpoint", but resumePoint is taken from firstRunCheckpoints[1] (the second checkpoint). Please update the comment to match the code to avoid confusion when reading/fixing these tests.
    - // Act: Resume from the first checkpoint
    + // Act: Resume from the second checkpoint
    List<CheckpointInfo> firstRunCheckpoints = [];
    using CancellationTokenSource cts = new();
    await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token))
    {
This test creates a CancellationTokenSource and passes cts.Token to WatchStreamAsync, but never cancels it (the stream is consumed to completion). Consider removing the CTS and using WatchStreamAsync() directly, or canceling once the needed checkpoints have been collected to keep the test intent clear.
Motivation and Context
Address issue #4280, allowing workflows to resume from checkpoints saved from SuperStepStarted events.
Description
Adds a `CheckpointInfo?` field to the `SuperStepStartInfo` class, populating this information in the InProcessRunner and InProcStepTracer. Also updates associated unit tests to expect more checkpoints to be created on checkpointed workflows.