diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java index 8c61cd65eb6d3..545e8d594b91a 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java @@ -19,6 +19,7 @@ import java.time.Duration; import java.util.Collections; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; @@ -29,6 +30,7 @@ import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -54,20 +56,21 @@ public static KafkaResumeStrategyConfigurationBuilder getDefaultKafkaResumeStrat public void before() { Properties props = KafkaTestUtil.getDefaultProperties(service); KafkaTestUtil.createTopic(service, TOPIC, 1); - KafkaProducer producer = new KafkaProducer<>(props); - - for (int i = 0; i < 10; i++) { - producer.send(new ProducerRecord<>(TOPIC, String.valueOf(i))); + try (KafkaProducer producer = new KafkaProducer<>(props)) { + for (int i = 0; i < 10; i++) { + producer.send(new ProducerRecord<>(TOPIC, String.valueOf(i))); + } + producer.flush(); } } @Test @Timeout(value = 30) - public void testOffsetIsBeingChecked() throws InterruptedException { + public void testOffsetIsBeingChecked() { MockEndpoint mock = contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT); mock.expectedMessageCount(10); - mock.assertIsSatisfied(); + Awaitility.await().atMost(25, TimeUnit.SECONDS).untilAsserted(() -> mock.assertIsSatisfied()); } @AfterEach diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java index f8c33c2d07b61..183d204d5f02b 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java @@ -18,7 +18,6 @@ package org.apache.camel.component.kafka.integration.common; import java.util.Collections; -import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -32,12 +31,11 @@ import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.requests.CreateTopicsRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; @@ -91,16 +89,18 @@ public static void configureKafkaComponent(CamelContext context, String bootstra } public static void createTopic(KafkaService service, String topic, int numPartitions) { - AdminClient kafkaAdminClient = createAdminClient(service); - NewTopic testTopic = new NewTopic(topic, numPartitions, CreateTopicsRequest.NO_REPLICATION_FACTOR); - kafkaAdminClient.createTopics(Collections.singleton(testTopic)); - KafkaFuture tdFuture - = kafkaAdminClient.describeTopics(Collections.singletonList(topic)).topicNameValues().get(topic); - - try { - TopicDescription td = tdFuture.get(5L, TimeUnit.SECONDS); - List pi = td.partitions(); - assertEquals(numPartitions, pi.size()); + try (AdminClient kafkaAdminClient = createAdminClient(service)) { + NewTopic testTopic = new NewTopic(topic, numPartitions, CreateTopicsRequest.NO_REPLICATION_FACTOR); + kafkaAdminClient.createTopics(Collections.singleton(testTopic)); + await().atMost(30, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .ignoreExceptions() + .untilAsserted(() -> { + TopicDescription td = kafkaAdminClient + .describeTopics(Collections.singletonList(topic)).topicNameValues().get(topic) + .get(5L, TimeUnit.SECONDS); + assertEquals(numPartitions, td.partitions().size()); + }); } catch (Exception e) { fail("Exception while creating Kafka topic", e); }