From 7214fbe13ab5a78e20d2748c0f257c5f2aac8ce6 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Mon, 9 Feb 2026 14:23:10 -0800 Subject: [PATCH 1/3] Add stability tests and JMH tests # Conflicts: # services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/AbstractTestCase.java --- .../awssdk/benchmark/BenchmarkRunner.java | 8 +- .../async/AwsCrtH2ClientBenchmark.java | 127 +++++++++ ...va => NettyHttpClientH2AlpnBenchmark.java} | 6 +- ...yHttpClientH2PriorKnowledgeBenchmark.java} | 6 +- .../kinesis/KinesisBaseStabilityTest.java | 249 +++++++++++++++++ .../kinesis/KinesisCrtH2StabilityTest.java | 54 ++++ .../tests/kinesis/KinesisStabilityTest.java | 250 ++---------------- .../TranscribeStreamingBaseStabilityTest.java | 84 ++++++ .../TranscribeStreamingCrtStabilityTest.java | 57 ++++ .../TranscribeStreamingStabilityTest.java | 80 +----- 10 files changed, 608 insertions(+), 313 deletions(-) create mode 100644 test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/AwsCrtH2ClientBenchmark.java rename test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/{NettyHttpClientAlpnBenchmark.java => NettyHttpClientH2AlpnBenchmark.java} (92%) rename test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/{NettyHttpClientH2Benchmark.java => NettyHttpClientH2PriorKnowledgeBenchmark.java} (91%) create mode 100644 test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/kinesis/KinesisBaseStabilityTest.java create mode 100644 test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/kinesis/KinesisCrtH2StabilityTest.java create mode 100644 test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/transcribestreaming/TranscribeStreamingBaseStabilityTest.java create mode 100644 test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/transcribestreaming/TranscribeStreamingCrtStabilityTest.java diff --git a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/BenchmarkRunner.java b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/BenchmarkRunner.java index 31fb84f78c22..6cef787fef3d 100644 --- a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/BenchmarkRunner.java +++ b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/BenchmarkRunner.java @@ -39,8 +39,9 @@ import org.openjdk.jmh.runner.options.OptionsBuilder; import software.amazon.awssdk.benchmark.apicall.MetricsEnabledBenchmark; import software.amazon.awssdk.benchmark.apicall.httpclient.async.AwsCrtClientBenchmark; +import software.amazon.awssdk.benchmark.apicall.httpclient.async.AwsCrtH2ClientBenchmark; import software.amazon.awssdk.benchmark.apicall.httpclient.async.NettyHttpClientH1Benchmark; -import software.amazon.awssdk.benchmark.apicall.httpclient.async.NettyHttpClientH2Benchmark; +import software.amazon.awssdk.benchmark.apicall.httpclient.async.NettyHttpClientH2PriorKnowledgeBenchmark; import software.amazon.awssdk.benchmark.apicall.httpclient.sync.ApacheHttpClientBenchmark; import software.amazon.awssdk.benchmark.apicall.httpclient.sync.CrtHttpClientBenchmark; import software.amazon.awssdk.benchmark.apicall.httpclient.sync.UrlConnectionHttpClientBenchmark; @@ -73,9 +74,10 @@ public class BenchmarkRunner { QueryProtocolBenchmark.class.getSimpleName(), XmlProtocolBenchmark.class.getSimpleName()); private static final List ASYNC_BENCHMARKS = Arrays.asList( - NettyHttpClientH2Benchmark.class.getSimpleName(), + NettyHttpClientH2PriorKnowledgeBenchmark.class.getSimpleName(), NettyHttpClientH1Benchmark.class.getSimpleName(), - AwsCrtClientBenchmark.class.getSimpleName()); + AwsCrtClientBenchmark.class.getSimpleName(), + AwsCrtH2ClientBenchmark.class.getSimpleName()); private static final List SYNC_BENCHMARKS = Arrays.asList( ApacheHttpClientBenchmark.class.getSimpleName(), diff --git a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/AwsCrtH2ClientBenchmark.java b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/AwsCrtH2ClientBenchmark.java new file mode 100644 index 000000000000..88ed9906fab6 --- /dev/null +++ b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/AwsCrtH2ClientBenchmark.java @@ -0,0 +1,127 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.benchmark.apicall.httpclient.async; + +import static software.amazon.awssdk.benchmark.utils.BenchmarkConstant.CONCURRENT_CALLS; +import static software.amazon.awssdk.benchmark.utils.BenchmarkUtils.awaitCountdownLatchUninterruptibly; +import static software.amazon.awssdk.benchmark.utils.BenchmarkUtils.countDownUponCompletion; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.profile.StackProfiler; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.benchmark.apicall.httpclient.SdkHttpClientBenchmark; +import software.amazon.awssdk.benchmark.utils.MockH2Server; +import software.amazon.awssdk.crt.Log; +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.http.SdkHttpConfigurationOption; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonAsyncClient; +import software.amazon.awssdk.utils.AttributeMap; + +/** + * Benchmark for CRT HTTP client with HTTP/2 over TLS. + */ +@State(Scope.Benchmark) +@Warmup(iterations = 3, time = 15, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS) +@Fork(2) +@BenchmarkMode(Mode.Throughput) +public class AwsCrtH2ClientBenchmark implements SdkHttpClientBenchmark { + + private MockH2Server mockServer; + private SdkAsyncHttpClient sdkHttpClient; + private ProtocolRestJsonAsyncClient client; + + static { + //System.setProperty("aws.crt.debugnative", "true"); + Log.initLoggingToStdout(Log.LogLevel.Error); + } + + @Setup(Level.Trial) + public void setup() throws Exception { + mockServer = new MockH2Server( true); + mockServer.start(); + + sdkHttpClient = AwsCrtAsyncHttpClient.builder() + .buildWithDefaults(AttributeMap.builder() + .put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true) + .put(SdkHttpConfigurationOption.PROTOCOL, Protocol.HTTP2) + .build()); + + client = ProtocolRestJsonAsyncClient.builder() + .credentialsProvider(() -> AwsBasicCredentials.create("foo", "bar")) + .endpointOverride(mockServer.getHttpsUri()) + .httpClient(sdkHttpClient) + .region(Region.US_EAST_1) + .build(); + + client.allTypes().join(); + } + + @TearDown(Level.Trial) + public void tearDown() throws Exception { + mockServer.stop(); + client.close(); + sdkHttpClient.close(); + } + + @Override + @Benchmark + @OperationsPerInvocation(CONCURRENT_CALLS) + public void concurrentApiCall(Blackhole blackhole) { + CountDownLatch countDownLatch = new CountDownLatch(CONCURRENT_CALLS); + for (int i = 0; i < CONCURRENT_CALLS; i++) { + countDownUponCompletion(blackhole, client.allTypes(), countDownLatch); + } + awaitCountdownLatchUninterruptibly(countDownLatch, 10, TimeUnit.SECONDS); + } + + @Override + @Benchmark + public void sequentialApiCall(Blackhole blackhole) { + CountDownLatch countDownLatch = new CountDownLatch(1); + countDownUponCompletion(blackhole, client.allTypes(), countDownLatch); + awaitCountdownLatchUninterruptibly(countDownLatch, 1, TimeUnit.SECONDS); + } + + public static void main(String... args) throws Exception { + Options opt = new OptionsBuilder() + .include(AwsCrtH2ClientBenchmark.class.getSimpleName()) + .addProfiler(StackProfiler.class) + .build(); + new Runner(opt).run(); + } +} diff --git a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/NettyHttpClientAlpnBenchmark.java b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/NettyHttpClientH2AlpnBenchmark.java similarity index 92% rename from test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/NettyHttpClientAlpnBenchmark.java rename to test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/NettyHttpClientH2AlpnBenchmark.java index 0972b9cd3db6..1cf4a7d92756 100644 --- a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/NettyHttpClientAlpnBenchmark.java +++ b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/NettyHttpClientH2AlpnBenchmark.java @@ -38,6 +38,7 @@ import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.benchmark.utils.MockH2Server; import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.ProtocolNegotiation; @@ -54,7 +55,7 @@ @Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS) @Fork(2) // To reduce difference between each run @BenchmarkMode(Mode.Throughput) -public class NettyHttpClientAlpnBenchmark extends BaseNettyBenchmark { +public class NettyHttpClientH2AlpnBenchmark extends BaseNettyBenchmark { private MockH2Server mockServer; private SdkAsyncHttpClient sdkHttpClient; @@ -76,6 +77,7 @@ public void setup() throws Exception { .protocolNegotiation(ProtocolNegotiation.ALPN) .buildWithDefaults(trustAllTlsAttributeMapBuilder().build()); client = ProtocolRestJsonAsyncClient.builder() + .credentialsProvider(() -> AwsBasicCredentials.create("foo", "bar")) .endpointOverride(mockServer.getHttpsUri()) .httpClient(sdkHttpClient) .region(Region.US_EAST_1) @@ -94,7 +96,7 @@ public void tearDown() throws Exception { public static void main(String... args) throws Exception { Options opt = new OptionsBuilder() - .include(NettyHttpClientH2Benchmark.class.getSimpleName()) + .include(NettyHttpClientH2PriorKnowledgeBenchmark.class.getSimpleName()) .build(); Collection run = new Runner(opt).run(); } diff --git a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/NettyHttpClientH2Benchmark.java b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/NettyHttpClientH2PriorKnowledgeBenchmark.java similarity index 91% rename from test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/NettyHttpClientH2Benchmark.java rename to test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/NettyHttpClientH2PriorKnowledgeBenchmark.java index ffbd537da1aa..0bd167c8dc4d 100644 --- a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/NettyHttpClientH2Benchmark.java +++ b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/NettyHttpClientH2PriorKnowledgeBenchmark.java @@ -38,6 +38,7 @@ import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.benchmark.utils.MockH2Server; import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; @@ -53,7 +54,7 @@ @Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS) @Fork(2) // To reduce difference between each run @BenchmarkMode(Mode.Throughput) -public class NettyHttpClientH2Benchmark extends BaseNettyBenchmark { +public class NettyHttpClientH2PriorKnowledgeBenchmark extends BaseNettyBenchmark { private MockH2Server mockServer; private SdkAsyncHttpClient sdkHttpClient; @@ -74,6 +75,7 @@ public void setup() throws Exception { .protocol(Protocol.HTTP2) .buildWithDefaults(trustAllTlsAttributeMapBuilder().build()); client = ProtocolRestJsonAsyncClient.builder() + .credentialsProvider(() -> AwsBasicCredentials.create("foo", "bar")) .endpointOverride(mockServer.getHttpsUri()) .httpClient(sdkHttpClient) .region(Region.US_EAST_1) @@ -92,7 +94,7 @@ public void tearDown() throws Exception { public static void main(String... args) throws Exception { Options opt = new OptionsBuilder() - .include(NettyHttpClientH2Benchmark.class.getSimpleName()) + .include(NettyHttpClientH2PriorKnowledgeBenchmark.class.getSimpleName()) .build(); Collection run = new Runner(opt).run(); } diff --git a/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/kinesis/KinesisBaseStabilityTest.java b/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/kinesis/KinesisBaseStabilityTest.java new file mode 100644 index 000000000000..497dc05747ba --- /dev/null +++ b/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/kinesis/KinesisBaseStabilityTest.java @@ -0,0 +1,249 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.stability.tests.kinesis; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.IntFunction; +import java.util.stream.Collectors; +import org.apache.commons.lang3.RandomUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ConsumerStatus; +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; +import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.awssdk.services.kinesis.model.StreamStatus; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; +import software.amazon.awssdk.stability.tests.utils.StabilityTestRunner; +import software.amazon.awssdk.stability.tests.utils.TestEventStreamingResponseHandler; +import software.amazon.awssdk.testutils.Waiter; +import software.amazon.awssdk.testutils.service.AwsTestBase; +import software.amazon.awssdk.utils.Logger; + +/** + * Base class for Kinesis stability tests. + */ +public abstract class KinesisBaseStabilityTest extends AwsTestBase { + private static final Logger log = Logger.loggerFor(KinesisBaseStabilityTest.class); + protected static final int CONSUMER_COUNT = 4; + protected static final int SHARD_COUNT = 9; + protected static final int CONCURRENCY = CONSUMER_COUNT * SHARD_COUNT; + protected static final int MAX_CONCURRENCY = CONCURRENCY + 10; + + protected List consumerArns; + protected List shardIds; + protected List producedData; + protected KinesisAsyncClient asyncClient; + protected String streamName; + protected String streamARN; + protected ExecutorService waiterExecutorService; + protected ScheduledExecutorService producer; + + protected abstract KinesisAsyncClient createClient(); + protected abstract String getTestNamePrefix(); + protected abstract String getConsumerPrefix(); + + @BeforeEach + public void setup() { + streamName = getTestNamePrefix().toLowerCase() + System.currentTimeMillis(); + consumerArns = new ArrayList<>(CONSUMER_COUNT); + shardIds = new ArrayList<>(SHARD_COUNT); + producedData = new ArrayList<>(); + + asyncClient = createClient(); + + asyncClient.createStream(r -> r.streamName(streamName).shardCount(SHARD_COUNT)).join(); + waitForStreamToBeActive(); + + streamARN = asyncClient.describeStream(r -> r.streamName(streamName)).join() + .streamDescription() + .streamARN(); + + shardIds = asyncClient.listShards(r -> r.streamName(streamName)) + .join() + .shards().stream().map(Shard::shardId).collect(Collectors.toList()); + + waiterExecutorService = Executors.newFixedThreadPool(CONSUMER_COUNT); + producer = Executors.newScheduledThreadPool(1); + registerStreamConsumers(); + waitForConsumersToBeActive(); + } + + @AfterEach + public void tearDown() { + asyncClient.deleteStream(b -> b.streamName(streamName).enforceConsumerDeletion(true)).join(); + waiterExecutorService.shutdown(); + producer.shutdown(); + asyncClient.close(); + } + + protected void runPutRecordsAndSubscribeToShard() throws InterruptedException { + putRecords(); + subscribeToShard(); + } + + private void subscribeToShard() throws InterruptedException { + log.info(() -> "Starting subscribeToShard to stream: " + streamName); + List> completableFutures = generateSubscribeToShardFutures(); + StabilityTestRunner.newRunner() + .testName(getTestNamePrefix() + ".subscribeToShard") + .futures(completableFutures) + .run(); + } + + private void registerStreamConsumers() { + log.info(() -> "Registering stream consumers for " + streamARN); + IntFunction> futureFunction = i -> + asyncClient.registerStreamConsumer(r -> r.streamARN(streamARN).consumerName(getConsumerPrefix() + i)) + .thenApply(b -> consumerArns.add(b.consumer().consumerARN())); + + StabilityTestRunner.newRunner() + .requestCountPerRun(CONSUMER_COUNT) + .totalRuns(1) + .testName(getTestNamePrefix() + ".registerStreamConsumers") + .futureFactory(futureFunction) + .run(); + } + + private void putRecords() { + log.info(() -> "Starting putRecord"); + producedData = new ArrayList<>(); + SdkBytes data = SdkBytes.fromByteArray(RandomUtils.nextBytes(20)); + IntFunction> futureFunction = i -> + asyncClient.putRecord(PutRecordRequest.builder() + .streamName(streamName) + .data(data) + .partitionKey(UUID.randomUUID().toString()) + .build()) + .thenApply(b -> producedData.add(data)); + + StabilityTestRunner.newRunner() + .requestCountPerRun(CONCURRENCY) + .testName(getTestNamePrefix() + ".putRecords") + .futureFactory(futureFunction) + .run(); + } + + private List> generateSubscribeToShardFutures() throws InterruptedException { + List> completableFutures = new ArrayList<>(); + int baseDelay = 150; + int jitterRange = 150; + for (int i = 0; i < CONSUMER_COUNT; i++) { + final int consumerIndex = i; + for (int j = 0; j < SHARD_COUNT; j++) { + final int shardIndex = j; + Thread.sleep(baseDelay + (int) (Math.random() * jitterRange)); + TestSubscribeToShardResponseHandler responseHandler = + new TestSubscribeToShardResponseHandler(consumerIndex, shardIndex); + CompletableFuture completableFuture = + asyncClient.subscribeToShard(b -> b.shardId(shardIds.get(shardIndex)) + .consumerARN(consumerArns.get(consumerIndex)) + .startingPosition(s -> s.type(ShardIteratorType.TRIM_HORIZON)), + responseHandler) + .thenAccept(b -> { + if (responseHandler.allEventsReceived && !responseHandler.receivedData.isEmpty()) { + assertThat(producedData).as(responseHandler.id + " has not received all events") + .containsSequence(responseHandler.receivedData); + } + }); + completableFutures.add(completableFuture); + } + } + return completableFutures; + } + + private void waitForStreamToBeActive() { + Waiter.run(() -> asyncClient.describeStream(r -> r.streamName(streamName)).join()) + .until(b -> b.streamDescription().streamStatus().equals(StreamStatus.ACTIVE)) + .orFailAfter(Duration.ofMinutes(5)); + + Waiter.run(() -> { + try { + asyncClient.listShards(r -> r.streamName(streamName)).join(); + return true; + } catch (Exception e) { + if (e.getCause() instanceof ResourceInUseException) { + return false; + } + throw e; + } + }) + .until(Boolean::booleanValue) + .orFailAfter(Duration.ofMinutes(1)); + } + + private void waitForConsumersToBeActive() { + CompletableFuture[] completableFutures = + consumerArns.stream() + .map(a -> CompletableFuture.supplyAsync( + () -> Waiter.run(() -> asyncClient.describeStreamConsumer(b -> b.consumerARN(a)).join()) + .until(b -> b.consumerDescription().consumerStatus().equals(ConsumerStatus.ACTIVE)) + .orFailAfter(Duration.ofMinutes(5)), waiterExecutorService)) + .toArray(CompletableFuture[]::new); + + CompletableFuture.allOf(completableFutures).join(); + } + + private static class TestSubscribeToShardResponseHandler + extends TestEventStreamingResponseHandler + implements SubscribeToShardResponseHandler { + + private final List receivedData = new ArrayList<>(); + private final String id; + private volatile boolean allEventsReceived = false; + + TestSubscribeToShardResponseHandler(int consumerIndex, int shardIndex) { + id = "consumer_" + consumerIndex + "_shard_" + shardIndex; + } + + @Override + public void onEventStream(SdkPublisher publisher) { + publisher.filter(SubscribeToShardEvent.class) + .subscribe(b -> { + log.debug(() -> "sequenceNumber " + b.records() + "_" + id); + receivedData.addAll(b.records().stream().map(Record::data).collect(Collectors.toList())); + }); + } + + @Override + public void exceptionOccurred(Throwable throwable) { + log.error(() -> "Exception from " + id, throwable); + } + + @Override + public void complete() { + allEventsReceived = true; + log.info(() -> "All events received for " + id); + } + } +} diff --git a/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/kinesis/KinesisCrtH2StabilityTest.java b/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/kinesis/KinesisCrtH2StabilityTest.java new file mode 100644 index 000000000000..6b2ccb687556 --- /dev/null +++ b/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/kinesis/KinesisCrtH2StabilityTest.java @@ -0,0 +1,54 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.stability.tests.kinesis; + +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.http.SdkHttpConfigurationOption; +import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.stability.tests.exceptions.StabilityTestsRetryableException; +import software.amazon.awssdk.testutils.retry.RetryableTest; +import software.amazon.awssdk.utils.AttributeMap; + +/** + * Stability tests for Kinesis using CRT HTTP client with HTTP/2. + */ +public class KinesisCrtH2StabilityTest extends KinesisBaseStabilityTest { + + @Override + protected KinesisAsyncClient createClient() { + return KinesisAsyncClient.builder() + .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) + .httpClientBuilder(AwsCrtAsyncHttpClient.builder() + .maxConcurrency(MAX_CONCURRENCY)) + .build(); + } + + @Override + protected String getTestNamePrefix() { + return "KinesisCrtH2StabilityTest"; + } + + @Override + protected String getConsumerPrefix() { + return "kinesiscrth2stabilitytestconsumer_"; + } + + @RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class) + public void putRecords_subscribeToShard() throws InterruptedException { + runPutRecordsAndSubscribeToShard(); + } +} diff --git a/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/kinesis/KinesisStabilityTest.java b/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/kinesis/KinesisStabilityTest.java index d16717c6a8db..c1a38700423e 100644 --- a/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/kinesis/KinesisStabilityTest.java +++ b/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/kinesis/KinesisStabilityTest.java @@ -15,254 +15,36 @@ package software.amazon.awssdk.stability.tests.kinesis; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.function.IntFunction; -import java.util.stream.Collectors; -import org.apache.commons.lang3.RandomUtils; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import software.amazon.awssdk.core.SdkBytes; -import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import software.amazon.awssdk.services.kinesis.model.ConsumerStatus; -import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; -import software.amazon.awssdk.services.kinesis.model.Record; -import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; -import software.amazon.awssdk.services.kinesis.model.Shard; -import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; -import software.amazon.awssdk.services.kinesis.model.StreamStatus; -import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; -import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; -import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse; -import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import software.amazon.awssdk.stability.tests.exceptions.StabilityTestsRetryableException; import software.amazon.awssdk.testutils.retry.RetryableTest; -import software.amazon.awssdk.stability.tests.utils.StabilityTestRunner; -import software.amazon.awssdk.stability.tests.utils.TestEventStreamingResponseHandler; -import software.amazon.awssdk.testutils.Waiter; -import software.amazon.awssdk.testutils.service.AwsTestBase; -import software.amazon.awssdk.utils.Logger; /** - * Stability Tests using Kinesis. - * We can make one call to SubscribeToShard per second per registered consumer per shard - * Limit: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html + * Stability tests for Kinesis using Netty HTTP client. */ -public class KinesisStabilityTest extends AwsTestBase { - private static final Logger log = Logger.loggerFor(KinesisStabilityTest.class.getSimpleName()); - private static final int CONSUMER_COUNT = 4; - private static final int SHARD_COUNT = 9; - // one request per consumer/shard combination - private static final int CONCURRENCY = CONSUMER_COUNT * SHARD_COUNT; - private static final int MAX_CONCURRENCY = CONCURRENCY + 10; - - public static final String CONSUMER_PREFIX = "kinesisstabilitytestconsumer_"; - private List consumerArns; - private List shardIds; - private List producedData; - private KinesisAsyncClient asyncClient; - private String streamName; - private String streamARN; - private ExecutorService waiterExecutorService; - private ScheduledExecutorService producer; - - @BeforeEach - public void setup() { - streamName = "kinesisstabilitytest" + System.currentTimeMillis(); - consumerArns = new ArrayList<>(CONSUMER_COUNT); - shardIds = new ArrayList<>(SHARD_COUNT); - producedData = new ArrayList<>(); - asyncClient = KinesisAsyncClient.builder() - .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) - .httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(MAX_CONCURRENCY)) - .build(); +public class KinesisStabilityTest extends KinesisBaseStabilityTest { - asyncClient.createStream(r -> r.streamName(streamName) - .shardCount(SHARD_COUNT)) - .join(); - waitForStreamToBeActive(); - - streamARN = asyncClient.describeStream(r -> r.streamName(streamName)).join() - .streamDescription() - .streamARN(); - - shardIds = asyncClient.listShards(r -> r.streamName(streamName)) - .join() - .shards().stream().map(Shard::shardId).collect(Collectors.toList()); + @Override + protected KinesisAsyncClient createClient() { + return KinesisAsyncClient.builder() + .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) + .httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(MAX_CONCURRENCY)) + .build(); + } - waiterExecutorService = Executors.newFixedThreadPool(CONSUMER_COUNT); - producer = Executors.newScheduledThreadPool(1); - registerStreamConsumers(); - waitForConsumersToBeActive(); + @Override + protected String getTestNamePrefix() { + return "KinesisStabilityTest"; } - @AfterEach - public void tearDown() { - asyncClient.deleteStream(b -> b.streamName(streamName).enforceConsumerDeletion(true)).join(); - waiterExecutorService.shutdown(); - producer.shutdown(); - asyncClient.close(); + @Override + protected String getConsumerPrefix() { + return "kinesisstabilitytestconsumer_"; } @RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class) public void putRecords_subscribeToShard() throws InterruptedException { - putRecords(); - subscribeToShard(); - } - - /** - * We only have one run of subscribeToShard tests because it takes 5 minutes. - */ - private void subscribeToShard() throws InterruptedException { - log.info(() -> "starting to test subscribeToShard to stream: " + streamName); - List> completableFutures = generateSubscribeToShardFutures(); - StabilityTestRunner.newRunner() - .testName("KinesisStabilityTest.subscribeToShard") - .futures(completableFutures) - .run(); - } - - private void registerStreamConsumers() { - log.info(() -> "Starting to register stream consumer " + streamARN); - IntFunction> futureFunction = i -> asyncClient.registerStreamConsumer(r -> r.streamARN(streamARN) - .consumerName(CONSUMER_PREFIX + i)) - .thenApply(b -> consumerArns.add(b.consumer().consumerARN())); - - StabilityTestRunner.newRunner() - .requestCountPerRun(CONSUMER_COUNT) - .totalRuns(1) - .testName("KinesisStabilityTest.registerStreamConsumers") - .futureFactory(futureFunction) - .run(); - } - - private void putRecords() { - log.info(() -> "Starting to test putRecord"); - producedData = new ArrayList<>(); - SdkBytes data = SdkBytes.fromByteArray(RandomUtils.nextBytes(20)); - IntFunction> futureFunction = - i -> asyncClient.putRecord(PutRecordRequest.builder() - .streamName(streamName) - .data(data) - .partitionKey(UUID.randomUUID().toString()) - .build()) - .thenApply(b -> producedData.add(data)); - - StabilityTestRunner.newRunner() - .requestCountPerRun(CONCURRENCY) - .testName("KinesisStabilityTest.putRecords") - .futureFactory(futureFunction) - .run(); - } - - /** - * Generate request per consumer/shard combination - * @return a lit of completablefutures - */ - private List> generateSubscribeToShardFutures() throws InterruptedException { - List> completableFutures = new ArrayList<>(); - int baseDelay = 150; - int jitterRange = 150; - for (int i = 0; i < CONSUMER_COUNT; i++) { - final int consumerIndex = i; - for (int j = 0; j < SHARD_COUNT; j++) { - final int shardIndex = j; - Thread.sleep(baseDelay + (int)(Math.random() * jitterRange)); - TestSubscribeToShardResponseHandler responseHandler = - new TestSubscribeToShardResponseHandler(consumerIndex, shardIndex); - CompletableFuture completableFuture = - asyncClient.subscribeToShard(b -> b.shardId(shardIds.get(shardIndex)) - .consumerARN(consumerArns.get(consumerIndex)) - .startingPosition(s -> s.type(ShardIteratorType.TRIM_HORIZON)), - responseHandler) - .thenAccept(b -> { - // Only verify data if all events have been received and the received data is not empty. - // It is possible the received data is empty because there is no record at the position - // event with TRIM_HORIZON. - if (responseHandler.allEventsReceived && !responseHandler.receivedData.isEmpty()) { - assertThat(producedData).as(responseHandler.id + " has not received all events" - + ".").containsSequence(responseHandler.receivedData); - } - }); - completableFutures.add(completableFuture); - } - } - return completableFutures; - } - - private void waitForStreamToBeActive() { - Waiter.run(() -> asyncClient.describeStream(r -> r.streamName(streamName)) - .join()) - .until(b -> b.streamDescription().streamStatus().equals(StreamStatus.ACTIVE)) - .orFailAfter(Duration.ofMinutes(5)); - - // Additional verification to ensure stream is fully operational - Waiter.run(() -> { - try { - asyncClient.listShards(r -> r.streamName(streamName)).join(); - return true; - } catch (Exception e) { - if (e.getCause() instanceof ResourceInUseException) { - return false; - } - throw e; - } - }) - .until(Boolean::booleanValue) - .orFailAfter(Duration.ofMinutes(1)); - } - - private void waitForConsumersToBeActive() { - CompletableFuture[] completableFutures = - consumerArns.stream() - .map(a -> CompletableFuture.supplyAsync(() -> Waiter.run(() -> asyncClient.describeStreamConsumer(b -> b.consumerARN(a)) - .join()) - .until(b -> b.consumerDescription().consumerStatus().equals(ConsumerStatus.ACTIVE)) - .orFailAfter(Duration.ofMinutes(5)), waiterExecutorService)) - .toArray(CompletableFuture[]::new); - - CompletableFuture.allOf(completableFutures).join(); - } - - private static class TestSubscribeToShardResponseHandler extends TestEventStreamingResponseHandler implements SubscribeToShardResponseHandler { - private final List receivedData = new ArrayList<>(); - private final String id; - private volatile boolean allEventsReceived = false; - - TestSubscribeToShardResponseHandler(int consumerIndex, int shardIndex) { - id = "consumer_" + consumerIndex + "_shard_" + shardIndex; - } - - @Override - public void onEventStream(SdkPublisher publisher) { - publisher.filter(SubscribeToShardEvent.class) - .subscribe(b -> { - log.debug(() -> "sequenceNumber " + b.records() + "_" + id); - receivedData.addAll(b.records().stream().map(Record::data).collect(Collectors.toList())); - }); - } - - @Override - public void exceptionOccurred(Throwable throwable) { - log.error(() -> "An exception was thrown from " + id, throwable); - } - - @Override - public void complete() { - allEventsReceived = true; - log.info(() -> "All events stream successfully " + id); - } + runPutRecordsAndSubscribeToShard(); } } diff --git a/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/transcribestreaming/TranscribeStreamingBaseStabilityTest.java b/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/transcribestreaming/TranscribeStreamingBaseStabilityTest.java new file mode 100644 index 000000000000..6771d5af093a --- /dev/null +++ b/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/transcribestreaming/TranscribeStreamingBaseStabilityTest.java @@ -0,0 +1,84 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.stability.tests.transcribestreaming; + +import java.io.InputStream; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.services.transcribestreaming.TranscribeStreamingAsyncClient; +import software.amazon.awssdk.services.transcribestreaming.model.AudioStream; +import software.amazon.awssdk.services.transcribestreaming.model.LanguageCode; +import software.amazon.awssdk.services.transcribestreaming.model.MediaEncoding; +import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionResponse; +import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionResponseHandler; +import software.amazon.awssdk.services.transcribestreaming.model.TranscriptEvent; +import software.amazon.awssdk.services.transcribestreaming.model.TranscriptResultStream; +import software.amazon.awssdk.stability.tests.utils.StabilityTestRunner; +import software.amazon.awssdk.stability.tests.utils.TestEventStreamingResponseHandler; +import software.amazon.awssdk.stability.tests.utils.TestTranscribeStreamingSubscription; +import software.amazon.awssdk.testutils.service.AwsTestBase; +import software.amazon.awssdk.utils.Logger; + +/** + * Base class for Transcribe Streaming stability tests. + */ +public abstract class TranscribeStreamingBaseStabilityTest extends AwsTestBase { + private static final Logger log = Logger.loggerFor(TranscribeStreamingBaseStabilityTest.class); + protected static final int CONCURRENCY = 2; + protected static final int TOTAL_RUNS = 1; + + protected static InputStream audioFileInputStream; + + protected static InputStream getInputStream() { + return TranscribeStreamingBaseStabilityTest.class.getResourceAsStream("silence_8kHz.wav"); + } + + protected void runTranscriptionTest(TranscribeStreamingAsyncClient client, String testName) { + IntFunction> futureIntFunction = i -> + client.startStreamTranscription(b -> b.mediaSampleRateHertz(8_000) + .languageCode(LanguageCode.EN_US) + .mediaEncoding(MediaEncoding.PCM), + new AudioStreamPublisher(), + new TestResponseHandler()); + StabilityTestRunner.newRunner() + .futureFactory(futureIntFunction) + .totalRuns(TOTAL_RUNS) + .requestCountPerRun(CONCURRENCY) + .testName(testName) + .run(); + } + + protected static class AudioStreamPublisher implements Publisher { + @Override + public void subscribe(Subscriber s) { + s.onSubscribe(new TestTranscribeStreamingSubscription(s, audioFileInputStream)); + } + } + + protected static class TestResponseHandler + extends TestEventStreamingResponseHandler + implements StartStreamTranscriptionResponseHandler { + + @Override + public void onEventStream(SdkPublisher publisher) { + publisher.filter(TranscriptEvent.class) + .subscribe(result -> log.debug(() -> "Record Batch - " + result.transcript().results())); + } + } +} diff --git a/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/transcribestreaming/TranscribeStreamingCrtStabilityTest.java b/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/transcribestreaming/TranscribeStreamingCrtStabilityTest.java new file mode 100644 index 000000000000..c106177fc362 --- /dev/null +++ b/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/transcribestreaming/TranscribeStreamingCrtStabilityTest.java @@ -0,0 +1,57 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.stability.tests.transcribestreaming; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.http.SdkHttpConfigurationOption; +import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; +import software.amazon.awssdk.services.transcribestreaming.TranscribeStreamingAsyncClient; +import software.amazon.awssdk.stability.tests.exceptions.StabilityTestsRetryableException; +import software.amazon.awssdk.testutils.retry.RetryableTest; +import software.amazon.awssdk.utils.AttributeMap; + +/** + * Stability tests for Transcribe Streaming using CRT HTTP client with HTTP/2. + */ +public class TranscribeStreamingCrtStabilityTest extends TranscribeStreamingBaseStabilityTest { + + private static TranscribeStreamingAsyncClient asyncClient; + + @BeforeAll + public static void setup() { + asyncClient = TranscribeStreamingAsyncClient.builder() + .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) + .httpClientBuilder(AwsCrtAsyncHttpClient.builder()) + .build(); + + audioFileInputStream = getInputStream(); + if (audioFileInputStream == null) { + throw new RuntimeException("Failed to get audio input stream"); + } + } + + @AfterAll + public static void tearDown() { + asyncClient.close(); + } + + @RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class) + public void startTranscription_crtH2() { + runTranscriptionTest(asyncClient, "TranscribeStreamingCrtStabilityTest.startTranscription"); + } +} diff --git a/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/transcribestreaming/TranscribeStreamingStabilityTest.java b/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/transcribestreaming/TranscribeStreamingStabilityTest.java index 4e277af0a5dc..c445026ea452 100644 --- a/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/transcribestreaming/TranscribeStreamingStabilityTest.java +++ b/test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/transcribestreaming/TranscribeStreamingStabilityTest.java @@ -15,41 +15,23 @@ package software.amazon.awssdk.stability.tests.transcribestreaming; -import java.io.InputStream; import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.function.IntFunction; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.ProtocolNegotiation; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.services.transcribestreaming.TranscribeStreamingAsyncClient; -import software.amazon.awssdk.services.transcribestreaming.model.AudioStream; -import software.amazon.awssdk.services.transcribestreaming.model.LanguageCode; -import software.amazon.awssdk.services.transcribestreaming.model.MediaEncoding; -import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionResponse; -import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionResponseHandler; -import software.amazon.awssdk.services.transcribestreaming.model.TranscriptEvent; -import software.amazon.awssdk.services.transcribestreaming.model.TranscriptResultStream; import software.amazon.awssdk.stability.tests.exceptions.StabilityTestsRetryableException; import software.amazon.awssdk.testutils.retry.RetryableTest; -import software.amazon.awssdk.stability.tests.utils.StabilityTestRunner; -import software.amazon.awssdk.stability.tests.utils.TestEventStreamingResponseHandler; -import software.amazon.awssdk.stability.tests.utils.TestTranscribeStreamingSubscription; -import software.amazon.awssdk.testutils.service.AwsTestBase; -import software.amazon.awssdk.utils.Logger; -public class TranscribeStreamingStabilityTest extends AwsTestBase { - private static final Logger log = Logger.loggerFor(TranscribeStreamingStabilityTest.class.getSimpleName()); - public static final int CONCURRENCY = 2; - public static final int TOTAL_RUNS = 1; +/** + * Stability tests for Transcribe Streaming using Netty HTTP client. + */ +public class TranscribeStreamingStabilityTest extends TranscribeStreamingBaseStabilityTest { + private static TranscribeStreamingAsyncClient asyncClient; private static TranscribeStreamingAsyncClient asyncClientAlpn; - private static InputStream audioFileInputStream; @BeforeAll public static void setup() { @@ -57,9 +39,8 @@ public static void setup() { asyncClientAlpn = initClient(ProtocolNegotiation.ALPN); audioFileInputStream = getInputStream(); - if (audioFileInputStream == null) { - throw new RuntimeException("fail to get the audio input stream"); + throw new RuntimeException("Failed to get audio input stream"); } } @@ -82,56 +63,11 @@ public static void tearDown() { @RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class) public void startTranscription() { - IntFunction> futureIntFunction = i -> - asyncClient.startStreamTranscription(b -> b.mediaSampleRateHertz(8_000) - .languageCode(LanguageCode.EN_US) - .mediaEncoding(MediaEncoding.PCM), - new AudioStreamPublisher(), - new TestStartStreamTranscriptionResponseHandler()); - StabilityTestRunner.newRunner() - .futureFactory(futureIntFunction) - .totalRuns(TOTAL_RUNS) - .requestCountPerRun(CONCURRENCY) - .testName("TranscribeStreamingStabilityTest.startTranscription") - .run(); + runTranscriptionTest(asyncClient, "TranscribeStreamingStabilityTest.startTranscription"); } @RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class) public void startTranscription_alpnEnabled() { - IntFunction> futureIntFunction = i -> - asyncClientAlpn.startStreamTranscription(b -> b.mediaSampleRateHertz(8_000) - .languageCode(LanguageCode.EN_US) - .mediaEncoding(MediaEncoding.PCM), - new AudioStreamPublisher(), - new TestStartStreamTranscriptionResponseHandler()); - StabilityTestRunner.newRunner() - .futureFactory(futureIntFunction) - .totalRuns(TOTAL_RUNS) - .requestCountPerRun(CONCURRENCY) - .testName("TranscribeStreamingStabilityTest.startTranscription") - .run(); - } - - private static InputStream getInputStream() { - return TranscribeStreamingStabilityTest.class.getResourceAsStream("silence_8kHz.wav"); - } - - private static class AudioStreamPublisher implements Publisher { - - @Override - public void subscribe(Subscriber s) { - s.onSubscribe(new TestTranscribeStreamingSubscription(s, audioFileInputStream)); - } - } - - private static class TestStartStreamTranscriptionResponseHandler extends TestEventStreamingResponseHandler - implements StartStreamTranscriptionResponseHandler { - - @Override - public void onEventStream(SdkPublisher publisher) { - publisher - .filter(TranscriptEvent.class) - .subscribe(result -> log.debug(() -> "Record Batch - " + result.transcript().results())); - } + runTranscriptionTest(asyncClientAlpn, "TranscribeStreamingStabilityTest.startTranscription_alpn"); } } From 4f71fdcd343d56ff95dc5e472a7ee75712b558bd Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Mon, 9 Feb 2026 15:10:37 -0800 Subject: [PATCH 2/3] Fix tests --- .../http/crt/AwsCrtAsyncHttpClientSpiVerificationTest.java | 3 ++- .../awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java | 3 +-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientSpiVerificationTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientSpiVerificationTest.java index cc29152075bd..8de777d80155 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientSpiVerificationTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClientSpiVerificationTest.java @@ -127,7 +127,8 @@ public void requestFailed_connectionTimeout_shouldWrapException() { SdkHttpRequest request = createRequest(uri); RecordingResponseHandler recorder = new RecordingResponseHandler(); client.execute(AsyncExecuteRequest.builder().request(request).requestContentPublisher(createProvider("")).responseHandler(recorder).build()); - assertThatThrownBy(() -> recorder.completeFuture().get(5, TimeUnit.SECONDS)).hasCauseInstanceOf(ConnectException.class); + assertThatThrownBy(() -> recorder.completeFuture().get(5, TimeUnit.SECONDS)).hasCauseInstanceOf(IOException.class) + .hasMessageContaining("socket"); } } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java index 79d5475a7323..949eded552df 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java @@ -94,9 +94,8 @@ public void requestFailed_connectionTimeout_shouldWrapException() { HttpExecuteRequest.Builder executeRequestBuilder = HttpExecuteRequest.builder(); executeRequestBuilder.request(request); ExecutableHttpRequest executableRequest = client.prepareRequest(executeRequestBuilder.build()); - assertThatThrownBy(() -> executableRequest.call()).isInstanceOf(IOException.class) - .hasMessageContaining("operation timed out"); + .hasMessageContaining("socket"); } } From 05f773d9d1d412dbe4f322b1456a006c2213781d Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Tue, 10 Feb 2026 09:25:17 -0800 Subject: [PATCH 3/3] Fix checkstyle errors --- .../httpclient/async/AwsCrtH2ClientBenchmark.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/AwsCrtH2ClientBenchmark.java b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/AwsCrtH2ClientBenchmark.java index 88ed9906fab6..aa8655ad21a5 100644 --- a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/AwsCrtH2ClientBenchmark.java +++ b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/AwsCrtH2ClientBenchmark.java @@ -39,7 +39,6 @@ import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.benchmark.apicall.httpclient.SdkHttpClientBenchmark; import software.amazon.awssdk.benchmark.utils.MockH2Server; import software.amazon.awssdk.crt.Log; @@ -72,14 +71,16 @@ public class AwsCrtH2ClientBenchmark implements SdkHttpClientBenchmark { @Setup(Level.Trial) public void setup() throws Exception { - mockServer = new MockH2Server( true); + mockServer = new MockH2Server(true); mockServer.start(); sdkHttpClient = AwsCrtAsyncHttpClient.builder() - .buildWithDefaults(AttributeMap.builder() - .put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true) - .put(SdkHttpConfigurationOption.PROTOCOL, Protocol.HTTP2) - .build()); + .buildWithDefaults( + AttributeMap.builder() + .put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true) + .put(SdkHttpConfigurationOption.PROTOCOL, + Protocol.HTTP2) + .build()); client = ProtocolRestJsonAsyncClient.builder() .credentialsProvider(() -> AwsBasicCredentials.create("foo", "bar"))