Skip to content

Commit 165df72

Browse files
authored
Updated the Composite and Cardinality aggregators (#20173)
* Updated the Composite and Cardinality aggregators Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Removed the changes in extractSubAggsAndDocCount Signed-off-by: Vinay Krishna Pudyodu <[email protected]> --------- Signed-off-by: Vinay Krishna Pudyodu <[email protected]>
1 parent f42e790 commit 165df72

File tree

4 files changed

+24
-7
lines changed

4 files changed

+24
-7
lines changed

plugins/engine-datafusion/jni/src/partial_agg_optimizer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ impl PartialAggregationOptimizer {
5555
// println!("[DEBUG] Aggregate expressions: {:?}", agg.aggr_expr().iter().map(|e| e.name()).collect::<Vec<_>>());
5656

5757
let needs_partial = agg.aggr_expr().iter().any(|e| {
58-
let name = e.name().to_lowercase();
59-
name.starts_with("approx_distinct(")
58+
let name = e.fun().name().to_lowercase();
59+
name.eq("approx_distinct")
6060
});
6161

6262
if needs_partial && !matches!(agg.mode(), &AggregateMode::Partial) {

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext;
8080
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
8181
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
82+
import org.opensearch.search.aggregations.bucket.terms.StringTerms;
8283
import org.opensearch.search.aggregations.metrics.InternalValueCount;
8384
import org.opensearch.search.aggregations.metrics.ValueCountAggregator;
8485
import org.opensearch.search.internal.SearchContext;
@@ -739,10 +740,13 @@ public List<InternalAggregation> convert(Map<String, Object[]> shardResult, Sear
739740
List<CompositeKey> compositeKeys = new ArrayList<>(shardResult.size());
740741
for (int i = 0; i < shardResult.get(shardResult.keySet().stream().findFirst().get()).length; i++) {
741742
for (CompositeValuesSourceConfig sourceConfig : sourceConfigs) {
742-
if (sourceConfig.fieldType() == null) {
743-
throw new UnsupportedOperationException("Composite aggregation does not support script field types");
744-
}
745-
Object[] values = shardResult.get(sourceConfig.fieldType().name());
743+
// if (sourceConfig.fieldType() == null) {
744+
// throw new UnsupportedOperationException("Composite aggregation does not support script field types");
745+
// }
746+
// source=hits | eval m = extract(minute from EventTime) | stats count() by UserID, m, SearchPhrase | sort - \`count()\` | head 10
747+
// for above query without this change it will fail above
748+
// We can get the name directly from sourceConfig
749+
Object[] values = shardResult.get(sourceConfig.name());
746750
// TODO : Would require conversion for certain types,
747751
currentCompositeKey.add(searchContext.convertToComparable(values[i]));
748752
}

server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -765,7 +765,10 @@ public long nextValue() throws IOException {
765765

766766
@Override
767767
public InternalAggregation convertRow(Map<String, Object[]> shardResult, int row, SearchContext searchContext) {
768-
Object[] hlls = shardResult.get(name);
768+
Object[] hlls = shardResult.get(name + "[hll_registers]");
769+
if (hlls == null) {
770+
hlls = shardResult.get(name);
771+
}
769772
HyperLogLogPlusPlus sketch = DataFusionHLLWrapper.getHyperLogLogPlusPlus((byte[]) hlls[row]);
770773
return new InternalCardinality(name, sketch, null);
771774
}

server/src/main/java/org/opensearch/search/query/SearchEngineResultConversionUtils.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import org.opensearch.search.aggregations.InternalAggregation;
1616
import org.opensearch.search.aggregations.InternalAggregations;
1717
import org.opensearch.search.aggregations.ShardResultConvertor;
18+
import org.opensearch.search.aggregations.metrics.CardinalityAggregator;
19+
import org.opensearch.search.aggregations.metrics.InternalCardinality;
1820
import org.opensearch.search.aggregations.metrics.InternalValueCount;
1921
import org.opensearch.search.aggregations.metrics.ValueCountAggregator;
2022
import org.opensearch.search.internal.SearchContext;
@@ -35,6 +37,11 @@ public static void convertDFResultGeneric(SearchContext searchContext) {
3537
if (searchContext.aggregations() != null) {
3638
Map<String, Object[]> dfResult = searchContext.getDFResults();
3739

40+
// LOGGER.info("DF Results at convertDFResultGeneric:");
41+
// for (Map.Entry<String, Object[]> entry : dfResult.entrySet()) {
42+
// LOGGER.info("{}: {}", entry.getKey(), java.util.Arrays.toString(entry.getValue()));
43+
// }
44+
3845
// Create aggregators which will process the result from DataFusion
3946
try {
4047

@@ -76,6 +83,9 @@ public static Tuple<List<InternalAggregation>, Long> extractSubAggsAndDocCount(A
7683
if (aggregator instanceof ValueCountAggregator) {
7784
docCount = ((InternalValueCount) subAgg).getValue();
7885
}
86+
// if (aggregator instanceof CardinalityAggregator) {
87+
// docCount = ((InternalCardinality) subAgg).getValue();
88+
// }
7989
subAggs.add(subAgg);
8090
}
8191
}

0 commit comments

Comments
 (0)