From e5e2ee77130ae93488fb9e7baa3c5a0df3e34dd0 Mon Sep 17 00:00:00 2001 From: Vecko <36369090+VeckoTheGecko@users.noreply.github.com> Date: Thu, 19 Mar 2026 10:35:31 +0100 Subject: [PATCH 1/6] Remove compat code around MPI --- src/parcels/_compat.py | 18 ------------------ src/parcels/_core/particleset.py | 9 ++------- 2 files changed, 2 insertions(+), 25 deletions(-) diff --git a/src/parcels/_compat.py b/src/parcels/_compat.py index c065407f7..96c29433d 100644 --- a/src/parcels/_compat.py +++ b/src/parcels/_compat.py @@ -1,23 +1,5 @@ """Import helpers for compatability between installations.""" -__all__ = ["MPI", "KMeans"] - -from typing import Any - -MPI: Any | None = None -KMeans: Any | None = None - -try: - from mpi4py import MPI # type: ignore[import-untyped,no-redef] -except ModuleNotFoundError: - pass - -# KMeans is used in MPI. sklearn not installed by default -try: - from sklearn.cluster import KMeans -except ModuleNotFoundError: - pass - # for compat with v3 of parcels when users provide `initial=attrgetter("lon")` to a Variable # so that particle initial state matches another variable diff --git a/src/parcels/_core/particleset.py b/src/parcels/_core/particleset.py index 6e33c867f..67e3c5715 100644 --- a/src/parcels/_core/particleset.py +++ b/src/parcels/_core/particleset.py @@ -51,8 +51,6 @@ class ParticleSet: Optional interval on which to repeat the release of the ParticleSet. Either timedelta object, or float in seconds. trajectory_ids : Optional list of "trajectory" values (integers) for the particle IDs - partition_function : - Function to use for partitioning particles over processors. Default is to use kMeans Other Variables can be initialised using further arguments (e.g. v=... for a Variable named 'v') """ @@ -106,11 +104,8 @@ def __init__( _warn_particle_times_outside_fieldset_time_bounds(time, fieldset.time_interval) for kwvar in kwargs: - if kwvar not in ["partition_function"]: - kwargs[kwvar] = np.array(kwargs[kwvar]).flatten() - assert lon.size == kwargs[kwvar].size, ( - f"{kwvar} and positions (lon, lat, z) don't have the same lengths." - ) + kwargs[kwvar] = np.array(kwargs[kwvar]).flatten() + assert lon.size == kwargs[kwvar].size, f"{kwvar} and positions (lon, lat, z) don't have the same lengths." self._data = create_particle_data( pclass=pclass, From acdc34eb450d4aeb5744de3d1ff1f8fd7778302b Mon Sep 17 00:00:00 2001 From: Vecko <36369090+VeckoTheGecko@users.noreply.github.com> Date: Thu, 19 Mar 2026 11:02:32 +0100 Subject: [PATCH 2/6] Remove items to do with custom partition function tmp --- .../user_guide/examples_v3/example_stommel.py | 43 ++++--------------- 1 file changed, 9 insertions(+), 34 deletions(-) diff --git a/docs/user_guide/examples_v3/example_stommel.py b/docs/user_guide/examples_v3/example_stommel.py index 66e416d94..068ac5812 100755 --- a/docs/user_guide/examples_v3/example_stommel.py +++ b/docs/user_guide/examples_v3/example_stommel.py @@ -88,11 +88,6 @@ def AgeP(particle, fieldset, time): # pragma: no cover particle.delete() -def simple_partition_function(coords, mpi_size=1): - """A very simple partition function that assigns particles to processors (for MPI testing purposes))""" - return np.linspace(0, mpi_size, coords.shape[0], endpoint=False, dtype=np.int32) - - def stommel_example( npart=1, verbose=False, @@ -101,7 +96,6 @@ def stommel_example( outfile="StommelParticle.zarr", repeatdt=None, maxage=None, - custom_partition_function=False, ): parcels.timer.fieldset = parcels.timer.Timer( "FieldSet", parent=parcels.timer.stommel @@ -125,27 +119,15 @@ def stommel_example( ] MyParticle = parcels.Particle.add_variables(extra_vars) - if custom_partition_function: - pset = parcels.ParticleSet.from_line( - fieldset, - size=npart, - pclass=MyParticle, - repeatdt=repeatdt, - start=(10e3, 5000e3), - finish=(100e3, 5000e3), - time=0, - partition_function=simple_partition_function, - ) - else: - pset = parcels.ParticleSet.from_line( - fieldset, - size=npart, - pclass=MyParticle, - repeatdt=repeatdt, - start=(10e3, 5000e3), - finish=(100e3, 5000e3), - time=0, - ) + pset = parcels.ParticleSet.from_line( + fieldset, + size=npart, + pclass=MyParticle, + repeatdt=repeatdt, + start=(10e3, 5000e3), + finish=(100e3, 5000e3), + time=0, + ) if verbose: print(f"Initial particle positions:\n{pset}") @@ -245,12 +227,6 @@ def main(args=None): type=int, help="max age of the particles (after which particles are deleted)", ) - p.add_argument( - "-cpf", - "--custom_partition_function", - default=False, - help="Use a custom partition_function (for MPI testing purposes)", - ) args = p.parse_args(args) parcels.timer.args.stop() @@ -262,7 +238,6 @@ def main(args=None): outfile=args.outfile, repeatdt=args.repeatdt, maxage=args.maxage, - custom_partition_function=args.custom_partition_function, ) parcels.timer.stommel.stop() parcels.timer.root.stop() From 740ba9b1474ed7e2e171389d743cf594aaa82969 Mon Sep 17 00:00:00 2001 From: Vecko <36369090+VeckoTheGecko@users.noreply.github.com> Date: Thu, 19 Mar 2026 11:02:40 +0100 Subject: [PATCH 3/6] Update tutorial_output --- docs/getting_started/tutorial_output.ipynb | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/getting_started/tutorial_output.ipynb b/docs/getting_started/tutorial_output.ipynb index cd8dcbc1a..485c3c080 100644 --- a/docs/getting_started/tutorial_output.ipynb +++ b/docs/getting_started/tutorial_output.ipynb @@ -162,9 +162,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Note that if you are running Parcels on multiple processors with `mpirun`, you will need to concatenate the files of each processor, see also the [MPI documentation](https://docs.oceanparcels.org/en/latest/examples/documentation_MPI.html#Reading-in-the-ParticleFile-data-in-zarr-format).\n", - "\n", - "Also, once you have loaded the data as an `xarray` DataSet using `xr.open_zarr()`, you can always save the file to NetCDF if you prefer with the `.to_netcdf()` method.\n" + "Once you have loaded the data as an `xarray` DataSet using `xr.open_zarr()`, you can always save the file to NetCDF if you prefer with the `.to_netcdf()` method.\n" ] }, { From cf6a382db7e216bd7921f0db4e12e60428e0628d Mon Sep 17 00:00:00 2001 From: Vecko <36369090+VeckoTheGecko@users.noreply.github.com> Date: Thu, 19 Mar 2026 11:04:16 +0100 Subject: [PATCH 4/6] Delete MPI doc page --- .../examples_v3/documentation_MPI.ipynb | 207 ------------------ 1 file changed, 207 deletions(-) delete mode 100644 docs/user_guide/examples_v3/documentation_MPI.ipynb diff --git a/docs/user_guide/examples_v3/documentation_MPI.ipynb b/docs/user_guide/examples_v3/documentation_MPI.ipynb deleted file mode 100644 index d61b1169b..000000000 --- a/docs/user_guide/examples_v3/documentation_MPI.ipynb +++ /dev/null @@ -1,207 +0,0 @@ -{ - "cells": [ - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Parallelisation with MPI\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Parcels can be run in Parallel with MPI. To do this, you will need to also install the `mpich` and `mpi4py` packages (i.e., `conda install -c conda-forge mpich mpi4py`).\n", - "\n", - "Note that MPI support is only for Linux and macOS. There is no Windows support.\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Splitting the ParticleSet with MPI\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Once you have installed a parallel version of Parcels, you can simply run your script with\n", - "\n", - "```\n", - "mpirun -np python \n", - "```\n", - "\n", - "Where `` is the number of processors you want to use\n", - "\n", - "Parcels will then split the `ParticleSet` into `` smaller ParticleSets, based on a `sklearn.cluster.KMeans` clustering. Each of those smaller `ParticleSets` will be executed by one of the `` MPI processors.\n", - "\n", - "Note that in principle this means that all MPI processors need access to the full `FieldSet`, which can be Gigabytes in size for large global datasets. Therefore, efficient parallelisation only works if at the same time we also chunk the `FieldSet` into smaller domains.\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Optimising the partitioning of the particles with a user-defined partition_function" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Parcels uses so-called *partition functions* to assign particles to processors. By default, Parcels uses a `scikit-learn` `KMeans` clustering algorithm to partition the particles. However, you can also define your own partition function, which can be useful if you know more about the distribution of particles in your domain than the `KMeans` algorithm does.\n", - "\n", - "To define your own partition function, you need to define a function that takes a list of coordinates (lat, lon) as input, and returns a list of integers, defining the MPI processor that each particle should be assigned to. See the example below:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def simple_partition_function(coords, mpi_size=1):\n", - " \"\"\"A very simple partition function\n", - " that assigns particles to processors\n", - " \"\"\"\n", - " return np.linspace(0, mpi_size, coords.shape[0], endpoint=False, dtype=np.int32)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "To add the partition function to your script, you need to add it to the ParticleSet constructor:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "pset = ParticleSet(..., partition_function=simple_partition_function)" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Reading in the ParticleFile data in zarr format\n", - "\n", - "For efficiency, each processor will write its own data to a `zarr`-store. If the name of your `ParticleFile` is `fname`, then these stores will be located at `fname/proc00.zarr`, `fname/proc01.zarr`, etc.\n", - "\n", - "Reading in these stores and merging them into one `xarray.Dataset` can be done with\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from glob import glob\n", - "from os import path\n", - "\n", - "files = glob(path.join(fname, \"proc*\"))\n", - "ds = xr.concat(\n", - " [xr.open_zarr(f) for f in files],\n", - " dim=\"trajectory\",\n", - " compat=\"no_conflicts\",\n", - " coords=\"minimal\",\n", - ")" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Note that, if you have added particles during the `execute()` (for example because you used `repeatdt`), then the trajectories will not be ordered monotonically. While this may not be a problem, this will result in a different Dataset than a single-core simulation. If you do want the outputs of the MPI run to be the same as the single-core run, add `.sortby(['trajectory'])` at the end of the `xr.concat()` command\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "ds = xr.concat(\n", - " [xr.open_zarr(f) for f in files],\n", - " dim=\"trajectory\",\n", - " compat=\"no_conflicts\",\n", - " coords=\"minimal\",\n", - ").sortby([\"trajectory\"])" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Note that if you want, you can save this new DataSet with the `.to_zarr()` or `.to_netcdf()` methods.\n", - "\n", - "When using `.to_zarr()`, then further analysis may be sped up by first rechunking the DataSet, by using `ds.chunk()`. Note that in some cases, you will first need to remove the chunks encoding information manually, using a code like below\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "For small projects, the above instructions are sufficient. If your project is large, then it is helpful to combine the `proc*` directories into a single zarr dataset and to optimise the chunking for your analysis. What is \"large\"? If you find yourself running out of memory while doing your analysis, saving the results, or sorting the dataset, or if reading the data is taking longer than you can tolerate, your problem is \"large.\" Another rule of thumb is if the size of your output directory is 1/3 or more of the memory of your machine, your problem is large. Chunking and combining the `proc*` data in order to speed up analysis is discussed [in the documentation on runs with large output](https://docs.oceanparcels.org/en/latest/examples/documentation_LargeRunsOutput.html).\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Future developments: load-balancing\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The current implementation of MPI parallelisation in Parcels is still fairly rudimentary. In particular, we will continue to develop the load-balancing of the `ParticleSet`.\n", - "\n", - "With load-balancing we mean that `Particles` that are close together are ideally on the same MPI processor. Practically, it means that we need to take care how `Particles` are spread over chunks and processors. See for example the two figures below:\n", - "\n", - "![](images/ParcelsParallel.png)\n", - "_Example of load-balancing for Particles. The domain is chunked along the thick lines, and the orange and blue particles are on separate MPI processors. Before load-balancing (left panel), two chuncks in the centre of the domain have both orange and blue particles. After the load-balancing (right panel), the Particles are redistributed over the processors so that the number of chunks and particles per processor is optimised._\n", - "\n", - "The difficulty is that since we don't know how the `ParticleSet` will disperse over time, we need to do this load-balancing 'on the fly'. If you to contribute to the optimisation of the load-balancing, please leave a message on [github](https://github.com/OceanParcels/parcels/issues)!\n" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.10.6" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} From 0bcfa6b308fc12a735002c69d408292a6a53e2ee Mon Sep 17 00:00:00 2001 From: Vecko <36369090+VeckoTheGecko@users.noreply.github.com> Date: Thu, 19 Mar 2026 11:05:01 +0100 Subject: [PATCH 5/6] Add TODO to doc page on dealing with large output We will update this doc page another time. Just having the TODO to avoid heavy editting - this can be revised another time. --- docs/user_guide/examples_v3/documentation_LargeRunsOutput.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/user_guide/examples_v3/documentation_LargeRunsOutput.ipynb b/docs/user_guide/examples_v3/documentation_LargeRunsOutput.ipynb index bd8876b09..3180e8c6d 100644 --- a/docs/user_guide/examples_v3/documentation_LargeRunsOutput.ipynb +++ b/docs/user_guide/examples_v3/documentation_LargeRunsOutput.ipynb @@ -60,7 +60,7 @@ "id": "4", "metadata": {}, "source": [ - "## How to save the output of an MPI ocean parcels run to a single zarr dataset\n" + "## (TODO: Remove section now that we don't support MPI) How to save the output of an MPI ocean parcels run to a single zarr dataset\n" ] }, { From 1d81709a3e88286cccac4b1b1a091a8d47aff719 Mon Sep 17 00:00:00 2001 From: Vecko <36369090+VeckoTheGecko@users.noreply.github.com> Date: Thu, 19 Mar 2026 11:05:39 +0100 Subject: [PATCH 6/6] Remove test file --- tests-v3/test_mpirun.py | 45 ----------------------------------------- 1 file changed, 45 deletions(-) delete mode 100644 tests-v3/test_mpirun.py diff --git a/tests-v3/test_mpirun.py b/tests-v3/test_mpirun.py deleted file mode 100644 index cb29261f6..000000000 --- a/tests-v3/test_mpirun.py +++ /dev/null @@ -1,45 +0,0 @@ -import os -from glob import glob - -import numpy as np -import pytest -import xarray as xr - -from parcels._compat import MPI -from tests.utils import PROJECT_ROOT - - -@pytest.mark.skipif(MPI is None, reason="MPI not installed") -@pytest.mark.parametrize("repeatdt, maxage", [(200 * 86400, 600 * 86400), (100 * 86400, 100 * 86400)]) -@pytest.mark.parametrize("nump", [8]) -def test_mpi_run(tmpdir, repeatdt, maxage, nump): - stommel_file = PROJECT_ROOT / "docs/examples/example_stommel.py" - outputMPI = tmpdir.join("StommelMPI") - outputMPI_partition_function = tmpdir.join("StommelMPI_partition_function") - outputNoMPI = tmpdir.join("StommelNoMPI.zarr") - - os.system( - f"mpirun -np 2 python {stommel_file} -p {nump} -o {outputMPI_partition_function} -r {repeatdt} -a {maxage} -cpf True" - ) - os.system(f"mpirun -np 2 python {stommel_file} -p {nump} -o {outputMPI} -r {repeatdt} -a {maxage}") - os.system(f"python {stommel_file} -p {nump} -o {outputNoMPI} -r {repeatdt} -a {maxage}") - - ds2 = xr.open_zarr(outputNoMPI) - - for mpi_run in [outputMPI, outputMPI_partition_function]: - files = glob(os.path.join(mpi_run, "proc*")) - ds1 = xr.concat( - [xr.open_zarr(f) for f in files], dim="trajectory", compat="no_conflicts", coords="minimal" - ).sortby(["trajectory"]) - - for v in ds2.variables.keys(): - if v == "time": - continue # skip because np.allclose does not work well on np.datetime64 - assert np.allclose(ds1.variables[v][:], ds2.variables[v][:], equal_nan=True) - - for a in ds2.attrs: - if a != "parcels_version": - assert ds1.attrs[a] == ds2.attrs[a] - - ds1.close() - ds2.close()