Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ crate-type = ["cdylib", "rlib"]

[dependencies]
pyo3 = { version = "0.27.1", features = ["abi3-py311"] }
zarrs = { version = "0.23.0", features = ["async", "zlib", "pcodec", "bz2"] }
zarrs = { version = "0.23.6", features = ["async", "zlib", "pcodec", "bz2"] }
rayon_iter_concurrent_limit = "0.2.0"
rayon = "1.10.0"
# fix for https://stackoverflow.com/questions/76593417/package-openssl-was-not-found-in-the-pkg-config-search-path
Expand Down
33 changes: 1 addition & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,38 +89,7 @@ Chunk concurrency is typically favored because:

## Supported Indexing Methods

The following methods will trigger use with the old zarr-python pipeline:

1. Any `oindex` or `vindex` integer `np.ndarray` indexing with dimensionality >=3 i.e.,

```python
arr[np.array([...]), :, np.array([...])]
arr[np.array([...]), np.array([...]), np.array([...])]
arr[np.array([...]), np.array([...]), np.array([...])] = ...
arr.oindex[np.array([...]), np.array([...]), np.array([...])] = ...
```

2. Any `vindex` or `oindex` discontinuous integer `np.ndarray` indexing for writes in 2D

```python
arr[np.array([0, 5]), :] = ...
arr.oindex[np.array([0, 5]), :] = ...
```

3. `vindex` writes in 2D where both indexers are integer `np.ndarray` indices i.e.,

```python
arr[np.array([...]), np.array([...])] = ...
```

4. Ellipsis indexing. We have tested some, but others fail even with `zarr-python`'s default codec pipeline. Thus for now we advise proceeding with caution here.

```python
arr[0:10, ..., 0:5]
```


Furthermore, using anything except contiguous (i.e., slices or consecutive integer) `np.ndarray` for numeric data will fall back to the default `zarr-python` implementation.
The only supported indexing methods are those based on `slice` and `Ellipsis` — anything else will fall back to the `zarr-python` implementation (unless `codec_pipeline.strict` is enabled, in which case an error is raised).

Please file an issue if you believe we have more holes in our coverage than we are aware of, or if you wish to contribute! For example, we have an [issue in zarrs for integer-array indexing](https://github.com/LDeakin/zarrs/issues/52) that would unblock much of the use of the Rust pipeline for that use-case (very useful for mini-batch training, perhaps!).

Expand Down
5 changes: 2 additions & 3 deletions python/zarrs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from ._internal import __version__
from .pipeline import ZarrsCodecPipeline as _ZarrsCodecPipeline
from .utils import CollapsedDimensionError, DiscontiguousArrayError
from .utils import UnsupportedIndexTypeError


# Need to do this redirection so people can access the pipeline as `zarrs.ZarrsCodecPipeline` instead of `zarrs.pipeline.ZarrsCodecPipeline`
Expand All @@ -10,7 +10,6 @@ class ZarrsCodecPipeline(_ZarrsCodecPipeline):

__all__ = [
"ZarrsCodecPipeline",
"DiscontiguousArrayError",
"CollapsedDimensionError",
"UnsupportedIndexTypeError",
"__version__",
]
12 changes: 3 additions & 9 deletions python/zarrs/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@

from ._internal import CodecPipelineImpl
from .utils import (
CollapsedDimensionError,
DiscontiguousArrayError,
FillValueNoneError,
UnsupportedIndexTypeError,
make_chunk_info_for_rust_with_indices,
)

Expand Down Expand Up @@ -184,10 +182,8 @@ async def read(
)
except (
UnsupportedMetadataError,
DiscontiguousArrayError,
CollapsedDimensionError,
UnsupportedIndexTypeError,
UnsupportedDataTypeError,
FillValueNoneError,
):
if self.python_impl is None:
raise
Expand Down Expand Up @@ -219,10 +215,8 @@ async def write(
)
except (
UnsupportedMetadataError,
DiscontiguousArrayError,
CollapsedDimensionError,
UnsupportedIndexTypeError,
UnsupportedDataTypeError,
FillValueNoneError,
):
if self.python_impl is None:
raise
Expand Down
158 changes: 36 additions & 122 deletions python/zarrs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@
from functools import reduce
from typing import TYPE_CHECKING, Any

import numpy as np
from zarr.core.indexing import is_integer

from zarrs._internal import ChunkItem

if TYPE_CHECKING:
from collections.abc import Iterable
from types import EllipsisType

from zarr.abc.store import ByteGetter, ByteSetter
from zarr.core.array_spec import ArraySpec
Expand All @@ -26,121 +22,29 @@ def get_max_threads() -> int:
return (os.cpu_count() or 1) + 4


class DiscontiguousArrayError(Exception):
pass


class CollapsedDimensionError(Exception):
pass


class FillValueNoneError(Exception):
class UnsupportedIndexTypeError(Exception):
pass


# This is a (mostly) copy of the function from zarr.core.indexing that fixes:
# DeprecationWarning: Conversion of an array with ndim > 0 to a scalar is deprecated
# TODO: Upstream this fix
def make_slice_selection(selection: tuple[np.ndarray | float]) -> list[slice]:
ls: list[slice] = []
for dim_selection in selection:
if is_integer(dim_selection):
ls.append(slice(int(dim_selection), int(dim_selection) + 1, 1))
elif isinstance(dim_selection, np.ndarray):
dim_selection = dim_selection.ravel()
if len(dim_selection) == 1:
ls.append(
slice(int(dim_selection.item()), int(dim_selection.item()) + 1, 1)
)
else:
diff = np.diff(dim_selection)
if (diff != 1).any() and (diff != 0).any():
raise DiscontiguousArrayError(diff)
ls.append(slice(dim_selection[0], dim_selection[-1] + 1, 1))
else:
ls.append(dim_selection)
return ls


def selector_tuple_to_slice_selection(selector_tuple: SelectorTuple) -> list[slice]:
if isinstance(selector_tuple, slice):
return [selector_tuple]
if all(isinstance(s, slice) for s in selector_tuple):
return list(selector_tuple)
return make_slice_selection(selector_tuple)


def resulting_shape_from_index(
array_shape: tuple[int, ...],
index_tuple: tuple[int | slice | EllipsisType | np.ndarray],
drop_axes: tuple[int, ...],
*,
pad: bool,
) -> tuple[int, ...]:
result_shape = []
advanced_index_shapes = [
idx.shape for idx in index_tuple if isinstance(idx, np.ndarray)
]
basic_shape_index = 0

# Broadcast all advanced indices, if any
if advanced_index_shapes:
result_shape += np.broadcast_shapes(*advanced_index_shapes)
# Consume dimensions from array_shape
basic_shape_index += len(advanced_index_shapes)

# Process each remaining index in index_tuple
for idx in index_tuple:
if isinstance(idx, int):
# Integer index reduces dimension, so skip this dimension in array_shape
basic_shape_index += 1
elif isinstance(idx, slice):
if idx.step is not None and idx.step > 1:
raise DiscontiguousArrayError(
"Step size greater than 1 is not supported"
)
# Slice keeps dimension, adjust size accordingly
start, stop, _ = idx.indices(array_shape[basic_shape_index])
result_shape.append(stop - start)
basic_shape_index += 1
elif idx is Ellipsis:
# Calculate number of dimensions that Ellipsis should fill
num_to_fill = len(array_shape) - len(index_tuple) + 1
result_shape += array_shape[
basic_shape_index : basic_shape_index + num_to_fill
]
basic_shape_index += num_to_fill
elif not isinstance(idx, np.ndarray):
raise ValueError(f"Invalid index type: {type(idx)}")

# Step 4: Append remaining dimensions from array_shape if fewer indices were used
if basic_shape_index < len(array_shape) and pad:
result_shape += array_shape[basic_shape_index:]

return tuple(size for idx, size in enumerate(result_shape) if idx not in drop_axes)
if all(
isinstance(s, slice) and (s.step is None or s.step == 1) or isinstance(s, int)
for s in selector_tuple
):
return list(
s if isinstance(s, slice) else slice(s, s + 1) for s in selector_tuple
)
raise UnsupportedIndexTypeError(
f"Invalid index type detected among indexes: {selector_tuple}"
)


def prod_op(x: Iterable[int]) -> int:
    """Return the product of all integers in ``x`` (1 for an empty iterable)."""
    result = 1
    for factor in x:
        result *= factor
    return result


def get_shape_for_selector(
selector_tuple: SelectorTuple,
shape: tuple[int, ...],
*,
pad: bool,
drop_axes: tuple[int, ...] = (),
) -> tuple[int, ...]:
if isinstance(selector_tuple, slice | np.ndarray):
return resulting_shape_from_index(
shape,
(selector_tuple,),
drop_axes,
pad=pad,
)
return resulting_shape_from_index(shape, selector_tuple, drop_axes, pad=pad)


def get_implicit_fill_value(dtype: ZDType, fill_value: Any) -> Any:
if fill_value is None:
fill_value = dtype.default_scalar()
Expand All @@ -160,7 +64,7 @@ def make_chunk_info_for_rust_with_indices(
drop_axes: tuple[int, ...],
shape: tuple[int, ...],
) -> RustChunkInfo:
shape = shape if shape else (1,) # constant array
is_shape_constant = shape == ()
chunk_info_with_indices: list[ChunkItem] = []
write_empty_chunks: bool = True
for (
Expand All @@ -171,28 +75,38 @@ def make_chunk_info_for_rust_with_indices(
_,
) in batch_info:
write_empty_chunks = chunk_spec.config.write_empty_chunks
out_selection_as_slices = selector_tuple_to_slice_selection(out_selection)
chunk_selection_as_slices = selector_tuple_to_slice_selection(chunk_selection)
shape_chunk_selection_slices = get_shape_for_selector(
tuple(chunk_selection_as_slices),
chunk_spec.shape,
pad=True,
drop_axes=drop_axes,
out_selection_as_slices_iter = iter(
selector_tuple_to_slice_selection(out_selection)
)
shape_chunk_selection = get_shape_for_selector(
chunk_selection, chunk_spec.shape, pad=True, drop_axes=drop_axes
out_selection_as_slices_expanded = list(
next(out_selection_as_slices_iter)
if not isinstance(c, int)
else slice(0, 1)
for c in chunk_selection
)
if prod_op(shape_chunk_selection) != prod_op(shape_chunk_selection_slices):
raise CollapsedDimensionError(
f"{shape_chunk_selection} != {shape_chunk_selection_slices}"
)
if is_shape_constant:
io_array_shape = (1,)
else:
shape_iter = iter(shape)
try:
io_array_shape = list(
next(shape_iter) if not isinstance(c, int) else 1
for c in chunk_selection
)
except RuntimeError as e:
if isinstance(e.__cause__, StopIteration):
raise IndexError(
f"the size of the chunk subset {chunk_selection} and input/output subset {shape} are incompatible"
) from None
raise
chunk_info_with_indices.append(
ChunkItem(
key=byte_getter.path,
chunk_subset=chunk_selection_as_slices,
chunk_shape=chunk_spec.shape,
subset=out_selection_as_slices,
shape=shape,
subset=out_selection_as_slices_expanded,
shape=io_array_shape,
)
)
return RustChunkInfo(chunk_info_with_indices, write_empty_chunks)
2 changes: 2 additions & 0 deletions src/chunk_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub(crate) struct ChunkItem {
pub chunk_subset: ArraySubset,
pub subset: ArraySubset,
pub shape: Vec<NonZeroU64>,
pub output_shape: Vec<NonZeroU64>,
pub num_elements: u64,
}

Expand Down Expand Up @@ -65,6 +66,7 @@ impl ChunkItem {
subset,
shape: chunk_shape_nonzero_u64,
num_elements,
output_shape: shape_nonzero_u64
})
}
}
Expand Down
8 changes: 3 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ mod utils;

use crate::concurrency::ChunkConcurrentLimitAndCodecOptions;
use crate::store::StoreConfig;
use crate::utils::{PyCodecErrExt, PyErrExt as _, PyUntypedArrayExt as _};
use crate::utils::{PyCodecErrExt, PyErrExt as _};

// TODO: Use a OnceLock for store with get_or_try_init when stabilised?
#[gen_stub_pyclass]
Expand Down Expand Up @@ -288,7 +288,6 @@ impl CodecPipelineImpl {
) -> PyResult<()> {
// Get input array
let output = Self::nparray_to_unsafe_cell_slice(value)?;
let output_shape: Vec<u64> = value.shape_zarr()?;

// Adjust the concurrency based on the codec chain and the first chunk description
let Some((chunk_concurrent_limit, codec_options)) =
Expand Down Expand Up @@ -343,7 +342,7 @@ impl CodecPipelineImpl {
.fixed_size()
.ok_or("variable length data type not supported")
.map_py_err::<PyTypeError>()?,
&output_shape,
bytemuck::must_cast_slice::<_, u64>(&item.output_shape),
item.subset.clone(),
)
.map_py_err::<PyRuntimeError>()?
Expand Down Expand Up @@ -410,7 +409,6 @@ impl CodecPipelineImpl {
} else {
InputValue::Constant(FillValue::new(input_slice.to_vec()))
};
let input_shape: Vec<u64> = value.shape_zarr()?;

// Adjust the concurrency based on the codec chain and the first chunk description
let Some((chunk_concurrent_limit, mut codec_options)) =
Expand All @@ -424,7 +422,7 @@ impl CodecPipelineImpl {
let store_chunk = |item: ChunkItem| match &input {
InputValue::Array(input) => {
let chunk_subset_bytes = input
.extract_array_subset(&item.subset, &input_shape, &self.data_type)
.extract_array_subset(&item.subset, bytemuck::must_cast_slice::<_, u64>(&item.output_shape), &self.data_type)
.map_codec_err()?;
self.store_chunk_subset_bytes(
&item,
Expand Down
Loading
Loading