Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/nuxtjs-live-avatars/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions examples/react-native-todo-list/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

181 changes: 68 additions & 113 deletions packages/liveblocks-server/src/YjsStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import { DefaultMap } from "@liveblocks/core";
import { Base64 } from "js-base64";
import { nanoid } from "nanoid";
import * as Y from "yjs";
Expand All @@ -25,30 +24,24 @@ import { ROOT_YDOC_ID } from "~/decoders";
import type { IStorageDriver } from "~/interfaces";
import type { Logger } from "~/lib/Logger";

// How big an update can be until we compress all individual updates into
// a single vector and persist that instead (i.e. when we trigger "garbage
// collection")
const MAX_Y_UPDATE_SIZE = 100_000;

type YUpdateInfo = {
currentKey: string;
lastVector: Uint8Array | undefined;
};
// How many updates to store before compacting
const UPDATE_COUNT_THRESHOLD = 1_000;

export class YjsStorage {
private readonly driver: IStorageDriver;
private readonly updateCountThreshold: number;

private readonly doc: Y.Doc = new Y.Doc(); // the root document
private readonly lastUpdatesById = new Map<YDocId, YUpdateInfo>();
private readonly lastSnapshotById = new Map<YDocId, Y.Snapshot>();
// Keeps track of which keys are loaded, so we can clean them up without calling `.list()`
private readonly keysById = new DefaultMap<YDocId, Set<string>>(
() => new Set()
);
private readonly initPromisesById: Map<YDocId, Promise<Y.Doc>> = new Map();
private readonly storedKeysById: Map<YDocId, string[]> = new Map();

constructor(driver: IStorageDriver) {
constructor(
driver: IStorageDriver,
updateCountThreshold: number = UPDATE_COUNT_THRESHOLD
) {
this.driver = driver;
this.updateCountThreshold = updateCountThreshold;
this.doc.on("subdocs", ({ removed }) => {
removed.forEach((subdoc: Y.Doc) => {
subdoc.destroy(); // will remove listeners
Expand Down Expand Up @@ -138,7 +131,9 @@ export class YjsStorage {

/**
* @param update base64 encoded uint8array
* @returns
* @returns { isUpdated: boolean; snapshotHash: string }
* isUpdated: true if the update had an effect on the YDoc
* snapshotHash: the hash of the new snapshot
*/
public async addYDocUpdate(
logger: Logger,
Expand All @@ -163,7 +158,7 @@ export class YjsStorage {
// Check the snapshot before/after to see if the update had an effect
const updated = !Y.equalSnapshots(beforeSnapshot, afterSnapshot);
if (updated) {
await this.handleYDocUpdate(doc);
await this.handleYDocUpdate(doc, updateAsU8, isV2);
}

return {
Expand Down Expand Up @@ -208,9 +203,7 @@ export class YjsStorage {
//
// this.doc = new Y.Doc();
// this.initPromisesById.clear();
// this.lastUpdatesById.clear();
// this.keysById.clear();
// this.initPromisesById.clear();
// this.lastSnapshotById.clear();
}

// ------------------------------------------------------------------------------------
Expand All @@ -236,64 +229,36 @@ export class YjsStorage {
this.lastSnapshotById.set(docId, snapshot);
return snapshot;
}
/**
* Given a record of updates, merge them and compress if savings are significant
*/
private _loadAndCompressYJSUpdates = async (
docUpdates: Record<string, Uint8Array>,

// compact the updates into a single update and write it to the durable storage
private _compactYJSUpdates = async (
doc: Y.Doc,
docId: YDocId
docId: YDocId,
storedKeys: string[]
): Promise<void> => {
// the percent we need to save to trigger re-writing storage, ie. only rewrite storage if we save more than 20%
const SAVINGS_THRESHOLD = 0.2;
// get all updates from disk
const updates = Object.values(docUpdates);
// uint8arrays size on disk is equal to their length, combine them to see how much we're using
const sizeOnDisk = updates.reduce((acc, update) => {
return acc + update.length;
}, 0);
if (updates.length > 0) {
const docKeys = Object.keys(docUpdates);
// keep track of keys in use
this.keysById.set(docId, new Set(docKeys));

const mergedUpdate = Y.mergeUpdates(updates);
// Garbage collection won't happen unless we actually apply the update
Y.applyUpdate(doc, mergedUpdate);

// get the update so we can check out how big it is
const garbageCollectedUpdate = Y.encodeStateAsUpdate(doc);

if (
garbageCollectedUpdate.length <
sizeOnDisk * (1 - SAVINGS_THRESHOLD)
) {
const newKey = nanoid();
await this.driver.write_y_updates(
docId,
newKey,
garbageCollectedUpdate
);
// delete all old keys, we're going to write new merged updates
await this.driver.delete_y_updates(docId, docKeys);
this.keysById.set(docId, new Set([newKey]));
}
}
const compactedUpdate = Y.encodeStateAsUpdate(doc);
const newKey = nanoid();
await this.driver.write_y_updates(docId, newKey, compactedUpdate);
// Todo: after we kill the kv driver, we should have an overwrite method in the driverso we don't need to delete and write
await this.driver.delete_y_updates(docId, storedKeys);
this.storedKeysById.set(docId, [newKey]);
};

private _loadYDocFromDurableStorage = async (
doc: Y.Doc,
docId: YDocId
): Promise<Y.Doc> => {
const docUpdates = Object.fromEntries(
await this.driver.iter_y_updates(docId)
);
await this._loadAndCompressYJSUpdates(docUpdates, doc, docId);
// store the vector of the last update
this.lastUpdatesById.set(docId, {
currentKey: nanoid(),
lastVector: Y.encodeStateVector(doc),
});
const storedKeys: string[] = [];
for (const [key, update] of await this.driver.iter_y_updates(docId)) {
Y.applyUpdate(doc, update);
storedKeys.push(key);
}
// after compaction, there will only be one unique key.
if (this.shouldCompact(storedKeys)) {
await this._compactYJSUpdates(doc, docId, storedKeys);
} else {
this.storedKeysById.set(docId, storedKeys);
}
doc.emit("load", [doc]); // sets the "isLoaded" to true on the doc

return doc;
Expand Down Expand Up @@ -333,51 +298,41 @@ export class YjsStorage {
}

// When the YJS doc changes, update it in durable storage
private async handleYDocUpdate(doc: Y.Doc): Promise<void> {
private async handleYDocUpdate(
doc: Y.Doc,
update: Uint8Array,
isV2: boolean | undefined
): Promise<void> {
// Todo: in the future, we should pass this detail to the driver so it can store the version as metadata
// this will be easy for sqlite drivers, but not for the KV driver
const v1update = isV2 ? Y.convertUpdateFormatV2ToV1(update) : update;
const docId: YDocId =
doc.guid === this.doc.guid ? ROOT_YDOC_ID : (doc.guid as Guid);
const docUpdateInfo = this.lastUpdatesById.get(docId);
// get the update since last vector
const updateSinceLastVector = Y.encodeStateAsUpdate(
doc,
docUpdateInfo?.lastVector
);
// this should happen before the await on putYDoc to avoid race conditions
// but we need the current key before, so store it here
const storageKey = docUpdateInfo?.currentKey ?? nanoid();
if (updateSinceLastVector.length > MAX_Y_UPDATE_SIZE) {
// compress update, not using the vector, we want to write the whole doc
const newKey = nanoid();
await this.driver.write_y_updates(
docId,
newKey,
Y.encodeStateAsUpdate(doc)
);
// delete all old keys on disk
await this.driver.delete_y_updates(
docId,
Array.from(this.keysById.getOrCreate(docId))
);
// update the keys we have stored
this.keysById.set(docId, new Set([newKey]));
// future updates will write from this vector and to this key
this.lastUpdatesById.set(docId, {
currentKey: nanoid(), // start writing to a new key
lastVector: Y.encodeStateVector(doc),
});

const storedKeys = this.storedKeysById.get(docId);

// Every UPDATE_COUNT_THRESHOLD updates, we compact the updates
if (this.shouldCompact(storedKeys)) {
await this._compactYJSUpdates(doc, docId, storedKeys || []);
return;
}

// the whole concept of storing keys is not needed when we kill the kv driver, all of this stuff is trivial in sqlite
const newKey = nanoid();
await this.driver.write_y_updates(docId, newKey, v1update);

// update the stored keys, which we'll need for compaction.
if (!storedKeys) {
this.storedKeysById.set(docId, [newKey]);
} else {
// in this case, the update is small enough, just overwrite it
await this.driver.write_y_updates(
docId,
storageKey,
updateSinceLastVector
);
const keys = [storageKey];
// keep track of keys used
const currentKeys = this.keysById.getOrCreate(docId);
for (const key of keys) {
currentKeys.add(key);
}
storedKeys.push(newKey);
}
}

private shouldCompact(storedKeys: string[] | undefined): boolean {
if (!storedKeys) {
return false;
}
return storedKeys.length >= this.updateCountThreshold;
}
}
Loading