Commit ab2574f

nikagra authored and dkropachev committed
feat: Preserve deterministic routing for LWT queries in LatencyAwarePolicy. ⏱️
1 parent 74606a9 commit ab2574f

3 files changed, 73 additions and 4 deletions

README.md

Lines changed: 11 additions & 3 deletions
@@ -19,9 +19,17 @@ The Scylla Java Driver is a fork from [DataStax Java Driver](https://github.com/
 * Like all Scylla Drivers, the Scylla Java Driver is **Shard Aware** and contains extensions for a `tokenAwareHostPolicy`.
   Using this policy, the driver can select a connection to a particular shard based on the shard's token.
   As a result, latency is significantly reduced because there is no need to pass data between the shards.
-* **Lightweight Transaction (LWT) Optimization**: when using `TokenAwarePolicy` with prepared statements,
-  LWT queries automatically use replica-only routing, prioritizing local datacenter replicas to minimize
-  coordinator forwarding overhead and reduce contention during Paxos consensus phases.
+* **Lightweight Transaction (LWT) Optimization**:
+  - When using `TokenAwarePolicy` with prepared statements, LWT queries automatically use replica-only routing,
+    prioritizing local datacenter replicas to minimize coordinator forwarding overhead and reduce contention during
+    Paxos consensus phases.
+  - When using `RackAwareRoundRobinPolicy`, LWT queries skip local rack prioritization and distribute evenly across
+    all hosts in the local datacenter. This avoids creating rack-level hotspots during Paxos consensus, which can
+    lead to increased contention and reduced throughput. The local datacenter is still prioritized over remote
+    datacenters to maintain low latency.
+  - When using `LatencyAwarePolicy`, LWT queries bypass latency-based reordering to preserve deterministic replica
+    selection. This ensures that LWT routing assumptions (such as consistent coordinator selection for optimal Paxos
+    performance) are maintained throughout the policy chain.
 * [Sync](manual/) and [Async](manual/async/) API
 * [Simple](manual/statements/simple/), [Prepared](manual/statements/prepared/), and [Batch](manual/statements/batch/)
   statements

driver-core/src/main/java/com/datastax/driver/core/policies/LatencyAwarePolicy.java

Lines changed: 13 additions & 1 deletion
@@ -62,6 +62,11 @@
  * they will only be tried if all other nodes failed). Note that this policy only penalizes slow
  * nodes, it does <em>not</em> globally sort the query plan by latency.
  *
+ * <p><strong>LWT statements:</strong> if {@link Statement#isLWT()} returns {@code true}, this
+ * policy does not apply latency-based reordering and returns the child policy's query plan as-is.
+ * This is to preserve LWT-specific routing assumptions (for example deterministic replica selection
+ * when using {@link TokenAwarePolicy}).
+ *
  * <p>The latency score for a given node is a based on a form of <a
  * href="http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average">exponential moving
  * average</a>. In other words, the latency score of a node is the average of its previously
@@ -145,7 +150,7 @@ public void run() {
       if (logger.isDebugEnabled()) {
         /*
          * For users to be able to know if the policy potentially needs tuning, we need to provide
-         * some feedback on on how things evolve. For that, we use the min computation to also check
+         * some feedback on how things evolve. For that, we use the min computation to also check
          * which host will be excluded if a query is submitted now and if any host is, we log it (but
          * we try to avoid flooding too). This is probably interesting information anyway since it
          * gets an idea of which host perform badly.
@@ -253,6 +258,13 @@ public HostDistance distance(Host host) {
    */
   @Override
   public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
+    // For LWT queries, preserve the child policy's ordering.
+    // LWT routing can rely on deterministic replica ordering (e.g. by TokenAwarePolicy), and
+    // latency-based reordering can undermine those assumptions.
+    if (statement != null && statement.isLWT()) {
+      return childPolicy.newQueryPlan(loggedKeyspace, statement);
+    }
+
     final Iterator<Host> childIter = childPolicy.newQueryPlan(loggedKeyspace, statement);
     return new AbstractIterator<Host>() {
 
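
Review note: the effect of the early return in `newQueryPlan` can be illustrated without the driver on the classpath. The sketch below is a simplified, hypothetical model and not the driver's code: `LwtBypassSketch`, `childPlan`, `latencyAwarePlan`, the host names, and the exact penalization rule (a host is pushed to the end when its latency exceeds the threshold times the fastest latency) are all assumptions made for illustration. It only mirrors what the Javadoc above states, namely that slow nodes are penalized rather than the plan being globally sorted, and that LWT statements get the child plan unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical, driver-free model of the LWT bypass. Not the driver's implementation.
public class LwtBypassSketch {

  /** Stand-in for a deterministic child policy: always yields hosts in the same order. */
  static List<String> childPlan() {
    return Arrays.asList("host-a", "host-b", "host-c");
  }

  /**
   * Stand-in for the latency-aware wrapper. For non-LWT statements, hosts slower than
   * exclusionThreshold * (fastest known latency) are moved to the end of the plan; the
   * remaining hosts keep the child's order. For LWT statements the child plan is returned as-is.
   */
  static List<String> latencyAwarePlan(
      boolean isLwt, Map<String, Long> latenciesMs, double exclusionThreshold) {
    List<String> child = childPlan();
    if (isLwt) {
      // Mirrors the early return in the diff: no latency-based reordering for LWT.
      return child;
    }

    // Find the fastest known latency among the child's hosts.
    long min = Long.MAX_VALUE;
    for (String host : child) {
      Long latency = latenciesMs.get(host);
      if (latency != null) {
        min = Math.min(min, latency);
      }
    }

    // Penalize slow hosts by deferring them; hosts without measurements are never penalized.
    List<String> fast = new ArrayList<>();
    List<String> slow = new ArrayList<>();
    for (String host : child) {
      Long latency = latenciesMs.get(host);
      boolean penalized = latency != null && latency > exclusionThreshold * min;
      if (penalized) {
        slow.add(host);
      } else {
        fast.add(host);
      }
    }
    fast.addAll(slow);
    return fast;
  }
}
```

With assumed latencies of 100 ms for `host-a`, 10 ms for `host-b`, and 12 ms for `host-c` and a threshold of 2.0, the non-LWT plan in this model defers `host-a` to the end, while the LWT plan keeps the child's `host-a`, `host-b`, `host-c` order. That ordering stability is the property the commit is meant to preserve.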

driver-core/src/test/java/com/datastax/driver/core/policies/LatencyAwarePolicyTest.java

Lines changed: 49 additions & 0 deletions
@@ -28,10 +28,13 @@
 import com.datastax.driver.core.LatencyTracker;
 import com.datastax.driver.core.ScassandraTestBase;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
 import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.core.exceptions.ReadTimeoutException;
 import com.datastax.driver.core.exceptions.UnavailableException;
+import com.google.common.collect.Lists;
+import java.util.Iterator;
 import java.util.concurrent.CountDownLatch;
 import org.testng.annotations.Test;
 
@@ -178,4 +181,50 @@ public void should_consider_latency_when_read_timeout() throws Exception {
       cluster.close();
     }
   }
+
+  @Test(groups = "short")
+  public void should_not_reorder_query_plan_for_lwt_queries() throws Exception {
+    // given
+    String query = "SELECT foo FROM bar";
+    primingClient.prime(queryBuilder().withQuery(query).build());
+
+    LatencyAwarePolicy latencyAwarePolicy =
+        LatencyAwarePolicy.builder(new RoundRobinPolicy()).withMininumMeasurements(1).build();
+
+    Cluster.Builder builder = super.createClusterBuilder();
+    builder.withLoadBalancingPolicy(latencyAwarePolicy);
+
+    Cluster cluster = builder.build();
+    try {
+      cluster.init();
+
+      // Create an LWT statement so latency-aware policy must preserve child ordering
+      Statement lwtStatement =
+          new SimpleStatement(query) {
+            @Override
+            public boolean isLWT() {
+              return true;
+            }
+          };
+
+      // Make a request to populate latency metrics
+      LatencyTrackerBarrier barrier = new LatencyTrackerBarrier(1);
+      cluster.register(barrier);
+      Session session = cluster.connect();
+      session.execute(query);
+      barrier.await();
+      latencyAwarePolicy.new Updater().run();
+
+      // when
+      Iterator<Host> plan1 = latencyAwarePolicy.newQueryPlan("ks", lwtStatement);
+      Iterator<Host> plan2 = latencyAwarePolicy.newQueryPlan("ks", lwtStatement);
+
+      // then
+      Host host = retrieveSingleHost(cluster);
+      assertThat(Lists.newArrayList(plan1)).containsExactly(host);
+      assertThat(Lists.newArrayList(plan2)).containsExactly(host);
+    } finally {
+      cluster.close();
+    }
+  }
 }
