From 98cd57555cd99fbac4a5b32c15ea1fe1dbabec64 Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Thu, 19 Feb 2026 12:38:19 +0100 Subject: [PATCH] feat: Respect job TTL on Redis. Signed-off-by: Paolo Insogna --- src/storage/redis.ts | 76 ++++++++++++++++++++++++++++++++------ test/redis-storage.test.ts | 20 ++++++++++ 2 files changed, 84 insertions(+), 12 deletions(-) diff --git a/src/storage/redis.ts b/src/storage/redis.ts index 41d2467..5ebb2ef 100644 --- a/src/storage/redis.ts +++ b/src/storage/redis.ts @@ -1,12 +1,11 @@ import { type Redis } from 'iovalkey' import { EventEmitter } from 'node:events' import { readFileSync } from 'node:fs' -import { dirname, join } from 'node:path' -import { fileURLToPath } from 'node:url' +import { join } from 'node:path' import type { Storage } from './types.ts' import { loadOptionalDependency } from './utils.ts' -const __dirname = dirname(fileURLToPath(import.meta.url)) +const EXPIRING_VALUE_HEADER_SIZE = 8 // First 8 bytes are the expireAt timestamp in milliseconds (Uint64 BE) interface RedisStorageConfig { url?: string @@ -75,6 +74,29 @@ export class RedisStorage implements Storage { return this.#key('workers') } + #encodeExpiringValue (value: Buffer, ttlMs: number): Buffer { + const buffer = Buffer.allocUnsafe(EXPIRING_VALUE_HEADER_SIZE + value.length) + buffer.writeBigInt64BE(BigInt(Date.now() + ttlMs)) + value.copy(buffer, EXPIRING_VALUE_HEADER_SIZE) + return buffer + } + + #decodeExpiringValue (value: Buffer): { payload: Buffer; expiresAt: number } | null { + if (value.length < EXPIRING_VALUE_HEADER_SIZE) { + return null + } + + const expiresAt = Number(value.readBigInt64BE(0)) + if (!Number.isFinite(expiresAt) || expiresAt <= 0) { + return null + } + + return { + payload: value.subarray(EXPIRING_VALUE_HEADER_SIZE), + expiresAt + } + } + async connect (): Promise { if (this.#client) return @@ -119,7 +141,7 @@ export class RedisStorage implements Storage { } async #loadScripts (): Promise { - const scriptsDir = join(__dirname, '..', '..', 'redis-scripts') + const scriptsDir = join(import.meta.dirname, '..', '..', 'redis-scripts') const enqueueScript = readFileSync(join(scriptsDir, 'enqueue.lua'), 'utf8') const completeScript = readFileSync(join(scriptsDir, 'complete.lua'), 'utf8') @@ -240,23 +262,51 @@ export class RedisStorage implements Storage { } async setResult (id: string, result: Buffer, ttlMs: number): Promise { - await this.#client!.hset(this.#resultsKey(), id, result) - // Note: HEXPIRE is not widely supported, so we set TTL on the whole hash - // For production, consider using separate keys per result + await this.#client!.hset(this.#resultsKey(), id, this.#encodeExpiringValue(result, ttlMs)) } async getResult (id: string): Promise { const result = await this.#client!.hgetBuffer(this.#resultsKey(), id) - return result + if (!result) { + return null + } + + const decoded = this.#decodeExpiringValue(result) + if (!decoded) { + // Backward compatibility for legacy entries stored without envelope + return result + } + + if (Date.now() > decoded.expiresAt) { + await this.#client!.hdel(this.#resultsKey(), id) + return null + } + + return decoded.payload } async setError (id: string, error: Buffer, ttlMs: number): Promise { - await this.#client!.hset(this.#errorsKey(), id, error) + await this.#client!.hset(this.#errorsKey(), id, this.#encodeExpiringValue(error, ttlMs)) } async getError (id: string): Promise { const result = await this.#client!.hgetBuffer(this.#errorsKey(), id) - return result + if (!result) { + return null + } + + const decoded = this.#decodeExpiringValue(result) + if (!decoded) { + // Backward compatibility for legacy entries stored without envelope + return result + } + + if (Date.now() > decoded.expiresAt) { + await this.#client!.hdel(this.#errorsKey(), id) + return null + } + + return decoded.payload } async registerWorker (workerId: string, ttlMs: number): Promise { @@ -328,6 +378,7 @@ export class RedisStorage implements Storage { async completeJob (id: string, message: Buffer, workerId: string, result: Buffer, resultTTL: number): Promise { const timestamp = Date.now() const state = `completed:${timestamp}` + const encodedResult = this.#encodeExpiringValue(result, resultTTL) await this.#client!.evalsha( this.#scriptSHAs!.complete, @@ -338,7 +389,7 @@ export class RedisStorage implements Storage { id, message, state, - result, + encodedResult, resultTTL.toString() ) @@ -350,6 +401,7 @@ export class RedisStorage implements Storage { async failJob (id: string, message: Buffer, workerId: string, error: Buffer, errorTTL: number): Promise { const timestamp = Date.now() const state = `failed:${timestamp}` + const encodedError = this.#encodeExpiringValue(error, errorTTL) await this.#client!.evalsha( this.#scriptSHAs!.fail, @@ -360,7 +412,7 @@ export class RedisStorage implements Storage { id, message, state, - error, + encodedError, errorTTL.toString() ) diff --git a/test/redis-storage.test.ts b/test/redis-storage.test.ts index 9c90384..223db65 100644 --- a/test/redis-storage.test.ts +++ b/test/redis-storage.test.ts @@ -184,6 +184,16 @@ describe('RedisStorage', () => { const result = await storage.getResult('non-existent') assert.strictEqual(result, null) }) + + it('should expire results per job without affecting other entries', async () => { + await storage.setResult('job-short', Buffer.from('short'), 20) + await storage.setResult('job-long', Buffer.from('long'), 1000) + + await sleep(30) + + assert.strictEqual(await storage.getResult('job-short'), null) + assert.deepStrictEqual(await storage.getResult('job-long'), Buffer.from('long')) + }) }) describe('errors', () => { @@ -199,6 +209,16 @@ describe('RedisStorage', () => { const error = await storage.getError('non-existent') assert.strictEqual(error, null) }) + + it('should expire errors per job without affecting other entries', async () => { + await storage.setError('job-short', Buffer.from('short-error'), 20) + await storage.setError('job-long', Buffer.from('long-error'), 1000) + + await sleep(30) + + assert.strictEqual(await storage.getError('job-short'), null) + assert.deepStrictEqual(await storage.getError('job-long'), Buffer.from('long-error')) + }) }) describe('workers', () => {