diff --git a/http-clients/aws-crt-client/pom.xml b/http-clients/aws-crt-client/pom.xml index 3090185d8878..0039eca8e844 100644 --- a/http-clients/aws-crt-client/pom.xml +++ b/http-clients/aws-crt-client/pom.xml @@ -173,6 +173,31 @@ mockito-junit-jupiter test + + io.netty + netty-codec-http2 + test + + + io.netty + netty-common + test + + + io.netty + netty-transport + test + + + io.netty + netty-codec-http + test + + + io.netty + netty-handler + test + diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java index 08b9a5fdc7e8..dce28e5d1cfe 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java @@ -22,7 +22,8 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import software.amazon.awssdk.annotations.SdkPublicApi; -import software.amazon.awssdk.crt.http.HttpClientConnectionManager; +import software.amazon.awssdk.crt.http.HttpStreamManager; +import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.SdkHttpConfigurationOption; import software.amazon.awssdk.http.async.AsyncExecuteRequest; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; @@ -91,15 +92,15 @@ public CompletableFuture execute(AsyncExecuteRequest asyncRequest) { * we have a pool and no one can destroy it underneath us until we've finished submitting the * request) */ - try (HttpClientConnectionManager crtConnPool = getOrCreateConnectionPool(poolKey(asyncRequest.request()))) { - CrtAsyncRequestContext context = CrtAsyncRequestContext.builder() - .crtConnPool(crtConnPool) - .readBufferSize(this.readBufferSize) - .request(asyncRequest) - .build(); - - return new CrtAsyncRequestExecutor().execute(context); - } + HttpStreamManager crtConnPool = getOrCreateConnectionPool(poolKey(asyncRequest.request())); + CrtAsyncRequestContext context = CrtAsyncRequestContext.builder() + .crtConnPool(crtConnPool) + .readBufferSize(this.readBufferSize) + .request(asyncRequest) + .protocol(this.protocol) + .build(); + + return new CrtAsyncRequestExecutor().execute(context); } /** @@ -222,6 +223,14 @@ AwsCrtAsyncHttpClient.Builder connectionHealthConfiguration(Consumer tcpKeepAliveConfigurationBuilder); + /** + * Configure the HTTP protocol version to use for connections. + * + * @param protocol the HTTP protocol version + * @return The builder for method chaining. + */ + AwsCrtAsyncHttpClient.Builder protocol(Protocol protocol); + /** * Configure whether to enable a hybrid post-quantum key exchange option for the Transport Layer Security (TLS) network * encryption protocol when communicating with services that support Post Quantum TLS. If Post Quantum cipher suites are @@ -246,6 +255,13 @@ AwsCrtAsyncHttpClient.Builder tcpKeepAliveConfiguration(Consumer implements Builder { + + @Override + public Builder protocol(Protocol protocol) { + getAttributeMap().put(SdkHttpConfigurationOption.PROTOCOL, protocol); + return this; + } + @Override public SdkAsyncHttpClient build() { return new AwsCrtAsyncHttpClient(this, getAttributeMap().build() @@ -258,5 +274,6 @@ public SdkAsyncHttpClient buildWithDefaults(AttributeMap serviceDefaults) { .merge(serviceDefaults) .merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS)); } + } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java index 4270b47862e5..075db14e2af1 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java @@ -23,11 +23,12 @@ import java.util.concurrent.CompletionException; import java.util.function.Consumer; import software.amazon.awssdk.annotations.SdkPublicApi; -import software.amazon.awssdk.crt.http.HttpClientConnectionManager; import software.amazon.awssdk.crt.http.HttpException; +import software.amazon.awssdk.crt.http.HttpStreamManager; import software.amazon.awssdk.http.ExecutableHttpRequest; import software.amazon.awssdk.http.HttpExecuteRequest; import software.amazon.awssdk.http.HttpExecuteResponse; +import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.SdkHttpConfigurationOption; import software.amazon.awssdk.http.SdkHttpFullResponse; @@ -56,6 +57,9 @@ public final class AwsCrtHttpClient extends AwsCrtHttpClientBase implements SdkH private AwsCrtHttpClient(DefaultBuilder builder, AttributeMap config) { super(builder, config); + if (this.protocol == Protocol.HTTP2) { + throw new UnsupportedOperationException("HTTP/2 is not supported in sync client. Use AwsCrtAsyncHttpClient instead."); + } } public static AwsCrtHttpClient.Builder builder() { @@ -91,14 +95,13 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) { * we have a pool and no one can destroy it underneath us until we've finished submitting the * request) */ - try (HttpClientConnectionManager crtConnPool = getOrCreateConnectionPool(poolKey(request.httpRequest()))) { - CrtRequestContext context = CrtRequestContext.builder() - .crtConnPool(crtConnPool) - .readBufferSize(this.readBufferSize) - .request(request) - .build(); - return new CrtHttpRequest(context); - } + HttpStreamManager crtConnPool = getOrCreateConnectionPool(poolKey(request.httpRequest())); + CrtRequestContext context = CrtRequestContext.builder() + .crtConnPool(crtConnPool) + .readBufferSize(this.readBufferSize) + .request(request) + .build(); + return new CrtHttpRequest(context); } private static final class CrtHttpRequest implements ExecutableHttpRequest { @@ -140,7 +143,7 @@ public HttpExecuteResponse call() throws IOException { @Override public void abort() { if (responseFuture != null) { - responseFuture.completeExceptionally(new IOException("Request ws cancelled")); + responseFuture.completeExceptionally(new IOException("Request was cancelled")); } } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java index 2df865f0fa0b..228a6086eddf 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientBase.java @@ -28,10 +28,13 @@ import java.util.concurrent.ConcurrentHashMap; import software.amazon.awssdk.annotations.SdkProtectedApi; import software.amazon.awssdk.crt.CrtResource; -import software.amazon.awssdk.crt.http.HttpClientConnectionManager; +import software.amazon.awssdk.crt.http.Http2StreamManagerOptions; import software.amazon.awssdk.crt.http.HttpClientConnectionManagerOptions; import software.amazon.awssdk.crt.http.HttpMonitoringOptions; import software.amazon.awssdk.crt.http.HttpProxyOptions; +import software.amazon.awssdk.crt.http.HttpStreamManager; +import software.amazon.awssdk.crt.http.HttpStreamManagerOptions; +import software.amazon.awssdk.crt.http.HttpVersion; import software.amazon.awssdk.crt.io.ClientBootstrap; import software.amazon.awssdk.crt.io.SocketOptions; import software.amazon.awssdk.crt.io.TlsContext; @@ -57,7 +60,8 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable { private static final long DEFAULT_STREAM_WINDOW_SIZE = 16L * 1024L * 1024L; // 16 MB protected final long readBufferSize; - private final Map connectionPools = new ConcurrentHashMap<>(); + protected final Protocol protocol; + private final Map connectionPools = new ConcurrentHashMap<>(); private final LinkedList ownedSubResources = new LinkedList<>(); private final ClientBootstrap bootstrap; private final SocketOptions socketOptions; @@ -67,34 +71,35 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable { private final long maxConnectionIdleInMilliseconds; private final int maxConnectionsPerEndpoint; private final long connectionAcquisitionTimeout; + private final TlsContextOptions tlsContextOptions; private boolean isClosed = false; AwsCrtHttpClientBase(AwsCrtClientBuilderBase builder, AttributeMap config) { - if (config.get(PROTOCOL) == Protocol.HTTP2) { - throw new UnsupportedOperationException("HTTP/2 is not supported in AwsCrtHttpClient yet. Use " - + "NettyNioAsyncHttpClient instead."); + ClientBootstrap clientBootstrap = new ClientBootstrap(null, null); + SocketOptions clientSocketOptions = buildSocketOptions(builder.getTcpKeepAliveConfiguration(), + config.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT)); + TlsContextOptions clientTlsContextOptions = + TlsContextOptions.createDefaultClient() + .withCipherPreference(resolveCipherPreference(builder.getPostQuantumTlsEnabled())) + .withVerifyPeer(!config.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES)); + this.protocol = config.get(PROTOCOL); + if (protocol == Protocol.HTTP2) { + clientTlsContextOptions = clientTlsContextOptions.withAlpnList("h2"); } - try (ClientBootstrap clientBootstrap = new ClientBootstrap(null, null); - SocketOptions clientSocketOptions = buildSocketOptions(builder.getTcpKeepAliveConfiguration(), - config.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT)); - TlsContextOptions clientTlsContextOptions = - TlsContextOptions.createDefaultClient() - .withCipherPreference(resolveCipherPreference(builder.getPostQuantumTlsEnabled())) - .withVerifyPeer(!config.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES)); - TlsContext clientTlsContext = new TlsContext(clientTlsContextOptions)) { - - this.bootstrap = registerOwnedResource(clientBootstrap); - this.socketOptions = registerOwnedResource(clientSocketOptions); - this.tlsContext = registerOwnedResource(clientTlsContext); - this.readBufferSize = builder.getReadBufferSizeInBytes() == null ? - DEFAULT_STREAM_WINDOW_SIZE : builder.getReadBufferSizeInBytes(); - this.maxConnectionsPerEndpoint = config.get(SdkHttpConfigurationOption.MAX_CONNECTIONS); - this.monitoringOptions = resolveHttpMonitoringOptions(builder.getConnectionHealthConfiguration()).orElse(null); - this.maxConnectionIdleInMilliseconds = config.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toMillis(); - this.connectionAcquisitionTimeout = config.get(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT).toMillis(); - this.proxyOptions = resolveProxy(builder.getProxyConfiguration(), tlsContext).orElse(null); - } + this.tlsContextOptions = registerOwnedResource(clientTlsContextOptions); + TlsContext clientTlsContext = new TlsContext(clientTlsContextOptions); + + this.bootstrap = registerOwnedResource(clientBootstrap); + this.socketOptions = registerOwnedResource(clientSocketOptions); + this.tlsContext = registerOwnedResource(clientTlsContext); + this.readBufferSize = builder.getReadBufferSizeInBytes() == null ? + DEFAULT_STREAM_WINDOW_SIZE : builder.getReadBufferSizeInBytes(); + this.maxConnectionsPerEndpoint = config.get(SdkHttpConfigurationOption.MAX_CONNECTIONS); + this.monitoringOptions = resolveHttpMonitoringOptions(builder.getConnectionHealthConfiguration()).orElse(null); + this.maxConnectionIdleInMilliseconds = config.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toMillis(); + this.connectionAcquisitionTimeout = config.get(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT).toMillis(); + this.proxyOptions = resolveProxy(builder.getProxyConfiguration(), tlsContext).orElse(null); } /** @@ -106,7 +111,6 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable { */ private T registerOwnedResource(T subresource) { if (subresource != null) { - subresource.addRef(); ownedSubResources.push(subresource); } return subresource; @@ -116,13 +120,16 @@ String clientName() { return AWS_COMMON_RUNTIME; } - private HttpClientConnectionManager createConnectionPool(URI uri) { + private HttpStreamManager createConnectionPool(URI uri) { log.debug(() -> "Creating ConnectionPool for: URI:" + uri + ", MaxConns: " + maxConnectionsPerEndpoint); - HttpClientConnectionManagerOptions options = new HttpClientConnectionManagerOptions() + boolean isHttps = "https".equalsIgnoreCase(uri.getScheme()); + TlsContext poolTlsContext = isHttps ? tlsContext : null; + + HttpClientConnectionManagerOptions h1Options = new HttpClientConnectionManagerOptions() .withClientBootstrap(bootstrap) .withSocketOptions(socketOptions) - .withTlsContext(tlsContext) + .withTlsContext(poolTlsContext) .withUri(uri) .withWindowSize(readBufferSize) .withMaxConnections(maxConnectionsPerEndpoint) @@ -132,7 +139,24 @@ private HttpClientConnectionManager createConnectionPool(URI uri) { .withMaxConnectionIdleInMilliseconds(maxConnectionIdleInMilliseconds) .withConnectionAcquisitionTimeoutInMilliseconds(connectionAcquisitionTimeout); - return HttpClientConnectionManager.create(options); + HttpStreamManagerOptions options = new HttpStreamManagerOptions() + .withHTTP1ConnectionManagerOptions(h1Options); + + if (protocol == Protocol.HTTP2) { + Http2StreamManagerOptions h2Options = new Http2StreamManagerOptions() + .withConnectionManagerOptions(h1Options); + + if (!isHttps) { + h2Options.withPriorKnowledge(true); + } + + options.withHTTP2StreamManagerOptions(h2Options); + options.withExpectedProtocol(HttpVersion.HTTP_2); + } else { + options.withExpectedProtocol(HttpVersion.HTTP_1_1); + } + + return HttpStreamManager.create(options); } /* @@ -150,14 +174,13 @@ private HttpClientConnectionManager createConnectionPool(URI uri) { * existing pool. If we add all of execute() to the scope, we include, at minimum a JNI call to the native * pool implementation. */ - HttpClientConnectionManager getOrCreateConnectionPool(URI uri) { + HttpStreamManager getOrCreateConnectionPool(URI uri) { synchronized (this) { if (isClosed) { throw new IllegalStateException("Client is closed. No more requests can be made with this client."); } - HttpClientConnectionManager connPool = connectionPools.computeIfAbsent(uri, this::createConnectionPool); - connPool.addRef(); + HttpStreamManager connPool = connectionPools.computeIfAbsent(uri, this::createConnectionPool); return connPool; } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestContext.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestContext.java index 69db7cb3c5b9..83c751d282fa 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestContext.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestContext.java @@ -16,7 +16,8 @@ package software.amazon.awssdk.http.crt.internal; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.crt.http.HttpClientConnectionManager; +import software.amazon.awssdk.crt.http.HttpStreamManager; +import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.async.AsyncExecuteRequest; import software.amazon.awssdk.metrics.MetricCollector; @@ -24,14 +25,16 @@ public final class CrtAsyncRequestContext { private final AsyncExecuteRequest request; private final long readBufferSize; - private final HttpClientConnectionManager crtConnPool; + private final HttpStreamManager crtConnPool; private final MetricCollector metricCollector; + private final Protocol protocol; private CrtAsyncRequestContext(Builder builder) { this.request = builder.request; this.readBufferSize = builder.readBufferSize; this.crtConnPool = builder.crtConnPool; this.metricCollector = request.metricCollector().orElse(null); + this.protocol = builder.protocol; } public static Builder builder() { @@ -46,7 +49,11 @@ public long readBufferSize() { return readBufferSize; } - public HttpClientConnectionManager crtConnPool() { + public Protocol protocol() { + return protocol; + } + + public HttpStreamManager crtConnPool() { return crtConnPool; } @@ -57,7 +64,8 @@ public MetricCollector metricCollector() { public static final class Builder { private AsyncExecuteRequest request; private long readBufferSize; - private HttpClientConnectionManager crtConnPool; + private HttpStreamManager crtConnPool; + private Protocol protocol; private Builder() { } @@ -72,13 +80,19 @@ public Builder readBufferSize(long readBufferSize) { return this; } - public Builder crtConnPool(HttpClientConnectionManager crtConnPool) { + public Builder crtConnPool(HttpStreamManager crtConnPool) { this.crtConnPool = crtConnPool; return this; } + public Builder protocol(Protocol protocol) { + this.protocol = protocol; + return this; + } + public CrtAsyncRequestContext build() { return new CrtAsyncRequestContext(this); } + } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java index 4ede09d7ee3e..2b3eb7f562b8 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java @@ -16,21 +16,17 @@ package software.amazon.awssdk.http.crt.internal; import static software.amazon.awssdk.http.crt.internal.CrtUtils.reportMetrics; -import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapConnectionFailureException; -import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapWithIoExceptionIfRetryable; +import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapCrtException; +import static software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter.toAsyncCrtRequest; -import java.io.IOException; import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.crt.CrtRuntimeException; -import software.amazon.awssdk.crt.http.HttpClientConnection; -import software.amazon.awssdk.crt.http.HttpException; -import software.amazon.awssdk.crt.http.HttpRequest; -import software.amazon.awssdk.crt.http.HttpStreamResponseHandler; +import software.amazon.awssdk.crt.http.HttpRequestBase; +import software.amazon.awssdk.crt.http.HttpStreamBase; +import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.http.SdkCancellationException; import software.amazon.awssdk.http.async.AsyncExecuteRequest; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; -import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter; import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter; import software.amazon.awssdk.metrics.MetricCollector; import software.amazon.awssdk.metrics.NoOpMetricCollector; @@ -42,8 +38,21 @@ public final class CrtAsyncRequestExecutor { private static final Logger log = Logger.loggerFor(CrtAsyncRequestExecutor.class); public CompletableFuture execute(CrtAsyncRequestContext executionContext) { - // go ahead and get a reference to the metricCollector since multiple futures will - // need it regardless. + AsyncExecuteRequest asyncRequest = executionContext.sdkRequest(); + CompletableFuture requestFuture = createAsyncExecutionFuture(asyncRequest); + + try { + doExecute(executionContext, asyncRequest, requestFuture); + } catch (Throwable t) { + reportAsyncFailure(t, requestFuture, asyncRequest.responseHandler()); + } + + return requestFuture; + } + + private void doExecute(CrtAsyncRequestContext executionContext, + AsyncExecuteRequest asyncRequest, + CompletableFuture requestFuture) { MetricCollector metricCollector = executionContext.metricCollector(); boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector); @@ -56,65 +65,31 @@ public CompletableFuture execute(CrtAsyncRequestContext executionContext) acquireStartTime = System.nanoTime(); } - CompletableFuture requestFuture = createAsyncExecutionFuture(executionContext.sdkRequest()); + HttpRequestBase crtRequest = toAsyncCrtRequest(executionContext); - // When a Connection is ready from the Connection Pool, schedule the Request on the connection - CompletableFuture httpClientConnectionCompletableFuture = - executionContext.crtConnPool().acquireConnection(); + HttpStreamBaseResponseHandler crtResponseHandler = + CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler()); - long finalAcquireStartTime = acquireStartTime; + CompletableFuture streamFuture = + executionContext.crtConnPool().acquireStream(crtRequest, crtResponseHandler); - httpClientConnectionCompletableFuture.whenComplete((crtConn, throwable) -> { - AsyncExecuteRequest asyncRequest = executionContext.sdkRequest(); + long finalAcquireStartTime = acquireStartTime; + streamFuture.whenComplete((stream, throwable) -> { if (shouldPublishMetrics) { reportMetrics(executionContext.crtConnPool(), metricCollector, finalAcquireStartTime); } - // If we didn't get a connection for some reason, fail the request if (throwable != null) { - Throwable toThrow = wrapConnectionFailureException(throwable); - reportAsyncFailure(crtConn, toThrow, requestFuture, asyncRequest.responseHandler()); - return; + Throwable toThrow = wrapCrtException(throwable); + reportAsyncFailure(toThrow, requestFuture, asyncRequest.responseHandler()); } - - executeRequest(executionContext, requestFuture, crtConn, asyncRequest); }); - - return requestFuture; - } - - private void executeRequest(CrtAsyncRequestContext executionContext, - CompletableFuture requestFuture, - HttpClientConnection crtConn, - AsyncExecuteRequest asyncRequest) { - // Submit the request on the connection - try { - HttpRequest crtRequest = CrtRequestAdapter.toAsyncCrtRequest(executionContext); - HttpStreamResponseHandler crtResponseHandler = - CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, asyncRequest.responseHandler()); - - crtConn.makeRequest(crtRequest, crtResponseHandler).activate(); - } catch (HttpException e) { - Throwable toThrow = wrapWithIoExceptionIfRetryable(e); - reportAsyncFailure(crtConn, - toThrow, - requestFuture, - asyncRequest.responseHandler()); - } catch (IllegalStateException | CrtRuntimeException e) { - // CRT throws IllegalStateException if the connection is closed - reportAsyncFailure(crtConn, new IOException("An exception occurred when making the request", e), - requestFuture, - asyncRequest.responseHandler()); - } catch (Throwable throwable) { - reportAsyncFailure(crtConn, throwable, - requestFuture, - asyncRequest.responseHandler()); - } } /** * Create the execution future and set up the cancellation logic. + * * @return The created execution future. */ private CompletableFuture createAsyncExecutionFuture(AsyncExecuteRequest request) { @@ -137,14 +112,9 @@ private CompletableFuture createAsyncExecutionFuture(AsyncExecuteRequest r /** * Notify the provided response handler and future of the failure. */ - private void reportAsyncFailure(HttpClientConnection crtConn, - Throwable cause, - CompletableFuture executeFuture, - SdkAsyncHttpResponseHandler responseHandler) { - if (crtConn != null) { - crtConn.close(); - } - + private void reportAsyncFailure(Throwable cause, + CompletableFuture executeFuture, + SdkAsyncHttpResponseHandler responseHandler) { try { responseHandler.onError(cause); } catch (Exception e) { diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java index 420f12e31f44..507e9a6e7165 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestContext.java @@ -16,7 +16,7 @@ package software.amazon.awssdk.http.crt.internal; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.crt.http.HttpClientConnectionManager; +import software.amazon.awssdk.crt.http.HttpStreamManager; import software.amazon.awssdk.http.HttpExecuteRequest; import software.amazon.awssdk.metrics.MetricCollector; @@ -24,7 +24,7 @@ public final class CrtRequestContext { private final HttpExecuteRequest request; private final long readBufferSize; - private final HttpClientConnectionManager crtConnPool; + private final HttpStreamManager crtConnPool; private final MetricCollector metricCollector; private CrtRequestContext(Builder builder) { @@ -46,7 +46,7 @@ public long readBufferSize() { return readBufferSize; } - public HttpClientConnectionManager crtConnPool() { + public HttpStreamManager crtConnPool() { return crtConnPool; } @@ -57,7 +57,7 @@ public MetricCollector metricCollector() { public static final class Builder { private HttpExecuteRequest request; private long readBufferSize; - private HttpClientConnectionManager crtConnPool; + private HttpStreamManager crtConnPool; private Builder() { } @@ -72,7 +72,7 @@ public Builder readBufferSize(long readBufferSize) { return this; } - public Builder crtConnPool(HttpClientConnectionManager crtConnPool) { + public Builder crtConnPool(HttpStreamManager crtConnPool) { this.crtConnPool = crtConnPool; return this; } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java index c3b2f4e8410d..f0b976a457e1 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java @@ -16,17 +16,13 @@ package software.amazon.awssdk.http.crt.internal; import static software.amazon.awssdk.http.crt.internal.CrtUtils.reportMetrics; -import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapConnectionFailureException; -import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapWithIoExceptionIfRetryable; +import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapCrtException; -import java.io.IOException; import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.crt.CrtRuntimeException; -import software.amazon.awssdk.crt.http.HttpClientConnection; -import software.amazon.awssdk.crt.http.HttpException; import software.amazon.awssdk.crt.http.HttpRequest; -import software.amazon.awssdk.crt.http.HttpStreamResponseHandler; +import software.amazon.awssdk.crt.http.HttpStreamBase; +import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.http.SdkHttpFullResponse; import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter; import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler; @@ -37,8 +33,18 @@ public final class CrtRequestExecutor { public CompletableFuture execute(CrtRequestContext executionContext) { - // go ahead and get a reference to the metricCollector since multiple futures will - // need it regardless. + CompletableFuture requestFuture = new CompletableFuture<>(); + + try { + doExecute(executionContext, requestFuture); + } catch (Throwable t) { + requestFuture.completeExceptionally(t); + } + + return requestFuture; + } + + private void doExecute(CrtRequestContext executionContext, CompletableFuture requestFuture) { MetricCollector metricCollector = executionContext.metricCollector(); boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector); @@ -51,65 +57,24 @@ public CompletableFuture execute(CrtRequestContext executio acquireStartTime = System.nanoTime(); } - CompletableFuture requestFuture = new CompletableFuture<>(); + HttpStreamBaseResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(requestFuture); + + HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext); - // When a Connection is ready from the Connection Pool, schedule the Request on the connection - CompletableFuture httpClientConnectionCompletableFuture = - executionContext.crtConnPool().acquireConnection(); + CompletableFuture streamFuture = + executionContext.crtConnPool().acquireStream(crtRequest, crtResponseHandler); long finalAcquireStartTime = acquireStartTime; - httpClientConnectionCompletableFuture.whenComplete((crtConn, throwable) -> { + streamFuture.whenComplete((streamBase, throwable) -> { if (shouldPublishMetrics) { reportMetrics(executionContext.crtConnPool(), metricCollector, finalAcquireStartTime); } - // If we didn't get a connection for some reason, fail the request if (throwable != null) { - Throwable toThrow = wrapConnectionFailureException(throwable); + Throwable toThrow = wrapCrtException(throwable); requestFuture.completeExceptionally(toThrow); - return; } - - executeRequest(executionContext, requestFuture, crtConn); }); - - return requestFuture; - } - - private void executeRequest(CrtRequestContext executionContext, - CompletableFuture requestFuture, - HttpClientConnection crtConn) { - HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext); - - try { - HttpStreamResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(crtConn, - requestFuture); - - // Submit the request on the connection - crtConn.makeRequest(crtRequest, crtResponseHandler).activate(); - } catch (Throwable throwable) { - handleException(requestFuture, crtConn, throwable); - } - } - - private static void handleException(CompletableFuture requestFuture, HttpClientConnection crtConn, - Throwable throwable) { - - crtConn.close(); - - if (throwable instanceof HttpException) { - Throwable toThrow = wrapWithIoExceptionIfRetryable((HttpException) throwable); - requestFuture.completeExceptionally(toThrow); - return; - } - - if (throwable instanceof CrtRuntimeException || throwable instanceof IllegalStateException) { - // CRT throws IllegalStateException if the connection is closed - requestFuture.completeExceptionally(new IOException(throwable)); - return; - } - - requestFuture.completeExceptionally(throwable); } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtUtils.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtUtils.java index 62e9c4156814..da69685c9ecb 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtUtils.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtUtils.java @@ -25,12 +25,14 @@ import java.io.IOException; import java.net.ConnectException; import java.time.Duration; +import java.util.concurrent.CompletionException; import javax.net.ssl.SSLHandshakeException; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.crt.CrtRuntimeException; import software.amazon.awssdk.crt.http.HttpClientConnection; -import software.amazon.awssdk.crt.http.HttpClientConnectionManager; import software.amazon.awssdk.crt.http.HttpException; import software.amazon.awssdk.crt.http.HttpManagerMetrics; +import software.amazon.awssdk.crt.http.HttpStreamManager; import software.amazon.awssdk.metrics.MetricCollector; @SdkInternalApi @@ -52,30 +54,40 @@ public static Throwable wrapWithIoExceptionIfRetryable(HttpException httpExcepti return toThrow; } - public static Throwable wrapConnectionFailureException(Throwable throwable) { - Throwable toThrow = new IOException("An exception occurred when acquiring a connection", throwable); + public static Throwable wrapCrtException(Throwable throwable) { + if (throwable instanceof CompletionException) { + throwable = throwable.getCause(); + } if (throwable instanceof HttpException) { HttpException httpException = (HttpException) throwable; int httpErrorCode = httpException.getErrorCode(); if (httpErrorCode == CRT_TLS_NEGOTIATION_ERROR_CODE) { - toThrow = new SSLHandshakeException(httpException.getMessage()); - } else if (httpErrorCode == CRT_SOCKET_TIMEOUT) { - toThrow = new ConnectException(httpException.getMessage()); + return new SSLHandshakeException(httpException.getMessage()); + } + // TODO: check with CRT team, could CRT_SOCKET_TIMEOUT be thrown + // from processes other than tcp connect? + if (httpErrorCode == CRT_SOCKET_TIMEOUT) { + return new ConnectException(httpException.getMessage()); } + + return wrapWithIoExceptionIfRetryable((HttpException) throwable); } - return toThrow; + if (throwable instanceof IllegalStateException || throwable instanceof CrtRuntimeException) { + // CRT throws IllegalStateException if the connection is closed + return new IOException("An exception occurred when making the request", throwable); + } + + return throwable; } - public static void reportMetrics(HttpClientConnectionManager connManager, MetricCollector metricCollector, - long acquireStartTime) { + public static void reportMetrics(HttpStreamManager connManager, MetricCollector metricCollector, + long acquireStartTime) { long acquireCompletionTime = System.nanoTime(); Duration acquireTimeTaken = Duration.ofNanos(acquireCompletionTime - acquireStartTime); metricCollector.reportMetric(CONCURRENCY_ACQUIRE_DURATION, acquireTimeTaken); HttpManagerMetrics managerMetrics = connManager.getManagerMetrics(); - // currently this executor only handles HTTP 1.1. Until H2 is added, the max concurrency settings are 1:1 with TCP - // connections. When H2 is added, this code needs to be updated to handle stream multiplexing metricCollector.reportMetric(MAX_CONCURRENCY, connManager.getMaxConnections()); metricCollector.reportMetric(AVAILABLE_CONCURRENCY, saturatedCast(managerMetrics.getAvailableConcurrency())); metricCollector.reportMetric(LEASED_CONCURRENCY, saturatedCast(managerMetrics.getLeasedConcurrency())); diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java index 10b658a835df..8672d80b0d1b 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java @@ -22,8 +22,10 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.http.HttpRequest; +import software.amazon.awssdk.crt.http.HttpRequestBase; import software.amazon.awssdk.http.Header; import software.amazon.awssdk.http.HttpExecuteRequest; +import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.SdkHttpRequest; import software.amazon.awssdk.http.async.AsyncExecuteRequest; import software.amazon.awssdk.http.crt.internal.CrtAsyncRequestContext; @@ -34,7 +36,7 @@ public final class CrtRequestAdapter { private CrtRequestAdapter() { } - public static HttpRequest toAsyncCrtRequest(CrtAsyncRequestContext request) { + public static HttpRequestBase toAsyncCrtRequest(CrtAsyncRequestContext request) { AsyncExecuteRequest sdkExecuteRequest = request.sdkRequest(); SdkHttpRequest sdkRequest = sdkExecuteRequest.request(); @@ -47,14 +49,15 @@ public static HttpRequest toAsyncCrtRequest(CrtAsyncRequestContext request) { String encodedQueryString = sdkRequest.encodedQueryParameters() .map(value -> "?" + value) .orElse(""); - - HttpHeader[] crtHeaderArray = asArray(createAsyncHttpHeaderList(sdkRequest.getUri(), sdkExecuteRequest)); - + String path = encodedPath + encodedQueryString; + CrtRequestBodyAdapter crtRequestBodyAdapter = new CrtRequestBodyAdapter(sdkExecuteRequest.requestContentPublisher(), + request.readBufferSize()); + HttpHeader[] crtHeaderArray = asArray(createAsyncHttpHeaderList(sdkRequest.getUri(), sdkExecuteRequest, + request.protocol())); return new HttpRequest(method, - encodedPath + encodedQueryString, + path, crtHeaderArray, - new CrtRequestBodyAdapter(sdkExecuteRequest.requestContentPublisher(), - request.readBufferSize())); + crtRequestBodyAdapter); } public static HttpRequest toCrtRequest(CrtRequestContext request) { @@ -89,7 +92,8 @@ private static HttpHeader[] asArray(List crtHeaderList) { return crtHeaderList.toArray(new HttpHeader[0]); } - private static List createAsyncHttpHeaderList(URI uri, AsyncExecuteRequest sdkExecuteRequest) { + private static List createAsyncHttpHeaderList(URI uri, AsyncExecuteRequest sdkExecuteRequest, + Protocol protocol) { SdkHttpRequest sdkRequest = sdkExecuteRequest.request(); // worst case we may add 3 more headers here List crtHeaderList = new ArrayList<>(sdkRequest.numHeaders() + 3); @@ -99,8 +103,8 @@ private static List createAsyncHttpHeaderList(URI uri, AsyncExecuteR crtHeaderList.add(new HttpHeader(Header.HOST, uri.getHost())); } - // Add Connection Keep Alive Header to reuse this Http Connection as long as possible - if (!sdkRequest.firstMatchingHeader(Header.CONNECTION).isPresent()) { + // Add Connection Keep Alive Header for HTTP/1.1 only (forbidden in HTTP/2 per RFC 7540) + if (protocol != Protocol.HTTP2 && !sdkRequest.firstMatchingHeader(Header.CONNECTION).isPresent()) { crtHeaderList.add(new HttpHeader(Header.CONNECTION, Header.KEEP_ALIVE_VALUE)); } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java index d32c9e2ac228..cac60a75fea9 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java @@ -22,12 +22,11 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.annotations.SdkTestInternalApi; import software.amazon.awssdk.crt.CRT; -import software.amazon.awssdk.crt.http.HttpClientConnection; import software.amazon.awssdk.crt.http.HttpException; import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.http.HttpHeaderBlock; -import software.amazon.awssdk.crt.http.HttpStream; -import software.amazon.awssdk.crt.http.HttpStreamResponseHandler; +import software.amazon.awssdk.crt.http.HttpStreamBase; +import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; @@ -41,49 +40,44 @@ * Implements the CrtHttpStreamHandler API and converts CRT callbacks into calls to SDK AsyncExecuteRequest methods */ @SdkInternalApi -public final class CrtResponseAdapter implements HttpStreamResponseHandler { +public final class CrtResponseAdapter implements HttpStreamBaseResponseHandler { private static final Logger log = Logger.loggerFor(CrtResponseAdapter.class); private final CompletableFuture completionFuture; private final SdkAsyncHttpResponseHandler responseHandler; private final SimplePublisher responsePublisher; - private final SdkHttpResponse.Builder responseBuilder; private final ResponseHandlerHelper responseHandlerHelper; - private CrtResponseAdapter(HttpClientConnection connection, - CompletableFuture completionFuture, + private CrtResponseAdapter(CompletableFuture completionFuture, SdkAsyncHttpResponseHandler responseHandler) { - this(connection, completionFuture, responseHandler, new SimplePublisher<>()); + this(completionFuture, responseHandler, new SimplePublisher<>()); } - @SdkTestInternalApi - public CrtResponseAdapter(HttpClientConnection connection, - CompletableFuture completionFuture, - SdkAsyncHttpResponseHandler responseHandler, - SimplePublisher simplePublisher) { - Validate.paramNotNull(connection, "connection"); + public CrtResponseAdapter(CompletableFuture completionFuture, + SdkAsyncHttpResponseHandler responseHandler, + SimplePublisher simplePublisher) { this.completionFuture = Validate.paramNotNull(completionFuture, "completionFuture"); this.responseHandler = Validate.paramNotNull(responseHandler, "responseHandler"); this.responseBuilder = SdkHttpResponse.builder(); - this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder, connection); this.responsePublisher = simplePublisher; + this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder); } - public static HttpStreamResponseHandler toCrtResponseHandler(HttpClientConnection crtConn, - CompletableFuture requestFuture, - SdkAsyncHttpResponseHandler responseHandler) { - return new CrtResponseAdapter(crtConn, requestFuture, responseHandler); + public static HttpStreamBaseResponseHandler toCrtResponseHandler( + CompletableFuture requestFuture, + SdkAsyncHttpResponseHandler responseHandler) { + return new CrtResponseAdapter(requestFuture, responseHandler); } @Override - public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, HttpHeader[] nextHeaders) { - responseHandlerHelper.onResponseHeaders(responseStatusCode, blockType, nextHeaders); + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) { + responseHandlerHelper.onResponseHeaders(stream, responseStatusCode, headerType, nextHeaders); } @Override - public void onResponseHeadersDone(HttpStream stream, int headerType) { + public void onResponseHeadersDone(HttpStreamBase stream, int headerType) { if (headerType == HttpHeaderBlock.MAIN.getValue()) { responseHandler.onHeaders(responseBuilder.build()); responseHandler.onStream(responsePublisher); @@ -91,7 +85,7 @@ public void onResponseHeadersDone(HttpStream stream, int headerType) { } @Override - public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) { + public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { CompletableFuture writeFuture = responsePublisher.send(ByteBuffer.wrap(bodyBytesIn)); if (writeFuture.isDone() && !writeFuture.isCompletedExceptionally()) { @@ -101,49 +95,43 @@ public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) { writeFuture.whenComplete((result, failure) -> { if (failure != null) { - handlePublisherError(stream, failure); + failResponseHandlerAndFuture(failure); + responseHandlerHelper.closeStream(); return; } - - responseHandlerHelper.incrementWindow(stream, bodyBytesIn.length); + responseHandlerHelper.incrementWindow(bodyBytesIn.length); }); return 0; } @Override - public void onResponseComplete(HttpStream stream, int errorCode) { + public void onResponseComplete(HttpStreamBase stream, int errorCode) { if (errorCode == CRT.AWS_CRT_SUCCESS) { - onSuccessfulResponseComplete(stream); + onSuccessfulResponseComplete(); } else { - onFailedResponseComplete(stream, new HttpException(errorCode)); + onFailedResponseComplete(new HttpException(errorCode)); } } - private void onSuccessfulResponseComplete(HttpStream stream) { + private void onSuccessfulResponseComplete() { responsePublisher.complete().whenComplete((result, failure) -> { if (failure != null) { - handlePublisherError(stream, failure); + failResponseHandlerAndFuture(failure); + responseHandlerHelper.closeStream(); return; } completionFuture.complete(null); }); - - responseHandlerHelper.releaseConnection(stream); + responseHandlerHelper.closeStream(); } - private void handlePublisherError(HttpStream stream, Throwable failure) { - failResponseHandlerAndFuture(failure); - responseHandlerHelper.closeConnection(stream); - } - - private void onFailedResponseComplete(HttpStream stream, HttpException error) { + private void onFailedResponseComplete(HttpException error) { log.debug(() -> "HTTP response encountered an error.", error); - - Throwable toThrow = wrapWithIoExceptionIfRetryable(error);; + Throwable toThrow = wrapWithIoExceptionIfRetryable(error); responsePublisher.error(toThrow); failResponseHandlerAndFuture(toThrow); - responseHandlerHelper.closeConnection(stream); + responseHandlerHelper.closeStream(); } private void failResponseHandlerAndFuture(Throwable error) { diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java index ba3a91dcdeca..1bee55cdc9ce 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java @@ -22,12 +22,10 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.annotations.SdkTestInternalApi; import software.amazon.awssdk.crt.CRT; -import software.amazon.awssdk.crt.http.HttpClientConnection; import software.amazon.awssdk.crt.http.HttpException; import software.amazon.awssdk.crt.http.HttpHeader; -import software.amazon.awssdk.crt.http.HttpHeaderBlock; -import software.amazon.awssdk.crt.http.HttpStream; -import software.amazon.awssdk.crt.http.HttpStreamResponseHandler; +import software.amazon.awssdk.crt.http.HttpStreamBase; +import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.http.AbortableInputStream; import software.amazon.awssdk.http.SdkHttpFullResponse; import software.amazon.awssdk.http.SdkHttpResponse; @@ -40,54 +38,45 @@ * Response handler adaptor for {@link AwsCrtHttpClient}. */ @SdkInternalApi -public final class InputStreamAdaptingHttpStreamResponseHandler implements HttpStreamResponseHandler { +public final class InputStreamAdaptingHttpStreamResponseHandler implements HttpStreamBaseResponseHandler { private static final Logger log = Logger.loggerFor(InputStreamAdaptingHttpStreamResponseHandler.class); private volatile AbortableInputStreamSubscriber inputStreamSubscriber; private final SimplePublisher simplePublisher; - private final CompletableFuture requestCompletionFuture; - private final SdkHttpFullResponse.Builder responseBuilder; private final ResponseHandlerHelper responseHandlerHelper; - public InputStreamAdaptingHttpStreamResponseHandler(HttpClientConnection crtConn, - CompletableFuture requestCompletionFuture) { - this(crtConn, requestCompletionFuture, new SimplePublisher<>()); + public InputStreamAdaptingHttpStreamResponseHandler(CompletableFuture requestCompletionFuture) { + this(requestCompletionFuture, new SimplePublisher<>()); } @SdkTestInternalApi - public InputStreamAdaptingHttpStreamResponseHandler(HttpClientConnection crtConn, - CompletableFuture requestCompletionFuture, + public InputStreamAdaptingHttpStreamResponseHandler(CompletableFuture requestCompletionFuture, SimplePublisher simplePublisher) { this.requestCompletionFuture = requestCompletionFuture; this.responseBuilder = SdkHttpResponse.builder(); - this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder, crtConn); this.simplePublisher = simplePublisher; + this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder); } @Override - public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, HttpHeader[] nextHeaders) { - if (blockType == HttpHeaderBlock.MAIN.getValue()) { - for (HttpHeader h : nextHeaders) { - responseBuilder.appendHeader(h.getName(), h.getValue()); - } - responseBuilder.statusCode(responseStatusCode); - } + responseHandlerHelper.onResponseHeaders(stream, responseStatusCode, blockType, nextHeaders); // Propagate cancellation requestCompletionFuture.exceptionally(t -> { - responseHandlerHelper.closeConnection(stream); + responseHandlerHelper.closeStream(); return null; }); } @Override - public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) { + public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { if (inputStreamSubscriber == null) { inputStreamSubscriber = AbortableInputStreamSubscriber.builder() - .doAfterClose(() -> responseHandlerHelper.closeConnection(stream)) + .doAfterClose(() -> responseHandlerHelper.closeStream()) .build(); simplePublisher.subscribe(inputStreamSubscriber); // For response with a payload, we need to complete the future here to allow downstream to retrieve the data from @@ -107,11 +96,11 @@ public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) { if (failure != null) { log.debug(() -> "The subscriber failed to receive the data, closing the connection and failing the future", failure); - failFutureAndCloseConnection(stream, failure); + requestCompletionFuture.completeExceptionally(failure); + responseHandlerHelper.closeStream(); return; } - // increment the window upon buffer consumption. - responseHandlerHelper.incrementWindow(stream, bodyBytesIn.length); + responseHandlerHelper.incrementWindow(bodyBytesIn.length); }); // Window will be incremented after the subscriber consumes the data, returning 0 here to disable it. @@ -119,34 +108,28 @@ public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) { } @Override - public void onResponseComplete(HttpStream stream, int errorCode) { + public void onResponseComplete(HttpStreamBase stream, int errorCode) { if (errorCode == CRT.AWS_CRT_SUCCESS) { - onSuccessfulResponseComplete(stream); + onSuccessfulResponseComplete(); } else { - onFailedResponseComplete(stream, errorCode); + onFailedResponseComplete(errorCode); } } - private void failFutureAndCloseConnection(HttpStream stream, Throwable failure) { - requestCompletionFuture.completeExceptionally(failure); - responseHandlerHelper.closeConnection(stream); - } - - private void onFailedResponseComplete(HttpStream stream, int errorCode) { - Throwable toThrow = - wrapWithIoExceptionIfRetryable(new HttpException(errorCode)); - + private void onFailedResponseComplete(int errorCode) { + Throwable toThrow = wrapWithIoExceptionIfRetryable(new HttpException(errorCode)); simplePublisher.error(toThrow); - failFutureAndCloseConnection(stream, toThrow); + requestCompletionFuture.completeExceptionally(toThrow); + responseHandlerHelper.closeStream(); } - private void onSuccessfulResponseComplete(HttpStream stream) { + private void onSuccessfulResponseComplete() { // For response without a payload, for example, S3 PutObjectResponse, we need to complete the future // in onResponseComplete callback since onResponseBody will never be invoked. - requestCompletionFuture.complete(responseBuilder.build()); + requestCompletionFuture.complete(responseBuilder.build()); // requestCompletionFuture has been completed at this point, no need to notify the future simplePublisher.complete(); - responseHandlerHelper.releaseConnection(stream); + responseHandlerHelper.closeStream(); } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java index 6da44090aa5f..4b2b4a1c4626 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java @@ -16,35 +16,34 @@ package software.amazon.awssdk.http.crt.internal.response; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.crt.http.HttpClientConnection; import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.http.HttpHeaderBlock; -import software.amazon.awssdk.crt.http.HttpStream; +import software.amazon.awssdk.crt.http.HttpStreamBase; import software.amazon.awssdk.http.SdkHttpResponse; /** * This is the helper class that contains common logic shared between {@link CrtResponseAdapter} and * {@link InputStreamAdaptingHttpStreamResponseHandler}. * - * CRT connection will only be closed, i.e., not reused, in one of the following conditions: - * 1. 5xx server error OR - * 2. It fails to read the response OR - * 3. the response stream is closed/aborted by the caller. */ @SdkInternalApi public class ResponseHandlerHelper { private final SdkHttpResponse.Builder responseBuilder; - private final HttpClientConnection connection; - private boolean connectionClosed; - private final Object lock = new Object(); + private HttpStreamBase stream; + private boolean streamClosed; + private final Object streamLock = new Object(); - public ResponseHandlerHelper(SdkHttpResponse.Builder responseBuilder, HttpClientConnection connection) { + public ResponseHandlerHelper(SdkHttpResponse.Builder responseBuilder) { this.responseBuilder = responseBuilder; - this.connection = connection; } - public void onResponseHeaders(int responseStatusCode, int headerType, HttpHeader[] nextHeaders) { + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) { + synchronized (streamLock) { + if (this.stream == null) { + this.stream = stream; + } + } if (headerType == HttpHeaderBlock.MAIN.getValue()) { for (HttpHeader h : nextHeaders) { responseBuilder.appendHeader(h.getName(), h.getValue()); @@ -53,36 +52,18 @@ public void onResponseHeaders(int responseStatusCode, int headerType, HttpHeader } } - /** - * Release the connection back to the pool so that it can be reused. - */ - public void releaseConnection(HttpStream stream) { - synchronized (lock) { - if (!connectionClosed) { - connectionClosed = true; - connection.close(); - stream.close(); - } - } - } - - public void incrementWindow(HttpStream stream, int windowSize) { - synchronized (lock) { - if (!connectionClosed) { + public void incrementWindow(int windowSize) { + synchronized (streamLock) { + if (!streamClosed && stream != null) { stream.incrementWindow(windowSize); } } } - /** - * Close the connection completely - */ - public void closeConnection(HttpStream stream) { - synchronized (lock) { - if (!connectionClosed) { - connectionClosed = true; - connection.shutdown(); - connection.close(); + public void closeStream() { + synchronized (streamLock) { + if (!streamClosed && stream != null) { + streamClosed = true; stream.close(); } } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientSpiVerificationTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientSpiVerificationTest.java index 0cdb3e31cc8d..cc29152075bd 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientSpiVerificationTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientSpiVerificationTest.java @@ -32,6 +32,7 @@ import com.github.tomakehurst.wiremock.http.Fault; import com.github.tomakehurst.wiremock.junit.WireMockRule; import java.io.IOException; +import java.net.ConnectException; import java.net.URI; import java.nio.ByteBuffer; import java.time.Duration; @@ -126,8 +127,7 @@ public void requestFailed_connectionTimeout_shouldWrapException() { SdkHttpRequest request = createRequest(uri); RecordingResponseHandler recorder = new RecordingResponseHandler(); client.execute(AsyncExecuteRequest.builder().request(request).requestContentPublisher(createProvider("")).responseHandler(recorder).build()); - assertThatThrownBy(() -> recorder.completeFuture().get(5, TimeUnit.SECONDS)).hasCauseInstanceOf(IOException.class) - .hasRootCauseInstanceOf(HttpException.class); + assertThatThrownBy(() -> recorder.completeFuture().get(5, TimeUnit.SECONDS)).hasCauseInstanceOf(ConnectException.class); } } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientWireMockTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientWireMockTest.java index c9f76057913b..2efa2e56e9f2 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientWireMockTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientWireMockTest.java @@ -64,15 +64,6 @@ public void closeClient_reuse_throwException() { assertThatThrownBy(() -> makeSimpleRequest(client)).hasMessageContaining("is closed"); } - @Test - public void invalidProtocol_shouldThrowException() { - AttributeMap attributeMap = AttributeMap.builder() - .put(PROTOCOL, Protocol.HTTP2) - .build(); - assertThatThrownBy(() -> AwsCrtAsyncHttpClient.builder().buildWithDefaults(attributeMap)) - .isInstanceOf(UnsupportedOperationException.class); - } - @Test public void sendRequest_withCollector_shouldCollectMetrics() throws Exception { diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java index 4f024993f26c..79d5475a7323 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java @@ -24,6 +24,7 @@ import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static software.amazon.awssdk.http.crt.CrtHttpClientTestUtils.createRequest; import com.github.tomakehurst.wiremock.http.Fault; @@ -84,8 +85,8 @@ private byte[] generateRandomBody(int size) { } - @Test(expected = IOException.class) - public void requestFailed_connectionTimeout_shouldWrapException() throws IOException { + @Test + public void requestFailed_connectionTimeout_shouldWrapException() { try (SdkHttpClient client = AwsCrtHttpClient.builder().connectionTimeout(Duration.ofNanos(1)).build()) { URI uri = URI.create("http://localhost:" + mockServer.port()); stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withFault(Fault.RANDOM_DATA_THEN_CLOSE))); @@ -94,12 +95,12 @@ public void requestFailed_connectionTimeout_shouldWrapException() throws IOExcep executeRequestBuilder.request(request); ExecutableHttpRequest executableRequest = client.prepareRequest(executeRequestBuilder.build()); - executableRequest.call(); + assertThatThrownBy(() -> executableRequest.call()).isInstanceOf(IOException.class) + .hasMessageContaining("operation timed out"); } } - - @Test(expected = HttpException.class) + @Test public void requestFailed_notRetryable_shouldNotWrapException() throws IOException { try (SdkHttpClient client = AwsCrtHttpClient.builder().build()) { URI uri = URI.create("http://localhost:" + mockServer.port()); @@ -117,7 +118,8 @@ public void requestFailed_notRetryable_shouldNotWrapException() throws IOExcepti executeRequestBuilder.request(request); executeRequestBuilder.contentStreamProvider(() -> new ByteArrayInputStream(new byte[0])); ExecutableHttpRequest executableRequest = client.prepareRequest(executeRequestBuilder.build()); - executableRequest.call(); + assertThatThrownBy(() -> executableRequest.call()).isInstanceOf(HttpException.class) + .hasMessageContaining("does not match the previously declared length"); } } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/H2BehaviorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/H2BehaviorTest.java new file mode 100644 index 000000000000..8a8ec2415982 --- /dev/null +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/H2BehaviorTest.java @@ -0,0 +1,184 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt; + + +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static software.amazon.awssdk.http.HttpTestUtils.sendGetRequest; +import static software.amazon.awssdk.http.SdkHttpConfigurationOption.PROTOCOL; +import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.Http2DataFrame; +import io.netty.handler.codec.http2.Http2Frame; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2FrameLogger; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2SecurityUtil; +import io.netty.handler.codec.http2.Http2Settings; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.ssl.ApplicationProtocolConfig; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import io.netty.handler.ssl.SupportedCipherSuiteFilter; +import io.netty.handler.ssl.util.SelfSignedCertificate; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.utils.AttributeMap; + +public class H2BehaviorTest { + private SdkAsyncHttpClient crt; + private H2Server server; + + @BeforeEach + public void setup() throws Exception { + server = new H2Server(true); + server.init(); + crt = AwsCrtAsyncHttpClient.builder() + .buildWithDefaults(AttributeMap.builder() + .put(TRUST_ALL_CERTIFICATES, true) + .put(PROTOCOL, Protocol.HTTP2) + .build()); + } + + @AfterEach + public void teardown() throws InterruptedException { + if (server != null) { + server.shutdown(); + } + server = null; + + if (crt != null) { + crt.close(); + } + crt = null; + } + + @Test + public void sendH2Request_overTls() throws Exception { + CompletableFuture request = sendGetRequest(server.port(), crt); + request.join(); + } + + @Test + public void sendH2Request_overPlaintext_usesPriorKnowledge() throws Exception { + H2Server h2cServer = new H2Server(false); + h2cServer.init(); + try (SdkAsyncHttpClient h2cClient = AwsCrtAsyncHttpClient.builder() + .buildWithDefaults(AttributeMap.builder() + .put(PROTOCOL, Protocol.HTTP2) + .build())) { + CompletableFuture request = sendGetRequest(h2cServer.port(), h2cClient, false); + request.join(); + } finally { + h2cServer.shutdown(); + } + } + + private static class H2Server extends ChannelInitializer { + private final boolean useTls; + private final NioEventLoopGroup group = new NioEventLoopGroup(); + private ServerBootstrap bootstrap; + private ServerSocketChannel serverSock; + private SslContext sslCtx; + + H2Server(boolean useTls) { + this.useTls = useTls; + } + + void init() throws Exception { + if (useTls) { + SelfSignedCertificate ssc = new SelfSignedCertificate(); + sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()) + .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE) + .sslProvider(SslProvider.JDK) + .applicationProtocolConfig(new ApplicationProtocolConfig( + ApplicationProtocolConfig.Protocol.ALPN, + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2)) + .build(); + } + + bootstrap = new ServerBootstrap() + .channel(NioServerSocketChannel.class) + .group(group) + .childHandler(this); + + serverSock = (ServerSocketChannel) bootstrap.bind(0).sync().channel(); + } + + @Override + protected void initChannel(Channel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + if (useTls) { + pipeline.addLast(sslCtx.newHandler(ch.alloc())); + } + + pipeline.addLast(Http2FrameCodecBuilder.forServer() + .autoAckSettingsFrame(true) + .autoAckPingFrame(true) + .initialSettings(Http2Settings.defaultSettings()) + .frameLogger(new Http2FrameLogger(LogLevel.DEBUG)) + .build()); + + pipeline.addLast(new Http2MultiplexHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + ch.pipeline().addLast(new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) { + if (frame instanceof Http2DataFrame) { + Http2Headers headers = new DefaultHttp2Headers().status(OK.codeAsText()); + ctx.write(new DefaultHttp2HeadersFrame(headers, false)); + ctx.write(new DefaultHttp2DataFrame(true)); + ctx.flush(); + } + } + }); + } + })); + } + + void shutdown() throws InterruptedException { + group.shutdownGracefully().await(); + serverSock.close(); + } + + int port() { + return serverSock.localAddress().getPort(); + } + } +} diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/H2ErrorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/H2ErrorTest.java new file mode 100644 index 000000000000..11b6caf7429d --- /dev/null +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/H2ErrorTest.java @@ -0,0 +1,199 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static software.amazon.awssdk.http.HttpTestUtils.sendGetRequest; +import static software.amazon.awssdk.http.SdkHttpConfigurationOption.PROTOCOL; +import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http2.DefaultHttp2GoAwayFrame; +import io.netty.handler.codec.http2.DefaultHttp2ResetFrame; +import io.netty.handler.codec.http2.Http2DataFrame; +import io.netty.handler.codec.http2.Http2Error; +import io.netty.handler.codec.http2.Http2Frame; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2FrameLogger; +import io.netty.handler.codec.http2.Http2HeadersFrame; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2SecurityUtil; +import io.netty.handler.codec.http2.Http2Settings; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.ssl.ApplicationProtocolConfig; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import io.netty.handler.ssl.SupportedCipherSuiteFilter; +import io.netty.handler.ssl.util.SelfSignedCertificate; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.utils.AttributeMap; + +/** + * Tests HTTP/2 error scenarios: RST_STREAM and GOAWAY frames. + */ +public class H2ErrorTest { + private SdkAsyncHttpClient client; + + @BeforeEach + public void setup() { + client = AwsCrtAsyncHttpClient.builder() + .buildWithDefaults(AttributeMap.builder() + .put(TRUST_ALL_CERTIFICATES, true) + .put(PROTOCOL, Protocol.HTTP2) + .build()); + } + + @AfterEach + public void teardown() { + if (client != null) { + client.close(); + } + } + + @Test + public void serverSendsRstStream_shouldThrowIOException() throws Exception { + H2ErrorServer server = new H2ErrorServer(ErrorType.RST_STREAM); + server.init(); + try { + CompletableFuture request = sendGetRequest(server.port(), client); + assertThatThrownBy(request::join) + .isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(IOException.class) + .hasMessageContaining("RST_STREAM"); + } finally { + server.shutdown(); + } + } + + @Test + public void serverSendsGoAway_shouldThrowIOException() throws Exception { + H2ErrorServer server = new H2ErrorServer(ErrorType.GOAWAY); + server.init(); + try { + CompletableFuture request = sendGetRequest(server.port(), client); + assertThatThrownBy(request::join) + .isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(IOException.class) + .hasMessageContaining("connection has closed"); + } finally { + server.shutdown(); + } + } + + private enum ErrorType { + RST_STREAM, + GOAWAY + } + + private static class H2ErrorServer extends ChannelInitializer { + private final ErrorType errorType; + private final NioEventLoopGroup group = new NioEventLoopGroup(); + private ServerBootstrap bootstrap; + private ServerSocketChannel serverSock; + private SslContext sslCtx; + + H2ErrorServer(ErrorType errorType) { + this.errorType = errorType; + } + + void init() throws Exception { + SelfSignedCertificate ssc = new SelfSignedCertificate(); + sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()) + .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE) + .sslProvider(SslProvider.JDK) + .applicationProtocolConfig(new ApplicationProtocolConfig( + ApplicationProtocolConfig.Protocol.ALPN, + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2)) + .build(); + + bootstrap = new ServerBootstrap() + .channel(NioServerSocketChannel.class) + .group(group) + .childHandler(this); + + serverSock = (ServerSocketChannel) bootstrap.bind(0).sync().channel(); + } + + @Override + protected void initChannel(Channel ch) { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(sslCtx.newHandler(ch.alloc())); + pipeline.addLast(Http2FrameCodecBuilder.forServer() + .autoAckSettingsFrame(true) + .autoAckPingFrame(true) + .initialSettings(Http2Settings.defaultSettings()) + .frameLogger(new Http2FrameLogger(LogLevel.DEBUG)) + .build()); + + pipeline.addLast(new Http2MultiplexHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + ch.pipeline().addLast(new ErrorFrameHandler(errorType)); + } + })); + } + + void shutdown() throws InterruptedException { + group.shutdownGracefully().await(); + serverSock.close(); + } + + int port() { + return serverSock.localAddress().getPort(); + } + + private static class ErrorFrameHandler extends SimpleChannelInboundHandler { + private final ErrorType errorType; + + ErrorFrameHandler(ErrorType errorType) { + this.errorType = errorType; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) { + if (frame instanceof Http2HeadersFrame || frame instanceof Http2DataFrame) { + switch (errorType) { + case RST_STREAM: + ctx.writeAndFlush(new DefaultHttp2ResetFrame(Http2Error.INTERNAL_ERROR)); + break; + case GOAWAY: + ctx.channel().parent().writeAndFlush(new DefaultHttp2GoAwayFrame(Http2Error.INTERNAL_ERROR)); + break; + } + } + } + } + } +} diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java index 8e2426cc5371..58d11e068c5b 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java @@ -25,8 +25,6 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -36,18 +34,15 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; -import software.amazon.awssdk.crt.http.HttpClientConnection; import software.amazon.awssdk.crt.http.HttpException; import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.http.HttpHeaderBlock; import software.amazon.awssdk.crt.http.HttpStream; -import software.amazon.awssdk.crt.http.HttpStreamResponseHandler; -import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler; +import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.utils.async.SimplePublisher; @ExtendWith(MockitoExtension.class) public abstract class BaseHttpStreamResponseHandlerTest { - @Mock HttpClientConnection crtConn; CompletableFuture requestFuture; @Mock @@ -56,11 +51,11 @@ public abstract class BaseHttpStreamResponseHandlerTest { @Mock SimplePublisher simplePublisher; - HttpStreamResponseHandler responseHandler; + HttpStreamBaseResponseHandler responseHandler; - abstract HttpStreamResponseHandler responseHandler(); + abstract HttpStreamBaseResponseHandler responseHandler(); - abstract HttpStreamResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher); + abstract HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher); @BeforeEach public void setUp() { @@ -69,7 +64,7 @@ public void setUp() { } @Test - void serverError_shouldShutdownConnection() { + void serverError_shouldCloseStream() { HttpHeader[] httpHeaders = getHttpHeaders(); responseHandler.onResponseHeaders(httpStream, 500, HttpHeaderBlock.MAIN.getValue(), httpHeaders); @@ -77,13 +72,12 @@ void serverError_shouldShutdownConnection() { responseHandler.onResponseHeadersDone(httpStream, 0); responseHandler.onResponseComplete(httpStream, 0); requestFuture.join(); - verify(crtConn).close(); verify(httpStream).close(); } @ParameterizedTest @ValueSource(ints = { 200, 400, 202, 403 }) - void nonServerError_shouldNotShutdownConnection(int statusCode) { + void nonServerError_shouldCloseStream(int statusCode) { HttpHeader[] httpHeaders = getHttpHeaders(); responseHandler.onResponseHeaders(httpStream, statusCode, HttpHeaderBlock.MAIN.getValue(), httpHeaders); @@ -92,21 +86,17 @@ void nonServerError_shouldNotShutdownConnection(int statusCode) { responseHandler.onResponseComplete(httpStream, 0); requestFuture.join(); - verify(crtConn, never()).shutdown(); - verify(crtConn).close(); verify(httpStream).close(); } @Test - void failedToGetResponse_shouldShutdownConnection() { + void failedToGetResponse_shouldCloseStream() { HttpHeader[] httpHeaders = getHttpHeaders(); responseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(), httpHeaders); responseHandler.onResponseComplete(httpStream, 1); assertThatThrownBy(() -> requestFuture.join()).hasRootCauseInstanceOf(HttpException.class); - verify(crtConn).shutdown(); - verify(crtConn).close(); verify(httpStream).close(); } @@ -120,18 +110,17 @@ void streamClosed_shouldNotIncreaseStreamWindow() throws InterruptedException { responseHandler.onResponseComplete(httpStream, 0); requestFuture.join(); - verify(crtConn).close(); verify(httpStream).close(); verify(httpStream, never()).incrementWindow(anyInt()); } @Test - void publisherWritesFutureFails_shouldShutdownConnection() { + void publisherWritesFutureFails_shouldCloseStream() { SimplePublisher simplePublisher = Mockito.mock(SimplePublisher.class); CompletableFuture future = new CompletableFuture<>(); when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future); - HttpStreamResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher); + HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher); HttpHeader[] httpHeaders = getHttpHeaders(); handler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(), @@ -148,19 +137,17 @@ void publisherWritesFutureFails_shouldShutdownConnection() { // we don't verify here because it behaves differently in async and sync } - verify(crtConn).shutdown(); - verify(crtConn).close(); verify(httpStream).close(); verify(httpStream, never()).incrementWindow(anyInt()); } @Test - void publisherWritesFutureCompletesAfterConnectionClosed_shouldNotInvokeIncrementWindow() { + void publisherWritesFutureCompletesAfterStreamClosed_shouldNotInvokeIncrementWindow() { CompletableFuture future = new CompletableFuture<>(); when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future); when(simplePublisher.complete()).thenReturn(future); - HttpStreamResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher); + HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher); HttpHeader[] httpHeaders = getHttpHeaders(); @@ -174,19 +161,17 @@ void publisherWritesFutureCompletesAfterConnectionClosed_shouldNotInvokeIncremen future.complete(null); requestFuture.join(); - verify(crtConn, never()).shutdown(); - verify(crtConn).close(); verify(httpStream).close(); verify(httpStream, never()).incrementWindow(anyInt()); } @Test - void publisherWritesFutureCompletesBeforeConnectionClosed_shouldInvokeIncrementWindow() { + void publisherWritesFutureCompletesBeforeStreamClosed_shouldInvokeIncrementWindow() { CompletableFuture future = new CompletableFuture<>(); when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future); when(simplePublisher.complete()).thenReturn(future); - HttpStreamResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher); + HttpStreamBaseResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher); HttpHeader[] httpHeaders = getHttpHeaders(); @@ -201,9 +186,6 @@ void publisherWritesFutureCompletesBeforeConnectionClosed_shouldInvokeIncrementW handler.onResponseComplete(httpStream, 0); requestFuture.join(); verify(httpStream).incrementWindow(anyInt()); - - verify(crtConn, never()).shutdown(); - verify(crtConn).close(); verify(httpStream).close(); } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutorTest.java index feaf9db3d472..ff95324f0a74 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutorTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutorTest.java @@ -39,15 +39,15 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import software.amazon.awssdk.crt.CrtRuntimeException; -import software.amazon.awssdk.crt.http.HttpClientConnection; -import software.amazon.awssdk.crt.http.HttpClientConnectionManager; import software.amazon.awssdk.crt.http.HttpException; -import software.amazon.awssdk.crt.http.HttpRequest; +import software.amazon.awssdk.crt.http.HttpRequestBase; +import software.amazon.awssdk.crt.http.HttpStreamBase; +import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; +import software.amazon.awssdk.crt.http.HttpStreamManager; import software.amazon.awssdk.http.SdkCancellationException; import software.amazon.awssdk.http.SdkHttpFullRequest; import software.amazon.awssdk.http.async.AsyncExecuteRequest; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; -import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter; import software.amazon.awssdk.utils.CompletableFutureUtils; @ExtendWith(MockitoExtension.class) @@ -55,18 +55,18 @@ public class CrtAsyncRequestExecutorTest { private CrtAsyncRequestExecutor requestExecutor; @Mock - private HttpClientConnectionManager connectionManager; + private HttpStreamManager streamManager; @Mock private SdkAsyncHttpResponseHandler responseHandler; @Mock - private HttpClientConnection httpClientConnection; + private HttpStreamBase httpStream; public static Stream>> mappedExceptions() { return Stream.of( - new SimpleEntry<>(0x0405, SSLHandshakeException.class), // For AWS_IO_TLS_ERROR_NEGOTIATION_FAILURE (1029) - new SimpleEntry<>(0x0418, ConnectException.class) // For AWS_IO_SOCKET_TIMEOUT (1048) + new SimpleEntry<>(1029, SSLHandshakeException.class), // CRT_TLS_NEGOTIATION_ERROR_CODE + new SimpleEntry<>(1048, ConnectException.class) // CRT_SOCKET_TIMEOUT ); } @@ -77,22 +77,17 @@ public void setup() { @AfterEach public void teardown() { - Mockito.reset(connectionManager, responseHandler, httpClientConnection); + Mockito.reset(streamManager, responseHandler, httpStream); } @Test - public void acquireConnectionThrowException_shouldInvokeOnError() { - RuntimeException exception = new RuntimeException("error"); + public void execute_requestConversionFails_invokesOnError() { CrtAsyncRequestContext context = CrtAsyncRequestContext.builder() - .crtConnPool(connectionManager) + .crtConnPool(streamManager) .request(AsyncExecuteRequest.builder() - .responseHandler(responseHandler) - .build()) + .responseHandler(responseHandler) + .build()) .build(); - CompletableFuture completableFuture = new CompletableFuture<>(); - - Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture); - completableFuture.completeExceptionally(exception); CompletableFuture executeFuture = requestExecutor.execute(context); @@ -100,23 +95,19 @@ public void acquireConnectionThrowException_shouldInvokeOnError() { Mockito.verify(responseHandler).onError(argumentCaptor.capture()); Exception actualException = argumentCaptor.getValue(); - assertThat(actualException).hasMessageContaining("An exception occurred when acquiring a connection"); - assertThat(actualException).hasCause(exception); - assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class); + assertThat(actualException).isInstanceOf(NullPointerException.class); + assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class); } @Test - public void invalidRequest_requestConversionThrowError_shouldInvokeOnError() { - CrtAsyncRequestContext context = CrtAsyncRequestContext.builder() - .crtConnPool(connectionManager) - .request(AsyncExecuteRequest.builder() - .responseHandler(responseHandler) - .build()) - .build(); - CompletableFuture completableFuture = new CompletableFuture<>(); + public void execute_acquireStreamFails_invokesOnErrorAndWrapsWithIOException() { + IllegalStateException exception = new IllegalStateException("connection closed"); + CrtAsyncRequestContext context = crtAsyncRequestContext(); + CompletableFuture completableFuture = new CompletableFuture<>(); - Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture); - completableFuture.complete(httpClientConnection); + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequestBase.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + .thenReturn(completableFuture); + completableFuture.completeExceptionally(exception); CompletableFuture executeFuture = requestExecutor.execute(context); @@ -124,21 +115,18 @@ public void invalidRequest_requestConversionThrowError_shouldInvokeOnError() { Mockito.verify(responseHandler).onError(argumentCaptor.capture()); Exception actualException = argumentCaptor.getValue(); - assertThat(actualException).isInstanceOf(NullPointerException.class); - assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class); + assertThat(actualException).hasCause(exception); + assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class); } @Test - public void executeAsyncRequest_CrtRuntimeException_shouldInvokeOnError() { + public void execute_crtRuntimeException_invokesOnError() { CrtRuntimeException exception = new CrtRuntimeException(""); CrtAsyncRequestContext context = crtAsyncRequestContext(); - CompletableFuture completableFuture = new CompletableFuture<>(); - - Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture); - completableFuture.complete(httpClientConnection); + CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception); - Mockito.when(httpClientConnection.makeRequest(Mockito.any(HttpRequest.class), Mockito.any(CrtResponseAdapter.class))) - .thenThrow(exception); + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequestBase.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + .thenReturn(completableFuture); CompletableFuture executeFuture = requestExecutor.execute(context); @@ -146,22 +134,17 @@ public void executeAsyncRequest_CrtRuntimeException_shouldInvokeOnError() { Mockito.verify(responseHandler).onError(argumentCaptor.capture()); Exception actualException = argumentCaptor.getValue(); - assertThat(actualException).hasMessageContaining("An exception occurred when making the request"); assertThat(actualException).hasCause(exception); assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class); } @Test - public void cancelRequest_shouldInvokeOnError() { - CrtAsyncRequestContext context = CrtAsyncRequestContext.builder() - .crtConnPool(connectionManager) - .request(AsyncExecuteRequest.builder() - .responseHandler(responseHandler) - .build()) - .build(); - CompletableFuture completableFuture = new CompletableFuture<>(); + public void execute_requestCancelled_invokesOnError() { + CrtAsyncRequestContext context = crtAsyncRequestContext(); + CompletableFuture completableFuture = new CompletableFuture<>(); - Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture); + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequestBase.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + .thenReturn(completableFuture); CompletableFuture executeFuture = requestExecutor.execute(context); executeFuture.cancel(true); @@ -175,12 +158,13 @@ public void cancelRequest_shouldInvokeOnError() { } @Test - public void execute_AcquireConnectionFailure_shouldAlwaysWrapIOException() { + public void execute_retryableHttpException_wrapsWithIOException() { + HttpException exception = new HttpException(0x080a); // AWS_ERROR_HTTP_CONNECTION_CLOSED CrtAsyncRequestContext context = crtAsyncRequestContext(); - RuntimeException exception = new RuntimeException("some failure"); - CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception); + CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception); - Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture); + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequestBase.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + .thenReturn(completableFuture); CompletableFuture executeFuture = requestExecutor.execute(context); assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(IOException.class).hasRootCause(exception); @@ -188,76 +172,29 @@ public void execute_AcquireConnectionFailure_shouldAlwaysWrapIOException() { @ParameterizedTest @MethodSource("mappedExceptions") - public void execute_AcquireConnectionFailure_shouldAlwaysBeInstanceOfIOException(Entry> entry) { + public void execute_httpException_mapsToCorrectException(Entry> entry) { int errorCode = entry.getKey(); - Class ioExceptionSubclass = entry.getValue(); + Class expectedExceptionClass = entry.getValue(); CrtAsyncRequestContext context = crtAsyncRequestContext(); HttpException exception = new HttpException(errorCode); - CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception); - - Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture); - - CompletableFuture executeFuture = requestExecutor.execute(context); - assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(IOException.class).hasMessageContaining(exception.getMessage()); - assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(ioExceptionSubclass); - } - - @Test - public void executeRequest_failedOfIllegalStateException_shouldWrapIOException() { - IllegalStateException exception = new IllegalStateException("connection closed"); - CrtAsyncRequestContext context = crtAsyncRequestContext(); - CompletableFuture completableFuture = new CompletableFuture<>(); - - Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture); - completableFuture.complete(httpClientConnection); + CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception); - Mockito.when(httpClientConnection.makeRequest(Mockito.any(HttpRequest.class), Mockito.any(CrtResponseAdapter.class))) - .thenThrow(exception); + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequestBase.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + .thenReturn(completableFuture); CompletableFuture executeFuture = requestExecutor.execute(context); - - ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Exception.class); - Mockito.verify(responseHandler).onError(argumentCaptor.capture()); - - Exception actualException = argumentCaptor.getValue(); - assertThat(actualException).hasMessageContaining("An exception occurred when making the request").hasCause(exception); - assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(IOException.class).hasRootCause(exception); + assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(expectedExceptionClass); } @Test - public void executeRequest_failedOfRetryableHttpException_shouldWrapIOException() { - HttpException exception = new HttpException(0x080a); // AWS_ERROR_HTTP_CONNECTION_CLOSED - CrtAsyncRequestContext context = crtAsyncRequestContext(); - CompletableFuture completableFuture = new CompletableFuture<>(); - - Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture); - completableFuture.complete(httpClientConnection); - - Mockito.when(httpClientConnection.makeRequest(Mockito.any(HttpRequest.class), Mockito.any(CrtResponseAdapter.class))) - .thenThrow(exception); - - CompletableFuture executeFuture = requestExecutor.execute(context); - - ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Exception.class); - Mockito.verify(responseHandler).onError(argumentCaptor.capture()); - - Exception actualException = argumentCaptor.getValue(); - assertThat(actualException).hasCause(exception); - assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(IOException.class).hasRootCause(exception); - } - - @Test - public void executeRequest_failedOfNonRetryableHttpException_shouldNotWrapIOException() { + public void execute_nonRetryableHttpException_doesNotWrapWithIOException() { HttpException exception = new HttpException(0x0801); // AWS_ERROR_HTTP_HEADER_NOT_FOUND CrtAsyncRequestContext context = crtAsyncRequestContext(); - CompletableFuture completableFuture = new CompletableFuture<>(); - - Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture); - completableFuture.complete(httpClientConnection); + CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception); - Mockito.when(httpClientConnection.makeRequest(Mockito.any(HttpRequest.class), Mockito.any(CrtResponseAdapter.class))) - .thenThrow(exception); + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequestBase.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + .thenReturn(completableFuture); CompletableFuture executeFuture = requestExecutor.execute(context); @@ -273,7 +210,7 @@ private CrtAsyncRequestContext crtAsyncRequestContext() { SdkHttpFullRequest request = createRequest(URI.create("http://localhost")); return CrtAsyncRequestContext.builder() .readBufferSize(2000) - .crtConnPool(connectionManager) + .crtConnPool(streamManager) .request(AsyncExecuteRequest.builder() .request(request) .requestContentPublisher(createProvider("")) diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java index 8fdf5b371fab..a7c601495c9e 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java @@ -38,11 +38,11 @@ import org.mockito.junit.jupiter.MockitoExtension; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.crt.CrtRuntimeException; -import software.amazon.awssdk.crt.http.HttpClientConnection; -import software.amazon.awssdk.crt.http.HttpClientConnectionManager; import software.amazon.awssdk.crt.http.HttpException; import software.amazon.awssdk.crt.http.HttpRequest; -import software.amazon.awssdk.crt.http.HttpStreamResponseHandler; +import software.amazon.awssdk.crt.http.HttpStreamBase; +import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; +import software.amazon.awssdk.crt.http.HttpStreamManager; import software.amazon.awssdk.http.HttpExecuteRequest; import software.amazon.awssdk.http.SdkHttpFullRequest; import software.amazon.awssdk.http.SdkHttpFullResponse; @@ -53,10 +53,10 @@ public class CrtRequestExecutorTest { private CrtRequestExecutor requestExecutor; @Mock - private HttpClientConnectionManager connectionManager; + private HttpStreamManager streamManager; @Mock - private HttpClientConnection httpClientConnection; + private HttpStreamBase httpStream; public static Stream retryableExceptions() { return Stream.of(new CrtRuntimeException(""), new HttpException(0x080a), new IllegalStateException( @@ -65,8 +65,8 @@ public static Stream retryableExceptions() { public static Stream>> mappedExceptions() { return Stream.of( - new SimpleEntry<>(0x0405, SSLHandshakeException.class), // For AWS_IO_TLS_ERROR_NEGOTIATION_FAILURE (1029) - new SimpleEntry<>(0x0418, ConnectException.class) // For AWS_IO_SOCKET_TIMEOUT (1048) + new SimpleEntry<>(1029, SSLHandshakeException.class), // CRT_TLS_NEGOTIATION_ERROR_CODE + new SimpleEntry<>(1048, ConnectException.class) // CRT_SOCKET_TIMEOUT ); } @@ -77,16 +77,29 @@ public void setup() { @AfterEach public void teardown() { - Mockito.reset(connectionManager, httpClientConnection); + Mockito.reset(streamManager, httpStream); } @Test - public void acquireConnectionThrowException_shouldInvokeOnError() { - RuntimeException exception = new RuntimeException("error"); + public void execute_requestConversionFails_failsFuture() { + CrtRequestContext context = CrtRequestContext.builder() + .crtConnPool(streamManager) + .request(HttpExecuteRequest.builder().build()) + .build(); + + CompletableFuture executeFuture = requestExecutor.execute(context); + + assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class); + } + + @Test + public void execute_acquireStreamFails_wrapsWithIOException() { + IllegalStateException exception = new IllegalStateException("connection closed"); CrtRequestContext context = crtRequestContext(); - CompletableFuture completableFuture = new CompletableFuture<>(); + CompletableFuture completableFuture = new CompletableFuture<>(); - Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture); + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + .thenReturn(completableFuture); completableFuture.completeExceptionally(exception); CompletableFuture executeFuture = requestExecutor.execute(context); @@ -96,60 +109,42 @@ public void acquireConnectionThrowException_shouldInvokeOnError() { @ParameterizedTest @MethodSource("retryableExceptions") - public void makeRequestFailed_retryableException_shouldWrapWithIOException(Throwable throwable) { + public void execute_retryableException_wrapsWithIOException(Throwable throwable) { CrtRequestContext context = crtRequestContext(); - CompletableFuture completableFuture = new CompletableFuture<>(); + CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(throwable); - Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture); - completableFuture.complete(httpClientConnection); - - Mockito.when(httpClientConnection.makeRequest(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamResponseHandler.class))) - .thenThrow(throwable); + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + .thenReturn(completableFuture); CompletableFuture executeFuture = requestExecutor.execute(context); assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(throwable).isInstanceOf(IOException.class); } - @Test - public void execute_AcquireConnectionFailure_shouldAlwaysWrapIOException() { - CrtRequestContext context = crtRequestContext(); - RuntimeException exception = new RuntimeException("some failure"); - CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception); - - Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture); - - CompletableFuture executeFuture = requestExecutor.execute(context); - assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(IOException.class).hasRootCause(exception); - } - @ParameterizedTest @MethodSource("mappedExceptions") - public void execute_AcquireConnectionFailure_shouldAlwaysBeInstanceOfIOException(Entry> entry) { + public void execute_httpException_mapsToCorrectException(Entry> entry) { int errorCode = entry.getKey(); - Class ioExceptionSubclass = entry.getValue(); + Class expectedExceptionClass = entry.getValue(); CrtRequestContext context = crtRequestContext(); HttpException exception = new HttpException(errorCode); - CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception); + CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception); - Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture); + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + .thenReturn(completableFuture); CompletableFuture executeFuture = requestExecutor.execute(context); - assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(IOException.class).hasMessageContaining(exception.getMessage()); - assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(ioExceptionSubclass); + assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(expectedExceptionClass); } @Test - public void executeRequest_failedOfNonRetryableHttpException_shouldNotWrapIOException() { + public void execute_nonRetryableHttpException_doesNotWrapWithIOException() { HttpException exception = new HttpException(0x0801); // AWS_ERROR_HTTP_HEADER_NOT_FOUND CrtRequestContext context = crtRequestContext(); - CompletableFuture completableFuture = new CompletableFuture<>(); - - Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture); - completableFuture.complete(httpClientConnection); + CompletableFuture completableFuture = CompletableFutureUtils.failedFuture(exception); - Mockito.when(httpClientConnection.makeRequest(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamResponseHandler.class))) - .thenThrow(exception); + Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class))) + .thenReturn(completableFuture); CompletableFuture executeFuture = requestExecutor.execute(context); assertThatThrownBy(executeFuture::join).hasCause(exception); @@ -159,7 +154,7 @@ private CrtRequestContext crtRequestContext() { SdkHttpFullRequest request = createRequest(URI.create("http://localhost")); return CrtRequestContext.builder() .readBufferSize(2000) - .crtConnPool(connectionManager) + .crtConnPool(streamManager) .request(HttpExecuteRequest.builder() .request(request) .contentStreamProvider(SdkBytes.fromUtf8String("test").asContentStreamProvider()) diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java index 0628efaf15b2..e8865f5e928b 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java @@ -16,58 +16,51 @@ package software.amazon.awssdk.http.crt.internal; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import software.amazon.awssdk.core.http.HttpResponseHandler; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler; import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.http.HttpHeaderBlock; -import software.amazon.awssdk.crt.http.HttpStreamResponseHandler; -import software.amazon.awssdk.http.SdkHttpFullResponse; +import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter; -import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler; import software.amazon.awssdk.utils.async.SimplePublisher; public class CrtResponseHandlerTest extends BaseHttpStreamResponseHandlerTest { @Override - HttpStreamResponseHandler responseHandler() { + HttpStreamBaseResponseHandler responseHandler() { AsyncResponseHandler responseHandler = new AsyncResponseHandler<>((response, executionAttributes) -> null, Function.identity(), new ExecutionAttributes()); responseHandler.prepare(); - return CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, responseHandler); + return CrtResponseAdapter.toCrtResponseHandler(requestFuture, responseHandler); } @Override - HttpStreamResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher) { + HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher) { AsyncResponseHandler responseHandler = new AsyncResponseHandler<>((response, executionAttributes) -> null, Function.identity(), new ExecutionAttributes()); responseHandler.prepare(); - return new CrtResponseAdapter(crtConn, requestFuture, responseHandler, simplePublisher); + return new CrtResponseAdapter(requestFuture, responseHandler, simplePublisher); } @Test - void publisherFailedToDeliverEvents_shouldShutDownConnection() { + void onResponseComplete_publisherCancelled_closesStream() { SdkAsyncHttpResponseHandler responseHandler = new TestAsyncHttpResponseHandler(); - HttpStreamResponseHandler crtResponseHandler = CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, responseHandler); + HttpStreamBaseResponseHandler crtResponseHandler = CrtResponseAdapter.toCrtResponseHandler(requestFuture, responseHandler); HttpHeader[] httpHeaders = getHttpHeaders(); crtResponseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(), httpHeaders); @@ -77,8 +70,6 @@ void publisherFailedToDeliverEvents_shouldShutDownConnection() { crtResponseHandler.onResponseComplete(httpStream, 0); assertThatThrownBy(() -> requestFuture.join()).isInstanceOf(CancellationException.class).hasMessageContaining( "subscription has been cancelled"); - verify(crtConn).shutdown(); - verify(crtConn).close(); verify(httpStream).close(); } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java index 45c7fdccbfe9..62906295ba5c 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java @@ -16,58 +16,36 @@ package software.amazon.awssdk.http.crt.internal; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import io.reactivex.Completable; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; -import net.bytebuddy.utility.RandomString; import org.apache.commons.lang3.RandomStringUtils; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; -import software.amazon.awssdk.crt.http.HttpClientConnection; import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.http.HttpHeaderBlock; -import software.amazon.awssdk.crt.http.HttpStream; -import software.amazon.awssdk.crt.http.HttpStreamResponseHandler; +import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.http.AbortableInputStream; import software.amazon.awssdk.http.SdkHttpFullResponse; -import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; -import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter; import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler; -import software.amazon.awssdk.utils.CompletableFutureUtils; import software.amazon.awssdk.utils.async.SimplePublisher; public class InputStreamAdaptingHttpStreamResponseHandlerTest extends BaseHttpStreamResponseHandlerTest { @Override - HttpStreamResponseHandler responseHandler() { - return new InputStreamAdaptingHttpStreamResponseHandler(crtConn, requestFuture); + HttpStreamBaseResponseHandler responseHandler() { + return new InputStreamAdaptingHttpStreamResponseHandler(requestFuture); } @Override - HttpStreamResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher) { - return new InputStreamAdaptingHttpStreamResponseHandler(crtConn, requestFuture, simplePublisher); + HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher simplePublisher) { + return new InputStreamAdaptingHttpStreamResponseHandler(requestFuture, simplePublisher); } @Test - void abortStream_shouldShutDownConnection() throws IOException { + void abortStream_shouldCloseStream() throws IOException { HttpHeader[] httpHeaders = getHttpHeaders(); responseHandler.onResponseHeaders(httpStream, 500, HttpHeaderBlock.MAIN.getValue(), @@ -83,13 +61,11 @@ void abortStream_shouldShutDownConnection() throws IOException { abortableInputStream.read(); abortableInputStream.abort(); - verify(crtConn).shutdown(); - verify(crtConn).close(); verify(httpStream).close(); } @Test - void closeStream_shouldShutdownConnection() throws IOException { + void closeStream_shouldCloseStream() throws IOException { HttpHeader[] httpHeaders = getHttpHeaders(); responseHandler.onResponseHeaders(httpStream, 500, HttpHeaderBlock.MAIN.getValue(), @@ -103,23 +79,19 @@ void closeStream_shouldShutdownConnection() throws IOException { AbortableInputStream abortableInputStream = response.content().get(); abortableInputStream.read(); - abortableInputStream.abort(); + abortableInputStream.close(); - verify(crtConn).shutdown(); - verify(crtConn).close(); verify(httpStream).close(); } @Test - void cancelFuture_shouldCloseConnection() { + void cancelFuture_shouldCloseStream() { HttpHeader[] httpHeaders = getHttpHeaders(); responseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(), httpHeaders); requestFuture.completeExceptionally(new RuntimeException()); - verify(crtConn).shutdown(); - verify(crtConn).close(); verify(httpStream).close(); } } diff --git a/pom.xml b/pom.xml index d615bd3884a4..b73a2176756b 100644 --- a/pom.xml +++ b/pom.xml @@ -133,7 +133,7 @@ 3.1.5 1.17.1 1.37 - 0.40.3 + 0.43.1 5.10.3 diff --git a/services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/AbstractTestCase.java b/services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/AbstractTestCase.java index 1a8ed72777bd..8bbb531bac07 100644 --- a/services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/AbstractTestCase.java +++ b/services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/AbstractTestCase.java @@ -26,6 +26,7 @@ import software.amazon.awssdk.awscore.util.AwsHostNameUtils; import software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils; import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.retries.DefaultRetryStrategy; import software.amazon.awssdk.testutils.service.AwsTestBase; public class AbstractTestCase extends AwsTestBase { diff --git a/services/transcribestreaming/pom.xml b/services/transcribestreaming/pom.xml index 1a943e486b39..aab7e5617804 100644 --- a/services/transcribestreaming/pom.xml +++ b/services/transcribestreaming/pom.xml @@ -69,6 +69,12 @@ protocol-core ${awsjavasdk.version} + + software.amazon.awssdk + aws-crt-client + ${awsjavasdk.version} + test + software.amazon.awssdk http-auth-aws diff --git a/services/transcribestreaming/src/it/java/software/amazon/awssdk/services/transcribestreaming/TranscribeStreamingIntegrationTest.java b/services/transcribestreaming/src/it/java/software/amazon/awssdk/services/transcribestreaming/TranscribeStreamingIntegrationTest.java index f2806b355470..62a011029c51 100644 --- a/services/transcribestreaming/src/it/java/software/amazon/awssdk/services/transcribestreaming/TranscribeStreamingIntegrationTest.java +++ b/services/transcribestreaming/src/it/java/software/amazon/awssdk/services/transcribestreaming/TranscribeStreamingIntegrationTest.java @@ -19,7 +19,6 @@ import static org.mockito.Mockito.verify; import static software.amazon.awssdk.http.Header.CONTENT_TYPE; -import io.netty.handler.ssl.SslProvider; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -28,8 +27,10 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; -import org.junit.jupiter.api.condition.EnabledIf; +import io.netty.handler.ssl.SslProvider; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.reactivestreams.Publisher; @@ -44,6 +45,8 @@ import software.amazon.awssdk.http.HttpMetric; import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.ProtocolNegotiation; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils; import software.amazon.awssdk.metrics.MetricCollection; @@ -67,65 +70,90 @@ public class TranscribeStreamingIntegrationTest { private static final Logger log = Logger.loggerFor(TranscribeStreamingIntegrationTest.class); private TranscribeStreamingAsyncClient client; private MetricPublisher mockPublisher; - - private static Stream protocolNegotiations() { - return Stream.of(ProtocolNegotiation.ASSUME_PROTOCOL, ProtocolNegotiation.ALPN); - } - - private static boolean alpnSupported(){ - return NettyUtils.isAlpnSupported(SslProvider.JDK); - } - - @ParameterizedTest - @MethodSource("protocolNegotiations") - @EnabledIf("alpnSupported") - public void testFileWith16kRate(ProtocolNegotiation protocolNegotiation) throws Exception { - initClient(protocolNegotiation); - - CompletableFuture result = client.startStreamTranscription( - getRequest(16_000), - new AudioStreamPublisher(getInputStream("silence_16kHz_s16le.wav")), - TestResponseHandlers.responseHandlerBuilder_Classic()); - - result.join(); - verifyMetrics(); + private SdkAsyncHttpClient httpClient; + + private static Stream httpClients() { + Stream.Builder builder = Stream.builder(); + + // Netty with prior knowledge (ASSUME_PROTOCOL) + builder.add(Arguments.of("Netty-PriorKnowledge", + NettyNioAsyncHttpClient.builder() + .protocol(Protocol.HTTP2) + .protocolNegotiation(ProtocolNegotiation.ASSUME_PROTOCOL) + .build())); + + // Netty with ALPN (only if supported) + if (NettyUtils.isAlpnSupported(SslProvider.JDK)) { + builder.add(Arguments.of("Netty-ALPN", + NettyNioAsyncHttpClient.builder() + .protocol(Protocol.HTTP2) + .protocolNegotiation(ProtocolNegotiation.ALPN) + .build())); + } + + // CRT HTTP/2 + builder.add(Arguments.of("CRT", AwsCrtAsyncHttpClient.builder().build())); + + return builder.build(); } - @ParameterizedTest - @MethodSource("protocolNegotiations") - @EnabledIf("alpnSupported") - public void testFileWith8kRate(ProtocolNegotiation protocolNegotiation) throws Exception { - initClient(protocolNegotiation); - - CompletableFuture result = client.startStreamTranscription( - getRequest(8_000), - new AudioStreamPublisher(getInputStream("silence_8kHz_s16le.wav")), - TestResponseHandlers.responseHandlerBuilder_Consumer()); - - result.get(); + @ParameterizedTest(name = "{0}") + @MethodSource("httpClients") + public void testFileWith16kRate(String clientName, SdkAsyncHttpClient httpClient) throws Exception { + initClient(httpClient); + try { + CompletableFuture result = client.startStreamTranscription( + getRequest(16_000), + new AudioStreamPublisher(getInputStream("silence_16kHz_s16le.wav")), + TestResponseHandlers.responseHandlerBuilder_Classic()); + + result.join(); + verifyMetrics(); + } finally { + cleanup(); + } } - private void initClient(ProtocolNegotiation protocolNegotiation) { - if (client != null) { - client.close(); - } - if (mockPublisher != null) { - mockPublisher.close(); + @ParameterizedTest(name = "{0}") + @MethodSource("httpClients") + public void testFileWith8kRate() throws Exception { + initClient(httpClient); + try { + CompletableFuture result = client.startStreamTranscription( + getRequest(8_000), + new AudioStreamPublisher(getInputStream("silence_8kHz_s16le.wav")), + TestResponseHandlers.responseHandlerBuilder_Consumer()); + + result.get(); + verifyMetrics(); + } finally { + cleanup(); } + } + private void initClient(SdkAsyncHttpClient httpClient) { + this.httpClient = httpClient; mockPublisher = mock(MetricPublisher.class); client = TranscribeStreamingAsyncClient.builder() .region(Region.US_EAST_1) .overrideConfiguration(b -> b.addExecutionInterceptor(new VerifyHeaderInterceptor()) .addMetricPublisher(mockPublisher)) .credentialsProvider(getCredentials()) - .httpClient(NettyNioAsyncHttpClient.builder() - .protocol(Protocol.HTTP2) - .protocolNegotiation(protocolNegotiation) - .build()) + .httpClient(httpClient) .build(); } + private void cleanup() { + if (client != null) { + client.close(); + client = null; + } + if (httpClient != null) { + httpClient.close(); + httpClient = null; + } + } + private static AwsCredentialsProvider getCredentials() { return DefaultCredentialsProvider.create(); } diff --git a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/HttpTestUtils.java b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/HttpTestUtils.java index 0d6d9ea50bc2..24f6c6fbe3ce 100644 --- a/test/http-client-tests/src/main/java/software/amazon/awssdk/http/HttpTestUtils.java +++ b/test/http-client-tests/src/main/java/software/amazon/awssdk/http/HttpTestUtils.java @@ -70,19 +70,24 @@ public static KeyStore getSelfSignedKeyStore() throws Exception { } public static CompletableFuture sendGetRequest(int serverPort, SdkAsyncHttpClient client) { - return sendRequest(serverPort, client, SdkHttpMethod.GET); + return sendRequest(serverPort, client, SdkHttpMethod.GET, true); + } + + public static CompletableFuture sendGetRequest(int serverPort, SdkAsyncHttpClient client, boolean https) { + return sendRequest(serverPort, client, SdkHttpMethod.GET, https); } public static CompletableFuture sendHeadRequest(int serverPort, SdkAsyncHttpClient client) { - return sendRequest(serverPort, client, SdkHttpMethod.HEAD); + return sendRequest(serverPort, client, SdkHttpMethod.HEAD, true); } private static CompletableFuture sendRequest(int serverPort, SdkAsyncHttpClient client, - SdkHttpMethod httpMethod) { + SdkHttpMethod httpMethod, + boolean https) { SdkHttpFullRequest request = SdkHttpFullRequest.builder() .method(httpMethod) - .protocol("https") + .protocol(https ? "https" : "http") .host("127.0.0.1") .port(serverPort) .build();