From 04ebafcbd75cad0635c6c3b6280fe3955a84880c Mon Sep 17 00:00:00 2001
From: skobeltsyn
Date: Wed, 17 Jun 2026 11:10:04 +0300
Subject: [PATCH] feat(#4560): default transient-network retry in shared HTTP transport (sendBounded)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Network blips deserve a retry policy by default, not opt-in. The shared
non-streaming transport (used by Claude, OpenAI + DeepSeek/Kimi/OpenRouter/Perplexity,
Gemini, Ollama) now retries transient failures:
  - connection-level IOException (reset, refused, no-route, unexpected EOF), and
  - transient HTTP statuses (408/429/500/502/503/504).
3 attempts, exponential backoff (250ms->500ms). Matches OpenAI-SDK default behavior.

Two deliberate exclusions:
  - HttpTimeoutException is NOT retried — the per-request timeout is the caller's
    TOTAL budget; retrying would silently multiply it (kept
    OllamaClientTimeoutTest's ~250ms elapsed bound intact).
  - the ORIGINAL exception type is preserved on exhaustion (rethrown, not wrapped)
    so onLLMError can still match `e is ConnectException`, etc.

Sits below onLLMError/LlmErrorDecision (transport rides out blips; handler sees
what survives). Streaming not retried (mid-stream re-issue would duplicate tokens).

6 tests (scripted fake HttpClient). docs/error-recovery.md + CHANGELOG updated.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 CHANGELOG.md                                  |  17 +++
 docs/error-recovery.md                        |  19 +++
 .../model/HttpModelClientSupport.kt           |  96 ++++++++++----
 .../model/HttpModelClientSupportTest.kt       | 123 ++++++++++++++++++
 4 files changed, 231 insertions(+), 24 deletions(-)
 create mode 100644 src/test/kotlin/agents_engine/model/HttpModelClientSupportTest.kt

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2012b0c4..76c79da9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,23 @@ All notable changes to Agents.KT are documented here. The format follows [Keep a
 
 ## [Unreleased]
 
+### Changed — default transient-network retry across all HTTP model providers (#4560)
+
+The shared non-streaming transport (`HttpModelClientSupport.sendBounded`, used by Claude, OpenAI +
+DeepSeek/Kimi/OpenRouter/Perplexity, Gemini, and Ollama) now retries **transient** failures by default —
+connection-level exceptions (`IOException`: connection reset, refused, no-route, unexpected EOF) and transient
+HTTP statuses (408/429/500/502/503/504) — up to 3 attempts with exponential backoff (250ms→500ms). Previously
+only Ollama retried; every other provider failed fast on a network blip unless you opted into
+`onLLMError { Retry() }`. This matches the default behavior of official SDKs (e.g. OpenAI). Two deliberate
+exclusions: **`HttpTimeoutException` is not retried** (the per-request `timeout` is your *total* budget —
+retrying would silently multiply it; it surfaces immediately), and the **original exception type is preserved**
+on exhaustion (rethrown as-is, not wrapped) so the agent-level `onLLMError`/`LlmErrorDecision` can still
+pattern-match `e is ConnectException`. It sits **below** `onLLMError` (transport rides out blips first; the
+handler sees only what survives, identity intact); on the final attempt a transient *status* is returned
+unchanged so the per-provider parser still surfaces the provider's own error message. Streaming
+(`sendChatStream`) is not retried (re-issuing mid-stream would duplicate delivered tokens — a connect-phase
+follow-up). 6 tests (scripted fake `HttpClient`).
+
 ### Added — seller-side x402 payments: `X402PaymentGate` (#4527, PRD §12.8) — experimental
 
 `X402PaymentGate(requirements, facilitator).gate(handler)` wraps any JDK `HttpHandler` so a resource is served
diff --git a/docs/error-recovery.md b/docs/error-recovery.md
index 937ff6a3..ce26d790 100644
--- a/docs/error-recovery.md
+++ b/docs/error-recovery.md
@@ -230,4 +230,23 @@ agent("analyst") {
 
 The handler does **not** fire for budget caps (`onBudgetExceeded` owns those) or cancellation, and v1 scopes recovery to the agentic loop — a model failure during multi-skill LLM routing still propagates loud. See [production-hardening.md](production-hardening.md) for the deployment checklist entry.
 
+### Transport-layer retry (automatic, below `onLLMError`) — #4560
+
+Before a failure ever reaches `onLLMError`, the shared HTTP transport (`HttpModelClientSupport.sendBounded`, used by **every** provider's non-streaming call — Claude, OpenAI + DeepSeek/Kimi/OpenRouter/Perplexity, Gemini, Ollama) already retries **transient** failures **by default**, no opt-in:
+
+- **connection-level exceptions** — an `IOException` from the send (connection reset, refused, no-route, unexpected EOF), and
+- **transient HTTP statuses** — 408 / 429 / 500 / 502 / 503 / 504.
+
+Up to 3 attempts, exponential backoff (250ms → 500ms). This matches what official SDKs (e.g. OpenAI) do by default — a dropped connection or a 503 is the textbook retryable case, so you don't have to write a handler for it.
+
+Two deliberate exclusions: **`HttpTimeoutException` is not retried** — the per-request `timeout` is your *total* budget, and retrying would silently multiply it, so a timeout surfaces immediately. And the **original exception type is preserved** on exhaustion (rethrown as-is, not wrapped) — that's what lets the `onLLMError` handler above match `e is ConnectException`.
+
+The two layers compose, transport first:
+
+```
+http.send → [transport retry: conn-level IOException / 5xx, ×3] → raw exception → [onLLMError: your policy] → loop
+```
+
+So `onLLMError` sees only what survives the transport retries — use it for *semantic* recovery (a `RespondWith` fallback, a longer `Retry` schedule, escalation, or retrying a `HttpTimeoutException`), not for plain connection blips. Streaming (`sendChatStream`) is **not** transport-retried (re-issuing mid-stream would duplicate delivered tokens); wrap a streaming call in `onLLMError { Retry() }` or a `firstOf(...)` fallback if you need connect-phase resilience there. For higher-level recovery — fall over to another provider/model, or take the first of N samples — use `firstOf(a, b)` / `agent.speculative(n)`; to re-run until the output is valid, `loopUntil { … }` (see [composition.md](composition.md)).
+
 ---
diff --git a/src/main/kotlin/agents_engine/model/HttpModelClientSupport.kt b/src/main/kotlin/agents_engine/model/HttpModelClientSupport.kt
index a077498a..d7fff8a1 100644
--- a/src/main/kotlin/agents_engine/model/HttpModelClientSupport.kt
+++ b/src/main/kotlin/agents_engine/model/HttpModelClientSupport.kt
@@ -1,38 +1,51 @@
 package agents_engine.model
 
+import java.io.IOException
 import java.net.http.HttpClient
 import java.net.http.HttpRequest
 import java.net.http.HttpResponse
+import java.net.http.HttpTimeoutException
 
 /**
- * `agents_engine/model/HttpModelClientSupport.kt` — #2792 shared
- * transport seam for the JDK-HttpClient-backed provider adapters
- * ([ClaudeClient], [OpenAiClient], [OllamaClient], and [DeepSeekClient]
- * via OpenAi). Each adapter copy-pasted the same bounded-read +
- * OOM-guard block; the duplication is concentrated here so a future
- * transport-layer improvement (retry strategy, distributed tracing
- * span propagation, mTLS) is a one-place edit.
+ * `agents_engine/model/HttpModelClientSupport.kt` — #2792 shared transport seam for the
+ * JDK-HttpClient-backed provider adapters ([ClaudeClient], [OpenAiClient] (+ DeepSeek/Kimi/OpenRouter/
+ * Perplexity via it), [GeminiClient], [OllamaClient]). Concentrates the bounded-read + OOM-guard and —
+ * since #4560 — the **transient-network retry policy**, so transport-layer improvements are one-place edits.
  *
- * Scope is intentionally small for the 0.6.x line: just the
- * "POST a JSON body, read a bounded response, surface a provider-
- * tagged LlmProviderException on overflow" pattern. The per-adapter
- * `HttpClient` instances still live in each client (their
- * `connectTimeout` differs across providers because users can tune
- * it per `model { }` block); future passes can lift more shape here
- * if more providers join.
+ * **Transient retry (#4560).** [sendBounded] retries the *non-streaming* request on:
+ * - **connection-level exceptions** — an [IOException] from `http.send` (connection reset, refused,
+ *   no-route, unexpected EOF), and
+ * - **transient HTTP statuses** — 408 / 429 / 500 / 502 / 503 / 504.
+ *
+ * Up to [MAX_ATTEMPTS] tries with exponential backoff (`INITIAL_BACKOFF_MS * 2^attempt`). This matches the
+ * behavior official SDKs (e.g. OpenAI) apply by default — a dropped connection or a 503 is the textbook
+ * retryable case, so callers get it without opting in.
+ *
+ * Two deliberate exclusions:
+ * - **[HttpTimeoutException] is NOT retried** — the per-request `timeout` is the caller's *total* budget;
+ *   retrying would silently multiply it. It propagates immediately, fast and raw.
+ * - **the original exception type is preserved** on exhaustion (rethrown as-is, not wrapped), so the
+ *   agent-level `onLLMError`/[LlmErrorDecision] handler can still pattern-match `e is ConnectException` etc.
+ *
+ * It sits **below** `onLLMError`: the transport rides out blips first; a persistent failure surfaces to the
+ * handler with its identity intact. On the final attempt a transient *status* is NOT converted to an error
+ * here — the body is returned unchanged so the per-provider parser still surfaces the provider's own message.
+ *
+ * Streaming (`sendChatStream`) is deliberately not retried here — re-issuing mid-stream would duplicate
+ * already-delivered tokens; connect-phase streaming retry is a separate follow-up.
  */
 internal object HttpModelClientSupport {
 
+    const val MAX_ATTEMPTS: Int = 3
+    const val INITIAL_BACKOFF_MS: Long = 250L
+    // 408 Request Timeout, 429 Too Many Requests, 500/502/503/504 server/gateway errors.
+    @Suppress("MagicNumber")
+    private val TRANSIENT_STATUSES: Set<Int> = setOf(408, 429, 500, 502, 503, 504)
+
     /**
-     * Sends [request] using [http], reads at most [maxResponseBytes]
-     * (+1 sentinel) of the response body, and throws
-     * [LlmProviderException] tagged with [providerLabel] when the
-     * response would exceed the cap. Returns the UTF-8 decoded body.
-     *
-     * The +1 sentinel + post-read compare is intentional — `readNBytes(N)`
-     * may legitimately return exactly N bytes for an N-byte response, so
-     * we read one more than the cap to disambiguate "exactly at cap" from
-     * "would have exceeded".
+     * Sends [request] using [http] (with transient retry, see class doc), reads at most [maxResponseBytes]
+     * (+1 sentinel) of the response body, and throws [LlmProviderException] tagged with [providerLabel] when
+     * the response would exceed the cap. Returns the UTF-8 decoded body.
      */
     fun sendBounded(
         http: HttpClient,
@@ -40,8 +53,39 @@ internal object HttpModelClientSupport {
         providerLabel: String,
         maxResponseBytes: Long,
     ): String {
-        val response = http.send(request, HttpResponse.BodyHandlers.ofInputStream())
+        repeat(MAX_ATTEMPTS) { attempt ->
+            val lastAttempt = attempt == MAX_ATTEMPTS - 1
+            val response: HttpResponse<java.io.InputStream> = try {
+                http.send(request, HttpResponse.BodyHandlers.ofInputStream())
+            } catch (e: HttpTimeoutException) {
+                throw e // the per-request timeout is the caller's total budget — never multiply it by retrying
+            } catch (e: IOException) {
+                // Connection-level failure (reset, refused, no route, unexpected EOF). Rethrow the ORIGINAL
+                // on exhaustion so onLLMError can still match `e is ConnectException`, etc.
+                if (lastAttempt) throw e
+                backoff(attempt)
+                return@repeat
+            }
+            // Transient server/throttling status: retry unless we're out of attempts. On the final attempt
+            // fall through and return the body so the per-provider parser surfaces the real error message.
+            if (!lastAttempt && response.statusCode() in TRANSIENT_STATUSES) {
+                response.body().close()
+                backoff(attempt)
+                return@repeat
+            }
+            return readBounded(response, providerLabel, maxResponseBytes)
+        }
+        error("sendBounded retry loop exited without a result") // unreachable: last attempt returns or throws
+    }
+
+    private fun readBounded(
+        response: HttpResponse<java.io.InputStream>,
+        providerLabel: String,
+        maxResponseBytes: Long,
+    ): String {
         val cap = maxResponseBytes.coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
+        // +1 sentinel: readNBytes(N) may return exactly N for an N-byte body, so read one more to
+        // disambiguate "exactly at cap" from "would have exceeded".
         val bytes = response.body().use { it.readNBytes(cap + 1) }
         if (bytes.size > cap) {
             throw LlmProviderException(
@@ -50,4 +94,8 @@ internal object HttpModelClientSupport {
         }
         return String(bytes, Charsets.UTF_8)
     }
+
+    private fun backoff(attempt: Int) {
+        Thread.sleep(INITIAL_BACKOFF_MS shl attempt)
+    }
 }
diff --git a/src/test/kotlin/agents_engine/model/HttpModelClientSupportTest.kt b/src/test/kotlin/agents_engine/model/HttpModelClientSupportTest.kt
new file mode 100644
index 00000000..d816d836
--- /dev/null
+++ b/src/test/kotlin/agents_engine/model/HttpModelClientSupportTest.kt
@@ -0,0 +1,123 @@
+package agents_engine.model
+
+import java.io.ByteArrayInputStream
+import java.io.IOException
+import java.io.InputStream
+import java.net.URI
+import java.net.http.HttpClient
+import java.net.http.HttpHeaders
+import java.net.http.HttpRequest
+import java.net.http.HttpResponse
+import java.net.http.HttpTimeoutException
+import java.util.Optional
+import javax.net.ssl.SSLContext
+import javax.net.ssl.SSLParameters
+import kotlin.test.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertFailsWith
+import kotlin.test.assertTrue
+
+// #4560 — default transient-network retry in the shared HTTP transport. Hermetic: a scripted fake
+// HttpClient drives sendBounded through retry on IOException / transient status, no-retry on 4xx, and
+// exhaustion. All provider adapters inherit this via HttpModelClientSupport.sendBounded.
+class HttpModelClientSupportTest {
+
+    private fun ok(body: String) = FakeResponse(200, body)
+    private fun status(code: Int) = FakeResponse(code, """{"error":"$code"}""")
+    private val request = HttpRequest.newBuilder().uri(URI.create("http://test/")).GET().build()
+
+    private fun send(vararg steps: () -> HttpResponse<InputStream>): Pair<String, Int> {
+        val client = ScriptedHttpClient(steps.toMutableList())
+        val body = HttpModelClientSupport.sendBounded(client, request, "Test", 4096)
+        return body to client.calls
+    }
+
+    @Test
+    fun `retries network exceptions then succeeds`() {
+        val (body, calls) = send(
+            { throw IOException("connection reset") },
+            { throw IOException("timeout") },
+            { ok("done") },
+        )
+        assertEquals("done", body)
+        assertEquals(3, calls)
+    }
+
+    @Test
+    fun `retries a transient status then succeeds`() {
+        val (body, calls) = send({ status(503) }, { ok("recovered") })
+        assertEquals("recovered", body)
+        assertEquals(2, calls)
+    }
+
+    @Test
+    fun `rethrows the original exception (type preserved) after exhausting attempts`() {
+        val reset = { throw IOException("reset") }
+        val e = assertFailsWith<IOException> { send(reset, reset, reset) }
+        assertEquals("reset", e.message) // original preserved so onLLMError can match e is ConnectException, etc.
+    }
+
+    @Test
+    fun `does not retry a timeout (respects the configured per-request budget)`() {
+        val client = ScriptedHttpClient(mutableListOf({ throw HttpTimeoutException("timed out") }, { ok("nope") }))
+        assertFailsWith<HttpTimeoutException> { HttpModelClientSupport.sendBounded(client, request, "Test", 4096) }
+        assertEquals(1, client.calls, "a timeout must surface immediately, not be retried")
+    }
+
+    @Test
+    fun `does not retry a non-transient 4xx and returns its body for downstream parsing`() {
+        val (body, calls) = send({ status(400) }, { ok("should-not-reach") })
+        assertTrue("400" in body, body)
+        assertEquals(1, calls, "4xx must not be retried")
+    }
+
+    @Test
+    fun `returns the final transient-status body instead of masking the provider error`() {
+        // 503 every time → after MAX_ATTEMPTS, the last body is returned so the per-provider parser can
+        // surface the real error message rather than a generic transport error.
+        val (body, calls) = send({ status(503) }, { status(503) }, { status(503) })
+        assertTrue("503" in body, body)
+        assertEquals(3, calls)
+    }
+
+    // --- test doubles ---
+
+    private class FakeResponse(private val status: Int, body: String) : HttpResponse<InputStream> {
+        private val bytes = body.toByteArray()
+        override fun statusCode() = status
+        override fun body(): InputStream = ByteArrayInputStream(bytes)
+        override fun request(): HttpRequest = HttpRequest.newBuilder().uri(URI.create("http://test/")).build()
+        override fun previousResponse(): Optional<HttpResponse<InputStream>> = Optional.empty()
+        override fun headers(): HttpHeaders = HttpHeaders.of(emptyMap()) { _, _ -> true }
+        override fun sslSession() = Optional.empty<javax.net.ssl.SSLSession>()
+        override fun uri(): URI = URI.create("http://test/")
+        override fun version(): HttpClient.Version = HttpClient.Version.HTTP_1_1
+    }
+
+    private class ScriptedHttpClient(private val steps: MutableList<() -> HttpResponse<InputStream>>) : HttpClient() {
+        var calls = 0
+
+        @Suppress("UNCHECKED_CAST")
+        override fun <T> send(req: HttpRequest, handler: HttpResponse.BodyHandler<T>): HttpResponse<T> {
+            calls++
+            return steps.removeAt(0).invoke() as HttpResponse<T>
+        }
+
+        override fun cookieHandler() = Optional.empty<java.net.CookieHandler>()
+        override fun connectTimeout() = Optional.empty<java.time.Duration>()
+        override fun followRedirects(): Redirect = Redirect.NEVER
+        override fun proxy() = Optional.empty<java.net.ProxySelector>()
+        override fun sslContext(): SSLContext = SSLContext.getDefault()
+        override fun sslParameters(): SSLParameters = SSLParameters()
+        override fun authenticator() = Optional.empty<java.net.Authenticator>()
+        override fun version(): Version = Version.HTTP_1_1
+        override fun executor() = Optional.empty<java.util.concurrent.Executor>()
+        override fun <T> sendAsync(req: HttpRequest, handler: HttpResponse.BodyHandler<T>) = error("unused")
+        override fun <T> sendAsync(
+            req: HttpRequest,
+            handler: HttpResponse.BodyHandler<T>,
+            push: HttpResponse.PushPromiseHandler<T>?,
+        ) = error("unused")
+        override fun newWebSocketBuilder() = error("unused")
+    }
+}
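
Reviewer note: the backoff arithmetic in `backoff(attempt)` can be checked in isolation. The sketch below is not part of the patch; `backoffSchedule` is a hypothetical helper, and the two constants are copied from the patch's `MAX_ATTEMPTS` and `INITIAL_BACKOFF_MS`. It assumes, as in `sendBounded`, that no sleep follows the final attempt.

```kotlin
// Hypothetical helper, not part of the patch. Constants mirror the patch's values.
const val MAX_ATTEMPTS = 3
const val INITIAL_BACKOFF_MS = 250L

// Lists the sleeps taken if every attempt fails transiently.
// The final attempt returns or throws without sleeping, so there are maxAttempts - 1 delays.
fun backoffSchedule(
    maxAttempts: Int = MAX_ATTEMPTS,
    initialMs: Long = INITIAL_BACKOFF_MS,
): List<Long> = (0 until maxAttempts - 1).map { attempt -> initialMs shl attempt }

fun main() {
    val schedule = backoffSchedule()
    println(schedule)       // [250, 500]
    println(schedule.sum()) // 750
}
```

Under these assumptions the retry layer adds at most 750 ms of sleep on top of the per-attempt request time, which is consistent with the "250ms->500ms" schedule stated in the commit message.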