diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java
index 7cbc7cc4c25db..55d46078830c7 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java
@@ -8,75 +8,79 @@
 package org.elasticsearch.compute.aggregation.blockhash;
 
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.BitArray;
-import org.elasticsearch.common.util.BytesRefHash;
-import org.elasticsearch.common.util.LongLongHash;
+import org.elasticsearch.common.util.BytesRefArray;
+import org.elasticsearch.common.util.IntArray;
+import org.elasticsearch.common.util.LongArray;
 import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
-import org.elasticsearch.compute.aggregation.SeenGroupIds;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BytesRefBlock;
 import org.elasticsearch.compute.data.BytesRefVector;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LongBigArrayVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
+import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
+import org.elasticsearch.compute.data.OrdinalBytesRefVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.ReleasableIterator;
 import org.elasticsearch.core.Releasables;
 
 import java.util.Objects;
 
+/**
+ * An optimized block hash that receives two sorted blocks: tsid and timestamp.
+ * Since the incoming data is sorted, this block hash appends the incoming data to its internal arrays without hash lookups.
+ */
 public final class TimeSeriesBlockHash extends BlockHash {
 
     private final int tsHashChannel;
     private final int timestampIntervalChannel;
-    private final BytesRefHash tsidHashes;
-    private final LongLongHash intervalHash;
 
-    long groupOrdinal = -1;
-    BytesRef previousTsidHash;
-    long previousTimestampInterval;
+    private final BytesRef lastTsid = new BytesRef();
+    private final BytesRefArrayWithSize tsidArray;
+
+    private long lastTimestamp;
+    private final LongArrayWithSize timestampArray;
+
+    private int currentTimestampCount;
+    private final IntArrayWithSize perTsidCountArray;
+
+    int groupOrdinal = -1;
 
-    public TimeSeriesBlockHash(int tsHashChannel, int timestampIntervalChannel, DriverContext driverContext) {
-        super(driverContext.blockFactory());
+    public TimeSeriesBlockHash(int tsHashChannel, int timestampIntervalChannel, BlockFactory blockFactory) {
+        super(blockFactory);
         this.tsHashChannel = tsHashChannel;
         this.timestampIntervalChannel = timestampIntervalChannel;
-        this.tsidHashes = new BytesRefHash(1, blockFactory.bigArrays());
-        this.intervalHash = new LongLongHash(1, blockFactory.bigArrays());
+        this.tsidArray = new BytesRefArrayWithSize();
+        this.timestampArray = new LongArrayWithSize();
+        this.perTsidCountArray = new IntArrayWithSize();
     }
 
     @Override
     public void close() {
-        Releasables.close(tsidHashes, intervalHash);
+        Releasables.close(tsidArray, timestampArray, perTsidCountArray);
    }
 
     @Override
     public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
-        BytesRefBlock tsHashBlock = page.getBlock(tsHashChannel);
-        BytesRefVector tsHashVector = Objects.requireNonNull(tsHashBlock.asVector());
-        try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsHashVector.getPositionCount())) {
-            LongBlock timestampIntervalBlock = page.getBlock(timestampIntervalChannel);
-            BytesRef spare = new BytesRef();
-            for (int i = 0; i < tsHashVector.getPositionCount(); i++) {
-                BytesRef tsHash = tsHashVector.getBytesRef(i, spare);
-                long timestampInterval = timestampIntervalBlock.getLong(i);
-                // Optimization that relies on the fact that blocks are sorted by tsid hash and timestamp
-                if (tsHash.equals(previousTsidHash) == false || timestampInterval != previousTimestampInterval) {
-                    long tsidOrdinal = tsidHashes.add(tsHash);
-                    if (tsidOrdinal < 0) {
-                        tsidOrdinal = -1 - tsidOrdinal;
-                    }
-                    groupOrdinal = intervalHash.add(tsidOrdinal, timestampInterval);
-                    if (groupOrdinal < 0) {
-                        groupOrdinal = -1 - groupOrdinal;
-                    }
-                    previousTsidHash = BytesRef.deepCopyOf(tsHash);
-                    previousTimestampInterval = timestampInterval;
-                }
-                ordsBuilder.appendInt(Math.toIntExact(groupOrdinal));
+        final BytesRefBlock tsidBlock = page.getBlock(tsHashChannel);
+        final BytesRefVector tsidVector = Objects.requireNonNull(tsidBlock.asVector(), "tsid input must be a vector");
+        final LongBlock timestampBlock = page.getBlock(timestampIntervalChannel);
+        final LongVector timestampVector = Objects.requireNonNull(timestampBlock.asVector(), "timestamp input must be a vector");
+        try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidVector.getPositionCount())) {
+            final BytesRef spare = new BytesRef();
+            // TODO: optimize incoming ordinal block
+            for (int i = 0; i < tsidVector.getPositionCount(); i++) {
+                final BytesRef tsid = tsidVector.getBytesRef(i, spare);
+                final long timestamp = timestampVector.getLong(i);
+                ordsBuilder.appendInt(addOnePosition(tsid, timestamp));
             }
             try (var ords = ordsBuilder.build()) {
                 addInput.add(0, ords);
@@ -84,6 +88,32 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
         }
     }
 
+    private int addOnePosition(BytesRef tsid, long timestamp) {
+        boolean newGroup = false;
+        if (groupOrdinal == -1 || lastTsid.equals(tsid) == false) {
+            assert groupOrdinal == -1 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward: " + tsid + " < " + lastTsid;
+            endTsidGroup();
+            tsidArray.append(tsid);
+            tsidArray.get(tsidArray.count - 1, lastTsid);
+            newGroup = true;
+        }
+        if (newGroup || timestamp != lastTimestamp) {
+            assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward: " + lastTimestamp + " < " + timestamp;
+            timestampArray.append(timestamp);
+            lastTimestamp = timestamp;
+            groupOrdinal++;
+            currentTimestampCount++;
+        }
+        return groupOrdinal;
+    }
+
+    private void endTsidGroup() {
+        if (currentTimestampCount > 0) {
+            perTsidCountArray.append(currentTimestampCount);
+            currentTimestampCount = 0;
+        }
+    }
+
     @Override
     public ReleasableIterator<IntBlock> lookup(Page page, ByteSizeValue targetBlockSize) {
         throw new UnsupportedOperationException("TODO");
@@ -91,39 +121,74 @@ public ReleasableIterator<IntBlock> lookup(Page page, ByteSizeValue targetBlockS
 
     @Override
     public Block[] getKeys() {
-        int positions = (int) intervalHash.size();
-        BytesRefVector tsidHashes = null;
-        LongVector timestampIntervals = null;
-        try (
-            BytesRefVector.Builder tsidHashesBuilder = blockFactory.newBytesRefVectorBuilder(positions);
-            LongVector.FixedBuilder timestampIntervalsBuilder = blockFactory.newLongVectorFixedBuilder(positions)
-        ) {
-            BytesRef scratch = new BytesRef();
-            for (long i = 0; i < positions; i++) {
-                BytesRef key1 = this.tsidHashes.get(intervalHash.getKey1(i), scratch);
-                tsidHashesBuilder.appendBytesRef(key1);
-                timestampIntervalsBuilder.appendLong((int) i, intervalHash.getKey2(i));
+        endTsidGroup();
+        final Block[] blocks = new Block[2];
+        try {
+            if (OrdinalBytesRefBlock.isDense(positionCount(), tsidArray.count)) {
+                blocks[0] = buildTsidBlockWithOrdinal();
+            } else {
+                blocks[0] = buildTsidBlock();
             }
-            tsidHashes = tsidHashesBuilder.build();
-            timestampIntervals = timestampIntervalsBuilder.build();
+            blocks[1] = timestampArray.toBlock();
+            return blocks;
         } finally {
-            if (timestampIntervals == null) {
-                Releasables.closeExpectNoException(tsidHashes);
+            if (blocks[blocks.length - 1] == null) {
+                Releasables.close(blocks);
+            }
+        }
+    }
+
+    private BytesRefBlock buildTsidBlockWithOrdinal() {
+        try (IntVector.FixedBuilder ordinalBuilder = blockFactory.newIntVectorFixedBuilder(positionCount())) {
+            for (int i = 0; i < tsidArray.count; i++) {
+                int numTimestamps = perTsidCountArray.array.get(i);
+                for (int t = 0; t < numTimestamps; t++) {
+                    ordinalBuilder.appendInt(i);
+                }
+            }
+            final IntVector ordinalVector = ordinalBuilder.build();
+            BytesRefVector dictionary = null;
+            boolean success = false;
+            try {
+                dictionary = tsidArray.toVector();
+                var result = new OrdinalBytesRefVector(ordinalVector, dictionary).asBlock();
+                success = true;
+                return result;
+            } finally {
+                if (success == false) {
+                    Releasables.close(ordinalVector, dictionary);
+                }
+            }
+        }
+    }
+
+    private BytesRefBlock buildTsidBlock() {
+        try (BytesRefVector.Builder tsidBuilder = blockFactory.newBytesRefVectorBuilder(positionCount())) {
+            final BytesRef tsid = new BytesRef();
+            for (int i = 0; i < tsidArray.count; i++) {
+                tsidArray.array.get(i, tsid);
+                int numTimestamps = perTsidCountArray.array.get(i);
+                for (int t = 0; t < numTimestamps; t++) {
+                    tsidBuilder.appendBytesRef(tsid);
+                }
             }
+            return tsidBuilder.build().asBlock();
         }
-        return new Block[] { tsidHashes.asBlock(), timestampIntervals.asBlock() };
+    }
+
+    private int positionCount() {
+        return groupOrdinal + 1;
     }
 
     @Override
     public IntVector nonEmpty() {
-        long endExclusive = intervalHash.size();
+        long endExclusive = positionCount();
         return IntVector.range(0, Math.toIntExact(endExclusive), blockFactory);
     }
 
     @Override
     public BitArray seenGroupIds(BigArrays bigArrays) {
-        long size = intervalHash.size();
-        return new SeenGroupIds.Range(0, Math.toIntExact(size)).seenGroupIds(bigArrays);
+        return new Range(0, positionCount()).seenGroupIds(bigArrays);
     }
 
     public String toString() {
@@ -135,4 +200,80 @@ public String toString() {
             + groupOrdinal
             + "b}";
     }
+
+    private class LongArrayWithSize implements Releasable {
+        private LongArray array;
+        private int count = 0;
+
+        LongArrayWithSize() {
+            this.array = blockFactory.bigArrays().newLongArray(1, false);
+        }
+
+        void append(long value) {
+            this.array = blockFactory.bigArrays().grow(array, count + 1);
+            this.array.set(count, value);
+            count++;
+        }
+
+        LongBlock toBlock() {
+            LongBlock block = new LongBigArrayVector(array, count, blockFactory).asBlock();
+            blockFactory.adjustBreaker(block.ramBytesUsed() - RamUsageEstimator.sizeOf(array));
+            array = null;
+            return block;
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(array);
+        }
+    }
+
+    private class IntArrayWithSize implements Releasable {
+        private IntArray array;
+        private int count = 0;
+
+        IntArrayWithSize() {
+            this.array = blockFactory.bigArrays().newIntArray(1, false);
+        }
+
+        void append(int value) {
+            this.array = blockFactory.bigArrays().grow(array, count + 1);
+            this.array.set(count, value);
+            count++;
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(array);
+        }
+    }
+
+    private class BytesRefArrayWithSize implements Releasable {
+        private final BytesRefArray array;
+        private int count = 0;
+
+        BytesRefArrayWithSize() {
+            this.array = new BytesRefArray(1, blockFactory.bigArrays());
+        }
+
+        void append(BytesRef value) {
+            array.append(value);
+            count++;
+        }
+
+        void get(int index, BytesRef dest) {
+            array.get(index, dest);
+        }
+
+        BytesRefVector toVector() {
+            BytesRefVector vector = blockFactory.newBytesRefArrayVector(array, count);
+            blockFactory.adjustBreaker(vector.ramBytesUsed() - array.bigArraysRamBytesUsed());
+            return vector;
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(array);
+        }
+    }
 }
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java
index 87b33b3b0893d..c4ef3ddf288c3 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java
@@ -54,7 +54,11 @@ void writeOrdinalBlock(StreamOutput out) throws IOException {
      * Returns true if this ordinal block is dense enough to enable optimizations using its ordinals
      */
     public boolean isDense() {
-        return ordinals.getTotalValueCount() * 2 / 3 >= bytes.getPositionCount();
+        return isDense(ordinals.getTotalValueCount(), bytes.getPositionCount());
+    }
+
+    public static boolean isDense(int totalValueCount, int dictionarySize) {
+        return totalValueCount * 2L / 3L >= dictionarySize;
     }
 
     public IntBlock getOrdinalsBlock() {
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java
index c47b6cebdaddc..db30c5e66f991 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java
@@ -32,6 +32,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static java.util.Objects.requireNonNull;
@@ -39,33 +40,55 @@
 
 public class HashAggregationOperator implements Operator {
 
-    public record HashAggregationOperatorFactory(
-        List<BlockHash.GroupSpec> groups,
-        AggregatorMode aggregatorMode,
-        List<GroupingAggregator.Factory> aggregators,
-        int maxPageSize,
-        AnalysisRegistry analysisRegistry
-    ) implements OperatorFactory {
-        @Override
-        public Operator get(DriverContext driverContext) {
+    public static final class HashAggregationOperatorFactory implements OperatorFactory {
+        final Function<DriverContext, BlockHash> blockHashSupplier;
+        final AggregatorMode aggregatorMode;
+        final List<GroupingAggregator.Factory> aggregators;
+        final int maxPageSize;
+        final AnalysisRegistry analysisRegistry;
+
+        public HashAggregationOperatorFactory(
+            Function<DriverContext, BlockHash> blockHashSupplier,
+            AggregatorMode aggregatorMode,
+            List<GroupingAggregator.Factory> aggregators,
+            int maxPageSize,
+            AnalysisRegistry analysisRegistry
+        ) {
+            this.blockHashSupplier = blockHashSupplier;
+            this.aggregatorMode = aggregatorMode;
+            this.aggregators = aggregators;
+            this.maxPageSize = maxPageSize;
+            this.analysisRegistry = analysisRegistry;
+        }
+
+        public HashAggregationOperatorFactory(
+            List<BlockHash.GroupSpec> groups,
+            AggregatorMode aggregatorMode,
+            List<GroupingAggregator.Factory> aggregators,
+            int maxPageSize,
+            AnalysisRegistry analysisRegistry
+        ) {
             if (groups.stream().anyMatch(BlockHash.GroupSpec::isCategorize)) {
-                return new HashAggregationOperator(
-                    aggregators,
-                    () -> BlockHash.buildCategorizeBlockHash(
-                        groups,
-                        aggregatorMode,
-                        driverContext.blockFactory(),
-                        analysisRegistry,
-                        maxPageSize
-                    ),
-                    driverContext
+                this.blockHashSupplier = driverContext -> BlockHash.buildCategorizeBlockHash(
+                    groups,
+                    aggregatorMode,
+                    driverContext.blockFactory(),
+                    analysisRegistry,
+                    maxPageSize
                 );
+            } else {
+                this.blockHashSupplier = driverContext -> BlockHash.build(groups, driverContext.blockFactory(), maxPageSize, false);
             }
-            return new HashAggregationOperator(
-                aggregators,
-                () -> BlockHash.build(groups, driverContext.blockFactory(), maxPageSize, false),
-                driverContext
-            );
+            this.aggregatorMode = aggregatorMode;
+            this.aggregators = aggregators;
+            this.maxPageSize = maxPageSize;
+            this.analysisRegistry = analysisRegistry;
+        }
+
+        @Override
+        public Operator get(DriverContext driverContext) {
+            return new HashAggregationOperator(aggregators, () -> blockHashSupplier.apply(driverContext), driverContext);
         }
 
         @Override
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java
index 3b011d4a682ff..c6602be15041b 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java
@@ -63,7 +63,7 @@ public Operator get(DriverContext driverContext) {
             aggregators.addAll(valuesAggregatorForGroupings(groupings, timeBucketChannel));
             return new HashAggregationOperator(
                 aggregators,
-                () -> new TimeSeriesBlockHash(tsHashChannel, timeBucketChannel, driverContext),
+                () -> new TimeSeriesBlockHash(tsHashChannel, timeBucketChannel, driverContext.blockFactory()),
                 driverContext
             );
         }
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java
index 8fb51457b6a8a..4a57418d99ed7 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java
@@ -13,7 +13,9 @@
 import org.elasticsearch.compute.aggregation.FilteredAggregatorFunctionSupplier;
 import org.elasticsearch.compute.aggregation.GroupingAggregator;
 import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
+import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash;
 import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.lucene.TimeSeriesSortedSourceOperatorFactory;
 import org.elasticsearch.compute.operator.AggregationOperator;
 import org.elasticsearch.compute.operator.EvalOperator;
 import org.elasticsearch.compute.operator.HashAggregationOperator.HashAggregationOperatorFactory;
@@ -28,6 +30,7 @@
 import org.elasticsearch.xpack.esql.core.expression.FoldContext;
 import org.elasticsearch.xpack.esql.core.expression.NameId;
 import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
+import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.evaluator.EvalMapper;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
@@ -171,8 +174,25 @@ else if (aggregatorMode.isOutputPartial()) {
             true, // grouping
             s -> aggregatorFactories.add(s.supplier.groupingAggregatorFactory(s.mode, s.channels))
         );
-
-        if (groupSpecs.size() == 1 && groupSpecs.get(0).channel == null) {
+        // time-series aggregation
+        if (source.sourceOperatorFactory instanceof TimeSeriesSortedSourceOperatorFactory
+            && aggregatorMode.isInputPartial() == false
+            && groupSpecs.size() == 2
+            && groupSpecs.get(0).attribute.dataType() == DataType.TSID_DATA_TYPE
+            && groupSpecs.get(1).attribute.dataType() == DataType.LONG) {
+            operatorFactory = new HashAggregationOperatorFactory(
+                driverContext -> new TimeSeriesBlockHash(
+                    groupSpecs.get(0).channel,
+                    groupSpecs.get(1).channel,
+                    driverContext.blockFactory()
+                ),
+                aggregatorMode,
+                aggregatorFactories,
+                context.pageSize(aggregateExec.estimatedRowSize()),
+                analysisRegistry
+            );
+            // ordinal grouping
+        } else if (groupSpecs.size() == 1 && groupSpecs.get(0).channel == null) {
             operatorFactory = ordinalGroupingOperatorFactory(
                 source,
                 aggregateExec,
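
Note on the grouping scheme: the new TimeSeriesBlockHash relies on rows arriving sorted by (tsid, timestamp bucket), so it can assign group ordinals by comparing each row only against the previous one and appending to plain arrays, instead of probing a BytesRefHash/LongLongHash pair per row. A minimal, self-contained sketch of that contract (illustrative names only, not the ES|QL API):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the append-only ordinal assignment used above: input must be
    // sorted by tsid, then by timestamp within each tsid.
    final class SortedGroupAssigner {
        final List<String> tsids = new ArrayList<>();           // distinct tsids, in arrival order
        final List<Long> timestampPerGroup = new ArrayList<>(); // one timestamp per group ordinal
        final List<Integer> groupsPerTsid = new ArrayList<>();  // closed run length per tsid
        private String lastTsid = null;
        private long lastTimestamp;
        private int currentCount = 0;
        private int groupOrdinal = -1;

        int add(String tsid, long timestamp) {
            boolean newGroup = false;
            if (groupOrdinal == -1 || tsid.equals(lastTsid) == false) {
                if (currentCount > 0) {          // close out the previous tsid run
                    groupsPerTsid.add(currentCount);
                    currentCount = 0;
                }
                tsids.add(tsid);
                lastTsid = tsid;
                newGroup = true;
            }
            if (newGroup || timestamp != lastTimestamp) {
                timestampPerGroup.add(timestamp);
                lastTimestamp = timestamp;
                groupOrdinal++;                  // ordinals come out dense: 0, 1, 2, ...
                currentCount++;
            }
            return groupOrdinal;
        }
    }

Feeding it ("a", 1), ("a", 1), ("a", 2), ("b", 1) yields ordinals 0, 0, 1, 2. Because ordinals are handed out densely in increasing order, positionCount() is simply groupOrdinal + 1, which is why nonEmpty() and seenGroupIds() can return a plain range instead of consulting a hash table.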
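Note on the key materialization: getKeys() emits the tsid key block in dictionary-encoded form (an OrdinalBytesRefVector of int ordinals plus one BytesRefVector dictionary) only when OrdinalBytesRefBlock.isDense reports that distinct tsids make up at most two thirds of the total group count; otherwise each tsid is copied per position. A standalone worked example of the threshold (hypothetical values, same 2/3 integer arithmetic as the patch):

    final class DensityThresholdDemo {
        // dense when dictionarySize <= totalValueCount * 2 / 3 (integer division)
        static boolean isDense(int totalValueCount, int dictionarySize) {
            return totalValueCount * 2L / 3L >= dictionarySize;
        }

        public static void main(String[] args) {
            // 9000 groups over 600 distinct tsids: 9000 * 2 / 3 = 6000 >= 600,
            // so an ordinal vector plus a 600-entry dictionary is emitted.
            System.out.println(isDense(9000, 600));  // true
            // 9000 groups over 8000 distinct tsids: 6000 >= 8000 is false,
            // so the tsid bytes are materialized at every position instead.
            System.out.println(isDense(9000, 8000)); // false
        }
    }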