Skip to content

Commit 74606a9

Browse files
nikagradkropachev
authored and committed
feat: Implement LWT routing optimization in RackAwareRoundRobinPolicy. 🖥️
1 parent cb02a1f commit 74606a9

File tree

3 files changed

+138
-9
lines changed

3 files changed

+138
-9
lines changed

driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@
5656
* but those are always tried after the local nodes. In other words, this policy guarantees that no
5757
* host in a remote data center will be queried unless no host in the local data center can be
5858
* reached.
59+
*
60+
* <p>For LWT (Lightweight Transaction) queries (where {@link Statement#isLWT()} returns {@code
61+
* true}), the policy skips local rack prioritization and treats all hosts in the local datacenter
62+
* equally, distributing queries in round-robin fashion across the entire local DC. Remote
63+
* datacenters are still only used as fallback after all local DC hosts have been tried.
5964
*/
6065
public class RackAwareRoundRobinPolicy implements LoadBalancingPolicy {
6166

@@ -73,11 +78,11 @@ public static Builder builder() {
7378
private static final String UNSET = "";
7479

7580
private final ConcurrentMap<String, CopyOnWriteArrayList<Host>> perDcLiveHosts =
76-
new ConcurrentHashMap<String, CopyOnWriteArrayList<Host>>();
77-
private final CopyOnWriteArrayList<Host> liveHostsLocalRackLocalDC =
78-
new CopyOnWriteArrayList<Host>();
81+
new ConcurrentHashMap<>();
82+
private final CopyOnWriteArrayList<Host> liveHostsAllLocalDC = new CopyOnWriteArrayList<>();
83+
private final CopyOnWriteArrayList<Host> liveHostsLocalRackLocalDC = new CopyOnWriteArrayList<>();
7984
private final CopyOnWriteArrayList<Host> liveHostsRemoteRacksLocalDC =
80-
new CopyOnWriteArrayList<Host>();
85+
new CopyOnWriteArrayList<>();
8186
@VisibleForTesting final AtomicInteger index = new AtomicInteger();
8287

8388
@VisibleForTesting volatile String localDc;
@@ -147,6 +152,7 @@ public void init(Cluster cluster, Collection<Host> hosts) {
147152
else prev.addIfAbsent(host);
148153

149154
if (dc.equals(localDc)) {
155+
liveHostsAllLocalDC.add(host);
150156
if (rack.equals(localRack)) {
151157
liveHostsLocalRackLocalDC.add(host);
152158
} else {
@@ -240,10 +246,17 @@ public HostDistance distance(Host host) {
240246
@Override
241247
public Iterator<Host> newQueryPlan(String loggedKeyspace, final Statement statement) {
242248

243-
CopyOnWriteArrayList<Host> localLiveHosts = perDcLiveHosts.get(localDc);
244-
// Clone for thread safety
245-
final List<Host> copyLiveHostsLocalRackLocalDC = cloneList(liveHostsLocalRackLocalDC);
246-
final List<Host> copyLiveHostsRemoteRacksLocalDC = cloneList(liveHostsRemoteRacksLocalDC);
249+
// For LWT queries, skip rack prioritization and use all local DC hosts equally
250+
final boolean isLWT = statement != null && statement.isLWT();
251+
252+
// For LWT queries, include all local DC hosts in the first part of the plan, not just those in
253+
// the local rack
254+
final List<Host> copyLiveHostsLocalRackLocalDC =
255+
isLWT ? cloneList(liveHostsAllLocalDC) : cloneList(liveHostsLocalRackLocalDC);
256+
// For LWT queries, skip the second part of the plan that includes hosts in remote racks of the
257+
// local DC
258+
final List<Host> copyLiveHostsRemoteRacksLocalDC =
259+
isLWT ? Collections.emptyList() : cloneList(liveHostsRemoteRacksLocalDC);
247260
final int startIdx = index.getAndIncrement();
248261

249262
return new AbstractIterator<Host>() {
@@ -288,7 +301,7 @@ protected Host computeNext() {
288301
}
289302

290303
ConsistencyLevel cl =
291-
statement.getConsistencyLevel() == null
304+
statement == null || statement.getConsistencyLevel() == null
292305
? configuration.getQueryOptions().getConsistencyLevel()
293306
: statement.getConsistencyLevel();
294307

@@ -348,6 +361,7 @@ public void onUp(Host host) {
348361
dcHosts.addIfAbsent(host);
349362

350363
if (dc.equals(localDc)) {
364+
liveHostsAllLocalDC.addIfAbsent(host);
351365
if (rack.equals(localRack)) {
352366
liveHostsLocalRackLocalDC.add(host);
353367
} else {
@@ -365,6 +379,7 @@ public void onDown(Host host) {
365379
if (dcHosts != null) dcHosts.remove(host);
366380

367381
if (dc.equals(localDc)) {
382+
liveHostsAllLocalDC.remove(host);
368383
if (rack.equals(localRack)) {
369384
liveHostsLocalRackLocalDC.remove(host);
370385
} else {

driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,14 @@ public void setUpUnitTests() {
8686
cluster = mock(Cluster.class);
8787
Configuration configuration = mock(Configuration.class);
8888
ProtocolOptions protocolOptions = mock(ProtocolOptions.class);
89+
QueryOptions queryOptions = mock(QueryOptions.class);
8990
Metadata metadata = mock(Metadata.class);
9091
childPolicy = mock(LoadBalancingPolicy.class);
9192
when(cluster.getConfiguration()).thenReturn(configuration);
9293
when(configuration.getCodecRegistry()).thenReturn(codecRegistry);
9394
when(configuration.getProtocolOptions()).thenReturn(protocolOptions);
95+
when(configuration.getQueryOptions()).thenReturn(queryOptions);
96+
when(queryOptions.getConsistencyLevel()).thenReturn(ConsistencyLevel.ONE);
9497
when(protocolOptions.getProtocolVersion()).thenReturn(ProtocolVersion.DEFAULT);
9598
when(cluster.getMetadata()).thenReturn(metadata);
9699
when(host1.isUp()).thenReturn(true);
@@ -1107,6 +1110,110 @@ public void should_follow_configuration_on_query_planning(
11071110
.containsExactly(queryPlanForNonLocalConsistencyLevel2.toArray(new Host[0]));
11081111
}
11091112

1113+
/**
1114+
* Ensures that {@link RackAwareRoundRobinPolicy} skips rack prioritization for LWT queries,
1115+
* treating all local DC hosts equally while still prioritizing local DC over remote DC.
1116+
*
1117+
* @test_category load_balancing:rack_aware,lwt
1118+
*/
1119+
@Test(groups = "unit")
1120+
public void should_skip_rack_prioritization_for_lwt_queries() {
1121+
// given: a policy with 4 local DC hosts (2 in local rack, 2 in remote rack) and 2 remote DC
1122+
// hosts
1123+
// Initialize hosts in a mixed order: remoteRack, localRack, remoteRack, localRack
1124+
// This ensures that when LWT skips rack prioritization, we get a different order
1125+
// than the rack-aware order
1126+
RackAwareRoundRobinPolicy policy =
1127+
new RackAwareRoundRobinPolicy("localDC", "localRack", 1, false, false, false);
1128+
policy.init(cluster, ImmutableList.of(host3, host1, host4, host2, host5, host6));
1129+
1130+
// Create a mock LWT statement
1131+
Statement lwtStatement = mock(Statement.class);
1132+
when(lwtStatement.isLWT()).thenReturn(true);
1133+
when(lwtStatement.getConsistencyLevel()).thenReturn(ConsistencyLevel.ONE);
1134+
1135+
// when: generating query plans for LWT queries
1136+
policy.index.set(0);
1137+
List<Host> queryPlan1 = Lists.newArrayList(policy.newQueryPlan("keyspace", lwtStatement));
1138+
List<Host> queryPlan2 = Lists.newArrayList(policy.newQueryPlan("keyspace", lwtStatement));
1139+
1140+
// then: all 4 local DC hosts should appear before any remote DC host (no rack prioritization)
1141+
Assertions.assertThat(queryPlan1.subList(0, 4)).containsOnly(host1, host2, host3, host4);
1142+
Assertions.assertThat(queryPlan2.subList(0, 4)).containsOnly(host1, host2, host3, host4);
1143+
1144+
// then: the remote DC host (host5) should appear only after all four local DC hosts
1145+
Assertions.assertThat(queryPlan1.subList(4, 5)).containsOnly(host5);
1146+
Assertions.assertThat(queryPlan2.subList(4, 5)).containsOnly(host5);
1147+
1148+
// then: for LWT queries, order should follow insertion order (host3, host1, host4, host2)
1149+
// not rack-aware order (host1, host2, host3, host4)
1150+
Assertions.assertThat(queryPlan1).startsWith(host3);
1151+
Assertions.assertThat(queryPlan2).startsWith(host1);
1152+
}
1153+
1154+
/**
1155+
* Ensures that {@link RackAwareRoundRobinPolicy} preserves rack-aware routing for non-LWT
1156+
* queries.
1157+
*
1158+
* @test_category load_balancing:rack_aware
1159+
*/
1160+
@Test(groups = "unit")
1161+
public void should_preserve_rack_aware_routing_for_non_lwt_queries() {
1162+
// given: a policy with 4 local DC hosts (2 in local rack, 2 in remote rack) and 2 remote DC
1163+
// hosts
1164+
// Initialize hosts in a mixed order to ensure rack-aware routing reorganizes them
1165+
RackAwareRoundRobinPolicy policy =
1166+
new RackAwareRoundRobinPolicy("localDC", "localRack", 1, false, false, false);
1167+
policy.init(cluster, ImmutableList.of(host3, host1, host4, host2, host5, host6));
1168+
1169+
// Create a normal (non-LWT) statement
1170+
Statement normalStatement = mock(Statement.class);
1171+
when(normalStatement.isLWT()).thenReturn(false);
1172+
when(normalStatement.getConsistencyLevel()).thenReturn(ConsistencyLevel.ONE);
1173+
1174+
// when: generating query plans for non-LWT queries
1175+
policy.index.set(0);
1176+
List<Host> queryPlan1 = Lists.newArrayList(policy.newQueryPlan("keyspace", normalStatement));
1177+
List<Host> queryPlan2 = Lists.newArrayList(policy.newQueryPlan("keyspace", normalStatement));
1178+
1179+
// then: local rack hosts (host1, host2) should appear first regardless of init order
1180+
Assertions.assertThat(queryPlan1.subList(0, 2)).containsOnly(host1, host2);
1181+
Assertions.assertThat(queryPlan2.subList(0, 2)).containsOnly(host1, host2);
1182+
1183+
// then: remote rack local DC hosts (host3, host4) should appear next
1184+
Assertions.assertThat(queryPlan1.subList(2, 4)).containsOnly(host3, host4);
1185+
Assertions.assertThat(queryPlan2.subList(2, 4)).containsOnly(host3, host4);
1186+
1187+
// then: the remote DC host (host5) should appear last, after all local DC hosts
1188+
Assertions.assertThat(queryPlan1.subList(4, 5)).containsOnly(host5);
1189+
Assertions.assertThat(queryPlan2.subList(4, 5)).containsOnly(host5);
1190+
1191+
// then: query plans should follow round-robin pattern within rack boundaries
1192+
Assertions.assertThat(queryPlan1).startsWith(host1);
1193+
Assertions.assertThat(queryPlan2).startsWith(host2);
1194+
}
1195+
1196+
/**
1197+
* Ensures that {@link RackAwareRoundRobinPolicy} handles null statement correctly.
1198+
*
1199+
* @test_category load_balancing:rack_aware
1200+
*/
1201+
@Test(groups = "unit")
1202+
public void should_handle_null_statement() {
1203+
// given: a policy with hosts in local and remote DC
1204+
RackAwareRoundRobinPolicy policy =
1205+
new RackAwareRoundRobinPolicy("localDC", "localRack", 1, false, false, false);
1206+
policy.init(cluster, ImmutableList.of(host1, host2, host3, host4, host5, host6));
1207+
1208+
// when: generating query plan with null statement
1209+
policy.index.set(0);
1210+
List<Host> queryPlan = Lists.newArrayList(policy.newQueryPlan("keyspace", null));
1211+
1212+
// then: should use rack-aware routing (default behavior for non-LWT)
1213+
// Local rack hosts should appear first
1214+
Assertions.assertThat(queryPlan.subList(0, 2)).containsOnly(host1, host2);
1215+
}
1216+
11101217
@DataProvider(name = "distanceTestCases")
11111218
public Object[][] distanceTestCases() {
11121219
return new Object[][] {

manual/load_balancing/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,13 @@ local datacenter and rack. In general, providing the datacenter and rack name ex
138138
Hosts belonging to the local datacenter are at distance `LOCAL`, and appear first in query plans (in a round-robin
139139
fashion) with hosts in the local rack having precedence over nodes in remote racks in the local datacenter.
140140

141+
**LWT (Lightweight Transaction) Behavior:** For LWT queries (`Statement.isLWT()` returns true), the policy does not
142+
prioritize the local rack. Instead, it round-robins evenly across all hosts in the local datacenter first, then falls
143+
back to remote datacenters (if enabled). This design avoids creating rack-level hotspots during Paxos consensus phases.
144+
LWT queries involve multiple rounds of coordination between replicas, and concentrating these operations on a single
145+
rack can create contention that degrades performance. By distributing LWT load across the entire local datacenter,
146+
the driver achieves better throughput while maintaining low latency through datacenter locality.
147+
141148
For example, if there are any UP hosts in the local rack the policy will query those nodes in round-robin fashion:
142149
* query 1: host1 *(local DC, local rack)*, host2 *(local DC, local rack)*, host3 *(local DC, local rack)*
143150
* query 2: host2 *(local DC, local rack)*, host3 *(local DC, local rack)*, host1 *(local DC, local rack)*

0 commit comments

Comments
 (0)