Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,19 @@ The Scylla Java Driver is a fork from [DataStax Java Driver](https://github.com/
**Features:**

* Like all Scylla Drivers, the Scylla Java Driver is **Shard Aware** and contains extensions for a `tokenAwareHostPolicy`.
Using this policy, the driver can select a connection to a particular shard based on the shards token.
Using this policy, the driver can select a connection to a particular shard based on the shard's token.
As a result, latency is significantly reduced because there is no need to pass data between the shards.
* **Lightweight Transaction (LWT) Optimization**:
- When using `TokenAwarePolicy` with prepared statements, LWT queries automatically use replica-only routing,
prioritizing local datacenter replicas to minimize coordinator forwarding overhead and reduce contention during
Paxos consensus phases.
- When using `RackAwareRoundRobinPolicy`, LWT queries skip local rack prioritization and distribute evenly across
all hosts in the local datacenter. This avoids creating rack-level hotspots during Paxos consensus, which can
lead to increased contention and reduced throughput. The local datacenter is still prioritized over remote
datacenters to maintain low latency.
- When using `LatencyAwarePolicy`, LWT queries bypass latency-based reordering to preserve deterministic replica
selection. This ensures that LWT routing assumptions (such as consistent coordinator selection for optimal Paxos
performance) are maintained throughout the policy chain.
* [Sync](manual/) and [Async](manual/async/) API
* [Simple](manual/statements/simple/), [Prepared](manual/statements/prepared/), and [Batch](manual/statements/batch/)
statements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public class QueryOptions {

public static final int DEFAULT_REFRESH_SCHEMA_INTERVAL_MILLIS = 1000;

public static final RequestRoutingMethod DEFAULT_LOAD_BALANCING_LWT_REQUEST_ROUTING_METHOD =
RequestRoutingMethod.PRESERVE_REPLICA_ORDER;

private volatile ConsistencyLevel consistency = DEFAULT_CONSISTENCY_LEVEL;
private volatile ConsistencyLevel serialConsistency = DEFAULT_SERIAL_CONSISTENCY_LEVEL;
private volatile int fetchSize = DEFAULT_FETCH_SIZE;
Expand Down Expand Up @@ -79,6 +82,9 @@ public class QueryOptions {
private volatile boolean addOriginalContactsToReconnectionPlan = false;
private volatile boolean considerZeroTokenNodesValidPeers = false;

private volatile RequestRoutingMethod loadBalancingLwtRequestRoutingMethod =
DEFAULT_LOAD_BALANCING_LWT_REQUEST_ROUTING_METHOD;

/**
* Creates a new {@link QueryOptions} instance using the {@link #DEFAULT_CONSISTENCY_LEVEL},
* {@link #DEFAULT_SERIAL_CONSISTENCY_LEVEL} and {@link #DEFAULT_FETCH_SIZE}.
Expand Down Expand Up @@ -221,7 +227,7 @@ public QueryOptions setSkipCQL4MetadataResolveMethod(CQL4SkipMetadataResolveMeth
/**
* Skip metadata resolve method .
*
* <p>It defaults to {@link #skipCQL4MetadataResolveMethod.SMART}.
* <p>It defaults to {@link CQL4SkipMetadataResolveMethod#SMART}.
*
* @return the default idempotence for queries.
*/
Expand Down Expand Up @@ -574,6 +580,28 @@ public boolean shouldConsiderZeroTokenNodesValidPeers() {
return this.considerZeroTokenNodesValidPeers;
}

/**
* Sets the default request routing method to use for LWT queries. Default is {@link
* RequestRoutingMethod#PRESERVE_REPLICA_ORDER}.
*
* @param loadBalancingLwtRequestRoutingMethod the new request routing method.
* @return this {@code QueryOptions} instance.
*/
public QueryOptions setLoadBalancingLwtRequestRoutingMethod(
RequestRoutingMethod loadBalancingLwtRequestRoutingMethod) {
this.loadBalancingLwtRequestRoutingMethod = loadBalancingLwtRequestRoutingMethod;
return this;
}

/**
* The default request routing method used by LWT queries.
*
* @return the default request routing method used by LWT queries.
*/
public RequestRoutingMethod getLoadBalancingLwtRequestRoutingMethod() {
return loadBalancingLwtRequestRoutingMethod;
}

@Override
public boolean equals(Object that) {
if (that == null || !(that instanceof QueryOptions)) {
Expand All @@ -594,7 +622,9 @@ public boolean equals(Object that) {
&& this.refreshNodeIntervalMillis == other.refreshNodeIntervalMillis
&& this.refreshSchemaIntervalMillis == other.refreshSchemaIntervalMillis
&& this.reprepareOnUp == other.reprepareOnUp
&& this.prepareOnAllHosts == other.prepareOnAllHosts)
&& this.prepareOnAllHosts == other.prepareOnAllHosts
&& this.loadBalancingLwtRequestRoutingMethod
== other.loadBalancingLwtRequestRoutingMethod)
&& this.schemaQueriesPaged == other.schemaQueriesPaged;
}

Expand All @@ -614,6 +644,7 @@ public int hashCode() {
refreshSchemaIntervalMillis,
reprepareOnUp,
prepareOnAllHosts,
loadBalancingLwtRequestRoutingMethod,
schemaQueriesPaged);
}

Expand All @@ -626,4 +657,10 @@ public enum CQL4SkipMetadataResolveMethod {
DISABLED,
SMART
}

/** The request routing method for queries. */
public enum RequestRoutingMethod {
REGULAR,
PRESERVE_REPLICA_ORDER
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,42 +97,6 @@ class RequestHandler {
private final AtomicBoolean isDone = new AtomicBoolean();
private final AtomicInteger executionIndex = new AtomicInteger();

private Iterator<Host> getReplicas(
String loggedKeyspace, Statement statement, Iterator<Host> fallback) {
ProtocolVersion protocolVersion = manager.cluster.manager.protocolVersion();
CodecRegistry codecRegistry = manager.cluster.manager.configuration.getCodecRegistry();
ByteBuffer partitionKey = statement.getRoutingKey(protocolVersion, codecRegistry);
String keyspace = statement.getKeyspace();
if (keyspace == null) {
keyspace = loggedKeyspace;
}

if (partitionKey == null || keyspace == null) {
return fallback;
}

Token.Factory partitioner = statement.getPartitioner();
String tableName = null;
ColumnDefinitions defs = null;
if (statement instanceof BoundStatement) {
defs = ((BoundStatement) statement).preparedStatement().getVariables();
} else if (statement instanceof PreparedStatement) {
defs = ((PreparedStatement) statement).getVariables();
}
if (defs != null && defs.size() > 0) {
tableName = defs.getTable(0);
}

final List<Host> replicas =
manager
.cluster
.getMetadata()
.getReplicasList(Metadata.quote(keyspace), tableName, partitioner, partitionKey);

// replicas are stored in the right order starting with the primary replica
return replicas.iterator();
}

public RequestHandler(SessionManager manager, Callback callback, Statement statement) {
this.id = Long.toString(System.identityHashCode(this));
if (logger.isTraceEnabled()) logger.trace("[{}] {}", id, statement);
Expand All @@ -145,15 +109,6 @@ public RequestHandler(SessionManager manager, Callback callback, Statement state
// If host is explicitly set on statement, bypass load balancing policy.
if (statement.getHost() != null) {
this.queryPlan = new QueryPlan(Iterators.singletonIterator(statement.getHost()));
} else if (statement.isLWT()) {
this.queryPlan =
new QueryPlan(
getReplicas(
manager.poolsState.keyspace,
statement,
manager
.loadBalancingPolicy()
.newQueryPlan(manager.poolsState.keyspace, statement)));
} else {
this.queryPlan =
new QueryPlan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@
* they will only be tried if all other nodes failed). Note that this policy only penalizes slow
* nodes, it does <em>not</em> globally sort the query plan by latency.
*
* <p><strong>LWT statements:</strong> if {@link Statement#isLWT()} returns {@code true}, this
* policy does not apply latency-based reordering and returns the child policy's query plan as-is.
* This is to preserve LWT-specific routing assumptions (for example deterministic replica selection
* when using {@link TokenAwarePolicy}).
*
* <p>The latency score for a given node is a based on a form of <a
* href="http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average">exponential moving
* average</a>. In other words, the latency score of a node is the average of its previously
Expand Down Expand Up @@ -145,7 +150,7 @@ public void run() {
if (logger.isDebugEnabled()) {
/*
* For users to be able to know if the policy potentially needs tuning, we need to provide
* some feedback on on how things evolve. For that, we use the min computation to also check
* some feedback on how things evolve. For that, we use the min computation to also check
* which host will be excluded if a query is submitted now and if any host is, we log it (but
* we try to avoid flooding too). This is probably interesting information anyway since it
* gets an idea of which host perform badly.
Expand Down Expand Up @@ -253,6 +258,13 @@ public HostDistance distance(Host host) {
*/
@Override
public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
// For LWT queries, preserve the child policy's ordering.
// LWT routing can rely on deterministic replica ordering (e.g. by TokenAwarePolicy), and
// latency-based reordering can undermine those assumptions.
if (statement != null && statement.isLWT()) {
return childPolicy.newQueryPlan(loggedKeyspace, statement);
}

final Iterator<Host> childIter = childPolicy.newQueryPlan(loggedKeyspace, statement);
return new AbstractIterator<Host>() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
* but those are always tried after the local nodes. In other words, this policy guarantees that no
* host in a remote data center will be queried unless no host in the local data center can be
* reached.
*
* <p>For LWT (Lightweight Transaction) queries (where {@link Statement#isLWT()} returns {@code
* true}), the policy skips local rack prioritization and treats all hosts in the local datacenter
* equally, distributing queries in round-robin fashion across the entire local DC. Remote
* datacenters are still only used as fallback after all local DC hosts have been tried.
*/
public class RackAwareRoundRobinPolicy implements LoadBalancingPolicy {

Expand All @@ -73,11 +78,11 @@ public static Builder builder() {
private static final String UNSET = "";

private final ConcurrentMap<String, CopyOnWriteArrayList<Host>> perDcLiveHosts =
new ConcurrentHashMap<String, CopyOnWriteArrayList<Host>>();
private final CopyOnWriteArrayList<Host> liveHostsLocalRackLocalDC =
new CopyOnWriteArrayList<Host>();
new ConcurrentHashMap<>();
private final CopyOnWriteArrayList<Host> liveHostsAllLocalDC = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<Host> liveHostsLocalRackLocalDC = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<Host> liveHostsRemoteRacksLocalDC =
new CopyOnWriteArrayList<Host>();
new CopyOnWriteArrayList<>();
@VisibleForTesting final AtomicInteger index = new AtomicInteger();

@VisibleForTesting volatile String localDc;
Expand Down Expand Up @@ -147,6 +152,7 @@ public void init(Cluster cluster, Collection<Host> hosts) {
else prev.addIfAbsent(host);

if (dc.equals(localDc)) {
liveHostsAllLocalDC.add(host);
if (rack.equals(localRack)) {
liveHostsLocalRackLocalDC.add(host);
} else {
Expand Down Expand Up @@ -240,10 +246,17 @@ public HostDistance distance(Host host) {
@Override
public Iterator<Host> newQueryPlan(String loggedKeyspace, final Statement statement) {

CopyOnWriteArrayList<Host> localLiveHosts = perDcLiveHosts.get(localDc);
// Clone for thread safety
final List<Host> copyLiveHostsLocalRackLocalDC = cloneList(liveHostsLocalRackLocalDC);
final List<Host> copyLiveHostsRemoteRacksLocalDC = cloneList(liveHostsRemoteRacksLocalDC);
// For LWT queries, skip rack prioritization and use all local DC hosts equally
final boolean isLWT = statement != null && statement.isLWT();

// For LWT queries, include all local DC hosts in the first part of the plan, not just those in
// the local rack
final List<Host> copyLiveHostsLocalRackLocalDC =
isLWT ? cloneList(liveHostsAllLocalDC) : cloneList(liveHostsLocalRackLocalDC);
// For LWT queries, skip the second part of the plan that includes hosts in remote racks of the
// local DC
final List<Host> copyLiveHostsRemoteRacksLocalDC =
isLWT ? Collections.emptyList() : cloneList(liveHostsRemoteRacksLocalDC);
final int startIdx = index.getAndIncrement();

return new AbstractIterator<Host>() {
Expand Down Expand Up @@ -288,7 +301,7 @@ protected Host computeNext() {
}

ConsistencyLevel cl =
statement.getConsistencyLevel() == null
statement == null || statement.getConsistencyLevel() == null
? configuration.getQueryOptions().getConsistencyLevel()
: statement.getConsistencyLevel();

Expand Down Expand Up @@ -348,6 +361,7 @@ public void onUp(Host host) {
dcHosts.addIfAbsent(host);

if (dc.equals(localDc)) {
liveHostsAllLocalDC.addIfAbsent(host);
if (rack.equals(localRack)) {
liveHostsLocalRackLocalDC.add(host);
} else {
Expand All @@ -365,6 +379,7 @@ public void onDown(Host host) {
if (dcHosts != null) dcHosts.remove(host);

if (dc.equals(localDc)) {
liveHostsAllLocalDC.remove(host);
if (rack.equals(localRack)) {
liveHostsLocalRackLocalDC.remove(host);
} else {
Expand Down
Loading