From 32530803b2b151a7fdf73bb0e4e83a758056de0f Mon Sep 17 00:00:00 2001 From: Fang Xing <155562079+fang-xing-esql@users.noreply.github.com> Date: Wed, 26 Mar 2025 14:51:23 -0400 Subject: [PATCH] [ES|QL] Make numberOfChannels consistent with layout map by removing duplicated ChannelSet (#125636) * make numberOfChannels consistent with layout (cherry picked from commit 80125a4bac31f54cc86fdee26ef0ac5dfa61eee3) # Conflicts: # x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java --- docs/changelog/125636.yaml | 6 ++ .../xpack/esql/qa/rest/RestEsqlTestCase.java | 53 ++++++++++- .../src/main/resources/lookup-join.csv-spec | 87 +++++++++++++++++++ .../xpack/esql/action/EsqlCapabilities.java | 7 +- .../xpack/esql/planner/Layout.java | 14 ++- 5 files changed, 162 insertions(+), 5 deletions(-) create mode 100644 docs/changelog/125636.yaml diff --git a/docs/changelog/125636.yaml b/docs/changelog/125636.yaml new file mode 100644 index 0000000000000..3ceaade9f01c2 --- /dev/null +++ b/docs/changelog/125636.yaml @@ -0,0 +1,6 @@ +pr: 125636 +summary: Make `numberOfChannels` consistent with layout map by removing duplicated + `ChannelSet` +area: ES|QL +type: bug +issues: [] diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index 0de555b6b4942..13cfbe32af033 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -33,6 +33,7 @@ import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.esql.AssertWarnings; import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.action.EsqlCapabilities; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -83,9 +84,13 @@ public abstract class RestEsqlTestCase extends ESRestTestCase { private static final String MAPPING_ALL_TYPES; + private static final String MAPPING_ALL_TYPES_LOOKUP; + static { String properties = EsqlTestUtils.loadUtf8TextFile("/mapping-all-types.json"); MAPPING_ALL_TYPES = "{\"mappings\": " + properties + "}"; + String settings = "{\"settings\" : {\"mode\" : \"lookup\"}"; + MAPPING_ALL_TYPES_LOOKUP = settings + ", " + "\"mappings\": " + properties + "}"; } private static final String DOCUMENT_TEMPLATE = """ @@ -796,6 +801,32 @@ public void testErrorMessageForMissingParams() throws IOException { ); } + public void testMultipleBatchesWithLookupJoin() throws IOException { + assumeTrue( + "Makes numberOfChannels consistent with layout map for join with multiple batches", + EsqlCapabilities.Cap.MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT.isEnabled() + ); + // Create more than 10 indices to trigger multiple batches of data node execution. + // The sort field should be missing on some indices to reproduce NullPointerException caused by duplicated items in layout + for (int i = 1; i <= 20; i++) { + createIndex("idx" + i, randomBoolean(), "\"mappings\": {\"properties\" : {\"a\" : {\"type\" : \"keyword\"}}}"); + } + bulkLoadTestDataLookupMode(10); + // lookup join with and without sort + for (String sort : List.of("", "| sort integer")) { + var query = requestObjectBuilder().query(format(null, "from * | lookup join {} on integer {}", testIndexName(), sort)); + Map result = runEsql(query); + var columns = as(result.get("columns"), List.class); + assertEquals(21, columns.size()); + var values = as(result.get("values"), List.class); + assertEquals(10, values.size()); + } + // clean up + for (int i = 1; i <= 20; i++) { + assertThat(deleteIndex("idx" + i).isAcknowledged(), is(true)); + } + } + public void testErrorMessageForLiteralDateMathOverflow() throws IOException { List dateMathOverflowExpressions = List.of( "2147483647 day + 1 day", @@ -1377,13 +1408,22 @@ private static void bulkLoadTestData(int count) throws IOException { bulkLoadTestData(count, 0, true, RestEsqlTestCase::createDocument); } + private static void bulkLoadTestDataLookupMode(int count) throws IOException { + createIndex(testIndexName(), true); + bulkLoadTestData(count, 0, false, RestEsqlTestCase::createDocument); + } + + private static void createIndex(String indexName, boolean lookupMode) throws IOException { + Request request = new Request("PUT", "/" + indexName); + request.setJsonEntity(lookupMode ? MAPPING_ALL_TYPES_LOOKUP : MAPPING_ALL_TYPES); + assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + } + private static void bulkLoadTestData(int count, int firstIndex, boolean createIndex, IntFunction createDocument) throws IOException { Request request; if (createIndex) { - request = new Request("PUT", "/" + testIndexName()); - request.setJsonEntity(MAPPING_ALL_TYPES); - assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + createIndex(testIndexName(), false); } if (count > 0) { @@ -1458,6 +1498,13 @@ private static String repeatValueAsMV(Object value) { return "[" + value + ", " + value + "]"; } + private static void createIndex(String indexName, boolean lookupMode, String mapping) throws IOException { + Request request = new Request("PUT", "/" + indexName); + String settings = "\"settings\" : {\"mode\" : \"lookup\"}, "; + request.setJsonEntity("{" + (lookupMode ? settings : "") + mapping + "}"); + assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + } + public static RequestObjectBuilder requestObjectBuilder() throws IOException { return new RequestObjectBuilder(); } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 8ca4292f97faa..fc43c5fb5b278 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -1393,3 +1393,90 @@ emp_no:integer | language_code:integer | language_name:keyword 10092 | 1 | English 10093 | 3 | Spanish ; + +multipleBatchesWithSort +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: make_number_of_channels_consistent_with_layout + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| sort language_code, birth_date +| keep language_code +| limit 1 +; + +language_code:integer +1 +; + +multipleBatchesWithMvExpand +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: make_number_of_channels_consistent_with_layout + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| keep birth_date, language_code +| mv_expand birth_date +| sort birth_date, language_code +| limit 1 +; + +birth_date:datetime |language_code:integer +1952-02-27T00:00:00.000Z |null +; + +multipleBatchesWithAggregate1 +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: make_number_of_channels_consistent_with_layout + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| keep birth_date, language_code +| stats x=max(birth_date), y=min(language_code) +; + +x:datetime |y:integer +1965-01-03T00:00:00.000Z |1 +; + +multipleBatchesWithAggregate2 +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: make_number_of_channels_consistent_with_layout + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| keep birth_date, language_code +| stats m=min(birth_date) by language_code +| sort language_code +| limit 1 +; + +m:datetime |language_code:integer +null |1 +; + +multipleBatchesWithAggregate3 +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: make_number_of_channels_consistent_with_layout + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| keep birth_date, language_code +| stats m=min(language_code) by birth_date +| sort birth_date +| limit 1 +; + +m:integer |birth_date:datetime +null |1952-02-27T00:00:00.000Z +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index a654961879a61..e55f07b715010 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -694,7 +694,12 @@ public enum Cap { /** * Allow mixed numeric types in conditional functions - case, greatest and least */ - MIXED_NUMERIC_TYPES_IN_CASE_GREATEST_LEAST; + MIXED_NUMERIC_TYPES_IN_CASE_GREATEST_LEAST, + + /** + * Make numberOfChannels consistent with layout in DefaultLayout by removing duplicated ChannelSet. + */ + MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT; private final boolean enabled; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java index dafba5e92322c..a707bb1c6c000 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java @@ -107,8 +107,20 @@ public Layout build() { Map layout = new HashMap<>(); int numberOfChannels = 0; for (ChannelSet set : channels) { - int channel = numberOfChannels++; + boolean createNewChannel = true; + int channel = 0; for (NameId id : set.nameIds) { + if (layout.containsKey(id)) { + // If a NameId already exists in the map, do not increase the numberOfChannels, it can cause inverse() to create + // a null in the list of channels, and NullPointerException when build() is called. + // TODO avoid adding duplicated attributes with the same id in the plan, ReplaceMissingFieldWithNull may add nulls + // with the same ids as the missing field ids. + continue; + } + if (createNewChannel) { + channel = numberOfChannels++; + createNewChannel = false; + } ChannelAndType next = new ChannelAndType(channel, set.type); ChannelAndType prev = layout.put(id, next); // Do allow multiple name to point to the same channel - see https://github.com/elastic/elasticsearch/pull/100238