Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Resolve duplicate key exception in GetDatafeedRunningStateAction #125477

Merged
merged 4 commits into from
Apr 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/125477.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 125477
summary: Prevent get datafeeds stats API returning an error when local tasks are slow to stop
area: Machine Learning
type: bug
issues:
- 104160
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,28 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

private final Map<String, RunningState> datafeedRunningState;

private static RunningState selectMostRecentState(RunningState state1, RunningState state2) {

if (state1.searchInterval != null && state2.searchInterval != null) {
return state1.searchInterval.startMs() > state2.searchInterval.startMs() ? state1 : state2;
}

if (state1.searchInterval != null) {
return state1;
}
if (state2.searchInterval != null) {
return state2;
}

return state2;
}

public static Response fromResponses(List<Response> responses) {
return new Response(
responses.stream()
.flatMap(r -> r.datafeedRunningState.entrySet().stream())
.filter(entry -> entry.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Response::selectMostRecentState))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction.Response;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction.Response.RunningState;
import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
import org.elasticsearch.xpack.core.ml.datafeed.SearchIntervalTests;

import java.util.function.Function;
Expand Down Expand Up @@ -41,4 +43,68 @@ protected Writeable.Reader<Response> instanceReader() {
return Response::new;
}

/**
* Tests merging responses with the same datafeed ID but different running states,
* where both states have a searchInterval with different start times.
* The state with the more recent searchInterval (larger startMs value) should be selected.
*/
public void testMergeWithDuplicateKeysAndDifferentSearchIntervals() {
SearchInterval olderInterval = new SearchInterval(1000L, 2000L);
SearchInterval newerInterval = new SearchInterval(3000L, 4000L);

RunningState olderState = new RunningState(true, true, olderInterval);
RunningState newerState = new RunningState(false, false, newerInterval);

String datafeedId = "test-datafeed";
Response response1 = Response.fromTaskAndState(datafeedId, olderState);
Response response2 = Response.fromTaskAndState(datafeedId, newerState);

Response mergedResponse = Response.fromResponses(java.util.List.of(response1, response2));

assertEquals(newerState, mergedResponse.getRunningState(datafeedId).orElse(null));

mergedResponse = Response.fromResponses(java.util.List.of(response2, response1));
assertEquals(newerState, mergedResponse.getRunningState(datafeedId).orElse(null));
}

/**
* Tests merging responses with the same datafeed ID but different running states,
* where only one state has a searchInterval.
* The state with the searchInterval should be selected, regardless of order.
*/
public void testMergeWithDuplicateKeysWhenOnlyOneHasSearchInterval() {
SearchInterval interval = new SearchInterval(1000L, 2000L);

RunningState stateWithInterval = new RunningState(true, true, interval);
RunningState stateWithoutInterval = new RunningState(false, false, null);

String datafeedId = "test-datafeed";
Response response1 = Response.fromTaskAndState(datafeedId, stateWithInterval);
Response response2 = Response.fromTaskAndState(datafeedId, stateWithoutInterval);

Response mergedResponse = Response.fromResponses(java.util.List.of(response1, response2));

assertEquals(stateWithInterval, mergedResponse.getRunningState(datafeedId).orElse(null));

mergedResponse = Response.fromResponses(java.util.List.of(response2, response1));
assertEquals(stateWithInterval, mergedResponse.getRunningState(datafeedId).orElse(null));
}

/**
* Tests merging responses with the same datafeed ID but different running states,
* where neither state has a searchInterval.
* In this case, the second state in the list should be selected.
*/
public void testMergeWithDuplicateKeysWhenNeitherHasSearchInterval() {
RunningState state1 = new RunningState(true, true, null);
RunningState state2 = new RunningState(false, false, null);

String datafeedId = "test-datafeed";
Response response1 = Response.fromTaskAndState(datafeedId, state1);
Response response2 = Response.fromTaskAndState(datafeedId, state2);

Response mergedResponse = Response.fromResponses(java.util.List.of(response1, response2));

assertEquals(state2, mergedResponse.getRunningState(datafeedId).orElse(null));
}
}
Loading