diff --git a/Cargo.toml b/Cargo.toml index 10f33fa..6c4cd9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ crate-type = ["cdylib", "rlib"] [dependencies] pyo3 = { version = "0.27.1", features = ["abi3-py311"] } -zarrs = { version = "0.23.0", features = ["async", "zlib", "pcodec", "bz2"] } +zarrs = { version = "0.23.6", features = ["async", "zlib", "pcodec", "bz2"] } rayon_iter_concurrent_limit = "0.2.0" rayon = "1.10.0" # fix for https://stackoverflow.com/questions/76593417/package-openssl-was-not-found-in-the-pkg-config-search-path diff --git a/python/zarrs/__init__.py b/python/zarrs/__init__.py index cf5b8bd..c34bc23 100644 --- a/python/zarrs/__init__.py +++ b/python/zarrs/__init__.py @@ -1,6 +1,6 @@ from ._internal import __version__ from .pipeline import ZarrsCodecPipeline as _ZarrsCodecPipeline -from .utils import CollapsedDimensionError, DiscontiguousArrayError +from .utils import DiscontiguousArrayError, UnsupportedVIndexingError # Need to do this redirection so people can access the pipeline as `zarrs.ZarrsCodecPipeline` instead of `zarrs.pipeline.ZarrsCodecPipeline` @@ -11,6 +11,6 @@ class ZarrsCodecPipeline(_ZarrsCodecPipeline): __all__ = [ "ZarrsCodecPipeline", "DiscontiguousArrayError", - "CollapsedDimensionError", + "UnsupportedVIndexingError", "__version__", ] diff --git a/python/zarrs/pipeline.py b/python/zarrs/pipeline.py index a3e4959..d033820 100644 --- a/python/zarrs/pipeline.py +++ b/python/zarrs/pipeline.py @@ -26,9 +26,9 @@ from ._internal import CodecPipelineImpl from .utils import ( - CollapsedDimensionError, DiscontiguousArrayError, FillValueNoneError, + UnsupportedVIndexingError, make_chunk_info_for_rust_with_indices, ) @@ -185,7 +185,7 @@ async def read( except ( UnsupportedMetadataError, DiscontiguousArrayError, - CollapsedDimensionError, + UnsupportedVIndexingError, UnsupportedDataTypeError, FillValueNoneError, ): @@ -220,7 +220,7 @@ async def write( except ( UnsupportedMetadataError, DiscontiguousArrayError, - 
CollapsedDimensionError, + UnsupportedVIndexingError, UnsupportedDataTypeError, FillValueNoneError, ): diff --git a/python/zarrs/utils.py b/python/zarrs/utils.py index 8a314a7..0881cc1 100644 --- a/python/zarrs/utils.py +++ b/python/zarrs/utils.py @@ -30,7 +30,7 @@ class DiscontiguousArrayError(Exception): pass -class CollapsedDimensionError(Exception): +class UnsupportedVIndexingError(Exception): pass @@ -160,7 +160,7 @@ def make_chunk_info_for_rust_with_indices( drop_axes: tuple[int, ...], shape: tuple[int, ...], ) -> RustChunkInfo: - shape = shape if shape else (1,) # constant array + is_constant = shape == () chunk_info_with_indices: list[ChunkItem] = [] write_empty_chunks: bool = True for ( @@ -171,8 +171,12 @@ def make_chunk_info_for_rust_with_indices( _, ) in batch_info: write_empty_chunks = chunk_spec.config.write_empty_chunks + # Convert the selector tuples to ones that only have slices i.e., `i: int` replaced by slice(i, i+1) out_selection_as_slices = selector_tuple_to_slice_selection(out_selection) chunk_selection_as_slices = selector_tuple_to_slice_selection(chunk_selection) + # Because `chunk_selection_as_slices` contains only slices, certain types of vindex-ing are not going to be able to be processed by the zarrs pipeline. + # Thus we get the shapes of the input selector and the converted-to-slices selector to check if they differ. + # If they differ, then the indexing operation is not supported because it is not describable as slices. 
shape_chunk_selection_slices = get_shape_for_selector( tuple(chunk_selection_as_slices), chunk_spec.shape, @@ -182,17 +186,44 @@ shape_chunk_selection = get_shape_for_selector( chunk_selection, chunk_spec.shape, pad=True, drop_axes=drop_axes ) - if prod_op(shape_chunk_selection) != prod_op(shape_chunk_selection_slices): - raise CollapsedDimensionError( + if (chunk_size := prod_op(shape_chunk_selection)) != prod_op( + shape_chunk_selection_slices + ): + raise UnsupportedVIndexingError( f"{shape_chunk_selection} != {shape_chunk_selection_slices}" ) + if not is_constant and chunk_size > prod_op(shape): + raise IndexError( + f"the size of the chunk subset {shape_chunk_selection} and input/output subset {shape} are incompatible" + ) + io_array_shape = list(shape) + out_selection_expanded = out_selection_as_slices + # We need to have io_array_shape and out_selection_expanded with dimensionalities matching that of the underlying array. + # `drop_axes` is only triggered via fancy outer-indexing because applying `chunk_selection_as_slices` to the chunk array would not drop a dimension that the out-array thinks should be dropped, thus that dimension needs to be indicated. + # However, other indexing operations can silently drop a dimension on input to match the output, like `z[1, ...]`. + # In other words, applying the `chunk_selection_as_slices` to a chunk array would drop a dimension, but `out_selection` already encodes this dropped dimension because zarr-python constructs the out-array missing the dimension. + # So if we detect that a dimension has been dropped silently like this after converting to slices, we update to handle the dropped dimension. 
+ scs_iter = iter(shape_chunk_selection) + scs_current = next(scs_iter, None) + for idx_shape, shape_chunk_from_slices in enumerate( + shape_chunk_selection_slices + ): + # Detect if this dimension has been dropped on the io_array i.e., shape_chunk_selection has been exhausted so there is an extra 1-sized dimension at the end or has a mismatch with the "full" chunk shape `shape_chunk_selection_slices`. + if shape_chunk_from_slices == 1 != scs_current: + drop_axes += (idx_shape,) + else: + scs_current = next(scs_iter, None) + if drop_axes: + for axis in drop_axes: + io_array_shape.insert(axis, 1) + out_selection_expanded.insert(axis, slice(0, 1)) chunk_info_with_indices.append( ChunkItem( key=byte_getter.path, chunk_subset=chunk_selection_as_slices, chunk_shape=chunk_spec.shape, - subset=out_selection_as_slices, - shape=shape, + subset=out_selection_expanded, + shape=io_array_shape, ) ) return RustChunkInfo(chunk_info_with_indices, write_empty_chunks) diff --git a/src/chunk_item.rs b/src/chunk_item.rs index b759120..8a9aa0a 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -32,6 +32,7 @@ pub(crate) struct ChunkItem { pub subset: ArraySubset, pub shape: Vec, pub num_elements: u64, + pub array_shape: Vec, } #[gen_stub_pymethods] @@ -65,6 +66,7 @@ impl ChunkItem { subset, shape: chunk_shape_nonzero_u64, num_elements, + array_shape: shape_nonzero_u64, }) } } diff --git a/src/lib.rs b/src/lib.rs index 1b7c08e..7cc1a0f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,7 +38,7 @@ mod utils; use crate::concurrency::ChunkConcurrentLimitAndCodecOptions; use crate::store::StoreConfig; -use crate::utils::{PyCodecErrExt, PyErrExt as _, PyUntypedArrayExt as _}; +use crate::utils::{PyCodecErrExt, PyErrExt as _}; // TODO: Use a OnceLock for store with get_or_try_init when stabilised? 
#[gen_stub_pyclass] @@ -288,7 +288,6 @@ impl CodecPipelineImpl { ) -> PyResult<()> { // Get input array let output = Self::nparray_to_unsafe_cell_slice(value)?; - let output_shape: Vec = value.shape_zarr()?; // Adjust the concurrency based on the codec chain and the first chunk description let Some((chunk_concurrent_limit, codec_options)) = @@ -343,7 +342,7 @@ impl CodecPipelineImpl { .fixed_size() .ok_or("variable length data type not supported") .map_py_err::()?, - &output_shape, + bytemuck::must_cast_slice(&item.array_shape), item.subset.clone(), ) .map_py_err::()? @@ -410,7 +409,6 @@ impl CodecPipelineImpl { } else { InputValue::Constant(FillValue::new(input_slice.to_vec())) }; - let input_shape: Vec = value.shape_zarr()?; // Adjust the concurrency based on the codec chain and the first chunk description let Some((chunk_concurrent_limit, mut codec_options)) = @@ -424,7 +422,11 @@ impl CodecPipelineImpl { let store_chunk = |item: ChunkItem| match &input { InputValue::Array(input) => { let chunk_subset_bytes = input - .extract_array_subset(&item.subset, &input_shape, &self.data_type) + .extract_array_subset( + &item.subset, + bytemuck::must_cast_slice(&item.array_shape), + &self.data_type, + ) .map_codec_err()?; self.store_chunk_subset_bytes( &item, diff --git a/src/utils.rs b/src/utils.rs index d663b5c..3ededf0 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,7 +1,6 @@ use std::fmt::Display; -use numpy::{PyUntypedArray, PyUntypedArrayMethods}; -use pyo3::{Bound, PyErr, PyResult, PyTypeInfo}; +use pyo3::{PyErr, PyResult, PyTypeInfo}; use zarrs::array::CodecError; use crate::ChunkItem; @@ -38,23 +37,6 @@ impl PyCodecErrExt for Result { } } -pub(crate) trait PyUntypedArrayExt { - fn shape_zarr(&self) -> PyResult>; -} - -impl PyUntypedArrayExt for Bound<'_, PyUntypedArray> { - fn shape_zarr(&self) -> PyResult> { - Ok(if self.shape().is_empty() { - vec![1] // scalar value - } else { - self.shape() - .iter() - .map(|&i| u64::try_from(i)) - .collect::>()? 
- }) - } -} - pub fn is_whole_chunk(item: &ChunkItem) -> bool { item.chunk_subset.start().iter().all(|&o| o == 0) && item.chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(&item.shape) diff --git a/tests/test_sharding.py b/tests/test_sharding.py index 2ff6234..599b5e9 100644 --- a/tests/test_sharding.py +++ b/tests/test_sharding.py @@ -148,9 +148,12 @@ def test_sharding_partial_readwrite( a[:] = data - for x in range(data.shape[0]): - read_data = a[x, :, :] - assert np.array_equal(data[x], read_data) + for axis in range(len(data.shape)): + for x in range(data.shape[0]): + selector = [slice(None), slice(None), slice(None)] + selector[axis] = x + read_data = a[*tuple(selector)] + assert np.array_equal(data[*tuple(selector)], read_data) @pytest.mark.parametrize(