Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ const queue = new Queue<TPayload, TResult>(config)
| `blockTimeout` | `number` | `5` | Seconds to wait when polling for jobs |
| `visibilityTimeout` | `number` | `30000` | Milliseconds before a processing job is considered stalled |
| `resultTTL` | `number` | `3600000` | Milliseconds to cache job results (1 hour) |
| `logger` | `pino.Logger` | `abstractLogger` | Logger used by queue/producer/consumer |
| `afterExecution` | `AfterExecutionHook<TPayload, TResult>` | `undefined` | Hook called after execution and before persisting terminal state |
| `payloadSerde` | `Serde<TPayload>` | `JsonSerde` | Custom serializer for job payloads |
| `resultSerde` | `Serde<TResult>` | `JsonSerde` | Custom serializer for job results |
Expand Down Expand Up @@ -279,7 +280,8 @@ import { RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({
url: 'redis://localhost:6379',
keyPrefix: 'myapp:' // Optional prefix for all keys
keyPrefix: 'myapp:', // Optional prefix for all keys
logger // Optional pino-compatible logger
})
```

Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
"bench:redis": "REDIS_URL=${REDIS_URL:-redis://localhost:6379} node benchmark/request-response.ts",
"bench:valkey": "REDIS_URL=${REDIS_URL:-redis://localhost:6380} node benchmark/request-response.ts"
},
"dependencies": {},
"dependencies": {
"pino": "^10.3.1"
},
"optionalDependencies": {
"fast-write-atomic": "^0.4.0",
"iovalkey": "^0.2.0"
Expand Down
4 changes: 4 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 58 additions & 8 deletions src/consumer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { EventEmitter } from 'node:events'
import type { Storage } from './storage/types.ts'
import type { Serde } from './serde/index.ts'
import type { QueueMessage, Job, JobHandler, AfterExecutionHook, AfterExecutionContext } from './types.ts'
import type { Logger } from 'pino'
import { MaxRetriesError } from './errors.ts'
import type { Serde } from './serde/index.ts'
import { createJsonSerde } from './serde/index.ts'
import type { Storage } from './storage/types.ts'
import type { AfterExecutionContext, AfterExecutionHook, Job, JobHandler, QueueMessage } from './types.ts'
import { abstractLogger, ensureLoggableError } from './utils/logging.ts'

interface ConsumerConfig<TPayload, TResult> {
storage: Storage
Expand All @@ -16,6 +18,7 @@ interface ConsumerConfig<TPayload, TResult> {
resultTTL?: number
visibilityTimeout?: number
afterExecution?: AfterExecutionHook<TPayload, TResult>
logger?: Logger
}

interface ConsumerEvents<TResult> {
Expand Down Expand Up @@ -46,6 +49,7 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
#resultTTL: number
#visibilityTimeout: number
#afterExecution: AfterExecutionHook<TPayload, TResult>
#logger: Logger

#handler: JobHandler<TPayload, TResult> | null = null
#running = false
Expand All @@ -65,27 +69,34 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
this.#resultTTL = config.resultTTL ?? 3600000
this.#visibilityTimeout = config.visibilityTimeout ?? 30000
this.#afterExecution = config.afterExecution ?? noopAfterExecution
this.#logger = (config.logger ?? abstractLogger).child({ component: 'consumer', workerId: this.#workerId })
}

/**
* Register a job handler
*/
execute (handler: JobHandler<TPayload, TResult>): void {
this.#handler = handler
this.#logger.debug('Registered consumer handler.')
}

/**
* Start consuming jobs
*/
async start (): Promise<void> {
if (this.#running) return
if (this.#running) {
this.#logger.trace('Consumer already running.')
return
}
if (!this.#handler) {
throw new Error('No handler registered. Call execute() first.')
}

this.#running = true
this.#abortController = new AbortController()

this.#logger.debug({ concurrency: this.#concurrency }, 'Starting consumer.')

// Register worker
await this.#storage.registerWorker(this.#workerId, this.#visibilityTimeout * 2)

Expand All @@ -97,16 +108,24 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe

// Don't await - let them run in background
Promise.all(loops).catch(err => {
this.emit('error', err)
const error = err instanceof Error ? err : new Error(String(err))
this.#logger.error({ err: ensureLoggableError(error) }, 'Worker loop terminated with error.')
this.emit('error', error)
})

this.#logger.debug('Consumer started.')
}

/**
* Stop consuming jobs gracefully
*/
async stop (): Promise<void> {
if (!this.#running) return
if (!this.#running) {
this.#logger.trace('Consumer already stopped.')
return
}

this.#logger.debug('Stopping consumer.')
this.#running = false

// Signal abort to worker loops
Expand All @@ -122,6 +141,10 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
await new Promise(resolve => setTimeout(resolve, 100))
}

if (this.#activeJobs > 0) {
this.#logger.warn({ activeJobs: this.#activeJobs }, 'Forcing abort of active jobs during stop.')
}

// Abort any remaining jobs
for (const [, controller] of this.#jobAbortControllers) {
controller.abort()
Expand All @@ -130,6 +153,7 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe

// Unregister worker
await this.#storage.unregisterWorker(this.#workerId)
this.#logger.debug('Consumer stopped.')
}

/**
Expand All @@ -145,10 +169,13 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
continue
}

this.#logger.trace('Dequeued job message.')

// Check if aborted
if (this.#abortController?.signal.aborted) {
// Put message back
const queueMessage = this.#deserializeMessage(message)
this.#logger.warn({ id: queueMessage.id }, 'Consumer aborted while holding job, requeueing.')
await this.#storage.requeue(queueMessage.id, message, this.#workerId)
this.emit('requeued', queueMessage.id)
break
Expand All @@ -158,7 +185,9 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
await this.#processJob(message)
} catch (err) {
if (!this.#running) break
this.emit('error', err as Error)
const error = err instanceof Error ? err : new Error(String(err))
this.#logger.error({ err: ensureLoggableError(error) }, 'Worker loop error.')
this.emit('error', error)
// Brief pause before retrying loop
await new Promise(resolve => setTimeout(resolve, 1000))
}
Expand All @@ -173,10 +202,13 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
const { id, payload, attempts, maxAttempts, createdAt } = queueMessage
const resolvedTTL = queueMessage.resultTTL ?? this.#resultTTL

this.#logger.trace({ id, attempts, maxAttempts, resolvedTTL }, 'Processing job.')

// Check if job was cancelled (deleted from jobs hash)
const state = await this.#storage.getJobState(id)
if (!state) {
// Job was cancelled, just ack it
this.#logger.debug({ id }, 'Acknowledging cancelled job.')
await this.#storage.ack(id, message, this.#workerId)
return
}
Expand All @@ -198,6 +230,7 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
const startedAt = Date.now()
await this.#storage.setJobState(id, `processing:${startedAt}:${this.#workerId}`)
await this.#storage.publishEvent(id, 'processing')
this.#logger.trace({ id, attempt: currentAttempts }, 'Job marked as processing.')

try {
const job: Job<TPayload> = {
Expand Down Expand Up @@ -233,13 +266,15 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
// Complete the job
const serializedResult = this.#resultSerde.serialize(finalResult)
await this.#storage.completeJob(id, message, this.#workerId, serializedResult, context.ttl)
this.#logger.debug({ id, ttl: context.ttl }, 'Job completed and persisted.')

this.emit('completed', id, finalResult)
} catch (err) {
// Clear visibility timer
clearTimeout(visibilityTimer)

const error = err as ExtendedError
this.#logger.warn({ id, attempt: currentAttempts, err: ensureLoggableError(error) }, 'Job handler failed.')

if (currentAttempts < maxAttempts) {
// Retry - update message with incremented attempts
Expand All @@ -250,6 +285,7 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
const serializedMessage = this.#payloadSerde.serialize(updatedMessage as unknown as TPayload)

await this.#storage.retryJob(id, serializedMessage, this.#workerId, currentAttempts)
this.#logger.warn({ id, attempt: currentAttempts }, 'Job scheduled for retry.')

this.emit('failing', id, error, currentAttempts)
} else {
Expand Down Expand Up @@ -285,12 +321,17 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
)

await this.#storage.failJob(id, message, this.#workerId, serializedError, context.ttl)
this.#logger.error(
{ id, attempts: currentAttempts, ttl: context.ttl, err: ensureLoggableError(maxRetriesError) },
'Job failed permanently.'
)

this.emit('failed', id, maxRetriesError)
}
} finally {
this.#jobAbortControllers.delete(id)
this.#activeJobs--
this.#logger.trace({ id, activeJobs: this.#activeJobs }, 'Finished job processing cycle.')
}
}

Expand All @@ -302,11 +343,20 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
try {
await this.#afterExecution(context)
} catch (err) {
this.emit('error', err as Error)
const error = err instanceof Error ? err : new Error(String(err))
this.#logger.error(
{ id: context.id, err: ensureLoggableError(error) },
'AfterExecution hook failed, restoring original TTL.'
)
this.emit('error', error)
context.ttl = originalTTL
}

if (!Number.isFinite(context.ttl) || !Number.isInteger(context.ttl) || context.ttl <= 0) {
this.#logger.warn(
{ id: context.id, ttl: context.ttl },
'Invalid TTL from afterExecution, restoring original TTL.'
)
this.emit('error', new TypeError('resultTTL must be a positive integer in milliseconds'))
context.ttl = originalTTL
}
Expand Down
Loading