From 358d75820fc5159a018e2be96778bffad563f24c Mon Sep 17 00:00:00 2001 From: Noa Date: Thu, 5 Mar 2026 12:08:31 -0500 Subject: [PATCH] [TS] Allow brotli to be specified for compression and reorganize some websocket stuff --- .../src/lib/binary_writer.ts | 2 +- .../src/sdk/db_connection_builder.ts | 23 +++-- .../src/sdk/db_connection_impl.ts | 17 ++-- .../bindings-typescript/src/sdk/decompress.ts | 30 ++----- .../src/sdk/websocket_decompress_adapter.ts | 89 +++---------------- .../src/sdk/websocket_test_adapter.ts | 19 +--- crates/bindings-typescript/src/sdk/ws.ts | 83 ++++++++++++++++- .../tests/db_connection.test.ts | 16 ++-- 8 files changed, 135 insertions(+), 144 deletions(-) diff --git a/crates/bindings-typescript/src/lib/binary_writer.ts b/crates/bindings-typescript/src/lib/binary_writer.ts index a66310745ce..fa5a6fb7fc7 100644 --- a/crates/bindings-typescript/src/lib/binary_writer.ts +++ b/crates/bindings-typescript/src/lib/binary_writer.ts @@ -63,7 +63,7 @@ export default class BinaryWriter { return fromByteArray(this.getBuffer()); } - getBuffer(): Uint8Array { + getBuffer(): Uint8Array { return new Uint8Array(this.buffer.buffer, 0, this.offset); } diff --git a/crates/bindings-typescript/src/sdk/db_connection_builder.ts b/crates/bindings-typescript/src/sdk/db_connection_builder.ts index 43bda10f68c..282cab02d37 100644 --- a/crates/bindings-typescript/src/sdk/db_connection_builder.ts +++ b/crates/bindings-typescript/src/sdk/db_connection_builder.ts @@ -8,6 +8,7 @@ import type { } from '../'; import { ensureMinimumVersionOrThrow } from './version'; import { WebsocketDecompressAdapter } from './websocket_decompress_adapter'; +import type { WebSocketFactory } from './ws'; /** * The database client connection to a SpacetimeDB server. @@ -23,10 +24,10 @@ export class DbConnectionBuilder> { #identity?: Identity; #token?: string; #emitter: EventEmitter = new EventEmitter(); - #compression: 'gzip' | 'none' = 'gzip'; + #compression: 'gzip' | 'brotli' | 'none' = 'gzip'; #lightMode: boolean = false; #confirmedReads?: boolean; - #createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn; + #createWSFn: WebSocketFactory; /** * Creates a new `DbConnectionBuilder` database client and set the initial parameters. @@ -42,7 +43,7 @@ export class DbConnectionBuilder> { config: DbConnectionConfig> ) => DbConnection ) { - this.#createWSFn = WebsocketDecompressAdapter.createWebSocketFn; + this.#createWSFn = WebsocketDecompressAdapter.openWebSocket; } /** @@ -82,9 +83,7 @@ export class DbConnectionBuilder> { return this; } - withWSFn( - createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn - ): this { + withWSFn(createWSFn: WebSocketFactory): this { this.#createWSFn = createWSFn; return this; } @@ -94,7 +93,17 @@ export class DbConnectionBuilder> { * * @param compression The compression algorithm to use for the connection. */ - withCompression(compression: 'gzip' | 'none'): this { + withCompression(compression: 'gzip' | 'brotli' | 'none'): this { + if (compression === 'brotli') { + try { + new DecompressionStream('brotli' as CompressionFormat); + } catch (e) { + throw new TypeError( + `Brotli compression is not supported by the runtime. Please choose a different compression method.`, + { cause: e } + ); + } + } this.#compression = compression; return this; } diff --git a/crates/bindings-typescript/src/sdk/db_connection_impl.ts b/crates/bindings-typescript/src/sdk/db_connection_impl.ts index c2c26123bfe..bc65f87aabe 100644 --- a/crates/bindings-typescript/src/sdk/db_connection_impl.ts +++ b/crates/bindings-typescript/src/sdk/db_connection_impl.ts @@ -37,10 +37,6 @@ import { type PendingCallback, type TableUpdate as CacheTableUpdate, } from './table_cache.ts'; -import { - WebsocketDecompressAdapter, - type WebsocketAdapter, -} from './websocket_decompress_adapter.ts'; import { SubscriptionBuilderImpl, SubscriptionHandleImpl, @@ -60,6 +56,7 @@ import type { ProceduresView } from './procedures.ts'; import type { Values } from '../lib/type_util.ts'; import type { TransactionUpdate } from './client_api/types.ts'; import { InternalError, SenderError } from '../lib/errors.ts'; +import type { WebSocketAdapter, WebSocketFactory } from './ws.ts'; export { DbConnectionBuilder, @@ -89,8 +86,8 @@ export type DbConnectionConfig = { identity?: Identity; token?: string; emitter: EventEmitter; - createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn; - compression: 'gzip' | 'none'; + createWSFn: WebSocketFactory; + compression: 'gzip' | 'brotli' | 'none'; lightMode: boolean; confirmedReads?: boolean; remoteModule: RemoteModule; @@ -148,7 +145,7 @@ export class DbConnectionImpl #eventId = 0; #emitter: EventEmitter; #messageQueue = Promise.resolve(); - #outboundQueue: Uint8Array[] = []; + #outboundQueue: Uint8Array[] = []; #subscriptionManager = new SubscriptionManager(); #remoteModule: RemoteModule; #reducerCallbacks = new Map< @@ -173,8 +170,8 @@ export class DbConnectionImpl // private fields. // We use them in testing. private clientCache: ClientCache; - private ws?: WebsocketAdapter; - private wsPromise: Promise; + private ws?: WebSocketAdapter; + private wsPromise: Promise; constructor({ uri, @@ -541,7 +538,7 @@ export class DbConnectionImpl return this.#mergeTableUpdates(updates); } - #flushOutboundQueue(wsResolved: WebsocketAdapter): void { + #flushOutboundQueue(wsResolved: WebSocketAdapter): void { const pending = this.#outboundQueue.splice(0); for (const message of pending) { wsResolved.send(message); diff --git a/crates/bindings-typescript/src/sdk/decompress.ts b/crates/bindings-typescript/src/sdk/decompress.ts index cdc49a2241a..8c35e0d90ba 100644 --- a/crates/bindings-typescript/src/sdk/decompress.ts +++ b/crates/bindings-typescript/src/sdk/decompress.ts @@ -1,12 +1,11 @@ export async function decompress( - buffer: Uint8Array, - // Leaving it here to expand to brotli when it lands in the browsers and NodeJS - type: 'gzip', + buffer: Uint8Array, + type: CompressionFormat, chunkSize: number = 128 * 1024 // 128KB ): Promise { // Create a single ReadableStream to handle chunks let offset = 0; - const readableStream = new ReadableStream({ + const readableStream = new ReadableStream({ pull(controller) { if (offset < buffer.length) { // Slice a chunk of the buffer and enqueue it @@ -29,24 +28,9 @@ export async function decompress( const decompressedStream = readableStream.pipeThrough(decompressionStream); // Collect the decompressed chunks efficiently - const reader = decompressedStream.getReader(); - const chunks: Uint8Array[] = []; - let totalLength = 0; - let result: any; - - while (!(result = await reader.read()).done) { - chunks.push(result.value); - totalLength += result.value.length; + const chunks = []; + for await (const chunk of decompressedStream) { + chunks.push(chunk); } - - // Allocate a single Uint8Array for the decompressed data - const decompressedArray = new Uint8Array(totalLength); - let chunkOffset = 0; - - for (const chunk of chunks) { - decompressedArray.set(chunk, chunkOffset); - chunkOffset += chunk.length; - } - - return decompressedArray; + return new Blob(chunks).bytes(); } diff --git a/crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts b/crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts index 40157393dd1..1038df17933 100644 --- a/crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts +++ b/crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts @@ -1,17 +1,7 @@ import { decompress } from './decompress'; -import { resolveWS } from './ws'; +import { openWebSocket, type WebSocketAdapter, type WebSocketArgs } from './ws'; -export interface WebsocketAdapter { - send(msg: Uint8Array): void; - close(): void; - - set onclose(handler: (ev: CloseEvent) => void); - set onopen(handler: () => void); - set onmessage(handler: (msg: { data: Uint8Array }) => void); - set onerror(handler: (msg: ErrorEvent) => void); -} - -export class WebsocketDecompressAdapter implements WebsocketAdapter { +export class WebsocketDecompressAdapter implements WebSocketAdapter { set onclose(handler: (ev: CloseEvent) => void) { this.#ws.onclose = handler; } @@ -30,16 +20,17 @@ export class WebsocketDecompressAdapter implements WebsocketAdapter { #ws: WebSocket; - async #decompress(buffer: Uint8Array): Promise { + async #decompress(buffer: Uint8Array): Promise { const tag = buffer[0]; const data = buffer.subarray(1); switch (tag) { case 0: return data; case 1: - throw new Error( - 'Brotli Compression not supported. Please use gzip or none compression in withCompression method on DbConnection.' - ); + // Some runtimes support brotli, but it's not yet defined in `lib.dom.d.ts`. + // We assert runtime support in `DbConnectionBuilder.withCompression`, so + // this cast is safe. + return await decompress(data, 'brotli' as CompressionFormat); case 2: return await decompress(data, 'gzip'); default: @@ -49,7 +40,7 @@ export class WebsocketDecompressAdapter implements WebsocketAdapter { } } - send(msg: Uint8Array): void { + send(msg: Uint8Array): void { this.#ws.send(msg); } @@ -58,68 +49,12 @@ export class WebsocketDecompressAdapter implements WebsocketAdapter { } constructor(ws: WebSocket) { - ws.binaryType = 'arraybuffer'; - this.#ws = ws; } - static async createWebSocketFn({ - url, - nameOrAddress, - wsProtocol, - authToken, - compression, - lightMode, - confirmedReads, - }: { - url: URL; - wsProtocol: string; - nameOrAddress: string; - authToken?: string; - compression: 'gzip' | 'none'; - lightMode: boolean; - confirmedReads?: boolean; - }): Promise { - const headers = new Headers(); - - const WS = await resolveWS(); - - // We swap our original token to a shorter-lived token - // to avoid sending the original via query params. - let temporaryAuthToken: string | undefined = undefined; - if (authToken) { - headers.set('Authorization', `Bearer ${authToken}`); - const tokenUrl = new URL('v1/identity/websocket-token', url); - tokenUrl.protocol = url.protocol === 'wss:' ? 'https:' : 'http:'; - - const response = await fetch(tokenUrl, { method: 'POST', headers }); - if (response.ok) { - const { token } = await response.json(); - temporaryAuthToken = token; - } else { - return Promise.reject( - new Error(`Failed to verify token: ${response.statusText}`) - ); - } - } - - const databaseUrl = new URL(`v1/database/${nameOrAddress}/subscribe`, url); - if (temporaryAuthToken) { - databaseUrl.searchParams.set('token', temporaryAuthToken); - } - databaseUrl.searchParams.set( - 'compression', - compression === 'gzip' ? 'Gzip' : 'None' - ); - if (lightMode) { - databaseUrl.searchParams.set('light', 'true'); - } - if (confirmedReads !== undefined) { - databaseUrl.searchParams.set('confirmed', confirmedReads.toString()); - } - - const ws = new WS(databaseUrl.toString(), wsProtocol); - - return new WebsocketDecompressAdapter(ws); + static async openWebSocket( + args: WebSocketArgs + ): Promise { + return new this(await openWebSocket(args)); } } diff --git a/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts b/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts index 6ac15f0e7fe..8d33b426a2e 100644 --- a/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts +++ b/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts @@ -1,8 +1,8 @@ import { BinaryReader, BinaryWriter } from '../'; import { ClientMessage, ServerMessage } from './client_api/types'; -import type { WebsocketAdapter } from './websocket_decompress_adapter'; +import type { WebSocketAdapter, WebSocketFactory } from './ws'; -class WebsocketTestAdapter implements WebsocketAdapter { +class WebsocketTestAdapter implements WebSocketAdapter { onclose: any; // eslint-disable-next-line @typescript-eslint/no-unsafe-function-type onopen!: () => void; @@ -19,7 +19,7 @@ class WebsocketTestAdapter implements WebsocketAdapter { this.closed = false; } - send(message: any): void { + send(message: Uint8Array): void { const parsedMessage = ClientMessage.deserialize(new BinaryReader(message)); this.outgoingMessages.push(parsedMessage); // console.ClientMessageSerde.deserialize(message); @@ -47,18 +47,7 @@ class WebsocketTestAdapter implements WebsocketAdapter { this.onmessage({ data: rawBytes }); } - async createWebSocketFn(_args: { - url: URL; - wsProtocol: string; - nameOrAddress: string; - authToken?: string; - compression: 'gzip' | 'none'; - lightMode: boolean; - confirmedReads?: boolean; - }): Promise { - return this; - } + openWebSocket: WebSocketFactory = async () => this; } -export type { WebsocketTestAdapter }; export default WebsocketTestAdapter; diff --git a/crates/bindings-typescript/src/sdk/ws.ts b/crates/bindings-typescript/src/sdk/ws.ts index fb7279d6e50..6a77a404f34 100644 --- a/crates/bindings-typescript/src/sdk/ws.ts +++ b/crates/bindings-typescript/src/sdk/ws.ts @@ -1,9 +1,9 @@ import { stdbLogger } from './logger'; -export async function resolveWS(): Promise { +async function resolveWS(): Promise { // Browser or Node >= 22 (or any env that exposes global WebSocket) - if (typeof (globalThis as any).WebSocket !== 'undefined') { - return (globalThis as any).WebSocket as typeof WebSocket; + if (typeof WebSocket !== 'undefined') { + return WebSocket; } // Node without a global WebSocket: lazily load undici's polyfill. @@ -25,3 +25,80 @@ export async function resolveWS(): Promise { throw err; } } + +export interface WebSocketAdapter { + send(msg: Uint8Array): void; + close(): void; + + set onclose(handler: (ev: CloseEvent) => void); + set onopen(handler: () => void); + set onmessage(handler: (msg: { data: Uint8Array }) => void); + set onerror(handler: (msg: ErrorEvent) => void); +} + +export interface WebSocketArgs { + url: URL; + wsProtocol: string; + nameOrAddress: string; + authToken?: string; + compression: 'gzip' | 'brotli' | 'none'; + lightMode: boolean; + confirmedReads?: boolean; +} +export type WebSocketFactory = ( + args: WebSocketArgs +) => Promise; + +/** + * Open a WebSocket to the database specified by the given `WebSocketArgs`. + * @returns a WebSocket with `binaryType` set to `arraybuffer`. + */ +export async function openWebSocket({ + url, + nameOrAddress, + wsProtocol, + authToken, + compression, + lightMode, + confirmedReads, +}: WebSocketArgs): Promise { + const headers = new Headers(); + + const WS = await resolveWS(); + + // We swap our original token to a shorter-lived token + // to avoid sending the original via query params. + let temporaryAuthToken: string | undefined; + if (authToken) { + headers.set('Authorization', `Bearer ${authToken}`); + const tokenUrl = new URL('v1/identity/websocket-token', url); + tokenUrl.protocol = url.protocol === 'wss:' ? 'https:' : 'http:'; + + const response = await fetch(tokenUrl, { method: 'POST', headers }); + if (response.ok) { + const { token } = await response.json(); + temporaryAuthToken = token; + } else { + throw new Error(`Failed to verify token: ${response.statusText}`); + } + } + + const databaseUrl = new URL(`v1/database/${nameOrAddress}/subscribe`, url); + if (temporaryAuthToken) { + databaseUrl.searchParams.set('token', temporaryAuthToken); + } + databaseUrl.searchParams.set( + 'compression', + { gzip: 'Gzip', brotli: 'Brotli', none: 'None' }[compression] ?? 'None' + ); + if (lightMode) { + databaseUrl.searchParams.set('light', 'true'); + } + if (confirmedReads !== undefined) { + databaseUrl.searchParams.set('confirmed', confirmedReads.toString()); + } + + const ws = new WS(databaseUrl.toString(), wsProtocol); + ws.binaryType = 'arraybuffer'; + return ws; +} diff --git a/crates/bindings-typescript/tests/db_connection.test.ts b/crates/bindings-typescript/tests/db_connection.test.ts index ec17430e41a..6a0e6614cd3 100644 --- a/crates/bindings-typescript/tests/db_connection.test.ts +++ b/crates/bindings-typescript/tests/db_connection.test.ts @@ -172,7 +172,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .onConnect(() => { called = true; onConnectPromise.resolve(); @@ -200,7 +200,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .onDisconnect(() => { onDisconnectPromise.resolve(); }) @@ -226,7 +226,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .build(); await client['wsPromise']; @@ -268,7 +268,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .onConnect(() => { onConnectPromise.resolve(); }) @@ -334,7 +334,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .onConnect(() => { onConnectPromise.resolve(); }) @@ -379,7 +379,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .onConnect(() => { onConnectPromise.resolve(); }) @@ -656,7 +656,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .onConnect(() => {}) .build(); @@ -747,7 +747,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .build(); await client['wsPromise']; const user1 = { identity: bobIdentity, username: 'bob' };