|
53 | 53 | import org.elasticsearch.common.util.concurrent.EsExecutors;
|
54 | 54 | import org.elasticsearch.core.Nullable;
|
55 | 55 | import org.elasticsearch.index.shard.DocsStats;
|
| 56 | +import org.elasticsearch.index.shard.IndexingStats; |
56 | 57 | import org.elasticsearch.injection.guice.Inject;
|
57 | 58 | import org.elasticsearch.tasks.CancellableTask;
|
58 | 59 | import org.elasticsearch.tasks.Task;
|
|
68 | 69 | import java.util.Objects;
|
69 | 70 | import java.util.Optional;
|
70 | 71 | import java.util.function.Consumer;
|
| 72 | +import java.util.function.Function; |
71 | 73 | import java.util.stream.Collectors;
|
72 | 74 |
|
73 | 75 | /**
|
@@ -280,17 +282,13 @@ protected void masterOperation(
|
280 | 282 | DataStream dataStream = (DataStream) indexAbstraction;
|
281 | 283 | final Optional<IndexStats> indexStats = Optional.ofNullable(statsResponse)
|
282 | 284 | .map(stats -> stats.getIndex(dataStream.getWriteIndex().getName()));
|
283 |
| - |
284 |
| - Double indexWriteLoad = indexStats.map( |
285 |
| - stats -> Arrays.stream(stats.getShards()) |
286 |
| - .filter(shardStats -> shardStats.getStats().indexing != null) |
287 |
| - // only take primaries into account as in stateful the replicas also index data |
288 |
| - .filter(shardStats -> shardStats.getShardRouting().primary()) |
289 |
| - .map(shardStats -> shardStats.getStats().indexing.getTotal().getWriteLoad()) |
290 |
| - .reduce(0.0, Double::sum) |
291 |
| - ).orElse(null); |
292 |
| - |
293 |
| - rolloverAutoSharding = dataStreamAutoShardingService.calculate(projectState, dataStream, indexWriteLoad); |
| 285 | + rolloverAutoSharding = dataStreamAutoShardingService.calculate( |
| 286 | + projectState, |
| 287 | + dataStream, |
| 288 | + indexStats.map(stats -> sumLoadMetrics(stats, IndexingStats.Stats::getWriteLoad)).orElse(null), |
| 289 | + indexStats.map(stats -> sumLoadMetrics(stats, IndexingStats.Stats::getRecentWriteLoad)).orElse(null), |
| 290 | + indexStats.map(stats -> sumLoadMetrics(stats, IndexingStats.Stats::getPeakWriteLoad)).orElse(null) |
| 291 | + ); |
294 | 292 | logger.debug("auto sharding result for data stream [{}] is [{}]", dataStream.getName(), rolloverAutoSharding);
|
295 | 293 |
|
296 | 294 | // if auto sharding recommends increasing the number of shards we want to trigger a rollover even if there are no
|
@@ -354,6 +352,16 @@ protected void masterOperation(
|
354 | 352 | );
|
355 | 353 | }
|
356 | 354 |
|
| 355 | + private static Double sumLoadMetrics(IndexStats stats, Function<IndexingStats.Stats, Double> loadMetric) { |
| 356 | + return Arrays.stream(stats.getShards()) |
| 357 | + .filter(shardStats -> shardStats.getStats().indexing != null) |
| 358 | + // only take primaries into account as in stateful the replicas also index data |
| 359 | + .filter(shardStats -> shardStats.getShardRouting().primary()) |
| 360 | + .map(shardStats -> shardStats.getStats().indexing.getTotal()) |
| 361 | + .map(loadMetric) |
| 362 | + .reduce(0.0, Double::sum); |
| 363 | + } |
| 364 | + |
357 | 365 | private void markForLazyRollover(
|
358 | 366 | RolloverRequest rolloverRequest,
|
359 | 367 | ActionListener<RolloverResponse> listener,
|
|
0 commit comments