From 50cca31a6a1ab2a045f9be355031a0be85db2f85 Mon Sep 17 00:00:00 2001 From: Torsten Mielke Date: Fri, 26 Jun 2026 13:56:45 +0200 Subject: [PATCH] CAMEL-19549: camel-core: replace Thread.sleep in tests Additional improvement to unit test DirectProducerBlockingTest. Introducing better coordination between producer blocking and route resumption, eliminating race conditions on slow hardware. Made with help from AI tools. --- .../direct/DirectProducerBlockingTest.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java b/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java index 1c2ca3b21de66..4ad06aa15819c 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java @@ -16,7 +16,6 @@ */ package org.apache.camel.component.direct; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -29,6 +28,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -74,29 +74,32 @@ public void testProducerBlocksWithNoConsumers() throws Exception { @Test public void testProducerBlocksResumeTest() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + context.getRouteController().suspendRoute("foo"); - CountDownLatch producerReady = new CountDownLatch(1); + Thread mainThread = Thread.currentThread(); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(new Runnable() { @Override public void run() { try { - // Wait for producer to start blocking - assertTrue(producerReady.await(2, TimeUnit.SECONDS)); + // Wait for the main thread to enter TIMED_WAITING state + // (blocked on condition in DirectComponent.getConsumer) + await().atMost(2, TimeUnit.SECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .until(() -> mainThread.getState() == Thread.State.TIMED_WAITING); + log.info("Resuming consumer"); context.getRouteController().resumeRoute("foo"); } catch (Exception e) { - // ignore + log.error("Error in background thread", e); } } }); - getMockEndpoint("mock:result").expectedMessageCount(1); - - // Signal that we're about to send (producer will block) - producerReady.countDown(); - template.sendBody("direct:suspended?block=true&timeout=1000", "hello world"); + // This call will block until the route is resumed by the background thread + template.sendBody("direct:suspended?block=true&timeout=2000", "hello world"); assertMockEndpointsSatisfied();