|
28 | 28 | import com.datastax.driver.core.LatencyTracker; |
29 | 29 | import com.datastax.driver.core.ScassandraTestBase; |
30 | 30 | import com.datastax.driver.core.Session; |
| 31 | +import com.datastax.driver.core.SimpleStatement; |
31 | 32 | import com.datastax.driver.core.Statement; |
32 | 33 | import com.datastax.driver.core.exceptions.NoHostAvailableException; |
33 | 34 | import com.datastax.driver.core.exceptions.ReadTimeoutException; |
34 | 35 | import com.datastax.driver.core.exceptions.UnavailableException; |
| 36 | +import com.google.common.collect.Lists; |
| 37 | +import java.util.Iterator; |
35 | 38 | import java.util.concurrent.CountDownLatch; |
36 | 39 | import org.testng.annotations.Test; |
37 | 40 |
|
@@ -178,4 +181,50 @@ public void should_consider_latency_when_read_timeout() throws Exception { |
178 | 181 | cluster.close(); |
179 | 182 | } |
180 | 183 | } |
| 184 | + |
| 185 | + @Test(groups = "short") |
| 186 | + public void should_not_reorder_query_plan_for_lwt_queries() throws Exception { |
| 187 | + // given |
| 188 | + String query = "SELECT foo FROM bar"; |
| 189 | + primingClient.prime(queryBuilder().withQuery(query).build()); |
| 190 | + |
| 191 | + LatencyAwarePolicy latencyAwarePolicy = |
| 192 | + LatencyAwarePolicy.builder(new RoundRobinPolicy()).withMininumMeasurements(1).build(); |
| 193 | + |
| 194 | + Cluster.Builder builder = super.createClusterBuilder(); |
| 195 | + builder.withLoadBalancingPolicy(latencyAwarePolicy); |
| 196 | + |
| 197 | + Cluster cluster = builder.build(); |
| 198 | + try { |
| 199 | + cluster.init(); |
| 200 | + |
| 201 | + // Create an LWT statement so latency-aware policy must preserve child ordering |
| 202 | + Statement lwtStatement = |
| 203 | + new SimpleStatement(query) { |
| 204 | + @Override |
| 205 | + public boolean isLWT() { |
| 206 | + return true; |
| 207 | + } |
| 208 | + }; |
| 209 | + |
| 210 | + // Make a request to populate latency metrics |
| 211 | + LatencyTrackerBarrier barrier = new LatencyTrackerBarrier(1); |
| 212 | + cluster.register(barrier); |
| 213 | + Session session = cluster.connect(); |
| 214 | + session.execute(query); |
| 215 | + barrier.await(); |
| 216 | + latencyAwarePolicy.new Updater().run(); |
| 217 | + |
| 218 | + // when |
| 219 | + Iterator<Host> plan1 = latencyAwarePolicy.newQueryPlan("ks", lwtStatement); |
| 220 | + Iterator<Host> plan2 = latencyAwarePolicy.newQueryPlan("ks", lwtStatement); |
| 221 | + |
| 222 | + // then |
| 223 | + Host host = retrieveSingleHost(cluster); |
| 224 | + assertThat(Lists.newArrayList(plan1)).containsExactly(host); |
| 225 | + assertThat(Lists.newArrayList(plan2)).containsExactly(host); |
| 226 | + } finally { |
| 227 | + cluster.close(); |
| 228 | + } |
| 229 | + } |
181 | 230 | } |
0 commit comments