diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 44460c8d..64eabe51 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -28,7 +28,8 @@ import { rightTrim } from '../../../lib/utils/string'; import { hasValue } from '../../../lib/utils/hasValue'; import GrouperMetrics from './metrics/grouperMetrics'; import GrouperMemoryMonitor from './metrics/memoryMonitor'; -import { grouperMemoryConfig } from './metrics/config'; +import SlowHandleDiagnostics, { SlowHandleSession } from './metrics/slowHandleDiagnostics'; +import { grouperDiagnosticsConfig, grouperMemoryConfig } from './metrics/config'; /** * eslint does not count decorators as a variable usage @@ -101,6 +102,11 @@ export default class GrouperWorker extends Worker { */ private memoryMonitor = new GrouperMemoryMonitor(this.logger, grouperMemoryConfig); + /** + * Slow handle diagnostics helper. + */ + private slowHandleDiagnostics = new SlowHandleDiagnostics(this.logger, this.grouperMetrics, grouperDiagnosticsConfig); + /** * Interval for periodic cache cleanup to prevent memory leaks from unbounded cache growth */ @@ -180,9 +186,14 @@ export default class GrouperWorker extends Worker { * @param task - event to handle */ private async handleInternal(task: GroupWorkerTask): Promise { - const taskPayloadSize = Buffer.byteLength(JSON.stringify(task.payload)); + const session = this.slowHandleDiagnostics.startSession(); + const taskPayloadSize = await session.measureStep('payloadSize', () => { + return Buffer.byteLength(JSON.stringify(task.payload)); + }); const handledTasksCount = ++this.handledTasksCount; const memoryBeforeHandle = process.memoryUsage(); + let deltaSize = 0; + let eventType: 'new' | 'repeated' = 'new'; this.grouperMetrics.observePayloadSize(taskPayloadSize); this.memoryMonitor.logBeforeHandle(memoryBeforeHandle, handledTasksCount, taskPayloadSize, task.projectId); @@ -198,25 +209,29 @@ export default class GrouperWorker extends Worker { }; } - 
let uniqueEventHash = await this.getUniqueEventHash(task); + let uniqueEventHash = await session.measureStep('hash', () => this.getUniqueEventHash(task)); let existedEvent: GroupedEventDBScheme; let repetitionId = null; let incrementDailyAffectedUsers = false; - /** - * Trim source code lines to prevent memory leaks - */ - this.trimSourceCodeLines(task.payload); + await session.measureStep('preprocess', () => { + /** + * Trim source code lines to prevent memory leaks + */ + this.trimSourceCodeLines(task.payload); - /** - * Filter sensitive information - */ - this.dataFilter.processEvent(task.payload); + /** + * Filter sensitive information + */ + this.dataFilter.processEvent(task.payload); + }); /** * Find similar events by grouping pattern */ - const similarEvent = await this.findSimilarEvent(task.projectId, task.payload.title); + const similarEvent = await session.measureStep('findSimilarEvent', () => { + return this.findSimilarEvent(task.projectId, task.payload.title); + }); if (similarEvent) { this.logger.info(`[handle] project=${task.projectId} title="${task.payload.title}" similar event found, groupHash=${similarEvent.groupHash} totalCount=${similarEvent.totalCount}`); @@ -234,7 +249,9 @@ export default class GrouperWorker extends Worker { /** * Find event by group hash. */ - existedEvent = await this.getEvent(task.projectId, uniqueEventHash); + existedEvent = await session.measureStep('getEvent', () => { + return this.getEvent(task.projectId, uniqueEventHash); + }); } /** @@ -242,6 +259,8 @@ export default class GrouperWorker extends Worker { */ const isFirstOccurrence = !existedEvent && !similarEvent; + eventType = isFirstOccurrence ? 
'new' : 'repeated'; + if (isFirstOccurrence) { try { const incrementAffectedUsers = !!task.payload.user; @@ -251,14 +270,16 @@ export default class GrouperWorker extends Worker { /** * Insert new event */ - await this.saveEvent(task.projectId, { - groupHash: uniqueEventHash, - totalCount: 1, - catcherType: task.catcherType, - payload: task.payload, - timestamp: task.timestamp, - usersAffected: incrementAffectedUsers ? 1 : 0, - } as GroupedEventDBScheme); + await session.measureStep('saveNewEvent', () => { + return this.saveEvent(task.projectId, { + groupHash: uniqueEventHash, + totalCount: 1, + catcherType: task.catcherType, + payload: task.payload, + timestamp: task.timestamp, + usersAffected: incrementAffectedUsers ? 1 : 0, + } as GroupedEventDBScheme); + }); const eventCacheKey = await this.getEventCacheKey(task.projectId, uniqueEventHash); @@ -288,21 +309,27 @@ export default class GrouperWorker extends Worker { throw e; } } else { - const [incrementAffectedUsers, shouldIncrementDailyAffectedUsers] = await this.shouldIncrementAffectedUsers(task, existedEvent); + const [incrementAffectedUsers, shouldIncrementDailyAffectedUsers] = await session.measureStep('affectedUsers', () => { + return this.shouldIncrementAffectedUsers(task, existedEvent, session); + }); incrementDailyAffectedUsers = shouldIncrementDailyAffectedUsers; /** * Increment existed task's counter */ - await this.incrementEventCounterAndAffectedUsers(task.projectId, { - groupHash: uniqueEventHash, - }, incrementAffectedUsers); + await session.measureStep('incrementCounter', () => { + return this.incrementEventCounterAndAffectedUsers(task.projectId, { + groupHash: uniqueEventHash, + }, incrementAffectedUsers); + }); /** * Decode existed event to calculate diffs correctly */ - decodeUnsafeFields(existedEvent); + await session.measureStep('decodeEvent', () => { + decodeUnsafeFields(existedEvent); + }); let delta: RepetitionDelta; @@ -314,14 +341,17 @@ export default class GrouperWorker extends Worker { 
/** * Calculate delta between original event and repetition */ - delta = computeDelta(existedEvent.payload, task.payload); + delta = await session.measureStep('computeDelta', () => { + return computeDelta(existedEvent.payload, task.payload); + }); } catch (e) { console.error(e); throw new DiffCalculationError(e, existedEvent.payload, task.payload); } const deltaStr = JSON.stringify(delta); - const deltaSize = deltaStr != null ? Buffer.byteLength(deltaStr) : 0; + + deltaSize = deltaStr != null ? Buffer.byteLength(deltaStr) : 0; this.grouperMetrics.observeDeltaSize(deltaSize); @@ -333,7 +363,9 @@ export default class GrouperWorker extends Worker { timestamp: task.timestamp, } as RepetitionDBScheme; - repetitionId = await this.saveRepetition(task.projectId, newRepetition); + repetitionId = await session.measureStep('saveRepetition', () => { + return this.saveRepetition(task.projectId, newRepetition); + }); /** * Clear the large event payload references to allow garbage collection @@ -350,13 +382,15 @@ export default class GrouperWorker extends Worker { /** * Store events counter by days */ - await this.saveDailyEvents( - task.projectId, - uniqueEventHash, - task.timestamp, - repetitionId, - incrementDailyAffectedUsers - ); + await session.measureStep('saveDailyEvents', () => { + return this.saveDailyEvents( + task.projectId, + uniqueEventHash, + task.timestamp, + repetitionId, + incrementDailyAffectedUsers + ); + }); this.memoryMonitor.logHandleCompletion( memoryBeforeHandle, @@ -373,19 +407,31 @@ export default class GrouperWorker extends Worker { const isIgnored = isFirstOccurrence ? false : !!existedEvent?.marks?.ignored; if (!isIgnored) { - await this.addTask(WorkerNames.NOTIFIER, { - projectId: task.projectId, - event: { - title: task.payload.title, - groupHash: uniqueEventHash, - isNew: isFirstOccurrence, - repetitionId: repetitionId ? 
repetitionId.toString() : null, - }, + await session.measureStep('enqueueNotifier', () => { + return this.addTask(WorkerNames.NOTIFIER, { + projectId: task.projectId, + event: { + title: task.payload.title, + groupHash: uniqueEventHash, + isNew: isFirstOccurrence, + repetitionId: repetitionId ? repetitionId.toString() : null, + }, + }); }); } } - await this.recordProjectMetrics(task.projectId, 'events-accepted'); + await session.measureStep('recordProjectMetrics', () => { + return this.recordProjectMetrics(task.projectId, 'events-accepted'); + }); + + session.logIfSlow({ + projectId: task.projectId, + title: task.payload?.title, + type: eventType, + payloadSize: taskPayloadSize, + deltaSize, + }); } /** @@ -421,8 +467,18 @@ export default class GrouperWorker extends Worker { }; const series = [ - { key: minutelyKey, label: 'minutely', retentionMs: TimeMs.DAY, timestampMs: bucketTimestampMs('minutely') }, - { key: hourlyKey, label: 'hourly', retentionMs: TimeMs.WEEK, timestampMs: bucketTimestampMs('hourly') }, + { + key: minutelyKey, + label: 'minutely', + retentionMs: TimeMs.DAY, + timestampMs: bucketTimestampMs('minutely'), + }, + { + key: hourlyKey, + label: 'hourly', + retentionMs: TimeMs.WEEK, + timestampMs: bucketTimestampMs('hourly'), + }, { key: dailyKey, label: 'daily', @@ -491,11 +547,13 @@ export default class GrouperWorker extends Worker { * @param projectId - id of the project to find event in */ private async findFirstEventByPattern(pattern: string, projectId: string): Promise { - return await this.eventsDb.getConnection() - .collection(`events:${projectId}`) - .findOne( - { 'payload.title': { $regex: pattern } } - ); + return this.grouperMetrics.observeMongoDuration('findFirstEventByPattern', async () => { + return await this.eventsDb.getConnection() + .collection(`events:${projectId}`) + .findOne( + { 'payload.title': { $regex: pattern } } + ); + }); } /** @@ -561,11 +619,13 @@ export default class GrouperWorker extends Worker { * @returns 
{ProjectEventGroupingPatternsDBScheme[]} EventPatterns object with projectId and list of patterns */ private async getProjectPatterns(projectId: string): Promise { - const project = await this.accountsDb.getConnection() - .collection('projects') - .findOne({ - _id: new mongodb.ObjectId(projectId), - }); + const project = await this.grouperMetrics.observeMongoDuration('getProjectPatterns', async () => { + return this.accountsDb.getConnection() + .collection('projects') + .findOne({ + _id: new mongodb.ObjectId(projectId), + }); + }); return project?.eventGroupingPatterns || []; } @@ -575,9 +635,14 @@ export default class GrouperWorker extends Worker { * * @param task - worker task to process * @param existedEvent - original event to get its user + * @param session - current slow handle diagnostics session. * @returns {[boolean, boolean]} - whether to increment affected users for the repetition and the daily aggregation */ - private async shouldIncrementAffectedUsers(task: GroupWorkerTask, existedEvent: GroupedEventDBScheme): Promise<[boolean, boolean]> { + private async shouldIncrementAffectedUsers( + task: GroupWorkerTask, + existedEvent: GroupedEventDBScheme, + session: SlowHandleSession + ): Promise<[boolean, boolean]> { const eventUser = task.payload.user; /** @@ -609,11 +674,13 @@ export default class GrouperWorker extends Worker { */ const repetitionCacheKey = `repetitions:${task.projectId}:${existedEvent.groupHash}:${eventUser.id}`; const repetition = await this.cache.get(repetitionCacheKey, async () => { - return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`) - .findOne({ - groupHash: existedEvent.groupHash, - 'payload.user.id': eventUser.id, - }); + return this.grouperMetrics.observeMongoDuration('findUserRepetition', async () => { + return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`) + .findOne({ + groupHash: existedEvent.groupHash, + 'payload.user.id': eventUser.id, + }); + }); }); if (repetition) { 
@@ -643,15 +710,17 @@ export default class GrouperWorker extends Worker { */ const repetitionDailyCacheKey = `repetitions:${task.projectId}:${existedEvent.groupHash}:${eventUser.id}:${eventMidnight}`; const repetitionDaily = await this.cache.get(repetitionDailyCacheKey, async () => { - return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`) - .findOne({ - groupHash: existedEvent.groupHash, - 'payload.user.id': eventUser.id, - timestamp: { - $gte: eventMidnight, - $lt: eventNextMidnight, - }, - }); + return this.grouperMetrics.observeMongoDuration('findDailyUserRepetition', async () => { + return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`) + .findOne({ + groupHash: existedEvent.groupHash, + 'payload.user.id': eventUser.id, + timestamp: { + $gte: eventMidnight, + $lt: eventNextMidnight, + }, + }); + }); }); /** @@ -665,8 +734,12 @@ export default class GrouperWorker extends Worker { /** * Check Redis lock - if locked, don't increment either counter */ - const isEventLocked = await this.redis.checkOrSetlockEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id); - const isDailyEventLocked = await this.redis.checkOrSetlockDailyEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id, eventMidnight); + const [isEventLocked, isDailyEventLocked] = await session.measureStep('affectedUsersRedisLocks', async () => { + return [ + await this.redis.checkOrSetlockEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id), + await this.redis.checkOrSetlockDailyEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id, eventMidnight), + ]; + }); shouldIncrementRepetitionAffectedUsers = isEventLocked ? false : shouldIncrementRepetitionAffectedUsers; shouldIncrementDailyAffectedUsers = isDailyEventLocked ? 
false : shouldIncrementDailyAffectedUsers; diff --git a/workers/grouper/src/metrics/config.ts b/workers/grouper/src/metrics/config.ts index 7fe5abb2..f36821ba 100644 --- a/workers/grouper/src/metrics/config.ts +++ b/workers/grouper/src/metrics/config.ts @@ -23,6 +23,16 @@ export interface GrouperMemoryConfig { handleGrowthWarnMb: number; } +/** + * Parsed config for slow handle diagnostics. + */ +export interface GrouperDiagnosticsConfig { + /** + * Log handle step breakdown when total handle duration is greater than this value. + */ + slowHandleWarnMs: number; +} + /** * Default memory checkpoint interval in tasks. */ @@ -43,6 +53,11 @@ const DEFAULT_MEMORY_GROWTH_WARN_MB = 64; */ const DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB = 16; +/** + * Default slow handle warning threshold in ms. + */ +const DEFAULT_SLOW_HANDLE_WARN_MS = 5000; + /** * Histogram buckets for payload and delta sizes (bytes). */ @@ -70,3 +85,10 @@ export const grouperMemoryConfig: GrouperMemoryConfig = { growthWarnMb: asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WARN_MB, DEFAULT_MEMORY_GROWTH_WARN_MB), handleGrowthWarnMb: asPositiveNumber(process.env.GROUPER_MEMORY_HANDLE_GROWTH_WARN_MB, DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB), }; + +/** + * Slow handle diagnostics config from environment. 
+ */ +export const grouperDiagnosticsConfig: GrouperDiagnosticsConfig = { + slowHandleWarnMs: asPositiveNumber(process.env.GROUPER_SLOW_HANDLE_WARN_MS, DEFAULT_SLOW_HANDLE_WARN_MS), +}; diff --git a/workers/grouper/src/metrics/grouperMetrics.ts b/workers/grouper/src/metrics/grouperMetrics.ts index 4be19e87..02a36306 100644 --- a/workers/grouper/src/metrics/grouperMetrics.ts +++ b/workers/grouper/src/metrics/grouperMetrics.ts @@ -2,7 +2,39 @@ import { client, register } from '../../../../lib/metrics'; import { GROUPER_METRICS_SIZE_BUCKETS } from './config'; type EventType = 'new' | 'repeated'; -type MongoOperation = 'getEvent' | 'saveEvent' | 'saveRepetition' | 'incrementCounter' | 'saveDailyEvents'; +type MongoOperation = + 'findDailyUserRepetition' | + 'findFirstEventByPattern' | + 'findUserRepetition' | + 'getEvent' | + 'getProjectPatterns' | + 'incrementCounter' | + 'saveDailyEvents' | + 'saveEvent' | + 'saveRepetition'; + +export type GrouperStep = + 'affectedUsers' | + 'affectedUsersRedisLocks' | + 'computeDelta' | + 'decodeEvent' | + 'enqueueNotifier' | + 'findSimilarEvent' | + 'getEvent' | + 'hash' | + 'incrementCounter' | + 'payloadSize' | + 'preprocess' | + 'recordProjectMetrics' | + 'saveDailyEvents' | + 'saveNewEvent' | + 'saveRepetition'; + +// eslint-disable-next-line @typescript-eslint/no-magic-numbers +const GROUPER_HANDLE_DURATION_BUCKETS = [0.05, 0.1, 0.25, 0.5, 1, 2, 3, 5, 7.5, 10, 15, 30, 60]; + +// eslint-disable-next-line @typescript-eslint/no-magic-numbers +const GROUPER_STEP_DURATION_BUCKETS = [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 30]; /** * Reuse already registered metric by name, or create one. 
@@ -39,6 +71,18 @@ export default class GrouperMetrics { () => new client.Histogram({ name: 'hawk_grouper_handle_duration_seconds', help: 'Duration of handle() call in seconds', + buckets: GROUPER_HANDLE_DURATION_BUCKETS, + registers: [ register ], + }) + ); + + private readonly stepDuration = getOrCreateMetric( + 'hawk_grouper_step_duration_seconds', + () => new client.Histogram({ + name: 'hawk_grouper_step_duration_seconds', + help: 'Duration of Grouper handle step in seconds', + labelNames: [ 'step' ], + buckets: GROUPER_STEP_DURATION_BUCKETS, registers: [ register ], }) ); @@ -106,6 +150,22 @@ export default class GrouperMetrics { } } + /** + * Measure a single Grouper handle step duration. + * + * @param step - step label. + * @param callback - callback to execute under timer. + */ + public async observeStepDuration(step: GrouperStep, callback: () => Promise | T): Promise { + const endTimer = this.stepDuration.startTimer({ step }); + + try { + return await callback(); + } finally { + endTimer(); + } + } + /** * Increment events counter by event type. * diff --git a/workers/grouper/src/metrics/slowHandleDiagnostics.ts b/workers/grouper/src/metrics/slowHandleDiagnostics.ts new file mode 100644 index 00000000..1d74d8c4 --- /dev/null +++ b/workers/grouper/src/metrics/slowHandleDiagnostics.ts @@ -0,0 +1,110 @@ +import type { GrouperDiagnosticsConfig } from './config'; +import type GrouperMetrics from './grouperMetrics'; +import type { GrouperStep } from './grouperMetrics'; + +interface LoggerLike { + warn(message: string): void; +} + +/** + * Context describing the handled task for slow handle log line. + */ +export interface SlowHandleContext { + projectId: string; + title?: string; + type: 'new' | 'repeated'; + payloadSize: number; + deltaSize: number; +} + +/** + * Per-handle diagnostics session: owns its own start timestamp and step timings, + * measures Grouper handle steps and emits a slow handle warning on demand. 
/**
 * Per-handle diagnostics session.
 *
 * Remembers when it was created, accumulates wall-clock time per Grouper step
 * and, on demand, emits one warning line with the per-step breakdown when the
 * whole handle took at least the configured threshold.
 */
export class SlowHandleSession {
  /**
   * Wall-clock timestamp (ms) taken when the session was created
   */
  private readonly startedAt: number = Date.now();

  /**
   * Accumulated duration (ms) per step; repeated measurements of one step are summed
   */
  private readonly timings: Map<GrouperStep, number> = new Map();

  private readonly logger: LoggerLike;
  private readonly metrics: GrouperMetrics;
  private readonly config: GrouperDiagnosticsConfig;

  /**
   * @param logger - logger instance.
   * @param metrics - Grouper metrics facade used to record step duration.
   * @param config - slow handle diagnostics thresholds.
   */
  constructor(logger: LoggerLike, metrics: GrouperMetrics, config: GrouperDiagnosticsConfig) {
    this.logger = logger;
    this.metrics = metrics;
    this.config = config;
  }

  /**
   * Milliseconds elapsed since the given Date.now() timestamp.
   * Clamped at zero: Date.now() follows the wall clock, so a backwards clock
   * adjustment would otherwise produce a negative duration.
   *
   * @param startedAtMs - timestamp previously taken with Date.now().
   */
  private static elapsedSince(startedAtMs: number): number {
    return Math.max(0, Date.now() - startedAtMs);
  }

  /**
   * Measure a Grouper handle step and accumulate its duration in the session.
   * The callback runs under the metrics step timer; its result (or rejection)
   * is passed through unchanged.
   *
   * @param step - step name.
   * @param callback - measured callback.
   * @returns whatever the callback returns.
   */
  public async measureStep<T>(step: GrouperStep, callback: () => Promise<T> | T): Promise<T> {
    const stepStartedAt = Date.now();

    try {
      return await this.metrics.observeStepDuration(step, callback);
    } finally {
      /**
       * Runs on failure too, so a throwing step still shows up in the breakdown
       */
      const durationMs = SlowHandleSession.elapsedSince(stepStartedAt);

      this.timings.set(step, (this.timings.get(step) ?? 0) + durationMs);
    }
  }

  /**
   * Log slow handle breakdown if total session duration reaches the warn threshold.
   * Steps are listed from the longest to the shortest.
   *
   * @param context - handled task context.
   */
  public logIfSlow(context: SlowHandleContext): void {
    const durationMs = SlowHandleSession.elapsedSince(this.startedAt);

    if (durationMs < this.config.slowHandleWarnMs) {
      return;
    }

    const steps = Array.from(this.timings.entries())
      .sort((first, second) => second[1] - first[1])
      .map(([step, stepDurationMs]) => `${step}=${stepDurationMs}ms`)
      .join(' ');

    /**
     * Title is free-form text: serialize it as a JSON string so quotes and line breaks
     * are escaped and the warning stays a single parseable line.
     * A missing title is logged as an empty string instead of "undefined".
     */
    const title = JSON.stringify(context.title ?? '');

    this.logger.warn(
      `[slowHandle] duration=${durationMs}ms project=${context.projectId} type=${context.type} ` +
      `payloadSize=${context.payloadSize}b deltaSize=${context.deltaSize}b title=${title} steps="${steps}"`
    );
  }
}

/**
 * Factory for per-handle slow handle diagnostics sessions.
 * Holds the shared logger, metrics facade and thresholds and passes them to every session it creates.
 */
export default class SlowHandleDiagnostics {
  private readonly logger: LoggerLike;
  private readonly metrics: GrouperMetrics;
  private readonly config: GrouperDiagnosticsConfig;

  /**
   * @param logger - logger instance.
   * @param metrics - Grouper metrics facade used to record step duration.
   * @param config - slow handle diagnostics thresholds.
   */
  constructor(logger: LoggerLike, metrics: GrouperMetrics, config: GrouperDiagnosticsConfig) {
    this.logger = logger;
    this.metrics = metrics;
    this.config = config;
  }

  /**
   * Start a new per-handle diagnostics session.
   *
   * @returns a fresh session with its own start timestamp and empty step timings.
   */
  public startSession(): SlowHandleSession {
    return new SlowHandleSession(this.logger, this.metrics, this.config);
  }
}