Skip to content

Commit 755880b

Browse files
authored
[OGUI-1850] Server should check if there are ongoing runs at server start (#3285)
* add fetching of ongoing runs at server start * add tests for fetching ongoing runs at server start * fix: add try catch for bookkeeping service connect
1 parent c5cf947 commit 755880b

6 files changed

Lines changed: 162 additions & 12 deletions

File tree

QualityControl/lib/QCModel.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,12 @@ export const setupQcModel = async (ws, eventEmitter) => {
117117
const intervalsService = new IntervalsService();
118118

119119
const bookkeepingService = new BookkeepingService(config.bookkeeping);
120+
try {
121+
await bookkeepingService.connect();
122+
} catch (error) {
123+
logger.errorMessage(`Failed connecting to Bookkeeping: ${error.message || error}`);
124+
}
125+
120126
const filterService = new FilterService(bookkeepingService, config);
121127
const runModeService = new RunModeService(config.bookkeeping, bookkeepingService, ccdbService, eventEmitter, ws);
122128
const objectController = new ObjectController(qcObjectService, runModeService, qcdbDownloadService);

QualityControl/lib/services/BookkeepingService.js

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ const GET_DATA_PASSES_PATH = '/api/dataPasses';
2525

2626
const LOG_FACILITY = `${process.env.npm_config_log_label ?? 'qcg'}/bkp-service`;
2727

28+
const RECENT_RUN_THRESHOLD_MS = 1 * 24 * 60 * 60 * 1000; // -1 day in milliseconds
29+
2830
/**
2931
* BookkeepingService class to be used to retrieve data from Bookkeeping
3032
*/
@@ -75,12 +77,12 @@ export class BookkeepingService {
7577
*/
7678
async connect() {
7779
if (!this.validateConfig()) {
78-
this._logger.infoMessage(`Bookkeeping service will not be used. Reason: ${this.error}`);
80+
this._logger.warnMessage(`Bookkeeping service will not be used. Reason: ${this.error}`);
7981
return;
8082
}
8183
this.active = await this.simulateConnection();
8284
if (!this.active) {
83-
this._logger.infoMessage(`Bookkeeping service will not be used. Reason: ${this.error}`);
85+
this._logger.warnMessage(`Bookkeeping service will not be used. Reason: ${this.error}`);
8486
}
8587
}
8688

@@ -200,6 +202,44 @@ export class BookkeepingService {
200202
}
201203
}
202204

205+
/**
206+
* Retrieves runs that are currently ongoing (started within the last \@see {RECENT_RUN_THRESHOLD_MS}
207+
* but have not yet ended).
208+
* @returns {Promise<Array<object>|undefined>} A promise that resolves to an array of run objects,
209+
* or undefined if the service is inactive, no data is found, or an error occurs
210+
*/
211+
async retrieveOngoingRuns() {
212+
if (!this.active) {
213+
return;
214+
}
215+
216+
const timestamp = Date.now() - RECENT_RUN_THRESHOLD_MS;
217+
218+
const queryParams = `page[offset]=0&page[limit]=20&filter[o2start][from]=${timestamp}&token=${this._token}`;
219+
220+
try {
221+
const { data } = await httpGetJson(
222+
this._hostname,
223+
this._port,
224+
`${GET_RUN_PATH}?${queryParams}`,
225+
{
226+
protocol: this._protocol,
227+
rejectUnauthorized: false,
228+
},
229+
);
230+
231+
if (data.length === 0) {
232+
return [];
233+
}
234+
235+
return data.filter((run) => !run.timeO2End);
236+
} catch (error) {
237+
const msg = error?.message ?? String(error);
238+
this._logger.errorMessage(msg);
239+
return;
240+
}
241+
}
242+
203243
/**
204244
* Retrieves the information about the detectors from the Bookkeeping service.
205245
* @returns {Promise<object[]>} Array of detector summaries.

QualityControl/lib/services/FilterService.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ export class FilterService {
4949
* @returns {Promise<void>} - resolves when the filter service is initialized
5050
*/
5151
async initFilters() {
52-
await this._bookkeepingService.connect();
5352
await this.getRunTypes();
5453
await this._initializeDetectors();
5554
await this.getDataPasses();

QualityControl/lib/services/RunModeService.js

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ export class RunModeService {
5353

5454
this._logger = LogManager.getLogger(`${process.env.npm_config_log_label ?? 'qcg'}/run-mode-service`);
5555
this._listenToEvents();
56+
this._fetchOnGoingRunsAtStart();
5657
}
5758

5859
/**
@@ -115,6 +116,22 @@ export class RunModeService {
115116
this._eventEmitter.on(EmitterKeys.RUN_TRACK, (runEvent) => this._onRunTrackEvent(runEvent));
116117
}
117118

119+
/**
120+
* Fetches the already ongoing runs from Bookkeeping service, becaue Kafka only sends an event at START of run.
121+
* @returns {Promise<void>}
122+
*/
123+
async _fetchOnGoingRunsAtStart() {
124+
const ongoingRuns = await this._bookkeepingService.retrieveOngoingRuns();
125+
if (!ongoingRuns || ongoingRuns.length === 0) {
126+
this._logger.infoMessage('No ongoing runs detected at server start');
127+
return;
128+
}
129+
130+
const runNumbers = ongoingRuns.map(({ runNumber }) => runNumber);
131+
const tasks = runNumbers.map(async (runNumber) => await this._initializeRunData(runNumber));
132+
await Promise.all(tasks);
133+
}
134+
118135
/**
119136
* Handles run track events emitted by the event emitter.
120137
* Updates the ongoing runs cache based on the transition type.
@@ -125,15 +142,7 @@ export class RunModeService {
125142
*/
126143
async _onRunTrackEvent({ runNumber, transition }) {
127144
if (transition === Transition.START_ACTIVITY) {
128-
let rawPaths = [];
129-
try {
130-
rawPaths = await this._dataService.getObjectsLatestVersionList({
131-
filters: { RunNumber: runNumber },
132-
});
133-
} catch (error) {
134-
this._logger.errorMessage(`Error fetching initial paths for run ${runNumber}: ${error.message || error}`);
135-
}
136-
this._ongoingRuns.set(runNumber, rawPaths);
145+
await this._initializeRunData(runNumber);
137146

138147
const wsMessage = new WebSocketMessage();
139148
wsMessage.command = `${EmitterKeys.RUN_TRACK}:${Transition.START_ACTIVITY}`;
@@ -146,6 +155,23 @@ export class RunModeService {
146155
}
147156
}
148157

158+
/**
159+
* Fetches the latest object versions for each run and populates the local `ongoingRuns` map.
160+
* @param {number} runNumber - The run number associated with the event.
161+
* @returns {Promise<void>}
162+
*/
163+
async _initializeRunData(runNumber) {
164+
let rawPaths = [];
165+
try {
166+
rawPaths = await this._dataService.getObjectsLatestVersionList({
167+
filters: { RunNumber: runNumber },
168+
});
169+
} catch (error) {
170+
this._logger.errorMessage(`Error fetching initial paths for run ${runNumber}: ${error.message || error}`);
171+
}
172+
this._ongoingRuns.set(runNumber, rawPaths);
173+
}
174+
149175
/**
150176
* Returns the last time the ongoing runs cache was refreshed.
151177
* @returns {number} - Timestamp of the last refresh. (ms)

QualityControl/test/lib/services/BookkeepingService.test.js

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,5 +529,68 @@ export const bookkeepingServiceTestSuite = async () => {
529529
strictEqual(result.length, 0);
530530
});
531531
});
532+
suite('`retrieveOngoingRuns()` tests', () => {
533+
let bkpService = null;
534+
const runsPathPattern = new RegExp(`/api/runs\\?.*token=${VALID_CONFIG.bookkeeping.token}`);
535+
536+
beforeEach(() => {
537+
bkpService = new BookkeepingService(VALID_CONFIG.bookkeeping);
538+
bkpService.validateConfig();
539+
bkpService.active = true;
540+
});
541+
542+
afterEach(() => {
543+
nock.cleanAll();
544+
});
545+
546+
test('should return all already ongoing runs', async () => {
547+
const mockResponse = {
548+
data: [
549+
{
550+
runNumber: 1,
551+
timeO2End: undefined,
552+
},
553+
{
554+
runNumber: 2,
555+
timeO2End: 1,
556+
},
557+
{
558+
runNumber: 3,
559+
timeO2End: undefined,
560+
},
561+
],
562+
};
563+
564+
nock(VALID_CONFIG.bookkeeping.url).get(runsPathPattern).reply(200, mockResponse);
565+
const ongoingRuns = await bkpService.retrieveOngoingRuns();
566+
strictEqual(ongoingRuns.length, 2);
567+
deepStrictEqual(ongoingRuns.map((run) => run.runNumber), [1, 3]);
568+
});
569+
570+
test('should return an empty array when data when no runs are retrieved', async () => {
571+
const mockResponse = {
572+
data: [],
573+
};
574+
575+
nock(VALID_CONFIG.bookkeeping.url).get(runsPathPattern).reply(200, mockResponse);
576+
const ongoingRuns = await bkpService.retrieveOngoingRuns();
577+
strictEqual(ongoingRuns.length, 0);
578+
});
579+
580+
test('should return an empty array when all runs have an end time specified', async () => {
581+
const mockResponse = {
582+
data: [
583+
{
584+
runNumber: 99,
585+
timeO2End: 1,
586+
},
587+
],
588+
};
589+
590+
nock(VALID_CONFIG.bookkeeping.url).get(runsPathPattern).reply(200, mockResponse);
591+
const ongoingRuns = await bkpService.retrieveOngoingRuns();
592+
strictEqual(ongoingRuns.length, 0);
593+
});
594+
});
532595
});
533596
};

QualityControl/test/lib/services/RunModeService.test.js

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ export const runModeServiceTestSuite = async () => {
3434
beforeEach(() => {
3535
bookkeepingService = {
3636
retrieveRunInformation: sinon.stub(),
37+
retrieveOngoingRuns: sinon.stub(),
3738
};
3839

3940
dataService = {
@@ -134,6 +135,21 @@ export const runModeServiceTestSuite = async () => {
134135
});
135136
});
136137

138+
suite('`_fetchOnGoingRunsAtStart` tests', () => {
139+
test('should populate ongoing runs on startup', async () => {
140+
const runNumber = 1;
141+
const mockRun = { runNumber };
142+
const mockPaths = [{ path: '/run/path1' }];
143+
144+
bookkeepingService.retrieveOngoingRuns.resolves([mockRun]);
145+
146+
dataService.getObjectsLatestVersionList.resolves(mockPaths);
147+
148+
await runModeService._fetchOnGoingRunsAtStart();
149+
strictEqual(runModeService._ongoingRuns.has(runNumber), true);
150+
});
151+
});
152+
137153
suite('_onRunTrackEvent - test suite', () => {
138154
test('should correctly parse event to RUN_TRACK and update ongoing runs map', async () => {
139155
const runEvent = { runNumber: 1234, transition: 'START_ACTIVITY' };

0 commit comments

Comments
 (0)