From 4b04b1525186778307135c746656957cbd6dee7e Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Tue, 9 Jun 2026 17:15:01 +0100 Subject: [PATCH 1/2] feat: extend enricher to cover extra fields Signed-off-by: Mouad BANI --- .../V1780996561__repo_activity_snapshot.sql | 30 ++ .../src/enricher/computeMedians.ts | 105 +++++ .../src/enricher/fetchActivitySnapshot.ts | 385 ++++++++++++++++++ .../src/enricher/fetchLightRepo.ts | 77 +++- .../src/enricher/runEnrichmentLoop.ts | 54 ++- .../packages_worker/src/enricher/types.ts | 24 ++ .../src/enricher/updateEnrichedRepos.ts | 12 +- .../enricher/updateRepoActivitySnapshot.ts | 74 ++++ 8 files changed, 748 insertions(+), 13 deletions(-) create mode 100644 backend/src/osspckgs/migrations/V1780996561__repo_activity_snapshot.sql create mode 100644 services/apps/packages_worker/src/enricher/computeMedians.ts create mode 100644 services/apps/packages_worker/src/enricher/fetchActivitySnapshot.ts create mode 100644 services/apps/packages_worker/src/enricher/updateRepoActivitySnapshot.ts diff --git a/backend/src/osspckgs/migrations/V1780996561__repo_activity_snapshot.sql b/backend/src/osspckgs/migrations/V1780996561__repo_activity_snapshot.sql new file mode 100644 index 0000000000..96549d7f2d --- /dev/null +++ b/backend/src/osspckgs/migrations/V1780996561__repo_activity_snapshot.sql @@ -0,0 +1,30 @@ +ALTER TABLE repos ADD COLUMN IF NOT EXISTS security_policy_enabled boolean; +ALTER TABLE repos ADD COLUMN IF NOT EXISTS security_file_enabled boolean; +ALTER TABLE repos ADD COLUMN IF NOT EXISTS snapshot_at timestamptz; + +CREATE TABLE IF NOT EXISTS repo_activity_snapshot ( + repo_id bigint PRIMARY KEY REFERENCES repos(id) ON DELETE CASCADE, + snapshot_at timestamptz NOT NULL, + window_months int NOT NULL DEFAULT 12, + -- commit activity + commits_last_12m int, + commits_last_6m int, + commits_prior_6m int, + -- PR health + prs_opened_last_12m int, + prs_merged_last_12m int, + prs_closed_unmerged_12m int, + pr_median_time_to_merge_hours int, + pr_median_time_to_first_response_hours int, + -- issue health + issues_opened_last_12m int, + issues_closed_last_12m int, + issues_opened_last_6m int, + issues_opened_prior_6m int, + issues_open_now int, + issue_median_time_to_close_hours int, + issue_median_time_to_first_response_hours int +); + +CREATE INDEX IF NOT EXISTS repo_activity_snapshot_snapshot_at_idx + ON repo_activity_snapshot (snapshot_at); diff --git a/services/apps/packages_worker/src/enricher/computeMedians.ts b/services/apps/packages_worker/src/enricher/computeMedians.ts new file mode 100644 index 0000000000..a5b7cdd7e5 --- /dev/null +++ b/services/apps/packages_worker/src/enricher/computeMedians.ts @@ -0,0 +1,105 @@ +const MS_PER_HOUR = 1000 * 60 * 60 + +function median(values: number[]): number | null { + if (values.length === 0) return null + const sorted = [...values].sort((a, b) => a - b) + const middleIndex = Math.floor(sorted.length / 2) + return sorted.length % 2 === 0 + ? (sorted[middleIndex - 1] + sorted[middleIndex]) / 2 + : sorted[middleIndex] +} + +function hoursBetween(startDate: string, endDate: string): number { + return (new Date(endDate).getTime() - new Date(startDate).getTime()) / MS_PER_HOUR +} + +function toIntHours(hours: number | null): number | null { + return hours != null ? Math.round(hours) : null +} + +// Shapes mirror exactly what GitHub GraphQL returns for comments/reviews nodes +export interface ResponseNode { + createdAt: string + author: { login: string } | null +} + +export interface PrNode { + createdAt: string + mergedAt: string | null + author: { login: string } | null + comments: { nodes: ResponseNode[] } + reviews: { nodes: ResponseNode[] } +} + +export interface IssueNode { + createdAt: string + closedAt: string | null + author: { login: string } | null + comments: { nodes: ResponseNode[] } +} + +function firstNonAuthorResponseHours( + itemCreatedAt: string, + authorLogin: string | null, + responses: ResponseNode[], +): number | null { + const firstResponse = responses.find( + (response) => response.author?.login && response.author.login !== authorLogin, + ) + return firstResponse ? hoursBetween(itemCreatedAt, firstResponse.createdAt) : null +} + +export function computePrMedians(prs: PrNode[]): { + medianTimeToMergeHours: number | null + medianTimeToFirstResponseHours: number | null +} { + const mergeHours: number[] = [] + const firstResponseHours: number[] = [] + + for (const pr of prs) { + if (pr.mergedAt != null) { + mergeHours.push(hoursBetween(pr.createdAt, pr.mergedAt)) + } + + const allResponses = [...pr.comments.nodes, ...pr.reviews.nodes].sort( + (left, right) => new Date(left.createdAt).getTime() - new Date(right.createdAt).getTime(), + ) + const responseHours = firstNonAuthorResponseHours( + pr.createdAt, + pr.author?.login ?? null, + allResponses, + ) + if (responseHours != null) firstResponseHours.push(responseHours) + } + + return { + medianTimeToMergeHours: toIntHours(median(mergeHours)), + medianTimeToFirstResponseHours: toIntHours(median(firstResponseHours)), + } +} + +export function computeIssueMedians(issues: IssueNode[]): { + medianTimeToCloseHours: number | null + medianTimeToFirstResponseHours: number | null +} { + const closeHours: number[] = [] + const firstResponseHours: number[] = [] + + for (const issue of issues) { + if (issue.closedAt != null) { + closeHours.push(hoursBetween(issue.createdAt, issue.closedAt)) + } + + const responseHours = firstNonAuthorResponseHours( + issue.createdAt, + issue.author?.login ?? null, + issue.comments.nodes, + ) + if (responseHours != null) firstResponseHours.push(responseHours) + } + + return { + medianTimeToCloseHours: toIntHours(median(closeHours)), + medianTimeToFirstResponseHours: toIntHours(median(firstResponseHours)), + } +} diff --git a/services/apps/packages_worker/src/enricher/fetchActivitySnapshot.ts b/services/apps/packages_worker/src/enricher/fetchActivitySnapshot.ts new file mode 100644 index 0000000000..fd68aab376 --- /dev/null +++ b/services/apps/packages_worker/src/enricher/fetchActivitySnapshot.ts @@ -0,0 +1,385 @@ +import { getServiceChildLogger } from '@crowd/logging' + +import { IssueNode, PrNode, computeIssueMedians, computePrMedians } from './computeMedians' +import { FetchError, RepoActivitySnapshot } from './types' + +const log = getServiceChildLogger('fetch-activity-snapshot') + +const GITHUB_GRAPHQL_URL = 'https://api.github.com/graphql' +const SNAPSHOT_WINDOW_MONTHS = 12 +const PAGE_SIZE = 100 +const RESPONSES_PER_NODE = 50 + +interface GraphqlError { + type?: string + message?: string +} + +interface GraphqlResponse { + data?: T + errors?: GraphqlError[] +} + +async function graphqlRequest( + query: string, + variables: Record, + token: string, + timeoutMs: number, +): Promise { + const controller = new AbortController() + const timeoutId = setTimeout(() => controller.abort(), timeoutMs) + + let response: Response + try { + response = await fetch(GITHUB_GRAPHQL_URL, { + method: 'POST', + headers: { Authorization: `bearer ${token}`, 'Content-Type': 'application/json' }, + body: JSON.stringify({ query, variables }), + signal: controller.signal, + }) + } catch (err) { + throw new FetchError('TRANSIENT', `Network error: ${(err as Error).message}`) + } finally { + clearTimeout(timeoutId) + } + + const resetSec = parseInt(response.headers.get('x-ratelimit-reset') ?? '0', 10) + const resetMs = resetSec ? resetSec * 1000 + 5_000 : Date.now() + 65_000 + + if (response.status === 401) throw new FetchError('AUTH', '401 Unauthorized') + if (response.status === 403) { + const body = await response.text() + if (body.toLowerCase().includes('rate limit')) + throw new FetchError('RATE_LIMIT', 'Rate limited', resetMs) + throw new FetchError('AUTH', '403 Forbidden') + } + if (response.status === 404) throw new FetchError('NOT_FOUND', '404 Not Found') + if (response.status >= 500) throw new FetchError('TRANSIENT', `${response.status} Server Error`) + + const json = (await response.json()) as GraphqlResponse + if (json.errors?.length) { + const error = json.errors[0] + if (error.type === 'RATE_LIMITED' || error.message?.toLowerCase().includes('rate limit')) + throw new FetchError('RATE_LIMIT', 'RATE_LIMITED', resetMs) + if (error.type === 'NOT_FOUND') throw new FetchError('NOT_FOUND', 'NOT_FOUND') + if (error.message?.toLowerCase().includes('ip allow list')) + throw new FetchError('AUTH', 'IP allowlist blocked') + throw new FetchError('TRANSIENT', `GraphQL error: ${error.message ?? error.type}`) + } + + return json.data as T +} + +function toDateString(date: Date): string { + return date.toISOString().slice(0, 10) +} + +function buildDateWindows(): { + since12m: Date + since12mIso: string + since6mIso: string + since12mDate: string + since6mDate: string +} { + const now = new Date() + + const since12m = new Date(now) + since12m.setMonth(now.getMonth() - SNAPSHOT_WINDOW_MONTHS) + + const since6m = new Date(now) + since6m.setMonth(now.getMonth() - SNAPSHOT_WINDOW_MONTHS / 2) + + return { + since12m, + since12mIso: since12m.toISOString(), + since6mIso: since6m.toISOString(), + since12mDate: toDateString(since12m), + since6mDate: toDateString(since6m), + } +} + +interface RateLimit { + cost: number +} + +interface IssueCount { + issueCount: number +} + +interface CommitHistory { + totalCount: number +} + +interface PageInfo { + hasNextPage: boolean + endCursor?: string +} + +interface SummaryQueryResult { + rateLimit: RateLimit + repository: { + defaultBranchRef?: { + target?: { + commits12m?: CommitHistory + commits6m?: CommitHistory + commitsPrior6m?: CommitHistory + } + } + } + prsOpened: IssueCount + prsMerged: IssueCount + prsClosedUnmerged: IssueCount + prsOpenNow: IssueCount + issuesOpened: IssueCount + issuesClosed: IssueCount + issuesOpened6m: IssueCount + issuesOpenNow: IssueCount +} + +interface PrPageQueryResult { + rateLimit: RateLimit + repository: { + pullRequests: { + pageInfo: PageInfo + nodes: PrNode[] + } + } +} + +interface IssuePageQueryResult { + rateLimit: RateLimit + repository: { + issues: { + pageInfo: PageInfo + nodes: IssueNode[] + } + } +} + +const SUMMARY_QUERY = ` + query( + $owner: String!, $name: String!, + $since12m: GitTimestamp!, $since6m: GitTimestamp!, + $searchPrsOpened: String!, $searchPrsMerged: String!, + $searchPrsClosedUnmerged: String!, $searchPrsOpenNow: String!, + $searchIssuesOpened: String!, $searchIssuesClosed: String!, + $searchIssuesOpened6m: String!, $searchIssuesOpenNow: String! + ) { + rateLimit { cost } + repository(owner: $owner, name: $name) { + defaultBranchRef { + target { + ... on Commit { + commits12m: history(since: $since12m) { totalCount } + commits6m: history(since: $since6m) { totalCount } + commitsPrior6m: history(since: $since12m, until: $since6m) { totalCount } + } + } + } + } + prsOpened: search(query: $searchPrsOpened, type: ISSUE) { issueCount } + prsMerged: search(query: $searchPrsMerged, type: ISSUE) { issueCount } + prsClosedUnmerged: search(query: $searchPrsClosedUnmerged, type: ISSUE) { issueCount } + prsOpenNow: search(query: $searchPrsOpenNow, type: ISSUE) { issueCount } + issuesOpened: search(query: $searchIssuesOpened, type: ISSUE) { issueCount } + issuesClosed: search(query: $searchIssuesClosed, type: ISSUE) { issueCount } + issuesOpened6m: search(query: $searchIssuesOpened6m, type: ISSUE) { issueCount } + issuesOpenNow: search(query: $searchIssuesOpenNow, type: ISSUE) { issueCount } + } +` + +const PR_PAGE_QUERY = ` + query($owner: String!, $name: String!, $cursor: String) { + rateLimit { cost } + repository(owner: $owner, name: $name) { + pullRequests(first: ${PAGE_SIZE}, orderBy: { field: CREATED_AT, direction: DESC }, after: $cursor) { + pageInfo { hasNextPage endCursor } + nodes { + createdAt + mergedAt + author { login } + comments(first: ${RESPONSES_PER_NODE}) { nodes { createdAt author { login } } } + reviews(first: ${RESPONSES_PER_NODE}) { nodes { createdAt author { login } } } + } + } + } + } +` + +async function pagePrs( + owner: string, + name: string, + token: string, + timeoutMs: number, + windowStart: Date, +): Promise<{ nodes: PrNode[]; rateLimitCost: number }> { + const nodes: PrNode[] = [] + let rateLimitCost = 0 + let cursor: string | undefined + + for (;;) { + const variables: Record = { owner, name } + if (cursor) variables.cursor = cursor + + const data = await graphqlRequest(PR_PAGE_QUERY, variables, token, timeoutMs) + rateLimitCost += data.rateLimit.cost + + const connection = data.repository.pullRequests + let reachedWindowBoundary = false + + for (const node of connection.nodes) { + if (new Date(node.createdAt) < windowStart) { + reachedWindowBoundary = true + break + } + nodes.push(node) + } + + if (reachedWindowBoundary || !connection.pageInfo.hasNextPage) break + cursor = connection.pageInfo.endCursor + } + + return { nodes, rateLimitCost } +} + +const ISSUE_PAGE_QUERY = ` + query($owner: String!, $name: String!, $cursor: String) { + rateLimit { cost } + repository(owner: $owner, name: $name) { + issues(first: ${PAGE_SIZE}, orderBy: { field: CREATED_AT, direction: DESC }, after: $cursor) { + pageInfo { hasNextPage endCursor } + nodes { + createdAt + closedAt + author { login } + comments(first: ${RESPONSES_PER_NODE}) { nodes { createdAt author { login } } } + } + } + } + } +` + +async function pageIssues( + owner: string, + name: string, + token: string, + timeoutMs: number, + windowStart: Date, +): Promise<{ nodes: IssueNode[]; rateLimitCost: number }> { + const nodes: IssueNode[] = [] + let rateLimitCost = 0 + let cursor: string | undefined + + for (;;) { + const variables: Record = { owner, name } + if (cursor) variables.cursor = cursor + + const data = await graphqlRequest( + ISSUE_PAGE_QUERY, + variables, + token, + timeoutMs, + ) + rateLimitCost += data.rateLimit.cost + + const connection = data.repository.issues + let reachedWindowBoundary = false + + for (const node of connection.nodes) { + if (new Date(node.createdAt) < windowStart) { + reachedWindowBoundary = true + break + } + nodes.push(node) + } + + if (reachedWindowBoundary || !connection.pageInfo.hasNextPage) break + cursor = connection.pageInfo.endCursor + } + + return { nodes, rateLimitCost } +} + +export async function fetchActivitySnapshot( + repoId: string, + owner: string, + name: string, + token: string, + timeoutMs: number, +): Promise { + const { since12m, since12mIso, since6mIso, since12mDate, since6mDate } = buildDateWindows() + const repoFilter = `repo:${owner}/${name}` + + const summaryData = await graphqlRequest( + SUMMARY_QUERY, + { + owner, + name, + since12m: since12mIso, + since6m: since6mIso, + searchPrsOpened: `${repoFilter} is:pr created:>=${since12mDate}`, + searchPrsMerged: `${repoFilter} is:pr is:merged merged:>=${since12mDate}`, + searchPrsClosedUnmerged: `${repoFilter} is:pr is:unmerged is:closed created:>=${since12mDate}`, + searchPrsOpenNow: `${repoFilter} is:pr is:open`, + searchIssuesOpened: `${repoFilter} is:issue created:>=${since12mDate}`, + searchIssuesClosed: `${repoFilter} is:issue is:closed closed:>=${since12mDate}`, + searchIssuesOpened6m: `${repoFilter} is:issue created:>=${since6mDate}`, + searchIssuesOpenNow: `${repoFilter} is:issue is:open`, + }, + token, + timeoutMs, + ) + + let totalRateLimitCost = summaryData.rateLimit.cost + + const commitTarget = summaryData.repository.defaultBranchRef?.target + const commitsLast12m: number | null = commitTarget?.commits12m?.totalCount ?? null + const commitsLast6m: number | null = commitTarget?.commits6m?.totalCount ?? null + const commitsPrior6m: number | null = commitTarget?.commitsPrior6m?.totalCount ?? null + + log.debug({ owner, name, commitsLast12m, commitsLast6m, commitsPrior6m }, 'Commit totals fetched') + + const [prResult, issueResult] = await Promise.all([ + pagePrs(owner, name, token, timeoutMs, since12m), + pageIssues(owner, name, token, timeoutMs, since12m), + ]) + + totalRateLimitCost += prResult.rateLimitCost + issueResult.rateLimitCost + + log.debug( + { + owner, + name, + prCount: prResult.nodes.length, + issueCount: issueResult.nodes.length, + totalRateLimitCost, + }, + 'Snapshot paging complete', + ) + + const prMedians = computePrMedians(prResult.nodes) + const issueMedians = computeIssueMedians(issueResult.nodes) + + return { + repoId, + snapshotAt: new Date().toISOString(), + windowMonths: SNAPSHOT_WINDOW_MONTHS, + commitsLast12m, + commitsLast6m, + commitsPrior6m, + prsOpenedLast12m: summaryData.prsOpened.issueCount, + prsMergedLast12m: summaryData.prsMerged.issueCount, + prsClosedUnmerged12m: summaryData.prsClosedUnmerged.issueCount, + prMedianTimeToMergeHours: prMedians.medianTimeToMergeHours, + prMedianTimeToFirstResponseHours: prMedians.medianTimeToFirstResponseHours, + issuesOpenedLast12m: summaryData.issuesOpened.issueCount, + issuesClosedLast12m: summaryData.issuesClosed.issueCount, + issuesOpenedLast6m: summaryData.issuesOpened6m.issueCount, + issuesOpenedPrior6m: + summaryData.issuesOpened.issueCount - summaryData.issuesOpened6m.issueCount, + issuesOpenNow: summaryData.issuesOpenNow.issueCount, + issueMedianTimeToCloseHours: issueMedians.medianTimeToCloseHours, + issueMedianTimeToFirstResponseHours: issueMedians.medianTimeToFirstResponseHours, + rateLimitCost: totalRateLimitCost, + } +} diff --git a/services/apps/packages_worker/src/enricher/fetchLightRepo.ts b/services/apps/packages_worker/src/enricher/fetchLightRepo.ts index 1d9aa4878a..4d59f13cc6 100644 --- a/services/apps/packages_worker/src/enricher/fetchLightRepo.ts +++ b/services/apps/packages_worker/src/enricher/fetchLightRepo.ts @@ -1,6 +1,10 @@ +import { getServiceChildLogger } from '@crowd/logging' + import { FetchError, LightRepoResult } from './types' -const GRAPHQL_URL = 'https://api.github.com/graphql' +const log = getServiceChildLogger('fetch-light-repo') + +const GITHUB_API_URL = 'https://api.github.com' const REPO_QUERY = ` query($owner: String!, $name: String!) { @@ -18,6 +22,7 @@ const REPO_QUERY = ` isDisabled isFork createdAt + isSecurityPolicyEnabled } } ` @@ -28,6 +33,66 @@ export function parseGithubUrl(url: string): { owner: string; name: string } { return { owner: match[1], name: match[2] } } +// community/profile API doesn't reliably return files.security — use Contents API instead. +async function fetchSecurityFileEnabled( + url: string, + owner: string, + name: string, + token: string, + timeoutMs: number, +): Promise { + const headers = { Authorization: `bearer ${token}`, Accept: 'application/vnd.github+json' } + const check = async (path: string): Promise => { + const controller = new AbortController() + const timeoutId = setTimeout(() => controller.abort(), timeoutMs) + try { + const response = await fetch(`${GITHUB_API_URL}/repos/${owner}/${name}/contents/${path}`, { + headers, + signal: controller.signal, + }) + return response.status === 200 + } catch (err) { + log.warn({ url, path, err }, 'Security file check error') + return false + } finally { + clearTimeout(timeoutId) + } + } + + try { + const [root, dotGithub] = await Promise.all([ + check('SECURITY.md'), + check('.github/SECURITY.md'), + ]) + return root || dotGithub + } catch (err) { + log.warn({ url, err }, 'Security file check failed — securityFileEnabled will be null') + return null + } +} + +interface RepoGraphqlResponse { + data?: { + rateLimit: { limit: number; cost: number; remaining: number; resetAt: string } + repository: { + description: string | null + primaryLanguage: { name: string } | null + repositoryTopics: { nodes: Array<{ topic: { name: string } }> } + stargazerCount: number + forkCount: number + watchers: { totalCount: number } + issues: { totalCount: number } + pushedAt: string | null + isArchived: boolean + isDisabled: boolean + isFork: boolean + createdAt: string + isSecurityPolicyEnabled: boolean + } | null + } + errors?: Array<{ type?: string; message?: string }> +} + export async function fetchLightRepo( url: string, token: string, @@ -40,7 +105,7 @@ export async function fetchLightRepo( let response: Response try { - response = await fetch(GRAPHQL_URL, { + response = await fetch(`${GITHUB_API_URL}/graphql`, { method: 'POST', headers: { Authorization: `bearer ${token}`, @@ -70,8 +135,10 @@ export async function fetchLightRepo( if (response.status === 404) throw new FetchError('NOT_FOUND', `404 for ${url}`) if (response.status >= 500) throw new FetchError('TRANSIENT', `${response.status} for ${url}`) - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const json = (await response.json()) as any + const [json, securityFileEnabled] = await Promise.all([ + response.json() as Promise, + fetchSecurityFileEnabled(url, owner, name, token, timeoutMs), + ]) if (json.errors?.length) { const err = json.errors[0] @@ -105,6 +172,8 @@ export async function fetchLightRepo( disabled: repo.isDisabled ?? null, isFork: repo.isFork ?? null, createdAt: repo.createdAt ?? null, + securityPolicyEnabled: repo.isSecurityPolicyEnabled ?? null, + securityFileEnabled, rateLimit: json.data?.rateLimit ?? null, } } diff --git a/services/apps/packages_worker/src/enricher/runEnrichmentLoop.ts b/services/apps/packages_worker/src/enricher/runEnrichmentLoop.ts index d0950c4772..730d3370f3 100644 --- a/services/apps/packages_worker/src/enricher/runEnrichmentLoop.ts +++ b/services/apps/packages_worker/src/enricher/runEnrichmentLoop.ts @@ -3,10 +3,12 @@ import { getServiceChildLogger } from '@crowd/logging' import { getEnricherConfig } from '../config' +import { fetchActivitySnapshot } from './fetchActivitySnapshot' import { fetchLightRepo, parseGithubUrl } from './fetchLightRepo' import { GithubAppConfig, getInstallationToken } from './githubAppAuth' -import { FetchError, LightRepoResult } from './types' +import { FetchError, LightRepoResult, RepoActivitySnapshot } from './types' import { bulkUpdateEnrichedRepos, markReposSkipped } from './updateEnrichedRepos' +import { bulkUpsertRepoActivitySnapshot } from './updateRepoActivitySnapshot' const log = getServiceChildLogger('github-repos-enricher') @@ -88,6 +90,7 @@ async function fetchWithRetries( class WriteBuffer { private results: LightRepoResult[] = [] + private snapshots: RepoActivitySnapshot[] = [] private skipUrls: string[] = [] private lastFlushAt = Date.now() private flushing = false @@ -98,6 +101,10 @@ class WriteBuffer { this.results.push(result) } + addSnapshot(snapshot: RepoActivitySnapshot): void { + this.snapshots.push(snapshot) + } + addSkip(url: string): void { this.skipUrls.push(url) } @@ -113,13 +120,19 @@ class WriteBuffer { async flush(): Promise { const batch = [...this.results] + const snapshotBatch = [...this.snapshots] const skips = [...this.skipUrls] this.lastFlushAt = Date.now() this.flushing = true try { - await Promise.all([bulkUpdateEnrichedRepos(this.qx, batch), markReposSkipped(this.qx, skips)]) - // Clear only after both writes succeed — preserves items if the DB call throws + await Promise.all([ + bulkUpdateEnrichedRepos(this.qx, batch), + bulkUpsertRepoActivitySnapshot(this.qx, snapshotBatch), + markReposSkipped(this.qx, skips), + ]) + // Clear only after all writes succeed — preserves items if the DB call throws this.results.splice(0, batch.length) + this.snapshots.splice(0, snapshotBatch.length) this.skipUrls.splice(0, skips.length) } finally { this.flushing = false @@ -194,7 +207,6 @@ async function runStreamingPool( fillQueue() if (pendingFetch) await pendingFetch } - // Prefetch next batch when queue gets low if (queue.length < DB_FETCH_SIZE / 2 && !pendingFetch && !dbDone) fillQueue() return queue.shift() ?? null } @@ -209,13 +221,15 @@ async function runStreamingPool( const row = await nextRow() if (!row) break + let parsedUrl: { owner: string; name: string } try { - parseGithubUrl(row.url) + parsedUrl = parseGithubUrl(row.url) } catch { log.warn({ url: row.url }, 'Skipping non-GitHub URL') writeBuffer.addSkip(row.url) continue } + const { owner, name } = parsedUrl const { installationId, waitMs } = selectInstallation( installationIds, @@ -243,6 +257,36 @@ async function runStreamingPool( if (outcome.kind === 'success') { metrics.totalFetched++ writeBuffer.add(outcome.data) + + try { + const snapshot = await fetchActivitySnapshot( + row.id, + owner, + name, + token, + config.fetchTimeoutMs, + ) + metrics.totalRequests += snapshot.rateLimitCost + writeBuffer.addSnapshot(snapshot) + } catch (snapshotErr) { + if (snapshotErr instanceof FetchError && snapshotErr.kind === 'RATE_LIMIT') { + const resetAt = snapshotErr.resetAt ?? Date.now() + 60_000 + parkedUntil.set(installationId, resetAt) + log.warn( + { installationId, resetAt: new Date(resetAt).toISOString() }, + 'Snapshot rate limited — parking installation', + ) + } else { + log.warn( + { + url: row.url, + errKind: snapshotErr instanceof FetchError ? snapshotErr.kind : 'UNKNOWN', + errMsg: (snapshotErr as Error).message, + }, + 'Snapshot fetch failed — skipping snapshot', + ) + } + } } else if (outcome.kind === 'permanent') { writeBuffer.addSkip(row.url) } diff --git a/services/apps/packages_worker/src/enricher/types.ts b/services/apps/packages_worker/src/enricher/types.ts index cff8e22c5e..c0f82b2b56 100644 --- a/services/apps/packages_worker/src/enricher/types.ts +++ b/services/apps/packages_worker/src/enricher/types.ts @@ -15,6 +15,8 @@ export interface LightRepoResult { disabled: boolean | null isFork: boolean | null createdAt: string | null + securityPolicyEnabled: boolean | null + securityFileEnabled: boolean | null rateLimit: { limit: number cost: number @@ -23,6 +25,28 @@ export interface LightRepoResult { } | null } +export interface RepoActivitySnapshot { + repoId: string + snapshotAt: string + windowMonths: number + commitsLast12m: number | null + commitsLast6m: number | null + commitsPrior6m: number | null + prsOpenedLast12m: number | null + prsMergedLast12m: number | null + prsClosedUnmerged12m: number | null + prMedianTimeToMergeHours: number | null + prMedianTimeToFirstResponseHours: number | null + issuesOpenedLast12m: number | null + issuesClosedLast12m: number | null + issuesOpenedLast6m: number | null + issuesOpenedPrior6m: number | null + issuesOpenNow: number | null + issueMedianTimeToCloseHours: number | null + issueMedianTimeToFirstResponseHours: number | null + rateLimitCost: number +} + export type FetchErrorKind = 'RATE_LIMIT' | 'TRANSIENT' | 'NOT_FOUND' | 'AUTH' | 'MALFORMED' export class FetchError extends Error { diff --git a/services/apps/packages_worker/src/enricher/updateEnrichedRepos.ts b/services/apps/packages_worker/src/enricher/updateEnrichedRepos.ts index 6bfa8ee790..2982da144d 100644 --- a/services/apps/packages_worker/src/enricher/updateEnrichedRepos.ts +++ b/services/apps/packages_worker/src/enricher/updateEnrichedRepos.ts @@ -22,10 +22,12 @@ export async function bulkUpdateEnrichedRepos( forks = v.forks::int, watchers = v.watchers::int, open_issues = v.open_issues::int, - last_commit_at = NULLIF(v.last_commit_at, '')::timestamptz, - archived = v.archived::bool, - disabled = v.disabled::bool, - is_fork = v.is_fork::bool, + last_commit_at = NULLIF(v.last_commit_at, '')::timestamptz, + archived = v.archived::bool, + disabled = v.disabled::bool, + is_fork = v.is_fork::bool, + security_policy_enabled = v.security_policy_enabled::bool, + security_file_enabled = v.security_file_enabled::bool, host = COALESCE(r.host, v.host), owner = COALESCE(r.owner, v.owner), name = COALESCE(r.name, v.name), @@ -45,6 +47,8 @@ export async function bulkUpdateEnrichedRepos( j->>'archived' AS archived, j->>'disabled' AS disabled, j->>'isFork' AS is_fork, + j->>'securityPolicyEnabled' AS security_policy_enabled, + j->>'securityFileEnabled' AS security_file_enabled, j->>'host' AS host, j->>'owner' AS owner, j->>'name' AS name, diff --git a/services/apps/packages_worker/src/enricher/updateRepoActivitySnapshot.ts b/services/apps/packages_worker/src/enricher/updateRepoActivitySnapshot.ts new file mode 100644 index 0000000000..b610682012 --- /dev/null +++ b/services/apps/packages_worker/src/enricher/updateRepoActivitySnapshot.ts @@ -0,0 +1,74 @@ +import { QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor' + +import { RepoActivitySnapshot } from './types' + +export async function bulkUpsertRepoActivitySnapshot( + qx: QueryExecutor, + rows: RepoActivitySnapshot[], +): Promise { + if (rows.length === 0) return + + await qx.result( + ` + INSERT INTO repo_activity_snapshot ( + repo_id, + snapshot_at, + window_months, + commits_last_12m, + commits_last_6m, + commits_prior_6m, + prs_opened_last_12m, + prs_merged_last_12m, + prs_closed_unmerged_12m, + pr_median_time_to_merge_hours, + pr_median_time_to_first_response_hours, + issues_opened_last_12m, + issues_closed_last_12m, + issues_opened_last_6m, + issues_opened_prior_6m, + issues_open_now, + issue_median_time_to_close_hours, + issue_median_time_to_first_response_hours + ) + SELECT + (j->>'repoId')::bigint, + (j->>'snapshotAt')::timestamptz, + (j->>'windowMonths')::int, + (j->>'commitsLast12m')::int, + (j->>'commitsLast6m')::int, + (j->>'commitsPrior6m')::int, + (j->>'prsOpenedLast12m')::int, + (j->>'prsMergedLast12m')::int, + (j->>'prsClosedUnmerged12m')::int, + (j->>'prMedianTimeToMergeHours')::int, + (j->>'prMedianTimeToFirstResponseHours')::int, + (j->>'issuesOpenedLast12m')::int, + (j->>'issuesClosedLast12m')::int, + (j->>'issuesOpenedLast6m')::int, + (j->>'issuesOpenedPrior6m')::int, + (j->>'issuesOpenNow')::int, + (j->>'issueMedianTimeToCloseHours')::int, + (j->>'issueMedianTimeToFirstResponseHours')::int + FROM jsonb_array_elements($1::jsonb) j + ON CONFLICT (repo_id) DO UPDATE SET + snapshot_at = EXCLUDED.snapshot_at, + window_months = EXCLUDED.window_months, + commits_last_12m = EXCLUDED.commits_last_12m, + commits_last_6m = EXCLUDED.commits_last_6m, + commits_prior_6m = EXCLUDED.commits_prior_6m, + prs_opened_last_12m = EXCLUDED.prs_opened_last_12m, + prs_merged_last_12m = EXCLUDED.prs_merged_last_12m, + prs_closed_unmerged_12m = EXCLUDED.prs_closed_unmerged_12m, + pr_median_time_to_merge_hours = EXCLUDED.pr_median_time_to_merge_hours, + pr_median_time_to_first_response_hours = EXCLUDED.pr_median_time_to_first_response_hours, + issues_opened_last_12m = EXCLUDED.issues_opened_last_12m, + issues_closed_last_12m = EXCLUDED.issues_closed_last_12m, + issues_opened_last_6m = EXCLUDED.issues_opened_last_6m, + issues_opened_prior_6m = EXCLUDED.issues_opened_prior_6m, + issues_open_now = EXCLUDED.issues_open_now, + issue_median_time_to_close_hours = EXCLUDED.issue_median_time_to_close_hours, + issue_median_time_to_first_response_hours = EXCLUDED.issue_median_time_to_first_response_hours + `, + [JSON.stringify(rows)], + ) +} From 93795a5c04e501a3038d977c488905a1fde2f993 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Tue, 9 Jun 2026 17:35:56 +0100 Subject: [PATCH 2/2] fix: code review fixes Signed-off-by: Mouad BANI --- .../src/enricher/fetchActivitySnapshot.ts | 24 ++-- .../src/enricher/fetchLightRepo.ts | 7 +- .../src/enricher/runEnrichmentLoop.ts | 34 +++-- .../packages_worker/src/enricher/types.ts | 1 + .../enricher/updateRepoActivitySnapshot.ts | 121 +++++++++--------- 5 files changed, 107 insertions(+), 80 deletions(-) diff --git a/services/apps/packages_worker/src/enricher/fetchActivitySnapshot.ts b/services/apps/packages_worker/src/enricher/fetchActivitySnapshot.ts index fd68aab376..d35f588e67 100644 --- a/services/apps/packages_worker/src/enricher/fetchActivitySnapshot.ts +++ b/services/apps/packages_worker/src/enricher/fetchActivitySnapshot.ts @@ -129,7 +129,6 @@ interface SummaryQueryResult { prsOpened: IssueCount prsMerged: IssueCount prsClosedUnmerged: IssueCount - prsOpenNow: IssueCount issuesOpened: IssueCount issuesClosed: IssueCount issuesOpened6m: IssueCount @@ -161,7 +160,7 @@ const SUMMARY_QUERY = ` $owner: String!, $name: String!, $since12m: GitTimestamp!, $since6m: GitTimestamp!, $searchPrsOpened: String!, $searchPrsMerged: String!, - $searchPrsClosedUnmerged: String!, $searchPrsOpenNow: String!, + $searchPrsClosedUnmerged: String!, $searchIssuesOpened: String!, $searchIssuesClosed: String!, $searchIssuesOpened6m: String!, $searchIssuesOpenNow: String! ) { @@ -180,7 +179,6 @@ const SUMMARY_QUERY = ` prsOpened: search(query: $searchPrsOpened, type: ISSUE) { issueCount } prsMerged: search(query: $searchPrsMerged, type: ISSUE) { issueCount } prsClosedUnmerged: search(query: $searchPrsClosedUnmerged, type: ISSUE) { issueCount } - prsOpenNow: search(query: $searchPrsOpenNow, type: ISSUE) { issueCount } issuesOpened: search(query: $searchIssuesOpened, type: ISSUE) { issueCount } issuesClosed: search(query: $searchIssuesClosed, type: ISSUE) { issueCount } issuesOpened6m: search(query: $searchIssuesOpened6m, type: ISSUE) { issueCount } @@ -212,9 +210,10 @@ async function pagePrs( token: string, timeoutMs: number, windowStart: Date, -): Promise<{ nodes: PrNode[]; rateLimitCost: number }> { +): Promise<{ nodes: PrNode[]; rateLimitCost: number; httpRequestCount: number }> { const nodes: PrNode[] = [] let rateLimitCost = 0 + let httpRequestCount = 0 let cursor: string | undefined for (;;) { @@ -223,6 +222,7 @@ async function pagePrs( const data = await graphqlRequest(PR_PAGE_QUERY, variables, token, timeoutMs) rateLimitCost += data.rateLimit.cost + httpRequestCount++ const connection = data.repository.pullRequests let reachedWindowBoundary = false @@ -239,7 +239,7 @@ async function pagePrs( cursor = connection.pageInfo.endCursor } - return { nodes, rateLimitCost } + return { nodes, rateLimitCost, httpRequestCount } } const ISSUE_PAGE_QUERY = ` @@ -265,9 +265,10 @@ async function pageIssues( token: string, timeoutMs: number, windowStart: Date, -): Promise<{ nodes: IssueNode[]; rateLimitCost: number }> { +): Promise<{ nodes: IssueNode[]; rateLimitCost: number; httpRequestCount: number }> { const nodes: IssueNode[] = [] let rateLimitCost = 0 + let httpRequestCount = 0 let cursor: string | undefined for (;;) { @@ -281,6 +282,7 @@ async function pageIssues( timeoutMs, ) rateLimitCost += data.rateLimit.cost + httpRequestCount++ const connection = data.repository.issues let reachedWindowBoundary = false @@ -297,7 +299,7 @@ async function pageIssues( cursor = connection.pageInfo.endCursor } - return { nodes, rateLimitCost } + return { nodes, rateLimitCost, httpRequestCount } } export async function fetchActivitySnapshot( @@ -318,11 +320,10 @@ export async function fetchActivitySnapshot( since12m: since12mIso, since6m: since6mIso, searchPrsOpened: `${repoFilter} is:pr created:>=${since12mDate}`, - searchPrsMerged: `${repoFilter} is:pr is:merged merged:>=${since12mDate}`, + searchPrsMerged: `${repoFilter} is:pr is:merged created:>=${since12mDate}`, searchPrsClosedUnmerged: `${repoFilter} is:pr is:unmerged is:closed created:>=${since12mDate}`, - searchPrsOpenNow: `${repoFilter} is:pr is:open`, searchIssuesOpened: `${repoFilter} is:issue created:>=${since12mDate}`, - searchIssuesClosed: `${repoFilter} is:issue is:closed closed:>=${since12mDate}`, + searchIssuesClosed: `${repoFilter} is:issue is:closed created:>=${since12mDate}`, searchIssuesOpened6m: `${repoFilter} is:issue created:>=${since6mDate}`, searchIssuesOpenNow: `${repoFilter} is:issue is:open`, }, @@ -331,6 +332,7 @@ export async function fetchActivitySnapshot( ) let totalRateLimitCost = summaryData.rateLimit.cost + let totalHttpRequests = 1 // summary query const commitTarget = summaryData.repository.defaultBranchRef?.target const commitsLast12m: number | null = commitTarget?.commits12m?.totalCount ?? null @@ -345,6 +347,7 @@ export async function fetchActivitySnapshot( ]) totalRateLimitCost += prResult.rateLimitCost + issueResult.rateLimitCost + totalHttpRequests += prResult.httpRequestCount + issueResult.httpRequestCount log.debug( { @@ -380,6 +383,7 @@ export async function fetchActivitySnapshot( issuesOpenNow: summaryData.issuesOpenNow.issueCount, issueMedianTimeToCloseHours: issueMedians.medianTimeToCloseHours, issueMedianTimeToFirstResponseHours: issueMedians.medianTimeToFirstResponseHours, + httpRequestCount: totalHttpRequests, rateLimitCost: totalRateLimitCost, } } diff --git a/services/apps/packages_worker/src/enricher/fetchLightRepo.ts b/services/apps/packages_worker/src/enricher/fetchLightRepo.ts index 4d59f13cc6..8875fe68ea 100644 --- a/services/apps/packages_worker/src/enricher/fetchLightRepo.ts +++ b/services/apps/packages_worker/src/enricher/fetchLightRepo.ts @@ -50,10 +50,9 @@ async function fetchSecurityFileEnabled( headers, signal: controller.signal, }) - return response.status === 200 - } catch (err) { - log.warn({ url, path, err }, 'Security file check error') - return false + if (response.status === 200) return true + if (response.status === 404) return false + throw new Error(`Unexpected status ${response.status} for ${path}`) } finally { clearTimeout(timeoutId) } diff --git a/services/apps/packages_worker/src/enricher/runEnrichmentLoop.ts b/services/apps/packages_worker/src/enricher/runEnrichmentLoop.ts index 730d3370f3..73a98d0703 100644 --- a/services/apps/packages_worker/src/enricher/runEnrichmentLoop.ts +++ b/services/apps/packages_worker/src/enricher/runEnrichmentLoop.ts @@ -171,7 +171,12 @@ async function runStreamingPool( appConfig: GithubAppConfig, config: ReturnType, isShuttingDown: () => boolean, - metrics: { totalFetched: number; totalRequests: number; startTime: number }, + metrics: { + totalFetched: number + totalHttpRequests: number + totalRateLimitCost: number + startTime: number + }, ): Promise<'exhausted' | 'shutdown'> { const parkedUntil = new Map() const roundRobinIdx = { value: 0 } @@ -251,7 +256,7 @@ async function runStreamingPool( appConfig.privateKeyPem, installationId, ) - metrics.totalRequests++ + metrics.totalHttpRequests++ const outcome = await fetchWithRetries(row.url, token, config.fetchTimeoutMs) if (outcome.kind === 'success') { @@ -266,7 +271,8 @@ async function runStreamingPool( token, config.fetchTimeoutMs, ) - metrics.totalRequests += snapshot.rateLimitCost + metrics.totalHttpRequests += snapshot.httpRequestCount + metrics.totalRateLimitCost += snapshot.rateLimitCost writeBuffer.addSnapshot(snapshot) } catch (snapshotErr) { if (snapshotErr instanceof FetchError && snapshotErr.kind === 'RATE_LIMIT') { @@ -312,15 +318,16 @@ async function runStreamingPool( log.info( { totalFetched: metrics.totalFetched, - totalRequests: metrics.totalRequests, + totalHttpRequests: metrics.totalHttpRequests, + totalRateLimitCost: metrics.totalRateLimitCost, reposPerHour: elapsedHours > 0 ? Math.round(metrics.totalFetched / elapsedHours) : metrics.totalFetched, - reqsPerHour: + httpReqsPerHour: elapsedHours > 0 - ? Math.round(metrics.totalRequests / elapsedHours) - : metrics.totalRequests, + ? Math.round(metrics.totalHttpRequests / elapsedHours) + : metrics.totalHttpRequests, flushed, queueDepth: queue.length, }, @@ -345,7 +352,12 @@ export async function runEnrichmentLoop( config: ReturnType, isShuttingDown: () => boolean, ): Promise { - const metrics = { totalFetched: 0, totalRequests: 0, startTime: Date.now() } + const metrics = { + totalFetched: 0, + totalHttpRequests: 0, + totalRateLimitCost: 0, + startTime: Date.now(), + } while (!isShuttingDown()) { const outcome = await runStreamingPool( @@ -360,7 +372,11 @@ export async function runEnrichmentLoop( if (outcome === 'shutdown') break log.info( - { totalFetched: metrics.totalFetched, totalRequests: metrics.totalRequests }, + { + totalFetched: metrics.totalFetched, + totalHttpRequests: metrics.totalHttpRequests, + totalRateLimitCost: metrics.totalRateLimitCost, + }, `All repos processed — sleeping ${config.idleSleepSec}s`, ) await new Promise((r) => setTimeout(r, config.idleSleepSec * 1000)) diff --git a/services/apps/packages_worker/src/enricher/types.ts b/services/apps/packages_worker/src/enricher/types.ts index c0f82b2b56..c27a179433 100644 --- a/services/apps/packages_worker/src/enricher/types.ts +++ b/services/apps/packages_worker/src/enricher/types.ts @@ -44,6 +44,7 @@ export interface RepoActivitySnapshot { issuesOpenNow: number | null issueMedianTimeToCloseHours: number | null issueMedianTimeToFirstResponseHours: number | null + httpRequestCount: number rateLimitCost: number } diff --git a/services/apps/packages_worker/src/enricher/updateRepoActivitySnapshot.ts b/services/apps/packages_worker/src/enricher/updateRepoActivitySnapshot.ts index b610682012..39de5e4bdc 100644 --- a/services/apps/packages_worker/src/enricher/updateRepoActivitySnapshot.ts +++ b/services/apps/packages_worker/src/enricher/updateRepoActivitySnapshot.ts @@ -10,64 +10,71 @@ export async function bulkUpsertRepoActivitySnapshot( await qx.result( ` - INSERT INTO repo_activity_snapshot ( - repo_id, - snapshot_at, - window_months, - commits_last_12m, - commits_last_6m, - commits_prior_6m, - prs_opened_last_12m, - prs_merged_last_12m, - prs_closed_unmerged_12m, - pr_median_time_to_merge_hours, - pr_median_time_to_first_response_hours, - issues_opened_last_12m, - issues_closed_last_12m, - issues_opened_last_6m, - issues_opened_prior_6m, - issues_open_now, - issue_median_time_to_close_hours, - issue_median_time_to_first_response_hours + WITH upserted AS ( + INSERT INTO repo_activity_snapshot ( + repo_id, + snapshot_at, + window_months, + commits_last_12m, + commits_last_6m, + commits_prior_6m, + prs_opened_last_12m, + prs_merged_last_12m, + prs_closed_unmerged_12m, + pr_median_time_to_merge_hours, + pr_median_time_to_first_response_hours, + issues_opened_last_12m, + issues_closed_last_12m, + issues_opened_last_6m, + issues_opened_prior_6m, + issues_open_now, + issue_median_time_to_close_hours, + issue_median_time_to_first_response_hours + ) + SELECT + (j->>'repoId')::bigint, + (j->>'snapshotAt')::timestamptz, + (j->>'windowMonths')::int, + (j->>'commitsLast12m')::int, + (j->>'commitsLast6m')::int, + (j->>'commitsPrior6m')::int, + (j->>'prsOpenedLast12m')::int, + (j->>'prsMergedLast12m')::int, + (j->>'prsClosedUnmerged12m')::int, + (j->>'prMedianTimeToMergeHours')::int, + (j->>'prMedianTimeToFirstResponseHours')::int, + (j->>'issuesOpenedLast12m')::int, + (j->>'issuesClosedLast12m')::int, + (j->>'issuesOpenedLast6m')::int, + (j->>'issuesOpenedPrior6m')::int, + (j->>'issuesOpenNow')::int, + (j->>'issueMedianTimeToCloseHours')::int, + (j->>'issueMedianTimeToFirstResponseHours')::int + FROM jsonb_array_elements($1::jsonb) j + ON CONFLICT (repo_id) DO UPDATE SET + snapshot_at = EXCLUDED.snapshot_at, + window_months = EXCLUDED.window_months, + commits_last_12m = EXCLUDED.commits_last_12m, + commits_last_6m = EXCLUDED.commits_last_6m, + commits_prior_6m = EXCLUDED.commits_prior_6m, + prs_opened_last_12m = EXCLUDED.prs_opened_last_12m, + prs_merged_last_12m = EXCLUDED.prs_merged_last_12m, + prs_closed_unmerged_12m = EXCLUDED.prs_closed_unmerged_12m, + pr_median_time_to_merge_hours = EXCLUDED.pr_median_time_to_merge_hours, + pr_median_time_to_first_response_hours = EXCLUDED.pr_median_time_to_first_response_hours, + issues_opened_last_12m = EXCLUDED.issues_opened_last_12m, + issues_closed_last_12m = EXCLUDED.issues_closed_last_12m, + issues_opened_last_6m = EXCLUDED.issues_opened_last_6m, + issues_opened_prior_6m = EXCLUDED.issues_opened_prior_6m, + issues_open_now = EXCLUDED.issues_open_now, + issue_median_time_to_close_hours = EXCLUDED.issue_median_time_to_close_hours, + issue_median_time_to_first_response_hours = EXCLUDED.issue_median_time_to_first_response_hours + RETURNING repo_id, snapshot_at ) - SELECT - (j->>'repoId')::bigint, - (j->>'snapshotAt')::timestamptz, - (j->>'windowMonths')::int, - (j->>'commitsLast12m')::int, - (j->>'commitsLast6m')::int, - (j->>'commitsPrior6m')::int, - (j->>'prsOpenedLast12m')::int, - (j->>'prsMergedLast12m')::int, - (j->>'prsClosedUnmerged12m')::int, - (j->>'prMedianTimeToMergeHours')::int, - (j->>'prMedianTimeToFirstResponseHours')::int, - (j->>'issuesOpenedLast12m')::int, - (j->>'issuesClosedLast12m')::int, - (j->>'issuesOpenedLast6m')::int, - (j->>'issuesOpenedPrior6m')::int, - (j->>'issuesOpenNow')::int, - (j->>'issueMedianTimeToCloseHours')::int, - (j->>'issueMedianTimeToFirstResponseHours')::int - FROM jsonb_array_elements($1::jsonb) j - ON CONFLICT (repo_id) DO UPDATE SET - snapshot_at = EXCLUDED.snapshot_at, - window_months = EXCLUDED.window_months, - commits_last_12m = EXCLUDED.commits_last_12m, - commits_last_6m = EXCLUDED.commits_last_6m, - commits_prior_6m = EXCLUDED.commits_prior_6m, - prs_opened_last_12m = EXCLUDED.prs_opened_last_12m, - prs_merged_last_12m = EXCLUDED.prs_merged_last_12m, - prs_closed_unmerged_12m = EXCLUDED.prs_closed_unmerged_12m, - pr_median_time_to_merge_hours = EXCLUDED.pr_median_time_to_merge_hours, - pr_median_time_to_first_response_hours = EXCLUDED.pr_median_time_to_first_response_hours, - issues_opened_last_12m = EXCLUDED.issues_opened_last_12m, - issues_closed_last_12m = EXCLUDED.issues_closed_last_12m, - issues_opened_last_6m = EXCLUDED.issues_opened_last_6m, - issues_opened_prior_6m = EXCLUDED.issues_opened_prior_6m, - issues_open_now = EXCLUDED.issues_open_now, - issue_median_time_to_close_hours = EXCLUDED.issue_median_time_to_close_hours, - issue_median_time_to_first_response_hours = EXCLUDED.issue_median_time_to_first_response_hours + UPDATE repos r + SET snapshot_at = u.snapshot_at + FROM upserted u + WHERE r.id = u.repo_id `, [JSON.stringify(rows)], )