diff --git a/app/features/demo/hitl.py b/app/features/demo/hitl.py new file mode 100644 index 00000000..32a19edb --- /dev/null +++ b/app/features/demo/hitl.py @@ -0,0 +1,95 @@ +"""HITL decision relay for the showcase pipeline (E5, issue #411). + +A single-slot, in-memory store that lets the Showcase HITL step card relay an +operator's Approve/Reject decision back to the in-flight pipeline. The browser +POSTs ``/demo/hitl-decision`` (demo slice); :func:`resolve` records the decision +and wakes the waiting step, which then forwards the real decision to the agents +HITL gate (``POST /agents/sessions/{id}/approve`` with ``approved=true|false``). + +This is module-level mutable state. It is SAFE because +``app.features.demo.service._pipeline_lock`` enforces exactly one pipeline per +process, and ``step_agent_hitl_flow`` registers at most one pending action per +run (precedent for module-level demo state: the lock itself, ``service.py:19``). +Defensive anyway: :func:`register` overwrites any stale slot from a crashed run +so the next run can never wedge, and the step clears the slot in a ``finally``. +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from typing import Literal + +from app.core.logging import get_logger + +logger = get_logger(__name__) + +Decision = Literal["approved", "rejected"] +ResolveOutcome = Literal["applied", "already_decided", "not_found"] + + +@dataclass +class _PendingDecision: + """The one open decision window, or ``None`` when no step is awaiting.""" + + action_id: str + event: asyncio.Event = field(default_factory=asyncio.Event) + decision: Decision | None = None + reason: str | None = None + + +_slot: _PendingDecision | None = None # module-level; one pipeline at a time + + +def register(action_id: str) -> None: + """Open the decision window for ``action_id``. + + Overwrites any stale slot left by a crashed run so a wedged slot can never + block the next run. Called by ``step_agent_hitl_flow`` immediately before it + yields the intermediate ``awaiting_approval`` event. + """ + global _slot + _slot = _PendingDecision(action_id=action_id) + + +def resolve(action_id: str, decision: Decision, reason: str | None = None) -> ResolveOutcome: + """Record the operator's decision; called by ``POST /demo/hitl-decision``. + + Returns ``"not_found"`` when no window is open for ``action_id`` (nothing + pending under that id), ``"already_decided"`` when a decision already + landed, and ``"applied"`` on success. + """ + if _slot is None or _slot.action_id != action_id: + return "not_found" + if _slot.decision is not None: + return "already_decided" + _slot.decision = decision + _slot.reason = reason + _slot.event.set() + logger.info("demo.hitl_decision_resolved", action_id=action_id, decision=decision) + return "applied" + + +async def wait_for_decision(action_id: str, timeout: float) -> tuple[Decision, str | None] | None: + """Block up to ``timeout`` seconds for an operator decision. + + Returns the ``(decision, reason)`` pair when the operator decided in time, + or ``None`` when the window lapsed (the caller then auto-approves) or when + no slot is open for ``action_id`` (defensive -- the step always registers + first). + """ + if _slot is None or _slot.action_id != action_id: + return None + try: + await asyncio.wait_for(_slot.event.wait(), timeout=timeout) + except TimeoutError: + return None + if _slot.decision is None: # defensive: event set without a decision + return None + return (_slot.decision, _slot.reason) + + +def clear() -> None: + """Close the decision window (called from the step's ``finally``).""" + global _slot + _slot = None diff --git a/app/features/demo/models.py b/app/features/demo/models.py index 4897f621..207aed33 100644 --- a/app/features/demo/models.py +++ b/app/features/demo/models.py @@ -130,8 +130,12 @@ class ShowcaseWorkspace(TimestampMixin, Base): ) # Version of the workspace config + story-slot schema (umbrella #406 # junk-drawer mitigation). Bump the ORM default when a slot shape changes. + # E5 (#411) bumped 1 -> 2: it widened the approval_events.decision enum + # (+timed_out), added "probe" to rag_events.event, and added the additive + # entry keys documented below. server_default stays text("1") -- no + # migration; old rows legitimately read 1. config_schema_version: Mapped[int] = mapped_column( - Integer, nullable=False, default=1, server_default=text("1") + Integer, nullable=False, default=2, server_default=text("1") ) # ── E1 (#407) — replay provenance ───────────────────────────────────── @@ -152,13 +156,22 @@ class ShowcaseWorkspace(TimestampMixin, Base): # user_scope (E3 #409 writes) — dict: operator-selected focus, # {"store_id": int, "product_id": int} (additive keys # allowed later). - # approval_events (E5 #411 writes) — list[dict], append-only: + # approval_events (E5 #411 writes — schema v2) — list[dict], append-only: # {"action_id": str, "tool_name": str, - # "decision": "approved"|"rejected", - # "decided_at": iso8601-str, "session_id": str}. - # rag_events (E5 #411 writes) — list[dict], append-only: - # {"event": "index"|"retrieve"|"skip", "detail": str, - # "count": int, "occurred_at": iso8601-str}. + # "decision": "approved"|"rejected"|"timed_out", + # "decided_at": iso8601-str, "session_id": str, + # # v2 additive (config_schema_version >= 2): + # "auto_approved": bool, "reason": str|None, + # "execution_status": str|None, + # "tool_call_summary": {"description": str, + # "arguments_keys": list[str]}, + # "transcript_summary": str, "tokens_used": int, + # "tool_calls_count": int}. + # rag_events (E5 #411 writes — schema v2) — list[dict], append-only: + # {"event": "probe"|"index"|"retrieve"|"skip", + # "status": "pass"|"warn"|"skip", "detail": str, + # "count": int, "occurred_at": iso8601-str, + # "provider": str|None, "reachable": bool|None}. # job_ids (later parallel epic) — list[str]: job / batch # sub-job ids the run submitted (soft references). # phase_summaries (later parallel epic) — list[dict], one per phase: diff --git a/app/features/demo/pipeline.py b/app/features/demo/pipeline.py index a91c1dd9..4602e8c8 100644 --- a/app/features/demo/pipeline.py +++ b/app/features/demo/pipeline.py @@ -40,7 +40,7 @@ from app.core.config import get_settings from app.core.logging import get_logger from app.core.problem_details import EMBEDDING_AUTH_CODE, ERROR_TYPES -from app.features.demo import workspace +from app.features.demo import hitl, workspace from app.features.demo.schemas import DemoRunRequest, StepEvent, StepStatus, UserScope from app.shared.model_taxonomy import KNOWN_MODEL_TYPES from app.shared.seeder.config import ScenarioPreset @@ -305,7 +305,7 @@ class DemoContext: # PRP-41 — additive HITL approval state, populated only by # step_agent_hitl_flow on SHOWCASE_RICH. Remain None on every other path. approval_action_id: str | None = None - agent_approval_decision: str | None = None # "executed"|"rejected"|"expired"|"timed_out" + agent_approval_decision: str | None = None # executed|rejected|external_4xx|timed_out # E1 (#390) -- workspace persistence. Set only on preservation="keep" runs # (and only when the row insert succeeded); None on ephemeral runs. workspace_id: str | None = None @@ -322,6 +322,13 @@ class DemoContext: # metric). Defaults to the all-legacy ResolvedRunConfig so a frame without # the new fields behaves byte-identically. run_config: ResolvedRunConfig = field(default_factory=ResolvedRunConfig) + # E5 (#411) -- story-capture accumulators. Appended by step_agent_hitl_flow + # and the three knowledge steps on SHOWCASE_RICH; finalize_workspace + # persists them to the workspace slots (empty list -> slot stays NULL). + # Always append in-memory (cheap, cannot fail); only the DB write is + # fallible (warn-and-continue). + approval_events: list[dict[str, Any]] = field(default_factory=list) + rag_events: list[dict[str, Any]] = field(default_factory=list) # ============================================================================= @@ -379,10 +386,13 @@ def _llm_key_present() -> bool: return False -# PRP-41 — HITL approval flow constants. Display delay gives the visitor a -# window to click Approve on the FE before the backend auto-fires; the hard +# PRP-41 / E5 (#411) — HITL approval flow constants. The decision window is the +# span the FE renders Approve + Reject and the step waits on the in-memory relay +# (D3: 10 s -- 3 s was unclickable by a human; 10 s stays well under the 90 s +# hard timeout and the 180 s soft budget). It is emitted to the FE as +# ``data.decision_window_s`` so the countdown never hardcodes it. The hard # timeout is the load-bearing fallback so a hung agent never stops the demo. -_APPROVAL_DISPLAY_DELAY_S = 3.0 +_APPROVAL_DECISION_WINDOW_S = 10.0 _APPROVAL_HARD_TIMEOUT_S = 90.0 _HITL_PROMPT = ( "Save a 10% price-cut scenario plan for the demo-production model " @@ -390,6 +400,53 @@ def _llm_key_present() -> bool: ) +def _record_approval_event( + ctx: DemoContext, + *, + action_id: str, + tool_name: str, + decision: str, + session_id: str, + auto_approved: bool, + reason: str | None, + execution_status: str | None, + pending_action: dict[str, Any], + transcript_summary: str, + tokens_used: int, + tool_calls_count: int, +) -> None: + """Append one approval-event entry to ``ctx.approval_events`` (E5, #411). + + ``tool_call_summary`` carries the action description + argument KEYS only -- + never values (security-patterns.md: never echo full payloads; values may + embed user-supplied text). Schema v2 -- see DOMAIN_MODEL § showcase_workspace. + """ + raw_args = pending_action.get("arguments") + arguments_keys = sorted(raw_args) if isinstance(raw_args, dict) else [] + description_raw = pending_action.get("description") + description = description_raw if isinstance(description_raw, str) else "" + ctx.approval_events.append( + { + "action_id": action_id, + "tool_name": tool_name, + "decision": decision, + "decided_at": datetime.now(UTC).isoformat(), + "session_id": session_id, + # -- E5 (#411) additive (config_schema_version >= 2) -- + "auto_approved": auto_approved, + "reason": reason, + "execution_status": execution_status, + "tool_call_summary": { + "description": description, + "arguments_keys": arguments_keys, + }, + "transcript_summary": transcript_summary, + "tokens_used": tokens_used, + "tool_calls_count": tool_calls_count, + } + ) + + # PRP-40 — artifact-key parser for /scenarios/* run_id resolution. Two ID # spaces: model_run.run_id (32-char UUID-hex) vs scenarios.run_id (12-char # artifact key parsed from `model_{KEY}.joblib`). Memory anchor: @@ -1670,6 +1727,35 @@ async def step_multi_plan_compare(ctx: DemoContext, client: _Client) -> StepResu ) +def _record_rag_event( + ctx: DemoContext, + *, + event: str, + status: str, + detail: str, + count: int = 0, + provider: str | None = None, + reachable: bool | None = None, +) -> None: + """Append one RAG-event entry to ``ctx.rag_events`` (E5, #411). + + Called once on EVERY return path of the three knowledge steps so the + workspace story records the knowledge outcome (probe / index / retrieve / + skip) with the provider state. Schema v2 -- see DOMAIN_MODEL. + """ + ctx.rag_events.append( + { + "event": event, + "status": status, + "detail": detail, + "count": count, + "occurred_at": datetime.now(UTC).isoformat(), + "provider": provider, + "reachable": reachable, + } + ) + + async def step_embedding_provider_probe(ctx: DemoContext, client: _Client) -> StepResult: """PRP-40 — probe the configured embedding provider. Always PASS. @@ -1689,6 +1775,9 @@ async def step_embedding_provider_probe(ctx: DemoContext, client: _Client) -> St if reachable else f"provider={provider} unreachable — knowledge phase will skip" ) + _record_rag_event( + ctx, event="probe", status="pass", detail=detail, provider=provider, reachable=reachable + ) return ("pass", detail, {"provider": provider, "reachable": reachable}) @@ -1699,7 +1788,15 @@ async def step_rag_index_subset(ctx: DemoContext, client: _Client) -> StepResult Uses the additive ``path_prefix`` field on IndexProjectDocsRequest so the blast radius stays scoped to the user-guide subset. """ + provider = get_settings().rag_embedding_provider if ctx.embedding_unreachable: + _record_rag_event( + ctx, + event="skip", + status="skip", + detail="embedding provider unreachable", + provider=provider, + ) return ("skip", "embedding provider unreachable", {}) try: @@ -1721,6 +1818,13 @@ async def step_rag_index_subset(ctx: DemoContext, client: _Client) -> StepResult # context so the retrieve probe skips too, without a second 401 round-trip. if _is_embedding_auth_error(exc): ctx.embedding_unreachable = True + _record_rag_event( + ctx, + event="skip", + status="skip", + detail="embedding provider rejected credentials", + provider=provider, + ) return ("skip", "embedding provider rejected credentials", {}) raise results = body.get("results") or [] @@ -1734,9 +1838,18 @@ async def step_rag_index_subset(ctx: DemoContext, client: _Client) -> StepResult for r in results if isinstance(r, dict) and r.get("source_path") in _USER_GUIDE_CURATED_FILES ) + detail = f"files_indexed={curated_hits}/5 chunks={total_chunks} failed={failed}" + _record_rag_event( + ctx, + event="index", + status="pass", + detail=detail, + count=total_chunks, + provider=provider, + ) return ( "pass", - f"files_indexed={curated_hits}/5 chunks={total_chunks} failed={failed}", + detail, { "total_files": int(body.get("total_files", 0)), "indexed": indexed, @@ -1755,7 +1868,15 @@ async def step_rag_retrieve_probe(ctx: DemoContext, client: _Client) -> StepResu SKIPs when ``ctx.embedding_unreachable``. WARN (not FAIL) on zero hits so a green-but-empty corpus still lets the pipeline go green. """ + provider = get_settings().rag_embedding_provider if ctx.embedding_unreachable: + _record_rag_event( + ctx, + event="skip", + status="skip", + detail="embedding provider unreachable", + provider=provider, + ) return ("skip", "embedding provider unreachable", {}) try: @@ -1770,13 +1891,24 @@ async def step_rag_retrieve_probe(ctx: DemoContext, client: _Client) -> StepResu # in case retrieve is reached with a freshly-rejecting key. if _is_embedding_auth_error(exc): ctx.embedding_unreachable = True + _record_rag_event( + ctx, + event="skip", + status="skip", + detail="embedding provider rejected credentials", + provider=provider, + ) return ("skip", "embedding provider rejected credentials", {}) raise results = body.get("results") or [] if not results: + detail = "no hits — corpus indexed but query did not match" + _record_rag_event( + ctx, event="retrieve", status="warn", detail=detail, count=0, provider=provider + ) return ( "warn", - "no hits — corpus indexed but query did not match", + detail, { "results_count": 0, "total_chunks_searched": body.get("total_chunks_searched", 0), @@ -1789,9 +1921,18 @@ async def step_rag_retrieve_probe(ctx: DemoContext, client: _Client) -> StepResu score = float(score_raw) except (TypeError, ValueError): score = 0.0 + detail = f"top hit: {title} (score={score:.3f})" + _record_rag_event( + ctx, + event="retrieve", + status="pass", + detail=detail, + count=len(results), + provider=provider, + ) return ( "pass", - f"top hit: {title} (score={score:.3f})", + detail, { "results_count": len(results), "top_source_path": title, @@ -2414,7 +2555,7 @@ async def step_cleanup(ctx: DemoContext, client: _Client) -> StepResult: async def step_agent_hitl_flow(ctx: DemoContext, client: _Client) -> StepResult: - """PRP-41 — HITL approval round-trip on the experiment agent. + """PRP-41 / E5 (#411) — HITL approval round-trip on the experiment agent. Flow: 1. ``_llm_key_present()`` -> skip when no key. @@ -2424,22 +2565,25 @@ async def step_agent_hitl_flow(ctx: DemoContext, client: _Client) -> StepResult: on the ``save_scenario`` entry in ``agent_require_approval``. The chat response carries ``pending_approval=true`` + ``pending_action: PendingAction``. - 4. ``client.yield_event(...)`` an intermediate step_complete with - ``status='running'`` + ``awaiting_approval=true`` so the FE can - render the Approve button. - 5. Sleep ``_APPROVAL_DISPLAY_DELAY_S`` -- a one-click FE Approve may - pre-empt the auto-approve in this window. - 6. ``POST /agents/sessions/{id}/approve`` with ``{action_id, - approved: true}``. Absorb 4xx (the FE pre-empted; the action was - already consumed). - 7. Terminal: ``pass`` with the approval decision in step.data. + 4. ``hitl.register(action_id)`` then ``client.yield_event(...)`` an + intermediate step_complete with ``status='running'`` + + ``awaiting_approval=true`` + ``decision_url`` + ``decision_window_s`` + so the FE renders Approve + Reject (E5). + 5. ``hitl.wait_for_decision(...)`` up to ``_APPROVAL_DECISION_WINDOW_S`` + -- the operator's Approve/Reject relayed via POST /demo/hitl-decision; + ``None`` on window lapse -> auto-approve. + 6. ``POST /agents/sessions/{id}/approve`` with the REAL decision + (``approved: true|false`` + optional reason). Absorb 4xx (an operator + pre-empted the agents endpoint directly; the action was consumed). + 7. Append one ``approval_events`` entry, then terminal ``pass`` -- a + reject is GREEN by design (D5): the gated tool never executed. Skip-gracefully on every error path (session-create / chat / approve failure, or the agent never triggers ``save_scenario``). Never raises. Hard timeout: if the elapsed time exceeds ``_APPROVAL_HARD_TIMEOUT_S`` before step (6) completes, returns ``skip`` with - ``approval_decision='timed_out'``. + ``approval_decision='timed_out'`` (and records a ``timed_out`` entry). """ key_present = _llm_key_present() logger.info("demo.agent_hitl_flow.key_present", present=key_present) @@ -2489,6 +2633,12 @@ async def step_agent_hitl_flow(ctx: DemoContext, client: _Client) -> StepResult: tokens_used = int(chat_body.get("tokens_used", 0)) raw_tool_calls = chat_body.get("tool_calls", []) tool_count = len(raw_tool_calls) if isinstance(raw_tool_calls, list) else 0 + # E5 (#411) -- transcript summary for the approval-events entry. Capped at + # 200 chars (precedent: the #335 failure-detail 300-char cap); never the + # full transcript (security-patterns.md). + transcript_summary = str(chat_body.get("message", ""))[:200] + raw_action_type = pending_action.get("action_type") + tool_name = raw_action_type if isinstance(raw_action_type, str) else "save_scenario" if not pending_approval or not pending_action: # The agent didn't trigger save_scenario (e.g. answered directly or @@ -2516,102 +2666,173 @@ async def step_agent_hitl_flow(ctx: DemoContext, client: _Client) -> StepResult: action_id: str = action_id_raw ctx.approval_action_id = action_id - # (4) -- intermediate event so the FE renders Approve. step_index / - # total_steps / phase_index / phase_total are stamped by the orchestrator - # when it drains the sink (see run_pipeline). - elapsed_ms = (time.monotonic() - started_at) * 1000.0 - client.yield_event( - StepEvent( - event_type="step_complete", - step_name="agent_hitl_flow", - step_index=0, - total_steps=0, - status="running", - detail="awaiting approval (auto-approve in 3 s)", - duration_ms=elapsed_ms, - data={ - "awaiting_approval": True, - "approval_url": f"/agents/sessions/{session_id}/approve", - "action_id": action_id, - "session_id": session_id, - "tokens_used": tokens_used, - "tool_calls_count": tool_count, - }, - phase_name=PHASE_AGENTS, - ) - ) - - # (5) -- display delay. - elapsed_after_intermediate = time.monotonic() - started_at - delay = max(0.0, _APPROVAL_DISPLAY_DELAY_S - elapsed_after_intermediate) - if delay > 0: - await asyncio.sleep(delay) - - # (5b) -- hard-timeout check BEFORE the approve POST. - elapsed_before_approve = time.monotonic() - started_at - if elapsed_before_approve > _APPROVAL_HARD_TIMEOUT_S: - ctx.agent_approval_decision = "timed_out" - return ( - "skip", - "approval timed out -- pipeline continued", - { - "session_id": session_id, - "action_id": action_id, - "approval_decision": "timed_out", - "tokens_used": tokens_used, - "tool_calls_count": tool_count, - "timed_out": True, - }, - ) - - # (6) -- POST /approve. Absorb 4xx (FE pre-empted) per Task 1 §5 #2: - # AgentService.approve_action returns 400 ("No pending action") when the - # action was already consumed by the FE's optimistic Approve click. - approval_decision = "executed" + # E5 (#411) -- open the decision window on the in-memory relay BEFORE the + # FE can see the action, then clear it on every exit (finally). The relay + # is the single intent channel: the FE Approve/Reject buttons POST + # /demo/hitl-decision (demo slice), this step waits on the relay, then + # forwards the REAL decision to the agents HITL gate. + hitl.register(action_id) try: - approve_body = await client.request( - "agent_hitl_flow[approve]", - "POST", - f"/agents/sessions/{session_id}/approve", - json_body={"action_id": action_id, "approved": True}, + # (4) -- intermediate event so the FE renders Approve + Reject. + # step_index / total_steps / phase_index / phase_total are stamped by + # the orchestrator when it drains the sink (see run_pipeline). D2 makes + # this event reach the browser DURING the window, not after it closes. + window_s = _APPROVAL_DECISION_WINDOW_S + elapsed_ms = (time.monotonic() - started_at) * 1000.0 + client.yield_event( + StepEvent( + event_type="step_complete", + step_name="agent_hitl_flow", + step_index=0, + total_steps=0, + status="running", + detail=f"awaiting approval (auto-approve in {int(window_s)} s)", + duration_ms=elapsed_ms, + data={ + "awaiting_approval": True, + # E5 -- the relay is the new intent channel; approval_url is + # kept for back-compat (an operator curl-ing it directly is + # still absorbed below as execution_status="external_4xx"). + "decision_url": "/demo/hitl-decision", + "decision_window_s": window_s, + "approval_url": f"/agents/sessions/{session_id}/approve", + "action_id": action_id, + "session_id": session_id, + "tokens_used": tokens_used, + "tool_calls_count": tool_count, + }, + phase_name=PHASE_AGENTS, + ) ) - raw_status = approve_body.get("status", "executed") - if isinstance(raw_status, str): - approval_decision = raw_status - except _StepError as exc: - if 400 <= exc.status_code < 500: - # FE pre-empted -- the approval already landed. Optimistic default. - logger.info( - "demo.agent_hitl_flow.approve_pre_empted", - session_id=session_id, + + # (5) -- wait up to the remaining window for an operator decision. + remaining = max(0.0, window_s - (time.monotonic() - started_at)) + operator = await hitl.wait_for_decision(action_id, timeout=remaining) + + # (5b) -- hard-timeout check BEFORE the approve POST (a hung agent / + # blocked window never stops the demo). timed_out -> skip + entry. + elapsed_before_approve = time.monotonic() - started_at + if elapsed_before_approve > _APPROVAL_HARD_TIMEOUT_S: + ctx.agent_approval_decision = "timed_out" + _record_approval_event( + ctx, action_id=action_id, - status_code=exc.status_code, + tool_name=tool_name, + decision="timed_out", + session_id=session_id, + auto_approved=False, + reason=None, + execution_status=None, + pending_action=pending_action, + transcript_summary=transcript_summary, + tokens_used=tokens_used, + tool_calls_count=tool_count, ) - approval_decision = "executed" - else: return ( "skip", - f"approve failed: {exc}", + "approval timed out -- pipeline continued", { "session_id": session_id, "action_id": action_id, + "approval_decision": "timed_out", "tokens_used": tokens_used, "tool_calls_count": tool_count, + "timed_out": True, }, ) - ctx.agent_approval_decision = approval_decision + # Resolve the operator's intent (None == window lapsed -> auto-approve). + auto_approved = operator is None + approved = operator is None or operator[0] == "approved" + reason = operator[1] if operator is not None else None + + # (6) -- forward the REAL decision to the agents HITL gate. Absorb 4xx + # (an operator pre-empted by curl-ing /agents/.../approve directly): + # AgentService.approve_action returns 400 once the action is consumed. + approve_json: dict[str, Any] = {"action_id": action_id, "approved": approved} + if reason: + approve_json["reason"] = reason + execution_status = "executed" if approved else "rejected" + try: + approve_body = await client.request( + "agent_hitl_flow[approve]", + "POST", + f"/agents/sessions/{session_id}/approve", + json_body=approve_json, + ) + raw_status = approve_body.get("status", execution_status) + if isinstance(raw_status, str): + execution_status = raw_status + except _StepError as exc: + if 400 <= exc.status_code < 500: + # Pre-empted -- the decision already landed via the agents API. + logger.info( + "demo.agent_hitl_flow.approve_pre_empted", + session_id=session_id, + action_id=action_id, + status_code=exc.status_code, + ) + execution_status = "external_4xx" + else: + return ( + "skip", + f"approve failed: {exc}", + { + "session_id": session_id, + "action_id": action_id, + "tokens_used": tokens_used, + "tool_calls_count": tool_count, + }, + ) + decision = "approved" if approved else "rejected" + # ctx mirror keeps the agents-API execution status (executed/rejected/ + # external_4xx); the slot entry below records the operator decision. + ctx.agent_approval_decision = execution_status + _record_approval_event( + ctx, + action_id=action_id, + tool_name=tool_name, + decision=decision, + session_id=session_id, + auto_approved=auto_approved, + reason=reason, + execution_status=execution_status, + pending_action=pending_action, + transcript_summary=transcript_summary, + tokens_used=tokens_used, + tool_calls_count=tool_count, + ) + finally: + hitl.clear() + + # (7) -- terminal. D5: a human rejection is a SUCCESSFUL demonstration of + # the HITL gate, not an error -- the run stays GREEN and the gated + # save_scenario never executed (no scenario_plan row written by the agent). + if not approved: + return ( + "pass", + "rejected by operator", + { + "session_id": session_id, + "action_id": action_id, + "approval_decision": "rejected", + "auto_approved": False, + "tokens_used": tokens_used, + "tool_calls_count": tool_count, + }, + ) return ( "pass", ( f"session={session_id[:8]}... tokens={tokens_used} " - f"tool_calls={tool_count} approved={approval_decision}" + f"tool_calls={tool_count} approved={execution_status}" ), { "session_id": session_id, "action_id": action_id, - "approval_decision": approval_decision, + "approval_decision": execution_status, + "auto_approved": auto_approved, "tokens_used": tokens_used, "tool_calls_count": tool_count, }, @@ -2908,8 +3129,33 @@ async def run_pipeline(app: FastAPI, req: DemoRunRequest) -> AsyncIterator[StepE status: StepStatus detail: str data: dict[str, Any] + # E5 (#411) — D2: run the step as a task and drain the intermediate + # event sink CONCURRENTLY with the in-flight step. PRP-41 drained + # the sink only AFTER the step returned, so the HITL step's + # ``awaiting_approval`` event reached the browser only once the + # decision window had already closed (the auto-approve had fired). + # Steps still execute strictly one at a time under the single lock; + # only event flushing overlaps the running step. + task = asyncio.ensure_future(fn(ctx, client)) try: - status, detail, data = await fn(ctx, client) + while True: + done, _pending = await asyncio.wait({task}, timeout=0.25) + # Drain + stamp the row's index/phase fields so the FE state + # machine processes buffered events as if the orchestrator + # emitted them. Order matters: an intermediate must land + # before the terminal so "awaiting_approval" precedes + # "approved" in the WS stream. + for ev in intermediate_events: + ev.step_index = index + ev.total_steps = total + ev.phase_index = phase_index + ev.phase_total = phase_total + ev.phase_name = phase_name + yield ev + intermediate_events.clear() + if done: + break + status, detail, data = task.result() except _StepError as exc: status, detail, data = "fail", str(exc), {} except (httpx.HTTPError, OSError) as exc: @@ -2927,19 +3173,25 @@ async def run_pipeline(app: FastAPI, req: DemoRunRequest) -> AsyncIterator[StepE f"unexpected error: {type(exc).__name__}: {exc}", {}, ) + finally: + # LOAD-BEARING (PRP Gotcha / quality Finding 3): the Stop button + # closes the WebSocket -> the async generator is closed, throwing + # GeneratorExit (a BaseException no except clause above catches) + # into the mid-step ``yield ev`` suspension point. This finally + # is the only hook that runs on EVERY exit path; without it the + # in-flight step task is orphaned ("Task was destroyed but it is + # pending") while the _Client closes underneath it. + if not task.done(): + task.cancel() duration_ms = (time.monotonic() - t0) * 1000 - # PRP-41 — drain any intermediate events the step buffered BEFORE - # the terminal step_complete. Stamp the row's index/phase fields - # so the FE state machine processes them as if they were emitted - # by the orchestrator. Order matters: intermediate events must - # land before the terminal so "awaiting_approval" precedes - # "approved" in the WS stream. + # Final flush: drain anything the step buffered after the last + # 0.25s tick (mid-step loop drained the rest). Keeps the + # intermediate-before-terminal ordering identical to pre-D2. for ev in intermediate_events: ev.step_index = index ev.total_steps = total ev.phase_index = phase_index ev.phase_total = phase_total - # phase_name is set by the step fn already, but mirror in case. ev.phase_name = phase_name yield ev intermediate_events.clear() diff --git a/app/features/demo/routes.py b/app/features/demo/routes.py index deaa8ac0..dc9d6b89 100644 --- a/app/features/demo/routes.py +++ b/app/features/demo/routes.py @@ -40,10 +40,13 @@ from app.core.database import get_db from app.core.exceptions import ConflictError, NotFoundError from app.core.logging import get_logger -from app.features.demo import link_health, service, workspace +from app.features.demo import hitl, link_health, service, workspace from app.features.demo.schemas import ( + ApprovalEventItem, + ApprovalEventsResponse, DemoRunRequest, DemoRunResult, + HitlDecisionRequest, StepEvent, WorkspaceDetailResponse, WorkspaceHealthResponse, @@ -86,6 +89,64 @@ async def run_demo_pipeline(request: Request, params: DemoRunRequest) -> DemoRun raise ConflictError(str(exc)) from exc +@router.post( + "/hitl-decision", + status_code=status.HTTP_204_NO_CONTENT, + summary="Relay an operator decision to the in-flight HITL step", + description=( + "Relay the Showcase HITL step card's Approve / Reject to the running " + "pipeline (E5, #411). The pipeline forwards the real decision to the " + "agents HITL gate. 404 when no matching action is pending; 409 when the " + "action was already decided; 422 on a malformed body." + ), +) +async def submit_hitl_decision(body: HitlDecisionRequest) -> None: + """Relay an operator Approve/Reject to the in-flight HITL step (E5, #411). + + Args: + body: The operator decision (action_id, approved/rejected, optional reason). + + Raises: + NotFoundError: When no HITL action is pending under ``action_id`` (404). + ConflictError: When the action was already decided (409). + """ + outcome = hitl.resolve(body.action_id, body.decision, body.reason) + if outcome == "not_found": + raise NotFoundError(message=f"No pending HITL action: {body.action_id}") + if outcome == "already_decided": + raise ConflictError(f"Action already decided: {body.action_id}") + + +@router.get( + "/approval-events", + response_model=ApprovalEventsResponse, + summary="Recent HITL approval events across saved workspaces", + description=( + "Flatten ``approval_events`` across the newest saved workspaces that " + "carry the slot, newest-workspace-first (E5, #411). An audit-glance " + "surface -- no pagination. Returns 200 + an empty list when none." + ), +) +async def list_hitl_approval_events( + db: AsyncSession = Depends(get_db), + limit: int = Query(default=50, ge=1, le=200, description="Maximum flattened entries."), +) -> ApprovalEventsResponse: + """List recent HITL approval events flattened across workspaces (E5, #411). + + Args: + db: Async database session from dependency. + limit: Maximum flattened entries to return (1-200). + + Returns: + The flattened approval events plus the returned count. + """ + events = await workspace.list_approval_events(db, limit=limit) + return ApprovalEventsResponse( + events=[ApprovalEventItem.model_validate(event) for event in events], + total=len(events), + ) + + @router.get( "/workspaces", response_model=WorkspaceListResponse, diff --git a/app/features/demo/schemas.py b/app/features/demo/schemas.py index 352770aa..70b1e8c3 100644 --- a/app/features/demo/schemas.py +++ b/app/features/demo/schemas.py @@ -506,3 +506,64 @@ class WorkspaceHealthResponse(BaseModel): dead: int = Field(..., ge=0, description="Count of references that probed dead (404).") unknown: int = Field(..., ge=0, description="Count of references whose probe was inconclusive.") checked_at: datetime = Field(default_factory=_utc_now, description="When the probes ran (UTC).") + + +class HitlDecisionRequest(BaseModel): + """Operator decision relay for the showcase HITL step (E5, issue #411). + + POSTed by the Showcase step card's Approve / Reject buttons to + ``POST /demo/hitl-decision``; the in-flight pipeline waits on the in-memory + relay and forwards the real decision to the agents HITL gate. HTTP-only + body -- every field is JSON-native (``str`` / ``Literal``), so the + model-level ``strict=True`` needs no ``Field(strict=False)`` override (the + AST policy walker fires only on date/datetime/time/UUID/Decimal). + ``extra="forbid"`` so a typo'd field 422s instead of silently no-opping. + """ + + model_config = ConfigDict(strict=True, extra="forbid") + + action_id: str = Field(..., min_length=1, description="Pending action to decide.") + decision: Literal["approved", "rejected"] = Field(..., description="Operator decision.") + reason: str | None = Field( + default=None, + max_length=500, + description="Optional reason (mirrors agents ApprovalRequest.reason).", + ) + + +class ApprovalEventItem(BaseModel): + """One flattened approval event for ``GET /demo/approval-events`` (E5, #411). + + Built from JSONB story-slot dicts (NOT ORM rows) -- tolerant typing with + defaults so a v1 entry (pre-E5 base keys only) still validates. Response + model: plain ``BaseModel``, NOT strict (strict mode is request-body policy). + """ + + workspace_id: str = Field(..., description="The workspace whose run recorded this event.") + workspace_name: str | None = Field(default=None, description="The workspace's optional label.") + action_id: str | None = Field(default=None, description="The decided action's id.") + tool_name: str | None = Field(default=None, description="The gated tool (e.g. save_scenario).") + decision: str | None = Field(default=None, description="approved / rejected / timed_out.") + decided_at: str | None = Field(default=None, description="ISO8601 UTC decision timestamp.") + session_id: str | None = Field( + default=None, description="Agent session the action belonged to." + ) + auto_approved: bool | None = Field( + default=None, description="True when the decision window lapsed." + ) + reason: str | None = Field(default=None, description="Operator-supplied reason (reject).") + execution_status: str | None = Field( + default=None, description="Agents-API status: executed / rejected / external_4xx." + ) + transcript_summary: str | None = Field( + default=None, description="Agent chat message (<=200 chars)." + ) + + +class ApprovalEventsResponse(BaseModel): + """Recent HITL approval events flattened across workspaces (E5, #411).""" + + events: list[ApprovalEventItem] = Field( + ..., description="Flattened approval events, newest workspace first; empty when none." + ) + total: int = Field(..., ge=0, description="Number of flattened entries returned (capped).") diff --git a/app/features/demo/tests/test_hitl.py b/app/features/demo/tests/test_hitl.py new file mode 100644 index 00000000..34f38e97 --- /dev/null +++ b/app/features/demo/tests/test_hitl.py @@ -0,0 +1,80 @@ +"""Unit tests for the HITL decision relay (E5, issue #411). + +pytest-asyncio runs in auto mode (``pyproject.toml``), so ``async def`` tests +need no marker. Each test clears the module slot first so the global state never +leaks between cases. +""" + +from __future__ import annotations + +import asyncio + +import pytest + +from app.features.demo import hitl + + +@pytest.fixture(autouse=True) +def _clear_slot() -> None: + """Reset the module-level slot before every test (global-state hygiene).""" + hitl.clear() + + +def test_resolve_without_register_is_not_found() -> None: + assert hitl.resolve("action-1", "approved") == "not_found" + + +def test_resolve_wrong_action_is_not_found() -> None: + hitl.register("action-1") + assert hitl.resolve("other", "approved") == "not_found" + + +def test_double_resolve_is_already_decided() -> None: + hitl.register("action-1") + assert hitl.resolve("action-1", "approved") == "applied" + assert hitl.resolve("action-1", "rejected") == "already_decided" + + +def test_register_overwrites_stale_slot() -> None: + hitl.register("stale") + assert hitl.resolve("stale", "approved") == "applied" + # A new run registers a fresh slot; the stale decision must not bleed in. + hitl.register("fresh") + assert hitl.resolve("fresh", "rejected") == "applied" + + +def test_clear_closes_the_window() -> None: + hitl.register("action-1") + hitl.clear() + assert hitl.resolve("action-1", "approved") == "not_found" + + +async def test_resolve_before_wait_returns_decision() -> None: + hitl.register("action-1") + assert hitl.resolve("action-1", "rejected", reason="too risky") == "applied" + result = await hitl.wait_for_decision("action-1", timeout=1.0) + assert result == ("rejected", "too risky") + + +async def test_wait_then_resolve_concurrently() -> None: + hitl.register("action-1") + + async def _decide() -> None: + await asyncio.sleep(0.02) + hitl.resolve("action-1", "approved") + + decider = asyncio.ensure_future(_decide()) + result = await hitl.wait_for_decision("action-1", timeout=1.0) + await decider + assert result == ("approved", None) + + +async def test_wait_times_out_to_none() -> None: + hitl.register("action-1") + result = await hitl.wait_for_decision("action-1", timeout=0.02) + assert result is None + + +async def test_wait_unknown_action_returns_none() -> None: + result = await hitl.wait_for_decision("never-registered", timeout=0.02) + assert result is None diff --git a/app/features/demo/tests/test_models.py b/app/features/demo/tests/test_models.py index ee048764..22b2eedd 100644 --- a/app/features/demo/tests/test_models.py +++ b/app/features/demo/tests/test_models.py @@ -118,7 +118,12 @@ async def test_showcase_workspace_status_check_violation(db_session: AsyncSessio async def test_showcase_workspace_e1_defaults_applied(db_session: AsyncSession) -> None: - """A minimal insert gets the E1 defaults (ORM + server defaults agree).""" + """A minimal insert gets the E1 defaults. + + E5 (#411) D4 -- an ORM insert now applies the bumped ORM default + (config_schema_version=2); the server_default stays 1 so pre-E5 rows + inserted outside the ORM legitimately read 1. + """ row = _make_row() db_session.add(row) await db_session.commit() @@ -129,7 +134,7 @@ async def test_showcase_workspace_e1_defaults_applied(db_session: AsyncSession) assert loaded.pinned is False assert loaded.notes is None assert loaded.tags == [] - assert loaded.config_schema_version == 1 + assert loaded.config_schema_version == 2 assert loaded.replayed_from_workspace_id is None # All six story slots stay NULL until their writer epic lands. assert loaded.seed_overrides is None diff --git a/app/features/demo/tests/test_pipeline.py b/app/features/demo/tests/test_pipeline.py index 7862d5a2..27b8d406 100644 --- a/app/features/demo/tests/test_pipeline.py +++ b/app/features/demo/tests/test_pipeline.py @@ -8,6 +8,7 @@ from __future__ import annotations +import asyncio from datetime import date, timedelta from types import SimpleNamespace from typing import Any, cast @@ -15,8 +16,13 @@ import pytest from fastapi import FastAPI -from app.features.demo import pipeline -from app.features.demo.schemas import DemoBacktestConfig, DemoRunRequest, UserScope +from app.features.demo import hitl, pipeline +from app.features.demo.schemas import ( + DemoBacktestConfig, + DemoRunRequest, + StepEvent, + UserScope, +) from app.shared.seeder.config import ScenarioPreset from app.shared.seeder.overrides import SeederOverrides @@ -850,10 +856,121 @@ async def request(self, *_a: object, **_k: object) -> dict[str, Any]: # ============================================================================= -# PRP-38 — phase grouping + new scenarios +# E5 (#411) — D2 concurrent intermediate-event drain # ============================================================================= +def _single_step_table(step_fn: Any) -> Any: + """Return a one-row phase table wrapping ``step_fn`` (drain-test helper).""" + + def _table(_scenario: Any) -> list[Any]: + return [("data", "blocking", step_fn)] + + return _table + + +async def test_run_pipeline_drains_intermediate_event_mid_step(monkeypatch): + """D2 — an intermediate event is YIELDED while the step is still pending. + + The stub step buffers an intermediate event, signals it has started, then + blocks on an asyncio.Event. The test consumes events until it sees the + intermediate frame, asserts the step has NOT yet returned its terminal, + then releases the step. Proves the drain overlaps the in-flight step in + wall time (not just stream order). + """ + started = asyncio.Event() + release = asyncio.Event() + + async def _blocking_step(_ctx: Any, client: Any) -> Any: + client.yield_event( + StepEvent( + event_type="step_complete", + step_name="blocking", + step_index=0, + total_steps=0, + status="running", + detail="mid-step", + data={"awaiting": True}, + ) + ) + started.set() + await release.wait() + return ("pass", "done", {}) + + monkeypatch.setattr(pipeline, "_phase_table", _single_step_table(_blocking_step)) + + agen = pipeline.run_pipeline(app=_FAKE_APP, req=DemoRunRequest()) + seen: list[StepEvent] = [] + intermediate: StepEvent | None = None + async for ev in agen: + seen.append(ev) + if ev.event_type == "step_complete" and ev.data.get("awaiting"): + intermediate = ev + # The step is still blocked: its terminal step_complete (status + # 'pass') cannot have been emitted yet. + assert started.is_set() + assert not any(e.event_type == "step_complete" and e.status == "pass" for e in seen) + release.set() + break + + assert intermediate is not None + # The orchestrator stamped the row's index/phase fields on the drained event. + assert intermediate.step_index == 1 + assert intermediate.phase_name == "data" + rest = [e async for e in agen] + terminal = [e for e in rest if e.event_type == "step_complete" and e.status == "pass"] + assert terminal and terminal[0].step_name == "blocking" + assert rest[-1].event_type == "pipeline_complete" + + +async def test_run_pipeline_cancels_in_flight_step_on_generator_close(monkeypatch): + """D2 — closing the generator mid-step cancels the step task (Stop button). + + Drives one intermediate event, then ``aclose()`` while the stub step is + still blocked. The finally clause must cancel the in-flight task so it ends + cancelled (no "Task was destroyed but it is pending" warning). + """ + started = asyncio.Event() + release = asyncio.Event() + cancelled = False + + async def _blocking_step(_ctx: Any, client: Any) -> Any: + nonlocal cancelled + client.yield_event( + StepEvent( + event_type="step_complete", + step_name="blocking", + step_index=0, + total_steps=0, + status="running", + detail="mid-step", + data={"awaiting": True}, + ) + ) + started.set() + try: + await release.wait() + except asyncio.CancelledError: + cancelled = True + raise + return ("pass", "done", {}) # pragma: no cover -- never reached + + monkeypatch.setattr(pipeline, "_phase_table", _single_step_table(_blocking_step)) + + # Typed Any so .aclose() is reachable (run_pipeline is annotated as the + # AsyncIterator supertype, which has no aclose). + agen: Any = pipeline.run_pipeline(app=_FAKE_APP, req=DemoRunRequest()) + async for ev in agen: + if ev.event_type == "step_complete" and ev.data.get("awaiting"): + break + # Close the generator (mirrors the WebSocketDisconnect -> aclose path). + await agen.aclose() + # Let the cancellation propagate into the orphaned-otherwise task. + await asyncio.sleep(0) + assert started.is_set() + assert cancelled is True + + def test_phase_table_demo_minimal_matches_legacy_11_steps_under_agents_phase(): """PRP-38 / PRP-41 — DEMO_MINIMAL keeps the legacy 11-step flow. @@ -2006,6 +2123,116 @@ async def test_rag_retrieve_probe_skips_on_embedding_auth_502(): assert ctx.embedding_unreachable is True +# ============================================================================= +# E5 (#411) — RAG-event capture (one ctx.rag_events entry per return path) +# ============================================================================= + + +async def test_rag_event_capture_probe_records_provider_state(monkeypatch, tmp_path): + monkeypatch.setattr( + pipeline, + "get_settings", + lambda: _fake_settings( + str(tmp_path / "reg"), rag_embedding_provider="openai", openai_api_key="sk-test" + ), + ) + ctx = _make_showcase_ctx() + await pipeline.step_embedding_provider_probe(ctx, _as_client(_RecordingClient(None))) + assert len(ctx.rag_events) == 1 + ev = ctx.rag_events[0] + assert ev["event"] == "probe" + assert ev["status"] == "pass" + assert ev["provider"] == "openai" + assert ev["reachable"] is True + + +async def test_rag_event_capture_index_records_chunk_count(monkeypatch, tmp_path): + monkeypatch.setattr( + pipeline, + "get_settings", + lambda: _fake_settings(str(tmp_path / "reg"), rag_embedding_provider="openai"), + ) + ctx = _make_showcase_ctx() + results = [ + {"source_path": p, "status": "indexed", "chunks_created": 4, "error": None} + for p in sorted(pipeline._USER_GUIDE_CURATED_FILES) + ] + client = _RecordingClient( + None, + responses={ + ("POST", "/rag/index/project-docs"): { + "results": results, + "total_files": 5, + "indexed": 5, + "updated": 0, + "unchanged": 0, + "failed": 0, + "total_chunks": 20, + }, + }, + ) + await pipeline.step_rag_index_subset(ctx, _as_client(client)) + assert len(ctx.rag_events) == 1 + ev = ctx.rag_events[0] + assert ev["event"] == "index" + assert ev["status"] == "pass" + assert ev["count"] == 20 + assert ev["provider"] == "openai" + + +async def test_rag_event_capture_index_skip_when_unreachable(monkeypatch, tmp_path): + monkeypatch.setattr( + pipeline, + "get_settings", + lambda: _fake_settings(str(tmp_path / "reg"), rag_embedding_provider="ollama"), + ) + ctx = _make_showcase_ctx() + ctx.embedding_unreachable = True + status, _detail, _ = await pipeline.step_rag_index_subset( + ctx, _as_client(_RecordingClient(None)) + ) + assert status == "skip" + assert len(ctx.rag_events) == 1 + assert ctx.rag_events[0]["event"] == "skip" + assert ctx.rag_events[0]["status"] == "skip" + + +async def test_rag_event_capture_retrieve_warn_on_zero_hits(monkeypatch, tmp_path): + monkeypatch.setattr( + pipeline, + "get_settings", + lambda: _fake_settings(str(tmp_path / "reg"), rag_embedding_provider="openai"), + ) + ctx = _make_showcase_ctx() + client = _RecordingClient( + None, + responses={("POST", "/rag/retrieve"): {"results": [], "total_chunks_searched": 12}}, + ) + status, _detail, _ = await pipeline.step_rag_retrieve_probe(ctx, _as_client(client)) + assert status == "warn" + assert len(ctx.rag_events) == 1 + ev = ctx.rag_events[0] + assert ev["event"] == "retrieve" + assert ev["status"] == "warn" + assert ev["count"] == 0 + + +async def test_rag_event_capture_demo_minimal_leaves_events_empty(monkeypatch, tmp_path): + """A legacy demo_minimal run never reaches the knowledge phase -> no events.""" + artifact = tmp_path / "naive-model.joblib" + artifact.write_bytes(b"fake joblib artifact bytes") + monkeypatch.setattr( + pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "registry")) + ) + wapes = {"naive": 0.30, "seasonal_naive": 0.15, "moving_average": 0.25} + monkeypatch.setattr(pipeline, "_Client", _build_fake_client(str(artifact), wapes)) + events = [e async for e in pipeline.run_pipeline(app=_FAKE_APP, req=DemoRunRequest())] + # demo_minimal has no knowledge steps; the accumulator stays empty. + assert events[-1].event_type == "pipeline_complete" + knowledge_steps = {"embedding_provider_probe", "rag_index_subset", "rag_retrieve_probe"} + assert not any(e.step_name in knowledge_steps for e in events) + + async def test_run_pipeline_showcase_rich_runs_planning_and_knowledge(monkeypatch, tmp_path): """PRP-40 — end-to-end SHOWCASE_RICH reaches the 5 new steps + greens.""" artifact = tmp_path / "artifacts" / "models" / "model_abc123def456.joblib" @@ -2112,6 +2339,9 @@ def __init__( event_sink: list[Any] | None = None, ) -> None: self.calls: list[tuple[str, str]] = [] + # E5 (#411) -- capture the approve POST body so tests can assert the + # relayed decision (approved=true|false + optional reason). + self.approve_body_sent: dict[str, Any] | None = None self._event_sink = event_sink if event_sink is not None else intermediate async def __aenter__(self) -> _HitlClient: @@ -2158,16 +2388,21 @@ async def request( "tokens_used": 80, } if path.endswith("/approve"): + self.approve_body_sent = json_body if approve_status >= 400: raise pipeline._StepError( step, approve_status, {"title": "Bad Request", "detail": "No pending action"}, ) - return approve_body or { + if approve_body is not None: + return approve_body + # Mirror the agents API: approved=false -> status "rejected". + approved = bool(json_body.get("approved", True)) if json_body else True + return { "action_id": chat_action_id, - "approved": True, - "status": "executed", + "approved": approved, + "status": "executed" if approved else "rejected", } raise AssertionError(f"unexpected request: {method} {path}") @@ -2209,8 +2444,8 @@ def test_llm_key_present_cloud_still_requires_key(monkeypatch): assert pipeline._llm_key_present() is False -async def test_agent_hitl_flow_happy_path(monkeypatch, tmp_path): - """PRP-41 — full HITL round-trip: chat -> intermediate -> approve -> pass.""" +async def test_agent_hitl_flow_window_lapse_auto_approves(monkeypatch, tmp_path): + """PRP-41 / E5 — no operator decision -> window lapses -> auto-approve pass.""" monkeypatch.setattr( pipeline, "get_settings", @@ -2222,8 +2457,8 @@ async def test_agent_hitl_flow_happy_path(monkeypatch, tmp_path): "_llm_key_present", lambda: True, ) - # Short-circuit the 3s display delay so the test stays fast. - monkeypatch.setattr(pipeline, "_APPROVAL_DISPLAY_DELAY_S", 0.0) + # Zero window -> wait_for_decision lapses immediately (no operator click). + monkeypatch.setattr(pipeline, "_APPROVAL_DECISION_WINDOW_S", 0.0) client, intermediate = _make_hitl_client() ctx = pipeline.DemoContext(seed=42, skip_seed=True, reset=False) @@ -2232,22 +2467,92 @@ async def test_agent_hitl_flow_happy_path(monkeypatch, tmp_path): assert status == "pass" assert "approved=executed" in detail assert data["approval_decision"] == "executed" + assert data["auto_approved"] is True assert data["action_id"] == "action-abc-123" assert data["session_id"] == "sess-test-0001" assert data["tokens_used"] == 240 + # The window lapse relayed approved=true to the agents HITL gate. + assert client.approve_body_sent == {"action_id": "action-abc-123", "approved": True} # The HITL step buffered exactly one intermediate event for the FE. assert len(intermediate) == 1 inter = intermediate[0] assert inter.status == "running" assert inter.data["awaiting_approval"] is True assert inter.data["action_id"] == "action-abc-123" + assert inter.data["decision_url"] == "/demo/hitl-decision" + assert inter.data["decision_window_s"] == 0.0 assert inter.phase_name == pipeline.PHASE_AGENTS + # One approval_events entry captured (auto-approved). + assert len(ctx.approval_events) == 1 + entry = ctx.approval_events[0] + assert entry["decision"] == "approved" + assert entry["auto_approved"] is True + assert entry["tool_name"] == "save_scenario" + assert entry["execution_status"] == "executed" + assert entry["transcript_summary"] == "I'll save that scenario." + assert "arguments_keys" in entry["tool_call_summary"] # Ctx threaded for downstream cleanup + KPI consumers. assert ctx.approval_action_id == "action-abc-123" assert ctx.agent_approval_decision == "executed" assert ctx.session_id == "sess-test-0001" +async def test_agent_hitl_flow_operator_approve(monkeypatch, tmp_path): + """E5 — operator approves within the window -> approve POST, entry approved.""" + monkeypatch.setattr(pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "reg"))) + monkeypatch.setattr(pipeline, "_llm_key_present", lambda: True) + monkeypatch.setattr(pipeline, "_APPROVAL_DECISION_WINDOW_S", 5.0) + + async def _fake_wait(_action_id: str, timeout: float) -> tuple[str, str | None]: + return ("approved", None) + + monkeypatch.setattr(hitl, "wait_for_decision", _fake_wait) + + client, _intermediate = _make_hitl_client() + ctx = pipeline.DemoContext(seed=42, skip_seed=True, reset=False) + status, _detail, data = await pipeline.step_agent_hitl_flow(ctx, client) + + assert status == "pass" + assert data["approval_decision"] == "executed" + assert data["auto_approved"] is False + assert client.approve_body_sent == {"action_id": "action-abc-123", "approved": True} + assert len(ctx.approval_events) == 1 + assert ctx.approval_events[0]["decision"] == "approved" + assert ctx.approval_events[0]["auto_approved"] is False + + +async def test_agent_hitl_flow_operator_reject(monkeypatch, tmp_path): + """E5 (D5) — operator rejects -> approve POST approved=false; pass + green.""" + monkeypatch.setattr(pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "reg"))) + monkeypatch.setattr(pipeline, "_llm_key_present", lambda: True) + monkeypatch.setattr(pipeline, "_APPROVAL_DECISION_WINDOW_S", 5.0) + + async def _fake_wait(_action_id: str, timeout: float) -> tuple[str, str | None]: + return ("rejected", "too risky for the demo") + + monkeypatch.setattr(hitl, "wait_for_decision", _fake_wait) + + client, _intermediate = _make_hitl_client() + ctx = pipeline.DemoContext(seed=42, skip_seed=True, reset=False) + status, detail, data = await pipeline.step_agent_hitl_flow(ctx, client) + + # D5 -- a reject is a SUCCESSFUL HITL demonstration: the run stays GREEN. + assert status == "pass" + assert detail == "rejected by operator" + assert data["approval_decision"] == "rejected" + # The reject + reason were relayed to the agents HITL gate (approved=false). + assert client.approve_body_sent == { + "action_id": "action-abc-123", + "approved": False, + "reason": "too risky for the demo", + } + assert len(ctx.approval_events) == 1 + entry = ctx.approval_events[0] + assert entry["decision"] == "rejected" + assert entry["reason"] == "too risky for the demo" + assert entry["auto_approved"] is False + + async def test_agent_hitl_flow_skips_without_key(monkeypatch, tmp_path): """PRP-41 — no LLM key -> skip-gracefully; no session created.""" monkeypatch.setattr(pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "reg"))) @@ -2297,7 +2602,7 @@ async def test_agent_hitl_flow_skips_when_agent_did_not_trigger_tool(monkeypatch """PRP-41 — agent answered directly (no pending_action) -> skip with detail.""" monkeypatch.setattr(pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "reg"))) monkeypatch.setattr(pipeline, "_llm_key_present", lambda: True) - monkeypatch.setattr(pipeline, "_APPROVAL_DISPLAY_DELAY_S", 0.0) + monkeypatch.setattr(pipeline, "_APPROVAL_DECISION_WINDOW_S", 0.0) client, intermediate = _make_hitl_client(chat_pending=False) ctx = pipeline.DemoContext(seed=42, skip_seed=True, reset=False) @@ -2309,20 +2614,26 @@ async def test_agent_hitl_flow_skips_when_agent_did_not_trigger_tool(monkeypatch assert intermediate == [] -async def test_agent_hitl_flow_absorbs_double_approve_400(monkeypatch, tmp_path): - """PRP-41 — FE pre-empted Approve -> backend approve returns 400; absorb.""" +async def test_agent_hitl_flow_absorbs_external_approve_400(monkeypatch, tmp_path): + """E5 (D1) — an operator pre-empted /agents/.../approve directly -> 400. + + The step absorbs the 4xx and records ``execution_status="external_4xx"`` -- + honest about the residual ambiguity (the decision landed outside the relay). + """ monkeypatch.setattr(pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "reg"))) monkeypatch.setattr(pipeline, "_llm_key_present", lambda: True) - monkeypatch.setattr(pipeline, "_APPROVAL_DISPLAY_DELAY_S", 0.0) + monkeypatch.setattr(pipeline, "_APPROVAL_DECISION_WINDOW_S", 0.0) client, intermediate = _make_hitl_client(approve_status=400) ctx = pipeline.DemoContext(seed=42, skip_seed=True, reset=False) status, detail, data = await pipeline.step_agent_hitl_flow(ctx, client) - # 4xx absorbed: step still passes with optimistic "executed" decision. + # 4xx absorbed: step still passes; the decision is the honest external edge. assert status == "pass" - assert data["approval_decision"] == "executed" - assert "approved=executed" in detail + assert data["approval_decision"] == "external_4xx" + assert "approved=external_4xx" in detail + assert ctx.approval_events[0]["execution_status"] == "external_4xx" + assert ctx.approval_events[0]["decision"] == "approved" # The intermediate event was still buffered before the absorb branch. assert len(intermediate) == 1 @@ -2331,7 +2642,7 @@ async def test_agent_hitl_flow_skips_on_hard_timeout(monkeypatch, tmp_path): """PRP-41 — elapsed > _APPROVAL_HARD_TIMEOUT_S -> skip with timed_out.""" monkeypatch.setattr(pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "reg"))) monkeypatch.setattr(pipeline, "_llm_key_present", lambda: True) - monkeypatch.setattr(pipeline, "_APPROVAL_DISPLAY_DELAY_S", 0.0) + monkeypatch.setattr(pipeline, "_APPROVAL_DECISION_WINDOW_S", 0.0) # Force the elapsed-time check to fire: set the hard cap below the # display delay so any positive elapsed exceeds it. monkeypatch.setattr(pipeline, "_APPROVAL_HARD_TIMEOUT_S", -1.0) @@ -2345,6 +2656,10 @@ async def test_agent_hitl_flow_skips_on_hard_timeout(monkeypatch, tmp_path): assert data["timed_out"] is True assert data["approval_decision"] == "timed_out" assert ctx.agent_approval_decision == "timed_out" + # E5 -- a timed_out entry is still recorded for the workspace story. + assert len(ctx.approval_events) == 1 + assert ctx.approval_events[0]["decision"] == "timed_out" + assert ctx.approval_events[0]["execution_status"] is None # Intermediate event was emitted; approve POST never fired. assert len(intermediate) == 1 assert all(call[1] != f"/agents/sessions/{data['session_id']}/approve" for call in client.calls) diff --git a/app/features/demo/tests/test_routes.py b/app/features/demo/tests/test_routes.py index f5813d79..7b8858ba 100644 --- a/app/features/demo/tests/test_routes.py +++ b/app/features/demo/tests/test_routes.py @@ -822,3 +822,127 @@ async def test_workspace_health_integration_alive_and_dead(client, db_session: A assert body["partial_run"] is True finally: await client.delete(f"/agents/sessions/{agent_session_id}") + + +# ============================================================================= +# E5 (#411) — POST /demo/hitl-decision + GET /demo/approval-events +# ============================================================================= + + +@pytest.fixture(autouse=True) +def _clear_hitl_slot(): + """Reset the module-level HITL relay slot around every test in this file.""" + from app.features.demo import hitl + + hitl.clear() + yield + hitl.clear() + + +async def test_hitl_decision_204_on_pending(client): + """A decision for the registered pending action returns 204.""" + from app.features.demo import hitl + + hitl.register("act-204") + resp = await client.post( + "/demo/hitl-decision", + json={"action_id": "act-204", "decision": "rejected", "reason": "too risky"}, + ) + assert resp.status_code == 204 + # The relay recorded the operator's decision for the waiting step (use a + # positive timeout: wait_for(timeout=0) raises before stepping the + # freshly-scheduled task, even when the event is already set). + assert await hitl.wait_for_decision("act-204", timeout=1.0) == ("rejected", "too risky") + + +async def test_hitl_decision_404_when_nothing_pending(client): + """No registered action -> 404 problem+json.""" + resp = await client.post( + "/demo/hitl-decision", + json={"action_id": "ghost", "decision": "approved"}, + ) + assert resp.status_code == 404 + assert resp.headers["content-type"].startswith("application/problem+json") + assert "No pending HITL action" in resp.json()["detail"] + + +async def test_hitl_decision_409_when_already_decided(client): + """A second decision for the same action -> 409 problem+json.""" + from app.features.demo import hitl + + hitl.register("act-409") + first = await client.post( + "/demo/hitl-decision", json={"action_id": "act-409", "decision": "approved"} + ) + assert first.status_code == 204 + second = await client.post( + "/demo/hitl-decision", json={"action_id": "act-409", "decision": "rejected"} + ) + assert second.status_code == 409 + assert second.headers["content-type"].startswith("application/problem+json") + assert "already decided" in second.json()["detail"] + + +async def test_hitl_decision_422_bad_body(client): + """A bad decision literal / extra key -> 422 problem+json.""" + bad_literal = await client.post( + "/demo/hitl-decision", json={"action_id": "a", "decision": "maybe"} + ) + assert bad_literal.status_code == 422 + assert bad_literal.headers["content-type"].startswith("application/problem+json") + extra_key = await client.post( + "/demo/hitl-decision", + json={"action_id": "a", "decision": "approved", "bogus": 1}, + ) + assert extra_key.status_code == 422 + + +async def test_approval_events_empty(client, monkeypatch): + """200 + empty list when no workspace carries approval events.""" + + async def fake_list(_db, *, limit: int = 50) -> list[dict[str, object]]: + return [] + + monkeypatch.setattr(workspace, "list_approval_events", fake_list) + resp = await client.get("/demo/approval-events") + assert resp.status_code == 200 + body = resp.json() + assert body == {"events": [], "total": 0} + + +async def test_approval_events_populated(client, monkeypatch): + """Flattened entries carry workspace_id / workspace_name + decision.""" + + async def fake_list(_db, *, limit: int = 50) -> list[dict[str, object]]: + return [ + { + "workspace_id": "a" * 32, + "workspace_name": "demo-1", + "action_id": "act-1", + "tool_name": "save_scenario", + "decision": "rejected", + "decided_at": "2026-06-13T00:00:00+00:00", + "session_id": "sess-1", + "auto_approved": False, + "reason": "too risky", + "execution_status": "rejected", + "transcript_summary": "I'll save that scenario.", + } + ] + + monkeypatch.setattr(workspace, "list_approval_events", fake_list) + resp = await client.get("/demo/approval-events", params={"limit": 5}) + assert resp.status_code == 200 + body = resp.json() + assert body["total"] == 1 + assert body["events"][0]["workspace_id"] == "a" * 32 + assert body["events"][0]["workspace_name"] == "demo-1" + assert body["events"][0]["decision"] == "rejected" + + +async def test_approval_events_rejects_bad_limit(client): + """limit is bounded 1-200 -> 422 problem+json out of range.""" + resp = await client.get("/demo/approval-events", params={"limit": 0}) + assert resp.status_code == 422 + resp = await client.get("/demo/approval-events", params={"limit": 999}) + assert resp.status_code == 422 diff --git a/app/features/demo/tests/test_schemas.py b/app/features/demo/tests/test_schemas.py index d7ba3573..ac976544 100644 --- a/app/features/demo/tests/test_schemas.py +++ b/app/features/demo/tests/test_schemas.py @@ -7,9 +7,11 @@ from pydantic import ValidationError from app.features.demo.schemas import ( + ApprovalEventItem, DemoBacktestConfig, DemoRunRequest, DemoRunResult, + HitlDecisionRequest, StepEvent, UserScope, WorkspaceDetailResponse, @@ -622,3 +624,60 @@ def test_workspace_list_response_shape(): assert dumped["workspaces"][0]["workspace_id"] == "a" * 32 # ISO serialization on the wire. assert isinstance(dumped["workspaces"][0]["created_at"], str) + + +# ============================================================================= +# E5 (#411) — HitlDecisionRequest + ApprovalEventItem +# ============================================================================= + + +def test_hitl_decision_request_json_path(): + """The JSON-dict path (FastAPI's validate_python) accepts a valid body.""" + body = HitlDecisionRequest.model_validate( + {"action_id": "act-1", "decision": "rejected", "reason": "too risky"} + ) + assert body.action_id == "act-1" + assert body.decision == "rejected" + assert body.reason == "too risky" + + +def test_hitl_decision_request_reason_optional(): + body = HitlDecisionRequest.model_validate({"action_id": "act-1", "decision": "approved"}) + assert body.reason is None + + +def test_hitl_decision_request_rejects_unknown_decision(): + with pytest.raises(ValidationError): + HitlDecisionRequest.model_validate({"action_id": "a", "decision": "maybe"}) + + +def test_hitl_decision_request_forbids_extra_key(): + with pytest.raises(ValidationError): + HitlDecisionRequest.model_validate({"action_id": "a", "decision": "approved", "bogus": 1}) + + +def test_hitl_decision_request_rejects_empty_action_and_long_reason(): + with pytest.raises(ValidationError): + HitlDecisionRequest.model_validate({"action_id": "", "decision": "approved"}) + with pytest.raises(ValidationError): + HitlDecisionRequest.model_validate( + {"action_id": "a", "decision": "approved", "reason": "x" * 501} + ) + + +def test_approval_event_item_tolerates_v1_entry(): + """A pre-E5 base-key entry (no additive keys) still validates with defaults.""" + item = ApprovalEventItem.model_validate( + { + "workspace_id": "a" * 32, + "workspace_name": "demo-1", + "action_id": "act-1", + "tool_name": "save_scenario", + "decision": "approved", + "decided_at": "2026-06-13T00:00:00+00:00", + "session_id": "sess-1", + } + ) + assert item.auto_approved is None + assert item.execution_status is None + assert item.decision == "approved" diff --git a/app/features/demo/tests/test_workspace.py b/app/features/demo/tests/test_workspace.py index b0597981..737bfccd 100644 --- a/app/features/demo/tests/test_workspace.py +++ b/app/features/demo/tests/test_workspace.py @@ -279,7 +279,8 @@ async def test_create_workspace_without_replayed_from_is_none(db_session: AsyncS assert row.archived is False assert row.pinned is False assert row.tags == [] - assert row.config_schema_version == 1 + # E5 (#411) D4 -- new rows carry the bumped story-slot schema version. + assert row.config_schema_version == 2 # ============================================================================= @@ -379,3 +380,143 @@ async def test_update_workspace_empty_request_noop(db_session: AsyncSession) -> assert row is not None assert row.name == "it-noop" assert row.status == WORKSPACE_STATUS_RUNNING + + +# ============================================================================= +# E5 (#411) — story-slot capture + reproduction marker + approval-events list +# ============================================================================= + + +def _ctx_with_story() -> DemoContext: + """A finished ctx carrying one approval event + one index rag event.""" + ctx = _finished_ctx() + ctx.approval_events = [ + { + "action_id": "act-1", + "tool_name": "save_scenario", + "decision": "rejected", + "decided_at": "2026-06-13T00:00:00+00:00", + "session_id": "sess-0123abcd", + "auto_approved": False, + "reason": "too risky", + "execution_status": "rejected", + "tool_call_summary": {"description": "save plan", "arguments_keys": ["name"]}, + "transcript_summary": "I'll save that scenario.", + "tokens_used": 240, + "tool_calls_count": 1, + } + ] + ctx.rag_events = [ + { + "event": "index", + "status": "pass", + "detail": "files_indexed=5/5 chunks=20", + "count": 20, + "occurred_at": "2026-06-13T00:00:00+00:00", + "provider": "openai", + "reachable": None, + } + ] + return ctx + + +async def test_finalize_writes_story_slots(db_session: AsyncSession) -> None: + """finalize persists approval_events + rag_events when the run captured them.""" + workspace_id = await workspace.create_workspace(_keep_request(workspace_name="it-story")) + assert workspace_id is not None + await workspace.finalize_workspace( + workspace_id, _ctx_with_story(), failed=False, wall_clock_s=5.0 + ) + row = await workspace.get_workspace(db_session, workspace_id) + assert row is not None + assert row.approval_events is not None + assert len(row.approval_events) == 1 + assert row.approval_events[0]["decision"] == "rejected" + assert row.rag_events is not None + assert row.rag_events[0]["event"] == "index" + + +async def test_finalize_leaves_story_slots_null_when_empty(db_session: AsyncSession) -> None: + """Empty accumulators -> slots stay NULL (never []), per E1's slot contract.""" + workspace_id = await workspace.create_workspace(_keep_request(workspace_name="it-empty")) + assert workspace_id is not None + await workspace.finalize_workspace( + workspace_id, _finished_ctx(), failed=False, wall_clock_s=5.0 + ) + row = await workspace.get_workspace(db_session, workspace_id) + assert row is not None + assert row.approval_events is None + assert row.rag_events is None + assert "story_reproduction" not in (row.result_summary or {}) + + +async def test_finalize_replay_records_reproduced(db_session: AsyncSession) -> None: + """A replay of a workspace WITH a story whose run also has a story -> reproduced.""" + source_id = await workspace.create_workspace(_keep_request(workspace_name="it-src")) + assert source_id is not None + await workspace.finalize_workspace(source_id, _ctx_with_story(), failed=False, wall_clock_s=5.0) + replay_id = await workspace.create_workspace( + _keep_request(workspace_name="it-replay", replayed_from_workspace_id=source_id) + ) + assert replay_id is not None + await workspace.finalize_workspace(replay_id, _ctx_with_story(), failed=False, wall_clock_s=5.0) + row = await workspace.get_workspace(db_session, replay_id) + assert row is not None + repro = (row.result_summary or {})["story_reproduction"] + assert repro["agent"] == "reproduced" + assert repro["knowledge"] == "reproduced" + assert repro["source_workspace_id"] == source_id + + +async def test_finalize_replay_records_not_applicable(db_session: AsyncSession) -> None: + """Source row had NO story -> not_applicable regardless of this run's capture.""" + source_id = await workspace.create_workspace(_keep_request(workspace_name="it-src2")) + assert source_id is not None + await workspace.finalize_workspace(source_id, _finished_ctx(), failed=False, wall_clock_s=5.0) + replay_id = await workspace.create_workspace( + _keep_request(workspace_name="it-replay2", replayed_from_workspace_id=source_id) + ) + assert replay_id is not None + await workspace.finalize_workspace(replay_id, _ctx_with_story(), failed=False, wall_clock_s=5.0) + row = await workspace.get_workspace(db_session, replay_id) + assert row is not None + repro = (row.result_summary or {})["story_reproduction"] + assert repro["agent"] == "not_applicable" + assert repro["knowledge"] == "not_applicable" + + +async def test_finalize_replay_dangling_source_is_unknown(db_session: AsyncSession) -> None: + """A dangling replay source (deleted / never existed) -> unknown.""" + replay_id = await workspace.create_workspace( + _keep_request(workspace_name="it-dangle", replayed_from_workspace_id="0" * 32) + ) + assert replay_id is not None + await workspace.finalize_workspace(replay_id, _ctx_with_story(), failed=False, wall_clock_s=5.0) + row = await workspace.get_workspace(db_session, replay_id) + assert row is not None + repro = (row.result_summary or {})["story_reproduction"] + assert repro["agent"] == "unknown" + assert repro["knowledge"] == "unknown" + assert repro["source_workspace_id"] is None + + +async def test_list_approval_events_flattens_newest_first(db_session: AsyncSession) -> None: + """list_approval_events flattens entries newest-row-first and respects limit.""" + for index in range(2): + wid = await workspace.create_workspace(_keep_request(workspace_name=f"it-ae-{index}")) + assert wid is not None + await workspace.finalize_workspace(wid, _ctx_with_story(), failed=False, wall_clock_s=1.0) + # A workspace with no approval_events must be excluded from the flatten. + plain = await workspace.create_workspace(_keep_request(workspace_name="it-ae-plain")) + assert plain is not None + await workspace.finalize_workspace(plain, _finished_ctx(), failed=False, wall_clock_s=1.0) + + events = await workspace.list_approval_events(db_session, limit=50) + assert len(events) == 2 + assert all(e["workspace_name"].startswith("it-ae-") for e in events) + assert all("workspace_id" in e and e["decision"] == "rejected" for e in events) + # Newest workspace first. + assert events[0]["workspace_name"] == "it-ae-1" + + capped = await workspace.list_approval_events(db_session, limit=1) + assert len(capped) == 1 diff --git a/app/features/demo/workspace.py b/app/features/demo/workspace.py index 1b3ba4aa..7b5ce5c7 100644 --- a/app/features/demo/workspace.py +++ b/app/features/demo/workspace.py @@ -219,11 +219,29 @@ async def finalize_workspace( row.date_start = ctx.date_start row.date_end = ctx.date_end row.created_objects = _collect_created_objects(ctx) - row.result_summary = { + # E5 (#411) -- story slots. Empty list -> None (E1: NULL = "slot + # never written"). Whole-value assignment so SQLAlchemy change + # detection fires (never mutate a loaded row's JSONB in place). + row.approval_events = ctx.approval_events or None + row.rag_events = ctx.rag_events or None + summary: dict[str, Any] = { "winner_model_type": ctx.winner_model_type, "winner_wape": ctx.winner_wape, "wall_clock_s": wall_clock_s, } + # E5 (#411) D7 -- on a replay keep-run, compare the SOURCE row's + # story slots against this run's capture and record the verdict. + # One extra get-by-id select in this same session (inside the + # warn-and-continue try). A dangling source -> "unknown". + if row.replayed_from_workspace_id: + src_result = await db.execute( + select(ShowcaseWorkspace).where( + ShowcaseWorkspace.workspace_id == row.replayed_from_workspace_id + ) + ) + src = src_result.scalar_one_or_none() + summary["story_reproduction"] = _story_reproduction(src, ctx) + row.result_summary = summary await db.commit() except Exception as exc: # workspace must never break the demo logger.warning( @@ -236,6 +254,68 @@ async def finalize_workspace( logger.info("demo.workspace_finalized", workspace_id=workspace_id, failed=failed) +def _story_reproduction(src: ShowcaseWorkspace | None, ctx: DemoContext) -> dict[str, Any]: + """Compare the source row's story slots against this run's capture (E5 D7). + + ``agent``: the source row had >=1 approval event -> compare with this run + (``reproduced`` / ``not_reproduced``); the source had none -> ``not_applicable``; + the source row is missing (soft reference dangles) -> ``unknown``. + ``knowledge``: same logic over ``rag_events`` entries whose ``event`` is + ``index`` / ``retrieve`` with ``status != "skip"`` (a real knowledge hit, + not a graceful skip). + """ + if src is None: + return {"agent": "unknown", "knowledge": "unknown", "source_workspace_id": None} + + def _verdict(source_had: bool, new_has: bool) -> str: + if not source_had: + return "not_applicable" + return "reproduced" if new_has else "not_reproduced" + + def _has_knowledge(events: list[dict[str, Any]] | None) -> bool: + return any( + e.get("event") in ("index", "retrieve") and e.get("status") != "skip" + for e in (events or []) + ) + + return { + "agent": _verdict(bool(src.approval_events), bool(ctx.approval_events)), + "knowledge": _verdict(_has_knowledge(src.rag_events), _has_knowledge(ctx.rag_events)), + "source_workspace_id": src.workspace_id, + } + + +async def list_approval_events(db: AsyncSession, *, limit: int = 50) -> list[dict[str, Any]]: + """Flatten ``approval_events`` across the newest rows that carry the slot. + + Scans the newest workspace rows with a non-NULL ``approval_events`` slot + (an audit-glance surface, not a browse API) and flattens their entries + newest-row-first, each tagged with its ``workspace_id`` / ``workspace_name``, + capped at ``limit``. Python-side flatten over a low-cardinality table -- no + ``jsonb_array_elements`` SQL (D6). + + Args: + db: An open async session (caller-owned). + limit: Maximum flattened entries to return (route caps 1-200). + + Returns: + The flattened approval events, newest workspace first. + """ + result = await db.execute( + select(ShowcaseWorkspace) + .where(ShowcaseWorkspace.approval_events.isnot(None)) + .order_by(ShowcaseWorkspace.created_at.desc(), ShowcaseWorkspace.id.desc()) + .limit(50) + ) + events: list[dict[str, Any]] = [] + for row in result.scalars(): + for entry in row.approval_events or []: + events.append({"workspace_id": row.workspace_id, "workspace_name": row.name, **entry}) + if len(events) >= limit: + return events + return events + + async def get_workspace(db: AsyncSession, workspace_id: str) -> ShowcaseWorkspace | None: """Load a workspace row by its external id. diff --git a/docs/_base/API_CONTRACTS.md b/docs/_base/API_CONTRACTS.md index 2922c077..b2fb5c64 100644 --- a/docs/_base/API_CONTRACTS.md +++ b/docs/_base/API_CONTRACTS.md @@ -65,6 +65,8 @@ All endpoints serve JSON; error responses use `application/problem+json` (RFC 78 | demo | GET | `/demo/workspaces/{workspace_id}/health` | **E2 (#408)** — probe the workspace's soft references in-process (model runs, scenario plans, alias, batch, agent session, `job_ids` slot) via `httpx.ASGITransport`; per-reference `status` ∈ `alive` (2xx) / `dead` (404 — deleted after the run) / `unknown` (anything else — never a 500), plus `alive`/`dead`/`unknown` counts and `partial_run` (true when the row's status ≠ `completed`); non-probeable keys (`v2_model_path`, `scenario_artifact_key`, `train_model_types`) are skipped; `404 application/problem+json` when the workspace is missing | | demo | PATCH | `/demo/workspaces/{workspace_id}` | **E1 (#407)** — partial lifecycle update (`name` / `notes` / `tags` / `archived` / `pinned`; `exclude_unset` semantics — only provided fields change; explicit `null` clears `name`/`notes`; explicit `null` on `archived`/`pinned`/`tags` → `422` (send `[]` to clear tags); `status` NOT patchable — the pipeline owns it); returns the updated `WorkspaceDetailResponse`; empty body = `200` no-op; `404 application/problem+json` when missing; `422` on unknown keys / bad name pattern / >20 tags | | demo | DELETE | `/demo/workspaces/{workspace_id}` | Delete one saved workspace METADATA row; `204` on success, `404 application/problem+json` when missing. The run's created objects (model runs, scenario plans, aliases, jobs, artifacts) are soft references and are NOT deleted | +| demo | POST | `/demo/hitl-decision` | **E5 (#411)** — relay the Showcase HITL step card's Approve/Reject to the in-flight pipeline. Body `{action_id: str, decision: 'approved' \| 'rejected', reason?: str ≤500}` (`ConfigDict(strict=True, extra='forbid')`). `204` on success; `404 application/problem+json` when no matching action is pending; `409` when the action was already decided; `422` on a malformed body. The in-memory single-slot relay is safe because the pipeline runs one-at-a-time under the module `_pipeline_lock`; the pipeline forwards the real decision to `/agents/sessions/{id}/approve` (`approved=true\|false` + reason) — `agent_require_approval` is untouched. A reject keeps the pipeline GREEN (D5); the gated `save_scenario` never executes | +| demo | GET | `/demo/approval-events` | **E5 (#411)** — recent HITL approval events flattened across the newest saved workspaces carrying the `approval_events` slot, newest-workspace-first (`limit` 1-200 default 50); `200` + empty list when none. Each item carries `workspace_id` / `workspace_name` plus the entry's base + additive keys (`decision`, `tool_name`, `auto_approved`, `reason`, `execution_status`, `transcript_summary`, …). Audit-glance surface — no pagination/offset (D6). Backs the `/ops` page's Approval History table (frontend-only — the ops slice does not import demo code) | | config | GET | `/config/ai` | Effective AI-model config (agent LLM + RAG embeddings); API keys masked, never raw | | config | PATCH | `/config/ai` | Persist + apply AI-model changes live (no restart). `409` if an embedding-dimension change would orphan indexed RAG chunks (resend with `force=true`) | | config | GET | `/config/providers/health` | Per-provider connectivity — Ollama probed live, cloud providers by API-key presence | @@ -92,7 +94,7 @@ Drives the end-to-end demo pipeline for the dashboard Showcase page. Verified ag - **Server → client (every frame):** Pydantic-serialized `StepEvent` — `{"event_type", "step_name", "step_index", "total_steps", "status", "detail", "duration_ms", "data", "timestamp", "phase_name"?, "phase_index"?, "phase_total"?}`. PRP-38 — the three `phase_*` fields are Optional + Nullable so legacy clients that don't render phases keep working. - **`event_type` values (Literal in `StepEvent`):** - `step_start` — a step began; `status` is `null`. - - `step_complete` — a step finished; `status ∈ {pass, fail, skip, warn}`, `data` carries structured payload (backtest `per_model` WAPE + `winner` + `bucketed_aggregated_metrics` on PRP-36/38 feature-aware runs; register `run_id` + `alias`; PRP-38 `v2_train` → `v2_run_id` + `feature_frame_version` + `feature_columns_count` + `feature_groups` + `artifact_uri_full`). + - `step_complete` — a step finished; `status ∈ {pass, fail, skip, warn}`, `data` carries structured payload (backtest `per_model` WAPE + `winner` + `bucketed_aggregated_metrics` on PRP-36/38 feature-aware runs; register `run_id` + `alias`; PRP-38 `v2_train` → `v2_run_id` + `feature_frame_version` + `feature_columns_count` + `feature_groups` + `artifact_uri_full`). **E5 (#411)** — `agent_hitl_flow` ALSO emits an INTERMEDIATE `step_complete` with `status="running"` + `data.awaiting_approval=true` while it is still awaiting; that frame now reaches the browser DURING the decision window (the orchestrator drains the intermediate-event sink concurrently with the in-flight step — D2; pre-E5 it flushed only after the step returned, so the button could never render in time). Its `data` gains `decision_url: "/demo/hitl-decision"` + `decision_window_s: float` (the FE countdown reads this, never hardcodes) alongside the existing `action_id` / `session_id` / `approval_url`. - `pipeline_complete` — final event; `data` carries `winner_model_type`, `winner_wape`, `winning_run_id`, `alias`, `wall_clock_s`, `v2_run_id` (PRP-38; null when no V2 run was registered), and `workspace_id` (E1 #390; additive — a string on `preservation="keep"` runs, null otherwise). - `error` — bad start frame or a concurrent run already in progress; one event, then the server closes. - Concurrency: a module-level `asyncio.Lock` allows one pipeline at a time. A second `POST /demo/run` returns `409`; a second `WS /demo/stream` receives one `error` event. @@ -100,6 +102,7 @@ Drives the end-to-end demo pipeline for the dashboard Showcase page. Verified ag - PRP-40 — `scenario="showcase_rich"` ALSO adds two phases inserted BEFORE `verify`: `planning` (2 steps — `scenario_simulate_and_save`, `multi_plan_compare`) and `knowledge` (3 steps — `embedding_provider_probe`, `rag_index_subset`, `rag_retrieve_probe`). Total step count: 19 for `showcase_rich`, 11 for `demo_minimal` and `sparse`. Phase ids on `showcase_rich` are `data` / `modeling` / `decision` / `planning` / `knowledge` / `verify` / `agent` / `cleanup` (8 phases). The knowledge steps SKIP gracefully when the embedding provider is unreachable; the pipeline still goes green. - E3 (#392) — the planning-phase steps tag the plans they save: pipeline-saved plans now carry `source:showcase` (alongside the legacy `showcase` + `price`/`holiday` tags), and on `preservation="keep"` runs additionally `workspace:` — retrievable via `GET /scenarios?tags=workspace: