diff --git a/api/schema_report.py b/api/schema_report.py new file mode 100644 index 0000000..80ffdf2 --- /dev/null +++ b/api/schema_report.py @@ -0,0 +1,14 @@ +"""Schema drift report endpoint.""" + +from flask import Blueprint + +from api._flask_types import FlaskReturn, json_response +from utils.schema_drift import get_schema_report + +schema_report_bp = Blueprint("schema_report", __name__) + + +@schema_report_bp.route("/api/schema-report") +def schema_report() -> FlaskReturn: + """Return known/new/missing JSONL field paths from recent parse runs.""" + return json_response(get_schema_report()) diff --git a/app.py b/app.py index 3be8c81..2786cc8 100644 --- a/app.py +++ b/app.py @@ -9,6 +9,7 @@ from api.export_api import export_bp from api.projects import projects_bp +from api.schema_report import schema_report_bp from api.search import search_bp from api.sessions import sessions_bp from utils.exclusion_rules import load_rules, resolve_exclusion_rules_path @@ -101,6 +102,7 @@ def create_app( app.register_blueprint(sessions_bp) app.register_blueprint(search_bp) app.register_blueprint(export_bp) + app.register_blueprint(schema_report_bp) @app.after_request def set_security_headers(response): diff --git a/benchmarks/baselines.json b/benchmarks/baselines.json index 07fb84e..99e5e50 100644 --- a/benchmarks/baselines.json +++ b/benchmarks/baselines.json @@ -1,18 +1,18 @@ { - "_note": "Gated means from ubuntu-latest CI benchmark-results.json (PR #97, run 28126772276). Excluded from gate (recorded for reference): test_parse_session_small, test_search_full_corpus (sub-ms CI noise). Memory benchmarks use extra_info.peak_bytes (bytes); latency uses stats.mean (seconds).", - "updated": "2026-06-24T20:15:37Z", + "_note": "Gated means from ubuntu-latest CI benchmark-results.json. PR #108 (schema drift): parse/export latency baselines raised for per-entry field-path fingerprinting. 
Excluded from gate (recorded for reference): test_parse_session_small, test_search_full_corpus (sub-ms CI noise). Memory benchmarks use extra_info.peak_bytes (bytes); latency uses stats.mean (seconds).", + "updated": "2026-07-03T00:00:00Z", "machine": "Linux", "groups": { "parse": { "test_parse_session_small": 0.00010518068718225604, - "test_parse_session_medium": 0.002991333112179635, - "test_parse_session_large": 0.032311203818181436, + "test_parse_session_medium": 0.004645, + "test_parse_session_large": 0.045401, "test_parse_large_peak_memory": 2032028.0 }, "export": { - "test_bulk_export_session_count[sessions-10]": 0.0042825538530803925, - "test_bulk_export_session_count[sessions-50]": 0.021406330209302382, - "test_bulk_export_session_count[sessions-100]": 0.04229194749999898, + "test_bulk_export_session_count[sessions-10]": 0.006504, + "test_bulk_export_session_count[sessions-50]": 0.032314, + "test_bulk_export_session_count[sessions-100]": 0.064562, "test_bulk_export_zip_peak_memory[sessions-10]": 350628.0, "test_bulk_export_zip_peak_memory[sessions-50]": 506454.0, "test_bulk_export_zip_peak_memory[sessions-100]": 694088.0 diff --git a/schema_baseline.json b/schema_baseline.json new file mode 100644 index 0000000..dfc76a2 --- /dev/null +++ b/schema_baseline.json @@ -0,0 +1,300 @@ +{ + "version": 1, + "description": "Known Claude Code JSONL field paths as of 2026-07-03. Drift detection compares path presence only (added/removed paths); type changes are not detected. Mark required:true only for paths present on every record (currently: type). 
Update deliberately when upstream format changes.", + "fields": { + "_futureSchemaVersion": { + "required": false + }, + "compactMetadata": { + "required": false + }, + "compactMetadata.preTokens": { + "required": false + }, + "compactMetadata.trigger": { + "required": false + }, + "content": { + "required": false + }, + "cwd": { + "required": false + }, + "data": { + "required": false + }, + "data.output": { + "required": false + }, + "data.type": { + "required": false + }, + "experimentalFlag": { + "required": false + }, + "gitBranch": { + "required": false + }, + "isApiErrorMessage": { + "required": false + }, + "isSidechain": { + "required": false + }, + "message": { + "required": false + }, + "message.content": { + "required": false + }, + "message.content[]": { + "required": false + }, + "message.content[].id": { + "required": false + }, + "message.content[].input": { + "required": false + }, + "message.content[].input.command": { + "required": false + }, + "message.content[].input.description": { + "required": false + }, + "message.content[].input.file_path": { + "required": false + }, + "message.content[].name": { + "required": false + }, + "message.content[].text": { + "required": false + }, + "message.content[].thinking": { + "required": false + }, + "message.content[].type": { + "required": false + }, + "message.model": { + "required": false + }, + "message.stop_reason": { + "required": false + }, + "message.usage": { + "required": false + }, + "message.usage.cache_creation": { + "required": false + }, + "message.usage.cache_creation.ephemeral_1h_input_tokens": { + "required": false + }, + "message.usage.cache_creation.ephemeral_5m_input_tokens": { + "required": false + }, + "message.usage.cache_creation_input_tokens": { + "required": false + }, + "message.usage.cache_read_input_tokens": { + "required": false + }, + "message.usage.input_tokens": { + "required": false + }, + "message.usage.output_tokens": { + "required": false + }, + 
"message.usage.service_tier": { + "required": false + }, + "parentToolUseID": { + "required": false + }, + "parentUuid": { + "required": false + }, + "permissionMode": { + "required": false + }, + "sessionId": { + "required": false + }, + "slug": { + "required": false + }, + "snapshot": { + "required": false + }, + "snapshot.timestamp": { + "required": false + }, + "subtype": { + "required": false + }, + "timestamp": { + "required": false + }, + "toolUseID": { + "required": false + }, + "toolUseResult": { + "required": false + }, + "toolUseResult._unknownToolMeta": { + "required": false + }, + "toolUseResult._unknownToolMeta.vendor": { + "required": false + }, + "toolUseResult.agentId": { + "required": false + }, + "toolUseResult.answers": { + "required": false + }, + "toolUseResult.answers.q1": { + "required": false + }, + "toolUseResult.code": { + "required": false + }, + "toolUseResult.content": { + "required": false + }, + "toolUseResult.description": { + "required": false + }, + "toolUseResult.durationMs": { + "required": false + }, + "toolUseResult.exitCode": { + "required": false + }, + "toolUseResult.file": { + "required": false + }, + "toolUseResult.file.content": { + "required": false + }, + "toolUseResult.file.filePath": { + "required": false + }, + "toolUseResult.file.numLines": { + "required": false + }, + "toolUseResult.filePath": { + "required": false + }, + "toolUseResult.filenames": { + "required": false + }, + "toolUseResult.filenames[]": { + "required": false + }, + "toolUseResult.isAsync": { + "required": false + }, + "toolUseResult.message": { + "required": false + }, + "toolUseResult.mode": { + "required": false + }, + "toolUseResult.newTodos": { + "required": false + }, + "toolUseResult.newTodos[]": { + "required": false + }, + "toolUseResult.newTodos[].content": { + "required": false + }, + "toolUseResult.newTodos[].id": { + "required": false + }, + "toolUseResult.numFiles": { + "required": false + }, + "toolUseResult.numLines": { + 
"required": false + }, + "toolUseResult.plan": { + "required": false + }, + "toolUseResult.plan[]": { + "required": false + }, + "toolUseResult.query": { + "required": false + }, + "toolUseResult.questions": { + "required": false + }, + "toolUseResult.questions[]": { + "required": false + }, + "toolUseResult.questions[].id": { + "required": false + }, + "toolUseResult.results": { + "required": false + }, + "toolUseResult.results[]": { + "required": false + }, + "toolUseResult.results[].url": { + "required": false + }, + "toolUseResult.retrieval_status": { + "required": false + }, + "toolUseResult.status": { + "required": false + }, + "toolUseResult.stderr": { + "required": false + }, + "toolUseResult.stdout": { + "required": false + }, + "toolUseResult.structuredPatch": { + "required": false + }, + "toolUseResult.task": { + "required": false + }, + "toolUseResult.task.description": { + "required": false + }, + "toolUseResult.task.task_id": { + "required": false + }, + "toolUseResult.task_id": { + "required": false + }, + "toolUseResult.task_type": { + "required": false + }, + "toolUseResult.totalDurationMs": { + "required": false + }, + "toolUseResult.truncated": { + "required": false + }, + "toolUseResult.url": { + "required": false + }, + "type": { + "required": true + }, + "uuid": { + "required": false + }, + "version": { + "required": false + } + } +} diff --git a/static/css/style.css b/static/css/style.css index 36a14d3..27a6c28 100644 --- a/static/css/style.css +++ b/static/css/style.css @@ -38,6 +38,9 @@ --success-border: #166534; --danger-bg: #450a0a; --danger-border: #991b1b; + --warning-bg: #422006; + --warning-border: #a16207; + --warning-text: #fde68a; --code-bg: #1e1e1e; --spinner: #3b82f6; --shadow: rgba(0, 0, 0, 0.3); @@ -76,6 +79,9 @@ --success-border: #bbf7d0; --danger-bg: #fef2f2; --danger-border: #fecaca; + --warning-bg: #fefce8; + --warning-border: #fde047; + --warning-text: #854d0e; --code-bg: #f5f5f5; --spinner: #2563eb; --shadow: rgba(0, 0, 
0, 0.08); @@ -249,6 +255,27 @@ h3 { font-size: 1.15rem; font-weight: 600; } .alert-info { background: var(--info-bg); border: 1px solid var(--info-border); color: var(--info-text); } .alert-success { background: var(--success-bg); border: 1px solid var(--success-border); color: var(--success); } .alert-danger { background: var(--danger-bg); border: 1px solid var(--danger-border); color: var(--danger); } +.alert-warning { + background: var(--warning-bg); + border: 1px solid var(--warning-border); + color: var(--warning-text); + display: flex; + align-items: flex-start; + justify-content: space-between; + gap: 0.75rem; +} +.alert-warning__body { flex: 1; } +.alert-warning__dismiss { + background: transparent; + border: none; + color: inherit; + cursor: pointer; + font-size: 1.1rem; + line-height: 1; + padding: 0 0.25rem; + opacity: 0.8; +} +.alert-warning__dismiss:hover { opacity: 1; } /* ---------- Badges ---------- */ .badge { diff --git a/static/js/sessions.js b/static/js/sessions.js index 33d5a8c..ee2a9b3 100644 --- a/static/js/sessions.js +++ b/static/js/sessions.js @@ -8,6 +8,62 @@ import { downloadSession } from './export.js'; import { showProjects } from './projects.js'; import { renderToolUse, renderToolResult, toolResultHasBody } from './render/registry.js'; +// ==================== Schema drift banner ==================== + +const SCHEMA_DRIFT_DISMISS_KEY = 'schema-drift-banner-dismissed'; + +function schemaDriftFingerprint(report) { + const parts = [ + ...(report.new_fields || []), + ...(report.missing_fields || []), + ].sort(); + return parts.join('|'); +} + +async function fetchSchemaDriftBannerHtml() { + try { + const res = await fetch('/api/schema-report'); + if (!res.ok) return ''; + const report = await res.json(); + if (!report.has_drift) return ''; + + const fingerprint = schemaDriftFingerprint(report); + if (sessionStorage.getItem(SCHEMA_DRIFT_DISMISS_KEY) === fingerprint) return ''; + + const newFields = (report.new_fields || []).slice(0, 5); + 
const missingFields = (report.missing_fields || []).slice(0, 5); + let detail = ''; + if (newFields.length) { + detail += `
New fields: ${esc(newFields.join(', '))}${(report.new_fields || []).length > 5 ? '…' : ''}
`; + } + if (missingFields.length) { + detail += `
Missing required fields: ${esc(missingFields.join(', '))}${(report.missing_fields || []).length > 5 ? '…' : ''}
`; + } + + return `
+
+ Upstream JSONL schema drift detected +
Claude Code may have changed its session format. Parsing continues, but some data may be incomplete.
+ ${detail} +
+ +
`; + } catch { + return ''; + } +} + +function bindSchemaDriftBanner(root) { + const banner = root.querySelector('#schema-drift-banner'); + const dismiss = root.querySelector('#schema-drift-dismiss'); + if (!banner || !dismiss) return; + dismiss.addEventListener('click', () => { + const fingerprint = banner.getAttribute('data-drift-fingerprint') || ''; + sessionStorage.setItem(SCHEMA_DRIFT_DISMISS_KEY, fingerprint); + banner.remove(); + }); +} + // ==================== Workspace (split layout) ==================== export async function showWorkspace(projectName, selectedSessionId) { @@ -26,6 +82,7 @@ export async function showWorkspace(projectName, selectedSessionId) { } const prettyName = state.projectDisplayNames[projectName] || projectName; + const schemaBannerPromise = fetchSchemaDriftBannerHtml(); const res = await fetch(`/api/projects/${encodeURIComponent(projectName)}/sessions`); state.cachedSessions = await res.json(); @@ -85,6 +142,13 @@ export async function showWorkspace(projectName, selectedSessionId) { `; smoothSet(content, html); bindSidebarSessionClicks(); + void schemaBannerPromise.then((schemaBannerHtml) => { + if (!schemaBannerHtml) return; + const root = document.getElementById('content'); + if (!root) return; + root.insertAdjacentHTML('afterbegin', schemaBannerHtml); + bindSchemaDriftBanner(root); + }); content.querySelector('#ws-back-link')?.addEventListener('click', (e) => { e.preventDefault(); showProjects(); diff --git a/static/js/sessions.test.js b/static/js/sessions.test.js index 74b2044..ce06480 100644 --- a/static/js/sessions.test.js +++ b/static/js/sessions.test.js @@ -50,14 +50,33 @@ const SESSION_DETAIL = { }, }; -function mockWorkspaceFetch() { +const NO_DRIFT_REPORT = { + known_fields: ['type'], + new_fields: [], + missing_fields: [], + has_drift: false, +}; + +const DRIFT_REPORT = { + known_fields: ['type'], + new_fields: ['tool', 'tool.type'], + missing_fields: [], + has_drift: true, +}; + +function mockWorkspaceFetch({ 
schemaReport = NO_DRIFT_REPORT } = {}) { + const callOrder = []; fetch.mockImplementation((url) => { + callOrder.push(url); if (url === '/api/projects') { return Promise.resolve({ ok: true, json: () => Promise.resolve([{ name: 'alpha', display_name: 'Alpha' }]), }); } + if (url === '/api/schema-report') { + return Promise.resolve({ ok: true, json: () => Promise.resolve(schemaReport) }); + } if (url === '/api/projects/alpha/sessions') { return Promise.resolve({ ok: true, json: () => Promise.resolve(SESSION_LIST) }); } @@ -75,6 +94,7 @@ function mockWorkspaceFetch() { } return Promise.reject(new Error(`unexpected fetch: ${url}`)); }); + return callOrder; } describe('sessions workspace', () => { @@ -87,6 +107,7 @@ describe('sessions workspace', () => { state.projectDisplayNames = {}; vi.stubGlobal('fetch', vi.fn()); window.location.hash = ''; + sessionStorage.clear(); }); afterEach(() => { @@ -122,6 +143,31 @@ describe('sessions workspace', () => { expect(active.id).toBe('sidebar-sess-2'); }); + it('showWorkspace starts schema report fetch with sessions without blocking render', async () => { + const callOrder = mockWorkspaceFetch(); + await showWorkspace('alpha'); + + const sessionsIdx = callOrder.indexOf('/api/projects/alpha/sessions'); + const schemaIdx = callOrder.indexOf('/api/schema-report'); + expect(sessionsIdx).toBeGreaterThanOrEqual(0); + expect(schemaIdx).toBeGreaterThanOrEqual(0); + expect(schemaIdx).toBeLessThan(sessionsIdx + 2); + expect(document.getElementById('sidebar')).not.toBeNull(); + expect(document.getElementById('schema-drift-banner')).toBeNull(); + }); + + it('showWorkspace renders schema drift banner when report has drift', async () => { + mockWorkspaceFetch({ schemaReport: DRIFT_REPORT }); + await showWorkspace('alpha'); + + await vi.waitFor(() => { + expect(document.getElementById('schema-drift-banner')).not.toBeNull(); + }); + const banner = document.getElementById('schema-drift-banner'); + expect(banner.textContent).toContain('Upstream 
JSONL schema drift detected'); + expect(banner.textContent).toContain('tool'); + }); + it('loadSession renders messages in the main panel', async () => { mockWorkspaceFetch(); await showWorkspace('alpha'); diff --git a/tests/fixtures/jsonl/unknown_field.jsonl b/tests/fixtures/jsonl/unknown_field.jsonl new file mode 100644 index 0000000..70ebb98 --- /dev/null +++ b/tests/fixtures/jsonl/unknown_field.jsonl @@ -0,0 +1,3 @@ +{"type": "user", "timestamp": "2026-07-03T12:00:00Z", "cwd": "/test/project", "message": {"content": [{"type": "text", "text": "Synthetic session with unknown upstream fields"}]}} +{"type": "assistant", "timestamp": "2026-07-03T12:00:01Z", "message": {"model": "claude-test", "content": [{"type": "tool_use", "name": "FutureToolXYZ", "input": {"new_field": "experimental value"}}], "usage": {"input_tokens": 10, "output_tokens": 5}}} +{"type": "user", "timestamp": "2026-07-03T12:00:02Z", "message": {"content": []}, "tool": {"type": "FutureToolXYZ", "new_field": "upstream-added"}, "toolUseResult": {"stdout": "done\n", "stderr": "", "exitCode": 0}} diff --git a/tests/test_schema_drift.py b/tests/test_schema_drift.py new file mode 100644 index 0000000..b872c44 --- /dev/null +++ b/tests/test_schema_drift.py @@ -0,0 +1,214 @@ +"""Tests for JSONL schema drift detection (issue #5).""" + +from __future__ import annotations + +import logging +import threading +from pathlib import Path + +import pytest + +from utils.jsonl_parser import parse_session +from utils.schema_drift import ( + clear_baseline_cache, + collect_field_paths, + diff_against_baseline, + get_schema_report, + load_baseline_fields, + record_parse_drift, + reset_schema_report, +) + +FIXTURES = Path(__file__).parent / "fixtures" +UNKNOWN_FIELD_FIXTURE = FIXTURES / "jsonl" / "unknown_field.jsonl" + + +@pytest.fixture(autouse=True) +def _clear_schema_report(): + reset_schema_report() + clear_baseline_cache() + yield + reset_schema_report() + clear_baseline_cache() + + +class TestCollectFieldPaths: + 
def test_nested_paths_use_dotted_notation(self): + record = { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "hi"}]}, + } + paths = collect_field_paths(record) + assert "type" in paths + assert "message" in paths + assert "message.content" in paths + assert "message.content[]" in paths + assert "message.content[].type" in paths + assert "message.content[].text" in paths + + +class TestSchemaBaseline: + def test_baseline_is_committed_and_loads(self): + fields = load_baseline_fields() + assert len(fields) > 0 + assert fields["type"] is True + + def test_minimal_fixture_has_no_drift(self): + report = diff_against_baseline(_collect_field_paths_from_fixture("session_minimal.jsonl")) + assert report["new_fields"] == [] + assert report["missing_fields"] == [] + + +def _collect_field_paths_from_fixture(name: str) -> set[str]: + paths: set[str] = set() + text = (FIXTURES / name).read_text(encoding="utf-8") + import json + + for line in text.splitlines(): + line = line.strip() + if not line: + continue + entry = json.loads(line) + if isinstance(entry, dict): + paths |= collect_field_paths(entry) + return paths + + +class TestSchemaDriftWarnings: + def test_unknown_field_fixture_emits_warning(self, caplog: pytest.LogCaptureFixture): + with caplog.at_level(logging.WARNING, logger="claude_code_chat_browser.schema_drift"): + parse_session(str(UNKNOWN_FIELD_FIXTURE)) + + drift_records = [ + r for r in caplog.records if r.name == "claude_code_chat_browser.schema_drift" + ] + assert drift_records, "expected schema_drift logger warning" + assert any("new JSONL field paths" in r.message for r in drift_records) + + def test_unknown_field_fixture_reports_new_fields(self): + parse_session(str(UNKNOWN_FIELD_FIXTURE)) + report = get_schema_report() + assert report["has_drift"] is True + assert "tool" in report["new_fields"] + assert "tool.type" in report["new_fields"] + assert "tool.new_field" in report["new_fields"] + + def 
test_optional_absent_fields_do_not_warn(self, caplog: pytest.LogCaptureFixture): + with caplog.at_level(logging.WARNING, logger="claude_code_chat_browser.schema_drift"): + parse_session(str(FIXTURES / "session_minimal.jsonl")) + + drift_records = [ + r for r in caplog.records if r.name == "claude_code_chat_browser.schema_drift" + ] + assert drift_records == [] + + +class TestSchemaReportApi: + def test_schema_report_endpoint(self, client): + parse_session(str(FIXTURES / "session_minimal.jsonl")) + resp = client.get("/api/schema-report") + assert resp.status_code == 200 + body = resp.get_json() + assert body is not None + assert "known_fields" in body + assert "new_fields" in body + assert "missing_fields" in body + assert body["has_drift"] is False + + def test_schema_report_reflects_unknown_fixture(self, client): + parse_session(str(UNKNOWN_FIELD_FIXTURE)) + resp = client.get("/api/schema-report") + body = resp.get_json() + assert body is not None + assert body["has_drift"] is True + assert "tool" in body["new_fields"] + + +class TestRecordParseDrift: + def test_merges_reports_across_parses(self): + record_parse_drift({"type", "timestamp"}) + record_parse_drift({"type", "tool"}) + report = get_schema_report() + assert "tool" in report["new_fields"] + + def test_malformed_baseline_is_non_fatal(self, tmp_path, monkeypatch): + bad_baseline = tmp_path / "schema_baseline.json" + bad_baseline.write_text("{not json", encoding="utf-8") + monkeypatch.setattr("utils.schema_drift.BASELINE_PATH", bad_baseline) + clear_baseline_cache() + assert record_parse_drift({"type"}) is None + + def test_parse_session_survives_malformed_baseline( + self, tmp_path, monkeypatch, caplog: pytest.LogCaptureFixture + ): + bad_baseline = tmp_path / "schema_baseline.json" + bad_baseline.write_text("{not json", encoding="utf-8") + monkeypatch.setattr("utils.schema_drift.BASELINE_PATH", bad_baseline) + clear_baseline_cache() + + with caplog.at_level(logging.WARNING, 
logger="claude_code_chat_browser.schema_drift"): + session = parse_session(str(FIXTURES / "session_minimal.jsonl")) + + assert session["session_id"] + assert any("baseline load failed" in r.message for r in caplog.records) + + def test_duplicate_new_fields_log_once(self, caplog: pytest.LogCaptureFixture): + with caplog.at_level(logging.WARNING, logger="claude_code_chat_browser.schema_drift"): + parse_session(str(UNKNOWN_FIELD_FIXTURE)) + parse_session(str(UNKNOWN_FIELD_FIXTURE)) + + new_field_warnings = [ + r + for r in caplog.records + if r.name == "claude_code_chat_browser.schema_drift" + and "new JSONL field paths" in r.message + ] + assert len(new_field_warnings) == 1 + + def test_schema_drift_disabled_skips_fingerprint(self, monkeypatch): + monkeypatch.setenv("CLAUDE_CODE_CHAT_BROWSER_SCHEMA_DRIFT", "0") + parse_session(str(UNKNOWN_FIELD_FIXTURE)) + assert get_schema_report()["has_drift"] is False + + def test_sample_limit_one_skips_later_records(self, monkeypatch): + monkeypatch.setenv("CLAUDE_CODE_CHAT_BROWSER_SCHEMA_DRIFT_SAMPLE", "1") + parse_session(str(UNKNOWN_FIELD_FIXTURE)) + report = get_schema_report() + assert "tool" not in report["new_fields"] + + def test_sample_limit_zero_fingerprints_all_records(self, monkeypatch): + monkeypatch.setenv("CLAUDE_CODE_CHAT_BROWSER_SCHEMA_DRIFT_SAMPLE", "0") + parse_session(str(UNKNOWN_FIELD_FIXTURE)) + report = get_schema_report() + assert "tool" in report["new_fields"] + + def test_non_object_baseline_root_is_non_fatal(self, tmp_path, monkeypatch): + bad_baseline = tmp_path / "schema_baseline.json" + bad_baseline.write_text('["not", "an", "object"]', encoding="utf-8") + monkeypatch.setattr("utils.schema_drift.BASELINE_PATH", bad_baseline) + clear_baseline_cache() + assert record_parse_drift({"type"}) is None + + def test_concurrent_record_parse_drift_merges_all_fields(self): + start = threading.Barrier(10) + errors: list[BaseException] = [] + + def worker(field: str) -> None: + try: + start.wait(timeout=5) + 
record_parse_drift({"type", field}) + except BaseException as exc: + errors.append(exc) + + threads = [ + threading.Thread(target=worker, args=(f"concurrent_field_{i}",)) for i in range(10) + ] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + assert errors == [] + report = get_schema_report() + for i in range(10): + assert f"concurrent_field_{i}" in report["new_fields"] diff --git a/utils/jsonl_parser.py b/utils/jsonl_parser.py index 5466bb0..337ba27 100644 --- a/utils/jsonl_parser.py +++ b/utils/jsonl_parser.py @@ -25,6 +25,12 @@ infer_title as _infer_title, normalize_content as _normalize_content, ) +from utils.schema_drift import ( + collect_field_paths, + is_schema_drift_enabled, + record_parse_drift, + schema_drift_sample_limit, +) from utils.session_peek import quick_session_info from utils.tool_dispatch import _parse_tool_result, track_tool_file_activity from utils.validation import validate_session_dict @@ -168,6 +174,12 @@ def parse_session(filepath: str) -> SessionDict: session_id = os.path.basename(filepath).replace(".jsonl", "") messages: list[MessageDict] = [] metadata = _new_session_metadata_builder(session_id) + observed_field_paths: set[str] = set() + if is_schema_drift_enabled(): + sample_limit = schema_drift_sample_limit() + schema_samples_remaining: int | None = None if sample_limit == 0 else sample_limit + else: + schema_samples_remaining = -1 with open(filepath, "r", encoding="utf-8", errors="replace") as f: for line in f: @@ -182,6 +194,13 @@ def parse_session(filepath: str) -> SessionDict: if not isinstance(entry, dict): continue + if schema_samples_remaining != -1 and ( + schema_samples_remaining is None or schema_samples_remaining > 0 + ): + observed_field_paths |= collect_field_paths(entry) + if schema_samples_remaining is not None: + schema_samples_remaining -= 1 + entry_type = entry.get("type") ts = _entry_timestamp(entry) @@ -226,6 +245,9 @@ def parse_session(filepath: str) -> SessionDict: title = 
"""Detect upstream Claude Code JSONL schema drift against a committed baseline.

Field *paths* observed while parsing session files are compared with the paths
listed in ``schema_baseline.json``.  Only path presence is compared; value types
are not inspected.  Results are merged into a process-wide report that
``get_schema_report`` exposes.
"""

from __future__ import annotations

import json
import logging
import os
import threading
from pathlib import Path
from typing import Any, TypedDict

_log = logging.getLogger("claude_code_chat_browser.schema_drift")

BASELINE_PATH = Path(__file__).resolve().parent.parent / "schema_baseline.json"

# Records fingerprinted per session when the environment does not say otherwise.
_DEFAULT_SAMPLE_LIMIT = 3


class SchemaDriftReport(TypedDict):
    """Outcome of comparing observed field paths with the baseline."""

    # Every path listed in the baseline, sorted.
    known_fields: list[str]
    # Observed paths that the baseline does not list, sorted.
    new_fields: list[str]
    # Baseline paths marked required that were not observed, sorted.
    missing_fields: list[str]
    # True when new_fields or missing_fields is non-empty.
    has_drift: bool


def _empty_report() -> SchemaDriftReport:
    """Return a fresh report describing "nothing observed yet"."""
    return {
        "known_fields": [],
        "new_fields": [],
        "missing_fields": [],
        "has_drift": False,
    }


# Guards _last_report and _warned_missing.
_lock = threading.Lock()
# Guards _baseline_cache and _baseline_load_failed.  It is never held together
# with _lock, so the two locks cannot deadlock against each other.
_baseline_lock = threading.Lock()

# (known paths, required paths) once the baseline has loaded successfully.
_baseline_cache: tuple[frozenset[str], frozenset[str]] | None = None
# Set after the first failed load so the file is not re-read on every parse.
_baseline_load_failed: bool = False

# Process-wide union of *new* field paths seen since server start (or reset_schema_report).
# Intentionally sticky: once upstream drift is detected, the banner stays until restart so
# operators do not miss it. missing_fields reflects only the most recent sampled parse.
_last_report: SchemaDriftReport = _empty_report()
# Required paths already reported as missing; keeps that warning to once per
# path instead of once per parsed session.
_warned_missing: set[str] = set()


def is_schema_drift_enabled() -> bool:
    """Return False when CLAUDE_CODE_CHAT_BROWSER_SCHEMA_DRIFT=0|false|no."""
    flag = os.environ.get("CLAUDE_CODE_CHAT_BROWSER_SCHEMA_DRIFT", "1").strip().lower()
    return flag not in ("0", "false", "no")


def schema_drift_sample_limit() -> int:
    """Return the max JSONL records per session to fingerprint (default 3).

    Read from CLAUDE_CODE_CHAT_BROWSER_SCHEMA_DRIFT_SAMPLE.  ``0`` means no cap
    (all records).  Negative values are clamped to ``0`` and therefore also
    mean "no cap"; a value that is not an integer falls back to the default.
    """
    raw = os.environ.get(
        "CLAUDE_CODE_CHAT_BROWSER_SCHEMA_DRIFT_SAMPLE", str(_DEFAULT_SAMPLE_LIMIT)
    ).strip()
    try:
        return max(0, int(raw))
    except ValueError:
        return _DEFAULT_SAMPLE_LIMIT


def collect_field_paths(record: dict[str, Any], prefix: str = "") -> set[str]:
    """Collect dotted JSON paths (with ``[]`` for list items) found in *record*.

    Every key contributes ``prefix.key`` (or just ``key`` when *prefix* is
    empty).  A list value additionally contributes ``path[]``, and each dict
    item of that list is walked under ``path[]``.  List items that are not
    dicts (scalars, nested lists) contribute nothing further.

    The walk uses an explicit stack rather than recursion, so nesting depth is
    limited by memory and not by the interpreter recursion limit.
    """
    paths: set[str] = set()
    pending: list[tuple[dict[str, Any], str]] = [(record, prefix)]
    while pending:
        mapping, base = pending.pop()
        for key, value in mapping.items():
            path = f"{base}.{key}" if base else key
            paths.add(path)
            if isinstance(value, dict):
                pending.append((value, path))
            elif isinstance(value, list):
                list_path = f"{path}[]"
                paths.add(list_path)
                pending.extend(
                    (item, list_path) for item in value if isinstance(item, dict)
                )
    return paths


def _parse_baseline(text: str) -> tuple[frozenset[str], frozenset[str]]:
    """Parse baseline JSON *text* into ``(known paths, required paths)``.

    Raises:
        json.JSONDecodeError: *text* is not valid JSON.
        ValueError: the root or its ``fields`` member is not a JSON object.
    """
    raw = json.loads(text)
    if not isinstance(raw, dict):
        raise ValueError("schema_baseline.json: root must be an object")
    fields = raw.get("fields", {})
    if not isinstance(fields, dict):
        raise ValueError("schema_baseline.json: 'fields' must be an object")
    known_paths: set[str] = set()
    required_paths: set[str] = set()
    for path, spec in fields.items():
        if not isinstance(path, str):
            continue
        known_paths.add(path)
        if isinstance(spec, dict) and spec.get("required"):
            required_paths.add(path)
    return frozenset(known_paths), frozenset(required_paths)


def _load_baseline() -> tuple[frozenset[str], frozenset[str]]:
    """Load baseline paths once; cache success and remember hard failures.

    The first failure is logged and re-raised.  Later calls raise ``ValueError``
    without touching the file again until ``clear_baseline_cache`` is called.

    Raises:
        OSError, json.JSONDecodeError, ValueError, TypeError: the baseline file
            could not be read or does not have the expected shape.
    """
    global _baseline_cache, _baseline_load_failed
    # Serialise the first load so concurrent parses neither read the file twice
    # nor log the "will not retry" warning more than once.
    with _baseline_lock:
        if _baseline_load_failed:
            raise ValueError("schema_baseline.json previously failed to load")
        if _baseline_cache is not None:
            return _baseline_cache
        try:
            loaded = _parse_baseline(BASELINE_PATH.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError, ValueError, TypeError) as exc:
            _baseline_load_failed = True
            _log.warning("schema drift baseline load failed (will not retry): %s", exc)
            raise
        _baseline_cache = loaded
        return loaded


def load_baseline_fields() -> dict[str, bool]:
    """Return baseline field paths mapped to whether each path is required."""
    known_paths, required_paths = _load_baseline()
    return {path: path in required_paths for path in known_paths}


def diff_against_baseline(observed_paths: set[str]) -> SchemaDriftReport:
    """Compare observed session field paths to the committed baseline (paths only, not types).

    Raises whatever ``_load_baseline`` raises when the baseline is unusable.
    """
    known_paths, required_paths = _load_baseline()
    new_fields = sorted(observed_paths - known_paths)
    missing_fields = sorted(required_paths - observed_paths)
    return {
        "known_fields": sorted(known_paths),
        "new_fields": new_fields,
        "missing_fields": missing_fields,
        "has_drift": bool(new_fields or missing_fields),
    }


def record_parse_drift(observed_paths: set[str]) -> SchemaDriftReport | None:
    """Diff *observed_paths*, log warnings, and merge into the process-wide report.

    New field paths accumulate across calls; missing required paths are
    replaced by the result of this call.  Each new path and each missing
    required path is logged at most once until ``reset_schema_report``.

    Returns:
        The diff for *this* call only (not the merged report), or ``None`` when
        the baseline could not be loaded -- drift detection must never make
        session parsing fail.
    """
    global _last_report
    try:
        report = diff_against_baseline(observed_paths)
    except (OSError, json.JSONDecodeError, ValueError, TypeError):
        return None

    observed_new = set(report["new_fields"])
    missing_fields = list(report["missing_fields"])
    with _lock:
        prior_new = set(_last_report["new_fields"])
        genuinely_new = sorted(observed_new - prior_new)
        merged_new = sorted(prior_new | observed_new)
        newly_missing = sorted(set(missing_fields) - _warned_missing)
        _warned_missing.update(newly_missing)
        _last_report = {
            "known_fields": report["known_fields"],
            "new_fields": merged_new,
            "missing_fields": missing_fields,
            "has_drift": bool(merged_new or missing_fields),
        }

    # Log outside the lock so slow log handlers cannot stall other parses.
    if genuinely_new:
        _log.warning(
            "schema drift: new JSONL field paths not in baseline: %s",
            genuinely_new,
        )
    if newly_missing:
        _log.warning(
            "schema drift: missing required JSONL field paths in sampled records: %s",
            newly_missing,
        )
    return report


def get_schema_report() -> SchemaDriftReport:
    """Return a copy of the accumulated schema drift report from recent parse runs."""
    with _lock:
        return {
            "known_fields": list(_last_report["known_fields"]),
            "new_fields": list(_last_report["new_fields"]),
            "missing_fields": list(_last_report["missing_fields"]),
            "has_drift": _last_report["has_drift"],
        }


def reset_schema_report() -> None:
    """Clear the accumulated report and the logged-once bookkeeping (for tests)."""
    global _last_report
    with _lock:
        _last_report = _empty_report()
        _warned_missing.clear()


def clear_baseline_cache() -> None:
    """Clear the cached baseline and any remembered load failure (for tests)."""
    global _baseline_cache, _baseline_load_failed
    with _baseline_lock:
        _baseline_cache = None
        _baseline_load_failed = False