diff --git a/docs/changelog/125636.yaml b/docs/changelog/125636.yaml new file mode 100644 index 0000000000000..3ceaade9f01c2 --- /dev/null +++ b/docs/changelog/125636.yaml @@ -0,0 +1,6 @@ +pr: 125636 +summary: Make `numberOfChannels` consistent with layout map by removing duplicated + `ChannelSet` +area: ES|QL +type: bug +issues: [] diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index b25f57b77f074..a51639fd631d4 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -994,6 +994,32 @@ private void validateResultsOfDoubleParametersForIdentifiers(RequestObjectBuilde assertEquals(List.of(List.of(false, 9.1), List.of(true, 8.1)), result.get("values")); } + public void testMultipleBatchesWithLookupJoin() throws IOException { + assumeTrue( + "Makes numberOfChannels consistent with layout map for join with multiple batches", + EsqlCapabilities.Cap.MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT.isEnabled() + ); + // Create more than 10 indices to trigger multiple batches of data node execution. + // The sort field should be missing on some indices to reproduce NullPointerException caused by duplicated items in layout + for (int i = 1; i <= 20; i++) { + createIndex("idx" + i, randomBoolean(), "\"mappings\": {\"properties\" : {\"a\" : {\"type\" : \"keyword\"}}}"); + } + bulkLoadTestDataLookupMode(10); + // lookup join with and without sort + for (String sort : List.of("", "| sort integer")) { + var query = requestObjectBuilder().query(format(null, "from * | lookup join {} on integer {}", testIndexName(), sort)); + Map result = runEsql(query); + var columns = as(result.get("columns"), List.class); + assertEquals(21, columns.size()); + var values = as(result.get("values"), List.class); + assertEquals(10, values.size()); + } + // clean up + for (int i = 1; i <= 20; i++) { + assertThat(deleteIndex("idx" + i).isAcknowledged(), is(true)); + } + } + public void testErrorMessageForLiteralDateMathOverflow() throws IOException { List dateMathOverflowExpressions = List.of( "2147483647 day + 1 day", @@ -1668,6 +1694,13 @@ private static String repeatValueAsMV(Object value) { return "[" + value + ", " + value + "]"; } + private static void createIndex(String indexName, boolean lookupMode, String mapping) throws IOException { + Request request = new Request("PUT", "/" + indexName); + String settings = "\"settings\" : {\"mode\" : \"lookup\"}, "; + request.setJsonEntity("{" + (lookupMode ? settings : "") + mapping + "}"); + assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + } + public static RequestObjectBuilder requestObjectBuilder() throws IOException { return new RequestObjectBuilder(); } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 1b5de3283fe63..8f025278cab1d 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -1452,3 +1452,90 @@ emp_no:integer | language_code:integer | language_name:keyword 10092 | 1 | English 10093 | 3 | Spanish ; + +multipleBatchesWithSort +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: make_number_of_channels_consistent_with_layout + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| sort language_code, birth_date +| keep language_code +| limit 1 +; + +language_code:integer +1 +; + +multipleBatchesWithMvExpand +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: make_number_of_channels_consistent_with_layout + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| keep birth_date, language_code +| mv_expand birth_date +| sort birth_date, language_code +| limit 1 +; + +birth_date:datetime |language_code:integer +1952-02-27T00:00:00.000Z |null +; + +multipleBatchesWithAggregate1 +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: make_number_of_channels_consistent_with_layout + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| keep birth_date, language_code +| stats x=max(birth_date), y=min(language_code) +; + +x:datetime |y:integer +1965-01-03T00:00:00.000Z |1 +; + +multipleBatchesWithAggregate2 +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: make_number_of_channels_consistent_with_layout + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| keep birth_date, language_code +| stats m=min(birth_date) by language_code +| sort language_code +| limit 1 +; + +m:datetime |language_code:integer +null |1 +; + +multipleBatchesWithAggregate3 +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: make_number_of_channels_consistent_with_layout + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| keep birth_date, language_code +| stats m=min(language_code) by birth_date +| sort birth_date +| limit 1 +; + +m:integer |birth_date:datetime +null |1952-02-27T00:00:00.000Z +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 3846d0e8a65a7..59ddd690646e3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -932,7 +932,12 @@ public enum Cap { /** * Index component selector syntax (my-data-stream-name::failures) */ - INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled()); + INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled()), + + /** + * Make numberOfChannels consistent with layout in DefaultLayout by removing duplicated ChannelSet. + */ + MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT; private final boolean enabled; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java index dafba5e92322c..a707bb1c6c000 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java @@ -107,8 +107,20 @@ public Layout build() { Map layout = new HashMap<>(); int numberOfChannels = 0; for (ChannelSet set : channels) { - int channel = numberOfChannels++; + boolean createNewChannel = true; + int channel = 0; for (NameId id : set.nameIds) { + if (layout.containsKey(id)) { + // If a NameId already exists in the map, do not increase the numberOfChannels, it can cause inverse() to create + // a null in the list of channels, and NullPointerException when build() is called. + // TODO avoid adding duplicated attributes with the same id in the plan, ReplaceMissingFieldWithNull may add nulls + // with the same ids as the missing field ids. + continue; + } + if (createNewChannel) { + channel = numberOfChannels++; + createNewChannel = false; + } ChannelAndType next = new ChannelAndType(channel, set.type); ChannelAndType prev = layout.put(id, next); // Do allow multiple name to point to the same channel - see https://github.com/elastic/elasticsearch/pull/100238