diff --git a/CLAUDE.md b/CLAUDE.md
index 0509fbb..0de10ce 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -96,3 +96,17 @@ Three documented internal boundaries. AI agents must respect them — never cros
 
 - Check the relevant [`architecture/`](architecture/) capability file before adding a new module or extension point.
 - Surface ambiguity as a documentation gap rather than improvising.
+
+## Agent skills
+
+### Issue tracker
+
+Issues live in GitHub Issues (`modern-python/httpware`), managed via the `gh` CLI; external PRs are not a triage surface. See `planning/agents/issue-tracker.md`.
+
+### Triage labels
+
+Canonical defaults — `needs-triage`, `needs-info`, `ready-for-agent`, `ready-for-human`, `wontfix` (the last already exists). See `planning/agents/triage-labels.md`.
+
+### Domain docs
+
+Single-context — one `CONTEXT.md` at the repo root + ADRs under `planning/adr/`. See `planning/agents/domain.md`.
diff --git a/architecture/resilience.md b/architecture/resilience.md
index af29c6d..620687a 100644
--- a/architecture/resilience.md
+++ b/architecture/resilience.md
@@ -6,6 +6,8 @@
 
 `Retry` (and `AsyncRetry`) is a retry middleware backed by a Finagle-style `RetryBudget` — a token bucket that caps the proportion of traffic spent on retries so a degraded backend cannot be amplified into a retry storm. `RetryBudget` is a single thread-safe class shared by both worlds: all mutations go through a `threading.Lock`, so state is never torn. "Safe" here means no corruption, not non-blocking — when one budget is shared across a (sync `Client`, `AsyncClient`) pair, a sync thread holding the lock can briefly block the event-loop thread's acquisition. The critical section is intentionally tiny to bound that latency. Backoff between attempts uses full-jitter.
 
+The decision logic — status/method eligibility, streaming-body refusal, exhaustion, Retry-After handling, budget accounting, and the backoff delay — lives once in a stateless private `_RetryPolicy.decide`, the retry analog of how the circuit breaker keeps its transition logic in one shared state object. `Retry` and `AsyncRetry` are thin loop drivers over that policy: they own the attempt loop, the terminal call, and the sleep, and differ only in `await next` vs `next` and `asyncio.sleep` vs `time.sleep`. `decide` returns the delay to sleep before the next attempt, or raises the terminal exception (with its PEP 678 note and event already emitted); because it runs inside the wrapper's `except` block, exception chaining behaves as a direct raise. `_RetryPolicy` holds the immutable config plus the shared `RetryBudget`; per-attempt state stays as wrapper locals, so one instance is safe across the concurrent requests it serves.
+
 ## Bulkhead
 
 `Bulkhead` / `AsyncBulkhead` is a concurrency limiter. `AsyncBulkhead` uses `asyncio.Semaphore` with a bounded acquire wait; sync `Bulkhead` uses `threading.Semaphore`. A sync instance cannot share with an async one. Both are sharable across clients (one instance = one shared concurrency pool).
diff --git a/planning/agents/domain.md b/planning/agents/domain.md
new file mode 100644
index 0000000..dcb978e
--- /dev/null
+++ b/planning/agents/domain.md
@@ -0,0 +1,39 @@
+# Domain Docs
+
+How the engineering skills should consume this repo's domain documentation when exploring the codebase.
+
+**Layout: single-context.** One `CONTEXT.md` at the repo root + ADRs under `planning/adr/`.
+
+ADRs live under `planning/` (internal docs) rather than `docs/` (the user-facing mkdocs site). This repo also keeps per-capability living truth in [`architecture/`](../../architecture/) and per-change design under [`planning/changes/`](../changes/) — read those for established context before writing a new ADR.
+
+## Before exploring, read these
+
+- **`CONTEXT.md`** at the repo root.
+- **`planning/adr/`** — read ADRs that touch the area you're about to work in.
+
+If any of these files don't exist, **proceed silently**. Don't flag their absence; don't suggest creating them upfront. The `/domain-modeling` skill (reached via `/grill-with-docs` and `/improve-codebase-architecture`) creates them lazily when terms or decisions actually get resolved.
+
+## File structure
+
+Single-context repo (this repo):
+
+```
+/
+├── CONTEXT.md
+├── planning/adr/
+│   ├── 0001-some-decision.md
+│   └── 0002-another-decision.md
+└── src/
+```
+
+## Use the glossary's vocabulary
+
+When your output names a domain concept (in an issue title, a refactor proposal, a hypothesis, a test name), use the term as defined in `CONTEXT.md`. Don't drift to synonyms the glossary explicitly avoids.
+
+If the concept you need isn't in the glossary yet, that's a signal — either you're inventing language the project doesn't use (reconsider) or there's a real gap (note it for `/domain-modeling`).
+
+## Flag ADR conflicts
+
+If your output contradicts an existing ADR, surface it explicitly rather than silently overriding:
+
+> _Contradicts ADR-0007 (some decision) — but worth reopening because…_
diff --git a/planning/agents/issue-tracker.md b/planning/agents/issue-tracker.md
new file mode 100644
index 0000000..a6ef151
--- /dev/null
+++ b/planning/agents/issue-tracker.md
@@ -0,0 +1,34 @@
+# Issue tracker: GitHub
+
+Issues and PRDs for this repo live as GitHub issues. Use the `gh` CLI for all operations.
+
+## Conventions
+
+- **Create an issue**: `gh issue create --title "..." --body "..."`. Use a heredoc for multi-line bodies.
+- **Read an issue**: `gh issue view --comments`, filtering comments by `jq` and also fetching labels.
+- **List issues**: `gh issue list --state open --json number,title,body,labels,comments --jq '[.[] | {number, title, body, labels: [.labels[].name], comments: [.comments[].body]}]'` with appropriate `--label` and `--state` filters.
+- **Comment on an issue**: `gh issue comment --body "..."`
+- **Apply / remove labels**: `gh issue edit --add-label "..."` / `--remove-label "..."`
+- **Close**: `gh issue close --comment "..."`
+
+Infer the repo from `git remote -v` — `gh` does this automatically when run inside a clone. This repo's remote is `modern-python/httpware`.
+
+## Pull requests as a triage surface
+
+**PRs as a request surface: no.** _(Set to `yes` if this repo treats external PRs as feature requests; `/triage` reads this flag.)_
+
+When set to `yes`, PRs run through the same labels and states as issues, using the `gh pr` equivalents:
+
+- **Read a PR**: `gh pr view --comments` and `gh pr diff ` for the diff.
+- **List external PRs for triage**: `gh pr list --state open --json number,title,body,labels,author,authorAssociation,comments` then keep only `authorAssociation` of `CONTRIBUTOR`, `FIRST_TIME_CONTRIBUTOR`, or `NONE` (drop `OWNER`/`MEMBER`/`COLLABORATOR`).
+- **Comment / label / close**: `gh pr comment`, `gh pr edit --add-label`/`--remove-label`, `gh pr close`.
+
+GitHub shares one number space across issues and PRs, so a bare `#42` may be either — resolve with `gh pr view 42` and fall back to `gh issue view 42`.
+
+## When a skill says "publish to the issue tracker"
+
+Create a GitHub issue.
+
+## When a skill says "fetch the relevant ticket"
+
+Run `gh issue view --comments`.
diff --git a/planning/agents/triage-labels.md b/planning/agents/triage-labels.md
new file mode 100644
index 0000000..83e93f1
--- /dev/null
+++ b/planning/agents/triage-labels.md
@@ -0,0 +1,17 @@
+# Triage Labels
+
+The skills speak in terms of five canonical triage roles. This file maps those roles to the actual label strings used in this repo's issue tracker.
+
+| Canonical role             | Label in our tracker | Meaning                                  |
+| -------------------------- | -------------------- | ---------------------------------------- |
+| `needs-triage`             | `needs-triage`       | Maintainer needs to evaluate this issue  |
+| `needs-info`               | `needs-info`         | Waiting on reporter for more information |
+| `ready-for-agent`          | `ready-for-agent`    | Fully specified, ready for an AFK agent  |
+| `ready-for-human`          | `ready-for-human`    | Requires human implementation            |
+| `wontfix`                  | `wontfix`            | Will not be actioned                     |
+
+`wontfix` already exists in this repo's GitHub labels. The other four are created on first use by `/triage` (`gh label create `).
+
+When a skill mentions a role (e.g. "apply the AFK-ready triage label"), use the corresponding label string from this table.
+
+Edit the right-hand column to match whatever vocabulary you actually use.
diff --git a/planning/changes/2026-06-23.01-retry-policy-extraction/design.md b/planning/changes/2026-06-23.01-retry-policy-extraction/design.md
new file mode 100644
index 0000000..c743267
--- /dev/null
+++ b/planning/changes/2026-06-23.01-retry-policy-extraction/design.md
@@ -0,0 +1,173 @@
+---
+status: shipped
+date: 2026-06-23
+slug: retry-policy-extraction
+summary: Extract a stateless _RetryPolicy decision module from the duplicated AsyncRetry/Retry __call__ loops.
+supersedes: null
+superseded_by: null
+pr: 76
+outcome: Shipped via #76 — decision logic moved into a stateless _RetryPolicy.decide; AsyncRetry/Retry are now thin loop drivers, the ~110-line sync/async duplication is gone, behaviour byte-identical (718 tests, 100% coverage). New seam suite tests/test_retry_policy.py; promoted into architecture/resilience.md. Internal refactor — no release.
+---
+
+# Design: Extract a deep `_RetryPolicy` decision module
+
+## Summary
+
+`AsyncRetry.__call__` and `Retry.__call__` hand-copy ~110 lines of retry
+*decision* logic — status eligibility, streaming-body refusal, exhaustion,
+Retry-After parsing, budget accounting, backoff — differing only in `await
+next` vs `next` and `asyncio.sleep` vs `time.sleep`. This change pulls the
+decision logic into a stateless private `_RetryPolicy` in the same module, so
+both wrappers shrink to a thin loop and the decision lives once. It mirrors
+the precedent already in the package: `CircuitBreaker`/`AsyncCircuitBreaker`
+share the lock-free `_CircuitBreakerState`.
+
+## Motivation
+
+- `retry.py:100-210` (`AsyncRetry.__call__`) and `retry.py:213-349`
+  (`Retry.__call__`) are ~110 lines each, byte-identical except the `await`.
+  Parity is hand-maintained; drift is undetectable. Both carry
+  `# noqa: C901, PLR0912, PLR0915` to silence the complexity budget.
+- The package already proved the fix: `_CircuitBreakerState`
+  (`circuit_breaker.py:131-310`) is a deep, synchronous, lock-free decision
+  module that both breaker wrappers drive. Retry never got the same treatment.
+- **Depth:** the retry interface (the `Middleware` protocol — one `__call__`)
+  is small, but the implementation is duplicated rather than deep. Moving the
+  decision behind `_RetryPolicy.decide` concentrates it: one place to fix a
+  retry bug (locality), one interface to test directly without `MockTransport`
+  (leverage).
+
+## Non-goals
+
+- No behaviour change. The retry policy, defaults, events, notes, and raised
+  exceptions stay byte-identical.
+- Not touching `RetryBudget`, `_backoff.full_jitter_delay`, or
+  `_parse_retry_after` — they stay as-is.
+- Not unifying the sync/async wrappers themselves — the `await`/blocking split
+  is fundamental and stays in the two thin `__call__` shells.
+- Not extending the same treatment to `Bulkhead` in this change.
+
+## Design
+
+### 1. `_RetryPolicy` — stateless decision module
+
+A private class in `retry.py`, holding **immutable config + the shared
+budget** and nothing per-call mutable. This is the faithful analog of
+`_CircuitBreakerState`: there the *circuit* is the shared state; here the
+shared state is the already-thread-safe `RetryBudget`, and `_RetryPolicy` is
+the decision logic around it. Because it carries no per-call field, it is
+trivially safe under the concurrent requests a single frozen middleware
+instance serves.
+
+It owns:
+
+- config: `max_attempts`, `base_delay`, `max_delay`, `retry_status_codes`,
+  `retry_methods`, `respect_retry_after`, `budget`;
+- validation: `max_attempts < 1` → `ValueError` (raised when the wrapper
+  builds the policy in `__init__`, so construction-time behaviour is
+  unchanged);
+- the `_LOGGER` event emissions and PEP-678 note additions (side effects move
+  here with the decision).
+
+### 2. The seam — one method
+
+```python
+def decide(self, *, attempt: int, request: httpx2.Request, exc: BaseException) -> float
+```
+
+- **Returns** the `float` delay to sleep for the retry case.
+- **Raises** for every terminal case, having already added the note, emitted
+  the event, and (for the budget case) constructed `RetryBudgetExhaustedError`
+  with its `__cause__`. `decide` is called *inside* the wrapper's `except`
+  block, so implicit `__context__` and explicit `raise ... from exc` chaining
+  behave exactly as today — no manual `__cause__` fiddling.
+
+Classification is folded in (no separate predicate): derive `last_response`
+from `isinstance(exc, StatusError)`; apply method-eligibility and status-set
+membership; re-raise non-retryable failures unchanged; otherwise walk
+streaming-refusal → exhaustion → Retry-After-exceeds-`max_delay` → budget
+`try_withdraw` → delay (Retry-After value or `full_jitter_delay`).
+
+Rejected alternative: a `_Sleep | _Stop` sum type. It defers the raise to
+*after* the `except` block, losing the active exception context and forcing
+manual chain reconstruction — machinery that exists only to paper over that.
+Returning-a-delay-or-raising matches `_CircuitBreakerState.admit()`, which
+already raises `CircuitOpenError` rather than returning a rejected value.
+
+### 3. The wrappers shrink to a thin driver
+
+```python
+_RETRYABLE_EXCEPTIONS = (StatusError, NetworkError, TimeoutError)
+
+async def __call__(self, request: httpx2.Request, next: AsyncNext) -> httpx2.Response:
+    self.budget.deposit()
+    for attempt in range(self._policy.max_attempts):
+        try:
+            return await next(request)
+        except _RETRYABLE_EXCEPTIONS as exc:
+            delay = self._policy.decide(attempt=attempt, request=request, exc=exc)
+            await self._sleep(delay)
+    raise AssertionError("unreachable")  # pragma: no cover
+```
+
+The sync `Retry.__call__` is identical but for `next(request)` and
+`self._sleep(delay)`. `_RETRYABLE_EXCEPTIONS` is one module constant
+referenced by both — the narrow catch surface stays structural, so anything
+not in the tuple (e.g. `httpx2.InvalidURL`, programming errors) propagates
+untouched exactly as today. The `# noqa: C901, PLR0912, PLR0915` suppressions
+come off `__call__`; `decide` may carry its own.
+
+### 4. Preserved public contract
+
+- `AsyncRetry.__init__` / `Retry.__init__` signatures unchanged (incl.
+  `_sleep`, `budget`).
+- The wrapper keeps `self.budget` (the *same object* the policy holds, so
+  `r1.budget is r2.budget` identity tests pass) and `self._sleep`.
+- The six config attributes (`max_attempts`, `base_delay`, `max_delay`,
+  `retry_status_codes`, `retry_methods`, `respect_retry_after`) are **dropped**
+  from the wrapper instances — they live solely on `_RetryPolicy`. They are
+  read nowhere outside `retry.py` and `docs/resilience.md` documents them only
+  as constructor parameters, not readable attributes.
+
+## Operations
+
+None — internal refactor, no infra or external changes.
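A minimal sketch of the delay-or-raise seam from section 2, driven by the thin loop from section 3. It is not the httpware code: `TransientError`, `BudgetExhausted`, `ToyBudget`, `ToyPolicy`, and `call_with_retry` are hypothetical stand-ins invented for this illustration, and the toy budget is not lock-guarded the way the real `RetryBudget` is. It only aims to show that a helper raising from inside the live `except` block keeps `__cause__` and `__context__` intact without manual reconstruction.

```python
import time
from collections.abc import Callable


class TransientError(Exception):
    """Hypothetical stand-in for the retryable exception tuple."""


class BudgetExhausted(Exception):
    """Hypothetical stand-in for RetryBudgetExhaustedError."""


class ToyBudget:
    """Toy token counter; NOT thread-safe, unlike the real RetryBudget."""

    def __init__(self, tokens: int) -> None:
        self._tokens = tokens

    def try_withdraw(self) -> bool:
        if self._tokens <= 0:
            return False
        self._tokens -= 1
        return True


class ToyPolicy:
    """Immutable config plus a shared budget; no per-call fields."""

    def __init__(self, *, max_attempts: int, delay: float, budget: ToyBudget) -> None:
        if max_attempts < 1:
            raise ValueError("max_attempts must be >= 1")
        self.max_attempts = max_attempts
        self.delay = delay
        self.budget = budget

    def decide(self, *, attempt: int, exc: BaseException) -> float:
        """Return the delay before the next attempt, or raise the terminal error."""
        if attempt + 1 >= self.max_attempts:
            raise exc  # exhaustion: the original exception is re-raised
        if not self.budget.try_withdraw():
            raise BudgetExhausted(f"refused after {attempt + 1} attempts") from exc
        return self.delay


def call_with_retry(
    fn: Callable[[], str],
    policy: ToyPolicy,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Thin sync driver: owns the loop, the call, and the sleep."""
    for attempt in range(policy.max_attempts):
        try:
            return fn()
        except TransientError as exc:
            # decide() runs inside the live except block, so whatever it
            # raises is chained the same way a direct raise here would be.
            delay = policy.decide(attempt=attempt, exc=exc)
            sleep(delay)
    raise AssertionError("unreachable")
```

Passing a recording callable such as `delays.append` as `sleep` makes the driver testable without real waiting, which is the same role the injected `_sleep` parameter plays in the wrappers described above.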
+
+## Out of scope
+
+- `Bulkhead`/`AsyncBulkhead` deduplication.
+- Injecting randomness into `full_jitter_delay` (see Testing — only needed if
+  we want exact-value assertions on the jitter path).
+
+## Testing
+
+- **Parity net:** all existing `MockTransport` suites — `test_retry.py`,
+  `test_retry_sync.py`, `test_retry_props.py`,
+  `test_retry_budget_threadsafety.py`, `test_threading_with_shared_budget.py`
+  — stay green unchanged. Byte-identical behaviour is the bar.
+- **New seam tests:** `tests/test_retry_policy.py` drives `decide` directly
+  (no client, no `MockTransport`) across the decision matrix: retryable →
+  returns a delay; non-retryable status / non-eligible method → re-raises the
+  original; streaming-body refusal; exhaustion note on the last attempt;
+  Retry-After > `max_delay`; budget refusal → `RetryBudgetExhaustedError` with
+  `__cause__`.
+- The jitter path returns a random delay, so assert **bounds**
+  (`0 ≤ delay ≤ max_delay`) for it; assert exact values only on the
+  deterministic Retry-After path.
+- `just lint` and `just test` both clean.
+
+## Risk
+
+- **Behavioural drift during extraction** (likely × high): a subtle
+  reordering changes a note string, an event payload, or which exception wins.
+  *Mitigation:* extract under the existing green suites; they assert notes,
+  events (via the recording sleeper / caplog), and exception types. Do not
+  edit the test suites in this change.
+- **Exception-chaining regression** (low × medium): moving the raise into
+  `decide` could drop a `__cause__`/`__context__`. *Mitigation:* `decide` is
+  called inside the live `except`; an explicit test asserts `__cause__` on the
+  budget-exhausted path.
+- **Concurrency** (low × high): a stray per-call field on `_RetryPolicy` would
+  make a shared instance unsafe. *Mitigation:* the policy holds only immutable
+  config + the lock-guarded budget; per-attempt state stays as wrapper locals.
+  The property/thread-safety suites cover interleaving.
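The bounds-only assertion for the jitter path in the Testing section above can be sketched as follows. The `full_jitter_delay` here is an assumed stand-in written for this example (a uniform draw over `[0, min(max_delay, base_delay * 2**attempt)]`, the usual full-jitter shape); the real `_backoff.full_jitter_delay` is not shown in this diff and may differ in detail. `check_jitter_bounds` is likewise a hypothetical helper.

```python
import random


def full_jitter_delay(attempt: int, *, base_delay: float, max_delay: float) -> float:
    """Assumed stand-in: uniform draw in [0, min(max_delay, base_delay * 2**attempt)]."""
    cap = min(max_delay, base_delay * (2**attempt))
    return random.uniform(0.0, cap)


def check_jitter_bounds(
    *, base_delay: float, max_delay: float, attempts: int, samples: int
) -> bool:
    """Return True when every sampled delay satisfies 0 <= delay <= max_delay."""
    for attempt in range(attempts):
        for _ in range(samples):
            delay = full_jitter_delay(
                attempt, base_delay=base_delay, max_delay=max_delay
            )
            if not 0.0 <= delay <= max_delay:
                return False
    return True
```

Because the draw is random, the check says nothing about exact values; asserting those would need injected randomness, which the design lists as out of scope.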
diff --git a/planning/changes/2026-06-23.01-retry-policy-extraction/plan.md b/planning/changes/2026-06-23.01-retry-policy-extraction/plan.md
new file mode 100644
index 0000000..a69e4bc
--- /dev/null
+++ b/planning/changes/2026-06-23.01-retry-policy-extraction/plan.md
@@ -0,0 +1,161 @@
+---
+status: shipped
+date: 2026-06-23
+slug: retry-policy-extraction
+spec: retry-policy-extraction
+pr: 76
+---
+
+# retry-policy-extraction — implementation plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use
+> superpowers:subagent-driven-development (recommended) or
+> superpowers:executing-plans to implement this plan task-by-task. Steps
+> use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Move the duplicated retry decision logic into a stateless
+`_RetryPolicy`, shrinking `AsyncRetry`/`Retry` to thin loop drivers with no
+behaviour change.
+
+**Spec:** [`design.md`](./design.md)
+
+**Branch:** `refactor/retry-policy-extraction`
+
+**Commit strategy:** Per-task commits.
+
+---
+
+### Task 1: Extract `_RetryPolicy` and thin the wrappers
+
+**Files:**
+- Modify: `src/httpware/middleware/resilience/retry.py`
+
+Introduce the decision module; both wrappers drive it. Existing suites are the
+parity net — do not edit them in this task.
+
+- [ ] **Step 1: Add the catch-surface constant**
+
+  Add `_RETRYABLE_EXCEPTIONS = (StatusError, NetworkError, TimeoutError)` at
+  module level (near the other module constants).
+
+- [ ] **Step 2: Add `_RetryPolicy`**
+
+  New private class holding `max_attempts`, `base_delay`, `max_delay`,
+  `retry_status_codes`, `retry_methods`, `respect_retry_after`, `budget`.
+  Move the `max_attempts < 1` → `ValueError` validation here. Add:
+
+  ```python
+  def decide(self, *, attempt: int, request: httpx2.Request, exc: BaseException) -> float
+  ```
+
+  Port the decision logic verbatim from `AsyncRetry.__call__` (lines ~108-206):
+  classification (derive `last_response` from `isinstance(exc, StatusError)`,
+  method-eligibility, status-set), streaming-refusal, exhaustion, Retry-After
+  vs `max_delay`, budget `try_withdraw`, delay choice. Keep every `_emit_event`
+  call, note string, and `RetryBudgetExhaustedError(... ) from exc` exactly as
+  today. Terminal cases `raise`; the retry case `return delay`.
+
+- [ ] **Step 3: Rewrite `AsyncRetry`**
+
+  `__init__` keeps its signature; build `self._policy = _RetryPolicy(...)`, set
+  `self.budget = self._policy.budget` and `self._sleep = _sleep`. Drop the six
+  config attributes. Replace `__call__` body with the thin loop (deposit →
+  `for attempt in range(self._policy.max_attempts)` → try/`await next` →
+  `except _RETRYABLE_EXCEPTIONS as exc: delay = self._policy.decide(...)` →
+  `await self._sleep(delay)`). Remove the now-unneeded
+  `# noqa: C901, PLR0912, PLR0915` from `__call__`.
+
+- [ ] **Step 4: Rewrite `Retry`**
+
+  Identical to Step 3 but `next(request)` and `self._sleep(delay)`. Both
+  wrappers share the one `_RetryPolicy`/`_RETRYABLE_EXCEPTIONS`.
+
+- [ ] **Step 5: Verify parity**
+
+  ```bash
+  just test tests/test_retry.py tests/test_retry_sync.py tests/test_retry_props.py \
+    tests/test_retry_budget_threadsafety.py tests/test_threading_with_shared_budget.py
+  ```
+  All green, unchanged. If any fail, the extraction drifted — fix the policy,
+  not the tests.
+
+- [ ] **Step 6: Commit**
+
+  ```bash
+  git add src/httpware/middleware/resilience/retry.py
+  git commit -m "refactor(retry): extract stateless _RetryPolicy decision module
+
+  Co-Authored-By: Claude Opus 4.8 (1M context) "
+  ```
+
+---
+
+### Task 2: Add seam-level policy tests
+
+**Files:**
+- Create: `tests/test_retry_policy.py`
+
+Drive `decide` directly across the decision matrix — no client, no
+`MockTransport`.
+
+- [ ] **Step 1: Write the matrix**
+
+  Cover: retryable status / network / timeout → returns delay within
+  `0 ≤ delay ≤ max_delay`; non-retryable status → re-raises original;
+  non-eligible method → re-raises original; streaming-body (`STREAMING_BODY_MARKER`
+  set) → raises with refusal note; exhaustion (last attempt) → raises with
+  "gave up after N" note; Retry-After > `max_delay` → raises with note;
+  Retry-After ≤ `max_delay` → returns that exact value; budget refusal →
+  `RetryBudgetExhaustedError` with populated fields and `__cause__` set.
+  Annotate all test args. Build `httpx2.Request`/`StatusError` fixtures
+  directly; inject a zero/stingy `RetryBudget` for the refusal case.
+
+- [ ] **Step 2: Run**
+
+  ```bash
+  just test tests/test_retry_policy.py
+  ```
+  All green.
+
+- [ ] **Step 3: Commit**
+
+  ```bash
+  git add tests/test_retry_policy.py
+  git commit -m "test(retry): cover _RetryPolicy.decide at the seam
+
+  Co-Authored-By: Claude Opus 4.8 (1M context) "
+  ```
+
+---
+
+### Task 3: Promote to architecture, lint, full suite
+
+**Files:**
+- Modify: `architecture/resilience.md`
+- Modify: `planning/changes/2026-06-23.01-retry-policy-extraction/design.md` (frontmatter at ship)
+
+- [ ] **Step 1: Promote the living truth**
+
+  In `architecture/resilience.md`, document `_RetryPolicy` as the shared
+  decision module behind `AsyncRetry`/`Retry`, mirroring how the doc already
+  frames `_CircuitBreakerState`. Keep it prose, no frontmatter.
+
+- [ ] **Step 2: Full gate**
+
+  ```bash
+  just lint && just test
+  ```
+  Both clean. Confirm the `httpx2._` and other review-only invariants still
+  hold in the diff.
+
+- [ ] **Step 3: Ship frontmatter + commit**
+
+  Set `status: shipped`, `pr`, and `outcome` in `design.md` once the PR number
+  exists. Run `just index` to confirm the listing regenerates.
+
+  ```bash
+  git add architecture/resilience.md planning/changes/2026-06-23.01-retry-policy-extraction/
+  git commit -m "docs(resilience): promote _RetryPolicy into architecture truth
+
+  Co-Authored-By: Claude Opus 4.8 (1M context) "
+  ```
diff --git a/src/httpware/middleware/resilience/retry.py b/src/httpware/middleware/resilience/retry.py
index 7a74cec..311114e 100644
--- a/src/httpware/middleware/resilience/retry.py
+++ b/src/httpware/middleware/resilience/retry.py
@@ -1,10 +1,18 @@
-"""AsyncRetry middleware — automatic retry of transient failures with budget control.
+"""AsyncRetry + Retry middleware — automatic retry of transient failures with budget control.
 
 See planning/specs/2026-06-05-retry-and-retry-budget-design.md for the full contract.
 
-Status-code retry: the AsyncClient terminal raises StatusError subclasses on 4xx/5xx,
-so AsyncRetry catches StatusError and inspects exc.response.status_code. The original
-StatusError subclass is re-raised unwrapped on exhaustion, with a PEP 678 note added.
+Status-code retry: the client terminal raises StatusError subclasses on 4xx/5xx,
+so the retry middleware catches StatusError and inspects exc.response.status_code. The
+original StatusError subclass is re-raised unwrapped on exhaustion, with a PEP 678 note
+added.
+
+The decision logic lives in the lock-free, stateless _RetryPolicy, shared by both
+wrappers (mirroring _CircuitBreakerState). AsyncRetry and Retry are thin loop drivers:
+they own the attempt loop, the terminal call, and the sleep, and differ only in
+``await next`` vs ``next`` and ``asyncio.sleep`` vs ``time.sleep``.
_RetryPolicy holds +the immutable config plus the shared RetryBudget; per-attempt state stays as wrapper +locals, so a single instance is safe across the concurrent requests it serves. """ import asyncio @@ -45,6 +53,10 @@ } ) +# Catch surface for both wrappers. Narrow by design: anything not in this tuple +# (e.g. httpx2.InvalidURL, programming errors) propagates untouched. +_RETRYABLE_EXCEPTIONS = (StatusError, NetworkError, TimeoutError) + _MAX_ATTEMPTS_INVALID = "max_attempts must be >= 1" _STREAMING_BODY_REFUSAL_NOTE = "httpware: not retrying — request body is a stream that cannot replay across attempts" _RETRY_AFTER_EXCEEDS_MAX_DELAY_NOTE = ( @@ -71,20 +83,27 @@ def _parse_retry_after(value: str) -> float | None: return max(0.0, delta) -class AsyncRetry: - """Async retry middleware. See module docstring for default policy.""" +class _RetryPolicy: + """Stateless retry decision module shared by AsyncRetry + Retry. + + Holds the immutable retry config plus the shared RetryBudget and nothing + per-call mutable, so a single instance is safe across concurrent requests. + ``decide`` is synchronous and does the whole decision: it returns the delay + to sleep before the next attempt, or raises the terminal exception (having + added the PEP 678 note and emitted the event). It is invoked from inside the + wrapper's ``except`` block, so exception chaining behaves as a direct raise. 
+ """ def __init__( # noqa: PLR0913 — retry policy has many orthogonal knobs; a dataclass would be worse self, *, - max_attempts: int = 3, - base_delay: float = 0.1, - max_delay: float = 5.0, - retry_status_codes: frozenset[int] = DEFAULT_RETRY_STATUS_CODES, - retry_methods: frozenset[str] = DEFAULT_IDEMPOTENT_METHODS, - respect_retry_after: bool = True, - budget: RetryBudget | None = None, - _sleep: Callable[[float], Awaitable[None]] = asyncio.sleep, + max_attempts: int, + base_delay: float, + max_delay: float, + retry_status_codes: frozenset[int], + retry_methods: frozenset[str], + respect_retry_after: bool, + budget: RetryBudget | None, ) -> None: if max_attempts < 1: raise ValueError(_MAX_ATTEMPTS_INVALID) @@ -95,115 +114,142 @@ def __init__( # noqa: PLR0913 — retry policy has many orthogonal knobs; a dat self.retry_methods = retry_methods self.respect_retry_after = respect_retry_after self.budget = budget if budget is not None else RetryBudget() - self._sleep = _sleep - async def __call__(self, request: httpx2.Request, next: AsyncNext) -> httpx2.Response: # noqa: A002, C901, PLR0912, PLR0915 — complexity budget: 3 error clauses + idempotency gate + streaming-body refusal + budget gate + Retry-After branch + backoff - """Process a request through the retry loop. See module docstring.""" + def decide( # noqa: C901 — complexity budget: classification + streaming-body refusal + exhaustion + Retry-After branch + budget gate + backoff + self, + *, + attempt: int, + request: httpx2.Request, + exc: BaseException, + ) -> float: + """Decide the next action after a retryable failure on `attempt`. + + Returns the delay to sleep before retrying, or raises the terminal + exception. `exc` is the currently-handled exception (one of + _RETRYABLE_EXCEPTIONS); see the class docstring for chaining semantics. 
+ """ method_eligible = request.method.upper() in self.retry_methods - last_exc: BaseException | None = None - last_response: httpx2.Response | None = None + if isinstance(exc, StatusError): + retryable_status = exc.response.status_code in self.retry_status_codes + if not method_eligible or not retryable_status: + raise exc + last_response: httpx2.Response | None = exc.response + else: # NetworkError | TimeoutError + if not method_eligible: + raise exc + last_response = None + + # ---- retryable failure path + if request.extensions.get(STREAMING_BODY_MARKER): + exc.add_note(_STREAMING_BODY_REFUSAL_NOTE) + _emit_event( + _LOGGER, + "retry.streaming_refused", + level=logging.WARNING, + message="retry refused — request body is a stream that cannot replay", + attributes={ + "method": request.method, + "url": str(request.url), + "last_exception_type": type(exc).__qualname__, + }, + ) + raise exc + + if attempt + 1 >= self.max_attempts: + exc.add_note(f"httpware: gave up after {attempt + 1} attempts") + _emit_event( + _LOGGER, + "retry.giving_up", + level=logging.WARNING, + message=f"retry gave up after {attempt + 1} attempts", + attributes={ + "attempts": attempt + 1, + "method": request.method, + "url": str(request.url), + "last_status": last_response.status_code if last_response is not None else None, + "last_exception_type": type(exc).__qualname__, + }, + ) + raise exc + + retry_after: float | None = None + if self.respect_retry_after and last_response is not None: + header = last_response.headers.get("Retry-After") + if header is not None: + retry_after = _parse_retry_after(header) + + if retry_after is not None and retry_after > self.max_delay: + exc.add_note( + _RETRY_AFTER_EXCEEDS_MAX_DELAY_NOTE.format( + retry_after=retry_after, + max_delay=self.max_delay, + ), + ) + raise exc + + if not self.budget.try_withdraw(): + _emit_event( + _LOGGER, + "retry.budget_refused", + level=logging.WARNING, + message=f"retry budget refused after {attempt + 1} attempts", + 
attributes={ + "attempts": attempt + 1, + "method": request.method, + "url": str(request.url), + "last_status": last_response.status_code if last_response is not None else None, + }, + ) + raise RetryBudgetExhaustedError( + last_response=last_response, + last_exception=exc, + attempts=attempt + 1, + ) from exc + + if retry_after is not None: + return retry_after + return full_jitter_delay( + attempt, + base_delay=self.base_delay, + max_delay=self.max_delay, + ) + + +class AsyncRetry: + """Async retry middleware. See the module docstring for the default policy.""" + def __init__( # noqa: PLR0913 — retry policy has many orthogonal knobs; a dataclass would be worse + self, + *, + max_attempts: int = 3, + base_delay: float = 0.1, + max_delay: float = 5.0, + retry_status_codes: frozenset[int] = DEFAULT_RETRY_STATUS_CODES, + retry_methods: frozenset[str] = DEFAULT_IDEMPOTENT_METHODS, + respect_retry_after: bool = True, + budget: RetryBudget | None = None, + _sleep: Callable[[float], Awaitable[None]] = asyncio.sleep, + ) -> None: + self._policy = _RetryPolicy( + max_attempts=max_attempts, + base_delay=base_delay, + max_delay=max_delay, + retry_status_codes=retry_status_codes, + retry_methods=retry_methods, + respect_retry_after=respect_retry_after, + budget=budget, + ) + self.budget = self._policy.budget + self._sleep = _sleep + + async def __call__(self, request: httpx2.Request, next: AsyncNext) -> httpx2.Response: # noqa: A002 + """Process a request through the retry loop. 
See module docstring.""" self.budget.deposit() - for attempt in range(self.max_attempts): - is_last = attempt + 1 >= self.max_attempts + for attempt in range(self._policy.max_attempts): try: return await next(request) - except StatusError as exc: - retryable_status = exc.response.status_code in self.retry_status_codes - if not method_eligible or not retryable_status: - raise - last_exc = exc - last_response = exc.response - except (NetworkError, TimeoutError) as exc: - if not method_eligible: - raise - last_exc = exc - last_response = None - - # ---- retryable failure path - if request.extensions.get(STREAMING_BODY_MARKER): - if last_exc is None: # pragma: no cover — invariant from except branch - msg = "AsyncRetry: streaming-body refusal reached with no last_exc" - raise AssertionError(msg) - last_exc.add_note(_STREAMING_BODY_REFUSAL_NOTE) - _emit_event( - _LOGGER, - "retry.streaming_refused", - level=logging.WARNING, - message="retry refused — request body is a stream that cannot replay", - attributes={ - "method": request.method, - "url": str(request.url), - "last_exception_type": type(last_exc).__qualname__, - }, - ) - raise last_exc - - if is_last: - if last_exc is None: # pragma: no cover — structural invariant from except branch - msg = "AsyncRetry: last_exc unset on final attempt — unreachable" - raise AssertionError(msg) - last_exc.add_note(f"httpware: gave up after {attempt + 1} attempts") - _emit_event( - _LOGGER, - "retry.giving_up", - level=logging.WARNING, - message=f"retry gave up after {attempt + 1} attempts", - attributes={ - "attempts": attempt + 1, - "method": request.method, - "url": str(request.url), - "last_status": last_response.status_code if last_response is not None else None, - "last_exception_type": type(last_exc).__qualname__, - }, - ) - raise last_exc - - retry_after: float | None = None - if self.respect_retry_after and last_response is not None: - header = last_response.headers.get("Retry-After") - if header is not None: - 
retry_after = _parse_retry_after(header) - - if retry_after is not None and retry_after > self.max_delay: - if last_exc is None: # pragma: no cover — retry_after requires last_response which requires last_exc - msg = "AsyncRetry: retry_after path reached with no last_exc" - raise AssertionError(msg) - last_exc.add_note( - _RETRY_AFTER_EXCEEDS_MAX_DELAY_NOTE.format( - retry_after=retry_after, - max_delay=self.max_delay, - ), - ) - raise last_exc - - if not self.budget.try_withdraw(): - _emit_event( - _LOGGER, - "retry.budget_refused", - level=logging.WARNING, - message=f"retry budget refused after {attempt + 1} attempts", - attributes={ - "attempts": attempt + 1, - "method": request.method, - "url": str(request.url), - "last_status": last_response.status_code if last_response is not None else None, - }, - ) - raise RetryBudgetExhaustedError( - last_response=last_response, - last_exception=last_exc, - attempts=attempt + 1, - ) from last_exc - - if retry_after is not None: - delay = retry_after - else: - delay = full_jitter_delay( - attempt, - base_delay=self.base_delay, - max_delay=self.max_delay, - ) + except _RETRYABLE_EXCEPTIONS as exc: + delay = self._policy.decide(attempt=attempt, request=request, exc=exc) await self._sleep(delay) msg = "unreachable" # pragma: no cover @@ -225,124 +271,26 @@ def __init__( # noqa: PLR0913 — retry policy has many orthogonal knobs; a dat budget: RetryBudget | None = None, _sleep: Callable[[float], None] = time.sleep, ) -> None: - if max_attempts < 1: - raise ValueError(_MAX_ATTEMPTS_INVALID) - self.max_attempts = max_attempts - self.base_delay = base_delay - self.max_delay = max_delay - self.retry_status_codes = retry_status_codes - self.retry_methods = retry_methods - self.respect_retry_after = respect_retry_after - self.budget = budget if budget is not None else RetryBudget() + self._policy = _RetryPolicy( + max_attempts=max_attempts, + base_delay=base_delay, + max_delay=max_delay, + retry_status_codes=retry_status_codes, + 
retry_methods=retry_methods, + respect_retry_after=respect_retry_after, + budget=budget, + ) + self.budget = self._policy.budget self._sleep = _sleep - def __call__(self, request: httpx2.Request, next: Next) -> httpx2.Response: # noqa: A002, C901, PLR0912, PLR0915 — same complexity rationale as AsyncRetry + def __call__(self, request: httpx2.Request, next: Next) -> httpx2.Response: # noqa: A002 """Process a request through the sync retry loop. See AsyncRetry for full contract.""" - method_eligible = request.method.upper() in self.retry_methods - last_exc: BaseException | None = None - last_response: httpx2.Response | None = None - self.budget.deposit() - for attempt in range(self.max_attempts): - is_last = attempt + 1 >= self.max_attempts + for attempt in range(self._policy.max_attempts): try: return next(request) - except StatusError as exc: - retryable_status = exc.response.status_code in self.retry_status_codes - if not method_eligible or not retryable_status: - raise - last_exc = exc - last_response = exc.response - except (NetworkError, TimeoutError) as exc: - if not method_eligible: - raise - last_exc = exc - last_response = None - - # ---- retryable failure path - if request.extensions.get(STREAMING_BODY_MARKER): - if last_exc is None: # pragma: no cover — invariant from except branch - msg = "Retry: streaming-body refusal reached with no last_exc" - raise AssertionError(msg) - last_exc.add_note(_STREAMING_BODY_REFUSAL_NOTE) - _emit_event( - _LOGGER, - "retry.streaming_refused", - level=logging.WARNING, - message="retry refused — request body is a stream that cannot replay", - attributes={ - "method": request.method, - "url": str(request.url), - "last_exception_type": type(last_exc).__qualname__, - }, - ) - raise last_exc - - if is_last: - if last_exc is None: # pragma: no cover — structural invariant from except branch - msg = "Retry: last_exc unset on final attempt — unreachable" - raise AssertionError(msg) - last_exc.add_note(f"httpware: gave up after 
{attempt + 1} attempts") - _emit_event( - _LOGGER, - "retry.giving_up", - level=logging.WARNING, - message=f"retry gave up after {attempt + 1} attempts", - attributes={ - "attempts": attempt + 1, - "method": request.method, - "url": str(request.url), - "last_status": last_response.status_code if last_response is not None else None, - "last_exception_type": type(last_exc).__qualname__, - }, - ) - raise last_exc - - retry_after: float | None = None - if self.respect_retry_after and last_response is not None: - header = last_response.headers.get("Retry-After") - if header is not None: - retry_after = _parse_retry_after(header) - - if retry_after is not None and retry_after > self.max_delay: - if last_exc is None: # pragma: no cover — retry_after requires last_response which requires last_exc - msg = "Retry: retry_after path reached with no last_exc" - raise AssertionError(msg) - last_exc.add_note( - _RETRY_AFTER_EXCEEDS_MAX_DELAY_NOTE.format( - retry_after=retry_after, - max_delay=self.max_delay, - ), - ) - raise last_exc - - if not self.budget.try_withdraw(): - _emit_event( - _LOGGER, - "retry.budget_refused", - level=logging.WARNING, - message=f"retry budget refused after {attempt + 1} attempts", - attributes={ - "attempts": attempt + 1, - "method": request.method, - "url": str(request.url), - "last_status": last_response.status_code if last_response is not None else None, - }, - ) - raise RetryBudgetExhaustedError( - last_response=last_response, - last_exception=last_exc, - attempts=attempt + 1, - ) from last_exc - - if retry_after is not None: - delay = retry_after - else: - delay = full_jitter_delay( - attempt, - base_delay=self.base_delay, - max_delay=self.max_delay, - ) + except _RETRYABLE_EXCEPTIONS as exc: + delay = self._policy.decide(attempt=attempt, request=request, exc=exc) self._sleep(delay) msg = "unreachable" # pragma: no cover diff --git a/tests/test_retry_policy.py b/tests/test_retry_policy.py new file mode 100644 index 0000000..5185e62 --- /dev/null 
+++ b/tests/test_retry_policy.py @@ -0,0 +1,186 @@ +"""Seam-level tests for _RetryPolicy.decide. + +Drives the decision directly — no client, no MockTransport — across the full +matrix: classification, streaming-body refusal, exhaustion, Retry-After +handling, budget refusal, and the delay returned on a normal retry. The jitter +path is random, so it is asserted by bounds; the Retry-After path by exact value. +""" + +import httpx2 +import pytest + +from httpware._internal.status import STREAMING_BODY_MARKER +from httpware.errors import ( + NetworkError, + NotFoundError, + RetryBudgetExhaustedError, + ServiceUnavailableError, + StatusError, + TimeoutError, # noqa: A004 +) +from httpware.middleware.resilience.budget import RetryBudget +from httpware.middleware.resilience.retry import ( + DEFAULT_IDEMPOTENT_METHODS, + DEFAULT_RETRY_STATUS_CODES, + _RetryPolicy, +) + + +_URL = "https://example.test/x" +_BASE_DELAY = 0.1 +_MAX_DELAY = 5.0 +_RETRY_AFTER_HEADER = "2" +_RETRY_AFTER_SECONDS = 2.0 + + +def _policy( + *, + max_attempts: int = 3, + respect_retry_after: bool = True, + budget: RetryBudget | None = None, +) -> _RetryPolicy: + return _RetryPolicy( + max_attempts=max_attempts, + base_delay=_BASE_DELAY, + max_delay=_MAX_DELAY, + retry_status_codes=DEFAULT_RETRY_STATUS_CODES, + retry_methods=DEFAULT_IDEMPOTENT_METHODS, + respect_retry_after=respect_retry_after, + budget=budget, + ) + + +def _zero_budget() -> RetryBudget: + """Return a budget that always refuses withdrawal (floor=0, percent=0).""" + return RetryBudget(ttl=10.0, min_retries_per_sec=0.0, percent_can_retry=0.0) + + +def _request(method: str = "PUT", *, streaming: bool = False) -> httpx2.Request: + extensions = {STREAMING_BODY_MARKER: True} if streaming else None + return httpx2.Request(method, _URL, extensions=extensions) + + +def _status_exc( + status: int, + request: httpx2.Request, + *, + retry_after: str | None = None, +) -> StatusError: + headers = {"Retry-After": retry_after} if retry_after is not 
None else None + response = httpx2.Response(status, headers=headers, request=request) + cls = ServiceUnavailableError if status >= 500 else NotFoundError # noqa: PLR2004 + return cls(response) + + +def _notes(exc: BaseException) -> list[str]: + return list(getattr(exc, "__notes__", [])) + + +# ---- retryable failures return a sleep delay + + +def test_retryable_status_returns_delay_within_bounds() -> None: + request = _request("PUT") + exc = _status_exc(503, request) + delay = _policy().decide(attempt=0, request=request, exc=exc) + assert 0.0 <= delay <= _BASE_DELAY # full-jitter ceiling at attempt 0 is base_delay + + +def test_retryable_network_returns_delay_within_bounds() -> None: + request = _request("PUT") + delay = _policy().decide(attempt=0, request=request, exc=NetworkError("boom")) + assert 0.0 <= delay <= _BASE_DELAY + + +def test_retryable_timeout_returns_delay_within_bounds() -> None: + request = _request("PUT") + delay = _policy().decide(attempt=0, request=request, exc=TimeoutError("slow")) + assert 0.0 <= delay <= _BASE_DELAY + + +# ---- classification re-raises the original, untouched + + +def test_non_retryable_status_reraises_unwrapped() -> None: + request = _request("PUT") + exc = _status_exc(404, request) + with pytest.raises(NotFoundError) as ei: + _policy().decide(attempt=0, request=request, exc=exc) + assert ei.value is exc + assert _notes(ei.value) == [] + + +def test_non_eligible_method_reraises_unwrapped() -> None: + request = _request("POST") + exc = _status_exc(503, request) + with pytest.raises(ServiceUnavailableError) as ei: + _policy().decide(attempt=0, request=request, exc=exc) + assert ei.value is exc + assert _notes(ei.value) == [] + + +# ---- terminal raises carry the right note + + +def test_streaming_body_refused_with_note() -> None: + request = _request("PUT", streaming=True) + exc = _status_exc(503, request) + with pytest.raises(ServiceUnavailableError) as ei: + _policy().decide(attempt=0, request=request, exc=exc) + assert 
any("stream that cannot replay" in note for note in _notes(ei.value)) + + +def test_exhaustion_adds_gave_up_note() -> None: + request = _request("PUT") + exc = _status_exc(503, request) + with pytest.raises(ServiceUnavailableError) as ei: + _policy(max_attempts=3).decide(attempt=2, request=request, exc=exc) + assert any("gave up after 3 attempts" in note for note in _notes(ei.value)) + + +def test_retry_after_exceeding_max_delay_gives_up() -> None: + request = _request("PUT") + exc = _status_exc(503, request, retry_after="10") # > max_delay (5.0) + with pytest.raises(ServiceUnavailableError) as ei: + _policy().decide(attempt=0, request=request, exc=exc) + assert any("exceeded max_delay" in note for note in _notes(ei.value)) + + +# ---- Retry-After handling + + +def test_retry_after_within_max_delay_returned_exactly() -> None: + request = _request("PUT") + exc = _status_exc(503, request, retry_after=_RETRY_AFTER_HEADER) # <= max_delay + delay = _policy().decide(attempt=0, request=request, exc=exc) + assert delay == _RETRY_AFTER_SECONDS + + +def test_respect_retry_after_false_ignores_header() -> None: + request = _request("PUT") + exc = _status_exc(503, request, retry_after="2") + delay = _policy(respect_retry_after=False).decide(attempt=0, request=request, exc=exc) + assert delay <= _BASE_DELAY # jitter, not the 2.0 header value + + +# ---- budget refusal + + +def test_budget_refusal_raises_with_cause_and_fields() -> None: + request = _request("PUT") + exc = _status_exc(503, request) + policy = _policy(max_attempts=3, budget=_zero_budget()) + with pytest.raises(RetryBudgetExhaustedError) as ei: + policy.decide(attempt=0, request=request, exc=exc) + assert ei.value.attempts == 1 + assert ei.value.last_response is exc.response + assert ei.value.last_exception is exc + assert ei.value.__cause__ is exc + + +# ---- construction-time validation moved onto the policy + + +def test_invalid_max_attempts_rejected() -> None: + with pytest.raises(ValueError, match="max_attempts 
must be >= 1"): + _policy(max_attempts=0)
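
The refactor above moves the retry decision into one policy object and leaves the wrappers as thin loop drivers. A minimal sketch of that shape follows, under stated assumptions: `SketchPolicy`, `TransientError`, `run_with_retry`, and `make_flaky` are hypothetical names invented for illustration, not part of httpware, and the jitter formula (uniform between 0 and `min(max_delay, base_delay * 2**attempt)`) is the common full-jitter form, assumed here rather than taken from the library's `full_jitter_delay`. The real `_RetryPolicy.decide` also handles method eligibility, streaming bodies, Retry-After, PEP 678 notes, and the retry budget, all of which this sketch omits.

```python
import random
from dataclasses import dataclass
from typing import Callable


class TransientError(Exception):
    """Hypothetical stand-in for a retryable failure."""


@dataclass(frozen=True)
class SketchPolicy:
    """Hypothetical, much-simplified analog of a shared retry policy."""

    max_attempts: int = 3
    base_delay: float = 0.1
    max_delay: float = 5.0

    def decide(self, *, attempt: int, exc: Exception) -> float:
        """Return the delay before the next attempt, or raise the terminal error."""
        # Classification: anything non-retryable is re-raised untouched.
        if not isinstance(exc, TransientError):
            raise exc
        # Exhaustion: the final attempt re-raises the original exception.
        if attempt + 1 >= self.max_attempts:
            raise exc
        # Assumed full-jitter form: uniform in [0, capped exponential ceiling].
        ceiling = min(self.max_delay, self.base_delay * 2**attempt)
        return random.uniform(0.0, ceiling)


def run_with_retry(
    call: Callable[[], str],
    policy: SketchPolicy,
    sleep: Callable[[float], None],
) -> str:
    """Thin sync loop driver: owns the loop, the call, and the sleep only."""
    for attempt in range(policy.max_attempts):
        try:
            return call()
        except Exception as exc:
            # decide() either returns a delay or raises from inside this block.
            delay = policy.decide(attempt=attempt, exc=exc)
        sleep(delay)
    raise AssertionError("unreachable")


def make_flaky(failures: int) -> Callable[[], str]:
    """Build a callable that fails `failures` times, then succeeds."""
    remaining = {"left": failures}

    def call() -> str:
        if remaining["left"] > 0:
            remaining["left"] -= 1
            raise TransientError("boom")
        return "ok"

    return call


# Inject a recording sleep so the demo runs instantly.
slept: list[float] = []
result = run_with_retry(make_flaky(2), SketchPolicy(), slept.append)
```

An async driver would presumably differ only in awaiting the call and the sleep, which is why keeping the decision in a single synchronous `decide` removes the duplication between the two wrappers. Injecting `sleep`, as the diff does with `_sleep`, keeps the loop testable without real waiting.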