Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@
<artifactId>tools</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.datastax.oss.driver.core.metadata;

import static org.awaitility.Awaitility.await;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
Expand Down Expand Up @@ -28,6 +30,7 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.Function;
import java.util.function.Supplier;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
Expand Down Expand Up @@ -340,16 +343,15 @@ public void should_receive_each_tablet_exactly_once() {
}

private static boolean waitSessionLearnedTabletInfo(CqlSession session) {
if (isSessionLearnedTabletInfo(session)) {
return true;
}
// Wait till tablet update, which is async, is completed
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
await()
.atMost(Duration.ofSeconds(5))
.pollInterval(Duration.ofMillis(50))
.until(() -> isSessionLearnedTabletInfo(session));
return true;
} catch (ConditionTimeoutException e) {
return false;
}
return isSessionLearnedTabletInfo(session);
}

private static boolean checkIfRoutedProperly(CqlSession session, Statement stmt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,12 @@ public void should_evict_down_node_metrics_when_timeout_fires() throws Exception
// trigger node1 UP -> DOWN
eventBus.fire(NodeStateEvent.changed(NodeState.UP, NodeState.DOWN, node1));

Thread.sleep(expireAfter.toMillis());

// then node-level metrics should be evicted from node1, but
// node2 and node3 metrics should not have been evicted
await().untilAsserted(() -> assertNodeMetricsEvicted(session, node1));
await()
.atMost(expireAfter.plusSeconds(5))
.pollInterval(Duration.ofMillis(100))
.untilAsserted(() -> assertNodeMetricsEvicted(session, node1));
assertNodeMetricsNotEvicted(session, node2);
assertNodeMetricsNotEvicted(session, node3);

Expand Down Expand Up @@ -219,19 +220,25 @@ public void should_not_evict_down_node_metrics_when_node_is_back_up_before_timeo
eventBus.fire(NodeStateEvent.changed(NodeState.UP, NodeState.FORCED_DOWN, node2));
eventBus.fire(NodeStateEvent.removed(node3));

Thread.sleep(500);
// Wait for half the expiry window before bringing nodes back up
await().pollDelay(Duration.ofMillis(500)).atMost(Duration.ofSeconds(5)).until(() -> true);

// trigger nodes DOWN -> UP, should cancel the timeouts
eventBus.fire(NodeStateEvent.changed(NodeState.DOWN, NodeState.UP, node1));
eventBus.fire(NodeStateEvent.changed(NodeState.FORCED_DOWN, NodeState.UP, node2));
eventBus.fire(NodeStateEvent.added(node3));

Thread.sleep(expireAfter.toMillis());

// then no node-level metrics should be evicted
assertNodeMetricsNotEvicted(session, node1);
assertNodeMetricsNotEvicted(session, node2);
assertNodeMetricsNotEvicted(session, node3);
// Wait for the full expiry duration and verify metrics are never evicted
await()
.during(expireAfter)
.atMost(expireAfter.plusSeconds(5))
.pollInterval(Duration.ofMillis(200))
.untilAsserted(
() -> {
assertNodeMetricsNotEvicted(session, node1);
assertNodeMetricsNotEvicted(session, node2);
assertNodeMetricsNotEvicted(session, node3);
});

} finally {
AbstractMetricUpdater.MIN_EXPIRE_AFTER = Duration.ofMinutes(5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,11 @@
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge;
import com.datastax.oss.driver.categories.IsolatedTests;
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -231,30 +228,7 @@ public void cannot_reconnect_with_resolved_socket() {
ccmBridge.create();
ccmBridge.start();
session = builder.build();
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
while (System.currentTimeMillis() < endTime) {
try {
nodes = session.getMetadata().getNodes().values();
int upNodes = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
upNodes++;
}
}
if (upNodes == 3) {
break;
}
// session.refreshSchema();
SimpleStatement statement =
new SimpleStatementBuilder("select * from system.local where key='local'")
.setTimeout(Duration.ofSeconds(3))
.build();
session.executeAsync(statement);
Thread.sleep(3000);
} catch (InterruptedException e) {
break;
}
}
waitForAllNodesUp(session, 3);
ResultSet rs = session.execute("select * from system.local where key='local'");
assertThat(rs).isNotNull();
Row row = rs.one();
Expand Down Expand Up @@ -291,29 +265,7 @@ public void cannot_reconnect_with_resolved_socket() {
"test.cluster.fake", ccmBridge.getNodeIpAddress(3));
ccmBridge.create();
ccmBridge.start();
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
while (System.currentTimeMillis() < endTime) {
try {
nodes = session.getMetadata().getNodes().values();
int upNodes = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
upNodes++;
}
}
if (upNodes == 3) {
break;
}
SimpleStatement statement =
new SimpleStatementBuilder("select * from system.local where key='local'")
.setTimeout(Duration.ofSeconds(3))
.build();
session.executeAsync(statement);
Thread.sleep(3000);
} catch (InterruptedException e) {
break;
}
}
waitForAllNodesUp(session, 3);
nodes = session.getMetadata().getNodes().values();
assertThat(nodes).hasSize(3);
Iterator<Node> iterator = nodes.iterator();
Expand Down Expand Up @@ -346,30 +298,7 @@ public void cannot_reconnect_with_resolved_socket() {
// Now the driver should fail to reconnect since unresolved hostname is gone.
ccmBridge.create();
ccmBridge.start();
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
while (System.currentTimeMillis() < endTime) {
try {
nodes = session.getMetadata().getNodes().values();
int upNodes = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
upNodes++;
}
}
if (upNodes == 3) {
break;
}
// session.refreshSchema();
SimpleStatement statement =
new SimpleStatementBuilder("select * from system.local where key='local'")
.setTimeout(Duration.ofSeconds(3))
.build();
session.executeAsync(statement);
Thread.sleep(3000);
} catch (InterruptedException e) {
break;
}
}
waitForAllNodesUp(session, 3);
session.execute("select * from system.local where key='local'");
}
session.close();
Expand Down
Loading