Skip to content

Commit 7a3c1a6

Browse files
authored
fix(query-orchestrator): Queue - improve performance for high concurrency (cube-js#9705)
In a production cluster with N nodes, all nodes share the same queue concurrency setting. As a result, every node tries to pick up as many jobs as the concurrency defined for the whole cluster. To minimize the effect of competition between nodes, it's important to reduce the number of processing attempts by the number of active jobs.
1 parent 098d185 commit 7a3c1a6

4 files changed

Lines changed: 63 additions & 25 deletions

File tree

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,31 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
136136
}
137137

138138
public async getActiveAndToProcess(): Promise<GetActiveAndToProcessResponse> {
139+
const active: QueryKeysTuple[] = [];
140+
const toProcess: QueryKeysTuple[] = [];
141+
142+
const rows = await this.driver.query<CubeStoreListResponse>('QUEUE LIST ?', [
143+
this.options.redisQueuePrefix
144+
]);
145+
if (rows.length) {
146+
for (const row of rows) {
147+
if (row.status === 'active') {
148+
active.push([
149+
row.id as QueryKeyHash,
150+
row.queue_id ? parseInt(row.queue_id, 10) : null,
151+
]);
152+
} else {
153+
toProcess.push([
154+
row.id as QueryKeyHash,
155+
row.queue_id ? parseInt(row.queue_id, 10) : null,
156+
]);
157+
}
158+
}
159+
}
160+
139161
return [
140-
// We don't return active queries, because it's useless
141-
// There is only one place where it's used, and it's QueryQueue.reconcileQueueImpl
142-
// Cube Store provides strict guarantees that queue item cannot be active & pending in the same time
143-
[],
144-
await this.getToProcessQueries()
162+
active,
163+
toProcess,
145164
];
146165
}
147166

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,16 @@ export class QueryQueue {
543543
}
544544
}));
545545

546-
const [_active, toProcess] = await queueConnection.getActiveAndToProcess();
546+
const [active, toProcess] = await queueConnection.getActiveAndToProcess();
547+
548+
/**
549+
* Important notice: Concurrency configuration works per a specific queue, not per node.
550+
*
551+
* In production clusters where it contains N nodes, it shares the same concurrency. It leads to a point
552+
* where every node tries to pick up jobs as much as concurrency is defined for the whole cluster. To minimize
553+
* the effect of competition between nodes, it's important to reduce the number of tries to process by active jobs.
554+
*/
555+
const toProcessLimit = active.length >= this.concurrency ? 1 : this.concurrency - active.length;
547556

548557
const tasks = toProcess
549558
.filter(([queryKey, _queueId]) => {
@@ -559,7 +568,7 @@ export class QueryQueue {
559568
return false;
560569
}
561570
})
562-
.slice(0, this.concurrency)
571+
.slice(0, toProcessLimit)
563572
.map(([queryKey, queueId]) => this.sendProcessMessageFn(queryKey, queueId));
564573

565574
await Promise.all(tasks);

packages/cubejs-query-orchestrator/test/benchmarks/QueueBench.abstract.ts

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,10 @@ export function QueryQueueBenchmark(name: string, options: QueryQueueTestOptions
138138
} else {
139139
counters.events[event] = 1;
140140
}
141+
142+
if (event.includes('error')) {
143+
console.log(event, _params);
144+
}
141145
},
142146
queueDriverFactory,
143147
...options
@@ -159,7 +163,8 @@ export function QueryQueueBenchmark(name: string, options: QueryQueueTestOptions
159163
const progressIntervalId = setInterval(() => {
160164
console.log('running', {
161165
...counters,
162-
processingPromisses: processingPromisses.length
166+
processingPromisses: processingPromisses.length,
167+
benchSettings,
163168
});
164169
}, 1000);
165170

@@ -177,18 +182,25 @@ export function QueryQueueBenchmark(name: string, options: QueryQueueTestOptions
177182

178183
const queueId = crypto.randomBytes(12).toString('hex');
179184
const running = (async () => {
180-
await queue.executeInQueue('query', queueId, {
181-
// eslint-disable-next-line no-bitwise
182-
payload: 'a'.repeat(benchSettings.queuePayloadSize)
183-
}, 1, {
184-
stageQueryKey: 1,
185-
requestId: 'request-id',
186-
spanId: 'span-id'
187-
});
185+
try {
186+
await queue.executeInQueue('query', queueId, {
187+
// eslint-disable-next-line no-bitwise
188+
payload: {
189+
large_str: 'a'.repeat(benchSettings.queuePayloadSize)
190+
},
191+
orphanedTimeout: 120
192+
}, 1, {
193+
stageQueryKey: 1,
194+
requestId: 'request-id',
195+
spanId: 'span-id'
196+
});
197+
} catch (e) {
198+
console.error(e);
199+
}
188200

189201
counters.queueResolved++;
190202

191-
// loosing memory for result
203+
// losing memory for a result
192204
return null;
193205
})();
194206

packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -244,11 +244,9 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions) =>
244244
expect(result).toEqual(['10', '21', '32', '43']);
245245
});
246246

247-
const nonCubeStoreTest = options.cacheAndQueueDriver !== 'cubestore' ? test : xtest;
247+
const onlyLocalTest = options.cacheAndQueueDriver !== 'cubestore' ? test : xtest;
248248

249-
// this works with cube store, but there is an issue with timings
250-
// TODO(ovr): fix me
251-
nonCubeStoreTest('orphaned', async () => {
249+
test('orphaned', async () => {
252250
// recover if previous test broken something
253251
for (let i = 1; i <= 4; i++) {
254252
await queue.executeInQueue('delay', `11${i}`, { delay: 50, result: `${i}` }, 0);
@@ -259,7 +257,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions) =>
259257

260258
let result = queue.executeInQueue('delay', '111', { delay: 800, result: '1' }, 0);
261259
delayFn(null, 50).then(() => queue.executeInQueue('delay', '112', { delay: 800, result: '2' }, 0)).catch(e => e);
262-
delayFn(null, 75).then(() => queue.executeInQueue('delay', '113', { delay: 500, result: '3' }, 0)).catch(e => e);
260+
delayFn(null, 75).then(() => queue.executeInQueue('delay', '113', { delay: 800, result: '3' }, 0)).catch(e => e);
263261
// orphaned timeout should be applied
264262
delayFn(null, 100).then(() => queue.executeInQueue('delay', '114', { delay: 900, result: '4' }, 0)).catch(e => e);
265263

@@ -269,7 +267,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions) =>
269267
result = await queue.executeInQueue('delay', '113', { delay: 900, result: '3' }, 0);
270268
expect(result).toBe('32');
271269

272-
await delayFn(null, 200);
270+
await delayFn(null, 500);
273271
expect(cancelledQuery).toBe('114');
274272
await queue.executeInQueue('delay', '114', { delay: 50, result: '4' }, 0);
275273
});
@@ -368,7 +366,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions) =>
368366
expect(result).toBe('select * from bar');
369367
});
370368

371-
nonCubeStoreTest('queue driver lock obtain race condition', async () => {
369+
onlyLocalTest('queue driver lock obtain race condition', async () => {
372370
const redisClient: any = await queue.queueDriver.createConnection();
373371
const redisClient2: any = await queue.queueDriver.createConnection();
374372
const priority = 10;
@@ -423,7 +421,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions) =>
423421
await queue.queueDriver.release(redisClient2);
424422
});
425423

426-
nonCubeStoreTest('activated but lock is not acquired', async () => {
424+
onlyLocalTest('activated but lock is not acquired', async () => {
427425
const redisClient = await queue.queueDriver.createConnection();
428426
const redisClient2 = await queue.queueDriver.createConnection();
429427
const priority = 10;

0 commit comments

Comments
 (0)