From 8a8ed855fc4c67fede8cf1a996917978e19c56e9 Mon Sep 17 00:00:00 2001 From: Lachlan Deakin Date: Tue, 3 Mar 2026 13:17:06 +1100 Subject: [PATCH 1/2] feat: add `zarrs` `DecodeMode` support --- Cargo.toml | 7 +++++- README.md | 5 ++++ examples/.gitignore | 1 + examples/issue_152.py | 54 ++++++++++++++++++++++++++++++++++++++++ python/zarrs/pipeline.py | 1 + src/lib.rs | 24 +++++++++++++++--- 6 files changed, 87 insertions(+), 5 deletions(-) create mode 100644 examples/.gitignore create mode 100644 examples/issue_152.py diff --git a/Cargo.toml b/Cargo.toml index 10f33fa3..63aa6f62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ crate-type = ["cdylib", "rlib"] [dependencies] pyo3 = { version = "0.27.1", features = ["abi3-py311"] } -zarrs = { version = "0.23.0", features = ["async", "zlib", "pcodec", "bz2"] } +zarrs = { version = "0.23.6", features = ["async", "zlib", "pcodec", "bz2"] } rayon_iter_concurrent_limit = "0.2.0" rayon = "1.10.0" # fix for https://stackoverflow.com/questions/76593417/package-openssl-was-not-found-in-the-pkg-config-search-path @@ -29,3 +29,8 @@ zarrs_object_store = "0.5.0" # object_store 0.12 [profile.release] lto = true + +[patch.crates-io] +zarrs = { git = "https://github.com/zarrs/zarrs.git", branch = "feat/DecodeMode" } +zarrs_storage = { git = "https://github.com/zarrs/zarrs.git", branch = "feat/DecodeMode" } +zarrs_codec = { git = "https://github.com/zarrs/zarrs.git", branch = "feat/DecodeMode" } diff --git a/README.md b/README.md index f06add74..1fbbac97 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,10 @@ The `ZarrsCodecPipeline` specific options are: - Defaults to `True`. See [here](https://docs.rs/zarrs/latest/zarrs/config/struct.Config.html#validate-checksums) for more info. - `codec_pipeline.direct_io`: enable `O_DIRECT` read/write, needs support from the operating system (currently only Linux) and file system. - Defaults to `False`. 
+- `codec_pipeline.decode_mode`: controls the decode path used when reading a chunk subset. + - `"auto"` (default): use the full-chunk decode path when the requested subset covers the entire chunk, and the partial-decoder path otherwise. + - `"partial"`: always use the partial-decoder path, even for whole-chunk reads. Useful for codecs (e.g. sharding) where partial decoding is more efficient even for full-chunk reads. + - `"full"`: always decode the full chunk and extract the subset. - `codec_pipeline.strict`: raise exceptions for unsupported operations instead of falling back to the default codec pipeline of `zarr-python`. - Defaults to `False`. @@ -63,6 +67,7 @@ zarr.config.set({ "chunk_concurrent_maximum": None, "chunk_concurrent_minimum": 4, "direct_io": False, + "decode_mode": None, "strict": False } }) diff --git a/examples/.gitignore b/examples/.gitignore new file mode 100644 index 00000000..b2bf4f38 --- /dev/null +++ b/examples/.gitignore @@ -0,0 +1 @@ +issue_152.zarr diff --git a/examples/issue_152.py b/examples/issue_152.py new file mode 100644 index 00000000..d114a40e --- /dev/null +++ b/examples/issue_152.py @@ -0,0 +1,54 @@ +import platform +import subprocess +import tempfile +import time + +import numpy as np +import zarr + + +def clear_cache(): + if platform.system() == "Darwin": + subprocess.call(["sh", "-c", "sync && sudo purge"]) # "&&" is shell syntax, so the chain must run via sh -c + elif platform.system() == "Linux": + subprocess.call(["sudo", "sh", "-c", "sync; echo 3 > /proc/sys/vm/drop_caches"]) + else: + raise Exception("Unsupported platform") + +zarr.config.set({"codec_pipeline.path": "zarrs.ZarrsCodecPipeline"}) + +# zarr.config.set({"codec_pipeline.decode_mode": "auto"}) +# full read took: 3.3279900550842285 +# partial shard read (4095) took: 1.211921215057373 +# partial shard read (4096) took: 2.3402509689331055 + +zarr.config.set({"codec_pipeline.decode_mode": "partial"}) +# full read took: 2.2892508506774902 +# partial shard read (4095) took: 1.1934266090393066 +# partial shard read (4096) 
took: 1.1788337230682373 + +z = zarr.create_array( + "examples/issue_152.zarr", + shape=(8192, 4, 128, 128), + shards=(4096, 4, 128, 128), + chunks=(1, 1, 128, 128), + dtype=np.float64, + overwrite=True, +) +z[...] = np.random.randn(8192, 4, 128, 128) + +clear_cache() +t = time.time() +z[...] +print("full read took: ", time.time() - t) + +clear_cache() +t = time.time() +z[:4095, ...] +print("partial shard read (4095) took: ", time.time() - t) + + +clear_cache() +t = time.time() +z[:4096, ...] +print("partial shard read (4096) took: ", time.time() - t) diff --git a/python/zarrs/pipeline.py b/python/zarrs/pipeline.py index a3e49590..00a2fc0b 100644 --- a/python/zarrs/pipeline.py +++ b/python/zarrs/pipeline.py @@ -62,6 +62,7 @@ def get_codec_pipeline_impl( ), num_threads=config.get("threading.max_workers", None), direct_io=config.get("codec_pipeline.direct_io", False), + decode_mode=config.get("codec_pipeline.decode_mode", None), ) except TypeError as e: if strict: diff --git a/src/lib.rs b/src/lib.rs index 1b7c08e5..e699ca63 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,7 +21,7 @@ use utils::is_whole_chunk; use zarrs::array::{ ArrayBytes, ArrayBytesDecodeIntoTarget, ArrayBytesFixedDisjointView, ArrayMetadata, ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecChain, CodecOptions, DataType, - FillValue, StoragePartialDecoder, copy_fill_value_into, update_array_bytes, + DecodeMode, FillValue, StoragePartialDecoder, copy_fill_value_into, update_array_bytes, }; use zarrs::config::global_config; use zarrs::convert::array_metadata_v2_to_v3; @@ -218,6 +218,7 @@ impl CodecPipelineImpl { chunk_concurrent_maximum=None, num_threads=None, direct_io=false, + decode_mode=None, ))] #[new] fn new( @@ -228,6 +229,7 @@ impl CodecPipelineImpl { chunk_concurrent_maximum: Option, num_threads: Option, direct_io: bool, + decode_mode: Option<&str>, ) -> PyResult { store_config.direct_io(direct_io); let metadata = serde_json::from_str(array_metadata).map_py_err::()?; @@ -239,7 
+241,19 @@ impl CodecPipelineImpl { }; let codec_chain = Arc::new(CodecChain::from_metadata(&metadata_v3.codecs).map_py_err::()?); - let codec_options = CodecOptions::default().with_validate_checksums(validate_checksums); + let decode_mode = match decode_mode { + None | Some("auto") => DecodeMode::Auto, + Some("partial") => DecodeMode::Partial, + Some("full") => DecodeMode::Full, + Some(s) => { + return Err(PyErr::new::(format!( + "invalid decode_mode {s:?}, expected \"auto\", \"partial\", or \"full\"" + ))); + } + }; + let codec_options = CodecOptions::default() + .with_validate_checksums(validate_checksums) + .with_decode_mode(decode_mode); let chunk_concurrent_minimum = chunk_concurrent_minimum.unwrap_or(global_config().chunk_concurrent_minimum()); @@ -300,7 +314,9 @@ impl CodecPipelineImpl { // Assemble partial decoders ahead of time and in parallel let partial_chunk_items = chunk_descriptions .iter() - .filter(|item| !(is_whole_chunk(item))) + .filter(|item| { + !is_whole_chunk(item) || codec_options.decode_mode() == DecodeMode::Partial + }) .unique_by(|item| item.key.clone()) .collect::>(); let mut partial_decoder_cache: HashMap> = @@ -350,7 +366,7 @@ impl CodecPipelineImpl { }; let target = ArrayBytesDecodeIntoTarget::Fixed(&mut output_view); // See zarrs::array::Array::retrieve_chunk_subset_into - if is_whole_chunk(&item) { + if is_whole_chunk(&item) && codec_options.decode_mode() != DecodeMode::Partial { // See zarrs::array::Array::retrieve_chunk_into if let Some(chunk_encoded) = self.store.get(&item.key).map_py_err::()? 
From e1f5889f83d27da068a317193ba7dbea0641fac2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 3 Mar 2026 02:21:51 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- examples/issue_152.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/issue_152.py b/examples/issue_152.py index d114a40e..f93e4135 100644 --- a/examples/issue_152.py +++ b/examples/issue_152.py @@ -1,6 +1,5 @@ import platform import subprocess -import tempfile import time import numpy as np @@ -15,6 +14,7 @@ def clear_cache(): else: raise Exception("Unsupported platform") + zarr.config.set({"codec_pipeline.path": "zarrs.ZarrsCodecPipeline"}) # zarr.config.set({"codec_pipeline.decode_mode": "auto"})