This repo includes a Dagster implementation of the Ensembl preparation pipeline as a parallel alternative to the Prefect flows.
The Dagster implementation lives under:
- `src/prepare_annotations/definitions.py` (main Dagster definitions)
- `src/prepare_annotations/assets/` (asset definitions)
- `src/prepare_annotations/core/` (shared utilities)
It is intentionally file/directory based: each asset materializes a concrete on-disk artifact (a JSON manifest, a directory of VCFs, a directory of Parquet files, etc.). This makes lineage inspectable and keeps memory usage predictable.
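A minimal sketch of this pattern, with illustrative asset and path names (the real assets live in `src/prepare_annotations/assets/`):

```python
import json
from pathlib import Path

from dagster import asset

@asset
def ensembl_vcf_urls() -> Path:
    """Write the discovered URLs to a JSON manifest and return its Path,
    so downstream assets receive a file location rather than a large list."""
    manifest = Path.home() / ".cache/just-dna-pipelines/ensembl/homo_sapiens/vcf_urls.json"
    manifest.parent.mkdir(parents=True, exist_ok=True)
    urls = ["https://ftp.ensembl.org/pub/current_variation/vcf/..."]  # placeholder
    manifest.write_text(json.dumps(urls))
    return manifest
```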
- Lineage-first assets: each asset returns a concrete on-disk artifact (Path) to avoid passing large in-memory objects.
- Dynamic partitioning: per-file assets are partitioned by filename for fine-grained lineage and UI progress.
- Memory safety: prefer streaming (`LazyFrame.sink_parquet` with `engine="streaming"` by default) and avoid eager materialization during conversion (see the conversion sketch after this list).
- Scale-aware joins: for joins that Polars would materialize in memory, prefer DuckDB or staged filtering.
- Resource visibility: download/convert steps log duration and peak memory where available.
- Idempotent outputs: assets skip work when target files are present and up-to-date.
- Parallel downloads: configurable concurrent downloads (`max_concurrent_downloads`, default: 4)
- Retry policies: Dagster retry policy (max 3 attempts) plus downloader retries (default: 10)
- Checksum verification: BSD sum validation using the `CHECKSUMS` file from the Ensembl FTP
- Resumable downloads: fsspec filecache-based resumption for interrupted transfers
- Uploads directly from the non-split Parquet directory (no legacy TSA splitting in Dagster)
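The conversion sketch referenced above, assuming polars-bio exposes `scan_vcf` as its VCF scan entry point (the helper name is illustrative, not the repo's actual function):

```python
from pathlib import Path

import polars_bio as pb  # polars-bio, per the memory-safety bullet above

def vcf_to_parquet(vcf_path: Path, parquet_path: Path) -> Path:
    # Scan the VCF lazily and stream the result straight to disk;
    # the full table is never collected into memory.
    lf = pb.scan_vcf(str(vcf_path))
    lf.sink_parquet(str(parquet_path), engine="streaming")
    return parquet_path
```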
The default pipeline prepares Ensembl VCFs into Parquet format:
- Discover remote VCF URLs on the Ensembl FTP (per species)
- Download the VCFs in parallel (with retries, resume, and optional CHECKSUMS verification)
- Convert VCF → Parquet (streaming write; no full in-memory collect)
- Optional: upload the Parquet dataset to HuggingFace Hub
```mermaid
flowchart TD
    A["ensembl_ftp_source<br/>(external)"] --> B["ensembl_vcf_urls<br/>vcf_urls.json"]
    B --> C1["ensembl_vcf_file<br/>per-file download<br/>(partitioned)"]
    C1 --> C2["ensembl_vcf_files<br/>vcf/ directory<br/>(batch downloads)"]
    C2 --> D1["ensembl_parquet_file<br/>per-file conversion<br/>(partitioned)"]
    D1 --> D2["ensembl_parquet_files<br/>species dir (*.parquet)"]
    D2 --> F["ensembl_hf_upload<br/>(optional)"]
```
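The per-file nodes above are dynamically partitioned by filename. A sketch of that wiring, with partition and asset names taken from the diagram (the real definitions live in `src/prepare_annotations/definitions.py`):

```python
from pathlib import Path

from dagster import AssetExecutionContext, DynamicPartitionsDefinition, asset

# One dynamic partition per remote VCF filename.
vcf_partitions = DynamicPartitionsDefinition(name="ensembl_vcf_files")

@asset(partitions_def=vcf_partitions)
def ensembl_vcf_file(context: AssetExecutionContext) -> Path:
    filename = context.partition_key  # e.g. "homo_sapiens-chr1.vcf.gz"
    target = Path.home() / ".cache/just-dna-pipelines/ensembl/homo_sapiens/vcf" / filename
    # ... download `filename` to `target` here ...
    return target
```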
Paths are resolved via `src/prepare_annotations/core/paths.py`.
By default the pipeline writes to your user cache (same convention as other Just DNA tooling):
- Base cache dir: `~/.cache/just-dna-pipelines/` (or `JUST_DNA_PIPELINES_CACHE_DIR`)
For Ensembl, `~/.cache/just-dna-pipelines/ensembl/{species}/` contains:

- `vcf_urls.json` (URL manifest)
- `vcf/` (downloaded `.vcf.gz` files)
- `*.parquet` (per-chromosome conversions, e.g. `homo_sapiens-chr1.parquet`)
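A sketch of this resolution order, assuming a helper shape like the one in `core/paths.py` (the function name here is ours):

```python
import os
from pathlib import Path

def ensembl_cache_dir(species: str = "homo_sapiens") -> Path:
    # Environment override first, then the default user cache location.
    base = Path(os.environ.get(
        "JUST_DNA_PIPELINES_CACHE_DIR",
        str(Path.home() / ".cache" / "just-dna-pipelines"),
    ))
    return base / "ensembl" / species
```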
- Parallel downloads via `ThreadPoolExecutor` with configurable concurrency (`max_concurrent_downloads`).
- Retry policy with exponential backoff (30s initial delay, up to 3 retries) at the Dagster asset level.
- Resumable downloads via fsspec filecache (interrupted downloads resume from where they left off).
- Checksum verification using BSD sum (`CHECKSUMS` file from the Ensembl FTP); corrupted files are automatically re-downloaded (see the checksum sketch after this list).
- VCF → Parquet uses polars-bio scanning and `LazyFrame.sink_parquet(..., engine="streaming")` to stream to disk by default.
- Resource logging: conversion and download steps record duration/peak memory when available.
- Join strategy: when Polars would materialize full datasets on joins, prefer DuckDB or pre-filtered joins to limit memory pressure.
- Dagster assets return Paths (manifest files / directories), not large Python lists, to avoid passing large in-memory objects between steps.
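For reference, BSD `sum` (the format used in Ensembl's `CHECKSUMS` files) is a 16-bit right-rotating checksum plus a size in 1024-byte blocks. A standalone sketch (the helper name is ours, not the repo's):

```python
from pathlib import Path

def bsd_sum(path: Path) -> tuple[int, int]:
    """Return (checksum, blocks): the BSD `sum` 16-bit rotating checksum
    and the file size in 1024-byte blocks, as listed in CHECKSUMS."""
    checksum = 0
    size = 0
    with path.open("rb") as fh:
        while chunk := fh.read(1 << 16):
            size += len(chunk)
            for byte in chunk:
                # Rotate the 16-bit checksum right by one, then add the byte.
                checksum = (checksum >> 1) | ((checksum & 1) << 15)
                checksum = (checksum + byte) & 0xFFFF
    blocks = -(-size // 1024)  # ceiling division
    return checksum, blocks
```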
Run the full pipeline (download → convert → upload):

```bash
uv run dagster-ensembl
```

This is equivalent to:

```bash
uv run dagster-ensembl run --job full
```

Run for a different species:

```bash
uv run dagster-ensembl run --species mus_musculus
```

Run specific jobs:

```bash
uv run dagster-ensembl run --job prepare   # download + convert (no upload)
uv run dagster-ensembl run --job download  # download only
uv run dagster-ensembl run --job convert   # convert only
uv run dagster-ensembl run --job upload    # upload only
```

List available jobs:

```bash
uv run dagster-ensembl jobs
```

Run LongevityMap conversion (Dagster module assets):

```bash
uv run dagster-ensembl longevitymap
uv run dagster-ensembl longevitymap --full
uv run dagster-ensembl longevitymap --upload
```

Start the web interface for interactive execution:

```bash
uv run dagster-ui
```

Then materialize assets / jobs from the UI.
Jobs are defined in `src/prepare_annotations/definitions.py`:
| Job | Description |
|---|---|
| `full` | Complete pipeline: download → convert → upload (default) |
| `prepare` | Download and convert to Parquet (no upload) |
| `download` | Download VCF files only (parallel with retries) |
| `convert` | Convert VCF to Parquet (assumes VCFs downloaded) |
| `upload` | Upload to HuggingFace Hub (assumes parquet exists) |
| `longevitymap` | Convert LongevityMap to unified schema (with Ensembl genotype resolution) |
| `longevitymap_full` | Convert LongevityMap and join with full Ensembl data |
| `longevitymap_upload` | Convert LongevityMap and upload to `just-dna-seq/annotators` |
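A sketch of how such jobs are typically declared, assuming the asset names from the diagram above (the actual declarations are in `definitions.py`):

```python
from dagster import AssetSelection, define_asset_job

# Select the download and convert assets, leaving upload out.
prepare_job = define_asset_job(
    name="prepare",
    selection=AssetSelection.assets("ensembl_vcf_files", "ensembl_parquet_files"),
)
```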
Key configuration parameters (set via Dagster config):
`EnsemblDownloadConfig`:

- `species`: Species name (default: `homo_sapiens`)
- `base_url`: Ensembl FTP base URL (default: `https://ftp.ensembl.org/pub/current_variation/vcf/`)
- `pattern`: Regex to filter remote files (default: species-aware pattern)
- `cache_dir`: Override cache directory (default: `~/.cache/just-dna-pipelines/ensembl/{species}`)
- `max_concurrent_downloads`: Maximum parallel downloads (default: `4`)
- `verify_checksums`: Whether to verify checksums (default: `True`)
- `force_download`: Re-download even if files already exist (default: `False`)
- `http_max_pool`: HTTP pool size for downloader (default: `20`)
- `retries`: Number of retry attempts per file (default: `10`)
- `connect_timeout`: Connection timeout in seconds (default: `10.0`)
- `sock_read_timeout`: Socket read timeout in seconds (default: `120.0`)
`ParquetConversionConfig`:

- `max_concurrent_conversions`: Maximum parallel conversions. If unset, uses the `PREPARE_ANNOTATIONS_PARQUET_WORKERS` env var; defaults to `2`.
- `threads`: Thread count per conversion (auto-detected if not set).
- `force_convert`: Re-convert even when parquet is up-to-date.
Environment overrides:

- `PREPARE_ANNOTATIONS_PARQUET_WORKERS`: Max concurrent parquet conversions (used when the config value is not set).
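A sketch of a run config overriding a few of these values (the asset/op name is assumed from the diagram above; pass it in the UI launchpad or to `materialize(run_config=...)`):

```python
# Plain Dagster run-config dict; keys mirror the parameters listed above.
run_config = {
    "ops": {
        "ensembl_vcf_files": {
            "config": {
                "species": "mus_musculus",
                "max_concurrent_downloads": 8,
                "verify_checksums": True,
            }
        }
    }
}
```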
The upload asset (`ensembl_hf_upload`) depends on the parquet directory output (`ensembl_parquet_files`). In the Dagster UI, this makes it straightforward to answer:
- "Which local dataset was uploaded?"
- "When did we last upload, and what was uploaded vs skipped?"
Uploads are executed using the existing uploader implementation:
`prepare_annotations.huggingface.uploader.upload_parquet_to_hf`
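A sketch of invoking it directly; the module path is real (from above), but the call signature shown is an assumption, so check the uploader module for the actual parameters:

```python
from pathlib import Path

from prepare_annotations.huggingface.uploader import upload_parquet_to_hf

# Hypothetical one-argument call: point the uploader at the parquet directory.
parquet_dir = Path.home() / ".cache/just-dna-pipelines/ensembl/homo_sapiens"
upload_parquet_to_hf(parquet_dir)
```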
