From 64e405fbd37b5122b43fbd5add7abcef9156cbde Mon Sep 17 00:00:00 2001 From: Marc-Antoine Parent Date: Sun, 24 May 2026 11:27:13 -0400 Subject: [PATCH 1/2] ENG-1793 sync all missing nodes at startup --- apps/roam/src/utils/syncDgNodesToSupabase.ts | 53 ++++++++++++++++++-- 1 file changed, 49 insertions(+), 4 deletions(-) diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index c2df0fd2d..07d4ac967 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -19,6 +19,7 @@ import { import { fetchEmbeddingsForNodes } from "./upsertNodesAsContentWithEmbeddings"; import { convertRoamNodeToLocalContent } from "./upsertNodesAsContentWithEmbeddings"; import type { DGSupabaseClient } from "@repo/database/lib/client"; +import { intersection } from "@repo/utils/setOperations"; import type { Json, CompositeTypes, Enums } from "@repo/database/dbTypes"; import { render as renderToast } from "roamjs-components/components/Toast"; import internalError from "~/utils/internalError"; @@ -427,6 +428,7 @@ const upsertUsers = async ( }; let doSync = true; +let initialSync = true; let numFailures = 0; const MAX_FAILURES = 5; type TimeoutValue = ReturnType; @@ -447,12 +449,50 @@ export const setSyncActivity = (active: boolean) => { } }; +const getAllMissingOrNewDiscourseNodes = async ({ + supabaseClient, + spaceId, + since, + nodeTypes, +}: { + supabaseClient: DGSupabaseClient; + spaceId: number; + since: string | undefined; + nodeTypes: DiscourseNode[]; +}): Promise => { + const allNodes = await getAllDiscourseNodesSince(undefined, nodeTypes); + if (since === undefined) return allNodes; + const newNodes = await getAllDiscourseNodesSince(since, nodeTypes); + const existingContentIdsReq = await supabaseClient + .from("my_contents") + .select("source_local_id") + .eq("space_id", spaceId); + if (existingContentIdsReq.error) throw existingContentIdsReq.error; + const existingConceptIdsReq 
= await supabaseClient + .from("my_concepts") + .select("source_local_id") + .eq("space_id", spaceId); + if (existingConceptIdsReq.error) throw existingConceptIdsReq.error; + const existingIds = new Set([ + ...intersection( + new Set((existingConceptIdsReq.data || []).map((d) => d.source_local_id)), + new Set((existingContentIdsReq.data || []).map((d) => d.source_local_id)), + ), + ...newNodes.map((n) => n.source_local_id), + ]); + return [ + ...newNodes, + ...allNodes.filter((n) => !existingIds.has(n.source_local_id)), + ]; +}; + export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { if (!doSync) return; console.debug("starting createOrUpdateDiscourseEmbedding"); const startTime = new Date(); let success = true; let claimed = false; + const isInitialSync = initialSync; // record state at start const worker = window.roamAlphaAPI.user.uid(); try { @@ -495,10 +535,14 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { (n) => n.backedBy === "user", ); - const allNodeInstances = await getAllDiscourseNodesSince( - time, - allDgNodeTypes, - ); + const allNodeInstances = isInitialSync + ? 
await getAllMissingOrNewDiscourseNodes({ + supabaseClient, + spaceId: context.spaceId, + since: time, + nodeTypes: allDgNodeTypes, + }) + : await getAllDiscourseNodesSince(time, allDgNodeTypes); await upsertUsers(allUsers, supabaseClient, context); await upsertNodesToSupabaseAsContentWithEmbeddings( allNodeInstances, @@ -514,6 +558,7 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { }); await cleanupOrphanedNodes(supabaseClient, context); await endSyncTask({ worker, status: "complete", showToast, startTime }); + initialSync = false; const duration = (new Date().valueOf() - startTime.valueOf()) / 1000.0; posthog.capture("Sync complete", { duration }); } catch (error) { From 673d338230e86b11fd03c6546af488982248867d Mon Sep 17 00:00:00 2001 From: Marc-Antoine Parent Date: Sun, 24 May 2026 12:00:48 -0400 Subject: [PATCH 2/2] Fix: ensure getAllMissingOrNewDiscourseNodes actually receives an undefined `since` when there is no previous sync time. Refactoring: Avoid Date/ISO-string conversions by passing numeric millisecond timestamps to nodeTypeSince, getAllDiscourseNodesSince, convertDgToSupabaseConcepts.
--- .../settings/DiscourseNodeSuggestiveRules.tsx | 7 +++---- apps/roam/src/utils/getAllDiscourseNodesSince.ts | 10 +++++----- apps/roam/src/utils/syncDgNodesToSupabase.ts | 16 ++++++++-------- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/apps/roam/src/components/settings/DiscourseNodeSuggestiveRules.tsx b/apps/roam/src/components/settings/DiscourseNodeSuggestiveRules.tsx index 6fc756ff1..bb043f0b1 100644 --- a/apps/roam/src/components/settings/DiscourseNodeSuggestiveRules.tsx +++ b/apps/roam/src/components/settings/DiscourseNodeSuggestiveRules.tsx @@ -47,10 +47,9 @@ const DiscourseNodeSuggestiveRules = ({ const handleUpdateEmbeddings = async (): Promise => { setIsUpdating(true); try { - const blockNodesSince = await getAllDiscourseNodesSince( - new Date(0).toISOString(), - [node], - ); + const blockNodesSince = await getAllDiscourseNodesSince(undefined, [ + node, + ]); const supabaseClient = await getLoggedInClient(); if (!supabaseClient) return; diff --git a/apps/roam/src/utils/getAllDiscourseNodesSince.ts b/apps/roam/src/utils/getAllDiscourseNodesSince.ts index 5c71993ec..53046c40e 100644 --- a/apps/roam/src/utils/getAllDiscourseNodesSince.ts +++ b/apps/roam/src/utils/getAllDiscourseNodesSince.ts @@ -2,7 +2,7 @@ import { type DiscourseNode } from "./getDiscourseNodes"; import getDiscourseNodeFormatExpression from "./getDiscourseNodeFormatExpression"; import extractRef from "roamjs-components/util/extractRef"; -type ISODateString = string; +const DEFAULT_TIME = new Date("1970-01-01").getTime(); export type RoamDiscourseNodeData = { author_local_id: string; @@ -63,10 +63,10 @@ export const getDiscourseNodeTypeWithSettingsBlockNodes = async ( }; export const getAllDiscourseNodesSince = async ( - since: ISODateString | undefined, + since: number | undefined, nodeTypes: DiscourseNode[], ): Promise => { - const sinceMs = since ? new Date(since).getTime() : 0; + const sinceMs = since ?? 
DEFAULT_TIME; if (!nodeTypes.length) { return []; } @@ -144,10 +144,10 @@ export const getAllDiscourseNodesSince = async ( }; export const nodeTypeSince = async ( - since: ISODateString, + since: number | undefined, nodeTypes: DiscourseNode[], ) => { - const sinceMs = new Date(since).getTime(); + const sinceMs = since ?? DEFAULT_TIME; const filterMap = await Promise.all( nodeTypes.map((node) => { const query = ` diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index 07d4ac967..f423a3dc4 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -35,7 +35,6 @@ const BASE_SYNC_INTERVAL = 5 * 60 * 1000; // 5 minutes const SYNC_TIMEOUT = "60s"; // must be less than half the SYNC_INTERVAL. const BATCH_SIZE = 200; const CONCEPT_BATCH_SIZE = 200; -const DEFAULT_TIME = new Date("1970-01-01"); type SyncTaskInfo = { lastUpdateTime?: Date; @@ -287,7 +286,7 @@ export const convertDgToSupabaseConcepts = async ({ context, }: { nodesSince: RoamDiscourseNodeData[]; - since: string; + since: number | undefined; allNodeTypes: DiscourseNode[]; supabaseClient: DGSupabaseClient; context: SupabaseContext; @@ -457,7 +456,7 @@ const getAllMissingOrNewDiscourseNodes = async ({ }: { supabaseClient: DGSupabaseClient; spaceId: number; - since: string | undefined; + since: number | undefined; nodeTypes: DiscourseNode[]; }): Promise => { const allNodes = await getAllDiscourseNodesSince(undefined, nodeTypes); @@ -529,8 +528,9 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { } claimed = true; const allUsers = await getAllUsers(); - const sinceTime = (lastUpdateTime || DEFAULT_TIME).valueOf() - 1000; // add a one-second buffer - const time = new Date(sinceTime).toISOString(); + const sinceTime = lastUpdateTime + ? 
lastUpdateTime.valueOf() - 1000 // add a one-second buffer + : undefined; const allDgNodeTypes = getDiscourseNodes().filter( (n) => n.backedBy === "user", ); @@ -539,10 +539,10 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { ? await getAllMissingOrNewDiscourseNodes({ supabaseClient, spaceId: context.spaceId, - since: time, + since: sinceTime, nodeTypes: allDgNodeTypes, }) - : await getAllDiscourseNodesSince(time, allDgNodeTypes); + : await getAllDiscourseNodesSince(sinceTime, allDgNodeTypes); await upsertUsers(allUsers, supabaseClient, context); await upsertNodesToSupabaseAsContentWithEmbeddings( allNodeInstances, @@ -551,7 +551,7 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { ); await convertDgToSupabaseConcepts({ nodesSince: allNodeInstances, - since: time, + since: sinceTime, allNodeTypes: allDgNodeTypes, supabaseClient, context,