diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index c37ab838f88..cf08e744b23 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -223,6 +223,11 @@
tools
test
+
+ org.awaitility
+ awaitility
+ test
+
diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java
index 9146d379980..9a01a8c5164 100644
--- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java
+++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java
@@ -1,5 +1,7 @@
package com.datastax.oss.driver.core.metadata;
+import static org.awaitility.Awaitility.await;
+
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
@@ -28,6 +30,7 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.Function;
import java.util.function.Supplier;
+import org.awaitility.core.ConditionTimeoutException;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -340,16 +343,15 @@ public void should_receive_each_tablet_exactly_once() {
}
private static boolean waitSessionLearnedTabletInfo(CqlSession session) {
- if (isSessionLearnedTabletInfo(session)) {
- return true;
- }
- // Wait till tablet update, which is async, is completed
try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ await()
+ .atMost(Duration.ofSeconds(5))
+ .pollInterval(Duration.ofMillis(50))
+ .until(() -> isSessionLearnedTabletInfo(session));
+ return true;
+ } catch (ConditionTimeoutException e) {
+ return false;
}
- return isSessionLearnedTabletInfo(session);
}
private static boolean checkIfRoutedProperly(CqlSession session, Statement stmt) {
diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java
index e6121217619..ac9c2673875 100644
--- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java
+++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java
@@ -174,11 +174,12 @@ public void should_evict_down_node_metrics_when_timeout_fires() throws Exception
// trigger node1 UP -> DOWN
eventBus.fire(NodeStateEvent.changed(NodeState.UP, NodeState.DOWN, node1));
- Thread.sleep(expireAfter.toMillis());
-
// then node-level metrics should be evicted from node1, but
// node2 and node3 metrics should not have been evicted
- await().untilAsserted(() -> assertNodeMetricsEvicted(session, node1));
+ await()
+ .atMost(expireAfter.plusSeconds(5))
+ .pollInterval(Duration.ofMillis(100))
+ .untilAsserted(() -> assertNodeMetricsEvicted(session, node1));
assertNodeMetricsNotEvicted(session, node2);
assertNodeMetricsNotEvicted(session, node3);
@@ -219,19 +220,25 @@ public void should_not_evict_down_node_metrics_when_node_is_back_up_before_timeo
eventBus.fire(NodeStateEvent.changed(NodeState.UP, NodeState.FORCED_DOWN, node2));
eventBus.fire(NodeStateEvent.removed(node3));
- Thread.sleep(500);
+ // Wait for half the expiry window before bringing nodes back up
+ await().pollDelay(Duration.ofMillis(500)).atMost(Duration.ofSeconds(5)).until(() -> true);
// trigger nodes DOWN -> UP, should cancel the timeouts
eventBus.fire(NodeStateEvent.changed(NodeState.DOWN, NodeState.UP, node1));
eventBus.fire(NodeStateEvent.changed(NodeState.FORCED_DOWN, NodeState.UP, node2));
eventBus.fire(NodeStateEvent.added(node3));
- Thread.sleep(expireAfter.toMillis());
-
- // then no node-level metrics should be evicted
- assertNodeMetricsNotEvicted(session, node1);
- assertNodeMetricsNotEvicted(session, node2);
- assertNodeMetricsNotEvicted(session, node3);
+ // Wait for the full expiry duration and verify metrics are never evicted
+ await()
+ .during(expireAfter)
+ .atMost(expireAfter.plusSeconds(5))
+ .pollInterval(Duration.ofMillis(200))
+ .untilAsserted(
+ () -> {
+ assertNodeMetricsNotEvicted(session, node1);
+ assertNodeMetricsNotEvicted(session, node2);
+ assertNodeMetricsNotEvicted(session, node3);
+ });
} finally {
AbstractMetricUpdater.MIN_EXPIRE_AFTER = Duration.ofMinutes(5);
diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java
index 0498462ce1c..4e9eefebf63 100644
--- a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java
+++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java
@@ -34,14 +34,11 @@
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
-import com.datastax.oss.driver.api.core.cql.SimpleStatement;
-import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge;
import com.datastax.oss.driver.categories.IsolatedTests;
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder;
import java.net.InetSocketAddress;
-import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -231,30 +228,7 @@ public void cannot_reconnect_with_resolved_socket() {
ccmBridge.create();
ccmBridge.start();
session = builder.build();
- long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
- while (System.currentTimeMillis() < endTime) {
- try {
- nodes = session.getMetadata().getNodes().values();
- int upNodes = 0;
- for (Node node : nodes) {
- if (node.getUpSinceMillis() > 0) {
- upNodes++;
- }
- }
- if (upNodes == 3) {
- break;
- }
- // session.refreshSchema();
- SimpleStatement statement =
- new SimpleStatementBuilder("select * from system.local where key='local'")
- .setTimeout(Duration.ofSeconds(3))
- .build();
- session.executeAsync(statement);
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- break;
- }
- }
+ waitForAllNodesUp(session, 3);
ResultSet rs = session.execute("select * from system.local where key='local'");
assertThat(rs).isNotNull();
Row row = rs.one();
@@ -291,29 +265,7 @@ public void cannot_reconnect_with_resolved_socket() {
"test.cluster.fake", ccmBridge.getNodeIpAddress(3));
ccmBridge.create();
ccmBridge.start();
- long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
- while (System.currentTimeMillis() < endTime) {
- try {
- nodes = session.getMetadata().getNodes().values();
- int upNodes = 0;
- for (Node node : nodes) {
- if (node.getUpSinceMillis() > 0) {
- upNodes++;
- }
- }
- if (upNodes == 3) {
- break;
- }
- SimpleStatement statement =
- new SimpleStatementBuilder("select * from system.local where key='local'")
- .setTimeout(Duration.ofSeconds(3))
- .build();
- session.executeAsync(statement);
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- break;
- }
- }
+ waitForAllNodesUp(session, 3);
nodes = session.getMetadata().getNodes().values();
assertThat(nodes).hasSize(3);
Iterator iterator = nodes.iterator();
@@ -346,30 +298,7 @@ public void cannot_reconnect_with_resolved_socket() {
// Now the driver should fail to reconnect since unresolved hostname is gone.
ccmBridge.create();
ccmBridge.start();
- long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
- while (System.currentTimeMillis() < endTime) {
- try {
- nodes = session.getMetadata().getNodes().values();
- int upNodes = 0;
- for (Node node : nodes) {
- if (node.getUpSinceMillis() > 0) {
- upNodes++;
- }
- }
- if (upNodes == 3) {
- break;
- }
- // session.refreshSchema();
- SimpleStatement statement =
- new SimpleStatementBuilder("select * from system.local where key='local'")
- .setTimeout(Duration.ofSeconds(3))
- .build();
- session.executeAsync(statement);
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- break;
- }
- }
+ waitForAllNodesUp(session, 3);
session.execute("select * from system.local where key='local'");
}
session.close();