Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions alembic/versions/018_link_edge_position.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Add position column to link_edges for nav/header/content/footer/sidebar classification.

Revision ID: 018_link_edge_position
Revises: 017_content_drafts
"""
from __future__ import annotations

from alembic import op

revision = "018_link_edge_position"
down_revision = "017_content_drafts"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.execute("""
ALTER TABLE link_edges
ADD COLUMN IF NOT EXISTS position TEXT NOT NULL DEFAULT 'content'
""")


def downgrade() -> None:
op.execute("ALTER TABLE link_edges DROP COLUMN IF EXISTS position")
24 changes: 24 additions & 0 deletions alembic/versions/019_crawl_run_mobile_link.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Add mobile_run_id to crawl_runs for pairing desktop+mobile dual crawls.

Revision ID: 019_crawl_run_mobile_link
Revises: 018_link_edge_position
"""
from __future__ import annotations

from alembic import op

revision = "019_crawl_run_mobile_link"
down_revision = "018_link_edge_position"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.execute("""
ALTER TABLE crawl_runs
ADD COLUMN IF NOT EXISTS mobile_run_id INT REFERENCES crawl_runs(id)
""")


def downgrade() -> None:
op.execute("ALTER TABLE crawl_runs DROP COLUMN IF EXISTS mobile_run_id")
26 changes: 26 additions & 0 deletions alembic/versions/020_crawl_run_pause_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Add pause_state JSONB and paused_at to crawl_runs.

Revision ID: 020_crawl_run_pause_state
Revises: 019_crawl_run_mobile_link
Create Date: 2026-06-18
"""
from alembic import op

revision = "020_crawl_run_pause_state"
down_revision = "019_crawl_run_mobile_link"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.execute(
"ALTER TABLE crawl_runs ADD COLUMN IF NOT EXISTS pause_state JSONB"
)
op.execute(
"ALTER TABLE crawl_runs ADD COLUMN IF NOT EXISTS paused_at TEXT"
)


def downgrade() -> None:
op.execute("ALTER TABLE crawl_runs DROP COLUMN IF EXISTS pause_state")
op.execute("ALTER TABLE crawl_runs DROP COLUMN IF EXISTS paused_at")
2 changes: 2 additions & 0 deletions crawl_results.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
url,status
https://a.com,200
5 changes: 3 additions & 2 deletions input.txt.example
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ crawl_exclude_urls =
crawl_discovery_mode = spider
crawl_url_list =
crawl_user_agent_preset = default
crawl_user_agent_custom =
crawl_auth_username =
crawl_user_agent_custom =
compare_mobile_desktop = false
crawl_auth_username =
crawl_auth_password =
crawl_extra_headers =
crawl_cookies =
Expand Down
3 changes: 2 additions & 1 deletion pipeline-config.example.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ crawl_exclude_urls =
crawl_discovery_mode = spider
crawl_url_list =
crawl_user_agent_preset = default
crawl_user_agent_custom =
crawl_user_agent_custom =
compare_mobile_desktop = false
crawl_auth_username =
crawl_auth_password =
crawl_extra_headers =
Expand Down
7 changes: 7 additions & 0 deletions src/website_profiling/commands/config_resolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,4 +364,11 @@ def build_parser() -> argparse.ArgumentParser:
dest="stdin_json",
help="For 'chat' command: read JSON payload from stdin and emit NDJSON events.",
)
parser.add_argument(
"--resume-run-id",
type=int,
default=None,
dest="resume_run_id",
help="Resume a paused crawl from the saved frontier of the given crawl_run_id.",
)
return parser
10 changes: 8 additions & 2 deletions src/website_profiling/commands/pipeline_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,12 @@ def run(cfg: dict, args: argparse.Namespace) -> None:

phase_results: list[PhaseResult] = []

if run_crawl:
resume_run_id = getattr(args, "resume_run_id", None)
if resume_run_id is not None:
phase_results.append(
run_pipeline_phase("crawl", lambda: _run_crawl(cfg, use_database, resume_run_id=resume_run_id))
)
elif run_crawl:
phase_results.append(run_pipeline_phase("crawl", lambda: _run_crawl(cfg, use_database)))

if run_content_analysis and use_database:
Expand Down Expand Up @@ -209,7 +214,7 @@ def _finalize_pipeline_run(phase_results: list[PhaseResult]) -> None:
sys.exit(1)


def _run_crawl(cfg: dict, use_database: bool) -> None:
def _run_crawl(cfg: dict, use_database: bool, resume_run_id: int | None = None) -> None:
from ..crawl.crawler import run_crawler

console_print("[Crawl] Starting...", flush=True)
Expand Down Expand Up @@ -304,6 +309,7 @@ def _run_crawl(cfg: dict, use_database: bool) -> None:
crawl_robots_txt_override=(cfg.get("crawl_robots_txt_override") or "").strip(),
custom_extractors=custom_extractors or None,
enable_axe=enable_axe,
resume_run_id=resume_run_id,
)
console_print("[Crawl] Done.", flush=True)
emit_phase_done("crawl")
Expand Down
1 change: 1 addition & 0 deletions src/website_profiling/crawl/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class CrawlConfig:
crawl_robots_txt_override: str = ""
custom_extractors: Optional[list[dict]] = None
enable_axe: bool = False
compare_mobile_desktop: bool = False

@classmethod
def from_kwargs(cls, **kwargs: object) -> CrawlConfig:
Expand Down
145 changes: 145 additions & 0 deletions src/website_profiling/crawl/crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,26 @@
from __future__ import annotations

import json
import os
import signal
import threading
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Optional

# Module-level pause event — set by SIGUSR1 (Unix) or a PID-keyed file (Windows).
_PAUSE_EVENT = threading.Event()


def _handle_pause_signal(signum: int, frame: object) -> None: # pragma: no cover
_PAUSE_EVENT.set()


try:
signal.signal(signal.SIGUSR1, _handle_pause_signal)
except (AttributeError, OSError): # pragma: no cover
pass # SIGUSR1 not available on Windows

import pandas as pd
import requests
from tqdm.auto import tqdm
Expand Down Expand Up @@ -112,6 +128,7 @@ def __init__(
enable_axe: bool = False,
*,
config: Optional[CrawlConfig] = None,
pause_state: Optional[dict] = None,
):
if config is None:
config = CrawlConfig.from_kwargs(
Expand Down Expand Up @@ -206,6 +223,7 @@ def __init__(
self.lock = self.frontier.lock

self.results: list[dict] = []
self.paused: bool = False
# `requests.Session` is not thread-safe, so worker threads each build
# their own session from this factory (see StaticFetcher). The template
# `self.session` below is only touched on the main thread (sitemap
Expand Down Expand Up @@ -233,6 +251,8 @@ def __init__(
self._hybrid_fetcher = (
self.fetcher if isinstance(self.fetcher, HybridFetcher) else None
)
if pause_state:
self.frontier.restore_from_state(pause_state)
self.frontier.seed_initial_urls(
discovery_mode=config.discovery_mode,
crawl_url_list=config.crawl_url_list,
Expand Down Expand Up @@ -429,6 +449,7 @@ def crawl(
stream_crawl_run_id: Optional[int] = None,
stream_batch_size: int = 500,
) -> pd.DataFrame:
_PAUSE_EVENT.clear()
start_time = time.time()
from ..progress import CrawlProgressTracker, emit_phase_start

Expand Down Expand Up @@ -509,6 +530,22 @@ def crawl(
remaining.append(f)
futures = remaining

# Check for pause request (SIGUSR1) or Windows file-based signal.
if not _PAUSE_EVENT.is_set():
_pause_file = os.path.join(
os.environ.get("TMPDIR", "/tmp"),
f"wp_pause_{os.getpid()}.flag",
)
if os.path.exists(_pause_file):
try:
os.unlink(_pause_file)
except OSError:
pass
_PAUSE_EVENT.set()
if _PAUSE_EVENT.is_set():
self.paused = True
break

if self.queue.empty() and not futures:
break
finally:
Expand Down Expand Up @@ -582,8 +619,22 @@ def run_crawler(
crawl_robots_txt_override: str = "",
custom_extractors: Optional[list] = None,
enable_axe: bool = False,
compare_mobile_desktop: bool = False,
resume_run_id: Optional[int] = None,
) -> pd.DataFrame:
"""Run crawler and optionally save to CSV/JSON or PostgreSQL. Returns DataFrame."""
_resume_pause_state: Optional[dict] = None
if resume_run_id is not None:
from ..db import db_session
from ..db.crawl_store import load_pause_state
with db_session() as _conn:
_resume_pause_state = load_pause_state(_conn, resume_run_id)
if _resume_pause_state:
console_print(
f" Resuming from paused run {resume_run_id} "
f"({len(_resume_pause_state.get('pending', []))} URLs pending)...",
flush=True,
)
max_p = max_pages if max_pages is not None else 0
mode_label = (render_mode or "static").strip().lower()
disc_label = normalize_discovery_mode(discovery_mode)
Expand Down Expand Up @@ -634,6 +685,7 @@ def run_crawler(
crawl_robots_txt_override=crawl_robots_txt_override,
custom_extractors=custom_extractors,
enable_axe=enable_axe,
pause_state=_resume_pause_state,
)
stream_run_id: Optional[int] = None
if output_db:
Expand Down Expand Up @@ -663,6 +715,35 @@ def run_crawler(
show_progress=show_progress,
stream_crawl_run_id=stream_run_id,
)

# ---- Pause handling: save frontier and exit with code 2 ----
if getattr(crawler, "paused", False):
import sys
from ..db import db_session
from ..db.crawl_store import save_pause_state

_pause_run_id = stream_run_id
if _pause_run_id is not None:
_frontier_state = crawler.frontier.serialize_state()
_frontier_state["pages_crawled"] = len(crawler.results)
with db_session() as _conn:
save_pause_state(_conn, _pause_run_id, _frontier_state)
console_print(
f"[PAUSE] crawl_run_id={_pause_run_id}",
flush=True,
)
else:
console_print("[PAUSE] crawl_run_id=none", flush=True)
sys.exit(2)

# ---- Resume cleanup: clear saved frontier from the resumed run ----
if resume_run_id is not None and _resume_pause_state is not None and not getattr(crawler, "paused", False):
from ..db import db_session
from ..db.crawl_store import clear_pause_state

with db_session() as _conn:
clear_pause_state(_conn, resume_run_id)

if output_db and crawler.link_edges_accum:
from ..db import db_session
from ..db.crawl_store import write_link_edges
Expand Down Expand Up @@ -712,6 +793,70 @@ def run_crawler(
console_print(" Crawl DB write complete.", flush=True)
elif output_db and stream_run_id is not None:
console_print(" Crawl streamed to DB during fetch.", flush=True)

# Second pass: run mobile crawl and pair the two runs via mobile_run_id FK
if compare_mobile_desktop and output_db and run_id is not None:
from ..db import db_session
from ..db.crawl_store import get_latest_crawl_run_id, set_mobile_run_id

console_print(" Starting mobile second-pass crawl for comparison...", flush=True)
with db_session() as _conn:
_baseline_id = get_latest_crawl_run_id(_conn) or 0
run_crawler(
start_url=start_url,
max_pages=max_pages,
concurrency=concurrency,
timeout=timeout,
ignore_robots=ignore_robots,
allow_external=allow_external,
max_depth=max_depth,
polite_delay=polite_delay,
store_outlinks=store_outlinks,
output_csv=None,
output_db=True,
show_progress=show_progress,
exclude_urls=exclude_urls,
preserve_crawl_history=True,
store_content_excerpt=store_content_excerpt,
content_excerpt_max_chars=content_excerpt_max_chars,
store_page_html=False,
run_content_analysis=False,
crawl_stream_to_db=crawl_stream_to_db,
property_id=property_id,
render_mode=render_mode,
js_concurrency=js_concurrency,
js_timeout=js_timeout,
js_wait_until=js_wait_until,
js_extra_wait_ms=js_extra_wait_ms,
js_block_resources=js_block_resources,
capture_console=capture_console,
js_console_levels=js_console_levels,
capture_failed_requests=capture_failed_requests,
console_max_per_page=console_max_per_page,
custom_extraction_regex=custom_extraction_regex,
crawl_ignore_params=crawl_ignore_params,
discovery_mode=discovery_mode,
crawl_url_list=crawl_url_list,
crawl_user_agent_preset="mobile",
crawl_user_agent_custom="",
crawl_auth_username=crawl_auth_username,
crawl_auth_password=crawl_auth_password,
crawl_extra_headers=crawl_extra_headers,
crawl_cookies=crawl_cookies,
crawl_robots_txt_override=crawl_robots_txt_override,
custom_extractors=custom_extractors,
enable_axe=False,
compare_mobile_desktop=False,
)
with db_session() as _conn:
mobile_id = get_latest_crawl_run_id(_conn)
if mobile_id is not None and mobile_id != _baseline_id:
set_mobile_run_id(_conn, run_id, mobile_id)
console_print(
f" Mobile crawl complete (run_id={mobile_id}). Linked to desktop run {run_id}.",
flush=True,
)

elif output_csv and not df.empty:
if output_csv.lower().endswith(".json"):
df.to_json(output_csv, orient="records", indent=2, date_format="iso", default_handler=str)
Expand Down
16 changes: 16 additions & 0 deletions src/website_profiling/crawl/frontier.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,19 @@ def mark_visited(self, url: str) -> bool:

def should_skip_dequeued(self, url: str) -> bool:
return url_matches_exclude(url, self.exclude_urls)

def serialize_state(self) -> dict:
"""Return a JSON-serialisable snapshot of the frontier for pause/resume."""
with self.lock:
pending = list(self.queue.queue)
visited = list(self.visited)
depths = dict(self.depths)
return {"pending": pending, "visited": visited, "depths": depths}

def restore_from_state(self, state: dict) -> None:
"""Pre-populate the frontier from a previously serialised state."""
with self.lock:
for url in state.get("pending", []):
self.queue.put(url)
self.visited.update(state.get("visited", []))
self.depths.update(state.get("depths", {}))
Loading
Loading