2 changes: 1 addition & 1 deletion src/funlib/persistence/__init__.py
@@ -1,4 +1,4 @@
from .arrays import Array, open_ds, prepare_ds, open_ome_ds, prepare_ome_ds # noqa

__version__ = "0.7.0"
__version__ = "0.7.1"
__version_info__ = tuple(int(i) for i in __version__.split("."))
18 changes: 15 additions & 3 deletions src/funlib/persistence/arrays/array.py
@@ -74,6 +74,16 @@ class Array(Freezable):
offset, voxel_size, axis_names, units, and types. Metadata
can either be passed in or read from array attributes.

scheduler (``Optional[str]``):

The scheduler to use for dask operations. If not provided, a
single-threaded scheduler is used. This is the default because
`Array` objects are regularly used in code that is already
multi-threaded, where dask's default threaded scheduler can lead
to race conditions and deadlocks.
See https://docs.dask.org/en/stable/scheduler-overview.html for
more details on dask schedulers.

"""

data: da.Array
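For context, a minimal usage sketch of the new `scheduler` argument. This is an illustrative example, not part of the diff; the metadata keyword arguments shown are assumed from the class docstring above.

import numpy as np
from funlib.persistence import Array

# Default: computations run on dask's single-threaded ("synchronous") scheduler,
# which avoids deadlocks when the surrounding code is already multi-threaded.
safe = Array(np.zeros((10, 10), dtype=np.float32), voxel_size=(1, 1))

# Opt back in to dask's threaded scheduler when the caller is single-threaded.
threaded = Array(
    np.zeros((10, 10), dtype=np.float32), voxel_size=(1, 1), scheduler="threads"
)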
@@ -89,7 +99,9 @@ def __init__(
chunks: Optional[Union[int, Sequence[int], str]] = "auto",
lazy_op: Optional[LazyOp] = None,
strict_metadata: bool = False,
scheduler: Optional[str] = "single-threaded",
):
self.scheduler = scheduler
if not isinstance(data, da.Array):
self.data = da.from_array(data, chunks=chunks)
else:
@@ -329,7 +341,7 @@ def __getitem__(self, key) -> np.ndarray:
% (roi, self.roi)
)

return self.data[self.__slices(roi, use_lazy_slices=False)].compute()
return self.data[self.__slices(roi, use_lazy_slices=False)].compute(scheduler=self.scheduler)

elif isinstance(key, Coordinate):
coordinate = key
@@ -338,10 +350,10 @@
raise IndexError("Requested coordinate is not contained in this array.")

index = self.__index(coordinate)
return self.data[index].compute()
return self.data[index].compute(scheduler=self.scheduler)

else:
return self.data[key].compute()
return self.data[key].compute(scheduler=self.scheduler)

def __setitem__(self, key: Roi | slice | tuple, value: np.ndarray | float | int):
"""Set the data of this array.
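All three `__getitem__` branches above now pass the configured scheduler to `compute`. A short read sketch, assuming `arr` is the `Array` from the previous example and that `Roi` and `Coordinate` come from `funlib.geometry`:

from funlib.geometry import Coordinate, Roi

roi_block = arr[Roi((0, 0), (4, 4))]  # numpy array for a world-space ROI
value = arr[Coordinate((2, 2))]       # single value at a world coordinate
corner = arr[0:2, 0:2]                # plain slicing also computes eagerly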
26 changes: 23 additions & 3 deletions src/funlib/persistence/arrays/datasets.py
@@ -233,13 +233,15 @@ def prepare_ds(

if existing_array is not None:
data_compatible = True
inconsistencies = []

# data incompatibilities
if shape != existing_array.shape:
logger.info(
"Shapes differ: given (%s) vs parsed (%s)", shape, existing_array.shape
)
data_compatible = False
inconsistencies.append(("Shape", shape, existing_array.shape))

if (
chunk_shape is not None
@@ -251,12 +253,16 @@
existing_array._source_data.chunks,
)
data_compatible = False
inconsistencies.append(
("Chunk shape", chunk_shape, existing_array._source_data.chunks)
)

if dtype != existing_array.dtype:
logger.info(
"dtypes differ: given (%s) vs parsed (%s)", dtype, existing_array.dtype
)
data_compatible = False
inconsistencies.append(("dtype", dtype, existing_array.dtype))

metadata_compatible = True
existing_metadata = metadata_format.parse(
@@ -272,6 +278,9 @@
existing_metadata.voxel_size,
)
metadata_compatible = False
inconsistencies.append(
("Voxel size", given_metadata.voxel_size, existing_metadata.voxel_size)
)

if given_metadata.types != existing_metadata.types:
logger.info(
@@ -280,6 +289,9 @@
existing_metadata.types,
)
metadata_compatible = False
inconsistencies.append(
("Types", given_metadata.types, existing_metadata.types)
)

if given_metadata.axis_names != existing_metadata.axis_names:
logger.info(
@@ -288,6 +300,9 @@
existing_metadata.axis_names,
)
metadata_compatible = False
inconsistencies.append(
("Axis names", given_metadata.axis_names, existing_metadata.axis_names)
)

if given_metadata.units != existing_metadata.units:
logger.info(
@@ -296,24 +311,29 @@
existing_metadata.units,
)
metadata_compatible = False
inconsistencies.append(
("Units", given_metadata.units, existing_metadata.units)
)

if not data_compatible:
logger.info(
"Existing dataset is not compatible, attempting to create a new one"
)
if mode != "w":
raise PermissionError(
"Existing dataset is not compatible, but mode is not 'w'."
f"Existing dataset ({store}) is not compatible, but mode is not 'w'.\n"
'\n'.join(f"{name}: (given) {given} vs (parsed) {parsed}" for name, given, parsed in inconsistencies)
)
elif not metadata_compatible:
if mode == "r":
raise PermissionError(
"Existing metadata is not compatible, but mode is 'r' and the metadata can't be udpated."
"Existing metadata is not compatible, but mode is 'r' and the metadata can't be udpated.\n"
'\n'.join(f"{name}: (given) {given} vs (parsed) {parsed}" for name, given, parsed in inconsistencies)
)
else:
if mode == "w":
logger.info(
"Existing dataset is compatible, but mode is 'w' and thus the existing dataset will be deleted"
"Existing dataset is compatible, but mode is 'w' and thus the existing dataset will be deleted."
)
else:
ds = zarr.open(store, mode=mode, **kwargs)
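To illustrate the added error detail, a hypothetical reproduction. Parameter names follow the variables used in this function; the exact `prepare_ds` signature and defaults may differ.

import numpy as np
from funlib.persistence import prepare_ds

prepare_ds("/tmp/example.zarr", shape=(100, 100), dtype=np.uint8, mode="w")
try:
    # Incompatible shape with a mode other than "w": the error now lists each inconsistency.
    prepare_ds("/tmp/example.zarr", shape=(200, 200), dtype=np.uint8, mode="r+")
except PermissionError as err:
    print(err)  # includes a line like "Shape: (given) (200, 200) vs (parsed) (100, 100)"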