From aaea2751ca0d2ca576da20e96a5f11df9a34b24e Mon Sep 17 00:00:00 2001 From: mdp28 <212812676+mdp28@users.noreply.github.com> Date: Fri, 15 May 2026 16:50:34 +1000 Subject: [PATCH 1/2] Add external data replication plugin --- dist/plugins.ts | 1 + plugins/replication/README.md | 28 + plugins/replication/index.test.ts | 241 +++++++++ plugins/replication/index.ts | 872 ++++++++++++++++++++++++++++++ plugins/replication/meta.json | 21 + src/plugin.test.ts | 2 +- src/plugin.ts | 1 - 7 files changed, 1164 insertions(+), 2 deletions(-) create mode 100644 plugins/replication/README.md create mode 100644 plugins/replication/index.test.ts create mode 100644 plugins/replication/index.ts create mode 100644 plugins/replication/meta.json diff --git a/dist/plugins.ts b/dist/plugins.ts index 7dd252a..0a5cac5 100644 --- a/dist/plugins.ts +++ b/dist/plugins.ts @@ -6,3 +6,4 @@ export { ChangeDataCapturePlugin } from '../plugins/cdc' export { QueryLogPlugin } from '../plugins/query-log' export { ResendPlugin } from '../plugins/resend' export { ClerkPlugin } from '../plugins/clerk' +export { ReplicationPlugin } from '../plugins/replication' diff --git a/plugins/replication/README.md b/plugins/replication/README.md new file mode 100644 index 0000000..6c1bc80 --- /dev/null +++ b/plugins/replication/README.md @@ -0,0 +1,28 @@ +# Replication Plugin + +Pull rows from a configured external data source into the internal Durable Object SQLite database so StarbaseDB can serve as a close-to-edge read replica. + +```ts +import { ReplicationPlugin } from '@outerbase/starbasedb/plugins' + +new ReplicationPlugin({ + defaultIntervalSeconds: 60, + tables: [ + { + sourceTable: 'public.users', + targetTable: 'users', + columns: ['id', 'email', 'created_at', 'updated_at'], + cursorColumn: 'updated_at', + cursorValueType: 'date', + primaryKey: 'id', + batchSize: 500, + mode: 'upsert', + }, + ], +}) +``` + +The plugin expects `dataSource.source` to be `internal` and `dataSource.external` to contain the remote database connection. It stores cursor state in `tmp_starbasedb_replication_state`, records each run in `tmp_starbasedb_replication_runs`, and can be triggered manually with `POST /replication/pull` or `POST /replication/pull/:tableName`. + +Set `autoPull: false` to disable interval checks before queries and rely only on the authenticated pull routes. Static `where` clauses are supported for trusted application configuration; table and column identifiers are validated before SQL is generated. + diff --git a/plugins/replication/index.test.ts b/plugins/replication/index.test.ts new file mode 100644 index 0000000..7ece103 --- /dev/null +++ b/plugins/replication/index.test.ts @@ -0,0 +1,241 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { ReplicationPlugin } from './index' +import { StarbaseDBConfiguration } from '../../src/handler' +import { DataSource } from '../../src/types' + +let mockConfig: StarbaseDBConfiguration +let mockDataSource: DataSource +let executeQuery: ReturnType +let externalExecutor: ReturnType +let stateRows: any[] + +beforeEach(() => { + stateRows = [] + executeQuery = vi.fn(async ({ sql }) => { + if ( + sql.includes('FROM tmp_starbasedb_replication_state') && + sql.includes('WHERE table_name = ?') + ) { + return stateRows + } + + return [] + }) + externalExecutor = vi.fn() + mockConfig = { role: 'admin' } + mockDataSource = { + source: 'internal', + external: { + dialect: 'postgresql', + host: 'db.example.com', + port: 5432, + user: 'postgres', + password: 'password', + database: 'app', + }, + rpc: { + executeQuery, + }, + } as unknown as DataSource +}) + +describe('ReplicationPlugin', () => { + it('pulls external rows into the internal table and advances cursor state', async () => { + externalExecutor.mockResolvedValue([ + { id: 1, email: 'a@example.com', updated_at: '2026-05-14T00:00:00Z' }, + { id: 2, email: 'b@example.com', updated_at: '2026-05-15T00:00:00Z' }, + ]) + const plugin = new ReplicationPlugin({ + defaultIntervalSeconds: 0, + externalExecutor, + tables: [ + { + sourceTable: 'public.users', + targetTable: 'users', + columns: ['id', 'email', 'updated_at'], + cursorColumn: 'updated_at', + cursorValueType: 'date', + primaryKey: 'id', + batchSize: 2, + }, + ], + }) + + const result = await plugin.sync({ + dataSource: mockDataSource, + config: mockConfig, + force: true, + }) + + expect(externalExecutor).toHaveBeenCalledWith({ + sql: 'SELECT "id", "email", "updated_at" FROM "public"."users" ORDER BY "updated_at" ASC LIMIT 2', + params: [], + dataSource: mockDataSource, + config: mockConfig, + }) + expect(executeQuery).toHaveBeenCalledWith({ + sql: expect.stringContaining('ON CONFLICT ("id") DO UPDATE SET'), + params: [1, 'a@example.com', '2026-05-14T00:00:00Z'], + }) + expect(executeQuery).toHaveBeenCalledWith({ + sql: expect.stringContaining( + 'INSERT INTO tmp_starbasedb_replication_state' + ), + params: [ + 'users', + '2026-05-15T00:00:00Z', + 'date', + expect.any(String), + 2, + ], + }) + expect(result).toEqual([ + expect.objectContaining({ + table: 'users', + sourceTable: 'public.users', + targetTable: 'users', + skipped: false, + rowsRead: 2, + rowsWritten: 2, + cursorEnd: '2026-05-15T00:00:00Z', + }), + ]) + }) + + it('uses the stored cursor value for append-only polling', async () => { + stateRows = [ + { + table_name: 'events', + last_cursor_value: '5', + last_cursor_type: 'number', + last_synced_at: '2026-05-15T00:00:00Z', + total_rows_synced: 5, + }, + ] + externalExecutor.mockResolvedValue([{ id: 6, name: 'created' }]) + const plugin = new ReplicationPlugin({ + externalExecutor, + tables: [ + { + sourceTable: 'events', + cursorColumn: 'id', + cursorValueType: 'number', + batchSize: 10, + }, + ], + }) + + await plugin.sync({ + dataSource: mockDataSource, + config: mockConfig, + force: true, + }) + + expect(externalExecutor).toHaveBeenCalledWith( + expect.objectContaining({ + sql: 'SELECT * FROM "events" WHERE "id" > ? ORDER BY "id" ASC LIMIT 10', + params: [5], + }) + ) + }) + + it('skips auto sync when a table interval is not due', async () => { + stateRows = [ + { + table_name: 'users', + last_synced_at: new Date().toISOString(), + total_rows_synced: 10, + }, + ] + const plugin = new ReplicationPlugin({ + externalExecutor, + tables: [ + { + sourceTable: 'users', + intervalSeconds: 60, + }, + ], + }) + + const result = await plugin.sync({ + dataSource: mockDataSource, + config: mockConfig, + force: false, + }) + + expect(externalExecutor).not.toHaveBeenCalled() + expect(result).toEqual([ + expect.objectContaining({ + table: 'users', + skipped: true, + rowsRead: 0, + rowsWritten: 0, + }), + ]) + }) + + it('runs due replication in beforeQuery without modifying the user query', async () => { + externalExecutor.mockResolvedValue([]) + const plugin = new ReplicationPlugin({ + externalExecutor, + tables: [{ sourceTable: 'users', intervalSeconds: 0 }], + }) + + const result = await plugin.beforeQuery({ + sql: 'SELECT * FROM users', + params: [1], + dataSource: mockDataSource, + config: mockConfig, + }) + + expect(externalExecutor).toHaveBeenCalledTimes(1) + expect(result).toEqual({ + sql: 'SELECT * FROM users', + params: [1], + }) + }) + + it('requires an internal target data source with an external source configured', async () => { + const plugin = new ReplicationPlugin({ + externalExecutor, + tables: [{ sourceTable: 'users' }], + }) + + await expect( + plugin.sync({ + dataSource: { ...mockDataSource, source: 'external' } as any, + config: mockConfig, + force: true, + }) + ).rejects.toThrow( + 'ReplicationPlugin can only write to an internal StarbaseDB data source.' + ) + + await expect( + plugin.sync({ + dataSource: { ...mockDataSource, external: undefined } as any, + config: mockConfig, + force: true, + }) + ).rejects.toThrow( + 'ReplicationPlugin requires dataSource.external to pull from an external source.' + ) + }) + + it('rejects unsafe table and column identifiers', () => { + expect( + () => + new ReplicationPlugin({ + tables: [{ sourceTable: 'users; DROP TABLE users' }], + }) + ).toThrow('Invalid SQL identifier') + + expect( + () => + new ReplicationPlugin({ + tables: [{ sourceTable: 'users', columns: ['id, name'] }], + }) + ).toThrow('Invalid SQL identifier') + }) +}) + diff --git a/plugins/replication/index.ts b/plugins/replication/index.ts new file mode 100644 index 0000000..a7dcb58 --- /dev/null +++ b/plugins/replication/index.ts @@ -0,0 +1,872 @@ +import { StarbaseApp, StarbaseDBConfiguration } from '../../src/handler' +import { executeExternalQuery } from '../../src/operation' +import { StarbasePlugin } from '../../src/plugin' +import { DataSource, QueryResult } from '../../src/types' +import { createResponse } from '../../src/utils' + +const SQL_QUERIES = { + CREATE_STATE_TABLE: ` + CREATE TABLE IF NOT EXISTS tmp_starbasedb_replication_state ( + table_name TEXT PRIMARY KEY, + last_cursor_value TEXT, + last_cursor_type TEXT, + last_synced_at TEXT, + total_rows_synced INTEGER NOT NULL DEFAULT 0, + last_error TEXT + ) + `, + CREATE_RUNS_TABLE: ` + CREATE TABLE IF NOT EXISTS tmp_starbasedb_replication_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + table_name TEXT NOT NULL, + started_at TEXT NOT NULL, + finished_at TEXT NOT NULL, + rows_read INTEGER NOT NULL, + rows_written INTEGER NOT NULL, + cursor_start TEXT, + cursor_end TEXT, + error TEXT + ) + `, + SELECT_STATE: ` + SELECT table_name, last_cursor_value, last_cursor_type, last_synced_at, total_rows_synced, last_error + FROM tmp_starbasedb_replication_state + WHERE table_name = ? + `, + SELECT_ALL_STATE: ` + SELECT table_name, last_cursor_value, last_cursor_type, last_synced_at, total_rows_synced, last_error + FROM tmp_starbasedb_replication_state + ORDER BY table_name + `, + UPSERT_STATE: ` + INSERT INTO tmp_starbasedb_replication_state ( + table_name, last_cursor_value, last_cursor_type, last_synced_at, total_rows_synced, last_error + ) VALUES (?, ?, ?, ?, ?, NULL) + ON CONFLICT(table_name) DO UPDATE SET + last_cursor_value = excluded.last_cursor_value, + last_cursor_type = excluded.last_cursor_type, + last_synced_at = excluded.last_synced_at, + total_rows_synced = excluded.total_rows_synced, + last_error = NULL + `, + UPDATE_STATE_ERROR: ` + INSERT INTO tmp_starbasedb_replication_state ( + table_name, last_synced_at, total_rows_synced, last_error + ) VALUES (?, ?, 0, ?) + ON CONFLICT(table_name) DO UPDATE SET + last_synced_at = excluded.last_synced_at, + last_error = excluded.last_error + `, + INSERT_RUN: ` + INSERT INTO tmp_starbasedb_replication_runs ( + table_name, started_at, finished_at, rows_read, rows_written, cursor_start, cursor_end, error + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) + `, +} + +type ExternalExecutor = typeof executeExternalQuery + +export type ReplicationMode = + | 'upsert' + | 'replace' + | 'insert' + | 'insert-or-ignore' + +export type CursorValueType = 'number' | 'string' | 'date' + +export interface ReplicationTableOptions { + sourceTable: string + targetTable?: string + columns?: string[] + cursorColumn?: string + cursorValueType?: CursorValueType + primaryKey?: string | string[] + where?: string + batchSize?: number + intervalSeconds?: number + mode?: ReplicationMode + transform?: ( + row: Record + ) => Record | Promise> +} + +export interface ReplicationPluginOptions { + tables: ReplicationTableOptions[] + autoPull?: boolean + defaultBatchSize?: number + defaultIntervalSeconds?: number + failQueriesOnError?: boolean + continueOnTableError?: boolean + pathPrefix?: string + externalExecutor?: ExternalExecutor +} + +export interface ReplicationResult { + table: string + sourceTable: string + targetTable: string + skipped: boolean + rowsRead: number + rowsWritten: number + cursorStart?: string | null + cursorEnd?: string | null + error?: string +} + +type ReplicationStateRow = QueryResult & { + table_name: string + last_cursor_value?: string | null + last_cursor_type?: CursorValueType | null + last_synced_at?: string | null + total_rows_synced?: number | string | null + last_error?: string | null +} + +type NormalizedTableOptions = Required< + Pick +> & + Omit & { + batchSize: number + intervalSeconds: number + primaryKey: string[] + mode: ReplicationMode + } + +const IDENTIFIER_REGEX = /^[A-Za-z_][A-Za-z0-9_]*$/ + +export class ReplicationPlugin extends StarbasePlugin { + public pathPrefix: string + private readonly tables: NormalizedTableOptions[] + private readonly autoPull: boolean + private readonly failQueriesOnError: boolean + private readonly continueOnTableError: boolean + private readonly externalExecutor: ExternalExecutor + private dataSource?: DataSource + private config?: StarbaseDBConfiguration + private initialized = false + private initPromise?: Promise + private inFlightSyncs = new Map>() + + constructor(opts: ReplicationPluginOptions) { + super('starbasedb:replication', { requiresAuth: true }) + + if (!opts.tables.length) { + throw new Error('ReplicationPlugin requires at least one table.') + } + + this.pathPrefix = opts.pathPrefix ?? '/replication' + this.autoPull = opts.autoPull ?? true + this.failQueriesOnError = opts.failQueriesOnError ?? false + this.continueOnTableError = opts.continueOnTableError ?? true + this.externalExecutor = opts.externalExecutor ?? executeExternalQuery + this.tables = opts.tables.map((table) => + this.normalizeTableOptions(table, opts) + ) + } + + override async register(app: StarbaseApp) { + app.use(async (c, next) => { + this.dataSource = c.get('dataSource') + this.config = c.get('config') + await next() + }) + + app.get(`${this.pathPrefix}/status`, async () => { + if (!this.dataSource) { + return createResponse( + undefined, + 'ReplicationPlugin has not been initialized.', + 500 + ) + } + + await this.ensureInitialized(this.dataSource) + const state = await this.readAllState(this.dataSource) + + return createResponse( + { + configuredTables: this.tables.map((table) => ({ + sourceTable: table.sourceTable, + targetTable: table.targetTable, + cursorColumn: table.cursorColumn, + batchSize: table.batchSize, + intervalSeconds: table.intervalSeconds, + mode: table.mode, + })), + state, + }, + undefined, + 200 + ) + }) + + app.post(`${this.pathPrefix}/pull`, async (c) => { + if (!this.dataSource || !this.config) { + return createResponse( + undefined, + 'ReplicationPlugin has not been initialized.', + 500 + ) + } + + const body = await this.safeJson<{ tables?: string[] }>(c.req.raw) + const result = await this.sync({ + dataSource: this.dataSource, + config: this.config, + force: true, + tableNames: body?.tables, + }) + + return createResponse(result, undefined, this.hasError(result) ? 500 : 200) + }) + + app.post(`${this.pathPrefix}/pull/:tableName`, async (c) => { + if (!this.dataSource || !this.config) { + return createResponse( + undefined, + 'ReplicationPlugin has not been initialized.', + 500 + ) + } + + const tableName = c.req.param('tableName') + const result = await this.sync({ + dataSource: this.dataSource, + config: this.config, + force: true, + tableNames: [tableName], + }) + + return createResponse(result, undefined, this.hasError(result) ? 500 : 200) + }) + } + + override async beforeQuery(opts: { + sql: string + params?: unknown[] + dataSource?: DataSource + config?: StarbaseDBConfiguration + }): Promise<{ sql: string; params?: unknown[] }> { + if (!this.autoPull || !opts.dataSource || !opts.config) { + return { sql: opts.sql, params: opts.params } + } + + try { + await this.sync({ + dataSource: opts.dataSource, + config: opts.config, + force: false, + }) + } catch (error) { + console.error('ReplicationPlugin beforeQuery failed:', error) + + if (this.failQueriesOnError) { + throw error + } + } + + return { sql: opts.sql, params: opts.params } + } + + public async sync(opts: { + dataSource: DataSource + config: StarbaseDBConfiguration + force?: boolean + tableNames?: string[] + }): Promise { + const { dataSource, config, force = false, tableNames } = opts + + await this.ensureInitialized(dataSource) + + const selectedTables = this.selectTables(tableNames) + const results: ReplicationResult[] = [] + + for (const table of selectedTables) { + try { + if (!force && !(await this.isDue(table, dataSource))) { + results.push(this.skippedResult(table)) + continue + } + + results.push(await this.syncTable(table, dataSource, config)) + } catch (error) { + const message = + error instanceof Error ? error.message : String(error) + await this.recordError(table, dataSource, message) + results.push({ + ...this.emptyResult(table), + skipped: false, + error: message, + }) + + if (!this.continueOnTableError) { + throw error + } + } + } + + return results + } + + private async syncTable( + table: NormalizedTableOptions, + dataSource: DataSource, + config: StarbaseDBConfiguration + ): Promise { + const key = this.tableKey(table) + const existing = this.inFlightSyncs.get(key) + + if (existing) { + return existing + } + + const promise = this.runTableSync(table, dataSource, config).finally( + () => { + this.inFlightSyncs.delete(key) + } + ) + this.inFlightSyncs.set(key, promise) + return promise + } + + private async runTableSync( + table: NormalizedTableOptions, + dataSource: DataSource, + config: StarbaseDBConfiguration + ): Promise { + const startedAt = new Date().toISOString() + const state = await this.readState(table, dataSource) + const query = this.buildSelectQuery(table, state, dataSource) + const externalRows = await this.externalExecutor({ + sql: query.sql, + params: query.params, + dataSource, + config, + }) + const rows = Array.isArray(externalRows) + ? (externalRows as Record[]) + : [] + const transformedRows = await this.transformRows(table, rows) + const cursorEnd = this.findCursorEnd(table, rows, state) + + for (const row of transformedRows) { + const insert = this.buildInsertQuery(table, row) + await dataSource.rpc.executeQuery({ + sql: insert.sql, + params: insert.params, + }) + } + + await this.upsertState({ + table, + dataSource, + state, + cursorEnd, + rowsWritten: transformedRows.length, + }) + + const result: ReplicationResult = { + table: this.tableKey(table), + sourceTable: table.sourceTable, + targetTable: table.targetTable, + skipped: false, + rowsRead: rows.length, + rowsWritten: transformedRows.length, + cursorStart: state?.last_cursor_value ?? null, + cursorEnd: cursorEnd == null ? null : String(cursorEnd), + } + + await this.recordRun(table, dataSource, { + ...result, + startedAt, + finishedAt: new Date().toISOString(), + }) + + return result + } + + private async ensureInitialized(dataSource: DataSource): Promise { + this.assertInternalReplicaDataSource(dataSource) + + if (this.initialized) { + return + } + + this.initPromise ??= (async () => { + await dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.CREATE_STATE_TABLE, + params: [], + }) + await dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.CREATE_RUNS_TABLE, + params: [], + }) + this.initialized = true + })() + + await this.initPromise + } + + private assertInternalReplicaDataSource(dataSource: DataSource) { + if (dataSource.source !== 'internal') { + throw new Error( + 'ReplicationPlugin can only write to an internal StarbaseDB data source.' + ) + } + + if (!dataSource.external) { + throw new Error( + 'ReplicationPlugin requires dataSource.external to pull from an external source.' + ) + } + } + + private buildSelectQuery( + table: NormalizedTableOptions, + state: ReplicationStateRow | undefined, + dataSource: DataSource + ): { sql: string; params: unknown[] } { + const quoteStyle = + dataSource.external && 'dialect' in dataSource.external + ? dataSource.external.dialect + : 'sqlite' + const columns = table.columns?.length + ? table.columns.map((column) => this.quoteIdentifier(column, quoteStyle)).join(', ') + : '*' + const conditions: string[] = [] + const params: unknown[] = [] + + if (table.where) { + conditions.push(`(${table.where})`) + } + + if (table.cursorColumn && state?.last_cursor_value != null) { + conditions.push( + `${this.quoteIdentifier(table.cursorColumn, quoteStyle)} > ?` + ) + params.push(this.deserializeCursorValue(state)) + } + + const where = conditions.length + ? ` WHERE ${conditions.join(' AND ')}` + : '' + const orderBy = table.cursorColumn + ? ` ORDER BY ${this.quoteIdentifier(table.cursorColumn, quoteStyle)} ASC` + : '' + + return { + sql: `SELECT ${columns} FROM ${this.quoteQualifiedIdentifier( + table.sourceTable, + quoteStyle + )}${where}${orderBy} LIMIT ${table.batchSize}`, + params, + } + } + + private buildInsertQuery( + table: NormalizedTableOptions, + row: Record + ): { sql: string; params: unknown[] } { + const columns = Object.keys(row) + + if (!columns.length) { + throw new Error( + `Replication row for ${table.targetTable} does not contain any columns.` + ) + } + + const quotedColumns = columns.map((column) => + this.quoteIdentifier(column, 'sqlite') + ) + const placeholders = columns.map(() => '?').join(', ') + const params = columns.map((column) => row[column]) + const targetTable = this.quoteQualifiedIdentifier( + table.targetTable, + 'sqlite' + ) + + if (table.mode === 'insert') { + return { + sql: `INSERT INTO ${targetTable} (${quotedColumns.join( + ', ' + )}) VALUES (${placeholders})`, + params, + } + } + + if (table.mode === 'insert-or-ignore') { + return { + sql: `INSERT OR IGNORE INTO ${targetTable} (${quotedColumns.join( + ', ' + )}) VALUES (${placeholders})`, + params, + } + } + + if (table.mode === 'replace' || table.primaryKey.length === 0) { + return { + sql: `INSERT OR REPLACE INTO ${targetTable} (${quotedColumns.join( + ', ' + )}) VALUES (${placeholders})`, + params, + } + } + + const conflictColumns = table.primaryKey.map((column) => + this.quoteIdentifier(column, 'sqlite') + ) + const updateColumns = columns.filter( + (column) => !table.primaryKey.includes(column) + ) + const conflictAction = updateColumns.length + ? `DO UPDATE SET ${updateColumns + .map((column) => { + const quoted = this.quoteIdentifier(column, 'sqlite') + return `${quoted} = excluded.${quoted}` + }) + .join(', ')}` + : 'DO NOTHING' + + return { + sql: `INSERT INTO ${targetTable} (${quotedColumns.join( + ', ' + )}) VALUES (${placeholders}) ON CONFLICT (${conflictColumns.join( + ', ' + )}) ${conflictAction}`, + params, + } + } + + private async transformRows( + table: NormalizedTableOptions, + rows: Record[] + ): Promise[]> { + if (!table.transform) { + return rows + } + + const transformedRows: Record[] = [] + + for (const row of rows) { + transformedRows.push(await table.transform(row)) + } + + return transformedRows + } + + private findCursorEnd( + table: NormalizedTableOptions, + rows: Record[], + state?: ReplicationStateRow + ): unknown { + if (!table.cursorColumn || rows.length === 0) { + return state?.last_cursor_value ?? null + } + + const cursorValues = rows + .map((row) => row[table.cursorColumn as string]) + .filter((value) => value !== undefined && value !== null) + + if (!cursorValues.length) { + return state?.last_cursor_value ?? null + } + + return cursorValues.reduce((max, value) => + this.compareCursorValues(value, max, table.cursorValueType) > 0 + ? value + : max + ) + } + + private compareCursorValues( + left: unknown, + right: unknown, + type: CursorValueType = 'string' + ): number { + if (type === 'number') { + return Number(left) - Number(right) + } + + const leftValue = + type === 'date' ? new Date(String(left)).getTime() : String(left) + const rightValue = + type === 'date' ? new Date(String(right)).getTime() : String(right) + + if (leftValue > rightValue) return 1 + if (leftValue < rightValue) return -1 + return 0 + } + + private deserializeCursorValue(state: ReplicationStateRow): unknown { + if (state.last_cursor_type === 'number') { + return Number(state.last_cursor_value) + } + + return state.last_cursor_value + } + + private cursorTypeForValue( + table: NormalizedTableOptions, + value: unknown + ): CursorValueType | null { + if (value == null) { + return table.cursorValueType ?? null + } + + if (table.cursorValueType) { + return table.cursorValueType + } + + return typeof value === 'number' ? 'number' : 'string' + } + + private async readState( + table: NormalizedTableOptions, + dataSource: DataSource + ): Promise { + const rows = (await dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.SELECT_STATE, + params: [this.tableKey(table)], + })) as ReplicationStateRow[] + + return rows[0] + } + + private async readAllState(dataSource: DataSource) { + return (await dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.SELECT_ALL_STATE, + params: [], + })) as ReplicationStateRow[] + } + + private async isDue( + table: NormalizedTableOptions, + dataSource: DataSource + ): Promise { + const state = await this.readState(table, dataSource) + + if (!state?.last_synced_at) { + return true + } + + if (table.intervalSeconds <= 0) { + return true + } + + const lastSyncedAt = new Date(state.last_synced_at).getTime() + return Date.now() - lastSyncedAt >= table.intervalSeconds * 1000 + } + + private async upsertState(opts: { + table: NormalizedTableOptions + dataSource: DataSource + state?: ReplicationStateRow + cursorEnd: unknown + rowsWritten: number + }) { + const { table, dataSource, state, cursorEnd, rowsWritten } = opts + const totalRowsSynced = + Number(state?.total_rows_synced ?? 0) + rowsWritten + const cursorType = this.cursorTypeForValue(table, cursorEnd) + + await dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.UPSERT_STATE, + params: [ + this.tableKey(table), + cursorEnd == null ? null : String(cursorEnd), + cursorType, + new Date().toISOString(), + totalRowsSynced, + ], + }) + } + + private async recordRun( + table: NormalizedTableOptions, + dataSource: DataSource, + result: ReplicationResult & { startedAt: string; finishedAt: string } + ) { + await dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.INSERT_RUN, + params: [ + this.tableKey(table), + result.startedAt, + result.finishedAt, + result.rowsRead, + result.rowsWritten, + result.cursorStart ?? null, + result.cursorEnd ?? null, + result.error ?? null, + ], + }) + } + + private async recordError( + table: NormalizedTableOptions, + dataSource: DataSource, + error: string + ) { + const now = new Date().toISOString() + await dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.UPDATE_STATE_ERROR, + params: [this.tableKey(table), now, error], + }) + await dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.INSERT_RUN, + params: [this.tableKey(table), now, now, 0, 0, null, null, error], + }) + } + + private normalizeTableOptions( + table: ReplicationTableOptions, + pluginOptions: ReplicationPluginOptions + ): NormalizedTableOptions { + this.validateQualifiedIdentifier(table.sourceTable) + const targetTable = + table.targetTable ?? table.sourceTable.split('.').at(-1) ?? '' + this.validateQualifiedIdentifier(targetTable) + + table.columns?.forEach((column) => this.validateIdentifier(column)) + if (table.cursorColumn) { + this.validateIdentifier(table.cursorColumn) + } + + const primaryKey = Array.isArray(table.primaryKey) + ? table.primaryKey + : table.primaryKey + ? [table.primaryKey] + : table.cursorColumn + ? [table.cursorColumn] + : [] + primaryKey.forEach((column) => this.validateIdentifier(column)) + + return { + ...table, + sourceTable: table.sourceTable, + targetTable, + primaryKey, + batchSize: this.normalizePositiveInteger( + table.batchSize ?? pluginOptions.defaultBatchSize ?? 500, + 'batchSize' + ), + intervalSeconds: this.normalizeNonNegativeInteger( + table.intervalSeconds ?? + pluginOptions.defaultIntervalSeconds ?? + 60, + 'intervalSeconds' + ), + mode: table.mode ?? 'upsert', + } + } + + private selectTables(tableNames?: string[]): NormalizedTableOptions[] { + if (!tableNames?.length) { + return this.tables + } + + const names = new Set(tableNames) + const selected = this.tables.filter( + (table) => + names.has(table.sourceTable) || + names.has(table.targetTable) || + names.has(this.tableKey(table)) + ) + + if (selected.length !== tableNames.length) { + const found = new Set( + selected.flatMap((table) => [ + table.sourceTable, + table.targetTable, + this.tableKey(table), + ]) + ) + const missing = tableNames.filter((name) => !found.has(name)) + throw new Error( + `Unknown replication table(s): ${missing.join(', ')}` + ) + } + + return selected + } + + private skippedResult(table: NormalizedTableOptions): ReplicationResult { + return { + ...this.emptyResult(table), + skipped: true, + } + } + + private emptyResult(table: NormalizedTableOptions): ReplicationResult { + return { + table: this.tableKey(table), + sourceTable: table.sourceTable, + targetTable: table.targetTable, + skipped: false, + rowsRead: 0, + rowsWritten: 0, + } + } + + private tableKey(table: NormalizedTableOptions): string { + return table.targetTable + } + + private hasError(result: ReplicationResult[]): boolean { + return result.some((item) => item.error) + } + + private async safeJson(request: Request): Promise { + try { + return (await request.json()) as T + } catch { + return undefined + } + } + + private quoteIdentifier( + identifier: string, + dialect: 'mysql' | 'postgresql' | 'sqlite' = 'sqlite' + ): string { + this.validateIdentifier(identifier) + const quote = dialect === 'mysql' ? '`' : '"' + return `${quote}${identifier}${quote}` + } + + private quoteQualifiedIdentifier( + identifier: string, + dialect: 'mysql' | 'postgresql' | 'sqlite' = 'sqlite' + ): string { + this.validateQualifiedIdentifier(identifier) + return identifier + .split('.') + .map((part) => this.quoteIdentifier(part, dialect)) + .join('.') + } + + private validateIdentifier(identifier: string) { + if (!IDENTIFIER_REGEX.test(identifier)) { + throw new Error(`Invalid SQL identifier: ${identifier}`) + } + } + + private validateQualifiedIdentifier(identifier: string) { + identifier.split('.').forEach((part) => this.validateIdentifier(part)) + } + + private normalizePositiveInteger(value: number, name: string): number { + if (!Number.isInteger(value) || value <= 0) { + throw new Error(`${name} must be a positive integer.`) + } + + return value + } + + private normalizeNonNegativeInteger(value: number, name: string): number { + if (!Number.isInteger(value) || value < 0) { + throw new Error(`${name} must be a non-negative integer.`) + } + + return value + } +} + diff --git a/plugins/replication/meta.json b/plugins/replication/meta.json new file mode 100644 index 0000000..c8952df --- /dev/null +++ b/plugins/replication/meta.json @@ -0,0 +1,21 @@ +{ + "version": "1.0.0", + "resources": { + "tables": { + "tmp_starbasedb_replication_state": { + "description": "Stores per-table external replication cursor state." + }, + "tmp_starbasedb_replication_runs": { + "description": "Stores per-run replication status and error history." + } + }, + "secrets": {}, + "variables": {} + }, + "dependencies": { + "tables": {}, + "secrets": {}, + "variables": {} + } +} + diff --git a/src/plugin.test.ts b/src/plugin.test.ts index 006e6df..a6f1932 100644 --- a/src/plugin.test.ts +++ b/src/plugin.test.ts @@ -139,7 +139,7 @@ describe('StarbasePluginRegistry', () => { sql: 'SELECT * FROM users', }) - expect(mockPlugin.beforeQuery).toHaveBeenCalled() + expect(mockPlugin.beforeQuery).toHaveBeenCalledTimes(1) expect(result.sql).toBe('SELECT * FROM users /* modified */') }) diff --git a/src/plugin.ts b/src/plugin.ts index f1879c1..a8be3ad 100644 --- a/src/plugin.ts +++ b/src/plugin.ts @@ -89,7 +89,6 @@ export class StarbasePluginRegistry { dataSource: opts.dataSource, config: opts.config, }) - await plugin.beforeQuery(opts) sql = modified.sql params = modified.params } From 7b01361e4b8f2334e7efe2d3b95baa0efbc684e4 Mon Sep 17 00:00:00 2001 From: mdp28 <212812676+mdp28@users.noreply.github.com> Date: Sat, 16 May 2026 15:36:39 +1000 Subject: [PATCH 2/2] Harden replication plugin cursor sync --- plugins/replication/README.md | 3 +- plugins/replication/index.test.ts | 180 +++++++++++++- plugins/replication/index.ts | 387 ++++++++++++++++++++++++------ src/do.test.ts | 2 + src/do.ts | 40 +-- 5 files changed, 516 insertions(+), 96 deletions(-) diff --git a/plugins/replication/README.md b/plugins/replication/README.md index 6c1bc80..f190f2f 100644 --- a/plugins/replication/README.md +++ b/plugins/replication/README.md @@ -24,5 +24,6 @@ new ReplicationPlugin({ The plugin expects `dataSource.source` to be `internal` and `dataSource.external` to contain the remote database connection. It stores cursor state in `tmp_starbasedb_replication_state`, records each run in `tmp_starbasedb_replication_runs`, and can be triggered manually with `POST /replication/pull` or `POST /replication/pull/:tableName`. -Set `autoPull: false` to disable interval checks before queries and rely only on the authenticated pull routes. Static `where` clauses are supported for trusted application configuration; table and column identifiers are validated before SQL is generated. +Manual replication routes require an admin request. Set `autoPull: false` to disable interval checks before queries and rely only on the pull routes. Static `where` clauses are supported for trusted application configuration; table and column identifiers are validated before SQL is generated. +Incremental tables must configure a `cursorColumn` plus either `primaryKey` or `cursorTieBreakerColumn`. The plugin stores both cursor and tie-breaker values so repeated cursor values across a batch boundary are not skipped. Tables without a cursor are treated as full refreshes and are selected without a batch limit. diff --git a/plugins/replication/index.test.ts b/plugins/replication/index.test.ts index 7ece103..019c926 100644 --- a/plugins/replication/index.test.ts +++ b/plugins/replication/index.test.ts @@ -6,6 +6,7 @@ import { DataSource } from '../../src/types' let mockConfig: StarbaseDBConfiguration let mockDataSource: DataSource let executeQuery: ReturnType +let executeTransaction: ReturnType let externalExecutor: ReturnType let stateRows: any[] @@ -21,6 +22,17 @@ beforeEach(() => { return [] }) + executeTransaction = vi.fn( + async (queries: { sql: string; params?: unknown[] }[]) => { + const results = [] + + for (const query of queries) { + results.push(await executeQuery(query)) + } + + return results + } + ) externalExecutor = vi.fn() mockConfig = { role: 'admin' } mockDataSource = { @@ -35,6 +47,7 @@ beforeEach(() => { }, rpc: { executeQuery, + executeTransaction, }, } as unknown as DataSource }) @@ -42,8 +55,16 @@ beforeEach(() => { describe('ReplicationPlugin', () => { it('pulls external rows into the internal table and advances cursor state', async () => { externalExecutor.mockResolvedValue([ - { id: 1, email: 'a@example.com', updated_at: '2026-05-14T00:00:00Z' }, - { id: 2, email: 'b@example.com', updated_at: '2026-05-15T00:00:00Z' }, + { + id: 1, + email: 'a@example.com', + updated_at: '2026-05-14T00:00:00Z', + }, + { + id: 2, + email: 'b@example.com', + updated_at: '2026-05-15T00:00:00Z', + }, ]) const plugin = new ReplicationPlugin({ defaultIntervalSeconds: 0, @@ -68,7 +89,7 @@ describe('ReplicationPlugin', () => { }) expect(externalExecutor).toHaveBeenCalledWith({ - sql: 'SELECT "id", "email", "updated_at" FROM "public"."users" ORDER BY "updated_at" ASC LIMIT 2', + sql: 'SELECT "id", "email", "updated_at" FROM "public"."users" ORDER BY "updated_at" ASC, "id" ASC LIMIT 2', params: [], dataSource: mockDataSource, config: mockConfig, @@ -85,6 +106,8 @@ describe('ReplicationPlugin', () => { 'users', '2026-05-15T00:00:00Z', 'date', + '2', + 'number', expect.any(String), 2, ], @@ -120,6 +143,7 @@ describe('ReplicationPlugin', () => { sourceTable: 'events', cursorColumn: 'id', cursorValueType: 'number', + primaryKey: 'id', batchSize: 10, }, ], @@ -139,6 +163,76 @@ describe('ReplicationPlugin', () => { ) }) + it('uses a primary key tie breaker when cursor values repeat across batches', async () => { + stateRows = [ + { + table_name: 'events', + last_cursor_value: '2026-05-15T00:00:00Z', + last_cursor_type: 'date', + last_cursor_tie_breaker_value: '10', + last_cursor_tie_breaker_type: 'number', + total_rows_synced: 10, + }, + ] + externalExecutor.mockResolvedValue([ + { id: 11, updated_at: '2026-05-15T00:00:00Z' }, + { id: 12, updated_at: '2026-05-16T00:00:00Z' }, + ]) + const plugin = new ReplicationPlugin({ + externalExecutor, + tables: [ + { + sourceTable: 'events', + columns: ['id', 'updated_at'], + cursorColumn: 'updated_at', + cursorValueType: 'date', + primaryKey: 'id', + batchSize: 10, + }, + ], + }) + + const result = await plugin.sync({ + dataSource: mockDataSource, + config: mockConfig, + force: true, + }) + + expect(externalExecutor).toHaveBeenCalledWith( + expect.objectContaining({ + sql: 'SELECT "id", "updated_at" FROM "events" WHERE ("updated_at" > ? OR ("updated_at" = ? AND "id" > ?)) ORDER BY "updated_at" ASC, "id" ASC LIMIT 10', + params: ['2026-05-15T00:00:00Z', '2026-05-15T00:00:00Z', 10], + }) + ) + expect(result[0]).toEqual( + expect.objectContaining({ + cursorEnd: '2026-05-16T00:00:00Z', + cursorTieBreakerEnd: '12', + }) + ) + }) + + it('runs no-cursor tables as full refreshes instead of first-page-only polling', async () => { + externalExecutor.mockResolvedValue([{ id: 1 }, { id: 2 }]) + const plugin = new ReplicationPlugin({ + externalExecutor, + tables: [{ sourceTable: 'users', batchSize: 1 }], + }) + + await plugin.sync({ + dataSource: mockDataSource, + config: mockConfig, + force: true, + }) + + expect(externalExecutor).toHaveBeenCalledWith( + expect.objectContaining({ + sql: 'SELECT * FROM "users"', + params: [], + }) + ) + }) + it('skips auto sync when a table interval is not due', async () => { stateRows = [ { @@ -237,5 +331,83 @@ describe('ReplicationPlugin', () => { }) ).toThrow('Invalid SQL identifier') }) -}) + it('requires an explicit primary key or tie breaker for cursor tables', () => { + expect( + () => + new ReplicationPlugin({ + tables: [ + { + sourceTable: 'events', + cursorColumn: 'updated_at', + }, + ], + }) + ).toThrow('requires primaryKey or cursorTieBreakerColumn') + }) + + it('uses the internal transaction RPC for row writes and state updates', async () => { + let insertCount = 0 + executeQuery = vi.fn(async ({ sql }) => { + if ( + sql.includes('FROM tmp_starbasedb_replication_state') && + sql.includes('WHERE table_name = ?') + ) { + return [] + } + + if (sql.startsWith('INSERT INTO "users"')) { + insertCount += 1 + + if (insertCount === 2) { + throw new Error('insert failed') + } + } + + return [] + }) + executeTransaction = vi.fn( + async (queries: { sql: string; params?: unknown[] }[]) => { + const results = [] + + for (const query of queries) { + results.push(await executeQuery(query)) + } + + return results + } + ) + ;(mockDataSource.rpc as any).executeQuery = executeQuery + ;(mockDataSource.rpc as any).executeTransaction = executeTransaction + externalExecutor.mockResolvedValue([{ id: 1 }, { id: 2 }]) + const plugin = new ReplicationPlugin({ + externalExecutor, + continueOnTableError: false, + tables: [{ sourceTable: 'users', primaryKey: 'id' }], + }) + + await expect( + plugin.sync({ + dataSource: mockDataSource, + config: mockConfig, + force: true, + }) + ).rejects.toThrow('insert failed') + + expect(executeTransaction).toHaveBeenCalledTimes(1) + const transactionQueries = executeTransaction.mock + .calls[0][0] as Array<{ + sql: string + }> + expect(transactionQueries.map((query) => query.sql)).toEqual([ + 'INSERT INTO "users" ("id") VALUES (?) ON CONFLICT ("id") DO NOTHING', + 'INSERT INTO "users" ("id") VALUES (?) ON CONFLICT ("id") DO NOTHING', + expect.stringContaining( + 'INSERT INTO tmp_starbasedb_replication_state' + ), + expect.stringContaining( + 'INSERT INTO tmp_starbasedb_replication_runs' + ), + ]) + }) +}) diff --git a/plugins/replication/index.ts b/plugins/replication/index.ts index a7dcb58..ca85072 100644 --- a/plugins/replication/index.ts +++ b/plugins/replication/index.ts @@ -10,6 +10,8 @@ const SQL_QUERIES = { table_name TEXT PRIMARY KEY, last_cursor_value TEXT, last_cursor_type TEXT, + last_cursor_tie_breaker_value TEXT, + last_cursor_tie_breaker_type TEXT, last_synced_at TEXT, total_rows_synced INTEGER NOT NULL DEFAULT 0, last_error TEXT @@ -29,22 +31,24 @@ const SQL_QUERIES = { ) `, SELECT_STATE: ` - SELECT table_name, last_cursor_value, last_cursor_type, last_synced_at, total_rows_synced, last_error + SELECT table_name, last_cursor_value, last_cursor_type, last_cursor_tie_breaker_value, last_cursor_tie_breaker_type, last_synced_at, total_rows_synced, last_error FROM tmp_starbasedb_replication_state WHERE table_name = ? `, SELECT_ALL_STATE: ` - SELECT table_name, last_cursor_value, last_cursor_type, last_synced_at, total_rows_synced, last_error + SELECT table_name, last_cursor_value, last_cursor_type, last_cursor_tie_breaker_value, last_cursor_tie_breaker_type, last_synced_at, total_rows_synced, last_error FROM tmp_starbasedb_replication_state ORDER BY table_name `, UPSERT_STATE: ` INSERT INTO tmp_starbasedb_replication_state ( - table_name, last_cursor_value, last_cursor_type, last_synced_at, total_rows_synced, last_error - ) VALUES (?, ?, ?, ?, ?, NULL) + table_name, last_cursor_value, last_cursor_type, last_cursor_tie_breaker_value, last_cursor_tie_breaker_type, last_synced_at, total_rows_synced, last_error + ) VALUES (?, ?, ?, ?, ?, ?, ?, NULL) ON CONFLICT(table_name) DO UPDATE SET last_cursor_value = excluded.last_cursor_value, last_cursor_type = excluded.last_cursor_type, + last_cursor_tie_breaker_value = excluded.last_cursor_tie_breaker_value, + last_cursor_tie_breaker_type = excluded.last_cursor_tie_breaker_type, last_synced_at = excluded.last_synced_at, total_rows_synced = excluded.total_rows_synced, last_error = NULL @@ -80,6 +84,8 @@ export interface ReplicationTableOptions { columns?: string[] cursorColumn?: string cursorValueType?: CursorValueType + cursorTieBreakerColumn?: string + cursorTieBreakerValueType?: CursorValueType primaryKey?: string | string[] where?: string batchSize?: number @@ -110,6 +116,8 @@ export interface ReplicationResult { rowsWritten: number cursorStart?: string | null cursorEnd?: string | null + cursorTieBreakerStart?: string | null + cursorTieBreakerEnd?: string | null error?: string } @@ -117,11 +125,30 @@ type ReplicationStateRow = QueryResult & { table_name: string last_cursor_value?: string | null last_cursor_type?: CursorValueType | null + last_cursor_tie_breaker_value?: string | null + last_cursor_tie_breaker_type?: CursorValueType | null last_synced_at?: string | null total_rows_synced?: number | string | null last_error?: string | null } +type ReplicationStateBookmark = { + cursorEnd: unknown + cursorTieBreakerEnd: unknown +} + +type InternalQuery = { + sql: string + params?: unknown[] +} + +type InternalTransactionExecutor = { + executeTransaction( + queries: InternalQuery[], + isRaw: boolean + ): Promise +} + type NormalizedTableOptions = Required< Pick > & @@ -172,7 +199,7 @@ export class ReplicationPlugin extends StarbasePlugin { }) app.get(`${this.pathPrefix}/status`, async () => { - if (!this.dataSource) { + if (!this.dataSource || !this.config) { return createResponse( undefined, 'ReplicationPlugin has not been initialized.', @@ -180,6 +207,10 @@ export class ReplicationPlugin extends StarbasePlugin { ) } + if (!this.isAdmin(this.config)) { + return createResponse(undefined, 'Unauthorized request', 400) + } + await this.ensureInitialized(this.dataSource) const state = await this.readAllState(this.dataSource) @@ -209,6 +240,10 @@ export class ReplicationPlugin extends StarbasePlugin { ) } + if (!this.isAdmin(this.config)) { + return createResponse(undefined, 'Unauthorized request', 400) + } + const body = await this.safeJson<{ tables?: string[] }>(c.req.raw) const result = await this.sync({ dataSource: this.dataSource, @@ -217,7 +252,11 @@ export class ReplicationPlugin extends StarbasePlugin { tableNames: body?.tables, }) - return createResponse(result, undefined, this.hasError(result) ? 500 : 200) + return createResponse( + result, + undefined, + this.hasError(result) ? 500 : 200 + ) }) app.post(`${this.pathPrefix}/pull/:tableName`, async (c) => { @@ -229,6 +268,10 @@ export class ReplicationPlugin extends StarbasePlugin { ) } + if (!this.isAdmin(this.config)) { + return createResponse(undefined, 'Unauthorized request', 400) + } + const tableName = c.req.param('tableName') const result = await this.sync({ dataSource: this.dataSource, @@ -237,7 +280,11 @@ export class ReplicationPlugin extends StarbasePlugin { tableNames: [tableName], }) - return createResponse(result, undefined, this.hasError(result) ? 500 : 200) + return createResponse( + result, + undefined, + this.hasError(result) ? 500 : 200 + ) }) } @@ -347,23 +394,7 @@ export class ReplicationPlugin extends StarbasePlugin { ? (externalRows as Record[]) : [] const transformedRows = await this.transformRows(table, rows) - const cursorEnd = this.findCursorEnd(table, rows, state) - - for (const row of transformedRows) { - const insert = this.buildInsertQuery(table, row) - await dataSource.rpc.executeQuery({ - sql: insert.sql, - params: insert.params, - }) - } - - await this.upsertState({ - table, - dataSource, - state, - cursorEnd, - rowsWritten: transformedRows.length, - }) + const bookmark = this.findStateBookmark(table, rows, state) const result: ReplicationResult = { table: this.tableKey(table), @@ -373,14 +404,35 @@ export class ReplicationPlugin extends StarbasePlugin { rowsRead: rows.length, rowsWritten: transformedRows.length, cursorStart: state?.last_cursor_value ?? null, - cursorEnd: cursorEnd == null ? null : String(cursorEnd), + cursorEnd: + bookmark.cursorEnd == null ? null : String(bookmark.cursorEnd), + cursorTieBreakerStart: state?.last_cursor_tie_breaker_value ?? null, + cursorTieBreakerEnd: + bookmark.cursorTieBreakerEnd == null + ? null + : String(bookmark.cursorTieBreakerEnd), } - await this.recordRun(table, dataSource, { - ...result, - startedAt, - finishedAt: new Date().toISOString(), - }) + const writeQueries: InternalQuery[] = transformedRows.map((row) => + this.buildInsertQuery(table, row) + ) + writeQueries.push( + this.buildUpsertStateQuery({ + table, + state, + bookmark, + rowsWritten: transformedRows.length, + }) + ) + writeQueries.push( + this.buildRecordRunQuery(table, { + ...result, + startedAt, + finishedAt: new Date().toISOString(), + }) + ) + + await this.executeInternalTransaction(dataSource, writeQueries) return result } @@ -397,6 +449,14 @@ export class ReplicationPlugin extends StarbasePlugin { sql: SQL_QUERIES.CREATE_STATE_TABLE, params: [], }) + await this.addStateColumnIfMissing( + dataSource, + 'last_cursor_tie_breaker_value TEXT' + ) + await this.addStateColumnIfMissing( + dataSource, + 'last_cursor_tie_breaker_type TEXT' + ) await dataSource.rpc.executeQuery({ sql: SQL_QUERIES.CREATE_RUNS_TABLE, params: [], @@ -421,6 +481,25 @@ export class ReplicationPlugin extends StarbasePlugin { } } + private async addStateColumnIfMissing( + dataSource: DataSource, + columnDefinition: string + ) { + try { + await dataSource.rpc.executeQuery({ + sql: `ALTER TABLE tmp_starbasedb_replication_state ADD COLUMN ${columnDefinition}`, + params: [], + }) + } catch (error) { + const message = + error instanceof Error ? error.message : String(error) + + if (!/duplicate column|already exists/i.test(message)) { + throw error + } + } + } + private buildSelectQuery( table: NormalizedTableOptions, state: ReplicationStateRow | undefined, @@ -431,7 +510,9 @@ export class ReplicationPlugin extends StarbasePlugin { ? dataSource.external.dialect : 'sqlite' const columns = table.columns?.length - ? table.columns.map((column) => this.quoteIdentifier(column, quoteStyle)).join(', ') + ? table.columns + .map((column) => this.quoteIdentifier(column, quoteStyle)) + .join(', ') : '*' const conditions: string[] = [] const params: unknown[] = [] @@ -441,24 +522,64 @@ export class ReplicationPlugin extends StarbasePlugin { } if (table.cursorColumn && state?.last_cursor_value != null) { - conditions.push( - `${this.quoteIdentifier(table.cursorColumn, quoteStyle)} > ?` + const cursorIdentifier = this.quoteIdentifier( + table.cursorColumn, + quoteStyle ) - params.push(this.deserializeCursorValue(state)) + const cursorValue = this.deserializeCursorValue(state) + + if ( + this.usesCursorTieBreaker(table) && + state.last_cursor_tie_breaker_value != null + ) { + const tieBreakerIdentifier = this.quoteIdentifier( + table.cursorTieBreakerColumn as string, + quoteStyle + ) + conditions.push( + `(${cursorIdentifier} > ? OR (${cursorIdentifier} = ? AND ${tieBreakerIdentifier} > ?))` + ) + params.push( + cursorValue, + cursorValue, + this.deserializeCursorTieBreakerValue(state) + ) + } else { + conditions.push(`${cursorIdentifier} > ?`) + params.push(cursorValue) + } } const where = conditions.length ? ` WHERE ${conditions.join(' AND ')}` : '' - const orderBy = table.cursorColumn - ? ` ORDER BY ${this.quoteIdentifier(table.cursorColumn, quoteStyle)} ASC` + const orderByParts: string[] = [] + if (table.cursorColumn) { + orderByParts.push( + this.quoteIdentifier(table.cursorColumn, quoteStyle) + ) + + if (this.usesCursorTieBreaker(table)) { + orderByParts.push( + this.quoteIdentifier( + table.cursorTieBreakerColumn as string, + quoteStyle + ) + ) + } + } + const orderBy = orderByParts.length + ? ` ORDER BY ${orderByParts + .map((identifier) => `${identifier} ASC`) + .join(', ')}` : '' + const limit = table.cursorColumn ? ` LIMIT ${table.batchSize}` : '' return { sql: `SELECT ${columns} FROM ${this.quoteQualifiedIdentifier( table.sourceTable, quoteStyle - )}${where}${orderBy} LIMIT ${table.batchSize}`, + )}${where}${orderBy}${limit}`, params, } } @@ -554,27 +675,76 @@ export class ReplicationPlugin extends StarbasePlugin { return transformedRows } - private findCursorEnd( + private findStateBookmark( table: NormalizedTableOptions, rows: Record[], state?: ReplicationStateRow - ): unknown { + ): ReplicationStateBookmark { if (!table.cursorColumn || rows.length === 0) { - return state?.last_cursor_value ?? null + return { + cursorEnd: state?.last_cursor_value ?? null, + cursorTieBreakerEnd: + state?.last_cursor_tie_breaker_value ?? null, + } + } + + const cursorColumn = table.cursorColumn + const tieBreakerColumn = this.usesCursorTieBreaker(table) + ? table.cursorTieBreakerColumn + : undefined + const cursorRows = rows.filter( + (row) => + row[cursorColumn] !== undefined && + row[cursorColumn] !== null && + (!tieBreakerColumn || + (row[tieBreakerColumn] !== undefined && + row[tieBreakerColumn] !== null)) + ) + + if (!cursorRows.length) { + return { + cursorEnd: state?.last_cursor_value ?? null, + cursorTieBreakerEnd: + state?.last_cursor_tie_breaker_value ?? null, + } + } + + const lastRow = cursorRows.reduce((maxRow, row) => + this.compareCursorRows(table, row, maxRow) > 0 ? row : maxRow + ) + + return { + cursorEnd: lastRow[cursorColumn], + cursorTieBreakerEnd: tieBreakerColumn + ? lastRow[tieBreakerColumn] + : null, + } + } + + private compareCursorRows( + table: NormalizedTableOptions, + left: Record, + right: Record + ): number { + if (!table.cursorColumn) { + return 0 } - const cursorValues = rows - .map((row) => row[table.cursorColumn as string]) - .filter((value) => value !== undefined && value !== null) + const cursorCompare = this.compareCursorValues( + left[table.cursorColumn], + right[table.cursorColumn], + table.cursorValueType + ) - if (!cursorValues.length) { - return state?.last_cursor_value ?? null + if (cursorCompare !== 0 || !this.usesCursorTieBreaker(table)) { + return cursorCompare } - return cursorValues.reduce((max, value) => - this.compareCursorValues(value, max, table.cursorValueType) > 0 - ? value - : max + const tieBreakerColumn = table.cursorTieBreakerColumn as string + return this.compareCursorValues( + left[tieBreakerColumn], + right[tieBreakerColumn], + table.cursorTieBreakerValueType ) } @@ -605,16 +775,26 @@ export class ReplicationPlugin extends StarbasePlugin { return state.last_cursor_value } + private deserializeCursorTieBreakerValue( + state: ReplicationStateRow + ): unknown { + if (state.last_cursor_tie_breaker_type === 'number') { + return Number(state.last_cursor_tie_breaker_value) + } + + return state.last_cursor_tie_breaker_value + } + private cursorTypeForValue( - table: NormalizedTableOptions, - value: unknown + value: unknown, + configuredType?: CursorValueType ): CursorValueType | null { if (value == null) { - return table.cursorValueType ?? null + return configuredType ?? null } - if (table.cursorValueType) { - return table.cursorValueType + if (configuredType) { + return configuredType } return typeof value === 'number' ? 'number' : 'string' @@ -627,7 +807,7 @@ export class ReplicationPlugin extends StarbasePlugin { const rows = (await dataSource.rpc.executeQuery({ sql: SQL_QUERIES.SELECT_STATE, params: [this.tableKey(table)], - })) as ReplicationStateRow[] + })) as unknown as ReplicationStateRow[] return rows[0] } @@ -636,7 +816,7 @@ export class ReplicationPlugin extends StarbasePlugin { return (await dataSource.rpc.executeQuery({ sql: SQL_QUERIES.SELECT_ALL_STATE, params: [], - })) as ReplicationStateRow[] + })) as unknown as ReplicationStateRow[] } private async isDue( @@ -657,36 +837,45 @@ export class ReplicationPlugin extends StarbasePlugin { return Date.now() - lastSyncedAt >= table.intervalSeconds * 1000 } - private async upsertState(opts: { + private buildUpsertStateQuery(opts: { table: NormalizedTableOptions - dataSource: DataSource state?: ReplicationStateRow - cursorEnd: unknown + bookmark: ReplicationStateBookmark rowsWritten: number - }) { - const { table, dataSource, state, cursorEnd, rowsWritten } = opts + }): InternalQuery { + const { table, state, bookmark, rowsWritten } = opts const totalRowsSynced = Number(state?.total_rows_synced ?? 0) + rowsWritten - const cursorType = this.cursorTypeForValue(table, cursorEnd) + const cursorType = this.cursorTypeForValue( + bookmark.cursorEnd, + table.cursorValueType + ) + const tieBreakerType = this.cursorTypeForValue( + bookmark.cursorTieBreakerEnd, + table.cursorTieBreakerValueType + ) - await dataSource.rpc.executeQuery({ + return { sql: SQL_QUERIES.UPSERT_STATE, params: [ this.tableKey(table), - cursorEnd == null ? null : String(cursorEnd), + bookmark.cursorEnd == null ? null : String(bookmark.cursorEnd), cursorType, + bookmark.cursorTieBreakerEnd == null + ? null + : String(bookmark.cursorTieBreakerEnd), + tieBreakerType, new Date().toISOString(), totalRowsSynced, ], - }) + } } - private async recordRun( + private buildRecordRunQuery( table: NormalizedTableOptions, - dataSource: DataSource, result: ReplicationResult & { startedAt: string; finishedAt: string } - ) { - await dataSource.rpc.executeQuery({ + ): InternalQuery { + return { sql: SQL_QUERIES.INSERT_RUN, params: [ this.tableKey(table), @@ -698,7 +887,7 @@ export class ReplicationPlugin extends StarbasePlugin { result.cursorEnd ?? null, result.error ?? null, ], - }) + } } private async recordError( @@ -730,20 +919,39 @@ export class ReplicationPlugin extends StarbasePlugin { if (table.cursorColumn) { this.validateIdentifier(table.cursorColumn) } + if (table.cursorTieBreakerColumn) { + this.validateIdentifier(table.cursorTieBreakerColumn) + } const primaryKey = Array.isArray(table.primaryKey) ? table.primaryKey : table.primaryKey ? [table.primaryKey] - : table.cursorColumn - ? [table.cursorColumn] - : [] + : [] primaryKey.forEach((column) => this.validateIdentifier(column)) + const cursorTieBreakerColumn = table.cursorColumn + ? (table.cursorTieBreakerColumn ?? primaryKey[0]) + : undefined + + if (table.cursorColumn && !cursorTieBreakerColumn) { + throw new Error( + `Replication table ${table.sourceTable} requires primaryKey or cursorTieBreakerColumn when cursorColumn is configured.` + ) + } + + if (table.columns?.length && table.cursorColumn) { + this.requireSelectedColumn(table, table.cursorColumn) + + if (cursorTieBreakerColumn) { + this.requireSelectedColumn(table, cursorTieBreakerColumn) + } + } return { ...table, sourceTable: table.sourceTable, targetTable, + cursorTieBreakerColumn, primaryKey, batchSize: this.normalizePositiveInteger( table.batchSize ?? pluginOptions.defaultBatchSize ?? 500, @@ -811,10 +1019,31 @@ export class ReplicationPlugin extends StarbasePlugin { return table.targetTable } + private usesCursorTieBreaker(table: NormalizedTableOptions): boolean { + return Boolean( + table.cursorColumn && + table.cursorTieBreakerColumn && + table.cursorTieBreakerColumn !== table.cursorColumn + ) + } + + private async executeInternalTransaction( + dataSource: DataSource, + queries: InternalQuery[] + ) { + await ( + dataSource.rpc as unknown as InternalTransactionExecutor + ).executeTransaction(queries, false) + } + private hasError(result: ReplicationResult[]): boolean { return result.some((item) => item.error) } + private isAdmin(config: StarbaseDBConfiguration): boolean { + return config.role === 'admin' + } + private async safeJson(request: Request): Promise { try { return (await request.json()) as T @@ -853,6 +1082,17 @@ export class ReplicationPlugin extends StarbasePlugin { identifier.split('.').forEach((part) => this.validateIdentifier(part)) } + private requireSelectedColumn( + table: ReplicationTableOptions, + column: string + ) { + if (!table.columns?.includes(column)) { + throw new Error( + `Replication table ${table.sourceTable} columns must include ${column}.` + ) + } + } + private normalizePositiveInteger(value: number, name: string): number { if (!Number.isInteger(value) || value <= 0) { throw new Error(`${name} must be a positive integer.`) @@ -869,4 +1109,3 @@ export class ReplicationPlugin extends StarbasePlugin { return value } } - diff --git a/src/do.test.ts b/src/do.test.ts index 272c4e9..569bdbf 100644 --- a/src/do.test.ts +++ b/src/do.test.ts @@ -74,6 +74,7 @@ const mockStorage = { rowsWritten: 1, }), }, + transactionSync: vi.fn((closure: () => unknown) => closure()), } const mockDurableObjectState = { @@ -114,6 +115,7 @@ describe('StarbaseDBDurableObject Tests', () => { ] const result = await instance.executeTransaction(queries, false) + expect(mockStorage.transactionSync).toHaveBeenCalledTimes(1) expect(mockStorage.sql.exec).toHaveBeenCalledTimes(2) expect(result.length).toBe(2) }) diff --git a/src/do.ts b/src/do.ts index b6bb2b6..27d547c 100644 --- a/src/do.ts +++ b/src/do.ts @@ -72,6 +72,7 @@ export class StarbaseDBDurableObject extends DurableObject { deleteAlarm: this.deleteAlarm.bind(this), getStatistics: this.getStatistics.bind(this), executeQuery: this.executeQuery.bind(this), + executeTransaction: this.executeTransaction.bind(this), } } @@ -260,7 +261,7 @@ export class StarbaseDBDurableObject extends DurableObject { } } - private async executeRawQuery< + private executeRawQuery< T extends Record = Record< string, SqlStorageValue @@ -284,14 +285,11 @@ export class StarbaseDBDurableObject extends DurableObject { } } - public async executeQuery(opts: { - sql: string - params?: unknown[] + private formatQueryResult>( + cursor: SqlStorageCursor, isRaw?: boolean - }) { - const cursor = await this.executeRawQuery(opts) - - if (opts.isRaw) { + ) { + if (isRaw) { return { columns: cursor.columnNames, rows: Array.from(cursor.raw()), @@ -305,20 +303,28 @@ export class StarbaseDBDurableObject extends DurableObject { return cursor.toArray() } + public async executeQuery(opts: { + sql: string + params?: unknown[] + isRaw?: boolean + }) { + const cursor = this.executeRawQuery(opts) + return this.formatQueryResult(cursor, opts.isRaw) + } + public async executeTransaction( queries: { sql: string; params?: unknown[] }[], isRaw: boolean ): Promise { - const results = [] - try { - for (const queryObj of queries) { - const { sql, params } = queryObj - const result = await this.executeQuery({ sql, params, isRaw }) - results.push(result) - } - - return results + return this.storage.transactionSync(() => + queries.map(({ sql, params }) => + this.formatQueryResult( + this.executeRawQuery({ sql, params }), + isRaw + ) + ) + ) } catch (error) { console.error('Transaction Execution Error:', error) throw error