From bf26e78d75dde9ed2f4fb9dfba106f177251691c Mon Sep 17 00:00:00 2001 From: anilb Date: Tue, 9 Jun 2026 16:07:18 +0200 Subject: [PATCH 1/2] chore: consolidate npm package state Signed-off-by: anilb --- ...0800__merge_30d_into_npm_package_state.sql | 25 +++++++++++++++++++ .../packages_worker/src/npm/activities.ts | 8 +++--- .../src/packages/downloadsDaily.ts | 4 +++ .../src/packages/downloadsLast30d.ts | 21 ++++++++-------- .../src/packages/npmPackageState.ts | 16 ++++++++---- 5 files changed, 54 insertions(+), 20 deletions(-) create mode 100644 backend/src/osspckgs/migrations/V1781020800__merge_30d_into_npm_package_state.sql diff --git a/backend/src/osspckgs/migrations/V1781020800__merge_30d_into_npm_package_state.sql b/backend/src/osspckgs/migrations/V1781020800__merge_30d_into_npm_package_state.sql new file mode 100644 index 0000000000..9f65fa2599 --- /dev/null +++ b/backend/src/osspckgs/migrations/V1781020800__merge_30d_into_npm_package_state.sql @@ -0,0 +1,25 @@ +-- ── 1. Add the 30d watermark columns to npm_package_state ────────────────────── +ALTER TABLE npm_package_state + ADD COLUMN IF NOT EXISTS downloads_30d_last_run_at timestamptz, -- breadth watermark: latest 30d window refreshed + ADD COLUMN IF NOT EXISTS downloads_30d_history_backfilled_at timestamptz, -- depth watermark: NULL until full older history filled + ADD COLUMN IF NOT EXISTS downloads_30d_run_result jsonb; -- { status, httpStatus?, errorKind?, message? } + +-- Recreate the two indexes the old table had — both due-selection queries +-- filter/order on these columns. +CREATE INDEX IF NOT EXISTS npm_package_state_downloads_30d_last_run_at_idx + ON npm_package_state (downloads_30d_last_run_at); +CREATE INDEX IF NOT EXISTS npm_package_state_downloads_30d_history_backfilled_at_idx + ON npm_package_state (downloads_30d_history_backfilled_at); + +-- ── 2. Migrate existing rows ─────────────────────────────────────────────────── +INSERT INTO npm_package_state + (purl, downloads_30d_last_run_at, downloads_30d_history_backfilled_at, downloads_30d_run_result) +SELECT purl, downloads_30d_last_run_at, downloads_30d_history_backfilled_at, downloads_30d_run_result + FROM npm_package_universe_state +ON CONFLICT (purl) DO UPDATE SET + downloads_30d_last_run_at = EXCLUDED.downloads_30d_last_run_at, + downloads_30d_history_backfilled_at = EXCLUDED.downloads_30d_history_backfilled_at, + downloads_30d_run_result = EXCLUDED.downloads_30d_run_result; + +-- ── 3. Drop the retired table ────────────────────────────────────────────────── +DROP TABLE npm_package_universe_state; diff --git a/services/apps/packages_worker/src/npm/activities.ts b/services/apps/packages_worker/src/npm/activities.ts index 3f3dc6a405..0149bd14cc 100644 --- a/services/apps/packages_worker/src/npm/activities.ts +++ b/services/apps/packages_worker/src/npm/activities.ts @@ -8,9 +8,9 @@ import { getMissingDownloadDates, getNpmChangesLastSeq, getNpmPackagesNeedingDailyBackfill, + getNpmPurlsDueForLast30dHistory, + getNpmPurlsDueForLatest30d, getNpmPurlsForChangedNames, - getNpmUniversePurlsDueForLast30dHistory, - getNpmUniversePurlsDueForLatest30d, getUnscannedNpmPurls, insertDailyDownloads, logAuditFieldChanges, @@ -507,7 +507,7 @@ export async function refreshLatestLast30dLane( lanes: number, ): Promise<{ fetched: number }> { const qx = await getPackagesDb() - const due = await getNpmUniversePurlsDueForLatest30d(qx, cutoff, batchSize, laneIndex, lanes) + const due = await getNpmPurlsDueForLatest30d(qx, cutoff, batchSize, laneIndex, lanes) if (due.length === 0) return { fetched: 0 } const window = latestLast30dWindow(utcFirstOfCurrentMonth()) @@ -554,7 +554,7 @@ export async function backfillLast30dHistoryLane( lanes: number, ): Promise<{ fetched: number }> { const qx = await getPackagesDb() - const due = await getNpmUniversePurlsDueForLast30dHistory(qx, batchSize, laneIndex, lanes) + const due = await getNpmPurlsDueForLast30dHistory(qx, batchSize, laneIndex, lanes) if (due.length === 0) return { fetched: 0 } const latestEnd = utcFirstOfCurrentMonth() diff --git a/services/libs/data-access-layer/src/packages/downloadsDaily.ts b/services/libs/data-access-layer/src/packages/downloadsDaily.ts index d2d932763f..d7fcee4cda 100644 --- a/services/libs/data-access-layer/src/packages/downloadsDaily.ts +++ b/services/libs/data-access-layer/src/packages/downloadsDaily.ts @@ -30,6 +30,9 @@ export interface DailyBackfillCandidate { // `laneIndex`/`laneCount` shard the due set across concurrent lanes by a stable hash // of the purl, so each lane drains a disjoint slice (laneCount=1 ⇒ no sharding). +// Restricted to is_critical packages — daily downloads are deep per-package history, +// scoped to the critical set (matching the metadata pass). The 30d download passes run +// over all npm packages, since their counts feed the criticality ranking. export async function getNpmPackagesNeedingDailyBackfill( qx: QueryExecutor, cutoff: string, @@ -49,6 +52,7 @@ export async function getNpmPackagesNeedingDailyBackfill( FROM packages p LEFT JOIN npm_package_state s ON s.purl = p.purl WHERE p.ecosystem = 'npm' + AND p.is_critical = TRUE AND (((hashtext(p.purl) % $(laneCount)) + $(laneCount)) % $(laneCount)) = $(laneIndex) AND (s.daily_downloads_last_processed_at IS NULL OR s.daily_downloads_last_processed_at < $(cutoff)::timestamptz) diff --git a/services/libs/data-access-layer/src/packages/downloadsLast30d.ts b/services/libs/data-access-layer/src/packages/downloadsLast30d.ts index 98512cad08..8ac1e3e718 100644 --- a/services/libs/data-access-layer/src/packages/downloadsLast30d.ts +++ b/services/libs/data-access-layer/src/packages/downloadsLast30d.ts @@ -25,10 +25,10 @@ export interface Last30dCandidate { // BREADTH selection. A purl is "due for the latest window" while its breadth watermark // (`downloads_30d_last_run_at`) is older than this run's cutoff (or absent). The watermark // is bumped once its current 30-day window is refreshed, so the monthly run touches every -// package's latest window exactly once and the denormalized number lands across the whole -// universe before any deep history is filled. Older history is a separate pass keyed on -// `downloads_30d_history_backfilled_at` — see getNpmUniversePurlsDueForLast30dHistory. -export async function getNpmUniversePurlsDueForLatest30d( +// package's latest window exactly once and the denormalized number lands across all npm +// packages before any deep history is filled. Older history is a separate pass keyed on +// `downloads_30d_history_backfilled_at` — see getNpmPurlsDueForLast30dHistory. +export async function getNpmPurlsDueForLatest30d( qx: QueryExecutor, cutoff: string, batchSize: number, @@ -39,7 +39,7 @@ export async function getNpmUniversePurlsDueForLatest30d( `WITH due AS ( SELECT p.purl AS purl, p.first_release_at FROM packages p - LEFT JOIN npm_package_universe_state s ON s.purl = p.purl + LEFT JOIN npm_package_state s ON s.purl = p.purl WHERE p.ecosystem = 'npm' AND (((hashtext(p.purl) % $(laneCount)) + $(laneCount)) % $(laneCount)) = $(laneIndex) AND (s.downloads_30d_last_run_at IS NULL @@ -61,7 +61,7 @@ export async function getNpmUniversePurlsDueForLatest30d( // not yet been filled (`downloads_30d_history_backfilled_at IS NULL`). This keeps the work // strictly breadth-first per package: history is never fetched before the latest window. // Sharded the same way; oldest-breadth-first so the longest-waiting packages drain first. -export async function getNpmUniversePurlsDueForLast30dHistory( +export async function getNpmPurlsDueForLast30dHistory( qx: QueryExecutor, batchSize: number, laneIndex: number, @@ -71,9 +71,8 @@ export async function getNpmUniversePurlsDueForLast30dHistory( `WITH due AS ( SELECT p.purl AS purl, p.first_release_at, s.downloads_30d_last_run_at AS last_run_at FROM packages p - JOIN npm_package_universe_state s ON s.purl = p.purl + JOIN npm_package_state s ON s.purl = p.purl WHERE p.ecosystem = 'npm' - AND p.is_critical = TRUE AND (((hashtext(p.purl) % $(laneCount)) + $(laneCount)) % $(laneCount)) = $(laneIndex) AND s.downloads_30d_last_run_at IS NOT NULL AND s.downloads_30d_history_backfilled_at IS NULL @@ -90,7 +89,7 @@ export async function getNpmUniversePurlsDueForLast30dHistory( } // Structured outcome of a last-30d run, stored as JSONB in -// npm_package_universe_state.downloads_30d_run_result. +// npm_package_state.downloads_30d_run_result. export interface Last30dRunResult { status: 'success' | 'error' httpStatus?: number @@ -107,7 +106,7 @@ export async function markLast30dProcessed( result: Last30dRunResult, ): Promise { await qx.result( - `INSERT INTO npm_package_universe_state (purl, downloads_30d_last_run_at, downloads_30d_run_result) + `INSERT INTO npm_package_state (purl, downloads_30d_last_run_at, downloads_30d_run_result) VALUES ($(purl), NOW(), $(result)::jsonb) ON CONFLICT (purl) DO UPDATE SET downloads_30d_last_run_at = NOW(), @@ -126,7 +125,7 @@ export async function markLast30dHistoryBackfilled( result: Last30dRunResult, ): Promise { await qx.result( - `UPDATE npm_package_universe_state + `UPDATE npm_package_state SET downloads_30d_history_backfilled_at = NOW(), downloads_30d_run_result = $(result)::jsonb WHERE purl = $(purl)`, diff --git a/services/libs/data-access-layer/src/packages/npmPackageState.ts b/services/libs/data-access-layer/src/packages/npmPackageState.ts index 7d017d709e..b24e992dfa 100644 --- a/services/libs/data-access-layer/src/packages/npmPackageState.ts +++ b/services/libs/data-access-layer/src/packages/npmPackageState.ts @@ -28,7 +28,9 @@ export async function markNpmPackageScanned( ) } -// npm packages in the `packages` table that have never been metadata-scanned. +// Critical npm packages in the `packages` table that have never been metadata-scanned. +// Only is_critical packages are enriched — metadata is deep, per-package work, so it is +// scoped to the critical set (matching the daily-downloads pass). // Keyset-paginated on purl so the workflow can drain a large first run across // many continueAsNew runs. export async function getUnscannedNpmPurls( @@ -41,6 +43,7 @@ export async function getUnscannedNpmPurls( FROM packages p LEFT JOIN npm_package_state s ON s.purl = p.purl WHERE p.ecosystem = 'npm' + AND p.is_critical = TRUE AND p.purl > $(afterPurl) AND s.purl IS NULL ORDER BY p.purl @@ -51,9 +54,11 @@ export async function getUnscannedNpmPurls( } // Given a list of changed npm registry names (from the _changes feed), return the -// purls of those that exist as npm rows in `packages`. The purl is read straight -// from the row; feed names are matched against the decoded namespace/name columns -// (the purl is percent-encoded, so substr(purl) would be %40scope/name). +// purls of the critical ones that exist as npm rows in `packages`. The purl is read +// straight from the row; feed names are matched against the decoded namespace/name +// columns (the purl is percent-encoded, so substr(purl) would be %40scope/name). +// Restricted to is_critical packages — non-critical packages are never metadata-scanned, +// so there is nothing to keep fresh when they change. export async function getNpmPurlsForChangedNames( qx: QueryExecutor, names: string[], @@ -64,7 +69,8 @@ export async function getNpmPurlsForChangedNames( FROM packages p JOIN unnest($(names)::text[]) AS u(name) ON (CASE WHEN p.namespace IS NOT NULL THEN p.namespace || '/' || p.name ELSE p.name END) = u.name - WHERE p.ecosystem = 'npm'`, + WHERE p.ecosystem = 'npm' + AND p.is_critical = TRUE`, { names }, ) return rows.map((r) => r.purl) From 0db19e468ce6cfc4947e0b9be4d454b7d4d13800 Mon Sep 17 00:00:00 2001 From: anilb Date: Tue, 9 Jun 2026 16:25:49 +0200 Subject: [PATCH 2/2] fix: drain unscanned npm metadata by watermark Signed-off-by: anilb --- .../data-access-layer/src/packages/npmPackageState.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/services/libs/data-access-layer/src/packages/npmPackageState.ts b/services/libs/data-access-layer/src/packages/npmPackageState.ts index b24e992dfa..3b359bfdb6 100644 --- a/services/libs/data-access-layer/src/packages/npmPackageState.ts +++ b/services/libs/data-access-layer/src/packages/npmPackageState.ts @@ -11,8 +11,8 @@ export interface NpmMetadataRunResult { } // Mark a package as metadata-scanned and record the run outcome (+ timestamp). Keyed -// by purl (from the packages row). metadata_first_scanned_at is kept from the first -// insert; metadata_run_result/metadata_last_run_at are refreshed on every run. +// by purl (from the packages row). metadata_run_result/metadata_last_run_at are refreshed +// on every run — metadata_last_run_at is the authoritative "metadata has run" signal. export async function markNpmPackageScanned( qx: QueryExecutor, purl: string, @@ -28,7 +28,7 @@ export async function markNpmPackageScanned( ) } -// Critical npm packages in the `packages` table that have never been metadata-scanned. +// Critical npm packages in the `packages` table whose metadata has never been scanned. // Only is_critical packages are enriched — metadata is deep, per-package work, so it is // scoped to the critical set (matching the daily-downloads pass). // Keyset-paginated on purl so the workflow can drain a large first run across @@ -45,7 +45,7 @@ export async function getUnscannedNpmPurls( WHERE p.ecosystem = 'npm' AND p.is_critical = TRUE AND p.purl > $(afterPurl) - AND s.purl IS NULL + AND s.metadata_last_run_at IS NULL ORDER BY p.purl LIMIT $(batchSize)`, { afterPurl, batchSize },