diff --git a/src/funlib/persistence/__init__.py b/src/funlib/persistence/__init__.py
index b893c3c..850f579 100644
--- a/src/funlib/persistence/__init__.py
+++ b/src/funlib/persistence/__init__.py
@@ -1,4 +1,4 @@
 from .arrays import Array, open_ds, prepare_ds, open_ome_ds, prepare_ome_ds  # noqa
 
-__version__ = "0.7.0"
+__version__ = "0.7.1"
 __version_info__ = tuple(int(i) for i in __version__.split("."))
diff --git a/src/funlib/persistence/arrays/array.py b/src/funlib/persistence/arrays/array.py
index 9acf3f5..b679830 100644
--- a/src/funlib/persistence/arrays/array.py
+++ b/src/funlib/persistence/arrays/array.py
@@ -74,6 +74,16 @@ class Array(Freezable):
             offset, voxel_size, axis_names, units, and types. Metadata can
             either be passed in or read from array attributes.
 
+        scheduler (``Optional[str]``):
+
+            The scheduler to use for dask operations. If not provided, a
+            single-threaded scheduler is used. This is the default because
+            `Array` objects are regularly used in contexts that are already
+            multi-threaded, where dask's default scheduler can lead to race
+            conditions and deadlocks.
+            See https://docs.dask.org/en/stable/scheduler-overview.html for
+            more details on dask schedulers.
+
     """
 
     data: da.Array
@@ -89,7 +99,9 @@ def __init__(
         chunks: Optional[Union[int, Sequence[int], str]] = "auto",
         lazy_op: Optional[LazyOp] = None,
         strict_metadata: bool = False,
+        scheduler: Optional[str] = "single-threaded",
     ):
+        self.scheduler = scheduler
         if not isinstance(data, da.Array):
             self.data = da.from_array(data, chunks=chunks)
         else:
@@ -329,7 +341,7 @@ def __getitem__(self, key) -> np.ndarray:
                     % (roi, self.roi)
                 )
 
-            return self.data[self.__slices(roi, use_lazy_slices=False)].compute()
+            return self.data[self.__slices(roi, use_lazy_slices=False)].compute(scheduler=self.scheduler)
 
         elif isinstance(key, Coordinate):
             coordinate = key
@@ -338,10 +350,10 @@
                 raise IndexError("Requested coordinate is not contained in this array.")
 
             index = self.__index(coordinate)
-            return self.data[index].compute()
+            return self.data[index].compute(scheduler=self.scheduler)
 
         else:
-            return self.data[key].compute()
+            return self.data[key].compute(scheduler=self.scheduler)
 
     def __setitem__(self, key: Roi | slice | tuple, value: np.ndarray | float | int):
         """Set the data of this array.
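For reference, a minimal usage sketch of the new `scheduler` parameter (the other constructor arguments are part of the existing `Array` API; the data and values below are invented for illustration):

```python
import numpy as np
from funlib.persistence import Array

# Wrap an in-memory array. Passing scheduler="threads" opts back into dask's
# threaded scheduler for code that is not already multi-threaded; omitting it
# keeps the "single-threaded" default introduced by this patch.
data = np.arange(16, dtype=np.uint8).reshape(4, 4)
array = Array(data, offset=(0, 0), voxel_size=(1, 1), scheduler="threads")

# __getitem__ forwards the configured scheduler to dask's compute().
block = array[array.roi]  # materializes the full ROI as a numpy array
print(block.shape)
```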
diff --git a/src/funlib/persistence/arrays/datasets.py b/src/funlib/persistence/arrays/datasets.py
index 9df0c24..57eb1f3 100644
--- a/src/funlib/persistence/arrays/datasets.py
+++ b/src/funlib/persistence/arrays/datasets.py
@@ -233,6 +233,7 @@ def prepare_ds(
 
     if existing_array is not None:
         data_compatible = True
+        inconsistencies = []
 
         # data incompatibilities
         if shape != existing_array.shape:
@@ -240,6 +241,7 @@ def prepare_ds(
                 "Shapes differ: given (%s) vs parsed (%s)", shape, existing_array.shape
             )
             data_compatible = False
+            inconsistencies.append(("Shape", shape, existing_array.shape))
 
         if (
             chunk_shape is not None
@@ -251,12 +253,16 @@ def prepare_ds(
                 existing_array._source_data.chunks,
             )
             data_compatible = False
+            inconsistencies.append(
+                ("Chunk shape", chunk_shape, existing_array._source_data.chunks)
+            )
 
         if dtype != existing_array.dtype:
             logger.info(
                 "dtypes differ: given (%s) vs parsed (%s)", dtype, existing_array.dtype
             )
             data_compatible = False
+            inconsistencies.append(("dtype", dtype, existing_array.dtype))
 
         metadata_compatible = True
         existing_metadata = metadata_format.parse(
@@ -272,6 +278,9 @@ def prepare_ds(
                 existing_metadata.voxel_size,
             )
             metadata_compatible = False
+            inconsistencies.append(
+                ("Voxel size", given_metadata.voxel_size, existing_metadata.voxel_size)
+            )
 
         if given_metadata.types != existing_metadata.types:
             logger.info(
@@ -280,6 +289,9 @@ def prepare_ds(
                 existing_metadata.types,
             )
             metadata_compatible = False
+            inconsistencies.append(
+                ("Types", given_metadata.types, existing_metadata.types)
+            )
 
         if given_metadata.axis_names != existing_metadata.axis_names:
             logger.info(
@@ -288,6 +300,9 @@ def prepare_ds(
                 existing_metadata.axis_names,
            )
             metadata_compatible = False
+            inconsistencies.append(
+                ("Axis names", given_metadata.axis_names, existing_metadata.axis_names)
+            )
 
         if given_metadata.units != existing_metadata.units:
             logger.info(
@@ -296,6 +311,9 @@ def prepare_ds(
                 existing_metadata.units,
             )
             metadata_compatible = False
+            inconsistencies.append(
+                ("Units", given_metadata.units, existing_metadata.units)
+            )
 
         if not data_compatible:
             logger.info(
@@ -303,17 +321,19 @@ def prepare_ds(
             )
             if mode != "w":
                 raise PermissionError(
-                    "Existing dataset is not compatible, but mode is not 'w'."
+                    f"Existing dataset ({store}) is not compatible, but mode is not 'w'.\n"
+                    + '\n'.join(f"{name}: (given) {given} vs (parsed) {parsed}" for name, given, parsed in inconsistencies)
                 )
         elif not metadata_compatible:
             if mode == "r":
                 raise PermissionError(
-                    "Existing metadata is not compatible, but mode is 'r' and the metadata can't be udpated."
+                    "Existing metadata is not compatible, but mode is 'r' and the metadata can't be updated.\n"
+                    + '\n'.join(f"{name}: (given) {given} vs (parsed) {parsed}" for name, given, parsed in inconsistencies)
                 )
         else:
             if mode == "w":
                 logger.info(
-                    "Existing dataset is compatible, but mode is 'w' and thus the existing dataset will be deleted"
+                    "Existing dataset is compatible, but mode is 'w' and thus the existing dataset will be deleted."
                 )
             else:
                 ds = zarr.open(store, mode=mode, **kwargs)
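Similarly, a hedged sketch of how the richer `prepare_ds` error reporting surfaces to a caller (the store path and array parameters are invented for illustration; all other arguments are left at their defaults):

```python
import numpy as np
from funlib.persistence import prepare_ds

# Create a small dataset, then try to reopen it with a conflicting dtype.
prepare_ds("example.zarr/raw", shape=(10, 10), dtype=np.uint8, mode="w")

try:
    # mode is not "w", so the dtype mismatch cannot be resolved by
    # overwriting and prepare_ds raises.
    prepare_ds("example.zarr/raw", shape=(10, 10), dtype=np.float32, mode="r+")
except PermissionError as e:
    # With this patch the message names the store and lists every detected
    # inconsistency (e.g. a "dtype: (given) ... vs (parsed) ..." line)
    # instead of only stating that the existing dataset is not compatible.
    print(e)
```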