Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ crate-type = ["cdylib", "rlib"]

[dependencies]
pyo3 = { version = "0.27.1", features = ["abi3-py311"] }
zarrs = { version = "0.23.0", features = ["async", "zlib", "pcodec", "bz2"] }
zarrs = { version = "0.23.6", features = ["async", "zlib", "pcodec", "bz2"] }
rayon_iter_concurrent_limit = "0.2.0"
rayon = "1.10.0"
# fix for https://stackoverflow.com/questions/76593417/package-openssl-was-not-found-in-the-pkg-config-search-path
Expand Down
4 changes: 2 additions & 2 deletions python/zarrs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from ._internal import __version__
from .pipeline import ZarrsCodecPipeline as _ZarrsCodecPipeline
from .utils import CollapsedDimensionError, DiscontiguousArrayError
from .utils import DiscontiguousArrayError, UnsupportedVIndexingError


# Need to do this redirection so people can access the pipeline as `zarrs.ZarrsCodecPipeline` instead of `zarrs.pipeline.ZarrsCodecPipeline`
Expand All @@ -11,6 +11,6 @@ class ZarrsCodecPipeline(_ZarrsCodecPipeline):
__all__ = [
"ZarrsCodecPipeline",
"DiscontiguousArrayError",
"CollapsedDimensionError",
"UnsupportedVIndexingError",
"__version__",
]
6 changes: 3 additions & 3 deletions python/zarrs/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@

from ._internal import CodecPipelineImpl
from .utils import (
CollapsedDimensionError,
DiscontiguousArrayError,
FillValueNoneError,
UnsupportedVIndexingError,
make_chunk_info_for_rust_with_indices,
)

Expand Down Expand Up @@ -185,7 +185,7 @@ async def read(
except (
UnsupportedMetadataError,
DiscontiguousArrayError,
CollapsedDimensionError,
UnsupportedVIndexingError,
UnsupportedDataTypeError,
FillValueNoneError,
):
Expand Down Expand Up @@ -220,7 +220,7 @@ async def write(
except (
UnsupportedMetadataError,
DiscontiguousArrayError,
CollapsedDimensionError,
UnsupportedVIndexingError,
UnsupportedDataTypeError,
FillValueNoneError,
):
Expand Down
43 changes: 37 additions & 6 deletions python/zarrs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class DiscontiguousArrayError(Exception):
pass


class CollapsedDimensionError(Exception):
class UnsupportedVIndexingError(Exception):
pass


Expand Down Expand Up @@ -160,7 +160,7 @@ def make_chunk_info_for_rust_with_indices(
drop_axes: tuple[int, ...],
shape: tuple[int, ...],
) -> RustChunkInfo:
shape = shape if shape else (1,) # constant array
is_constant = shape == ()
chunk_info_with_indices: list[ChunkItem] = []
write_empty_chunks: bool = True
for (
Expand All @@ -171,8 +171,12 @@ def make_chunk_info_for_rust_with_indices(
_,
) in batch_info:
write_empty_chunks = chunk_spec.config.write_empty_chunks
# Convert the selector tuples to ones that only have slices i.e., `i: int` replaced by slice(i, i+1)
out_selection_as_slices = selector_tuple_to_slice_selection(out_selection)
chunk_selection_as_slices = selector_tuple_to_slice_selection(chunk_selection)
# Because `chunk_selection_as_slices` contains only slices, certain types of vindex-ing are not going to be able to be processed by the zarrs pipeline.
# Thus we get the shapes of the input selector and the converted-to-slices selector to check if they differ.
# If they differ, then the indexing operation is not supported because it is not describable as slices.
shape_chunk_selection_slices = get_shape_for_selector(
tuple(chunk_selection_as_slices),
chunk_spec.shape,
Expand All @@ -182,17 +186,44 @@ def make_chunk_info_for_rust_with_indices(
shape_chunk_selection = get_shape_for_selector(
chunk_selection, chunk_spec.shape, pad=True, drop_axes=drop_axes
)
if prod_op(shape_chunk_selection) != prod_op(shape_chunk_selection_slices):
raise CollapsedDimensionError(
if (chunk_size := prod_op(shape_chunk_selection)) != prod_op(
shape_chunk_selection_slices
):
raise UnsupportedVIndexingError(
f"{shape_chunk_selection} != {shape_chunk_selection_slices}"
)
if not is_constant and chunk_size > prod_op(shape):
raise IndexError(
f"the size of the chunk subset {shape_chunk_selection} and input/output subset {shape} are incompatible"
)
io_array_shape = list(shape)
out_selection_expanded = out_selection_as_slices
# We need to have io_array_shape and out_selection_expanded with dimensionalities matching that of the underlying array.
# `drop_axes` is only triggered via fancy outer-indexing because applying `chunk_selection_as_slices` to the chunk array would not drop a dimension that the out-array expects to be dropped, so that dimension needs to be indicated explicitly.
# However, other indexing operations can silently drop a dimension on input to match the output, like `z[1, ...]`.
# In other words, applying the `chunk_selection_as_slices` to a chunk array would drop a dimension, but `out_selection` already encodes this dropped dimension because zarr-python constructs the out-array missing the dimension.
# So if we detect that a dimension has been dropped silently like this after converting to slices, we update to handle the dropped dimension.
scs_iter = iter(shape_chunk_selection)
scs_current = next(scs_iter, None)
for idx_shape, shape_chunk_from_slices in enumerate(
shape_chunk_selection_slices
):
# Detect whether this dimension has been dropped on the io_array, i.e., shape_chunk_selection has been exhausted (leaving an extra 1-sized dimension at the end) or mismatches the "full" chunk shape `shape_chunk_selection_slices`.
if shape_chunk_from_slices == 1 != scs_current:
drop_axes += (idx_shape,)
else:
scs_current = next(scs_iter, None)
if drop_axes:
for axis in drop_axes:
io_array_shape.insert(axis, 1)
out_selection_expanded.insert(axis, slice(0, 1))
chunk_info_with_indices.append(
ChunkItem(
key=byte_getter.path,
chunk_subset=chunk_selection_as_slices,
chunk_shape=chunk_spec.shape,
subset=out_selection_as_slices,
shape=shape,
subset=out_selection_expanded,
shape=io_array_shape,
)
)
return RustChunkInfo(chunk_info_with_indices, write_empty_chunks)
2 changes: 2 additions & 0 deletions src/chunk_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub(crate) struct ChunkItem {
pub subset: ArraySubset,
pub shape: Vec<NonZeroU64>,
pub num_elements: u64,
pub array_shape: Vec<NonZeroU64>,
}

#[gen_stub_pymethods]
Expand Down Expand Up @@ -65,6 +66,7 @@ impl ChunkItem {
subset,
shape: chunk_shape_nonzero_u64,
num_elements,
array_shape: shape_nonzero_u64,
})
}
}
Expand Down
12 changes: 7 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ mod utils;

use crate::concurrency::ChunkConcurrentLimitAndCodecOptions;
use crate::store::StoreConfig;
use crate::utils::{PyCodecErrExt, PyErrExt as _, PyUntypedArrayExt as _};
use crate::utils::{PyCodecErrExt, PyErrExt as _};

// TODO: Use a OnceLock for store with get_or_try_init when stabilised?
#[gen_stub_pyclass]
Expand Down Expand Up @@ -288,7 +288,6 @@ impl CodecPipelineImpl {
) -> PyResult<()> {
// Get input array
let output = Self::nparray_to_unsafe_cell_slice(value)?;
let output_shape: Vec<u64> = value.shape_zarr()?;

// Adjust the concurrency based on the codec chain and the first chunk description
let Some((chunk_concurrent_limit, codec_options)) =
Expand Down Expand Up @@ -343,7 +342,7 @@ impl CodecPipelineImpl {
.fixed_size()
.ok_or("variable length data type not supported")
.map_py_err::<PyTypeError>()?,
&output_shape,
bytemuck::must_cast_slice(&item.array_shape),
item.subset.clone(),
)
.map_py_err::<PyRuntimeError>()?
Expand Down Expand Up @@ -410,7 +409,6 @@ impl CodecPipelineImpl {
} else {
InputValue::Constant(FillValue::new(input_slice.to_vec()))
};
let input_shape: Vec<u64> = value.shape_zarr()?;

// Adjust the concurrency based on the codec chain and the first chunk description
let Some((chunk_concurrent_limit, mut codec_options)) =
Expand All @@ -424,7 +422,11 @@ impl CodecPipelineImpl {
let store_chunk = |item: ChunkItem| match &input {
InputValue::Array(input) => {
let chunk_subset_bytes = input
.extract_array_subset(&item.subset, &input_shape, &self.data_type)
.extract_array_subset(
&item.subset,
bytemuck::must_cast_slice(&item.array_shape),
&self.data_type,
)
.map_codec_err()?;
self.store_chunk_subset_bytes(
&item,
Expand Down
20 changes: 1 addition & 19 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::fmt::Display;

use numpy::{PyUntypedArray, PyUntypedArrayMethods};
use pyo3::{Bound, PyErr, PyResult, PyTypeInfo};
use pyo3::{PyErr, PyResult, PyTypeInfo};
use zarrs::array::CodecError;

use crate::ChunkItem;
Expand Down Expand Up @@ -38,23 +37,6 @@ impl<T> PyCodecErrExt<T> for Result<T, CodecError> {
}
}

pub(crate) trait PyUntypedArrayExt {
fn shape_zarr(&self) -> PyResult<Vec<u64>>;
}

impl PyUntypedArrayExt for Bound<'_, PyUntypedArray> {
fn shape_zarr(&self) -> PyResult<Vec<u64>> {
Ok(if self.shape().is_empty() {
vec![1] // scalar value
} else {
self.shape()
.iter()
.map(|&i| u64::try_from(i))
.collect::<Result<_, _>>()?
})
}
}

pub fn is_whole_chunk(item: &ChunkItem) -> bool {
item.chunk_subset.start().iter().all(|&o| o == 0)
&& item.chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(&item.shape)
Expand Down
9 changes: 6 additions & 3 deletions tests/test_sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,12 @@ def test_sharding_partial_readwrite(

a[:] = data

for x in range(data.shape[0]):
read_data = a[x, :, :]
assert np.array_equal(data[x], read_data)
for axis in range(len(data.shape)):
for x in range(data.shape[0]):
selector = [slice(None), slice(None), slice(None)]
selector[axis] = x
read_data = a[*tuple(selector)]
assert np.array_equal(data[*tuple(selector)], read_data)


@pytest.mark.parametrize(
Expand Down
Loading