diff --git a/backend/app/llm/budget_gate.py b/backend/app/llm/budget_gate.py index 3471020d..44232029 100644 --- a/backend/app/llm/budget_gate.py +++ b/backend/app/llm/budget_gate.py @@ -29,6 +29,7 @@ from datetime import UTC, datetime +import structlog from redis.asyncio import Redis _TTL_SECONDS = 26 * 60 * 60 @@ -90,3 +91,40 @@ async def record_cost( total = await redis.incrbyfloat(key, cost_usd) await redis.expire(key, _TTL_SECONDS) return float(total) + + +async def safe_record_cost( + redis: Redis, + cost_usd: float, + *, + logger: structlog.stdlib.BoundLogger, + log_message: str, + event_type: str, +) -> float | None: + """Record an LLM cost, swallowing transient Redis failures. + + Per GPT-5.5 cycle-2 C2-F3 (feat_llm_judgments): a Redis hiccup AFTER a + paid LLM call must not propagate up and abort the caller — the caller + persists its artifacts (judgments, digest) BEFORE calling this, so + under-counting daily spend during a Redis outage is recoverable on + rollover while losing the paid-for output is not. Returns ``None`` on + failure. + + Lives here (next to :func:`record_cost`) rather than in the worker layer + so the judgment-generation service can record cost without importing up + into ``backend.workers``. ``log_message`` / ``event_type`` are passed by + the caller so each caller keeps its own log voice (``judgment worker: …`` + / ``judgment_record_cost_failed`` etc.) while sharing the one defensive + contract. 
+ """ + try: + return await record_cost(redis, cost_usd) + except Exception as exc: # noqa: BLE001 — defensive + logger.warning( + log_message, + event_type=event_type, + cost_usd=cost_usd, + error_type=type(exc).__name__, + error=str(exc), + ) + return None diff --git a/backend/app/services/judgment_generation.py b/backend/app/services/judgment_generation.py new file mode 100644 index 00000000..121f1f8d --- /dev/null +++ b/backend/app/services/judgment_generation.py @@ -0,0 +1,362 @@ +# SPDX-FileCopyrightText: 2026 soundminds.ai +# +# SPDX-License-Identifier: Apache-2.0 + +"""Judgment-generation service layer. + +Composition logic for turning a ``judgment_lists`` row into persisted +``judgments`` — shared by the LLM judge worker (``workers.judgments``) and +the UBI worker (``workers.judgments_ubi``). The workers own orchestration +(load the row, build clients, loop queries, flip terminal status, clean up); +this module owns the per-list/per-query composition of repos + adapter + LLM ++ budget so that logic lives in the service layer rather than the worker. +""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import Any + +import openai +import structlog +import uuid_utils +from openai import AsyncOpenAI +from redis.asyncio import Redis +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + +from backend.app.adapters.protocol import NativeQuery, QueryTemplate +from backend.app.db import repo +from backend.app.domain.study.template_defaults import compute_default_params +from backend.app.llm.budget_gate import BudgetExceededError, peek_daily_total, safe_record_cost +from backend.app.llm.cost_model import UnknownModelPricingError, estimated_max_call_cost +from backend.app.llm.openai_judge import rate_query_batch +from backend.app.llm.prompt_loader import render_user_prompt + +logger = structlog.get_logger(__name__) + + +TOP_K = 50 +"""Retrieval depth per query (per spec §13 cost guardrail). 
+ +50 is the design target: enough docs to give the LLM useful relevance +contrast across the rubric scale, low enough that one call per query keeps +the tutorial cost under $1.""" + +_DOC_BODY_CHAR_LIMIT = 500 +"""Per-doc body truncation length (per spec §13). + +Bounds the input token count; only the first 500 chars of each doc body are +passed to the LLM. The doc body is just for the rubric judgment, not for +retrieval — full-text retention isn't needed.""" + + +def _build_doc_inputs(hits: Sequence[Any]) -> list[dict[str, str]]: + """Translate ``ScoredHit`` rows into ``[{doc_id, body}, ...]`` for the prompt. + + Body extraction prefers ``hit.source.body`` (the canonical text field); + falls back to a JSON-dumped ``hit.source`` when absent. The result is + trimmed to :data:`_DOC_BODY_CHAR_LIMIT` characters per doc. + """ + out: list[dict[str, str]] = [] + for hit in hits: + source = getattr(hit, "source", None) or {} + body_raw = source.get("body") if isinstance(source, dict) else None + if not body_raw: + # Fall back to a stable string form of the source. ``source`` is + # engine ``_source`` (always JSON-origin in practice), but guard the + # dump anyway now that this helper is service-layer: a future caller + # could pass a hit whose source holds a non-serializable value, and + # a TypeError here would abort the whole judgment run. + import json as _json + + try: + body_raw = _json.dumps(source, ensure_ascii=False) + except TypeError: + body_raw = str(source) + body = str(body_raw) + if len(body) > _DOC_BODY_CHAR_LIMIT: + body = body[:_DOC_BODY_CHAR_LIMIT] + out.append({"doc_id": str(hit.doc_id), "body": body}) + return out + + +async def fail_judgment_list(db: Any, judgment_list_id: str, failed_reason: str) -> None: + """Flip a judgment list to ``status='failed'`` with a structured reason. + + Commits in the caller's session. 
Shared by both judgment workers — the + terminal-failure transition is identical regardless of which generation + path (LLM or UBI) hit the failure. + """ + await repo.update_judgment_list_status( + db, judgment_list_id, status="failed", failed_reason=failed_reason + ) + await db.commit() + + +async def fail_on_budget_or_pricing_error( + factory: async_sessionmaker[AsyncSession], + judgment_list_id: str, + exc: BudgetExceededError | UnknownModelPricingError, + *, + logger: structlog.stdlib.BoundLogger, + event_prefix: str, +) -> None: + """Map a fatal budget / pricing error to the terminal failed status + log. + + Shared by both judgment workers' per-query loops, which previously each + carried a near-identical pair of ``except`` blocks. Opens its own short + session off ``factory`` (the loop's per-query session is already unwound by + the time the error propagates). ``event_prefix`` ('judgment' or 'ubi') + preserves each worker's spec §13 operability ``event_type`` + (``{prefix}_budget_exceeded`` / ``{prefix}_unknown_pricing``). + """ + if isinstance(exc, BudgetExceededError): + reason, suffix, what = "OPENAI_BUDGET_EXCEEDED", "budget_exceeded", "budget exceeded" + else: + reason, suffix, what = "UNKNOWN_MODEL_PRICING", "unknown_pricing", "unknown model pricing" + async with factory() as db: + await fail_judgment_list(db, judgment_list_id, reason) + logger.warning( + f"judgment generation: {what} — aborting loop", + event_type=f"{event_prefix}_{suffix}", + judgment_list_id=judgment_list_id, + error=str(exc), + ) + + +async def process_judgment_query( + *, + db: Any, + redis: Redis, + openai_client: AsyncOpenAI, + judgment_list_id: str, + judgment_list: Any, + template: QueryTemplate, + template_row: Any, + query: Any, + adapter: Any, + bundle_system: str, + rubric_text: str, + model: str, + budget_usd: float, +) -> bool: + """Compose one query's judgment generation: budget → search → LLM → persist. 
+ + Returns: + ``True`` when judgments were persisted, the query was already judged + (resume-skip), or the search returned no hits; ``False`` when skipped for an + operational reason (search failed, LLM call failed, partial LLM response). + The worker loop uses this to decide whether to mark the list + ``complete`` (all True) or ``failed`` with + ``failed_reason='PARTIAL_LLM_FAILURE'`` (any False). Per GPT-5.5 + cycle-8 C8-F1 — without the tracking, a partial-LLM-response + skip would leave the list ``complete`` with missing qrels and + the resume sweep would never pick it back up. + + Raises on budget / pricing failures so the worker loop can mark the + list ``failed`` with the specific reason. + """ + # Resume-skip: if this query already has ANY judgments, the prior worker + # pass either completed it OR was atomically rolled back. Because + # bulk_create_judgments + commit happens in one transaction after the + # LLM call, the only post-crash states are "0 rows" or "the full batch + # returned by the LLM". A hardcoded ``existing >= TOP_K`` would fail for + # queries that legitimately returned fewer than TOP_K hits (sparse + # indices, tutorial-scale datasets) and would re-spend OpenAI dollars. + # Per GPT-5.5 final review F2. + existing = await repo.count_judgments_for_list_and_query(db, judgment_list_id, query.id) + if existing > 0: + logger.info( + "query already judged, skipping", + event_type="judgment_skip_resume", + judgment_list_id=judgment_list_id, + query_id=query.id, + existing_count=existing, + ) + return True + + # Pre-call budget peek (spec FR-2 + GPT-5.5 cycle 1 F8). + if budget_usd > 0: + current = await peek_daily_total(redis) + est_max = estimated_max_call_cost(model) + if current + est_max > budget_usd: + raise BudgetExceededError( + f"current ${current:.4f} + estimated ${est_max:.4f} > budget ${budget_usd:.4f}" + ) + + # Render template + execute search.
+ default_params = compute_default_params(template_row) + try: + native = adapter.render(template, default_params, query.query_text) + # Override the adapter-generated query_id with our own so the + # search_batch response key matches what we expect. + native = NativeQuery(query_id=str(query.id), body=native.body) + hits_by_qid = await adapter.search_batch( + target=judgment_list.target, + queries=[native], + top_k=TOP_K, + strict_errors=False, + ) + except Exception as exc: + logger.warning( + "judgment worker: search failed for query, skipping", + event_type="judgment_search_failed", + judgment_list_id=judgment_list_id, + query_id=query.id, + error_type=type(exc).__name__, + error=str(exc), + ) + return False + + hits = hits_by_qid.get(str(query.id), []) + if not hits: + # Zero hits: not a worker failure — there's genuinely nothing to + # judge. Count this as success so the outer loop can still mark + # the list complete. The downstream qrels_loader returns ``{}`` + # for queries with no judgments, which run_trial handles + # gracefully. + logger.info( + "judgment worker: no hits for query, skipping LLM call", + event_type="judgment_no_hits", + judgment_list_id=judgment_list_id, + query_id=query.id, + ) + return True + + # Ordinal prompt-ids decouple engine-supplied doc_ids (which may contain + # XML-sensitive chars like ``<``, ``&``, ``"``) from the LLM's + # round-trippable identifier. Autoescape on the Jinja sandbox would + # otherwise render a doc_id such as ``a&b`` as ``a&amp;b`` — the + # LLM would echo back ``a&amp;b`` and the worker's allowlist + # (which has ``a&b``) would drop it as spurious, producing a permanent + # zero-judgments outcome. Per GPT-5.5 cycle-6 C6-F1.
+ raw_docs = _build_doc_inputs(hits) + prompt_docs = [{"doc_id": f"item-{i}", "body": d["body"]} for i, d in enumerate(raw_docs)] + prompt_id_to_real = {f"item-{i}": d["doc_id"] for i, d in enumerate(raw_docs)} + expected_doc_ids = set(prompt_id_to_real.keys()) + + user_prompt = render_user_prompt( + rubric_text=rubric_text, + query_text=query.query_text, + docs=prompt_docs, + ) + + # Spec FR-3c: "The actual prompt sent to OpenAI MUST include this rubric + # in full as part of the system prompt." We append the per-list rubric + # to the operator-fixed system message so the rubric appears at the + # top of the instruction hierarchy AND inside the user message's + # delimiter block (defense in depth). + system_prompt = f"{bundle_system}\n\n\n{rubric_text}\n" + + try: + result = await rate_query_batch( + client=openai_client, + model=model, + system_prompt=system_prompt, + user_prompt=user_prompt, + expected_doc_ids=expected_doc_ids, + ) + except ( + openai.AuthenticationError, + openai.PermissionDeniedError, + openai.BadRequestError, + openai.NotFoundError, + ): + # Persistent provider misconfiguration (bad key, model id, endpoint, + # ZDR enrollment denied, etc.). No subsequent query will succeed — + # propagate so the outer handler marks the list failed with + # ``failed_reason='UNEXPECTED:'`` rather than silently + # producing a `complete` list with zero judgments. Per GPT-5.5 + # cycle-2 C2-F1. + raise + except Exception as exc: + # Per-query operational failure (rate-limit exhaustion after retries, + # 5xx after retries, malformed JSON after retries). Subsequent queries + # may still succeed; isolate this one. + logger.warning( + "judgment worker: LLM call failed for query, skipping", + event_type="judgment_llm_failed", + judgment_list_id=judgment_list_id, + query_id=query.id, + error_type=type(exc).__name__, + error=str(exc), + ) + return False + + # All-or-nothing persistence: confirm the response is a *set-equal* + # match for the expected doc_ids. 
A simple ``len(ratings) < + # len(expected_doc_ids)`` check would still admit duplicates like + # ``[d1, d1]`` when expected was ``{d1, d2}`` — the UNIQUE constraint + # would then admit only the first row and resume-skip would strand d2 + # forever (per GPT-5.5 cycle-3 C3-F1). Require exact set equality + # AND that the LLM did not repeat any doc_id (no duplicates). + returned_ids = [r.doc_id for r in result.ratings] + returned_set = set(returned_ids) + if returned_set != expected_doc_ids or len(returned_ids) != len(returned_set): + logger.warning( + "judgment worker: LLM response not set-equal to expected docs; " + "skipping partial persist for retry", + event_type="judgment_partial_response", + judgment_list_id=judgment_list_id, + query_id=query.id, + expected=len(expected_doc_ids), + returned_unique=len(returned_set), + returned_total=len(returned_ids), + ) + # Still record the cost — we paid for the call. The retry pays again, + # but the alternative (permanent partial state) is worse. + await safe_record_cost( + redis, + result.cost_usd, + logger=logger, + log_message="judgment service: record_cost failed (budget telemetry only)", + event_type="judgment_record_cost_failed", + ) + return False + + rater_ref = f"openai:{result.model}" + rows = [ + { + "id": str(uuid_utils.uuid7()), + "judgment_list_id": judgment_list_id, + "query_id": str(query.id), + # Map prompt-only ordinal id (``item-N``) back to the real + # engine-supplied doc_id for DB persistence (C6-F1). + "doc_id": prompt_id_to_real[r.doc_id], + "rating": r.rating, + "source": "llm", + "rater_ref": rater_ref, + "notes": r.rationale, + } + for r in result.ratings + ] + # Persist FIRST, then record cost. If Redis is transiently unavailable + # at the record_cost step we'd otherwise drop the already-paid-for + # ratings (per GPT-5.5 cycle-2 C2-F3). Order swap means we may + # under-count daily spend if Redis flaps, which is recoverable on + # rollover; losing paid-for judgments is not. 
+ if rows: + await repo.bulk_create_judgments(db, rows) + await db.commit() + + new_total = await safe_record_cost( + redis, + result.cost_usd, + logger=logger, + log_message="judgment service: record_cost failed (budget telemetry only)", + event_type="judgment_record_cost_failed", + ) + + logger.info( + "judgment query processed", + event_type="judgment_query_complete", + judgment_list_id=judgment_list_id, + query_id=query.id, + ratings_count=len(result.ratings), + input_tokens=result.input_tokens, + output_tokens=result.output_tokens, + cost_usd=result.cost_usd, + running_total_usd=new_total, + duration_ms=result.duration_ms, + ) + return True diff --git a/backend/tests/integration/test_digest_persist_then_record_cost.py b/backend/tests/integration/test_digest_persist_then_record_cost.py index f8506a1c..455992d7 100644 --- a/backend/tests/integration/test_digest_persist_then_record_cost.py +++ b/backend/tests/integration/test_digest_persist_then_record_cost.py @@ -52,9 +52,9 @@ async def test_digest_persisted_when_redis_record_fails(monkeypatch: pytest.Monk async def _broken_record_cost(*args: object, **kwargs: object) -> float: raise RuntimeError("simulated Redis flap during record_cost") - # record_cost is now invoked inside the shared workers.helpers.safe_record_cost + # record_cost is invoked inside the shared budget_gate.safe_record_cost # wrapper (digest.py delegates to it), so patch it at its call site there. 
- monkeypatch.setattr("backend.workers.helpers.record_cost", _broken_record_cost) + monkeypatch.setattr("backend.app.llm.budget_gate.record_cost", _broken_record_cost) await generate_digest({}, seeded["study_id"]) diff --git a/backend/tests/unit/llm/test_budget_gate.py b/backend/tests/unit/llm/test_budget_gate.py index 8f0585c7..49033123 100644 --- a/backend/tests/unit/llm/test_budget_gate.py +++ b/backend/tests/unit/llm/test_budget_gate.py @@ -22,13 +22,18 @@ from datetime import UTC, datetime from unittest.mock import AsyncMock +import structlog + from backend.app.llm.budget_gate import ( BudgetExceededError, daily_key, peek_daily_total, record_cost, + safe_record_cost, ) +_logger = structlog.get_logger("test") + def test_daily_key_format() -> None: assert daily_key(datetime(2026, 5, 11, 12, 0, tzinfo=UTC)) == "openai:budget:2026-05-11" @@ -116,3 +121,24 @@ async def fake_get(key: str) -> str | None: def test_budget_exceeded_error_inherits_from_runtime_error() -> None: """Type-only check — caller code uses ``except RuntimeError`` paths.""" assert issubclass(BudgetExceededError, RuntimeError) + + +async def test_safe_record_cost_returns_total_on_success() -> None: + """Happy path delegates to record_cost and returns the new running total.""" + redis = AsyncMock() + redis.incrbyfloat = AsyncMock(return_value=0.42) + redis.expire = AsyncMock(return_value=True) + total = await safe_record_cost( + redis, 0.42, logger=_logger, log_message="worker: failed", event_type="record_cost_failed" + ) + assert total == 0.42 + + +async def test_safe_record_cost_swallows_redis_failure() -> None: + """A Redis flap AFTER the paid call returns None instead of propagating.""" + redis = AsyncMock() + redis.incrbyfloat = AsyncMock(side_effect=RuntimeError("redis down")) + result = await safe_record_cost( + redis, 0.42, logger=_logger, log_message="worker: failed", event_type="record_cost_failed" + ) + assert result is None diff --git 
a/backend/tests/unit/services/test_judgment_generation.py b/backend/tests/unit/services/test_judgment_generation.py new file mode 100644 index 00000000..fa7cb696 --- /dev/null +++ b/backend/tests/unit/services/test_judgment_generation.py @@ -0,0 +1,126 @@ +# SPDX-FileCopyrightText: 2026 soundminds.ai +# +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for judgment-generation service helpers.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, cast + +import pytest +import structlog +from structlog.testing import capture_logs + +from backend.app.db import repo +from backend.app.llm.budget_gate import BudgetExceededError +from backend.app.llm.cost_model import UnknownModelPricingError +from backend.app.services.judgment_generation import ( + _DOC_BODY_CHAR_LIMIT, + _build_doc_inputs, + fail_on_budget_or_pricing_error, +) + + +@dataclass +class _Hit: + doc_id: str + source: Any + + +def test_prefers_source_body() -> None: + rows = _build_doc_inputs([_Hit(doc_id="d1", source={"body": "hello"})]) + assert rows == [{"doc_id": "d1", "body": "hello"}] + + +def test_truncates_long_body() -> None: + long = "x" * (_DOC_BODY_CHAR_LIMIT + 100) + rows = _build_doc_inputs([_Hit(doc_id="d1", source={"body": long})]) + assert len(rows[0]["body"]) == _DOC_BODY_CHAR_LIMIT + + +def test_json_dumps_fallback_when_no_body() -> None: + rows = _build_doc_inputs([_Hit(doc_id="d1", source={"title": "t", "n": 1})]) + # No ``body`` key → stable JSON dump of the source. + assert rows[0]["doc_id"] == "d1" + assert '"title"' in rows[0]["body"] + + +def test_str_fallback_when_source_not_json_serializable() -> None: + # The Gemini-flagged path: a source carrying a non-serializable value must + # fall back to str() rather than raising TypeError and aborting the run. 
+ class _Weird: + def __repr__(self) -> str: + return "" + + rows = _build_doc_inputs([_Hit(doc_id="d1", source={"obj": _Weird()})]) + assert rows[0]["doc_id"] == "d1" + assert "weird" in rows[0]["body"] + + +class _FakeDB: + async def commit(self) -> None: ... + + +class _FakeFactory: + """Minimal async_sessionmaker stand-in: callable → async context manager.""" + + def __call__(self) -> _FakeFactory: + return self + + async def __aenter__(self) -> _FakeDB: + return _FakeDB() + + async def __aexit__(self, *exc: object) -> bool: + return False + + +@pytest.mark.parametrize( + ("exc", "prefix", "expected_reason", "expected_event"), + [ + ( + BudgetExceededError("over"), + "judgment", + "OPENAI_BUDGET_EXCEEDED", + "judgment_budget_exceeded", + ), + (BudgetExceededError("over"), "ubi", "OPENAI_BUDGET_EXCEEDED", "ubi_budget_exceeded"), + ( + UnknownModelPricingError("?"), + "judgment", + "UNKNOWN_MODEL_PRICING", + "judgment_unknown_pricing", + ), + (UnknownModelPricingError("?"), "ubi", "UNKNOWN_MODEL_PRICING", "ubi_unknown_pricing"), + ], +) +async def test_fail_on_budget_or_pricing_error( + monkeypatch: pytest.MonkeyPatch, + exc: Exception, + prefix: str, + expected_reason: str, + expected_event: str, +) -> None: + """Maps exc → failed_reason and emits the per-worker operability event_type.""" + captured: dict[str, Any] = {} + + async def fake_update( + db: Any, jid: str, *, status: str, failed_reason: str | None = None + ) -> None: + captured["status"] = status + captured["reason"] = failed_reason + + monkeypatch.setattr(repo, "update_judgment_list_status", fake_update) + + with capture_logs() as logs: + await fail_on_budget_or_pricing_error( + cast(Any, _FakeFactory()), + "jl-1", + cast(Any, exc), + logger=structlog.get_logger("test"), + event_prefix=prefix, + ) + + assert captured == {"status": "failed", "reason": expected_reason} + assert any(entry.get("event_type") == expected_event for entry in logs) diff --git a/backend/workers/digest.py 
b/backend/workers/digest.py index 8574a61d..66c5b0b5 100644 --- a/backend/workers/digest.py +++ b/backend/workers/digest.py @@ -78,7 +78,7 @@ remap_search_space_for_swap_target, ) from backend.app.eval.optuna_runtime import build_pruner, build_sampler, get_or_create_study -from backend.app.llm.budget_gate import peek_daily_total +from backend.app.llm.budget_gate import peek_daily_total, safe_record_cost from backend.app.llm.capability_check import read_capability_result from backend.app.llm.cost_model import ( compute_call_cost, @@ -88,7 +88,7 @@ from backend.app.llm.digest_prompt import load_digest_prompts, render_digest_user_prompt from backend.app.services.study_confidence import fetch_study_confidence from backend.app.services.study_convergence import fetch_study_convergence -from backend.workers.helpers import close_quietly, safe_record_cost +from backend.workers.helpers import close_quietly logger = structlog.get_logger(__name__) diff --git a/backend/workers/helpers.py b/backend/workers/helpers.py index 1b1feb48..7d937bde 100644 --- a/backend/workers/helpers.py +++ b/backend/workers/helpers.py @@ -9,9 +9,6 @@ from typing import Any import structlog -from redis.asyncio import Redis - -from backend.app.llm.budget_gate import record_cost async def close_quietly(client: Any, *, logger: structlog.stdlib.BoundLogger, label: str) -> None: @@ -33,38 +30,3 @@ async def close_quietly(client: Any, *, logger: structlog.stdlib.BoundLogger, la await closer() except Exception: # noqa: BLE001 — cleanup must not mask the job outcome logger.debug(f"{label} close raised", exc_info=True) - - -async def safe_record_cost( - redis: Redis, - cost_usd: float, - *, - logger: structlog.stdlib.BoundLogger, - log_message: str, - event_type: str, -) -> float | None: - """Record an LLM cost, swallowing transient Redis failures. 
- - Per GPT-5.5 cycle-2 C2-F3 (feat_llm_judgments): a Redis hiccup AFTER a - paid LLM call must not propagate up and abort the worker — the worker - persists its artifacts (judgments, digest) BEFORE calling this, so - under-counting daily spend during a Redis outage is recoverable on - rollover while losing the paid-for output is not. Returns ``None`` on - failure. - - ``log_message`` / ``event_type`` are passed by the caller so each worker - keeps its own log voice (``judgment worker: …`` / - ``judgment_record_cost_failed`` etc.) while sharing the one defensive - record-cost contract. - """ - try: - return await record_cost(redis, cost_usd) - except Exception as exc: # noqa: BLE001 — defensive - logger.warning( - log_message, - event_type=event_type, - cost_usd=cost_usd, - error_type=type(exc).__name__, - error=str(exc), - ) - return None diff --git a/backend/workers/judgments.py b/backend/workers/judgments.py index 8742037f..3d93f2f4 100644 --- a/backend/workers/judgments.py +++ b/backend/workers/judgments.py @@ -40,310 +40,30 @@ from __future__ import annotations import time -from collections.abc import Sequence from typing import Any, cast -import openai import structlog -import uuid_utils from openai import AsyncOpenAI from redis.asyncio import Redis -from backend.app.adapters.protocol import NativeQuery, QueryTemplate +from backend.app.adapters.protocol import QueryTemplate from backend.app.core.settings import get_settings from backend.app.db import repo from backend.app.db.session import get_session_factory -from backend.app.domain.study.template_defaults import compute_default_params -from backend.app.llm.budget_gate import ( - BudgetExceededError, - peek_daily_total, -) +from backend.app.llm.budget_gate import BudgetExceededError from backend.app.llm.cost_model import UnknownModelPricingError, estimated_max_call_cost -from backend.app.llm.openai_judge import rate_query_batch -from backend.app.llm.prompt_loader import load_judgment_prompts, 
render_user_prompt +from backend.app.llm.prompt_loader import load_judgment_prompts from backend.app.services.cluster import build_adapter -from backend.workers.helpers import close_quietly, safe_record_cost +from backend.app.services.judgment_generation import ( + fail_judgment_list, + fail_on_budget_or_pricing_error, + process_judgment_query, +) +from backend.workers.helpers import close_quietly logger = structlog.get_logger(__name__) -TOP_K = 50 -"""Retrieval depth per query (per spec §13 cost guardrail). - -50 is the design target: enough docs to give the LLM useful relevance -contrast across the rubric scale, low enough that one call per query keeps -the tutorial cost under $1.""" - -_DOC_BODY_CHAR_LIMIT = 500 -"""Per-doc body truncation length (per spec §13). - -Bounds the input token count; the worker passes only the first 500 chars of -each doc body to the LLM. The doc body is just for the rubric judgment, not -for retrieval — full-text retention isn't needed.""" - - -def _build_doc_inputs( - hits: Sequence[Any], -) -> list[dict[str, str]]: - """Translate ``ScoredHit`` rows into ``[{doc_id, body}, ...]`` for the prompt. - - Body extraction prefers ``hit.source.body`` (the canonical text field); - falls back to a JSON-dumped ``hit.source`` when absent. The result is - trimmed to :data:`_DOC_BODY_CHAR_LIMIT` characters per doc. - """ - out: list[dict[str, str]] = [] - for hit in hits: - source = getattr(hit, "source", None) or {} - body_raw = source.get("body") if isinstance(source, dict) else None - if not body_raw: - # Fall back to a stable string form of the source. 
- import json as _json - - body_raw = _json.dumps(source, ensure_ascii=False) - body = str(body_raw) - if len(body) > _DOC_BODY_CHAR_LIMIT: - body = body[:_DOC_BODY_CHAR_LIMIT] - out.append({"doc_id": str(hit.doc_id), "body": body}) - return out - - -async def _safe_record_cost(redis: Redis, cost_usd: float) -> float | None: - """Record cost, swallowing transient Redis failures (judgment worker voice).""" - return await safe_record_cost( - redis, - cost_usd, - logger=logger, - log_message="judgment worker: record_cost failed (budget telemetry only)", - event_type="judgment_record_cost_failed", - ) - - -async def _process_query( - *, - db: Any, - redis: Redis, - openai_client: AsyncOpenAI, - judgment_list_id: str, - judgment_list: Any, - template: QueryTemplate, - template_row: Any, - query: Any, - adapter: Any, - bundle_system: str, - rubric_text: str, - model: str, - budget_usd: float, -) -> bool: - """Inner per-query routine. - - Returns: - ``True`` when judgments were persisted (success or already-judged - resume-skip) and ``False`` when the query was skipped for any - operational reason (search failed, LLM partial, empty hits). - The outer loop uses this to decide whether to mark the list - ``complete`` (all True) or ``failed`` with - ``failed_reason='PARTIAL_LLM_FAILURE'`` (any False). Per GPT-5.5 - cycle-8 C8-F1 — without the tracking, a partial-LLM-response - skip would leave the list ``complete`` with missing qrels and - the resume sweep would never pick it back up. - - Raises on budget / pricing failures so the outer loop can mark the - list ``failed`` with the specific reason. - """ - # Resume-skip: if this query already has ANY judgments, the prior worker - # pass either completed it OR was atomically rolled back. Because - # bulk_create_judgments + commit happens in one transaction after the - # LLM call, the only post-crash states are "0 rows" or "the full batch - # returned by the LLM". 
A hardcoded ``existing >= TOP_K`` would fail for - # queries that legitimately returned fewer than TOP_K hits (sparse - # indices, tutorial-scale datasets) and would re-spend OpenAI dollars. - # Per GPT-5.5 final review F2. - existing = await repo.count_judgments_for_list_and_query(db, judgment_list_id, query.id) - if existing > 0: - logger.info( - "query already judged, skipping", - event_type="judgment_skip_resume", - judgment_list_id=judgment_list_id, - query_id=query.id, - existing_count=existing, - ) - return True - - # Pre-call budget peek (spec FR-2 + GPT-5.5 cycle 1 F8). - if budget_usd > 0: - current = await peek_daily_total(redis) - est_max = estimated_max_call_cost(model) - if current + est_max > budget_usd: - raise BudgetExceededError( - f"current ${current:.4f} + estimated ${est_max:.4f} > budget ${budget_usd:.4f}" - ) - - # Render template + execute search. - default_params = compute_default_params(template_row) - try: - native = adapter.render(template, default_params, query.query_text) - # Override the adapter-generated query_id with our own so the - # search_batch response key matches what we expect. - native = NativeQuery(query_id=str(query.id), body=native.body) - hits_by_qid = await adapter.search_batch( - target=judgment_list.target, - queries=[native], - top_k=TOP_K, - strict_errors=False, - ) - except Exception as exc: - logger.warning( - "judgment worker: search failed for query, skipping", - event_type="judgment_search_failed", - judgment_list_id=judgment_list_id, - query_id=query.id, - error_type=type(exc).__name__, - error=str(exc), - ) - return False - - hits = hits_by_qid.get(str(query.id), []) - if not hits: - # Zero hits: not a worker failure — there's genuinely nothing to - # judge. Count this as success so the outer loop can still mark - # the list complete. The downstream qrels_loader returns ``{}`` - # for queries with no judgments, which run_trial handles - # gracefully. 
- logger.info( - "judgment worker: no hits for query, skipping LLM call", - event_type="judgment_no_hits", - judgment_list_id=judgment_list_id, - query_id=query.id, - ) - return True - - # Ordinal prompt-ids decouple engine-supplied doc_ids (which may contain - # XML-sensitive chars like ``<``, ``&``, ``"``) from the LLM's - # round-trippable identifier. Autoescape on the Jinja sandbox would - # otherwise render ```` as ```` — the - # LLM would echo back ``a&b`` and the worker's allowlist - # (which has ``a&b``) would drop it as spurious, producing a permanent - # zero-judgments outcome. Per GPT-5.5 cycle-6 C6-F1. - raw_docs = _build_doc_inputs(hits) - prompt_docs = [{"doc_id": f"item-{i}", "body": d["body"]} for i, d in enumerate(raw_docs)] - prompt_id_to_real = {f"item-{i}": d["doc_id"] for i, d in enumerate(raw_docs)} - expected_doc_ids = set(prompt_id_to_real.keys()) - - user_prompt = render_user_prompt( - rubric_text=rubric_text, - query_text=query.query_text, - docs=prompt_docs, - ) - - # Spec FR-3c: "The actual prompt sent to OpenAI MUST include this rubric - # in full as part of the system prompt." We append the per-list rubric - # to the operator-fixed system message so the rubric appears at the - # top of the instruction hierarchy AND inside the user message's - # delimiter block (defense in depth). - system_prompt = f"{bundle_system}\n\n\n{rubric_text}\n" - - try: - result = await rate_query_batch( - client=openai_client, - model=model, - system_prompt=system_prompt, - user_prompt=user_prompt, - expected_doc_ids=expected_doc_ids, - ) - except ( - openai.AuthenticationError, - openai.PermissionDeniedError, - openai.BadRequestError, - openai.NotFoundError, - ): - # Persistent provider misconfiguration (bad key, model id, endpoint, - # ZDR enrollment denied, etc.). 
No subsequent query will succeed — - # propagate so the outer handler marks the list failed with - # ``failed_reason='UNEXPECTED:'`` rather than silently - # producing a `complete` list with zero judgments. Per GPT-5.5 - # cycle-2 C2-F1. - raise - except Exception as exc: - # Per-query operational failure (rate-limit exhaustion after retries, - # 5xx after retries, malformed JSON after retries). Subsequent queries - # may still succeed; isolate this one. - logger.warning( - "judgment worker: LLM call failed for query, skipping", - event_type="judgment_llm_failed", - judgment_list_id=judgment_list_id, - query_id=query.id, - error_type=type(exc).__name__, - error=str(exc), - ) - return False - - # All-or-nothing persistence: confirm the response is a *set-equal* - # match for the expected doc_ids. A simple ``len(ratings) < - # len(expected_doc_ids)`` check would still admit duplicates like - # ``[d1, d1]`` when expected was ``{d1, d2}`` — the UNIQUE constraint - # would then admit only the first row and resume-skip would strand d2 - # forever (per GPT-5.5 cycle-3 C3-F1). Require exact set equality - # AND that the LLM did not repeat any doc_id (no duplicates). - returned_ids = [r.doc_id for r in result.ratings] - returned_set = set(returned_ids) - if returned_set != expected_doc_ids or len(returned_ids) != len(returned_set): - logger.warning( - "judgment worker: LLM response not set-equal to expected docs; " - "skipping partial persist for retry", - event_type="judgment_partial_response", - judgment_list_id=judgment_list_id, - query_id=query.id, - expected=len(expected_doc_ids), - returned_unique=len(returned_set), - returned_total=len(returned_ids), - ) - # Still record the cost — we paid for the call. The retry pays again, - # but the alternative (permanent partial state) is worse. 
- await _safe_record_cost(redis, result.cost_usd) - return False - - rater_ref = f"openai:{result.model}" - rows = [ - { - "id": str(uuid_utils.uuid7()), - "judgment_list_id": judgment_list_id, - "query_id": str(query.id), - # Map prompt-only ordinal id (``item-N``) back to the real - # engine-supplied doc_id for DB persistence (C6-F1). - "doc_id": prompt_id_to_real[r.doc_id], - "rating": r.rating, - "source": "llm", - "rater_ref": rater_ref, - "notes": r.rationale, - } - for r in result.ratings - ] - # Persist FIRST, then record cost. If Redis is transiently unavailable - # at the record_cost step we'd otherwise drop the already-paid-for - # ratings (per GPT-5.5 cycle-2 C2-F3). Order swap means we may - # under-count daily spend if Redis flaps, which is recoverable on - # rollover; losing paid-for judgments is not. - if rows: - await repo.bulk_create_judgments(db, rows) - await db.commit() - - new_total = await _safe_record_cost(redis, result.cost_usd) - - logger.info( - "judgment query processed", - event_type="judgment_query_complete", - judgment_list_id=judgment_list_id, - query_id=query.id, - ratings_count=len(result.ratings), - input_tokens=result.input_tokens, - output_tokens=result.output_tokens, - cost_usd=result.cost_usd, - running_total_usd=new_total, - duration_ms=result.duration_ms, - ) - return True - - async def generate_judgments_llm(ctx: dict[str, Any], judgment_list_id: str) -> None: """Arq entry point — run the LLM judge pipeline for one judgment list. 
@@ -382,7 +102,7 @@ async def generate_judgments_llm(ctx: dict[str, Any], judgment_list_id: str) -> cluster = await repo.get_cluster(db, judgment_list.cluster_id) if cluster is None: - await _fail_list( + await fail_judgment_list( db, judgment_list_id, "CLUSTER_NOT_FOUND", @@ -393,7 +113,7 @@ async def generate_judgments_llm(ctx: dict[str, Any], judgment_list_id: str) -> if judgment_list.current_template_id is not None: template_row = await repo.get_query_template(db, judgment_list.current_template_id) if template_row is None: - await _fail_list(db, judgment_list_id, "TEMPLATE_NOT_FOUND") + await fail_judgment_list(db, judgment_list_id, "TEMPLATE_NOT_FOUND") return queries = await repo.list_queries_for_set(db, judgment_list.query_set_id) @@ -407,7 +127,7 @@ async def generate_judgments_llm(ctx: dict[str, Any], judgment_list_id: str) -> api_key = settings.openai_api_key if not api_key: async with factory() as db: - await _fail_list(db, judgment_list_id, "OPENAI_NOT_CONFIGURED") + await fail_judgment_list(db, judgment_list_id, "OPENAI_NOT_CONFIGURED") return model = settings.openai_model # Pricing must be known so the budget gate can fire honestly. 
@@ -415,7 +135,7 @@ async def generate_judgments_llm(ctx: dict[str, Any], judgment_list_id: str) -> estimated_max_call_cost(model) except UnknownModelPricingError: async with factory() as db: - await _fail_list(db, judgment_list_id, "UNKNOWN_MODEL_PRICING") + await fail_judgment_list(db, judgment_list_id, "UNKNOWN_MODEL_PRICING") return openai_client = AsyncOpenAI(api_key=api_key, base_url=settings.openai_base_url) @@ -436,7 +156,7 @@ async def generate_judgments_llm(ctx: dict[str, Any], judgment_list_id: str) -> for query in queries: async with factory() as db: try: - ok = await _process_query( + ok = await process_judgment_query( db=db, redis=redis_client, openai_client=openai_client, @@ -453,25 +173,10 @@ async def generate_judgments_llm(ctx: dict[str, Any], judgment_list_id: str) -> ) if not ok: skipped_query_ids.append(str(query.id)) - except BudgetExceededError as exc: - logger.warning( - "judgment worker: budget exceeded — aborting loop", - event_type="judgment_budget_exceeded", - judgment_list_id=judgment_list_id, - error=str(exc), + except (BudgetExceededError, UnknownModelPricingError) as exc: + await fail_on_budget_or_pricing_error( + factory, judgment_list_id, exc, logger=logger, event_prefix="judgment" ) - async with factory() as db2: - await _fail_list(db2, judgment_list_id, "OPENAI_BUDGET_EXCEEDED") - return - except UnknownModelPricingError as exc: - logger.warning( - "judgment worker: unknown model pricing — aborting loop", - event_type="judgment_unknown_pricing", - judgment_list_id=judgment_list_id, - error=str(exc), - ) - async with factory() as db2: - await _fail_list(db2, judgment_list_id, "UNKNOWN_MODEL_PRICING") return # All queries processed. 
If any query was skipped (search failed, @@ -518,7 +223,7 @@ async def generate_judgments_llm(ctx: dict[str, Any], judgment_list_id: str) -> ) try: async with factory() as db: - await _fail_list(db, judgment_list_id, f"UNEXPECTED:{type(exc).__name__}") + await fail_judgment_list(db, judgment_list_id, f"UNEXPECTED:{type(exc).__name__}") except Exception: # noqa: BLE001 — defensive; nothing left to do logger.exception( "judgment worker: failed to record terminal status", @@ -527,11 +232,3 @@ async def generate_judgments_llm(ctx: dict[str, Any], judgment_list_id: str) -> finally: await close_quietly(openai_client, logger=logger, label="openai client") await close_quietly(redis_client, logger=logger, label="redis") - - -async def _fail_list(db: Any, judgment_list_id: str, failed_reason: str) -> None: - """Helper: flip a list to ``status='failed'`` with a structured reason.""" - await repo.update_judgment_list_status( - db, judgment_list_id, status="failed", failed_reason=failed_reason - ) - await db.commit() diff --git a/backend/workers/judgments_ubi.py b/backend/workers/judgments_ubi.py index 6fdab0d5..fa366d4d 100644 --- a/backend/workers/judgments_ubi.py +++ b/backend/workers/judgments_ubi.py @@ -100,14 +100,19 @@ from backend.app.llm.budget_gate import ( BudgetExceededError, peek_daily_total, + safe_record_cost, ) from backend.app.llm.cost_model import UnknownModelPricingError, estimated_max_call_cost from backend.app.llm.openai_judge import rate_query_batch from backend.app.llm.prompt_loader import load_judgment_prompts, render_user_prompt from backend.app.services.cluster import build_adapter +from backend.app.services.judgment_generation import ( + fail_judgment_list, + fail_on_budget_or_pricing_error, +) from backend.app.services.ubi_errors import UbiNotEnabledError from backend.app.services.ubi_reader import UbiReader -from backend.workers.helpers import close_quietly, safe_record_cost +from backend.workers.helpers import close_quietly logger = 
structlog.get_logger(__name__) @@ -358,12 +363,12 @@ async def generate_judgments_from_ubi(ctx: dict[str, Any], judgment_list_id: str event_type="ubi_missing_generation_params", judgment_list_id=judgment_list_id, ) - await _fail_list(db, judgment_list_id, "MISSING_GENERATION_PARAMS") + await fail_judgment_list(db, judgment_list_id, "MISSING_GENERATION_PARAMS") return cluster = await repo.get_cluster(db, judgment_list.cluster_id) if cluster is None: - await _fail_list(db, judgment_list_id, "CLUSTER_NOT_FOUND") + await fail_judgment_list(db, judgment_list_id, "CLUSTER_NOT_FOUND") return query_set_rows = await repo.list_queries_for_set(db, judgment_list.query_set_id) @@ -396,7 +401,7 @@ async def generate_judgments_from_ubi(ctx: dict[str, Any], judgment_list_id: str ) except UbiNotEnabledError as exc: async with factory() as db: - await _fail_list(db, judgment_list_id, "UBI_NOT_ENABLED") + await fail_judgment_list(db, judgment_list_id, "UBI_NOT_ENABLED") logger.warning( "ubi worker: ubi not enabled mid-run", event_type="ubi_not_enabled_mid_run", @@ -409,7 +414,7 @@ async def generate_judgments_from_ubi(ctx: dict[str, Any], judgment_list_id: str # Race fallback: preflight U-D2 catches the obvious case; this # fires only when the in-flight window's data vanished. 
async with factory() as db: - await _fail_list(db, judgment_list_id, "UBI_INSUFFICIENT_DATA") + await fail_judgment_list(db, judgment_list_id, "UBI_INSUFFICIENT_DATA") logger.info( "ubi worker: empty features after probe — race fallback fired", event_type="ubi_empty_features_race", @@ -520,20 +525,11 @@ def _lookup_query_text(query_id: str) -> str: rated_queries = {qid for qid, _doc in ratings} all_scoped_queries = {qid for qid, _doc in scoped_features} sparse_skip_count = len(all_scoped_queries - rated_queries) - except BudgetExceededError as exc: - async with factory() as db: - await _fail_list(db, judgment_list_id, "OPENAI_BUDGET_EXCEEDED") - logger.warning( - "ubi worker: hybrid budget exceeded mid-loop", - event_type="ubi_budget_exceeded", - judgment_list_id=judgment_list_id, - error=str(exc), + except (BudgetExceededError, UnknownModelPricingError) as exc: + await fail_on_budget_or_pricing_error( + factory, judgment_list_id, exc, logger=logger, event_prefix="ubi" ) return - except UnknownModelPricingError: - async with factory() as db: - await _fail_list(db, judgment_list_id, "UNKNOWN_MODEL_PRICING") - return # Build rows. Pair source = 'click' (pure UBI rating) if the inner # converter rated it; 'llm' if the hybrid LLM-fill callback filled it. 
@@ -609,7 +605,7 @@ def _lookup_query_text(query_id: str) -> str: ) try: async with factory() as db: - await _fail_list(db, judgment_list_id, f"UNEXPECTED:{type(exc).__name__}") + await fail_judgment_list(db, judgment_list_id, f"UNEXPECTED:{type(exc).__name__}") except Exception: # noqa: BLE001 logger.exception( "ubi worker: failed to record terminal status", @@ -693,14 +689,6 @@ def _build_converter( # ---------------------------------------------------------------------------- -async def _fail_list(db: Any, judgment_list_id: str, failed_reason: str) -> None: - """Helper: flip a list to ``status='failed'`` with a structured reason.""" - await repo.update_judgment_list_status( - db, judgment_list_id, status="failed", failed_reason=failed_reason - ) - await db.commit() - - async def _write_calibration_and_complete( db: Any, *, diff --git a/docs/00_overview/implemented_features/2026_05_29_feat_ubi_judgments/feature_spec.md b/docs/00_overview/implemented_features/2026_05_29_feat_ubi_judgments/feature_spec.md index 6bfabfad..c3f2b39e 100644 --- a/docs/00_overview/implemented_features/2026_05_29_feat_ubi_judgments/feature_spec.md +++ b/docs/00_overview/implemented_features/2026_05_29_feat_ubi_judgments/feature_spec.md @@ -738,7 +738,7 @@ All glossary keys must be added to [`ui/src/lib/glossary.ts`](../../../../../ui/ - Boot-time resume sweep at `backend/workers/all.py` re-enqueues any `generating` UBI list when the worker boots. - Terminal `failed` only on global failures. - **Operability:** - - Every worker log line includes `judgment_list_id`, `event_type` (one of: `ubi_read_complete`, `ubi_converter_complete`, `ubi_persist_complete`, `ubi_resume_skip`, `ubi_per_query_skipped`, `ubi_list_complete`, `ubi_list_failed`, `ubi_budget_exceeded`). 
+ - Every worker log line includes `judgment_list_id` and `event_type` (one of: `ubi_read_complete`, `ubi_converter_complete`, `ubi_persist_complete`, `ubi_resume_skip`, `ubi_per_query_skipped`, `ubi_list_complete`, `ubi_list_failed`, `ubi_budget_exceeded`, `ubi_unknown_pricing`). (`ubi_unknown_pricing` was added when the shared `fail_on_budget_or_pricing_error` service helper unified the budget/pricing failure path across both judgment workers; before that, the UBI unknown-pricing path marked the list `failed` with `UNKNOWN_MODEL_PRICING` but emitted no log line.) - `/healthz` is unaffected — UBI doesn't introduce a startup-time check (the readiness probe is per-request, not boot-time). - Metrics surfaced on the judgment-list detail page: `judgment_count`, `source_breakdown`, `calibration.coverage_pct`, `calibration.llm_fill_calls` (hybrid only). - **Accessibility / usability:**