@@ -140,25 +140,83 @@ public BalancedShardsAllocator(
140
140
141
141
/**
 * Runs one allocation round against {@code allocation} using a freshly created {@link Balancer}:
 * first {@code allocateUnassigned()}, then {@code moveShards()}, then {@code balance()}.
 * <p>
 * If {@code allocation.routingNodes()} is empty, {@code failAllocationOfNewPrimaries(allocation)} is called
 * and no balancer is created.
 * <p>
 * When {@code balancerSettings.completeEarlyOnShardAssignmentChange()} is true, the method returns as soon as
 * {@code allocateUnassigned()} or {@code moveShards()} reports a change, so the remaining steps are skipped for
 * this round. The debug log line, the simulation consistency assertion and the node weight stats recording sit
 * in a {@code finally} block and therefore run on every exit path taken once the balancer exists.
 *
 * @param allocation the routing allocation this round works against
 */
@ Override
142
142
public void allocate (RoutingAllocation allocation ) {
143
// A simulating allocation is only expected together with the complete-early setting.
+ assert allocation .isSimulating () == false || balancerSettings .completeEarlyOnShardAssignmentChange ()
144
+ : "inconsistent states: isSimulating ["
145
+ + allocation .isSimulating ()
146
+ + "] vs completeEarlyOnShardAssignmentChange ["
147
+ + balancerSettings .completeEarlyOnShardAssignmentChange ()
148
+ + "]" ;
143
149
if (allocation .metadata ().hasAnyIndices ()) {
144
150
// must not use licensed features when just starting up
145
151
writeLoadForecaster .refreshLicense ();
146
152
}
147
153
148
154
assert allocation .ignoreDisable () == false ;
155
// In simulation the round must start with no inactive shards in the routing nodes.
+ assert allocation .isSimulating () == false || allocation .routingNodes ().hasInactiveShards () == false
156
+ : "expect no initializing shard, but got " + allocation .routingNodes ();
157
+ // TODO: ES-12943 cannot assert the following because shards moved by commands are not simulated promptly in DesiredBalanceComputer
158
+ // assert allocation.isSimulating() == false || allocation.routingNodes().getRelocatingShardCount() == 0
159
+ // : "expect no relocating shard, but got " + allocation.routingNodes();
149
160
150
161
if (allocation .routingNodes ().size () == 0 ) {
151
162
failAllocationOfNewPrimaries (allocation );
152
163
return ;
153
164
}
154
165
final BalancingWeights balancingWeights = balancingWeightsFactory .create ();
155
// NOTE(review): the '-' lines below appear to be the replaced pre-change sequence (no early return, no finally).
- final Balancer balancer = new Balancer (writeLoadForecaster , allocation , balancerSettings .getThreshold (), balancingWeights );
156
- balancer .allocateUnassigned ();
157
- balancer .moveShards ();
158
- balancer .balance ();
166
+ final Balancer balancer = new Balancer (
167
+ writeLoadForecaster ,
168
+ allocation ,
169
+ balancerSettings .getThreshold (),
170
+ balancingWeights ,
171
+ balancerSettings .completeEarlyOnShardAssignmentChange ()
172
+ );
173
+
174
// Outcome of each step; a flag stays false when its step is skipped or never reached.
+ boolean shardAssigned = false , shardMoved = false , shardBalanced = false ;
175
+ try {
176
+ shardAssigned = balancer .allocateUnassigned ();
177
// Stop after the first step that reported a change when completing early is enabled.
+ if (shardAssigned && balancerSettings .completeEarlyOnShardAssignmentChange ()) {
178
+ return ;
179
+ }
159
180
160
- // Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
161
- collectAndRecordNodeWeightStats (balancer , balancingWeights , allocation );
181
+ shardMoved = balancer .moveShards ();
182
+ if (shardMoved && balancerSettings .completeEarlyOnShardAssignmentChange ()) {
183
+ return ;
184
+ }
185
+
186
+ shardBalanced = balancer .balance ();
187
// Runs on normal completion, on either early return above, and if one of the steps throws.
+ } finally {
188
+ if (logger .isDebugEnabled ()) {
189
+ logger .debug (
190
+ "shards assigned: {}, shards moved: {}, shards balanced: {}, "
191
+ + "routingNodes hasInactiveShards [{}], relocation count [{}]" ,
192
+ shardAssigned ,
193
+ shardMoved ,
194
+ shardBalanced ,
195
+ allocation .routingNodes ().hasInactiveShards (),
196
+ allocation .routingNodes ().getRelocatingShardCount ()
197
+ );
198
+ }
199
// Wrapped in assert so the consistency check is only evaluated when assertions are enabled.
+ assert assertShardAssignmentChanges (allocation , shardAssigned , shardMoved , shardBalanced );
200
+ // Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
201
+ collectAndRecordNodeWeightStats (balancer , balancingWeights , allocation );
202
+ }
203
+ }
204
+
205
/**
 * Assertion helper, used as {@code assert assertShardAssignmentChanges(...)}, that checks the step outcome flags
 * against the state of {@code allocation.routingNodes()}.
 * <p>
 * Nothing is checked unless {@code allocation.isSimulating()} is true. In simulation:
 * <ul>
 *   <li>if {@code shardAssigned} is true, the routing nodes must report inactive shards;</li>
 *   <li>if {@code shardMoved} or {@code shardBalanced} is true, the relocating shard count must be greater than zero.</li>
 * </ul>
 *
 * @param allocation    the allocation whose routing nodes are inspected
 * @param shardAssigned whether the assignment step reported a change
 * @param shardMoved    whether the move step reported a change
 * @param shardBalanced whether the balance step reported a change
 * @return always {@code true}; a violated expectation surfaces through the nested {@code assert} statements
 */
+ private boolean assertShardAssignmentChanges (
206
+ RoutingAllocation allocation ,
207
+ boolean shardAssigned ,
208
+ boolean shardMoved ,
209
+ boolean shardBalanced
210
+ ) {
211
// The expectations below are only enforced for simulating allocations.
+ if (allocation .isSimulating () == false ) {
212
+ return true ;
213
+ }
214
+ assert shardAssigned == false || allocation .routingNodes ().hasInactiveShards ()
215
+ : "expect initializing shard, but got " + allocation .routingNodes ();
216
+
217
+ assert (shardMoved == false && shardBalanced == false ) || allocation .routingNodes ().getRelocatingShardCount () > 0
218
+ : "expect relocating shard, but got " + allocation .routingNodes ();
219
+ return true ;
162
220
}
163
221
164
222
private void collectAndRecordNodeWeightStats (Balancer balancer , BalancingWeights balancingWeights , RoutingAllocation allocation ) {
@@ -188,7 +246,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
188
246
writeLoadForecaster ,
189
247
allocation ,
190
248
balancerSettings .getThreshold (),
191
- balancingWeightsFactory .create ()
249
+ balancingWeightsFactory .create (),
250
+ balancerSettings .completeEarlyOnShardAssignmentChange ()
192
251
);
193
252
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision .NOT_TAKEN ;
194
253
MoveDecision moveDecision = MoveDecision .NOT_TAKEN ;
@@ -248,12 +307,14 @@ public static class Balancer {
248
307
private final Map <String , ModelNode > nodes ;
249
308
private final BalancingWeights balancingWeights ;
250
309
private final NodeSorters nodeSorters ;
310
+ private final boolean completeEarlyOnShardAssignmentChange ;
251
311
252
312
private Balancer (
253
313
WriteLoadForecaster writeLoadForecaster ,
254
314
RoutingAllocation allocation ,
255
315
float threshold ,
256
- BalancingWeights balancingWeights
316
+ BalancingWeights balancingWeights ,
317
+ boolean completeEarlyOnShardAssignmentChange
257
318
) {
258
319
this .writeLoadForecaster = writeLoadForecaster ;
259
320
this .allocation = allocation ;
@@ -266,6 +327,7 @@ private Balancer(
266
327
nodes = Collections .unmodifiableMap (buildModelFromAssigned ());
267
328
this .nodeSorters = balancingWeights .createNodeSorters (nodesArray (), this );
268
329
this .balancingWeights = balancingWeights ;
330
+ this .completeEarlyOnShardAssignmentChange = completeEarlyOnShardAssignmentChange ;
269
331
}
270
332
271
333
private static long getShardDiskUsageInBytes (ShardRouting shardRouting , IndexMetadata indexMetadata , ClusterInfo clusterInfo ) {
@@ -358,7 +420,7 @@ private IndexMetadata indexMetadata(ProjectIndex index) {
358
420
* Balances the nodes on the cluster model according to the weight function.
359
421
* The actual balancing is delegated to {@link #balanceByWeights(NodeSorter)}
360
422
*/
361
- private void balance () {
423
+ private boolean balance () {
362
424
if (logger .isTraceEnabled ()) {
363
425
logger .trace ("Start balancing cluster" );
364
426
}
@@ -371,21 +433,27 @@ private void balance() {
371
433
* Therefore we only do a rebalance if we have fetched all information.
372
434
*/
373
435
logger .debug ("skipping rebalance due to in-flight shard/store fetches" );
374
- return ;
436
+ return false ;
375
437
}
376
438
if (allocation .deciders ().canRebalance (allocation ).type () != Type .YES ) {
377
439
logger .trace ("skipping rebalance as it is disabled" );
378
- return ;
440
+ return false ;
379
441
}
380
442
443
+ boolean shardBalanced = false ;
381
444
// Balance each partition
382
445
for (NodeSorter nodeSorter : nodeSorters ) {
383
446
if (nodeSorter .modelNodes .length < 2 ) { /* skip if we only have one node */
384
447
logger .trace ("skipping rebalance as the partition has single node only" );
385
448
continue ;
386
449
}
387
- balanceByWeights (nodeSorter );
450
+ shardBalanced |= balanceByWeights (nodeSorter );
451
+ // TODO: We could choose to account shardBalanced separately for each partition since they do not overlap.
452
+ if (shardBalanced && completeEarlyOnShardAssignmentChange ) {
453
+ return true ;
454
+ }
388
455
}
456
+ return shardBalanced ;
389
457
}
390
458
391
459
/**
@@ -531,7 +599,8 @@ private MoveDecision decideRebalance(final ProjectIndex index, final ShardRoutin
531
599
* only, or in other words relocations that move the weight delta closer
532
600
* to {@code 0.0}
533
601
*/
534
- private void balanceByWeights (NodeSorter sorter ) {
602
+ private boolean balanceByWeights (NodeSorter sorter ) {
603
+ boolean shardBalanced = false ;
535
604
final AllocationDeciders deciders = allocation .deciders ();
536
605
final ModelNode [] modelNodes = sorter .modelNodes ;
537
606
final float [] weights = sorter .weights ;
@@ -630,6 +699,18 @@ private void balanceByWeights(NodeSorter sorter) {
630
699
sorter .sort (0 , relevantNodes );
631
700
lowIdx = 0 ;
632
701
highIdx = relevantNodes - 1 ;
702
+
703
+ if (routingNodes .getRelocatingShardCount () > 0 ) {
704
+ // ES-12955: Check routingNodes.getRelocatingShardCount() > 0 in case the first relocation is a THROTTLE.
705
+ // This should rarely happen since in most cases, we don't throttle unless there is an existing relocation.
706
+ // But it can happen in production for frozen indices when the cache is still being prepared. It can also
707
+ // happen in tests because we have deciders like RandomAllocationDecider that can randomly return THROTTLE
708
+ // when there is no existing relocation.
709
+ shardBalanced = true ;
710
+ }
711
+ if (completeEarlyOnShardAssignmentChange && shardBalanced ) {
712
+ return true ;
713
+ }
633
714
continue ;
634
715
}
635
716
}
@@ -651,6 +732,7 @@ private void balanceByWeights(NodeSorter sorter) {
651
732
}
652
733
}
653
734
}
735
+ return shardBalanced ;
654
736
}
655
737
656
738
/**
@@ -721,7 +803,8 @@ protected int comparePivot(int j) {
721
803
* shard is created with an incremented version in the state
722
804
* {@link ShardRoutingState#INITIALIZING}.
723
805
*/
724
- public void moveShards () {
806
+ public boolean moveShards () {
807
+ boolean shardMoved = false ;
725
808
// Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
726
809
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
727
810
// offloading the shards.
@@ -745,10 +828,15 @@ public void moveShards() {
745
828
if (logger .isTraceEnabled ()) {
746
829
logger .trace ("Moved shard [{}] to node [{}]" , shardRouting , targetNode .getRoutingNode ());
747
830
}
831
+ shardMoved = true ;
832
+ if (completeEarlyOnShardAssignmentChange ) {
833
+ return true ;
834
+ }
748
835
} else if (moveDecision .isDecisionTaken () && moveDecision .canRemain () == false ) {
749
836
logger .trace ("[{}][{}] can't move" , shardRouting .index (), shardRouting .id ());
750
837
}
751
838
}
839
+ return shardMoved ;
752
840
}
753
841
754
842
/**
@@ -888,14 +976,14 @@ private Map<String, ModelNode> buildModelFromAssigned() {
888
976
* Allocates all given shards on the minimal eligible node for the shards index
889
977
* with respect to the weight function. All given shards must be unassigned.
890
978
*/
891
- private void allocateUnassigned () {
979
+ private boolean allocateUnassigned () {
892
980
RoutingNodes .UnassignedShards unassigned = routingNodes .unassigned ();
893
981
assert nodes .isEmpty () == false ;
894
982
if (logger .isTraceEnabled ()) {
895
983
logger .trace ("Start allocating unassigned shards" );
896
984
}
897
985
if (unassigned .isEmpty ()) {
898
- return ;
986
+ return false ;
899
987
}
900
988
901
989
/*
@@ -932,6 +1020,7 @@ private void allocateUnassigned() {
932
1020
int secondaryLength = 0 ;
933
1021
int primaryLength = primary .length ;
934
1022
ArrayUtil .timSort (primary , comparator );
1023
+ boolean shardAssignmentChanged = false ;
935
1024
do {
936
1025
for (int i = 0 ; i < primaryLength ; i ++) {
937
1026
ShardRouting shard = primary [i ];
@@ -949,6 +1038,7 @@ private void allocateUnassigned() {
949
1038
950
1039
final long shardSize = getExpectedShardSize (shard , ShardRouting .UNAVAILABLE_EXPECTED_SHARD_SIZE , allocation );
951
1040
shard = routingNodes .initializeShard (shard , minNode .getNodeId (), null , shardSize , allocation .changes ());
1041
+ shardAssignmentChanged = true ;
952
1042
minNode .addShard (index , shard );
953
1043
if (shard .primary () == false ) {
954
1044
// copy over the same replica shards to the secondary array so they will get allocated
@@ -972,6 +1062,9 @@ private void allocateUnassigned() {
972
1062
assert allocationDecision .getAllocationStatus () == AllocationStatus .DECIDERS_THROTTLED ;
973
1063
final long shardSize = getExpectedShardSize (shard , ShardRouting .UNAVAILABLE_EXPECTED_SHARD_SIZE , allocation );
974
1064
minNode .addShard (projectIndex (shard ), shard .initialize (minNode .getNodeId (), null , shardSize ));
1065
+ // If we see a throttle decision in simulation, there must be other shards that got assigned before it.
1066
+ assert allocation .isSimulating () == false || shardAssignmentChanged
1067
+ : "shard " + shard + " was throttled but no other shards were assigned" ;
975
1068
} else {
976
1069
if (logger .isTraceEnabled ()) {
977
1070
logger .trace ("No Node found to assign shard [{}]" , shard );
@@ -994,6 +1087,7 @@ private void allocateUnassigned() {
994
1087
secondaryLength = 0 ;
995
1088
} while (primaryLength > 0 );
996
1089
// clear everything we have either added it or moved to ignoreUnassigned
1090
+ return shardAssignmentChanged ;
997
1091
}
998
1092
999
1093
private ProjectIndex projectIndex (ShardRouting shardRouting ) {
0 commit comments