94 changes: 85 additions & 9 deletions src/copilot_usage/parser.py
@@ -95,9 +95,15 @@ def _insert_session_entry(

@dataclasses.dataclass(frozen=True, slots=True)
class _CachedEvents:
"""Cache entry pairing a file identity with parsed events."""
"""Cache entry pairing a file identity with parsed events.

``end_offset`` is the byte position after the last successfully
parsed event. When the file grows (append-only), only bytes after
``end_offset`` need to be parsed — avoiding a full re-read.
Comment thread
microsasa marked this conversation as resolved.
Outdated
"""

file_id: tuple[int, int] | None
end_offset: int
events: tuple[SessionEvent, ...]


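Aside, for readers following the diff: `_safe_file_identity` is not shown here, but the new code relies on index 1 of its result being the byte size (`new_size = file_id[1]` further down). A minimal sketch of such a helper, assuming an `(inode, size)` pair and `None` for files that cannot be stat'ed; this is an illustration, not the module's actual implementation:

```python
from pathlib import Path


def _safe_file_identity(path: Path) -> tuple[int, int] | None:
    """Sketch only: return (inode, size), or None when stat() fails."""
    try:
        st = path.stat()
    except OSError:
        return None
    # Index 1 must be the byte size: the caching code reads file_id[1]
    # as the file's current size when computing end_offset.
    return (st.st_ino, st.st_size)
```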
@@ -112,15 +118,16 @@ class _CachedEvents:
def _insert_events_entry(
    events_path: Path,
    file_id: tuple[int, int] | None,
-    events: list[SessionEvent],
+    events: list[SessionEvent] | tuple[SessionEvent, ...],
+    end_offset: int = 0,
) -> None:
    """Insert parsed events into ``_EVENTS_CACHE`` with LRU eviction.

    If *events_path* already exists in the cache (stale file-id), the
    old entry is removed first. Otherwise, when the cache is full the
    least-recently-used entry (front of the ``OrderedDict``) is evicted.

-    The *events* list is converted to a ``tuple`` before storage so
+    The *events* sequence is converted to a ``tuple`` before storage so
    that callers cannot accidentally add, remove, or reorder entries
    in the cache. This is **container-level** immutability only —
    individual ``SessionEvent`` objects remain mutable and must not
@@ -130,7 +137,57 @@ def _insert_events_entry(
        del _EVENTS_CACHE[events_path]
    elif len(_EVENTS_CACHE) >= _MAX_CACHED_EVENTS:
        _EVENTS_CACHE.popitem(last=False)  # evict LRU (front)
-    _EVENTS_CACHE[events_path] = _CachedEvents(file_id=file_id, events=tuple(events))
+    stored = events if isinstance(events, tuple) else tuple(events)
+    _EVENTS_CACHE[events_path] = _CachedEvents(
+        file_id=file_id, end_offset=end_offset, events=stored
+    )

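The eviction described in the docstring is a standard `OrderedDict` LRU: re-inserting moves an entry to the back, and the front is evicted when the cache is full. A self-contained sketch of that pattern (names are illustrative, not part of this module):

```python
from collections import OrderedDict

_MAX = 2  # tiny capacity so eviction is visible
cache: OrderedDict[str, str] = OrderedDict()


def put(key: str, value: str) -> None:
    if key in cache:
        del cache[key]  # stale entry: delete so re-insert lands at the back (MRU)
    elif len(cache) >= _MAX:
        cache.popitem(last=False)  # evict the least-recently-used entry (front)
    cache[key] = value


put("a", "1")
put("b", "2")
put("c", "3")  # evicts "a"
assert list(cache) == ["b", "c"]
```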

+def _parse_events_from_offset(events_path: Path, offset: int) -> list[SessionEvent]:
+    """Parse events from *events_path* starting at byte *offset*.
+
+    Only lines beginning at or after *offset* are JSON-decoded and
+    Pydantic-validated. Malformed or invalid lines are skipped with a
+    warning, matching the behaviour of :func:`parse_events`.
+
+    Raises:
+        OSError: If the file cannot be opened or read.
+    """
+    new_events: list[SessionEvent] = []
+    try:
+        with events_path.open("rb") as fh:
+            fh.seek(offset)
+            for raw_line in fh:
+                stripped = raw_line.strip()
+                if not stripped:
+                    continue
+                try:
+                    raw = json.loads(stripped)
+                except json.JSONDecodeError:
+                    logger.warning(
+                        "{}:offset {} — malformed JSON, skipping",
+                        events_path,
+                        offset,
+                    )
+                    continue
+                try:
+                    new_events.append(SessionEvent.model_validate(raw))
+                except ValidationError as exc:
+                    logger.warning(
+                        "{}:offset {} — validation error ({}), skipping",
+                        events_path,
+                        offset,
+                        exc.error_count(),
+                    )
+    except UnicodeDecodeError as exc:
+        logger.warning(
+            "{} — UTF-8 decode error at offset {}; returning {} new events (partial): {}",
+            events_path,
+            offset,
+            len(new_events),
+            exc,
+        )
+    return new_events

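One subtlety worth noting: the new `end_offset` stored by the caller comes from `stat()` in `get_cached_events` below, not from the reader itself. If a precise consumed-bytes position were ever preferred (for example, to sidestep a race between `stat()` and the read), a variant could report `fh.tell()`. The sketch below is a hypothetical alternative, not what this PR implements:

```python
import json
from pathlib import Path


def _parse_tail_with_position(events_path: Path, offset: int) -> tuple[list[dict], int]:
    """Hypothetical variant: also return the byte position actually consumed."""
    events: list[dict] = []
    with events_path.open("rb") as fh:
        fh.seek(offset)
        for raw_line in fh:
            try:
                events.append(json.loads(raw_line))
            except json.JSONDecodeError:
                continue  # skip malformed lines, as the real parser does
        return events, fh.tell()  # exact end offset for the next incremental read
```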

def get_cached_events(events_path: Path) -> tuple[SessionEvent, ...]:
@@ -142,6 +199,12 @@ def get_cached_events(events_path: Path) -> tuple[SessionEvent, ...]:
    entries; the **least-recently used** entry is evicted when the
    limit is reached.

+    When the file has grown since the last read (append-only pattern),
+    only the newly appended bytes are parsed via
+    :func:`_parse_events_from_offset` and merged with the cached tuple.
+    A full reparse is performed when the file has shrunk (truncation or
+    replacement) or on cold start.

    The returned ``tuple`` prevents callers from adding, removing, or
    reordering cached entries (container-level immutability). Individual
    ``SessionEvent`` objects are **not** deep-copied and must not be
@@ -154,11 +217,23 @@ def get_cached_events(events_path: Path) -> tuple[SessionEvent, ...]:
"""
file_id = _safe_file_identity(events_path)
cached = _EVENTS_CACHE.get(events_path)
if cached is not None and cached.file_id == file_id:
_EVENTS_CACHE.move_to_end(events_path)
return cached.events

if cached is not None and file_id is not None:
new_size = file_id[1]
if cached.file_id == file_id:
_EVENTS_CACHE.move_to_end(events_path)
return cached.events
# Append-only growth: new size ≥ cached end_offset → incremental
if new_size >= cached.end_offset and cached.end_offset > 0:
Comment thread
microsasa marked this conversation as resolved.
Outdated
new_events = _parse_events_from_offset(events_path, cached.end_offset)
merged = cached.events + tuple(new_events)
Comment thread
microsasa marked this conversation as resolved.
_insert_events_entry(events_path, file_id, merged, new_size)
return _EVENTS_CACHE[events_path].events
Comment thread
microsasa marked this conversation as resolved.
Outdated

# Full reparse: cold start, truncation, or unknown file
events = parse_events(events_path)
_insert_events_entry(events_path, file_id, events)
end_offset = file_id[1] if file_id is not None else 0
_insert_events_entry(events_path, file_id, events, end_offset)
Comment thread
microsasa marked this conversation as resolved.
Outdated
return _EVENTS_CACHE[events_path].events

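Taken together, `get_cached_events` now has three paths: cache hit, incremental merge, and full reparse. A usage sketch (the path and event shape below are illustrative only):

```python
from pathlib import Path

p = Path("session/events.jsonl")  # hypothetical events file

first = get_cached_events(p)    # cold start: full parse via parse_events()
again = get_cached_events(p)    # unchanged file: cache hit, only a stat()

with p.open("a", encoding="utf-8") as fh:
    fh.write('{"type": "user.message"}\n')  # simplified event for the sketch

grown = get_cached_events(p)    # only the appended tail is read and merged
```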

@@ -862,7 +937,8 @@ def get_all_sessions(base_path: Path | None = None) -> list[SessionSummary]:
    # Populate _EVENTS_CACHE in oldest→newest order so that the newest
    # sessions sit at the back (MRU) and eviction drops the oldest.
    for ep, fid, evts in reversed(deferred_events):
-        _insert_events_entry(ep, fid, evts)
+        end_off = fid[1] if fid is not None else 0
+        _insert_events_entry(ep, fid, evts, end_off)

    # Prune stale cache entries for sessions no longer on disk.
    discovered_paths = {p for p, _, _ in discovered}
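Why `reversed()`: eviction pops from the front of the `OrderedDict`, so the oldest sessions must be inserted first. A tiny illustration, assuming `deferred_events` is collected newest-first as the comment implies:

```python
from collections import OrderedDict

cache: OrderedDict[str, int] = OrderedDict()
deferred = ["newest", "older", "oldest"]  # hypothetical newest-first ordering
for name in reversed(deferred):  # insert oldest → newest
    cache[name] = 0
assert list(cache) == ["oldest", "older", "newest"]  # newest at the MRU back
```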
139 changes: 139 additions & 0 deletions tests/copilot_usage/test_parser.py
@@ -5664,6 +5664,145 @@ def test_oserror_propagated_on_missing_file(self, tmp_path: Path) -> None:
            get_cached_events(missing)


# ---------------------------------------------------------------------------
# Issue #732 — incremental append-only parsing in get_cached_events
# ---------------------------------------------------------------------------


class TestIncrementalEventsParsing:
    """Verify that get_cached_events incrementally parses only newly
    appended events instead of re-reading the entire file.
    """

    def test_incremental_parse_only_validates_new_events(self, tmp_path: Path) -> None:
        """Appending 10 events to a 5,000-event file triggers Pydantic
        validation only for the new events, not the full file.

        Patches ``SessionEvent.model_validate`` with a counter to confirm
        the incremental path was taken.
        """
        p = tmp_path / "s1" / "events.jsonl"
        initial_count = 5_000
        _write_large_events_file(p, initial_count)

        # Prime the cache with a cold read
        first = get_cached_events(p)
        assert len(first) == initial_count + 1  # 1 start + 5000 user messages

        # Append 10 new events
        append_count = 10
        with p.open("a", encoding="utf-8") as fh:
            for i in range(initial_count, initial_count + append_count):
                fh.write(_make_user_event(i) + "\n")

        # Patch model_validate to count calls during incremental parse
        original_validate = SessionEvent.model_validate
        validate_calls: list[int] = [0]

        def counting_validate(
            obj: object,
            *args: object,
            **kwargs: object,
        ) -> SessionEvent:
            validate_calls[0] += 1
            return original_validate(obj, *args, **kwargs)  # type: ignore[arg-type]

        with patch.object(
            SessionEvent, "model_validate", side_effect=counting_validate
        ):
            second = get_cached_events(p)

        # Only the 10 new events should have been validated
        assert validate_calls[0] == append_count
        # Total should include all events
        assert len(second) == initial_count + 1 + append_count

    def test_incremental_parse_returns_all_events(self, tmp_path: Path) -> None:
        """After incremental parse, the returned tuple contains every event."""
        p = tmp_path / "s1" / "events.jsonl"
        _write_events(p, _START_EVENT, _USER_MSG)

        first = get_cached_events(p)
        assert len(first) == 2

        with p.open("a", encoding="utf-8") as fh:
            fh.write(_ASSISTANT_MSG + "\n")

        second = get_cached_events(p)
        assert len(second) == 3
        # Original events are preserved
        assert second[0].type == "session.start"
        assert second[1].type == "user.message"
        assert second[2].type == "assistant.message"

    def test_truncated_file_triggers_full_reparse(self, tmp_path: Path) -> None:
        """If the file shrinks, the cache falls back to a full reparse."""
        p = tmp_path / "s1" / "events.jsonl"
        _write_events(p, _START_EVENT, _USER_MSG, _ASSISTANT_MSG)

        first = get_cached_events(p)
        assert len(first) == 3

        # Overwrite with a shorter file (simulates truncation)
        _write_events(p, _START_EVENT)

        with patch("copilot_usage.parser.parse_events", wraps=parse_events) as spy:
            second = get_cached_events(p)
        assert spy.call_count == 1  # full reparse
        assert len(second) == 1

    def test_incremental_does_not_call_parse_events(self, tmp_path: Path) -> None:
        """The incremental path bypasses parse_events entirely."""
        p = tmp_path / "s1" / "events.jsonl"
        _write_large_events_file(p, 100)

        get_cached_events(p)  # prime

        with p.open("a", encoding="utf-8") as fh:
            fh.write(_make_user_event(999) + "\n")

        with patch("copilot_usage.parser.parse_events", wraps=parse_events) as spy:
            result = get_cached_events(p)
        assert spy.call_count == 0
        assert len(result) == 102  # 1 start + 100 + 1 appended

    def test_cache_entry_stores_end_offset(self, tmp_path: Path) -> None:
        """After a call, _EVENTS_CACHE entry has end_offset == file size."""
        p = tmp_path / "s1" / "events.jsonl"
        _write_events(p, _START_EVENT, _USER_MSG)
        expected_size = p.stat().st_size

        get_cached_events(p)

        entry = _EVENTS_CACHE[p]
        assert entry.end_offset == expected_size

    def test_incremental_updates_end_offset(self, tmp_path: Path) -> None:
        """After incremental parse, end_offset reflects the new file size."""
        p = tmp_path / "s1" / "events.jsonl"
        _write_events(p, _START_EVENT, _USER_MSG)
        get_cached_events(p)

        with p.open("a", encoding="utf-8") as fh:
            fh.write(_ASSISTANT_MSG + "\n")
        new_size = p.stat().st_size

        get_cached_events(p)

        entry = _EVENTS_CACHE[p]
        assert entry.end_offset == new_size

    def test_cold_start_full_reparse(self, tmp_path: Path) -> None:
        """A cold cache (no prior entry) always does a full reparse."""
        p = tmp_path / "s1" / "events.jsonl"
        _write_events(p, _START_EVENT, _USER_MSG, _ASSISTANT_MSG)

        with patch("copilot_usage.parser.parse_events", wraps=parse_events) as spy:
            result = get_cached_events(p)
        assert spy.call_count == 1
        assert len(result) == 3


# ---------------------------------------------------------------------------
# Issue #668 — get_all_sessions populates _EVENTS_CACHE
# ---------------------------------------------------------------------------