diff --git a/models/session.py b/models/session.py index 0d4ff5d..cf0e608 100644 --- a/models/session.py +++ b/models/session.py @@ -52,17 +52,29 @@ class MessageDict(TypedDict): parent_tool_use_id: NotRequired[str | None] -class SessionMetadataDict(TypedDict, total=False): +class SessionMetadataDict(TypedDict): + """Metadata accumulated while parsing a Claude Code JSONL session. + + ``parse_session()`` always produces every field below via + ``_finalize_session_metadata()``; defaults are zeros, empty collections, + or ``None`` where noted. Mypy treats the full shape as required so parser + and finalize code cannot drop a field silently. + + The three identity/timing keys are also enforced at the runtime validation + boundary (``validate_session_dict``) with stricter type checks; remaining + keys must be present but are only type-checked lightly there. + """ + session_id: str models_used: list[str] + first_timestamp: str | None + last_timestamp: str | None total_input_tokens: int total_output_tokens: int total_cache_read_tokens: int total_cache_creation_tokens: int total_tool_calls: int tool_call_counts: dict[str, int] - first_timestamp: str | None - last_timestamp: str | None version: str | None cwd: str | None git_branch: str | None @@ -84,6 +96,45 @@ class SessionMetadataDict(TypedDict, total=False): entry_counts: dict[str, int] +# Derived from SessionMetadataDict — single source of truth for parity tests. +SESSION_METADATA_FIELD_NAMES = frozenset(SessionMetadataDict.__annotations__) +SESSION_METADATA_REQUIRED_KEYS = SessionMetadataDict.__required_keys__ + + +class SessionMetadataBuilderDict(TypedDict): + """Mutable metadata accumulator during JSONL parsing; sets are sorted at finalize.""" + + session_id: str + models_used: set[str] + first_timestamp: str | None + last_timestamp: str | None + total_input_tokens: int + total_output_tokens: int + total_cache_read_tokens: int + total_cache_creation_tokens: int + total_tool_calls: int + tool_call_counts: dict[str, int] + version: str | None + cwd: str | None + git_branch: str | None + permission_mode: str | None + compactions: int + total_ephemeral_5m_tokens: int + total_ephemeral_1h_tokens: int + service_tiers: set[str] + session_wall_time_seconds: float | None + compact_boundaries: list[dict[str, Any]] + api_errors: int + files_read: set[str] + files_written: set[str] + files_created: set[str] + bash_commands: list[Any] + web_fetches: list[Any] + sidechain_messages: int + stop_reasons: dict[str, int] + entry_counts: dict[str, int] + + class SessionDict(TypedDict): session_id: str title: str diff --git a/tests/test_exclusion_helpers.py b/tests/test_exclusion_helpers.py index 1792d5b..62f5133 100644 --- a/tests/test_exclusion_helpers.py +++ b/tests/test_exclusion_helpers.py @@ -45,7 +45,11 @@ def _session( ) -> dict: return { "title": title, - "metadata": {"models_used": models or []}, + "metadata": { + "session_id": "stub", + "models_used": models or [], + "first_timestamp": None, + }, "messages": messages or [], } diff --git a/tests/test_jsonl_parser.py b/tests/test_jsonl_parser.py index a53b1e4..c927d85 100644 --- a/tests/test_jsonl_parser.py +++ b/tests/test_jsonl_parser.py @@ -7,6 +7,7 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +from models.session import SESSION_METADATA_FIELD_NAMES from utils.jsonl_helpers import ( extract_images, extract_text, @@ -14,7 +15,12 @@ normalize_content, strip_system_tags, ) -from utils.jsonl_parser import parse_session, quick_session_info +from utils.jsonl_parser import ( + _finalize_session_metadata, + _new_session_metadata_builder, + parse_session, + quick_session_info, +) from utils.tool_dispatch import _parse_tool_result # --------------------------------------------------------------------------- @@ -38,6 +44,18 @@ def _parse_entries(entries: list) -> dict: os.unlink(path) +class TestSessionMetadataFinalize: + def test_builder_keys_match_field_names_constant(self): + raw = _new_session_metadata_builder("parity-test") + assert set(raw.keys()) == SESSION_METADATA_FIELD_NAMES + + def test_finalize_preserves_all_builder_keys(self): + raw = _new_session_metadata_builder("parity-test") + finalized = _finalize_session_metadata(raw) + assert set(finalized.keys()) == SESSION_METADATA_FIELD_NAMES + assert set(finalized.keys()) == set(raw.keys()) + + # --------------------------------------------------------------------------- # _parse_tool_result # --------------------------------------------------------------------------- @@ -720,6 +738,23 @@ def test_file_history_snapshot_timestamp(self): finally: os.unlink(path) + def test_file_history_snapshot_timestamp_falls_back_when_top_level_invalid(self): + path = _write_jsonl( + [ + { + "type": "file-history-snapshot", + "timestamp": 1, + "snapshot": {"timestamp": "2026-01-02T12:00:00Z"}, + }, + ] + ) + try: + s = parse_session(path) + assert s["metadata"]["first_timestamp"] == "2026-01-02T12:00:00Z" + assert s["metadata"]["last_timestamp"] == "2026-01-02T12:00:00Z" + finally: + os.unlink(path) + def test_summary_entry_type_produces_no_message(self, caplog): path = _write_jsonl( [ diff --git a/tests/test_jsonl_validation.py b/tests/test_jsonl_validation.py index f9b7cf7..48df6cc 100644 --- a/tests/test_jsonl_validation.py +++ b/tests/test_jsonl_validation.py @@ -9,18 +9,28 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) from models.errors import SessionValidationError -from utils.jsonl_parser import parse_session +from utils.jsonl_parser import ( + _finalize_session_metadata, + _new_session_metadata_builder, + parse_session, +) from utils.validation import validate_session_dict FIXTURES = os.path.join(os.path.dirname(__file__), "fixtures") +def _full_metadata(**overrides: Any) -> dict[str, Any]: + raw = _new_session_metadata_builder("abc123") + raw.update(overrides) + return dict(_finalize_session_metadata(raw)) + + def _valid_payload(**overrides: Any) -> dict[str, Any]: base: dict[str, Any] = { "session_id": "abc123", "title": "Test Session", "messages": [{"role": "user", "text": "hello"}], - "metadata": {"session_id": "abc123"}, + "metadata": _full_metadata(), } base.update(overrides) return base @@ -60,6 +70,40 @@ def test_metadata_not_dict(self): validate_session_dict(_valid_payload(metadata="not-a-dict")) assert exc_info.value.path == "metadata" + def test_metadata_missing_session_id(self): + metadata = _full_metadata() + del metadata["session_id"] + with pytest.raises(SessionValidationError) as exc_info: + validate_session_dict(_valid_payload(metadata=metadata)) + assert exc_info.value.path == "metadata.session_id" + + def test_metadata_missing_models_used(self): + metadata = _full_metadata() + del metadata["models_used"] + with pytest.raises(SessionValidationError) as exc_info: + validate_session_dict(_valid_payload(metadata=metadata)) + assert exc_info.value.path == "metadata.models_used" + + def test_metadata_missing_first_timestamp(self): + metadata = _full_metadata() + del metadata["first_timestamp"] + with pytest.raises(SessionValidationError) as exc_info: + validate_session_dict(_valid_payload(metadata=metadata)) + assert exc_info.value.path == "metadata.first_timestamp" + + def test_metadata_first_timestamp_null_allowed(self): + result = validate_session_dict( + _valid_payload(metadata=_full_metadata(first_timestamp=None)) + ) + assert result["metadata"]["first_timestamp"] is None + + def test_metadata_models_used_requires_string_elements(self): + metadata = _full_metadata() + metadata["models_used"] = ["claude-sonnet", 42] + with pytest.raises(SessionValidationError) as exc_info: + validate_session_dict(_valid_payload(metadata=metadata)) + assert exc_info.value.path == "metadata.models_used[1]" + def test_message_not_dict(self): with pytest.raises(SessionValidationError) as exc_info: validate_session_dict(_valid_payload(messages=["not-a-dict"])) diff --git a/utils/export_engine.py b/utils/export_engine.py index 201ac91..2f7bf52 100644 --- a/utils/export_engine.py +++ b/utils/export_engine.py @@ -176,7 +176,7 @@ def finalize(self, manifest: list[dict[str, Any]]) -> None: def _resolve_first_timestamp(meta: SessionMetadataDict, sess_info: SessionListItemDict) -> str: """Return first_timestamp from metadata, or synthesise from mtime without mutating *meta*.""" - ts = (meta.get("first_timestamp") or "").strip() + ts = (meta["first_timestamp"] or "").strip() if not ts: ts = datetime.fromtimestamp(sess_info["modified"], tz=timezone.utc).strftime( "%Y-%m-%dT%H:%M:%S" diff --git a/utils/jsonl_parser.py b/utils/jsonl_parser.py index 04dec48..5466bb0 100644 --- a/utils/jsonl_parser.py +++ b/utils/jsonl_parser.py @@ -9,7 +9,14 @@ from typing import Any, cast, get_args from models.record_data import RecordDataUnion -from models.session import MessageDict, RoleLiteral, SessionDict, ToolUseDict +from models.session import ( + MessageDict, + RoleLiteral, + SessionDict, + SessionMetadataBuilderDict, + SessionMetadataDict, + ToolUseDict, +) from models.tool_results import ToolResultUnion, is_tool_result_dict from utils.jsonl_helpers import ( entry_message as _entry_message, @@ -70,13 +77,9 @@ def _safe_int(val: Any) -> int: return 0 -def parse_session(filepath: str) -> SessionDict: - """Main entry point. Reads every line from a .jsonl file and builds up - a session dict with messages, metadata (tokens, models, tool counts), - and file/command activity.""" - session_id = os.path.basename(filepath).replace(".jsonl", "") - messages: list[MessageDict] = [] - metadata: dict[str, Any] = { +def _new_session_metadata_builder(session_id: str) -> SessionMetadataBuilderDict: + """Fresh mutable metadata accumulator for ``parse_session()``.""" + return { "session_id": session_id, "models_used": set(), "total_input_tokens": 0, @@ -92,30 +95,80 @@ def parse_session(filepath: str) -> SessionDict: "git_branch": None, "permission_mode": None, "compactions": 0, - # Extended token accounting "total_ephemeral_5m_tokens": 0, "total_ephemeral_1h_tokens": 0, "service_tiers": set(), - # Timing "session_wall_time_seconds": None, - # Compaction details "compact_boundaries": [], - # Error tracking "api_errors": 0, - # File activity (from tool_use inputs) "files_read": set(), "files_written": set(), "files_created": set(), "bash_commands": [], "web_fetches": [], - # Sidechain tracking "sidechain_messages": 0, - # Stop reasons "stop_reasons": {}, - # Entry type counts "entry_counts": {}, } + +def _finalize_session_metadata(raw: SessionMetadataBuilderDict) -> SessionMetadataDict: + """Convert the mutable parse-time metadata builder into a SessionMetadataDict.""" + return { + "session_id": raw["session_id"], + "models_used": sorted(raw["models_used"]), + "first_timestamp": raw["first_timestamp"], + "last_timestamp": raw["last_timestamp"], + "total_input_tokens": raw["total_input_tokens"], + "total_output_tokens": raw["total_output_tokens"], + "total_cache_read_tokens": raw["total_cache_read_tokens"], + "total_cache_creation_tokens": raw["total_cache_creation_tokens"], + "total_tool_calls": raw["total_tool_calls"], + "tool_call_counts": raw["tool_call_counts"], + "version": raw["version"], + "cwd": raw["cwd"], + "git_branch": raw["git_branch"], + "permission_mode": raw["permission_mode"], + "compactions": raw["compactions"], + "total_ephemeral_5m_tokens": raw["total_ephemeral_5m_tokens"], + "total_ephemeral_1h_tokens": raw["total_ephemeral_1h_tokens"], + "service_tiers": sorted(raw["service_tiers"]), + "session_wall_time_seconds": raw["session_wall_time_seconds"], + "compact_boundaries": raw["compact_boundaries"], + "api_errors": raw["api_errors"], + "files_read": sorted(raw["files_read"]), + "files_written": sorted(raw["files_written"]), + "files_created": sorted(raw["files_created"]), + "bash_commands": raw["bash_commands"], + "web_fetches": raw["web_fetches"], + "sidechain_messages": raw["sidechain_messages"], + "stop_reasons": raw["stop_reasons"], + "entry_counts": raw["entry_counts"], + } + + +def _entry_timestamp(entry: dict[str, Any]) -> str | None: + """Return a usable ISO timestamp string from an entry, or None.""" + ts = entry.get("timestamp") + if isinstance(ts, str) and ts: + return ts + if entry.get("type") == "file-history-snapshot": + snap = entry.get("snapshot") + if isinstance(snap, dict): + snap_ts = snap.get("timestamp") + if isinstance(snap_ts, str) and snap_ts: + return snap_ts + return None + + +def parse_session(filepath: str) -> SessionDict: + """Main entry point. Reads every line from a .jsonl file and builds up + a session dict with messages, metadata (tokens, models, tool counts), + and file/command activity.""" + session_id = os.path.basename(filepath).replace(".jsonl", "") + messages: list[MessageDict] = [] + metadata = _new_session_metadata_builder(session_id) + with open(filepath, "r", encoding="utf-8", errors="replace") as f: for line in f: line = line.strip() @@ -130,12 +183,7 @@ def parse_session(filepath: str) -> SessionDict: continue entry_type = entry.get("type") - ts = entry.get("timestamp") - # file-history-snapshot stores timestamp inside snapshot - if not ts and entry_type == "file-history-snapshot": - snap = entry.get("snapshot") - if isinstance(snap, dict): - ts = snap.get("timestamp") + ts = _entry_timestamp(entry) if ts: if metadata["first_timestamp"] is None: @@ -165,12 +213,6 @@ def parse_session(filepath: str) -> SessionDict: if type_str not in _SKIP_ENTRY_TYPES: messages.append(_fallback_message(entry, _coerce_role(type_str))) - metadata["models_used"] = sorted(metadata["models_used"]) - metadata["service_tiers"] = sorted(metadata["service_tiers"]) - metadata["files_read"] = sorted(metadata["files_read"]) - metadata["files_written"] = sorted(metadata["files_written"]) - metadata["files_created"] = sorted(metadata["files_created"]) - # Compute wall clock time first_ts = metadata["first_timestamp"] last_ts = metadata["last_timestamp"] @@ -189,13 +231,13 @@ def parse_session(filepath: str) -> SessionDict: "session_id": session_id, "title": title, "messages": messages, - "metadata": metadata, + "metadata": _finalize_session_metadata(metadata), } ) def _process_user( - entry: dict[str, Any], messages: list[MessageDict], metadata: dict[str, Any] + entry: dict[str, Any], messages: list[MessageDict], metadata: SessionMetadataBuilderDict ) -> None: """Pull out text, tool results, and session-level metadata (cwd, version, etc.) from a user entry.""" @@ -242,7 +284,7 @@ def _process_user( def _process_assistant( - entry: dict[str, Any], messages: list[MessageDict], metadata: dict[str, Any] + entry: dict[str, Any], messages: list[MessageDict], metadata: SessionMetadataBuilderDict ) -> None: """Handle assistant responses -- splits content into text, thinking blocks, and tool_use calls, and accumulates token/model/tool stats.""" @@ -338,7 +380,7 @@ def _process_assistant( def _process_system( - entry: dict[str, Any], messages: list[MessageDict], metadata: dict[str, Any] + entry: dict[str, Any], messages: list[MessageDict], metadata: SessionMetadataBuilderDict ) -> None: """Handle system entries (mostly compact_boundary markers from context compaction).""" diff --git a/utils/tool_dispatch.py b/utils/tool_dispatch.py index 7f81a6a..588e561 100644 --- a/utils/tool_dispatch.py +++ b/utils/tool_dispatch.py @@ -35,6 +35,7 @@ from collections.abc import Callable from typing import Any, cast +from models.session import SessionMetadataBuilderDict from models.tool_results import ( ToolResultDict, ToolResultUnion, @@ -240,40 +241,42 @@ def _tool_result_build_user_input(tr: ToolResultDict, base: dict[str, object]) - # ``_FILE_ACTIVITY_HANDLERS`` is the single registry; ``KNOWN_TOOL_TYPES`` is derived. -def _file_activity_read(tool_input: dict[str, Any], metadata: dict[str, Any]) -> None: +def _file_activity_read(tool_input: dict[str, Any], metadata: SessionMetadataBuilderDict) -> None: raw_fp = tool_input.get("file_path", "") fp = raw_fp if isinstance(raw_fp, str) else "" if fp: metadata["files_read"].add(fp) -def _file_activity_write(tool_input: dict[str, Any], metadata: dict[str, Any]) -> None: +def _file_activity_write(tool_input: dict[str, Any], metadata: SessionMetadataBuilderDict) -> None: raw_fp = tool_input.get("file_path", "") fp = raw_fp if isinstance(raw_fp, str) else "" if fp: metadata["files_created"].add(fp) -def _file_activity_edit(tool_input: dict[str, Any], metadata: dict[str, Any]) -> None: +def _file_activity_edit(tool_input: dict[str, Any], metadata: SessionMetadataBuilderDict) -> None: raw_fp = tool_input.get("file_path", "") fp = raw_fp if isinstance(raw_fp, str) else "" if fp: metadata["files_written"].add(fp) -def _file_activity_bash(tool_input: dict[str, Any], metadata: dict[str, Any]) -> None: +def _file_activity_bash(tool_input: dict[str, Any], metadata: SessionMetadataBuilderDict) -> None: cmd = tool_input.get("command", "") if isinstance(cmd, str) and cmd: metadata["bash_commands"].append(cmd) -def _file_activity_web(tool_input: dict[str, Any], metadata: dict[str, Any]) -> None: +def _file_activity_web(tool_input: dict[str, Any], metadata: SessionMetadataBuilderDict) -> None: url_or_query = tool_input.get("url") or tool_input.get("query", "") if isinstance(url_or_query, str) and url_or_query: metadata["web_fetches"].append(url_or_query) -_FILE_ACTIVITY_HANDLERS: dict[str, Callable[[dict[str, Any], dict[str, Any]], None] | None] = { +_FILE_ACTIVITY_HANDLERS: dict[ + str, Callable[[dict[str, Any], SessionMetadataBuilderDict], None] | None +] = { "AskUserQuestion": None, "Bash": _file_activity_bash, "Edit": _file_activity_edit, @@ -290,7 +293,7 @@ def _file_activity_web(tool_input: dict[str, Any], metadata: dict[str, Any]) -> def track_tool_file_activity( - tool_name: str, tool_input: dict[str, Any], metadata: dict[str, Any] + tool_name: str, tool_input: dict[str, Any], metadata: SessionMetadataBuilderDict ) -> None: """Record file/bash/web side effects for tools listed in ``KNOWN_TOOL_TYPES``.""" if tool_name not in KNOWN_TOOL_TYPES: diff --git a/utils/validation.py b/utils/validation.py index 9a7e73b..2ce607b 100644 --- a/utils/validation.py +++ b/utils/validation.py @@ -3,7 +3,7 @@ from typing import Any, cast, get_args from models.errors import SessionValidationError -from models.session import RoleLiteral, SessionDict +from models.session import SESSION_METADATA_REQUIRED_KEYS, RoleLiteral, SessionDict _VALID_ROLES = frozenset(get_args(RoleLiteral)) @@ -37,6 +37,37 @@ def _require_value( return val +def _require_optional_str(path: str, val: Any) -> str | None: + if val is None: + return None + if not isinstance(val, str): + raise SessionValidationError(path, f"expected str or null, got {type(val).__name__}") + return val + + +def _require_str_list(path: str, val: Any) -> list[str]: + if not isinstance(val, list): + raise SessionValidationError(path, f"expected list, got {type(val).__name__}") + for index, item in enumerate(val): + if not isinstance(item, str): + raise SessionValidationError( + f"{path}[{index}]", + f"expected str, got {type(item).__name__}", + ) + return val + + +def _validate_session_metadata(metadata: dict[str, Any]) -> None: + """Enforce SessionMetadataDict keys at the runtime boundary.""" + for key in SESSION_METADATA_REQUIRED_KEYS: + if key not in metadata: + raise SessionValidationError(f"metadata.{key}", "missing required field") + + _require_value("metadata.session_id", metadata["session_id"], str, "str") + _require_str_list("metadata.models_used", metadata["models_used"]) + _require_optional_str("metadata.first_timestamp", metadata["first_timestamp"]) + + def validate_session_dict(data: dict[str, Any]) -> SessionDict: """Validate a plain dict matches SessionDict before returning it.""" # Runtime guard for dynamic callers; mypy already types the parameter as dict. @@ -46,7 +77,8 @@ def validate_session_dict(data: dict[str, Any]) -> SessionDict: _require_field(data, "session_id", str, "str") _require_field(data, "title", str, "str") messages = _require_field(data, "messages", list, "list") - _require_field(data, "metadata", dict, "dict") + metadata = _require_field(data, "metadata", dict, "dict") + _validate_session_metadata(metadata) for index, message in enumerate(messages): path = f"messages[{index}]"