Skip to content

Commit e038ffa

Browse files
authored
feat: Do not crash the queue or reapers in case of errors. (#32)
Signed-off-by: Paolo Insogna <paolo@cowtech.it>
1 parent 086418e commit e038ffa

4 files changed

Lines changed: 121 additions & 25 deletions

File tree

src/queue.ts

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
1-
import { EventEmitter } from 'node:events'
21
import { randomUUID } from 'node:crypto'
2+
import { EventEmitter } from 'node:events'
33
import type { Logger } from 'pino'
4-
import type { Storage } from './storage/types.ts'
4+
import { Consumer } from './consumer.ts'
5+
import { Producer } from './producer.ts'
56
import type { Serde } from './serde/index.ts'
7+
import { createJsonSerde } from './serde/index.ts'
8+
import type { Storage } from './storage/types.ts'
69
import type {
7-
QueueConfig,
8-
EnqueueOptions,
10+
AfterExecutionHook,
11+
CancelResult,
912
EnqueueAndWaitOptions,
13+
EnqueueOptions,
1014
EnqueueResult,
11-
CancelResult,
12-
MessageStatus,
13-
UpdateResultTTLResult,
1415
JobHandler,
16+
MessageStatus,
17+
QueueConfig,
1518
QueueEvents,
16-
AfterExecutionHook
19+
UpdateResultTTLResult
1720
} from './types.ts'
18-
import { Producer } from './producer.ts'
19-
import { Consumer } from './consumer.ts'
20-
import { createJsonSerde } from './serde/index.ts'
2121
import { abstractLogger, ensureLoggableError } from './utils/logging.ts'
2222

2323
/**
@@ -204,8 +204,7 @@ export class Queue<TPayload, TResult = void> extends EventEmitter<QueueEvents<TR
204204

205205
// Forward consumer events
206206
this.#consumer.on('error', error => {
207-
this.#logger.error({ err: ensureLoggableError(error) }, 'Consumer emitted error.')
208-
this.emit('error', error)
207+
this.#emitError(error, 'Consumer emitted error.')
209208
})
210209

211210
this.#consumer.on('completed', (id, result) => {
@@ -230,9 +229,18 @@ export class Queue<TPayload, TResult = void> extends EventEmitter<QueueEvents<TR
230229

231230
this.#consumer.execute(this.#handler)
232231
this.#consumer.start().catch(err => {
233-
const error = err instanceof Error ? err : new Error(String(err))
234-
this.#logger.error({ err: ensureLoggableError(error) }, 'Failed to start consumer.')
235-
this.emit('error', error)
232+
this.#emitError(err, 'Failed to start consumer.')
236233
})
237234
}
235+
236+
#emitError (err: unknown, message: string): void {
237+
const error = err instanceof Error ? err : new Error(String(err))
238+
239+
if (this.listenerCount('error') > 0) {
240+
this.emit('error', error)
241+
return
242+
}
243+
244+
this.#logger.error({ err: ensureLoggableError(error) }, message)
245+
}
238246
}

src/reaper.ts

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
import { EventEmitter } from 'node:events'
21
import { randomUUID } from 'node:crypto'
3-
import type { Storage } from './storage/types.ts'
2+
import { EventEmitter } from 'node:events'
3+
import type { Logger } from 'pino'
44
import type { Serde } from './serde/index.ts'
5-
import type { QueueMessage } from './types.ts'
65
import { createJsonSerde } from './serde/index.ts'
6+
import type { Storage } from './storage/types.ts'
7+
import type { QueueMessage } from './types.ts'
8+
import { abstractLogger, ensureLoggableError } from './utils/logging.ts'
79
import { parseState } from './utils/state.ts'
810

911
interface LeaderElectionConfig {
@@ -18,6 +20,7 @@ interface ReaperConfig<TPayload> {
1820
payloadSerde?: Serde<TPayload>
1921
visibilityTimeout?: number
2022
leaderElection?: LeaderElectionConfig
23+
logger?: Logger
2124
}
2225

2326
interface ReaperEvents {
@@ -46,6 +49,7 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
4649
#payloadSerde: Serde<TPayload>
4750
#visibilityTimeout: number
4851
#leaderElection: LeaderElectionConfig
52+
#logger: Logger
4953

5054
#running = false
5155
#unsubscribe: (() => Promise<void>) | null = null
@@ -63,6 +67,7 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
6367
this.#visibilityTimeout = config.visibilityTimeout ?? 30000
6468
this.#leaderElection = config.leaderElection ?? { enabled: false }
6569
this.#reaperId = randomUUID()
70+
this.#logger = (config.logger ?? abstractLogger).child({ component: 'reaper', reaperId: this.#reaperId })
6671
}
6772

6873
/**
@@ -201,7 +206,7 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
201206
}
202207
}
203208
} catch (err) {
204-
this.emit('error', err as Error)
209+
this.#emitError(err, 'Leadership check failed.')
205210
}
206211
}, interval)
207212
}
@@ -212,7 +217,7 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
212217
async #tryAcquireLock (ttlMs: number): Promise<boolean> {
213218
if (!this.#storage.acquireLeaderLock) {
214219
// Storage doesn't support leader election
215-
this.emit('error', new Error('Storage does not support leader election'))
220+
this.#emitError(new Error('Storage does not support leader election'))
216221
return false
217222
}
218223

@@ -294,7 +299,7 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
294299
const timer = setTimeout(() => {
295300
this.#processingTimers.delete(id)
296301
this.#checkJob(id).catch(err => {
297-
this.emit('error', err)
302+
this.#emitError(err, 'Failed checking job after visibility timer.')
298303
})
299304
}, this.#visibilityTimeout)
300305

@@ -337,7 +342,7 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
337342
const timer = setTimeout(() => {
338343
this.#processingTimers.delete(id)
339344
this.#checkJob(id).catch(err => {
340-
this.emit('error', err)
345+
this.#emitError(err, 'Failed re-checking job after visibility timeout.')
341346
})
342347
}, remaining)
343348
this.#processingTimers.set(id, timer)
@@ -353,7 +358,7 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
353358
*/
354359
async #recoverStalledJob (id: string, workerId?: string): Promise<void> {
355360
if (!workerId) {
356-
this.emit('error', new Error(`Cannot recover stalled job ${id}: no workerId in state`))
361+
this.#emitError(new Error(`Cannot recover stalled job ${id}: no workerId in state`))
357362
return
358363
}
359364

@@ -435,7 +440,7 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
435440
const timer = setTimeout(() => {
436441
this.#processingTimers.delete(queueMessage.id)
437442
this.#checkJob(queueMessage.id).catch(err => {
438-
this.emit('error', err)
443+
this.#emitError(err, 'Failed checking worker processing job.')
439444
})
440445
}, remaining)
441446
this.#processingTimers.set(queueMessage.id, timer)
@@ -446,4 +451,15 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
446451
}
447452
}
448453
}
454+
455+
#emitError (err: unknown, message = 'Reaper emitted error.'): void {
456+
const error = err instanceof Error ? err : new Error(String(err))
457+
458+
if (this.listenerCount('error') > 0) {
459+
this.emit('error', error)
460+
return
461+
}
462+
463+
this.#logger.error({ err: ensureLoggableError(error) }, message)
464+
}
449465
}

test/queue.test.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,47 @@ describe('Queue', () => {
542542

543543
await localQueue.stop()
544544
})
545+
546+
it('should log errors when no queue error listeners are registered', async t => {
547+
const { promise, resolve } = Promise.withResolvers<void>()
548+
549+
const localStorage = new MemoryStorage()
550+
const logs: string[] = []
551+
const logger: Logger = {
552+
fatal: () => {},
553+
error: () => {
554+
resolve()
555+
logs.push('error')
556+
},
557+
warn: () => {},
558+
info: () => {},
559+
debug: () => {},
560+
trace: () => {},
561+
child () {
562+
return this
563+
}
564+
} as unknown as Logger
565+
566+
const localQueue = new Queue<{ value: number }, { result: number }>({
567+
storage: localStorage,
568+
visibilityTimeout: 5000,
569+
logger,
570+
afterExecution: () => {
571+
throw new Error('hook-error')
572+
}
573+
})
574+
575+
localQueue.execute(async (job: Job<{ value: number }>) => {
576+
return { result: job.payload.value * 2 }
577+
})
578+
579+
await localQueue.start()
580+
t.after(() => localQueue.stop())
581+
await localQueue.enqueue('job-1', { value: 21 })
582+
await promise
583+
584+
assert.ok(logs.includes('error'))
585+
})
545586
})
546587

547588
describe('updateResultTTL', () => {

test/reaper.test.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { describe, it, beforeEach, afterEach } from 'node:test'
22
import assert from 'node:assert'
33
import { setTimeout as sleep } from 'node:timers/promises'
4+
import type { Logger } from 'pino'
45
import { Queue, Reaper, MemoryStorage, type Job } from '../src/index.ts'
56
import { once, waitForEvents } from './helpers/events.ts'
67

@@ -51,6 +52,36 @@ describe('Reaper', () => {
5152
await reaper.stop()
5253
await reaper.stop()
5354
})
55+
56+
it('should log errors when no reaper error listeners are registered', async () => {
57+
const logs: string[] = []
58+
const logger: Logger = {
59+
fatal: () => {},
60+
error: () => {
61+
logs.push('error')
62+
},
63+
warn: () => {},
64+
info: () => {},
65+
debug: () => {},
66+
trace: () => {},
67+
child () {
68+
return this
69+
}
70+
} as unknown as Logger
71+
72+
const localReaper = new Reaper<{ value: number }>({
73+
storage,
74+
visibilityTimeout: 100,
75+
leaderElection: { enabled: true },
76+
logger
77+
})
78+
79+
await storage.connect()
80+
await localReaper.start()
81+
await localReaper.stop()
82+
83+
assert.ok(logs.includes('error'))
84+
})
5485
})
5586

5687
describe('stalled job detection', () => {

0 commit comments

Comments
 (0)