From cbf8e612597f6f1872ca362f5a18a2787e8c4903 Mon Sep 17 00:00:00 2001 From: timon0305 Date: Wed, 6 May 2026 16:15:56 +0200 Subject: [PATCH] refactor(exclusion): consolidate _session_text_for_exclusion + 6 repeated call patterns (closes #23) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three layers of duplication around exclusion-rule filtering: 1. `_session_text_for_exclusion(session)` was defined twice — byte-identical bodies (only docstring differs) in scripts/export.py:476 and api/export_api.py:43. Two maintenance points for one function. 2. Six call sites repeated the same `if rules: build_searchable_text(...) → is_excluded_by_rules(...) → skip` pattern across api/sessions.py, api/projects.py, api/search.py, api/export_api.py (×2), and scripts/export.py. ~60 lines of near-identical wrapping. 3. Latent inconsistency: the helper rejected whitespace-only message text (`isinstance(text, str) and text.strip()`); the 4 inline call sites did not (`if msg.get("text")`). Bug surface small but real, and there was no reason for the difference. Consolidated into utils/exclusion_rules.py: - session_text_for_exclusion(session) — moved from the duplicate-defined private helpers; now the single source of truth. - is_session_excluded(rules, session, project_name) — wraps the full pattern. Returns False when rules is empty/falsy, so callers don't have to gate on `if rules:` before calling. Six call sites collapsed to one line each. After: one definition of the extraction logic, one definition of the wrapping pattern, the whitespace-handling inconsistency disappears (everyone uses the helper), and ~20 net lines removed across 6 files. Tests: tests/test_exclusion_helpers.py adds 16 unit cases for the new helpers — text-extraction edge cases (empty, whitespace-only, non-string, mixed) and rule-matching across all four haystack fields (project_name, session_title, model_names, content) plus AND/OR/quoted-phrase grammar plus defensive cases (no metadata key, project_name=None). 91/91 pytest pass (was 75; +16 new). --- api/export_api.py | 41 ++------- api/projects.py | 14 +-- api/search.py | 15 +-- api/sessions.py | 15 +-- scripts/export.py | 31 ++----- tests/test_exclusion_helpers.py | 158 ++++++++++++++++++++++++++++++++ utils/exclusion_rules.py | 44 +++++++++ 7 files changed, 228 insertions(+), 90 deletions(-) create mode 100644 tests/test_exclusion_helpers.py diff --git a/api/export_api.py b/api/export_api.py index 4845a2a..700e348 100644 --- a/api/export_api.py +++ b/api/export_api.py @@ -13,7 +13,7 @@ from utils.session_stats import compute_stats from utils.md_exporter import session_to_markdown from utils.json_exporter import session_to_json -from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules +from utils.exclusion_rules import is_session_excluded export_bp = Blueprint("export", __name__) @@ -40,16 +40,6 @@ def _write_state(sessions_map: dict, count: int): json.dump(state, f, indent=2) -def _session_text_for_exclusion(session: dict) -> str: - """Extract a plain-text snippet from session messages for exclusion matching.""" - parts = [] - for msg in session.get("messages", []): - text = msg.get("text") or "" - if isinstance(text, str) and text.strip(): - parts.append(text) - return "\n\n".join(parts) - - @export_bp.route("/api/export/state") def get_export_state(): state = _read_state() @@ -98,16 +88,12 @@ def bulk_export(): if session["title"] == "Untitled Session": continue - if rules: - meta = session["metadata"] - searchable = build_searchable_text( - project_name=project.get("display_name") or project["name"], - session_title=session["title"], - model_names=list(meta.get("models_used") or []), - content_snippet=_session_text_for_exclusion(session), - ) - if is_excluded_by_rules(rules, searchable): - continue + if is_session_excluded( + rules, + session, + project.get("display_name") or project["name"], + ): + continue stats = compute_stats(session) md = session_to_markdown(session, stats) @@ -166,17 +152,8 @@ def export_session(project_name, session_id): fmt = request.args.get("format", "md") session = parse_session(filepath) rules = current_app.config.get("EXCLUSION_RULES") or [] - if rules: - meta = session["metadata"] - text_parts = [msg.get("text") or "" for msg in session.get("messages", []) if msg.get("text")] - searchable = build_searchable_text( - project_name=project_name, - session_title=session["title"], - model_names=list(meta.get("models_used") or []), - content_snippet="\n\n".join(text_parts), - ) - if is_excluded_by_rules(rules, searchable): - return jsonify({"error": "Session not found"}), 404 + if is_session_excluded(rules, session, project_name): + return jsonify({"error": "Session not found"}), 404 stats = compute_stats(session) title_slug = _slugify(session["title"]) or "session" diff --git a/api/projects.py b/api/projects.py index fd607fe..32bd085 100644 --- a/api/projects.py +++ b/api/projects.py @@ -5,7 +5,7 @@ from flask import Blueprint, current_app, jsonify from utils.session_path import get_claude_projects_dir, list_projects, list_sessions, safe_join -from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules +from utils.exclusion_rules import is_session_excluded projects_bp = Blueprint("projects", __name__) @@ -60,16 +60,8 @@ def get_project_sessions(project_name): # Skip untitled sessions (no real conversation) if parsed["title"] == "Untitled Session": continue - if rules: - text_parts = [msg.get("text") or "" for msg in parsed.get("messages", []) if msg.get("text")] - searchable = build_searchable_text( - project_name=project_name, - session_title=parsed["title"], - model_names=list(meta.get("models_used") or []), - content_snippet="\n\n".join(text_parts), - ) - if is_excluded_by_rules(rules, searchable): - continue + if is_session_excluded(rules, parsed, project_name): + continue result.append({ **s, "title": parsed["title"], diff --git a/api/search.py b/api/search.py index 373e3d4..c323123 100644 --- a/api/search.py +++ b/api/search.py @@ -6,7 +6,7 @@ from utils.session_path import get_claude_projects_dir, list_projects, list_sessions from utils.jsonl_parser import parse_session -from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules +from utils.exclusion_rules import is_session_excluded search_bp = Blueprint("search", __name__) @@ -33,17 +33,8 @@ def search(): except Exception: continue - if rules: - meta = session["metadata"] - text_parts = [msg.get("text") or "" for msg in session.get("messages", []) if msg.get("text")] - searchable = build_searchable_text( - project_name=project["name"], - session_title=session["title"], - model_names=list(meta.get("models_used") or []), - content_snippet="\n\n".join(text_parts), - ) - if is_excluded_by_rules(rules, searchable): - continue + if is_session_excluded(rules, session, project["name"]): + continue for msg in session["messages"]: text = msg.get("text", "") or msg.get("content", "") diff --git a/api/sessions.py b/api/sessions.py index 93ce9a8..2816a48 100644 --- a/api/sessions.py +++ b/api/sessions.py @@ -8,7 +8,7 @@ from utils.session_path import get_claude_projects_dir, safe_join from utils.jsonl_parser import parse_session from utils.session_stats import compute_stats -from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules +from utils.exclusion_rules import is_session_excluded sessions_bp = Blueprint("sessions", __name__) @@ -27,17 +27,8 @@ def get_session(project_name, session_id): try: session = parse_session(filepath) rules = current_app.config.get("EXCLUSION_RULES") or [] - if rules: - meta = session["metadata"] - text_parts = [msg.get("text") or "" for msg in session.get("messages", []) if msg.get("text")] - searchable = build_searchable_text( - project_name=project_name, - session_title=session["title"], - model_names=list(meta.get("models_used") or []), - content_snippet="\n\n".join(text_parts), - ) - if is_excluded_by_rules(rules, searchable): - return jsonify({"error": "Session not found"}), 404 + if is_session_excluded(rules, session, project_name): + return jsonify({"error": "Session not found"}), 404 return jsonify(session) except Exception as e: tb = traceback.format_exc() diff --git a/scripts/export.py b/scripts/export.py index a8106d7..e8f93cb 100644 --- a/scripts/export.py +++ b/scripts/export.py @@ -31,8 +31,7 @@ from utils.exclusion_rules import ( resolve_exclusion_rules_path, load_rules, - build_searchable_text, - is_excluded_by_rules, + is_session_excluded, ) @@ -348,17 +347,13 @@ def cmd_export(args): skipped += 1 continue - if rules: - meta = session["metadata"] - searchable = build_searchable_text( - project_name=project.get("display_name") or project["name"], - session_title=session["title"], - model_names=list(meta.get("models_used") or []), - content_snippet=_session_text_for_exclusion(session), - ) - if is_excluded_by_rules(rules, searchable): - skipped += 1 - continue + if is_session_excluded( + rules, + session, + project.get("display_name") or project["name"], + ): + skipped += 1 + continue stats = compute_stats(session) meta = session["metadata"] @@ -473,16 +468,6 @@ def _export_single(session: dict, stats: dict, fmt: str, out_dir: str): # ==================== Helpers ==================== -def _session_text_for_exclusion(session: dict) -> str: - """Extract plain text from all session messages for exclusion rule matching.""" - parts = [] - for msg in session.get("messages", []): - text = msg.get("text") or "" - if isinstance(text, str) and text.strip(): - parts.append(text) - return "\n\n".join(parts) - - # ==================== Argument Parser ==================== diff --git a/tests/test_exclusion_helpers.py b/tests/test_exclusion_helpers.py new file mode 100644 index 0000000..0603452 --- /dev/null +++ b/tests/test_exclusion_helpers.py @@ -0,0 +1,158 @@ +""" +Unit tests for the consolidated exclusion-rule helpers introduced in issue #23: + +- ``session_text_for_exclusion`` — moved from a duplicate-defined private helper + in ``scripts/export.py`` and ``api/export_api.py`` into ``utils/exclusion_rules``. +- ``is_session_excluded`` — wraps the previously-inlined "extract text → + build_searchable_text → is_excluded_by_rules" pattern that was repeated + across six call sites. + +Both functions are pure and dependency-free, so they're tested directly without +booting Flask or any of the API blueprints. + +Run: + pytest tests/test_exclusion_helpers.py -v +""" + +from __future__ import annotations + +import sys +from pathlib import Path + +import pytest + +REPO_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(REPO_ROOT)) + +from utils.exclusion_rules import ( + is_session_excluded, + load_rules, + session_text_for_exclusion, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _write_rules(tmp_path, *lines: str) -> str: + """Write rules file and return its path. Tokenized by load_rules.""" + p = tmp_path / "exclusion-rules.txt" + p.write_text("\n".join(lines), encoding="utf-8") + return str(p) + + +def _session(*, title: str = "session", models: list[str] | None = None, + messages: list[dict] | None = None) -> dict: + return { + "title": title, + "metadata": {"models_used": models or []}, + "messages": messages or [], + } + + +# --------------------------------------------------------------------------- +# session_text_for_exclusion +# --------------------------------------------------------------------------- + +class TestSessionTextForExclusion: + + def test_empty_session(self): + assert session_text_for_exclusion({}) == "" + + def test_session_with_no_messages(self): + assert session_text_for_exclusion({"messages": []}) == "" + + def test_joins_message_text_with_blank_lines(self): + s = _session(messages=[{"text": "alpha"}, {"text": "beta"}]) + assert session_text_for_exclusion(s) == "alpha\n\nbeta" + + def test_skips_messages_without_text(self): + s = _session(messages=[{"text": "alpha"}, {"role": "tool"}, {"text": "gamma"}]) + assert session_text_for_exclusion(s) == "alpha\n\ngamma" + + def test_skips_whitespace_only_text(self): + # Regression: this is the inconsistency the consolidation fixed — + # the helper rejects whitespace-only strings, the previous inline + # variants didn't. The helper version is now canonical. + s = _session(messages=[ + {"text": "alpha"}, + {"text": " "}, # whitespace-only — should be skipped + {"text": "\n\t\n"}, # whitespace-only — should be skipped + {"text": "beta"}, + ]) + assert session_text_for_exclusion(s) == "alpha\n\nbeta" + + def test_skips_non_string_text(self): + s = _session(messages=[{"text": "alpha"}, {"text": 42}, {"text": None}, {"text": "beta"}]) + assert session_text_for_exclusion(s) == "alpha\n\nbeta" + + +# --------------------------------------------------------------------------- +# is_session_excluded +# --------------------------------------------------------------------------- + +class TestIsSessionExcluded: + + def test_returns_false_when_rules_empty(self, tmp_path): + s = _session(title="anything", messages=[{"text": "anything"}]) + assert is_session_excluded([], s, "any project") is False + assert is_session_excluded(None, s, "any project") is False # type: ignore[arg-type] + + def test_matches_on_project_name(self, tmp_path): + rules = load_rules(_write_rules(tmp_path, "secret-project")) + s = _session() + assert is_session_excluded(rules, s, "my secret-project work") is True + assert is_session_excluded(rules, s, "unrelated work") is False + + def test_matches_on_session_title(self, tmp_path): + rules = load_rules(_write_rules(tmp_path, "confidential")) + assert is_session_excluded(rules, _session(title="Confidential debrief"), "proj") is True + assert is_session_excluded(rules, _session(title="Public roadmap"), "proj") is False + + def test_matches_on_model_name(self, tmp_path): + rules = load_rules(_write_rules(tmp_path, "claude-opus-4-7")) + s = _session(models=["claude-opus-4-7", "claude-haiku-4-5"]) + assert is_session_excluded(rules, s, "proj") is True + + def test_matches_on_message_content(self, tmp_path): + rules = load_rules(_write_rules(tmp_path, "password")) + s = _session(messages=[{"text": "do not commit the password"}]) + assert is_session_excluded(rules, s, "proj") is True + + def test_AND_rule_requires_both_terms(self, tmp_path): + # AND has higher precedence than OR (per the rule grammar). + rules = load_rules(_write_rules(tmp_path, "alpha AND beta")) + s_both = _session(messages=[{"text": "alpha and beta together"}]) + s_one = _session(messages=[{"text": "only alpha here"}]) + assert is_session_excluded(rules, s_both, "proj") is True + assert is_session_excluded(rules, s_one, "proj") is False + + def test_OR_rule_matches_either(self, tmp_path): + rules = load_rules(_write_rules(tmp_path, "alpha OR beta")) + s_alpha = _session(messages=[{"text": "alpha here"}]) + s_beta = _session(messages=[{"text": "beta here"}]) + s_neither = _session(messages=[{"text": "gamma here"}]) + assert is_session_excluded(rules, s_alpha, "proj") is True + assert is_session_excluded(rules, s_beta, "proj") is True + assert is_session_excluded(rules, s_neither, "proj") is False + + def test_quoted_phrase_match(self, tmp_path): + rules = load_rules(_write_rules(tmp_path, '"project alpha"')) + s_match = _session(title="Project alpha kickoff") + s_partial = _session(title="alpha project") # token order matters + assert is_session_excluded(rules, s_match, "proj") is True + assert is_session_excluded(rules, s_partial, "proj") is False + + def test_handles_session_without_metadata(self, tmp_path): + # Defensive: session dicts coming from older code paths might be + # missing a metadata key. Should not raise. + rules = load_rules(_write_rules(tmp_path, "anything")) + bare = {"title": "x", "messages": []} # no metadata key at all + assert is_session_excluded(rules, bare, "proj") is False + + def test_project_name_None_does_not_break(self, tmp_path): + rules = load_rules(_write_rules(tmp_path, "confidential")) + s = _session(title="Confidential") + # project_name=None should still let title-based rules match. + assert is_session_excluded(rules, s, None) is True diff --git a/utils/exclusion_rules.py b/utils/exclusion_rules.py index 822e56a..9f27bb9 100644 --- a/utils/exclusion_rules.py +++ b/utils/exclusion_rules.py @@ -209,3 +209,47 @@ def build_searchable_text( if content_snippet: parts.append(content_snippet) return "\n".join(p for p in parts if p) + + +def session_text_for_exclusion(session: dict) -> str: + """Extract a plain-text snippet from session messages for exclusion matching. + + Joins all non-empty, non-whitespace message ``text`` fields with blank + lines. Whitespace-only strings are skipped — they carry no signal for + rule matching and only inflate the haystack. (Previously this lived as a + duplicate ``_session_text_for_exclusion`` in two callers; consolidated + here as the single source of truth — issue #23.) + """ + parts = [] + for msg in session.get("messages", []): + text = msg.get("text") or "" + if isinstance(text, str) and text.strip(): + parts.append(text) + return "\n\n".join(parts) + + +def is_session_excluded( + rules: list[list], + session: dict, + project_name: str | None, +) -> bool: + """High-level helper: evaluate exclusion rules against a parsed session. + + Wraps the full pattern that was previously inlined at six call sites: + extract message text via :func:`session_text_for_exclusion`, build the + haystack via :func:`build_searchable_text`, then evaluate via + :func:`is_excluded_by_rules`. + + Returns ``False`` when ``rules`` is empty/falsy — callers can call this + unconditionally without first checking whether rules exist. + """ + if not rules: + return False + meta = session.get("metadata", {}) or {} + searchable = build_searchable_text( + project_name=project_name, + session_title=session.get("title"), + model_names=list(meta.get("models_used") or []), + content_snippet=session_text_for_exclusion(session), + ) + return is_excluded_by_rules(rules, searchable)