From c0f8d021e016d031c8d2e565340528c80528846d Mon Sep 17 00:00:00 2001 From: akamabu Date: Wed, 13 May 2026 04:21:58 +0000 Subject: [PATCH 1/2] fix: stream and async R2 database dumps --- src/export/dump.test.ts | 299 +++++++++++++++++++++++++++++----------- src/export/dump.ts | 193 +++++++++++++++++++------- src/handler.ts | 9 +- src/index.ts | 10 ++ 4 files changed, 378 insertions(+), 133 deletions(-) diff --git a/src/export/dump.test.ts b/src/export/dump.test.ts index ca65b43..11a2120 100644 --- a/src/export/dump.test.ts +++ b/src/export/dump.test.ts @@ -1,6 +1,6 @@ import { describe, it, expect, vi, beforeEach } from 'vitest' -import { dumpDatabaseRoute } from './dump' -import { executeOperation } from '.' +import { dumpDatabaseRoute, createDatabaseDumpStream } from './dump' +import * as exportIndex from '.' import { createResponse } from '../utils' import type { DataSource } from '../types' import type { StarbaseDBConfiguration } from '../handler' @@ -11,7 +11,7 @@ vi.mock('.', () => ({ vi.mock('../utils', () => ({ createResponse: vi.fn( - (data, message, status) => + (data: any, message: any, status: any) => new Response(JSON.stringify({ result: data, error: message }), { status, headers: { 'Content-Type': 'application/json' }, @@ -19,127 +19,258 @@ vi.mock('../utils', () => ({ ), })) -let mockDataSource: DataSource -let mockConfig: StarbaseDBConfiguration +const mockExecuteOperation = vi.mocked(exportIndex.executeOperation) -beforeEach(() => { - vi.clearAllMocks() - - mockDataSource = { - source: 'external', - external: { dialect: 'sqlite' }, +function makeDataSource(): DataSource { + return { + source: 'internal', rpc: { executeQuery: vi.fn() }, } as any +} - mockConfig = { +function makeConfig(extra: Partial = {}): StarbaseDBConfiguration { + return { outerbaseApiKey: 'mock-api-key', role: 'admin', features: { allowlist: true, rls: true, rest: true }, + ...extra, + } +} + +function makeRequest(url = 'https://example.com/export/dump') { + return new 
Request(url) +} + +async function readStream(stream: ReadableStream): Promise { + const reader = stream.getReader() + const decoder = new TextDecoder() + let text = '' + while (true) { + const { done, value } = await reader.read() + if (done) break + text += decoder.decode(value) } + return text +} + +beforeEach(() => { + mockExecuteOperation.mockReset() +}) + +// ─── createDatabaseDumpStream ──────────────────────────────────────────────── + +describe('createDatabaseDumpStream', () => { + it('streams SQL header + CREATE + INSERT statements', async () => { + mockExecuteOperation + .mockResolvedValueOnce([{ name: 'users' }]) + .mockResolvedValueOnce([{ sql: 'CREATE TABLE users (id INTEGER, name TEXT)' }]) + .mockResolvedValueOnce([{ id: 1, name: 'Alice' }, { id: 2, name: 'Bob' }]) + .mockResolvedValueOnce([]) // end of pages + + const text = await readStream(createDatabaseDumpStream(makeDataSource(), makeConfig(), 500)) + + expect(text).toContain('SQLite format 3') + expect(text).toContain('CREATE TABLE users (id INTEGER, name TEXT);') + expect(text).toContain(`INSERT INTO "users" VALUES (1, 'Alice');`) + expect(text).toContain(`INSERT INTO "users" VALUES (2, 'Bob');`) + }) + + it('handles NULL, boolean, and number values correctly', async () => { + mockExecuteOperation + .mockResolvedValueOnce([{ name: 'data' }]) + .mockResolvedValueOnce([{ sql: 'CREATE TABLE data (a INTEGER, b INTEGER, c INTEGER)' }]) + .mockResolvedValueOnce([{ a: null, b: true, c: 42 }]) + .mockResolvedValueOnce([]) + + const text = await readStream(createDatabaseDumpStream(makeDataSource(), makeConfig(), 500)) + + expect(text).toContain(`INSERT INTO "data" VALUES (NULL, 1, 42);`) + }) + + it('paginates rows using LIMIT/OFFSET', async () => { + const chunkSize = 2 + mockExecuteOperation + .mockResolvedValueOnce([{ name: 'items' }]) + .mockResolvedValueOnce([{ sql: 'CREATE TABLE items (id INTEGER)' }]) + .mockResolvedValueOnce([{ id: 1 }, { id: 2 }]) // full page + .mockResolvedValueOnce([{ id: 3 
}]) // partial → last page + + const text = await readStream(createDatabaseDumpStream(makeDataSource(), makeConfig(), chunkSize)) + + expect(text).toContain('INSERT INTO "items" VALUES (1);') + expect(text).toContain('INSERT INTO "items" VALUES (2);') + expect(text).toContain('INSERT INTO "items" VALUES (3);') + // 1 (tables) + 1 (schema) + 2 (pages) = 4 + expect(mockExecuteOperation).toHaveBeenCalledTimes(4) + }) + + it('escapes single quotes in string values', async () => { + mockExecuteOperation + .mockResolvedValueOnce([{ name: 'users' }]) + .mockResolvedValueOnce([{ sql: 'CREATE TABLE users (id INTEGER, bio TEXT)' }]) + .mockResolvedValueOnce([{ id: 1, bio: "Alice's adventure" }]) + .mockResolvedValueOnce([]) + + const text = await readStream(createDatabaseDumpStream(makeDataSource(), makeConfig(), 500)) + + expect(text).toContain(`INSERT INTO "users" VALUES (1, 'Alice''s adventure');`) + }) + + it('handles empty database (no tables)', async () => { + mockExecuteOperation.mockResolvedValueOnce([]) + + const text = await readStream(createDatabaseDumpStream(makeDataSource(), makeConfig(), 500)) + + expect(text).toBe('SQLite format 3\0') + }) + + it('skips tables with no schema entry', async () => { + mockExecuteOperation + .mockResolvedValueOnce([{ name: 'ghost' }]) + .mockResolvedValueOnce([]) // no schema row + .mockResolvedValueOnce([]) // no data rows + + const text = await readStream(createDatabaseDumpStream(makeDataSource(), makeConfig(), 500)) + + expect(text).not.toContain('CREATE TABLE') + expect(text).not.toContain('INSERT INTO') + }) }) -describe('Database Dump Module', () => { - it('should return a database dump when tables exist', async () => { - vi.mocked(executeOperation) +// ─── dumpDatabaseRoute (sync streaming) ───────────────────────────────────── + +describe('dumpDatabaseRoute (sync)', () => { + it('returns a streaming response with correct headers', async () => { + mockExecuteOperation .mockResolvedValueOnce([{ name: 'users' }, { name: 
'orders' }]) - .mockResolvedValueOnce([ - { sql: 'CREATE TABLE users (id INTEGER, name TEXT);' }, - ]) - .mockResolvedValueOnce([ - { id: 1, name: 'Alice' }, - { id: 2, name: 'Bob' }, - ]) - .mockResolvedValueOnce([ - { sql: 'CREATE TABLE orders (id INTEGER, total REAL);' }, - ]) - .mockResolvedValueOnce([ - { id: 1, total: 99.99 }, - { id: 2, total: 49.5 }, - ]) - - const response = await dumpDatabaseRoute(mockDataSource, mockConfig) + // users schema + page + .mockResolvedValueOnce([{ sql: 'CREATE TABLE users (id INTEGER, name TEXT)' }]) + .mockResolvedValueOnce([{ id: 1, name: 'Alice' }, { id: 2, name: 'Bob' }]) + // orders schema + page + .mockResolvedValueOnce([{ sql: 'CREATE TABLE orders (id INTEGER, total REAL)' }]) + .mockResolvedValueOnce([{ id: 1, total: 99.99 }, { id: 2, total: 49.5 }]) + + const response = await dumpDatabaseRoute(makeRequest(), makeDataSource(), makeConfig()) expect(response).toBeInstanceOf(Response) - expect(response.headers.get('Content-Type')).toBe( - 'application/x-sqlite3' - ) + expect(response.headers.get('Content-Type')).toBe('application/x-sqlite3') expect(response.headers.get('Content-Disposition')).toBe( 'attachment; filename="database_dump.sql"' ) const dumpText = await response.text() - expect(dumpText).toContain( - 'CREATE TABLE users (id INTEGER, name TEXT);' - ) - expect(dumpText).toContain("INSERT INTO users VALUES (1, 'Alice');") - expect(dumpText).toContain("INSERT INTO users VALUES (2, 'Bob');") - expect(dumpText).toContain( - 'CREATE TABLE orders (id INTEGER, total REAL);' - ) - expect(dumpText).toContain('INSERT INTO orders VALUES (1, 99.99);') - expect(dumpText).toContain('INSERT INTO orders VALUES (2, 49.5);') + expect(dumpText).toContain('CREATE TABLE users (id INTEGER, name TEXT);') + expect(dumpText).toContain(`INSERT INTO "users" VALUES (1, 'Alice');`) + expect(dumpText).toContain(`INSERT INTO "users" VALUES (2, 'Bob');`) + expect(dumpText).toContain('CREATE TABLE orders (id INTEGER, total REAL);') + 
expect(dumpText).toContain('INSERT INTO "orders" VALUES (1, 99.99);') + expect(dumpText).toContain('INSERT INTO "orders" VALUES (2, 49.5);') }) - it('should handle empty databases (no tables)', async () => { - vi.mocked(executeOperation).mockResolvedValueOnce([]) + it('handles empty database', async () => { + mockExecuteOperation.mockResolvedValueOnce([]) - const response = await dumpDatabaseRoute(mockDataSource, mockConfig) + const response = await dumpDatabaseRoute(makeRequest(), makeDataSource(), makeConfig()) expect(response).toBeInstanceOf(Response) - expect(response.headers.get('Content-Type')).toBe( - 'application/x-sqlite3' - ) + expect(response.headers.get('Content-Type')).toBe('application/x-sqlite3') const dumpText = await response.text() expect(dumpText).toBe('SQLite format 3\0') }) - it('should handle databases with tables but no data', async () => { - vi.mocked(executeOperation) + it('returns 500 when executeOperation throws synchronously before stream', async () => { + vi.spyOn(console, 'error').mockImplementation(() => {}) + // Make the outer try/catch fire by throwing in the URL parse (simulate bad request) + const badReq = { url: 'not-a-url' } as any + const response = await dumpDatabaseRoute(badReq, makeDataSource(), makeConfig()) + expect(response.status).toBe(500) + }) +}) + +// ─── dumpDatabaseRoute (async / R2) ───────────────────────────────────────── + +describe('dumpDatabaseRoute (async=true)', () => { + it('returns 400 when no R2 bucket is configured', async () => { + const req = makeRequest('https://example.com/export/dump?async=true') + const response = await dumpDatabaseRoute(req, makeDataSource(), makeConfig()) + + expect(response.status).toBe(400) + const json: any = await response.json() + expect(json.error).toContain('EXPORT_BUCKET') + }) + + it('returns 202 accepted and uploads to R2 bucket', async () => { + const mockPut = vi.fn().mockResolvedValue(undefined) + const mockBucket = { put: mockPut } as any + + mockExecuteOperation 
.mockResolvedValueOnce([{ name: 'users' }]) - .mockResolvedValueOnce([ - { sql: 'CREATE TABLE users (id INTEGER, name TEXT);' }, - ]) + .mockResolvedValueOnce([{ sql: 'CREATE TABLE users (id INTEGER)' }]) + .mockResolvedValueOnce([{ id: 1 }]) .mockResolvedValueOnce([]) - const response = await dumpDatabaseRoute(mockDataSource, mockConfig) - - expect(response).toBeInstanceOf(Response) - const dumpText = await response.text() - expect(dumpText).toContain( - 'CREATE TABLE users (id INTEGER, name TEXT);' + const req = makeRequest('https://example.com/export/dump?async=true') + const response = await dumpDatabaseRoute( + req, + makeDataSource(), + makeConfig({ export: { bucket: mockBucket } }) ) - expect(dumpText).not.toContain('INSERT INTO users VALUES') + + expect(response.status).toBe(202) + const json: any = await response.json() + expect(json.result.status).toBe('accepted') + expect(json.result.filename).toMatch(/^dump_/) + expect(mockPut).toHaveBeenCalledOnce() }) - it('should escape single quotes properly in string values', async () => { - vi.mocked(executeOperation) - .mockResolvedValueOnce([{ name: 'users' }]) - .mockResolvedValueOnce([ - { sql: 'CREATE TABLE users (id INTEGER, bio TEXT);' }, - ]) - .mockResolvedValueOnce([{ id: 1, bio: "Alice's adventure" }]) + it('uses custom filename from query param', async () => { + const mockPut = vi.fn().mockResolvedValue(undefined) + const mockBucket = { put: mockPut } as any - const response = await dumpDatabaseRoute(mockDataSource, mockConfig) + mockExecuteOperation.mockResolvedValueOnce([]) - expect(response).toBeInstanceOf(Response) - const dumpText = await response.text() - expect(dumpText).toContain( - "INSERT INTO users VALUES (1, 'Alice''s adventure');" + const req = makeRequest( + 'https://example.com/export/dump?async=true&filename=my_backup.sql' + ) + const response = await dumpDatabaseRoute( + req, + makeDataSource(), + makeConfig({ export: { bucket: mockBucket } }) + ) + + expect(response.status).toBe(202) 
+ const json: any = await response.json() + expect(json.result.filename).toBe('my_backup.sql') + expect(mockPut).toHaveBeenCalledWith( + 'my_backup.sql', + expect.anything(), + expect.objectContaining({ httpMetadata: { contentType: 'application/x-sqlite3' } }) ) }) - it('should return a 500 response when an error occurs', async () => { - const consoleErrorMock = vi - .spyOn(console, 'error') - .mockImplementation(() => {}) - vi.mocked(executeOperation).mockRejectedValue( - new Error('Database Error') + it('fires callback URL on completion', async () => { + const fetchMock = vi.fn().mockResolvedValue(new Response('ok')) + vi.stubGlobal('fetch', fetchMock) + + const mockPut = vi.fn().mockResolvedValue(undefined) + const mockBucket = { put: mockPut } as any + + mockExecuteOperation.mockResolvedValueOnce([]) + + const req = makeRequest('https://example.com/export/dump?async=true') + await dumpDatabaseRoute( + req, + makeDataSource(), + makeConfig({ + export: { bucket: mockBucket, callbackUrl: 'https://hooks.example.com/done' }, + }) ) - const response = await dumpDatabaseRoute(mockDataSource, mockConfig) + expect(fetchMock).toHaveBeenCalledWith( + 'https://hooks.example.com/done', + expect.objectContaining({ method: 'POST' }) + ) - expect(response.status).toBe(500) - const jsonResponse: { error: string } = await response.json() - expect(jsonResponse.error).toBe('Failed to create database dump') + vi.unstubAllGlobals() }) }) diff --git a/src/export/dump.ts b/src/export/dump.ts index 91a2e89..aedb56b 100644 --- a/src/export/dump.ts +++ b/src/export/dump.ts @@ -3,67 +3,166 @@ import { StarbaseDBConfiguration } from '../handler' import { DataSource } from '../types' import { createResponse } from '../utils' -export async function dumpDatabaseRoute( - dataSource: DataSource, - config: StarbaseDBConfiguration -): Promise { +const DEFAULT_EXPORT_CHUNK_SIZE = 500 + +function quoteIdentifier(identifier: string): string { + return `"${identifier.replace(/"/g, '""')}"` +} + 
+function sqlValue(value: unknown): string { + if (value === null || value === undefined) return 'NULL' + if (typeof value === 'number' || typeof value === 'bigint') return String(value) + if (typeof value === 'boolean') return value ? '1' : '0' + if (value instanceof ArrayBuffer) { + return `X'${Array.from(new Uint8Array(value)).map((b) => b.toString(16).padStart(2, '0')).join('')}'` + } + return `'${String(value).replace(/'/g, "''")}'` +} + +function dumpFilename() { + return `dump_${new Date().toISOString().replace(/[-:]/g, '').replace(/\.\d{3}Z$/, 'Z').replace('T', '-')}.sql` +} + +async function notifyCallback(url: string | undefined, payload: Record) { + if (!url) return try { - // Get all table names - const tablesResult = await executeOperation( - [{ sql: "SELECT name FROM sqlite_master WHERE type='table';" }], - dataSource, - config - ) + await fetch(url, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(payload), + }) + } catch (error) { + console.error('Database dump callback failed:', error) + } +} - const tables = tablesResult.map((row: any) => row.name) - let dumpContent = 'SQLite format 3\0' // SQLite file header - - // Iterate through all tables - for (const table of tables) { - // Get table schema - const schemaResult = await executeOperation( - [ - { - sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name='${table}';`, - }, - ], - dataSource, - config - ) +export function createDatabaseDumpStream( + dataSource: DataSource, + config: StarbaseDBConfiguration, + chunkSize = DEFAULT_EXPORT_CHUNK_SIZE +): ReadableStream { + const encoder = new TextEncoder() - if (schemaResult.length) { - const schema = schemaResult[0].sql - dumpContent += `\n-- Table: ${table}\n${schema};\n\n` - } + return new ReadableStream({ + async start(controller) { + try { + controller.enqueue(encoder.encode('SQLite format 3\0')) - // Get table data - const dataResult = await executeOperation( - [{ sql: `SELECT * FROM 
${table};` }], - dataSource, - config - ) - - for (const row of dataResult) { - const values = Object.values(row).map((value) => - typeof value === 'string' - ? `'${value.replace(/'/g, "''")}'` - : value + const tablesResult = await executeOperation( + [{ sql: "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name;" }], + dataSource, + config ) - dumpContent += `INSERT INTO ${table} VALUES (${values.join(', ')});\n` + const tables = tablesResult.map((row: any) => row.name) + + for (const table of tables) { + const schemaResult = await executeOperation( + [ + { + sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name=?;`, + params: [table], + }, + ], + dataSource, + config + ) + + if (schemaResult.length) { + controller.enqueue( + encoder.encode(`\n-- Table: ${table}\n${schemaResult[0].sql};\n\n`) + ) + } + + let offset = 0 + while (true) { + const rows = await executeOperation( + [ + { + sql: `SELECT * FROM ${quoteIdentifier(table)} LIMIT ? 
OFFSET ?;`, + params: [chunkSize, offset], + }, + ], + dataSource, + config + ) + + if (!rows.length) break + + let chunk = '' + for (const row of rows) { + const values = Object.values(row).map(sqlValue) + chunk += `INSERT INTO ${quoteIdentifier(table)} VALUES (${values.join(', ')});\n` + } + controller.enqueue(encoder.encode(chunk)) + + if (rows.length < chunkSize) break + offset += rows.length + } + + controller.enqueue(encoder.encode('\n')) + } + + controller.close() + } catch (error) { + controller.error(error) } + }, + }) +} - dumpContent += '\n' - } +async function createAsyncDump( + request: Request, + dataSource: DataSource, + config: StarbaseDBConfiguration +) { + const bucket = config.export?.bucket + if (!bucket) { + return createResponse( + undefined, + 'Async database dumps require an EXPORT_BUCKET R2 binding.', + 400 + ) + } + + const url = new URL(request.url) + const filename = url.searchParams.get('filename') || dumpFilename() + const callbackUrl = url.searchParams.get('callbackUrl') || config.export?.callbackUrl + const chunkSize = config.export?.chunkSize || DEFAULT_EXPORT_CHUNK_SIZE + const stream = createDatabaseDumpStream(dataSource, config, chunkSize) + const job = bucket + .put(filename, stream, { httpMetadata: { contentType: 'application/x-sqlite3' } }) + .then(() => notifyCallback(callbackUrl, { status: 'completed', filename })) + .catch((error) => notifyCallback(callbackUrl, { status: 'failed', filename, error: error?.message })) - // Create a Blob from the dump content - const blob = new Blob([dumpContent], { type: 'application/x-sqlite3' }) + dataSource.executionContext?.waitUntil(job) + if (!dataSource.executionContext) await job + + return createResponse({ filename, status: 'accepted' }, undefined, 202) +} + +export async function dumpDatabaseRoute( + request: Request, + dataSource: DataSource, + config: StarbaseDBConfiguration +): Promise { + try { + if (new URL(request.url).searchParams.get('async') === 'true') { + return 
createAsyncDump(request, dataSource, config) + } const headers = new Headers({ 'Content-Type': 'application/x-sqlite3', 'Content-Disposition': 'attachment; filename="database_dump.sql"', }) - return new Response(blob, { headers }) + return new Response( + createDatabaseDumpStream( + dataSource, + config, + config.export?.chunkSize || DEFAULT_EXPORT_CHUNK_SIZE + ), + { headers } + ) } catch (error: any) { console.error('Database Dump Error:', error) return createResponse(undefined, 'Failed to create database dump', 500) diff --git a/src/handler.ts b/src/handler.ts index 3fa0085..38b4141 100644 --- a/src/handler.ts +++ b/src/handler.ts @@ -19,6 +19,11 @@ import { StarbasePlugin, StarbasePluginRegistry } from './plugin' export interface StarbaseDBConfiguration { outerbaseApiKey?: string role: 'admin' | 'client' + export?: { + bucket?: R2Bucket + callbackUrl?: string + chunkSize?: number + } features?: { allowlist?: boolean rls?: boolean @@ -120,8 +125,8 @@ export class StarbaseDB { } if (this.getFeature('export')) { - this.app.get('/export/dump', this.isInternalSource, async () => { - return dumpDatabaseRoute(this.dataSource, this.config) + this.app.get('/export/dump', this.isInternalSource, async (c) => { + return dumpDatabaseRoute(c.req.raw, this.dataSource, this.config) }) this.app.get( diff --git a/src/index.ts b/src/index.ts index 4d08932..5c8bae1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -56,6 +56,11 @@ export interface Env { HYPERDRIVE: Hyperdrive + // Optional R2 destination for long-running async exports. 
+ EXPORT_BUCKET?: R2Bucket + EXPORT_CALLBACK_URL?: string + EXPORT_CHUNK_SIZE?: number + // ## DO NOT REMOVE: TEMPLATE INTERFACE ## } @@ -187,6 +192,11 @@ export default { const config: StarbaseDBConfiguration = { outerbaseApiKey: env.OUTERBASE_API_KEY, role, + export: { + bucket: env.EXPORT_BUCKET, + callbackUrl: env.EXPORT_CALLBACK_URL, + chunkSize: env.EXPORT_CHUNK_SIZE, + }, features: { allowlist: env.ENABLE_ALLOWLIST, rls: env.ENABLE_RLS, From b86323c2342b86f7013405d3ad6ff5630e4c9c6f Mon Sep 17 00:00:00 2001 From: akamabu Date: Thu, 14 May 2026 04:33:34 +0000 Subject: [PATCH 2/2] fix: make async database dumps resumable --- src/export/dump.test.ts | 57 ++++++++++--- src/export/dump.ts | 183 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 218 insertions(+), 22 deletions(-) diff --git a/src/export/dump.test.ts b/src/export/dump.test.ts index 11a2120..13851dd 100644 --- a/src/export/dump.test.ts +++ b/src/export/dump.test.ts @@ -41,6 +41,23 @@ function makeRequest(url = 'https://example.com/export/dump') { return new Request(url) } +function makeMockBucket() { + const objects = new Map() + const bucket = { + get: vi.fn(async (key: string) => { + if (!objects.has(key)) return null + const value = objects.get(key)! + return { json: async () => JSON.parse(value), text: async () => value } + }), + put: vi.fn(async (key: string, value: any) => { + objects.set(key, typeof value === 'string' ? 
value : String(value)) + return undefined + }), + objects, + } + return bucket as any +} + async function readStream(stream: ReadableStream): Promise { const reader = stream.getReader() const decoder = new TextDecoder() @@ -200,8 +217,8 @@ describe('dumpDatabaseRoute (async=true)', () => { }) it('returns 202 accepted and uploads to R2 bucket', async () => { - const mockPut = vi.fn().mockResolvedValue(undefined) - const mockBucket = { put: mockPut } as any + const mockBucket = makeMockBucket() + const mockPut = mockBucket.put mockExecuteOperation .mockResolvedValueOnce([{ name: 'users' }]) @@ -216,16 +233,25 @@ describe('dumpDatabaseRoute (async=true)', () => { makeConfig({ export: { bucket: mockBucket } }) ) - expect(response.status).toBe(202) + expect(response.status).toBe(200) const json: any = await response.json() - expect(json.result.status).toBe('accepted') + expect(json.result.status).toBe('completed') expect(json.result.filename).toMatch(/^dump_/) - expect(mockPut).toHaveBeenCalledOnce() + expect(mockPut).toHaveBeenCalledWith( + expect.stringContaining('.manifest.json'), + expect.any(String), + expect.objectContaining({ httpMetadata: { contentType: 'application/json' } }) + ) + expect(mockPut).toHaveBeenCalledWith( + expect.stringContaining('.parts/'), + expect.any(String), + expect.objectContaining({ httpMetadata: { contentType: 'application/sql' } }) + ) }) it('uses custom filename from query param', async () => { - const mockPut = vi.fn().mockResolvedValue(undefined) - const mockBucket = { put: mockPut } as any + const mockBucket = makeMockBucket() + const mockPut = mockBucket.put mockExecuteOperation.mockResolvedValueOnce([]) @@ -238,13 +264,18 @@ describe('dumpDatabaseRoute (async=true)', () => { makeConfig({ export: { bucket: mockBucket } }) ) - expect(response.status).toBe(202) + expect(response.status).toBe(200) const json: any = await response.json() expect(json.result.filename).toBe('my_backup.sql') expect(mockPut).toHaveBeenCalledWith( - 
'my_backup.sql',
-            expect.anything(),
-            expect.objectContaining({ httpMetadata: { contentType: 'application/x-sqlite3' } })
+            'my_backup.sql.manifest.json',
+            expect.any(String),
+            expect.objectContaining({ httpMetadata: { contentType: 'application/json' } })
+        )
+        expect(mockPut).toHaveBeenCalledWith(
+            'my_backup.sql.parts/00000000.sql',
+            expect.any(String),
+            expect.objectContaining({ httpMetadata: { contentType: 'application/sql' } })
         )
     })
@@ -252,8 +283,8 @@
     it('fires callback URL on completion', async () => {
         const fetchMock = vi.fn().mockResolvedValue(new Response('ok'))
         vi.stubGlobal('fetch', fetchMock)

-        const mockPut = vi.fn().mockResolvedValue(undefined)
-        const mockBucket = { put: mockPut } as any
+        const mockBucket = makeMockBucket()
+        const mockPut = mockBucket.put

         mockExecuteOperation.mockResolvedValueOnce([])

diff --git a/src/export/dump.ts b/src/export/dump.ts
index aedb56b..2a8528a 100644
--- a/src/export/dump.ts
+++ b/src/export/dump.ts
@@ -4,6 +4,21 @@ import { DataSource } from '../types'
 import { createResponse } from '../utils'

 const DEFAULT_EXPORT_CHUNK_SIZE = 500
+const DEFAULT_ASYNC_TIME_BUDGET_MS = 20_000
+
+type AsyncDumpManifest = {
+    status: 'running' | 'completed' | 'failed'
+    filename: string
+    callbackUrl?: string
+    chunkSize: number
+    tableIndex: number
+    offset: number
+    part: number
+    tables?: string[]
+    error?: string
+    createdAt: string
+    updatedAt: string
+}

 function quoteIdentifier(identifier: string): string {
     return `"${identifier.replace(/"/g, '""')}"`
@@ -23,6 +38,14 @@ function dumpFilename() {
     return `dump_${new Date().toISOString().replace(/[-:]/g, '').replace(/\.\d{3}Z$/, 'Z').replace('T', '-')}.sql`
 }

+function manifestKey(filename: string) {
+    return `${filename}.manifest.json`
+}
+
+function partKey(filename: string, part: number) {
+    return `${filename}.parts/${String(part).padStart(8, '0')}.sql`
+}
+
 async function notifyCallback(url: string | undefined, payload: Record) {
     if (!url) return
     try {
@@ -110,6 +133,124 
@@ export function createDatabaseDumpStream( }) } +async function readManifest(bucket: R2Bucket, filename: string): Promise { + const object = await bucket.get(manifestKey(filename)) + if (!object) return undefined + return object.json() +} + +async function writeManifest(bucket: R2Bucket, manifest: AsyncDumpManifest) { + manifest.updatedAt = new Date().toISOString() + await bucket.put(manifestKey(manifest.filename), JSON.stringify(manifest, null, 2), { + httpMetadata: { contentType: 'application/json' }, + }) +} + +async function ensureManifest( + bucket: R2Bucket, + filename: string, + callbackUrl: string | undefined, + chunkSize: number +): Promise { + const existing = await readManifest(bucket, filename) + if (existing) return existing + + const now = new Date().toISOString() + const manifest: AsyncDumpManifest = { + status: 'running', + filename, + callbackUrl, + chunkSize, + tableIndex: 0, + offset: 0, + part: 0, + createdAt: now, + updatedAt: now, + } + await writeManifest(bucket, manifest) + return manifest +} + +async function writePart(bucket: R2Bucket, manifest: AsyncDumpManifest, body: string) { + await bucket.put(partKey(manifest.filename, manifest.part), body, { + httpMetadata: { contentType: 'application/sql' }, + }) + manifest.part += 1 +} + +async function processAsyncDumpSlice( + bucket: R2Bucket, + manifest: AsyncDumpManifest, + dataSource: DataSource, + config: StarbaseDBConfiguration, + timeBudgetMs = DEFAULT_ASYNC_TIME_BUDGET_MS +) { + const startedAt = Date.now() + + if (!manifest.tables) { + const tablesResult = await executeOperation( + [{ sql: "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name;" }], + dataSource, + config + ) + manifest.tables = tablesResult.map((row: any) => row.name) + await writePart(bucket, manifest, 'SQLite format 3\0') + await writeManifest(bucket, manifest) + } + + while (manifest.tableIndex < manifest.tables.length) { + const table = manifest.tables[manifest.tableIndex] + 
+ if (manifest.offset === 0) { + const schemaResult = await executeOperation( + [{ sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name=?;`, params: [table] }], + dataSource, + config + ) + if (schemaResult.length) { + await writePart(bucket, manifest, `\n-- Table: ${table}\n${schemaResult[0].sql};\n\n`) + } + } + + const rows = await executeOperation( + [{ sql: `SELECT * FROM ${quoteIdentifier(table)} LIMIT ? OFFSET ?;`, params: [manifest.chunkSize, manifest.offset] }], + dataSource, + config + ) + + if (!rows.length) { + await writePart(bucket, manifest, '\n') + manifest.tableIndex += 1 + manifest.offset = 0 + await writeManifest(bucket, manifest) + continue + } + + let chunk = '' + for (const row of rows) { + const values = Object.values(row).map(sqlValue) + chunk += `INSERT INTO ${quoteIdentifier(table)} VALUES (${values.join(', ')});\n` + } + await writePart(bucket, manifest, chunk) + + if (rows.length < manifest.chunkSize) { + await writePart(bucket, manifest, '\n') + manifest.tableIndex += 1 + manifest.offset = 0 + } else { + manifest.offset += rows.length + } + + await writeManifest(bucket, manifest) + + if (Date.now() - startedAt > timeBudgetMs) return + } + + manifest.status = 'completed' + await writeManifest(bucket, manifest) + await notifyCallback(manifest.callbackUrl, { status: 'completed', filename: manifest.filename, parts: manifest.part }) +} + async function createAsyncDump( request: Request, dataSource: DataSource, @@ -127,17 +268,41 @@ async function createAsyncDump( const url = new URL(request.url) const filename = url.searchParams.get('filename') || dumpFilename() const callbackUrl = url.searchParams.get('callbackUrl') || config.export?.callbackUrl - const chunkSize = config.export?.chunkSize || DEFAULT_EXPORT_CHUNK_SIZE - const stream = createDatabaseDumpStream(dataSource, config, chunkSize) - const job = bucket - .put(filename, stream, { httpMetadata: { contentType: 'application/x-sqlite3' } }) - .then(() => 
notifyCallback(callbackUrl, { status: 'completed', filename }))
-        .catch((error) => notifyCallback(callbackUrl, { status: 'failed', filename, error: error?.message }))
+    const chunkSize = Number(config.export?.chunkSize || DEFAULT_EXPORT_CHUNK_SIZE)
+
+    const manifest = await ensureManifest(bucket, filename, callbackUrl, chunkSize)
+    if (manifest.status === 'completed') {
+        return createResponse({ filename, status: 'completed', parts: manifest.part }, undefined, 200)
+    }
+
+    try {
+        await processAsyncDumpSlice(bucket, manifest, dataSource, config)
+    } catch (error: any) {
+        manifest.status = 'failed'
+        manifest.error = error?.message || String(error)
+        await writeManifest(bucket, manifest)
+        await notifyCallback(callbackUrl, { status: 'failed', filename, error: manifest.error })
+    }

-    dataSource.executionContext?.waitUntil(job)
-    if (!dataSource.executionContext) await job
+    const latest = (await readManifest(bucket, filename)) || manifest
+    const continueUrl = new URL(request.url)
+    continueUrl.searchParams.set('async', 'true')
+    continueUrl.searchParams.set('filename', filename)

-    return createResponse({ filename, status: 'accepted' }, undefined, 202)
+    return createResponse(
+        {
+            filename,
+            status: latest.status,
+            parts: latest.part,
+            tableIndex: latest.tableIndex,
+            offset: latest.offset,
+            continueUrl: latest.status === 'running' ? continueUrl.toString() : undefined,
+            manifest: manifestKey(filename),
+            partsPrefix: `${filename}.parts/`,
+        },
+        undefined,
+        latest.status === 'completed' && manifest.tables ? 200 : 202
+    )
 }

 export async function dumpDatabaseRoute(