Skip to content

Commit 18fca95

Browse files
authored
feat: Respect job TTL on Redis. (#30)
Signed-off-by: Paolo Insogna <paolo@cowtech.it>
1 parent 36fafeb commit 18fca95

2 files changed

Lines changed: 84 additions & 12 deletions

File tree

src/storage/redis.ts

Lines changed: 64 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
import { type Redis } from 'iovalkey'
22
import { EventEmitter } from 'node:events'
33
import { readFileSync } from 'node:fs'
4-
import { dirname, join } from 'node:path'
5-
import { fileURLToPath } from 'node:url'
4+
import { join } from 'node:path'
65
import type { Storage } from './types.ts'
76
import { loadOptionalDependency } from './utils.ts'
87

9-
const __dirname = dirname(fileURLToPath(import.meta.url))
8+
const EXPIRING_VALUE_HEADER_SIZE = 8 // First 8 bytes are the expireAt timestamp in milliseconds (Uint64 BE)
109

1110
interface RedisStorageConfig {
1211
url?: string
@@ -75,6 +74,29 @@ export class RedisStorage implements Storage {
7574
return this.#key('workers')
7675
}
7776

77+
#encodeExpiringValue (value: Buffer, ttlMs: number): Buffer {
78+
const buffer = Buffer.allocUnsafe(EXPIRING_VALUE_HEADER_SIZE + value.length)
79+
buffer.writeBigInt64BE(BigInt(Date.now() + ttlMs))
80+
value.copy(buffer, EXPIRING_VALUE_HEADER_SIZE)
81+
return buffer
82+
}
83+
84+
#decodeExpiringValue (value: Buffer): { payload: Buffer; expiresAt: number } | null {
85+
if (value.length < EXPIRING_VALUE_HEADER_SIZE) {
86+
return null
87+
}
88+
89+
const expiresAt = Number(value.readBigInt64BE(0))
90+
if (!Number.isFinite(expiresAt) || expiresAt <= 0) {
91+
return null
92+
}
93+
94+
return {
95+
payload: value.subarray(EXPIRING_VALUE_HEADER_SIZE),
96+
expiresAt
97+
}
98+
}
99+
78100
async connect (): Promise<void> {
79101
if (this.#client) return
80102

@@ -119,7 +141,7 @@ export class RedisStorage implements Storage {
119141
}
120142

121143
async #loadScripts (): Promise<void> {
122-
const scriptsDir = join(__dirname, '..', '..', 'redis-scripts')
144+
const scriptsDir = join(import.meta.dirname, '..', '..', 'redis-scripts')
123145

124146
const enqueueScript = readFileSync(join(scriptsDir, 'enqueue.lua'), 'utf8')
125147
const completeScript = readFileSync(join(scriptsDir, 'complete.lua'), 'utf8')
@@ -240,23 +262,51 @@ export class RedisStorage implements Storage {
240262
}
241263

242264
async setResult (id: string, result: Buffer, ttlMs: number): Promise<void> {
243-
await this.#client!.hset(this.#resultsKey(), id, result)
244-
// Note: HEXPIRE is not widely supported, so we set TTL on the whole hash
245-
// For production, consider using separate keys per result
265+
await this.#client!.hset(this.#resultsKey(), id, this.#encodeExpiringValue(result, ttlMs))
246266
}
247267

248268
async getResult (id: string): Promise<Buffer | null> {
249269
const result = await this.#client!.hgetBuffer(this.#resultsKey(), id)
250-
return result
270+
if (!result) {
271+
return null
272+
}
273+
274+
const decoded = this.#decodeExpiringValue(result)
275+
if (!decoded) {
276+
// Backward compatibility for legacy entries stored without envelope
277+
return result
278+
}
279+
280+
if (Date.now() > decoded.expiresAt) {
281+
await this.#client!.hdel(this.#resultsKey(), id)
282+
return null
283+
}
284+
285+
return decoded.payload
251286
}
252287

253288
async setError (id: string, error: Buffer, ttlMs: number): Promise<void> {
254-
await this.#client!.hset(this.#errorsKey(), id, error)
289+
await this.#client!.hset(this.#errorsKey(), id, this.#encodeExpiringValue(error, ttlMs))
255290
}
256291

257292
async getError (id: string): Promise<Buffer | null> {
258293
const result = await this.#client!.hgetBuffer(this.#errorsKey(), id)
259-
return result
294+
if (!result) {
295+
return null
296+
}
297+
298+
const decoded = this.#decodeExpiringValue(result)
299+
if (!decoded) {
300+
// Backward compatibility for legacy entries stored without envelope
301+
return result
302+
}
303+
304+
if (Date.now() > decoded.expiresAt) {
305+
await this.#client!.hdel(this.#errorsKey(), id)
306+
return null
307+
}
308+
309+
return decoded.payload
260310
}
261311

262312
async registerWorker (workerId: string, ttlMs: number): Promise<void> {
@@ -328,6 +378,7 @@ export class RedisStorage implements Storage {
328378
async completeJob (id: string, message: Buffer, workerId: string, result: Buffer, resultTTL: number): Promise<void> {
329379
const timestamp = Date.now()
330380
const state = `completed:${timestamp}`
381+
const encodedResult = this.#encodeExpiringValue(result, resultTTL)
331382

332383
await this.#client!.evalsha(
333384
this.#scriptSHAs!.complete,
@@ -338,7 +389,7 @@ export class RedisStorage implements Storage {
338389
id,
339390
message,
340391
state,
341-
result,
392+
encodedResult,
342393
resultTTL.toString()
343394
)
344395

@@ -350,6 +401,7 @@ export class RedisStorage implements Storage {
350401
async failJob (id: string, message: Buffer, workerId: string, error: Buffer, errorTTL: number): Promise<void> {
351402
const timestamp = Date.now()
352403
const state = `failed:${timestamp}`
404+
const encodedError = this.#encodeExpiringValue(error, errorTTL)
353405

354406
await this.#client!.evalsha(
355407
this.#scriptSHAs!.fail,
@@ -360,7 +412,7 @@ export class RedisStorage implements Storage {
360412
id,
361413
message,
362414
state,
363-
error,
415+
encodedError,
364416
errorTTL.toString()
365417
)
366418

test/redis-storage.test.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,16 @@ describe('RedisStorage', () => {
184184
const result = await storage.getResult('non-existent')
185185
assert.strictEqual(result, null)
186186
})
187+
188+
it('should expire results per job without affecting other entries', async () => {
189+
await storage.setResult('job-short', Buffer.from('short'), 20)
190+
await storage.setResult('job-long', Buffer.from('long'), 1000)
191+
192+
await sleep(30)
193+
194+
assert.strictEqual(await storage.getResult('job-short'), null)
195+
assert.deepStrictEqual(await storage.getResult('job-long'), Buffer.from('long'))
196+
})
187197
})
188198

189199
describe('errors', () => {
@@ -199,6 +209,16 @@ describe('RedisStorage', () => {
199209
const error = await storage.getError('non-existent')
200210
assert.strictEqual(error, null)
201211
})
212+
213+
it('should expire errors per job without affecting other entries', async () => {
214+
await storage.setError('job-short', Buffer.from('short-error'), 20)
215+
await storage.setError('job-long', Buffer.from('long-error'), 1000)
216+
217+
await sleep(30)
218+
219+
assert.strictEqual(await storage.getError('job-short'), null)
220+
assert.deepStrictEqual(await storage.getError('job-long'), Buffer.from('long-error'))
221+
})
202222
})
203223

204224
describe('workers', () => {

0 commit comments

Comments
 (0)