diff --git a/examples/nuxtjs-live-avatars/package-lock.json b/examples/nuxtjs-live-avatars/package-lock.json index 8d46914c8a..1153232b1d 100644 --- a/examples/nuxtjs-live-avatars/package-lock.json +++ b/examples/nuxtjs-live-avatars/package-lock.json @@ -5486,9 +5486,9 @@ } }, "node_modules/devalue": { - "version": "5.6.2", - "resolved": "https://registry.npmjs.org/devalue/-/devalue-5.6.2.tgz", - "integrity": "sha512-nPRkjWzzDQlsejL1WVifk5rvcFi/y1onBRxjaFMjZeR9mFpqu2gmAZ9xUB9/IEanEP/vBtGeGganC/GO1fmufg==", + "version": "5.6.4", + "resolved": "https://registry.npmjs.org/devalue/-/devalue-5.6.4.tgz", + "integrity": "sha512-Gp6rDldRsFh/7XuouDbxMH3Mx8GMCcgzIb1pDTvNyn8pZGQ22u+Wa+lGV9dQCltFQ7uVw0MhRyb8XDskNFOReA==", "dev": true, "license": "MIT" }, diff --git a/examples/react-native-todo-list/package-lock.json b/examples/react-native-todo-list/package-lock.json index 540da0e572..02d2759c9b 100644 --- a/examples/react-native-todo-list/package-lock.json +++ b/examples/react-native-todo-list/package-lock.json @@ -7417,9 +7417,9 @@ "license": "MIT" }, "node_modules/fast-xml-parser": { - "version": "4.5.3", - "resolved": "https://registry.npmjs.org/fast-xml-parser/-/fast-xml-parser-4.5.3.tgz", - "integrity": "sha512-RKihhV+SHsIUGXObeVy9AXiBbFwkVk7Syp8XgwN5U3JV416+Gwp/GO9i0JYKmikykgz/UHRrrV4ROuZEo/T0ig==", + "version": "4.5.4", + "resolved": "https://registry.npmjs.org/fast-xml-parser/-/fast-xml-parser-4.5.4.tgz", + "integrity": "sha512-jE8ugADnYOBsu1uaoayVl1tVKAMNOXyjwvv2U6udEA2ORBhDooJDWoGxTkhd4Qn4yh59JVVt/pKXtjPwx9OguQ==", "devOptional": true, "funding": [ { @@ -7429,7 +7429,7 @@ ], "license": "MIT", "dependencies": { - "strnum": "^1.1.1" + "strnum": "^1.0.5" }, "bin": { "fxparser": "src/cli/cli.js" diff --git a/packages/liveblocks-server/src/YjsStorage.ts b/packages/liveblocks-server/src/YjsStorage.ts index 526d298d55..819922699b 100644 --- a/packages/liveblocks-server/src/YjsStorage.ts +++ b/packages/liveblocks-server/src/YjsStorage.ts @@ -15,7 +15,6 @@ * along with this program. If not, see . */ -import { DefaultMap } from "@liveblocks/core"; import { Base64 } from "js-base64"; import { nanoid } from "nanoid"; import * as Y from "yjs"; @@ -25,30 +24,24 @@ import { ROOT_YDOC_ID } from "~/decoders"; import type { IStorageDriver } from "~/interfaces"; import type { Logger } from "~/lib/Logger"; -// How big an update can be until we compress all individual updates into -// a single vector and persist that instead (i.e. when we trigger "garbage -// collection") -const MAX_Y_UPDATE_SIZE = 100_000; - -type YUpdateInfo = { - currentKey: string; - lastVector: Uint8Array | undefined; -}; +// How many updates to store before compacting +const UPDATE_COUNT_THRESHOLD = 1_000; export class YjsStorage { private readonly driver: IStorageDriver; + private readonly updateCountThreshold: number; private readonly doc: Y.Doc = new Y.Doc(); // the root document - private readonly lastUpdatesById = new Map(); private readonly lastSnapshotById = new Map(); - // Keeps track of which keys are loaded, so we can clean them up without calling `.list()` - private readonly keysById = new DefaultMap>( - () => new Set() - ); private readonly initPromisesById: Map> = new Map(); + private readonly storedKeysById: Map = new Map(); - constructor(driver: IStorageDriver) { + constructor( + driver: IStorageDriver, + updateCountThreshold: number = UPDATE_COUNT_THRESHOLD + ) { this.driver = driver; + this.updateCountThreshold = updateCountThreshold; this.doc.on("subdocs", ({ removed }) => { removed.forEach((subdoc: Y.Doc) => { subdoc.destroy(); // will remove listeners @@ -138,7 +131,9 @@ export class YjsStorage { /** * @param update base64 encoded uint8array - * @returns + * @returns { isUpdated: boolean; snapshotHash: string } + * isUpdated: true if the update had an effect on the YDoc + * snapshotHash: the hash of the new snapshot */ public async addYDocUpdate( logger: Logger, @@ -163,7 +158,7 @@ export class YjsStorage { // Check the snapshot before/after to see if the update had an effect const updated = !Y.equalSnapshots(beforeSnapshot, afterSnapshot); if (updated) { - await this.handleYDocUpdate(doc); + await this.handleYDocUpdate(doc, updateAsU8, isV2); } return { @@ -208,9 +203,7 @@ export class YjsStorage { // // this.doc = new Y.Doc(); // this.initPromisesById.clear(); - // this.lastUpdatesById.clear(); - // this.keysById.clear(); - // this.initPromisesById.clear(); + // this.lastSnapshotById.clear(); } // ------------------------------------------------------------------------------------ @@ -236,64 +229,36 @@ export class YjsStorage { this.lastSnapshotById.set(docId, snapshot); return snapshot; } - /** - * Given a record of updates, merge them and compress if savings are significant - */ - private _loadAndCompressYJSUpdates = async ( - docUpdates: Record, + + // compact the updates into a single update and write it to the durable storage + private _compactYJSUpdates = async ( doc: Y.Doc, - docId: YDocId + docId: YDocId, + storedKeys: string[] ): Promise => { - // the percent we need to save to trigger re-writing storage, ie. only rewrite storage if we save more than 20% - const SAVINGS_THRESHOLD = 0.2; - // get all updates from disk - const updates = Object.values(docUpdates); - // uint8arrays size on disk is equal to their length, combine them to see how much we're using - const sizeOnDisk = updates.reduce((acc, update) => { - return acc + update.length; - }, 0); - if (updates.length > 0) { - const docKeys = Object.keys(docUpdates); - // keep track of keys in use - this.keysById.set(docId, new Set(docKeys)); - - const mergedUpdate = Y.mergeUpdates(updates); - // Garbage collection won't happen unless we actually apply the update - Y.applyUpdate(doc, mergedUpdate); - - // get the update so we can check out how big it is - const garbageCollectedUpdate = Y.encodeStateAsUpdate(doc); - - if ( - garbageCollectedUpdate.length < - sizeOnDisk * (1 - SAVINGS_THRESHOLD) - ) { - const newKey = nanoid(); - await this.driver.write_y_updates( - docId, - newKey, - garbageCollectedUpdate - ); - // delete all old keys, we're going to write new merged updates - await this.driver.delete_y_updates(docId, docKeys); - this.keysById.set(docId, new Set([newKey])); - } - } + const compactedUpdate = Y.encodeStateAsUpdate(doc); + const newKey = nanoid(); + await this.driver.write_y_updates(docId, newKey, compactedUpdate); + // Todo: after we kill the kv driver, we should have an overwrite method in the driverso we don't need to delete and write + await this.driver.delete_y_updates(docId, storedKeys); + this.storedKeysById.set(docId, [newKey]); }; private _loadYDocFromDurableStorage = async ( doc: Y.Doc, docId: YDocId ): Promise => { - const docUpdates = Object.fromEntries( - await this.driver.iter_y_updates(docId) - ); - await this._loadAndCompressYJSUpdates(docUpdates, doc, docId); - // store the vector of the last update - this.lastUpdatesById.set(docId, { - currentKey: nanoid(), - lastVector: Y.encodeStateVector(doc), - }); + const storedKeys: string[] = []; + for (const [key, update] of await this.driver.iter_y_updates(docId)) { + Y.applyUpdate(doc, update); + storedKeys.push(key); + } + // after compaction, there will only be one unique key. + if (this.shouldCompact(storedKeys)) { + await this._compactYJSUpdates(doc, docId, storedKeys); + } else { + this.storedKeysById.set(docId, storedKeys); + } doc.emit("load", [doc]); // sets the "isLoaded" to true on the doc return doc; @@ -333,51 +298,41 @@ export class YjsStorage { } // When the YJS doc changes, update it in durable storage - private async handleYDocUpdate(doc: Y.Doc): Promise { + private async handleYDocUpdate( + doc: Y.Doc, + update: Uint8Array, + isV2: boolean | undefined + ): Promise { + // Todo: in the future, we should pass this detail to the driver so it can store the version as metadata + // this will be easy for sqlite drivers, but not for the KV driver + const v1update = isV2 ? Y.convertUpdateFormatV2ToV1(update) : update; const docId: YDocId = doc.guid === this.doc.guid ? ROOT_YDOC_ID : (doc.guid as Guid); - const docUpdateInfo = this.lastUpdatesById.get(docId); - // get the update since last vector - const updateSinceLastVector = Y.encodeStateAsUpdate( - doc, - docUpdateInfo?.lastVector - ); - // this should happen before the await on putYDoc to avoid race conditions - // but we need the current key before, so store it here - const storageKey = docUpdateInfo?.currentKey ?? nanoid(); - if (updateSinceLastVector.length > MAX_Y_UPDATE_SIZE) { - // compress update, not using the vector, we want to write the whole doc - const newKey = nanoid(); - await this.driver.write_y_updates( - docId, - newKey, - Y.encodeStateAsUpdate(doc) - ); - // delete all old keys on disk - await this.driver.delete_y_updates( - docId, - Array.from(this.keysById.getOrCreate(docId)) - ); - // update the keys we have stored - this.keysById.set(docId, new Set([newKey])); - // future updates will write from this vector and to this key - this.lastUpdatesById.set(docId, { - currentKey: nanoid(), // start writing to a new key - lastVector: Y.encodeStateVector(doc), - }); + + const storedKeys = this.storedKeysById.get(docId); + + // Every UPDATE_COUNT_THRESHOLD updates, we compact the updates + if (this.shouldCompact(storedKeys)) { + await this._compactYJSUpdates(doc, docId, storedKeys || []); + return; + } + + // the whole concept of storing keys is not needed when we kill the kv driver, all of this stuff is trivial in sqlite + const newKey = nanoid(); + await this.driver.write_y_updates(docId, newKey, v1update); + + // update the stored keys, which we'll need for compaction. + if (!storedKeys) { + this.storedKeysById.set(docId, [newKey]); } else { - // in this case, the update is small enough, just overwrite it - await this.driver.write_y_updates( - docId, - storageKey, - updateSinceLastVector - ); - const keys = [storageKey]; - // keep track of keys used - const currentKeys = this.keysById.getOrCreate(docId); - for (const key of keys) { - currentKeys.add(key); - } + storedKeys.push(newKey); + } + } + + private shouldCompact(storedKeys: string[] | undefined): boolean { + if (!storedKeys) { + return false; } + return storedKeys.length >= this.updateCountThreshold; } }