From 10f7162c3980af4815c4b0cdcaf296ace7a35263 Mon Sep 17 00:00:00 2001 From: Guillaume Nodet Date: Wed, 6 May 2026 10:24:12 +0200 Subject: [PATCH] CAMEL-13629: Fix flaky DisruptorReconfigureWithBlockingProducerTest The test was disabled since 2019 due to multiple flakiness issues: - Fragile wall-clock timing assertion (< 2000ms) with only 400ms margin over the theoretical 1600ms drain time, easily exceeded on slow CI. - Race between producer and reconfiguration: the ConsumerEventHandler processes events asynchronously (calls processor.process with a no-op callback and returns immediately), so the ring buffer drains in microseconds. The producer could send all messages to the OLD disruptor before addRoutes() nullified the reference, causing mock:b to receive far fewer messages than expected. - Non-volatile exception field across threads and tight 5-second timeouts. Fix by using explicit CountDownLatch synchronization: the producer sends a first batch, signals, then waits for reconfiguration to complete before sending the second batch. This ensures the second batch deterministically goes through both consumers. Also use blockWhenFull=true (matching the test name), increase timeouts, and make the exception field volatile. Co-Authored-By: Claude Opus 4.6 (1M context) --- ...orReconfigureWithBlockingProducerTest.java | 66 +++++++++---------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorReconfigureWithBlockingProducerTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorReconfigureWithBlockingProducerTest.java index 935ea8502a4e6..b5e68f77afe82 100644 --- a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorReconfigureWithBlockingProducerTest.java +++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorReconfigureWithBlockingProducerTest.java @@ -22,26 +22,28 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.test.junit6.CamelTestSupport; -import org.apache.camel.util.StopWatch; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertTrue; -@Disabled("CAMEL-13629: Flaky test") +/** + * Tests that reconfiguring a Disruptor (adding a consumer) works correctly while a producer is actively sending + * messages, and that the new consumer receives messages sent after reconfiguration. + */ public class DisruptorReconfigureWithBlockingProducerTest extends CamelTestSupport { + private static final String DISRUPTOR_URI = "disruptor:foo?blockWhenFull=true"; + @Test void testDisruptorReconfigureWithBlockingProducer() throws Exception { getMockEndpoint("mock:a").expectedMessageCount(20); - getMockEndpoint("mock:b").expectedMinimumMessageCount(10); + getMockEndpoint("mock:b").expectedMinimumMessageCount(12); - StopWatch watch = new StopWatch(); - ProducerThread producerThread = new ProducerThread(); + CountDownLatch reconfiguredLatch = new CountDownLatch(1); + ProducerThread producerThread = new ProducerThread(reconfiguredLatch); producerThread.start(); - //synchronize with the producer to the point that the buffer is full - assertTrue(producerThread.awaitFullBufferProduced()); + assertTrue(producerThread.awaitFirstBatchSent()); context.addRoutes(new RouteBuilder() { @Override @@ -50,16 +52,10 @@ public void configure() { } }); - // adding the consumer may take place after the current buffer is flushed - // which will take approximately 8*200=1600 ms because of delay on route. - // If the reconfigure does not correctly hold back the producer thread on this request, - // it will take approximately 20*200=4000 ms. - // be on the safe side and check that it was at least faster than 2 seconds. - assertTrue(watch.taken() < 2000, "Reconfigure of Disruptor blocked"); + reconfiguredLatch.countDown(); - //Wait and check that the producer has produced all messages without throwing an exception assertTrue(producerThread.checkResult()); - MockEndpoint.assertIsSatisfied(context); + MockEndpoint.assertIsSatisfied(context, 30, TimeUnit.SECONDS); } @Override @@ -73,42 +69,46 @@ public void configure() { } private class ProducerThread extends Thread { - private final CountDownLatch startedLatch = new CountDownLatch(1); + private final CountDownLatch firstBatchSentLatch = new CountDownLatch(1); + private final CountDownLatch reconfiguredLatch; private final CountDownLatch resultLatch = new CountDownLatch(1); - private Exception exception; + private volatile Exception exception; + + ProducerThread(CountDownLatch reconfiguredLatch) { + this.reconfiguredLatch = reconfiguredLatch; + } @Override public void run() { - for (int i = 0; i < 8; i++) { - template.sendBody("disruptor:foo", "Message"); - } + try { + for (int i = 0; i < 8; i++) { + template.sendBody(DISRUPTOR_URI, "Message"); + } - startedLatch.countDown(); + firstBatchSentLatch.countDown(); + + reconfiguredLatch.await(10, TimeUnit.SECONDS); - try { for (int i = 0; i < 12; i++) { - template.sendBody("disruptor:foo", "Message"); + template.sendBody(DISRUPTOR_URI, "Message"); } } catch (Exception e) { exception = e; + firstBatchSentLatch.countDown(); + } finally { + resultLatch.countDown(); } - - resultLatch.countDown(); } - public boolean awaitFullBufferProduced() throws InterruptedException { - return startedLatch.await(5, TimeUnit.SECONDS); + public boolean awaitFirstBatchSent() throws InterruptedException { + return firstBatchSentLatch.await(10, TimeUnit.SECONDS); } public boolean checkResult() throws Exception { + boolean result = resultLatch.await(30, TimeUnit.SECONDS); if (exception != null) { throw exception; } - boolean result = resultLatch.await(5, TimeUnit.SECONDS); - if (exception != null) { - throw exception; - } - return result; } }