Conversation
tests/workflows/test_rechunk.py (outdated)
```python
def test_rechunk(rechunk_client, s3_url):
    x = da.random.random((5_000, 5_000, 5_000, 2), chunks="50 MB")  # 1.82 TiB
```
TODO: Replace this with some public Zarr dataset on AWS
```python
def test_rechunk(rechunk_client, s3_url):
    x = da.from_zarr("s3://mur-sst/zarr", component="sea_ice_fraction")  # 3.80 TiB
    y = x.rechunk("200 MB")
    y.to_zarr(s3_url)
```
@rabernat we're trying to establish a set of benchmarks for common workloads faced by Dask users. Ideally these are more realistic and less like toy examples.
We were inspired by your thoughts on reading in a dataset stored with one chunking and then performing an analysis in another. This is a quick proxy for that. Do you happen to have anything we could use that would be better? In terms of complexity we'd love something in the range of 20-200 lines of code.
The most difficult / pathological case to test is a "full shuffle" rechunk, where every input chunk goes into every output chunk.
I'd look at this dataset.
An array is at s3://yuvipanda-test1/cmr/gpm3imergdl.zarr/HQprecipitation ... not quite as big as MUR but chunked contiguously in space.
Try something like

```python
x = da.from_zarr("s3://yuvipanda-test1/cmr/gpm3imergdl.zarr/HQprecipitation")
y = x.rechunk((-1, 36, 180))
```
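To make the full-shuffle point concrete, here is a minimal sketch on synthetic data (the shape and chunk sizes are illustrative only, not those of the GPM IMERG array above): the input is chunked so that each chunk spans the full spatial grid for a few time steps, while the target chunking spans the whole time series for a small spatial tile, so every input chunk contributes to every output chunk.

```python
import dask.array as da

# Synthetic (time, lat, lon) array, "contiguous in space": each input chunk
# holds the full spatial grid for 10 time steps.
x = da.random.random((1_000, 360, 720), chunks=(10, 360, 720))

# Target chunking is "contiguous in time": each output chunk holds the whole
# time series for one small spatial tile, so every input chunk feeds every
# output chunk (the pathological full-shuffle case).
y = x.rechunk((-1, 36, 72))

y.sum().compute()  # force the rechunk to actually run
```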
Thanks @rabernat. We're also trying to capture a broader set of common operations to keep things real-world-ish. These might eventually graduate into some sort of notebook repository with worked examples that people can look at. Is there some icing we could put on this particular cake? Maybe a common analysis and an example image it would produce?
Or, stated differently, @rabernat, what are 1-3 workflows you'd like to see run regularly as part of Dask's benchmark suite? These get used day-to-day by engineers as they A/B test various changes they want to make. Workloads in this suite will naturally be optimized over time.
I appreciate the chance to contribute to this. I'll try to follow up with more detail in the coming days.
Thanks for the pointer @rabernat 🚀 By the way, what AWS region is the dataset located in? I didn't see a region mentioned on the Pangeo Forge page or the corresponding feedstock.

> Try something like

That example seems to run well (albeit with lots of transfers) when I try it here. I'm currently using a large cluster (~750 GiB of memory), which is much bigger than the dataset itself (~200 GiB). Under what conditions do you tend to see bad performance? Maybe on a cluster with less memory than the dataset itself?
Just wanted to check in -- @rabernat, do you have additional details that'd be useful here (e.g. some common follow-up operations/plots like Matt mentioned)?
Sorry for my slow response here. I would recommend you look at https://github.com/pangeo-data/distributed-array-examples/issues. This is our best attempt to collect the sort of workflows you're looking for from the Pangeo Community.
Interestingly, I'm not able to reproduce the …
These have been cropping up everywhere. 🤦 See zarr-developers/zarr-python#1353. Should be fixed in Zarr 2.14.2.
Ah, great. Thanks @rabernat!
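For anyone double-checking that a cluster has actually picked up the fixed Zarr release, a small sketch (assuming an already-connected `dask.distributed` client, like the `rechunk_client` fixture below provides):

```python
# Report the zarr version installed locally, on the scheduler, and on every
# worker; check=True raises if the environments disagree.
client.get_versions(check=True, packages=["zarr"])
```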
```python
@pytest.fixture(scope="module")
def rechunk_cluster(
    dask_env_variables,
    cluster_kwargs,
    github_cluster_tags,
):
    with coiled.Cluster(
        f"test-rechunk-{uuid.uuid4().hex[:8]}",
        environ=dask_env_variables,
        tags=github_cluster_tags,
        **cluster_kwargs["embarrassingly_parallel_cluster"],
    ) as cluster:
        yield cluster
```
```python
@pytest.fixture
def rechunk_client(
    rechunk_cluster,
    cluster_kwargs,
    upload_cluster_dump,
    benchmark_all,
):
    n_workers = cluster_kwargs["embarrassingly_parallel_cluster"]["n_workers"]
    with Client(rechunk_cluster) as client:
        rechunk_cluster.scale(n_workers)
        client.wait_for_workers(n_workers)
        client.restart()
        with upload_cluster_dump(client), benchmark_all(client):
            yield client
```
I'll eventually want to move these into a utility somewhere for better reusability, but for now I'm just copy-and-pasting from other tests.
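One possible shape for that shared utility, sketched with made-up names (`workflow_cluster` / `workflow_client` and the module path are illustrative, not part of this PR): plain context managers that each workflow test module can wrap in its own thin fixtures.

```python
# Hypothetical shared helpers, e.g. in tests/workflows/utils.py
import contextlib
import uuid

import coiled
from dask.distributed import Client


@contextlib.contextmanager
def workflow_cluster(name_prefix, config, dask_env_variables, github_cluster_tags):
    """Start a Coiled cluster for a workflow benchmark and tear it down afterwards."""
    with coiled.Cluster(
        f"{name_prefix}-{uuid.uuid4().hex[:8]}",
        environ=dask_env_variables,
        tags=github_cluster_tags,
        **config,
    ) as cluster:
        yield cluster


@contextlib.contextmanager
def workflow_client(cluster, n_workers, upload_cluster_dump, benchmark_all):
    """Connect a client, scale up, and wrap the test in the benchmarking helpers."""
    with Client(cluster) as client:
        cluster.scale(n_workers)
        client.wait_for_workers(n_workers)
        client.restart()
        with upload_cluster_dump(client), benchmark_all(client):
            yield client
```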
@jrbourbeau: Is there anything you need help with to get this over the finish line?

xref #725