Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- ── 1. Add the 30d watermark columns to npm_package_state ──────────────────────
ALTER TABLE npm_package_state
ADD COLUMN IF NOT EXISTS downloads_30d_last_run_at timestamptz, -- breadth watermark: latest 30d window refreshed
ADD COLUMN IF NOT EXISTS downloads_30d_history_backfilled_at timestamptz, -- depth watermark: NULL until full older history filled
ADD COLUMN IF NOT EXISTS downloads_30d_run_result jsonb; -- { status, httpStatus?, errorKind?, message? }

-- Recreate the two indexes the old table had — both due-selection queries
-- filter/order on these columns.
CREATE INDEX IF NOT EXISTS npm_package_state_downloads_30d_last_run_at_idx
ON npm_package_state (downloads_30d_last_run_at);
CREATE INDEX IF NOT EXISTS npm_package_state_downloads_30d_history_backfilled_at_idx
ON npm_package_state (downloads_30d_history_backfilled_at);

-- ── 2. Migrate existing rows ───────────────────────────────────────────────────
INSERT INTO npm_package_state
(purl, downloads_30d_last_run_at, downloads_30d_history_backfilled_at, downloads_30d_run_result)
SELECT purl, downloads_30d_last_run_at, downloads_30d_history_backfilled_at, downloads_30d_run_result
FROM npm_package_universe_state
ON CONFLICT (purl) DO UPDATE SET
downloads_30d_last_run_at = EXCLUDED.downloads_30d_last_run_at,
downloads_30d_history_backfilled_at = EXCLUDED.downloads_30d_history_backfilled_at,
downloads_30d_run_result = EXCLUDED.downloads_30d_run_result;

-- ── 3. Drop the retired table ──────────────────────────────────────────────────
DROP TABLE npm_package_universe_state;
8 changes: 4 additions & 4 deletions services/apps/packages_worker/src/npm/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import {
getMissingDownloadDates,
getNpmChangesLastSeq,
getNpmPackagesNeedingDailyBackfill,
getNpmPurlsDueForLast30dHistory,
getNpmPurlsDueForLatest30d,
getNpmPurlsForChangedNames,
getNpmUniversePurlsDueForLast30dHistory,
getNpmUniversePurlsDueForLatest30d,
getUnscannedNpmPurls,
insertDailyDownloads,
logAuditFieldChanges,
Expand Down Expand Up @@ -507,7 +507,7 @@ export async function refreshLatestLast30dLane(
lanes: number,
): Promise<{ fetched: number }> {
const qx = await getPackagesDb()
const due = await getNpmUniversePurlsDueForLatest30d(qx, cutoff, batchSize, laneIndex, lanes)
const due = await getNpmPurlsDueForLatest30d(qx, cutoff, batchSize, laneIndex, lanes)
if (due.length === 0) return { fetched: 0 }

const window = latestLast30dWindow(utcFirstOfCurrentMonth())
Expand Down Expand Up @@ -554,7 +554,7 @@ export async function backfillLast30dHistoryLane(
lanes: number,
): Promise<{ fetched: number }> {
const qx = await getPackagesDb()
const due = await getNpmUniversePurlsDueForLast30dHistory(qx, batchSize, laneIndex, lanes)
const due = await getNpmPurlsDueForLast30dHistory(qx, batchSize, laneIndex, lanes)
if (due.length === 0) return { fetched: 0 }

const latestEnd = utcFirstOfCurrentMonth()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ export interface DailyBackfillCandidate {

// `laneIndex`/`laneCount` shard the due set across concurrent lanes by a stable hash
// of the purl, so each lane drains a disjoint slice (laneCount=1 ⇒ no sharding).
// Restricted to is_critical packages — daily downloads are deep per-package history,
// scoped to the critical set (matching the metadata pass). The 30d download passes run
// over all npm packages, since their counts feed the criticality ranking.
export async function getNpmPackagesNeedingDailyBackfill(
qx: QueryExecutor,
cutoff: string,
Expand All @@ -49,6 +52,7 @@ export async function getNpmPackagesNeedingDailyBackfill(
FROM packages p
LEFT JOIN npm_package_state s ON s.purl = p.purl
WHERE p.ecosystem = 'npm'
AND p.is_critical = TRUE
AND (((hashtext(p.purl) % $(laneCount)) + $(laneCount)) % $(laneCount)) = $(laneIndex)
AND (s.daily_downloads_last_processed_at IS NULL
OR s.daily_downloads_last_processed_at < $(cutoff)::timestamptz)
Expand Down
21 changes: 10 additions & 11 deletions services/libs/data-access-layer/src/packages/downloadsLast30d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ export interface Last30dCandidate {
// BREADTH selection. A purl is "due for the latest window" while its breadth watermark
// (`downloads_30d_last_run_at`) is older than this run's cutoff (or absent). The watermark
// is bumped once its current 30-day window is refreshed, so the monthly run touches every
// package's latest window exactly once and the denormalized number lands across the whole
// universe before any deep history is filled. Older history is a separate pass keyed on
// `downloads_30d_history_backfilled_at` — see getNpmUniversePurlsDueForLast30dHistory.
export async function getNpmUniversePurlsDueForLatest30d(
// package's latest window exactly once and the denormalized number lands across all npm
// packages before any deep history is filled. Older history is a separate pass keyed on
// `downloads_30d_history_backfilled_at` — see getNpmPurlsDueForLast30dHistory.
export async function getNpmPurlsDueForLatest30d(
qx: QueryExecutor,
cutoff: string,
batchSize: number,
Expand All @@ -39,7 +39,7 @@ export async function getNpmUniversePurlsDueForLatest30d(
`WITH due AS (
SELECT p.purl AS purl, p.first_release_at
FROM packages p
LEFT JOIN npm_package_universe_state s ON s.purl = p.purl
LEFT JOIN npm_package_state s ON s.purl = p.purl
WHERE p.ecosystem = 'npm'
AND (((hashtext(p.purl) % $(laneCount)) + $(laneCount)) % $(laneCount)) = $(laneIndex)
AND (s.downloads_30d_last_run_at IS NULL
Expand All @@ -61,7 +61,7 @@ export async function getNpmUniversePurlsDueForLatest30d(
// not yet been filled (`downloads_30d_history_backfilled_at IS NULL`). This keeps the work
// strictly breadth-first per package: history is never fetched before the latest window.
// Sharded the same way; oldest-breadth-first so the longest-waiting packages drain first.
export async function getNpmUniversePurlsDueForLast30dHistory(
export async function getNpmPurlsDueForLast30dHistory(
qx: QueryExecutor,
batchSize: number,
laneIndex: number,
Expand All @@ -71,9 +71,8 @@ export async function getNpmUniversePurlsDueForLast30dHistory(
`WITH due AS (
SELECT p.purl AS purl, p.first_release_at, s.downloads_30d_last_run_at AS last_run_at
FROM packages p
JOIN npm_package_universe_state s ON s.purl = p.purl
JOIN npm_package_state s ON s.purl = p.purl
WHERE p.ecosystem = 'npm'
AND p.is_critical = TRUE
AND (((hashtext(p.purl) % $(laneCount)) + $(laneCount)) % $(laneCount)) = $(laneIndex)
AND s.downloads_30d_last_run_at IS NOT NULL
AND s.downloads_30d_history_backfilled_at IS NULL
Expand All @@ -90,7 +89,7 @@ export async function getNpmUniversePurlsDueForLast30dHistory(
}

// Structured outcome of a last-30d run, stored as JSONB in
// npm_package_universe_state.downloads_30d_run_result.
// npm_package_state.downloads_30d_run_result.
export interface Last30dRunResult {
status: 'success' | 'error'
httpStatus?: number
Expand All @@ -107,7 +106,7 @@ export async function markLast30dProcessed(
result: Last30dRunResult,
): Promise<void> {
await qx.result(
`INSERT INTO npm_package_universe_state (purl, downloads_30d_last_run_at, downloads_30d_run_result)
Comment thread
cursor[bot] marked this conversation as resolved.
`INSERT INTO npm_package_state (purl, downloads_30d_last_run_at, downloads_30d_run_result)
VALUES ($(purl), NOW(), $(result)::jsonb)
ON CONFLICT (purl) DO UPDATE SET
downloads_30d_last_run_at = NOW(),
Expand All @@ -126,7 +125,7 @@ export async function markLast30dHistoryBackfilled(
result: Last30dRunResult,
): Promise<void> {
await qx.result(
`UPDATE npm_package_universe_state
`UPDATE npm_package_state
SET downloads_30d_history_backfilled_at = NOW(),
downloads_30d_run_result = $(result)::jsonb
WHERE purl = $(purl)`,
Expand Down
22 changes: 14 additions & 8 deletions services/libs/data-access-layer/src/packages/npmPackageState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ export interface NpmMetadataRunResult {
}

// Mark a package as metadata-scanned and record the run outcome (+ timestamp). Keyed
// by purl (from the packages row). metadata_first_scanned_at is kept from the first
// insert; metadata_run_result/metadata_last_run_at are refreshed on every run.
// by purl (from the packages row). metadata_run_result/metadata_last_run_at are refreshed
// on every run — metadata_last_run_at is the authoritative "metadata has run" signal.
export async function markNpmPackageScanned(
qx: QueryExecutor,
purl: string,
Expand All @@ -28,7 +28,9 @@ export async function markNpmPackageScanned(
)
}

// npm packages in the `packages` table that have never been metadata-scanned.
// Critical npm packages in the `packages` table whose metadata has never been scanned.
// Only is_critical packages are enriched — metadata is deep, per-package work, so it is
// scoped to the critical set (matching the daily-downloads pass).
// Keyset-paginated on purl so the workflow can drain a large first run across
// many continueAsNew runs.
export async function getUnscannedNpmPurls(
Expand All @@ -41,8 +43,9 @@ export async function getUnscannedNpmPurls(
FROM packages p
LEFT JOIN npm_package_state s ON s.purl = p.purl
WHERE p.ecosystem = 'npm'
AND p.is_critical = TRUE
AND p.purl > $(afterPurl)
AND s.purl IS NULL
AND s.metadata_last_run_at IS NULL
ORDER BY p.purl
LIMIT $(batchSize)`,
{ afterPurl, batchSize },
Expand All @@ -51,9 +54,11 @@ export async function getUnscannedNpmPurls(
}

// Given a list of changed npm registry names (from the _changes feed), return the
// purls of those that exist as npm rows in `packages`. The purl is read straight
// from the row; feed names are matched against the decoded namespace/name columns
// (the purl is percent-encoded, so substr(purl) would be %40scope/name).
// purls of the critical ones that exist as npm rows in `packages`. The purl is read
// straight from the row; feed names are matched against the decoded namespace/name
// columns (the purl is percent-encoded, so substr(purl) would be %40scope/name).
// Restricted to is_critical packages — non-critical packages are never metadata-scanned,
// so there is nothing to keep fresh when they change.
export async function getNpmPurlsForChangedNames(
qx: QueryExecutor,
names: string[],
Expand All @@ -64,7 +69,8 @@ export async function getNpmPurlsForChangedNames(
FROM packages p
JOIN unnest($(names)::text[]) AS u(name)
ON (CASE WHEN p.namespace IS NOT NULL THEN p.namespace || '/' || p.name ELSE p.name END) = u.name
WHERE p.ecosystem = 'npm'`,
WHERE p.ecosystem = 'npm'
AND p.is_critical = TRUE`,
{ names },
)
return rows.map((r) => r.purl)
Expand Down
Loading