Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions packages/liveblocks-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@
<a href="https://github.com/liveblocks/liveblocks/blob/main/licenses/LICENSE-AGPL-3.0"><img src="https://img.shields.io/badge/license-AGPL--3.0-green" alt="License" /></a>
</p>

`@liveblocks/server` provides the core Liveblocks server functionality. It
powers both the Liveblocks production environment, and
[our dev server](https://liveblocks.io/docs/tools/dev-server), so the behavior
is identical. You typically don't need to install or use this package directly.
`@liveblocks/server` provides the core Liveblocks server functionality, powering
both the Liveblocks production environment, and [dev server][devserver], so the
behavior is identical. **You typically don't need to install or use this package
directly.**

If you are interested in running Liveblocks locally or within your CI
environment, check out the [dev server][devserver] instead (which has an actual
web server wrapped around it, contains documentation, and a Dockerfile).

While `@liveblocks/server` contains the same core technology that powers
Liveblocks, we do not yet offer or recommend self-hosting or on-premises
Expand All @@ -29,3 +33,5 @@ Licensed under the GNU Affero General Public License v3.0 or later, Copyright ©
2021-present [Liveblocks](https://liveblocks.io).

See [LICENSE-AGPL-3.0](./LICENSE) for more information.

[devserver]: https://liveblocks.io/docs/tools/dev-server
4 changes: 2 additions & 2 deletions packages/liveblocks-server/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@liveblocks/server",
"version": "1.0.14",
"version": "1.0.15",
"description": "Liveblocks backend server foundation.",
"type": "module",
"main": "./dist/index.js",
Expand Down Expand Up @@ -58,7 +58,7 @@
"dependencies": {
"@liveblocks/core": "3.14.0",
"async-mutex": "^0.4.0",
"decoders": "^2.8.0",
"decoders": "^2.9.0-pre.4",
"itertools": "^2.3.2",
"js-base64": "^3.7.5",
"nanoid": "^3",
Expand Down
6 changes: 3 additions & 3 deletions tools/liveblocks-cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "liveblocks",
"version": "1.0.14",
"version": "1.0.15",
"description": "Liveblocks command line interface",
"type": "module",
"bin": {
Expand Down Expand Up @@ -40,9 +40,9 @@
},
"dependencies": {
"@liveblocks/core": "3.14.0",
"@liveblocks/server": "1.0.14",
"@liveblocks/server": "1.0.15",
"@liveblocks/zenrouter": "^1.0.17",
"decoders": "^2.8.0",
"decoders": "^2.9.0-pre.4",
"js-base64": "^3.7.5",
"yjs": "^13.6.10"
}
Expand Down
11 changes: 11 additions & 0 deletions tools/liveblocks-cli/src/dev-server/db/rooms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ export async function remove(roomId: string): Promise<void> {
}
}

/**
* Unload all room instances and clear the instances map, but keep SQLite
* files on disk so storage survives a reboot.
*/
export function unloadAll(): void {
for (const room of instances.values()) {
room.unload();
}
instances.clear();
}

/**
* Unload all room instances and, if in ephemeral mode, remove the temp
* directory.
Expand Down
242 changes: 140 additions & 102 deletions tools/liveblocks-cli/src/dev-server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,118 +174,138 @@ const dev: SubCommand = {
process.exit(1);
}

const server = Bun.serve<SocketData>({
hostname,
port,

async fetch(req, server) {
// WebSocket bypass - handle upgrades directly
if (req.headers.get("Upgrade") === "websocket") {
const authResult = authorizeWebSocket(req);

if (!authResult.ok) {
warn(authResult.xwarn, true);
return refuseSocketConnection(
server,
req,
CloseCode.NOT_ALLOWED,
"You have no access to this room"
);
let server: Bun.Server<SocketData>;

function createServer() {
return Bun.serve<SocketData>({
hostname,
port,

async fetch(req, server) {
// WebSocket bypass - handle upgrades directly
if (req.headers.get("Upgrade") === "websocket") {
const authResult = authorizeWebSocket(req);

if (!authResult.ok) {
warn(authResult.xwarn, true);
return refuseSocketConnection(
server,
req,
CloseCode.NOT_ALLOWED,
"You have no access to this room"
);
}

const { roomId, ticketData } = authResult;

// Look up or create the room for the requested room ID
const room = RoomsDB.getOrCreate(roomId);
await room.load();

const ticket = await room.createTicket(ticketData);
const sessionKey = ticket.sessionKey;
const success = server.upgrade(req, {
data: { room, ticket, sessionKey },
});
if (success) {
// Bun automatically returns a 101 Switching Protocols
// if the upgrade succeeds
console.log(
`${green("101")} WS ${new URL(req.url).pathname}${dim(` - ${roomId}`)}`
);
return undefined;
}

return new Response("Could not upgrade to WebSocket", {
status: 426,
});
}

const { roomId, ticketData } = authResult;
// Force-reboot: drop all connections (clients see 1006) and restart
const url = new URL(req.url);
if (req.method === "POST" && url.pathname === "/crash") {
console.log(`${green("204")} POST /crash`);
setTimeout(() => void reboot(), 0);
return new Response(null, { status: 204 });
}

// Look up or create the room for the requested room ID
const room = RoomsDB.getOrCreate(roomId);
await room.load();
// Defer all other routing to ZenRouter
// TODO: Maybe port this logging to ZenRouter natively
const route = `${req.method} ${url.pathname}`;
const resp = await zen.fetch(req);
const status = resp.status;
const colorStatus =
status >= 500
? red(String(status))
: status >= 400
? yellow(String(status))
: green(String(status));
console.log(`${colorStatus} ${route}`);
const warnMsg = resp.headers.get("X-LB-Warn") ?? undefined;
warn(warnMsg, !resp.ok);
return resp;
},

const ticket = await room.createTicket(ticketData);
const sessionKey = ticket.sessionKey;
const success = server.upgrade(req, {
data: { room, ticket, sessionKey },
});
if (success) {
// Bun automatically returns a 101 Switching Protocols
// if the upgrade succeeds
console.log(
`${green("101")} WS ${new URL(req.url).pathname}${dim(` - ${roomId}`)}`
);
return undefined;
}
// Bun will call this if an error happens during the handling of an HTTP request
error(err): Response {
// YYY Define a lint rule that forbids the use of `console`, in favor of
// using `ctx.logger`
console.error(err);
return new Response("An unknown error occurred", { status: 500 });
},

return new Response("Could not upgrade to WebSocket", {
status: 426,
});
}
websocket: {
// The socket is opened
async open(ws): Promise<void> {
const { refuseConnection, room, ticket } = ws.data;

// Defer all other routing to ZenRouter
// TODO: Maybe port this logging to ZenRouter natively
const url = new URL(req.url);
const route = `${req.method} ${url.pathname}`;
const resp = await zen.fetch(req);
const status = resp.status;
const colorStatus =
status >= 500
? red(String(status))
: status >= 400
? yellow(String(status))
: green(String(status));
console.log(`${colorStatus} ${route}`);
const warnMsg = resp.headers.get("X-LB-Warn") ?? undefined;
warn(warnMsg, !resp.ok);
return resp;
},

// Bun will call this if an error happens during the handling of an HTTP request
error(err): Response {
// YYY Define a lint rule that forbids the use of `console`, in favor of
// using `ctx.logger`
console.error(err);
return new Response("An unknown error occurred", { status: 500 });
},

websocket: {
// The socket is opened
async open(ws): Promise<void> {
const { refuseConnection, room, ticket } = ws.data;

// If this connection should be refused, close it immediately
if (refuseConnection) {
ws.close(refuseConnection.code, refuseConnection.message);
return;
}
// If this connection should be refused, close it immediately
if (refuseConnection) {
ws.close(refuseConnection.code, refuseConnection.message);
return;
}

if (room && ticket) {
await room.startBrowserSession(ticket, ws);
}
},
if (room && ticket) {
await room.startBrowserSession(ticket, ws);
}
},

// A message is received
async message(ws, data): Promise<void> {
const { room, sessionKey } = ws.data;
// Ignore messages for refused connections
if (room && sessionKey) {
await room.handleData(sessionKey, data);
}
},
// A message is received
async message(ws, data): Promise<void> {
const { room, sessionKey } = ws.data;
// Ignore messages for refused connections
if (room && sessionKey) {
await room.handleData(sessionKey, data);
}
},

// The socket is closed by the client side
close(ws, code, message): void {
const { room, sessionKey } = ws.data;
// Ignore close events for refused connections
if (room && sessionKey) {
room.endBrowserSession(sessionKey, code, message);
}
// The socket is closed by the client side
close(ws, code, message): void {
const { room, sessionKey } = ws.data;
// Ignore close events for refused connections
if (room && sessionKey) {
room.endBrowserSession(sessionKey, code, message);
}
},

// The socket is ready to receive more data
// drain(ws): void {
// // Will be invoked if a previous .send() message returned -1 (there
// // was back pressure), but now (at a later moment) the socket is ready
// // to receive more data, so we may want to re-attempt this.
// },
},
});
}

// The socket is ready to receive more data
// drain(ws): void {
// // Will be invoked if a previous .send() message returned -1 (there
// // was back pressure), but now (at a later moment) the socket is ready
// // to receive more data, so we may want to re-attempt this.
// },
},
});
server = createServer();

async function reboot() {
RoomsDB.unloadAll();
await server.stop(true);
server = createServer();
console.log("Crash \uD83D\uDCA5");
}

// -----------------------------------------------------------------------------

Expand Down Expand Up @@ -350,19 +370,37 @@ const dev: SubCommand = {
dim("Press ") +
bold("q") +
dim(" to quit, ") +
bold("!") +
dim(" to crash, ") +
bold("c") +
dim(" to clear")
);

let rebootTimer: ReturnType<typeof setTimeout> | null = null;

process.stdin.setRawMode?.(true);
process.stdin.resume();
process.stdin.on("data", (data: Buffer) => {
const ch = data.toString();
if (ch === "q" || ch === "\x03" /* Ctrl-C */) {
void server.stop().then(() => {
void server.stop(true).then(() => {
RoomsDB.cleanup();
process.exit(0);
});
} else if (ch === "!") {
if (rebootTimer !== null) {
clearTimeout(rebootTimer);
rebootTimer = null;
void reboot();
} else {
console.log(
"Simulating crash in 2.5s... (press ! again to crash now)"
);
rebootTimer = setTimeout(() => {
rebootTimer = null;
void reboot();
}, 2500);
}
} else if (ch === "c") {
console.clear();
} else if (ch === "p") {
Expand Down
Loading