diff --git a/examples/nuxtjs-live-avatars/package-lock.json b/examples/nuxtjs-live-avatars/package-lock.json
index 8d46914c8a..1153232b1d 100644
--- a/examples/nuxtjs-live-avatars/package-lock.json
+++ b/examples/nuxtjs-live-avatars/package-lock.json
@@ -5486,9 +5486,9 @@
}
},
"node_modules/devalue": {
- "version": "5.6.2",
- "resolved": "https://registry.npmjs.org/devalue/-/devalue-5.6.2.tgz",
- "integrity": "sha512-nPRkjWzzDQlsejL1WVifk5rvcFi/y1onBRxjaFMjZeR9mFpqu2gmAZ9xUB9/IEanEP/vBtGeGganC/GO1fmufg==",
+ "version": "5.6.4",
+ "resolved": "https://registry.npmjs.org/devalue/-/devalue-5.6.4.tgz",
+ "integrity": "sha512-Gp6rDldRsFh/7XuouDbxMH3Mx8GMCcgzIb1pDTvNyn8pZGQ22u+Wa+lGV9dQCltFQ7uVw0MhRyb8XDskNFOReA==",
"dev": true,
"license": "MIT"
},
diff --git a/examples/react-native-todo-list/package-lock.json b/examples/react-native-todo-list/package-lock.json
index 540da0e572..02d2759c9b 100644
--- a/examples/react-native-todo-list/package-lock.json
+++ b/examples/react-native-todo-list/package-lock.json
@@ -7417,9 +7417,9 @@
"license": "MIT"
},
"node_modules/fast-xml-parser": {
- "version": "4.5.3",
- "resolved": "https://registry.npmjs.org/fast-xml-parser/-/fast-xml-parser-4.5.3.tgz",
- "integrity": "sha512-RKihhV+SHsIUGXObeVy9AXiBbFwkVk7Syp8XgwN5U3JV416+Gwp/GO9i0JYKmikykgz/UHRrrV4ROuZEo/T0ig==",
+ "version": "4.5.4",
+ "resolved": "https://registry.npmjs.org/fast-xml-parser/-/fast-xml-parser-4.5.4.tgz",
+ "integrity": "sha512-jE8ugADnYOBsu1uaoayVl1tVKAMNOXyjwvv2U6udEA2ORBhDooJDWoGxTkhd4Qn4yh59JVVt/pKXtjPwx9OguQ==",
"devOptional": true,
"funding": [
{
@@ -7429,7 +7429,7 @@
],
"license": "MIT",
"dependencies": {
- "strnum": "^1.1.1"
+ "strnum": "^1.0.5"
},
"bin": {
"fxparser": "src/cli/cli.js"
diff --git a/packages/liveblocks-server/src/YjsStorage.ts b/packages/liveblocks-server/src/YjsStorage.ts
index 526d298d55..819922699b 100644
--- a/packages/liveblocks-server/src/YjsStorage.ts
+++ b/packages/liveblocks-server/src/YjsStorage.ts
@@ -15,7 +15,6 @@
* along with this program. If not, see .
*/
-import { DefaultMap } from "@liveblocks/core";
import { Base64 } from "js-base64";
import { nanoid } from "nanoid";
import * as Y from "yjs";
@@ -25,30 +24,24 @@ import { ROOT_YDOC_ID } from "~/decoders";
import type { IStorageDriver } from "~/interfaces";
import type { Logger } from "~/lib/Logger";
-// How big an update can be until we compress all individual updates into
-// a single vector and persist that instead (i.e. when we trigger "garbage
-// collection")
-const MAX_Y_UPDATE_SIZE = 100_000;
-
-type YUpdateInfo = {
- currentKey: string;
- lastVector: Uint8Array | undefined;
-};
+// How many updates to store before compacting
+const UPDATE_COUNT_THRESHOLD = 1_000;
export class YjsStorage {
private readonly driver: IStorageDriver;
+ private readonly updateCountThreshold: number;
private readonly doc: Y.Doc = new Y.Doc(); // the root document
- private readonly lastUpdatesById = new Map();
private readonly lastSnapshotById = new Map();
- // Keeps track of which keys are loaded, so we can clean them up without calling `.list()`
- private readonly keysById = new DefaultMap>(
- () => new Set()
- );
private readonly initPromisesById: Map> = new Map();
+ private readonly storedKeysById: Map = new Map();
- constructor(driver: IStorageDriver) {
+ constructor(
+ driver: IStorageDriver,
+ updateCountThreshold: number = UPDATE_COUNT_THRESHOLD
+ ) {
this.driver = driver;
+ this.updateCountThreshold = updateCountThreshold;
this.doc.on("subdocs", ({ removed }) => {
removed.forEach((subdoc: Y.Doc) => {
subdoc.destroy(); // will remove listeners
@@ -138,7 +131,9 @@ export class YjsStorage {
/**
* @param update base64 encoded uint8array
- * @returns
+ * @returns { isUpdated: boolean; snapshotHash: string }
+ * isUpdated: true if the update had an effect on the YDoc
+ * snapshotHash: the hash of the new snapshot
*/
public async addYDocUpdate(
logger: Logger,
@@ -163,7 +158,7 @@ export class YjsStorage {
// Check the snapshot before/after to see if the update had an effect
const updated = !Y.equalSnapshots(beforeSnapshot, afterSnapshot);
if (updated) {
- await this.handleYDocUpdate(doc);
+ await this.handleYDocUpdate(doc, updateAsU8, isV2);
}
return {
@@ -208,9 +203,7 @@ export class YjsStorage {
//
// this.doc = new Y.Doc();
// this.initPromisesById.clear();
- // this.lastUpdatesById.clear();
- // this.keysById.clear();
- // this.initPromisesById.clear();
+ // this.lastSnapshotById.clear();
}
// ------------------------------------------------------------------------------------
@@ -236,64 +229,36 @@ export class YjsStorage {
this.lastSnapshotById.set(docId, snapshot);
return snapshot;
}
- /**
- * Given a record of updates, merge them and compress if savings are significant
- */
- private _loadAndCompressYJSUpdates = async (
- docUpdates: Record,
+
+ // compact the updates into a single update and write it to the durable storage
+ private _compactYJSUpdates = async (
doc: Y.Doc,
- docId: YDocId
+ docId: YDocId,
+ storedKeys: string[]
): Promise => {
- // the percent we need to save to trigger re-writing storage, ie. only rewrite storage if we save more than 20%
- const SAVINGS_THRESHOLD = 0.2;
- // get all updates from disk
- const updates = Object.values(docUpdates);
- // uint8arrays size on disk is equal to their length, combine them to see how much we're using
- const sizeOnDisk = updates.reduce((acc, update) => {
- return acc + update.length;
- }, 0);
- if (updates.length > 0) {
- const docKeys = Object.keys(docUpdates);
- // keep track of keys in use
- this.keysById.set(docId, new Set(docKeys));
-
- const mergedUpdate = Y.mergeUpdates(updates);
- // Garbage collection won't happen unless we actually apply the update
- Y.applyUpdate(doc, mergedUpdate);
-
- // get the update so we can check out how big it is
- const garbageCollectedUpdate = Y.encodeStateAsUpdate(doc);
-
- if (
- garbageCollectedUpdate.length <
- sizeOnDisk * (1 - SAVINGS_THRESHOLD)
- ) {
- const newKey = nanoid();
- await this.driver.write_y_updates(
- docId,
- newKey,
- garbageCollectedUpdate
- );
- // delete all old keys, we're going to write new merged updates
- await this.driver.delete_y_updates(docId, docKeys);
- this.keysById.set(docId, new Set([newKey]));
- }
- }
+ const compactedUpdate = Y.encodeStateAsUpdate(doc);
+ const newKey = nanoid();
+ await this.driver.write_y_updates(docId, newKey, compactedUpdate);
+      // TODO: after we kill the kv driver, we should have an overwrite method in the driver so we don't need to delete and write
+ await this.driver.delete_y_updates(docId, storedKeys);
+ this.storedKeysById.set(docId, [newKey]);
};
private _loadYDocFromDurableStorage = async (
doc: Y.Doc,
docId: YDocId
): Promise => {
- const docUpdates = Object.fromEntries(
- await this.driver.iter_y_updates(docId)
- );
- await this._loadAndCompressYJSUpdates(docUpdates, doc, docId);
- // store the vector of the last update
- this.lastUpdatesById.set(docId, {
- currentKey: nanoid(),
- lastVector: Y.encodeStateVector(doc),
- });
+ const storedKeys: string[] = [];
+ for (const [key, update] of await this.driver.iter_y_updates(docId)) {
+ Y.applyUpdate(doc, update);
+ storedKeys.push(key);
+ }
+ // after compaction, there will only be one unique key.
+ if (this.shouldCompact(storedKeys)) {
+ await this._compactYJSUpdates(doc, docId, storedKeys);
+ } else {
+ this.storedKeysById.set(docId, storedKeys);
+ }
doc.emit("load", [doc]); // sets the "isLoaded" to true on the doc
return doc;
@@ -333,51 +298,41 @@ export class YjsStorage {
}
// When the YJS doc changes, update it in durable storage
- private async handleYDocUpdate(doc: Y.Doc): Promise {
+ private async handleYDocUpdate(
+ doc: Y.Doc,
+ update: Uint8Array,
+ isV2: boolean | undefined
+ ): Promise {
+    // TODO: in the future, we should pass this detail to the driver so it can store the version as metadata
+ // this will be easy for sqlite drivers, but not for the KV driver
+ const v1update = isV2 ? Y.convertUpdateFormatV2ToV1(update) : update;
const docId: YDocId =
doc.guid === this.doc.guid ? ROOT_YDOC_ID : (doc.guid as Guid);
- const docUpdateInfo = this.lastUpdatesById.get(docId);
- // get the update since last vector
- const updateSinceLastVector = Y.encodeStateAsUpdate(
- doc,
- docUpdateInfo?.lastVector
- );
- // this should happen before the await on putYDoc to avoid race conditions
- // but we need the current key before, so store it here
- const storageKey = docUpdateInfo?.currentKey ?? nanoid();
- if (updateSinceLastVector.length > MAX_Y_UPDATE_SIZE) {
- // compress update, not using the vector, we want to write the whole doc
- const newKey = nanoid();
- await this.driver.write_y_updates(
- docId,
- newKey,
- Y.encodeStateAsUpdate(doc)
- );
- // delete all old keys on disk
- await this.driver.delete_y_updates(
- docId,
- Array.from(this.keysById.getOrCreate(docId))
- );
- // update the keys we have stored
- this.keysById.set(docId, new Set([newKey]));
- // future updates will write from this vector and to this key
- this.lastUpdatesById.set(docId, {
- currentKey: nanoid(), // start writing to a new key
- lastVector: Y.encodeStateVector(doc),
- });
+
+ const storedKeys = this.storedKeysById.get(docId);
+
+    // Once the number of stored update keys reaches the configured threshold, compact them
+ if (this.shouldCompact(storedKeys)) {
+ await this._compactYJSUpdates(doc, docId, storedKeys || []);
+ return;
+ }
+
+ // the whole concept of storing keys is not needed when we kill the kv driver, all of this stuff is trivial in sqlite
+ const newKey = nanoid();
+ await this.driver.write_y_updates(docId, newKey, v1update);
+
+ // update the stored keys, which we'll need for compaction.
+ if (!storedKeys) {
+ this.storedKeysById.set(docId, [newKey]);
} else {
- // in this case, the update is small enough, just overwrite it
- await this.driver.write_y_updates(
- docId,
- storageKey,
- updateSinceLastVector
- );
- const keys = [storageKey];
- // keep track of keys used
- const currentKeys = this.keysById.getOrCreate(docId);
- for (const key of keys) {
- currentKeys.add(key);
- }
+ storedKeys.push(newKey);
+ }
+ }
+
+ private shouldCompact(storedKeys: string[] | undefined): boolean {
+ if (!storedKeys) {
+ return false;
}
+ return storedKeys.length >= this.updateCountThreshold;
}
}