From 86991b49912dc051519171b7270c4186bad2f516 Mon Sep 17 00:00:00 2001 From: Guillaume Nodet Date: Tue, 5 May 2026 23:12:47 +0200 Subject: [PATCH 1/3] CAMEL-23430: Fix flaky tests in camel-kafka Co-Authored-By: Claude Opus 4.6 (1M context) --- ...afkaConsumerAutoInstResumeRouteStrategyIT.java | 15 +++++++++------ .../kafka/integration/common/KafkaTestUtil.java | 15 +++++++-------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java index 8c61cd65eb6d3..915f9b4df111d 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java @@ -19,6 +19,7 @@ import java.time.Duration; import java.util.Collections; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; @@ -29,6 +30,7 @@ import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -54,20 +56,21 @@ public static KafkaResumeStrategyConfigurationBuilder getDefaultKafkaResumeStrat public void before() { Properties props = KafkaTestUtil.getDefaultProperties(service); KafkaTestUtil.createTopic(service, TOPIC, 1); - KafkaProducer producer = new KafkaProducer<>(props); - - for (int i = 0; i < 10; i++) { - producer.send(new ProducerRecord<>(TOPIC, String.valueOf(i))); + try (KafkaProducer producer = new KafkaProducer<>(props)) { + for (int i = 0; i < 10; i++) { + producer.send(new ProducerRecord<>(TOPIC, String.valueOf(i))); + } + producer.flush(); } } @Test @Timeout(value = 30) - public void testOffsetIsBeingChecked() throws InterruptedException { + public void testOffsetIsBeingChecked() { MockEndpoint mock = contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT); mock.expectedMessageCount(10); - mock.assertIsSatisfied(); + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> mock.assertIsSatisfied()); } @AfterEach diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java index f8c33c2d07b61..40e34f0941424 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java @@ -91,14 +91,13 @@ public static void configureKafkaComponent(CamelContext context, String bootstra } public static void createTopic(KafkaService service, String topic, int numPartitions) { - AdminClient kafkaAdminClient = createAdminClient(service); - NewTopic testTopic = new NewTopic(topic, numPartitions, CreateTopicsRequest.NO_REPLICATION_FACTOR); - kafkaAdminClient.createTopics(Collections.singleton(testTopic)); - KafkaFuture tdFuture - = kafkaAdminClient.describeTopics(Collections.singletonList(topic)).topicNameValues().get(topic); - - try { - TopicDescription td = tdFuture.get(5L, TimeUnit.SECONDS); + try (AdminClient kafkaAdminClient = createAdminClient(service)) { + NewTopic testTopic = new NewTopic(topic, numPartitions, CreateTopicsRequest.NO_REPLICATION_FACTOR); + kafkaAdminClient.createTopics(Collections.singleton(testTopic)).all().get(30L, TimeUnit.SECONDS); + KafkaFuture tdFuture + = kafkaAdminClient.describeTopics(Collections.singletonList(topic)).topicNameValues().get(topic); + + TopicDescription td = tdFuture.get(30L, TimeUnit.SECONDS); List pi = td.partitions(); assertEquals(numPartitions, pi.size()); } catch (Exception e) { From 919e356bd7b63a2c4fafe43dd129c48f48dd1237 Mon Sep 17 00:00:00 2001 From: Guillaume Nodet Date: Wed, 6 May 2026 08:28:03 +0200 Subject: [PATCH 2/3] CAMEL-23430: Do not await createTopics result to avoid TopicExistsException on retries Co-Authored-By: Claude Opus 4.6 (1M context) --- .../camel/component/kafka/integration/common/KafkaTestUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java index 40e34f0941424..871457970bd5b 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java @@ -93,7 +93,7 @@ public static void configureKafkaComponent(CamelContext context, String bootstra public static void createTopic(KafkaService service, String topic, int numPartitions) { try (AdminClient kafkaAdminClient = createAdminClient(service)) { NewTopic testTopic = new NewTopic(topic, numPartitions, CreateTopicsRequest.NO_REPLICATION_FACTOR); - kafkaAdminClient.createTopics(Collections.singleton(testTopic)).all().get(30L, TimeUnit.SECONDS); + kafkaAdminClient.createTopics(Collections.singleton(testTopic)); KafkaFuture tdFuture = kafkaAdminClient.describeTopics(Collections.singletonList(topic)).topicNameValues().get(topic); From 4189a627cb648223d2b28e0f8adf4d53e669dcb9 Mon Sep 17 00:00:00 2001 From: Guillaume Nodet Date: Wed, 6 May 2026 11:06:35 +0200 Subject: [PATCH 3/3] CAMEL-23430: Address review feedback - use Awaitility retry for topic creation, lower assertion timeout Co-Authored-By: Claude Opus 4.6 (1M context) --- ...ConsumerAutoInstResumeRouteStrategyIT.java | 2 +- .../integration/common/KafkaTestUtil.java | 19 ++++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java index 915f9b4df111d..545e8d594b91a 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java @@ -70,7 +70,7 @@ public void testOffsetIsBeingChecked() { MockEndpoint mock = contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT); mock.expectedMessageCount(10); - Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> mock.assertIsSatisfied()); + Awaitility.await().atMost(25, TimeUnit.SECONDS).untilAsserted(() -> mock.assertIsSatisfied()); } @AfterEach diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java index 871457970bd5b..183d204d5f02b 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java @@ -18,7 +18,6 @@ package org.apache.camel.component.kafka.integration.common; import java.util.Collections; -import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -32,12 +31,11 @@ import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.requests.CreateTopicsRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; @@ -94,12 +92,15 @@ public static void createTopic(KafkaService service, String topic, int numPartit try (AdminClient kafkaAdminClient = createAdminClient(service)) { NewTopic testTopic = new NewTopic(topic, numPartitions, CreateTopicsRequest.NO_REPLICATION_FACTOR); kafkaAdminClient.createTopics(Collections.singleton(testTopic)); - KafkaFuture tdFuture - = kafkaAdminClient.describeTopics(Collections.singletonList(topic)).topicNameValues().get(topic); - - TopicDescription td = tdFuture.get(30L, TimeUnit.SECONDS); - List pi = td.partitions(); - assertEquals(numPartitions, pi.size()); + await().atMost(30, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .ignoreExceptions() + .untilAsserted(() -> { + TopicDescription td = kafkaAdminClient + .describeTopics(Collections.singletonList(topic)).topicNameValues().get(topic) + .get(5L, TimeUnit.SECONDS); + assertEquals(numPartitions, td.partitions().size()); + }); } catch (Exception e) { fail("Exception while creating Kafka topic", e); }