Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,29 @@ FI_SRC = $(FI_DIR)/fractional_indexing.c
# Combined for SQLite extension build
SRC_FILES = $(CORE_SRC) $(SQLITE_SRC) $(FI_SRC)

TEST_SRC = $(wildcard $(TEST_DIR)/*.c)
# network_unit.c is built separately with networking ENABLED (see network-unittest),
# so keep it out of the default OMIT_NETWORK test build.
TEST_SRC = $(filter-out $(TEST_DIR)/network_unit.c,$(wildcard $(TEST_DIR)/*.c))
TEST_FILES = $(SRC_FILES) $(TEST_SRC) $(wildcard $(SQLITE_DIR)/*.c)
RELEASE_OBJ = $(patsubst %.c, $(BUILD_RELEASE)/%.o, $(notdir $(SRC_FILES)))
TEST_OBJ = $(patsubst %.c, $(BUILD_TEST)/%.o, $(notdir $(TEST_FILES)))
COV_FILES = $(filter-out $(SRC_DIR)/lz4.c $(NETWORK_DIR)/network.c $(SQLITE_IMPL_DIR)/sql_sqlite.c $(POSTGRES_IMPL_DIR)/database_postgresql.c $(FI_SRC), $(SRC_FILES))
CURL_LIB = $(CURL_DIR)/$(PLATFORM)/libcurl.a
TEST_TARGET = $(patsubst %.c,$(DIST_DIR)/%$(EXE), $(notdir $(TEST_SRC)))

# Network-enabled unit tests: rebuild the codebase with networking ON (T_CFLAGS
# minus OMIT_NETWORK) and link curl, so network.c's internal functions can be
# tested directly on in-memory buffers. NT_LDFLAGS reuses the platform LDFLAGS
# (which carries -lcurl) minus the shared-library-only flags (-shared on Linux,
# -dynamiclib on macOS) so it links as an executable, plus the test link libs.
# -undefined dynamic_lookup is kept: the test never opens a connection, so curl's
# transport symbols are linked but never invoked.
#
# Directory that receives every object of the network-enabled build (see NT_OBJ).
BUILD_NETTEST = build/nettest
# Test compile flags with -DCLOUDSYNC_OMIT_NETWORK removed.
NT_CFLAGS = $(filter-out -DCLOUDSYNC_OMIT_NETWORK,$(T_CFLAGS))
# LDFLAGS without -shared, -dynamiclib and -headerpad_max_install_names,
# followed by the test link flags (T_LDFLAGS).
NT_LDFLAGS = $(filter-out -shared -dynamiclib -headerpad_max_install_names,$(LDFLAGS)) $(T_LDFLAGS)
# Sources of the network test binary: the extension sources, the SQLite
# amalgamation and the network unit test itself.
NT_SRC = $(SRC_FILES) $(SQLITE_DIR)/sqlite3.c $(TEST_DIR)/network_unit.c
# One object per NT_SRC entry. notdir drops the source directory, so all objects
# land flat in BUILD_NETTEST and two sources sharing a base name would collide.
# NOTE(review): the %.c pattern rule then has to find sources by bare file name,
# presumably through a vpath/VPATH set elsewhere in this Makefile - confirm.
NT_OBJ = $(patsubst %.c,$(BUILD_NETTEST)/%.o,$(notdir $(NT_SRC)))

# Build curl hermetically: neutralize the developer's ambient build env so
# curl's ./configure compile tests aren't broken by overrides leaking in
# (e.g. exported LDFLAGS/CPPFLAGS/LIBS pointing at Homebrew). Build flags for
Expand Down Expand Up @@ -261,8 +276,20 @@ $(BUILD_TEST)/sqlite3.o: $(SQLITE_DIR)/sqlite3.c
$(BUILD_TEST)/%.o: %.c
$(CC) $(T_CFLAGS) -c $< -o $@

# Objects for the network-enabled test build (networking ON, used by network-unittest).
# The output directory is an order-only prerequisite of every object: it is created
# on demand, but its changing mtime never forces the objects to be recompiled.
$(BUILD_NETTEST):
	mkdir -p $@

# The SQLite amalgamation is compiled with the plain CFLAGS plus the two SQLite
# defines below, not with NT_CFLAGS.
$(BUILD_NETTEST)/sqlite3.o: $(SQLITE_DIR)/sqlite3.c | $(BUILD_NETTEST)
	$(CC) $(CFLAGS) -DSQLITE_DQS=0 -DSQLITE_CORE -c $< -o $@

# Every other source is compiled with NT_CFLAGS, i.e. the test flags without
# -DCLOUDSYNC_OMIT_NETWORK.
$(BUILD_NETTEST)/%.o: %.c | $(BUILD_NETTEST)
	$(CC) $(NT_CFLAGS) -c $< -o $@

# Run code coverage (--css-file $(CUSTOM_CSS))
test: $(TARGET) $(TEST_TARGET) unittest e2e
# dist/network_unit is listed before `unittest` so it is built during the file-build
# phase: on Android the host `make test` aborts when `unittest` runs the cross-built
# dist/unit, and network_unit (not in TEST_TARGET) would otherwise never be built
# before `make test -n` captures the on-emulator command script.
test: $(TARGET) $(TEST_TARGET) $(DIST_DIR)/network_unit$(EXE) unittest network-unittest e2e
set -e; $(SQLITE3) ":memory:" -cmd ".bail on" ".load ./$<" "SELECT cloudsync_version();"
ifneq ($(COVERAGE),false)
mkdir -p $(COV_DIR)
Expand All @@ -274,6 +301,17 @@ endif
unittest: $(TARGET) $(DIST_DIR)/unit$(EXE)
@./$(DIST_DIR)/unit$(EXE)

# Network-enabled unit test binary. Link it via a file rule (like dist/unit), not in
# the run recipe below: on Android `make test` runs binaries on the emulator from a
# script generated by `make test -n`, so a link command inside the recipe would be
# emitted and the cross-compiler invoked on-device (it isn't there → exit 127).
# $(CURL_LIB) is a prerequisite only so that curl is built first; it reaches the
# link line through NT_LDFLAGS, not through the object list.
$(DIST_DIR)/network_unit$(EXE): $(CURL_LIB) $(NT_OBJ)
	$(CC) $(NT_OBJ) -o $@ $(NT_LDFLAGS)

# Run the network-layer unit tests (networking compiled in, no server).
# network-unittest names a command, not a file: declare it phony so that a stray
# file or directory called `network-unittest` can never make the target look up to
# date and silently skip the test run. Repeating .PHONY is harmless; GNU Make
# accumulates the prerequisites of all .PHONY lines.
.PHONY: network-unittest
network-unittest: $(DIST_DIR)/network_unit$(EXE)
	@./$(DIST_DIR)/network_unit$(EXE)

# Run end-to-end integration tests
e2e: $(TARGET) $(DIST_DIR)/integration$(EXE)
@if [ -f .env ]; then \
Expand Down
46 changes: 33 additions & 13 deletions src/network/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -1601,18 +1601,22 @@ static int network_send_payload_to_apply(sqlite3_context *context, network_data
return SQLITE_OK;
}

static void network_sync_state_update_from_response(NETWORK_RESULT *res,
int64_t *last_optimistic_version,
int64_t *last_confirmed_version,
int *gaps_size,
char **apply_failure_json,
char **check_failure_json) {
void network_sync_state_update_from_response(NETWORK_RESULT *res,
int64_t *last_optimistic_version,
int64_t *last_confirmed_version,
int *gaps_size,
char **apply_failure_json,
char **check_failure_json) {
if (!res || res->code != CLOUDSYNC_NETWORK_BUFFER || !res->buffer) return;

int64_t parsed_version = json_extract_int(res->buffer, res->blen, "lastOptimisticVersion", -1);
if (parsed_version > *last_optimistic_version) *last_optimistic_version = parsed_version;
parsed_version = json_extract_int(res->buffer, res->blen, "lastConfirmedVersion", -1);
if (parsed_version > *last_confirmed_version) *last_confirmed_version = parsed_version;
// Take the latest valid (>= 0) value, not the max: the server can move these
// BACKWARD on a rollback when a later send chunk fails, and lastOptimisticVersion
// becomes the durable send checkpoint — masking a decrease would advance the
// checkpoint past the rolled-back changes and silently drop them.
int64_t parsed_optimistic = json_extract_int(res->buffer, res->blen, "lastOptimisticVersion", -1);
if (parsed_optimistic >= 0) *last_optimistic_version = parsed_optimistic;
int64_t parsed_confirmed = json_extract_int(res->buffer, res->blen, "lastConfirmedVersion", -1);
if (parsed_confirmed >= 0) *last_confirmed_version = parsed_confirmed;
int parsed_gaps_size = json_extract_array_size(res->buffer, res->blen, "gaps");
if (parsed_gaps_size >= 0) *gaps_size = parsed_gaps_size;

Expand All @@ -1627,10 +1631,16 @@ static void network_sync_state_update_from_response(NETWORK_RESULT *res,
if (*check_failure_json) cloudsync_memory_free(*check_failure_json);
*check_failure_json = check_failure;
}

#ifdef CLOUDSYNC_NETWORK_TRACE
// Full endpoint response body that the sync-state fields above were parsed from.
// The buffer is not guaranteed NUL-terminated, so bound the print with its length.
fprintf(stderr, "[cloudsync-network] sync_state response=%.*s\n", (int)res->blen, res->buffer);
#endif
}

static const char *network_compute_status(int64_t last_optimistic, int64_t last_confirmed,
int gaps_size, int64_t local_version) {
const char *network_compute_status(int64_t last_optimistic, int64_t last_confirmed,
int gaps_size, int64_t local_version) {
if (last_optimistic < 0 || last_confirmed < 0) return "error";
if (gaps_size > 0 || last_optimistic < local_version) return "out-of-sync";
if (last_optimistic == last_confirmed) return "synced";
Expand Down Expand Up @@ -1715,6 +1725,11 @@ int cloudsync_network_send_changes_internal (sqlite3_context *context, int argc,
bool sent_any = false;
int sent_chunks = 0; // payload chunks sent this call
int64_t sent_bytes = 0; // serialized payload bytes sent this call
// Lower bound of the coverage window announced to /apply. The server tracks
// per-site applied ranges from version 1, so the announced ranges must tile
// [send_checkpoint+1 .. watermark] contiguously; otherwise db_versions consumed
// without a local-site change (e.g. merged remote changes) read back as gaps.
int64_t announce_lo = db_version + 1;

while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
const void *blob = sqlite3_column_blob(stmt, 0);
Expand All @@ -1731,8 +1746,12 @@ int cloudsync_network_send_changes_internal (sqlite3_context *context, int argc,
goto cleanup;
}

// Announce the covered window, not just where the data sits, so db_versions
// skipped by non-local-change transactions don't read back as server-side gaps.
int64_t announce_min = network_announce_min(announce_lo, db_version_min);

NETWORK_RESULT res = {0};
rc = network_send_payload_to_apply(context, netdata, blob, blob_size, db_version_min, db_version_max, &res);
rc = network_send_payload_to_apply(context, netdata, blob, blob_size, announce_min, db_version_max, &res);
if (rc != SQLITE_OK) goto cleanup;

if (res.code == CLOUDSYNC_NETWORK_BUFFER && res.buffer) {
Expand All @@ -1750,6 +1769,7 @@ int cloudsync_network_send_changes_internal (sqlite3_context *context, int argc,
sent_chunks++;
sent_bytes += payload_size;
if (watermark > new_db_version) new_db_version = watermark;
announce_lo = db_version_max + 1; // next chunk continues the window here
}
if (rc != SQLITE_DONE) {
sqlite3_result_error(context, sqlite3_errmsg(db), -1);
Expand Down
15 changes: 15 additions & 0 deletions src/network/network_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
#ifndef __CLOUDSYNC_NETWORK_PRIVATE__
#define __CLOUDSYNC_NETWORK_PRIVATE__

#include <stdint.h>
#include <stddef.h>

#define CLOUDSYNC_DEFAULT_ADDRESS "https://cloudsync.sqlite.ai"
#define CLOUDSYNC_ENDPOINT_PREFIX "v2/cloudsync/databases"
#define CLOUDSYNC_ENDPOINT_UPLOAD "upload"
Expand Down Expand Up @@ -46,6 +49,18 @@ bool network_data_set_endpoints (network_data *data, char *auth, char *check, ch
bool network_send_buffer(network_data *data, const char *endpoint, const char *authentication, const void *blob, int blob_size);
NETWORK_RESULT network_receive_buffer (network_data *data, const char *endpoint, const char *authentication, bool zero_terminated, bool is_post_request, char *json_payload, const char **extra_headers, int nextra_headers);

// Exposed (non-static) for the network unit test; otherwise internal to network.c.
void network_sync_state_update_from_response(NETWORK_RESULT *res, int64_t *last_optimistic_version, int64_t *last_confirmed_version, int *gaps_size, char **apply_failure_json, char **check_failure_json);
const char *network_compute_status(int64_t last_optimistic, int64_t last_confirmed, int gaps_size, int64_t local_version);

// Start of the db_version range that a send chunk announces to /apply.
// Returns the smaller of the running window start (window_lo) and the chunk's own
// minimum db_version (chunk_db_version_min). Clamping to the chunk minimum keeps
// min <= max when consecutive chunks share a db_version (a value fragmented across
// chunks). The caller is expected to set window_lo to chunk_db_version_max + 1 after
// each chunk so that the announced ranges tile contiguously.
static inline int64_t network_announce_min(int64_t window_lo, int64_t chunk_db_version_min) {
    // The chunk starts at or before the window start: never announce above it.
    if (chunk_db_version_min <= window_lo) return chunk_db_version_min;
    // Otherwise extend the announced range down to the window start.
    return window_lo;
}

#ifdef CLOUDSYNC_NETWORK_TRACE
const char *network_trace_endpoint_name(network_data *data, const char *endpoint);
const char *network_trace_result_name(int code);
Expand Down
124 changes: 9 additions & 115 deletions src/postgresql/cloudsync.sql.in
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ CREATE OR REPLACE FUNCTION cloudsync_payload_chunks(
since_db_version bigint DEFAULT NULL,
filter_site_id bytea DEFAULT NULL,
until_db_version bigint DEFAULT NULL,
exclude_filter_site_id boolean DEFAULT false
exclude_filter_site_id boolean DEFAULT false,
resume_db_version bigint DEFAULT NULL,
resume_seq bigint DEFAULT NULL,
resume_frag_offset bigint DEFAULT NULL
)
RETURNS TABLE (
payload bytea,
Expand All @@ -162,7 +165,11 @@ RETURNS TABLE (
rows bigint,
db_version_min bigint,
db_version_max bigint,
watermark_db_version bigint
watermark_db_version bigint,
next_db_version bigint,
next_seq bigint,
next_frag_offset bigint,
is_final boolean
)
AS 'MODULE_PATHNAME', 'cloudsync_payload_chunks'
LANGUAGE C VOLATILE;
Expand Down Expand Up @@ -191,119 +198,6 @@ RETURNS bytea
AS 'MODULE_PATHNAME', 'cloudsync_uuid_blob'
LANGUAGE C IMMUTABLE;

-- Download spool (server-side /check chunk staging)
--
-- The /check endpoint runs on a separate host and reaches the tenant DB over a
-- network driver, which materializes the whole result set. Streaming chunks
-- directly with SELECT * FROM cloudsync_payload_chunks(...) would therefore pull
-- every chunk into the server's memory at once. Instead the server fills a
-- window's chunk stream once into this table and pages it out one chunk per call.
--
-- UNLOGGED: this is regenerable scratch state, so skip WAL. Created at install
-- time (not lazily inside the function) to avoid the plpgsql create-then-use
-- cached-plan pitfall.
-- One row per spooled chunk; a stream is the set of rows sharing a stream_id.
CREATE TABLE IF NOT EXISTS cloudsync_payload_spool (
-- Caller-chosen identifier of the spooled stream (p_stream_id of the spool functions).
stream_id text NOT NULL,
-- Position of the chunk inside its stream, as reported by cloudsync_payload_chunks.
chunk_index bigint NOT NULL,
-- Chunk bytes and their size, copied from cloudsync_payload_chunks.
payload bytea NOT NULL,
payload_size bigint NOT NULL,
-- db_version range reported for the chunk by cloudsync_payload_chunks.
db_version_min bigint NOT NULL,
db_version_max bigint NOT NULL,
-- watermark_db_version reported for the chunk by cloudsync_payload_chunks.
watermark bigint NOT NULL,
-- Set to true by cloudsync_payload_spool_fill on the row with the highest
-- chunk_index of the stream; false on every other row.
is_final boolean NOT NULL DEFAULT false,
-- Insertion time in epoch seconds; cloudsync_payload_spool_fill deletes streams
-- whose newest row is older than 86400 seconds.
created_at bigint NOT NULL DEFAULT extract(epoch FROM now())::bigint,
PRIMARY KEY (stream_id, chunk_index)
);

-- cloudsync_payload_spool_fill(stream_id, since, filter_site_id, exclude)
-- Generate the whole chunk stream for a window once into cloudsync_payload_spool.
-- Returns the number of chunks spooled. Idempotent: a prior complete fill is kept.
-- Parameters are p_-prefixed so they never collide with the table's columns
-- (an unqualified `stream_id` would otherwise be ambiguous with the parameter).
--
-- Parameters:
--   p_stream_id              stream to fill; rows are stored under this id
--   p_since_db_version       passed to cloudsync_payload_chunks as since_db_version
--   p_filter_site_id         passed through as filter_site_id (default NULL)
--   p_exclude_filter_site_id passed through as exclude_filter_site_id (default false)
-- Returns: the number of rows already present for p_stream_id when there are any,
-- otherwise the number of chunks inserted by this call (0 when the chunk function
-- yields no rows).
-- Side effect: every call first deletes all streams whose newest row is older
-- than 86400 seconds, regardless of p_stream_id.
CREATE OR REPLACE FUNCTION cloudsync_payload_spool_fill(
p_stream_id text,
p_since_db_version bigint,
p_filter_site_id bytea DEFAULT NULL,
p_exclude_filter_site_id boolean DEFAULT false
) RETURNS bigint AS $$
DECLARE
existing bigint;
cnt bigint := 0;
rec record;
BEGIN
-- Stale-GC of abandoned streams. fill runs once per stream (coarse-grained),
-- so unlike per-fragment cleanup there is no O(n^2) risk and no throttle.
-- A stream is judged by its newest row (max(created_at)), so it is removed as a whole.
DELETE FROM cloudsync_payload_spool
WHERE stream_id IN (
SELECT s.stream_id FROM cloudsync_payload_spool s
GROUP BY s.stream_id
HAVING max(s.created_at) < extract(epoch FROM now())::bigint - 86400);

-- Idempotent: a prior complete fill stays as-is (fill is atomic, so rows
-- present == complete stream).
SELECT count(*) INTO existing FROM cloudsync_payload_spool WHERE stream_id = p_stream_id;
IF existing > 0 THEN
RETURN existing;
END IF;

-- Generate the stream one chunk at a time. A cursor FOR loop fetches in
-- batches rather than materializing the whole SRF into a tuplestore.
-- until_db_version (third argument) is passed as NULL.
FOR rec IN
SELECT c.payload, c.chunk_index, c.payload_size, c.db_version_min, c.db_version_max, c.watermark_db_version
FROM cloudsync_payload_chunks(p_since_db_version, p_filter_site_id, NULL, p_exclude_filter_site_id) c
LOOP
-- Every row is inserted with is_final = false; the last one is flagged below.
INSERT INTO cloudsync_payload_spool
(stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final)
VALUES (p_stream_id, rec.chunk_index, rec.payload, rec.payload_size,
rec.db_version_min, rec.db_version_max, rec.watermark_db_version, false);
cnt := cnt + 1;
END LOOP;

-- Flag the chunk with the highest chunk_index as the final one of the stream.
IF cnt > 0 THEN
UPDATE cloudsync_payload_spool SET is_final = true
WHERE stream_id = p_stream_id
AND chunk_index = (SELECT max(x.chunk_index) FROM cloudsync_payload_spool x
WHERE x.stream_id = p_stream_id);
END IF;

RETURN cnt;
END;
$$ LANGUAGE plpgsql VOLATILE;

-- cloudsync_payload_spool_drop(stream_id)
-- Removes every spooled chunk of one stream and returns how many rows were
-- deleted (0 when the stream has no rows, or when stream_id is NULL).
CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop(p_stream_id text)
RETURNS bigint AS $$
DECLARE
    removed bigint;
BEGIN
    -- Count the rows the DELETE actually removed via its RETURNING set.
    WITH gone AS (
        DELETE FROM cloudsync_payload_spool
        WHERE stream_id = p_stream_id
        RETURNING 1
    )
    SELECT count(*) INTO removed FROM gone;

    RETURN removed;
END;
$$ LANGUAGE plpgsql VOLATILE;

-- cloudsync_payload_spool_drop_chunk(stream_id, chunk_index)
-- Explicit early cleanup of a single chunk (used for S3-backed chunks); the
-- stream-level TTL/GC is not affected. Returns the number of rows removed, which
-- is 0 or 1 because (stream_id, chunk_index) is the table's primary key.
-- Both arguments are required: a NULL raises an exception instead of deleting nothing.
CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop_chunk(p_stream_id text, p_chunk_index bigint)
RETURNS bigint AS $$
DECLARE
    removed bigint;
BEGIN
    -- stream_id is validated first, so its message wins when both arguments are NULL.
    IF p_stream_id IS NULL THEN
        RAISE EXCEPTION 'cloudsync_payload_spool_drop_chunk: stream_id is required.';
    ELSIF p_chunk_index IS NULL THEN
        RAISE EXCEPTION 'cloudsync_payload_spool_drop_chunk: chunk_index is required.';
    END IF;

    -- Count the rows the DELETE actually removed via its RETURNING set.
    WITH gone AS (
        DELETE FROM cloudsync_payload_spool
        WHERE stream_id = p_stream_id
          AND chunk_index = p_chunk_index
        RETURNING 1
    )
    SELECT count(*) INTO removed FROM gone;

    RETURN removed;
END;
$$ LANGUAGE plpgsql VOLATILE;

-- Payload decoding and application
CREATE OR REPLACE FUNCTION cloudsync_payload_decode(payload bytea)
RETURNS integer
Expand Down
Loading
Loading