From b23c298b1564ee46a2b0c0c0eb9e4ee988054e5a Mon Sep 17 00:00:00 2001 From: Joan Reyero Date: Wed, 3 Jun 2026 15:36:30 +0200 Subject: [PATCH 1/8] feat: dockerhub-sync worker for repo_docker pull counts (CM-1213) Standalone loop worker (modeled on github-repos-enricher) that: - discovers Docker images for GitHub repos via Dockerfile-gated / probing on hub.docker.com/v2 - refreshes pull/star counts daily into repo_docker - snapshots lifetime pull_count into repo_docker_pulls_daily for delta-at-query-time daily granularity Schema (V1779710880 edited in place, pre-prod): - repos.docker_checked_at + partial index for discovery backlog - repo_docker_pulls_daily partitioned by date (pg_partman, mirrors downloads_daily) - repo_docker_stale_idx on last_synced_at Tested against a 1000-repo random sample from prod public.repositories: 2.6% hit rate on Hub; 87% of repos have no Dockerfile; ghcr.io is the dominant registry for the remainder. CI-workflow parsing and ghcr/quay probes scoped as follow-ups. Co-Authored-By: Claude Opus 4.8 Signed-off-by: Joan Reyero --- backend/.env.dist.local | 7 + .../V1779710880__initial_schema.sql | 35 ++ scripts/services/dockerhub-sync.yaml | 70 ++++ services/apps/packages_worker/package.json | 3 + .../packages_worker/src/bin/dockerhub-sync.ts | 48 +++ services/apps/packages_worker/src/config.ts | 18 ++ .../dockerhub/__tests__/candidates.test.ts | 28 ++ .../__tests__/fetchDockerhub.test.ts | 95 ++++++ .../src/dockerhub/candidates.ts | 19 ++ .../src/dockerhub/detectDockerfile.ts | 81 +++++ .../src/dockerhub/fetchDockerhub.ts | 63 ++++ .../packages_worker/src/dockerhub/index.ts | 303 ++++++++++++++++++ .../packages_worker/src/dockerhub/types.ts | 32 ++ .../src/dockerhub/upsertRepoDocker.ts | 41 +++ 14 files changed, 843 insertions(+) create mode 100644 scripts/services/dockerhub-sync.yaml create mode 100644 services/apps/packages_worker/src/bin/dockerhub-sync.ts create mode 100644 services/apps/packages_worker/src/dockerhub/__tests__/candidates.test.ts create mode 100644 services/apps/packages_worker/src/dockerhub/__tests__/fetchDockerhub.test.ts create mode 100644 services/apps/packages_worker/src/dockerhub/candidates.ts create mode 100644 services/apps/packages_worker/src/dockerhub/detectDockerfile.ts create mode 100644 services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts create mode 100644 services/apps/packages_worker/src/dockerhub/index.ts create mode 100644 services/apps/packages_worker/src/dockerhub/types.ts create mode 100644 services/apps/packages_worker/src/dockerhub/upsertRepoDocker.ts diff --git a/backend/.env.dist.local b/backend/.env.dist.local index 64365ea36a..501ae3b1af 100755 --- a/backend/.env.dist.local +++ b/backend/.env.dist.local @@ -194,3 +194,10 @@ OSV_ECOSYSTEMS=npm,Maven OSV_TMP_DIR=/tmp/osv OSV_BATCH_SIZE=500 OSV_DERIVE_BATCH_SIZE=1000 + +# dockerhub-sync (see services/apps/packages_worker/src/dockerhub/) +DOCKERHUB_API_BASE_URL=https://hub.docker.com/v2 +DOCKERHUB_BATCH_SIZE=100 +DOCKERHUB_REFRESH_INTERVAL_HOURS=24 +DOCKERHUB_DISCOVERY_INTERVAL_DAYS=14 +DOCKERHUB_IDLE_SLEEP_SEC=60 \ No newline at end of file diff --git a/backend/src/osspckgs/migrations/V1779710880__initial_schema.sql b/backend/src/osspckgs/migrations/V1779710880__initial_schema.sql index 9c61a03e25..50ee0781fa 100644 --- a/backend/src/osspckgs/migrations/V1779710880__initial_schema.sql +++ b/backend/src/osspckgs/migrations/V1779710880__initial_schema.sql @@ -507,6 +507,10 @@ CREATE TABLE repos ( -- Scorecard aggregate; per-check detail in repo_scorecard_checks scorecard_score numeric(3, 1), scorecard_last_run_at timestamptz, + -- Last time dockerhub-sync probed this repo for a published Docker image (Dockerfile + -- detection + Hub candidate lookup). NULL = never checked. Separate from last_synced_at + -- because discovery cadence (weeks) differs from light-metadata refresh cadence (daily). + docker_checked_at timestamptz, -- Nullable with no default: multiple enrichers (deps.dev, GitHub worker, Scorecard) each write -- different columns at different times. NOT NULL DEFAULT would stamp a "synced" timestamp on -- first insert even when most columns are still NULL, making freshness checks misleading. @@ -521,6 +525,13 @@ CREATE INDEX ON repos (scorecard_score) WHERE scorecard_score IS NOT NULL; +-- Partial index for the dockerhub-sync discovery backlog query: pages repos that have +-- never been probed for a Docker image. Once docker_checked_at is set the row drops out +-- of the index, so this stays small even as the repos table grows. +CREATE INDEX repos_docker_pending_idx ON repos (id) +WHERE + host = 'github' AND docker_checked_at IS NULL; + -- OpenSSF Scorecard per-check detail (~18 named checks) CREATE TABLE repo_scorecard_checks ( id bigserial PRIMARY KEY, @@ -547,6 +558,30 @@ CREATE INDEX ON repo_docker (repo_id) WHERE repo_id IS NOT NULL; +-- Supports the dockerhub-sync refresh query (WHERE last_synced_at < NOW() - interval). +CREATE INDEX repo_docker_stale_idx ON repo_docker (last_synced_at); + +-- ============================================================ +-- REPO DOCKER PULLS DAILY +-- One row per image per day storing the *lifetime* pull_count as returned +-- by hub.docker.com/v2/repositories/. Docker Hub does not expose +-- per-day download counts, so daily deltas are derived at query time: +-- pulls_total - LAG(pulls_total) OVER (PARTITION BY image_name ORDER BY date) +-- Keyed by image_name (matches repo_docker UNIQUE) so rows survive a +-- repo_docker re-discovery without an FK cascade. +-- +-- Partitioned monthly via pg_partman — same setup as downloads_daily; add a +-- partman.create_parent('public.repo_docker_pulls_daily', 'date', '1 month', 3) +-- call alongside the downloads_daily registration. +-- ============================================================ +CREATE TABLE repo_docker_pulls_daily ( + image_name text NOT NULL, + date date NOT NULL, + pulls_total bigint NOT NULL, + PRIMARY KEY (image_name, date) +) +PARTITION BY RANGE (date); + -- Package → repo provenance (monorepos publish N packages from one repo) CREATE TABLE package_repos ( id bigserial PRIMARY KEY, diff --git a/scripts/services/dockerhub-sync.yaml b/scripts/services/dockerhub-sync.yaml new file mode 100644 index 0000000000..2d67c5b22c --- /dev/null +++ b/scripts/services/dockerhub-sync.yaml @@ -0,0 +1,70 @@ +version: '3.1' + +x-env-args: &env-args + DOCKER_BUILDKIT: 1 + NODE_ENV: docker + SERVICE: dockerhub-sync + SHELL: /bin/sh + SUPPRESS_NO_CONFIG_WARNING: 'true' + DOCKERHUB_API_BASE_URL: 'https://hub.docker.com/v2' + DOCKERHUB_BATCH_SIZE: '100' + DOCKERHUB_REFRESH_INTERVAL_HOURS: '24' + DOCKERHUB_DISCOVERY_INTERVAL_DAYS: '14' + DOCKERHUB_IDLE_SLEEP_SEC: '60' + +services: + dockerhub-sync: + build: + context: ../../ + dockerfile: ./scripts/services/docker/Dockerfile.packages-worker + command: 'pnpm run start:dockerhub-sync' + working_dir: /usr/crowd/app/services/apps/packages_worker + env_file: + - ../../backend/.env.dist.local + - ../../backend/.env.dist.composed + - ../../backend/.env.override.local + - ../../backend/.env.override.composed + environment: + <<: *env-args + restart: always + networks: + - crowd-bridge + + dockerhub-sync-dev: + build: + context: ../../ + dockerfile: ./scripts/services/docker/Dockerfile.packages-worker + command: 'pnpm run dev:dockerhub-sync' + working_dir: /usr/crowd/app/services/apps/packages_worker + # user: '${USER_ID}:${GROUP_ID}' + env_file: + - ../../backend/.env.dist.local + - ../../backend/.env.dist.composed + - ../../backend/.env.override.local + - ../../backend/.env.override.composed + environment: + <<: *env-args + hostname: dockerhub-sync + networks: + - crowd-bridge + volumes: + - ../../services/libs/audit-logs/src:/usr/crowd/app/services/libs/audit-logs/src + - ../../services/libs/common/src:/usr/crowd/app/services/libs/common/src + - ../../services/libs/common_services/src:/usr/crowd/app/services/libs/common_services/src + - ../../services/libs/data-access-layer/src:/usr/crowd/app/services/libs/data-access-layer/src + - ../../services/libs/database/src:/usr/crowd/app/services/libs/database/src + - ../../services/libs/integrations/src:/usr/crowd/app/services/libs/integrations/src + - ../../services/libs/logging/src:/usr/crowd/app/services/libs/logging/src + - ../../services/libs/nango/src:/usr/crowd/app/services/libs/nango/src + - ../../services/libs/opensearch/src:/usr/crowd/app/services/libs/opensearch/src + - ../../services/libs/queue/src:/usr/crowd/app/services/libs/queue/src + - ../../services/libs/redis/src:/usr/crowd/app/services/libs/redis/src + - ../../services/libs/snowflake/src:/usr/crowd/app/services/libs/snowflake/src + - ../../services/libs/telemetry/src:/usr/crowd/app/services/libs/telemetry/src + - ../../services/libs/temporal/src:/usr/crowd/app/services/libs/temporal/src + - ../../services/libs/types/src:/usr/crowd/app/services/libs/types/src + - ../../services/apps/packages_worker/src:/usr/crowd/app/services/apps/packages_worker/src + +networks: + crowd-bridge: + external: true diff --git a/services/apps/packages_worker/package.json b/services/apps/packages_worker/package.json index af8f62dde0..ce89c44ce1 100644 --- a/services/apps/packages_worker/package.json +++ b/services/apps/packages_worker/package.json @@ -8,6 +8,9 @@ "dev:github-repos-enricher": "SERVICE=github-repos-enricher LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9234 src/bin/github-repos-enricher.ts", "dev:packages-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=packages-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9233 src/bin/packages-worker.ts", "dev:github-repos-enricher:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=github-repos-enricher LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9234 src/bin/github-repos-enricher.ts", + "start:dockerhub-sync": "SERVICE=dockerhub-sync tsx src/bin/dockerhub-sync.ts", + "dev:dockerhub-sync": "SERVICE=dockerhub-sync LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9235 src/bin/dockerhub-sync.ts", + "dev:dockerhub-sync:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=dockerhub-sync LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9235 src/bin/dockerhub-sync.ts", "lint": "npx eslint --ext .ts src --max-warnings=0", "format": "npx prettier --write \"src/**/*.ts\"", "format-check": "npx prettier --check .", diff --git a/services/apps/packages_worker/src/bin/dockerhub-sync.ts b/services/apps/packages_worker/src/bin/dockerhub-sync.ts new file mode 100644 index 0000000000..40602204f4 --- /dev/null +++ b/services/apps/packages_worker/src/bin/dockerhub-sync.ts @@ -0,0 +1,48 @@ +import { getServiceLogger } from '@crowd/logging' + +import { getDockerhubConfig } from '../config' +import { getPackagesDb } from '../db' +import { runDockerhubLoop } from '../dockerhub' + +const log = getServiceLogger() + +let shuttingDown = false + +const shutdown = async () => { + if (shuttingDown) return + shuttingDown = true + log.info('Shutting down dockerhub-sync...') +} + +process.on('SIGINT', shutdown) +process.on('SIGTERM', shutdown) + +const main = async () => { + log.info('dockerhub-sync starting...') + + const config = getDockerhubConfig() + + if (config.tokens.length === 0) { + log.error('ENRICHER_GITHUB_TOKENS is required (comma-separated PATs)') + process.exit(1) + } + + const qx = await getPackagesDb() + await qx.selectOne('SELECT 1') + log.info('Connected to packages-db.') + + log.info( + { tokens: config.tokens.length, batchSize: config.batchSize, hubBaseUrl: config.hubBaseUrl }, + 'Starting dockerhub loop', + ) + + await runDockerhubLoop(qx, config, () => shuttingDown) + + log.info('dockerhub-sync stopped.') + process.exit(0) +} + +main().catch((err) => { + log.error({ err }, 'dockerhub-sync fatal error') + process.exit(1) +}) diff --git a/services/apps/packages_worker/src/config.ts b/services/apps/packages_worker/src/config.ts index 84c11fb134..aa1de67d4d 100644 --- a/services/apps/packages_worker/src/config.ts +++ b/services/apps/packages_worker/src/config.ts @@ -32,3 +32,21 @@ export function getEnricherConfig() { idleSleepSec: requireEnvInt('ENRICHER_IDLE_SLEEP_SEC'), } } + +export function getDockerhubConfig() { + const rawTokens = process.env.ENRICHER_GITHUB_TOKENS ?? '' + const tokens = rawTokens + .split(',') + .map((t) => t.trim()) + .filter(Boolean) + + return { + // GitHub PATs reused from the enricher for Dockerfile detection (GraphQL). + tokens, + hubBaseUrl: requireEnv('DOCKERHUB_API_BASE_URL'), + batchSize: requireEnvInt('DOCKERHUB_BATCH_SIZE'), + refreshIntervalHours: requireEnvInt('DOCKERHUB_REFRESH_INTERVAL_HOURS'), + discoveryIntervalDays: requireEnvInt('DOCKERHUB_DISCOVERY_INTERVAL_DAYS'), + idleSleepSec: requireEnvInt('DOCKERHUB_IDLE_SLEEP_SEC'), + } +} diff --git a/services/apps/packages_worker/src/dockerhub/__tests__/candidates.test.ts b/services/apps/packages_worker/src/dockerhub/__tests__/candidates.test.ts new file mode 100644 index 0000000000..7a9db3fa3a --- /dev/null +++ b/services/apps/packages_worker/src/dockerhub/__tests__/candidates.test.ts @@ -0,0 +1,28 @@ +import { describe, expect, it } from 'vitest' + +import { buildCandidates } from '../candidates' + +describe('buildCandidates', () => { + it('lowercases owner and repo into a single / candidate', () => { + expect(buildCandidates('Grafana', 'Grafana')).toEqual(['grafana/grafana']) + }) + + it('passes through already-valid lowercase slugs', () => { + expect(buildCandidates('prometheus', 'node_exporter')).toEqual(['prometheus/node_exporter']) + }) + + it('rejects components with characters Docker Hub does not accept', () => { + // GitHub allows '+' in org names via renames; Hub would 400. + expect(buildCandidates('foo+bar', 'baz')).toEqual([]) + }) + + it('rejects components that start or end with a separator', () => { + expect(buildCandidates('-leading', 'repo')).toEqual([]) + expect(buildCandidates('owner', 'trailing.')).toEqual([]) + }) + + it('does not emit a library/ candidate', () => { + // Guard against accidental reintroduction — see comment in candidates.ts. + expect(buildCandidates('nodejs', 'node')).toEqual(['nodejs/node']) + }) +}) diff --git a/services/apps/packages_worker/src/dockerhub/__tests__/fetchDockerhub.test.ts b/services/apps/packages_worker/src/dockerhub/__tests__/fetchDockerhub.test.ts new file mode 100644 index 0000000000..fa689be5fe --- /dev/null +++ b/services/apps/packages_worker/src/dockerhub/__tests__/fetchDockerhub.test.ts @@ -0,0 +1,95 @@ +import { afterEach, describe, expect, it, vi } from 'vitest' + +import { fetchDockerhub } from '../fetchDockerhub' +import { FetchError } from '../types' + +const BASE = 'https://hub.docker.com/v2' + +function mockFetch(status: number, body: unknown, headers: Record = {}) { + return vi.spyOn(globalThis, 'fetch').mockResolvedValue( + new Response(typeof body === 'string' ? body : JSON.stringify(body), { + status, + headers: { 'Content-Type': 'application/json', ...headers }, + }), + ) +} + +afterEach(() => { + vi.restoreAllMocks() +}) + +describe('fetchDockerhub', () => { + it('returns pull/star counts on 200', async () => { + mockFetch(200, { + name: 'grafana', + namespace: 'grafana', + pull_count: 12345, + star_count: 678, + last_updated: '2026-05-01T00:00:00Z', + }) + + const r = await fetchDockerhub(BASE, 'grafana/grafana') + expect(r).toEqual({ + imageName: 'grafana/grafana', + pulls: 12345, + stars: 678, + lastUpdated: '2026-05-01T00:00:00Z', + }) + }) + + it('appends trailing slash to the request URL', async () => { + const spy = mockFetch(200, { pull_count: 1, star_count: 0 }) + await fetchDockerhub(BASE, 'a/b') + expect(spy).toHaveBeenCalledWith(`${BASE}/repositories/a/b/`, expect.anything()) + }) + + it('classifies 404 as NOT_FOUND', async () => { + mockFetch(404, { message: 'object not found' }) + await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'NOT_FOUND' }) + }) + + it('classifies 400 as NOT_FOUND (Hub 400s on malformed slugs)', async () => { + mockFetch(400, { message: 'bad request' }) + await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'NOT_FOUND' }) + }) + + it('classifies 429 as RATE_LIMIT with resetAt from header', async () => { + const resetSec = Math.floor(Date.now() / 1000) + 120 + mockFetch(429, { message: 'too many' }, { 'x-ratelimit-reset': String(resetSec) }) + + expect.assertions(3) + try { + await fetchDockerhub(BASE, 'a/b') + } catch (err) { + expect(err).toBeInstanceOf(FetchError) + const fe = err as FetchError + expect(fe.kind).toBe('RATE_LIMIT') + expect(fe.resetAt).toBeGreaterThan(Date.now()) + } + }) + + it('classifies x-ratelimit-remaining: 0 as RATE_LIMIT even on 200', async () => { + mockFetch(200, { pull_count: 1 }, { 'x-ratelimit-remaining': '0' }) + await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'RATE_LIMIT' }) + }) + + it('classifies 5xx as TRANSIENT', async () => { + mockFetch(503, 'Service Unavailable') + await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'TRANSIENT' }) + }) + + it('classifies network failure as TRANSIENT', async () => { + vi.spyOn(globalThis, 'fetch').mockRejectedValue(new Error('ECONNRESET')) + await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'TRANSIENT' }) + }) + + it('classifies non-JSON 200 body as MALFORMED', async () => { + mockFetch(200, 'not json') + await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'MALFORMED' }) + }) + + it('classifies missing pull_count as MALFORMED', async () => { + mockFetch(200, { name: 'x', star_count: 0 }) + await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'MALFORMED' }) + }) +}) diff --git a/services/apps/packages_worker/src/dockerhub/candidates.ts b/services/apps/packages_worker/src/dockerhub/candidates.ts new file mode 100644 index 0000000000..eeaef0e954 --- /dev/null +++ b/services/apps/packages_worker/src/dockerhub/candidates.ts @@ -0,0 +1,19 @@ +// Docker Hub repository slugs are lowercase and limited to [a-z0-9._-]. +// GitHub allows uppercase and a few characters Hub rejects, so we lowercase +// and validate before probing — anything that fails the regex would 400 on +// Hub anyway and isn't worth an HTTP round-trip. +const HUB_COMPONENT = /^[a-z0-9](?:[a-z0-9._-]*[a-z0-9])?$/ + +export function buildCandidates(owner: string, name: string): string[] { + const ns = owner.toLowerCase() + const repo = name.toLowerCase() + + if (!HUB_COMPONENT.test(ns) || !HUB_COMPONENT.test(repo)) { + return [] + } + + // v1 deliberately omits `library/`: a random github.com/foo/node with a + // dev Dockerfile would false-positive onto the official library/node image. + // Official images (~150) to be seeded via allowlist in a follow-up. + return [`${ns}/${repo}`] +} diff --git a/services/apps/packages_worker/src/dockerhub/detectDockerfile.ts b/services/apps/packages_worker/src/dockerhub/detectDockerfile.ts new file mode 100644 index 0000000000..281defd316 --- /dev/null +++ b/services/apps/packages_worker/src/dockerhub/detectDockerfile.ts @@ -0,0 +1,81 @@ +import { FetchError } from './types' + +const GRAPHQL_URL = 'https://api.github.com/graphql' + +// One GraphQL call probes three common Dockerfile locations. object(expression:) +// returns null when the path doesn't exist, so a single request answers "does +// this repo ship a Dockerfile anywhere we care about?" without listing the tree. +const DOCKERFILE_QUERY = ` + query($owner: String!, $name: String!) { + repository(owner: $owner, name: $name) { + d0: object(expression: "HEAD:Dockerfile") { oid } + d1: object(expression: "HEAD:docker/Dockerfile") { oid } + d2: object(expression: "HEAD:build/Dockerfile") { oid } + } + } +` + +export async function detectDockerfile( + owner: string, + name: string, + token: string, +): Promise { + let response: Response + try { + response = await fetch(GRAPHQL_URL, { + method: 'POST', + headers: { + Authorization: `bearer ${token}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ query: DOCKERFILE_QUERY, variables: { owner, name } }), + }) + } catch (err) { + throw new FetchError( + 'TRANSIENT', + `Network error for ${owner}/${name}: ${(err as Error).message}`, + ) + } + + const resetSec = parseInt(response.headers.get('x-ratelimit-reset') ?? '0', 10) + const resetMs = resetSec ? resetSec * 1000 + 5_000 : Date.now() + 65_000 + + if (response.status === 401) { + throw new FetchError('AUTH', `401 Unauthorized for ${owner}/${name}`) + } + + if (response.status === 403) { + const body = await response.text() + if (body.toLowerCase().includes('rate limit')) { + throw new FetchError('RATE_LIMIT', `Rate limited on ${owner}/${name}`, resetMs) + } + throw new FetchError('AUTH', `403 Forbidden for ${owner}/${name}`) + } + + if (response.status === 404) throw new FetchError('NOT_FOUND', `404 for ${owner}/${name}`) + if (response.status >= 500) { + throw new FetchError('TRANSIENT', `${response.status} for ${owner}/${name}`) + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const json = (await response.json()) as any + + if (json.errors?.length) { + const err = json.errors[0] + if (err.type === 'RATE_LIMITED') { + throw new FetchError('RATE_LIMIT', `RATE_LIMITED for ${owner}/${name}`, resetMs) + } + if (err.type === 'NOT_FOUND') { + throw new FetchError('NOT_FOUND', `NOT_FOUND for ${owner}/${name}`) + } + throw new FetchError( + 'TRANSIENT', + `GraphQL error for ${owner}/${name}: ${err.message ?? err.type}`, + ) + } + + const repo = json.data?.repository + if (!repo) throw new FetchError('NOT_FOUND', `No repository data for ${owner}/${name}`) + + return Boolean(repo.d0 ?? repo.d1 ?? repo.d2) +} diff --git a/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts b/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts new file mode 100644 index 0000000000..f03ead3a7e --- /dev/null +++ b/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts @@ -0,0 +1,63 @@ +import { DockerhubRepoResult, FetchError } from './types' + +interface HubResponse { + name?: string + namespace?: string + pull_count?: number + star_count?: number + last_updated?: string | null +} + +// imageName must be '/' (lowercase). Trailing slash on the URL +// is required — Hub returns a 301 to the slashed form otherwise. +export async function fetchDockerhub( + baseUrl: string, + imageName: string, +): Promise { + const url = `${baseUrl}/repositories/${imageName}/` + + let response: Response + try { + response = await fetch(url, { + method: 'GET', + headers: { Accept: 'application/json' }, + }) + } catch (err) { + throw new FetchError('TRANSIENT', `Network error for ${imageName}: ${(err as Error).message}`) + } + + const remaining = parseInt(response.headers.get('x-ratelimit-remaining') ?? '', 10) + const resetSec = parseInt(response.headers.get('x-ratelimit-reset') ?? '0', 10) + const resetMs = resetSec ? resetSec * 1000 + 5_000 : Date.now() + 65_000 + + if (response.status === 429 || (Number.isFinite(remaining) && remaining <= 0)) { + throw new FetchError('RATE_LIMIT', `Rate limited on ${imageName}`, resetMs) + } + + if (response.status === 404) throw new FetchError('NOT_FOUND', `404 for ${imageName}`) + if (response.status >= 500) { + throw new FetchError('TRANSIENT', `${response.status} for ${imageName}`) + } + if (!response.ok) { + // 400/401/403 etc — treat as a miss; Hub sometimes 400s on malformed slugs. + throw new FetchError('NOT_FOUND', `${response.status} for ${imageName}`) + } + + let json: HubResponse + try { + json = (await response.json()) as HubResponse + } catch (err) { + throw new FetchError('MALFORMED', `Non-JSON body for ${imageName}: ${(err as Error).message}`) + } + + if (typeof json.pull_count !== 'number') { + throw new FetchError('MALFORMED', `Missing pull_count for ${imageName}`) + } + + return { + imageName, + pulls: json.pull_count, + stars: json.star_count ?? 0, + lastUpdated: json.last_updated ?? null, + } +} diff --git a/services/apps/packages_worker/src/dockerhub/index.ts b/services/apps/packages_worker/src/dockerhub/index.ts new file mode 100644 index 0000000000..e145b9465d --- /dev/null +++ b/services/apps/packages_worker/src/dockerhub/index.ts @@ -0,0 +1,303 @@ +import { QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor' +import { getServiceChildLogger } from '@crowd/logging' + +import { getDockerhubConfig } from '../config' +import { parseGithubUrl } from '../enricher/fetchLightRepo' + +import { buildCandidates } from './candidates' +import { detectDockerfile } from './detectDockerfile' +import { fetchDockerhub } from './fetchDockerhub' +import { DiscoveryRepoRow, DockerhubRepoResult, FetchError, RefreshImageRow } from './types' +import { markDockerChecked, touchRepoDocker, upsertRepoDocker } from './upsertRepoDocker' + +const log = getServiceChildLogger('dockerhub-sync') + +const MAX_RETRIES = 3 + +type DockerhubConfig = ReturnType + +// Docker Hub's anonymous rate limit is per-IP, not per-token, so unlike the +// GitHub fan-out below this stays a single sequential caller. hubParkedUntil +// is module state so the park survives across refresh and discovery pages. +let hubParkedUntil = 0 + +async function hubFetchWithRetries( + baseUrl: string, + imageName: string, +): Promise { + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + const wait = hubParkedUntil - Date.now() + if (wait > 0) { + log.warn({ waitSec: Math.round(wait / 1000) }, 'Docker Hub parked, waiting') + await new Promise((r) => setTimeout(r, wait)) + } + try { + return await fetchDockerhub(baseUrl, imageName) + } catch (err) { + if (!(err instanceof FetchError)) throw err + + if (err.kind === 'NOT_FOUND') return null + if (err.kind === 'MALFORMED') { + log.warn({ imageName }, err.message) + return null + } + if (err.kind === 'RATE_LIMIT') { + hubParkedUntil = err.resetAt ?? Date.now() + 60_000 + continue + } + if (attempt < MAX_RETRIES) { + const backoffMs = 1000 * 2 ** attempt + log.warn({ imageName, attempt, backoffMs }, `Transient Hub error, retrying: ${err.message}`) + await new Promise((r) => setTimeout(r, backoffMs)) + } else { + log.error({ imageName }, `Gave up after ${MAX_RETRIES} retries: ${err.message}`) + return null + } + } + } + return null +} + +async function githubFetchWithRetries( + owner: string, + name: string, + token: string, +): Promise { + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + try { + return await detectDockerfile(owner, name, token) + } catch (err) { + if (!(err instanceof FetchError)) throw err + + if (['NOT_FOUND', 'AUTH', 'MALFORMED'].includes(err.kind)) { + log.warn({ owner, name, kind: err.kind }, err.message) + return null + } + + if (err.kind === 'RATE_LIMIT') throw err + + if (attempt < MAX_RETRIES) { + const backoffMs = 1000 * 2 ** attempt + log.warn({ owner, name, attempt, backoffMs }, `Transient error, retrying: ${err.message}`) + await new Promise((r) => setTimeout(r, backoffMs)) + } else { + log.error({ owner, name }, `Gave up after ${MAX_RETRIES} retries: ${err.message}`) + return null + } + } + } + return null +} + +async function fetchRefreshPage( + qx: QueryExecutor, + cursor: string | null, + config: DockerhubConfig, +): Promise { + return qx.select( + ` + SELECT id, repo_id, image_name + FROM repo_docker + WHERE last_synced_at < NOW() - make_interval(hours => $(refreshIntervalHours)) + AND ($(cursor)::bigint IS NULL OR id > $(cursor)::bigint) + ORDER BY id + LIMIT $(batchSize) + `, + { + cursor, + batchSize: config.batchSize, + refreshIntervalHours: config.refreshIntervalHours, + }, + ) +} + +async function processRefreshPage( + qx: QueryExecutor, + rows: RefreshImageRow[], + config: DockerhubConfig, +): Promise<{ ok: number; gone: number }> { + let ok = 0 + let gone = 0 + + for (const row of rows) { + const result = await hubFetchWithRetries(config.hubBaseUrl, row.image_name) + if (result) { + await upsertRepoDocker(qx, row.repo_id, result) + ok++ + } else { + // Image deleted or unreachable — bump last_synced_at so we don't retry + // it every page; the row (and its pulls_daily history) is kept. + await touchRepoDocker(qx, row.image_name) + gone++ + } + } + + return { ok, gone } +} + +async function fetchDiscoveryPage( + qx: QueryExecutor, + cursor: string | null, + config: DockerhubConfig, +): Promise { + return qx.select( + ` + SELECT id, url, owner, name + FROM repos + WHERE host = 'github' + AND (docker_checked_at IS NULL + OR docker_checked_at < NOW() - make_interval(days => $(discoveryIntervalDays))) + AND ($(cursor)::bigint IS NULL OR id > $(cursor)::bigint) + ORDER BY id + LIMIT $(batchSize) + `, + { + cursor, + batchSize: config.batchSize, + discoveryIntervalDays: config.discoveryIntervalDays, + }, + ) +} + +function resolveOwnerName(row: DiscoveryRepoRow): { owner: string; name: string } | null { + if (row.owner && row.name) return { owner: row.owner, name: row.name } + try { + return parseGithubUrl(row.url) + } catch { + return null + } +} + +async function discoverRepo( + qx: QueryExecutor, + row: DiscoveryRepoRow, + token: string, + config: DockerhubConfig, +): Promise<'hit' | 'miss' | 'skip'> { + const parsed = resolveOwnerName(row) + if (!parsed) { + await markDockerChecked(qx, row.id) + return 'skip' + } + + const hasDockerfile = await githubFetchWithRetries(parsed.owner, parsed.name, token) + if (hasDockerfile === null) { + // GitHub lookup failed (404/auth/gave-up). Mark checked so the backlog + // drains; the discoveryIntervalDays re-check will try again later. + await markDockerChecked(qx, row.id) + return 'skip' + } + if (!hasDockerfile) { + await markDockerChecked(qx, row.id) + return 'miss' + } + + for (const candidate of buildCandidates(parsed.owner, parsed.name)) { + const result = await hubFetchWithRetries(config.hubBaseUrl, candidate) + if (result) { + await upsertRepoDocker(qx, row.id, result) + await markDockerChecked(qx, row.id) + return 'hit' + } + } + + await markDockerChecked(qx, row.id) + return 'miss' +} + +async function processDiscoveryPage( + qx: QueryExecutor, + rows: DiscoveryRepoRow[], + parkedUntil: Map, + config: DockerhubConfig, +): Promise<{ hits: number; misses: number; skipped: number; failed: number }> { + let hits = 0 + let misses = 0 + let skipped = 0 + let failed = 0 + let nextIdx = 0 + + await Promise.all( + config.tokens.map(async (token, tokenIdx) => { + const initialPark = (parkedUntil.get(token) ?? 0) - Date.now() + if (initialPark > 0) { + log.warn(`token#${tokenIdx} still parked, waiting ${Math.round(initialPark / 1000)}s`) + await new Promise((r) => setTimeout(r, initialPark)) + } + + while (nextIdx < rows.length) { + const idx = nextIdx++ + const row = rows[idx] + + try { + const outcome = await discoverRepo(qx, row, token, config) + if (outcome === 'hit') hits++ + else if (outcome === 'miss') misses++ + else skipped++ + } catch (err) { + if (err instanceof FetchError && err.kind === 'RATE_LIMIT') { + const resetAt = err.resetAt ?? Date.now() + 60_000 + const waitMs = Math.max(1_000, resetAt - Date.now()) + parkedUntil.set(token, resetAt) + log.warn( + { tokenIdx, parkedUntil: new Date(resetAt).toISOString() }, + `token#${tokenIdx} rate limited — parking for ${Math.round(waitMs / 1000)}s`, + ) + await new Promise((r) => setTimeout(r, waitMs)) + failed++ + } else { + log.error({ url: row.url, err }, 'Unexpected discovery error') + failed++ + } + } + } + }), + ) + + return { hits, misses, skipped, failed } +} + +export async function runDockerhubLoop( + qx: QueryExecutor, + config: DockerhubConfig, + isShuttingDown: () => boolean, +): Promise { + const githubParkedUntil = new Map() + let refreshCursor: string | null = null + let discoveryCursor: string | null = null + let pageNum = 0 + + while (!isShuttingDown()) { + pageNum++ + + const refreshRows = await fetchRefreshPage(qx, refreshCursor, config) + if (refreshRows.length > 0) { + const stats = await processRefreshPage(qx, refreshRows, config) + log.info( + `Refresh page ${pageNum}: read=${refreshRows.length} ok=${stats.ok} gone=${stats.gone}`, + ) + refreshCursor = refreshRows[refreshRows.length - 1].id + } else { + refreshCursor = null + } + + if (isShuttingDown()) break + + const discoveryRows = await fetchDiscoveryPage(qx, discoveryCursor, config) + if (discoveryRows.length > 0) { + const stats = await processDiscoveryPage(qx, discoveryRows, githubParkedUntil, config) + log.info( + `Discovery page ${pageNum}: read=${discoveryRows.length} hits=${stats.hits} ` + + `misses=${stats.misses} skipped=${stats.skipped} failed=${stats.failed}`, + ) + discoveryCursor = discoveryRows[discoveryRows.length - 1].id + } else { + discoveryCursor = null + } + + if (refreshRows.length === 0 && discoveryRows.length === 0) { + log.info('No work, sleeping') + await new Promise((r) => setTimeout(r, config.idleSleepSec * 1000)) + } + } +} diff --git a/services/apps/packages_worker/src/dockerhub/types.ts b/services/apps/packages_worker/src/dockerhub/types.ts new file mode 100644 index 0000000000..c73aa69097 --- /dev/null +++ b/services/apps/packages_worker/src/dockerhub/types.ts @@ -0,0 +1,32 @@ +export interface DockerhubRepoResult { + imageName: string // '/', lowercase + pulls: number + stars: number + lastUpdated: string | null +} + +export interface DiscoveryRepoRow { + id: string + url: string + owner: string | null + name: string | null +} + +export interface RefreshImageRow { + id: string + repo_id: string | null + image_name: string +} + +export type FetchErrorKind = 'RATE_LIMIT' | 'TRANSIENT' | 'NOT_FOUND' | 'AUTH' | 'MALFORMED' + +export class FetchError extends Error { + constructor( + public readonly kind: FetchErrorKind, + message: string, + public readonly resetAt?: number, + ) { + super(message) + this.name = 'FetchError' + } +} diff --git a/services/apps/packages_worker/src/dockerhub/upsertRepoDocker.ts b/services/apps/packages_worker/src/dockerhub/upsertRepoDocker.ts new file mode 100644 index 0000000000..832087e02b --- /dev/null +++ b/services/apps/packages_worker/src/dockerhub/upsertRepoDocker.ts @@ -0,0 +1,41 @@ +import { QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor' + +import { DockerhubRepoResult } from './types' + +export async function upsertRepoDocker( + qx: QueryExecutor, + repoId: string | null, + r: DockerhubRepoResult, +): Promise { + await qx.result( + ` + INSERT INTO repo_docker (repo_id, image_name, pulls, stars, last_synced_at) + VALUES ($(repoId), $(imageName), $(pulls), $(stars), NOW()) + ON CONFLICT (image_name) DO UPDATE SET + repo_id = COALESCE(repo_docker.repo_id, EXCLUDED.repo_id), + pulls = EXCLUDED.pulls, + stars = EXCLUDED.stars, + last_synced_at = NOW() + `, + { repoId, imageName: r.imageName, pulls: r.pulls, stars: r.stars }, + ) + + await qx.result( + ` + INSERT INTO repo_docker_pulls_daily (image_name, date, pulls_total) + VALUES ($(imageName), CURRENT_DATE, $(pulls)) + ON CONFLICT (image_name, date) DO UPDATE SET pulls_total = EXCLUDED.pulls_total + `, + { imageName: r.imageName, pulls: r.pulls }, + ) +} + +export async function touchRepoDocker(qx: QueryExecutor, imageName: string): Promise { + await qx.result(`UPDATE repo_docker SET last_synced_at = NOW() WHERE image_name = $(imageName)`, { + imageName, + }) +} + +export async function markDockerChecked(qx: QueryExecutor, repoId: string): Promise { + await qx.result(`UPDATE repos SET docker_checked_at = NOW() WHERE id = $(repoId)`, { repoId }) +} From d200d43df413213c9b319ef0b0a5793b52df6c71 Mon Sep 17 00:00:00 2001 From: Joan Reyero Date: Wed, 3 Jun 2026 18:11:48 +0200 Subject: [PATCH 2/8] fix: address bot review on dockerhub-sync (CM-1213) - Retry the same row after a GitHub rate-limit park instead of abandoning it (cursor would otherwise advance past unprobed repos until end-of-sweep). - Serialize Docker Hub calls via a promise chain so the per-token GitHub fan-out cannot fire concurrent requests against the per-IP Hub budget. - 401/403 from Hub now classified AUTH and propagated, so a misconfigured base URL fails fast instead of silently marking every image gone. - Stop discarding valid 200 responses when x-ratelimit-remaining=0. - Wrap repo_docker + repo_docker_pulls_daily writes in a transaction. - Classify non-JSON GitHub GraphQL bodies as MALFORMED. Not addressed (replied on PR): - Inline SQL stays per packages_worker convention (matches enricher/osv). - repo_docker_pulls_daily partition setup deferred to pg_partman, same as downloads_daily in the same migration. - Loop-level retry/parking tests deferred; validated against 1065 real repos. Co-Authored-By: Claude Opus 4.8 Signed-off-by: Joan Reyero --- .../__tests__/fetchDockerhub.test.ts | 16 ++++- .../src/dockerhub/detectDockerfile.ts | 10 ++- .../src/dockerhub/fetchDockerhub.ts | 11 +-- .../packages_worker/src/dockerhub/index.ts | 72 +++++++++++++------ .../src/dockerhub/upsertRepoDocker.ts | 45 ++++++------ 5 files changed, 103 insertions(+), 51 deletions(-) diff --git a/services/apps/packages_worker/src/dockerhub/__tests__/fetchDockerhub.test.ts b/services/apps/packages_worker/src/dockerhub/__tests__/fetchDockerhub.test.ts index fa689be5fe..5b7ff4ac1c 100644 --- a/services/apps/packages_worker/src/dockerhub/__tests__/fetchDockerhub.test.ts +++ b/services/apps/packages_worker/src/dockerhub/__tests__/fetchDockerhub.test.ts @@ -68,9 +68,19 @@ describe('fetchDockerhub', () => { } }) - it('classifies x-ratelimit-remaining: 0 as RATE_LIMIT even on 200', async () => { - mockFetch(200, { pull_count: 1 }, { 'x-ratelimit-remaining': '0' }) - await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'RATE_LIMIT' }) + it('does NOT discard a 200 response when x-ratelimit-remaining is 0', async () => { + // remaining=0 means this request consumed the last slot; the response itself + // is valid. The next call will 429 and park then. + mockFetch(200, { pull_count: 1, star_count: 0 }, { 'x-ratelimit-remaining': '0' }) + const r = await fetchDockerhub(BASE, 'a/b') + expect(r.pulls).toBe(1) + }) + + it('classifies 401/403 as AUTH so misconfig surfaces instead of looking like a miss', async () => { + mockFetch(401, { message: 'unauthorized' }) + await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'AUTH' }) + mockFetch(403, { message: 'forbidden' }) + await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'AUTH' }) }) it('classifies 5xx as TRANSIENT', async () => { diff --git a/services/apps/packages_worker/src/dockerhub/detectDockerfile.ts b/services/apps/packages_worker/src/dockerhub/detectDockerfile.ts index 281defd316..8a850ef70a 100644 --- a/services/apps/packages_worker/src/dockerhub/detectDockerfile.ts +++ b/services/apps/packages_worker/src/dockerhub/detectDockerfile.ts @@ -58,7 +58,15 @@ export async function detectDockerfile( } // eslint-disable-next-line @typescript-eslint/no-explicit-any - const json = (await response.json()) as any + let json: any + try { + json = await response.json() + } catch (err) { + throw new FetchError( + 'MALFORMED', + `Non-JSON body for ${owner}/${name}: ${(err as Error).message}`, + ) + } if (json.errors?.length) { const err = json.errors[0] diff --git a/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts b/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts index f03ead3a7e..f4fe347baa 100644 --- a/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts +++ b/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts @@ -26,20 +26,23 @@ export async function fetchDockerhub( throw new FetchError('TRANSIENT', `Network error for ${imageName}: ${(err as Error).message}`) } - const remaining = parseInt(response.headers.get('x-ratelimit-remaining') ?? '', 10) const resetSec = parseInt(response.headers.get('x-ratelimit-reset') ?? '0', 10) const resetMs = resetSec ? resetSec * 1000 + 5_000 : Date.now() + 65_000 - if (response.status === 429 || (Number.isFinite(remaining) && remaining <= 0)) { + if (response.status === 429) { throw new FetchError('RATE_LIMIT', `Rate limited on ${imageName}`, resetMs) } - + if (response.status === 401 || response.status === 403) { + // Surface as AUTH so the loop fails fast instead of silently marking every + // image "gone" when the base URL is misconfigured or Hub starts requiring auth. + throw new FetchError('AUTH', `${response.status} for ${imageName}`) + } if (response.status === 404) throw new FetchError('NOT_FOUND', `404 for ${imageName}`) if (response.status >= 500) { throw new FetchError('TRANSIENT', `${response.status} for ${imageName}`) } if (!response.ok) { - // 400/401/403 etc — treat as a miss; Hub sometimes 400s on malformed slugs. + // 400 etc — Hub sometimes 400s on malformed slugs; treat as a miss. throw new FetchError('NOT_FOUND', `${response.status} for ${imageName}`) } diff --git a/services/apps/packages_worker/src/dockerhub/index.ts b/services/apps/packages_worker/src/dockerhub/index.ts index e145b9465d..b5d8ce9482 100644 --- a/services/apps/packages_worker/src/dockerhub/index.ts +++ b/services/apps/packages_worker/src/dockerhub/index.ts @@ -16,12 +16,23 @@ const MAX_RETRIES = 3 type DockerhubConfig = ReturnType -// Docker Hub's anonymous rate limit is per-IP, not per-token, so unlike the -// GitHub fan-out below this stays a single sequential caller. hubParkedUntil -// is module state so the park survives across refresh and discovery pages. +// Docker Hub's anonymous rate limit is per-IP, not per-token. hubParkedUntil +// is module state so the park survives across refresh and discovery pages, and +// hubChain serializes calls so the per-token GitHub fan-out below can't fire +// concurrent Hub requests against that single per-IP budget. let hubParkedUntil = 0 +let hubChain: Promise = Promise.resolve() -async function hubFetchWithRetries( +function hubFetchWithRetries( + baseUrl: string, + imageName: string, +): Promise { + const run = hubChain.then(() => hubFetchInner(baseUrl, imageName)) + hubChain = run.catch(() => undefined) + return run +} + +async function hubFetchInner( baseUrl: string, imageName: string, ): Promise { @@ -37,6 +48,12 @@ async function hubFetchWithRetries( if (!(err instanceof FetchError)) throw err if (err.kind === 'NOT_FOUND') return null + if (err.kind === 'AUTH') { + // Systemic misconfig (wrong base URL, Hub now requires auth) — propagate + // so the worker exits instead of silently marking every image gone. + log.error({ imageName }, err.message) + throw err + } if (err.kind === 'MALFORMED') { log.warn({ imageName }, err.message) return null @@ -229,25 +246,34 @@ async function processDiscoveryPage( const idx = nextIdx++ const row = rows[idx] - try { - const outcome = await discoverRepo(qx, row, token, config) - if (outcome === 'hit') hits++ - else if (outcome === 'miss') misses++ - else skipped++ - } catch (err) { - if (err instanceof FetchError && err.kind === 'RATE_LIMIT') { - const resetAt = err.resetAt ?? Date.now() + 60_000 - const waitMs = Math.max(1_000, resetAt - Date.now()) - parkedUntil.set(token, resetAt) - log.warn( - { tokenIdx, parkedUntil: new Date(resetAt).toISOString() }, - `token#${tokenIdx} rate limited — parking for ${Math.round(waitMs / 1000)}s`, - ) - await new Promise((r) => setTimeout(r, waitMs)) - failed++ - } else { - log.error({ url: row.url, err }, 'Unexpected discovery error') - failed++ + // Retry the same row after a GitHub rate-limit park instead of abandoning + // it. Without this the cursor advances past a row whose docker_checked_at + // was never set, and it isn't picked up again until the cursor wraps to + // null at end-of-backlog (potentially days on a 600k-row sweep). + let done = false + while (!done) { + try { + const outcome = await discoverRepo(qx, row, token, config) + if (outcome === 'hit') hits++ + else if (outcome === 'miss') misses++ + else skipped++ + done = true + } catch (err) { + if (err instanceof FetchError && err.kind === 'RATE_LIMIT') { + const resetAt = err.resetAt ?? Date.now() + 60_000 + const waitMs = Math.max(1_000, resetAt - Date.now()) + parkedUntil.set(token, resetAt) + log.warn( + { tokenIdx, parkedUntil: new Date(resetAt).toISOString() }, + `token#${tokenIdx} rate limited — parking for ${Math.round(waitMs / 1000)}s`, + ) + await new Promise((r) => setTimeout(r, waitMs)) + // loop retries the same row + } else { + log.error({ url: row.url, err }, 'Unexpected discovery error') + failed++ + done = true + } } } } diff --git a/services/apps/packages_worker/src/dockerhub/upsertRepoDocker.ts b/services/apps/packages_worker/src/dockerhub/upsertRepoDocker.ts index 832087e02b..a3712e3173 100644 --- a/services/apps/packages_worker/src/dockerhub/upsertRepoDocker.ts +++ b/services/apps/packages_worker/src/dockerhub/upsertRepoDocker.ts @@ -7,27 +7,32 @@ export async function upsertRepoDocker( repoId: string | null, r: DockerhubRepoResult, ): Promise { - await qx.result( - ` - INSERT INTO repo_docker (repo_id, image_name, pulls, stars, last_synced_at) - VALUES ($(repoId), $(imageName), $(pulls), $(stars), NOW()) - ON CONFLICT (image_name) DO UPDATE SET - repo_id = COALESCE(repo_docker.repo_id, EXCLUDED.repo_id), - pulls = EXCLUDED.pulls, - stars = EXCLUDED.stars, - last_synced_at = NOW() - `, - { repoId, imageName: r.imageName, pulls: r.pulls, stars: r.stars }, - ) + // Transactional so a missing partition on repo_docker_pulls_daily (or any other + // failure on the snapshot insert) rolls back the last_synced_at bump on repo_docker + // too — otherwise the row drops out of the refresh queue for 24h with no snapshot. + await qx.tx(async (tx) => { + await tx.result( + ` + INSERT INTO repo_docker (repo_id, image_name, pulls, stars, last_synced_at) + VALUES ($(repoId), $(imageName), $(pulls), $(stars), NOW()) + ON CONFLICT (image_name) DO UPDATE SET + repo_id = COALESCE(repo_docker.repo_id, EXCLUDED.repo_id), + pulls = EXCLUDED.pulls, + stars = EXCLUDED.stars, + last_synced_at = NOW() + `, + { repoId, imageName: r.imageName, pulls: r.pulls, stars: r.stars }, + ) - await qx.result( - ` - INSERT INTO repo_docker_pulls_daily (image_name, date, pulls_total) - VALUES ($(imageName), CURRENT_DATE, $(pulls)) - ON CONFLICT (image_name, date) DO UPDATE SET pulls_total = EXCLUDED.pulls_total - `, - { imageName: r.imageName, pulls: r.pulls }, - ) + await tx.result( + ` + INSERT INTO repo_docker_pulls_daily (image_name, date, pulls_total) + VALUES ($(imageName), CURRENT_DATE, $(pulls)) + ON CONFLICT (image_name, date) DO UPDATE SET pulls_total = EXCLUDED.pulls_total + `, + { imageName: r.imageName, pulls: r.pulls }, + ) + }) } export async function touchRepoDocker(qx: QueryExecutor, imageName: string): Promise { From 09d2a00a4c24561497d919e9a0c4bb3506ff9ace Mon Sep 17 00:00:00 2001 From: Joan Reyero Date: Wed, 3 Jun 2026 18:23:44 +0200 Subject: [PATCH 3/8] refactor: move dockerhub-sync queries to data-access-layer (CM-1213) Per themarolt's review on #4149, packages-db queries belong in services/libs/data-access-layer/src/packages/ alongside osv.ts. The worker now imports fetchStaleRepoDocker, fetchPendingDockerRepos, upsertRepoDockerRow, upsertRepoDockerDailySnapshot, touchRepoDocker, markRepoDockerChecked from @crowd/data-access-layer; dockerhub/upsertRepoDocker.ts is reduced to the tx orchestrator. Query strings unchanged. Co-Authored-By: Claude Opus 4.8 Signed-off-by: Joan Reyero --- .../packages_worker/src/dockerhub/index.ts | 90 +++++-------- .../packages_worker/src/dockerhub/types.ts | 13 -- .../src/dockerhub/upsertRepoDocker.ts | 42 ++---- .../data-access-layer/src/packages/index.ts | 1 + .../src/packages/repoDocker.ts | 126 ++++++++++++++++++ 5 files changed, 169 insertions(+), 103 deletions(-) create mode 100644 services/libs/data-access-layer/src/packages/repoDocker.ts diff --git a/services/apps/packages_worker/src/dockerhub/index.ts b/services/apps/packages_worker/src/dockerhub/index.ts index b5d8ce9482..c797103894 100644 --- a/services/apps/packages_worker/src/dockerhub/index.ts +++ b/services/apps/packages_worker/src/dockerhub/index.ts @@ -1,3 +1,11 @@ +import { + PendingDockerRepoRow, + StaleRepoDockerRow, + fetchPendingDockerRepos, + fetchStaleRepoDocker, + markRepoDockerChecked, + touchRepoDocker, +} from '@crowd/data-access-layer/src/packages' import { QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor' import { getServiceChildLogger } from '@crowd/logging' @@ -7,8 +15,8 @@ import { parseGithubUrl } from '../enricher/fetchLightRepo' import { buildCandidates } from './candidates' import { detectDockerfile } from './detectDockerfile' import { fetchDockerhub } from './fetchDockerhub' -import { DiscoveryRepoRow, DockerhubRepoResult, FetchError, RefreshImageRow } from './types' -import { markDockerChecked, touchRepoDocker, upsertRepoDocker } from './upsertRepoDocker' +import { DockerhubRepoResult, FetchError } from './types' +import { upsertRepoDocker } from './upsertRepoDocker' const log = getServiceChildLogger('dockerhub-sync') @@ -106,31 +114,9 @@ async function githubFetchWithRetries( return null } -async function fetchRefreshPage( - qx: QueryExecutor, - cursor: string | null, - config: DockerhubConfig, -): Promise { - return qx.select( - ` - SELECT id, repo_id, image_name - FROM repo_docker - WHERE last_synced_at < NOW() - make_interval(hours => $(refreshIntervalHours)) - AND ($(cursor)::bigint IS NULL OR id > $(cursor)::bigint) - ORDER BY id - LIMIT $(batchSize) - `, - { - cursor, - batchSize: config.batchSize, - refreshIntervalHours: config.refreshIntervalHours, - }, - ) -} - async function processRefreshPage( qx: QueryExecutor, - rows: RefreshImageRow[], + rows: StaleRepoDockerRow[], config: DockerhubConfig, ): Promise<{ ok: number; gone: number }> { let ok = 0 @@ -152,31 +138,7 @@ async function processRefreshPage( return { ok, gone } } -async function fetchDiscoveryPage( - qx: QueryExecutor, - cursor: string | null, - config: DockerhubConfig, -): Promise { - return qx.select( - ` - SELECT id, url, owner, name - FROM repos - WHERE host = 'github' - AND (docker_checked_at IS NULL - OR docker_checked_at < NOW() - make_interval(days => $(discoveryIntervalDays))) - AND ($(cursor)::bigint IS NULL OR id > $(cursor)::bigint) - ORDER BY id - LIMIT $(batchSize) - `, - { - cursor, - batchSize: config.batchSize, - discoveryIntervalDays: config.discoveryIntervalDays, - }, - ) -} - -function resolveOwnerName(row: DiscoveryRepoRow): { owner: string; name: string } | null { +function resolveOwnerName(row: PendingDockerRepoRow): { owner: string; name: string } | null { if (row.owner && row.name) return { owner: row.owner, name: row.name } try { return parseGithubUrl(row.url) @@ -187,13 +149,13 @@ function resolveOwnerName(row: DiscoveryRepoRow): { owner: string; name: string async function discoverRepo( qx: QueryExecutor, - row: DiscoveryRepoRow, + row: PendingDockerRepoRow, token: string, config: DockerhubConfig, ): Promise<'hit' | 'miss' | 'skip'> { const parsed = resolveOwnerName(row) if (!parsed) { - await markDockerChecked(qx, row.id) + await markRepoDockerChecked(qx, row.id) return 'skip' } @@ -201,11 +163,11 @@ async function discoverRepo( if (hasDockerfile === null) { // GitHub lookup failed (404/auth/gave-up). Mark checked so the backlog // drains; the discoveryIntervalDays re-check will try again later. - await markDockerChecked(qx, row.id) + await markRepoDockerChecked(qx, row.id) return 'skip' } if (!hasDockerfile) { - await markDockerChecked(qx, row.id) + await markRepoDockerChecked(qx, row.id) return 'miss' } @@ -213,18 +175,18 @@ async function discoverRepo( const result = await hubFetchWithRetries(config.hubBaseUrl, candidate) if (result) { await upsertRepoDocker(qx, row.id, result) - await markDockerChecked(qx, row.id) + await markRepoDockerChecked(qx, row.id) return 'hit' } } - await markDockerChecked(qx, row.id) + await markRepoDockerChecked(qx, row.id) return 'miss' } async function processDiscoveryPage( qx: QueryExecutor, - rows: DiscoveryRepoRow[], + rows: PendingDockerRepoRow[], parkedUntil: Map, config: DockerhubConfig, ): Promise<{ hits: number; misses: number; skipped: number; failed: number }> { @@ -296,7 +258,12 @@ export async function runDockerhubLoop( while (!isShuttingDown()) { pageNum++ - const refreshRows = await fetchRefreshPage(qx, refreshCursor, config) + const refreshRows = await fetchStaleRepoDocker( + qx, + refreshCursor, + config.batchSize, + config.refreshIntervalHours, + ) if (refreshRows.length > 0) { const stats = await processRefreshPage(qx, refreshRows, config) log.info( @@ -309,7 +276,12 @@ export async function runDockerhubLoop( if (isShuttingDown()) break - const discoveryRows = await fetchDiscoveryPage(qx, discoveryCursor, config) + const discoveryRows = await fetchPendingDockerRepos( + qx, + discoveryCursor, + config.batchSize, + config.discoveryIntervalDays, + ) if (discoveryRows.length > 0) { const stats = await processDiscoveryPage(qx, discoveryRows, githubParkedUntil, config) log.info( diff --git a/services/apps/packages_worker/src/dockerhub/types.ts b/services/apps/packages_worker/src/dockerhub/types.ts index c73aa69097..4ae67cdef9 100644 --- a/services/apps/packages_worker/src/dockerhub/types.ts +++ b/services/apps/packages_worker/src/dockerhub/types.ts @@ -5,19 +5,6 @@ export interface DockerhubRepoResult { lastUpdated: string | null } -export interface DiscoveryRepoRow { - id: string - url: string - owner: string | null - name: string | null -} - -export interface RefreshImageRow { - id: string - repo_id: string | null - image_name: string -} - export type FetchErrorKind = 'RATE_LIMIT' | 'TRANSIENT' | 'NOT_FOUND' | 'AUTH' | 'MALFORMED' export class FetchError extends Error { diff --git a/services/apps/packages_worker/src/dockerhub/upsertRepoDocker.ts b/services/apps/packages_worker/src/dockerhub/upsertRepoDocker.ts index a3712e3173..d9cff13773 100644 --- a/services/apps/packages_worker/src/dockerhub/upsertRepoDocker.ts +++ b/services/apps/packages_worker/src/dockerhub/upsertRepoDocker.ts @@ -1,3 +1,7 @@ +import { + upsertRepoDockerDailySnapshot, + upsertRepoDockerRow, +} from '@crowd/data-access-layer/src/packages' import { QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor' import { DockerhubRepoResult } from './types' @@ -11,36 +15,12 @@ export async function upsertRepoDocker( // failure on the snapshot insert) rolls back the last_synced_at bump on repo_docker // too — otherwise the row drops out of the refresh queue for 24h with no snapshot. await qx.tx(async (tx) => { - await tx.result( - ` - INSERT INTO repo_docker (repo_id, image_name, pulls, stars, last_synced_at) - VALUES ($(repoId), $(imageName), $(pulls), $(stars), NOW()) - ON CONFLICT (image_name) DO UPDATE SET - repo_id = COALESCE(repo_docker.repo_id, EXCLUDED.repo_id), - pulls = EXCLUDED.pulls, - stars = EXCLUDED.stars, - last_synced_at = NOW() - `, - { repoId, imageName: r.imageName, pulls: r.pulls, stars: r.stars }, - ) - - await tx.result( - ` - INSERT INTO repo_docker_pulls_daily (image_name, date, pulls_total) - VALUES ($(imageName), CURRENT_DATE, $(pulls)) - ON CONFLICT (image_name, date) DO UPDATE SET pulls_total = EXCLUDED.pulls_total - `, - { imageName: r.imageName, pulls: r.pulls }, - ) - }) -} - -export async function touchRepoDocker(qx: QueryExecutor, imageName: string): Promise { - await qx.result(`UPDATE repo_docker SET last_synced_at = NOW() WHERE image_name = $(imageName)`, { - imageName, + await upsertRepoDockerRow(tx, { + repoId, + imageName: r.imageName, + pulls: r.pulls, + stars: r.stars, + }) + await upsertRepoDockerDailySnapshot(tx, r.imageName, r.pulls) }) } - -export async function markDockerChecked(qx: QueryExecutor, repoId: string): Promise { - await qx.result(`UPDATE repos SET docker_checked_at = NOW() WHERE id = $(repoId)`, { repoId }) -} diff --git a/services/libs/data-access-layer/src/packages/index.ts b/services/libs/data-access-layer/src/packages/index.ts index 4ed98a8325..be769bbfe0 100644 --- a/services/libs/data-access-layer/src/packages/index.ts +++ b/services/libs/data-access-layer/src/packages/index.ts @@ -1 +1,2 @@ export * from './osv' +export * from './repoDocker' diff --git a/services/libs/data-access-layer/src/packages/repoDocker.ts b/services/libs/data-access-layer/src/packages/repoDocker.ts new file mode 100644 index 0000000000..5fe7fca60c --- /dev/null +++ b/services/libs/data-access-layer/src/packages/repoDocker.ts @@ -0,0 +1,126 @@ +import { QueryExecutor } from '../queryExecutor' + +// Database access layer for the dockerhub-sync worker. All repo_docker / +// repo_docker_pulls_daily / repos.docker_checked_at queries used by +// services/apps/packages_worker/src/dockerhub live here; the worker keeps the +// HTTP clients, rate-limit handling, and loop orchestration. +// +// Tables are defined in backend/src/osspckgs/migrations/V1779710880__initial_schema.sql. + +// ---- query result shapes ---- + +export interface StaleRepoDockerRow { + id: string + repo_id: string | null + image_name: string +} + +export interface PendingDockerRepoRow { + id: string + url: string + owner: string | null + name: string | null +} + +// ---- input shapes ---- + +export interface RepoDockerUpsertInput { + repoId: string | null + imageName: string + pulls: number + stars: number +} + +// ---- reads ---- + +// repo_docker rows whose last_synced_at is older than the refresh interval, +// keyset-paginated by id. Backed by repo_docker_stale_idx. +export async function fetchStaleRepoDocker( + qx: QueryExecutor, + cursor: string | null, + batchSize: number, + refreshIntervalHours: number, +): Promise { + return qx.select( + ` + SELECT id, repo_id, image_name + FROM repo_docker + WHERE last_synced_at < NOW() - make_interval(hours => $(refreshIntervalHours)) + AND ($(cursor)::bigint IS NULL OR id > $(cursor)::bigint) + ORDER BY id + LIMIT $(batchSize) + `, + { cursor, batchSize, refreshIntervalHours }, + ) +} + +// GitHub repos that have never been probed for a Docker image (or whose probe +// is older than discoveryIntervalDays), keyset-paginated by id. The +// docker_checked_at IS NULL arm is backed by repos_docker_pending_idx. +export async function fetchPendingDockerRepos( + qx: QueryExecutor, + cursor: string | null, + batchSize: number, + discoveryIntervalDays: number, +): Promise { + return qx.select( + ` + SELECT id, url, owner, name + FROM repos + WHERE host = 'github' + AND (docker_checked_at IS NULL + OR docker_checked_at < NOW() - make_interval(days => $(discoveryIntervalDays))) + AND ($(cursor)::bigint IS NULL OR id > $(cursor)::bigint) + ORDER BY id + LIMIT $(batchSize) + `, + { cursor, batchSize, discoveryIntervalDays }, + ) +} + +// ---- writes ---- + +export async function upsertRepoDockerRow( + qx: QueryExecutor, + input: RepoDockerUpsertInput, +): Promise { + await qx.result( + ` + INSERT INTO repo_docker (repo_id, image_name, pulls, stars, last_synced_at) + VALUES ($(repoId), $(imageName), $(pulls), $(stars), NOW()) + ON CONFLICT (image_name) DO UPDATE SET + repo_id = COALESCE(repo_docker.repo_id, EXCLUDED.repo_id), + pulls = EXCLUDED.pulls, + stars = EXCLUDED.stars, + last_synced_at = NOW() + `, + input, + ) +} + +export async function upsertRepoDockerDailySnapshot( + qx: QueryExecutor, + imageName: string, + pullsTotal: number, +): Promise { + await qx.result( + ` + INSERT INTO repo_docker_pulls_daily (image_name, date, pulls_total) + VALUES ($(imageName), CURRENT_DATE, $(pullsTotal)) + ON CONFLICT (image_name, date) DO UPDATE SET pulls_total = EXCLUDED.pulls_total + `, + { imageName, pullsTotal }, + ) +} + +// Bumps last_synced_at without changing pull/star data. Used when an image +// 404s on refresh so it drops out of the stale-queue without losing history. +export async function touchRepoDocker(qx: QueryExecutor, imageName: string): Promise { + await qx.result(`UPDATE repo_docker SET last_synced_at = NOW() WHERE image_name = $(imageName)`, { + imageName, + }) +} + +export async function markRepoDockerChecked(qx: QueryExecutor, repoId: string): Promise { + await qx.result(`UPDATE repos SET docker_checked_at = NOW() WHERE id = $(repoId)`, { repoId }) +} From 6c0d8be30d69b2a8728f9bba7c8392f83295ce25 Mon Sep 17 00:00:00 2001 From: Joan Reyero Date: Fri, 5 Jun 2026 15:07:32 +0100 Subject: [PATCH 4/8] fix: add 30s fetch timeouts to dockerhub-sync HTTP calls (CM-1213) Hub calls are serialized via hubChain, so a stalled socket would block all subsequent probes indefinitely. AbortSignal.timeout(30s) on both the Hub and GitHub GraphQL requests; aborts surface as TRANSIENT and retry with backoff. Co-Authored-By: Claude Opus 4.8 Signed-off-by: Joan Reyero --- .../apps/packages_worker/src/dockerhub/detectDockerfile.ts | 1 + services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts | 3 +++ 2 files changed, 4 insertions(+) diff --git a/services/apps/packages_worker/src/dockerhub/detectDockerfile.ts b/services/apps/packages_worker/src/dockerhub/detectDockerfile.ts index 8a850ef70a..2c5d010cdf 100644 --- a/services/apps/packages_worker/src/dockerhub/detectDockerfile.ts +++ b/services/apps/packages_worker/src/dockerhub/detectDockerfile.ts @@ -29,6 +29,7 @@ export async function detectDockerfile( 'Content-Type': 'application/json', }, body: JSON.stringify({ query: DOCKERFILE_QUERY, variables: { owner, name } }), + signal: AbortSignal.timeout(30_000), }) } catch (err) { throw new FetchError( diff --git a/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts b/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts index f4fe347baa..44584bd8a9 100644 --- a/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts +++ b/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts @@ -21,6 +21,9 @@ export async function fetchDockerhub( response = await fetch(url, { method: 'GET', headers: { Accept: 'application/json' }, + // Hub calls are serialized via hubChain in the loop — a stalled socket + // would otherwise block every subsequent Hub probe indefinitely. + signal: AbortSignal.timeout(30_000), }) } catch (err) { throw new FetchError('TRANSIENT', `Network error for ${imageName}: ${(err as Error).message}`) From 12ed78f2c0d6b0471893c49c682e0db7edbd75ec Mon Sep 17 00:00:00 2001 From: Joan Reyero Date: Fri, 5 Jun 2026 15:19:20 +0100 Subject: [PATCH 5/8] refactor: switch dockerhub-sync to GitHub App tokens (CM-1213) Aligns with enricher-v2 (#4165). getDockerhubConfig drops the ENRICHER_GITHUB_TOKENS PAT pool; the entrypoint now calls getGithubAppConfig + resolveInstallations + fetchRateLimitDiagnostics, and the discovery fan-out runs one worker per installation id with parkedUntil keyed on installationId. getInstallationToken is called per-request so token refresh/caching is shared with the enricher. Co-Authored-By: Claude Opus 4.8 Signed-off-by: Joan Reyero --- .../packages_worker/src/bin/dockerhub-sync.ts | 20 +++++++--- services/apps/packages_worker/src/config.ts | 8 ---- .../packages_worker/src/dockerhub/index.ts | 38 ++++++++++++++----- 3 files changed, 44 insertions(+), 22 deletions(-) diff --git a/services/apps/packages_worker/src/bin/dockerhub-sync.ts b/services/apps/packages_worker/src/bin/dockerhub-sync.ts index 40602204f4..e7aafa7a5a 100644 --- a/services/apps/packages_worker/src/bin/dockerhub-sync.ts +++ b/services/apps/packages_worker/src/bin/dockerhub-sync.ts @@ -1,8 +1,9 @@ import { getServiceLogger } from '@crowd/logging' -import { getDockerhubConfig } from '../config' +import { getDockerhubConfig, getGithubAppConfig } from '../config' import { getPackagesDb } from '../db' import { runDockerhubLoop } from '../dockerhub' +import { fetchRateLimitDiagnostics, resolveInstallations } from '../enricher/githubAppAuth' const log = getServiceLogger() @@ -21,22 +22,31 @@ const main = async () => { log.info('dockerhub-sync starting...') const config = getDockerhubConfig() + const appConfig = getGithubAppConfig() - if (config.tokens.length === 0) { - log.error('ENRICHER_GITHUB_TOKENS is required (comma-separated PATs)') + const installationIds = await resolveInstallations(appConfig) + + if (installationIds.length === 0) { + log.error('No GitHub App installations found — cannot build token pool') process.exit(1) } + await fetchRateLimitDiagnostics(appConfig.appId, appConfig.privateKeyPem, installationIds) + const qx = await getPackagesDb() await qx.selectOne('SELECT 1') log.info('Connected to packages-db.') log.info( - { tokens: config.tokens.length, batchSize: config.batchSize, hubBaseUrl: config.hubBaseUrl }, + { + installations: installationIds.length, + batchSize: config.batchSize, + hubBaseUrl: config.hubBaseUrl, + }, 'Starting dockerhub loop', ) - await runDockerhubLoop(qx, config, () => shuttingDown) + await runDockerhubLoop(qx, installationIds, appConfig, config, () => shuttingDown) log.info('dockerhub-sync stopped.') process.exit(0) diff --git a/services/apps/packages_worker/src/config.ts b/services/apps/packages_worker/src/config.ts index df2f46980c..68b5d230b9 100644 --- a/services/apps/packages_worker/src/config.ts +++ b/services/apps/packages_worker/src/config.ts @@ -47,15 +47,7 @@ export function getEnricherConfig() { } export function getDockerhubConfig() { - const rawTokens = process.env.ENRICHER_GITHUB_TOKENS ?? '' - const tokens = rawTokens - .split(',') - .map((t) => t.trim()) - .filter(Boolean) - return { - // GitHub PATs reused from the enricher for Dockerfile detection (GraphQL). - tokens, hubBaseUrl: requireEnv('DOCKERHUB_API_BASE_URL'), batchSize: requireEnvInt('DOCKERHUB_BATCH_SIZE'), refreshIntervalHours: requireEnvInt('DOCKERHUB_REFRESH_INTERVAL_HOURS'), diff --git a/services/apps/packages_worker/src/dockerhub/index.ts b/services/apps/packages_worker/src/dockerhub/index.ts index c797103894..d176c94c42 100644 --- a/services/apps/packages_worker/src/dockerhub/index.ts +++ b/services/apps/packages_worker/src/dockerhub/index.ts @@ -11,6 +11,7 @@ import { getServiceChildLogger } from '@crowd/logging' import { getDockerhubConfig } from '../config' import { parseGithubUrl } from '../enricher/fetchLightRepo' +import { GithubAppConfig, getInstallationToken } from '../enricher/githubAppAuth' import { buildCandidates } from './candidates' import { detectDockerfile } from './detectDockerfile' @@ -187,7 +188,9 @@ async function discoverRepo( async function processDiscoveryPage( qx: QueryExecutor, rows: PendingDockerRepoRow[], - parkedUntil: Map, + installationIds: number[], + appConfig: GithubAppConfig, + parkedUntil: Map, config: DockerhubConfig, ): Promise<{ hits: number; misses: number; skipped: number; failed: number }> { let hits = 0 @@ -197,10 +200,13 @@ async function processDiscoveryPage( let nextIdx = 0 await Promise.all( - config.tokens.map(async (token, tokenIdx) => { - const initialPark = (parkedUntil.get(token) ?? 0) - Date.now() + installationIds.map(async (installationId) => { + const initialPark = (parkedUntil.get(installationId) ?? 0) - Date.now() if (initialPark > 0) { - log.warn(`token#${tokenIdx} still parked, waiting ${Math.round(initialPark / 1000)}s`) + log.warn( + { installationId }, + `installation still parked, waiting ${Math.round(initialPark / 1000)}s`, + ) await new Promise((r) => setTimeout(r, initialPark)) } @@ -215,6 +221,11 @@ async function processDiscoveryPage( let done = false while (!done) { try { + const token = await getInstallationToken( + appConfig.appId, + appConfig.privateKeyPem, + installationId, + ) const outcome = await discoverRepo(qx, row, token, config) if (outcome === 'hit') hits++ else if (outcome === 'miss') misses++ @@ -224,10 +235,10 @@ async function processDiscoveryPage( if (err instanceof FetchError && err.kind === 'RATE_LIMIT') { const resetAt = err.resetAt ?? Date.now() + 60_000 const waitMs = Math.max(1_000, resetAt - Date.now()) - parkedUntil.set(token, resetAt) + parkedUntil.set(installationId, resetAt) log.warn( - { tokenIdx, parkedUntil: new Date(resetAt).toISOString() }, - `token#${tokenIdx} rate limited — parking for ${Math.round(waitMs / 1000)}s`, + { installationId, parkedUntil: new Date(resetAt).toISOString() }, + `installation rate limited — parking for ${Math.round(waitMs / 1000)}s`, ) await new Promise((r) => setTimeout(r, waitMs)) // loop retries the same row @@ -247,10 +258,12 @@ async function processDiscoveryPage( export async function runDockerhubLoop( qx: QueryExecutor, + installationIds: number[], + appConfig: GithubAppConfig, config: DockerhubConfig, isShuttingDown: () => boolean, ): Promise { - const githubParkedUntil = new Map() + const githubParkedUntil = new Map() let refreshCursor: string | null = null let discoveryCursor: string | null = null let pageNum = 0 @@ -283,7 +296,14 @@ export async function runDockerhubLoop( config.discoveryIntervalDays, ) if (discoveryRows.length > 0) { - const stats = await processDiscoveryPage(qx, discoveryRows, githubParkedUntil, config) + const stats = await processDiscoveryPage( + qx, + discoveryRows, + installationIds, + appConfig, + githubParkedUntil, + config, + ) log.info( `Discovery page ${pageNum}: read=${discoveryRows.length} hits=${stats.hits} ` + `misses=${stats.misses} skipped=${stats.skipped} failed=${stats.failed}`, From cbf3f6c498efa94d4ee359df57c4217cd3b45649 Mon Sep 17 00:00:00 2001 From: Joan Reyero Date: Fri, 5 Jun 2026 15:24:05 +0100 Subject: [PATCH 6/8] fix: propagate GitHub AUTH errors fatally in dockerhub-sync (CM-1213) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit githubFetchWithRetries was returning null on AUTH (same bucket as NOT_FOUND), which caused discoverRepo to mark docker_checked_at and move on. With a bad installation token that would silently stamp every repo for DOCKERHUB_DISCOVERY_INTERVAL_DAYS. Now AUTH re-throws through processDiscoveryPage so the worker exits and restarts with a fresh resolveInstallations() — symmetric to the Hub AUTH path. Co-Authored-By: Claude Opus 4.8 Signed-off-by: Joan Reyero --- services/apps/packages_worker/src/dockerhub/index.ts | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/services/apps/packages_worker/src/dockerhub/index.ts b/services/apps/packages_worker/src/dockerhub/index.ts index d176c94c42..01c1749ed6 100644 --- a/services/apps/packages_worker/src/dockerhub/index.ts +++ b/services/apps/packages_worker/src/dockerhub/index.ts @@ -95,11 +95,15 @@ async function githubFetchWithRetries( } catch (err) { if (!(err instanceof FetchError)) throw err - if (['NOT_FOUND', 'AUTH', 'MALFORMED'].includes(err.kind)) { + if (['NOT_FOUND', 'MALFORMED'].includes(err.kind)) { log.warn({ owner, name, kind: err.kind }, err.message) return null } + // AUTH means the installation token is bad (revoked / app misconfig). + // Propagate so the worker fails fast instead of marking every repo + // checked for DOCKERHUB_DISCOVERY_INTERVAL_DAYS — symmetric to Hub AUTH. + if (err.kind === 'AUTH') throw err if (err.kind === 'RATE_LIMIT') throw err if (attempt < MAX_RETRIES) { @@ -242,6 +246,10 @@ async function processDiscoveryPage( ) await new Promise((r) => setTimeout(r, waitMs)) // loop retries the same row + } else if (err instanceof FetchError && err.kind === 'AUTH') { + // Re-throw so runDockerhubLoop exits and the process restarts with + // a fresh resolveInstallations() — same fail-fast as Hub AUTH. + throw err } else { log.error({ url: row.url, err }, 'Unexpected discovery error') failed++ From a4528db8b5d4b4c72c3be399729933ef7d5008b0 Mon Sep 17 00:00:00 2001 From: Joan Reyero Date: Mon, 8 Jun 2026 15:29:43 +0100 Subject: [PATCH 7/8] fix: address themarolt review on dockerhub-sync (CM-1213) - Initial schema is now deployed; revert in-place edits to V1779710880 and move docker_checked_at / repos_docker_pending_idx / repo_docker_stale_idx / repo_docker_pulls_daily into a new migration V1780928852__dockerhub_sync (created via cli scaffold create-packages-migration). - processRefreshPage: wrap per-row Hub fetch + DB writes in try/catch so a transient DB failure logs and continues instead of crashing the worker. Hub AUTH still propagates fatally. Co-Authored-By: Claude Opus 4.8 Signed-off-by: Joan Reyero --- .../V1779710880__initial_schema.sql | 35 --------------- .../V1780928852__dockerhub_sync.sql | 43 +++++++++++++++++++ .../packages_worker/src/dockerhub/index.ts | 34 +++++++++------ 3 files changed, 65 insertions(+), 47 deletions(-) create mode 100644 backend/src/osspckgs/migrations/V1780928852__dockerhub_sync.sql diff --git a/backend/src/osspckgs/migrations/V1779710880__initial_schema.sql b/backend/src/osspckgs/migrations/V1779710880__initial_schema.sql index a8738189d2..d3b713c162 100644 --- a/backend/src/osspckgs/migrations/V1779710880__initial_schema.sql +++ b/backend/src/osspckgs/migrations/V1779710880__initial_schema.sql @@ -517,10 +517,6 @@ CREATE TABLE repos ( -- Scorecard aggregate; per-check detail in repo_scorecard_checks scorecard_score numeric(3, 1), scorecard_last_run_at timestamptz, - -- Last time dockerhub-sync probed this repo for a published Docker image (Dockerfile - -- detection + Hub candidate lookup). NULL = never checked. Separate from last_synced_at - -- because discovery cadence (weeks) differs from light-metadata refresh cadence (daily). - docker_checked_at timestamptz, -- Nullable with no default: multiple enrichers (deps.dev, GitHub worker, Scorecard) each write -- different columns at different times. NOT NULL DEFAULT would stamp a "synced" timestamp on -- first insert even when most columns are still NULL, making freshness checks misleading. @@ -535,13 +531,6 @@ CREATE INDEX ON repos (scorecard_score) WHERE scorecard_score IS NOT NULL; --- Partial index for the dockerhub-sync discovery backlog query: pages repos that have --- never been probed for a Docker image. Once docker_checked_at is set the row drops out --- of the index, so this stays small even as the repos table grows. -CREATE INDEX repos_docker_pending_idx ON repos (id) -WHERE - host = 'github' AND docker_checked_at IS NULL; - -- OpenSSF Scorecard per-check detail (~18 named checks) CREATE TABLE repo_scorecard_checks ( id bigserial PRIMARY KEY, @@ -568,30 +557,6 @@ CREATE INDEX ON repo_docker (repo_id) WHERE repo_id IS NOT NULL; --- Supports the dockerhub-sync refresh query (WHERE last_synced_at < NOW() - interval). -CREATE INDEX repo_docker_stale_idx ON repo_docker (last_synced_at); - --- ============================================================ --- REPO DOCKER PULLS DAILY --- One row per image per day storing the *lifetime* pull_count as returned --- by hub.docker.com/v2/repositories/. Docker Hub does not expose --- per-day download counts, so daily deltas are derived at query time: --- pulls_total - LAG(pulls_total) OVER (PARTITION BY image_name ORDER BY date) --- Keyed by image_name (matches repo_docker UNIQUE) so rows survive a --- repo_docker re-discovery without an FK cascade. --- --- Partitioned monthly via pg_partman — same setup as downloads_daily; add a --- partman.create_parent('public.repo_docker_pulls_daily', 'date', '1 month', 3) --- call alongside the downloads_daily registration. --- ============================================================ -CREATE TABLE repo_docker_pulls_daily ( - image_name text NOT NULL, - date date NOT NULL, - pulls_total bigint NOT NULL, - PRIMARY KEY (image_name, date) -) -PARTITION BY RANGE (date); - -- Package → repo provenance (monorepos publish N packages from one repo) CREATE TABLE package_repos ( id bigserial PRIMARY KEY, diff --git a/backend/src/osspckgs/migrations/V1780928852__dockerhub_sync.sql b/backend/src/osspckgs/migrations/V1780928852__dockerhub_sync.sql new file mode 100644 index 0000000000..4d3afc99ca --- /dev/null +++ b/backend/src/osspckgs/migrations/V1780928852__dockerhub_sync.sql @@ -0,0 +1,43 @@ +-- dockerhub-sync (CM-1213) +-- +-- Adds discovery/refresh bookkeeping for the dockerhub-sync worker +-- (services/apps/packages_worker/src/dockerhub) and a daily snapshot table +-- for Docker Hub lifetime pull counts. + +-- Last time dockerhub-sync probed this repo for a published Docker image +-- (Dockerfile detection + Hub candidate lookup). NULL = never checked. +-- Separate from repos.last_synced_at because discovery cadence (weeks) +-- differs from light-metadata refresh cadence (daily). +ALTER TABLE repos + ADD COLUMN IF NOT EXISTS docker_checked_at timestamptz; + +-- Partial index for the discovery backlog query: pages repos that have never +-- been probed for a Docker image. Once docker_checked_at is set the row drops +-- out of the index, so this stays small even as the repos table grows. +CREATE INDEX IF NOT EXISTS repos_docker_pending_idx ON repos (id) +WHERE + host = 'github' AND docker_checked_at IS NULL; + +-- Supports the refresh query (WHERE last_synced_at < NOW() - interval). +CREATE INDEX IF NOT EXISTS repo_docker_stale_idx ON repo_docker (last_synced_at); + +-- ============================================================ +-- REPO DOCKER PULLS DAILY +-- One row per image per day storing the *lifetime* pull_count as returned +-- by hub.docker.com/v2/repositories/. Docker Hub does not expose +-- per-day download counts, so daily deltas are derived at query time: +-- pulls_total - LAG(pulls_total) OVER (PARTITION BY image_name ORDER BY date) +-- Keyed by image_name (matches repo_docker UNIQUE) so rows survive a +-- repo_docker re-discovery without an FK cascade. +-- +-- Partitioned monthly via pg_partman — same setup as downloads_daily; add a +-- partman.create_parent('public.repo_docker_pulls_daily', 'date', '1 month', 3) +-- call alongside the downloads_daily registration. +-- ============================================================ +CREATE TABLE IF NOT EXISTS repo_docker_pulls_daily ( + image_name text NOT NULL, + date date NOT NULL, + pulls_total bigint NOT NULL, + PRIMARY KEY (image_name, date) +) +PARTITION BY RANGE (date); diff --git a/services/apps/packages_worker/src/dockerhub/index.ts b/services/apps/packages_worker/src/dockerhub/index.ts index 01c1749ed6..a221425fb8 100644 --- a/services/apps/packages_worker/src/dockerhub/index.ts +++ b/services/apps/packages_worker/src/dockerhub/index.ts @@ -123,24 +123,33 @@ async function processRefreshPage( qx: QueryExecutor, rows: StaleRepoDockerRow[], config: DockerhubConfig, -): Promise<{ ok: number; gone: number }> { +): Promise<{ ok: number; gone: number; failed: number }> { let ok = 0 let gone = 0 + let failed = 0 for (const row of rows) { - const result = await hubFetchWithRetries(config.hubBaseUrl, row.image_name) - if (result) { - await upsertRepoDocker(qx, row.repo_id, result) - ok++ - } else { - // Image deleted or unreachable — bump last_synced_at so we don't retry - // it every page; the row (and its pulls_daily history) is kept. - await touchRepoDocker(qx, row.image_name) - gone++ + try { + const result = await hubFetchWithRetries(config.hubBaseUrl, row.image_name) + if (result) { + await upsertRepoDocker(qx, row.repo_id, result) + ok++ + } else { + // Image deleted or unreachable — bump last_synced_at so we don't retry + // it every page; the row (and its pulls_daily history) is kept. + await touchRepoDocker(qx, row.image_name) + gone++ + } + } catch (err) { + // AUTH from Hub stays fatal — propagate so the worker exits instead of + // silently degrading every refresh into a miss. + if (err instanceof FetchError && err.kind === 'AUTH') throw err + log.error({ imageName: row.image_name, err }, 'Unexpected refresh error') + failed++ } } - return { ok, gone } + return { ok, gone, failed } } function resolveOwnerName(row: PendingDockerRepoRow): { owner: string; name: string } | null { @@ -288,7 +297,8 @@ export async function runDockerhubLoop( if (refreshRows.length > 0) { const stats = await processRefreshPage(qx, refreshRows, config) log.info( - `Refresh page ${pageNum}: read=${refreshRows.length} ok=${stats.ok} gone=${stats.gone}`, + `Refresh page ${pageNum}: read=${refreshRows.length} ok=${stats.ok} ` + + `gone=${stats.gone} failed=${stats.failed}`, ) refreshCursor = refreshRows[refreshRows.length - 1].id } else { From cf321490724c94e81c8943592736bbcd16666100 Mon Sep 17 00:00:00 2001 From: Joan Reyero Date: Mon, 8 Jun 2026 15:33:41 +0100 Subject: [PATCH 8/8] fix: address remaining bot review on dockerhub-sync (CM-1213) - Register repo_docker_pulls_daily with pg_partman in the migration (mirrors V1780231200 for downloads_daily; guarded against re-registration). - Normalize trailing slash on DOCKERHUB_API_BASE_URL to avoid // in the path. - Update stale 'per-token' comment to 'per-installation' after the App-auth switch. Co-Authored-By: Claude Opus 4.8 Signed-off-by: Joan Reyero --- .../V1780928852__dockerhub_sync.sql | 24 ++++++++++++++++--- .../__tests__/fetchDockerhub.test.ts | 6 +++++ .../src/dockerhub/fetchDockerhub.ts | 2 +- .../packages_worker/src/dockerhub/index.ts | 4 ++-- 4 files changed, 30 insertions(+), 6 deletions(-) diff --git a/backend/src/osspckgs/migrations/V1780928852__dockerhub_sync.sql b/backend/src/osspckgs/migrations/V1780928852__dockerhub_sync.sql index 4d3afc99ca..d17ecd7fc8 100644 --- a/backend/src/osspckgs/migrations/V1780928852__dockerhub_sync.sql +++ b/backend/src/osspckgs/migrations/V1780928852__dockerhub_sync.sql @@ -30,9 +30,8 @@ CREATE INDEX IF NOT EXISTS repo_docker_stale_idx ON repo_docker (last_synced_at) -- Keyed by image_name (matches repo_docker UNIQUE) so rows survive a -- repo_docker re-discovery without an FK cascade. -- --- Partitioned monthly via pg_partman — same setup as downloads_daily; add a --- partman.create_parent('public.repo_docker_pulls_daily', 'date', '1 month', 3) --- call alongside the downloads_daily registration. +-- Partitioned monthly via pg_partman (extension + schema already created in +-- V1780231200__npm_worker.sql). -- ============================================================ CREATE TABLE IF NOT EXISTS repo_docker_pulls_daily ( image_name text NOT NULL, @@ -41,3 +40,22 @@ CREATE TABLE IF NOT EXISTS repo_docker_pulls_daily ( PRIMARY KEY (image_name, date) ) PARTITION BY RANGE (date); + +-- Guard so this migration is idempotent against environments where the +-- table was already registered manually (e.g. local dev that applied the +-- earlier in-place schema edit). +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM partman.part_config + WHERE parent_table = 'public.repo_docker_pulls_daily' + ) THEN + PERFORM partman.create_parent( + p_parent_table => 'public.repo_docker_pulls_daily', + p_control => 'date', + p_interval => '1 month', + p_premake => 3 + ); + END IF; +END +$$; diff --git a/services/apps/packages_worker/src/dockerhub/__tests__/fetchDockerhub.test.ts b/services/apps/packages_worker/src/dockerhub/__tests__/fetchDockerhub.test.ts index 5b7ff4ac1c..d447d70027 100644 --- a/services/apps/packages_worker/src/dockerhub/__tests__/fetchDockerhub.test.ts +++ b/services/apps/packages_worker/src/dockerhub/__tests__/fetchDockerhub.test.ts @@ -43,6 +43,12 @@ describe('fetchDockerhub', () => { expect(spy).toHaveBeenCalledWith(`${BASE}/repositories/a/b/`, expect.anything()) }) + it('normalizes a trailing slash on the configured base URL', async () => { + const spy = mockFetch(200, { pull_count: 1, star_count: 0 }) + await fetchDockerhub(`${BASE}/`, 'a/b') + expect(spy).toHaveBeenCalledWith(`${BASE}/repositories/a/b/`, expect.anything()) + }) + it('classifies 404 as NOT_FOUND', async () => { mockFetch(404, { message: 'object not found' }) await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'NOT_FOUND' }) diff --git a/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts b/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts index 44584bd8a9..7cb7273cc3 100644 --- a/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts +++ b/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts @@ -14,7 +14,7 @@ export async function fetchDockerhub( baseUrl: string, imageName: string, ): Promise { - const url = `${baseUrl}/repositories/${imageName}/` + const url = `${baseUrl.replace(/\/+$/, '')}/repositories/${imageName}/` let response: Response try { diff --git a/services/apps/packages_worker/src/dockerhub/index.ts b/services/apps/packages_worker/src/dockerhub/index.ts index a221425fb8..5cf75bfa3c 100644 --- a/services/apps/packages_worker/src/dockerhub/index.ts +++ b/services/apps/packages_worker/src/dockerhub/index.ts @@ -27,8 +27,8 @@ type DockerhubConfig = ReturnType // Docker Hub's anonymous rate limit is per-IP, not per-token. hubParkedUntil // is module state so the park survives across refresh and discovery pages, and -// hubChain serializes calls so the per-token GitHub fan-out below can't fire -// concurrent Hub requests against that single per-IP budget. +// hubChain serializes calls so the per-installation GitHub fan-out below can't +// fire concurrent Hub requests against that single per-IP budget. let hubParkedUntil = 0 let hubChain: Promise = Promise.resolve()