Skip to content
113 changes: 113 additions & 0 deletions services/apps/cron_service/src/jobs/incomingWebhooksCheck.job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import CronTime from 'cron-time-generator'

import { IS_PROD_ENV } from '@crowd/common'
import { IntegrationStreamWorkerEmitter } from '@crowd/common_services'
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
import { QUEUE_CONFIG, getKafkaClient, getKafkaMessageCounts } from '@crowd/queue'
import { KafkaQueueService } from '@crowd/queue/src/vendors/kafka/client'
import { WebhookState } from '@crowd/types'

import { IJobDefinition } from '../types'

// Kafka topic the integration-stream-worker consumes in production.
const TOPIC = 'integration-stream-worker-high-production'
// Consumer group whose committed offsets are compared against the topic head offsets.
const GROUP_ID = 'integration-stream-worker-high-production'
// Backlog threshold: when the queue already holds this many unconsumed messages,
// the job skips entirely rather than piling more work onto the worker.
const MAX_UNCONSUMED = 50000

const job: IJobDefinition = {
name: 'incoming-webhooks-check',
cronTime: CronTime.everyDay(),
timeout: 30 * 60, // 30 minutes
enabled: async () => IS_PROD_ENV,
process: async (ctx) => {
const kafkaClient = getKafkaClient(QUEUE_CONFIG())
const admin = kafkaClient.admin()
await admin.connect()

let counts: { total: number; consumed: number; unconsumed: number }
try {
counts = await getKafkaMessageCounts(ctx.log, admin, TOPIC, GROUP_ID)
} finally {
await admin.disconnect()
}

if (counts.unconsumed >= MAX_UNCONSUMED) {
ctx.log.info(
`Integration stream worker queue has ${counts.unconsumed} unconsumed messages, skipping!`,
)
return
}

const dbConnection = await getDbConnection(WRITE_DB_CONFIG())

// Clean up orphaned webhooks whose integration was deleted (hard or soft).
// incomingWebhooks has no FK constraint on integrationId so these accumulate silently.
const deleted = await dbConnection.result(
`
delete from "incomingWebhooks" iw
where not exists (
select 1 from integrations i
where i.id = iw."integrationId"
and i."deletedAt" is null
)
`,
)
if (deleted.rowCount > 0) {
ctx.log.info(
`Deleted ${deleted.rowCount} orphaned webhooks with missing or deleted integrations!`,
)
}

const pendingCount = (
await dbConnection.one(
`
select count(*)::int as count
from "incomingWebhooks" iw
join integrations i on iw."integrationId" = i.id and i."deletedAt" is null
where iw.state = $(state)
and iw."createdAt" < now() - interval '1 day'
`,
{ state: WebhookState.PENDING },
)
).count

if (pendingCount <= counts.unconsumed) {
ctx.log.info(`All ${pendingCount} stuck pending webhooks are already in the queue, skipping!`)
return
}

const webhooks = await dbConnection.any<{ id: string }>(
`
select iw.id
from "incomingWebhooks" iw
join integrations i on iw."integrationId" = i.id and i."deletedAt" is null
where iw.state = $(state)
and iw."createdAt" < now() - interval '1 day'
order by iw."createdAt" asc
limit 10000
`,
{ state: WebhookState.PENDING },
)

if (webhooks.length === 0) {
ctx.log.info('No stuck pending webhooks found!')
return
}

ctx.log.info(
`Found ${webhooks.length} of ${pendingCount} stuck pending webhooks, re-triggering!`,
)

const queueService = new KafkaQueueService(kafkaClient, ctx.log)
const emitter = new IntegrationStreamWorkerEmitter(queueService, ctx.log)
await emitter.init()

await emitter.triggerWebhookProcessingBatch(
webhooks.map((w) => w.id),
true,
)

ctx.log.info(`Re-triggered ${webhooks.length} stuck pending webhooks in total!`)
},
}

// Job definition picked up by the cron service's job registry.
export default job
140 changes: 49 additions & 91 deletions services/apps/cron_service/src/jobs/integrationResultsCheck.job.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import CronTime from 'cron-time-generator'

import { generateUUIDv1, partition } from '@crowd/common'
import { IS_PROD_ENV, generateUUIDv1, partition } from '@crowd/common'
import { DataSinkWorkerEmitter } from '@crowd/common_services'
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
import { Logger } from '@crowd/logging'
import { KafkaAdmin, QUEUE_CONFIG, getKafkaClient } from '@crowd/queue'
import { QUEUE_CONFIG, getKafkaClient, getKafkaMessageCounts } from '@crowd/queue'
import { KafkaQueueService } from '@crowd/queue/src/vendors/kafka/client'
import { DataSinkWorkerQueueMessageType, IntegrationResultState } from '@crowd/types'

Expand All @@ -14,6 +13,7 @@ const job: IJobDefinition = {
name: 'integration-results-check',
cronTime: CronTime.every(10).minutes(),
timeout: 30 * 60, // 30 minutes
enabled: async () => IS_PROD_ENV,
process: async (ctx) => {
const topic = 'data-sink-worker-normal-production'
const groupId = 'data-sink-worker-normal-production'
Expand All @@ -22,107 +22,65 @@ const job: IJobDefinition = {
const admin = kafkaClient.admin()
await admin.connect()

const counts = await getMessageCounts(ctx.log, admin, topic, groupId)
try {
const counts = await getKafkaMessageCounts(ctx.log, admin, topic, groupId)

// if we have less than 50k messages in the queue we can trigger 50k oldest results (we process between 100k and 300k results per hour on average)
if (counts.unconsumed < 50000) {
const dbConnection = await getDbConnection(WRITE_DB_CONFIG())
// if we have less than 50k messages in the queue we can trigger 50k oldest results (we process between 100k and 300k results per hour on average)
if (counts.unconsumed < 50000) {
const dbConnection = await getDbConnection(WRITE_DB_CONFIG())

// we check if we have more than unconsumed pending results so that we don't trigger just the ones in the queue :)
const count = (
await dbConnection.one(
`select count(*) as count from integration.results where state = '${IntegrationResultState.PENDING}'`,
)
).count
// we check if we have more than unconsumed pending results so that we don't trigger just the ones in the queue :)
const count = (
await dbConnection.one(
`select count(*) as count from integration.results where state = '${IntegrationResultState.PENDING}'`,
)
).count

if (count > counts.unconsumed) {
ctx.log.info(`We have ${count} pending results, triggering 100k oldest results!`)
if (count > counts.unconsumed) {
ctx.log.info(`We have ${count} pending results, triggering 100k oldest results!`)

const queueService = new KafkaQueueService(kafkaClient, ctx.log)
const dswEmitter = new DataSinkWorkerEmitter(queueService, ctx.log)
await dswEmitter.init()
const queueService = new KafkaQueueService(kafkaClient, ctx.log)
const dswEmitter = new DataSinkWorkerEmitter(queueService, ctx.log)
await dswEmitter.init()

const resultIds = (
await dbConnection.any(
`select id from integration.results where state = 'pending' order by "createdAt" desc limit 100000`,
)
).map((r) => r.id)

let triggered = 0

for (const batch of partition(resultIds, 10)) {
const messages = batch.map((resultId) => {
return {
payload: {
type: DataSinkWorkerQueueMessageType.PROCESS_INTEGRATION_RESULT,
resultId,
},
groupId: generateUUIDv1(),
deduplicationId: resultId,
}
})
const resultIds = (
await dbConnection.any(
`select id from integration.results where state = 'pending' order by "createdAt" desc limit 100000`,
)
).map((r) => r.id)

let triggered = 0

await dswEmitter.sendMessages(messages)
for (const batch of partition(resultIds, 10)) {
const messages = batch.map((resultId) => {
return {
payload: {
type: DataSinkWorkerQueueMessageType.PROCESS_INTEGRATION_RESULT,
resultId,
},
groupId: generateUUIDv1(),
deduplicationId: resultId,
}
})

triggered += batch.length
await dswEmitter.sendMessages(messages)

if (triggered % 1000 === 0) {
ctx.log.info(`Triggered ${triggered} results!`)
triggered += batch.length

if (triggered % 1000 === 0) {
ctx.log.info(`Triggered ${triggered} results!`)
}
}
}

ctx.log.info(`Triggered ${triggered} results in total!`)
ctx.log.info(`Triggered ${triggered} results in total!`)
}
} else {
ctx.log.info(`We have ${counts.unconsumed} unconsumed messages in the queue, skipping!`)
}
} else {
ctx.log.info(`We have ${counts.unconsumed} unconsumed messages in the queue, skipping!`)
} finally {
await admin.disconnect()
}
},
}

/**
 * Aggregates per-partition Kafka offsets into topic-wide message counts.
 *
 * For every partition the consumer group tracks, it compares the topic's
 * latest (high-water) offset against the group's committed offset:
 *   total      = sum of latest offsets
 *   consumed   = sum of committed offsets
 *   unconsumed = sum of (latest - committed), i.e. the backlog
 *
 * Partitions present in the group response but missing from the topic-offset
 * response are skipped. Errors are logged with context and rethrown.
 */
async function getMessageCounts(
  log: Logger,
  admin: KafkaAdmin,
  topic: string,
  groupId: string,
): Promise<{
  total: number
  consumed: number
  unconsumed: number
}> {
  try {
    const latestOffsets = await admin.fetchTopicOffsets(topic)
    const groupResponse = await admin.fetchOffsets({
      groupId: groupId,
      topics: [topic],
    })

    let total = 0
    let consumed = 0
    let unconsumed = 0

    for (const partitionOffset of groupResponse[0].partitions) {
      const latest = latestOffsets.find((p) => p.partition === partitionOffset.partition)
      if (!latest) {
        // No high-water mark reported for this partition — nothing to count.
        continue
      }
      const highWater = Number(latest.offset)
      const committed = Number(partitionOffset.offset)
      total += highWater
      consumed += committed
      unconsumed += highWater - committed
    }

    return {
      total,
      consumed,
      unconsumed,
    }
  } catch (err) {
    log.error(err, 'Failed to get message count!')
    throw err
  }
}

export default job
Loading
Loading