141 changes: 127 additions & 14 deletions src/copilot_usage/parser.py
@@ -95,9 +95,15 @@ def _insert_session_entry(

@dataclasses.dataclass(frozen=True, slots=True)
class _CachedEvents:
"""Cache entry pairing a file identity with parsed events."""
"""Cache entry pairing a file identity with parsed events.

``end_offset`` is the byte position after the last successfully
parsed event. When the file grows (append-only), only bytes after
``end_offset`` need to be parsed — avoiding a full re-read.
"""

file_id: tuple[int, int] | None
end_offset: int
events: tuple[SessionEvent, ...]
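
_safe_file_identity is used throughout this diff but its body is not shown. From the file_id[1]-as-size read in get_cached_events below and the "mtime/size" comment there, a plausible sketch; the exact tuple contents and error handling are assumptions, not the module's confirmed implementation:

import os
from pathlib import Path


def _safe_file_identity_sketch(path: Path) -> tuple[int, int] | None:
    # Hypothetical stand-in; assumes the identity is (st_mtime_ns, st_size).
    # The size must be the second element: get_cached_events reads file_id[1].
    try:
        st = os.stat(path)
    except OSError:
        return None  # a missing or unreadable file has no identity
    return (st.st_mtime_ns, st.st_size)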


@@ -112,15 +118,16 @@ class _CachedEvents:
def _insert_events_entry(
events_path: Path,
file_id: tuple[int, int] | None,
events: list[SessionEvent],
events: list[SessionEvent] | tuple[SessionEvent, ...],
end_offset: int = 0,
) -> None:
"""Insert parsed events into ``_EVENTS_CACHE`` with LRU eviction.

If *events_path* already exists in the cache (stale file-id), the
old entry is removed first. Otherwise, when the cache is full the
least-recently-used entry (front of the ``OrderedDict``) is evicted.

The *events* list is converted to a ``tuple`` before storage so
The *events* sequence is converted to a ``tuple`` before storage so
that callers cannot accidentally add, remove, or reorder entries
in the cache. This is **container-level** immutability only —
individual ``SessionEvent`` objects remain mutable and must not
@@ -130,7 +137,80 @@ def _insert_events_entry(
del _EVENTS_CACHE[events_path]
elif len(_EVENTS_CACHE) >= _MAX_CACHED_EVENTS:
_EVENTS_CACHE.popitem(last=False) # evict LRU (front)
_EVENTS_CACHE[events_path] = _CachedEvents(file_id=file_id, events=tuple(events))
stored = events if isinstance(events, tuple) else tuple(events)
_EVENTS_CACHE[events_path] = _CachedEvents(
file_id=file_id, end_offset=end_offset, events=stored
)
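
The eviction above relies entirely on OrderedDict ordering: the front entry is the least recently used, and move_to_end marks an entry as most recently used. A standalone toy (plain strings rather than real cache entries) exercising the two operations the cache depends on:

from collections import OrderedDict

cache: OrderedDict[str, str] = OrderedDict()
cache["a"] = "oldest"
cache["b"] = "newer"
cache["c"] = "newest"

cache.move_to_end("a")     # "a" becomes the most recently used entry
cache.popitem(last=False)  # evicts "b", now at the LRU front
print(list(cache))         # ['c', 'a']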


def _parse_events_from_offset(
events_path: Path, offset: int
) -> tuple[list[SessionEvent], int]:
"""Parse events from *events_path* starting at byte *offset*.

Only lines beginning at or after *offset* are JSON-decoded and
Pydantic-validated. Newline-terminated lines that are malformed or fail
validation are skipped with a warning, matching the behaviour of
:func:`parse_events`.

Lines without a trailing newline (possible incomplete write) that
fail JSON decoding are treated as still-in-progress and stop
parsing — the returned *safe_end* does not advance past them so
the caller can retry on the next refresh.

Returns:
``(new_events, safe_end)`` where *safe_end* is the byte
position after the last fully consumed line. Callers should
store this as ``end_offset`` so incomplete trailing lines are
retried on the next refresh.

Raises:
OSError: If the file cannot be opened or read.
"""
new_events: list[SessionEvent] = []
safe_offset = offset
try:
with events_path.open("rb") as fh:
fh.seek(offset)
current_offset = offset
for raw_line in fh:
line_start = current_offset
current_offset += len(raw_line)
stripped = raw_line.strip()
if not stripped:
safe_offset = current_offset
continue
try:
raw = json.loads(stripped)
except json.JSONDecodeError:
if not raw_line.endswith(b"\n"):
# Possibly incomplete write — stop and retry later
break
logger.warning(
"{}:offset {} — malformed JSON, skipping",
events_path,
line_start,
)
safe_offset = current_offset
continue
try:
new_events.append(SessionEvent.model_validate(raw))
except ValidationError as exc:
logger.warning(
"{}:offset {} — validation error ({}), skipping",
events_path,
line_start,
exc.error_count(),
)
safe_offset = current_offset
except UnicodeDecodeError as exc:
logger.warning(
"{} — UTF-8 decode error at offset {}; returning {} new events (partial): {}",
events_path,
safe_offset,
len(new_events),
exc,
)
return new_events, safe_offset
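
To make the safe_end contract concrete, here is a self-contained toy version of the same offset bookkeeping, using plain json dicts instead of Pydantic-validated SessionEvent objects (the file name and payloads are invented for the demo). The incomplete trailing line is left unconsumed on the first pass and picked up once the writer completes it:

import json
import tempfile
from pathlib import Path


def parse_from_offset(path: Path, offset: int) -> tuple[list[dict], int]:
    # Toy re-implementation of the offset bookkeeping (no validation step).
    events: list[dict] = []
    safe = offset
    with path.open("rb") as fh:
        fh.seek(offset)
        pos = offset
        for raw in fh:
            pos += len(raw)
            try:
                events.append(json.loads(raw))
            except json.JSONDecodeError:
                if not raw.endswith(b"\n"):
                    break  # incomplete write: do not advance safe
                # complete but malformed: skip it, still advance safe
            safe = pos
    return events, safe


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp, "events.jsonl")
    path.write_bytes(b'{"n": 1}\n{"n": 2')     # second line cut mid-write
    events, safe = parse_from_offset(path, 0)
    assert [e["n"] for e in events] == [1] and safe == 9

    path.write_bytes(b'{"n": 1}\n{"n": 2}\n')  # writer finished the line
    events, safe = parse_from_offset(path, safe)
    assert [e["n"] for e in events] == [2] and safe == 18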


def get_cached_events(events_path: Path) -> tuple[SessionEvent, ...]:
@@ -142,6 +222,12 @@ def get_cached_events(events_path: Path) -> tuple[SessionEvent, ...]:
entries; the **least-recently used** entry is evicted when the
limit is reached.

When the file has grown since the last read (append-only pattern),
only the newly appended bytes are parsed via
:func:`_parse_events_from_offset` and merged with the cached tuple.
A full reparse is performed when the file has shrunk (truncation or
replacement) or on cold start.

The returned ``tuple`` prevents callers from adding, removing, or
reordering cached entries (container-level immutability). Individual
``SessionEvent`` objects are **not** deep-copied and must not be
@@ -154,11 +240,33 @@ def get_cached_events(events_path: Path) -> tuple[SessionEvent, ...]:
"""
file_id = _safe_file_identity(events_path)
cached = _EVENTS_CACHE.get(events_path)
if cached is not None and cached.file_id == file_id:
_EVENTS_CACHE.move_to_end(events_path)
return cached.events
events = parse_events(events_path)
_insert_events_entry(events_path, file_id, events)

if cached is not None and file_id is not None:
new_size = file_id[1]
if cached.file_id == file_id:
_EVENTS_CACHE.move_to_end(events_path)
return cached.events
# Append-only growth: new size ≥ cached end_offset → incremental
if new_size >= cached.end_offset and cached.end_offset > 0:
new_events, safe_end = _parse_events_from_offset(
events_path, cached.end_offset
)
merged = cached.events + tuple(new_events)
_insert_events_entry(events_path, file_id, merged, safe_end)
return _EVENTS_CACHE[events_path].events

# Full reparse: cold start, truncation, or unknown file.
# Use _parse_events_from_offset(offset=0) instead of parse_events() so
# we get a safe_end byte boundary that reflects only bytes actually
# consumed. A post-parse stat() could observe bytes appended after
# parsing completed, overstating the consumed boundary and causing
# later incremental refreshes to skip unparsed data.
events, safe_end = _parse_events_from_offset(events_path, 0)
# Re-stat so the cached file_id reflects the current mtime/size for
# exact-match lookups; end_offset is derived from safe_end, not size.
post_id = _safe_file_identity(events_path)
stored_id = post_id if post_id is not None else file_id
_insert_events_entry(events_path, stored_id, events, safe_end)
return _EVENTS_CACHE[events_path].events
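
A hypothetical call sequence showing the three paths (the import path and the session file location are assumptions for illustration, not part of this diff):

from pathlib import Path

from copilot_usage.parser import get_cached_events  # import path assumed

events_path = Path("/tmp/sessions/abc/events.jsonl")  # hypothetical file

first = get_cached_events(events_path)   # cold start: full parse from byte 0
cached = get_cached_events(events_path)  # identity match: cached tuple, no parse
# ... a writer appends new JSONL lines to events_path ...
grown = get_cached_events(events_path)   # only the appended bytes are parsed
assert grown[: len(first)] == first      # the cached prefix is reused as-is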


@@ -783,7 +891,9 @@ def get_all_sessions(base_path: Path | None = None) -> list[SessionSummary]:
# Only the newest _MAX_CACHED_EVENTS entries are retained for
# _EVENTS_CACHE to avoid a temporary memory spike when many sessions
# miss the cache.
deferred_events: list[tuple[Path, tuple[int, int] | None, list[SessionEvent]]] = []
deferred_events: list[
tuple[Path, tuple[int, int] | None, list[SessionEvent], int]
] = []
deferred_sessions: list[tuple[Path, _CachedSession]] = []
cache_hit_paths: list[Path] = []
for events_path, file_id, plan_id in discovered:
@@ -819,14 +929,14 @@ def get_all_sessions(base_path: Path | None = None) -> list[SessionSummary]:
summaries.append(summary)
continue
try:
events = parse_events(events_path)
events, safe_end = _parse_events_from_offset(events_path, 0)
except OSError as exc:
logger.warning("Skipping unreadable session {}: {}", events_path, exc)
continue
if not events:
continue
if len(deferred_events) < _MAX_CACHED_EVENTS:
deferred_events.append((events_path, file_id, events))
deferred_events.append((events_path, file_id, events, safe_end))
meta = _build_session_summary_with_meta(
events,
session_dir=events_path.parent,
@@ -861,8 +971,11 @@ def get_all_sessions(base_path: Path | None = None) -> list[SessionSummary]:

# Populate _EVENTS_CACHE in oldest→newest order so that the newest
# sessions sit at the back (MRU) and eviction drops the oldest.
for ep, fid, evts in reversed(deferred_events):
_insert_events_entry(ep, fid, evts)
for ep, fid, evts, safe_end in reversed(deferred_events):
# Use the safe_end byte boundary returned by the parser rather
# than a post-parse stat() size, which can overstate the bytes
# actually consumed if the file grew after parsing completed.
_insert_events_entry(ep, fid, evts, safe_end)
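
A toy illustration of the ordering argument (assuming deferred_events was collected newest-first during discovery, as the comment above implies): inserting oldest to newest leaves the newest entries at the MRU back, so LRU eviction drops the oldest first.

from collections import OrderedDict

deferred = ["newest", "newer", "oldest"]  # discovery order: newest first
cache: OrderedDict[str, None] = OrderedDict()
for name in reversed(deferred):           # populate oldest -> newest
    cache[name] = None

cache.popitem(last=False)                 # LRU eviction drops "oldest"
print(list(cache))                        # ['newer', 'newest']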

# Prune stale cache entries for sessions no longer on disk.
discovered_paths = {p for p, _, _ in discovered}