diff --git a/Makefile b/Makefile index fac34df..f9d1acf 100644 --- a/Makefile +++ b/Makefile @@ -78,7 +78,9 @@ FI_SRC = $(FI_DIR)/fractional_indexing.c # Combined for SQLite extension build SRC_FILES = $(CORE_SRC) $(SQLITE_SRC) $(FI_SRC) -TEST_SRC = $(wildcard $(TEST_DIR)/*.c) +# network_unit.c is built separately with networking ENABLED (see network-unittest), +# so keep it out of the default OMIT_NETWORK test build. +TEST_SRC = $(filter-out $(TEST_DIR)/network_unit.c,$(wildcard $(TEST_DIR)/*.c)) TEST_FILES = $(SRC_FILES) $(TEST_SRC) $(wildcard $(SQLITE_DIR)/*.c) RELEASE_OBJ = $(patsubst %.c, $(BUILD_RELEASE)/%.o, $(notdir $(SRC_FILES))) TEST_OBJ = $(patsubst %.c, $(BUILD_TEST)/%.o, $(notdir $(TEST_FILES))) @@ -86,6 +88,19 @@ COV_FILES = $(filter-out $(SRC_DIR)/lz4.c $(NETWORK_DIR)/network.c $(SQLITE_IMPL CURL_LIB = $(CURL_DIR)/$(PLATFORM)/libcurl.a TEST_TARGET = $(patsubst %.c,$(DIST_DIR)/%$(EXE), $(notdir $(TEST_SRC))) +# Network-enabled unit tests: rebuild the codebase with networking ON (T_CFLAGS +# minus OMIT_NETWORK) and link curl, so network.c's internal functions can be +# tested directly on in-memory buffers. NT_LDFLAGS reuses the platform LDFLAGS +# (which carries -lcurl) minus the shared-library-only flags (-shared on Linux, +# -dynamiclib on macOS) so it links as an executable, plus the test link libs. +# -undefined dynamic_lookup is kept: the test never opens a connection, so curl's +# transport symbols are linked but never invoked. +BUILD_NETTEST = build/nettest +NT_CFLAGS = $(filter-out -DCLOUDSYNC_OMIT_NETWORK,$(T_CFLAGS)) +NT_LDFLAGS = $(filter-out -shared -dynamiclib -headerpad_max_install_names,$(LDFLAGS)) $(T_LDFLAGS) +NT_SRC = $(SRC_FILES) $(SQLITE_DIR)/sqlite3.c $(TEST_DIR)/network_unit.c +NT_OBJ = $(patsubst %.c,$(BUILD_NETTEST)/%.o,$(notdir $(NT_SRC))) + # Build curl hermetically: neutralize the developer's ambient build env so # curl's ./configure compile tests aren't broken by overrides leaking in # (e.g. exported LDFLAGS/CPPFLAGS/LIBS pointing at Homebrew). Build flags for @@ -261,8 +276,20 @@ $(BUILD_TEST)/sqlite3.o: $(SQLITE_DIR)/sqlite3.c $(BUILD_TEST)/%.o: %.c $(CC) $(T_CFLAGS) -c $< -o $@ +# Network-enabled object files (networking ON, for network-unittest) +$(BUILD_NETTEST): + mkdir -p $(BUILD_NETTEST) +$(BUILD_NETTEST)/sqlite3.o: $(SQLITE_DIR)/sqlite3.c | $(BUILD_NETTEST) + $(CC) $(CFLAGS) -DSQLITE_DQS=0 -DSQLITE_CORE -c $< -o $@ +$(BUILD_NETTEST)/%.o: %.c | $(BUILD_NETTEST) + $(CC) $(NT_CFLAGS) -c $< -o $@ + # Run code coverage (--css-file $(CUSTOM_CSS)) -test: $(TARGET) $(TEST_TARGET) unittest e2e +# dist/network_unit is listed before `unittest` so it is built during the file-build +# phase: on Android the host `make test` aborts when `unittest` runs the cross-built +# dist/unit, and network_unit (not in TEST_TARGET) would otherwise never be built +# before `make test -n` captures the on-emulator command script. +test: $(TARGET) $(TEST_TARGET) $(DIST_DIR)/network_unit$(EXE) unittest network-unittest e2e set -e; $(SQLITE3) ":memory:" -cmd ".bail on" ".load ./$<" "SELECT cloudsync_version();" ifneq ($(COVERAGE),false) mkdir -p $(COV_DIR) @@ -274,6 +301,17 @@ endif unittest: $(TARGET) $(DIST_DIR)/unit$(EXE) @./$(DIST_DIR)/unit$(EXE) +# Network-enabled unit test binary. Link it via a file rule (like dist/unit), not in +# the run recipe below: on Android `make test` runs binaries on the emulator from a +# script generated by `make test -n`, so a link command inside the recipe would be +# emitted and the cross-compiler invoked on-device (it isn't there → exit 127). +$(DIST_DIR)/network_unit$(EXE): $(CURL_LIB) $(NT_OBJ) + $(CC) $(NT_OBJ) -o $@ $(NT_LDFLAGS) + +# Run the network-layer unit tests (networking compiled in, no server) +network-unittest: $(DIST_DIR)/network_unit$(EXE) + @./$(DIST_DIR)/network_unit$(EXE) + # Run end-to-end integration tests e2e: $(TARGET) $(DIST_DIR)/integration$(EXE) @if [ -f .env ]; then \ diff --git a/src/network/network.c b/src/network/network.c index f8b6c24..4ed61c4 100644 --- a/src/network/network.c +++ b/src/network/network.c @@ -1601,18 +1601,22 @@ static int network_send_payload_to_apply(sqlite3_context *context, network_data return SQLITE_OK; } -static void network_sync_state_update_from_response(NETWORK_RESULT *res, - int64_t *last_optimistic_version, - int64_t *last_confirmed_version, - int *gaps_size, - char **apply_failure_json, - char **check_failure_json) { +void network_sync_state_update_from_response(NETWORK_RESULT *res, + int64_t *last_optimistic_version, + int64_t *last_confirmed_version, + int *gaps_size, + char **apply_failure_json, + char **check_failure_json) { if (!res || res->code != CLOUDSYNC_NETWORK_BUFFER || !res->buffer) return; - int64_t parsed_version = json_extract_int(res->buffer, res->blen, "lastOptimisticVersion", -1); - if (parsed_version > *last_optimistic_version) *last_optimistic_version = parsed_version; - parsed_version = json_extract_int(res->buffer, res->blen, "lastConfirmedVersion", -1); - if (parsed_version > *last_confirmed_version) *last_confirmed_version = parsed_version; + // Take the latest valid (>= 0) value, not the max: the server can move these + // BACKWARD on a rollback when a later send chunk fails, and lastOptimisticVersion + // becomes the durable send checkpoint — masking a decrease would advance the + // checkpoint past the rolled-back changes and silently drop them. + int64_t parsed_optimistic = json_extract_int(res->buffer, res->blen, "lastOptimisticVersion", -1); + if (parsed_optimistic >= 0) *last_optimistic_version = parsed_optimistic; + int64_t parsed_confirmed = json_extract_int(res->buffer, res->blen, "lastConfirmedVersion", -1); + if (parsed_confirmed >= 0) *last_confirmed_version = parsed_confirmed; int parsed_gaps_size = json_extract_array_size(res->buffer, res->blen, "gaps"); if (parsed_gaps_size >= 0) *gaps_size = parsed_gaps_size; @@ -1627,10 +1631,16 @@ static void network_sync_state_update_from_response(NETWORK_RESULT *res, if (*check_failure_json) cloudsync_memory_free(*check_failure_json); *check_failure_json = check_failure; } + + #ifdef CLOUDSYNC_NETWORK_TRACE + // Full endpoint response body that the sync-state fields above were parsed from. + // The buffer is not guaranteed NUL-terminated, so bound the print with its length. + fprintf(stderr, "[cloudsync-network] sync_state response=%.*s\n", (int)res->blen, res->buffer); + #endif } -static const char *network_compute_status(int64_t last_optimistic, int64_t last_confirmed, - int gaps_size, int64_t local_version) { +const char *network_compute_status(int64_t last_optimistic, int64_t last_confirmed, + int gaps_size, int64_t local_version) { if (last_optimistic < 0 || last_confirmed < 0) return "error"; if (gaps_size > 0 || last_optimistic < local_version) return "out-of-sync"; if (last_optimistic == last_confirmed) return "synced"; @@ -1715,6 +1725,11 @@ int cloudsync_network_send_changes_internal (sqlite3_context *context, int argc, bool sent_any = false; int sent_chunks = 0; // payload chunks sent this call int64_t sent_bytes = 0; // serialized payload bytes sent this call + // Lower bound of the coverage window announced to /apply. The server tracks + // per-site applied ranges from version 1, so the announced ranges must tile + // [send_checkpoint+1 .. watermark] contiguously; otherwise db_versions consumed + // without a local-site change (e.g. merged remote changes) read back as gaps. + int64_t announce_lo = db_version + 1; while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) { const void *blob = sqlite3_column_blob(stmt, 0); @@ -1731,8 +1746,12 @@ int cloudsync_network_send_changes_internal (sqlite3_context *context, int argc, goto cleanup; } + // Announce the covered window, not just where the data sits, so db_versions + // skipped by non-local-change transactions don't read back as server-side gaps. + int64_t announce_min = network_announce_min(announce_lo, db_version_min); + NETWORK_RESULT res = {0}; - rc = network_send_payload_to_apply(context, netdata, blob, blob_size, db_version_min, db_version_max, &res); + rc = network_send_payload_to_apply(context, netdata, blob, blob_size, announce_min, db_version_max, &res); if (rc != SQLITE_OK) goto cleanup; if (res.code == CLOUDSYNC_NETWORK_BUFFER && res.buffer) { @@ -1750,6 +1769,7 @@ int cloudsync_network_send_changes_internal (sqlite3_context *context, int argc, sent_chunks++; sent_bytes += payload_size; if (watermark > new_db_version) new_db_version = watermark; + announce_lo = db_version_max + 1; // next chunk continues the window here } if (rc != SQLITE_DONE) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); diff --git a/src/network/network_private.h b/src/network/network_private.h index 82c00d7..d541a8c 100644 --- a/src/network/network_private.h +++ b/src/network/network_private.h @@ -8,6 +8,9 @@ #ifndef __CLOUDSYNC_NETWORK_PRIVATE__ #define __CLOUDSYNC_NETWORK_PRIVATE__ +#include +#include + #define CLOUDSYNC_DEFAULT_ADDRESS "https://cloudsync.sqlite.ai" #define CLOUDSYNC_ENDPOINT_PREFIX "v2/cloudsync/databases" #define CLOUDSYNC_ENDPOINT_UPLOAD "upload" @@ -46,6 +49,18 @@ bool network_data_set_endpoints (network_data *data, char *auth, char *check, ch bool network_send_buffer(network_data *data, const char *endpoint, const char *authentication, const void *blob, int blob_size); NETWORK_RESULT network_receive_buffer (network_data *data, const char *endpoint, const char *authentication, bool zero_terminated, bool is_post_request, char *json_payload, const char **extra_headers, int nextra_headers); +// Exposed (non-static) for the network unit test; otherwise internal to network.c. +void network_sync_state_update_from_response(NETWORK_RESULT *res, int64_t *last_optimistic_version, int64_t *last_confirmed_version, int *gaps_size, char **apply_failure_json, char **check_failure_json); +const char *network_compute_status(int64_t last_optimistic, int64_t last_confirmed, int gaps_size, int64_t local_version); + +// Lower bound of the coverage window a send chunk announces to /apply: the running +// window start, but never above the chunk's own min (so consecutive chunks sharing a +// db_version — a value fragmented across chunks — keep min<=max). The caller advances +// window_lo to chunk_db_version_max+1 after each chunk so the ranges tile contiguously. +static inline int64_t network_announce_min(int64_t window_lo, int64_t chunk_db_version_min) { + return window_lo < chunk_db_version_min ? window_lo : chunk_db_version_min; +} + #ifdef CLOUDSYNC_NETWORK_TRACE const char *network_trace_endpoint_name(network_data *data, const char *endpoint); const char *network_trace_result_name(int code); diff --git a/src/postgresql/cloudsync.sql.in b/src/postgresql/cloudsync.sql.in index 31bb994..92bb6f4 100644 --- a/src/postgresql/cloudsync.sql.in +++ b/src/postgresql/cloudsync.sql.in @@ -153,7 +153,10 @@ CREATE OR REPLACE FUNCTION cloudsync_payload_chunks( since_db_version bigint DEFAULT NULL, filter_site_id bytea DEFAULT NULL, until_db_version bigint DEFAULT NULL, - exclude_filter_site_id boolean DEFAULT false + exclude_filter_site_id boolean DEFAULT false, + resume_db_version bigint DEFAULT NULL, + resume_seq bigint DEFAULT NULL, + resume_frag_offset bigint DEFAULT NULL ) RETURNS TABLE ( payload bytea, @@ -162,7 +165,11 @@ RETURNS TABLE ( rows bigint, db_version_min bigint, db_version_max bigint, - watermark_db_version bigint + watermark_db_version bigint, + next_db_version bigint, + next_seq bigint, + next_frag_offset bigint, + is_final boolean ) AS 'MODULE_PATHNAME', 'cloudsync_payload_chunks' LANGUAGE C VOLATILE; @@ -191,119 +198,6 @@ RETURNS bytea AS 'MODULE_PATHNAME', 'cloudsync_uuid_blob' LANGUAGE C IMMUTABLE; --- Download spool (server-side /check chunk staging) --- --- The /check endpoint runs on a separate host and reaches the tenant DB over a --- network driver, which materializes the whole result set. Streaming chunks --- directly with SELECT * FROM cloudsync_payload_chunks(...) would therefore pull --- every chunk into the server's memory at once. Instead the server fills a --- window's chunk stream once into this table and pages it out one chunk per call. --- --- UNLOGGED: this is regenerable scratch state, so skip WAL. Created at install --- time (not lazily inside the function) to avoid the plpgsql create-then-use --- cached-plan pitfall. -CREATE TABLE IF NOT EXISTS cloudsync_payload_spool ( - stream_id text NOT NULL, - chunk_index bigint NOT NULL, - payload bytea NOT NULL, - payload_size bigint NOT NULL, - db_version_min bigint NOT NULL, - db_version_max bigint NOT NULL, - watermark bigint NOT NULL, - is_final boolean NOT NULL DEFAULT false, - created_at bigint NOT NULL DEFAULT extract(epoch FROM now())::bigint, - PRIMARY KEY (stream_id, chunk_index) -); - --- cloudsync_payload_spool_fill(stream_id, since, filter_site_id, exclude) --- Generate the whole chunk stream for a window once into cloudsync_payload_spool. --- Returns the number of chunks spooled. Idempotent: a prior complete fill is kept. --- Parameters are p_-prefixed so they never collide with the table's columns --- (an unqualified `stream_id` would otherwise be ambiguous with the parameter). -CREATE OR REPLACE FUNCTION cloudsync_payload_spool_fill( - p_stream_id text, - p_since_db_version bigint, - p_filter_site_id bytea DEFAULT NULL, - p_exclude_filter_site_id boolean DEFAULT false -) RETURNS bigint AS $$ -DECLARE - existing bigint; - cnt bigint := 0; - rec record; -BEGIN - -- Stale-GC of abandoned streams. fill runs once per stream (coarse-grained), - -- so unlike per-fragment cleanup there is no O(n^2) risk and no throttle. - DELETE FROM cloudsync_payload_spool - WHERE stream_id IN ( - SELECT s.stream_id FROM cloudsync_payload_spool s - GROUP BY s.stream_id - HAVING max(s.created_at) < extract(epoch FROM now())::bigint - 86400); - - -- Idempotent: a prior complete fill stays as-is (fill is atomic, so rows - -- present == complete stream). - SELECT count(*) INTO existing FROM cloudsync_payload_spool WHERE stream_id = p_stream_id; - IF existing > 0 THEN - RETURN existing; - END IF; - - -- Generate the stream one chunk at a time. A cursor FOR loop fetches in - -- batches rather than materializing the whole SRF into a tuplestore. - FOR rec IN - SELECT c.payload, c.chunk_index, c.payload_size, c.db_version_min, c.db_version_max, c.watermark_db_version - FROM cloudsync_payload_chunks(p_since_db_version, p_filter_site_id, NULL, p_exclude_filter_site_id) c - LOOP - INSERT INTO cloudsync_payload_spool - (stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final) - VALUES (p_stream_id, rec.chunk_index, rec.payload, rec.payload_size, - rec.db_version_min, rec.db_version_max, rec.watermark_db_version, false); - cnt := cnt + 1; - END LOOP; - - IF cnt > 0 THEN - UPDATE cloudsync_payload_spool SET is_final = true - WHERE stream_id = p_stream_id - AND chunk_index = (SELECT max(x.chunk_index) FROM cloudsync_payload_spool x - WHERE x.stream_id = p_stream_id); - END IF; - - RETURN cnt; -END; -$$ LANGUAGE plpgsql VOLATILE; - --- cloudsync_payload_spool_drop(stream_id) -> number of chunks removed. -CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop(p_stream_id text) -RETURNS bigint AS $$ -DECLARE - deleted bigint := 0; -BEGIN - DELETE FROM cloudsync_payload_spool WHERE stream_id = p_stream_id; - GET DIAGNOSTICS deleted = ROW_COUNT; - RETURN deleted; -END; -$$ LANGUAGE plpgsql VOLATILE; - --- cloudsync_payload_spool_drop_chunk(stream_id, chunk_index) -> number of chunks removed. --- Explicit early cleanup for one S3-backed chunk; stream-level TTL/GC remains unchanged. -CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop_chunk(p_stream_id text, p_chunk_index bigint) -RETURNS bigint AS $$ -DECLARE - deleted bigint := 0; -BEGIN - IF p_stream_id IS NULL THEN - RAISE EXCEPTION 'cloudsync_payload_spool_drop_chunk: stream_id is required.'; - END IF; - IF p_chunk_index IS NULL THEN - RAISE EXCEPTION 'cloudsync_payload_spool_drop_chunk: chunk_index is required.'; - END IF; - - DELETE FROM cloudsync_payload_spool - WHERE stream_id = p_stream_id - AND chunk_index = p_chunk_index; - GET DIAGNOSTICS deleted = ROW_COUNT; - RETURN deleted; -END; -$$ LANGUAGE plpgsql VOLATILE; - -- Payload decoding and application CREATE OR REPLACE FUNCTION cloudsync_payload_decode(payload bytea) RETURNS integer diff --git a/src/postgresql/cloudsync_postgresql.c b/src/postgresql/cloudsync_postgresql.c index d402a52..3f62d2f 100644 --- a/src/postgresql/cloudsync_postgresql.c +++ b/src/postgresql/cloudsync_postgresql.c @@ -1213,6 +1213,46 @@ static bytea *payload_chunks_emit_pg_fragment(PayloadChunksState *st, cloudsync_ return result; } +// Set up fragment state for the currently-fetched oversized value so +// emit_pg_fragment can stream it. start_offset is the byte offset within the value +// to resume from (0 when first reaching it; >0 when a positional cursor resumes +// mid-value). frag_part is derived from the offset so a streamed and a resumed +// fragment carry the same part index. The plan (frag_target/frag_count) is a +// deterministic function of the row, so a resumed fragment tiles identically. +static void payload_chunks_pg_begin_fragment(PayloadChunksState *st, cloudsync_context *data, int64 start_offset) { + st->frag_total = VARSIZE_ANY_EXHDR(st->col_value); + st->frag_offset = start_offset; + st->frag_target = cloudsync_payload_fragment_data_size(data, + st->tbl, -1, + VARDATA_ANY(st->pk), VARSIZE_ANY_EXHDR(st->pk), + st->col_name, -1, + st->col_version, st->db_version, + VARDATA_ANY(st->site_id), VARSIZE_ANY_EXHDR(st->site_id), + st->cl, st->seq, + st->frag_total, 0, 1); + if (st->frag_target <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload fragment metadata exceeds max chunk size"))); + for (int i = 0; i < CLOUDSYNC_PAYLOAD_FRAGMENT_SIZE_FIXPOINT_ITERATIONS; ++i) { + int count = cloudsync_payload_fragment_count(st->frag_total, st->frag_target); + if (count <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload requires too many fragments"))); + int planned = cloudsync_payload_fragment_data_size(data, + st->tbl, -1, + VARDATA_ANY(st->pk), VARSIZE_ANY_EXHDR(st->pk), + st->col_name, -1, + st->col_version, st->db_version, + VARDATA_ANY(st->site_id), VARSIZE_ANY_EXHDR(st->site_id), + st->cl, st->seq, + st->frag_total, count - 1, count); + if (planned <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload fragment metadata exceeds max chunk size"))); + if (planned == st->frag_target) break; + st->frag_target = planned; + } + st->frag_count = cloudsync_payload_fragment_count(st->frag_total, st->frag_target); + if (st->frag_count <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("payload requires too many fragments"))); + st->frag_part = (st->frag_target > 0) ? (int)(start_offset / st->frag_target) : 0; + st->frag_checksum = pk_checksum(VARDATA_ANY(st->col_value), (size_t)st->frag_total); + st->frag_active = true; +} + static bytea *payload_chunks_build_pg_next(PayloadChunksState *st, cloudsync_context *data, int64 *rows, int64 *dbv_min, int64 *dbv_max) { *rows = *dbv_min = *dbv_max = 0; @@ -1237,37 +1277,7 @@ static bytea *payload_chunks_build_pg_next(PayloadChunksState *st, cloudsync_con if ((int64)row_size + (int64)header_size + CLOUDSYNC_PAYLOAD_CHUNK_SAFETY_MARGIN > st->max_size) { if (cloudsync_payload_context_nrows(payload) > 0) break; - st->frag_total = VARSIZE_ANY_EXHDR(st->col_value); - st->frag_offset = 0; - st->frag_part = 0; - st->frag_target = cloudsync_payload_fragment_data_size(data, - st->tbl, -1, - VARDATA_ANY(st->pk), VARSIZE_ANY_EXHDR(st->pk), - st->col_name, -1, - st->col_version, st->db_version, - VARDATA_ANY(st->site_id), VARSIZE_ANY_EXHDR(st->site_id), - st->cl, st->seq, - st->frag_total, 0, 1); - if (st->frag_target <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload fragment metadata exceeds max chunk size"))); - for (int i = 0; i < CLOUDSYNC_PAYLOAD_FRAGMENT_SIZE_FIXPOINT_ITERATIONS; ++i) { - int count = cloudsync_payload_fragment_count(st->frag_total, st->frag_target); - if (count <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload requires too many fragments"))); - int planned = cloudsync_payload_fragment_data_size(data, - st->tbl, -1, - VARDATA_ANY(st->pk), VARSIZE_ANY_EXHDR(st->pk), - st->col_name, -1, - st->col_version, st->db_version, - VARDATA_ANY(st->site_id), VARSIZE_ANY_EXHDR(st->site_id), - st->cl, st->seq, - st->frag_total, count - 1, count); - if (planned <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg(CLOUDSYNC_ERRCODE_CHUNK_TOO_LARGE "payload fragment metadata exceeds max chunk size"))); - if (planned == st->frag_target) break; - st->frag_target = planned; - } - st->frag_count = cloudsync_payload_fragment_count(st->frag_total, st->frag_target); - if (st->frag_count <= 0) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("payload requires too many fragments"))); - st->frag_checksum = pk_checksum(VARDATA_ANY(st->col_value), (size_t)st->frag_total); - st->frag_active = true; + payload_chunks_pg_begin_fragment(st, data, 0); cloudsync_memory_free(payload); return payload_chunks_emit_pg_fragment(st, data, rows, dbv_min, dbv_max); } @@ -1327,6 +1337,15 @@ Datum cloudsync_payload_chunks(PG_FUNCTION_ARGS) { int64 since = PG_ARGISNULL(0) ? dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_SEND_DBVERSION) : PG_GETARG_INT64(0); bytea *site_id = PG_ARGISNULL(1) ? NULL : PG_GETARG_BYTEA_PP(1); bool exclude = PG_ARGISNULL(3) ? false : PG_GETARG_BOOL(3); + // Positional resume cursor: when resume_db_version is given the scan starts + // at (resume_db_version, resume_seq) inclusive and the first chunk resumes a + // mid-value fragment at resume_frag_offset, instead of replaying from `since`. + // Lets the /check job page one chunk per round-trip with an O(1) seek and no + // spool table. + bool positional = !PG_ARGISNULL(4); + int64 resume_dbv = PG_ARGISNULL(4) ? 0 : PG_GETARG_INT64(4); + int64 resume_seq = PG_ARGISNULL(5) ? 0 : PG_GETARG_INT64(5); + int64 resume_frag = PG_ARGISNULL(6) ? 0 : PG_GETARG_INT64(6); // Site filter resolution: // exclude=true -> all sites except filter_site_id (CHECK path); site required // filter given -> only that site @@ -1361,23 +1380,53 @@ Datum cloudsync_payload_chunks(PG_FUNCTION_ARGS) { StringInfoData q; initStringInfo(&q); - if (exclude) { - // $1=since (into changes_select), $2=site to exclude, $3=until watermark - appendStringInfoString(&q, - "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " - "FROM cloudsync_changes_select($1,NULL) WHERE site_id <> $2 AND db_version <= $3 ORDER BY db_version, seq ASC"); + if (positional) { + // Inclusive positional lower bound (db_version, seq) >= (resume_dbv, + // resume_seq) within db_version <= until. $1=site, $2=until, $3=resume_dbv, + // $4=resume_seq. (seq >= matches the SQLite vtab's exact tiling; contrast + // with payload_blob_checked's exclusive seq > for its last-applied cursor.) + if (exclude) { + appendStringInfoString(&q, + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes_select(0,NULL) " + "WHERE site_id <> $1 AND db_version <= $2 AND (db_version > $3 OR (db_version = $3 AND seq >= $4)) " + "ORDER BY db_version, seq ASC"); + } else { + appendStringInfoString(&q, + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes_select(0,$1) " + "WHERE db_version <= $2 AND (db_version > $3 OR (db_version = $3 AND seq >= $4)) " + "ORDER BY db_version, seq ASC"); + } + Oid argtypes[4] = {BYTEAOID, INT8OID, INT8OID, INT8OID}; + Datum values[4] = {PointerGetDatum(site_id), Int64GetDatum(until), Int64GetDatum(resume_dbv), Int64GetDatum(resume_seq)}; + char nulls[4] = {' ', ' ', ' ', ' '}; + st->portal = SPI_cursor_open_with_args(NULL, q.data, 4, argtypes, values, nulls, true, 0); } else { - appendStringInfoString(&q, - "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " - "FROM cloudsync_changes_select($1,$2) WHERE db_version <= $3 ORDER BY db_version, seq ASC"); + if (exclude) { + // $1=since (into changes_select), $2=site to exclude, $3=until watermark + appendStringInfoString(&q, + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes_select($1,NULL) WHERE site_id <> $2 AND db_version <= $3 ORDER BY db_version, seq ASC"); + } else { + appendStringInfoString(&q, + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes_select($1,$2) WHERE db_version <= $3 ORDER BY db_version, seq ASC"); + } + Oid argtypes[3] = {INT8OID, BYTEAOID, INT8OID}; + Datum values[3] = {Int64GetDatum(since), PointerGetDatum(site_id), Int64GetDatum(until)}; + char nulls[3] = {' ', ' ', ' '}; + st->portal = SPI_cursor_open_with_args(NULL, q.data, 3, argtypes, values, nulls, true, 0); } - Oid argtypes[3] = {INT8OID, BYTEAOID, INT8OID}; - Datum values[3] = {Int64GetDatum(since), PointerGetDatum(site_id), Int64GetDatum(until)}; - char nulls[3] = {' ', ' ', ' '}; - st->portal = SPI_cursor_open_with_args(NULL, q.data, 3, argtypes, values, nulls, true, 0); pfree(q.data); if (!st->portal) ereport(ERROR, (errmsg("SPI_cursor_open failed"))); + // Resuming inside a value that was fragmented across chunks: the first row is + // that value; re-establish the fragment plan and skip to resume_frag. + if (positional && resume_frag > 0 && payload_chunks_fetch_current(st)) { + payload_chunks_pg_begin_fragment(st, data, resume_frag); + } + TupleDesc outdesc; if (get_call_result_type(fcinfo, NULL, &outdesc) != TYPEFUNC_COMPOSITE) ereport(ERROR, (errmsg("return type must be composite"))); st->outdesc = BlessTupleDesc(outdesc); @@ -1400,8 +1449,22 @@ Datum cloudsync_payload_chunks(PG_FUNCTION_ARGS) { SRF_RETURN_DONE(funcctx); } - Datum outvals[7]; - bool outnulls[7] = {false,false,false,false,false,false,false}; + // Resume point a stateless caller passes back to continue after this chunk. + // frag_active -> same value, next byte offset; otherwise peek the next row + // (buffered for the following build call): a row -> its (db_version, seq); + // end of stream -> this was the final chunk. + int64 next_dbv, next_seq, next_frag; + bool is_final; + if (st->frag_active) { + next_dbv = st->db_version; next_seq = st->seq; next_frag = st->frag_offset; is_final = false; + } else if (payload_chunks_fetch_current(st)) { + next_dbv = st->db_version; next_seq = st->seq; next_frag = 0; is_final = false; + } else { + next_dbv = st->watermark; next_seq = 0; next_frag = 0; is_final = true; + } + + Datum outvals[11]; + bool outnulls[11] = {false,false,false,false,false,false,false,false,false,false,false}; outvals[0] = PointerGetDatum(payload); outvals[1] = Int64GetDatum(st->chunk_index++); outvals[2] = Int64GetDatum(VARSIZE_ANY_EXHDR(payload)); @@ -1409,6 +1472,10 @@ Datum cloudsync_payload_chunks(PG_FUNCTION_ARGS) { outvals[4] = Int64GetDatum(dbv_min); outvals[5] = Int64GetDatum(dbv_max); outvals[6] = Int64GetDatum(st->watermark); + outvals[7] = Int64GetDatum(next_dbv); + outvals[8] = Int64GetDatum(next_seq); + outvals[9] = Int64GetDatum(next_frag); + outvals[10] = BoolGetDatum(is_final); HeapTuple outtup = heap_form_tuple(st->outdesc, outvals, outnulls); SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(outtup)); } diff --git a/src/postgresql/migrations/cloudsync--1.0--1.1.sql b/src/postgresql/migrations/cloudsync--1.0--1.1.sql index 9239722..1269bc9 100644 --- a/src/postgresql/migrations/cloudsync--1.0--1.1.sql +++ b/src/postgresql/migrations/cloudsync--1.0--1.1.sql @@ -13,7 +13,10 @@ CREATE OR REPLACE FUNCTION cloudsync_payload_chunks( since_db_version bigint DEFAULT NULL, filter_site_id bytea DEFAULT NULL, until_db_version bigint DEFAULT NULL, - exclude_filter_site_id boolean DEFAULT false + exclude_filter_site_id boolean DEFAULT false, + resume_db_version bigint DEFAULT NULL, + resume_seq bigint DEFAULT NULL, + resume_frag_offset bigint DEFAULT NULL ) RETURNS TABLE ( payload bytea, @@ -22,7 +25,11 @@ RETURNS TABLE ( rows bigint, db_version_min bigint, db_version_max bigint, - watermark_db_version bigint + watermark_db_version bigint, + next_db_version bigint, + next_seq bigint, + next_frag_offset bigint, + is_final boolean ) AS 'MODULE_PATHNAME', 'cloudsync_payload_chunks' LANGUAGE C VOLATILE; @@ -48,93 +55,3 @@ RETURNS bytea AS 'MODULE_PATHNAME', 'cloudsync_uuid_blob' LANGUAGE C IMMUTABLE; --- Download spool: the /check path fills a window's chunk stream once and pages it --- out one chunk per call so the network driver never re-materializes the whole --- stream. See cloudsync.sql.in for the rationale. -CREATE TABLE IF NOT EXISTS cloudsync_payload_spool ( - stream_id text NOT NULL, - chunk_index bigint NOT NULL, - payload bytea NOT NULL, - payload_size bigint NOT NULL, - db_version_min bigint NOT NULL, - db_version_max bigint NOT NULL, - watermark bigint NOT NULL, - is_final boolean NOT NULL DEFAULT false, - created_at bigint NOT NULL DEFAULT extract(epoch FROM now())::bigint, - PRIMARY KEY (stream_id, chunk_index) -); - -CREATE OR REPLACE FUNCTION cloudsync_payload_spool_fill( - p_stream_id text, - p_since_db_version bigint, - p_filter_site_id bytea DEFAULT NULL, - p_exclude_filter_site_id boolean DEFAULT false -) RETURNS bigint AS $$ -DECLARE - existing bigint; - cnt bigint := 0; - rec record; -BEGIN - DELETE FROM cloudsync_payload_spool - WHERE stream_id IN ( - SELECT s.stream_id FROM cloudsync_payload_spool s - GROUP BY s.stream_id - HAVING max(s.created_at) < extract(epoch FROM now())::bigint - 86400); - - SELECT count(*) INTO existing FROM cloudsync_payload_spool WHERE stream_id = p_stream_id; - IF existing > 0 THEN - RETURN existing; - END IF; - - FOR rec IN - SELECT c.payload, c.chunk_index, c.payload_size, c.db_version_min, c.db_version_max, c.watermark_db_version - FROM cloudsync_payload_chunks(p_since_db_version, p_filter_site_id, NULL, p_exclude_filter_site_id) c - LOOP - INSERT INTO cloudsync_payload_spool - (stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final) - VALUES (p_stream_id, rec.chunk_index, rec.payload, rec.payload_size, - rec.db_version_min, rec.db_version_max, rec.watermark_db_version, false); - cnt := cnt + 1; - END LOOP; - - IF cnt > 0 THEN - UPDATE cloudsync_payload_spool SET is_final = true - WHERE stream_id = p_stream_id - AND chunk_index = (SELECT max(x.chunk_index) FROM cloudsync_payload_spool x - WHERE x.stream_id = p_stream_id); - END IF; - - RETURN cnt; -END; -$$ LANGUAGE plpgsql VOLATILE; - -CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop(p_stream_id text) -RETURNS bigint AS $$ -DECLARE - deleted bigint := 0; -BEGIN - DELETE FROM cloudsync_payload_spool WHERE stream_id = p_stream_id; - GET DIAGNOSTICS deleted = ROW_COUNT; - RETURN deleted; -END; -$$ LANGUAGE plpgsql VOLATILE; - -CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop_chunk(p_stream_id text, p_chunk_index bigint) -RETURNS bigint AS $$ -DECLARE - deleted bigint := 0; -BEGIN - IF p_stream_id IS NULL THEN - RAISE EXCEPTION 'cloudsync_payload_spool_drop_chunk: stream_id is required.'; - END IF; - IF p_chunk_index IS NULL THEN - RAISE EXCEPTION 'cloudsync_payload_spool_drop_chunk: chunk_index is required.'; - END IF; - - DELETE FROM cloudsync_payload_spool - WHERE stream_id = p_stream_id - AND chunk_index = p_chunk_index; - GET DIAGNOSTICS deleted = ROW_COUNT; - RETURN deleted; -END; -$$ LANGUAGE plpgsql VOLATILE; diff --git a/src/sql.h b/src/sql.h index 3e3ca27..6837121 100644 --- a/src/sql.h +++ b/src/sql.h @@ -74,14 +74,6 @@ extern const char * const SQL_PAYLOAD_FRAGMENTS_SELECT; extern const char * const SQL_PAYLOAD_FRAGMENTS_DELETE; extern const char * const SQL_PAYLOAD_FRAGMENTS_CLEANUP_STALE; -extern const char * const SQL_PAYLOAD_SPOOL_CREATE_TABLE; -extern const char * const SQL_PAYLOAD_SPOOL_COUNT; -extern const char * const SQL_PAYLOAD_SPOOL_FILL_INSERT; -extern const char * const SQL_PAYLOAD_SPOOL_MARK_FINAL; -extern const char * const SQL_PAYLOAD_SPOOL_DELETE; -extern const char * const SQL_PAYLOAD_SPOOL_DELETE_CHUNK; -extern const char * const SQL_PAYLOAD_SPOOL_CLEANUP_STALE; - // BLOCKS (block-level LWW) extern const char * const SQL_BLOCKS_CREATE_TABLE; extern const char * const SQL_BLOCKS_UPSERT; diff --git a/src/sqlite/cloudsync_sqlite.c b/src/sqlite/cloudsync_sqlite.c index ba7a150..80d45cc 100644 --- a/src/sqlite/cloudsync_sqlite.c +++ b/src/sqlite/cloudsync_sqlite.c @@ -1144,6 +1144,14 @@ typedef struct { int value_header_len; const char *value_data; int64_t value_data_len; + // Positional-cursor outputs: the resume point AFTER the chunk currently held. + // These live in the per-scan reset region (after eof) so xFilter's bulk memset + // clears them. next_* is the (db_version, seq, frag_offset) a follow-up call + // passes back as resume_* to continue exactly where this chunk stopped. + int64_t next_dbv; + int64_t next_seq; + int64_t next_frag_offset; + bool is_final; } cloudsync_payload_chunks_cursor; static int payload_chunks_connect(sqlite3 *db, void *aux, int argc, const char *const *argv, sqlite3_vtab **vtab, char **err) { @@ -1151,7 +1159,13 @@ static int payload_chunks_connect(sqlite3 *db, void *aux, int argc, const char * int rc = sqlite3_declare_vtab(db, "CREATE TABLE x(payload BLOB, chunk_index INTEGER, payload_size INTEGER, rows INTEGER, " "db_version_min INTEGER, db_version_max INTEGER, watermark_db_version INTEGER, " - "since_db_version HIDDEN, site_id HIDDEN, until_db_version HIDDEN, exclude_filter_site_id HIDDEN)"); + "since_db_version HIDDEN, site_id HIDDEN, until_db_version HIDDEN, exclude_filter_site_id HIDDEN, " + // Positional-cursor outputs (cols 11..14): the resume point after the + // emitted chunk, plus a final-chunk flag. A stateless /check passes these + // back as the resume_* inputs (cols 15..17) to continue the drain without + // a spool table — O(1) seek per chunk instead of replaying from since. + "next_db_version INTEGER, next_seq INTEGER, next_frag_offset INTEGER, is_final INTEGER, " + "resume_db_version HIDDEN, resume_seq HIDDEN, resume_frag_offset HIDDEN)"); if (rc != SQLITE_OK) return rc; cloudsync_payload_chunks_vtab *p = sqlite3_malloc64(sizeof(*p)); if (!p) return SQLITE_NOMEM; @@ -1185,19 +1199,23 @@ static int payload_chunks_close(sqlite3_vtab_cursor *cursor) { static int payload_chunks_best_index(sqlite3_vtab *vtab, sqlite3_index_info *idxinfo) { UNUSED_PARAMETER(vtab); + // Assign argvIndex in a canonical hidden-column order so xFilter can read argv + // in a fixed order regardless of how SQLite presents constraints. idxNum bit k + // is set when handled_cols[k] is bound; xFilter reads argv in this same order. + // bit0=since_db_version(7) bit1=site_id(8) bit2=until_db_version(9) + // bit3=exclude_filter_site_id(10) bit4=resume_db_version(15) + // bit5=resume_seq(16) bit6=resume_frag_offset(17) + static const int handled_cols[] = {7, 8, 9, 10, 15, 16, 17}; int argv_index = 1; int idxnum = 0; - // Assign argvIndex in canonical hidden-column order (7..10) so xFilter can - // read argv in a fixed order regardless of how SQLite presents constraints. - // Hidden columns: 7=since_db_version, 8=site_id, 9=until_db_version, - // 10=exclude_filter_site_id. - for (int col = 7; col <= 10; ++col) { + for (size_t k = 0; k < sizeof(handled_cols) / sizeof(handled_cols[0]); ++k) { + int col = handled_cols[k]; for (int i = 0; i < idxinfo->nConstraint; ++i) { struct sqlite3_index_constraint *cn = &idxinfo->aConstraint[i]; if (!cn->usable || cn->op != SQLITE_INDEX_CONSTRAINT_EQ || cn->iColumn != col) continue; idxinfo->aConstraintUsage[i].argvIndex = argv_index++; idxinfo->aConstraintUsage[i].omit = 1; - idxnum |= (1 << (col - 7)); + idxnum |= (1 << k); break; // at most one constraint consumed per hidden column } } @@ -1249,6 +1267,33 @@ static int payload_chunks_plan_fragment(cloudsync_payload_chunks_cursor *c) { return SQLITE_OK; } +// Set up fragment state for the current source row (a single value larger than +// max_chunk_size) so emit_fragment can stream it. start_offset is the byte offset +// within the encoded value to resume from (0 when first reaching the value; +// >0 when a positional cursor resumes mid-value). frag_part is derived from the +// offset so the fragment's part index is consistent whether reached by streaming +// or by a seek. The plan (frag_target/frag_count) is a deterministic function of +// the row, so a resumed fragment tiles identically to a streamed one. +static int payload_chunks_begin_fragment(cloudsync_payload_chunks_cursor *c, int64_t start_offset) { + dbvalue_t *col_value = (dbvalue_t *)sqlite3_column_value(c->src, 3); + int type = database_value_type(col_value); + if (type != DBTYPE_TEXT && type != DBTYPE_BLOB) return SQLITE_TOOBIG; + int64_t raw_len = 0; + int header_len = cloudsync_payload_encoded_value_header(col_value, c->value_header, sizeof(c->value_header), &raw_len); + if (header_len <= 0) return SQLITE_ERROR; + c->value_header_len = header_len; + c->value_data = (const char *)database_value_blob(col_value); + c->value_data_len = raw_len; + c->frag_total = header_len + raw_len; + c->frag_offset = start_offset; + int rc = payload_chunks_plan_fragment(c); + if (rc != SQLITE_OK) return rc; + c->frag_part = (c->frag_target > 0) ? (int)(start_offset / c->frag_target) : 0; + c->frag_checksum = cloudsync_payload_encoded_value_checksum(col_value); + c->frag_active = true; + return SQLITE_OK; +} + static int payload_chunks_emit_fragment(cloudsync_payload_chunks_cursor *c) { cloudsync_context *data = c->vtab->data; if (c->payload) { cloudsync_memory_free(c->payload); c->payload = NULL; } @@ -1321,23 +1366,9 @@ static int payload_chunks_build_next(cloudsync_payload_chunks_cursor *c) { if ((int64_t)row_size + (int64_t)payload_header_size + CLOUDSYNC_PAYLOAD_CHUNK_SAFETY_MARGIN > max_size) { if (cloudsync_payload_context_nrows(payload) > 0) break; - dbvalue_t *col_value = (dbvalue_t *)rowv[3]; - int type = database_value_type(col_value); - if (type != DBTYPE_TEXT && type != DBTYPE_BLOB) { cloudsync_memory_free(payload); return SQLITE_TOOBIG; } - int64_t raw_len = 0; - int header_len = cloudsync_payload_encoded_value_header(col_value, c->value_header, sizeof(c->value_header), &raw_len); - if (header_len <= 0) { cloudsync_memory_free(payload); return SQLITE_ERROR; } - c->value_header_len = header_len; - c->value_data = (const char *)database_value_blob(col_value); - c->value_data_len = raw_len; - c->frag_total = header_len + raw_len; - c->frag_offset = 0; - c->frag_part = 0; - rc = payload_chunks_plan_fragment(c); - if (rc != SQLITE_OK) { cloudsync_memory_free(payload); return rc; } - c->frag_checksum = cloudsync_payload_encoded_value_checksum(col_value); - c->frag_active = true; cloudsync_memory_free(payload); + rc = payload_chunks_begin_fragment(c, 0); + if (rc != SQLITE_OK) return rc; return payload_chunks_emit_fragment(c); } @@ -1360,6 +1391,38 @@ static int payload_chunks_build_next(cloudsync_payload_chunks_cursor *c) { return SQLITE_OK; } +// Record the resume point a stateless caller passes back to continue after the +// chunk just built. Reads the source statement, which is positioned at the next +// unconsumed row (or the same row when a value is still mid-fragment). Must be +// called only after build_next produced a chunk (i.e. !eof). +static void payload_chunks_set_next_cursor(cloudsync_payload_chunks_cursor *c) { + if (c->frag_active) { + // Mid-value: resume the same row at the next byte offset. + c->next_dbv = sqlite3_column_int64(c->src, 5); + c->next_seq = sqlite3_column_int64(c->src, 8); + c->next_frag_offset = c->frag_offset; + c->is_final = false; + } else if (c->has_row) { + // Row boundary: the next chunk starts at the current (unconsumed) row. + c->next_dbv = sqlite3_column_int64(c->src, 5); + c->next_seq = sqlite3_column_int64(c->src, 8); + c->next_frag_offset = 0; + c->is_final = false; + } else { + // Stream exhausted: this was the last chunk of the window. + c->next_dbv = c->watermark; + c->next_seq = 0; + c->next_frag_offset = 0; + c->is_final = true; + } +} + +static int payload_chunks_advance(cloudsync_payload_chunks_cursor *c) { + int rc = payload_chunks_build_next(c); + if (rc == SQLITE_OK && !c->eof) payload_chunks_set_next_cursor(c); + return rc; +} + static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const char *idxstr, int argc, sqlite3_value **argv) { UNUSED_PARAMETER(idxstr); UNUSED_PARAMETER(argc); cloudsync_payload_chunks_cursor *c = (cloudsync_payload_chunks_cursor *)cursor; @@ -1378,6 +1441,13 @@ static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const bool site_id_given = false; int64_t until = 0; bool exclude = false; + // Positional resume cursor (cols 15..17): when resume_db_version is bound the + // scan starts at (resume_db_version, resume_seq) inclusive and the first chunk + // resumes a mid-value fragment at resume_frag_offset, instead of replaying the + // whole window from `since`. Lets a stateless /check page the stream with an + // O(1) seek per call and no spool table. + bool positional = false; + int64_t resume_dbv = 0, resume_seq = 0, resume_frag = 0; if (idxnum & 1) since = sqlite3_value_int64(argv[argi++]); if (idxnum & 2) { if (sqlite3_value_type(argv[argi]) != SQLITE_NULL) { @@ -1389,6 +1459,9 @@ static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const } if (idxnum & 4) until = sqlite3_value_int64(argv[argi++]); if (idxnum & 8) exclude = (sqlite3_value_int(argv[argi++]) != 0); + if (idxnum & 16) { resume_dbv = sqlite3_value_int64(argv[argi++]); positional = true; } + if (idxnum & 32) resume_seq = sqlite3_value_int64(argv[argi++]); + if (idxnum & 64) resume_frag = sqlite3_value_int64(argv[argi++]); // Resolve the site filter: // exclude=true -> all sites except filter_site_id (CHECK path); site required @@ -1421,24 +1494,50 @@ static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const } c->watermark = until; - char *sql = sqlite3_mprintf( - "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " - "FROM cloudsync_changes WHERE db_version>? AND site_id%s? AND db_version<=? ORDER BY db_version, seq ASC", - site_op); + // Window upper bound is always `until`. The lower bound is either the legacy + // exclusive `since` (db_version > since) or the inclusive positional cursor + // (db_version, seq) >= (resume_dbv, resume_seq). + char *sql; + if (positional) { + sql = sqlite3_mprintf( + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes WHERE db_version<=? AND site_id%s? AND " + "(db_version>? OR (db_version=? AND seq>=?)) ORDER BY db_version, seq ASC", + site_op); + } else { + sql = sqlite3_mprintf( + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes WHERE db_version>? AND site_id%s? AND db_version<=? ORDER BY db_version, seq ASC", + site_op); + } if (!sql) return SQLITE_NOMEM; int rc = sqlite3_prepare_v2(c->vtab->db, sql, -1, &c->src, NULL); sqlite3_free(sql); if (rc != SQLITE_OK) return rc; - sqlite3_bind_int64(c->src, 1, since); - sqlite3_bind_blob(c->src, 2, site_id, site_id_len, SQLITE_TRANSIENT); - sqlite3_bind_int64(c->src, 3, until); + if (positional) { + sqlite3_bind_int64(c->src, 1, until); + sqlite3_bind_blob(c->src, 2, site_id, site_id_len, SQLITE_TRANSIENT); + sqlite3_bind_int64(c->src, 3, resume_dbv); + sqlite3_bind_int64(c->src, 4, resume_dbv); + sqlite3_bind_int64(c->src, 5, resume_seq); + } else { + sqlite3_bind_int64(c->src, 1, since); + sqlite3_bind_blob(c->src, 2, site_id, site_id_len, SQLITE_TRANSIENT); + sqlite3_bind_int64(c->src, 3, until); + } rc = payload_chunks_step_source(c); if (rc != SQLITE_OK) return rc; - return payload_chunks_build_next(c); + // Resuming inside a value that was fragmented across chunks: the first row is + // that value; re-establish the fragment plan and skip to resume_frag. + if (positional && resume_frag > 0 && c->has_row) { + rc = payload_chunks_begin_fragment(c, resume_frag); + if (rc != SQLITE_OK) return rc; + } + return payload_chunks_advance(c); } static int payload_chunks_next(sqlite3_vtab_cursor *cursor) { - return payload_chunks_build_next((cloudsync_payload_chunks_cursor *)cursor); + return payload_chunks_advance((cloudsync_payload_chunks_cursor *)cursor); } static int payload_chunks_eof(sqlite3_vtab_cursor *cursor) { @@ -1455,6 +1554,10 @@ static int payload_chunks_column(sqlite3_vtab_cursor *cursor, sqlite3_context *c case 4: sqlite3_result_int64(ctx, c->dbv_min); break; case 5: sqlite3_result_int64(ctx, c->dbv_max); break; case 6: sqlite3_result_int64(ctx, c->watermark); break; + case 11: sqlite3_result_int64(ctx, c->next_dbv); break; + case 12: sqlite3_result_int64(ctx, c->next_seq); break; + case 13: sqlite3_result_int64(ctx, c->next_frag_offset); break; + case 14: sqlite3_result_int(ctx, c->is_final ? 1 : 0); break; default: sqlite3_result_null(ctx); break; } return SQLITE_OK; @@ -1749,152 +1852,6 @@ void dbsync_payload_load (sqlite3_context *context, int argc, sqlite3_value **ar } #endif -// MARK: - Download spool - - -// Abandoned download streams (a client that started a chunked /check drain and -// never finished) are reaped after this many seconds. Matches the v3 fragment -// stale window. -#define CLOUDSYNC_PAYLOAD_SPOOL_STALE_SECONDS (24*60*60) - -// cloudsync_payload_spool_fill(stream_id, since_db_version, filter_site_id, exclude) -// Generate the whole chunk stream for a window once into cloudsync_payload_spool -// so the /check path can page it out one chunk per call. Returns the number of -// chunks spooled for stream_id. Idempotent: a prior complete fill is kept as-is. -static void dbsync_payload_spool_fill (sqlite3_context *context, int argc, sqlite3_value **argv) { - DEBUG_FUNCTION("cloudsync_payload_spool_fill"); - UNUSED_PARAMETER(argc); - - sqlite3 *db = sqlite3_context_db_handle(context); - if (sqlite3_value_type(argv[0]) == SQLITE_NULL) { - sqlite3_result_error(context, "cloudsync_payload_spool_fill: stream_id is required.", -1); - return; - } - const char *stream_id = (const char *)sqlite3_value_text(argv[0]); - int stream_id_len = sqlite3_value_bytes(argv[0]); - int64_t since = sqlite3_value_int64(argv[1]); - bool site_given = (sqlite3_value_type(argv[2]) != SQLITE_NULL); - int exclude = (sqlite3_value_int(argv[3]) != 0); - - sqlite3_stmt *stmt = NULL; - int rc = sqlite3_exec(db, SQL_PAYLOAD_SPOOL_CREATE_TABLE, NULL, NULL, NULL); - if (rc != SQLITE_OK) goto error; - - // Stale-GC of abandoned streams (coarse-grained, no throttle needed). - if (sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_CLEANUP_STALE, -1, &stmt, NULL) == SQLITE_OK) { - sqlite3_bind_int64(stmt, 1, (int64_t)time(NULL) - CLOUDSYNC_PAYLOAD_SPOOL_STALE_SECONDS); - sqlite3_step(stmt); - } - sqlite3_finalize(stmt); stmt = NULL; - - // Atomic fill: a partial/failed generation must never persist, so that the - // idempotency check below ("rows present == complete stream") holds. - rc = sqlite3_exec(db, "SAVEPOINT cloudsync_spool_fill;", NULL, NULL, NULL); - if (rc != SQLITE_OK) goto error; - - int64_t count = 0; - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_COUNT, -1, &stmt, NULL); - if (rc != SQLITE_OK) goto rollback; - sqlite3_bind_text(stmt, 1, stream_id, stream_id_len, SQLITE_TRANSIENT); - if (sqlite3_step(stmt) == SQLITE_ROW) count = sqlite3_column_int64(stmt, 0); - sqlite3_finalize(stmt); stmt = NULL; - - if (count == 0) { - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_FILL_INSERT, -1, &stmt, NULL); - if (rc != SQLITE_OK) goto rollback; - sqlite3_bind_text(stmt, 1, stream_id, stream_id_len, SQLITE_TRANSIENT); - sqlite3_bind_int64(stmt, 2, since); - if (site_given) sqlite3_bind_blob(stmt, 3, sqlite3_value_blob(argv[2]), sqlite3_value_bytes(argv[2]), SQLITE_TRANSIENT); - else sqlite3_bind_null(stmt, 3); - sqlite3_bind_int(stmt, 4, exclude); - rc = sqlite3_step(stmt); - sqlite3_finalize(stmt); stmt = NULL; - if (rc != SQLITE_DONE) goto rollback; - - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_MARK_FINAL, -1, &stmt, NULL); - if (rc != SQLITE_OK) goto rollback; - sqlite3_bind_text(stmt, 1, stream_id, stream_id_len, SQLITE_TRANSIENT); - rc = sqlite3_step(stmt); - sqlite3_finalize(stmt); stmt = NULL; - if (rc != SQLITE_DONE) goto rollback; - - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_COUNT, -1, &stmt, NULL); - if (rc != SQLITE_OK) goto rollback; - sqlite3_bind_text(stmt, 1, stream_id, stream_id_len, SQLITE_TRANSIENT); - if (sqlite3_step(stmt) == SQLITE_ROW) count = sqlite3_column_int64(stmt, 0); - sqlite3_finalize(stmt); stmt = NULL; - } - - sqlite3_exec(db, "RELEASE cloudsync_spool_fill;", NULL, NULL, NULL); - sqlite3_result_int64(context, count); - return; - -rollback: - sqlite3_finalize(stmt); - sqlite3_result_error(context, sqlite3_errmsg(db), -1); - sqlite3_result_error_code(context, rc); - sqlite3_exec(db, "ROLLBACK TO cloudsync_spool_fill; RELEASE cloudsync_spool_fill;", NULL, NULL, NULL); - return; - -error: - sqlite3_result_error(context, sqlite3_errmsg(db), -1); - sqlite3_result_error_code(context, rc); -} - -// cloudsync_payload_spool_drop(stream_id) -> number of chunks removed. -// Called once a stream has been fully delivered/acked (or to force-evict). -static void dbsync_payload_spool_drop (sqlite3_context *context, int argc, sqlite3_value **argv) { - DEBUG_FUNCTION("cloudsync_payload_spool_drop"); - UNUSED_PARAMETER(argc); - - sqlite3 *db = sqlite3_context_db_handle(context); - if (sqlite3_value_type(argv[0]) == SQLITE_NULL) { - sqlite3_result_error(context, "cloudsync_payload_spool_drop: stream_id is required.", -1); - return; - } - - int rc = sqlite3_exec(db, SQL_PAYLOAD_SPOOL_CREATE_TABLE, NULL, NULL, NULL); - if (rc != SQLITE_OK) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - - sqlite3_stmt *stmt = NULL; - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_DELETE, -1, &stmt, NULL); - if (rc != SQLITE_OK) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - sqlite3_bind_text(stmt, 1, (const char *)sqlite3_value_text(argv[0]), sqlite3_value_bytes(argv[0]), SQLITE_TRANSIENT); - rc = sqlite3_step(stmt); - sqlite3_finalize(stmt); - if (rc != SQLITE_DONE) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - sqlite3_result_int64(context, sqlite3_changes(db)); -} - -// cloudsync_payload_spool_drop_chunk(stream_id, chunk_index) -> number of chunks removed. -// Called after one S3-backed chunk has been safely persisted outside the spool. -static void dbsync_payload_spool_drop_chunk (sqlite3_context *context, int argc, sqlite3_value **argv) { - DEBUG_FUNCTION("cloudsync_payload_spool_drop_chunk"); - UNUSED_PARAMETER(argc); - - sqlite3 *db = sqlite3_context_db_handle(context); - if (sqlite3_value_type(argv[0]) == SQLITE_NULL) { - sqlite3_result_error(context, "cloudsync_payload_spool_drop_chunk: stream_id is required.", -1); - return; - } - if (sqlite3_value_type(argv[1]) == SQLITE_NULL) { - sqlite3_result_error(context, "cloudsync_payload_spool_drop_chunk: chunk_index is required.", -1); - return; - } - - int rc = sqlite3_exec(db, SQL_PAYLOAD_SPOOL_CREATE_TABLE, NULL, NULL, NULL); - if (rc != SQLITE_OK) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - - sqlite3_stmt *stmt = NULL; - rc = sqlite3_prepare_v2(db, SQL_PAYLOAD_SPOOL_DELETE_CHUNK, -1, &stmt, NULL); - if (rc != SQLITE_OK) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - sqlite3_bind_text(stmt, 1, (const char *)sqlite3_value_text(argv[0]), sqlite3_value_bytes(argv[0]), SQLITE_TRANSIENT); - sqlite3_bind_int64(stmt, 2, sqlite3_value_int64(argv[1])); - rc = sqlite3_step(stmt); - sqlite3_finalize(stmt); - if (rc != SQLITE_DONE) { sqlite3_result_error(context, sqlite3_errmsg(db), -1); return; } - sqlite3_result_int64(context, sqlite3_changes(db)); -} - // MARK: - Register - int dbsync_register_with_flags (sqlite3 *db, const char *name, void (*xfunc)(sqlite3_context*,int,sqlite3_value**), void (*xstep)(sqlite3_context*,int,sqlite3_value**), void (*xfinal)(sqlite3_context*), int nargs, int flags, char **pzErrMsg, void *ctx, void (*ctx_free)(void *)) { @@ -2216,14 +2173,6 @@ int dbsync_register_functions (sqlite3 *db, char **pzErrMsg) { rc = dbsync_register_function(db, "cloudsync_payload_blob_checked", dbsync_payload_blob_checked, 5, pzErrMsg, ctx, NULL); if (rc != SQLITE_OK) return rc; - // Download spool (server-side /check chunk staging) - rc = dbsync_register_function(db, "cloudsync_payload_spool_fill", dbsync_payload_spool_fill, 4, pzErrMsg, ctx, NULL); - if (rc != SQLITE_OK) return rc; - rc = dbsync_register_function(db, "cloudsync_payload_spool_drop", dbsync_payload_spool_drop, 1, pzErrMsg, ctx, NULL); - if (rc != SQLITE_OK) return rc; - rc = dbsync_register_function(db, "cloudsync_payload_spool_drop_chunk", dbsync_payload_spool_drop_chunk, 2, pzErrMsg, ctx, NULL); - if (rc != SQLITE_OK) return rc; - #ifdef CLOUDSYNC_DESKTOP_OS rc = dbsync_register_function(db, "cloudsync_payload_save", dbsync_payload_save, 1, pzErrMsg, ctx, NULL); if (rc != SQLITE_OK) return rc; diff --git a/src/sqlite/sql_sqlite.c b/src/sqlite/sql_sqlite.c index 66d163f..f01a307 100644 --- a/src/sqlite/sql_sqlite.c +++ b/src/sqlite/sql_sqlite.c @@ -311,52 +311,6 @@ const char * const SQL_PAYLOAD_FRAGMENTS_CLEANUP_STALE = "SELECT value_id FROM cloudsync_payload_fragments GROUP BY value_id " "HAVING COUNT(*) < MAX(part_count));"; -// MARK: Payload download spool (server-side /check chunk staging) - -// One row per transport chunk of a download stream. Keyed by (stream_id, -// chunk_index); the server fills a whole window once and pages it out one chunk -// per /check call so the network driver never re-materializes the whole stream. -const char * const SQL_PAYLOAD_SPOOL_CREATE_TABLE = - "CREATE TABLE IF NOT EXISTS cloudsync_payload_spool (" - "stream_id TEXT NOT NULL, chunk_index INTEGER NOT NULL, " - "payload BLOB NOT NULL, payload_size INTEGER NOT NULL, " - "db_version_min INTEGER NOT NULL, db_version_max INTEGER NOT NULL, " - "watermark INTEGER NOT NULL, is_final INTEGER NOT NULL DEFAULT 0, " - "created_at INTEGER NOT NULL DEFAULT (unixepoch()), " - "PRIMARY KEY(stream_id, chunk_index)) WITHOUT ROWID;"; - -const char * const SQL_PAYLOAD_SPOOL_COUNT = - "SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id=?;"; - -// Generate the window's chunk stream once (the vtab streams row-by-row under -// INSERT...SELECT, so peak memory stays at one chunk). until_db_version is left -// unconstrained so the vtab pins the watermark to the current max db_version. -const char * const SQL_PAYLOAD_SPOOL_FILL_INSERT = - "INSERT INTO cloudsync_payload_spool " - "(stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final) " - "SELECT ?1, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark_db_version, 0 " - "FROM cloudsync_payload_chunks " - "WHERE since_db_version=?2 AND site_id=?3 AND exclude_filter_site_id=?4;"; - -const char * const SQL_PAYLOAD_SPOOL_MARK_FINAL = - "UPDATE cloudsync_payload_spool SET is_final=1 " - "WHERE stream_id=?1 AND chunk_index=(" - "SELECT MAX(chunk_index) FROM cloudsync_payload_spool WHERE stream_id=?1);"; - -const char * const SQL_PAYLOAD_SPOOL_DELETE = - "DELETE FROM cloudsync_payload_spool WHERE stream_id=?;"; - -const char * const SQL_PAYLOAD_SPOOL_DELETE_CHUNK = - "DELETE FROM cloudsync_payload_spool WHERE stream_id=? AND chunk_index=?;"; - -// Drop whole abandoned streams (every chunk older than the cutoff). fill is -// coarse-grained (once per stream), so unlike per-fragment cleanup there is no -// O(n^2) risk and no throttle is needed. -const char * const SQL_PAYLOAD_SPOOL_CLEANUP_STALE = - "DELETE FROM cloudsync_payload_spool WHERE stream_id IN (" - "SELECT stream_id FROM cloudsync_payload_spool GROUP BY stream_id " - "HAVING MAX(created_at) < ?);"; - // MARK: Blocks (block-level LWW) const char * const SQL_BLOCKS_CREATE_TABLE = diff --git a/test/chunk_bench.c b/test/chunk_bench.c new file mode 100644 index 0000000..4321ac2 --- /dev/null +++ b/test/chunk_bench.c @@ -0,0 +1,177 @@ +// +// chunk_bench.c +// cloudsync +// +// Local-only benchmark for the positional /check drain: build a window of N +// chunks and time paging the whole window one chunk per call via the +// (resume_db_version, resume_seq, resume_frag_offset) cursor on +// cloudsync_payload_chunks. Reports wall time and per-chunk cost so the +// computational growth of the drain (currently O(N^2): each resume re-scans +// cloudsync_changes) can be tracked — e.g. to confirm a future indexed +// (db_version, seq) seek flattens it to O(N). +// +// Env: CHUNK_BENCH_ROWS (default 400), CHUNK_BENCH_ROW_BYTES (default 60000), +// CHUNK_BENCH_TXNS (default 1; rows split across this many db_versions), +// CHUNK_BENCH_REPEATS (default 5), CHUNK_BENCH_CHUNK_SIZE (default 262144). +// + +#include +#include +#include +#include +#include +#include "sqlite3.h" + +#define DB_PATH "dist/chunk-bench.sqlite" +#define EXT_PATH "./dist/cloudsync" + +static double monotonic_ms(void) { + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return ((double)ts.tv_sec * 1000.0) + ((double)ts.tv_nsec / 1000000.0); +} + +static int env_int(const char *name, int dflt) { + const char *v = getenv(name); + if (!v || !*v) return dflt; + char *end = NULL; + long p = strtol(v, &end, 10); + if (!end || *end != '\0' || p <= 0) return dflt; + return (int)p; +} + +static int db_exec(sqlite3 *db, const char *sql) { + char *err = NULL; + int rc = sqlite3_exec(db, sql, NULL, NULL, &err); + if (rc != SQLITE_OK) { + fprintf(stderr, "exec failed: %s: %s\n", sql, err ? err : sqlite3_errmsg(db)); + sqlite3_free(err); + } + return rc; +} + +// Drain the whole window via the positional cursor, one chunk per query. Returns +// the chunk count and accumulates total payload bytes touched into *bytes. +static int drain_positional(sqlite3 *db, int *chunks_out, long long *bytes_out) { + const char *first_sql = + "SELECT payload, next_db_version, next_seq, next_frag_offset, is_final, watermark_db_version " + "FROM cloudsync_payload_chunks WHERE since_db_version=0 LIMIT 1;"; + const char *resume_sql = + "SELECT payload, next_db_version, next_seq, next_frag_offset, is_final " + "FROM cloudsync_payload_chunks " + "WHERE until_db_version=?1 AND resume_db_version=?2 AND resume_seq=?3 AND resume_frag_offset=?4 LIMIT 1;"; + sqlite3_stmt *first = NULL, *resume = NULL; + int rc = sqlite3_prepare_v2(db, first_sql, -1, &first, NULL); + if (rc != SQLITE_OK) goto done; + rc = sqlite3_prepare_v2(db, resume_sql, -1, &resume, NULL); + if (rc != SQLITE_OK) goto done; + + int chunks = 0; + long long bytes = 0; + long long watermark = 0, rdbv = 0, rseq = 0, rfrag = 0; + bool is_final = false; + + rc = sqlite3_step(first); + if (rc == SQLITE_ROW) { + bytes += sqlite3_column_bytes(first, 0); + rdbv = sqlite3_column_int64(first, 1); + rseq = sqlite3_column_int64(first, 2); + rfrag = sqlite3_column_int64(first, 3); + is_final = sqlite3_column_int(first, 4) != 0; + watermark = sqlite3_column_int64(first, 5); + chunks++; + } else if (rc == SQLITE_DONE) { + rc = SQLITE_OK; + goto done; // empty window + } else { + goto done; + } + + while (!is_final) { + sqlite3_reset(resume); + sqlite3_bind_int64(resume, 1, watermark); + sqlite3_bind_int64(resume, 2, rdbv); + sqlite3_bind_int64(resume, 3, rseq); + sqlite3_bind_int64(resume, 4, rfrag); + rc = sqlite3_step(resume); + if (rc != SQLITE_ROW) { if (rc == SQLITE_DONE) rc = SQLITE_OK; break; } + bytes += sqlite3_column_bytes(resume, 0); + rdbv = sqlite3_column_int64(resume, 1); + rseq = sqlite3_column_int64(resume, 2); + rfrag = sqlite3_column_int64(resume, 3); + is_final = sqlite3_column_int(resume, 4) != 0; + chunks++; + } + rc = SQLITE_OK; + *chunks_out = chunks; + *bytes_out = bytes; + +done: + if (first) sqlite3_finalize(first); + if (resume) sqlite3_finalize(resume); + return rc; +} + +int main(void) { + int rows = env_int("CHUNK_BENCH_ROWS", 400); + int row_bytes = env_int("CHUNK_BENCH_ROW_BYTES", 60000); + int repeats = env_int("CHUNK_BENCH_REPEATS", 5); + int chunk_size = env_int("CHUNK_BENCH_CHUNK_SIZE", 262144); + + remove(DB_PATH); + sqlite3 *db = NULL; + if (sqlite3_open(DB_PATH, &db) != SQLITE_OK) { fprintf(stderr, "open failed\n"); return 1; } + if (sqlite3_enable_load_extension(db, 1) != SQLITE_OK) return 1; + if (db_exec(db, "SELECT load_extension('" EXT_PATH "');") != SQLITE_OK) return 1; + + char setup[256]; + snprintf(setup, sizeof(setup), + "CREATE TABLE chunk_bench (id TEXT PRIMARY KEY, body BLOB);" + "SELECT cloudsync_init('chunk_bench');" + "SELECT cloudsync_set('payload_max_chunk_size', '%d');", chunk_size); + if (db_exec(db, setup) != SQLITE_OK) return 1; + + // Split the rows across CHUNK_BENCH_TXNS transactions: each is one db_version, + // so TXNS=1 is the pathological single-version window and TXNS=rows is the + // many-versions case a real /check window resembles. Incompressible bodies keep + // the window many-chunked. + int txns = env_int("CHUNK_BENCH_TXNS", 1); + if (txns < 1) txns = 1; + if (txns > rows) txns = rows; + int idbase = 0; + for (int t = 0; t < txns; ++t) { + int n = rows / txns + (t < rows % txns ? 1 : 0); + if (n <= 0) continue; + char insert[256]; + snprintf(insert, sizeof(insert), + "WITH RECURSIVE c(i) AS (SELECT %d UNION ALL SELECT i+1 FROM c WHERE i < %d) " + "INSERT INTO chunk_bench(id, body) SELECT printf('row-%%06d', i), randomblob(%d) FROM c;", + idbase + 1, idbase + n, row_bytes); + if (db_exec(db, insert) != SQLITE_OK) return 1; + idbase += n; + } + + int chunks = 0; + long long bytes = 0; + double best = 1e18, sum = 0; + for (int r = 0; r < repeats; ++r) { + double t0 = monotonic_ms(); + if (drain_positional(db, &chunks, &bytes) != SQLITE_OK) { fprintf(stderr, "positional drain failed\n"); return 1; } + double dt = monotonic_ms() - t0; + sum += dt; if (dt < best) best = dt; + } + + double mean = sum / repeats; + printf("\nPositional /check drain benchmark (local SQLite, no network)\n"); + printf("rows: %d row_bytes: %d txns: %d chunk_size: %d repeats: %d\n", + rows, row_bytes, txns, chunk_size, repeats); + printf("chunks: %d payload_bytes: %lld\n", chunks, bytes); + printf("drain: best=%.2f ms mean=%.2f ms\n", best, mean); + if (chunks > 0) + printf("per-chunk: best=%.3f ms throughput: %.1f MB/s\n", + best / chunks, (double)bytes / 1024.0 / 1024.0 / (best / 1000.0)); + + sqlite3_close(db); + remove(DB_PATH); + return 0; +} diff --git a/test/integration.c b/test/integration.c index 8700524..fc39efb 100644 --- a/test/integration.c +++ b/test/integration.c @@ -236,6 +236,35 @@ int db_expect_min (sqlite3 *db, const char *sql, int expect_min) { return SQLITE_OK; } +// Run a send that is expected to succeed and assert it didn't fail at the protocol +// level: status is not "error" and the server reported no per-chunk failure +// (send.lastFailure is omitted on success). Catches a server-reported apply/check +// failure that doesn't raise a SQL error — invisible to a plain db_exec of the send. +int db_send_ok (sqlite3 *db) { + return db_expect_int(db, + "SELECT (j ->> '$.send.status') <> 'error' AND (j ->> '$.send.lastFailure') IS NULL " + "FROM (SELECT cloudsync_network_send_changes() AS j);", 1); +} + +// Send, then poll until the server's optimistic version (send.serverVersion) catches +// up to send.localVersion — the change is durably covered with no gap. Robust to the +// server's asynchronous apply: the first call sends, later calls are no-ops that +// re-read status. Fails if it has not converged within max_attempts. +int db_send_await_converge (sqlite3 *db, int max_attempts, int delay_ms) { + const char *sql = + "SELECT (j ->> '$.send.serverVersion') >= (j ->> '$.send.localVersion') " + "FROM (SELECT cloudsync_network_send_changes() AS j);"; + for (int i = 0; i < max_attempts; i++) { + int converged = 0; + int rc = db_select_int(db, sql, &converged); + if (rc != SQLITE_OK) return rc; + if (converged) return SQLITE_OK; + if (i + 1 < max_attempts) sqlite3_sleep(delay_ms); + } + printf("Error: send did not converge (serverVersion < localVersion) after %d attempts\n", max_attempts); + return SQLITE_ERROR; +} + int integration_network_init(sqlite3 *db, const char *database_id, char *network_init, size_t network_init_len) { if (!database_id) { fprintf(stderr, "Error: integration database ID not set.\n"); @@ -528,7 +557,7 @@ int test_enable_disable(const char *db_path) { rc = db_exec(db, set_apikey); RCHECK } - rc = db_exec(db, "SELECT cloudsync_network_send_changes();"); RCHECK + rc = db_send_ok(db); RCHECK rc = db_exec(db, "SELECT cloudsync_cleanup('users');"); RCHECK rc = db_exec(db, "SELECT cloudsync_cleanup('activities');"); RCHECK rc = db_exec(db, "SELECT cloudsync_cleanup('workouts');"); RCHECK @@ -563,12 +592,80 @@ int test_enable_disable(const char *db_path) { rc = db_expect_int(db2, sql, 1); RCHECK rc = db_exec(db2, "SELECT cloudsync_terminate();"); RCHECK - + sqlite3_close(db2); ABORT_TEST } +// Reproduces the spurious-gap bug in the send path: when the local db_version clock +// has been advanced past the site's own changes — as happens when applied remote +// changes bump the clock — the send announces only the change's own db_version range, +// so the skipped versions stay a gap in the server's per-site coverage and +// lastOptimisticVersion can never reach localVersion. cloudsync_db_version_next() +// forces the jump deterministically, no second database required. Expected to FAIL +// until the send announces the covered window [last_sent+1 .. watermark]. +int test_send_gap_from_clock_hole(const char *db_path) { + sqlite3 *db = NULL; + int rc = open_load_ext(db_path, &db); RCHECK + rc = db_init(db); RCHECK // create users/activities/workouts (this db is fresh) + + char value[UUID_STR_MAXLEN]; + cloudsync_uuid_v7_string(value, true); + char sql[256]; + + rc = db_exec(db, "SELECT cloudsync_init('users');"); RCHECK + rc = db_exec(db, "SELECT cloudsync_init('activities');"); RCHECK + rc = db_exec(db, "SELECT cloudsync_init('workouts');"); RCHECK + + // Force the next local change to land at db_version 10, leaving 1..9 with no + // local-site change (the "hole" that merging applied remote changes would create). + rc = db_exec(db, "SELECT cloudsync_db_version_next(10);"); RCHECK + + snprintf(sql, sizeof(sql), "INSERT INTO users (id, name) VALUES ('%s', '%s');", value, value); + rc = db_exec(db, sql); RCHECK + + // sanity: the change really landed at db_version 10, so there is a leading hole + rc = db_expect_int(db, "SELECT cloudsync_db_version();", 10); RCHECK + + // init network + char network_init[1024]; + const char* test_db_id = getenv("INTEGRATION_TEST_DATABASE_ID"); + if (!test_db_id) { + fprintf(stderr, "Error: INTEGRATION_TEST_DATABASE_ID not set.\n"); + exit(1); + } + const char* custom_address = getenv("INTEGRATION_TEST_CLOUDSYNC_ADDRESS"); + if (custom_address) { + snprintf(network_init, sizeof(network_init), + "SELECT cloudsync_network_init_custom('%s', '%s');", custom_address, test_db_id); + } else { + snprintf(network_init, sizeof(network_init), + "SELECT cloudsync_network_init('%s');", test_db_id); + } + rc = db_exec(db, network_init); RCHECK + + const char* apikey = getenv("INTEGRATION_TEST_APIKEY"); + if (apikey) { + char set_apikey[512]; + snprintf(set_apikey, sizeof(set_apikey), + "SELECT cloudsync_network_set_apikey('%s');", apikey); + rc = db_exec(db, set_apikey); RCHECK + } + + // Send, then poll until the server's optimistic version (serverVersion) reaches + // localVersion (10). With contiguous coverage it converges; with the gap bug it + // never does — serverVersion stays at 0 because db_versions 1..9 are reported + // missing. Polling absorbs the server's asynchronous apply. + rc = db_send_await_converge(db, 8, 1000); RCHECK + + rc = db_exec(db, "SELECT cloudsync_cleanup('users');"); RCHECK + rc = db_exec(db, "SELECT cloudsync_cleanup('activities');"); RCHECK + rc = db_exec(db, "SELECT cloudsync_cleanup('workouts');"); RCHECK + +ABORT_TEST +} + int test_chunked_payload_paths(void) { int rc = SQLITE_OK; sqlite3 *sender = NULL; @@ -594,7 +691,7 @@ int test_chunked_payload_paths(void) { rc = db_expect_min(sender, "SELECT COUNT(*) FROM cloudsync_payload_chunks();", 2); if (rc != SQLITE_OK) goto cleanup; rc = db_expect_min(sender, "SELECT COUNT(*) FROM cloudsync_payload_chunks() WHERE hex(substr(payload,5,1))='03';", 2); if (rc != SQLITE_OK) goto cleanup; - rc = db_exec(sender, "SELECT cloudsync_network_send_changes();"); if (rc != SQLITE_OK) goto cleanup; + rc = db_send_ok(sender); if (rc != SQLITE_OK) goto cleanup; cleanup_remote_row = true; for (int attempt = 0; attempt < 30; ++attempt) { @@ -667,7 +764,7 @@ int test_chunked_payload_rowset_path(void) { rc = db_expect_min(sender, "SELECT COUNT(*) FROM cloudsync_payload_chunks();", 2); if (rc != SQLITE_OK) goto cleanup; rc = db_expect_int(sender, "SELECT COUNT(*) FROM cloudsync_payload_chunks() WHERE hex(substr(payload,5,1))='03';", 0); if (rc != SQLITE_OK) goto cleanup; - rc = db_exec(sender, "SELECT cloudsync_network_send_changes();"); if (rc != SQLITE_OK) goto cleanup; + rc = db_send_ok(sender); if (rc != SQLITE_OK) goto cleanup; cleanup_remote_rows = true; for (int attempt = 0; attempt < 30; ++attempt) { @@ -751,7 +848,7 @@ int test_chunked_payload_single_sync_drain(void) { // Sender splits into multiple non-fragment chunks. rc = db_expect_min(sender, "SELECT COUNT(*) FROM cloudsync_payload_chunks();", 2); if (rc != SQLITE_OK) goto cleanup; - rc = db_exec(sender, "SELECT cloudsync_network_send_changes();"); if (rc != SQLITE_OK) goto cleanup; + rc = db_send_ok(sender); if (rc != SQLITE_OK) goto cleanup; cleanup_remote_rows = true; for (int attempt = 0; attempt < 40; ++attempt) { @@ -850,7 +947,7 @@ int test_chunked_payload_capped_receive(void) { rc = db_expect_min(sender, "SELECT COUNT(*) FROM cloudsync_payload_chunks();", 2); if (rc != SQLITE_OK) goto cleanup; - rc = db_exec(sender, "SELECT cloudsync_network_send_changes();"); if (rc != SQLITE_OK) goto cleanup; + rc = db_send_ok(sender); if (rc != SQLITE_OK) goto cleanup; cleanup_remote_rows = true; for (int attempt = 0; attempt < 80; ++attempt) { @@ -946,7 +1043,7 @@ int test_chunked_payload_batched_receive(void) { rc = db_expect_min(sender, "SELECT COUNT(*) FROM cloudsync_payload_chunks();", 3); if (rc != SQLITE_OK) goto cleanup; - rc = db_exec(sender, "SELECT cloudsync_network_send_changes();"); if (rc != SQLITE_OK) goto cleanup; + rc = db_send_ok(sender); if (rc != SQLITE_OK) goto cleanup; cleanup_remote_rows = true; for (int attempt = 0; attempt < 80; ++attempt) { @@ -1118,7 +1215,7 @@ int test_chunked_negative_cache_invalidation(void) { "INSERT INTO chunked_payload_items (id, body) VALUES ('%s', 'negative-cache-sentinel');", sentinel_id); rc = db_exec(sender, sql); if (rc != SQLITE_OK) goto cleanup; - rc = db_exec(sender, "SELECT cloudsync_network_send_changes();"); if (rc != SQLITE_OK) goto cleanup; + rc = db_send_ok(sender); if (rc != SQLITE_OK) goto cleanup; cleanup_sentinel = true; // Phase 1: drain the receiver until it is provably caught up. Each bare @@ -1196,7 +1293,7 @@ int test_chunked_negative_cache_invalidation(void) { "INSERT INTO chunked_payload_items (id, body) VALUES ('%s', 'negative-cache');", row_id); rc = db_exec(sender, sql); if (rc != SQLITE_OK) goto cleanup; - rc = db_exec(sender, "SELECT cloudsync_network_send_changes();"); if (rc != SQLITE_OK) goto cleanup; + rc = db_send_ok(sender); if (rc != SQLITE_OK) goto cleanup; cleanup_remote_row = true; // Phase 4: the receiver must now pick up the change on a subsequent receive. @@ -1508,6 +1605,7 @@ int main (void) { rc += test_report("Is Enabled Test:", test_is_enabled(DB_PATH)); rc += test_report("DB Version Test:", test_db_version(DB_PATH)); rc += test_report("Enable Disable Test:", test_enable_disable(DB_PATH)); + rc += test_report("Send Gap From Clock Hole Test:", test_send_gap_from_clock_hole(":memory:")); // Chunked payload tests run only when INTEGRATION_TEST_CHUNKED_DATABASE_ID points at a // tenant with a small payload_max_chunk_size; state the skip reason once for the group. diff --git a/test/network_unit.c b/test/network_unit.c new file mode 100644 index 0000000..d9f4299 --- /dev/null +++ b/test/network_unit.c @@ -0,0 +1,126 @@ +// +// network_unit.c +// cloudsync +// +// Unit tests for the network layer's pure response-handling logic. Built with +// networking ENABLED (unlike dist/unit, which is -DCLOUDSYNC_OMIT_NETWORK), so it +// can call the internal functions directly on crafted in-memory NETWORK_RESULT +// buffers — no server, no sockets. +// + +#include +#include +#include +#include +#include +#include "network_private.h" + +static int failures = 0; + +static void check(const char *name, bool ok) { + printf("%-64s %s\n", name, ok ? "OK" : "FAIL"); + if (!ok) failures++; +} + +static NETWORK_RESULT json_buffer(char *json) { + NETWORK_RESULT r = {0}; + r.code = CLOUDSYNC_NETWORK_BUFFER; + r.buffer = json; + r.blen = strlen(json); + return r; +} + +// Regression: lastOptimisticVersion must track the LATEST valid value, including a +// decrease. The server can roll the optimistic version back when a later send chunk +// fails; since it becomes the durable send checkpoint, a monotonic "max" would mask +// the rollback and skip the rolled-back changes on the next send. +static bool test_optimistic_version_rollback(void) { + int64_t optimistic = -1, confirmed = -1; + int gaps = -1; + char *apply = NULL, *check_fail = NULL; + bool ok = true; + + char j1[] = "{\"lastOptimisticVersion\":50,\"lastConfirmedVersion\":10}"; + NETWORK_RESULT r1 = json_buffer(j1); + network_sync_state_update_from_response(&r1, &optimistic, &confirmed, &gaps, &apply, &check_fail); + ok = ok && optimistic == 50 && confirmed == 10; + + char j2[] = "{\"lastOptimisticVersion\":100,\"lastConfirmedVersion\":20}"; + NETWORK_RESULT r2 = json_buffer(j2); + network_sync_state_update_from_response(&r2, &optimistic, &confirmed, &gaps, &apply, &check_fail); + ok = ok && optimistic == 100 && confirmed == 20; + + // Server rolls back on a later chunk error: the value must DECREASE to 50. + char j3[] = "{\"lastOptimisticVersion\":50,\"lastConfirmedVersion\":20}"; + NETWORK_RESULT r3 = json_buffer(j3); + network_sync_state_update_from_response(&r3, &optimistic, &confirmed, &gaps, &apply, &check_fail); + ok = ok && optimistic == 50; + + // A response missing the field (parsed -1) must NOT clobber the current value. + char j4[] = "{\"lastConfirmedVersion\":20}"; + NETWORK_RESULT r4 = json_buffer(j4); + network_sync_state_update_from_response(&r4, &optimistic, &confirmed, &gaps, &apply, &check_fail); + ok = ok && optimistic == 50; + + ok = ok && apply == NULL && check_fail == NULL; // no failures object in these responses + return ok; +} + +// A non-BUFFER result (or NULL buffer) must leave the accumulators untouched. +static bool test_non_buffer_is_noop(void) { + int64_t optimistic = 7, confirmed = 3; + int gaps = 0; + char *apply = NULL, *check_fail = NULL; + + NETWORK_RESULT err = {0}; + err.code = CLOUDSYNC_NETWORK_ERROR; + network_sync_state_update_from_response(&err, &optimistic, &confirmed, &gaps, &apply, &check_fail); + return optimistic == 7 && confirmed == 3 && gaps == 0; +} + +// The send announces a contiguous coverage window, not the raw chunk ranges, so +// skipped db_versions don't look like gaps. Walk the tiling exactly as the send loop +// does: announce [network_announce_min(lo, chunk_min) .. chunk_max], then lo = max+1. +static bool test_announce_window_tiling(void) { + bool ok = true; + int64_t lo = 0 + 1; // send checkpoint (since) = 0 + + // leading hole: a single change at db_version 7 must announce from 1, not 7. + ok = ok && network_announce_min(lo, 7) == 1; + lo = 7 + 1; + + // inter-chunk hole: next change at 20 (8..19 empty) announces from 8 → no gap. + ok = ok && network_announce_min(lo, 20) == 8; + lo = 20 + 1; + + // same-db_version fragments (a value split across two chunks): first fragment + // announces from the running lo, second must clamp to the shared version (30), + // never lo (31) — that would make min > max. + ok = ok && network_announce_min(lo, 30) == 21; // first fragment of v30 + lo = 30 + 1; + ok = ok && network_announce_min(lo, 30) == 30; // second fragment of v30: 30, not 31 + lo = 30 + 1; + + return ok; +} + +static bool test_compute_status(void) { + bool ok = true; + ok = ok && strcmp(network_compute_status(100, 100, 0, 100), "synced") == 0; + ok = ok && strcmp(network_compute_status(100, 50, 0, 100), "syncing") == 0; + ok = ok && strcmp(network_compute_status(100, 100, 1, 100), "out-of-sync") == 0; // gaps + ok = ok && strcmp(network_compute_status(90, 90, 0, 100), "out-of-sync") == 0; // behind local + ok = ok && strcmp(network_compute_status(-1, 100, 0, 100), "error") == 0; // unparsed + return ok; +} + +int main(void) { + printf("\nNetwork unit tests\n"); + check("optimistic/confirmed version folds latest-valid (allows rollback):", test_optimistic_version_rollback()); + check("non-buffer response is a no-op:", test_non_buffer_is_noop()); + check("send announce-window tiling (hole / inter-chunk / fragment):", test_announce_window_tiling()); + check("network_compute_status:", test_compute_status()); + if (failures) { printf("\n%d test(s) FAILED\n", failures); return 1; } + printf("\nAll network unit tests passed\n"); + return 0; +} diff --git a/test/postgresql/52_payload_chunks.sql b/test/postgresql/52_payload_chunks.sql index 2e1cfd6..ecdd364 100644 --- a/test/postgresql/52_payload_chunks.sql +++ b/test/postgresql/52_payload_chunks.sql @@ -166,102 +166,6 @@ SELECT (:str_arg_chunks::int = :incl_local_chunks::int) AS str_arg_ok \gset SELECT (:fail::int + 1) AS fail \gset \endif --- Download spool: fill stages the whole stream once; paging it back must be --- byte-identical to direct cloudsync_payload_chunks generation, with is_final --- only on the last chunk and a single stable watermark. -SELECT cloudsync_payload_spool_fill('stream-A', 0, cloudsync_siteid(), false) AS spool_fill_count \gset - -SELECT - md5(string_agg(encode(payload, 'hex'), ',' ORDER BY chunk_index)) AS spool_md5, - count(*) FILTER (WHERE is_final) AS spool_final_count, - max(chunk_index) FILTER (WHERE is_final) AS spool_final_idx, - max(chunk_index) AS spool_max_idx, - count(DISTINCT watermark) AS spool_watermark_distinct -FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' \gset - -SELECT md5(string_agg(encode(payload, 'hex'), ',' ORDER BY chunk_index)) AS direct_md5 -FROM cloudsync_payload_chunks(0, cloudsync_siteid(), NULL, false) \gset - -SELECT (:spool_fill_count::int = :chunk_count::int - AND :'spool_md5' = :'direct_md5' - AND :spool_final_count::int = 1 - AND :spool_final_idx::int = :spool_max_idx::int - AND :spool_watermark_distinct::int = 1) AS spool_ok \gset -\if :spool_ok -\echo [PASS] (:testid) Spool fill/page is byte-identical to direct generation (:spool_fill_count chunks, final on last) -\else -\echo [FAIL] (:testid) Spool fill/page mismatch (fill=:spool_fill_count vs :chunk_count, final_count=:spool_final_count) -SELECT (:fail::int + 1) AS fail \gset -\endif - --- Idempotent re-fill (no duplicate rows), empty window -> 0, drop reports count. -SELECT cloudsync_payload_spool_fill('stream-A', 0, cloudsync_siteid(), false) AS spool_refill_count \gset -SELECT count(*) AS spool_rows_after_refill FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' \gset -SELECT cloudsync_payload_spool_fill('stream-empty', 999999, cloudsync_siteid(), false) AS spool_empty_count \gset -SELECT cloudsync_payload_spool_drop('stream-A') AS spool_drop_count \gset -SELECT count(*) AS spool_rows_after_drop FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' \gset - -SELECT (:spool_refill_count::int = :chunk_count::int - AND :spool_rows_after_refill::int = :chunk_count::int - AND :spool_empty_count::int = 0 - AND :spool_drop_count::int = :chunk_count::int - AND :spool_rows_after_drop::int = 0) AS spool_lifecycle_ok \gset -\if :spool_lifecycle_ok -\echo [PASS] (:testid) Spool idempotent re-fill, empty window, and drop behave correctly -\else -\echo [FAIL] (:testid) Spool lifecycle mismatch (refill=:spool_refill_count empty=:spool_empty_count drop=:spool_drop_count after_drop=:spool_rows_after_drop) -SELECT (:fail::int + 1) AS fail \gset -\endif - --- Stale-GC: an abandoned >24h stream is reaped on the next fill. -INSERT INTO cloudsync_payload_spool - (stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final, created_at) -VALUES ('stale', 0, '\x00'::bytea, 1, 1, 1, 1, true, extract(epoch FROM now())::bigint - 90000); -SELECT cloudsync_payload_spool_fill('stream-B', 0, cloudsync_siteid(), false) AS _spool_gc_fill \gset -SELECT - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stale') AS stale_remaining, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-B') AS streamb_count \gset -SELECT (:stale_remaining::int = 0 AND :streamb_count::int = :chunk_count::int) AS spool_gc_ok \gset -\if :spool_gc_ok -\echo [PASS] (:testid) Spool stale-GC reaps abandoned streams on fill -\else -\echo [FAIL] (:testid) Spool stale-GC did not reap the abandoned stream (stale_remaining=:stale_remaining) -SELECT (:fail::int + 1) AS fail \gset -\endif - --- Chunk drop: explicit early cleanup for one S3-backed chunk is scoped to one --- (stream_id, chunk_index), idempotent, and leaves other chunks/streams intact. -SELECT cloudsync_payload_spool_fill('stream-A', 0, cloudsync_siteid(), false) AS spool_refill_for_chunk_drop \gset -SELECT cloudsync_payload_spool_drop_chunk('stream-A', 1) AS spool_chunk_drop_count \gset -SELECT cloudsync_payload_spool_drop_chunk('stream-A', 1) AS spool_chunk_drop_repeat_count \gset -SELECT cloudsync_payload_spool_drop_chunk('stream-A', 999999) AS spool_chunk_drop_missing_count \gset -SELECT - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-A') AS streama_after_chunk_drop, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' AND chunk_index = 0) AS streama_chunk0, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' AND chunk_index = 1) AS streama_chunk1, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-A' AND chunk_index = 2) AS streama_chunk2, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-B') AS streamb_after_chunk_drop, - (SELECT count(*) FROM cloudsync_payload_spool WHERE stream_id = 'stream-B' AND chunk_index = 1) AS streamb_chunk1 \gset -SELECT (:spool_refill_for_chunk_drop::int = :chunk_count::int - AND :spool_chunk_drop_count::int = 1 - AND :spool_chunk_drop_repeat_count::int = 0 - AND :spool_chunk_drop_missing_count::int = 0 - AND :streama_after_chunk_drop::int = :chunk_count::int - 1 - AND :streama_chunk0::int = 1 - AND :streama_chunk1::int = 0 - AND :streama_chunk2::int = 1 - AND :streamb_after_chunk_drop::int = :chunk_count::int - AND :streamb_chunk1::int = 1) AS spool_chunk_drop_ok \gset -\if :spool_chunk_drop_ok -\echo [PASS] (:testid) Spool chunk drop is scoped and idempotent -\else -\echo [FAIL] (:testid) Spool chunk drop mismatch (drop=:spool_chunk_drop_count repeat=:spool_chunk_drop_repeat_count missing=:spool_chunk_drop_missing_count streamA=:streama_after_chunk_drop streamB=:streamb_after_chunk_drop) -SELECT (:fail::int + 1) AS fail \gset -\endif - -SELECT cloudsync_payload_spool_drop('stream-A'); -SELECT cloudsync_payload_spool_drop('stream-B'); - SELECT md5(string_agg(id || ':' || note || ':' || encode(data, 'hex'), '|' ORDER BY id)) AS src_hash, count(*) AS src_count diff --git a/test/postgresql/55_payload_chunks_positional_resume.sql b/test/postgresql/55_payload_chunks_positional_resume.sql new file mode 100644 index 0000000..c1c1f95 --- /dev/null +++ b/test/postgresql/55_payload_chunks_positional_resume.sql @@ -0,0 +1,164 @@ +-- Payload chunks positional-cursor resume +-- +-- Proves the positional cursor on cloudsync_payload_chunks tiles a window exactly: +-- resuming at any chunk's (next_db_version, next_seq, next_frag_offset) reproduces +-- the following chunk byte-for-byte, including boundaries that fall inside a single +-- committed db_version and inside a value larger than the chunk budget. No spool +-- table, no idempotent overlap. +-- +-- Part 2 is end-to-end: drain the whole window the way the /check job will (one +-- chunk per call via the positional cursor), apply that stream to a fresh database, +-- and assert the receiver's table content hashes identically to the source. + +\set testid '55-positional' +\ir helper_test_init.sql + +\connect postgres +\ir helper_psql_conn_setup.sql +DROP DATABASE IF EXISTS cloudsync_test_55_positional; +DROP DATABASE IF EXISTS cloudsync_test_55_positional_dst; +CREATE DATABASE cloudsync_test_55_positional; +CREATE DATABASE cloudsync_test_55_positional_dst; + +\connect cloudsync_test_55_positional +\ir helper_psql_conn_setup.sql +CREATE EXTENSION IF NOT EXISTS cloudsync; +CREATE TABLE split_test (id TEXT PRIMARY KEY, body BYTEA DEFAULT '\x'::bytea); +SELECT cloudsync_init('split_test', 'CLS', 1) AS _init \gset +SELECT cloudsync_set('payload_max_chunk_size', '262144'); + +-- tx1: many medium incompressible rows in one statement -> a single db_version +-- split across several chunks (row-boundary resumes, incl. resumes landing INSIDE +-- one committed version that the legacy since>db_version cursor could not express). +INSERT INTO split_test(id, body) +SELECT format('row-%s', lpad(i::text, 4, '0')), + decode((SELECT string_agg(md5((i * 1000 + j)::text), '') FROM generate_series(1, 88) AS s(j)), 'hex') +FROM generate_series(1, 500) AS g(i); + +-- tx2: one value larger than the chunk budget -> v3 fragments (mid-fragment resumes). +INSERT INTO split_test(id, body) +VALUES ('big', decode((SELECT string_agg(md5(j::text), '') FROM generate_series(1, 30000) AS s(j)), 'hex')); + +-- For each non-final chunk of the full-window scan, resume at its reported cursor +-- and fetch the first chunk; it must equal the next chunk of the full scan. The +-- correlated SRF subquery uses ORDER BY ... LIMIT 1 so each resume call drains +-- fully (no early-terminated value-per-call SRF). +WITH base AS ( + SELECT chunk_index, payload, next_db_version, next_seq, next_frag_offset, is_final, watermark_db_version + FROM cloudsync_payload_chunks(0, cloudsync_siteid(), NULL, false) +), +resumed AS ( + SELECT b.chunk_index, + (b.next_frag_offset > 0) AS is_frag_boundary, + (SELECT r.payload + FROM cloudsync_payload_chunks(NULL, cloudsync_siteid(), + (SELECT max(watermark_db_version) FROM base), false, + b.next_db_version, b.next_seq, b.next_frag_offset) r + ORDER BY r.chunk_index LIMIT 1) AS next_payload + FROM base b + WHERE NOT b.is_final +) +SELECT + (SELECT count(*) FROM base) AS base_count, + coalesce((SELECT bool_and(r.next_payload = b2.payload) + FROM resumed r JOIN base b2 ON b2.chunk_index = r.chunk_index + 1), false) AS chunks_identical, + coalesce((SELECT bool_or(is_frag_boundary) FROM resumed), false) AS saw_frag +\gset + +SELECT (:base_count::int >= 4) AS enough_chunks \gset +\if :enough_chunks +\echo [PASS] (:testid) window produced multiple chunks (:base_count) +\else +\echo [FAIL] (:testid) expected a multi-chunk window, got :base_count +SELECT (:fail::int + 1) AS fail \gset +\endif + +\if :chunks_identical +\echo [PASS] (:testid) positional resume reproduced every following chunk byte-for-byte +\else +\echo [FAIL] (:testid) a positional resume did not reproduce the next chunk +SELECT (:fail::int + 1) AS fail \gset +\endif + +\if :saw_frag +\echo [PASS] (:testid) mid-fragment resume exercised +\else +\echo [FAIL] (:testid) mid-fragment resume not exercised +SELECT (:fail::int + 1) AS fail \gset +\endif + +-- Part 2: end-to-end drain + apply round-trip. +-- +-- Drain the window exactly as the /check job will: start with the legacy +-- exclusive cursor (since=0), then step the positional cursor one chunk per call, +-- collecting payloads in drain order. ORDER BY chunk_index LIMIT 1 forces each +-- value-per-call SRF to run to completion (no early-terminated cursor). The drained +-- chunks are returned hex-joined so they can cross \connect into the receiver DB. +CREATE OR REPLACE FUNCTION _positional_drain_hex() RETURNS text LANGUAGE plpgsql AS $$ +DECLARE + rdbv bigint; rseq bigint; rfrag bigint; wm bigint := 0; + rec record; parts text[] := '{}'; guard int := 0; +BEGIN + LOOP + guard := guard + 1; + IF guard > 100000 THEN RAISE EXCEPTION 'positional drain did not terminate'; END IF; + IF wm = 0 THEN + SELECT * INTO rec FROM cloudsync_payload_chunks(0, cloudsync_siteid(), NULL, false) + ORDER BY chunk_index LIMIT 1; + IF NOT FOUND THEN EXIT; END IF; + wm := rec.watermark_db_version; + ELSE + SELECT * INTO rec FROM cloudsync_payload_chunks(NULL, cloudsync_siteid(), wm, false, rdbv, rseq, rfrag) + ORDER BY chunk_index LIMIT 1; + IF NOT FOUND THEN EXIT; END IF; + END IF; + parts := array_append(parts, encode(rec.payload, 'hex')); + rdbv := rec.next_db_version; rseq := rec.next_seq; rfrag := rec.next_frag_offset; + EXIT WHEN rec.is_final; + END LOOP; + RETURN array_to_string(parts, ','); +END $$; + +SELECT _positional_drain_hex() AS chunks_hex \gset +SELECT + md5(string_agg(id || ':' || encode(body, 'hex'), '|' ORDER BY id)) AS src_hash, + count(*) AS src_count +FROM split_test \gset + +\connect cloudsync_test_55_positional_dst +\ir helper_psql_conn_setup.sql +CREATE EXTENSION IF NOT EXISTS cloudsync; +CREATE TABLE split_test (id TEXT PRIMARY KEY, body BYTEA DEFAULT '\x'::bytea); +SELECT cloudsync_init('split_test', 'CLS', 1) AS _init_dst \gset +SELECT cloudsync_set('payload_max_chunk_size', '262144'); + +-- Reconstitute the drained chunks and apply them (reverse order on purpose: apply +-- must be order-independent and reassemble fragments regardless). +CREATE TEMP TABLE chunk_transport(ord int, payload bytea); +INSERT INTO chunk_transport(ord, payload) +SELECT ord::int, decode(chunk_hex, 'hex') +FROM unnest(string_to_array(:'chunks_hex', ',')) WITH ORDINALITY AS t(chunk_hex, ord); + +SELECT coalesce(sum(cloudsync_payload_apply(payload)), 0) AS applied_rows +FROM (SELECT payload FROM chunk_transport ORDER BY ord DESC) AS ordered \gset + +SELECT + md5(string_agg(id || ':' || encode(body, 'hex'), '|' ORDER BY id)) AS dst_hash, + count(*) AS dst_count +FROM split_test \gset + +SELECT (:'dst_hash' = :'src_hash' AND :dst_count::int = :src_count::int + AND :dst_count::int > 0) AS roundtrip_ok \gset +\if :roundtrip_ok +\echo [PASS] (:testid) positional drain applied to a fresh database reproduces the source (:dst_count rows) +\else +\echo [FAIL] (:testid) drain/apply mismatch (src_count=:src_count dst_count=:dst_count hashes :'src_hash' vs :'dst_hash') +SELECT (:fail::int + 1) AS fail \gset +\endif + +\ir helper_test_cleanup.sql +\if :should_cleanup +\connect postgres +DROP DATABASE IF EXISTS cloudsync_test_55_positional; +DROP DATABASE IF EXISTS cloudsync_test_55_positional_dst; +\endif diff --git a/test/postgresql/full_test.sql b/test/postgresql/full_test.sql index 13509db..c6f38b7 100644 --- a/test/postgresql/full_test.sql +++ b/test/postgresql/full_test.sql @@ -62,6 +62,7 @@ \ir 52_payload_chunks.sql \ir 53_payload_blob_checked_pg_try.sql \ir 54_payload_chunks_fragment_state.sql +\ir 55_payload_chunks_positional_resume.sql -- 'Test summary' \echo '\nTest summary:' diff --git a/test/sync_bench.c b/test/sync_bench.c index 684f795..5fd496a 100644 --- a/test/sync_bench.c +++ b/test/sync_bench.c @@ -134,6 +134,21 @@ static int timed_query_text(sqlite3 *db, const char *sql, char **out, double *st return rc; } +// Runs a cloudsync_network_* scalar function. In trace mode it captures and prints +// the JSON the function returns; otherwise it executes without keeping the result. +static int db_exec_network(sqlite3 *db, const char *operation, const char *sql) { +#ifdef CLOUDSYNC_NETWORK_TRACE + char *result = NULL; + int rc = query_text(db, sql, &result); + bench_trace("response op=%s json=%s", operation, result ? result : "(null)"); + free(result); + return rc; +#else + (void)operation; + return db_exec(db, sql); +#endif +} + static int open_load_ext(const char *db_path, sqlite3 **out_db) { bench_trace("step=open-load-extension db_path=%s begin", db_path); sqlite3 *db = NULL; @@ -205,8 +220,10 @@ static int init_network(sqlite3 *db, const char *label, const char *database_id, if (rc != SQLITE_OK) return rc; } + // Drain any pre-existing backlog so the measured send->apply later starts from a + // clean baseline. This is untimed warm-up, not part of the latency measurement. bench_trace("step=pre-measure-sync db=%s begin sql=cloudsync_network_sync(500,4)", label); - rc = db_exec(db, "SELECT cloudsync_network_sync(500, 4);"); + rc = db_exec_network(db, "pre-measure-sync", "SELECT cloudsync_network_sync(500, 4);"); bench_trace("step=pre-measure-sync db=%s end rc=%d", label, rc); return rc; } @@ -383,6 +400,9 @@ static int timed_request(sqlite3 *db, sync_bench_request *request, const char *o if (strcmp(operation, "check") == 0 && request->result_json) { request->rows_received = json_int_at_path(db, request->result_json, "$.receive.rows", -1); } + // Surface the raw JSON returned by the network function (send/check) under trace. + bench_trace("response op=%s attempt=%d json=%s", operation, attempt, + request->result_json ? request->result_json : "(null)"); return request->sqlite_rc; } @@ -522,6 +542,8 @@ int main(void) { remove(DB_B_PATH); cloudsync_memory_init(1); + // Step 1 - setup: open both local databases (sender A, receiver B), load the + // extension, create the schema, attach to the network, and drain to a clean baseline. bench_trace("step=benchmark-setup begin database_id=%s poll_delay_ms=%d max_polls=%d", database_id, poll_delay_ms, max_polls); rc = setup_database("db_a", DB_A_PATH, database_id, address, apikey, &db_a); if (rc != SQLITE_OK) goto cleanup; @@ -529,15 +551,19 @@ int main(void) { if (rc != SQLITE_OK) goto cleanup; bench_trace("step=benchmark-setup end rc=%d", rc); + // Prune stale rows from prior runs and push those deletions so they don't pollute + // the receiver's backlog during the measured round-trip. rc = cleanup_old_benchmark_rows(db_a, cleanup_older_than_seconds, &cleanup_deleted_rows); if (rc != SQLITE_OK) goto cleanup; if (cleanup_deleted_rows > 0) { bench_trace("step=cleanup-send db=db_a deleted=%d begin sql=cloudsync_network_send_changes", cleanup_deleted_rows); - rc = db_exec(db_a, "SELECT cloudsync_network_send_changes();"); + rc = db_exec_network(db_a, "cleanup-send", "SELECT cloudsync_network_send_changes();"); bench_trace("step=cleanup-send db=db_a deleted=%d end rc=%d", cleanup_deleted_rows, rc); if (rc != SQLITE_OK) goto cleanup; } + // Step 3 - build the unique benchmark row: a UUIDv7 id plus an incompressible + // random blob, so the payload survives compression and exercises a realistic size. cloudsync_uuid_v7_string(row_id, true); snprintf(marker, sizeof(marker), "sync-bench-%s", row_id); snprintf(payload, sizeof(payload), "payload-%s", row_id); @@ -553,9 +579,12 @@ int main(void) { random_blob = &empty_blob; } + // Insert the row on the sender. This is the change whose propagation we time. rc = insert_benchmark_row(db_a, row_id, payload, marker, random_blob, random_blob_size); if (rc != SQLITE_OK) goto cleanup; + // Step 4 - precondition: the row must not already exist on the receiver, or the + // measurement would be meaningless. bench_trace("step=verify-before-send db=db_b row_id=%s begin", row_id); rc = verify_row(db_b, row_id, payload, marker, random_blob, random_blob_size, &applied); bench_trace("step=verify-before-send db=db_b row_id=%s end rc=%d applied=%s", row_id, rc, applied ? "true" : "false"); @@ -566,6 +595,8 @@ int main(void) { goto cleanup; } + // Step 5 - measured send: push the row from A and capture the send summary + // (status / localVersion / serverVersion). total_start_ms anchors the latency clock. bench_trace("step=send db=db_a row_id=%s begin sql=cloudsync_network_send_changes", row_id); rc = timed_request(db_a, &requests[request_count++], "send", 1, "SELECT cloudsync_network_send_changes();"); bench_trace("step=send db=db_a row_id=%s end rc=%d elapsed_ms=%.2f", row_id, rc, requests[request_count - 1].elapsed_ms); @@ -575,6 +606,8 @@ int main(void) { send_summary.server_version = json_int_at_path(db_a, requests[0].result_json, "$.send.serverVersion", -1); double total_start_ms = requests[0].started_ms; + // Step 6 - poll loop: receive on B (sleep between attempts) until the row is + // applied and verified, or max_polls is exhausted. for (int i = 0; i < max_polls; i++) { if (i > 0 && poll_delay_ms > 0) { bench_trace("step=poll-sleep attempt=%d delay_ms=%d begin", i + 1, poll_delay_ms); @@ -609,6 +642,8 @@ int main(void) { rc = SQLITE_BUSY; } + // Step 7 - aggregate timings: split the end-to-end latency into time spent in + // network requests, in poll sleeps, and the remaining local overhead. for (int i = 0; i < request_count; i++) request_ms += requests[i].elapsed_ms; measured_overhead_ms = total_ms - request_ms - poll_sleep_ms; if (measured_overhead_ms < 0.0 && measured_overhead_ms > -0.01) measured_overhead_ms = 0.0; diff --git a/test/unit.c b/test/unit.c index c23247c..969dc28 100644 --- a/test/unit.c +++ b/test/unit.c @@ -12556,15 +12556,23 @@ bool do_test_payload_chunks_split_dbversion (bool print_result, bool cleanup_dat return result; } -// Exercises the server-side download spool: cloudsync_payload_spool_fill stages a -// window's whole chunk stream once, and the /check path pages it out one chunk per -// call. Verifies byte-identity with direct cloudsync_payload_chunks generation, -// is_final marking, idempotent re-fill, drop, empty window, and stale-GC. -bool do_test_payload_spool (bool print_result, bool cleanup_databases) { +// Proves the positional-cursor resume of cloudsync_payload_chunks: paging the +// window one chunk per call with an O(1) (db_version, seq, frag_offset) seek +// yields byte-identical chunks to a single full-window scan. The dataset mixes a +// db_version split across chunks (row-boundary resumes, incl. resumes landing +// INSIDE a single committed version that the old since>db_version cursor could not +// express) with a value larger than the chunk budget (mid-fragment resumes). +// Part 2 is end-to-end: the positionally-drained stream is applied to a fresh +// receiver and its table content is compared to the source (drain -> apply -> +// faithful replica), the real path the /check job will use. +bool do_test_payload_chunks_positional_resume (bool print_result, bool cleanup_databases) { sqlite3 *db = NULL; + sqlite3 *db2 = NULL; sqlite3_stmt *stmt = NULL; - test_payload_chunk *chunks = NULL; - int chunk_count = 0, chunk_cap = 0, v3_count = 0; + sqlite3_stmt *apply = NULL; + test_payload_chunk *base = NULL; int base_count = 0, base_cap = 0; + test_payload_chunk *pos = NULL; int pos_count = 0, pos_cap = 0; + int64_t watermark = -1; bool result = false; int rc = SQLITE_OK; @@ -12574,174 +12582,159 @@ bool do_test_payload_spool (bool print_result, bool cleanup_databases) { db = do_create_database_file(0, timestamp, saved_counter); if (!db) goto finalize; rc = sqlite3_exec(db, - "CREATE TABLE payload_chunk_test (" - "id TEXT PRIMARY KEY, note TEXT DEFAULT '', data BLOB DEFAULT x'');" - "SELECT cloudsync_init('payload_chunk_test');", + "CREATE TABLE split_test (id TEXT PRIMARY KEY, body TEXT DEFAULT '');" + "SELECT cloudsync_init('split_test');" + "SELECT cloudsync_set('payload_max_chunk_size', '262144');", NULL, NULL, NULL); if (rc != SQLITE_OK) goto finalize; - // One oversized value (-> v3 fragment chunks) plus many small rows (-> several - // v2 chunks) under a tiny budget, so the window is a multi-chunk stream. + // tx1: ~500 medium rows in one transaction -> one db_version split across + // several v2 chunks (row-boundary resumes within a single version). rc = sqlite3_exec(db, - "SELECT cloudsync_set('payload_max_chunk_size', '262144');" - "INSERT INTO payload_chunk_test(id, note, data) " - "VALUES ('big', lower(hex(randomblob(360000))), randomblob(720000));" - "WITH RECURSIVE c(i) AS (SELECT 1 UNION ALL SELECT i+1 FROM c WHERE i < 260) " - "INSERT INTO payload_chunk_test(id, note, data) " - "SELECT printf('row-%03d', i), printf('small-%03d-%s', i, hex(randomblob(850))), randomblob(512) FROM c;", + "WITH RECURSIVE c(i) AS (SELECT 1 UNION ALL SELECT i+1 FROM c WHERE i < 500) " + "INSERT INTO split_test(id, body) SELECT printf('row-%04d', i), hex(randomblob(700)) FROM c;", NULL, NULL, NULL); if (rc != SQLITE_OK) goto finalize; - // Reference stream: collect the chunks the vtab generates directly for the - // whole window (since_db_version=0), in order. + // tx2: one value far larger than the chunk budget -> v3 fragments across + // several chunks (mid-fragment resumes inside a single value). + rc = sqlite3_exec(db, + "INSERT INTO split_test(id, body) VALUES ('big', hex(randomblob(900000)));", + NULL, NULL, NULL); + if (rc != SQLITE_OK) goto finalize; + + // Baseline: every chunk of the whole window, in order, via the legacy scan. rc = sqlite3_prepare_v2(db, - "SELECT payload FROM cloudsync_payload_chunks WHERE since_db_version=0 ORDER BY chunk_index;", - -1, &stmt, NULL); + "SELECT payload, watermark_db_version FROM cloudsync_payload_chunks " + "WHERE since_db_version=0 ORDER BY chunk_index;", -1, &stmt, NULL); if (rc != SQLITE_OK) goto finalize; while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) { int len = sqlite3_column_bytes(stmt, 0); const void *payload = sqlite3_column_blob(stmt, 0); if (!payload || len <= 0) goto finalize; - if (len > 4 && ((const unsigned char *)payload)[4] == 3) ++v3_count; - if (chunk_count == chunk_cap) { - int new_cap = chunk_cap ? chunk_cap * 2 : 16; - test_payload_chunk *nc = realloc(chunks, sizeof(*chunks) * new_cap); - if (!nc) goto finalize; - memset(nc + chunk_cap, 0, sizeof(*chunks) * (new_cap - chunk_cap)); - chunks = nc; chunk_cap = new_cap; - } - chunks[chunk_count].data = malloc(len); - if (!chunks[chunk_count].data) goto finalize; - memcpy(chunks[chunk_count].data, payload, len); - chunks[chunk_count].len = len; - ++chunk_count; + watermark = sqlite3_column_int64(stmt, 1); + if (base_count == base_cap) { + int nc = base_cap ? base_cap * 2 : 8; + test_payload_chunk *t = realloc(base, sizeof(*t) * nc); + if (!t) goto finalize; + memset(t + base_cap, 0, sizeof(*t) * (nc - base_cap)); + base = t; base_cap = nc; + } + base[base_count].data = malloc(len); + if (!base[base_count].data) goto finalize; + memcpy(base[base_count].data, payload, len); + base[base_count].len = len; + ++base_count; } if (rc != SQLITE_DONE) goto finalize; sqlite3_finalize(stmt); stmt = NULL; - // Scenario needs a genuine multi-chunk stream including >= 2 v3 fragments. - if (chunk_count < 5 || v3_count < 2) goto finalize; - // --- fill stages the whole stream; the return value is the chunk count --- - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_fill('stream-A', 0, NULL, 0);", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != chunk_count) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; + // Scenario must actually exercise multiple chunks (and thus resumes). + if (base_count < 4 || watermark <= 0) goto finalize; - // --- page through the spool: byte-identity, watermark stable, is_final last --- + // Positional drain: one chunk per call, seeking to the cursor the previous + // chunk reported. until is the frozen watermark from the baseline. rc = sqlite3_prepare_v2(db, - "SELECT payload, watermark, is_final FROM cloudsync_payload_spool " - "WHERE stream_id='stream-A' ORDER BY chunk_index;", - -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - int idx = 0; - int64_t watermark = -1; - while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) { - if (idx >= chunk_count) goto finalize; + "SELECT payload, next_db_version, next_seq, next_frag_offset, is_final " + "FROM cloudsync_payload_chunks " + "WHERE until_db_version=?1 AND resume_db_version=?2 AND resume_seq=?3 AND resume_frag_offset=?4 " + "LIMIT 1;", -1, &stmt, NULL); + if (rc != SQLITE_OK) goto finalize; + + int64_t rdbv = 0, rseq = 0, rfrag = 0; + bool done = false; + bool saw_frag_resume = false; // a follow-up call actually resumed mid-value + // Hard cap guards against a resume bug looping forever. + for (int guard = 0; !done && guard <= base_count + 2; ++guard) { + if (rfrag > 0) saw_frag_resume = true; + sqlite3_reset(stmt); + sqlite3_bind_int64(stmt, 1, watermark); + sqlite3_bind_int64(stmt, 2, rdbv); + sqlite3_bind_int64(stmt, 3, rseq); + sqlite3_bind_int64(stmt, 4, rfrag); + rc = sqlite3_step(stmt); + if (rc != SQLITE_ROW) goto finalize; // every step before is_final must yield a chunk int len = sqlite3_column_bytes(stmt, 0); const void *payload = sqlite3_column_blob(stmt, 0); - int64_t wm = sqlite3_column_int64(stmt, 1); - int is_final = sqlite3_column_int(stmt, 2); - if (len != chunks[idx].len || memcmp(payload, chunks[idx].data, len) != 0) goto finalize; - if (watermark < 0) watermark = wm; else if (wm != watermark) goto finalize; - if (is_final != (idx == chunk_count - 1)) goto finalize; - ++idx; - } - if (rc != SQLITE_DONE || idx != chunk_count || watermark <= 0) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - - // --- idempotent re-fill: same count, no duplicate rows --- - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_fill('stream-A', 0, NULL, 0);", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != chunk_count) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - rc = sqlite3_prepare_v2(db, "SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A';", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != chunk_count) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - - // --- empty window: since past the watermark yields zero chunks --- - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_fill('stream-empty', 999999, NULL, 0);", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != 0) goto finalize; + if (!payload || len <= 0) goto finalize; + rdbv = sqlite3_column_int64(stmt, 1); + rseq = sqlite3_column_int64(stmt, 2); + rfrag = sqlite3_column_int64(stmt, 3); + done = sqlite3_column_int(stmt, 4) != 0; + if (pos_count == pos_cap) { + int nc = pos_cap ? pos_cap * 2 : 8; + test_payload_chunk *t = realloc(pos, sizeof(*t) * nc); + if (!t) goto finalize; + memset(t + pos_cap, 0, sizeof(*t) * (nc - pos_cap)); + pos = t; pos_cap = nc; + } + pos[pos_count].data = malloc(len); + if (!pos[pos_count].data) goto finalize; + memcpy(pos[pos_count].data, payload, len); + pos[pos_count].len = len; + ++pos_count; + } sqlite3_finalize(stmt); stmt = NULL; - // --- stale-GC: an abandoned >24h stream is reaped on the next fill --- - rc = sqlite3_exec(db, - "INSERT INTO cloudsync_payload_spool " - "(stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final, created_at) " - "VALUES ('stale', 0, x'00', 1, 1, 1, 1, 1, unixepoch()-90000);", + // The positional drain must terminate exactly on is_final, reproduce the + // baseline chunk sequence byte-for-byte, and have actually exercised a + // mid-value (fragment) resume — not only row-boundary resumes. + if (!done || pos_count != base_count || !saw_frag_resume) goto finalize; + for (int i = 0; i < base_count; ++i) { + if (pos[i].len != base[i].len) goto finalize; + if (memcmp(pos[i].data, base[i].data, base[i].len) != 0) goto finalize; + } + + // End-to-end: apply the positionally-drained stream to a fresh receiver and + // assert its table content matches the source. This exercises the real /check + // path (positional drain -> apply -> faithful replica), not just byte-identity. + db2 = do_create_database_file(1, timestamp, saved_counter); + if (!db2) goto finalize; + rc = sqlite3_exec(db2, + "CREATE TABLE split_test (id TEXT PRIMARY KEY, body TEXT DEFAULT '');" + "SELECT cloudsync_init('split_test');", NULL, NULL, NULL); if (rc != SQLITE_OK) goto finalize; - rc = sqlite3_exec(db, "SELECT cloudsync_payload_spool_fill('stream-B', 0, NULL, 0);", NULL, NULL, NULL); - if (rc != SQLITE_OK) goto finalize; - rc = sqlite3_prepare_v2(db, - "SELECT (SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stale'), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-B');", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW) goto finalize; - if (sqlite3_column_int(stmt, 0) != 0 || sqlite3_column_int(stmt, 1) != chunk_count) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - // --- chunk drop removes only one matching chunk and reports the count --- - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_drop_chunk('stream-A', 1);", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != 1) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_drop_chunk('stream-A', 1);", -1, &stmt, NULL); + rc = sqlite3_prepare_v2(db2, "SELECT cloudsync_payload_apply(?);", -1, &apply, NULL); if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != 0) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_drop_chunk('stream-A', 999999);", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != 0) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - rc = sqlite3_prepare_v2(db, - "SELECT " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A'), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A' AND chunk_index=0), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A' AND chunk_index=1), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A' AND chunk_index=2), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-B'), " - "(SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-B' AND chunk_index=1);", - -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW) goto finalize; - if (sqlite3_column_int(stmt, 0) != chunk_count - 1 || - sqlite3_column_int(stmt, 1) != 1 || - sqlite3_column_int(stmt, 2) != 0 || - sqlite3_column_int(stmt, 3) != 1 || - sqlite3_column_int(stmt, 4) != chunk_count || - sqlite3_column_int(stmt, 5) != 1) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; + // Apply in reverse drain order: apply must reassemble v3 fragments and merge + // rows independent of transport order. + for (int i = pos_count - 1; i >= 0; --i) { + rc = sqlite3_bind_blob(apply, 1, pos[i].data, pos[i].len, SQLITE_STATIC); + if (rc != SQLITE_OK) goto finalize; + rc = sqlite3_step(apply); + if (rc != SQLITE_ROW) goto finalize; + sqlite3_reset(apply); + sqlite3_clear_bindings(apply); + } + sqlite3_finalize(apply); apply = NULL; - // --- drop removes the stream and reports the number of chunks removed --- - rc = sqlite3_prepare_v2(db, "SELECT cloudsync_payload_spool_drop('stream-A');", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != chunk_count - 1) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; - rc = sqlite3_prepare_v2(db, "SELECT COUNT(*) FROM cloudsync_payload_spool WHERE stream_id='stream-A';", -1, &stmt, NULL); - if (rc != SQLITE_OK) goto finalize; - if (sqlite3_step(stmt) != SQLITE_ROW || sqlite3_column_int(stmt, 0) != 0) goto finalize; - sqlite3_finalize(stmt); stmt = NULL; + if (!test_split_tables_equal(db, db2)) goto finalize; result = true; finalize: if (!result && print_result) { - printf("do_test_payload_spool error: %s (chunks=%d, v3=%d)\n", - db ? sqlite3_errmsg(db) : "no db", chunk_count, v3_count); + printf("do_test_payload_chunks_positional_resume error: %s (base=%d, pos=%d, watermark=%lld)\n", + db ? sqlite3_errmsg(db) : "no db", base_count, pos_count, (long long)watermark); } if (stmt) sqlite3_finalize(stmt); - test_payload_chunks_free(chunks, chunk_count); + if (apply) sqlite3_finalize(apply); + test_payload_chunks_free(base, base_count); + test_payload_chunks_free(pos, pos_count); if (db) close_db(db); + if (db2) close_db(db2); if (cleanup_databases) { - char path[256], walpath[300], shmpath[300]; - do_build_database_path(path, 0, timestamp, saved_counter); - snprintf(walpath, sizeof(walpath), "%s-wal", path); - snprintf(shmpath, sizeof(shmpath), "%s-shm", path); - file_delete_internal(path); - file_delete_internal(walpath); - file_delete_internal(shmpath); + for (int i = 0; i < 2; ++i) { + char path[256], walpath[300], shmpath[300]; + do_build_database_path(path, i, timestamp, saved_counter); + snprintf(walpath, sizeof(walpath), "%s-wal", path); + snprintf(shmpath, sizeof(shmpath), "%s-shm", path); + file_delete_internal(path); + file_delete_internal(walpath); + file_delete_internal(shmpath); + } } return result; } @@ -13196,7 +13189,7 @@ int main (int argc, const char * argv[]) { result += test_report("Payload Chunks Large Values:", do_test_payload_chunks_large_values(print_result, cleanup_databases)); result += test_report("Payload Chunks Site Exclusion:", do_test_payload_chunks_site_exclusion(print_result, cleanup_databases)); result += test_report("Payload Chunks Split db_version:", do_test_payload_chunks_split_dbversion(print_result, cleanup_databases)); - result += test_report("Payload Download Spool:", do_test_payload_spool(print_result, cleanup_databases)); + result += test_report("Payload Chunks Positional Resume:", do_test_payload_chunks_positional_resume(print_result, cleanup_databases)); // close local database close_db(db);