From 3bfcaccb6e8c7e9d30113f0475ea8703fb9f74ba Mon Sep 17 00:00:00 2001 From: Kuchizu Date: Mon, 4 May 2026 18:27:23 +0300 Subject: [PATCH 1/2] feat(grouper): add slow handle diagnostics --- workers/grouper/src/index.ts | 271 +++++++++++++----- workers/grouper/src/metrics/config.ts | 22 ++ workers/grouper/src/metrics/grouperMetrics.ts | 62 +++- 3 files changed, 279 insertions(+), 76 deletions(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 44460c8d..dd268a37 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -26,9 +26,9 @@ import { computeDelta } from './utils/repetitionDiff'; import { bucketTimestampMs } from './utils/bucketTimestamp'; import { rightTrim } from '../../../lib/utils/string'; import { hasValue } from '../../../lib/utils/hasValue'; -import GrouperMetrics from './metrics/grouperMetrics'; +import GrouperMetrics, { GrouperStep } from './metrics/grouperMetrics'; import GrouperMemoryMonitor from './metrics/memoryMonitor'; -import { grouperMemoryConfig } from './metrics/config'; +import { grouperDiagnosticsConfig, grouperMemoryConfig } from './metrics/config'; /** * eslint does not count decorators as a variable usage @@ -174,15 +174,80 @@ export default class GrouperWorker extends Worker { } } + /** + * Measure a Grouper handle step and keep per-handle timing for slow logs. + * + * @param step - step name. + * @param timings - current handle timings. + * @param callback - measured callback. + */ + private async measureStep( + step: GrouperStep, + timings: Map, + callback: () => Promise | T + ): Promise { + const startedAt = Date.now(); + + try { + return await this.grouperMetrics.observeStepDuration(step, callback); + } finally { + const durationMs = Date.now() - startedAt; + const previousDurationMs = timings.get(step) || 0; + + timings.set(step, previousDurationMs + durationMs); + } + } + + /** + * Log slow handle breakdown for finding the exact slow operation. + * + * @param startedAt - handle start timestamp. + * @param timings - step timings. + * @param task - handled task. + * @param type - event type. + * @param payloadSize - payload size in bytes. + * @param deltaSize - delta size in bytes. + */ + private logSlowHandle( + startedAt: number, + timings: Map, + task: GroupWorkerTask, + type: 'new' | 'repeated', + payloadSize: number, + deltaSize: number + ): void { + const durationMs = Date.now() - startedAt; + + if (durationMs < grouperDiagnosticsConfig.slowHandleWarnMs) { + return; + } + + const steps = Array.from(timings.entries()) + .sort((first, second) => second[1] - first[1]) + .map(([step, stepDurationMs]) => `${step}=${stepDurationMs}ms`) + .join(' '); + + this.logger.warn( + `[slowHandle] duration=${durationMs}ms project=${task.projectId} type=${type} ` + + `payloadSize=${payloadSize}b deltaSize=${deltaSize}b title="${task.payload?.title}" steps="${steps}"` + ); + } + /** * Internal task handling function * * @param task - event to handle */ private async handleInternal(task: GroupWorkerTask): Promise { - const taskPayloadSize = Buffer.byteLength(JSON.stringify(task.payload)); + const handleStartedAt = Date.now(); + const stepTimings = new Map(); + const taskPayloadSize = await this.measureStep('payloadSize', stepTimings, () => { + return Buffer.byteLength(JSON.stringify(task.payload)); + }); const handledTasksCount = ++this.handledTasksCount; const memoryBeforeHandle = process.memoryUsage(); + let deltaSize = 0; + let eventType: 'new' | 'repeated' = 'new'; this.grouperMetrics.observePayloadSize(taskPayloadSize); this.memoryMonitor.logBeforeHandle(memoryBeforeHandle, handledTasksCount, taskPayloadSize, task.projectId); @@ -198,25 +263,29 @@ export default class GrouperWorker extends Worker { }; } - let uniqueEventHash = await this.getUniqueEventHash(task); + let uniqueEventHash = await this.measureStep('hash', stepTimings, () => this.getUniqueEventHash(task)); let existedEvent: GroupedEventDBScheme; let repetitionId = null; let incrementDailyAffectedUsers = false; - /** - * Trim source code lines to prevent memory leaks - */ - this.trimSourceCodeLines(task.payload); + await this.measureStep('preprocess', stepTimings, () => { + /** + * Trim source code lines to prevent memory leaks + */ + this.trimSourceCodeLines(task.payload); - /** - * Filter sensitive information - */ - this.dataFilter.processEvent(task.payload); + /** + * Filter sensitive information + */ + this.dataFilter.processEvent(task.payload); + }); /** * Find similar events by grouping pattern */ - const similarEvent = await this.findSimilarEvent(task.projectId, task.payload.title); + const similarEvent = await this.measureStep('findSimilarEvent', stepTimings, () => { + return this.findSimilarEvent(task.projectId, task.payload.title); + }); if (similarEvent) { this.logger.info(`[handle] project=${task.projectId} title="${task.payload.title}" similar event found, groupHash=${similarEvent.groupHash} totalCount=${similarEvent.totalCount}`); @@ -234,7 +303,9 @@ export default class GrouperWorker extends Worker { /** * Find event by group hash. */ - existedEvent = await this.getEvent(task.projectId, uniqueEventHash); + existedEvent = await this.measureStep('getEvent', stepTimings, () => { + return this.getEvent(task.projectId, uniqueEventHash); + }); } /** @@ -242,6 +313,8 @@ export default class GrouperWorker extends Worker { */ const isFirstOccurrence = !existedEvent && !similarEvent; + eventType = isFirstOccurrence ? 'new' : 'repeated'; + if (isFirstOccurrence) { try { const incrementAffectedUsers = !!task.payload.user; @@ -251,14 +324,16 @@ export default class GrouperWorker extends Worker { /** * Insert new event */ - await this.saveEvent(task.projectId, { - groupHash: uniqueEventHash, - totalCount: 1, - catcherType: task.catcherType, - payload: task.payload, - timestamp: task.timestamp, - usersAffected: incrementAffectedUsers ? 1 : 0, - } as GroupedEventDBScheme); + await this.measureStep('saveNewEvent', stepTimings, () => { + return this.saveEvent(task.projectId, { + groupHash: uniqueEventHash, + totalCount: 1, + catcherType: task.catcherType, + payload: task.payload, + timestamp: task.timestamp, + usersAffected: incrementAffectedUsers ? 1 : 0, + } as GroupedEventDBScheme); + }); const eventCacheKey = await this.getEventCacheKey(task.projectId, uniqueEventHash); @@ -288,21 +363,27 @@ export default class GrouperWorker extends Worker { throw e; } } else { - const [incrementAffectedUsers, shouldIncrementDailyAffectedUsers] = await this.shouldIncrementAffectedUsers(task, existedEvent); + const [incrementAffectedUsers, shouldIncrementDailyAffectedUsers] = await this.measureStep('affectedUsers', stepTimings, () => { + return this.shouldIncrementAffectedUsers(task, existedEvent, stepTimings); + }); incrementDailyAffectedUsers = shouldIncrementDailyAffectedUsers; /** * Increment existed task's counter */ - await this.incrementEventCounterAndAffectedUsers(task.projectId, { - groupHash: uniqueEventHash, - }, incrementAffectedUsers); + await this.measureStep('incrementCounter', stepTimings, () => { + return this.incrementEventCounterAndAffectedUsers(task.projectId, { + groupHash: uniqueEventHash, + }, incrementAffectedUsers); + }); /** * Decode existed event to calculate diffs correctly */ - decodeUnsafeFields(existedEvent); + await this.measureStep('decodeEvent', stepTimings, () => { + decodeUnsafeFields(existedEvent); + }); let delta: RepetitionDelta; @@ -314,14 +395,17 @@ export default class GrouperWorker extends Worker { /** * Calculate delta between original event and repetition */ - delta = computeDelta(existedEvent.payload, task.payload); + delta = await this.measureStep('computeDelta', stepTimings, () => { + return computeDelta(existedEvent.payload, task.payload); + }); } catch (e) { console.error(e); throw new DiffCalculationError(e, existedEvent.payload, task.payload); } const deltaStr = JSON.stringify(delta); - const deltaSize = deltaStr != null ? Buffer.byteLength(deltaStr) : 0; + + deltaSize = deltaStr != null ? Buffer.byteLength(deltaStr) : 0; this.grouperMetrics.observeDeltaSize(deltaSize); @@ -333,7 +417,9 @@ export default class GrouperWorker extends Worker { timestamp: task.timestamp, } as RepetitionDBScheme; - repetitionId = await this.saveRepetition(task.projectId, newRepetition); + repetitionId = await this.measureStep('saveRepetition', stepTimings, () => { + return this.saveRepetition(task.projectId, newRepetition); + }); /** * Clear the large event payload references to allow garbage collection @@ -350,13 +436,15 @@ export default class GrouperWorker extends Worker { /** * Store events counter by days */ - await this.saveDailyEvents( - task.projectId, - uniqueEventHash, - task.timestamp, - repetitionId, - incrementDailyAffectedUsers - ); + await this.measureStep('saveDailyEvents', stepTimings, () => { + return this.saveDailyEvents( + task.projectId, + uniqueEventHash, + task.timestamp, + repetitionId, + incrementDailyAffectedUsers + ); + }); this.memoryMonitor.logHandleCompletion( memoryBeforeHandle, @@ -373,19 +461,25 @@ export default class GrouperWorker extends Worker { const isIgnored = isFirstOccurrence ? false : !!existedEvent?.marks?.ignored; if (!isIgnored) { - await this.addTask(WorkerNames.NOTIFIER, { - projectId: task.projectId, - event: { - title: task.payload.title, - groupHash: uniqueEventHash, - isNew: isFirstOccurrence, - repetitionId: repetitionId ? repetitionId.toString() : null, - }, + await this.measureStep('enqueueNotifier', stepTimings, () => { + return this.addTask(WorkerNames.NOTIFIER, { + projectId: task.projectId, + event: { + title: task.payload.title, + groupHash: uniqueEventHash, + isNew: isFirstOccurrence, + repetitionId: repetitionId ? repetitionId.toString() : null, + }, + }); }); } } - await this.recordProjectMetrics(task.projectId, 'events-accepted'); + await this.measureStep('recordProjectMetrics', stepTimings, () => { + return this.recordProjectMetrics(task.projectId, 'events-accepted'); + }); + + this.logSlowHandle(handleStartedAt, stepTimings, task, eventType, taskPayloadSize, deltaSize); } /** @@ -421,8 +515,18 @@ export default class GrouperWorker extends Worker { }; const series = [ - { key: minutelyKey, label: 'minutely', retentionMs: TimeMs.DAY, timestampMs: bucketTimestampMs('minutely') }, - { key: hourlyKey, label: 'hourly', retentionMs: TimeMs.WEEK, timestampMs: bucketTimestampMs('hourly') }, + { + key: minutelyKey, + label: 'minutely', + retentionMs: TimeMs.DAY, + timestampMs: bucketTimestampMs('minutely'), + }, + { + key: hourlyKey, + label: 'hourly', + retentionMs: TimeMs.WEEK, + timestampMs: bucketTimestampMs('hourly'), + }, { key: dailyKey, label: 'daily', @@ -491,11 +595,13 @@ export default class GrouperWorker extends Worker { * @param projectId - id of the project to find event in */ private async findFirstEventByPattern(pattern: string, projectId: string): Promise { - return await this.eventsDb.getConnection() - .collection(`events:${projectId}`) - .findOne( - { 'payload.title': { $regex: pattern } } - ); + return this.grouperMetrics.observeMongoDuration('findFirstEventByPattern', async () => { + return await this.eventsDb.getConnection() + .collection(`events:${projectId}`) + .findOne( + { 'payload.title': { $regex: pattern } } + ); + }); } /** @@ -561,11 +667,13 @@ export default class GrouperWorker extends Worker { * @returns {ProjectEventGroupingPatternsDBScheme[]} EventPatterns object with projectId and list of patterns */ private async getProjectPatterns(projectId: string): Promise { - const project = await this.accountsDb.getConnection() - .collection('projects') - .findOne({ - _id: new mongodb.ObjectId(projectId), - }); + const project = await this.grouperMetrics.observeMongoDuration('getProjectPatterns', async () => { + return this.accountsDb.getConnection() + .collection('projects') + .findOne({ + _id: new mongodb.ObjectId(projectId), + }); + }); return project?.eventGroupingPatterns || []; } @@ -575,9 +683,14 @@ export default class GrouperWorker extends Worker { * * @param task - worker task to process * @param existedEvent - original event to get its user + * @param stepTimings - current handle timings. * @returns {[boolean, boolean]} - whether to increment affected users for the repetition and the daily aggregation */ - private async shouldIncrementAffectedUsers(task: GroupWorkerTask, existedEvent: GroupedEventDBScheme): Promise<[boolean, boolean]> { + private async shouldIncrementAffectedUsers( + task: GroupWorkerTask, + existedEvent: GroupedEventDBScheme, + stepTimings: Map + ): Promise<[boolean, boolean]> { const eventUser = task.payload.user; /** @@ -609,11 +722,13 @@ export default class GrouperWorker extends Worker { */ const repetitionCacheKey = `repetitions:${task.projectId}:${existedEvent.groupHash}:${eventUser.id}`; const repetition = await this.cache.get(repetitionCacheKey, async () => { - return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`) - .findOne({ - groupHash: existedEvent.groupHash, - 'payload.user.id': eventUser.id, - }); + return this.grouperMetrics.observeMongoDuration('findUserRepetition', async () => { + return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`) + .findOne({ + groupHash: existedEvent.groupHash, + 'payload.user.id': eventUser.id, + }); + }); }); if (repetition) { @@ -643,15 +758,17 @@ export default class GrouperWorker extends Worker { */ const repetitionDailyCacheKey = `repetitions:${task.projectId}:${existedEvent.groupHash}:${eventUser.id}:${eventMidnight}`; const repetitionDaily = await this.cache.get(repetitionDailyCacheKey, async () => { - return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`) - .findOne({ - groupHash: existedEvent.groupHash, - 'payload.user.id': eventUser.id, - timestamp: { - $gte: eventMidnight, - $lt: eventNextMidnight, - }, - }); + return this.grouperMetrics.observeMongoDuration('findDailyUserRepetition', async () => { + return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`) + .findOne({ + groupHash: existedEvent.groupHash, + 'payload.user.id': eventUser.id, + timestamp: { + $gte: eventMidnight, + $lt: eventNextMidnight, + }, + }); + }); }); /** @@ -665,8 +782,12 @@ export default class GrouperWorker extends Worker { /** * Check Redis lock - if locked, don't increment either counter */ - const isEventLocked = await this.redis.checkOrSetlockEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id); - const isDailyEventLocked = await this.redis.checkOrSetlockDailyEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id, eventMidnight); + const [isEventLocked, isDailyEventLocked] = await this.measureStep('affectedUsersRedisLocks', stepTimings, async () => { + return [ + await this.redis.checkOrSetlockEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id), + await this.redis.checkOrSetlockDailyEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id, eventMidnight), + ]; + }); shouldIncrementRepetitionAffectedUsers = isEventLocked ? false : shouldIncrementRepetitionAffectedUsers; shouldIncrementDailyAffectedUsers = isDailyEventLocked ? false : shouldIncrementDailyAffectedUsers; diff --git a/workers/grouper/src/metrics/config.ts b/workers/grouper/src/metrics/config.ts index 7fe5abb2..f36821ba 100644 --- a/workers/grouper/src/metrics/config.ts +++ b/workers/grouper/src/metrics/config.ts @@ -23,6 +23,16 @@ export interface GrouperMemoryConfig { handleGrowthWarnMb: number; } +/** + * Parsed config for slow handle diagnostics. + */ +export interface GrouperDiagnosticsConfig { + /** + * Log handle step breakdown when total handle duration is greater than this value. + */ + slowHandleWarnMs: number; +} + /** * Default memory checkpoint interval in tasks. */ @@ -43,6 +53,11 @@ const DEFAULT_MEMORY_GROWTH_WARN_MB = 64; */ const DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB = 16; +/** + * Default slow handle warning threshold in ms. + */ +const DEFAULT_SLOW_HANDLE_WARN_MS = 5000; + /** * Histogram buckets for payload and delta sizes (bytes). */ @@ -70,3 +85,10 @@ export const grouperMemoryConfig: GrouperMemoryConfig = { growthWarnMb: asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WARN_MB, DEFAULT_MEMORY_GROWTH_WARN_MB), handleGrowthWarnMb: asPositiveNumber(process.env.GROUPER_MEMORY_HANDLE_GROWTH_WARN_MB, DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB), }; + +/** + * Slow handle diagnostics config from environment. + */ +export const grouperDiagnosticsConfig: GrouperDiagnosticsConfig = { + slowHandleWarnMs: asPositiveNumber(process.env.GROUPER_SLOW_HANDLE_WARN_MS, DEFAULT_SLOW_HANDLE_WARN_MS), +}; diff --git a/workers/grouper/src/metrics/grouperMetrics.ts b/workers/grouper/src/metrics/grouperMetrics.ts index 4be19e87..02a36306 100644 --- a/workers/grouper/src/metrics/grouperMetrics.ts +++ b/workers/grouper/src/metrics/grouperMetrics.ts @@ -2,7 +2,39 @@ import { client, register } from '../../../../lib/metrics'; import { GROUPER_METRICS_SIZE_BUCKETS } from './config'; type EventType = 'new' | 'repeated'; -type MongoOperation = 'getEvent' | 'saveEvent' | 'saveRepetition' | 'incrementCounter' | 'saveDailyEvents'; +type MongoOperation = + 'findDailyUserRepetition' | + 'findFirstEventByPattern' | + 'findUserRepetition' | + 'getEvent' | + 'getProjectPatterns' | + 'incrementCounter' | + 'saveDailyEvents' | + 'saveEvent' | + 'saveRepetition'; + +export type GrouperStep = + 'affectedUsers' | + 'affectedUsersRedisLocks' | + 'computeDelta' | + 'decodeEvent' | + 'enqueueNotifier' | + 'findSimilarEvent' | + 'getEvent' | + 'hash' | + 'incrementCounter' | + 'payloadSize' | + 'preprocess' | + 'recordProjectMetrics' | + 'saveDailyEvents' | + 'saveNewEvent' | + 'saveRepetition'; + +// eslint-disable-next-line @typescript-eslint/no-magic-numbers +const GROUPER_HANDLE_DURATION_BUCKETS = [0.05, 0.1, 0.25, 0.5, 1, 2, 3, 5, 7.5, 10, 15, 30, 60]; + +// eslint-disable-next-line @typescript-eslint/no-magic-numbers +const GROUPER_STEP_DURATION_BUCKETS = [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 30]; /** * Reuse already registered metric by name, or create one. @@ -39,6 +71,18 @@ export default class GrouperMetrics { () => new client.Histogram({ name: 'hawk_grouper_handle_duration_seconds', help: 'Duration of handle() call in seconds', + buckets: GROUPER_HANDLE_DURATION_BUCKETS, + registers: [ register ], + }) + ); + + private readonly stepDuration = getOrCreateMetric( + 'hawk_grouper_step_duration_seconds', + () => new client.Histogram({ + name: 'hawk_grouper_step_duration_seconds', + help: 'Duration of Grouper handle step in seconds', + labelNames: [ 'step' ], + buckets: GROUPER_STEP_DURATION_BUCKETS, registers: [ register ], }) ); @@ -106,6 +150,22 @@ export default class GrouperMetrics { } } + /** + * Measure a single Grouper handle step duration. + * + * @param step - step label. + * @param callback - callback to execute under timer. + */ + public async observeStepDuration(step: GrouperStep, callback: () => Promise | T): Promise { + const endTimer = this.stepDuration.startTimer({ step }); + + try { + return await callback(); + } finally { + endTimer(); + } + } + /** * Increment events counter by event type. * From e81f695daf0bafa5147321a6544aa7c5b56c8d9e Mon Sep 17 00:00:00 2001 From: Kuchizu Date: Wed, 6 May 2026 15:17:52 +0300 Subject: [PATCH 2/2] refactor(grouper): extract slow handle diagnostics into session --- workers/grouper/src/index.ts | 114 +++++------------- .../src/metrics/slowHandleDiagnostics.ts | 110 +++++++++++++++++ 2 files changed, 143 insertions(+), 81 deletions(-) create mode 100644 workers/grouper/src/metrics/slowHandleDiagnostics.ts diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index dd268a37..64eabe51 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -26,8 +26,9 @@ import { computeDelta } from './utils/repetitionDiff'; import { bucketTimestampMs } from './utils/bucketTimestamp'; import { rightTrim } from '../../../lib/utils/string'; import { hasValue } from '../../../lib/utils/hasValue'; -import GrouperMetrics, { GrouperStep } from './metrics/grouperMetrics'; +import GrouperMetrics from './metrics/grouperMetrics'; import GrouperMemoryMonitor from './metrics/memoryMonitor'; +import SlowHandleDiagnostics, { SlowHandleSession } from './metrics/slowHandleDiagnostics'; import { grouperDiagnosticsConfig, grouperMemoryConfig } from './metrics/config'; /** @@ -101,6 +102,11 @@ export default class GrouperWorker extends Worker { */ private memoryMonitor = new GrouperMemoryMonitor(this.logger, grouperMemoryConfig); + /** + * Slow handle diagnostics helper. + */ + private slowHandleDiagnostics = new SlowHandleDiagnostics(this.logger, this.grouperMetrics, grouperDiagnosticsConfig); + /** * Interval for periodic cache cleanup to prevent memory leaks from unbounded cache growth */ @@ -174,74 +180,14 @@ export default class GrouperWorker extends Worker { } } - /** - * Measure a Grouper handle step and keep per-handle timing for slow logs. - * - * @param step - step name. - * @param timings - current handle timings. - * @param callback - measured callback. - */ - private async measureStep( - step: GrouperStep, - timings: Map, - callback: () => Promise | T - ): Promise { - const startedAt = Date.now(); - - try { - return await this.grouperMetrics.observeStepDuration(step, callback); - } finally { - const durationMs = Date.now() - startedAt; - const previousDurationMs = timings.get(step) || 0; - - timings.set(step, previousDurationMs + durationMs); - } - } - - /** - * Log slow handle breakdown for finding the exact slow operation. - * - * @param startedAt - handle start timestamp. - * @param timings - step timings. - * @param task - handled task. - * @param type - event type. - * @param payloadSize - payload size in bytes. - * @param deltaSize - delta size in bytes. - */ - private logSlowHandle( - startedAt: number, - timings: Map, - task: GroupWorkerTask, - type: 'new' | 'repeated', - payloadSize: number, - deltaSize: number - ): void { - const durationMs = Date.now() - startedAt; - - if (durationMs < grouperDiagnosticsConfig.slowHandleWarnMs) { - return; - } - - const steps = Array.from(timings.entries()) - .sort((first, second) => second[1] - first[1]) - .map(([step, stepDurationMs]) => `${step}=${stepDurationMs}ms`) - .join(' '); - - this.logger.warn( - `[slowHandle] duration=${durationMs}ms project=${task.projectId} type=${type} ` + - `payloadSize=${payloadSize}b deltaSize=${deltaSize}b title="${task.payload?.title}" steps="${steps}"` - ); - } - /** * Internal task handling function * * @param task - event to handle */ private async handleInternal(task: GroupWorkerTask): Promise { - const handleStartedAt = Date.now(); - const stepTimings = new Map(); - const taskPayloadSize = await this.measureStep('payloadSize', stepTimings, () => { + const session = this.slowHandleDiagnostics.startSession(); + const taskPayloadSize = await session.measureStep('payloadSize', () => { return Buffer.byteLength(JSON.stringify(task.payload)); }); const handledTasksCount = ++this.handledTasksCount; @@ -263,12 +209,12 @@ export default class GrouperWorker extends Worker { }; } - let uniqueEventHash = await this.measureStep('hash', stepTimings, () => this.getUniqueEventHash(task)); + let uniqueEventHash = await session.measureStep('hash', () => this.getUniqueEventHash(task)); let existedEvent: GroupedEventDBScheme; let repetitionId = null; let incrementDailyAffectedUsers = false; - await this.measureStep('preprocess', stepTimings, () => { + await session.measureStep('preprocess', () => { /** * Trim source code lines to prevent memory leaks */ @@ -283,7 +229,7 @@ export default class GrouperWorker extends Worker { /** * Find similar events by grouping pattern */ - const similarEvent = await this.measureStep('findSimilarEvent', stepTimings, () => { + const similarEvent = await session.measureStep('findSimilarEvent', () => { return this.findSimilarEvent(task.projectId, task.payload.title); }); @@ -303,7 +249,7 @@ export default class GrouperWorker extends Worker { /** * Find event by group hash. */ - existedEvent = await this.measureStep('getEvent', stepTimings, () => { + existedEvent = await session.measureStep('getEvent', () => { return this.getEvent(task.projectId, uniqueEventHash); }); } @@ -324,7 +270,7 @@ export default class GrouperWorker extends Worker { /** * Insert new event */ - await this.measureStep('saveNewEvent', stepTimings, () => { + await session.measureStep('saveNewEvent', () => { return this.saveEvent(task.projectId, { groupHash: uniqueEventHash, totalCount: 1, @@ -363,8 +309,8 @@ export default class GrouperWorker extends Worker { throw e; } } else { - const [incrementAffectedUsers, shouldIncrementDailyAffectedUsers] = await this.measureStep('affectedUsers', stepTimings, () => { - return this.shouldIncrementAffectedUsers(task, existedEvent, stepTimings); + const [incrementAffectedUsers, shouldIncrementDailyAffectedUsers] = await session.measureStep('affectedUsers', () => { + return this.shouldIncrementAffectedUsers(task, existedEvent, session); }); incrementDailyAffectedUsers = shouldIncrementDailyAffectedUsers; @@ -372,7 +318,7 @@ export default class GrouperWorker extends Worker { /** * Increment existed task's counter */ - await this.measureStep('incrementCounter', stepTimings, () => { + await session.measureStep('incrementCounter', () => { return this.incrementEventCounterAndAffectedUsers(task.projectId, { groupHash: uniqueEventHash, }, incrementAffectedUsers); @@ -381,7 +327,7 @@ export default class GrouperWorker extends Worker { /** * Decode existed event to calculate diffs correctly */ - await this.measureStep('decodeEvent', stepTimings, () => { + await session.measureStep('decodeEvent', () => { decodeUnsafeFields(existedEvent); }); @@ -395,7 +341,7 @@ export default class GrouperWorker extends Worker { /** * Calculate delta between original event and repetition */ - delta = await this.measureStep('computeDelta', stepTimings, () => { + delta = await session.measureStep('computeDelta', () => { return computeDelta(existedEvent.payload, task.payload); }); } catch (e) { @@ -417,7 +363,7 @@ export default class GrouperWorker extends Worker { timestamp: task.timestamp, } as RepetitionDBScheme; - repetitionId = await this.measureStep('saveRepetition', stepTimings, () => { + repetitionId = await session.measureStep('saveRepetition', () => { return this.saveRepetition(task.projectId, newRepetition); }); @@ -436,7 +382,7 @@ export default class GrouperWorker extends Worker { /** * Store events counter by days */ - await this.measureStep('saveDailyEvents', stepTimings, () => { + await session.measureStep('saveDailyEvents', () => { return this.saveDailyEvents( task.projectId, uniqueEventHash, @@ -461,7 +407,7 @@ export default class GrouperWorker extends Worker { const isIgnored = isFirstOccurrence ? false : !!existedEvent?.marks?.ignored; if (!isIgnored) { - await this.measureStep('enqueueNotifier', stepTimings, () => { + await session.measureStep('enqueueNotifier', () => { return this.addTask(WorkerNames.NOTIFIER, { projectId: task.projectId, event: { @@ -475,11 +421,17 @@ export default class GrouperWorker extends Worker { } } - await this.measureStep('recordProjectMetrics', stepTimings, () => { + await session.measureStep('recordProjectMetrics', () => { return this.recordProjectMetrics(task.projectId, 'events-accepted'); }); - this.logSlowHandle(handleStartedAt, stepTimings, task, eventType, taskPayloadSize, deltaSize); + session.logIfSlow({ + projectId: task.projectId, + title: task.payload?.title, + type: eventType, + payloadSize: taskPayloadSize, + deltaSize, + }); } /** @@ -683,13 +635,13 @@ export default class GrouperWorker extends Worker { * * @param task - worker task to process * @param existedEvent - original event to get its user - * @param stepTimings - current handle timings. + * @param session - current slow handle diagnostics session. * @returns {[boolean, boolean]} - whether to increment affected users for the repetition and the daily aggregation */ private async shouldIncrementAffectedUsers( task: GroupWorkerTask, existedEvent: GroupedEventDBScheme, - stepTimings: Map + session: SlowHandleSession ): Promise<[boolean, boolean]> { const eventUser = task.payload.user; @@ -782,7 +734,7 @@ export default class GrouperWorker extends Worker { /** * Check Redis lock - if locked, don't increment either counter */ - const [isEventLocked, isDailyEventLocked] = await this.measureStep('affectedUsersRedisLocks', stepTimings, async () => { + const [isEventLocked, isDailyEventLocked] = await session.measureStep('affectedUsersRedisLocks', async () => { return [ await this.redis.checkOrSetlockEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id), await this.redis.checkOrSetlockDailyEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id, eventMidnight), diff --git a/workers/grouper/src/metrics/slowHandleDiagnostics.ts b/workers/grouper/src/metrics/slowHandleDiagnostics.ts new file mode 100644 index 00000000..1d74d8c4 --- /dev/null +++ b/workers/grouper/src/metrics/slowHandleDiagnostics.ts @@ -0,0 +1,110 @@ +import type { GrouperDiagnosticsConfig } from './config'; +import type GrouperMetrics from './grouperMetrics'; +import type { GrouperStep } from './grouperMetrics'; + +interface LoggerLike { + warn(message: string): void; +} + +/** + * Context describing the handled task for slow handle log line. + */ +export interface SlowHandleContext { + projectId: string; + title?: string; + type: 'new' | 'repeated'; + payloadSize: number; + deltaSize: number; +} + +/** + * Per-handle diagnostics session: owns its own start timestamp and step timings, + * measures Grouper handle steps and emits a slow handle warning on demand. + */ +export class SlowHandleSession { + private readonly startedAt: number = Date.now(); + private readonly timings: Map = new Map(); + private readonly logger: LoggerLike; + private readonly metrics: GrouperMetrics; + private readonly config: GrouperDiagnosticsConfig; + + /** + * @param logger - logger instance. + * @param metrics - Grouper metrics facade used to record step duration. + * @param config - slow handle diagnostics thresholds. + */ + constructor(logger: LoggerLike, metrics: GrouperMetrics, config: GrouperDiagnosticsConfig) { + this.logger = logger; + this.metrics = metrics; + this.config = config; + } + + /** + * Measure a Grouper handle step and accumulate its duration in the session. + * + * @param step - step name. + * @param callback - measured callback. + */ + public async measureStep(step: GrouperStep, callback: () => Promise | T): Promise { + const stepStartedAt = Date.now(); + + try { + return await this.metrics.observeStepDuration(step, callback); + } finally { + const durationMs = Date.now() - stepStartedAt; + const previousDurationMs = this.timings.get(step) || 0; + + this.timings.set(step, previousDurationMs + durationMs); + } + } + + /** + * Log slow handle breakdown if total session duration exceeds the warn threshold. + * + * @param context - handled task context. + */ + public logIfSlow(context: SlowHandleContext): void { + const durationMs = Date.now() - this.startedAt; + + if (durationMs < this.config.slowHandleWarnMs) { + return; + } + + const steps = Array.from(this.timings.entries()) + .sort((first, second) => second[1] - first[1]) + .map(([step, stepDurationMs]) => `${step}=${stepDurationMs}ms`) + .join(' '); + + this.logger.warn( + `[slowHandle] duration=${durationMs}ms project=${context.projectId} type=${context.type} ` + + `payloadSize=${context.payloadSize}b deltaSize=${context.deltaSize}b title="${context.title}" steps="${steps}"` + ); + } +} + +/** + * Factory for per-handle slow handle diagnostics sessions. + */ +export default class SlowHandleDiagnostics { + private readonly logger: LoggerLike; + private readonly metrics: GrouperMetrics; + private readonly config: GrouperDiagnosticsConfig; + + /** + * @param logger - logger instance. + * @param metrics - Grouper metrics facade used to record step duration. + * @param config - slow handle diagnostics thresholds. + */ + constructor(logger: LoggerLike, metrics: GrouperMetrics, config: GrouperDiagnosticsConfig) { + this.logger = logger; + this.metrics = metrics; + this.config = config; + } + + /** + * Start a new per-handle diagnostics session. + */ + public startSession(): SlowHandleSession { + return new SlowHandleSession(this.logger, this.metrics, this.config); + } +}