diff --git a/photomap/backend/cluster_labels.py b/photomap/backend/cluster_labels.py index 46c68f53..ed464099 100644 --- a/photomap/backend/cluster_labels.py +++ b/photomap/backend/cluster_labels.py @@ -18,8 +18,6 @@ import hashlib import importlib.resources import logging -import threading -from collections import OrderedDict from pathlib import Path from typing import TYPE_CHECKING @@ -28,6 +26,7 @@ from sklearn.cluster import DBSCAN from .encoders import get_cached_encoder +from .util import BoundedLRU if TYPE_CHECKING: from .embeddings import Embeddings @@ -584,27 +583,12 @@ def get_or_build_cluster_labels( # Bounded in-memory LRU. Per-image scoring is microseconds (one vector dot the # vocab matrix, ~1M FLOPs), but we still cache because the frontend re-opens -# the drawer for the same image often during navigation. Key includes the -# vocab mtime so edits to either vocab file naturally bypass stale entries. -_IMAGE_LABEL_CACHE: OrderedDict[tuple, dict] = OrderedDict() -_IMAGE_LABEL_CACHE_MAX = 1024 -_image_label_cache_lock = threading.Lock() - - -def _image_label_cache_get(key: tuple) -> dict | None: - with _image_label_cache_lock: - val = _IMAGE_LABEL_CACHE.get(key) - if val is not None: - _IMAGE_LABEL_CACHE.move_to_end(key) - return val - - -def _image_label_cache_put(key: tuple, val: dict) -> None: - with _image_label_cache_lock: - _IMAGE_LABEL_CACHE[key] = val - _IMAGE_LABEL_CACHE.move_to_end(key) - while len(_IMAGE_LABEL_CACHE) > _IMAGE_LABEL_CACHE_MAX: - _IMAGE_LABEL_CACHE.popitem(last=False) +# the drawer for the same image often during navigation. The cache key carries +# the embeddings .npz mtime AND the vocab mtime so that re-indexing the album +# (which can change which raw row corresponds to a given sorted_index) and +# vocab edits both invalidate stale entries — the previous version omitted the +# npz mtime and could serve stale labels until the vocab was touched. +_IMAGE_LABEL_CACHE: BoundedLRU[tuple, dict] = BoundedLRU(maxsize=1024) def compute_image_label( @@ -627,8 +611,17 @@ def compute_image_label( vocab_path = vocab_file_path() vocab_mtime = _vocab_sources_max_mtime(vocab_path) if vocab_path.exists() else 0.0 - cache_key = (str(embeddings.embeddings_path), int(sorted_index), vocab_mtime) - hit = _image_label_cache_get(cache_key) + try: + npz_mtime = embeddings.embeddings_path.stat().st_mtime + except OSError: + npz_mtime = 0.0 + cache_key = ( + str(embeddings.embeddings_path), + int(sorted_index), + npz_mtime, + vocab_mtime, + ) + hit = _IMAGE_LABEL_CACHE.get(cache_key) if hit is not None: return hit @@ -663,5 +656,5 @@ def compute_image_label( "alternates": top_phrases[1:], "score": float(scores[int(top_idx[0])]), } - _image_label_cache_put(cache_key, result) + _IMAGE_LABEL_CACHE.put(cache_key, result) return result diff --git a/photomap/backend/routers/curation.py b/photomap/backend/routers/curation.py index 6a7617ea..bdc0bf77 100644 --- a/photomap/backend/routers/curation.py +++ b/photomap/backend/routers/curation.py @@ -2,7 +2,6 @@ import os import random import shutil -import threading import uuid from collections import Counter from collections.abc import Callable @@ -15,16 +14,19 @@ from ..config import get_config_manager from ..embeddings import _open_npz_file, get_fps_indices_global, get_kmeans_indices_global from ..progress import IndexStatus, progress_tracker +from ..util import BoundedLRU from .album import validate_album_exists, validate_image_access router = APIRouter() logger = logging.getLogger(__name__) -# Store results for completed curation jobs. The background task writes here -# from a worker thread while request handlers read in the event-loop thread, -# so guard accesses with a lock to keep the dict from being mutated mid-read. -_curation_results: dict[str, Any] = {} -_curation_results_lock = threading.Lock() +# Completed curation-job results. Bounded so a long-running server doesn't +# accumulate one entry per job_id forever — 64 is well above any realistic +# in-flight + recently-viewed working set (FPS / K-means takes seconds; the +# frontend polls each job_id once or twice). Older entries fall off via LRU +# when the cap is exceeded; the frontend already handles "job not found" as +# the no-result path. +_curation_results: BoundedLRU[str, dict[str, Any]] = BoundedLRU(maxsize=64) class CurationRequest(BaseModel): """ @@ -155,8 +157,7 @@ def _on_iter(i: int) -> None: progress_tracker.set_error(job_id, str(exc)) return - with _curation_results_lock: - _curation_results[job_id] = result + _curation_results.put(job_id, result) progress_tracker.complete_operation(job_id, "Curation completed") logger.info(f"Curation Job {job_id}: Completed successfully") @@ -164,11 +165,7 @@ def _on_iter(i: int) -> None: except Exception as e: logger.error(f"Curation Job {job_id}: Error - {str(e)}") progress_tracker.set_error(job_id, str(e)) - with _curation_results_lock: - _curation_results[job_id] = { - "status": "error", - "error": str(e) - } + _curation_results.put(job_id, {"status": "error", "error": str(e)}) @router.post("/curate") async def run_curation(request: CurationRequest, background_tasks: BackgroundTasks): @@ -206,8 +203,7 @@ async def get_curation_progress(job_id: str): if progress is None: # Check if we have a completed result - with _curation_results_lock: - cached = _curation_results.get(job_id) + cached = _curation_results.get(job_id) if cached is not None: return { "status": "completed", @@ -223,8 +219,7 @@ async def get_curation_progress(job_id: str): if progress.status == IndexStatus.COMPLETED: # Return result if available - with _curation_results_lock: - result = _curation_results.get(job_id, {}) + result = _curation_results.get(job_id, {}) return { "status": "completed", "result": result diff --git a/photomap/backend/util.py b/photomap/backend/util.py index 57e69cf5..dda0c906 100644 --- a/photomap/backend/util.py +++ b/photomap/backend/util.py @@ -3,11 +3,64 @@ import os import socket +import threading +from collections import OrderedDict +from collections.abc import Hashable from pathlib import Path -from typing import Any +from typing import Any, Generic, TypeVar import numpy as np +K = TypeVar("K", bound=Hashable) +V = TypeVar("V") + + +class BoundedLRU(Generic[K, V]): + """Thread-safe LRU cache capped at ``maxsize`` entries. + + Replaces ad-hoc per-module ``dict``s that previously had no eviction + policy (e.g. ``_curation_results`` accumulating one entry per curation + job for the life of the server). Get / put are O(1) and serialized by + an internal lock — fine for the modest hit rates these caches see. + + Hits move-to-end (most-recently-used). Inserts evict the + least-recently-used entry once ``maxsize`` is exceeded. + """ + + def __init__(self, maxsize: int) -> None: + if maxsize <= 0: + raise ValueError(f"maxsize must be positive (got {maxsize})") + self._maxsize = maxsize + self._data: OrderedDict[K, V] = OrderedDict() + self._lock = threading.Lock() + + def get(self, key: K, default: V | None = None) -> V | None: + with self._lock: + value = self._data.get(key) + if value is None and key not in self._data: + return default + self._data.move_to_end(key) + return value + + def put(self, key: K, value: V) -> None: + with self._lock: + self._data[key] = value + self._data.move_to_end(key) + while len(self._data) > self._maxsize: + self._data.popitem(last=False) + + def __contains__(self, key: K) -> bool: + with self._lock: + return key in self._data + + def __len__(self) -> int: + with self._lock: + return len(self._data) + + def clear(self) -> None: + with self._lock: + self._data.clear() + def atomic_savez(path: Path, **arrays: Any) -> None: """Write a ``.npz`` archive to ``path`` atomically. diff --git a/tests/backend/test_cluster_labels.py b/tests/backend/test_cluster_labels.py index 40317727..ba8cc828 100644 --- a/tests/backend/test_cluster_labels.py +++ b/tests/backend/test_cluster_labels.py @@ -183,8 +183,11 @@ def counting_get_or_build(*args, **kwargs): def test_compute_image_label_cache_evicts_past_max(synthetic_album, monkeypatch): - monkeypatch.setattr(cluster_labels, "_IMAGE_LABEL_CACHE_MAX", 3) - cluster_labels._IMAGE_LABEL_CACHE.clear() + # Replace the module-level cache with a small one for this test so the + # eviction policy is observable in a few iterations. + from photomap.backend.util import BoundedLRU + + monkeypatch.setattr(cluster_labels, "_IMAGE_LABEL_CACHE", BoundedLRU(maxsize=3)) for i in range(5): cluster_labels.compute_image_label(synthetic_album, i)