-
Notifications
You must be signed in to change notification settings - Fork 1.3k
.NET: Add checkpoint on super step started event: issue #4280 #4604
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a5cf61c
a81c3e2
d1b4449
151f7db
30380d6
61fb686
4c13736
1f859cc
4c44228
71f50f0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -18,4 +18,10 @@ public sealed class SuperStepStartInfo(HashSet<string>? sendingExecutors = null) | |||||||||||
| /// Gets a value indicating whether there are any external messages queued during the previous SuperStep. | ||||||||||||
| /// </summary> | ||||||||||||
| public bool HasExternalMessages { get; init; } | ||||||||||||
|
|
||||||||||||
| /// <summary> | ||||||||||||
| /// Gets the <see cref="CheckpointInfo"/> corresponding to the checkpoint restored at the start of this SuperStep, if any. | ||||||||||||
|
||||||||||||
| /// Gets the <see cref="CheckpointInfo"/> corresponding to the checkpoint restored at the start of this SuperStep, if any. | |
| /// Gets the <see cref="CheckpointInfo"/> for the checkpoint created immediately before raising | |
| /// the <c>SuperStepStartedEvent</c> for this SuperStep (i.e., the checkpoint taken after the | |
| /// previous SuperStep completed). The <see cref="CheckpointInfo.StepNumber"/> therefore | |
| /// corresponds to the previously completed SuperStep, not the upcoming one. |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -32,13 +32,17 @@ internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(Executi | |||||
| InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment(); | ||||||
|
|
||||||
| // Act | ||||||
| StreamingRun run = | ||||||
| await using StreamingRun run = | ||||||
| await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"); | ||||||
|
|
||||||
| List<CheckpointInfo> checkpoints = []; | ||||||
| await foreach (WorkflowEvent evt in run.WatchStreamAsync()) | ||||||
| { | ||||||
| if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) | ||||||
| if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) | ||||||
| { | ||||||
| checkpoints.Add(startCp); | ||||||
| } | ||||||
| else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) | ||||||
| { | ||||||
| checkpoints.Add(cp); | ||||||
| } | ||||||
|
|
@@ -51,6 +55,13 @@ internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(Executi | |||||
| Checkpoint storedFirst = await ((ICheckpointManager)checkpointManager) | ||||||
| .LookupCheckpointAsync(firstCheckpoint.SessionId, firstCheckpoint); | ||||||
| storedFirst.Parent.Should().BeNull("the first checkpoint should have no parent"); | ||||||
|
|
||||||
| // Assert: The second checkpoint should have 1 parent, the first checkpoint. | ||||||
| CheckpointInfo secondCheckpoint = checkpoints[1]; | ||||||
| Checkpoint storedSecond = await ((ICheckpointManager)checkpointManager) | ||||||
| .LookupCheckpointAsync(secondCheckpoint.SessionId, secondCheckpoint); | ||||||
| storedSecond.Parent.Should().NotBeNull("the second checkpoint should have a parent"); | ||||||
| storedSecond.Parent.Should().Be(firstCheckpoint, "the second checkpoint's parent should be the first checkpoint"); | ||||||
| } | ||||||
|
|
||||||
| [Theory] | ||||||
|
|
@@ -79,13 +90,18 @@ internal async Task Checkpoint_SubsequentCheckpoints_ShouldChainParentsAsync(Exe | |||||
|
|
||||||
| await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token)) | ||||||
| { | ||||||
| if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) | ||||||
| if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) | ||||||
| { | ||||||
| checkpoints.Add(startCp); | ||||||
| } | ||||||
| else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) | ||||||
| { | ||||||
| checkpoints.Add(cp); | ||||||
| if (checkpoints.Count >= 3) | ||||||
| { | ||||||
| cts.Cancel(); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| if (checkpoints.Count >= 3) | ||||||
| { | ||||||
| cts.Cancel(); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -132,13 +148,18 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs | |||||
| using CancellationTokenSource cts = new(); | ||||||
| await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token)) | ||||||
| { | ||||||
| if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) | ||||||
| if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) | ||||||
| { | ||||||
| firstRunCheckpoints.Add(startCp); | ||||||
| } | ||||||
| else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) | ||||||
| { | ||||||
| firstRunCheckpoints.Add(cp); | ||||||
| if (firstRunCheckpoints.Count >= 2) | ||||||
| { | ||||||
| cts.Cancel(); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| if (firstRunCheckpoints.Count >= 2) | ||||||
| { | ||||||
| cts.Cancel(); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -155,13 +176,18 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs | |||||
| using CancellationTokenSource cts2 = new(); | ||||||
| await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(cts2.Token)) | ||||||
| { | ||||||
| if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) | ||||||
| if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) | ||||||
| { | ||||||
| resumedCheckpoints.Add(startCp); | ||||||
| } | ||||||
| else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp) | ||||||
| { | ||||||
| resumedCheckpoints.Add(cp); | ||||||
| if (resumedCheckpoints.Count >= 1) | ||||||
| { | ||||||
| cts2.Cancel(); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| if (resumedCheckpoints.Count >= 1) | ||||||
| { | ||||||
| cts2.Cancel(); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -172,4 +198,119 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs | |||||
| storedResumed.Parent.Should().NotBeNull("checkpoint created after resume should have a parent"); | ||||||
| storedResumed.Parent.Should().Be(resumePoint, "checkpoint after resume should reference the checkpoint we resumed from"); | ||||||
| } | ||||||
|
|
||||||
| [Theory] | ||||||
| [InlineData(ExecutionEnvironment.InProcess_Lockstep)] | ||||||
| [InlineData(ExecutionEnvironment.InProcess_OffThread)] | ||||||
| internal async Task Checkpoint_AfterResumeFromSuperstepStart_CountCheckpointsEmittedAsync(ExecutionEnvironment environment) | ||||||
| { | ||||||
| // Arrange: A basic workflow with 3 executor stages | ||||||
| ForwardMessageExecutor<string> executorA = new("A"); | ||||||
| ForwardMessageExecutor<string> executorB = new("B"); | ||||||
| ForwardMessageExecutor<string> executorC = new("C"); | ||||||
|
|
||||||
| Workflow workflow = new WorkflowBuilder(executorA) | ||||||
| .AddEdge(executorA, executorB) | ||||||
| .AddEdge(executorB, executorC) | ||||||
| .Build(); | ||||||
|
|
||||||
| CheckpointManager checkpointManager = CheckpointManager.CreateInMemory(); | ||||||
| InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment(); | ||||||
|
|
||||||
| // First run: collect a checkpoint to resume from | ||||||
| StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"); | ||||||
|
|
||||||
| List<CheckpointInfo> firstRunCheckpoints = []; | ||||||
| await foreach (WorkflowEvent evt in run.WatchStreamAsync()) | ||||||
| { | ||||||
| if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp) | ||||||
| { | ||||||
| firstRunCheckpoints.Add(startCp); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| firstRunCheckpoints.Should().HaveCount(3); | ||||||
| CheckpointInfo resumePoint = firstRunCheckpoints[1]; | ||||||
|
|
||||||
| // Dispose the first run to release workflow ownership before resuming. | ||||||
| await run.DisposeAsync(); | ||||||
|
|
||||||
elgold92 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| // Act: Resume from the second checkpoint | ||||||
| StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); | ||||||
|
||||||
| StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); | |
| await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The checkpoint created at the beginning of `RunSuperstepAsync` is stamped using `StepTracer.StepNumber`, but `StepTracer.Advance(...)` increments the step number afterward. This means the `StartInfo.Checkpoint` metadata will generally be associated with the previous step number (and the first start checkpoint is `-1`/`IsInitial`), which can be surprising when correlating checkpoints to `SuperStepStartedEvent.StepNumber`. Consider advancing the step counter before creating the start-of-step checkpoint, or allowing `CheckpointAsync` to accept an explicit step number to use for start checkpoints.