Skip to content
79 changes: 79 additions & 0 deletions services/apps/cron_service/src/jobs/incomingWebhooksCheck.job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import CronTime from 'cron-time-generator'

import { IS_PROD_ENV } from '@crowd/common'
import { IntegrationStreamWorkerEmitter } from '@crowd/common_services'
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
import { QUEUE_CONFIG, getKafkaClient, getKafkaMessageCounts } from '@crowd/queue'
import { KafkaQueueService } from '@crowd/queue/src/vendors/kafka/client'
import { WebhookState } from '@crowd/types'

import { IJobDefinition } from '../types'

const TOPIC = 'integration-stream-worker-high-production'
const GROUP_ID = 'integration-stream-worker-high-production'
Comment thread
cursor[bot] marked this conversation as resolved.
const MAX_UNCONSUMED = 50000

const job: IJobDefinition = {
name: 'incoming-webhooks-check',
cronTime: CronTime.everyDay(),
timeout: 30 * 60, // 30 minutes
enabled: async () => IS_PROD_ENV,
process: async (ctx) => {
const kafkaClient = getKafkaClient(QUEUE_CONFIG())
const admin = kafkaClient.admin()
await admin.connect()

const counts = await getKafkaMessageCounts(ctx.log, admin, TOPIC, GROUP_ID)

if (counts.unconsumed >= MAX_UNCONSUMED) {
Comment thread
themarolt marked this conversation as resolved.
ctx.log.info(
`Integration stream worker queue has ${counts.unconsumed} unconsumed messages, skipping!`,
)
return
}

const dbConnection = await getDbConnection(WRITE_DB_CONFIG())

const count = (
Comment thread
themarolt marked this conversation as resolved.
Outdated
await dbConnection.one(
`select count(*)::int as count from "incomingWebhooks" where state = $(state) and "createdAt" < now() - interval '1 day'`,
{ state: WebhookState.PENDING },
)
).count
Comment thread
themarolt marked this conversation as resolved.

if (count <= counts.unconsumed) {
ctx.log.info(`All ${count} stuck pending webhooks are already in the queue, skipping!`)
return
}

const webhooks = await dbConnection.any<{ id: string; platform: string }>(
`
select iw.id, i.platform
from "incomingWebhooks" iw
join integrations i on iw."integrationId" = i.id
where iw.state = $(state)
and iw."createdAt" < now() - interval '1 day'
order by iw."createdAt" asc
limit 10000
`,
{ state: WebhookState.PENDING },
)
Comment thread
cursor[bot] marked this conversation as resolved.

if (webhooks.length === 0) {
ctx.log.info('No stuck pending webhooks found!')
return
}

ctx.log.info(`Found ${webhooks.length} stuck pending webhooks, re-triggering!`)

const queueService = new KafkaQueueService(kafkaClient, ctx.log)
const emitter = new IntegrationStreamWorkerEmitter(queueService, ctx.log)
await emitter.init()

await emitter.triggerWebhookProcessingBatch(webhooks.map((w) => w.id))

ctx.log.info(`Re-triggered ${webhooks.length} stuck pending webhooks in total!`)
},
}

export default job
54 changes: 4 additions & 50 deletions services/apps/cron_service/src/jobs/integrationResultsCheck.job.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import CronTime from 'cron-time-generator'

import { generateUUIDv1, partition } from '@crowd/common'
import { IS_PROD_ENV, generateUUIDv1, partition } from '@crowd/common'
import { DataSinkWorkerEmitter } from '@crowd/common_services'
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
import { Logger } from '@crowd/logging'
import { KafkaAdmin, QUEUE_CONFIG, getKafkaClient } from '@crowd/queue'
import { QUEUE_CONFIG, getKafkaClient, getKafkaMessageCounts } from '@crowd/queue'
import { KafkaQueueService } from '@crowd/queue/src/vendors/kafka/client'
import { DataSinkWorkerQueueMessageType, IntegrationResultState } from '@crowd/types'

Expand All @@ -14,6 +13,7 @@ const job: IJobDefinition = {
name: 'integration-results-check',
cronTime: CronTime.every(10).minutes(),
timeout: 30 * 60, // 30 minutes
enabled: async () => IS_PROD_ENV,
process: async (ctx) => {
const topic = 'data-sink-worker-normal-production'
const groupId = 'data-sink-worker-normal-production'
Expand All @@ -22,7 +22,7 @@ const job: IJobDefinition = {
const admin = kafkaClient.admin()
await admin.connect()

const counts = await getMessageCounts(ctx.log, admin, topic, groupId)
const counts = await getKafkaMessageCounts(ctx.log, admin, topic, groupId)

// if we have less than 50k messages in the queue we can trigger 50k oldest results (we process between 100k and 300k results per hour on average)
if (counts.unconsumed < 50000) {
Expand Down Expand Up @@ -79,50 +79,4 @@ const job: IJobDefinition = {
},
}

/**
 * Reads the latest topic offsets and the consumer group's committed offsets
 * for a single topic and derives aggregate message counts across partitions.
 * Partitions present in the group response but absent from the topic offsets
 * are ignored. Errors are logged and re-thrown.
 */
async function getMessageCounts(
  log: Logger,
  admin: KafkaAdmin,
  topic: string,
  groupId: string,
): Promise<{
  total: number
  consumed: number
  unconsumed: number
}> {
  try {
    const latestOffsets = await admin.fetchTopicOffsets(topic)
    const groupOffsets = await admin.fetchOffsets({
      groupId: groupId,
      topics: [topic],
    })

    // Index the latest (end) offset of each partition for quick lookup.
    const latestByPartition = new Map()
    for (const partitionInfo of latestOffsets) {
      latestByPartition.set(partitionInfo.partition, Number(partitionInfo.offset))
    }

    let total = 0
    let consumed = 0
    let unconsumed = 0

    for (const committed of groupOffsets[0].partitions) {
      const latest = latestByPartition.get(committed.partition)
      if (latest !== undefined) {
        const committedOffset = Number(committed.offset)
        total += latest // latest offset == total messages ever produced
        consumed += committedOffset // group's committed position
        unconsumed += latest - committedOffset // lag
      }
    }

    return { total: total, consumed: consumed, unconsumed: unconsumed }
  } catch (err) {
    log.error(err, 'Failed to get message count!')
    throw err
  }
}

export default job
159 changes: 159 additions & 0 deletions services/apps/cron_service/src/jobs/integrationResultsReporting.job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import CronTime from 'cron-time-generator'

import { IS_DEV_ENV } from '@crowd/common'
import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
import {
SlackChannel,
SlackMessageSection,
SlackPersona,
sendSlackNotificationAsync,
} from '@crowd/slack'
import { IntegrationResultState } from '@crowd/types'

import { IJobDefinition } from '../types'

// One row of the per-state aggregation over integration.results.
interface IResultStateCount {
  state: string
  count: number
}

// One aggregated group of failed integration results, grouped by the
// error's message/location fields and enriched with platform and age info.
interface IErrorGroup {
  errorMessage: string // error->>'errorMessage' (or a placeholder when absent)
  location: string // error->>'location' (or a placeholder when absent)
  message: string // error->>'message' (or a placeholder when absent)
  count: number // number of results in this group
  avgRetries: number // average retry count, rounded to one decimal
  maxRetries: number // highest retry count seen in the group
  oldest: Date // earliest createdAt in the group
  newest: Date // latest updatedAt in the group
  platforms: string | null // distinct platforms, comma-separated; null when no integration joined
}

const job: IJobDefinition = {
name: 'integration-results-reporting',
cronTime: IS_DEV_ENV ? CronTime.everyMinute() : CronTime.everyDayAt(8, 30),
Comment thread
themarolt marked this conversation as resolved.
Outdated
timeout: 10 * 60, // 10 minutes
process: async (ctx) => {
ctx.log.info('Running integration-results-reporting job...')

const dbConnection = await getDbConnection(READ_DB_CONFIG(), 3, 0)

// Count results per state
const stateCounts = await dbConnection.any<IResultStateCount>(
`SELECT state, count(*)::int AS count FROM integration.results GROUP BY state ORDER BY count DESC`,
)

const countByState: Record<string, number> = {}
for (const row of stateCounts) {
countByState[row.state] = row.count
}

const pending = countByState[IntegrationResultState.PENDING] ?? 0
const processing = countByState[IntegrationResultState.PROCESSING] ?? 0
const processed = countByState[IntegrationResultState.PROCESSED] ?? 0
const delayed = countByState[IntegrationResultState.DELAYED] ?? 0
const errorCount = countByState[IntegrationResultState.ERROR] ?? 0
const total = pending + processing + processed + delayed + errorCount

// How many delayed results are overdue (i.e. should already be processed)
const overdueDelayed = (
await dbConnection.one<{ count: number }>(
`SELECT count(*)::int AS count FROM integration.results WHERE state = 'delayed' AND "delayedUntil" < now()`,
)
).count

// Break down errors by errorMessage + location, enriched with platform info
const errorGroups = await dbConnection.any<IErrorGroup>(
`
SELECT
COALESCE(r.error->>'errorMessage', '[no errorMessage]') AS "errorMessage",
COALESCE(r.error->>'location', '[no location]') AS location,
COALESCE(r.error->>'message', '[no message]') AS message,
count(*)::int AS count,
round(avg(r.retries), 1)::float AS "avgRetries",
max(r.retries)::int AS "maxRetries",
min(r."createdAt") AS oldest,
max(r."updatedAt") AS newest,
string_agg(DISTINCT i.platform, ', ' ORDER BY i.platform) AS platforms
FROM integration.results r
LEFT JOIN integrations i ON i.id = r."integrationId"
WHERE r.state = 'error'
GROUP BY
r.error->>'errorMessage',
r.error->>'location',
r.error->>'message'
ORDER BY count DESC
LIMIT 20
`,
)

const sections: SlackMessageSection[] = []

sections.push({
title: 'Integration Results Summary',
text: [
`*Total:* ${total.toLocaleString()}`,
'',
`⏳ Pending: *${pending.toLocaleString()}*`,
`⚙️ Processing: *${processing.toLocaleString()}*`,
`✅ Processed: *${processed.toLocaleString()}*`,
`🕐 Delayed: *${delayed.toLocaleString()}*${overdueDelayed > 0 ? ` (${overdueDelayed.toLocaleString()} overdue)` : ''}`,
`❌ Error: *${errorCount.toLocaleString()}*`,
].join('\n'),
})

if (errorCount > 0 && errorGroups.length > 0) {
const lines: string[] = [
`Top ${errorGroups.length} error group${errorGroups.length !== 1 ? 's' : ''} out of *${errorCount.toLocaleString()}* total errors:`,
'',
]

for (const group of errorGroups) {
const oldestHoursAgo = Math.round(
(Date.now() - new Date(group.oldest).getTime()) / 3_600_000,
)
const newestHoursAgo = Math.round(
(Date.now() - new Date(group.newest).getTime()) / 3_600_000,
)
const ageLabel =
oldestHoursAgo === newestHoursAgo
? formatHoursAgo(oldestHoursAgo)
: `${formatHoursAgo(newestHoursAgo)} – ${formatHoursAgo(oldestHoursAgo)}`

lines.push(
`• *${group.count}x* \`${group.errorMessage}\``,
` _Location:_ \`${group.location}\` | _retries avg/max:_ ${group.avgRetries}/${group.maxRetries}${group.platforms ? ` | _platforms:_ \`${group.platforms}\`` : ''}`,
` _Age:_ ${ageLabel}`,
` _Detail:_ ${group.message}`,
'',
)
}

sections.push({
title: `Error Breakdown (top ${errorGroups.length})`,
text: lines.join('\n'),
})
}

const persona = errorCount > 0 ? SlackPersona.WARNING_PROPAGATOR : SlackPersona.INFO_NOTIFIER

await sendSlackNotificationAsync(
SlackChannel.CDP_INTEGRATIONS_ALERTS,
persona,
'Integration Results Daily Report',
sections,
)

ctx.log.info(
`Integration results report sent: pending=${pending}, delayed=${delayed} (${overdueDelayed} overdue), errors=${errorCount}`,
)
},
}

/**
 * Renders an hour count as a compact relative-time label:
 * under 1h → "just now", under 24h → "Nh ago", otherwise rounded whole days.
 */
function formatHoursAgo(hours: number): string {
  if (hours >= 24) {
    const days = Math.round(hours / 24)
    return `${days}d ago`
  }
  return hours >= 1 ? `${hours}h ago` : 'just now'
}

export default job
Loading
Loading