diff --git a/http-clients/aws-crt-client/pom.xml b/http-clients/aws-crt-client/pom.xml
index 3090185d8878..0039eca8e844 100644
--- a/http-clients/aws-crt-client/pom.xml
+++ b/http-clients/aws-crt-client/pom.xml
@@ -173,6 +173,31 @@
mockito-junit-jupiter
test
+
+ io.netty
+ netty-codec-http2
+ test
+
+
+ io.netty
+ netty-common
+ test
+
+
+ io.netty
+ netty-transport
+ test
+
+
+ io.netty
+ netty-codec-http
+ test
+
+
+ io.netty
+ netty-handler
+ test
+
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java
index 08b9a5fdc7e8..dce28e5d1cfe 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java
@@ -22,7 +22,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
-import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
+import software.amazon.awssdk.crt.http.HttpStreamManager;
+import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
@@ -91,15 +92,15 @@ public CompletableFuture execute(AsyncExecuteRequest asyncRequest) {
* we have a pool and no one can destroy it underneath us until we've finished submitting the
* request)
*/
- try (HttpClientConnectionManager crtConnPool = getOrCreateConnectionPool(poolKey(asyncRequest.request()))) {
- CrtAsyncRequestContext context = CrtAsyncRequestContext.builder()
- .crtConnPool(crtConnPool)
- .readBufferSize(this.readBufferSize)
- .request(asyncRequest)
- .build();
-
- return new CrtAsyncRequestExecutor().execute(context);
- }
+ HttpStreamManager crtConnPool = getOrCreateConnectionPool(poolKey(asyncRequest.request()));
+ CrtAsyncRequestContext context = CrtAsyncRequestContext.builder()
+ .crtConnPool(crtConnPool)
+ .readBufferSize(this.readBufferSize)
+ .request(asyncRequest)
+ .protocol(this.protocol)
+ .build();
+
+ return new CrtAsyncRequestExecutor().execute(context);
}
/**
@@ -222,6 +223,14 @@ AwsCrtAsyncHttpClient.Builder connectionHealthConfiguration(Consumer
tcpKeepAliveConfigurationBuilder);
+ /**
+ * Configure the HTTP protocol version to use for connections.
+ *
+ * @param protocol the HTTP protocol version
+ * @return The builder for method chaining.
+ */
+ AwsCrtAsyncHttpClient.Builder protocol(Protocol protocol);
+
/**
* Configure whether to enable a hybrid post-quantum key exchange option for the Transport Layer Security (TLS) network
* encryption protocol when communicating with services that support Post Quantum TLS. If Post Quantum cipher suites are
@@ -246,6 +255,13 @@ AwsCrtAsyncHttpClient.Builder tcpKeepAliveConfiguration(Consumer implements Builder {
+
+ @Override
+ public Builder protocol(Protocol protocol) {
+ getAttributeMap().put(SdkHttpConfigurationOption.PROTOCOL, protocol);
+ return this;
+ }
+
@Override
public SdkAsyncHttpClient build() {
return new AwsCrtAsyncHttpClient(this, getAttributeMap().build()
@@ -258,5 +274,6 @@ public SdkAsyncHttpClient buildWithDefaults(AttributeMap serviceDefaults) {
.merge(serviceDefaults)
.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS));
}
+
}
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java
index 4270b47862e5..075db14e2af1 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java
@@ -23,11 +23,12 @@
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
-import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.HttpException;
+import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.HttpExecuteResponse;
+import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.SdkHttpFullResponse;
@@ -56,6 +57,9 @@ public final class AwsCrtHttpClient extends AwsCrtHttpClientBase implements SdkH
private AwsCrtHttpClient(DefaultBuilder builder, AttributeMap config) {
super(builder, config);
+ if (this.protocol == Protocol.HTTP2) {
+ throw new UnsupportedOperationException("HTTP/2 is not supported in sync client. Use AwsCrtAsyncHttpClient instead.");
+ }
}
public static AwsCrtHttpClient.Builder builder() {
@@ -91,14 +95,13 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
* we have a pool and no one can destroy it underneath us until we've finished submitting the
* request)
*/
- try (HttpClientConnectionManager crtConnPool = getOrCreateConnectionPool(poolKey(request.httpRequest()))) {
- CrtRequestContext context = CrtRequestContext.builder()
- .crtConnPool(crtConnPool)
- .readBufferSize(this.readBufferSize)
- .request(request)
- .build();
- return new CrtHttpRequest(context);
- }
+ HttpStreamManager crtConnPool = getOrCreateConnectionPool(poolKey(request.httpRequest()));
+ CrtRequestContext context = CrtRequestContext.builder()
+ .crtConnPool(crtConnPool)
+ .readBufferSize(this.readBufferSize)
+ .request(request)
+ .build();
+ return new CrtHttpRequest(context);
}
private static final class CrtHttpRequest implements ExecutableHttpRequest {
@@ -140,7 +143,7 @@ public HttpExecuteResponse call() throws IOException {
@Override
public void abort() {
if (responseFuture != null) {
- responseFuture.completeExceptionally(new IOException("Request ws cancelled"));
+ responseFuture.completeExceptionally(new IOException("Request was cancelled"));
}
}
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java
index 2df865f0fa0b..228a6086eddf 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java
@@ -28,10 +28,13 @@
import java.util.concurrent.ConcurrentHashMap;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.crt.CrtResource;
-import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
+import software.amazon.awssdk.crt.http.Http2StreamManagerOptions;
import software.amazon.awssdk.crt.http.HttpClientConnectionManagerOptions;
import software.amazon.awssdk.crt.http.HttpMonitoringOptions;
import software.amazon.awssdk.crt.http.HttpProxyOptions;
+import software.amazon.awssdk.crt.http.HttpStreamManager;
+import software.amazon.awssdk.crt.http.HttpStreamManagerOptions;
+import software.amazon.awssdk.crt.http.HttpVersion;
import software.amazon.awssdk.crt.io.ClientBootstrap;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.io.TlsContext;
@@ -57,7 +60,8 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {
private static final long DEFAULT_STREAM_WINDOW_SIZE = 16L * 1024L * 1024L; // 16 MB
protected final long readBufferSize;
- private final Map connectionPools = new ConcurrentHashMap<>();
+ protected final Protocol protocol;
+ private final Map connectionPools = new ConcurrentHashMap<>();
private final LinkedList ownedSubResources = new LinkedList<>();
private final ClientBootstrap bootstrap;
private final SocketOptions socketOptions;
@@ -67,34 +71,35 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {
private final long maxConnectionIdleInMilliseconds;
private final int maxConnectionsPerEndpoint;
private final long connectionAcquisitionTimeout;
+ private final TlsContextOptions tlsContextOptions;
private boolean isClosed = false;
AwsCrtHttpClientBase(AwsCrtClientBuilderBase builder, AttributeMap config) {
- if (config.get(PROTOCOL) == Protocol.HTTP2) {
- throw new UnsupportedOperationException("HTTP/2 is not supported in AwsCrtHttpClient yet. Use "
- + "NettyNioAsyncHttpClient instead.");
+ ClientBootstrap clientBootstrap = new ClientBootstrap(null, null);
+ SocketOptions clientSocketOptions = buildSocketOptions(builder.getTcpKeepAliveConfiguration(),
+ config.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT));
+ TlsContextOptions clientTlsContextOptions =
+ TlsContextOptions.createDefaultClient()
+ .withCipherPreference(resolveCipherPreference(builder.getPostQuantumTlsEnabled()))
+ .withVerifyPeer(!config.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES));
+ this.protocol = config.get(PROTOCOL);
+ if (protocol == Protocol.HTTP2) {
+ clientTlsContextOptions = clientTlsContextOptions.withAlpnList("h2");
}
- try (ClientBootstrap clientBootstrap = new ClientBootstrap(null, null);
- SocketOptions clientSocketOptions = buildSocketOptions(builder.getTcpKeepAliveConfiguration(),
- config.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT));
- TlsContextOptions clientTlsContextOptions =
- TlsContextOptions.createDefaultClient()
- .withCipherPreference(resolveCipherPreference(builder.getPostQuantumTlsEnabled()))
- .withVerifyPeer(!config.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES));
- TlsContext clientTlsContext = new TlsContext(clientTlsContextOptions)) {
-
- this.bootstrap = registerOwnedResource(clientBootstrap);
- this.socketOptions = registerOwnedResource(clientSocketOptions);
- this.tlsContext = registerOwnedResource(clientTlsContext);
- this.readBufferSize = builder.getReadBufferSizeInBytes() == null ?
- DEFAULT_STREAM_WINDOW_SIZE : builder.getReadBufferSizeInBytes();
- this.maxConnectionsPerEndpoint = config.get(SdkHttpConfigurationOption.MAX_CONNECTIONS);
- this.monitoringOptions = resolveHttpMonitoringOptions(builder.getConnectionHealthConfiguration()).orElse(null);
- this.maxConnectionIdleInMilliseconds = config.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toMillis();
- this.connectionAcquisitionTimeout = config.get(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT).toMillis();
- this.proxyOptions = resolveProxy(builder.getProxyConfiguration(), tlsContext).orElse(null);
- }
+ this.tlsContextOptions = registerOwnedResource(clientTlsContextOptions);
+ TlsContext clientTlsContext = new TlsContext(clientTlsContextOptions);
+
+ this.bootstrap = registerOwnedResource(clientBootstrap);
+ this.socketOptions = registerOwnedResource(clientSocketOptions);
+ this.tlsContext = registerOwnedResource(clientTlsContext);
+ this.readBufferSize = builder.getReadBufferSizeInBytes() == null ?
+ DEFAULT_STREAM_WINDOW_SIZE : builder.getReadBufferSizeInBytes();
+ this.maxConnectionsPerEndpoint = config.get(SdkHttpConfigurationOption.MAX_CONNECTIONS);
+ this.monitoringOptions = resolveHttpMonitoringOptions(builder.getConnectionHealthConfiguration()).orElse(null);
+ this.maxConnectionIdleInMilliseconds = config.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toMillis();
+ this.connectionAcquisitionTimeout = config.get(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT).toMillis();
+ this.proxyOptions = resolveProxy(builder.getProxyConfiguration(), tlsContext).orElse(null);
}
/**
@@ -106,7 +111,6 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {
*/
private T registerOwnedResource(T subresource) {
if (subresource != null) {
- subresource.addRef();
ownedSubResources.push(subresource);
}
return subresource;
@@ -116,13 +120,16 @@ String clientName() {
return AWS_COMMON_RUNTIME;
}
- private HttpClientConnectionManager createConnectionPool(URI uri) {
+ private HttpStreamManager createConnectionPool(URI uri) {
log.debug(() -> "Creating ConnectionPool for: URI:" + uri + ", MaxConns: " + maxConnectionsPerEndpoint);
- HttpClientConnectionManagerOptions options = new HttpClientConnectionManagerOptions()
+ boolean isHttps = "https".equalsIgnoreCase(uri.getScheme());
+ TlsContext poolTlsContext = isHttps ? tlsContext : null;
+
+ HttpClientConnectionManagerOptions h1Options = new HttpClientConnectionManagerOptions()
.withClientBootstrap(bootstrap)
.withSocketOptions(socketOptions)
- .withTlsContext(tlsContext)
+ .withTlsContext(poolTlsContext)
.withUri(uri)
.withWindowSize(readBufferSize)
.withMaxConnections(maxConnectionsPerEndpoint)
@@ -132,7 +139,24 @@ private HttpClientConnectionManager createConnectionPool(URI uri) {
.withMaxConnectionIdleInMilliseconds(maxConnectionIdleInMilliseconds)
.withConnectionAcquisitionTimeoutInMilliseconds(connectionAcquisitionTimeout);
- return HttpClientConnectionManager.create(options);
+ HttpStreamManagerOptions options = new HttpStreamManagerOptions()
+ .withHTTP1ConnectionManagerOptions(h1Options);
+
+ if (protocol == Protocol.HTTP2) {
+ Http2StreamManagerOptions h2Options = new Http2StreamManagerOptions()
+ .withConnectionManagerOptions(h1Options);
+
+ if (!isHttps) {
+ h2Options.withPriorKnowledge(true);
+ }
+
+ options.withHTTP2StreamManagerOptions(h2Options);
+ options.withExpectedProtocol(HttpVersion.HTTP_2);
+ } else {
+ options.withExpectedProtocol(HttpVersion.HTTP_1_1);
+ }
+
+ return HttpStreamManager.create(options);
}
/*
@@ -150,14 +174,13 @@ private HttpClientConnectionManager createConnectionPool(URI uri) {
* existing pool. If we add all of execute() to the scope, we include, at minimum a JNI call to the native
* pool implementation.
*/
- HttpClientConnectionManager getOrCreateConnectionPool(URI uri) {
+ HttpStreamManager getOrCreateConnectionPool(URI uri) {
synchronized (this) {
if (isClosed) {
throw new IllegalStateException("Client is closed. No more requests can be made with this client.");
}
- HttpClientConnectionManager connPool = connectionPools.computeIfAbsent(uri, this::createConnectionPool);
- connPool.addRef();
+ HttpStreamManager connPool = connectionPools.computeIfAbsent(uri, this::createConnectionPool);
return connPool;
}
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestContext.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestContext.java
index 69db7cb3c5b9..83c751d282fa 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestContext.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestContext.java
@@ -16,7 +16,8 @@
package software.amazon.awssdk.http.crt.internal;
import software.amazon.awssdk.annotations.SdkInternalApi;
-import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
+import software.amazon.awssdk.crt.http.HttpStreamManager;
+import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.metrics.MetricCollector;
@@ -24,14 +25,16 @@
public final class CrtAsyncRequestContext {
private final AsyncExecuteRequest request;
private final long readBufferSize;
- private final HttpClientConnectionManager crtConnPool;
+ private final HttpStreamManager crtConnPool;
private final MetricCollector metricCollector;
+ private final Protocol protocol;
private CrtAsyncRequestContext(Builder builder) {
this.request = builder.request;
this.readBufferSize = builder.readBufferSize;
this.crtConnPool = builder.crtConnPool;
this.metricCollector = request.metricCollector().orElse(null);
+ this.protocol = builder.protocol;
}
public static Builder builder() {
@@ -46,7 +49,11 @@ public long readBufferSize() {
return readBufferSize;
}
- public HttpClientConnectionManager crtConnPool() {
+ public Protocol protocol() {
+ return protocol;
+ }
+
+ public HttpStreamManager crtConnPool() {
return crtConnPool;
}
@@ -57,7 +64,8 @@ public MetricCollector metricCollector() {
public static final class Builder {
private AsyncExecuteRequest request;
private long readBufferSize;
- private HttpClientConnectionManager crtConnPool;
+ private HttpStreamManager crtConnPool;
+ private Protocol protocol;
private Builder() {
}
@@ -72,13 +80,19 @@ public Builder readBufferSize(long readBufferSize) {
return this;
}
- public Builder crtConnPool(HttpClientConnectionManager crtConnPool) {
+ public Builder crtConnPool(HttpStreamManager crtConnPool) {
this.crtConnPool = crtConnPool;
return this;
}
+ public Builder protocol(Protocol protocol) {
+ this.protocol = protocol;
+ return this;
+ }
+
public CrtAsyncRequestContext build() {
return new CrtAsyncRequestContext(this);
}
+
}
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java
index 4ede09d7ee3e..2b3eb7f562b8 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java
@@ -16,21 +16,17 @@
package software.amazon.awssdk.http.crt.internal;
import static software.amazon.awssdk.http.crt.internal.CrtUtils.reportMetrics;
-import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapConnectionFailureException;
-import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapWithIoExceptionIfRetryable;
+import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapCrtException;
+import static software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter.toAsyncCrtRequest;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;
-import software.amazon.awssdk.crt.CrtRuntimeException;
-import software.amazon.awssdk.crt.http.HttpClientConnection;
-import software.amazon.awssdk.crt.http.HttpException;
-import software.amazon.awssdk.crt.http.HttpRequest;
-import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
+import software.amazon.awssdk.crt.http.HttpRequestBase;
+import software.amazon.awssdk.crt.http.HttpStreamBase;
+import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
-import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter;
import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter;
import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.metrics.NoOpMetricCollector;
@@ -42,8 +38,21 @@ public final class CrtAsyncRequestExecutor {
private static final Logger log = Logger.loggerFor(CrtAsyncRequestExecutor.class);
public CompletableFuture execute(CrtAsyncRequestContext executionContext) {
- // go ahead and get a reference to the metricCollector since multiple futures will
- // need it regardless.
+ AsyncExecuteRequest asyncRequest = executionContext.sdkRequest();
+ CompletableFuture requestFuture = createAsyncExecutionFuture(asyncRequest);
+
+ try {
+ doExecute(executionContext, asyncRequest, requestFuture);
+ } catch (Throwable t) {
+ reportAsyncFailure(t, requestFuture, asyncRequest.responseHandler());
+ }
+
+ return requestFuture;
+ }
+
+ private void doExecute(CrtAsyncRequestContext executionContext,
+ AsyncExecuteRequest asyncRequest,
+ CompletableFuture requestFuture) {
MetricCollector metricCollector = executionContext.metricCollector();
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);
@@ -56,65 +65,31 @@ public CompletableFuture execute(CrtAsyncRequestContext executionContext)
acquireStartTime = System.nanoTime();
}
- CompletableFuture requestFuture = createAsyncExecutionFuture(executionContext.sdkRequest());
+ HttpRequestBase crtRequest = toAsyncCrtRequest(executionContext);
- // When a Connection is ready from the Connection Pool, schedule the Request on the connection
- CompletableFuture httpClientConnectionCompletableFuture =
- executionContext.crtConnPool().acquireConnection();
+ HttpStreamBaseResponseHandler crtResponseHandler =
+ CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler());
- long finalAcquireStartTime = acquireStartTime;
+ CompletableFuture streamFuture =
+ executionContext.crtConnPool().acquireStream(crtRequest, crtResponseHandler);
- httpClientConnectionCompletableFuture.whenComplete((crtConn, throwable) -> {
- AsyncExecuteRequest asyncRequest = executionContext.sdkRequest();
+ long finalAcquireStartTime = acquireStartTime;
+ streamFuture.whenComplete((stream, throwable) -> {
if (shouldPublishMetrics) {
reportMetrics(executionContext.crtConnPool(), metricCollector, finalAcquireStartTime);
}
- // If we didn't get a connection for some reason, fail the request
if (throwable != null) {
- Throwable toThrow = wrapConnectionFailureException(throwable);
- reportAsyncFailure(crtConn, toThrow, requestFuture, asyncRequest.responseHandler());
- return;
+ Throwable toThrow = wrapCrtException(throwable);
+ reportAsyncFailure(toThrow, requestFuture, asyncRequest.responseHandler());
}
-
- executeRequest(executionContext, requestFuture, crtConn, asyncRequest);
});
-
- return requestFuture;
- }
-
- private void executeRequest(CrtAsyncRequestContext executionContext,
- CompletableFuture requestFuture,
- HttpClientConnection crtConn,
- AsyncExecuteRequest asyncRequest) {
- // Submit the request on the connection
- try {
- HttpRequest crtRequest = CrtRequestAdapter.toAsyncCrtRequest(executionContext);
- HttpStreamResponseHandler crtResponseHandler =
- CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, asyncRequest.responseHandler());
-
- crtConn.makeRequest(crtRequest, crtResponseHandler).activate();
- } catch (HttpException e) {
- Throwable toThrow = wrapWithIoExceptionIfRetryable(e);
- reportAsyncFailure(crtConn,
- toThrow,
- requestFuture,
- asyncRequest.responseHandler());
- } catch (IllegalStateException | CrtRuntimeException e) {
- // CRT throws IllegalStateException if the connection is closed
- reportAsyncFailure(crtConn, new IOException("An exception occurred when making the request", e),
- requestFuture,
- asyncRequest.responseHandler());
- } catch (Throwable throwable) {
- reportAsyncFailure(crtConn, throwable,
- requestFuture,
- asyncRequest.responseHandler());
- }
}
/**
* Create the execution future and set up the cancellation logic.
+ *
* @return The created execution future.
*/
private CompletableFuture createAsyncExecutionFuture(AsyncExecuteRequest request) {
@@ -137,14 +112,9 @@ private CompletableFuture createAsyncExecutionFuture(AsyncExecuteRequest r
/**
* Notify the provided response handler and future of the failure.
*/
- private void reportAsyncFailure(HttpClientConnection crtConn,
- Throwable cause,
- CompletableFuture executeFuture,
- SdkAsyncHttpResponseHandler responseHandler) {
- if (crtConn != null) {
- crtConn.close();
- }
-
+ private void reportAsyncFailure(Throwable cause,
+ CompletableFuture executeFuture,
+ SdkAsyncHttpResponseHandler responseHandler) {
try {
responseHandler.onError(cause);
} catch (Exception e) {
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java
index 420f12e31f44..507e9a6e7165 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java
@@ -16,7 +16,7 @@
package software.amazon.awssdk.http.crt.internal;
import software.amazon.awssdk.annotations.SdkInternalApi;
-import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
+import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.metrics.MetricCollector;
@@ -24,7 +24,7 @@
public final class CrtRequestContext {
private final HttpExecuteRequest request;
private final long readBufferSize;
- private final HttpClientConnectionManager crtConnPool;
+ private final HttpStreamManager crtConnPool;
private final MetricCollector metricCollector;
private CrtRequestContext(Builder builder) {
@@ -46,7 +46,7 @@ public long readBufferSize() {
return readBufferSize;
}
- public HttpClientConnectionManager crtConnPool() {
+ public HttpStreamManager crtConnPool() {
return crtConnPool;
}
@@ -57,7 +57,7 @@ public MetricCollector metricCollector() {
public static final class Builder {
private HttpExecuteRequest request;
private long readBufferSize;
- private HttpClientConnectionManager crtConnPool;
+ private HttpStreamManager crtConnPool;
private Builder() {
}
@@ -72,7 +72,7 @@ public Builder readBufferSize(long readBufferSize) {
return this;
}
- public Builder crtConnPool(HttpClientConnectionManager crtConnPool) {
+ public Builder crtConnPool(HttpStreamManager crtConnPool) {
this.crtConnPool = crtConnPool;
return this;
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java
index c3b2f4e8410d..f0b976a457e1 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java
@@ -16,17 +16,13 @@
package software.amazon.awssdk.http.crt.internal;
import static software.amazon.awssdk.http.crt.internal.CrtUtils.reportMetrics;
-import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapConnectionFailureException;
-import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapWithIoExceptionIfRetryable;
+import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapCrtException;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;
-import software.amazon.awssdk.crt.CrtRuntimeException;
-import software.amazon.awssdk.crt.http.HttpClientConnection;
-import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpRequest;
-import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
+import software.amazon.awssdk.crt.http.HttpStreamBase;
+import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter;
import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
@@ -37,8 +33,18 @@
public final class CrtRequestExecutor {
public CompletableFuture execute(CrtRequestContext executionContext) {
- // go ahead and get a reference to the metricCollector since multiple futures will
- // need it regardless.
+ CompletableFuture requestFuture = new CompletableFuture<>();
+
+ try {
+ doExecute(executionContext, requestFuture);
+ } catch (Throwable t) {
+ requestFuture.completeExceptionally(t);
+ }
+
+ return requestFuture;
+ }
+
+ private void doExecute(CrtRequestContext executionContext, CompletableFuture requestFuture) {
MetricCollector metricCollector = executionContext.metricCollector();
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);
@@ -51,65 +57,24 @@ public CompletableFuture execute(CrtRequestContext executio
acquireStartTime = System.nanoTime();
}
- CompletableFuture requestFuture = new CompletableFuture<>();
+ HttpStreamBaseResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(requestFuture);
+
+ HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext);
- // When a Connection is ready from the Connection Pool, schedule the Request on the connection
- CompletableFuture httpClientConnectionCompletableFuture =
- executionContext.crtConnPool().acquireConnection();
+ CompletableFuture streamFuture =
+ executionContext.crtConnPool().acquireStream(crtRequest, crtResponseHandler);
long finalAcquireStartTime = acquireStartTime;
- httpClientConnectionCompletableFuture.whenComplete((crtConn, throwable) -> {
+ streamFuture.whenComplete((streamBase, throwable) -> {
if (shouldPublishMetrics) {
reportMetrics(executionContext.crtConnPool(), metricCollector, finalAcquireStartTime);
}
- // If we didn't get a connection for some reason, fail the request
if (throwable != null) {
- Throwable toThrow = wrapConnectionFailureException(throwable);
+ Throwable toThrow = wrapCrtException(throwable);
requestFuture.completeExceptionally(toThrow);
- return;
}
-
- executeRequest(executionContext, requestFuture, crtConn);
});
-
- return requestFuture;
- }
-
- private void executeRequest(CrtRequestContext executionContext,
- CompletableFuture requestFuture,
- HttpClientConnection crtConn) {
- HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext);
-
- try {
- HttpStreamResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(crtConn,
- requestFuture);
-
- // Submit the request on the connection
- crtConn.makeRequest(crtRequest, crtResponseHandler).activate();
- } catch (Throwable throwable) {
- handleException(requestFuture, crtConn, throwable);
- }
- }
-
- private static void handleException(CompletableFuture requestFuture, HttpClientConnection crtConn,
- Throwable throwable) {
-
- crtConn.close();
-
- if (throwable instanceof HttpException) {
- Throwable toThrow = wrapWithIoExceptionIfRetryable((HttpException) throwable);
- requestFuture.completeExceptionally(toThrow);
- return;
- }
-
- if (throwable instanceof CrtRuntimeException || throwable instanceof IllegalStateException) {
- // CRT throws IllegalStateException if the connection is closed
- requestFuture.completeExceptionally(new IOException(throwable));
- return;
- }
-
- requestFuture.completeExceptionally(throwable);
}
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtUtils.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtUtils.java
index 62e9c4156814..da69685c9ecb 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtUtils.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtUtils.java
@@ -25,12 +25,14 @@
import java.io.IOException;
import java.net.ConnectException;
import java.time.Duration;
+import java.util.concurrent.CompletionException;
import javax.net.ssl.SSLHandshakeException;
import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.crt.CrtRuntimeException;
import software.amazon.awssdk.crt.http.HttpClientConnection;
-import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpManagerMetrics;
+import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.metrics.MetricCollector;
@SdkInternalApi
@@ -52,30 +54,40 @@ public static Throwable wrapWithIoExceptionIfRetryable(HttpException httpExcepti
return toThrow;
}
- public static Throwable wrapConnectionFailureException(Throwable throwable) {
- Throwable toThrow = new IOException("An exception occurred when acquiring a connection", throwable);
+ public static Throwable wrapCrtException(Throwable throwable) {
+ if (throwable instanceof CompletionException) {
+ throwable = throwable.getCause();
+ }
if (throwable instanceof HttpException) {
HttpException httpException = (HttpException) throwable;
int httpErrorCode = httpException.getErrorCode();
if (httpErrorCode == CRT_TLS_NEGOTIATION_ERROR_CODE) {
- toThrow = new SSLHandshakeException(httpException.getMessage());
- } else if (httpErrorCode == CRT_SOCKET_TIMEOUT) {
- toThrow = new ConnectException(httpException.getMessage());
+ return new SSLHandshakeException(httpException.getMessage());
+ }
+ // TODO: check with CRT team, could CRT_SOCKET_TIMEOUT be thrown
+ // from processes other than tcp connect?
+ if (httpErrorCode == CRT_SOCKET_TIMEOUT) {
+ return new ConnectException(httpException.getMessage());
}
+
+ return wrapWithIoExceptionIfRetryable((HttpException) throwable);
}
- return toThrow;
+ if (throwable instanceof IllegalStateException || throwable instanceof CrtRuntimeException) {
+ // CRT throws IllegalStateException if the connection is closed
+ return new IOException("An exception occurred when making the request", throwable);
+ }
+
+ return throwable;
}
- public static void reportMetrics(HttpClientConnectionManager connManager, MetricCollector metricCollector,
- long acquireStartTime) {
+ public static void reportMetrics(HttpStreamManager connManager, MetricCollector metricCollector,
+ long acquireStartTime) {
long acquireCompletionTime = System.nanoTime();
Duration acquireTimeTaken = Duration.ofNanos(acquireCompletionTime - acquireStartTime);
metricCollector.reportMetric(CONCURRENCY_ACQUIRE_DURATION, acquireTimeTaken);
HttpManagerMetrics managerMetrics = connManager.getManagerMetrics();
- // currently this executor only handles HTTP 1.1. Until H2 is added, the max concurrency settings are 1:1 with TCP
- // connections. When H2 is added, this code needs to be updated to handle stream multiplexing
metricCollector.reportMetric(MAX_CONCURRENCY, connManager.getMaxConnections());
metricCollector.reportMetric(AVAILABLE_CONCURRENCY, saturatedCast(managerMetrics.getAvailableConcurrency()));
metricCollector.reportMetric(LEASED_CONCURRENCY, saturatedCast(managerMetrics.getLeasedConcurrency()));
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java
index 10b658a835df..8672d80b0d1b 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java
@@ -22,8 +22,10 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpRequest;
+import software.amazon.awssdk.crt.http.HttpRequestBase;
import software.amazon.awssdk.http.Header;
import software.amazon.awssdk.http.HttpExecuteRequest;
+import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.crt.internal.CrtAsyncRequestContext;
@@ -34,7 +36,7 @@ public final class CrtRequestAdapter {
private CrtRequestAdapter() {
}
- public static HttpRequest toAsyncCrtRequest(CrtAsyncRequestContext request) {
+ public static HttpRequestBase toAsyncCrtRequest(CrtAsyncRequestContext request) {
AsyncExecuteRequest sdkExecuteRequest = request.sdkRequest();
SdkHttpRequest sdkRequest = sdkExecuteRequest.request();
@@ -47,14 +49,15 @@ public static HttpRequest toAsyncCrtRequest(CrtAsyncRequestContext request) {
String encodedQueryString = sdkRequest.encodedQueryParameters()
.map(value -> "?" + value)
.orElse("");
-
- HttpHeader[] crtHeaderArray = asArray(createAsyncHttpHeaderList(sdkRequest.getUri(), sdkExecuteRequest));
-
+ String path = encodedPath + encodedQueryString;
+ CrtRequestBodyAdapter crtRequestBodyAdapter = new CrtRequestBodyAdapter(sdkExecuteRequest.requestContentPublisher(),
+ request.readBufferSize());
+ HttpHeader[] crtHeaderArray = asArray(createAsyncHttpHeaderList(sdkRequest.getUri(), sdkExecuteRequest,
+ request.protocol()));
return new HttpRequest(method,
- encodedPath + encodedQueryString,
+ path,
crtHeaderArray,
- new CrtRequestBodyAdapter(sdkExecuteRequest.requestContentPublisher(),
- request.readBufferSize()));
+ crtRequestBodyAdapter);
}
public static HttpRequest toCrtRequest(CrtRequestContext request) {
@@ -89,7 +92,8 @@ private static HttpHeader[] asArray(List crtHeaderList) {
return crtHeaderList.toArray(new HttpHeader[0]);
}
- private static List createAsyncHttpHeaderList(URI uri, AsyncExecuteRequest sdkExecuteRequest) {
+ private static List createAsyncHttpHeaderList(URI uri, AsyncExecuteRequest sdkExecuteRequest,
+ Protocol protocol) {
SdkHttpRequest sdkRequest = sdkExecuteRequest.request();
// worst case we may add 3 more headers here
List crtHeaderList = new ArrayList<>(sdkRequest.numHeaders() + 3);
@@ -99,8 +103,8 @@ private static List createAsyncHttpHeaderList(URI uri, AsyncExecuteR
crtHeaderList.add(new HttpHeader(Header.HOST, uri.getHost()));
}
- // Add Connection Keep Alive Header to reuse this Http Connection as long as possible
- if (!sdkRequest.firstMatchingHeader(Header.CONNECTION).isPresent()) {
+ // Add Connection Keep Alive Header for HTTP/1.1 only (forbidden in HTTP/2 per RFC 7540)
+ if (protocol != Protocol.HTTP2 && !sdkRequest.firstMatchingHeader(Header.CONNECTION).isPresent()) {
crtHeaderList.add(new HttpHeader(Header.CONNECTION, Header.KEEP_ALIVE_VALUE));
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java
index d32c9e2ac228..cac60a75fea9 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java
@@ -22,12 +22,11 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.crt.CRT;
-import software.amazon.awssdk.crt.http.HttpClientConnection;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
-import software.amazon.awssdk.crt.http.HttpStream;
-import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
+import software.amazon.awssdk.crt.http.HttpStreamBase;
+import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
@@ -41,49 +40,44 @@
* Implements the CrtHttpStreamHandler API and converts CRT callbacks into calls to SDK AsyncExecuteRequest methods
*/
@SdkInternalApi
-public final class CrtResponseAdapter implements HttpStreamResponseHandler {
+public final class CrtResponseAdapter implements HttpStreamBaseResponseHandler {
private static final Logger log = Logger.loggerFor(CrtResponseAdapter.class);
private final CompletableFuture completionFuture;
private final SdkAsyncHttpResponseHandler responseHandler;
private final SimplePublisher responsePublisher;
-
private final SdkHttpResponse.Builder responseBuilder;
private final ResponseHandlerHelper responseHandlerHelper;
- private CrtResponseAdapter(HttpClientConnection connection,
- CompletableFuture completionFuture,
+ private CrtResponseAdapter(CompletableFuture completionFuture,
SdkAsyncHttpResponseHandler responseHandler) {
- this(connection, completionFuture, responseHandler, new SimplePublisher<>());
+ this(completionFuture, responseHandler, new SimplePublisher<>());
}
-
@SdkTestInternalApi
- public CrtResponseAdapter(HttpClientConnection connection,
- CompletableFuture completionFuture,
- SdkAsyncHttpResponseHandler responseHandler,
- SimplePublisher simplePublisher) {
- Validate.paramNotNull(connection, "connection");
+ public CrtResponseAdapter(CompletableFuture completionFuture,
+ SdkAsyncHttpResponseHandler responseHandler,
+ SimplePublisher simplePublisher) {
this.completionFuture = Validate.paramNotNull(completionFuture, "completionFuture");
this.responseHandler = Validate.paramNotNull(responseHandler, "responseHandler");
this.responseBuilder = SdkHttpResponse.builder();
- this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder, connection);
this.responsePublisher = simplePublisher;
+ this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder);
}
- public static HttpStreamResponseHandler toCrtResponseHandler(HttpClientConnection crtConn,
- CompletableFuture requestFuture,
- SdkAsyncHttpResponseHandler responseHandler) {
- return new CrtResponseAdapter(crtConn, requestFuture, responseHandler);
+ public static HttpStreamBaseResponseHandler toCrtResponseHandler(
+ CompletableFuture requestFuture,
+ SdkAsyncHttpResponseHandler responseHandler) {
+ return new CrtResponseAdapter(requestFuture, responseHandler);
}
@Override
- public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, HttpHeader[] nextHeaders) {
- responseHandlerHelper.onResponseHeaders(responseStatusCode, blockType, nextHeaders);
+ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) {
+ responseHandlerHelper.onResponseHeaders(stream, responseStatusCode, headerType, nextHeaders);
}
@Override
- public void onResponseHeadersDone(HttpStream stream, int headerType) {
+ public void onResponseHeadersDone(HttpStreamBase stream, int headerType) {
if (headerType == HttpHeaderBlock.MAIN.getValue()) {
responseHandler.onHeaders(responseBuilder.build());
responseHandler.onStream(responsePublisher);
@@ -91,7 +85,7 @@ public void onResponseHeadersDone(HttpStream stream, int headerType) {
}
@Override
- public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
+ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
CompletableFuture writeFuture = responsePublisher.send(ByteBuffer.wrap(bodyBytesIn));
if (writeFuture.isDone() && !writeFuture.isCompletedExceptionally()) {
@@ -101,49 +95,43 @@ public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
writeFuture.whenComplete((result, failure) -> {
if (failure != null) {
- handlePublisherError(stream, failure);
+ failResponseHandlerAndFuture(failure);
+ responseHandlerHelper.closeStream();
return;
}
-
- responseHandlerHelper.incrementWindow(stream, bodyBytesIn.length);
+ responseHandlerHelper.incrementWindow(bodyBytesIn.length);
});
return 0;
}
@Override
- public void onResponseComplete(HttpStream stream, int errorCode) {
+ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
if (errorCode == CRT.AWS_CRT_SUCCESS) {
- onSuccessfulResponseComplete(stream);
+ onSuccessfulResponseComplete();
} else {
- onFailedResponseComplete(stream, new HttpException(errorCode));
+ onFailedResponseComplete(new HttpException(errorCode));
}
}
- private void onSuccessfulResponseComplete(HttpStream stream) {
+ private void onSuccessfulResponseComplete() {
responsePublisher.complete().whenComplete((result, failure) -> {
if (failure != null) {
- handlePublisherError(stream, failure);
+ failResponseHandlerAndFuture(failure);
+ responseHandlerHelper.closeStream();
return;
}
completionFuture.complete(null);
});
-
- responseHandlerHelper.releaseConnection(stream);
+ responseHandlerHelper.closeStream();
}
- private void handlePublisherError(HttpStream stream, Throwable failure) {
- failResponseHandlerAndFuture(failure);
- responseHandlerHelper.closeConnection(stream);
- }
-
- private void onFailedResponseComplete(HttpStream stream, HttpException error) {
+ private void onFailedResponseComplete(HttpException error) {
log.debug(() -> "HTTP response encountered an error.", error);
-
- Throwable toThrow = wrapWithIoExceptionIfRetryable(error);;
+ Throwable toThrow = wrapWithIoExceptionIfRetryable(error);
responsePublisher.error(toThrow);
failResponseHandlerAndFuture(toThrow);
- responseHandlerHelper.closeConnection(stream);
+ responseHandlerHelper.closeStream();
}
private void failResponseHandlerAndFuture(Throwable error) {
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java
index ba3a91dcdeca..1bee55cdc9ce 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java
@@ -22,12 +22,10 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.crt.CRT;
-import software.amazon.awssdk.crt.http.HttpClientConnection;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpHeader;
-import software.amazon.awssdk.crt.http.HttpHeaderBlock;
-import software.amazon.awssdk.crt.http.HttpStream;
-import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
+import software.amazon.awssdk.crt.http.HttpStreamBase;
+import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.SdkHttpResponse;
@@ -40,54 +38,45 @@
* Response handler adaptor for {@link AwsCrtHttpClient}.
*/
@SdkInternalApi
-public final class InputStreamAdaptingHttpStreamResponseHandler implements HttpStreamResponseHandler {
+public final class InputStreamAdaptingHttpStreamResponseHandler implements HttpStreamBaseResponseHandler {
private static final Logger log = Logger.loggerFor(InputStreamAdaptingHttpStreamResponseHandler.class);
private volatile AbortableInputStreamSubscriber inputStreamSubscriber;
private final SimplePublisher simplePublisher;
-
private final CompletableFuture requestCompletionFuture;
-
private final SdkHttpFullResponse.Builder responseBuilder;
private final ResponseHandlerHelper responseHandlerHelper;
- public InputStreamAdaptingHttpStreamResponseHandler(HttpClientConnection crtConn,
- CompletableFuture requestCompletionFuture) {
- this(crtConn, requestCompletionFuture, new SimplePublisher<>());
+ public InputStreamAdaptingHttpStreamResponseHandler(CompletableFuture requestCompletionFuture) {
+ this(requestCompletionFuture, new SimplePublisher<>());
}
@SdkTestInternalApi
- public InputStreamAdaptingHttpStreamResponseHandler(HttpClientConnection crtConn,
- CompletableFuture requestCompletionFuture,
+ public InputStreamAdaptingHttpStreamResponseHandler(CompletableFuture requestCompletionFuture,
SimplePublisher simplePublisher) {
this.requestCompletionFuture = requestCompletionFuture;
this.responseBuilder = SdkHttpResponse.builder();
- this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder, crtConn);
this.simplePublisher = simplePublisher;
+ this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder);
}
@Override
- public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType,
+ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
HttpHeader[] nextHeaders) {
- if (blockType == HttpHeaderBlock.MAIN.getValue()) {
- for (HttpHeader h : nextHeaders) {
- responseBuilder.appendHeader(h.getName(), h.getValue());
- }
- responseBuilder.statusCode(responseStatusCode);
- }
+ responseHandlerHelper.onResponseHeaders(stream, responseStatusCode, blockType, nextHeaders);
// Propagate cancellation
requestCompletionFuture.exceptionally(t -> {
- responseHandlerHelper.closeConnection(stream);
+ responseHandlerHelper.closeStream();
return null;
});
}
@Override
- public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
+ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
if (inputStreamSubscriber == null) {
inputStreamSubscriber =
AbortableInputStreamSubscriber.builder()
- .doAfterClose(() -> responseHandlerHelper.closeConnection(stream))
+ .doAfterClose(() -> responseHandlerHelper.closeStream())
.build();
simplePublisher.subscribe(inputStreamSubscriber);
// For response with a payload, we need to complete the future here to allow downstream to retrieve the data from
@@ -107,11 +96,11 @@ public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
if (failure != null) {
log.debug(() -> "The subscriber failed to receive the data, closing the connection and failing the future",
failure);
- failFutureAndCloseConnection(stream, failure);
+ requestCompletionFuture.completeExceptionally(failure);
+ responseHandlerHelper.closeStream();
return;
}
- // increment the window upon buffer consumption.
- responseHandlerHelper.incrementWindow(stream, bodyBytesIn.length);
+ responseHandlerHelper.incrementWindow(bodyBytesIn.length);
});
// Window will be incremented after the subscriber consumes the data, returning 0 here to disable it.
@@ -119,34 +108,28 @@ public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
}
@Override
- public void onResponseComplete(HttpStream stream, int errorCode) {
+ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
if (errorCode == CRT.AWS_CRT_SUCCESS) {
- onSuccessfulResponseComplete(stream);
+ onSuccessfulResponseComplete();
} else {
- onFailedResponseComplete(stream, errorCode);
+ onFailedResponseComplete(errorCode);
}
}
- private void failFutureAndCloseConnection(HttpStream stream, Throwable failure) {
- requestCompletionFuture.completeExceptionally(failure);
- responseHandlerHelper.closeConnection(stream);
- }
-
- private void onFailedResponseComplete(HttpStream stream, int errorCode) {
- Throwable toThrow =
- wrapWithIoExceptionIfRetryable(new HttpException(errorCode));
-
+ private void onFailedResponseComplete(int errorCode) {
+ Throwable toThrow = wrapWithIoExceptionIfRetryable(new HttpException(errorCode));
simplePublisher.error(toThrow);
- failFutureAndCloseConnection(stream, toThrow);
+ requestCompletionFuture.completeExceptionally(toThrow);
+ responseHandlerHelper.closeStream();
}
- private void onSuccessfulResponseComplete(HttpStream stream) {
+ private void onSuccessfulResponseComplete() {
// For response without a payload, for example, S3 PutObjectResponse, we need to complete the future
// in onResponseComplete callback since onResponseBody will never be invoked.
- requestCompletionFuture.complete(responseBuilder.build());
+ requestCompletionFuture.complete(responseBuilder.build());
// requestCompletionFuture has been completed at this point, no need to notify the future
simplePublisher.complete();
- responseHandlerHelper.releaseConnection(stream);
+ responseHandlerHelper.closeStream();
}
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java
index 6da44090aa5f..4b2b4a1c4626 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java
@@ -16,35 +16,34 @@
package software.amazon.awssdk.http.crt.internal.response;
import software.amazon.awssdk.annotations.SdkInternalApi;
-import software.amazon.awssdk.crt.http.HttpClientConnection;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
-import software.amazon.awssdk.crt.http.HttpStream;
+import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.http.SdkHttpResponse;
/**
* This is the helper class that contains common logic shared between {@link CrtResponseAdapter} and
* {@link InputStreamAdaptingHttpStreamResponseHandler}.
*
- * CRT connection will only be closed, i.e., not reused, in one of the following conditions:
- * 1. 5xx server error OR
- * 2. It fails to read the response OR
- * 3. the response stream is closed/aborted by the caller.
*/
@SdkInternalApi
public class ResponseHandlerHelper {
private final SdkHttpResponse.Builder responseBuilder;
- private final HttpClientConnection connection;
- private boolean connectionClosed;
- private final Object lock = new Object();
+ private HttpStreamBase stream;
+ private boolean streamClosed;
+ private final Object streamLock = new Object();
- public ResponseHandlerHelper(SdkHttpResponse.Builder responseBuilder, HttpClientConnection connection) {
+ public ResponseHandlerHelper(SdkHttpResponse.Builder responseBuilder) {
this.responseBuilder = responseBuilder;
- this.connection = connection;
}
- public void onResponseHeaders(int responseStatusCode, int headerType, HttpHeader[] nextHeaders) {
+ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) {
+ synchronized (streamLock) {
+ if (this.stream == null) {
+ this.stream = stream;
+ }
+ }
if (headerType == HttpHeaderBlock.MAIN.getValue()) {
for (HttpHeader h : nextHeaders) {
responseBuilder.appendHeader(h.getName(), h.getValue());
@@ -53,36 +52,18 @@ public void onResponseHeaders(int responseStatusCode, int headerType, HttpHeader
}
}
- /**
- * Release the connection back to the pool so that it can be reused.
- */
- public void releaseConnection(HttpStream stream) {
- synchronized (lock) {
- if (!connectionClosed) {
- connectionClosed = true;
- connection.close();
- stream.close();
- }
- }
- }
-
- public void incrementWindow(HttpStream stream, int windowSize) {
- synchronized (lock) {
- if (!connectionClosed) {
+ public void incrementWindow(int windowSize) {
+ synchronized (streamLock) {
+ if (!streamClosed && stream != null) {
stream.incrementWindow(windowSize);
}
}
}
- /**
- * Close the connection completely
- */
- public void closeConnection(HttpStream stream) {
- synchronized (lock) {
- if (!connectionClosed) {
- connectionClosed = true;
- connection.shutdown();
- connection.close();
+ public void closeStream() {
+ synchronized (streamLock) {
+ if (!streamClosed && stream != null) {
+ streamClosed = true;
stream.close();
}
}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientSpiVerificationTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientSpiVerificationTest.java
index 0cdb3e31cc8d..cc29152075bd 100644
--- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientSpiVerificationTest.java
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientSpiVerificationTest.java
@@ -32,6 +32,7 @@
import com.github.tomakehurst.wiremock.http.Fault;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import java.io.IOException;
+import java.net.ConnectException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
@@ -126,8 +127,7 @@ public void requestFailed_connectionTimeout_shouldWrapException() {
SdkHttpRequest request = createRequest(uri);
RecordingResponseHandler recorder = new RecordingResponseHandler();
client.execute(AsyncExecuteRequest.builder().request(request).requestContentPublisher(createProvider("")).responseHandler(recorder).build());
- assertThatThrownBy(() -> recorder.completeFuture().get(5, TimeUnit.SECONDS)).hasCauseInstanceOf(IOException.class)
- .hasRootCauseInstanceOf(HttpException.class);
+ assertThatThrownBy(() -> recorder.completeFuture().get(5, TimeUnit.SECONDS)).hasCauseInstanceOf(ConnectException.class);
}
}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientWireMockTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientWireMockTest.java
index c9f76057913b..2efa2e56e9f2 100644
--- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientWireMockTest.java
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientWireMockTest.java
@@ -64,15 +64,6 @@ public void closeClient_reuse_throwException() {
assertThatThrownBy(() -> makeSimpleRequest(client)).hasMessageContaining("is closed");
}
- @Test
- public void invalidProtocol_shouldThrowException() {
- AttributeMap attributeMap = AttributeMap.builder()
- .put(PROTOCOL, Protocol.HTTP2)
- .build();
- assertThatThrownBy(() -> AwsCrtAsyncHttpClient.builder().buildWithDefaults(attributeMap))
- .isInstanceOf(UnsupportedOperationException.class);
- }
-
@Test
public void sendRequest_withCollector_shouldCollectMetrics() throws Exception {
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java
index 4f024993f26c..79d5475a7323 100644
--- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java
@@ -24,6 +24,7 @@
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static software.amazon.awssdk.http.crt.CrtHttpClientTestUtils.createRequest;
import com.github.tomakehurst.wiremock.http.Fault;
@@ -84,8 +85,8 @@ private byte[] generateRandomBody(int size) {
}
- @Test(expected = IOException.class)
- public void requestFailed_connectionTimeout_shouldWrapException() throws IOException {
+ @Test
+ public void requestFailed_connectionTimeout_shouldWrapException() {
try (SdkHttpClient client = AwsCrtHttpClient.builder().connectionTimeout(Duration.ofNanos(1)).build()) {
URI uri = URI.create("http://localhost:" + mockServer.port());
stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withFault(Fault.RANDOM_DATA_THEN_CLOSE)));
@@ -94,12 +95,12 @@ public void requestFailed_connectionTimeout_shouldWrapException() throws IOExcep
executeRequestBuilder.request(request);
ExecutableHttpRequest executableRequest = client.prepareRequest(executeRequestBuilder.build());
- executableRequest.call();
+ assertThatThrownBy(() -> executableRequest.call()).isInstanceOf(IOException.class)
+ .hasMessageContaining("operation timed out");
}
}
-
- @Test(expected = HttpException.class)
+ @Test
public void requestFailed_notRetryable_shouldNotWrapException() throws IOException {
try (SdkHttpClient client = AwsCrtHttpClient.builder().build()) {
URI uri = URI.create("http://localhost:" + mockServer.port());
@@ -117,7 +118,8 @@ public void requestFailed_notRetryable_shouldNotWrapException() throws IOExcepti
executeRequestBuilder.request(request);
executeRequestBuilder.contentStreamProvider(() -> new ByteArrayInputStream(new byte[0]));
ExecutableHttpRequest executableRequest = client.prepareRequest(executeRequestBuilder.build());
- executableRequest.call();
+ assertThatThrownBy(() -> executableRequest.call()).isInstanceOf(HttpException.class)
+ .hasMessageContaining("does not match the previously declared length");
}
}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/H2BehaviorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/H2BehaviorTest.java
new file mode 100644
index 000000000000..8a8ec2415982
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/H2BehaviorTest.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt;
+
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static software.amazon.awssdk.http.HttpTestUtils.sendGetRequest;
+import static software.amazon.awssdk.http.SdkHttpConfigurationOption.PROTOCOL;
+import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
+import io.netty.handler.codec.http2.Http2DataFrame;
+import io.netty.handler.codec.http2.Http2Frame;
+import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
+import io.netty.handler.codec.http2.Http2FrameLogger;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2MultiplexHandler;
+import io.netty.handler.codec.http2.Http2SecurityUtil;
+import io.netty.handler.codec.http2.Http2Settings;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.ssl.ApplicationProtocolConfig;
+import io.netty.handler.ssl.ApplicationProtocolNames;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.SupportedCipherSuiteFilter;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+import java.util.concurrent.CompletableFuture;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.utils.AttributeMap;
+
+public class H2BehaviorTest {
+ private SdkAsyncHttpClient crt;
+ private H2Server server;
+
+ @BeforeEach
+ public void setup() throws Exception {
+ server = new H2Server(true);
+ server.init();
+ crt = AwsCrtAsyncHttpClient.builder()
+ .buildWithDefaults(AttributeMap.builder()
+ .put(TRUST_ALL_CERTIFICATES, true)
+ .put(PROTOCOL, Protocol.HTTP2)
+ .build());
+ }
+
+ @AfterEach
+ public void teardown() throws InterruptedException {
+ if (server != null) {
+ server.shutdown();
+ }
+ server = null;
+
+ if (crt != null) {
+ crt.close();
+ }
+ crt = null;
+ }
+
+ @Test
+ public void sendH2Request_overTls() throws Exception {
+ CompletableFuture> request = sendGetRequest(server.port(), crt);
+ request.join();
+ }
+
+ @Test
+ public void sendH2Request_overPlaintext_usesPriorKnowledge() throws Exception {
+ H2Server h2cServer = new H2Server(false);
+ h2cServer.init();
+ try (SdkAsyncHttpClient h2cClient = AwsCrtAsyncHttpClient.builder()
+ .buildWithDefaults(AttributeMap.builder()
+ .put(PROTOCOL, Protocol.HTTP2)
+ .build())) {
+ CompletableFuture> request = sendGetRequest(h2cServer.port(), h2cClient, false);
+ request.join();
+ } finally {
+ h2cServer.shutdown();
+ }
+ }
+
+ private static class H2Server extends ChannelInitializer {
+ private final boolean useTls;
+ private final NioEventLoopGroup group = new NioEventLoopGroup();
+ private ServerBootstrap bootstrap;
+ private ServerSocketChannel serverSock;
+ private SslContext sslCtx;
+
+ H2Server(boolean useTls) {
+ this.useTls = useTls;
+ }
+
+ void init() throws Exception {
+ if (useTls) {
+ SelfSignedCertificate ssc = new SelfSignedCertificate();
+ sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
+ .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
+ .sslProvider(SslProvider.JDK)
+ .applicationProtocolConfig(new ApplicationProtocolConfig(
+ ApplicationProtocolConfig.Protocol.ALPN,
+ ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
+ ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
+ ApplicationProtocolNames.HTTP_2))
+ .build();
+ }
+
+ bootstrap = new ServerBootstrap()
+ .channel(NioServerSocketChannel.class)
+ .group(group)
+ .childHandler(this);
+
+ serverSock = (ServerSocketChannel) bootstrap.bind(0).sync().channel();
+ }
+
+ @Override
+ protected void initChannel(Channel ch) {
+ ChannelPipeline pipeline = ch.pipeline();
+
+ if (useTls) {
+ pipeline.addLast(sslCtx.newHandler(ch.alloc()));
+ }
+
+ pipeline.addLast(Http2FrameCodecBuilder.forServer()
+ .autoAckSettingsFrame(true)
+ .autoAckPingFrame(true)
+ .initialSettings(Http2Settings.defaultSettings())
+ .frameLogger(new Http2FrameLogger(LogLevel.DEBUG))
+ .build());
+
+ pipeline.addLast(new Http2MultiplexHandler(new ChannelInitializer() {
+ @Override
+ protected void initChannel(Channel ch) {
+ ch.pipeline().addLast(new SimpleChannelInboundHandler() {
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) {
+ if (frame instanceof Http2DataFrame) {
+ Http2Headers headers = new DefaultHttp2Headers().status(OK.codeAsText());
+ ctx.write(new DefaultHttp2HeadersFrame(headers, false));
+ ctx.write(new DefaultHttp2DataFrame(true));
+ ctx.flush();
+ }
+ }
+ });
+ }
+ }));
+ }
+
+ void shutdown() throws InterruptedException {
+ group.shutdownGracefully().await();
+ serverSock.close();
+ }
+
+ int port() {
+ return serverSock.localAddress().getPort();
+ }
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/H2ErrorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/H2ErrorTest.java
new file mode 100644
index 000000000000..11b6caf7429d
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/H2ErrorTest.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static software.amazon.awssdk.http.HttpTestUtils.sendGetRequest;
+import static software.amazon.awssdk.http.SdkHttpConfigurationOption.PROTOCOL;
+import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http2.DefaultHttp2GoAwayFrame;
+import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
+import io.netty.handler.codec.http2.Http2DataFrame;
+import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Frame;
+import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
+import io.netty.handler.codec.http2.Http2FrameLogger;
+import io.netty.handler.codec.http2.Http2HeadersFrame;
+import io.netty.handler.codec.http2.Http2MultiplexHandler;
+import io.netty.handler.codec.http2.Http2SecurityUtil;
+import io.netty.handler.codec.http2.Http2Settings;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.ssl.ApplicationProtocolConfig;
+import io.netty.handler.ssl.ApplicationProtocolNames;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.SupportedCipherSuiteFilter;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.utils.AttributeMap;
+
+/**
+ * Tests HTTP/2 error scenarios: RST_STREAM and GOAWAY frames.
+ */
+public class H2ErrorTest {
+ private SdkAsyncHttpClient client;
+
+ @BeforeEach
+ public void setup() {
+ client = AwsCrtAsyncHttpClient.builder()
+ .buildWithDefaults(AttributeMap.builder()
+ .put(TRUST_ALL_CERTIFICATES, true)
+ .put(PROTOCOL, Protocol.HTTP2)
+ .build());
+ }
+
+ @AfterEach
+ public void teardown() {
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ @Test
+ public void serverSendsRstStream_shouldThrowIOException() throws Exception {
+ H2ErrorServer server = new H2ErrorServer(ErrorType.RST_STREAM);
+ server.init();
+ try {
+ CompletableFuture> request = sendGetRequest(server.port(), client);
+ assertThatThrownBy(request::join)
+ .isInstanceOf(CompletionException.class)
+ .hasCauseInstanceOf(IOException.class)
+ .hasMessageContaining("RST_STREAM");
+ } finally {
+ server.shutdown();
+ }
+ }
+
+ @Test
+ public void serverSendsGoAway_shouldThrowIOException() throws Exception {
+ H2ErrorServer server = new H2ErrorServer(ErrorType.GOAWAY);
+ server.init();
+ try {
+ CompletableFuture> request = sendGetRequest(server.port(), client);
+ assertThatThrownBy(request::join)
+ .isInstanceOf(CompletionException.class)
+ .hasCauseInstanceOf(IOException.class)
+ .hasMessageContaining("connection has closed");
+ } finally {
+ server.shutdown();
+ }
+ }
+
+ private enum ErrorType {
+ RST_STREAM,
+ GOAWAY
+ }
+
+ private static class H2ErrorServer extends ChannelInitializer {
+ private final ErrorType errorType;
+ private final NioEventLoopGroup group = new NioEventLoopGroup();
+ private ServerBootstrap bootstrap;
+ private ServerSocketChannel serverSock;
+ private SslContext sslCtx;
+
+ H2ErrorServer(ErrorType errorType) {
+ this.errorType = errorType;
+ }
+
+ void init() throws Exception {
+ SelfSignedCertificate ssc = new SelfSignedCertificate();
+ sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
+ .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
+ .sslProvider(SslProvider.JDK)
+ .applicationProtocolConfig(new ApplicationProtocolConfig(
+ ApplicationProtocolConfig.Protocol.ALPN,
+ ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
+ ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
+ ApplicationProtocolNames.HTTP_2))
+ .build();
+
+ bootstrap = new ServerBootstrap()
+ .channel(NioServerSocketChannel.class)
+ .group(group)
+ .childHandler(this);
+
+ serverSock = (ServerSocketChannel) bootstrap.bind(0).sync().channel();
+ }
+
+ @Override
+ protected void initChannel(Channel ch) {
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast(sslCtx.newHandler(ch.alloc()));
+ pipeline.addLast(Http2FrameCodecBuilder.forServer()
+ .autoAckSettingsFrame(true)
+ .autoAckPingFrame(true)
+ .initialSettings(Http2Settings.defaultSettings())
+ .frameLogger(new Http2FrameLogger(LogLevel.DEBUG))
+ .build());
+
+ pipeline.addLast(new Http2MultiplexHandler(new ChannelInitializer() {
+ @Override
+ protected void initChannel(Channel ch) {
+ ch.pipeline().addLast(new ErrorFrameHandler(errorType));
+ }
+ }));
+ }
+
+ void shutdown() throws InterruptedException {
+ group.shutdownGracefully().await();
+ serverSock.close();
+ }
+
+ int port() {
+ return serverSock.localAddress().getPort();
+ }
+
+ private static class ErrorFrameHandler extends SimpleChannelInboundHandler {
+ private final ErrorType errorType;
+
+ ErrorFrameHandler(ErrorType errorType) {
+ this.errorType = errorType;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) {
+ if (frame instanceof Http2HeadersFrame || frame instanceof Http2DataFrame) {
+ switch (errorType) {
+ case RST_STREAM:
+ ctx.writeAndFlush(new DefaultHttp2ResetFrame(Http2Error.INTERNAL_ERROR));
+ break;
+ case GOAWAY:
+ ctx.channel().parent().writeAndFlush(new DefaultHttp2GoAwayFrame(Http2Error.INTERNAL_ERROR));
+ break;
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java
index 8e2426cc5371..58d11e068c5b 100644
--- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java
@@ -25,8 +25,6 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -36,18 +34,15 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
-import software.amazon.awssdk.crt.http.HttpClientConnection;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
import software.amazon.awssdk.crt.http.HttpStream;
-import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
-import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
+import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.utils.async.SimplePublisher;
@ExtendWith(MockitoExtension.class)
public abstract class BaseHttpStreamResponseHandlerTest {
- @Mock HttpClientConnection crtConn;
CompletableFuture requestFuture;
@Mock
@@ -56,11 +51,11 @@ public abstract class BaseHttpStreamResponseHandlerTest {
@Mock
SimplePublisher simplePublisher;
- HttpStreamResponseHandler responseHandler;
+ HttpStreamBaseResponseHandler responseHandler;
- abstract HttpStreamResponseHandler responseHandler();
+ abstract HttpStreamBaseResponseHandler responseHandler();
- abstract HttpStreamResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher);
+ abstract HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher);
@BeforeEach
public void setUp() {
@@ -69,7 +64,7 @@ public void setUp() {
}
@Test
- void serverError_shouldShutdownConnection() {
+ void serverError_shouldCloseStream() {
HttpHeader[] httpHeaders = getHttpHeaders();
responseHandler.onResponseHeaders(httpStream, 500, HttpHeaderBlock.MAIN.getValue(),
httpHeaders);
@@ -77,13 +72,12 @@ void serverError_shouldShutdownConnection() {
responseHandler.onResponseHeadersDone(httpStream, 0);
responseHandler.onResponseComplete(httpStream, 0);
requestFuture.join();
- verify(crtConn).close();
verify(httpStream).close();
}
@ParameterizedTest
@ValueSource(ints = { 200, 400, 202, 403 })
- void nonServerError_shouldNotShutdownConnection(int statusCode) {
+ void nonServerError_shouldCloseStream(int statusCode) {
HttpHeader[] httpHeaders = getHttpHeaders();
responseHandler.onResponseHeaders(httpStream, statusCode, HttpHeaderBlock.MAIN.getValue(),
httpHeaders);
@@ -92,21 +86,17 @@ void nonServerError_shouldNotShutdownConnection(int statusCode) {
responseHandler.onResponseComplete(httpStream, 0);
requestFuture.join();
- verify(crtConn, never()).shutdown();
- verify(crtConn).close();
verify(httpStream).close();
}
@Test
- void failedToGetResponse_shouldShutdownConnection() {
+ void failedToGetResponse_shouldCloseStream() {
HttpHeader[] httpHeaders = getHttpHeaders();
responseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(),
httpHeaders);
responseHandler.onResponseComplete(httpStream, 1);
assertThatThrownBy(() -> requestFuture.join()).hasRootCauseInstanceOf(HttpException.class);
- verify(crtConn).shutdown();
- verify(crtConn).close();
verify(httpStream).close();
}
@@ -120,18 +110,17 @@ void streamClosed_shouldNotIncreaseStreamWindow() throws InterruptedException {
responseHandler.onResponseComplete(httpStream, 0);
requestFuture.join();
- verify(crtConn).close();
verify(httpStream).close();
verify(httpStream, never()).incrementWindow(anyInt());
}
@Test
- void publisherWritesFutureFails_shouldShutdownConnection() {
+ void publisherWritesFutureFails_shouldCloseStream() {
SimplePublisher simplePublisher = Mockito.mock(SimplePublisher.class);
CompletableFuture future = new CompletableFuture<>();
when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future);
- HttpStreamResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher);
+ HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher);
HttpHeader[] httpHeaders = getHttpHeaders();
handler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(),
@@ -148,19 +137,17 @@ void publisherWritesFutureFails_shouldShutdownConnection() {
// we don't verify here because it behaves differently in async and sync
}
- verify(crtConn).shutdown();
- verify(crtConn).close();
verify(httpStream).close();
verify(httpStream, never()).incrementWindow(anyInt());
}
@Test
- void publisherWritesFutureCompletesAfterConnectionClosed_shouldNotInvokeIncrementWindow() {
+ void publisherWritesFutureCompletesAfterStreamClosed_shouldNotInvokeIncrementWindow() {
CompletableFuture future = new CompletableFuture<>();
when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future);
when(simplePublisher.complete()).thenReturn(future);
- HttpStreamResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher);
+ HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher);
HttpHeader[] httpHeaders = getHttpHeaders();
@@ -174,19 +161,17 @@ void publisherWritesFutureCompletesAfterConnectionClosed_shouldNotInvokeIncremen
future.complete(null);
requestFuture.join();
- verify(crtConn, never()).shutdown();
- verify(crtConn).close();
verify(httpStream).close();
verify(httpStream, never()).incrementWindow(anyInt());
}
@Test
- void publisherWritesFutureCompletesBeforeConnectionClosed_shouldInvokeIncrementWindow() {
+ void publisherWritesFutureCompletesBeforeStreamClosed_shouldInvokeIncrementWindow() {
CompletableFuture future = new CompletableFuture<>();
when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future);
when(simplePublisher.complete()).thenReturn(future);
- HttpStreamResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher);
+ HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher);
HttpHeader[] httpHeaders = getHttpHeaders();
@@ -201,9 +186,6 @@ void publisherWritesFutureCompletesBeforeConnectionClosed_shouldInvokeIncrementW
handler.onResponseComplete(httpStream, 0);
requestFuture.join();
verify(httpStream).incrementWindow(anyInt());
-
- verify(crtConn, never()).shutdown();
- verify(crtConn).close();
verify(httpStream).close();
}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutorTest.java
index feaf9db3d472..ff95324f0a74 100644
--- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutorTest.java
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutorTest.java
@@ -39,15 +39,15 @@
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.crt.CrtRuntimeException;
-import software.amazon.awssdk.crt.http.HttpClientConnection;
-import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.HttpException;
-import software.amazon.awssdk.crt.http.HttpRequest;
+import software.amazon.awssdk.crt.http.HttpRequestBase;
+import software.amazon.awssdk.crt.http.HttpStreamBase;
+import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
+import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
-import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter;
import software.amazon.awssdk.utils.CompletableFutureUtils;
@ExtendWith(MockitoExtension.class)
@@ -55,18 +55,18 @@ public class CrtAsyncRequestExecutorTest {
private CrtAsyncRequestExecutor requestExecutor;
@Mock
- private HttpClientConnectionManager connectionManager;
+ private HttpStreamManager streamManager;
@Mock
private SdkAsyncHttpResponseHandler responseHandler;
@Mock
- private HttpClientConnection httpClientConnection;
+ private HttpStreamBase httpStream;
public static Stream>> mappedExceptions() {
return Stream.of(
- new SimpleEntry<>(0x0405, SSLHandshakeException.class), // For AWS_IO_TLS_ERROR_NEGOTIATION_FAILURE (1029)
- new SimpleEntry<>(0x0418, ConnectException.class) // For AWS_IO_SOCKET_TIMEOUT (1048)
+ new SimpleEntry<>(1029, SSLHandshakeException.class), // CRT_TLS_NEGOTIATION_ERROR_CODE
+ new SimpleEntry<>(1048, ConnectException.class) // CRT_SOCKET_TIMEOUT
);
}
@@ -77,22 +77,17 @@ public void setup() {
@AfterEach
public void teardown() {
- Mockito.reset(connectionManager, responseHandler, httpClientConnection);
+ Mockito.reset(streamManager, responseHandler, httpStream);
}
@Test
- public void acquireConnectionThrowException_shouldInvokeOnError() {
- RuntimeException exception = new RuntimeException("error");
+ public void execute_requestConversionFails_invokesOnError() {
CrtAsyncRequestContext context = CrtAsyncRequestContext.builder()
- .crtConnPool(connectionManager)
+ .crtConnPool(streamManager)
.request(AsyncExecuteRequest.builder()
- .responseHandler(responseHandler)
- .build())
+ .responseHandler(responseHandler)
+ .build())
.build();
- CompletableFuture completableFuture = new CompletableFuture<>();
-
- Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture);
- completableFuture.completeExceptionally(exception);
CompletableFuture executeFuture = requestExecutor.execute(context);
@@ -100,23 +95,19 @@ public void acquireConnectionThrowException_shouldInvokeOnError() {
Mockito.verify(responseHandler).onError(argumentCaptor.capture());
Exception actualException = argumentCaptor.getValue();
- assertThat(actualException).hasMessageContaining("An exception occurred when acquiring a connection");
- assertThat(actualException).hasCause(exception);
- assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class);
+ assertThat(actualException).isInstanceOf(NullPointerException.class);
+ assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class);
}
@Test
- public void invalidRequest_requestConversionThrowError_shouldInvokeOnError() {
- CrtAsyncRequestContext context = CrtAsyncRequestContext.builder()
- .crtConnPool(connectionManager)
- .request(AsyncExecuteRequest.builder()
- .responseHandler(responseHandler)
- .build())
- .build();
- CompletableFuture completableFuture = new CompletableFuture<>();
+ public void execute_acquireStreamFails_invokesOnErrorAndWrapsWithIOException() {
+ IllegalStateException exception = new IllegalStateException("connection closed");
+ CrtAsyncRequestContext context = crtAsyncRequestContext();
+ CompletableFuture completableFuture = new CompletableFuture<>();
- Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture);
- completableFuture.complete(httpClientConnection);
+ Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequestBase.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
+ .thenReturn(completableFuture);
+ completableFuture.completeExceptionally(exception);
CompletableFuture executeFuture = requestExecutor.execute(context);
@@ -124,21 +115,18 @@ public void invalidRequest_requestConversionThrowError_shouldInvokeOnError() {
Mockito.verify(responseHandler).onError(argumentCaptor.capture());
Exception actualException = argumentCaptor.getValue();
- assertThat(actualException).isInstanceOf(NullPointerException.class);
- assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class);
+ assertThat(actualException).hasCause(exception);
+ assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class);
}
@Test
- public void executeAsyncRequest_CrtRuntimeException_shouldInvokeOnError() {
+ public void execute_crtRuntimeException_invokesOnError() {
CrtRuntimeException exception = new CrtRuntimeException("");
CrtAsyncRequestContext context = crtAsyncRequestContext();
- CompletableFuture completableFuture = new CompletableFuture<>();
-
- Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture);
- completableFuture.complete(httpClientConnection);
+ CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception);
- Mockito.when(httpClientConnection.makeRequest(Mockito.any(HttpRequest.class), Mockito.any(CrtResponseAdapter.class)))
- .thenThrow(exception);
+ Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequestBase.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
+ .thenReturn(completableFuture);
CompletableFuture executeFuture = requestExecutor.execute(context);
@@ -146,22 +134,17 @@ public void executeAsyncRequest_CrtRuntimeException_shouldInvokeOnError() {
Mockito.verify(responseHandler).onError(argumentCaptor.capture());
Exception actualException = argumentCaptor.getValue();
- assertThat(actualException).hasMessageContaining("An exception occurred when making the request");
assertThat(actualException).hasCause(exception);
assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class);
}
@Test
- public void cancelRequest_shouldInvokeOnError() {
- CrtAsyncRequestContext context = CrtAsyncRequestContext.builder()
- .crtConnPool(connectionManager)
- .request(AsyncExecuteRequest.builder()
- .responseHandler(responseHandler)
- .build())
- .build();
- CompletableFuture completableFuture = new CompletableFuture<>();
+ public void execute_requestCancelled_invokesOnError() {
+ CrtAsyncRequestContext context = crtAsyncRequestContext();
+ CompletableFuture completableFuture = new CompletableFuture<>();
- Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture);
+ Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequestBase.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
+ .thenReturn(completableFuture);
CompletableFuture executeFuture = requestExecutor.execute(context);
executeFuture.cancel(true);
@@ -175,12 +158,13 @@ public void cancelRequest_shouldInvokeOnError() {
}
@Test
- public void execute_AcquireConnectionFailure_shouldAlwaysWrapIOException() {
+ public void execute_retryableHttpException_wrapsWithIOException() {
+ HttpException exception = new HttpException(0x080a); // AWS_ERROR_HTTP_CONNECTION_CLOSED
CrtAsyncRequestContext context = crtAsyncRequestContext();
- RuntimeException exception = new RuntimeException("some failure");
- CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception);
+ CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception);
- Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture);
+ Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequestBase.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
+ .thenReturn(completableFuture);
CompletableFuture executeFuture = requestExecutor.execute(context);
assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(IOException.class).hasRootCause(exception);
@@ -188,76 +172,29 @@ public void execute_AcquireConnectionFailure_shouldAlwaysWrapIOException() {
@ParameterizedTest
@MethodSource("mappedExceptions")
- public void execute_AcquireConnectionFailure_shouldAlwaysBeInstanceOfIOException(Entry> entry) {
+ public void execute_httpException_mapsToCorrectException(Entry> entry) {
int errorCode = entry.getKey();
- Class extends Throwable> ioExceptionSubclass = entry.getValue();
+ Class extends Throwable> expectedExceptionClass = entry.getValue();
CrtAsyncRequestContext context = crtAsyncRequestContext();
HttpException exception = new HttpException(errorCode);
- CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception);
-
- Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture);
-
- CompletableFuture executeFuture = requestExecutor.execute(context);
- assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(IOException.class).hasMessageContaining(exception.getMessage());
- assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(ioExceptionSubclass);
- }
-
- @Test
- public void executeRequest_failedOfIllegalStateException_shouldWrapIOException() {
- IllegalStateException exception = new IllegalStateException("connection closed");
- CrtAsyncRequestContext context = crtAsyncRequestContext();
- CompletableFuture completableFuture = new CompletableFuture<>();
-
- Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture);
- completableFuture.complete(httpClientConnection);
+ CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception);
- Mockito.when(httpClientConnection.makeRequest(Mockito.any(HttpRequest.class), Mockito.any(CrtResponseAdapter.class)))
- .thenThrow(exception);
+ Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequestBase.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
+ .thenReturn(completableFuture);
CompletableFuture executeFuture = requestExecutor.execute(context);
-
- ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Exception.class);
- Mockito.verify(responseHandler).onError(argumentCaptor.capture());
-
- Exception actualException = argumentCaptor.getValue();
- assertThat(actualException).hasMessageContaining("An exception occurred when making the request").hasCause(exception);
- assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(IOException.class).hasRootCause(exception);
+ assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(expectedExceptionClass);
}
@Test
- public void executeRequest_failedOfRetryableHttpException_shouldWrapIOException() {
- HttpException exception = new HttpException(0x080a); // AWS_ERROR_HTTP_CONNECTION_CLOSED
- CrtAsyncRequestContext context = crtAsyncRequestContext();
- CompletableFuture completableFuture = new CompletableFuture<>();
-
- Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture);
- completableFuture.complete(httpClientConnection);
-
- Mockito.when(httpClientConnection.makeRequest(Mockito.any(HttpRequest.class), Mockito.any(CrtResponseAdapter.class)))
- .thenThrow(exception);
-
- CompletableFuture executeFuture = requestExecutor.execute(context);
-
- ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Exception.class);
- Mockito.verify(responseHandler).onError(argumentCaptor.capture());
-
- Exception actualException = argumentCaptor.getValue();
- assertThat(actualException).hasCause(exception);
- assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(IOException.class).hasRootCause(exception);
- }
-
- @Test
- public void executeRequest_failedOfNonRetryableHttpException_shouldNotWrapIOException() {
+ public void execute_nonRetryableHttpException_doesNotWrapWithIOException() {
HttpException exception = new HttpException(0x0801); // AWS_ERROR_HTTP_HEADER_NOT_FOUND
CrtAsyncRequestContext context = crtAsyncRequestContext();
- CompletableFuture completableFuture = new CompletableFuture<>();
-
- Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture);
- completableFuture.complete(httpClientConnection);
+ CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception);
- Mockito.when(httpClientConnection.makeRequest(Mockito.any(HttpRequest.class), Mockito.any(CrtResponseAdapter.class)))
- .thenThrow(exception);
+ Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequestBase.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
+ .thenReturn(completableFuture);
CompletableFuture executeFuture = requestExecutor.execute(context);
@@ -273,7 +210,7 @@ private CrtAsyncRequestContext crtAsyncRequestContext() {
SdkHttpFullRequest request = createRequest(URI.create("http://localhost"));
return CrtAsyncRequestContext.builder()
.readBufferSize(2000)
- .crtConnPool(connectionManager)
+ .crtConnPool(streamManager)
.request(AsyncExecuteRequest.builder()
.request(request)
.requestContentPublisher(createProvider(""))
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java
index 8fdf5b371fab..a7c601495c9e 100644
--- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java
@@ -38,11 +38,11 @@
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.crt.CrtRuntimeException;
-import software.amazon.awssdk.crt.http.HttpClientConnection;
-import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpRequest;
-import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
+import software.amazon.awssdk.crt.http.HttpStreamBase;
+import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
+import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpFullResponse;
@@ -53,10 +53,10 @@ public class CrtRequestExecutorTest {
private CrtRequestExecutor requestExecutor;
@Mock
- private HttpClientConnectionManager connectionManager;
+ private HttpStreamManager streamManager;
@Mock
- private HttpClientConnection httpClientConnection;
+ private HttpStreamBase httpStream;
public static Stream retryableExceptions() {
return Stream.of(new CrtRuntimeException(""), new HttpException(0x080a), new IllegalStateException(
@@ -65,8 +65,8 @@ public static Stream retryableExceptions() {
public static Stream>> mappedExceptions() {
return Stream.of(
- new SimpleEntry<>(0x0405, SSLHandshakeException.class), // For AWS_IO_TLS_ERROR_NEGOTIATION_FAILURE (1029)
- new SimpleEntry<>(0x0418, ConnectException.class) // For AWS_IO_SOCKET_TIMEOUT (1048)
+ new SimpleEntry<>(1029, SSLHandshakeException.class), // CRT_TLS_NEGOTIATION_ERROR_CODE
+ new SimpleEntry<>(1048, ConnectException.class) // CRT_SOCKET_TIMEOUT
);
}
@@ -77,16 +77,29 @@ public void setup() {
@AfterEach
public void teardown() {
- Mockito.reset(connectionManager, httpClientConnection);
+ Mockito.reset(streamManager, httpStream);
}
@Test
- public void acquireConnectionThrowException_shouldInvokeOnError() {
- RuntimeException exception = new RuntimeException("error");
+ public void execute_requestConversionFails_failsFuture() {
+ CrtRequestContext context = CrtRequestContext.builder()
+ .crtConnPool(streamManager)
+ .request(HttpExecuteRequest.builder().build())
+ .build();
+
+ CompletableFuture executeFuture = requestExecutor.execute(context);
+
+ assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class);
+ }
+
+ @Test
+ public void execute_acquireStreamFails_wrapsWithIOException() {
+ IllegalStateException exception = new IllegalStateException("connection closed");
CrtRequestContext context = crtRequestContext();
- CompletableFuture completableFuture = new CompletableFuture<>();
+ CompletableFuture completableFuture = new CompletableFuture<>();
- Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture);
+ Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
+ .thenReturn(completableFuture);
completableFuture.completeExceptionally(exception);
CompletableFuture executeFuture = requestExecutor.execute(context);
@@ -96,60 +109,42 @@ public void acquireConnectionThrowException_shouldInvokeOnError() {
@ParameterizedTest
@MethodSource("retryableExceptions")
- public void makeRequestFailed_retryableException_shouldWrapWithIOException(Throwable throwable) {
+ public void execute_retryableException_wrapsWithIOException(Throwable throwable) {
CrtRequestContext context = crtRequestContext();
- CompletableFuture completableFuture = new CompletableFuture<>();
+ CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(throwable);
- Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture);
- completableFuture.complete(httpClientConnection);
-
- Mockito.when(httpClientConnection.makeRequest(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamResponseHandler.class)))
- .thenThrow(throwable);
+ Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
+ .thenReturn(completableFuture);
CompletableFuture executeFuture = requestExecutor.execute(context);
assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(throwable).isInstanceOf(IOException.class);
}
- @Test
- public void execute_AcquireConnectionFailure_shouldAlwaysWrapIOException() {
- CrtRequestContext context = crtRequestContext();
- RuntimeException exception = new RuntimeException("some failure");
- CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception);
-
- Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture);
-
- CompletableFuture executeFuture = requestExecutor.execute(context);
- assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(IOException.class).hasRootCause(exception);
- }
-
@ParameterizedTest
@MethodSource("mappedExceptions")
- public void execute_AcquireConnectionFailure_shouldAlwaysBeInstanceOfIOException(Entry> entry) {
+ public void execute_httpException_mapsToCorrectException(Entry> entry) {
int errorCode = entry.getKey();
- Class extends Throwable> ioExceptionSubclass = entry.getValue();
+ Class extends Throwable> expectedExceptionClass = entry.getValue();
CrtRequestContext context = crtRequestContext();
HttpException exception = new HttpException(errorCode);
- CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception);
+ CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception);
- Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture);
+ Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
+ .thenReturn(completableFuture);
CompletableFuture executeFuture = requestExecutor.execute(context);
- assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(IOException.class).hasMessageContaining(exception.getMessage());
- assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(ioExceptionSubclass);
+ assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(expectedExceptionClass);
}
@Test
- public void executeRequest_failedOfNonRetryableHttpException_shouldNotWrapIOException() {
+ public void execute_nonRetryableHttpException_doesNotWrapWithIOException() {
HttpException exception = new HttpException(0x0801); // AWS_ERROR_HTTP_HEADER_NOT_FOUND
CrtRequestContext context = crtRequestContext();
- CompletableFuture completableFuture = new CompletableFuture<>();
-
- Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture);
- completableFuture.complete(httpClientConnection);
+ CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception);
- Mockito.when(httpClientConnection.makeRequest(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamResponseHandler.class)))
- .thenThrow(exception);
+ Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
+ .thenReturn(completableFuture);
CompletableFuture executeFuture = requestExecutor.execute(context);
assertThatThrownBy(executeFuture::join).hasCause(exception);
@@ -159,7 +154,7 @@ private CrtRequestContext crtRequestContext() {
SdkHttpFullRequest request = createRequest(URI.create("http://localhost"));
return CrtRequestContext.builder()
.readBufferSize(2000)
- .crtConnPool(connectionManager)
+ .crtConnPool(streamManager)
.request(HttpExecuteRequest.builder()
.request(request)
.contentStreamProvider(SdkBytes.fromUtf8String("test").asContentStreamProvider())
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java
index 0628efaf15b2..e8865f5e928b 100644
--- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java
@@ -16,58 +16,51 @@
package software.amazon.awssdk.http.crt.internal;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
-import software.amazon.awssdk.core.http.HttpResponseHandler;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
-import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
-import software.amazon.awssdk.http.SdkHttpFullResponse;
+import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter;
-import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
import software.amazon.awssdk.utils.async.SimplePublisher;
public class CrtResponseHandlerTest extends BaseHttpStreamResponseHandlerTest {
@Override
- HttpStreamResponseHandler responseHandler() {
+ HttpStreamBaseResponseHandler responseHandler() {
AsyncResponseHandler responseHandler = new AsyncResponseHandler<>((response,
executionAttributes) -> null, Function.identity(), new ExecutionAttributes());
responseHandler.prepare();
- return CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, responseHandler);
+ return CrtResponseAdapter.toCrtResponseHandler(requestFuture, responseHandler);
}
@Override
- HttpStreamResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher) {
+ HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher) {
AsyncResponseHandler responseHandler = new AsyncResponseHandler<>((response,
executionAttributes) -> null, Function.identity(), new ExecutionAttributes());
responseHandler.prepare();
- return new CrtResponseAdapter(crtConn, requestFuture, responseHandler, simplePublisher);
+ return new CrtResponseAdapter(requestFuture, responseHandler, simplePublisher);
}
@Test
- void publisherFailedToDeliverEvents_shouldShutDownConnection() {
+ void onResponseComplete_publisherCancelled_closesStream() {
SdkAsyncHttpResponseHandler responseHandler = new TestAsyncHttpResponseHandler();
- HttpStreamResponseHandler crtResponseHandler = CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, responseHandler);
+ HttpStreamBaseResponseHandler crtResponseHandler = CrtResponseAdapter.toCrtResponseHandler(requestFuture, responseHandler);
HttpHeader[] httpHeaders = getHttpHeaders();
crtResponseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(),
httpHeaders);
@@ -77,8 +70,6 @@ void publisherFailedToDeliverEvents_shouldShutDownConnection() {
crtResponseHandler.onResponseComplete(httpStream, 0);
assertThatThrownBy(() -> requestFuture.join()).isInstanceOf(CancellationException.class).hasMessageContaining(
"subscription has been cancelled");
- verify(crtConn).shutdown();
- verify(crtConn).close();
verify(httpStream).close();
}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java
index 45c7fdccbfe9..62906295ba5c 100644
--- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java
@@ -16,58 +16,36 @@
package software.amazon.awssdk.http.crt.internal;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import io.reactivex.Completable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
-import net.bytebuddy.utility.RandomString;
import org.apache.commons.lang3.RandomStringUtils;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-import software.amazon.awssdk.crt.http.HttpClientConnection;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
-import software.amazon.awssdk.crt.http.HttpStream;
-import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
+import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.SdkHttpFullResponse;
-import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
-import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter;
import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
-import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.async.SimplePublisher;
public class InputStreamAdaptingHttpStreamResponseHandlerTest extends BaseHttpStreamResponseHandlerTest {
@Override
- HttpStreamResponseHandler responseHandler() {
- return new InputStreamAdaptingHttpStreamResponseHandler(crtConn, requestFuture);
+ HttpStreamBaseResponseHandler responseHandler() {
+ return new InputStreamAdaptingHttpStreamResponseHandler(requestFuture);
}
@Override
- HttpStreamResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher) {
- return new InputStreamAdaptingHttpStreamResponseHandler(crtConn, requestFuture, simplePublisher);
+ HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher) {
+ return new InputStreamAdaptingHttpStreamResponseHandler(requestFuture, simplePublisher);
}
@Test
- void abortStream_shouldShutDownConnection() throws IOException {
+ void abortStream_shouldCloseStream() throws IOException {
HttpHeader[] httpHeaders = getHttpHeaders();
responseHandler.onResponseHeaders(httpStream, 500, HttpHeaderBlock.MAIN.getValue(),
@@ -83,13 +61,11 @@ void abortStream_shouldShutDownConnection() throws IOException {
abortableInputStream.read();
abortableInputStream.abort();
- verify(crtConn).shutdown();
- verify(crtConn).close();
verify(httpStream).close();
}
@Test
- void closeStream_shouldShutdownConnection() throws IOException {
+ void closeStream_shouldCloseStream() throws IOException {
HttpHeader[] httpHeaders = getHttpHeaders();
responseHandler.onResponseHeaders(httpStream, 500, HttpHeaderBlock.MAIN.getValue(),
@@ -103,23 +79,19 @@ void closeStream_shouldShutdownConnection() throws IOException {
AbortableInputStream abortableInputStream = response.content().get();
abortableInputStream.read();
- abortableInputStream.abort();
+ abortableInputStream.close();
- verify(crtConn).shutdown();
- verify(crtConn).close();
verify(httpStream).close();
}
@Test
- void cancelFuture_shouldCloseConnection() {
+ void cancelFuture_shouldCloseStream() {
HttpHeader[] httpHeaders = getHttpHeaders();
responseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(),
httpHeaders);
requestFuture.completeExceptionally(new RuntimeException());
- verify(crtConn).shutdown();
- verify(crtConn).close();
verify(httpStream).close();
}
}
diff --git a/pom.xml b/pom.xml
index d615bd3884a4..b73a2176756b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,7 +133,7 @@
3.1.5
1.17.1
1.37
- 0.40.3
+ 0.43.1
5.10.3
diff --git a/services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/AbstractTestCase.java b/services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/AbstractTestCase.java
index 1a8ed72777bd..8bbb531bac07 100644
--- a/services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/AbstractTestCase.java
+++ b/services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/AbstractTestCase.java
@@ -26,6 +26,7 @@
import software.amazon.awssdk.awscore.util.AwsHostNameUtils;
import software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils;
import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.retries.DefaultRetryStrategy;
import software.amazon.awssdk.testutils.service.AwsTestBase;
public class AbstractTestCase extends AwsTestBase {
diff --git a/services/transcribestreaming/pom.xml b/services/transcribestreaming/pom.xml
index 1a943e486b39..aab7e5617804 100644
--- a/services/transcribestreaming/pom.xml
+++ b/services/transcribestreaming/pom.xml
@@ -69,6 +69,12 @@
protocol-core
${awsjavasdk.version}
+
+ software.amazon.awssdk
+ aws-crt-client
+ ${awsjavasdk.version}
+ test
+
software.amazon.awssdk
http-auth-aws
diff --git a/services/transcribestreaming/src/it/java/software/amazon/awssdk/services/transcribestreaming/TranscribeStreamingIntegrationTest.java b/services/transcribestreaming/src/it/java/software/amazon/awssdk/services/transcribestreaming/TranscribeStreamingIntegrationTest.java
index f2806b355470..62a011029c51 100644
--- a/services/transcribestreaming/src/it/java/software/amazon/awssdk/services/transcribestreaming/TranscribeStreamingIntegrationTest.java
+++ b/services/transcribestreaming/src/it/java/software/amazon/awssdk/services/transcribestreaming/TranscribeStreamingIntegrationTest.java
@@ -19,7 +19,6 @@
import static org.mockito.Mockito.verify;
import static software.amazon.awssdk.http.Header.CONTENT_TYPE;
-import io.netty.handler.ssl.SslProvider;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -28,8 +27,10 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
-import org.junit.jupiter.api.condition.EnabledIf;
+import io.netty.handler.ssl.SslProvider;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.reactivestreams.Publisher;
@@ -44,6 +45,8 @@
import software.amazon.awssdk.http.HttpMetric;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.ProtocolNegotiation;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils;
import software.amazon.awssdk.metrics.MetricCollection;
@@ -67,65 +70,90 @@ public class TranscribeStreamingIntegrationTest {
private static final Logger log = Logger.loggerFor(TranscribeStreamingIntegrationTest.class);
private TranscribeStreamingAsyncClient client;
private MetricPublisher mockPublisher;
-
- private static Stream protocolNegotiations() {
- return Stream.of(ProtocolNegotiation.ASSUME_PROTOCOL, ProtocolNegotiation.ALPN);
- }
-
- private static boolean alpnSupported(){
- return NettyUtils.isAlpnSupported(SslProvider.JDK);
- }
-
- @ParameterizedTest
- @MethodSource("protocolNegotiations")
- @EnabledIf("alpnSupported")
- public void testFileWith16kRate(ProtocolNegotiation protocolNegotiation) throws Exception {
- initClient(protocolNegotiation);
-
- CompletableFuture result = client.startStreamTranscription(
- getRequest(16_000),
- new AudioStreamPublisher(getInputStream("silence_16kHz_s16le.wav")),
- TestResponseHandlers.responseHandlerBuilder_Classic());
-
- result.join();
- verifyMetrics();
+ private SdkAsyncHttpClient httpClient;
+
+ private static Stream httpClients() {
+ Stream.Builder builder = Stream.builder();
+
+ // Netty with prior knowledge (ASSUME_PROTOCOL)
+ builder.add(Arguments.of("Netty-PriorKnowledge",
+ NettyNioAsyncHttpClient.builder()
+ .protocol(Protocol.HTTP2)
+ .protocolNegotiation(ProtocolNegotiation.ASSUME_PROTOCOL)
+ .build()));
+
+ // Netty with ALPN (only if supported)
+ if (NettyUtils.isAlpnSupported(SslProvider.JDK)) {
+ builder.add(Arguments.of("Netty-ALPN",
+ NettyNioAsyncHttpClient.builder()
+ .protocol(Protocol.HTTP2)
+ .protocolNegotiation(ProtocolNegotiation.ALPN)
+ .build()));
+ }
+
+ // CRT HTTP/2
+ builder.add(Arguments.of("CRT", AwsCrtAsyncHttpClient.builder().build()));
+
+ return builder.build();
}
- @ParameterizedTest
- @MethodSource("protocolNegotiations")
- @EnabledIf("alpnSupported")
- public void testFileWith8kRate(ProtocolNegotiation protocolNegotiation) throws Exception {
- initClient(protocolNegotiation);
-
- CompletableFuture result = client.startStreamTranscription(
- getRequest(8_000),
- new AudioStreamPublisher(getInputStream("silence_8kHz_s16le.wav")),
- TestResponseHandlers.responseHandlerBuilder_Consumer());
-
- result.get();
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("httpClients")
+ public void testFileWith16kRate(String clientName, SdkAsyncHttpClient httpClient) throws Exception {
+ initClient(httpClient);
+ try {
+ CompletableFuture result = client.startStreamTranscription(
+ getRequest(16_000),
+ new AudioStreamPublisher(getInputStream("silence_16kHz_s16le.wav")),
+ TestResponseHandlers.responseHandlerBuilder_Classic());
+
+ result.join();
+ verifyMetrics();
+ } finally {
+ cleanup();
+ }
}
- private void initClient(ProtocolNegotiation protocolNegotiation) {
- if (client != null) {
- client.close();
- }
- if (mockPublisher != null) {
- mockPublisher.close();
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("httpClients")
+ public void testFileWith8kRate() throws Exception {
+ initClient(httpClient);
+ try {
+ CompletableFuture result = client.startStreamTranscription(
+ getRequest(8_000),
+ new AudioStreamPublisher(getInputStream("silence_8kHz_s16le.wav")),
+ TestResponseHandlers.responseHandlerBuilder_Consumer());
+
+ result.get();
+ verifyMetrics();
+ } finally {
+ cleanup();
}
+ }
+ private void initClient(SdkAsyncHttpClient httpClient) {
+ this.httpClient = httpClient;
mockPublisher = mock(MetricPublisher.class);
client = TranscribeStreamingAsyncClient.builder()
.region(Region.US_EAST_1)
.overrideConfiguration(b -> b.addExecutionInterceptor(new VerifyHeaderInterceptor())
.addMetricPublisher(mockPublisher))
.credentialsProvider(getCredentials())
- .httpClient(NettyNioAsyncHttpClient.builder()
- .protocol(Protocol.HTTP2)
- .protocolNegotiation(protocolNegotiation)
- .build())
+ .httpClient(httpClient)
.build();
}
+ private void cleanup() {
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+ if (httpClient != null) {
+ httpClient.close();
+ httpClient = null;
+ }
+ }
+
private static AwsCredentialsProvider getCredentials() {
return DefaultCredentialsProvider.create();
}
diff --git a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/HttpTestUtils.java b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/HttpTestUtils.java
index 0d6d9ea50bc2..24f6c6fbe3ce 100644
--- a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/HttpTestUtils.java
+++ b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/HttpTestUtils.java
@@ -70,19 +70,24 @@ public static KeyStore getSelfSignedKeyStore() throws Exception {
}
public static CompletableFuture sendGetRequest(int serverPort, SdkAsyncHttpClient client) {
- return sendRequest(serverPort, client, SdkHttpMethod.GET);
+ return sendRequest(serverPort, client, SdkHttpMethod.GET, true);
+ }
+
+ public static CompletableFuture sendGetRequest(int serverPort, SdkAsyncHttpClient client, boolean https) {
+ return sendRequest(serverPort, client, SdkHttpMethod.GET, https);
}
public static CompletableFuture sendHeadRequest(int serverPort, SdkAsyncHttpClient client) {
- return sendRequest(serverPort, client, SdkHttpMethod.HEAD);
+ return sendRequest(serverPort, client, SdkHttpMethod.HEAD, true);
}
private static CompletableFuture sendRequest(int serverPort,
SdkAsyncHttpClient client,
- SdkHttpMethod httpMethod) {
+ SdkHttpMethod httpMethod,
+ boolean https) {
SdkHttpFullRequest request = SdkHttpFullRequest.builder()
.method(httpMethod)
- .protocol("https")
+ .protocol(https ? "https" : "http")
.host("127.0.0.1")
.port(serverPort)
.build();