From f0fb030f770d6d8030ab7eef866c9e7664d7e38a Mon Sep 17 00:00:00 2001 From: Marc-Antoine Parent Date: Sat, 21 Mar 2026 15:31:03 -0400 Subject: [PATCH 1/4] eng-1322: No error if a task ends that was started before currently completed task --- apps/roam/src/utils/syncDgNodesToSupabase.ts | 28 ++++++---- packages/database/src/dbTypes.ts | 1 + ...60321191952_end_sync_task_w_start_time.sql | 53 +++++++++++++++++++ packages/database/supabase/schemas/sync.sql | 7 ++- 4 files changed, 78 insertions(+), 11 deletions(-) create mode 100644 packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index c2e116073..2ce3b5bbb 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -67,11 +67,17 @@ const notifyEndSyncFailure = ({ }); }; -export const endSyncTask = async ( - worker: string, - status: Enums<"task_status">, - showToast: boolean = false, -): Promise => { +export const endSyncTask = async ({ + worker, + status, + showToast = false, + startTime, +}: { + worker: string; + status: Enums<"task_status">; + showToast: boolean; + startTime: Date; +}): Promise => { try { const supabaseClient = await getLoggedInClient(); if (!supabaseClient) return; @@ -85,6 +91,7 @@ export const endSyncTask = async ( s_function: SYNC_FUNCTION, s_worker: worker, s_status: status, + s_started_at: startTime.toISOString(), }); if (error) { console.error("endSyncTask: Error calling end_sync_task:", error); @@ -392,7 +399,7 @@ export const setSyncActivity = (active: boolean) => { export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { if (!doSync) return; console.debug("starting createOrUpdateDiscourseEmbedding"); - const startTime = new Date().valueOf(); + const startTime = new Date(); let success = true; let claimed = false; const worker = window.roamAlphaAPI.user.uid(); @@ -455,14 +462,15 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { context, }); await cleanupOrphanedNodes(supabaseClient, context); - await endSyncTask(worker, "complete", showToast); - const duration = (new Date().valueOf() - startTime) / 1000.0; + await endSyncTask({ worker, status: "complete", showToast, startTime }); + const duration = (new Date().valueOf() - startTime.valueOf()) / 1000.0; posthog.capture("Sync complete", { duration }); } catch (error) { console.error("createOrUpdateDiscourseEmbedding: Process failed:", error); success = false; - if (worker && claimed) await endSyncTask(worker, "failed", showToast); - const duration = (new Date().valueOf() - startTime) / 1000.0; + if (worker && claimed) + await endSyncTask({ worker, status: "failed", showToast, startTime }); + const duration = (new Date().valueOf() - startTime.valueOf()) / 1000.0; posthog.capture("Sync error", { duration }); if (error instanceof FatalError) { doSync = false; diff --git a/packages/database/src/dbTypes.ts b/packages/database/src/dbTypes.ts index 1f3529b30..b968544ba 100644 --- a/packages/database/src/dbTypes.ts +++ b/packages/database/src/dbTypes.ts @@ -1469,6 +1469,7 @@ export type Database = { end_sync_task: { Args: { s_function: string + s_started_at?: string s_status: Database["public"]["Enums"]["task_status"] s_target: number s_worker: string diff --git a/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql b/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql new file mode 100644 index 000000000..7bc47dcd8 --- /dev/null +++ b/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql @@ -0,0 +1,53 @@ +DROP FUNCTION IF EXISTS public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status); + +CREATE OR REPLACE FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamp = NULL +) RETURNS void +SET search_path = '' +LANGUAGE plpgsql +AS $$ +DECLARE t_id INTEGER; +DECLARE t_worker varchar; +DECLARE t_status public.task_status; +DECLARE t_failure_count SMALLINT; +DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE; +DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE; +DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE; +BEGIN + SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start + INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start + FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function; + ASSERT s_status > 'active'; + IF t_worker != s_worker AND COALESCE(s_started_at, last_task_start) < last_task_start THEN + -- we probably took too long. Let the other task have priority. + RETURN; + END IF; + ASSERT t_worker = s_worker, 'Wrong worker'; + ASSERT s_status >= t_status, 'do not go back in status'; + IF s_status = 'complete' THEN + t_last_task_end := now(); + t_last_success_start := t_last_task_start; + t_failure_count := 0; + ELSE + IF t_status != s_status THEN + t_failure_count := t_failure_count + 1; + END IF; + END IF; + + UPDATE public.sync_info + SET status = s_status, + task_times_out_at=null, + last_task_end=t_last_task_end, + last_success_start=t_last_success_start, + failure_count=t_failure_count + WHERE id=t_id; +END; +$$; diff --git a/packages/database/supabase/schemas/sync.sql b/packages/database/supabase/schemas/sync.sql index 6ae25b7f3..28ccd92f9 100644 --- a/packages/database/supabase/schemas/sync.sql +++ b/packages/database/supabase/schemas/sync.sql @@ -50,7 +50,8 @@ CREATE OR REPLACE FUNCTION public.end_sync_task( s_target bigint, s_function character varying, s_worker character varying, - s_status public.task_status + s_status public.task_status, + s_started_at timestamp = NULL ) RETURNS void SET search_path = '' LANGUAGE plpgsql @@ -67,6 +68,10 @@ BEGIN INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function; ASSERT s_status > 'active'; + IF t_worker != s_worker AND COALESCE(s_started_at, last_task_start) < last_task_start THEN + -- we probably took too long. Let the other task have priority. + RETURN; + END IF; ASSERT t_worker = s_worker, 'Wrong worker'; ASSERT s_status >= t_status, 'do not go back in status'; IF s_status = 'complete' THEN From 363a9ac23a1eba6fd58513b823f934dbd1260a4d Mon Sep 17 00:00:00 2001 From: Marc-Antoine Parent Date: Sat, 21 Mar 2026 15:35:44 -0400 Subject: [PATCH 2/4] typo --- .../migrations/20260321191952_end_sync_task_w_start_time.sql | 2 +- packages/database/supabase/schemas/sync.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql b/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql index 7bc47dcd8..fee68e453 100644 --- a/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql +++ b/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql @@ -26,7 +26,7 @@ BEGIN INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function; ASSERT s_status > 'active'; - IF t_worker != s_worker AND COALESCE(s_started_at, last_task_start) < last_task_start THEN + IF t_worker != s_worker AND COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN -- we probably took too long. Let the other task have priority. RETURN; END IF; diff --git a/packages/database/supabase/schemas/sync.sql b/packages/database/supabase/schemas/sync.sql index 28ccd92f9..6d0012a54 100644 --- a/packages/database/supabase/schemas/sync.sql +++ b/packages/database/supabase/schemas/sync.sql @@ -68,7 +68,7 @@ BEGIN INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function; ASSERT s_status > 'active'; - IF t_worker != s_worker AND COALESCE(s_started_at, last_task_start) < last_task_start THEN + IF t_worker != s_worker AND COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN -- we probably took too long. Let the other task have priority. RETURN; END IF; From 88a30b6c2904d137732318dc2727a98745782ea9 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Parent Date: Sat, 21 Mar 2026 16:03:39 -0400 Subject: [PATCH 3/4] also change parameters in grant and set owner --- ...60321191952_end_sync_task_w_start_time.sql | 30 +++++++++++++++++++ packages/database/supabase/schemas/sync.sql | 12 +++++--- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql b/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql index fee68e453..666c3d97e 100644 --- a/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql +++ b/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql @@ -51,3 +51,33 @@ BEGIN WHERE id=t_id; END; $$; + +ALTER FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamp +) OWNER TO "postgres"; + +GRANT ALL ON FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamp +) TO "anon"; +GRANT ALL ON FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamp +) TO "authenticated"; +GRANT ALL ON FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamp +) TO "service_role"; diff --git a/packages/database/supabase/schemas/sync.sql b/packages/database/supabase/schemas/sync.sql index 6d0012a54..3961426f1 100644 --- a/packages/database/supabase/schemas/sync.sql +++ b/packages/database/supabase/schemas/sync.sql @@ -98,7 +98,8 @@ ALTER FUNCTION public.end_sync_task( s_target bigint, s_function character varying, s_worker character varying, - s_status public.task_status + s_status public.task_status, + s_started_at timestamp ) OWNER TO "postgres"; @@ -195,19 +196,22 @@ GRANT ALL ON FUNCTION public.end_sync_task( s_target bigint, s_function character varying, s_worker character varying, - s_status public.task_status + s_status public.task_status, + s_started_at timestamp ) TO "anon"; GRANT ALL ON FUNCTION public.end_sync_task( s_target bigint, s_function character varying, s_worker character varying, - s_status public.task_status + s_status public.task_status, + s_started_at timestamp ) TO "authenticated"; GRANT ALL ON FUNCTION public.end_sync_task( s_target bigint, s_function character varying, s_worker character varying, - s_status public.task_status + s_status public.task_status, + s_started_at timestamp ) TO "service_role"; GRANT ALL ON FUNCTION public.propose_sync_task( From 907bc49907e7629b515fae9088a08bfaccd3254f Mon Sep 17 00:00:00 2001 From: Marc-Antoine Parent Date: Sat, 21 Mar 2026 16:06:02 -0400 Subject: [PATCH 4/4] Use timestamptz --- .../20260321191952_end_sync_task_w_start_time.sql | 10 +++++----- packages/database/supabase/schemas/sync.sql | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql b/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql index 666c3d97e..12d353969 100644 --- a/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql +++ b/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql @@ -9,7 +9,7 @@ CREATE OR REPLACE FUNCTION public.end_sync_task( s_function character varying, s_worker character varying, s_status public.task_status, - s_started_at timestamp = NULL + s_started_at timestamptz = NULL ) RETURNS void SET search_path = '' LANGUAGE plpgsql @@ -57,7 +57,7 @@ ALTER FUNCTION public.end_sync_task( s_function character varying, s_worker character varying, s_status public.task_status, - s_started_at timestamp + s_started_at timestamptz ) OWNER TO "postgres"; GRANT ALL ON FUNCTION public.end_sync_task( @@ -65,19 +65,19 @@ GRANT ALL ON FUNCTION public.end_sync_task( s_function character varying, s_worker character varying, s_status public.task_status, - s_started_at timestamp + s_started_at timestamptz ) TO "anon"; GRANT ALL ON FUNCTION public.end_sync_task( s_target bigint, s_function character varying, s_worker character varying, s_status public.task_status, - s_started_at timestamp + s_started_at timestamptz ) TO "authenticated"; GRANT ALL ON FUNCTION public.end_sync_task( s_target bigint, s_function character varying, s_worker character varying, s_status public.task_status, - s_started_at timestamp + s_started_at timestamptz ) TO "service_role"; diff --git a/packages/database/supabase/schemas/sync.sql b/packages/database/supabase/schemas/sync.sql index 3961426f1..10e73a91d 100644 --- a/packages/database/supabase/schemas/sync.sql +++ b/packages/database/supabase/schemas/sync.sql @@ -51,7 +51,7 @@ CREATE OR REPLACE FUNCTION public.end_sync_task( s_function character varying, s_worker character varying, s_status public.task_status, - s_started_at timestamp = NULL + s_started_at timestamptz = NULL ) RETURNS void SET search_path = '' LANGUAGE plpgsql @@ -99,7 +99,7 @@ ALTER FUNCTION public.end_sync_task( s_function character varying, s_worker character varying, s_status public.task_status, - s_started_at timestamp + s_started_at timestamptz ) OWNER TO "postgres"; @@ -197,21 +197,21 @@ GRANT ALL ON FUNCTION public.end_sync_task( s_function character varying, s_worker character varying, s_status public.task_status, - s_started_at timestamp + s_started_at timestamptz ) TO "anon"; GRANT ALL ON FUNCTION public.end_sync_task( s_target bigint, s_function character varying, s_worker character varying, s_status public.task_status, - s_started_at timestamp + s_started_at timestamptz ) TO "authenticated"; GRANT ALL ON FUNCTION public.end_sync_task( s_target bigint, s_function character varying, s_worker character varying, s_status public.task_status, - s_started_at timestamp + s_started_at timestamptz ) TO "service_role"; GRANT ALL ON FUNCTION public.propose_sync_task(