diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index c37ab838f88..cf08e744b23 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -223,6 +223,11 @@ tools test + + org.awaitility + awaitility + test + diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java index 9146d379980..9a01a8c5164 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java @@ -1,5 +1,7 @@ package com.datastax.oss.driver.core.metadata; +import static org.awaitility.Awaitility.await; + import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; @@ -28,6 +30,7 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.function.Function; import java.util.function.Supplier; +import org.awaitility.core.ConditionTimeoutException; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -340,16 +343,15 @@ public void should_receive_each_tablet_exactly_once() { } private static boolean waitSessionLearnedTabletInfo(CqlSession session) { - if (isSessionLearnedTabletInfo(session)) { - return true; - } - // Wait till tablet update, which is async, is completed try { - Thread.sleep(200); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + await() + .atMost(Duration.ofSeconds(5)) + .pollInterval(Duration.ofMillis(50)) + .until(() -> isSessionLearnedTabletInfo(session)); + return true; + } catch (ConditionTimeoutException e) { + return false; } - return isSessionLearnedTabletInfo(session); } private static boolean checkIfRoutedProperly(CqlSession session, Statement stmt) { diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java index e6121217619..ac9c2673875 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java @@ -174,11 +174,12 @@ public void should_evict_down_node_metrics_when_timeout_fires() throws Exception // trigger node1 UP -> DOWN eventBus.fire(NodeStateEvent.changed(NodeState.UP, NodeState.DOWN, node1)); - Thread.sleep(expireAfter.toMillis()); - // then node-level metrics should be evicted from node1, but // node2 and node3 metrics should not have been evicted - await().untilAsserted(() -> assertNodeMetricsEvicted(session, node1)); + await() + .atMost(expireAfter.plusSeconds(5)) + .pollInterval(Duration.ofMillis(100)) + .untilAsserted(() -> assertNodeMetricsEvicted(session, node1)); assertNodeMetricsNotEvicted(session, node2); assertNodeMetricsNotEvicted(session, node3); @@ -219,19 +220,25 @@ public void should_not_evict_down_node_metrics_when_node_is_back_up_before_timeo eventBus.fire(NodeStateEvent.changed(NodeState.UP, NodeState.FORCED_DOWN, node2)); eventBus.fire(NodeStateEvent.removed(node3)); - Thread.sleep(500); + // Wait for half the expiry window before bringing nodes back up + await().pollDelay(Duration.ofMillis(500)).atMost(Duration.ofSeconds(5)).until(() -> true); // trigger nodes DOWN -> UP, should cancel the timeouts eventBus.fire(NodeStateEvent.changed(NodeState.DOWN, NodeState.UP, node1)); eventBus.fire(NodeStateEvent.changed(NodeState.FORCED_DOWN, NodeState.UP, node2)); eventBus.fire(NodeStateEvent.added(node3)); - Thread.sleep(expireAfter.toMillis()); - - // then no node-level metrics should be evicted - assertNodeMetricsNotEvicted(session, node1); - assertNodeMetricsNotEvicted(session, node2); - assertNodeMetricsNotEvicted(session, node3); + // Wait for the full expiry duration and verify metrics are never evicted + await() + .during(expireAfter) + .atMost(expireAfter.plusSeconds(5)) + .pollInterval(Duration.ofMillis(200)) + .untilAsserted( + () -> { + assertNodeMetricsNotEvicted(session, node1); + assertNodeMetricsNotEvicted(session, node2); + assertNodeMetricsNotEvicted(session, node3); + }); } finally { AbstractMetricUpdater.MIN_EXPIRE_AFTER = Duration.ofMinutes(5); diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java index 0498462ce1c..4e9eefebf63 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java @@ -34,14 +34,11 @@ import com.datastax.oss.driver.api.core.config.TypedDriverOption; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; -import com.datastax.oss.driver.api.core.cql.SimpleStatement; -import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge; import com.datastax.oss.driver.categories.IsolatedTests; import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder; import java.net.InetSocketAddress; -import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -231,30 +228,7 @@ public void cannot_reconnect_with_resolved_socket() { ccmBridge.create(); ccmBridge.start(); session = builder.build(); - long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000; - while (System.currentTimeMillis() < endTime) { - try { - nodes = session.getMetadata().getNodes().values(); - int upNodes = 0; - for (Node node : nodes) { - if (node.getUpSinceMillis() > 0) { - upNodes++; - } - } - if (upNodes == 3) { - break; - } - // session.refreshSchema(); - SimpleStatement statement = - new SimpleStatementBuilder("select * from system.local where key='local'") - .setTimeout(Duration.ofSeconds(3)) - .build(); - session.executeAsync(statement); - Thread.sleep(3000); - } catch (InterruptedException e) { - break; - } - } + waitForAllNodesUp(session, 3); ResultSet rs = session.execute("select * from system.local where key='local'"); assertThat(rs).isNotNull(); Row row = rs.one(); @@ -291,29 +265,7 @@ public void cannot_reconnect_with_resolved_socket() { "test.cluster.fake", ccmBridge.getNodeIpAddress(3)); ccmBridge.create(); ccmBridge.start(); - long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000; - while (System.currentTimeMillis() < endTime) { - try { - nodes = session.getMetadata().getNodes().values(); - int upNodes = 0; - for (Node node : nodes) { - if (node.getUpSinceMillis() > 0) { - upNodes++; - } - } - if (upNodes == 3) { - break; - } - SimpleStatement statement = - new SimpleStatementBuilder("select * from system.local where key='local'") - .setTimeout(Duration.ofSeconds(3)) - .build(); - session.executeAsync(statement); - Thread.sleep(3000); - } catch (InterruptedException e) { - break; - } - } + waitForAllNodesUp(session, 3); nodes = session.getMetadata().getNodes().values(); assertThat(nodes).hasSize(3); Iterator iterator = nodes.iterator(); @@ -346,30 +298,7 @@ public void cannot_reconnect_with_resolved_socket() { // Now the driver should fail to reconnect since unresolved hostname is gone. ccmBridge.create(); ccmBridge.start(); - long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000; - while (System.currentTimeMillis() < endTime) { - try { - nodes = session.getMetadata().getNodes().values(); - int upNodes = 0; - for (Node node : nodes) { - if (node.getUpSinceMillis() > 0) { - upNodes++; - } - } - if (upNodes == 3) { - break; - } - // session.refreshSchema(); - SimpleStatement statement = - new SimpleStatementBuilder("select * from system.local where key='local'") - .setTimeout(Duration.ofSeconds(3)) - .build(); - session.executeAsync(statement); - Thread.sleep(3000); - } catch (InterruptedException e) { - break; - } - } + waitForAllNodesUp(session, 3); session.execute("select * from system.local where key='local'"); } session.close();