Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion src/zarr/abc/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,16 @@

from zarr.core.buffer import Buffer, BufferPrototype

__all__ = ["ByteGetter", "ByteSetter", "Store", "set_or_delete"]
__all__ = [
"ByteGetter",
"ByteSetter",
"Store",
"SupportsDeleteSync",
"SupportsGetSync",
"SupportsSetSync",
"SupportsSyncStore",
"set_or_delete",
]


@dataclass(frozen=True, slots=True)
Expand Down Expand Up @@ -700,6 +709,31 @@ async def delete(self) -> None: ...
async def set_if_not_exists(self, default: Buffer) -> None: ...


@runtime_checkable
class SupportsGetSync(Protocol):
def get_sync(
self,
key: str,
*,
prototype: BufferPrototype | None = None,
byte_range: ByteRequest | None = None,
) -> Buffer | None: ...


@runtime_checkable
class SupportsSetSync(Protocol):
def set_sync(self, key: str, value: Buffer) -> None: ...


@runtime_checkable
class SupportsDeleteSync(Protocol):
def delete_sync(self, key: str) -> None: ...


@runtime_checkable
class SupportsSyncStore(SupportsGetSync, SupportsSetSync, SupportsDeleteSync, Protocol): ...


async def set_or_delete(byte_setter: ByteSetter, value: Buffer | None) -> None:
"""Set or delete a value in a byte setter

Expand Down
39 changes: 38 additions & 1 deletion src/zarr/storage/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, Self, TypeAlias

from zarr.abc.store import ByteRequest, Store
from zarr.abc.store import (
ByteRequest,
Store,
SupportsDeleteSync,
SupportsGetSync,
SupportsSetSync,
)
from zarr.core.buffer import Buffer, default_buffer_prototype
from zarr.core.common import (
ANY_ACCESS_MODE,
Expand Down Expand Up @@ -228,6 +234,37 @@ async def is_empty(self) -> bool:
"""
return await self.store.is_empty(self.path)

# -------------------------------------------------------------------
# Synchronous IO delegation
# -------------------------------------------------------------------

def get_sync(
self,
*,
prototype: BufferPrototype | None = None,
byte_range: ByteRequest | None = None,
) -> Buffer | None:
"""Synchronous read — delegates to ``self.store.get_sync(self.path, ...)``."""
if not isinstance(self.store, SupportsGetSync):
raise TypeError(f"Store {type(self.store).__name__} does not support synchronous get.")
if prototype is None:
prototype = default_buffer_prototype()
return self.store.get_sync(self.path, prototype=prototype, byte_range=byte_range)

def set_sync(self, value: Buffer) -> None:
"""Synchronous write — delegates to ``self.store.set_sync(self.path, value)``."""
if not isinstance(self.store, SupportsSetSync):
raise TypeError(f"Store {type(self.store).__name__} does not support synchronous set.")
self.store.set_sync(self.path, value)

def delete_sync(self) -> None:
"""Synchronous delete — delegates to ``self.store.delete_sync(self.path)``."""
if not isinstance(self.store, SupportsDeleteSync):
raise TypeError(
f"Store {type(self.store).__name__} does not support synchronous delete."
)
self.store.delete_sync(self.path)

def __truediv__(self, other: str) -> StorePath:
"""Combine this store path with another path"""
return self.__class__(self.store, _dereference_path(self.path, other))
Expand Down
50 changes: 50 additions & 0 deletions src/zarr/storage/_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,56 @@ def __repr__(self) -> str:
def __eq__(self, other: object) -> bool:
return isinstance(other, type(self)) and self.root == other.root

# -------------------------------------------------------------------
# Synchronous store methods
# -------------------------------------------------------------------

def _ensure_open_sync(self) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. in our current Store design, creating an instance of a store doesn't necessarily "open" it, so we have an async open method that actually opens the store. Our async get/set methods guard against the store being un-open:

if not self._is_open:
await self._open()
, and our sync methods have to do the same thing here.

if not self._is_open:
if not self.read_only:
self.root.mkdir(parents=True, exist_ok=True)
if not self.root.exists():
raise FileNotFoundError(f"{self.root} does not exist")
self._is_open = True

def get_sync(
self,
key: str,
*,
prototype: BufferPrototype | None = None,
byte_range: ByteRequest | None = None,
) -> Buffer | None:
if prototype is None:
prototype = default_buffer_prototype()
self._ensure_open_sync()
assert isinstance(key, str)
path = self.root / key
try:
return _get(path, prototype, byte_range)
except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
return None

def set_sync(self, key: str, value: Buffer) -> None:
self._ensure_open_sync()
self._check_writable()
assert isinstance(key, str)
if not isinstance(value, Buffer):
raise TypeError(
f"LocalStore.set(): `value` must be a Buffer instance. "
f"Got an instance of {type(value)} instead."
)
path = self.root / key
_put(path, value)

def delete_sync(self, key: str) -> None:
self._ensure_open_sync()
self._check_writable()
path = self.root / key
if path.is_dir():
shutil.rmtree(path)
else:
path.unlink(missing_ok=True)

async def get(
self,
key: str,
Expand Down
44 changes: 43 additions & 1 deletion src/zarr/storage/_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,49 @@ def __eq__(self, other: object) -> bool:
and self.read_only == other.read_only
)

# -------------------------------------------------------------------
# Synchronous store methods
# -------------------------------------------------------------------

def get_sync(
self,
key: str,
*,
prototype: BufferPrototype | None = None,
byte_range: ByteRequest | None = None,
) -> Buffer | None:
if prototype is None:
prototype = default_buffer_prototype()
if not self._is_open:
self._is_open = True
assert isinstance(key, str)
try:
value = self._store_dict[key]
start, stop = _normalize_byte_range_index(value, byte_range)
return prototype.buffer.from_buffer(value[start:stop])
except KeyError:
return None

def set_sync(self, key: str, value: Buffer) -> None:
self._check_writable()
if not self._is_open:
self._is_open = True
assert isinstance(key, str)
if not isinstance(value, Buffer):
raise TypeError(
f"MemoryStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
)
self._store_dict[key] = value

def delete_sync(self, key: str) -> None:
self._check_writable()
if not self._is_open:
self._is_open = True
try:
del self._store_dict[key]
except KeyError:
logger.debug("Key %s does not exist.", key)

async def get(
self,
key: str,
Expand Down Expand Up @@ -122,7 +165,6 @@ async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None
raise TypeError(
f"MemoryStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
)

if byte_range is not None:
buf = self._store_dict[key]
buf[byte_range[0] : byte_range[1]] = value
Expand Down
71 changes: 70 additions & 1 deletion src/zarr/testing/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
if TYPE_CHECKING:
from typing import Any

from zarr.abc.store import ByteRequest
from zarr.core.buffer.core import BufferPrototype

import pytest
Expand All @@ -22,6 +21,9 @@
RangeByteRequest,
Store,
SuffixByteRequest,
SupportsDeleteSync,
SupportsGetSync,
SupportsSetSync,
)
from zarr.core.buffer import Buffer, default_buffer_prototype
from zarr.core.sync import _collect_aiterator, sync
Expand All @@ -39,6 +41,27 @@ class StoreTests(Generic[S, B]):
store_cls: type[S]
buffer_cls: type[B]

@staticmethod
def _require_get_sync(store: S) -> SupportsGetSync:
"""Skip unless *store* implements :class:`SupportsGetSync`."""
if not isinstance(store, SupportsGetSync):
pytest.skip("store does not implement SupportsGetSync")
return store # type: ignore[unreachable]

@staticmethod
def _require_set_sync(store: S) -> SupportsSetSync:
"""Skip unless *store* implements :class:`SupportsSetSync`."""
if not isinstance(store, SupportsSetSync):
pytest.skip("store does not implement SupportsSetSync")
return store # type: ignore[unreachable]

@staticmethod
def _require_delete_sync(store: S) -> SupportsDeleteSync:
"""Skip unless *store* implements :class:`SupportsDeleteSync`."""
if not isinstance(store, SupportsDeleteSync):
pytest.skip("store does not implement SupportsDeleteSync")
return store # type: ignore[unreachable]

@abstractmethod
async def set(self, store: S, key: str, value: Buffer) -> None:
"""
Expand Down Expand Up @@ -579,6 +602,52 @@ def test_get_json_sync(self, store: S) -> None:
sync(self.set(store, key, self.buffer_cls.from_bytes(data_bytes)))
assert store._get_json_sync(key, prototype=default_buffer_prototype()) == data

# -------------------------------------------------------------------
# Synchronous store methods (SupportsSyncStore protocol)
# -------------------------------------------------------------------

def test_get_sync(self, store: S) -> None:
getter = self._require_get_sync(store)
data_buf = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04")
key = "sync_get"
sync(self.set(store, key, data_buf))
result = getter.get_sync(key)
assert result is not None
assert_bytes_equal(result, data_buf)

def test_get_sync_missing(self, store: S) -> None:
getter = self._require_get_sync(store)
result = getter.get_sync("nonexistent")
assert result is None

def test_set_sync(self, store: S) -> None:
setter = self._require_set_sync(store)
data_buf = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04")
key = "sync_set"
setter.set_sync(key, data_buf)
result = sync(self.get(store, key))
assert_bytes_equal(result, data_buf)

def test_delete_sync(self, store: S) -> None:
setter = self._require_set_sync(store)
deleter = self._require_delete_sync(store)
getter = self._require_get_sync(store)
if not store.supports_deletes:
pytest.skip("store does not support deletes")
data_buf = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04")
key = "sync_delete"
setter.set_sync(key, data_buf)
deleter.delete_sync(key)
result = getter.get_sync(key)
assert result is None

def test_delete_sync_missing(self, store: S) -> None:
deleter = self._require_delete_sync(store)
if not store.supports_deletes:
pytest.skip("store does not support deletes")
# should not raise
deleter.delete_sync("nonexistent_sync")


class LatencyStore(WrapperStore[Store]):
"""
Expand Down
29 changes: 27 additions & 2 deletions tests/test_codecs/test_blosc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
from packaging.version import Version

import zarr
from zarr.abc.codec import SupportsSyncCodec
from zarr.codecs import BloscCodec
from zarr.codecs.blosc import BloscShuffle, Shuffle
from zarr.core.array_spec import ArraySpec
from zarr.core.array_spec import ArrayConfig, ArraySpec
from zarr.core.buffer import default_buffer_prototype
from zarr.core.dtype import UInt16
from zarr.core.dtype import UInt16, get_data_type_from_native_dtype
from zarr.storage import MemoryStore, StorePath


Expand Down Expand Up @@ -110,3 +111,27 @@ async def test_typesize() -> None:
else:
expected_size = 10216
assert size == expected_size, msg


def test_blosc_codec_supports_sync() -> None:
assert isinstance(BloscCodec(), SupportsSyncCodec)


def test_blosc_codec_sync_roundtrip() -> None:
codec = BloscCodec(typesize=8)
arr = np.arange(100, dtype="float64")
zdtype = get_data_type_from_native_dtype(arr.dtype)
spec = ArraySpec(
shape=arr.shape,
dtype=zdtype,
fill_value=zdtype.cast_scalar(0),
config=ArrayConfig(order="C", write_empty_chunks=True),
prototype=default_buffer_prototype(),
)
buf = default_buffer_prototype().buffer.from_array_like(arr.view("B"))

encoded = codec._encode_sync(buf, spec)
assert encoded is not None
decoded = codec._decode_sync(encoded, spec)
result = np.frombuffer(decoded.as_numpy_array(), dtype="float64")
np.testing.assert_array_equal(arr, result)
33 changes: 33 additions & 0 deletions tests/test_codecs/test_crc32c.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from __future__ import annotations

import numpy as np

from zarr.abc.codec import SupportsSyncCodec
from zarr.codecs.crc32c_ import Crc32cCodec
from zarr.core.array_spec import ArrayConfig, ArraySpec
from zarr.core.buffer import default_buffer_prototype
from zarr.core.dtype import get_data_type_from_native_dtype


def test_crc32c_codec_supports_sync() -> None:
assert isinstance(Crc32cCodec(), SupportsSyncCodec)


def test_crc32c_codec_sync_roundtrip() -> None:
codec = Crc32cCodec()
arr = np.arange(100, dtype="float64")
zdtype = get_data_type_from_native_dtype(arr.dtype)
spec = ArraySpec(
shape=arr.shape,
dtype=zdtype,
fill_value=zdtype.cast_scalar(0),
config=ArrayConfig(order="C", write_empty_chunks=True),
prototype=default_buffer_prototype(),
)
buf = default_buffer_prototype().buffer.from_array_like(arr.view("B"))

encoded = codec._encode_sync(buf, spec)
assert encoded is not None
decoded = codec._decode_sync(encoded, spec)
result = np.frombuffer(decoded.as_numpy_array(), dtype="float64")
np.testing.assert_array_equal(arr, result)
Loading
Loading