Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 164 additions & 2 deletions packages/client/lib/client/pool.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,170 @@ describe('RedisClientPool', () => {
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClientPool('close', async pool => {
await pool.close()
assert.equal(pool.totalClients, 0)
assert.equal(pool.isOpen, true, 'pool should be open before close');
await pool.close();
assert.equal(pool.totalClients, 0, 'totalClients should be 0 after close');
assert.equal(pool.isOpen, false, 'isOpen should be false after close');
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClientPool('destroy', async pool => {
assert.equal(pool.isOpen, true, 'pool should be open before destroy');
pool.destroy();
assert.equal(pool.totalClients, 0, 'totalClients should be 0 after destroy');
assert.equal(pool.isOpen, false, 'isOpen should be false after destroy');
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClientPool('close waits for in-flight and queued tasks', async pool => {
const events: string[] = [];

// Start a long-running task (will be in-flight)
const task1Promise = pool.execute(async client => {
events.push('task1 started');
await client.set('key1', 'value1');
await new Promise(resolve => setTimeout(resolve, 100));
events.push('task1 completed');
return 'task1';
});

// This task will be queued since the pool has only 1 client (minimum: 1, maximum: 1)
const task2Promise = pool.execute(async client => {
events.push('task2 started');
await client.set('key2', 'value2');
events.push('task2 completed');
return 'task2';
});

// Verify task2 is queued
assert.equal(pool.tasksQueueLength, 1, 'task2 should be queued');

// Close while task1 is running and task2 is queued
await pool.close();
events.push('close completed');

// Desired behavior: close() should wait for all tasks to complete
// - Both in-flight tasks and queued tasks should complete before close() returns
// - All task promises should be fulfilled (not rejected)
assert.deepEqual(events, [
'task1 started',
'task1 completed',
'task2 started',
'task2 completed',
'close completed'
], 'close() should wait for all tasks to complete');

// Verify both tasks completed successfully
const [result1, result2] = await Promise.all([task1Promise, task2Promise]);
assert.equal(result1, 'task1');
assert.equal(result2, 'task2');
}, {
...GLOBAL.SERVERS.OPEN,
poolOptions: { minimum: 1, maximum: 1, acquireTimeout: 2000, cleanupDelay: 400 }
});

testUtils.testWithClientPool('execute rejects when pool is closing', async pool => {
// Start a long-running task to keep the pool busy during close
const task1Promise = pool.execute(async client => {
await new Promise(resolve => setTimeout(resolve, 100));
return 'task1';
});

// Start closing (will wait for task1 to complete)
const closePromise = pool.close();

// Try to execute a new command while closing
await assert.rejects(
pool.execute(client => client.ping()),
{ message: /closed/i },
'execute() should reject when pool is closing'
);

// sendCommand should also reject
await assert.rejects(
pool.sendCommand(['PING']),
{ message: /closed/i },
'sendCommand() should reject when pool is closing'
);

// ping() should also reject
await assert.rejects(
pool.ping(),
{ message: /closed/i },
'ping() should reject when pool is closing'
);

// multi() should also reject when executed
await assert.rejects(
pool.multi().ping().exec(),
{ message: /closed/i },
'multi().exec() should reject when pool is closing'
);

await closePromise;
await task1Promise;
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClientPool('execute rejects when pool is closed', async pool => {
await pool.close();

// Try to execute after pool is closed
await assert.rejects(
pool.execute(client => client.ping()),
{ message: /closed/i },
'execute() should reject when pool is closed'
);

// sendCommand should also reject
await assert.rejects(
pool.sendCommand(['PING']),
{ message: /closed/i },
'sendCommand() should reject when pool is closed'
);

// ping() should also reject
await assert.rejects(
pool.ping(),
{ message: /closed/i },
'ping() should reject when pool is closed'
);

// multi() should also reject when executed
await assert.rejects(
pool.multi().ping().exec(),
{ message: /closed/i },
'multi().exec() should reject when pool is closed'
);
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClientPool('execute rejects when pool is destroyed', async pool => {
pool.destroy();

// Try to execute after pool is destroyed
await assert.rejects(
pool.execute(client => client.ping()),
{ message: /closed/i },
'execute() should reject when pool is destroyed'
);

// sendCommand should also reject
await assert.rejects(
pool.sendCommand(['PING']),
{ message: /closed/i },
'sendCommand() should reject when pool is destroyed'
);

// ping() should also reject
await assert.rejects(
pool.ping(),
{ message: /closed/i },
'ping() should reject when pool is destroyed'
);

// multi() should also reject when executed
await assert.rejects(
pool.multi().ping().exec(),
{ message: /closed/i },
'multi().exec() should reject when pool is destroyed'
);
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClientPool(
Expand Down
69 changes: 47 additions & 22 deletions packages/client/lib/client/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Command, CommandSignature, RedisArgument, RedisFunction, RedisFunctions
import RedisClient, { RedisClientType, RedisClientOptions, WithModules, WithFunctions, WithScripts } from '.';
import { EventEmitter } from 'node:events';
import { DoublyLinkedNode, DoublyLinkedList, SinglyLinkedList } from './linked-list';
import { TimeoutError } from '../errors';
import { ClientClosedError, TimeoutError } from '../errors';
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { CommandOptions } from './commands-queue';
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';
Expand All @@ -27,20 +27,20 @@ export interface RedisPoolOptions {
acquireTimeout: number;
/**
* The delay in milliseconds before a cleanup operation is performed on idle clients.
*
* After this delay, the pool will check if there are too many idle clients and destroy
*
* After this delay, the pool will check if there are too many idle clients and destroy
* excess ones to maintain optimal pool size.
*/
cleanupDelay: number;
/**
* Client Side Caching configuration for the pool.
*
* Enables Redis Servers and Clients to work together to cache results from commands
*
* Enables Redis Servers and Clients to work together to cache results from commands
* sent to a server. The server will notify the client when cached results are no longer valid.
* In pooled mode, the cache is shared across all clients in the pool.
*
*
* Note: Client Side Caching is only supported with RESP3.
*
*
* @example Anonymous cache configuration
* ```
* const client = createClientPool({RESP: 3}, {
Expand All @@ -52,7 +52,7 @@ export interface RedisPoolOptions {
* minimum: 5
* });
* ```
*
*
* @example Using a controllable cache
* ```
* const cache = new BasicPooledClientSideCache({
Expand All @@ -69,11 +69,11 @@ export interface RedisPoolOptions {
clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig;
/**
* Enable experimental support for RESP3 module commands.
*
* When enabled, allows the use of module commands that have been adapted
* for the RESP3 protocol. This is an unstable feature and may change in
*
* When enabled, allows the use of module commands that have been adapted
* for the RESP3 protocol. This is an unstable feature and may change in
* future versions.
*
*
* @default false
*/
unstableResp3Modules?: boolean;
Expand Down Expand Up @@ -273,6 +273,12 @@ export class RedisClientPool<
return this._self.#isClosing;
}

/**
* Resolve function called when all in-flight tasks complete during close().
* Used to signal that the pool has finished draining and clients can be closed.
*/
#drainResolve?: () => void;

#clientSideCache?: PooledClientSideCacheProvider;
get clientSideCache() {
return this._self.#clientSideCache;
Expand Down Expand Up @@ -389,6 +395,9 @@ export class RedisClientPool<
}

async #create() {
// Track the client as "in use" during connect so it counts toward capacity.
// If we waited to add it until after connect, the pool would think it doesn't
// exist yet and could spin up extra clients when multiple tasks queue up.
const node = this._self.#clientsInUse.push(
this._self.#clientFactory()
.on('error', (err: Error) => this.emit('error', err))
Expand All @@ -404,9 +413,13 @@ export class RedisClientPool<

this._self.#returnClient(node);
}

execute<T>(fn: PoolTask<M, F, S, RESP, TYPE_MAPPING, T>) {
return new Promise<Awaited<T>>((resolve, reject) => {
if (this._self.#isClosing || !this._self.#isOpen) {
return reject(new ClientClosedError());
}

const client = this._self.#idleClients.shift(),
{ tail } = this._self.#tasksQueue;
if (!client) {
Expand Down Expand Up @@ -470,6 +483,12 @@ export class RedisClientPool<
this.#clientsInUse.remove(node);
this.#idleClients.push(node.value);

// If closing and all tasks are done, signal the drain is complete
if (this.#isClosing && this.#clientsInUse.length === 0) {
this.#drainResolve?.();
return;
}

this.#scheduleCleanup();
}

Expand Down Expand Up @@ -515,29 +534,35 @@ export class RedisClientPool<
if (!this._self.#isOpen) return; // TODO: throw err?

this._self.#isClosing = true;

clearTimeout(this._self.cleanupTimeout);

try {
const promises = [];
// Wait for all in-flight and queued tasks to complete
if (this._self.#clientsInUse.length > 0) {
await new Promise<void>(resolve => {
this._self.#drainResolve = resolve;
});
}

// Now all tasks are done, close all clients (which are now idle)
const promises = [];
for (const client of this._self.#idleClients) {
promises.push(client.close());
}

for (const client of this._self.#clientsInUse) {
promises.push(client.close());
}

await Promise.all(promises);

this._self.#clientSideCache?.onPoolClose();

this._self.#idleClients.reset();
this._self.#clientsInUse.reset();
} catch (err) {

} finally {
this._self.#drainResolve = undefined;
this._self.#isClosing = false;
}
this._self.#isOpen = false;
}
}

destroy() {
Expand Down
6 changes: 4 additions & 2 deletions packages/test-utils/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -766,8 +766,10 @@ export default class TestUtils {
await pool.flushAll();
await fn(pool);
} finally {
await pool.flushAll();
pool.close();
if (pool.isOpen) {
await pool.flushAll();
pool.destroy();
}
}
});
}
Expand Down
Loading