feat(reflexio): add memory observability data layer #162
📝 Walkthrough
Adds a complete memory observability stack: a
Changes
Memory Observability and Injection Metering
Sequence Diagram(s)
sequenceDiagram
participant Client as ReflexioClient / External
participant API as FastAPI (api.py)
participant Sink as SqliteUsageEventSink
participant DashMixin as DashboardMixin
participant Storage as SQLiteStorage
Note over API,Sink: On unified search (production_agent)
Client->>API: POST /api/unified_search
API->>API: _meter_injection_events()
API->>Sink: record_injection_events → UsageEvent(learning_injection)
Sink->>Storage: record_usage_event(org_id, entity_type, entity_id, ...)
Storage->>Storage: INSERT INTO usage_events
Note over Client,Storage: Observability queries
Client->>API: POST /api/get_injection_stats {days_back}
API->>DashMixin: get_injection_stats(request)
DashMixin->>Storage: get_injection_stats(days_back)
Storage->>Storage: SELECT aggregate from usage_events
Storage-->>DashMixin: list[InjectionStat]
DashMixin-->>API: GetInjectionStatsResponse
API-->>Client: {success, stats[]}
Client->>API: POST /api/get_memory_review {user_id, days_back, signal_filter}
API->>DashMixin: get_memory_review(request)
DashMixin->>Storage: get_memory_review_candidates(days_back, user_id, include_all_users)
Storage->>Storage: JOIN user_playbooks + usage_events + interactions.citations
Storage-->>DashMixin: list[MemoryReviewCandidate]
DashMixin->>DashMixin: filter by signal_filter
DashMixin-->>API: GetMemoryReviewResponse
API-->>Client: {success, candidates[]}
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
5c62194 to 3e63ae4
3e63ae4 to 605768b
Adds per-entity observability for memory injection: a new
``usage_events`` table, a per-entity ``learning_injection`` event
emitted by the unified search endpoint, a storage method to read
those events back, a lib method that aggregates them, and a REST
endpoint that exposes the aggregated view for external consumers
(claude-smart's dashboard, hosted cloud dashboard).
Why a new event, not an extension of ``learning_applied``?
* ``learning_applied`` is one row per search, billing-oriented. It
records the *count* of surfaced entities in a single
``count_value`` and has no entity id.
* ``learning_injection`` is one row per entity, observability-oriented.
Each row carries ``entity_type`` + ``entity_id`` + ``prompt_tokens``
and a ``count_value`` of 1.
The two are complementary: ``learning_applied`` answers the billing
question ("how many learnings were applied?") while
``learning_injection`` answers the dashboard question ("which
specific entity was rendered into context, and at what token
cost?"). Rollup queries from per-entity rows are trivial;
reconstructing per-entity detail from a per-search aggregate is
impossible, so the per-entity event is the source of truth.
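The rollup argument can be illustrated with a small, self-contained sketch. It uses an in-memory SQLite table with only a handful of the ``usage_events`` columns; the rows, ids, and token counts are made up for illustration, and the query is not the one shipped in this PR.

```python
import sqlite3

# Reduced stand-in for the usage_events table (illustrative columns only).
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE usage_events (
        entity_type   TEXT,
        entity_id     TEXT,
        event_name    TEXT NOT NULL,
        count_value   INTEGER NOT NULL DEFAULT 1,
        prompt_tokens INTEGER
    )
    """
)

# One learning_injection row per surfaced entity (made-up data).
rows = [
    ("playbook", "7", "learning_injection", 1, 120),
    ("playbook", "7", "learning_injection", 1, 120),
    ("profile", "3", "learning_injection", 1, 45),
]
conn.executemany("INSERT INTO usage_events VALUES (?, ?, ?, ?, ?)", rows)

# Per-entity view: which entity was injected, how often, at what token cost.
per_entity = conn.execute(
    """
    SELECT entity_type, entity_id, SUM(count_value), SUM(prompt_tokens)
    FROM usage_events
    WHERE event_name = 'learning_injection'
    GROUP BY entity_type, entity_id
    ORDER BY entity_type, entity_id
    """
).fetchall()

# The per-search style total falls out of the per-entity rows by summing;
# the reverse direction (total -> per-entity detail) is not recoverable.
total_applied = sum(count for _, _, count, _ in per_entity)
```

Grouping by both ``entity_type`` and ``entity_id`` keeps entities from different id spaces apart even when their ids happen to match.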
What this commit ships (PR1 of 2 in the memory-health workstream)
* New ``usage_events`` table in SQLite with row-count retention
wired into ``RETENTION_TARGETS``. Schema mirrors the
``UsageEvent`` dataclass; indexed for per-org time-window and
per-entity rollup queries.
* New ``record_injection_events`` billing helper emits one
``learning_injection`` event per entity surfaced by a search,
with ``prompt_tokens`` computed from the content via
``count_input_tokens`` (the canonical ``cl100k_base`` tokenizer).
* New ``record_usage_event`` and ``get_injection_stats`` storage
methods (abstract in ``ExtrasMixin`` with ``[]`` default;
implemented in ``sqlite_storage._extras``).
* New ``SqliteUsageEventSink`` wires the process-global
``UsageEventRecorder`` to the storage with a lazy storage
resolver and a never-raise contract.
* New Pydantic schemas (``InjectionStat``,
``GetInjectionStatsRequest``, ``GetInjectionStatsResponse``),
a lib method ``DashboardMixin.get_injection_stats``, and a REST
endpoint ``POST /api/get_injection_stats`` for external consumers.
* The unified search endpoint fires both ``_meter_applied_learnings``
and ``_meter_injection_events`` on every response; failures are
caught and logged so the search hot path is never affected by
observability failures.
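As a rough sketch of the two behaviours described in the list above (one event per surfaced entity, and metering that never breaks the search path), the following uses hypothetical names throughout. ``InjectionEvent``, ``approx_token_count``, ``build_injection_events`` and ``meter_safely`` are illustrative and not the PR's API; in particular the word-count tokenizer is only a stand-in for ``count_input_tokens``.

```python
import logging
from dataclasses import dataclass
from typing import Callable, Iterable

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class InjectionEvent:
    """Hypothetical per-entity event: one row per surfaced entity."""
    entity_type: str
    entity_id: str
    prompt_tokens: int
    count_value: int = 1


def approx_token_count(text: str) -> int:
    """Stand-in tokenizer (whitespace word count), NOT cl100k_base."""
    return len(text.split())


def build_injection_events(
    entities: Iterable[tuple[str, str, str]],
) -> list[InjectionEvent]:
    """Turn (entity_type, entity_id, content) triples into one event each."""
    return [
        InjectionEvent(etype, eid, approx_token_count(content))
        for etype, eid, content in entities
    ]


def meter_safely(
    record: Callable[[InjectionEvent], None],
    events: Iterable[InjectionEvent],
) -> int:
    """Record each event; log and swallow failures so the caller never sees them."""
    recorded = 0
    for event in events:
        try:
            record(event)
            recorded += 1
        except Exception:
            logger.exception("usage metering failed; continuing")
    return recorded
```

Catching per event rather than around the whole loop is a design choice of this sketch: one bad write does not prevent the remaining events from being recorded.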
Sink wiring
The sink is opt-in via the ``REFLEXIO_ENABLE_USAGE_EVENT_SINK=1``
env var. The CLI sets this in ``reflexio.cli.run_services.execute``
before spawning the backend subprocess; tests do not, so the no-op
default applies and existing test fixtures (which call
``configure_usage_event_recorder`` themselves) are not disturbed.
The sink wiring is in ``reflexio.server.api.create_app``, gated on
the env var, and a separate helper ``_wire_usage_event_sink`` keeps
the create_app cyclomatic complexity under the project threshold.
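A minimal sketch of the opt-in gate, the lazy storage resolver, and the never-raise contract might look as follows. Only the env var name comes from the commit message; ``sink_enabled``, ``LazySink`` and ``wire_sink`` are hypothetical, the storage is modelled as a plain list, and treating exactly ``"1"`` as enabled is an assumption.

```python
import os
from typing import Any, Callable, Mapping, Optional

ENV_VAR = "REFLEXIO_ENABLE_USAGE_EVENT_SINK"


def sink_enabled(environ: Mapping[str, str] = os.environ) -> bool:
    """Opt-in gate: only the literal value "1" enables the sink (assumption)."""
    return environ.get(ENV_VAR) == "1"


class LazySink:
    """Hypothetical sink: resolves storage on each write and never raises."""

    def __init__(self, resolve_storage: Callable[[], list]) -> None:
        self._resolve_storage = resolve_storage

    def record(self, event: Any) -> bool:
        try:
            # Storage is looked up lazily, so the sink can be wired before
            # the storage backend exists.
            self._resolve_storage().append(event)
            return True
        except Exception:
            # Never-raise contract: report failure instead of propagating.
            return False


def wire_sink(
    environ: Mapping[str, str],
    resolve_storage: Callable[[], list],
) -> Optional[LazySink]:
    """Return a sink only when the env var opts in; otherwise keep the no-op default."""
    if not sink_enabled(environ):
        return None
    return LazySink(resolve_storage)
```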
Dual read surface (lib + REST)
The lib method is the source of truth for in-process consumers
(tests, internal services like ``evaluation_overview/service.py``).
The REST endpoint is a thin wrapper for external consumers — the
``reflexio-client`` SDK (which is HTTP-only, shipping no lib) is
how ``claude-smart`` and the hosted cloud dashboard reach reflexio.
The two share the same storage method, so a single query path covers
both surfaces.
Tests
* ``tests/server/test_billing_meter.py``: 7 new tests for
``record_injection_events``.
* ``tests/server/services/storage/test_storage_contract_extras.py``:
6 new tests for ``record_usage_event`` / ``get_injection_stats``.
* ``tests/server/test_usage_event_sink.py``: 8 new tests for
``SqliteUsageEventSink``.
* ``tests/server/api_endpoints/test_injection_stats.py``: 7
new tests for the REST endpoint.
* ``tests/lib/test_config_unit.py``: 5 new tests for
``DashboardMixin.get_injection_stats`` (lib level).
Adds the candidate-detection and action surface for memory review.
After this lands, the data owner can surface stale, low-cite, and
superseded playbooks via the lib's ``get_memory_review`` method
(or ``POST /api/get_memory_review`` for external consumers), and
clients can archive or supersede them via the extended
``update_user_playbook`` endpoint. Detection of the ``duplicate`` signal
is deferred to a follow-up batch job (pairwise cosine similarity is O(n²)).
The review workflow is human-driven, not automatic. Reflexio does
NOT auto-archive playbooks. The data owner reviews the candidate
list, decides which to act on, and triggers the action through the
existing ``update_user_playbook`` plumbing. The system provides
signals; the human makes the call.
What's in this commit (PR1 of 2 in the memory-health workstream)
* New ``MemoryReviewCandidate``, ``GetMemoryReviewRequest``, and
``GetMemoryReviewResponse`` Pydantic schemas. The candidate model
carries one or more of ``stale``, ``duplicate``,
``high_cost_low_cite``, ``supersedeable`` signals so a single
row can be flagged for multiple reasons.
* New ``get_memory_review_candidates`` storage method (abstract in
``ExtrasMixin`` with ``[]`` default, implemented in SQLite).
Implements three signals: ``stale`` (no injection in the window
AND not modified recently), ``high_cost_low_cite`` (injected
often, cited rarely), and ``supersedeable`` (entity id appears
in a recent ``playbook_aggregation_change_logs``
removed_playbooks snapshot).
* New lib method :meth:`DashboardMixin.get_memory_review` and a
REST endpoint ``POST /api/get_memory_review`` for external
consumers. Both honor ``signal_filter``.
* Extended :class:`UpdateUserPlaybookRequest` with two new optional
fields: ``status: Status | None`` (lets a client archive
candidates) and ``playbook_metadata: str | None`` (lets a
client stamp a ``{"superseded_by": <id>}`` reference on a
playbook that has been replaced by a newer one). The storage
:meth:`update_user_playbook` and the lib
:meth:`UserPlaybookMixin.update_user_playbook` are extended
with the same two params; both default to ``None`` so the change
is backwards-compatible — every existing call site still works.
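The backwards-compatibility claim for the two new optional fields can be sketched with a plain dataclass standing in for the Pydantic model. ``UpdateRequestSketch``, its ``user_playbook_id`` and ``content`` fields, and ``changed_fields`` are hypothetical; the ``"archived"`` status string and the ``superseded_by`` key are taken from this PR's description.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class UpdateRequestSketch:
    """Hypothetical stand-in for UpdateUserPlaybookRequest."""
    user_playbook_id: int                     # illustrative identifier field
    content: Optional[str] = None             # illustrative pre-existing field
    status: Optional[str] = None              # new: lets a client archive
    playbook_metadata: Optional[str] = None   # new: JSON string reference


def changed_fields(req: UpdateRequestSketch) -> dict:
    """Only fields that were explicitly set are written; None means 'leave as is'."""
    candidates = {
        "content": req.content,
        "status": req.status,
        "playbook_metadata": req.playbook_metadata,
    }
    return {name: value for name, value in candidates.items() if value is not None}


# An existing call site that knows nothing about the new fields still works.
legacy = UpdateRequestSketch(user_playbook_id=12, content="new text")

# A review action: archive playbook 12 and point at its replacement, 34.
supersede = UpdateRequestSketch(
    user_playbook_id=12,
    status="archived",
    playbook_metadata=json.dumps({"superseded_by": 34}),
)
```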
Dual read surface (lib + REST)
Same pattern as the telemetry commit: the lib is for in-process
consumers (tests, internal services); the REST endpoint is a thin
wrapper for external consumers (claude-smart's dashboard, hosted
cloud dashboard) which reach reflexio through the HTTP-only
``reflexio-client`` SDK.
Tests
* ``tests/server/services/storage/test_storage_contract_extras.py``
— 6 new tests in ``TestMemoryReviewCandidates``: empty when no
playbooks, empty for zero/negative days_back, ``stale`` signal
fires for old playbooks with no usage, ``stale`` does NOT fire
for fresh playbooks, ``high_cost_low_cite`` fires when citation
rate is below threshold, ``high_cost_low_cite`` does NOT fire when
citation rate is good.
* ``tests/server/api_endpoints/test_memory_review.py``: 8 new
tests for the REST endpoint: happy path, empty list, failure
response, days_back validation (zero/negative), signal_filter
validation, default days_back, signal_filter forwarding to lib.
* ``tests/lib/test_config_unit.py``: 7 new tests in
``TestGetMemoryReview`` (lib level): happy path, empty list,
storage-not-configured, dict input, signal_filter narrowing,
signal_filter=None returns all, exception returns failure, default
days_back.
605768b to c1480a1
Renames the human-review contract introduced in the previous commit
from 'memory hygiene' to 'memory review' so the surface name matches
the existing claude-smart CLI command (``memory-review``) and
accurately reflects that the workflow is human-driven (a review
queue), not an automatic hygiene process.
This is a breaking change for any caller that already adopted the
older surface. Since the surface is not yet released, the rename is
applied in this PR before any caller can depend on it.
What changes (mechanical)
* ``MemoryHygieneCandidate`` → ``MemoryReviewCandidate``
* ``GetMemoryHygieneRequest`` → ``GetMemoryReviewRequest``
* ``GetMemoryHygieneResponse`` → ``GetMemoryReviewResponse``
* Lib method ``DashboardMixin.get_memory_hygiene`` →
``DashboardMixin.get_memory_review``
* Storage method ``get_memory_hygiene_candidates`` →
``get_memory_review_candidates``
* Endpoint ``POST /api/get_memory_hygiene`` →
``POST /api/get_memory_review``
* Test file ``tests/server/api_endpoints/test_memory_hygiene.py`` →
``tests/server/api_endpoints/test_memory_review.py``
* Test class ``TestGetMemoryHygiene`` → ``TestGetMemoryReview``
* Test class ``TestMemoryHygieneCandidates`` →
``TestMemoryReviewCandidates``
* Docstrings and user-facing response messages updated (e.g.
'Retrieved memory hygiene candidates successfully' →
'Retrieved memory review successfully')
What does NOT change
* The four signal names: ``stale``, ``high_cost_low_cite``,
``supersedeable``, ``duplicate``
* The contract shape (request/response fields, behavior)
* The ``update_user_playbook`` extensions (``status``,
``playbook_metadata``); they are independent of the review surface
name
No functional change. 137 insertions, 137 deletions across 9 files.
c1480a1 to 3688547
Eight review findings from the change-of-2 reviewer: six are fixed in
this commit and two are deferred as follow-ups. The new tests also
surfaced one real bug, which is fixed here as well.
Findings addressed
* **ReflexioAI#1: Stale module docstring on usage_event_sink.** The
module docstring named a non-existent env var
(``REFLEXIO_DISABLE_USAGE_EVENT_SINK``); the code uses
``REFLEXIO_ENABLE_USAGE_EVENT_SINK=1``. Updated the docstring to
match the actual semantics.
* **ReflexioAI#2: Broken Sphinx cross-ref and false 'shared SQL' claim
in ``_get_applied_counts_for_window``.** The backslash before the
backtick broke the cross-reference; the 'shared SQL' claim was false
(the function runs its own ``SELECT``). Rewrote the docstring to
describe the actual query and reference the correctly-formatted
:meth: cross-reference.
* **ReflexioAI#3: Sort key used ``signals[0]``, which is misleading.**
The score already encodes signal priority (supersedeable=100,
stale=50-99, high_cost_low_cite=30-49), so sorting by ``-score``
gives the same grouping and order without depending on the order
signals happen to be appended in. Simplified the sort and updated
the docstring to explain the score → priority mapping.
* **ReflexioAI#4: ``MemoryReviewCandidate.entity_type`` allowed
``"profile"`` but the storage only ever returns ``"playbook"``.**
Narrowed the type to ``Literal["playbook"]`` for v1; the docstring
now notes that a follow-up will widen the type when profile review
is added.
* **ReflexioAI#6: Stale check unsafe against unparseable
``created_at``.** The previous code used ``_iso_to_epoch(...) or 0``,
so a malformed timestamp became epoch 0 and always satisfied the
staleness threshold. Added an explicit guard that treats unparseable
``created_at`` as ``None`` (no stale signal, ``last_modified_at`` =
``None``).
* **ReflexioAI#8: ``signal_filter=[]`` was treated like ``None``.**
Changed the gate to ``is not None`` so an explicit empty list is a
real (empty) filter, not 'no filter'. Updated the corresponding test
to match.
Real bug surfaced
* **``playbook_aggregation_change_logs.created_at`` is ``INTEGER``
(epoch), but the query in ``get_memory_review_candidates`` compared
it against ``start_ts_iso`` (an ISO string).** The string-vs-int
comparison always returned 0 rows, so the ``supersedeable`` signal
never fired. Fixed by computing both ``start_ts`` (epoch) and
``start_ts_iso`` (ISO) at the top of the function and using the
right one for each comparison. Discovered by the new
``test_candidates_sorted_by_score_descending`` test, which set up
all three signal classes and expected the sort to be monotonic. The
test failed with ``OperationalError: table
playbook_aggregation_change_logs has no column named user_id`` (an
unrelated issue in the test setup) and the missing ``supersedeable``
candidate, which traced back to the broken comparison.
New tests
* ``test_candidates_sorted_by_score_descending``: sets up one
candidate per signal class and asserts the result is sorted by
``-score``.
* ``test_stale_signal_skipped_when_created_at_unparseable``: inserts a
row with ``created_at='not-a-date'`` and asserts no ``stale`` signal
is emitted.
* ``test_signal_filter_empty_list_filters_out_everything``: asserts
the new ``is not None`` semantics.
Deferred (out of scope)
* **ReflexioAI#5: N synchronous DB writes per search.** A search
returning N entities triggers N sequential ``INSERT``s. Flagged as a
follow-up; the sink is opt-in and the v1 workstream is the data
layer, not the perf budget.
* **ReflexioAI#7: ``playbook_metadata: str | None`` in the API.**
Would require a Pydantic model change + storage format negotiation;
separate PR.
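The epoch-versus-ISO comparison bug described above can be reproduced in isolation. The sketch below uses a reduced, hypothetical ``change_logs`` table rather than the real ``playbook_aggregation_change_logs`` schema. It relies on SQLite's documented comparison rules, under which an INTEGER value always sorts before a TEXT value that cannot be converted to a number.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
# created_at is stored as an epoch INTEGER, as in the bug report.
conn.execute(
    "CREATE TABLE change_logs (id INTEGER PRIMARY KEY, created_at INTEGER NOT NULL)"
)

window_start = datetime(2024, 1, 1, tzinfo=timezone.utc)
start_ts = int(window_start.timestamp())   # epoch seconds
start_ts_iso = window_start.isoformat()    # ISO-8601 string

# One row that is clearly inside the window (one hour after the start).
conn.execute("INSERT INTO change_logs (created_at) VALUES (?)", (start_ts + 3600,))

query = "SELECT COUNT(*) FROM change_logs WHERE created_at >= ?"

# Comparing the INTEGER column with an ISO string: the string is not numeric,
# so it stays TEXT, and every INTEGER compares as less than it.
rows_with_iso = conn.execute(query, (start_ts_iso,)).fetchone()[0]

# Comparing like with like behaves as intended.
rows_with_epoch = conn.execute(query, (start_ts,)).fetchone()[0]
```

Because the mismatched comparison fails silently instead of raising, a test that expects at least one matching row is the practical way to catch it.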
Code review
Reviewed as requested (draft). The two-phase design (data layer here, dashboard/CLI in claude-smart) is sound and the code latches onto existing patterns well for a first PR: thin REST wrappers over
reflexio/reflexio/server/services/storage/sqlite_storage/_playbook.py Lines 830 to 838 in 51e4699
(DDL confirming
reflexio/reflexio/server/services/storage/sqlite_storage/_base.py Lines 1751 to 1769 in 51e4699
reflexio/reflexio/server/services/storage/sqlite_storage/_extras.py Lines 570 to 582 in 51e4699
reflexio/reflexio/server/api.py Lines 582 to 587 in 51e4699
reflexio/reflexio/server/services/storage/sqlite_storage/_extras.py Lines 528 to 534 in 51e4699
reflexio/reflexio/server/api.py Lines 556 to 558 in 51e4699
🤖 Generated with Claude Code
…bility-hygiene
# Conflicts:
#	reflexio/server/api.py
Resolves the issues raised in code review:
- Add playbook_metadata column to user_playbooks (DDL + backfill
  migration). update_user_playbook(playbook_metadata=...), the
  supersede action's own path, previously raised OperationalError
  because the column only existed on agent_playbooks.
- Split injection-event entity_type into "user_playbook" /
  "agent_playbook" so the two independent id spaces no longer collide
  in get_memory_review_candidates' per-entity injection counts.
- Exclude archived playbooks from the review queue
  (COALESCE(status,'') != 'archived'); previously an archived
  candidate reappeared forever as stale.
- Defer the supersedeable signal to a follow-up (like duplicate): the
  aggregation change log records removed agent_playbook_id, not the
  user_playbook_id this review keys on, so it could never fire.
  Removed the dead code path and its hand-crafted test.
- Fix docstrings: POST (not GET) /api/get_injection_stats; sort is by
  score descending (not (signals, -score)); the sink env var is set in
  reflexio/cli/run_services.py (not reflexio.server.__main__).
Adds regression tests for the metadata write path and archived
exclusion; updates entity_type vocabulary across affected tests.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
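The ``COALESCE(status,'') != 'archived'`` filter mentioned above matters because of SQL's three-valued logic: a bare ``status != 'archived'`` drops rows whose status is NULL. The sketch below shows the difference on a reduced, hypothetical ``user_playbooks`` table; the ``"approved"`` status value is made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_playbooks (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany(
    "INSERT INTO user_playbooks (id, status) VALUES (?, ?)",
    [
        (1, None),         # status never set
        (2, "archived"),   # should be excluded from the review queue
        (3, "approved"),   # hypothetical non-archived status
    ],
)

# NULL != 'archived' evaluates to NULL, which WHERE treats as false,
# so the row with no status silently disappears.
naive = [
    row[0]
    for row in conn.execute(
        "SELECT id FROM user_playbooks WHERE status != 'archived' ORDER BY id"
    )
]

# COALESCE maps NULL to '' first, so only truly archived rows are dropped.
with_coalesce = [
    row[0]
    for row in conn.execute(
        "SELECT id FROM user_playbooks "
        "WHERE COALESCE(status, '') != 'archived' ORDER BY id"
    )
]
```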
Actionable comments posted: 7
🧹 Nitpick comments (1)
tests/server/services/storage/test_storage_contract_playbook.py (1)
68-71: ⚡ Quick win
Strengthen metadata round-trip assertion across all returned rows.
The current check only validates index 0; it can miss partial mapping regressions. Assert metadata for every returned playbook.
Suggested assertion update
  result = storage.get_user_playbooks(playbook_name="fb")
  assert len(result) == 2
- assert result[0].playbook_metadata
+ assert all(p.playbook_metadata for p in result)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/server/services/storage/test_storage_contract_playbook.py` around lines 68 - 71, The assertion after the get_user_playbooks call in the test only validates that result[0] has playbook_metadata, but this misses potential partial mapping regressions in other rows. Update the assertion to verify that all items returned in the result list contain the playbook_metadata property, not just the first element at index 0. This ensures complete metadata round-trip validation across all returned playbook objects.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/lib/methods/observability.ts`:
- Line 3: The SIGNAL_ENUM constant includes unsupported signal values
("duplicate" and "supersedeable") that are deferred for future use, creating a
misleading API contract. Remove these two values from the SIGNAL_ENUM array,
keeping only the currently supported signals: "stale" and "high_cost_low_cite".
Additionally, update the signal_filter description (which also applies to the
same issue) to reflect only these currently supported signal values.
In `@reflexio/lib/_dashboard.py`:
- Line 201: In reflexio/lib/_dashboard.py at lines 201 and 251, the API-facing
response messages currently embed exception details via str(e), which can leak
internal implementation details to users. Replace the msg parameter in both
locations with a generic failure message that does not include the exception
string, and instead log the full exception details server-side using a logger
call so that debugging information is available to developers but not exposed to
API consumers.
In `@reflexio/server/services/storage/sqlite_storage/_base.py`:
- Around line 2105-2127: The clear_user_data() function needs to be updated to
include deletion of records from the new usage_events table. Add a DELETE
statement that removes all rows from usage_events where the user_id matches the
user being deleted, similar to how the function currently handles deletion from
interactions, user_playbooks, profiles, and requests tables. This ensures that
usage telemetry records containing user_id, request_id, and session_id are
properly removed when a user's data is cleared.
In `@reflexio/server/services/storage/sqlite_storage/_extras.py`:
- Around line 346-347: Update the docstring for the entity_type parameter
(around line 346-347) to correctly document the entity types that are actually
used by get_memory_review_candidates(). Replace the current documentation that
says entity_type should be "playbook" / "profile" with the correct entity types
that the memory-review query actually processes: user_playbook, agent_playbook,
and profile. This ensures that event producers emit stats with the correct
entity types that the memory review system will actually read.
- Around line 607-643: The stale signal detection uses created_at_epoch for both
staleness evaluation and last_modified_at, allowing recently-edited old
playbooks to be incorrectly marked as stale. Add an updated_at/last_modified_at
column to the database schema, update it in the update_user_playbook() function
whenever a playbook is modified, then replace the created_at_epoch comparison in
the stale check (around the condition checking current_time minus
created_at_epoch) with a comparison using this new modification timestamp
instead, and also update the last_modified_at field in the MemoryReviewCandidate
instantiation to use the new timestamp rather than created_at_epoch.
In `@reflexio/server/services/storage/storage_base/_playbook.py`:
- Around line 395-400: The docstring in the parameter documentation for the
status parameter contains an incorrect HTTP method specification. In the section
describing the status parameter and its relationship to the get_memory_review
endpoint (around line 397), change the route specification from GET
/api/get_memory_review to POST /api/get_memory_review to accurately reflect that
this endpoint uses the POST HTTP method, not GET. This correction ensures the
docstring correctly guides implementers on which HTTP method to use when calling
this endpoint.
In `@tests/server/services/storage/test_storage_contract_extras.py`:
- Around line 247-330: Add a new test method in the test suite (following the
pattern of existing test methods like
test_get_injection_stats_aggregates_by_entity) that explicitly verifies the
collision-regression fix for split playbook entity types. The test should record
multiple usage events with identical entity_id values but different entity_type
values (specifically "user_playbook" and "agent_playbook"), then call
get_injection_stats and assert that the returned stats contain separate
aggregation buckets for each entity_type/entity_id pair (not collapsed into a
single row). This ensures that the storage layer correctly treats different
entity types as distinct entities even when they share the same entity_id value,
preventing any regression in the collision handling logic.
---
Nitpick comments:
In `@tests/server/services/storage/test_storage_contract_playbook.py`:
- Around line 68-71: The assertion after the get_user_playbooks call in the test
only validates that result[0] has playbook_metadata, but this misses potential
partial mapping regressions in other rows. Update the assertion to verify that
all items returned in the result list contain the playbook_metadata property,
not just the first element at index 0. This ensures complete metadata round-trip
validation across all returned playbook objects.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro Plus
Run ID: da735582-06c4-4de5-9050-c1caa45bd42f
📒 Files selected for processing (32)
docs/components/layout/sidebar.tsx
docs/lib/constants.ts
docs/lib/methods/observability.ts
docs/lib/methods/registry.ts
docs/lib/methods/user-playbooks.ts
reflexio/cli/run_services.py
reflexio/client/client.py
reflexio/lib/_dashboard.py
reflexio/lib/_user_playbook.py
reflexio/models/api_schema/domain/entities.py
reflexio/models/api_schema/retriever_schema.py
reflexio/models/api_schema/ui/converters.py
reflexio/models/api_schema/ui/entities.py
reflexio/server/api.py
reflexio/server/billing_meter.py
reflexio/server/services/storage/retention.py
reflexio/server/services/storage/sqlite_storage/_base.py
reflexio/server/services/storage/sqlite_storage/_extras.py
reflexio/server/services/storage/sqlite_storage/_playbook.py
reflexio/server/services/storage/storage_base/_extras.py
reflexio/server/services/storage/storage_base/_playbook.py
reflexio/server/services/usage_event_sink.py
tests/cli/test_run_services_workers.py
tests/client/test_memory_observability_client.py
tests/conftest.py
tests/lib/test_config_unit.py
tests/server/api_endpoints/test_injection_stats.py
tests/server/api_endpoints/test_memory_review.py
tests/server/services/storage/test_storage_contract_extras.py
tests/server/services/storage/test_storage_contract_playbook.py
tests/server/test_billing_meter.py
tests/server/test_usage_event_sink.py
@@ -0,0 +1,66 @@
import { MethodDef } from "../types";

const SIGNAL_ENUM = ["stale", "duplicate", "high_cost_low_cite", "supersedeable"];
Limit documented signal_filter values to currently supported signals.
SIGNAL_ENUM (and the signal_filter description) includes duplicate and supersedeable, but backend behavior/context indicates those are deferred/reserved. Please document only currently supported values to avoid a misleading API contract.
Also applies to: 62-62
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@docs/lib/methods/observability.ts` at line 3, The SIGNAL_ENUM constant
includes unsupported signal values ("duplicate" and "supersedeable") that are
deferred for future use, creating a misleading API contract. Remove these two
values from the SIGNAL_ENUM array, keeping only the currently supported signals:
"stale" and "high_cost_low_cite". Additionally, update the signal_filter
description (which also applies to the same issue) to reflect only these
currently supported signal values.
    return GetInjectionStatsResponse(
        success=False,
        stats=[],
        msg=f"Failed to get injection stats: {str(e)}",
Avoid returning raw exception text in API-facing response messages.
Line 201 and Line 251 currently embed str(e) in user-facing msg; this can leak internal storage/query/runtime details. Return a generic failure message and log full exception server-side instead.
Also applies to: 251-251
🧰 Tools
🪛 Ruff (0.15.17)
[warning] 201-201: Use explicit conversion flag
Replace with conversion flag
(RUF010)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@reflexio/lib/_dashboard.py` at line 201, In reflexio/lib/_dashboard.py at
lines 201 and 251, the API-facing response messages currently embed exception
details via str(e), which can leak internal implementation details to users.
Replace the msg parameter in both locations with a generic failure message that
does not include the exception string, and instead log the full exception
details server-side using a logger call so that debugging information is
available to developers but not exposed to API consumers.
Source: Linters/SAST tools
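One way to apply this suggestion is sketched below, using a dataclass as a stand-in for ``GetInjectionStatsResponse``. ``StatsResponseSketch``, ``get_injection_stats_sketch`` and the logger name are hypothetical; the point is only the split between a generic client-facing ``msg`` and a full server-side traceback.

```python
import logging
from dataclasses import dataclass, field
from typing import Callable

logger = logging.getLogger("reflexio.sketch")  # illustrative logger name


@dataclass
class StatsResponseSketch:
    """Hypothetical stand-in for GetInjectionStatsResponse."""
    success: bool
    stats: list = field(default_factory=list)
    msg: str = ""


def get_injection_stats_sketch(load_stats: Callable[[], list]) -> StatsResponseSketch:
    """Return stats on success; on failure, log details and return a generic message."""
    try:
        return StatsResponseSketch(success=True, stats=load_stats())
    except Exception:
        # logger.exception records the message plus the active traceback,
        # so the details stay on the server.
        logger.exception("get_injection_stats failed")
        return StatsResponseSketch(
            success=False,
            msg="Failed to get injection stats",
        )
```

Dropping ``str(e)`` from the f-string also makes the Ruff RUF010 warning moot, since no conversion remains in the message.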
CREATE TABLE IF NOT EXISTS usage_events (
    event_id INTEGER PRIMARY KEY AUTOINCREMENT,
    org_id TEXT NOT NULL,
    user_id TEXT NOT NULL DEFAULT '',
    request_id TEXT NOT NULL DEFAULT '',
    session_id TEXT NOT NULL DEFAULT '',
    pipeline TEXT,
    entity_type TEXT,
    entity_id TEXT,
    event_name TEXT NOT NULL,
    event_category TEXT NOT NULL,
    caller_type TEXT,
    count_value INTEGER NOT NULL DEFAULT 1,
    prompt_tokens INTEGER,
    completion_tokens INTEGER,
    billing_input_tokens INTEGER,
    platform_llm INTEGER,
    platform_storage INTEGER,
    duration_ms INTEGER,
    error_kind TEXT,
    metadata_json TEXT NOT NULL DEFAULT '{}',
    created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
);
Include usage_events in per-user data deletion.
The new table stores user_id, request_id, and session_id, but clear_user_data() still deletes only interactions, user playbooks, profiles, and requests. A clear-user request will now leave usage telemetry identifiers behind.
🛡️ Proposed direction
     def clear_user_data(self, user_id: str) -> dict[str, int]:
         with self._lock:
+            request_rows = self.conn.execute(
+                "SELECT request_id, session_id FROM requests WHERE user_id = ?",
+                (user_id,),
+            ).fetchall()
+            request_ids = [r["request_id"] for r in request_rows]
+            session_ids = [r["session_id"] for r in request_rows if r["session_id"]]
+
             # Snapshot rowids/ids that need FTS or vector cleanup before
             # the DELETE removes them from the main tables.
             interaction_ids = [
                 r["interaction_id"]
@@
+            usage_clauses = ["user_id = ?"]
+            usage_params: list[Any] = [user_id]
+            if request_ids:
+                usage_clauses.append(
+                    "request_id IN (" + ",".join("?" for _ in request_ids) + ")"
+                )
+                usage_params.extend(request_ids)
+            if session_ids:
+                usage_clauses.append(
+                    "session_id IN (" + ",".join("?" for _ in session_ids) + ")"
+                )
+                usage_params.extend(session_ids)
+            usage_events_cur = self.conn.execute(
+                "DELETE FROM usage_events WHERE " + " OR ".join(usage_clauses),
+                usage_params,
+            )
+
             requests_cur = self.conn.execute(
                 "DELETE FROM requests WHERE user_id = ?", (user_id,)
             )
             self.conn.commit()
             return {
                 "interactions": interactions_cur.rowcount,
                 "user_playbooks": user_playbooks_cur.rowcount,
                 "profiles": profiles_cur.rowcount,
                 "requests": requests_cur.rowcount,
+                "usage_events": usage_events_cur.rowcount,
             }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@reflexio/server/services/storage/sqlite_storage/_base.py` around lines 2105 -
2127, The clear_user_data() function needs to be updated to include deletion of
records from the new usage_events table. Add a DELETE statement that removes all
rows from usage_events where the user_id matches the user being deleted, similar
to how the function currently handles deletion from interactions,
user_playbooks, profiles, and requests tables. This ensures that usage telemetry
records containing user_id, request_id, and session_id are properly removed when
a user's data is cleared.
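The simpler variant described in the prompt (delete by ``user_id`` only) can be sketched against an in-memory table. ``clear_usage_events`` is a hypothetical helper, not the real ``clear_user_data``, and the table keeps only three of the ``usage_events`` columns. Unlike the proposed diff, it does not also match on ``request_id`` or ``session_id``.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE usage_events (
        event_id   INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id    TEXT NOT NULL DEFAULT '',
        event_name TEXT NOT NULL
    )
    """
)
conn.executemany(
    "INSERT INTO usage_events (user_id, event_name) VALUES (?, ?)",
    [
        ("alice", "learning_injection"),
        ("alice", "learning_applied"),
        ("bob", "learning_injection"),
    ],
)


def clear_usage_events(conn: sqlite3.Connection, user_id: str) -> int:
    """Delete one user's telemetry rows and return how many were removed."""
    cur = conn.execute("DELETE FROM usage_events WHERE user_id = ?", (user_id,))
    conn.commit()
    return cur.rowcount


deleted = clear_usage_events(conn, "alice")
remaining_users = [
    row[0] for row in conn.execute("SELECT user_id FROM usage_events ORDER BY event_id")
]
```

Returning the row count mirrors the per-table counts in the existing return dict, so callers can report how much telemetry was cleared.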
    entity_type: ``"playbook"`` / ``"profile"`` for per-entity events.
    entity_id: Storage id of the surfaced entity.
Update the documented entity types to match the memory-review query.
This doc still says per-entity events use "playbook" / "profile", but get_memory_review_candidates() only reads entity_type = 'user_playbook'. Please document user_playbook, agent_playbook, and profile so producers do not emit stats that memory review ignores.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@reflexio/server/services/storage/sqlite_storage/_extras.py` around lines 346
- 347, Update the docstring for the entity_type parameter (around line 346-347)
to correctly document the entity types that are actually used by
get_memory_review_candidates(). Replace the current documentation that says
entity_type should be "playbook" / "profile" with the correct entity types that
the memory-review query actually processes: user_playbook, agent_playbook, and
profile. This ensures that event producers emit stats with the correct entity
types that the memory review system will actually read.
# stale — only meaningful when we have a real created_at.
if (
    created_at_epoch is not None
    and inj_count == 0
    and (current_time - created_at_epoch) >= days_back * 24 * 60 * 60
):
    signals.append("stale")
    # Older = higher score; clamp to [50, 99] so stale
    # outranks high_cost_low_cite (30-49).
    age_days = max(
        0, (current_time - created_at_epoch) // (24 * 60 * 60)
    )
    score = max(score, min(99, 50 + age_days // 7))

# high_cost_low_cite
if inj_count >= 3 and cite_per_inj < 0.5:
    signals.append("high_cost_low_cite")
    score = max(score, min(49, 30 + min(19, inj_count)))

if not signals:
    continue

title = row["playbook_name"] or (
    (row["content"] or "")[:80] + ("..." if len(row["content"] or "") > 80 else "")
)
candidates.append(
    MemoryReviewCandidate(
        entity_type="user_playbook",
        entity_id=eid,
        title=title,
        signals=signals,
        score=score,
        injection_count=inj_count,
        citation_count=cite_count,
        last_injected_at=inj["last_injected_at"],
        last_cited_at=None,  # not currently exposed by get_playbook_application_stats
        last_modified_at=created_at_epoch,
Use a real modification timestamp for the stale signal.
The stale check and last_modified_at are based on created_at, so an old playbook edited moments ago can still be returned as stale. The PR contract says stale also means “not recently modified”; add an updated_at/last_modified_at column, update it in update_user_playbook(), and compare that timestamp here.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@reflexio/server/services/storage/sqlite_storage/_extras.py` around lines 607
- 643, The stale signal detection uses created_at_epoch for both staleness
evaluation and last_modified_at, allowing recently-edited old playbooks to be
incorrectly marked as stale. Add an updated_at/last_modified_at column to the
database schema, update it in the update_user_playbook() function whenever a
playbook is modified, then replace the created_at_epoch comparison in the stale
check (around the condition checking current_time minus created_at_epoch) with a
comparison using this new modification timestamp instead, and also update the
last_modified_at field in the MemoryReviewCandidate instantiation to use the new
timestamp rather than created_at_epoch.
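A minimal sketch of the stale check measured from the last modification instead of creation. The helper name `stale_score` and the `updated_at` parameter are hypothetical; the thresholds and the [50, 99] clamp are copied from the snippet under review.

```python
from __future__ import annotations

SECONDS_PER_DAY = 24 * 60 * 60


def stale_score(
    *,
    current_time: int,
    created_at: int | None,
    updated_at: int | None,
    injection_count: int,
    days_back: int,
) -> int | None:
    """Return a stale score in [50, 99], or None when the entry is not stale.

    Hypothetical helper mirroring the reviewed logic, except that age is
    measured from the most recent modification rather than from creation.
    """
    # Fall back to created_at for rows that were never edited.
    last_modified = updated_at if updated_at is not None else created_at
    if last_modified is None or injection_count != 0:
        return None
    age_seconds = current_time - last_modified
    if age_seconds < days_back * SECONDS_PER_DAY:
        return None
    age_days = max(0, age_seconds // SECONDS_PER_DAY)
    return min(99, 50 + age_days // 7)
```

With this shape, an old playbook that was edited a minute ago yields `None` rather than a stale score, which is the behavior the comment asks for. The fallback to `created_at` is one possible way to handle rows that predate the new column.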
status (Status, optional): New lifecycle status. Lets a
    client archive candidates surfaced by the lib's
    ``get_memory_review`` (or ``GET /api/get_memory_review``).
playbook_metadata (str, optional): Free-form metadata. Used
    for the ``{"superseded_by": <id>}`` convention on
    playbooks that have been replaced by a newer one.
Correct the memory-review route method in the contract docstring.
Line 397 still says GET /api/get_memory_review, but this cohort documents the endpoint as a POST route. Please update the docstring so implementers do not mirror the wrong HTTP method.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@reflexio/server/services/storage/storage_base/_playbook.py` around lines 395
- 400, The docstring in the parameter documentation for the status parameter
contains an incorrect HTTP method specification. In the section describing the
status parameter and its relationship to the get_memory_review endpoint (around
line 397), change the route specification from GET /api/get_memory_review to
POST /api/get_memory_review to accurately reflect that this endpoint uses the
POST HTTP method, not GET. This correction ensures the docstring correctly
guides implementers on which HTTP method to use when calling this endpoint.
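A minimal sketch of how a caller might build the request with the POST method. The `user_id` and `days_back` fields appear in the PR's sequence diagram; the base URL and the helper name `build_memory_review_request` are assumptions for illustration, and the request is only constructed, not sent.

```python
import json
import urllib.request


def build_memory_review_request(
    base_url: str, user_id: str, days_back: int = 30
) -> urllib.request.Request:
    """Build (but do not send) a POST request for /api/get_memory_review.

    Hypothetical helper: base_url and the exact payload shape are assumptions.
    """
    payload = {"user_id": user_id, "days_back": days_back}
    return urllib.request.Request(
        url=f"{base_url}/api/get_memory_review",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",  # the route is documented as POST, not GET
    )
```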
            entity_type="playbook",
            entity_id="42",
            caller_type="production_agent",
            session_id="s1",
            request_id="r1",
            prompt_tokens=10,
        )
        # The row is queryable via get_injection_stats (it filters on
        # event_name='learning_injection' and entity_id IS NOT NULL).
        stats = storage.get_injection_stats(days_back=30)
        assert len(stats) == 1
        assert stats[0].entity_type == "playbook"
        assert stats[0].entity_id == "42"
        assert stats[0].surfaced_count == 1
        assert stats[0].total_prompt_tokens == 10

    def test_record_usage_event_rejects_org_mismatch(self, storage):
        if not _backend_supports_usage_events(storage):
            pytest.skip("Backend does not implement record_usage_event")
        from reflexio.server.services.storage.error import StorageError
        with pytest.raises(StorageError, match="org_id mismatch"):
            storage.record_usage_event(
                org_id="wrong-org",
                event_name="learning_injection",
                event_category="application",
            )

    def test_get_injection_stats_empty_when_no_events(self, storage):
        if not _backend_supports_usage_events(storage):
            pytest.skip("Backend does not implement get_injection_stats")
        assert storage.get_injection_stats(days_back=30) == []

    def test_get_injection_stats_empty_for_zero_days_back(self, storage):
        if not _backend_supports_usage_events(storage):
            pytest.skip("Backend does not implement get_injection_stats")
        # Defensive guard: zero/negative days_back returns [] (mirrors
        # get_playbook_application_stats). Pydantic at the API layer
        # already enforces gt=0, but storage should be robust on its own.
        assert storage.get_injection_stats(days_back=0) == []
        assert storage.get_injection_stats(days_back=-1) == []

    def test_get_injection_stats_aggregates_by_entity(self, storage):
        if not _backend_supports_usage_events(storage):
            pytest.skip("Backend does not implement get_injection_stats")
        # Two events for playbook 42 (one in s1, one in s2), one for profile p-99.
        for sid, rid in (("s1", "r1"), ("s2", "r2")):
            storage.record_usage_event(
                org_id=storage.org_id,
                event_name="learning_injection",
                event_category="application",
                entity_type="playbook",
                entity_id="42",
                caller_type="production_agent",
                session_id=sid,
                request_id=rid,
                prompt_tokens=5,
            )
        storage.record_usage_event(
            org_id=storage.org_id,
            event_name="learning_injection",
            event_category="application",
            entity_type="profile",
            entity_id="p-99",
            caller_type="production_agent",
            session_id="s1",
            request_id="r1",
            prompt_tokens=3,
        )
        stats = storage.get_injection_stats(days_back=30)
        assert len(stats) == 2
        # Most-injected sorts first.
        top = stats[0]
        assert top.entity_type == "playbook"
        assert top.entity_id == "42"
        assert top.surfaced_count == 2
        assert top.total_prompt_tokens == 10
        assert top.distinct_session_count is None or top.distinct_session_count >= 1
        # Profile row second.
        bottom = stats[1]
        assert bottom.entity_type == "profile"
        assert bottom.entity_id == "p-99"
        assert bottom.surfaced_count == 1
        assert bottom.total_prompt_tokens == 3
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Add an explicit collision-regression test for split playbook entity types.
Given the critical fix for agent/user ID-space collisions, this suite should assert that identical entity_id values under entity_type="user_playbook" and entity_type="agent_playbook" remain separate aggregation buckets.
Suggested test addition
+    def test_get_injection_stats_separates_user_and_agent_playbook_ids(self, storage):
+        if not _backend_supports_usage_events(storage):
+            pytest.skip("Backend does not implement get_injection_stats")
+        for entity_type in ("user_playbook", "agent_playbook"):
+            storage.record_usage_event(
+                org_id=storage.org_id,
+                event_name="learning_injection",
+                event_category="application",
+                entity_type=entity_type,
+                entity_id="42",
+                caller_type="production_agent",
+                session_id=f"s-{entity_type}",
+                request_id=f"r-{entity_type}",
+                prompt_tokens=1,
+            )
+        stats = storage.get_injection_stats(days_back=30)
+        keys = {(s.entity_type, s.entity_id) for s in stats}
+        assert ("user_playbook", "42") in keys
+        assert ("agent_playbook", "42") in keys
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/server/services/storage/test_storage_contract_extras.py` around lines
247 - 330, Add a new test method in the test suite (following the pattern of
existing test methods like test_get_injection_stats_aggregates_by_entity) that
explicitly verifies the collision-regression fix for split playbook entity
types. The test should record multiple usage events with identical entity_id
values but different entity_type values (specifically "user_playbook" and
"agent_playbook"), then call get_injection_stats and assert that the returned
stats contain separate aggregation buckets for each entity_type/entity_id pair
(not collapsed into a single row). This ensures that the storage layer correctly
treats different entity types as distinct entities even when they share the same
entity_id value, preventing any regression in the collision handling logic.
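A minimal sketch of why grouping on both columns keeps the buckets separate. The simplified `usage_events` schema, the query, and the helper name `injection_stats` are assumptions for illustration; the real `get_injection_stats` also applies a `days_back` window that is omitted here.

```python
import sqlite3


def injection_stats(conn: sqlite3.Connection) -> list:
    """Aggregate learning_injection events per (entity_type, entity_id)."""
    return conn.execute(
        """
        SELECT entity_type, entity_id, COUNT(*) AS surfaced_count,
               COALESCE(SUM(prompt_tokens), 0) AS total_prompt_tokens
        FROM usage_events
        WHERE event_name = 'learning_injection' AND entity_id IS NOT NULL
        GROUP BY entity_type, entity_id
        ORDER BY surfaced_count DESC, entity_type, entity_id
        """
    ).fetchall()


def demo() -> list:
    conn = sqlite3.connect(":memory:")
    # Assumed, simplified schema; the real table has more columns.
    conn.execute(
        "CREATE TABLE usage_events ("
        "event_name TEXT, entity_type TEXT, entity_id TEXT, prompt_tokens INTEGER)"
    )
    # Same entity_id under two different entity types.
    conn.executemany(
        "INSERT INTO usage_events VALUES ('learning_injection', ?, '42', 1)",
        [("user_playbook",), ("agent_playbook",)],
    )
    rows = injection_stats(conn)
    conn.close()
    return rows
```

If the query grouped on `entity_id` alone, the two rows would collapse into a single bucket with `surfaced_count` of 2, which is exactly the regression the suggested test is meant to catch.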
Context
Memory is currently hard to operate: owners can add playbooks, but they cannot easily see which ones are repeatedly injected, which ones influence answers, or which ones are stale/noisy enough to review. The customer-facing goal is a Memory Health workflow where a data owner can:
Two-phase approach
This is change 1 of 2.
- `reflexio`
- `claude-smart`
This split keeps Reflexio focused on durable storage/API primitives while the follow-up PR owns the customer-facing workflow.
What this PR covers
- `usage_events` storage for per-entity `learning_injection` telemetry.
- `get_injection_stats` through storage, lib, REST, and client.
- `get_memory_review` through storage, lib, REST, and client.
- […] `user_id`, unless they explicitly set `include_all_users=true`.
- `stale`
- `high_cost_low_cite`
- […] `update_user_playbook` with `status` and `playbook_metadata` so a reviewer can archive or annotate candidates.
- […] `playbook_metadata` on `user_playbooks` and exposes it through domain/public/UI models.
- […] `REFLEXIO_ENABLE_USAGE_EVENT_SINK=0` opt-out.
Deferred by design:
- `duplicate` detection: better handled as a periodic/batch job.
- `supersedeable` signal: current aggregation logs key removed agent playbooks, not the user playbook ids this review queue returns.
- `claude-smart` dashboard/CLI UX: handled in phase 2.
Validation
- `uv run ruff check <changed Python files>` passed.
- `uv run pyright <changed Python files>` passed.
- […] 38 passed.
- […] 174 passed.
- `git diff --check` passed.
Full-repo `pyright` is not clean on the current baseline; remaining failures are outside this patch and concentrated in existing e2e/test optional-member and mock-typing areas.
Summary by CodeRabbit
Release Notes
New Features
Chores