-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathAliEcsSynchronizer.js
More file actions
120 lines (110 loc) · 4.32 KB
/
AliEcsSynchronizer.js
File metadata and controls
120 lines (110 loc) · 4.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/
import { AliEcsEventMessagesConsumer, LogManager } from '@aliceo2/web-ui';
import { EmitterKeys } from './../../../common/library/enums/emitterKeys.enum.js';
import { ServiceStatus } from '../../../common/library/enums/Status/serviceStatus.enum.js';
// Facility label attached to this module's log entries: "<npm log label, or 'qcg' when unset>/ecs-synchronizer"
const LOG_FACILITY = [process.env.npm_config_log_label ?? 'qcg', 'ecs-synchronizer'].join('/');

// Topics handed to the run consumer created by AliEcsSynchronizer
const RUN_TOPICS = ['aliecs.run'];
/**
 * @typedef {object} RunEvent
 * @property {number} runNumber - The run number associated with the event.
 * @property {Transition} transition - The type of transition (e.g., START_ACTIVITY, END_ACTIVITY).
 * @property {TransitionStatus} transitionStatus - The status of the transition (e.g., DONE_OK, DONE_ERROR).
 */
/**
 * Service for processing events sent via Kafka from AliECS with proto objects.
 *
 * It owns a single consumer subscribed to the run topics and re-emits every valid run event
 * on the provided event emitter under `EmitterKeys.RUN_TRACK`.
 */
export class AliEcsSynchronizer {
  /**
   * Constructor
   * @param {import('kafkajs').Kafka} kafkaClient - configured kafka client
   * @param {KafkaConfiguration.consumerGroups} consumerGroups - consumer groups to be used for various topics
   * @param {EventEmitter} eventEmitter - event emitter to be used to emit events when new data is available
   * @param {class} ConsumerClass - class to be used for creating the consumer, defaults to AliEcsEventMessagesConsumer
   */
  constructor(kafkaClient, consumerGroups, eventEmitter, ConsumerClass = AliEcsEventMessagesConsumer) {
    this._logger = LogManager.getLogger(LOG_FACILITY);
    this._eventEmitter = eventEmitter;

    this._ecsRunConsumer = new ConsumerClass(
      kafkaClient,
      consumerGroups.QCG_RUN,
      RUN_TOPICS,
    );
    // Bind so that the handler keeps access to the logger and emitter when invoked by the consumer
    this._ecsRunConsumer.onMessageReceived(this._onRunMessage.bind(this));

    this._status = ServiceStatus.NOT_ASKED;
    this._extraInfo = {};
  }

  /**
   * Start the synchronization process and listen to events from various topics via their consumers.
   *
   * Never rejects: a failure to start the consumer is logged and reflected in `status`/`extraInfo`.
   * @returns {Promise<void>}
   */
  async start() {
    this._logger.infoMessage(`Starting to consume AliECS messages for topics: ${RUN_TOPICS.join(', ')}`);

    // Until the consumer confirms it started, report the service as not yet available
    this._status = ServiceStatus.ERROR;
    this._extraInfo = {
      // KafkaConsumer is currently not supporting "active" status checking [OGUI-1872]
      message: 'Kafka is configured but the service has not started yet',
    };

    try {
      await this._ecsRunConsumer.start();
      this._status = ServiceStatus.SUCCESS;
      // Drop the "not started yet" placeholder so that a SUCCESS status is not paired with a stale message
      this._extraInfo = {};
    } catch (error) {
      this._logger.errorMessage(`Error when starting ECS run consumer: ${error.message}\n${error.stack}`);
      this._status = ServiceStatus.ERROR;
      this._extraInfo = {
        message: error.message,
      };
    }
  }

  /**
   * Callback for when a message is received on the run topic.
   *
   * Messages missing `runEvent`, `runEvent.runNumber`, `runEvent.transition` or `timestamp` are logged
   * as warnings and dropped; valid ones are emitted as `{ runNumber, transitionStatus, transition, timestamp }`.
   * @param {events.proto.Event} eventMessage - message received on run topic
   * @returns {Promise<void>} resolves once the message has been handled
   */
  async _onRunMessage(eventMessage) {
    const { runEvent, timestamp } = eventMessage ?? {};

    if (!runEvent) {
      this._logger.warnMessage('Received run message on run topic without runEvent field');
      return;
    }
    if (!runEvent.runNumber) {
      this._logger.warnMessage('Received run message on run topic without runEvent.runNumber field');
      return;
    }
    if (!runEvent.transition) {
      this._logger.warnMessage('Received run message on run topic without runEvent.transition field');
      return;
    }
    if (timestamp === undefined || timestamp === null) {
      // Without this guard `timestamp.toNumber()` would throw and reject the callback's promise
      this._logger.warnMessage('Received run message on run topic without timestamp field');
      return;
    }

    const { runNumber, transition, transitionStatus } = runEvent;
    this._eventEmitter.emit(EmitterKeys.RUN_TRACK, {
      runNumber,
      transitionStatus,
      transition,
      // timestamp is expected to expose toNumber() (presumably a protobuf Long - confirm against the consumer)
      timestamp: timestamp.toNumber(),
    });
  }

  /**
   * Returns the current kafka service status
   * @returns {ServiceStatus} - The kafka service status
   */
  get status() {
    return this._status;
  }

  /**
   * Returns extra information about the current kafka service
   * @returns {object} - The extra information of the kafka service
   */
  get extraInfo() {
    return this._extraInfo;
  }
}