Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
-- Wire packages-db into the Sequin → Kafka → Tinybird pipeline.
--
-- Two related changes bundled here because both serve the same goal — making
-- packages-db row changes replicate cleanly into Tinybird:
--
-- 1. Publication + REPLICA IDENTITY FULL on the 11 tables the Tinybird
-- datasources read from. publish_via_partition_root collapses the
-- versions (32) / package_dependencies (64) partition leaves into a
-- single logical topic each. REPLICA IDENTITY on a partitioned root
-- does not cascade, so every leaf is set explicitly via pg_inherits.
--
-- 2. rank_packages() bumps last_synced_at on every UPDATE that touches a
-- DS-exported field (impact, is_critical, last_rank_pass_at).
-- last_synced_at is the Tinybird ENGINE_VER for the packages datasource;
-- without this bump, ReplacingMergeTree may keep an older row when
-- criticality changes without any other write path touching the row.

-- ─── 1. Sequin publication ──────────────────────────────────────────────────

DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_publication WHERE pubname = 'sequin_pub'
) THEN
CREATE PUBLICATION sequin_pub
FOR TABLE
packages,
versions,
package_dependencies,
package_maintainers,
package_repos,
maintainers,
repos,
repo_scorecard_checks,
advisories,
advisory_packages,
advisory_affected_ranges
WITH (publish_via_partition_root = true);
END IF;
END$$;

ALTER TABLE public.packages REPLICA IDENTITY FULL;
ALTER TABLE public.versions REPLICA IDENTITY FULL;
ALTER TABLE public.package_dependencies REPLICA IDENTITY FULL;
ALTER TABLE public.package_maintainers REPLICA IDENTITY FULL;
ALTER TABLE public.package_repos REPLICA IDENTITY FULL;
ALTER TABLE public.maintainers REPLICA IDENTITY FULL;
ALTER TABLE public.repos REPLICA IDENTITY FULL;
ALTER TABLE public.repo_scorecard_checks REPLICA IDENTITY FULL;
ALTER TABLE public.advisories REPLICA IDENTITY FULL;
ALTER TABLE public.advisory_packages REPLICA IDENTITY FULL;
ALTER TABLE public.advisory_affected_ranges REPLICA IDENTITY FULL;

-- versions (32) and package_dependencies (64) are hash-partitioned. REPLICA
-- IDENTITY on the partitioned root does not cascade; set it on every leaf.
DO $$
DECLARE
parent_table text;
partition_oid regclass;
BEGIN
FOREACH parent_table IN ARRAY ARRAY['public.versions', 'public.package_dependencies']
LOOP
FOR partition_oid IN
SELECT inhrelid::regclass
FROM pg_inherits
WHERE inhparent = parent_table::regclass
LOOP
EXECUTE format('ALTER TABLE %s REPLICA IDENTITY FULL', partition_oid);
END LOOP;
END LOOP;
END$$;

-- ─── 2. rank_packages() bumps last_synced_at ────────────────────────────────

CREATE OR REPLACE FUNCTION rank_packages(
weight_downloads numeric DEFAULT 0.25,
weight_dependent_packages numeric DEFAULT 0.25,
weight_transitive numeric DEFAULT 0.50,
critical_top_n_by_ecosystem jsonb DEFAULT '{"npm":400000,"go":100000,"maven":200000,"pypi":100000,"nuget":50000,"cargo":75000}'::jsonb
)
RETURNS TABLE(scored_rows int, ranked_rows int)
LANGUAGE plpgsql AS $$
DECLARE
n_scored int;
n_ranked int;
BEGIN
-- Step 1: score
WITH percentile_scores AS (
SELECT
id,
(
weight_downloads * PERCENT_RANK() OVER (
PARTITION BY ecosystem ORDER BY LOG(1 + COALESCE(downloads_last_30d, 0)))

+ weight_dependent_packages * PERCENT_RANK() OVER (
PARTITION BY ecosystem ORDER BY LOG(1 + COALESCE(dependent_count, 0)))

+ weight_transitive * PERCENT_RANK() OVER (
PARTITION BY ecosystem ORDER BY LOG(1 + COALESCE(transitive_dependent_count, 0)))
)::numeric(10, 4) AS new_impact
FROM packages
WHERE ecosystem IN (SELECT jsonb_object_keys(critical_top_n_by_ecosystem))
)
UPDATE packages p
SET impact = ps.new_impact,
last_synced_at = NOW()
FROM percentile_scores ps
WHERE p.id = ps.id
AND p.impact IS DISTINCT FROM ps.new_impact;

GET DIAGNOSTICS n_scored = ROW_COUNT;

-- Step 2: rank + flag
WITH ranked AS (
SELECT
id, ecosystem,
ROW_NUMBER() OVER (
PARTITION BY ecosystem
ORDER BY impact DESC NULLS LAST, id
) AS r
FROM packages
WHERE purl IS NOT NULL
AND ecosystem IN (SELECT jsonb_object_keys(critical_top_n_by_ecosystem))
),
flagged AS (
SELECT
id, r,
COALESCE(
r <= (critical_top_n_by_ecosystem ->> ecosystem)::int,
FALSE
) AS new_is_critical
FROM ranked
)
UPDATE packages p
SET rank_in_ecosystem = f.r,
is_critical = f.new_is_critical,
last_synced_at = NOW()
FROM flagged f
WHERE p.id = f.id
AND (
p.rank_in_ecosystem IS DISTINCT FROM f.r
OR p.is_critical IS DISTINCT FROM f.new_is_critical
);

GET DIAGNOSTICS n_ranked = ROW_COUNT;

-- Step 2.5: spotlight overrides
UPDATE packages p
SET is_critical = TRUE,
last_synced_at = NOW()
FROM package_criticality_spotlight s
WHERE p.ecosystem = s.ecosystem
AND (p.namespace IS NOT DISTINCT FROM s.namespace)
AND p.name = s.name
AND p.is_critical = FALSE;

-- Step 3: stamp last_rank_pass_at unconditionally
UPDATE packages
SET last_rank_pass_at = NOW(),
last_synced_at = NOW()
WHERE ecosystem IN (SELECT jsonb_object_keys(critical_top_n_by_ecosystem));

RETURN QUERY SELECT n_scored, n_ranked;
END;
$$;
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ const REPOS_PG_COLUMNS = [
'open_issues',
]

// last_synced_at is intentionally left NULL on seed — it is owned by the GitHub
// enricher as its freshness signal. created_at / updated_at use their column defaults.
const REPOS_MERGE_SQL = `
INSERT INTO repos (url, raw_project_type, raw_project_name, host, owner, name,
description, homepage, stars, forks, open_issues, last_synced_at)
description, homepage, stars, forks, open_issues)
SELECT s.canonical_url, s.raw_project_type, s.raw_project_name, s.host, s.owner, s.name,
s.description, s.homepage, s.stars, s.forks, s.open_issues, NOW()
s.description, s.homepage, s.stars, s.forks, s.open_issues
FROM staging.osspckgs_repos_raw s
ON CONFLICT (url) DO NOTHING
`
Expand Down
15 changes: 8 additions & 7 deletions services/libs/data-access-layer/src/osspckgs/repos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@ export async function findRepoIdsByUrl(
* Inserts or updates a repo row keyed on url.
* Uses COALESCE so richer data from other enrichers (GitHub, deps.dev) is never
* overwritten with nulls from a partial write.
* `last_synced_at` is intentionally NOT touched here — that column is owned by the
* GitHub enricher as its freshness signal. Maven discovery only stamps updated_at.
* Returns the repo id.
*/
export async function upsertRepo(qx: QueryExecutor, item: IDbRepoUpsert): Promise<number> {
const row = await qx.selectOne(
`
INSERT INTO repos (url, host, owner, name, last_synced_at, updated_at)
VALUES ($(url), $(host), $(owner), $(name), NOW(), NOW())
INSERT INTO repos (url, host, owner, name, updated_at)
VALUES ($(url), $(host), $(owner), $(name), NOW())
ON CONFLICT (url) DO UPDATE SET
host = COALESCE(EXCLUDED.host, repos.host),
owner = COALESCE(EXCLUDED.owner, repos.owner),
name = COALESCE(EXCLUDED.name, repos.name),
last_synced_at = NOW(),
updated_at = NOW()
host = COALESCE(EXCLUDED.host, repos.host),
owner = COALESCE(EXCLUDED.owner, repos.owner),
name = COALESCE(EXCLUDED.name, repos.name),
updated_at = NOW()
RETURNING id
`,
item,
Expand Down
4 changes: 2 additions & 2 deletions services/libs/data-access-layer/src/packages/osv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ export async function getRangesForPackages(qx: QueryExecutor, ids: number[]): Pr
export async function flipVulnerableFlags(qx: QueryExecutor, ids: number[]): Promise<number> {
if (ids.length === 0) return 0
return qx.result(
`UPDATE packages SET has_critical_vulnerability = TRUE
`UPDATE packages SET has_critical_vulnerability = TRUE, last_synced_at = NOW()
WHERE id IN ($(ids:csv)) AND has_critical_vulnerability = FALSE`,
{ ids },
)
Expand All @@ -252,7 +252,7 @@ export async function flipVulnerableFlags(qx: QueryExecutor, ids: number[]): Pro
export async function clearSafeFlags(qx: QueryExecutor, ids: number[]): Promise<number> {
if (ids.length === 0) return 0
return qx.result(
`UPDATE packages SET has_critical_vulnerability = FALSE
`UPDATE packages SET has_critical_vulnerability = FALSE, last_synced_at = NOW()
WHERE id IN ($(ids:csv)) AND has_critical_vulnerability = TRUE`,
{ ids },
)
Expand Down
40 changes: 40 additions & 0 deletions services/libs/tinybird/datasources/advisories.datasource
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
DESCRIPTION >
- `advisories` contains OSV-shaped security advisories ingested from deps.dev BigQuery.
- Replicated from Postgres packages-db — each row is one advisory (CVE, GHSA, MAL-*, etc.).
- Used to surface critical vulnerabilities, track CVSS scores, and power security overlays in LFX Insights.
- `id` is the internal primary key.
- `osvId` is the canonical advisory identifier (e.g. GHSA-xxx, CVE-xxx, OSV-xxx) — globally unique.
- `source` is the originating database: 'GHSA', 'OSV', 'NVD', 'NSWG', etc. (empty string if unknown).
- `sourceUrl` is the upstream advisory URL (empty string if not provided).
- `aliases` is an array of alternate identifiers for the same advisory (e.g. a GHSA may alias a CVE).
- `severity` is the qualitative risk level: 'LOW', 'MEDIUM', 'HIGH', or 'CRITICAL' (empty string if unknown).
- `cvss` is the numeric CVSS score (0 if not available).
- `cvssSource` documents the provenance of the CVSS value: 'osv_cvss_v3', 'osv_qualitative_fallback', 'osv_malicious_package', etc.
- `isCritical` is 1 when cvss >= 7.0 (HIGH or CRITICAL), 0 otherwise — computed from the score.
- `summary` is a short human-readable description of the vulnerability (empty string if not provided).
- `details` is the full advisory text (empty string if not provided).
- `publishedAt` is when the advisory was first published upstream (NULL if unknown).
- `modifiedAt` is when the advisory was last modified upstream; NULL for BQ-sourced rows.
- `createdAt` and `updatedAt` are row-level audit timestamps for Tinybird watermark-based sync.

SCHEMA >
`id` UInt64 `json:$.record.id`,
`osvId` String `json:$.record.osv_id`,
`source` String `json:$.record.source` DEFAULT '',
`sourceUrl` String `json:$.record.source_url` DEFAULT '',
`aliases` Array(String) `json:$.record.aliases[:]` DEFAULT [],
`severity` String `json:$.record.severity` DEFAULT '',
`cvss` Float32 `json:$.record.cvss` DEFAULT 0,
`cvssSource` String `json:$.record.cvss_source` DEFAULT '',
`isCritical` UInt8 `json:$.record.is_critical` DEFAULT 0,
`summary` String `json:$.record.summary` DEFAULT '',
`details` String `json:$.record.details` DEFAULT '',
`publishedAt` Nullable(DateTime64(3)) `json:$.record.published_at`,
`modifiedAt` Nullable(DateTime64(3)) `json:$.record.modified_at`,
`createdAt` DateTime64(3) `json:$.record.created_at`,
`updatedAt` DateTime64(3) `json:$.record.updated_at`

ENGINE ReplacingMergeTree
ENGINE_PARTITION_KEY toYear(updatedAt)
ENGINE_SORTING_KEY osvId
ENGINE_VER updatedAt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
DESCRIPTION >
- `advisoryAffectedRanges` stores the version ranges affected by a security advisory per package.
- Replicated from Postgres packages-db — each row defines one vulnerable window (introduced → fixed/last_affected).
- Used to determine whether a specific package version falls within a known vulnerable range.
- `id` is the internal primary key.
- `advisoryPackageId` links to the parent advisory_packages row.
- `introducedVersion` is the version where the vulnerability was introduced (empty string if unknown start).
- `fixedVersion` is the version where the vulnerability was patched (empty string if no fix exists yet).
- `lastAffected` is the last known affected version when there is no fixed version (empty string if not applicable).
- `rangeRaw` is the raw AffectedVersions string from the deps.dev BigQuery source (empty string if OSV-sourced).
- `unaffectedRaw` is the raw UnaffectedVersions string from deps.dev BigQuery (empty string if OSV-sourced).
- `createdAt` and `updatedAt` are row-level audit timestamps for Tinybird watermark-based sync.

SCHEMA >
`id` UInt64 `json:$.record.id`,
`advisoryPackageId` UInt64 `json:$.record.advisory_package_id`,
`introducedVersion` String `json:$.record.introduced_version` DEFAULT '',
`fixedVersion` String `json:$.record.fixed_version` DEFAULT '',
`lastAffected` String `json:$.record.last_affected` DEFAULT '',
`rangeRaw` String `json:$.record.range_raw` DEFAULT '',
`unaffectedRaw` String `json:$.record.unaffected_raw` DEFAULT '',
`createdAt` DateTime64(3) `json:$.record.created_at`,
`updatedAt` DateTime64(3) `json:$.record.updated_at`

ENGINE ReplacingMergeTree
ENGINE_PARTITION_KEY toYear(updatedAt)
ENGINE_SORTING_KEY advisoryPackageId, id
ENGINE_VER updatedAt
24 changes: 24 additions & 0 deletions services/libs/tinybird/datasources/advisoryPackages.datasource
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
DESCRIPTION >
- `advisoryPackages` maps security advisories to the packages they affect.
- Replicated from Postgres packages-db — one advisory can affect multiple packages across different ecosystems.
- Used to determine which packages are impacted by a given advisory, and to backfill `has_critical_vulnerability` on packages.
- `id` is the internal primary key.
- `advisoryId` links to the parent advisory record.
- `packageId` links to the matching packages row (0 if the package exists in OSV but not yet in our DB).
- `ecosystem` is the package ecosystem (npm, go, maven, pypi, etc.).
- `packageName` is the package name within its ecosystem as reported by OSV.
- `createdAt` and `updatedAt` are row-level audit timestamps for Tinybird watermark-based sync.

SCHEMA >
`id` UInt64 `json:$.record.id`,
`advisoryId` UInt64 `json:$.record.advisory_id`,
`packageId` UInt64 `json:$.record.package_id` DEFAULT 0,
`ecosystem` String `json:$.record.ecosystem`,
`packageName` String `json:$.record.package_name`,
`createdAt` DateTime64(3) `json:$.record.created_at`,
`updatedAt` DateTime64(3) `json:$.record.updated_at`

ENGINE ReplacingMergeTree
ENGINE_PARTITION_KEY toYear(updatedAt)
ENGINE_SORTING_KEY ecosystem, packageName, id
ENGINE_VER updatedAt
28 changes: 28 additions & 0 deletions services/libs/tinybird/datasources/maintainers.datasource
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
DESCRIPTION >
- `maintainers` contains package maintainer profiles sourced from package registries (npm, PyPI, etc.).
- Replicated from Postgres packages-db — one row per unique (ecosystem, username) identity.
- Used to identify who maintains critical packages, correlate maintainers across ecosystems, and support contributor analytics.
- `id` is the internal primary key.
- `ecosystem` is the package registry this identity belongs to: 'npm', 'pypi', 'cargo', 'maven', etc.
- `username` is the maintainer's registry username — unique within an ecosystem.
- `displayName` is the maintainer's human-readable name as published in the registry (empty string if not provided).
- `url` is the maintainer's profile URL on the registry (empty string if not provided).
- `email` is the maintainer's email address as published in the registry (empty string if not provided).
- `githubLogin` is the maintainer's GitHub username if resolved (empty string if not linked).
- `createdAt` and `updatedAt` are row-level audit timestamps for Tinybird watermark-based sync.

SCHEMA >
`id` UInt64 `json:$.record.id`,
`ecosystem` String `json:$.record.ecosystem`,
`username` String `json:$.record.username`,
`displayName` String `json:$.record.display_name` DEFAULT '',
`url` String `json:$.record.url` DEFAULT '',
`email` String `json:$.record.email` DEFAULT '',
`githubLogin` String `json:$.record.github_login` DEFAULT '',
`createdAt` DateTime64(3) `json:$.record.created_at`,
`updatedAt` DateTime64(3) `json:$.record.updated_at`

ENGINE ReplacingMergeTree
ENGINE_PARTITION_KEY toYear(updatedAt)
ENGINE_SORTING_KEY ecosystem, username
ENGINE_VER updatedAt
31 changes: 31 additions & 0 deletions services/libs/tinybird/datasources/packageDependencies.datasource
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
DESCRIPTION >
- `packageDependencies` stores the dependency graph between package versions.
- Replicated from Postgres packages-db — one row per (version, dependency) edge in the dependency graph.
- Partitioned by hash(depends_on_id) in Postgres for fast downstream lookups; sorted here for analytical queries.
- Used to answer "what depends on package X?" (downstream consumers) and to compute dependent_repos_count / dependent_packages_count.
- `id` is the internal primary key.
- `packageId` is the ID of the package that contains the depending version.
- `versionId` is the specific version that declares the dependency.
- `dependsOnId` is the package being depended upon — the hot lookup key for vulnerability blast-radius queries.
- `dependsOnVersionId` is the resolved specific version of the dependency (0 if the exact version is unknown).
- `versionConstraint` is the declared version constraint string (e.g. '^1.2.3', '>=2.0.0'); empty string if not specified.
- `dependencyKind` is the dependency type: 'direct', 'dev', or 'peer'.
- `isOptional` is 1 if the dependency is marked optional, 0 otherwise.
- `createdAt` and `updatedAt` are row-level audit timestamps for Tinybird watermark-based sync.

SCHEMA >
`id` UInt64 `json:$.record.id`,
`packageId` UInt64 `json:$.record.package_id`,
`versionId` UInt64 `json:$.record.version_id`,
`dependsOnId` UInt64 `json:$.record.depends_on_id`,
`dependsOnVersionId` UInt64 `json:$.record.depends_on_version_id` DEFAULT 0,
`versionConstraint` String `json:$.record.version_constraint` DEFAULT '',
`dependencyKind` String `json:$.record.dependency_kind`,
`isOptional` UInt8 `json:$.record.is_optional` DEFAULT 0,
`createdAt` DateTime64(3) `json:$.record.created_at`,
`updatedAt` DateTime64(3) `json:$.record.updated_at`

ENGINE ReplacingMergeTree
ENGINE_PARTITION_KEY toYear(updatedAt)
ENGINE_SORTING_KEY dependsOnId, versionId, id
ENGINE_VER updatedAt
Loading
Loading