diff --git a/src/context_engine/memory/db.py b/src/context_engine/memory/db.py
index 8278a95..567788f 100644
--- a/src/context_engine/memory/db.py
+++ b/src/context_engine/memory/db.py
@@ -301,6 +301,12 @@ def connect(db_path: str | Path) -> sqlite3.Connection:
     # WAL gives concurrent readers (the dashboard) decent isolation while the
     # MCP server writes; no impact on single-process use.
     conn.execute("PRAGMA journal_mode = WAL")
+    # Without busy_timeout, any contention (auto-prune vs hot-path inserts,
+    # dashboard reads vs writes) returns SQLITE_BUSY immediately. 5 seconds
+    # is well past any expected single-statement contention and short enough
+    # that a stuck writer surfaces as a real error rather than silently
+    # waiting forever.
+    conn.execute("PRAGMA busy_timeout = 5000")
     has_vec = _try_load_vec(conn)
     _ensure_schema(conn, has_vec=has_vec)
     return conn
diff --git a/src/context_engine/memory/hooks.py b/src/context_engine/memory/hooks.py
index de384ad..5a576ae 100644
--- a/src/context_engine/memory/hooks.py
+++ b/src/context_engine/memory/hooks.py
@@ -43,6 +43,34 @@ def _conn(request: web.Request) -> sqlite3.Connection:
     return request.app["memory_db"]
 
 
+def _ensure_session(
+    conn: sqlite3.Connection,
+    request: web.Request,
+    session_id: str,
+) -> None:
+    """Backfill a `sessions` row for `session_id` if it doesn't already exist.
+
+    The lifecycle invariant — "SessionStart fires before UserPromptSubmit /
+    PostToolUse for the same session" — breaks when `cce serve` was started
+    after Claude Code, when Claude Code resumes a prior session_id without
+    re-firing SessionStart, or when the SessionStart POST itself failed to
+    reach the hook server. The downstream INSERTs into `prompts` and
+    `tool_events` would then trip the FK on `sessions(id)` and crash the
+    handler. Inserting a placeholder session row here lets the rest of the
+    handler proceed; if a real SessionStart arrives later, its INSERT OR
+    IGNORE leaves this placeholder untouched, and the session_id remains
+    queryable in the meantime.
+    """
+    project = request.app.get("project_name", "")
+    epoch = _now_epoch()
+    conn.execute(
+        "INSERT OR IGNORE INTO sessions "
+        "(id, project, started_at_epoch, started_at, status) "
+        "VALUES (?, ?, ?, ?, 'active')",
+        (session_id, project, epoch, _now_iso(epoch)),
+    )
+
+
 _RESUME_RECENT_DECISIONS = 5
 _RESUME_DECISION_REASON_CHARS = 200
 
@@ -213,6 +241,7 @@ async def handle_user_prompt_submit(request: web.Request) -> web.Response:
 
     conn = _conn(request)
     try:
+        _ensure_session(conn, request, session_id)
         if prompt_number is None:
             row = conn.execute(
                 "SELECT COALESCE(MAX(prompt_number), 0) + 1 AS next "
@@ -264,6 +293,7 @@ async def handle_post_tool_use(request: web.Request) -> web.Response:
 
     conn = _conn(request)
     try:
+        _ensure_session(conn, request, session_id)
         if prompt_number is None:
             row = conn.execute(
                 "SELECT COALESCE(MAX(prompt_number), 0) AS cur FROM prompts "
@@ -301,6 +331,7 @@ async def handle_stop(request: web.Request) -> web.Response:
 
     conn = _conn(request)
     try:
+        _ensure_session(conn, request, session_id)
         if prompt_number is None:
             row = conn.execute(
                 "SELECT COALESCE(MAX(prompt_number), 0) AS cur FROM prompts "
@@ -329,6 +360,7 @@ async def handle_session_end(request: web.Request) -> web.Response:
 
     conn = _conn(request)
     try:
+        _ensure_session(conn, request, session_id)
         epoch = _now_epoch()
         conn.execute(
             "UPDATE sessions SET status = 'completed', exit_reason = ?, "
diff --git a/tests/memory/test_db.py b/tests/memory/test_db.py
index e990bd4..2484daa 100644
--- a/tests/memory/test_db.py
+++ b/tests/memory/test_db.py
@@ -57,6 +57,20 @@ def test_foreign_keys_enabled(tmp_path: Path):
         conn.close()
 
 
+def test_busy_timeout_set(tmp_path: Path):
+    """Without busy_timeout, the auto-prune background task and the hot
+    insert path can both throw `sqlite3.OperationalError: database is locked`
+    on contention (issue #49). Pin that connect() configures a non-zero
+    timeout so a single contended write retries instead of crashing."""
+    db_path = tmp_path / "memory.db"
+    conn = memory_db.connect(db_path)
+    try:
+        timeout = conn.execute("PRAGMA busy_timeout").fetchone()[0]
+        assert timeout >= 1000, f"busy_timeout too low: {timeout}ms"
+    finally:
+        conn.close()
+
+
 def test_decisions_fts_search(tmp_path: Path):
     """A decision inserted into the parent table is searchable via fts."""
     db_path = tmp_path / "memory.db"
diff --git a/tests/memory/test_hooks.py b/tests/memory/test_hooks.py
index cb10ac7..d9ab4d1 100644
--- a/tests/memory/test_hooks.py
+++ b/tests/memory/test_hooks.py
@@ -320,6 +320,94 @@ async def test_missing_session_id_returns_400(hook_app, aiohttp_client):
         assert resp.status == 400, f"{endpoint} should require session_id"
 
 
+async def test_user_prompt_submit_without_prior_session_start_backfills_session(
+    hook_app, aiohttp_client,
+):
+    """Regression for issue #49: when `cce serve` starts mid-Claude-Code session
+    or Claude Code resumes an old session_id, UserPromptSubmit can fire without
+    a preceding SessionStart. The handler must backfill the parent session row
+    instead of crashing on the FK to sessions(id)."""
+    app, conn = hook_app
+    client = await aiohttp_client(app)
+    # Note: NO SessionStart call.
+    resp = await client.post(
+        "/hooks/UserPromptSubmit",
+        json={"session_id": "orphan", "prompt_text": "hi"},
+    )
+    assert resp.status == 200, f"backfill failed: {await resp.text()}"
+    assert (await resp.json())["prompt_number"] == 1
+    # Session row was created with project=demo (from app["project_name"]) and
+    # status=active, so subsequent SessionEnd / dashboard listings still work.
+    sess = conn.execute(
+        "SELECT id, project, status FROM sessions WHERE id = ?", ("orphan",),
+    ).fetchone()
+    assert sess is not None
+    assert sess["project"] == "demo"
+    assert sess["status"] == "active"
+    # Prompt was inserted normally.
+    prompts = conn.execute(
+        "SELECT prompt_number, prompt_text FROM prompts WHERE session_id = ?",
+        ("orphan",),
+    ).fetchall()
+    # Compare actual column values; an empty result fails instead of raising.
+    assert [(p["prompt_number"], p["prompt_text"]) for p in prompts] == [(1, "hi")]
+
+
+async def test_post_tool_use_without_prior_session_start_backfills_session(
+    hook_app, aiohttp_client,
+):
+    """Same regression as above for PostToolUse — different table, same FK."""
+    app, conn = hook_app
+    client = await aiohttp_client(app)
+    resp = await client.post(
+        "/hooks/PostToolUse",
+        json={
+            "session_id": "orphan",
+            "tool_name": "Read",
+            "tool_input": {"file_path": "/tmp/x.py"},
+            "tool_output": "x = 1\n",
+        },
+    )
+    assert resp.status == 200, f"backfill failed: {await resp.text()}"
+    sess = conn.execute(
+        "SELECT id FROM sessions WHERE id = ?", ("orphan",),
+    ).fetchone()
+    assert sess is not None
+    events = list(conn.execute(
+        "SELECT tool_name FROM tool_events WHERE session_id = ?", ("orphan",),
+    ))
+    assert len(events) == 1
+
+
+async def test_real_session_start_after_backfill_does_not_clobber(
+    hook_app, aiohttp_client,
+):
+    """If SessionStart arrives after the session was backfilled (e.g. the
+    SessionStart POST was retried by Claude Code), INSERT OR IGNORE keeps the
+    original placeholder row and doesn't reset its started_at to the later
+    timestamp. The session keeps its real first-seen time."""
+    app, conn = hook_app
+    client = await aiohttp_client(app)
+    # Backfill via UserPromptSubmit first.
+    await client.post(
+        "/hooks/UserPromptSubmit",
+        json={"session_id": "late-start", "prompt_text": "hi"},
+    )
+    placeholder_epoch = conn.execute(
+        "SELECT started_at_epoch FROM sessions WHERE id = ?", ("late-start",),
+    ).fetchone()["started_at_epoch"]
+
+    # Real SessionStart arrives later with a different timestamp.
+    await client.post(
+        "/hooks/SessionStart",
+        json={"session_id": "late-start", "project": "demo", "started_at": 1700000000},
+    )
+    after = conn.execute(
+        "SELECT started_at_epoch FROM sessions WHERE id = ?", ("late-start",),
+    ).fetchone()["started_at_epoch"]
+    assert after == placeholder_epoch, "INSERT OR IGNORE should not overwrite"
+
+
 async def test_compression_queue_dedupes(hook_app, aiohttp_client):
     """Stop and the next UserPromptSubmit can both enqueue the same turn."""
     app, conn = hook_app