Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/datajoint/codecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class MyTable(dj.Manual):

from __future__ import annotations

import json
import logging
from abc import ABC, abstractmethod
from typing import Any
Expand Down Expand Up @@ -219,6 +220,45 @@ def validate(self, value: Any) -> None:
"""
pass

def referenced_paths(self, stored: Any) -> list[tuple[str, str | None]]:
"""
Return the external store paths this stored value references.

Garbage collection and delete-time cleanup call this on the *stored*
(encoded) representation of a column value — the JSON/dict already in
the database — so discovery stays metadata-only (no download or decode).

The default recognizes DataJoint's standard external-storage metadata:
a dict (or JSON string) carrying a ``path`` and optional ``store``. This
covers hash-addressed (``<hash@>``/``<blob@>``/``<attach@>``) and
schema-addressed (``<object@>``/``<npy@>`` and any :class:`SchemaCodec`
subclass) storage with no extra work, and returns an empty list for
values that are not externally stored (e.g. in-table ``<blob>``).

Override only if your codec stores external references in a non-standard
shape. Returning the paths here is what lets garbage collection see a
custom codec's files as *referenced* rather than orphaned (see #1469).

Parameters
----------
stored : any
The stored (encoded) value as read from the database column.

Returns
-------
list[tuple[str, str | None]]
``(path, store_name)`` for each external artifact referenced.
"""
value = stored
if isinstance(value, str):
try:
value = json.loads(value)
except (json.JSONDecodeError, TypeError, ValueError):
return []
if isinstance(value, dict) and "path" in value:
return [(value["path"], value.get("store"))]
return []

def __repr__(self) -> str:
return f"<{self.__class__.__name__}(name={self.name!r})>"

Expand Down
94 changes: 59 additions & 35 deletions src/datajoint/gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,12 @@ def _uses_schema_storage(attr) -> bool:
if not attr.codec:
return False

codec_name = getattr(attr.codec, "name", "")
return codec_name in ("object", "npy")
# Recognize schema-addressed storage by type, not by a hardcoded codec name,
# so custom SchemaCodec subclasses (e.g. a user's NetCDF codec) are seen by
# GC and their live files are not misclassified as orphans (#1469).
from .builtin_codecs.schema import SchemaCodec

return isinstance(attr.codec, SchemaCodec)


def _extract_hash_refs(value: Any) -> list[tuple[str, str | None]]:
Expand Down Expand Up @@ -231,12 +235,13 @@ def scan_hash_references(

# Read raw JSON metadata via cursor — bypasses decode_attribute
# so we get the stored dict (PostgreSQL/JSONB) or JSON string
# (MySQL), not the decoded codec output. _extract_hash_refs
# handles both shapes.
# (MySQL), not the decoded codec output. The codec's own
# referenced_paths() extracts the referenced paths and handles
# both shapes (codec-driven discovery, #1469).
try:
cursor = table.proj(attr_name).cursor(as_dict=True)
for row in cursor:
for path, ref_store in _extract_hash_refs(row[attr_name]):
for path, ref_store in attr.codec.referenced_paths(row[attr_name]):
# Filter by store if specified
if store_name is None or ref_store == store_name:
referenced.add(path)
Expand Down Expand Up @@ -296,12 +301,13 @@ def scan_schema_references(

# Read raw JSON metadata via cursor — bypasses decode_attribute
# so we get the stored dict (PostgreSQL/JSONB) or JSON string
# (MySQL), not the decoded codec output. _extract_schema_refs
# handles both shapes.
# (MySQL), not the decoded codec output. The codec's own
# referenced_paths() extracts the referenced paths and handles
# both shapes (codec-driven discovery, #1469).
try:
cursor = table.proj(attr_name).cursor(as_dict=True)
for row in cursor:
for path, ref_store in _extract_schema_refs(row[attr_name]):
for path, ref_store in attr.codec.referenced_paths(row[attr_name]):
# Filter by store if specified
if store_name is None or ref_store == store_name:
referenced.add(path)
Expand Down Expand Up @@ -379,10 +385,14 @@ def list_stored_hashes(store_name: str | None = None, config=None) -> dict[str,

def list_schema_paths(store_name: str | None = None, config=None) -> dict[str, int]:
"""
List all schema-addressed items in storage.
List all schema-addressed object files in storage.

Scans for directories matching the schema-addressed storage pattern:
``{schema}/{table}/{pk}/{field}/``
Enumerates the individual object **files** written by schema-addressed
codecs, whose paths follow ``{schema}/{table}/{pk}/{field}_{token}[.ext]``.
Returning file paths (rather than the enclosing directory) is what lets
them match the file paths recorded in each row's stored metadata — the two
must be comparable for orphan detection, and per-token granularity lets
superseded versions be reclaimed while the current token is retained.

Parameters
----------
Expand All @@ -394,40 +404,36 @@ def list_schema_paths(store_name: str | None = None, config=None) -> dict[str, i
Returns
-------
dict[str, int]
Dict mapping storage path to size in bytes.
Dict mapping each object file's relative path to its size in bytes.
"""
backend = get_store_backend(store_name, config=config)
stored: dict[str, int] = {}

try:
# Walk the storage looking for schema-addressed paths
# Walk the storage collecting schema-addressed object files
full_prefix = backend._full_path("")

for root, dirs, files in backend.fs.walk(full_prefix):
# Skip _hash directory (hash-addressed storage)
if "_hash" in root:
continue

# Look for schema-addressed pattern (has files, not in _hash)
# Schema-addressed paths: {schema}/{table}/{pk}/{field}/
relative_path = root.replace(full_prefix, "").lstrip("/")
for filename in files:
# Skip manifest sidecars (mirrors list_stored_hashes)
if filename.endswith(".manifest.json"):
continue

# Skip empty paths and root-level directories
if not relative_path or relative_path.count("/") < 2:
continue
file_path = f"{root}/{filename}"
relative_path = file_path.replace(full_prefix, "").lstrip("/")

# Schema-addressed files live at least at {schema}/{table}/.../{file}
if relative_path.count("/") < 2:
continue

# Calculate total size of this directory
total_size = 0
for file in files:
try:
file_path = f"{root}/{file}"
total_size += backend.fs.size(file_path)
stored[relative_path] = backend.fs.size(file_path)
except Exception:
pass

# Only count directories with files (actual objects)
if total_size > 0 or files:
stored[relative_path] = total_size
stored[relative_path] = 0

except FileNotFoundError:
pass
Expand All @@ -439,12 +445,17 @@ def list_schema_paths(store_name: str | None = None, config=None) -> dict[str, i

def delete_schema_path(path: str, store_name: str | None = None, config=None) -> bool:
"""
Delete a schema-addressed directory from storage.
Delete a schema-addressed object file from storage.

``path`` is an individual object file (as enumerated by
:func:`list_schema_paths`), not a directory — so only the orphaned token
file is removed, leaving other versions/fields under the same primary-key
directory intact. Empty parent directories are pruned best-effort.

Parameters
----------
path : str
Storage path (relative to store root).
Object file path (relative to store root).
store_name : str, optional
Store name (None = default store).
config : Config, optional
Expand All @@ -460,12 +471,25 @@ def delete_schema_path(path: str, store_name: str | None = None, config=None) ->
try:
full_path = backend._full_path(path)
if backend.fs.exists(full_path):
# Remove entire directory tree
backend.fs.rm(full_path, recursive=True)
logger.debug(f"Deleted schema path: {path}")
# Remove just this object file (not the whole PK directory)
backend.fs.rm(full_path)
logger.debug(f"Deleted schema object: {path}")

# Best-effort: prune now-empty parent directories up to the store root.
root = backend._full_path("").rstrip("/")
parent = full_path.rsplit("/", 1)[0]
while parent and parent != root and parent.startswith(root):
try:
if backend.fs.ls(parent):
break # not empty
backend.fs.rmdir(parent)
except Exception:
break
parent = parent.rsplit("/", 1)[0]

return True
except Exception as e:
logger.warning(f"Error deleting schema path {path}: {e}")
logger.warning(f"Error deleting schema object {path}: {e}")

return False

Expand Down
92 changes: 86 additions & 6 deletions tests/integration/test_gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,21 @@

import datajoint as dj
from datajoint import gc
from datajoint.builtin_codecs.object import ObjectCodec
from datajoint.codecs import get_codec
from datajoint.errors import DataJointError


class GcCustomObjectCodec(ObjectCodec):
"""A custom schema-addressed codec — a ``SchemaCodec`` subclass with a
non-built-in name, mirroring the user's NetCDF codec in #1469. Registered
at import via ``Codec.__init_subclass__``. GC must recognize it by type
(not by hardcoded name) and treat its files as referenced, not orphaned.
"""

name = "gc_custom_object"


# Tables used by TestScanWithLiveData. Defined at module scope so dj.Schema's
# context resolution can find them by class name; bound to a schema inside
# each fixture (see schema(...) calls below).
Expand Down Expand Up @@ -41,6 +53,14 @@ class GcObjectTest(dj.Manual):
"""


class GcCustomCodecTest(dj.Manual):
definition = """
rid : int
---
payload : <gc_custom_object@local>
"""


class TestUsesHashStorage:
"""Tests for _uses_hash_storage helper function."""

Expand Down Expand Up @@ -133,24 +153,30 @@ def test_returns_false_for_no_adapter(self):
def test_returns_true_for_object_type(self):
"""Test that True is returned for <object@> type."""
attr = MagicMock()
attr.codec = MagicMock()
attr.codec.name = "object"
attr.codec = get_codec("object")

assert gc._uses_schema_storage(attr) is True

def test_returns_true_for_npy_type(self):
"""Test that True is returned for <npy@> type."""
attr = MagicMock()
attr.codec = MagicMock()
attr.codec.name = "npy"
attr.codec = get_codec("npy")

assert gc._uses_schema_storage(attr) is True

def test_returns_true_for_custom_schema_subclass(self):
"""Recognition is by type, not name: a custom SchemaCodec subclass
(here, a subclass of ObjectCodec) must be seen as schema-addressed so
GC does not misclassify its live files as orphans (#1469)."""
attr = MagicMock()
attr.codec = get_codec("gc_custom_object")

assert gc._uses_schema_storage(attr) is True

def test_returns_false_for_other_types(self):
"""Test that False is returned for non-schema-addressed types."""
attr = MagicMock()
attr.codec = MagicMock()
attr.codec.name = "blob"
attr.codec = get_codec("blob")

assert gc._uses_schema_storage(attr) is False

Expand Down Expand Up @@ -465,3 +491,57 @@ def test_scan_finds_active_object_reference(self, schema_object):
stats = gc.scan(schema_object, store_name="local")

assert stats["schema_paths_referenced"] >= 1, f"scan should find the active <object@> reference; got {stats}"

@pytest.fixture
def schema_custom(self, connection_test, prefix, mock_stores):
schema_name = f"{prefix}_test_gc_e2e_custom"
schema = dj.Schema(
schema_name,
context={"GcCustomCodecTest": GcCustomCodecTest},
connection=connection_test,
)
schema(GcCustomCodecTest)
yield schema
schema.drop()

def test_custom_codec_reference_not_orphaned(self, schema_custom):
"""#1469: a live custom SchemaCodec value must be recognized as
referenced and its file path must NOT be flagged as an orphan. Before
the fix, _uses_schema_storage keyed on the hardcoded names object/npy,
so this codec was never scanned and its live file was reported orphaned.

Asserts on the specific live path (not global counts) so it is robust to
other tests sharing the same ``local`` store.
"""
GcCustomCodecTest.insert1({"rid": 1, "payload": b"live-payload"})

refs = gc.scan_schema_references(schema_custom, store_name="local")
assert refs, "custom codec's live reference must be discovered (#1469)"
live_path = next(iter(refs))

stats = gc.scan(schema_custom, store_name="local")
assert live_path not in stats["orphaned_paths"], f"live custom-codec file wrongly flagged orphan: {live_path}"

def test_custom_codec_survives_collect(self, schema_custom):
"""#1469 end-to-end data-loss guard: collect() must delete only the
deleted row's file and keep the surviving row's file (checked by exact
path, robust to a shared store)."""
GcCustomCodecTest.insert1({"rid": 1, "payload": b"row-one"})
GcCustomCodecTest.insert1({"rid": 2, "payload": b"row-two"})

refs_all = gc.scan_schema_references(schema_custom, store_name="local")
assert len(refs_all) == 2, f"both live rows should be referenced; got {refs_all}"

# Delete one row — its file becomes a genuine orphan (delete-then-GC).
(GcCustomCodecTest & {"rid": 1}).delete(prompt=False)

refs_live = gc.scan_schema_references(schema_custom, store_name="local")
assert len(refs_live) == 1, f"one live row should remain referenced; got {refs_live}"
live_path = next(iter(refs_live))
deleted_paths = refs_all - refs_live

gc.collect(schema_custom, store_name="local", dry_run=False)

stored_after = set(gc.list_schema_paths("local", config=schema_custom.connection._config))
assert live_path in stored_after, f"collect() deleted the live custom-codec file {live_path} (#1469)"
assert not (deleted_paths & stored_after), f"deleted row's orphan not reclaimed: {deleted_paths & stored_after}"
Loading