-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathEnvironment.service.js
More file actions
383 lines (362 loc) · 16.1 KB
/
Environment.service.js
File metadata and controls
383 lines (362 loc) · 16.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
/**
* @license
* Copyright 2019-2020 CERN and copyright holders of ALICE O2.
* See http://alice-o2.web.cern.ch/copyright for details of the copyright holders.
* All rights not expressly granted are reserved.
*
* This software is distributed under the terms of the GNU General Public
* License v3 (GPL Version 3), copied verbatim in the file "COPYING".
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/
const {LogManager,grpcErrorToNativeError, NotFoundError} = require('@aliceo2/web-ui');
const { CacheKeys } = require('./../common/cacheKeys.enum.js');
const { BroadcastKeys: { ENVIRONMENTS_OVERVIEW } } = require('./../common/broadcastKeys.enum');
const EnvironmentInfoAdapter = require('./../adapters/EnvironmentInfoAdapter.js');
const {EnvironmentTransitionResultAdapter} = require('./../adapters/EnvironmentTransitionResultAdapter.js');
/**
 * EnvironmentService class to be used to retrieve data from AliECS Core via the gRPC Control client.
 * It also keeps the environment cache and the calibration runs requests cache up to date and broadcasts
 * the changes through the broadcast service.
 */
class EnvironmentService {
  /**
   * Constructor for inserting dependencies needed to retrieve environment data
   * @param {GrpcServiceClient} coreGrpc - client used to send requests to AliECS Core
   * @param {ApricotProxy} apricotGrpc - source of the `detectors` and `hostsByDetector` information
   * @param {CacheService} cacheService - to use for updating information on environments
   * @param {BroadcastService} broadcastService - to use for broadcasting information
   * @param {EnvironmentCacheService} environmentCacheService - to use for caching environments
   */
  constructor(coreGrpc, apricotGrpc, cacheService, broadcastService, environmentCacheService) {
    /**
     * @type {GrpcServiceClient}
     */
    this._coreGrpc = coreGrpc;

    /**
     * @type {ApricotProxy}
     */
    this._apricotGrpc = apricotGrpc;

    /**
     * @type {CacheService}
     */
    this._cacheService = cacheService;

    /**
     * @type {BroadcastService}
     */
    this._broadcastService = broadcastService;

    /**
     * @type {EnvironmentCacheService}
     */
    this._environmentCacheService = environmentCacheService;

    this._logger = LogManager.getLogger(`${process.env.npm_config_log_label ?? 'cog'}/env-service`);
  }

  /**
   * Method to retrieve all environments from AliECS Core via the gRPC Client and update the Cache
   * @param {boolean} showTaskInfos - if true, will retrieve task information for each environment
   * @param {boolean} shouldUpdateCache - if true, will update the cache with the retrieved environments
   * @return {Promise.<EnvironmentInfo[]|undefined, Error>} - list of environments considered active; resolves to
   * `undefined` if processing the retrieved list fails (the error is logged, not rethrown)
   * @throws {Error} - native error if the initial `GetEnvironments` gRPC request fails
   */
  async getEnvironments(showTaskInfos = false, shouldUpdateCache = false) {
    let environments = [];
    try {
      ({ environments } = await this._coreGrpc.GetEnvironments({ showTaskInfos }));
    } catch (error) {
      throw grpcErrorToNativeError(error);
    }

    try {
      const activeEnvironmentList = [];
      for (const { id } of environments) {
        let environment;
        try {
          // Retrieving environments one by one is needed so that ODC devices tasks info is part of the payload
          environment = await this.getEnvironment(id, '', false);
        } catch (error) {
          // A single failing environment must not prevent the others from being returned
          this._logger.errorMessage(`Failed to retrieve environment ${id}: ${error}`);
        }
        if (environment) {
          if (shouldUpdateCache) {
            this._environmentCacheService.addOrUpdateEnvironment(environment, false);
          }
          activeEnvironmentList.push(environment);
        }
      }

      /*
       * Remove environments from cache that are not in the retrieved list and that are not in deploying state.
       * Environments that are `isDeploying` should not be removed. If deployment failed, ECS will delete it
       * but we need to keep it until user acknowledges the failure and removes it from the cache manually.
       */
      const retrievedEnvironmentIds = new Set(activeEnvironmentList.map((environment) => environment.id));
      // Snapshot of the keys, as entries may be removed from the cache while looping
      const cachedEnvironmentIds = [...this._environmentCacheService.environments.keys()];
      for (const cachedEnvironmentId of cachedEnvironmentIds) {
        if (retrievedEnvironmentIds.has(cachedEnvironmentId)) {
          continue;
        }
        const cachedEnvironment = this._environmentCacheService.environments.get(cachedEnvironmentId);
        if (cachedEnvironment.isDeploying || cachedEnvironment.deploymentError) {
          // If the environment is deploying or has a deployment error, we still consider it active
          // and we do not remove it from the cache
          activeEnvironmentList.push(cachedEnvironment);
        } else {
          this._environmentCacheService.removeEnvironmentById(cachedEnvironmentId);
        }
      }

      this._broadcastService.broadcast(ENVIRONMENTS_OVERVIEW, [...this._environmentCacheService.environments.values()]);
      return activeEnvironmentList;
    } catch (error) {
      this._logger.errorMessage(error);
    }
  }

  /**
   * Given an environment ID, use the gRPC client to retrieve needed information
   * Parses the environment and prepares the information for GUI purposes
   * @param {string} id - environment id as defined by AliECS Core
   * @param {string} taskSource - Source of where to request tasks from: FLP, EPN, QC, TRG
   * @param {boolean} [retrieveEvents = true] - if true and the environment is cached, the cached events and
   * deployment flags (`isDeploying`, `deploymentError`, `firstTaskInError`) are copied onto the result
   * @return {EnvironmentInfo}
   * @throws {NotFoundError} - if the response of AliECS Core contains no environment
   * @throws {Error} - native error if the gRPC request fails
   */
  async getEnvironment(id, taskSource, retrieveEvents = true) {
    let environment = undefined;
    try {
      const environmentResponse = await this._coreGrpc.GetEnvironment({ id });
      environment = environmentResponse.environment ?? undefined;
    } catch (error) {
      throw grpcErrorToNativeError(error);
    }
    if (!environment) {
      throw new NotFoundError(`Environment (id: ${id}) not found`);
    }

    const detectorsAll = this._apricotGrpc.detectors ?? [];
    const hostsByDetector = this._apricotGrpc.hostsByDetector ?? {};
    const environmentInfo = EnvironmentInfoAdapter.toEntity(
      environment, taskSource, detectorsAll, hostsByDetector
    );

    if (retrieveEvents && this._environmentCacheService.environments.has(id)) {
      const cachedEnvironment = this._environmentCacheService.environments.get(id);
      // Copy the events so that the returned entity does not share the array with the cached one
      environmentInfo.events = [...cachedEnvironment.events];
      environmentInfo.isDeploying = cachedEnvironment.isDeploying;
      environmentInfo.deploymentError = cachedEnvironment.deploymentError;
      environmentInfo.firstTaskInError = cachedEnvironment.firstTaskInError;
    }
    return environmentInfo;
  }

  /**
   * Given an environment ID and a transition type, use the gRPC client to perform the transition
   * @param {String} id - environment id as defined by AliECS Core
   * @param {EnvironmentTransitionType} transitionType - allowed transitions for an environment
   * @param {User} user - user that requested the transition
   * @return {EnvironmentTransitionResult} - result of the environment transition
   * @throws {Error} - native error if the transition request fails
   */
  async transitionEnvironment(id, transitionType, user) {
    try {
      const transitionedEnvironment = await this._coreGrpc.ControlEnvironment({
        id, type: transitionType, requestUser: user.toEcsFormat()
      });
      return EnvironmentTransitionResultAdapter.toEntity(transitionedEnvironment);
    } catch (error) {
      throw grpcErrorToNativeError(error);
    }
  }

  /**
   * Given an environment ID and optional parameters, use the gRPC client to send a request to destroy an environment
   * @param {String} id - environment id as defined by AliECS Core
   * @param {{keepTasks: Boolean, allowInRunningState: Boolean, force: Boolean}} - options for destroying the environment
   * @param {User} user - user that requested the environment to be destroyed
   * @return {Promise.<{id: String}, Error>} - object with the id of the destroyed environment on success
   * @throws {Error} - native error if the destroy request fails
   */
  async destroyEnvironment(id, {keepTasks = false, allowInRunningState = false, force = false} = {}, user) {
    try {
      await this._coreGrpc.DestroyEnvironment({
        id, keepTasks, allowInRunningState, force, requestUser: user.toEcsFormat()
      });
      return {id};
    } catch (grpcError) {
      throw grpcErrorToNativeError(grpcError);
    }
  }

  /**
   * Method to create a NewEnvironmentAsync request using the gRPC client.
   * Service is considered low-level. It is assumed that the caller has already checked the validity of the parameters.
   * @param {NewEnvironmentRequest - o2control.proto} request - partial request object with information needed to create the environment
   * @param {string} request.workflowTemplate - name in format `repository/revision/template`
   * @param {Object<string, string>} request.userVars - KV string pairs to define environment configuration
   * @param {User} request.user - user that requested the environment creation
   * @param {boolean} [request.shouldAutoTransition = false] - forwarded to AliECS Core as `autoTransition`
   * @param {string[]} [request.detectors] - detectors to display if AliECS Core does not return any
   * @returns {Promise.<{EnvironmentInfo}, Error>} - if operation was a success ECS will return a partialEnvironmentInfo object
   * @throws {Error} - if the operation failed
   */
  async newEnvironmentAsync({ workflowTemplate, userVars, user, shouldAutoTransition = false, detectors }) {
    let environment = undefined;
    try {
      ({ environment } = await this._coreGrpc.NewEnvironmentAsync({
        workflowTemplate,
        vars: userVars,
        autoTransition: shouldAutoTransition,
        requestUser: user.toEcsFormat()
      }));
    } catch (grpcError) {
      throw grpcErrorToNativeError(grpcError);
    }
    const detectorsAll = this._apricotGrpc.detectors ?? [];
    const hostsByDetector = this._apricotGrpc.hostsByDetector ?? {};

    /*
     * Transition is not yet started as per ECS, but we flag the environment as deploying to ensure that the UI
     * is updated accordingly. The state will be updated once the environment is created and the transition
     * is finished.
     */
    environment.isDeploying = true;

    /*
     * As per ticket OCTRL-1045, ECS is not able to respond with the static information yet. Thus, the GUI
     * should keep track of the user that requested the environment and the detectors included in the deployment
     * to be able to show this information in the UI.
     */
    if (!environment.userVars) {
      environment.userVars = {};
    }
    if (!environment.userVars.last_request_user) {
      environment.userVars.last_request_user = {
        externalId: user.personid,
        name: user.username,
      };
    }
    if (!environment.rootRole) {
      environment.rootRole = workflowTemplate;
    }
    if (!environment.includedDetectors || environment.includedDetectors.length === 0) {
      environment.includedDetectors = detectors ?? [];
    }

    const environmentInfo = EnvironmentInfoAdapter.toEntity(environment, '', detectorsAll, hostsByDetector);
    this._environmentCacheService.addOrUpdateEnvironment(environmentInfo, true);
    return environmentInfo;
  }

  /**
   * Given the workflowTemplate and variables configuration, it will generate a unique string and send all to AliECS to create a
   * new auto transitioning environment
   * @param {String} workflowTemplate - name in format `repository/revision/template`
   * @param {Object<String, String>} vars - KV string pairs to define environment configuration
   * @param {String} detector - on which the environment is deployed
   * @param {String} runType - for which the environment is deployed
   * @param {User} user - user that requested the environment
   * @return {AutoEnvironmentDeployment} - if environment request was successfully sent
   */
  async newAutoEnvironment(workflowTemplate, vars, detector, runType, user) {
    // Six digit identifier shared by the subscription and the environment request
    const channelIdString = (Math.floor(Math.random() * (999999 - 100000) + 100000)).toString();
    const autoEnvironment = {
      channelIdString,
      inProgress: true,
      detector,
      runType,
      events: [
        {
          type: 'ENVIRONMENT',
          payload: {
            id: '-',
            message: 'request was sent to AliECS',
            at: Date.now(),
          }
        }
      ],
    };

    let calibrationRunsRequests = this._cacheService.getByKey(CacheKeys.CALIBRATION_RUNS_REQUESTS);
    if (!calibrationRunsRequests) {
      calibrationRunsRequests = {};
    }
    if (!calibrationRunsRequests[detector]) {
      calibrationRunsRequests[detector] = {};
    }
    // A new request always supersedes the one previously stored for this detector and run type, so that the
    // cached entry is the very object returned to the caller and updated by the stream handlers below
    calibrationRunsRequests[detector][runType] = autoEnvironment;

    this._cacheService.updateByKeyAndBroadcast(CacheKeys.CALIBRATION_RUNS_REQUESTS, calibrationRunsRequests);
    this._broadcastService.broadcast(CacheKeys.CALIBRATION_RUNS_REQUESTS, calibrationRunsRequests[detector][runType]);

    // Subscribe before sending the request so that no event of the new environment is missed
    const subscribeChannel = this._coreGrpc.client.Subscribe({id: channelIdString});
    subscribeChannel.on('data', (data) => this._onData(data, detector, runType));
    subscribeChannel.on('error', (error) => this._onError(error, detector, runType));
    subscribeChannel.on('end', () => this._onEnd(detector, runType));

    // The request is intentionally not awaited: progress is reported through the subscription above.
    // A rejection is still handled, as it would otherwise surface as an unhandled promise rejection.
    Promise.resolve(this._coreGrpc.NewAutoEnvironment({
      vars,
      workflowTemplate,
      id: channelIdString,
      requestUser: user.toEcsFormat()
    })).catch((error) => {
      this._logger.errorMessage(`Failed to request auto environment for ${detector}/${runType}: ${error}`);
      this._onError(error, detector, runType);
    });
    return autoEnvironment;
  }

  /**
   * Method to parse incoming messages from stream channel
   * Only task events in error/failed state and environment events are stored; anything else is ignored
   * @param {Event} event - AliECS Event (proto)
   * @param {String} detector - detector name for which the event was triggered
   * @param {String} runType - run type for which the event was triggered
   * @return {void}
   */
  _onData(event, detector, runType) {
    const events = [];
    const {taskEvent, environmentEvent, timestamp = Date.now()} = event;
    if (taskEvent && (taskEvent.state === 'ERROR' || taskEvent.status === 'TASK_FAILED')) {
      events.push({
        type: 'TASK',
        payload: {
          ...taskEvent,
          at: Number(timestamp),
          message: 'Please ensure environment is killed before retrying',
        }
      });
    } else if (environmentEvent) {
      events.push({
        type: 'ENVIRONMENT',
        payload: {
          ...environmentEvent,
          at: Number(timestamp),
        }
      });
    }
    if (events.length > 0) {
      this._updateCalibrationRunRequest(detector, runType, (request) => {
        request.events.push(...events);
      });
    }
  }

  /**
   * Method to be used in case of AliECS environment creation request error
   * @param {Error} error - error encountered during the creation of environment
   * @param {String} detector - detector name for which the event was triggered
   * @param {String} runType - run type for which the event was triggered
   * @return {void}
   */
  _onError(error, detector, runType) {
    this._updateCalibrationRunRequest(detector, runType, (request) => {
      request.events.push({
        type: 'ERROR',
        payload: {
          error,
          at: Date.now()
        }
      });
      request.events.push({
        type: 'ERROR',
        payload: {
          error: 'Please ensure environment is killed before retrying',
          at: Date.now()
        }
      });
    });
  }

  /**
   * Method to be used when the stream of events ends; marks the request as no longer in progress
   * @param {String} detector - detector name for which the event was triggered
   * @param {String} runType - run type for which the event was triggered
   * @return {void}
   */
  _onEnd(detector, runType) {
    this._updateCalibrationRunRequest(detector, runType, (request) => {
      request.events.push({
        type: 'ENVIRONMENT',
        payload: {
          at: Date.now(),
          message: 'Stream has now ended'
        }
      });
      request.inProgress = false;
    });
  }

  /**
   * Apply a change to the cached calibration run request of the given detector and run type, then store and
   * broadcast the result. If no such request is cached, the problem is logged and nothing is changed: this
   * method runs inside stream event listeners, where a thrown error would not be caught by any caller.
   * @param {String} detector - detector name of the request to update
   * @param {String} runType - run type of the request to update
   * @param {function(AutoEnvironmentDeployment): void} update - mutates the cached request in place
   * @return {void}
   */
  _updateCalibrationRunRequest(detector, runType, update) {
    const calibrationRunsRequests = this._cacheService.getByKey(CacheKeys.CALIBRATION_RUNS_REQUESTS);
    const request = calibrationRunsRequests?.[detector]?.[runType];
    if (!request) {
      this._logger.errorMessage(
        `No calibration run request cached for detector ${detector} and run type ${runType}, event is ignored`
      );
      return;
    }
    update(request);
    this._cacheService.updateByKeyAndBroadcast(CacheKeys.CALIBRATION_RUNS_REQUESTS, calibrationRunsRequests);
    this._broadcastService.broadcast(CacheKeys.CALIBRATION_RUNS_REQUESTS, request);
  }
}
// Named export: consumers obtain the class by destructuring `EnvironmentService` from this module
module.exports = {EnvironmentService};