Skip to content

Commit 8777e77

Browse files
committed
chore(grouper): add counters to the grouper worker
1 parent c4cc417 commit 8777e77

3 files changed

Lines changed: 697 additions & 1 deletion

File tree

workers/grouper/src/index.ts

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import { rightTrim } from '../../../lib/utils/string';
2626
import { hasValue } from '../../../lib/utils/hasValue';
2727
/* eslint-disable-next-line no-unused-vars */
2828
import { memoize } from '../../../lib/memoize';
29+
import TimeMs from '../../../lib/utils/time';
2930

3031
/**
3132
* eslint does not count decorators as a variable usage
@@ -268,6 +269,186 @@ export default class GrouperWorker extends Worker {
268269
});
269270
}
270271
}
272+
273+
await this.incrementRateLimitCounter(task.projectId);
274+
await this.recordProjectMetrics(task.projectId, 'events-stored');
275+
}
276+
277+
/**
278+
* Build RedisTimeSeries key for project metrics.
279+
*
280+
* @param projectId - id of the project
281+
* @param metricType - metric type identifier
282+
* @param granularity - time granularity
283+
*/
284+
private getTimeSeriesKey(
285+
projectId: string,
286+
metricType: string,
287+
granularity: 'minutely' | 'hourly' | 'daily'
288+
): string {
289+
return `ts:project-${metricType}:${projectId}:${granularity}`;
290+
}
291+
292+
/**
293+
* Record project metrics to Redis TimeSeries.
294+
*
295+
* @param projectId - id of the project
296+
* @param metricType - metric type identifier
297+
*/
298+
private async recordProjectMetrics(projectId: string, metricType: string): Promise<void> {
299+
const minutelyKey = this.getTimeSeriesKey(projectId, metricType, 'minutely');
300+
const hourlyKey = this.getTimeSeriesKey(projectId, metricType, 'hourly');
301+
const dailyKey = this.getTimeSeriesKey(projectId, metricType, 'daily');
302+
303+
const labels: Record<string, string> = {
304+
type: 'error',
305+
status: metricType,
306+
project: projectId,
307+
};
308+
309+
const series = [
310+
{ key: minutelyKey, label: 'minutely', retentionMs: TimeMs.DAY },
311+
{ key: hourlyKey, label: 'hourly', retentionMs: TimeMs.WEEK },
312+
{ key: dailyKey, label: 'daily', retentionMs: 90 * TimeMs.DAY },
313+
];
314+
315+
for (const { key, label, retentionMs } of series) {
316+
try {
317+
await this.redis.safeTsAdd(key, 1, labels, retentionMs);
318+
} catch (error) {
319+
this.logger.error(`Failed to add ${label} TS for ${metricType}`, error);
320+
}
321+
}
322+
}
323+
324+
/**
325+
* Increment rate limit counters for the project.
326+
*
327+
* @param projectId - id of the project
328+
*/
329+
private async incrementRateLimitCounter(projectId: string): Promise<void> {
330+
try {
331+
const settings = await this.getProjectRateLimitSettings(projectId);
332+
333+
if (!settings) {
334+
return;
335+
}
336+
337+
await this.redis.incrementRateLimitCounterForCurrentEvent(
338+
projectId,
339+
settings.eventsPeriod,
340+
settings.eventsLimit
341+
);
342+
} catch (error) {
343+
this.logger.error(`Failed to increment rate limit counter for project ${projectId}`, error);
344+
}
345+
}
346+
347+
/**
348+
* Fetch and normalize rate limit settings
349+
* Rate limit settings could appear in tarifPlan, workspace and project.
350+
* All rateLimits have different priority.
351+
*
352+
* @param projectId - id of the project
353+
*/
354+
@memoize({ max: 200, ttl: MEMOIZATION_TTL, strategy: 'concat', skipCache: [null] })
355+
private async getProjectRateLimitSettings(projectId: string): Promise<{ eventsLimit: number; eventsPeriod: number } | null> {
356+
if (!projectId || !mongodb.ObjectID.isValid(projectId)) {
357+
return null;
358+
}
359+
360+
const accountsDb = this.accountsDb.getConnection();
361+
362+
/**
363+
* Fetch project from the db
364+
*/
365+
const project = await accountsDb
366+
.collection('projects')
367+
.findOne(
368+
{ _id: new mongodb.ObjectId(projectId) },
369+
{ projection: { rateLimitSettings: 1, workspaceId: 1 } }
370+
);
371+
372+
if (!project) {
373+
return null;
374+
}
375+
376+
const projectRateLimitSettings = project.rateLimitSettings as { N: number, T: number };
377+
const workspaceId = new mongodb.ObjectID(project.workspaceId);
378+
379+
let planRateLimitSettings: { N: number, T: number};
380+
let workspaceRateLimitSettings: { N: number, T: number};
381+
382+
/**
383+
* Fetch workspace from the db
384+
*/
385+
if (workspaceId) {
386+
const workspace = await accountsDb
387+
.collection('workspaces')
388+
.findOne(
389+
{ _id: workspaceId },
390+
{ projection: { rateLimitSettings: 1, tariffPlanId: 1 } }
391+
);
392+
393+
workspaceRateLimitSettings = workspace?.rateLimitSettings as { N: number, T: number };
394+
395+
const planId = new mongodb.ObjectId(workspace?.tariffPlanId);
396+
397+
/**
398+
* Tarif plan from the db
399+
*/
400+
if (planId) {
401+
const plan = await accountsDb
402+
.collection('plans')
403+
.findOne(
404+
{ _id: planId },
405+
{ projection: { rateLimitSettings: 1 } }
406+
);
407+
408+
planRateLimitSettings = plan?.rateLimitSettings;
409+
}
410+
}
411+
412+
return this.normalizeRateLimitSettings(
413+
planRateLimitSettings,
414+
workspaceRateLimitSettings,
415+
projectRateLimitSettings
416+
);
417+
}
418+
419+
/**
420+
* Normalize rate limit settings shape from database.
421+
*
422+
* @param rateLimitLayers - raw settings documents in priority order
423+
*/
424+
private normalizeRateLimitSettings(
425+
...rateLimitLayers: { N: number, T: number }[]
426+
): { eventsLimit: number; eventsPeriod: number } | null {
427+
let eventsLimit = 0;
428+
let eventsPeriod = 0;
429+
430+
for (const layer of rateLimitLayers) {
431+
if (!layer) {
432+
continue;
433+
}
434+
435+
const limit = layer.N as number;
436+
const period = layer.T as number;
437+
438+
if (limit !== undefined && limit > 0) {
439+
eventsLimit = limit;
440+
}
441+
442+
if (period !== undefined && period > 0) {
443+
eventsPeriod = period;
444+
}
445+
}
446+
447+
if (eventsLimit <= 0 || eventsPeriod <= 0) {
448+
return null;
449+
}
450+
451+
return { eventsLimit, eventsPeriod };
271452
}
272453

273454
/**

0 commit comments

Comments
 (0)