Skip to content

Commit 95b07a9

Browse files
committed
Expose terminal error as a CompletionStage that completes with the failure cause or null on success.
1 parent 0fac29b commit 95b07a9

2 files changed

Lines changed: 44 additions & 8 deletions

File tree

httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.ArrayList;
3232
import java.util.Collections;
3333
import java.util.List;
34+
import java.util.concurrent.CancellationException;
3435
import java.util.concurrent.CompletableFuture;
3536
import java.util.concurrent.CompletionStage;
3637
import java.util.concurrent.Future;
@@ -67,6 +68,12 @@ public final class ReactiveResponseConsumer implements AsyncResponseConsumer<Voi
6768
private final CompletableFuture<Message<HttpResponse, Publisher<ByteBuffer>>> responseCompletableFuture;
6869
private final CompletableFuture<Void> responseCompletionFuture;
6970

71+
/**
72+
* Completes with {@code null} on success and with the terminal {@link Throwable} on failure.
73+
* This future never completes exceptionally.
74+
*/
75+
private final CompletableFuture<Throwable> failureFuture;
76+
7077
private volatile BasicFuture<Void> responseCompletion;
7178
private volatile HttpResponse informationResponse;
7279
private volatile EntityDetails entityDetails;
@@ -78,6 +85,7 @@ public ReactiveResponseConsumer() {
7885
this.responseFuture = new BasicFuture<>(null);
7986
this.responseCompletableFuture = new CompletableFuture<>();
8087
this.responseCompletionFuture = new CompletableFuture<>();
88+
this.failureFuture = new CompletableFuture<>();
8189
}
8290

8391
/**
@@ -90,6 +98,7 @@ public ReactiveResponseConsumer(final FutureCallback<Message<HttpResponse, Publi
9098
this.responseFuture = new BasicFuture<>(Args.notNull(responseCallback, "responseCallback"));
9199
this.responseCompletableFuture = new CompletableFuture<>();
92100
this.responseCompletionFuture = new CompletableFuture<>();
101+
this.failureFuture = new CompletableFuture<>();
93102
}
94103

95104
public Future<Message<HttpResponse, Publisher<ByteBuffer>>> getResponseFuture() {
@@ -136,6 +145,16 @@ public CompletionStage<Void> getResponseCompletionStage() {
136145
return responseCompletionFuture;
137146
}
138147

148+
/**
149+
* Completes with {@code null} on success and with the terminal {@link Throwable} on failure.
150+
* This stage never completes exceptionally.
151+
*
152+
* @since 5.5
153+
*/
154+
public CompletionStage<Throwable> getFailureStage() {
155+
return failureFuture;
156+
}
157+
139158
/**
140159
* Returns the intermediate (1xx) HTTP response if one was received.
141160
*
@@ -165,10 +184,10 @@ public List<Header> getTrailers() {
165184

166185
@Override
167186
public void consumeResponse(
168-
final HttpResponse response,
169-
final EntityDetails entityDetails,
170-
final HttpContext httpContext,
171-
final FutureCallback<Void> resultCallback
187+
final HttpResponse response,
188+
final EntityDetails entityDetails,
189+
final HttpContext httpContext,
190+
final FutureCallback<Void> resultCallback
172191
) {
173192
this.entityDetails = entityDetails;
174193
this.responseCompletion = new BasicFuture<>(resultCallback);
@@ -190,12 +209,13 @@ public void informationResponse(final HttpResponse response, final HttpContext h
190209
@Override
191210
public void failed(final Exception cause) {
192211
reactiveDataConsumer.failed(cause);
193-
194-
// Complete stage/futures regardless of whether consumeResponse() has been invoked yet.
195212
responseFuture.failed(cause);
196213
responseCompletableFuture.completeExceptionally(cause);
197214
responseCompletionFuture.completeExceptionally(cause);
198215

216+
// Record failure as a normal completion value.
217+
failureFuture.complete(cause);
218+
199219
final BasicFuture<Void> completion = responseCompletion;
200220
if (completion != null) {
201221
completion.failed(cause);
@@ -222,6 +242,9 @@ public void streamEnd(final List<? extends Header> trailers) {
222242
// Complete CF before BasicFuture.completed(...) (it may trigger releaseResources()).
223243
responseCompletionFuture.complete(null);
224244

245+
// Success => no failure.
246+
failureFuture.complete(null);
247+
225248
final BasicFuture<Void> completion = responseCompletion;
226249
if (completion != null) {
227250
completion.completed(null);
@@ -241,6 +264,10 @@ public void releaseResources() {
241264
responseCompletionFuture.cancel(true);
242265
}
243266

267+
if (!failureFuture.isDone()) {
268+
failureFuture.complete(new CancellationException());
269+
}
270+
244271
final BasicFuture<Void> completion = responseCompletion;
245272
if (completion != null) {
246273
completion.cancel();

httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveClientCompletionStageExample.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@
3939
import java.util.concurrent.TimeUnit;
4040
import java.util.concurrent.TimeoutException;
4141

42-
import io.reactivex.rxjava3.core.Flowable;
43-
import io.reactivex.rxjava3.core.Observable;
4442
import org.apache.hc.core5.http.ContentType;
4543
import org.apache.hc.core5.http.Header;
4644
import org.apache.hc.core5.http.HttpConnection;
@@ -60,6 +58,9 @@
6058
import org.apache.hc.core5.util.Timeout;
6159
import org.reactivestreams.Publisher;
6260

61+
import io.reactivex.rxjava3.core.Flowable;
62+
import io.reactivex.rxjava3.core.Observable;
63+
6364
/**
6465
* Client demo using CompletionStage accessors on ReactiveResponseConsumer (Java 8).
6566
*/
@@ -190,6 +191,14 @@ public void onExchangeComplete(final HttpConnection connection, final boolean ke
190191
final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
191192
requester.execute(requestProducer, consumer, Timeout.ofSeconds(30), null);
192193

194+
consumer.getFailureStage().whenComplete((t, ex) -> {
195+
if (ex != null) {
196+
ex.printStackTrace();
197+
} else if (t != null) {
198+
System.out.println("Request failed: " + t);
199+
}
200+
});
201+
193202
final CompletableFuture<Void> printedAndDrained = new CompletableFuture<>();
194203

195204
consumer.getResponseStage().whenComplete((msg, ex) -> {

0 commit comments

Comments
 (0)