From a2a853533d95e099579cb2b4b5740dafd9759458 Mon Sep 17 00:00:00 2001 From: admin-raintree <277948009+admin-raintree@users.noreply.github.com> Date: Tue, 9 Jun 2026 15:36:27 -0700 Subject: [PATCH 1/4] Make publication benchmark numbers harder to dismiss Four publication-blocking changes to the benchmark harness: - Retry transient 429/502/503/504 and URLError on live provider HTTP with bounded backoff + jitter; honors Retry-After (numeric seconds, capped at 30s). Distinguishes transient from terminal errors so a single hiccup no longer leaves a failed cell in the published heatmap. - Add --runs N: per-run output/cache subdirs (run-1/, run-2/, ...), median + min/max aggregation, full per-run history preserved. Cached pass is forced off for N>1 since it composes poorly with repetition. - Floor benchmark_score to 0 when records is empty instead of letting cleanliness/source_fidelity/freshness vacuously return 100 and pull the weighted average up to ~50. Zero records is a routing gap. - Article generator discloses methodology: heuristic-weight caveat, boilerplate substring-sniff caveat, freshness presence-test caveat, N-run repetition note, and a new Workload Disclosure table showing per-workflow record counts so readers can see comparison terms. 16 benchmark tests pass (12 existing + 4 new for retry, retry-exhaust, empty-pack floor, and N=3 aggregation). Full 476-test suite clean. 
Co-Authored-By: Claude Opus 4.7 --- src/docpull/benchmark.py | 555 ++++++++++++++++++++++++++++++++++----- tests/test_benchmark.py | 184 +++++++++++++ 2 files changed, 668 insertions(+), 71 deletions(-) diff --git a/src/docpull/benchmark.py b/src/docpull/benchmark.py index 23d2f71..e0db782 100644 --- a/src/docpull/benchmark.py +++ b/src/docpull/benchmark.py @@ -5,13 +5,16 @@ import argparse import asyncio import json +import random import resource import sys import time import uuid from collections import Counter +from collections.abc import Callable from dataclasses import dataclass from pathlib import Path +from statistics import median from typing import Any from urllib.error import HTTPError, URLError from urllib.parse import urlparse @@ -60,6 +63,9 @@ EXA_SEARCH_URL = "https://api.exa.ai/search" TAVILY_SEARCH_URL = "https://api.tavily.com/search" TAVILY_EXTRACT_URL = "https://api.tavily.com/extract" +HTTP_RETRY_MAX_ATTEMPTS = 3 +HTTP_RETRY_TRANSIENT_STATUSES: frozenset[int] = frozenset({429, 502, 503, 504}) +HTTP_RETRY_CAP_SECONDS = 30.0 BENCHMARK_SCORE_WEIGHTS = { "coverage": 0.30, "cleanliness": 0.20, @@ -249,6 +255,17 @@ def create_benchmark_parser() -> argparse.ArgumentParser: quick.add_argument("--max-concurrent", type=int, default=8, help="Core crawl concurrency") quick.add_argument("--per-host-concurrent", type=int, default=4, help="Core per-host concurrency") quick.add_argument("--no-cache", action="store_true", help="Disable cache for the core crawl") + quick.add_argument( + "--runs", + type=int, + default=1, + help=( + "Repeat each case N times and report median wall seconds and score " + "with min/max spread. Per-run artifacts land under run-1/, run-2/, ... " + "subdirs. N>1 forces --no-cached-pass since the cached pass shares state " + "with the prior run by design." 
+ ), + ) quick.set_defaults(cached_pass=None) quick.add_argument( "--cached-pass", @@ -376,6 +393,7 @@ def run_benchmark_cli(argv: list[str] | None = None) -> int: tavily_credit_usd=args.tavily_credit_usd, max_estimated_cost=args.max_estimated_cost, trace_backend=args.trace, + runs=args.runs, ) if args.json_output: console.print_json(data=report) @@ -431,6 +449,7 @@ def run_quick_benchmark( tavily: bool = False, exa: bool = False, live_providers: list[str] | None = None, + runs: int = 1, ) -> dict[str, Any]: """Run the default real-site benchmark matrix.""" _validate_positive_int(max_pages, "max_pages") @@ -439,6 +458,7 @@ def run_quick_benchmark( _validate_positive_int(per_host_concurrent, "per_host_concurrent") _validate_positive_int(max_search_results, "max_search_results") _validate_positive_int(extract_limit, "extract_limit") + _validate_positive_int(runs, "runs") if max_estimated_cost < 0: raise BenchmarkError("max_estimated_cost cannot be negative.") target_set = _canonical_target_set(target_set) @@ -452,6 +472,11 @@ def run_quick_benchmark( ) if cached_pass is None: cached_pass = len(targets) == 1 + if runs > 1: + # Cached pass shares cache state with its prior pass by design; that + # composes poorly with N-run aggregation. Force it off and surface in + # the report so users notice. 
+ cached_pass = False requested_providers = _normalize_live_providers( parallel=parallel, tavily=tavily, @@ -511,23 +536,44 @@ def run_and_record( provider: str, target: _BenchmarkTarget, output_dir: Path, + cache_dir: Path | None = None, prompt: str, settings: dict[str, Any], - runner: Any, + runner_factory: Callable[..., dict[str, Any]], ) -> None: - rss_before = _peak_rss_bytes() - t0 = time.perf_counter() - try: - case = runner() - except Exception as err: # noqa: BLE001 - case = _failed_case( + per_run: list[dict[str, Any]] = [] + for run_index in range(1, runs + 1): + if runs == 1: + run_output = output_dir + run_cache = cache_dir + else: + run_output = output_dir / f"run-{run_index}" + run_cache = (cache_dir / f"run-{run_index}") if cache_dir else None + rss_before = _peak_rss_bytes() + t0 = time.perf_counter() + try: + case = runner_factory(output_dir=run_output, cache_dir=run_cache) + except Exception as err: # noqa: BLE001 + case = _failed_case( + name=name, + workflow=workflow, + output_dir=run_output, + wall_seconds=time.perf_counter() - t0, + rss_before=rss_before, + error=err, + ) + per_run.append(case) + case = ( + per_run[0] + if runs == 1 + else _aggregate_runs( + per_run, name=name, workflow=workflow, output_dir=output_dir, - wall_seconds=time.perf_counter() - t0, - rss_before=rss_before, - error=err, + runs_total=runs, ) + ) _annotate_case( case, provider=provider, @@ -541,14 +587,39 @@ def run_and_record( for target in targets: target_root = run_dir / _safe_slug(target.id) if matrix_run else run_dir - cache_dir = target_root / "cache-core" + core_cache_dir = target_root / "cache-core" core_output = target_root / "core-llm" + + def core_factory( + *, + output_dir: Path, + cache_dir: Path | None, + target: _BenchmarkTarget = target, + ) -> dict[str, Any]: + assert cache_dir is not None + return asyncio.run( + _run_core_case( + name="core-llm", + target_url=target.url, + output_dir=output_dir, + cache_dir=cache_dir, + cache_enabled=cache_enabled, + 
max_pages=max_pages, + max_depth=max_depth, + max_concurrent=max_concurrent, + per_host_concurrent=per_host_concurrent, + include_domains=list(target.include_domains), + target=target, + ) + ) + run_and_record( name="core-llm", workflow="core-llm", provider="docpull", target=target, output_dir=core_output, + cache_dir=core_cache_dir, prompt=target.objective, settings={ "profile": "llm", @@ -558,25 +629,38 @@ def run_and_record( "per_host_concurrent": per_host_concurrent, "cache_enabled": cache_enabled, }, - runner=lambda target=target, core_output=core_output, cache_dir=cache_dir: asyncio.run( - _run_core_case( - name="core-llm", - target_url=target.url, - output_dir=core_output, - cache_dir=cache_dir, - cache_enabled=cache_enabled, - max_pages=max_pages, - max_depth=max_depth, - max_concurrent=max_concurrent, - per_host_concurrent=per_host_concurrent, - include_domains=list(target.include_domains), - target=target, - ) - ), + runner_factory=core_factory, ) if cache_enabled and cached_pass: cached_output = target_root / "core-llm-cached" + + def cached_factory( + *, + output_dir: Path, + cache_dir: Path | None, + target: _BenchmarkTarget = target, + shared_cache: Path = core_cache_dir, + ) -> dict[str, Any]: + # Reuse the prior pass's cache regardless of the per-run cache + # arg — the cached pass is the second half of a paired + # measurement. (Only reachable when runs == 1.) 
+ return asyncio.run( + _run_core_case( + name="core-llm-cached", + target_url=target.url, + output_dir=output_dir, + cache_dir=shared_cache, + cache_enabled=True, + max_pages=max_pages, + max_depth=max_depth, + max_concurrent=max_concurrent, + per_host_concurrent=per_host_concurrent, + include_domains=list(target.include_domains), + target=target, + ) + ) + run_and_record( name="core-llm-cached", workflow="core-llm", @@ -593,37 +677,24 @@ def run_and_record( "cache_enabled": True, "cache_measurement": True, }, - runner=lambda target=target, cached_output=cached_output, cache_dir=cache_dir: asyncio.run( - _run_core_case( - name="core-llm-cached", - target_url=target.url, - output_dir=cached_output, - cache_dir=cache_dir, - cache_enabled=True, - max_pages=max_pages, - max_depth=max_depth, - max_concurrent=max_concurrent, - per_host_concurrent=per_host_concurrent, - include_domains=list(target.include_domains), - target=target, - ) - ), + runner_factory=cached_factory, ) if parallel: source_policy = _build_source_policy(include_domains=list(target.include_domains)) search_output = target_root / "parallel-search" - def parallel_search_runner( + def parallel_search_factory( *, + output_dir: Path, + cache_dir: Path | None, target: _BenchmarkTarget = target, source_policy: dict[str, Any] = source_policy, - search_output: Path = search_output, ) -> dict[str, Any]: return _run_parallel_search_case( objective=target.objective, queries=list(target.queries), - output_dir=search_output, + output_dir=output_dir, include_domains=list(target.include_domains), source_policy=source_policy, mode=mode, @@ -640,20 +711,21 @@ def parallel_search_runner( output_dir=search_output, prompt=target.objective, settings={"mode": mode, "max_search_results": max_search_results}, - runner=parallel_search_runner, + runner_factory=parallel_search_factory, ) context_output = target_root / "parallel-context" - def parallel_context_runner( + def parallel_context_factory( *, + output_dir: Path, + 
cache_dir: Path | None, target: _BenchmarkTarget = target, source_policy: dict[str, Any] = source_policy, - context_output: Path = context_output, ) -> dict[str, Any]: return _run_parallel_context_case( objective=target.objective, queries=list(target.queries), - output_dir=context_output, + output_dir=output_dir, include_domains=list(target.include_domains), source_policy=source_policy, mode=mode, @@ -675,11 +747,29 @@ def parallel_context_runner( "max_search_results": max_search_results, "extract_limit": extract_limit, }, - runner=parallel_context_runner, + runner_factory=parallel_context_factory, ) if "tavily" in providers: tavily_output = target_root / "tavily-search-extract" + + def tavily_factory( + *, + output_dir: Path, + cache_dir: Path | None, + target: _BenchmarkTarget = target, + ) -> dict[str, Any]: + return _run_tavily_case( + objective=target.objective, + queries=list(target.queries), + output_dir=output_dir, + include_domains=list(target.include_domains), + max_search_results=max_search_results, + extract_limit=extract_limit, + tavily_credit_usd=tavily_credit_usd, + target=target, + ) + run_and_record( name="tavily-search-extract", workflow="tavily-search-extract-pack", @@ -692,20 +782,27 @@ def parallel_context_runner( "extract_limit": extract_limit, "tavily_credit_usd": tavily_credit_usd, }, - runner=lambda target=target, tavily_output=tavily_output: _run_tavily_case( + runner_factory=tavily_factory, + ) + + if "exa" in providers: + exa_output = target_root / "exa-search-contents" + + def exa_factory( + *, + output_dir: Path, + cache_dir: Path | None, + target: _BenchmarkTarget = target, + ) -> dict[str, Any]: + return _run_exa_case( objective=target.objective, queries=list(target.queries), - output_dir=tavily_output, + output_dir=output_dir, include_domains=list(target.include_domains), max_search_results=max_search_results, - extract_limit=extract_limit, - tavily_credit_usd=tavily_credit_usd, target=target, - ), - ) + ) - if "exa" in providers: - 
exa_output = target_root / "exa-search-contents" run_and_record( name="exa-search-contents", workflow="exa-search-contents-pack", @@ -714,14 +811,7 @@ def parallel_context_runner( output_dir=exa_output, prompt=target.objective, settings={"max_search_results": max_search_results}, - runner=lambda target=target, exa_output=exa_output: _run_exa_case( - objective=target.objective, - queries=list(target.queries), - output_dir=exa_output, - include_domains=list(target.include_domains), - max_search_results=max_search_results, - target=target, - ), + runner_factory=exa_factory, ) report_path = run_dir / "benchmark.report.json" @@ -740,6 +830,7 @@ def parallel_context_runner( "skipped_providers": skipped_providers, "provider_status": _benchmark_provider_statuses(provider_status), "cost_normalization": _cost_normalization_metadata(tavily_credit_usd), + "runs_per_case": runs, "trace": trace.metadata(), "cases": cases, "summary": _summary(cases), @@ -1875,6 +1966,8 @@ def _benchmark_score( include_domains: list[str], target: _BenchmarkTarget | None, ) -> dict[str, Any]: + if not records: + return _empty_benchmark_score() dimensions = { "coverage": _coverage_dimension(payload, records, target), "cleanliness": _cleanliness_dimension(payload, records), @@ -1897,6 +1990,138 @@ def _benchmark_score( } +def _aggregate_runs( + runs: list[dict[str, Any]], + *, + name: str, + workflow: str, + output_dir: Path, + runs_total: int, +) -> dict[str, Any]: + """Synthesize a single case dict from N per-run case dicts. + + Headline fields (wall_seconds, pack_score.score, benchmark_score.score) + are reported as medians across successful runs, with min/max alongside. + The full per-run list is preserved under ``runs`` for raw inspection. 
+ """ + successful = [run for run in runs if run.get("status") != "failed"] + wall_seconds_list = [float(run.get("wall_seconds") or 0.0) for run in runs] + estimated_costs = [float(run.get("estimated_cost_usd") or 0.0) for run in runs] + artifact_sizes = [int(run.get("artifact_size_bytes") or 0) for run in runs] + cache_sizes = [int(run.get("cache_size_bytes") or 0) for run in runs] + rss_deltas = [float(run.get("rss_delta_mb") or 0.0) for run in runs] + rss_baselines = [float(run.get("rss_baseline_mb") or 0.0) for run in runs] + rss_peaks = [float(run.get("rss_peak_mb") or 0.0) for run in runs] + + case: dict[str, Any] = { + "name": name, + "workflow": workflow, + "output_dir": str(output_dir), + "wall_seconds": round(median(wall_seconds_list), 3), + "wall_seconds_min": round(min(wall_seconds_list), 3), + "wall_seconds_max": round(max(wall_seconds_list), 3), + "wall_seconds_runs": [round(value, 3) for value in wall_seconds_list], + "rss_baseline_mb": round(min(rss_baselines), 1) if rss_baselines else 0.0, + "rss_peak_mb": round(max(rss_peaks), 1) if rss_peaks else 0.0, + "rss_delta_mb": round(max(rss_deltas), 1) if rss_deltas else 0.0, + "artifact_size_bytes": sum(artifact_sizes), + "cache_size_bytes": sum(cache_sizes), + "estimated_cost_usd": round(sum(estimated_costs), 6), + "runs_total": runs_total, + "runs_succeeded": len(successful), + "runs": runs, + } + + pack_scores = [ + int(run["pack_score"]["score"]) + for run in successful + if isinstance(run.get("pack_score"), dict) and isinstance(run["pack_score"].get("score"), int) + ] + if pack_scores: + med = _median_int(pack_scores) + representative = next( + run + for run in successful + if isinstance(run.get("pack_score"), dict) + and int(run["pack_score"].get("score", -1)) == med + ) + case["pack_score"] = { + **representative["pack_score"], + "score": med, + "score_min": min(pack_scores), + "score_max": max(pack_scores), + "score_runs": pack_scores, + } + else: + case["pack_score"] = None + + benchmark_scores 
= [ + int(run["benchmark_score"]["score"]) + for run in successful + if isinstance(run.get("benchmark_score"), dict) + and isinstance(run["benchmark_score"].get("score"), int) + ] + if benchmark_scores: + med = _median_int(benchmark_scores) + representative = next( + run + for run in successful + if isinstance(run.get("benchmark_score"), dict) + and int(run["benchmark_score"].get("score", -1)) == med + ) + case["benchmark_score"] = { + **representative["benchmark_score"], + "score": med, + "score_min": min(benchmark_scores), + "score_max": max(benchmark_scores), + "score_runs": benchmark_scores, + } + else: + case["benchmark_score"] = None + + if not successful: + case["status"] = "failed" + first_error = next((run.get("error") for run in runs if run.get("error")), None) + case["error"] = first_error or { + "type": "BenchmarkError", + "message": f"all {runs_total} runs failed", + } + + if successful: + first = successful[0] + for key in ("stats", "skip_counts", "cost_units", "pack_metadata", "source_score_count"): + if key in first: + case[key] = first[key] + + return case + + +def _median_int(values: list[int]) -> int: + """Lower-median: returns an actual element of ``values`` (no interpolation).""" + ordered = sorted(values) + return ordered[(len(ordered) - 1) // 2] + + +def _empty_benchmark_score() -> dict[str, Any]: + """Floor the score when a pack has zero records. + + Without this, cleanliness/source_fidelity/freshness return 100 because they + have nothing to penalize, leaving the weighted score around 50. An empty + pack should read as failure, not a passing grade. 
+ """ + dimensions = { + name: {"score": 0, "weight": weight, "signals": ["empty pack"]} + for name, weight in BENCHMARK_SCORE_WEIGHTS.items() + } + return { + "schema_version": BENCHMARK_SCHEMA_VERSION, + "score": 0, + "grade": _benchmark_grade(0), + "weights": BENCHMARK_SCORE_WEIGHTS, + "dimensions": dimensions, + } + + def _coverage_dimension( payload: dict[str, Any], records: list[dict[str, Any]], @@ -2216,6 +2441,38 @@ def _http_json_post( headers: dict[str, str], body: dict[str, Any], timeout: float, + max_attempts: int = HTTP_RETRY_MAX_ATTEMPTS, + sleep: Any = time.sleep, +) -> dict[str, Any]: + """POST JSON with bounded retry on transient HTTP/URL errors. + + Retries on 429/502/503/504 and URLError up to ``max_attempts`` total. + Honors ``Retry-After`` (seconds) when present, capped at + ``HTTP_RETRY_CAP_SECONDS``. Other 4xx, JSON errors, and non-https URLs + raise immediately. + """ + last_error: BenchmarkError | None = None + for attempt in range(1, max_attempts + 1): + try: + return _http_json_post_once(label=label, url=url, headers=headers, body=body, timeout=timeout) + except _TransientHTTPError as err: + last_error = BenchmarkError(str(err)) + last_error.__cause__ = err.__cause__ + if attempt >= max_attempts: + break + delay = _retry_delay_seconds(attempt=attempt, retry_after=err.retry_after) + sleep(delay) + assert last_error is not None + raise last_error + + +def _http_json_post_once( + *, + label: str, + url: str, + headers: dict[str, str], + body: dict[str, Any], + timeout: float, ) -> dict[str, Any]: parsed_url = urlparse(url) if parsed_url.scheme != "https": @@ -2234,9 +2491,17 @@ def _http_json_post( raw = response.read().decode("utf-8") except HTTPError as err: detail = err.read().decode("utf-8", errors="replace") - raise BenchmarkError(f"{label} returned HTTP {err.code}: {_short_error_detail(detail)}") from err + message = f"{label} returned HTTP {err.code}: {_short_error_detail(detail)}" + if err.code in HTTP_RETRY_TRANSIENT_STATUSES: + 
transient = _TransientHTTPError(message, retry_after=_parse_retry_after(err)) + transient.__cause__ = err + raise transient from err + raise BenchmarkError(message) from err except URLError as err: - raise BenchmarkError(f"{label} request failed: {err.reason}") from err + message = f"{label} request failed: {err.reason}" + transient = _TransientHTTPError(message, retry_after=None) + transient.__cause__ = err + raise transient from err try: parsed = json.loads(raw) except json.JSONDecodeError as err: @@ -2246,6 +2511,34 @@ def _http_json_post( return parsed +class _TransientHTTPError(Exception): + """Internal marker for retryable HTTP failures.""" + + def __init__(self, message: str, *, retry_after: float | None) -> None: + super().__init__(message) + self.retry_after = retry_after + + +def _parse_retry_after(err: HTTPError) -> float | None: + raw = err.headers.get("Retry-After") if err.headers else None + if not raw: + return None + try: + seconds = float(raw) + except (TypeError, ValueError): + return None + if seconds < 0: + return None + return min(seconds, HTTP_RETRY_CAP_SECONDS) + + +def _retry_delay_seconds(*, attempt: int, retry_after: float | None) -> float: + if retry_after is not None: + return retry_after + base = min(HTTP_RETRY_CAP_SECONDS, 2.0 ** (attempt - 1)) + return base + random.uniform(0.0, 0.5) + + def _require_benchmark_api_key(env_var: str, provider: str) -> str: value = _lookup_benchmark_secret(env_var) if not value: @@ -2371,6 +2664,88 @@ def _format_skipped_providers(skipped: list[Any]) -> str: return ", ".join(values) if values else "none" +def _runs_disclosure_lines(report: dict[str, Any]) -> list[str]: + runs = report.get("runs_per_case") + if not isinstance(runs, int) or runs <= 1: + return [] + return [ + ( + f"- Repetition: each cell ran `N={runs}` times. Headline wall seconds " + "and scores are the median across runs, with `[min–max]` shown " + "inline. Per-run artifacts live under `run-1/`, `run-2/`, ... 
" + "subdirs alongside the case." + ), + ] + + +def _workload_disclosure_lines(report: dict[str, Any]) -> list[str]: + """Render a per-workflow workload table so readers can see the comparison terms.""" + cases = [case for case in report.get("cases", []) if isinstance(case, dict)] + by_workflow: dict[str, dict[str, Any]] = {} + workflow_order: list[str] = [] + for case in cases: + if _is_cache_only_case(case): + continue + workflow = str(case.get("workflow") or "") + if not workflow: + continue + if workflow not in by_workflow: + workflow_order.append(workflow) + by_workflow[workflow] = { + "settings": case.get("settings") or {}, + "records": [], + } + score = case.get("pack_score") + if isinstance(score, dict): + summary = score.get("summary") + if isinstance(summary, dict): + value = summary.get("record_count") + if isinstance(value, int): + by_workflow[workflow]["records"].append(value) + if not by_workflow: + return [] + lines = [ + "## Workload disclosure", + "", + ( + "The five workflows are not the same job. The core crawl walks a " + "page graph from a seed URL; provider workflows fetch a fixed " + "number of search results and optionally extract their content. " + "Compare scores within a row of the heatmap (same workflow across " + "targets), not down a column." 
+ ), + "", + "| Workflow | Settings | Median records | Records range |", + "| --- | --- | ---: | --- |", + ] + setting_keys = ( + "max_pages", + "max_depth", + "max_concurrent", + "max_search_results", + "extract_limit", + "mode", + ) + for workflow in workflow_order: + info = by_workflow[workflow] + settings = info["settings"] if isinstance(info["settings"], dict) else {} + rendered_settings = ", ".join( + f"{key}={settings[key]}" for key in setting_keys if key in settings + ) + records = info["records"] + if records: + med = _median_int(records) + range_text = f"{min(records)}–{max(records)}" if min(records) != max(records) else str(records[0]) + else: + med = "" + range_text = "" + lines.append( + f"| `{workflow}` | {rendered_settings or '—'} | {med} | {range_text} |" + ) + lines.append("") + return lines + + def _raindrop_trace_lines(trace: dict[str, Any]) -> list[str]: lines = [ f"- Raindrop trace: `{trace.get('provider', 'none')}` / `{trace.get('status', 'disabled')}`", @@ -2439,7 +2814,7 @@ def _markdown_report(report: dict[str, Any]) -> str: f"`{case.get('target_id', '')}` | " f"`{case['name']}` | " f"`{case.get('workflow', '')}` | " - f"{case['wall_seconds']} | " + f"{_case_wall_seconds_text(case)} | " f"{benchmark_score} | " f"{score_value} | " f"{record_count} | " @@ -2480,12 +2855,37 @@ def _case_benchmark_score_text(case: dict[str, Any]) -> str: return "failed" score = case.get("benchmark_score") if isinstance(score, dict) and isinstance(score.get("score"), int): - return str(score["score"]) + text = str(score["score"]) + score_min = score.get("score_min") + score_max = score.get("score_max") + if ( + isinstance(score_min, int) + and isinstance(score_max, int) + and score_min != score_max + ): + text += f" [{score_min}–{score_max}]" + return text if _is_cache_only_case(case): return "cache skip" return "" +def _case_wall_seconds_text(case: dict[str, Any]) -> str: + seconds = case.get("wall_seconds") + if seconds is None: + return "" + text = 
str(seconds) + seconds_min = case.get("wall_seconds_min") + seconds_max = case.get("wall_seconds_max") + if ( + isinstance(seconds_min, int | float) + and isinstance(seconds_max, int | float) + and seconds_min != seconds_max + ): + text += f" [{seconds_min}–{seconds_max}]" + return text + + def _case_provider_key(case: dict[str, Any]) -> str: workflow = str(case.get("workflow") or "") if workflow == "core-llm": @@ -2591,9 +2991,22 @@ def _article_markdown(report: dict[str, Any], *, title: str) -> str: ), ( "- Weighted score: coverage 30%, cleanliness 20%, source fidelity 20%, " - "freshness 15%, and density 15%." + "freshness 15%, and density 15%. Weights are heuristic — the sub-score " + "signals are the load-bearing detail." ), + ( + "- Boilerplate detection (used inside the cleanliness and density " + "dimensions) is a substring sniff on English navigation phrases; " + "it will under-report localized boilerplate." + ), + ( + "- Freshness is a presence test for target-specific terms in URL, " + "title, or first 5000 characters of body; it does not check page " + "modification time." 
+ ), + *_runs_disclosure_lines(report), "", + *_workload_disclosure_lines(report), "## Targets", "", ] @@ -2642,7 +3055,7 @@ def _article_markdown(report: dict[str, Any], *, title: str) -> str: f"`{case.get('target_id', '')}` | " f"`{case.get('name')}` | " f"`{case.get('workflow')}` | " - f"{case.get('wall_seconds')} | " + f"{_case_wall_seconds_text(case)} | " f"{_case_benchmark_score_text(case)} | " f"{score_value} | " f"{record_count} | " diff --git a/tests/test_benchmark.py b/tests/test_benchmark.py index fd9e7cb..c2d25c5 100644 --- a/tests/test_benchmark.py +++ b/tests/test_benchmark.py @@ -643,3 +643,187 @@ def test_benchmark_article_cli_writes_publishable_markdown(tmp_path: Path) -> No assert "Benchmarking docpull, Parallel, Tavily, Exa, and Raindrop" in article assert "Raindrop tracing was enabled" in article assert "`parallel-context`" in article + + +def test_http_json_post_retries_transient_then_succeeds( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Transient HTTPError on first attempt should retry and succeed on second.""" + attempts = {"count": 0} + sleep_calls: list[float] = [] + + def fake_once(**_kwargs: Any) -> dict[str, Any]: + attempts["count"] += 1 + if attempts["count"] == 1: + err = benchmark._TransientHTTPError("simulated 503", retry_after=None) + raise err + return {"ok": True, "attempt": attempts["count"]} + + monkeypatch.setattr(benchmark, "_http_json_post_once", fake_once) + + result = benchmark._http_json_post( + label="Test", + url="https://example.com/x", + headers={}, + body={}, + timeout=10, + max_attempts=3, + sleep=sleep_calls.append, + ) + + assert result == {"ok": True, "attempt": 2} + assert attempts["count"] == 2 + assert len(sleep_calls) == 1 + assert sleep_calls[0] > 0 + + +def test_http_json_post_raises_after_exhausting_retries( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """All attempts transient → BenchmarkError with the last error's message.""" + sleep_calls: list[float] = [] + + def fake_once(**_kwargs: Any) 
-> dict[str, Any]: + raise benchmark._TransientHTTPError("simulated 429", retry_after=0.0) + + monkeypatch.setattr(benchmark, "_http_json_post_once", fake_once) + + with pytest.raises(BenchmarkError, match="429"): + benchmark._http_json_post( + label="Test", + url="https://example.com/x", + headers={}, + body={}, + timeout=10, + max_attempts=2, + sleep=sleep_calls.append, + ) + + # 2 attempts total → 1 sleep between them. + assert len(sleep_calls) == 1 + + +def test_benchmark_score_floors_to_zero_on_empty_pack() -> None: + """Empty pack should not arithmetic-average to 50 via vacuous high dims.""" + payload = {"pack_score": {"score": 65, "summary": {"record_count": 0}}} + score = benchmark._benchmark_score( + payload=payload, + records=[], + include_domains=["docs.parallel.ai"], + target=None, + ) + + assert score["score"] == 0 + assert score["grade"] == "poor" + assert set(score["dimensions"]) == { + "coverage", + "cleanliness", + "source_fidelity", + "freshness", + "density", + } + for dim in score["dimensions"].values(): + assert dim["score"] == 0 + assert "empty pack" in dim["signals"] + + +def test_runs_n_aggregates_with_median_and_spread( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """--runs 3 should produce one aggregate case with median + min/max + raw runs.""" + call_count = {"i": 0} + score_sequence = [88, 92, 95] + wall_sequence = [1.0, 2.0, 3.0] + + async def fake(**kwargs: Any) -> dict[str, Any]: + i = call_count["i"] + call_count["i"] += 1 + output_dir = kwargs["output_dir"] + output_dir.mkdir(parents=True, exist_ok=True) + return { + "name": kwargs["name"], + "workflow": "core-llm", + "output_dir": str(output_dir), + "wall_seconds": wall_sequence[i], + "rss_baseline_mb": 10.0, + "rss_peak_mb": 11.0, + "rss_delta_mb": 1.0, + "stats": { + "urls_discovered": 1, + "pages_fetched": 1, + "pages_skipped": 0, + "pages_failed": 0, + "duration_seconds": 0.01, + "success_rate": 100.0, + }, + "skip_counts": {}, + "artifact_size_bytes": 
100, + "cache_size_bytes": 200, + "pack_score": { + "score": score_sequence[i], + "grade": "excellent", + "summary": {"record_count": 1, "total_tokens": 100}, + "issues": [], + "warnings": [], + }, + "benchmark_score": { + "schema_version": 2, + "score": score_sequence[i], + "grade": "excellent", + "weights": {}, + "dimensions": {}, + }, + "source_score_count": 1, + } + + monkeypatch.setattr(benchmark, "_run_core_case", fake) + + report = run_quick_benchmark( + target_url="https://docs.parallel.ai", + output_dir=tmp_path / "bench", + max_pages=1, + max_depth=1, + max_concurrent=1, + per_host_concurrent=1, + cache_enabled=True, + cached_pass=True, # should be forced off by runs > 1 + parallel=False, + parallel_objective=None, + parallel_queries=[], + include_domains=[], + mode="advanced", + max_search_results=8, + extract_limit=3, + max_estimated_cost=0.05, + runs=3, + ) + + assert report["runs_per_case"] == 3 + # cached pass should have been forced off + assert report["summary"]["case_count"] == 1 + assert call_count["i"] == 3 + + case = report["cases"][0] + assert case["runs_total"] == 3 + assert case["runs_succeeded"] == 3 + assert case["wall_seconds"] == 2.0 + assert case["wall_seconds_min"] == 1.0 + assert case["wall_seconds_max"] == 3.0 + assert case["wall_seconds_runs"] == [1.0, 2.0, 3.0] + assert case["pack_score"]["score"] == 92 + assert case["pack_score"]["score_min"] == 88 + assert case["pack_score"]["score_max"] == 95 + assert case["pack_score"]["score_runs"] == [88, 92, 95] + assert case["benchmark_score"]["score"] == 92 + assert case["benchmark_score"]["score_min"] == 88 + assert case["benchmark_score"]["score_max"] == 95 + assert len(case["runs"]) == 3 + assert case["estimated_cost_usd"] == 0.0 + # Per-run output dirs should exist as siblings under the case output dir. 
+ assert (tmp_path / "bench" / "core-llm" / "run-1").exists() + assert (tmp_path / "bench" / "core-llm" / "run-2").exists() + assert (tmp_path / "bench" / "core-llm" / "run-3").exists() + + summary = (tmp_path / "bench" / "benchmark.summary.md").read_text(encoding="utf-8") + assert "1.0" in summary and "3.0" in summary # spread is rendered From 3ddbbf976221d3dae872621043f29bb1b95d6338 Mon Sep 17 00:00:00 2001 From: admin-raintree <277948009+admin-raintree@users.noreply.github.com> Date: Tue, 9 Jun 2026 15:44:13 -0700 Subject: [PATCH 2/4] Fix CI: mypy assignment, ruff format, bandit B311 - _workload_disclosure_lines: annotate med_text/range_text as str so mypy stops inferring int from the populated branch (#61 CI). - _retry_delay_seconds: # nosec B311 on the jitter random.uniform call; backoff jitter is not security-sensitive. - ruff format pass on the file. Co-Authored-By: Claude Opus 4.7 --- src/docpull/benchmark.py | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/src/docpull/benchmark.py b/src/docpull/benchmark.py index e0db782..b744de8 100644 --- a/src/docpull/benchmark.py +++ b/src/docpull/benchmark.py @@ -2042,8 +2042,7 @@ def _aggregate_runs( representative = next( run for run in successful - if isinstance(run.get("pack_score"), dict) - and int(run["pack_score"].get("score", -1)) == med + if isinstance(run.get("pack_score"), dict) and int(run["pack_score"].get("score", -1)) == med ) case["pack_score"] = { **representative["pack_score"], @@ -2536,7 +2535,8 @@ def _retry_delay_seconds(*, attempt: int, retry_after: float | None) -> float: if retry_after is not None: return retry_after base = min(HTTP_RETRY_CAP_SECONDS, 2.0 ** (attempt - 1)) - return base + random.uniform(0.0, 0.5) + # Backoff jitter is not security-sensitive — stdlib random is fine here. 
+ return base + random.uniform(0.0, 0.5) # nosec B311 def _require_benchmark_api_key(env_var: str, provider: str) -> str: @@ -2729,19 +2729,17 @@ def _workload_disclosure_lines(report: dict[str, Any]) -> list[str]: for workflow in workflow_order: info = by_workflow[workflow] settings = info["settings"] if isinstance(info["settings"], dict) else {} - rendered_settings = ", ".join( - f"{key}={settings[key]}" for key in setting_keys if key in settings - ) + rendered_settings = ", ".join(f"{key}={settings[key]}" for key in setting_keys if key in settings) records = info["records"] + med_text: str + range_text: str if records: - med = _median_int(records) + med_text = str(_median_int(records)) range_text = f"{min(records)}–{max(records)}" if min(records) != max(records) else str(records[0]) else: - med = "" + med_text = "" range_text = "" - lines.append( - f"| `{workflow}` | {rendered_settings or '—'} | {med} | {range_text} |" - ) + lines.append(f"| `{workflow}` | {rendered_settings or '—'} | {med_text} | {range_text} |") lines.append("") return lines @@ -2858,11 +2856,7 @@ def _case_benchmark_score_text(case: dict[str, Any]) -> str: text = str(score["score"]) score_min = score.get("score_min") score_max = score.get("score_max") - if ( - isinstance(score_min, int) - and isinstance(score_max, int) - and score_min != score_max - ): + if isinstance(score_min, int) and isinstance(score_max, int) and score_min != score_max: text += f" [{score_min}–{score_max}]" return text if _is_cache_only_case(case): From a1b6c7e4c013ede652febd9589fb19195c44dc8e Mon Sep 17 00:00:00 2001 From: admin-raintree <277948009+admin-raintree@users.noreply.github.com> Date: Tue, 9 Jun 2026 16:22:05 -0700 Subject: [PATCH 3/4] feat(benchmark): add pass^k reliability metric + LLM-judge stub MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit pass^k (worst-of-k-trials clears the bar) is now computed alongside the existing median and rendered in benchmark.summary.md. 
The N3 publication report's headline median hid a 33-point reliability gap between providers at the 90 threshold; pass^3 surfaces it (e.g. docpull 88% vs. tavily 50%). The LLM-judge module is an advisory stub — separate from benchmark_score, key-gated, with a four-dimension rubric (coverage / groundedness / source_authority / synthesis_readiness) per the Anthropic eval guidance. Not wired into the report until calibrated against a hand-graded set. Co-Authored-By: Claude Opus 4.7 --- src/docpull/benchmark.py | 89 ++++++++++ src/docpull/judge.py | 360 +++++++++++++++++++++++++++++++++++++++ src/docpull/passk.py | 204 ++++++++++++++++++++++ tests/test_judge.py | 133 +++++++++++++++ tests/test_passk.py | 83 +++++++++ 5 files changed, 869 insertions(+) create mode 100644 src/docpull/judge.py create mode 100644 src/docpull/passk.py create mode 100644 tests/test_judge.py create mode 100644 tests/test_passk.py diff --git a/src/docpull/benchmark.py b/src/docpull/benchmark.py index b744de8..97df90b 100644 --- a/src/docpull/benchmark.py +++ b/src/docpull/benchmark.py @@ -40,6 +40,7 @@ run_live_context_pack, run_search_pack, ) +from .passk import pass_at_k from .pipeline.manifest import CorpusManifest from .provider_keys import ( PROVIDER_CONFIGS, @@ -73,6 +74,7 @@ "freshness": 0.15, "density": 0.15, } +PASS_AT_K_THRESHOLDS: tuple[int, ...] = (70, 80, 90) TARGET_SET_CHOICES = ("single", "tool-docs", "provider-matrix", "v2") @@ -2627,9 +2629,49 @@ def _summary(cases: list[dict[str, Any]]) -> dict[str, Any]: if case.get("pack_score") is None and not cache_only ), "best_by_target": _best_by_target(cases), + "pass_at_k": _pass_at_k_summary(cases, cache_only_cases), } +def _pass_at_k_summary( + cases: list[dict[str, Any]], + cache_only_cases: list[bool], +) -> dict[str, Any]: + """Compute pass^k for ``pack_score`` and ``benchmark_score``. + + pass^k = fraction of cases whose *worst* trial meets the threshold. 
The + Anthropic "Demystifying evals" post argues this is the right framing when + consistency matters ("users expect reliable behavior every time"). Median + tells you the typical run; pass^k tells you how often a case is reliably + above bar. Cache-only cases are excluded — they're not scored. + """ + scored_cases = [ + case + for case, cache_only in zip(cases, cache_only_cases, strict=True) + if not cache_only + ] + if not scored_cases: + return {"k": 0, "thresholds": list(PASS_AT_K_THRESHOLDS), "results": {}} + results: dict[str, list[dict[str, Any]]] = {} + k = 0 + for score_key in ("pack_score", "benchmark_score"): + per_threshold: list[dict[str, Any]] = [] + for threshold in PASS_AT_K_THRESHOLDS: + block = pass_at_k(scored_cases, score_key=score_key, threshold=threshold) + k = max(k, block["k"]) + per_threshold.append( + { + "threshold": threshold, + "cases_total": block["cases_total"], + "cases_passed": block["cases_passed"], + "rate": round(block["rate"], 4), + "by_provider": block["by_provider"], + } + ) + results[score_key] = per_threshold + return {"k": k, "thresholds": list(PASS_AT_K_THRESHOLDS), "results": results} + + def _best_by_target(cases: list[dict[str, Any]]) -> dict[str, dict[str, Any]]: best: dict[str, dict[str, Any]] = {} for case in cases: @@ -2845,9 +2887,56 @@ def _markdown_report(report: dict[str, Any]) -> str: "", ] ) + lines.extend(_pass_at_k_lines(report)) return "\n".join(lines) +def _pass_at_k_lines(report: dict[str, Any]) -> list[str]: + summary = report.get("summary") + if not isinstance(summary, dict): + return [] + block = summary.get("pass_at_k") + if not isinstance(block, dict): + return [] + k = block.get("k") + if not isinstance(k, int) or k < 2: + return [] + results = block.get("results") + if not isinstance(results, dict) or not results: + return [] + thresholds = block.get("thresholds") or list(PASS_AT_K_THRESHOLDS) + lines = [ + "", + f"## Reliability (pass^{k})", + "", + ( + f"Fraction of cases whose **worst** of {k} 
trials meets the threshold. " + "Stricter than the headline median: a case only counts as passing if " + "every run cleared the bar." + ), + "", + "| Score | " + " | ".join(f"@{t}" for t in thresholds) + " | n |", + "| --- | " + " | ".join(["---:"] * len(thresholds)) + " | ---: |", + ] + for score_key, rows in results.items(): + if not isinstance(rows, list) or not rows: + continue + by_threshold = {row["threshold"]: row for row in rows if isinstance(row, dict)} + cells: list[str] = [] + total = 0 + for threshold in thresholds: + row = by_threshold.get(threshold) + if not row: + cells.append("—") + continue + total = max(total, int(row.get("cases_total") or 0)) + rate = float(row.get("rate") or 0.0) + cells.append(f"{rate:.1%} ({row.get('cases_passed')}/{row.get('cases_total')})") + lines.append(f"| `{score_key}` | " + " | ".join(cells) + f" | {total} |") + lines.append("") + return lines + + def _case_benchmark_score_text(case: dict[str, Any]) -> str: if case.get("status") == "failed": return "failed" diff --git a/src/docpull/judge.py b/src/docpull/judge.py new file mode 100644 index 0000000..7c6878d --- /dev/null +++ b/src/docpull/judge.py @@ -0,0 +1,360 @@ +"""LLM-judge dimension for docpull benchmark packs (stub). + +The existing ``benchmark_score`` is fully deterministic. For research-style +evals the Anthropic "Demystifying evals for AI agents" post recommends +corroborating heuristics with a model-based rubric, calibrated against +human judgment on a small set of targets. This module provides that +corroborating signal as an **advisory** score (parallel to, not folded +into, ``benchmark_score``) so the heuristic baseline and its regression +properties stay intact. + +What this gives you: + * A clear rubric prompt (coverage / groundedness / source_authority / + synthesis_readiness) the model fills in with per-dimension scores. 
+ * A key-gated transport: if ``ANTHROPIC_API_KEY`` is unset, the judge + returns ``skipped=True`` with a structured reason rather than failing + the run. + * A pluggable ``client`` callable so the same code path works in tests + (inject a stub) and in CI (skip cleanly). + * Document sampling capped to keep cost predictable and reproducible. + +Wire-up checklist (deferred — this is a stub, not a calibrated grader): + * Calibrate the rubric against ~10 hand-graded packs (see Anthropic eval + post, "Design graders thoughtfully" section). + * Decide reporting cadence (every run vs. a sampled subset) given cost. + * Optionally add a self-consistency layer (multi-judge consensus). + +Usage: + python -m docpull.judge .bench/runs////run-1 \\ + --task-prompt "Build a context pack for the Parallel API docs" +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import urllib.error +import urllib.request +from collections.abc import Callable, Iterable +from pathlib import Path +from typing import Any + +JUDGE_SCHEMA_VERSION = 1 +JUDGE_MODEL_ENV = "DOCPULL_JUDGE_MODEL" +JUDGE_API_KEY_ENV = "ANTHROPIC_API_KEY" +DEFAULT_JUDGE_MODEL = "claude-opus-4-7" +ANTHROPIC_MESSAGES_URL = "https://api.anthropic.com/v1/messages" +ANTHROPIC_API_VERSION = "2023-06-01" + +JUDGE_DIMENSIONS = ("coverage", "groundedness", "source_authority", "synthesis_readiness") +JUDGE_WEIGHTS = { + "coverage": 0.35, + "groundedness": 0.25, + "source_authority": 0.20, + "synthesis_readiness": 0.20, +} +MAX_SAMPLED_DOCS = 6 +MAX_DOC_CHARS = 1500 +JUDGE_REQUEST_TIMEOUT_S = 60.0 + +JudgeClient = Callable[[str], str] + + +RUBRIC_PROMPT = """You are grading a documentation context pack produced by an automated +agent. The pack is a collection of Markdown documents intended for a downstream +LLM agent to consult when answering questions or writing code about the topic. 
+ +Task prompt (what the agent was asked to build a pack for): +{task_prompt} + +Expected authoritative domains (if relevant): +{expected_domains} + +Sample of {sampled} of {total} document(s) in the pack (truncated): +{documents} + +Score the pack from 0-100 on each of these four dimensions, then explain +briefly. If you do not have enough evidence to judge a dimension, return +"Unknown" for that dimension's score and say why in the rationale — do not +guess. + +Dimensions: +- coverage: does the pack cover the intent of the task prompt across the + breadth a useful answer would need? Penalize narrow / one-topic packs when + the task implies breadth. +- groundedness: are the documents factually relevant to the topic and free + of obvious irrelevant noise (navigation, ads, unrelated pages)? +- source_authority: do the documents come from the canonical/expected + authoritative sources, not tangential third-party pages? +- synthesis_readiness: is the content shaped well for downstream agent use + — coherent prose, code blocks intact, no paywall snippets, no broken HTML? + +Return ONLY a JSON object on a single line, no prose before or after: +{{"coverage": {{"score": <0-100 or "Unknown">, "rationale": ""}}, + "groundedness": {{"score": <0-100 or "Unknown">, "rationale": ""}}, + "source_authority": {{"score": <0-100 or "Unknown">, "rationale": ""}}, + "synthesis_readiness": {{"score": <0-100 or "Unknown">, "rationale": ""}}}} +""" + + +def judge_pack( + pack_dir: Path, + task: dict[str, Any], + *, + client: JudgeClient | None = None, + model: str | None = None, +) -> dict[str, Any]: + """Grade a pack with an LLM rubric. Returns an advisory ``judge_score`` dict. + + ``task`` is expected to carry: ``prompt`` (str), optionally + ``expected_domains`` (list[str]). 
The returned dict mirrors the shape of + ``benchmark_score`` (schema_version / score / grade / weights / + dimensions) plus ``skipped`` / ``skip_reason`` / ``model`` / + ``doc_sample_count`` so callers can render it alongside the heuristic + score without special casing. + """ + documents_path = pack_dir / "documents.ndjson" + if not documents_path.exists(): + return _skipped("documents.ndjson missing", model=model) + + docs = list(_read_documents(documents_path)) + if not docs: + return _skipped("pack has zero documents", model=model) + + resolved_client, resolved_model, key_error = _resolve_client(client=client, model=model) + if resolved_client is None: + return _skipped(key_error or "no judge client configured", model=resolved_model) + + sampled = docs[:MAX_SAMPLED_DOCS] + prompt = RUBRIC_PROMPT.format( + task_prompt=task.get("prompt") or "(none provided)", + expected_domains=", ".join(task.get("expected_domains") or []) or "(none provided)", + sampled=len(sampled), + total=len(docs), + documents=_format_documents(sampled), + ) + try: + raw = resolved_client(prompt) + except _JudgeTransportError as exc: + return _skipped(f"judge transport error: {exc}", model=resolved_model) + + parsed = _parse_judgment(raw) + if parsed is None: + return _skipped("judge returned non-JSON response", model=resolved_model) + + dimensions, score = _aggregate(parsed) + return { + "schema_version": JUDGE_SCHEMA_VERSION, + "score": score, + "grade": _grade(score) if score is not None else None, + "weights": JUDGE_WEIGHTS, + "dimensions": dimensions, + "skipped": False, + "skip_reason": None, + "model": resolved_model, + "doc_sample_count": len(sampled), + } + + +def _read_documents(path: Path) -> Iterable[dict[str, Any]]: + with path.open() as fh: + for line in fh: + line = line.strip() + if not line: + continue + try: + yield json.loads(line) + except json.JSONDecodeError: + continue + + +def _format_documents(docs: list[dict[str, Any]]) -> str: + chunks: list[str] = [] + for i, doc 
in enumerate(docs, start=1): + content = str(doc.get("content") or "") + if len(content) > MAX_DOC_CHARS: + content = content[:MAX_DOC_CHARS] + " …" + chunks.append( + f"--- doc {i} ---\n" + f"url: {doc.get('url')}\n" + f"title: {doc.get('title')}\n\n" + f"{content}" + ) + return "\n\n".join(chunks) + + +def _resolve_client( + *, + client: JudgeClient | None, + model: str | None, +) -> tuple[JudgeClient | None, str | None, str | None]: + resolved_model = model or os.environ.get(JUDGE_MODEL_ENV) or DEFAULT_JUDGE_MODEL + if client is not None: + return client, resolved_model, None + api_key = os.environ.get(JUDGE_API_KEY_ENV) + if not api_key: + return None, resolved_model, f"{JUDGE_API_KEY_ENV} not set" + return _AnthropicMessagesClient(api_key=api_key, model=resolved_model), resolved_model, None + + +def _parse_judgment(text: str) -> dict[str, Any] | None: + start = text.find("{") + end = text.rfind("}") + if start == -1 or end == -1 or end <= start: + return None + try: + parsed = json.loads(text[start : end + 1]) + except json.JSONDecodeError: + return None + if not isinstance(parsed, dict): + return None + return parsed + + +def _aggregate(parsed: dict[str, Any]) -> tuple[dict[str, Any], int | None]: + dimensions: dict[str, Any] = {} + weighted_total = 0.0 + weight_used = 0.0 + for name in JUDGE_DIMENSIONS: + weight = JUDGE_WEIGHTS[name] + entry = parsed.get(name) if isinstance(parsed.get(name), dict) else {} + raw_score = entry.get("score") if isinstance(entry, dict) else None + rationale = entry.get("rationale") if isinstance(entry, dict) else None + numeric: int | None + if isinstance(raw_score, int | float) and not isinstance(raw_score, bool): + numeric = max(0, min(100, int(raw_score))) + weighted_total += numeric * weight + weight_used += weight + else: + numeric = None + dimensions[name] = { + "score": numeric, + "weight": weight, + "rationale": rationale if isinstance(rationale, str) else None, + } + score = round(weighted_total / weight_used) if 
weight_used > 0 else None + return dimensions, score + + +def _grade(score: int) -> str: + if score >= 90: + return "excellent" + if score >= 75: + return "good" + if score >= 60: + return "fair" + if score >= 40: + return "poor" + return "failing" + + +def _skipped(reason: str, *, model: str | None) -> dict[str, Any]: + return { + "schema_version": JUDGE_SCHEMA_VERSION, + "score": None, + "grade": None, + "weights": JUDGE_WEIGHTS, + "dimensions": { + name: {"score": None, "weight": JUDGE_WEIGHTS[name], "rationale": None} + for name in JUDGE_DIMENSIONS + }, + "skipped": True, + "skip_reason": reason, + "model": model, + "doc_sample_count": 0, + } + + +class _JudgeTransportError(RuntimeError): + """Raised when the judge HTTP call fails; converted to skipped result.""" + + +class _AnthropicMessagesClient: + """Minimal urllib-based client for the Anthropic Messages API. + + Kept dependency-free on purpose (no anthropic SDK, no httpx). The Anthropic + eval post calls out that LLM judges benefit from a clean retry/timeout + surface and easy injection in tests — this class is the seam. 
+ """ + + def __init__(self, *, api_key: str, model: str) -> None: + self._api_key = api_key + self._model = model + + def __call__(self, prompt: str) -> str: + body = json.dumps( + { + "model": self._model, + "max_tokens": 600, + "temperature": 0, + "messages": [{"role": "user", "content": prompt}], + } + ).encode() + request = urllib.request.Request( + ANTHROPIC_MESSAGES_URL, + data=body, + headers={ + "content-type": "application/json", + "x-api-key": self._api_key, + "anthropic-version": ANTHROPIC_API_VERSION, + }, + method="POST", + ) + try: + with urllib.request.urlopen(request, timeout=JUDGE_REQUEST_TIMEOUT_S) as response: # noqa: S310 + payload = json.loads(response.read()) + except urllib.error.HTTPError as exc: + raise _JudgeTransportError(f"HTTP {exc.code}") from exc + except urllib.error.URLError as exc: + raise _JudgeTransportError(str(exc.reason)) from exc + except json.JSONDecodeError as exc: + raise _JudgeTransportError("response was not JSON") from exc + content = payload.get("content") + if isinstance(content, list) and content and isinstance(content[0], dict): + text = content[0].get("text") + if isinstance(text, str): + return text + raise _JudgeTransportError("response missing content[0].text") + + +def _build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "pack_dir", + type=Path, + help="path to a pack output dir (containing documents.ndjson)", + ) + parser.add_argument("--task-prompt", required=True, help="the prompt the pack was built for") + parser.add_argument( + "--expected-domain", + action="append", + default=[], + help="authoritative domain(s); repeatable", + ) + parser.add_argument( + "--model", + default=None, + help=f"judge model id (default: ${JUDGE_MODEL_ENV} or {DEFAULT_JUDGE_MODEL})", + ) + return parser + + +def main(argv: list[str] | None = None) -> int: + args = _build_parser().parse_args(argv) 
+ result = judge_pack( + args.pack_dir, + {"prompt": args.task_prompt, "expected_domains": args.expected_domain}, + model=args.model, + ) + json.dump(result, sys.stdout, indent=2) + sys.stdout.write("\n") + return 0 if not result["skipped"] else 2 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/docpull/passk.py b/src/docpull/passk.py new file mode 100644 index 0000000..ee20a02 --- /dev/null +++ b/src/docpull/passk.py @@ -0,0 +1,204 @@ +"""pass^k analyzer for docpull benchmark reports. + +Reads a ``benchmark.report.json`` produced by the harness and reports how +many cases meet a score threshold on *every* trial — the framing the +Anthropic "Demystifying evals for AI agents" post argues for when +consistency is the product claim ("users expect reliable behavior every +time"). Median tells you the typical run; pass^k tells you how often a +case is reliably above bar. + +Usage: + python -m docpull.passk .bench/runs//benchmark.report.json + python -m docpull.passk --thresholds 70 80 90 --score benchmark +""" + +from __future__ import annotations + +import argparse +import json +import sys +from collections import defaultdict +from collections.abc import Iterable +from pathlib import Path +from typing import Any + +SCORE_KEYS = ("pack_score", "benchmark_score") + + +def _runs_for(case: dict[str, Any], score_key: str) -> list[int] | None: + score = case.get(score_key) + if not isinstance(score, dict): + return None + runs = score.get("score_runs") + if not isinstance(runs, list) or not runs: + return None + out: list[int] = [] + for value in runs: + if isinstance(value, bool) or not isinstance(value, int | float): + return None + out.append(int(value)) + return out + + +def _provider_of(case: dict[str, Any]) -> str: + return str(case.get("provider") or case.get("workflow") or "unknown") + + +def pass_at_k( + cases: Iterable[dict[str, Any]], + *, + score_key: str, + threshold: int, +) -> dict[str, Any]: + """Fraction of cases whose worst trial 
still meets ``threshold``. + + Returns a dict with aggregate counts plus a per-provider breakdown and a + flat list of failing cases so the publication writeup can name them. + """ + total = 0 + passed = 0 + by_provider: dict[str, dict[str, int]] = defaultdict(lambda: {"total": 0, "passed": 0}) + failures: list[dict[str, Any]] = [] + for case in cases: + runs = _runs_for(case, score_key) + if runs is None: + continue + total += 1 + provider = _provider_of(case) + by_provider[provider]["total"] += 1 + worst = min(runs) + if worst >= threshold: + passed += 1 + by_provider[provider]["passed"] += 1 + else: + failures.append( + { + "name": case.get("name"), + "provider": provider, + "runs": runs, + "worst": worst, + "median": sorted(runs)[(len(runs) - 1) // 2], + } + ) + rate = passed / total if total else 0.0 + return { + "score_key": score_key, + "threshold": threshold, + "k": _trials_per_case(cases), + "cases_total": total, + "cases_passed": passed, + "rate": rate, + "by_provider": dict(by_provider), + "failures": failures, + } + + +def _trials_per_case(cases: Iterable[dict[str, Any]]) -> int: + lengths = {len(_runs_for(c, "benchmark_score") or []) for c in cases} + lengths.discard(0) + return max(lengths) if lengths else 0 + + +def _format_table(report: dict[str, Any], results: list[dict[str, Any]]) -> str: + lines = [ + f"run: {report.get('run_dir', '')}", + f"target_set: {report.get('target_set')} cases: {len(report.get('cases', []))} " + f"runs_per_case: {report.get('runs_per_case')}", + "", + f"{'score':<16} {'threshold':<10} {'k':<3} {'pass^k':<10} {'cases':<10}", + "-" * 52, + ] + for r in results: + lines.append( + f"{r['score_key']:<16} {r['threshold']:<10} {r['k']:<3} " + f"{r['rate']:>6.1%} {r['cases_passed']}/{r['cases_total']}" + ) + by_provider = _aggregate_by_provider(results) + if by_provider: + lines += ["", "by provider (benchmark_score):"] + for provider, rows in by_provider.items(): + row = " ".join(f"@{t}={p:.0%}" for t, p in rows.items()) + 
lines.append(f" {provider:<24} {row}") + failure_lines = _failure_lines(results) + if failure_lines: + lines += ["", "fails-any-trial:", *failure_lines] + return "\n".join(lines) + + +def _aggregate_by_provider(results: list[dict[str, Any]]) -> dict[str, dict[int, float]]: + bench_results = [r for r in results if r["score_key"] == "benchmark_score"] + out: dict[str, dict[int, float]] = defaultdict(dict) + for r in bench_results: + for provider, counts in r["by_provider"].items(): + rate = counts["passed"] / counts["total"] if counts["total"] else 0.0 + out[provider][r["threshold"]] = rate + return dict(out) + + +def _failure_lines(results: list[dict[str, Any]]) -> list[str]: + seen: set[tuple[str, str, int]] = set() + lines: list[str] = [] + for r in results: + if r["score_key"] != "benchmark_score": + continue + for f in r["failures"]: + key = (str(f["name"]), r["score_key"], r["threshold"]) + if key in seen: + continue + seen.add(key) + lines.append( + f" @{r['threshold']:<3} {f['name']} runs={f['runs']} worst={f['worst']}" + ) + return lines + + +def _build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("report", type=Path, help="path to benchmark.report.json") + parser.add_argument( + "--thresholds", + type=int, + nargs="+", + default=[70, 80, 90], + help="score thresholds to evaluate (default: 70 80 90)", + ) + parser.add_argument( + "--score", + choices=("pack", "benchmark", "both"), + default="both", + help="which score to evaluate (default: both)", + ) + parser.add_argument("--json", action="store_true", help="emit machine-readable JSON") + return parser + + +def main(argv: list[str] | None = None) -> int: + args = _build_parser().parse_args(argv) + report = json.loads(args.report.read_text()) + cases = report.get("cases") or [] + if args.score == "both": + score_keys: tuple[str, ...] 
= SCORE_KEYS + else: + score_keys = (f"{args.score}_score",) + results = [ + pass_at_k(cases, score_key=key, threshold=t) + for key in score_keys + for t in sorted(args.thresholds) + ] + if args.json: + json.dump( + {"run_dir": report.get("run_dir"), "results": results}, + sys.stdout, + indent=2, + ) + sys.stdout.write("\n") + else: + print(_format_table(report, results)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_judge.py b/tests/test_judge.py new file mode 100644 index 0000000..3544b3f --- /dev/null +++ b/tests/test_judge.py @@ -0,0 +1,133 @@ +"""Tests for the LLM-judge dimension stub. + +We never call out to the real Anthropic API in tests. The judge accepts a +``client`` callable so we inject canned responses; the default code path +should also skip cleanly when no API key is set. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from docpull.judge import ( + JUDGE_API_KEY_ENV, + JUDGE_DIMENSIONS, + _parse_judgment, + judge_pack, +) + + +def _write_pack(pack_dir: Path, docs: list[dict]) -> None: + pack_dir.mkdir(parents=True, exist_ok=True) + (pack_dir / "documents.ndjson").write_text( + "\n".join(json.dumps(d) for d in docs) + "\n" + ) + + +def test_skips_when_no_api_key(monkeypatch, tmp_path: Path) -> None: + monkeypatch.delenv(JUDGE_API_KEY_ENV, raising=False) + _write_pack(tmp_path, [{"url": "https://x", "title": "t", "content": "c"}]) + + result = judge_pack(tmp_path, {"prompt": "build a pack"}) + + assert result["skipped"] is True + assert result["skip_reason"] == f"{JUDGE_API_KEY_ENV} not set" + assert result["score"] is None + for dim in JUDGE_DIMENSIONS: + assert result["dimensions"][dim]["score"] is None + + +def test_skips_when_pack_missing(tmp_path: Path) -> None: + result = judge_pack(tmp_path, {"prompt": "build a pack"}, client=lambda _p: "{}") + assert result["skipped"] is True + assert "documents.ndjson missing" in result["skip_reason"] + + 
+def test_skips_when_pack_empty(tmp_path: Path) -> None: + (tmp_path / "documents.ndjson").write_text("") + result = judge_pack(tmp_path, {"prompt": "p"}, client=lambda _p: "{}") + assert result["skipped"] is True + assert "zero documents" in result["skip_reason"] + + +def test_parses_judge_response(tmp_path: Path) -> None: + _write_pack( + tmp_path, + [ + {"url": f"https://docs.example.com/{i}", "title": f"t{i}", "content": "body"} + for i in range(3) + ], + ) + response = json.dumps( + { + "coverage": {"score": 90, "rationale": "covers everything"}, + "groundedness": {"score": 80, "rationale": "ok"}, + "source_authority": {"score": 70, "rationale": "ok"}, + "synthesis_readiness": {"score": 60, "rationale": "ok"}, + } + ) + + result = judge_pack(tmp_path, {"prompt": "p"}, client=lambda _p: response) + + assert result["skipped"] is False + # Weighted: 90*.35 + 80*.25 + 70*.20 + 60*.20 = 78.5 -> 78 + assert result["score"] == 78 + assert result["doc_sample_count"] == 3 + assert result["dimensions"]["coverage"]["score"] == 90 + + +def test_unknown_dimensions_are_excluded_from_blend(tmp_path: Path) -> None: + _write_pack(tmp_path, [{"url": "https://x", "title": "t", "content": "c"}]) + response = json.dumps( + { + "coverage": {"score": 90, "rationale": "ok"}, + "groundedness": {"score": "Unknown", "rationale": "not enough evidence"}, + "source_authority": {"score": 80, "rationale": "ok"}, + "synthesis_readiness": {"score": "Unknown", "rationale": "n/a"}, + } + ) + + result = judge_pack(tmp_path, {"prompt": "p"}, client=lambda _p: response) + + assert result["dimensions"]["groundedness"]["score"] is None + # Only coverage(.35) + source_authority(.20) contribute; renormalized + # (90*.35 + 80*.20) / (.35 + .20) = 86.36 -> 86 + assert result["score"] == 86 + + +def test_non_json_response_is_skipped(tmp_path: Path) -> None: + _write_pack(tmp_path, [{"url": "https://x", "title": "t", "content": "c"}]) + result = judge_pack(tmp_path, {"prompt": "p"}, client=lambda _p: "I 
cannot grade this") + assert result["skipped"] is True + assert "non-JSON" in result["skip_reason"] + + +def test_response_clamped_to_0_100(tmp_path: Path) -> None: + _write_pack(tmp_path, [{"url": "https://x", "title": "t", "content": "c"}]) + response = json.dumps( + { + "coverage": {"score": 150, "rationale": "ok"}, + "groundedness": {"score": -10, "rationale": "ok"}, + "source_authority": {"score": 50, "rationale": "ok"}, + "synthesis_readiness": {"score": 50, "rationale": "ok"}, + } + ) + + result = judge_pack(tmp_path, {"prompt": "p"}, client=lambda _p: response) + + assert result["dimensions"]["coverage"]["score"] == 100 + assert result["dimensions"]["groundedness"]["score"] == 0 + + +def test_parse_judgment_finds_json_within_prose() -> None: + response = 'Here is my judgment:\n\n{"coverage": {"score": 80}}\n\nLet me know.' + parsed = _parse_judgment(response) + assert parsed == {"coverage": {"score": 80}} + + +@pytest.mark.parametrize("bad", ["no braces", "{not json}", "", "{}{"]) +def test_parse_judgment_rejects_malformed(bad: str) -> None: + assert _parse_judgment(bad) is None or _parse_judgment(bad) == {} diff --git a/tests/test_passk.py b/tests/test_passk.py new file mode 100644 index 0000000..c2e4c1d --- /dev/null +++ b/tests/test_passk.py @@ -0,0 +1,83 @@ +"""Tests for the pass^k analyzer.""" + +from __future__ import annotations + +import json +from pathlib import Path + +from docpull.passk import main as passk_main +from docpull.passk import pass_at_k + + +def _case(name: str, provider: str, runs: list[int]) -> dict: + return { + "name": name, + "provider": provider, + "pack_score": {"score": runs[len(runs) // 2], "score_runs": runs}, + "benchmark_score": {"score": runs[len(runs) // 2], "score_runs": runs}, + } + + +def test_pass_at_k_all_pass() -> None: + cases = [_case("a/x", "alpha", [95, 96, 94]), _case("b/x", "alpha", [90, 92, 91])] + result = pass_at_k(cases, score_key="benchmark_score", threshold=90) + assert result["cases_total"] == 2 + 
assert result["cases_passed"] == 2 + assert result["rate"] == 1.0 + assert result["k"] == 3 + assert result["failures"] == [] + + +def test_pass_at_k_one_failure() -> None: + cases = [ + _case("a/x", "alpha", [95, 96, 94]), + _case("b/x", "alpha", [89, 92, 91]), # worst=89 fails @90 + ] + result = pass_at_k(cases, score_key="benchmark_score", threshold=90) + assert result["cases_passed"] == 1 + assert result["rate"] == 0.5 + assert len(result["failures"]) == 1 + assert result["failures"][0]["worst"] == 89 + + +def test_pass_at_k_per_provider_breakdown() -> None: + cases = [ + _case("a/x", "alpha", [95, 96, 94]), + _case("b/x", "alpha", [80, 81, 82]), + _case("c/x", "beta", [95, 95, 95]), + ] + result = pass_at_k(cases, score_key="benchmark_score", threshold=90) + assert result["by_provider"]["alpha"] == {"total": 2, "passed": 1} + assert result["by_provider"]["beta"] == {"total": 1, "passed": 1} + + +def test_pass_at_k_skips_cases_without_runs() -> None: + cases = [ + _case("a/x", "alpha", [95, 96, 94]), + {"name": "b/x", "provider": "alpha", "pack_score": None, "benchmark_score": None}, + ] + result = pass_at_k(cases, score_key="benchmark_score", threshold=90) + assert result["cases_total"] == 1 + + +def test_pass_at_k_threshold_boundary_is_inclusive() -> None: + cases = [_case("a/x", "alpha", [90, 95, 92])] # worst==threshold passes + result = pass_at_k(cases, score_key="benchmark_score", threshold=90) + assert result["cases_passed"] == 1 + + +def test_cli_emits_json(tmp_path: Path, capsys) -> None: + report = { + "schema_version": 2, + "run_dir": str(tmp_path), + "target_set": "x", + "runs_per_case": 3, + "cases": [_case("a/x", "alpha", [95, 96, 94]), _case("b/x", "alpha", [80, 81, 82])], + } + path = tmp_path / "report.json" + path.write_text(json.dumps(report)) + rc = passk_main([str(path), "--thresholds", "90", "--score", "benchmark", "--json"]) + assert rc == 0 + out = json.loads(capsys.readouterr().out) + assert out["results"][0]["rate"] == 0.5 + assert 
out["results"][0]["cases_passed"] == 1 From 561e7879d39c89d18372ea1e953c8027cee45003 Mon Sep 17 00:00:00 2001 From: admin-raintree <277948009+admin-raintree@users.noreply.github.com> Date: Tue, 9 Jun 2026 16:24:48 -0700 Subject: [PATCH 4/4] chore: ruff format + bandit B310 suppression on judge urlopen CI runs `ruff format --check` (stricter than `ruff check`) and bandit separately; my local sweep used only `ruff check`. Apply formatting and switch the urlopen suppression from `# noqa: S310` (ruff prefix) to `# nosec B310` (bandit's directive, matching the existing pattern at benchmark.py:2491). Co-Authored-By: Claude Opus 4.7 --- src/docpull/benchmark.py | 6 +----- src/docpull/judge.py | 9 ++------- src/docpull/passk.py | 8 ++------ tests/test_judge.py | 9 ++------- 4 files changed, 7 insertions(+), 25 deletions(-) diff --git a/src/docpull/benchmark.py b/src/docpull/benchmark.py index 97df90b..f3bc7a6 100644 --- a/src/docpull/benchmark.py +++ b/src/docpull/benchmark.py @@ -2645,11 +2645,7 @@ def _pass_at_k_summary( tells you the typical run; pass^k tells you how often a case is reliably above bar. Cache-only cases are excluded — they're not scored. 
""" - scored_cases = [ - case - for case, cache_only in zip(cases, cache_only_cases, strict=True) - if not cache_only - ] + scored_cases = [case for case, cache_only in zip(cases, cache_only_cases, strict=True) if not cache_only] if not scored_cases: return {"k": 0, "thresholds": list(PASS_AT_K_THRESHOLDS), "results": {}} results: dict[str, list[dict[str, Any]]] = {} diff --git a/src/docpull/judge.py b/src/docpull/judge.py index 7c6878d..e09a40e 100644 --- a/src/docpull/judge.py +++ b/src/docpull/judge.py @@ -176,12 +176,7 @@ def _format_documents(docs: list[dict[str, Any]]) -> str: content = str(doc.get("content") or "") if len(content) > MAX_DOC_CHARS: content = content[:MAX_DOC_CHARS] + " …" - chunks.append( - f"--- doc {i} ---\n" - f"url: {doc.get('url')}\n" - f"title: {doc.get('title')}\n\n" - f"{content}" - ) + chunks.append(f"--- doc {i} ---\nurl: {doc.get('url')}\ntitle: {doc.get('title')}\n\n{content}") return "\n\n".join(chunks) @@ -303,7 +298,7 @@ def __call__(self, prompt: str) -> str: method="POST", ) try: - with urllib.request.urlopen(request, timeout=JUDGE_REQUEST_TIMEOUT_S) as response: # noqa: S310 + with urllib.request.urlopen(request, timeout=JUDGE_REQUEST_TIMEOUT_S) as response: # nosec B310 payload = json.loads(response.read()) except urllib.error.HTTPError as exc: raise _JudgeTransportError(f"HTTP {exc.code}") from exc diff --git a/src/docpull/passk.py b/src/docpull/passk.py index ee20a02..a818ea6 100644 --- a/src/docpull/passk.py +++ b/src/docpull/passk.py @@ -146,9 +146,7 @@ def _failure_lines(results: list[dict[str, Any]]) -> list[str]: if key in seen: continue seen.add(key) - lines.append( - f" @{r['threshold']:<3} {f['name']} runs={f['runs']} worst={f['worst']}" - ) + lines.append(f" @{r['threshold']:<3} {f['name']} runs={f['runs']} worst={f['worst']}") return lines @@ -184,9 +182,7 @@ def main(argv: list[str] | None = None) -> int: else: score_keys = (f"{args.score}_score",) results = [ - pass_at_k(cases, score_key=key, threshold=t) - 
for key in score_keys - for t in sorted(args.thresholds) + pass_at_k(cases, score_key=key, threshold=t) for key in score_keys for t in sorted(args.thresholds) ] if args.json: json.dump( diff --git a/tests/test_judge.py b/tests/test_judge.py index 3544b3f..49f099d 100644 --- a/tests/test_judge.py +++ b/tests/test_judge.py @@ -22,9 +22,7 @@ def _write_pack(pack_dir: Path, docs: list[dict]) -> None: pack_dir.mkdir(parents=True, exist_ok=True) - (pack_dir / "documents.ndjson").write_text( - "\n".join(json.dumps(d) for d in docs) + "\n" - ) + (pack_dir / "documents.ndjson").write_text("\n".join(json.dumps(d) for d in docs) + "\n") def test_skips_when_no_api_key(monkeypatch, tmp_path: Path) -> None: @@ -56,10 +54,7 @@ def test_skips_when_pack_empty(tmp_path: Path) -> None: def test_parses_judge_response(tmp_path: Path) -> None: _write_pack( tmp_path, - [ - {"url": f"https://docs.example.com/{i}", "title": f"t{i}", "content": "body"} - for i in range(3) - ], + [{"url": f"https://docs.example.com/{i}", "title": f"t{i}", "content": "body"} for i in range(3)], ) response = json.dumps( {