diff --git a/backend/.env.dist.local b/backend/.env.dist.local index ef8bfc6101..f8e483d1df 100755 --- a/backend/.env.dist.local +++ b/backend/.env.dist.local @@ -199,3 +199,10 @@ OSV_ECOSYSTEMS=npm,Maven OSV_TMP_DIR=/tmp/osv OSV_BATCH_SIZE=500 OSV_DERIVE_BATCH_SIZE=1000 + +# dockerhub-sync (see services/apps/packages_worker/src/dockerhub/) +DOCKERHUB_API_BASE_URL=https://hub.docker.com/v2 +DOCKERHUB_BATCH_SIZE=100 +DOCKERHUB_REFRESH_INTERVAL_HOURS=24 +DOCKERHUB_DISCOVERY_INTERVAL_DAYS=14 +DOCKERHUB_IDLE_SLEEP_SEC=60 \ No newline at end of file diff --git a/backend/src/osspckgs/migrations/V1780928852__dockerhub_sync.sql b/backend/src/osspckgs/migrations/V1780928852__dockerhub_sync.sql new file mode 100644 index 0000000000..d17ecd7fc8 --- /dev/null +++ b/backend/src/osspckgs/migrations/V1780928852__dockerhub_sync.sql @@ -0,0 +1,61 @@ +-- dockerhub-sync (CM-1213) +-- +-- Adds discovery/refresh bookkeeping for the dockerhub-sync worker +-- (services/apps/packages_worker/src/dockerhub) and a daily snapshot table +-- for Docker Hub lifetime pull counts. + +-- Last time dockerhub-sync probed this repo for a published Docker image +-- (Dockerfile detection + Hub candidate lookup). NULL = never checked. +-- Separate from repos.last_synced_at because discovery cadence (weeks) +-- differs from light-metadata refresh cadence (daily). +ALTER TABLE repos + ADD COLUMN IF NOT EXISTS docker_checked_at timestamptz; + +-- Partial index for the discovery backlog query: pages repos that have never +-- been probed for a Docker image. Once docker_checked_at is set the row drops +-- out of the index, so this stays small even as the repos table grows. +CREATE INDEX IF NOT EXISTS repos_docker_pending_idx ON repos (id) +WHERE + host = 'github' AND docker_checked_at IS NULL; + +-- Supports the refresh query (WHERE last_synced_at < NOW() - interval). 
+CREATE INDEX IF NOT EXISTS repo_docker_stale_idx ON repo_docker (last_synced_at); + +-- ============================================================ +-- REPO DOCKER PULLS DAILY +-- One row per image per day storing the *lifetime* pull_count as returned +-- by hub.docker.com/v2/repositories/{namespace}/{name}/. Docker Hub does not expose +-- per-day download counts, so daily deltas are derived at query time: +-- pulls_total - LAG(pulls_total) OVER (PARTITION BY image_name ORDER BY date) +-- Keyed by image_name (matches repo_docker UNIQUE) so rows survive a +-- repo_docker re-discovery without an FK cascade. +-- +-- Partitioned monthly via pg_partman (extension + schema already created in +-- V1780231200__npm_worker.sql). +-- ============================================================ +CREATE TABLE IF NOT EXISTS repo_docker_pulls_daily ( + image_name text NOT NULL, + date date NOT NULL, + pulls_total bigint NOT NULL, + PRIMARY KEY (image_name, date) +) +PARTITION BY RANGE (date); + +-- Guard so this migration is idempotent against environments where the +-- table was already registered manually (e.g. local dev that applied the +-- earlier in-place schema edit). 
+DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM partman.part_config + WHERE parent_table = 'public.repo_docker_pulls_daily' + ) THEN + PERFORM partman.create_parent( + p_parent_table => 'public.repo_docker_pulls_daily', + p_control => 'date', + p_interval => '1 month', + p_premake => 3 + ); + END IF; +END +$$; diff --git a/scripts/services/dockerhub-sync.yaml b/scripts/services/dockerhub-sync.yaml new file mode 100644 index 0000000000..2d67c5b22c --- /dev/null +++ b/scripts/services/dockerhub-sync.yaml @@ -0,0 +1,70 @@ +version: '3.1' + +x-env-args: &env-args + DOCKER_BUILDKIT: 1 + NODE_ENV: docker + SERVICE: dockerhub-sync + SHELL: /bin/sh + SUPPRESS_NO_CONFIG_WARNING: 'true' + DOCKERHUB_API_BASE_URL: 'https://hub.docker.com/v2' + DOCKERHUB_BATCH_SIZE: '100' + DOCKERHUB_REFRESH_INTERVAL_HOURS: '24' + DOCKERHUB_DISCOVERY_INTERVAL_DAYS: '14' + DOCKERHUB_IDLE_SLEEP_SEC: '60' + +services: + dockerhub-sync: + build: + context: ../../ + dockerfile: ./scripts/services/docker/Dockerfile.packages-worker + command: 'pnpm run start:dockerhub-sync' + working_dir: /usr/crowd/app/services/apps/packages_worker + env_file: + - ../../backend/.env.dist.local + - ../../backend/.env.dist.composed + - ../../backend/.env.override.local + - ../../backend/.env.override.composed + environment: + <<: *env-args + restart: always + networks: + - crowd-bridge + + dockerhub-sync-dev: + build: + context: ../../ + dockerfile: ./scripts/services/docker/Dockerfile.packages-worker + command: 'pnpm run dev:dockerhub-sync' + working_dir: /usr/crowd/app/services/apps/packages_worker + # user: '${USER_ID}:${GROUP_ID}' + env_file: + - ../../backend/.env.dist.local + - ../../backend/.env.dist.composed + - ../../backend/.env.override.local + - ../../backend/.env.override.composed + environment: + <<: *env-args + hostname: dockerhub-sync + networks: + - crowd-bridge + volumes: + - ../../services/libs/audit-logs/src:/usr/crowd/app/services/libs/audit-logs/src + - 
../../services/libs/common/src:/usr/crowd/app/services/libs/common/src + - ../../services/libs/common_services/src:/usr/crowd/app/services/libs/common_services/src + - ../../services/libs/data-access-layer/src:/usr/crowd/app/services/libs/data-access-layer/src + - ../../services/libs/database/src:/usr/crowd/app/services/libs/database/src + - ../../services/libs/integrations/src:/usr/crowd/app/services/libs/integrations/src + - ../../services/libs/logging/src:/usr/crowd/app/services/libs/logging/src + - ../../services/libs/nango/src:/usr/crowd/app/services/libs/nango/src + - ../../services/libs/opensearch/src:/usr/crowd/app/services/libs/opensearch/src + - ../../services/libs/queue/src:/usr/crowd/app/services/libs/queue/src + - ../../services/libs/redis/src:/usr/crowd/app/services/libs/redis/src + - ../../services/libs/snowflake/src:/usr/crowd/app/services/libs/snowflake/src + - ../../services/libs/telemetry/src:/usr/crowd/app/services/libs/telemetry/src + - ../../services/libs/temporal/src:/usr/crowd/app/services/libs/temporal/src + - ../../services/libs/types/src:/usr/crowd/app/services/libs/types/src + - ../../services/apps/packages_worker/src:/usr/crowd/app/services/apps/packages_worker/src + +networks: + crowd-bridge: + external: true diff --git a/services/apps/packages_worker/package.json b/services/apps/packages_worker/package.json index 2c1f9c0190..ac6eaf72dc 100644 --- a/services/apps/packages_worker/package.json +++ b/services/apps/packages_worker/package.json @@ -20,6 +20,9 @@ "dev:deps-dev-ingest:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=deps-dev-ingest CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=deps-dev-ingest nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9235 src/bin/deps-dev-ingest.ts", "dev:npm-worker:local": "set -a && . ../../../backend/.env.dist.local && . 
../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=npm-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=npm-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9236 src/bin/npm-worker.ts", "dev:github-repos-enricher:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=github-repos-enricher LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9234 src/bin/github-repos-enricher.ts", + "start:dockerhub-sync": "SERVICE=dockerhub-sync tsx src/bin/dockerhub-sync.ts", + "dev:dockerhub-sync": "SERVICE=dockerhub-sync LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9238 src/bin/dockerhub-sync.ts", + "dev:dockerhub-sync:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=dockerhub-sync LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9238 src/bin/dockerhub-sync.ts", "export-to-bucket": "SERVICE=deps-dev-ingest tsx src/scripts/exportToBucket.ts", "export-to-bucket:local": "set -a && . ../../../backend/.env.dist.local && . 
../../../backend/.env.override.local && set +a && SERVICE=deps-dev-ingest tsx src/scripts/exportToBucket.ts", "monitor:osspckgs": "SERVICE=monitor tsx src/scripts/monitorOsspckgs.ts", diff --git a/services/apps/packages_worker/src/bin/dockerhub-sync.ts b/services/apps/packages_worker/src/bin/dockerhub-sync.ts new file mode 100644 index 0000000000..e7aafa7a5a --- /dev/null +++ b/services/apps/packages_worker/src/bin/dockerhub-sync.ts @@ -0,0 +1,58 @@ +import { getServiceLogger } from '@crowd/logging' + +import { getDockerhubConfig, getGithubAppConfig } from '../config' +import { getPackagesDb } from '../db' +import { runDockerhubLoop } from '../dockerhub' +import { fetchRateLimitDiagnostics, resolveInstallations } from '../enricher/githubAppAuth' + +const log = getServiceLogger() + +let shuttingDown = false + +const shutdown = async () => { + if (shuttingDown) return + shuttingDown = true + log.info('Shutting down dockerhub-sync...') +} + +process.on('SIGINT', shutdown) +process.on('SIGTERM', shutdown) + +const main = async () => { + log.info('dockerhub-sync starting...') + + const config = getDockerhubConfig() + const appConfig = getGithubAppConfig() + + const installationIds = await resolveInstallations(appConfig) + + if (installationIds.length === 0) { + log.error('No GitHub App installations found — cannot build token pool') + process.exit(1) + } + + await fetchRateLimitDiagnostics(appConfig.appId, appConfig.privateKeyPem, installationIds) + + const qx = await getPackagesDb() + await qx.selectOne('SELECT 1') + log.info('Connected to packages-db.') + + log.info( + { + installations: installationIds.length, + batchSize: config.batchSize, + hubBaseUrl: config.hubBaseUrl, + }, + 'Starting dockerhub loop', + ) + + await runDockerhubLoop(qx, installationIds, appConfig, config, () => shuttingDown) + + log.info('dockerhub-sync stopped.') + process.exit(0) +} + +main().catch((err) => { + log.error({ err }, 'dockerhub-sync fatal error') + process.exit(1) +}) diff --git 
a/services/apps/packages_worker/src/config.ts b/services/apps/packages_worker/src/config.ts index 9c7cc7829b..68b5d230b9 100644 --- a/services/apps/packages_worker/src/config.ts +++ b/services/apps/packages_worker/src/config.ts @@ -45,3 +45,13 @@ export function getEnricherConfig() { fetchTimeoutMs: parseInt(process.env.ENRICHER_FETCH_TIMEOUT_MS ?? '10000', 10), } } + +export function getDockerhubConfig() { + return { + hubBaseUrl: requireEnv('DOCKERHUB_API_BASE_URL'), + batchSize: requireEnvInt('DOCKERHUB_BATCH_SIZE'), + refreshIntervalHours: requireEnvInt('DOCKERHUB_REFRESH_INTERVAL_HOURS'), + discoveryIntervalDays: requireEnvInt('DOCKERHUB_DISCOVERY_INTERVAL_DAYS'), + idleSleepSec: requireEnvInt('DOCKERHUB_IDLE_SLEEP_SEC'), + } +} diff --git a/services/apps/packages_worker/src/dockerhub/__tests__/candidates.test.ts b/services/apps/packages_worker/src/dockerhub/__tests__/candidates.test.ts new file mode 100644 index 0000000000..7a9db3fa3a --- /dev/null +++ b/services/apps/packages_worker/src/dockerhub/__tests__/candidates.test.ts @@ -0,0 +1,28 @@ +import { describe, expect, it } from 'vitest' + +import { buildCandidates } from '../candidates' + +describe('buildCandidates', () => { + it('lowercases owner and repo into a single / candidate', () => { + expect(buildCandidates('Grafana', 'Grafana')).toEqual(['grafana/grafana']) + }) + + it('passes through already-valid lowercase slugs', () => { + expect(buildCandidates('prometheus', 'node_exporter')).toEqual(['prometheus/node_exporter']) + }) + + it('rejects components with characters Docker Hub does not accept', () => { + // GitHub allows '+' in org names via renames; Hub would 400. 
+ expect(buildCandidates('foo+bar', 'baz')).toEqual([]) + }) + + it('rejects components that start or end with a separator', () => { + expect(buildCandidates('-leading', 'repo')).toEqual([]) + expect(buildCandidates('owner', 'trailing.')).toEqual([]) + }) + + it('does not emit a library/ candidate', () => { + // Guard against accidental reintroduction — see comment in candidates.ts. + expect(buildCandidates('nodejs', 'node')).toEqual(['nodejs/node']) + }) +}) diff --git a/services/apps/packages_worker/src/dockerhub/__tests__/fetchDockerhub.test.ts b/services/apps/packages_worker/src/dockerhub/__tests__/fetchDockerhub.test.ts new file mode 100644 index 0000000000..d447d70027 --- /dev/null +++ b/services/apps/packages_worker/src/dockerhub/__tests__/fetchDockerhub.test.ts @@ -0,0 +1,111 @@ +import { afterEach, describe, expect, it, vi } from 'vitest' + +import { fetchDockerhub } from '../fetchDockerhub' +import { FetchError } from '../types' + +const BASE = 'https://hub.docker.com/v2' + +function mockFetch(status: number, body: unknown, headers: Record = {}) { + return vi.spyOn(globalThis, 'fetch').mockResolvedValue( + new Response(typeof body === 'string' ? 
body : JSON.stringify(body), { + status, + headers: { 'Content-Type': 'application/json', ...headers }, + }), + ) +} + +afterEach(() => { + vi.restoreAllMocks() +}) + +describe('fetchDockerhub', () => { + it('returns pull/star counts on 200', async () => { + mockFetch(200, { + name: 'grafana', + namespace: 'grafana', + pull_count: 12345, + star_count: 678, + last_updated: '2026-05-01T00:00:00Z', + }) + + const r = await fetchDockerhub(BASE, 'grafana/grafana') + expect(r).toEqual({ + imageName: 'grafana/grafana', + pulls: 12345, + stars: 678, + lastUpdated: '2026-05-01T00:00:00Z', + }) + }) + + it('appends trailing slash to the request URL', async () => { + const spy = mockFetch(200, { pull_count: 1, star_count: 0 }) + await fetchDockerhub(BASE, 'a/b') + expect(spy).toHaveBeenCalledWith(`${BASE}/repositories/a/b/`, expect.anything()) + }) + + it('normalizes a trailing slash on the configured base URL', async () => { + const spy = mockFetch(200, { pull_count: 1, star_count: 0 }) + await fetchDockerhub(`${BASE}/`, 'a/b') + expect(spy).toHaveBeenCalledWith(`${BASE}/repositories/a/b/`, expect.anything()) + }) + + it('classifies 404 as NOT_FOUND', async () => { + mockFetch(404, { message: 'object not found' }) + await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'NOT_FOUND' }) + }) + + it('classifies 400 as NOT_FOUND (Hub 400s on malformed slugs)', async () => { + mockFetch(400, { message: 'bad request' }) + await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'NOT_FOUND' }) + }) + + it('classifies 429 as RATE_LIMIT with resetAt from header', async () => { + const resetSec = Math.floor(Date.now() / 1000) + 120 + mockFetch(429, { message: 'too many' }, { 'x-ratelimit-reset': String(resetSec) }) + + expect.assertions(3) + try { + await fetchDockerhub(BASE, 'a/b') + } catch (err) { + expect(err).toBeInstanceOf(FetchError) + const fe = err as FetchError + expect(fe.kind).toBe('RATE_LIMIT') + 
expect(fe.resetAt).toBeGreaterThan(Date.now()) + } + }) + + it('does NOT discard a 200 response when x-ratelimit-remaining is 0', async () => { + // remaining=0 means this request consumed the last slot; the response itself + // is valid. The next call will 429 and park then. + mockFetch(200, { pull_count: 1, star_count: 0 }, { 'x-ratelimit-remaining': '0' }) + const r = await fetchDockerhub(BASE, 'a/b') + expect(r.pulls).toBe(1) + }) + + it('classifies 401/403 as AUTH so misconfig surfaces instead of looking like a miss', async () => { + mockFetch(401, { message: 'unauthorized' }) + await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'AUTH' }) + mockFetch(403, { message: 'forbidden' }) + await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'AUTH' }) + }) + + it('classifies 5xx as TRANSIENT', async () => { + mockFetch(503, 'Service Unavailable') + await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'TRANSIENT' }) + }) + + it('classifies network failure as TRANSIENT', async () => { + vi.spyOn(globalThis, 'fetch').mockRejectedValue(new Error('ECONNRESET')) + await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'TRANSIENT' }) + }) + + it('classifies non-JSON 200 body as MALFORMED', async () => { + mockFetch(200, 'not json') + await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'MALFORMED' }) + }) + + it('classifies missing pull_count as MALFORMED', async () => { + mockFetch(200, { name: 'x', star_count: 0 }) + await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'MALFORMED' }) + }) +}) diff --git a/services/apps/packages_worker/src/dockerhub/candidates.ts b/services/apps/packages_worker/src/dockerhub/candidates.ts new file mode 100644 index 0000000000..eeaef0e954 --- /dev/null +++ b/services/apps/packages_worker/src/dockerhub/candidates.ts @@ -0,0 +1,19 @@ +// Docker Hub repository slugs are lowercase and limited to [a-z0-9._-]. 
+// GitHub allows uppercase and a few characters Hub rejects, so we lowercase +// and validate before probing — anything that fails the regex would 400 on +// Hub anyway and isn't worth an HTTP round-trip. +const HUB_COMPONENT = /^[a-z0-9](?:[a-z0-9._-]*[a-z0-9])?$/ + +export function buildCandidates(owner: string, name: string): string[] { + const ns = owner.toLowerCase() + const repo = name.toLowerCase() + + if (!HUB_COMPONENT.test(ns) || !HUB_COMPONENT.test(repo)) { + return [] + } + + // v1 deliberately omits `library/`: a random github.com/foo/node with a + // dev Dockerfile would false-positive onto the official library/node image. + // Official images (~150) to be seeded via allowlist in a follow-up. + return [`${ns}/${repo}`] +} diff --git a/services/apps/packages_worker/src/dockerhub/detectDockerfile.ts b/services/apps/packages_worker/src/dockerhub/detectDockerfile.ts new file mode 100644 index 0000000000..2c5d010cdf --- /dev/null +++ b/services/apps/packages_worker/src/dockerhub/detectDockerfile.ts @@ -0,0 +1,90 @@ +import { FetchError } from './types' + +const GRAPHQL_URL = 'https://api.github.com/graphql' + +// One GraphQL call probes three common Dockerfile locations. object(expression:) +// returns null when the path doesn't exist, so a single request answers "does +// this repo ship a Dockerfile anywhere we care about?" without listing the tree. +const DOCKERFILE_QUERY = ` + query($owner: String!, $name: String!) 
{ + repository(owner: $owner, name: $name) { + d0: object(expression: "HEAD:Dockerfile") { oid } + d1: object(expression: "HEAD:docker/Dockerfile") { oid } + d2: object(expression: "HEAD:build/Dockerfile") { oid } + } + } +` + +export async function detectDockerfile( + owner: string, + name: string, + token: string, +): Promise { + let response: Response + try { + response = await fetch(GRAPHQL_URL, { + method: 'POST', + headers: { + Authorization: `bearer ${token}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ query: DOCKERFILE_QUERY, variables: { owner, name } }), + signal: AbortSignal.timeout(30_000), + }) + } catch (err) { + throw new FetchError( + 'TRANSIENT', + `Network error for ${owner}/${name}: ${(err as Error).message}`, + ) + } + + const resetSec = parseInt(response.headers.get('x-ratelimit-reset') ?? '0', 10) + const resetMs = resetSec ? resetSec * 1000 + 5_000 : Date.now() + 65_000 + + if (response.status === 401) { + throw new FetchError('AUTH', `401 Unauthorized for ${owner}/${name}`) + } + + if (response.status === 403) { + const body = await response.text() + if (body.toLowerCase().includes('rate limit')) { + throw new FetchError('RATE_LIMIT', `Rate limited on ${owner}/${name}`, resetMs) + } + throw new FetchError('AUTH', `403 Forbidden for ${owner}/${name}`) + } + + if (response.status === 404) throw new FetchError('NOT_FOUND', `404 for ${owner}/${name}`) + if (response.status >= 500) { + throw new FetchError('TRANSIENT', `${response.status} for ${owner}/${name}`) + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let json: any + try { + json = await response.json() + } catch (err) { + throw new FetchError( + 'MALFORMED', + `Non-JSON body for ${owner}/${name}: ${(err as Error).message}`, + ) + } + + if (json.errors?.length) { + const err = json.errors[0] + if (err.type === 'RATE_LIMITED') { + throw new FetchError('RATE_LIMIT', `RATE_LIMITED for ${owner}/${name}`, resetMs) + } + if (err.type === 
'NOT_FOUND') { + throw new FetchError('NOT_FOUND', `NOT_FOUND for ${owner}/${name}`) + } + throw new FetchError( + 'TRANSIENT', + `GraphQL error for ${owner}/${name}: ${err.message ?? err.type}`, + ) + } + + const repo = json.data?.repository + if (!repo) throw new FetchError('NOT_FOUND', `No repository data for ${owner}/${name}`) + + return Boolean(repo.d0 ?? repo.d1 ?? repo.d2) +} diff --git a/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts b/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts new file mode 100644 index 0000000000..7cb7273cc3 --- /dev/null +++ b/services/apps/packages_worker/src/dockerhub/fetchDockerhub.ts @@ -0,0 +1,69 @@ +import { DockerhubRepoResult, FetchError } from './types' + +interface HubResponse { + name?: string + namespace?: string + pull_count?: number + star_count?: number + last_updated?: string | null +} + +// imageName must be '/' (lowercase). Trailing slash on the URL +// is required — Hub returns a 301 to the slashed form otherwise. +export async function fetchDockerhub( + baseUrl: string, + imageName: string, +): Promise { + const url = `${baseUrl.replace(/\/+$/, '')}/repositories/${imageName}/` + + let response: Response + try { + response = await fetch(url, { + method: 'GET', + headers: { Accept: 'application/json' }, + // Hub calls are serialized via hubChain in the loop — a stalled socket + // would otherwise block every subsequent Hub probe indefinitely. + signal: AbortSignal.timeout(30_000), + }) + } catch (err) { + throw new FetchError('TRANSIENT', `Network error for ${imageName}: ${(err as Error).message}`) + } + + const resetSec = parseInt(response.headers.get('x-ratelimit-reset') ?? '0', 10) + const resetMs = resetSec ? 
resetSec * 1000 + 5_000 : Date.now() + 65_000 + + if (response.status === 429) { + throw new FetchError('RATE_LIMIT', `Rate limited on ${imageName}`, resetMs) + } + if (response.status === 401 || response.status === 403) { + // Surface as AUTH so the loop fails fast instead of silently marking every + // image "gone" when the base URL is misconfigured or Hub starts requiring auth. + throw new FetchError('AUTH', `${response.status} for ${imageName}`) + } + if (response.status === 404) throw new FetchError('NOT_FOUND', `404 for ${imageName}`) + if (response.status >= 500) { + throw new FetchError('TRANSIENT', `${response.status} for ${imageName}`) + } + if (!response.ok) { + // 400 etc — Hub sometimes 400s on malformed slugs; treat as a miss. + throw new FetchError('NOT_FOUND', `${response.status} for ${imageName}`) + } + + let json: HubResponse + try { + json = (await response.json()) as HubResponse + } catch (err) { + throw new FetchError('MALFORMED', `Non-JSON body for ${imageName}: ${(err as Error).message}`) + } + + if (typeof json.pull_count !== 'number') { + throw new FetchError('MALFORMED', `Missing pull_count for ${imageName}`) + } + + return { + imageName, + pulls: json.pull_count, + stars: json.star_count ?? 0, + lastUpdated: json.last_updated ?? 
null, + } +} diff --git a/services/apps/packages_worker/src/dockerhub/index.ts b/services/apps/packages_worker/src/dockerhub/index.ts new file mode 100644 index 0000000000..5cf75bfa3c --- /dev/null +++ b/services/apps/packages_worker/src/dockerhub/index.ts @@ -0,0 +1,339 @@ +import { + PendingDockerRepoRow, + StaleRepoDockerRow, + fetchPendingDockerRepos, + fetchStaleRepoDocker, + markRepoDockerChecked, + touchRepoDocker, +} from '@crowd/data-access-layer/src/packages' +import { QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor' +import { getServiceChildLogger } from '@crowd/logging' + +import { getDockerhubConfig } from '../config' +import { parseGithubUrl } from '../enricher/fetchLightRepo' +import { GithubAppConfig, getInstallationToken } from '../enricher/githubAppAuth' + +import { buildCandidates } from './candidates' +import { detectDockerfile } from './detectDockerfile' +import { fetchDockerhub } from './fetchDockerhub' +import { DockerhubRepoResult, FetchError } from './types' +import { upsertRepoDocker } from './upsertRepoDocker' + +const log = getServiceChildLogger('dockerhub-sync') + +const MAX_RETRIES = 3 + +type DockerhubConfig = ReturnType + +// Docker Hub's anonymous rate limit is per-IP, not per-token. hubParkedUntil +// is module state so the park survives across refresh and discovery pages, and +// hubChain serializes calls so the per-installation GitHub fan-out below can't +// fire concurrent Hub requests against that single per-IP budget. 
+let hubParkedUntil = 0 +let hubChain: Promise = Promise.resolve() + +function hubFetchWithRetries( + baseUrl: string, + imageName: string, +): Promise { + const run = hubChain.then(() => hubFetchInner(baseUrl, imageName)) + hubChain = run.catch(() => undefined) + return run +} + +async function hubFetchInner( + baseUrl: string, + imageName: string, +): Promise { + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + const wait = hubParkedUntil - Date.now() + if (wait > 0) { + log.warn({ waitSec: Math.round(wait / 1000) }, 'Docker Hub parked, waiting') + await new Promise((r) => setTimeout(r, wait)) + } + try { + return await fetchDockerhub(baseUrl, imageName) + } catch (err) { + if (!(err instanceof FetchError)) throw err + + if (err.kind === 'NOT_FOUND') return null + if (err.kind === 'AUTH') { + // Systemic misconfig (wrong base URL, Hub now requires auth) — propagate + // so the worker exits instead of silently marking every image gone. + log.error({ imageName }, err.message) + throw err + } + if (err.kind === 'MALFORMED') { + log.warn({ imageName }, err.message) + return null + } + if (err.kind === 'RATE_LIMIT') { + hubParkedUntil = err.resetAt ?? 
Date.now() + 60_000 + continue + } + if (attempt < MAX_RETRIES) { + const backoffMs = 1000 * 2 ** attempt + log.warn({ imageName, attempt, backoffMs }, `Transient Hub error, retrying: ${err.message}`) + await new Promise((r) => setTimeout(r, backoffMs)) + } else { + log.error({ imageName }, `Gave up after ${MAX_RETRIES} retries: ${err.message}`) + return null + } + } + } + return null +} + +async function githubFetchWithRetries( + owner: string, + name: string, + token: string, +): Promise { + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + try { + return await detectDockerfile(owner, name, token) + } catch (err) { + if (!(err instanceof FetchError)) throw err + + if (['NOT_FOUND', 'MALFORMED'].includes(err.kind)) { + log.warn({ owner, name, kind: err.kind }, err.message) + return null + } + + // AUTH means the installation token is bad (revoked / app misconfig). + // Propagate so the worker fails fast instead of marking every repo + // checked for DOCKERHUB_DISCOVERY_INTERVAL_DAYS — symmetric to Hub AUTH. 
+ if (err.kind === 'AUTH') throw err + if (err.kind === 'RATE_LIMIT') throw err + + if (attempt < MAX_RETRIES) { + const backoffMs = 1000 * 2 ** attempt + log.warn({ owner, name, attempt, backoffMs }, `Transient error, retrying: ${err.message}`) + await new Promise((r) => setTimeout(r, backoffMs)) + } else { + log.error({ owner, name }, `Gave up after ${MAX_RETRIES} retries: ${err.message}`) + return null + } + } + } + return null +} + +async function processRefreshPage( + qx: QueryExecutor, + rows: StaleRepoDockerRow[], + config: DockerhubConfig, +): Promise<{ ok: number; gone: number; failed: number }> { + let ok = 0 + let gone = 0 + let failed = 0 + + for (const row of rows) { + try { + const result = await hubFetchWithRetries(config.hubBaseUrl, row.image_name) + if (result) { + await upsertRepoDocker(qx, row.repo_id, result) + ok++ + } else { + // Image deleted or unreachable — bump last_synced_at so we don't retry + // it every page; the row (and its pulls_daily history) is kept. + await touchRepoDocker(qx, row.image_name) + gone++ + } + } catch (err) { + // AUTH from Hub stays fatal — propagate so the worker exits instead of + // silently degrading every refresh into a miss. 
+ if (err instanceof FetchError && err.kind === 'AUTH') throw err + log.error({ imageName: row.image_name, err }, 'Unexpected refresh error') + failed++ + } + } + + return { ok, gone, failed } +} + +function resolveOwnerName(row: PendingDockerRepoRow): { owner: string; name: string } | null { + if (row.owner && row.name) return { owner: row.owner, name: row.name } + try { + return parseGithubUrl(row.url) + } catch { + return null + } +} + +async function discoverRepo( + qx: QueryExecutor, + row: PendingDockerRepoRow, + token: string, + config: DockerhubConfig, +): Promise<'hit' | 'miss' | 'skip'> { + const parsed = resolveOwnerName(row) + if (!parsed) { + await markRepoDockerChecked(qx, row.id) + return 'skip' + } + + const hasDockerfile = await githubFetchWithRetries(parsed.owner, parsed.name, token) + if (hasDockerfile === null) { + // GitHub lookup failed (404/malformed/gave-up; AUTH and RATE_LIMIT throw instead). Mark checked so the backlog + // drains; the discoveryIntervalDays re-check will try again later. + await markRepoDockerChecked(qx, row.id) + return 'skip' + } + if (!hasDockerfile) { + await markRepoDockerChecked(qx, row.id) + return 'miss' + } + + for (const candidate of buildCandidates(parsed.owner, parsed.name)) { + const result = await hubFetchWithRetries(config.hubBaseUrl, candidate) + if (result) { + await upsertRepoDocker(qx, row.id, result) + await markRepoDockerChecked(qx, row.id) + return 'hit' + } + } + + await markRepoDockerChecked(qx, row.id) + return 'miss' +} + +async function processDiscoveryPage( + qx: QueryExecutor, + rows: PendingDockerRepoRow[], + installationIds: number[], + appConfig: GithubAppConfig, + parkedUntil: Map, + config: DockerhubConfig, +): Promise<{ hits: number; misses: number; skipped: number; failed: number }> { + let hits = 0 + let misses = 0 + let skipped = 0 + let failed = 0 + let nextIdx = 0 + + await Promise.all( + installationIds.map(async (installationId) => { + const initialPark = (parkedUntil.get(installationId) ?? 
0) - Date.now() + if (initialPark > 0) { + log.warn( + { installationId }, + `installation still parked, waiting ${Math.round(initialPark / 1000)}s`, + ) + await new Promise((r) => setTimeout(r, initialPark)) + } + + while (nextIdx < rows.length) { + const idx = nextIdx++ + const row = rows[idx] + + // Retry the same row after a GitHub rate-limit park instead of abandoning + // it. Without this the cursor advances past a row whose docker_checked_at + // was never set, and it isn't picked up again until the cursor wraps to + // null at end-of-backlog (potentially days on a 600k-row sweep). + let done = false + while (!done) { + try { + const token = await getInstallationToken( + appConfig.appId, + appConfig.privateKeyPem, + installationId, + ) + const outcome = await discoverRepo(qx, row, token, config) + if (outcome === 'hit') hits++ + else if (outcome === 'miss') misses++ + else skipped++ + done = true + } catch (err) { + if (err instanceof FetchError && err.kind === 'RATE_LIMIT') { + const resetAt = err.resetAt ?? Date.now() + 60_000 + const waitMs = Math.max(1_000, resetAt - Date.now()) + parkedUntil.set(installationId, resetAt) + log.warn( + { installationId, parkedUntil: new Date(resetAt).toISOString() }, + `installation rate limited — parking for ${Math.round(waitMs / 1000)}s`, + ) + await new Promise((r) => setTimeout(r, waitMs)) + // loop retries the same row + } else if (err instanceof FetchError && err.kind === 'AUTH') { + // Re-throw so runDockerhubLoop exits and the process restarts with + // a fresh resolveInstallations() — same fail-fast as Hub AUTH. 
+ throw err + } else { + log.error({ url: row.url, err }, 'Unexpected discovery error') + failed++ + done = true + } + } + } + } + }), + ) + + return { hits, misses, skipped, failed } +} + +export async function runDockerhubLoop( + qx: QueryExecutor, + installationIds: number[], + appConfig: GithubAppConfig, + config: DockerhubConfig, + isShuttingDown: () => boolean, +): Promise { + const githubParkedUntil = new Map() + let refreshCursor: string | null = null + let discoveryCursor: string | null = null + let pageNum = 0 + + while (!isShuttingDown()) { + pageNum++ + + const refreshRows = await fetchStaleRepoDocker( + qx, + refreshCursor, + config.batchSize, + config.refreshIntervalHours, + ) + if (refreshRows.length > 0) { + const stats = await processRefreshPage(qx, refreshRows, config) + log.info( + `Refresh page ${pageNum}: read=${refreshRows.length} ok=${stats.ok} ` + + `gone=${stats.gone} failed=${stats.failed}`, + ) + refreshCursor = refreshRows[refreshRows.length - 1].id + } else { + refreshCursor = null + } + + if (isShuttingDown()) break + + const discoveryRows = await fetchPendingDockerRepos( + qx, + discoveryCursor, + config.batchSize, + config.discoveryIntervalDays, + ) + if (discoveryRows.length > 0) { + const stats = await processDiscoveryPage( + qx, + discoveryRows, + installationIds, + appConfig, + githubParkedUntil, + config, + ) + log.info( + `Discovery page ${pageNum}: read=${discoveryRows.length} hits=${stats.hits} ` + + `misses=${stats.misses} skipped=${stats.skipped} failed=${stats.failed}`, + ) + discoveryCursor = discoveryRows[discoveryRows.length - 1].id + } else { + discoveryCursor = null + } + + if (refreshRows.length === 0 && discoveryRows.length === 0) { + log.info('No work, sleeping') + await new Promise((r) => setTimeout(r, config.idleSleepSec * 1000)) + } + } +} diff --git a/services/apps/packages_worker/src/dockerhub/types.ts b/services/apps/packages_worker/src/dockerhub/types.ts new file mode 100644 index 0000000000..4ae67cdef9 --- 
/dev/null
+++ b/services/apps/packages_worker/src/dockerhub/types.ts
@@ -0,0 +1,24 @@
+// Shape of one Docker Hub repository result as consumed by upsertRepoDocker.
+export interface DockerhubRepoResult {
+  imageName: string // '<namespace>/<name>', lowercase
+  pulls: number
+  stars: number
+  lastUpdated: string | null
+}
+
+// Categories a failed fetch is tagged with through FetchError.kind.
+export type FetchErrorKind = 'RATE_LIMIT' | 'TRANSIENT' | 'NOT_FOUND' | 'AUTH' | 'MALFORMED'
+
+// Error subclass carrying a FetchErrorKind and an optional resetAt value.
+// NOTE(review): resetAt presumably says when a rate limit lifts; its unit
+// (epoch seconds vs. milliseconds) is not visible in this file - confirm at the throw sites.
+export class FetchError extends Error {
+  constructor(
+    public readonly kind: FetchErrorKind,
+    message: string,
+    public readonly resetAt?: number,
+  ) {
+    super(message)
+    this.name = 'FetchError'
+  }
+}
diff --git a/services/apps/packages_worker/src/dockerhub/upsertRepoDocker.ts b/services/apps/packages_worker/src/dockerhub/upsertRepoDocker.ts
new file mode 100644
index 0000000000..d9cff13773
--- /dev/null
+++ b/services/apps/packages_worker/src/dockerhub/upsertRepoDocker.ts
@@ -0,0 +1,29 @@
+import {
+  upsertRepoDockerDailySnapshot,
+  upsertRepoDockerRow,
+} from '@crowd/data-access-layer/src/packages'
+import { QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor'
+
+import { DockerhubRepoResult } from './types'
+
+// Persists one Docker Hub result: upserts the repo_docker row for r.imageName and
+// records the pulls_total snapshot for it, both inside one transaction.
+// r.lastUpdated is not written by this function.
+export async function upsertRepoDocker(
+  qx: QueryExecutor,
+  repoId: string | null,
+  r: DockerhubRepoResult,
+): Promise<void> {
+  // Transactional so a missing partition on repo_docker_pulls_daily (or any other
+  // failure on the snapshot insert) rolls back the last_synced_at bump on repo_docker
+  // too — otherwise the row drops out of the refresh queue for a whole refresh
+  // interval (24h by default) with no snapshot.
+  await qx.tx(async (tx) => {
+    await upsertRepoDockerRow(tx, {
+      repoId,
+      imageName: r.imageName,
+      pulls: r.pulls,
+      stars: r.stars,
+    })
+    await upsertRepoDockerDailySnapshot(tx, r.imageName, r.pulls)
+  })
+}
diff --git a/services/libs/data-access-layer/src/packages/index.ts b/services/libs/data-access-layer/src/packages/index.ts
index a890e0e418..ed3118bb19 100644
--- a/services/libs/data-access-layer/src/packages/index.ts
+++ b/services/libs/data-access-layer/src/packages/index.ts
@@ -9,3 +9,4 @@ export * from './packages'
 export * from './repos'
 export * from './versions'
 export * from './osv'
+export * from './repoDocker'
diff --git a/services/libs/data-access-layer/src/packages/repoDocker.ts b/services/libs/data-access-layer/src/packages/repoDocker.ts
new file mode 100644
index 0000000000..5fe7fca60c
--- /dev/null
+++ b/services/libs/data-access-layer/src/packages/repoDocker.ts
@@ -0,0 +1,128 @@
+import { QueryExecutor } from '../queryExecutor'
+
+// Database access layer for the dockerhub-sync worker. All repo_docker /
+// repo_docker_pulls_daily / repos.docker_checked_at queries used by
+// services/apps/packages_worker/src/dockerhub live here; the worker keeps the
+// HTTP clients, rate-limit handling, and loop orchestration.
+//
+// Schema (backend/src/osspckgs/migrations): repo_docker is defined in
+// V1779710880__initial_schema.sql; repos.docker_checked_at, repo_docker_pulls_daily
+// and the supporting indexes are added by V1780928852__dockerhub_sync.sql.
+
+// ---- query result shapes ----
+
+export interface StaleRepoDockerRow {
+  id: string
+  repo_id: string | null
+  image_name: string
+}
+
+export interface PendingDockerRepoRow {
+  id: string
+  url: string
+  owner: string | null
+  name: string | null
+}
+
+// ---- input shapes ----
+
+export interface RepoDockerUpsertInput {
+  repoId: string | null
+  imageName: string
+  pulls: number
+  stars: number
+}
+
+// ---- reads ----
+
+// repo_docker rows whose last_synced_at is older than the refresh interval,
+// keyset-paginated by id. Backed by repo_docker_stale_idx.
+export async function fetchStaleRepoDocker(
+  qx: QueryExecutor,
+  cursor: string | null,
+  batchSize: number,
+  refreshIntervalHours: number,
+): Promise<StaleRepoDockerRow[]> {
+  // A null cursor reads from the lowest id; otherwise only ids above the cursor are returned.
+  return qx.select(
+    `
+    SELECT id, repo_id, image_name
+    FROM repo_docker
+    WHERE last_synced_at < NOW() - make_interval(hours => $(refreshIntervalHours))
+      AND ($(cursor)::bigint IS NULL OR id > $(cursor)::bigint)
+    ORDER BY id
+    LIMIT $(batchSize)
+    `,
+    { cursor, batchSize, refreshIntervalHours },
+  )
+}
+
+// GitHub repos that have never been probed for a Docker image (or whose probe
+// is older than discoveryIntervalDays), keyset-paginated by id. The
+// docker_checked_at IS NULL arm is backed by repos_docker_pending_idx.
+// A null cursor reads from the lowest id, exactly as in fetchStaleRepoDocker.
+export async function fetchPendingDockerRepos(
+  qx: QueryExecutor,
+  cursor: string | null,
+  batchSize: number,
+  discoveryIntervalDays: number,
+): Promise<PendingDockerRepoRow[]> {
+  return qx.select(
+    `
+    SELECT id, url, owner, name
+    FROM repos
+    WHERE host = 'github'
+      AND (docker_checked_at IS NULL
+           OR docker_checked_at < NOW() - make_interval(days => $(discoveryIntervalDays)))
+      AND ($(cursor)::bigint IS NULL OR id > $(cursor)::bigint)
+    ORDER BY id
+    LIMIT $(batchSize)
+    `,
+    { cursor, batchSize, discoveryIntervalDays },
+  )
+}
+
+// ---- writes ----
+
+// Inserts a repo_docker row keyed by image_name, or on conflict refreshes
+// pulls, stars and last_synced_at. An existing non-NULL repo_id is kept; the
+// incoming repoId is only used when the stored one is NULL.
+export async function upsertRepoDockerRow(
+  qx: QueryExecutor,
+  input: RepoDockerUpsertInput,
+): Promise<void> {
+  await qx.result(
+    `
+    INSERT INTO repo_docker (repo_id, image_name, pulls, stars, last_synced_at)
+    VALUES ($(repoId), $(imageName), $(pulls), $(stars), NOW())
+    ON CONFLICT (image_name) DO UPDATE SET
+      repo_id = COALESCE(repo_docker.repo_id, EXCLUDED.repo_id),
+      pulls = EXCLUDED.pulls,
+      stars = EXCLUDED.stars,
+      last_synced_at = NOW()
+    `,
+    input,
+  )
+}
+
+// Stores pullsTotal for imageName under CURRENT_DATE; a second call on the same
+// date overwrites that day's pulls_total.
+// NOTE(review): CURRENT_DATE follows the database session time zone, so the day
+// boundary depends on that setting - confirm it is the intended one.
+export async function upsertRepoDockerDailySnapshot(
+  qx: QueryExecutor,
+  imageName: string,
+  pullsTotal: number,
+): Promise<void> {
+  await qx.result(
+    `
+    INSERT INTO repo_docker_pulls_daily (image_name, date, pulls_total)
+    VALUES ($(imageName), CURRENT_DATE, $(pullsTotal))
+    ON CONFLICT (image_name, date) DO UPDATE SET pulls_total = EXCLUDED.pulls_total
+    `,
+    { imageName, pullsTotal },
+  )
+}
+
+// Bumps last_synced_at without changing pull/star data. Used when an image
+// 404s on refresh so it drops out of the stale-queue without losing history.
+export async function touchRepoDocker(qx: QueryExecutor, imageName: string): Promise<void> {
+  await qx.result(`UPDATE repo_docker SET last_synced_at = NOW() WHERE image_name = $(imageName)`, {
+    imageName,
+  })
+}
+
+// Stamps repos.docker_checked_at with NOW() for the given repo id, which takes
+// the repo out of fetchPendingDockerRepos until discoveryIntervalDays have passed.
+export async function markRepoDockerChecked(qx: QueryExecutor, repoId: string): Promise<void> {
+  await qx.result(`UPDATE repos SET docker_checked_at = NOW() WHERE id = $(repoId)`, { repoId })
+}