Skip to content

Commit 5c1e17c

Browse files
[ES|QL] Make numberOfChannels consistent with layout map by removing duplicated ChannelSet (#125636) (#125721)
* make numberOfChannels consistent with layout

(cherry picked from commit 80125a4)

# Conflicts:
# x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java
# x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
1 parent eeea3d7 commit 5c1e17c

File tree

5 files changed

+162
-5
lines changed

5 files changed

+162
-5
lines changed

docs/changelog/125636.yaml

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,6 @@
1+
pr: 125636
2+
summary: Make `numberOfChannels` consistent with layout map by removing duplicated
3+
`ChannelSet`
4+
area: ES|QL
5+
type: bug
6+
issues: []

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

Lines changed: 50 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.xcontent.XContentType;
3434
import org.elasticsearch.xpack.esql.AssertWarnings;
3535
import org.elasticsearch.xpack.esql.EsqlTestUtils;
36+
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
3637
import org.junit.After;
3738
import org.junit.Assert;
3839
import org.junit.Before;
@@ -83,9 +84,13 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
8384

8485
private static final String MAPPING_ALL_TYPES;
8586

87+
private static final String MAPPING_ALL_TYPES_LOOKUP;
88+
8689
// Builds the two index-creation request bodies from the same mapping file:
// MAPPING_ALL_TYPES holds only the mappings, MAPPING_ALL_TYPES_LOOKUP additionally sets "mode" to "lookup".
static {
8790
String properties = EsqlTestUtils.loadUtf8TextFile("/mapping-all-types.json");
8891
MAPPING_ALL_TYPES = "{\"mappings\": " + properties + "}";
92+
// Note: this fragment carries the opening brace of the request body; the closing brace is appended on the next statement.
String settings = "{\"settings\" : {\"mode\" : \"lookup\"}";
93+
MAPPING_ALL_TYPES_LOOKUP = settings + ", " + "\"mappings\": " + properties + "}";
8994
}
9095

9196
private static final String DOCUMENT_TEMPLATE = """
@@ -796,6 +801,32 @@ public void testErrorMessageForMissingParams() throws IOException {
796801
);
797802
}
798803

804+
/**
 * Runs a {@code lookup join} across many indices, with and without a trailing sort.
 * <p>
 * Setup: creates {@code idx1}..{@code idx20}, each with a single keyword field {@code a} and each randomly
 * created in lookup mode, then creates the test index in lookup mode and loads 10 documents into it.
 * For both query variants the test asserts that the response has 21 columns and 10 rows.
 * The {@code idx*} indices are deleted at the end of the test body.
 * <p>
 * Skipped unless {@code MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT} is enabled.
 *
 * @throws IOException if a REST request fails
 */
public void testMultipleBatchesWithLookupJoin() throws IOException {
805+
assumeTrue(
806+
"Makes numberOfChannels consistent with layout map for join with multiple batches",
807+
EsqlCapabilities.Cap.MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT.isEnabled()
808+
);
809+
// Create more than 10 indices to trigger multiple batches of data node execution.
810+
// The sort field should be missing on some indices to reproduce NullPointerException caused by duplicated items in layout
811+
for (int i = 1; i <= 20; i++) {
812+
createIndex("idx" + i, randomBoolean(), "\"mappings\": {\"properties\" : {\"a\" : {\"type\" : \"keyword\"}}}");
813+
}
814+
bulkLoadTestDataLookupMode(10);
815+
// lookup join with and without sort
816+
for (String sort : List.of("", "| sort integer")) {
817+
var query = requestObjectBuilder().query(format(null, "from * | lookup join {} on integer {}", testIndexName(), sort));
818+
Map<String, Object> result = runEsql(query);
819+
var columns = as(result.get("columns"), List.class);
820+
assertEquals(21, columns.size());
821+
var values = as(result.get("values"), List.class);
822+
assertEquals(10, values.size());
823+
}
824+
// clean up
825+
for (int i = 1; i <= 20; i++) {
826+
assertThat(deleteIndex("idx" + i).isAcknowledged(), is(true));
827+
}
828+
}
829+
799830
public void testErrorMessageForLiteralDateMathOverflow() throws IOException {
800831
List<String> dateMathOverflowExpressions = List.of(
801832
"2147483647 day + 1 day",
@@ -1377,13 +1408,22 @@ private static void bulkLoadTestData(int count) throws IOException {
13771408
bulkLoadTestData(count, 0, true, RestEsqlTestCase::createDocument);
13781409
}
13791410

1411+
/**
 * Creates the test index in lookup mode and then bulk-loads {@code count} documents into it.
 * The index is created here explicitly, so the bulk loader is called with {@code createIndex == false}.
 *
 * @param count number of documents to load
 * @throws IOException if a REST request fails
 */
private static void bulkLoadTestDataLookupMode(int count) throws IOException {
1412+
createIndex(testIndexName(), true);
1413+
bulkLoadTestData(count, 0, false, RestEsqlTestCase::createDocument);
1414+
}
1415+
1416+
/**
 * Creates {@code indexName} with the all-types mapping and asserts that the request returns HTTP 200.
 *
 * @param indexName  name of the index to create
 * @param lookupMode when {@code true} the request body is {@code MAPPING_ALL_TYPES_LOOKUP}, otherwise {@code MAPPING_ALL_TYPES}
 * @throws IOException if the REST request fails
 */
private static void createIndex(String indexName, boolean lookupMode) throws IOException {
1417+
Request request = new Request("PUT", "/" + indexName);
1418+
request.setJsonEntity(lookupMode ? MAPPING_ALL_TYPES_LOOKUP : MAPPING_ALL_TYPES);
1419+
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
1420+
}
1421+
13801422
private static void bulkLoadTestData(int count, int firstIndex, boolean createIndex, IntFunction<String> createDocument)
13811423
throws IOException {
13821424
Request request;
13831425
if (createIndex) {
1384-
request = new Request("PUT", "/" + testIndexName());
1385-
request.setJsonEntity(MAPPING_ALL_TYPES);
1386-
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
1426+
createIndex(testIndexName(), false);
13871427
}
13881428

13891429
if (count > 0) {
@@ -1458,6 +1498,13 @@ private static String repeatValueAsMV(Object value) {
14581498
return "[" + value + ", " + value + "]";
14591499
}
14601500

1501+
/**
 * Creates {@code indexName} with a caller-supplied mapping and asserts that the request returns HTTP 200.
 *
 * @param indexName  name of the index to create
 * @param lookupMode when {@code true} a lookup-mode settings fragment is placed before the mapping
 * @param mapping    JSON fragment without the enclosing braces (for example {@code "mappings": {...}});
 *                   this method wraps it in <code>{</code> and <code>}</code>
 * @throws IOException if the REST request fails
 */
private static void createIndex(String indexName, boolean lookupMode, String mapping) throws IOException {
1502+
Request request = new Request("PUT", "/" + indexName);
1503+
// The trailing ", " lets the settings fragment be concatenated directly in front of the mapping fragment.
String settings = "\"settings\" : {\"mode\" : \"lookup\"}, ";
1504+
request.setJsonEntity("{" + (lookupMode ? settings : "") + mapping + "}");
1505+
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
1506+
}
1507+
14611508
/**
 * Returns a new {@code RequestObjectBuilder}.
 *
 * @throws IOException declared by the signature; the visible body only invokes the constructor
 */
public static RequestObjectBuilder requestObjectBuilder() throws IOException {
14621509
return new RequestObjectBuilder();
14631510
}

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 87 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1393,3 +1393,90 @@ emp_no:integer | language_code:integer | language_name:keyword
13931393
10092 | 1 | English
13941394
10093 | 3 | Spanish
13951395
;
1396+
1397+
multipleBatchesWithSort
1398+
required_capability: join_lookup_v12
1399+
required_capability: remove_redundant_sort
1400+
required_capability: make_number_of_channels_consistent_with_layout
1401+
1402+
from *
1403+
| rename city.country.continent.planet.name as message
1404+
| lookup join message_types_lookup on message
1405+
| sort language_code, birth_date
1406+
| keep language_code
1407+
| limit 1
1408+
;
1409+
1410+
language_code:integer
1411+
1
1412+
;
1413+
1414+
multipleBatchesWithMvExpand
1415+
required_capability: join_lookup_v12
1416+
required_capability: remove_redundant_sort
1417+
required_capability: make_number_of_channels_consistent_with_layout
1418+
1419+
from *
1420+
| rename city.country.continent.planet.name as message
1421+
| lookup join message_types_lookup on message
1422+
| keep birth_date, language_code
1423+
| mv_expand birth_date
1424+
| sort birth_date, language_code
1425+
| limit 1
1426+
;
1427+
1428+
birth_date:datetime |language_code:integer
1429+
1952-02-27T00:00:00.000Z |null
1430+
;
1431+
1432+
multipleBatchesWithAggregate1
1433+
required_capability: join_lookup_v12
1434+
required_capability: remove_redundant_sort
1435+
required_capability: make_number_of_channels_consistent_with_layout
1436+
1437+
from *
1438+
| rename city.country.continent.planet.name as message
1439+
| lookup join message_types_lookup on message
1440+
| keep birth_date, language_code
1441+
| stats x=max(birth_date), y=min(language_code)
1442+
;
1443+
1444+
x:datetime |y:integer
1445+
1965-01-03T00:00:00.000Z |1
1446+
;
1447+
1448+
multipleBatchesWithAggregate2
1449+
required_capability: join_lookup_v12
1450+
required_capability: remove_redundant_sort
1451+
required_capability: make_number_of_channels_consistent_with_layout
1452+
1453+
from *
1454+
| rename city.country.continent.planet.name as message
1455+
| lookup join message_types_lookup on message
1456+
| keep birth_date, language_code
1457+
| stats m=min(birth_date) by language_code
1458+
| sort language_code
1459+
| limit 1
1460+
;
1461+
1462+
m:datetime |language_code:integer
1463+
null |1
1464+
;
1465+
1466+
multipleBatchesWithAggregate3
1467+
required_capability: join_lookup_v12
1468+
required_capability: remove_redundant_sort
1469+
required_capability: make_number_of_channels_consistent_with_layout
1470+
1471+
from *
1472+
| rename city.country.continent.planet.name as message
1473+
| lookup join message_types_lookup on message
1474+
| keep birth_date, language_code
1475+
| stats m=min(language_code) by birth_date
1476+
| sort birth_date
1477+
| limit 1
1478+
;
1479+
1480+
m:integer |birth_date:datetime
1481+
null |1952-02-27T00:00:00.000Z
1482+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -694,7 +694,12 @@ public enum Cap {
694694
/**
695695
* Allow mixed numeric types in conditional functions - case, greatest and least
696696
*/
697-
MIXED_NUMERIC_TYPES_IN_CASE_GREATEST_LEAST;
697+
MIXED_NUMERIC_TYPES_IN_CASE_GREATEST_LEAST,
698+
699+
/**
700+
* Make numberOfChannels consistent with layout in DefaultLayout by removing duplicated ChannelSet.
701+
*/
702+
MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT;
698703

699704
private final boolean enabled;
700705

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java

Lines changed: 13 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -107,8 +107,20 @@ public Layout build() {
107107
Map<NameId, ChannelAndType> layout = new HashMap<>();
108108
int numberOfChannels = 0;
109109
for (ChannelSet set : channels) {
110-
int channel = numberOfChannels++;
110+
boolean createNewChannel = true;
111+
int channel = 0;
111112
for (NameId id : set.nameIds) {
113+
if (layout.containsKey(id)) {
114+
// If a NameId already exists in the map, do not increase the numberOfChannels, it can cause inverse() to create
115+
// a null in the list of channels, and NullPointerException when build() is called.
116+
// TODO avoid adding duplicated attributes with the same id in the plan, ReplaceMissingFieldWithNull may add nulls
117+
// with the same ids as the missing field ids.
118+
continue;
119+
}
120+
if (createNewChannel) {
121+
channel = numberOfChannels++;
122+
createNewChannel = false;
123+
}
112124
ChannelAndType next = new ChannelAndType(channel, set.type);
113125
ChannelAndType prev = layout.put(id, next);
114126
// Do allow multiple name to point to the same channel - see https://github.com/elastic/elasticsearch/pull/100238

0 commit comments

Comments
 (0)