Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ES|QL] Make numberOfChannels consistent with layout map by removing duplicated ChannelSet #125636

Merged
merged 5 commits into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/125636.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 125636
summary: Make `numberOfChannels` consistent with layout map by removing duplicated
`ChannelSet`
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,32 @@ private void validateResultsOfDoubleParametersForIdentifiers(RequestObjectBuilde
assertEquals(List.of(List.of(false, 9.1), List.of(true, 8.1)), result.get("values"));
}

public void testMultipleBatchesWithLookupJoin() throws IOException {
assumeTrue(
"Makes numberOfChannels consistent with layout map for join with multiple batches",
EsqlCapabilities.Cap.MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT.isEnabled()
);
// Create more than 10 indices to trigger multiple batches of data node execution.
// The sort field should be missing on some indices to reproduce NullPointerException caused by duplicated items in layout
for (int i = 1; i <= 20; i++) {
createIndex("idx" + i, randomBoolean(), "\"mappings\": {\"properties\" : {\"a\" : {\"type\" : \"keyword\"}}}");
}
bulkLoadTestDataLookupMode(10);
// lookup join with and without sort
for (String sort : List.of("", "| sort integer")) {
var query = requestObjectBuilder().query(format(null, "from * | lookup join {} on integer {}", testIndexName(), sort));
Map<String, Object> result = runEsql(query);
var columns = as(result.get("columns"), List.class);
assertEquals(21, columns.size());
var values = as(result.get("values"), List.class);
assertEquals(10, values.size());
}
// clean up
for (int i = 1; i <= 20; i++) {
assertThat(deleteIndex("idx" + i).isAcknowledged(), is(true));
}
}

public void testErrorMessageForLiteralDateMathOverflow() throws IOException {
List<String> dateMathOverflowExpressions = List.of(
"2147483647 day + 1 day",
Expand Down Expand Up @@ -1668,6 +1694,13 @@ private static String repeatValueAsMV(Object value) {
return "[" + value + ", " + value + "]";
}

private static void createIndex(String indexName, boolean lookupMode, String mapping) throws IOException {
Request request = new Request("PUT", "/" + indexName);
String settings = "\"settings\" : {\"mode\" : \"lookup\"}, ";
request.setJsonEntity("{" + (lookupMode ? settings : "") + mapping + "}");
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
}

public static RequestObjectBuilder requestObjectBuilder() throws IOException {
return new RequestObjectBuilder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1452,3 +1452,90 @@ emp_no:integer | language_code:integer | language_name:keyword
10092 | 1 | English
10093 | 3 | Spanish
;

multipleBatchesWithSort
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: make_number_of_channels_consistent_with_layout

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| sort language_code, birth_date
| keep language_code
| limit 1
;

language_code:integer
1
;

multipleBatchesWithMvExpand
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: make_number_of_channels_consistent_with_layout

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| mv_expand birth_date
| sort birth_date, language_code
| limit 1
;

birth_date:datetime |language_code:integer
1952-02-27T00:00:00.000Z |null
;

multipleBatchesWithAggregate1
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: make_number_of_channels_consistent_with_layout

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| stats x=max(birth_date), y=min(language_code)
;

x:datetime |y:integer
1965-01-03T00:00:00.000Z |1
;

multipleBatchesWithAggregate2
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: make_number_of_channels_consistent_with_layout

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| stats m=min(birth_date) by language_code
| sort language_code
| limit 1
;

m:datetime |language_code:integer
null |1
;

multipleBatchesWithAggregate3
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: make_number_of_channels_consistent_with_layout

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| stats m=min(language_code) by birth_date
| sort birth_date
| limit 1
;

m:integer |birth_date:datetime
null |1952-02-27T00:00:00.000Z
;
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,12 @@ public enum Cap {
/**
* Index component selector syntax (my-data-stream-name::failures)
*/
INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled());
INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled()),

/**
* Make numberOfChannels consistent with layout in DefaultLayout by removing duplicated ChannelSet.
*/
MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT;

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,20 @@ public Layout build() {
Map<NameId, ChannelAndType> layout = new HashMap<>();
int numberOfChannels = 0;
for (ChannelSet set : channels) {
int channel = numberOfChannels++;
boolean createNewChannel = true;
int channel = 0;
for (NameId id : set.nameIds) {
if (layout.containsKey(id)) {
// If a NameId already exists in the map, do not increase the numberOfChannels, it can cause inverse() to create
// a null in the list of channels, and NullPointerException when build() is called.
// TODO avoid adding duplicated attributes with the same id in the plan, ReplaceMissingFieldWithNull may add nulls
// with the same ids as the missing field ids.
continue;
}
if (createNewChannel) {
channel = numberOfChannels++;
createNewChannel = false;
}
ChannelAndType next = new ChannelAndType(channel, set.type);
ChannelAndType prev = layout.put(id, next);
// Do allow multiple name to point to the same channel - see https://github.com/elastic/elasticsearch/pull/100238
Expand Down
Loading