Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/bindings-typescript/src/lib/binary_writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export default class BinaryWriter {
return fromByteArray(this.getBuffer());
}

getBuffer(): Uint8Array {
getBuffer(): Uint8Array<ArrayBuffer> {
return new Uint8Array(this.buffer.buffer, 0, this.offset);
}

Expand Down
23 changes: 16 additions & 7 deletions crates/bindings-typescript/src/sdk/db_connection_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type {
} from '../';
import { ensureMinimumVersionOrThrow } from './version';
import { WebsocketDecompressAdapter } from './websocket_decompress_adapter';
import type { WebSocketFactory } from './ws';

/**
* The database client connection to a SpacetimeDB server.
Expand All @@ -23,10 +24,10 @@ export class DbConnectionBuilder<DbConnection extends DbConnectionImpl<any>> {
#identity?: Identity;
#token?: string;
#emitter: EventEmitter<ConnectionEvent> = new EventEmitter();
#compression: 'gzip' | 'none' = 'gzip';
#compression: 'gzip' | 'brotli' | 'none' = 'gzip';
#lightMode: boolean = false;
#confirmedReads?: boolean;
#createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn;
#createWSFn: WebSocketFactory;

/**
* Creates a new `DbConnectionBuilder` database client and set the initial parameters.
Expand All @@ -42,7 +43,7 @@ export class DbConnectionBuilder<DbConnection extends DbConnectionImpl<any>> {
config: DbConnectionConfig<RemoteModuleOf<DbConnection>>
) => DbConnection
) {
this.#createWSFn = WebsocketDecompressAdapter.createWebSocketFn;
this.#createWSFn = WebsocketDecompressAdapter.openWebSocket;
}

/**
Expand Down Expand Up @@ -82,9 +83,7 @@ export class DbConnectionBuilder<DbConnection extends DbConnectionImpl<any>> {
return this;
}

withWSFn(
createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn
): this {
withWSFn(createWSFn: WebSocketFactory): this {
this.#createWSFn = createWSFn;
return this;
}
Expand All @@ -94,7 +93,17 @@ export class DbConnectionBuilder<DbConnection extends DbConnectionImpl<any>> {
*
* @param compression The compression algorithm to use for the connection.
*/
withCompression(compression: 'gzip' | 'none'): this {
withCompression(compression: 'gzip' | 'brotli' | 'none'): this {
if (compression === 'brotli') {
try {
new DecompressionStream('brotli' as CompressionFormat);
} catch (e) {
throw new TypeError(
`Brotli compression is not supported by the runtime. Please choose a different compression method.`,
{ cause: e }
);
}
}
this.#compression = compression;
return this;
}
Expand Down
17 changes: 7 additions & 10 deletions crates/bindings-typescript/src/sdk/db_connection_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ import {
type PendingCallback,
type TableUpdate as CacheTableUpdate,
} from './table_cache.ts';
import {
WebsocketDecompressAdapter,
type WebsocketAdapter,
} from './websocket_decompress_adapter.ts';
import {
SubscriptionBuilderImpl,
SubscriptionHandleImpl,
Expand All @@ -60,6 +56,7 @@ import type { ProceduresView } from './procedures.ts';
import type { Values } from '../lib/type_util.ts';
import type { TransactionUpdate } from './client_api/types.ts';
import { InternalError, SenderError } from '../lib/errors.ts';
import type { WebSocketAdapter, WebSocketFactory } from './ws.ts';

export {
DbConnectionBuilder,
Expand Down Expand Up @@ -89,8 +86,8 @@ export type DbConnectionConfig<RemoteModule extends UntypedRemoteModule> = {
identity?: Identity;
token?: string;
emitter: EventEmitter<ConnectionEvent>;
createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn;
compression: 'gzip' | 'none';
createWSFn: WebSocketFactory;
compression: 'gzip' | 'brotli' | 'none';
lightMode: boolean;
confirmedReads?: boolean;
remoteModule: RemoteModule;
Expand Down Expand Up @@ -148,7 +145,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
#eventId = 0;
#emitter: EventEmitter<ConnectionEvent>;
#messageQueue = Promise.resolve();
#outboundQueue: Uint8Array[] = [];
#outboundQueue: Uint8Array<ArrayBuffer>[] = [];
#subscriptionManager = new SubscriptionManager<RemoteModule>();
#remoteModule: RemoteModule;
#reducerCallbacks = new Map<
Expand All @@ -173,8 +170,8 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
// private fields.
// We use them in testing.
private clientCache: ClientCache<RemoteModule>;
private ws?: WebsocketAdapter;
private wsPromise: Promise<WebsocketAdapter | undefined>;
private ws?: WebSocketAdapter;
private wsPromise: Promise<WebSocketAdapter | undefined>;

constructor({
uri,
Expand Down Expand Up @@ -541,7 +538,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
return this.#mergeTableUpdates(updates);
}

#flushOutboundQueue(wsResolved: WebsocketAdapter): void {
#flushOutboundQueue(wsResolved: WebSocketAdapter): void {
const pending = this.#outboundQueue.splice(0);
for (const message of pending) {
wsResolved.send(message);
Expand Down
30 changes: 7 additions & 23 deletions crates/bindings-typescript/src/sdk/decompress.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
export async function decompress(
buffer: Uint8Array,
// Leaving it here to expand to brotli when it lands in the browsers and NodeJS
type: 'gzip',
buffer: Uint8Array<ArrayBuffer>,
type: CompressionFormat,
chunkSize: number = 128 * 1024 // 128KB
): Promise<Uint8Array> {
// Create a single ReadableStream to handle chunks
let offset = 0;
const readableStream = new ReadableStream({
const readableStream = new ReadableStream<BufferSource>({
pull(controller) {
if (offset < buffer.length) {
// Slice a chunk of the buffer and enqueue it
Expand All @@ -29,24 +28,9 @@ export async function decompress(
const decompressedStream = readableStream.pipeThrough(decompressionStream);

// Collect the decompressed chunks efficiently
const reader = decompressedStream.getReader();
const chunks: Uint8Array[] = [];
let totalLength = 0;
let result: any;

while (!(result = await reader.read()).done) {
chunks.push(result.value);
totalLength += result.value.length;
const chunks = [];
for await (const chunk of decompressedStream) {
chunks.push(chunk);
}

// Allocate a single Uint8Array for the decompressed data
const decompressedArray = new Uint8Array(totalLength);
let chunkOffset = 0;

for (const chunk of chunks) {
decompressedArray.set(chunk, chunkOffset);
chunkOffset += chunk.length;
}

return decompressedArray;
return new Blob(chunks).bytes();
}
89 changes: 12 additions & 77 deletions crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
import { decompress } from './decompress';
import { resolveWS } from './ws';
import { openWebSocket, type WebSocketAdapter, type WebSocketArgs } from './ws';

export interface WebsocketAdapter {
send(msg: Uint8Array): void;
close(): void;

set onclose(handler: (ev: CloseEvent) => void);
set onopen(handler: () => void);
set onmessage(handler: (msg: { data: Uint8Array }) => void);
set onerror(handler: (msg: ErrorEvent) => void);
}

export class WebsocketDecompressAdapter implements WebsocketAdapter {
export class WebsocketDecompressAdapter implements WebSocketAdapter {
set onclose(handler: (ev: CloseEvent) => void) {
this.#ws.onclose = handler;
}
Expand All @@ -30,16 +20,17 @@ export class WebsocketDecompressAdapter implements WebsocketAdapter {

#ws: WebSocket;

async #decompress(buffer: Uint8Array): Promise<Uint8Array> {
async #decompress(buffer: Uint8Array<ArrayBuffer>): Promise<Uint8Array> {
const tag = buffer[0];
const data = buffer.subarray(1);
switch (tag) {
case 0:
return data;
case 1:
throw new Error(
'Brotli Compression not supported. Please use gzip or none compression in withCompression method on DbConnection.'
);
// Some runtimes support brotli, but it's not yet defined in `lib.dom.d.ts`.
// We assert runtime support in `DbConnectionBuilder.withCompression`, so
// this cast is safe.
return await decompress(data, 'brotli' as CompressionFormat);
case 2:
return await decompress(data, 'gzip');
default:
Expand All @@ -49,7 +40,7 @@ export class WebsocketDecompressAdapter implements WebsocketAdapter {
}
}

send(msg: Uint8Array): void {
send(msg: Uint8Array<ArrayBuffer>): void {
this.#ws.send(msg);
}

Expand All @@ -58,68 +49,12 @@ export class WebsocketDecompressAdapter implements WebsocketAdapter {
}

constructor(ws: WebSocket) {
ws.binaryType = 'arraybuffer';

this.#ws = ws;
}

static async createWebSocketFn({
url,
nameOrAddress,
wsProtocol,
authToken,
compression,
lightMode,
confirmedReads,
}: {
url: URL;
wsProtocol: string;
nameOrAddress: string;
authToken?: string;
compression: 'gzip' | 'none';
lightMode: boolean;
confirmedReads?: boolean;
}): Promise<WebsocketDecompressAdapter> {
const headers = new Headers();

const WS = await resolveWS();

// We swap our original token to a shorter-lived token
// to avoid sending the original via query params.
let temporaryAuthToken: string | undefined = undefined;
if (authToken) {
headers.set('Authorization', `Bearer ${authToken}`);
const tokenUrl = new URL('v1/identity/websocket-token', url);
tokenUrl.protocol = url.protocol === 'wss:' ? 'https:' : 'http:';

const response = await fetch(tokenUrl, { method: 'POST', headers });
if (response.ok) {
const { token } = await response.json();
temporaryAuthToken = token;
} else {
return Promise.reject(
new Error(`Failed to verify token: ${response.statusText}`)
);
}
}

const databaseUrl = new URL(`v1/database/${nameOrAddress}/subscribe`, url);
if (temporaryAuthToken) {
databaseUrl.searchParams.set('token', temporaryAuthToken);
}
databaseUrl.searchParams.set(
'compression',
compression === 'gzip' ? 'Gzip' : 'None'
);
if (lightMode) {
databaseUrl.searchParams.set('light', 'true');
}
if (confirmedReads !== undefined) {
databaseUrl.searchParams.set('confirmed', confirmedReads.toString());
}

const ws = new WS(databaseUrl.toString(), wsProtocol);

return new WebsocketDecompressAdapter(ws);
static async openWebSocket(
args: WebSocketArgs
): Promise<WebsocketDecompressAdapter> {
return new this(await openWebSocket(args));
}
}
19 changes: 4 additions & 15 deletions crates/bindings-typescript/src/sdk/websocket_test_adapter.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { BinaryReader, BinaryWriter } from '../';
import { ClientMessage, ServerMessage } from './client_api/types';
import type { WebsocketAdapter } from './websocket_decompress_adapter';
import type { WebSocketAdapter, WebSocketFactory } from './ws';

class WebsocketTestAdapter implements WebsocketAdapter {
class WebsocketTestAdapter implements WebSocketAdapter {
onclose: any;
// eslint-disable-next-line @typescript-eslint/no-unsafe-function-type
onopen!: () => void;
Expand All @@ -19,7 +19,7 @@ class WebsocketTestAdapter implements WebsocketAdapter {
this.closed = false;
}

send(message: any): void {
send(message: Uint8Array<ArrayBuffer>): void {
const parsedMessage = ClientMessage.deserialize(new BinaryReader(message));
this.outgoingMessages.push(parsedMessage);
// console.ClientMessageSerde.deserialize(message);
Expand Down Expand Up @@ -47,18 +47,7 @@ class WebsocketTestAdapter implements WebsocketAdapter {
this.onmessage({ data: rawBytes });
}

async createWebSocketFn(_args: {
url: URL;
wsProtocol: string;
nameOrAddress: string;
authToken?: string;
compression: 'gzip' | 'none';
lightMode: boolean;
confirmedReads?: boolean;
}): Promise<WebsocketTestAdapter> {
return this;
}
openWebSocket: WebSocketFactory = async () => this;
}

export type { WebsocketTestAdapter };
export default WebsocketTestAdapter;
Loading
Loading