Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Fix ReplaceMissingFieldsWithNull #125764

Merged
Merged
8 changes: 8 additions & 0 deletions docs/changelog/125764.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
pr: 125764
summary: Fix `ReplaceMissingFieldsWithNull`
area: ES|QL
type: bug
issues:
- 126036
- 121754
- 126030
Original file line number Diff line number Diff line change
Expand Up @@ -643,3 +643,21 @@ FROM airports
abbrev:k | city_name:k | city_location:geo_point | country:k | location:geo_point | name:text | region:text | boundary_wkt_length:i
IDR | Indore | POINT(75.8472 22.7167) | India | POINT(75.8092915005895 22.727749187571) | Devi Ahilyabai Holkar Int'l | Indore City | 231
;

// Regression test for https://github.com/elastic/elasticsearch/issues/126030
// We had wrong layouts from ReplaceMissingFieldsWithNull in case of indices that had relevant fields for the query,
// but were **missing the field we enrich on**.
fieldsInOtherIndicesBug
required_capability: enrich_load
required_capability: fix_replace_missing_field_with_null_duplicate_name_id_in_layout

from *
| keep author.keyword, book_no, scalerank, street, bytes_in, @timestamp, abbrev, city_location, distance, description, birth_date, language_code, intersects, client_ip, event_duration, version
| enrich languages_policy on author.keyword
| sort book_no
| limit 1
;

author.keyword:keyword|book_no:keyword|scalerank:integer|street:keyword|bytes_in:ul|@timestamp:unsupported|abbrev:keyword|city_location:geo_point|distance:double|description:unsupported|birth_date:date|language_code:integer|intersects:boolean|client_ip:unsupported|event_duration:long|version:version|language_name:keyword
Fyodor Dostoevsky |1211 |null |null |null |null |null |null |null |null |null |null |null |null |null |null |null
;
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,10 @@ emp_no:integer | language_code:integer | language_name:keyword
10093 | 3 | Spanish
;

###############################################
# Bugfixes
###############################################

multipleBatchesWithSort
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
Expand Down Expand Up @@ -1539,3 +1543,21 @@ from *
m:integer |birth_date:datetime
null |1952-02-27T00:00:00.000Z
;

// Regression test for https://github.com/elastic/elasticsearch/issues/126030
// We had wrong layouts from ReplaceMissingFieldsWithNull

enrichLookupStatsBug
required_capability: join_lookup_v12
required_capability: fix_replace_missing_field_with_null_duplicate_name_id_in_layout

from *
| enrich languages_policy on cluster
| rename languages.byte as language_code
| lookup join languages_lookup on language_code
| stats salary_change.long = max(ratings), foo = max(num)
;

salary_change.long:double|foo:long
5.0 |1698069301543123456
;
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,12 @@ public enum Cap {
/**
* Support for sorting when aggregate_metric_doubles are present
*/
AGGREGATE_METRIC_DOUBLE_SORTING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG);
AGGREGATE_METRIC_DOUBLE_SORTING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),

/**
* Supercedes {@link Cap#MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT}.
*/
FIX_REPLACE_MISSING_FIELD_WITH_NULL_DUPLICATE_NAME_ID_IN_LAYOUT;

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.PotentiallyUnmappedKeywordEsField;
import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
Expand All @@ -26,14 +26,12 @@
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
import org.elasticsearch.xpack.esql.plan.logical.TopN;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.rule.ParameterizedRule;
import org.elasticsearch.xpack.esql.stats.SearchStats;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/**
* Look for any fields used in the plan that are missing locally and replace them with null.
Expand All @@ -45,82 +43,83 @@ public class ReplaceMissingFieldWithNull extends ParameterizedRule<LogicalPlan,
public LogicalPlan apply(LogicalPlan plan, LocalLogicalOptimizerContext localLogicalOptimizerContext) {
var lookupFieldsBuilder = AttributeSet.builder();
plan.forEachUp(EsRelation.class, esRelation -> {
// Looking only for indices in LOOKUP mode is correct: during parsing, we assign the expected mode and even if a lookup index
// is used in the FROM command, it will not be marked with LOOKUP mode there - but STANDARD.
// It seems like we could instead just look for JOINs and walk down their right hand side to find lookup fields - but this does
// not work as this rule also gets called just on the right hand side of a JOIN, which means that we don't always know that
// we're inside the right (or left) branch of a JOIN node. (See PlannerUtils.localPlan - this looks for FragmentExecs and
// performs local logical optimization of the fragments; the right hand side of a LookupJoinExec can be a FragmentExec.)
if (esRelation.indexMode() == IndexMode.LOOKUP) {
lookupFieldsBuilder.addAll(esRelation.output());
}
});
AttributeSet lookupFields = lookupFieldsBuilder.build();

return plan.transformUp(p -> missingToNull(p, localLogicalOptimizerContext.searchStats(), lookupFieldsBuilder.build()));
}

private LogicalPlan missingToNull(LogicalPlan plan, SearchStats stats, AttributeSet lookupFields) {
if (plan instanceof EsRelation || plan instanceof LocalRelation) {
return plan;
}
// Do not use the attribute name, this can deviate from the field name for union types; use fieldName() instead.
// Also retain fields from lookup indices because we do not have stats for these.
Predicate<FieldAttribute> shouldBeRetained = f -> f.field() instanceof PotentiallyUnmappedKeywordEsField
|| (localLogicalOptimizerContext.searchStats().exists(f.fieldName()) || lookupFields.contains(f));

if (plan instanceof Aggregate a) {
// don't do anything (for now)
return a;
}
// keep the aliased name
else if (plan instanceof Project project) {
var projections = project.projections();
List<NamedExpression> newProjections = new ArrayList<>(projections.size());
Map<DataType, Alias> nullLiteral = Maps.newLinkedHashMapWithExpectedSize(DataType.types().size());
AttributeSet joinAttributes = joinAttributes(project);
return plan.transformUp(p -> missingToNull(p, shouldBeRetained));
}

for (NamedExpression projection : projections) {
// Do not use the attribute name, this can deviate from the field name for union types.
if (projection instanceof FieldAttribute f
&& stats.exists(f.fieldName()) == false
&& joinAttributes.contains(f) == false
&& f.field() instanceof PotentiallyUnmappedKeywordEsField == false) {
// TODO: Should do a searchStats lookup for join attributes instead of just ignoring them here
// See TransportSearchShardsAction
private LogicalPlan missingToNull(LogicalPlan plan, Predicate<FieldAttribute> shouldBeRetained) {
if (plan instanceof EsRelation relation) {
// Remove missing fields from the EsRelation because this is not where we will obtain them from; replace them by an Eval right
// after, instead. This allows us to safely re-use the attribute ids of the corresponding FieldAttributes.
// This means that an EsRelation[field1, field2, field3] where field1 and field 3 are missing will be replaced by
// Project[field1, field2, field3] <- keeps the ordering intact
// \_Eval[field1 = null, field3 = null]
// \_EsRelation[field2]
List<Attribute> relationOutput = relation.output();
Map<DataType, Alias> nullLiterals = Maps.newLinkedHashMapWithExpectedSize(DataType.types().size());
List<NamedExpression> newProjections = new ArrayList<>(relationOutput.size());
for (int i = 0, size = relationOutput.size(); i < size; i++) {
Attribute attr = relationOutput.get(i);
NamedExpression projection;
if (attr instanceof FieldAttribute f && (shouldBeRetained.test(f) == false)) {
DataType dt = f.dataType();
Alias nullAlias = nullLiteral.get(f.dataType());
Alias nullAlias = nullLiterals.get(dt);
// save the first field as null (per datatype)
if (nullAlias == null) {
// Keep the same id so downstream query plans don't need updating
// NOTE: THIS IS BRITTLE AND CAN LEAD TO BUGS.
// In case some optimizer rule or so inserts a plan node that requires the field BEFORE the Eval that we're adding
// on top of the EsRelation, this can trigger a field extraction in the physical optimizer phase, causing wrong
// layouts due to a duplicate name id.
// If someone reaches here AGAIN when debugging e.g. ClassCastExceptions NPEs from wrong layouts, we should probably
// give up on this approach and instead insert EvalExecs in InsertFieldExtraction.
Alias alias = new Alias(f.source(), f.name(), Literal.of(f, null), f.id());
nullLiteral.put(dt, alias);
nullLiterals.put(dt, alias);
projection = alias.toAttribute();
}
// otherwise point to it
// otherwise point to it since this avoids creating field copies
else {
// since avoids creating field copies
projection = new Alias(f.source(), f.name(), nullAlias.toAttribute(), f.id());
}
} else {
projection = attr;
}

newProjections.add(projection);
}
// add the first found field as null
if (nullLiteral.size() > 0) {
plan = new Eval(project.source(), project.child(), new ArrayList<>(nullLiteral.values()));
plan = new Project(project.source(), plan, newProjections);

if (nullLiterals.size() == 0) {
return plan;
}
} else if (plan instanceof Eval

Eval eval = new Eval(plan.source(), relation, new ArrayList<>(nullLiterals.values()));
// This projection is redundant if there's another projection downstream (and no commands depend on the order until we hit it).
return new Project(plan.source(), eval, newProjections);
}

if (plan instanceof Eval
|| plan instanceof Filter
|| plan instanceof OrderBy
|| plan instanceof RegexExtract
|| plan instanceof TopN) {
plan = plan.transformExpressionsOnlyUp(
FieldAttribute.class,
// Do not use the attribute name, this can deviate from the field name for union types.
// Also skip fields from lookup indices because we do not have stats for these.
// TODO: We do have stats for lookup indices in case they are being used in the FROM clause; this can be refined.
f -> f.field() instanceof PotentiallyUnmappedKeywordEsField || (stats.exists(f.fieldName()) || lookupFields.contains(f))
? f
: Literal.of(f, null)
);
}
return plan.transformExpressionsOnlyUp(FieldAttribute.class, f -> shouldBeRetained.test(f) ? f : Literal.of(f, null));
}

return plan;
}

private static AttributeSet joinAttributes(Project project) {
var attributesBuilder = AttributeSet.builder();
project.forEachDown(Join.class, j -> j.right().forEachDown(EsRelation.class, p -> attributesBuilder.addAll(p.output())));
return attributesBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,26 +107,13 @@ public Layout build() {
Map<NameId, ChannelAndType> layout = new HashMap<>();
int numberOfChannels = 0;
for (ChannelSet set : channels) {
boolean createNewChannel = true;
int channel = 0;
int channel = numberOfChannels++;
for (NameId id : set.nameIds) {
if (layout.containsKey(id)) {
// If a NameId already exists in the map, do not increase the numberOfChannels, it can cause inverse() to create
// a null in the list of channels, and NullPointerException when build() is called.
// TODO avoid adding duplicated attributes with the same id in the plan, ReplaceMissingFieldWithNull may add nulls
// with the same ids as the missing field ids.
continue;
}
if (createNewChannel) {
channel = numberOfChannels++;
createNewChannel = false;
}
// Duplicate name ids would mean that have 2 channels that are declared under the same id. That makes no sense - which
// channel should subsequent operators use, then, when they want to refer to this id?
assert (layout.containsKey(id) == false) : "Duplicate name ids are not allowed in layouts";
ChannelAndType next = new ChannelAndType(channel, set.type);
ChannelAndType prev = layout.put(id, next);
// Do allow multiple name to point to the same channel - see https://github.com/elastic/elasticsearch/pull/100238
// if (prev != null) {
// throw new IllegalArgumentException("Name [" + id + "] is on two channels [" + prev + "] and [" + next + "]");
// }
layout.put(id, next);
}
}
return new DefaultLayout(Collections.unmodifiableMap(layout), numberOfChannels);
Expand Down
Loading
Loading