diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorReconfigureWithBlockingProducerTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorReconfigureWithBlockingProducerTest.java index 935ea8502a4e6..b5e68f77afe82 100644 --- a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorReconfigureWithBlockingProducerTest.java +++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorReconfigureWithBlockingProducerTest.java @@ -22,26 +22,28 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.test.junit6.CamelTestSupport; -import org.apache.camel.util.StopWatch; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertTrue; -@Disabled("CAMEL-13629: Flaky test") +/** + * Tests that reconfiguring a Disruptor (adding a consumer) works correctly while a producer is actively sending + * messages, and that the new consumer receives messages sent after reconfiguration. + */ public class DisruptorReconfigureWithBlockingProducerTest extends CamelTestSupport { + private static final String DISRUPTOR_URI = "disruptor:foo?blockWhenFull=true"; + @Test void testDisruptorReconfigureWithBlockingProducer() throws Exception { getMockEndpoint("mock:a").expectedMessageCount(20); - getMockEndpoint("mock:b").expectedMinimumMessageCount(10); + getMockEndpoint("mock:b").expectedMinimumMessageCount(12); - StopWatch watch = new StopWatch(); - ProducerThread producerThread = new ProducerThread(); + CountDownLatch reconfiguredLatch = new CountDownLatch(1); + ProducerThread producerThread = new ProducerThread(reconfiguredLatch); producerThread.start(); - //synchronize with the producer to the point that the buffer is full - assertTrue(producerThread.awaitFullBufferProduced()); + assertTrue(producerThread.awaitFirstBatchSent()); context.addRoutes(new RouteBuilder() { @Override @@ -50,16 +52,10 @@ public void configure() { } }); - // adding the consumer may take place after the current buffer is flushed - // which will take approximately 8*200=1600 ms because of delay on route. - // If the reconfigure does not correctly hold back the producer thread on this request, - // it will take approximately 20*200=4000 ms. - // be on the safe side and check that it was at least faster than 2 seconds. - assertTrue(watch.taken() < 2000, "Reconfigure of Disruptor blocked"); + reconfiguredLatch.countDown(); - //Wait and check that the producer has produced all messages without throwing an exception assertTrue(producerThread.checkResult()); - MockEndpoint.assertIsSatisfied(context); + MockEndpoint.assertIsSatisfied(context, 30, TimeUnit.SECONDS); } @Override @@ -73,42 +69,46 @@ public void configure() { } private class ProducerThread extends Thread { - private final CountDownLatch startedLatch = new CountDownLatch(1); + private final CountDownLatch firstBatchSentLatch = new CountDownLatch(1); + private final CountDownLatch reconfiguredLatch; private final CountDownLatch resultLatch = new CountDownLatch(1); - private Exception exception; + private volatile Exception exception; + + ProducerThread(CountDownLatch reconfiguredLatch) { + this.reconfiguredLatch = reconfiguredLatch; + } @Override public void run() { - for (int i = 0; i < 8; i++) { - template.sendBody("disruptor:foo", "Message"); - } + try { + for (int i = 0; i < 8; i++) { + template.sendBody(DISRUPTOR_URI, "Message"); + } - startedLatch.countDown(); + firstBatchSentLatch.countDown(); + + reconfiguredLatch.await(10, TimeUnit.SECONDS); - try { for (int i = 0; i < 12; i++) { - template.sendBody("disruptor:foo", "Message"); + template.sendBody(DISRUPTOR_URI, "Message"); } } catch (Exception e) { exception = e; + firstBatchSentLatch.countDown(); + } finally { + resultLatch.countDown(); } - - resultLatch.countDown(); } - public boolean awaitFullBufferProduced() throws InterruptedException { - return startedLatch.await(5, TimeUnit.SECONDS); + public boolean awaitFirstBatchSent() throws InterruptedException { + return firstBatchSentLatch.await(10, TimeUnit.SECONDS); } public boolean checkResult() throws Exception { + boolean result = resultLatch.await(30, TimeUnit.SECONDS); if (exception != null) { throw exception; } - boolean result = resultLatch.await(5, TimeUnit.SECONDS); - if (exception != null) { - throw exception; - } - return result; } }