Skip to content

Commit 5d26ab3

Browse files
committed
fix(test): use kafka native for integration tests
1 parent 69190f7 commit 5d26ab3

5 files changed

Lines changed: 46 additions & 37 deletions

File tree

providers/jikkou-provider-kafka-connect/src/integration-test/java/io/streamthoughts/jikkou/kafka/connect/AbstractKafkaConnectorIT.java

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121
import org.slf4j.Logger;
2222
import org.slf4j.LoggerFactory;
2323
import org.testcontainers.containers.GenericContainer;
24-
import org.testcontainers.containers.KafkaContainer;
2524
import org.testcontainers.containers.Network;
2625
import org.testcontainers.containers.output.Slf4jLogConsumer;
2726
import org.testcontainers.junit.jupiter.Container;
2827
import org.testcontainers.junit.jupiter.Testcontainers;
28+
import org.testcontainers.kafka.KafkaContainer;
2929
import org.testcontainers.utility.DockerImageName;
3030

3131
@Testcontainers
@@ -35,40 +35,43 @@ public class AbstractKafkaConnectorIT {
3535

3636
private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaConnectorIT.class);
3737

38-
public static final String CONFLUENT_PLATFORM_VERSION = "7.5.0";
38+
public static final String APACHE_KAFKA_VERSION = "4.1.1";
39+
public static final String CONFLUENT_PLATFORM_VERSION = "8.1.1";
3940
private static final Network KAFKA_NETWORK = Network.newNetwork();
4041
public static final String KAFKA_CONNECTOR_NAME = "test";
42+
4143
@Container
4244
final KafkaContainer kafka = new KafkaContainer(
43-
DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION)).withKraft()
44-
.withNetwork(KAFKA_NETWORK)
45-
.withNetworkAliases("broker")
46-
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
47-
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
48-
.withLogConsumer(new Slf4jLogConsumer(LOG));
45+
DockerImageName.parse("apache/kafka-native").withTag(APACHE_KAFKA_VERSION))
46+
.withNetworkAliases("kafka")
47+
.withNetwork(KAFKA_NETWORK)
48+
.withListener("kafka:19092")
49+
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
50+
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
51+
.withLogConsumer(new Slf4jLogConsumer(LOG));
4952

5053
@Container
5154
final GenericContainer connect = new GenericContainer(
52-
DockerImageName.parse("confluentinc/cp-kafka-connect").withTag(CONFLUENT_PLATFORM_VERSION))
53-
.withEnv("CONNECT_BOOTSTRAP_SERVERS", "PLAINTEXT://broker:9092")
54-
.withEnv("CONNECT_REST_PORT", "8083")
55-
.withEnv("CONNECT_GROUP_ID", "kafka-connect")
56-
.withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "_connect-configs")
57-
.withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "_connect-offsets")
58-
.withEnv("CONNECT_STATUS_STORAGE_TOPIC", "_connect-status")
59-
.withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.storage.StringConverter")
60-
.withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.storage.StringConverter")
61-
.withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "kafka-connect")
62-
.withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1")
63-
.withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1")
64-
.withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1")
65-
.withEnv("CONNECT_PLUGIN_PATH", "/usr/local/share/kafka/plugins,/usr/share/filestream-connectors")
66-
.withExposedPorts(8083)
67-
.withNetwork(KAFKA_NETWORK)
68-
.withNetworkAliases("kafka-connect")
69-
.dependsOn(kafka)
70-
.withLogConsumer(new Slf4jLogConsumer(LOG))
71-
.waitingFor(forHttp("/connector-plugins"));
55+
DockerImageName.parse("confluentinc/cp-kafka-connect").withTag(CONFLUENT_PLATFORM_VERSION))
56+
.withEnv("CONNECT_BOOTSTRAP_SERVERS", "kafka:19092")
57+
.withEnv("CONNECT_REST_PORT", "8083")
58+
.withEnv("CONNECT_GROUP_ID", "kafka-connect")
59+
.withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "_connect-configs")
60+
.withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "_connect-offsets")
61+
.withEnv("CONNECT_STATUS_STORAGE_TOPIC", "_connect-status")
62+
.withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.storage.StringConverter")
63+
.withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.storage.StringConverter")
64+
.withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "kafka-connect")
65+
.withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1")
66+
.withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1")
67+
.withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1")
68+
.withEnv("CONNECT_PLUGIN_PATH", "/usr/local/share/kafka/plugins,/usr/share/filestream-connectors")
69+
.withExposedPorts(8083)
70+
.withNetwork(KAFKA_NETWORK)
71+
.withNetworkAliases("kafka-connect")
72+
.dependsOn(kafka)
73+
.withLogConsumer(new Slf4jLogConsumer(LOG))
74+
.waitingFor(forHttp("/connector-plugins"));
7275

7376
@NotNull
7477
protected String getConnectUrl() {
@@ -77,7 +80,7 @@ protected String getConnectUrl() {
7780

7881

7982
protected void deployFilestreamSinkConnectorAndWait() throws URISyntaxException, IOException, InterruptedException {
80-
try(HttpClient httpClient = HttpClient.newHttpClient();) {
83+
try (HttpClient httpClient = HttpClient.newHttpClient();) {
8184
HttpRequest createOrUpdateConnectorConfig = HttpRequest.newBuilder()
8285
.uri(new URI(getConnectUrl() + "/connectors/test/config"))
8386
.header("Content-Type", "application/json")

providers/jikkou-provider-kafka/src/integration-test/java/io/streamthoughts/jikkou/kafka/AbstractKafkaIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class AbstractKafkaIntegrationTest {
3535
private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaIntegrationTest.class);
3636
private static final Network KAFKA_NETWORK = Network.newNetwork();
3737

38-
public static final String APACHE_KAFKA_VERSION = "3.8.0";
38+
public static final String APACHE_KAFKA_VERSION = "4.1.1";
3939
public static final int DEFAULT_NUM_PARTITIONS = 1;
4040
public static final short DEFAULT_REPLICATION_FACTOR = (short) 1;
4141

providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/AbstractIntegrationTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
import org.junit.jupiter.api.TestMethodOrder;
1717
import org.slf4j.Logger;
1818
import org.slf4j.LoggerFactory;
19-
import org.testcontainers.containers.KafkaContainer;
2019
import org.testcontainers.containers.Network;
2120
import org.testcontainers.containers.output.Slf4jLogConsumer;
2221
import org.testcontainers.junit.jupiter.Container;
2322
import org.testcontainers.junit.jupiter.Testcontainers;
23+
import org.testcontainers.kafka.KafkaContainer;
2424
import org.testcontainers.utility.DockerImageName;
2525

2626
@Testcontainers
@@ -47,12 +47,15 @@ public class AbstractIntegrationTest {
4747

4848
private static final Network KAFKA_NETWORK = Network.newNetwork();
4949

50-
public static final String CONFLUENT_PLATFORM_VERSION = "8.0.3";
50+
public static final String APACHE_KAFKA_VERSION = "4.1.1";
51+
public static final String CONFLUENT_PLATFORM_VERSION = "8.1.1";
5152

5253
@Container
5354
final KafkaContainer kafkaContainer = new KafkaContainer(
54-
DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION)).withKraft()
55+
DockerImageName.parse("apache/kafka-native").withTag(APACHE_KAFKA_VERSION))
5556
.withNetwork(KAFKA_NETWORK)
57+
.withNetworkAliases("kafka")
58+
.withListener("kafka:19092")
5659
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
5760
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
5861
.withLogConsumer(new Slf4jLogConsumer(LOG));

providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApiTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
*/
77
package io.streamthoughts.jikkou.schema.registry.api;
88

9+
import static io.streamthoughts.jikkou.schema.registry.AbstractIntegrationTest.APACHE_KAFKA_VERSION;
910
import static io.streamthoughts.jikkou.schema.registry.AbstractIntegrationTest.CONFLUENT_PLATFORM_VERSION;
1011

1112
import io.streamthoughts.jikkou.core.data.SchemaType;
@@ -29,11 +30,11 @@
2930
import org.junit.jupiter.api.TestMethodOrder;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
32-
import org.testcontainers.containers.KafkaContainer;
3333
import org.testcontainers.containers.Network;
3434
import org.testcontainers.containers.output.Slf4jLogConsumer;
3535
import org.testcontainers.junit.jupiter.Container;
3636
import org.testcontainers.junit.jupiter.Testcontainers;
37+
import org.testcontainers.kafka.KafkaContainer;
3738
import org.testcontainers.utility.DockerImageName;
3839
import reactor.core.publisher.Mono;
3940

@@ -48,8 +49,10 @@ class AsyncSchemaRegistryApiTest {
4849

4950
@Container
5051
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(
51-
DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION)).withKraft()
52+
DockerImageName.parse("apache/kafka-native").withTag(APACHE_KAFKA_VERSION))
5253
.withNetwork(KAFKA_NETWORK)
54+
.withNetworkAliases("broker")
55+
.withListener("broker:19092")
5356
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
5457
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
5558
.withLogConsumer(new Slf4jLogConsumer(LOG));

providers/jikkou-provider-schema-registry/src/integration-test/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryContainer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
package io.streamthoughts.jikkou.schema.registry.api;
88

99
import org.testcontainers.containers.GenericContainer;
10-
import org.testcontainers.containers.KafkaContainer;
1110
import org.testcontainers.containers.Network;
1211
import org.testcontainers.containers.wait.strategy.Wait;
12+
import org.testcontainers.kafka.KafkaContainer;
1313

1414
public class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {
1515

@@ -29,7 +29,7 @@ public SchemaRegistryContainer(String version) {
2929
}
3030

3131
public SchemaRegistryContainer withKafka(KafkaContainer kafka) {
32-
return withKafka(kafka.getNetwork(), kafka.getNetworkAliases().get(0) + ":9092");
32+
return withKafka(kafka.getNetwork(), kafka.getNetworkAliases().get(0) + ":19092");
3333
}
3434

3535
public SchemaRegistryContainer withKafka(Network network, String bootstrapServers) {

0 commit comments

Comments
 (0)