From 344fb92371c3dcc02728c5b98f9ee2021f49d05a Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Wed, 4 Mar 2026 15:56:20 +0100 Subject: [PATCH] feat: Clean up job IDs from dedup cache when resultTTL expires MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Job entries in the jobs hash previously persisted forever, even after the result/error TTL expired. This meant a completed job's ID could never be reused. Now deduplication lasts only as long as the result is cached — once resultTTL expires, the job ID is cleaned up so it can be re-enqueued. Uses a companion expiry field ({id}:exp in Redis, #jobExpiry map in Memory, {id}.ttl file in File) to track when terminal job entries should expire. On enqueue, if the existing job has expired, it is deleted and re-enqueue proceeds. The updateResultTTL method also syncs the dedup expiry when extending the result TTL. Co-Authored-By: Claude Opus 4.6 --- redis-scripts/cancel.lua | 2 + redis-scripts/complete.lua | 6 ++ redis-scripts/enqueue.lua | 15 ++++- redis-scripts/fail.lua | 6 ++ src/producer.ts | 2 + src/storage/file.ts | 67 +++++++++++++++++++- src/storage/memory.ts | 43 ++++++++++++- src/storage/redis.ts | 37 ++++++++++- src/storage/types.ts | 6 ++ test/deduplication.test.ts | 125 +++++++++++++++++++++++++++++++++++++ test/queue.test.ts | 6 +- 11 files changed, 305 insertions(+), 10 deletions(-) diff --git a/redis-scripts/cancel.lua b/redis-scripts/cancel.lua index 378dd1f..d9ac3a0 100644 --- a/redis-scripts/cancel.lua +++ b/redis-scripts/cancel.lua @@ -5,6 +5,8 @@ local deleted = redis.call('HDEL', KEYS[1], ARGV[1]) if deleted == 1 then + -- Also clean up expiry field + redis.call('HDEL', KEYS[1], ARGV[1] .. ':exp') redis.call('PUBLISH', 'job:events', ARGV[1] .. ':cancelled') return 1 end diff --git a/redis-scripts/complete.lua b/redis-scripts/complete.lua index fc9bbb1..8984b09 100644 --- a/redis-scripts/complete.lua +++ b/redis-scripts/complete.lua @@ -12,6 +12,12 @@ -- Set state to completed redis.call('HSET', KEYS[1], ARGV[1], ARGV[3]) +-- Set dedup expiry +local now = redis.call('TIME') +local nowMs = tonumber(now[1]) * 1000 + math.floor(tonumber(now[2]) / 1000) +local expiresAt = nowMs + tonumber(ARGV[5]) +redis.call('HSET', KEYS[1], ARGV[1] .. ':exp', tostring(expiresAt)) + -- Store result with TTL redis.call('HSET', KEYS[2], ARGV[1], ARGV[4]) redis.call('PEXPIRE', KEYS[2], ARGV[5]) diff --git a/redis-scripts/enqueue.lua b/redis-scripts/enqueue.lua index c9eda5a..a773fd9 100644 --- a/redis-scripts/enqueue.lua +++ b/redis-scripts/enqueue.lua @@ -8,7 +8,20 @@ local existing = redis.call('HGET', KEYS[1], ARGV[1]) if existing then - return existing + -- Check if the job has a dedup expiry and if it has passed + local expiry = redis.call('HGET', KEYS[1], ARGV[1] .. ':exp') + if expiry then + local now = redis.call('TIME') + local nowMs = tonumber(now[1]) * 1000 + math.floor(tonumber(now[2]) / 1000) + if nowMs >= tonumber(expiry) then + -- Expired: remove both fields and fall through to enqueue as new + redis.call('HDEL', KEYS[1], ARGV[1], ARGV[1] .. ':exp') + else + return existing + end + else + return existing + end end redis.call('HSET', KEYS[1], ARGV[1], ARGV[3]) diff --git a/redis-scripts/fail.lua b/redis-scripts/fail.lua index a146acb..5420194 100644 --- a/redis-scripts/fail.lua +++ b/redis-scripts/fail.lua @@ -12,6 +12,12 @@ -- Set state to failed redis.call('HSET', KEYS[1], ARGV[1], ARGV[3]) +-- Set dedup expiry +local now = redis.call('TIME') +local nowMs = tonumber(now[1]) * 1000 + math.floor(tonumber(now[2]) / 1000) +local expiresAt = nowMs + tonumber(ARGV[5]) +redis.call('HSET', KEYS[1], ARGV[1] .. ':exp', tostring(expiresAt)) + -- Store error with TTL redis.call('HSET', KEYS[2], ARGV[1], ARGV[4]) redis.call('PEXPIRE', KEYS[2], ARGV[5]) diff --git a/src/producer.ts b/src/producer.ts index 20a83d7..328c001 100644 --- a/src/producer.ts +++ b/src/producer.ts @@ -219,6 +219,7 @@ export class Producer { return { status: 'missing_payload' } } await this.#storage.setResult(id, existingResult, ttlMs) + await this.#storage.setJobExpiry(id, ttlMs) this.#logger.debug({ id, ttlMs }, 'Updated completed payload TTL.') return { status: 'updated' } } @@ -229,6 +230,7 @@ export class Producer { return { status: 'missing_payload' } } await this.#storage.setError(id, existingError, ttlMs) + await this.#storage.setJobExpiry(id, ttlMs) this.#logger.debug({ id, ttlMs }, 'Updated failed payload TTL.') return { status: 'updated' } } diff --git a/src/storage/file.ts b/src/storage/file.ts index 3baaa74..e84c6b2 100644 --- a/src/storage/file.ts +++ b/src/storage/file.ts @@ -301,7 +301,22 @@ export class FileStorage implements Storage { // Check if job already exists try { const existing = await readFile(jobFile, 'utf8') - return existing + + // Check if the job has a dedup expiry and if it has passed + try { + const ttlPath = join(this.#jobsPath, `${id}.ttl`) + const expiresAt = parseInt(await readFile(ttlPath, 'utf8'), 10) + if (Date.now() >= expiresAt) { + // Expired: remove both files and fall through to enqueue as new + await unlink(jobFile).catch(() => {}) + await unlink(ttlPath).catch(() => {}) + } else { + return existing + } + } catch { + // No ttl file means no expiry — always block + return existing + } } catch { // Job doesn't exist, continue } @@ -375,7 +390,22 @@ export class FileStorage implements Storage { async getJobState (id: string): Promise { try { - return await readFile(join(this.#jobsPath, `${id}.state`), 'utf8') + const state = await readFile(join(this.#jobsPath, `${id}.state`), 'utf8') + + // Check dedup expiry + try { + const ttlPath = join(this.#jobsPath, `${id}.ttl`) + const expiresAt = parseInt(await readFile(ttlPath, 'utf8'), 10) + if (Date.now() >= expiresAt) { + await unlink(join(this.#jobsPath, `${id}.state`)).catch(() => {}) + await unlink(ttlPath).catch(() => {}) + return null + } + } catch { + // No ttl file — not expired + } + + return state } catch { return null } @@ -388,6 +418,7 @@ export class FileStorage implements Storage { async deleteJob (id: string): Promise { try { await unlink(join(this.#jobsPath, `${id}.state`)) + await unlink(join(this.#jobsPath, `${id}.ttl`)).catch(() => {}) this.#eventEmitter.emit('event', id, 'cancelled') return true } catch { @@ -405,6 +436,12 @@ export class FileStorage implements Storage { return result } + async setJobExpiry (id: string, ttlMs: number): Promise { + const ttlPath = join(this.#jobsPath, `${id}.ttl`) + const expiresAt = Date.now() + ttlMs + await this.#writeFileAtomic(ttlPath, expiresAt.toString()) + } + async setResult (id: string, result: Buffer, ttlMs: number): Promise { const filePath = join(this.#resultsPath, `${id}.result`) // Store TTL expiry time in a companion file @@ -567,6 +604,9 @@ export class FileStorage implements Storage { // Set state to completed await this.setJobState(id, `completed:${timestamp}`) + // Set dedup expiry + await this.setJobExpiry(id, resultTTL) + // Store result await this.setResult(id, result, resultTTL) @@ -586,6 +626,9 @@ export class FileStorage implements Storage { // Set state to failed await this.setJobState(id, `failed:${timestamp}`) + // Set dedup expiry + await this.setJobExpiry(id, errorTTL) + // Store error await this.setError(id, error, errorTTL) @@ -618,6 +661,26 @@ export class FileStorage implements Storage { async #cleanupExpired (): Promise { const now = Date.now() + // Clean expired job entries + try { + const jobFiles = await readdir(this.#jobsPath) + for (const file of jobFiles) { + if (!file.endsWith('.ttl')) continue + const id = file.replace('.ttl', '') + try { + const expiresAt = parseInt(await readFile(join(this.#jobsPath, file), 'utf8'), 10) + if (now >= expiresAt) { + await unlink(join(this.#jobsPath, `${id}.state`)).catch(() => {}) + await unlink(join(this.#jobsPath, file)).catch(() => {}) + } + } catch { + // Ignore errors + } + } + } catch { + // Ignore errors + } + // Clean expired results try { const resultFiles = await readdir(this.#resultsPath) diff --git a/src/storage/memory.ts b/src/storage/memory.ts index 08e3c7d..035004f 100644 --- a/src/storage/memory.ts +++ b/src/storage/memory.ts @@ -23,6 +23,7 @@ export class MemoryStorage implements Storage { #queue: Buffer[] = [] #processingQueues: Map = new Map() #jobs: Map = new Map() + #jobExpiry: Map = new Map() #results: Map = new Map() #errors: Map = new Map() #workers: Map = new Map() @@ -68,7 +69,13 @@ export class MemoryStorage implements Storage { async enqueue (id: string, message: Buffer, timestamp: number): Promise { const existing = this.#jobs.get(id) if (existing) { - return existing + const expiry = this.#jobExpiry.get(id) + if (expiry && Date.now() >= expiry) { + this.#jobs.delete(id) + this.#jobExpiry.delete(id) + } else { + return existing + } } this.#jobs.set(id, `queued:${timestamp}`) @@ -159,7 +166,17 @@ export class MemoryStorage implements Storage { } async getJobState (id: string): Promise { - return this.#jobs.get(id) ?? null + const state = this.#jobs.get(id) + if (!state) return null + + const expiry = this.#jobExpiry.get(id) + if (expiry && Date.now() >= expiry) { + this.#jobs.delete(id) + this.#jobExpiry.delete(id) + return null + } + + return state } async setJobState (id: string, state: string): Promise { @@ -169,6 +186,7 @@ export class MemoryStorage implements Storage { async deleteJob (id: string): Promise { const existed = this.#jobs.has(id) this.#jobs.delete(id) + this.#jobExpiry.delete(id) if (existed) { this.#eventEmitter.emit('event', id, 'cancelled') @@ -180,11 +198,15 @@ export class MemoryStorage implements Storage { async getJobStates (ids: string[]): Promise> { const result = new Map() for (const id of ids) { - result.set(id, this.#jobs.get(id) ?? null) + result.set(id, await this.getJobState(id)) } return result } + async setJobExpiry (id: string, ttlMs: number): Promise { + this.#jobExpiry.set(id, Date.now() + ttlMs) + } + async setResult (id: string, result: Buffer, ttlMs: number): Promise { this.#results.set(id, { data: result, @@ -285,6 +307,9 @@ export class MemoryStorage implements Storage { // Set state to completed this.#jobs.set(id, `completed:${timestamp}`) + // Set dedup expiry + this.#jobExpiry.set(id, timestamp + resultTTL) + // Store result await this.setResult(id, result, resultTTL) @@ -304,6 +329,9 @@ export class MemoryStorage implements Storage { // Set state to failed this.#jobs.set(id, `failed:${timestamp}`) + // Set dedup expiry + this.#jobExpiry.set(id, timestamp + errorTTL) + // Store error await this.setError(id, error, errorTTL) @@ -336,6 +364,14 @@ export class MemoryStorage implements Storage { #cleanupExpired (): void { const now = Date.now() + // Clean expired job entries + for (const [id, expiresAt] of this.#jobExpiry) { + if (now >= expiresAt) { + this.#jobs.delete(id) + this.#jobExpiry.delete(id) + } + } + // Clean expired results for (const [id, stored] of this.#results) { if (now > stored.expiresAt) { @@ -365,6 +401,7 @@ export class MemoryStorage implements Storage { this.#queue = [] this.#processingQueues.clear() this.#jobs.clear() + this.#jobExpiry.clear() this.#results.clear() this.#errors.clear() this.#workers.clear() diff --git a/src/storage/redis.ts b/src/storage/redis.ts index e7b13fa..e82fb61 100644 --- a/src/storage/redis.ts +++ b/src/storage/redis.ts @@ -241,6 +241,15 @@ export class RedisStorage implements Storage { async getJobState (id: string): Promise { const state = await this.#client!.hget(this.#jobsKey(), id) + if (!state) return null + + // Check dedup expiry + const expiry = await this.#client!.hget(this.#jobsKey(), `${id}:exp`) + if (expiry && Date.now() >= parseInt(expiry, 10)) { + await this.#client!.hdel(this.#jobsKey(), id, `${id}:exp`) + return null + } + return state } @@ -248,6 +257,11 @@ export class RedisStorage implements Storage { await this.#client!.hset(this.#jobsKey(), id, state) } + async setJobExpiry (id: string, ttlMs: number): Promise { + const expiresAt = Date.now() + ttlMs + await this.#client!.hset(this.#jobsKey(), `${id}:exp`, expiresAt.toString()) + } + async deleteJob (id: string): Promise { const result = await this.#client!.evalsha(this.#scriptSHAs!.cancel, 1, this.#jobsKey(), id) return result === 1 @@ -258,11 +272,30 @@ export class RedisStorage implements Storage { return new Map() } - const states = await this.#client!.hmget(this.#jobsKey(), ...ids) + // Fetch both state and expiry fields for each id + const allKeys = ids.flatMap(id => [id, `${id}:exp`]) + const values = await this.#client!.hmget(this.#jobsKey(), ...allKeys) + + const now = Date.now() const result = new Map() + const expiredFields: string[] = [] + for (let i = 0; i < ids.length; i++) { - result.set(ids[i], states[i]) + const state = values[i * 2] + const expiry = values[i * 2 + 1] + + if (state && expiry && now >= parseInt(expiry, 10)) { + expiredFields.push(ids[i], `${ids[i]}:exp`) + result.set(ids[i], null) + } else { + result.set(ids[i], state) + } } + + if (expiredFields.length > 0) { + await this.#client!.hdel(this.#jobsKey(), ...expiredFields) + } + return result } diff --git a/src/storage/types.ts b/src/storage/types.ts index e0e63df..acab600 100644 --- a/src/storage/types.ts +++ b/src/storage/types.ts @@ -213,6 +213,12 @@ export interface Storage { */ releaseLeaderLock? (lockKey: string, ownerId: string): Promise + /** + * Set the dedup expiry for a terminal job. + * After ttlMs elapses, the job ID can be re-enqueued. + */ + setJobExpiry (id: string, ttlMs: number): Promise + // ═══════════════════════════════════════════════════════════════════ // ATOMIC OPERATIONS (Lua scripts in Redis) // ═══════════════════════════════════════════════════════════════════ diff --git a/test/deduplication.test.ts b/test/deduplication.test.ts index b2edbdf..7126868 100644 --- a/test/deduplication.test.ts +++ b/test/deduplication.test.ts @@ -222,4 +222,129 @@ describe('Deduplication', () => { await shortTtlQueue.stop() }) }) + + describe('dedup expiry', () => { + it('should allow re-enqueue after resultTTL expires for completed job', async () => { + const shortTtlStorage = new MemoryStorage() + const shortTtlQueue = new Queue<{ value: number }, { result: number }>({ + storage: shortTtlStorage, + resultTTL: 50, // 50ms TTL + concurrency: 1, + visibilityTimeout: 5000 + }) + + let callCount = 0 + shortTtlQueue.execute(async (job: Job<{ value: number }>) => { + callCount++ + return { result: job.payload.value * callCount } + }) + + await shortTtlQueue.start() + + // Complete first job + const result1 = await shortTtlQueue.enqueueAndWait('job-1', { value: 10 }, { timeout: 5000 }) + assert.deepStrictEqual(result1, { result: 10 }) + assert.strictEqual(callCount, 1) + + // Wait for TTL to expire + await sleep(100) + + // Re-enqueue same ID — should succeed now + const result2 = await shortTtlQueue.enqueueAndWait('job-1', { value: 10 }, { timeout: 5000 }) + assert.deepStrictEqual(result2, { result: 20 }) + assert.strictEqual(callCount, 2) + + await shortTtlQueue.stop() + }) + + it('should allow re-enqueue after errorTTL expires for failed job', async () => { + const shortTtlStorage = new MemoryStorage() + const shortTtlQueue = new Queue<{ value: number }, { result: number }>({ + storage: shortTtlStorage, + resultTTL: 50, // 50ms TTL + concurrency: 1, + visibilityTimeout: 5000 + }) + + let shouldFail = true + shortTtlQueue.execute(async (job: Job<{ value: number }>) => { + if (shouldFail) { + throw new Error('Intentional failure') + } + return { result: job.payload.value * 2 } + }) + + await shortTtlQueue.start() + + // Enqueue and wait for failure + const failedPromise = once(shortTtlQueue, 'failed') + await shortTtlQueue.enqueue('job-1', { value: 42 }) + await failedPromise + + // Verify it's a duplicate while within TTL + const dupResult = await shortTtlQueue.enqueue('job-1', { value: 42 }) + assert.strictEqual(dupResult.status, 'duplicate') + + // Wait for TTL to expire + await sleep(100) + + // Now re-enqueue should work + shouldFail = false + const result = await shortTtlQueue.enqueueAndWait('job-1', { value: 42 }, { timeout: 5000 }) + assert.deepStrictEqual(result, { result: 84 }) + + await shortTtlQueue.stop() + }) + + it('should still block re-enqueue within TTL window', async () => { + const shortTtlStorage = new MemoryStorage() + const shortTtlQueue = new Queue<{ value: number }, { result: number }>({ + storage: shortTtlStorage, + resultTTL: 5000, // 5s TTL — won't expire during this test + concurrency: 1, + visibilityTimeout: 5000 + }) + + shortTtlQueue.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value * 2 } + }) + + await shortTtlQueue.start() + + // Complete first job + const result1 = await shortTtlQueue.enqueueAndWait('job-1', { value: 21 }, { timeout: 5000 }) + assert.deepStrictEqual(result1, { result: 42 }) + + // Re-enqueue immediately — should return cached result (dedup still active) + const result2 = await shortTtlQueue.enqueue('job-1', { value: 99 }) + assert.strictEqual(result2.status, 'completed') + if (result2.status === 'completed') { + assert.deepStrictEqual(result2.result, { result: 42 }) + } + + await shortTtlQueue.stop() + }) + + it('should return null from getJobState after expiry', async () => { + const shortTtlStorage = new MemoryStorage() + await shortTtlStorage.connect() + + // Simulate a completed job with short TTL + await shortTtlStorage.enqueue('job-1', Buffer.from('test'), Date.now()) + await shortTtlStorage.completeJob('job-1', Buffer.from('test'), 'worker-1', Buffer.from('result'), 50) + + // State should exist within TTL + const state1 = await shortTtlStorage.getJobState('job-1') + assert.ok(state1?.startsWith('completed:')) + + // Wait for TTL to expire + await sleep(100) + + // State should be null after expiry + const state2 = await shortTtlStorage.getJobState('job-1') + assert.strictEqual(state2, null) + + await shortTtlStorage.disconnect() + }) + }) }) diff --git a/test/queue.test.ts b/test/queue.test.ts index 71a0f9d..6c3cb11 100644 --- a/test/queue.test.ts +++ b/test/queue.test.ts @@ -675,7 +675,7 @@ describe('Queue', () => { assert.deepStrictEqual(updateResult, { status: 'not_terminal' }) }) - it('should return missing_payload when terminal payload has expired', async () => { + it('should return not_found when terminal payload and job state have expired', async () => { const localStorage = new MemoryStorage() const localQueue = new Queue<{ value: number }, { result: number }>({ storage: localStorage, @@ -693,8 +693,10 @@ describe('Queue', () => { await sleep(60) + // After resultTTL expires, both the job state and result are cleaned up, + // so updateResultTTL returns not_found (the job ID can be re-enqueued) const updateResult = await localQueue.updateResultTTL('job-1', 100) - assert.deepStrictEqual(updateResult, { status: 'missing_payload' }) + assert.deepStrictEqual(updateResult, { status: 'not_found' }) await localQueue.stop() })