diff --git a/apps/roam/src/components/settings/DiscourseNodeSuggestiveRules.tsx b/apps/roam/src/components/settings/DiscourseNodeSuggestiveRules.tsx index 6fc756ff1..bb043f0b1 100644 --- a/apps/roam/src/components/settings/DiscourseNodeSuggestiveRules.tsx +++ b/apps/roam/src/components/settings/DiscourseNodeSuggestiveRules.tsx @@ -47,10 +47,9 @@ const DiscourseNodeSuggestiveRules = ({ const handleUpdateEmbeddings = async (): Promise => { setIsUpdating(true); try { - const blockNodesSince = await getAllDiscourseNodesSince( - new Date(0).toISOString(), - [node], - ); + const blockNodesSince = await getAllDiscourseNodesSince(undefined, [ + node, + ]); const supabaseClient = await getLoggedInClient(); if (!supabaseClient) return; diff --git a/apps/roam/src/utils/getAllDiscourseNodesSince.ts b/apps/roam/src/utils/getAllDiscourseNodesSince.ts index 5c71993ec..53046c40e 100644 --- a/apps/roam/src/utils/getAllDiscourseNodesSince.ts +++ b/apps/roam/src/utils/getAllDiscourseNodesSince.ts @@ -2,7 +2,7 @@ import { type DiscourseNode } from "./getDiscourseNodes"; import getDiscourseNodeFormatExpression from "./getDiscourseNodeFormatExpression"; import extractRef from "roamjs-components/util/extractRef"; -type ISODateString = string; +const DEFAULT_TIME = new Date("1970-01-01").getTime(); export type RoamDiscourseNodeData = { author_local_id: string; @@ -63,10 +63,10 @@ export const getDiscourseNodeTypeWithSettingsBlockNodes = async ( }; export const getAllDiscourseNodesSince = async ( - since: ISODateString | undefined, + since: number | undefined, nodeTypes: DiscourseNode[], ): Promise => { - const sinceMs = since ? new Date(since).getTime() : 0; + const sinceMs = since ?? DEFAULT_TIME; if (!nodeTypes.length) { return []; } @@ -144,10 +144,10 @@ export const getAllDiscourseNodesSince = async ( }; export const nodeTypeSince = async ( - since: ISODateString, + since: number | undefined, nodeTypes: DiscourseNode[], ) => { - const sinceMs = new Date(since).getTime(); + const sinceMs = since ?? DEFAULT_TIME; const filterMap = await Promise.all( nodeTypes.map((node) => { const query = ` diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index c2df0fd2d..f423a3dc4 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -19,6 +19,7 @@ import { import { fetchEmbeddingsForNodes } from "./upsertNodesAsContentWithEmbeddings"; import { convertRoamNodeToLocalContent } from "./upsertNodesAsContentWithEmbeddings"; import type { DGSupabaseClient } from "@repo/database/lib/client"; +import { intersection } from "@repo/utils/setOperations"; import type { Json, CompositeTypes, Enums } from "@repo/database/dbTypes"; import { render as renderToast } from "roamjs-components/components/Toast"; import internalError from "~/utils/internalError"; @@ -34,7 +35,6 @@ const BASE_SYNC_INTERVAL = 5 * 60 * 1000; // 5 minutes const SYNC_TIMEOUT = "60s"; // must be less than half the SYNC_INTERVAL. const BATCH_SIZE = 200; const CONCEPT_BATCH_SIZE = 200; -const DEFAULT_TIME = new Date("1970-01-01"); type SyncTaskInfo = { lastUpdateTime?: Date; @@ -286,7 +286,7 @@ export const convertDgToSupabaseConcepts = async ({ context, }: { nodesSince: RoamDiscourseNodeData[]; - since: string; + since: number | undefined; allNodeTypes: DiscourseNode[]; supabaseClient: DGSupabaseClient; context: SupabaseContext; @@ -427,6 +427,7 @@ const upsertUsers = async ( }; let doSync = true; +let initialSync = true; let numFailures = 0; const MAX_FAILURES = 5; type TimeoutValue = ReturnType; @@ -447,12 +448,50 @@ export const setSyncActivity = (active: boolean) => { } }; +const getAllMissingOrNewDiscourseNodes = async ({ + supabaseClient, + spaceId, + since, + nodeTypes, +}: { + supabaseClient: DGSupabaseClient; + spaceId: number; + since: number | undefined; + nodeTypes: DiscourseNode[]; +}): Promise => { + const allNodes = await getAllDiscourseNodesSince(undefined, nodeTypes); + if (since === undefined) return allNodes; + const newNodes = await getAllDiscourseNodesSince(since, nodeTypes); + const existingContentIdsReq = await supabaseClient + .from("my_contents") + .select("source_local_id") + .eq("space_id", spaceId); + if (existingContentIdsReq.error) throw existingContentIdsReq.error; + const existingConceptIdsReq = await supabaseClient + .from("my_concepts") + .select("source_local_id") + .eq("space_id", spaceId); + if (existingConceptIdsReq.error) throw existingConceptIdsReq.error; + const existingIds = new Set([ + ...intersection( + new Set((existingConceptIdsReq.data || []).map((d) => d.source_local_id)), + new Set((existingContentIdsReq.data || []).map((d) => d.source_local_id)), + ), + ...newNodes.map((n) => n.source_local_id), + ]); + return [ + ...newNodes, + ...allNodes.filter((n) => !existingIds.has(n.source_local_id)), + ]; +}; + export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { if (!doSync) return; console.debug("starting createOrUpdateDiscourseEmbedding"); const startTime = new Date(); let success = true; let claimed = false; + const isInitialSync = initialSync; // record state at start const worker = window.roamAlphaAPI.user.uid(); try { @@ -489,16 +528,21 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { } claimed = true; const allUsers = await getAllUsers(); - const sinceTime = (lastUpdateTime || DEFAULT_TIME).valueOf() - 1000; // add a one-second buffer - const time = new Date(sinceTime).toISOString(); + const sinceTime = lastUpdateTime + ? lastUpdateTime.valueOf() - 1000 // add a one-second buffer + : undefined; const allDgNodeTypes = getDiscourseNodes().filter( (n) => n.backedBy === "user", ); - const allNodeInstances = await getAllDiscourseNodesSince( - time, - allDgNodeTypes, - ); + const allNodeInstances = isInitialSync + ? await getAllMissingOrNewDiscourseNodes({ + supabaseClient, + spaceId: context.spaceId, + since: sinceTime, + nodeTypes: allDgNodeTypes, + }) + : await getAllDiscourseNodesSince(sinceTime, allDgNodeTypes); await upsertUsers(allUsers, supabaseClient, context); await upsertNodesToSupabaseAsContentWithEmbeddings( allNodeInstances, @@ -507,13 +551,14 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { ); await convertDgToSupabaseConcepts({ nodesSince: allNodeInstances, - since: time, + since: sinceTime, allNodeTypes: allDgNodeTypes, supabaseClient, context, }); await cleanupOrphanedNodes(supabaseClient, context); await endSyncTask({ worker, status: "complete", showToast, startTime }); + initialSync = false; const duration = (new Date().valueOf() - startTime.valueOf()) / 1000.0; posthog.capture("Sync complete", { duration }); } catch (error) {