diff --git a/src/datajoint/codecs.py b/src/datajoint/codecs.py index 2719e9509..c68e33413 100644 --- a/src/datajoint/codecs.py +++ b/src/datajoint/codecs.py @@ -38,6 +38,7 @@ class MyTable(dj.Manual): from __future__ import annotations +import json import logging from abc import ABC, abstractmethod from typing import Any @@ -219,6 +220,45 @@ def validate(self, value: Any) -> None: """ pass + def referenced_paths(self, stored: Any) -> list[tuple[str, str | None]]: + """ + Return the external store paths this stored value references. + + Garbage collection and delete-time cleanup call this on the *stored* + (encoded) representation of a column value — the JSON/dict already in + the database — so discovery stays metadata-only (no download or decode). + + The default recognizes DataJoint's standard external-storage metadata: + a dict (or JSON string) carrying a ``path`` and optional ``store``. This + covers hash-addressed (````/````/````) and + schema-addressed (````/```` and any :class:`SchemaCodec` + subclass) storage with no extra work, and returns an empty list for + values that are not externally stored (e.g. in-table ````). + + Override only if your codec stores external references in a non-standard + shape. Returning the paths here is what lets garbage collection see a + custom codec's files as *referenced* rather than orphaned (see #1469). + + Parameters + ---------- + stored : any + The stored (encoded) value as read from the database column. + + Returns + ------- + list[tuple[str, str | None]] + ``(path, store_name)`` for each external artifact referenced. + """ + value = stored + if isinstance(value, str): + try: + value = json.loads(value) + except (json.JSONDecodeError, TypeError, ValueError): + return [] + if isinstance(value, dict) and "path" in value: + return [(value["path"], value.get("store"))] + return [] + def __repr__(self) -> str: return f"<{self.__class__.__name__}(name={self.name!r})>" diff --git a/src/datajoint/gc.py b/src/datajoint/gc.py index 8c87efd84..80b862e93 100644 --- a/src/datajoint/gc.py +++ b/src/datajoint/gc.py @@ -108,8 +108,12 @@ def _uses_schema_storage(attr) -> bool: if not attr.codec: return False - codec_name = getattr(attr.codec, "name", "") - return codec_name in ("object", "npy") + # Recognize schema-addressed storage by type, not by a hardcoded codec name, + # so custom SchemaCodec subclasses (e.g. a user's NetCDF codec) are seen by + # GC and their live files are not misclassified as orphans (#1469). + from .builtin_codecs.schema import SchemaCodec + + return isinstance(attr.codec, SchemaCodec) def _extract_hash_refs(value: Any) -> list[tuple[str, str | None]]: @@ -231,12 +235,13 @@ def scan_hash_references( # Read raw JSON metadata via cursor — bypasses decode_attribute # so we get the stored dict (PostgreSQL/JSONB) or JSON string - # (MySQL), not the decoded codec output. _extract_hash_refs - # handles both shapes. + # (MySQL), not the decoded codec output. The codec's own + # referenced_paths() extracts the referenced paths and handles + # both shapes (codec-driven discovery, #1469). try: cursor = table.proj(attr_name).cursor(as_dict=True) for row in cursor: - for path, ref_store in _extract_hash_refs(row[attr_name]): + for path, ref_store in attr.codec.referenced_paths(row[attr_name]): # Filter by store if specified if store_name is None or ref_store == store_name: referenced.add(path) @@ -296,12 +301,13 @@ def scan_schema_references( # Read raw JSON metadata via cursor — bypasses decode_attribute # so we get the stored dict (PostgreSQL/JSONB) or JSON string - # (MySQL), not the decoded codec output. _extract_schema_refs - # handles both shapes. + # (MySQL), not the decoded codec output. The codec's own + # referenced_paths() extracts the referenced paths and handles + # both shapes (codec-driven discovery, #1469). try: cursor = table.proj(attr_name).cursor(as_dict=True) for row in cursor: - for path, ref_store in _extract_schema_refs(row[attr_name]): + for path, ref_store in attr.codec.referenced_paths(row[attr_name]): # Filter by store if specified if store_name is None or ref_store == store_name: referenced.add(path) @@ -379,10 +385,14 @@ def list_stored_hashes(store_name: str | None = None, config=None) -> dict[str, def list_schema_paths(store_name: str | None = None, config=None) -> dict[str, int]: """ - List all schema-addressed items in storage. + List all schema-addressed object files in storage. - Scans for directories matching the schema-addressed storage pattern: - ``{schema}/{table}/{pk}/{field}/`` + Enumerates the individual object **files** written by schema-addressed + codecs, whose paths follow ``{schema}/{table}/{pk}/{field}_{token}[.ext]``. + Returning file paths (rather than the enclosing directory) is what lets + them match the file paths recorded in each row's stored metadata — the two + must be comparable for orphan detection, and per-token granularity lets + superseded versions be reclaimed while the current token is retained. Parameters ---------- @@ -394,13 +404,13 @@ def list_schema_paths(store_name: str | None = None, config=None) -> dict[str, i Returns ------- dict[str, int] - Dict mapping storage path to size in bytes. + Dict mapping each object file's relative path to its size in bytes. """ backend = get_store_backend(store_name, config=config) stored: dict[str, int] = {} try: - # Walk the storage looking for schema-addressed paths + # Walk the storage collecting schema-addressed object files full_prefix = backend._full_path("") for root, dirs, files in backend.fs.walk(full_prefix): @@ -408,26 +418,22 @@ def list_schema_paths(store_name: str | None = None, config=None) -> dict[str, i if "_hash" in root: continue - # Look for schema-addressed pattern (has files, not in _hash) - # Schema-addressed paths: {schema}/{table}/{pk}/{field}/ - relative_path = root.replace(full_prefix, "").lstrip("/") + for filename in files: + # Skip manifest sidecars (mirrors list_stored_hashes) + if filename.endswith(".manifest.json"): + continue - # Skip empty paths and root-level directories - if not relative_path or relative_path.count("/") < 2: - continue + file_path = f"{root}/{filename}" + relative_path = file_path.replace(full_prefix, "").lstrip("/") + + # Schema-addressed files live at least at {schema}/{table}/.../{file} + if relative_path.count("/") < 2: + continue - # Calculate total size of this directory - total_size = 0 - for file in files: try: - file_path = f"{root}/{file}" - total_size += backend.fs.size(file_path) + stored[relative_path] = backend.fs.size(file_path) except Exception: - pass - - # Only count directories with files (actual objects) - if total_size > 0 or files: - stored[relative_path] = total_size + stored[relative_path] = 0 except FileNotFoundError: pass @@ -439,12 +445,17 @@ def list_schema_paths(store_name: str | None = None, config=None) -> dict[str, i def delete_schema_path(path: str, store_name: str | None = None, config=None) -> bool: """ - Delete a schema-addressed directory from storage. + Delete a schema-addressed object file from storage. + + ``path`` is an individual object file (as enumerated by + :func:`list_schema_paths`), not a directory — so only the orphaned token + file is removed, leaving other versions/fields under the same primary-key + directory intact. Empty parent directories are pruned best-effort. Parameters ---------- path : str - Storage path (relative to store root). + Object file path (relative to store root). store_name : str, optional Store name (None = default store). config : Config, optional @@ -460,12 +471,25 @@ def delete_schema_path(path: str, store_name: str | None = None, config=None) -> try: full_path = backend._full_path(path) if backend.fs.exists(full_path): - # Remove entire directory tree - backend.fs.rm(full_path, recursive=True) - logger.debug(f"Deleted schema path: {path}") + # Remove just this object file (not the whole PK directory) + backend.fs.rm(full_path) + logger.debug(f"Deleted schema object: {path}") + + # Best-effort: prune now-empty parent directories up to the store root. + root = backend._full_path("").rstrip("/") + parent = full_path.rsplit("/", 1)[0] + while parent and parent != root and parent.startswith(root): + try: + if backend.fs.ls(parent): + break # not empty + backend.fs.rmdir(parent) + except Exception: + break + parent = parent.rsplit("/", 1)[0] + return True except Exception as e: - logger.warning(f"Error deleting schema path {path}: {e}") + logger.warning(f"Error deleting schema object {path}: {e}") return False diff --git a/tests/integration/test_gc.py b/tests/integration/test_gc.py index c9ea741bd..e0d22f86a 100644 --- a/tests/integration/test_gc.py +++ b/tests/integration/test_gc.py @@ -9,9 +9,21 @@ import datajoint as dj from datajoint import gc +from datajoint.builtin_codecs.object import ObjectCodec +from datajoint.codecs import get_codec from datajoint.errors import DataJointError +class GcCustomObjectCodec(ObjectCodec): + """A custom schema-addressed codec — a ``SchemaCodec`` subclass with a + non-built-in name, mirroring the user's NetCDF codec in #1469. Registered + at import via ``Codec.__init_subclass__``. GC must recognize it by type + (not by hardcoded name) and treat its files as referenced, not orphaned. + """ + + name = "gc_custom_object" + + # Tables used by TestScanWithLiveData. Defined at module scope so dj.Schema's # context resolution can find them by class name; bound to a schema inside # each fixture (see schema(...) calls below). @@ -41,6 +53,14 @@ class GcObjectTest(dj.Manual): """ +class GcCustomCodecTest(dj.Manual): + definition = """ + rid : int + --- + payload : + """ + + class TestUsesHashStorage: """Tests for _uses_hash_storage helper function.""" @@ -133,24 +153,30 @@ def test_returns_false_for_no_adapter(self): def test_returns_true_for_object_type(self): """Test that True is returned for type.""" attr = MagicMock() - attr.codec = MagicMock() - attr.codec.name = "object" + attr.codec = get_codec("object") assert gc._uses_schema_storage(attr) is True def test_returns_true_for_npy_type(self): """Test that True is returned for type.""" attr = MagicMock() - attr.codec = MagicMock() - attr.codec.name = "npy" + attr.codec = get_codec("npy") + + assert gc._uses_schema_storage(attr) is True + + def test_returns_true_for_custom_schema_subclass(self): + """Recognition is by type, not name: a custom SchemaCodec subclass + (here, a subclass of ObjectCodec) must be seen as schema-addressed so + GC does not misclassify its live files as orphans (#1469).""" + attr = MagicMock() + attr.codec = get_codec("gc_custom_object") assert gc._uses_schema_storage(attr) is True def test_returns_false_for_other_types(self): """Test that False is returned for non-schema-addressed types.""" attr = MagicMock() - attr.codec = MagicMock() - attr.codec.name = "blob" + attr.codec = get_codec("blob") assert gc._uses_schema_storage(attr) is False @@ -465,3 +491,57 @@ def test_scan_finds_active_object_reference(self, schema_object): stats = gc.scan(schema_object, store_name="local") assert stats["schema_paths_referenced"] >= 1, f"scan should find the active reference; got {stats}" + + @pytest.fixture + def schema_custom(self, connection_test, prefix, mock_stores): + schema_name = f"{prefix}_test_gc_e2e_custom" + schema = dj.Schema( + schema_name, + context={"GcCustomCodecTest": GcCustomCodecTest}, + connection=connection_test, + ) + schema(GcCustomCodecTest) + yield schema + schema.drop() + + def test_custom_codec_reference_not_orphaned(self, schema_custom): + """#1469: a live custom SchemaCodec value must be recognized as + referenced and its file path must NOT be flagged as an orphan. Before + the fix, _uses_schema_storage keyed on the hardcoded names object/npy, + so this codec was never scanned and its live file was reported orphaned. + + Asserts on the specific live path (not global counts) so it is robust to + other tests sharing the same ``local`` store. + """ + GcCustomCodecTest.insert1({"rid": 1, "payload": b"live-payload"}) + + refs = gc.scan_schema_references(schema_custom, store_name="local") + assert refs, "custom codec's live reference must be discovered (#1469)" + live_path = next(iter(refs)) + + stats = gc.scan(schema_custom, store_name="local") + assert live_path not in stats["orphaned_paths"], f"live custom-codec file wrongly flagged orphan: {live_path}" + + def test_custom_codec_survives_collect(self, schema_custom): + """#1469 end-to-end data-loss guard: collect() must delete only the + deleted row's file and keep the surviving row's file (checked by exact + path, robust to a shared store).""" + GcCustomCodecTest.insert1({"rid": 1, "payload": b"row-one"}) + GcCustomCodecTest.insert1({"rid": 2, "payload": b"row-two"}) + + refs_all = gc.scan_schema_references(schema_custom, store_name="local") + assert len(refs_all) == 2, f"both live rows should be referenced; got {refs_all}" + + # Delete one row — its file becomes a genuine orphan (delete-then-GC). + (GcCustomCodecTest & {"rid": 1}).delete(prompt=False) + + refs_live = gc.scan_schema_references(schema_custom, store_name="local") + assert len(refs_live) == 1, f"one live row should remain referenced; got {refs_live}" + live_path = next(iter(refs_live)) + deleted_paths = refs_all - refs_live + + gc.collect(schema_custom, store_name="local", dry_run=False) + + stored_after = set(gc.list_schema_paths("local", config=schema_custom.connection._config)) + assert live_path in stored_after, f"collect() deleted the live custom-codec file {live_path} (#1469)" + assert not (deleted_paths & stored_after), f"deleted row's orphan not reclaimed: {deleted_paths & stored_after}"