Skip to content

feat(reflexio): add memory observability data layer#162

Open
wenchanghan wants to merge 7 commits into
ReflexioAI:mainfrom
wenchanghan:feat/memory-observability-hygiene
Open

feat(reflexio): add memory observability data layer#162
wenchanghan wants to merge 7 commits into
ReflexioAI:mainfrom
wenchanghan:feat/memory-observability-hygiene

Conversation

@wenchanghan

@wenchanghan wenchanghan commented Jun 16, 2026

Copy link
Copy Markdown

Context

Memory is currently hard to operate: owners can add playbooks, but they cannot easily see which ones are repeatedly injected, which ones influence answers, or which ones are stale/noisy enough to review. The customer-facing goal is a Memory Health workflow where a data owner can:

  • see per-playbook context cost and usage,
  • review stale or high-cost/low-citation playbooks,
  • archive or annotate candidates after human review,
  • reduce memory noise without manually auditing the whole store.

Two-phase approach

This is change 1 of 2.

Phase Repo Scope
1 reflexio Data layer and API contract: capture per-entity injection telemetry, expose review candidates, and support review actions.
2 claude-smart User experience: dashboard/CLI surfaces that call these APIs and guide the owner through review.

This split keeps Reflexio focused on durable storage/API primitives while the follow-up PR owns the customer-facing workflow.

What this PR covers

  • Adds usage_events storage for per-entity learning_injection telemetry.
  • Adds get_injection_stats through storage, lib, REST, and client.
  • Adds get_memory_review through storage, lib, REST, and client.
  • Makes memory review scoped by default: callers must pass user_id, unless they explicitly set include_all_users=true.
  • Implements v1 review signals:
    • stale
    • high_cost_low_cite
  • Excludes archived user playbooks from the review queue.
  • Extends update_user_playbook with status and playbook_metadata so a reviewer can archive or annotate candidates.
  • Persists playbook_metadata on user_playbooks and exposes it through domain/public/UI models.
  • Preserves the REFLEXIO_ENABLE_USAGE_EVENT_SINK=0 opt-out.
  • Updates the docs app method registry for the new observability endpoints and review-action fields.

Deferred by design:

  • duplicate detection: better handled as a periodic/batch job.
  • supersedeable signal: current aggregation logs key removed agent playbooks, not the user playbook ids this review queue returns.
  • claude-smart dashboard/CLI UX: handled in phase 2.

Validation

  • uv run ruff check <changed Python files> passed.
  • uv run pyright <changed Python files> passed.
  • Focused regression slice: 38 passed.
  • Broader impacted regression slice: 174 passed.
  • Docs lint: passed with 3 existing warnings.
  • Docs build: passed.
  • git diff --check passed.

Full-repo pyright is not clean on the current baseline; remaining failures are outside this patch and concentrated in existing e2e/test optional-member and mock-typing areas.

Summary by CodeRabbit

Release Notes

  • New Features

    • Added Observability section to navigation for tracking system insights
    • Introduced injection statistics endpoint to monitor learning activity metrics
    • Added memory review candidate identification to surface optimization opportunities
    • Extended playbook metadata support for enhanced organization and reference tracking
  • Chores

    • Enhanced storage infrastructure to support observability features

@coderabbitai

coderabbitai Bot commented Jun 16, 2026

Copy link
Copy Markdown

Review Change Stack

📝 Walkthrough

Walkthrough

Adds a complete memory observability stack: a usage_events SQLite table with migrations and retention, a SqliteUsageEventSink process-global recorder gated by REFLEXIO_ENABLE_USAGE_EVENT_SINK, per-entity learning_injection event emission on unified search, two new API endpoints (/api/get_injection_stats, /api/get_memory_review) with analytics queries, a playbook_metadata field propagated across all playbook schemas/storage/UI layers, docs registration of an Observability nav group, and full test coverage.

Changes

Memory Observability and Injection Metering

Layer / File(s) Summary
API schemas: new observability models and playbook_metadata field
reflexio/models/api_schema/retriever_schema.py, reflexio/models/api_schema/domain/entities.py, reflexio/models/api_schema/ui/entities.py, reflexio/models/api_schema/ui/converters.py
Adds InjectionStat, GetInjectionStatsRequest/Response, MemoryReviewCandidate, GetMemoryReviewRequest/Response (with scope validator), extends UpdateUserPlaybookRequest with status/playbook_metadata, and propagates playbook_metadata through UserPlaybook, PublicUserPlaybook, UserPlaybookView, and the to_user_playbook_view converter.
Storage base contract: record_usage_event, analytics, update_user_playbook
reflexio/server/services/storage/storage_base/_extras.py, reflexio/server/services/storage/storage_base/_playbook.py
Extends abstract ExtrasMixin with record_usage_event, get_injection_stats, and get_memory_review_candidates (defaulting to empty lists); extends abstract PlaybookMixin.update_user_playbook with status and playbook_metadata parameters.
SQLite storage: usage_events table, migrations, analytics queries, playbook_metadata
reflexio/server/services/storage/sqlite_storage/_base.py, reflexio/server/services/storage/sqlite_storage/_extras.py, reflexio/server/services/storage/sqlite_storage/_playbook.py, reflexio/server/services/storage/retention.py
Adds usage_events DDL/indexes and playbook_metadata column to user_playbooks, implements idempotent migration helpers, adds record_usage_event insert with org-id validation, get_injection_stats aggregation query, get_memory_review_candidates scored-signal logic with citation counting, extends save_user_playbooks/update_user_playbook SQL, and registers usage_events in retention targets.
Usage event sink and CLI wiring
reflexio/server/services/usage_event_sink.py, reflexio/server/api.py (wiring), reflexio/cli/run_services.py
Implements SqliteUsageEventSink with lazy per-org storage resolution, fire-and-forget error handling, and JSON-safe metadata conversion; wires via _wire_usage_event_sink in create_app when REFLEXIO_ENABLE_USAGE_EVENT_SINK=1; sets os.environ.setdefault in run_services.execute().
Injection event emission on unified search
reflexio/server/billing_meter.py, reflexio/server/api.py (metering)
Adds record_injection_events emitting one learning_injection event per entity for production_agent callers with per-entity prompt token counts; integrates into unified_search_endpoint via _meter_injection_events.
API endpoints, DashboardMixin handlers, and ReflexioClient methods
reflexio/server/api.py, reflexio/lib/_dashboard.py, reflexio/lib/_user_playbook.py, reflexio/client/client.py
Adds FastAPI POST routes /api/get_injection_stats and /api/get_memory_review; implements DashboardMixin handlers with storage-not-configured short-circuits and signal_filter post-filtering; forwards status/playbook_metadata through UserPlaybookMixin; adds ReflexioClient.get_injection_stats, get_memory_review, and extends update_user_playbook.
Docs: Observability nav group and method definitions
docs/components/layout/sidebar.tsx, docs/lib/constants.ts, docs/lib/methods/observability.ts, docs/lib/methods/registry.ts, docs/lib/methods/user-playbooks.ts
Registers Activity icon in sidebar, adds observability nav group, creates observabilityMethods with full endpoint/parameter metadata for both new endpoints, wires them into the method registry, and documents new update-user-playbook fields.
Tests: full coverage across all layers
tests/server/test_usage_event_sink.py, tests/server/test_billing_meter.py, tests/server/services/storage/test_storage_contract_extras.py, tests/server/services/storage/test_storage_contract_playbook.py, tests/server/api_endpoints/test_injection_stats.py, tests/server/api_endpoints/test_memory_review.py, tests/lib/test_config_unit.py, tests/client/test_memory_observability_client.py, tests/cli/test_run_services_workers.py, tests/conftest.py
Adds sink, billing meter, storage contract (usage events, memory review candidates, playbook_metadata), FastAPI endpoint, DashboardMixin unit, ReflexioClient, and CLI opt-out tests; isolates REFLEXIO_ENABLE_USAGE_EVENT_SINK from test environment.

Sequence Diagram(s)

sequenceDiagram
  participant Client as ReflexioClient / External
  participant API as FastAPI (api.py)
  participant Sink as SqliteUsageEventSink
  participant DashMixin as DashboardMixin
  participant Storage as SQLiteStorage

  Note over API,Sink: On unified search (production_agent)
  Client->>API: POST /api/unified_search
  API->>API: _meter_injection_events()
  API->>Sink: record_injection_events → UsageEvent(learning_injection)
  Sink->>Storage: record_usage_event(org_id, entity_type, entity_id, ...)
  Storage->>Storage: INSERT INTO usage_events

  Note over Client,Storage: Observability queries
  Client->>API: POST /api/get_injection_stats {days_back}
  API->>DashMixin: get_injection_stats(request)
  DashMixin->>Storage: get_injection_stats(days_back)
  Storage->>Storage: SELECT aggregate from usage_events
  Storage-->>DashMixin: list[InjectionStat]
  DashMixin-->>API: GetInjectionStatsResponse
  API-->>Client: {success, stats[]}

  Client->>API: POST /api/get_memory_review {user_id, days_back, signal_filter}
  API->>DashMixin: get_memory_review(request)
  DashMixin->>Storage: get_memory_review_candidates(days_back, user_id, include_all_users)
  Storage->>Storage: JOIN user_playbooks + usage_events + interactions.citations
  Storage-->>DashMixin: list[MemoryReviewCandidate]
  DashMixin->>DashMixin: filter by signal_filter
  DashMixin-->>API: GetMemoryReviewResponse
  API-->>Client: {success, candidates[]}
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

  • ReflexioAI/reflexio#126: Adds the core UsageEvent/record_usage_event billing seam and applied-learnings metering hooks that this PR extends to emit learning_injection events and persist/query them via the new usage_events storage layer.
  • ReflexioAI/reflexio#39: Introduces Interaction.citations JSON field that the new get_memory_review_candidates logic directly reads to compute per-playbook citation counts for candidate scoring.
  • ReflexioAI/reflexio#66: Establishes the RetentionTarget/RetentionMixin framework that this PR extends by registering usage_events as a new retention target.

Poem

🐇 Hop, hop, I watch what you inject!
Each playbook surfaced — I record, inspect.
The usage_events table blooms with rows,
Stale playbooks glow where signal_filter goes.
With memory review the warren stays bright —
No forgotten rule shall hide from sight! ✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 67.61% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title 'feat(reflexio): add memory observability data layer' clearly and concisely describes the main feature added—a new observability data layer for memory health operations. It matches the core objective of implementing Phase 1 of memory observability, introducing the usage_events table and memory review candidate functionality.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@wenchanghan wenchanghan force-pushed the feat/memory-observability-hygiene branch from 5c62194 to 3e63ae4 Compare June 16, 2026 05:35
@wenchanghan wenchanghan changed the title feat(reflexio): per-entity observability + memory hygiene (M1+M5, change 1 of 2) feat(reflexio): per-entity observability + memory hygiene (change 1 of 2) Jun 16, 2026
@wenchanghan wenchanghan force-pushed the feat/memory-observability-hygiene branch from 3e63ae4 to 605768b Compare June 16, 2026 08:06
Derrick Han added 2 commits June 15, 2026 22:27
Adds per-entity observability for memory injection: a new
``usage_events`` table, a per-entity ``learning_injection`` event
emitted by the unified search endpoint, a storage method to read
those events back, a lib method that aggregates them, and a REST
endpoint that exposes the aggregated view for external consumers
(claude-smart's dashboard, hosted cloud dashboard).

Why a new event, not an extension of ``learning_applied``?

* ``learning_applied`` is one row per search, billing-oriented. It
  records the *count* of surfaced entities in a single
  ``count_value`` and has no entity id.
* ``learning_injection`` is one row per entity, observability-oriented.
  Each row carries ``entity_type`` + ``entity_id`` + ``prompt_tokens``
  and a ``count_value`` of 1.

The two are complementary: ``learning_applied`` answers the billing
question ("how many learnings were applied?") while
``learning_injection`` answers the dashboard question ("which
specific entity was rendered into context, and at what token
cost?"). Rollup queries from per-entity rows are trivial;
reconstructing per-entity detail from a per-search aggregate is
impossible, so the per-entity event is the source of truth.

What this commit ships (PR1 of 2 in the memory-health workstream)

* New ``usage_events`` table in SQLite with row-count retention
  wired into ``RETENTION_TARGETS``. Schema mirrors the
  ``UsageEvent`` dataclass; indexed for per-org time-window and
  per-entity rollup queries.
* New ``record_injection_events`` billing helper emits one
  ``learning_injection`` event per entity surfaced by a search,
  with ``prompt_tokens`` computed from the content via
  ``count_input_tokens`` (the canonical ``cl100k_base`` tokenizer).
* New ``record_usage_event`` and ``get_injection_stats`` storage
  methods (abstract in ``ExtrasMixin`` with ``[]`` default;
  implemented in ``sqlite_storage._extras``).
* New ``SqliteUsageEventSink`` wires the process-global
  ``UsageEventRecorder`` to the storage with a lazy storage
  resolver and a never-raise contract.
* New Pydantic schemas (``InjectionStat``,
  ``GetInjectionStatsRequest``, ``GetInjectionStatsResponse``),
  a lib method ``DashboardMixin.get_injection_stats``, and a REST
  endpoint ``POST /api/get_injection_stats`` for external consumers.
* The unified search endpoint fires both ``_meter_applied_learnings``
  and ``_meter_injection_events`` on every response; failures are
  caught and logged so the search hot path is never affected by
  observability failures.

Sink wiring

The sink is opt-in via the ``REFLEXIO_ENABLE_USAGE_EVENT_SINK=1``
env var. The CLI sets this in ``reflexio.cli.run_services.execute``
before spawning the backend subprocess; tests do not, so the no-op
default applies and existing test fixtures (which call
``configure_usage_event_recorder`` themselves) are not disturbed.
The sink wiring is in ``reflexio.server.api.create_app``, gated on
the env var, and a separate helper ``_wire_usage_event_sink`` keeps
the create_app cyclomatic complexity under the project threshold.

Dual read surface (lib + REST)

The lib method is the source of truth for in-process consumers
(tests, internal services like ``evaluation_overview/service.py``).
The REST endpoint is a thin wrapper for external consumers — the
``reflexio-client`` SDK (which is HTTP-only, shipping no lib) is
how ``claude-smart`` and the hosted cloud dashboard reach reflexio.
The two share the same storage method, so a single query path covers
both surfaces.

Tests

* :func:`tests/server/test_billing_meter.py` — 7 new tests for
  ``record_injection_events``.
* :func:`tests/server/services/storage/test_storage_contract_extras.py`
  — 6 new tests for ``record_usage_event`` / ``get_injection_stats``.
* :func:`tests/server/test_usage_event_sink.py` — 8 new tests for
  ``SqliteUsageEventSink``.
* :func:`tests/server/api_endpoints/test_injection_stats.py` — 7
  new tests for the REST endpoint.
* :func:`tests/lib/test_config_unit.py` — 5 new tests for
  ``DashboardMixin.get_injection_stats`` (lib level).
Adds the candidate-detection and action surface for memory review.
After this lands, the data owner can surface stale, low-cite, and
superseded playbooks via the lib's ``get_memory_review`` method
(or ``POST /api/get_memory_review`` for external consumers), and
clients can archive or supersede them via the extended
``update_user_playbook`` endpoint. The duplicate-signal detection
is deferred to a follow-up batch job (cosine is O(n²)).

The review workflow is human-driven, not automatic. Reflexio does
NOT auto-archive playbooks. The data owner reviews the candidate
list, decides which to act on, and triggers the action through the
existing ``update_user_playbook`` plumbing. The system provides
signals; the human makes the call.

What's in this commit (PR1 of 2 in the memory-health workstream)

* New ``MemoryReviewCandidate``, ``GetMemoryReviewRequest``, and
  ``GetMemoryReviewResponse`` Pydantic schemas. The candidate model
  carries one or more of ``stale``, ``duplicate``,
  ``high_cost_low_cite``, ``supersedeable`` signals so a single
  row can be flagged for multiple reasons.
* New ``get_memory_review_candidates`` storage method (abstract in
  ``ExtrasMixin`` with ``[]`` default, implemented in SQLite).
  Implements three signals: ``stale`` (no injection in the window
  AND not modified recently), ``high_cost_low_cite`` (injected
  often, cited rarely), and ``supersedeable`` (entity id appears
  in a recent ``playbook_aggregation_change_logs``
  removed_playbooks snapshot).
* New lib method :meth:`DashboardMixin.get_memory_review` and a
  REST endpoint ``POST /api/get_memory_review`` for external
  consumers. Both honor ``signal_filter``.
* Extended :class:`UpdateUserPlaybookRequest` with two new optional
  fields: ``status: Status | None`` (lets a client archive
  candidates) and ``playbook_metadata: str | None`` (lets a
  client stamp a ``{"superseded_by": <id>}`` reference on a
  playbook that has been replaced by a newer one). The storage
  :meth:`update_user_playbook` and the lib
  :meth:`UserPlaybookMixin.update_user_playbook` are extended
  with the same two params; both default to ``None`` so the change
  is backwards-compatible — every existing call site still works.

Dual read surface (lib + REST)

Same pattern as the telemetry commit: the lib is for in-process
consumers (tests, internal services); the REST endpoint is a thin
wrapper for external consumers (claude-smart's dashboard, hosted
cloud dashboard) which reach reflexio through the HTTP-only
``reflexio-client`` SDK.

Tests

* :func:`tests/server/services/storage/test_storage_contract_extras.py`
  — 6 new tests in ``TestMemoryReviewCandidates``: empty when no
  playbooks, empty for zero/negative days_back, ``stale`` signal
  fires for old playbooks with no usage, ``stale`` does NOT fire
  for fresh playbooks, ``high_cost_low_cite`` fires when citation
  rate is below threshold, ``high_cost_low_cite`` does NOT fire when
  citation rate is good.
* :func:`tests/server/api_endpoints/test_memory_review.py` — 8 new
  tests for the REST endpoint: happy path, empty list, failure
  response, days_back validation (zero/negative), signal_filter
  validation, default days_back, signal_filter forwarding to lib.
* :func:`tests/lib/test_config_unit.py` — 7 new tests in
  ``TestGetMemoryReview`` (lib level): happy path, empty list,
  storage-not-configured, dict input, signal_filter narrowing,
  signal_filter=None returns all, exception returns failure, default
  days_back.
@wenchanghan wenchanghan force-pushed the feat/memory-observability-hygiene branch from 605768b to c1480a1 Compare June 16, 2026 08:30
Renames the human-review contract introduced in the previous
commit from 'memory hygiene' to 'memory review' so the surface
name matches the existing claude-smart CLI command (``memory-review``)
and accurately reflects that the workflow is human-driven (a review
queue), not an automatic hygiene process.

This is a breaking change for any caller that already adopted the
older surface. Since the surface is not yet released, the rename is
applied in this PR before any caller can depend on it.

What changes (mechanical)

* ``MemoryHygieneCandidate`` → ``MemoryReviewCandidate``
* ``GetMemoryHygieneRequest`` → ``GetMemoryReviewRequest``
* ``GetMemoryHygieneResponse`` → ``GetMemoryReviewResponse``
* Lib method ``DashboardMixin.get_memory_hygiene`` →
  ``DashboardMixin.get_memory_review``
* Storage method ``get_memory_hygiene_candidates`` →
  ``get_memory_review_candidates``
* Endpoint ``POST /api/get_memory_hygiene`` →
  ``POST /api/get_memory_review``
* Test file ``tests/server/api_endpoints/test_memory_hygiene.py`` →
  ``tests/server/api_endpoints/test_memory_review.py``
* Test class ``TestGetMemoryHygiene`` → ``TestGetMemoryReview``
* Test class ``TestMemoryHygieneCandidates`` →
  ``TestMemoryReviewCandidates``
* Docstrings and user-facing response messages updated
  (e.g. 'Retrieved memory hygiene candidates successfully' →
  'Retrieved memory review successfully')

What does NOT change

* The four signal names: ``stale``, ``high_cost_low_cite``,
  ``supersedeable``, ``duplicate``
* The contract shape (request/response fields, behavior)
* The ``update_user_playbook`` extensions (``status``,
  ``playbook_metadata``) — they're independent of the review
  surface name

No functional change. 137 insertions, 137 deletions across 9 files.
@wenchanghan wenchanghan force-pushed the feat/memory-observability-hygiene branch from c1480a1 to 3688547 Compare June 16, 2026 08:42
@wenchanghan wenchanghan changed the title feat(reflexio): per-entity observability + memory hygiene (change 1 of 2) feat(reflexio): per-entity observability + memory review (change 1 of 2) Jun 16, 2026
Eight review findings from the change-of-2 reviewer; seven fixed
in this commit, one is a real bug surfaced by the new tests, two
are deferred as follow-ups.

Findings addressed

* **ReflexioAI#1 — Stale module docstring on usage_event_sink.** The module
  docstring named a non-existent env var
  (``REFLEXIO_DISABLE_USAGE_EVENT_SINK``); the code uses
  ``REFLEXIO_ENABLE_USAGE_EVENT_SINK=1``. Updated the docstring
  to match the actual semantics.

* **ReflexioAI#2 — Broken Sphinx cross-ref and false 'shared SQL' claim in
  ``_get_applied_counts_for_window``.** The backslash before the
  backtick broke the cross-reference; the 'shared SQL' claim was
  false (the function runs its own ``SELECT``). Rewrote the
  docstring to describe the actual query and reference the
  correctly-formatted :meth: cross-reference.

* **ReflexioAI#3 — Sort key used ``signals[0]`` which is misleading.** The
  score already encodes signal priority (supersedeable=100,
  stale=50-99, high_cost_low_cite=30-49), so sorting by ``-score``
  gives the same grouping and order without depending on
  the order signals happen to be appended in. Simplified the
  sort and updated the docstring to explain the score → priority
  mapping.

* **ReflexioAI#4 — ``MemoryReviewCandidate.entity_type`` allowed
  ``"profile"`` but the storage only ever returns
  ``"playbook"``.** Narrowed the type to
  ``Literal["playbook"]`` for v1; the docstring now notes
  that a follow-up will widen the type when profile review is
  added.

* **ReflexioAI#6 — Stale check unsafe against unparseable ``created_at``.**
  The previous code used ``_iso_to_epoch(...) or 0``, so a
  malformed timestamp became epoch 0 and always satisfied the
  staleness threshold. Added an explicit guard that treats
  unparseable ``created_at`` as ``None`` (no stale signal,
  ``last_modified_at`` = ``None``).

* **ReflexioAI#8 — ``signal_filter=[]`` was treated like ``None``.**
  Changed the gate to ``is not None`` so an explicit empty
  list is a real (empty) filter, not 'no filter'. Updated the
  corresponding test to match.

Real bug surfaced

* **``playbook_aggregation_change_logs.created_at`` is
  ``INTEGER`` (epoch), but the query in
  ``get_memory_review_candidates`` compared it against
  ``start_ts_iso`` (an ISO string).** The string-vs-int
  comparison always returned 0 rows, so the ``supersedeable``
  signal never fired. Fixed by computing both ``start_ts``
  (epoch) and ``start_ts_iso`` (ISO) at the top of the
  function and using the right one for each comparison.

  Discovered by the new ``test_candidates_sorted_by_score_descending``
  test, which set up all three signal classes and expected the
  sort to be monotonic. The test failed with
  ``OperationalError: table playbook_aggregation_change_logs
  has no column named user_id`` (an unrelated issue in the test
  setup) and the missing ``supersedeable`` candidate, which
  traced back to the broken comparison.

New tests

* ``test_candidates_sorted_by_score_descending`` — sets up
  one candidate per signal class and asserts the result is sorted
  by ``-score``.
* ``test_stale_signal_skipped_when_created_at_unparseable`` —
  inserts a row with ``created_at='not-a-date'`` and asserts
  no ``stale`` signal is emitted.
* ``test_signal_filter_empty_list_filters_out_everything`` —
  asserts the new ``is not None`` semantics.

Deferred (out of scope)

* **ReflexioAI#5 — N synchronous DB writes per search.** A search returning
  N entities triggers N sequential ``INSERT``s. Flagged as a
  follow-up; the sink is opt-in and the v1 workstream is the
  data layer, not the perf budget.
* **ReflexioAI#7 — ``playbook_metadata: str | None`` in the API.** Would
  require a Pydantic model change + storage format negotiation;
  separate PR.
@wenchanghan

Copy link
Copy Markdown
Author

Code review

Reviewed as requested (draft). The two-phase design (data layer here, dashboard/CLI in claude-smart) is sound and the code latches onto existing patterns well for a first PR — thin REST wrappers over DashboardMixin, usage_events retention wiring, opt-in sink, and the record_injection_events / record_applied_learnings cost-vs-value split all fit the codebase. The gap it bridges (per-entity injection observability + a human-reviewed cleanup queue) is meaningful. Found 5 issues, the first four functional:

  1. update_user_playbook(playbook_metadata=...) writes to a column that does not exist on user_playbooks. The UPDATE user_playbooks SET playbook_metadata = ? path runs whenever a caller passes a non-None value — which is the PR's own documented archive/supersede workflow (playbook_metadata='{"superseded_by": <id>}') — and SQLite will raise OperationalError: no such column: playbook_metadata. The column only exists on agent_playbooks, and no migration adds it to user_playbooks.

params.append(status.value if hasattr(status, "value") else status)
if playbook_metadata is not None:
updates.append("playbook_metadata = ?")
params.append(playbook_metadata)
if updates:
params.append(user_playbook_id)
self._execute(
f"UPDATE user_playbooks SET {', '.join(updates)} WHERE user_playbook_id = ?",
tuple(params),

(DDL confirming user_playbooks has no playbook_metadata column:

CREATE TABLE IF NOT EXISTS user_playbooks (
user_playbook_id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT,
playbook_name TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
request_id TEXT NOT NULL,
agent_version TEXT NOT NULL DEFAULT '',
content TEXT NOT NULL DEFAULT '',
trigger TEXT,
rationale TEXT,
blocking_issue TEXT,
source_interaction_ids TEXT,
status TEXT,
source TEXT,
embedding TEXT,
expanded_terms TEXT,
source_span TEXT,
notes TEXT,
reader_angle TEXT
)

  1. The supersedeable signal can never fire. removed_playbooks is serialized from AgentPlaybookSnapshot.model_dump(), whose only id key is agent_playbook_id, but the extraction reads snap.get("user_playbook_id") or snap.get("id"). Neither key exists, so eid is always "", superseded_ids stays empty, and the highest-priority signal (score 100) is dead.

superseded_ids: set[str] = set()
for r in change_log_rows:
for snap in _json_loads(r["removed_playbooks"]) or []:
if not isinstance(snap, dict):
continue
eid = str(
snap.get("user_playbook_id")
or snap.get("id")
or ""
)
if eid:
superseded_ids.add(eid)

  1. Injection counts collide across the agent/user playbook id spaces. _meter_injection_events records both user playbooks (entity_id = user_playbook_id) and agent playbooks (entity_id = agent_playbook_id) under the same entity_type="playbook". get_memory_review_candidates then groups injections by entity_id only and looks them up by user_playbook_id. Since the two id sequences are independent AUTOINCREMENTs (both start at 1), a user playbook inherits an unrelated agent playbook's injection count, corrupting the stale and high_cost_low_cite signals.

for pb in resp.user_playbooks:
entities.append(("playbook", str(pb.user_playbook_id)))
contents.append(pb.content or "")
for ap in resp.agent_playbooks:
entities.append(("playbook", str(ap.agent_playbook_id)))
contents.append(ap.content or "")

  1. The review queue never excludes archived playbooks. The candidate query is SELECT ... status ... FROM user_playbooks with no WHERE on status, and status is selected but never used to filter. An archived playbook has inj_count == 0, so it permanently re-appears as stale — the user archives it, and it comes right back on the next call.

# Snapshot of all current user_playbooks.
playbook_rows = self._fetchall(
"""SELECT user_playbook_id, playbook_name, content, status,
created_at, source_span
FROM user_playbooks"""
)
if not playbook_rows:

  1. Docstring inaccuracies (minor): record_injection_events / _meter_injection_events say GET /api/get_injection_stats, but the route is @core_router.post. Separately, get_memory_review_candidates is documented as "sorted by (signals, -score)" but candidates.sort(key=lambda c: -c.score) sorts by score only; and the wiring docstrings attribute the REFLEXIO_ENABLE_USAGE_EVENT_SINK env var to reflexio.server.__main__, but it is set in reflexio/cli/run_services.py.

and are aggregated by the storage-layer ``get_injection_stats`` and
the ``GET /api/get_injection_stats`` endpoint. Pairs with
``learning_applied`` so the dashboard can answer both "what was

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

Derrick Han and others added 3 commits June 15, 2026 23:24
…bility-hygiene

# Conflicts:
#	reflexio/server/api.py
Resolves the issues raised in code review:

- Add playbook_metadata column to user_playbooks (DDL + backfill
  migration). update_user_playbook(playbook_metadata=...) — the
  supersede action's own path — previously raised OperationalError
  because the column only existed on agent_playbooks.
- Split injection-event entity_type into "user_playbook" /
  "agent_playbook" so the two independent id spaces no longer collide
  in get_memory_review_candidates' per-entity injection counts.
- Exclude archived playbooks from the review queue (COALESCE(status,'')
  != 'archived'); previously an archived candidate reappeared forever
  as stale.
- Defer the supersedeable signal to a follow-up (like duplicate): the
  aggregation change log records removed agent_playbook_id, not the
  user_playbook_id this review keys on, so it could never fire. Removed
  the dead code path and its hand-crafted test.
- Fix docstrings: POST (not GET) /api/get_injection_stats; sort is by
  score descending (not (signals, -score)); the sink env var is set in
  reflexio/cli/run_services.py (not reflexio.server.__main__).

Adds regression tests for the metadata write path and archived
exclusion; updates entity_type vocabulary across affected tests.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@wenchanghan wenchanghan changed the title feat(reflexio): per-entity observability + memory review (change 1 of 2) feat(reflexio): add memory observability data layer Jun 16, 2026
@wenchanghan wenchanghan marked this pull request as ready for review June 16, 2026 10:31

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🧹 Nitpick comments (1)
tests/server/services/storage/test_storage_contract_playbook.py (1)

68-71: ⚡ Quick win

Strengthen metadata round-trip assertion across all returned rows.

The current check only validates index 0; it can miss partial mapping regressions. Assert metadata for every returned playbook.

Suggested assertion update
         result = storage.get_user_playbooks(playbook_name="fb")
         assert len(result) == 2
-        assert result[0].playbook_metadata
+        assert all(p.playbook_metadata for p in result)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/server/services/storage/test_storage_contract_playbook.py` around lines
68 - 71, The assertion after the get_user_playbooks call in the test only
validates that result[0] has playbook_metadata, but this misses potential
partial mapping regressions in other rows. Update the assertion to verify that
all items returned in the result list contain the playbook_metadata property,
not just the first element at index 0. This ensures complete metadata round-trip
validation across all returned playbook objects.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@docs/lib/methods/observability.ts`:
- Line 3: The SIGNAL_ENUM constant includes unsupported signal values
("duplicate" and "supersedeable") that are deferred for future use, creating a
misleading API contract. Remove these two values from the SIGNAL_ENUM array,
keeping only the currently supported signals: "stale" and "high_cost_low_cite".
Additionally, update the signal_filter description (which also applies to the
same issue) to reflect only these currently supported signal values.

In `@reflexio/lib/_dashboard.py`:
- Line 201: In reflexio/lib/_dashboard.py at lines 201 and 251, the API-facing
response messages currently embed exception details via str(e), which can leak
internal implementation details to users. Replace the msg parameter in both
locations with a generic failure message that does not include the exception
string, and instead log the full exception details server-side using a logger
call so that debugging information is available to developers but not exposed to
API consumers.

In `@reflexio/server/services/storage/sqlite_storage/_base.py`:
- Around line 2105-2127: The clear_user_data() function needs to be updated to
include deletion of records from the new usage_events table. Add a DELETE
statement that removes all rows from usage_events where the user_id matches the
user being deleted, similar to how the function currently handles deletion from
interactions, user_playbooks, profiles, and requests tables. This ensures that
usage telemetry records containing user_id, request_id, and session_id are
properly removed when a user's data is cleared.

In `@reflexio/server/services/storage/sqlite_storage/_extras.py`:
- Around line 346-347: Update the docstring for the entity_type parameter
(around line 346-347) to correctly document the entity types that are actually
used by get_memory_review_candidates(). Replace the current documentation that
says entity_type should be "playbook" / "profile" with the correct entity types
that the memory-review query actually processes: user_playbook, agent_playbook,
and profile. This ensures that event producers emit stats with the correct
entity types that the memory review system will actually read.
- Around line 607-643: The stale signal detection uses created_at_epoch for both
staleness evaluation and last_modified_at, allowing recently-edited old
playbooks to be incorrectly marked as stale. Add an updated_at/last_modified_at
column to the database schema, update it in the update_user_playbook() function
whenever a playbook is modified, then replace the created_at_epoch comparison in
the stale check (around the condition checking current_time minus
created_at_epoch) with a comparison using this new modification timestamp
instead, and also update the last_modified_at field in the MemoryReviewCandidate
instantiation to use the new timestamp rather than created_at_epoch.

In `@reflexio/server/services/storage/storage_base/_playbook.py`:
- Around line 395-400: The docstring in the parameter documentation for the
status parameter contains an incorrect HTTP method specification. In the section
describing the status parameter and its relationship to the get_memory_review
endpoint (around line 397), change the route specification from GET
/api/get_memory_review to POST /api/get_memory_review to accurately reflect that
this endpoint uses the POST HTTP method, not GET. This correction ensures the
docstring correctly guides implementers on which HTTP method to use when calling
this endpoint.

In `@tests/server/services/storage/test_storage_contract_extras.py`:
- Around line 247-330: Add a new test method in the test suite (following the
pattern of existing test methods like
test_get_injection_stats_aggregates_by_entity) that explicitly verifies the
collision-regression fix for split playbook entity types. The test should record
multiple usage events with identical entity_id values but different entity_type
values (specifically "user_playbook" and "agent_playbook"), then call
get_injection_stats and assert that the returned stats contain separate
aggregation buckets for each entity_type/entity_id pair (not collapsed into a
single row). This ensures that the storage layer correctly treats different
entity types as distinct entities even when they share the same entity_id value,
preventing any regression in the collision handling logic.

---

Nitpick comments:
In `@tests/server/services/storage/test_storage_contract_playbook.py`:
- Around line 68-71: The assertion after the get_user_playbooks call in the test
only validates that result[0] has playbook_metadata, but this misses potential
partial mapping regressions in other rows. Update the assertion to verify that
all items returned in the result list contain the playbook_metadata property,
not just the first element at index 0. This ensures complete metadata round-trip
validation across all returned playbook objects.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro Plus

Run ID: da735582-06c4-4de5-9050-c1caa45bd42f

📥 Commits

Reviewing files that changed from the base of the PR and between 424649c and bfb20f5.

📒 Files selected for processing (32)
  • docs/components/layout/sidebar.tsx
  • docs/lib/constants.ts
  • docs/lib/methods/observability.ts
  • docs/lib/methods/registry.ts
  • docs/lib/methods/user-playbooks.ts
  • reflexio/cli/run_services.py
  • reflexio/client/client.py
  • reflexio/lib/_dashboard.py
  • reflexio/lib/_user_playbook.py
  • reflexio/models/api_schema/domain/entities.py
  • reflexio/models/api_schema/retriever_schema.py
  • reflexio/models/api_schema/ui/converters.py
  • reflexio/models/api_schema/ui/entities.py
  • reflexio/server/api.py
  • reflexio/server/billing_meter.py
  • reflexio/server/services/storage/retention.py
  • reflexio/server/services/storage/sqlite_storage/_base.py
  • reflexio/server/services/storage/sqlite_storage/_extras.py
  • reflexio/server/services/storage/sqlite_storage/_playbook.py
  • reflexio/server/services/storage/storage_base/_extras.py
  • reflexio/server/services/storage/storage_base/_playbook.py
  • reflexio/server/services/usage_event_sink.py
  • tests/cli/test_run_services_workers.py
  • tests/client/test_memory_observability_client.py
  • tests/conftest.py
  • tests/lib/test_config_unit.py
  • tests/server/api_endpoints/test_injection_stats.py
  • tests/server/api_endpoints/test_memory_review.py
  • tests/server/services/storage/test_storage_contract_extras.py
  • tests/server/services/storage/test_storage_contract_playbook.py
  • tests/server/test_billing_meter.py
  • tests/server/test_usage_event_sink.py

@@ -0,0 +1,66 @@
import { MethodDef } from "../types";

const SIGNAL_ENUM = ["stale", "duplicate", "high_cost_low_cite", "supersedeable"];

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Limit documented signal_filter values to currently supported signals.

SIGNAL_ENUM (and the signal_filter description) includes duplicate and supersedeable, but backend behavior/context indicates those are deferred/reserved. Please document only currently supported values to avoid a misleading API contract.

Also applies to: 62-62

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/lib/methods/observability.ts` at line 3, The SIGNAL_ENUM constant
includes unsupported signal values ("duplicate" and "supersedeable") that are
deferred for future use, creating a misleading API contract. Remove these two
values from the SIGNAL_ENUM array, keeping only the currently supported signals:
"stale" and "high_cost_low_cite". Additionally, update the signal_filter
description (which also applies to the same issue) to reflect only these
currently supported signal values.

return GetInjectionStatsResponse(
success=False,
stats=[],
msg=f"Failed to get injection stats: {str(e)}",

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid returning raw exception text in API-facing response messages.

Line 201 and Line 251 currently embed str(e) in user-facing msg; this can leak internal storage/query/runtime details. Return a generic failure message and log full exception server-side instead.

Also applies to: 251-251

🧰 Tools
🪛 Ruff (0.15.17)

[warning] 201-201: Use explicit conversion flag

Replace with conversion flag

(RUF010)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@reflexio/lib/_dashboard.py` at line 201, In reflexio/lib/_dashboard.py at
lines 201 and 251, the API-facing response messages currently embed exception
details via str(e), which can leak internal implementation details to users.
Replace the msg parameter in both locations with a generic failure message that
does not include the exception string, and instead log the full exception
details server-side using a logger call so that debugging information is
available to developers but not exposed to API consumers.

Source: Linters/SAST tools

Comment on lines +2105 to +2127
CREATE TABLE IF NOT EXISTS usage_events (
event_id INTEGER PRIMARY KEY AUTOINCREMENT,
org_id TEXT NOT NULL,
user_id TEXT NOT NULL DEFAULT '',
request_id TEXT NOT NULL DEFAULT '',
session_id TEXT NOT NULL DEFAULT '',
pipeline TEXT,
entity_type TEXT,
entity_id TEXT,
event_name TEXT NOT NULL,
event_category TEXT NOT NULL,
caller_type TEXT,
count_value INTEGER NOT NULL DEFAULT 1,
prompt_tokens INTEGER,
completion_tokens INTEGER,
billing_input_tokens INTEGER,
platform_llm INTEGER,
platform_storage INTEGER,
duration_ms INTEGER,
error_kind TEXT,
metadata_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Include usage_events in per-user data deletion.

The new table stores user_id, request_id, and session_id, but clear_user_data() still deletes only interactions, user playbooks, profiles, and requests. A clear-user request will now leave usage telemetry identifiers behind.

🛡️ Proposed direction
 def clear_user_data(self, user_id: str) -> dict[str, int]:
     with self._lock:
+        request_rows = self.conn.execute(
+            "SELECT request_id, session_id FROM requests WHERE user_id = ?",
+            (user_id,),
+        ).fetchall()
+        request_ids = [r["request_id"] for r in request_rows]
+        session_ids = [r["session_id"] for r in request_rows if r["session_id"]]
+
         # Snapshot rowids/ids that need FTS or vector cleanup before
         # the DELETE removes them from the main tables.
         interaction_ids = [
             r["interaction_id"]
@@
+        usage_clauses = ["user_id = ?"]
+        usage_params: list[Any] = [user_id]
+        if request_ids:
+            usage_clauses.append(
+                "request_id IN (" + ",".join("?" for _ in request_ids) + ")"
+            )
+            usage_params.extend(request_ids)
+        if session_ids:
+            usage_clauses.append(
+                "session_id IN (" + ",".join("?" for _ in session_ids) + ")"
+            )
+            usage_params.extend(session_ids)
+        usage_events_cur = self.conn.execute(
+            "DELETE FROM usage_events WHERE " + " OR ".join(usage_clauses),
+            usage_params,
+        )
+
         requests_cur = self.conn.execute(
             "DELETE FROM requests WHERE user_id = ?", (user_id,)
         )
         self.conn.commit()
 
         return {
             "interactions": interactions_cur.rowcount,
             "user_playbooks": user_playbooks_cur.rowcount,
             "profiles": profiles_cur.rowcount,
             "requests": requests_cur.rowcount,
+            "usage_events": usage_events_cur.rowcount,
         }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@reflexio/server/services/storage/sqlite_storage/_base.py` around lines 2105 -
2127, The clear_user_data() function needs to be updated to include deletion of
records from the new usage_events table. Add a DELETE statement that removes all
rows from usage_events where the user_id matches the user being deleted, similar
to how the function currently handles deletion from interactions,
user_playbooks, profiles, and requests tables. This ensures that usage telemetry
records containing user_id, request_id, and session_id are properly removed when
a user's data is cleared.

Comment on lines +346 to +347
entity_type: ``"playbook"`` / ``"profile"`` for per-entity events.
entity_id: Storage id of the surfaced entity.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Update the documented entity types to match the memory-review query.

This doc still says per-entity events use "playbook" / "profile", but get_memory_review_candidates() only reads entity_type = 'user_playbook'. Please document user_playbook, agent_playbook, and profile so producers do not emit stats that memory review ignores.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@reflexio/server/services/storage/sqlite_storage/_extras.py` around lines 346
- 347, Update the docstring for the entity_type parameter (around line 346-347)
to correctly document the entity types that are actually used by
get_memory_review_candidates(). Replace the current documentation that says
entity_type should be "playbook" / "profile" with the correct entity types that
the memory-review query actually processes: user_playbook, agent_playbook, and
profile. This ensures that event producers emit stats with the correct entity
types that the memory review system will actually read.

Comment on lines +607 to +643
# stale — only meaningful when we have a real created_at.
if (
created_at_epoch is not None
and inj_count == 0
and (current_time - created_at_epoch) >= days_back * 24 * 60 * 60
):
signals.append("stale")
# Older = higher score; clamp to [50, 99] so stale
# outranks high_cost_low_cite (30-49).
age_days = max(
0, (current_time - created_at_epoch) // (24 * 60 * 60)
)
score = max(score, min(99, 50 + age_days // 7))

# high_cost_low_cite
if inj_count >= 3 and cite_per_inj < 0.5:
signals.append("high_cost_low_cite")
score = max(score, min(49, 30 + min(19, inj_count)))

if not signals:
continue

title = row["playbook_name"] or (
(row["content"] or "")[:80] + ("..." if len(row["content"] or "") > 80 else "")
)
candidates.append(
MemoryReviewCandidate(
entity_type="user_playbook",
entity_id=eid,
title=title,
signals=signals,
score=score,
injection_count=inj_count,
citation_count=cite_count,
last_injected_at=inj["last_injected_at"],
last_cited_at=None, # not currently exposed by get_playbook_application_stats
last_modified_at=created_at_epoch,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Use a real modification timestamp for the stale signal.

The stale check and last_modified_at are based on created_at, so an old playbook edited moments ago can still be returned as stale. The PR contract says stale also means “not recently modified”; add an updated_at/last_modified_at column, update it in update_user_playbook(), and compare that timestamp here.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@reflexio/server/services/storage/sqlite_storage/_extras.py` around lines 607
- 643, The stale signal detection uses created_at_epoch for both staleness
evaluation and last_modified_at, allowing recently-edited old playbooks to be
incorrectly marked as stale. Add an updated_at/last_modified_at column to the
database schema, update it in the update_user_playbook() function whenever a
playbook is modified, then replace the created_at_epoch comparison in the stale
check (around the condition checking current_time minus created_at_epoch) with a
comparison using this new modification timestamp instead, and also update the
last_modified_at field in the MemoryReviewCandidate instantiation to use the new
timestamp rather than created_at_epoch.

Comment on lines +395 to +400
status (Status, optional): New lifecycle status. Lets a
client archive candidates surfaced by the lib's
``get_memory_review`` (or ``GET /api/get_memory_review``).
playbook_metadata (str, optional): Free-form metadata. Used
for the ``{"superseded_by": <id>}`` convention on
playbooks that have been replaced by a newer one.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Correct the memory-review route method in the contract docstring.

Line 397 still says GET /api/get_memory_review, but this cohort documents the endpoint as a POST route. Please update the docstring so implementers do not mirror the wrong HTTP method.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@reflexio/server/services/storage/storage_base/_playbook.py` around lines 395
- 400, The docstring in the parameter documentation for the status parameter
contains an incorrect HTTP method specification. In the section describing the
status parameter and its relationship to the get_memory_review endpoint (around
line 397), change the route specification from GET /api/get_memory_review to
POST /api/get_memory_review to accurately reflect that this endpoint uses the
POST HTTP method, not GET. This correction ensures the docstring correctly
guides implementers on which HTTP method to use when calling this endpoint.

Comment on lines +247 to +330
entity_type="playbook",
entity_id="42",
caller_type="production_agent",
session_id="s1",
request_id="r1",
prompt_tokens=10,
)
# The row is queryable via get_injection_stats (it filters on
# event_name='learning_injection' and entity_id IS NOT NULL).
stats = storage.get_injection_stats(days_back=30)
assert len(stats) == 1
assert stats[0].entity_type == "playbook"
assert stats[0].entity_id == "42"
assert stats[0].surfaced_count == 1
assert stats[0].total_prompt_tokens == 10

def test_record_usage_event_rejects_org_mismatch(self, storage):
if not _backend_supports_usage_events(storage):
pytest.skip("Backend does not implement record_usage_event")
from reflexio.server.services.storage.error import StorageError
with pytest.raises(StorageError, match="org_id mismatch"):
storage.record_usage_event(
org_id="wrong-org",
event_name="learning_injection",
event_category="application",
)

def test_get_injection_stats_empty_when_no_events(self, storage):
if not _backend_supports_usage_events(storage):
pytest.skip("Backend does not implement get_injection_stats")
assert storage.get_injection_stats(days_back=30) == []

def test_get_injection_stats_empty_for_zero_days_back(self, storage):
if not _backend_supports_usage_events(storage):
pytest.skip("Backend does not implement get_injection_stats")
# Defensive guard: zero/negative days_back returns [] (mirrors
# get_playbook_application_stats). Pydantic at the API layer
# already enforces gt=0, but storage should be robust on its own.
assert storage.get_injection_stats(days_back=0) == []
assert storage.get_injection_stats(days_back=-1) == []

def test_get_injection_stats_aggregates_by_entity(self, storage):
if not _backend_supports_usage_events(storage):
pytest.skip("Backend does not implement get_injection_stats")
# Two events for playbook 42 (one in s1, one in s2), one for profile p-99.
for sid, rid in (("s1", "r1"), ("s2", "r2")):
storage.record_usage_event(
org_id=storage.org_id,
event_name="learning_injection",
event_category="application",
entity_type="playbook",
entity_id="42",
caller_type="production_agent",
session_id=sid,
request_id=rid,
prompt_tokens=5,
)
storage.record_usage_event(
org_id=storage.org_id,
event_name="learning_injection",
event_category="application",
entity_type="profile",
entity_id="p-99",
caller_type="production_agent",
session_id="s1",
request_id="r1",
prompt_tokens=3,
)
stats = storage.get_injection_stats(days_back=30)
assert len(stats) == 2
# Most-injected sorts first.
top = stats[0]
assert top.entity_type == "playbook"
assert top.entity_id == "42"
assert top.surfaced_count == 2
assert top.total_prompt_tokens == 10
assert top.distinct_session_count is None or top.distinct_session_count >= 1
# Profile row second.
bottom = stats[1]
assert bottom.entity_type == "profile"
assert bottom.entity_id == "p-99"
assert bottom.surfaced_count == 1
assert bottom.total_prompt_tokens == 3

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Add an explicit collision-regression test for split playbook entity types.

Given the critical fix for agent/user ID-space collisions, this suite should assert that identical entity_id values under entity_type="user_playbook" and entity_type="agent_playbook" remain separate aggregation buckets.

Suggested test addition
+    def test_get_injection_stats_separates_user_and_agent_playbook_ids(self, storage):
+        if not _backend_supports_usage_events(storage):
+            pytest.skip("Backend does not implement get_injection_stats")
+        for entity_type in ("user_playbook", "agent_playbook"):
+            storage.record_usage_event(
+                org_id=storage.org_id,
+                event_name="learning_injection",
+                event_category="application",
+                entity_type=entity_type,
+                entity_id="42",
+                caller_type="production_agent",
+                session_id=f"s-{entity_type}",
+                request_id=f"r-{entity_type}",
+                prompt_tokens=1,
+            )
+        stats = storage.get_injection_stats(days_back=30)
+        keys = {(s.entity_type, s.entity_id) for s in stats}
+        assert ("user_playbook", "42") in keys
+        assert ("agent_playbook", "42") in keys
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
entity_type="playbook",
entity_id="42",
caller_type="production_agent",
session_id="s1",
request_id="r1",
prompt_tokens=10,
)
# The row is queryable via get_injection_stats (it filters on
# event_name='learning_injection' and entity_id IS NOT NULL).
stats = storage.get_injection_stats(days_back=30)
assert len(stats) == 1
assert stats[0].entity_type == "playbook"
assert stats[0].entity_id == "42"
assert stats[0].surfaced_count == 1
assert stats[0].total_prompt_tokens == 10
def test_record_usage_event_rejects_org_mismatch(self, storage):
if not _backend_supports_usage_events(storage):
pytest.skip("Backend does not implement record_usage_event")
from reflexio.server.services.storage.error import StorageError
with pytest.raises(StorageError, match="org_id mismatch"):
storage.record_usage_event(
org_id="wrong-org",
event_name="learning_injection",
event_category="application",
)
def test_get_injection_stats_empty_when_no_events(self, storage):
if not _backend_supports_usage_events(storage):
pytest.skip("Backend does not implement get_injection_stats")
assert storage.get_injection_stats(days_back=30) == []
def test_get_injection_stats_empty_for_zero_days_back(self, storage):
if not _backend_supports_usage_events(storage):
pytest.skip("Backend does not implement get_injection_stats")
# Defensive guard: zero/negative days_back returns [] (mirrors
# get_playbook_application_stats). Pydantic at the API layer
# already enforces gt=0, but storage should be robust on its own.
assert storage.get_injection_stats(days_back=0) == []
assert storage.get_injection_stats(days_back=-1) == []
def test_get_injection_stats_aggregates_by_entity(self, storage):
if not _backend_supports_usage_events(storage):
pytest.skip("Backend does not implement get_injection_stats")
# Two events for playbook 42 (one in s1, one in s2), one for profile p-99.
for sid, rid in (("s1", "r1"), ("s2", "r2")):
storage.record_usage_event(
org_id=storage.org_id,
event_name="learning_injection",
event_category="application",
entity_type="playbook",
entity_id="42",
caller_type="production_agent",
session_id=sid,
request_id=rid,
prompt_tokens=5,
)
storage.record_usage_event(
org_id=storage.org_id,
event_name="learning_injection",
event_category="application",
entity_type="profile",
entity_id="p-99",
caller_type="production_agent",
session_id="s1",
request_id="r1",
prompt_tokens=3,
)
stats = storage.get_injection_stats(days_back=30)
assert len(stats) == 2
# Most-injected sorts first.
top = stats[0]
assert top.entity_type == "playbook"
assert top.entity_id == "42"
assert top.surfaced_count == 2
assert top.total_prompt_tokens == 10
assert top.distinct_session_count is None or top.distinct_session_count >= 1
# Profile row second.
bottom = stats[1]
assert bottom.entity_type == "profile"
assert bottom.entity_id == "p-99"
assert bottom.surfaced_count == 1
assert bottom.total_prompt_tokens == 3
entity_type="playbook",
entity_id="42",
caller_type="production_agent",
session_id="s1",
request_id="r1",
prompt_tokens=10,
)
# The row is queryable via get_injection_stats (it filters on
# event_name='learning_injection' and entity_id IS NOT NULL).
stats = storage.get_injection_stats(days_back=30)
assert len(stats) == 1
assert stats[0].entity_type == "playbook"
assert stats[0].entity_id == "42"
assert stats[0].surfaced_count == 1
assert stats[0].total_prompt_tokens == 10
def test_record_usage_event_rejects_org_mismatch(self, storage):
if not _backend_supports_usage_events(storage):
pytest.skip("Backend does not implement record_usage_event")
from reflexio.server.services.storage.error import StorageError
with pytest.raises(StorageError, match="org_id mismatch"):
storage.record_usage_event(
org_id="wrong-org",
event_name="learning_injection",
event_category="application",
)
def test_get_injection_stats_empty_when_no_events(self, storage):
if not _backend_supports_usage_events(storage):
pytest.skip("Backend does not implement get_injection_stats")
assert storage.get_injection_stats(days_back=30) == []
def test_get_injection_stats_empty_for_zero_days_back(self, storage):
if not _backend_supports_usage_events(storage):
pytest.skip("Backend does not implement get_injection_stats")
# Defensive guard: zero/negative days_back returns [] (mirrors
# get_playbook_application_stats). Pydantic at the API layer
# already enforces gt=0, but storage should be robust on its own.
assert storage.get_injection_stats(days_back=0) == []
assert storage.get_injection_stats(days_back=-1) == []
def test_get_injection_stats_aggregates_by_entity(self, storage):
if not _backend_supports_usage_events(storage):
pytest.skip("Backend does not implement get_injection_stats")
# Two events for playbook 42 (one in s1, one in s2), one for profile p-99.
for sid, rid in (("s1", "r1"), ("s2", "r2")):
storage.record_usage_event(
org_id=storage.org_id,
event_name="learning_injection",
event_category="application",
entity_type="playbook",
entity_id="42",
caller_type="production_agent",
session_id=sid,
request_id=rid,
prompt_tokens=5,
)
storage.record_usage_event(
org_id=storage.org_id,
event_name="learning_injection",
event_category="application",
entity_type="profile",
entity_id="p-99",
caller_type="production_agent",
session_id="s1",
request_id="r1",
prompt_tokens=3,
)
stats = storage.get_injection_stats(days_back=30)
assert len(stats) == 2
# Most-injected sorts first.
top = stats[0]
assert top.entity_type == "playbook"
assert top.entity_id == "42"
assert top.surfaced_count == 2
assert top.total_prompt_tokens == 10
assert top.distinct_session_count is None or top.distinct_session_count >= 1
# Profile row second.
bottom = stats[1]
assert bottom.entity_type == "profile"
assert bottom.entity_id == "p-99"
assert bottom.surfaced_count == 1
assert bottom.total_prompt_tokens == 3
def test_get_injection_stats_separates_user_and_agent_playbook_ids(self, storage):
if not _backend_supports_usage_events(storage):
pytest.skip("Backend does not implement get_injection_stats")
for entity_type in ("user_playbook", "agent_playbook"):
storage.record_usage_event(
org_id=storage.org_id,
event_name="learning_injection",
event_category="application",
entity_type=entity_type,
entity_id="42",
caller_type="production_agent",
session_id=f"s-{entity_type}",
request_id=f"r-{entity_type}",
prompt_tokens=1,
)
stats = storage.get_injection_stats(days_back=30)
keys = {(s.entity_type, s.entity_id) for s in stats}
assert ("user_playbook", "42") in keys
assert ("agent_playbook", "42") in keys
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/server/services/storage/test_storage_contract_extras.py` around lines
247 - 330, Add a new test method in the test suite (following the pattern of
existing test methods like test_get_injection_stats_aggregates_by_entity) that
explicitly verifies the collision-regression fix for split playbook entity
types. The test should record multiple usage events with identical entity_id
values but different entity_type values (specifically "user_playbook" and
"agent_playbook"), then call get_injection_stats and assert that the returned
stats contain separate aggregation buckets for each entity_type/entity_id pair
(not collapsed into a single row). This ensures that the storage layer correctly
treats different entity types as distinct entities even when they share the same
entity_id value, preventing any regression in the collision handling logic.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant