Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 9 additions & 32 deletions api/export_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from utils.session_stats import compute_stats
from utils.md_exporter import session_to_markdown
from utils.json_exporter import session_to_json
from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules
from utils.exclusion_rules import is_session_excluded

export_bp = Blueprint("export", __name__)

Expand All @@ -40,16 +40,6 @@ def _write_state(sessions_map: dict, count: int):
json.dump(state, f, indent=2)


def _session_text_for_exclusion(session: dict) -> str:
"""Extract a plain-text snippet from session messages for exclusion matching."""
parts = []
for msg in session.get("messages", []):
text = msg.get("text") or ""
if isinstance(text, str) and text.strip():
parts.append(text)
return "\n\n".join(parts)


@export_bp.route("/api/export/state")
def get_export_state():
state = _read_state()
Expand Down Expand Up @@ -98,16 +88,12 @@ def bulk_export():
if session["title"] == "Untitled Session":
continue

if rules:
meta = session["metadata"]
searchable = build_searchable_text(
project_name=project.get("display_name") or project["name"],
session_title=session["title"],
model_names=list(meta.get("models_used") or []),
content_snippet=_session_text_for_exclusion(session),
)
if is_excluded_by_rules(rules, searchable):
continue
if is_session_excluded(
rules,
session,
project.get("display_name") or project["name"],
):
continue

stats = compute_stats(session)
md = session_to_markdown(session, stats)
Expand Down Expand Up @@ -166,17 +152,8 @@ def export_session(project_name, session_id):
fmt = request.args.get("format", "md")
session = parse_session(filepath)
rules = current_app.config.get("EXCLUSION_RULES") or []
if rules:
meta = session["metadata"]
text_parts = [msg.get("text") or "" for msg in session.get("messages", []) if msg.get("text")]
searchable = build_searchable_text(
project_name=project_name,
session_title=session["title"],
model_names=list(meta.get("models_used") or []),
content_snippet="\n\n".join(text_parts),
)
if is_excluded_by_rules(rules, searchable):
return jsonify({"error": "Session not found"}), 404
if is_session_excluded(rules, session, project_name):
return jsonify({"error": "Session not found"}), 404
stats = compute_stats(session)
title_slug = _slugify(session["title"]) or "session"

Expand Down
14 changes: 3 additions & 11 deletions api/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from flask import Blueprint, current_app, jsonify

from utils.session_path import get_claude_projects_dir, list_projects, list_sessions, safe_join
from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules
from utils.exclusion_rules import is_session_excluded

projects_bp = Blueprint("projects", __name__)

Expand Down Expand Up @@ -60,16 +60,8 @@ def get_project_sessions(project_name):
# Skip untitled sessions (no real conversation)
if parsed["title"] == "Untitled Session":
continue
if rules:
text_parts = [msg.get("text") or "" for msg in parsed.get("messages", []) if msg.get("text")]
searchable = build_searchable_text(
project_name=project_name,
session_title=parsed["title"],
model_names=list(meta.get("models_used") or []),
content_snippet="\n\n".join(text_parts),
)
if is_excluded_by_rules(rules, searchable):
continue
if is_session_excluded(rules, parsed, project_name):
continue
result.append({
**s,
"title": parsed["title"],
Expand Down
15 changes: 3 additions & 12 deletions api/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from utils.session_path import get_claude_projects_dir, list_projects, list_sessions
from utils.jsonl_parser import parse_session
from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules
from utils.exclusion_rules import is_session_excluded

search_bp = Blueprint("search", __name__)

Expand All @@ -33,17 +33,8 @@ def search():
except Exception:
continue

if rules:
meta = session["metadata"]
text_parts = [msg.get("text") or "" for msg in session.get("messages", []) if msg.get("text")]
searchable = build_searchable_text(
project_name=project["name"],
session_title=session["title"],
model_names=list(meta.get("models_used") or []),
content_snippet="\n\n".join(text_parts),
)
if is_excluded_by_rules(rules, searchable):
continue
if is_session_excluded(rules, session, project["name"]):
continue

for msg in session["messages"]:
text = msg.get("text", "") or msg.get("content", "")
Expand Down
15 changes: 3 additions & 12 deletions api/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from utils.session_path import get_claude_projects_dir, safe_join
from utils.jsonl_parser import parse_session
from utils.session_stats import compute_stats
from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules
from utils.exclusion_rules import is_session_excluded

sessions_bp = Blueprint("sessions", __name__)

Expand All @@ -27,17 +27,8 @@ def get_session(project_name, session_id):
try:
session = parse_session(filepath)
rules = current_app.config.get("EXCLUSION_RULES") or []
if rules:
meta = session["metadata"]
text_parts = [msg.get("text") or "" for msg in session.get("messages", []) if msg.get("text")]
searchable = build_searchable_text(
project_name=project_name,
session_title=session["title"],
model_names=list(meta.get("models_used") or []),
content_snippet="\n\n".join(text_parts),
)
if is_excluded_by_rules(rules, searchable):
return jsonify({"error": "Session not found"}), 404
if is_session_excluded(rules, session, project_name):
return jsonify({"error": "Session not found"}), 404
return jsonify(session)
except Exception as e:
tb = traceback.format_exc()
Expand Down
31 changes: 8 additions & 23 deletions scripts/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
from utils.exclusion_rules import (
resolve_exclusion_rules_path,
load_rules,
build_searchable_text,
is_excluded_by_rules,
is_session_excluded,
)


Expand Down Expand Up @@ -348,17 +347,13 @@ def cmd_export(args):
skipped += 1
continue

if rules:
meta = session["metadata"]
searchable = build_searchable_text(
project_name=project.get("display_name") or project["name"],
session_title=session["title"],
model_names=list(meta.get("models_used") or []),
content_snippet=_session_text_for_exclusion(session),
)
if is_excluded_by_rules(rules, searchable):
skipped += 1
continue
if is_session_excluded(
rules,
session,
project.get("display_name") or project["name"],
):
skipped += 1
continue

stats = compute_stats(session)
meta = session["metadata"]
Expand Down Expand Up @@ -473,16 +468,6 @@ def _export_single(session: dict, stats: dict, fmt: str, out_dir: str):
# ==================== Helpers ====================


def _session_text_for_exclusion(session: dict) -> str:
"""Extract plain text from all session messages for exclusion rule matching."""
parts = []
for msg in session.get("messages", []):
text = msg.get("text") or ""
if isinstance(text, str) and text.strip():
parts.append(text)
return "\n\n".join(parts)


# ==================== Argument Parser ====================


Expand Down
158 changes: 158 additions & 0 deletions tests/test_exclusion_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
"""
Unit tests for the consolidated exclusion-rule helpers introduced in issue #23:

- ``session_text_for_exclusion`` — moved from a duplicate-defined private helper
in ``scripts/export.py`` and ``api/export_api.py`` into ``utils/exclusion_rules``.
- ``is_session_excluded`` — wraps the previously-inlined "extract text →
build_searchable_text → is_excluded_by_rules" pattern that was repeated
across six call sites.

Both functions are pure and dependency-free, so they're tested directly without
booting Flask or any of the API blueprints.

Run:
pytest tests/test_exclusion_helpers.py -v
"""

from __future__ import annotations

import sys
from pathlib import Path

import pytest

REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT))

from utils.exclusion_rules import (
is_session_excluded,
load_rules,
session_text_for_exclusion,
)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _write_rules(tmp_path, *lines: str) -> str:
"""Write rules file and return its path. Tokenized by load_rules."""
p = tmp_path / "exclusion-rules.txt"
p.write_text("\n".join(lines), encoding="utf-8")
return str(p)


def _session(*, title: str = "session", models: list[str] | None = None,
messages: list[dict] | None = None) -> dict:
return {
"title": title,
"metadata": {"models_used": models or []},
"messages": messages or [],
}


# ---------------------------------------------------------------------------
# session_text_for_exclusion
# ---------------------------------------------------------------------------

class TestSessionTextForExclusion:
    """Checks how ``session_text_for_exclusion`` flattens message text."""

    def test_empty_session(self):
        # A session dict with no keys at all yields no text.
        extracted = session_text_for_exclusion({})
        assert extracted == ""

    def test_session_with_no_messages(self):
        # An explicit but empty message list also yields no text.
        session = {"messages": []}
        assert session_text_for_exclusion(session) == ""

    def test_joins_message_text_with_blank_lines(self):
        messages = [{"text": "alpha"}, {"text": "beta"}]
        session = _session(messages=messages)
        assert session_text_for_exclusion(session) == "alpha\n\nbeta"

    def test_skips_messages_without_text(self):
        # The middle message has no "text" key and must not contribute.
        messages = [{"text": "alpha"}, {"role": "tool"}, {"text": "gamma"}]
        session = _session(messages=messages)
        assert session_text_for_exclusion(session) == "alpha\n\ngamma"

    def test_skips_whitespace_only_text(self):
        # Regression guard for the inconsistency the consolidation removed:
        # the shared helper drops whitespace-only strings, whereas the
        # earlier inline variants kept them. The helper is canonical now.
        messages = [
            {"text": "alpha"},
            {"text": "   "},      # whitespace-only, expected to be dropped
            {"text": "\n\t\n"},   # whitespace-only, expected to be dropped
            {"text": "beta"},
        ]
        session = _session(messages=messages)
        assert session_text_for_exclusion(session) == "alpha\n\nbeta"

    def test_skips_non_string_text(self):
        # Non-string "text" values (an int, None) are ignored.
        messages = [{"text": "alpha"}, {"text": 42}, {"text": None}, {"text": "beta"}]
        session = _session(messages=messages)
        assert session_text_for_exclusion(session) == "alpha\n\nbeta"


# ---------------------------------------------------------------------------
# is_session_excluded
# ---------------------------------------------------------------------------

class TestIsSessionExcluded:
    """Checks ``is_session_excluded`` against rules parsed by ``load_rules``."""

    def test_returns_false_when_rules_empty(self, tmp_path):
        # Neither an empty rule list nor None may exclude a session.
        session = _session(title="anything", messages=[{"text": "anything"}])
        with_empty_list = is_session_excluded([], session, "any project")
        assert with_empty_list is False
        with_none = is_session_excluded(None, session, "any project")  # type: ignore[arg-type]
        assert with_none is False

    def test_matches_on_project_name(self, tmp_path):
        rules_path = _write_rules(tmp_path, "secret-project")
        rules = load_rules(rules_path)
        session = _session()
        assert is_session_excluded(rules, session, "my secret-project work") is True
        assert is_session_excluded(rules, session, "unrelated work") is False

    def test_matches_on_session_title(self, tmp_path):
        rules_path = _write_rules(tmp_path, "confidential")
        rules = load_rules(rules_path)
        matching = _session(title="Confidential debrief")
        assert is_session_excluded(rules, matching, "proj") is True
        non_matching = _session(title="Public roadmap")
        assert is_session_excluded(rules, non_matching, "proj") is False

    def test_matches_on_model_name(self, tmp_path):
        rules_path = _write_rules(tmp_path, "claude-opus-4-7")
        rules = load_rules(rules_path)
        session = _session(models=["claude-opus-4-7", "claude-haiku-4-5"])
        assert is_session_excluded(rules, session, "proj") is True

    def test_matches_on_message_content(self, tmp_path):
        rules_path = _write_rules(tmp_path, "password")
        rules = load_rules(rules_path)
        session = _session(messages=[{"text": "do not commit the password"}])
        assert is_session_excluded(rules, session, "proj") is True

    def test_AND_rule_requires_both_terms(self, tmp_path):
        # AND has higher precedence than OR (per the rule grammar).
        rules_path = _write_rules(tmp_path, "alpha AND beta")
        rules = load_rules(rules_path)
        has_both = _session(messages=[{"text": "alpha and beta together"}])
        has_one = _session(messages=[{"text": "only alpha here"}])
        assert is_session_excluded(rules, has_both, "proj") is True
        assert is_session_excluded(rules, has_one, "proj") is False

    def test_OR_rule_matches_either(self, tmp_path):
        rules_path = _write_rules(tmp_path, "alpha OR beta")
        rules = load_rules(rules_path)
        only_alpha = _session(messages=[{"text": "alpha here"}])
        only_beta = _session(messages=[{"text": "beta here"}])
        neither = _session(messages=[{"text": "gamma here"}])
        assert is_session_excluded(rules, only_alpha, "proj") is True
        assert is_session_excluded(rules, only_beta, "proj") is True
        assert is_session_excluded(rules, neither, "proj") is False

    def test_quoted_phrase_match(self, tmp_path):
        rules_path = _write_rules(tmp_path, '"project alpha"')
        rules = load_rules(rules_path)
        in_order = _session(title="Project alpha kickoff")
        reversed_order = _session(title="alpha project")  # token order matters
        assert is_session_excluded(rules, in_order, "proj") is True
        assert is_session_excluded(rules, reversed_order, "proj") is False

    def test_handles_session_without_metadata(self, tmp_path):
        # Defensive: session dicts produced by older code paths may lack a
        # "metadata" key entirely; the call must not raise.
        rules_path = _write_rules(tmp_path, "anything")
        rules = load_rules(rules_path)
        no_metadata = {"title": "x", "messages": []}  # no metadata key at all
        assert is_session_excluded(rules, no_metadata, "proj") is False

    def test_project_name_None_does_not_break(self, tmp_path):
        rules_path = _write_rules(tmp_path, "confidential")
        rules = load_rules(rules_path)
        session = _session(title="Confidential")
        # With project_name=None, title-based rules should still match.
        assert is_session_excluded(rules, session, None) is True
Loading
Loading