Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public boolean takeSnapshot(File snapshotDir) {
@Before
public void setUp() throws Exception {
logger.info("[RECOVER TEST] start setting up the test env");
TestUtils.prepareJvmForRatisTest();
final TestUtils.MiniClusterFactory factory = new TestUtils.MiniClusterFactory();
miniCluster =
factory
Expand Down Expand Up @@ -117,8 +118,12 @@ public void setUp() throws Exception {
@After
public void tearUp() throws Exception {
logger.info("[RECOVER TEST] start tearing down the test env");
miniCluster.cleanUp();
logger.info("[RECOVER TEST] end tearing down the test env");
try {
miniCluster.cleanUp();
} finally {
TestUtils.assertNoUnexpectedRatisExit();
logger.info("[RECOVER TEST] end tearing down the test env");
}
}

/* mimics the situation before this patch */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.iotdb.consensus.ratis.utils.Utils;

import org.apache.ratis.thirdparty.com.google.common.base.Preconditions;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
Expand All @@ -55,9 +56,11 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand All @@ -70,6 +73,9 @@
public class TestUtils {

private static final Logger logger = LoggerFactory.getLogger(TestUtils.class);
private static final TimeDuration PORT_RELEASE_WAIT = TimeDuration.valueOf(10, TimeUnit.SECONDS);
private static final TimeDuration PORT_RELEASE_POLL =
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);

public static class TestDataSet implements DataSet {

Expand Down Expand Up @@ -230,6 +236,7 @@ static class MiniCluster {
private final RatisConfig config;
private final List<RatisConsensus> servers;
private final ConsensusGroup group;
private final List<Integer> peerPorts;
private Supplier<IStateMachine> smProvider;
private final AtomicBoolean isStopped = new AtomicBoolean(false);

Expand All @@ -250,9 +257,10 @@ private MiniCluster(
this.peerStorage = new ArrayList<>();
this.stateMachines = new ArrayList<>();
this.servers = new ArrayList<>();
this.peerPorts = randomDistinctPorts(replicas);

for (int i = 0; i < replicas; i++) {
peers.add(new Peer(gid, i, new TEndPoint("127.0.0.1", randomFreePort())));
peers.add(new Peer(gid, i, new TEndPoint("127.0.0.1", peerPorts.get(i))));

final File storage = storageProvider.apply(i);
FileUtils.deleteFileQuietly(storage);
Expand Down Expand Up @@ -300,6 +308,7 @@ void stop() throws IOException {
for (RatisConsensus server : servers) {
server.stop();
}
waitUntilPortsReleased(peerPorts);
isStopped.set(true);
}

Expand Down Expand Up @@ -495,6 +504,65 @@ MiniCluster create() {
}
}

static void prepareJvmForRatisTest() {
ExitUtils.disableSystemExit();
ExitUtils.clear();
}

static void assertNoUnexpectedRatisExit() {
ExitUtils.assertNotTerminated();
ExitUtils.clear();
}

private static List<Integer> randomDistinctPorts(int count) {
final List<Integer> ports = new ArrayList<>(count);
final Set<Integer> uniquePorts = new HashSet<>();
while (ports.size() < count) {
final int port = randomFreePort();
if (uniquePorts.add(port)) {
ports.add(port);
}
}
return ports;
}

private static void waitUntilPortsReleased(List<Integer> ports) throws IOException {
final Timestamp start = Timestamp.currentTime();
while (true) {
boolean allReleased = true;
for (int port : ports) {
if (!isLocalPortAvailable(port)) {
allReleased = false;
break;
}
}

if (allReleased) {
return;
}

if (start.elapsedTime().compareTo(PORT_RELEASE_WAIT) > 0) {
throw new IOException("Timed out waiting for Ratis test ports to be released: " + ports);
}

try {
PORT_RELEASE_POLL.sleep();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting for Ratis test ports to be released", e);
}
}
}

private static boolean isLocalPortAvailable(int port) {
try (ServerSocket socket = new ServerSocket(port)) {
socket.setReuseAddress(true);
return true;
} catch (IOException e) {
return false;
}
}

private static int randomFreePort() {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
Expand Down
Loading