Merged
Changes from 2 commits
1 change: 1 addition & 0 deletions docs/api/api.md
@@ -24,6 +24,7 @@ data/data_index
data/data_api
pipeline/pipeline_index
pipeline/pipeline_api
pipeline/pipeline_entrypoints
training/training_index
training/training_api
tutorial/tutorial_index
51 changes: 51 additions & 0 deletions docs/api/pipeline/pipeline_entrypoints.md
@@ -0,0 +1,51 @@
# Pipeline Entrypoints

`PyEarthTools` pipelines provide a generic way to load and prepare a variety of earth system datasets, so a pipeline can be
used as a source for [anemoi-datasets](https://anemoi.readthedocs.io/projects/datasets/en/latest/).

## Example

Below is a minimal example of using a `PyEarthTools` pipeline to load data and prepare it for `anemoi`. See the `anemoi` docs
for more information on the `datasets` config.

### Create the Pipeline

.. code-block:: python

    import pyearthtools.data
    import pyearthtools.pipeline

    pipeline = pyearthtools.pipeline.Pipeline(
        pyearthtools.data.download.arcoera5.ARCOERA5(['t2m', 'u10', 'v10']),
        pyearthtools.pipeline.operations.xarray.values.FillNan()
    )
    pipeline.save('/PATH/TO/PIPELINE.yaml')

### Create the anemoi-datasets config

.. code-block:: yaml

    name: pyearthtools_to_anemoi
    description: PyEarthTools Pipeline converted to Anemoi
    attribution: PyEarthTools

    dates:
      start: '2025-11-10T00:00:00'
      end: '2025-11-12T00:00:00'
      frequency: 1h

    input:
      pyearthtools: # Use the pyearthtools input object
        pipeline: /PATH/TO/PIPELINE.yaml

### Run anemoi-datasets

.. code-block:: bash

    anemoi-datasets create /path/to/anemoi/dataset.yaml

## Contract

The contract is that the `PyEarthTools` pipeline, when indexed with a single time, returns an `xarray` object containing a single time index.

Both tools provide methods to modify the metadata of the data; use them as needed to prepare the data for downstream use.
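
The contract above can be illustrated without either library installed. The sketch below is only a stand-in: `StandInPipeline` and `check_contract` are hypothetical names, and a plain dictionary takes the place of the `xarray` object a real pipeline would return. It assumes the pipeline is indexed with an ISO 8601 timestamp string, as the `anemoi` source in this change does.

```python
from datetime import datetime


class StandInPipeline:
    """Hypothetical stand-in for a PyEarthTools pipeline (not the real class)."""

    def __init__(self, variables):
        self.variables = list(variables)

    def __getitem__(self, key: str) -> dict:
        # Reject anything that is not a valid ISO 8601 timestamp.
        time = datetime.fromisoformat(key)
        # One record per request: exactly one time step, every variable present.
        return {"time": [time], "variables": {name: [0.0] for name in self.variables}}


def check_contract(sample: dict) -> bool:
    """Return True if the sample holds exactly one time step."""
    return len(sample["time"]) == 1


pipeline = StandInPipeline(["t2m", "u10", "v10"])
sample = pipeline["2025-11-10T00:00:00"]
assert check_contract(sample)
```

A pipeline that returned several time steps for one index would fail this check, which is the situation the contract is meant to rule out.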
4 changes: 4 additions & 0 deletions packages/pipeline/pyproject.toml
@@ -29,6 +29,10 @@ dependencies = [

dynamic = ["version", "readme"]

[project.entry-points]
# Add PyEarthTools as an anemoi datasets source
"anemoi.datasets.create.sources".pyearthtools = "pyearthtools.pipeline.entrypoints.anemoi:pyearthtoolsSource"

[project.optional-dependencies]
distributed = [
"dask",
48 changes: 48 additions & 0 deletions packages/pipeline/src/pyearthtools/pipeline/entrypoints/anemoi.py
@@ -0,0 +1,48 @@
from functools import cached_property
from pathlib import Path

from pyearthtools.pipeline import load
from pyearthtools.pipeline import Pipeline

import earthkit.data as ekd
from anemoi.datasets.create.source import Source
from anemoi.datasets.create.typing import DateList


class pyearthtoolsSource(Source):
    emoji = "🌏"  # For tracing

    def __init__(self, context, pipeline: str | Path):
        """Initialise the source.

        Parameters
        ----------
        context : Any
            The context for the data source.
        pipeline : str | Path
            The path to the pyearthtools pipeline file.
        """
        super().__init__(context)
        self._pyearthtools_pipeline = pipeline

    @cached_property
    def pipeline(self) -> Pipeline:
        # Loaded on first access, then cached for later calls.
        return load(self._pyearthtools_pipeline)

    def execute(self, dates: DateList) -> ekd.FieldList:
        """Execute the source.

        Parameters
        ----------
        dates : DateList
            The input dates.

        Returns
        -------
        ekd.FieldList
            The output data.
        """
        fields = []
        # Index the pipeline once per date and collect the resulting fields.
        for date in dates:
            fields.extend(ekd.from_object(self.pipeline[date.isoformat()]))  # type: ignore
        return ekd.FieldList.from_fields(fields)
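
The loop in `execute` indexes the pipeline once per date, using `date.isoformat()` as the key. The sketch below shows which keys that would produce for the `dates` block in the example config. `expand_dates` is a hypothetical helper, and treating the `end` date as inclusive is an assumption made here for illustration; `anemoi-datasets` builds the real `DateList` itself.

```python
from datetime import datetime, timedelta


def expand_dates(start: str, end: str, step: timedelta) -> list[datetime]:
    """List every timestamp from start to end (inclusive), spaced by step."""
    current, stop = datetime.fromisoformat(start), datetime.fromisoformat(end)
    dates = []
    while current <= stop:
        dates.append(current)
        current += step
    return dates


# Values copied from the example config: two days at a 1h frequency.
dates = expand_dates("2025-11-10T00:00:00", "2025-11-12T00:00:00", timedelta(hours=1))

# These strings are what the pipeline would be indexed with, one call each.
keys = [date.isoformat() for date in dates]
print(len(keys), keys[0], keys[-1])
```

Each key triggers one pipeline call, so the pipeline is evaluated once per requested time step rather than once for the whole range.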