Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions packages/client/lib/client/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,11 @@ describe('Client', () => {
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClient('AbortError', async client => {
await blockSetImmediate(async () => {
await assert.rejects(client.sendCommand(['PING'], {
abortSignal: AbortSignal.timeout(5)
}), AbortError);
})
await blockSetImmediate(async () => {
await assert.rejects(client.sendCommand(['PING'], {
abortSignal: AbortSignal.timeout(5)
}), AbortError);
})
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClient('Timeout with custom timeout config', async client => {
Expand Down Expand Up @@ -689,6 +689,37 @@ describe('Client', () => {
}
});


testUtils.testWithClient('proxies respect RedisClient command options', async client => {

const TIMEOUT = 1234;
(client as any)._commandOptions = { timeout: TIMEOUT };

const bufferProxy = client.withCommandOptions({
typeMapping: { [RESP_TYPES.BLOB_STRING]: Buffer }
});

const stringReply = await client.module.echo('hi');
const bufferReply = await bufferProxy.module.echo('hi');


assert.ok((bufferReply as unknown) instanceof Buffer, 'Proxy failed to return Buffer.');
assert.strictEqual(typeof stringReply, 'string', 'Original client was corrupted.');
assert.equal(bufferReply.toString(), stringReply);

const proxyOptions = (bufferProxy.module as any)._commandOptions;
assert.equal(proxyOptions.timeout, TIMEOUT, 'Inherited options (timeout) were lost in the proxy chain.')

assert.ok(!Object.prototype.hasOwnProperty.call(proxyOptions, 'timeout'), 'Timeout should be inherited, not copied.');
Comment thread
cursor[bot] marked this conversation as resolved.
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
modules: {
module
}
}
})

testUtils.testWithClient('duplicate should reuse command options', async client => {
const duplicate = client.duplicate();

Expand Down
177 changes: 90 additions & 87 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { ClientMetricsHandle, ClientRegistry } from '../opentelemetry';
import { ClientIdentity, ClientRole, generateClientId } from './identity';
import { trace, sanitizeArgs, publish, CHANNELS, type CommandTraceContext } from './tracing';

const noop = () => {};
const noop = () => { };

export interface RedisClientOptions<
M extends RedisModules = RedisModules,
Expand Down Expand Up @@ -257,7 +257,10 @@ export type RedisClientType<

type ProxyClient = RedisClient<any, any, any, any, any>;

type NamespaceProxyClient = { _self: ProxyClient };
type NamespaceProxyClient<TM extends TypeMapping = {}> = {
_self: ProxyClient;
_commandOptions?: CommandOptions<TM>
};

interface ScanIteratorOptions {
cursor?: RedisArgument;
Expand Down Expand Up @@ -290,7 +293,7 @@ export default class RedisClient<
const parser = new BasicCommandParser();
command.parseCommand(parser, ...args);

return this._self._executeCommand(command, parser, this._self._commandOptions, transformReply);
return this._self._executeCommand(command, parser, this._commandOptions, transformReply);
Comment thread
cursor[bot] marked this conversation as resolved.
};
}

Expand All @@ -303,7 +306,7 @@ export default class RedisClient<
parser.push(...prefix);
fn.parseCommand(parser, ...args);

return this._self._executeCommand(fn, parser, this._self._commandOptions, transformReply);
return this._self._executeCommand(fn, parser, this._commandOptions, transformReply);
};
}

Expand Down Expand Up @@ -587,7 +590,7 @@ export default class RedisClient<

this.#registerForMetrics();

if(this.#options.maintNotifications !== 'disabled') {
if (this.#options.maintNotifications !== 'disabled') {
new EnterpriseMaintenanceManager(this.#queue, this, this.#options);
};

Expand Down Expand Up @@ -664,7 +667,7 @@ export default class RedisClient<
this._commandOptions = options.commandOptions;
}

if(options.maintNotifications !== 'disabled') {
if (options.maintNotifications !== 'disabled') {
EnterpriseMaintenanceManager.setupDefaultMaintOptions(options);
}

Expand Down Expand Up @@ -847,16 +850,16 @@ export default class RedisClient<
}

if (this.#clientSideCache) {
commands.push({cmd: this.#clientSideCache.trackingOn()});
commands.push({ cmd: this.#clientSideCache.trackingOn() });
}

if (this.#options?.emitInvalidate) {
commands.push({cmd: ['CLIENT', 'TRACKING', 'ON']});
commands.push({ cmd: ['CLIENT', 'TRACKING', 'ON'] });
}

const maintenanceHandshakeCmd = await EnterpriseMaintenanceManager.getHandshakeCommand(this.#options, this._clientId);

if(maintenanceHandshakeCmd) {
if (maintenanceHandshakeCmd) {
commands.push(maintenanceHandshakeCmd);
};

Expand All @@ -872,24 +875,24 @@ export default class RedisClient<
this.emit('error', err);
}
})
.on('error', err => {
this.emit('error', err);
this.#clientSideCache?.onError();
if (this.#socket.isOpen && !this.#options.disableOfflineQueue) {
this.#queue.flushWaitingForReply(err);
} else {
this.#queue.flushAll(err);
}
})
.on('connect', () => this.emit('connect'))
.on('ready', () => {
this.emit('ready');
this.#setPingTimer();
this.#maybeScheduleWrite();
})
.on('reconnecting', () => this.emit('reconnecting'))
.on('drain', () => this.#maybeScheduleWrite())
.on('end', () => this.emit('end'));
.on('error', err => {
this.emit('error', err);
this.#clientSideCache?.onError();
if (this.#socket.isOpen && !this.#options.disableOfflineQueue) {
this.#queue.flushWaitingForReply(err);
} else {
this.#queue.flushAll(err);
}
})
.on('connect', () => this.emit('connect'))
.on('ready', () => {
this.emit('ready');
this.#setPingTimer();
this.#maybeScheduleWrite();
})
.on('reconnecting', () => this.emit('reconnecting'))
.on('drain', () => this.#maybeScheduleWrite())
.on('end', () => this.emit('end'));
}

#initiateSocket(clientId: string): RedisSocket {
Expand Down Expand Up @@ -958,7 +961,8 @@ export default class RedisClient<
TYPE_MAPPING extends TypeMapping
>(options: OPTIONS) {
const proxy = Object.create(this._self);
proxy._commandOptions = options;
proxy._commandOptions = Object.assign(
Object.create(this._commandOptions ?? null),options);
return proxy as RedisClientType<
M,
F,
Expand Down Expand Up @@ -1055,61 +1059,61 @@ export default class RedisClient<
/**
* @internal
*/
_ejectSocket(): RedisSocket {
const socket = this._self.#socket;
// @ts-ignore
this._self.#socket = null;
socket.removeAllListeners();
return socket;
}

/**
* @internal
*/
_insertSocket(socket: RedisSocket) {
if(this._self.#socket) {
_ejectSocket(): RedisSocket {
const socket = this._self.#socket;
// @ts-ignore
this._self.#socket = null;
socket.removeAllListeners();
return socket;
}

/**
* @internal
*/
_insertSocket(socket: RedisSocket) {
if (this._self.#socket) {
this._self._ejectSocket().destroy();
}
this._self.#socket = socket;
this._self.#attachListeners(this._self.#socket);
}

/**
* @internal
*/
_maintenanceUpdate(update: MaintenanceUpdate) {
this._self.#socket.setMaintenanceTimeout(update.relaxedSocketTimeout);
this._self.#queue.setMaintenanceCommandTimeout(update.relaxedCommandTimeout);
}

/**
* @internal
*/
_pause() {
this._self.#paused = true;
}

/**
* @internal
*/
_unpause() {
this._self.#paused = false;
this._self.#maybeScheduleWrite();
}

/**
* @internal
*/
_handleSmigrated(smigratedEvent: SMigratedEvent) {
this._self.emit(SMIGRATED_EVENT, smigratedEvent);
}

/**
* @internal
*/
_getQueue(): RedisCommandsQueue {
return this._self.#queue;
}
}
this._self.#socket = socket;
this._self.#attachListeners(this._self.#socket);
}

/**
* @internal
*/
_maintenanceUpdate(update: MaintenanceUpdate) {
this._self.#socket.setMaintenanceTimeout(update.relaxedSocketTimeout);
this._self.#queue.setMaintenanceCommandTimeout(update.relaxedCommandTimeout);
}

/**
* @internal
*/
_pause() {
this._self.#paused = true;
}

/**
* @internal
*/
_unpause() {
this._self.#paused = false;
this._self.#maybeScheduleWrite();
}

/**
* @internal
*/
_handleSmigrated(smigratedEvent: SMigratedEvent) {
this._self.emit(SMIGRATED_EVENT, smigratedEvent);
}

/**
* @internal
*/
_getQueue(): RedisCommandsQueue {
return this._self.#queue;
}

/**
* @internal
Expand Down Expand Up @@ -1182,10 +1186,9 @@ export default class RedisClient<
}

// Merge global options with provided options
const opts = {
...this._self._commandOptions,
...options,
};
const opts = options ?
Object.assign(Object.create(this._commandOptions ?? null), options) :
this._commandOptions;

const promise = this._self.#queue.addCommand<T>(args, opts);
this._self.#scheduleWrite();
Expand Down Expand Up @@ -1371,7 +1374,7 @@ export default class RedisClient<
}

#write() {
if(this.#paused) {
if (this.#paused) {
return
}
this.#socket.write(this.#queue.commandsToWrite());
Expand Down
Loading