Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 147 additions & 74 deletions workers/grouper/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import { rightTrim } from '../../../lib/utils/string';
import { hasValue } from '../../../lib/utils/hasValue';
import GrouperMetrics from './metrics/grouperMetrics';
import GrouperMemoryMonitor from './metrics/memoryMonitor';
import { grouperMemoryConfig } from './metrics/config';
import SlowHandleDiagnostics, { SlowHandleSession } from './metrics/slowHandleDiagnostics';
import { grouperDiagnosticsConfig, grouperMemoryConfig } from './metrics/config';

/**
* eslint does not count decorators as a variable usage
Expand Down Expand Up @@ -101,6 +102,11 @@ export default class GrouperWorker extends Worker {
*/
private memoryMonitor = new GrouperMemoryMonitor(this.logger, grouperMemoryConfig);

/**
* Slow handle diagnostics helper.
*/
private slowHandleDiagnostics = new SlowHandleDiagnostics(this.logger, this.grouperMetrics, grouperDiagnosticsConfig);

/**
* Interval for periodic cache cleanup to prevent memory leaks from unbounded cache growth
*/
Expand Down Expand Up @@ -180,9 +186,14 @@ export default class GrouperWorker extends Worker {
* @param task - event to handle
*/
private async handleInternal(task: GroupWorkerTask<ErrorsCatcherType>): Promise<void> {
const taskPayloadSize = Buffer.byteLength(JSON.stringify(task.payload));
const session = this.slowHandleDiagnostics.startSession();
const taskPayloadSize = await session.measureStep('payloadSize', () => {
return Buffer.byteLength(JSON.stringify(task.payload));
});
const handledTasksCount = ++this.handledTasksCount;
const memoryBeforeHandle = process.memoryUsage();
let deltaSize = 0;
let eventType: 'new' | 'repeated' = 'new';

this.grouperMetrics.observePayloadSize(taskPayloadSize);
this.memoryMonitor.logBeforeHandle(memoryBeforeHandle, handledTasksCount, taskPayloadSize, task.projectId);
Expand All @@ -198,25 +209,29 @@ export default class GrouperWorker extends Worker {
};
}

let uniqueEventHash = await this.getUniqueEventHash(task);
let uniqueEventHash = await session.measureStep('hash', () => this.getUniqueEventHash(task));
let existedEvent: GroupedEventDBScheme;
let repetitionId = null;
let incrementDailyAffectedUsers = false;

/**
* Trim source code lines to prevent memory leaks
*/
this.trimSourceCodeLines(task.payload);
await session.measureStep('preprocess', () => {
/**
* Trim source code lines to prevent memory leaks
*/
this.trimSourceCodeLines(task.payload);

/**
* Filter sensitive information
*/
this.dataFilter.processEvent(task.payload);
/**
* Filter sensitive information
*/
this.dataFilter.processEvent(task.payload);
});

/**
* Find similar events by grouping pattern
*/
const similarEvent = await this.findSimilarEvent(task.projectId, task.payload.title);
const similarEvent = await session.measureStep('findSimilarEvent', () => {
return this.findSimilarEvent(task.projectId, task.payload.title);
});

if (similarEvent) {
this.logger.info(`[handle] project=${task.projectId} title="${task.payload.title}" similar event found, groupHash=${similarEvent.groupHash} totalCount=${similarEvent.totalCount}`);
Expand All @@ -234,14 +249,18 @@ export default class GrouperWorker extends Worker {
/**
* Find event by group hash.
*/
existedEvent = await this.getEvent(task.projectId, uniqueEventHash);
existedEvent = await session.measureStep('getEvent', () => {
return this.getEvent(task.projectId, uniqueEventHash);
});
}

/**
* Event happened for the first time
*/
const isFirstOccurrence = !existedEvent && !similarEvent;

eventType = isFirstOccurrence ? 'new' : 'repeated';

if (isFirstOccurrence) {
try {
const incrementAffectedUsers = !!task.payload.user;
Expand All @@ -251,14 +270,16 @@ export default class GrouperWorker extends Worker {
/**
* Insert new event
*/
await this.saveEvent(task.projectId, {
groupHash: uniqueEventHash,
totalCount: 1,
catcherType: task.catcherType,
payload: task.payload,
timestamp: task.timestamp,
usersAffected: incrementAffectedUsers ? 1 : 0,
} as GroupedEventDBScheme);
await session.measureStep('saveNewEvent', () => {
return this.saveEvent(task.projectId, {
groupHash: uniqueEventHash,
totalCount: 1,
catcherType: task.catcherType,
payload: task.payload,
timestamp: task.timestamp,
usersAffected: incrementAffectedUsers ? 1 : 0,
} as GroupedEventDBScheme);
});

const eventCacheKey = await this.getEventCacheKey(task.projectId, uniqueEventHash);

Expand Down Expand Up @@ -288,21 +309,27 @@ export default class GrouperWorker extends Worker {
throw e;
}
} else {
const [incrementAffectedUsers, shouldIncrementDailyAffectedUsers] = await this.shouldIncrementAffectedUsers(task, existedEvent);
const [incrementAffectedUsers, shouldIncrementDailyAffectedUsers] = await session.measureStep('affectedUsers', () => {
return this.shouldIncrementAffectedUsers(task, existedEvent, session);
});

incrementDailyAffectedUsers = shouldIncrementDailyAffectedUsers;

/**
* Increment existed task's counter
*/
await this.incrementEventCounterAndAffectedUsers(task.projectId, {
groupHash: uniqueEventHash,
}, incrementAffectedUsers);
await session.measureStep('incrementCounter', () => {
return this.incrementEventCounterAndAffectedUsers(task.projectId, {
groupHash: uniqueEventHash,
}, incrementAffectedUsers);
});

/**
* Decode existed event to calculate diffs correctly
*/
decodeUnsafeFields(existedEvent);
await session.measureStep('decodeEvent', () => {
decodeUnsafeFields(existedEvent);
});

let delta: RepetitionDelta;

Expand All @@ -314,14 +341,17 @@ export default class GrouperWorker extends Worker {
/**
* Calculate delta between original event and repetition
*/
delta = computeDelta(existedEvent.payload, task.payload);
delta = await session.measureStep('computeDelta', () => {
return computeDelta(existedEvent.payload, task.payload);
});
} catch (e) {
console.error(e);
throw new DiffCalculationError(e, existedEvent.payload, task.payload);
}

const deltaStr = JSON.stringify(delta);
const deltaSize = deltaStr != null ? Buffer.byteLength(deltaStr) : 0;

deltaSize = deltaStr != null ? Buffer.byteLength(deltaStr) : 0;

this.grouperMetrics.observeDeltaSize(deltaSize);

Expand All @@ -333,7 +363,9 @@ export default class GrouperWorker extends Worker {
timestamp: task.timestamp,
} as RepetitionDBScheme;

repetitionId = await this.saveRepetition(task.projectId, newRepetition);
repetitionId = await session.measureStep('saveRepetition', () => {
return this.saveRepetition(task.projectId, newRepetition);
});

/**
* Clear the large event payload references to allow garbage collection
Expand All @@ -350,13 +382,15 @@ export default class GrouperWorker extends Worker {
/**
* Store events counter by days
*/
await this.saveDailyEvents(
task.projectId,
uniqueEventHash,
task.timestamp,
repetitionId,
incrementDailyAffectedUsers
);
await session.measureStep('saveDailyEvents', () => {
return this.saveDailyEvents(
task.projectId,
uniqueEventHash,
task.timestamp,
repetitionId,
incrementDailyAffectedUsers
);
});

this.memoryMonitor.logHandleCompletion(
memoryBeforeHandle,
Expand All @@ -373,19 +407,31 @@ export default class GrouperWorker extends Worker {
const isIgnored = isFirstOccurrence ? false : !!existedEvent?.marks?.ignored;

if (!isIgnored) {
await this.addTask(WorkerNames.NOTIFIER, {
projectId: task.projectId,
event: {
title: task.payload.title,
groupHash: uniqueEventHash,
isNew: isFirstOccurrence,
repetitionId: repetitionId ? repetitionId.toString() : null,
},
await session.measureStep('enqueueNotifier', () => {
return this.addTask(WorkerNames.NOTIFIER, {
projectId: task.projectId,
event: {
title: task.payload.title,
groupHash: uniqueEventHash,
isNew: isFirstOccurrence,
repetitionId: repetitionId ? repetitionId.toString() : null,
},
});
});
}
}

await this.recordProjectMetrics(task.projectId, 'events-accepted');
await session.measureStep('recordProjectMetrics', () => {
return this.recordProjectMetrics(task.projectId, 'events-accepted');
});

session.logIfSlow({
projectId: task.projectId,
title: task.payload?.title,
type: eventType,
payloadSize: taskPayloadSize,
deltaSize,
});
}

/**
Expand Down Expand Up @@ -421,8 +467,18 @@ export default class GrouperWorker extends Worker {
};

const series = [
{ key: minutelyKey, label: 'minutely', retentionMs: TimeMs.DAY, timestampMs: bucketTimestampMs('minutely') },
{ key: hourlyKey, label: 'hourly', retentionMs: TimeMs.WEEK, timestampMs: bucketTimestampMs('hourly') },
{
key: minutelyKey,
label: 'minutely',
retentionMs: TimeMs.DAY,
timestampMs: bucketTimestampMs('minutely'),
},
{
key: hourlyKey,
label: 'hourly',
retentionMs: TimeMs.WEEK,
timestampMs: bucketTimestampMs('hourly'),
},
{
key: dailyKey,
label: 'daily',
Expand Down Expand Up @@ -491,11 +547,13 @@ export default class GrouperWorker extends Worker {
* @param projectId - id of the project to find event in
*/
private async findFirstEventByPattern(pattern: string, projectId: string): Promise<GroupedEventDBScheme> {
return await this.eventsDb.getConnection()
.collection(`events:${projectId}`)
.findOne(
{ 'payload.title': { $regex: pattern } }
);
return this.grouperMetrics.observeMongoDuration('findFirstEventByPattern', async () => {
return await this.eventsDb.getConnection()
.collection(`events:${projectId}`)
.findOne(
{ 'payload.title': { $regex: pattern } }
);
});
}

/**
Expand Down Expand Up @@ -561,11 +619,13 @@ export default class GrouperWorker extends Worker {
* @returns {ProjectEventGroupingPatternsDBScheme[]} EventPatterns object with projectId and list of patterns
*/
private async getProjectPatterns(projectId: string): Promise<ProjectEventGroupingPatternsDBScheme[]> {
const project = await this.accountsDb.getConnection()
.collection('projects')
.findOne({
_id: new mongodb.ObjectId(projectId),
});
const project = await this.grouperMetrics.observeMongoDuration('getProjectPatterns', async () => {
return this.accountsDb.getConnection()
.collection('projects')
.findOne({
_id: new mongodb.ObjectId(projectId),
});
});

return project?.eventGroupingPatterns || [];
}
Expand All @@ -575,9 +635,14 @@ export default class GrouperWorker extends Worker {
*
* @param task - worker task to process
* @param existedEvent - original event to get its user
* @param session - current slow handle diagnostics session.
* @returns {[boolean, boolean]} - whether to increment affected users for the repetition and the daily aggregation
*/
private async shouldIncrementAffectedUsers<Type extends ErrorsCatcherType>(task: GroupWorkerTask<Type>, existedEvent: GroupedEventDBScheme): Promise<[boolean, boolean]> {
private async shouldIncrementAffectedUsers<Type extends ErrorsCatcherType>(
task: GroupWorkerTask<Type>,
existedEvent: GroupedEventDBScheme,
session: SlowHandleSession
): Promise<[boolean, boolean]> {
const eventUser = task.payload.user;

/**
Expand Down Expand Up @@ -609,11 +674,13 @@ export default class GrouperWorker extends Worker {
*/
const repetitionCacheKey = `repetitions:${task.projectId}:${existedEvent.groupHash}:${eventUser.id}`;
const repetition = await this.cache.get(repetitionCacheKey, async () => {
return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`)
.findOne({
groupHash: existedEvent.groupHash,
'payload.user.id': eventUser.id,
});
return this.grouperMetrics.observeMongoDuration('findUserRepetition', async () => {
return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`)
.findOne({
groupHash: existedEvent.groupHash,
'payload.user.id': eventUser.id,
});
});
});

if (repetition) {
Expand Down Expand Up @@ -643,15 +710,17 @@ export default class GrouperWorker extends Worker {
*/
const repetitionDailyCacheKey = `repetitions:${task.projectId}:${existedEvent.groupHash}:${eventUser.id}:${eventMidnight}`;
const repetitionDaily = await this.cache.get(repetitionDailyCacheKey, async () => {
return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`)
.findOne({
groupHash: existedEvent.groupHash,
'payload.user.id': eventUser.id,
timestamp: {
$gte: eventMidnight,
$lt: eventNextMidnight,
},
});
return this.grouperMetrics.observeMongoDuration('findDailyUserRepetition', async () => {
return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`)
.findOne({
groupHash: existedEvent.groupHash,
'payload.user.id': eventUser.id,
timestamp: {
$gte: eventMidnight,
$lt: eventNextMidnight,
},
});
});
});

/**
Expand All @@ -665,8 +734,12 @@ export default class GrouperWorker extends Worker {
/**
* Check Redis lock - if locked, don't increment either counter
*/
const isEventLocked = await this.redis.checkOrSetlockEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id);
const isDailyEventLocked = await this.redis.checkOrSetlockDailyEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id, eventMidnight);
const [isEventLocked, isDailyEventLocked] = await session.measureStep('affectedUsersRedisLocks', async () => {
return [
await this.redis.checkOrSetlockEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id),
await this.redis.checkOrSetlockDailyEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id, eventMidnight),
];
});

shouldIncrementRepetitionAffectedUsers = isEventLocked ? false : shouldIncrementRepetitionAffectedUsers;
shouldIncrementDailyAffectedUsers = isDailyEventLocked ? false : shouldIncrementDailyAffectedUsers;
Expand Down
Loading
Loading