diff --git a/docs/changelog/125477.yaml b/docs/changelog/125477.yaml new file mode 100644 index 0000000000000..316f7a7cba2da --- /dev/null +++ b/docs/changelog/125477.yaml @@ -0,0 +1,6 @@ +pr: 125477 +summary: Prevent get datafeeds stats API returning an error when local tasks are slow to stop +area: Machine Learning +type: bug +issues: + - 104160 diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateAction.java index a2cf82dd30e8c..7dc3015091dae 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateAction.java @@ -146,12 +146,28 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws private final Map datafeedRunningState; + private static RunningState selectMostRecentState(RunningState state1, RunningState state2) { + + if (state1.searchInterval != null && state2.searchInterval != null) { + return state1.searchInterval.startMs() > state2.searchInterval.startMs() ? state1 : state2; + } + + if (state1.searchInterval != null) { + return state1; + } + if (state2.searchInterval != null) { + return state2; + } + + return state2; + } + public static Response fromResponses(List responses) { return new Response( responses.stream() .flatMap(r -> r.datafeedRunningState.entrySet().stream()) .filter(entry -> entry.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Response::selectMostRecentState)) ); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateActionResponseTests.java index 17f402e9fb063..2cb908d5666a3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateActionResponseTests.java @@ -9,6 +9,8 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction.Response; +import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction.Response.RunningState; +import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval; import org.elasticsearch.xpack.core.ml.datafeed.SearchIntervalTests; import java.util.function.Function; @@ -41,4 +43,68 @@ protected Writeable.Reader instanceReader() { return Response::new; } + /** + * Tests merging responses with the same datafeed ID but different running states, + * where both states have a searchInterval with different start times. + * The state with the more recent searchInterval (larger startMs value) should be selected. + */ + public void testMergeWithDuplicateKeysAndDifferentSearchIntervals() { + SearchInterval olderInterval = new SearchInterval(1000L, 2000L); + SearchInterval newerInterval = new SearchInterval(3000L, 4000L); + + RunningState olderState = new RunningState(true, true, olderInterval); + RunningState newerState = new RunningState(false, false, newerInterval); + + String datafeedId = "test-datafeed"; + Response response1 = Response.fromTaskAndState(datafeedId, olderState); + Response response2 = Response.fromTaskAndState(datafeedId, newerState); + + Response mergedResponse = Response.fromResponses(java.util.List.of(response1, response2)); + + assertEquals(newerState, mergedResponse.getRunningState(datafeedId).orElse(null)); + + mergedResponse = Response.fromResponses(java.util.List.of(response2, response1)); + assertEquals(newerState, mergedResponse.getRunningState(datafeedId).orElse(null)); + } + + /** + * Tests merging responses with the same datafeed ID but different running states, + * where only one state has a searchInterval. + * The state with the searchInterval should be selected, regardless of order. + */ + public void testMergeWithDuplicateKeysWhenOnlyOneHasSearchInterval() { + SearchInterval interval = new SearchInterval(1000L, 2000L); + + RunningState stateWithInterval = new RunningState(true, true, interval); + RunningState stateWithoutInterval = new RunningState(false, false, null); + + String datafeedId = "test-datafeed"; + Response response1 = Response.fromTaskAndState(datafeedId, stateWithInterval); + Response response2 = Response.fromTaskAndState(datafeedId, stateWithoutInterval); + + Response mergedResponse = Response.fromResponses(java.util.List.of(response1, response2)); + + assertEquals(stateWithInterval, mergedResponse.getRunningState(datafeedId).orElse(null)); + + mergedResponse = Response.fromResponses(java.util.List.of(response2, response1)); + assertEquals(stateWithInterval, mergedResponse.getRunningState(datafeedId).orElse(null)); + } + + /** + * Tests merging responses with the same datafeed ID but different running states, + * where neither state has a searchInterval. + * In this case, the second state in the list should be selected. + */ + public void testMergeWithDuplicateKeysWhenNeitherHasSearchInterval() { + RunningState state1 = new RunningState(true, true, null); + RunningState state2 = new RunningState(false, false, null); + + String datafeedId = "test-datafeed"; + Response response1 = Response.fromTaskAndState(datafeedId, state1); + Response response2 = Response.fromTaskAndState(datafeedId, state2); + + Response mergedResponse = Response.fromResponses(java.util.List.of(response1, response2)); + + assertEquals(state2, mergedResponse.getRunningState(datafeedId).orElse(null)); + } }