From b4c9dccb741754348f14742a33412cfcb2b7e93b Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Wed, 17 Jun 2026 06:30:32 +0300 Subject: [PATCH 1/4] feat: add async retry and bearer-auth pipeline steps MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the AsyncHttpStep counterparts for the RETRY and AUTH pillar stages so async calls get the same retry and authentication behaviour as the synchronous pipeline. DefaultAsyncRetryStep mirrors DefaultRetryStep's policy exactly — it reuses HttpRetryOptions, the shared BackoffCalculator, RetryAfterParser, the same Retry-After header set, and the same idempotency-aware re-sendability gating. Backoff delays are scheduled on a ScheduledExecutorService via Futures.delay rather than blocking a thread, and the retry loop is driven by an iterative trampoline (no per-attempt thenCompose recursion), so a long zero-delay retry sequence stays stack-safe. AsyncBearerTokenAuthStep stamps Authorization: Bearer via a new non-blocking BearerTokenProvider.fetchAsync seam. A token that is still valid but inside the refresh margin is returned and stamped immediately while a refresh runs off-thread; concurrent requests that observe an expiring or missing token share a single in-flight fetch (single-flight) so they don't stampede the token endpoint. The HTTPS guard, cross-origin credential suppression, and 401-challenge token eviction match the synchronous BearerTokenAuthStep. A ManualScheduler test fixture drives the scheduled delays deterministically so the retry tests run without real sleeps. --- sdk-core/api/sdk-core.api | 41 ++ .../sdk/core/http/auth/BearerTokenProvider.kt | 43 ++ .../core/http/pipeline/steps/AsyncAuthStep.kt | 150 +++++++ .../steps/AsyncBearerTokenAuthStep.kt | 276 ++++++++++++ .../http/pipeline/steps/AsyncRetryStep.kt | 26 ++ .../pipeline/steps/DefaultAsyncRetryStep.kt | 398 ++++++++++++++++++ .../steps/AsyncBearerTokenAuthStepTest.kt | 329 +++++++++++++++ .../steps/DefaultAsyncRetryStepTest.kt | 383 +++++++++++++++++ .../sdk/core/testing/ManualScheduler.kt | 192 +++++++++ 9 files changed, 1838 insertions(+) create mode 100644 sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncAuthStep.kt create mode 100644 sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt create mode 100644 sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncRetryStep.kt create mode 100644 sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt create mode 100644 sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStepTest.kt create mode 100644 sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStepTest.kt create mode 100644 sdk-core/src/testFixtures/kotlin/org/dexpace/sdk/core/testing/ManualScheduler.kt diff --git a/sdk-core/api/sdk-core.api b/sdk-core/api/sdk-core.api index 7ebcc312..023f2296 100644 --- a/sdk-core/api/sdk-core.api +++ b/sdk-core/api/sdk-core.api @@ -138,10 +138,14 @@ public final class org/dexpace/sdk/core/http/auth/BearerToken : org/dexpace/sdk/ public abstract interface class org/dexpace/sdk/core/http/auth/BearerTokenProvider { public fun fetch (Ljava/util/List;)Lorg/dexpace/sdk/core/http/auth/BearerToken; public abstract fun fetch (Ljava/util/List;Ljava/util/Map;)Lorg/dexpace/sdk/core/http/auth/BearerToken; + public fun fetchAsync (Ljava/util/List;)Ljava/util/concurrent/CompletableFuture; + public fun fetchAsync (Ljava/util/List;Ljava/util/Map;)Ljava/util/concurrent/CompletableFuture; } public final class org/dexpace/sdk/core/http/auth/BearerTokenProvider$DefaultImpls { public static fun fetch (Lorg/dexpace/sdk/core/http/auth/BearerTokenProvider;Ljava/util/List;)Lorg/dexpace/sdk/core/http/auth/BearerToken; + public static fun fetchAsync (Lorg/dexpace/sdk/core/http/auth/BearerTokenProvider;Ljava/util/List;)Ljava/util/concurrent/CompletableFuture; + public static fun fetchAsync (Lorg/dexpace/sdk/core/http/auth/BearerTokenProvider;Ljava/util/List;Ljava/util/Map;)Ljava/util/concurrent/CompletableFuture; } public abstract interface class org/dexpace/sdk/core/http/auth/ChallengeHandler { @@ -747,6 +751,30 @@ public final class org/dexpace/sdk/core/http/pipeline/Stage : java/lang/Enum { public static fun values ()[Lorg/dexpace/sdk/core/http/pipeline/Stage; } +public abstract class org/dexpace/sdk/core/http/pipeline/steps/AsyncAuthStep : org/dexpace/sdk/core/http/pipeline/AsyncHttpStep { + public fun ()V + protected abstract fun authorizeRequestAsync (Lorg/dexpace/sdk/core/http/request/Request;)Ljava/util/concurrent/CompletableFuture; + protected fun authorizeRequestOnChallengeAsync (Lorg/dexpace/sdk/core/http/request/Request;Lorg/dexpace/sdk/core/http/response/Response;)Ljava/util/concurrent/CompletableFuture; + public final fun getStage ()Lorg/dexpace/sdk/core/http/pipeline/Stage; + public final fun processAsync (Lorg/dexpace/sdk/core/http/request/Request;Lorg/dexpace/sdk/core/http/pipeline/AsyncPipelineNext;)Ljava/util/concurrent/CompletableFuture; +} + +public class org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep : org/dexpace/sdk/core/http/pipeline/steps/AsyncAuthStep { + public fun (Lorg/dexpace/sdk/core/http/auth/BearerTokenProvider;Ljava/util/List;)V + public fun (Lorg/dexpace/sdk/core/http/auth/BearerTokenProvider;Ljava/util/List;Ljava/time/Duration;)V + public fun (Lorg/dexpace/sdk/core/http/auth/BearerTokenProvider;Ljava/util/List;Ljava/time/Duration;Lorg/dexpace/sdk/core/util/Clock;)V + public fun (Lorg/dexpace/sdk/core/http/auth/BearerTokenProvider;Ljava/util/List;Ljava/time/Duration;Lorg/dexpace/sdk/core/util/Clock;Lorg/dexpace/sdk/core/instrumentation/ClientLogger;)V + public synthetic fun (Lorg/dexpace/sdk/core/http/auth/BearerTokenProvider;Ljava/util/List;Ljava/time/Duration;Lorg/dexpace/sdk/core/util/Clock;Lorg/dexpace/sdk/core/instrumentation/ClientLogger;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + protected fun authorizeRequestAsync (Lorg/dexpace/sdk/core/http/request/Request;)Ljava/util/concurrent/CompletableFuture; + protected fun authorizeRequestOnChallengeAsync (Lorg/dexpace/sdk/core/http/request/Request;Lorg/dexpace/sdk/core/http/response/Response;)Ljava/util/concurrent/CompletableFuture; + protected fun bearerHeaderValue (Ljava/lang/String;)Ljava/lang/String; +} + +public abstract class org/dexpace/sdk/core/http/pipeline/steps/AsyncRetryStep : org/dexpace/sdk/core/http/pipeline/AsyncHttpStep { + public fun ()V + public final fun getStage ()Lorg/dexpace/sdk/core/http/pipeline/Stage; +} + public abstract class org/dexpace/sdk/core/http/pipeline/steps/AuthStep : org/dexpace/sdk/core/http/pipeline/HttpStep { public fun ()V protected abstract fun authorizeRequest (Lorg/dexpace/sdk/core/http/request/Request;)Lorg/dexpace/sdk/core/http/request/Request; @@ -775,6 +803,19 @@ public final class org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncInstrume public fun processAsync (Lorg/dexpace/sdk/core/http/request/Request;Lorg/dexpace/sdk/core/http/pipeline/AsyncPipelineNext;)Ljava/util/concurrent/CompletableFuture; } +public class org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep : org/dexpace/sdk/core/http/pipeline/steps/AsyncRetryStep { + public static final field Companion Lorg/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep$Companion; + public fun (Ljava/util/concurrent/ScheduledExecutorService;)V + public fun (Ljava/util/concurrent/ScheduledExecutorService;Lorg/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions;)V + public fun (Ljava/util/concurrent/ScheduledExecutorService;Lorg/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions;Lorg/dexpace/sdk/core/util/Clock;)V + public fun (Ljava/util/concurrent/ScheduledExecutorService;Lorg/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions;Lorg/dexpace/sdk/core/util/Clock;Lorg/dexpace/sdk/core/instrumentation/ClientLogger;)V + public synthetic fun (Ljava/util/concurrent/ScheduledExecutorService;Lorg/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions;Lorg/dexpace/sdk/core/util/Clock;Lorg/dexpace/sdk/core/instrumentation/ClientLogger;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun processAsync (Lorg/dexpace/sdk/core/http/request/Request;Lorg/dexpace/sdk/core/http/pipeline/AsyncPipelineNext;)Ljava/util/concurrent/CompletableFuture; +} + +public final class org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep$Companion { +} + public final class org/dexpace/sdk/core/http/pipeline/steps/DefaultInstrumentationStep : org/dexpace/sdk/core/http/pipeline/steps/InstrumentationStep { public fun ()V public fun (Lorg/dexpace/sdk/core/http/pipeline/steps/HttpInstrumentationOptions;)V diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/auth/BearerTokenProvider.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/auth/BearerTokenProvider.kt index 2ba71503..830d518c 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/auth/BearerTokenProvider.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/auth/BearerTokenProvider.kt @@ -7,6 +7,8 @@ package org.dexpace.sdk.core.http.auth +import java.util.concurrent.CompletableFuture + /** * Source of fresh [BearerToken]s for [org.dexpace.sdk.core.http.pipeline.steps.BearerTokenAuthStep]. * @@ -44,4 +46,45 @@ public fun interface BearerTokenProvider { /** Convenience for callers without extra params; forwards to [fetch] with an empty map. */ public fun fetch(scopes: List): BearerToken = fetch(scopes, emptyMap()) + + /** + * Asynchronous counterpart of [fetch], used by + * [org.dexpace.sdk.core.http.pipeline.steps.AsyncBearerTokenAuthStep] so a token refresh + * never blocks the request-dispatching thread. + * + * The default implementation invokes the blocking [fetch] **on the calling thread** and + * wraps the outcome into an already-completed [CompletableFuture] (completing exceptionally + * if [fetch] throws). That default is correct but not non-blocking: a provider that talks to + * a remote token endpoint should override this method to dispatch the fetch off-thread — + * e.g. submit to an [java.util.concurrent.Executor], or call an async OAuth client — so the + * returned future completes without parking the caller. + * + * Per-cloud providers (GCP / Azure / Kubernetes workload identity) and OAuth + * token-exchange flows belong in adapter modules, not in `sdk-core`; this seam is what they + * override. + * + * @param scopes OAuth scopes to request; service-specific. + * @param params extra parameters to pass through to the token endpoint. + * @return a future that completes with a fresh [BearerToken], or completes exceptionally + * with whatever [fetch] threw. + */ + public fun fetchAsync( + scopes: List, + params: Map, + ): CompletableFuture = + try { + CompletableFuture.completedFuture(fetch(scopes, params)) + } catch (t: Throwable) { + // A provider's blocking fetch may throw any Throwable. Surface it through the + // future rather than synchronously so async callers observe a uniform error model. + // Error subclasses (OOM, StackOverflow) are intentionally NOT special-cased here: + // the default just mirrors fetch()'s outcome into the future, and an Error in a + // user-supplied lambda is still that lambda's failure, not a JVM-fatal one for us. + val failed = CompletableFuture() + failed.completeExceptionally(t) + failed + } + + /** Convenience for callers without extra params; forwards to [fetchAsync] with an empty map. */ + public fun fetchAsync(scopes: List): CompletableFuture = fetchAsync(scopes, emptyMap()) } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncAuthStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncAuthStep.kt new file mode 100644 index 00000000..4534e4e5 --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncAuthStep.kt @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.pipeline.steps + +import org.dexpace.sdk.core.http.common.HttpHeaderName +import org.dexpace.sdk.core.http.pipeline.AsyncHttpStep +import org.dexpace.sdk.core.http.pipeline.AsyncPipelineNext +import org.dexpace.sdk.core.http.pipeline.Stage +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.http.response.Response +import org.dexpace.sdk.core.util.Futures +import java.util.concurrent.CompletableFuture + +/** + * Async pillar step at [Stage.AUTH] — the [AsyncHttpStep] counterpart of [AuthStep]. Stamps + * credentials onto outgoing requests via an async [authorizeRequestAsync] (so a token fetch / + * refresh never blocks the dispatching thread) and exposes the same 401 + `WWW-Authenticate` + * challenge hook. + * + * The stamping and challenge semantics mirror [AuthStep] exactly: + * + * - **HTTPS-only.** On the path that attaches a credential, [processAsync] rejects non-HTTPS + * schemes before any token fetch. The guard is skipped on the marker-suppressed cross-origin + * re-issue path, where no credential is attached. + * - **Cross-origin redirects.** A re-issue marked by the redirect step (see + * [CrossOriginRedirectMarker]) is forwarded credential-free; the marker is stripped before + * the request reaches the wire. + * - **Challenge retry.** On a 401 carrying `WWW-Authenticate`, [authorizeRequestOnChallengeAsync] + * is consulted; a non-null replacement is driven through the chain exactly once (no further + * challenge handling). The default returns a future of `null` (no retry). + * + * Unlike the synchronous [AuthStep] the credential-attaching guard checks and the downstream + * dispatches are composed on [CompletableFuture]s so the whole flow stays non-blocking. + * + * ## Thread-safety + * + * The stage is locked at the type level via `final override`. Concrete subclasses must be safe + * for concurrent invocation — see [AsyncBearerTokenAuthStep] (single-flight token refresh). + */ +public abstract class AsyncAuthStep : AsyncHttpStep { + final override val stage: Stage = Stage.AUTH + + final override fun processAsync( + request: Request, + next: AsyncPipelineNext, + ): CompletableFuture { + val authorizedFuture: CompletableFuture = + if (CrossOriginRedirectMarker.isMarked(request)) { + // Cross-origin redirect re-issue: strip the marker, attach no credential. + CompletableFuture.completedFuture( + request.newBuilder() + .headers(CrossOriginRedirectMarker.strip(request.headers)) + .build(), + ) + } else { + val scheme = request.url.protocol + if (!"https".equals(scheme, ignoreCase = true)) { + Futures.failed( + IllegalStateException( + "${this::class.simpleName} requires HTTPS to prevent credential leak " + + "(URL scheme: $scheme)", + ), + ) + } else { + authorizeRequestAsync(request) + } + } + + return authorizedFuture.thenCompose { authorized -> + next.copy().processAsync(authorized).thenCompose { response -> + handleChallenge(authorized, response, next) + } + } + } + + /** + * After the first downstream attempt, applies the 401 + `WWW-Authenticate` challenge hook. + * Returns the response unchanged unless [authorizeRequestOnChallengeAsync] yields a non-null + * replacement, in which case the original 401 is closed and the replacement is driven once. + */ + private fun handleChallenge( + authorized: Request, + response: Response, + next: AsyncPipelineNext, + ): CompletableFuture { + if (response.status.code != SC_UNAUTHORIZED) return CompletableFuture.completedFuture(response) + response.headers.get(HttpHeaderName.WWW_AUTHENTICATE) + ?: return CompletableFuture.completedFuture(response) + + val challengeFuture: CompletableFuture = + try { + authorizeRequestOnChallengeAsync(authorized, response) + } catch (t: Throwable) { + // A sync throw from the hook (caller-bug case) must still close the 401 body. + response.close() + return Futures.failed(t) + } + + return challengeFuture.handle { retryRequest, hookError -> + HookOutcome(retryRequest, hookError) + }.thenCompose { outcome -> + val hookError = outcome.error + if (hookError != null) { + response.close() + return@thenCompose Futures.failed(Futures.unwrap(hookError)) + } + val retryRequest = outcome.request ?: return@thenCompose CompletableFuture.completedFuture(response) + response.close() + next.copy().processAsync(retryRequest) + } + } + + /** Carrier so the challenge future's outcome (value or error) survives [CompletableFuture.handle]. */ + private class HookOutcome(val request: Request?, val error: Throwable?) + + /** + * Returns a future of [request] with the credential's auth header attached. Subclasses + * implement the concrete async stamping (e.g. fetch-or-refresh a bearer token off-thread, + * then stamp `Authorization: Bearer `). + * + * Called once per request before the downstream chain is invoked. + */ + protected abstract fun authorizeRequestAsync(request: Request): CompletableFuture + + /** + * Hook invoked on a 401 response that carries a `WWW-Authenticate` header. The default + * returns a future of `null` — surface the 401 with no retry. + * + * Subclasses override to refresh tokens or step up auth. A non-null [Request] in the + * returned future triggers a single retry through the downstream chain; the original 401 is + * closed first. + * + * @param request the request already stamped with the credential that produced the 401. + * @param response the 401 response; its body is still open at this point. + */ + protected open fun authorizeRequestOnChallengeAsync( + request: Request, + response: Response, + ): CompletableFuture = CompletableFuture.completedFuture(null) + + private companion object { + // HTTP 401 — the only status code AsyncAuthStep responds to. + private const val SC_UNAUTHORIZED = 401 + } +} diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt new file mode 100644 index 00000000..d877e177 --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt @@ -0,0 +1,276 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.pipeline.steps + +import org.dexpace.sdk.core.http.auth.AuthChallengeParser +import org.dexpace.sdk.core.http.auth.BearerToken +import org.dexpace.sdk.core.http.auth.BearerTokenProvider +import org.dexpace.sdk.core.http.common.HttpHeaderName +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.http.response.Response +import org.dexpace.sdk.core.instrumentation.ClientLogger +import org.dexpace.sdk.core.util.Clock +import java.time.Duration +import java.util.concurrent.CompletableFuture +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + +/** + * [AsyncAuthStep] that stamps `Authorization: Bearer ` on outgoing requests, fetching the + * token from [provider] via [BearerTokenProvider.fetchAsync] so a refresh never blocks the + * dispatching thread. The async counterpart of [BearerTokenAuthStep]. + * + * ## Background refresh + * + * The cached token is held in a `@Volatile` field; the hot-path read is wait-free. The expiry + * decision has three zones, keyed off [refreshMargin]: + * + * - **Fresh** — `now + margin < expiresAt`: stamp the cached token, no refresh. + * - **Expiring** — `expiresAt - margin <= now < expiresAt`: the token is still *valid*, so it + * is returned **immediately** and stamped, while a refresh is kicked off **off-thread** in the + * background. The in-flight request never waits on the token endpoint; the next request picks + * up the refreshed token once it lands. This is the behaviour issue #32 asks for: a + * valid-but-expiring token returned without blocking. + * - **Expired / missing** — `now >= expiresAt`, or no cached token: there is no usable + * credential, so the request **must** await a fresh fetch before it can be stamped. + * + * ## Single-flight + * + * Concurrent requests that all observe an expiring or missing token share **one** in-flight + * [provider] call rather than each hitting the token endpoint (no stampede). The in-flight future + * is published under a [ReentrantLock]; whoever wins the lock starts the fetch, everyone else + * joins the same future. On completion the in-flight slot is cleared so a later refresh starts a + * new fetch. A failed fetch is **not** cached — the next request retries. + * + * ## Eviction on 401 + * + * Like [BearerTokenAuthStep], a 401 + `WWW-Authenticate: Bearer` evicts the rejected token and + * re-stamps the single [AsyncAuthStep] retry with a freshly fetched one. Eviction is scoped to + * the exact token that produced the 401; a token a concurrent request already refreshed is left + * in place. A 401 with no bearer challenge (or none at all), or a request that reached the hook + * credential-free (cross-origin suppression), surfaces unchanged. + * + * ## Errors from [provider] + * + * - Future completes exceptionally → propagated; not cached, so a later request retries. + * - Future completes with `null` → surfaced as [IllegalStateException]. + * - Future completes with an already-expired token → surfaced as [IllegalStateException]. + * + * A background (expiring-zone) refresh that fails or returns an unusable token does **not** fail + * the in-flight request — the still-valid cached token was already stamped — it only logs and + * leaves the cache for the next request to refresh. + * + * ## Open for subclassing + * + * Override [bearerHeaderValue] to change the header format (and keep eviction matching), or + * [authorizeRequestOnChallengeAsync] to customise challenge handling. + */ +public open class AsyncBearerTokenAuthStep + @JvmOverloads + constructor( + private val provider: BearerTokenProvider, + private val scopes: List, + private val refreshMargin: Duration = Duration.ofSeconds(DEFAULT_REFRESH_MARGIN_SECONDS), + private val clock: Clock = Clock.SYSTEM, + private val logger: ClientLogger = ClientLogger(AsyncBearerTokenAuthStep::class), + ) : AsyncAuthStep() { + private val lock = ReentrantLock() + + @Volatile + private var cachedToken: BearerToken? = null + + // The single shared in-flight fetch, or null when none is running. Published / cleared + // under [lock] so concurrent expiring/missing requests coalesce onto one provider call. + @Volatile + private var inFlight: CompletableFuture? = null + + override fun authorizeRequestAsync(request: Request): CompletableFuture = + currentToken().thenApply { token -> stamp(request, token) } + + override fun authorizeRequestOnChallengeAsync( + request: Request, + response: Response, + ): CompletableFuture { + // No credential on the rejected request → stamping was suppressed (cross-origin + // redirect). Surface the 401 unchanged. + val rejectedHeader = + request.headers.get(HttpHeaderName.AUTHORIZATION) + ?: return CompletableFuture.completedFuture(null) + // A token refresh can only satisfy a Bearer challenge. + if (!offersBearerChallenge(response)) return CompletableFuture.completedFuture(null) + evictRejectedToken(rejectedHeader) + // forceFresh: the rejected token was just evicted; await a genuinely fresh fetch + // before re-stamping so the retry never carries the same rejected credential. + return forceFreshToken().thenApply { token -> stamp(request, token) as Request? } + } + + private fun stamp( + request: Request, + token: BearerToken, + ): Request = + request.newBuilder() + .setHeader(HttpHeaderName.AUTHORIZATION.caseSensitiveName, bearerHeaderValue(token.token)) + .build() + + /** + * Resolves the token to stamp for the current request, applying the three-zone expiry + * policy (fresh / expiring / expired). Never blocks: the returned future completes + * immediately when a usable cached token exists (even if a background refresh is also + * kicked off), and otherwise completes when the single-flight fetch lands. + */ + private fun currentToken(): CompletableFuture { + val now = clock.now() + val cached = cachedToken + if (cached != null && !cached.isExpiredAt(now)) { + // Token is still valid. If it is inside the refresh margin, return it now and + // refresh in the background; otherwise just return it. + if (cached.isExpiredAt(now, refreshMargin)) { + startBackgroundRefresh() + } + return CompletableFuture.completedFuture(cached) + } + // Missing or hard-expired: must await a fresh fetch. + return forceFreshToken() + } + + /** + * Kicks off a single-flight refresh whose result the caller does NOT await. Used on the + * expiring-but-valid path: the still-valid cached token is what the in-flight request + * stamps; this just warms the cache for the next request. + */ + private fun startBackgroundRefresh() { + // Reuse the single-flight machinery; ignore the returned future (fire-and-forget). + // exceptionally/handle keeps an unhandled background failure from surfacing as an + // uncaught CompletableFuture completion. + sharedFetch().whenComplete { _, error -> + if (error != null) { + logger.atWarning() + .event("http.auth.background_refresh_failed") + .field("error.type", error::class.java.simpleName ?: "Throwable") + .cause(error) + .log() + } + } + } + + /** + * Returns a future that completes with a usable token, awaiting the single-flight fetch. + * Used when there is no usable cached token (missing / hard-expired) and on the + * post-eviction challenge path. + */ + private fun forceFreshToken(): CompletableFuture = sharedFetch() + + /** + * Single-flight fetch coordinator. The first caller to find no in-flight fetch starts one + * (publishing it under [lock]); concurrent callers join the same future. The in-flight + * slot is cleared on completion so a subsequent refresh starts fresh. A re-check of the + * cache inside the lock means a token another thread just refreshed short-circuits the + * fetch. + */ + private fun sharedFetch(): CompletableFuture { + lock.withLock { + // Re-read inside the lock: another thread may have just refreshed. + val now = clock.now() + cachedToken?.takeIf { !it.isExpiredAt(now, refreshMargin) } + ?.let { return CompletableFuture.completedFuture(it) } + inFlight?.let { return it } + val fetch = launchFetch() + inFlight = fetch + // Attach cache bookkeeping AFTER publishing, so the clear-on-complete callback + // compares against the exact future stored in `inFlight`. Attaching here (rather + // than inside launchFetch) sidesteps the self-reference an inline-completed future + // would otherwise need. + fetch.whenComplete { token, error -> + lock.withLock { + if (inFlight === fetch) inFlight = null + if (error == null && token != null) cachedToken = token + } + } + return fetch + } + } + + /** + * Starts the provider fetch and applies validation (null token, already-expired token) so + * a misbehaving provider surfaces as [IllegalStateException] to the awaiting caller. Cache + * population and in-flight clearing are attached by [sharedFetch] against the published + * future. + */ + private fun launchFetch(): CompletableFuture { + val raw: CompletableFuture = + try { + fetchAsyncSafe() + } catch (t: Throwable) { + // A provider whose fetchAsync throws synchronously (caller-bug) — normalise. + val failed = CompletableFuture() + failed.completeExceptionally(t) + failed + } + return raw.thenApply { token -> validateFresh(token) } + } + + @Suppress( + "UNCHECKED_CAST", + "RedundantNullableReturnType", + ) + private fun fetchAsyncSafe(): CompletableFuture { + val future: CompletableFuture? = + provider.fetchAsync(scopes) as CompletableFuture? + return future ?: error("BearerTokenProvider.fetchAsync returned null") + } + + /** + * Validates a freshly fetched token: rejects a `null` (Kotlin intrinsics usually catch + * this earlier for Kotlin SAMs, but a platform-disabled-intrinsics context may not) and a + * token already expired at fetch time (no margin applied — a provider minting an + * effectively-expired token is misbehaving). + */ + @Suppress("UNCHECKED_CAST", "RedundantNullableReturnType") + private fun validateFresh(token: BearerToken): BearerToken { + val nonNull: BearerToken = (token as BearerToken?) ?: error("BearerTokenProvider returned null") + check(!nonNull.isExpiredAt(clock.now())) { + "BearerTokenProvider returned an already-expired token" + } + return nonNull + } + + /** + * Returns `true` when [response]'s `WWW-Authenticate` header advertises a `Bearer` + * challenge. + */ + private fun offersBearerChallenge(response: Response): Boolean { + val header = response.headers.get(HttpHeaderName.WWW_AUTHENTICATE) ?: return false + return AuthChallengeParser.parse(header).any { it.scheme == BEARER_SCHEME } + } + + /** + * Clears [cachedToken] iff it is still the token whose stamped header is [rejectedHeader]. + * Guarded by the same [lock] as the fetch path so the read-compare-clear is atomic against + * a concurrent refresh. + */ + private fun evictRejectedToken(rejectedHeader: String) { + lock.withLock { + val current = cachedToken ?: return + if (bearerHeaderValue(current.token) == rejectedHeader) { + cachedToken = null + } + } + } + + /** + * The `Authorization` header value for [token]. Single source of truth shared by the + * stamping path and [evictRejectedToken]. A subclass that emits a different header format + * must override this too, or eviction stops matching. + */ + protected open fun bearerHeaderValue(token: String): String = "Bearer $token" + + private companion object { + private const val DEFAULT_REFRESH_MARGIN_SECONDS = 30L + private const val BEARER_SCHEME = "bearer" + } + } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncRetryStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncRetryStep.kt new file mode 100644 index 00000000..ad2ec8e1 --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncRetryStep.kt @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.pipeline.steps + +import org.dexpace.sdk.core.http.pipeline.AsyncHttpStep +import org.dexpace.sdk.core.http.pipeline.Stage + +/** + * Async pillar step at [Stage.RETRY] — the [AsyncHttpStep] counterpart of [RetryStep]. Drives + * an async retry loop with the same classification policy, backoff schedule, and + * `Retry-After` pacing as the synchronous stack, but schedules its delays on a + * [java.util.concurrent.ScheduledExecutorService] instead of blocking a thread. + * + * The base is `abstract` because the stage is locked to [Stage.RETRY] at the type level — + * users implementing custom async-retry behaviour override + * [AsyncHttpStep.processAsync] but inherit the pillar slot. The shipped concrete + * implementation is [DefaultAsyncRetryStep]. + */ +public abstract class AsyncRetryStep : AsyncHttpStep { + final override val stage: Stage = Stage.RETRY +} diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt new file mode 100644 index 00000000..ebeece9f --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt @@ -0,0 +1,398 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.pipeline.steps + +import org.dexpace.sdk.core.http.common.HttpHeaderName +import org.dexpace.sdk.core.http.pipeline.AsyncPipelineNext +import org.dexpace.sdk.core.http.request.Method +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.http.response.Response +import org.dexpace.sdk.core.instrumentation.ClientLogger +import org.dexpace.sdk.core.pipeline.step.retry.BackoffCalculator +import org.dexpace.sdk.core.pipeline.step.retry.RetryAfterParser +import org.dexpace.sdk.core.pipeline.step.retry.RetrySettings +import org.dexpace.sdk.core.util.Clock +import org.dexpace.sdk.core.util.Futures +import java.io.IOException +import java.time.Duration +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ScheduledExecutorService + +/** + * Default [AsyncRetryStep] — the async mirror of [DefaultRetryStep]. Re-invokes the downstream + * async chain on classified failures with exponential / fixed backoff, server-supplied + * [HttpHeaderName.RETRY_AFTER] pacing, and idempotency-aware re-sendability gating, sharing the + * exact same policy as the synchronous stack: the same [HttpRetryOptions], the same + * [BackoffCalculator], the same [RetryAfterParser], the same `Retry-After` header set, and the + * same idempotent-method allow-list. + * + * ## Non-blocking delays + * + * Backoff delays are scheduled on a [ScheduledExecutorService] via [Futures.delay] — no + * `Thread.sleep`, no `Timer`. While a delay is pending the dispatching thread is free; the loop + * re-arms when the scheduled future fires. + * + * ## Loop shape — stack-safe, no `thenCompose` recursion + * + * The retry loop is driven iteratively by [drive], not by chaining `thenCompose` per attempt. + * Each downstream attempt registers a single [CompletableFuture.whenComplete] callback. When the + * outcome warrants another attempt, the callback hands control back to [drive] through a + * trampoline ([RetryDriver.continuation]) instead of calling [drive] recursively, so a retry + * sequence of length N never builds an N-deep stack frame chain or an N-deep future + * continuation graph. This mirrors the iterative `while` loop of [DefaultRetryStep] while staying + * fully async. + * + * ## Re-sendability gating + * + * Identical to [DefaultRetryStep]: + * - **No body** — retried only when the method is idempotent ([IDEMPOTENT_METHODS]); a bare + * non-idempotent `POST` is not retried even though there is nothing to re-send. + * - **Has a body** — retried only when [org.dexpace.sdk.core.http.request.RequestBody.isReplayable]. + * + * When the request is not re-sendable the loop runs exactly one attempt and completes with the + * response (or the failure) as-is. + * + * ## Failure handling + * + * Downstream failures surface as exceptionally-completed futures; the loop unwraps the + * [java.util.concurrent.CompletionException] wrapper via [Futures.unwrap] before classifying. + * Only [Exception] subclasses are classified — an [Error] (OOM, StackOverflow) completes the + * call exceptionally without retry. On terminal failure every prior attempt's exception is + * attached to the surfaced exception via [Throwable.addSuppressed]. + * + * ## Thread-safety + * + * Stateless after construction (the per-call [RetryDriver] holds all mutable loop state). The + * immutable [options] / [clock] / [scheduler] and the [ClientLogger] are shared across + * concurrent calls. + */ +public open class DefaultAsyncRetryStep + @JvmOverloads + constructor( + private val scheduler: ScheduledExecutorService, + options: HttpRetryOptions = HttpRetryOptions(), + private val clock: Clock = Clock.SYSTEM, + internal val logger: ClientLogger = ClientLogger(DefaultAsyncRetryStep::class), + ) : AsyncRetryStep() { + /** Effective options. `maxRetries < 0` is clamped to [DefaultRetryStep.DEFAULT_MAX_RETRIES]. */ + private val options: HttpRetryOptions = clampOptions(options) + + /** + * The [options]' exponential parameters as a [RetrySettings] view so the shared + * [BackoffCalculator] computes this stack's schedule — built once, exactly as + * [DefaultRetryStep.backoffSettings]. `totalTimeout = ZERO` disables the deadline cap. + * Building it eagerly validates the delay magnitudes at construction. + */ + private val backoffSettings: RetrySettings = + RetrySettings.builder() + .initialDelay(this.options.baseDelay) + .maxDelay(this.options.maxDelay) + .delayMultiplier(RetrySettings.DEFAULT_DELAY_MULTIPLIER) + .jitter(RetrySettings.DEFAULT_JITTER) + .totalTimeout(Duration.ZERO) + .build() + + override fun processAsync( + request: Request, + next: AsyncPipelineNext, + ): CompletableFuture { + val result = CompletableFuture() + val driver = RetryDriver(next, isRetrySafe(request), result) + driver.drive() + return result + } + + /** + * Per-call mutable loop state plus the trampolining driver. One instance per + * [processAsync] call; never shared across calls or threads (each call's continuations + * run sequentially on the scheduler / completing thread, never concurrently with + * themselves). + */ + private inner class RetryDriver( + private val next: AsyncPipelineNext, + private val retrySafe: Boolean, + private val result: CompletableFuture, + ) { + private var tryCount = 0 + private val sequenceStartNanos = clock.monotonic() + + // Lazily allocated on first failure so the success path never pays for the list. + private var suppressed: MutableList? = null + + // Trampoline state. `pumping` is true while the synchronous pump loop in drive() is + // active; `rearm` records that another attempt should run. A re-arm that happens + // while the pump is active (an inline / zero-delay retry) just sets `rearm` and lets + // the loop pick it up — it never recurses into a new drive() frame. A re-arm that + // happens after the pump has exited (the common async case, fired from a scheduler + // or downstream-completion thread) starts a fresh, shallow pump. Both paths run + // sequentially per call — drive() is only ever entered by one thread at a time + // because each attempt's continuation fires exactly once and the previous pump has + // returned before the async callback runs. + private var pumping: Boolean = false + private var rearm: Boolean = false + + /** + * Entry point and trampoline. Marks that an attempt should run ([rearm]); if a pump + * loop is already active it returns immediately (the active loop will pick up the + * re-arm), otherwise it runs the loop. The loop keeps starting attempts as long as + * inline completions keep setting [rearm], so a burst of zero-delay retries unwinds + * iteratively instead of recursing. + */ + fun drive() { + rearm = true + if (pumping) return + pumping = true + try { + while (rearm) { + rearm = false + startAttempt() + } + } finally { + pumping = false + } + } + + /** + * Launches one downstream attempt and registers its completion handler. When the + * attempt (and its retry decision + zero-length delay) complete inline, the handler + * runs synchronously and re-enters [drive] — which, because [pumping] is still true, + * merely sets [rearm] for the active loop. When the attempt completes later, the + * handler runs on the completing thread and starts a fresh pump. + */ + private fun startAttempt() { + val attempt: CompletableFuture = + try { + next.copy().processAsync() + } catch (e: Exception) { + // The async chain contract permits sync exceptions only for caller-bug + // cases; normalise to a failed future so classification is uniform. + Futures.failed(e) + } + attempt.whenComplete { response, error -> handleOutcome(response, error) } + } + + /** Routes a completed attempt to the success or failure handler. */ + private fun handleOutcome( + response: Response?, + error: Throwable?, + ) { + if (error == null) { + onSuccess(response!!) + } else { + onFailure(error) + } + } + + private fun onSuccess(response: Response) { + val retry = + retrySafe && + tryCount < options.maxRetries && + shouldRetryResponse(response) + if (!retry) { + result.complete(response) + return + } + val delay = computeResponseDelay(response, tryCount) + logRetry(tryCount, delay, response.status.code, cause = null) + closeQuietly(response) + tryCount++ + scheduleNext(delay) + } + + private fun onFailure(rawError: Throwable) { + val error = Futures.unwrap(rawError) + // Errors (OOM, StackOverflow, …) are unrecoverable — never retry, never log. + if (error is Error) { + result.completeExceptionally(error) + return + } + val exception = error as Exception + val retry = + retrySafe && + tryCount < options.maxRetries && + shouldRetryException(exception) + if (!retry) { + suppressed?.forEach(exception::addSuppressed) + result.completeExceptionally(exception) + return + } + val accumulator = suppressed ?: ArrayList().also { suppressed = it } + val delay = computeExceptionDelay(exception, tryCount) + logRetry(tryCount, delay, statusCode = -1, cause = exception) + // Record the current failure BEFORE scheduling so it is attached to any later + // terminal exception's suppressed list rather than being silently dropped. + accumulator.add(exception) + tryCount++ + scheduleNext(delay) + } + + /** + * Schedules the next attempt after [delay]. [Futures.delay] returns an + * already-complete future for a zero delay, so the [CompletableFuture.whenComplete] + * callback runs inline and re-enters [drive] while the pump loop is still active — a + * synchronous re-arm with no extra stack frame. For a positive delay the scheduled + * future fires later on the scheduler thread and starts a fresh pump. + */ + private fun scheduleNext(delay: Duration) { + val safeDelay = if (delay.isNegative) Duration.ZERO else delay + Futures.delay(scheduler, safeDelay).whenComplete { _, scheduleError -> + if (scheduleError != null) { + // The scheduler rejected or failed the delay task — surface it with any + // accumulated prior failures attached. + suppressed?.forEach(scheduleError::addSuppressed) + result.completeExceptionally(scheduleError) + } else { + drive() + } + } + } + + // --------------- Classification --------------- + + private fun shouldRetryResponse(response: Response): Boolean { + val condition = HttpRetryCondition(response, null, tryCount, (suppressed ?: emptyList())) + return invokeShouldRetry(options.shouldRetryCondition, condition) + } + + private fun shouldRetryException(exception: Exception): Boolean { + val condition = HttpRetryCondition(null, exception, tryCount, (suppressed ?: emptyList())) + return invokeShouldRetry(options.shouldRetryException, condition) + } + + // --------------- Delay computation --------------- + + private fun computeResponseDelay( + response: Response, + tryCount: Int, + ): Duration { + val condition = HttpRetryCondition(response, null, tryCount, (suppressed ?: emptyList())) + invokeDelayFromCondition(condition)?.let { return it } + retryAfterFromHeaders(response)?.let { return it } + return backoffOrFixed(tryCount) + } + + private fun computeExceptionDelay( + exception: Exception, + tryCount: Int, + ): Duration { + val condition = HttpRetryCondition(null, exception, tryCount, (suppressed ?: emptyList())) + invokeDelayFromCondition(condition)?.let { return it } + return backoffOrFixed(tryCount) + } + + // --------------- Logging --------------- + + private fun logRetry( + tryCount: Int, + delay: Duration, + statusCode: Int, + cause: Throwable?, + ) { + val event = + logger.atInfo() + .event("http.retry") + .field("http.retry.try_count", tryCount.toLong()) + .field("http.retry.delay_ms", delay.toMillis()) + .field("retry.total_elapsed_ms", (clock.monotonic() - sequenceStartNanos) / NANOS_PER_MILLI) + if (statusCode > 0) { + event.field("http.response.status_code", statusCode.toLong()) + } + if (cause != null) { + event.field("error.type", cause::class.java.simpleName ?: "Throwable") + .field("retry.cause_class", cause::class.simpleName ?: "Throwable") + .cause(cause) + } + event.log() + } + } + + // --------------- Shared helpers (stateless across calls) --------------- + + private fun isRetrySafe(request: Request): Boolean { + val body = request.body ?: return request.method in IDEMPOTENT_METHODS + return body.isReplayable() + } + + private fun invokeShouldRetry( + predicate: HttpRetryConditionPredicate, + condition: HttpRetryCondition, + ): Boolean = + try { + predicate.shouldRetry(condition) + } catch (t: Throwable) { + @Suppress("InstanceOfCheckForException") + if (t is Error) throw t + throw IllegalStateException("shouldRetry predicate threw", t) + } + + private fun invokeDelayFromCondition(condition: HttpRetryCondition): Duration? = + try { + options.delayFromCondition.delayFor(condition) + } catch (t: Throwable) { + @Suppress("InstanceOfCheckForException") + if (t is Error) throw t + logger.atWarning() + .event("http.retry.delay_override_failed") + .field("error.type", t::class.java.simpleName ?: "Throwable") + .cause(t) + .log() + null + } + + private fun backoffOrFixed(tryCount: Int): Duration = + options.fixedDelay ?: BackoffCalculator.computeDelay(tryCount + 1, backoffSettings) + + private fun retryAfterFromHeaders(response: Response): Duration? { + val now = clock.now() + for (name in options.retryAfterHeaders) { + val raw = response.headers.get(name) ?: continue + RetryAfterParser.parseHeaderValue(name, raw, now)?.let { return it } + } + return null + } + + private fun closeQuietly(response: Response) { + try { + response.close() + } catch (closeErr: IOException) { + logger.atVerbose() + .event("http.retry.close_failed") + .field("error.type", closeErr::class.java.simpleName ?: "IOException") + .log() + } + } + + private fun clampOptions(opts: HttpRetryOptions): HttpRetryOptions { + if (opts.maxRetries >= 0) return opts + logger.atVerbose() + .event("http.retry.maxRetries_clamped") + .field("http.retry.max_retries.requested", opts.maxRetries.toLong()) + .field("http.retry.max_retries.applied", DefaultRetryStep.DEFAULT_MAX_RETRIES.toLong()) + .log() + return HttpRetryOptions( + maxRetries = DefaultRetryStep.DEFAULT_MAX_RETRIES, + baseDelay = opts.baseDelay, + maxDelay = opts.maxDelay, + fixedDelay = opts.fixedDelay, + retryAfterHeaders = opts.retryAfterHeaders, + shouldRetryCondition = opts.shouldRetryCondition, + shouldRetryException = opts.shouldRetryException, + delayFromCondition = opts.delayFromCondition, + ) + } + + public companion object { + // Nanoseconds in one millisecond — converts monotonic deltas to ms for log events. + private const val NANOS_PER_MILLI = 1_000_000L + + // Methods safe to re-send regardless of body replayability (idempotent per RFC 9110). + // Mirrors DefaultRetryStep.IDEMPOTENT_METHODS / RetrySettings.DEFAULT_RETRYABLE_METHODS. + private val IDEMPOTENT_METHODS: Set = + setOf(Method.GET, Method.HEAD, Method.OPTIONS, Method.PUT, Method.DELETE) + } + } diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStepTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStepTest.kt new file mode 100644 index 00000000..d3ad74bd --- /dev/null +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStepTest.kt @@ -0,0 +1,329 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.pipeline.steps + +import org.dexpace.sdk.core.client.AsyncHttpClient +import org.dexpace.sdk.core.http.auth.BearerToken +import org.dexpace.sdk.core.http.auth.BearerTokenProvider +import org.dexpace.sdk.core.http.common.Headers +import org.dexpace.sdk.core.http.common.HttpHeaderName +import org.dexpace.sdk.core.http.common.Protocol +import org.dexpace.sdk.core.http.pipeline.AsyncHttpPipelineBuilder +import org.dexpace.sdk.core.http.request.Method +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.http.response.Response +import org.dexpace.sdk.core.http.response.Status +import org.dexpace.sdk.core.io.Io +import org.dexpace.sdk.core.testing.FixedClock +import org.dexpace.sdk.core.util.Futures +import org.dexpace.sdk.io.OkioIoProvider +import java.time.Duration +import java.time.Instant +import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFails +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class AsyncBearerTokenAuthStepTest { + private val now: Instant = Instant.parse("2026-01-01T12:00:00Z") + private val clock = FixedClock(now) + + @BeforeTest + fun setUp() { + Io.installProvider(OkioIoProvider) + } + + // ----------------- Basic stamping ----------------- + + @Test + fun `stamps a fresh token without blocking`() { + val provider = CountingProvider { BearerToken("tok", now.plusSeconds(3600)) } + val client = RecordingClient(200) + val future = pipeline(provider, client).sendAsync(getRequest()) + val response = future.join() + assertEquals(200, response.status.code) + assertEquals("Bearer tok", client.lastAuth) + assertEquals(1, provider.fetchCount) + } + + @Test + fun `caches the token across requests - one fetch for two sends`() { + val provider = CountingProvider { BearerToken("tok", now.plusSeconds(3600)) } + val client = RecordingClient(200) + val p = pipeline(provider, client) + p.sendAsync(getRequest()).join() + p.sendAsync(getRequest()).join() + assertEquals(1, provider.fetchCount) + } + + // ----------------- Background refresh of an expiring token ----------------- + + @Test + fun `valid-but-expiring token is returned immediately and refreshed in the background`() { + // Token expires in 10s; refresh margin is 30s → inside the margin but still valid. + val deferred = CompletableFuture() + // Seed the cache with the expiring token by handing it back on the first (awaited) fetch. + val seeded = CompletableFuture.completedFuture(BearerToken("old", now.plusSeconds(10))) + val seedingProvider = + object : BearerTokenProvider { + val fetches = AtomicInteger(0) + val gate = AtomicInteger(0) + + override fun fetch( + scopes: List, + params: Map, + ): BearerToken = error("blocking fetch must not be called") + + override fun fetchAsync( + scopes: List, + params: Map, + ): CompletableFuture { + fetches.incrementAndGet() + return if (gate.getAndIncrement() == 0) seeded else deferred + } + } + val client = RecordingClient(200) + val p = pipeline(seedingProvider, client) + + // First request: no cache → awaits the seeding fetch → stamps "old". + p.sendAsync(getRequest()).join() + assertEquals("Bearer old", client.lastAuth) + + // Second request: cached "old" is valid (expires in 10s) but inside the 30s margin → + // it is returned IMMEDIATELY and stamped; the refresh is kicked off in the background and + // is still pending (deferred not completed). + val secondFuture = p.sendAsync(getRequest()) + val second = secondFuture.join() + assertEquals(200, second.status.code) + assertEquals("Bearer old", client.lastAuth, "expiring-but-valid token must be stamped without waiting") + assertEquals(2, seedingProvider.fetches.get(), "a background refresh must have been started") + assertFalse(deferred.isDone) + + // Complete the background refresh; the NEXT request now sees the new token. + deferred.complete(BearerToken("new", now.plusSeconds(3600))) + p.sendAsync(getRequest()).join() + assertEquals("Bearer new", client.lastAuth) + } + + // ----------------- Single-flight ----------------- + + @Test + fun `concurrent expired-token requests share one fetch`() { + // No cached token → both requests must await; only ONE provider fetch should happen. + val deferred = CompletableFuture() + val fetches = AtomicInteger(0) + val provider = + object : BearerTokenProvider { + override fun fetch( + scopes: List, + params: Map, + ): BearerToken = error("blocking fetch must not be called") + + override fun fetchAsync( + scopes: List, + params: Map, + ): CompletableFuture { + fetches.incrementAndGet() + return deferred + } + } + val client = RecordingClient(200) + val p = pipeline(provider, client) + + val f1 = p.sendAsync(getRequest()) + val f2 = p.sendAsync(getRequest()) + // Both are parked on the single shared fetch. + assertFalse(f1.isDone) + assertFalse(f2.isDone) + assertEquals(1, fetches.get(), "concurrent requests must coalesce onto one fetch") + + deferred.complete(BearerToken("tok", now.plusSeconds(3600))) + assertEquals(200, f1.join().status.code) + assertEquals(200, f2.join().status.code) + assertEquals(1, fetches.get()) + } + + // ----------------- Provider errors ----------------- + + @Test + fun `provider failure propagates and is not cached`() { + val fetches = AtomicInteger(0) + val provider = + object : BearerTokenProvider { + override fun fetch( + scopes: List, + params: Map, + ): BearerToken = error("blocking fetch must not be called") + + override fun fetchAsync( + scopes: List, + params: Map, + ): CompletableFuture { + fetches.incrementAndGet() + return Futures.failed(RuntimeException("token endpoint down")) + } + } + val client = RecordingClient(200) + val p = pipeline(provider, client) + + val thrown = assertFails { p.sendAsync(getRequest()).join() } + assertTrue(Futures.unwrap(thrown) is RuntimeException) + // Not cached: a second request retries the fetch. + assertFails { p.sendAsync(getRequest()).join() } + assertEquals(2, fetches.get()) + } + + @Test + fun `provider returning an already-expired token surfaces IllegalStateException`() { + val provider = CountingProvider { BearerToken("stale", now.minusSeconds(1)) } + val client = RecordingClient(200) + val thrown = assertFails { pipeline(provider, client).sendAsync(getRequest()).join() } + assertTrue(Futures.unwrap(thrown) is IllegalStateException) + } + + // ----------------- HTTPS guard ----------------- + + @Test + fun `non-HTTPS request is rejected before any fetch`() { + val provider = CountingProvider { BearerToken("tok", now.plusSeconds(3600)) } + val client = RecordingClient(200) + val request = Request.builder().method(Method.GET).url("http://api.example.com/x").build() + val thrown = assertFails { pipeline(provider, client).sendAsync(request).join() } + assertTrue(Futures.unwrap(thrown) is IllegalStateException) + assertEquals(0, provider.fetchCount, "no token fetch on the rejected plaintext path") + } + + // ----------------- 401 challenge eviction ----------------- + + @Test + fun `401 bearer challenge evicts the token and retries with a fresh one`() { + var fetch = 0 + val provider = + object : BearerTokenProvider { + val fetches = AtomicInteger(0) + + override fun fetch( + scopes: List, + params: Map, + ): BearerToken = error("blocking fetch must not be called") + + override fun fetchAsync( + scopes: List, + params: Map, + ): CompletableFuture { + fetches.incrementAndGet() + fetch++ + val value = if (fetch == 1) "first" else "second" + return CompletableFuture.completedFuture(BearerToken(value, now.plusSeconds(3600))) + } + } + var call = 0 + val seenAuth = mutableListOf() + val client = + AsyncHttpClient { request -> + call++ + seenAuth.add(request.headers.get(HttpHeaderName.AUTHORIZATION)) + val code = if (call == 1) 401 else 200 + val headers = + if (call == 1) { + Headers.Builder().add(HttpHeaderName.WWW_AUTHENTICATE.caseSensitiveName, "Bearer").build() + } else { + Headers.Builder().build() + } + CompletableFuture.completedFuture( + Response.builder() + .request(request) + .protocol(Protocol.HTTP_1_1) + .status(Status.fromCode(code)) + .headers(headers) + .build(), + ) + } + val response = pipeline(provider, client).sendAsync(getRequest()).join() + assertEquals(200, response.status.code) + assertEquals(listOf("Bearer first", "Bearer second"), seenAuth) + assertEquals(2, provider.fetches.get()) + } + + @Test + fun `401 without a bearer challenge surfaces unchanged`() { + val provider = CountingProvider { BearerToken("tok", now.plusSeconds(3600)) } + var call = 0 + val client = + AsyncHttpClient { request -> + call++ + CompletableFuture.completedFuture( + Response.builder() + .request(request) + .protocol(Protocol.HTTP_1_1) + .status(Status.fromCode(401)) + .headers( + Headers.Builder() + .add(HttpHeaderName.WWW_AUTHENTICATE.caseSensitiveName, "Basic realm=x") + .build(), + ) + .build(), + ) + } + val response = pipeline(provider, client).sendAsync(getRequest()).join() + assertEquals(401, response.status.code) + assertEquals(1, call, "a Basic challenge must not trigger a bearer re-fetch + retry") + } + + // ----------------- Helpers ----------------- + + private fun pipeline( + provider: BearerTokenProvider, + client: AsyncHttpClient, + ) = AsyncHttpPipelineBuilder(client) + .append(AsyncBearerTokenAuthStep(provider, listOf("scope"), Duration.ofSeconds(30), clock)) + .build() + + private fun getRequest(): Request = Request.builder().method(Method.GET).url("https://api.example.com/x").build() + + /** Provider that counts fetchAsync calls and returns [supply]'s token, completed. */ + private class CountingProvider(private val supply: () -> BearerToken) : BearerTokenProvider { + private val fetches = AtomicInteger(0) + + val fetchCount: Int get() = fetches.get() + + override fun fetch( + scopes: List, + params: Map, + ): BearerToken = error("blocking fetch must not be called") + + override fun fetchAsync( + scopes: List, + params: Map, + ): CompletableFuture { + fetches.incrementAndGet() + return CompletableFuture.completedFuture(supply()) + } + } + + /** Async client returning a constant status and recording the Authorization header. */ + private class RecordingClient(private val code: Int) : AsyncHttpClient { + @Volatile + var lastAuth: String? = null + + override fun executeAsync(request: Request): CompletableFuture { + lastAuth = request.headers.get(HttpHeaderName.AUTHORIZATION) + return CompletableFuture.completedFuture( + Response.builder() + .request(request) + .protocol(Protocol.HTTP_1_1) + .status(Status.fromCode(code)) + .build(), + ) + } + } +} diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStepTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStepTest.kt new file mode 100644 index 00000000..11088fb5 --- /dev/null +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStepTest.kt @@ -0,0 +1,383 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.pipeline.steps + +import org.dexpace.sdk.core.client.AsyncHttpClient +import org.dexpace.sdk.core.http.common.Headers +import org.dexpace.sdk.core.http.common.MediaType +import org.dexpace.sdk.core.http.common.Protocol +import org.dexpace.sdk.core.http.pipeline.AsyncHttpPipelineBuilder +import org.dexpace.sdk.core.http.pipeline.AsyncPipelineNext +import org.dexpace.sdk.core.http.pipeline.Stage +import org.dexpace.sdk.core.http.request.Method +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.http.request.RequestBody +import org.dexpace.sdk.core.http.response.Response +import org.dexpace.sdk.core.http.response.ResponseBody +import org.dexpace.sdk.core.http.response.Status +import org.dexpace.sdk.core.io.BufferedSink +import org.dexpace.sdk.core.io.BufferedSource +import org.dexpace.sdk.core.testing.ManualScheduler +import org.dexpace.sdk.core.util.Clock +import org.dexpace.sdk.core.util.Futures +import java.io.IOException +import java.time.Duration +import java.time.Instant +import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.AfterTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFails +import kotlin.test.assertFalse +import kotlin.test.assertTrue +import kotlin.test.fail + +class DefaultAsyncRetryStepTest { + private val scheduler = ManualScheduler() + + @AfterTest + fun tearDown() { + scheduler.close() + } + + // ----------------- Type-level invariants ----------------- + + @Test + fun `stage is RETRY and final`() { + val step = DefaultAsyncRetryStep(scheduler) + assertEquals(Stage.RETRY, step.stage) + val custom = + object : AsyncRetryStep() { + override fun processAsync( + request: Request, + next: AsyncPipelineNext, + ): CompletableFuture = next.processAsync() + } + assertEquals(Stage.RETRY, custom.stage) + } + + // ----------------- maxRetries semantics ----------------- + + @Test + fun `maxRetries = 0 performs exactly one attempt`() { + val client = QueueClient().enqueue(503) + val future = pipeline(client, HttpRetryOptions(maxRetries = 0)).sendAsync(getRequest()) + scheduler.runAll() + assertEquals(503, future.join().status.code) + assertEquals(1, client.callCount) + } + + @Test + fun `retries a 503 until a 200 within the budget`() { + val client = QueueClient().enqueue(503).enqueue(503).enqueue(200) + val future = + pipeline(client, HttpRetryOptions.fixed(maxRetries = 3, delay = Duration.ofMillis(50))) + .sendAsync(getRequest()) + // Each backoff schedules a task on the manual scheduler; drain them to advance the loop. + scheduler.runAll() + assertEquals(200, future.join().status.code) + assertEquals(3, client.callCount) + } + + @Test + fun `exhausts retries and returns the last retryable response`() { + val client = QueueClient().enqueue(503).enqueue(503).enqueue(503) + val future = + pipeline(client, HttpRetryOptions.fixed(maxRetries = 2, delay = Duration.ofMillis(10))) + .sendAsync(getRequest()) + scheduler.runAll() + assertEquals(503, future.join().status.code) + // initial + 2 retries. + assertEquals(3, client.callCount) + } + + @Test + fun `non-retryable status is returned without retry`() { + val client = QueueClient().enqueue(404) + val future = pipeline(client, HttpRetryOptions()).sendAsync(getRequest()) + scheduler.runAll() + assertEquals(404, future.join().status.code) + assertEquals(1, client.callCount) + } + + // ----------------- Exception retry ----------------- + + @Test + fun `retries a retryable IOException then succeeds`() { + val client = FailNTimesClient(failures = 2, exception = IOException("boom")) + val future = + pipeline(client, HttpRetryOptions.fixed(maxRetries = 3, delay = Duration.ofMillis(5))) + .sendAsync(getRequest()) + scheduler.runAll() + assertEquals(200, future.join().status.code) + assertEquals(3, client.callCount) + } + + @Test + fun `terminal exception carries prior attempts as suppressed`() { + val client = FailNTimesClient(failures = 5, exception = IOException("boom")) + val future = + pipeline(client, HttpRetryOptions.fixed(maxRetries = 2, delay = Duration.ofMillis(5))) + .sendAsync(getRequest()) + scheduler.runAll() + val thrown = assertFails { future.join() } + val cause = Futures.unwrap(thrown) + assertTrue(cause is IOException) + // 2 prior failures attached as suppressed (initial + first retry), terminal is the 3rd. + assertEquals(2, cause.suppressed.size) + assertEquals(3, client.callCount) + } + + @Test + fun `non-retryable exception is surfaced immediately`() { + val client = FailNTimesClient(failures = 5, exception = IllegalArgumentException("nope")) + val future = + pipeline(client, HttpRetryOptions.fixed(maxRetries = 3, delay = Duration.ZERO)) + .sendAsync(getRequest()) + scheduler.runAll() + val thrown = assertFails { future.join() } + assertTrue(Futures.unwrap(thrown) is IllegalArgumentException) + assertEquals(1, client.callCount) + } + + // ----------------- Idempotency awareness ----------------- + + @Test + fun `bare POST without body is not retried`() { + val client = QueueClient().enqueue(503) + val request = + Request.builder().method(Method.POST).url("https://api.example.com/x").build() + val future = + pipeline(client, HttpRetryOptions.fixed(maxRetries = 3, delay = Duration.ZERO)) + .sendAsync(request) + scheduler.runAll() + assertEquals(503, future.join().status.code) + assertEquals(1, client.callCount) + } + + @Test + fun `POST with a replayable body is retried`() { + val client = QueueClient().enqueue(503).enqueue(200) + val request = + Request.builder() + .method(Method.POST) + .url("https://api.example.com/x") + .body(RequestBody.create("payload".toByteArray())) + .build() + val future = + pipeline(client, HttpRetryOptions.fixed(maxRetries = 3, delay = Duration.ZERO)) + .sendAsync(request) + scheduler.runAll() + assertEquals(200, future.join().status.code) + assertEquals(2, client.callCount) + } + + @Test + fun `POST with a non-replayable body is not retried`() { + val client = QueueClient().enqueue(503) + val request = + Request.builder() + .method(Method.POST) + .url("https://api.example.com/x") + .body(NonReplayableBody()) + .build() + val future = + pipeline(client, HttpRetryOptions.fixed(maxRetries = 3, delay = Duration.ZERO)) + .sendAsync(request) + scheduler.runAll() + assertEquals(503, future.join().status.code) + assertEquals(1, client.callCount) + } + + // ----------------- Retry-After honoring ----------------- + + @Test + fun `Retry-After seconds header is honored as the delay`() { + val client = + QueueClient() + .enqueue(503, Headers.Builder().add("Retry-After", "2").build()) + .enqueue(200) + val future = + pipeline(client, HttpRetryOptions(maxRetries = 3)) + .sendAsync(getRequest()) + // The first retry should be scheduled for 2 seconds. + scheduler.runAll() + assertEquals(200, future.join().status.code) + val scheduled = scheduler.recordedDelays + assertTrue(scheduled.any { it == Duration.ofSeconds(2) }, "expected a 2s scheduled delay, got $scheduled") + } + + // ----------------- Body close before retry ----------------- + + @Test + fun `prior retryable response body is closed before retrying`() { + val closes = AtomicInteger(0) + var n = 0 + val client = + AsyncHttpClient { request -> + n++ + val code = if (n == 1) 503 else 200 + CompletableFuture.completedFuture( + Response.builder() + .request(request) + .protocol(Protocol.HTTP_1_1) + .status(Status.fromCode(code)) + .body(CountingCloseBody(closes)) + .build(), + ) + } + val future = + pipeline(client, HttpRetryOptions.fixed(maxRetries = 3, delay = Duration.ZERO)) + .sendAsync(getRequest()) + scheduler.runAll() + future.join().close() + assertTrue(closes.get() >= 1, "the 503 body should have been closed before retry") + } + + // ----------------- Stack safety ----------------- + + @Test + fun `a long zero-delay retry sequence does not overflow the stack`() { + // 5000 zero-delay retries. An implementation that recursed per attempt (thenCompose + // chains or self-recursive drive) would blow the stack; the iterative trampoline must not. + val attempts = 5000 + val client = AlwaysFailClient(IOException("io")) + val future = + pipeline(client, HttpRetryOptions.fixed(maxRetries = attempts, delay = Duration.ZERO)) + .sendAsync(getRequest()) + scheduler.runAll() + val thrown = assertFails { future.join() } + assertTrue(Futures.unwrap(thrown) is IOException) + assertEquals(attempts + 1, client.callCount) + } + + // ----------------- Cancellation / no real sleep ----------------- + + @Test + fun `delay is scheduled not slept - pending future before the scheduler runs`() { + val client = QueueClient().enqueue(503).enqueue(200) + val future = + pipeline(client, HttpRetryOptions.fixed(maxRetries = 3, delay = Duration.ofSeconds(30))) + .sendAsync(getRequest()) + // The first attempt already ran (503), but the retry is parked on the scheduler — the + // future is NOT complete and no thread is blocked. + assertFalse(future.isDone) + assertEquals(1, client.callCount) + scheduler.runAll() + assertEquals(200, future.join().status.code) + } + + // ----------------- Helpers ----------------- + + private fun pipeline( + client: AsyncHttpClient, + options: HttpRetryOptions, + ) = AsyncHttpPipelineBuilder(client) + .append(DefaultAsyncRetryStep(scheduler, options, fixedClock())) + .build() + + private fun getRequest(): Request = Request.builder().method(Method.GET).url("https://api.example.com/x").build() + + private fun fixedClock(): Clock = + object : Clock { + override fun now(): Instant = Instant.EPOCH + + override fun monotonic(): Long = 0L + + override fun sleep(duration: Duration) = fail("async retry must not call Clock.sleep") + } + + /** Async client returning a FIFO queue of canned responses; throws if the queue is empty. */ + private class QueueClient : AsyncHttpClient { + private val queue = ArrayDeque>() + private val calls = AtomicInteger(0) + + val callCount: Int get() = calls.get() + + fun enqueue( + code: Int, + headers: Headers = Headers.Builder().build(), + ): QueueClient = apply { queue.addLast(code to headers) } + + override fun executeAsync(request: Request): CompletableFuture { + calls.incrementAndGet() + val (code, headers) = + queue.removeFirstOrNull() ?: error("QueueClient: no response enqueued") + return CompletableFuture.completedFuture( + Response.builder() + .request(request) + .protocol(Protocol.HTTP_1_1) + .status(Status.fromCode(code)) + .headers(headers) + .build(), + ) + } + } + + /** Fails the first [failures] attempts with [exception], then returns 200. */ + private class FailNTimesClient( + private val failures: Int, + private val exception: Exception, + ) : AsyncHttpClient { + private val calls = AtomicInteger(0) + + val callCount: Int get() = calls.get() + + override fun executeAsync(request: Request): CompletableFuture { + val n = calls.incrementAndGet() + return if (n <= failures) { + Futures.failed(exception) + } else { + CompletableFuture.completedFuture( + Response.builder() + .request(request) + .protocol(Protocol.HTTP_1_1) + .status(Status.OK) + .build(), + ) + } + } + } + + /** Always fails with [exception]. */ + private class AlwaysFailClient(private val exception: Exception) : AsyncHttpClient { + private val calls = AtomicInteger(0) + + val callCount: Int get() = calls.get() + + override fun executeAsync(request: Request): CompletableFuture { + calls.incrementAndGet() + return Futures.failed(exception) + } + } + + private class NonReplayableBody : RequestBody() { + override fun mediaType(): MediaType? = MediaType.parse("text/plain") + + override fun contentLength(): Long = 5 + + override fun isReplayable(): Boolean = false + + override fun writeTo(sink: BufferedSink) { + sink.write("hello".toByteArray(Charsets.UTF_8)) + } + } + + private class CountingCloseBody(private val closes: AtomicInteger) : ResponseBody() { + override fun mediaType(): MediaType? = null + + override fun contentLength(): Long = 0 + + override fun source(): BufferedSource = fail("body should not be read") + + override fun close() { + closes.incrementAndGet() + } + } +} diff --git a/sdk-core/src/testFixtures/kotlin/org/dexpace/sdk/core/testing/ManualScheduler.kt b/sdk-core/src/testFixtures/kotlin/org/dexpace/sdk/core/testing/ManualScheduler.kt new file mode 100644 index 00000000..b03bccd5 --- /dev/null +++ b/sdk-core/src/testFixtures/kotlin/org/dexpace/sdk/core/testing/ManualScheduler.kt @@ -0,0 +1,192 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.testing + +import java.time.Duration +import java.util.concurrent.Callable +import java.util.concurrent.Delayed +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit + +/** + * Deterministic [ScheduledExecutorService] for testing async, time-dependent pipeline steps + * (async retry backoff) without real sleeps or background threads. + * + * Only the `schedule(Runnable, delay, unit)` overload used by + * [org.dexpace.sdk.core.util.Futures.delay] is implemented; every other method throws + * [UnsupportedOperationException]. Scheduled tasks are NOT run automatically — the test drives + * them explicitly via [runAll] (or [runNext]), so the test thread controls exactly when each + * delayed continuation fires. Each scheduled delay is recorded in [recordedDelays] so a test can + * assert on the requested backoff schedule (e.g. that a `Retry-After: 2` produced a 2-second + * delay) without observing wall-clock time. + * + * Not thread-safe: tests run on a single thread. + */ +class ManualScheduler : ScheduledExecutorService { + private val pending: ArrayDeque = ArrayDeque() + + /** Every delay requested via [schedule], in submission order. Read-only snapshot semantics. */ + val recordedDelays: List get() = pending.map { it.delay } + ran + + private val ran: MutableList = mutableListOf() + private var closed = false + + /** Number of tasks still queued and not yet run. */ + val pendingCount: Int get() = pending.size + + /** + * Runs queued tasks until the queue is empty, including tasks that earlier tasks schedule + * while running (the async retry loop re-arms by scheduling a new delay). Cancelled tasks are + * skipped. Bounded so a misbehaving infinite re-schedule fails loudly instead of hanging. + */ + fun runAll() { + var guard = 0 + while (pending.isNotEmpty()) { + check(guard++ < MAX_DRAIN_ITERATIONS) { + "ManualScheduler.runAll exceeded $MAX_DRAIN_ITERATIONS iterations — likely an " + + "unbounded re-schedule loop" + } + runNext() + } + } + + /** Runs the next queued task (FIFO). No-op if the queue is empty. */ + fun runNext() { + val task = pending.removeFirstOrNull() ?: return + ran.add(task.delay) + if (!task.cancelled) task.command.run() + } + + override fun schedule( + command: Runnable, + delay: Long, + unit: TimeUnit, + ): ScheduledFuture<*> { + check(!closed) { "ManualScheduler is closed" } + val task = ScheduledTask(command, Duration.ofNanos(unit.toNanos(delay))) + pending.addLast(task) + return task + } + + /** Marks the scheduler closed and drops any queued tasks. Not an override on Java 8's + * [ScheduledExecutorService] (which gained `close()` only in Java 19); a plain helper. */ + fun close() { + closed = true + pending.clear() + } + + override fun shutdown() { + close() + } + + override fun shutdownNow(): List { + val drained = pending.map { it.command } + close() + return drained + } + + override fun isShutdown(): Boolean = closed + + override fun isTerminated(): Boolean = closed + + override fun awaitTermination( + timeout: Long, + unit: TimeUnit, + ): Boolean = closed + + // -- Unused ScheduledExecutorService surface ------------------------------------------------ + + override fun schedule( + callable: Callable, + delay: Long, + unit: TimeUnit, + ): ScheduledFuture = unsupported() + + override fun scheduleAtFixedRate( + command: Runnable, + initialDelay: Long, + period: Long, + unit: TimeUnit, + ): ScheduledFuture<*> = unsupported() + + override fun scheduleWithFixedDelay( + command: Runnable, + initialDelay: Long, + delay: Long, + unit: TimeUnit, + ): ScheduledFuture<*> = unsupported() + + override fun execute(command: Runnable) { + command.run() + } + + override fun submit(task: Callable): java.util.concurrent.Future = unsupported() + + override fun submit( + task: Runnable, + result: T, + ): java.util.concurrent.Future = unsupported() + + override fun submit(task: Runnable): java.util.concurrent.Future<*> = unsupported() + + override fun invokeAll( + tasks: MutableCollection>, + ): MutableList> = unsupported() + + override fun invokeAll( + tasks: MutableCollection>, + timeout: Long, + unit: TimeUnit, + ): MutableList> = unsupported() + + override fun invokeAny(tasks: MutableCollection>): T = unsupported() + + override fun invokeAny( + tasks: MutableCollection>, + timeout: Long, + unit: TimeUnit, + ): T = unsupported() + + private fun unsupported(): Nothing = + throw UnsupportedOperationException("ManualScheduler only supports schedule(Runnable, delay, unit)") + + /** A queued task. [ScheduledFuture] is implemented minimally — only cancellation matters. */ + private class ScheduledTask( + val command: Runnable, + val delay: Duration, + ) : ScheduledFuture { + var cancelled: Boolean = false + private set + + override fun cancel(mayInterruptIfRunning: Boolean): Boolean { + cancelled = true + return true + } + + override fun isCancelled(): Boolean = cancelled + + override fun isDone(): Boolean = cancelled + + override fun get(): Any? = null + + override fun get( + timeout: Long, + unit: TimeUnit, + ): Any? = null + + override fun getDelay(unit: TimeUnit): Long = unit.convert(delay.toNanos(), TimeUnit.NANOSECONDS) + + override fun compareTo(other: Delayed): Int = + getDelay(TimeUnit.NANOSECONDS).compareTo(other.getDelay(TimeUnit.NANOSECONDS)) + } + + private companion object { + private const val MAX_DRAIN_ITERATIONS = 100_000 + } +} From 21c653e4486077fe5156ae4ef54485e74ce4a2f1 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Wed, 24 Jun 2026 06:34:01 +0300 Subject: [PATCH 2/4] fix: complete the async retry future on every terminal path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DefaultAsyncRetryStep drove its retry loop through an attempt.whenComplete callback while completing the caller-facing future by hand inside onSuccess/onFailure. A throw escaping a whenComplete action is swallowed — it only completes the discarded callback future — so any exception raised while deciding to retry, computing the backoff, or logging left the returned future forever incomplete and hung the caller's join(). The common trigger was self-suppression: when a transport re-throws a single exception instance across attempts, the terminal step attached that instance under itself via addSuppressed, throwing IllegalArgumentException ("Self-suppression not permitted"). DefaultAsyncRetryStep: - Wrap the retry decision, delay computation and logging in onSuccess/onFailure so any throw completes the future exceptionally and closes an open retryable response, instead of being swallowed by the completion callback. - Skip self-references when attaching the prior-attempt trail as suppressed. - Intercept InterruptedIOException/InterruptedException before classification: restore the interrupt flag, normalise to InterruptedIOException, and surface it without retrying, matching the synchronous DefaultRetryStep and the SDK cancellation convention. - Guard scheduleNext against a synchronous scheduler rejection. - Guard the retry trampoline's re-arm state with a lock so the pump-exit is visible to a drive() resuming on a scheduler thread and the exit-check is atomic with a concurrent re-arm; the previous plain fields could strand the future on a real multi-threaded ScheduledExecutorService. - Close the response on the no-retry success path when the caller has already completed or cancelled the returned future. - Swallow any non-fatal close failure (not only IOException) on a discarded retryable response. Also simplify AsyncBearerTokenAuthStep.validateFresh to accept a nullable token directly, dropping a redundant cast and two suppressions. Tests: FailNTimesClient now throws a distinct exception per attempt; add regression coverage for a throwing retry/exception predicate, a synchronous scheduler rejection, interrupt handling, self-suppression with a reused instance, response close-on-cancel, a throwing response close, and retries driven on a real multi-threaded scheduler. --- .../steps/AsyncBearerTokenAuthStep.kt | 13 +- .../pipeline/steps/DefaultAsyncRetryStep.kt | 226 ++++++++++++---- .../steps/DefaultAsyncRetryStepTest.kt | 253 +++++++++++++++++- 3 files changed, 425 insertions(+), 67 deletions(-) diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt index d877e177..8b934973 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt @@ -225,14 +225,13 @@ public open class AsyncBearerTokenAuthStep } /** - * Validates a freshly fetched token: rejects a `null` (Kotlin intrinsics usually catch - * this earlier for Kotlin SAMs, but a platform-disabled-intrinsics context may not) and a - * token already expired at fetch time (no margin applied — a provider minting an - * effectively-expired token is misbehaving). + * Validates a freshly fetched token: rejects a `null` (a Java provider may hand one back + * despite the non-null Kotlin signature when null-check intrinsics are disabled — hence the + * nullable parameter) and a token already expired at fetch time (no margin applied — a + * provider minting an effectively-expired token is misbehaving). */ - @Suppress("UNCHECKED_CAST", "RedundantNullableReturnType") - private fun validateFresh(token: BearerToken): BearerToken { - val nonNull: BearerToken = (token as BearerToken?) ?: error("BearerTokenProvider returned null") + private fun validateFresh(token: BearerToken?): BearerToken { + val nonNull = token ?: error("BearerTokenProvider returned null") check(!nonNull.isExpiredAt(clock.now())) { "BearerTokenProvider returned an already-expired token" } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt index ebeece9f..38463671 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt @@ -19,9 +19,12 @@ import org.dexpace.sdk.core.pipeline.step.retry.RetrySettings import org.dexpace.sdk.core.util.Clock import org.dexpace.sdk.core.util.Futures import java.io.IOException +import java.io.InterruptedIOException import java.time.Duration import java.util.concurrent.CompletableFuture import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock /** * Default [AsyncRetryStep] — the async mirror of [DefaultRetryStep]. Re-invokes the downstream @@ -42,10 +45,12 @@ import java.util.concurrent.ScheduledExecutorService * The retry loop is driven iteratively by [drive], not by chaining `thenCompose` per attempt. * Each downstream attempt registers a single [CompletableFuture.whenComplete] callback. When the * outcome warrants another attempt, the callback hands control back to [drive] through a - * trampoline ([RetryDriver.continuation]) instead of calling [drive] recursively, so a retry - * sequence of length N never builds an N-deep stack frame chain or an N-deep future - * continuation graph. This mirrors the iterative `while` loop of [DefaultRetryStep] while staying - * fully async. + * lock-guarded re-arm flag instead of calling [drive] recursively, so a retry sequence of length + * N never builds an N-deep stack frame chain or an N-deep future continuation graph. The re-arm + * state is guarded by a lock because a non-blocking backoff hands the next [drive] to a scheduler + * thread while the previous pump ran on another thread — the lock supplies the happens-before + * edge that keeps the handoff visible and the exit-check atomic. This mirrors the iterative + * `while` loop of [DefaultRetryStep] while staying fully async. * * ## Re-sendability gating * @@ -62,8 +67,19 @@ import java.util.concurrent.ScheduledExecutorService * Downstream failures surface as exceptionally-completed futures; the loop unwraps the * [java.util.concurrent.CompletionException] wrapper via [Futures.unwrap] before classifying. * Only [Exception] subclasses are classified — an [Error] (OOM, StackOverflow) completes the - * call exceptionally without retry. On terminal failure every prior attempt's exception is - * attached to the surfaced exception via [Throwable.addSuppressed]. + * call exceptionally without retry. An [InterruptedIOException] / [InterruptedException] is + * treated as cancellation, not a retryable failure: the interrupt flag is restored and the call + * completes with an [InterruptedIOException], matching [DefaultRetryStep]. On terminal failure + * every prior attempt's exception is attached to the surfaced exception via + * [Throwable.addSuppressed] — skipping the surfaced exception itself, so a transport that reuses + * one exception instance across attempts cannot trip self-suppression. + * + * A `shouldRetry` predicate or delay computation that throws — or a synchronous scheduler + * rejection — completes the call exceptionally rather than leaving it hanging, and any open + * retryable response is closed first so it never leaks (mirroring [DefaultRetryStep]'s + * close-on-throw guard). Unlike the synchronous stack the terminal failure is surfaced as-is + * rather than wrapped in [IOException]: a [CompletableFuture] carries no checked-exception + * contract, so the original failure type reaches the caller (after [Futures.unwrap]) unchanged. * * ## Thread-safety * @@ -124,36 +140,53 @@ public open class DefaultAsyncRetryStep // Lazily allocated on first failure so the success path never pays for the list. private var suppressed: MutableList? = null - // Trampoline state. `pumping` is true while the synchronous pump loop in drive() is - // active; `rearm` records that another attempt should run. A re-arm that happens - // while the pump is active (an inline / zero-delay retry) just sets `rearm` and lets - // the loop pick it up — it never recurses into a new drive() frame. A re-arm that - // happens after the pump has exited (the common async case, fired from a scheduler - // or downstream-completion thread) starts a fresh, shallow pump. Both paths run - // sequentially per call — drive() is only ever entered by one thread at a time - // because each attempt's continuation fires exactly once and the previous pump has - // returned before the async callback runs. + // Trampoline state, guarded by [trampolineLock]. `pumping` is true while a pump loop + // in drive() is active; `rearm` records that another attempt should run. A re-arm + // while the pump is active (an inline / zero-delay retry, or a concurrent re-entry) + // just sets `rearm` and lets the active loop pick it up — it never recurses into a new + // drive() frame. A re-arm after the pump has exited starts a fresh, shallow pump. + // + // The flags are NOT plain fields: a non-blocking backoff hands the next drive() to a + // ScheduledExecutorService thread while the previous pump ran on the dispatching (or a + // transport) thread, so the two touch this state from different threads with no + // intrinsic happens-before edge. Guarding every read/write under the lock both makes + // the pump-exit write visible to the next cross-thread drive() (no stranding on a + // stale `pumping`) and makes the exit-check atomic with a concurrent re-arm (no lost + // wakeup). The lock is held only for the flag flips, never across startAttempt() or + // the downstream call, so it cannot pin a thread. Per CLAUDE.md, ReentrantLock (not + // synchronized) so a virtual-thread carrier is never pinned. + private val trampolineLock = ReentrantLock() private var pumping: Boolean = false private var rearm: Boolean = false /** * Entry point and trampoline. Marks that an attempt should run ([rearm]); if a pump - * loop is already active it returns immediately (the active loop will pick up the - * re-arm), otherwise it runs the loop. The loop keeps starting attempts as long as - * inline completions keep setting [rearm], so a burst of zero-delay retries unwinds - * iteratively instead of recursing. + * loop is already active it returns immediately (the active loop picks up the re-arm + * on its next exit-check), otherwise it runs the loop. The loop keeps starting attempts + * as long as completions keep setting [rearm], so a burst of zero-delay retries unwinds + * iteratively instead of recursing. Safe to call from any thread (see the field note). */ fun drive() { - rearm = true - if (pumping) return - pumping = true - try { - while (rearm) { + trampolineLock.withLock { + rearm = true + // A pump is already active (this thread or another); it will observe the + // re-arm on its next exit-check. Exactly one pump runs at a time. + if (pumping) return + pumping = true + } + while (true) { + trampolineLock.withLock { + // Atomic exit-check: clearing `pumping` under the same lock as drive()'s + // re-arm means a concurrent drive() either set `rearm` before this check + // (so the pump continues) or starts a fresh pump after it (because it sees + // `pumping` cleared) — the wakeup is never lost. + if (!rearm) { + pumping = false + return + } rearm = false - startAttempt() } - } finally { - pumping = false + startAttempt() } } @@ -189,16 +222,39 @@ public open class DefaultAsyncRetryStep } private fun onSuccess(response: Response) { - val retry = - retrySafe && - tryCount < options.maxRetries && - shouldRetryResponse(response) - if (!retry) { - result.complete(response) - return - } - val delay = computeResponseDelay(response, tryCount) - logRetry(tryCount, delay, response.status.code, cause = null) + val delay: Duration = + try { + val retry = + retrySafe && + tryCount < options.maxRetries && + shouldRetryResponse(response) + if (!retry) { + // Not retrying: hand the still-open response to the caller, who then + // owns closing it. If the caller already completed or cancelled the + // returned future (a race against an in-flight attempt), complete() is + // a no-op and the response would otherwise leak its socket/buffer — + // close it here in that case. + if (!result.complete(response)) closeQuietly(response) + return + } + val computed = computeResponseDelay(response, tryCount) + logRetry(tryCount, computed, response.status.code, cause = null) + computed + } catch (t: Throwable) { + // The retry decision, delay computation, or log call threw — e.g. a + // misbehaving shouldRetry predicate (surfaced as IllegalStateException), an + // Error rethrown by a delay override, or an unexpected backoff failure. The + // retryable response is still open, so close it before surfacing — + // otherwise its socket/buffer leaks — then complete the call terminally so + // the caller's future is never left hanging (a throw escaping this + // whenComplete callback would be swallowed, stranding `result`). Mirrors + // DefaultRetryStep.decideRetryResponse's close-on-throw guard. + closeQuietly(response) + failTerminally(t) + return + } + // Committed to retrying: release the retryable response before the backoff window, + // then schedule the next attempt. closeQuietly(response) tryCount++ scheduleNext(delay) @@ -207,30 +263,66 @@ public open class DefaultAsyncRetryStep private fun onFailure(rawError: Throwable) { val error = Futures.unwrap(rawError) // Errors (OOM, StackOverflow, …) are unrecoverable — never retry, never log. + // Surface as-is, matching DefaultRetryStep (which rethrows an Error before + // attaching the failure trail). if (error is Error) { result.completeExceptionally(error) return } val exception = error as Exception - val retry = - retrySafe && - tryCount < options.maxRetries && - shouldRetryException(exception) - if (!retry) { - suppressed?.forEach(exception::addSuppressed) - result.completeExceptionally(exception) + // Interrupts are never retryable, per the SDK-wide cancellation convention. + // Restore the interrupt flag (the completing thread's catch may have cleared it), + // normalise a bare InterruptedException to InterruptedIOException, and surface + // terminally with the prior-attempt trail attached — mirroring DefaultRetryStep's + // pre-classification interrupt carve-out. + if (exception is InterruptedIOException || exception is InterruptedException) { + Thread.currentThread().interrupt() + failTerminally(asInterruptedIo(exception)) return } - val accumulator = suppressed ?: ArrayList().also { suppressed = it } - val delay = computeExceptionDelay(exception, tryCount) - logRetry(tryCount, delay, statusCode = -1, cause = exception) + val delay: Duration = + try { + val retry = + retrySafe && + tryCount < options.maxRetries && + shouldRetryException(exception) + if (!retry) { + failTerminally(exception) + return + } + val computed = computeExceptionDelay(exception, tryCount) + logRetry(tryCount, computed, statusCode = -1, cause = exception) + computed + } catch (t: Throwable) { + // The retry decision, delay computation, or log call threw. Attach the + // in-flight failure and complete terminally so the caller never hangs (a + // throw escaping this whenComplete callback would be swallowed). No open + // response to close on the exception path. + if (t !== exception) t.addSuppressed(exception) + failTerminally(t) + return + } // Record the current failure BEFORE scheduling so it is attached to any later // terminal exception's suppressed list rather than being silently dropped. + val accumulator = suppressed ?: ArrayList().also { suppressed = it } accumulator.add(exception) tryCount++ scheduleNext(delay) } + /** + * Completes [result] exceptionally with [error]. Any accumulated prior-attempt + * failures are attached as suppressed, skipping [error] itself so a transport that + * re-throws one exception instance across attempts cannot trip + * [Throwable.addSuppressed]'s self-suppression guard. Used on every terminal path so + * the caller's future is always completed — never left hanging — with the full failure + * trail preserved. + */ + private fun failTerminally(error: Throwable) { + suppressed?.forEach { prior -> if (prior !== error) error.addSuppressed(prior) } + result.completeExceptionally(error) + } + /** * Schedules the next attempt after [delay]. [Futures.delay] returns an * already-complete future for a zero delay, so the [CompletableFuture.whenComplete] @@ -240,12 +332,21 @@ public open class DefaultAsyncRetryStep */ private fun scheduleNext(delay: Duration) { val safeDelay = if (delay.isNegative) Duration.ZERO else delay - Futures.delay(scheduler, safeDelay).whenComplete { _, scheduleError -> + val scheduled = + try { + Futures.delay(scheduler, safeDelay) + } catch (t: Throwable) { + // scheduler.schedule rejected the delay task synchronously (e.g. the + // scheduler was shut down → RejectedExecutionException). Surface terminally + // so the caller never hangs. + failTerminally(t) + return + } + scheduled.whenComplete { _, scheduleError -> if (scheduleError != null) { - // The scheduler rejected or failed the delay task — surface it with any - // accumulated prior failures attached. - suppressed?.forEach(scheduleError::addSuppressed) - result.completeExceptionally(scheduleError) + // The scheduled delay task failed — surface it with any accumulated prior + // failures attached. + failTerminally(scheduleError) } else { drive() } @@ -318,6 +419,17 @@ public open class DefaultAsyncRetryStep return body.isReplayable() } + /** + * Normalises an interrupt-signalling exception to [InterruptedIOException]: an + * [InterruptedIOException] is returned as-is; a bare [InterruptedException] is wrapped with + * the original attached as its cause. Mirrors [DefaultRetryStep]'s helper of the same name. + */ + private fun asInterruptedIo(exception: Exception): InterruptedIOException = + when (exception) { + is InterruptedIOException -> exception + else -> InterruptedIOException("retry interrupted").apply { initCause(exception) } + } + private fun invokeShouldRetry( predicate: HttpRetryConditionPredicate, condition: HttpRetryCondition, @@ -359,10 +471,16 @@ public open class DefaultAsyncRetryStep private fun closeQuietly(response: Response) { try { response.close() - } catch (closeErr: IOException) { + } catch (closeErr: Exception) { + // Swallow ANY close failure (not just IOException) on a response being discarded + // before a retry: it is not actionable, and on the async path an escaping throw + // would be swallowed by the whenComplete callback and strand the returned future. + // Only [Error] (OOM, StackOverflow) propagates — those are JVM-fatal, not ours to + // recover. (The sync DefaultRetryStep lets a non-IOException close failure surface + // as a terminal error; async cannot, since that path has no caller to throw to.) logger.atVerbose() .event("http.retry.close_failed") - .field("error.type", closeErr::class.java.simpleName ?: "IOException") + .field("error.type", closeErr::class.java.simpleName ?: "Exception") .log() } } diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStepTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStepTest.kt index 11088fb5..89602b45 100644 --- a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStepTest.kt +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStepTest.kt @@ -26,9 +26,12 @@ import org.dexpace.sdk.core.testing.ManualScheduler import org.dexpace.sdk.core.util.Clock import org.dexpace.sdk.core.util.Futures import java.io.IOException +import java.io.InterruptedIOException import java.time.Duration import java.time.Instant import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import kotlin.test.AfterTest import kotlin.test.Test @@ -110,7 +113,7 @@ class DefaultAsyncRetryStepTest { @Test fun `retries a retryable IOException then succeeds`() { - val client = FailNTimesClient(failures = 2, exception = IOException("boom")) + val client = FailNTimesClient(failures = 2) { IOException("boom") } val future = pipeline(client, HttpRetryOptions.fixed(maxRetries = 3, delay = Duration.ofMillis(5))) .sendAsync(getRequest()) @@ -121,7 +124,7 @@ class DefaultAsyncRetryStepTest { @Test fun `terminal exception carries prior attempts as suppressed`() { - val client = FailNTimesClient(failures = 5, exception = IOException("boom")) + val client = FailNTimesClient(failures = 5) { IOException("boom") } val future = pipeline(client, HttpRetryOptions.fixed(maxRetries = 2, delay = Duration.ofMillis(5))) .sendAsync(getRequest()) @@ -136,7 +139,7 @@ class DefaultAsyncRetryStepTest { @Test fun `non-retryable exception is surfaced immediately`() { - val client = FailNTimesClient(failures = 5, exception = IllegalArgumentException("nope")) + val client = FailNTimesClient(failures = 5) { IllegalArgumentException("nope") } val future = pipeline(client, HttpRetryOptions.fixed(maxRetries = 3, delay = Duration.ZERO)) .sendAsync(getRequest()) @@ -273,6 +276,226 @@ class DefaultAsyncRetryStepTest { assertEquals(200, future.join().status.code) } + // ----------------- Throwing predicate / delay (must complete the future, never hang) ----------------- + + @Test + fun `a throwing shouldRetry predicate completes the future exceptionally and closes the response`() { + val closes = AtomicInteger(0) + val client = + AsyncHttpClient { request -> + CompletableFuture.completedFuture( + Response.builder() + .request(request) + .protocol(Protocol.HTTP_1_1) + .status(Status.fromCode(503)) + .body(CountingCloseBody(closes)) + .build(), + ) + } + val options = + HttpRetryOptions( + maxRetries = 3, + shouldRetryCondition = { _ -> throw RuntimeException("predicate boom") }, + ) + val future = pipeline(client, options).sendAsync(getRequest()) + scheduler.runAll() + // The predicate threw inside the completion callback. The future must still complete + // (not hang), and the still-open retryable response must be closed so it does not leak. + val thrown = assertFails { future.join() } + assertTrue( + Futures.unwrap(thrown) is IllegalStateException, + "a throwing predicate must surface as IllegalStateException", + ) + assertEquals(1, closes.get(), "the retryable response must be closed when the predicate throws") + } + + @Test + fun `a throwing shouldRetryException predicate completes the future exceptionally`() { + val client = AlwaysFailClient(IOException("io")) + val options = + HttpRetryOptions( + maxRetries = 3, + shouldRetryException = { _ -> throw RuntimeException("predicate boom") }, + ) + val future = pipeline(client, options).sendAsync(getRequest()) + scheduler.runAll() + val thrown = assertFails { future.join() } + assertTrue(Futures.unwrap(thrown) is IllegalStateException) + assertEquals(1, client.callCount, "the loop must not retry when the predicate itself throws") + } + + @Test + fun `a synchronous scheduler rejection completes the future exceptionally`() { + // A closed ManualScheduler rejects schedule(...) synchronously — stand-in for a real + // ScheduledExecutorService throwing RejectedExecutionException after shutdown. + val rejecting = ManualScheduler().apply { close() } + val client = QueueClient().enqueue(503) + val future = + AsyncHttpPipelineBuilder(client) + .append( + DefaultAsyncRetryStep( + rejecting, + HttpRetryOptions.fixed(maxRetries = 3, delay = Duration.ofSeconds(1)), + fixedClock(), + ), + ) + .build() + .sendAsync(getRequest()) + // The first attempt ran (503); scheduling the 1s retry on the closed scheduler is rejected + // synchronously. That rejection must complete the future rather than stranding it. + assertTrue(future.isCompletedExceptionally, "a rejected schedule must complete the future") + assertEquals(1, client.callCount) + assertFails { future.join() } + } + + // ----------------- Cancellation (interrupts are surfaced, never retried) ----------------- + + @Test + fun `InterruptedIOException is surfaced as cancellation and never retried`() { + val client = AlwaysFailClient(InterruptedIOException("cancelled")) + val future = + pipeline(client, HttpRetryOptions.fixed(maxRetries = 3, delay = Duration.ZERO)) + .sendAsync(getRequest()) + scheduler.runAll() + val thrown = assertFails { future.join() } + // Read AND clear the flag so it cannot leak into a later test on this worker thread. + val interruptObserved = Thread.interrupted() + assertTrue(Futures.unwrap(thrown) is InterruptedIOException) + assertEquals(1, client.callCount, "an interrupt must abort the retry loop after one attempt") + assertTrue(interruptObserved, "the interrupt flag must be restored") + } + + @Test + fun `a bare InterruptedException is normalised to InterruptedIOException and not retried`() { + val client = AlwaysFailClient(InterruptedException("bare")) + val future = + pipeline(client, HttpRetryOptions.fixed(maxRetries = 3, delay = Duration.ZERO)) + .sendAsync(getRequest()) + scheduler.runAll() + val thrown = assertFails { future.join() } + val interruptObserved = Thread.interrupted() + val cause = Futures.unwrap(thrown) + assertTrue( + cause is InterruptedIOException, + "a bare InterruptedException must surface as InterruptedIOException", + ) + assertTrue( + cause.cause is InterruptedException, + "the original InterruptedException must be attached as the cause", + ) + assertEquals(1, client.callCount) + assertTrue(interruptObserved, "the interrupt flag must be restored") + } + + @Test + fun `the same exception instance reused across attempts completes the future without self-suppression`() { + // Regression for the CI hang: a transport that re-throws ONE exception instance would, on + // the terminal attempt, attach that instance under itself (addSuppressed(this) -> + // IllegalArgumentException) — which, swallowed by the completion callback, stranded the + // returned future forever. + val shared = IOException("shared boom") + val client = AlwaysFailClient(shared) + val future = + pipeline(client, HttpRetryOptions.fixed(maxRetries = 3, delay = Duration.ZERO)) + .sendAsync(getRequest()) + scheduler.runAll() + val thrown = assertFails { future.join() } + val cause = Futures.unwrap(thrown) + assertTrue(cause === shared, "the shared exception instance must surface as the terminal failure") + assertFalse(cause.suppressed.any { it === cause }, "an exception must never be suppressed under itself") + assertEquals(4, client.callCount, "initial + 3 retries") + } + + // ----------------- Resource hygiene ----------------- + + @Test + fun `a response delivered after the caller cancelled the future is closed not leaked`() { + val closes = AtomicInteger(0) + val deferred = CompletableFuture() + val client = AsyncHttpClient { _ -> deferred } + val req = getRequest() + val future = pipeline(client, HttpRetryOptions(maxRetries = 3)).sendAsync(req) + // The attempt is in flight; cancel the returned future before it completes. + assertFalse(future.isDone) + future.cancel(true) + // The downstream attempt now completes with an open 2xx response. Since the caller already + // cancelled, result.complete(...) is a no-op — the step must close the response itself. + deferred.complete( + Response.builder() + .request(req) + .protocol(Protocol.HTTP_1_1) + .status(Status.OK) + .body(CountingCloseBody(closes)) + .build(), + ) + scheduler.runAll() + assertEquals(1, closes.get(), "a response delivered after cancellation must be closed, not leaked") + } + + @Test + fun `a retryable response whose close throws does not strand the retry`() { + var n = 0 + val client = + AsyncHttpClient { request -> + n++ + val builder = + Response.builder() + .request(request) + .protocol(Protocol.HTTP_1_1) + .status(Status.fromCode(if (n == 1) 503 else 200)) + if (n == 1) builder.body(ThrowingCloseBody()) + CompletableFuture.completedFuture(builder.build()) + } + val future = + pipeline(client, HttpRetryOptions.fixed(maxRetries = 3, delay = Duration.ZERO)) + .sendAsync(getRequest()) + scheduler.runAll() + // The 503's body.close() throws; closeQuietly must swallow it so the retry proceeds to 200 + // rather than the throw escaping the callback and stranding the future. + assertEquals(200, future.join().status.code) + assertEquals(2, n, "the retry must proceed past the throwing close") + } + + // ----------------- Real multi-threaded scheduler (trampoline visibility) ----------------- + + @Test + fun `retries on a real multi-threaded scheduler complete without stranding`() { + // Exercises the trampoline across real threads: each backoff fires drive() on a scheduler + // pool thread, distinct from the thread that ran the previous pump. Guards the lock-guarded + // re-arm handoff — a regression to plain fields could strand the future, and get(timeout) + // fails the test rather than hanging it. + val realScheduler = Executors.newScheduledThreadPool(2) + try { + val calls = AtomicInteger(0) + val client = + AsyncHttpClient { request -> + val n = calls.incrementAndGet() + CompletableFuture.completedFuture( + Response.builder() + .request(request) + .protocol(Protocol.HTTP_1_1) + .status(Status.fromCode(if (n <= 10) 503 else 200)) + .build(), + ) + } + val future = + AsyncHttpPipelineBuilder(client) + .append( + DefaultAsyncRetryStep( + realScheduler, + HttpRetryOptions.fixed(maxRetries = 15, delay = Duration.ofMillis(1)), + fixedClock(), + ), + ) + .build() + .sendAsync(getRequest()) + assertEquals(200, future.get(10, TimeUnit.SECONDS).status.code) + assertEquals(11, calls.get(), "10 retryable 503s + a final 200") + } finally { + realScheduler.shutdownNow() + } + } + // ----------------- Helpers ----------------- private fun pipeline( @@ -320,10 +543,15 @@ class DefaultAsyncRetryStepTest { } } - /** Fails the first [failures] attempts with [exception], then returns 200. */ + /** + * Fails the first [failures] attempts, then returns 200. Each failure gets a **fresh** + * exception from [exceptionFactory] — real transports throw a distinct instance per attempt, + * and reusing one instance would make the accumulated suppressed-failure trail meaningless + * (an exception cannot be suppressed under itself). + */ private class FailNTimesClient( private val failures: Int, - private val exception: Exception, + private val exceptionFactory: () -> Exception, ) : AsyncHttpClient { private val calls = AtomicInteger(0) @@ -332,7 +560,7 @@ class DefaultAsyncRetryStepTest { override fun executeAsync(request: Request): CompletableFuture { val n = calls.incrementAndGet() return if (n <= failures) { - Futures.failed(exception) + Futures.failed(exceptionFactory()) } else { CompletableFuture.completedFuture( Response.builder() @@ -380,4 +608,17 @@ class DefaultAsyncRetryStepTest { closes.incrementAndGet() } } + + /** A response body whose close() throws a non-IOException, to prove closeQuietly swallows it. */ + private class ThrowingCloseBody : ResponseBody() { + override fun mediaType(): MediaType? = null + + override fun contentLength(): Long = 0 + + override fun source(): BufferedSource = fail("body should not be read") + + override fun close() { + throw RuntimeException("close boom") + } + } } From 3ff24e38b5efff9eb31dcda21bb14ae10fdb2f8d Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Wed, 24 Jun 2026 07:16:39 +0300 Subject: [PATCH 3/4] feat: make async retry delay computation overridable The synchronous DefaultRetryStep exposes computeResponseDelay, computeExceptionDelay, and retryAfterFromHeaders as protected open extension points so a subclass can apply request-specific pacing (for example a server-specific Retry-After variant). DefaultAsyncRetryStep carried the same logic but buried it in a private per-call driver, so there was no way to customise delay resolution from a subclass. Hoist the three methods onto DefaultAsyncRetryStep with the same signatures as the synchronous step; the per-call driver now builds the HttpRetryCondition and dispatches through them, so an override is honored. Delay resolution order and values are unchanged. Also correct ManualScheduler.recordedDelays to report run-then-pending so it matches its documented submission order, and add async coverage for retry-after-ms parsing, delayFromCondition precedence, and a subclass delay override. --- sdk-core/api/sdk-core.api | 3 + .../pipeline/steps/DefaultAsyncRetryStep.kt | 79 +++++++++++++------ .../steps/DefaultAsyncRetryStepTest.kt | 59 ++++++++++++++ .../sdk/core/testing/ManualScheduler.kt | 8 +- 4 files changed, 122 insertions(+), 27 deletions(-) diff --git a/sdk-core/api/sdk-core.api b/sdk-core/api/sdk-core.api index 749f4692..f33e2f7e 100644 --- a/sdk-core/api/sdk-core.api +++ b/sdk-core/api/sdk-core.api @@ -817,7 +817,10 @@ public class org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep : or public fun (Ljava/util/concurrent/ScheduledExecutorService;Lorg/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions;Lorg/dexpace/sdk/core/util/Clock;)V public fun (Ljava/util/concurrent/ScheduledExecutorService;Lorg/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions;Lorg/dexpace/sdk/core/util/Clock;Lorg/dexpace/sdk/core/instrumentation/ClientLogger;)V public synthetic fun (Ljava/util/concurrent/ScheduledExecutorService;Lorg/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions;Lorg/dexpace/sdk/core/util/Clock;Lorg/dexpace/sdk/core/instrumentation/ClientLogger;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + protected fun computeExceptionDelay (Lorg/dexpace/sdk/core/http/pipeline/steps/HttpRetryCondition;)Ljava/time/Duration; + protected fun computeResponseDelay (Lorg/dexpace/sdk/core/http/pipeline/steps/HttpRetryCondition;)Ljava/time/Duration; public fun processAsync (Lorg/dexpace/sdk/core/http/request/Request;Lorg/dexpace/sdk/core/http/pipeline/AsyncPipelineNext;)Ljava/util/concurrent/CompletableFuture; + protected fun retryAfterFromHeaders (Lorg/dexpace/sdk/core/http/response/Response;)Ljava/time/Duration; } public final class org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep$Companion { diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt index 38463671..c903d2ef 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt @@ -81,11 +81,19 @@ import kotlin.concurrent.withLock * rather than wrapped in [IOException]: a [CompletableFuture] carries no checked-exception * contract, so the original failure type reaches the caller (after [Futures.unwrap]) unchanged. * + * ## Subclassing + * + * Open specifically to customise delay resolution, matching the synchronous [DefaultRetryStep]. + * Override [computeResponseDelay], [computeExceptionDelay], or [retryAfterFromHeaders] to apply + * request-specific pacing; an override must return a non-negative [Duration] and must not throw. + * The retry-decision policy itself stays configured through [HttpRetryOptions]. + * * ## Thread-safety * * Stateless after construction (the per-call [RetryDriver] holds all mutable loop state). The * immutable [options] / [clock] / [scheduler] and the [ClientLogger] are shared across - * concurrent calls. + * concurrent calls. Any subclass override of the delay hooks must likewise be safe for + * concurrent invocation. */ public open class DefaultAsyncRetryStep @JvmOverloads @@ -237,7 +245,10 @@ public open class DefaultAsyncRetryStep if (!result.complete(response)) closeQuietly(response) return } - val computed = computeResponseDelay(response, tryCount) + val computed = + this@DefaultAsyncRetryStep.computeResponseDelay( + HttpRetryCondition(response, null, tryCount, suppressed ?: emptyList()), + ) logRetry(tryCount, computed, response.status.code, cause = null) computed } catch (t: Throwable) { @@ -290,7 +301,10 @@ public open class DefaultAsyncRetryStep failTerminally(exception) return } - val computed = computeExceptionDelay(exception, tryCount) + val computed = + this@DefaultAsyncRetryStep.computeExceptionDelay( + HttpRetryCondition(null, exception, tryCount, suppressed ?: emptyList()), + ) logRetry(tryCount, computed, statusCode = -1, cause = exception) computed } catch (t: Throwable) { @@ -365,27 +379,6 @@ public open class DefaultAsyncRetryStep return invokeShouldRetry(options.shouldRetryException, condition) } - // --------------- Delay computation --------------- - - private fun computeResponseDelay( - response: Response, - tryCount: Int, - ): Duration { - val condition = HttpRetryCondition(response, null, tryCount, (suppressed ?: emptyList())) - invokeDelayFromCondition(condition)?.let { return it } - retryAfterFromHeaders(response)?.let { return it } - return backoffOrFixed(tryCount) - } - - private fun computeExceptionDelay( - exception: Exception, - tryCount: Int, - ): Duration { - val condition = HttpRetryCondition(null, exception, tryCount, (suppressed ?: emptyList())) - invokeDelayFromCondition(condition)?.let { return it } - return backoffOrFixed(tryCount) - } - // --------------- Logging --------------- private fun logRetry( @@ -459,7 +452,43 @@ public open class DefaultAsyncRetryStep private fun backoffOrFixed(tryCount: Int): Duration = options.fixedDelay ?: BackoffCalculator.computeDelay(tryCount + 1, backoffSettings) - private fun retryAfterFromHeaders(response: Response): Duration? { + // --------------- Delay computation (subclass extension points) --------------- + + /** + * Computes the delay before retrying [condition]'s response. Resolution order mirrors + * [DefaultRetryStep.computeResponseDelay]: + * 1. [HttpRetryOptions.delayFromCondition] override (if it returns non-null). + * 2. `Retry-After` header parsing ([retryAfterFromHeaders]). + * 3. [HttpRetryOptions.fixedDelay] or exponential backoff. + * + * `protected open` so a subclass can apply request-specific delay logic, exactly as the + * synchronous [DefaultRetryStep] allows. An override MUST return a non-negative [Duration] + * and MUST NOT throw: a throw aborts the call (the open retryable response is closed first + * by the loop's close-on-throw guard). + */ + protected open fun computeResponseDelay(condition: HttpRetryCondition): Duration { + invokeDelayFromCondition(condition)?.let { return it } + condition.response?.let { retryAfterFromHeaders(it) }?.let { return it } + return backoffOrFixed(condition.tryCount) + } + + /** + * Computes the delay before retrying [condition]'s exception. Like [computeResponseDelay] + * but skips header parsing (there is no response to read headers from). `protected open` + * with the same invariants. Mirrors [DefaultRetryStep.computeExceptionDelay]. + */ + protected open fun computeExceptionDelay(condition: HttpRetryCondition): Duration { + invokeDelayFromCondition(condition)?.let { return it } + return backoffOrFixed(condition.tryCount) + } + + /** + * Walks [HttpRetryOptions.retryAfterHeaders] in order, returning the first parseable delay. + * `protected open` so a subclass can support additional server-specific pacing headers, + * mirroring [DefaultRetryStep.retryAfterFromHeaders]. May return `null` to fall through to + * the default backoff; must not throw. + */ + protected open fun retryAfterFromHeaders(response: Response): Duration? { val now = clock.now() for (name in options.retryAfterHeaders) { val raw = response.headers.get(name) ?: continue diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStepTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStepTest.kt index 89602b45..d8702214 100644 --- a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStepTest.kt +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStepTest.kt @@ -216,6 +216,65 @@ class DefaultAsyncRetryStepTest { assertTrue(scheduled.any { it == Duration.ofSeconds(2) }, "expected a 2s scheduled delay, got $scheduled") } + @Test + fun `retry-after-ms header is honored as a millisecond delay`() { + val client = + QueueClient() + .enqueue(503, Headers.Builder().add("retry-after-ms", "1500").build()) + .enqueue(200) + val future = + pipeline(client, HttpRetryOptions(maxRetries = 3)) + .sendAsync(getRequest()) + scheduler.runAll() + assertEquals(200, future.join().status.code) + val scheduled = scheduler.recordedDelays + assertTrue(scheduled.any { it == Duration.ofMillis(1500) }, "expected a 1500ms scheduled delay, got $scheduled") + } + + // ----------------- Delay extension points (parity with DefaultRetryStep) ----------------- + + @Test + fun `delayFromCondition override wins over Retry-After and backoff`() { + // The response carries Retry-After: 9, but the caller's delayFromCondition returns 4s, which + // has the highest precedence — so the scheduled delay must be 4s and Retry-After is not read. + val client = + QueueClient() + .enqueue(503, Headers.Builder().add("Retry-After", "9").build()) + .enqueue(200) + val options = + HttpRetryOptions( + maxRetries = 3, + delayFromCondition = { _ -> Duration.ofSeconds(4) }, + ) + val future = pipeline(client, options).sendAsync(getRequest()) + scheduler.runAll() + assertEquals(200, future.join().status.code) + val scheduled = scheduler.recordedDelays + assertTrue(scheduled.any { it == Duration.ofSeconds(4) }, "delayFromCondition (4s) must win, got $scheduled") + assertFalse( + scheduled.any { it == Duration.ofSeconds(9) }, + "Retry-After must not be consulted once delayFromCondition returns a delay", + ) + } + + @Test + fun `a subclass can override computeResponseDelay`() { + // The async step exposes the same protected delay hooks as the synchronous DefaultRetryStep, + // so a subclass can supply request-specific pacing. + val client = QueueClient().enqueue(503).enqueue(200) + val step = + object : DefaultAsyncRetryStep(scheduler, HttpRetryOptions(maxRetries = 3), fixedClock()) { + override fun computeResponseDelay(condition: HttpRetryCondition): Duration = Duration.ofSeconds(7) + } + val future = AsyncHttpPipelineBuilder(client).append(step).build().sendAsync(getRequest()) + scheduler.runAll() + assertEquals(200, future.join().status.code) + assertTrue( + scheduler.recordedDelays.any { it == Duration.ofSeconds(7) }, + "the subclass-supplied 7s delay must be used, got ${scheduler.recordedDelays}", + ) + } + // ----------------- Body close before retry ----------------- @Test diff --git a/sdk-core/src/testFixtures/kotlin/org/dexpace/sdk/core/testing/ManualScheduler.kt b/sdk-core/src/testFixtures/kotlin/org/dexpace/sdk/core/testing/ManualScheduler.kt index b03bccd5..9c0888f2 100644 --- a/sdk-core/src/testFixtures/kotlin/org/dexpace/sdk/core/testing/ManualScheduler.kt +++ b/sdk-core/src/testFixtures/kotlin/org/dexpace/sdk/core/testing/ManualScheduler.kt @@ -31,8 +31,12 @@ import java.util.concurrent.TimeUnit class ManualScheduler : ScheduledExecutorService { private val pending: ArrayDeque = ArrayDeque() - /** Every delay requested via [schedule], in submission order. Read-only snapshot semantics. */ - val recordedDelays: List get() = pending.map { it.delay } + ran + /** + * Every delay requested via [schedule], in submission order: already-run tasks first (in the + * FIFO order they ran), then still-pending tasks. After [runAll] drains the queue this is simply + * every delay ever scheduled, in order. Read-only snapshot semantics. + */ + val recordedDelays: List get() = ran + pending.map { it.delay } private val ran: MutableList = mutableListOf() private var closed = false From 6646f3518ac3ebd6b52aa5f5b406f69326ba2441 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Wed, 24 Jun 2026 07:16:39 +0300 Subject: [PATCH 4/4] fix: log async bearer background-refresh failures once per refresh AsyncBearerTokenAuthStep returns a still-valid token immediately while refreshing it off-thread once it enters the refresh margin. Every request in that window called startBackgroundRefresh, and each attached its own failure-logging callback to the shared single-flight fetch, so a single failed refresh emitted one warning per concurrent request instead of one per fetch. Route the failure log through a new onLaunch hook on sharedFetch that runs only for the caller that actually starts the fetch; joiners and cache hits no longer attach a callback. Single-flight coalescing, cache bookkeeping, and the in-flight handoff are unchanged. --- .../steps/AsyncBearerTokenAuthStep.kt | 79 +++++++++++-------- .../steps/AsyncBearerTokenAuthStepTest.kt | 64 +++++++++++++++ 2 files changed, 112 insertions(+), 31 deletions(-) diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt index 8b934973..6f31c708 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt @@ -144,18 +144,25 @@ public open class AsyncBearerTokenAuthStep * stamps; this just warms the cache for the next request. */ private fun startBackgroundRefresh() { - // Reuse the single-flight machinery; ignore the returned future (fire-and-forget). - // exceptionally/handle keeps an unhandled background failure from surfacing as an - // uncaught CompletableFuture completion. - sharedFetch().whenComplete { _, error -> - if (error != null) { - logger.atWarning() - .event("http.auth.background_refresh_failed") - .field("error.type", error::class.java.simpleName ?: "Throwable") - .cause(error) - .log() - } - } + // Reuse the single-flight machinery; ignore the returned future (fire-and-forget). The + // failure log is attached via [sharedFetch]'s onLaunch hook so ONLY the caller that + // actually launches the fetch logs it: every request in the expiring window funnels onto + // one in-flight refresh, and a single failed refresh must log once — not once per + // concurrent joiner. (A forced, awaited fetch surfaces its failure to the caller and + // passes no onLaunch, so it is never double-counted here.) + sharedFetch( + onLaunch = { fetch -> + fetch.whenComplete { _, error -> + if (error != null) { + logger.atWarning() + .event("http.auth.background_refresh_failed") + .field("error.type", error::class.java.simpleName ?: "Throwable") + .cause(error) + .log() + } + } + }, + ) } /** @@ -171,28 +178,38 @@ public open class AsyncBearerTokenAuthStep * slot is cleared on completion so a subsequent refresh starts fresh. A re-check of the * cache inside the lock means a token another thread just refreshed short-circuits the * fetch. + * + * [onLaunch] is invoked exactly once, and ONLY for the caller that actually starts a new + * fetch — never for a cache hit or a joiner onto an already-in-flight fetch. It runs + * outside [lock] (so a callback it attaches never executes under the lock), which is how + * the background-refresh path attaches its failure log without it firing once per joiner. */ - private fun sharedFetch(): CompletableFuture { - lock.withLock { - // Re-read inside the lock: another thread may have just refreshed. - val now = clock.now() - cachedToken?.takeIf { !it.isExpiredAt(now, refreshMargin) } - ?.let { return CompletableFuture.completedFuture(it) } - inFlight?.let { return it } - val fetch = launchFetch() - inFlight = fetch - // Attach cache bookkeeping AFTER publishing, so the clear-on-complete callback - // compares against the exact future stored in `inFlight`. Attaching here (rather - // than inside launchFetch) sidesteps the self-reference an inline-completed future - // would otherwise need. - fetch.whenComplete { token, error -> - lock.withLock { - if (inFlight === fetch) inFlight = null - if (error == null && token != null) cachedToken = token + private fun sharedFetch( + onLaunch: ((CompletableFuture) -> Unit)? = null, + ): CompletableFuture { + val launched: CompletableFuture = + lock.withLock { + // Re-read inside the lock: another thread may have just refreshed. + val now = clock.now() + cachedToken?.takeIf { !it.isExpiredAt(now, refreshMargin) } + ?.let { return CompletableFuture.completedFuture(it) } + inFlight?.let { return it } + val fetch = launchFetch() + inFlight = fetch + // Attach cache bookkeeping AFTER publishing, so the clear-on-complete callback + // compares against the exact future stored in `inFlight`. Attaching here (rather + // than inside launchFetch) sidesteps the self-reference an inline-completed future + // would otherwise need. + fetch.whenComplete { token, error -> + lock.withLock { + if (inFlight === fetch) inFlight = null + if (error == null && token != null) cachedToken = token + } } + fetch } - return fetch - } + onLaunch?.invoke(launched) + return launched } /** diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStepTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStepTest.kt index d3ad74bd..117a1d88 100644 --- a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStepTest.kt +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStepTest.kt @@ -18,6 +18,8 @@ import org.dexpace.sdk.core.http.request.Method import org.dexpace.sdk.core.http.request.Request import org.dexpace.sdk.core.http.response.Response import org.dexpace.sdk.core.http.response.Status +import org.dexpace.sdk.core.instrumentation.ClientLogger +import org.dexpace.sdk.core.instrumentation.FakeSlf4jLogger import org.dexpace.sdk.core.io.Io import org.dexpace.sdk.core.testing.FixedClock import org.dexpace.sdk.core.util.Futures @@ -279,6 +281,68 @@ class AsyncBearerTokenAuthStepTest { assertEquals(1, call, "a Basic challenge must not trigger a bearer re-fetch + retry") } + // ----------------- Background-refresh failure ----------------- + + @Test + fun `a failed background refresh logs once across joiners and leaves the cache usable`() { + // Seed an expiring-but-valid token, then fail the background refresh while several requests + // join the SAME in-flight refresh. The failure must be logged exactly once (by the launcher), + // not once per joiner, and the cache must remain usable — a later refresh still succeeds. + val fakeSlf4j = FakeSlf4jLogger("test.auth") + val clientLogger = ClientLogger.forTesting(fakeSlf4j) + val deferred = CompletableFuture() + val seeded = CompletableFuture.completedFuture(BearerToken("old", now.plusSeconds(10))) + val fetches = AtomicInteger(0) + val provider = + object : BearerTokenProvider { + override fun fetch( + scopes: List, + params: Map, + ): BearerToken = error("blocking fetch must not be called") + + override fun fetchAsync( + scopes: List, + params: Map, + ): CompletableFuture = + when (fetches.getAndIncrement()) { + 0 -> seeded + 1 -> deferred + else -> CompletableFuture.completedFuture(BearerToken("new", now.plusSeconds(3600))) + } + } + val client = RecordingClient(200) + val step = + AsyncBearerTokenAuthStep(provider, listOf("scope"), Duration.ofSeconds(30), clock, clientLogger) + val p = AsyncHttpPipelineBuilder(client).append(step).build() + + // R1 seeds "old"; R2 launches the background refresh (deferred); R3/R4 join the same one — + // all four are stamped with the still-valid "old" token without waiting on the refresh. + repeat(4) { + assertEquals(200, p.sendAsync(getRequest()).join().status.code) + assertEquals("Bearer old", client.lastAuth) + } + assertEquals(2, fetches.get(), "R3/R4 must join the in-flight refresh, not start new fetches") + + deferred.completeExceptionally(RuntimeException("token endpoint down")) + + val failureLogs = + fakeSlf4j.records.count { rec -> + rec.keyValues.any { it.key == "event" && it.value == "http.auth.background_refresh_failed" } + } + assertEquals(1, failureLogs, "a single failed background refresh must log once, not once per joiner") + + // Cache not poisoned: the failed refresh was not cached. The next request stamps the still + // valid "old" token and starts a fresh refresh that succeeds, so a later request sees "new". + p.sendAsync(getRequest()).join() + p.sendAsync(getRequest()).join() + assertEquals("Bearer new", client.lastAuth, "the cache recovered after the failed refresh") + val failureLogsAfterRecovery = + fakeSlf4j.records.count { rec -> + rec.keyValues.any { it.key == "event" && it.value == "http.auth.background_refresh_failed" } + } + assertEquals(1, failureLogsAfterRecovery, "the successful follow-up refresh must not add another failure log") + } + // ----------------- Helpers ----------------- private fun pipeline(