Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,9 @@ const DiscourseNodeSuggestiveRules = ({
const handleUpdateEmbeddings = async (): Promise<void> => {
setIsUpdating(true);
try {
const blockNodesSince = await getAllDiscourseNodesSince(
new Date(0).toISOString(),
[node],
);
const blockNodesSince = await getAllDiscourseNodesSince(undefined, [
node,
]);
const supabaseClient = await getLoggedInClient();
if (!supabaseClient) return;

Expand Down
10 changes: 5 additions & 5 deletions apps/roam/src/utils/getAllDiscourseNodesSince.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { type DiscourseNode } from "./getDiscourseNodes";
import getDiscourseNodeFormatExpression from "./getDiscourseNodeFormatExpression";
import extractRef from "roamjs-components/util/extractRef";

type ISODateString = string;
const DEFAULT_TIME = new Date("1970-01-01").getTime();

export type RoamDiscourseNodeData = {
author_local_id: string;
Expand Down Expand Up @@ -63,10 +63,10 @@ export const getDiscourseNodeTypeWithSettingsBlockNodes = async (
};

export const getAllDiscourseNodesSince = async (
since: ISODateString | undefined,
since: number | undefined,
nodeTypes: DiscourseNode[],
): Promise<RoamDiscourseNodeData[]> => {
const sinceMs = since ? new Date(since).getTime() : 0;
const sinceMs = since ?? DEFAULT_TIME;
if (!nodeTypes.length) {
return [];
}
Expand Down Expand Up @@ -144,10 +144,10 @@ export const getAllDiscourseNodesSince = async (
};

export const nodeTypeSince = async (
since: ISODateString,
since: number | undefined,
nodeTypes: DiscourseNode[],
) => {
const sinceMs = new Date(since).getTime();
const sinceMs = since ?? DEFAULT_TIME;
const filterMap = await Promise.all(
nodeTypes.map((node) => {
const query = `
Expand Down
63 changes: 54 additions & 9 deletions apps/roam/src/utils/syncDgNodesToSupabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
import { fetchEmbeddingsForNodes } from "./upsertNodesAsContentWithEmbeddings";
import { convertRoamNodeToLocalContent } from "./upsertNodesAsContentWithEmbeddings";
import type { DGSupabaseClient } from "@repo/database/lib/client";
import { intersection } from "@repo/utils/setOperations";
import type { Json, CompositeTypes, Enums } from "@repo/database/dbTypes";
import { render as renderToast } from "roamjs-components/components/Toast";
import internalError from "~/utils/internalError";
Expand All @@ -34,7 +35,6 @@ const BASE_SYNC_INTERVAL = 5 * 60 * 1000; // 5 minutes
const SYNC_TIMEOUT = "60s"; // must be less than half the SYNC_INTERVAL.
const BATCH_SIZE = 200;
const CONCEPT_BATCH_SIZE = 200;
const DEFAULT_TIME = new Date("1970-01-01");

type SyncTaskInfo = {
lastUpdateTime?: Date;
Expand Down Expand Up @@ -286,7 +286,7 @@ export const convertDgToSupabaseConcepts = async ({
context,
}: {
nodesSince: RoamDiscourseNodeData[];
since: string;
since: number | undefined;
allNodeTypes: DiscourseNode[];
supabaseClient: DGSupabaseClient;
context: SupabaseContext;
Expand Down Expand Up @@ -427,6 +427,7 @@ const upsertUsers = async (
};

// Gate read at the top of createOrUpdateDiscourseEmbedding: when false, the sync returns immediately.
let doSync = true;
// Stays true until a sync run has completed successfully. While true, the sync
// also looks for nodes that are missing from the database, not only nodes
// changed since the last update time.
let initialSync = true;
// NOTE(review): presumably a count of failed syncs compared against MAX_FAILURES;
// the code that uses these two is not visible here — confirm.
let numFailures = 0;
const MAX_FAILURES = 5;
// Handle type produced by setTimeout in the current runtime.
type TimeoutValue = ReturnType<typeof setTimeout>;
Expand All @@ -447,12 +448,50 @@ export const setSyncActivity = (active: boolean) => {
}
};

/**
 * Gathers the discourse nodes that should be uploaded when the database may be
 * missing some of them.
 *
 * With no `since` timestamp, every node of the given types is returned.
 * Otherwise the result is the nodes reported since `since`, followed by every
 * other node whose `source_local_id` is not present in BOTH `my_contents` and
 * `my_concepts` for this space.
 *
 * NOTE(review): both selects are unpaginated. If the backend caps the number
 * of rows returned per request, the set of existing ids could be incomplete
 * and extra nodes would be treated as missing — confirm.
 *
 * @param supabaseClient - Client used to read the ids already stored.
 * @param spaceId - Value matched against the `space_id` column of both tables.
 * @param since - Millisecond timestamp passed to `getAllDiscourseNodesSince`, or `undefined` for "everything".
 * @param nodeTypes - Discourse node types to look up.
 * @returns Recently reported nodes first, then nodes missing from the database, without duplicates between the two groups.
 * @throws The Supabase error object if either id query fails.
 */
const getAllMissingOrNewDiscourseNodes = async ({
  supabaseClient,
  spaceId,
  since,
  nodeTypes,
}: {
  supabaseClient: DGSupabaseClient;
  spaceId: number;
  since: number | undefined;
  nodeTypes: DiscourseNode[];
}): Promise<RoamDiscourseNodeData[]> => {
  const everyNode = await getAllDiscourseNodesSince(undefined, nodeTypes);
  // Without a reference time there is nothing to compare against: send it all.
  if (since === undefined) {
    return everyNode;
  }

  const recentNodes = await getAllDiscourseNodesSince(since, nodeTypes);

  const contentRes = await supabaseClient
    .from("my_contents")
    .select("source_local_id")
    .eq("space_id", spaceId);
  if (contentRes.error) {
    throw contentRes.error;
  }

  const conceptRes = await supabaseClient
    .from("my_concepts")
    .select("source_local_id")
    .eq("space_id", spaceId);
  if (conceptRes.error) {
    throw conceptRes.error;
  }

  const conceptIds = new Set(
    (conceptRes.data || []).map((row) => row.source_local_id),
  );
  const contentIds = new Set(
    (contentRes.data || []).map((row) => row.source_local_id),
  );

  // Ids to leave out of the "missing" group: those stored as both a concept
  // and a content row, plus the recent nodes, which are already in the output.
  const alreadyCovered = new Set([
    ...intersection(conceptIds, contentIds),
    ...recentNodes.map((node) => node.source_local_id),
  ]);

  const missingNodes = everyNode.filter(
    (node) => !alreadyCovered.has(node.source_local_id),
  );
  return [...recentNodes, ...missingNodes];
};

export const createOrUpdateDiscourseEmbedding = async (showToast = false) => {
if (!doSync) return;
console.debug("starting createOrUpdateDiscourseEmbedding");
const startTime = new Date();
let success = true;
let claimed = false;
const isInitialSync = initialSync; // record state at start
const worker = window.roamAlphaAPI.user.uid();

try {
Expand Down Expand Up @@ -489,16 +528,21 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => {
}
claimed = true;
const allUsers = await getAllUsers();
const sinceTime = (lastUpdateTime || DEFAULT_TIME).valueOf() - 1000; // add a one-second buffer
const time = new Date(sinceTime).toISOString();
const sinceTime = lastUpdateTime
? lastUpdateTime.valueOf() - 1000 // add a one-second buffer
: undefined;
const allDgNodeTypes = getDiscourseNodes().filter(
(n) => n.backedBy === "user",
);

const allNodeInstances = await getAllDiscourseNodesSince(
time,
allDgNodeTypes,
);
const allNodeInstances = isInitialSync
? await getAllMissingOrNewDiscourseNodes({
supabaseClient,
spaceId: context.spaceId,
since: sinceTime,
nodeTypes: allDgNodeTypes,
})
: await getAllDiscourseNodesSince(sinceTime, allDgNodeTypes);
await upsertUsers(allUsers, supabaseClient, context);
await upsertNodesToSupabaseAsContentWithEmbeddings(
allNodeInstances,
Expand All @@ -507,13 +551,14 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => {
);
await convertDgToSupabaseConcepts({
nodesSince: allNodeInstances,
since: time,
since: sinceTime,
allNodeTypes: allDgNodeTypes,
supabaseClient,
context,
});
await cleanupOrphanedNodes(supabaseClient, context);
await endSyncTask({ worker, status: "complete", showToast, startTime });
initialSync = false;
const duration = (new Date().valueOf() - startTime.valueOf()) / 1000.0;
posthog.capture("Sync complete", { duration });
} catch (error) {
Expand Down
Loading