Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions http-clients/aws-crt-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,31 @@
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
Expand Down Expand Up @@ -91,15 +92,15 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
* we have a pool and no one can destroy it underneath us until we've finished submitting the
* request)
*/
try (HttpClientConnectionManager crtConnPool = getOrCreateConnectionPool(poolKey(asyncRequest.request()))) {
CrtAsyncRequestContext context = CrtAsyncRequestContext.builder()
.crtConnPool(crtConnPool)
.readBufferSize(this.readBufferSize)
.request(asyncRequest)
.build();

return new CrtAsyncRequestExecutor().execute(context);
}
HttpStreamManager crtConnPool = getOrCreateConnectionPool(poolKey(asyncRequest.request()));
CrtAsyncRequestContext context = CrtAsyncRequestContext.builder()
.crtConnPool(crtConnPool)
.readBufferSize(this.readBufferSize)
.request(asyncRequest)
.protocol(this.protocol)
.build();

return new CrtAsyncRequestExecutor().execute(context);
}

/**
Expand Down Expand Up @@ -222,6 +223,14 @@ AwsCrtAsyncHttpClient.Builder connectionHealthConfiguration(Consumer<ConnectionH
AwsCrtAsyncHttpClient.Builder tcpKeepAliveConfiguration(Consumer<TcpKeepAliveConfiguration.Builder>
tcpKeepAliveConfigurationBuilder);

/**
* Configure the HTTP protocol version to use for connections.
*
* @param protocol the HTTP protocol version
* @return The builder for method chaining.
*/
AwsCrtAsyncHttpClient.Builder protocol(Protocol protocol);

/**
* Configure whether to enable a hybrid post-quantum key exchange option for the Transport Layer Security (TLS) network
* encryption protocol when communicating with services that support Post Quantum TLS. If Post Quantum cipher suites are
Expand All @@ -246,6 +255,13 @@ AwsCrtAsyncHttpClient.Builder tcpKeepAliveConfiguration(Consumer<TcpKeepAliveCon
private static final class DefaultAsyncBuilder
extends AwsCrtClientBuilderBase<AwsCrtAsyncHttpClient.Builder> implements Builder {


@Override
public Builder protocol(Protocol protocol) {
getAttributeMap().put(SdkHttpConfigurationOption.PROTOCOL, protocol);
return this;
}

@Override
public SdkAsyncHttpClient build() {
return new AwsCrtAsyncHttpClient(this, getAttributeMap().build()
Expand All @@ -258,5 +274,6 @@ public SdkAsyncHttpClient buildWithDefaults(AttributeMap serviceDefaults) {
.merge(serviceDefaults)
.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS));
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.HttpExecuteResponse;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.SdkHttpFullResponse;
Expand Down Expand Up @@ -56,6 +57,9 @@ public final class AwsCrtHttpClient extends AwsCrtHttpClientBase implements SdkH

private AwsCrtHttpClient(DefaultBuilder builder, AttributeMap config) {
super(builder, config);
if (this.protocol == Protocol.HTTP2) {
throw new UnsupportedOperationException("HTTP/2 is not supported in sync client. Use AwsCrtAsyncHttpClient instead.");
}
}

public static AwsCrtHttpClient.Builder builder() {
Expand Down Expand Up @@ -91,14 +95,13 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
* we have a pool and no one can destroy it underneath us until we've finished submitting the
* request)
*/
try (HttpClientConnectionManager crtConnPool = getOrCreateConnectionPool(poolKey(request.httpRequest()))) {
CrtRequestContext context = CrtRequestContext.builder()
.crtConnPool(crtConnPool)
.readBufferSize(this.readBufferSize)
.request(request)
.build();
return new CrtHttpRequest(context);
}
HttpStreamManager crtConnPool = getOrCreateConnectionPool(poolKey(request.httpRequest()));
CrtRequestContext context = CrtRequestContext.builder()
.crtConnPool(crtConnPool)
.readBufferSize(this.readBufferSize)
.request(request)
.build();
return new CrtHttpRequest(context);
}

private static final class CrtHttpRequest implements ExecutableHttpRequest {
Expand Down Expand Up @@ -140,7 +143,7 @@ public HttpExecuteResponse call() throws IOException {
@Override
public void abort() {
if (responseFuture != null) {
responseFuture.completeExceptionally(new IOException("Request ws cancelled"));
responseFuture.completeExceptionally(new IOException("Request was cancelled"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@
import java.util.concurrent.ConcurrentHashMap;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.Http2StreamManagerOptions;
import software.amazon.awssdk.crt.http.HttpClientConnectionManagerOptions;
import software.amazon.awssdk.crt.http.HttpMonitoringOptions;
import software.amazon.awssdk.crt.http.HttpProxyOptions;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.crt.http.HttpStreamManagerOptions;
import software.amazon.awssdk.crt.http.HttpVersion;
import software.amazon.awssdk.crt.io.ClientBootstrap;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.io.TlsContext;
Expand All @@ -57,7 +60,8 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {
private static final long DEFAULT_STREAM_WINDOW_SIZE = 16L * 1024L * 1024L; // 16 MB

protected final long readBufferSize;
private final Map<URI, HttpClientConnectionManager> connectionPools = new ConcurrentHashMap<>();
protected final Protocol protocol;
private final Map<URI, HttpStreamManager> connectionPools = new ConcurrentHashMap<>();
private final LinkedList<CrtResource> ownedSubResources = new LinkedList<>();
private final ClientBootstrap bootstrap;
private final SocketOptions socketOptions;
Expand All @@ -67,34 +71,35 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {
private final long maxConnectionIdleInMilliseconds;
private final int maxConnectionsPerEndpoint;
private final long connectionAcquisitionTimeout;
private final TlsContextOptions tlsContextOptions;
private boolean isClosed = false;

AwsCrtHttpClientBase(AwsCrtClientBuilderBase builder, AttributeMap config) {
if (config.get(PROTOCOL) == Protocol.HTTP2) {
throw new UnsupportedOperationException("HTTP/2 is not supported in AwsCrtHttpClient yet. Use "
+ "NettyNioAsyncHttpClient instead.");
ClientBootstrap clientBootstrap = new ClientBootstrap(null, null);
SocketOptions clientSocketOptions = buildSocketOptions(builder.getTcpKeepAliveConfiguration(),
config.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT));
TlsContextOptions clientTlsContextOptions =
TlsContextOptions.createDefaultClient()
.withCipherPreference(resolveCipherPreference(builder.getPostQuantumTlsEnabled()))
.withVerifyPeer(!config.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES));
Comment on lines +78 to +84

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we remove the try with resource block? Maybe I am not following the logic, but where do we clean up these resource?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So previously, we have this subresource.addRef() process to increase reference every time we use it and then try with resource to close it. I just removed both to keep the logic simple.

this.protocol = config.get(PROTOCOL);
if (protocol == Protocol.HTTP2) {
clientTlsContextOptions = clientTlsContextOptions.withAlpnList("h2");
}

try (ClientBootstrap clientBootstrap = new ClientBootstrap(null, null);
SocketOptions clientSocketOptions = buildSocketOptions(builder.getTcpKeepAliveConfiguration(),
config.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT));
TlsContextOptions clientTlsContextOptions =
TlsContextOptions.createDefaultClient()
.withCipherPreference(resolveCipherPreference(builder.getPostQuantumTlsEnabled()))
.withVerifyPeer(!config.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES));
TlsContext clientTlsContext = new TlsContext(clientTlsContextOptions)) {

this.bootstrap = registerOwnedResource(clientBootstrap);
this.socketOptions = registerOwnedResource(clientSocketOptions);
this.tlsContext = registerOwnedResource(clientTlsContext);
this.readBufferSize = builder.getReadBufferSizeInBytes() == null ?
DEFAULT_STREAM_WINDOW_SIZE : builder.getReadBufferSizeInBytes();
this.maxConnectionsPerEndpoint = config.get(SdkHttpConfigurationOption.MAX_CONNECTIONS);
this.monitoringOptions = resolveHttpMonitoringOptions(builder.getConnectionHealthConfiguration()).orElse(null);
this.maxConnectionIdleInMilliseconds = config.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toMillis();
this.connectionAcquisitionTimeout = config.get(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT).toMillis();
this.proxyOptions = resolveProxy(builder.getProxyConfiguration(), tlsContext).orElse(null);
}
this.tlsContextOptions = registerOwnedResource(clientTlsContextOptions);
TlsContext clientTlsContext = new TlsContext(clientTlsContextOptions);

this.bootstrap = registerOwnedResource(clientBootstrap);
this.socketOptions = registerOwnedResource(clientSocketOptions);
this.tlsContext = registerOwnedResource(clientTlsContext);
this.readBufferSize = builder.getReadBufferSizeInBytes() == null ?
DEFAULT_STREAM_WINDOW_SIZE : builder.getReadBufferSizeInBytes();
this.maxConnectionsPerEndpoint = config.get(SdkHttpConfigurationOption.MAX_CONNECTIONS);
this.monitoringOptions = resolveHttpMonitoringOptions(builder.getConnectionHealthConfiguration()).orElse(null);
this.maxConnectionIdleInMilliseconds = config.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toMillis();
this.connectionAcquisitionTimeout = config.get(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT).toMillis();
this.proxyOptions = resolveProxy(builder.getProxyConfiguration(), tlsContext).orElse(null);
}

/**
Expand All @@ -106,7 +111,6 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {
*/
private <T extends CrtResource> T registerOwnedResource(T subresource) {
if (subresource != null) {
subresource.addRef();
ownedSubResources.push(subresource);
}
return subresource;
Expand All @@ -116,13 +120,16 @@ String clientName() {
return AWS_COMMON_RUNTIME;
}

private HttpClientConnectionManager createConnectionPool(URI uri) {
private HttpStreamManager createConnectionPool(URI uri) {
log.debug(() -> "Creating ConnectionPool for: URI:" + uri + ", MaxConns: " + maxConnectionsPerEndpoint);

HttpClientConnectionManagerOptions options = new HttpClientConnectionManagerOptions()
boolean isHttps = "https".equalsIgnoreCase(uri.getScheme());
TlsContext poolTlsContext = isHttps ? tlsContext : null;

HttpClientConnectionManagerOptions h1Options = new HttpClientConnectionManagerOptions()
.withClientBootstrap(bootstrap)
.withSocketOptions(socketOptions)
.withTlsContext(tlsContext)
.withTlsContext(poolTlsContext)
.withUri(uri)
.withWindowSize(readBufferSize)
.withMaxConnections(maxConnectionsPerEndpoint)
Expand All @@ -132,7 +139,24 @@ private HttpClientConnectionManager createConnectionPool(URI uri) {
.withMaxConnectionIdleInMilliseconds(maxConnectionIdleInMilliseconds)
.withConnectionAcquisitionTimeoutInMilliseconds(connectionAcquisitionTimeout);

return HttpClientConnectionManager.create(options);
HttpStreamManagerOptions options = new HttpStreamManagerOptions()
.withHTTP1ConnectionManagerOptions(h1Options);

if (protocol == Protocol.HTTP2) {
Http2StreamManagerOptions h2Options = new Http2StreamManagerOptions()
.withConnectionManagerOptions(h1Options);

if (!isHttps) {
h2Options.withPriorKnowledge(true);
}

options.withHTTP2StreamManagerOptions(h2Options);
options.withExpectedProtocol(HttpVersion.HTTP_2);
} else {
options.withExpectedProtocol(HttpVersion.HTTP_1_1);
}

return HttpStreamManager.create(options);
}

/*
Expand All @@ -150,14 +174,13 @@ private HttpClientConnectionManager createConnectionPool(URI uri) {
* existing pool. If we add all of execute() to the scope, we include, at minimum a JNI call to the native
* pool implementation.
*/
HttpClientConnectionManager getOrCreateConnectionPool(URI uri) {
HttpStreamManager getOrCreateConnectionPool(URI uri) {
synchronized (this) {
if (isClosed) {
throw new IllegalStateException("Client is closed. No more requests can be made with this client.");
}

HttpClientConnectionManager connPool = connectionPools.computeIfAbsent(uri, this::createConnectionPool);
connPool.addRef();
HttpStreamManager connPool = connectionPools.computeIfAbsent(uri, this::createConnectionPool);
return connPool;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,25 @@
package software.amazon.awssdk.http.crt.internal;

import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.metrics.MetricCollector;

@SdkInternalApi
public final class CrtAsyncRequestContext {
private final AsyncExecuteRequest request;
private final long readBufferSize;
private final HttpClientConnectionManager crtConnPool;
private final HttpStreamManager crtConnPool;
private final MetricCollector metricCollector;
private final Protocol protocol;

private CrtAsyncRequestContext(Builder builder) {
this.request = builder.request;
this.readBufferSize = builder.readBufferSize;
this.crtConnPool = builder.crtConnPool;
this.metricCollector = request.metricCollector().orElse(null);
this.protocol = builder.protocol;
}

public static Builder builder() {
Expand All @@ -46,7 +49,11 @@ public long readBufferSize() {
return readBufferSize;
}

public HttpClientConnectionManager crtConnPool() {
public Protocol protocol() {
return protocol;
}

public HttpStreamManager crtConnPool() {
return crtConnPool;
}

Expand All @@ -57,7 +64,8 @@ public MetricCollector metricCollector() {
public static final class Builder {
private AsyncExecuteRequest request;
private long readBufferSize;
private HttpClientConnectionManager crtConnPool;
private HttpStreamManager crtConnPool;
private Protocol protocol;

private Builder() {
}
Expand All @@ -72,13 +80,19 @@ public Builder readBufferSize(long readBufferSize) {
return this;
}

public Builder crtConnPool(HttpClientConnectionManager crtConnPool) {
public Builder crtConnPool(HttpStreamManager crtConnPool) {
this.crtConnPool = crtConnPool;
return this;
}

public Builder protocol(Protocol protocol) {
this.protocol = protocol;
return this;
}

public CrtAsyncRequestContext build() {
return new CrtAsyncRequestContext(this);
}

}
}
Loading
Loading