From c5652de80b7ebeb6eb3c0bcd0bd7d5b84fd9ae15 Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Thu, 19 Feb 2026 16:09:55 +0100 Subject: [PATCH 1/2] feat: Added support for pino logging. Signed-off-by: Paolo Insogna --- README.md | 4 ++- package.json | 4 ++- pnpm-lock.yaml | 4 +++ src/consumer.ts | 66 ++++++++++++++++++++++++++++++++++++++------ src/producer.ts | 29 ++++++++++++++++++- src/queue.ts | 51 +++++++++++++++++++++++++++++----- src/storage/redis.ts | 7 +++++ src/types.ts | 4 +++ src/utils/logging.ts | 30 ++++++++++++++++++++ test/queue.test.ts | 38 +++++++++++++++++++++++-- 10 files changed, 216 insertions(+), 21 deletions(-) create mode 100644 src/utils/logging.ts diff --git a/README.md b/README.md index c98483b..73187e4 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,7 @@ const queue = new Queue(config) | `blockTimeout` | `number` | `5` | Seconds to wait when polling for jobs | | `visibilityTimeout` | `number` | `30000` | Milliseconds before a processing job is considered stalled | | `resultTTL` | `number` | `3600000` | Milliseconds to cache job results (1 hour) | +| `logger` | `pino.Logger` | `abstractLogger` | Logger used by queue/producer/consumer | | `afterExecution` | `AfterExecutionHook` | `undefined` | Hook called after execution and before persisting terminal state | | `payloadSerde` | `Serde` | `JsonSerde` | Custom serializer for job payloads | | `resultSerde` | `Serde` | `JsonSerde` | Custom serializer for job results | @@ -279,7 +280,8 @@ import { RedisStorage } from '@platformatic/job-queue' const storage = new RedisStorage({ url: 'redis://localhost:6379', - keyPrefix: 'myapp:' // Optional prefix for all keys + keyPrefix: 'myapp:', // Optional prefix for all keys + logger // Optional pino-compatible logger }) ``` diff --git a/package.json b/package.json index 9f88ffa..7b9a3dc 100644 --- a/package.json +++ b/package.json @@ -42,7 +42,9 @@ "bench:redis": "REDIS_URL=${REDIS_URL:-redis://localhost:6379} node benchmark/request-response.ts", "bench:valkey": "REDIS_URL=${REDIS_URL:-redis://localhost:6380} node benchmark/request-response.ts" }, - "dependencies": {}, + "dependencies": { + "pino": "^10.3.1" + }, "optionalDependencies": { "fast-write-atomic": "^0.4.0", "iovalkey": "^0.2.0" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d6a0e4a..1f14614 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -7,6 +7,10 @@ settings: importers: .: + dependencies: + pino: + specifier: ^10.3.1 + version: 10.3.1 devDependencies: '@platformatic/flame': specifier: ^1.6.0 diff --git a/src/consumer.ts b/src/consumer.ts index b0891ef..c223f73 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -1,9 +1,11 @@ import { EventEmitter } from 'node:events' -import type { Storage } from './storage/types.ts' -import type { Serde } from './serde/index.ts' -import type { QueueMessage, Job, JobHandler, AfterExecutionHook, AfterExecutionContext } from './types.ts' +import type { Logger } from 'pino' import { MaxRetriesError } from './errors.ts' +import type { Serde } from './serde/index.ts' import { createJsonSerde } from './serde/index.ts' +import type { Storage } from './storage/types.ts' +import type { AfterExecutionContext, AfterExecutionHook, Job, JobHandler, QueueMessage } from './types.ts' +import { abstractLogger, ensureLoggableError } from './utils/logging.ts' interface ConsumerConfig { storage: Storage @@ -16,6 +18,7 @@ interface ConsumerConfig { resultTTL?: number visibilityTimeout?: number afterExecution?: AfterExecutionHook + logger?: Logger } interface ConsumerEvents { @@ -46,6 +49,7 @@ export class Consumer extends EventEmitter + #logger: Logger #handler: JobHandler | null = null #running = false @@ -65,6 +69,7 @@ export class Consumer extends EventEmitter extends EventEmitter): void { this.#handler = handler + this.#logger.debug('Registered consumer handler.') } /** * Start consuming jobs */ async start (): Promise { - if (this.#running) return + if (this.#running) { + this.#logger.trace('Consumer already running.') + return + } if (!this.#handler) { throw new Error('No handler registered. Call execute() first.') } @@ -86,6 +95,8 @@ export class Consumer extends EventEmitter extends EventEmitter { - this.emit('error', err) + const error = err instanceof Error ? err : new Error(String(err)) + this.#logger.error({ err: ensureLoggableError(error) }, 'Worker loop terminated with error.') + this.emit('error', error) }) + + this.#logger.debug('Consumer started.') } /** * Stop consuming jobs gracefully */ async stop (): Promise { - if (!this.#running) return + if (!this.#running) { + this.#logger.trace('Consumer already stopped.') + return + } + this.#logger.debug('Stopping consumer.') this.#running = false // Signal abort to worker loops @@ -122,6 +141,10 @@ export class Consumer extends EventEmitter setTimeout(resolve, 100)) } + if (this.#activeJobs > 0) { + this.#logger.warn({ activeJobs: this.#activeJobs }, 'Forcing abort of active jobs during stop.') + } + // Abort any remaining jobs for (const [, controller] of this.#jobAbortControllers) { controller.abort() @@ -130,6 +153,7 @@ export class Consumer extends EventEmitter extends EventEmitter extends EventEmitter setTimeout(resolve, 1000)) } @@ -173,10 +202,13 @@ export class Consumer extends EventEmitter extends EventEmitter = { @@ -233,6 +266,7 @@ export class Consumer extends EventEmitter extends EventEmitter extends EventEmitter extends EventEmitter extends EventEmitter { @@ -20,6 +22,7 @@ interface ProducerConfig { resultSerde?: Serde maxRetries?: number resultTTL?: number + logger?: Logger } /** @@ -31,6 +34,7 @@ export class Producer { #resultSerde: Serde #maxRetries: number #resultTTL: number + #logger: Logger constructor (config: ProducerConfig) { this.#storage = config.storage @@ -38,6 +42,7 @@ export class Producer { this.#resultSerde = config.resultSerde ?? createJsonSerde() this.#maxRetries = config.maxRetries ?? 3 this.#resultTTL = config.resultTTL ?? 3600000 // 1 hour + this.#logger = (config.logger ?? abstractLogger).child({ component: 'producer' }) } /** @@ -48,6 +53,7 @@ export class Producer { const maxAttempts = options?.maxAttempts ?? this.#maxRetries const resultTTL = options?.resultTTL ?? this.#resultTTL + this.#logger.trace({ id, maxAttempts, resultTTL }, 'Enqueue requested.') this.#validateResultTTL(resultTTL) const message: QueueMessage = { @@ -64,10 +70,12 @@ export class Producer { if (existingState) { const { status } = parseState(existingState) + this.#logger.debug({ id, status }, 'Duplicate enqueue detected.') if (status === 'completed') { const result = await this.getResult(id) if (result !== null) { + this.#logger.debug({ id }, 'Returning cached completed result.') return { status: 'completed', result } } } @@ -75,6 +83,7 @@ export class Producer { return { status: 'duplicate', existingState: status } } + this.#logger.debug({ id }, 'Job enqueued.') return { status: 'queued' } } @@ -83,11 +92,13 @@ export class Producer { */ async enqueueAndWait (id: string, payload: TPayload, options?: EnqueueAndWaitOptions): Promise { const timeout = options?.timeout ?? 30000 + this.#logger.trace({ id, timeout }, 'EnqueueAndWait requested.') // Subscribe BEFORE enqueue to avoid race conditions const { promise: resultPromise, resolve: resolveResult, reject: rejectResult } = Promise.withResolvers() const unsubscribe = await this.#storage.subscribeToJob(id, async status => { + this.#logger.trace({ id, status }, 'Received job notification.') if (status === 'completed') { const result = await this.getResult(id) if (result !== null) { @@ -108,6 +119,7 @@ export class Producer { // If already completed, return cached result immediately if (enqueueResult.status === 'completed') { + this.#logger.debug({ id }, 'EnqueueAndWait resolved from cached result.') return enqueueResult.result } @@ -115,16 +127,20 @@ export class Producer { if (enqueueResult.status === 'duplicate' && enqueueResult.existingState === 'failed') { const error = await this.#storage.getError(id) const errorMessage = error ? error.toString() : 'Job failed' + this.#logger.warn({ id }, 'EnqueueAndWait found already failed duplicate job.') throw new JobFailedError(id, errorMessage) } // Wait for result with timeout const { promise: timeoutPromise, reject: rejectTimeout } = Promise.withResolvers() timeoutId = setTimeout(() => { + this.#logger.warn({ id, timeout }, 'EnqueueAndWait timed out.') rejectTimeout(new TimeoutError(id, timeout)) }, timeout) - return await Promise.race([resultPromise, timeoutPromise]) + const result = await Promise.race([resultPromise, timeoutPromise]) + this.#logger.debug({ id }, 'EnqueueAndWait resolved.') + return result } finally { if (timeoutId !== undefined) { clearTimeout(timeoutId) @@ -137,9 +153,11 @@ export class Producer { * Cancel a pending job */ async cancel (id: string): Promise { + this.#logger.trace({ id }, 'Cancel requested.') const state = await this.#storage.getJobState(id) if (!state) { + this.#logger.trace({ id }, 'Cancel target not found.') return { status: 'not_found' } } @@ -156,6 +174,7 @@ export class Producer { // Can cancel if queued or failing const deleted = await this.#storage.deleteJob(id) if (deleted) { + this.#logger.debug({ id }, 'Job cancelled.') return { status: 'cancelled' } } @@ -170,6 +189,7 @@ export class Producer { if (!resultBuffer) { return null } + this.#logger.trace({ id }, 'Deserializing job result.') return this.#resultSerde.deserialize(resultBuffer) } @@ -177,6 +197,7 @@ export class Producer { * Update TTL for a terminal job payload (result for completed jobs, error for failed jobs). */ async updateResultTTL (id: string, ttlMs: number): Promise { + this.#logger.trace({ id, ttlMs }, 'UpdateResultTTL requested.') this.#validateResultTTL(ttlMs) const state = await this.#storage.getJobState(id) @@ -187,23 +208,28 @@ export class Producer { const { status } = parseState(state) if (status !== 'completed' && status !== 'failed') { + this.#logger.debug({ id, status }, 'UpdateResultTTL rejected for non-terminal job.') return { status: 'not_terminal' } } if (status === 'completed') { const existingResult = await this.#storage.getResult(id) if (!existingResult) { + this.#logger.warn({ id }, 'UpdateResultTTL missing completed payload.') return { status: 'missing_payload' } } await this.#storage.setResult(id, existingResult, ttlMs) + this.#logger.debug({ id, ttlMs }, 'Updated completed payload TTL.') return { status: 'updated' } } const existingError = await this.#storage.getError(id) if (!existingError) { + this.#logger.warn({ id }, 'UpdateResultTTL missing failed payload.') return { status: 'missing_payload' } } await this.#storage.setError(id, existingError, ttlMs) + this.#logger.debug({ id, ttlMs }, 'Updated failed payload TTL.') return { status: 'updated' } } @@ -247,6 +273,7 @@ export class Producer { #validateResultTTL (resultTTL: number): void { if (!Number.isFinite(resultTTL) || !Number.isInteger(resultTTL) || resultTTL <= 0) { + this.#logger.error({ resultTTL }, 'Invalid resultTTL provided.') throw new TypeError('resultTTL must be a positive integer in milliseconds') } } diff --git a/src/queue.ts b/src/queue.ts index da65514..49f73a0 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -1,5 +1,6 @@ import { EventEmitter } from 'node:events' import { randomUUID } from 'node:crypto' +import type { Logger } from 'pino' import type { Storage } from './storage/types.ts' import type { Serde } from './serde/index.ts' import type { @@ -17,6 +18,7 @@ import type { import { Producer } from './producer.ts' import { Consumer } from './consumer.ts' import { createJsonSerde } from './serde/index.ts' +import { abstractLogger, ensureLoggableError } from './utils/logging.ts' /** * Queue class combining Producer and Consumer functionality @@ -37,6 +39,7 @@ export class Queue extends EventEmitter | undefined + #logger: Logger constructor (config: QueueConfig) { super() @@ -51,13 +54,15 @@ export class Queue extends EventEmitter({ storage: this.#storage, payloadSerde: this.#payloadSerde, resultSerde: this.#resultSerde, maxRetries: this.#maxRetries, - resultTTL: this.#resultTTL + resultTTL: this.#resultTTL, + logger: this.#logger }) } @@ -65,8 +70,12 @@ export class Queue extends EventEmitter { - if (this.#started) return + if (this.#started) { + this.#logger.trace('Queue already started.') + return + } + this.#logger.debug('Starting queue.') await this.#storage.connect() this.#started = true @@ -75,6 +84,7 @@ export class Queue extends EventEmitter extends EventEmitter { - if (!this.#started) return + if (!this.#started) { + this.#logger.trace('Queue already stopped.') + return + } + + this.#logger.debug('Stopping queue.') if (this.#consumer) { await this.#consumer.stop() @@ -91,6 +106,7 @@ export class Queue extends EventEmitter extends EventEmitter): void { this.#handler = handler + this.#logger.debug('Registered queue handler.') // If already started, create and start consumer if (this.#started) { @@ -110,7 +127,9 @@ export class Queue extends EventEmitter> { + this.#logger.trace({ id }, 'Enqueue requested.') const result = await this.#producer.enqueue(id, payload, options) + this.#logger.trace({ id, status: result.status }, 'Enqueue completed.') if (result.status === 'queued') { this.emit('enqueued', id) } @@ -121,14 +140,19 @@ export class Queue extends EventEmitter { - return this.#producer.enqueueAndWait(id, payload, options) + this.#logger.trace({ id }, 'EnqueueAndWait requested.') + const result = await this.#producer.enqueueAndWait(id, payload, options) + this.#logger.trace({ id }, 'EnqueueAndWait resolved.') + return result } /** * Cancel a pending job */ async cancel (id: string): Promise { + this.#logger.trace({ id }, 'Cancel requested.') const result = await this.#producer.cancel(id) + this.#logger.trace({ id, status: result.status }, 'Cancel completed.') if (result.status === 'cancelled') { this.emit('cancelled', id) } @@ -146,7 +170,10 @@ export class Queue extends EventEmitter { - return this.#producer.updateResultTTL(id, ttlMs) + this.#logger.trace({ id, ttlMs }, 'UpdateResultTTL requested.') + const result = await this.#producer.updateResultTTL(id, ttlMs) + this.#logger.trace({ id, status: result.status }, 'UpdateResultTTL completed.') + return result } /** @@ -159,6 +186,8 @@ export class Queue extends EventEmitter({ storage: this.#storage, workerId: this.#workerId, @@ -169,33 +198,41 @@ export class Queue extends EventEmitter { + this.#logger.error({ err: ensureLoggableError(error) }, 'Consumer emitted error.') this.emit('error', error) }) this.#consumer.on('completed', (id, result) => { + this.#logger.debug({ id }, 'Job completed.') this.emit('completed', id, result) }) this.#consumer.on('failed', (id, error) => { + this.#logger.warn({ id, err: ensureLoggableError(error) }, 'Job failed.') this.emit('failed', id, error) }) this.#consumer.on('failing', (id, error, attempt) => { + this.#logger.warn({ id, attempt, err: ensureLoggableError(error) }, 'Job failing and retrying.') this.emit('failing', id, error, attempt) }) this.#consumer.on('requeued', id => { + this.#logger.debug({ id }, 'Job requeued.') this.emit('requeued', id) }) this.#consumer.execute(this.#handler) this.#consumer.start().catch(err => { - this.emit('error', err) + const error = err instanceof Error ? err : new Error(String(err)) + this.#logger.error({ err: ensureLoggableError(error) }, 'Failed to start consumer.') + this.emit('error', error) }) } } diff --git a/src/storage/redis.ts b/src/storage/redis.ts index 5ebb2ef..e7b13fa 100644 --- a/src/storage/redis.ts +++ b/src/storage/redis.ts @@ -2,14 +2,17 @@ import { type Redis } from 'iovalkey' import { EventEmitter } from 'node:events' import { readFileSync } from 'node:fs' import { join } from 'node:path' +import type { Logger } from 'pino' import type { Storage } from './types.ts' import { loadOptionalDependency } from './utils.ts' +import { abstractLogger } from '../utils/logging.ts' const EXPIRING_VALUE_HEADER_SIZE = 8 // First 8 bytes are the expireAt timestamp in milliseconds (Uint64 BE) interface RedisStorageConfig { url?: string keyPrefix?: string + logger?: Logger } interface ScriptSHAs { @@ -35,10 +38,12 @@ export class RedisStorage implements Storage { #eventEmitter = new EventEmitter({ captureRejections: true }) #notifyEmitter = new EventEmitter({ captureRejections: true }) #eventSubscription: boolean = false + #logger: Logger constructor (config: RedisStorageConfig = {}) { this.#url = config.url ?? process.env.REDIS_URL ?? 'redis://localhost:6379' this.#keyPrefix = config.keyPrefix ?? 'jq:' + this.#logger = (config.logger ?? abstractLogger).child({ component: 'redis-storage', keyPrefix: this.#keyPrefix }) // Disable max listeners warning for high-throughput scenarios this.#eventEmitter.setMaxListeners(0) @@ -273,6 +278,7 @@ export class RedisStorage implements Storage { const decoded = this.#decodeExpiringValue(result) if (!decoded) { + this.#logger.warn({ id, valueLength: result.length }, 'Failed to decode TTL envelope for result.') // Backward compatibility for legacy entries stored without envelope return result } @@ -297,6 +303,7 @@ export class RedisStorage implements Storage { const decoded = this.#decodeExpiringValue(result) if (!decoded) { + this.#logger.warn({ id, valueLength: result.length }, 'Failed to decode TTL envelope for error.') // Backward compatibility for legacy entries stored without envelope return result } diff --git a/src/types.ts b/src/types.ts index e272c09..5b7204e 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,3 +1,4 @@ +import type { Logger } from 'pino' import type { Storage } from './storage/types.ts' import type { Serde } from './serde/index.ts' @@ -160,6 +161,9 @@ export interface QueueConfig { /** TTL for stored results and errors in ms (default: 3600000 = 1 hour) */ resultTTL?: number + + /** Logger instance (default: abstract logger / no-op) */ + logger?: Logger } /** diff --git a/src/utils/logging.ts b/src/utils/logging.ts new file mode 100644 index 0000000..03c1048 --- /dev/null +++ b/src/utils/logging.ts @@ -0,0 +1,30 @@ +import type { Logger } from 'pino' + +export function noop () {} + +export const abstractLogger: Logger = { + fatal: noop, + error: noop, + warn: noop, + info: noop, + debug: noop, + trace: noop, + // @ts-expect-error + child (): Logger { + return abstractLogger + } +} + +export function ensureLoggableError (error: Error): Error { + Reflect.defineProperty(error, 'message', { enumerable: true }) + + if ('code' in error) { + Reflect.defineProperty(error, 'code', { enumerable: true }) + } + + if ('stack' in error) { + Reflect.defineProperty(error, 'stack', { enumerable: true }) + } + + return error +} diff --git a/test/queue.test.ts b/test/queue.test.ts index f5ae9fd..6d845aa 100644 --- a/test/queue.test.ts +++ b/test/queue.test.ts @@ -1,8 +1,8 @@ -import { describe, it, beforeEach, afterEach } from 'node:test' import assert from 'node:assert' -import { setTimeout as sleep } from 'node:timers/promises' import { once } from 'node:events' -import { Queue, MemoryStorage, type Job } from '../src/index.ts' +import { afterEach, beforeEach, describe, it } from 'node:test' +import { setTimeout as sleep } from 'node:timers/promises' +import { MemoryStorage, Queue, type Job } from '../src/index.ts' describe('Queue', () => { let storage: MemoryStorage @@ -29,6 +29,38 @@ describe('Queue', () => { await queue.stop() }) + it('should accept a custom logger instance', async () => { + const logs: string[] = [] + const logger = { + fatal: () => {}, + error: () => { + logs.push('error') + }, + warn: () => { + logs.push('warn') + }, + info: () => { + logs.push('info') + }, + debug: () => { + logs.push('debug') + }, + trace: () => {}, + child () { + return this + } + } + + const localQueue = new Queue<{ value: number }, { result: number }>({ + storage, + logger: logger as unknown as import('pino').Logger + }) + + await localQueue.start() + await localQueue.stop() + assert.ok(logs.includes('debug')) + }) + it('should handle multiple start calls', async () => { await queue.start() await queue.start() From 16f99a5e369aaaa44d4edb56f2627aca2679655f Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Thu, 19 Feb 2026 16:13:06 +0100 Subject: [PATCH 2/2] fixup Signed-off-by: Paolo Insogna --- test/queue.test.ts | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/test/queue.test.ts b/test/queue.test.ts index 6d845aa..48f84a2 100644 --- a/test/queue.test.ts +++ b/test/queue.test.ts @@ -2,6 +2,7 @@ import assert from 'node:assert' import { once } from 'node:events' import { afterEach, beforeEach, describe, it } from 'node:test' import { setTimeout as sleep } from 'node:timers/promises' +import { type Logger } from 'pino' import { MemoryStorage, Queue, type Job } from '../src/index.ts' describe('Queue', () => { @@ -31,7 +32,7 @@ describe('Queue', () => { it('should accept a custom logger instance', async () => { const logs: string[] = [] - const logger = { + const logger: Logger = { fatal: () => {}, error: () => { logs.push('error') @@ -49,12 +50,9 @@ describe('Queue', () => { child () { return this } - } + } as unknown as Logger - const localQueue = new Queue<{ value: number }, { result: number }>({ - storage, - logger: logger as unknown as import('pino').Logger - }) + const localQueue = new Queue<{ value: number }, { result: number }>({ storage, logger }) await localQueue.start() await localQueue.stop()