From 3b38a818acf85f19b3b3ecd0466cb8a250dcfa46 Mon Sep 17 00:00:00 2001 From: Andrea Donetti Date: Fri, 26 Jun 2026 18:08:35 -0600 Subject: [PATCH 01/13] feat(payload): prototype positional-cursor resume for chunk vtab MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit cloudsync_payload_chunks could only resume a window from db_version>since, so a chunk boundary that lands inside a single committed db_version (or inside a fragmented oversized value) was not addressable — which is the whole reason the server stages the stream into a cloudsync_payload_spool table to page it out. This makes the vtab resumable by position instead. Add an optional positional cursor: hidden inputs resume_db_version, resume_seq, resume_frag_offset start the scan at (db_version, seq) inclusive and re-enter a mid-value fragment at a byte offset; new outputs next_db_version, next_seq, next_frag_offset and is_final report where the emitted chunk stopped. A stateless /check can then page the whole window with an O(1) seek per call (vs. replaying from since on every call, which is O(N^2) over the whole drain), no spool table, no server-side state. Legacy db_version>since callers (send path, spool fill) are unchanged: columns are appended and the positional branch only activates when resume_db_version is bound. Tiling is exact, not idempotent-overlap: (db_version, seq) is a unique total order (changes rowid = (db_version<<30)|seq), next_* names the row that did not fit or, mid-value, the first byte not yet emitted, and the fragment plan is a deterministic function of the row, so a resumed fragment tiles identically. The drain-start cursor must seek exclusive-after the last applied change (see docs/internal design note) so the protocol never relies on changes-level idempotency to absorb a re-sent row. 
New unit test Payload Chunks Positional Resume drives a window mixing a db_version split across chunks with a value larger than the chunk budget, pages it one chunk per call via the cursor, and asserts byte-identity with a full-window scan (including a mid-fragment resume). Co-Authored-By: Claude Opus 4.8 (1M context) --- src/sqlite/cloudsync_sqlite.c | 167 +++++++++++++++++++++++++++------- test/unit.c | 148 ++++++++++++++++++++++++++++++ 2 files changed, 283 insertions(+), 32 deletions(-) diff --git a/src/sqlite/cloudsync_sqlite.c b/src/sqlite/cloudsync_sqlite.c index ba7a150..bba0478 100644 --- a/src/sqlite/cloudsync_sqlite.c +++ b/src/sqlite/cloudsync_sqlite.c @@ -1144,6 +1144,14 @@ typedef struct { int value_header_len; const char *value_data; int64_t value_data_len; + // Positional-cursor outputs: the resume point AFTER the chunk currently held. + // These live in the per-scan reset region (after eof) so xFilter's bulk memset + // clears them. next_* is the (db_version, seq, frag_offset) a follow-up call + // passes back as resume_* to continue exactly where this chunk stopped. + int64_t next_dbv; + int64_t next_seq; + int64_t next_frag_offset; + bool is_final; } cloudsync_payload_chunks_cursor; static int payload_chunks_connect(sqlite3 *db, void *aux, int argc, const char *const *argv, sqlite3_vtab **vtab, char **err) { @@ -1151,7 +1159,13 @@ static int payload_chunks_connect(sqlite3 *db, void *aux, int argc, const char * int rc = sqlite3_declare_vtab(db, "CREATE TABLE x(payload BLOB, chunk_index INTEGER, payload_size INTEGER, rows INTEGER, " "db_version_min INTEGER, db_version_max INTEGER, watermark_db_version INTEGER, " - "since_db_version HIDDEN, site_id HIDDEN, until_db_version HIDDEN, exclude_filter_site_id HIDDEN)"); + "since_db_version HIDDEN, site_id HIDDEN, until_db_version HIDDEN, exclude_filter_site_id HIDDEN, " + // Positional-cursor outputs (cols 11..14): the resume point after the + // emitted chunk, plus a final-chunk flag. 
A stateless /check passes these + // back as the resume_* inputs (cols 15..17) to continue the drain without + // a spool table — O(1) seek per chunk instead of replaying from since. + "next_db_version INTEGER, next_seq INTEGER, next_frag_offset INTEGER, is_final INTEGER, " + "resume_db_version HIDDEN, resume_seq HIDDEN, resume_frag_offset HIDDEN)"); if (rc != SQLITE_OK) return rc; cloudsync_payload_chunks_vtab *p = sqlite3_malloc64(sizeof(*p)); if (!p) return SQLITE_NOMEM; @@ -1185,19 +1199,23 @@ static int payload_chunks_close(sqlite3_vtab_cursor *cursor) { static int payload_chunks_best_index(sqlite3_vtab *vtab, sqlite3_index_info *idxinfo) { UNUSED_PARAMETER(vtab); + // Assign argvIndex in a canonical hidden-column order so xFilter can read argv + // in a fixed order regardless of how SQLite presents constraints. idxNum bit k + // is set when handled_cols[k] is bound; xFilter reads argv in this same order. + // bit0=since_db_version(7) bit1=site_id(8) bit2=until_db_version(9) + // bit3=exclude_filter_site_id(10) bit4=resume_db_version(15) + // bit5=resume_seq(16) bit6=resume_frag_offset(17) + static const int handled_cols[] = {7, 8, 9, 10, 15, 16, 17}; int argv_index = 1; int idxnum = 0; - // Assign argvIndex in canonical hidden-column order (7..10) so xFilter can - // read argv in a fixed order regardless of how SQLite presents constraints. - // Hidden columns: 7=since_db_version, 8=site_id, 9=until_db_version, - // 10=exclude_filter_site_id. 
- for (int col = 7; col <= 10; ++col) { + for (size_t k = 0; k < sizeof(handled_cols) / sizeof(handled_cols[0]); ++k) { + int col = handled_cols[k]; for (int i = 0; i < idxinfo->nConstraint; ++i) { struct sqlite3_index_constraint *cn = &idxinfo->aConstraint[i]; if (!cn->usable || cn->op != SQLITE_INDEX_CONSTRAINT_EQ || cn->iColumn != col) continue; idxinfo->aConstraintUsage[i].argvIndex = argv_index++; idxinfo->aConstraintUsage[i].omit = 1; - idxnum |= (1 << (col - 7)); + idxnum |= (1 << k); break; // at most one constraint consumed per hidden column } } @@ -1249,6 +1267,33 @@ static int payload_chunks_plan_fragment(cloudsync_payload_chunks_cursor *c) { return SQLITE_OK; } +// Set up fragment state for the current source row (a single value larger than +// max_chunk_size) so emit_fragment can stream it. start_offset is the byte offset +// within the encoded value to resume from (0 when first reaching the value; +// >0 when a positional cursor resumes mid-value). frag_part is derived from the +// offset so the fragment's part index is consistent whether reached by streaming +// or by a seek. The plan (frag_target/frag_count) is a deterministic function of +// the row, so a resumed fragment tiles identically to a streamed one. 
+static int payload_chunks_begin_fragment(cloudsync_payload_chunks_cursor *c, int64_t start_offset) { + dbvalue_t *col_value = (dbvalue_t *)sqlite3_column_value(c->src, 3); + int type = database_value_type(col_value); + if (type != DBTYPE_TEXT && type != DBTYPE_BLOB) return SQLITE_TOOBIG; + int64_t raw_len = 0; + int header_len = cloudsync_payload_encoded_value_header(col_value, c->value_header, sizeof(c->value_header), &raw_len); + if (header_len <= 0) return SQLITE_ERROR; + c->value_header_len = header_len; + c->value_data = (const char *)database_value_blob(col_value); + c->value_data_len = raw_len; + c->frag_total = header_len + raw_len; + c->frag_offset = start_offset; + int rc = payload_chunks_plan_fragment(c); + if (rc != SQLITE_OK) return rc; + c->frag_part = (c->frag_target > 0) ? (int)(start_offset / c->frag_target) : 0; + c->frag_checksum = cloudsync_payload_encoded_value_checksum(col_value); + c->frag_active = true; + return SQLITE_OK; +} + static int payload_chunks_emit_fragment(cloudsync_payload_chunks_cursor *c) { cloudsync_context *data = c->vtab->data; if (c->payload) { cloudsync_memory_free(c->payload); c->payload = NULL; } @@ -1321,23 +1366,9 @@ static int payload_chunks_build_next(cloudsync_payload_chunks_cursor *c) { if ((int64_t)row_size + (int64_t)payload_header_size + CLOUDSYNC_PAYLOAD_CHUNK_SAFETY_MARGIN > max_size) { if (cloudsync_payload_context_nrows(payload) > 0) break; - dbvalue_t *col_value = (dbvalue_t *)rowv[3]; - int type = database_value_type(col_value); - if (type != DBTYPE_TEXT && type != DBTYPE_BLOB) { cloudsync_memory_free(payload); return SQLITE_TOOBIG; } - int64_t raw_len = 0; - int header_len = cloudsync_payload_encoded_value_header(col_value, c->value_header, sizeof(c->value_header), &raw_len); - if (header_len <= 0) { cloudsync_memory_free(payload); return SQLITE_ERROR; } - c->value_header_len = header_len; - c->value_data = (const char *)database_value_blob(col_value); - c->value_data_len = raw_len; - c->frag_total = 
header_len + raw_len; - c->frag_offset = 0; - c->frag_part = 0; - rc = payload_chunks_plan_fragment(c); - if (rc != SQLITE_OK) { cloudsync_memory_free(payload); return rc; } - c->frag_checksum = cloudsync_payload_encoded_value_checksum(col_value); - c->frag_active = true; cloudsync_memory_free(payload); + rc = payload_chunks_begin_fragment(c, 0); + if (rc != SQLITE_OK) return rc; return payload_chunks_emit_fragment(c); } @@ -1360,6 +1391,38 @@ static int payload_chunks_build_next(cloudsync_payload_chunks_cursor *c) { return SQLITE_OK; } +// Record the resume point a stateless caller passes back to continue after the +// chunk just built. Reads the source statement, which is positioned at the next +// unconsumed row (or the same row when a value is still mid-fragment). Must be +// called only after build_next produced a chunk (i.e. !eof). +static void payload_chunks_set_next_cursor(cloudsync_payload_chunks_cursor *c) { + if (c->frag_active) { + // Mid-value: resume the same row at the next byte offset. + c->next_dbv = sqlite3_column_int64(c->src, 5); + c->next_seq = sqlite3_column_int64(c->src, 8); + c->next_frag_offset = c->frag_offset; + c->is_final = false; + } else if (c->has_row) { + // Row boundary: the next chunk starts at the current (unconsumed) row. + c->next_dbv = sqlite3_column_int64(c->src, 5); + c->next_seq = sqlite3_column_int64(c->src, 8); + c->next_frag_offset = 0; + c->is_final = false; + } else { + // Stream exhausted: this was the last chunk of the window. 
+ c->next_dbv = c->watermark; + c->next_seq = 0; + c->next_frag_offset = 0; + c->is_final = true; + } +} + +static int payload_chunks_advance(cloudsync_payload_chunks_cursor *c) { + int rc = payload_chunks_build_next(c); + if (rc == SQLITE_OK && !c->eof) payload_chunks_set_next_cursor(c); + return rc; +} + static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const char *idxstr, int argc, sqlite3_value **argv) { UNUSED_PARAMETER(idxstr); UNUSED_PARAMETER(argc); cloudsync_payload_chunks_cursor *c = (cloudsync_payload_chunks_cursor *)cursor; @@ -1378,6 +1441,13 @@ static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const bool site_id_given = false; int64_t until = 0; bool exclude = false; + // Positional resume cursor (cols 15..17): when resume_db_version is bound the + // scan starts at (resume_db_version, resume_seq) inclusive and the first chunk + // resumes a mid-value fragment at resume_frag_offset, instead of replaying the + // whole window from `since`. Lets a stateless /check page the stream with an + // O(1) seek per call and no spool table. 
+ bool positional = false; + int64_t resume_dbv = 0, resume_seq = 0, resume_frag = 0; if (idxnum & 1) since = sqlite3_value_int64(argv[argi++]); if (idxnum & 2) { if (sqlite3_value_type(argv[argi]) != SQLITE_NULL) { @@ -1389,6 +1459,9 @@ static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const } if (idxnum & 4) until = sqlite3_value_int64(argv[argi++]); if (idxnum & 8) exclude = (sqlite3_value_int(argv[argi++]) != 0); + if (idxnum & 16) { resume_dbv = sqlite3_value_int64(argv[argi++]); positional = true; } + if (idxnum & 32) resume_seq = sqlite3_value_int64(argv[argi++]); + if (idxnum & 64) resume_frag = sqlite3_value_int64(argv[argi++]); // Resolve the site filter: // exclude=true -> all sites except filter_site_id (CHECK path); site required @@ -1421,24 +1494,50 @@ static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const } c->watermark = until; - char *sql = sqlite3_mprintf( - "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " - "FROM cloudsync_changes WHERE db_version>? AND site_id%s? AND db_version<=? ORDER BY db_version, seq ASC", - site_op); + // Window upper bound is always `until`. The lower bound is either the legacy + // exclusive `since` (db_version > since) or the inclusive positional cursor + // (db_version, seq) >= (resume_dbv, resume_seq). + char *sql; + if (positional) { + sql = sqlite3_mprintf( + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes WHERE db_version<=? AND site_id%s? AND " + "(db_version>? OR (db_version=? AND seq>=?)) ORDER BY db_version, seq ASC", + site_op); + } else { + sql = sqlite3_mprintf( + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes WHERE db_version>? AND site_id%s? AND db_version<=? 
ORDER BY db_version, seq ASC", + site_op); + } if (!sql) return SQLITE_NOMEM; int rc = sqlite3_prepare_v2(c->vtab->db, sql, -1, &c->src, NULL); sqlite3_free(sql); if (rc != SQLITE_OK) return rc; - sqlite3_bind_int64(c->src, 1, since); - sqlite3_bind_blob(c->src, 2, site_id, site_id_len, SQLITE_TRANSIENT); - sqlite3_bind_int64(c->src, 3, until); + if (positional) { + sqlite3_bind_int64(c->src, 1, until); + sqlite3_bind_blob(c->src, 2, site_id, site_id_len, SQLITE_TRANSIENT); + sqlite3_bind_int64(c->src, 3, resume_dbv); + sqlite3_bind_int64(c->src, 4, resume_dbv); + sqlite3_bind_int64(c->src, 5, resume_seq); + } else { + sqlite3_bind_int64(c->src, 1, since); + sqlite3_bind_blob(c->src, 2, site_id, site_id_len, SQLITE_TRANSIENT); + sqlite3_bind_int64(c->src, 3, until); + } rc = payload_chunks_step_source(c); if (rc != SQLITE_OK) return rc; - return payload_chunks_build_next(c); + // Resuming inside a value that was fragmented across chunks: the first row is + // that value; re-establish the fragment plan and skip to resume_frag. 
+ if (positional && resume_frag > 0 && c->has_row) { + rc = payload_chunks_begin_fragment(c, resume_frag); + if (rc != SQLITE_OK) return rc; + } + return payload_chunks_advance(c); } static int payload_chunks_next(sqlite3_vtab_cursor *cursor) { - return payload_chunks_build_next((cloudsync_payload_chunks_cursor *)cursor); + return payload_chunks_advance((cloudsync_payload_chunks_cursor *)cursor); } static int payload_chunks_eof(sqlite3_vtab_cursor *cursor) { @@ -1455,6 +1554,10 @@ static int payload_chunks_column(sqlite3_vtab_cursor *cursor, sqlite3_context *c case 4: sqlite3_result_int64(ctx, c->dbv_min); break; case 5: sqlite3_result_int64(ctx, c->dbv_max); break; case 6: sqlite3_result_int64(ctx, c->watermark); break; + case 11: sqlite3_result_int64(ctx, c->next_dbv); break; + case 12: sqlite3_result_int64(ctx, c->next_seq); break; + case 13: sqlite3_result_int64(ctx, c->next_frag_offset); break; + case 14: sqlite3_result_int(ctx, c->is_final ? 1 : 0); break; default: sqlite3_result_null(ctx); break; } return SQLITE_OK; diff --git a/test/unit.c b/test/unit.c index c23247c..88c4eff 100644 --- a/test/unit.c +++ b/test/unit.c @@ -12556,6 +12556,153 @@ bool do_test_payload_chunks_split_dbversion (bool print_result, bool cleanup_dat return result; } +// Proves the positional-cursor resume of cloudsync_payload_chunks: paging the +// window one chunk per call with an O(1) (db_version, seq, frag_offset) seek +// yields byte-identical chunks to a single full-window scan. The dataset mixes a +// db_version split across chunks (row-boundary resumes, incl. resumes landing +// INSIDE a single committed version that the old db_version>since cursor could not +// express) with a value larger than the chunk budget (mid-fragment resumes). 
+bool do_test_payload_chunks_positional_resume (bool print_result, bool cleanup_databases) { + sqlite3 *db = NULL; + sqlite3_stmt *stmt = NULL; + test_payload_chunk *base = NULL; int base_count = 0, base_cap = 0; + test_payload_chunk *pos = NULL; int pos_count = 0, pos_cap = 0; + int64_t watermark = -1; + bool result = false; + int rc = SQLITE_OK; + + time_t timestamp = time(NULL); + int saved_counter = test_counter++; + + db = do_create_database_file(0, timestamp, saved_counter); + if (!db) goto finalize; + rc = sqlite3_exec(db, + "CREATE TABLE split_test (id TEXT PRIMARY KEY, body TEXT DEFAULT '');" + "SELECT cloudsync_init('split_test');" + "SELECT cloudsync_set('payload_max_chunk_size', '262144');", + NULL, NULL, NULL); + if (rc != SQLITE_OK) goto finalize; + + // tx1: ~500 medium rows in one transaction -> one db_version split across + // several v2 chunks (row-boundary resumes within a single version). + rc = sqlite3_exec(db, + "WITH RECURSIVE c(i) AS (SELECT 1 UNION ALL SELECT i+1 FROM c WHERE i < 500) " + "INSERT INTO split_test(id, body) SELECT printf('row-%04d', i), hex(randomblob(700)) FROM c;", + NULL, NULL, NULL); + if (rc != SQLITE_OK) goto finalize; + + // tx2: one value far larger than the chunk budget -> v3 fragments across + // several chunks (mid-fragment resumes inside a single value). + rc = sqlite3_exec(db, + "INSERT INTO split_test(id, body) VALUES ('big', hex(randomblob(900000)));", + NULL, NULL, NULL); + if (rc != SQLITE_OK) goto finalize; + + // Baseline: every chunk of the whole window, in order, via the legacy scan. 
+ rc = sqlite3_prepare_v2(db, + "SELECT payload, watermark_db_version FROM cloudsync_payload_chunks " + "WHERE since_db_version=0 ORDER BY chunk_index;", -1, &stmt, NULL); + if (rc != SQLITE_OK) goto finalize; + while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) { + int len = sqlite3_column_bytes(stmt, 0); + const void *payload = sqlite3_column_blob(stmt, 0); + if (!payload || len <= 0) goto finalize; + watermark = sqlite3_column_int64(stmt, 1); + if (base_count == base_cap) { + int nc = base_cap ? base_cap * 2 : 8; + test_payload_chunk *t = realloc(base, sizeof(*t) * nc); + if (!t) goto finalize; + memset(t + base_cap, 0, sizeof(*t) * (nc - base_cap)); + base = t; base_cap = nc; + } + base[base_count].data = malloc(len); + if (!base[base_count].data) goto finalize; + memcpy(base[base_count].data, payload, len); + base[base_count].len = len; + ++base_count; + } + if (rc != SQLITE_DONE) goto finalize; + sqlite3_finalize(stmt); stmt = NULL; + + // Scenario must actually exercise multiple chunks (and thus resumes). + if (base_count < 4 || watermark <= 0) goto finalize; + + // Positional drain: one chunk per call, seeking to the cursor the previous + // chunk reported. until is the frozen watermark from the baseline. + rc = sqlite3_prepare_v2(db, + "SELECT payload, next_db_version, next_seq, next_frag_offset, is_final " + "FROM cloudsync_payload_chunks " + "WHERE until_db_version=?1 AND resume_db_version=?2 AND resume_seq=?3 AND resume_frag_offset=?4 " + "LIMIT 1;", -1, &stmt, NULL); + if (rc != SQLITE_OK) goto finalize; + + int64_t rdbv = 0, rseq = 0, rfrag = 0; + bool done = false; + bool saw_frag_resume = false; // a follow-up call actually resumed mid-value + // Hard cap guards against a resume bug looping forever. 
+ for (int guard = 0; !done && guard <= base_count + 2; ++guard) { + if (rfrag > 0) saw_frag_resume = true; + sqlite3_reset(stmt); + sqlite3_bind_int64(stmt, 1, watermark); + sqlite3_bind_int64(stmt, 2, rdbv); + sqlite3_bind_int64(stmt, 3, rseq); + sqlite3_bind_int64(stmt, 4, rfrag); + rc = sqlite3_step(stmt); + if (rc != SQLITE_ROW) goto finalize; // every step before is_final must yield a chunk + int len = sqlite3_column_bytes(stmt, 0); + const void *payload = sqlite3_column_blob(stmt, 0); + if (!payload || len <= 0) goto finalize; + rdbv = sqlite3_column_int64(stmt, 1); + rseq = sqlite3_column_int64(stmt, 2); + rfrag = sqlite3_column_int64(stmt, 3); + done = sqlite3_column_int(stmt, 4) != 0; + if (pos_count == pos_cap) { + int nc = pos_cap ? pos_cap * 2 : 8; + test_payload_chunk *t = realloc(pos, sizeof(*t) * nc); + if (!t) goto finalize; + memset(t + pos_cap, 0, sizeof(*t) * (nc - pos_cap)); + pos = t; pos_cap = nc; + } + pos[pos_count].data = malloc(len); + if (!pos[pos_count].data) goto finalize; + memcpy(pos[pos_count].data, payload, len); + pos[pos_count].len = len; + ++pos_count; + } + sqlite3_finalize(stmt); stmt = NULL; + + // The positional drain must terminate exactly on is_final, reproduce the + // baseline chunk sequence byte-for-byte, and have actually exercised a + // mid-value (fragment) resume — not only row-boundary resumes. + if (!done || pos_count != base_count || !saw_frag_resume) goto finalize; + for (int i = 0; i < base_count; ++i) { + if (pos[i].len != base[i].len) goto finalize; + if (memcmp(pos[i].data, base[i].data, base[i].len) != 0) goto finalize; + } + + result = true; + +finalize: + if (!result && print_result) { + printf("do_test_payload_chunks_positional_resume error: %s (base=%d, pos=%d, watermark=%lld)\n", + db ? 
sqlite3_errmsg(db) : "no db", base_count, pos_count, (long long)watermark); + } + if (stmt) sqlite3_finalize(stmt); + test_payload_chunks_free(base, base_count); + test_payload_chunks_free(pos, pos_count); + if (db) close_db(db); + if (cleanup_databases) { + char path[256], walpath[300], shmpath[300]; + do_build_database_path(path, 0, timestamp, saved_counter); + snprintf(walpath, sizeof(walpath), "%s-wal", path); + snprintf(shmpath, sizeof(shmpath), "%s-shm", path); + file_delete_internal(path); + file_delete_internal(walpath); + file_delete_internal(shmpath); + } + return result; +} + // Exercises the server-side download spool: cloudsync_payload_spool_fill stages a // window's whole chunk stream once, and the /check path pages it out one chunk per // call. Verifies byte-identity with direct cloudsync_payload_chunks generation, @@ -13196,6 +13343,7 @@ int main (int argc, const char * argv[]) { result += test_report("Payload Chunks Large Values:", do_test_payload_chunks_large_values(print_result, cleanup_databases)); result += test_report("Payload Chunks Site Exclusion:", do_test_payload_chunks_site_exclusion(print_result, cleanup_databases)); result += test_report("Payload Chunks Split db_version:", do_test_payload_chunks_split_dbversion(print_result, cleanup_databases)); + result += test_report("Payload Chunks Positional Resume:", do_test_payload_chunks_positional_resume(print_result, cleanup_databases)); result += test_report("Payload Download Spool:", do_test_payload_spool(print_result, cleanup_databases)); // close local database From 889d2a0c95eb188eae24a5769e54d1eb47911fb2 Mon Sep 17 00:00:00 2001 From: Andrea Donetti Date: Mon, 29 Jun 2026 15:26:50 -0600 Subject: [PATCH 02/13] feat(postgres): positional-cursor resume for cloudsync_payload_chunks SRF Mirror the SQLite vtab's positional cursor on the PostgreSQL SRF so the /check job can page the node one chunk per round-trip without the cloudsync_payload_spool table. 
Adds inputs resume_db_version, resume_seq, resume_frag_offset (start the scan at (db_version, seq) inclusive and re-enter a mid-value fragment at a byte offset) and outputs next_db_version, next_seq, next_frag_offset, is_final (where the emitted chunk stopped). The positional query reuses the (db_version > $ OR (db_version = $ AND seq >= $)) shape already used by payload_blob_checked's estimate, with an inclusive seq to match the vtab's exact tiling. Fragment setup is factored into payload_chunks_pg_begin_fragment so a streamed and a resumed fragment build identically; the next cursor is read from the buffered source row (or the same row mid-fragment), peeking one row ahead to set is_final. Legacy callers (send path, spool fill) are unchanged: new args default to NULL so the positional branch only activates when resume_db_version is bound, and the existing output columns keep their positions. New test 55_payload_chunks_positional_resume.sql resumes at every non-final chunk's reported cursor and asserts the fetched chunk equals the next chunk of a full-window scan byte-for-byte, including a mid-fragment resume. Verified against PostgreSQL 17; existing chunk/spool/fragment tests (52-54) still pass. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- src/postgresql/cloudsync.sql.in | 11 +- src/postgresql/cloudsync_postgresql.c | 157 +++++++++++++----- .../migrations/cloudsync--1.0--1.1.sql | 11 +- .../55_payload_chunks_positional_resume.sql | 88 ++++++++++ test/postgresql/full_test.sql | 1 + 5 files changed, 219 insertions(+), 49 deletions(-) create mode 100644 test/postgresql/55_payload_chunks_positional_resume.sql diff --git a/src/postgresql/cloudsync.sql.in b/src/postgresql/cloudsync.sql.in index 31bb994..3dd19f1 100644 --- a/src/postgresql/cloudsync.sql.in +++ b/src/postgresql/cloudsync.sql.in @@ -153,7 +153,10 @@ CREATE OR REPLACE FUNCTION cloudsync_payload_chunks( since_db_version bigint DEFAULT NULL, filter_site_id bytea DEFAULT NULL, until_db_version bigint DEFAULT NULL, - exclude_filter_site_id boolean DEFAULT false + exclude_filter_site_id boolean DEFAULT false, + resume_db_version bigint DEFAULT NULL, + resume_seq bigint DEFAULT NULL, + resume_frag_offset bigint DEFAULT NULL ) RETURNS TABLE ( payload bytea, @@ -162,7 +165,11 @@ RETURNS TABLE ( rows bigint, db_version_min bigint, db_version_max bigint, - watermark_db_version bigint + watermark_db_version bigint, + next_db_version bigint, + next_seq bigint, + next_frag_offset bigint, + is_final boolean ) AS 'MODULE_PATHNAME', 'cloudsync_payload_chunks' LANGUAGE C VOLATILE; diff --git a/src/postgresql/cloudsync_postgresql.c b/src/postgresql/cloudsync_postgresql.c index d402a52..3f62d2f 100644 --- a/src/postgresql/cloudsync_postgresql.c +++ b/src/postgresql/cloudsync_postgresql.c @@ -1213,6 +1213,46 @@ static bytea *payload_chunks_emit_pg_fragment(PayloadChunksState *st, cloudsync_ return result; } +// Set up fragment state for the currently-fetched oversized value so +// emit_pg_fragment can stream it. start_offset is the byte offset within the value +// to resume from (0 when first reaching it; >0 when a positional cursor resumes +// mid-value). 
frag_part is derived from the offset so a streamed and a resumed +// fragment carry the same part index. The plan (frag_target/frag_count) is a +// deterministic function of the row, so a resumed fragment tiles identically. +static void payload_chunks_pg_begin_fragment(PayloadChunksState *st, cloudsync_context *data, int64 start_offset) { + st->frag_total = VARSIZE_ANY_EXHDR(st->col_value); + st->frag_offset = start_offset; + st->frag_target = cloudsync_payload_fragment_data_size(data, + st->tbl, -1, + VARDATA_ANY(st->pk), VARSIZE_ANY_EXHDR(st->pk), + st->col_name, -1, + st->col_version, st->db_version, + VARDATA_ANY(st->site_id), VARSIZE_ANY_EXHDR(st->site_id), + st->cl, st->seq, + st->frag_total, 0, 1); + if (st->frag_target <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload fragment metadata exceeds max chunk size"))); + for (int i = 0; i < CLOUDSYNC_PAYLOAD_FRAGMENT_SIZE_FIXPOINT_ITERATIONS; ++i) { + int count = cloudsync_payload_fragment_count(st->frag_total, st->frag_target); + if (count <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload requires too many fragments"))); + int planned = cloudsync_payload_fragment_data_size(data, + st->tbl, -1, + VARDATA_ANY(st->pk), VARSIZE_ANY_EXHDR(st->pk), + st->col_name, -1, + st->col_version, st->db_version, + VARDATA_ANY(st->site_id), VARSIZE_ANY_EXHDR(st->site_id), + st->cl, st->seq, + st->frag_total, count - 1, count); + if (planned <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload fragment metadata exceeds max chunk size"))); + if (planned == st->frag_target) break; + st->frag_target = planned; + } + st->frag_count = cloudsync_payload_fragment_count(st->frag_total, st->frag_target); + if (st->frag_count <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("payload requires too many fragments"))); + st->frag_part = 
(st->frag_target > 0) ? (int)(start_offset / st->frag_target) : 0; + st->frag_checksum = pk_checksum(VARDATA_ANY(st->col_value), (size_t)st->frag_total); + st->frag_active = true; +} + static bytea *payload_chunks_build_pg_next(PayloadChunksState *st, cloudsync_context *data, int64 *rows, int64 *dbv_min, int64 *dbv_max) { *rows = *dbv_min = *dbv_max = 0; @@ -1237,37 +1277,7 @@ static bytea *payload_chunks_build_pg_next(PayloadChunksState *st, cloudsync_con if ((int64)row_size + (int64)header_size + CLOUDSYNC_PAYLOAD_CHUNK_SAFETY_MARGIN > st->max_size) { if (cloudsync_payload_context_nrows(payload) > 0) break; - st->frag_total = VARSIZE_ANY_EXHDR(st->col_value); - st->frag_offset = 0; - st->frag_part = 0; - st->frag_target = cloudsync_payload_fragment_data_size(data, - st->tbl, -1, - VARDATA_ANY(st->pk), VARSIZE_ANY_EXHDR(st->pk), - st->col_name, -1, - st->col_version, st->db_version, - VARDATA_ANY(st->site_id), VARSIZE_ANY_EXHDR(st->site_id), - st->cl, st->seq, - st->frag_total, 0, 1); - if (st->frag_target <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload fragment metadata exceeds max chunk size"))); - for (int i = 0; i < CLOUDSYNC_PAYLOAD_FRAGMENT_SIZE_FIXPOINT_ITERATIONS; ++i) { - int count = cloudsync_payload_fragment_count(st->frag_total, st->frag_target); - if (count <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload requires too many fragments"))); - int planned = cloudsync_payload_fragment_data_size(data, - st->tbl, -1, - VARDATA_ANY(st->pk), VARSIZE_ANY_EXHDR(st->pk), - st->col_name, -1, - st->col_version, st->db_version, - VARDATA_ANY(st->site_id), VARSIZE_ANY_EXHDR(st->site_id), - st->cl, st->seq, - st->frag_total, count - 1, count); - if (planned <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload fragment metadata exceeds max chunk size"))); - if (planned == 
st->frag_target) break; - st->frag_target = planned; - } - st->frag_count = cloudsync_payload_fragment_count(st->frag_total, st->frag_target); - if (st->frag_count <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("payload requires too many fragments"))); - st->frag_checksum = pk_checksum(VARDATA_ANY(st->col_value), (size_t)st->frag_total); - st->frag_active = true; + payload_chunks_pg_begin_fragment(st, data, 0); cloudsync_memory_free(payload); return payload_chunks_emit_pg_fragment(st, data, rows, dbv_min, dbv_max); } @@ -1327,6 +1337,15 @@ Datum cloudsync_payload_chunks(PG_FUNCTION_ARGS) { int64 since = PG_ARGISNULL(0) ? dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_SEND_DBVERSION) : PG_GETARG_INT64(0); bytea *site_id = PG_ARGISNULL(1) ? NULL : PG_GETARG_BYTEA_PP(1); bool exclude = PG_ARGISNULL(3) ? false : PG_GETARG_BOOL(3); + // Positional resume cursor: when resume_db_version is given the scan starts + // at (resume_db_version, resume_seq) inclusive and the first chunk resumes a + // mid-value fragment at resume_frag_offset, instead of replaying from `since`. + // Lets the /check job page one chunk per round-trip with an O(1) seek and no + // spool table. + bool positional = !PG_ARGISNULL(4); + int64 resume_dbv = PG_ARGISNULL(4) ? 0 : PG_GETARG_INT64(4); + int64 resume_seq = PG_ARGISNULL(5) ? 0 : PG_GETARG_INT64(5); + int64 resume_frag = PG_ARGISNULL(6) ? 
0 : PG_GETARG_INT64(6); // Site filter resolution: // exclude=true -> all sites except filter_site_id (CHECK path); site required // filter given -> only that site @@ -1361,23 +1380,53 @@ Datum cloudsync_payload_chunks(PG_FUNCTION_ARGS) { StringInfoData q; initStringInfo(&q); - if (exclude) { - // $1=since (into changes_select), $2=site to exclude, $3=until watermark - appendStringInfoString(&q, - "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " - "FROM cloudsync_changes_select($1,NULL) WHERE site_id <> $2 AND db_version <= $3 ORDER BY db_version, seq ASC"); + if (positional) { + // Inclusive positional lower bound (db_version, seq) >= (resume_dbv, + // resume_seq) within db_version <= until. $1=site, $2=until, $3=resume_dbv, + // $4=resume_seq. (seq >= matches the SQLite vtab's exact tiling; contrast + // with payload_blob_checked's exclusive seq > for its last-applied cursor.) + if (exclude) { + appendStringInfoString(&q, + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes_select(0,NULL) " + "WHERE site_id <> $1 AND db_version <= $2 AND (db_version > $3 OR (db_version = $3 AND seq >= $4)) " + "ORDER BY db_version, seq ASC"); + } else { + appendStringInfoString(&q, + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes_select(0,$1) " + "WHERE db_version <= $2 AND (db_version > $3 OR (db_version = $3 AND seq >= $4)) " + "ORDER BY db_version, seq ASC"); + } + Oid argtypes[4] = {BYTEAOID, INT8OID, INT8OID, INT8OID}; + Datum values[4] = {PointerGetDatum(site_id), Int64GetDatum(until), Int64GetDatum(resume_dbv), Int64GetDatum(resume_seq)}; + char nulls[4] = {' ', ' ', ' ', ' '}; + st->portal = SPI_cursor_open_with_args(NULL, q.data, 4, argtypes, values, nulls, true, 0); } else { - appendStringInfoString(&q, - "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " - "FROM 
cloudsync_changes_select($1,$2) WHERE db_version <= $3 ORDER BY db_version, seq ASC"); + if (exclude) { + // $1=since (into changes_select), $2=site to exclude, $3=until watermark + appendStringInfoString(&q, + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes_select($1,NULL) WHERE site_id <> $2 AND db_version <= $3 ORDER BY db_version, seq ASC"); + } else { + appendStringInfoString(&q, + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes_select($1,$2) WHERE db_version <= $3 ORDER BY db_version, seq ASC"); + } + Oid argtypes[3] = {INT8OID, BYTEAOID, INT8OID}; + Datum values[3] = {Int64GetDatum(since), PointerGetDatum(site_id), Int64GetDatum(until)}; + char nulls[3] = {' ', ' ', ' '}; + st->portal = SPI_cursor_open_with_args(NULL, q.data, 3, argtypes, values, nulls, true, 0); } - Oid argtypes[3] = {INT8OID, BYTEAOID, INT8OID}; - Datum values[3] = {Int64GetDatum(since), PointerGetDatum(site_id), Int64GetDatum(until)}; - char nulls[3] = {' ', ' ', ' '}; - st->portal = SPI_cursor_open_with_args(NULL, q.data, 3, argtypes, values, nulls, true, 0); pfree(q.data); if (!st->portal) ereport(ERROR, (errmsg("SPI_cursor_open failed"))); + // Resuming inside a value that was fragmented across chunks: the first row is + // that value; re-establish the fragment plan and skip to resume_frag. 
+ if (positional && resume_frag > 0 && payload_chunks_fetch_current(st)) { + payload_chunks_pg_begin_fragment(st, data, resume_frag); + } + TupleDesc outdesc; if (get_call_result_type(fcinfo, NULL, &outdesc) != TYPEFUNC_COMPOSITE) ereport(ERROR, (errmsg("return type must be composite"))); st->outdesc = BlessTupleDesc(outdesc); @@ -1400,8 +1449,22 @@ Datum cloudsync_payload_chunks(PG_FUNCTION_ARGS) { SRF_RETURN_DONE(funcctx); } - Datum outvals[7]; - bool outnulls[7] = {false,false,false,false,false,false,false}; + // Resume point a stateless caller passes back to continue after this chunk. + // frag_active -> same value, next byte offset; otherwise peek the next row + // (buffered for the following build call): a row -> its (db_version, seq); + // end of stream -> this was the final chunk. + int64 next_dbv, next_seq, next_frag; + bool is_final; + if (st->frag_active) { + next_dbv = st->db_version; next_seq = st->seq; next_frag = st->frag_offset; is_final = false; + } else if (payload_chunks_fetch_current(st)) { + next_dbv = st->db_version; next_seq = st->seq; next_frag = 0; is_final = false; + } else { + next_dbv = st->watermark; next_seq = 0; next_frag = 0; is_final = true; + } + + Datum outvals[11]; + bool outnulls[11] = {false,false,false,false,false,false,false,false,false,false,false}; outvals[0] = PointerGetDatum(payload); outvals[1] = Int64GetDatum(st->chunk_index++); outvals[2] = Int64GetDatum(VARSIZE_ANY_EXHDR(payload)); @@ -1409,6 +1472,10 @@ Datum cloudsync_payload_chunks(PG_FUNCTION_ARGS) { outvals[4] = Int64GetDatum(dbv_min); outvals[5] = Int64GetDatum(dbv_max); outvals[6] = Int64GetDatum(st->watermark); + outvals[7] = Int64GetDatum(next_dbv); + outvals[8] = Int64GetDatum(next_seq); + outvals[9] = Int64GetDatum(next_frag); + outvals[10] = BoolGetDatum(is_final); HeapTuple outtup = heap_form_tuple(st->outdesc, outvals, outnulls); SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(outtup)); } diff --git a/src/postgresql/migrations/cloudsync--1.0--1.1.sql 
b/src/postgresql/migrations/cloudsync--1.0--1.1.sql index 9239722..5566999 100644 --- a/src/postgresql/migrations/cloudsync--1.0--1.1.sql +++ b/src/postgresql/migrations/cloudsync--1.0--1.1.sql @@ -13,7 +13,10 @@ CREATE OR REPLACE FUNCTION cloudsync_payload_chunks( since_db_version bigint DEFAULT NULL, filter_site_id bytea DEFAULT NULL, until_db_version bigint DEFAULT NULL, - exclude_filter_site_id boolean DEFAULT false + exclude_filter_site_id boolean DEFAULT false, + resume_db_version bigint DEFAULT NULL, + resume_seq bigint DEFAULT NULL, + resume_frag_offset bigint DEFAULT NULL ) RETURNS TABLE ( payload bytea, @@ -22,7 +25,11 @@ RETURNS TABLE ( rows bigint, db_version_min bigint, db_version_max bigint, - watermark_db_version bigint + watermark_db_version bigint, + next_db_version bigint, + next_seq bigint, + next_frag_offset bigint, + is_final boolean ) AS 'MODULE_PATHNAME', 'cloudsync_payload_chunks' LANGUAGE C VOLATILE; diff --git a/test/postgresql/55_payload_chunks_positional_resume.sql b/test/postgresql/55_payload_chunks_positional_resume.sql new file mode 100644 index 0000000..6bd482f --- /dev/null +++ b/test/postgresql/55_payload_chunks_positional_resume.sql @@ -0,0 +1,88 @@ +-- Payload chunks positional-cursor resume +-- +-- Proves the positional cursor on cloudsync_payload_chunks tiles a window exactly: +-- resuming at any chunk's (next_db_version, next_seq, next_frag_offset) reproduces +-- the following chunk byte-for-byte, including boundaries that fall inside a single +-- committed db_version and inside a value larger than the chunk budget. No spool +-- table, no idempotent overlap. 
+ +\set testid '55-positional' +\ir helper_test_init.sql + +\connect postgres +\ir helper_psql_conn_setup.sql +DROP DATABASE IF EXISTS cloudsync_test_55_positional; +CREATE DATABASE cloudsync_test_55_positional; + +\connect cloudsync_test_55_positional +\ir helper_psql_conn_setup.sql +CREATE EXTENSION IF NOT EXISTS cloudsync; +CREATE TABLE split_test (id TEXT PRIMARY KEY, body BYTEA DEFAULT '\x'::bytea); +SELECT cloudsync_init('split_test', 'CLS', 1) AS _init \gset +SELECT cloudsync_set('payload_max_chunk_size', '262144'); + +-- tx1: many medium incompressible rows in one statement -> a single db_version +-- split across several chunks (row-boundary resumes, incl. resumes landing INSIDE +-- one committed version that the legacy since>db_version cursor could not express). +INSERT INTO split_test(id, body) +SELECT format('row-%s', lpad(i::text, 4, '0')), + decode((SELECT string_agg(md5((i * 1000 + j)::text), '') FROM generate_series(1, 88) AS s(j)), 'hex') +FROM generate_series(1, 500) AS g(i); + +-- tx2: one value larger than the chunk budget -> v3 fragments (mid-fragment resumes). +INSERT INTO split_test(id, body) +VALUES ('big', decode((SELECT string_agg(md5(j::text), '') FROM generate_series(1, 30000) AS s(j)), 'hex')); + +-- For each non-final chunk of the full-window scan, resume at its reported cursor +-- and fetch the first chunk; it must equal the next chunk of the full scan. The +-- correlated SRF subquery uses ORDER BY ... LIMIT 1 so each resume call drains +-- fully (no early-terminated value-per-call SRF). 
+WITH base AS ( + SELECT chunk_index, payload, next_db_version, next_seq, next_frag_offset, is_final, watermark_db_version + FROM cloudsync_payload_chunks(0, cloudsync_siteid(), NULL, false) +), +resumed AS ( + SELECT b.chunk_index, + (b.next_frag_offset > 0) AS is_frag_boundary, + (SELECT r.payload + FROM cloudsync_payload_chunks(NULL, cloudsync_siteid(), + (SELECT max(watermark_db_version) FROM base), false, + b.next_db_version, b.next_seq, b.next_frag_offset) r + ORDER BY r.chunk_index LIMIT 1) AS next_payload + FROM base b + WHERE NOT b.is_final +) +SELECT + (SELECT count(*) FROM base) AS base_count, + coalesce((SELECT bool_and(r.next_payload = b2.payload) + FROM resumed r JOIN base b2 ON b2.chunk_index = r.chunk_index + 1), false) AS chunks_identical, + coalesce((SELECT bool_or(is_frag_boundary) FROM resumed), false) AS saw_frag +\gset + +SELECT (:base_count::int >= 4) AS enough_chunks \gset +\if :enough_chunks +\echo [PASS] (:testid) window produced multiple chunks (:base_count) +\else +\echo [FAIL] (:testid) expected a multi-chunk window, got :base_count +SELECT (:fail::int + 1) AS fail \gset +\endif + +\if :chunks_identical +\echo [PASS] (:testid) positional resume reproduced every following chunk byte-for-byte +\else +\echo [FAIL] (:testid) a positional resume did not reproduce the next chunk +SELECT (:fail::int + 1) AS fail \gset +\endif + +\if :saw_frag +\echo [PASS] (:testid) mid-fragment resume exercised +\else +\echo [FAIL] (:testid) mid-fragment resume not exercised +SELECT (:fail::int + 1) AS fail \gset +\endif + +\ir helper_test_cleanup.sql +\if :should_cleanup +\connect postgres +DROP DATABASE IF EXISTS cloudsync_test_55_positional; +\endif diff --git a/test/postgresql/full_test.sql b/test/postgresql/full_test.sql index 13509db..c6f38b7 100644 --- a/test/postgresql/full_test.sql +++ b/test/postgresql/full_test.sql @@ -62,6 +62,7 @@ \ir 52_payload_chunks.sql \ir 53_payload_blob_checked_pg_try.sql \ir 54_payload_chunks_fragment_state.sql +\ir 
55_payload_chunks_positional_resume.sql -- 'Test summary' \echo '\nTest summary:' From 4a69a512c0fa0946231d87cd94bd9497f84e7bab Mon Sep 17 00:00:00 2001 From: Andrea Donetti Date: Mon, 29 Jun 2026 15:49:59 -0600 Subject: [PATCH 03/13] test(postgres): add drain+apply round-trip to positional-resume test Part 1 of test 55 proves the positional cursor produces byte-identical chunks to a full scan. Part 2 now closes the loop end-to-end: a plpgsql helper drains the whole window the way the /check job will (legacy exclusive since=0, then the positional cursor one chunk per call, ORDER BY chunk_index LIMIT 1 so each value-per-call SRF runs to completion), the drained stream is applied to a fresh database, and the receiver's table content is hashed and compared to the source. This validates the real path (positional drain -> apply -> faithful replica), not just byte-identity against a baseline. Verified on PostgreSQL 17: 501 rows reproduced, hashes match. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../55_payload_chunks_positional_resume.sql | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/test/postgresql/55_payload_chunks_positional_resume.sql b/test/postgresql/55_payload_chunks_positional_resume.sql index 6bd482f..c1c1f95 100644 --- a/test/postgresql/55_payload_chunks_positional_resume.sql +++ b/test/postgresql/55_payload_chunks_positional_resume.sql @@ -5,6 +5,10 @@ -- the following chunk byte-for-byte, including boundaries that fall inside a single -- committed db_version and inside a value larger than the chunk budget. No spool -- table, no idempotent overlap. +-- +-- Part 2 is end-to-end: drain the whole window the way the /check job will (one +-- chunk per call via the positional cursor), apply that stream to a fresh database, +-- and assert the receiver's table content hashes identically to the source. 
\set testid '55-positional' \ir helper_test_init.sql @@ -12,7 +16,9 @@ \connect postgres \ir helper_psql_conn_setup.sql DROP DATABASE IF EXISTS cloudsync_test_55_positional; +DROP DATABASE IF EXISTS cloudsync_test_55_positional_dst; CREATE DATABASE cloudsync_test_55_positional; +CREATE DATABASE cloudsync_test_55_positional_dst; \connect cloudsync_test_55_positional \ir helper_psql_conn_setup.sql @@ -81,8 +87,78 @@ SELECT (:fail::int + 1) AS fail \gset SELECT (:fail::int + 1) AS fail \gset \endif +-- Part 2: end-to-end drain + apply round-trip. +-- +-- Drain the window exactly as the /check job will: start with the legacy +-- exclusive cursor (since=0), then step the positional cursor one chunk per call, +-- collecting payloads in drain order. ORDER BY chunk_index LIMIT 1 forces each +-- value-per-call SRF to run to completion (no early-terminated cursor). The drained +-- chunks are returned hex-joined so they can cross \connect into the receiver DB. +CREATE OR REPLACE FUNCTION _positional_drain_hex() RETURNS text LANGUAGE plpgsql AS $$ +DECLARE + rdbv bigint; rseq bigint; rfrag bigint; wm bigint := 0; + rec record; parts text[] := '{}'; guard int := 0; +BEGIN + LOOP + guard := guard + 1; + IF guard > 100000 THEN RAISE EXCEPTION 'positional drain did not terminate'; END IF; + IF wm = 0 THEN + SELECT * INTO rec FROM cloudsync_payload_chunks(0, cloudsync_siteid(), NULL, false) + ORDER BY chunk_index LIMIT 1; + IF NOT FOUND THEN EXIT; END IF; + wm := rec.watermark_db_version; + ELSE + SELECT * INTO rec FROM cloudsync_payload_chunks(NULL, cloudsync_siteid(), wm, false, rdbv, rseq, rfrag) + ORDER BY chunk_index LIMIT 1; + IF NOT FOUND THEN EXIT; END IF; + END IF; + parts := array_append(parts, encode(rec.payload, 'hex')); + rdbv := rec.next_db_version; rseq := rec.next_seq; rfrag := rec.next_frag_offset; + EXIT WHEN rec.is_final; + END LOOP; + RETURN array_to_string(parts, ','); +END $$; + +SELECT _positional_drain_hex() AS chunks_hex \gset +SELECT + md5(string_agg(id || 
':' || encode(body, 'hex'), '|' ORDER BY id)) AS src_hash, + count(*) AS src_count +FROM split_test \gset + +\connect cloudsync_test_55_positional_dst +\ir helper_psql_conn_setup.sql +CREATE EXTENSION IF NOT EXISTS cloudsync; +CREATE TABLE split_test (id TEXT PRIMARY KEY, body BYTEA DEFAULT '\x'::bytea); +SELECT cloudsync_init('split_test', 'CLS', 1) AS _init_dst \gset +SELECT cloudsync_set('payload_max_chunk_size', '262144'); + +-- Reconstitute the drained chunks and apply them (reverse order on purpose: apply +-- must be order-independent and reassemble fragments regardless). +CREATE TEMP TABLE chunk_transport(ord int, payload bytea); +INSERT INTO chunk_transport(ord, payload) +SELECT ord::int, decode(chunk_hex, 'hex') +FROM unnest(string_to_array(:'chunks_hex', ',')) WITH ORDINALITY AS t(chunk_hex, ord); + +SELECT coalesce(sum(cloudsync_payload_apply(payload)), 0) AS applied_rows +FROM (SELECT payload FROM chunk_transport ORDER BY ord DESC) AS ordered \gset + +SELECT + md5(string_agg(id || ':' || encode(body, 'hex'), '|' ORDER BY id)) AS dst_hash, + count(*) AS dst_count +FROM split_test \gset + +SELECT (:'dst_hash' = :'src_hash' AND :dst_count::int = :src_count::int + AND :dst_count::int > 0) AS roundtrip_ok \gset +\if :roundtrip_ok +\echo [PASS] (:testid) positional drain applied to a fresh database reproduces the source (:dst_count rows) +\else +\echo [FAIL] (:testid) drain/apply mismatch (src_count=:src_count dst_count=:dst_count hashes :'src_hash' vs :'dst_hash') +SELECT (:fail::int + 1) AS fail \gset +\endif + \ir helper_test_cleanup.sql \if :should_cleanup \connect postgres DROP DATABASE IF EXISTS cloudsync_test_55_positional; +DROP DATABASE IF EXISTS cloudsync_test_55_positional_dst; \endif From d3024127502ac9ccc70ee9644e8f08819aef8043 Mon Sep 17 00:00:00 2001 From: Andrea Donetti Date: Mon, 29 Jun 2026 16:02:45 -0600 Subject: [PATCH 04/13] test(sqlite): add drain+apply round-trip to positional-resume unit test Mirror the PostgreSQL test's end-to-end 
check on the SQLite side: after asserting the positional drain reproduces the full-scan chunks byte-for-byte, apply that drained stream to a fresh receiver database and assert its split_test content matches the source via test_split_tables_equal. Chunks are applied in reverse drain order so the apply path must reassemble v3 fragments and merge rows independent of transport order. Validates positional drain -> apply -> faithful replica, the real path the /check job will use. All unit tests pass, no leaks. Co-Authored-By: Claude Opus 4.8 (1M context) --- test/unit.c | 50 +++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 43 insertions(+), 7 deletions(-) diff --git a/test/unit.c b/test/unit.c index 88c4eff..e77e227 100644 --- a/test/unit.c +++ b/test/unit.c @@ -12562,9 +12562,14 @@ bool do_test_payload_chunks_split_dbversion (bool print_result, bool cleanup_dat // db_version split across chunks (row-boundary resumes, incl. resumes landing // INSIDE a single committed version that the old since>db_version cursor could not // express) with a value larger than the chunk budget (mid-fragment resumes). +// Part 2 is end-to-end: the positionally-drained stream is applied to a fresh +// receiver and its table content is compared to the source (drain -> apply -> +// faithful replica), the real path the /check job will use. 
bool do_test_payload_chunks_positional_resume (bool print_result, bool cleanup_databases) { sqlite3 *db = NULL; + sqlite3 *db2 = NULL; sqlite3_stmt *stmt = NULL; + sqlite3_stmt *apply = NULL; test_payload_chunk *base = NULL; int base_count = 0, base_cap = 0; test_payload_chunk *pos = NULL; int pos_count = 0, pos_cap = 0; int64_t watermark = -1; @@ -12680,6 +12685,33 @@ bool do_test_payload_chunks_positional_resume (bool print_result, bool cleanup_d if (memcmp(pos[i].data, base[i].data, base[i].len) != 0) goto finalize; } + // End-to-end: apply the positionally-drained stream to a fresh receiver and + // assert its table content matches the source. This exercises the real /check + // path (positional drain -> apply -> faithful replica), not just byte-identity. + db2 = do_create_database_file(1, timestamp, saved_counter); + if (!db2) goto finalize; + rc = sqlite3_exec(db2, + "CREATE TABLE split_test (id TEXT PRIMARY KEY, body TEXT DEFAULT '');" + "SELECT cloudsync_init('split_test');", + NULL, NULL, NULL); + if (rc != SQLITE_OK) goto finalize; + + rc = sqlite3_prepare_v2(db2, "SELECT cloudsync_payload_apply(?);", -1, &apply, NULL); + if (rc != SQLITE_OK) goto finalize; + // Apply in reverse drain order: apply must reassemble v3 fragments and merge + // rows independent of transport order. + for (int i = pos_count - 1; i >= 0; --i) { + rc = sqlite3_bind_blob(apply, 1, pos[i].data, pos[i].len, SQLITE_STATIC); + if (rc != SQLITE_OK) goto finalize; + rc = sqlite3_step(apply); + if (rc != SQLITE_ROW) goto finalize; + sqlite3_reset(apply); + sqlite3_clear_bindings(apply); + } + sqlite3_finalize(apply); apply = NULL; + + if (!test_split_tables_equal(db, db2)) goto finalize; + result = true; finalize: @@ -12688,17 +12720,21 @@ bool do_test_payload_chunks_positional_resume (bool print_result, bool cleanup_d db ? 
sqlite3_errmsg(db) : "no db", base_count, pos_count, (long long)watermark); } if (stmt) sqlite3_finalize(stmt); + if (apply) sqlite3_finalize(apply); test_payload_chunks_free(base, base_count); test_payload_chunks_free(pos, pos_count); if (db) close_db(db); + if (db2) close_db(db2); if (cleanup_databases) { - char path[256], walpath[300], shmpath[300]; - do_build_database_path(path, 0, timestamp, saved_counter); - snprintf(walpath, sizeof(walpath), "%s-wal", path); - snprintf(shmpath, sizeof(shmpath), "%s-shm", path); - file_delete_internal(path); - file_delete_internal(walpath); - file_delete_internal(shmpath); + for (int i = 0; i < 2; ++i) { + char path[256], walpath[300], shmpath[300]; + do_build_database_path(path, i, timestamp, saved_counter); + snprintf(walpath, sizeof(walpath), "%s-wal", path); + snprintf(shmpath, sizeof(shmpath), "%s-shm", path); + file_delete_internal(path); + file_delete_internal(walpath); + file_delete_internal(shmpath); + } } return result; } From ecadfaedf163ce27c25bfd5c759af80d7196988c Mon Sep 17 00:00:00 2001 From: Andrea Donetti Date: Mon, 29 Jun 2026 20:23:32 -0600 Subject: [PATCH 05/13] test(bench): add local positional /check drain benchmark MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit chunk_bench builds a window of N chunks and times paging the whole window one chunk per call via the positional cursor on cloudsync_payload_chunks, reporting wall time, per-chunk cost and throughput. Local-only (loads the built dylib, no network). Env-tunable rows/row_bytes/txns/repeats/chunk_size (TXNS splits the rows across db_versions). Lets the drain's computational growth be tracked — e.g. to confirm a future indexed (db_version, seq) seek flattens it from O(N^2) to O(N). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- test/chunk_bench.c | 177 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 test/chunk_bench.c diff --git a/test/chunk_bench.c b/test/chunk_bench.c new file mode 100644 index 0000000..4321ac2 --- /dev/null +++ b/test/chunk_bench.c @@ -0,0 +1,177 @@ +// +// chunk_bench.c +// cloudsync +// +// Local-only benchmark for the positional /check drain: build a window of N +// chunks and time paging the whole window one chunk per call via the +// (resume_db_version, resume_seq, resume_frag_offset) cursor on +// cloudsync_payload_chunks. Reports wall time and per-chunk cost so the +// computational growth of the drain (currently O(N^2): each resume re-scans +// cloudsync_changes) can be tracked — e.g. to confirm a future indexed +// (db_version, seq) seek flattens it to O(N). +// +// Env: CHUNK_BENCH_ROWS (default 400), CHUNK_BENCH_ROW_BYTES (default 60000), +// CHUNK_BENCH_TXNS (default 1; rows split across this many db_versions), +// CHUNK_BENCH_REPEATS (default 5), CHUNK_BENCH_CHUNK_SIZE (default 262144). +// + +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include "sqlite3.h" + +#define DB_PATH "dist/chunk-bench.sqlite" +#define EXT_PATH "./dist/cloudsync" + +static double monotonic_ms(void) { + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return ((double)ts.tv_sec * 1000.0) + ((double)ts.tv_nsec / 1000000.0); +} + +static int env_int(const char *name, int dflt) { + const char *v = getenv(name); + if (!v || !*v) return dflt; + char *end = NULL; + long p = strtol(v, &end, 10); + if (!end || *end != '\0' || p <= 0) return dflt; + return (int)p; +} + +static int db_exec(sqlite3 *db, const char *sql) { + char *err = NULL; + int rc = sqlite3_exec(db, sql, NULL, NULL, &err); + if (rc != SQLITE_OK) { + fprintf(stderr, "exec failed: %s: %s\n", sql, err ? 
err : sqlite3_errmsg(db)); + sqlite3_free(err); + } + return rc; +} + +// Drain the whole window via the positional cursor, one chunk per query. Returns an SQLite result code (SQLITE_OK on success) and stores the +// chunk count in *chunks_out and the payload bytes read in *bytes_out; both are left untouched on an empty window or a prepare/first-step failure. +static int drain_positional(sqlite3 *db, int *chunks_out, long long *bytes_out) { + const char *first_sql = + "SELECT payload, next_db_version, next_seq, next_frag_offset, is_final, watermark_db_version " + "FROM cloudsync_payload_chunks WHERE since_db_version=0 LIMIT 1;"; + const char *resume_sql = + "SELECT payload, next_db_version, next_seq, next_frag_offset, is_final " + "FROM cloudsync_payload_chunks " + "WHERE until_db_version=?1 AND resume_db_version=?2 AND resume_seq=?3 AND resume_frag_offset=?4 LIMIT 1;"; + sqlite3_stmt *first = NULL, *resume = NULL; + int rc = sqlite3_prepare_v2(db, first_sql, -1, &first, NULL); + if (rc != SQLITE_OK) goto done; + rc = sqlite3_prepare_v2(db, resume_sql, -1, &resume, NULL); + if (rc != SQLITE_OK) goto done; + + int chunks = 0; + long long bytes = 0; + long long watermark = 0, rdbv = 0, rseq = 0, rfrag = 0; + bool is_final = false; + + rc = sqlite3_step(first); + if (rc == SQLITE_ROW) { + bytes += sqlite3_column_bytes(first, 0); + rdbv = sqlite3_column_int64(first, 1); + rseq = sqlite3_column_int64(first, 2); + rfrag = sqlite3_column_int64(first, 3); + is_final = sqlite3_column_int(first, 4) != 0; + watermark = sqlite3_column_int64(first, 5); + chunks++; + } else if (rc == SQLITE_DONE) { + rc = SQLITE_OK; + goto done; // empty window + } else { + goto done; + } + + while (!is_final) { + sqlite3_reset(resume); + sqlite3_bind_int64(resume, 1, watermark); + sqlite3_bind_int64(resume, 2, rdbv); + sqlite3_bind_int64(resume, 3, rseq); + sqlite3_bind_int64(resume, 4, rfrag); + rc = sqlite3_step(resume); + if (rc != SQLITE_ROW) { if (rc == SQLITE_DONE) rc = SQLITE_OK; break; } + bytes += 
sqlite3_column_bytes(resume, 0); + rdbv = sqlite3_column_int64(resume, 1); + rseq = sqlite3_column_int64(resume, 2); + 
rfrag = sqlite3_column_int64(resume, 3); + is_final = sqlite3_column_int(resume, 4) != 0; + chunks++; + } + if (rc == SQLITE_ROW) rc = SQLITE_OK; // normal is_final exit; a step error set inside the loop must reach the caller + *chunks_out = chunks; + *bytes_out = bytes; + +done: + if (first) sqlite3_finalize(first); + if (resume) sqlite3_finalize(resume); + return rc; +} + +int main(void) { + int rows = env_int("CHUNK_BENCH_ROWS", 400); + int row_bytes = env_int("CHUNK_BENCH_ROW_BYTES", 60000); + int repeats = env_int("CHUNK_BENCH_REPEATS", 5); + int chunk_size = env_int("CHUNK_BENCH_CHUNK_SIZE", 262144); + + remove(DB_PATH); + sqlite3 *db = NULL; + if (sqlite3_open(DB_PATH, &db) != SQLITE_OK) { fprintf(stderr, "open failed\n"); return 1; } + if (sqlite3_enable_load_extension(db, 1) != SQLITE_OK) return 1; + if (db_exec(db, "SELECT load_extension('" EXT_PATH "');") != SQLITE_OK) return 1; + + char setup[256]; + snprintf(setup, sizeof(setup), + "CREATE TABLE chunk_bench (id TEXT PRIMARY KEY, body BLOB);" + "SELECT cloudsync_init('chunk_bench');" + "SELECT cloudsync_set('payload_max_chunk_size', '%d');", chunk_size); + if (db_exec(db, setup) != SQLITE_OK) return 1; + + // Split the rows across CHUNK_BENCH_TXNS transactions: each is one db_version, + // so TXNS=1 is the pathological single-version window and TXNS=rows is the + // many-versions case a real /check window resembles. Incompressible bodies keep + // the window many-chunked. + int txns = env_int("CHUNK_BENCH_TXNS", 1); + if (txns < 1) txns = 1; + if (txns > rows) txns = rows; + int idbase = 0; + for (int t = 0; t < txns; ++t) { + int n = rows / txns + (t < rows % txns ? 
1 : 0); + if (n <= 0) continue; + char insert[256]; + snprintf(insert, sizeof(insert), + "WITH RECURSIVE c(i) AS (SELECT %d UNION ALL SELECT i+1 FROM c WHERE i < %d) " + "INSERT INTO chunk_bench(id, body) SELECT printf('row-%%06d', i), randomblob(%d) FROM c;", + idbase + 1, idbase + n, row_bytes); + if (db_exec(db, insert) != SQLITE_OK) return 1; + idbase += n; + } + + int chunks = 0; + long long bytes = 0; + double best = 1e18, sum = 0; + for (int r = 0; r < repeats; ++r) { + double t0 = monotonic_ms(); + if (drain_positional(db, &chunks, &bytes) != SQLITE_OK) { fprintf(stderr, "positional drain failed\n"); return 1; } + double dt = monotonic_ms() - t0; + sum += dt; if (dt < best) best = dt; + } + + double mean = sum / repeats; + printf("\nPositional /check drain benchmark (local SQLite, no network)\n"); + printf("rows: %d row_bytes: %d txns: %d chunk_size: %d repeats: %d\n", + rows, row_bytes, txns, chunk_size, repeats); + printf("chunks: %d payload_bytes: %lld\n", chunks, bytes); + printf("drain: best=%.2f ms mean=%.2f ms\n", best, mean); + if (chunks > 0) + printf("per-chunk: best=%.3f ms throughput: %.1f MB/s\n", + best / chunks, (double)bytes / 1024.0 / 1024.0 / (best / 1000.0)); + + sqlite3_close(db); + remove(DB_PATH); + return 0; +} From 5ee59f2ffe0277151983f6094f8656450ed5eee0 Mon Sep 17 00:00:00 2001 From: Andrea Donetti Date: Mon, 29 Jun 2026 21:01:24 -0600 Subject: [PATCH 06/13] refactor(payload): remove the cloudsync_payload_spool table and functions The /check job now pages the tenant via the positional cursor on cloudsync_payload_chunks, so the server-side download spool is dead. 
Remove the cloudsync_payload_spool table and the cloudsync_payload_spool_fill / _drop / _drop_chunk functions on both engines (SQLite: sql_sqlite.c constants, sql.h decls, cloudsync_sqlite.c functions + registration; PostgreSQL: cloudsync.sql.in and the 1.0->1.1 migration), plus the SQLite "Payload Download Spool" unit test and the spool sections of the PostgreSQL chunk test. The client-side cursor paging (network.c) is unchanged: the wire protocol still pages chunks by an integer cursor; only the server's node-side staging mechanism changed. All remaining chunk/fragment/positional tests pass on both engines. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/postgresql/cloudsync.sql.in | 113 ----------- .../migrations/cloudsync--1.0--1.1.sql | 90 --------- src/sql.h | 8 - src/sqlite/cloudsync_sqlite.c | 154 -------------- src/sqlite/sql_sqlite.c | 46 ----- test/postgresql/52_payload_chunks.sql | 96 --------- test/unit.c | 191 ------------------ 7 files changed, 698 deletions(-) diff --git a/src/postgresql/cloudsync.sql.in b/src/postgresql/cloudsync.sql.in index 3dd19f1..92bb6f4 100644 --- a/src/postgresql/cloudsync.sql.in +++ b/src/postgresql/cloudsync.sql.in @@ -198,119 +198,6 @@ RETURNS bytea AS 'MODULE_PATHNAME', 'cloudsync_uuid_blob' LANGUAGE C IMMUTABLE; --- Download spool (server-side /check chunk staging) --- --- The /check endpoint runs on a separate host and reaches the tenant DB over a --- network driver, which materializes the whole result set. Streaming chunks --- directly with SELECT * FROM cloudsync_payload_chunks(...) would therefore pull --- every chunk into the server's memory at once. Instead the server fills a --- window's chunk stream once into this table and pages it out one chunk per call. --- --- UNLOGGED: this is regenerable scratch state, so skip WAL. Created at install --- time (not lazily inside the function) to avoid the plpgsql create-then-use --- cached-plan pitfall. 
-CREATE TABLE IF NOT EXISTS cloudsync_payload_spool ( - stream_id text NOT NULL, - chunk_index bigint NOT NULL, - payload bytea NOT NULL, - payload_size bigint NOT NULL, - db_version_min bigint NOT NULL, - db_version_max bigint NOT NULL, - watermark bigint NOT NULL, - is_final boolean NOT NULL DEFAULT false, - created_at bigint NOT NULL DEFAULT extract(epoch FROM now())::bigint, - PRIMARY KEY (stream_id, chunk_index) -); - --- cloudsync_payload_spool_fill(stream_id, since, filter_site_id, exclude) --- Generate the whole chunk stream for a window once into cloudsync_payload_spool. --- Returns the number of chunks spooled. Idempotent: a prior complete fill is kept. --- Parameters are p_-prefixed so they never collide with the table's columns --- (an unqualified `stream_id` would otherwise be ambiguous with the parameter). -CREATE OR REPLACE FUNCTION cloudsync_payload_spool_fill( - p_stream_id text, - p_since_db_version bigint, - p_filter_site_id bytea DEFAULT NULL, - p_exclude_filter_site_id boolean DEFAULT false -) RETURNS bigint AS $$ -DECLARE - existing bigint; - cnt bigint := 0; - rec record; -BEGIN - -- Stale-GC of abandoned streams. fill runs once per stream (coarse-grained), - -- so unlike per-fragment cleanup there is no O(n^2) risk and no throttle. - DELETE FROM cloudsync_payload_spool - WHERE stream_id IN ( - SELECT s.stream_id FROM cloudsync_payload_spool s - GROUP BY s.stream_id - HAVING max(s.created_at) < extract(epoch FROM now())::bigint - 86400); - - -- Idempotent: a prior complete fill stays as-is (fill is atomic, so rows - -- present == complete stream). - SELECT count(*) INTO existing FROM cloudsync_payload_spool WHERE stream_id = p_stream_id; - IF existing > 0 THEN - RETURN existing; - END IF; - - -- Generate the stream one chunk at a time. A cursor FOR loop fetches in - -- batches rather than materializing the whole SRF into a tuplestore. 
- FOR rec IN - SELECT c.payload, c.chunk_index, c.payload_size, c.db_version_min, c.db_version_max, c.watermark_db_version - FROM cloudsync_payload_chunks(p_since_db_version, p_filter_site_id, NULL, p_exclude_filter_site_id) c - LOOP - INSERT INTO cloudsync_payload_spool - (stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final) - VALUES (p_stream_id, rec.chunk_index, rec.payload, rec.payload_size, - rec.db_version_min, rec.db_version_max, rec.watermark_db_version, false); - cnt := cnt + 1; - END LOOP; - - IF cnt > 0 THEN - UPDATE cloudsync_payload_spool SET is_final = true - WHERE stream_id = p_stream_id - AND chunk_index = (SELECT max(x.chunk_index) FROM cloudsync_payload_spool x - WHERE x.stream_id = p_stream_id); - END IF; - - RETURN cnt; -END; -$$ LANGUAGE plpgsql VOLATILE; - --- cloudsync_payload_spool_drop(stream_id) -> number of chunks removed. -CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop(p_stream_id text) -RETURNS bigint AS $$ -DECLARE - deleted bigint := 0; -BEGIN - DELETE FROM cloudsync_payload_spool WHERE stream_id = p_stream_id; - GET DIAGNOSTICS deleted = ROW_COUNT; - RETURN deleted; -END; -$$ LANGUAGE plpgsql VOLATILE; - --- cloudsync_payload_spool_drop_chunk(stream_id, chunk_index) -> number of chunks removed. --- Explicit early cleanup for one S3-backed chunk; stream-level TTL/GC remains unchanged. 
-CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop_chunk(p_stream_id text, p_chunk_index bigint) -RETURNS bigint AS $$ -DECLARE - deleted bigint := 0; -BEGIN - IF p_stream_id IS NULL THEN - RAISE EXCEPTION 'cloudsync_payload_spool_drop_chunk: stream_id is required.'; - END IF; - IF p_chunk_index IS NULL THEN - RAISE EXCEPTION 'cloudsync_payload_spool_drop_chunk: chunk_index is required.'; - END IF; - - DELETE FROM cloudsync_payload_spool - WHERE stream_id = p_stream_id - AND chunk_index = p_chunk_index; - GET DIAGNOSTICS deleted = ROW_COUNT; - RETURN deleted; -END; -$$ LANGUAGE plpgsql VOLATILE; - -- Payload decoding and application CREATE OR REPLACE FUNCTION cloudsync_payload_decode(payload bytea) RETURNS integer diff --git a/src/postgresql/migrations/cloudsync--1.0--1.1.sql b/src/postgresql/migrations/cloudsync--1.0--1.1.sql index 5566999..1269bc9 100644 --- a/src/postgresql/migrations/cloudsync--1.0--1.1.sql +++ b/src/postgresql/migrations/cloudsync--1.0--1.1.sql @@ -55,93 +55,3 @@ RETURNS bytea AS 'MODULE_PATHNAME', 'cloudsync_uuid_blob' LANGUAGE C IMMUTABLE; --- Download spool: the /check path fills a window's chunk stream once and pages it --- out one chunk per call so the network driver never re-materializes the whole --- stream. See cloudsync.sql.in for the rationale. 
-CREATE TABLE IF NOT EXISTS cloudsync_payload_spool ( - stream_id text NOT NULL, - chunk_index bigint NOT NULL, - payload bytea NOT NULL, - payload_size bigint NOT NULL, - db_version_min bigint NOT NULL, - db_version_max bigint NOT NULL, - watermark bigint NOT NULL, - is_final boolean NOT NULL DEFAULT false, - created_at bigint NOT NULL DEFAULT extract(epoch FROM now())::bigint, - PRIMARY KEY (stream_id, chunk_index) -); - -CREATE OR REPLACE FUNCTION cloudsync_payload_spool_fill( - p_stream_id text, - p_since_db_version bigint, - p_filter_site_id bytea DEFAULT NULL, - p_exclude_filter_site_id boolean DEFAULT false -) RETURNS bigint AS $$ -DECLARE - existing bigint; - cnt bigint := 0; - rec record; -BEGIN - DELETE FROM cloudsync_payload_spool - WHERE stream_id IN ( - SELECT s.stream_id FROM cloudsync_payload_spool s - GROUP BY s.stream_id - HAVING max(s.created_at) < extract(epoch FROM now())::bigint - 86400); - - SELECT count(*) INTO existing FROM cloudsync_payload_spool WHERE stream_id = p_stream_id; - IF existing > 0 THEN - RETURN existing; - END IF; - - FOR rec IN - SELECT c.payload, c.chunk_index, c.payload_size, c.db_version_min, c.db_version_max, c.watermark_db_version - FROM cloudsync_payload_chunks(p_since_db_version, p_filter_site_id, NULL, p_exclude_filter_site_id) c - LOOP - INSERT INTO cloudsync_payload_spool - (stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final) - VALUES (p_stream_id, rec.chunk_index, rec.payload, rec.payload_size, - rec.db_version_min, rec.db_version_max, rec.watermark_db_version, false); - cnt := cnt + 1; - END LOOP; - - IF cnt > 0 THEN - UPDATE cloudsync_payload_spool SET is_final = true - WHERE stream_id = p_stream_id - AND chunk_index = (SELECT max(x.chunk_index) FROM cloudsync_payload_spool x - WHERE x.stream_id = p_stream_id); - END IF; - - RETURN cnt; -END; -$$ LANGUAGE plpgsql VOLATILE; - -CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop(p_stream_id text) -RETURNS bigint 
AS $$ -DECLARE - deleted bigint := 0; -BEGIN - DELETE FROM cloudsync_payload_spool WHERE stream_id = p_stream_id; - GET DIAGNOSTICS deleted = ROW_COUNT; - RETURN deleted; -END; -$$ LANGUAGE plpgsql VOLATILE; - -CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop_chunk(p_stream_id text, p_chunk_index bigint) -RETURNS bigint AS $$ -DECLARE - deleted bigint := 0; -BEGIN - IF p_stream_id IS NULL THEN - RAISE EXCEPTION 'cloudsync_payload_spool_drop_chunk: stream_id is required.'; - END IF; - IF p_chunk_index IS NULL THEN - RAISE EXCEPTION 'cloudsync_payload_spool_drop_chunk: chunk_index is required.'; - END IF; - - DELETE FROM cloudsync_payload_spool - WHERE stream_id = p_stream_id - AND chunk_index = p_chunk_index; - GET DIAGNOSTICS deleted = ROW_COUNT; - RETURN deleted; -END; -$$ LANGUAGE plpgsql VOLATILE; diff --git a/src/sql.h b/src/sql.h index 3e3ca27..6837121 100644 --- a/src/sql.h +++ b/src/sql.h @@ -74,14 +74,6 @@ extern const char * const SQL_PAYLOAD_FRAGMENTS_SELECT; extern const char * const SQL_PAYLOAD_FRAGMENTS_DELETE; extern const char * const SQL_PAYLOAD_FRAGMENTS_CLEANUP_STALE; -extern const char * const SQL_PAYLOAD_SPOOL_CREATE_TABLE; -extern const char * const SQL_PAYLOAD_SPOOL_COUNT; -extern const char * const SQL_PAYLOAD_SPOOL_FILL_INSERT; -extern const char * const SQL_PAYLOAD_SPOOL_MARK_FINAL; -extern const char * const SQL_PAYLOAD_SPOOL_DELETE; -extern const char * const SQL_PAYLOAD_SPOOL_DELETE_CHUNK; -extern const char * const SQL_PAYLOAD_SPOOL_CLEANUP_STALE; - // BLOCKS (block-level LWW) extern const char * const SQL_BLOCKS_CREATE_TABLE; extern const char * const SQL_BLOCKS_UPSERT; diff --git a/src/sqlite/cloudsync_sqlite.c b/src/sqlite/cloudsync_sqlite.c index bba0478..80d45cc 100644 --- a/src/sqlite/cloudsync_sqlite.c +++ b/src/sqlite/cloudsync_sqlite.c @@ -1852,152 +1852,6 @@ void dbsync_payload_load (sqlite3_context *context, int argc, sqlite3_value **ar } #endif -// MARK: - Download spool - - -// Abandoned download streams (a client 
that started a chunked /check drain and -// never finished) are reaped after this many seconds. Matches the v3 fragment -// stale window. -#define CLOUDSYNC_PAYLOAD_SPOOL_STALE_SECONDS (24*60*60) - -// cloudsync_payload_spool_fill(stream_id, since_db_version, filter_site_id, exclude) -// Generate the whole chunk stream for a window once into cloudsync_payload_spool -// so the /check path can page it out one chunk per call. Returns the number of -// chunks spooled for stream_id. Idempotent: a prior complete fill is kept as-is. -static void dbsync_payload_spool_fill (sqlite3_context *context, int argc, sqlite3_value **argv) { - DEBUG_FUNCTION("cloudsync_payload_spool_fill"); - UNUSED_PARAMETER(argc); - - sqlite3 *db = sqlite3_context_db_handle(context); - if (sqlite3_value_type(argv[0]) == SQLITE_NULL) { - sqlite3_result_error(context, "cloudsync_payload_spool_fill: stream_id is required.", -1); - return; - } - const char *stream_id = (const char *)sqlite3_value_text(argv[0]); - int stream_id_len = sqlite3_value_bytes(argv[0]); - int64_t since = sqlite3_value_int64(argv[1]); - bool site_given = (sqlite3_value_type(argv[2]) != SQLITE_NULL); - int exclude = (sqlite3_value_int(argv[3]) != 0); - - sqlite3_stmt *stmt = NULL; - int rc = sqlite3_exec(db, SQL_PAYLOAD_SPOOL_CREATE_TABLE, NULL, NULL, NULL); - if (rc != SQLITE_OK) goto error; - - // Stale-GC of abandoned streams (coarse-grained, no throttle needed). - if (sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_CLEANUP_STALE, -1, &stmt, NULL) == SQLITE_OK) { - sqlite3_bind_int64(stmt, 1, (int64_t)time(NULL) - CLOUDSYNC_PAYLOAD_SPOOL_STALE_SECONDS); - sqlite3_step(stmt); - } - sqlite3_finalize(stmt); stmt = NULL; - - // Atomic fill: a partial/failed generation must never persist, so that the - // idempotency check below ("rows present == complete stream") holds. 
- rc = sqlite3_exec(db, "SAVEPOINT cloudsync_spool_fill;", NULL, NULL, NULL); - if (rc != SQLITE_OK) goto error; - - int64_t count = 0; - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_COUNT, -1, &stmt, NULL); - if (rc != SQLITE_OK) goto rollback; - sqlite3_bind_text(stmt, 1, stream_id, stream_id_len, SQLITE_TRANSIENT); - if (sqlite3_step(stmt) == SQLITE_ROW) count = sqlite3_column_int64(stmt, 0); - sqlite3_finalize(stmt); stmt = NULL; - - if (count == 0) { - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_FILL_INSERT, -1, &stmt, NULL); - if (rc != SQLITE_OK) goto rollback; - sqlite3_bind_text(stmt, 1, stream_id, stream_id_len, SQLITE_TRANSIENT); - sqlite3_bind_int64(stmt, 2, since); - if (site_given) sqlite3_bind_blob(stmt, 3, sqlite3_value_blob(argv[2]), sqlite3_value_bytes(argv[2]), SQLITE_TRANSIENT); - else sqlite3_bind_null(stmt, 3); - sqlite3_bind_int(stmt, 4, exclude); - rc = sqlite3_step(stmt); - sqlite3_finalize(stmt); stmt = NULL; - if (rc != SQLITE_DONE) goto rollback; - - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_MARK_FINAL, -1, &stmt, NULL); - if (rc != SQLITE_OK) goto rollback; - sqlite3_bind_text(stmt, 1, stream_id, stream_id_len, SQLITE_TRANSIENT); - rc = sqlite3_step(stmt); - sqlite3_finalize(stmt); stmt = NULL; - if (rc != SQLITE_DONE) goto rollback; - - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_COUNT, -1, &stmt, NULL); - if (rc != SQLITE_OK) goto rollback; - sqlite3_bind_text(stmt, 1, stream_id, stream_id_len, SQLITE_TRANSIENT); - if (sqlite3_step(stmt) == SQLITE_ROW) count = sqlite3_column_int64(stmt, 0); - sqlite3_finalize(stmt); stmt = NULL; - } - - sqlite3_exec(db, "RELEASE cloudsync_spool_fill;", NULL, NULL, NULL); - sqlite3_result_int64(context, count); - return; - -rollback: - sqlite3_finalize(stmt); - sqlite3_result_error(context, sqlite3_errmsg(db), -1); - sqlite3_result_error_code(context, rc); - sqlite3_exec(db, "ROLLBACK TO cloudsync_spool_fill; RELEASE cloudsync_spool_fill;", NULL, NULL, NULL); - return; - -error: - 
sqlite3_result_error(context, sqlite3_errmsg(db), -1); - sqlite3_result_error_code(context, rc); -} - -// cloudsync_payload_spool_drop(stream_id) -> number of chunks removed. -// Called once a stream has been fully delivered/acked (or to force-evict). -static void dbsync_payload_spool_drop (sqlite3_context *context, int argc, sqlite3_value **argv) { - DEBUG_FUNCTION("cloudsync_payload_spool_drop"); - UNUSED_PARAMETER(argc); - - sqlite3 *db = sqlite3_context_db_handle(context); - if (sqlite3_value_type(argv[0]) == SQLITE_NULL) { - sqlite3_result_error(context, "cloudsync_payload_spool_drop: stream_id is required.", -1); - return; - } - - int rc = sqlite3_exec(db, SQL_PAYLOAD_SPOOL_CREATE_TABLE, NULL, NULL, NULL); - if (rc != SQLITE_OK) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - - sqlite3_stmt *stmt = NULL; - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_DELETE, -1, &stmt, NULL); - if (rc != SQLITE_OK) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - sqlite3_bind_text(stmt, 1, (const char *)sqlite3_value_text(argv[0]), sqlite3_value_bytes(argv[0]), SQLITE_TRANSIENT); - rc = sqlite3_step(stmt); - sqlite3_finalize(stmt); - if (rc != SQLITE_DONE) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - sqlite3_result_int64(context, sqlite3_changes(db)); -} - -// cloudsync_payload_spool_drop_chunk(stream_id, chunk_index) -> number of chunks removed. -// Called after one S3-backed chunk has been safely persisted outside the spool. 
-static void dbsync_payload_spool_drop_chunk (sqlite3_context *context, int argc, sqlite3_value **argv) { - DEBUG_FUNCTION("cloudsync_payload_spool_drop_chunk"); - UNUSED_PARAMETER(argc); - - sqlite3 *db = sqlite3_context_db_handle(context); - if (sqlite3_value_type(argv[0]) == SQLITE_NULL) { - sqlite3_result_error(context, "cloudsync_payload_spool_drop_chunk: stream_id is required.", -1); - return; - } - if (sqlite3_value_type(argv[1]) == SQLITE_NULL) { - sqlite3_result_error(context, "cloudsync_payload_spool_drop_chunk: chunk_index is required.", -1); - return; - } - - int rc = sqlite3_exec(db, SQL_PAYLOAD_SPOOL_CREATE_TABLE, NULL, NULL, NULL); - if (rc != SQLITE_OK) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - - sqlite3_stmt *stmt = NULL; - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_DELETE_CHUNK, -1, &stmt, NULL); - if (rc != SQLITE_OK) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - sqlite3_bind_text(stmt, 1, (const char *)sqlite3_value_text(argv[0]), sqlite3_value_bytes(argv[0]), SQLITE_TRANSIENT); - sqlite3_bind_int64(stmt, 2, sqlite3_value_int64(argv[1])); - rc = sqlite3_step(stmt); - sqlite3_finalize(stmt); - if (rc != SQLITE_DONE) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - sqlite3_result_int64(context, sqlite3_changes(db)); -} - // MARK: - Register - int dbsync_register_with_flags (sqlite3 *db, const char *name, void (*xfunc)(sqlite3_context*,int,sqlite3_value**), void (*xstep)(sqlite3_context*,int,sqlite3_value**), void (*xfinal)(sqlite3_context*), int nargs, int flags, char **pzErrMsg, void *ctx, void (*ctx_free)(void *)) { @@ -2319,14 +2173,6 @@ int dbsync_register_functions (sqlite3 *db, char **pzErrMsg) { rc = dbsync_register_function(db, "cloudsync_payload_blob_checked", dbsync_payload_blob_checked, 5, pzErrMsg, ctx, NULL); if (rc != SQLITE_OK) return rc; - // Download spool (server-side /check chunk staging) - rc = dbsync_register_function(db, "cloudsync_payload_spool_fill", 
dbsync_payload_spool_fill, 4, pzErrMsg, ctx, NULL); - if (rc != SQLITE_OK) return rc; - rc = dbsync_register_function(db, "cloudsync_payload_spool_drop", dbsync_payload_spool_drop, 1, pzErrMsg, ctx, NULL); - if (rc != SQLITE_OK) return rc; - rc = dbsync_register_function(db, "cloudsync_payload_spool_drop_chunk", dbsync_payload_spool_drop_chunk, 2, pzErrMsg, ctx, NULL); - if (rc != SQLITE_OK) return rc; - #ifdef CLOUDSYNC_DESKTOP_OS rc = dbsync_register_function(db, "cloudsync_payload_save", dbsync_payload_save, 1, pzErrMsg, ctx, NULL); if (rc != SQLITE_OK) return rc; diff --git a/src/sqlite/sql_sqlite.c b/src/sqlite/sql_sqlite.c index 66d163f..f01a307 100644 --- a/src/sqlite/sql_sqlite.c +++ b/src/sqlite/sql_sqlite.c @@ -311,52 +311,6 @@ const char * const SQL_PAYLOAD_FRAGMENTS_CLEANUP_STALE = "SELECT value_id FROM cloudsync_payload_fragments GROUP BY value_id " "HAVING COUNT(*) < MAX(part_count));"; -// MARK: Payload download spool (server-side /check chunk staging) - -// One row per transport chunk of a download stream. Keyed by (stream_id, -// chunk_index); the server fills a whole window once and pages it out one chunk -// per /check call so the network driver never re-materializes the whole stream. -const char * const SQL_PAYLOAD_SPOOL_CREATE_TABLE = - "CREATE TABLE IF NOT EXISTS cloudsync_payload_spool (" - "stream_id TEXT NOT NULL, chunk_index INTEGER NOT NULL, " - "payload BLOB NOT NULL, payload_size INTEGER NOT NULL, " - "db_version_min INTEGER NOT NULL, db_version_max INTEGER NOT NULL, " - "watermark INTEGER NOT NULL, is_final INTEGER NOT NULL DEFAULT 0, " - "created_at INTEGER NOT NULL DEFAULT (unixepoch()), " - "PRIMARY KEY(stream_id, chunk_index)) WITHOUT ROWID;"; - -const char * const SQL_PAYLOAD_SPOOL_COUNT = - "SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id=?;"; - -// Generate the window's chunk stream once (the vtab streams row-by-row under -// INSERT...SELECT, so peak memory stays at one chunk). 
until_db_version is left -// unconstrained so the vtab pins the watermark to the current max db_version. -const char * const SQL_PAYLOAD_SPOOL_FILL_INSERT = - "INSERT INTO cloudsync_payload_spool " - "(stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final) " - "SELECT ?1, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark_db_version, 0 " - "FROM cloudsync_payload_chunks " - "WHERE since_db_version=?2 AND site_id=?3 AND exclude_filter_site_id=?4;"; - -const char * const SQL_PAYLOAD_SPOOL_MARK_FINAL = - "UPDATE cloudsync_payload_spool SET is_final=1 " - "WHERE stream_id=?1 AND chunk_index=(" - "SELECT MAX(chunk_index) FROM cloudsync_payload_spool WHERE stream_id=?1);"; - -const char * const SQL_PAYLOAD_SPOOL_DELETE = - "DELETE FROM cloudsync_payload_spool WHERE stream_id=?;"; - -const char * const SQL_PAYLOAD_SPOOL_DELETE_CHUNK = - "DELETE FROM cloudsync_payload_spool WHERE stream_id=? AND chunk_index=?;"; - -// Drop whole abandoned streams (every chunk older than the cutoff). fill is -// coarse-grained (once per stream), so unlike per-fragment cleanup there is no -// O(n^2) risk and no throttle is needed. 
-const char * const SQL_PAYLOAD_SPOOL_CLEANUP_STALE = - "DELETE FROM cloudsync_payload_spool WHERE stream_id IN (" - "SELECT stream_id FROM cloudsync_payload_spool GROUP BY stream_id " - "HAVING MAX(created_at) < ?);"; - // MARK: Blocks (block-level LWW) const char * const SQL_BLOCKS_CREATE_TABLE = diff --git a/test/postgresql/52_payload_chunks.sql b/test/postgresql/52_payload_chunks.sql index 2e1cfd6..ecdd364 100644 --- a/test/postgresql/52_payload_chunks.sql +++ b/test/postgresql/52_payload_chunks.sql @@ -166,102 +166,6 @@ SELECT (:str_arg_chunks::int = :incl_local_chunks::int) AS str_arg_ok \gset SELECT (:fail::int + 1) AS fail \gset \endif --- Download spool: fill stages the whole stream once; paging it back must be --- byte-identical to direct cloudsync_payload_chunks generation, with is_final --- only on the last chunk and a single stable watermark. -SELECT cloudsync_payload_spool_fill('stream-A', 0, cloudsync_siteid(), false) AS spool_fill_count \gset - -SELECT - md5(string_agg(encode(payload, 'hex'), ',' ORDER BY chunk_index)) AS spool_md5, - count(*) FILTER (WHERE is_final) AS spool_final_count, - max(chunk_index) FILTER (WHERE is_final) AS spool_final_idx, - max(chunk_index) AS spool_max_idx, - count(DISTINCT watermark) AS spool_watermark_distinct -FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' \gset - -SELECT md5(string_agg(encode(payload, 'hex'), ',' ORDER BY chunk_index)) AS direct_md5 -FROM cloudsync_payload_chunks(0, cloudsync_siteid(), NULL, false) \gset - -SELECT (:spool_fill_count::int = :chunk_count::int - AND :'spool_md5' = :'direct_md5' - AND :spool_final_count::int = 1 - AND :spool_final_idx::int = :spool_max_idx::int - AND :spool_watermark_distinct::int = 1) AS spool_ok \gset -\if :spool_ok -\echo [PASS] (:testid) Spool fill/page is byte-identical to direct generation (:spool_fill_count chunks, final on last) -\else -\echo [FAIL] (:testid) Spool fill/page mismatch (fill=:spool_fill_count vs :chunk_count, 
final_count=:spool_final_count) -SELECT (:fail::int + 1) AS fail \gset -\endif - --- Idempotent re-fill (no duplicate rows), empty window -> 0, drop reports count. -SELECT cloudsync_payload_spool_fill('stream-A', 0, cloudsync_siteid(), false) AS spool_refill_count \gset -SELECT count(*) AS spool_rows_after_refill FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' \gset -SELECT cloudsync_payload_spool_fill('stream-empty', 999999, cloudsync_siteid(), false) AS spool_empty_count \gset -SELECT cloudsync_payload_spool_drop('stream-A') AS spool_drop_count \gset -SELECT count(*) AS spool_rows_after_drop FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' \gset - -SELECT (:spool_refill_count::int = :chunk_count::int - AND :spool_rows_after_refill::int = :chunk_count::int - AND :spool_empty_count::int = 0 - AND :spool_drop_count::int = :chunk_count::int - AND :spool_rows_after_drop::int = 0) AS spool_lifecycle_ok \gset -\if :spool_lifecycle_ok -\echo [PASS] (:testid) Spool idempotent re-fill, empty window, and drop behave correctly -\else -\echo [FAIL] (:testid) Spool lifecycle mismatch (refill=:spool_refill_count empty=:spool_empty_count drop=:spool_drop_count after_drop=:spool_rows_after_drop) -SELECT (:fail::int + 1) AS fail \gset -\endif - --- Stale-GC: an abandoned >24h stream is reaped on the next fill. 
-INSERT INTO cloudsync_payload_spool - (stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final, created_at) -VALUES ('stale', 0, '\x00'::bytea, 1, 1, 1, 1, true, extract(epoch FROM now())::bigint - 90000); -SELECT cloudsync_payload_spool_fill('stream-B', 0, cloudsync_siteid(), false) AS _spool_gc_fill \gset -SELECT - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stale') AS stale_remaining, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-B') AS streamb_count \gset -SELECT (:stale_remaining::int = 0 AND :streamb_count::int = :chunk_count::int) AS spool_gc_ok \gset -\if :spool_gc_ok -\echo [PASS] (:testid) Spool stale-GC reaps abandoned streams on fill -\else -\echo [FAIL] (:testid) Spool stale-GC did not reap the abandoned stream (stale_remaining=:stale_remaining) -SELECT (:fail::int + 1) AS fail \gset -\endif - --- Chunk drop: explicit early cleanup for one S3-backed chunk is scoped to one --- (stream_id, chunk_index), idempotent, and leaves other chunks/streams intact. 
-SELECT cloudsync_payload_spool_fill('stream-A', 0, cloudsync_siteid(), false) AS spool_refill_for_chunk_drop \gset -SELECT cloudsync_payload_spool_drop_chunk('stream-A', 1) AS spool_chunk_drop_count \gset -SELECT cloudsync_payload_spool_drop_chunk('stream-A', 1) AS spool_chunk_drop_repeat_count \gset -SELECT cloudsync_payload_spool_drop_chunk('stream-A', 999999) AS spool_chunk_drop_missing_count \gset -SELECT - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-A') AS streama_after_chunk_drop, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' AND chunk_index = 0) AS streama_chunk0, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' AND chunk_index = 1) AS streama_chunk1, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' AND chunk_index = 2) AS streama_chunk2, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-B') AS streamb_after_chunk_drop, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-B' AND chunk_index = 1) AS streamb_chunk1 \gset -SELECT (:spool_refill_for_chunk_drop::int = :chunk_count::int - AND :spool_chunk_drop_count::int = 1 - AND :spool_chunk_drop_repeat_count::int = 0 - AND :spool_chunk_drop_missing_count::int = 0 - AND :streama_after_chunk_drop::int = :chunk_count::int - 1 - AND :streama_chunk0::int = 1 - AND :streama_chunk1::int = 0 - AND :streama_chunk2::int = 1 - AND :streamb_after_chunk_drop::int = :chunk_count::int - AND :streamb_chunk1::int = 1) AS spool_chunk_drop_ok \gset -\if :spool_chunk_drop_ok -\echo [PASS] (:testid) Spool chunk drop is scoped and idempotent -\else -\echo [FAIL] (:testid) Spool chunk drop mismatch (drop=:spool_chunk_drop_count repeat=:spool_chunk_drop_repeat_count missing=:spool_chunk_drop_missing_count streamA=:streama_after_chunk_drop streamB=:streamb_after_chunk_drop) -SELECT (:fail::int + 1) AS fail \gset -\endif - -SELECT cloudsync_payload_spool_drop('stream-A'); 
-SELECT cloudsync_payload_spool_drop('stream-B'); - SELECT md5(string_agg(id || ':' || note || ':' || encode(data, 'hex'), '|' ORDER BY id)) AS src_hash, count(*) AS src_count diff --git a/test/unit.c b/test/unit.c index e77e227..969dc28 100644 --- a/test/unit.c +++ b/test/unit.c @@ -12739,196 +12739,6 @@ bool do_test_payload_chunks_positional_resume (bool print_result, bool cleanup_d return result; } -// Exercises the server-side download spool: cloudsync_payload_spool_fill stages a -// window's whole chunk stream once, and the /check path pages it out one chunk per -// call. Verifies byte-identity with direct cloudsync_payload_chunks generation, -// is_final marking, idempotent re-fill, drop, empty window, and stale-GC. -bool do_test_payload_spool (bool print_result, bool cleanup_databases) { - sqlite3 *db = NULL; - sqlite3_stmt *stmt = NULL; - test_payload_chunk *chunks = NULL; - int chunk_count = 0, chunk_cap = 0, v3_count = 0; - bool result = false; - int rc = SQLITE_OK; - - time_t timestamp = time(NULL); - int saved_counter = test_counter++; - - db = do_create_database_file(0, timestamp, saved_counter); - if (!db) goto finalize; - rc = sqlite3_exec(db, - "CREATE TABLE payload_chunk_test (" - "id TEXT PRIMARY KEY, note TEXT DEFAULT '', data BLOB DEFAULT x'');" - "SELECT cloudsync_init('payload_chunk_test');", - NULL, NULL, NULL); - if (rc != SQLITE_OK) goto finalize; - - // One oversized value (-> v3 fragment chunks) plus many small rows (-> several - // v2 chunks) under a tiny budget, so the window is a multi-chunk stream. 
- rc = sqlite3_exec(db, - "SELECT cloudsync_set('payload_max_chunk_size', '262144');" - "INSERT INTO payload_chunk_test(id, note, data) " - "VALUES ('big', lower(hex(randomblob(360000))), randomblob(720000));" - "WITH RECURSIVE c(i) AS (SELECT 1 UNION ALL SELECT i+1 FROM c WHERE i < 260) " - "INSERT INTO payload_chunk_test(id, note, data) " - "SELECT printf('row-%03d', i), printf('small-%03d-%s', i, hex(randomblob(850))), randomblob(512) FROM c;", - NULL, NULL, NULL); - if (rc != SQLITE_OK) goto finalize; - - // Reference stream: collect the chunks the vtab generates directly for the - // whole window (since_db_version=0), in order. - rc = sqlite3_prepare_v2(db, - "SELECT payload FROM cloudsync_payload_chunks WHERE since_db_version=0 ORDER BY chunk_index;", - -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) { - int len = sqlite3_column_bytes(stmt, 0); - const void *payload = sqlite3_column_blob(stmt, 0); - if (!payload || len <= 0) goto finalize; - if (len > 4 && ((const unsigned char *)payload)[4] == 3) ++v3_count; - if (chunk_count == chunk_cap) { - int new_cap = chunk_cap ? chunk_cap * 2 : 16; - test_payload_chunk *nc = realloc(chunks, sizeof(*chunks) * new_cap); - if (!nc) goto finalize; - memset(nc + chunk_cap, 0, sizeof(*chunks) * (new_cap - chunk_cap)); - chunks = nc; chunk_cap = new_cap; - } - chunks[chunk_count].data = malloc(len); - if (!chunks[chunk_count].data) goto finalize; - memcpy(chunks[chunk_count].data, payload, len); - chunks[chunk_count].len = len; - ++chunk_count; - } - if (rc != SQLITE_DONE) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - // Scenario needs a genuine multi-chunk stream including >= 2 v3 fragments. 
- if (chunk_count < 5 || v3_count < 2) goto finalize; - - // --- fill stages the whole stream; the return value is the chunk count --- - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_fill('stream-A', 0, NULL, 0);", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != chunk_count) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - - // --- page through the spool: byte-identity, watermark stable, is_final last --- - rc = sqlite3_prepare_v2(db, - "SELECT payload, watermark, is_final FROM cloudsync_payload_spool " - "WHERE stream_id='stream-A' ORDER BY chunk_index;", - -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - int idx = 0; - int64_t watermark = -1; - while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) { - if (idx >= chunk_count) goto finalize; - int len = sqlite3_column_bytes(stmt, 0); - const void *payload = sqlite3_column_blob(stmt, 0); - int64_t wm = sqlite3_column_int64(stmt, 1); - int is_final = sqlite3_column_int(stmt, 2); - if (len != chunks[idx].len || memcmp(payload, chunks[idx].data, len) != 0) goto finalize; - if (watermark < 0) watermark = wm; else if (wm != watermark) goto finalize; - if (is_final != (idx == chunk_count - 1)) goto finalize; - ++idx; - } - if (rc != SQLITE_DONE || idx != chunk_count || watermark <= 0) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - - // --- idempotent re-fill: same count, no duplicate rows --- - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_fill('stream-A', 0, NULL, 0);", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != chunk_count) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - rc = sqlite3_prepare_v2(db, "SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A';", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != 
chunk_count) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - - // --- empty window: since past the watermark yields zero chunks --- - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_fill('stream-empty', 999999, NULL, 0);", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != 0) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - - // --- stale-GC: an abandoned >24h stream is reaped on the next fill --- - rc = sqlite3_exec(db, - "INSERT INTO cloudsync_payload_spool " - "(stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final, created_at) " - "VALUES ('stale', 0, x'00', 1, 1, 1, 1, 1, unixepoch()-90000);", - NULL, NULL, NULL); - if (rc != SQLITE_OK) goto finalize; - rc = sqlite3_exec(db, "SELECT cloudsync_payload_spool_fill('stream-B', 0, NULL, 0);", NULL, NULL, NULL); - if (rc != SQLITE_OK) goto finalize; - rc = sqlite3_prepare_v2(db, - "SELECT (SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stale'), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-B');", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW) goto finalize; - if (sqlite3_column_int(stmt, 0) != 0 || sqlite3_column_int(stmt, 1) != chunk_count) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - - // --- chunk drop removes only one matching chunk and reports the count --- - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_drop_chunk('stream-A', 1);", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != 1) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_drop_chunk('stream-A', 1);", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != 0) goto finalize; 
- sqlite3_finalize(stmt); stmt = NULL; - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_drop_chunk('stream-A', 999999);", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != 0) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - rc = sqlite3_prepare_v2(db, - "SELECT " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A'), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A' AND chunk_index=0), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A' AND chunk_index=1), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A' AND chunk_index=2), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-B'), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-B' AND chunk_index=1);", - -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW) goto finalize; - if (sqlite3_column_int(stmt, 0) != chunk_count - 1 || - sqlite3_column_int(stmt, 1) != 1 || - sqlite3_column_int(stmt, 2) != 0 || - sqlite3_column_int(stmt, 3) != 1 || - sqlite3_column_int(stmt, 4) != chunk_count || - sqlite3_column_int(stmt, 5) != 1) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - - // --- drop removes the stream and reports the number of chunks removed --- - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_drop('stream-A');", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != chunk_count - 1) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - rc = sqlite3_prepare_v2(db, "SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A';", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != 0) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - - 
result = true; - -finalize: - if (!result && print_result) { - printf("do_test_payload_spool error: %s (chunks=%d, v3=%d)\n", - db ? sqlite3_errmsg(db) : "no db", chunk_count, v3_count); - } - if (stmt) sqlite3_finalize(stmt); - test_payload_chunks_free(chunks, chunk_count); - if (db) close_db(db); - if (cleanup_databases) { - char path[256], walpath[300], shmpath[300]; - do_build_database_path(path, 0, timestamp, saved_counter); - snprintf(walpath, sizeof(walpath), "%s-wal", path); - snprintf(shmpath, sizeof(shmpath), "%s-shm", path); - file_delete_internal(path); - file_delete_internal(walpath); - file_delete_internal(shmpath); - } - return result; -} - bool do_test_payload_idempotency (int nclients, bool print_result, bool cleanup_databases) { sqlite3 *db[2] = {NULL, NULL}; bool result = false; @@ -13380,7 +13190,6 @@ int main (int argc, const char * argv[]) { result += test_report("Payload Chunks Site Exclusion:", do_test_payload_chunks_site_exclusion(print_result, cleanup_databases)); result += test_report("Payload Chunks Split db_version:", do_test_payload_chunks_split_dbversion(print_result, cleanup_databases)); result += test_report("Payload Chunks Positional Resume:", do_test_payload_chunks_positional_resume(print_result, cleanup_databases)); - result += test_report("Payload Download Spool:", do_test_payload_spool(print_result, cleanup_databases)); // close local database close_db(db); From 01f6d25ae72de09ac8bc03b8483611339281cc3f Mon Sep 17 00:00:00 2001 From: Andrea Donetti Date: Tue, 30 Jun 2026 11:28:17 -0600 Subject: [PATCH 07/13] fix(network): track lastOptimisticVersion latest-valid, not monotonic max MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit network_sync_state_update_from_response kept the maximum optimistic/confirmed version seen across a multi-chunk send. 
But the server can move lastOptimisticVersion BACKWARD when a later chunk fails (rollback), and that value becomes the durable send checkpoint (CLOUDSYNC_KEY_SEND_DBVERSION) — so masking a decrease advanced the checkpoint past the rolled-back changes, which were then never re-sent (silent data loss). Take the latest valid (>= 0) value instead, matching how `gaps` is already handled; a missing/unparseable field (-1) still leaves the value untouched. Pre-existing since the chunked-payload transport work, unreleased. Add a network-layer unit test harness (the layer had none): network_unit.c is built with networking compiled in (T_CFLAGS minus OMIT_NETWORK, linked against curl) so the internal parsing/state functions can be exercised on in-memory NETWORK_RESULT buffers with no server. `make network-unittest` runs it (also wired into `make test`). The new test reproduces the rollback (50→100→50 must end at 50, not 100) and covers the missing-field no-op and network_compute_status; network_sync_state_update_from_response / network_compute_status are de-static'd for it. Co-Authored-By: Claude Opus 4.8 (1M context) --- Makefile | 31 ++++++++++- src/network/network.c | 34 +++++++----- src/network/network_private.h | 7 +++ test/network_unit.c | 99 +++++++++++++++++++++++++++++++++++ 4 files changed, 157 insertions(+), 14 deletions(-) create mode 100644 test/network_unit.c diff --git a/Makefile b/Makefile index fac34df..d4f4f14 100644 --- a/Makefile +++ b/Makefile @@ -78,7 +78,9 @@ FI_SRC = $(FI_DIR)/fractional_indexing.c # Combined for SQLite extension build SRC_FILES = $(CORE_SRC) $(SQLITE_SRC) $(FI_SRC) -TEST_SRC = $(wildcard $(TEST_DIR)/*.c) +# network_unit.c is built separately with networking ENABLED (see network-unittest), +# so keep it out of the default OMIT_NETWORK test build. 
+TEST_SRC = $(filter-out $(TEST_DIR)/network_unit.c,$(wildcard $(TEST_DIR)/*.c)) TEST_FILES = $(SRC_FILES) $(TEST_SRC) $(wildcard $(SQLITE_DIR)/*.c) RELEASE_OBJ = $(patsubst %.c, $(BUILD_RELEASE)/%.o, $(notdir $(SRC_FILES))) TEST_OBJ = $(patsubst %.c, $(BUILD_TEST)/%.o, $(notdir $(TEST_FILES))) @@ -86,6 +88,18 @@ COV_FILES = $(filter-out $(SRC_DIR)/lz4.c $(NETWORK_DIR)/network.c $(SQLITE_IMPL CURL_LIB = $(CURL_DIR)/$(PLATFORM)/libcurl.a TEST_TARGET = $(patsubst %.c,$(DIST_DIR)/%$(EXE), $(notdir $(TEST_SRC))) +# Network-enabled unit tests: rebuild the codebase with networking ON (T_CFLAGS +# minus OMIT_NETWORK) and link curl, so network.c's internal functions can be +# tested directly on in-memory buffers. NT_LDFLAGS reuses the platform LDFLAGS +# (which carries -lcurl) minus the shared-library-only flags, plus the test link +# libs. -undefined dynamic_lookup is kept: the test never opens a connection, so +# curl's transport symbols are linked but never invoked. +BUILD_NETTEST = build/nettest +NT_CFLAGS = $(filter-out -DCLOUDSYNC_OMIT_NETWORK,$(T_CFLAGS)) +NT_LDFLAGS = $(filter-out -dynamiclib -headerpad_max_install_names,$(LDFLAGS)) $(T_LDFLAGS) +NT_SRC = $(SRC_FILES) $(SQLITE_DIR)/sqlite3.c $(TEST_DIR)/network_unit.c +NT_OBJ = $(patsubst %.c,$(BUILD_NETTEST)/%.o,$(notdir $(NT_SRC))) + # Build curl hermetically: neutralize the developer's ambient build env so # curl's ./configure compile tests aren't broken by overrides leaking in # (e.g. exported LDFLAGS/CPPFLAGS/LIBS pointing at Homebrew). 
Build flags for @@ -261,8 +275,16 @@ $(BUILD_TEST)/sqlite3.o: $(SQLITE_DIR)/sqlite3.c $(BUILD_TEST)/%.o: %.c $(CC) $(T_CFLAGS) -c $< -o $@ +# Network-enabled object files (networking ON, for network-unittest) +$(BUILD_NETTEST): + mkdir -p $(BUILD_NETTEST) +$(BUILD_NETTEST)/sqlite3.o: $(SQLITE_DIR)/sqlite3.c | $(BUILD_NETTEST) + $(CC) $(CFLAGS) -DSQLITE_DQS=0 -DSQLITE_CORE -c $< -o $@ +$(BUILD_NETTEST)/%.o: %.c | $(BUILD_NETTEST) + $(CC) $(NT_CFLAGS) -c $< -o $@ + # Run code coverage (--css-file $(CUSTOM_CSS)) -test: $(TARGET) $(TEST_TARGET) unittest e2e +test: $(TARGET) $(TEST_TARGET) unittest network-unittest e2e set -e; $(SQLITE3) ":memory:" -cmd ".bail on" ".load ./$<" "SELECT cloudsync_version();" ifneq ($(COVERAGE),false) mkdir -p $(COV_DIR) @@ -274,6 +296,11 @@ endif unittest: $(TARGET) $(DIST_DIR)/unit$(EXE) @./$(DIST_DIR)/unit$(EXE) +# Run the network-layer unit tests (networking compiled in, no server) +network-unittest: $(CURL_LIB) $(NT_OBJ) + $(CC) $(NT_OBJ) -o $(DIST_DIR)/network_unit$(EXE) $(NT_LDFLAGS) + @./$(DIST_DIR)/network_unit$(EXE) + # Run end-to-end integration tests e2e: $(TARGET) $(DIST_DIR)/integration$(EXE) @if [ -f .env ]; then \ diff --git a/src/network/network.c b/src/network/network.c index f8b6c24..7a96e9b 100644 --- a/src/network/network.c +++ b/src/network/network.c @@ -1601,18 +1601,22 @@ static int network_send_payload_to_apply(sqlite3_context *context, network_data return SQLITE_OK; } -static void network_sync_state_update_from_response(NETWORK_RESULT *res, - int64_t *last_optimistic_version, - int64_t *last_confirmed_version, - int *gaps_size, - char **apply_failure_json, - char **check_failure_json) { +void network_sync_state_update_from_response(NETWORK_RESULT *res, + int64_t *last_optimistic_version, + int64_t *last_confirmed_version, + int *gaps_size, + char **apply_failure_json, + char **check_failure_json) { if (!res || res->code != CLOUDSYNC_NETWORK_BUFFER || !res->buffer) return; - int64_t parsed_version = 
json_extract_int(res->buffer, res->blen, "lastOptimisticVersion", -1); - if (parsed_version > *last_optimistic_version) *last_optimistic_version = parsed_version; - parsed_version = json_extract_int(res->buffer, res->blen, "lastConfirmedVersion", -1); - if (parsed_version > *last_confirmed_version) *last_confirmed_version = parsed_version; + // Take the latest valid (>= 0) value, not the max: the server can move these + // BACKWARD on a rollback when a later send chunk fails, and lastOptimisticVersion + // becomes the durable send checkpoint — masking a decrease would advance the + // checkpoint past the rolled-back changes and silently drop them. + int64_t parsed_optimistic = json_extract_int(res->buffer, res->blen, "lastOptimisticVersion", -1); + if (parsed_optimistic >= 0) *last_optimistic_version = parsed_optimistic; + int64_t parsed_confirmed = json_extract_int(res->buffer, res->blen, "lastConfirmedVersion", -1); + if (parsed_confirmed >= 0) *last_confirmed_version = parsed_confirmed; int parsed_gaps_size = json_extract_array_size(res->buffer, res->blen, "gaps"); if (parsed_gaps_size >= 0) *gaps_size = parsed_gaps_size; @@ -1627,10 +1631,16 @@ static void network_sync_state_update_from_response(NETWORK_RESULT *res, if (*check_failure_json) cloudsync_memory_free(*check_failure_json); *check_failure_json = check_failure; } + + #ifdef CLOUDSYNC_NETWORK_TRACE + // Full endpoint response body that the sync-state fields above were parsed from. + // The buffer is not guaranteed NUL-terminated, so bound the print with its length. 
+ fprintf(stderr, "[cloudsync-network] sync_state response=%.*s\n", (int)res->blen, res->buffer); + #endif } -static const char *network_compute_status(int64_t last_optimistic, int64_t last_confirmed, - int gaps_size, int64_t local_version) { +const char *network_compute_status(int64_t last_optimistic, int64_t last_confirmed, + int gaps_size, int64_t local_version) { if (last_optimistic < 0 || last_confirmed < 0) return "error"; if (gaps_size > 0 || last_optimistic < local_version) return "out-of-sync"; if (last_optimistic == last_confirmed) return "synced"; diff --git a/src/network/network_private.h b/src/network/network_private.h index 82c00d7..b150d42 100644 --- a/src/network/network_private.h +++ b/src/network/network_private.h @@ -8,6 +8,9 @@ #ifndef __CLOUDSYNC_NETWORK_PRIVATE__ #define __CLOUDSYNC_NETWORK_PRIVATE__ +#include +#include + #define CLOUDSYNC_DEFAULT_ADDRESS "https://cloudsync.sqlite.ai" #define CLOUDSYNC_ENDPOINT_PREFIX "v2/cloudsync/databases" #define CLOUDSYNC_ENDPOINT_UPLOAD "upload" @@ -46,6 +49,10 @@ bool network_data_set_endpoints (network_data *data, char *auth, char *check, ch bool network_send_buffer(network_data *data, const char *endpoint, const char *authentication, const void *blob, int blob_size); NETWORK_RESULT network_receive_buffer (network_data *data, const char *endpoint, const char *authentication, bool zero_terminated, bool is_post_request, char *json_payload, const char **extra_headers, int nextra_headers); +// Exposed (non-static) for the network unit test; otherwise internal to network.c. 
+void network_sync_state_update_from_response(NETWORK_RESULT *res, int64_t *last_optimistic_version, int64_t *last_confirmed_version, int *gaps_size, char **apply_failure_json, char **check_failure_json); +const char *network_compute_status(int64_t last_optimistic, int64_t last_confirmed, int gaps_size, int64_t local_version); + #ifdef CLOUDSYNC_NETWORK_TRACE const char *network_trace_endpoint_name(network_data *data, const char *endpoint); const char *network_trace_result_name(int code); diff --git a/test/network_unit.c b/test/network_unit.c new file mode 100644 index 0000000..d5971b1 --- /dev/null +++ b/test/network_unit.c @@ -0,0 +1,99 @@ +// +// network_unit.c +// cloudsync +// +// Unit tests for the network layer's pure response-handling logic. Built with +// networking ENABLED (unlike dist/unit, which is -DCLOUDSYNC_OMIT_NETWORK), so it +// can call the internal functions directly on crafted in-memory NETWORK_RESULT +// buffers — no server, no sockets. +// + +#include +#include +#include +#include +#include +#include "network_private.h" + +static int failures = 0; + +static void check(const char *name, bool ok) { + printf("%-64s %s\n", name, ok ? "OK" : "FAIL"); + if (!ok) failures++; +} + +static NETWORK_RESULT json_buffer(char *json) { + NETWORK_RESULT r = {0}; + r.code = CLOUDSYNC_NETWORK_BUFFER; + r.buffer = json; + r.blen = strlen(json); + return r; +} + +// Regression: lastOptimisticVersion must track the LATEST valid value, including a +// decrease. The server can roll the optimistic version back when a later send chunk +// fails; since it becomes the durable send checkpoint, a monotonic "max" would mask +// the rollback and skip the rolled-back changes on the next send. 
+static bool test_optimistic_version_rollback(void) { + int64_t optimistic = -1, confirmed = -1; + int gaps = -1; + char *apply = NULL, *check_fail = NULL; + bool ok = true; + + char j1[] = "{\"lastOptimisticVersion\":50,\"lastConfirmedVersion\":10}"; + NETWORK_RESULT r1 = json_buffer(j1); + network_sync_state_update_from_response(&r1, &optimistic, &confirmed, &gaps, &apply, &check_fail); + ok = ok && optimistic == 50 && confirmed == 10; + + char j2[] = "{\"lastOptimisticVersion\":100,\"lastConfirmedVersion\":20}"; + NETWORK_RESULT r2 = json_buffer(j2); + network_sync_state_update_from_response(&r2, &optimistic, &confirmed, &gaps, &apply, &check_fail); + ok = ok && optimistic == 100 && confirmed == 20; + + // Server rolls back on a later chunk error: the value must DECREASE to 50. + char j3[] = "{\"lastOptimisticVersion\":50,\"lastConfirmedVersion\":20}"; + NETWORK_RESULT r3 = json_buffer(j3); + network_sync_state_update_from_response(&r3, &optimistic, &confirmed, &gaps, &apply, &check_fail); + ok = ok && optimistic == 50; + + // A response missing the field (parsed -1) must NOT clobber the current value. + char j4[] = "{\"lastConfirmedVersion\":20}"; + NETWORK_RESULT r4 = json_buffer(j4); + network_sync_state_update_from_response(&r4, &optimistic, &confirmed, &gaps, &apply, &check_fail); + ok = ok && optimistic == 50; + + ok = ok && apply == NULL && check_fail == NULL; // no failures object in these responses + return ok; +} + +// A non-BUFFER result (or NULL buffer) must leave the accumulators untouched. 
+static bool test_non_buffer_is_noop(void) { + int64_t optimistic = 7, confirmed = 3; + int gaps = 0; + char *apply = NULL, *check_fail = NULL; + + NETWORK_RESULT err = {0}; + err.code = CLOUDSYNC_NETWORK_ERROR; + network_sync_state_update_from_response(&err, &optimistic, &confirmed, &gaps, &apply, &check_fail); + return optimistic == 7 && confirmed == 3 && gaps == 0; +} + +static bool test_compute_status(void) { + bool ok = true; + ok = ok && strcmp(network_compute_status(100, 100, 0, 100), "synced") == 0; + ok = ok && strcmp(network_compute_status(100, 50, 0, 100), "syncing") == 0; + ok = ok && strcmp(network_compute_status(100, 100, 1, 100), "out-of-sync") == 0; // gaps + ok = ok && strcmp(network_compute_status(90, 90, 0, 100), "out-of-sync") == 0; // behind local + ok = ok && strcmp(network_compute_status(-1, 100, 0, 100), "error") == 0; // unparsed + return ok; +} + +int main(void) { + printf("\nNetwork unit tests\n"); + check("optimistic/confirmed version folds latest-valid (allows rollback):", test_optimistic_version_rollback()); + check("non-buffer response is a no-op:", test_non_buffer_is_noop()); + check("network_compute_status:", test_compute_status()); + if (failures) { printf("\n%d test(s) FAILED\n", failures); return 1; } + printf("\nAll network unit tests passed\n"); + return 0; +} From 0e36154cb52d0200f3b862c52fc4925e0e42654c Mon Sep 17 00:00:00 2001 From: Andrea Donetti Date: Tue, 30 Jun 2026 16:02:02 -0600 Subject: [PATCH 08/13] fix(build): drop -shared from the network-unittest executable link The network-unittest target (added in 01f6d25) links dist/network_unit as an executable from NT_LDFLAGS, derived from the platform LDFLAGS. Linux, Android and Windows put -shared in LDFLAGS (the extension is a .so there); the filter only stripped macOS's -dynamiclib, so -shared leaked into the executable link and ld tried to emit a shared object from non-PIC objects: relocation R_X86_64_PC32 ... 
can not be used when making a shared object Strip -shared too so it links as a plain executable. macOS is unaffected. Co-Authored-By: Claude Opus 4.8 (1M context) --- Makefile | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index d4f4f14..40986f6 100644 --- a/Makefile +++ b/Makefile @@ -91,12 +91,13 @@ TEST_TARGET = $(patsubst %.c,$(DIST_DIR)/%$(EXE), $(notdir $(TEST_SRC))) # Network-enabled unit tests: rebuild the codebase with networking ON (T_CFLAGS # minus OMIT_NETWORK) and link curl, so network.c's internal functions can be # tested directly on in-memory buffers. NT_LDFLAGS reuses the platform LDFLAGS -# (which carries -lcurl) minus the shared-library-only flags, plus the test link -# libs. -undefined dynamic_lookup is kept: the test never opens a connection, so -# curl's transport symbols are linked but never invoked. +# (which carries -lcurl) minus the shared-library-only flags (-shared on Linux, +# -dynamiclib on macOS) so it links as an executable, plus the test link libs. +# -undefined dynamic_lookup is kept: the test never opens a connection, so curl's +# transport symbols are linked but never invoked. 
BUILD_NETTEST = build/nettest NT_CFLAGS = $(filter-out -DCLOUDSYNC_OMIT_NETWORK,$(T_CFLAGS)) -NT_LDFLAGS = $(filter-out -dynamiclib -headerpad_max_install_names,$(LDFLAGS)) $(T_LDFLAGS) +NT_LDFLAGS = $(filter-out -shared -dynamiclib -headerpad_max_install_names,$(LDFLAGS)) $(T_LDFLAGS) NT_SRC = $(SRC_FILES) $(SQLITE_DIR)/sqlite3.c $(TEST_DIR)/network_unit.c NT_OBJ = $(patsubst %.c,$(BUILD_NETTEST)/%.o,$(notdir $(NT_SRC))) From 93591c239165b99be1dc1761de9c524d64104310 Mon Sep 17 00:00:00 2001 From: Andrea Donetti Date: Tue, 30 Jun 2026 16:02:03 -0600 Subject: [PATCH 09/13] fix(network): announce the covered window on send, not just the change range MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit cloudsync_network_send_changes announced each chunk's actual [db_version_min, db_version_max] to /apply. When the local db_version clock has advanced past the site's own changes — e.g. after merging applied remote changes, or any transaction that produced no local-site change — those skipped versions are absent from the sent ranges, so the server's per-site coverage (ComputeGaps walks it from version 1) reports them as gaps and never advances lastOptimisticVersion to the local version. The send then churns and never reaches "synced". Announce the covered window instead: min = a running window lower bound (send_checkpoint+1, chained to the previous chunk's max+1), max = the chunk's own max, tiling [send_checkpoint+1 .. watermark] contiguously. network_announce_min() clamps the min to the chunk's own min so consecutive chunks sharing a db_version (a value fragmented across chunks) keep min<=max; a partial-fragment failure stays safe because the receiver stages incomplete fragments (0 applied rows), so coverage is only recorded once the value is whole. Pre-existing since the chunked-payload transport (f1ace81), unreleased. 
Tests: network-unit case for the tiling (leading hole / inter-chunk hole / same-version fragment pair); e2e repro test_send_gap_from_clock_hole forces a clock jump with cloudsync_db_version_next() and asserts serverVersion reaches localVersion (red before this change, green after). Co-Authored-By: Claude Opus 4.8 (1M context) --- src/network/network.c | 12 +++++- src/network/network_private.h | 8 ++++ test/integration.c | 74 ++++++++++++++++++++++++++++++++++- test/network_unit.c | 27 +++++++++++++ 4 files changed, 119 insertions(+), 2 deletions(-) diff --git a/src/network/network.c b/src/network/network.c index 7a96e9b..4ed61c4 100644 --- a/src/network/network.c +++ b/src/network/network.c @@ -1725,6 +1725,11 @@ int cloudsync_network_send_changes_internal (sqlite3_context *context, int argc, bool sent_any = false; int sent_chunks = 0; // payload chunks sent this call int64_t sent_bytes = 0; // serialized payload bytes sent this call + // Lower bound of the coverage window announced to /apply. The server tracks + // per-site applied ranges from version 1, so the announced ranges must tile + // [send_checkpoint+1 .. watermark] contiguously; otherwise db_versions consumed + // without a local-site change (e.g. merged remote changes) read back as gaps. + int64_t announce_lo = db_version + 1; while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) { const void *blob = sqlite3_column_blob(stmt, 0); @@ -1741,8 +1746,12 @@ int cloudsync_network_send_changes_internal (sqlite3_context *context, int argc, goto cleanup; } + // Announce the covered window, not just where the data sits, so db_versions + // skipped by non-local-change transactions don't read back as server-side gaps. 
+ int64_t announce_min = network_announce_min(announce_lo, db_version_min); + NETWORK_RESULT res = {0}; - rc = network_send_payload_to_apply(context, netdata, blob, blob_size, db_version_min, db_version_max, &res); + rc = network_send_payload_to_apply(context, netdata, blob, blob_size, announce_min, db_version_max, &res); if (rc != SQLITE_OK) goto cleanup; if (res.code == CLOUDSYNC_NETWORK_BUFFER && res.buffer) { @@ -1760,6 +1769,7 @@ int cloudsync_network_send_changes_internal (sqlite3_context *context, int argc, sent_chunks++; sent_bytes += payload_size; if (watermark > new_db_version) new_db_version = watermark; + announce_lo = db_version_max + 1; // next chunk continues the window here } if (rc != SQLITE_DONE) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); diff --git a/src/network/network_private.h b/src/network/network_private.h index b150d42..d541a8c 100644 --- a/src/network/network_private.h +++ b/src/network/network_private.h @@ -53,6 +53,14 @@ NETWORK_RESULT network_receive_buffer (network_data *data, const char *endpoint, void network_sync_state_update_from_response(NETWORK_RESULT *res, int64_t *last_optimistic_version, int64_t *last_confirmed_version, int *gaps_size, char **apply_failure_json, char **check_failure_json); const char *network_compute_status(int64_t last_optimistic, int64_t last_confirmed, int gaps_size, int64_t local_version); +// Lower bound of the coverage window a send chunk announces to /apply: the running +// window start, but never above the chunk's own min (so consecutive chunks sharing a +// db_version — a value fragmented across chunks — keep min<=max). The caller advances +// window_lo to chunk_db_version_max+1 after each chunk so the ranges tile contiguously. +static inline int64_t network_announce_min(int64_t window_lo, int64_t chunk_db_version_min) { + return window_lo < chunk_db_version_min ? 
window_lo : chunk_db_version_min; +} + #ifdef CLOUDSYNC_NETWORK_TRACE const char *network_trace_endpoint_name(network_data *data, const char *endpoint); const char *network_trace_result_name(int code); diff --git a/test/integration.c b/test/integration.c index 8700524..fb9b4fb 100644 --- a/test/integration.c +++ b/test/integration.c @@ -563,12 +563,83 @@ int test_enable_disable(const char *db_path) { rc = db_expect_int(db2, sql, 1); RCHECK rc = db_exec(db2, "SELECT cloudsync_terminate();"); RCHECK - + sqlite3_close(db2); ABORT_TEST } +// Reproduces the spurious-gap bug in the send path: when the local db_version clock +// has been advanced past the site's own changes — as happens when applied remote +// changes bump the clock — the send announces only the change's own db_version range, +// so the skipped versions stay a gap in the server's per-site coverage and +// lastOptimisticVersion can never reach localVersion. cloudsync_db_version_next() +// forces the jump deterministically, no second database required. Expected to FAIL +// until the send announces the covered window [last_sent+1 .. watermark]. +int test_send_gap_from_clock_hole(const char *db_path) { + sqlite3 *db = NULL; + int rc = open_load_ext(db_path, &db); RCHECK + rc = db_init(db); RCHECK // create users/activities/workouts (this db is fresh) + + char value[UUID_STR_MAXLEN]; + cloudsync_uuid_v7_string(value, true); + char sql[256]; + + rc = db_exec(db, "SELECT cloudsync_init('users');"); RCHECK + rc = db_exec(db, "SELECT cloudsync_init('activities');"); RCHECK + rc = db_exec(db, "SELECT cloudsync_init('workouts');"); RCHECK + + // Force the next local change to land at db_version 10, leaving 1..9 with no + // local-site change (the "hole" that merging applied remote changes would create). 
+ rc = db_exec(db, "SELECT cloudsync_db_version_next(10);"); RCHECK + + snprintf(sql, sizeof(sql), "INSERT INTO users (id, name) VALUES ('%s', '%s');", value, value); + rc = db_exec(db, sql); RCHECK + + // sanity: the change really landed at db_version 10, so there is a leading hole + rc = db_expect_int(db, "SELECT cloudsync_db_version();", 10); RCHECK + + // init network + char network_init[1024]; + const char* test_db_id = getenv("INTEGRATION_TEST_DATABASE_ID"); + if (!test_db_id) { + fprintf(stderr, "Error: INTEGRATION_TEST_DATABASE_ID not set.\n"); + exit(1); + } + const char* custom_address = getenv("INTEGRATION_TEST_CLOUDSYNC_ADDRESS"); + if (custom_address) { + snprintf(network_init, sizeof(network_init), + "SELECT cloudsync_network_init_custom('%s', '%s');", custom_address, test_db_id); + } else { + snprintf(network_init, sizeof(network_init), + "SELECT cloudsync_network_init('%s');", test_db_id); + } + rc = db_exec(db, network_init); RCHECK + + const char* apikey = getenv("INTEGRATION_TEST_APIKEY"); + if (apikey) { + char set_apikey[512]; + snprintf(set_apikey, sizeof(set_apikey), + "SELECT cloudsync_network_set_apikey('%s');", apikey); + rc = db_exec(db, set_apikey); RCHECK + } + + // Send once. The server applies the change and computes lastOptimisticVersion + // (serverVersion) synchronously from its per-site applied ranges. With contiguous + // coverage it reaches localVersion (10); with the gap bug it stays at 0 because + // db_versions 1..9 are reported missing. 
+ rc = db_expect_int(db, + "SELECT (j ->> '$.send.serverVersion') = (j ->> '$.send.localVersion') " + " AND (j ->> '$.send.localVersion') = 10 " + "FROM (SELECT cloudsync_network_send_changes() AS j);", 1); RCHECK + + rc = db_exec(db, "SELECT cloudsync_cleanup('users');"); RCHECK + rc = db_exec(db, "SELECT cloudsync_cleanup('activities');"); RCHECK + rc = db_exec(db, "SELECT cloudsync_cleanup('workouts');"); RCHECK + +ABORT_TEST +} + int test_chunked_payload_paths(void) { int rc = SQLITE_OK; sqlite3 *sender = NULL; @@ -1508,6 +1579,7 @@ int main (void) { rc += test_report("Is Enabled Test:", test_is_enabled(DB_PATH)); rc += test_report("DB Version Test:", test_db_version(DB_PATH)); rc += test_report("Enable Disable Test:", test_enable_disable(DB_PATH)); + rc += test_report("Send Gap From Clock Hole Test:", test_send_gap_from_clock_hole(":memory:")); // Chunked payload tests run only when INTEGRATION_TEST_CHUNKED_DATABASE_ID points at a // tenant with a small payload_max_chunk_size; state the skip reason once for the group. diff --git a/test/network_unit.c b/test/network_unit.c index d5971b1..d9f4299 100644 --- a/test/network_unit.c +++ b/test/network_unit.c @@ -78,6 +78,32 @@ static bool test_non_buffer_is_noop(void) { return optimistic == 7 && confirmed == 3 && gaps == 0; } +// The send announces a contiguous coverage window, not the raw chunk ranges, so +// skipped db_versions don't look like gaps. Walk the tiling exactly as the send loop +// does: announce [network_announce_min(lo, chunk_min) .. chunk_max], then lo = max+1. +static bool test_announce_window_tiling(void) { + bool ok = true; + int64_t lo = 0 + 1; // send checkpoint (since) = 0 + + // leading hole: a single change at db_version 7 must announce from 1, not 7. + ok = ok && network_announce_min(lo, 7) == 1; + lo = 7 + 1; + + // inter-chunk hole: next change at 20 (8..19 empty) announces from 8 → no gap. 
+ ok = ok && network_announce_min(lo, 20) == 8; + lo = 20 + 1; + + // same-db_version fragments (a value split across two chunks): first fragment + // announces from the running lo, second must clamp to the shared version (30), + // never lo (31) — that would make min > max. + ok = ok && network_announce_min(lo, 30) == 21; // first fragment of v30 + lo = 30 + 1; + ok = ok && network_announce_min(lo, 30) == 30; // second fragment of v30: 30, not 31 + lo = 30 + 1; + + return ok; +} + static bool test_compute_status(void) { bool ok = true; ok = ok && strcmp(network_compute_status(100, 100, 0, 100), "synced") == 0; @@ -92,6 +118,7 @@ int main(void) { printf("\nNetwork unit tests\n"); check("optimistic/confirmed version folds latest-valid (allows rollback):", test_optimistic_version_rollback()); check("non-buffer response is a no-op:", test_non_buffer_is_noop()); + check("send announce-window tiling (hole / inter-chunk / fragment):", test_announce_window_tiling()); check("network_compute_status:", test_compute_status()); if (failures) { printf("\n%d test(s) FAILED\n", failures); return 1; } printf("\nAll network unit tests passed\n"); From 2028cc35a57a412cf6e3173b38d7a266d4464a43 Mon Sep 17 00:00:00 2001 From: Andrea Donetti Date: Tue, 30 Jun 2026 16:20:33 -0600 Subject: [PATCH 10/13] fix(build): link network_unit via a file rule so Android runs only the binary The Android CI runs test binaries on the emulator from a command script generated by `make test -n` (it pushes the workspace and executes the script via adb). The network-unittest recipe linked dist/network_unit *inside the recipe*, so `make -n` emitted the cross-compiler link command into that on-device script; the emulator has no host NDK toolchain, so it failed with x86_64-linux-android26-clang: not found (EXIT_CODE=127). Move the link to a file-target rule (like dist/unit) and keep only the run in the network-unittest recipe, so once the host build is done `make -n` emits just `./dist/network_unit`. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- Makefile | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 40986f6..05be9fd 100644 --- a/Makefile +++ b/Makefile @@ -297,9 +297,15 @@ endif unittest: $(TARGET) $(DIST_DIR)/unit$(EXE) @./$(DIST_DIR)/unit$(EXE) +# Network-enabled unit test binary. Link it via a file rule (like dist/unit), not in +# the run recipe below: on Android `make test` runs binaries on the emulator from a +# script generated by `make test -n`, so a link command inside the recipe would be +# emitted and the cross-compiler invoked on-device (it isn't there → exit 127). +$(DIST_DIR)/network_unit$(EXE): $(CURL_LIB) $(NT_OBJ) + $(CC) $(NT_OBJ) -o $@ $(NT_LDFLAGS) + # Run the network-layer unit tests (networking compiled in, no server) -network-unittest: $(CURL_LIB) $(NT_OBJ) - $(CC) $(NT_OBJ) -o $(DIST_DIR)/network_unit$(EXE) $(NT_LDFLAGS) +network-unittest: $(DIST_DIR)/network_unit$(EXE) @./$(DIST_DIR)/network_unit$(EXE) # Run end-to-end integration tests From 47c109c41f41f5cab27867fa0b7df3e42f89d354 Mon Sep 17 00:00:00 2001 From: Andrea Donetti Date: Tue, 30 Jun 2026 16:23:23 -0600 Subject: [PATCH 11/13] test(bench): comment steps and print network JSON responses under trace Add step-by-step comments through the benchmark flow and, in trace builds, print the raw JSON returned by the cloudsync_network_* functions: route the pre-measure-sync and cleanup-send calls through a trace-aware db_exec_network helper, and surface the send/check response JSON from timed_request. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- test/sync_bench.c | 39 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/test/sync_bench.c b/test/sync_bench.c index 684f795..5fd496a 100644 --- a/test/sync_bench.c +++ b/test/sync_bench.c @@ -134,6 +134,21 @@ static int timed_query_text(sqlite3 *db, const char *sql, char **out, double *st return rc; } +// Runs a cloudsync_network_* scalar function. In trace mode it captures and prints +// the JSON the function returns; otherwise it executes without keeping the result. +static int db_exec_network(sqlite3 *db, const char *operation, const char *sql) { +#ifdef CLOUDSYNC_NETWORK_TRACE + char *result = NULL; + int rc = query_text(db, sql, &result); + bench_trace("response op=%s json=%s", operation, result ? result : "(null)"); + free(result); + return rc; +#else + (void)operation; + return db_exec(db, sql); +#endif +} + static int open_load_ext(const char *db_path, sqlite3 **out_db) { bench_trace("step=open-load-extension db_path=%s begin", db_path); sqlite3 *db = NULL; @@ -205,8 +220,10 @@ static int init_network(sqlite3 *db, const char *label, const char *database_id, if (rc != SQLITE_OK) return rc; } + // Drain any pre-existing backlog so the measured send->apply later starts from a + // clean baseline. This is untimed warm-up, not part of the latency measurement. 
bench_trace("step=pre-measure-sync db=%s begin sql=cloudsync_network_sync(500,4)", label); - rc = db_exec(db, "SELECT cloudsync_network_sync(500, 4);"); + rc = db_exec_network(db, "pre-measure-sync", "SELECT cloudsync_network_sync(500, 4);"); bench_trace("step=pre-measure-sync db=%s end rc=%d", label, rc); return rc; } @@ -383,6 +400,9 @@ static int timed_request(sqlite3 *db, sync_bench_request *request, const char *o if (strcmp(operation, "check") == 0 && request->result_json) { request->rows_received = json_int_at_path(db, request->result_json, "$.receive.rows", -1); } + // Surface the raw JSON returned by the network function (send/check) under trace. + bench_trace("response op=%s attempt=%d json=%s", operation, attempt, + request->result_json ? request->result_json : "(null)"); return request->sqlite_rc; } @@ -522,6 +542,8 @@ int main(void) { remove(DB_B_PATH); cloudsync_memory_init(1); + // Step 1 - setup: open both local databases (sender A, receiver B), load the + // extension, create the schema, attach to the network, and drain to a clean baseline. bench_trace("step=benchmark-setup begin database_id=%s poll_delay_ms=%d max_polls=%d", database_id, poll_delay_ms, max_polls); rc = setup_database("db_a", DB_A_PATH, database_id, address, apikey, &db_a); if (rc != SQLITE_OK) goto cleanup; @@ -529,15 +551,19 @@ int main(void) { if (rc != SQLITE_OK) goto cleanup; bench_trace("step=benchmark-setup end rc=%d", rc); + // Step 2 - cleanup: prune stale rows from prior runs and push those deletions so + // they don't pollute the receiver's backlog during the measured round-trip.
rc = cleanup_old_benchmark_rows(db_a, cleanup_older_than_seconds, &cleanup_deleted_rows); if (rc != SQLITE_OK) goto cleanup; if (cleanup_deleted_rows > 0) { bench_trace("step=cleanup-send db=db_a deleted=%d begin sql=cloudsync_network_send_changes", cleanup_deleted_rows); - rc = db_exec(db_a, "SELECT cloudsync_network_send_changes();"); + rc = db_exec_network(db_a, "cleanup-send", "SELECT cloudsync_network_send_changes();"); bench_trace("step=cleanup-send db=db_a deleted=%d end rc=%d", cleanup_deleted_rows, rc); if (rc != SQLITE_OK) goto cleanup; } + // Step 3 - build the unique benchmark row: a UUIDv7 id plus an incompressible + // random blob, so the payload survives compression and exercises a realistic size. cloudsync_uuid_v7_string(row_id, true); snprintf(marker, sizeof(marker), "sync-bench-%s", row_id); snprintf(payload, sizeof(payload), "payload-%s", row_id); @@ -553,9 +579,12 @@ int main(void) { random_blob = &empty_blob; } + // Insert the row on the sender. This is the change whose propagation we time. rc = insert_benchmark_row(db_a, row_id, payload, marker, random_blob, random_blob_size); if (rc != SQLITE_OK) goto cleanup; + // Step 4 - precondition: the row must not already exist on the receiver, or the + // measurement would be meaningless. bench_trace("step=verify-before-send db=db_b row_id=%s begin", row_id); rc = verify_row(db_b, row_id, payload, marker, random_blob, random_blob_size, &applied); bench_trace("step=verify-before-send db=db_b row_id=%s end rc=%d applied=%s", row_id, rc, applied ? "true" : "false"); @@ -566,6 +595,8 @@ int main(void) { goto cleanup; } + // Step 5 - measured send: push the row from A and capture the send summary + // (status / localVersion / serverVersion). total_start_ms anchors the latency clock. 
bench_trace("step=send db=db_a row_id=%s begin sql=cloudsync_network_send_changes", row_id); rc = timed_request(db_a, &requests[request_count++], "send", 1, "SELECT cloudsync_network_send_changes();"); bench_trace("step=send db=db_a row_id=%s end rc=%d elapsed_ms=%.2f", row_id, rc, requests[request_count - 1].elapsed_ms); @@ -575,6 +606,8 @@ int main(void) { send_summary.server_version = json_int_at_path(db_a, requests[0].result_json, "$.send.serverVersion", -1); double total_start_ms = requests[0].started_ms; + // Step 6 - poll loop: receive on B (sleep between attempts) until the row is + // applied and verified, or max_polls is exhausted. for (int i = 0; i < max_polls; i++) { if (i > 0 && poll_delay_ms > 0) { bench_trace("step=poll-sleep attempt=%d delay_ms=%d begin", i + 1, poll_delay_ms); @@ -609,6 +642,8 @@ int main(void) { rc = SQLITE_BUSY; } + // Step 7 - aggregate timings: split the end-to-end latency into time spent in + // network requests, in poll sleeps, and the remaining local overhead. for (int i = 0; i < request_count; i++) request_ms += requests[i].elapsed_ms; measured_overhead_ms = total_ms - request_ms - poll_sleep_ms; if (measured_overhead_ms < 0.0 && measured_overhead_ms > -0.01) measured_overhead_ms = 0.0; From 2479549dbdfd5f6fc1f7dc1a32ea9257f9ad48a1 Mon Sep 17 00:00:00 2001 From: Andrea Donetti Date: Tue, 30 Jun 2026 16:52:45 -0600 Subject: [PATCH 12/13] fix(build): build dist/network_unit before unittest in the test target MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Android CI host runs `make test`, which aborts when `unittest` runs the cross-built dist/unit (an Android binary on the Linux host). dist/network_unit is not in TEST_TARGET (network_unit.c is filtered out of TEST_SRC), so it was built only via the network-unittest prerequisite — ordered after unittest, i.e. never, once make aborted. 
`make test -n` then saw the binary missing and emitted the NDK link into the on-emulator command script (2028cc3's file-rule split was necessary but not sufficient). List dist/network_unit as a direct `test` prerequisite before unittest so it builds during the file-build phase, exactly like dist/unit. Co-Authored-By: Claude Opus 4.8 (1M context) --- Makefile | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 05be9fd..f9d1acf 100644 --- a/Makefile +++ b/Makefile @@ -285,7 +285,11 @@ $(BUILD_NETTEST)/%.o: %.c | $(BUILD_NETTEST) $(CC) $(NT_CFLAGS) -c $< -o $@ # Run code coverage (--css-file $(CUSTOM_CSS)) -test: $(TARGET) $(TEST_TARGET) unittest network-unittest e2e +# dist/network_unit is listed before `unittest` so it is built during the file-build +# phase: on Android the host `make test` aborts when `unittest` runs the cross-built +# dist/unit, and network_unit (not in TEST_TARGET) would otherwise never be built +# before `make test -n` captures the on-emulator command script. +test: $(TARGET) $(TEST_TARGET) $(DIST_DIR)/network_unit$(EXE) unittest network-unittest e2e set -e; $(SQLITE3) ":memory:" -cmd ".bail on" ".load ./$<" "SELECT cloudsync_version();" ifneq ($(COVERAGE),false) mkdir -p $(COV_DIR) From f56098f0150f26c1f5ba4daae5fd793ec772530a Mon Sep 17 00:00:00 2001 From: Andrea Donetti Date: Tue, 30 Jun 2026 16:52:45 -0600 Subject: [PATCH 13/13] test(e2e): assert send return values, not just data convergence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The send path's status/version reporting had no e2e coverage — both the spurious gap and the optimistic-version regression moved data correctly while reporting the wrong status, so data-only assertions missed them. 
Add two helpers: - db_send_ok(): run a send expected to succeed and assert it did not fail at the protocol level (status != "error", no send.lastFailure) — catches a server-reported apply/check failure that does not raise a SQL error, which a plain db_exec of the send ignored. Applied at the basic and chunked primary send sites (not the two failure-path tests, nor the best-effort in-loop sends). - db_send_await_converge(): send, then poll until send.serverVersion reaches send.localVersion (durably covered, no gap), robust to the server's async apply. test_send_gap_from_clock_hole now uses it instead of a single synchronous assert. Full e2e suite (incl. chunked) green against a live server. Co-Authored-By: Claude Opus 4.8 (1M context) --- test/integration.c | 58 +++++++++++++++++++++++++++++++++------------- 1 file changed, 42 insertions(+), 16 deletions(-) diff --git a/test/integration.c b/test/integration.c index fb9b4fb..fc39efb 100644 --- a/test/integration.c +++ b/test/integration.c @@ -236,6 +236,35 @@ int db_expect_min (sqlite3 *db, const char *sql, int expect_min) { return SQLITE_OK; } +// Run a send that is expected to succeed and assert it didn't fail at the protocol +// level: status is not "error" and the server reported no per-chunk failure +// (send.lastFailure is omitted on success). Catches a server-reported apply/check +// failure that doesn't raise a SQL error — invisible to a plain db_exec of the send. +int db_send_ok (sqlite3 *db) { + return db_expect_int(db, + "SELECT (j ->> '$.send.status') <> 'error' AND (j ->> '$.send.lastFailure') IS NULL " + "FROM (SELECT cloudsync_network_send_changes() AS j);", 1); +} + +// Send, then poll until the server's optimistic version (send.serverVersion) catches +// up to send.localVersion — the change is durably covered with no gap. Robust to the +// server's asynchronous apply: the first call sends, later calls are no-ops that +// re-read status. Fails if it has not converged within max_attempts. 
+int db_send_await_converge (sqlite3 *db, int max_attempts, int delay_ms) { + const char *sql = + "SELECT (j ->> '$.send.serverVersion') >= (j ->> '$.send.localVersion') " + "FROM (SELECT cloudsync_network_send_changes() AS j);"; + for (int i = 0; i < max_attempts; i++) { + int converged = 0; + int rc = db_select_int(db, sql, &converged); + if (rc != SQLITE_OK) return rc; + if (converged) return SQLITE_OK; + if (i + 1 < max_attempts) sqlite3_sleep(delay_ms); + } + printf("Error: send did not converge (serverVersion < localVersion) after %d attempts\n", max_attempts); + return SQLITE_ERROR; +} + int integration_network_init(sqlite3 *db, const char *database_id, char *network_init, size_t network_init_len) { if (!database_id) { fprintf(stderr, "Error: integration database ID not set.\n"); @@ -528,7 +557,7 @@ int test_enable_disable(const char *db_path) { rc = db_exec(db, set_apikey); RCHECK } - rc = db_exec(db, "SELECT cloudsync_network_send_changes();"); RCHECK + rc = db_send_ok(db); RCHECK rc = db_exec(db, "SELECT cloudsync_cleanup('users');"); RCHECK rc = db_exec(db, "SELECT cloudsync_cleanup('activities');"); RCHECK rc = db_exec(db, "SELECT cloudsync_cleanup('workouts');"); RCHECK @@ -624,14 +653,11 @@ int test_send_gap_from_clock_hole(const char *db_path) { rc = db_exec(db, set_apikey); RCHECK } - // Send once. The server applies the change and computes lastOptimisticVersion - // (serverVersion) synchronously from its per-site applied ranges. With contiguous - // coverage it reaches localVersion (10); with the gap bug it stays at 0 because - // db_versions 1..9 are reported missing. - rc = db_expect_int(db, - "SELECT (j ->> '$.send.serverVersion') = (j ->> '$.send.localVersion') " - " AND (j ->> '$.send.localVersion') = 10 " - "FROM (SELECT cloudsync_network_send_changes() AS j);", 1); RCHECK + // Send, then poll until the server's optimistic version (serverVersion) reaches + // localVersion (10). 
With contiguous coverage it converges; with the gap bug it + // never does — serverVersion stays at 0 because db_versions 1..9 are reported + // missing. Polling absorbs the server's asynchronous apply. + rc = db_send_await_converge(db, 8, 1000); RCHECK rc = db_exec(db, "SELECT cloudsync_cleanup('users');"); RCHECK rc = db_exec(db, "SELECT cloudsync_cleanup('activities');"); RCHECK @@ -665,7 +691,7 @@ int test_chunked_payload_paths(void) { rc = db_expect_min(sender, "SELECT COUNT(*) FROM cloudsync_payload_chunks();", 2); if (rc != SQLITE_OK) goto cleanup; rc = db_expect_min(sender, "SELECT COUNT(*) FROM cloudsync_payload_chunks() WHERE hex(substr(payload,5,1))='03';", 2); if (rc != SQLITE_OK) goto cleanup; - rc = db_exec(sender, "SELECT cloudsync_network_send_changes();"); if (rc != SQLITE_OK) goto cleanup; + rc = db_send_ok(sender); if (rc != SQLITE_OK) goto cleanup; cleanup_remote_row = true; for (int attempt = 0; attempt < 30; ++attempt) { @@ -738,7 +764,7 @@ int test_chunked_payload_rowset_path(void) { rc = db_expect_min(sender, "SELECT COUNT(*) FROM cloudsync_payload_chunks();", 2); if (rc != SQLITE_OK) goto cleanup; rc = db_expect_int(sender, "SELECT COUNT(*) FROM cloudsync_payload_chunks() WHERE hex(substr(payload,5,1))='03';", 0); if (rc != SQLITE_OK) goto cleanup; - rc = db_exec(sender, "SELECT cloudsync_network_send_changes();"); if (rc != SQLITE_OK) goto cleanup; + rc = db_send_ok(sender); if (rc != SQLITE_OK) goto cleanup; cleanup_remote_rows = true; for (int attempt = 0; attempt < 30; ++attempt) { @@ -822,7 +848,7 @@ int test_chunked_payload_single_sync_drain(void) { // Sender splits into multiple non-fragment chunks. 
rc = db_expect_min(sender, "SELECT COUNT(*) FROM cloudsync_payload_chunks();", 2); if (rc != SQLITE_OK) goto cleanup; - rc = db_exec(sender, "SELECT cloudsync_network_send_changes();"); if (rc != SQLITE_OK) goto cleanup; + rc = db_send_ok(sender); if (rc != SQLITE_OK) goto cleanup; cleanup_remote_rows = true; for (int attempt = 0; attempt < 40; ++attempt) { @@ -921,7 +947,7 @@ int test_chunked_payload_capped_receive(void) { rc = db_expect_min(sender, "SELECT COUNT(*) FROM cloudsync_payload_chunks();", 2); if (rc != SQLITE_OK) goto cleanup; - rc = db_exec(sender, "SELECT cloudsync_network_send_changes();"); if (rc != SQLITE_OK) goto cleanup; + rc = db_send_ok(sender); if (rc != SQLITE_OK) goto cleanup; cleanup_remote_rows = true; for (int attempt = 0; attempt < 80; ++attempt) { @@ -1017,7 +1043,7 @@ int test_chunked_payload_batched_receive(void) { rc = db_expect_min(sender, "SELECT COUNT(*) FROM cloudsync_payload_chunks();", 3); if (rc != SQLITE_OK) goto cleanup; - rc = db_exec(sender, "SELECT cloudsync_network_send_changes();"); if (rc != SQLITE_OK) goto cleanup; + rc = db_send_ok(sender); if (rc != SQLITE_OK) goto cleanup; cleanup_remote_rows = true; for (int attempt = 0; attempt < 80; ++attempt) { @@ -1189,7 +1215,7 @@ int test_chunked_negative_cache_invalidation(void) { "INSERT INTO chunked_payload_items (id, body) VALUES ('%s', 'negative-cache-sentinel');", sentinel_id); rc = db_exec(sender, sql); if (rc != SQLITE_OK) goto cleanup; - rc = db_exec(sender, "SELECT cloudsync_network_send_changes();"); if (rc != SQLITE_OK) goto cleanup; + rc = db_send_ok(sender); if (rc != SQLITE_OK) goto cleanup; cleanup_sentinel = true; // Phase 1: drain the receiver until it is provably caught up. 
Each bare @@ -1267,7 +1293,7 @@ int test_chunked_negative_cache_invalidation(void) { "INSERT INTO chunked_payload_items (id, body) VALUES ('%s', 'negative-cache');", row_id); rc = db_exec(sender, sql); if (rc != SQLITE_OK) goto cleanup; - rc = db_exec(sender, "SELECT cloudsync_network_send_changes();"); if (rc != SQLITE_OK) goto cleanup; + rc = db_send_ok(sender); if (rc != SQLITE_OK) goto cleanup; cleanup_remote_row = true; // Phase 4: the receiver must now pick up the change on a subsequent receive.