Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,23 @@ All notable changes to Agents.KT are documented here. The format follows [Keep a

## [Unreleased]

### Changed — default transient-network retry across all HTTP model providers (#4560)

The shared non-streaming transport (`HttpModelClientSupport.sendBounded`, used by Claude, OpenAI +
DeepSeek/Kimi/OpenRouter/Perplexity, Gemini, and Ollama) now retries **transient** failures by default —
connection-level exceptions (`IOException`: connection reset, refused, no-route, unexpected EOF) and transient
HTTP statuses (408/429/500/502/503/504) — up to 3 attempts with exponential backoff (250ms→500ms). Previously
only Ollama retried; every other provider failed fast on a network blip unless you opted into
`onLLMError { Retry() }`. This matches the default behavior of official SDKs (e.g. OpenAI). Two deliberate
exclusions: **`HttpTimeoutException` is not retried** (the per-request `timeout` is your *total* budget —
retrying would silently multiply it; it surfaces immediately), and the **original exception type is preserved**
on exhaustion (rethrown as-is, not wrapped) so the agent-level `onLLMError`/`LlmErrorDecision` can still
pattern-match `e is ConnectException`. It sits **below** `onLLMError` (transport rides out blips first; the
handler sees only what survives, identity intact); on the final attempt a transient *status* is returned
unchanged so the per-provider parser still surfaces the provider's own error message. Streaming
(`sendChatStream`) is not retried (re-issuing mid-stream would duplicate delivered tokens — a connect-phase
follow-up). 6 tests (scripted fake `HttpClient`).

### Added — seller-side x402 payments: `X402PaymentGate` (#4527, PRD §12.8) — experimental

`X402PaymentGate(requirements, facilitator).gate(handler)` wraps any JDK `HttpHandler` so a resource is served
Expand Down
19 changes: 19 additions & 0 deletions docs/error-recovery.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,4 +230,23 @@ agent<String, Report>("analyst") {

The handler does **not** fire for budget caps (`onBudgetExceeded` owns those) or cancellation, and v1 scopes recovery to the agentic loop — a model failure during multi-skill LLM routing still propagates loud. See [production-hardening.md](production-hardening.md) for the deployment checklist entry.

### Transport-layer retry (automatic, below `onLLMError`) — #4560

Before a failure ever reaches `onLLMError`, the shared HTTP transport (`HttpModelClientSupport.sendBounded`, used by **every** provider's non-streaming call — Claude, OpenAI + DeepSeek/Kimi/OpenRouter/Perplexity, Gemini, Ollama) already retries **transient** failures **by default**, no opt-in:

- **connection-level exceptions** — an `IOException` from the send (connection reset, refused, no-route, unexpected EOF), and
- **transient HTTP statuses** — 408 / 429 / 500 / 502 / 503 / 504.

Up to 3 attempts, exponential backoff (250ms → 500ms). This matches what official SDKs (e.g. OpenAI) do by default — a dropped connection or a 503 is the textbook retryable case, so you don't have to write a handler for it.

Two deliberate exclusions: **`HttpTimeoutException` is not retried** — the per-request `timeout` is your *total* budget, and retrying would silently multiply it, so a timeout surfaces immediately. And the **original exception type is preserved** on exhaustion (rethrown as-is, not wrapped) — that's what lets the `onLLMError` handler above match `e is ConnectException`.

The two layers compose, transport first:

```
http.send → [transport retry: conn-level IOException / 5xx, ×3] → raw exception → [onLLMError: your policy] → loop
```

So `onLLMError` sees only what survives the transport retries — use it for *semantic* recovery (a `RespondWith` fallback, a longer `Retry` schedule, escalation, or retrying a `HttpTimeoutException`), not for plain connection blips. Streaming (`sendChatStream`) is **not** transport-retried (re-issuing mid-stream would duplicate delivered tokens); wrap a streaming call in `onLLMError { Retry() }` or a `firstOf(...)` fallback if you need connect-phase resilience there. For higher-level recovery — fall over to another provider/model, or take the first of N samples — use `firstOf(a, b)` / `agent.speculative(n)`; to re-run until the output is valid, `loopUntil { … }` (see [composition.md](composition.md)).

---
96 changes: 72 additions & 24 deletions src/main/kotlin/agents_engine/model/HttpModelClientSupport.kt
Original file line number Diff line number Diff line change
@@ -1,47 +1,91 @@
package agents_engine.model

import java.io.IOException
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.net.http.HttpTimeoutException

/**
* `agents_engine/model/HttpModelClientSupport.kt` — #2792 shared
* transport seam for the JDK-HttpClient-backed provider adapters
* ([ClaudeClient], [OpenAiClient], [OllamaClient], and [DeepSeekClient]
* via OpenAi). Each adapter copy-pasted the same bounded-read +
* OOM-guard block; the duplication is concentrated here so a future
* transport-layer improvement (retry strategy, distributed tracing
* span propagation, mTLS) is a one-place edit.
* `agents_engine/model/HttpModelClientSupport.kt` — #2792 shared transport seam for the
* JDK-HttpClient-backed provider adapters ([ClaudeClient], [OpenAiClient] (+ DeepSeek/Kimi/OpenRouter/
* Perplexity via it), [GeminiClient], [OllamaClient]). Concentrates the bounded-read + OOM-guard and —
* since #4560 — the **transient-network retry policy**, so transport-layer improvements are one-place edits.
*
* Scope is intentionally small for the 0.6.x line: just the
* "POST a JSON body, read a bounded response, surface a provider-
* tagged LlmProviderException on overflow" pattern. The per-adapter
* `HttpClient` instances still live in each client (their
* `connectTimeout` differs across providers because users can tune
* it per `model { }` block); future passes can lift more shape here
* if more providers join.
* **Transient retry (#4560).** [sendBounded] retries the *non-streaming* request on:
* - **connection-level exceptions** — an [IOException] from `http.send` (connection reset, refused,
* no-route, unexpected EOF), and
* - **transient HTTP statuses** — 408 / 429 / 500 / 502 / 503 / 504.
*
* Up to [MAX_ATTEMPTS] tries with exponential backoff (`INITIAL_BACKOFF_MS * 2^attempt`). This matches the
* behavior official SDKs (e.g. OpenAI) apply by default — a dropped connection or a 503 is the textbook
* retryable case, so callers get it without opting in.
*
* Two deliberate exclusions:
* - **[HttpTimeoutException] is NOT retried** — the per-request `timeout` is the caller's *total* budget;
* retrying would silently multiply it. It propagates immediately, fast and raw.
* - **the original exception type is preserved** on exhaustion (rethrown as-is, not wrapped), so the
* agent-level `onLLMError`/[LlmErrorDecision] handler can still pattern-match `e is ConnectException` etc.
*
* It sits **below** `onLLMError`: the transport rides out blips first; a persistent failure surfaces to the
* handler with its identity intact. On the final attempt a transient *status* is NOT converted to an error
* here — the body is returned unchanged so the per-provider parser still surfaces the provider's own message.
*
* Streaming (`sendChatStream`) is deliberately not retried here — re-issuing mid-stream would duplicate
* already-delivered tokens; connect-phase streaming retry is a separate follow-up.
*/
internal object HttpModelClientSupport {

const val MAX_ATTEMPTS: Int = 3
const val INITIAL_BACKOFF_MS: Long = 250L
// 408 Request Timeout, 429 Too Many Requests, 500/502/503/504 server/gateway errors.
@Suppress("MagicNumber")
private val TRANSIENT_STATUSES: Set<Int> = setOf(408, 429, 500, 502, 503, 504)

/**
* Sends [request] using [http], reads at most [maxResponseBytes]
* (+1 sentinel) of the response body, and throws
* [LlmProviderException] tagged with [providerLabel] when the
* response would exceed the cap. Returns the UTF-8 decoded body.
*
* The +1 sentinel + post-read compare is intentional — `readNBytes(N)`
* may legitimately return exactly N bytes for an N-byte response, so
* we read one more than the cap to disambiguate "exactly at cap" from
* "would have exceeded".
* Sends [request] using [http] (with transient retry, see class doc), reads at most [maxResponseBytes]
* (+1 sentinel) of the response body, and throws [LlmProviderException] tagged with [providerLabel] when
* the response would exceed the cap. Returns the UTF-8 decoded body.
*/
fun sendBounded(
http: HttpClient,
request: HttpRequest,
providerLabel: String,
maxResponseBytes: Long,
): String {
val response = http.send(request, HttpResponse.BodyHandlers.ofInputStream())
repeat(MAX_ATTEMPTS) { attempt ->
val lastAttempt = attempt == MAX_ATTEMPTS - 1
val response: HttpResponse<java.io.InputStream> = try {
http.send(request, HttpResponse.BodyHandlers.ofInputStream())
} catch (e: HttpTimeoutException) {
throw e // the per-request timeout is the caller's total budget — never multiply it by retrying
} catch (e: IOException) {
// Connection-level failure (reset, refused, no route, unexpected EOF). Rethrow the ORIGINAL
// on exhaustion so onLLMError can still match `e is ConnectException`, etc.
if (lastAttempt) throw e
backoff(attempt)
return@repeat
}
// Transient server/throttling status: retry unless we're out of attempts. On the final attempt
// fall through and return the body so the per-provider parser surfaces the real error message.
if (!lastAttempt && response.statusCode() in TRANSIENT_STATUSES) {
response.body().close()
backoff(attempt)
return@repeat
}
return readBounded(response, providerLabel, maxResponseBytes)
}
error("sendBounded retry loop exited without a result") // unreachable: last attempt returns or throws
}

private fun readBounded(
response: HttpResponse<java.io.InputStream>,
providerLabel: String,
maxResponseBytes: Long,
): String {
val cap = maxResponseBytes.coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
// +1 sentinel: readNBytes(N) may return exactly N for an N-byte body, so read one more to
// disambiguate "exactly at cap" from "would have exceeded".
val bytes = response.body().use { it.readNBytes(cap + 1) }
if (bytes.size > cap) {
throw LlmProviderException(
Expand All @@ -50,4 +94,8 @@ internal object HttpModelClientSupport {
}
return String(bytes, Charsets.UTF_8)
}

private fun backoff(attempt: Int) {
Thread.sleep(INITIAL_BACKOFF_MS shl attempt)
}
}
123 changes: 123 additions & 0 deletions src/test/kotlin/agents_engine/model/HttpModelClientSupportTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package agents_engine.model

import java.io.ByteArrayInputStream
import java.io.IOException
import java.io.InputStream
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpHeaders
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.net.http.HttpTimeoutException
import java.util.Optional
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLParameters
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue

// #4560 — default transient-network retry in the shared HTTP transport. Hermetic: a scripted fake
// HttpClient drives sendBounded through retry on IOException / transient status, no-retry on 4xx, and
// exhaustion. All provider adapters inherit this via HttpModelClientSupport.sendBounded.
class HttpModelClientSupportTest {

private fun ok(body: String) = FakeResponse(200, body)
private fun status(code: Int) = FakeResponse(code, """{"error":"$code"}""")
private val request = HttpRequest.newBuilder().uri(URI.create("http://test/")).GET().build()

private fun send(vararg steps: () -> HttpResponse<InputStream>): Pair<String, Int> {
val client = ScriptedHttpClient(steps.toMutableList())
val body = HttpModelClientSupport.sendBounded(client, request, "Test", 4096)
return body to client.calls
}

@Test
fun `retries network exceptions then succeeds`() {
val (body, calls) = send(
{ throw IOException("connection reset") },
{ throw IOException("timeout") },
{ ok("done") },
)
assertEquals("done", body)
assertEquals(3, calls)
}

@Test
fun `retries a transient status then succeeds`() {
val (body, calls) = send({ status(503) }, { ok("recovered") })
assertEquals("recovered", body)
assertEquals(2, calls)
}

@Test
fun `rethrows the original exception (type preserved) after exhausting attempts`() {
val reset = { throw IOException("reset") }
val e = assertFailsWith<IOException> { send(reset, reset, reset) }
assertEquals("reset", e.message) // original preserved so onLLMError can match e is ConnectException, etc.
}

@Test
fun `does not retry a timeout (respects the configured per-request budget)`() {
val client = ScriptedHttpClient(mutableListOf({ throw HttpTimeoutException("timed out") }, { ok("nope") }))
assertFailsWith<HttpTimeoutException> { HttpModelClientSupport.sendBounded(client, request, "Test", 4096) }
assertEquals(1, client.calls, "a timeout must surface immediately, not be retried")
}

@Test
fun `does not retry a non-transient 4xx and returns its body for downstream parsing`() {
val (body, calls) = send({ status(400) }, { ok("should-not-reach") })
assertTrue("400" in body, body)
assertEquals(1, calls, "4xx must not be retried")
}

@Test
fun `returns the final transient-status body instead of masking the provider error`() {
// 503 every time → after MAX_ATTEMPTS, the last body is returned so the per-provider parser can
// surface the real error message rather than a generic transport error.
val (body, calls) = send({ status(503) }, { status(503) }, { status(503) })
assertTrue("503" in body, body)
assertEquals(3, calls)
}

// --- test doubles ---

private class FakeResponse(private val status: Int, body: String) : HttpResponse<InputStream> {
private val bytes = body.toByteArray()
override fun statusCode() = status
override fun body(): InputStream = ByteArrayInputStream(bytes)
override fun request(): HttpRequest = HttpRequest.newBuilder().uri(URI.create("http://test/")).build()
override fun previousResponse(): Optional<HttpResponse<InputStream>> = Optional.empty()
override fun headers(): HttpHeaders = HttpHeaders.of(emptyMap()) { _, _ -> true }
override fun sslSession() = Optional.empty<javax.net.ssl.SSLSession>()
override fun uri(): URI = URI.create("http://test/")
override fun version(): HttpClient.Version = HttpClient.Version.HTTP_1_1
}

private class ScriptedHttpClient(private val steps: MutableList<() -> HttpResponse<InputStream>>) : HttpClient() {
var calls = 0

@Suppress("UNCHECKED_CAST")
override fun <T> send(req: HttpRequest, handler: HttpResponse.BodyHandler<T>): HttpResponse<T> {
calls++
return steps.removeAt(0).invoke() as HttpResponse<T>
}

override fun cookieHandler() = Optional.empty<java.net.CookieHandler>()
override fun connectTimeout() = Optional.empty<java.time.Duration>()
override fun followRedirects(): Redirect = Redirect.NEVER
override fun proxy() = Optional.empty<java.net.ProxySelector>()
override fun sslContext(): SSLContext = SSLContext.getDefault()
override fun sslParameters(): SSLParameters = SSLParameters()
override fun authenticator() = Optional.empty<java.net.Authenticator>()
override fun version(): Version = Version.HTTP_1_1
override fun executor() = Optional.empty<java.util.concurrent.Executor>()
override fun <T> sendAsync(req: HttpRequest, handler: HttpResponse.BodyHandler<T>) = error("unused")
override fun <T> sendAsync(
req: HttpRequest,
handler: HttpResponse.BodyHandler<T>,
push: HttpResponse.PushPromiseHandler<T>?,
) = error("unused")
override fun newWebSocketBuilder() = error("unused")
}
}
Loading