diff --git a/plugins/replication/README.md b/plugins/replication/README.md new file mode 100644 index 0000000..9c7c652 --- /dev/null +++ b/plugins/replication/README.md @@ -0,0 +1,89 @@ +# ReplicationPlugin + +The `ReplicationPlugin` pulls data from an external PostgreSQL source into StarbaseDB's internal Durable Object SQLite database on demand. Each sync cycle fetches only rows newer than the last checkpoint (cursor-based incremental sync), upserts them into the local SQLite replica, and updates a checkpoint record. The SQLite replica can then be queried at the edge with no round-trip to the origin database. + +## Enabling the Plugin + +Add `ReplicationPlugin` to the `plugins` array when constructing `StarbaseDB`: + +```typescript +import { StarbaseDB } from '@outerbase/starbasedb' +import { ReplicationPlugin } from '@outerbase/starbasedb/plugins' + +const replication = new ReplicationPlugin({ + tables: ['users', 'orders'], + cursorColumn: 'id', + batchSize: 500, + ctx, +}) + +const db = new StarbaseDB({ dataSource, config, plugins: [replication] }) +``` + +## Configuration Options + +| Option | Type | Default | Description | +| -------------- | ------------------ | ------- | ----------------------------------------------------------------------------------- | +| `tables` | `string[]` | `[]` | Tables to replicate. An empty array replicates **all** public tables automatically. | +| `cursorColumn` | `string` | `'id'` | Column used as the incremental cursor (must be monotonically increasing). | +| `batchSize` | `number` | `500` | Maximum rows fetched per table per sync cycle. | +| `ctx` | `ExecutionContext` | — | Cloudflare Worker execution context. Required for non-blocking `waitUntil` support. 
|
+
+## Required `wrangler.toml` Variables
+
+The plugin reads from whatever external PostgreSQL source is configured for StarbaseDB:
+
+```toml
+[vars]
+EXTERNAL_DB_TYPE = "postgresql"
+EXTERNAL_DB_HOST = "your-postgres-host"
+EXTERNAL_DB_PORT = 5432
+EXTERNAL_DB_USER = "your-user"
+EXTERNAL_DB_PASS = "your-password"
+EXTERNAL_DB_DATABASE = "your-database"
+EXTERNAL_DB_DEFAULT_SCHEMA = "public" # optional, defaults to "public"
+```
+
+## How Incremental Sync Works
+
+Each table gets its own checkpoint row stored in `tmp_replication_checkpoints`.
+
+1. **First sync** — no checkpoint exists, so the full table is fetched in batches of `batchSize`, ordered by `cursorColumn` ascending.
+2. **Subsequent syncs** — only rows where `cursorColumn > last_cursor_value` are fetched. After each batch, the checkpoint is updated with the highest cursor value seen and the cumulative row count.
+3. Rows are written to SQLite using `INSERT OR REPLACE`, so re-syncing the same rows is always safe (idempotent).
+
+### Triggering a Sync
+
+**Manually** via the HTTP API (admin-only):
+
+```bash
+curl -X POST https://your-worker/replication/run \
+    -H "Authorization: Bearer <token>"
+```
+
+**On a schedule** — use the [CronPlugin](../cron) to call `POST /replication/run` on an interval, or wire it to a Cloudflare Cron Trigger in your worker's `scheduled` handler:
+
+```typescript
+export default {
+    async scheduled(_event, _env, ctx) {
+        ctx.waitUntil(replication.sync()) // or POST /replication/run
+    },
+}
+```
+
+### Checking Status
+
+```bash
+curl https://your-worker/replication/status \
+    -H "Authorization: Bearer <token>"
+```
+
+Returns all checkpoint rows plus the current plugin configuration.
+
+## Limitations
+
+- **PostgreSQL only** — MySQL, D1, Turso, and Starbase sources are not yet supported. Sync will log a warning and exit early for non-Postgres sources.
+- **No DELETE replication** — rows deleted from the source are not removed from the SQLite replica.
Full-table refresh (dropping and re-creating the replica) is a planned future feature.
+- **Append/update only** — the cursor assumes rows are either inserted or updated with an ever-increasing cursor value. Backfilled or out-of-order rows may not be captured.
+- **Destination schema is inferred automatically** — on the first sync the plugin creates each destination table (`CREATE TABLE IF NOT EXISTS`) from the columns of the first fetched row, inferring `INTEGER` for `id`/`*_id` columns and `TEXT` for everything else. Pre-create the table in SQLite yourself if you need more precise column types.
+- **Single-column cursor** — composite cursors are not supported.
diff --git a/plugins/replication/index.test.ts b/plugins/replication/index.test.ts
new file mode 100644
index 0000000..8047885
--- /dev/null
+++ b/plugins/replication/index.test.ts
@@ -0,0 +1,357 @@
+import { describe, it, expect, vi, beforeEach } from 'vitest'
+import { Hono } from 'hono'
+import { ReplicationPlugin } from './index'
+import type { DataSource } from '../../src/types'
+import type { StarbaseDBConfiguration, StarbaseApp } from '../../src/handler'
+
+vi.mock('../../src/operation', () => ({
+    executeExternalQuery: vi.fn().mockResolvedValue([]),
+}))
+
+import { executeExternalQuery } from '../../src/operation'
+
+// rpc.executeQuery is a Cloudflare Stub type at compile time but a Vitest mock at runtime.
+// The double cast via unknown is required to access mock methods in tests.
+type MockFn = ReturnType<typeof vi.fn>
+const rpcMock = (ds: DataSource): MockFn =>
+    ds.rpc.executeQuery as unknown as MockFn
+
+function createMockDataSource(): DataSource {
+    return {
+        source: 'internal',
+        external: {
+            dialect: 'postgresql',
+            host: 'mock-host',
+            port: 5432,
+            user: 'mock-user',
+            password: 'mock-password',
+            database: 'mock-db',
+        } as any,
+        rpc: {
+            executeQuery: vi.fn().mockResolvedValue([]),
+        } as any,
+    } as DataSource
+}
+
+function createTestApp(
+    config: StarbaseDBConfiguration,
+    dataSource: DataSource
+): StarbaseApp {
+    const app = new Hono() as unknown as StarbaseApp
+    app.use('*', async (c, next) => {
+        c.set('config', config)
+        c.set('dataSource', dataSource)
+        await next()
+    })
+    return app
+}
+
+describe('ReplicationPlugin', () => {
+    let mockDataSource: DataSource
+    const adminConfig: StarbaseDBConfiguration = { role: 'admin' }
+    const clientConfig: StarbaseDBConfiguration = { role: 'client' }
+
+    beforeEach(() => {
+        mockDataSource = createMockDataSource()
+        vi.clearAllMocks()
+    })
+
+    describe('buildFetchQuery', () => {
+        it('should build query without WHERE clause when no checkpoint exists', () => {
+            const plugin = new ReplicationPlugin()
+            const { sql, params } = (plugin as any).buildFetchQuery(
+                'users',
+                'id',
+                null,
+                500
+            )
+            expect(sql).toBe('SELECT * FROM users ORDER BY id ASC LIMIT ?')
+            expect(params).toEqual([500])
+        })
+
+        it('should build query with WHERE clause when checkpoint value exists', () => {
+            const plugin = new ReplicationPlugin()
+            const { sql, params } = (plugin as any).buildFetchQuery(
+                'users',
+                'id',
+                '42',
+                100
+            )
+            expect(sql).toBe(
+                'SELECT * FROM users WHERE id > ? ORDER BY id ASC LIMIT ?'
+ ) + expect(params).toEqual(['42', 100]) + }) + + it('should use provided cursor column and batch size', () => { + const plugin = new ReplicationPlugin() + const { sql, params } = (plugin as any).buildFetchQuery( + 'orders', + 'created_at', + '2024-01-01', + 250 + ) + expect(sql).toBe( + 'SELECT * FROM orders WHERE created_at > ? ORDER BY created_at ASC LIMIT ?' + ) + expect(params).toEqual(['2024-01-01', 250]) + }) + }) + + describe('GET /replication/status', () => { + it('should return 401 for non-admin role', async () => { + const plugin = new ReplicationPlugin() + const app = createTestApp(clientConfig, mockDataSource) + await plugin.register(app) + + const response = await app.request('/replication/status') + expect(response.status).toBe(401) + }) + + it('should return checkpoint data and plugin config for admin role', async () => { + const checkpoints = [ + { + table_name: 'users', + cursor_column: 'id', + last_cursor_value: '10', + last_synced_at: '2024-01-01T00:00:00', + rows_synced: 10, + }, + ] + + rpcMock(mockDataSource) + .mockResolvedValueOnce([]) // CREATE_CHECKPOINT_TABLE + .mockResolvedValueOnce(checkpoints) // GET_ALL_CHECKPOINTS + + const plugin = new ReplicationPlugin({ tables: ['users'] }) + const app = createTestApp(adminConfig, mockDataSource) + await plugin.register(app) + + const response = await app.request('/replication/status') + expect(response.status).toBe(200) + + const body = (await response.json()) as any + expect(body.result.checkpoints).toEqual(checkpoints) + expect(body.result.config).toBeDefined() + expect(body.result.config.tables).toEqual(['users']) + }) + }) + + describe('POST /replication/run', () => { + it('should return 401 for non-admin role', async () => { + const plugin = new ReplicationPlugin() + const app = createTestApp(clientConfig, mockDataSource) + await plugin.register(app) + + const response = await app.request('/replication/run', { + method: 'POST', + }) + expect(response.status).toBe(401) + }) + + it('should 
return success JSON and start replication', async () => { + const plugin = new ReplicationPlugin() + const app = createTestApp(adminConfig, mockDataSource) + await plugin.register(app) + + const response = await app.request('/replication/run', { + method: 'POST', + }) + expect(response.status).toBe(200) + + const body = (await response.json()) as any + expect(body.result.success).toBe(true) + expect(body.result.message).toBe('Replication started') + }) + + it('should call waitUntil with sync promise when executionContext is provided', async () => { + const waitUntilMock = vi.fn() + const mockCtx = { + waitUntil: waitUntilMock, + } as unknown as ExecutionContext + + const plugin = new ReplicationPlugin({ ctx: mockCtx }) + const app = createTestApp(adminConfig, mockDataSource) + await plugin.register(app) + + await app.request('/replication/run', { method: 'POST' }) + + expect(waitUntilMock).toHaveBeenCalledWith(expect.any(Promise)) + }) + }) + + describe('sync - checkpoint upsert logic', () => { + it('should upsert checkpoint with correct SQL and params after syncing rows', async () => { + const plugin = new ReplicationPlugin({ + tables: ['users'], + cursorColumn: 'id', + batchSize: 100, + }) + + ;(plugin as any).dataSource = mockDataSource + ;(plugin as any).config = adminConfig + + vi.mocked(executeExternalQuery).mockResolvedValueOnce([ + { id: 5, name: 'Alice' }, + ]) + + rpcMock(mockDataSource) + .mockResolvedValueOnce([]) // GET_CHECKPOINT (no existing checkpoint) + .mockResolvedValueOnce(undefined) // INSERT OR REPLACE users row + .mockResolvedValueOnce(undefined) // UPSERT_CHECKPOINT + + await (plugin as any).sync() + + const calls = rpcMock(mockDataSource).mock.calls + + const checkpointCall = calls.find((call: any[]) => { + const opts = call[0] as { sql: string; params: unknown[] } + return opts.sql.includes( + 'INSERT OR REPLACE INTO tmp_replication_checkpoints' + ) + }) + + expect(checkpointCall).toBeDefined() + const opts = checkpointCall![0] as { + sql: 
string + params: unknown[] + } + expect(opts.params[0]).toBe('users') // table_name + expect(opts.params[1]).toBe('id') // cursor_column + expect(opts.params[2]).toBe('5') // last_cursor_value (stringified) + expect(opts.params[3]).toBe(1) // rows_synced + }) + + it('should accumulate rows_synced across multiple syncs', async () => { + const plugin = new ReplicationPlugin({ + tables: ['users'], + cursorColumn: 'id', + batchSize: 100, + }) + + ;(plugin as any).dataSource = mockDataSource + ;(plugin as any).config = adminConfig + + vi.mocked(executeExternalQuery).mockResolvedValueOnce([ + { id: 10, name: 'Bob' }, + ]) + + rpcMock(mockDataSource) + .mockResolvedValueOnce([ + { + table_name: 'users', + cursor_column: 'id', + last_cursor_value: '5', + rows_synced: 1, + }, + ]) // GET_CHECKPOINT returns existing checkpoint + .mockResolvedValueOnce(undefined) // INSERT OR REPLACE + .mockResolvedValueOnce(undefined) // UPSERT_CHECKPOINT + + await (plugin as any).sync() + + const calls = rpcMock(mockDataSource).mock.calls + + const checkpointCall = calls.find((call: any[]) => { + const opts = call[0] as { sql: string; params: unknown[] } + return opts.sql.includes( + 'INSERT OR REPLACE INTO tmp_replication_checkpoints' + ) + }) + + expect(checkpointCall).toBeDefined() + const opts = checkpointCall![0] as { + sql: string + params: unknown[] + } + expect(opts.params[2]).toBe('10') // last_cursor_value updated + expect(opts.params[3]).toBe(2) // rows_synced = 1 previous + 1 new + }) + }) + + describe('Idempotency', () => { + it('should use INSERT OR REPLACE to avoid duplicates when syncing the same rows twice', async () => { + const plugin = new ReplicationPlugin({ + tables: ['users'], + cursorColumn: 'id', + batchSize: 100, + }) + + ;(plugin as any).dataSource = mockDataSource + ;(plugin as any).config = adminConfig + + const mockRows = [{ id: 1, name: 'Alice' }] + + // First sync + vi.mocked(executeExternalQuery).mockResolvedValueOnce(mockRows) + rpcMock(mockDataSource) + 
.mockResolvedValueOnce([]) // GET_CHECKPOINT + .mockResolvedValueOnce(undefined) // INSERT OR REPLACE row + .mockResolvedValueOnce(undefined) // UPSERT_CHECKPOINT + + await (plugin as any).sync() + + const firstCalls = rpcMock(mockDataSource).mock.calls + const firstUpsert = firstCalls.find((call: any[]) => { + const opts = call[0] as { sql: string } + return opts.sql.startsWith('INSERT OR REPLACE INTO users') + }) + + expect(firstUpsert).toBeDefined() + expect((firstUpsert![0] as { sql: string }).sql).toContain( + 'INSERT OR REPLACE' + ) + + // Second sync with same rows — INSERT OR REPLACE guarantees no duplicates + vi.clearAllMocks() + vi.mocked(executeExternalQuery).mockResolvedValueOnce(mockRows) + rpcMock(mockDataSource) + .mockResolvedValueOnce([ + { + table_name: 'users', + cursor_column: 'id', + last_cursor_value: '0', + rows_synced: 1, + }, + ]) // GET_CHECKPOINT + .mockResolvedValueOnce(undefined) // INSERT OR REPLACE row + .mockResolvedValueOnce(undefined) // UPSERT_CHECKPOINT + + await (plugin as any).sync() + + const secondCalls = rpcMock(mockDataSource).mock.calls + const secondUpsert = secondCalls.find((call: any[]) => { + const opts = call[0] as { sql: string } + return opts.sql.startsWith('INSERT OR REPLACE INTO users') + }) + + expect(secondUpsert).toBeDefined() + expect((secondUpsert![0] as { sql: string }).sql).toContain( + 'INSERT OR REPLACE' + ) + }) + }) + + describe('sync - non-Postgres source', () => { + it('should skip sync and warn when external source is not PostgreSQL', async () => { + const warnSpy = vi + .spyOn(console, 'warn') + .mockImplementation(() => {}) + + const plugin = new ReplicationPlugin({ tables: ['users'] }) + + ;(plugin as any).dataSource = { + ...mockDataSource, + external: { dialect: 'mysql' }, + } + ;(plugin as any).config = adminConfig + + await (plugin as any).sync() + + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining('Only PostgreSQL') + ) + expect(rpcMock(mockDataSource)).not.toHaveBeenCalled() + 
}) + }) +}) diff --git a/plugins/replication/index.ts b/plugins/replication/index.ts new file mode 100644 index 0000000..ef14c81 --- /dev/null +++ b/plugins/replication/index.ts @@ -0,0 +1,272 @@ +import { StarbaseApp, StarbaseDBConfiguration } from '../../src/handler' +import { StarbasePlugin } from '../../src/plugin' +import { DataSource, QueryResult } from '../../src/types' +import { createResponse } from '../../src/utils' +import { executeExternalQuery } from '../../src/operation' + +export interface ReplicationConfig { + /** Tables to replicate. Empty array means all tables from external source. */ + tables?: string[] + /** Column to use as cursor for incremental sync (e.g. 'id' or 'created_at') */ + cursorColumn?: string + /** Rows to fetch per batch. Default: 500 */ + batchSize?: number + /** Cloudflare ExecutionContext for waitUntil support */ + ctx?: ExecutionContext +} + +const DEFAULT_BATCH_SIZE = 500 +const DEFAULT_CURSOR_COLUMN = 'id' + +const SQL_QUERIES = { + CREATE_CHECKPOINT_TABLE: ` + CREATE TABLE IF NOT EXISTS tmp_replication_checkpoints ( + table_name TEXT NOT NULL PRIMARY KEY, + cursor_column TEXT NOT NULL, + last_cursor_value TEXT, + last_synced_at DATETIME, + rows_synced INTEGER DEFAULT 0 + ) + `, + GET_ALL_CHECKPOINTS: `SELECT * FROM tmp_replication_checkpoints`, + GET_CHECKPOINT: `SELECT * FROM tmp_replication_checkpoints WHERE table_name = ?`, + UPSERT_CHECKPOINT: ` + INSERT OR REPLACE INTO tmp_replication_checkpoints + (table_name, cursor_column, last_cursor_value, last_synced_at, rows_synced) + VALUES (?, ?, ?, datetime('now'), ?) 
+    `,
+    GET_EXTERNAL_TABLES: `
+        SELECT table_name FROM information_schema.tables
+        WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
+    `,
+}
+
+export class ReplicationPlugin extends StarbasePlugin {
+    public pathPrefix: string = '/replication'
+    private dataSource?: DataSource
+    private config?: StarbaseDBConfiguration
+    private executionContext?: ExecutionContext
+    private readonly replicationConfig: Required<Omit<ReplicationConfig, 'ctx'>>
+
+    constructor(opts?: ReplicationConfig) {
+        super('starbasedb:replication', { requiresAuth: true })
+        this.replicationConfig = {
+            tables: opts?.tables ?? [],
+            cursorColumn: opts?.cursorColumn ?? DEFAULT_CURSOR_COLUMN,
+            batchSize: opts?.batchSize ?? DEFAULT_BATCH_SIZE,
+        }
+        this.executionContext = opts?.ctx
+    }
+
+    override async register(app: StarbaseApp): Promise<void> {
+        app.use(async (c, next) => {
+            this.dataSource = c.get('dataSource')
+            this.config = c.get('config')
+            await this.dataSource?.rpc.executeQuery({
+                sql: SQL_QUERIES.CREATE_CHECKPOINT_TABLE,
+                params: [],
+            })
+            await next()
+        })
+
+        app.get(`${this.pathPrefix}/status`, async (c) => {
+            const config = c.get('config')
+            if (config?.role !== 'admin') {
+                return createResponse(undefined, 'Unauthorized', 401)
+            }
+
+            const checkpoints = (await this.dataSource?.rpc.executeQuery({
+                sql: SQL_QUERIES.GET_ALL_CHECKPOINTS,
+                params: [],
+            })) as QueryResult[]
+
+            return createResponse(
+                {
+                    checkpoints: checkpoints ?? [],
+                    config: this.replicationConfig,
+                },
+                undefined,
+                200
+            )
+        })
+
+        app.post(`${this.pathPrefix}/run`, async (c) => {
+            const config = c.get('config')
+            if (config?.role !== 'admin') {
+                return createResponse(undefined, 'Unauthorized', 401)
+            }
+
+            this.executionContext?.waitUntil(this.sync())
+
+            return createResponse(
+                { success: true, message: 'Replication started' },
+                undefined,
+                200
+            )
+        })
+    }
+
+    // Table and column names are structural identifiers, not user-supplied values;
+    // cursor value and batch size are safely parameterized with `?` placeholders.
+    private buildFetchQuery(
+        table: string,
+        cursorColumn: string,
+        lastCursorValue: string | null,
+        batchSize: number
+    ): { sql: string; params: unknown[] } {
+        if (lastCursorValue !== null && lastCursorValue !== undefined) {
+            return {
+                sql: `SELECT * FROM ${table} WHERE ${cursorColumn} > ? ORDER BY ${cursorColumn} ASC LIMIT ?`,
+                params: [lastCursorValue, batchSize],
+            }
+        }
+
+        return {
+            sql: `SELECT * FROM ${table} ORDER BY ${cursorColumn} ASC LIMIT ?`,
+            params: [batchSize],
+        }
+    }
+
+    private inferSQLiteType(columnName: string): string {
+        if (columnName === 'id' || columnName.endsWith('_id')) {
+            return 'INTEGER'
+        }
+
+        return 'TEXT'
+    }
+
+    private sanitizeIdentifier(identifier: string): string {
+        if (!/^[A-Za-z0-9_]+$/.test(identifier)) {
+            throw new Error(
+                `[replication] Invalid identifier "${identifier}". Only alphanumeric characters and underscores are allowed.`
+            )
+        }
+
+        return identifier
+    }
+
+    private async sync(): Promise<void> {
+        if (!this.dataSource || !this.config) {
+            console.warn('[replication] Plugin not properly initialized.')
+            return
+        }
+
+        if (this.dataSource.external?.dialect !== 'postgresql') {
+            console.warn(
+                '[replication] Only PostgreSQL external sources are supported. Skipping sync.'
+            )
+            return
+        }
+
+        const tables = await this.getTablesToSync()
+
+        for (const table of tables) {
+            try {
+                await this.syncTable(table)
+            } catch (error) {
+                console.error(
+                    `[replication] Error syncing table "${table}":`,
+                    error
+                )
+            }
+        }
+    }
+
+    private async getTablesToSync(): Promise<string[]> {
+        if (this.replicationConfig.tables.length > 0) {
+            return this.replicationConfig.tables
+        }
+
+        const rows = (await executeExternalQuery({
+            sql: SQL_QUERIES.GET_EXTERNAL_TABLES,
+            params: [],
+            dataSource: this.dataSource!,
+            config: this.config!,
+        })) as QueryResult[]
+
+        return rows.map((row) => row['table_name'] as string)
+    }
+
+    private async syncTable(table: string): Promise<void> {
+        const { cursorColumn, batchSize } = this.replicationConfig
+
+        const checkpointRows = (await this.dataSource!.rpc.executeQuery({
+            sql: SQL_QUERIES.GET_CHECKPOINT,
+            params: [table],
+        })) as QueryResult[]
+
+        const checkpoint = checkpointRows?.[0]
+        const lastCursorValue =
+            (checkpoint?.last_cursor_value as string | null) ??
null + + const { sql, params } = this.buildFetchQuery( + table, + cursorColumn, + lastCursorValue, + batchSize + ) + + const rows = (await executeExternalQuery({ + sql, + params, + dataSource: this.dataSource!, + config: this.config!, + })) as QueryResult[] + + if (!rows || rows.length === 0) { + console.log(`[replication] No new rows for table "${table}"`) + return + } + + const firstRow = rows[0] + const firstRowColumns = Object.keys(firstRow) + + if (firstRowColumns.length === 0) { + throw new Error( + `[replication] Cannot sync table "${table}" because the first row has no columns.` + ) + } + + const safeTableName = this.sanitizeIdentifier(table) + const safeCursorColumn = this.sanitizeIdentifier(cursorColumn) + const createTableColumns = firstRowColumns + .map((columnName) => { + const safeColumnName = this.sanitizeIdentifier(columnName) + return `${safeColumnName} ${this.inferSQLiteType(columnName)}` + }) + .join(', ') + + await this.dataSource!.rpc.executeQuery({ + sql: `CREATE TABLE IF NOT EXISTS ${safeTableName} (${createTableColumns}, PRIMARY KEY (${safeCursorColumn}))`, + params: [], + }) + + for (const row of rows) { + const columns = Object.keys(row) + const safeColumns = columns.map((columnName) => + this.sanitizeIdentifier(columnName) + ) + const placeholders = safeColumns.map(() => '?').join(', ') + const values = columns.map((col) => row[col]) + + // Table name and column names are structural identifiers from the + // external schema; row values are safely parameterized. + await this.dataSource!.rpc.executeQuery({ + sql: `INSERT OR REPLACE INTO ${safeTableName} (${safeColumns.join(', ')}) VALUES (${placeholders})`, + params: values, + }) + } + + const lastRow = rows[rows.length - 1] + const newCursorValue = String(lastRow[cursorColumn] ?? '') + const previousRowsSynced = (checkpoint?.rows_synced as number) ?? 
0 + const totalRowsSynced = previousRowsSynced + rows.length + + await this.dataSource!.rpc.executeQuery({ + sql: SQL_QUERIES.UPSERT_CHECKPOINT, + params: [table, cursorColumn, newCursorValue, totalRowsSynced], + }) + + console.log(`[replication] synced ${rows.length} rows from "${table}"`) + } +} diff --git a/src/index.ts b/src/index.ts index 4d08932..5cd3754 100644 --- a/src/index.ts +++ b/src/index.ts @@ -12,6 +12,7 @@ import { QueryLogPlugin } from '../plugins/query-log' import { StatsPlugin } from '../plugins/stats' import { CronPlugin } from '../plugins/cron' import { InterfacePlugin } from '../plugins/interface' +import { ReplicationPlugin } from '../plugins/replication' export { StarbaseDBDurableObject } from './do' @@ -222,6 +223,11 @@ export default { preventSelectStar: false, }), new QueryLogPlugin({ ctx }), + new ReplicationPlugin({ + tables: ['replication_demo'], + cursorColumn: 'id', + ctx, + }), cdcPlugin, cronPlugin, new StatsPlugin(),