Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 159 additions & 0 deletions apps/obsidian/src/utils/publishNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ import {
getFileForNodeInstanceIds,
loadRelations,
saveRelations,
type RelationsFile,
} from "./relationsStore";
import type { RelationInstance } from "~/types";
import { getAvailableGroupIds } from "./importNodes";
import { syncAllNodesAndRelations } from "./syncDgNodesToSupabase";
import type { DiscourseNodeInVault } from "./getDiscourseNodes";
import type { SupabaseContext } from "./supabaseContext";
import type { TablesInsert } from "@repo/database/dbTypes";

const publishSchema = async ({
client,
Expand Down Expand Up @@ -74,6 +78,21 @@ const intersection = <T>(set1: Set<T>, set2: Set<T>): Set<T> => {
return r;
};

const difference = <T>(set1: Set<T>, set2: Set<T>): Set<T> => {
// @ts-expect-error - Set.difference is ES2025 feature
if (set1.difference) return set1.difference(set2); // eslint-disable-line
const result = new Set(set1);
if (set1.size <= set2.size)
for (const e of set1) {
if (set2.has(e)) result.delete(e);
}
else
for (const e of set2) {
if (result.has(e)) result.delete(e);
}
return result;
};

export const publishNewRelation = async (
plugin: DiscourseGraphPlugin,
relation: RelationInstance,
Expand Down Expand Up @@ -249,6 +268,146 @@ export const publishNode = async ({
return await publishNodeToGroup({ plugin, file, frontmatter, myGroup });
};

export const ensurePublishedRelationsAccuracy = async ({
client,
context,
plugin,
allNodesById,
relationInstancesData,
}: {
client: DGSupabaseClient;
context: SupabaseContext;
plugin: DiscourseGraphPlugin;
allNodesById: Record<string, DiscourseNodeInVault>;
relationInstancesData: RelationsFile;
}): Promise<void> => {
const myGroups = await getAvailableGroupIds(client);
const relationInstances = Object.values(relationInstancesData.relations);
const syncedRelationIdsResult = await client
.from("Concept")
.select("source_local_id")
.eq("space_id", context.spaceId)
.eq("is_schema", false)
.gt("arity", 0);
if (syncedRelationIdsResult.error) {
console.error(
"Could not get synced relation ids",
syncedRelationIdsResult.error,
);
return;
}
const syncedRelationIds = new Set(
(syncedRelationIdsResult.data || []).map((x) => x.source_local_id!),
);
// Also a good time to look at orphan relations
const existingRelationIds = new Set(relationInstances.map((r) => r.id));
const orphanRelationIds = difference(syncedRelationIds, existingRelationIds);
if (orphanRelationIds.size) {
const r = await client
.from("Concept")
.delete()
.eq("space_id", context.spaceId)
.in("source_local_id", [...orphanRelationIds]);
if (!r.error) {
for (const id of orphanRelationIds) {
syncedRelationIds.delete(id);
}
}
}
let changed = false;
const missingPublishRecords: TablesInsert<"ResourceAccess">[] = [];
for (const group of myGroups) {
const publishableRelations = relationInstances.filter(
(r) =>
!r.importedFromRid &&
syncedRelationIds.has(r.id) &&
(
(allNodesById[r.source]?.frontmatter?.publishedToGroups as
| string[]
| undefined) || []
).indexOf(group) >= 0 &&
(
(allNodesById[r.destination]?.frontmatter?.publishedToGroups as
| string[]
| undefined) || []
).indexOf(group) >= 0,
);
const publishableRelationIds = new Set(
publishableRelations.map((x) => x.id),
);
const publishedIds = await client
.from("ResourceAccess")
.select("source_local_id")
.eq("account_uid", group)
.eq("space_id", context.spaceId);
if (publishedIds.error) {
console.error("Could not get synced relation ids", publishedIds.error);
continue;
}
const publishedRelationIds = intersection(
syncedRelationIds,
new Set((publishedIds.data || []).map((x) => x.source_local_id)),
);
const missingPublishableIds = difference(
publishableRelationIds,
publishedRelationIds,
);
if (missingPublishableIds.size > 0) {
missingPublishRecords.push(
/* eslint-disable @typescript-eslint/naming-convention */
...[...missingPublishableIds].map((source_local_id) => ({
source_local_id,
space_id: context.spaceId,
account_uid: group,
})),
/* eslint-enable @typescript-eslint/naming-convention */
);
}
const extraPublishableIds = difference(
publishedRelationIds,
publishableRelationIds,
);
if (extraPublishableIds.size > 0) {
const r = await client
.from("ResourceAccess")
.delete()
.eq("account_uid", group)
.eq("space_id", context.spaceId)
.in("source_local_id", [...extraPublishableIds]);
if (r.error) console.error(r.error);
else {
for (const id of extraPublishableIds) {
const rel = relationInstancesData.relations[id];
const pos = (rel?.publishedToGroupId || []).indexOf(group);
if (pos >= 0) {
rel!.publishedToGroupId!.splice(pos, 1);
changed = true;
}
}
}
}
}
if (missingPublishRecords.length > 0) {
const r = await client.from("ResourceAccess").upsert(missingPublishRecords);
if (r.error) console.error(r.error);
else {
for (const record of missingPublishRecords) {
const rel = relationInstancesData.relations[record.source_local_id];
const group = record.account_uid;
const pos = (rel?.publishedToGroupId || []).indexOf(group);
if (rel && pos < 0) {
if (rel.publishedToGroupId === undefined) rel.publishedToGroupId = [];
rel.publishedToGroupId.push(group);
changed = true;
}
}
}
}
if (changed) {
await saveRelations(plugin, relationInstancesData);
}
};

export const publishNodeToGroup = async ({
plugin,
file,
Expand Down
15 changes: 14 additions & 1 deletion apps/obsidian/src/utils/syncDgNodesToSupabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
type SupabaseContext,
} from "./supabaseContext";
import { default as DiscourseGraphPlugin } from "~/index";
import { publishNode } from "./publishNode";
import { publishNode, ensurePublishedRelationsAccuracy } from "./publishNode";
import { upsertNodesToSupabaseAsContentWithEmbeddings } from "./upsertNodesAsContentWithEmbeddings";
import {
orderConceptsByDependency,
Expand Down Expand Up @@ -416,6 +416,7 @@ export const syncAllNodesAndRelations = async (
accountLocalId,
plugin,
allNodes,
startupRun: true,
});

// When synced nodes are already published, ensure non-text assets are in storage.
Expand All @@ -433,13 +434,15 @@ const convertDgToSupabaseConcepts = async ({
accountLocalId,
plugin,
allNodes,
startupRun,
}: {
nodesSince: ObsidianDiscourseNodeData[];
supabaseClient: DGSupabaseClient;
context: SupabaseContext;
accountLocalId: string;
plugin: DiscourseGraphPlugin;
allNodes?: DiscourseNodeInVault[];
startupRun?: boolean;
}): Promise<void> => {
const lastNodeSchemaSync = (
await getLastNodeSchemaSyncTime(supabaseClient, context.spaceId)
Expand Down Expand Up @@ -564,6 +567,16 @@ const convertDgToSupabaseConcepts = async ({
throw new Error(`upsert_concepts failed: ${errorMessage}`);
}
}
if (startupRun === true) {
// occasional extra work: Make sure relations that should be published are.
await ensurePublishedRelationsAccuracy({
client: supabaseClient,
context,
plugin,
allNodesById,
relationInstancesData,
});
}
};

/**
Expand Down
Loading