From 79a6ad788630055320a8e09f92699c9b253a9749 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Mon, 26 Jan 2026 17:39:20 +0100 Subject: [PATCH] lift array methods to separate functions --- src/zarr/core/array.py | 951 ++++++++++++++++++++++++++++++++--------- 1 file changed, 748 insertions(+), 203 deletions(-) diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 00536a1ec0..0e68288c68 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -1337,13 +1337,7 @@ async def example(): result = asyncio.run(example()) ``` """ - if self.shards is None: - chunks_per_shard = 1 - else: - chunks_per_shard = product( - tuple(a // b for a, b in zip(self.shards, self.chunks, strict=True)) - ) - return (await self._nshards_initialized()) * chunks_per_shard + return await _nchunks_initialized(self) async def _nshards_initialized(self) -> int: """ @@ -1381,10 +1375,10 @@ async def example(): result = asyncio.run(example()) ``` """ - return len(await _shards_initialized(self)) + return await _nshards_initialized(self) async def nbytes_stored(self) -> int: - return await self.store_path.store.getsize_prefix(self.store_path.path) + return await _nbytes_stored(self.store_path) def _iter_chunk_coords( self, *, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None @@ -1549,49 +1543,16 @@ async def _get_selection( out: NDBuffer | None = None, fields: Fields | None = None, ) -> NDArrayLikeOrScalar: - # check fields are sensible - out_dtype = check_fields(fields, self.dtype) - - # setup output buffer - if out is not None: - if isinstance(out, NDBuffer): - out_buffer = out - else: - raise TypeError(f"out argument needs to be an NDBuffer. Got {type(out)!r}") - if out_buffer.shape != indexer.shape: - raise ValueError( - f"shape of out argument doesn't match. 
Expected {indexer.shape}, got {out.shape}" - ) - else: - out_buffer = prototype.nd_buffer.empty( - shape=indexer.shape, - dtype=out_dtype, - order=self.order, - ) - if product(indexer.shape) > 0: - # need to use the order from the metadata for v2 - _config = self._config - if self.metadata.zarr_format == 2: - _config = replace(_config, order=self.order) - - # reading chunks and decoding them - await self.codec_pipeline.read( - [ - ( - self.store_path / self.metadata.encode_chunk_key(chunk_coords), - self.metadata.get_chunk_spec(chunk_coords, _config, prototype=prototype), - chunk_selection, - out_selection, - is_complete_chunk, - ) - for chunk_coords, chunk_selection, out_selection, is_complete_chunk in indexer - ], - out_buffer, - drop_axes=indexer.drop_axes, - ) - if isinstance(indexer, BasicIndexer) and indexer.shape == (): - return out_buffer.as_scalar() - return out_buffer.as_ndarray_like() + return await _get_selection( + self.store_path, + self.metadata, + self.codec_pipeline, + self._config, + indexer, + prototype=prototype, + out=out, + fields=fields, + ) async def getitem( self, @@ -1636,14 +1597,14 @@ async def example(): value = asyncio.run(example()) ``` """ - if prototype is None: - prototype = default_buffer_prototype() - indexer = BasicIndexer( + return await _getitem( + self.store_path, + self.metadata, + self.codec_pipeline, + self._config, selection, - shape=self.metadata.shape, - chunk_grid=self.metadata.chunk_grid, + prototype=prototype, ) - return await self._get_selection(indexer, prototype=prototype) async def get_orthogonal_selection( self, @@ -1653,11 +1614,15 @@ async def get_orthogonal_selection( fields: Fields | None = None, prototype: BufferPrototype | None = None, ) -> NDArrayLikeOrScalar: - if prototype is None: - prototype = default_buffer_prototype() - indexer = OrthogonalIndexer(selection, self.shape, self.metadata.chunk_grid) - return await self._get_selection( - indexer=indexer, out=out, fields=fields, prototype=prototype + return await _get_orthogonal_selection( + self.store_path, + self.metadata, + self.codec_pipeline, + self._config, + selection, + out=out, + fields=fields, + prototype=prototype, ) async def get_mask_selection( @@ -1668,11 +1633,15 @@ async def get_mask_selection( fields: Fields | None = None, prototype: BufferPrototype | None = None, ) -> NDArrayLikeOrScalar: - if prototype is None: - prototype = default_buffer_prototype() - indexer = MaskIndexer(mask, self.shape, self.metadata.chunk_grid) - return await self._get_selection( - indexer=indexer, out=out, fields=fields, prototype=prototype + return await _get_mask_selection( + self.store_path, + self.metadata, + self.codec_pipeline, + self._config, + mask, + out=out, + fields=fields, + prototype=prototype, ) async def get_coordinate_selection( @@ -1683,18 +1652,17 @@ async def get_coordinate_selection( fields: Fields | None = None, prototype: BufferPrototype | None = None, ) -> NDArrayLikeOrScalar: - if prototype is None: - prototype = default_buffer_prototype() - indexer = CoordinateIndexer(selection, self.shape, self.metadata.chunk_grid) - out_array = await self._get_selection( - indexer=indexer, out=out, fields=fields, prototype=prototype + return await _get_coordinate_selection( + self.store_path, + self.metadata, + self.codec_pipeline, + self._config, + selection, + out=out, + fields=fields, + prototype=prototype, ) - if hasattr(out_array, "shape"): - # restore shape - out_array = np.array(out_array).reshape(indexer.sel_shape) - return out_array - async def _save_metadata(self, 
metadata: ArrayMetadata, ensure_parents: bool = False) -> None: """ Asynchronously save the array metadata. @@ -1709,56 +1677,15 @@ async def _set_selection( prototype: BufferPrototype, fields: Fields | None = None, ) -> None: - # check fields are sensible - check_fields(fields, self.dtype) - fields = check_no_multi_fields(fields) - - # check value shape - if np.isscalar(value): - array_like = prototype.buffer.create_zero_length().as_array_like() - if isinstance(array_like, np._typing._SupportsArrayFunc): - # TODO: need to handle array types that don't support __array_function__ - # like PyTorch and JAX - array_like_ = cast("np._typing._SupportsArrayFunc", array_like) - value = np.asanyarray(value, dtype=self.dtype, like=array_like_) - else: - if not hasattr(value, "shape"): - value = np.asarray(value, self.dtype) - # assert ( - # value.shape == indexer.shape - # ), f"shape of value doesn't match indexer shape. Expected {indexer.shape}, got {value.shape}" - if not hasattr(value, "dtype") or value.dtype.name != self.dtype.name: - if hasattr(value, "astype"): - # Handle things that are already NDArrayLike more efficiently - value = value.astype(dtype=self.dtype, order="A") - else: - value = np.array(value, dtype=self.dtype, order="A") - value = cast("NDArrayLike", value) - - # We accept any ndarray like object from the user and convert it - # to an NDBuffer (or subclass). From this point onwards, we only pass - # Buffer and NDBuffer between components. - value_buffer = prototype.nd_buffer.from_ndarray_like(value) - - # need to use the order from the metadata for v2 - _config = self._config - if self.metadata.zarr_format == 2: - _config = replace(_config, order=self.metadata.order) - - # merging with existing data and encoding chunks - await self.codec_pipeline.write( - [ - ( - self.store_path / self.metadata.encode_chunk_key(chunk_coords), - self.metadata.get_chunk_spec(chunk_coords, _config, prototype), - chunk_selection, - out_selection, - is_complete_chunk, - ) - for chunk_coords, chunk_selection, out_selection, is_complete_chunk in indexer - ], - value_buffer, - drop_axes=indexer.drop_axes, + return await _set_selection( + self.store_path, + self.metadata, + self.codec_pipeline, + self._config, + indexer, + value, + prototype=prototype, + fields=fields, ) async def setitem( @@ -1800,14 +1727,15 @@ async def setitem( - This method is asynchronous and should be awaited. - Supports basic indexing, where the selection is contiguous and does not involve advanced indexing. """ - if prototype is None: - prototype = default_buffer_prototype() - indexer = BasicIndexer( + return await _setitem( + self.store_path, + self.metadata, + self.codec_pipeline, + self._config, selection, - shape=self.metadata.shape, - chunk_grid=self.metadata.chunk_grid, + value, + prototype=prototype, ) - return await self._set_selection(indexer, value, prototype=prototype) @property def oindex(self) -> AsyncOIndex[T_ArrayMetadata]: @@ -1849,32 +1777,7 @@ async def resize(self, new_shape: ShapeLike, delete_outside_chunks: bool = True) ----- - This method is asynchronous and should be awaited. 
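+        - Chunks that fall outside the new shape are deleted from the store by
+          default; pass ``delete_outside_chunks=False`` to leave them in place.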
""" - new_shape = parse_shapelike(new_shape) - assert len(new_shape) == len(self.metadata.shape) - new_metadata = self.metadata.update_shape(new_shape) - - if delete_outside_chunks: - # Remove all chunks outside of the new shape - old_chunk_coords = set(self.metadata.chunk_grid.all_chunk_coords(self.metadata.shape)) - new_chunk_coords = set(self.metadata.chunk_grid.all_chunk_coords(new_shape)) - - async def _delete_key(key: str) -> None: - await (self.store_path / key).delete() - - await concurrent_map( - [ - (self.metadata.encode_chunk_key(chunk_coords),) - for chunk_coords in old_chunk_coords.difference(new_chunk_coords) - ], - _delete_key, - zarr_config.get("async.concurrency"), - ) - - # Write new metadata - await self._save_metadata(new_metadata) - - # Update metadata (in place) - object.__setattr__(self, "metadata", new_metadata) + return await _resize(self, new_shape, delete_outside_chunks) async def append(self, data: npt.ArrayLike, axis: int = 0) -> tuple[int, ...]: """Append `data` to `axis`. @@ -1895,40 +1798,7 @@ async def append(self, data: npt.ArrayLike, axis: int = 0) -> tuple[int, ...]: The size of all dimensions other than `axis` must match between this array and `data`. """ - # ensure data is array-like - if not hasattr(data, "shape"): - data = np.asanyarray(data) - - self_shape_preserved = tuple(s for i, s in enumerate(self.shape) if i != axis) - data_shape_preserved = tuple(s for i, s in enumerate(data.shape) if i != axis) - if self_shape_preserved != data_shape_preserved: - raise ValueError( - f"shape of data to append is not compatible with the array. " - f"The shape of the data is ({data_shape_preserved})" - f"and the shape of the array is ({self_shape_preserved})." - "All dimensions must match except for the dimension being " - "appended." - ) - # remember old shape - old_shape = self.shape - - # determine new shape - new_shape = tuple( - self.shape[i] if i != axis else self.shape[i] + data.shape[i] - for i in range(len(self.shape)) - ) - - # resize - await self.resize(new_shape) - - # store data - append_selection = tuple( - slice(None) if i != axis else slice(old_shape[i], new_shape[i]) - for i in range(len(self.shape)) - ) - await self.setitem(append_selection, data) - - return new_shape + return await _append(self, data, axis) async def update_attributes(self, new_attributes: dict[str, JSON]) -> Self: """ @@ -1956,11 +1826,7 @@ async def update_attributes(self, new_attributes: dict[str, JSON]) -> Self: - The updated attributes will be merged with existing attributes, and any conflicts will be overwritten by the new values. """ - self.metadata.attributes.update(new_attributes) - - # Write new metadata - await self._save_metadata(self.metadata) - + await _update_attributes(self, new_attributes) return self def __repr__(self) -> str: @@ -2017,10 +1883,7 @@ async def info_complete(self) -> Any: ------- [zarr.AsyncArray.info][] - A property giving just the statically known information about an array. 
""" - return self._info( - await self._nshards_initialized(), - await self.store_path.store.getsize_prefix(self.store_path.path), - ) + return await _info_complete(self) def _info( self, count_chunks_initialized: int | None = None, count_bytes_stored: int | None = None @@ -5518,3 +5381,685 @@ def _iter_chunk_regions( return _iter_regions( array.shape, array.chunks, origin=origin, selection_shape=selection_shape, trim_excess=True ) + + +async def _nchunks_initialized( + array: AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata], +) -> int: + """ + Calculate the number of chunks that have been initialized in storage. + + This value is calculated as the product of the number of initialized shards and the number + of chunks per shard. For arrays that do not use sharding, the number of chunks per shard is + effectively 1, and in that case the number of chunks initialized is the same as the number + of stored objects associated with an array. + + Parameters + ---------- + array : AsyncArray + The array to inspect. + + Returns + ------- + nchunks_initialized : int + The number of chunks that have been initialized. + """ + if array.shards is None: + chunks_per_shard = 1 + else: + chunks_per_shard = product( + tuple(a // b for a, b in zip(array.shards, array.chunks, strict=True)) + ) + return (await _nshards_initialized(array)) * chunks_per_shard + + +async def _nshards_initialized( + array: AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata], +) -> int: + """ + Calculate the number of shards that have been initialized in storage. + + This is the number of shards that have been persisted to the storage backend. + + Parameters + ---------- + array : AsyncArray + The array to inspect. + + Returns + ------- + nshards_initialized : int + The number of shards that have been initialized. + """ + return len(await _shards_initialized(array)) + + +async def _nbytes_stored( + store_path: StorePath, +) -> int: + """ + Calculate the number of bytes stored for an array. + + Parameters + ---------- + store_path : StorePath + The store path of the array. + + Returns + ------- + nbytes_stored : int + The number of bytes stored. + """ + return await store_path.store.getsize_prefix(store_path.path) + + +async def _get_selection( + store_path: StorePath, + metadata: ArrayMetadata, + codec_pipeline: CodecPipeline, + config: ArrayConfig, + indexer: Indexer, + *, + prototype: BufferPrototype, + out: NDBuffer | None = None, + fields: Fields | None = None, +) -> NDArrayLikeOrScalar: + """ + Get a selection from an array. + + Parameters + ---------- + store_path : StorePath + The store path of the array. + metadata : ArrayMetadata + The array metadata. + codec_pipeline : CodecPipeline + The codec pipeline for encoding/decoding. + config : ArrayConfig + The array configuration. + indexer : Indexer + The indexer specifying the selection. + prototype : BufferPrototype + A buffer prototype to use for the retrieved data. + out : NDBuffer | None, optional + An output buffer to write the data to. + fields : Fields | None, optional + Fields to select from structured arrays. + + Returns + ------- + NDArrayLikeOrScalar + The selected data. 
+ """ + # Get dtype from metadata + if metadata.zarr_format == 2: + zdtype = metadata.dtype + else: + zdtype = metadata.data_type + dtype = zdtype.to_native_dtype() + + # Determine memory order + if metadata.zarr_format == 2: + order = metadata.order + else: + order = config.order + + # check fields are sensible + out_dtype = check_fields(fields, dtype) + + # setup output buffer + if out is not None: + if isinstance(out, NDBuffer): + out_buffer = out + else: + raise TypeError(f"out argument needs to be an NDBuffer. Got {type(out)!r}") + if out_buffer.shape != indexer.shape: + raise ValueError( + f"shape of out argument doesn't match. Expected {indexer.shape}, got {out.shape}" + ) + else: + out_buffer = prototype.nd_buffer.empty( + shape=indexer.shape, + dtype=out_dtype, + order=order, + ) + if product(indexer.shape) > 0: + # need to use the order from the metadata for v2 + _config = config + if metadata.zarr_format == 2: + _config = replace(_config, order=order) + + # reading chunks and decoding them + await codec_pipeline.read( + [ + ( + store_path / metadata.encode_chunk_key(chunk_coords), + metadata.get_chunk_spec(chunk_coords, _config, prototype=prototype), + chunk_selection, + out_selection, + is_complete_chunk, + ) + for chunk_coords, chunk_selection, out_selection, is_complete_chunk in indexer + ], + out_buffer, + drop_axes=indexer.drop_axes, + ) + if isinstance(indexer, BasicIndexer) and indexer.shape == (): + return out_buffer.as_scalar() + return out_buffer.as_ndarray_like() + + +async def _getitem( + store_path: StorePath, + metadata: ArrayMetadata, + codec_pipeline: CodecPipeline, + config: ArrayConfig, + selection: BasicSelection, + *, + prototype: BufferPrototype | None = None, +) -> NDArrayLikeOrScalar: + """ + Retrieve a subset of the array's data based on the provided selection. + + Parameters + ---------- + store_path : StorePath + The store path of the array. + metadata : ArrayMetadata + The array metadata. + codec_pipeline : CodecPipeline + The codec pipeline for encoding/decoding. + config : ArrayConfig + The array configuration. + selection : BasicSelection + A selection object specifying the subset of data to retrieve. + prototype : BufferPrototype, optional + A buffer prototype to use for the retrieved data (default is None). + + Returns + ------- + NDArrayLikeOrScalar + The retrieved subset of the array's data. + """ + if prototype is None: + prototype = default_buffer_prototype() + indexer = BasicIndexer( + selection, + shape=metadata.shape, + chunk_grid=metadata.chunk_grid, + ) + return await _get_selection( + store_path, metadata, codec_pipeline, config, indexer, prototype=prototype + ) + + +async def _get_orthogonal_selection( + store_path: StorePath, + metadata: ArrayMetadata, + codec_pipeline: CodecPipeline, + config: ArrayConfig, + selection: OrthogonalSelection, + *, + out: NDBuffer | None = None, + fields: Fields | None = None, + prototype: BufferPrototype | None = None, +) -> NDArrayLikeOrScalar: + """ + Get an orthogonal selection from the array. + + Parameters + ---------- + store_path : StorePath + The store path of the array. + metadata : ArrayMetadata + The array metadata. + codec_pipeline : CodecPipeline + The codec pipeline for encoding/decoding. + config : ArrayConfig + The array configuration. + selection : OrthogonalSelection + The orthogonal selection specification. + out : NDBuffer | None, optional + An output buffer to write the data to. + fields : Fields | None, optional + Fields to select from structured arrays. 
+ prototype : BufferPrototype | None, optional + A buffer prototype to use for the retrieved data. + + Returns + ------- + NDArrayLikeOrScalar + The selected data. + """ + if prototype is None: + prototype = default_buffer_prototype() + indexer = OrthogonalIndexer(selection, metadata.shape, metadata.chunk_grid) + return await _get_selection( + store_path, + metadata, + codec_pipeline, + config, + indexer=indexer, + out=out, + fields=fields, + prototype=prototype, + ) + + +async def _get_mask_selection( + store_path: StorePath, + metadata: ArrayMetadata, + codec_pipeline: CodecPipeline, + config: ArrayConfig, + mask: MaskSelection, + *, + out: NDBuffer | None = None, + fields: Fields | None = None, + prototype: BufferPrototype | None = None, +) -> NDArrayLikeOrScalar: + """ + Get a mask selection from the array. + + Parameters + ---------- + store_path : StorePath + The store path of the array. + metadata : ArrayMetadata + The array metadata. + codec_pipeline : CodecPipeline + The codec pipeline for encoding/decoding. + config : ArrayConfig + The array configuration. + mask : MaskSelection + The boolean mask specifying the selection. + out : NDBuffer | None, optional + An output buffer to write the data to. + fields : Fields | None, optional + Fields to select from structured arrays. + prototype : BufferPrototype | None, optional + A buffer prototype to use for the retrieved data. + + Returns + ------- + NDArrayLikeOrScalar + The selected data. + """ + if prototype is None: + prototype = default_buffer_prototype() + indexer = MaskIndexer(mask, metadata.shape, metadata.chunk_grid) + return await _get_selection( + store_path, + metadata, + codec_pipeline, + config, + indexer=indexer, + out=out, + fields=fields, + prototype=prototype, + ) + + +async def _get_coordinate_selection( + store_path: StorePath, + metadata: ArrayMetadata, + codec_pipeline: CodecPipeline, + config: ArrayConfig, + selection: CoordinateSelection, + *, + out: NDBuffer | None = None, + fields: Fields | None = None, + prototype: BufferPrototype | None = None, +) -> NDArrayLikeOrScalar: + """ + Get a coordinate selection from the array. + + Parameters + ---------- + store_path : StorePath + The store path of the array. + metadata : ArrayMetadata + The array metadata. + codec_pipeline : CodecPipeline + The codec pipeline for encoding/decoding. + config : ArrayConfig + The array configuration. + selection : CoordinateSelection + The coordinate selection specification. + out : NDBuffer | None, optional + An output buffer to write the data to. + fields : Fields | None, optional + Fields to select from structured arrays. + prototype : BufferPrototype | None, optional + A buffer prototype to use for the retrieved data. + + Returns + ------- + NDArrayLikeOrScalar + The selected data. + """ + if prototype is None: + prototype = default_buffer_prototype() + indexer = CoordinateIndexer(selection, metadata.shape, metadata.chunk_grid) + out_array = await _get_selection( + store_path, + metadata, + codec_pipeline, + config, + indexer=indexer, + out=out, + fields=fields, + prototype=prototype, + ) + + if hasattr(out_array, "shape"): + # restore shape + out_array = np.array(out_array).reshape(indexer.sel_shape) + return out_array + + +async def _set_selection( + store_path: StorePath, + metadata: ArrayMetadata, + codec_pipeline: CodecPipeline, + config: ArrayConfig, + indexer: Indexer, + value: npt.ArrayLike, + *, + prototype: BufferPrototype, + fields: Fields | None = None, +) -> None: + """ + Set a selection in an array. 
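+
+    The indexer maps ``value`` onto chunk-aligned regions of the store; the
+    affected chunks are read, merged with the incoming data, and re-encoded
+    through the codec pipeline before being written back.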
+ + Parameters + ---------- + store_path : StorePath + The store path of the array. + metadata : ArrayMetadata + The array metadata. + codec_pipeline : CodecPipeline + The codec pipeline for encoding/decoding. + config : ArrayConfig + The array configuration. + indexer : Indexer + The indexer specifying the selection. + value : npt.ArrayLike + The values to write. + prototype : BufferPrototype + A buffer prototype to use. + fields : Fields | None, optional + Fields to select from structured arrays. + """ + # Get dtype from metadata + if metadata.zarr_format == 2: + zdtype = metadata.dtype + else: + zdtype = metadata.data_type + dtype = zdtype.to_native_dtype() + + # check fields are sensible + check_fields(fields, dtype) + fields = check_no_multi_fields(fields) + + # check value shape + if np.isscalar(value): + array_like = prototype.buffer.create_zero_length().as_array_like() + if isinstance(array_like, np._typing._SupportsArrayFunc): + # TODO: need to handle array types that don't support __array_function__ + # like PyTorch and JAX + array_like_ = cast("np._typing._SupportsArrayFunc", array_like) + value = np.asanyarray(value, dtype=dtype, like=array_like_) + else: + if not hasattr(value, "shape"): + value = np.asarray(value, dtype) + # assert ( + # value.shape == indexer.shape + # ), f"shape of value doesn't match indexer shape. Expected {indexer.shape}, got {value.shape}" + if not hasattr(value, "dtype") or value.dtype.name != dtype.name: + if hasattr(value, "astype"): + # Handle things that are already NDArrayLike more efficiently + value = value.astype(dtype=dtype, order="A") + else: + value = np.array(value, dtype=dtype, order="A") + value = cast("NDArrayLike", value) + + # We accept any ndarray like object from the user and convert it + # to an NDBuffer (or subclass). From this point onwards, we only pass + # Buffer and NDBuffer between components. + value_buffer = prototype.nd_buffer.from_ndarray_like(value) + + # Determine memory order + if metadata.zarr_format == 2: + order = metadata.order + else: + order = config.order + + # need to use the order from the metadata for v2 + _config = config + if metadata.zarr_format == 2: + _config = replace(_config, order=order) + + # merging with existing data and encoding chunks + await codec_pipeline.write( + [ + ( + store_path / metadata.encode_chunk_key(chunk_coords), + metadata.get_chunk_spec(chunk_coords, _config, prototype), + chunk_selection, + out_selection, + is_complete_chunk, + ) + for chunk_coords, chunk_selection, out_selection, is_complete_chunk in indexer + ], + value_buffer, + drop_axes=indexer.drop_axes, + ) + + +async def _setitem( + store_path: StorePath, + metadata: ArrayMetadata, + codec_pipeline: CodecPipeline, + config: ArrayConfig, + selection: BasicSelection, + value: npt.ArrayLike, + prototype: BufferPrototype | None = None, +) -> None: + """ + Set values in the array using basic indexing. + + Parameters + ---------- + store_path : StorePath + The store path of the array. + metadata : ArrayMetadata + The array metadata. + codec_pipeline : CodecPipeline + The codec pipeline for encoding/decoding. + config : ArrayConfig + The array configuration. + selection : BasicSelection + The selection defining the region of the array to set. + value : npt.ArrayLike + The values to be written into the selected region of the array. + prototype : BufferPrototype or None, optional + A prototype buffer that defines the structure and properties of the array chunks being modified. + If None, the default buffer prototype is used. 
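+
+    Examples
+    --------
+    A minimal sketch, assuming an in-memory array; ``AsyncArray.setitem`` wraps
+    this function and is the usual entry point:
+
+    ```python
+    import asyncio
+
+    import numpy as np
+
+    from zarr.api.asynchronous import create
+    from zarr.core.array import _setitem
+    from zarr.storage import MemoryStore
+
+    async def example():
+        arr = await create(store=MemoryStore(), shape=(4,), chunks=(2,), dtype="int32")
+        await _setitem(
+            arr.store_path,
+            arr.metadata,
+            arr.codec_pipeline,
+            arr._config,
+            slice(None),
+            np.arange(4, dtype="int32"),
+        )
+
+    asyncio.run(example())
+    ```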
+    """
+    if prototype is None:
+        prototype = default_buffer_prototype()
+    indexer = BasicIndexer(
+        selection,
+        shape=metadata.shape,
+        chunk_grid=metadata.chunk_grid,
+    )
+    return await _set_selection(
+        store_path, metadata, codec_pipeline, config, indexer, value, prototype=prototype
+    )
+
+
+async def _resize(
+    array: AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata],
+    new_shape: ShapeLike,
+    delete_outside_chunks: bool = True,
+) -> None:
+    """
+    Resize an array to a new shape.
+
+    Parameters
+    ----------
+    array : AsyncArray
+        The array to resize.
+    new_shape : ShapeLike
+        The desired new shape of the array.
+    delete_outside_chunks : bool, optional
+        If True (default), chunks that fall outside the new shape will be deleted.
+        If False, the data in those chunks will be preserved.
+    """
+    new_shape = parse_shapelike(new_shape)
+    assert len(new_shape) == len(array.metadata.shape)
+    new_metadata = array.metadata.update_shape(new_shape)
+
+    if delete_outside_chunks:
+        # Remove all chunks outside of the new shape
+        old_chunk_coords = set(array.metadata.chunk_grid.all_chunk_coords(array.metadata.shape))
+        new_chunk_coords = set(array.metadata.chunk_grid.all_chunk_coords(new_shape))
+
+        async def _delete_key(key: str) -> None:
+            await (array.store_path / key).delete()
+
+        await concurrent_map(
+            [
+                (array.metadata.encode_chunk_key(chunk_coords),)
+                for chunk_coords in old_chunk_coords.difference(new_chunk_coords)
+            ],
+            _delete_key,
+            zarr_config.get("async.concurrency"),
+        )
+
+    # Write new metadata
+    await save_metadata(array.store_path, new_metadata)
+
+    # Update metadata (in place)
+    object.__setattr__(array, "metadata", new_metadata)
+
+
+async def _append(
+    array: AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata],
+    data: npt.ArrayLike,
+    axis: int = 0,
+) -> tuple[int, ...]:
+    """
+    Append data to an array along the specified axis.
+
+    Parameters
+    ----------
+    array : AsyncArray
+        The array to append to.
+    data : npt.ArrayLike
+        Data to be appended.
+    axis : int
+        Axis along which to append.
+
+    Returns
+    -------
+    new_shape : tuple[int, ...]
+        The new shape of the array after appending.
+
+    Notes
+    -----
+    The size of all dimensions other than `axis` must match between the
+    array and `data`.
+    """
+    # ensure data is array-like
+    if not hasattr(data, "shape"):
+        data = np.asanyarray(data)
+
+    self_shape_preserved = tuple(s for i, s in enumerate(array.shape) if i != axis)
+    data_shape_preserved = tuple(s for i, s in enumerate(data.shape) if i != axis)
+    if self_shape_preserved != data_shape_preserved:
+        raise ValueError(
+            f"shape of data to append is not compatible with the array. "
+            f"The shape of the data is ({data_shape_preserved}) "
+            f"and the shape of the array is ({self_shape_preserved}). "
+            "All dimensions must match except for the dimension being "
+            "appended."
+ ) + # remember old shape + old_shape = array.shape + + # determine new shape + new_shape = tuple( + array.shape[i] if i != axis else array.shape[i] + data.shape[i] + for i in range(len(array.shape)) + ) + + # resize + await _resize(array, new_shape) + + # store data + append_selection = tuple( + slice(None) if i != axis else slice(old_shape[i], new_shape[i]) + for i in range(len(array.shape)) + ) + await _setitem( + array.store_path, + array.metadata, + array.codec_pipeline, + array._config, + append_selection, + data, + ) + + return new_shape + + +async def _update_attributes( + array: AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata], + new_attributes: dict[str, JSON], +) -> AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata]: + """ + Update the array's attributes. + + Parameters + ---------- + array : AsyncArray + The array whose attributes to update. + new_attributes : dict[str, JSON] + A dictionary of new attributes to update or add to the array. + + Returns + ------- + AsyncArray + The array with the updated attributes. + """ + array.metadata.attributes.update(new_attributes) + + # Write new metadata + await save_metadata(array.store_path, array.metadata) + + return array + + +async def _info_complete( + array: AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata], +) -> Any: + """ + Return all the information for an array, including dynamic information like storage size. + + Parameters + ---------- + array : AsyncArray + The array to get info for. + + Returns + ------- + ArrayInfo + Complete information about the array including: + - The count of chunks initialized + - The sum of the bytes written + """ + return array._info( + await _nshards_initialized(array), + await array.store_path.store.getsize_prefix(array.store_path.path), + )