From a35f5319ac22144cb610ede36b19e0255ce36972 Mon Sep 17 00:00:00 2001 From: Pete Gillin Date: Thu, 27 Mar 2025 16:07:26 +0000 Subject: [PATCH 1/5] Refactor `DataStreamAutoShardingServiceTests` The two main things done in this commit are: - Split large test methods which do several independent tests in bare code blocks into multiple smaller methods. - Fix an unnecessarily complicated pattern where the code would create a `Function` in a local variable and then immediately `apply` it exactly once, rather than just executing the code normally. --- .../DataStreamAutoShardingServiceTests.java | 649 +++++++++--------- 1 file changed, 310 insertions(+), 339 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java index da4c907208299..75ce2e8345402 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java @@ -42,7 +42,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Function; import static org.elasticsearch.action.datastreams.autosharding.AutoShardingResult.NOT_APPLICABLE_RESULT; import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.COOLDOWN_PREVENTED_DECREASE; @@ -55,11 +54,14 @@ public class DataStreamAutoShardingServiceTests extends ESTestCase { + private static final int MIN_WRITE_THREADS = 2; + private static final int MAX_WRITE_THREADS = 32; + private ClusterService clusterService; private ThreadPool threadPool; private DataStreamAutoShardingService service; private long now; - String dataStreamName; + private String dataStreamName; @Before public void setupService() { @@ -128,337 +130,307 @@ public void testCalculateValidations() { } } - public void testCalculateIncreaseShardingRecommendations() { + public void testCalculateIncreaseShardingRecommendations_noPreviousShardingEvent() { // the input is a data stream with 5 backing indices with 1 shard each - // all 4 backing indices have a write load of 2.0 - // we'll recreate it across the test and add an auto sharding event as we iterate - { - var projectId = randomProjectIdOrDefault(); - ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); - Function dataStreamSupplier = (autoShardingEvent) -> createDataStream( - builder, - dataStreamName, - 1, - now, - List.of(now - 10_000, now - 7000, now - 5000, now - 2000, now - 1000), - getWriteLoad(1, 2.0, 9999.0, 9999.0), - autoShardingEvent - ); + var projectId = randomProjectIdOrDefault(); + ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); + DataStream dataStream = createDataStream( + builder, + dataStreamName, + 1, + now, + List.of(now - 10_000, now - 7000, now - 5000, now - 2000, now - 1000), + getWriteLoad(1, 9999.0, 9999.0, 9999.0), // not used for increase calculation + null + ); + builder.put(dataStream); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) + .putProjectMetadata(builder.build()) + .build(); - DataStream dataStream = dataStreamSupplier.apply(null); - builder.put(dataStream); - ClusterState state = ClusterState.builder(ClusterName.DEFAULT) -
.nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) - .putProjectMetadata(builder.build()) - .build(); - - AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 2.5); - assertThat(autoShardingResult.type(), is(INCREASE_SHARDS)); - // no pre-existing scaling event so the cool down must be zero - assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO)); - assertThat(autoShardingResult.targetNumberOfShards(), is(3)); - } + AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 2.5); + assertThat(autoShardingResult.type(), is(INCREASE_SHARDS)); + // no pre-existing scaling event so the cool down must be zero + assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO)); + assertThat(autoShardingResult.targetNumberOfShards(), is(3)); + } - { - // let's add a pre-existing sharding event so that we'll return some cool down period that's preventing an INCREASE_SHARDS - // event so the result type we're expecting is COOLDOWN_PREVENTED_INCREASE - var projectId = randomProjectIdOrDefault(); - ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); - Function dataStreamSupplier = (autoShardingEvent) -> createDataStream( - builder, - dataStreamName, - 1, - now, - List.of(now - 10_000, now - 7000, now - 5000, now - 2000, now - 1000), - getWriteLoad(1, 2.0, 9999.0, 9999.0), - autoShardingEvent - ); + public void testCalculateIncreaseShardingRecommendations_preventedByCooldown() { + // the input is a data stream with 5 backing indices with 1 shard each + // let's add a pre-existing sharding event so that we'll return some cool down period that's preventing an INCREASE_SHARDS + // event so the result type we're expecting is COOLDOWN_PREVENTED_INCREASE + var projectId = randomProjectIdOrDefault(); + ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); - // generation 4 triggered an auto sharding event to 2 shards - DataStream dataStream = dataStreamSupplier.apply( - new DataStreamAutoShardingEvent(DataStream.getDefaultBackingIndexName(dataStreamName, 4), 2, now - 1005) - ); - builder.put(dataStream); - ClusterState state = ClusterState.builder(ClusterName.DEFAULT) - .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) - .putProjectMetadata(builder.build()) - .build(); - - AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 2.5); - assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_INCREASE)); - // no pre-existing scaling event so the cool down must be zero - assertThat(autoShardingResult.targetNumberOfShards(), is(3)); - // it's been 1005 millis since the last auto sharding event and the cool down is 270secoinds (270_000 millis) - assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.timeValueMillis(268995))); - } + // generation 4 triggered an auto sharding event to 2 shards + DataStreamAutoShardingEvent autoShardingEvent = new DataStreamAutoShardingEvent( + DataStream.getDefaultBackingIndexName(dataStreamName, 4), + 2, + now - 1005 + ); + DataStream dataStream = createDataStream( + builder, + dataStreamName, + 1, + now, + List.of(now - 10_000, now - 7000, now - 5000, now - 2000, now - 1000), + getWriteLoad(1, 9999.0, 9999.0, 9999.0), // not used for increase calculation + autoShardingEvent + ); + builder.put(dataStream); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) 
+ .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) + .putProjectMetadata(builder.build()) + .build(); - { - // let's test a subsequent increase in the number of shards after a previos auto sharding event - var projectId = randomProjectIdOrDefault(); - ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); - Function dataStreamSupplier = (autoShardingEvent) -> createDataStream( - builder, - dataStreamName, - 1, - now, - List.of(now - 10_000_000, now - 7_000_000, now - 2_000_000, now - 1_000_000, now - 1000), - getWriteLoad(1, 2.0, 9999.0, 9999.0), - autoShardingEvent - ); + AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 2.5); + assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_INCREASE)); + // no pre-existing scaling event so the cool down must be zero + assertThat(autoShardingResult.targetNumberOfShards(), is(3)); + // it's been 1005 millis since the last auto sharding event and the cool down is 270 seconds (270_000 millis) + assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.timeValueMillis(268995))); + } - // generation 3 triggered an increase in shards event to 2 shards - DataStream dataStream = dataStreamSupplier.apply( - new DataStreamAutoShardingEvent(DataStream.getDefaultBackingIndexName(dataStreamName, 4), 2, now - 2_000_100) - ); - builder.put(dataStream); - ClusterState state = ClusterState.builder(ClusterName.DEFAULT) - .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) - .putProjectMetadata(builder.build()) - .build(); - - AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 2.5); - assertThat(autoShardingResult.type(), is(INCREASE_SHARDS)); - // no pre-existing scaling event so the cool down must be zero - assertThat(autoShardingResult.targetNumberOfShards(), is(3)); - assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO)); - } + public void testCalculateIncreaseShardingRecommendations_notPreventedByPreviousIncrease() { + // the input is a data stream with 5 backing indices with 1 shard each + // let's test a subsequent increase in the number of shards after a previous auto sharding event + var projectId = randomProjectIdOrDefault(); + ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); + // generation 3 triggered an increase in shards event to 2 shards + DataStreamAutoShardingEvent autoShardingEvent = new DataStreamAutoShardingEvent( + DataStream.getDefaultBackingIndexName(dataStreamName, 4), + 2, + now - 2_000_100 + ); + DataStream dataStream = createDataStream( + builder, + dataStreamName, + 1, + now, + List.of(now - 10_000_000, now - 7_000_000, now - 2_000_000, now - 1_000_000, now - 1000), + getWriteLoad(1, 9999.0, 9999.0, 9999.0), // not used for increase calculation + autoShardingEvent + ); + builder.put(dataStream); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) + .putProjectMetadata(builder.build()) + .build(); + + AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 2.5); + assertThat(autoShardingResult.type(), is(INCREASE_SHARDS)); + // no pre-existing scaling event so the cool down must be zero + assertThat(autoShardingResult.targetNumberOfShards(), is(3)); + assertThat(autoShardingResult.coolDownRemaining(), 
is(TimeValue.ZERO)); } - public void testCalculateDecreaseShardingRecommendations() { + public void testCalculateDecreaseShardingRecommendations_dataStreamNotOldEnough() { // the input is a data stream with 5 backing indices with 3 shards each - { - // testing a decrease shards events prevented by the cool down period not lapsing due to the oldest generation index being - // "too new" (i.e. the cool down period hasn't lapsed since the oldest generation index) - var projectId = randomProjectIdOrDefault(); - ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); - Function dataStreamSupplier = (autoShardingEvent) -> createDataStream( - builder, - dataStreamName, - 3, - now, - List.of(now - 10_000, now - 7000, now - 5000, now - 2000, now - 1000), - getWriteLoad(3, 0.25, 9999.0, 9999.0), - autoShardingEvent - ); + // testing a decrease shards events prevented by the cool down period not lapsing due to the oldest generation index being + // "too new" (i.e. the cool down period hasn't lapsed since the oldest generation index) + var projectId = randomProjectIdOrDefault(); + ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); + DataStream dataStream = createDataStream( + builder, + dataStreamName, + 3, + now, + List.of(now - 10_000, now - 7000, now - 5000, now - 2000, now - 1000), + getWriteLoad(3, 0.25, 9999.0, 9999.0), + null + ); + builder.put(dataStream); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) + .putProjectMetadata(builder.build()) + .build(); - DataStream dataStream = dataStreamSupplier.apply(null); - builder.put(dataStream); - ClusterState state = ClusterState.builder(ClusterName.DEFAULT) - .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) - .putProjectMetadata(builder.build()) - .build(); - - AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0); - // the cooldown period for the decrease shards event hasn't lapsed since the data stream was created - assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_DECREASE)); - assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.timeValueMillis(TimeValue.timeValueDays(3).millis() - 10_000))); - // based on the write load of 0.75 we should be reducing the number of shards to 1 - assertThat(autoShardingResult.targetNumberOfShards(), is(1)); - } + AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0); + // the cooldown period for the decrease shards event hasn't lapsed since the data stream was created + assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_DECREASE)); + assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.timeValueMillis(TimeValue.timeValueDays(3).millis() - 10_000))); + // based on the write load of 0.75 we should be reducing the number of shards to 1 + assertThat(autoShardingResult.targetNumberOfShards(), is(1)); + } - { - var projectId = randomProjectIdOrDefault(); - ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); - Function dataStreamSupplier = (autoShardingEvent) -> createDataStream( - builder, - dataStreamName, - 3, - now, - List.of( - now - TimeValue.timeValueDays(21).getMillis(), - now - TimeValue.timeValueDays(15).getMillis(), - now - TimeValue.timeValueDays(4).getMillis(), - now - TimeValue.timeValueDays(2).getMillis(), - now - 1000 - ), - 
getWriteLoad(3, 0.333, 9999.0, 9999.0), - autoShardingEvent - ); + public void testCalculateDecreaseShardingRecommendations_noPreviousShardingEvent() { + // the input is a data stream with 5 backing indices with 3 shards each + var projectId = randomProjectIdOrDefault(); + ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); + DataStream dataStream = createDataStream( + builder, + dataStreamName, + 3, + now, + List.of( + now - TimeValue.timeValueDays(21).getMillis(), + now - TimeValue.timeValueDays(15).getMillis(), + now - TimeValue.timeValueDays(4).getMillis(), + now - TimeValue.timeValueDays(2).getMillis(), + now - 1000 + ), + getWriteLoad(3, 0.333, 9999.0, 9999.0), + null + ); + builder.put(dataStream); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) + .putProjectMetadata(builder.build()) + .build(); - DataStream dataStream = dataStreamSupplier.apply(null); - builder.put(dataStream); - ClusterState state = ClusterState.builder(ClusterName.DEFAULT) - .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) - .putProjectMetadata(builder.build()) - .build(); - - AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0); - assertThat(autoShardingResult.type(), is(DECREASE_SHARDS)); - assertThat(autoShardingResult.targetNumberOfShards(), is(1)); - // no pre-existing auto sharding event however we have old enough backing indices (older than the cooldown period) so we can - // make a decision to reduce the number of shards - assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO)); - } + AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0); + assertThat(autoShardingResult.type(), is(DECREASE_SHARDS)); + assertThat(autoShardingResult.targetNumberOfShards(), is(1)); + // no pre-existing auto sharding event however we have old enough backing indices (older than the cooldown period) so we can + // make a decision to reduce the number of shards + assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO)); + } - { - // let's test a decrease in number of shards after a previous decrease event - var projectId = randomProjectIdOrDefault(); - ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); - Function dataStreamSupplier = (autoShardingEvent) -> createDataStream( - builder, - dataStreamName, - 3, - now, - List.of( - now - TimeValue.timeValueDays(21).getMillis(), - now - TimeValue.timeValueDays(15).getMillis(), // triggers auto sharding event - now - TimeValue.timeValueDays(4).getMillis(), - now - TimeValue.timeValueDays(2).getMillis(), - now - 1000 - ), - getWriteLoad(3, 0.333, 9999.0, 9999.0), - autoShardingEvent - ); + public void testCalculateDecreaseShardingRecommendations_notPreventedByPreviousDecrease() { + // the input is a data stream with 5 backing indices with 3 shards each + // let's test a decrease in number of shards after a previous decrease event + var projectId = randomProjectIdOrDefault(); + ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); + // generation 2 triggered a decrease in shards event to 2 shards + DataStreamAutoShardingEvent autoShardingEvent = new DataStreamAutoShardingEvent( + DataStream.getDefaultBackingIndexName(dataStreamName, 2), + 2, + now - TimeValue.timeValueDays(4).getMillis() + ); + DataStream dataStream = 
createDataStream( + builder, + dataStreamName, + 3, + now, + List.of( + now - TimeValue.timeValueDays(21).getMillis(), + now - TimeValue.timeValueDays(15).getMillis(), // triggers auto sharding event + now - TimeValue.timeValueDays(4).getMillis(), + now - TimeValue.timeValueDays(2).getMillis(), + now - 1000 + ), + getWriteLoad(3, 0.333, 9999.0, 9999.0), + autoShardingEvent + ); + builder.put(dataStream); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) + .putProjectMetadata(builder.build()) + .build(); - // generation 2 triggered a decrease in shards event to 2 shards - DataStream dataStream = dataStreamSupplier.apply( - new DataStreamAutoShardingEvent( - DataStream.getDefaultBackingIndexName(dataStreamName, 2), - 2, - now - TimeValue.timeValueDays(4).getMillis() - ) - ); - builder.put(dataStream); - ClusterState state = ClusterState.builder(ClusterName.DEFAULT) - .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) - .putProjectMetadata(builder.build()) - .build(); - - AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0); - assertThat(autoShardingResult.type(), is(DECREASE_SHARDS)); - assertThat(autoShardingResult.targetNumberOfShards(), is(1)); - assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO)); - } + AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0); + assertThat(autoShardingResult.type(), is(DECREASE_SHARDS)); + assertThat(autoShardingResult.targetNumberOfShards(), is(1)); + assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO)); + } - { - // let's test a decrease in number of shards that's prevented by the cool down period due to a previous sharding event - // the expected result type here is COOLDOWN_PREVENTED_DECREASE - var projectId = randomProjectIdOrDefault(); - ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); - Function dataStreamSupplier = (autoShardingEvent) -> createDataStream( - builder, - dataStreamName, - 3, - now, - List.of( - now - TimeValue.timeValueDays(21).getMillis(), - now - TimeValue.timeValueDays(2).getMillis(), // triggers auto sharding event - now - TimeValue.timeValueDays(1).getMillis(), - now - 1000 - ), - getWriteLoad(3, 0.25, 9999.0, 9999.0), - autoShardingEvent - ); + public void testCalculateDecreaseShardingRecommendations_preventedByCooldown() { + // the input is a data stream with 5 backing indices with 3 shards each + // let's test a decrease in number of shards that's prevented by the cool down period due to a previous sharding event + // the expected result type here is COOLDOWN_PREVENTED_DECREASE + var projectId = randomProjectIdOrDefault(); + ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); + // generation 2 triggered a decrease in shards event to 2 shards + DataStreamAutoShardingEvent autoShardingEvent = new DataStreamAutoShardingEvent( + DataStream.getDefaultBackingIndexName(dataStreamName, 2), + 2, + now - TimeValue.timeValueDays(2).getMillis() + ); + DataStream dataStream = createDataStream( + builder, + dataStreamName, + 3, + now, + List.of( + now - TimeValue.timeValueDays(21).getMillis(), + now - TimeValue.timeValueDays(2).getMillis(), // triggers auto sharding event + now - TimeValue.timeValueDays(1).getMillis(), + now - 1000 + ), + getWriteLoad(3, 0.25, 9999.0, 9999.0), + 
autoShardingEvent + ); + builder.put(dataStream); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) + .putProjectMetadata(builder.build()) + .build(); - // generation 2 triggered a decrease in shards event to 2 shards - DataStream dataStream = dataStreamSupplier.apply( - new DataStreamAutoShardingEvent( - DataStream.getDefaultBackingIndexName(dataStreamName, 2), - 2, - now - TimeValue.timeValueDays(2).getMillis() - ) - ); - builder.put(dataStream); - ClusterState state = ClusterState.builder(ClusterName.DEFAULT) - .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) - .putProjectMetadata(builder.build()) - .build(); - - AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0); - assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_DECREASE)); - assertThat(autoShardingResult.targetNumberOfShards(), is(1)); - assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.timeValueDays(1))); - } + AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0); + assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_DECREASE)); + assertThat(autoShardingResult.targetNumberOfShards(), is(1)); + assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.timeValueDays(1))); + } - { - // no change required - var projectId = randomProjectIdOrDefault(); - ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); - Function dataStreamSupplier = (autoShardingEvent) -> createDataStream( - builder, - dataStreamName, - 3, - now, - List.of( - now - TimeValue.timeValueDays(21).getMillis(), - now - TimeValue.timeValueDays(15).getMillis(), - now - TimeValue.timeValueDays(4).getMillis(), - now - TimeValue.timeValueDays(2).getMillis(), - now - 1000 - ), - getWriteLoad(3, 1.333, 9999.0, 9999.0), - autoShardingEvent - ); + public void testCalculateDecreaseShardingRecommendations_noChangeRequired() { + // the input is a data stream with 5 backing indices with 3 shards each + // no change required + var projectId = randomProjectIdOrDefault(); + ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); + // generation 2 triggered a decrease in shards event to 2 shards + DataStream dataStream = createDataStream( + builder, + dataStreamName, + 3, + now, + List.of( + now - TimeValue.timeValueDays(21).getMillis(), + now - TimeValue.timeValueDays(15).getMillis(), + now - TimeValue.timeValueDays(4).getMillis(), + now - TimeValue.timeValueDays(2).getMillis(), + now - 1000 + ), + getWriteLoad(3, 1.333, 9999.0, 9999.0), + null + ); + builder.put(dataStream); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) + .putProjectMetadata(builder.build()) + .build(); - // generation 2 triggered a decrease in shards event to 2 shards - DataStream dataStream = dataStreamSupplier.apply(null); - builder.put(dataStream); - ClusterState state = ClusterState.builder(ClusterName.DEFAULT) - .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) - .putProjectMetadata(builder.build()) - .build(); - - AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 4.0); - assertThat(autoShardingResult.type(), 
is(NO_CHANGE_REQUIRED)); - assertThat(autoShardingResult.targetNumberOfShards(), is(3)); - assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO)); - } + AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 4.0); + assertThat(autoShardingResult.type(), is(NO_CHANGE_REQUIRED)); + assertThat(autoShardingResult.targetNumberOfShards(), is(3)); + assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO)); } - public void testComputeOptimalNumberOfShards() { - int minWriteThreads = 2; - int maxWriteThreads = 32; - - { - // 0.0 indexing load recommends 1 shard - logger.info("-> indexingLoad {}", 0.0); - assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, 0.0), is(1L)); - } - { - // the small values will be very common so let's randomise to make sure we never go below 1L - double indexingLoad = randomDoubleBetween(0.0001, 1.0, true); - logger.info("-> indexingLoad {}", indexingLoad); - assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, indexingLoad), is(1L)); - } + public void testComputeOptimalNumberOfShards_zeroLoad() { + assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 0.0), is(1L)); + } - { - double indexingLoad = 2.0; - logger.info("-> indexingLoad {}", indexingLoad); - assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, indexingLoad), is(2L)); - } + public void testComputeOptimalNumberOfShards_smallLoad() { + // the small values will be very common so let's randomise to make sure we never go below 1L + double indexingLoad = randomDoubleBetween(0.0001, 1.0, true); + assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, indexingLoad), is(1L)); + } - { - // there's a broad range of popular values (a write index starting to be very busy, using between 3 and all of the 32 write - // threads, so let's randomise this too to make sure we stay at 3 recommended shards) - double indexingLoad = randomDoubleBetween(3.0002, 32.0, true); - logger.info("-> indexingLoad {}", indexingLoad); + public void testComputeOptimalNumberOfShards_load2() { + assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 2.0), is(2L)); + } - assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, indexingLoad), is(3L)); - } + public void testComputeOptimalNumberOfShards_loadUpTo32() { + // there's a broad range of popular values (a write index starting to be very busy, using between 3 and all of the 32 write + // threads, so let's randomise this too to make sure we stay at 3 recommended shards) + double indexingLoad = randomDoubleBetween(3.0002, 32.0, true); + assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, indexingLoad), is(3L)); + } - { - double indexingLoad = 49.0; - logger.info("-> indexingLoad {}", indexingLoad); - assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, indexingLoad), is(4L)); - } + public void testComputeOptimalNumberOfShards_load49() { + assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 49.0), is(4L)); + } - { - double indexingLoad = 70.0; - logger.info("-> indexingLoad {}", indexingLoad); - 
assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, indexingLoad), is(5L)); - } + public void testComputeOptimalNumberOfShards_load70() { + assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 70.0), is(5L)); + } - { - double indexingLoad = 100.0; - logger.info("-> indexingLoad {}", indexingLoad); - assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, indexingLoad), is(7L)); - } + public void testComputeOptimalNumberOfShards_load100() { + assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 100.0), is(7L)); + } - { - double indexingLoad = 180.0; - logger.info("-> indexingLoad {}", indexingLoad); - assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, indexingLoad), is(12L)); - } + public void testComputeOptimalNumberOfShards_load180() { + assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 180.0), is(12L)); } public void testGetMaxIndexLoadWithinCoolingPeriod() { @@ -546,7 +518,7 @@ public void testGetMaxIndexLoadWithinCoolingPeriod() { assertThat(maxIndexLoadWithinCoolingPeriod, is(lastIndexBeforeCoolingPeriodHasLowWriteLoad ? 15.0 : 999.0)); } - public void testIndexLoadWithinCoolingPeriodIsSumOfShardsLoads() { + public void testGetMaxIndexLoadWithinCoolingPeriod_sumsShardLoads() { final TimeValue coolingPeriod = TimeValue.timeValueDays(3); final Metadata.Builder metadataBuilder = Metadata.builder(); @@ -606,41 +578,40 @@ public void testIndexLoadWithinCoolingPeriodIsSumOfShardsLoads() { assertThat(maxIndexLoadWithinCoolingPeriod, is(expectedIsSumOfShardLoads)); } - public void testAutoShardingResultValidation() { - { - // throws exception when constructed using types that shouldn't report cooldowns - expectThrows( - IllegalArgumentException.class, - () -> new AutoShardingResult(INCREASE_SHARDS, 1, 3, TimeValue.timeValueSeconds(3), 3.0) - ); - - expectThrows( - IllegalArgumentException.class, - () -> new AutoShardingResult(DECREASE_SHARDS, 3, 1, TimeValue.timeValueSeconds(3), 1.0) - ); + public void testAutoShardingResultValidation_increaseShardsShouldNotReportCooldown() { + expectThrows( + IllegalArgumentException.class, + () -> new AutoShardingResult(INCREASE_SHARDS, 1, 3, TimeValue.timeValueSeconds(3), 3.0) + ); + } - } + public void testAutoShardingResultValidation_decreaseShardsShouldNotReportCooldown() { + expectThrows( + IllegalArgumentException.class, + () -> new AutoShardingResult(DECREASE_SHARDS, 3, 1, TimeValue.timeValueSeconds(3), 1.0) + ); + } - { - // we can successfully create results with cooldown period for the designated types - AutoShardingResult cooldownPreventedIncrease = new AutoShardingResult( - COOLDOWN_PREVENTED_INCREASE, - 1, - 3, - TimeValue.timeValueSeconds(3), - 3.0 - ); - assertThat(cooldownPreventedIncrease.coolDownRemaining(), is(TimeValue.timeValueSeconds(3))); + public void testAutoShardingResultValidation_validCooldownPreventedIncrease() { + AutoShardingResult cooldownPreventedIncrease = new AutoShardingResult( + COOLDOWN_PREVENTED_INCREASE, + 1, + 3, + TimeValue.timeValueSeconds(3), + 3.0 + ); + assertThat(cooldownPreventedIncrease.coolDownRemaining(), is(TimeValue.timeValueSeconds(3))); + } - AutoShardingResult cooldownPreventedDecrease = new AutoShardingResult( - COOLDOWN_PREVENTED_DECREASE, - 3, - 1, - TimeValue.timeValueSeconds(7), - 
1.0 - ); - assertThat(cooldownPreventedDecrease.coolDownRemaining(), is(TimeValue.timeValueSeconds(7))); - } + public void testAutoShardingResultValidation_validCooldownPreventedDecrease() { + AutoShardingResult cooldownPreventedDecrease = new AutoShardingResult( + COOLDOWN_PREVENTED_DECREASE, + 3, + 1, + TimeValue.timeValueSeconds(7), + 1.0 + ); + assertThat(cooldownPreventedDecrease.coolDownRemaining(), is(TimeValue.timeValueSeconds(7))); } private DataStream createDataStream( From fe877468b339e5daf311bb1c0e579640bfa07d91 Mon Sep 17 00:00:00 2001 From: Pete Gillin Date: Thu, 20 Mar 2025 20:40:42 +0000 Subject: [PATCH 2/5] Configurable metrics in data stream auto-sharding This adds cluster settings to allow for a choice of write load metrics in the data stream auto-sharding calculations. There are separate settings for the increasing and decreasing calculations. Both default to the existing 'all-time' metric for now. --- .../rollover/TransportRolloverAction.java | 30 +- .../DataStreamAutoShardingService.java | 201 +++++++++-- .../common/settings/ClusterSettings.java | 2 + .../DataStreamAutoShardingServiceTests.java | 337 +++++++++++++++++- 4 files changed, 505 insertions(+), 65 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java index a1f41656ddbbb..e356b466ee391 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java @@ -53,6 +53,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.shard.DocsStats; +import org.elasticsearch.index.shard.IndexingStats; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; @@ -68,6 +69,7 @@ import java.util.Objects; import java.util.Optional; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -280,17 +282,13 @@ protected void masterOperation( DataStream dataStream = (DataStream) indexAbstraction; final Optional indexStats = Optional.ofNullable(statsResponse) .map(stats -> stats.getIndex(dataStream.getWriteIndex().getName())); - - Double indexWriteLoad = indexStats.map( - stats -> Arrays.stream(stats.getShards()) - .filter(shardStats -> shardStats.getStats().indexing != null) - // only take primaries into account as in stateful the replicas also index data - .filter(shardStats -> shardStats.getShardRouting().primary()) - .map(shardStats -> shardStats.getStats().indexing.getTotal().getWriteLoad()) - .reduce(0.0, Double::sum) - ).orElse(null); - - rolloverAutoSharding = dataStreamAutoShardingService.calculate(projectState, dataStream, indexWriteLoad); + rolloverAutoSharding = dataStreamAutoShardingService.calculate( + projectState, + dataStream, + indexStats.map(stats -> sumLoadMetrics(stats, IndexingStats.Stats::getWriteLoad)).orElse(null), + indexStats.map(stats -> sumLoadMetrics(stats, IndexingStats.Stats::getRecentWriteLoad)).orElse(null), + indexStats.map(stats -> sumLoadMetrics(stats, IndexingStats.Stats::getPeakWriteLoad)).orElse(null) + ); logger.debug("auto sharding result for data stream [{}] is [{}]", dataStream.getName(), rolloverAutoSharding); // if auto sharding recommends increasing the number of 
shards we want to trigger a rollover even if there are no @@ -354,6 +352,16 @@ protected void masterOperation( ); } + private static Double sumLoadMetrics(IndexStats stats, Function loadMetric) { + return Arrays.stream(stats.getShards()) + .filter(shardStats -> shardStats.getStats().indexing != null) + // only take primaries into account as in stateful the replicas also index data + .filter(shardStats -> shardStats.getShardRouting().primary()) + .map(shardStats -> shardStats.getStats().indexing.getTotal()) + .map(loadMetric) + .reduce(0.0, Double::sum); + } + private void markForLazyRollover( RolloverRequest rolloverRequest, ActionListener listener, diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java b/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java index 42f7d7888a050..d9080d2c86aef 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java @@ -24,6 +24,7 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.IndexingStats; import java.util.List; import java.util.Objects; @@ -95,6 +96,51 @@ public class DataStreamAutoShardingService { Setting.Property.Dynamic, Setting.Property.NodeScope ); + + /** + * Enumerates the different ways of measuring write load which we can choose between to use in the auto-sharding calculations. + */ + public enum WriteLoadMetric { + + /** + * An unweighted average of the load across the whole time since each shard started (see + * {@link IndexingStats.Stats#getWriteLoad()}). + */ + ALL_TIME, + + /** + * A weighted average of the load favoring recent load (see {@link IndexingStats.Stats#getRecentWriteLoad()}). + */ + RECENT, + + /** + * A measure of the peak value observed for the {@link #RECENT} metric (see {@link IndexingStats.Stats#getPeakWriteLoad()}). + */ + PEAK + } + + /** + * Represents which write load metric should be used for the calculation when considering increasing shards. + */ + public static final Setting DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC = Setting.enumSetting( + WriteLoadMetric.class, + "data_streams.auto_sharding.increase_shards.load_metric", + WriteLoadMetric.ALL_TIME, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Represents which write load metric should be used for the calculation when considering decreasing shards. 
+ */ + public static final Setting DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC = Setting.enumSetting( + WriteLoadMetric.class, + "data_streams.auto_sharding.decrease_shards.load_metric", + WriteLoadMetric.ALL_TIME, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + private final ClusterService clusterService; private final boolean isAutoShardingEnabled; private final LongSupplier nowSupplier; @@ -103,6 +149,8 @@ public class DataStreamAutoShardingService { private volatile int minWriteThreads; private volatile int maxWriteThreads; private volatile List dataStreamExcludePatterns; + private volatile WriteLoadMetric increaseShardsMetric; + private volatile WriteLoadMetric decreaseShardsMetric; public DataStreamAutoShardingService(Settings settings, ClusterService clusterService, LongSupplier nowSupplier) { this.clusterService = clusterService; @@ -112,6 +160,8 @@ public DataStreamAutoShardingService(Settings settings, ClusterService clusterSe this.minWriteThreads = CLUSTER_AUTO_SHARDING_MIN_WRITE_THREADS.get(settings); this.maxWriteThreads = CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS.get(settings); this.dataStreamExcludePatterns = DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING.get(settings); + this.increaseShardsMetric = DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC.get(settings); + this.decreaseShardsMetric = DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC.get(settings); this.nowSupplier = nowSupplier; } @@ -124,74 +174,102 @@ public void init() { clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS, this::updateMaxWriteThreads); clusterService.getClusterSettings() .addSettingsUpdateConsumer(DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING, this::updateDataStreamExcludePatterns); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC, this::updateIncreaseShardsMetric); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC, this::updateDecreaseShardsMetric); } /** * Computes the optimal number of shards for the provided data stream according to the write index's indexing load (to check if we must * increase the number of shards, whilst the heuristics for decreasing the number of shards _might_ use the provided write indexing * load). - * The result type will indicate the recommendation of the auto sharding service : - * - not applicable if the data stream is excluded from auto sharding as configured by + * + *
<p>The result type will indicate the recommendation of the auto sharding service: + * <ul> + * <li>not applicable if the data stream is excluded from auto sharding as configured by * {@link #DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING} or if the auto sharding functionality is disabled according to - * {@link #DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING}, or if the cluster doesn't have the feature available - * - increase number of shards if the optimal number of shards it deems necessary for the provided data stream is GT the current number - * of shards - * - decrease the number of shards if the optimal number of shards it deems necessary for the provided data stream is LT the current + * {@link #DATA_STREAMS_AUTO_SHARDING_ENABLED}, or if the write index's indexing load is not available + * <li>increase number of shards if the optimal number of shards it deems necessary for the provided data stream is GT the current + * number of shards + * <li>decrease the number of shards if the optimal number of shards it deems necessary for the provided data stream is LT the current * number of shards + * </ul> * - * If the recommendation is to INCREASE/DECREASE shards the reported cooldown period will be TimeValue.ZERO. + * <p>If the recommendation is to INCREASE/DECREASE shards the reported cooldown period will be TimeValue.ZERO. * If the auto sharding service thinks the number of shards must be changed but it can't recommend a change due to the cooldown * period not lapsing, the result will be of type {@link AutoShardingType#COOLDOWN_PREVENTED_INCREASE} or * {@link AutoShardingType#COOLDOWN_PREVENTED_INCREASE} with the remaining cooldown configured and the number of shards that should * be configured for the data stream once the remaining cooldown lapses as the target number of shards. * - * The NOT_APPLICABLE type result will report a cooldown period of TimeValue.MAX_VALUE. + * <p>The NOT_APPLICABLE type result will report a cooldown period of TimeValue.MAX_VALUE. * - * The NO_CHANGE_REQUIRED type will potentially report the remaining cooldown always report a cool down period of TimeValue.ZERO (as + * <p>
The NO_CHANGE_REQUIRED type will potentially report the remaining cooldown always report a cool down period of TimeValue.ZERO (as * there'll be no new auto sharding event) */ - public AutoShardingResult calculate(ProjectState state, DataStream dataStream, @Nullable Double writeIndexLoad) { + public AutoShardingResult calculate( + ProjectState state, + DataStream dataStream, + @Nullable Double writeIndexLoad, + @Nullable Double writeIndexRecentLoad, + @Nullable Double writeIndexPeakLoad + ) { if (isAutoShardingEnabled == false) { - logger.debug("Data stream auto sharding service is not enabled."); + logger.debug("Data stream auto-sharding service is not enabled."); return NOT_APPLICABLE_RESULT; } if (dataStreamExcludePatterns.stream().anyMatch(pattern -> Regex.simpleMatch(pattern, dataStream.getName()))) { logger.debug( - "Data stream [{}] is excluded from auto sharding via the [{}] setting", + "Data stream [{}] is excluded from auto-sharding via the [{}] setting", dataStream.getName(), DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING.getKey() ); return NOT_APPLICABLE_RESULT; } - if (writeIndexLoad == null) { + Double writeIndexLoadForIncrease = pickMetric(increaseShardsMetric, writeIndexLoad, writeIndexRecentLoad, writeIndexPeakLoad); + Double writeIndexLoadForDecrease = pickMetric(decreaseShardsMetric, writeIndexLoad, writeIndexRecentLoad, writeIndexPeakLoad); + + if (writeIndexLoadForIncrease == null || writeIndexLoadForDecrease == null) { logger.debug( - "Data stream auto sharding service cannot compute the optimal number of shards for data stream [{}] as the write index " - + "load is not available", + "Data stream auto-sharding service cannot compute the optimal number of shards for data stream [{}] as the write index " + + "loads are not available", dataStream.getName() ); return NOT_APPLICABLE_RESULT; } - return innerCalculate(state.metadata(), dataStream, writeIndexLoad, nowSupplier); + + logger.trace( + "Data stream auto-sharding service calculating recommendation with all-time load {}, recent load {}, peak load {}, " + + "using {} for increase and {} for decrease", + writeIndexLoad, + writeIndexRecentLoad, + writeIndexPeakLoad, + increaseShardsMetric, + decreaseShardsMetric + ); + + return innerCalculate(state.metadata(), dataStream, writeIndexLoadForIncrease, writeIndexLoadForDecrease, nowSupplier); } private AutoShardingResult innerCalculate( ProjectMetadata project, DataStream dataStream, - double writeIndexLoad, + double writeIndexLoadForIncrease, + double writeIndexLoadForDecrease, LongSupplier nowSupplier ) { // increasing the number of shards is calculated solely based on the index load of the write index IndexMetadata writeIndex = project.index(dataStream.getWriteIndex()); assert writeIndex != null : "the data stream write index must exist in the provided cluster metadata"; - AutoShardingResult increaseShardsResult = getIncreaseShardsResult(dataStream, writeIndexLoad, nowSupplier, writeIndex); + AutoShardingResult increaseShardsResult = getIncreaseShardsResult(dataStream, writeIndexLoadForIncrease, nowSupplier, writeIndex); return Objects.requireNonNullElseGet( increaseShardsResult, () -> getDecreaseShardsResult( project, dataStream, - writeIndexLoad, + writeIndexLoadForDecrease, nowSupplier, writeIndex, getRemainingDecreaseShardsCooldown(project, dataStream) @@ -203,12 +281,22 @@ private AutoShardingResult innerCalculate( @Nullable private AutoShardingResult getIncreaseShardsResult( DataStream dataStream, - double writeIndexLoad, + double writeIndexLoadForIncrease, 
LongSupplier nowSupplier, IndexMetadata writeIndex ) { // increasing the number of shards is calculated solely based on the index load of the write index - long optimalShardCount = computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, writeIndexLoad); + long optimalShardCount = computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, writeIndexLoadForIncrease); + logger.trace( + "Calculated the optimal number of shards for a potential increase in number of shards for data stream [{}] as [{}]" + + " with the {} indexing load [{}] for the write index assuming [{}-{}] threads per shard", + dataStream.getName(), + optimalShardCount, + increaseShardsMetric, + writeIndexLoadForIncrease, + minWriteThreads, + maxWriteThreads + ); if (optimalShardCount > writeIndex.getNumberOfShards()) { TimeValue timeSinceLastAutoShardingEvent = dataStream.getAutoShardingEvent() != null ? dataStream.getAutoShardingEvent().getTimeSinceLastAutoShardingEvent(nowSupplier) @@ -218,7 +306,7 @@ private AutoShardingResult getIncreaseShardsResult( Math.max(0L, increaseShardsCooldown.millis() - timeSinceLastAutoShardingEvent.millis()) ); logger.debug( - "data stream autosharding service recommends increasing the number of shards from [{}] to [{}] after [{}] cooldown for " + "Data stream auto-sharding service recommends increasing the number of shards from [{}] to [{}] after [{}] cooldown for " + "data stream [{}]", writeIndex.getNumberOfShards(), optimalShardCount, @@ -230,7 +318,7 @@ private AutoShardingResult getIncreaseShardsResult( writeIndex.getNumberOfShards(), Math.toIntExact(optimalShardCount), coolDownRemaining, - writeIndexLoad + writeIndexLoadForIncrease ); } return null; @@ -264,7 +352,7 @@ private TimeValue getRemainingDecreaseShardsCooldown(ProjectMetadata project, Da private AutoShardingResult getDecreaseShardsResult( ProjectMetadata project, DataStream dataStream, - double writeIndexLoad, + double writeIndexLoadForDecrease, LongSupplier nowSupplier, IndexMetadata writeIndex, TimeValue remainingReduceShardsCooldown @@ -272,21 +360,27 @@ private AutoShardingResult getDecreaseShardsResult( double maxIndexLoadWithinCoolingPeriod = getMaxIndexLoadWithinCoolingPeriod( project, dataStream, - writeIndexLoad, + writeIndexLoadForDecrease, reduceShardsCooldown, - nowSupplier + nowSupplier, + decreaseShardsMetric ); + long optimalShardCount = computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, maxIndexLoadWithinCoolingPeriod); logger.trace( - "calculating the optimal number of shards for a potential decrease in number of shards for data stream [{}] with the" - + " max indexing load [{}] over the decrease shards cool down period", + "Calculated the optimal number of shards for a potential decrease in number of shards for data stream [{}] as [{}]" + + " shards, using a max {} indexing load [{}] over the cool down period [{}] assuming [{}-{}] threads per shard", dataStream.getName(), - maxIndexLoadWithinCoolingPeriod + optimalShardCount, + decreaseShardsMetric, + maxIndexLoadWithinCoolingPeriod, + reduceShardsCooldown, + minWriteThreads, + maxWriteThreads ); - long optimalShardCount = computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, maxIndexLoadWithinCoolingPeriod); if (optimalShardCount < writeIndex.getNumberOfShards()) { logger.debug( - "data stream autosharding service recommends decreasing the number of shards from [{}] to [{}] after [{}] cooldown for " + "data stream auto-sharding service recommends decreasing the number of shards from [{}] to [{}] after [{}] cooldown for " + 
"data stream [{}]", writeIndex.getNumberOfShards(), optimalShardCount, @@ -307,7 +401,7 @@ private AutoShardingResult getDecreaseShardsResult( } logger.trace( - "data stream autosharding service recommends maintaining the number of shards [{}] for data stream [{}]", + "data stream auto-sharding service recommends maintaining the number of shards [{}] for data stream [{}]", writeIndex.getNumberOfShards(), dataStream.getName() ); @@ -355,9 +449,10 @@ private static long roundUp(double value) { static double getMaxIndexLoadWithinCoolingPeriod( ProjectMetadata project, DataStream dataStream, - double writeIndexLoad, + double writeIndexLoadForDecrease, TimeValue coolingPeriod, - LongSupplier nowSupplier + LongSupplier nowSupplier, + WriteLoadMetric decreaseShardsMetric ) { // for reducing the number of shards we look at more than just the write index List writeLoadsWithinCoolingPeriod = DataStream.getIndicesWithinMaxAgeRange( @@ -377,12 +472,19 @@ static double getMaxIndexLoadWithinCoolingPeriod( .toList(); // assume the current write index load is the highest observed and look back to find the actual maximum - double maxIndexLoadWithinCoolingPeriod = writeIndexLoad; + double maxIndexLoadWithinCoolingPeriod = writeIndexLoadForDecrease; for (IndexWriteLoad writeLoad : writeLoadsWithinCoolingPeriod) { double totalIndexLoad = 0; for (int shardId = 0; shardId < writeLoad.numberOfShards(); shardId++) { - final OptionalDouble writeLoadForShard = writeLoad.getWriteLoadForShard(shardId); - totalIndexLoad += writeLoadForShard.orElse(0); + Double writeLoadForShard = pickMetric( + decreaseShardsMetric, + optionalDoubleToNullable(writeLoad.getWriteLoadForShard(shardId)), + optionalDoubleToNullable(writeLoad.getRecentWriteLoadForShard(shardId)), + optionalDoubleToNullable(writeLoad.getPeakWriteLoadForShard(shardId)) + ); + if (writeLoadForShard != null) { + totalIndexLoad += writeLoadForShard; + } } if (totalIndexLoad > maxIndexLoadWithinCoolingPeriod) { maxIndexLoadWithinCoolingPeriod = totalIndexLoad; @@ -410,4 +512,29 @@ void updateMaxWriteThreads(int maxNumberWriteThreads) { private void updateDataStreamExcludePatterns(List newExcludePatterns) { this.dataStreamExcludePatterns = newExcludePatterns; } + + private void updateIncreaseShardsMetric(WriteLoadMetric newMetric) { + this.increaseShardsMetric = newMetric; + } + + private void updateDecreaseShardsMetric(WriteLoadMetric newMetric) { + this.decreaseShardsMetric = newMetric; + } + + private static Double pickMetric( + WriteLoadMetric metric, + Double writeIndexLoad, + Double writeIndexRecentLoad, + Double writeIndexPeakLoad + ) { + return switch (metric) { + case ALL_TIME -> writeIndexLoad; + case RECENT -> writeIndexRecentLoad != null ? writeIndexRecentLoad : writeIndexLoad; // fall-back to all-time metric if null + case PEAK -> writeIndexPeakLoad != null ? writeIndexPeakLoad : writeIndexLoad; // fall-back to all-time metric if null + }; + } + + private static Double optionalDoubleToNullable(OptionalDouble optional) { + return optional.isPresent() ? 
optional.getAsDouble() : null; + } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 9cee0872f3f7d..b619ad1e566fc 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -230,6 +230,8 @@ public void apply(Settings value, Settings current, Settings previous) { DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING, DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS, DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MIN_WRITE_THREADS, + DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC, + DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC, DesiredBalanceComputer.PROGRESS_LOG_INTERVAL_SETTING, DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING, DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING, diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java index 75ce2e8345402..64245c26b17b9 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition; import org.elasticsearch.action.admin.indices.rollover.RolloverInfo; +import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.WriteLoadMetric; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.DataStream; @@ -49,7 +50,10 @@ import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.DECREASE_SHARDS; import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.INCREASE_SHARDS; import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.NO_CHANGE_REQUIRED; +import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC; +import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; public class DataStreamAutoShardingServiceTests extends ESTestCase { @@ -57,6 +61,7 @@ public class DataStreamAutoShardingServiceTests extends ESTestCase { private static final int MIN_WRITE_THREADS = 2; private static final int MAX_WRITE_THREADS = 32; + private ClusterSettings clusterSettings; private ClusterService clusterService; private ThreadPool threadPool; private DataStreamAutoShardingService service; @@ -75,7 +80,7 @@ public void setupService() { Setting.Property.NodeScope ) ); - ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, builtInClusterSettings); + clusterSettings = new ClusterSettings(Settings.EMPTY, builtInClusterSettings); clusterService = createClusterService(threadPool, clusterSettings); now = 
System.currentTimeMillis(); service = new DataStreamAutoShardingService( @@ -83,6 +88,7 @@ public void setupService() { clusterService, () -> now ); + service.init(); dataStreamName = randomAlphaOfLengthBetween(10, 100); logger.info("-> data stream name is [{}]", dataStreamName); } @@ -119,15 +125,37 @@ public void testCalculateValidations() { System::currentTimeMillis ); - AutoShardingResult autoShardingResult = disabledAutoshardingService.calculate(state.projectState(projectId), dataStream, 2.0); + AutoShardingResult autoShardingResult = disabledAutoshardingService.calculate( + state.projectState(projectId), + dataStream, + 2.0, + 9999.0, + 9999.0 + ); assertThat(autoShardingResult, is(NOT_APPLICABLE_RESULT)); } { - // null write load passed - AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, null); + // null ALL_TIME write load passed (used by default) + AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, null, 9999.0, 9999.0); assertThat(autoShardingResult, is(NOT_APPLICABLE_RESULT)); } + + { + // null RECENT write load passed + doWithMetricSelection(DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC, WriteLoadMetric.RECENT, () -> { + AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, null, null, 9999.0); + assertThat(autoShardingResult, is(NOT_APPLICABLE_RESULT)); + }); + } + + { + // null PEAK write load passed + doWithMetricSelection(DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC, WriteLoadMetric.PEAK, () -> { + AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, null, 9999.0, null); + assertThat(autoShardingResult, is(NOT_APPLICABLE_RESULT)); + }); + } } public void testCalculateIncreaseShardingRecommendations_noPreviousShardingEvent() { @@ -149,7 +177,7 @@ public void testCalculateIncreaseShardingRecommendations_noPreviousShardingEvent .putProjectMetadata(builder.build()) .build(); - AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 2.5); + AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 2.5, 9999.0, 9999.0); assertThat(autoShardingResult.type(), is(INCREASE_SHARDS)); // no pre-existing scaling event so the cool down must be zero assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO)); @@ -184,7 +212,7 @@ public void testCalculateIncreaseShardingRecommendations_preventedByCooldown() { .putProjectMetadata(builder.build()) .build(); - AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 2.5); + AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 2.5, 9999.0, 9999.0); assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_INCREASE)); // no pre-existing scaling event so the cool down must be zero assertThat(autoShardingResult.targetNumberOfShards(), is(3)); @@ -218,13 +246,69 @@ public void testCalculateIncreaseShardingRecommendations_notPreventedByPreviousI .putProjectMetadata(builder.build()) .build(); - AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 2.5); + AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 2.5, 9999.0, 9999.0); assertThat(autoShardingResult.type(), is(INCREASE_SHARDS)); // no pre-existing scaling event so the cool down must be zero 
        assertThat(autoShardingResult.targetNumberOfShards(), is(3));
         assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO));
     }
 
+    public void testCalculateIncreaseShardingRecommendations_usingRecentWriteLoad() {
+        // Repeats testCalculateIncreaseShardingRecommendations_noPreviousShardingEvent but with RECENT rather than ALL_TIME write load
+        var projectId = randomProjectIdOrDefault();
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId);
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            1,
+            now,
+            List.of(now - 10_000, now - 7000, now - 5000, now - 2000, now - 1000),
+            getWriteLoad(1, 9999.0, 9999.0, 9999.0), // not used for increase calculation
+            null
+        );
+        builder.put(dataStream);
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
+            .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2")))
+            .putProjectMetadata(builder.build())
+            .build();
+
+        doWithMetricSelection(DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC, WriteLoadMetric.RECENT, () -> {
+            AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 9999.0, 2.5, 9999.0);
+            assertThat(autoShardingResult.type(), is(INCREASE_SHARDS));
+            // no pre-existing scaling event so the cool down must be zero
+            assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO));
+            assertThat(autoShardingResult.targetNumberOfShards(), is(3));
+        });
+    }
+
+    public void testCalculateIncreaseShardingRecommendations_usingPeakWriteLoad() {
+        // Repeats testCalculateIncreaseShardingRecommendations_noPreviousShardingEvent but with PEAK rather than ALL_TIME write load
+        var projectId = randomProjectIdOrDefault();
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId);
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            1,
+            now,
+            List.of(now - 10_000, now - 7000, now - 5000, now - 2000, now - 1000),
+            getWriteLoad(1, 9999.0, 9999.0, 9999.0), // not used for increase calculation
+            null
+        );
+        builder.put(dataStream);
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
+            .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2")))
+            .putProjectMetadata(builder.build())
+            .build();
+
+        doWithMetricSelection(DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC, WriteLoadMetric.PEAK, () -> {
+            AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 9999.0, 9999.0, 2.5);
+            assertThat(autoShardingResult.type(), is(INCREASE_SHARDS));
+            // no pre-existing scaling event so the cool down must be zero
+            assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO));
+            assertThat(autoShardingResult.targetNumberOfShards(), is(3));
+        });
+    }
+
     public void testCalculateDecreaseShardingRecommendations_dataStreamNotOldEnough() {
         // the input is a data stream with 5 backing indices with 3 shards each
         // testing a decrease-shards event prevented by the cool down period not lapsing due to the oldest generation index being
@@ -246,7 +330,7 @@ public void testCalculateDecreaseShardingRecommendations_dataStreamNotOldEnough(
             .putProjectMetadata(builder.build())
             .build();
 
-        AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0);
+        AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0, 9999.0, 9999.0);
         // the cooldown period for the decrease shards event hasn't
lapsed since the data stream was created
         assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_DECREASE));
         assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.timeValueMillis(TimeValue.timeValueDays(3).millis() - 10_000)));
@@ -279,7 +363,7 @@ public void testCalculateDecreaseShardingRecommendations_noPreviousShardingEvent
             .putProjectMetadata(builder.build())
             .build();
 
-        AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0);
+        AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0, 9999.0, 9999.0);
         assertThat(autoShardingResult.type(), is(DECREASE_SHARDS));
         assertThat(autoShardingResult.targetNumberOfShards(), is(1));
         // no pre-existing auto sharding event however we have old enough backing indices (older than the cooldown period) so we can
@@ -319,7 +403,7 @@ public void testCalculateDecreaseShardingRecommendations_notPreventedByPreviousD
             .putProjectMetadata(builder.build())
             .build();
 
-        AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0);
+        AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0, 9999.0, 9999.0);
         assertThat(autoShardingResult.type(), is(DECREASE_SHARDS));
         assertThat(autoShardingResult.targetNumberOfShards(), is(1));
         assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO));
@@ -357,7 +441,7 @@ public void testCalculateDecreaseShardingRecommendations_preventedByCooldown() {
             .putProjectMetadata(builder.build())
             .build();
 
-        AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0);
+        AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0, 9999.0, 9999.0);
         assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_DECREASE));
         assertThat(autoShardingResult.targetNumberOfShards(), is(1));
         assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.timeValueDays(1)));
@@ -390,12 +474,78 @@ public void testCalculateDecreaseShardingRecommendations_noChangeRequired() {
             .putProjectMetadata(builder.build())
             .build();
 
-        AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 4.0);
+        AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 4.0, 9999.0, 9999.0);
         assertThat(autoShardingResult.type(), is(NO_CHANGE_REQUIRED));
         assertThat(autoShardingResult.targetNumberOfShards(), is(3));
         assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO));
     }
 
+    public void testCalculateDecreaseShardingRecommendations_usingRecentWriteLoad() {
+        // Repeats testCalculateDecreaseShardingRecommendations_noPreviousShardingEvent but with RECENT rather than ALL_TIME write load
+        var projectId = randomProjectIdOrDefault();
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId);
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            3,
+            now,
+            List.of(
+                now - TimeValue.timeValueDays(21).getMillis(),
+                now - TimeValue.timeValueDays(15).getMillis(),
+                now - TimeValue.timeValueDays(4).getMillis(),
+                now - TimeValue.timeValueDays(2).getMillis(),
+                now - 1000
+            ),
+            getWriteLoad(3, 9999.0, 0.333, 9999.0),
+            null
+        );
+        builder.put(dataStream);
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
+            .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2")))
.putProjectMetadata(builder.build())
+            .build();
+
+        doWithMetricSelection(DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC, WriteLoadMetric.RECENT, () -> {
+            AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 0.5, 1.0, 9999.0);
+            assertThat(autoShardingResult.type(), is(DECREASE_SHARDS));
+            assertThat(autoShardingResult.targetNumberOfShards(), is(1));
+            assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO));
+        });
+    }
+
+    public void testCalculateDecreaseShardingRecommendations_usingPeakWriteLoad() {
+        // Repeats testCalculateDecreaseShardingRecommendations_noPreviousShardingEvent but with PEAK rather than ALL_TIME write load
+        var projectId = randomProjectIdOrDefault();
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId);
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            3,
+            now,
+            List.of(
+                now - TimeValue.timeValueDays(21).getMillis(),
+                now - TimeValue.timeValueDays(15).getMillis(),
+                now - TimeValue.timeValueDays(4).getMillis(),
+                now - TimeValue.timeValueDays(2).getMillis(),
+                now - 1000
+            ),
+            getWriteLoad(3, 9999.0, 9999.0, 0.333),
+            null
+        );
+        builder.put(dataStream);
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
+            .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2")))
+            .putProjectMetadata(builder.build())
+            .build();
+
+        doWithMetricSelection(DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC, WriteLoadMetric.PEAK, () -> {
+            AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 0.5, 9999.0, 1.0);
+            assertThat(autoShardingResult.type(), is(DECREASE_SHARDS));
+            assertThat(autoShardingResult.targetNumberOfShards(), is(1));
+            assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO));
+        });
+    }
+
     public void testComputeOptimalNumberOfShards_zeroLoad() {
         assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 0.0), is(1L));
     }
@@ -433,7 +583,7 @@ public void testComputeOptimalNumberOfShards_load180() {
         assertThat(DataStreamAutoShardingService.computeOptimalNumberOfShards(MIN_WRITE_THREADS, MAX_WRITE_THREADS, 180.0), is(12L));
     }
 
-    public void testGetMaxIndexLoadWithinCoolingPeriod() {
+    public void testGetMaxIndexLoadWithinCoolingPeriod_withLongHistory() {
         final TimeValue coolingPeriod = TimeValue.timeValueDays(3);
 
         final Metadata.Builder metadataBuilder = Metadata.builder();
@@ -493,7 +643,7 @@ public void testGetMaxIndexLoadWithinCoolingPeriod() {
         final IndexMetadata writeIndexMetadata = createIndexMetadata(
             writeIndexName,
             3,
-            getWriteLoad(3, 1.0, 9999.0, 9999.0),
+            null,
             System.currentTimeMillis()
         );
         backingIndices.add(writeIndexMetadata.getIndex());
         metadataBuilder.put(writeIndexMetadata, false);
@@ -512,7 +662,8 @@ public void testGetMaxIndexLoadWithinCoolingPeriod() {
             dataStream,
             3.0,
             coolingPeriod,
-            () -> now
+            () -> now,
+            WriteLoadMetric.ALL_TIME
         );
         // to cover the entire cooldown period, the last index before the cooling period is taken into account
         assertThat(maxIndexLoadWithinCoolingPeriod, is(lastIndexBeforeCoolingPeriodHasLowWriteLoad ?
15.0 : 999.0));
     }
 
@@ -554,7 +705,7 @@ public void testGetMaxIndexLoadWithinCoolingPeriod_sumsShardLoads() {
         final IndexMetadata writeIndexMetadata = createIndexMetadata(
             writeIndexName,
             3,
-            getWriteLoad(3, 0.1, 9999.0, 9999.0),
+            null,
             System.currentTimeMillis()
         );
         backingIndices.add(writeIndexMetadata.getIndex());
         metadataBuilder.put(writeIndexMetadata, false);
@@ -573,11 +724,153 @@ public void testGetMaxIndexLoadWithinCoolingPeriod_sumsShardLoads() {
             dataStream,
             0.1,
             coolingPeriod,
-            () -> now
+            () -> now,
+            WriteLoadMetric.ALL_TIME
         );
         assertThat(maxIndexLoadWithinCoolingPeriod, is(expectedIsSumOfShardLoads));
     }
 
+    public void testGetMaxIndexLoadWithinCoolingPeriod_usingAllTimeWriteLoad() {
+        TimeValue coolingPeriod = TimeValue.timeValueDays(3);
+        Metadata.Builder metadataBuilder = Metadata.builder();
+        List<Index> backingIndices = new ArrayList<>();
+        String dataStreamName = "logs";
+        long now = System.currentTimeMillis();
+        long createdAt = now - (coolingPeriod.getMillis() / 2);
+
+        IndexMetadata indexMetadata;
+        indexMetadata = createIndexMetadata(
+            DataStream.getDefaultBackingIndexName(dataStreamName, 0, createdAt),
+            3,
+            getWriteLoad(3, 5.0, 9999.0, 9999.0),
+            createdAt
+        );
+        backingIndices.add(indexMetadata.getIndex());
+        metadataBuilder.put(indexMetadata, false);
+
+        String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+        IndexMetadata writeIndexMetadata = createIndexMetadata(
+            writeIndexName,
+            3,
+            null,
+            System.currentTimeMillis()
+        );
+        backingIndices.add(writeIndexMetadata.getIndex());
+        metadataBuilder.put(writeIndexMetadata, false);
+
+        DataStream dataStream = DataStream.builder(dataStreamName, backingIndices)
+            .setGeneration(2)
+            .setMetadata(Map.of())
+            .setIndexMode(IndexMode.STANDARD)
+            .build();
+
+        metadataBuilder.put(dataStream);
+
+        double maxIndexLoadWithinCoolingPeriod = DataStreamAutoShardingService.getMaxIndexLoadWithinCoolingPeriod(
+            metadataBuilder.build().getProject(),
+            dataStream,
+            3.0,
+            coolingPeriod,
+            () -> now,
+            WriteLoadMetric.ALL_TIME
+        );
+        assertThat(maxIndexLoadWithinCoolingPeriod, equalTo(3 * 5.0));
+    }
+
+    public void testGetMaxIndexLoadWithinCoolingPeriod_usingRecentWriteLoad() {
+        TimeValue coolingPeriod = TimeValue.timeValueDays(3);
+        Metadata.Builder metadataBuilder = Metadata.builder();
+        List<Index> backingIndices = new ArrayList<>();
+        String dataStreamName = "logs";
+        long now = System.currentTimeMillis();
+        long createdAt = now - (coolingPeriod.getMillis() / 2);
+
+        IndexMetadata indexMetadata;
+        indexMetadata = createIndexMetadata(
+            DataStream.getDefaultBackingIndexName(dataStreamName, 0, createdAt),
+            3,
+            getWriteLoad(3, 9999.0, 5.0, 9999.0),
+            createdAt
+        );
+        backingIndices.add(indexMetadata.getIndex());
+        metadataBuilder.put(indexMetadata, false);
+
+        String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+        IndexMetadata writeIndexMetadata = createIndexMetadata(
+            writeIndexName,
+            3,
+            null,
+            System.currentTimeMillis()
+        );
+        backingIndices.add(writeIndexMetadata.getIndex());
+        metadataBuilder.put(writeIndexMetadata, false);
+
+        DataStream dataStream = DataStream.builder(dataStreamName, backingIndices)
+            .setGeneration(2)
+            .setMetadata(Map.of())
+            .setIndexMode(IndexMode.STANDARD)
+            .build();
+
+        metadataBuilder.put(dataStream);
+
+        double maxIndexLoadWithinCoolingPeriod = DataStreamAutoShardingService.getMaxIndexLoadWithinCoolingPeriod(
+            metadataBuilder.build().getProject(),
+            dataStream,
+            3.0,
+            coolingPeriod,
+            () -> now,
+            WriteLoadMetric.RECENT
+        );
+        assertThat(maxIndexLoadWithinCoolingPeriod, equalTo(3 * 5.0));
+    }
+
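+    // Editor's note, not part of the original patch: the metric-specific tests above and below follow one
+    // pattern. The chosen metric carries a meaningful per-shard load (5.0) while the other metrics carry the
+    // 9999.0 sentinel, so picking the wrong metric would be visible in the summed result. A minimal sketch of
+    // the arithmetic these assertions rely on, assuming the service sums the selected metric across shards:
+    //
+    //     int numberOfShards = 3;
+    //     double perShardLoad = 5.0;
+    //     double expectedMaxIndexLoad = numberOfShards * perShardLoad; // 15.0, i.e. equalTo(3 * 5.0)
+    //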
+    public void testGetMaxIndexLoadWithinCoolingPeriod_usingPeakWriteLoad() {
+        TimeValue coolingPeriod = TimeValue.timeValueDays(3);
+        Metadata.Builder metadataBuilder = Metadata.builder();
+        List<Index> backingIndices = new ArrayList<>();
+        String dataStreamName = "logs";
+        long now = System.currentTimeMillis();
+        long createdAt = now - (coolingPeriod.getMillis() / 2);
+
+        IndexMetadata indexMetadata;
+        indexMetadata = createIndexMetadata(
+            DataStream.getDefaultBackingIndexName(dataStreamName, 0, createdAt),
+            3,
+            getWriteLoad(3, 9999.0, 9999.0, 5.0),
+            createdAt
+        );
+        backingIndices.add(indexMetadata.getIndex());
+        metadataBuilder.put(indexMetadata, false);
+
+        String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+        IndexMetadata writeIndexMetadata = createIndexMetadata(
+            writeIndexName,
+            3,
+            null,
+            System.currentTimeMillis()
+        );
+        backingIndices.add(writeIndexMetadata.getIndex());
+        metadataBuilder.put(writeIndexMetadata, false);
+
+        DataStream dataStream = DataStream.builder(dataStreamName, backingIndices)
+            .setGeneration(2)
+            .setMetadata(Map.of())
+            .setIndexMode(IndexMode.STANDARD)
+            .build();
+
+        metadataBuilder.put(dataStream);
+
+        double maxIndexLoadWithinCoolingPeriod = DataStreamAutoShardingService.getMaxIndexLoadWithinCoolingPeriod(
+            metadataBuilder.build().getProject(),
+            dataStream,
+            3.0,
+            coolingPeriod,
+            () -> now,
+            WriteLoadMetric.PEAK
+        );
+        assertThat(maxIndexLoadWithinCoolingPeriod, equalTo(3 * 5.0));
+    }
+
     public void testAutoShardingResultValidation_increaseShardsShouldNotReportCooldown() {
         expectThrows(
             IllegalArgumentException.class,
@@ -682,4 +975,14 @@ private IndexWriteLoad getWriteLoad(int numberOfShards, double shardWriteLoad, d
         }
         return builder.build();
     }
+
+    private void doWithMetricSelection(Setting<WriteLoadMetric> setting, WriteLoadMetric metric, Runnable action) {
+        clusterSettings.applySettings(Settings.builder().put(setting.getKey(), metric).build());
+        try {
+            action.run();
+        } finally {
+            clusterSettings.applySettings(Settings.builder().put(setting.getKey(), setting.getDefault(Settings.EMPTY)).build());
+        }
+    }
+
 }

From 7dc015d8a33368adccae8de5c25a05958bd65b31 Mon Sep 17 00:00:00 2001
From: elasticsearchmachine
Date: Thu, 27 Mar 2025 17:58:23 +0000
Subject: [PATCH 3/5] [CI] Auto commit changes from spotless

---
 .../DataStreamAutoShardingServiceTests.java   | 35 +++++----------
 1 file changed, 5 insertions(+), 30 deletions(-)

diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java
index 64245c26b17b9..e4555bbc29ee6 100644
--- a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java
@@ -640,12 +640,7 @@ public void testGetMaxIndexLoadWithinCoolingPeriod_withLongHistory() {
         }
 
         final String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size());
-        final IndexMetadata writeIndexMetadata = createIndexMetadata(
-            writeIndexName,
-            3,
-            null,
-            System.currentTimeMillis()
-        );
+        final IndexMetadata writeIndexMetadata = createIndexMetadata(writeIndexName, 3, null, System.currentTimeMillis());
         backingIndices.add(writeIndexMetadata.getIndex());
         metadataBuilder.put(writeIndexMetadata, false);
 
@@ -702,12 +697,7 @@ public void
testGetMaxIndexLoadWithinCoolingPeriod_sumsShardLoads() {
         }
 
         final String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size());
-        final IndexMetadata writeIndexMetadata = createIndexMetadata(
-            writeIndexName,
-            3,
-            null,
-            System.currentTimeMillis()
-        );
+        final IndexMetadata writeIndexMetadata = createIndexMetadata(writeIndexName, 3, null, System.currentTimeMillis());
         backingIndices.add(writeIndexMetadata.getIndex());
         metadataBuilder.put(writeIndexMetadata, false);
 
@@ -749,12 +739,7 @@ public void testGetMaxIndexLoadWithinCoolingPeriod_usingAllTimeWriteLoad() {
         metadataBuilder.put(indexMetadata, false);
 
         String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
-        IndexMetadata writeIndexMetadata = createIndexMetadata(
-            writeIndexName,
-            3,
-            null,
-            System.currentTimeMillis()
-        );
+        IndexMetadata writeIndexMetadata = createIndexMetadata(writeIndexName, 3, null, System.currentTimeMillis());
         backingIndices.add(writeIndexMetadata.getIndex());
         metadataBuilder.put(writeIndexMetadata, false);
 
@@ -796,12 +781,7 @@ public void testGetMaxIndexLoadWithinCoolingPeriod_usingRecentWriteLoad() {
         metadataBuilder.put(indexMetadata, false);
 
         String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
-        IndexMetadata writeIndexMetadata = createIndexMetadata(
-            writeIndexName,
-            3,
-            null,
-            System.currentTimeMillis()
-        );
+        IndexMetadata writeIndexMetadata = createIndexMetadata(writeIndexName, 3, null, System.currentTimeMillis());
         backingIndices.add(writeIndexMetadata.getIndex());
         metadataBuilder.put(writeIndexMetadata, false);
 
@@ -843,12 +823,7 @@ public void testGetMaxIndexLoadWithinCoolingPeriod_usingPeakWriteLoad() {
         metadataBuilder.put(indexMetadata, false);
 
         String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
-        IndexMetadata writeIndexMetadata = createIndexMetadata(
-            writeIndexName,
-            3,
-            null,
-            System.currentTimeMillis()
-        );
+        IndexMetadata writeIndexMetadata = createIndexMetadata(writeIndexName, 3, null, System.currentTimeMillis());
         backingIndices.add(writeIndexMetadata.getIndex());
         metadataBuilder.put(writeIndexMetadata, false);
 

From 422f44f748a5b705c57ccf4ca8cd24bad7cb87f7 Mon Sep 17 00:00:00 2001
From: Pete Gillin
Date: Fri, 28 Mar 2025 11:41:45 +0000
Subject: [PATCH 4/5] Respond to review comment: Make DSASS.calculate() take
 the stats object instead of the three load values

---
 .../rollover/TransportRolloverAction.java     |  25 +--
 .../DataStreamAutoShardingService.java        |  33 ++--
 .../DataStreamAutoShardingServiceTests.java   | 155 ++++++++++++++----
 3 files changed, 146 insertions(+), 67 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java
index e356b466ee391..313849253f3c9 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java
@@ -53,7 +53,6 @@
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.shard.DocsStats;
-import org.elasticsearch.index.shard.IndexingStats;
 import org.elasticsearch.injection.guice.Inject;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
@@ -69,7 +68,6 @@
 import java.util.Objects;
 import java.util.Optional;
 import
java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -280,19 +278,12 @@ protected void masterOperation(
             final IndexAbstraction indexAbstraction = projectMetadata.getIndicesLookup().get(resolvedRolloverTarget.resource());
             if (indexAbstraction.getType().equals(IndexAbstraction.Type.DATA_STREAM)) {
                 DataStream dataStream = (DataStream) indexAbstraction;
-                final Optional<IndexStats> indexStats = Optional.ofNullable(statsResponse)
-                    .map(stats -> stats.getIndex(dataStream.getWriteIndex().getName()));
-                rolloverAutoSharding = dataStreamAutoShardingService.calculate(
-                    projectState,
-                    dataStream,
-                    indexStats.map(stats -> sumLoadMetrics(stats, IndexingStats.Stats::getWriteLoad)).orElse(null),
-                    indexStats.map(stats -> sumLoadMetrics(stats, IndexingStats.Stats::getRecentWriteLoad)).orElse(null),
-                    indexStats.map(stats -> sumLoadMetrics(stats, IndexingStats.Stats::getPeakWriteLoad)).orElse(null)
-                );
+                IndexStats indexStats = statsResponse != null ? statsResponse.getIndex(dataStream.getWriteIndex().getName()) : null;
+                rolloverAutoSharding = dataStreamAutoShardingService.calculate(projectState, dataStream, indexStats);
                 logger.debug("auto sharding result for data stream [{}] is [{}]", dataStream.getName(), rolloverAutoSharding);
 
                 // if auto sharding recommends increasing the number of shards we want to trigger a rollover even if there are no
-                // other "regular" conditions matching (we want to aggressively increse the number of shards) so we're adding the
+                // other "regular" conditions matching (we want to aggressively increase the number of shards) so we're adding the
                 // automatic {@link OptimalShardCountCondition} to the rollover request conditions so it gets evaluated and triggers
                 // the rollover operation (having this condition met will also provide a useful paper trail as it'll get stored in
                 // the {@link org.elasticsearch.action.admin.indices.rollover.RolloverInfo#metConditions} )
@@ -352,16 +343,6 @@ protected void masterOperation(
         );
     }
 
-    private static Double sumLoadMetrics(IndexStats stats, Function<IndexingStats.Stats, Double> loadMetric) {
-        return Arrays.stream(stats.getShards())
-            .filter(shardStats -> shardStats.getStats().indexing != null)
-            // only take primaries into account as in stateful the replicas also index data
-            .filter(shardStats -> shardStats.getShardRouting().primary())
-            .map(shardStats -> shardStats.getStats().indexing.getTotal())
-            .map(loadMetric)
-            .reduce(0.0, Double::sum);
-    }
-
     private void markForLazyRollover(
         RolloverRequest rolloverRequest,
         ActionListener<RolloverResponse> listener,
diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java b/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java
index d9080d2c86aef..5b020b4e61681 100644
--- a/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java
+++ b/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java
@@ -11,6 +11,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.admin.indices.stats.IndexStats;
 import org.elasticsearch.cluster.ProjectState;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -26,6 +27,7 @@
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.IndexingStats;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import
import java.util.OptionalDouble; @@ -207,13 +209,7 @@ public void init() { *

The NO_CHANGE_REQUIRED type will potentially report the remaining cooldown always report a cool down period of TimeValue.ZERO (as * there'll be no new auto sharding event) */ - public AutoShardingResult calculate( - ProjectState state, - DataStream dataStream, - @Nullable Double writeIndexLoad, - @Nullable Double writeIndexRecentLoad, - @Nullable Double writeIndexPeakLoad - ) { + public AutoShardingResult calculate(ProjectState state, DataStream dataStream, IndexStats writeIndexStats) { if (isAutoShardingEnabled == false) { logger.debug("Data stream auto-sharding service is not enabled."); return NOT_APPLICABLE_RESULT; @@ -228,18 +224,21 @@ public AutoShardingResult calculate( return NOT_APPLICABLE_RESULT; } - Double writeIndexLoadForIncrease = pickMetric(increaseShardsMetric, writeIndexLoad, writeIndexRecentLoad, writeIndexPeakLoad); - Double writeIndexLoadForDecrease = pickMetric(decreaseShardsMetric, writeIndexLoad, writeIndexRecentLoad, writeIndexPeakLoad); - - if (writeIndexLoadForIncrease == null || writeIndexLoadForDecrease == null) { + if (writeIndexStats == null) { logger.debug( "Data stream auto-sharding service cannot compute the optimal number of shards for data stream [{}] as the write index " - + "loads are not available", + + "stats are not available", dataStream.getName() ); return NOT_APPLICABLE_RESULT; } + double writeIndexLoad = sumLoadMetrics(writeIndexStats, IndexingStats.Stats::getWriteLoad); + double writeIndexRecentLoad = sumLoadMetrics(writeIndexStats, IndexingStats.Stats::getRecentWriteLoad); + double writeIndexPeakLoad = sumLoadMetrics(writeIndexStats, IndexingStats.Stats::getPeakWriteLoad); + double writeIndexLoadForIncrease = pickMetric(increaseShardsMetric, writeIndexLoad, writeIndexRecentLoad, writeIndexPeakLoad); + double writeIndexLoadForDecrease = pickMetric(decreaseShardsMetric, writeIndexLoad, writeIndexRecentLoad, writeIndexPeakLoad); + logger.trace( "Data stream auto-sharding service calculating recommendation with all-time load {}, recent load {}, peak load {}, " + "using {} for increase and {} for decrease", @@ -253,6 +252,16 @@ public AutoShardingResult calculate( return innerCalculate(state.metadata(), dataStream, writeIndexLoadForIncrease, writeIndexLoadForDecrease, nowSupplier); } + private static double sumLoadMetrics(IndexStats stats, Function loadMetric) { + return Arrays.stream(stats.getShards()) + .filter(shardStats -> shardStats.getStats().indexing != null) + // only take primaries into account as in stateful the replicas also index data + .filter(shardStats -> shardStats.getShardRouting().primary()) + .map(shardStats -> shardStats.getStats().indexing.getTotal()) + .map(loadMetric) + .reduce(0.0, Double::sum); + } + private AutoShardingResult innerCalculate( ProjectMetadata project, DataStream dataStream, diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java index e4555bbc29ee6..1011c20bb95cc 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java @@ -11,6 +11,9 @@ import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition; import org.elasticsearch.action.admin.indices.rollover.RolloverInfo; +import org.elasticsearch.action.admin.indices.stats.CommonStats; +import 
org.elasticsearch.action.admin.indices.stats.IndexStats;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.WriteLoadMetric;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
@@ -23,6 +26,9 @@
 import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
@@ -32,6 +38,8 @@
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexVersion;
+import org.elasticsearch.index.shard.IndexingStats;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -128,34 +136,16 @@ public void testCalculateValidations() {
             AutoShardingResult autoShardingResult = disabledAutoshardingService.calculate(
                 state.projectState(projectId),
                 dataStream,
-                2.0,
-                9999.0,
-                9999.0
+                createIndexStats(1, 2.0, 9999.0, 9999.0)
             );
             assertThat(autoShardingResult, is(NOT_APPLICABLE_RESULT));
         }
 
         {
-            // null ALL_TIME write load passed (used by default)
-            AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, null, 9999.0, 9999.0);
+            // null stats passed
+            AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, null);
             assertThat(autoShardingResult, is(NOT_APPLICABLE_RESULT));
         }
-
-        {
-            // null RECENT write load passed
-            doWithMetricSelection(DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC, WriteLoadMetric.RECENT, () -> {
-                AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, null, null, 9999.0);
-                assertThat(autoShardingResult, is(NOT_APPLICABLE_RESULT));
-            });
-        }
-
-        {
-            // null PEAK write load passed
-            doWithMetricSelection(DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC, WriteLoadMetric.PEAK, () -> {
-                AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, null, 9999.0, null);
-                assertThat(autoShardingResult, is(NOT_APPLICABLE_RESULT));
-            });
-        }
     }
 
@@ -177,7 +167,11 @@ public void testCalculateIncreaseShardingRecommendations_noPreviousShardingEvent
             .putProjectMetadata(builder.build())
             .build();
 
-        AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 2.5, 9999.0, 9999.0);
+        AutoShardingResult autoShardingResult = service.calculate(
+            state.projectState(projectId),
+            dataStream,
+            createIndexStats(1, 2.5, 9999.0, 9999.0)
+        );
         assertThat(autoShardingResult.type(), is(INCREASE_SHARDS));
         // no pre-existing scaling event so the cool down must be zero
         assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO));
@@ -212,7 +206,11 @@ public void testCalculateIncreaseShardingRecommendations_preventedByCooldown() {
             .putProjectMetadata(builder.build())
             .build();
 
-        AutoShardingResult autoShardingResult =
service.calculate(state.projectState(projectId), dataStream, 2.5, 9999.0, 9999.0);
+        AutoShardingResult autoShardingResult = service.calculate(
+            state.projectState(projectId),
+            dataStream,
+            createIndexStats(1, 2.5, 9999.0, 9999.0)
+        );
         assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_INCREASE));
         // no pre-existing scaling event so the cool down must be zero
         assertThat(autoShardingResult.targetNumberOfShards(), is(3));
@@ -246,7 +244,11 @@ public void testCalculateIncreaseShardingRecommendations_notPreventedByPreviousI
             .putProjectMetadata(builder.build())
             .build();
 
-        AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 2.5, 9999.0, 9999.0);
+        AutoShardingResult autoShardingResult = service.calculate(
+            state.projectState(projectId),
+            dataStream,
+            createIndexStats(1, 2.5, 9999.0, 9999.0)
+        );
         assertThat(autoShardingResult.type(), is(INCREASE_SHARDS));
         // no pre-existing scaling event so the cool down must be zero
         assertThat(autoShardingResult.targetNumberOfShards(), is(3));
@@ -273,7 +275,11 @@ public void testCalculateIncreaseShardingRecommendations_usingRecentWriteLoad()
             .build();
 
         doWithMetricSelection(DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC, WriteLoadMetric.RECENT, () -> {
-            AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 9999.0, 2.5, 9999.0);
+            AutoShardingResult autoShardingResult = service.calculate(
+                state.projectState(projectId),
+
dataStream,
+                createIndexStats(1, 9999.0, 2.5, 9999.0)
+            );
             assertThat(autoShardingResult.type(), is(INCREASE_SHARDS));
             // no pre-existing scaling event so the cool down must be zero
             assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO));
@@ -301,7 +307,11 @@ public void testCalculateIncreaseShardingRecommendations_usingPeakWriteLoad() {
             .build();
 
         doWithMetricSelection(DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC, WriteLoadMetric.PEAK, () -> {
-            AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 9999.0, 9999.0, 2.5);
+            AutoShardingResult autoShardingResult = service.calculate(
+                state.projectState(projectId),
+                dataStream,
+                createIndexStats(1, 9999.0, 9999.0, 2.5)
+            );
             assertThat(autoShardingResult.type(), is(INCREASE_SHARDS));
             // no pre-existing scaling event so the cool down must be zero
             assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO));
@@ -330,7 +340,11 @@ public void testCalculateDecreaseShardingRecommendations_dataStreamNotOldEnough(
             .putProjectMetadata(builder.build())
             .build();
 
-        AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0, 9999.0, 9999.0);
+        AutoShardingResult autoShardingResult = service.calculate(
+            state.projectState(projectId),
+            dataStream,
+            createIndexStats(3, 1.0 / 3, 9999.0, 9999.0)
+        );
         // the cooldown period for the decrease shards event hasn't lapsed since the data stream was created
         assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_DECREASE));
         assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.timeValueMillis(TimeValue.timeValueDays(3).millis() - 10_000)));
@@ -363,7 +377,11 @@ public void testCalculateDecreaseShardingRecommendations_noPreviousShardingEvent
             .putProjectMetadata(builder.build())
             .build();
 
-        AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0, 9999.0, 9999.0);
+        AutoShardingResult autoShardingResult = service.calculate(
+            state.projectState(projectId),
+            dataStream,
+            createIndexStats(3, 1.0 / 3, 9999.0, 9999.0)
+        );
         assertThat(autoShardingResult.type(), is(DECREASE_SHARDS));
         assertThat(autoShardingResult.targetNumberOfShards(), is(1));
         // no pre-existing auto sharding event however we have old enough backing indices (older than the cooldown period) so we can
@@ -403,7 +421,11 @@ public void testCalculateDecreaseShardingRecommendations_notPreventedByPreviousD
             .putProjectMetadata(builder.build())
             .build();
 
-        AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0, 9999.0, 9999.0);
+        AutoShardingResult autoShardingResult = service.calculate(
+            state.projectState(projectId),
+            dataStream,
+            createIndexStats(3, 1.0 / 3, 9999.0, 9999.0)
+        );
         assertThat(autoShardingResult.type(), is(DECREASE_SHARDS));
         assertThat(autoShardingResult.targetNumberOfShards(), is(1));
         assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO));
@@ -441,7 +463,11 @@ public void testCalculateDecreaseShardingRecommendations_preventedByCooldown() {
             .putProjectMetadata(builder.build())
             .build();
 
-        AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 1.0, 9999.0, 9999.0);
+        AutoShardingResult autoShardingResult = service.calculate(
+            state.projectState(projectId),
+            dataStream,
+            createIndexStats(3, 1.0 / 3, 9999.0, 9999.0)
+        );
         assertThat(autoShardingResult.type(), is(COOLDOWN_PREVENTED_DECREASE));
         assertThat(autoShardingResult.targetNumberOfShards(), is(1));
         assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.timeValueDays(1)));
@@ -474,7 +500,11 @@ public void testCalculateDecreaseShardingRecommendations_noChangeRequired() {
             .putProjectMetadata(builder.build())
             .build();
 
-        AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 4.0, 9999.0, 9999.0);
+        AutoShardingResult autoShardingResult = service.calculate(
+            state.projectState(projectId),
+            dataStream,
+            createIndexStats(3, 4.0 / 3, 9999.0, 9999.0)
+        );
         assertThat(autoShardingResult.type(), is(NO_CHANGE_REQUIRED));
         assertThat(autoShardingResult.targetNumberOfShards(), is(3));
         assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO));
@@ -506,7 +536,11 @@ public void testCalculateDecreaseShardingRecommendations_usingRecentWriteLoad()
             .build();
 
         doWithMetricSelection(DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC, WriteLoadMetric.RECENT, () -> {
-            AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 0.5, 1.0, 9999.0);
+            AutoShardingResult autoShardingResult = service.calculate(
+                state.projectState(projectId),
+                dataStream,
+                createIndexStats(3, 0.5 / 3, 1.0 / 3, 9999.0)
+            );
             assertThat(autoShardingResult.type(), is(DECREASE_SHARDS));
             assertThat(autoShardingResult.targetNumberOfShards(), is(1));
             assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO));
@@ -539,7 +573,11 @@ public void testCalculateDecreaseShardingRecommendations_usingPeakWriteLoad() {
             .build();
 
         doWithMetricSelection(DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC, WriteLoadMetric.PEAK, () -> {
-            AutoShardingResult autoShardingResult = service.calculate(state.projectState(projectId), dataStream, 0.5, 9999.0, 1.0);
+            AutoShardingResult autoShardingResult = service.calculate(
+                state.projectState(projectId),
+                dataStream,
+                createIndexStats(3, 0.5 / 3, 9999.0, 1.0 / 3)
+            );
             assertThat(autoShardingResult.type(), is(DECREASE_SHARDS));
             assertThat(autoShardingResult.targetNumberOfShards(), is(1));
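+            // Editor's note (an inference from this patch, not original patch text): with calculate() now taking
+            // IndexStats, these tests feed per-shard loads and let the service sum the primaries itself, so the
+            // old index-level values are divided by the shard count, e.g. for this test:
+            //
+            //     createIndexStats(3, 0.5 / 3, 9999.0, 1.0 / 3); // 3 primaries summing to ~0.5 all-time, ~1.0 peak
+            //
+            // (the 9999.0 sentinel is left undivided since it only needs to stay implausibly large)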
+            assertThat(autoShardingResult.coolDownRemaining(), is(TimeValue.ZERO));
@@ -882,6 +920,57 @@ public void testAutoShardingResultValidation_validCooldownPreventedDecrease() {
         assertThat(cooldownPreventedDecrease.coolDownRemaining(), is(TimeValue.timeValueSeconds(7)));
     }
 
+    IndexStats createIndexStats(int numberOfShards, double shardWriteLoad, double shardRecentWriteLoad, double shardPeakWriteLoad) {
+        String indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 99); // the generation number here is not used
+        Index index = new Index(indexName, randomUUID());
+        IndexStats.IndexStatsBuilder builder = new IndexStats.IndexStatsBuilder(indexName, randomUUID(), null, null);
+        for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) {
+            // Add stats for the primary:
+            builder.add(createShardStats(shardWriteLoad, shardRecentWriteLoad, shardPeakWriteLoad, new ShardId(index, shardNumber), true));
+            // Add stats for a replica, which should be ignored:
+            builder.add(createShardStats(9999.0, 9999.0, 9999.0, new ShardId(index, shardNumber), false));
+        }
+        return builder.build();
+    }
+
+    private static ShardStats createShardStats(
+        double indexingLoad,
+        double recentIndexingLoad,
+        double peakIndexingLoad,
+        ShardId shardId,
+        boolean primary
+    ) {
+        ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, "unused-node-id", primary, ShardRoutingState.STARTED);
+        CommonStats commonStats = new CommonStats();
+        commonStats.indexing = createIndexingStats(indexingLoad, recentIndexingLoad, peakIndexingLoad);
+        return new ShardStats(shardRouting, commonStats, null, null, null, null, null, false, false, 0);
+    }
+
+    private static IndexingStats createIndexingStats(double indexingLoad, double recentIndexingLoad, double peakIndexingLoad) {
+        int totalActiveTimeInNanos = 1_000_000_000;
+        // Use the correct indexing time to give the required all-time load value of indexingLoad (aside from the rounding errors):
+        long totalIndexingTimeSinceShardStartedInNanos = (long) (indexingLoad * totalActiveTimeInNanos);
+        return new IndexingStats(
+            new IndexingStats.Stats(
+                0,
+                0,
+                0,
+                0,
+                0,
+                0,
+                0,
+                0,
+                0,
+                false,
+                0,
+                totalIndexingTimeSinceShardStartedInNanos,
+                totalActiveTimeInNanos,
+                recentIndexingLoad,
+                peakIndexingLoad
+            )
+        );
+    }
+
     private DataStream createDataStream(
         ProjectMetadata.Builder builder,
         String dataStreamName,

From f0193447767ecc2ff7c232231bb45a259bf7d1bc Mon Sep 17 00:00:00 2001
From: Pete Gillin
Date: Fri, 28 Mar 2025 13:51:42 +0000
Subject: [PATCH 5/5] Update
 server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java

Co-authored-by: Mary Gouseti

---
 .../datastreams/autosharding/DataStreamAutoShardingService.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java b/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java
index 5b020b4e61681..e4c5bff660f15 100644
--- a/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java
+++ b/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java
@@ -209,7 +209,7 @@ public void init() {
      *
      * The NO_CHANGE_REQUIRED type will always report a cool down period of TimeValue.ZERO (as
      * there'll be no new auto sharding event)
      */
-    public AutoShardingResult calculate(ProjectState state, DataStream dataStream, IndexStats writeIndexStats) {
+    public AutoShardingResult calculate(ProjectState state, DataStream dataStream, @Nullable IndexStats writeIndexStats) {
         if (isAutoShardingEnabled == false) {
             logger.debug("Data stream auto-sharding service is not enabled.");
             return NOT_APPLICABLE_RESULT;
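
Editor's note on the mechanism these patches converge on (a reconstruction from the hunks above, not a
quote of the final file): after PATCH 4 and 5, calculate() derives all three load figures from the write
index's IndexStats via sumLoadMetrics, and the two cluster settings added earlier in the series choose
which figure feeds the increase and decrease decisions through pickMetric. The diffs only show pickMetric
being called, so the body sketched below is an assumption, a plain switch over the setting's enum values:

    // Hypothetical sketch of the metric selection; only the call sites appear in the diffs above.
    private static double pickMetric(WriteLoadMetric metric, double allTimeLoad, double recentLoad, double peakLoad) {
        return switch (metric) {
            case ALL_TIME -> allTimeLoad; // default behaviour, unchanged from before this series
            case RECENT -> recentLoad;    // selected via DATA_STREAMS_AUTO_SHARDING_*_SHARDS_LOAD_METRIC
            case PEAK -> peakLoad;
        };
    }

Under that assumption, setting DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC to RECENT, as the
usingRecentWriteLoad tests do via doWithMetricSelection, makes only the recent-load figure matter for the
increase recommendation, which is exactly what the 9999.0 sentinels in those tests verify.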