Skip to content

Commit 80125a4

Browse files
[ES|QL] Make numberOfChannels consistent with layout map by removing duplicated ChannelSet (#125636)
* make numberOfChannels consistent with layout
1 parent 220f4c4 commit 80125a4

File tree

5 files changed

+145
-2
lines changed

5 files changed

+145
-2
lines changed

docs/changelog/125636.yaml

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 125636
2+
summary: Make `numberOfChannels` consistent with layout map by removing duplicated
3+
`ChannelSet`
4+
area: ES|QL
5+
type: bug
6+
issues: []

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

+33
Original file line numberDiff line numberDiff line change
@@ -994,6 +994,32 @@ private void validateResultsOfDoubleParametersForIdentifiers(RequestObjectBuilde
994994
assertEquals(List.of(List.of(false, 9.1), List.of(true, 8.1)), result.get("values"));
995995
}
996996

997+
/**
 * Runs {@code from * | lookup join <testIndexName()> on integer}, once without and once with a trailing
 * {@code | sort integer}, against 20 extra indices ({@code idx1} .. {@code idx20}) that only map a keyword
 * field {@code a}, and checks the shape of the response (21 columns, 10 rows).
 * <p>
 * Each extra index is randomly created in lookup mode or not. The test is skipped unless the
 * {@code MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT} capability is enabled, and it deletes the extra
 * indices at the end when all assertions pass.
 *
 * @throws IOException declared by this method; presumably raised by the REST helpers it calls
 */
public void testMultipleBatchesWithLookupJoin() throws IOException {
998+
assumeTrue(
999+
"Makes numberOfChannels consistent with layout map for join with multiple batches",
1000+
EsqlCapabilities.Cap.MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT.isEnabled()
1001+
);
1002+
// Create more than 10 indices to trigger multiple batches of data node execution.
1003+
// The sort field should be missing on some indices to reproduce the NullPointerException caused by duplicated items in the layout
1004+
for (int i = 1; i <= 20; i++) {
1005+
createIndex("idx" + i, randomBoolean(), "\"mappings\": {\"properties\" : {\"a\" : {\"type\" : \"keyword\"}}}");
1006+
}
1007+
bulkLoadTestDataLookupMode(10);
1008+
// lookup join with and without sort
1009+
for (String sort : List.of("", "| sort integer")) {
1010+
var query = requestObjectBuilder().query(format(null, "from * | lookup join {} on integer {}", testIndexName(), sort));
1011+
Map<String, Object> result = runEsql(query);
1012+
var columns = as(result.get("columns"), List.class);
1013+
// NOTE(review): 21 is presumably the field count of the bulk-loaded test index plus field `a` - confirm
1014+
assertEquals(21, columns.size());
1015+
var values = as(result.get("values"), List.class);
1016+
assertEquals(10, values.size());
1017+
}
1018+
// clean up
1019+
for (int i = 1; i <= 20; i++) {
1020+
assertThat(deleteIndex("idx" + i).isAcknowledged(), is(true));
1021+
}
1022+
}
1022+
9971023
public void testErrorMessageForLiteralDateMathOverflow() throws IOException {
9981024
List<String> dateMathOverflowExpressions = List.of(
9991025
"2147483647 day + 1 day",
@@ -1668,6 +1694,13 @@ private static String repeatValueAsMV(Object value) {
16681694
return "[" + value + ", " + value + "]";
16691695
}
16701696

1697+
/**
 * Creates an index by sending {@code PUT /<indexName>} and asserts that the response status is 200.
 *
 * @param indexName  name of the index to create; used verbatim as the request path
 * @param lookupMode when {@code true}, a {@code "settings" : {"mode" : "lookup"}} member is placed before the mapping
 * @param mapping    JSON fragment inserted inside the top-level object of the request body (e.g. a {@code "mappings"} member)
 * @throws IOException declared by this method; presumably raised when the request fails
 */
private static void createIndex(String indexName, boolean lookupMode, String mapping) throws IOException {
1698+
Request request = new Request("PUT", "/" + indexName);
1699+
String settings = "\"settings\" : {\"mode\" : \"lookup\"}, ";
1700+
request.setJsonEntity("{" + (lookupMode ? settings : "") + mapping + "}");
1701+
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
1702+
}
1703+
16711704
public static RequestObjectBuilder requestObjectBuilder() throws IOException {
16721705
return new RequestObjectBuilder();
16731706
}

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

+87
Original file line numberDiff line numberDiff line change
@@ -1452,3 +1452,90 @@ emp_no:integer | language_code:integer | language_name:keyword
14521452
10092 | 1 | English
14531453
10093 | 3 | Spanish
14541454
;
1455+
1456+
multipleBatchesWithSort
1457+
required_capability: join_lookup_v12
1458+
required_capability: remove_redundant_sort
1459+
required_capability: make_number_of_channels_consistent_with_layout
1460+
1461+
from *
1462+
| rename city.country.continent.planet.name as message
1463+
| lookup join message_types_lookup on message
1464+
| sort language_code, birth_date
1465+
| keep language_code
1466+
| limit 1
1467+
;
1468+
1469+
language_code:integer
1470+
1
1471+
;
1472+
1473+
multipleBatchesWithMvExpand
1474+
required_capability: join_lookup_v12
1475+
required_capability: remove_redundant_sort
1476+
required_capability: make_number_of_channels_consistent_with_layout
1477+
1478+
from *
1479+
| rename city.country.continent.planet.name as message
1480+
| lookup join message_types_lookup on message
1481+
| keep birth_date, language_code
1482+
| mv_expand birth_date
1483+
| sort birth_date, language_code
1484+
| limit 1
1485+
;
1486+
1487+
birth_date:datetime |language_code:integer
1488+
1952-02-27T00:00:00.000Z |null
1489+
;
1490+
1491+
multipleBatchesWithAggregate1
1492+
required_capability: join_lookup_v12
1493+
required_capability: remove_redundant_sort
1494+
required_capability: make_number_of_channels_consistent_with_layout
1495+
1496+
from *
1497+
| rename city.country.continent.planet.name as message
1498+
| lookup join message_types_lookup on message
1499+
| keep birth_date, language_code
1500+
| stats x=max(birth_date), y=min(language_code)
1501+
;
1502+
1503+
x:datetime |y:integer
1504+
1965-01-03T00:00:00.000Z |1
1505+
;
1506+
1507+
multipleBatchesWithAggregate2
1508+
required_capability: join_lookup_v12
1509+
required_capability: remove_redundant_sort
1510+
required_capability: make_number_of_channels_consistent_with_layout
1511+
1512+
from *
1513+
| rename city.country.continent.planet.name as message
1514+
| lookup join message_types_lookup on message
1515+
| keep birth_date, language_code
1516+
| stats m=min(birth_date) by language_code
1517+
| sort language_code
1518+
| limit 1
1519+
;
1520+
1521+
m:datetime |language_code:integer
1522+
null |1
1523+
;
1524+
1525+
multipleBatchesWithAggregate3
1526+
required_capability: join_lookup_v12
1527+
required_capability: remove_redundant_sort
1528+
required_capability: make_number_of_channels_consistent_with_layout
1529+
1530+
from *
1531+
| rename city.country.continent.planet.name as message
1532+
| lookup join message_types_lookup on message
1533+
| keep birth_date, language_code
1534+
| stats m=min(language_code) by birth_date
1535+
| sort birth_date
1536+
| limit 1
1537+
;
1538+
1539+
m:integer |birth_date:datetime
1540+
null |1952-02-27T00:00:00.000Z
1541+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -932,7 +932,12 @@ public enum Cap {
932932
/**
933933
* Index component selector syntax (my-data-stream-name::failures)
934934
*/
935-
INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled());
935+
INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled()),
936+
937+
/**
938+
* Make numberOfChannels consistent with layout in DefaultLayout by removing duplicated ChannelSet.
939+
*/
940+
MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT;
936941

937942
private final boolean enabled;
938943

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,20 @@ public Layout build() {
107107
Map<NameId, ChannelAndType> layout = new HashMap<>();
108108
int numberOfChannels = 0;
109109
for (ChannelSet set : channels) {
110-
int channel = numberOfChannels++;
110+
boolean createNewChannel = true;
111+
int channel = 0;
111112
for (NameId id : set.nameIds) {
113+
if (layout.containsKey(id)) {
114+
// If a NameId already exists in the map, do not increase numberOfChannels; doing so can cause inverse() to create
115+
// a null in the list of channels, and NullPointerException when build() is called.
116+
// TODO: avoid adding duplicated attributes with the same id in the plan; ReplaceMissingFieldWithNull may add nulls
117+
// with the same ids as the missing field ids.
118+
continue;
119+
}
120+
if (createNewChannel) {
121+
channel = numberOfChannels++;
122+
createNewChannel = false;
123+
}
112124
ChannelAndType next = new ChannelAndType(channel, set.type);
113125
ChannelAndType prev = layout.put(id, next);
114126
// Do allow multiple name to point to the same channel - see https://github.com/elastic/elasticsearch/pull/100238

0 commit comments

Comments
 (0)