-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathDomainEventEmitter.ts
More file actions
234 lines (209 loc) · 8.03 KB
/
DomainEventEmitter.ts
File metadata and controls
234 lines (209 loc) · 8.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import {
type ErrorReporter,
InternalError,
type TransactionObservabilityManager,
resolveGlobalErrorLogObject,
} from '@lokalise/node-core'
import type { MetadataFiller } from '../messages/MetadataFiller'
import type { HandlerSpy, HandlerSpyParams, PublicHandlerSpy } from '../queues/HandlerSpy'
import { resolveHandlerSpy } from '../queues/HandlerSpy'
import { randomUUID } from 'node:crypto'
import type { ConsumerMessageMetadataType } from '@message-queue-toolkit/schemas'
import type { Logger } from '../types/MessageQueueTypes'
import type { EventRegistry } from './EventRegistry'
import type {
AnyEventHandler,
CommonEventDefinition,
CommonEventDefinitionConsumerSchemaType,
CommonEventDefinitionPublisherSchemaType,
EventHandler,
EventTypeNames,
SingleEventHandler,
} from './eventTypes'
/**
 * Collaborators injected into {@link DomainEventEmitter} through its constructor.
 */
export type DomainEventEmitterDependencies<SupportedEvents extends CommonEventDefinition[]> = {
/** Registry used to check that an event type is supported and to look up its definition for validation. */
eventRegistry: EventRegistry<SupportedEvents>
/** Produces the timestamp, id and metadata for emitted events that do not already carry them. */
metadataFiller: MetadataFiller
/** Receives an error entry when a background handler fails. */
logger: Logger
/** Optional reporter that is additionally notified about background handler failures. */
errorReporter?: ErrorReporter
/** Optional manager used to start and stop a transaction around every handler execution. */
transactionObservabilityManager?: TransactionObservabilityManager
}
/**
 * Handlers registered for a single event type, split by execution mode.
 */
type Handlers<T> = {
/** Handlers that are started together and are not awaited by `emit`. */
background: T[]
/** Handlers that `emit` awaits one after another, in registration order. */
foreground: T[]
}
/**
 * In-process emitter for domain events.
 *
 * Handlers are registered per event type and are either:
 * - foreground: awaited sequentially by `emit`; an error aborts `emit` and is rethrown to the caller;
 * - background: started concurrently and not awaited by `emit`; errors are logged and reported
 *   instead of being propagated. Pending background work can be awaited through `dispose`.
 */
export class DomainEventEmitter<SupportedEvents extends CommonEventDefinition[]> {
  private readonly eventRegistry: EventRegistry<SupportedEvents>
  private readonly metadataFiller: MetadataFiller
  private readonly logger: Logger
  private readonly errorReporter?: ErrorReporter
  private readonly transactionObservabilityManager?: TransactionObservabilityManager
  private readonly _handlerSpy?: HandlerSpy<
    CommonEventDefinitionConsumerSchemaType<SupportedEvents[number]>
  >

  /** Registered handlers keyed by event type name. */
  private readonly eventHandlerMap: Map<
    string,
    Handlers<EventHandler<CommonEventDefinitionPublisherSchemaType<SupportedEvents[number]>>>
  >
  /** Background executions that have not finished yet, keyed by event id. */
  private readonly inProgressBackgroundHandlerByEventId: Map<string, Promise<void>>

  /**
   * @param deps - collaborators used by the emitter
   * @param options - `handlerSpy` is forwarded to `resolveHandlerSpy` to optionally create a spy
   */
  constructor(
    deps: DomainEventEmitterDependencies<SupportedEvents>,
    options: {
      handlerSpy?: HandlerSpy<object> | HandlerSpyParams | boolean
    } = {},
  ) {
    this.eventRegistry = deps.eventRegistry
    this.metadataFiller = deps.metadataFiller
    this.logger = deps.logger
    this.errorReporter = deps.errorReporter
    this.transactionObservabilityManager = deps.transactionObservabilityManager

    this._handlerSpy =
      resolveHandlerSpy<CommonEventDefinitionConsumerSchemaType<SupportedEvents[number]>>(options)

    this.eventHandlerMap = new Map()
    this.inProgressBackgroundHandlerByEventId = new Map()
  }

  /**
   * Spy that records events once their background handlers have completed.
   *
   * @throws Error when no spy was created for this emitter
   */
  get handlerSpy(): PublicHandlerSpy<
    CommonEventDefinitionPublisherSchemaType<SupportedEvents[number]>
  > {
    if (!this._handlerSpy) {
      throw new Error(
        'HandlerSpy was not instantiated, please pass `handlerSpy` parameter during queue service creation.',
      )
    }

    return this._handlerSpy
  }

  /**
   * Waits for all in-flight background handlers, then drops every registered handler
   * and clears the handler spy (if any).
   */
  public async dispose(): Promise<void> {
    // Background executions never reject (see runBackgroundHandlers), so this cannot
    // abort before the cleanup below runs.
    await Promise.all(this.inProgressBackgroundHandlerByEventId.values())
    this.inProgressBackgroundHandlerByEventId.clear()
    this.eventHandlerMap.clear()
    this._handlerSpy?.clear()
  }

  /**
   * Validates and emits an event.
   *
   * Missing `timestamp`, `id`, `metadata` and `metadata.correlationId` are filled in using the
   * metadata filler. Note that they are written onto the passed `data` object itself.
   *
   * @param supportedEvent - definition of the event being emitted
   * @param data - event content without the `type` field
   * @param precedingMessageMetadata - metadata of the message that led to this event, if any;
   *   passed to the metadata filler when `data.metadata` has to be produced
   * @returns the event as returned by the publisher schema of its definition
   * @throws InternalError with code `UNKNOWN_EVENT` when the event type is not in the registry
   * @throws whatever the schema validation or a foreground handler throws
   */
  public async emit<SupportedEvent extends SupportedEvents[number]>(
    supportedEvent: SupportedEvent,
    data: Omit<CommonEventDefinitionPublisherSchemaType<SupportedEvent>, 'type'>,
    precedingMessageMetadata?: Partial<ConsumerMessageMetadataType>,
  ): Promise<Omit<CommonEventDefinitionConsumerSchemaType<SupportedEvent>, 'type'>> {
    const eventTypeName = supportedEvent.publisherSchema.shape.type.value

    if (!this.eventRegistry.isSupportedEvent(eventTypeName)) {
      throw new InternalError({
        errorCode: 'UNKNOWN_EVENT',
        message: `Unknown event ${eventTypeName}`,
      })
    }

    if (!data.timestamp) data.timestamp = this.metadataFiller.produceTimestamp()
    if (!data.id) data.id = this.metadataFiller.produceId()
    if (!data.metadata) {
      data.metadata = this.metadataFiller.produceMetadata(
        // @ts-ignore
        data,
        supportedEvent,
        precedingMessageMetadata ?? {},
      )
    }
    if (!data.metadata.correlationId) data.metadata.correlationId = this.metadataFiller.produceId()

    const validatedEvent = this.eventRegistry
      .getEventDefinitionByTypeName(eventTypeName)
      .publisherSchema.parse({ type: eventTypeName, ...data })

    // @ts-ignore
    await this.handleEvent(validatedEvent)

    // @ts-ignore
    return validatedEvent
  }

  /**
   * Register handler for a specific event
   *
   * @param eventTypeName - type name of the event to listen to
   * @param handler - handler to invoke for matching events
   * @param isBackgroundHandler - when true the handler is not awaited by `emit`
   */
  public on<EventTypeName extends EventTypeNames<SupportedEvents[number]>>(
    eventTypeName: EventTypeName,
    handler: SingleEventHandler<SupportedEvents, EventTypeName>,
    isBackgroundHandler = false,
  ) {
    let handlers = this.eventHandlerMap.get(eventTypeName)
    if (!handlers) {
      handlers = { foreground: [], background: [] }
      this.eventHandlerMap.set(eventTypeName, handlers)
    }

    if (isBackgroundHandler) handlers.background.push(handler)
    else handlers.foreground.push(handler)
  }

  /**
   * Register handler for multiple events
   *
   * @param eventTypeNames - type names of the events to listen to
   * @param handler - handler to invoke for matching events
   * @param isBackgroundHandler - when true the handler is not awaited by `emit`
   */
  public onMany<EventTypeName extends EventTypeNames<SupportedEvents[number]>>(
    eventTypeNames: EventTypeName[],
    handler: SingleEventHandler<SupportedEvents, EventTypeName>,
    isBackgroundHandler = false,
  ) {
    for (const eventTypeName of eventTypeNames) {
      this.on(eventTypeName, handler, isBackgroundHandler)
    }
  }

  /**
   * Register handler for all events supported by the emitter
   *
   * @param handler - handler to invoke for every supported event
   * @param isBackgroundHandler - when true the handler is not awaited by `emit`
   */
  public onAny(handler: AnyEventHandler<SupportedEvents>, isBackgroundHandler = false) {
    this.onMany(Array.from(this.eventRegistry.supportedEventTypes), handler, isBackgroundHandler)
  }

  /**
   * Runs the handlers registered for the type of the given event.
   *
   * Foreground handlers are awaited one by one. Background handlers are started from the
   * `finally` block, so they are started even when a foreground handler throws (the foreground
   * error still propagates). The background execution is tracked by event id until it finishes.
   */
  private async handleEvent<SupportedEvent extends SupportedEvents[number]>(
    event: CommonEventDefinitionConsumerSchemaType<SupportedEvent>,
  ): Promise<void> {
    const eventHandlers = this.eventHandlerMap.get(event.type)
    if (!eventHandlers) return

    try {
      for (const handler of eventHandlers.foreground) {
        await this.executeEventHandler(event, handler, false)
      }
    } finally {
      // Not awaited on purpose: background work must not delay the emitter's caller.
      const bgPromise = this.runBackgroundHandlers(event, eventHandlers.background)
      this.inProgressBackgroundHandlerByEventId.set(event.id, bgPromise)
    }
  }

  /**
   * Executes all background handlers for an event concurrently.
   *
   * The returned promise never rejects: the in-progress entry of the event is always removed,
   * and any unexpected failure is logged and reported. This prevents unhandled rejections and
   * stale entries that would make `dispose` fail.
   * The handler spy (if present) records the event as consumed only when every handler
   * execution settled without rejecting.
   */
  private async runBackgroundHandlers<SupportedEvent extends SupportedEvents[number]>(
    event: CommonEventDefinitionConsumerSchemaType<SupportedEvent>,
    handlers: EventHandler<CommonEventDefinitionPublisherSchemaType<SupportedEvents[number]>>[],
  ): Promise<void> {
    const context = {
      eventId: event.id,
      eventType: event.type,
      'x-request-id': event.metadata?.correlationId,
    }

    try {
      await Promise.all(handlers.map((handler) => this.executeEventHandler(event, handler, true)))
    } catch (error) {
      // Handler errors are already handled inside executeEventHandler; getting here means the
      // error handling or the observability hooks themselves failed.
      this.reportBackgroundFailure(context, error)
      return
    } finally {
      // Runs after the `await` above, i.e. after handleEvent has stored the promise in the map.
      this.inProgressBackgroundHandlerByEventId.delete(event.id)
    }

    if (!this._handlerSpy) return
    try {
      this._handlerSpy.addProcessedMessage(
        {
          // @ts-ignore
          message: event,
          processingResult: 'consumed',
        },
        event.id,
      )
    } catch (error) {
      this.reportBackgroundFailure(context, error)
    }
  }

  /**
   * Best-effort logging and reporting of a failure in the background execution machinery.
   * Never throws.
   */
  private reportBackgroundFailure(context: Record<string, unknown>, error: unknown): void {
    try {
      this.logger.error({
        ...resolveGlobalErrorLogObject(error),
        ...context,
      })
      this.errorReporter?.report({
        error: error instanceof Error ? error : new Error(String(error)),
        context,
      })
    } catch {
      // Logging/reporting itself failed; there is no remaining channel to surface the error,
      // and rethrowing would turn it into an unhandled rejection.
    }
  }

  /**
   * Executes a single handler wrapped in an observability transaction.
   *
   * Foreground handler errors are rethrown. Background handler errors are logged and reported,
   * and the returned promise resolves.
   */
  private async executeEventHandler<SupportedEvent extends SupportedEvents[number]>(
    event: CommonEventDefinitionConsumerSchemaType<SupportedEvent>,
    handler: EventHandler<CommonEventDefinitionConsumerSchemaType<SupportedEvent>>,
    isBackgroundHandler: boolean,
  ) {
    const transactionId = randomUUID()
    let isSuccessful = false
    try {
      this.transactionObservabilityManager?.startWithGroup(
        this.buildTransactionKey(event, handler, isBackgroundHandler),
        transactionId,
        event.type,
      )
      await handler.handleEvent(event)
      isSuccessful = true
    } catch (error) {
      if (!isBackgroundHandler) throw error

      const context = {
        event: JSON.stringify(event),
        eventHandlerId: handler.eventHandlerId,
        'x-request-id': event.metadata?.correlationId,
      }
      this.logger.error({
        ...resolveGlobalErrorLogObject(error),
        ...context,
      })
      // biome-ignore lint/suspicious/noExplicitAny: TODO: improve error type
      this.errorReporter?.report({ error: error as any, context })
    } finally {
      this.transactionObservabilityManager?.stop(transactionId, isSuccessful)
    }
  }

  /**
   * Builds the transaction name in the form `<bg|fg>_event_listener:<event type>:<handler id>`.
   */
  private buildTransactionKey<SupportedEvent extends SupportedEvents[number]>(
    event: CommonEventDefinitionPublisherSchemaType<SupportedEvent>,
    handler: EventHandler<CommonEventDefinitionPublisherSchemaType<SupportedEvent>>,
    isBackgroundHandler: boolean,
  ): string {
    return `${isBackgroundHandler ? 'bg' : 'fg'}_event_listener:${event.type}:${handler.eventHandlerId}`
  }
}