diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
index c93cdc91af8e..017118594ba6 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
@@ -82,6 +82,7 @@ public boolean takeSnapshot(File snapshotDir) {
   @Before
   public void setUp() throws Exception {
     logger.info("[RECOVER TEST] start setting up the test env");
+    TestUtils.prepareJvmForRatisTest();
     final TestUtils.MiniClusterFactory factory = new TestUtils.MiniClusterFactory();
     miniCluster =
         factory
@@ -117,8 +118,13 @@ public void setUp() throws Exception {
   @After
   public void tearUp() throws Exception {
     logger.info("[RECOVER TEST] start tearing down the test env");
-    miniCluster.cleanUp();
-    logger.info("[RECOVER TEST] end tearing down the test env");
+    try {
+      miniCluster.cleanUp();
+    } finally {
+      // Check for a recorded Ratis exit even when cleanUp() throws.
+      TestUtils.assertNoUnexpectedRatisExit();
+      logger.info("[RECOVER TEST] end tearing down the test env");
+    }
   }
 
   /* mimics the situation before this patch */
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index 69725d68be6e..5cac2173396a 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -40,6 +40,7 @@
 import org.apache.iotdb.consensus.ratis.utils.Utils;
 
 import org.apache.ratis.thirdparty.com.google.common.base.Preconditions;
+import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.Timestamp;
@@ -55,9 +56,11 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Scanner;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -70,6 +73,9 @@
 
 public class TestUtils {
   private static final Logger logger = LoggerFactory.getLogger(TestUtils.class);
+  private static final TimeDuration PORT_RELEASE_WAIT = TimeDuration.valueOf(10, TimeUnit.SECONDS);
+  private static final TimeDuration PORT_RELEASE_POLL =
+      TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
 
   public static class TestDataSet implements DataSet {
 
@@ -230,6 +236,7 @@ static class MiniCluster {
     private final RatisConfig config;
     private final List servers;
     private final ConsensusGroup group;
+    private final List<Integer> peerPorts;
     private Supplier smProvider;
     private final AtomicBoolean isStopped = new AtomicBoolean(false);
 
@@ -250,9 +257,10 @@ private MiniCluster(
       this.peerStorage = new ArrayList<>();
       this.stateMachines = new ArrayList<>();
       this.servers = new ArrayList<>();
+      this.peerPorts = randomDistinctPorts(replicas);
 
       for (int i = 0; i < replicas; i++) {
-        peers.add(new Peer(gid, i, new TEndPoint("127.0.0.1", randomFreePort())));
+        peers.add(new Peer(gid, i, new TEndPoint("127.0.0.1", peerPorts.get(i))));
 
         final File storage = storageProvider.apply(i);
         FileUtils.deleteFileQuietly(storage);
@@ -300,6 +308,7 @@ void stop() throws IOException {
       for (RatisConsensus server : servers) {
         server.stop();
       }
+      waitUntilPortsReleased(peerPorts);
       isStopped.set(true);
     }
 
@@ -495,6 +504,99 @@ MiniCluster create() {
     }
   }
 
+  /**
+   * Calls {@code ExitUtils.disableSystemExit()} and {@code ExitUtils.clear()} before a test runs.
+   * Presumably this makes Ratis record an exit request instead of terminating the test JVM.
+   */
+  static void prepareJvmForRatisTest() {
+    ExitUtils.disableSystemExit();
+    ExitUtils.clear();
+  }
+
+  /**
+   * Fails through {@code ExitUtils.assertNotTerminated()} if Ratis recorded an exit, then resets
+   * the state with {@code ExitUtils.clear()}. The reset runs even when the assertion fails so a
+   * failure in one test is not reported again by the next one.
+   */
+  static void assertNoUnexpectedRatisExit() {
+    try {
+      ExitUtils.assertNotTerminated();
+    } finally {
+      ExitUtils.clear();
+    }
+  }
+
+  /**
+   * Picks {@code count} distinct ports by repeatedly calling {@link #randomFreePort()}.
+   *
+   * @param count number of ports to pick
+   * @return the picked ports, in pick order and without duplicates
+   */
+  private static List<Integer> randomDistinctPorts(int count) {
+    final List<Integer> ports = new ArrayList<>(count);
+    final Set<Integer> uniquePorts = new HashSet<>();
+    while (ports.size() < count) {
+      final int port = randomFreePort();
+      // The same port may be handed out twice; keep only its first occurrence.
+      if (uniquePorts.add(port)) {
+        ports.add(port);
+      }
+    }
+    return ports;
+  }
+
+  /**
+   * Waits until every given port can be bound locally, polling every {@code PORT_RELEASE_POLL}.
+   *
+   * @param ports ports to probe
+   * @throws IOException if a port is still in use after {@code PORT_RELEASE_WAIT}, or if the
+   *     thread is interrupted while waiting (the interrupt flag is restored first)
+   */
+  private static void waitUntilPortsReleased(List<Integer> ports) throws IOException {
+    final Timestamp start = Timestamp.currentTime();
+    while (true) {
+      boolean allReleased = true;
+      for (int port : ports) {
+        if (!isLocalPortAvailable(port)) {
+          allReleased = false;
+          break;
+        }
+      }
+
+      if (allReleased) {
+        return;
+      }
+
+      if (start.elapsedTime().compareTo(PORT_RELEASE_WAIT) > 0) {
+        throw new IOException("Timed out waiting for Ratis test ports to be released: " + ports);
+      }
+
+      try {
+        PORT_RELEASE_POLL.sleep();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while waiting for Ratis test ports to be released", e);
+      }
+    }
+  }
+
+  /**
+   * Probes whether a server socket can currently be bound to {@code port} on this host.
+   *
+   * @param port port to probe
+   * @return {@code true} if the bind succeeded, {@code false} if it failed with an IOException
+   */
+  private static boolean isLocalPortAvailable(int port) {
+    try (ServerSocket socket = new ServerSocket()) {
+      // SO_REUSEADDR only has a defined effect when it is set before the socket is bound.
+      socket.setReuseAddress(true);
+      socket.bind(new java.net.InetSocketAddress(port));
+      return true;
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
   private static int randomFreePort() {
     try (ServerSocket socket = new ServerSocket(0)) {
       return socket.getLocalPort();