From 38b10854e27e73c979390514cd05e73cbac6e326 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Tue, 10 Mar 2026 07:33:16 +0100 Subject: [PATCH 1/6] feat: Add named queue namespaces for multi-queue support Allow multiple named queues to share a single storage backend. Queue and Reaper accept an optional `name` config, which calls `storage.createNamespace(name)` to get an isolated view. RedisStorage namespaces share the parent's 3 Redis clients with ref-counted connect/disconnect. MemoryStorage returns independent instances. FileStorage uses subdirectories. Co-Authored-By: Claude Opus 4.6 --- src/queue.ts | 4 +- src/reaper.ts | 5 +- src/storage/file.ts | 4 + src/storage/memory.ts | 4 + src/storage/redis.ts | 94 ++++++++++++++-- src/storage/types.ts | 7 ++ src/types.ts | 3 + test/multi-queue.test.ts | 235 +++++++++++++++++++++++++++++++++++++++ 8 files changed, 347 insertions(+), 9 deletions(-) create mode 100644 test/multi-queue.test.ts diff --git a/src/queue.ts b/src/queue.ts index 7067bc9..cc2f4f2 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -44,7 +44,9 @@ export class Queue extends EventEmitter) { super() - this.#storage = config.storage + this.#storage = config.name + ? config.storage.createNamespace(config.name) + : config.storage this.#workerId = config.workerId ?? randomUUID() this.#payloadSerde = config.payloadSerde ?? createJsonSerde() this.#resultSerde = config.resultSerde ?? createJsonSerde() diff --git a/src/reaper.ts b/src/reaper.ts index 48486f0..bc30af3 100644 --- a/src/reaper.ts +++ b/src/reaper.ts @@ -17,6 +17,7 @@ interface LeaderElectionConfig { interface ReaperConfig { storage: Storage + name?: string payloadSerde?: Serde visibilityTimeout?: number leaderElection?: LeaderElectionConfig @@ -62,7 +63,9 @@ export class Reaper extends EventEmitter { constructor (config: ReaperConfig) { super() - this.#storage = config.storage + this.#storage = config.name + ? config.storage.createNamespace(config.name) + : config.storage this.#payloadSerde = config.payloadSerde ?? createJsonSerde() this.#visibilityTimeout = config.visibilityTimeout ?? 30000 this.#leaderElection = config.leaderElection ?? { enabled: false } diff --git a/src/storage/file.ts b/src/storage/file.ts index e84c6b2..3a20124 100644 --- a/src/storage/file.ts +++ b/src/storage/file.ts @@ -846,6 +846,10 @@ export class FileStorage implements Storage { } } + createNamespace (name: string): Storage { + return new FileStorage({ basePath: join(this.#basePath, name) }) + } + /** * Clear all data (useful for testing) */ diff --git a/src/storage/memory.ts b/src/storage/memory.ts index 035004f..9dd4b79 100644 --- a/src/storage/memory.ts +++ b/src/storage/memory.ts @@ -394,6 +394,10 @@ export class MemoryStorage implements Storage { } } + createNamespace (_name: string): Storage { + return new MemoryStorage() + } + /** * Clear all data (useful for testing) */ diff --git a/src/storage/redis.ts b/src/storage/redis.ts index e82fb61..85c7996 100644 --- a/src/storage/redis.ts +++ b/src/storage/redis.ts @@ -40,6 +40,14 @@ export class RedisStorage implements Storage { #eventSubscription: boolean = false #logger: Logger + // Namespace support + #parentStorage: RedisStorage | null = null // set for namespace views + #refCount: number = 0 // for root instance + + // PubSub handlers for namespace views (needed for cleanup on disconnect) + #messageHandler: ((channel: string, message: string) => void) | null = null + #pmessageHandler: ((_pattern: string, channel: string, message: string) => void) | null = null + constructor (config: RedisStorageConfig = {}) { this.#url = config.url ?? process.env.REDIS_URL ?? 'redis://localhost:6379' this.#keyPrefix = config.keyPrefix ?? 'jq:' @@ -103,7 +111,32 @@ export class RedisStorage implements Storage { } async connect (): Promise { - if (this.#client) return + // Namespace view: connect parent and copy shared clients + if (this.#parentStorage) { + if (this.#client) return // already connected + await this.#parentStorage.connect() + + // Copy shared clients and scripts from parent + this.#client = this.#parentStorage.#client + this.#blockingClient = this.#parentStorage.#blockingClient + this.#subscriber = this.#parentStorage.#subscriber + this.#scriptSHAs = this.#parentStorage.#scriptSHAs + + // Register own message handlers on shared subscriber + this.#messageHandler = (channel: string, message: string) => { + this.#handlePubSubMessage(channel, message) + } + this.#pmessageHandler = (_pattern: string, channel: string, message: string) => { + this.#handlePubSubMessage(channel, message) + } + this.#subscriber!.on('message', this.#messageHandler) + this.#subscriber!.on('pmessage', this.#pmessageHandler) + return + } + + // Root instance: create clients if needed, increment ref count + this.#refCount++ + if (this.#client) return // already connected, just increment ref count const redisModule = await loadOptionalDependency<{ Redis: new (url: string) => Redis }>('iovalkey', 'RedisStorage') @@ -114,18 +147,54 @@ export class RedisStorage implements Storage { // Load Lua scripts await this.#loadScripts() - // Set up pub/sub message handler - this.#subscriber.on('message', (channel: string, message: string) => { + // Set up pub/sub message handler for root + this.#messageHandler = (channel: string, message: string) => { this.#handlePubSubMessage(channel, message) - }) - - this.#subscriber.on('pmessage', (_pattern: string, channel: string, message: string) => { + } + this.#pmessageHandler = (_pattern: string, channel: string, message: string) => { this.#handlePubSubMessage(channel, message) - }) + } + this.#subscriber.on('message', this.#messageHandler) + this.#subscriber.on('pmessage', this.#pmessageHandler) } async disconnect (): Promise { + // Namespace view: remove own handlers, null references, disconnect parent + if (this.#parentStorage) { + if (this.#subscriber && this.#messageHandler) { + this.#subscriber.off('message', this.#messageHandler) + } + if (this.#subscriber && this.#pmessageHandler) { + this.#subscriber.off('pmessage', this.#pmessageHandler) + } + this.#messageHandler = null + this.#pmessageHandler = null + this.#client = null + this.#blockingClient = null + this.#subscriber = null + this.#scriptSHAs = null + + this.#eventEmitter.removeAllListeners() + this.#notifyEmitter.removeAllListeners() + this.#eventSubscription = false + + await this.#parentStorage.disconnect() + return + } + + // Root instance: decrement ref count, only destroy when 0 + this.#refCount-- + if (this.#refCount > 0) return + if (this.#subscriber) { + if (this.#messageHandler) { + this.#subscriber.off('message', this.#messageHandler) + } + if (this.#pmessageHandler) { + this.#subscriber.off('pmessage', this.#pmessageHandler) + } + this.#messageHandler = null + this.#pmessageHandler = null this.#subscriber.disconnect() this.#subscriber = null } @@ -503,6 +572,17 @@ export class RedisStorage implements Storage { } } + createNamespace (name: string): Storage { + const root = this.#parentStorage ?? this + const ns = new RedisStorage({ + url: root.#url, + keyPrefix: `${this.#keyPrefix}${name}:`, + logger: this.#logger + }) + ns.#parentStorage = root + return ns + } + /** * Clear all data (useful for testing) */ diff --git a/src/storage/types.ts b/src/storage/types.ts index acab600..2d9e5bd 100644 --- a/src/storage/types.ts +++ b/src/storage/types.ts @@ -219,6 +219,13 @@ export interface Storage { */ setJobExpiry (id: string, ttlMs: number): Promise + /** + * Create a namespaced view of this storage. + * The returned storage shares the same underlying connections (where applicable) + * but uses a different key prefix, providing isolated queues on the same backend. + */ + createNamespace (name: string): Storage + // ═══════════════════════════════════════════════════════════════════ // ATOMIC OPERATIONS (Lua scripts in Redis) // ═══════════════════════════════════════════════════════════════════ diff --git a/src/types.ts b/src/types.ts index 5b7204e..65ce639 100644 --- a/src/types.ts +++ b/src/types.ts @@ -132,6 +132,9 @@ export interface QueueConfig { /** Storage backend (required) */ storage: Storage + /** Queue name for namespace isolation (optional) */ + name?: string + /** Hook called after execution and before persisting terminal state */ afterExecution?: AfterExecutionHook diff --git a/test/multi-queue.test.ts b/test/multi-queue.test.ts new file mode 100644 index 0000000..0e7a93d --- /dev/null +++ b/test/multi-queue.test.ts @@ -0,0 +1,235 @@ +import assert from 'node:assert' +import { afterEach, beforeEach, describe, it } from 'node:test' +import { Queue } from '../src/queue.ts' +import { MemoryStorage } from '../src/storage/memory.ts' +import type { Job } from '../src/types.ts' +import { createLatch } from './helpers/events.ts' + +describe('Multi-Queue (named queues on shared storage)', () => { + let storage: MemoryStorage + let queueA: Queue<{ value: number }, { result: number }> + let queueB: Queue<{ value: string }, { result: string }> + + beforeEach(() => { + storage = new MemoryStorage() + queueA = new Queue({ + storage, + name: 'emails', + concurrency: 1, + visibilityTimeout: 5000 + }) + queueB = new Queue({ + storage, + name: 'images', + concurrency: 1, + visibilityTimeout: 5000 + }) + }) + + afterEach(async () => { + await queueA.stop() + await queueB.stop() + }) + + describe('job isolation', () => { + it('should process jobs independently across queues', async () => { + const latchA = createLatch() + const latchB = createLatch() + + queueA.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value * 2 } + }) + + queueB.execute(async (job: Job<{ value: string }>) => { + return { result: job.payload.value.toUpperCase() } + }) + + queueA.on('completed', () => latchA.resolve()) + queueB.on('completed', () => latchB.resolve()) + + await queueA.start() + await queueB.start() + + await queueA.enqueue('job-1', { value: 21 }) + await queueB.enqueue('job-1', { value: 'hello' }) + + await latchA.promise + await latchB.promise + + const resultA = await queueA.getResult('job-1') + const resultB = await queueB.getResult('job-1') + + assert.deepStrictEqual(resultA, { result: 42 }) + assert.deepStrictEqual(resultB, { result: 'HELLO' }) + }) + + it('should not deliver queue A jobs to queue B consumers', async () => { + const latchA = createLatch() + + queueA.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value * 2 } + }) + + // Queue B has a handler but should never receive anything + queueB.execute(async () => { + assert.fail('Queue B should not receive queue A jobs') + return { result: 'never' } + }) + + queueA.on('completed', () => latchA.resolve()) + + await queueA.start() + await queueB.start() + + await queueA.enqueue('job-1', { value: 5 }) + await latchA.promise + + const resultA = await queueA.getResult('job-1') + assert.deepStrictEqual(resultA, { result: 10 }) + }) + }) + + describe('dedup isolation', () => { + it('should allow same job ID in different queues', async () => { + await queueA.start() + await queueB.start() + + const resultA = await queueA.enqueue('shared-id', { value: 1 }) + const resultB = await queueB.enqueue('shared-id', { value: 'test' }) + + assert.strictEqual(resultA.status, 'queued') + assert.strictEqual(resultB.status, 'queued') + }) + + it('should enforce dedup within the same named queue', async () => { + await queueA.start() + + const result1 = await queueA.enqueue('dup-id', { value: 1 }) + const result2 = await queueA.enqueue('dup-id', { value: 2 }) + + assert.strictEqual(result1.status, 'queued') + assert.strictEqual(result2.status, 'duplicate') + }) + }) + + describe('result isolation', () => { + it('should store and retrieve results independently per queue', async () => { + const latchA = createLatch() + const latchB = createLatch() + + queueA.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value + 100 } + }) + + queueB.execute(async (job: Job<{ value: string }>) => { + return { result: `processed:${job.payload.value}` } + }) + + queueA.on('completed', () => latchA.resolve()) + queueB.on('completed', () => latchB.resolve()) + + await queueA.start() + await queueB.start() + + await queueA.enqueue('result-test', { value: 5 }) + await queueB.enqueue('result-test', { value: 'data' }) + + await latchA.promise + await latchB.promise + + const resultA = await queueA.getResult('result-test') + const resultB = await queueB.getResult('result-test') + + assert.deepStrictEqual(resultA, { result: 105 }) + assert.deepStrictEqual(resultB, { result: 'processed:data' }) + }) + }) + + describe('status isolation', () => { + it('should track status independently per queue', async () => { + const latchA = createLatch() + + queueA.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value } + }) + + queueA.on('completed', () => latchA.resolve()) + + await queueA.start() + await queueB.start() + + await queueA.enqueue('status-test', { value: 1 }) + await queueB.enqueue('status-test', { value: 'pending' }) + + await latchA.promise + + const statusA = await queueA.getStatus('status-test') + const statusB = await queueB.getStatus('status-test') + + assert.strictEqual(statusA?.state, 'completed') + assert.strictEqual(statusB?.state, 'queued') + }) + }) + + describe('backward compatibility', () => { + it('should work without a name (no namespace)', async () => { + const latch = createLatch() + const plainQueue = new Queue<{ value: number }, { result: number }>({ + storage, + concurrency: 1, + visibilityTimeout: 5000 + }) + + plainQueue.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value * 3 } + }) + + plainQueue.on('completed', () => latch.resolve()) + + await plainQueue.start() + await plainQueue.enqueue('plain-job', { value: 10 }) + + await latch.promise + + const result = await plainQueue.getResult('plain-job') + assert.deepStrictEqual(result, { result: 30 }) + + await plainQueue.stop() + }) + }) + + describe('connect/disconnect lifecycle', () => { + it('should handle multiple start/stop cycles', async () => { + queueA.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value } + }) + + await queueA.start() + await queueB.start() + + await queueA.stop() + await queueB.stop() + + // Second cycle + queueA = new Queue({ + storage, + name: 'emails', + concurrency: 1, + visibilityTimeout: 5000 + }) + queueA.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value } + }) + + const latch = createLatch() + queueA.on('completed', () => latch.resolve()) + + await queueA.start() + await queueA.enqueue('cycle-test', { value: 7 }) + await latch.promise + + const result = await queueA.getResult('cycle-test') + assert.deepStrictEqual(result, { result: 7 }) + }) + }) +}) From 9a62486a498f2b6b0cc540a1898f9de519529610 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Tue, 10 Mar 2026 21:52:51 +0100 Subject: [PATCH 2/6] feat: Add PostgreSQL storage backend PgStorage uses SELECT FOR UPDATE SKIP LOCKED for concurrent dequeue (pg-boss pattern) and LISTEN/NOTIFY for pub/sub notifications. Shares pool/listener connections across namespaces with ref counting. Co-Authored-By: Claude Opus 4.6 --- docker-compose.yml | 14 + package.json | 5 +- src/index.ts | 1 + src/storage/pg.ts | 955 ++++++++++++++++++++++++++++++++++++++++ test/fixtures/pg.ts | 13 + test/pg-storage.test.ts | 498 +++++++++++++++++++++ 6 files changed, 1484 insertions(+), 2 deletions(-) create mode 100644 src/storage/pg.ts create mode 100644 test/fixtures/pg.ts create mode 100644 test/pg-storage.test.ts diff --git a/docker-compose.yml b/docker-compose.yml index d8b8d01..2b7d3d1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,3 +18,17 @@ services: interval: 1s timeout: 3s retries: 5 + + postgres: + image: postgres:17-alpine + ports: + - "127.0.0.1:5432:5432" + environment: + POSTGRES_DB: job_queue_test + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres"] + interval: 1s + timeout: 3s + retries: 5 diff --git a/package.json b/package.json index 0a5cfda..26b18c2 100644 --- a/package.json +++ b/package.json @@ -47,7 +47,8 @@ }, "optionalDependencies": { "fast-write-atomic": "^0.4.0", - "iovalkey": "^0.2.0" + "iovalkey": "^0.2.0", + "pg": "^8.20.0" }, "devDependencies": { "@platformatic/flame": "^1.6.0", @@ -64,4 +65,4 @@ "engines": { "node": ">=22.19.0" } -} \ No newline at end of file +} diff --git a/src/index.ts b/src/index.ts index a3f6e9e..e0071f7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -36,6 +36,7 @@ export type { Storage } from './storage/types.ts' export { MemoryStorage } from './storage/memory.ts' export { RedisStorage } from './storage/redis.ts' export { FileStorage } from './storage/file.ts' +export { PgStorage } from './storage/pg.ts' // Queue export { Queue } from './queue.ts' diff --git a/src/storage/pg.ts b/src/storage/pg.ts new file mode 100644 index 0000000..7a4b655 --- /dev/null +++ b/src/storage/pg.ts @@ -0,0 +1,955 @@ +import { EventEmitter } from 'node:events' +import type { Logger } from 'pino' +import type { Storage } from './types.ts' +import { loadOptionalDependency } from './utils.ts' +import { abstractLogger } from '../utils/logging.ts' + +// Minimal type definitions for pg (optional dependency) +interface PgPoolConfig { + connectionString?: string + max?: number + application_name?: string + [key: string]: unknown +} + +interface PgQueryResult { + rows: Record[] + rowCount: number | null +} + +interface PgPool { + query (text: string, values?: unknown[]): Promise + connect (): Promise + end (): Promise + on (event: string, handler: (...args: unknown[]) => void): void +} + +interface PgPoolClient { + query (text: string, values?: unknown[]): Promise + release (destroy?: boolean): void +} + +interface PgClient { + query (text: string, values?: unknown[]): Promise + connect (): Promise + end (): Promise + on (event: string, handler: (...args: unknown[]) => void): void + off (event: string, handler: (...args: unknown[]) => void): void +} + +interface PgNotification { + channel: string + payload?: string +} + +interface PgModule { + Pool: new (config: PgPoolConfig) => PgPool + Client: new (config: PgPoolConfig) => PgClient +} + +interface PgStorageConfig { + connectionString?: string + tablePrefix?: string + logger?: Logger +} + +interface DequeueWaiter { + workerId: string + resolve: (value: Buffer | null) => void + timeoutId: ReturnType +} + +/** + * PostgreSQL storage implementation + * + * Uses SELECT FOR UPDATE SKIP LOCKED for concurrent dequeue (pg-boss pattern) + * and LISTEN/NOTIFY for pub/sub notifications. + * + * Connection architecture: + * - Pool: for all queries (enqueue, state, results, etc.) + * - Dedicated listener client: stays in LISTEN mode for pub/sub + */ +export class PgStorage implements Storage { + #connectionString: string + #tablePrefix: string + #pool: PgPool | null = null + #listener: PgClient | null = null + #eventEmitter = new EventEmitter({ captureRejections: true }) + #notifyEmitter = new EventEmitter({ captureRejections: true }) + #eventSubscription: boolean = false + #logger: Logger + #dequeueWaiters: DequeueWaiter[] = [] + + // Namespace support + #parentStorage: PgStorage | null = null + #refCount: number = 0 + + // Notification handler (stored for removal on disconnect) + #notificationHandler: ((msg: PgNotification) => void) | null = null + + // Table names (computed from prefix) + #jobsTable: string + #queueTable: string + #processingTable: string + #resultsTable: string + #errorsTable: string + #workersTable: string + #locksTable: string + + // Channel names + #notifyChannel: string + #eventsChannel: string + #newJobChannel: string + + constructor (config: PgStorageConfig = {}) { + this.#connectionString = config.connectionString ?? process.env.DATABASE_URL ?? 'postgresql://localhost:5432/job_queue' + this.#tablePrefix = config.tablePrefix ?? 'jq_' + this.#logger = (config.logger ?? abstractLogger).child({ component: 'pg-storage', tablePrefix: this.#tablePrefix }) + + // Compute table names + this.#jobsTable = `${this.#tablePrefix}jobs` + this.#queueTable = `${this.#tablePrefix}queue` + this.#processingTable = `${this.#tablePrefix}processing` + this.#resultsTable = `${this.#tablePrefix}results` + this.#errorsTable = `${this.#tablePrefix}errors` + this.#workersTable = `${this.#tablePrefix}workers` + this.#locksTable = `${this.#tablePrefix}locks` + + // Compute channel names (underscores only, safe for LISTEN) + this.#notifyChannel = `${this.#tablePrefix}notify` + this.#eventsChannel = `${this.#tablePrefix}events` + this.#newJobChannel = `${this.#tablePrefix}new_job` + + this.#eventEmitter.setMaxListeners(0) + this.#notifyEmitter.setMaxListeners(0) + } + + // ═══════════════════════════════════════════════════════════════════ + // LIFECYCLE + // ═══════════════════════════════════════════════════════════════════ + + async connect (): Promise { + // Namespace view: connect parent and share pool/listener + if (this.#parentStorage) { + if (this.#pool) return // already connected + await this.#parentStorage.connect() + + this.#pool = this.#parentStorage.#pool + this.#listener = this.#parentStorage.#listener + + // Create own tables + await this.#createSchema() + + // Register own notification handler on shared listener + this.#notificationHandler = (msg: PgNotification) => { + this.#handleNotification(msg) + } + this.#listener!.on('notification', this.#notificationHandler) + + // LISTEN on own channels + await this.#listenChannels() + return + } + + // Root instance: create pool/listener if needed, increment ref count + this.#refCount++ + if (this.#pool) return + + const pgModule = await loadOptionalDependency('pg', 'PgStorage') + + this.#pool = new pgModule.Pool({ + connectionString: this.#connectionString, + application_name: 'job-queue' + }) + + this.#pool.on('error', (err: unknown) => { + this.#logger.error({ err }, 'Pool error.') + }) + + this.#listener = new pgModule.Client({ + connectionString: this.#connectionString, + application_name: 'job-queue-listener' + }) + await this.#listener.connect() + + // Create tables + await this.#createSchema() + + // Set up notification handler + this.#notificationHandler = (msg: PgNotification) => { + this.#handleNotification(msg) + } + this.#listener.on('notification', this.#notificationHandler) + + // LISTEN on channels + await this.#listenChannels() + } + + async disconnect (): Promise { + // Namespace view + if (this.#parentStorage) { + // UNLISTEN own channels + if (this.#listener) { + await this.#listener.query(`UNLISTEN "${this.#notifyChannel}"`).catch(() => {}) + await this.#listener.query(`UNLISTEN "${this.#eventsChannel}"`).catch(() => {}) + await this.#listener.query(`UNLISTEN "${this.#newJobChannel}"`).catch(() => {}) + + if (this.#notificationHandler) { + this.#listener.off('notification', this.#notificationHandler) + this.#notificationHandler = null + } + } + + this.#clearDequeueWaiters() + this.#pool = null + this.#listener = null + + this.#eventEmitter.removeAllListeners() + this.#notifyEmitter.removeAllListeners() + this.#eventSubscription = false + + await this.#parentStorage.disconnect() + return + } + + // Root instance + this.#refCount-- + if (this.#refCount > 0) return + + if (this.#listener) { + if (this.#notificationHandler) { + this.#listener.off('notification', this.#notificationHandler) + this.#notificationHandler = null + } + await this.#listener.end().catch(() => {}) + this.#listener = null + } + + this.#clearDequeueWaiters() + + if (this.#pool) { + await this.#pool.end().catch(() => {}) + this.#pool = null + } + + this.#eventEmitter.removeAllListeners() + this.#notifyEmitter.removeAllListeners() + this.#eventSubscription = false + } + + async #createSchema (): Promise { + await this.#pool!.query(` + CREATE TABLE IF NOT EXISTS "${this.#jobsTable}" ( + id TEXT PRIMARY KEY, + state TEXT NOT NULL, + expires_at BIGINT + ) + `) + + await this.#pool!.query(` + CREATE TABLE IF NOT EXISTS "${this.#queueTable}" ( + seq BIGSERIAL PRIMARY KEY, + message BYTEA NOT NULL + ) + `) + + await this.#pool!.query(` + CREATE TABLE IF NOT EXISTS "${this.#processingTable}" ( + seq BIGSERIAL PRIMARY KEY, + worker_id TEXT NOT NULL, + message BYTEA NOT NULL + ) + `) + + // Index for worker lookups on processing table + await this.#pool!.query(` + CREATE INDEX IF NOT EXISTS "${this.#processingTable}_worker_idx" + ON "${this.#processingTable}" (worker_id) + `) + + await this.#pool!.query(` + CREATE TABLE IF NOT EXISTS "${this.#resultsTable}" ( + id TEXT PRIMARY KEY, + data BYTEA NOT NULL, + expires_at BIGINT NOT NULL + ) + `) + + await this.#pool!.query(` + CREATE TABLE IF NOT EXISTS "${this.#errorsTable}" ( + id TEXT PRIMARY KEY, + data BYTEA NOT NULL, + expires_at BIGINT NOT NULL + ) + `) + + await this.#pool!.query(` + CREATE TABLE IF NOT EXISTS "${this.#workersTable}" ( + worker_id TEXT PRIMARY KEY, + expires_at BIGINT NOT NULL + ) + `) + + await this.#pool!.query(` + CREATE TABLE IF NOT EXISTS "${this.#locksTable}" ( + lock_key TEXT PRIMARY KEY, + owner_id TEXT NOT NULL, + expires_at BIGINT NOT NULL + ) + `) + } + + async #listenChannels (): Promise { + await this.#listener!.query(`LISTEN "${this.#notifyChannel}"`) + await this.#listener!.query(`LISTEN "${this.#eventsChannel}"`) + await this.#listener!.query(`LISTEN "${this.#newJobChannel}"`) + } + + #handleNotification (msg: PgNotification): void { + if (msg.channel === this.#newJobChannel) { + this.#notifyDequeueWaiters() + return + } + + if (msg.channel === this.#notifyChannel && msg.payload) { + // payload format: jobId:status + const sepIdx = msg.payload.indexOf(':') + if (sepIdx > 0) { + const jobId = msg.payload.substring(0, sepIdx) + const status = msg.payload.substring(sepIdx + 1) + this.#notifyEmitter.emit(`notify:${jobId}`, status as 'completed' | 'failed' | 'failing') + } + return + } + + if (msg.channel === this.#eventsChannel && msg.payload) { + // payload format: id:event + const sepIdx = msg.payload.indexOf(':') + if (sepIdx > 0) { + const id = msg.payload.substring(0, sepIdx) + const event = msg.payload.substring(sepIdx + 1) + this.#eventEmitter.emit('event', id, event) + } + } + } + + #clearDequeueWaiters (): void { + for (const waiter of this.#dequeueWaiters) { + clearTimeout(waiter.timeoutId) + waiter.resolve(null) + } + this.#dequeueWaiters = [] + } + + #notifyDequeueWaiters (): void { + // Try to fulfill waiting dequeue calls + // Each waiter will do its own SELECT FOR UPDATE SKIP LOCKED, + // so concurrent fulfillment is safe + const waiters = this.#dequeueWaiters.splice(0) + for (const waiter of waiters) { + this.#tryDequeue(waiter.workerId).then(result => { + clearTimeout(waiter.timeoutId) + if (result) { + waiter.resolve(result) + } else { + // No job available, put waiter back + this.#dequeueWaiters.push(waiter) + } + }).catch(() => { + // On error, put waiter back + this.#dequeueWaiters.push(waiter) + }) + } + } + + // ═══════════════════════════════════════════════════════════════════ + // QUEUE OPERATIONS + // ═══════════════════════════════════════════════════════════════════ + + async enqueue (id: string, message: Buffer, timestamp: number): Promise { + const state = `queued:${timestamp}` + const now = Date.now() + + const client = await this.#pool!.connect() + try { + await client.query('BEGIN') + + // Check for existing job (with dedup expiry handling) + const existing = await client.query( + `SELECT state, expires_at FROM "${this.#jobsTable}" WHERE id = $1 FOR UPDATE`, + [id] + ) + + if (existing.rows.length > 0) { + const row = existing.rows[0] + const expiresAt = row.expires_at as number | null + if (expiresAt && now >= expiresAt) { + // Expired: remove and fall through to enqueue as new + await client.query(`DELETE FROM "${this.#jobsTable}" WHERE id = $1`, [id]) + } else { + await client.query('COMMIT') + return row.state as string + } + } + + // Insert new job + await client.query( + `INSERT INTO "${this.#jobsTable}" (id, state) VALUES ($1, $2)`, + [id, state] + ) + + // Push to queue + await client.query( + `INSERT INTO "${this.#queueTable}" (message) VALUES ($1)`, + [message] + ) + + await client.query('COMMIT') + + // Publish event and notify (outside transaction so NOTIFY fires immediately) + await this.publishEvent(id, 'queued') + await this.#pool!.query(`SELECT pg_notify($1, '')`, [this.#newJobChannel]) + + return null + } catch (err) { + await client.query('ROLLBACK').catch(() => {}) + throw err + } finally { + client.release() + } + } + + async dequeue (workerId: string, timeout: number): Promise { + // Try immediate fetch + const result = await this.#tryDequeue(workerId) + if (result) return result + + // Wait for NOTIFY or timeout + return new Promise(resolve => { + const timeoutId = setTimeout(() => { + const index = this.#dequeueWaiters.findIndex(w => w.resolve === resolve) + if (index !== -1) { + this.#dequeueWaiters.splice(index, 1) + } + resolve(null) + }, timeout * 1000) + + this.#dequeueWaiters.push({ workerId, resolve, timeoutId }) + }) + } + + async #tryDequeue (workerId: string): Promise { + // Atomic: delete from queue (SKIP LOCKED) + insert into processing + const result = await this.#pool!.query(` + WITH deleted AS ( + DELETE FROM "${this.#queueTable}" + WHERE seq = ( + SELECT seq FROM "${this.#queueTable}" + ORDER BY seq + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + RETURNING message + ) + INSERT INTO "${this.#processingTable}" (worker_id, message) + SELECT $1, message FROM deleted + RETURNING message + `, [workerId]) + + if (result.rows.length === 0) return null + return result.rows[0].message as Buffer + } + + async requeue (id: string, message: Buffer, workerId: string): Promise { + const client = await this.#pool!.connect() + try { + await client.query('BEGIN') + + // Remove from processing queue + await client.query( + `DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, + [workerId, message] + ) + + // Add to front of main queue (use seq = 0 trick won't work since BIGSERIAL auto-increments) + // Just insert normally - ordering is by seq which is auto-increment + await client.query( + `INSERT INTO "${this.#queueTable}" (message) VALUES ($1)`, + [message] + ) + + await client.query('COMMIT') + + // Notify waiters + await this.#pool!.query(`SELECT pg_notify($1, '')`, [this.#newJobChannel]) + } catch (err) { + await client.query('ROLLBACK').catch(() => {}) + throw err + } finally { + client.release() + } + } + + async ack (id: string, message: Buffer, workerId: string): Promise { + await this.#pool!.query( + `DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, + [workerId, message] + ) + } + + // ═══════════════════════════════════════════════════════════════════ + // JOB STATE + // ═══════════════════════════════════════════════════════════════════ + + async getJobState (id: string): Promise { + const result = await this.#pool!.query( + `SELECT state, expires_at FROM "${this.#jobsTable}" WHERE id = $1`, + [id] + ) + + if (result.rows.length === 0) return null + + const row = result.rows[0] + const expiresAt = row.expires_at as number | null + if (expiresAt && Date.now() >= expiresAt) { + await this.#pool!.query(`DELETE FROM "${this.#jobsTable}" WHERE id = $1`, [id]) + return null + } + + return row.state as string + } + + async setJobState (id: string, state: string): Promise { + await this.#pool!.query( + `UPDATE "${this.#jobsTable}" SET state = $2 WHERE id = $1`, + [id, state] + ) + } + + async deleteJob (id: string): Promise { + const result = await this.#pool!.query( + `DELETE FROM "${this.#jobsTable}" WHERE id = $1`, + [id] + ) + + if (result.rowCount && result.rowCount > 0) { + await this.publishEvent(id, 'cancelled') + return true + } + return false + } + + async getJobStates (ids: string[]): Promise> { + const result = new Map() + if (ids.length === 0) return result + + const rows = await this.#pool!.query( + `SELECT id, state, expires_at FROM "${this.#jobsTable}" WHERE id = ANY($1)`, + [ids] + ) + + const now = Date.now() + const found = new Set() + const expiredIds: string[] = [] + + for (const row of rows.rows) { + const id = row.id as string + const expiresAt = row.expires_at as number | null + found.add(id) + + if (expiresAt && now >= expiresAt) { + expiredIds.push(id) + result.set(id, null) + } else { + result.set(id, row.state as string) + } + } + + // Clean up expired entries + if (expiredIds.length > 0) { + await this.#pool!.query( + `DELETE FROM "${this.#jobsTable}" WHERE id = ANY($1)`, + [expiredIds] + ) + } + + // Set null for IDs not found + for (const id of ids) { + if (!found.has(id)) { + result.set(id, null) + } + } + + return result + } + + async setJobExpiry (id: string, ttlMs: number): Promise { + const expiresAt = Date.now() + ttlMs + await this.#pool!.query( + `UPDATE "${this.#jobsTable}" SET expires_at = $2 WHERE id = $1`, + [id, expiresAt] + ) + } + + // ═══════════════════════════════════════════════════════════════════ + // RESULTS + // ═══════════════════════════════════════════════════════════════════ + + async setResult (id: string, result: Buffer, ttlMs: number): Promise { + const expiresAt = Date.now() + ttlMs + await this.#pool!.query( + `INSERT INTO "${this.#resultsTable}" (id, data, expires_at) + VALUES ($1, $2, $3) + ON CONFLICT (id) DO UPDATE SET data = $2, expires_at = $3`, + [id, result, expiresAt] + ) + } + + async getResult (id: string): Promise { + const result = await this.#pool!.query( + `SELECT data, expires_at FROM "${this.#resultsTable}" WHERE id = $1`, + [id] + ) + + if (result.rows.length === 0) return null + + const row = result.rows[0] + if (Date.now() > (row.expires_at as number)) { + await this.#pool!.query(`DELETE FROM "${this.#resultsTable}" WHERE id = $1`, [id]) + return null + } + + return row.data as Buffer + } + + async setError (id: string, error: Buffer, ttlMs: number): Promise { + const expiresAt = Date.now() + ttlMs + await this.#pool!.query( + `INSERT INTO "${this.#errorsTable}" (id, data, expires_at) + VALUES ($1, $2, $3) + ON CONFLICT (id) DO UPDATE SET data = $2, expires_at = $3`, + [id, error, expiresAt] + ) + } + + async getError (id: string): Promise { + const result = await this.#pool!.query( + `SELECT data, expires_at FROM "${this.#errorsTable}" WHERE id = $1`, + [id] + ) + + if (result.rows.length === 0) return null + + const row = result.rows[0] + if (Date.now() > (row.expires_at as number)) { + await this.#pool!.query(`DELETE FROM "${this.#errorsTable}" WHERE id = $1`, [id]) + return null + } + + return row.data as Buffer + } + + // ═══════════════════════════════════════════════════════════════════ + // WORKERS + // ═══════════════════════════════════════════════════════════════════ + + async registerWorker (workerId: string, ttlMs: number): Promise { + const expiresAt = Date.now() + ttlMs + await this.#pool!.query( + `INSERT INTO "${this.#workersTable}" (worker_id, expires_at) + VALUES ($1, $2) + ON CONFLICT (worker_id) DO UPDATE SET expires_at = $2`, + [workerId, expiresAt] + ) + } + + async refreshWorker (workerId: string, ttlMs: number): Promise { + await this.registerWorker(workerId, ttlMs) + } + + async unregisterWorker (workerId: string): Promise { + if (!this.#pool) return + await this.#pool.query( + `DELETE FROM "${this.#workersTable}" WHERE worker_id = $1`, + [workerId] + ) + await this.#pool.query( + `DELETE FROM "${this.#processingTable}" WHERE worker_id = $1`, + [workerId] + ) + } + + async getWorkers (): Promise { + const result = await this.#pool!.query( + `SELECT worker_id FROM "${this.#workersTable}" WHERE expires_at > $1`, + [Date.now()] + ) + return result.rows.map(row => row.worker_id as string) + } + + async getProcessingJobs (workerId: string): Promise { + const result = await this.#pool!.query( + `SELECT message FROM "${this.#processingTable}" WHERE worker_id = $1`, + [workerId] + ) + return result.rows.map(row => row.message as Buffer) + } + + // ═══════════════════════════════════════════════════════════════════ + // NOTIFICATIONS (for request/response) + // ═══════════════════════════════════════════════════════════════════ + + async subscribeToJob ( + id: string, + handler: (status: 'completed' | 'failed' | 'failing') => void + ): Promise<() => Promise> { + const eventName = `notify:${id}` + this.#notifyEmitter.on(eventName, handler) + + return async () => { + this.#notifyEmitter.off(eventName, handler) + } + } + + async notifyJobComplete (id: string, status: 'completed' | 'failed' | 'failing'): Promise { + await this.#pool!.query( + `SELECT pg_notify($1, $2)`, + [this.#notifyChannel, `${id}:${status}`] + ) + } + + // ═══════════════════════════════════════════════════════════════════ + // EVENTS (for monitoring/reaper) + // ═══════════════════════════════════════════════════════════════════ + + async subscribeToEvents (handler: (id: string, event: string) => void): Promise<() => Promise> { + this.#eventEmitter.on('event', handler) + this.#eventSubscription = true + + return async () => { + this.#eventEmitter.off('event', handler) + } + } + + async publishEvent (id: string, event: string): Promise { + await this.#pool!.query( + `SELECT pg_notify($1, $2)`, + [this.#eventsChannel, `${id}:${event}`] + ) + } + + // ═══════════════════════════════════════════════════════════════════ + // ATOMIC OPERATIONS + // ═══════════════════════════════════════════════════════════════════ + + async completeJob (id: string, message: Buffer, workerId: string, result: Buffer, resultTTL: number): Promise { + const timestamp = Date.now() + const state = `completed:${timestamp}` + const expiresAt = timestamp + resultTTL + + const client = await this.#pool!.connect() + try { + await client.query('BEGIN') + + // Set state to completed + dedup expiry + await client.query( + `UPDATE "${this.#jobsTable}" SET state = $2, expires_at = $3 WHERE id = $1`, + [id, state, expiresAt] + ) + + // Store result + await client.query( + `INSERT INTO "${this.#resultsTable}" (id, data, expires_at) + VALUES ($1, $2, $3) + ON CONFLICT (id) DO UPDATE SET data = $2, expires_at = $3`, + [id, result, expiresAt] + ) + + // Remove from processing queue + await client.query( + `DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, + [workerId, message] + ) + + await client.query('COMMIT') + } catch (err) { + await client.query('ROLLBACK').catch(() => {}) + throw err + } finally { + client.release() + } + + // Notify after commit (NOTIFY is transactional, but we do it separately for the emitter) + await this.notifyJobComplete(id, 'completed') + await this.publishEvent(id, 'completed') + } + + async failJob (id: string, message: Buffer, workerId: string, error: Buffer, errorTTL: number): Promise { + const timestamp = Date.now() + const state = `failed:${timestamp}` + const expiresAt = timestamp + errorTTL + + const client = await this.#pool!.connect() + try { + await client.query('BEGIN') + + // Set state to failed + dedup expiry + await client.query( + `UPDATE "${this.#jobsTable}" SET state = $2, expires_at = $3 WHERE id = $1`, + [id, state, expiresAt] + ) + + // Store error + await client.query( + `INSERT INTO "${this.#errorsTable}" (id, data, expires_at) + VALUES ($1, $2, $3) + ON CONFLICT (id) DO UPDATE SET data = $2, expires_at = $3`, + [id, error, expiresAt] + ) + + // Remove from processing queue + await client.query( + `DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, + [workerId, message] + ) + + await client.query('COMMIT') + } catch (err) { + await client.query('ROLLBACK').catch(() => {}) + throw err + } finally { + client.release() + } + + // Notify after commit + await this.notifyJobComplete(id, 'failed') + await this.publishEvent(id, 'failed') + } + + async retryJob (id: string, message: Buffer, workerId: string, attempts: number): Promise { + const timestamp = Date.now() + const state = `failing:${timestamp}:${attempts}` + + // Get the old message from processing queue + const processingJobs = await this.getProcessingJobs(workerId) + let oldMessage: Buffer | null = null + + for (const job of processingJobs) { + try { + const parsed = JSON.parse(job.toString()) + if (parsed.id === id) { + oldMessage = job + break + } + } catch { + // Ignore parse errors + } + } + + const client = await this.#pool!.connect() + try { + await client.query('BEGIN') + + // Set state to failing + await client.query( + `UPDATE "${this.#jobsTable}" SET state = $2 WHERE id = $1`, + [id, state] + ) + + // Remove old message from processing queue + if (oldMessage) { + await client.query( + `DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, + [workerId, oldMessage] + ) + } + + // Add new message to queue + await client.query( + `INSERT INTO "${this.#queueTable}" (message) VALUES ($1)`, + [message] + ) + + await client.query('COMMIT') + } catch (err) { + await client.query('ROLLBACK').catch(() => {}) + throw err + } finally { + client.release() + } + + // Notify after commit + await this.notifyJobComplete(id, 'failing') + await this.publishEvent(id, 'failing') + await this.#pool!.query(`SELECT pg_notify($1, '')`, [this.#newJobChannel]) + } + + // ═══════════════════════════════════════════════════════════════════ + // LEADER ELECTION + // ═══════════════════════════════════════════════════════════════════ + + async acquireLeaderLock (lockKey: string, ownerId: string, ttlMs: number): Promise { + const expiresAt = Date.now() + ttlMs + + // Try to insert, or take over if expired + const result = await this.#pool!.query( + `INSERT INTO "${this.#locksTable}" (lock_key, owner_id, expires_at) + VALUES ($1, $2, $3) + ON CONFLICT (lock_key) DO UPDATE + SET owner_id = $2, expires_at = $3 + WHERE "${this.#locksTable}".expires_at < $4 + RETURNING lock_key`, + [lockKey, ownerId, expiresAt, Date.now()] + ) + + return result.rows.length > 0 + } + + async renewLeaderLock (lockKey: string, ownerId: string, ttlMs: number): Promise { + const expiresAt = Date.now() + ttlMs + const result = await this.#pool!.query( + `UPDATE "${this.#locksTable}" SET expires_at = $3 WHERE lock_key = $1 AND owner_id = $2`, + [lockKey, ownerId, expiresAt] + ) + return result.rowCount !== null && result.rowCount > 0 + } + + async releaseLeaderLock (lockKey: string, ownerId: string): Promise { + if (!this.#pool) return false + const result = await this.#pool.query( + `DELETE FROM "${this.#locksTable}" WHERE lock_key = $1 AND owner_id = $2`, + [lockKey, ownerId] + ) + return result.rowCount !== null && result.rowCount > 0 + } + + // ═══════════════════════════════════════════════════════════════════ + // NAMESPACE + // ═══════════════════════════════════════════════════════════════════ + + createNamespace (name: string): Storage { + const root = this.#parentStorage ?? this + const ns = new PgStorage({ + connectionString: root.#connectionString, + tablePrefix: `${this.#tablePrefix}${name}_`, + logger: this.#logger + }) + ns.#parentStorage = root + return ns + } + + /** + * Clear all data (useful for testing) + */ + async clear (): Promise { + if (!this.#pool) return + + await this.#pool.query(`TRUNCATE "${this.#queueTable}" RESTART IDENTITY`) + await this.#pool.query(`TRUNCATE "${this.#processingTable}" RESTART IDENTITY`) + await this.#pool.query(`DELETE FROM "${this.#jobsTable}"`) + await this.#pool.query(`DELETE FROM "${this.#resultsTable}"`) + await this.#pool.query(`DELETE FROM "${this.#errorsTable}"`) + await this.#pool.query(`DELETE FROM "${this.#workersTable}"`) + await this.#pool.query(`DELETE FROM "${this.#locksTable}"`) + } +} diff --git a/test/fixtures/pg.ts b/test/fixtures/pg.ts new file mode 100644 index 0000000..30d8ff5 --- /dev/null +++ b/test/fixtures/pg.ts @@ -0,0 +1,13 @@ +import { PgStorage } from '../../src/storage/pg.ts' + +/** + * Create a PgStorage instance for testing. + * Each test gets its own table prefix to avoid conflicts. + */ +export function createPgStorage (): PgStorage { + const suffix = `${Date.now()}_${Math.random().toString(36).slice(2)}` + return new PgStorage({ + connectionString: process.env.DATABASE_URL ?? 'postgresql://postgres:postgres@localhost:5432/job_queue_test', + tablePrefix: `t${suffix}_` + }) +} diff --git a/test/pg-storage.test.ts b/test/pg-storage.test.ts new file mode 100644 index 0000000..babb12a --- /dev/null +++ b/test/pg-storage.test.ts @@ -0,0 +1,498 @@ +import assert from 'node:assert' +import { afterEach, beforeEach, describe, it } from 'node:test' +import { setTimeout as sleep } from 'node:timers/promises' +import { type PgStorage } from '../src/storage/pg.ts' +import { createPgStorage } from './fixtures/pg.ts' +import { promisifyCallback, waitForCallbacks } from './helpers/events.ts' + +describe('PgStorage', () => { + let storage: PgStorage + + beforeEach(async () => { + storage = createPgStorage() + await storage.connect() + }) + + afterEach(async () => { + await storage.clear() + await storage.disconnect() + }) + + describe('enqueue/dequeue', () => { + it('should enqueue and dequeue a job', async () => { + const message = Buffer.from(JSON.stringify({ id: 'job-1', payload: 'test' })) + const result = await storage.enqueue('job-1', message, Date.now()) + + assert.strictEqual(result, null, 'enqueue should return null for new job') + + const dequeued = await storage.dequeue('worker-1', 1) + assert.ok(dequeued, 'dequeue should return the message') + assert.deepStrictEqual(dequeued, message) + }) + + it('should return existing state for duplicate job', async () => { + const message = Buffer.from(JSON.stringify({ id: 'job-1', payload: 'test' })) + const timestamp = Date.now() + + await storage.enqueue('job-1', message, timestamp) + const result = await storage.enqueue('job-1', message, timestamp) + + assert.ok(result, 'should return existing state') + assert.ok(result.startsWith('queued:'), 'state should start with queued:') + }) + + it('should return null on dequeue timeout', async () => { + const result = await storage.dequeue('worker-1', 0.1) + assert.strictEqual(result, null) + }) + + it('should dequeue in FIFO order', async () => { + const msg1 = Buffer.from('msg-1') + const msg2 = Buffer.from('msg-2') + const msg3 = Buffer.from('msg-3') + + await storage.enqueue('job-1', msg1, Date.now()) + await storage.enqueue('job-2', msg2, Date.now()) + await storage.enqueue('job-3', msg3, Date.now()) + + const d1 = await storage.dequeue('worker-1', 1) + const d2 = await storage.dequeue('worker-1', 1) + const d3 = await storage.dequeue('worker-1', 1) + + assert.deepStrictEqual(d1, msg1) + assert.deepStrictEqual(d2, msg2) + assert.deepStrictEqual(d3, msg3) + }) + + it('should wake up dequeue waiter when job is enqueued', async () => { + // Start dequeue that will block + const dequeuePromise = storage.dequeue('worker-1', 5) + + // Give it time to register the waiter + await sleep(50) + + // Enqueue a job - should wake up the waiter via NOTIFY + const msg = Buffer.from('wakeup-test') + await storage.enqueue('job-1', msg, Date.now()) + + const result = await dequeuePromise + assert.deepStrictEqual(result, msg) + }) + }) + + describe('job state', () => { + it('should get and set job state', async () => { + await storage.enqueue('job-1', Buffer.from('test'), Date.now()) + await storage.setJobState('job-1', 'processing:123456:worker-1') + const state = await storage.getJobState('job-1') + + assert.strictEqual(state, 'processing:123456:worker-1') + }) + + it('should return null for non-existent job', async () => { + const state = await storage.getJobState('non-existent') + assert.strictEqual(state, null) + }) + + it('should delete job', async () => { + await storage.enqueue('job-1', Buffer.from('test'), Date.now()) + const deleted = await storage.deleteJob('job-1') + + assert.strictEqual(deleted, true) + assert.strictEqual(await storage.getJobState('job-1'), null) + }) + + it('should return false when deleting non-existent job', async () => { + const deleted = await storage.deleteJob('non-existent') + assert.strictEqual(deleted, false) + }) + + it('should get multiple job states', async () => { + await storage.enqueue('job-1', Buffer.from('a'), Date.now()) + await storage.setJobState('job-1', 'queued:1') + await storage.enqueue('job-2', Buffer.from('b'), Date.now()) + await storage.setJobState('job-2', 'processing:2') + + const states = await storage.getJobStates(['job-1', 'job-2', 'job-3']) + + assert.strictEqual(states.get('job-1'), 'queued:1') + assert.strictEqual(states.get('job-2'), 'processing:2') + assert.strictEqual(states.get('job-3'), null) + }) + }) + + describe('requeue', () => { + it('should move job from processing queue back to main queue', async () => { + const message = Buffer.from('requeue-test') + await storage.enqueue('job-1', message, Date.now()) + + // Dequeue to worker-1 + const dequeued = await storage.dequeue('worker-1', 1) + assert.deepStrictEqual(dequeued, message) + + // Verify processing queue has the job + const processing = await storage.getProcessingJobs('worker-1') + assert.strictEqual(processing.length, 1) + + // Requeue + await storage.requeue('job-1', message, 'worker-1') + + // Processing queue should be empty + const processingAfter = await storage.getProcessingJobs('worker-1') + assert.strictEqual(processingAfter.length, 0) + + // Should be able to dequeue again + const redequeued = await storage.dequeue('worker-2', 1) + assert.deepStrictEqual(redequeued, message) + }) + }) + + describe('ack', () => { + it('should remove job from processing queue', async () => { + const message = Buffer.from('ack-test') + await storage.enqueue('job-1', message, Date.now()) + + const dequeued = await storage.dequeue('worker-1', 1) + assert.ok(dequeued) + + await storage.ack('job-1', message, 'worker-1') + + const processing = await storage.getProcessingJobs('worker-1') + assert.strictEqual(processing.length, 0) + }) + }) + + describe('results', () => { + it('should store and retrieve result', async () => { + const result = Buffer.from(JSON.stringify({ success: true })) + await storage.setResult('job-1', result, 60000) + + const retrieved = await storage.getResult('job-1') + assert.deepStrictEqual(retrieved, result) + }) + + it('should return null for non-existent result', async () => { + const result = await storage.getResult('non-existent') + assert.strictEqual(result, null) + }) + + it('should return null for expired result', async () => { + await storage.setResult('job-1', Buffer.from('short-lived'), 20) + await sleep(30) + + const result = await storage.getResult('job-1') + assert.strictEqual(result, null) + }) + }) + + describe('errors', () => { + it('should store and retrieve error', async () => { + const error = Buffer.from(JSON.stringify({ message: 'Something failed' })) + await storage.setError('job-1', error, 60000) + + const retrieved = await storage.getError('job-1') + assert.deepStrictEqual(retrieved, error) + }) + + it('should return null for non-existent error', async () => { + const error = await storage.getError('non-existent') + assert.strictEqual(error, null) + }) + }) + + describe('workers', () => { + it('should register and get workers', async () => { + await storage.registerWorker('worker-1', 60000) + await storage.registerWorker('worker-2', 60000) + + const workers = await storage.getWorkers() + assert.deepStrictEqual(workers.sort(), ['worker-1', 'worker-2']) + }) + + it('should unregister worker', async () => { + await storage.registerWorker('worker-1', 60000) + await storage.unregisterWorker('worker-1') + + const workers = await storage.getWorkers() + assert.deepStrictEqual(workers, []) + }) + + it('should not return expired workers', async () => { + await storage.registerWorker('worker-1', 20) + await sleep(30) + + const workers = await storage.getWorkers() + assert.deepStrictEqual(workers, []) + }) + }) + + describe('notifications', () => { + it('should notify on job completion', async () => { + const { value, unsubscribe } = await promisifyCallback(handler => + storage.subscribeToJob('job-1', handler)) + + await sleep(50) + + await storage.notifyJobComplete('job-1', 'completed') + + const notifiedStatus = await value + assert.strictEqual(notifiedStatus, 'completed') + + await unsubscribe() + }) + + it('should notify on job failure', async () => { + const { value, unsubscribe } = await promisifyCallback(handler => + storage.subscribeToJob('job-1', handler)) + + await sleep(50) + + await storage.notifyJobComplete('job-1', 'failed') + + const notifiedStatus = await value + assert.strictEqual(notifiedStatus, 'failed') + + await unsubscribe() + }) + }) + + describe('events', () => { + it('should emit events on state changes', async () => { + const events: Array<{ id: string; event: string }> = [] + const { callback, promise: eventsReceived } = waitForCallbacks(2) + + const unsubscribe = await storage.subscribeToEvents((id, event) => { + events.push({ id, event }) + callback() + }) + + await sleep(50) + + await storage.publishEvent('job-1', 'processing') + await storage.publishEvent('job-1', 'completed') + + await eventsReceived + + assert.deepStrictEqual(events, [ + { id: 'job-1', event: 'processing' }, + { id: 'job-1', event: 'completed' } + ]) + + await unsubscribe() + }) + + it('should emit queued event on enqueue', async () => { + const events: Array<{ id: string; event: string }> = [] + const { callback, promise: eventReceived } = waitForCallbacks(1) + + const unsubscribe = await storage.subscribeToEvents((id, event) => { + events.push({ id, event }) + callback() + }) + + await sleep(50) + + await storage.enqueue('job-1', Buffer.from('test'), Date.now()) + + await eventReceived + + assert.deepStrictEqual(events, [{ id: 'job-1', event: 'queued' }]) + + await unsubscribe() + }) + }) + + describe('atomic operations', () => { + it('should complete job atomically', async () => { + const message = Buffer.from('complete-test') + const result = Buffer.from(JSON.stringify({ success: true })) + + await storage.enqueue('job-1', message, Date.now()) + await storage.dequeue('worker-1', 1) + await storage.setJobState('job-1', 'processing:123:worker-1') + + const { value: notificationReceived, unsubscribe } = await promisifyCallback(handler => + storage.subscribeToJob('job-1', handler)) + + await sleep(50) + + await storage.completeJob('job-1', message, 'worker-1', result, 60000) + + await notificationReceived + + // Verify state + const state = await storage.getJobState('job-1') + assert.ok(state?.startsWith('completed:')) + + // Verify result stored + const storedResult = await storage.getResult('job-1') + assert.deepStrictEqual(storedResult, result) + + // Verify removed from processing queue + const processing = await storage.getProcessingJobs('worker-1') + assert.strictEqual(processing.length, 0) + + await unsubscribe() + }) + + it('should fail job atomically', async () => { + const message = Buffer.from('fail-test') + const error = Buffer.from(JSON.stringify({ message: 'Error' })) + + await storage.enqueue('job-1', message, Date.now()) + await storage.dequeue('worker-1', 1) + await storage.setJobState('job-1', 'processing:123:worker-1') + + const { value: notificationReceived, unsubscribe } = await promisifyCallback(handler => + storage.subscribeToJob('job-1', handler)) + + await sleep(50) + + await storage.failJob('job-1', message, 'worker-1', error, 60000) + + const notifiedStatus = await notificationReceived + + // Verify state + const state = await storage.getJobState('job-1') + assert.ok(state?.startsWith('failed:')) + + // Verify error stored + const storedError = await storage.getError('job-1') + assert.deepStrictEqual(storedError, error) + + // Verify notification + assert.strictEqual(notifiedStatus, 'failed') + + await unsubscribe() + }) + + it('should retry job atomically', async () => { + const message = Buffer.from(JSON.stringify({ id: 'job-1', payload: 'test', attempts: 0 })) + + await storage.enqueue('job-1', message, Date.now()) + await storage.dequeue('worker-1', 1) + await storage.setJobState('job-1', 'processing:123:worker-1') + + const updatedMessage = Buffer.from(JSON.stringify({ id: 'job-1', payload: 'test', attempts: 1 })) + await storage.retryJob('job-1', updatedMessage, 'worker-1', 1) + + // Verify state + const state = await storage.getJobState('job-1') + assert.ok(state?.startsWith('failing:')) + assert.ok(state?.endsWith(':1')) + + // Verify job is back in queue + const dequeued = await storage.dequeue('worker-2', 1) + assert.deepStrictEqual(dequeued, updatedMessage) + }) + }) + + describe('leader election', () => { + it('should acquire lock when no lock exists', async () => { + const acquired = await storage.acquireLeaderLock('test-lock', 'owner-1', 60000) + assert.strictEqual(acquired, true) + }) + + it('should fail to acquire lock held by another', async () => { + await storage.acquireLeaderLock('test-lock', 'owner-1', 60000) + const acquired = await storage.acquireLeaderLock('test-lock', 'owner-2', 60000) + assert.strictEqual(acquired, false) + }) + + it('should acquire lock when expired', async () => { + await storage.acquireLeaderLock('test-lock', 'owner-1', 10) + await sleep(20) + + const acquired = await storage.acquireLeaderLock('test-lock', 'owner-2', 60000) + assert.strictEqual(acquired, true) + }) + + it('should renew lock by same owner', async () => { + await storage.acquireLeaderLock('test-lock', 'owner-1', 60000) + const renewed = await storage.renewLeaderLock('test-lock', 'owner-1', 60000) + assert.strictEqual(renewed, true) + }) + + it('should fail to renew lock by different owner', async () => { + await storage.acquireLeaderLock('test-lock', 'owner-1', 60000) + const renewed = await storage.renewLeaderLock('test-lock', 'owner-2', 60000) + assert.strictEqual(renewed, false) + }) + + it('should release lock by same owner', async () => { + await storage.acquireLeaderLock('test-lock', 'owner-1', 60000) + const released = await storage.releaseLeaderLock('test-lock', 'owner-1') + assert.strictEqual(released, true) + + // Should be acquirable again + const acquired = await storage.acquireLeaderLock('test-lock', 'owner-2', 60000) + assert.strictEqual(acquired, true) + }) + + it('should fail to release lock by different owner', async () => { + await storage.acquireLeaderLock('test-lock', 'owner-1', 60000) + const released = await storage.releaseLeaderLock('test-lock', 'owner-2') + assert.strictEqual(released, false) + }) + }) + + describe('dedup expiry', () => { + it('should allow re-enqueue after dedup expiry', async () => { + await storage.enqueue('job-1', Buffer.from('first'), Date.now()) + await storage.setJobExpiry('job-1', 20) + + await sleep(30) + + const result = await storage.enqueue('job-1', Buffer.from('second'), Date.now()) + assert.strictEqual(result, null, 'should allow re-enqueue after expiry') + }) + + it('should block re-enqueue within TTL window', async () => { + await storage.enqueue('job-1', Buffer.from('first'), Date.now()) + await storage.setJobExpiry('job-1', 60000) + + const result = await storage.enqueue('job-1', Buffer.from('second'), Date.now()) + assert.ok(result, 'should block re-enqueue within TTL') + }) + }) + + describe('clear', () => { + it('should clear all data', async () => { + await storage.enqueue('job-1', Buffer.from('test'), Date.now()) + await storage.setResult('job-1', Buffer.from('result'), 60000) + await storage.registerWorker('worker-1', 60000) + + await storage.clear() + + assert.strictEqual(await storage.getJobState('job-1'), null) + assert.strictEqual(await storage.getResult('job-1'), null) + assert.deepStrictEqual(await storage.getWorkers(), []) + }) + }) + + describe('namespace', () => { + it('should isolate data between namespaces', async () => { + const ns1 = storage.createNamespace('emails') as PgStorage + const ns2 = storage.createNamespace('images') as PgStorage + + await ns1.connect() + await ns2.connect() + + try { + await ns1.enqueue('job-1', Buffer.from('email-data'), Date.now()) + await ns2.enqueue('job-1', Buffer.from('image-data'), Date.now()) + + const d1 = await ns1.dequeue('worker-1', 1) + const d2 = await ns2.dequeue('worker-1', 1) + + assert.deepStrictEqual(d1, Buffer.from('email-data')) + assert.deepStrictEqual(d2, Buffer.from('image-data')) + } finally { + await ns1.clear() + await ns2.clear() + await ns2.disconnect() + await ns1.disconnect() + } + }) + }) +}) From d1926e6973a9126c739aa538ec3c63ae58d45322 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 16 Mar 2026 14:33:42 +0100 Subject: [PATCH 3/6] ci: Add postgres service and update lockfile Add postgres service container for PgStorage tests in CI. Update pnpm-lock.yaml with pg optional dependency. Co-Authored-By: Claude Opus 4.6 --- .github/workflows/ci.yml | 13 ++++ pnpm-lock.yaml | 124 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 73f23f5..652594b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,6 +29,19 @@ jobs: --health-interval 1s --health-timeout 3s --health-retries 5 + postgres: + image: postgres:17-alpine + ports: + - 5432:5432 + env: + POSTGRES_DB: job_queue_test + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + options: >- + --health-cmd "pg_isready -U postgres" + --health-interval 1s + --health-timeout 3s + --health-retries 5 steps: - name: Checkout uses: actions/checkout@v4 diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1f14614..2186b03 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -49,6 +49,9 @@ importers: iovalkey: specifier: ^0.2.0 version: 0.2.2 + pg: + specifier: ^8.20.0 + version: 8.20.0 packages: @@ -341,41 +344,49 @@ packages: resolution: {integrity: sha512-34gw7PjDGB9JgePJEmhEqBhWvCiiWCuXsL9hYphDF7crW7UgI05gyBAi6MF58uGcMOiOqSJ2ybEeCvHcq0BCmQ==} cpu: [arm64] os: [linux] + libc: [glibc] '@unrs/resolver-binding-linux-arm64-musl@1.11.1': resolution: {integrity: sha512-RyMIx6Uf53hhOtJDIamSbTskA99sPHS96wxVE/bJtePJJtpdKGXO1wY90oRdXuYOGOTuqjT8ACccMc4K6QmT3w==} cpu: [arm64] os: [linux] + libc: [musl] '@unrs/resolver-binding-linux-ppc64-gnu@1.11.1': resolution: {integrity: sha512-D8Vae74A4/a+mZH0FbOkFJL9DSK2R6TFPC9M+jCWYia/q2einCubX10pecpDiTmkJVUH+y8K3BZClycD8nCShA==} cpu: [ppc64] os: [linux] + libc: [glibc] '@unrs/resolver-binding-linux-riscv64-gnu@1.11.1': resolution: {integrity: sha512-frxL4OrzOWVVsOc96+V3aqTIQl1O2TjgExV4EKgRY09AJ9leZpEg8Ak9phadbuX0BA4k8U5qtvMSQQGGmaJqcQ==} cpu: [riscv64] os: [linux] + libc: [glibc] '@unrs/resolver-binding-linux-riscv64-musl@1.11.1': resolution: {integrity: sha512-mJ5vuDaIZ+l/acv01sHoXfpnyrNKOk/3aDoEdLO/Xtn9HuZlDD6jKxHlkN8ZhWyLJsRBxfv9GYM2utQ1SChKew==} cpu: [riscv64] os: [linux] + libc: [musl] '@unrs/resolver-binding-linux-s390x-gnu@1.11.1': resolution: {integrity: sha512-kELo8ebBVtb9sA7rMe1Cph4QHreByhaZ2QEADd9NzIQsYNQpt9UkM9iqr2lhGr5afh885d/cB5QeTXSbZHTYPg==} cpu: [s390x] os: [linux] + libc: [glibc] '@unrs/resolver-binding-linux-x64-gnu@1.11.1': resolution: {integrity: sha512-C3ZAHugKgovV5YvAMsxhq0gtXuwESUKc5MhEtjBpLoHPLYM+iuwSj3lflFwK3DPm68660rZ7G8BMcwSro7hD5w==} cpu: [x64] os: [linux] + libc: [glibc] '@unrs/resolver-binding-linux-x64-musl@1.11.1': resolution: {integrity: sha512-rV0YSoyhK2nZ4vEswT/QwqzqQXw5I6CjoaYMOX0TqBlWhojUf8P94mvI7nuJTeaCkkds3QE4+zS8Ko+GdXuZtA==} cpu: [x64] os: [linux] + libc: [musl] '@unrs/resolver-binding-wasm32-wasi@1.11.1': resolution: {integrity: sha512-5u4RkfxJm+Ng7IWgkzi3qrFOvLvQYnPBmjmZQ8+szTK/b31fQCnleNl1GgEt7nIsZRIf5PLhPwT0WM+q45x/UQ==} @@ -1301,6 +1312,40 @@ packages: resolution: {integrity: sha512-5UmUtvuCv3KzBX2NuQw2uF28o0t8Eq4KkPRZfUCzJs+DiNVKw7OaYn29vNDgrt/Pggs23CPlSTqgzlhHJfpT0A==} engines: {node: '>=18.6.0', typescript: '>=5.8'} + pg-cloudflare@1.3.0: + resolution: {integrity: sha512-6lswVVSztmHiRtD6I8hw4qP/nDm1EJbKMRhf3HCYaqud7frGysPv7FYJ5noZQdhQtN2xJnimfMtvQq21pdbzyQ==} + + pg-connection-string@2.12.0: + resolution: {integrity: sha512-U7qg+bpswf3Cs5xLzRqbXbQl85ng0mfSV/J0nnA31MCLgvEaAo7CIhmeyrmJpOr7o+zm0rXK+hNnT5l9RHkCkQ==} + + pg-int8@1.0.1: + resolution: {integrity: sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==} + engines: {node: '>=4.0.0'} + + pg-pool@3.13.0: + resolution: {integrity: sha512-gB+R+Xud1gLFuRD/QgOIgGOBE2KCQPaPwkzBBGC9oG69pHTkhQeIuejVIk3/cnDyX39av2AxomQiyPT13WKHQA==} + peerDependencies: + pg: '>=8.0' + + pg-protocol@1.13.0: + resolution: {integrity: sha512-zzdvXfS6v89r6v7OcFCHfHlyG/wvry1ALxZo4LqgUoy7W9xhBDMaqOuMiF3qEV45VqsN6rdlcehHrfDtlCPc8w==} + + pg-types@2.2.0: + resolution: {integrity: sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==} + engines: {node: '>=4'} + + pg@8.20.0: + resolution: {integrity: sha512-ldhMxz2r8fl/6QkXnBD3CR9/xg694oT6DZQ2s6c/RI28OjtSOpxnPrUCGOBJ46RCUxcWdx3p6kw/xnDHjKvaRA==} + engines: {node: '>= 16.0.0'} + peerDependencies: + pg-native: '>=3.0.1' + peerDependenciesMeta: + pg-native: + optional: true + + pgpass@1.0.5: + resolution: {integrity: sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==} + picomatch@4.0.3: resolution: {integrity: sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==} engines: {node: '>=12'} @@ -1319,6 +1364,22 @@ packages: resolution: {integrity: sha512-/+5VFTchJDoVj3bhoqi6UeymcD00DAwb1nJwamzPvHEszJ4FpF6SNNbUbOS8yI56qHzdV8eK0qEfOSiodkTdxg==} engines: {node: '>= 0.4'} + postgres-array@2.0.0: + resolution: {integrity: sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==} + engines: {node: '>=4'} + + postgres-bytea@1.0.1: + resolution: {integrity: sha512-5+5HqXnsZPE65IJZSMkZtURARZelel2oXUEO8rH83VS/hxH5vv1uHquPg5wZs8yMAfdv971IU+kcPUczi7NVBQ==} + engines: {node: '>=0.10.0'} + + postgres-date@1.0.7: + resolution: {integrity: sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==} + engines: {node: '>=0.10.0'} + + postgres-interval@1.2.0: + resolution: {integrity: sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==} + engines: {node: '>=0.10.0'} + pprof-format@2.2.1: resolution: {integrity: sha512-p4tVN7iK19ccDqQv8heyobzUmbHyds4N2FI6aBMcXz6y99MglTWDxIyhFkNaLeEXs6IFUEzT0zya0icbSLLY0g==} @@ -1700,6 +1761,10 @@ packages: resolution: {integrity: sha512-si7QWI6zUMq56bESFvagtmzMdGOtoxfR+Sez11Mobfc7tm+VkUckk9bW2UeffTGVUbOksxmSw0AA2gs8g71NCQ==} engines: {node: '>=12'} + xtend@4.0.2: + resolution: {integrity: sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==} + engines: {node: '>=0.4'} + y18n@5.0.8: resolution: {integrity: sha512-0pfFzegeDWJHJIAmTLRP2DwHjdF5s7jo9tuztdQxAhINCdvS+3nGINqPd00AphqJR/0LhANUS6/+7SCb98YOfA==} engines: {node: '>=10'} @@ -3161,6 +3226,48 @@ snapshots: peowly@1.3.3: {} + pg-cloudflare@1.3.0: + optional: true + + pg-connection-string@2.12.0: + optional: true + + pg-int8@1.0.1: + optional: true + + pg-pool@3.13.0(pg@8.20.0): + dependencies: + pg: 8.20.0 + optional: true + + pg-protocol@1.13.0: + optional: true + + pg-types@2.2.0: + dependencies: + pg-int8: 1.0.1 + postgres-array: 2.0.0 + postgres-bytea: 1.0.1 + postgres-date: 1.0.7 + postgres-interval: 1.2.0 + optional: true + + pg@8.20.0: + dependencies: + pg-connection-string: 2.12.0 + pg-pool: 3.13.0(pg@8.20.0) + pg-protocol: 1.13.0 + pg-types: 2.2.0 + pgpass: 1.0.5 + optionalDependencies: + pg-cloudflare: 1.3.0 + optional: true + + pgpass@1.0.5: + dependencies: + split2: 4.2.0 + optional: true + picomatch@4.0.3: {} pino-abstract-transport@3.0.0: @@ -3185,6 +3292,20 @@ snapshots: possible-typed-array-names@1.1.0: {} + postgres-array@2.0.0: + optional: true + + postgres-bytea@1.0.1: + optional: true + + postgres-date@1.0.7: + optional: true + + postgres-interval@1.2.0: + dependencies: + xtend: 4.0.2 + optional: true + pprof-format@2.2.1: {} pprof-to-md@0.1.0: @@ -3671,6 +3792,9 @@ snapshots: string-width: 5.1.2 strip-ansi: 7.1.2 + xtend@4.0.2: + optional: true + y18n@5.0.8: {} yargs-parser@21.1.1: {} From f7f4328f5cbb2a19dfc52a01f553343df9a2a8f9 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 16 Mar 2026 18:55:08 +0100 Subject: [PATCH 4/6] fix: Fix TypeScript error in PgClient interface handler types Co-Authored-By: Claude Opus 4.6 --- src/storage/pg.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/storage/pg.ts b/src/storage/pg.ts index 7a4b655..55add40 100644 --- a/src/storage/pg.ts +++ b/src/storage/pg.ts @@ -33,8 +33,8 @@ interface PgClient { query (text: string, values?: unknown[]): Promise connect (): Promise end (): Promise - on (event: string, handler: (...args: unknown[]) => void): void - off (event: string, handler: (...args: unknown[]) => void): void + on (event: string, handler: (...args: any[]) => void): void + off (event: string, handler: (...args: any[]) => void): void } interface PgNotification { From cd6fb5a6236c2a0eab169b5bae6313166e1135b8 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Tue, 17 Mar 2026 09:46:21 +0100 Subject: [PATCH 5/6] style: Fix prettier and eslint formatting Co-Authored-By: Claude Opus 4.6 --- src/queue.ts | 4 +- src/reaper.ts | 4 +- src/storage/pg.ts | 207 ++++++++++++++++++---------------------------- 3 files changed, 84 insertions(+), 131 deletions(-) diff --git a/src/queue.ts b/src/queue.ts index cc2f4f2..1b65ebf 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -44,9 +44,7 @@ export class Queue extends EventEmitter) { super() - this.#storage = config.name - ? config.storage.createNamespace(config.name) - : config.storage + this.#storage = config.name ? config.storage.createNamespace(config.name) : config.storage this.#workerId = config.workerId ?? randomUUID() this.#payloadSerde = config.payloadSerde ?? createJsonSerde() this.#resultSerde = config.resultSerde ?? createJsonSerde() diff --git a/src/reaper.ts b/src/reaper.ts index bc30af3..9460fe6 100644 --- a/src/reaper.ts +++ b/src/reaper.ts @@ -63,9 +63,7 @@ export class Reaper extends EventEmitter { constructor (config: ReaperConfig) { super() - this.#storage = config.name - ? config.storage.createNamespace(config.name) - : config.storage + this.#storage = config.name ? config.storage.createNamespace(config.name) : config.storage this.#payloadSerde = config.payloadSerde ?? createJsonSerde() this.#visibilityTimeout = config.visibilityTimeout ?? 30000 this.#leaderElection = config.leaderElection ?? { enabled: false } diff --git a/src/storage/pg.ts b/src/storage/pg.ts index 55add40..8865e8e 100644 --- a/src/storage/pg.ts +++ b/src/storage/pg.ts @@ -102,7 +102,8 @@ export class PgStorage implements Storage { #newJobChannel: string constructor (config: PgStorageConfig = {}) { - this.#connectionString = config.connectionString ?? process.env.DATABASE_URL ?? 'postgresql://localhost:5432/job_queue' + this.#connectionString = + config.connectionString ?? process.env.DATABASE_URL ?? 'postgresql://localhost:5432/job_queue' this.#tablePrefix = config.tablePrefix ?? 'jq_' this.#logger = (config.logger ?? abstractLogger).child({ component: 'pg-storage', tablePrefix: this.#tablePrefix }) @@ -347,18 +348,20 @@ export class PgStorage implements Storage { // so concurrent fulfillment is safe const waiters = this.#dequeueWaiters.splice(0) for (const waiter of waiters) { - this.#tryDequeue(waiter.workerId).then(result => { - clearTimeout(waiter.timeoutId) - if (result) { - waiter.resolve(result) - } else { - // No job available, put waiter back + this.#tryDequeue(waiter.workerId) + .then(result => { + clearTimeout(waiter.timeoutId) + if (result) { + waiter.resolve(result) + } else { + // No job available, put waiter back + this.#dequeueWaiters.push(waiter) + } + }) + .catch(() => { + // On error, put waiter back this.#dequeueWaiters.push(waiter) - } - }).catch(() => { - // On error, put waiter back - this.#dequeueWaiters.push(waiter) - }) + }) } } @@ -393,22 +396,16 @@ export class PgStorage implements Storage { } // Insert new job - await client.query( - `INSERT INTO "${this.#jobsTable}" (id, state) VALUES ($1, $2)`, - [id, state] - ) + await client.query(`INSERT INTO "${this.#jobsTable}" (id, state) VALUES ($1, $2)`, [id, state]) // Push to queue - await client.query( - `INSERT INTO "${this.#queueTable}" (message) VALUES ($1)`, - [message] - ) + await client.query(`INSERT INTO "${this.#queueTable}" (message) VALUES ($1)`, [message]) await client.query('COMMIT') // Publish event and notify (outside transaction so NOTIFY fires immediately) await this.publishEvent(id, 'queued') - await this.#pool!.query(`SELECT pg_notify($1, '')`, [this.#newJobChannel]) + await this.#pool!.query("SELECT pg_notify($1, '')", [this.#newJobChannel]) return null } catch (err) { @@ -440,7 +437,8 @@ export class PgStorage implements Storage { async #tryDequeue (workerId: string): Promise { // Atomic: delete from queue (SKIP LOCKED) + insert into processing - const result = await this.#pool!.query(` + const result = await this.#pool!.query( + ` WITH deleted AS ( DELETE FROM "${this.#queueTable}" WHERE seq = ( @@ -454,7 +452,9 @@ export class PgStorage implements Storage { INSERT INTO "${this.#processingTable}" (worker_id, message) SELECT $1, message FROM deleted RETURNING message - `, [workerId]) + `, + [workerId] + ) if (result.rows.length === 0) return null return result.rows[0].message as Buffer @@ -466,22 +466,19 @@ export class PgStorage implements Storage { await client.query('BEGIN') // Remove from processing queue - await client.query( - `DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, - [workerId, message] - ) + await client.query(`DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, [ + workerId, + message + ]) // Add to front of main queue (use seq = 0 trick won't work since BIGSERIAL auto-increments) // Just insert normally - ordering is by seq which is auto-increment - await client.query( - `INSERT INTO "${this.#queueTable}" (message) VALUES ($1)`, - [message] - ) + await client.query(`INSERT INTO "${this.#queueTable}" (message) VALUES ($1)`, [message]) await client.query('COMMIT') // Notify waiters - await this.#pool!.query(`SELECT pg_notify($1, '')`, [this.#newJobChannel]) + await this.#pool!.query("SELECT pg_notify($1, '')", [this.#newJobChannel]) } catch (err) { await client.query('ROLLBACK').catch(() => {}) throw err @@ -491,10 +488,10 @@ export class PgStorage implements Storage { } async ack (id: string, message: Buffer, workerId: string): Promise { - await this.#pool!.query( - `DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, - [workerId, message] - ) + await this.#pool!.query(`DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, [ + workerId, + message + ]) } // ═══════════════════════════════════════════════════════════════════ @@ -502,10 +499,7 @@ export class PgStorage implements Storage { // ═══════════════════════════════════════════════════════════════════ async getJobState (id: string): Promise { - const result = await this.#pool!.query( - `SELECT state, expires_at FROM "${this.#jobsTable}" WHERE id = $1`, - [id] - ) + const result = await this.#pool!.query(`SELECT state, expires_at FROM "${this.#jobsTable}" WHERE id = $1`, [id]) if (result.rows.length === 0) return null @@ -520,17 +514,11 @@ export class PgStorage implements Storage { } async setJobState (id: string, state: string): Promise { - await this.#pool!.query( - `UPDATE "${this.#jobsTable}" SET state = $2 WHERE id = $1`, - [id, state] - ) + await this.#pool!.query(`UPDATE "${this.#jobsTable}" SET state = $2 WHERE id = $1`, [id, state]) } async deleteJob (id: string): Promise { - const result = await this.#pool!.query( - `DELETE FROM "${this.#jobsTable}" WHERE id = $1`, - [id] - ) + const result = await this.#pool!.query(`DELETE FROM "${this.#jobsTable}" WHERE id = $1`, [id]) if (result.rowCount && result.rowCount > 0) { await this.publishEvent(id, 'cancelled') @@ -543,10 +531,9 @@ export class PgStorage implements Storage { const result = new Map() if (ids.length === 0) return result - const rows = await this.#pool!.query( - `SELECT id, state, expires_at FROM "${this.#jobsTable}" WHERE id = ANY($1)`, - [ids] - ) + const rows = await this.#pool!.query(`SELECT id, state, expires_at FROM "${this.#jobsTable}" WHERE id = ANY($1)`, [ + ids + ]) const now = Date.now() const found = new Set() @@ -567,10 +554,7 @@ export class PgStorage implements Storage { // Clean up expired entries if (expiredIds.length > 0) { - await this.#pool!.query( - `DELETE FROM "${this.#jobsTable}" WHERE id = ANY($1)`, - [expiredIds] - ) + await this.#pool!.query(`DELETE FROM "${this.#jobsTable}" WHERE id = ANY($1)`, [expiredIds]) } // Set null for IDs not found @@ -585,10 +569,7 @@ export class PgStorage implements Storage { async setJobExpiry (id: string, ttlMs: number): Promise { const expiresAt = Date.now() + ttlMs - await this.#pool!.query( - `UPDATE "${this.#jobsTable}" SET expires_at = $2 WHERE id = $1`, - [id, expiresAt] - ) + await this.#pool!.query(`UPDATE "${this.#jobsTable}" SET expires_at = $2 WHERE id = $1`, [id, expiresAt]) } // ═══════════════════════════════════════════════════════════════════ @@ -606,10 +587,7 @@ export class PgStorage implements Storage { } async getResult (id: string): Promise { - const result = await this.#pool!.query( - `SELECT data, expires_at FROM "${this.#resultsTable}" WHERE id = $1`, - [id] - ) + const result = await this.#pool!.query(`SELECT data, expires_at FROM "${this.#resultsTable}" WHERE id = $1`, [id]) if (result.rows.length === 0) return null @@ -633,10 +611,7 @@ export class PgStorage implements Storage { } async getError (id: string): Promise { - const result = await this.#pool!.query( - `SELECT data, expires_at FROM "${this.#errorsTable}" WHERE id = $1`, - [id] - ) + const result = await this.#pool!.query(`SELECT data, expires_at FROM "${this.#errorsTable}" WHERE id = $1`, [id]) if (result.rows.length === 0) return null @@ -669,29 +644,21 @@ export class PgStorage implements Storage { async unregisterWorker (workerId: string): Promise { if (!this.#pool) return - await this.#pool.query( - `DELETE FROM "${this.#workersTable}" WHERE worker_id = $1`, - [workerId] - ) - await this.#pool.query( - `DELETE FROM "${this.#processingTable}" WHERE worker_id = $1`, - [workerId] - ) + await this.#pool.query(`DELETE FROM "${this.#workersTable}" WHERE worker_id = $1`, [workerId]) + await this.#pool.query(`DELETE FROM "${this.#processingTable}" WHERE worker_id = $1`, [workerId]) } async getWorkers (): Promise { - const result = await this.#pool!.query( - `SELECT worker_id FROM "${this.#workersTable}" WHERE expires_at > $1`, - [Date.now()] - ) + const result = await this.#pool!.query(`SELECT worker_id FROM "${this.#workersTable}" WHERE expires_at > $1`, [ + Date.now() + ]) return result.rows.map(row => row.worker_id as string) } async getProcessingJobs (workerId: string): Promise { - const result = await this.#pool!.query( - `SELECT message FROM "${this.#processingTable}" WHERE worker_id = $1`, - [workerId] - ) + const result = await this.#pool!.query(`SELECT message FROM "${this.#processingTable}" WHERE worker_id = $1`, [ + workerId + ]) return result.rows.map(row => row.message as Buffer) } @@ -712,10 +679,7 @@ export class PgStorage implements Storage { } async notifyJobComplete (id: string, status: 'completed' | 'failed' | 'failing'): Promise { - await this.#pool!.query( - `SELECT pg_notify($1, $2)`, - [this.#notifyChannel, `${id}:${status}`] - ) + await this.#pool!.query('SELECT pg_notify($1, $2)', [this.#notifyChannel, `${id}:${status}`]) } // ═══════════════════════════════════════════════════════════════════ @@ -732,10 +696,7 @@ export class PgStorage implements Storage { } async publishEvent (id: string, event: string): Promise { - await this.#pool!.query( - `SELECT pg_notify($1, $2)`, - [this.#eventsChannel, `${id}:${event}`] - ) + await this.#pool!.query('SELECT pg_notify($1, $2)', [this.#eventsChannel, `${id}:${event}`]) } // ═══════════════════════════════════════════════════════════════════ @@ -752,10 +713,11 @@ export class PgStorage implements Storage { await client.query('BEGIN') // Set state to completed + dedup expiry - await client.query( - `UPDATE "${this.#jobsTable}" SET state = $2, expires_at = $3 WHERE id = $1`, - [id, state, expiresAt] - ) + await client.query(`UPDATE "${this.#jobsTable}" SET state = $2, expires_at = $3 WHERE id = $1`, [ + id, + state, + expiresAt + ]) // Store result await client.query( @@ -766,10 +728,10 @@ export class PgStorage implements Storage { ) // Remove from processing queue - await client.query( - `DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, - [workerId, message] - ) + await client.query(`DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, [ + workerId, + message + ]) await client.query('COMMIT') } catch (err) { @@ -794,10 +756,11 @@ export class PgStorage implements Storage { await client.query('BEGIN') // Set state to failed + dedup expiry - await client.query( - `UPDATE "${this.#jobsTable}" SET state = $2, expires_at = $3 WHERE id = $1`, - [id, state, expiresAt] - ) + await client.query(`UPDATE "${this.#jobsTable}" SET state = $2, expires_at = $3 WHERE id = $1`, [ + id, + state, + expiresAt + ]) // Store error await client.query( @@ -808,10 +771,10 @@ export class PgStorage implements Storage { ) // Remove from processing queue - await client.query( - `DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, - [workerId, message] - ) + await client.query(`DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, [ + workerId, + message + ]) await client.query('COMMIT') } catch (err) { @@ -851,24 +814,18 @@ export class PgStorage implements Storage { await client.query('BEGIN') // Set state to failing - await client.query( - `UPDATE "${this.#jobsTable}" SET state = $2 WHERE id = $1`, - [id, state] - ) + await client.query(`UPDATE "${this.#jobsTable}" SET state = $2 WHERE id = $1`, [id, state]) // Remove old message from processing queue if (oldMessage) { - await client.query( - `DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, - [workerId, oldMessage] - ) + await client.query(`DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, [ + workerId, + oldMessage + ]) } // Add new message to queue - await client.query( - `INSERT INTO "${this.#queueTable}" (message) VALUES ($1)`, - [message] - ) + await client.query(`INSERT INTO "${this.#queueTable}" (message) VALUES ($1)`, [message]) await client.query('COMMIT') } catch (err) { @@ -881,7 +838,7 @@ export class PgStorage implements Storage { // Notify after commit await this.notifyJobComplete(id, 'failing') await this.publishEvent(id, 'failing') - await this.#pool!.query(`SELECT pg_notify($1, '')`, [this.#newJobChannel]) + await this.#pool!.query("SELECT pg_notify($1, '')", [this.#newJobChannel]) } // ═══════════════════════════════════════════════════════════════════ @@ -916,10 +873,10 @@ export class PgStorage implements Storage { async releaseLeaderLock (lockKey: string, ownerId: string): Promise { if (!this.#pool) return false - const result = await this.#pool.query( - `DELETE FROM "${this.#locksTable}" WHERE lock_key = $1 AND owner_id = $2`, - [lockKey, ownerId] - ) + const result = await this.#pool.query(`DELETE FROM "${this.#locksTable}" WHERE lock_key = $1 AND owner_id = $2`, [ + lockKey, + ownerId + ]) return result.rowCount !== null && result.rowCount > 0 } From 4cd7123b51182616a1b4cc55db020f265710c83f Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Tue, 17 Mar 2026 11:20:56 +0100 Subject: [PATCH 6/6] fix: Only use refCount for namespace children, not direct callers Root connect/disconnect stays idempotent as before. RefCount only tracks namespace children so existing code that calls connect() multiple times followed by a single disconnect() still cleans up. Co-Authored-By: Claude Opus 4.6 --- src/storage/pg.ts | 9 ++++----- src/storage/redis.ts | 13 ++++++------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/storage/pg.ts b/src/storage/pg.ts index 8865e8e..42660fe 100644 --- a/src/storage/pg.ts +++ b/src/storage/pg.ts @@ -133,6 +133,7 @@ export class PgStorage implements Storage { // Namespace view: connect parent and share pool/listener if (this.#parentStorage) { if (this.#pool) return // already connected + this.#parentStorage.#refCount++ await this.#parentStorage.connect() this.#pool = this.#parentStorage.#pool @@ -152,8 +153,7 @@ export class PgStorage implements Storage { return } - // Root instance: create pool/listener if needed, increment ref count - this.#refCount++ + // Root instance: idempotent connect (no ref count for direct callers) if (this.#pool) return const pgModule = await loadOptionalDependency('pg', 'PgStorage') @@ -209,12 +209,11 @@ export class PgStorage implements Storage { this.#notifyEmitter.removeAllListeners() this.#eventSubscription = false - await this.#parentStorage.disconnect() + this.#parentStorage.#refCount-- return } - // Root instance - this.#refCount-- + // Root instance: only destroy when no namespace children are connected if (this.#refCount > 0) return if (this.#listener) { diff --git a/src/storage/redis.ts b/src/storage/redis.ts index 85c7996..f58448f 100644 --- a/src/storage/redis.ts +++ b/src/storage/redis.ts @@ -114,6 +114,7 @@ export class RedisStorage implements Storage { // Namespace view: connect parent and copy shared clients if (this.#parentStorage) { if (this.#client) return // already connected + this.#parentStorage.#refCount++ await this.#parentStorage.connect() // Copy shared clients and scripts from parent @@ -134,9 +135,8 @@ export class RedisStorage implements Storage { return } - // Root instance: create clients if needed, increment ref count - this.#refCount++ - if (this.#client) return // already connected, just increment ref count + // Root instance: idempotent connect (no ref count for direct callers) + if (this.#client) return const redisModule = await loadOptionalDependency<{ Redis: new (url: string) => Redis }>('iovalkey', 'RedisStorage') @@ -159,7 +159,7 @@ export class RedisStorage implements Storage { } async disconnect (): Promise { - // Namespace view: remove own handlers, null references, disconnect parent + // Namespace view: remove own handlers, null references, release parent ref if (this.#parentStorage) { if (this.#subscriber && this.#messageHandler) { this.#subscriber.off('message', this.#messageHandler) @@ -178,12 +178,11 @@ export class RedisStorage implements Storage { this.#notifyEmitter.removeAllListeners() this.#eventSubscription = false - await this.#parentStorage.disconnect() + this.#parentStorage.#refCount-- return } - // Root instance: decrement ref count, only destroy when 0 - this.#refCount-- + // Root instance: only destroy when no namespace children are connected if (this.#refCount > 0) return if (this.#subscriber) {