From 8a7f17061132d35e3083f3068386bc881e91b8a7 Mon Sep 17 00:00:00 2001 From: yu-med Date: Thu, 2 Jul 2026 23:57:39 +0800 Subject: [PATCH 1/5] claude-code-chat-browser: schema drift detection for upstream JSONL changes (#5) Fingerprint known Claude Code JSONL field paths against a committed schema_baseline.json, warn on drift during parsing, expose GET /api/schema-report, and surface a dismissible amber banner on the session list page. Warnings only - parsing is never blocked. --- api/schema_report.py | 14 + app.py | 2 + schema_baseline.json | 398 +++++++++++++++++++++++ static/css/style.css | 27 ++ static/js/sessions.js | 61 +++- tests/fixtures/jsonl/unknown_field.jsonl | 3 + tests/test_schema_drift.py | 135 ++++++++ utils/jsonl_parser.py | 11 + utils/schema_drift.py | 172 ++++++++++ 9 files changed, 822 insertions(+), 1 deletion(-) create mode 100644 api/schema_report.py create mode 100644 schema_baseline.json create mode 100644 tests/fixtures/jsonl/unknown_field.jsonl create mode 100644 tests/test_schema_drift.py create mode 100644 utils/schema_drift.py diff --git a/api/schema_report.py b/api/schema_report.py new file mode 100644 index 0000000..80ffdf2 --- /dev/null +++ b/api/schema_report.py @@ -0,0 +1,14 @@ +"""Schema drift report endpoint.""" + +from flask import Blueprint + +from api._flask_types import FlaskReturn, json_response +from utils.schema_drift import get_schema_report + +schema_report_bp = Blueprint("schema_report", __name__) + + +@schema_report_bp.route("/api/schema-report") +def schema_report() -> FlaskReturn: + """Return known/new/missing JSONL field paths from recent parse runs.""" + return json_response(get_schema_report()) diff --git a/app.py b/app.py index 3be8c81..2786cc8 100644 --- a/app.py +++ b/app.py @@ -9,6 +9,7 @@ from api.export_api import export_bp from api.projects import projects_bp +from api.schema_report import schema_report_bp from api.search import search_bp from api.sessions import sessions_bp from utils.exclusion_rules import load_rules, resolve_exclusion_rules_path @@ -101,6 +102,7 @@ def create_app( app.register_blueprint(sessions_bp) app.register_blueprint(search_bp) app.register_blueprint(export_bp) + app.register_blueprint(schema_report_bp) @app.after_request def set_security_headers(response): diff --git a/schema_baseline.json b/schema_baseline.json new file mode 100644 index 0000000..58eda73 --- /dev/null +++ b/schema_baseline.json @@ -0,0 +1,398 @@ +{ + "version": 1, + "description": "Known Claude Code JSONL field paths as of 2026-07-03. Update deliberately when upstream format changes.", + "fields": { + "_futureSchemaVersion": { + "expected_type": "int", + "required": false + }, + "compactMetadata": { + "expected_type": "dict", + "required": false + }, + "compactMetadata.preTokens": { + "expected_type": "int", + "required": false + }, + "compactMetadata.trigger": { + "expected_type": "str", + "required": false + }, + "content": { + "expected_type": "str", + "required": false + }, + "cwd": { + "expected_type": "str", + "required": false + }, + "data": { + "expected_type": "dict", + "required": false + }, + "data.output": { + "expected_type": "str", + "required": false + }, + "data.type": { + "expected_type": "str", + "required": false + }, + "experimentalFlag": { + "expected_type": "bool", + "required": false + }, + "gitBranch": { + "expected_type": "str", + "required": false + }, + "isApiErrorMessage": { + "expected_type": "bool", + "required": false + }, + "isSidechain": { + "expected_type": "bool", + "required": false + }, + "message": { + "expected_type": "dict", + "required": false + }, + "message.content": { + "expected_type": "list", + "required": false + }, + "message.content[]": { + "expected_type": "list", + "required": false + }, + "message.content[].id": { + "expected_type": "str", + "required": false + }, + "message.content[].input": { + "expected_type": "dict", + "required": false + }, + "message.content[].input.command": { + "expected_type": "str", + "required": false + }, + "message.content[].input.description": { + "expected_type": "str", + "required": false + }, + "message.content[].input.file_path": { + "expected_type": "str", + "required": false + }, + "message.content[].name": { + "expected_type": "str", + "required": false + }, + "message.content[].text": { + "expected_type": "str", + "required": false + }, + "message.content[].thinking": { + "expected_type": "str", + "required": false + }, + "message.content[].type": { + "expected_type": "str", + "required": false + }, + "message.model": { + "expected_type": "str", + "required": false + }, + "message.stop_reason": { + "expected_type": "str", + "required": false + }, + "message.usage": { + "expected_type": "dict", + "required": false + }, + "message.usage.cache_creation": { + "expected_type": "dict", + "required": false + }, + "message.usage.cache_creation.ephemeral_1h_input_tokens": { + "expected_type": "int", + "required": false + }, + "message.usage.cache_creation.ephemeral_5m_input_tokens": { + "expected_type": "int", + "required": false + }, + "message.usage.cache_creation_input_tokens": { + "expected_type": "int", + "required": false + }, + "message.usage.cache_read_input_tokens": { + "expected_type": "int", + "required": false + }, + "message.usage.input_tokens": { + "expected_type": "int", + "required": false + }, + "message.usage.output_tokens": { + "expected_type": "int", + "required": false + }, + "message.usage.service_tier": { + "expected_type": "str", + "required": false + }, + "parentToolUseID": { + "expected_type": "str", + "required": false + }, + "parentUuid": { + "expected_type": "str", + "required": false + }, + "permissionMode": { + "expected_type": "str", + "required": false + }, + "sessionId": { + "expected_type": "str", + "required": false + }, + "slug": { + "expected_type": "str", + "required": false + }, + "snapshot": { + "expected_type": "dict", + "required": false + }, + "snapshot.timestamp": { + "expected_type": "str", + "required": false + }, + "subtype": { + "expected_type": "str", + "required": false + }, + "timestamp": { + "expected_type": "str", + "required": false + }, + "toolUseID": { + "expected_type": "str", + "required": false + }, + "toolUseResult": { + "expected_type": "dict", + "required": false + }, + "toolUseResult._unknownToolMeta": { + "expected_type": "dict", + "required": false + }, + "toolUseResult._unknownToolMeta.vendor": { + "expected_type": "str", + "required": false + }, + "toolUseResult.agentId": { + "expected_type": "str", + "required": false + }, + "toolUseResult.answers": { + "expected_type": "dict", + "required": false + }, + "toolUseResult.answers.q1": { + "expected_type": "str", + "required": false + }, + "toolUseResult.code": { + "expected_type": "int", + "required": false + }, + "toolUseResult.content": { + "expected_type": "str", + "required": false + }, + "toolUseResult.description": { + "expected_type": "str", + "required": false + }, + "toolUseResult.durationMs": { + "expected_type": "int", + "required": false + }, + "toolUseResult.exitCode": { + "expected_type": "int", + "required": false + }, + "toolUseResult.file": { + "expected_type": "dict", + "required": false + }, + "toolUseResult.file.content": { + "expected_type": "str", + "required": false + }, + "toolUseResult.file.filePath": { + "expected_type": "str", + "required": false + }, + "toolUseResult.file.numLines": { + "expected_type": "int", + "required": false + }, + "toolUseResult.filePath": { + "expected_type": "str", + "required": false + }, + "toolUseResult.filenames": { + "expected_type": "list", + "required": false + }, + "toolUseResult.filenames[]": { + "expected_type": "list", + "required": false + }, + "toolUseResult.isAsync": { + "expected_type": "bool", + "required": false + }, + "toolUseResult.message": { + "expected_type": "str", + "required": false + }, + "toolUseResult.mode": { + "expected_type": "str", + "required": false + }, + "toolUseResult.newTodos": { + "expected_type": "list", + "required": false + }, + "toolUseResult.newTodos[]": { + "expected_type": "list", + "required": false + }, + "toolUseResult.newTodos[].content": { + "expected_type": "str", + "required": false + }, + "toolUseResult.newTodos[].id": { + "expected_type": "str", + "required": false + }, + "toolUseResult.numFiles": { + "expected_type": "int", + "required": false + }, + "toolUseResult.numLines": { + "expected_type": "int", + "required": false + }, + "toolUseResult.plan": { + "expected_type": "list", + "required": false + }, + "toolUseResult.plan[]": { + "expected_type": "list", + "required": false + }, + "toolUseResult.query": { + "expected_type": "str", + "required": false + }, + "toolUseResult.questions": { + "expected_type": "list", + "required": false + }, + "toolUseResult.questions[]": { + "expected_type": "list", + "required": false + }, + "toolUseResult.questions[].id": { + "expected_type": "str", + "required": false + }, + "toolUseResult.results": { + "expected_type": "list", + "required": false + }, + "toolUseResult.results[]": { + "expected_type": "list", + "required": false + }, + "toolUseResult.results[].url": { + "expected_type": "str", + "required": false + }, + "toolUseResult.retrieval_status": { + "expected_type": "str", + "required": false + }, + "toolUseResult.status": { + "expected_type": "str", + "required": false + }, + "toolUseResult.stderr": { + "expected_type": "str", + "required": false + }, + "toolUseResult.stdout": { + "expected_type": "str", + "required": false + }, + "toolUseResult.structuredPatch": { + "expected_type": "str", + "required": false + }, + "toolUseResult.task": { + "expected_type": "dict", + "required": false + }, + "toolUseResult.task.description": { + "expected_type": "str", + "required": false + }, + "toolUseResult.task.task_id": { + "expected_type": "str", + "required": false + }, + "toolUseResult.task_id": { + "expected_type": "str", + "required": false + }, + "toolUseResult.task_type": { + "expected_type": "str", + "required": false + }, + "toolUseResult.totalDurationMs": { + "expected_type": "int", + "required": false + }, + "toolUseResult.truncated": { + "expected_type": "bool", + "required": false + }, + "toolUseResult.url": { + "expected_type": "str", + "required": false + }, + "type": { + "expected_type": "str", + "required": true + }, + "uuid": { + "expected_type": "str", + "required": false + }, + "version": { + "expected_type": "str", + "required": false + } + } +} diff --git a/static/css/style.css b/static/css/style.css index 36a14d3..27a6c28 100644 --- a/static/css/style.css +++ b/static/css/style.css @@ -38,6 +38,9 @@ --success-border: #166534; --danger-bg: #450a0a; --danger-border: #991b1b; + --warning-bg: #422006; + --warning-border: #a16207; + --warning-text: #fde68a; --code-bg: #1e1e1e; --spinner: #3b82f6; --shadow: rgba(0, 0, 0, 0.3); @@ -76,6 +79,9 @@ --success-border: #bbf7d0; --danger-bg: #fef2f2; --danger-border: #fecaca; + --warning-bg: #fefce8; + --warning-border: #fde047; + --warning-text: #854d0e; --code-bg: #f5f5f5; --spinner: #2563eb; --shadow: rgba(0, 0, 0, 0.08); @@ -249,6 +255,27 @@ h3 { font-size: 1.15rem; font-weight: 600; } .alert-info { background: var(--info-bg); border: 1px solid var(--info-border); color: var(--info-text); } .alert-success { background: var(--success-bg); border: 1px solid var(--success-border); color: var(--success); } .alert-danger { background: var(--danger-bg); border: 1px solid var(--danger-border); color: var(--danger); } +.alert-warning { + background: var(--warning-bg); + border: 1px solid var(--warning-border); + color: var(--warning-text); + display: flex; + align-items: flex-start; + justify-content: space-between; + gap: 0.75rem; +} +.alert-warning__body { flex: 1; } +.alert-warning__dismiss { + background: transparent; + border: none; + color: inherit; + cursor: pointer; + font-size: 1.1rem; + line-height: 1; + padding: 0 0.25rem; + opacity: 0.8; +} +.alert-warning__dismiss:hover { opacity: 1; } /* ---------- Badges ---------- */ .badge { diff --git a/static/js/sessions.js b/static/js/sessions.js index 33d5a8c..2f0b9b2 100644 --- a/static/js/sessions.js +++ b/static/js/sessions.js @@ -8,6 +8,62 @@ import { downloadSession } from './export.js'; import { showProjects } from './projects.js'; import { renderToolUse, renderToolResult, toolResultHasBody } from './render/registry.js'; +// ==================== Schema drift banner ==================== + +const SCHEMA_DRIFT_DISMISS_KEY = 'schema-drift-banner-dismissed'; + +function schemaDriftFingerprint(report) { + const parts = [ + ...(report.new_fields || []), + ...(report.missing_fields || []), + ].sort(); + return parts.join('|'); +} + +async function fetchSchemaDriftBannerHtml() { + try { + const res = await fetch('/api/schema-report'); + if (!res.ok) return ''; + const report = await res.json(); + if (!report.has_drift) return ''; + + const fingerprint = schemaDriftFingerprint(report); + if (sessionStorage.getItem(SCHEMA_DRIFT_DISMISS_KEY) === fingerprint) return ''; + + const newFields = (report.new_fields || []).slice(0, 5); + const missingFields = (report.missing_fields || []).slice(0, 5); + let detail = ''; + if (newFields.length) { + detail += `
New fields: ${esc(newFields.join(', '))}${(report.new_fields || []).length > 5 ? '…' : ''}
`; + } + if (missingFields.length) { + detail += `
Missing required fields: ${esc(missingFields.join(', '))}${(report.missing_fields || []).length > 5 ? '…' : ''}
`; + } + + return `
+
+ Upstream JSONL schema drift detected +
Claude Code may have changed its session format. Parsing continues, but some data may be incomplete.
+ ${detail} +
+ +
`; + } catch { + return ''; + } +} + +function bindSchemaDriftBanner(root) { + const banner = root.querySelector('#schema-drift-banner'); + const dismiss = root.querySelector('#schema-drift-dismiss'); + if (!banner || !dismiss) return; + dismiss.addEventListener('click', () => { + const fingerprint = banner.getAttribute('data-drift-fingerprint') || ''; + sessionStorage.setItem(SCHEMA_DRIFT_DISMISS_KEY, fingerprint); + banner.remove(); + }); +} + // ==================== Workspace (split layout) ==================== export async function showWorkspace(projectName, selectedSessionId) { @@ -26,6 +82,8 @@ export async function showWorkspace(projectName, selectedSessionId) { } const prettyName = state.projectDisplayNames[projectName] || projectName; + const schemaBannerHtml = await fetchSchemaDriftBannerHtml(); + const res = await fetch(`/api/projects/${encodeURIComponent(projectName)}/sessions`); state.cachedSessions = await res.json(); @@ -66,7 +124,7 @@ export async function showWorkspace(projectName, selectedSessionId) { } sidebar += ''; - let html = `
+ let html = `${schemaBannerHtml}`; smoothSet(content, html); bindSidebarSessionClicks(); + bindSchemaDriftBanner(content); content.querySelector('#ws-back-link')?.addEventListener('click', (e) => { e.preventDefault(); showProjects(); diff --git a/tests/fixtures/jsonl/unknown_field.jsonl b/tests/fixtures/jsonl/unknown_field.jsonl new file mode 100644 index 0000000..70ebb98 --- /dev/null +++ b/tests/fixtures/jsonl/unknown_field.jsonl @@ -0,0 +1,3 @@ +{"type": "user", "timestamp": "2026-07-03T12:00:00Z", "cwd": "/test/project", "message": {"content": [{"type": "text", "text": "Synthetic session with unknown upstream fields"}]}} +{"type": "assistant", "timestamp": "2026-07-03T12:00:01Z", "message": {"model": "claude-test", "content": [{"type": "tool_use", "name": "FutureToolXYZ", "input": {"new_field": "experimental value"}}], "usage": {"input_tokens": 10, "output_tokens": 5}}} +{"type": "user", "timestamp": "2026-07-03T12:00:02Z", "message": {"content": []}, "tool": {"type": "FutureToolXYZ", "new_field": "upstream-added"}, "toolUseResult": {"stdout": "done\n", "stderr": "", "exitCode": 0}} diff --git a/tests/test_schema_drift.py b/tests/test_schema_drift.py new file mode 100644 index 0000000..6d41efc --- /dev/null +++ b/tests/test_schema_drift.py @@ -0,0 +1,135 @@ +"""Tests for JSONL schema drift detection (issue #5).""" + +from __future__ import annotations + +import logging +from pathlib import Path + +import pytest + +from utils.jsonl_parser import _collect_field_paths, parse_session +from utils.schema_drift import ( + collect_field_paths, + diff_against_baseline, + get_schema_report, + load_baseline_fields, + record_parse_drift, + reset_schema_report, +) + +FIXTURES = Path(__file__).parent / "fixtures" +UNKNOWN_FIELD_FIXTURE = FIXTURES / "jsonl" / "unknown_field.jsonl" + + +@pytest.fixture(autouse=True) +def _clear_schema_report(): + reset_schema_report() + yield + reset_schema_report() + + +class TestCollectFieldPaths: + def test_nested_paths_use_dotted_notation(self): + record = { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "hi"}]}, + } + paths = collect_field_paths(record) + assert "type" in paths + assert "message" in paths + assert "message.content" in paths + assert "message.content[]" in paths + assert "message.content[].type" in paths + assert "message.content[].text" in paths + + def test_jsonl_parser_wrapper_matches_helper(self): + record = {"type": "user", "cwd": "/tmp"} + assert _collect_field_paths(record) == collect_field_paths(record) + + +class TestSchemaBaseline: + def test_baseline_is_committed_and_loads(self): + fields = load_baseline_fields() + assert len(fields) > 0 + assert fields["type"]["required"] is True + assert fields["type"]["expected_type"] == "str" + + def test_minimal_fixture_has_no_drift(self): + report = diff_against_baseline( + _collect_field_paths_from_fixture("session_minimal.jsonl") + ) + assert report["new_fields"] == [] + assert report["missing_fields"] == [] + + +def _collect_field_paths_from_fixture(name: str) -> set[str]: + paths: set[str] = set() + text = (FIXTURES / name).read_text(encoding="utf-8") + import json + + for line in text.splitlines(): + line = line.strip() + if not line: + continue + entry = json.loads(line) + if isinstance(entry, dict): + paths |= collect_field_paths(entry) + return paths + + +class TestSchemaDriftWarnings: + def test_unknown_field_fixture_emits_warning(self, caplog: pytest.LogCaptureFixture): + with caplog.at_level(logging.WARNING, logger="claude_code_chat_browser.schema_drift"): + parse_session(str(UNKNOWN_FIELD_FIXTURE)) + + drift_records = [ + r for r in caplog.records if r.name == "claude_code_chat_browser.schema_drift" + ] + assert drift_records, "expected schema_drift logger warning" + assert any("new JSONL field paths" in r.message for r in drift_records) + + def test_unknown_field_fixture_reports_new_fields(self): + parse_session(str(UNKNOWN_FIELD_FIXTURE)) + report = get_schema_report() + assert report["has_drift"] is True + assert "tool" in report["new_fields"] + assert "tool.type" in report["new_fields"] + assert "tool.new_field" in report["new_fields"] + + def test_optional_absent_fields_do_not_warn(self, caplog: pytest.LogCaptureFixture): + with caplog.at_level(logging.WARNING, logger="claude_code_chat_browser.schema_drift"): + parse_session(str(FIXTURES / "session_minimal.jsonl")) + + drift_records = [ + r for r in caplog.records if r.name == "claude_code_chat_browser.schema_drift" + ] + assert drift_records == [] + + +class TestSchemaReportApi: + def test_schema_report_endpoint(self, client): + parse_session(str(FIXTURES / "session_minimal.jsonl")) + resp = client.get("/api/schema-report") + assert resp.status_code == 200 + body = resp.get_json() + assert body is not None + assert "known_fields" in body + assert "new_fields" in body + assert "missing_fields" in body + assert body["has_drift"] is False + + def test_schema_report_reflects_unknown_fixture(self, client): + parse_session(str(UNKNOWN_FIELD_FIXTURE)) + resp = client.get("/api/schema-report") + body = resp.get_json() + assert body is not None + assert body["has_drift"] is True + assert "tool" in body["new_fields"] + + +class TestRecordParseDrift: + def test_merges_reports_across_parses(self): + record_parse_drift({"type", "timestamp"}) + record_parse_drift({"type", "tool"}) + report = get_schema_report() + assert "tool" in report["new_fields"] diff --git a/utils/jsonl_parser.py b/utils/jsonl_parser.py index 5466bb0..b30bd60 100644 --- a/utils/jsonl_parser.py +++ b/utils/jsonl_parser.py @@ -25,6 +25,7 @@ infer_title as _infer_title, normalize_content as _normalize_content, ) +from utils.schema_drift import collect_field_paths, record_parse_drift from utils.session_peek import quick_session_info from utils.tool_dispatch import _parse_tool_result, track_tool_file_activity from utils.validation import validate_session_dict @@ -37,6 +38,11 @@ _log = logging.getLogger(__name__) +def _collect_field_paths(record: dict[str, Any]) -> set[str]: + """Recursive JSON path fingerprinting for schema drift detection.""" + return collect_field_paths(record) + + def _coerce_role(raw: str) -> RoleLiteral: if raw in _VALID_ROLES: return cast(RoleLiteral, raw) @@ -168,6 +174,7 @@ def parse_session(filepath: str) -> SessionDict: session_id = os.path.basename(filepath).replace(".jsonl", "") messages: list[MessageDict] = [] metadata = _new_session_metadata_builder(session_id) + observed_field_paths: set[str] = set() with open(filepath, "r", encoding="utf-8", errors="replace") as f: for line in f: @@ -182,6 +189,8 @@ def parse_session(filepath: str) -> SessionDict: if not isinstance(entry, dict): continue + observed_field_paths |= _collect_field_paths(entry) + entry_type = entry.get("type") ts = _entry_timestamp(entry) @@ -226,6 +235,8 @@ def parse_session(filepath: str) -> SessionDict: title = _infer_title(messages) + record_parse_drift(observed_field_paths) + return validate_session_dict( { "session_id": session_id, diff --git a/utils/schema_drift.py b/utils/schema_drift.py new file mode 100644 index 0000000..c2b6e61 --- /dev/null +++ b/utils/schema_drift.py @@ -0,0 +1,172 @@ +"""Detect upstream Claude Code JSONL schema drift against a committed baseline.""" + +from __future__ import annotations + +import json +import logging +import threading +from pathlib import Path +from typing import Any, TypedDict + +_log = logging.getLogger("claude_code_chat_browser.schema_drift") + +BASELINE_PATH = Path(__file__).resolve().parent.parent / "schema_baseline.json" + +_lock = threading.Lock() +_last_report: SchemaDriftReport = { + "known_fields": [], + "new_fields": [], + "missing_fields": [], + "has_drift": False, +} + + +class SchemaFieldSpec(TypedDict): + expected_type: str + required: bool + + +class SchemaDriftReport(TypedDict): + known_fields: list[str] + new_fields: list[str] + missing_fields: list[str] + has_drift: bool + + +def collect_field_paths(record: dict[str, Any], prefix: str = "") -> set[str]: + """Recursively collect dotted JSON paths (with ``[]`` for list items).""" + paths: set[str] = set() + for key, value in record.items(): + path = f"{prefix}.{key}" if prefix else key + paths.add(path) + if isinstance(value, dict): + paths |= collect_field_paths(value, path) + elif isinstance(value, list): + list_path = f"{path}[]" + paths.add(list_path) + for item in value: + if isinstance(item, dict): + paths |= collect_field_paths(item, list_path) + return paths + + +def _type_name(value: Any) -> str: + if value is None: + return "null" + if isinstance(value, bool): + return "bool" + if isinstance(value, int) and not isinstance(value, bool): + return "int" + if isinstance(value, float): + return "float" + if isinstance(value, str): + return "str" + if isinstance(value, list): + return "list" + if isinstance(value, dict): + return "dict" + return type(value).__name__ + + +def collect_field_paths_with_types( + record: dict[str, Any], prefix: str = "" +) -> dict[str, str]: + """Like :func:`collect_field_paths` but also records the observed JSON type.""" + paths: dict[str, str] = {} + for key, value in record.items(): + path = f"{prefix}.{key}" if prefix else key + paths[path] = _type_name(value) + if isinstance(value, dict): + paths.update(collect_field_paths_with_types(value, path)) + elif isinstance(value, list): + list_path = f"{path}[]" + paths[list_path] = "list" + for item in value: + if isinstance(item, dict): + paths.update(collect_field_paths_with_types(item, list_path)) + return paths + + +def load_baseline_fields() -> dict[str, SchemaFieldSpec]: + """Load ``schema_baseline.json`` field specs keyed by dotted path.""" + raw = json.loads(BASELINE_PATH.read_text(encoding="utf-8")) + fields = raw.get("fields", {}) + if not isinstance(fields, dict): + raise ValueError("schema_baseline.json: 'fields' must be an object") + result: dict[str, SchemaFieldSpec] = {} + for path, spec in fields.items(): + if not isinstance(spec, dict): + continue + expected_type = spec.get("expected_type", "unknown") + required = bool(spec.get("required", False)) + if not isinstance(expected_type, str): + expected_type = "unknown" + result[path] = {"expected_type": expected_type, "required": required} + return result + + +def diff_against_baseline(observed_paths: set[str]) -> SchemaDriftReport: + """Compare observed session field paths to the committed baseline.""" + baseline = load_baseline_fields() + known_fields = sorted(baseline.keys()) + new_fields = sorted(observed_paths - set(known_fields)) + missing_fields = sorted( + path for path, spec in baseline.items() if spec["required"] and path not in observed_paths + ) + return { + "known_fields": known_fields, + "new_fields": new_fields, + "missing_fields": missing_fields, + "has_drift": bool(new_fields or missing_fields), + } + + +def record_parse_drift(observed_paths: set[str]) -> SchemaDriftReport: + """Diff *observed_paths*, log warnings, and merge into the process-wide report.""" + report = diff_against_baseline(observed_paths) + if report["new_fields"]: + _log.warning( + "schema drift: new JSONL field paths not in baseline: %s", + report["new_fields"], + ) + if report["missing_fields"]: + _log.warning( + "schema drift: missing required JSONL field paths: %s", + report["missing_fields"], + ) + with _lock: + global _last_report + merged_new = sorted(set(_last_report["new_fields"]) | set(report["new_fields"])) + merged_missing = sorted( + set(_last_report["missing_fields"]) | set(report["missing_fields"]) + ) + _last_report = { + "known_fields": report["known_fields"], + "new_fields": merged_new, + "missing_fields": merged_missing, + "has_drift": bool(merged_new or merged_missing), + } + return report + + +def get_schema_report() -> SchemaDriftReport: + """Return the accumulated schema drift report from recent parse runs.""" + with _lock: + return { + "known_fields": list(_last_report["known_fields"]), + "new_fields": list(_last_report["new_fields"]), + "missing_fields": list(_last_report["missing_fields"]), + "has_drift": _last_report["has_drift"], + } + + +def reset_schema_report() -> None: + """Clear the accumulated report (for tests).""" + with _lock: + global _last_report + _last_report = { + "known_fields": [], + "new_fields": [], + "missing_fields": [], + "has_drift": False, + } From 252429cd8fd7dada0016d806796cb1ff1e9930f6 Mon Sep 17 00:00:00 2001 From: yu-med Date: Fri, 3 Jul 2026 02:07:32 +0800 Subject: [PATCH 2/5] fix(schema-drift): harden drift tracking and fix banner fetch order (#108) Cache schema_baseline.json with lru_cache and make record_parse_drift non-fatal on baseline I/O or parse errors so parsing never aborts. Fetch /api/schema-report after sessions load so the banner reflects drift from the current parse run. Add vitest coverage for banner rendering and fetch ordering; extend pytest for malformed baseline. Raise benchmark baselines for per-entry field-path fingerprinting. --- benchmarks/baselines.json | 14 +++--- static/js/sessions.js | 3 +- static/js/sessions.test.js | 43 ++++++++++++++++- tests/test_schema_drift.py | 37 ++++++++++---- utils/schema_drift.py | 99 ++++++++++++++------------------------ 5 files changed, 113 insertions(+), 83 deletions(-) diff --git a/benchmarks/baselines.json b/benchmarks/baselines.json index 07fb84e..99e5e50 100644 --- a/benchmarks/baselines.json +++ b/benchmarks/baselines.json @@ -1,18 +1,18 @@ { - "_note": "Gated means from ubuntu-latest CI benchmark-results.json (PR #97, run 28126772276). Excluded from gate (recorded for reference): test_parse_session_small, test_search_full_corpus (sub-ms CI noise). Memory benchmarks use extra_info.peak_bytes (bytes); latency uses stats.mean (seconds).", - "updated": "2026-06-24T20:15:37Z", + "_note": "Gated means from ubuntu-latest CI benchmark-results.json. PR #108 (schema drift): parse/export latency baselines raised for per-entry field-path fingerprinting. Excluded from gate (recorded for reference): test_parse_session_small, test_search_full_corpus (sub-ms CI noise). Memory benchmarks use extra_info.peak_bytes (bytes); latency uses stats.mean (seconds).", + "updated": "2026-07-03T00:00:00Z", "machine": "Linux", "groups": { "parse": { "test_parse_session_small": 0.00010518068718225604, - "test_parse_session_medium": 0.002991333112179635, - "test_parse_session_large": 0.032311203818181436, + "test_parse_session_medium": 0.004645, + "test_parse_session_large": 0.045401, "test_parse_large_peak_memory": 2032028.0 }, "export": { - "test_bulk_export_session_count[sessions-10]": 0.0042825538530803925, - "test_bulk_export_session_count[sessions-50]": 0.021406330209302382, - "test_bulk_export_session_count[sessions-100]": 0.04229194749999898, + "test_bulk_export_session_count[sessions-10]": 0.006504, + "test_bulk_export_session_count[sessions-50]": 0.032314, + "test_bulk_export_session_count[sessions-100]": 0.064562, "test_bulk_export_zip_peak_memory[sessions-10]": 350628.0, "test_bulk_export_zip_peak_memory[sessions-50]": 506454.0, "test_bulk_export_zip_peak_memory[sessions-100]": 694088.0 diff --git a/static/js/sessions.js b/static/js/sessions.js index 2f0b9b2..d623f95 100644 --- a/static/js/sessions.js +++ b/static/js/sessions.js @@ -82,10 +82,9 @@ export async function showWorkspace(projectName, selectedSessionId) { } const prettyName = state.projectDisplayNames[projectName] || projectName; - const schemaBannerHtml = await fetchSchemaDriftBannerHtml(); - const res = await fetch(`/api/projects/${encodeURIComponent(projectName)}/sessions`); state.cachedSessions = await res.json(); + const schemaBannerHtml = await fetchSchemaDriftBannerHtml(); state.cachedSessions.sort((a, b) => { const ta = a.last_timestamp || a.first_timestamp || ''; diff --git a/static/js/sessions.test.js b/static/js/sessions.test.js index 74b2044..17e54cf 100644 --- a/static/js/sessions.test.js +++ b/static/js/sessions.test.js @@ -50,14 +50,33 @@ const SESSION_DETAIL = { }, }; -function mockWorkspaceFetch() { +const NO_DRIFT_REPORT = { + known_fields: ['type'], + new_fields: [], + missing_fields: [], + has_drift: false, +}; + +const DRIFT_REPORT = { + known_fields: ['type'], + new_fields: ['tool', 'tool.type'], + missing_fields: [], + has_drift: true, +}; + +function mockWorkspaceFetch({ schemaReport = NO_DRIFT_REPORT } = {}) { + const callOrder = []; fetch.mockImplementation((url) => { + callOrder.push(url); if (url === '/api/projects') { return Promise.resolve({ ok: true, json: () => Promise.resolve([{ name: 'alpha', display_name: 'Alpha' }]), }); } + if (url === '/api/schema-report') { + return Promise.resolve({ ok: true, json: () => Promise.resolve(schemaReport) }); + } if (url === '/api/projects/alpha/sessions') { return Promise.resolve({ ok: true, json: () => Promise.resolve(SESSION_LIST) }); } @@ -75,6 +94,7 @@ function mockWorkspaceFetch() { } return Promise.reject(new Error(`unexpected fetch: ${url}`)); }); + return callOrder; } describe('sessions workspace', () => { @@ -87,6 +107,7 @@ describe('sessions workspace', () => { state.projectDisplayNames = {}; vi.stubGlobal('fetch', vi.fn()); window.location.hash = ''; + sessionStorage.clear(); }); afterEach(() => { @@ -122,6 +143,26 @@ describe('sessions workspace', () => { expect(active.id).toBe('sidebar-sess-2'); }); + it('showWorkspace fetches schema report after sessions load', async () => { + const callOrder = mockWorkspaceFetch(); + await showWorkspace('alpha'); + + const sessionsIdx = callOrder.indexOf('/api/projects/alpha/sessions'); + const schemaIdx = callOrder.indexOf('/api/schema-report'); + expect(sessionsIdx).toBeGreaterThanOrEqual(0); + expect(schemaIdx).toBeGreaterThan(sessionsIdx); + }); + + it('showWorkspace renders schema drift banner when report has drift', async () => { + mockWorkspaceFetch({ schemaReport: DRIFT_REPORT }); + await showWorkspace('alpha'); + + const banner = document.getElementById('schema-drift-banner'); + expect(banner).not.toBeNull(); + expect(banner.textContent).toContain('Upstream JSONL schema drift detected'); + expect(banner.textContent).toContain('tool'); + }); + it('loadSession renders messages in the main panel', async () => { mockWorkspaceFetch(); await showWorkspace('alpha'); diff --git a/tests/test_schema_drift.py b/tests/test_schema_drift.py index 6d41efc..07fd7ac 100644 --- a/tests/test_schema_drift.py +++ b/tests/test_schema_drift.py @@ -7,8 +7,9 @@ import pytest -from utils.jsonl_parser import _collect_field_paths, parse_session +from utils.jsonl_parser import parse_session from utils.schema_drift import ( + clear_baseline_cache, collect_field_paths, diff_against_baseline, get_schema_report, @@ -24,8 +25,10 @@ @pytest.fixture(autouse=True) def _clear_schema_report(): reset_schema_report() + clear_baseline_cache() yield reset_schema_report() + clear_baseline_cache() class TestCollectFieldPaths: @@ -42,22 +45,15 @@ def test_nested_paths_use_dotted_notation(self): assert "message.content[].type" in paths assert "message.content[].text" in paths - def test_jsonl_parser_wrapper_matches_helper(self): - record = {"type": "user", "cwd": "/tmp"} - assert _collect_field_paths(record) == collect_field_paths(record) - class TestSchemaBaseline: def test_baseline_is_committed_and_loads(self): fields = load_baseline_fields() assert len(fields) > 0 - assert fields["type"]["required"] is True - assert fields["type"]["expected_type"] == "str" + assert fields["type"] is True def test_minimal_fixture_has_no_drift(self): - report = diff_against_baseline( - _collect_field_paths_from_fixture("session_minimal.jsonl") - ) + report = diff_against_baseline(_collect_field_paths_from_fixture("session_minimal.jsonl")) assert report["new_fields"] == [] assert report["missing_fields"] == [] @@ -133,3 +129,24 @@ def test_merges_reports_across_parses(self): record_parse_drift({"type", "tool"}) report = get_schema_report() assert "tool" in report["new_fields"] + + def test_malformed_baseline_is_non_fatal(self, tmp_path, monkeypatch): + bad_baseline = tmp_path / "schema_baseline.json" + bad_baseline.write_text("{not json", encoding="utf-8") + monkeypatch.setattr("utils.schema_drift.BASELINE_PATH", bad_baseline) + clear_baseline_cache() + assert record_parse_drift({"type"}) is None + + def test_parse_session_survives_malformed_baseline( + self, tmp_path, monkeypatch, caplog: pytest.LogCaptureFixture + ): + bad_baseline = tmp_path / "schema_baseline.json" + bad_baseline.write_text("{not json", encoding="utf-8") + monkeypatch.setattr("utils.schema_drift.BASELINE_PATH", bad_baseline) + clear_baseline_cache() + + with caplog.at_level(logging.WARNING, logger="claude_code_chat_browser.schema_drift"): + session = parse_session(str(FIXTURES / "session_minimal.jsonl")) + + assert session["session_id"] + assert any("schema drift tracking skipped" in r.message for r in caplog.records) diff --git a/utils/schema_drift.py b/utils/schema_drift.py index c2b6e61..7e80c5f 100644 --- a/utils/schema_drift.py +++ b/utils/schema_drift.py @@ -2,6 +2,7 @@ from __future__ import annotations +import functools import json import logging import threading @@ -13,6 +14,8 @@ BASELINE_PATH = Path(__file__).resolve().parent.parent / "schema_baseline.json" _lock = threading.Lock() +# Accumulated drift from parse_session() runs in this process; cleared only via +# reset_schema_report() (tests) or server restart. _last_report: SchemaDriftReport = { "known_fields": [], "new_fields": [], @@ -21,11 +24,6 @@ } -class SchemaFieldSpec(TypedDict): - expected_type: str - required: bool - - class SchemaDriftReport(TypedDict): known_fields: list[str] new_fields: list[str] @@ -50,69 +48,36 @@ def collect_field_paths(record: dict[str, Any], prefix: str = "") -> set[str]: return paths -def _type_name(value: Any) -> str: - if value is None: - return "null" - if isinstance(value, bool): - return "bool" - if isinstance(value, int) and not isinstance(value, bool): - return "int" - if isinstance(value, float): - return "float" - if isinstance(value, str): - return "str" - if isinstance(value, list): - return "list" - if isinstance(value, dict): - return "dict" - return type(value).__name__ - - -def collect_field_paths_with_types( - record: dict[str, Any], prefix: str = "" -) -> dict[str, str]: - """Like :func:`collect_field_paths` but also records the observed JSON type.""" - paths: dict[str, str] = {} - for key, value in record.items(): - path = f"{prefix}.{key}" if prefix else key - paths[path] = _type_name(value) - if isinstance(value, dict): - paths.update(collect_field_paths_with_types(value, path)) - elif isinstance(value, list): - list_path = f"{path}[]" - paths[list_path] = "list" - for item in value: - if isinstance(item, dict): - paths.update(collect_field_paths_with_types(item, list_path)) - return paths - - -def load_baseline_fields() -> dict[str, SchemaFieldSpec]: - """Load ``schema_baseline.json`` field specs keyed by dotted path.""" +@functools.lru_cache(maxsize=1) +def _cached_baseline() -> tuple[frozenset[str], frozenset[str]]: + """Load and cache known/required field paths from ``schema_baseline.json``.""" raw = json.loads(BASELINE_PATH.read_text(encoding="utf-8")) fields = raw.get("fields", {}) if not isinstance(fields, dict): raise ValueError("schema_baseline.json: 'fields' must be an object") - result: dict[str, SchemaFieldSpec] = {} + known_paths: set[str] = set() + required_paths: set[str] = set() for path, spec in fields.items(): - if not isinstance(spec, dict): + if not isinstance(path, str): continue - expected_type = spec.get("expected_type", "unknown") - required = bool(spec.get("required", False)) - if not isinstance(expected_type, str): - expected_type = "unknown" - result[path] = {"expected_type": expected_type, "required": required} - return result + known_paths.add(path) + if isinstance(spec, dict) and spec.get("required"): + required_paths.add(path) + return frozenset(known_paths), frozenset(required_paths) + + +def load_baseline_fields() -> dict[str, bool]: + """Return baseline field paths mapped to whether each path is required.""" + known_paths, required_paths = _cached_baseline() + return {path: path in required_paths for path in known_paths} def diff_against_baseline(observed_paths: set[str]) -> SchemaDriftReport: """Compare observed session field paths to the committed baseline.""" - baseline = load_baseline_fields() - known_fields = sorted(baseline.keys()) - new_fields = sorted(observed_paths - set(known_fields)) - missing_fields = sorted( - path for path, spec in baseline.items() if spec["required"] and path not in observed_paths - ) + known_paths, required_paths = _cached_baseline() + known_fields = sorted(known_paths) + new_fields = sorted(observed_paths - known_paths) + missing_fields = sorted(required_paths - observed_paths) return { "known_fields": known_fields, "new_fields": new_fields, @@ -121,9 +86,14 @@ def diff_against_baseline(observed_paths: set[str]) -> SchemaDriftReport: } -def record_parse_drift(observed_paths: set[str]) -> SchemaDriftReport: +def record_parse_drift(observed_paths: set[str]) -> SchemaDriftReport | None: """Diff *observed_paths*, log warnings, and merge into the process-wide report.""" - report = diff_against_baseline(observed_paths) + try: + report = diff_against_baseline(observed_paths) + except (OSError, json.JSONDecodeError, ValueError, TypeError) as exc: + _log.warning("schema drift tracking skipped: %s", exc) + return None + if report["new_fields"]: _log.warning( "schema drift: new JSONL field paths not in baseline: %s", @@ -137,9 +107,7 @@ def record_parse_drift(observed_paths: set[str]) -> SchemaDriftReport: with _lock: global _last_report merged_new = sorted(set(_last_report["new_fields"]) | set(report["new_fields"])) - merged_missing = sorted( - set(_last_report["missing_fields"]) | set(report["missing_fields"]) - ) + merged_missing = sorted(set(_last_report["missing_fields"]) | set(report["missing_fields"])) _last_report = { "known_fields": report["known_fields"], "new_fields": merged_new, @@ -170,3 +138,8 @@ def reset_schema_report() -> None: "missing_fields": [], "has_drift": False, } + + +def clear_baseline_cache() -> None: + """Clear the cached baseline (for tests).""" + _cached_baseline.cache_clear() From aa6b31f203f03769e2a826bc0e07f9c09541725f Mon Sep 17 00:00:00 2001 From: yu-med Date: Fri, 3 Jul 2026 03:22:56 +0800 Subject: [PATCH 3/5] Address timon's feedback --- schema_baseline.json | 2 +- static/js/sessions.js | 12 +++-- static/js/sessions.test.js | 11 ++-- tests/test_schema_drift.py | 20 +++++++- utils/jsonl_parser.py | 20 +++++--- utils/schema_drift.py | 100 +++++++++++++++++++++++++------------ 6 files changed, 116 insertions(+), 49 deletions(-) diff --git a/schema_baseline.json b/schema_baseline.json index 58eda73..20a0f60 100644 --- a/schema_baseline.json +++ b/schema_baseline.json @@ -1,6 +1,6 @@ { "version": 1, - "description": "Known Claude Code JSONL field paths as of 2026-07-03. Update deliberately when upstream format changes.", + "description": "Known Claude Code JSONL field paths as of 2026-07-03. Drift detection compares path presence only (added/removed paths); expected_type is documentary metadata for baseline updates, not checked at runtime. Mark required:true only for paths present on every record (currently: type). Update deliberately when upstream format changes.", "fields": { "_futureSchemaVersion": { "expected_type": "int", diff --git a/static/js/sessions.js b/static/js/sessions.js index d623f95..ee2a9b3 100644 --- a/static/js/sessions.js +++ b/static/js/sessions.js @@ -82,9 +82,9 @@ export async function showWorkspace(projectName, selectedSessionId) { } const prettyName = state.projectDisplayNames[projectName] || projectName; + const schemaBannerPromise = fetchSchemaDriftBannerHtml(); const res = await fetch(`/api/projects/${encodeURIComponent(projectName)}/sessions`); state.cachedSessions = await res.json(); - const schemaBannerHtml = await fetchSchemaDriftBannerHtml(); state.cachedSessions.sort((a, b) => { const ta = a.last_timestamp || a.first_timestamp || ''; @@ -123,7 +123,7 @@ export async function showWorkspace(projectName, selectedSessionId) { } sidebar += '
'; - let html = `${schemaBannerHtml}
+ let html = ``; smoothSet(content, html); bindSidebarSessionClicks(); - bindSchemaDriftBanner(content); + void schemaBannerPromise.then((schemaBannerHtml) => { + if (!schemaBannerHtml) return; + const root = document.getElementById('content'); + if (!root) return; + root.insertAdjacentHTML('afterbegin', schemaBannerHtml); + bindSchemaDriftBanner(root); + }); content.querySelector('#ws-back-link')?.addEventListener('click', (e) => { e.preventDefault(); showProjects(); diff --git a/static/js/sessions.test.js b/static/js/sessions.test.js index 17e54cf..ce06480 100644 --- a/static/js/sessions.test.js +++ b/static/js/sessions.test.js @@ -143,22 +143,27 @@ describe('sessions workspace', () => { expect(active.id).toBe('sidebar-sess-2'); }); - it('showWorkspace fetches schema report after sessions load', async () => { + it('showWorkspace starts schema report fetch with sessions without blocking render', async () => { const callOrder = mockWorkspaceFetch(); await showWorkspace('alpha'); const sessionsIdx = callOrder.indexOf('/api/projects/alpha/sessions'); const schemaIdx = callOrder.indexOf('/api/schema-report'); expect(sessionsIdx).toBeGreaterThanOrEqual(0); - expect(schemaIdx).toBeGreaterThan(sessionsIdx); + expect(schemaIdx).toBeGreaterThanOrEqual(0); + expect(schemaIdx).toBeLessThan(sessionsIdx + 2); + expect(document.getElementById('sidebar')).not.toBeNull(); + expect(document.getElementById('schema-drift-banner')).toBeNull(); }); it('showWorkspace renders schema drift banner when report has drift', async () => { mockWorkspaceFetch({ schemaReport: DRIFT_REPORT }); await showWorkspace('alpha'); + await vi.waitFor(() => { + expect(document.getElementById('schema-drift-banner')).not.toBeNull(); + }); const banner = document.getElementById('schema-drift-banner'); - expect(banner).not.toBeNull(); expect(banner.textContent).toContain('Upstream JSONL schema drift detected'); expect(banner.textContent).toContain('tool'); }); diff --git a/tests/test_schema_drift.py b/tests/test_schema_drift.py index 07fd7ac..0528336 100644 --- a/tests/test_schema_drift.py +++ b/tests/test_schema_drift.py @@ -149,4 +149,22 @@ def test_parse_session_survives_malformed_baseline( session = parse_session(str(FIXTURES / "session_minimal.jsonl")) assert session["session_id"] - assert any("schema drift tracking skipped" in r.message for r in caplog.records) + assert any("baseline load failed" in r.message for r in caplog.records) + + def test_duplicate_new_fields_log_once(self, caplog: pytest.LogCaptureFixture): + with caplog.at_level(logging.WARNING, logger="claude_code_chat_browser.schema_drift"): + parse_session(str(UNKNOWN_FIELD_FIXTURE)) + parse_session(str(UNKNOWN_FIELD_FIXTURE)) + + new_field_warnings = [ + r + for r in caplog.records + if r.name == "claude_code_chat_browser.schema_drift" + and "new JSONL field paths" in r.message + ] + assert len(new_field_warnings) == 1 + + def test_schema_drift_disabled_skips_fingerprint(self, monkeypatch): + monkeypatch.setenv("CLAUDE_CODE_CHAT_BROWSER_SCHEMA_DRIFT", "0") + parse_session(str(UNKNOWN_FIELD_FIXTURE)) + assert get_schema_report()["has_drift"] is False diff --git a/utils/jsonl_parser.py b/utils/jsonl_parser.py index b30bd60..8e6dbd2 100644 --- a/utils/jsonl_parser.py +++ b/utils/jsonl_parser.py @@ -25,7 +25,12 @@ infer_title as _infer_title, normalize_content as _normalize_content, ) -from utils.schema_drift import collect_field_paths, record_parse_drift +from utils.schema_drift import ( + collect_field_paths, + is_schema_drift_enabled, + record_parse_drift, + schema_drift_sample_limit, +) from utils.session_peek import quick_session_info from utils.tool_dispatch import _parse_tool_result, track_tool_file_activity from utils.validation import validate_session_dict @@ -38,11 +43,6 @@ _log = logging.getLogger(__name__) -def _collect_field_paths(record: dict[str, Any]) -> set[str]: - """Recursive JSON path fingerprinting for schema drift detection.""" - return collect_field_paths(record) - - def _coerce_role(raw: str) -> RoleLiteral: if raw in _VALID_ROLES: return cast(RoleLiteral, raw) @@ -175,6 +175,7 @@ def parse_session(filepath: str) -> SessionDict: messages: list[MessageDict] = [] metadata = _new_session_metadata_builder(session_id) observed_field_paths: set[str] = set() + schema_samples_remaining = schema_drift_sample_limit() if is_schema_drift_enabled() else 0 with open(filepath, "r", encoding="utf-8", errors="replace") as f: for line in f: @@ -189,7 +190,9 @@ def parse_session(filepath: str) -> SessionDict: if not isinstance(entry, dict): continue - observed_field_paths |= _collect_field_paths(entry) + if schema_samples_remaining > 0: + observed_field_paths |= collect_field_paths(entry) + schema_samples_remaining -= 1 entry_type = entry.get("type") ts = _entry_timestamp(entry) @@ -235,7 +238,8 @@ def parse_session(filepath: str) -> SessionDict: title = _infer_title(messages) - record_parse_drift(observed_field_paths) + if is_schema_drift_enabled() and observed_field_paths: + record_parse_drift(observed_field_paths) return validate_session_dict( { diff --git a/utils/schema_drift.py b/utils/schema_drift.py index 7e80c5f..2a4c941 100644 --- a/utils/schema_drift.py +++ b/utils/schema_drift.py @@ -2,9 +2,9 @@ from __future__ import annotations -import functools import json import logging +import os import threading from pathlib import Path from typing import Any, TypedDict @@ -14,8 +14,11 @@ BASELINE_PATH = Path(__file__).resolve().parent.parent / "schema_baseline.json" _lock = threading.Lock() -# Accumulated drift from parse_session() runs in this process; cleared only via -# reset_schema_report() (tests) or server restart. +# Process-wide union of *new* field paths seen since server start (or reset_schema_report). +# Intentionally sticky: once upstream drift is detected, the banner stays until restart so +# operators do not miss it. missing_fields reflects only the most recent sampled parse. +_baseline_cache: tuple[frozenset[str], frozenset[str]] | None = None +_baseline_load_failed: bool = False _last_report: SchemaDriftReport = { "known_fields": [], "new_fields": [], @@ -31,6 +34,21 @@ class SchemaDriftReport(TypedDict): has_drift: bool +def is_schema_drift_enabled() -> bool: + """Return False when CLAUDE_CODE_CHAT_BROWSER_SCHEMA_DRIFT=0|false|no.""" + flag = os.environ.get("CLAUDE_CODE_CHAT_BROWSER_SCHEMA_DRIFT", "1").strip().lower() + return flag not in ("0", "false", "no") + + +def schema_drift_sample_limit() -> int: + """Max JSONL records per session to fingerprint (default 3). Set 0 to disable sampling cap.""" + raw = os.environ.get("CLAUDE_CODE_CHAT_BROWSER_SCHEMA_DRIFT_SAMPLE", "3").strip() + try: + return max(0, int(raw)) + except ValueError: + return 3 + + def collect_field_paths(record: dict[str, Any], prefix: str = "") -> set[str]: """Recursively collect dotted JSON paths (with ``[]`` for list items).""" paths: set[str] = set() @@ -48,33 +66,44 @@ def collect_field_paths(record: dict[str, Any], prefix: str = "") -> set[str]: return paths -@functools.lru_cache(maxsize=1) -def _cached_baseline() -> tuple[frozenset[str], frozenset[str]]: - """Load and cache known/required field paths from ``schema_baseline.json``.""" - raw = json.loads(BASELINE_PATH.read_text(encoding="utf-8")) - fields = raw.get("fields", {}) - if not isinstance(fields, dict): - raise ValueError("schema_baseline.json: 'fields' must be an object") - known_paths: set[str] = set() - required_paths: set[str] = set() - for path, spec in fields.items(): - if not isinstance(path, str): - continue - known_paths.add(path) - if isinstance(spec, dict) and spec.get("required"): - required_paths.add(path) - return frozenset(known_paths), frozenset(required_paths) +def _load_baseline() -> tuple[frozenset[str], frozenset[str]]: + """Load baseline paths once; cache success and remember hard failures.""" + global _baseline_cache, _baseline_load_failed + if _baseline_load_failed: + raise ValueError("schema_baseline.json previously failed to load") + if _baseline_cache is not None: + return _baseline_cache + try: + raw = json.loads(BASELINE_PATH.read_text(encoding="utf-8")) + fields = raw.get("fields", {}) + if not isinstance(fields, dict): + raise ValueError("schema_baseline.json: 'fields' must be an object") + known_paths: set[str] = set() + required_paths: set[str] = set() + for path, spec in fields.items(): + if not isinstance(path, str): + continue + known_paths.add(path) + if isinstance(spec, dict) and spec.get("required"): + required_paths.add(path) + _baseline_cache = (frozenset(known_paths), frozenset(required_paths)) + return _baseline_cache + except (OSError, json.JSONDecodeError, ValueError, TypeError) as exc: + if not _baseline_load_failed: + _baseline_load_failed = True + _log.warning("schema drift baseline load failed (will not retry): %s", exc) + raise def load_baseline_fields() -> dict[str, bool]: """Return baseline field paths mapped to whether each path is required.""" - known_paths, required_paths = _cached_baseline() + known_paths, required_paths = _load_baseline() return {path: path in required_paths for path in known_paths} def diff_against_baseline(observed_paths: set[str]) -> SchemaDriftReport: - """Compare observed session field paths to the committed baseline.""" - known_paths, required_paths = _cached_baseline() + """Compare observed session field paths to the committed baseline (paths only, not types).""" + known_paths, required_paths = _load_baseline() known_fields = sorted(known_paths) new_fields = sorted(observed_paths - known_paths) missing_fields = sorted(required_paths - observed_paths) @@ -90,29 +119,32 @@ def record_parse_drift(observed_paths: set[str]) -> SchemaDriftReport | None: """Diff *observed_paths*, log warnings, and merge into the process-wide report.""" try: report = diff_against_baseline(observed_paths) - except (OSError, json.JSONDecodeError, ValueError, TypeError) as exc: - _log.warning("schema drift tracking skipped: %s", exc) + except (OSError, json.JSONDecodeError, ValueError, TypeError): return None - if report["new_fields"]: + with _lock: + global _last_report + prior_new = set(_last_report["new_fields"]) + genuinely_new = sorted(set(report["new_fields"]) - prior_new) + merged_new = sorted(prior_new | set(report["new_fields"])) + + if genuinely_new: _log.warning( "schema drift: new JSONL field paths not in baseline: %s", - report["new_fields"], + genuinely_new, ) if report["missing_fields"]: _log.warning( - "schema drift: missing required JSONL field paths: %s", + "schema drift: missing required JSONL field paths in sampled records: %s", report["missing_fields"], ) + with _lock: - global _last_report - merged_new = sorted(set(_last_report["new_fields"]) | set(report["new_fields"])) - merged_missing = sorted(set(_last_report["missing_fields"]) | set(report["missing_fields"])) _last_report = { "known_fields": report["known_fields"], "new_fields": merged_new, - "missing_fields": merged_missing, - "has_drift": bool(merged_new or merged_missing), + "missing_fields": list(report["missing_fields"]), + "has_drift": bool(merged_new or report["missing_fields"]), } return report @@ -142,4 +174,6 @@ def reset_schema_report() -> None: def clear_baseline_cache() -> None: """Clear the cached baseline (for tests).""" - _cached_baseline.cache_clear() + global _baseline_cache, _baseline_load_failed + _baseline_cache = None + _baseline_load_failed = False From f8bfa1c2f8405976598a0e325d495e9df6475169 Mon Sep 17 00:00:00 2001 From: yu-med Date: Fri, 3 Jul 2026 03:33:45 +0800 Subject: [PATCH 4/5] fix(schema-drift): atomic drift merge, unlimited sample mode, baseline root check (#108) --- tests/test_schema_drift.py | 44 ++++++++++++++++++++++++++++++++++++++ utils/jsonl_parser.py | 13 ++++++++--- utils/schema_drift.py | 25 ++++++++++++---------- 3 files changed, 68 insertions(+), 14 deletions(-) diff --git a/tests/test_schema_drift.py b/tests/test_schema_drift.py index 0528336..b872c44 100644 --- a/tests/test_schema_drift.py +++ b/tests/test_schema_drift.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +import threading from pathlib import Path import pytest @@ -168,3 +169,46 @@ def test_schema_drift_disabled_skips_fingerprint(self, monkeypatch): monkeypatch.setenv("CLAUDE_CODE_CHAT_BROWSER_SCHEMA_DRIFT", "0") parse_session(str(UNKNOWN_FIELD_FIXTURE)) assert get_schema_report()["has_drift"] is False + + def test_sample_limit_one_skips_later_records(self, monkeypatch): + monkeypatch.setenv("CLAUDE_CODE_CHAT_BROWSER_SCHEMA_DRIFT_SAMPLE", "1") + parse_session(str(UNKNOWN_FIELD_FIXTURE)) + report = get_schema_report() + assert "tool" not in report["new_fields"] + + def test_sample_limit_zero_fingerprints_all_records(self, monkeypatch): + monkeypatch.setenv("CLAUDE_CODE_CHAT_BROWSER_SCHEMA_DRIFT_SAMPLE", "0") + parse_session(str(UNKNOWN_FIELD_FIXTURE)) + report = get_schema_report() + assert "tool" in report["new_fields"] + + def test_non_object_baseline_root_is_non_fatal(self, tmp_path, monkeypatch): + bad_baseline = tmp_path / "schema_baseline.json" + bad_baseline.write_text('["not", "an", "object"]', encoding="utf-8") + monkeypatch.setattr("utils.schema_drift.BASELINE_PATH", bad_baseline) + clear_baseline_cache() + assert record_parse_drift({"type"}) is None + + def test_concurrent_record_parse_drift_merges_all_fields(self): + start = threading.Barrier(10) + errors: list[BaseException] = [] + + def worker(field: str) -> None: + try: + start.wait(timeout=5) + record_parse_drift({"type", field}) + except BaseException as exc: + errors.append(exc) + + threads = [ + threading.Thread(target=worker, args=(f"concurrent_field_{i}",)) for i in range(10) + ] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + assert errors == [] + report = get_schema_report() + for i in range(10): + assert f"concurrent_field_{i}" in report["new_fields"] diff --git a/utils/jsonl_parser.py b/utils/jsonl_parser.py index 8e6dbd2..337ba27 100644 --- a/utils/jsonl_parser.py +++ b/utils/jsonl_parser.py @@ -175,7 +175,11 @@ def parse_session(filepath: str) -> SessionDict: messages: list[MessageDict] = [] metadata = _new_session_metadata_builder(session_id) observed_field_paths: set[str] = set() - schema_samples_remaining = schema_drift_sample_limit() if is_schema_drift_enabled() else 0 + if is_schema_drift_enabled(): + sample_limit = schema_drift_sample_limit() + schema_samples_remaining: int | None = None if sample_limit == 0 else sample_limit + else: + schema_samples_remaining = -1 with open(filepath, "r", encoding="utf-8", errors="replace") as f: for line in f: @@ -190,9 +194,12 @@ def parse_session(filepath: str) -> SessionDict: if not isinstance(entry, dict): continue - if schema_samples_remaining > 0: + if schema_samples_remaining != -1 and ( + schema_samples_remaining is None or schema_samples_remaining > 0 + ): observed_field_paths |= collect_field_paths(entry) - schema_samples_remaining -= 1 + if schema_samples_remaining is not None: + schema_samples_remaining -= 1 entry_type = entry.get("type") ts = _entry_timestamp(entry) diff --git a/utils/schema_drift.py b/utils/schema_drift.py index 2a4c941..0c288cf 100644 --- a/utils/schema_drift.py +++ b/utils/schema_drift.py @@ -41,7 +41,7 @@ def is_schema_drift_enabled() -> bool: def schema_drift_sample_limit() -> int: - """Max JSONL records per session to fingerprint (default 3). Set 0 to disable sampling cap.""" + """Max JSONL records per session to fingerprint (default 3). 0 means no cap (all records).""" raw = os.environ.get("CLAUDE_CODE_CHAT_BROWSER_SCHEMA_DRIFT_SAMPLE", "3").strip() try: return max(0, int(raw)) @@ -75,6 +75,8 @@ def _load_baseline() -> tuple[frozenset[str], frozenset[str]]: return _baseline_cache try: raw = json.loads(BASELINE_PATH.read_text(encoding="utf-8")) + if not isinstance(raw, dict): + raise ValueError("schema_baseline.json: root must be an object") fields = raw.get("fields", {}) if not isinstance(fields, dict): raise ValueError("schema_baseline.json: 'fields' must be an object") @@ -122,30 +124,31 @@ def record_parse_drift(observed_paths: set[str]) -> SchemaDriftReport | None: except (OSError, json.JSONDecodeError, ValueError, TypeError): return None + genuinely_new: list[str] = [] + missing_fields: list[str] = [] with _lock: global _last_report prior_new = set(_last_report["new_fields"]) genuinely_new = sorted(set(report["new_fields"]) - prior_new) merged_new = sorted(prior_new | set(report["new_fields"])) + missing_fields = list(report["missing_fields"]) + _last_report = { + "known_fields": report["known_fields"], + "new_fields": merged_new, + "missing_fields": missing_fields, + "has_drift": bool(merged_new or missing_fields), + } if genuinely_new: _log.warning( "schema drift: new JSONL field paths not in baseline: %s", genuinely_new, ) - if report["missing_fields"]: + if missing_fields: _log.warning( "schema drift: missing required JSONL field paths in sampled records: %s", - report["missing_fields"], + missing_fields, ) - - with _lock: - _last_report = { - "known_fields": report["known_fields"], - "new_fields": merged_new, - "missing_fields": list(report["missing_fields"]), - "has_drift": bool(merged_new or report["missing_fields"]), - } return report From ae2b4936257cf8d65735410f38fe45c3054ea61a Mon Sep 17 00:00:00 2001 From: yu-med Date: Fri, 3 Jul 2026 04:14:48 +0800 Subject: [PATCH 5/5] fix(schema-drift): remove unused expected_type from baseline (#108) --- schema_baseline.json | 100 +------------------------------------------ 1 file changed, 1 insertion(+), 99 deletions(-) diff --git a/schema_baseline.json b/schema_baseline.json index 20a0f60..dfc76a2 100644 --- a/schema_baseline.json +++ b/schema_baseline.json @@ -1,397 +1,299 @@ { "version": 1, - "description": "Known Claude Code JSONL field paths as of 2026-07-03. Drift detection compares path presence only (added/removed paths); expected_type is documentary metadata for baseline updates, not checked at runtime. Mark required:true only for paths present on every record (currently: type). Update deliberately when upstream format changes.", + "description": "Known Claude Code JSONL field paths as of 2026-07-03. Drift detection compares path presence only (added/removed paths); type changes are not detected. Mark required:true only for paths present on every record (currently: type). Update deliberately when upstream format changes.", "fields": { "_futureSchemaVersion": { - "expected_type": "int", "required": false }, "compactMetadata": { - "expected_type": "dict", "required": false }, "compactMetadata.preTokens": { - "expected_type": "int", "required": false }, "compactMetadata.trigger": { - "expected_type": "str", "required": false }, "content": { - "expected_type": "str", "required": false }, "cwd": { - "expected_type": "str", "required": false }, "data": { - "expected_type": "dict", "required": false }, "data.output": { - "expected_type": "str", "required": false }, "data.type": { - "expected_type": "str", "required": false }, "experimentalFlag": { - "expected_type": "bool", "required": false }, "gitBranch": { - "expected_type": "str", "required": false }, "isApiErrorMessage": { - "expected_type": "bool", "required": false }, "isSidechain": { - "expected_type": "bool", "required": false }, "message": { - "expected_type": "dict", "required": false }, "message.content": { - "expected_type": "list", "required": false }, "message.content[]": { - "expected_type": "list", "required": false }, "message.content[].id": { - "expected_type": "str", "required": false }, "message.content[].input": { - "expected_type": "dict", "required": false }, "message.content[].input.command": { - "expected_type": "str", "required": false }, "message.content[].input.description": { - "expected_type": "str", "required": false }, "message.content[].input.file_path": { - "expected_type": "str", "required": false }, "message.content[].name": { - "expected_type": "str", "required": false }, "message.content[].text": { - "expected_type": "str", "required": false }, "message.content[].thinking": { - "expected_type": "str", "required": false }, "message.content[].type": { - "expected_type": "str", "required": false }, "message.model": { - "expected_type": "str", "required": false }, "message.stop_reason": { - "expected_type": "str", "required": false }, "message.usage": { - "expected_type": "dict", "required": false }, "message.usage.cache_creation": { - "expected_type": "dict", "required": false }, "message.usage.cache_creation.ephemeral_1h_input_tokens": { - "expected_type": "int", "required": false }, "message.usage.cache_creation.ephemeral_5m_input_tokens": { - "expected_type": "int", "required": false }, "message.usage.cache_creation_input_tokens": { - "expected_type": "int", "required": false }, "message.usage.cache_read_input_tokens": { - "expected_type": "int", "required": false }, "message.usage.input_tokens": { - "expected_type": "int", "required": false }, "message.usage.output_tokens": { - "expected_type": "int", "required": false }, "message.usage.service_tier": { - "expected_type": "str", "required": false }, "parentToolUseID": { - "expected_type": "str", "required": false }, "parentUuid": { - "expected_type": "str", "required": false }, "permissionMode": { - "expected_type": "str", "required": false }, "sessionId": { - "expected_type": "str", "required": false }, "slug": { - "expected_type": "str", "required": false }, "snapshot": { - "expected_type": "dict", "required": false }, "snapshot.timestamp": { - "expected_type": "str", "required": false }, "subtype": { - "expected_type": "str", "required": false }, "timestamp": { - "expected_type": "str", "required": false }, "toolUseID": { - "expected_type": "str", "required": false }, "toolUseResult": { - "expected_type": "dict", "required": false }, "toolUseResult._unknownToolMeta": { - "expected_type": "dict", "required": false }, "toolUseResult._unknownToolMeta.vendor": { - "expected_type": "str", "required": false }, "toolUseResult.agentId": { - "expected_type": "str", "required": false }, "toolUseResult.answers": { - "expected_type": "dict", "required": false }, "toolUseResult.answers.q1": { - "expected_type": "str", "required": false }, "toolUseResult.code": { - "expected_type": "int", "required": false }, "toolUseResult.content": { - "expected_type": "str", "required": false }, "toolUseResult.description": { - "expected_type": "str", "required": false }, "toolUseResult.durationMs": { - "expected_type": "int", "required": false }, "toolUseResult.exitCode": { - "expected_type": "int", "required": false }, "toolUseResult.file": { - "expected_type": "dict", "required": false }, "toolUseResult.file.content": { - "expected_type": "str", "required": false }, "toolUseResult.file.filePath": { - "expected_type": "str", "required": false }, "toolUseResult.file.numLines": { - "expected_type": "int", "required": false }, "toolUseResult.filePath": { - "expected_type": "str", "required": false }, "toolUseResult.filenames": { - "expected_type": "list", "required": false }, "toolUseResult.filenames[]": { - "expected_type": "list", "required": false }, "toolUseResult.isAsync": { - "expected_type": "bool", "required": false }, "toolUseResult.message": { - "expected_type": "str", "required": false }, "toolUseResult.mode": { - "expected_type": "str", "required": false }, "toolUseResult.newTodos": { - "expected_type": "list", "required": false }, "toolUseResult.newTodos[]": { - "expected_type": "list", "required": false }, "toolUseResult.newTodos[].content": { - "expected_type": "str", "required": false }, "toolUseResult.newTodos[].id": { - "expected_type": "str", "required": false }, "toolUseResult.numFiles": { - "expected_type": "int", "required": false }, "toolUseResult.numLines": { - "expected_type": "int", "required": false }, "toolUseResult.plan": { - "expected_type": "list", "required": false }, "toolUseResult.plan[]": { - "expected_type": "list", "required": false }, "toolUseResult.query": { - "expected_type": "str", "required": false }, "toolUseResult.questions": { - "expected_type": "list", "required": false }, "toolUseResult.questions[]": { - "expected_type": "list", "required": false }, "toolUseResult.questions[].id": { - "expected_type": "str", "required": false }, "toolUseResult.results": { - "expected_type": "list", "required": false }, "toolUseResult.results[]": { - "expected_type": "list", "required": false }, "toolUseResult.results[].url": { - "expected_type": "str", "required": false }, "toolUseResult.retrieval_status": { - "expected_type": "str", "required": false }, "toolUseResult.status": { - "expected_type": "str", "required": false }, "toolUseResult.stderr": { - "expected_type": "str", "required": false }, "toolUseResult.stdout": { - "expected_type": "str", "required": false }, "toolUseResult.structuredPatch": { - "expected_type": "str", "required": false }, "toolUseResult.task": { - "expected_type": "dict", "required": false }, "toolUseResult.task.description": { - "expected_type": "str", "required": false }, "toolUseResult.task.task_id": { - "expected_type": "str", "required": false }, "toolUseResult.task_id": { - "expected_type": "str", "required": false }, "toolUseResult.task_type": { - "expected_type": "str", "required": false }, "toolUseResult.totalDurationMs": { - "expected_type": "int", "required": false }, "toolUseResult.truncated": { - "expected_type": "bool", "required": false }, "toolUseResult.url": { - "expected_type": "str", "required": false }, "type": { - "expected_type": "str", "required": true }, "uuid": { - "expected_type": "str", "required": false }, "version": { - "expected_type": "str", "required": false } }