-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathRetryingWebClient.java
More file actions
92 lines (79 loc) · 3.64 KB
/
RetryingWebClient.java
File metadata and controls
92 lines (79 loc) · 3.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package com.uid2.optout.web;
import io.netty.handler.codec.http.HttpMethod;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.BiFunction;
/**
 * Sends an HTTP request with {@link HttpClient#sendAsync} and reports the outcome through a
 * Vert.x {@link Future}. When the caller-supplied validator rejects a response, the request is
 * re-created and re-sent, up to {@code retryCount} additional times, optionally waiting
 * {@code retryBackoffMs} milliseconds (via a Vert.x timer) between attempts.
 *
 * <p>Only validator rejections trigger a retry. A transport error or a timeout of the underlying
 * {@link CompletableFuture} fails the returned future immediately.
 */
public class RetryingWebClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(RetryingWebClient.class);

    /** Timeout applied to each attempt when the caller does not specify one (5 minutes). */
    private static final long DEFAULT_RESULT_TIMEOUT_MS = 5 * 60 * 1000;

    private final URI uri;
    private final HttpMethod method;
    private final long resultTimeoutMs;
    private final int retryCount;
    private final int retryBackoffMs;
    private final HttpClient httpClient;
    private final Vertx vertx;

    /**
     * Creates a client whose per-attempt timeout is five minutes.
     *
     * @param vertx          Vert.x instance used to schedule the back-off timer
     * @param uri            target URI; parsed with {@link URI#create(String)}
     * @param method         HTTP method handed to the request creator
     * @param retryCount     maximum number of retries after the first attempt
     * @param retryBackoffMs delay before each retry in milliseconds; values {@code <= 0} retry immediately
     */
    public RetryingWebClient(Vertx vertx, String uri, HttpMethod method, int retryCount, int retryBackoffMs) {
        this(vertx, uri, method, retryCount, retryBackoffMs, DEFAULT_RESULT_TIMEOUT_MS);
    }

    /**
     * Creates a client with an explicit per-attempt timeout.
     *
     * @param vertx           Vert.x instance used to schedule the back-off timer
     * @param uri             target URI; parsed with {@link URI#create(String)}
     * @param method          HTTP method handed to the request creator
     * @param retryCount      maximum number of retries after the first attempt
     * @param retryBackoffMs  delay before each retry in milliseconds; values {@code <= 0} retry immediately
     * @param resultTimeoutMs time in milliseconds after which a pending attempt is failed
     * @throws IllegalArgumentException if {@code uri} is not a valid URI (from {@link URI#create(String)})
     */
    public RetryingWebClient(Vertx vertx, String uri, HttpMethod method, int retryCount, int retryBackoffMs, long resultTimeoutMs) {
        this.vertx = vertx;
        this.uri = URI.create(uri);
        this.method = method;
        this.resultTimeoutMs = resultTimeoutMs;
        this.httpClient = HttpClient.newHttpClient();
        this.retryCount = retryCount;
        this.retryBackoffMs = retryBackoffMs;
    }

    /**
     * Sends the request, starting with a retry counter of zero.
     *
     * @param requestCreator    builds the request from this client's URI and method; invoked once per attempt
     * @param responseValidator returns {@code true} to accept the response, {@code false} to request a retry;
     *                          returning {@code null} fails the future
     * @return a future that succeeds once a response is accepted and fails otherwise
     */
    public Future<Void> send(BiFunction<URI, HttpMethod, HttpRequest> requestCreator, Function<HttpResponse, Boolean> responseValidator) {
        return this.send(requestCreator, responseValidator, 0);
    }

    /**
     * Sends the request as attempt number {@code currentRetries}.
     *
     * <p>An exception thrown synchronously by {@code requestCreator} (or by {@code sendAsync}) on
     * this call propagates to the caller. On internally scheduled retries such an exception fails
     * the returned future instead.
     *
     * @param requestCreator    builds the request from this client's URI and method; invoked once per attempt
     * @param responseValidator returns {@code true} to accept the response, {@code false} to request a retry;
     *                          returning {@code null} fails the future
     * @param currentRetries    number of retries already performed
     * @return a future that succeeds once a response is accepted; it fails with
     *         {@code TooManyRetriesException} when the validator rejects the response and
     *         {@code currentRetries} has reached the retry limit, or with the underlying error otherwise
     */
    public Future<Void> send(BiFunction<URI, HttpMethod, HttpRequest> requestCreator, Function<HttpResponse, Boolean> responseValidator, int currentRetries) {
        Promise<Void> promise = Promise.promise();

        HttpRequest request = requestCreator.apply(this.uri, this.method);
        CompletableFuture<HttpResponse<String>> asyncResponse = this.httpClient
                .sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .orTimeout(this.resultTimeoutMs, TimeUnit.MILLISECONDS);

        asyncResponse.thenAccept(response -> {
            try {
                Boolean responseOK = responseValidator.apply(response);
                if (responseOK == null) {
                    throw new RuntimeException("Response validator returned null");
                }

                if (responseOK) {
                    promise.complete();
                } else if (currentRetries < this.retryCount) {
                    LOGGER.error("failed sending to {}, currentRetries: {}, backing off before retrying", this.uri, currentRetries);
                    if (this.retryBackoffMs > 0) {
                        // The timer callback runs outside this try/catch, so retry() must guard itself.
                        this.vertx.setTimer(this.retryBackoffMs, timerId ->
                                retry(requestCreator, responseValidator, currentRetries + 1, promise));
                    } else {
                        retry(requestCreator, responseValidator, currentRetries + 1, promise);
                    }
                } else {
                    LOGGER.error("retry count exceeded for sending to {}", this.uri);
                    throw new TooManyRetriesException(currentRetries);
                }
            } catch (Throwable ex) {
                promise.fail(ex);
            }
        });

        // Transport failures and timeouts are reported as-is; they are not retried.
        asyncResponse.exceptionally(ex -> {
            promise.fail(ex);
            return null;
        });

        return promise.future();
    }

    /**
     * Runs the next attempt and forwards its outcome to {@code promise}. A synchronous failure
     * while starting the attempt fails {@code promise} so that the caller's future always completes.
     */
    private void retry(BiFunction<URI, HttpMethod, HttpRequest> requestCreator, Function<HttpResponse, Boolean> responseValidator, int nextRetries, Promise<Void> promise) {
        try {
            send(requestCreator, responseValidator, nextRetries)
                    .onComplete(ar -> promise.handle(ar));
        } catch (Throwable ex) {
            promise.fail(ex);
        }
    }
}