diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 73f23f5..652594b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,6 +29,19 @@ jobs: --health-interval 1s --health-timeout 3s --health-retries 5 + postgres: + image: postgres:17-alpine + ports: + - 5432:5432 + env: + POSTGRES_DB: job_queue_test + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + options: >- + --health-cmd "pg_isready -U postgres" + --health-interval 1s + --health-timeout 3s + --health-retries 5 steps: - name: Checkout uses: actions/checkout@v4 diff --git a/docker-compose.yml b/docker-compose.yml index d8b8d01..2b7d3d1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,3 +18,17 @@ services: interval: 1s timeout: 3s retries: 5 + + postgres: + image: postgres:17-alpine + ports: + - "127.0.0.1:5432:5432" + environment: + POSTGRES_DB: job_queue_test + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres"] + interval: 1s + timeout: 3s + retries: 5 diff --git a/package.json b/package.json index 0a5cfda..26b18c2 100644 --- a/package.json +++ b/package.json @@ -47,7 +47,8 @@ }, "optionalDependencies": { "fast-write-atomic": "^0.4.0", - "iovalkey": "^0.2.0" + "iovalkey": "^0.2.0", + "pg": "^8.20.0" }, "devDependencies": { "@platformatic/flame": "^1.6.0", @@ -64,4 +65,4 @@ "engines": { "node": ">=22.19.0" } -} \ No newline at end of file +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1f14614..2186b03 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -49,6 +49,9 @@ importers: iovalkey: specifier: ^0.2.0 version: 0.2.2 + pg: + specifier: ^8.20.0 + version: 8.20.0 packages: @@ -341,41 +344,49 @@ packages: resolution: {integrity: sha512-34gw7PjDGB9JgePJEmhEqBhWvCiiWCuXsL9hYphDF7crW7UgI05gyBAi6MF58uGcMOiOqSJ2ybEeCvHcq0BCmQ==} cpu: [arm64] os: [linux] + libc: [glibc] '@unrs/resolver-binding-linux-arm64-musl@1.11.1': resolution: {integrity: sha512-RyMIx6Uf53hhOtJDIamSbTskA99sPHS96wxVE/bJtePJJtpdKGXO1wY90oRdXuYOGOTuqjT8ACccMc4K6QmT3w==} cpu: [arm64] os: [linux] + libc: [musl] '@unrs/resolver-binding-linux-ppc64-gnu@1.11.1': resolution: {integrity: sha512-D8Vae74A4/a+mZH0FbOkFJL9DSK2R6TFPC9M+jCWYia/q2einCubX10pecpDiTmkJVUH+y8K3BZClycD8nCShA==} cpu: [ppc64] os: [linux] + libc: [glibc] '@unrs/resolver-binding-linux-riscv64-gnu@1.11.1': resolution: {integrity: sha512-frxL4OrzOWVVsOc96+V3aqTIQl1O2TjgExV4EKgRY09AJ9leZpEg8Ak9phadbuX0BA4k8U5qtvMSQQGGmaJqcQ==} cpu: [riscv64] os: [linux] + libc: [glibc] '@unrs/resolver-binding-linux-riscv64-musl@1.11.1': resolution: {integrity: sha512-mJ5vuDaIZ+l/acv01sHoXfpnyrNKOk/3aDoEdLO/Xtn9HuZlDD6jKxHlkN8ZhWyLJsRBxfv9GYM2utQ1SChKew==} cpu: [riscv64] os: [linux] + libc: [musl] '@unrs/resolver-binding-linux-s390x-gnu@1.11.1': resolution: {integrity: sha512-kELo8ebBVtb9sA7rMe1Cph4QHreByhaZ2QEADd9NzIQsYNQpt9UkM9iqr2lhGr5afh885d/cB5QeTXSbZHTYPg==} cpu: [s390x] os: [linux] + libc: [glibc] '@unrs/resolver-binding-linux-x64-gnu@1.11.1': resolution: {integrity: sha512-C3ZAHugKgovV5YvAMsxhq0gtXuwESUKc5MhEtjBpLoHPLYM+iuwSj3lflFwK3DPm68660rZ7G8BMcwSro7hD5w==} cpu: [x64] os: [linux] + libc: [glibc] '@unrs/resolver-binding-linux-x64-musl@1.11.1': resolution: {integrity: sha512-rV0YSoyhK2nZ4vEswT/QwqzqQXw5I6CjoaYMOX0TqBlWhojUf8P94mvI7nuJTeaCkkds3QE4+zS8Ko+GdXuZtA==} cpu: [x64] os: [linux] + libc: [musl] '@unrs/resolver-binding-wasm32-wasi@1.11.1': resolution: {integrity: sha512-5u4RkfxJm+Ng7IWgkzi3qrFOvLvQYnPBmjmZQ8+szTK/b31fQCnleNl1GgEt7nIsZRIf5PLhPwT0WM+q45x/UQ==} @@ -1301,6 +1312,40 @@ packages: resolution: {integrity: sha512-5UmUtvuCv3KzBX2NuQw2uF28o0t8Eq4KkPRZfUCzJs+DiNVKw7OaYn29vNDgrt/Pggs23CPlSTqgzlhHJfpT0A==} engines: {node: '>=18.6.0', typescript: '>=5.8'} + pg-cloudflare@1.3.0: + resolution: {integrity: sha512-6lswVVSztmHiRtD6I8hw4qP/nDm1EJbKMRhf3HCYaqud7frGysPv7FYJ5noZQdhQtN2xJnimfMtvQq21pdbzyQ==} + + pg-connection-string@2.12.0: + resolution: {integrity: sha512-U7qg+bpswf3Cs5xLzRqbXbQl85ng0mfSV/J0nnA31MCLgvEaAo7CIhmeyrmJpOr7o+zm0rXK+hNnT5l9RHkCkQ==} + + pg-int8@1.0.1: + resolution: {integrity: sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==} + engines: {node: '>=4.0.0'} + + pg-pool@3.13.0: + resolution: {integrity: sha512-gB+R+Xud1gLFuRD/QgOIgGOBE2KCQPaPwkzBBGC9oG69pHTkhQeIuejVIk3/cnDyX39av2AxomQiyPT13WKHQA==} + peerDependencies: + pg: '>=8.0' + + pg-protocol@1.13.0: + resolution: {integrity: sha512-zzdvXfS6v89r6v7OcFCHfHlyG/wvry1ALxZo4LqgUoy7W9xhBDMaqOuMiF3qEV45VqsN6rdlcehHrfDtlCPc8w==} + + pg-types@2.2.0: + resolution: {integrity: sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==} + engines: {node: '>=4'} + + pg@8.20.0: + resolution: {integrity: sha512-ldhMxz2r8fl/6QkXnBD3CR9/xg694oT6DZQ2s6c/RI28OjtSOpxnPrUCGOBJ46RCUxcWdx3p6kw/xnDHjKvaRA==} + engines: {node: '>= 16.0.0'} + peerDependencies: + pg-native: '>=3.0.1' + peerDependenciesMeta: + pg-native: + optional: true + + pgpass@1.0.5: + resolution: {integrity: sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==} + picomatch@4.0.3: resolution: {integrity: sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==} engines: {node: '>=12'} @@ -1319,6 +1364,22 @@ packages: resolution: {integrity: sha512-/+5VFTchJDoVj3bhoqi6UeymcD00DAwb1nJwamzPvHEszJ4FpF6SNNbUbOS8yI56qHzdV8eK0qEfOSiodkTdxg==} engines: {node: '>= 0.4'} + postgres-array@2.0.0: + resolution: {integrity: sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==} + engines: {node: '>=4'} + + postgres-bytea@1.0.1: + resolution: {integrity: sha512-5+5HqXnsZPE65IJZSMkZtURARZelel2oXUEO8rH83VS/hxH5vv1uHquPg5wZs8yMAfdv971IU+kcPUczi7NVBQ==} + engines: {node: '>=0.10.0'} + + postgres-date@1.0.7: + resolution: {integrity: sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==} + engines: {node: '>=0.10.0'} + + postgres-interval@1.2.0: + resolution: {integrity: sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==} + engines: {node: '>=0.10.0'} + pprof-format@2.2.1: resolution: {integrity: sha512-p4tVN7iK19ccDqQv8heyobzUmbHyds4N2FI6aBMcXz6y99MglTWDxIyhFkNaLeEXs6IFUEzT0zya0icbSLLY0g==} @@ -1700,6 +1761,10 @@ packages: resolution: {integrity: sha512-si7QWI6zUMq56bESFvagtmzMdGOtoxfR+Sez11Mobfc7tm+VkUckk9bW2UeffTGVUbOksxmSw0AA2gs8g71NCQ==} engines: {node: '>=12'} + xtend@4.0.2: + resolution: {integrity: sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==} + engines: {node: '>=0.4'} + y18n@5.0.8: resolution: {integrity: sha512-0pfFzegeDWJHJIAmTLRP2DwHjdF5s7jo9tuztdQxAhINCdvS+3nGINqPd00AphqJR/0LhANUS6/+7SCb98YOfA==} engines: {node: '>=10'} @@ -3161,6 +3226,48 @@ snapshots: peowly@1.3.3: {} + pg-cloudflare@1.3.0: + optional: true + + pg-connection-string@2.12.0: + optional: true + + pg-int8@1.0.1: + optional: true + + pg-pool@3.13.0(pg@8.20.0): + dependencies: + pg: 8.20.0 + optional: true + + pg-protocol@1.13.0: + optional: true + + pg-types@2.2.0: + dependencies: + pg-int8: 1.0.1 + postgres-array: 2.0.0 + postgres-bytea: 1.0.1 + postgres-date: 1.0.7 + postgres-interval: 1.2.0 + optional: true + + pg@8.20.0: + dependencies: + pg-connection-string: 2.12.0 + pg-pool: 3.13.0(pg@8.20.0) + pg-protocol: 1.13.0 + pg-types: 2.2.0 + pgpass: 1.0.5 + optionalDependencies: + pg-cloudflare: 1.3.0 + optional: true + + pgpass@1.0.5: + dependencies: + split2: 4.2.0 + optional: true + picomatch@4.0.3: {} pino-abstract-transport@3.0.0: @@ -3185,6 +3292,20 @@ snapshots: possible-typed-array-names@1.1.0: {} + postgres-array@2.0.0: + optional: true + + postgres-bytea@1.0.1: + optional: true + + postgres-date@1.0.7: + optional: true + + postgres-interval@1.2.0: + dependencies: + xtend: 4.0.2 + optional: true + pprof-format@2.2.1: {} pprof-to-md@0.1.0: @@ -3671,6 +3792,9 @@ snapshots: string-width: 5.1.2 strip-ansi: 7.1.2 + xtend@4.0.2: + optional: true + y18n@5.0.8: {} yargs-parser@21.1.1: {} diff --git a/src/index.ts b/src/index.ts index a3f6e9e..e0071f7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -36,6 +36,7 @@ export type { Storage } from './storage/types.ts' export { MemoryStorage } from './storage/memory.ts' export { RedisStorage } from './storage/redis.ts' export { FileStorage } from './storage/file.ts' +export { PgStorage } from './storage/pg.ts' // Queue export { Queue } from './queue.ts' diff --git a/src/queue.ts b/src/queue.ts index 7067bc9..1b65ebf 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -44,7 +44,7 @@ export class Queue extends EventEmitter) { super() - this.#storage = config.storage + this.#storage = config.name ? config.storage.createNamespace(config.name) : config.storage this.#workerId = config.workerId ?? randomUUID() this.#payloadSerde = config.payloadSerde ?? createJsonSerde() this.#resultSerde = config.resultSerde ?? createJsonSerde() diff --git a/src/reaper.ts b/src/reaper.ts index 48486f0..9460fe6 100644 --- a/src/reaper.ts +++ b/src/reaper.ts @@ -17,6 +17,7 @@ interface LeaderElectionConfig { interface ReaperConfig { storage: Storage + name?: string payloadSerde?: Serde visibilityTimeout?: number leaderElection?: LeaderElectionConfig @@ -62,7 +63,7 @@ export class Reaper extends EventEmitter { constructor (config: ReaperConfig) { super() - this.#storage = config.storage + this.#storage = config.name ? config.storage.createNamespace(config.name) : config.storage this.#payloadSerde = config.payloadSerde ?? createJsonSerde() this.#visibilityTimeout = config.visibilityTimeout ?? 30000 this.#leaderElection = config.leaderElection ?? { enabled: false } diff --git a/src/storage/file.ts b/src/storage/file.ts index e84c6b2..3a20124 100644 --- a/src/storage/file.ts +++ b/src/storage/file.ts @@ -846,6 +846,10 @@ export class FileStorage implements Storage { } } + createNamespace (name: string): Storage { + return new FileStorage({ basePath: join(this.#basePath, name) }) + } + /** * Clear all data (useful for testing) */ diff --git a/src/storage/memory.ts b/src/storage/memory.ts index 035004f..9dd4b79 100644 --- a/src/storage/memory.ts +++ b/src/storage/memory.ts @@ -394,6 +394,10 @@ export class MemoryStorage implements Storage { } } + createNamespace (_name: string): Storage { + return new MemoryStorage() + } + /** * Clear all data (useful for testing) */ diff --git a/src/storage/pg.ts b/src/storage/pg.ts new file mode 100644 index 0000000..42660fe --- /dev/null +++ b/src/storage/pg.ts @@ -0,0 +1,911 @@ +import { EventEmitter } from 'node:events' +import type { Logger } from 'pino' +import type { Storage } from './types.ts' +import { loadOptionalDependency } from './utils.ts' +import { abstractLogger } from '../utils/logging.ts' + +// Minimal type definitions for pg (optional dependency) +interface PgPoolConfig { + connectionString?: string + max?: number + application_name?: string + [key: string]: unknown +} + +interface PgQueryResult { + rows: Record[] + rowCount: number | null +} + +interface PgPool { + query (text: string, values?: unknown[]): Promise + connect (): Promise + end (): Promise + on (event: string, handler: (...args: unknown[]) => void): void +} + +interface PgPoolClient { + query (text: string, values?: unknown[]): Promise + release (destroy?: boolean): void +} + +interface PgClient { + query (text: string, values?: unknown[]): Promise + connect (): Promise + end (): Promise + on (event: string, handler: (...args: any[]) => void): void + off (event: string, handler: (...args: any[]) => void): void +} + +interface PgNotification { + channel: string + payload?: string +} + +interface PgModule { + Pool: new (config: PgPoolConfig) => PgPool + Client: new (config: PgPoolConfig) => PgClient +} + +interface PgStorageConfig { + connectionString?: string + tablePrefix?: string + logger?: Logger +} + +interface DequeueWaiter { + workerId: string + resolve: (value: Buffer | null) => void + timeoutId: ReturnType +} + +/** + * PostgreSQL storage implementation + * + * Uses SELECT FOR UPDATE SKIP LOCKED for concurrent dequeue (pg-boss pattern) + * and LISTEN/NOTIFY for pub/sub notifications. + * + * Connection architecture: + * - Pool: for all queries (enqueue, state, results, etc.) + * - Dedicated listener client: stays in LISTEN mode for pub/sub + */ +export class PgStorage implements Storage { + #connectionString: string + #tablePrefix: string + #pool: PgPool | null = null + #listener: PgClient | null = null + #eventEmitter = new EventEmitter({ captureRejections: true }) + #notifyEmitter = new EventEmitter({ captureRejections: true }) + #eventSubscription: boolean = false + #logger: Logger + #dequeueWaiters: DequeueWaiter[] = [] + + // Namespace support + #parentStorage: PgStorage | null = null + #refCount: number = 0 + + // Notification handler (stored for removal on disconnect) + #notificationHandler: ((msg: PgNotification) => void) | null = null + + // Table names (computed from prefix) + #jobsTable: string + #queueTable: string + #processingTable: string + #resultsTable: string + #errorsTable: string + #workersTable: string + #locksTable: string + + // Channel names + #notifyChannel: string + #eventsChannel: string + #newJobChannel: string + + constructor (config: PgStorageConfig = {}) { + this.#connectionString = + config.connectionString ?? process.env.DATABASE_URL ?? 'postgresql://localhost:5432/job_queue' + this.#tablePrefix = config.tablePrefix ?? 'jq_' + this.#logger = (config.logger ?? abstractLogger).child({ component: 'pg-storage', tablePrefix: this.#tablePrefix }) + + // Compute table names + this.#jobsTable = `${this.#tablePrefix}jobs` + this.#queueTable = `${this.#tablePrefix}queue` + this.#processingTable = `${this.#tablePrefix}processing` + this.#resultsTable = `${this.#tablePrefix}results` + this.#errorsTable = `${this.#tablePrefix}errors` + this.#workersTable = `${this.#tablePrefix}workers` + this.#locksTable = `${this.#tablePrefix}locks` + + // Compute channel names (underscores only, safe for LISTEN) + this.#notifyChannel = `${this.#tablePrefix}notify` + this.#eventsChannel = `${this.#tablePrefix}events` + this.#newJobChannel = `${this.#tablePrefix}new_job` + + this.#eventEmitter.setMaxListeners(0) + this.#notifyEmitter.setMaxListeners(0) + } + + // ═══════════════════════════════════════════════════════════════════ + // LIFECYCLE + // ═══════════════════════════════════════════════════════════════════ + + async connect (): Promise { + // Namespace view: connect parent and share pool/listener + if (this.#parentStorage) { + if (this.#pool) return // already connected + this.#parentStorage.#refCount++ + await this.#parentStorage.connect() + + this.#pool = this.#parentStorage.#pool + this.#listener = this.#parentStorage.#listener + + // Create own tables + await this.#createSchema() + + // Register own notification handler on shared listener + this.#notificationHandler = (msg: PgNotification) => { + this.#handleNotification(msg) + } + this.#listener!.on('notification', this.#notificationHandler) + + // LISTEN on own channels + await this.#listenChannels() + return + } + + // Root instance: idempotent connect (no ref count for direct callers) + if (this.#pool) return + + const pgModule = await loadOptionalDependency('pg', 'PgStorage') + + this.#pool = new pgModule.Pool({ + connectionString: this.#connectionString, + application_name: 'job-queue' + }) + + this.#pool.on('error', (err: unknown) => { + this.#logger.error({ err }, 'Pool error.') + }) + + this.#listener = new pgModule.Client({ + connectionString: this.#connectionString, + application_name: 'job-queue-listener' + }) + await this.#listener.connect() + + // Create tables + await this.#createSchema() + + // Set up notification handler + this.#notificationHandler = (msg: PgNotification) => { + this.#handleNotification(msg) + } + this.#listener.on('notification', this.#notificationHandler) + + // LISTEN on channels + await this.#listenChannels() + } + + async disconnect (): Promise { + // Namespace view + if (this.#parentStorage) { + // UNLISTEN own channels + if (this.#listener) { + await this.#listener.query(`UNLISTEN "${this.#notifyChannel}"`).catch(() => {}) + await this.#listener.query(`UNLISTEN "${this.#eventsChannel}"`).catch(() => {}) + await this.#listener.query(`UNLISTEN "${this.#newJobChannel}"`).catch(() => {}) + + if (this.#notificationHandler) { + this.#listener.off('notification', this.#notificationHandler) + this.#notificationHandler = null + } + } + + this.#clearDequeueWaiters() + this.#pool = null + this.#listener = null + + this.#eventEmitter.removeAllListeners() + this.#notifyEmitter.removeAllListeners() + this.#eventSubscription = false + + this.#parentStorage.#refCount-- + return + } + + // Root instance: only destroy when no namespace children are connected + if (this.#refCount > 0) return + + if (this.#listener) { + if (this.#notificationHandler) { + this.#listener.off('notification', this.#notificationHandler) + this.#notificationHandler = null + } + await this.#listener.end().catch(() => {}) + this.#listener = null + } + + this.#clearDequeueWaiters() + + if (this.#pool) { + await this.#pool.end().catch(() => {}) + this.#pool = null + } + + this.#eventEmitter.removeAllListeners() + this.#notifyEmitter.removeAllListeners() + this.#eventSubscription = false + } + + async #createSchema (): Promise { + await this.#pool!.query(` + CREATE TABLE IF NOT EXISTS "${this.#jobsTable}" ( + id TEXT PRIMARY KEY, + state TEXT NOT NULL, + expires_at BIGINT + ) + `) + + await this.#pool!.query(` + CREATE TABLE IF NOT EXISTS "${this.#queueTable}" ( + seq BIGSERIAL PRIMARY KEY, + message BYTEA NOT NULL + ) + `) + + await this.#pool!.query(` + CREATE TABLE IF NOT EXISTS "${this.#processingTable}" ( + seq BIGSERIAL PRIMARY KEY, + worker_id TEXT NOT NULL, + message BYTEA NOT NULL + ) + `) + + // Index for worker lookups on processing table + await this.#pool!.query(` + CREATE INDEX IF NOT EXISTS "${this.#processingTable}_worker_idx" + ON "${this.#processingTable}" (worker_id) + `) + + await this.#pool!.query(` + CREATE TABLE IF NOT EXISTS "${this.#resultsTable}" ( + id TEXT PRIMARY KEY, + data BYTEA NOT NULL, + expires_at BIGINT NOT NULL + ) + `) + + await this.#pool!.query(` + CREATE TABLE IF NOT EXISTS "${this.#errorsTable}" ( + id TEXT PRIMARY KEY, + data BYTEA NOT NULL, + expires_at BIGINT NOT NULL + ) + `) + + await this.#pool!.query(` + CREATE TABLE IF NOT EXISTS "${this.#workersTable}" ( + worker_id TEXT PRIMARY KEY, + expires_at BIGINT NOT NULL + ) + `) + + await this.#pool!.query(` + CREATE TABLE IF NOT EXISTS "${this.#locksTable}" ( + lock_key TEXT PRIMARY KEY, + owner_id TEXT NOT NULL, + expires_at BIGINT NOT NULL + ) + `) + } + + async #listenChannels (): Promise { + await this.#listener!.query(`LISTEN "${this.#notifyChannel}"`) + await this.#listener!.query(`LISTEN "${this.#eventsChannel}"`) + await this.#listener!.query(`LISTEN "${this.#newJobChannel}"`) + } + + #handleNotification (msg: PgNotification): void { + if (msg.channel === this.#newJobChannel) { + this.#notifyDequeueWaiters() + return + } + + if (msg.channel === this.#notifyChannel && msg.payload) { + // payload format: jobId:status + const sepIdx = msg.payload.indexOf(':') + if (sepIdx > 0) { + const jobId = msg.payload.substring(0, sepIdx) + const status = msg.payload.substring(sepIdx + 1) + this.#notifyEmitter.emit(`notify:${jobId}`, status as 'completed' | 'failed' | 'failing') + } + return + } + + if (msg.channel === this.#eventsChannel && msg.payload) { + // payload format: id:event + const sepIdx = msg.payload.indexOf(':') + if (sepIdx > 0) { + const id = msg.payload.substring(0, sepIdx) + const event = msg.payload.substring(sepIdx + 1) + this.#eventEmitter.emit('event', id, event) + } + } + } + + #clearDequeueWaiters (): void { + for (const waiter of this.#dequeueWaiters) { + clearTimeout(waiter.timeoutId) + waiter.resolve(null) + } + this.#dequeueWaiters = [] + } + + #notifyDequeueWaiters (): void { + // Try to fulfill waiting dequeue calls + // Each waiter will do its own SELECT FOR UPDATE SKIP LOCKED, + // so concurrent fulfillment is safe + const waiters = this.#dequeueWaiters.splice(0) + for (const waiter of waiters) { + this.#tryDequeue(waiter.workerId) + .then(result => { + clearTimeout(waiter.timeoutId) + if (result) { + waiter.resolve(result) + } else { + // No job available, put waiter back + this.#dequeueWaiters.push(waiter) + } + }) + .catch(() => { + // On error, put waiter back + this.#dequeueWaiters.push(waiter) + }) + } + } + + // ═══════════════════════════════════════════════════════════════════ + // QUEUE OPERATIONS + // ═══════════════════════════════════════════════════════════════════ + + async enqueue (id: string, message: Buffer, timestamp: number): Promise { + const state = `queued:${timestamp}` + const now = Date.now() + + const client = await this.#pool!.connect() + try { + await client.query('BEGIN') + + // Check for existing job (with dedup expiry handling) + const existing = await client.query( + `SELECT state, expires_at FROM "${this.#jobsTable}" WHERE id = $1 FOR UPDATE`, + [id] + ) + + if (existing.rows.length > 0) { + const row = existing.rows[0] + const expiresAt = row.expires_at as number | null + if (expiresAt && now >= expiresAt) { + // Expired: remove and fall through to enqueue as new + await client.query(`DELETE FROM "${this.#jobsTable}" WHERE id = $1`, [id]) + } else { + await client.query('COMMIT') + return row.state as string + } + } + + // Insert new job + await client.query(`INSERT INTO "${this.#jobsTable}" (id, state) VALUES ($1, $2)`, [id, state]) + + // Push to queue + await client.query(`INSERT INTO "${this.#queueTable}" (message) VALUES ($1)`, [message]) + + await client.query('COMMIT') + + // Publish event and notify (outside transaction so NOTIFY fires immediately) + await this.publishEvent(id, 'queued') + await this.#pool!.query("SELECT pg_notify($1, '')", [this.#newJobChannel]) + + return null + } catch (err) { + await client.query('ROLLBACK').catch(() => {}) + throw err + } finally { + client.release() + } + } + + async dequeue (workerId: string, timeout: number): Promise { + // Try immediate fetch + const result = await this.#tryDequeue(workerId) + if (result) return result + + // Wait for NOTIFY or timeout + return new Promise(resolve => { + const timeoutId = setTimeout(() => { + const index = this.#dequeueWaiters.findIndex(w => w.resolve === resolve) + if (index !== -1) { + this.#dequeueWaiters.splice(index, 1) + } + resolve(null) + }, timeout * 1000) + + this.#dequeueWaiters.push({ workerId, resolve, timeoutId }) + }) + } + + async #tryDequeue (workerId: string): Promise { + // Atomic: delete from queue (SKIP LOCKED) + insert into processing + const result = await this.#pool!.query( + ` + WITH deleted AS ( + DELETE FROM "${this.#queueTable}" + WHERE seq = ( + SELECT seq FROM "${this.#queueTable}" + ORDER BY seq + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + RETURNING message + ) + INSERT INTO "${this.#processingTable}" (worker_id, message) + SELECT $1, message FROM deleted + RETURNING message + `, + [workerId] + ) + + if (result.rows.length === 0) return null + return result.rows[0].message as Buffer + } + + async requeue (id: string, message: Buffer, workerId: string): Promise { + const client = await this.#pool!.connect() + try { + await client.query('BEGIN') + + // Remove from processing queue + await client.query(`DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, [ + workerId, + message + ]) + + // Add to front of main queue (use seq = 0 trick won't work since BIGSERIAL auto-increments) + // Just insert normally - ordering is by seq which is auto-increment + await client.query(`INSERT INTO "${this.#queueTable}" (message) VALUES ($1)`, [message]) + + await client.query('COMMIT') + + // Notify waiters + await this.#pool!.query("SELECT pg_notify($1, '')", [this.#newJobChannel]) + } catch (err) { + await client.query('ROLLBACK').catch(() => {}) + throw err + } finally { + client.release() + } + } + + async ack (id: string, message: Buffer, workerId: string): Promise { + await this.#pool!.query(`DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, [ + workerId, + message + ]) + } + + // ═══════════════════════════════════════════════════════════════════ + // JOB STATE + // ═══════════════════════════════════════════════════════════════════ + + async getJobState (id: string): Promise { + const result = await this.#pool!.query(`SELECT state, expires_at FROM "${this.#jobsTable}" WHERE id = $1`, [id]) + + if (result.rows.length === 0) return null + + const row = result.rows[0] + const expiresAt = row.expires_at as number | null + if (expiresAt && Date.now() >= expiresAt) { + await this.#pool!.query(`DELETE FROM "${this.#jobsTable}" WHERE id = $1`, [id]) + return null + } + + return row.state as string + } + + async setJobState (id: string, state: string): Promise { + await this.#pool!.query(`UPDATE "${this.#jobsTable}" SET state = $2 WHERE id = $1`, [id, state]) + } + + async deleteJob (id: string): Promise { + const result = await this.#pool!.query(`DELETE FROM "${this.#jobsTable}" WHERE id = $1`, [id]) + + if (result.rowCount && result.rowCount > 0) { + await this.publishEvent(id, 'cancelled') + return true + } + return false + } + + async getJobStates (ids: string[]): Promise> { + const result = new Map() + if (ids.length === 0) return result + + const rows = await this.#pool!.query(`SELECT id, state, expires_at FROM "${this.#jobsTable}" WHERE id = ANY($1)`, [ + ids + ]) + + const now = Date.now() + const found = new Set() + const expiredIds: string[] = [] + + for (const row of rows.rows) { + const id = row.id as string + const expiresAt = row.expires_at as number | null + found.add(id) + + if (expiresAt && now >= expiresAt) { + expiredIds.push(id) + result.set(id, null) + } else { + result.set(id, row.state as string) + } + } + + // Clean up expired entries + if (expiredIds.length > 0) { + await this.#pool!.query(`DELETE FROM "${this.#jobsTable}" WHERE id = ANY($1)`, [expiredIds]) + } + + // Set null for IDs not found + for (const id of ids) { + if (!found.has(id)) { + result.set(id, null) + } + } + + return result + } + + async setJobExpiry (id: string, ttlMs: number): Promise { + const expiresAt = Date.now() + ttlMs + await this.#pool!.query(`UPDATE "${this.#jobsTable}" SET expires_at = $2 WHERE id = $1`, [id, expiresAt]) + } + + // ═══════════════════════════════════════════════════════════════════ + // RESULTS + // ═══════════════════════════════════════════════════════════════════ + + async setResult (id: string, result: Buffer, ttlMs: number): Promise { + const expiresAt = Date.now() + ttlMs + await this.#pool!.query( + `INSERT INTO "${this.#resultsTable}" (id, data, expires_at) + VALUES ($1, $2, $3) + ON CONFLICT (id) DO UPDATE SET data = $2, expires_at = $3`, + [id, result, expiresAt] + ) + } + + async getResult (id: string): Promise { + const result = await this.#pool!.query(`SELECT data, expires_at FROM "${this.#resultsTable}" WHERE id = $1`, [id]) + + if (result.rows.length === 0) return null + + const row = result.rows[0] + if (Date.now() > (row.expires_at as number)) { + await this.#pool!.query(`DELETE FROM "${this.#resultsTable}" WHERE id = $1`, [id]) + return null + } + + return row.data as Buffer + } + + async setError (id: string, error: Buffer, ttlMs: number): Promise { + const expiresAt = Date.now() + ttlMs + await this.#pool!.query( + `INSERT INTO "${this.#errorsTable}" (id, data, expires_at) + VALUES ($1, $2, $3) + ON CONFLICT (id) DO UPDATE SET data = $2, expires_at = $3`, + [id, error, expiresAt] + ) + } + + async getError (id: string): Promise { + const result = await this.#pool!.query(`SELECT data, expires_at FROM "${this.#errorsTable}" WHERE id = $1`, [id]) + + if (result.rows.length === 0) return null + + const row = result.rows[0] + if (Date.now() > (row.expires_at as number)) { + await this.#pool!.query(`DELETE FROM "${this.#errorsTable}" WHERE id = $1`, [id]) + return null + } + + return row.data as Buffer + } + + // ═══════════════════════════════════════════════════════════════════ + // WORKERS + // ═══════════════════════════════════════════════════════════════════ + + async registerWorker (workerId: string, ttlMs: number): Promise { + const expiresAt = Date.now() + ttlMs + await this.#pool!.query( + `INSERT INTO "${this.#workersTable}" (worker_id, expires_at) + VALUES ($1, $2) + ON CONFLICT (worker_id) DO UPDATE SET expires_at = $2`, + [workerId, expiresAt] + ) + } + + async refreshWorker (workerId: string, ttlMs: number): Promise { + await this.registerWorker(workerId, ttlMs) + } + + async unregisterWorker (workerId: string): Promise { + if (!this.#pool) return + await this.#pool.query(`DELETE FROM "${this.#workersTable}" WHERE worker_id = $1`, [workerId]) + await this.#pool.query(`DELETE FROM "${this.#processingTable}" WHERE worker_id = $1`, [workerId]) + } + + async getWorkers (): Promise { + const result = await this.#pool!.query(`SELECT worker_id FROM "${this.#workersTable}" WHERE expires_at > $1`, [ + Date.now() + ]) + return result.rows.map(row => row.worker_id as string) + } + + async getProcessingJobs (workerId: string): Promise { + const result = await this.#pool!.query(`SELECT message FROM "${this.#processingTable}" WHERE worker_id = $1`, [ + workerId + ]) + return result.rows.map(row => row.message as Buffer) + } + + // ═══════════════════════════════════════════════════════════════════ + // NOTIFICATIONS (for request/response) + // ═══════════════════════════════════════════════════════════════════ + + async subscribeToJob ( + id: string, + handler: (status: 'completed' | 'failed' | 'failing') => void + ): Promise<() => Promise> { + const eventName = `notify:${id}` + this.#notifyEmitter.on(eventName, handler) + + return async () => { + this.#notifyEmitter.off(eventName, handler) + } + } + + async notifyJobComplete (id: string, status: 'completed' | 'failed' | 'failing'): Promise { + await this.#pool!.query('SELECT pg_notify($1, $2)', [this.#notifyChannel, `${id}:${status}`]) + } + + // ═══════════════════════════════════════════════════════════════════ + // EVENTS (for monitoring/reaper) + // ═══════════════════════════════════════════════════════════════════ + + async subscribeToEvents (handler: (id: string, event: string) => void): Promise<() => Promise> { + this.#eventEmitter.on('event', handler) + this.#eventSubscription = true + + return async () => { + this.#eventEmitter.off('event', handler) + } + } + + async publishEvent (id: string, event: string): Promise { + await this.#pool!.query('SELECT pg_notify($1, $2)', [this.#eventsChannel, `${id}:${event}`]) + } + + // ═══════════════════════════════════════════════════════════════════ + // ATOMIC OPERATIONS + // ═══════════════════════════════════════════════════════════════════ + + async completeJob (id: string, message: Buffer, workerId: string, result: Buffer, resultTTL: number): Promise { + const timestamp = Date.now() + const state = `completed:${timestamp}` + const expiresAt = timestamp + resultTTL + + const client = await this.#pool!.connect() + try { + await client.query('BEGIN') + + // Set state to completed + dedup expiry + await client.query(`UPDATE "${this.#jobsTable}" SET state = $2, expires_at = $3 WHERE id = $1`, [ + id, + state, + expiresAt + ]) + + // Store result + await client.query( + `INSERT INTO "${this.#resultsTable}" (id, data, expires_at) + VALUES ($1, $2, $3) + ON CONFLICT (id) DO UPDATE SET data = $2, expires_at = $3`, + [id, result, expiresAt] + ) + + // Remove from processing queue + await client.query(`DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, [ + workerId, + message + ]) + + await client.query('COMMIT') + } catch (err) { + await client.query('ROLLBACK').catch(() => {}) + throw err + } finally { + client.release() + } + + // Notify after commit (NOTIFY is transactional, but we do it separately for the emitter) + await this.notifyJobComplete(id, 'completed') + await this.publishEvent(id, 'completed') + } + + async failJob (id: string, message: Buffer, workerId: string, error: Buffer, errorTTL: number): Promise { + const timestamp = Date.now() + const state = `failed:${timestamp}` + const expiresAt = timestamp + errorTTL + + const client = await this.#pool!.connect() + try { + await client.query('BEGIN') + + // Set state to failed + dedup expiry + await client.query(`UPDATE "${this.#jobsTable}" SET state = $2, expires_at = $3 WHERE id = $1`, [ + id, + state, + expiresAt + ]) + + // Store error + await client.query( + `INSERT INTO "${this.#errorsTable}" (id, data, expires_at) + VALUES ($1, $2, $3) + ON CONFLICT (id) DO UPDATE SET data = $2, expires_at = $3`, + [id, error, expiresAt] + ) + + // Remove from processing queue + await client.query(`DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, [ + workerId, + message + ]) + + await client.query('COMMIT') + } catch (err) { + await client.query('ROLLBACK').catch(() => {}) + throw err + } finally { + client.release() + } + + // Notify after commit + await this.notifyJobComplete(id, 'failed') + await this.publishEvent(id, 'failed') + } + + async retryJob (id: string, message: Buffer, workerId: string, attempts: number): Promise { + const timestamp = Date.now() + const state = `failing:${timestamp}:${attempts}` + + // Get the old message from processing queue + const processingJobs = await this.getProcessingJobs(workerId) + let oldMessage: Buffer | null = null + + for (const job of processingJobs) { + try { + const parsed = JSON.parse(job.toString()) + if (parsed.id === id) { + oldMessage = job + break + } + } catch { + // Ignore parse errors + } + } + + const client = await this.#pool!.connect() + try { + await client.query('BEGIN') + + // Set state to failing + await client.query(`UPDATE "${this.#jobsTable}" SET state = $2 WHERE id = $1`, [id, state]) + + // Remove old message from processing queue + if (oldMessage) { + await client.query(`DELETE FROM "${this.#processingTable}" WHERE worker_id = $1 AND message = $2`, [ + workerId, + oldMessage + ]) + } + + // Add new message to queue + await client.query(`INSERT INTO "${this.#queueTable}" (message) VALUES ($1)`, [message]) + + await client.query('COMMIT') + } catch (err) { + await client.query('ROLLBACK').catch(() => {}) + throw err + } finally { + client.release() + } + + // Notify after commit + await this.notifyJobComplete(id, 'failing') + await this.publishEvent(id, 'failing') + await this.#pool!.query("SELECT pg_notify($1, '')", [this.#newJobChannel]) + } + + // ═══════════════════════════════════════════════════════════════════ + // LEADER ELECTION + // ═══════════════════════════════════════════════════════════════════ + + async acquireLeaderLock (lockKey: string, ownerId: string, ttlMs: number): Promise { + const expiresAt = Date.now() + ttlMs + + // Try to insert, or take over if expired + const result = await this.#pool!.query( + `INSERT INTO "${this.#locksTable}" (lock_key, owner_id, expires_at) + VALUES ($1, $2, $3) + ON CONFLICT (lock_key) DO UPDATE + SET owner_id = $2, expires_at = $3 + WHERE "${this.#locksTable}".expires_at < $4 + RETURNING lock_key`, + [lockKey, ownerId, expiresAt, Date.now()] + ) + + return result.rows.length > 0 + } + + async renewLeaderLock (lockKey: string, ownerId: string, ttlMs: number): Promise { + const expiresAt = Date.now() + ttlMs + const result = await this.#pool!.query( + `UPDATE "${this.#locksTable}" SET expires_at = $3 WHERE lock_key = $1 AND owner_id = $2`, + [lockKey, ownerId, expiresAt] + ) + return result.rowCount !== null && result.rowCount > 0 + } + + async releaseLeaderLock (lockKey: string, ownerId: string): Promise { + if (!this.#pool) return false + const result = await this.#pool.query(`DELETE FROM "${this.#locksTable}" WHERE lock_key = $1 AND owner_id = $2`, [ + lockKey, + ownerId + ]) + return result.rowCount !== null && result.rowCount > 0 + } + + // ═══════════════════════════════════════════════════════════════════ + // NAMESPACE + // ═══════════════════════════════════════════════════════════════════ + + createNamespace (name: string): Storage { + const root = this.#parentStorage ?? this + const ns = new PgStorage({ + connectionString: root.#connectionString, + tablePrefix: `${this.#tablePrefix}${name}_`, + logger: this.#logger + }) + ns.#parentStorage = root + return ns + } + + /** + * Clear all data (useful for testing) + */ + async clear (): Promise { + if (!this.#pool) return + + await this.#pool.query(`TRUNCATE "${this.#queueTable}" RESTART IDENTITY`) + await this.#pool.query(`TRUNCATE "${this.#processingTable}" RESTART IDENTITY`) + await this.#pool.query(`DELETE FROM "${this.#jobsTable}"`) + await this.#pool.query(`DELETE FROM "${this.#resultsTable}"`) + await this.#pool.query(`DELETE FROM "${this.#errorsTable}"`) + await this.#pool.query(`DELETE FROM "${this.#workersTable}"`) + await this.#pool.query(`DELETE FROM "${this.#locksTable}"`) + } +} diff --git a/src/storage/redis.ts b/src/storage/redis.ts index e82fb61..f58448f 100644 --- a/src/storage/redis.ts +++ b/src/storage/redis.ts @@ -40,6 +40,14 @@ export class RedisStorage implements Storage { #eventSubscription: boolean = false #logger: Logger + // Namespace support + #parentStorage: RedisStorage | null = null // set for namespace views + #refCount: number = 0 // for root instance + + // PubSub handlers for namespace views (needed for cleanup on disconnect) + #messageHandler: ((channel: string, message: string) => void) | null = null + #pmessageHandler: ((_pattern: string, channel: string, message: string) => void) | null = null + constructor (config: RedisStorageConfig = {}) { this.#url = config.url ?? process.env.REDIS_URL ?? 'redis://localhost:6379' this.#keyPrefix = config.keyPrefix ?? 'jq:' @@ -103,6 +111,31 @@ export class RedisStorage implements Storage { } async connect (): Promise { + // Namespace view: connect parent and copy shared clients + if (this.#parentStorage) { + if (this.#client) return // already connected + this.#parentStorage.#refCount++ + await this.#parentStorage.connect() + + // Copy shared clients and scripts from parent + this.#client = this.#parentStorage.#client + this.#blockingClient = this.#parentStorage.#blockingClient + this.#subscriber = this.#parentStorage.#subscriber + this.#scriptSHAs = this.#parentStorage.#scriptSHAs + + // Register own message handlers on shared subscriber + this.#messageHandler = (channel: string, message: string) => { + this.#handlePubSubMessage(channel, message) + } + this.#pmessageHandler = (_pattern: string, channel: string, message: string) => { + this.#handlePubSubMessage(channel, message) + } + this.#subscriber!.on('message', this.#messageHandler) + this.#subscriber!.on('pmessage', this.#pmessageHandler) + return + } + + // Root instance: idempotent connect (no ref count for direct callers) if (this.#client) return const redisModule = await loadOptionalDependency<{ Redis: new (url: string) => Redis }>('iovalkey', 'RedisStorage') @@ -114,18 +147,53 @@ export class RedisStorage implements Storage { // Load Lua scripts await this.#loadScripts() - // Set up pub/sub message handler - this.#subscriber.on('message', (channel: string, message: string) => { + // Set up pub/sub message handler for root + this.#messageHandler = (channel: string, message: string) => { this.#handlePubSubMessage(channel, message) - }) - - this.#subscriber.on('pmessage', (_pattern: string, channel: string, message: string) => { + } + this.#pmessageHandler = (_pattern: string, channel: string, message: string) => { this.#handlePubSubMessage(channel, message) - }) + } + this.#subscriber.on('message', this.#messageHandler) + this.#subscriber.on('pmessage', this.#pmessageHandler) } async disconnect (): Promise { + // Namespace view: remove own handlers, null references, release parent ref + if (this.#parentStorage) { + if (this.#subscriber && this.#messageHandler) { + this.#subscriber.off('message', this.#messageHandler) + } + if (this.#subscriber && this.#pmessageHandler) { + this.#subscriber.off('pmessage', this.#pmessageHandler) + } + this.#messageHandler = null + this.#pmessageHandler = null + this.#client = null + this.#blockingClient = null + this.#subscriber = null + this.#scriptSHAs = null + + this.#eventEmitter.removeAllListeners() + this.#notifyEmitter.removeAllListeners() + this.#eventSubscription = false + + this.#parentStorage.#refCount-- + return + } + + // Root instance: only destroy when no namespace children are connected + if (this.#refCount > 0) return + if (this.#subscriber) { + if (this.#messageHandler) { + this.#subscriber.off('message', this.#messageHandler) + } + if (this.#pmessageHandler) { + this.#subscriber.off('pmessage', this.#pmessageHandler) + } + this.#messageHandler = null + this.#pmessageHandler = null this.#subscriber.disconnect() this.#subscriber = null } @@ -503,6 +571,17 @@ export class RedisStorage implements Storage { } } + createNamespace (name: string): Storage { + const root = this.#parentStorage ?? this + const ns = new RedisStorage({ + url: root.#url, + keyPrefix: `${this.#keyPrefix}${name}:`, + logger: this.#logger + }) + ns.#parentStorage = root + return ns + } + /** * Clear all data (useful for testing) */ diff --git a/src/storage/types.ts b/src/storage/types.ts index acab600..2d9e5bd 100644 --- a/src/storage/types.ts +++ b/src/storage/types.ts @@ -219,6 +219,13 @@ export interface Storage { */ setJobExpiry (id: string, ttlMs: number): Promise + /** + * Create a namespaced view of this storage. + * The returned storage shares the same underlying connections (where applicable) + * but uses a different key prefix, providing isolated queues on the same backend. + */ + createNamespace (name: string): Storage + // ═══════════════════════════════════════════════════════════════════ // ATOMIC OPERATIONS (Lua scripts in Redis) // ═══════════════════════════════════════════════════════════════════ diff --git a/src/types.ts b/src/types.ts index 5b7204e..65ce639 100644 --- a/src/types.ts +++ b/src/types.ts @@ -132,6 +132,9 @@ export interface QueueConfig { /** Storage backend (required) */ storage: Storage + /** Queue name for namespace isolation (optional) */ + name?: string + /** Hook called after execution and before persisting terminal state */ afterExecution?: AfterExecutionHook diff --git a/test/fixtures/pg.ts b/test/fixtures/pg.ts new file mode 100644 index 0000000..30d8ff5 --- /dev/null +++ b/test/fixtures/pg.ts @@ -0,0 +1,13 @@ +import { PgStorage } from '../../src/storage/pg.ts' + +/** + * Create a PgStorage instance for testing. + * Each test gets its own table prefix to avoid conflicts. + */ +export function createPgStorage (): PgStorage { + const suffix = `${Date.now()}_${Math.random().toString(36).slice(2)}` + return new PgStorage({ + connectionString: process.env.DATABASE_URL ?? 'postgresql://postgres:postgres@localhost:5432/job_queue_test', + tablePrefix: `t${suffix}_` + }) +} diff --git a/test/multi-queue.test.ts b/test/multi-queue.test.ts new file mode 100644 index 0000000..0e7a93d --- /dev/null +++ b/test/multi-queue.test.ts @@ -0,0 +1,235 @@ +import assert from 'node:assert' +import { afterEach, beforeEach, describe, it } from 'node:test' +import { Queue } from '../src/queue.ts' +import { MemoryStorage } from '../src/storage/memory.ts' +import type { Job } from '../src/types.ts' +import { createLatch } from './helpers/events.ts' + +describe('Multi-Queue (named queues on shared storage)', () => { + let storage: MemoryStorage + let queueA: Queue<{ value: number }, { result: number }> + let queueB: Queue<{ value: string }, { result: string }> + + beforeEach(() => { + storage = new MemoryStorage() + queueA = new Queue({ + storage, + name: 'emails', + concurrency: 1, + visibilityTimeout: 5000 + }) + queueB = new Queue({ + storage, + name: 'images', + concurrency: 1, + visibilityTimeout: 5000 + }) + }) + + afterEach(async () => { + await queueA.stop() + await queueB.stop() + }) + + describe('job isolation', () => { + it('should process jobs independently across queues', async () => { + const latchA = createLatch() + const latchB = createLatch() + + queueA.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value * 2 } + }) + + queueB.execute(async (job: Job<{ value: string }>) => { + return { result: job.payload.value.toUpperCase() } + }) + + queueA.on('completed', () => latchA.resolve()) + queueB.on('completed', () => latchB.resolve()) + + await queueA.start() + await queueB.start() + + await queueA.enqueue('job-1', { value: 21 }) + await queueB.enqueue('job-1', { value: 'hello' }) + + await latchA.promise + await latchB.promise + + const resultA = await queueA.getResult('job-1') + const resultB = await queueB.getResult('job-1') + + assert.deepStrictEqual(resultA, { result: 42 }) + assert.deepStrictEqual(resultB, { result: 'HELLO' }) + }) + + it('should not deliver queue A jobs to queue B consumers', async () => { + const latchA = createLatch() + + queueA.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value * 2 } + }) + + // Queue B has a handler but should never receive anything + queueB.execute(async () => { + assert.fail('Queue B should not receive queue A jobs') + return { result: 'never' } + }) + + queueA.on('completed', () => latchA.resolve()) + + await queueA.start() + await queueB.start() + + await queueA.enqueue('job-1', { value: 5 }) + await latchA.promise + + const resultA = await queueA.getResult('job-1') + assert.deepStrictEqual(resultA, { result: 10 }) + }) + }) + + describe('dedup isolation', () => { + it('should allow same job ID in different queues', async () => { + await queueA.start() + await queueB.start() + + const resultA = await queueA.enqueue('shared-id', { value: 1 }) + const resultB = await queueB.enqueue('shared-id', { value: 'test' }) + + assert.strictEqual(resultA.status, 'queued') + assert.strictEqual(resultB.status, 'queued') + }) + + it('should enforce dedup within the same named queue', async () => { + await queueA.start() + + const result1 = await queueA.enqueue('dup-id', { value: 1 }) + const result2 = await queueA.enqueue('dup-id', { value: 2 }) + + assert.strictEqual(result1.status, 'queued') + assert.strictEqual(result2.status, 'duplicate') + }) + }) + + describe('result isolation', () => { + it('should store and retrieve results independently per queue', async () => { + const latchA = createLatch() + const latchB = createLatch() + + queueA.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value + 100 } + }) + + queueB.execute(async (job: Job<{ value: string }>) => { + return { result: `processed:${job.payload.value}` } + }) + + queueA.on('completed', () => latchA.resolve()) + queueB.on('completed', () => latchB.resolve()) + + await queueA.start() + await queueB.start() + + await queueA.enqueue('result-test', { value: 5 }) + await queueB.enqueue('result-test', { value: 'data' }) + + await latchA.promise + await latchB.promise + + const resultA = await queueA.getResult('result-test') + const resultB = await queueB.getResult('result-test') + + assert.deepStrictEqual(resultA, { result: 105 }) + assert.deepStrictEqual(resultB, { result: 'processed:data' }) + }) + }) + + describe('status isolation', () => { + it('should track status independently per queue', async () => { + const latchA = createLatch() + + queueA.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value } + }) + + queueA.on('completed', () => latchA.resolve()) + + await queueA.start() + await queueB.start() + + await queueA.enqueue('status-test', { value: 1 }) + await queueB.enqueue('status-test', { value: 'pending' }) + + await latchA.promise + + const statusA = await queueA.getStatus('status-test') + const statusB = await queueB.getStatus('status-test') + + assert.strictEqual(statusA?.state, 'completed') + assert.strictEqual(statusB?.state, 'queued') + }) + }) + + describe('backward compatibility', () => { + it('should work without a name (no namespace)', async () => { + const latch = createLatch() + const plainQueue = new Queue<{ value: number }, { result: number }>({ + storage, + concurrency: 1, + visibilityTimeout: 5000 + }) + + plainQueue.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value * 3 } + }) + + plainQueue.on('completed', () => latch.resolve()) + + await plainQueue.start() + await plainQueue.enqueue('plain-job', { value: 10 }) + + await latch.promise + + const result = await plainQueue.getResult('plain-job') + assert.deepStrictEqual(result, { result: 30 }) + + await plainQueue.stop() + }) + }) + + describe('connect/disconnect lifecycle', () => { + it('should handle multiple start/stop cycles', async () => { + queueA.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value } + }) + + await queueA.start() + await queueB.start() + + await queueA.stop() + await queueB.stop() + + // Second cycle + queueA = new Queue({ + storage, + name: 'emails', + concurrency: 1, + visibilityTimeout: 5000 + }) + queueA.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value } + }) + + const latch = createLatch() + queueA.on('completed', () => latch.resolve()) + + await queueA.start() + await queueA.enqueue('cycle-test', { value: 7 }) + await latch.promise + + const result = await queueA.getResult('cycle-test') + assert.deepStrictEqual(result, { result: 7 }) + }) + }) +}) diff --git a/test/pg-storage.test.ts b/test/pg-storage.test.ts new file mode 100644 index 0000000..babb12a --- /dev/null +++ b/test/pg-storage.test.ts @@ -0,0 +1,498 @@ +import assert from 'node:assert' +import { afterEach, beforeEach, describe, it } from 'node:test' +import { setTimeout as sleep } from 'node:timers/promises' +import { type PgStorage } from '../src/storage/pg.ts' +import { createPgStorage } from './fixtures/pg.ts' +import { promisifyCallback, waitForCallbacks } from './helpers/events.ts' + +describe('PgStorage', () => { + let storage: PgStorage + + beforeEach(async () => { + storage = createPgStorage() + await storage.connect() + }) + + afterEach(async () => { + await storage.clear() + await storage.disconnect() + }) + + describe('enqueue/dequeue', () => { + it('should enqueue and dequeue a job', async () => { + const message = Buffer.from(JSON.stringify({ id: 'job-1', payload: 'test' })) + const result = await storage.enqueue('job-1', message, Date.now()) + + assert.strictEqual(result, null, 'enqueue should return null for new job') + + const dequeued = await storage.dequeue('worker-1', 1) + assert.ok(dequeued, 'dequeue should return the message') + assert.deepStrictEqual(dequeued, message) + }) + + it('should return existing state for duplicate job', async () => { + const message = Buffer.from(JSON.stringify({ id: 'job-1', payload: 'test' })) + const timestamp = Date.now() + + await storage.enqueue('job-1', message, timestamp) + const result = await storage.enqueue('job-1', message, timestamp) + + assert.ok(result, 'should return existing state') + assert.ok(result.startsWith('queued:'), 'state should start with queued:') + }) + + it('should return null on dequeue timeout', async () => { + const result = await storage.dequeue('worker-1', 0.1) + assert.strictEqual(result, null) + }) + + it('should dequeue in FIFO order', async () => { + const msg1 = Buffer.from('msg-1') + const msg2 = Buffer.from('msg-2') + const msg3 = Buffer.from('msg-3') + + await storage.enqueue('job-1', msg1, Date.now()) + await storage.enqueue('job-2', msg2, Date.now()) + await storage.enqueue('job-3', msg3, Date.now()) + + const d1 = await storage.dequeue('worker-1', 1) + const d2 = await storage.dequeue('worker-1', 1) + const d3 = await storage.dequeue('worker-1', 1) + + assert.deepStrictEqual(d1, msg1) + assert.deepStrictEqual(d2, msg2) + assert.deepStrictEqual(d3, msg3) + }) + + it('should wake up dequeue waiter when job is enqueued', async () => { + // Start dequeue that will block + const dequeuePromise = storage.dequeue('worker-1', 5) + + // Give it time to register the waiter + await sleep(50) + + // Enqueue a job - should wake up the waiter via NOTIFY + const msg = Buffer.from('wakeup-test') + await storage.enqueue('job-1', msg, Date.now()) + + const result = await dequeuePromise + assert.deepStrictEqual(result, msg) + }) + }) + + describe('job state', () => { + it('should get and set job state', async () => { + await storage.enqueue('job-1', Buffer.from('test'), Date.now()) + await storage.setJobState('job-1', 'processing:123456:worker-1') + const state = await storage.getJobState('job-1') + + assert.strictEqual(state, 'processing:123456:worker-1') + }) + + it('should return null for non-existent job', async () => { + const state = await storage.getJobState('non-existent') + assert.strictEqual(state, null) + }) + + it('should delete job', async () => { + await storage.enqueue('job-1', Buffer.from('test'), Date.now()) + const deleted = await storage.deleteJob('job-1') + + assert.strictEqual(deleted, true) + assert.strictEqual(await storage.getJobState('job-1'), null) + }) + + it('should return false when deleting non-existent job', async () => { + const deleted = await storage.deleteJob('non-existent') + assert.strictEqual(deleted, false) + }) + + it('should get multiple job states', async () => { + await storage.enqueue('job-1', Buffer.from('a'), Date.now()) + await storage.setJobState('job-1', 'queued:1') + await storage.enqueue('job-2', Buffer.from('b'), Date.now()) + await storage.setJobState('job-2', 'processing:2') + + const states = await storage.getJobStates(['job-1', 'job-2', 'job-3']) + + assert.strictEqual(states.get('job-1'), 'queued:1') + assert.strictEqual(states.get('job-2'), 'processing:2') + assert.strictEqual(states.get('job-3'), null) + }) + }) + + describe('requeue', () => { + it('should move job from processing queue back to main queue', async () => { + const message = Buffer.from('requeue-test') + await storage.enqueue('job-1', message, Date.now()) + + // Dequeue to worker-1 + const dequeued = await storage.dequeue('worker-1', 1) + assert.deepStrictEqual(dequeued, message) + + // Verify processing queue has the job + const processing = await storage.getProcessingJobs('worker-1') + assert.strictEqual(processing.length, 1) + + // Requeue + await storage.requeue('job-1', message, 'worker-1') + + // Processing queue should be empty + const processingAfter = await storage.getProcessingJobs('worker-1') + assert.strictEqual(processingAfter.length, 0) + + // Should be able to dequeue again + const redequeued = await storage.dequeue('worker-2', 1) + assert.deepStrictEqual(redequeued, message) + }) + }) + + describe('ack', () => { + it('should remove job from processing queue', async () => { + const message = Buffer.from('ack-test') + await storage.enqueue('job-1', message, Date.now()) + + const dequeued = await storage.dequeue('worker-1', 1) + assert.ok(dequeued) + + await storage.ack('job-1', message, 'worker-1') + + const processing = await storage.getProcessingJobs('worker-1') + assert.strictEqual(processing.length, 0) + }) + }) + + describe('results', () => { + it('should store and retrieve result', async () => { + const result = Buffer.from(JSON.stringify({ success: true })) + await storage.setResult('job-1', result, 60000) + + const retrieved = await storage.getResult('job-1') + assert.deepStrictEqual(retrieved, result) + }) + + it('should return null for non-existent result', async () => { + const result = await storage.getResult('non-existent') + assert.strictEqual(result, null) + }) + + it('should return null for expired result', async () => { + await storage.setResult('job-1', Buffer.from('short-lived'), 20) + await sleep(30) + + const result = await storage.getResult('job-1') + assert.strictEqual(result, null) + }) + }) + + describe('errors', () => { + it('should store and retrieve error', async () => { + const error = Buffer.from(JSON.stringify({ message: 'Something failed' })) + await storage.setError('job-1', error, 60000) + + const retrieved = await storage.getError('job-1') + assert.deepStrictEqual(retrieved, error) + }) + + it('should return null for non-existent error', async () => { + const error = await storage.getError('non-existent') + assert.strictEqual(error, null) + }) + }) + + describe('workers', () => { + it('should register and get workers', async () => { + await storage.registerWorker('worker-1', 60000) + await storage.registerWorker('worker-2', 60000) + + const workers = await storage.getWorkers() + assert.deepStrictEqual(workers.sort(), ['worker-1', 'worker-2']) + }) + + it('should unregister worker', async () => { + await storage.registerWorker('worker-1', 60000) + await storage.unregisterWorker('worker-1') + + const workers = await storage.getWorkers() + assert.deepStrictEqual(workers, []) + }) + + it('should not return expired workers', async () => { + await storage.registerWorker('worker-1', 20) + await sleep(30) + + const workers = await storage.getWorkers() + assert.deepStrictEqual(workers, []) + }) + }) + + describe('notifications', () => { + it('should notify on job completion', async () => { + const { value, unsubscribe } = await promisifyCallback(handler => + storage.subscribeToJob('job-1', handler)) + + await sleep(50) + + await storage.notifyJobComplete('job-1', 'completed') + + const notifiedStatus = await value + assert.strictEqual(notifiedStatus, 'completed') + + await unsubscribe() + }) + + it('should notify on job failure', async () => { + const { value, unsubscribe } = await promisifyCallback(handler => + storage.subscribeToJob('job-1', handler)) + + await sleep(50) + + await storage.notifyJobComplete('job-1', 'failed') + + const notifiedStatus = await value + assert.strictEqual(notifiedStatus, 'failed') + + await unsubscribe() + }) + }) + + describe('events', () => { + it('should emit events on state changes', async () => { + const events: Array<{ id: string; event: string }> = [] + const { callback, promise: eventsReceived } = waitForCallbacks(2) + + const unsubscribe = await storage.subscribeToEvents((id, event) => { + events.push({ id, event }) + callback() + }) + + await sleep(50) + + await storage.publishEvent('job-1', 'processing') + await storage.publishEvent('job-1', 'completed') + + await eventsReceived + + assert.deepStrictEqual(events, [ + { id: 'job-1', event: 'processing' }, + { id: 'job-1', event: 'completed' } + ]) + + await unsubscribe() + }) + + it('should emit queued event on enqueue', async () => { + const events: Array<{ id: string; event: string }> = [] + const { callback, promise: eventReceived } = waitForCallbacks(1) + + const unsubscribe = await storage.subscribeToEvents((id, event) => { + events.push({ id, event }) + callback() + }) + + await sleep(50) + + await storage.enqueue('job-1', Buffer.from('test'), Date.now()) + + await eventReceived + + assert.deepStrictEqual(events, [{ id: 'job-1', event: 'queued' }]) + + await unsubscribe() + }) + }) + + describe('atomic operations', () => { + it('should complete job atomically', async () => { + const message = Buffer.from('complete-test') + const result = Buffer.from(JSON.stringify({ success: true })) + + await storage.enqueue('job-1', message, Date.now()) + await storage.dequeue('worker-1', 1) + await storage.setJobState('job-1', 'processing:123:worker-1') + + const { value: notificationReceived, unsubscribe } = await promisifyCallback(handler => + storage.subscribeToJob('job-1', handler)) + + await sleep(50) + + await storage.completeJob('job-1', message, 'worker-1', result, 60000) + + await notificationReceived + + // Verify state + const state = await storage.getJobState('job-1') + assert.ok(state?.startsWith('completed:')) + + // Verify result stored + const storedResult = await storage.getResult('job-1') + assert.deepStrictEqual(storedResult, result) + + // Verify removed from processing queue + const processing = await storage.getProcessingJobs('worker-1') + assert.strictEqual(processing.length, 0) + + await unsubscribe() + }) + + it('should fail job atomically', async () => { + const message = Buffer.from('fail-test') + const error = Buffer.from(JSON.stringify({ message: 'Error' })) + + await storage.enqueue('job-1', message, Date.now()) + await storage.dequeue('worker-1', 1) + await storage.setJobState('job-1', 'processing:123:worker-1') + + const { value: notificationReceived, unsubscribe } = await promisifyCallback(handler => + storage.subscribeToJob('job-1', handler)) + + await sleep(50) + + await storage.failJob('job-1', message, 'worker-1', error, 60000) + + const notifiedStatus = await notificationReceived + + // Verify state + const state = await storage.getJobState('job-1') + assert.ok(state?.startsWith('failed:')) + + // Verify error stored + const storedError = await storage.getError('job-1') + assert.deepStrictEqual(storedError, error) + + // Verify notification + assert.strictEqual(notifiedStatus, 'failed') + + await unsubscribe() + }) + + it('should retry job atomically', async () => { + const message = Buffer.from(JSON.stringify({ id: 'job-1', payload: 'test', attempts: 0 })) + + await storage.enqueue('job-1', message, Date.now()) + await storage.dequeue('worker-1', 1) + await storage.setJobState('job-1', 'processing:123:worker-1') + + const updatedMessage = Buffer.from(JSON.stringify({ id: 'job-1', payload: 'test', attempts: 1 })) + await storage.retryJob('job-1', updatedMessage, 'worker-1', 1) + + // Verify state + const state = await storage.getJobState('job-1') + assert.ok(state?.startsWith('failing:')) + assert.ok(state?.endsWith(':1')) + + // Verify job is back in queue + const dequeued = await storage.dequeue('worker-2', 1) + assert.deepStrictEqual(dequeued, updatedMessage) + }) + }) + + describe('leader election', () => { + it('should acquire lock when no lock exists', async () => { + const acquired = await storage.acquireLeaderLock('test-lock', 'owner-1', 60000) + assert.strictEqual(acquired, true) + }) + + it('should fail to acquire lock held by another', async () => { + await storage.acquireLeaderLock('test-lock', 'owner-1', 60000) + const acquired = await storage.acquireLeaderLock('test-lock', 'owner-2', 60000) + assert.strictEqual(acquired, false) + }) + + it('should acquire lock when expired', async () => { + await storage.acquireLeaderLock('test-lock', 'owner-1', 10) + await sleep(20) + + const acquired = await storage.acquireLeaderLock('test-lock', 'owner-2', 60000) + assert.strictEqual(acquired, true) + }) + + it('should renew lock by same owner', async () => { + await storage.acquireLeaderLock('test-lock', 'owner-1', 60000) + const renewed = await storage.renewLeaderLock('test-lock', 'owner-1', 60000) + assert.strictEqual(renewed, true) + }) + + it('should fail to renew lock by different owner', async () => { + await storage.acquireLeaderLock('test-lock', 'owner-1', 60000) + const renewed = await storage.renewLeaderLock('test-lock', 'owner-2', 60000) + assert.strictEqual(renewed, false) + }) + + it('should release lock by same owner', async () => { + await storage.acquireLeaderLock('test-lock', 'owner-1', 60000) + const released = await storage.releaseLeaderLock('test-lock', 'owner-1') + assert.strictEqual(released, true) + + // Should be acquirable again + const acquired = await storage.acquireLeaderLock('test-lock', 'owner-2', 60000) + assert.strictEqual(acquired, true) + }) + + it('should fail to release lock by different owner', async () => { + await storage.acquireLeaderLock('test-lock', 'owner-1', 60000) + const released = await storage.releaseLeaderLock('test-lock', 'owner-2') + assert.strictEqual(released, false) + }) + }) + + describe('dedup expiry', () => { + it('should allow re-enqueue after dedup expiry', async () => { + await storage.enqueue('job-1', Buffer.from('first'), Date.now()) + await storage.setJobExpiry('job-1', 20) + + await sleep(30) + + const result = await storage.enqueue('job-1', Buffer.from('second'), Date.now()) + assert.strictEqual(result, null, 'should allow re-enqueue after expiry') + }) + + it('should block re-enqueue within TTL window', async () => { + await storage.enqueue('job-1', Buffer.from('first'), Date.now()) + await storage.setJobExpiry('job-1', 60000) + + const result = await storage.enqueue('job-1', Buffer.from('second'), Date.now()) + assert.ok(result, 'should block re-enqueue within TTL') + }) + }) + + describe('clear', () => { + it('should clear all data', async () => { + await storage.enqueue('job-1', Buffer.from('test'), Date.now()) + await storage.setResult('job-1', Buffer.from('result'), 60000) + await storage.registerWorker('worker-1', 60000) + + await storage.clear() + + assert.strictEqual(await storage.getJobState('job-1'), null) + assert.strictEqual(await storage.getResult('job-1'), null) + assert.deepStrictEqual(await storage.getWorkers(), []) + }) + }) + + describe('namespace', () => { + it('should isolate data between namespaces', async () => { + const ns1 = storage.createNamespace('emails') as PgStorage + const ns2 = storage.createNamespace('images') as PgStorage + + await ns1.connect() + await ns2.connect() + + try { + await ns1.enqueue('job-1', Buffer.from('email-data'), Date.now()) + await ns2.enqueue('job-1', Buffer.from('image-data'), Date.now()) + + const d1 = await ns1.dequeue('worker-1', 1) + const d2 = await ns2.dequeue('worker-1', 1) + + assert.deepStrictEqual(d1, Buffer.from('email-data')) + assert.deepStrictEqual(d2, Buffer.from('image-data')) + } finally { + await ns1.clear() + await ns2.clear() + await ns2.disconnect() + await ns1.disconnect() + } + }) + }) +})