diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index c2e116073..2ce3b5bbb 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -67,11 +67,17 @@ const notifyEndSyncFailure = ({ }); }; -export const endSyncTask = async ( - worker: string, - status: Enums<"task_status">, - showToast: boolean = false, -): Promise => { +export const endSyncTask = async ({ + worker, + status, + showToast = false, + startTime, +}: { + worker: string; + status: Enums<"task_status">; + showToast: boolean; + startTime: Date; +}): Promise => { try { const supabaseClient = await getLoggedInClient(); if (!supabaseClient) return; @@ -85,6 +91,7 @@ export const endSyncTask = async ( s_function: SYNC_FUNCTION, s_worker: worker, s_status: status, + s_started_at: startTime.toISOString(), }); if (error) { console.error("endSyncTask: Error calling end_sync_task:", error); @@ -392,7 +399,7 @@ export const setSyncActivity = (active: boolean) => { export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { if (!doSync) return; console.debug("starting createOrUpdateDiscourseEmbedding"); - const startTime = new Date().valueOf(); + const startTime = new Date(); let success = true; let claimed = false; const worker = window.roamAlphaAPI.user.uid(); @@ -455,14 +462,15 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { context, }); await cleanupOrphanedNodes(supabaseClient, context); - await endSyncTask(worker, "complete", showToast); - const duration = (new Date().valueOf() - startTime) / 1000.0; + await endSyncTask({ worker, status: "complete", showToast, startTime }); + const duration = (new Date().valueOf() - startTime.valueOf()) / 1000.0; posthog.capture("Sync complete", { duration }); } catch (error) { console.error("createOrUpdateDiscourseEmbedding: Process failed:", error); success = false; - if (worker && claimed) await endSyncTask(worker, "failed", showToast); - const duration = (new Date().valueOf() - startTime) / 1000.0; + if (worker && claimed) + await endSyncTask({ worker, status: "failed", showToast, startTime }); + const duration = (new Date().valueOf() - startTime.valueOf()) / 1000.0; posthog.capture("Sync error", { duration }); if (error instanceof FatalError) { doSync = false; diff --git a/packages/database/src/dbTypes.ts b/packages/database/src/dbTypes.ts index 1f3529b30..b968544ba 100644 --- a/packages/database/src/dbTypes.ts +++ b/packages/database/src/dbTypes.ts @@ -1469,6 +1469,7 @@ export type Database = { end_sync_task: { Args: { s_function: string + s_started_at?: string s_status: Database["public"]["Enums"]["task_status"] s_target: number s_worker: string diff --git a/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql b/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql new file mode 100644 index 000000000..12d353969 --- /dev/null +++ b/packages/database/supabase/migrations/20260321191952_end_sync_task_w_start_time.sql @@ -0,0 +1,83 @@ +DROP FUNCTION IF EXISTS public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status); + +CREATE OR REPLACE FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamptz = NULL +) RETURNS void +SET search_path = '' +LANGUAGE plpgsql +AS $$ +DECLARE t_id INTEGER; +DECLARE t_worker varchar; +DECLARE t_status public.task_status; +DECLARE t_failure_count SMALLINT; +DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE; +DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE; +DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE; +BEGIN + SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start + INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start + FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function; + ASSERT s_status > 'active'; + IF t_worker != s_worker AND COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN + -- we probably took too long. Let the other task have priority. + RETURN; + END IF; + ASSERT t_worker = s_worker, 'Wrong worker'; + ASSERT s_status >= t_status, 'do not go back in status'; + IF s_status = 'complete' THEN + t_last_task_end := now(); + t_last_success_start := t_last_task_start; + t_failure_count := 0; + ELSE + IF t_status != s_status THEN + t_failure_count := t_failure_count + 1; + END IF; + END IF; + + UPDATE public.sync_info + SET status = s_status, + task_times_out_at=null, + last_task_end=t_last_task_end, + last_success_start=t_last_success_start, + failure_count=t_failure_count + WHERE id=t_id; +END; +$$; + +ALTER FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamptz +) OWNER TO "postgres"; + +GRANT ALL ON FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamptz +) TO "anon"; +GRANT ALL ON FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamptz +) TO "authenticated"; +GRANT ALL ON FUNCTION public.end_sync_task( + s_target bigint, + s_function character varying, + s_worker character varying, + s_status public.task_status, + s_started_at timestamptz +) TO "service_role"; diff --git a/packages/database/supabase/schemas/sync.sql b/packages/database/supabase/schemas/sync.sql index 6ae25b7f3..10e73a91d 100644 --- a/packages/database/supabase/schemas/sync.sql +++ b/packages/database/supabase/schemas/sync.sql @@ -50,7 +50,8 @@ CREATE OR REPLACE FUNCTION public.end_sync_task( s_target bigint, s_function character varying, s_worker character varying, - s_status public.task_status + s_status public.task_status, + s_started_at timestamptz = NULL ) RETURNS void SET search_path = '' LANGUAGE plpgsql @@ -67,6 +68,10 @@ BEGIN INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function; ASSERT s_status > 'active'; + IF t_worker != s_worker AND COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN + -- we probably took too long. Let the other task have priority. + RETURN; + END IF; ASSERT t_worker = s_worker, 'Wrong worker'; ASSERT s_status >= t_status, 'do not go back in status'; IF s_status = 'complete' THEN @@ -93,7 +98,8 @@ ALTER FUNCTION public.end_sync_task( s_target bigint, s_function character varying, s_worker character varying, - s_status public.task_status + s_status public.task_status, + s_started_at timestamptz ) OWNER TO "postgres"; @@ -190,19 +196,22 @@ GRANT ALL ON FUNCTION public.end_sync_task( s_target bigint, s_function character varying, s_worker character varying, - s_status public.task_status + s_status public.task_status, + s_started_at timestamptz ) TO "anon"; GRANT ALL ON FUNCTION public.end_sync_task( s_target bigint, s_function character varying, s_worker character varying, - s_status public.task_status + s_status public.task_status, + s_started_at timestamptz ) TO "authenticated"; GRANT ALL ON FUNCTION public.end_sync_task( s_target bigint, s_function character varying, s_worker character varying, - s_status public.task_status + s_status public.task_status, + s_started_at timestamptz ) TO "service_role"; GRANT ALL ON FUNCTION public.propose_sync_task(