Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.camel.component.direct;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -29,6 +28,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -74,29 +74,32 @@ public void testProducerBlocksWithNoConsumers() throws Exception {

@Test
public void testProducerBlocksResumeTest() throws Exception {
getMockEndpoint("mock:result").expectedMessageCount(1);

context.getRouteController().suspendRoute("foo");

CountDownLatch producerReady = new CountDownLatch(1);
Thread mainThread = Thread.currentThread();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(new Runnable() {
@Override
public void run() {
try {
// Wait for producer to start blocking
assertTrue(producerReady.await(2, TimeUnit.SECONDS));
// Wait for the main thread to enter TIMED_WAITING state
// (blocked on condition in DirectComponent.getConsumer)
await().atMost(2, TimeUnit.SECONDS)
.pollInterval(10, TimeUnit.MILLISECONDS)
.until(() -> mainThread.getState() == Thread.State.TIMED_WAITING);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor observation: TIMED_WAITING is a reliable proxy here (the producer blocks on consumersCondition.await() in DirectComponent.getConsumer), but it's a heuristic — in theory the main thread could transiently enter TIMED_WAITING earlier in the call path before reaching getConsumer(). With the 10ms poll interval this is extremely unlikely in practice, and it's strictly better than the prior CountDownLatch race.

Just something to keep in mind for future maintainers if this test ever becomes flaky again — the Thread.State check is a practical workaround, not a contract.

log.info("Resuming consumer");
context.getRouteController().resumeRoute("foo");
} catch (Exception e) {
// ignore
log.error("Error in background thread", e);
}
}
});

getMockEndpoint("mock:result").expectedMessageCount(1);

// Signal that we're about to send (producer will block)
producerReady.countDown();
template.sendBody("direct:suspended?block=true&timeout=1000", "hello world");
// This call will block until the route is resumed by the background thread
template.sendBody("direct:suspended?block=true&timeout=2000", "hello world");

assertMockEndpointsSatisfied();

Expand Down
Loading