diff --git a/.coveragerc.reporting b/.coveragerc.reporting index d1965f1..4d52fbb 100644 --- a/.coveragerc.reporting +++ b/.coveragerc.reporting @@ -2,6 +2,7 @@ source = website_profiling.reporting omit = */website_profiling/reporting/builder.py + */website_profiling/reporting/builder_sections/* [report] show_missing = True diff --git a/README.md b/README.md index 35a0e51..cd9c58b 100644 --- a/README.md +++ b/README.md @@ -37,18 +37,17 @@ # Site Audit -**Open-source SEO crawl and technical audit platform** — self-hosted UI built with **Next.js, Python, and PostgreSQL**. - -Repository: [codefrydev/WebsiteProfiling](https://github.com/codefrydev/WebsiteProfiling) +**Open-source SEO crawl and technical audit platform** — built with **Next.js, Python, and PostgreSQL**. ## Overview -Site Audit is a self-hosted alternative to commercial SEO audit tools. It runs on your infrastructure, stores data in your PostgreSQL database, and produces transparent technical reports without subscription tiers or gated exports. +Site Audit is a self-hosted alternative to commercial SEO suites. It runs on your own infrastructure, stores data in your PostgreSQL database, and produces transparent technical reports — no subscription tiers, no gated exports. **Use cases** - Technical SEO audits for owned or client properties - Crawl analysis with static and JavaScript rendering +- Content writing and optimization with live SEO scoring - Search Console, GA4, and Bing Webmaster integration - Agency portfolio management and run comparison - Optional AI-assisted analysis over audit data via MCP-compatible tools @@ -79,7 +78,7 @@ Site Audit focuses on **honest, self-hosted technical SEO**. It is not a drop-in
@@ -147,8 +146,6 @@ WebsiteProfiling/
| `tests/` | Backend tests; `./local-test browser` for Playwright crawl integration |
| `docs/MCP.md` | MCP server setup for IDE and agent integrations |
| `data/` | Local secrets and shadow `pipeline-config.txt` (gitignored) |
-| `docker-compose.prod.yml` | Production stack (`POSTGRES_USER`, `POSTGRES_PASSWORD`, `AUTH_SECRET`) |
-| `docker-compose.pull.yml` | Pre-built `WEB_IMAGE` deployment |
For layout details and common development patterns, see [AGENT.md](AGENT.md).
@@ -225,6 +222,10 @@ Ask questions about audit data at [http://localhost:3000/chat](http://localhost:
The agent uses the same **340 read-only audit tools** as the MCP server ([docs/MCP.md](docs/MCP.md)), with **dynamic routing** (~45 tools per turn). Responses stream over SSE (`POST /api/chat`). Sessions persist per property (`chat_sessions` / `chat_messages`).
+### Content studio (optional)
+
+Write and optimize content at [http://localhost:3000/write](http://localhost:3000/write) with **live SEO scoring** from Search Console and on-page heuristics. Drafts persist per property; an optional AI assist (same providers as AI chat) drafts and rewrites copy. Backed by `/api/content-drafts`, `/api/content/score`, and `/api/content/analyze`.
+
## Contributing
Contributions are welcome. See [CONTRIBUTING.md](CONTRIBUTING.md) for setup and pull request guidelines.
diff --git a/src/website_profiling/common.py b/src/website_profiling/common.py
index 899ebe0..02d963e 100644
--- a/src/website_profiling/common.py
+++ b/src/website_profiling/common.py
@@ -21,6 +21,17 @@
_wappalyzer_disabled = _tech._wappalyzer_disabled
+def strip_www_prefix(host: str) -> str:
+    """Drop one leading ``www.`` label (matched case-insensitively) from *host*.
+
+    Prefer this to ``host.lstrip("www.")``: ``str.lstrip`` treats its argument
+    as a *set* of characters (``{'w', '.'}``), so for example
+    ``"www.washington.edu".lstrip("www.")`` wrongly yields ``"ashington.edu"``.
+    """
+    name = host if host else ""
+    return name[4:] if name[:4].lower() == "www." else name
+
+
def detect_tech_wappalyzer(url, html, headers, soup, wappalyzer=None):
"""Detect technologies; syncs wappalyzer module state with this facade for tests."""
_tech._wappalyzer_disabled = _wappalyzer_disabled
@@ -37,6 +48,7 @@ def detect_tech_wappalyzer(url, html, headers, soup, wappalyzer=None):
"load_edges",
"save_edges",
"strip_crawl_query_params",
+ "strip_www_prefix",
"normalize_link",
"parse_link_edges",
"parse_links",
diff --git a/src/website_profiling/config.py b/src/website_profiling/config.py
index 9308b66..3078164 100644
--- a/src/website_profiling/config.py
+++ b/src/website_profiling/config.py
@@ -37,7 +37,12 @@ def get_str(cfg: dict, key: str, default: str = "") -> str:
def get_bool(cfg: dict, key: str, default: bool = False) -> bool:
- return str(cfg.get(key, default)).lower() in ("true", "1", "yes")
+ raw = cfg.get(key)
+ # Missing or empty value falls back to the default (consistent with get_int/get_float);
+ # an empty string must not silently disable a default-on flag.
+ if raw is None or str(raw).strip() == "":
+ return default
+ return str(raw).strip().lower() in ("true", "1", "yes")
def get_int(cfg: dict, key: str, default: int | None = None) -> int | None:
diff --git a/src/website_profiling/content_studio/ai_suggest.py b/src/website_profiling/content_studio/ai_suggest.py
index b60c8dc..49b5645 100644
--- a/src/website_profiling/content_studio/ai_suggest.py
+++ b/src/website_profiling/content_studio/ai_suggest.py
@@ -34,6 +34,16 @@ def _rule_suggestions(score: dict[str, Any]) -> list[dict[str, Any]]:
"type": "term",
"source": "rule",
})
+ elif term.get("status") == "included":
+ count = int(term.get("count") or 0)
+ target = int(term.get("target") or 0)
+ if target and count < target and term.get("importance") == "high":
+ items.append({
+ "text": f"Use “{term.get('term')}” {target - count} more time(s) ({count}/{target}) to fully cover it.",
+ "priority": "low",
+ "type": "term",
+ "source": "rule",
+ })
for check in score.get("checks") or []:
if isinstance(check, dict) and not check.get("pass"):
items.append({
diff --git a/src/website_profiling/content_studio/score.py b/src/website_profiling/content_studio/score.py
index 8e80ec1..505be98 100644
--- a/src/website_profiling/content_studio/score.py
+++ b/src/website_profiling/content_studio/score.py
@@ -1,4 +1,12 @@
-"""Content Studio scoring from GSC keywords and on-page heuristics."""
+"""Content Studio scoring from GSC keywords and on-page heuristics.
+
+The score mirrors the workflow of a content-optimization editor (Clearscope-style):
+a target keyword expands into a set of related *terms*, each with a recommended
+usage *count*; the draft is graded on how well it covers those terms at the right
+frequency, plus on-page structure and readability. Term data is sourced from
+Search Console (real queries the property already shows for) — not live SERP
+scraping — so the grade is an honest estimate, never the result of a competitor crawl.
+"""
from __future__ import annotations
import re
@@ -15,18 +23,54 @@
PROVENANCE = "Search Console + on-site heuristics"
_WORD_COUNT_MIN = 600
+_WORD_COUNT_TARGET = 1200
_WORD_COUNT_MAX = 2500
+# Flesch–Kincaid grades: TARGET is the recommended level returned in the score payload; the check only fails above MAX.
+_READING_GRADE_TARGET = 12.0
+_READING_GRADE_MAX = 14.0
+# Below this word count, readability can't be measured meaningfully.
+_READING_MIN_WORDS = 80
+
+# How many leading words count as the "intro" for keyword-placement checks.
+_INTRO_WORDS = 100
+
+# Matching tokens: lowercase alphanumeric runs (word-boundary aware) so that a
+# term like "ai" never spuriously matches "br[ai]n" or "expl[ai]ned".
+_MATCH_TOKEN_RE = re.compile(r"[a-z0-9]+")
+
+# Words ignored when deciding whether a multi-word phrase is "partially" covered
+# or whether the keyword appears in the title/H1/intro.
+_STOPWORDS = frozenset(
+ {
+ "a", "an", "and", "the", "of", "for", "to", "in", "on", "or", "is",
+ "are", "be", "with", "your", "you", "how", "what", "why", "vs",
+ }
+)
+
+# Fine-grained grade bands (high → low). Mirrors a Clearscope-style A++…F scale so
+# small improvements are visible instead of being flattened into five buckets.
+_GRADE_BANDS: list[tuple[int, str]] = [
+ (97, "A++"),
+ (93, "A+"),
+ (90, "A"),
+ (87, "A-"),
+ (83, "B+"),
+ (80, "B"),
+ (77, "B-"),
+ (73, "C+"),
+ (70, "C"),
+ (67, "C-"),
+ (63, "D+"),
+ (60, "D"),
+ (57, "D-"),
+]
+
def _grade_label(score: int) -> str:
- if score >= 90:
- return "A"
- if score >= 80:
- return "B"
- if score >= 70:
- return "C"
- if score >= 60:
- return "D"
+ for threshold, label in _GRADE_BANDS:
+ if score >= threshold:
+ return label
return "F"
@@ -34,6 +78,32 @@ def _normalize_url(url: str) -> str:
return (url or "").strip().lower().rstrip("/")
+def _match_tokens(text: str) -> list[str]:
+ return _MATCH_TOKEN_RE.findall((text or "").lower())
+
+
+def _phrase_count(needle: list[str], haystack: list[str]) -> int:
+    """Return how many times ``needle`` occurs as a contiguous, non-overlapping run in ``haystack``."""
+    width = len(needle)
+    last_start = len(haystack) - width
+    if not width or last_start < 0:
+        return 0
+    hits = 0
+    pos = 0
+    while pos <= last_start:
+        matched = haystack[pos : pos + width] == needle
+        hits += matched  # bool adds as 0/1
+        # Jump past a match so occurrences never overlap; otherwise slide by one.
+        pos += width if matched else 1
+    return hits
+
+
+def _significant_words(term_tokens: list[str]) -> list[str]:
+ """Content words of a phrase (drop short/stop words), falling back to all tokens."""
+ sig = [w for w in term_tokens if len(w) >= 3 and w not in _STOPWORDS]
+ return sig or term_tokens
+
+
def _html_to_text(html: str) -> str:
if not html or not html.strip():
return ""
@@ -48,18 +118,56 @@ def _count_h1(html: str) -> int:
return len(soup.find_all("h1"))
+def _first_h1_text(html: str) -> str:
+ if not html or not html.strip():
+ return ""
+ soup = BeautifulSoup(html, "html.parser")
+ h1 = soup.find("h1")
+ return h1.get_text(separator=" ", strip=True) if h1 else ""
+
+
+def _term_match(term: str, corpus_tokens: list[str], corpus_set: set[str]) -> tuple[str, int]:
+ """Return (status, count) for a term against tokenized corpus.
+
+ ``included`` → the exact phrase occurs ``count`` times.
+ ``partial`` → (multi-word only) every significant word appears, but not as a phrase.
+ ``missing`` → otherwise.
+ """
+ term_tokens = _match_tokens(term)
+ if not term_tokens:
+ return "missing", 0
+ count = _phrase_count(term_tokens, corpus_tokens)
+ if count > 0:
+ return "included", count
+ if len(term_tokens) > 1 and all(w in corpus_set for w in _significant_words(term_tokens)):
+ return "partial", 0
+ return "missing", 0
+
+
def _term_in_corpus(term: str, corpus: str) -> str:
- """Return included | partial | missing for a term against corpus text."""
- t = (term or "").strip().lower()
- if not t:
- return "missing"
- c = (corpus or "").lower()
- if t in c:
- return "included"
- words = [w for w in re.split(r"\W+", t) if len(w) >= 3]
- if words and all(w in c for w in words):
- return "partial"
- return "missing"
+ """Status (included | partial | missing) for a term against corpus text."""
+ tokens = _match_tokens(corpus)
+ status, _ = _term_match(term, tokens, set(tokens))
+ return status
+
+
+def _term_target(term: str, importance: str) -> int:
+ """Recommended occurrence count for a term (stable, independent of current length)."""
+ if len(_match_tokens(term)) >= 3:
+ return 1 # long phrases: a single natural mention is enough
+ return 3 if importance == "high" else 2
+
+
+def _keyword_present(keyword: str, text: str) -> bool:
+ """True if the keyword appears as a phrase, or all its content words appear."""
+ kw_tokens = _match_tokens(keyword)
+ if not kw_tokens:
+ return False
+ text_tokens = _match_tokens(text)
+ if _phrase_count(kw_tokens, text_tokens) > 0:
+ return True
+ text_set = set(text_tokens)
+ return all(w in text_set for w in _significant_words(kw_tokens))
def _title_check(title_tag: str) -> dict[str, Any]:
@@ -109,6 +217,42 @@ def _word_count_check(word_count: int) -> dict[str, Any]:
return {"id": "word_count", "pass": True, "hint": f"Word count in range ({word_count} words)."}
+def _keyword_in_title_check(keyword: str, title_tag: str) -> dict[str, Any]:
+ if _keyword_present(keyword, title_tag):
+ return {"id": "keyword_in_title", "pass": True, "hint": "Target keyword appears in the title tag."}
+ return {"id": "keyword_in_title", "pass": False, "hint": "Add the target keyword to the title tag."}
+
+
+def _keyword_in_h1_check(keyword: str, html: str) -> dict[str, Any]:
+ h1_text = _first_h1_text(html)
+ if h1_text and _keyword_present(keyword, h1_text):
+ return {"id": "keyword_in_h1", "pass": True, "hint": "Target keyword appears in the H1."}
+ return {"id": "keyword_in_h1", "pass": False, "hint": "Work the target keyword into the H1 heading."}
+
+
+def _keyword_in_intro_check(keyword: str, body_text: str) -> dict[str, Any]:
+ intro = " ".join((body_text or "").split()[:_INTRO_WORDS])
+ if intro and _keyword_present(keyword, intro):
+ return {"id": "keyword_in_intro", "pass": True, "hint": "Target keyword appears in the opening paragraph."}
+ return {
+ "id": "keyword_in_intro",
+ "pass": False,
+ "hint": f"Mention the target keyword within the first {_INTRO_WORDS} words.",
+ }
+
+
+def _reading_level_check(reading_level: float, word_count: int) -> dict[str, Any]:
+    """Readability check: fails when the draft is too short to measure or above the max grade."""
+    if word_count < _READING_MIN_WORDS:
+        return {"id": "reading_level", "pass": False, "hint": "Add more content to assess readability."}
+    # The grade may arrive unrounded; show one decimal so the hint never prints a
+    # long float such as 14.285714285714286.
+    grade = f"{reading_level:.1f}"
+    if reading_level > _READING_GRADE_MAX:
+        return {"id": "reading_level", "pass": False, "hint": f"Reading level is high (grade {grade}); shorten sentences for a broader audience."}
+    return {"id": "reading_level", "pass": True, "hint": f"Reading level is accessible (grade {grade})."}
+
+
def _collect_gsc_terms(
keyword: str,
landing_url: str | None,
@@ -140,6 +284,11 @@ def add(term: str, importance: str, source: str, impressions: int = 0) -> None:
if kw_lower:
add(keyword.strip(), "high", "keyword", 10_000)
+ # Content words of the keyword drive topical relatedness (e.g. "crm" links
+ # "best crm" to "crm software"). Empty for all-stopword keywords, in which
+ # case we fall back to substring/URL matching only.
+ kw_content_words = {w for w in _match_tokens(keyword) if len(w) >= 3 and w not in _STOPWORDS}
+
scored_rows: list[tuple[int, dict[str, Any]]] = []
for row in rows:
if not isinstance(row, dict):
@@ -153,6 +302,7 @@ def add(term: str, importance: str, source: str, impressions: int = 0) -> None:
related = (
kw_lower in q_lower
or q_lower in kw_lower
+ or bool(kw_content_words & set(_match_tokens(q)))
or (landing_norm and landing_norm in gsc_url)
)
if related:
@@ -171,6 +321,7 @@ def add(term: str, importance: str, source: str, impressions: int = 0) -> None:
def _term_coverage_score(terms: list[dict[str, Any]]) -> float:
+ """Frequency-aware coverage: each term earns credit up to its target count."""
if not terms:
return 0.5
total_weight = 0.0
@@ -178,11 +329,15 @@ def _term_coverage_score(terms: list[dict[str, Any]]) -> float:
for t in terms:
w = 2.0 if t.get("importance") == "high" else 1.0
total_weight += w
- status = t.get("status") or "missing"
- if status == "included":
- earned += w
- elif status == "partial":
- earned += w * 0.5
+ count = int(t.get("count") or 0)
+ target = max(1, int(t.get("target") or 1))
+ if count > 0:
+ frac = min(count / target, 1.0)
+ elif t.get("status") == "partial":
+ frac = 0.4
+ else:
+ frac = 0.0
+ earned += w * frac
return earned / total_weight if total_weight else 0.5
@@ -220,7 +375,8 @@ def score_content_draft(
word_count = count_words(tokens)
reading_level = flesch_kincaid_grade(tokens, body_text) if tokens else 0.0
- corpus = f"{title_tag} {body_text}".lower()
+ corpus_tokens = _match_tokens(f"{title_tag} {body_text}")
+ corpus_set = set(corpus_tokens)
rows = keyword_rows
if rows is None and property_id is not None:
@@ -234,28 +390,38 @@ def score_content_draft(
raw_terms = _collect_gsc_terms(keyword, landing_url, rows)
terms: list[dict[str, Any]] = []
for t in raw_terms:
- status = _term_in_corpus(str(t["term"]), corpus)
- terms.append({**t, "status": status})
+ term_str = str(t["term"])
+ status, count = _term_match(term_str, corpus_tokens, corpus_set)
+ target = _term_target(term_str, str(t.get("importance") or "medium"))
+ terms.append({**t, "status": status, "count": count, "target": target})
checks = [
+ _keyword_in_title_check(keyword, title_tag),
+ _keyword_in_h1_check(keyword, body_html),
+ _keyword_in_intro_check(keyword, body_text),
_title_check(title_tag),
_meta_check(meta_description),
_h1_check(body_html),
_word_count_check(word_count),
+ _reading_level_check(reading_level, word_count),
]
term_cov = _term_coverage_score(terms)
check_rate = _checks_pass_rate(checks)
wc_band = _word_count_band_score(word_count)
- raw_grade = term_cov * 0.6 + check_rate * 0.25 + wc_band * 0.15
+ raw_grade = term_cov * 0.5 + check_rate * 0.35 + wc_band * 0.15
grade_score = max(0, min(100, round(raw_grade * 100)))
return {
"grade_score": grade_score,
"grade_label": _grade_label(grade_score),
"word_count": word_count,
+ "word_count_target": _WORD_COUNT_TARGET,
+ "word_count_min": _WORD_COUNT_MIN,
+ "word_count_max": _WORD_COUNT_MAX,
"reading_level": round(reading_level, 1),
+ "reading_level_target": _READING_GRADE_TARGET,
"terms": terms,
"checks": checks,
"provenance": PROVENANCE,
diff --git a/src/website_profiling/content_studio/wizard.py b/src/website_profiling/content_studio/wizard.py
new file mode 100644
index 0000000..96f97b4
--- /dev/null
+++ b/src/website_profiling/content_studio/wizard.py
@@ -0,0 +1,379 @@
+"""Content Studio guided-draft wizard: AI-driven brief → full draft.
+
+Powers the multi-step /write wizard (intent → content type → tone → title →
+outline → draft). Every step is AI-generated through the configured LLM, with
+deterministic fallbacks so the wizard degrades gracefully when the model is
+unavailable, disabled, or returns malformed JSON. The final draft body is
+*assembled* in Python from AI-written prose, so the HTML structure is always
+valid and safe (no model-emitted markup is trusted verbatim).
+"""
+from __future__ import annotations
+
+import html
+import re
+from typing import Any
+
+from ..llm.base import get_llm_client, parse_json_response
+from ..llm_config import load_llm_config_from_db, llm_is_enabled
+from ..text_sanitize import strip_surrogates
+
+_MAX_OPTIONS = 6
+_MAX_TITLES = 6
+_MAX_OUTLINE = 24
+_ALLOWED_LEVELS = ("h1", "h2", "h3")
+
+_FALLBACK_CONTENT_TYPES = [
+ ("How-to guide", "Step-by-step instructions that walk the reader through a task."),
+ ("Listicle", "A scannable numbered or bulleted list of items, tips, or examples."),
+ ("Comparison", "Weighs two or more options against each other to aid a decision."),
+ ("Explainer / overview", "Defines the topic and covers the essentials for newcomers."),
+ ("FAQ", "Answers the common questions searchers ask about the topic."),
+ ("Opinion / editorial", "A point-of-view piece backed by reasoning and examples."),
+]
+
+_FALLBACK_TONES = [
+ ("Professional", "Polished and credible, suitable for a business audience."),
+ ("Conversational", "Warm and approachable, like talking to a knowledgeable friend."),
+ ("Authoritative", "Confident and expert, establishing trust and depth."),
+ ("Friendly", "Casual and encouraging, easy for beginners to follow."),
+ ("Informative", "Neutral and fact-forward, prioritising clarity over flair."),
+ ("Persuasive", "Action-oriented, building toward a clear call to action."),
+]
+
+_JSON_SYSTEM = "You are an expert SEO content strategist. Respond with valid JSON only — no prose, no markdown fences."
+
+
+def _clean(value: Any) -> str:
+ return re.sub(r"\s+", " ", strip_surrogates(str(value or "")).strip())
+
+
+def _content_studio_ai_on(cfg: dict[str, str]) -> bool:  # default-on flag: a missing or blank value means enabled
+    return (str(cfg.get("llm_enable_content_studio", "")).strip().lower() or "true") in ("true", "1", "yes")
+
+
+def _get_client() -> tuple[Any, dict[str, Any] | None]:
+ """Return (client, None) when AI is usable, else (None, error_dict)."""
+ cfg = load_llm_config_from_db()
+ if not llm_is_enabled(cfg) or not _content_studio_ai_on(cfg):
+ return None, {"ok": False, "error": "AI is disabled. Enable it in Run audit → AI settings."}
+ try:
+ return get_llm_client(cfg), None
+ except ValueError as e:
+ return None, {"ok": False, "error": str(e)}
+
+
+def _safe_complete(client: Any, system: str, user: str) -> dict[str, Any]:
+    """Best-effort JSON completion: always returns a dict ({} on any failure) so callers can ``.get``."""
+    try:
+        data = client.complete_json(system, user)
+        data = data if isinstance(data, dict) else parse_json_response(str(data))
+    except Exception:  # deliberate: any model/parse failure falls back to the caller's defaults
+        return {}
+    return data if isinstance(data, dict) else {}
+
+
+def _options_from_pairs(pairs: list[tuple[str, str]]) -> list[dict[str, str]]:
+ return [{"label": label, "description": desc} for label, desc in pairs]
+
+
+def _normalize_options(raw: Any) -> list[dict[str, str]]:
+ out: list[dict[str, str]] = []
+ if not isinstance(raw, list):
+ return out
+ for item in raw:
+ if isinstance(item, dict):
+ label = _clean(item.get("label") or item.get("name") or item.get("title"))
+ desc = _clean(item.get("description") or item.get("summary"))
+ elif isinstance(item, str):
+ label, desc = _clean(item), ""
+ else:
+ continue
+ if label:
+ out.append({"label": label[:120], "description": desc[:240]})
+ return out
+
+
+def _normalize_str_list(raw: Any) -> list[str]:
+ out: list[str] = []
+ if not isinstance(raw, list):
+ return out
+ for item in raw:
+ if isinstance(item, dict):
+ text = _clean(item.get("text") or item.get("title"))
+ elif isinstance(item, str):
+ text = _clean(item)
+ else:
+ continue
+ if text:
+ out.append(text[:160])
+ return out
+
+
+def _normalize_outline(raw: Any, title: str) -> list[dict[str, str]]:
+ items: list[dict[str, str]] = []
+ if isinstance(raw, list):
+ for it in raw:
+ if isinstance(it, dict):
+ level = str(it.get("level") or "").strip().lower()
+ text = _clean(it.get("text") or it.get("title") or it.get("heading"))
+ elif isinstance(it, str):
+ level, text = "h2", _clean(it)
+ else:
+ continue
+ if level not in _ALLOWED_LEVELS:
+ level = "h2"
+ if text:
+ items.append({"level": level, "text": text[:200]})
+ if len(items) >= _MAX_OUTLINE:
+ break
+
+ title_text = _clean(title) or (items[0]["text"] if items else "Untitled")
+ body_items = [it for it in items if it["level"] != "h1"]
+ if not body_items:
+ return _fallback_outline(title_text)
+ return [{"level": "h1", "text": title_text}, *body_items[: _MAX_OUTLINE - 1]]
+
+
+def _fallback_intents(keyword: str) -> list[dict[str, str]]:
+ kw = keyword.strip()
+ return _options_from_pairs([
+ (f"Learn about {kw}", f"Understand what {kw} is and why it matters."),
+ (f"How to use {kw}", f"Practical, step-by-step guidance for {kw}."),
+ (f"Best {kw} options", f"Compare the top {kw} choices available."),
+ (f"{kw} reviews & comparisons", f"Evaluate {kw} against the alternatives."),
+ ])
+
+
+def _fallback_titles(keyword: str) -> list[str]:
+ t = keyword.strip().title() or "Your Topic"
+ return [
+ f"{t}: A Complete Guide",
+ f"What Is {t}? Everything You Need to Know",
+ f"The Beginner's Guide to {t}",
+ f"{t}: Tips, Examples, and Best Practices",
+ ]
+
+
+def _fallback_outline(title: str) -> list[dict[str, str]]:
+    """Generic six-section outline headed by *title* ("Untitled" when blank)."""
+    names = (title.strip() or "Untitled", "Introduction", "Key concepts", "How it works", "Practical tips", "Common mistakes", "Conclusion")
+    return [{"level": "h1" if idx == 0 else "h2", "text": name} for idx, name in enumerate(names)]
+
+
+def suggest_intents(keyword: str, locale: str = "en-US") -> dict[str, Any]:
+ client, err = _get_client()
+ if err:
+ return err
+ kw = (keyword or "").strip()
+ if not kw:
+ return {"ok": False, "error": "keyword required"}
+ user = (
+ f'For the search keyword "{kw}" (locale {locale}), list up to {_MAX_OPTIONS} distinct '
+ "search intents a reader might have. Return JSON: "
+ '{"intents":[{"label":"short intent label","description":"one sentence"}]}'
+ )
+ data = _safe_complete(client, _JSON_SYSTEM, user)
+ options = _normalize_options(data.get("intents")) or _fallback_intents(kw)
+ return {"ok": True, "options": options[:_MAX_OPTIONS]}
+
+
+def suggest_content_types(keyword: str, intent: str) -> dict[str, Any]:
+ client, err = _get_client()
+ if err:
+ return err
+ user = (
+ f'A writer is creating content for the keyword "{keyword.strip()}" with the intent '
+ f'"{intent.strip()}". Recommend up to {_MAX_OPTIONS} content types that best serve this, '
+ 'best first. Return JSON: {"content_types":[{"label":"type","description":"why it fits"}]}'
+ )
+ data = _safe_complete(client, _JSON_SYSTEM, user)
+ options = _normalize_options(data.get("content_types")) or _options_from_pairs(_FALLBACK_CONTENT_TYPES)
+ return {"ok": True, "options": options[:_MAX_OPTIONS]}
+
+
+def suggest_tones(keyword: str, intent: str, content_type: str) -> dict[str, Any]:
+ client, err = _get_client()
+ if err:
+ return err
+ user = (
+ f'For a "{content_type.strip()}" about "{keyword.strip()}" (intent: "{intent.strip()}"), '
+ f"recommend up to {_MAX_OPTIONS} writing tones, best first. "
+ 'Return JSON: {"tones":[{"label":"tone","description":"when to use it"}]}'
+ )
+ data = _safe_complete(client, _JSON_SYSTEM, user)
+ options = _normalize_options(data.get("tones")) or _options_from_pairs(_FALLBACK_TONES)
+ return {"ok": True, "options": options[:_MAX_OPTIONS]}
+
+
+def suggest_titles(keyword: str, intent: str, content_type: str, tone: str) -> dict[str, Any]:
+ client, err = _get_client()
+ if err:
+ return err
+ kw = (keyword or "").strip()
+ user = (
+ f'Write up to {_MAX_TITLES} compelling, SEO-friendly article titles for the keyword "{kw}". '
+ f'Content type: "{content_type.strip()}". Intent: "{intent.strip()}". Tone: "{tone.strip()}". '
+ "Keep each under 60 characters where possible and include the keyword naturally. "
+ 'Return JSON: {"titles":["title one","title two"]}'
+ )
+ data = _safe_complete(client, _JSON_SYSTEM, user)
+ titles = _normalize_str_list(data.get("titles")) or _fallback_titles(kw)
+ return {"ok": True, "titles": titles[:_MAX_TITLES]}
+
+
+def _fallback_questions(keyword: str) -> list[str]:
+ kw = keyword.strip()
+ return [
+ f"What is {kw}?",
+ f"How does {kw} work?",
+ f"Why is {kw} important?",
+ f"What are examples of {kw}?",
+ f"How do you use {kw}?",
+ ]
+
+
+def _fallback_sources() -> list[dict[str, str]]:
+ return _options_from_pairs([
+ ("Wikipedia", "Background, definitions, and a neutral overview."),
+ ("Official site or documentation", "Authoritative first-party specifics."),
+ ("Industry publications", "Expert analysis, trends, and commentary."),
+ ("Academic or research sources", "Evidence for data-backed claims."),
+ ("Reputable news coverage", "Recent developments and real-world context."),
+ ])
+
+
+def research_panel(keyword: str, intent: str = "", title: str = "") -> dict[str, Any]:
+ """People-Also-Ask style questions + suggested reference sources for a keyword."""
+ client, err = _get_client()
+ if err:
+ return err
+ kw = (keyword or "").strip()
+ if not kw:
+ return {"ok": False, "error": "keyword required"}
+ context = f' The article is "{title.strip()}" (intent "{intent.strip()}").' if (title or intent).strip() else ""
+ user = (
+ f'For the search keyword "{kw}", help an author research the topic.{context} Return JSON with: '
+ '"questions" = up to 8 "People Also Ask" style questions real searchers ask; '
+ '"sources" = up to 6 authoritative reference types to cite, each '
+ '{"label":"source name or type","description":"what to cite it for"}. '
+ 'Return JSON: {"questions":["..."],"sources":[{"label":"...","description":"..."}]}'
+ )
+ data = _safe_complete(client, _JSON_SYSTEM, user)
+ questions = _normalize_str_list(data.get("questions")) or _fallback_questions(kw)
+ sources = _normalize_options(data.get("sources")) or _fallback_sources()
+ return {"ok": True, "questions": questions[:8], "sources": sources[:6]}
+
+
+def suggest_outline(keyword: str, intent: str, content_type: str, tone: str, title: str) -> dict[str, Any]:
+ client, err = _get_client()
+ if err:
+ return err
+ user = (
+ f'Create a heading outline for an article titled "{title.strip()}" '
+ f'(keyword "{keyword.strip()}", {content_type.strip()}, intent "{intent.strip()}", tone "{tone.strip()}"). '
+ "Use h2 for main sections and h3 for sub-points. Do not include the title as a heading. "
+ 'Return JSON: {"outline":[{"level":"h2","text":"Section heading"},{"level":"h3","text":"Sub-point"}]}'
+ )
+ data = _safe_complete(client, _JSON_SYSTEM, user)
+ outline = _normalize_outline(data.get("outline"), title)
+ return {"ok": True, "outline": outline}
+
+
+def _assemble_body(h1_text: str, headings: list[dict[str, str]], sections: Any) -> str:
+ section_list = sections if isinstance(sections, list) else []
+ parts = [f"
{html.escape(prose)}
") + return "\n".join(parts) + + +def generate_draft( + keyword: str, + intent: str, + content_type: str, + tone: str, + title: str, + outline: list[dict[str, Any]], +) -> dict[str, Any]: + client, err = _get_client() + if err: + return err + + normalized = _normalize_outline(outline, title) + h1_text = next((it["text"] for it in normalized if it["level"] == "h1"), title.strip() or keyword.strip()) + headings = [it for it in normalized if it["level"] != "h1"] + headings_text = "\n".join(f"{it['level']}: {it['text']}" for it in headings) + + user = ( + f'Write the body of a "{content_type.strip()}" titled "{h1_text}" for the keyword ' + f'"{keyword.strip()}" (intent "{intent.strip()}", tone "{tone.strip()}"). ' + f"Write 2-4 sentences of plain-text prose for each heading below, in order:\n{headings_text}\n\n" + 'Return JSON: {"title_tag":"SEO title under 60 chars","meta_description":"under 160 chars",' + '"sections":["prose for heading 1","prose for heading 2", ...]} ' + "with one sections entry per heading, in the same order." + ) + data = _safe_complete(client, _JSON_SYSTEM, user) + + title_tag = (_clean(data.get("title_tag")) or h1_text)[:70] + meta = (_clean(data.get("meta_description")) or f"{h1_text}. Learn about {keyword.strip()}.")[:170] + body_html = _assemble_body(h1_text, headings, data.get("sections")) + return { + "ok": True, + "title_tag": title_tag, + "meta_description": meta, + "body_html": body_html, + "outline": normalized, + } + + +def run_wizard_step(step: str, payload: dict[str, Any]) -> dict[str, Any]: + """Dispatch a single wizard step. 
``payload`` carries prior selections.""" + p = payload or {} + if step == "intents": + return suggest_intents(str(p.get("keyword") or ""), str(p.get("locale") or "en-US")) + if step == "content_types": + return suggest_content_types(str(p.get("keyword") or ""), str(p.get("intent") or "")) + if step == "tones": + return suggest_tones( + str(p.get("keyword") or ""), + str(p.get("intent") or ""), + str(p.get("contentType") or ""), + ) + if step == "titles": + return suggest_titles( + str(p.get("keyword") or ""), + str(p.get("intent") or ""), + str(p.get("contentType") or ""), + str(p.get("tone") or ""), + ) + if step == "research": + return research_panel( + str(p.get("keyword") or ""), + str(p.get("intent") or ""), + str(p.get("title") or ""), + ) + if step == "outline": + return suggest_outline( + str(p.get("keyword") or ""), + str(p.get("intent") or ""), + str(p.get("contentType") or ""), + str(p.get("tone") or ""), + str(p.get("title") or ""), + ) + if step == "draft": + return generate_draft( + str(p.get("keyword") or ""), + str(p.get("intent") or ""), + str(p.get("contentType") or ""), + str(p.get("tone") or ""), + str(p.get("title") or ""), + p.get("outline") if isinstance(p.get("outline"), list) else [], + ) + return {"ok": False, "error": f"unknown step: {step}"} diff --git a/src/website_profiling/crawl/crawler.py b/src/website_profiling/crawl/crawler.py index 1757e49..c74d727 100644 --- a/src/website_profiling/crawl/crawler.py +++ b/src/website_profiling/crawl/crawler.py @@ -44,6 +44,27 @@ ] +def _build_configured_session(config: CrawlConfig) -> requests.Session: + """Build a session configured from crawl auth/headers/cookies. + + Called once per thread (and once for the main-thread template) so each + worker thread fetches with its own session — see ``StaticFetcher``. 
+ """ + session = requests.Session() + session.headers.update({"User-Agent": config.user_agent}) + if config.crawl_auth_username: + session.auth = (config.crawl_auth_username, config.crawl_auth_password or "") + for line in (config.crawl_extra_headers or "").replace("\r", "").split("\n"): + if ":" in line: + key, val = line.split(":", 1) + k, v = key.strip(), val.strip() + if k: + session.headers[k] = v + if config.crawl_cookies and str(config.crawl_cookies).strip(): + session.headers["Cookie"] = str(config.crawl_cookies).strip() + return session + + class Crawler: def __init__( self, @@ -185,24 +206,19 @@ def __init__( self.lock = self.frontier.lock self.results: list[dict] = [] - self.session = requests.Session() - self.session.headers.update({"User-Agent": config.user_agent}) - if config.crawl_auth_username: - self.session.auth = (config.crawl_auth_username, config.crawl_auth_password or "") - for line in (config.crawl_extra_headers or "").replace("\r", "").split("\n"): - if ":" in line: - key, val = line.split(":", 1) - k, v = key.strip(), val.strip() - if k: - self.session.headers[k] = v - if config.crawl_cookies and str(config.crawl_cookies).strip(): - self.session.headers["Cookie"] = str(config.crawl_cookies).strip() + # `requests.Session` is not thread-safe, so worker threads each build + # their own session from this factory (see StaticFetcher). The template + # `self.session` below is only touched on the main thread (sitemap + # seeding and Playwright auth mapping). 
+ self._session_factory = lambda: _build_configured_session(config) + self.session = self._session_factory() self.fetcher = build_fetcher( render_mode=config.fetcher_render_mode, timeout=config.timeout, user_agent=config.user_agent, session=self.session, + session_factory=self._session_factory, js_concurrency=config.js_concurrency, js_timeout=config.js_timeout, js_wait_until=config.js_wait_until, diff --git a/src/website_profiling/crawl/fetchers/factory.py b/src/website_profiling/crawl/fetchers/factory.py index ff4f1d0..71d7885 100644 --- a/src/website_profiling/crawl/fetchers/factory.py +++ b/src/website_profiling/crawl/fetchers/factory.py @@ -85,6 +85,7 @@ def build_fetcher( timeout: int = 12, user_agent: str = "WebsiteProfilingCrawler/1.0", session: Optional[requests.Session] = None, + session_factory: Optional[Callable[[], requests.Session]] = None, js_concurrency: int = 3, js_timeout: int = 30, js_wait_until: str = "domcontentloaded", @@ -117,7 +118,12 @@ def build_fetcher( if mode == "javascript": validate_browser_available() return _browser_factory(**browser_kwargs)() - static = StaticFetcher(timeout=timeout, user_agent=user_agent, session=session) + static = StaticFetcher( + timeout=timeout, + user_agent=user_agent, + session=session, + session_factory=session_factory, + ) if mode == "static": return static if mode == "auto": diff --git a/src/website_profiling/crawl/fetchers/static.py b/src/website_profiling/crawl/fetchers/static.py index 7477dcc..37a867a 100644 --- a/src/website_profiling/crawl/fetchers/static.py +++ b/src/website_profiling/crawl/fetchers/static.py @@ -2,8 +2,9 @@ from __future__ import annotations +import threading import time -from typing import Optional +from typing import Callable, Optional import requests @@ -11,23 +12,66 @@ class StaticFetcher: + """Fetch pages over HTTP. 
+ + ``requests.Session`` is not documented as thread-safe, so when this fetcher + is shared across worker threads each thread gets its own session built + lazily from ``session_factory``. Pass ``session`` (without a factory) to + keep the legacy single-shared-session behaviour for single-threaded callers. + """ + def __init__( self, *, timeout: int = 12, user_agent: str = "WebsiteProfilingCrawler/1.0", session: Optional[requests.Session] = None, + session_factory: Optional[Callable[[], requests.Session]] = None, ) -> None: self.timeout = timeout - self.session = session or requests.Session() - if session is None: - self.session.headers.update({"User-Agent": user_agent}) - self._owns_session = session is None + self._user_agent = user_agent + self._explicit_session = session + if session_factory is not None: + # Per-thread sessions built from the caller's factory. + self._session_factory: Optional[Callable[[], requests.Session]] = session_factory + elif session is not None: + # Legacy: a single caller-owned session shared as-is. + self._session_factory = None + else: + # No session supplied: own a per-thread default session. + def _default_factory() -> requests.Session: + sess = requests.Session() + sess.headers.update({"User-Agent": user_agent}) + return sess + + self._session_factory = _default_factory + self._local = threading.local() + self._owned_sessions: list[requests.Session] = [] + self._sessions_lock = threading.Lock() + + @property + def session(self) -> requests.Session: + """Return the session for the calling thread. + + With a factory, each thread lazily builds (and we track) its own + session; without one, the single explicit session is returned. 
+ """ + if self._session_factory is None: + return self._explicit_session # type: ignore[return-value] + existing = getattr(self._local, "session", None) + if existing is not None: + return existing + built = self._session_factory() + self._local.session = built + with self._sessions_lock: + self._owned_sessions.append(built) + return built def fetch(self, url: str) -> FetchResult: + session = self.session try: t0 = time.perf_counter() - resp = self.session.get(url, timeout=self.timeout, allow_redirects=True) + resp = session.get(url, timeout=self.timeout, allow_redirects=True) response_time_ms = int((time.perf_counter() - t0) * 1000) ct = resp.headers.get("Content-Type", "") is_html = resp.status_code == 200 and ( @@ -63,5 +107,11 @@ def fetch(self, url: str) -> FetchResult: ) def close(self) -> None: - if self._owns_session: - self.session.close() + if self._session_factory is None: + # The caller owns the explicit session; leave its lifecycle to them. + return + with self._sessions_lock: + sessions = list(self._owned_sessions) + self._owned_sessions.clear() + for sess in sessions: + sess.close() diff --git a/src/website_profiling/db/historical.py b/src/website_profiling/db/historical.py index cb71baf..9698451 100644 --- a/src/website_profiling/db/historical.py +++ b/src/website_profiling/db/historical.py @@ -120,11 +120,16 @@ def _bulk( vals.append(v) params.append(tuple(vals)) try: - _executemany(conn, sql, params, page_size=500) + # Savepoint: a bulk failure must not poison the outer transaction, or the + # per-row fallback below would itself fail (current transaction is aborted) + # and silently drop every preserved row. 
+ with conn.transaction(): + _executemany(conn, sql, params, page_size=500) except Exception: for p in params: try: - conn.execute(sql, p) + with conn.transaction(): + conn.execute(sql, p) except Exception: pass diff --git a/src/website_profiling/db/report_store.py b/src/website_profiling/db/report_store.py index 431d9ae..23df422 100644 --- a/src/website_profiling/db/report_store.py +++ b/src/website_profiling/db/report_store.py @@ -99,7 +99,10 @@ def write_report_payload(conn: Connection, report_data: dict[str, Any]) -> None: report_id = int(rid) if rid is not None else None if report_id is not None: try: - _write_audit_health_snapshot(conn, report_id, canonical_domain, report_data) + # Savepoint: a failed snapshot insert must not poison/roll back the + # report_payload write that precedes it in this transaction. + with conn.transaction(): + _write_audit_health_snapshot(conn, report_id, canonical_domain, report_data) except Exception: pass conn.commit() diff --git a/src/website_profiling/integrations/google/gsc.py b/src/website_profiling/integrations/google/gsc.py index e01e073..5d2d1e0 100644 --- a/src/website_profiling/integrations/google/gsc.py +++ b/src/website_profiling/integrations/google/gsc.py @@ -8,6 +8,8 @@ from typing import Any from urllib.parse import urlparse +from ...common import strip_www_prefix + INSTALL_HINT = ( "pip install google-api-python-client" @@ -205,7 +207,7 @@ def _url_prefix_key(site_url: str) -> str | None: if not site_url.startswith(("http://", "https://")): return None parsed = urlparse(site_url) - host = parsed.netloc.lower().lstrip("www.") + host = strip_www_prefix(parsed.netloc.lower()) path = parsed.path.rstrip("/") or "" return f"{parsed.scheme.lower()}://{host}{path}/" @@ -213,9 +215,9 @@ def _url_prefix_key(site_url: str) -> str | None: def _domain_from_site_url(site_url: str) -> str | None: site_url = site_url.strip() if site_url.startswith("sc-domain:"): - return site_url.split(":", 1)[1].lower().lstrip("www.") + return 
strip_www_prefix(site_url.split(":", 1)[1].lower()) if site_url.startswith(("http://", "https://")): - return urlparse(site_url).netloc.lower().lstrip("www.") + return strip_www_prefix(urlparse(site_url).netloc.lower()) return None diff --git a/src/website_profiling/integrations/google/gsc_links_csv.py b/src/website_profiling/integrations/google/gsc_links_csv.py index 78568e3..f1943d9 100644 --- a/src/website_profiling/integrations/google/gsc_links_csv.py +++ b/src/website_profiling/integrations/google/gsc_links_csv.py @@ -11,6 +11,8 @@ from datetime import datetime, timezone from typing import Any +from ...common import strip_www_prefix + from .normalize import build_crawl_norm_map, normalize_url _SECTION_KEYS = ( @@ -193,7 +195,7 @@ def get(*needles: str) -> str: try: from urllib.parse import urlparse - host = urlparse(source).netloc.lower().lstrip("www.") + host = strip_www_prefix(urlparse(source).netloc.lower()) if host: row["linking_site"] = host except Exception: diff --git a/src/website_profiling/integrations/google/gsc_links_sync.py b/src/website_profiling/integrations/google/gsc_links_sync.py index ee43059..aacf5cb 100644 --- a/src/website_profiling/integrations/google/gsc_links_sync.py +++ b/src/website_profiling/integrations/google/gsc_links_sync.py @@ -12,7 +12,8 @@ def snapshot_gsc_links(property_id: int, gsc_links_data: dict[str, Any]) -> None domains = gsc_links_data.get("top_linking_sites") or [] count = len(domains) top = [ - {"site": d.get("site"), "links": d.get("links")} + # top_linking_sites entries use the "link_count" key (see gsc_links_csv.py). 
+ {"site": d.get("site"), "links": d.get("link_count")} for d in domains[:50] if isinstance(d, dict) ] diff --git a/src/website_profiling/integrations/google/normalize.py b/src/website_profiling/integrations/google/normalize.py index 3ef468a..4485c6a 100644 --- a/src/website_profiling/integrations/google/normalize.py +++ b/src/website_profiling/integrations/google/normalize.py @@ -5,12 +5,14 @@ from urllib.parse import urlparse +from ...common import strip_www_prefix + def normalize_url(url: str) -> str: """Strip scheme, www., trailing slash, lowercase -- for join key.""" url = url.strip() parsed = urlparse(url) - host = parsed.netloc.lower().lstrip("www.") + host = strip_www_prefix(parsed.netloc.lower()) path = parsed.path.rstrip("/") or "/" return f"{host}{path}" diff --git a/src/website_profiling/llm/agent.py b/src/website_profiling/llm/agent.py index e9b0a1b..40418c6 100644 --- a/src/website_profiling/llm/agent.py +++ b/src/website_profiling/llm/agent.py @@ -135,10 +135,12 @@ def _react_step( on_token: Callable[[str], None] | None, ) -> ChatResult: """JSON ReAct fallback for providers without native tool calling.""" + # Include "tool" messages so the model sees prior tool results; otherwise it + # keeps re-issuing the same call and loops until MAX_TOOL_ROUNDS. 
convo = "\n".join( f"{m.get('role')}: {m.get('content')}" for m in messages - if m.get("role") in ("user", "assistant", "system") + if m.get("role") in ("user", "assistant", "system", "tool") ) user = f"Available tools:\n{tools_desc}\n\nConversation:\n{convo}\n\nNext action JSON:" data = client.complete_json(SYSTEM_PROMPT + REACT_PROMPT_SUFFIX, user) diff --git a/src/website_profiling/llm/providers/anthropic.py b/src/website_profiling/llm/providers/anthropic.py index c352f37..499927e 100644 --- a/src/website_profiling/llm/providers/anthropic.py +++ b/src/website_profiling/llm/providers/anthropic.py @@ -7,6 +7,75 @@ from ..base import ChatResult, TokenCallback, ToolCall, parse_json_response +def _to_anthropic_messages(messages: list[dict[str, Any]]) -> tuple[str, list[dict[str, Any]]]: + """Convert OpenAI-shaped chat messages to ``(system, anthropic_messages)``. + + Assistant messages that carry ``tool_calls`` (the OpenAI shape the agent loop + produces) are reconstructed into ``tool_use`` content blocks. Without this the + following ``tool_result`` block has no matching ``tool_use`` in the prior + assistant turn and the Anthropic Messages API rejects the request with HTTP 400, + breaking every multi-round tool conversation. 
+ """ + system_parts: list[str] = [] + out: list[dict[str, Any]] = [] + for msg in messages: + role = msg.get("role") + if role == "system": + system_parts.append(str(msg.get("content") or "")) + elif role == "tool": + out.append({ + "role": "user", + "content": [{ + "type": "tool_result", + "tool_use_id": str(msg.get("tool_call_id") or ""), + "content": str(msg.get("content") or ""), + }], + }) + elif role == "assistant" and msg.get("tool_calls"): + blocks: list[dict[str, Any]] = [] + text = str(msg.get("content") or "") + if text: + blocks.append({"type": "text", "text": text}) + for tc in msg.get("tool_calls") or []: + fn = tc.get("function") or {} + raw_args = fn.get("arguments", tc.get("arguments")) + if isinstance(raw_args, str): + try: + args = json.loads(raw_args or "{}") + except json.JSONDecodeError: + args = {} + elif isinstance(raw_args, dict): + args = raw_args + else: + args = {} + blocks.append({ + "type": "tool_use", + "id": str(tc.get("id") or ""), + "name": str(fn.get("name") or tc.get("name") or ""), + "input": args, + }) + out.append({"role": "assistant", "content": blocks}) + else: + out.append({ + "role": role if role in ("user", "assistant") else "user", + "content": str(msg.get("content") or ""), + }) + return "\n".join(system_parts), out + + +def _to_anthropic_tools(tools: list[dict[str, Any]]) -> list[dict[str, Any]]: + """Convert OpenAI-shaped tool definitions to Anthropic ``input_schema`` form.""" + out: list[dict[str, Any]] = [] + for tool in tools: + fn = tool.get("function") or tool + out.append({ + "name": fn.get("name"), + "description": fn.get("description") or "", + "input_schema": fn.get("parameters") or {"type": "object", "properties": {}}, + }) + return out + + class AnthropicClient: def __init__(self, cfg: dict[str, str]) -> None: self._cfg = cfg @@ -49,41 +118,14 @@ def chat_with_tools( except ImportError as e: raise ImportError("pip install -r requirements.txt") from e - system_parts: list[str] = [] - anthropic_messages: 
list[dict[str, Any]] = [] - for msg in messages: - role = msg.get("role") - if role == "system": - system_parts.append(str(msg.get("content") or "")) - elif role == "tool": - anthropic_messages.append({ - "role": "user", - "content": [{ - "type": "tool_result", - "tool_use_id": str(msg.get("tool_call_id") or ""), - "content": str(msg.get("content") or ""), - }], - }) - else: - anthropic_messages.append({ - "role": role if role in ("user", "assistant") else "user", - "content": str(msg.get("content") or ""), - }) - - anthropic_tools = [] - for tool in tools: - fn = tool.get("function") or tool - anthropic_tools.append({ - "name": fn.get("name"), - "description": fn.get("description") or "", - "input_schema": fn.get("parameters") or {"type": "object", "properties": {}}, - }) + system, anthropic_messages = _to_anthropic_messages(messages) + anthropic_tools = _to_anthropic_tools(tools) client = anthropic.Anthropic(api_key=self._api_key, timeout=self._timeout) kwargs: dict[str, Any] = { "model": self._model, "max_tokens": 4096, - "system": "\n".join(system_parts), + "system": system, "messages": anthropic_messages, "tools": anthropic_tools, } diff --git a/src/website_profiling/llm/providers/openai.py b/src/website_profiling/llm/providers/openai.py index dfa4af4..2047a4a 100644 --- a/src/website_profiling/llm/providers/openai.py +++ b/src/website_profiling/llm/providers/openai.py @@ -149,7 +149,7 @@ def _stream_chat( acc["arguments"] += fn["arguments"] tool_calls: list[ToolCall] = [] - for acc in tool_calls_acc.values(): + for idx, acc in tool_calls_acc.items(): raw_args = acc.get("arguments") or "{}" try: args = json.loads(raw_args) if isinstance(raw_args, str) else dict(raw_args) @@ -157,7 +157,9 @@ def _stream_chat( args = {} tool_calls.append( ToolCall( - id=str(acc.get("id") or ""), + # OpenAI-compatible endpoints (Groq, etc.) may omit the id; synthesize + # a stable one from the stream index so tool_call_id pairing still works. 
+ id=str(acc.get("id") or "") or f"call_{idx}", name=str(acc.get("name") or ""), arguments=args if isinstance(args, dict) else {}, ), diff --git a/src/website_profiling/reporting/builder.py b/src/website_profiling/reporting/builder.py index f2c14ec..6d9172e 100644 --- a/src/website_profiling/reporting/builder.py +++ b/src/website_profiling/reporting/builder.py @@ -57,6 +57,7 @@ _compute_summary_seo_issues, ) from .site_level import _fetch_site_level +from .builder_sections import build_content_url_lists, build_links_list # Backward-compatible re-exports for tests and external imports. __all__ = [ @@ -335,399 +336,13 @@ def run_simple_report( for from_url, to_url in edges: in_degree[to_url] = in_degree.get(to_url, 0) + 1 - dup_gid = ml_bundle.get("url_duplicate_group_id") or {} - sim_map = ml_bundle.get("similar_internal_by_url") or {} - lang_map = ml_bundle.get("language_by_url") or {} - spacy_map = ml_bundle.get("spacy_by_url") or {} - kp_map = ml_bundle.get("keyphrases_by_url") or {} - - # Full links list: every crawled URL with url, status, inlinks, title, content_length, depth - links = [] - for _, row in df.iterrows(): - u = row.get("url") - if pd.isna(u) or not u: - continue - u = str(u).strip() - st = str(row.get("status", "")).strip() - title_val = row.get("title") - title_str = "" if pd.isna(title_val) else str(title_val).strip() - content_len = row.get("content_length") - if "content_length" in df.columns and content_len is not None and not pd.isna(content_len): - content_len = int(pd.to_numeric(content_len, errors="coerce") or 0) - else: - content_len = 0 - depth_val = row.get("depth") if "depth" in df.columns else None - depth_int = None - if depth_val is not None and not pd.isna(depth_val): - try: - depth_int = int(pd.to_numeric(depth_val, errors="coerce") or 0) - except Exception: - depth_int = None - wc_val = row.get("word_count") if "word_count" in df.columns else 0 - wc_int = 0 - if wc_val is not None and not pd.isna(wc_val): - try: - wc_int = 
int(pd.to_numeric(wc_val, errors="coerce") or 0) - except Exception: - wc_int = 0 - rt_val = row.get("response_time_ms") if "response_time_ms" in df.columns else 0 - rt_int = 0 - if rt_val is not None and not pd.isna(rt_val): - try: - rt_int = int(pd.to_numeric(rt_val, errors="coerce") or 0) - except Exception: - rt_int = 0 - rec = { - "url": u, - "status": st, - "inlinks": in_degree.get(u, 0), - "title": title_str, - "content_length": content_len, - "word_count": wc_int, - "response_time_ms": rt_int, - } - if depth_int is not None: - rec["depth"] = depth_int - - def _int_col(col): - v = row.get(col) if col in df.columns else None - if v is None or (isinstance(v, float) and pd.isna(v)): - return 0 - try: - return int(pd.to_numeric(v, errors="coerce") or 0) - except Exception: - return 0 - - def _str_col(col): - v = row.get(col) if col in df.columns else None - if v is None or (isinstance(v, float) and pd.isna(v)): - return "" - return str(v).strip() - - def _bool_col(col): - v = row.get(col) if col in df.columns else None - if v is None or (isinstance(v, float) and pd.isna(v)): - return False - return bool(v) - - # Navigation / crawl basics - rec["outlinks"] = _int_col("outlinks") - rec["content_type"] = _str_col("content_type") - rec["redirect_chain_length"] = _int_col("redirect_chain_length") - - # SEO signals - rec["meta_description"] = _str_col("meta_description") - rec["meta_description_len"] = _int_col("meta_description_len") - rec["h1"] = _str_col("h1") - rec["h1_count"] = _int_col("h1_count") - rec["canonical_url"] = _str_col("canonical_url") - rec["noindex"] = _bool_col("noindex") - rec["has_schema"] = _bool_col("has_schema") - rec["viewport_present"] = _bool_col("viewport_present") - rec["heading_sequence"] = _str_col("heading_sequence") - - # Images & accessibility - rec["images_total"] = _int_col("images_total") - rec["images_without_alt"] = _int_col("images_without_alt") - rec["img_without_lazy"] = _int_col("img_without_lazy") - 
rec["img_without_dimensions"] = _int_col("img_without_dimensions") - rec["aria_count"] = _int_col("aria_count") - rec["mixed_content_count"] = _int_col("mixed_content_count") - - # Assets - rec["script_count"] = _int_col("script_count") - rec["link_stylesheet_count"] = _int_col("link_stylesheet_count") - - # Caching - rec["cache_control"] = _str_col("cache_control") - rec["etag"] = _str_col("etag") - - # Security headers - rec["strict_transport_security"] = _str_col("strict_transport_security") - rec["x_content_type_options"] = _str_col("x_content_type_options") - rec["x_frame_options"] = _str_col("x_frame_options") - rec["content_security_policy"] = _str_col("content_security_policy") - - # Content analysis - rec["reading_level"] = round(float(pd.to_numeric(row.get("reading_level") if "reading_level" in df.columns else None, errors="coerce") or 0.0), 1) - rec["content_html_ratio"] = round(float(pd.to_numeric(row.get("content_html_ratio") if "content_html_ratio" in df.columns else None, errors="coerce") or 0.0), 2) - rec["top_keywords"] = _str_col("top_keywords") - rec["content_excerpt"] = _str_col("content_excerpt") if "content_excerpt" in df.columns else "" - - # Social / OG - rec["og_title"] = _str_col("og_title") - rec["og_description"] = _str_col("og_description") - rec["og_image"] = _str_col("og_image") - rec["og_type"] = _str_col("og_type") - rec["twitter_card"] = _str_col("twitter_card") - rec["twitter_title"] = _str_col("twitter_title") - rec["twitter_image"] = _str_col("twitter_image") - - # Tech stack - rec["tech_stack"] = _str_col("tech_stack") - - # Custom extraction (regex + XPath/CSS extractors) - rec["custom_extract"] = _str_col("custom_extract") - rec["custom_fields"] = _str_col("custom_fields") - - pa_obj: dict[str, Any] = {} - if "page_analysis" in df.columns: - raw_pa = row.get("page_analysis") - if raw_pa is not None and not (isinstance(raw_pa, float) and pd.isna(raw_pa)): - s = str(raw_pa).strip() - if s and s != "{}": - try: - pa_obj = 
json.loads(s) - except json.JSONDecodeError: - pa_obj = {} - if not isinstance(pa_obj, dict): - pa_obj = {} - rec["page_analysis"] = pa_obj - rec["internal_link_count"] = int(pa_obj.get("internal_link_count") or 0) - rec["external_link_count"] = int(pa_obj.get("external_link_count") or 0) - from ..crawl.fetchers.browser_diagnostics import browser_summary_from_page_analysis - - browser_counts = browser_summary_from_page_analysis(pa_obj) - rec["console_error_count"] = browser_counts["console_error_count"] - rec["page_error_count"] = browser_counts["page_error_count"] - rec["has_browser_errors"] = ( - browser_counts["console_error_count"] > 0 or browser_counts["page_error_count"] > 0 - ) - - rec["lighthouse"] = lighthouse_for_url(lighthouse_by_url or {}, u) - - uk = u.rstrip("/") - if isinstance(rec["page_analysis"], dict): - if uk in lang_map: - rec["page_analysis"].setdefault("signals", {})["language"] = lang_map[uk] - if uk in spacy_map: - rec["page_analysis"].setdefault("signals", {})["nlp_entities"] = spacy_map[uk] - if uk in dup_gid: - rec["duplicate_group_id"] = dup_gid[uk] - nei = sim_map.get(uk) or sim_map.get(u) - if nei: - rec["similar_internal"] = list(nei) - if uk in lang_map: - rec["detected_language"] = lang_map[uk] - if uk in spacy_map: - rec["nlp_entities"] = spacy_map[uk] - if uk in kp_map: - rec["keyphrases"] = kp_map[uk] - - links.append(rec) + # Full links list: every crawled URL with its SEO/a11y/asset/content signals. 
+ links = build_links_list(df, in_degree, lighthouse_by_url, ml_bundle) # Content URL lists for On-Page Content view - missing_h1 = [] - missing_title = [] - multiple_h1 = [] - if "h1_count" in df.columns: - h1c = pd.to_numeric(df["h1_count"], errors="coerce").fillna(-1).astype(int) - for i, row in df.iterrows(): - u = row.get("url") - if pd.isna(u) or not u: - continue - u = str(u).strip() - t = row.get("title") - title_str = "" if pd.isna(t) else str(t).strip() - if h1c.iloc[i] == 0 or h1c.iloc[i] == -1: - missing_h1.append({"url": u, "title": title_str}) - elif h1c.iloc[i] > 1: - multiple_h1.append({"url": u, "h1_count": int(h1c.iloc[i]), "title": title_str}) - if "title" in df.columns: - titles = df["title"].fillna("").astype(str) - for i, row in df.iterrows(): - u = row.get("url") - if pd.isna(u) or not u: - continue - u = str(u).strip() - if titles.iloc[i].strip() == "": - missing_title.append({"url": u}) - - missing_meta_desc = [] - meta_desc_short = [] - meta_desc_long = [] - thin_content = [] - if "meta_description_len" in df.columns: - md_len = pd.to_numeric(df["meta_description_len"], errors="coerce").fillna(0).astype(int) - for i, row in df.iterrows(): - u = row.get("url") - if pd.isna(u) or not u: - continue - u = str(u).strip() - ml = md_len.iloc[i] - title_str = "" if pd.isna(row.get("title")) else str(row.get("title")).strip() - if ml == 0: - missing_meta_desc.append({"url": u, "title": title_str}) - elif 0 < ml < META_DESC_LEN_MIN: - meta_desc_short.append({"url": u, "title": title_str, "meta_desc_len": int(ml)}) - elif ml > META_DESC_LEN_MAX: - meta_desc_long.append({"url": u, "title": title_str, "meta_desc_len": int(ml)}) - if "content_length" in df.columns: - cl = pd.to_numeric(df["content_length"], errors="coerce").fillna(0).astype(int) - for i, row in df.iterrows(): - u = row.get("url") - if pd.isna(u) or not u: - continue - u = str(u).strip() - c = int(cl.iloc[i]) - if 0 < c < THIN_CONTENT_CHARS: - title_str = "" if pd.isna(row.get("title")) 
else str(row.get("title")).strip() - thin_content.append({"url": u, "title": title_str, "content_length": c}) - - missing_canonical: list[dict[str, Any]] = [] - canonical_mismatch: list[dict[str, Any]] = [] - missing_alt: list[dict[str, Any]] = [] success_mask = df["status"].astype(str).str.match(r"2\d{2}", na=False) if "status" in df.columns else pd.Series([True] * len(df)) success_df_urls = df[success_mask] if len(df) else df - if "canonical_url" in success_df_urls.columns: - for _, row in success_df_urls.iterrows(): - u = row.get("url") - if pd.isna(u) or not u: - continue - u = str(u).strip() - title_str = "" if pd.isna(row.get("title")) else str(row.get("title")).strip() - canon = "" if pd.isna(row.get("canonical_url")) else str(row.get("canonical_url")).strip() - if not canon: - missing_canonical.append({"url": u, "title": title_str}) - elif u.rstrip("/").lower() != canon.rstrip("/").lower(): - canonical_mismatch.append({"url": u, "canonical_url": canon, "title": title_str}) - if "images_without_alt" in success_df_urls.columns: - alt_missing = pd.to_numeric(success_df_urls["images_without_alt"], errors="coerce").fillna(0).astype(int) - for i, row in success_df_urls.iterrows(): - if alt_missing.loc[i] <= 0: - continue - u = row.get("url") - if pd.isna(u) or not u: - continue - missing_alt.append({ - "url": str(u).strip(), - "images_without_alt": int(alt_missing.loc[i]), - "images_total": int(pd.to_numeric(row.get("images_total"), errors="coerce") or 0), - }) - - missing_lazy: list[dict[str, Any]] = [] - missing_dimensions: list[dict[str, Any]] = [] - if "img_without_lazy" in success_df_urls.columns: - lazy_missing = pd.to_numeric(success_df_urls["img_without_lazy"], errors="coerce").fillna(0).astype(int) - for i, row in success_df_urls.iterrows(): - if lazy_missing.loc[i] <= 0: - continue - u = row.get("url") - if pd.isna(u) or not u: - continue - missing_lazy.append({ - "url": str(u).strip(), - "img_without_lazy": int(lazy_missing.loc[i]), - "images_total": 
int(pd.to_numeric(row.get("images_total"), errors="coerce") or 0), - }) - if "img_without_dimensions" in success_df_urls.columns: - dim_missing = pd.to_numeric(success_df_urls["img_without_dimensions"], errors="coerce").fillna(0).astype(int) - for i, row in success_df_urls.iterrows(): - if dim_missing.loc[i] <= 0: - continue - u = row.get("url") - if pd.isna(u) or not u: - continue - missing_dimensions.append({ - "url": str(u).strip(), - "img_without_dimensions": int(dim_missing.loc[i]), - "images_total": int(pd.to_numeric(row.get("images_total"), errors="coerce") or 0), - }) - - title_short: list[dict[str, Any]] = [] - title_long: list[dict[str, Any]] = [] - if "title" in df.columns: - titles = df["title"].fillna("").astype(str) - tl = titles.str.len() - for i, row in df.iterrows(): - u = row.get("url") - if pd.isna(u) or not u: - continue - u = str(u).strip() - title_str = titles.iloc[i].strip() - n = int(tl.iloc[i]) - if n == 0: - continue - if n < TITLE_LEN_MIN: - title_short.append({"url": u, "title": title_str, "title_length": n}) - elif n > TITLE_LEN_MAX: - title_long.append({"url": u, "title": title_str, "title_length": n}) - - slow_response: list[dict[str, Any]] = [] - if "response_time_ms" in df.columns: - rt = pd.to_numeric(df["response_time_ms"], errors="coerce") - for i, row in df.iterrows(): - ms = rt.iloc[i] - if pd.isna(ms) or float(ms) <= 2000: - continue - u = row.get("url") - if pd.isna(u) or not u: - continue - slow_response.append({"url": str(u).strip(), "response_time_ms": int(ms)}) - - missing_html_lang: list[dict[str, Any]] = [] - invalid_viewport: list[dict[str, Any]] = [] - if "html_lang" in success_df_urls.columns: - for _, row in success_df_urls.iterrows(): - u = row.get("url") - if pd.isna(u) or not u: - continue - lang = str(row.get("html_lang") or "").strip() - if not lang: - missing_html_lang.append({"url": str(u).strip()}) - if "viewport_present" in success_df_urls.columns: - vp = success_df_urls["viewport_present"] - for _, row in 
success_df_urls.iterrows(): - u = row.get("url") - if pd.isna(u) or not u: - continue - if not bool(row.get("viewport_present")): - invalid_viewport.append({"url": str(u).strip()}) - - high_reading_level: list[dict[str, Any]] = [] - very_thin_content: list[dict[str, Any]] = [] - if "reading_level" in success_df_urls.columns: - rl = pd.to_numeric(success_df_urls["reading_level"], errors="coerce") - for i, row in success_df_urls.iterrows(): - val = rl.loc[i] - if pd.isna(val) or float(val) <= 12: - continue - u = row.get("url") - if pd.isna(u) or not u: - continue - high_reading_level.append({"url": str(u).strip(), "reading_level": float(val)}) - if "word_count" in success_df_urls.columns: - wc = pd.to_numeric(success_df_urls["word_count"], errors="coerce").fillna(0).astype(int) - for i, row in success_df_urls.iterrows(): - w = int(wc.loc[i]) - if w <= 0 or w >= 100: - continue - u = row.get("url") - if pd.isna(u) or not u: - continue - very_thin_content.append({"url": str(u).strip(), "word_count": w}) - - content_urls = { - "missing_h1": missing_h1, - "missing_title": missing_title, - "multiple_h1": multiple_h1, - "missing_meta_desc": missing_meta_desc, - "meta_desc_short": meta_desc_short, - "meta_desc_long": meta_desc_long, - "thin_content": thin_content, - "missing_canonical": missing_canonical, - "canonical_mismatch": canonical_mismatch, - "missing_alt": missing_alt, - "missing_lazy": missing_lazy, - "missing_dimensions": missing_dimensions, - "title_short": title_short, - "title_long": title_long, - "slow_response": slow_response, - "missing_html_lang": missing_html_lang, - "invalid_viewport": invalid_viewport, - "high_reading_level": high_reading_level, - "very_thin_content": very_thin_content, - } + content_urls = build_content_url_lists(df, success_df_urls) emit_progress("report", "content_analytics", message="Building content analytics") print(" Building content analytics...", flush=True) diff --git 
a/src/website_profiling/reporting/builder_sections/__init__.py b/src/website_profiling/reporting/builder_sections/__init__.py new file mode 100644 index 0000000..172bba2 --- /dev/null +++ b/src/website_profiling/reporting/builder_sections/__init__.py @@ -0,0 +1,10 @@ +"""Extracted, self-contained sections of the report builder. + +These were split out of ``reporting/builder.py`` to keep that orchestrator +readable. Each is a pure function of the crawl DataFrame (plus already-computed +inputs) and produces one slice of the report payload. +""" +from .content_urls import build_content_url_lists +from .links import build_links_list + +__all__ = ["build_content_url_lists", "build_links_list"] diff --git a/src/website_profiling/reporting/builder_sections/content_urls.py b/src/website_profiling/reporting/builder_sections/content_urls.py new file mode 100644 index 0000000..0cbc542 --- /dev/null +++ b/src/website_profiling/reporting/builder_sections/content_urls.py @@ -0,0 +1,234 @@ +"""Per-URL content issue lists for the On-Page Content view. + +Extracted verbatim from ``reporting/builder.py``. Pure function of the crawl +DataFrame (and the success-status subset computed by the caller). 
def _optional_numeric(frame: pd.DataFrame, column: str, fill: int | None = None) -> list[Any] | None:
    """Return ``frame[column]`` coerced to numbers as a positional list.

    Returns ``None`` when the column is absent so callers can skip the
    corresponding checks. When *fill* is given, unparseable/missing values are
    replaced by *fill* and the column is cast to ``int``; otherwise they stay
    NaN and the values remain floats.
    """
    if column not in frame.columns:
        return None
    values = pd.to_numeric(frame[column], errors="coerce")
    if fill is not None:
        values = values.fillna(fill).astype(int)
    return values.tolist()


def _int_or_zero(value: Any) -> int:
    """Coerce a single cell to ``int``; None, NaN and unparseable values give 0."""
    if value is None:
        return 0
    try:
        number = pd.to_numeric(value, errors="coerce")
        # ``nan or 0`` is ``nan`` (NaN is truthy), so test for NaN explicitly.
        return 0 if pd.isna(number) else int(number)
    except (TypeError, ValueError, OverflowError):
        return 0


def _iter_url_rows(frame: pd.DataFrame):
    """Yield ``(position, url, row)`` for every row of *frame* with a usable URL.

    ``position`` is the 0-based row position (not the index label), so it can
    be used to address the positional lists built by ``_optional_numeric``
    regardless of how *frame* is indexed.
    """
    for position, (_, row) in enumerate(frame.iterrows()):
        raw_url = row.get("url")
        if pd.isna(raw_url) or not raw_url:
            continue
        yield position, str(raw_url).strip(), row


def _row_title(row: pd.Series) -> str:
    """Return the row's title as a stripped string ("" when missing/NaN)."""
    raw_title = row.get("title")
    return "" if pd.isna(raw_title) else str(raw_title).strip()


def build_content_url_lists(
    df: pd.DataFrame,
    success_df_urls: pd.DataFrame,
) -> dict[str, list[dict[str, Any]]]:
    """Build the ``content_urls`` payload slice (missing_h1, thin_content, etc.).

    Args:
        df: All crawled rows. Drives the heading, title, meta-description,
            content-length and response-time lists.
        success_df_urls: The subset of rows used for the canonical, image,
            language, viewport, reading-level and word-count lists.

    Returns:
        A dict with one list of per-URL records for each issue type. Every
        key is always present; a list is empty when its source column is
        missing from the frame or no row matches. Rows without a usable
        ``url`` are ignored. Records keep the row order of their frame.
    """
    out: dict[str, list[dict[str, Any]]] = {
        "missing_h1": [],
        "missing_title": [],
        "multiple_h1": [],
        "missing_meta_desc": [],
        "meta_desc_short": [],
        "meta_desc_long": [],
        "thin_content": [],
        "missing_canonical": [],
        "canonical_mismatch": [],
        "missing_alt": [],
        "missing_lazy": [],
        "missing_dimensions": [],
        "title_short": [],
        "title_long": [],
        "slow_response": [],
        "missing_html_lang": [],
        "invalid_viewport": [],
        "high_reading_level": [],
        "very_thin_content": [],
    }

    # ---- checks over every crawled row ---------------------------------
    # Columns are coerced once and addressed by row *position*: the index of
    # ``df`` is not guaranteed to be a 0..n-1 range (e.g. after filtering).
    h1_counts = _optional_numeric(df, "h1_count", fill=-1)
    meta_lens = _optional_numeric(df, "meta_description_len", fill=0)
    content_lens = _optional_numeric(df, "content_length", fill=0)
    response_ms = _optional_numeric(df, "response_time_ms")
    title_texts = df["title"].fillna("").astype(str).tolist() if "title" in df.columns else None

    for pos, url, row in _iter_url_rows(df):
        title = _row_title(row)

        if h1_counts is not None:
            h1 = h1_counts[pos]
            # -1 marks an unparseable/missing count and is reported as missing.
            if h1 in (0, -1):
                out["missing_h1"].append({"url": url, "title": title})
            elif h1 > 1:
                out["multiple_h1"].append({"url": url, "h1_count": int(h1), "title": title})

        if title_texts is not None:
            raw_title = title_texts[pos]
            stripped_title = raw_title.strip()
            if not stripped_title:
                out["missing_title"].append({"url": url})
            # Length is measured on the raw (unstripped) title, as before.
            title_len = len(raw_title)
            if title_len:
                if title_len < TITLE_LEN_MIN:
                    out["title_short"].append(
                        {"url": url, "title": stripped_title, "title_length": title_len}
                    )
                elif title_len > TITLE_LEN_MAX:
                    out["title_long"].append(
                        {"url": url, "title": stripped_title, "title_length": title_len}
                    )

        if meta_lens is not None:
            meta_len = meta_lens[pos]
            if meta_len == 0:
                out["missing_meta_desc"].append({"url": url, "title": title})
            elif 0 < meta_len < META_DESC_LEN_MIN:
                out["meta_desc_short"].append(
                    {"url": url, "title": title, "meta_desc_len": int(meta_len)}
                )
            elif meta_len > META_DESC_LEN_MAX:
                out["meta_desc_long"].append(
                    {"url": url, "title": title, "meta_desc_len": int(meta_len)}
                )

        if content_lens is not None:
            chars = int(content_lens[pos])
            # 0 means "no content recorded", which is not the same as thin.
            if 0 < chars < THIN_CONTENT_CHARS:
                out["thin_content"].append({"url": url, "title": title, "content_length": chars})

        if response_ms is not None:
            ms = response_ms[pos]
            if not pd.isna(ms) and float(ms) > 2000:
                out["slow_response"].append({"url": url, "response_time_ms": int(ms)})

    # ---- checks over successfully fetched rows -------------------------
    has_canonical = "canonical_url" in success_df_urls.columns
    has_html_lang = "html_lang" in success_df_urls.columns
    has_viewport = "viewport_present" in success_df_urls.columns
    reading_levels = _optional_numeric(success_df_urls, "reading_level")
    word_counts = _optional_numeric(success_df_urls, "word_count", fill=0)
    # (output list, source column, coerced counts) for the three image checks.
    image_checks = [
        (bucket, column, _optional_numeric(success_df_urls, column, fill=0))
        for bucket, column in (
            ("missing_alt", "images_without_alt"),
            ("missing_lazy", "img_without_lazy"),
            ("missing_dimensions", "img_without_dimensions"),
        )
    ]

    for pos, url, row in _iter_url_rows(success_df_urls):
        if has_canonical:
            title = _row_title(row)
            raw_canon = row.get("canonical_url")
            canon = "" if pd.isna(raw_canon) else str(raw_canon).strip()
            if not canon:
                out["missing_canonical"].append({"url": url, "title": title})
            elif url.rstrip("/").lower() != canon.rstrip("/").lower():
                # Trailing slash and case differences are not a mismatch.
                out["canonical_mismatch"].append(
                    {"url": url, "canonical_url": canon, "title": title}
                )

        for bucket, column, counts in image_checks:
            if counts is None or counts[pos] <= 0:
                continue
            out[bucket].append({
                "url": url,
                column: int(counts[pos]),
                "images_total": _int_or_zero(row.get("images_total")),
            })

        if has_html_lang:
            raw_lang = row.get("html_lang")
            # NaN must count as missing; ``str(nan)`` would otherwise be "nan".
            lang = "" if (raw_lang is None or pd.isna(raw_lang) or not raw_lang) else str(raw_lang).strip()
            if not lang:
                out["missing_html_lang"].append({"url": url})

        if has_viewport and not bool(row.get("viewport_present")):
            # NOTE(review): a NaN cell is truthy here, so it is not reported;
            # confirm whether "unknown" should be listed as invalid.
            out["invalid_viewport"].append({"url": url})

        if reading_levels is not None:
            level = reading_levels[pos]
            if not pd.isna(level) and float(level) > 12:
                out["high_reading_level"].append({"url": url, "reading_level": float(level)})

        if word_counts is not None:
            words = int(word_counts[pos])
            if 0 < words < 100:
                out["very_thin_content"].append({"url": url, "word_count": words})

    return out
def _is_missing(value: Any) -> bool:
    """Return True for ``None`` and scalar NA markers (NaN, ``pd.NA``, ``NaT``)."""
    if value is None:
        return True
    return bool(pd.api.types.is_scalar(value) and pd.isna(value))


def _as_int(value: Any, default: Any = 0) -> Any:
    """Coerce a cell to ``int``; missing or unparseable values give *default*."""
    if _is_missing(value):
        return default
    try:
        number = pd.to_numeric(value, errors="coerce")
        # ``nan or 0`` is ``nan`` (NaN is truthy), so test for NaN explicitly.
        return default if _is_missing(number) else int(number)
    except (TypeError, ValueError, OverflowError):
        return default


def _as_float(value: Any, default: float = 0.0) -> float:
    """Coerce a cell to a finite ``float``; anything else gives *default*."""
    if _is_missing(value):
        return default
    try:
        number = float(pd.to_numeric(value, errors="coerce"))
    except (TypeError, ValueError):
        return default
    # NaN and +/-inf are not valid JSON numbers, so never let them through.
    if number != number or number in (float("inf"), float("-inf")):
        return default
    return number


def _as_str(value: Any) -> str:
    """Return the cell as a stripped string ("" when missing)."""
    return "" if _is_missing(value) else str(value).strip()


def _as_bool(value: Any) -> bool:
    """Return the cell's truthiness (False when missing)."""
    return False if _is_missing(value) else bool(value)


def _rounded(digits: int):
    """Return a converter that coerces a cell to float rounded to *digits*."""
    def convert(value: Any) -> float:
        return round(_as_float(value), digits)
    return convert


def _parse_page_analysis(raw: Any) -> dict[str, Any]:
    """Decode the ``page_analysis`` cell into a dict ({} when absent/invalid).

    Accepts either a JSON string or an already-decoded dict; the dict is
    shallow-copied so enrichment never writes back into the DataFrame cell.
    """
    if isinstance(raw, dict):
        return dict(raw)
    if _is_missing(raw):
        return {}
    text = str(raw).strip()
    if not text or text == "{}":
        return {}
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return {}
    return parsed if isinstance(parsed, dict) else {}


def _signals(page_analysis: dict[str, Any]) -> dict[str, Any]:
    """Return a private, writable ``signals`` dict attached to *page_analysis*."""
    current = page_analysis.get("signals")
    signals = dict(current) if isinstance(current, dict) else {}
    page_analysis["signals"] = signals
    return signals


# Column -> converter, in the order the keys appear in each record.
_LINK_COLUMNS = (
    # Navigation / crawl basics
    ("outlinks", _as_int),
    ("content_type", _as_str),
    ("redirect_chain_length", _as_int),
    # SEO signals
    ("meta_description", _as_str),
    ("meta_description_len", _as_int),
    ("h1", _as_str),
    ("h1_count", _as_int),
    ("canonical_url", _as_str),
    ("noindex", _as_bool),
    ("has_schema", _as_bool),
    ("viewport_present", _as_bool),
    ("heading_sequence", _as_str),
    # Images & accessibility
    ("images_total", _as_int),
    ("images_without_alt", _as_int),
    ("img_without_lazy", _as_int),
    ("img_without_dimensions", _as_int),
    ("aria_count", _as_int),
    ("mixed_content_count", _as_int),
    # Assets
    ("script_count", _as_int),
    ("link_stylesheet_count", _as_int),
    # Caching
    ("cache_control", _as_str),
    ("etag", _as_str),
    # Security headers
    ("strict_transport_security", _as_str),
    ("x_content_type_options", _as_str),
    ("x_frame_options", _as_str),
    ("content_security_policy", _as_str),
    # Content analysis
    ("reading_level", _rounded(1)),
    ("content_html_ratio", _rounded(2)),
    ("top_keywords", _as_str),
    ("content_excerpt", _as_str),
    # Social / OG
    ("og_title", _as_str),
    ("og_description", _as_str),
    ("og_image", _as_str),
    ("og_type", _as_str),
    ("twitter_card", _as_str),
    ("twitter_title", _as_str),
    ("twitter_image", _as_str),
    # Tech stack
    ("tech_stack", _as_str),
    # Custom extraction (regex + XPath/CSS extractors)
    ("custom_extract", _as_str),
    ("custom_fields", _as_str),
)


def build_links_list(
    df: pd.DataFrame,
    in_degree: dict[str, int],
    lighthouse_by_url: dict[str, Any] | None,
    ml_bundle: dict[str, Any],
) -> list[dict[str, Any]]:
    """Build the full ``links`` payload slice: every crawled URL with its signals.

    Args:
        df: Crawl results, one row per URL. Any column may be absent; absent
            or NA cells fall back to ``0`` / ``""`` / ``False``.
        in_degree: Inbound link count keyed by the stripped URL.
        lighthouse_by_url: Lighthouse results passed to ``lighthouse_for_url``;
            ``None`` is treated as an empty mapping.
        ml_bundle: ML enrichment maps (``url_duplicate_group_id``,
            ``similar_internal_by_url``, ``language_by_url``, ``spacy_by_url``,
            ``keyphrases_by_url``). They are looked up by the URL without its
            trailing slash; missing or ``None`` maps are treated as empty.

    Returns:
        One record per row that has a usable ``url``, in row order. ``depth``
        and the ML keys are only present when a value is available.
    """
    dup_gid = ml_bundle.get("url_duplicate_group_id") or {}
    sim_map = ml_bundle.get("similar_internal_by_url") or {}
    lang_map = ml_bundle.get("language_by_url") or {}
    spacy_map = ml_bundle.get("spacy_by_url") or {}
    kp_map = ml_bundle.get("keyphrases_by_url") or {}
    lighthouse = lighthouse_by_url or {}

    links: list[dict[str, Any]] = []
    for _, row in df.iterrows():
        raw_url = row.get("url")
        if _is_missing(raw_url) or not raw_url:
            continue
        url = str(raw_url).strip()

        rec: dict[str, Any] = {
            "url": url,
            "status": _as_str(row.get("status")),
            "inlinks": in_degree.get(url, 0),
            "title": _as_str(row.get("title")),
            "content_length": _as_int(row.get("content_length")),
            "word_count": _as_int(row.get("word_count")),
            "response_time_ms": _as_int(row.get("response_time_ms")),
        }
        # Depth 0 is meaningful, so "unknown" is expressed by omitting the key.
        depth = _as_int(row.get("depth"), default=None)
        if depth is not None:
            rec["depth"] = depth

        for column, convert in _LINK_COLUMNS:
            rec[column] = convert(row.get(column))

        page_analysis = _parse_page_analysis(row.get("page_analysis"))
        rec["page_analysis"] = page_analysis
        rec["internal_link_count"] = _as_int(page_analysis.get("internal_link_count"))
        rec["external_link_count"] = _as_int(page_analysis.get("external_link_count"))

        browser_counts = browser_summary_from_page_analysis(page_analysis)
        console_errors = browser_counts["console_error_count"]
        page_errors = browser_counts["page_error_count"]
        rec["console_error_count"] = console_errors
        rec["page_error_count"] = page_errors
        rec["has_browser_errors"] = console_errors > 0 or page_errors > 0

        rec["lighthouse"] = lighthouse_for_url(lighthouse, url)

        # ML maps are keyed without the trailing slash.
        key = url.rstrip("/")
        if key in lang_map:
            _signals(page_analysis)["language"] = lang_map[key]
        if key in spacy_map:
            _signals(page_analysis)["nlp_entities"] = spacy_map[key]
        if key in dup_gid:
            rec["duplicate_group_id"] = dup_gid[key]
        neighbours = sim_map.get(key) or sim_map.get(url)
        if neighbours:
            rec["similar_internal"] = list(neighbours)
        if key in lang_map:
            rec["detected_language"] = lang_map[key]
        if key in spacy_map:
            rec["nlp_entities"] = spacy_map[key]
        if key in kp_map:
            rec["keyphrases"] = kp_map[key]

        links.append(rec)

    return links
rl_bins = [(0, 5), (6, 8), (9, 12), (13, 99)] + # Half-open contiguous bins: reading_level is a float (Flesch-Kincaid grade), + # so inclusive bins like (0,5)/(6,8) silently dropped fractional grades (5.5, 8.7). + rl_bins = [(0, 6), (6, 9), (9, 13), (13, float("inf"))] rl_labels = ["Elementary (0-5)", "Middle School (6-8)", "High School (9-12)", "College (13+)"] result["reading_level_distribution"] = { - lbl: int(((rl >= lo) & (rl <= hi)).sum()) for (lo, hi), lbl in zip(rl_bins, rl_labels) + lbl: int(((rl >= lo) & (rl < hi)).sum()) for (lo, hi), lbl in zip(rl_bins, rl_labels) } if "content_html_ratio" in success_df.columns: diff --git a/src/website_profiling/tools/audit_tools/backlink_lists.py b/src/website_profiling/tools/audit_tools/backlink_lists.py index de3db31..c7662b9 100644 --- a/src/website_profiling/tools/audit_tools/backlink_lists.py +++ b/src/website_profiling/tools/audit_tools/backlink_lists.py @@ -7,6 +7,7 @@ from psycopg import Connection +from ...common import strip_www_prefix from ._slice import cap_list, parse_limit from .context import AuditToolContext @@ -95,7 +96,7 @@ def list_backlinks_from_domain(conn: Connection, ctx: AuditToolContext, args: di scoped = ctx.with_args(args) if scoped.property_id is None: return {"error": "property_id is required", "links": [], "total": 0, "truncated": False} - domain = str(args.get("domain") or args.get("linking_site") or "").strip().lower().lstrip("www.") + domain = strip_www_prefix(str(args.get("domain") or args.get("linking_site") or "").strip().lower()) if not domain: return {"error": "domain is required", "links": [], "total": 0, "truncated": False} data = _load_links(scoped, conn) diff --git a/src/website_profiling/tools/audit_tools/link_lists.py b/src/website_profiling/tools/audit_tools/link_lists.py index 0d20789..6713851 100644 --- a/src/website_profiling/tools/audit_tools/link_lists.py +++ b/src/website_profiling/tools/audit_tools/link_lists.py @@ -6,6 +6,7 @@ from psycopg import Connection +from 
...common import strip_www_prefix from ._slice import cap_list, parse_limit from .context import AuditToolContext @@ -87,10 +88,10 @@ def list_outbound_links(conn: Connection, ctx: AuditToolContext, args: dict[str, ] if not items: start = str(payload.get("start_url") or payload.get("origin") or "").strip() - origin_host = urlparse(start).netloc.lower().lstrip("www.") if start else "" + origin_host = strip_www_prefix(urlparse(start).netloc.lower()) if start else "" for e in edges: to_url = str(e.get("to_url") or "") - host = urlparse(to_url).netloc.lower().lstrip("www.") + host = strip_www_prefix(urlparse(to_url).netloc.lower()) if origin_host and host and host != origin_host: items.append({ "from_url": e.get("from_url"), diff --git a/tests/content_studio/test_ai_suggest.py b/tests/content_studio/test_ai_suggest.py index 3ed1d71..e399676 100644 --- a/tests/content_studio/test_ai_suggest.py +++ b/tests/content_studio/test_ai_suggest.py @@ -50,6 +50,24 @@ def test_analyze_without_ai_runs_all_tools() -> None: assert "Rule-based" in result["provenance"] +def test_rule_suggestions_under_target_high_term() -> None: + score = { + "terms": [ + {"term": "best crm", "status": "included", "importance": "high", "count": 1, "target": 3}, + {"term": "crm software", "status": "included", "importance": "high", "count": 3, "target": 3}, + {"term": "sales pipeline", "status": "included", "importance": "medium", "count": 1, "target": 2}, + ], + "checks": [], + "word_count": 800, + } + items = _rule_suggestions(score) + texts = [i["text"] for i in items] + # Only the under-target high-importance term gets a "use it more" tip. 
+ assert any("best crm" in t and "more time" in t for t in texts) + assert not any("crm software" in t for t in texts) + assert not any("sales pipeline" in t for t in texts) + + def test_rule_suggestions_skips_non_dict_terms() -> None: score = { "terms": ["bad", {"term": "crm", "status": "missing", "importance": "high"}], diff --git a/tests/content_studio/test_score.py b/tests/content_studio/test_score.py index a879f23..edb98cb 100644 --- a/tests/content_studio/test_score.py +++ b/tests/content_studio/test_score.py @@ -35,9 +35,13 @@ def test_score_empty_body_low_grade() -> None: ) assert 0 <= result["grade_score"] <= 100 assert result["word_count"] == 0 - assert result["grade_label"] in ("A", "B", "C", "D", "F") + assert result["grade_label"] == "F" assert result["provenance"] == "Search Console + on-site heuristics" - assert any(t["term"] == "best crm" for t in result["terms"]) + assert result["word_count_target"] > 0 + assert result["reading_level_target"] > 0 + kw_term = next(t for t in result["terms"] if t["term"] == "best crm") + assert kw_term["count"] == 0 + assert kw_term["target"] >= 1 def test_score_rich_content_higher() -> None: @@ -67,8 +71,12 @@ def test_score_rich_content_higher() -> None: ], ) assert rich["grade_score"] >= sparse["grade_score"] - assert rich["checks"][2]["id"] == "h1_single" - assert rich["checks"][2]["pass"] is True + h1_check = next(c for c in rich["checks"] if c["id"] == "h1_single") + assert h1_check["pass"] is True + # Frequency: "crm software" appears twice in the rich body. 
+ crm_software = next(t for t in rich["terms"] if t["term"] == "crm software") + assert crm_software["status"] == "included" + assert crm_software["count"] == 2 def test_meta_title_checks() -> None: @@ -83,10 +91,14 @@ def test_meta_title_checks() -> None: def test_grade_label_bounds() -> None: from website_profiling.content_studio.score import _grade_label - assert _grade_label(95) == "A" - assert _grade_label(85) == "B" - assert _grade_label(75) == "C" - assert _grade_label(65) == "D" + assert _grade_label(100) == "A++" + assert _grade_label(95) == "A+" + assert _grade_label(90) == "A" + assert _grade_label(88) == "A-" + assert _grade_label(81) == "B" + assert _grade_label(70) == "C" + assert _grade_label(60) == "D" + assert _grade_label(57) == "D-" assert _grade_label(40) == "F" @@ -179,6 +191,93 @@ def test_score_loads_keyword_rows_from_db() -> None: assert any(t["term"] == "best crm" for t in result["terms"]) +def test_term_in_corpus_no_substring_false_positive() -> None: + """Word-boundary matching: a short term must not match inside a longer word.""" + from website_profiling.content_studio.score import _term_in_corpus + + assert _term_in_corpus("ai", "the brain explained this domain") == "missing" + assert _term_in_corpus("ai", "the best ai tools available") == "included" + + +def test_term_match_counts_occurrences() -> None: + from website_profiling.content_studio.score import _match_tokens, _term_match + + tokens = _match_tokens("crm software is the best crm software for teams") + status, count = _term_match("crm software", tokens, set(tokens)) + assert status == "included" + assert count == 2 + + +def test_phrase_count_edges() -> None: + from website_profiling.content_studio.score import _phrase_count + + assert _phrase_count([], ["a", "b"]) == 0 + assert _phrase_count(["a", "b", "c"], ["a"]) == 0 + assert _phrase_count(["a", "a"], ["a", "a", "a"]) == 1 # non-overlapping + + +def test_term_target_scales_with_importance_and_length() -> None: + from 
website_profiling.content_studio.score import _term_target + + assert _term_target("crm", "high") == 3 + assert _term_target("crm", "medium") == 2 + assert _term_target("best crm software guide", "high") == 1 # long phrase + + +def test_keyword_present_phrase_words_and_fallback() -> None: + from website_profiling.content_studio.score import _keyword_present + + assert _keyword_present("best crm", "the best crm tool") is True # phrase + assert _keyword_present("best crm", "crm picks ranked from best to worst") is True # words + assert _keyword_present("best crm", "spreadsheet tips") is False + assert _keyword_present("", "anything") is False + assert _keyword_present("to by", "go to and come by") is True # stopword fallback + + +def test_keyword_placement_checks() -> None: + from website_profiling.content_studio.score import ( + _keyword_in_h1_check, + _keyword_in_intro_check, + _keyword_in_title_check, + ) + + assert _keyword_in_title_check("best crm", "Best CRM Guide")["pass"] is True + assert _keyword_in_title_check("best crm", "Spreadsheet Guide")["pass"] is False + assert _keyword_in_h1_check("best crm", "no heading
")["pass"] is False + assert _keyword_in_intro_check("best crm", "The best crm options today.")["pass"] is True + assert _keyword_in_intro_check("best crm", "")["pass"] is False + assert _keyword_in_intro_check("best crm", "Spreadsheets are fine.")["pass"] is False + + +def test_first_h1_text() -> None: + from website_profiling.content_studio.score import _first_h1_text + + assert _first_h1_text("") == "" + assert _first_h1_text("no heading here
") == "" + + +def test_reading_level_check_branches() -> None: + from website_profiling.content_studio.score import _reading_level_check + + assert _reading_level_check(9.0, 20)["pass"] is False # too short + assert _reading_level_check(15.0, 500)["pass"] is False # too complex + assert _reading_level_check(9.0, 500)["pass"] is True + + +def test_term_coverage_frequency_aware() -> None: + from website_profiling.content_studio.score import _term_coverage_score + + # Below target earns a fraction; at/above target earns full credit. + below = _term_coverage_score([{"importance": "high", "status": "included", "count": 1, "target": 3}]) + full = _term_coverage_score([{"importance": "high", "status": "included", "count": 3, "target": 3}]) + missing = _term_coverage_score([{"importance": "high", "status": "missing", "count": 0, "target": 3}]) + assert below == 1 / 3 + assert full == 1.0 + assert missing == 0.0 + + def test_score_db_returns_non_list_rows() -> None: from unittest.mock import MagicMock, patch diff --git a/tests/content_studio/test_wizard.py b/tests/content_studio/test_wizard.py new file mode 100644 index 0000000..2facc86 --- /dev/null +++ b/tests/content_studio/test_wizard.py @@ -0,0 +1,299 @@ +"""Tests for the Content Studio guided-draft wizard.""" +from __future__ import annotations + +from contextlib import contextmanager +from unittest.mock import patch + +from website_profiling.content_studio import wizard +from website_profiling.content_studio.wizard import ( + _assemble_body, + _normalize_options, + _normalize_outline, + _normalize_str_list, + generate_draft, + research_panel, + run_wizard_step, + suggest_content_types, + suggest_intents, + suggest_outline, + suggest_titles, + suggest_tones, +) + + +class FakeClient: + def __init__(self, payload=None, *, raise_exc=False): + self.payload = payload + self.raise_exc = raise_exc + + def complete_json(self, system, user): + if self.raise_exc: + raise RuntimeError("boom") + return self.payload + + 
+@contextmanager +def ai(client, cfg=None): + with patch("website_profiling.content_studio.wizard.load_llm_config_from_db", return_value=cfg or {}), patch( + "website_profiling.content_studio.wizard.llm_is_enabled", return_value=True + ), patch("website_profiling.content_studio.wizard.get_llm_client", return_value=client): + yield + + +# --- gating --------------------------------------------------------------- + + +def test_disabled_when_llm_off() -> None: + with patch("website_profiling.content_studio.wizard.load_llm_config_from_db", return_value={}), patch( + "website_profiling.content_studio.wizard.llm_is_enabled", return_value=False + ): + out = run_wizard_step("intents", {"keyword": "best crm"}) + assert out["ok"] is False + assert "disabled" in out["error"].lower() + + +def test_disabled_when_content_studio_toggle_off() -> None: + with patch("website_profiling.content_studio.wizard.load_llm_config_from_db", return_value={"llm_enable_content_studio": "false"}), patch( + "website_profiling.content_studio.wizard.llm_is_enabled", return_value=True + ): + out = suggest_intents("best crm") + assert out["ok"] is False + + +def test_every_step_returns_error_when_disabled() -> None: + with patch("website_profiling.content_studio.wizard.load_llm_config_from_db", return_value={}), patch( + "website_profiling.content_studio.wizard.llm_is_enabled", return_value=False + ): + assert suggest_content_types("k", "i")["ok"] is False + assert suggest_tones("k", "i", "c")["ok"] is False + assert suggest_titles("k", "i", "c", "t")["ok"] is False + assert suggest_outline("k", "i", "c", "t", "T")["ok"] is False + assert generate_draft("k", "i", "c", "t", "T", [])["ok"] is False + assert research_panel("k")["ok"] is False + + +def test_get_client_value_error() -> None: + with patch("website_profiling.content_studio.wizard.load_llm_config_from_db", return_value={}), patch( + "website_profiling.content_studio.wizard.llm_is_enabled", return_value=True + ), 
patch("website_profiling.content_studio.wizard.get_llm_client", side_effect=ValueError("no provider")): + out = suggest_intents("best crm") + assert out["ok"] is False + assert out["error"] == "no provider" + + +# --- intents -------------------------------------------------------------- + + +def test_intents_happy_normalizes_mixed_shapes() -> None: + payload = {"intents": [ + {"label": "Learn", "description": "Understand it"}, + {"name": "How to", "summary": "Do it"}, + "Compare options", + 12345, + ]} + with ai(FakeClient(payload)): + out = suggest_intents("best crm") + labels = [o["label"] for o in out["options"]] + assert out["ok"] is True + assert labels == ["Learn", "How to", "Compare options"] + + +def test_intents_fallback_on_empty() -> None: + with ai(FakeClient({})): + out = suggest_intents("best crm") + assert out["ok"] is True + assert any("best crm" in o["label"] for o in out["options"]) + + +def test_intents_keyword_required() -> None: + with ai(FakeClient({})): + out = suggest_intents(" ") + assert out["ok"] is False + assert out["error"] == "keyword required" + + +def test_intents_handles_client_exception() -> None: + with ai(FakeClient(raise_exc=True)): + out = suggest_intents("best crm") + assert out["ok"] is True # falls back + + +def test_intents_parses_json_string_response() -> None: + with ai(FakeClient('{"intents":[{"label":"From string"}]}')): + out = suggest_intents("best crm") + assert out["options"][0]["label"] == "From string" + + +# --- content types & tones ------------------------------------------------ + + +def test_content_types_happy_and_fallback() -> None: + with ai(FakeClient({"content_types": [{"label": "Guide", "description": "d"}]})): + happy = suggest_content_types("best crm", "Learn about it") + with ai(FakeClient({})): + fallback = suggest_content_types("best crm", "Learn about it") + assert happy["options"][0]["label"] == "Guide" + assert len(fallback["options"]) == 6 + + +def test_tones_happy_and_fallback() -> None: + 
with ai(FakeClient({"tones": [{"label": "Snappy", "description": "d"}]})): + happy = suggest_tones("best crm", "Learn", "Guide") + with ai(FakeClient({})): + fallback = suggest_tones("best crm", "Learn", "Guide") + assert happy["options"][0]["label"] == "Snappy" + assert any(o["label"] == "Professional" for o in fallback["options"]) + + +# --- titles --------------------------------------------------------------- + + +def test_titles_happy_with_strings_and_dicts() -> None: + with ai(FakeClient({"titles": ["Title A", {"text": "Title B"}, "", 5]})): + out = suggest_titles("best crm", "Learn", "Guide", "Professional") + assert out["titles"] == ["Title A", "Title B"] + + +def test_titles_fallback_when_not_a_list() -> None: + with ai(FakeClient({"titles": "not a list"})): + out = suggest_titles("best crm", "Learn", "Guide", "Professional") + assert out["ok"] is True + assert any("Complete Guide" in t for t in out["titles"]) + + +# --- outline -------------------------------------------------------------- + + +def test_outline_happy_normalizes_levels_and_drops_h1() -> None: + payload = {"outline": [ + {"level": "h2", "text": "Section A"}, + {"level": "h3", "text": "Sub B"}, + {"level": "bogus", "text": "Coerced"}, + "Plain section", + 99, + {"level": "h1", "text": "Should be dropped"}, + ]} + with ai(FakeClient(payload)): + out = suggest_outline("best crm", "Learn", "Guide", "Professional", "My Title") + outline = out["outline"] + assert outline[0] == {"level": "h1", "text": "My Title"} + assert {"level": "h2", "text": "Coerced"} in outline + assert {"level": "h2", "text": "Plain section"} in outline + assert all(it["text"] != "Should be dropped" for it in outline) + + +def test_outline_fallback_on_empty() -> None: + with ai(FakeClient({})): + out = suggest_outline("best crm", "Learn", "Guide", "Professional", "My Title") + assert out["outline"][0] == {"level": "h1", "text": "My Title"} + assert len(out["outline"]) == 7 # h1 + 6 default sections + + +def 
test_normalize_outline_title_fallbacks() -> None: + # No title → first body heading becomes the h1. + out = _normalize_outline([{"level": "h2", "text": "First"}], "") + assert out[0]["text"] == "First" + # Nothing at all → Untitled fallback outline. + empty = _normalize_outline([], "") + assert empty[0]["text"] == "Untitled" + + +def test_normalize_outline_caps_length() -> None: + raw = [{"level": "h2", "text": f"S{i}"} for i in range(40)] + out = _normalize_outline(raw, "Title") + assert len(out) <= 24 + + +def test_normalize_options_and_str_list_non_list() -> None: + assert _normalize_options("nope") == [] + assert _normalize_str_list({"a": 1}) == [] + + +# --- draft ---------------------------------------------------------------- + + +def test_generate_draft_happy() -> None: + outline = [{"level": "h2", "text": "Intro"}, {"level": "h3", "text": "Detail"}] + payload = {"title_tag": "SEO Title", "meta_description": "A meta", "sections": ["Intro prose.", {"text": "Detail prose."}]} + with ai(FakeClient(payload)): + out = generate_draft("best crm", "Learn", "Guide", "Professional", "My Title", outline) + assert out["ok"] is True + assert out["title_tag"] == "SEO Title" + assert out["meta_description"] == "A meta" + body = out["body_html"] + assert "Intro prose.
" in body + assert "Detail prose.
" in body + + +def test_generate_draft_fallbacks_and_placeholder() -> None: + outline = [{"level": "h2", "text": "Alpha"}, {"level": "h2", "text": "Beta"}] + with ai(FakeClient({})): # no title_tag, meta, or sections + out = generate_draft("best crm", "Learn", "Guide", "Professional", "My Title", outline) + assert out["title_tag"] == "My Title" + assert out["meta_description"].startswith("My Title") + assert "Add details about alpha here." in out["body_html"] + + +def test_assemble_body_escapes_and_handles_non_list_sections() -> None: + body = _assemble_body("Title & Co", [{"level": "h2", "text": "A < B"}], None) + assert "Title & Co" in body + assert "A < B" in body + # Placeholder prose is escaped on the way out (None sections → placeholder path). + assert "Add details about a < b here.
" in body + + +# --- research ------------------------------------------------------------- + + +def test_research_happy() -> None: + payload = { + "questions": ["What is it?", {"text": "How does it work?"}, "", 7], + "sources": [{"label": "Wikipedia", "description": "Overview"}], + } + with ai(FakeClient(payload)): + out = research_panel("chain reaction", intent="Learn", title="Guide") + assert out["ok"] is True + assert out["questions"] == ["What is it?", "How does it work?"] + assert out["sources"][0]["label"] == "Wikipedia" + + +def test_research_fallback_on_empty() -> None: + with ai(FakeClient({})): + out = research_panel("chain reaction") + assert out["ok"] is True + assert any("chain reaction" in q for q in out["questions"]) + assert any(s["label"] == "Wikipedia" for s in out["sources"]) + + +def test_research_keyword_required() -> None: + with ai(FakeClient({})): + out = research_panel(" ") + assert out["ok"] is False + assert out["error"] == "keyword required" + + +# --- dispatcher ----------------------------------------------------------- + + +def test_run_wizard_step_all_branches() -> None: + with ai(FakeClient({})): + assert run_wizard_step("intents", {"keyword": "best crm"})["ok"] is True + assert run_wizard_step("content_types", {"keyword": "best crm", "intent": "Learn"})["ok"] is True + assert run_wizard_step("tones", {"keyword": "k", "intent": "i", "contentType": "Guide"})["ok"] is True + assert run_wizard_step("titles", {"keyword": "k", "contentType": "Guide", "tone": "Pro"})["ok"] is True + assert run_wizard_step("outline", {"keyword": "k", "title": "T"})["ok"] is True + assert run_wizard_step("research", {"keyword": "k", "title": "T"})["ok"] is True + # outline arrives as a non-list → coerced to [] then normalized to a fallback. 
+ draft = run_wizard_step("draft", {"keyword": "k", "title": "T", "outline": "bad"}) + assert draft["ok"] is True and draft["body_html"] + + +def test_run_wizard_step_unknown() -> None: + out = run_wizard_step("nope", {}) + assert out["ok"] is False + assert "unknown step" in out["error"] + + +def test_wizard_module_exposes_run_step() -> None: + assert hasattr(wizard, "run_wizard_step") diff --git a/tests/reporting/test_builder_sections.py b/tests/reporting/test_builder_sections.py new file mode 100644 index 0000000..d936a6d --- /dev/null +++ b/tests/reporting/test_builder_sections.py @@ -0,0 +1,94 @@ +"""Unit tests for the report-builder section helpers extracted from builder.py.""" +from __future__ import annotations + +import pandas as pd + +from website_profiling.reporting.builder_sections import ( + build_content_url_lists, + build_links_list, +) + + +def test_content_url_lists_classifies_issues() -> None: + df = pd.DataFrame([ + { # row 0: a problem page on every axis + "url": "https://a.com/1", "status": "200", "h1_count": 0, "title": "", + "meta_description_len": 0, "content_length": 50, "canonical_url": "", + "images_without_alt": 2, "images_total": 3, "img_without_lazy": 1, + "img_without_dimensions": 1, "response_time_ms": 3000, "html_lang": "", + "viewport_present": False, "reading_level": 15, "word_count": 50, + }, + { # row 1: long title/meta, multiple h1, canonical mismatch + "url": "https://a.com/2", "status": "200", "h1_count": 2, "title": "T" * 70, + "meta_description_len": 300, "content_length": 5000, + "canonical_url": "https://a.com/other", "images_without_alt": 0, "images_total": 1, + "img_without_lazy": 0, "img_without_dimensions": 0, "response_time_ms": 100, + "html_lang": "en", "viewport_present": True, "reading_level": 5, "word_count": 500, + }, + ]) + out = build_content_url_lists(df, df) # both rows are 2xx + + u1 = "https://a.com/1" + assert {"url": u1, "title": ""} in out["missing_h1"] + assert {"url": u1} in out["missing_title"] + 
assert any(r["url"] == u1 for r in out["missing_meta_desc"]) + assert any(r["url"] == u1 for r in out["thin_content"]) + assert any(r["url"] == u1 for r in out["missing_canonical"]) + assert any(r["url"] == u1 for r in out["missing_alt"]) + assert any(r["url"] == u1 for r in out["missing_lazy"]) + assert any(r["url"] == u1 for r in out["missing_dimensions"]) + assert any(r["url"] == u1 for r in out["slow_response"]) + assert any(r["url"] == u1 for r in out["missing_html_lang"]) + assert any(r["url"] == u1 for r in out["invalid_viewport"]) + assert any(r["url"] == u1 for r in out["high_reading_level"]) + assert any(r["url"] == u1 for r in out["very_thin_content"]) + + u2 = "https://a.com/2" + assert any(r["url"] == u2 for r in out["multiple_h1"]) + assert any(r["url"] == u2 for r in out["meta_desc_long"]) + assert any(r["url"] == u2 for r in out["title_long"]) + assert any(r["url"] == u2 for r in out["canonical_mismatch"]) + + +def test_links_list_maps_fields_and_overlays() -> None: + df = pd.DataFrame([ + { + "url": "https://a.com/p", "status": "200", "title": "Hi", "content_length": 1234, + "word_count": 300, "response_time_ms": 150, "depth": 2, "outlinks": 5, + "h1_count": 1, "noindex": True, "images_total": 4, "reading_level": 7.5, + "content_html_ratio": 12.3456, + "page_analysis": '{"internal_link_count": 3, "external_link_count": 1}', + }, + ]) + in_degree = {"https://a.com/p": 9} + ml_bundle = { + "language_by_url": {"https://a.com/p": "en"}, + "keyphrases_by_url": {"https://a.com/p": ["seo", "audit"]}, + "url_duplicate_group_id": {"https://a.com/p": 4}, + } + links = build_links_list(df, in_degree, {}, ml_bundle) + + assert len(links) == 1 + r = links[0] + assert r["url"] == "https://a.com/p" + assert r["inlinks"] == 9 + assert r["content_length"] == 1234 + assert r["word_count"] == 300 + assert r["response_time_ms"] == 150 + assert r["depth"] == 2 + assert r["outlinks"] == 5 + assert r["h1_count"] == 1 + assert r["noindex"] is True + assert 
r["reading_level"] == 7.5 + assert r["content_html_ratio"] == 12.35 # rounded to 2dp + assert r["internal_link_count"] == 3 + assert r["external_link_count"] == 1 + assert r["detected_language"] == "en" + assert r["keyphrases"] == ["seo", "audit"] + assert r["duplicate_group_id"] == 4 + assert "lighthouse" in r + + +def test_links_list_skips_blank_urls() -> None: + df = pd.DataFrame([{"url": "", "status": "200"}, {"url": None, "status": "200"}]) + assert build_links_list(df, {}, {}, {}) == [] diff --git a/tests/test_agent_react_tool_results.py b/tests/test_agent_react_tool_results.py new file mode 100644 index 0000000..4489008 --- /dev/null +++ b/tests/test_agent_react_tool_results.py @@ -0,0 +1,30 @@ +"""Regression test: the ReAct fallback must show prior tool results to the model. + +Providers without native tool calling (e.g. Gemini) go through `_react_step`. If +tool-result messages are excluded from the conversation, the model never sees the +output and keeps re-issuing the same call until MAX_TOOL_ROUNDS. 
+""" +from __future__ import annotations + +from website_profiling.llm import agent as agent_mod + + +class _CapturingClient: + def __init__(self) -> None: + self.user_prompt = "" + + def complete_json(self, system: str, user: str) -> dict: + self.user_prompt = user + return {"action": "answer", "text": "done"} + + +def test_react_step_includes_tool_results_in_prompt() -> None: + client = _CapturingClient() + messages = [ + {"role": "user", "content": "how healthy is the site?"}, + {"role": "assistant", "content": "Calling tool get_health"}, + {"role": "tool", "tool_call_id": "x", "content": '{"score": 80}'}, + ] + result = agent_mod._react_step(client, messages, "get_health", None) + assert result.content == "done" + assert '{"score": 80}' in client.user_prompt diff --git a/tests/test_config_parsing_unit.py b/tests/test_config_parsing_unit.py index 679c51b..088611e 100644 --- a/tests/test_config_parsing_unit.py +++ b/tests/test_config_parsing_unit.py @@ -32,6 +32,10 @@ def test_getters_bool_int_float_list() -> None: assert get_str(cfg, "missing") == "" assert get_bool(cfg, "b1", False) is True assert get_bool(cfg, "b2", True) is False + # Missing or empty value honors the default (must not silently disable a default-on flag). + assert get_bool(cfg, "missing", True) is True + assert get_bool({"e": ""}, "e", True) is True + assert get_bool({"e": " "}, "e", True) is True assert get_int(cfg, "i") == 10 assert get_int(cfg, "missing", 7) == 7 assert get_int(cfg, "bad", 3) == 3 diff --git a/tests/test_crawler_session_threadsafe.py b/tests/test_crawler_session_threadsafe.py new file mode 100644 index 0000000..236d305 --- /dev/null +++ b/tests/test_crawler_session_threadsafe.py @@ -0,0 +1,211 @@ +"""Thread-safety of crawler HTTP sessions. + +`requests.Session` is not documented as thread-safe, so the crawler must hand +each worker thread its own session. These tests pin that behaviour down on the +`StaticFetcher` (per-thread sessions, lifecycle) and the `Crawler` wiring. 
+""" +from __future__ import annotations + +import threading + +import requests + +from website_profiling.crawl.fetchers.factory import build_fetcher +from website_profiling.crawl.fetchers.static import StaticFetcher + + +class _FakeResp: + def __init__(self, status: int, content_type: str, body: str | None) -> None: + self.status_code = status + self.headers = {"Content-Type": content_type} + self.text = body + self.content = body.encode() if body is not None else None + self.url = "https://example.com/final" + self.history: list = [] + + +def test_static_fetcher_session_is_per_thread_and_reused() -> None: + built: list = [] + + def factory() -> requests.Session: + sess = requests.Session() + built.append(sess) + return sess + + fetcher = StaticFetcher(session_factory=factory) + try: + main_first = fetcher.session + # Second access on the same thread reuses the cached session. + main_second = fetcher.session + assert main_first is main_second + + seen: dict[str, requests.Session] = {} + + def grab(name: str) -> None: + seen[name] = fetcher.session + + t1 = threading.Thread(target=grab, args=("t1",)) + t2 = threading.Thread(target=grab, args=("t2",)) + t1.start() + t2.start() + t1.join() + t2.join() + + # Each thread got a distinct session, none shared with the main thread. + assert seen["t1"] is not seen["t2"] + assert seen["t1"] is not main_first + assert seen["t2"] is not main_first + # main thread + two worker threads => three builds. + assert len(built) == 3 + finally: + fetcher.close() + + +def test_static_fetcher_close_closes_owned_sessions() -> None: + closed: list = [] + + class FakeSession: + def close(self) -> None: + closed.append(self) + + fetcher = StaticFetcher(session_factory=lambda: FakeSession()) + owned = fetcher.session + fetcher.close() + assert closed == [owned] + # close() is idempotent: tracked sessions are cleared after the first call. 
+ fetcher.close() + assert closed == [owned] + + +def test_static_fetcher_explicit_session_is_shared_and_not_closed() -> None: + closed: list = [] + + class FakeSession: + headers: dict = {} + + def close(self) -> None: + closed.append(self) + + sess = FakeSession() + fetcher = StaticFetcher(session=sess) + assert fetcher.session is sess + + grabbed: dict = {} + + def grab() -> None: + grabbed["worker"] = fetcher.session + + t = threading.Thread(target=grab) + t.start() + t.join() + # Legacy explicit session is shared as-is across threads. + assert grabbed["worker"] is sess + + fetcher.close() + # The caller owns an explicit session, so the fetcher must not close it. + assert closed == [] + + +def test_static_fetcher_default_factory_sets_user_agent() -> None: + fetcher = StaticFetcher(user_agent="MyBot/9") + try: + assert fetcher.session.headers["User-Agent"] == "MyBot/9" + finally: + fetcher.close() + + +def test_static_fetcher_fetch_success_non_html_and_error() -> None: + class OkSession: + def get(self, url, timeout, allow_redirects): # noqa: ANN001 + return _FakeResp(200, "text/html", "ok") + + def close(self) -> None: + pass + + fetcher = StaticFetcher(session_factory=lambda: OkSession()) + try: + ok = fetcher.fetch("https://example.com") + assert ok.status == 200 + assert ok.text == "ok" + assert ok.final_url == "https://example.com/final" + finally: + fetcher.close() + + class NonHtmlSession: + def get(self, url, timeout, allow_redirects): # noqa: ANN001 + # 404 + empty body exercises the non-HTML and content-is-None paths. 
+ return _FakeResp(404, "application/json", None) + + def close(self) -> None: + pass + + non_html = StaticFetcher(session_factory=lambda: NonHtmlSession()) + try: + res = non_html.fetch("https://example.com") + assert res.status == 404 + assert res.text is None + assert res.content_length == 0 + finally: + non_html.close() + + class BoomSession: + def get(self, url, timeout, allow_redirects): # noqa: ANN001 + raise requests.RequestException("boom") + + def close(self) -> None: + pass + + boom = StaticFetcher(session_factory=lambda: BoomSession()) + try: + err = boom.fetch("https://example.com") + assert err.status is None + assert err.fetch_method == "static" + finally: + boom.close() + + +def test_build_fetcher_forwards_session_factory() -> None: + calls: list = [] + + def factory() -> requests.Session: + calls.append(1) + return requests.Session() + + fetcher = build_fetcher(render_mode="static", session_factory=factory) + try: + assert isinstance(fetcher, StaticFetcher) + assert fetcher.session is not None + assert calls # the factory, not the (absent) shared session, was used + finally: + fetcher.close() + + +def test_crawler_serves_distinct_configured_sessions_per_thread(monkeypatch) -> None: + monkeypatch.setattr( + "website_profiling.crawl.sitemap.discover_sitemap_urls", + lambda *_a, **_k: [], + ) + from website_profiling.crawl.crawler import Crawler + + crawler = Crawler( + start_url="https://site.com", + ignore_robots=True, + use_wappalyzer=False, + crawl_auth_username="user", + crawl_auth_password="pass", + crawl_extra_headers="X-Test: abc", + crawl_cookies="sid=1", + ) + + # A fresh session from the factory is a distinct object configured exactly + # like the main-thread template. 
+ spawned = crawler._session_factory() + assert spawned is not crawler.session + assert spawned.headers["User-Agent"] == crawler.session.headers["User-Agent"] + assert spawned.headers["X-Test"] == "abc" + assert spawned.headers["Cookie"] == "sid=1" + assert spawned.auth == ("user", "pass") + + # The static fetcher never reuses the main-thread template for fetching. + assert crawler.fetcher.session is not crawler.session + assert crawler.fetcher.session.headers["Cookie"] == "sid=1" diff --git a/tests/test_llm_provider_anthropic.py b/tests/test_llm_provider_anthropic.py new file mode 100644 index 0000000..86b5be9 --- /dev/null +++ b/tests/test_llm_provider_anthropic.py @@ -0,0 +1,77 @@ +"""Regression tests for the Anthropic message/tool converter. + +An assistant message carrying OpenAI-shaped ``tool_calls`` must be reconstructed +into ``tool_use`` content blocks; otherwise the following ``tool_result`` has no +matching ``tool_use`` and the Anthropic Messages API returns HTTP 400 on every +multi-round tool conversation. 
+""" +from __future__ import annotations + +from website_profiling.llm.providers.anthropic import ( + _to_anthropic_messages, + _to_anthropic_tools, +) + + +def test_assistant_tool_calls_become_matching_tool_use_blocks() -> None: + messages = [ + {"role": "system", "content": "sys"}, + {"role": "user", "content": "hi"}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + {"id": "call_1", "type": "function", + "function": {"name": "get_health", "arguments": '{"x": 1}'}}, + ], + }, + {"role": "tool", "tool_call_id": "call_1", "content": '{"score": 80}'}, + ] + system, conv = _to_anthropic_messages(messages) + + assert system == "sys" + assistant = conv[1] + assert assistant["role"] == "assistant" + tool_use = [b for b in assistant["content"] if b["type"] == "tool_use"] + assert len(tool_use) == 1 + assert tool_use[0]["id"] == "call_1" + assert tool_use[0]["name"] == "get_health" + assert tool_use[0]["input"] == {"x": 1} + + # The tool_result in the next turn references the same id -> valid pairing. 
+ tool_result = conv[2]["content"][0] + assert tool_result["type"] == "tool_result" + assert tool_result["tool_use_id"] == "call_1" + + +def test_assistant_tool_calls_with_dict_arguments_and_text() -> None: + messages = [ + {"role": "assistant", "content": "thinking", + "tool_calls": [{"id": "c2", "function": {"name": "foo", "arguments": {"a": 2}}}]}, + ] + _, conv = _to_anthropic_messages(messages) + blocks = conv[0]["content"] + assert blocks[0] == {"type": "text", "text": "thinking"} + assert blocks[1]["input"] == {"a": 2} + + +def test_invalid_tool_call_arguments_fall_back_to_empty() -> None: + messages = [ + {"role": "assistant", "content": "", + "tool_calls": [{"id": "c3", "function": {"name": "foo", "arguments": "not-json"}}]}, + ] + _, conv = _to_anthropic_messages(messages) + assert conv[0]["content"][0]["input"] == {} + + +def test_plain_messages_pass_through() -> None: + _, conv = _to_anthropic_messages([{"role": "user", "content": "hi"}]) + assert conv == [{"role": "user", "content": "hi"}] + + +def test_to_anthropic_tools_maps_schema() -> None: + tools = [{"type": "function", "function": { + "name": "t", "description": "d", "parameters": {"type": "object", "properties": {}}}}] + assert _to_anthropic_tools(tools) == [ + {"name": "t", "description": "d", "input_schema": {"type": "object", "properties": {}}}, + ] diff --git a/tests/test_url_www_prefix.py b/tests/test_url_www_prefix.py new file mode 100644 index 0000000..079bd9b --- /dev/null +++ b/tests/test_url_www_prefix.py @@ -0,0 +1,33 @@ +"""Regression tests for stripping the leading ``www.`` host label. + +`str.lstrip("www.")` strips any leading char in the set {'w','.'}, so it corrupts +hosts like ``www.washington.edu`` (-> ``ashington.edu``). `strip_www_prefix` +removes only a single literal ``www.`` prefix. 
+""" +from __future__ import annotations + +from website_profiling.common import strip_www_prefix +from website_profiling.integrations.google.normalize import normalize_url + + +def test_removes_only_the_leading_www_label() -> None: + assert strip_www_prefix("www.example.com") == "example.com" + + +def test_does_not_eat_into_the_host() -> None: + # The lstrip bug would have produced "ashington.edu". + assert strip_www_prefix("www.washington.edu") == "washington.edu" + + +def test_non_www_hosts_are_untouched() -> None: + assert strip_www_prefix("web.example.com") == "web.example.com" + assert strip_www_prefix("example.com") == "example.com" + assert strip_www_prefix("") == "" + + +def test_normalize_url_strips_www_label_not_chars() -> None: + assert normalize_url("https://www.washington.edu/admissions/") == "washington.edu/admissions" + + +def test_normalize_url_preserves_w_prefixed_host() -> None: + assert normalize_url("https://web.example.com") == "web.example.com/" diff --git a/web/app/api/content/wizard/route.ts b/web/app/api/content/wizard/route.ts new file mode 100644 index 0000000..43b3030 --- /dev/null +++ b/web/app/api/content/wizard/route.ts @@ -0,0 +1,90 @@ +import { NextResponse, type NextRequest } from 'next/server'; +import { spawn } from 'child_process'; +import { forbiddenIfNotLocal } from '@/server/localOnly'; +import { requireApiAuth } from '@/server/auth'; +import { getRepoRoot, getPipelineSpawnEnv } from '@/server/pipelineSpawnEnv'; +import { resolvePythonExecutable, parsePythonJsonStdout } from '@/server/resolvePython'; +import type { ApiRouteHandler } from '@/types/api'; + +export const runtime = 'nodejs'; +export const dynamic = 'force-dynamic'; + +const VALID_STEPS = new Set(['intents', 'content_types', 'tones', 'titles', 'outline', 'draft', 'research']); + +/** + * POST /api/content/wizard — one step of the guided-draft wizard. + * Body: { step, keyword, locale?, intent?, contentType?, tone?, title?, outline? 
} + */ +export const POST: ApiRouteHandler = async (request: NextRequest): Promise{s.body}
+
{p.keywordNeeded}
; + + return ( +
+
{error}
: null} + + {questions && questions.length > 0 ? ( +{p.keywordNeeded}
; + + const headings = draftHeadings(bodyHtml); + + return ( +{p.outlineSub}
+
+
{error}
: null} + + {outline && outline.length > 0 ? ( ++ {keyword.trim()} · {locale} +
+ ) : null} ++ {w.selectedIntent}{' '} + {intent} +
+{w.keywordSub}
+ +{sub}
+{w.titleSub}
+{w.outlineSub}
+{s.noTerms}
+ ) : grouped ? ( +{s.groupHigh}
+{s.groupOther}
+{s.dockTitle}
++ {busy + ? s.dockRunning + : status === 'error' + ? s.dockFailed + : status + ? `${s.statusLabel}: ${status}` + : log + ? s.dockFailed + : 'Idle'} +
+