Skip to content

Commit 22fc73a

Browse files
authored
Merge pull request #9 from viamus/feature/pipeline-resume-checkpoint
Add pipeline resume with checkpoint persistence
2 parents 0e5dfa7 + d1aa967 commit 22fc73a

7 files changed

Lines changed: 410 additions & 25 deletions

File tree

CodeGenesis.Engine/Cli/RunPipelineCommand.cs

Lines changed: 104 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ namespace CodeGenesis.Engine.Cli;
1010
public sealed class RunPipelineCommand(
1111
IClaudeRunner claude,
1212
PipelineExecutor executor,
13-
PipelineRenderer renderer) : AsyncCommand<RunPipelineCommandSettings>
13+
PipelineRenderer renderer,
14+
CheckpointManager checkpointManager) : AsyncCommand<RunPipelineCommandSettings>
1415
{
1516
public override async Task<int> ExecuteAsync(CommandContext commandContext, RunPipelineCommandSettings settings)
1617
{
@@ -28,6 +29,54 @@ public override async Task<int> ExecuteAsync(CommandContext commandContext, RunP
2829
return 1;
2930
}
3031

32+
// Compute YAML hash for checkpoint comparison
33+
var yamlHash = checkpointManager.ComputeFileHash(settings.File);
34+
35+
// Handle --resume / --from-step
36+
HashSet<string>? completedSteps = null;
37+
string? resumeFromStep = null;
38+
PipelineCheckpoint? checkpoint = null;
39+
40+
if (settings.Resume || settings.FromStep is not null)
41+
{
42+
checkpoint = checkpointManager.Load(settings.File);
43+
44+
if (checkpoint is null)
45+
{
46+
renderer.RenderError("No checkpoint found. Run the pipeline without --resume first.");
47+
return 1;
48+
}
49+
50+
// Warn if YAML changed since checkpoint
51+
if (checkpoint.YamlHash != yamlHash)
52+
{
53+
renderer.RenderInfo("Warning: Pipeline YAML has changed since the last checkpoint. Resuming with current config.");
54+
}
55+
56+
completedSteps = new HashSet<string>(checkpoint.CompletedSteps);
57+
58+
if (settings.Resume)
59+
{
60+
// --resume: determine resume point from checkpoint
61+
resumeFromStep = checkpoint.FailedStepName;
62+
// If no failed step recorded, all completed steps will be skipped
63+
// and execution continues from the first non-completed step
64+
65+
if (completedSteps.Count == 0 && resumeFromStep is null)
66+
{
67+
renderer.RenderInfo("Nothing to resume — no steps were completed.");
68+
return 0;
69+
}
70+
}
71+
else if (settings.FromStep is not null)
72+
{
73+
// --from-step: resume from the requested step (its existence is validated after the step tree is built)
74+
resumeFromStep = settings.FromStep;
75+
}
76+
77+
// Cached step outputs are restored further down, after the pipeline context is available
78+
}
79+
3180
// Build template variables from inputs (defaults + overrides)
3281
var variables = new Dictionary<string, string>();
3382
foreach (var (key, input) in config.Inputs)
@@ -71,6 +120,16 @@ public override async Task<int> ExecuteAsync(CommandContext commandContext, RunP
71120
WorkingDirectory = workingDir
72121
};
73122

123+
// Restore cached step outputs from checkpoint
124+
if (checkpoint is not null && completedSteps is not null)
125+
{
126+
foreach (var (key, value) in checkpoint.StepOutputs)
127+
{
128+
if (completedSteps.Contains(key))
129+
context.StepOutputs[key] = value;
130+
}
131+
}
132+
74133
// Build step tree using StepBuilder
75134
List<IPipelineStep> steps;
76135
try
@@ -84,6 +143,21 @@ public override async Task<int> ExecuteAsync(CommandContext commandContext, RunP
84143
return 1;
85144
}
86145

146+
// Validate --from-step target exists
147+
if (settings.FromStep is not null && !steps.Any(s => s.Name == settings.FromStep))
148+
{
149+
renderer.RenderError($"Step '{settings.FromStep}' not found in pipeline.");
150+
return 1;
151+
}
152+
153+
// Check if all steps are already completed (--resume with nothing left)
154+
if (completedSteps is not null && resumeFromStep is null && steps.All(s => completedSteps.Contains(s.Name)))
155+
{
156+
renderer.RenderInfo("All steps already completed. Nothing to resume.");
157+
checkpointManager.Delete(settings.File);
158+
return 0;
159+
}
160+
87161
using var cts = new CancellationTokenSource();
88162
Console.CancelKeyPress += (_, e) =>
89163
{
@@ -93,27 +167,39 @@ public override async Task<int> ExecuteAsync(CommandContext commandContext, RunP
93167
};
94168

95169
// Execute pipeline — resolve templates for each step just before it runs
96-
var success = await executor.RunAsync(steps, context, cts.Token, onBeforeStep: step =>
97-
{
98-
if (step is DynamicStep dynamicStep)
170+
var success = await executor.RunAsync(steps, context, cts.Token,
171+
onBeforeStep: step =>
99172
{
100-
// Re-resolve the prompt and system prompt with latest step outputs
101-
var allVars = new Dictionary<string, string>(variables);
102-
foreach (var (key, value) in context.StepOutputs)
103-
allVars[$"steps.{key}"] = value;
104-
105-
dynamicStep.UpdateResolvedPrompt(
106-
PipelineConfigLoader.ResolveTemplate(
107-
dynamicStep.OriginalPromptTemplate, allVars));
108-
109-
if (dynamicStep.OriginalSystemPromptTemplate is not null)
173+
if (step is DynamicStep dynamicStep)
110174
{
111-
dynamicStep.UpdateResolvedSystemPrompt(
175+
// Re-resolve the prompt and system prompt with latest step outputs
176+
var allVars = new Dictionary<string, string>(variables);
177+
foreach (var (key, value) in context.StepOutputs)
178+
allVars[$"steps.{key}"] = value;
179+
180+
dynamicStep.UpdateResolvedPrompt(
112181
PipelineConfigLoader.ResolveTemplate(
113-
dynamicStep.OriginalSystemPromptTemplate, allVars));
182+
dynamicStep.OriginalPromptTemplate, allVars));
183+
184+
if (dynamicStep.OriginalSystemPromptTemplate is not null)
185+
{
186+
dynamicStep.UpdateResolvedSystemPrompt(
187+
PipelineConfigLoader.ResolveTemplate(
188+
dynamicStep.OriginalSystemPromptTemplate, allVars));
189+
}
114190
}
115-
}
116-
});
191+
},
192+
completedSteps: completedSteps,
193+
resumeFromStep: resumeFromStep,
194+
checkpointManager: checkpointManager,
195+
pipelineFile: settings.File,
196+
yamlHash: yamlHash,
197+
pipelineName: config.Pipeline.Name);
198+
199+
if (!success)
200+
{
201+
renderer.RenderResumeHint(settings.File);
202+
}
117203

118204
return success ? 0 : 1;
119205
}

CodeGenesis.Engine/Cli/RunPipelineCommandSettings.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.ComponentModel;
2+
using Spectre.Console;
23
using Spectre.Console.Cli;
34

45
namespace CodeGenesis.Engine.Cli;
@@ -20,4 +21,20 @@ public sealed class RunPipelineCommandSettings : CommandSettings
2021
[CommandOption("-m|--model")]
2122
[Description("Claude model override (e.g. claude-sonnet-4-6)")]
2223
public string? Model { get; set; }
24+
25+
[CommandOption("--resume")]
26+
[Description("Resume from the last checkpoint (skips completed steps)")]
27+
public bool Resume { get; set; }
28+
29+
[CommandOption("--from-step <STEP>")]
30+
[Description("Resume from a specific step name (re-executes that step and all following)")]
31+
public string? FromStep { get; set; }
32+
33+
/// <summary>
/// Rejects command lines that combine <c>--resume</c> with <c>--from-step</c>.
/// </summary>
/// <returns>
/// An error result when both options are supplied; otherwise a success result.
/// </returns>
public override ValidationResult Validate() =>
    Resume && FromStep is not null
        ? ValidationResult.Error("--resume and --from-step are mutually exclusive.")
        : ValidationResult.Success();
2340
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
using System.Security.Cryptography;
2+
using System.Text.Json;
3+
using Microsoft.Extensions.Logging;
4+
5+
namespace CodeGenesis.Engine.Pipeline;
6+
7+
/// <summary>
/// Loads, saves and deletes pipeline checkpoints stored as JSON files in a
/// <c>.codegenesis</c> folder next to the pipeline file.
/// </summary>
/// <remarks>
/// <see cref="Load"/>, <see cref="Save"/> and <see cref="Delete"/> are best effort:
/// file-system and JSON failures are logged as warnings instead of being thrown.
/// </remarks>
public sealed class CheckpointManager(ILogger<CheckpointManager> logger)
{
    private static readonly JsonSerializerOptions JsonOptions = new()
    {
        WriteIndented = true,
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
    };

    /// <summary>
    /// Builds the checkpoint file path for a pipeline file:
    /// <c>&lt;pipeline dir&gt;/.codegenesis/&lt;pipeline file name without extension&gt;.checkpoint.json</c>.
    /// </summary>
    /// <param name="pipelineFilePath">Path to the pipeline file; resolved to a full path first.</param>
    /// <returns>The full path of the checkpoint file (the file may not exist).</returns>
    public string GetCheckpointPath(string pipelineFilePath)
    {
        var fullPath = Path.GetFullPath(pipelineFilePath);
        var dir = Path.GetDirectoryName(fullPath)!;
        var stem = Path.GetFileNameWithoutExtension(fullPath);
        var checkpointDir = Path.Combine(dir, ".codegenesis");
        return Path.Combine(checkpointDir, $"{stem}.checkpoint.json");
    }

    /// <summary>
    /// Reads the checkpoint that belongs to the given pipeline file.
    /// </summary>
    /// <param name="pipelineFilePath">Path to the pipeline file.</param>
    /// <returns>
    /// The deserialized checkpoint, or <c>null</c> when no checkpoint file exists or it
    /// cannot be read, parsed, or is missing its step collections.
    /// </returns>
    public PipelineCheckpoint? Load(string pipelineFilePath)
    {
        var path = GetCheckpointPath(pipelineFilePath);
        if (!File.Exists(path))
            return null;

        try
        {
            var json = File.ReadAllText(path);
            var checkpoint = JsonSerializer.Deserialize<PipelineCheckpoint>(json, JsonOptions);

            // `required` only guarantees the property is present in the JSON; an explicit
            // `null` value still deserializes. Callers enumerate both collections, so a
            // checkpoint without them is treated like any other unreadable checkpoint.
            if (checkpoint is not null
                && (checkpoint.CompletedSteps is null || checkpoint.StepOutputs is null))
            {
                logger.LogWarning("Checkpoint at {Path} has no step data, ignoring", path);
                return null;
            }

            return checkpoint;
        }
        catch (Exception ex)
        {
            logger.LogWarning(ex, "Failed to load checkpoint from {Path}, ignoring", path);
            return null;
        }
    }

    /// <summary>
    /// Writes the checkpoint for the given pipeline file, creating the checkpoint
    /// directory when needed.
    /// </summary>
    /// <param name="checkpoint">The checkpoint to serialize.</param>
    /// <param name="pipelineFilePath">Path to the pipeline file.</param>
    /// <remarks>
    /// The JSON is written to a <c>.tmp</c> sibling first and then moved over the real
    /// file, so a failed write does not leave a truncated checkpoint behind.
    /// Failures are logged and swallowed.
    /// </remarks>
    public void Save(PipelineCheckpoint checkpoint, string pipelineFilePath)
    {
        var path = GetCheckpointPath(pipelineFilePath);
        var tmpPath = path + ".tmp";

        try
        {
            // Directory creation and serialization live inside the try block so that a
            // permissions/IO/serialization error is logged like every other save failure
            // instead of aborting the caller.
            Directory.CreateDirectory(Path.GetDirectoryName(path)!);

            var json = JsonSerializer.Serialize(checkpoint, JsonOptions);
            File.WriteAllText(tmpPath, json);
            File.Move(tmpPath, path, overwrite: true);
            logger.LogDebug("Checkpoint saved to {Path}", path);
        }
        catch (Exception ex)
        {
            logger.LogWarning(ex, "Failed to save checkpoint to {Path}", path);
            // Clean up temp file if it exists
            try { File.Delete(tmpPath); } catch { /* best effort */ }
        }
    }

    /// <summary>
    /// Deletes the checkpoint file of the given pipeline file if it exists.
    /// Failures are logged and swallowed.
    /// </summary>
    /// <param name="pipelineFilePath">Path to the pipeline file.</param>
    public void Delete(string pipelineFilePath)
    {
        var path = GetCheckpointPath(pipelineFilePath);
        try
        {
            if (File.Exists(path))
            {
                File.Delete(path);
                logger.LogDebug("Checkpoint deleted: {Path}", path);
            }
        }
        catch (Exception ex)
        {
            logger.LogWarning(ex, "Failed to delete checkpoint at {Path}", path);
        }
    }

    /// <summary>
    /// Computes the SHA-256 hash of a file's bytes.
    /// </summary>
    /// <param name="filePath">Path of the file to hash.</param>
    /// <returns>The hash as a lowercase hexadecimal string.</returns>
    /// <remarks>Unlike the checkpoint operations, this method does not catch I/O exceptions.</remarks>
    public string ComputeFileHash(string filePath)
    {
        var bytes = File.ReadAllBytes(filePath);
        var hash = SHA256.HashData(bytes);
        return Convert.ToHexStringLower(hash);
    }
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
namespace CodeGenesis.Engine.Pipeline;
2+
3+
/// <summary>
/// Serializable snapshot of a pipeline run, used to resume execution later.
/// </summary>
public sealed class PipelineCheckpoint
{
    /// <summary>Checkpoint format version; defaults to 1.</summary>
    public int Version { get; init; } = 1;

    /// <summary>The pipeline file this checkpoint was written for.</summary>
    public required string PipelineFile { get; init; }

    /// <summary>Name of the pipeline.</summary>
    public required string PipelineName { get; init; }

    /// <summary>
    /// Hash of the pipeline YAML at checkpoint time; compared with the current file
    /// hash on resume to warn when the pipeline definition has changed.
    /// </summary>
    public required string YamlHash { get; init; }

    /// <summary>Timestamp of the last update. NOTE(review): presumably UTC, as the name says — confirm at the writer.</summary>
    public required DateTime LastUpdatedUtc { get; init; }

    /// <summary>Names of the steps that have already completed.</summary>
    public required List<string> CompletedSteps { get; init; }

    /// <summary>Step outputs keyed by step name; restored into the run context for completed steps.</summary>
    public required Dictionary<string, string> StepOutputs { get; init; }

    /// <summary>Name of the step that failed, if any; used as the resume point for <c>--resume</c>.</summary>
    public string? FailedStepName { get; init; }

    /// <summary>Usage counters stored alongside the checkpoint.</summary>
    public required CheckpointMetrics Metrics { get; init; }
}
15+
16+
/// <summary>
/// Usage counters stored inside a pipeline checkpoint.
/// </summary>
public sealed class CheckpointMetrics
{
    /// <summary>Total number of input tokens.</summary>
    public int TotalInputTokens { get; init; }

    /// <summary>Total number of output tokens.</summary>
    public int TotalOutputTokens { get; init; }

    /// <summary>Total cost in US dollars.</summary>
    public double TotalCostUsd { get; init; }

    /// <summary>Number of steps completed.</summary>
    public int StepsCompleted { get; init; }
}

0 commit comments

Comments
 (0)