Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Unique ids when replacing missing fields #125656

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/125462.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 125462
summary: Unique ids when replacing missing fields
area: ES|QL
type: bug
issues: []
6 changes: 6 additions & 0 deletions docs/changelog/125656.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 125656
summary: Unique ids when replacing missing fields
area: ES|QL
type: bug
issues:
- 121754
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,32 @@ public void testDoubleParamsWithLookupJoin() throws IOException {
);
}

/**
 * Runs a {@code LOOKUP JOIN} over {@code from *} with 20 extra indices present, once without and once with a
 * trailing {@code sort}, and checks the number of columns and rows in the response.
 * <p>
 * The extra indices are removed in a {@code finally} block so that a failing assertion or request does not
 * leave them behind.
 *
 * @throws IOException if one of the REST requests fails
 */
public void testMultipleBatchesWithLookupJoin() throws IOException {
    assumeTrue(
        "Requires new null alias ids for join with multiple batches",
        EsqlCapabilities.Cap.REPLACE_MISSING_FIELD_WITH_NULL_NEW_ALIAS_ID_FOR_JOIN_AND_MULTIPLE_BATCHES.isEnabled()
    );
    final int indexCount = 20;
    // Highest index suffix that was successfully created; cleanup only touches idx1..idx<created>.
    int created = 0;
    try {
        // Create more than 10 indices to trigger multiple batches of data node execution.
        // The sort field should be missing on some indices to reproduce NullPointerException caused by duplicated items in layout
        for (int i = 1; i <= indexCount; i++) {
            createIndex("idx" + i, randomBoolean(), "\"mappings\": {\"properties\" : {\"a\" : {\"type\" : \"keyword\"}}}");
            created = i;
        }
        bulkLoadTestDataLookupMode(10);
        // lookup join with and without sort
        for (String sort : List.of("", "| sort integer")) {
            var query = requestObjectBuilder().query(format(null, "from * | lookup join {} on integer {}", testIndexName(), sort));
            Map<String, Object> result = runEsql(query);
            var columns = as(result.get("columns"), List.class);
            assertEquals(21, columns.size());
            var values = as(result.get("values"), List.class);
            assertEquals(10, values.size());
        }
    } finally {
        // clean up, even when the body above failed part-way through
        for (int i = 1; i <= created; i++) {
            assertThat(deleteIndex("idx" + i).isAcknowledged(), is(true));
        }
    }
}

private void validateResultsOfDoubleParametersForIdentifiers(RequestObjectBuilder query) throws IOException {
Map<String, Object> result = runEsql(query);
Map<String, String> colA = Map.of("name", "boolean", "type", "boolean");
Expand Down Expand Up @@ -1668,6 +1694,13 @@ private static String repeatValueAsMV(Object value) {
return "[" + value + ", " + value + "]";
}

/**
 * Creates an index by sending {@code PUT /<indexName>} and asserts that the response status is 200.
 * <p>
 * The JSON body is {@code mapping} wrapped in braces; when {@code lookupMode} is true a
 * {@code "settings" : {"mode" : "lookup"}} fragment is placed in front of the mapping.
 *
 * @param indexName  name of the index to create
 * @param lookupMode whether to prepend the lookup-mode settings fragment
 * @param mapping    JSON fragment placed inside the request body, without the enclosing braces
 * @throws IOException if the request fails
 */
private static void createIndex(String indexName, boolean lookupMode, String mapping) throws IOException {
    StringBuilder body = new StringBuilder("{");
    if (lookupMode) {
        body.append("\"settings\" : {\"mode\" : \"lookup\"}, ");
    }
    body.append(mapping).append("}");

    Request createRequest = new Request("PUT", "/" + indexName);
    createRequest.setJsonEntity(body.toString());

    int status = client().performRequest(createRequest).getStatusLine().getStatusCode();
    assertEquals(200, status);
}

/**
 * Factory for request builders used by the tests in this class.
 *
 * @return a freshly constructed {@link RequestObjectBuilder}
 * @throws IOException declared by the signature; propagated from the builder's constructor if it throws
 */
public static RequestObjectBuilder requestObjectBuilder() throws IOException {
    RequestObjectBuilder builder = new RequestObjectBuilder();
    return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1452,3 +1452,90 @@ emp_no:integer | language_code:integer | language_name:keyword
10092 | 1 | English
10093 | 3 | Spanish
;

multipleBatchesWithSort
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| sort language_code, birth_date
| keep language_code
| limit 1
;

language_code:integer
1
;

multipleBatchesWithMvExpand
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| mv_expand birth_date
| sort birth_date, language_code
| limit 1
;

birth_date:datetime |language_code:integer
1952-02-27T00:00:00.000Z |null
;

multipleBatchesWithAggregate1
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| stats x=max(birth_date), y=min(language_code)
;

x:datetime |y:integer
1965-01-03T00:00:00.000Z |1
;

multipleBatchesWithAggregate2
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| stats m=min(birth_date) by language_code
| sort language_code
| limit 1
;

m:datetime |language_code:integer
null |1
;

multipleBatchesWithAggregate3
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| stats m=min(language_code) by birth_date
| sort birth_date
| limit 1
;

m:integer |birth_date:datetime
null |1952-02-27T00:00:00.000Z
;
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,12 @@ public enum Cap {
/**
* Index component selector syntax (my-data-stream-name::failures)
*/
INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled());
INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled()),

/**
* Create the null alias with a new id in ReplaceMissingFieldWithNull when there is a lookup join with multiple batches.
*/
REPLACE_MISSING_FIELD_WITH_NULL_NEW_ALIAS_ID_FOR_JOIN_AND_MULTIPLE_BATCHES;

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.PotentiallyUnmappedKeywordEsField;
import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
Expand Down Expand Up @@ -50,28 +51,32 @@ public LogicalPlan apply(LogicalPlan plan, LocalLogicalOptimizerContext localLog
}
});

return plan.transformUp(p -> missingToNull(p, localLogicalOptimizerContext.searchStats(), lookupFields));
AttributeMap<Attribute> fieldAttrReplacedBy = new AttributeMap<Attribute>();

return plan.transformUp(p -> missingToNull(p, localLogicalOptimizerContext.searchStats(), lookupFields, fieldAttrReplacedBy));
}

private LogicalPlan missingToNull(LogicalPlan plan, SearchStats stats, AttributeSet lookupFields) {
private LogicalPlan missingToNull(
LogicalPlan plan,
SearchStats stats,
AttributeSet lookupFields,
AttributeMap<Attribute> fieldAttrReplacedBy
) {
if (plan instanceof EsRelation || plan instanceof LocalRelation) {
return plan;
}
} else if (plan instanceof Project) {
// Only create null literals for newly encountered missing fields
plan = resolveAlreadyReplacedAttributes(plan, fieldAttrReplacedBy);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is problematic, see #110373

Project project = (Project) plan;

if (plan instanceof Aggregate a) {
// don't do anything (for now)
return a;
}
// keep the aliased name
else if (plan instanceof Project project) {
var projections = project.projections();
List<NamedExpression> newProjections = new ArrayList<>(projections.size());
Map<DataType, Alias> nullLiteral = Maps.newLinkedHashMapWithExpectedSize(DataType.types().size());
AttributeSet joinAttributes = joinAttributes(project);

for (NamedExpression projection : projections) {
// Do not use the attribute name, this can deviate from the field name for union types.
if (projection instanceof FieldAttribute f
// Do not use the attribute name, this can deviate from the field name for union types
&& stats.exists(f.fieldName()) == false
&& joinAttributes.contains(f) == false
&& f.field() instanceof PotentiallyUnmappedKeywordEsField == false) {
Expand All @@ -81,24 +86,28 @@ else if (plan instanceof Project project) {
Alias nullAlias = nullLiteral.get(f.dataType());
// save the first field as null (per datatype)
if (nullAlias == null) {
Alias alias = new Alias(f.source(), f.name(), Literal.of(f, null), f.id());
Alias alias = new Alias(f.source(), f.name(), Literal.of(f, null), null);
nullLiteral.put(dt, alias);
projection = alias.toAttribute();
}
// otherwise point to it
else {
// since this avoids creating field copies
projection = new Alias(f.source(), f.name(), nullAlias.toAttribute(), f.id());
projection = new Alias(f.source(), f.name(), nullAlias.toAttribute(), null);
}
fieldAttrReplacedBy.put(f, projection.toAttribute());
}

newProjections.add(projection);
}
// add the first found field as null

// Add an EVAL ahead of the Project to create null literals
if (nullLiteral.size() > 0) {
plan = new Eval(project.source(), project.child(), new ArrayList<>(nullLiteral.values()));
plan = new Project(project.source(), plan, newProjections);
}

return plan;
} else if (plan instanceof Eval
|| plan instanceof Filter
|| plan instanceof OrderBy
Expand All @@ -113,8 +122,22 @@ else if (plan instanceof Project project) {
? f
: Literal.of(f, null)
);

return plan;
}

// If we replaced any field attribute by a null literal previously, we must reference the null literal now - the initial field
// attribute is no longer available (projected away).
return resolveAlreadyReplacedAttributes(plan, fieldAttrReplacedBy);
}

/**
 * Swaps every {@link FieldAttribute} in the expressions of {@code plan} that has an entry in
 * {@code fieldAttrReplacedBy} for the attribute it maps to; field attributes without an entry are kept.
 *
 * @param plan                the plan node whose expressions are rewritten
 * @param fieldAttrReplacedBy replacements recorded so far, keyed by the original field attribute
 * @return the rewritten plan, or {@code plan} itself when no replacements have been recorded
 */
private LogicalPlan resolveAlreadyReplacedAttributes(LogicalPlan plan, AttributeMap<Attribute> fieldAttrReplacedBy) {
    // Nothing has been replaced yet, so there is nothing to look up: skip the expression traversal.
    if (fieldAttrReplacedBy.size() == 0) {
        return plan;
    }
    return plan.transformExpressionsOnly(FieldAttribute.class, fieldAttr -> {
        Attribute substitute = fieldAttrReplacedBy.get(fieldAttr);
        if (substitute != null) {
            return substitute;
        }
        return fieldAttr;
    });
}

Expand Down
Loading