Skip to content

Commit d0885eb

Browse files
feat(client): expose sleeper option
fix(client): ensure single timer is created per client
1 parent f302bf3 commit d0885eb

File tree

8 files changed

+158
-49
lines changed

8 files changed

+158
-49
lines changed

braintrust-java-client-okhttp/src/main/kotlin/com/braintrustdata/api/client/okhttp/BraintrustOkHttpClient.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package com.braintrustdata.api.client.okhttp
55
import com.braintrustdata.api.client.BraintrustClient
66
import com.braintrustdata.api.client.BraintrustClientImpl
77
import com.braintrustdata.api.core.ClientOptions
8+
import com.braintrustdata.api.core.Sleeper
89
import com.braintrustdata.api.core.Timeout
910
import com.braintrustdata.api.core.http.AsyncStreamResponse
1011
import com.braintrustdata.api.core.http.Headers
@@ -133,6 +134,17 @@ class BraintrustOkHttpClient private constructor() {
133134
clientOptions.streamHandlerExecutor(streamHandlerExecutor)
134135
}
135136

137+
/**
138+
* The interface to use for delaying execution, like during retries.
139+
*
140+
* This is primarily useful for using fake delays in tests.
141+
*
142+
* Defaults to real execution delays.
143+
*
144+
* This class takes ownership of the sleeper and closes it when closed.
145+
*/
146+
fun sleeper(sleeper: Sleeper) = apply { clientOptions.sleeper(sleeper) }
147+
136148
/**
137149
* The clock to use for operations that require timing, like retries.
138150
*

braintrust-java-client-okhttp/src/main/kotlin/com/braintrustdata/api/client/okhttp/BraintrustOkHttpClientAsync.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package com.braintrustdata.api.client.okhttp
55
import com.braintrustdata.api.client.BraintrustClientAsync
66
import com.braintrustdata.api.client.BraintrustClientAsyncImpl
77
import com.braintrustdata.api.core.ClientOptions
8+
import com.braintrustdata.api.core.Sleeper
89
import com.braintrustdata.api.core.Timeout
910
import com.braintrustdata.api.core.http.AsyncStreamResponse
1011
import com.braintrustdata.api.core.http.Headers
@@ -133,6 +134,17 @@ class BraintrustOkHttpClientAsync private constructor() {
133134
clientOptions.streamHandlerExecutor(streamHandlerExecutor)
134135
}
135136

137+
/**
138+
* The interface to use for delaying execution, like during retries.
139+
*
140+
* This is primarily useful for using fake delays in tests.
141+
*
142+
* Defaults to real execution delays.
143+
*
144+
* This class takes ownership of the sleeper and closes it when closed.
145+
*/
146+
fun sleeper(sleeper: Sleeper) = apply { clientOptions.sleeper(sleeper) }
147+
136148
/**
137149
* The clock to use for operations that require timing, like retries.
138150
*

braintrust-java-core/src/main/kotlin/com/braintrustdata/api/core/ClientOptions.kt

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,16 @@ private constructor(
5454
* This class takes ownership of the executor and shuts it down, if possible, when closed.
5555
*/
5656
@get:JvmName("streamHandlerExecutor") val streamHandlerExecutor: Executor,
57+
/**
58+
* The interface to use for delaying execution, like during retries.
59+
*
60+
* This is primarily useful for using fake delays in tests.
61+
*
62+
* Defaults to real execution delays.
63+
*
64+
* This class takes ownership of the sleeper and closes it when closed.
65+
*/
66+
@get:JvmName("sleeper") val sleeper: Sleeper,
5767
/**
5868
* The clock to use for operations that require timing, like retries.
5969
*
@@ -146,6 +156,7 @@ private constructor(
146156
private var checkJacksonVersionCompatibility: Boolean = true
147157
private var jsonMapper: JsonMapper = jsonMapper()
148158
private var streamHandlerExecutor: Executor? = null
159+
private var sleeper: Sleeper? = null
149160
private var clock: Clock = Clock.systemUTC()
150161
private var baseUrl: String? = null
151162
private var headers: Headers.Builder = Headers.builder()
@@ -161,6 +172,7 @@ private constructor(
161172
checkJacksonVersionCompatibility = clientOptions.checkJacksonVersionCompatibility
162173
jsonMapper = clientOptions.jsonMapper
163174
streamHandlerExecutor = clientOptions.streamHandlerExecutor
175+
sleeper = clientOptions.sleeper
164176
clock = clientOptions.clock
165177
baseUrl = clientOptions.baseUrl
166178
headers = clientOptions.headers.toBuilder()
@@ -215,6 +227,17 @@ private constructor(
215227
else streamHandlerExecutor
216228
}
217229

230+
/**
231+
* The interface to use for delaying execution, like during retries.
232+
*
233+
* This is primarily useful for using fake delays in tests.
234+
*
235+
* Defaults to real execution delays.
236+
*
237+
* This class takes ownership of the sleeper and closes it when closed.
238+
*/
239+
fun sleeper(sleeper: Sleeper) = apply { this.sleeper = PhantomReachableSleeper(sleeper) }
240+
218241
/**
219242
* The clock to use for operations that require timing, like retries.
220243
*
@@ -399,6 +422,25 @@ private constructor(
399422
*/
400423
fun build(): ClientOptions {
401424
val httpClient = checkRequired("httpClient", httpClient)
425+
val streamHandlerExecutor =
426+
streamHandlerExecutor
427+
?: PhantomReachableExecutorService(
428+
Executors.newCachedThreadPool(
429+
object : ThreadFactory {
430+
431+
private val threadFactory: ThreadFactory =
432+
Executors.defaultThreadFactory()
433+
private val count = AtomicLong(0)
434+
435+
override fun newThread(runnable: Runnable): Thread =
436+
threadFactory.newThread(runnable).also {
437+
it.name =
438+
"braintrust-stream-handler-thread-${count.getAndIncrement()}"
439+
}
440+
}
441+
)
442+
)
443+
val sleeper = sleeper ?: PhantomReachableSleeper(DefaultSleeper())
402444

403445
val headers = Headers.builder()
404446
val queryParams = QueryParams.builder()
@@ -421,26 +463,14 @@ private constructor(
421463
httpClient,
422464
RetryingHttpClient.builder()
423465
.httpClient(httpClient)
466+
.sleeper(sleeper)
424467
.clock(clock)
425468
.maxRetries(maxRetries)
426469
.build(),
427470
checkJacksonVersionCompatibility,
428471
jsonMapper,
429-
streamHandlerExecutor
430-
?: Executors.newCachedThreadPool(
431-
object : ThreadFactory {
432-
433-
private val threadFactory: ThreadFactory =
434-
Executors.defaultThreadFactory()
435-
private val count = AtomicLong(0)
436-
437-
override fun newThread(runnable: Runnable): Thread =
438-
threadFactory.newThread(runnable).also {
439-
it.name =
440-
"braintrust-stream-handler-thread-${count.getAndIncrement()}"
441-
}
442-
}
443-
),
472+
streamHandlerExecutor,
473+
sleeper,
444474
clock,
445475
baseUrl,
446476
headers.build(),
@@ -466,5 +496,6 @@ private constructor(
466496
fun close() {
467497
httpClient.close()
468498
(streamHandlerExecutor as? ExecutorService)?.shutdown()
499+
sleeper.close()
469500
}
470501
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.braintrustdata.api.core
2+
3+
import java.time.Duration
4+
import java.util.Timer
5+
import java.util.TimerTask
6+
import java.util.concurrent.CompletableFuture
7+
8+
class DefaultSleeper : Sleeper {
9+
10+
private val timer = Timer("DefaultSleeper", true)
11+
12+
override fun sleep(duration: Duration) = Thread.sleep(duration.toMillis())
13+
14+
override fun sleepAsync(duration: Duration): CompletableFuture<Void> {
15+
val future = CompletableFuture<Void>()
16+
timer.schedule(
17+
object : TimerTask() {
18+
override fun run() {
19+
future.complete(null)
20+
}
21+
},
22+
duration.toMillis(),
23+
)
24+
return future
25+
}
26+
27+
override fun close() = timer.cancel()
28+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.braintrustdata.api.core
2+
3+
import java.time.Duration
4+
import java.util.concurrent.CompletableFuture
5+
6+
/**
7+
* A delegating wrapper around a [Sleeper] that closes it once it's only phantom reachable.
8+
*
9+
* This class ensures the [Sleeper] is closed even if the user forgets to do it.
10+
*/
11+
internal class PhantomReachableSleeper(private val sleeper: Sleeper) : Sleeper {
12+
13+
init {
14+
closeWhenPhantomReachable(this, sleeper)
15+
}
16+
17+
override fun sleep(duration: Duration) = sleeper.sleep(duration)
18+
19+
override fun sleepAsync(duration: Duration): CompletableFuture<Void> =
20+
sleeper.sleepAsync(duration)
21+
22+
override fun close() = sleeper.close()
23+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.braintrustdata.api.core
2+
3+
import java.time.Duration
4+
import java.util.concurrent.CompletableFuture
5+
6+
/**
7+
* An interface for delaying execution for a specified amount of time.
8+
*
9+
* Useful for testing and cleaning up resources.
10+
*/
11+
interface Sleeper : AutoCloseable {
12+
13+
/** Synchronously pauses execution for the given [duration]. */
14+
fun sleep(duration: Duration)
15+
16+
/** Asynchronously pauses execution for the given [duration]. */
17+
fun sleepAsync(duration: Duration): CompletableFuture<Void>
18+
19+
/** Overridden from [AutoCloseable] to not have a checked exception in its signature. */
20+
override fun close()
21+
}

braintrust-java-core/src/main/kotlin/com/braintrustdata/api/core/http/RetryingHttpClient.kt

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.braintrustdata.api.core.http
22

3+
import com.braintrustdata.api.core.DefaultSleeper
34
import com.braintrustdata.api.core.RequestOptions
5+
import com.braintrustdata.api.core.Sleeper
46
import com.braintrustdata.api.core.checkRequired
57
import com.braintrustdata.api.errors.BraintrustIoException
68
import com.braintrustdata.api.errors.BraintrustRetryableException
@@ -11,8 +13,6 @@ import java.time.OffsetDateTime
1113
import java.time.format.DateTimeFormatter
1214
import java.time.format.DateTimeParseException
1315
import java.time.temporal.ChronoUnit
14-
import java.util.Timer
15-
import java.util.TimerTask
1616
import java.util.UUID
1717
import java.util.concurrent.CompletableFuture
1818
import java.util.concurrent.ThreadLocalRandom
@@ -130,7 +130,10 @@ private constructor(
130130
return executeWithRetries(modifiedRequest, requestOptions)
131131
}
132132

133-
override fun close() = httpClient.close()
133+
override fun close() {
134+
httpClient.close()
135+
sleeper.close()
136+
}
134137

135138
private fun isRetryable(request: HttpRequest): Boolean =
136139
// Some requests, such as when a request body is being streamed, cannot be retried because
@@ -235,33 +238,14 @@ private constructor(
235238
class Builder internal constructor() {
236239

237240
private var httpClient: HttpClient? = null
238-
private var sleeper: Sleeper =
239-
object : Sleeper {
240-
241-
private val timer = Timer("RetryingHttpClient", true)
242-
243-
override fun sleep(duration: Duration) = Thread.sleep(duration.toMillis())
244-
245-
override fun sleepAsync(duration: Duration): CompletableFuture<Void> {
246-
val future = CompletableFuture<Void>()
247-
timer.schedule(
248-
object : TimerTask() {
249-
override fun run() {
250-
future.complete(null)
251-
}
252-
},
253-
duration.toMillis(),
254-
)
255-
return future
256-
}
257-
}
241+
private var sleeper: Sleeper? = null
258242
private var clock: Clock = Clock.systemUTC()
259243
private var maxRetries: Int = 2
260244
private var idempotencyHeader: String? = null
261245

262246
fun httpClient(httpClient: HttpClient) = apply { this.httpClient = httpClient }
263247

264-
@JvmSynthetic internal fun sleeper(sleeper: Sleeper) = apply { this.sleeper = sleeper }
248+
fun sleeper(sleeper: Sleeper) = apply { this.sleeper = sleeper }
265249

266250
fun clock(clock: Clock) = apply { this.clock = clock }
267251

@@ -272,17 +256,10 @@ private constructor(
272256
fun build(): HttpClient =
273257
RetryingHttpClient(
274258
checkRequired("httpClient", httpClient),
275-
sleeper,
259+
sleeper ?: DefaultSleeper(),
276260
clock,
277261
maxRetries,
278262
idempotencyHeader,
279263
)
280264
}
281-
282-
internal interface Sleeper {
283-
284-
fun sleep(duration: Duration)
285-
286-
fun sleepAsync(duration: Duration): CompletableFuture<Void>
287-
}
288265
}

braintrust-java-core/src/test/kotlin/com/braintrustdata/api/core/http/RetryingHttpClientTest.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package com.braintrustdata.api.core.http
22

33
import com.braintrustdata.api.client.okhttp.OkHttpClient
44
import com.braintrustdata.api.core.RequestOptions
5+
import com.braintrustdata.api.core.Sleeper
56
import com.braintrustdata.api.errors.BraintrustRetryableException
67
import com.github.tomakehurst.wiremock.client.WireMock.*
78
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo
@@ -294,12 +295,14 @@ internal class RetryingHttpClientTest {
294295
.httpClient(failingHttpClient)
295296
.maxRetries(2)
296297
.sleeper(
297-
object : RetryingHttpClient.Sleeper {
298+
object : Sleeper {
298299

299300
override fun sleep(duration: Duration) {}
300301

301302
override fun sleepAsync(duration: Duration): CompletableFuture<Void> =
302303
CompletableFuture.completedFuture(null)
304+
305+
override fun close() {}
303306
}
304307
)
305308
.build()
@@ -333,12 +336,14 @@ internal class RetryingHttpClientTest {
333336
.httpClient(httpClient)
334337
// Use a no-op `Sleeper` to make the test fast.
335338
.sleeper(
336-
object : RetryingHttpClient.Sleeper {
339+
object : Sleeper {
337340

338341
override fun sleep(duration: Duration) {}
339342

340343
override fun sleepAsync(duration: Duration): CompletableFuture<Void> =
341344
CompletableFuture.completedFuture(null)
345+
346+
override fun close() {}
342347
}
343348
)
344349

0 commit comments

Comments
 (0)