diff --git a/src/migrations/004_orchestration.sql b/src/migrations/004_orchestration.sql index ce69d40..53e0240 100644 --- a/src/migrations/004_orchestration.sql +++ b/src/migrations/004_orchestration.sql @@ -49,22 +49,10 @@ CREATE TABLE IF NOT EXISTS pending_state_injections ( created_at_ms INTEGER NOT NULL ); --- Extend runs table -ALTER TABLE runs ADD COLUMN state_json TEXT; -ALTER TABLE runs ADD COLUMN workflow_id TEXT REFERENCES workflows(id); -ALTER TABLE runs ADD COLUMN forked_from_run_id TEXT REFERENCES runs(id); -ALTER TABLE runs ADD COLUMN forked_from_checkpoint_id TEXT REFERENCES checkpoints(id); -ALTER TABLE runs ADD COLUMN checkpoint_count INTEGER DEFAULT 0; - --- Extend steps table -ALTER TABLE steps ADD COLUMN state_before_json TEXT; -ALTER TABLE steps ADD COLUMN state_after_json TEXT; -ALTER TABLE steps ADD COLUMN state_updates_json TEXT; --- NOTE: parent_step_id already exists from 001_init.sql — do NOT add it again - --- Subgraph support: parent run linkage and per-run config -ALTER TABLE runs ADD COLUMN parent_run_id TEXT REFERENCES runs(id); -ALTER TABLE runs ADD COLUMN config_json TEXT; +-- Extend runs/steps tables. +-- These columns are added conditionally from store.zig because SQLite does not +-- support ALTER TABLE ADD COLUMN IF NOT EXISTS. +-- NOTE: parent_step_id already exists from 001_init.sql — do NOT add it again. -- Node-level cache (Gap 3) CREATE TABLE IF NOT EXISTS node_cache ( @@ -86,12 +74,5 @@ CREATE TABLE IF NOT EXISTS pending_writes ( ); CREATE INDEX IF NOT EXISTS idx_pending_writes_run ON pending_writes(run_id); --- Token accounting columns on runs -ALTER TABLE runs ADD COLUMN total_input_tokens INTEGER DEFAULT 0; -ALTER TABLE runs ADD COLUMN total_output_tokens INTEGER DEFAULT 0; -ALTER TABLE runs ADD COLUMN total_tokens INTEGER DEFAULT 0; - --- Token accounting columns on steps -ALTER TABLE steps ADD COLUMN input_tokens INTEGER DEFAULT 0; -ALTER TABLE steps ADD COLUMN output_tokens INTEGER DEFAULT 0; -ALTER TABLE steps ADD COLUMN total_tokens INTEGER DEFAULT 0; +-- Token accounting columns on runs/steps are also added conditionally from +-- store.zig for the same reason. diff --git a/src/store.zig b/src/store.zig index ebf0ca5..5b072cf 100644 --- a/src/store.zig +++ b/src/store.zig @@ -57,6 +57,29 @@ pub const Store = struct { allocator: std.mem.Allocator, const Self = @This(); + const MigrationColumn = struct { + table_name: []const u8, + column_name: []const u8, + column_def: []const u8, + }; + const migration_004_columns = [_]MigrationColumn{ + .{ .table_name = "runs", .column_name = "state_json", .column_def = "state_json TEXT" }, + .{ .table_name = "runs", .column_name = "workflow_id", .column_def = "workflow_id TEXT REFERENCES workflows(id)" }, + .{ .table_name = "runs", .column_name = "forked_from_run_id", .column_def = "forked_from_run_id TEXT REFERENCES runs(id)" }, + .{ .table_name = "runs", .column_name = "forked_from_checkpoint_id", .column_def = "forked_from_checkpoint_id TEXT REFERENCES checkpoints(id)" }, + .{ .table_name = "runs", .column_name = "checkpoint_count", .column_def = "checkpoint_count INTEGER DEFAULT 0" }, + .{ .table_name = "steps", .column_name = "state_before_json", .column_def = "state_before_json TEXT" }, + .{ .table_name = "steps", .column_name = "state_after_json", .column_def = "state_after_json TEXT" }, + .{ .table_name = "steps", .column_name = "state_updates_json", .column_def = "state_updates_json TEXT" }, + .{ .table_name = "runs", .column_name = "parent_run_id", .column_def = "parent_run_id TEXT REFERENCES runs(id)" }, + .{ .table_name = "runs", .column_name = "config_json", .column_def = "config_json TEXT" }, + .{ .table_name = "runs", .column_name = "total_input_tokens", .column_def = "total_input_tokens INTEGER DEFAULT 0" }, + .{ .table_name = "runs", .column_name = "total_output_tokens", .column_def = "total_output_tokens INTEGER DEFAULT 0" }, + .{ .table_name = "runs", .column_name = "total_tokens", .column_def = "total_tokens INTEGER DEFAULT 0" }, + .{ .table_name = "steps", .column_name = "input_tokens", .column_def = "input_tokens INTEGER DEFAULT 0" }, + .{ .table_name = "steps", .column_name = "output_tokens", .column_def = "output_tokens INTEGER DEFAULT 0" }, + .{ .table_name = "steps", .column_name = "total_tokens", .column_def = "total_tokens INTEGER DEFAULT 0" }, + }; pub fn init(allocator: std.mem.Allocator, db_path: [*:0]const u8) !Self { var db: ?*c.sqlite3 = null; @@ -106,50 +129,126 @@ pub const Store = struct { } } - fn migrate(self: *Self) !void { - // Migration 001 - const sql_001 = @embedFile("migrations/001_init.sql"); + fn getUserVersion(self: *Self) !i64 { + var stmt: ?*c.sqlite3_stmt = null; + const rc = c.sqlite3_prepare_v2(self.db, "PRAGMA user_version;", -1, &stmt, null); + if (rc != c.SQLITE_OK) return error.SqlitePrepareFailed; + defer _ = c.sqlite3_finalize(stmt); + const step_rc = c.sqlite3_step(stmt); + if (step_rc == c.SQLITE_ROW) { + return colInt(stmt, 0); + } + if (step_rc == c.SQLITE_DONE) return 0; + return error.SqliteStepFailed; + } + + fn setUserVersion(self: *Self, version: i64) !void { + var buf: [64]u8 = undefined; + const sql = try std.fmt.bufPrintZ(&buf, "PRAGMA user_version = {d};", .{version}); var err_msg: [*c]u8 = null; - var prc = c.sqlite3_exec(self.db, sql_001.ptr, null, null, &err_msg); - if (prc != c.SQLITE_OK) { + const rc = c.sqlite3_exec(self.db, @ptrCast(sql.ptr), null, null, &err_msg); + if (rc != c.SQLITE_OK) { if (err_msg) |msg| { - log.err("migration 001 failed (rc={d}): {s}", .{ prc, std.mem.span(msg) }); + log.err("failed to set schema user_version={d} (rc={d}): {s}", .{ version, rc, std.mem.span(msg) }); c.sqlite3_free(msg); } - return error.MigrationFailed; + return error.SqliteExecFailed; } + } - // Migration 002 — new tables (idempotent via IF NOT EXISTS) - const sql_002 = @embedFile("migrations/002_advanced_steps.sql"); - prc = c.sqlite3_exec(self.db, sql_002.ptr, null, null, &err_msg); + fn execMigrationSql(self: *Self, num: i64, sql: [*:0]const u8) !void { + var err_msg: [*c]u8 = null; + const prc = c.sqlite3_exec(self.db, sql, null, null, &err_msg); if (prc != c.SQLITE_OK) { if (err_msg) |msg| { - log.err("migration 002 failed (rc={d}): {s}", .{ prc, std.mem.span(msg) }); + log.err("migration {d:0>3} failed (rc={d}): {s}", .{ num, prc, std.mem.span(msg) }); c.sqlite3_free(msg); } return error.MigrationFailed; } + } + + fn runMigration(self: *Self, num: i64, sql: [*:0]const u8) !void { + try self.execMigrationSql(num, sql); + try self.setUserVersion(num); + log.info("migration {d:0>3} applied", .{num}); + } + + fn hasColumn(self: *Self, table_name: []const u8, column_name: []const u8) !bool { + var buf: [64]u8 = undefined; + const sql = try std.fmt.bufPrintZ(&buf, "PRAGMA table_info({s});", .{table_name}); + var stmt: ?*c.sqlite3_stmt = null; + const rc = c.sqlite3_prepare_v2(self.db, @ptrCast(sql.ptr), -1, &stmt, null); + if (rc != c.SQLITE_OK) return error.SqlitePrepareFailed; + defer _ = c.sqlite3_finalize(stmt); + + while (true) { + const step_rc = c.sqlite3_step(stmt); + if (step_rc == c.SQLITE_DONE) break; + if (step_rc != c.SQLITE_ROW) return error.SqliteStepFailed; + + const name_ptr = c.sqlite3_column_text(stmt, 1); + if (name_ptr == null) continue; + + const name_len: usize = @intCast(c.sqlite3_column_bytes(stmt, 1)); + const name = name_ptr[0..name_len]; + if (std.mem.eql(u8, name, column_name)) return true; + } - // Migration 003 — tracker integration state - const sql_003 = @embedFile("migrations/003_tracker.sql"); - prc = c.sqlite3_exec(self.db, sql_003.ptr, null, null, &err_msg); - if (prc != c.SQLITE_OK) { + return false; + } + + fn ensureColumn(self: *Self, table_name: []const u8, column_name: []const u8, column_def: []const u8) !void { + if (try self.hasColumn(table_name, column_name)) return; + + var buf: [256]u8 = undefined; + const sql = try std.fmt.bufPrintZ(&buf, "ALTER TABLE {s} ADD COLUMN {s};", .{ table_name, column_def }); + var err_msg: [*c]u8 = null; + const rc = c.sqlite3_exec(self.db, @ptrCast(sql.ptr), null, null, &err_msg); + if (rc != c.SQLITE_OK) { if (err_msg) |msg| { - log.err("migration 003 failed (rc={d}): {s}", .{ prc, std.mem.span(msg) }); + log.err("migration 004 failed to add {s}.{s} (rc={d}): {s}", .{ table_name, column_name, rc, std.mem.span(msg) }); c.sqlite3_free(msg); } return error.MigrationFailed; } + } - // Migration 004 — orchestration schema (workflows, checkpoints, agent_events) + fn migrateOrchestration(self: *Self) !void { const sql_004 = @embedFile("migrations/004_orchestration.sql"); - prc = c.sqlite3_exec(self.db, sql_004.ptr, null, null, &err_msg); - if (prc != c.SQLITE_OK) { - if (err_msg) |msg| { - log.err("migration 004 failed (rc={d}): {s}", .{ prc, std.mem.span(msg) }); - c.sqlite3_free(msg); - } - return error.MigrationFailed; + try self.execMigrationSql(4, sql_004.ptr); + for (migration_004_columns) |col| { + try self.ensureColumn(col.table_name, col.column_name, col.column_def); + } + try self.setUserVersion(4); + log.info("migration 004 applied", .{}); + } + + fn migrate(self: *Self) !void { + const current = try self.getUserVersion(); + log.info("schema user_version={d}", .{current}); + + // Migration 001 + if (current < 1) { + const sql_001 = @embedFile("migrations/001_init.sql"); + try self.runMigration(1, sql_001.ptr); + } + + // Migration 002 — new tables (idempotent via IF NOT EXISTS) + if (current < 2) { + const sql_002 = @embedFile("migrations/002_advanced_steps.sql"); + try self.runMigration(2, sql_002.ptr); + } + + // Migration 003 — tracker integration state + if (current < 3) { + const sql_003 = @embedFile("migrations/003_tracker.sql"); + try self.runMigration(3, sql_003.ptr); + } + + // Migration 004 — orchestration schema (workflows, checkpoints, agent_events) + if (current < 4) { + try self.migrateOrchestration(); } } @@ -1797,6 +1896,29 @@ test "Store: init and deinit" { defer s.deinit(); } +test "Store: reopens legacy schema with user_version reset to zero" { + const allocator = std.testing.allocator; + var tmp = std.testing.tmpDir(.{}); + defer tmp.cleanup(); + + const root = try tmp.dir.realpathAlloc(allocator, "."); + defer allocator.free(root); + const db_path = try std.fs.path.join(allocator, &.{ root, "legacy.db" }); + defer allocator.free(db_path); + const db_path_z = try allocator.dupeZ(u8, db_path); + defer allocator.free(db_path_z); + + { + var initial = try Store.init(allocator, db_path_z); + try initial.setUserVersion(0); + initial.deinit(); + } + + var reopened = try Store.init(allocator, db_path_z); + defer reopened.deinit(); + try std.testing.expectEqual(@as(i64, 4), try reopened.getUserVersion()); +} + test "Store: insert and get worker" { const allocator = std.testing.allocator; var s = try Store.init(allocator, ":memory:");