Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions redis-scripts/cancel.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

local deleted = redis.call('HDEL', KEYS[1], ARGV[1])
if deleted == 1 then
-- Also clean up expiry field
redis.call('HDEL', KEYS[1], ARGV[1] .. ':exp')
redis.call('PUBLISH', 'job:events', ARGV[1] .. ':cancelled')
return 1
end
Expand Down
6 changes: 6 additions & 0 deletions redis-scripts/complete.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
-- Set state to completed
redis.call('HSET', KEYS[1], ARGV[1], ARGV[3])

-- Set dedup expiry
local now = redis.call('TIME')
local nowMs = tonumber(now[1]) * 1000 + math.floor(tonumber(now[2]) / 1000)
local expiresAt = nowMs + tonumber(ARGV[5])
redis.call('HSET', KEYS[1], ARGV[1] .. ':exp', tostring(expiresAt))

-- Store result with TTL
redis.call('HSET', KEYS[2], ARGV[1], ARGV[4])
redis.call('PEXPIRE', KEYS[2], ARGV[5])
Expand Down
15 changes: 14 additions & 1 deletion redis-scripts/enqueue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,20 @@

local existing = redis.call('HGET', KEYS[1], ARGV[1])
if existing then
return existing
-- Check if the job has a dedup expiry and if it has passed
local expiry = redis.call('HGET', KEYS[1], ARGV[1] .. ':exp')
if expiry then
local now = redis.call('TIME')
local nowMs = tonumber(now[1]) * 1000 + math.floor(tonumber(now[2]) / 1000)
if nowMs >= tonumber(expiry) then
-- Expired: remove both fields and fall through to enqueue as new
redis.call('HDEL', KEYS[1], ARGV[1], ARGV[1] .. ':exp')
else
return existing
end
else
return existing
end
end

redis.call('HSET', KEYS[1], ARGV[1], ARGV[3])
Expand Down
6 changes: 6 additions & 0 deletions redis-scripts/fail.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
-- Set state to failed
redis.call('HSET', KEYS[1], ARGV[1], ARGV[3])

-- Set dedup expiry
local now = redis.call('TIME')
local nowMs = tonumber(now[1]) * 1000 + math.floor(tonumber(now[2]) / 1000)
local expiresAt = nowMs + tonumber(ARGV[5])
redis.call('HSET', KEYS[1], ARGV[1] .. ':exp', tostring(expiresAt))

-- Store error with TTL
redis.call('HSET', KEYS[2], ARGV[1], ARGV[4])
redis.call('PEXPIRE', KEYS[2], ARGV[5])
Expand Down
2 changes: 2 additions & 0 deletions src/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ export class Producer<TPayload, TResult> {
return { status: 'missing_payload' }
}
await this.#storage.setResult(id, existingResult, ttlMs)
await this.#storage.setJobExpiry(id, ttlMs)
this.#logger.debug({ id, ttlMs }, 'Updated completed payload TTL.')
return { status: 'updated' }
}
Expand All @@ -229,6 +230,7 @@ export class Producer<TPayload, TResult> {
return { status: 'missing_payload' }
}
await this.#storage.setError(id, existingError, ttlMs)
await this.#storage.setJobExpiry(id, ttlMs)
this.#logger.debug({ id, ttlMs }, 'Updated failed payload TTL.')
return { status: 'updated' }
}
Expand Down
67 changes: 65 additions & 2 deletions src/storage/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,22 @@ export class FileStorage implements Storage {
// Check if job already exists
try {
const existing = await readFile(jobFile, 'utf8')
return existing

// Check if the job has a dedup expiry and if it has passed
try {
const ttlPath = join(this.#jobsPath, `${id}.ttl`)
const expiresAt = parseInt(await readFile(ttlPath, 'utf8'), 10)
if (Date.now() >= expiresAt) {
// Expired: remove both files and fall through to enqueue as new
await unlink(jobFile).catch(() => {})
await unlink(ttlPath).catch(() => {})
} else {
return existing
}
} catch {
// No ttl file means no expiry — always block
return existing
}
} catch {
// Job doesn't exist, continue
}
Expand Down Expand Up @@ -375,7 +390,22 @@ export class FileStorage implements Storage {

async getJobState (id: string): Promise<string | null> {
try {
return await readFile(join(this.#jobsPath, `${id}.state`), 'utf8')
const state = await readFile(join(this.#jobsPath, `${id}.state`), 'utf8')

// Check dedup expiry
try {
const ttlPath = join(this.#jobsPath, `${id}.ttl`)
const expiresAt = parseInt(await readFile(ttlPath, 'utf8'), 10)
if (Date.now() >= expiresAt) {
await unlink(join(this.#jobsPath, `${id}.state`)).catch(() => {})
await unlink(ttlPath).catch(() => {})
return null
}
} catch {
// No ttl file — not expired
}

return state
} catch {
return null
}
Expand All @@ -388,6 +418,7 @@ export class FileStorage implements Storage {
async deleteJob (id: string): Promise<boolean> {
try {
await unlink(join(this.#jobsPath, `${id}.state`))
await unlink(join(this.#jobsPath, `${id}.ttl`)).catch(() => {})
this.#eventEmitter.emit('event', id, 'cancelled')
return true
} catch {
Expand All @@ -405,6 +436,12 @@ export class FileStorage implements Storage {
return result
}

async setJobExpiry (id: string, ttlMs: number): Promise<void> {
const ttlPath = join(this.#jobsPath, `${id}.ttl`)
const expiresAt = Date.now() + ttlMs
await this.#writeFileAtomic(ttlPath, expiresAt.toString())
}

async setResult (id: string, result: Buffer, ttlMs: number): Promise<void> {
const filePath = join(this.#resultsPath, `${id}.result`)
// Store TTL expiry time in a companion file
Expand Down Expand Up @@ -567,6 +604,9 @@ export class FileStorage implements Storage {
// Set state to completed
await this.setJobState(id, `completed:${timestamp}`)

// Set dedup expiry
await this.setJobExpiry(id, resultTTL)

// Store result
await this.setResult(id, result, resultTTL)

Expand All @@ -586,6 +626,9 @@ export class FileStorage implements Storage {
// Set state to failed
await this.setJobState(id, `failed:${timestamp}`)

// Set dedup expiry
await this.setJobExpiry(id, errorTTL)

// Store error
await this.setError(id, error, errorTTL)

Expand Down Expand Up @@ -618,6 +661,26 @@ export class FileStorage implements Storage {
async #cleanupExpired (): Promise<void> {
const now = Date.now()

// Clean expired job entries
try {
const jobFiles = await readdir(this.#jobsPath)
for (const file of jobFiles) {
if (!file.endsWith('.ttl')) continue
const id = file.replace('.ttl', '')
try {
const expiresAt = parseInt(await readFile(join(this.#jobsPath, file), 'utf8'), 10)
if (now >= expiresAt) {
await unlink(join(this.#jobsPath, `${id}.state`)).catch(() => {})
await unlink(join(this.#jobsPath, file)).catch(() => {})
}
} catch {
// Ignore errors
}
}
} catch {
// Ignore errors
}

// Clean expired results
try {
const resultFiles = await readdir(this.#resultsPath)
Expand Down
43 changes: 40 additions & 3 deletions src/storage/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export class MemoryStorage implements Storage {
#queue: Buffer[] = []
#processingQueues: Map<string, Buffer[]> = new Map()
#jobs: Map<string, string> = new Map()
#jobExpiry: Map<string, number> = new Map()
#results: Map<string, StoredResult> = new Map()
#errors: Map<string, StoredResult> = new Map()
#workers: Map<string, WorkerInfo> = new Map()
Expand Down Expand Up @@ -68,7 +69,13 @@ export class MemoryStorage implements Storage {
async enqueue (id: string, message: Buffer, timestamp: number): Promise<string | null> {
const existing = this.#jobs.get(id)
if (existing) {
return existing
const expiry = this.#jobExpiry.get(id)
if (expiry && Date.now() >= expiry) {
this.#jobs.delete(id)
this.#jobExpiry.delete(id)
} else {
return existing
}
}

this.#jobs.set(id, `queued:${timestamp}`)
Expand Down Expand Up @@ -159,7 +166,17 @@ export class MemoryStorage implements Storage {
}

async getJobState (id: string): Promise<string | null> {
return this.#jobs.get(id) ?? null
const state = this.#jobs.get(id)
if (!state) return null

const expiry = this.#jobExpiry.get(id)
if (expiry && Date.now() >= expiry) {
this.#jobs.delete(id)
this.#jobExpiry.delete(id)
return null
}

return state
}

async setJobState (id: string, state: string): Promise<void> {
Expand All @@ -169,6 +186,7 @@ export class MemoryStorage implements Storage {
async deleteJob (id: string): Promise<boolean> {
const existed = this.#jobs.has(id)
this.#jobs.delete(id)
this.#jobExpiry.delete(id)

if (existed) {
this.#eventEmitter.emit('event', id, 'cancelled')
Expand All @@ -180,11 +198,15 @@ export class MemoryStorage implements Storage {
async getJobStates (ids: string[]): Promise<Map<string, string | null>> {
const result = new Map<string, string | null>()
for (const id of ids) {
result.set(id, this.#jobs.get(id) ?? null)
result.set(id, await this.getJobState(id))
}
return result
}

async setJobExpiry (id: string, ttlMs: number): Promise<void> {
this.#jobExpiry.set(id, Date.now() + ttlMs)
}

async setResult (id: string, result: Buffer, ttlMs: number): Promise<void> {
this.#results.set(id, {
data: result,
Expand Down Expand Up @@ -285,6 +307,9 @@ export class MemoryStorage implements Storage {
// Set state to completed
this.#jobs.set(id, `completed:${timestamp}`)

// Set dedup expiry
this.#jobExpiry.set(id, timestamp + resultTTL)

// Store result
await this.setResult(id, result, resultTTL)

Expand All @@ -304,6 +329,9 @@ export class MemoryStorage implements Storage {
// Set state to failed
this.#jobs.set(id, `failed:${timestamp}`)

// Set dedup expiry
this.#jobExpiry.set(id, timestamp + errorTTL)

// Store error
await this.setError(id, error, errorTTL)

Expand Down Expand Up @@ -336,6 +364,14 @@ export class MemoryStorage implements Storage {
#cleanupExpired (): void {
const now = Date.now()

// Clean expired job entries
for (const [id, expiresAt] of this.#jobExpiry) {
if (now >= expiresAt) {
this.#jobs.delete(id)
this.#jobExpiry.delete(id)
}
}

// Clean expired results
for (const [id, stored] of this.#results) {
if (now > stored.expiresAt) {
Expand Down Expand Up @@ -365,6 +401,7 @@ export class MemoryStorage implements Storage {
this.#queue = []
this.#processingQueues.clear()
this.#jobs.clear()
this.#jobExpiry.clear()
this.#results.clear()
this.#errors.clear()
this.#workers.clear()
Expand Down
37 changes: 35 additions & 2 deletions src/storage/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,27 @@ export class RedisStorage implements Storage {

async getJobState (id: string): Promise<string | null> {
const state = await this.#client!.hget(this.#jobsKey(), id)
if (!state) return null

// Check dedup expiry
const expiry = await this.#client!.hget(this.#jobsKey(), `${id}:exp`)
if (expiry && Date.now() >= parseInt(expiry, 10)) {
await this.#client!.hdel(this.#jobsKey(), id, `${id}:exp`)
return null
}

return state
}

async setJobState (id: string, state: string): Promise<void> {
await this.#client!.hset(this.#jobsKey(), id, state)
}

async setJobExpiry (id: string, ttlMs: number): Promise<void> {
const expiresAt = Date.now() + ttlMs
await this.#client!.hset(this.#jobsKey(), `${id}:exp`, expiresAt.toString())
}

async deleteJob (id: string): Promise<boolean> {
const result = await this.#client!.evalsha(this.#scriptSHAs!.cancel, 1, this.#jobsKey(), id)
return result === 1
Expand All @@ -258,11 +272,30 @@ export class RedisStorage implements Storage {
return new Map()
}

const states = await this.#client!.hmget(this.#jobsKey(), ...ids)
// Fetch both state and expiry fields for each id
const allKeys = ids.flatMap(id => [id, `${id}:exp`])
const values = await this.#client!.hmget(this.#jobsKey(), ...allKeys)

const now = Date.now()
const result = new Map<string, string | null>()
const expiredFields: string[] = []

for (let i = 0; i < ids.length; i++) {
result.set(ids[i], states[i])
const state = values[i * 2]
const expiry = values[i * 2 + 1]

if (state && expiry && now >= parseInt(expiry, 10)) {
expiredFields.push(ids[i], `${ids[i]}:exp`)
result.set(ids[i], null)
} else {
result.set(ids[i], state)
}
}

if (expiredFields.length > 0) {
await this.#client!.hdel(this.#jobsKey(), ...expiredFields)
}

return result
}

Expand Down
6 changes: 6 additions & 0 deletions src/storage/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,12 @@ export interface Storage {
*/
releaseLeaderLock? (lockKey: string, ownerId: string): Promise<boolean>

/**
* Set the dedup expiry for a terminal job.
* After ttlMs elapses, the job ID can be re-enqueued.
*/
setJobExpiry (id: string, ttlMs: number): Promise<void>

// ═══════════════════════════════════════════════════════════════════
// ATOMIC OPERATIONS (Lua scripts in Redis)
// ═══════════════════════════════════════════════════════════════════
Expand Down
Loading