Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 19 additions & 26 deletions photomap/backend/cluster_labels.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import hashlib
import importlib.resources
import logging
import threading
from collections import OrderedDict
from pathlib import Path
from typing import TYPE_CHECKING

Expand All @@ -28,6 +26,7 @@
from sklearn.cluster import DBSCAN

from .encoders import get_cached_encoder
from .util import BoundedLRU

if TYPE_CHECKING:
from .embeddings import Embeddings
Expand Down Expand Up @@ -584,27 +583,12 @@ def get_or_build_cluster_labels(

# Bounded in-memory LRU. Per-image scoring is microseconds (one vector dot the
# vocab matrix, ~1M FLOPs), but we still cache because the frontend re-opens
# the drawer for the same image often during navigation. Key includes the
# vocab mtime so edits to either vocab file naturally bypass stale entries.
_IMAGE_LABEL_CACHE: OrderedDict[tuple, dict] = OrderedDict()
_IMAGE_LABEL_CACHE_MAX = 1024
_image_label_cache_lock = threading.Lock()


def _image_label_cache_get(key: tuple) -> dict | None:
with _image_label_cache_lock:
val = _IMAGE_LABEL_CACHE.get(key)
if val is not None:
_IMAGE_LABEL_CACHE.move_to_end(key)
return val


def _image_label_cache_put(key: tuple, val: dict) -> None:
with _image_label_cache_lock:
_IMAGE_LABEL_CACHE[key] = val
_IMAGE_LABEL_CACHE.move_to_end(key)
while len(_IMAGE_LABEL_CACHE) > _IMAGE_LABEL_CACHE_MAX:
_IMAGE_LABEL_CACHE.popitem(last=False)
# the drawer for the same image often during navigation. The cache key carries
# the embeddings .npz mtime AND the vocab mtime so that re-indexing the album
# (which can change which raw row corresponds to a given sorted_index) and
# vocab edits both invalidate stale entries — the previous version omitted the
# npz mtime and could serve stale labels until the vocab was touched.
_IMAGE_LABEL_CACHE: BoundedLRU[tuple, dict] = BoundedLRU(maxsize=1024)


def compute_image_label(
Expand All @@ -627,8 +611,17 @@ def compute_image_label(

vocab_path = vocab_file_path()
vocab_mtime = _vocab_sources_max_mtime(vocab_path) if vocab_path.exists() else 0.0
cache_key = (str(embeddings.embeddings_path), int(sorted_index), vocab_mtime)
hit = _image_label_cache_get(cache_key)
try:
npz_mtime = embeddings.embeddings_path.stat().st_mtime
except OSError:
npz_mtime = 0.0
cache_key = (
str(embeddings.embeddings_path),
int(sorted_index),
npz_mtime,
vocab_mtime,
)
hit = _IMAGE_LABEL_CACHE.get(cache_key)
if hit is not None:
return hit

Expand Down Expand Up @@ -663,5 +656,5 @@ def compute_image_label(
"alternates": top_phrases[1:],
"score": float(scores[int(top_idx[0])]),
}
_image_label_cache_put(cache_key, result)
_IMAGE_LABEL_CACHE.put(cache_key, result)
return result
29 changes: 12 additions & 17 deletions photomap/backend/routers/curation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import os
import random
import shutil
import threading
import uuid
from collections import Counter
from collections.abc import Callable
Expand All @@ -15,16 +14,19 @@
from ..config import get_config_manager
from ..embeddings import _open_npz_file, get_fps_indices_global, get_kmeans_indices_global
from ..progress import IndexStatus, progress_tracker
from ..util import BoundedLRU
from .album import validate_album_exists, validate_image_access

router = APIRouter()
logger = logging.getLogger(__name__)

# Store results for completed curation jobs. The background task writes here
# from a worker thread while request handlers read in the event-loop thread,
# so guard accesses with a lock to keep the dict from being mutated mid-read.
_curation_results: dict[str, Any] = {}
_curation_results_lock = threading.Lock()
# Completed curation-job results. Bounded so a long-running server doesn't
# accumulate one entry per job_id forever — 64 is well above any realistic
# in-flight + recently-viewed working set (FPS / K-means takes seconds; the
# frontend polls each job_id once or twice). Older entries fall off via LRU
# when the cap is exceeded; the frontend already handles "job not found" as
# the no-result path.
_curation_results: BoundedLRU[str, dict[str, Any]] = BoundedLRU(maxsize=64)

class CurationRequest(BaseModel):
"""
Expand Down Expand Up @@ -155,20 +157,15 @@ def _on_iter(i: int) -> None:
progress_tracker.set_error(job_id, str(exc))
return

with _curation_results_lock:
_curation_results[job_id] = result
_curation_results.put(job_id, result)

progress_tracker.complete_operation(job_id, "Curation completed")
logger.info(f"Curation Job {job_id}: Completed successfully")

except Exception as e:
logger.error(f"Curation Job {job_id}: Error - {str(e)}")
progress_tracker.set_error(job_id, str(e))
with _curation_results_lock:
_curation_results[job_id] = {
"status": "error",
"error": str(e)
}
_curation_results.put(job_id, {"status": "error", "error": str(e)})

@router.post("/curate")
async def run_curation(request: CurationRequest, background_tasks: BackgroundTasks):
Expand Down Expand Up @@ -206,8 +203,7 @@ async def get_curation_progress(job_id: str):

if progress is None:
# Check if we have a completed result
with _curation_results_lock:
cached = _curation_results.get(job_id)
cached = _curation_results.get(job_id)
if cached is not None:
return {
"status": "completed",
Expand All @@ -223,8 +219,7 @@ async def get_curation_progress(job_id: str):

if progress.status == IndexStatus.COMPLETED:
# Return result if available
with _curation_results_lock:
result = _curation_results.get(job_id, {})
result = _curation_results.get(job_id, {})
return {
"status": "completed",
"result": result
Expand Down
55 changes: 54 additions & 1 deletion photomap/backend/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,64 @@

import os
import socket
import threading
from collections import OrderedDict
from collections.abc import Hashable
from pathlib import Path
from typing import Any
from typing import Any, Generic, TypeVar

import numpy as np

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


class BoundedLRU(Generic[K, V]):
"""Thread-safe LRU cache capped at ``maxsize`` entries.

Replaces ad-hoc per-module ``dict``s that previously had no eviction
policy (e.g. ``_curation_results`` accumulating one entry per curation
job for the life of the server). Get / put are O(1) and serialized by
an internal lock — fine for the modest hit rates these caches see.

Hits move-to-end (most-recently-used). Inserts evict the
least-recently-used entry once ``maxsize`` is exceeded.
"""

def __init__(self, maxsize: int) -> None:
if maxsize <= 0:
raise ValueError(f"maxsize must be positive (got {maxsize})")
self._maxsize = maxsize
self._data: OrderedDict[K, V] = OrderedDict()
self._lock = threading.Lock()

def get(self, key: K, default: V | None = None) -> V | None:
with self._lock:
value = self._data.get(key)
if value is None and key not in self._data:
return default
self._data.move_to_end(key)
return value

def put(self, key: K, value: V) -> None:
with self._lock:
self._data[key] = value
self._data.move_to_end(key)
while len(self._data) > self._maxsize:
self._data.popitem(last=False)

def __contains__(self, key: K) -> bool:
with self._lock:
return key in self._data

def __len__(self) -> int:
with self._lock:
return len(self._data)

def clear(self) -> None:
with self._lock:
self._data.clear()


def atomic_savez(path: Path, **arrays: Any) -> None:
"""Write a ``.npz`` archive to ``path`` atomically.
Expand Down
7 changes: 5 additions & 2 deletions tests/backend/test_cluster_labels.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,11 @@ def counting_get_or_build(*args, **kwargs):


def test_compute_image_label_cache_evicts_past_max(synthetic_album, monkeypatch):
monkeypatch.setattr(cluster_labels, "_IMAGE_LABEL_CACHE_MAX", 3)
cluster_labels._IMAGE_LABEL_CACHE.clear()
# Replace the module-level cache with a small one for this test so the
# eviction policy is observable in a few iterations.
from photomap.backend.util import BoundedLRU

monkeypatch.setattr(cluster_labels, "_IMAGE_LABEL_CACHE", BoundedLRU(maxsize=3))

for i in range(5):
cluster_labels.compute_image_label(synthetic_album, i)
Expand Down
Loading