fix: Replicate data from external source to internal source with a Plugin (#72)#219
Open
IbrahimLaeeq wants to merge 1 commit into
Open
fix: Replicate data from external source to internal source with a Plugin (#72)#219IbrahimLaeeq wants to merge 1 commit into
IbrahimLaeeq wants to merge 1 commit into
Conversation
There was a problem hiding this comment.
Pull request overview
Adds a new Replicator plugin intended to pull append-only data from configured external sources into the internal Durable Object SQLite database, with admin management endpoints and tests/docs.
Changes:
- Registers
ReplicatorPluginin the default worker plugin list. - Adds the replicator plugin implementation, tests, README, and metadata.
- Adds a new
pnpm-workspace.yamlbuild-approval-related file.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
src/index.ts |
Enables the new Replicator plugin by default. |
pnpm-workspace.yaml |
Adds pnpm build approval configuration. |
plugins/replicator/README.md |
Documents replicator behavior, endpoints, and polling. |
plugins/replicator/meta.json |
Describes plugin-managed resources. |
plugins/replicator/index.ts |
Implements registration, sync logic, query builders, and admin routes. |
plugins/replicator/index.test.ts |
Adds unit coverage for helpers and core sync behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| params: undefined, | ||
| isRaw: false, | ||
| dataSource: externalDataSource, | ||
| config: this.config ?? ({ role: 'admin' } as any), |
Comment on lines
+264
to
+269
| const where = | ||
| table.last_value !== null && table.last_value !== undefined | ||
| ? ` WHERE ${trackingColumn} > ${this.quoteLiteral(table.last_value)}` | ||
| : '' | ||
|
|
||
| return `SELECT * FROM ${qualifiedName}${where} ORDER BY ${trackingColumn} ASC LIMIT ${Number(table.batch_size) || 1000}` |
Comment on lines
+220
to
+221
| opts.intervalSeconds ?? 300, | ||
| opts.batchSize ?? 1000, |
Comment on lines
+1
to
+3
| allowBuilds: | ||
| esbuild: set this to true or false | ||
| workerd: set this to true or false |
| tracking_column = excluded.tracking_column, | ||
| interval_seconds = excluded.interval_seconds, | ||
| batch_size = excluded.batch_size, | ||
| is_active = excluded.is_active |
Comment on lines
+297
to
+303
| const externalDataSource: DataSource = { | ||
| ...this.dataSource, | ||
| source: | ||
| 'connectionString' in this.dataSource.external | ||
| ? 'hyperdrive' | ||
| : 'external', | ||
| } |
| * each sync the plugin pulls rows where `tracking_column` is greater than the | ||
| * last value it observed and upserts them into the internal table. | ||
| */ | ||
| export class ReplicatorPlugin extends StarbasePlugin { |
| this.config = c?.get('config') | ||
| await this.init() | ||
|
|
||
| if (this.autoSyncOnRequest) { |
Comment on lines
+214
to
+222
| await this.dataSource.rpc.executeQuery({ | ||
| sql: SQL_QUERIES.UPSERT_TABLE, | ||
| params: [ | ||
| opts.table, | ||
| opts.schema ?? null, | ||
| opts.trackingColumn, | ||
| opts.intervalSeconds ?? 300, | ||
| opts.batchSize ?? 1000, | ||
| opts.isActive === false ? 0 : 1, |
Comment on lines
+334
to
+336
| for (const row of rows) { | ||
| const { sql, params } = this.buildUpsertQuery(table.table_name, row) | ||
| await this.dataSource.rpc.executeQuery({ sql, params }) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes #72.
/claim #72
a
tmp_replicator_tablestable (matching thetmp_-prefixed convention of the cron/query-log plugins). Admin endpoints manage it:GET /replicator/tables– listPOST /replicator/tables– register/update a table (table,schema,trackingColumn,intervalSeconds,batchSize,isActive)DELETE /replicator/tables/:table– removePOST /replicator/sync?table=– trigger a sync (all tables or one)tracking_column(e.g.id/created_at); the plugin keeps alast_valuewatermark and only pullsWHERE tracking_column > last_value ORDER BY ... LIMIT batch_size.interval_seconds; withautoSyncOnRequest(default on) theregistermiddleware opportunistically syncs any table whose interval has elapsed, in the background viactx.waitUntil.POST /replicator/syncalso lets an external scheduler / the cron plugin drive cadence.Dialect-aware identifier quoting (backticks for MySQL, double quotes otherwise) and safe literal rendering for the watermark are factored into small public helpers.
plugins/replicator/index.test.ts— 21 tests covering identifier/literal quoting,buildSelectQuery,buildUpsertQuery,isDueinterval logic,registerTable, andsyncTable/sync(including watermark advancement and per-table error isolation).plugins/replicator/README.mdandmeta.json— docs and resource metadata, following the existing plugin layout.Verified against the repository's own test suite before submission.