diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java index 20374358317..f5c02634b1a 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; @@ -104,7 +105,7 @@ public OkHttpGrpcSender( dispatcher = OkHttpUtil.newDispatcher(); this.managedExecutor = true; } else { - dispatcher = new Dispatcher(executorService); + dispatcher = OkHttpUtil.newDispatcher(executorService); this.managedExecutor = false; } @@ -155,22 +156,26 @@ public void send( RequestBody requestBody = new GrpcRequestBody(messageWriter, compressor); requestBuilder.post(requestBody); - InstrumentationUtil.suppressInstrumentation( - () -> - client - .newCall(requestBuilder.build()) - .enqueue( - new Callback() { - @Override - public void onFailure(Call call, IOException e) { - onError.accept(e); - } - - @Override - public void onResponse(Call call, Response response) { - handleResponse(response, onResponse); - } - })); + try { + InstrumentationUtil.suppressInstrumentation( + () -> + client + .newCall(requestBuilder.build()) + .enqueue( + new Callback() { + @Override + public void onFailure(Call call, IOException e) { + onError.accept(e); + } + + @Override + public void onResponse(Call call, Response response) { + handleResponse(response, onResponse); + } + })); + } catch (RejectedExecutionException e) { + onError.accept(e); + } } private void handleResponse(Response response, Consumer onResponse) { diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java index ad35eae4a60..0522a09caa2 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.logging.Level; @@ -86,7 +87,7 @@ public OkHttpHttpSender( dispatcher = OkHttpUtil.newDispatcher(); this.managedExecutor = true; } else { - dispatcher = new Dispatcher(executorService); + dispatcher = OkHttpUtil.newDispatcher(executorService); this.managedExecutor = false; } @@ -138,22 +139,26 @@ public void send( requestBuilder.addHeader("Accept-Encoding", "gzip, identity"); requestBuilder.post(new RequestBodyImpl(messageWriter, compressor, mediaType)); - InstrumentationUtil.suppressInstrumentation( - () -> - client - .newCall(requestBuilder.build()) - .enqueue( - new Callback() { - @Override - public void onFailure(Call call, IOException e) { - onError.accept(e); - } - - @Override - public void onResponse(Call call, Response response) { - handleResponse(response, onResponse, onError); - } - })); + try { + InstrumentationUtil.suppressInstrumentation( + () -> + client + .newCall(requestBuilder.build()) + .enqueue( + new Callback() { + @Override + public void onFailure(Call call, IOException e) { + onError.accept(e); + } + + @Override + public void onResponse(Call call, Response response) { + handleResponse(response, onResponse, onError); + } + })); + } catch (RejectedExecutionException e) { + onError.accept(e); + } } private void handleResponse( diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java index 4448c717e15..5b4ce5eab56 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java @@ -7,6 +7,7 @@ import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.internal.DaemonThreadFactory; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -20,6 +21,9 @@ * at any time. */ public final class OkHttpUtil { + private static final int DEFAULT_MAX_REQUESTS = 64; + private static final int DEFAULT_MAX_REQUESTS_PER_HOST = 5; + @SuppressWarnings("NonFinalStaticField") private static boolean propagateContextForTestingInDispatcher = false; @@ -30,16 +34,23 @@ public static void setPropagateContextForTestingInDispatcher( /** Returns a {@link Dispatcher} using daemon threads, otherwise matching the OkHttp default. */ public static Dispatcher newDispatcher() { - return new Dispatcher( + return newDispatcher( new ThreadPoolExecutor( 0, - Integer.MAX_VALUE, + DEFAULT_MAX_REQUESTS, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), createThreadFactory("okhttp-dispatch"))); } + public static Dispatcher newDispatcher(ExecutorService executorService) { + Dispatcher dispatcher = new Dispatcher(executorService); + dispatcher.setMaxRequests(DEFAULT_MAX_REQUESTS); + dispatcher.setMaxRequestsPerHost(DEFAULT_MAX_REQUESTS_PER_HOST); + return dispatcher; + } + private static DaemonThreadFactory createThreadFactory(String namePrefix) { if (propagateContextForTestingInDispatcher) { return new DaemonThreadFactory( diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java index a7f314c93b5..d1172f2209a 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java @@ -5,11 +5,13 @@ package io.opentelemetry.exporter.sender.okhttp.internal; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import io.opentelemetry.exporter.internal.RetryUtil; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.export.GrpcResponse; import io.opentelemetry.sdk.common.export.GrpcStatusCode; import io.opentelemetry.sdk.common.export.MessageWriter; import java.io.IOException; @@ -21,7 +23,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import okhttp3.MediaType; import okhttp3.Protocol; import okhttp3.Request; @@ -57,6 +62,34 @@ void isRetryable_NonRetryableGrpcStatus() { assertFalse(isRetryable); } + @Test + void send_rejectedExecution_callsOnError() { + ThreadPoolExecutor executor = + new ThreadPoolExecutor(0, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<>()); + executor.shutdown(); + + OkHttpGrpcSender sender = + new OkHttpGrpcSender( + "http://localhost", + null, + Duration.ofSeconds(10), + Duration.ofSeconds(10), + Collections::emptyMap, + null, + null, + null, + executor, + Long.MAX_VALUE); + + AtomicReference responseRef = new AtomicReference<>(); + AtomicReference errorRef = new AtomicReference<>(); + + sender.send(new TestMessageWriter(), responseRef::set, errorRef::set); + + assertThat(errorRef.get()).isNotNull(); + assertThat(responseRef.get()).isNull(); + } + private static Response createResponse(int httpCode, String grpcStatus, String message) { return new Response.Builder() .request(new Request.Builder().url("http://localhost/").build()) diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderTest.java new file mode 100644 index 00000000000..a74695df9af --- /dev/null +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderTest.java @@ -0,0 +1,63 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp.internal; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.sdk.common.export.HttpResponse; +import io.opentelemetry.sdk.common.export.MessageWriter; +import java.io.OutputStream; +import java.net.URI; +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; + +class OkHttpHttpSenderTest { + + @Test + void send_rejectedExecution_callsOnError() { + ThreadPoolExecutor executor = + new ThreadPoolExecutor(0, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<>()); + executor.shutdown(); + + OkHttpHttpSender sender = + new OkHttpHttpSender( + URI.create("http://localhost"), + "text/plain", + null, + Duration.ofSeconds(10), + Duration.ofSeconds(10), + Collections::emptyMap, + null, + null, + null, + null, + executor, + Long.MAX_VALUE); + + AtomicReference responseRef = new AtomicReference<>(); + AtomicReference errorRef = new AtomicReference<>(); + + sender.send(new NoOpRequestBodyWriter(), responseRef::set, errorRef::set); + + assertThat(errorRef.get()).isNotNull(); + assertThat(responseRef.get()).isNull(); + } + + private static class NoOpRequestBodyWriter implements MessageWriter { + @Override + public void writeMessage(OutputStream output) {} + + @Override + public int getContentLength() { + return 0; + } + } +} diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtilTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtilTest.java new file mode 100644 index 00000000000..c03e660c37f --- /dev/null +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtilTest.java @@ -0,0 +1,36 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.InstanceOfAssertFactories.type; + +import java.util.concurrent.ThreadPoolExecutor; +import okhttp3.Dispatcher; +import org.junit.jupiter.api.Test; + +class OkHttpUtilTest { + + @Test + void newDispatcher_isBounded() { + Dispatcher dispatcher = OkHttpUtil.newDispatcher(); + + try { + assertThat(dispatcher.getMaxRequests()).isEqualTo(64); + assertThat(dispatcher.getMaxRequestsPerHost()).isEqualTo(5); + assertThat(dispatcher.executorService()) + .asInstanceOf(type(ThreadPoolExecutor.class)) + .satisfies( + executor -> { + assertThat(executor.getMaximumPoolSize()).isEqualTo(64); + assertThat(executor.getRejectedExecutionHandler()) + .isInstanceOf(ThreadPoolExecutor.AbortPolicy.class); + }); + } finally { + dispatcher.executorService().shutdownNow(); + } + } +}