Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -104,7 +105,7 @@ public OkHttpGrpcSender(
dispatcher = OkHttpUtil.newDispatcher();
this.managedExecutor = true;
} else {
dispatcher = new Dispatcher(executorService);
dispatcher = OkHttpUtil.newDispatcher(executorService);
this.managedExecutor = false;
}

Expand Down Expand Up @@ -155,22 +156,26 @@ public void send(
RequestBody requestBody = new GrpcRequestBody(messageWriter, compressor);
requestBuilder.post(requestBody);

InstrumentationUtil.suppressInstrumentation(
() ->
client
.newCall(requestBuilder.build())
.enqueue(
new Callback() {
@Override
public void onFailure(Call call, IOException e) {
onError.accept(e);
}

@Override
public void onResponse(Call call, Response response) {
handleResponse(response, onResponse);
}
}));
try {
InstrumentationUtil.suppressInstrumentation(
() ->
client
.newCall(requestBuilder.build())
.enqueue(
new Callback() {
@Override
public void onFailure(Call call, IOException e) {
onError.accept(e);
}

@Override
public void onResponse(Call call, Response response) {
handleResponse(response, onResponse);
}
}));
} catch (RejectedExecutionException e) {
onError.accept(e);
}
}

private void handleResponse(Response response, Consumer<GrpcResponse> onResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
Expand Down Expand Up @@ -86,7 +87,7 @@ public OkHttpHttpSender(
dispatcher = OkHttpUtil.newDispatcher();
this.managedExecutor = true;
} else {
dispatcher = new Dispatcher(executorService);
dispatcher = OkHttpUtil.newDispatcher(executorService);
this.managedExecutor = false;
}

Expand Down Expand Up @@ -138,22 +139,26 @@ public void send(
requestBuilder.addHeader("Accept-Encoding", "gzip, identity");
requestBuilder.post(new RequestBodyImpl(messageWriter, compressor, mediaType));

InstrumentationUtil.suppressInstrumentation(
() ->
client
.newCall(requestBuilder.build())
.enqueue(
new Callback() {
@Override
public void onFailure(Call call, IOException e) {
onError.accept(e);
}

@Override
public void onResponse(Call call, Response response) {
handleResponse(response, onResponse, onError);
}
}));
try {
InstrumentationUtil.suppressInstrumentation(
() ->
client
.newCall(requestBuilder.build())
.enqueue(
new Callback() {
@Override
public void onFailure(Call call, IOException e) {
onError.accept(e);
}

@Override
public void onResponse(Call call, Response response) {
handleResponse(response, onResponse, onError);
}
}));
} catch (RejectedExecutionException e) {
onError.accept(e);
}
}

private void handleResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.internal.DaemonThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -20,6 +21,9 @@
* at any time.
*/
public final class OkHttpUtil {
private static final int DEFAULT_MAX_REQUESTS = 64;
private static final int DEFAULT_MAX_REQUESTS_PER_HOST = 5;

@SuppressWarnings("NonFinalStaticField")
private static boolean propagateContextForTestingInDispatcher = false;

Expand All @@ -30,16 +34,23 @@ public static void setPropagateContextForTestingInDispatcher(

/** Returns a {@link Dispatcher} using daemon threads, otherwise matching the OkHttp default. */
public static Dispatcher newDispatcher() {
return new Dispatcher(
return newDispatcher(
new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
DEFAULT_MAX_REQUESTS,
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
createThreadFactory("okhttp-dispatch")));
}

public static Dispatcher newDispatcher(ExecutorService executorService) {
Dispatcher dispatcher = new Dispatcher(executorService);
dispatcher.setMaxRequests(DEFAULT_MAX_REQUESTS);
dispatcher.setMaxRequestsPerHost(DEFAULT_MAX_REQUESTS_PER_HOST);
return dispatcher;
}

private static DaemonThreadFactory createThreadFactory(String namePrefix) {
if (propagateContextForTestingInDispatcher) {
return new DaemonThreadFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

package io.opentelemetry.exporter.sender.okhttp.internal;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.opentelemetry.exporter.internal.RetryUtil;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.GrpcResponse;
import io.opentelemetry.sdk.common.export.GrpcStatusCode;
import io.opentelemetry.sdk.common.export.MessageWriter;
import java.io.IOException;
Expand All @@ -21,7 +23,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.MediaType;
import okhttp3.Protocol;
import okhttp3.Request;
Expand Down Expand Up @@ -57,6 +62,34 @@ void isRetryable_NonRetryableGrpcStatus() {
assertFalse(isRetryable);
}

@Test
void send_rejectedExecution_callsOnError() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(0, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
executor.shutdown();

OkHttpGrpcSender sender =
new OkHttpGrpcSender(
"http://localhost",
null,
Duration.ofSeconds(10),
Duration.ofSeconds(10),
Collections::emptyMap,
null,
null,
null,
executor,
Long.MAX_VALUE);

AtomicReference<GrpcResponse> responseRef = new AtomicReference<>();
AtomicReference<Throwable> errorRef = new AtomicReference<>();

sender.send(new TestMessageWriter(), responseRef::set, errorRef::set);

assertThat(errorRef.get()).isNotNull();
assertThat(responseRef.get()).isNull();
}

private static Response createResponse(int httpCode, String grpcStatus, String message) {
return new Response.Builder()
.request(new Request.Builder().url("http://localhost/").build())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.sender.okhttp.internal;

import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.sdk.common.export.HttpResponse;
import io.opentelemetry.sdk.common.export.MessageWriter;
import java.io.OutputStream;
import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;

class OkHttpHttpSenderTest {

@Test
void send_rejectedExecution_callsOnError() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(0, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
executor.shutdown();

OkHttpHttpSender sender =
new OkHttpHttpSender(
URI.create("http://localhost"),
"text/plain",
null,
Duration.ofSeconds(10),
Duration.ofSeconds(10),
Collections::emptyMap,
null,
null,
null,
null,
executor,
Long.MAX_VALUE);

AtomicReference<HttpResponse> responseRef = new AtomicReference<>();
AtomicReference<Throwable> errorRef = new AtomicReference<>();

sender.send(new NoOpRequestBodyWriter(), responseRef::set, errorRef::set);

assertThat(errorRef.get()).isNotNull();
assertThat(responseRef.get()).isNull();
}

private static class NoOpRequestBodyWriter implements MessageWriter {
@Override
public void writeMessage(OutputStream output) {}

@Override
public int getContentLength() {
return 0;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.sender.okhttp.internal;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.InstanceOfAssertFactories.type;

import java.util.concurrent.ThreadPoolExecutor;
import okhttp3.Dispatcher;
import org.junit.jupiter.api.Test;

class OkHttpUtilTest {

@Test
void newDispatcher_isBounded() {
Dispatcher dispatcher = OkHttpUtil.newDispatcher();

try {
assertThat(dispatcher.getMaxRequests()).isEqualTo(64);
assertThat(dispatcher.getMaxRequestsPerHost()).isEqualTo(5);
assertThat(dispatcher.executorService())
.asInstanceOf(type(ThreadPoolExecutor.class))
.satisfies(
executor -> {
assertThat(executor.getMaximumPoolSize()).isEqualTo(64);
assertThat(executor.getRejectedExecutionHandler())
.isInstanceOf(ThreadPoolExecutor.AbortPolicy.class);
});
} finally {
dispatcher.executorService().shutdownNow();
}
}
}
Loading