Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 6 additions & 25 deletions src/migrations/004_orchestration.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,10 @@ CREATE TABLE IF NOT EXISTS pending_state_injections (
created_at_ms INTEGER NOT NULL
);

-- Extend runs table
ALTER TABLE runs ADD COLUMN state_json TEXT;
ALTER TABLE runs ADD COLUMN workflow_id TEXT REFERENCES workflows(id);
ALTER TABLE runs ADD COLUMN forked_from_run_id TEXT REFERENCES runs(id);
ALTER TABLE runs ADD COLUMN forked_from_checkpoint_id TEXT REFERENCES checkpoints(id);
ALTER TABLE runs ADD COLUMN checkpoint_count INTEGER DEFAULT 0;

-- Extend steps table
ALTER TABLE steps ADD COLUMN state_before_json TEXT;
ALTER TABLE steps ADD COLUMN state_after_json TEXT;
ALTER TABLE steps ADD COLUMN state_updates_json TEXT;
-- NOTE: parent_step_id already exists from 001_init.sql — do NOT add it again

-- Subgraph support: parent run linkage and per-run config
ALTER TABLE runs ADD COLUMN parent_run_id TEXT REFERENCES runs(id);
ALTER TABLE runs ADD COLUMN config_json TEXT;
-- Extend runs/steps tables.
-- These columns are added conditionally from store.zig because SQLite does not
-- support ALTER TABLE ADD COLUMN IF NOT EXISTS.
-- NOTE: parent_step_id already exists from 001_init.sql — do NOT add it again.

-- Node-level cache (Gap 3)
CREATE TABLE IF NOT EXISTS node_cache (
Expand All @@ -86,12 +74,5 @@ CREATE TABLE IF NOT EXISTS pending_writes (
);
CREATE INDEX IF NOT EXISTS idx_pending_writes_run ON pending_writes(run_id);

-- Token accounting columns on runs
ALTER TABLE runs ADD COLUMN total_input_tokens INTEGER DEFAULT 0;
ALTER TABLE runs ADD COLUMN total_output_tokens INTEGER DEFAULT 0;
ALTER TABLE runs ADD COLUMN total_tokens INTEGER DEFAULT 0;

-- Token accounting columns on steps
ALTER TABLE steps ADD COLUMN input_tokens INTEGER DEFAULT 0;
ALTER TABLE steps ADD COLUMN output_tokens INTEGER DEFAULT 0;
ALTER TABLE steps ADD COLUMN total_tokens INTEGER DEFAULT 0;
-- Token accounting columns on runs/steps are also added conditionally from
-- store.zig for the same reason.
170 changes: 146 additions & 24 deletions src/store.zig
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,29 @@ pub const Store = struct {
allocator: std.mem.Allocator,

const Self = @This();
// One conditionally-added column from migration 004. `column_def` is the
// full "name TYPE ..." fragment spliced into
// `ALTER TABLE <table_name> ADD COLUMN ...`.
const MigrationColumn = struct {
    table_name: []const u8,
    column_name: []const u8,
    column_def: []const u8,
};
// Columns migration 004 adds to the pre-existing runs/steps tables.
// They are applied one at a time (see ensureColumn) because SQLite does not
// support `ALTER TABLE ADD COLUMN IF NOT EXISTS`. Keep this list in sync
// with the notes in migrations/004_orchestration.sql.
const migration_004_columns = [_]MigrationColumn{
    .{ .table_name = "runs", .column_name = "state_json", .column_def = "state_json TEXT" },
    .{ .table_name = "runs", .column_name = "workflow_id", .column_def = "workflow_id TEXT REFERENCES workflows(id)" },
    .{ .table_name = "runs", .column_name = "forked_from_run_id", .column_def = "forked_from_run_id TEXT REFERENCES runs(id)" },
    .{ .table_name = "runs", .column_name = "forked_from_checkpoint_id", .column_def = "forked_from_checkpoint_id TEXT REFERENCES checkpoints(id)" },
    .{ .table_name = "runs", .column_name = "checkpoint_count", .column_def = "checkpoint_count INTEGER DEFAULT 0" },
    .{ .table_name = "steps", .column_name = "state_before_json", .column_def = "state_before_json TEXT" },
    .{ .table_name = "steps", .column_name = "state_after_json", .column_def = "state_after_json TEXT" },
    .{ .table_name = "steps", .column_name = "state_updates_json", .column_def = "state_updates_json TEXT" },
    .{ .table_name = "runs", .column_name = "parent_run_id", .column_def = "parent_run_id TEXT REFERENCES runs(id)" },
    .{ .table_name = "runs", .column_name = "config_json", .column_def = "config_json TEXT" },
    .{ .table_name = "runs", .column_name = "total_input_tokens", .column_def = "total_input_tokens INTEGER DEFAULT 0" },
    .{ .table_name = "runs", .column_name = "total_output_tokens", .column_def = "total_output_tokens INTEGER DEFAULT 0" },
    .{ .table_name = "runs", .column_name = "total_tokens", .column_def = "total_tokens INTEGER DEFAULT 0" },
    .{ .table_name = "steps", .column_name = "input_tokens", .column_def = "input_tokens INTEGER DEFAULT 0" },
    .{ .table_name = "steps", .column_name = "output_tokens", .column_def = "output_tokens INTEGER DEFAULT 0" },
    .{ .table_name = "steps", .column_name = "total_tokens", .column_def = "total_tokens INTEGER DEFAULT 0" },
};

pub fn init(allocator: std.mem.Allocator, db_path: [*:0]const u8) !Self {
var db: ?*c.sqlite3 = null;
Expand Down Expand Up @@ -106,50 +129,126 @@ pub const Store = struct {
}
}

fn migrate(self: *Self) !void {
// Migration 001
const sql_001 = @embedFile("migrations/001_init.sql");
/// Read the schema version stored in `PRAGMA user_version`.
/// A database that yields no row at all is treated as version 0.
fn getUserVersion(self: *Self) !i64 {
    var stmt: ?*c.sqlite3_stmt = null;
    if (c.sqlite3_prepare_v2(self.db, "PRAGMA user_version;", -1, &stmt, null) != c.SQLITE_OK) {
        return error.SqlitePrepareFailed;
    }
    defer _ = c.sqlite3_finalize(stmt);

    return switch (c.sqlite3_step(stmt)) {
        c.SQLITE_ROW => colInt(stmt, 0),
        c.SQLITE_DONE => 0,
        else => error.SqliteStepFailed,
    };
}

/// Persist `version` via `PRAGMA user_version`.
/// PRAGMA statements do not accept bound parameters, so the version is
/// formatted into a fixed stack buffer and executed as plain SQL.
fn setUserVersion(self: *Self, version: i64) !void {
    var buf: [64]u8 = undefined;
    const sql = try std.fmt.bufPrintZ(&buf, "PRAGMA user_version = {d};", .{version});
    var err_msg: [*c]u8 = null;
    const rc = c.sqlite3_exec(self.db, @ptrCast(sql.ptr), null, null, &err_msg);
    if (rc != c.SQLITE_OK) {
        if (err_msg) |msg| {
            log.err("failed to set schema user_version={d} (rc={d}): {s}", .{ version, rc, std.mem.span(msg) });
            // sqlite3 allocates the message; caller must free it.
            c.sqlite3_free(msg);
        }
        return error.SqliteExecFailed;
    }
}

// Migration 002 — new tables (idempotent via IF NOT EXISTS)
const sql_002 = @embedFile("migrations/002_advanced_steps.sql");
prc = c.sqlite3_exec(self.db, sql_002.ptr, null, null, &err_msg);
/// Execute one embedded migration script. `num` is used only for logging
/// (zero-padded to three digits to match the migration file names).
fn execMigrationSql(self: *Self, num: i64, sql: [*:0]const u8) !void {
    var err_msg: [*c]u8 = null;
    const prc = c.sqlite3_exec(self.db, sql, null, null, &err_msg);
    if (prc != c.SQLITE_OK) {
        if (err_msg) |msg| {
            log.err("migration {d:0>3} failed (rc={d}): {s}", .{ num, prc, std.mem.span(msg) });
            // sqlite3 allocates the message; caller must free it.
            c.sqlite3_free(msg);
        }
        return error.MigrationFailed;
    }
}

/// Run one embedded migration script and, only on success, record `num` as
/// the new schema user_version (so a failed script is retried on next open).
fn runMigration(self: *Self, num: i64, sql: [*:0]const u8) !void {
    try self.execMigrationSql(num, sql);
    try self.setUserVersion(num);
    log.info("migration {d:0>3} applied", .{num});
}

/// Report whether `table_name` already has a column named `column_name`,
/// by scanning `PRAGMA table_info` rows (column index 1 is the column name).
/// Table names here are trusted compile-time constants from
/// `migration_004_columns`, so formatting them into the PRAGMA text is safe.
fn hasColumn(self: *Self, table_name: []const u8, column_name: []const u8) !bool {
    var buf: [64]u8 = undefined;
    const sql = try std.fmt.bufPrintZ(&buf, "PRAGMA table_info({s});", .{table_name});
    var stmt: ?*c.sqlite3_stmt = null;
    const rc = c.sqlite3_prepare_v2(self.db, @ptrCast(sql.ptr), -1, &stmt, null);
    if (rc != c.SQLITE_OK) return error.SqlitePrepareFailed;
    defer _ = c.sqlite3_finalize(stmt);

    while (true) {
        const step_rc = c.sqlite3_step(stmt);
        if (step_rc == c.SQLITE_DONE) break;
        if (step_rc != c.SQLITE_ROW) return error.SqliteStepFailed;

        const name_ptr = c.sqlite3_column_text(stmt, 1);
        if (name_ptr == null) continue;

        const name_len: usize = @intCast(c.sqlite3_column_bytes(stmt, 1));
        const name = name_ptr[0..name_len];
        if (std.mem.eql(u8, name, column_name)) return true;
    }

    return false;
}

/// Add `column_def` to `table_name` unless `column_name` already exists.
/// This is the workaround for SQLite's missing
/// `ALTER TABLE ... ADD COLUMN IF NOT EXISTS`.
fn ensureColumn(self: *Self, table_name: []const u8, column_name: []const u8, column_def: []const u8) !void {
    if (try self.hasColumn(table_name, column_name)) return;

    var buf: [256]u8 = undefined;
    const sql = try std.fmt.bufPrintZ(&buf, "ALTER TABLE {s} ADD COLUMN {s};", .{ table_name, column_def });
    var err_msg: [*c]u8 = null;
    const rc = c.sqlite3_exec(self.db, @ptrCast(sql.ptr), null, null, &err_msg);
    if (rc != c.SQLITE_OK) {
        if (err_msg) |msg| {
            log.err("migration 004 failed to add {s}.{s} (rc={d}): {s}", .{ table_name, column_name, rc, std.mem.span(msg) });
            // sqlite3 allocates the message; caller must free it.
            c.sqlite3_free(msg);
        }
        return error.MigrationFailed;
    }
}

/// Migration 004 — orchestration schema (workflows, checkpoints,
/// agent_events), then the conditional runs/steps column additions listed in
/// `migration_004_columns` (added here because SQLite lacks
/// ADD COLUMN IF NOT EXISTS). The version bump happens only after every
/// column is in place, so a partial failure is retried on next open.
fn migrateOrchestration(self: *Self) !void {
    const sql_004 = @embedFile("migrations/004_orchestration.sql");
    try self.execMigrationSql(4, sql_004.ptr);
    for (migration_004_columns) |col| {
        try self.ensureColumn(col.table_name, col.column_name, col.column_def);
    }
    try self.setUserVersion(4);
    log.info("migration 004 applied", .{});
}

/// Bring the on-disk schema up to the current version. The stored
/// `PRAGMA user_version` decides which migrations still need to run, so
/// reopening an already-migrated database applies nothing.
fn migrate(self: *Self) !void {
    const start_version = try self.getUserVersion();
    log.info("schema user_version={d}", .{start_version});

    if (start_version < 1) {
        // Migration 001 — base schema
        const init_sql = @embedFile("migrations/001_init.sql");
        try self.runMigration(1, init_sql.ptr);
    }

    if (start_version < 2) {
        // Migration 002 — new tables (idempotent via IF NOT EXISTS)
        const advanced_steps_sql = @embedFile("migrations/002_advanced_steps.sql");
        try self.runMigration(2, advanced_steps_sql.ptr);
    }

    if (start_version < 3) {
        // Migration 003 — tracker integration state
        const tracker_sql = @embedFile("migrations/003_tracker.sql");
        try self.runMigration(3, tracker_sql.ptr);
    }

    if (start_version < 4) {
        // Migration 004 — orchestration schema (workflows, checkpoints, agent_events)
        try self.migrateOrchestration();
    }
}

Expand Down Expand Up @@ -1797,6 +1896,29 @@ test "Store: init and deinit" {
defer s.deinit();
}

// Simulates a database created before versioned migrations: open once, force
// user_version back to 0, then verify that reopening re-runs the (idempotent)
// migrations and lands on the latest schema version.
test "Store: reopens legacy schema with user_version reset to zero" {
    const gpa = std.testing.allocator;
    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const tmp_root = try tmp.dir.realpathAlloc(gpa, ".");
    defer gpa.free(tmp_root);
    const joined = try std.fs.path.join(gpa, &.{ tmp_root, "legacy.db" });
    defer gpa.free(joined);
    const db_path = try gpa.dupeZ(u8, joined);
    defer gpa.free(db_path);

    {
        var legacy = try Store.init(gpa, db_path);
        try legacy.setUserVersion(0);
        legacy.deinit();
    }

    var reopened = try Store.init(gpa, db_path);
    defer reopened.deinit();
    try std.testing.expectEqual(@as(i64, 4), try reopened.getUserVersion());
}

test "Store: insert and get worker" {
const allocator = std.testing.allocator;
var s = try Store.init(allocator, ":memory:");
Expand Down
Loading