diff --git a/src/docpull/benchmark.py b/src/docpull/benchmark.py index 23d2f71..b744de8 100644 --- a/src/docpull/benchmark.py +++ b/src/docpull/benchmark.py @@ -5,13 +5,16 @@ import argparse import asyncio import json +import random import resource import sys import time import uuid from collections import Counter +from collections.abc import Callable from dataclasses import dataclass from pathlib import Path +from statistics import median from typing import Any from urllib.error import HTTPError, URLError from urllib.parse import urlparse @@ -60,6 +63,9 @@ EXA_SEARCH_URL = "https://api.exa.ai/search" TAVILY_SEARCH_URL = "https://api.tavily.com/search" TAVILY_EXTRACT_URL = "https://api.tavily.com/extract" +HTTP_RETRY_MAX_ATTEMPTS = 3 +HTTP_RETRY_TRANSIENT_STATUSES: frozenset[int] = frozenset({429, 502, 503, 504}) +HTTP_RETRY_CAP_SECONDS = 30.0 BENCHMARK_SCORE_WEIGHTS = { "coverage": 0.30, "cleanliness": 0.20, @@ -249,6 +255,17 @@ def create_benchmark_parser() -> argparse.ArgumentParser: quick.add_argument("--max-concurrent", type=int, default=8, help="Core crawl concurrency") quick.add_argument("--per-host-concurrent", type=int, default=4, help="Core per-host concurrency") quick.add_argument("--no-cache", action="store_true", help="Disable cache for the core crawl") + quick.add_argument( + "--runs", + type=int, + default=1, + help=( + "Repeat each case N times and report median wall seconds and score " + "with min/max spread. Per-run artifacts land under run-1/, run-2/, ... " + "subdirs. N>1 forces --no-cached-pass since the cached pass shares state " + "with the prior run by design." + ), + ) quick.set_defaults(cached_pass=None) quick.add_argument( "--cached-pass", @@ -376,6 +393,7 @@ def run_benchmark_cli(argv: list[str] | None = None) -> int: tavily_credit_usd=args.tavily_credit_usd, max_estimated_cost=args.max_estimated_cost, trace_backend=args.trace, + runs=args.runs, ) if args.json_output: console.print_json(data=report) @@ -431,6 +449,7 @@ def run_quick_benchmark( tavily: bool = False, exa: bool = False, live_providers: list[str] | None = None, + runs: int = 1, ) -> dict[str, Any]: """Run the default real-site benchmark matrix.""" _validate_positive_int(max_pages, "max_pages") @@ -439,6 +458,7 @@ def run_quick_benchmark( _validate_positive_int(per_host_concurrent, "per_host_concurrent") _validate_positive_int(max_search_results, "max_search_results") _validate_positive_int(extract_limit, "extract_limit") + _validate_positive_int(runs, "runs") if max_estimated_cost < 0: raise BenchmarkError("max_estimated_cost cannot be negative.") target_set = _canonical_target_set(target_set) @@ -452,6 +472,11 @@ def run_quick_benchmark( ) if cached_pass is None: cached_pass = len(targets) == 1 + if runs > 1: + # Cached pass shares cache state with its prior pass by design; that + # composes poorly with N-run aggregation. Force it off and surface in + # the report so users notice. + cached_pass = False requested_providers = _normalize_live_providers( parallel=parallel, tavily=tavily, @@ -511,23 +536,44 @@ def run_and_record( provider: str, target: _BenchmarkTarget, output_dir: Path, + cache_dir: Path | None = None, prompt: str, settings: dict[str, Any], - runner: Any, + runner_factory: Callable[..., dict[str, Any]], ) -> None: - rss_before = _peak_rss_bytes() - t0 = time.perf_counter() - try: - case = runner() - except Exception as err: # noqa: BLE001 - case = _failed_case( + per_run: list[dict[str, Any]] = [] + for run_index in range(1, runs + 1): + if runs == 1: + run_output = output_dir + run_cache = cache_dir + else: + run_output = output_dir / f"run-{run_index}" + run_cache = (cache_dir / f"run-{run_index}") if cache_dir else None + rss_before = _peak_rss_bytes() + t0 = time.perf_counter() + try: + case = runner_factory(output_dir=run_output, cache_dir=run_cache) + except Exception as err: # noqa: BLE001 + case = _failed_case( + name=name, + workflow=workflow, + output_dir=run_output, + wall_seconds=time.perf_counter() - t0, + rss_before=rss_before, + error=err, + ) + per_run.append(case) + case = ( + per_run[0] + if runs == 1 + else _aggregate_runs( + per_run, name=name, workflow=workflow, output_dir=output_dir, - wall_seconds=time.perf_counter() - t0, - rss_before=rss_before, - error=err, + runs_total=runs, ) + ) _annotate_case( case, provider=provider, @@ -541,14 +587,39 @@ def run_and_record( for target in targets: target_root = run_dir / _safe_slug(target.id) if matrix_run else run_dir - cache_dir = target_root / "cache-core" + core_cache_dir = target_root / "cache-core" core_output = target_root / "core-llm" + + def core_factory( + *, + output_dir: Path, + cache_dir: Path | None, + target: _BenchmarkTarget = target, + ) -> dict[str, Any]: + assert cache_dir is not None + return asyncio.run( + _run_core_case( + name="core-llm", + target_url=target.url, + output_dir=output_dir, + cache_dir=cache_dir, + cache_enabled=cache_enabled, + max_pages=max_pages, + max_depth=max_depth, + max_concurrent=max_concurrent, + per_host_concurrent=per_host_concurrent, + include_domains=list(target.include_domains), + target=target, + ) + ) + run_and_record( name="core-llm", workflow="core-llm", provider="docpull", target=target, output_dir=core_output, + cache_dir=core_cache_dir, prompt=target.objective, settings={ "profile": "llm", @@ -558,25 +629,38 @@ def run_and_record( "per_host_concurrent": per_host_concurrent, "cache_enabled": cache_enabled, }, - runner=lambda target=target, core_output=core_output, cache_dir=cache_dir: asyncio.run( - _run_core_case( - name="core-llm", - target_url=target.url, - output_dir=core_output, - cache_dir=cache_dir, - cache_enabled=cache_enabled, - max_pages=max_pages, - max_depth=max_depth, - max_concurrent=max_concurrent, - per_host_concurrent=per_host_concurrent, - include_domains=list(target.include_domains), - target=target, - ) - ), + runner_factory=core_factory, ) if cache_enabled and cached_pass: cached_output = target_root / "core-llm-cached" + + def cached_factory( + *, + output_dir: Path, + cache_dir: Path | None, + target: _BenchmarkTarget = target, + shared_cache: Path = core_cache_dir, + ) -> dict[str, Any]: + # Reuse the prior pass's cache regardless of the per-run cache + # arg — the cached pass is the second half of a paired + # measurement. (Only reachable when runs == 1.) + return asyncio.run( + _run_core_case( + name="core-llm-cached", + target_url=target.url, + output_dir=output_dir, + cache_dir=shared_cache, + cache_enabled=True, + max_pages=max_pages, + max_depth=max_depth, + max_concurrent=max_concurrent, + per_host_concurrent=per_host_concurrent, + include_domains=list(target.include_domains), + target=target, + ) + ) + run_and_record( name="core-llm-cached", workflow="core-llm", @@ -593,37 +677,24 @@ def run_and_record( "cache_enabled": True, "cache_measurement": True, }, - runner=lambda target=target, cached_output=cached_output, cache_dir=cache_dir: asyncio.run( - _run_core_case( - name="core-llm-cached", - target_url=target.url, - output_dir=cached_output, - cache_dir=cache_dir, - cache_enabled=True, - max_pages=max_pages, - max_depth=max_depth, - max_concurrent=max_concurrent, - per_host_concurrent=per_host_concurrent, - include_domains=list(target.include_domains), - target=target, - ) - ), + runner_factory=cached_factory, ) if parallel: source_policy = _build_source_policy(include_domains=list(target.include_domains)) search_output = target_root / "parallel-search" - def parallel_search_runner( + def parallel_search_factory( *, + output_dir: Path, + cache_dir: Path | None, target: _BenchmarkTarget = target, source_policy: dict[str, Any] = source_policy, - search_output: Path = search_output, ) -> dict[str, Any]: return _run_parallel_search_case( objective=target.objective, queries=list(target.queries), - output_dir=search_output, + output_dir=output_dir, include_domains=list(target.include_domains), source_policy=source_policy, mode=mode, @@ -640,20 +711,21 @@ def parallel_search_runner( output_dir=search_output, prompt=target.objective, settings={"mode": mode, "max_search_results": max_search_results}, - runner=parallel_search_runner, + runner_factory=parallel_search_factory, ) context_output = target_root / "parallel-context" - def parallel_context_runner( + def parallel_context_factory( *, + output_dir: Path, + cache_dir: Path | None, target: _BenchmarkTarget = target, source_policy: dict[str, Any] = source_policy, - context_output: Path = context_output, ) -> dict[str, Any]: return _run_parallel_context_case( objective=target.objective, queries=list(target.queries), - output_dir=context_output, + output_dir=output_dir, include_domains=list(target.include_domains), source_policy=source_policy, mode=mode, @@ -675,11 +747,29 @@ def parallel_context_runner( "max_search_results": max_search_results, "extract_limit": extract_limit, }, - runner=parallel_context_runner, + runner_factory=parallel_context_factory, ) if "tavily" in providers: tavily_output = target_root / "tavily-search-extract" + + def tavily_factory( + *, + output_dir: Path, + cache_dir: Path | None, + target: _BenchmarkTarget = target, + ) -> dict[str, Any]: + return _run_tavily_case( + objective=target.objective, + queries=list(target.queries), + output_dir=output_dir, + include_domains=list(target.include_domains), + max_search_results=max_search_results, + extract_limit=extract_limit, + tavily_credit_usd=tavily_credit_usd, + target=target, + ) + run_and_record( name="tavily-search-extract", workflow="tavily-search-extract-pack", @@ -692,20 +782,27 @@ def parallel_context_runner( "extract_limit": extract_limit, "tavily_credit_usd": tavily_credit_usd, }, - runner=lambda target=target, tavily_output=tavily_output: _run_tavily_case( + runner_factory=tavily_factory, + ) + + if "exa" in providers: + exa_output = target_root / "exa-search-contents" + + def exa_factory( + *, + output_dir: Path, + cache_dir: Path | None, + target: _BenchmarkTarget = target, + ) -> dict[str, Any]: + return _run_exa_case( objective=target.objective, queries=list(target.queries), - output_dir=tavily_output, + output_dir=output_dir, include_domains=list(target.include_domains), max_search_results=max_search_results, - extract_limit=extract_limit, - tavily_credit_usd=tavily_credit_usd, target=target, - ), - ) + ) - if "exa" in providers: - exa_output = target_root / "exa-search-contents" run_and_record( name="exa-search-contents", workflow="exa-search-contents-pack", @@ -714,14 +811,7 @@ def parallel_context_runner( output_dir=exa_output, prompt=target.objective, settings={"max_search_results": max_search_results}, - runner=lambda target=target, exa_output=exa_output: _run_exa_case( - objective=target.objective, - queries=list(target.queries), - output_dir=exa_output, - include_domains=list(target.include_domains), - max_search_results=max_search_results, - target=target, - ), + runner_factory=exa_factory, ) report_path = run_dir / "benchmark.report.json" @@ -740,6 +830,7 @@ def parallel_context_runner( "skipped_providers": skipped_providers, "provider_status": _benchmark_provider_statuses(provider_status), "cost_normalization": _cost_normalization_metadata(tavily_credit_usd), + "runs_per_case": runs, "trace": trace.metadata(), "cases": cases, "summary": _summary(cases), @@ -1875,6 +1966,8 @@ def _benchmark_score( include_domains: list[str], target: _BenchmarkTarget | None, ) -> dict[str, Any]: + if not records: + return _empty_benchmark_score() dimensions = { "coverage": _coverage_dimension(payload, records, target), "cleanliness": _cleanliness_dimension(payload, records), @@ -1897,6 +1990,137 @@ def _benchmark_score( } +def _aggregate_runs( + runs: list[dict[str, Any]], + *, + name: str, + workflow: str, + output_dir: Path, + runs_total: int, +) -> dict[str, Any]: + """Synthesize a single case dict from N per-run case dicts. + + Headline fields (wall_seconds, pack_score.score, benchmark_score.score) + are reported as medians across successful runs, with min/max alongside. + The full per-run list is preserved under ``runs`` for raw inspection. + """ + successful = [run for run in runs if run.get("status") != "failed"] + wall_seconds_list = [float(run.get("wall_seconds") or 0.0) for run in runs] + estimated_costs = [float(run.get("estimated_cost_usd") or 0.0) for run in runs] + artifact_sizes = [int(run.get("artifact_size_bytes") or 0) for run in runs] + cache_sizes = [int(run.get("cache_size_bytes") or 0) for run in runs] + rss_deltas = [float(run.get("rss_delta_mb") or 0.0) for run in runs] + rss_baselines = [float(run.get("rss_baseline_mb") or 0.0) for run in runs] + rss_peaks = [float(run.get("rss_peak_mb") or 0.0) for run in runs] + + case: dict[str, Any] = { + "name": name, + "workflow": workflow, + "output_dir": str(output_dir), + "wall_seconds": round(median(wall_seconds_list), 3), + "wall_seconds_min": round(min(wall_seconds_list), 3), + "wall_seconds_max": round(max(wall_seconds_list), 3), + "wall_seconds_runs": [round(value, 3) for value in wall_seconds_list], + "rss_baseline_mb": round(min(rss_baselines), 1) if rss_baselines else 0.0, + "rss_peak_mb": round(max(rss_peaks), 1) if rss_peaks else 0.0, + "rss_delta_mb": round(max(rss_deltas), 1) if rss_deltas else 0.0, + "artifact_size_bytes": sum(artifact_sizes), + "cache_size_bytes": sum(cache_sizes), + "estimated_cost_usd": round(sum(estimated_costs), 6), + "runs_total": runs_total, + "runs_succeeded": len(successful), + "runs": runs, + } + + pack_scores = [ + int(run["pack_score"]["score"]) + for run in successful + if isinstance(run.get("pack_score"), dict) and isinstance(run["pack_score"].get("score"), int) + ] + if pack_scores: + med = _median_int(pack_scores) + representative = next( + run + for run in successful + if isinstance(run.get("pack_score"), dict) and int(run["pack_score"].get("score", -1)) == med + ) + case["pack_score"] = { + **representative["pack_score"], + "score": med, + "score_min": min(pack_scores), + "score_max": max(pack_scores), + "score_runs": pack_scores, + } + else: + case["pack_score"] = None + + benchmark_scores = [ + int(run["benchmark_score"]["score"]) + for run in successful + if isinstance(run.get("benchmark_score"), dict) + and isinstance(run["benchmark_score"].get("score"), int) + ] + if benchmark_scores: + med = _median_int(benchmark_scores) + representative = next( + run + for run in successful + if isinstance(run.get("benchmark_score"), dict) + and int(run["benchmark_score"].get("score", -1)) == med + ) + case["benchmark_score"] = { + **representative["benchmark_score"], + "score": med, + "score_min": min(benchmark_scores), + "score_max": max(benchmark_scores), + "score_runs": benchmark_scores, + } + else: + case["benchmark_score"] = None + + if not successful: + case["status"] = "failed" + first_error = next((run.get("error") for run in runs if run.get("error")), None) + case["error"] = first_error or { + "type": "BenchmarkError", + "message": f"all {runs_total} runs failed", + } + + if successful: + first = successful[0] + for key in ("stats", "skip_counts", "cost_units", "pack_metadata", "source_score_count"): + if key in first: + case[key] = first[key] + + return case + + +def _median_int(values: list[int]) -> int: + """Lower-median: returns an actual element of ``values`` (no interpolation).""" + ordered = sorted(values) + return ordered[(len(ordered) - 1) // 2] + + +def _empty_benchmark_score() -> dict[str, Any]: + """Floor the score when a pack has zero records. + + Without this, cleanliness/source_fidelity/freshness return 100 because they + have nothing to penalize, leaving the weighted score around 50. An empty + pack should read as failure, not a passing grade. + """ + dimensions = { + name: {"score": 0, "weight": weight, "signals": ["empty pack"]} + for name, weight in BENCHMARK_SCORE_WEIGHTS.items() + } + return { + "schema_version": BENCHMARK_SCHEMA_VERSION, + "score": 0, + "grade": _benchmark_grade(0), + "weights": BENCHMARK_SCORE_WEIGHTS, + "dimensions": dimensions, + } + + def _coverage_dimension( payload: dict[str, Any], records: list[dict[str, Any]], @@ -2216,6 +2440,38 @@ def _http_json_post( headers: dict[str, str], body: dict[str, Any], timeout: float, + max_attempts: int = HTTP_RETRY_MAX_ATTEMPTS, + sleep: Any = time.sleep, +) -> dict[str, Any]: + """POST JSON with bounded retry on transient HTTP/URL errors. + + Retries on 429/502/503/504 and URLError up to ``max_attempts`` total. + Honors ``Retry-After`` (seconds) when present, capped at + ``HTTP_RETRY_CAP_SECONDS``. Other 4xx, JSON errors, and non-https URLs + raise immediately. + """ + last_error: BenchmarkError | None = None + for attempt in range(1, max_attempts + 1): + try: + return _http_json_post_once(label=label, url=url, headers=headers, body=body, timeout=timeout) + except _TransientHTTPError as err: + last_error = BenchmarkError(str(err)) + last_error.__cause__ = err.__cause__ + if attempt >= max_attempts: + break + delay = _retry_delay_seconds(attempt=attempt, retry_after=err.retry_after) + sleep(delay) + assert last_error is not None + raise last_error + + +def _http_json_post_once( + *, + label: str, + url: str, + headers: dict[str, str], + body: dict[str, Any], + timeout: float, ) -> dict[str, Any]: parsed_url = urlparse(url) if parsed_url.scheme != "https": @@ -2234,9 +2490,17 @@ def _http_json_post( raw = response.read().decode("utf-8") except HTTPError as err: detail = err.read().decode("utf-8", errors="replace") - raise BenchmarkError(f"{label} returned HTTP {err.code}: {_short_error_detail(detail)}") from err + message = f"{label} returned HTTP {err.code}: {_short_error_detail(detail)}" + if err.code in HTTP_RETRY_TRANSIENT_STATUSES: + transient = _TransientHTTPError(message, retry_after=_parse_retry_after(err)) + transient.__cause__ = err + raise transient from err + raise BenchmarkError(message) from err except URLError as err: - raise BenchmarkError(f"{label} request failed: {err.reason}") from err + message = f"{label} request failed: {err.reason}" + transient = _TransientHTTPError(message, retry_after=None) + transient.__cause__ = err + raise transient from err try: parsed = json.loads(raw) except json.JSONDecodeError as err: @@ -2246,6 +2510,35 @@ def _http_json_post( return parsed +class _TransientHTTPError(Exception): + """Internal marker for retryable HTTP failures.""" + + def __init__(self, message: str, *, retry_after: float | None) -> None: + super().__init__(message) + self.retry_after = retry_after + + +def _parse_retry_after(err: HTTPError) -> float | None: + raw = err.headers.get("Retry-After") if err.headers else None + if not raw: + return None + try: + seconds = float(raw) + except (TypeError, ValueError): + return None + if seconds < 0: + return None + return min(seconds, HTTP_RETRY_CAP_SECONDS) + + +def _retry_delay_seconds(*, attempt: int, retry_after: float | None) -> float: + if retry_after is not None: + return retry_after + base = min(HTTP_RETRY_CAP_SECONDS, 2.0 ** (attempt - 1)) + # Backoff jitter is not security-sensitive — stdlib random is fine here. + return base + random.uniform(0.0, 0.5) # nosec B311 + + def _require_benchmark_api_key(env_var: str, provider: str) -> str: value = _lookup_benchmark_secret(env_var) if not value: @@ -2371,6 +2664,86 @@ def _format_skipped_providers(skipped: list[Any]) -> str: return ", ".join(values) if values else "none" +def _runs_disclosure_lines(report: dict[str, Any]) -> list[str]: + runs = report.get("runs_per_case") + if not isinstance(runs, int) or runs <= 1: + return [] + return [ + ( + f"- Repetition: each cell ran `N={runs}` times. Headline wall seconds " + "and scores are the median across runs, with `[min–max]` shown " + "inline. Per-run artifacts live under `run-1/`, `run-2/`, ... " + "subdirs alongside the case." + ), + ] + + +def _workload_disclosure_lines(report: dict[str, Any]) -> list[str]: + """Render a per-workflow workload table so readers can see the comparison terms.""" + cases = [case for case in report.get("cases", []) if isinstance(case, dict)] + by_workflow: dict[str, dict[str, Any]] = {} + workflow_order: list[str] = [] + for case in cases: + if _is_cache_only_case(case): + continue + workflow = str(case.get("workflow") or "") + if not workflow: + continue + if workflow not in by_workflow: + workflow_order.append(workflow) + by_workflow[workflow] = { + "settings": case.get("settings") or {}, + "records": [], + } + score = case.get("pack_score") + if isinstance(score, dict): + summary = score.get("summary") + if isinstance(summary, dict): + value = summary.get("record_count") + if isinstance(value, int): + by_workflow[workflow]["records"].append(value) + if not by_workflow: + return [] + lines = [ + "## Workload disclosure", + "", + ( + "The five workflows are not the same job. The core crawl walks a " + "page graph from a seed URL; provider workflows fetch a fixed " + "number of search results and optionally extract their content. " + "Compare scores within a row of the heatmap (same workflow across " + "targets), not down a column." + ), + "", + "| Workflow | Settings | Median records | Records range |", + "| --- | --- | ---: | --- |", + ] + setting_keys = ( + "max_pages", + "max_depth", + "max_concurrent", + "max_search_results", + "extract_limit", + "mode", + ) + for workflow in workflow_order: + info = by_workflow[workflow] + settings = info["settings"] if isinstance(info["settings"], dict) else {} + rendered_settings = ", ".join(f"{key}={settings[key]}" for key in setting_keys if key in settings) + records = info["records"] + med_text: str + range_text: str + if records: + med_text = str(_median_int(records)) + range_text = f"{min(records)}–{max(records)}" if min(records) != max(records) else str(records[0]) + else: + med_text = "" + range_text = "" + lines.append(f"| `{workflow}` | {rendered_settings or '—'} | {med_text} | {range_text} |") + lines.append("") + return lines + + def _raindrop_trace_lines(trace: dict[str, Any]) -> list[str]: lines = [ f"- Raindrop trace: `{trace.get('provider', 'none')}` / `{trace.get('status', 'disabled')}`", @@ -2439,7 +2812,7 @@ def _markdown_report(report: dict[str, Any]) -> str: f"`{case.get('target_id', '')}` | " f"`{case['name']}` | " f"`{case.get('workflow', '')}` | " - f"{case['wall_seconds']} | " + f"{_case_wall_seconds_text(case)} | " f"{benchmark_score} | " f"{score_value} | " f"{record_count} | " @@ -2480,12 +2853,33 @@ def _case_benchmark_score_text(case: dict[str, Any]) -> str: return "failed" score = case.get("benchmark_score") if isinstance(score, dict) and isinstance(score.get("score"), int): - return str(score["score"]) + text = str(score["score"]) + score_min = score.get("score_min") + score_max = score.get("score_max") + if isinstance(score_min, int) and isinstance(score_max, int) and score_min != score_max: + text += f" [{score_min}–{score_max}]" + return text if _is_cache_only_case(case): return "cache skip" return "" +def _case_wall_seconds_text(case: dict[str, Any]) -> str: + seconds = case.get("wall_seconds") + if seconds is None: + return "" + text = str(seconds) + seconds_min = case.get("wall_seconds_min") + seconds_max = case.get("wall_seconds_max") + if ( + isinstance(seconds_min, int | float) + and isinstance(seconds_max, int | float) + and seconds_min != seconds_max + ): + text += f" [{seconds_min}–{seconds_max}]" + return text + + def _case_provider_key(case: dict[str, Any]) -> str: workflow = str(case.get("workflow") or "") if workflow == "core-llm": @@ -2591,9 +2985,22 @@ def _article_markdown(report: dict[str, Any], *, title: str) -> str: ), ( "- Weighted score: coverage 30%, cleanliness 20%, source fidelity 20%, " - "freshness 15%, and density 15%." + "freshness 15%, and density 15%. Weights are heuristic — the sub-score " + "signals are the load-bearing detail." ), + ( + "- Boilerplate detection (used inside the cleanliness and density " + "dimensions) is a substring sniff on English navigation phrases; " + "it will under-report localized boilerplate." + ), + ( + "- Freshness is a presence test for target-specific terms in URL, " + "title, or first 5000 characters of body; it does not check page " + "modification time." + ), + *_runs_disclosure_lines(report), "", + *_workload_disclosure_lines(report), "## Targets", "", ] @@ -2642,7 +3049,7 @@ def _article_markdown(report: dict[str, Any], *, title: str) -> str: f"`{case.get('target_id', '')}` | " f"`{case.get('name')}` | " f"`{case.get('workflow')}` | " - f"{case.get('wall_seconds')} | " + f"{_case_wall_seconds_text(case)} | " f"{_case_benchmark_score_text(case)} | " f"{score_value} | " f"{record_count} | " diff --git a/tests/test_benchmark.py b/tests/test_benchmark.py index fd9e7cb..c2d25c5 100644 --- a/tests/test_benchmark.py +++ b/tests/test_benchmark.py @@ -643,3 +643,187 @@ def test_benchmark_article_cli_writes_publishable_markdown(tmp_path: Path) -> No assert "Benchmarking docpull, Parallel, Tavily, Exa, and Raindrop" in article assert "Raindrop tracing was enabled" in article assert "`parallel-context`" in article + + +def test_http_json_post_retries_transient_then_succeeds( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Transient HTTPError on first attempt should retry and succeed on second.""" + attempts = {"count": 0} + sleep_calls: list[float] = [] + + def fake_once(**_kwargs: Any) -> dict[str, Any]: + attempts["count"] += 1 + if attempts["count"] == 1: + err = benchmark._TransientHTTPError("simulated 503", retry_after=None) + raise err + return {"ok": True, "attempt": attempts["count"]} + + monkeypatch.setattr(benchmark, "_http_json_post_once", fake_once) + + result = benchmark._http_json_post( + label="Test", + url="https://example.com/x", + headers={}, + body={}, + timeout=10, + max_attempts=3, + sleep=sleep_calls.append, + ) + + assert result == {"ok": True, "attempt": 2} + assert attempts["count"] == 2 + assert len(sleep_calls) == 1 + assert sleep_calls[0] > 0 + + +def test_http_json_post_raises_after_exhausting_retries( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """All attempts transient → BenchmarkError with the last error's message.""" + sleep_calls: list[float] = [] + + def fake_once(**_kwargs: Any) -> dict[str, Any]: + raise benchmark._TransientHTTPError("simulated 429", retry_after=0.0) + + monkeypatch.setattr(benchmark, "_http_json_post_once", fake_once) + + with pytest.raises(BenchmarkError, match="429"): + benchmark._http_json_post( + label="Test", + url="https://example.com/x", + headers={}, + body={}, + timeout=10, + max_attempts=2, + sleep=sleep_calls.append, + ) + + # 2 attempts total → 1 sleep between them. + assert len(sleep_calls) == 1 + + +def test_benchmark_score_floors_to_zero_on_empty_pack() -> None: + """Empty pack should not arithmetic-average to 50 via vacuous high dims.""" + payload = {"pack_score": {"score": 65, "summary": {"record_count": 0}}} + score = benchmark._benchmark_score( + payload=payload, + records=[], + include_domains=["docs.parallel.ai"], + target=None, + ) + + assert score["score"] == 0 + assert score["grade"] == "poor" + assert set(score["dimensions"]) == { + "coverage", + "cleanliness", + "source_fidelity", + "freshness", + "density", + } + for dim in score["dimensions"].values(): + assert dim["score"] == 0 + assert "empty pack" in dim["signals"] + + +def test_runs_n_aggregates_with_median_and_spread( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """--runs 3 should produce one aggregate case with median + min/max + raw runs.""" + call_count = {"i": 0} + score_sequence = [88, 92, 95] + wall_sequence = [1.0, 2.0, 3.0] + + async def fake(**kwargs: Any) -> dict[str, Any]: + i = call_count["i"] + call_count["i"] += 1 + output_dir = kwargs["output_dir"] + output_dir.mkdir(parents=True, exist_ok=True) + return { + "name": kwargs["name"], + "workflow": "core-llm", + "output_dir": str(output_dir), + "wall_seconds": wall_sequence[i], + "rss_baseline_mb": 10.0, + "rss_peak_mb": 11.0, + "rss_delta_mb": 1.0, + "stats": { + "urls_discovered": 1, + "pages_fetched": 1, + "pages_skipped": 0, + "pages_failed": 0, + "duration_seconds": 0.01, + "success_rate": 100.0, + }, + "skip_counts": {}, + "artifact_size_bytes": 100, + "cache_size_bytes": 200, + "pack_score": { + "score": score_sequence[i], + "grade": "excellent", + "summary": {"record_count": 1, "total_tokens": 100}, + "issues": [], + "warnings": [], + }, + "benchmark_score": { + "schema_version": 2, + "score": score_sequence[i], + "grade": "excellent", + "weights": {}, + "dimensions": {}, + }, + "source_score_count": 1, + } + + monkeypatch.setattr(benchmark, "_run_core_case", fake) + + report = run_quick_benchmark( + target_url="https://docs.parallel.ai", + output_dir=tmp_path / "bench", + max_pages=1, + max_depth=1, + max_concurrent=1, + per_host_concurrent=1, + cache_enabled=True, + cached_pass=True, # should be forced off by runs > 1 + parallel=False, + parallel_objective=None, + parallel_queries=[], + include_domains=[], + mode="advanced", + max_search_results=8, + extract_limit=3, + max_estimated_cost=0.05, + runs=3, + ) + + assert report["runs_per_case"] == 3 + # cached pass should have been forced off + assert report["summary"]["case_count"] == 1 + assert call_count["i"] == 3 + + case = report["cases"][0] + assert case["runs_total"] == 3 + assert case["runs_succeeded"] == 3 + assert case["wall_seconds"] == 2.0 + assert case["wall_seconds_min"] == 1.0 + assert case["wall_seconds_max"] == 3.0 + assert case["wall_seconds_runs"] == [1.0, 2.0, 3.0] + assert case["pack_score"]["score"] == 92 + assert case["pack_score"]["score_min"] == 88 + assert case["pack_score"]["score_max"] == 95 + assert case["pack_score"]["score_runs"] == [88, 92, 95] + assert case["benchmark_score"]["score"] == 92 + assert case["benchmark_score"]["score_min"] == 88 + assert case["benchmark_score"]["score_max"] == 95 + assert len(case["runs"]) == 3 + assert case["estimated_cost_usd"] == 0.0 + # Per-run output dirs should exist as siblings under the case output dir. + assert (tmp_path / "bench" / "core-llm" / "run-1").exists() + assert (tmp_path / "bench" / "core-llm" / "run-2").exists() + assert (tmp_path / "bench" / "core-llm" / "run-3").exists() + + summary = (tmp_path / "bench" / "benchmark.summary.md").read_text(encoding="utf-8") + assert "1.0" in summary and "3.0" in summary # spread is rendered