Optimize TimeSeriesBlockHash #125461

Merged · 1 commit · Mar 24, 2025
Changes from all commits
TimeSeriesBlockHash.java
@@ -8,122 +8,187 @@
package org.elasticsearch.compute.aggregation.blockhash;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.common.util.LongLongHash;
import org.elasticsearch.common.util.BytesRefArray;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.aggregation.SeenGroupIds;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.BytesRefVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongBigArrayVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
import org.elasticsearch.compute.data.OrdinalBytesRefVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;

import java.util.Objects;

/**
 * An optimized block hash that receives two sorted blocks: tsid and timestamp.
 * Since the incoming data is sorted, this block hash appends it to its internal arrays without any hash lookups.
 */
public final class TimeSeriesBlockHash extends BlockHash {

Review comment (Member): I couldn't find a unit test for this class. If it is missing, should we add a unit test for this block hash implementation?

private final int tsHashChannel;
private final int timestampIntervalChannel;
-private final BytesRefHash tsidHashes;
-private final LongLongHash intervalHash;
-
-long groupOrdinal = -1;
-BytesRef previousTsidHash;
-long previousTimestampInterval;
+private final BytesRef lastTsid = new BytesRef();
+private final BytesRefArrayWithSize tsidArray;
+
+private long lastTimestamp;
+private final LongArrayWithSize timestampArray;
+
+private int currentTimestampCount;
+private final IntArrayWithSize perTsidCountArray;
+
+int groupOrdinal = -1;

-public TimeSeriesBlockHash(int tsHashChannel, int timestampIntervalChannel, DriverContext driverContext) {
-super(driverContext.blockFactory());
+public TimeSeriesBlockHash(int tsHashChannel, int timestampIntervalChannel, BlockFactory blockFactory) {
+super(blockFactory);
this.tsHashChannel = tsHashChannel;
this.timestampIntervalChannel = timestampIntervalChannel;
-this.tsidHashes = new BytesRefHash(1, blockFactory.bigArrays());
-this.intervalHash = new LongLongHash(1, blockFactory.bigArrays());
+this.tsidArray = new BytesRefArrayWithSize();
+this.timestampArray = new LongArrayWithSize();
+this.perTsidCountArray = new IntArrayWithSize();
}

@Override
public void close() {
-Releasables.close(tsidHashes, intervalHash);
+Releasables.close(tsidArray, timestampArray, perTsidCountArray);
}

@Override
public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
-BytesRefBlock tsHashBlock = page.getBlock(tsHashChannel);
-BytesRefVector tsHashVector = Objects.requireNonNull(tsHashBlock.asVector());
-try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsHashVector.getPositionCount())) {
-LongBlock timestampIntervalBlock = page.getBlock(timestampIntervalChannel);
-BytesRef spare = new BytesRef();
-for (int i = 0; i < tsHashVector.getPositionCount(); i++) {
-BytesRef tsHash = tsHashVector.getBytesRef(i, spare);
-long timestampInterval = timestampIntervalBlock.getLong(i);
-// Optimization that relies on the fact that blocks are sorted by tsid hash and timestamp
-if (tsHash.equals(previousTsidHash) == false || timestampInterval != previousTimestampInterval) {
-long tsidOrdinal = tsidHashes.add(tsHash);
-if (tsidOrdinal < 0) {
-tsidOrdinal = -1 - tsidOrdinal;
-}
-groupOrdinal = intervalHash.add(tsidOrdinal, timestampInterval);
-if (groupOrdinal < 0) {
-groupOrdinal = -1 - groupOrdinal;
-}
-previousTsidHash = BytesRef.deepCopyOf(tsHash);
-previousTimestampInterval = timestampInterval;
-}
-ordsBuilder.appendInt(Math.toIntExact(groupOrdinal));
+final BytesRefBlock tsidBlock = page.getBlock(tsHashChannel);
+final BytesRefVector tsidVector = Objects.requireNonNull(tsidBlock.asVector(), "tsid input must be a vector");
+final LongBlock timestampBlock = page.getBlock(timestampIntervalChannel);
+final LongVector timestampVector = Objects.requireNonNull(timestampBlock.asVector(), "timestamp input must be a vector");
+try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidVector.getPositionCount())) {
+final BytesRef spare = new BytesRef();
+// TODO: optimize incoming ordinal block

Review comment (Member): Do you mean here that in a future change the time series source operator should generate the time series ordinal?

Review comment (Member): Never mind, I see this is tested indirectly via TimeSeriesAggregationOperatorTests.

Review comment (Member Author): Yes, but I think we should have a separate unit test for this. I'll add it in a follow-up.

Review comment (Member Author): > Do you mean here that in a future change the time series source operator should generate the time series ordinal?

Yes, I will try to improve TimeSeriesSortedSourceOperatorFactory.

+for (int i = 0; i < tsidVector.getPositionCount(); i++) {
+final BytesRef tsid = tsidVector.getBytesRef(i, spare);
+final long timestamp = timestampVector.getLong(i);
+ordsBuilder.appendInt(addOnePosition(tsid, timestamp));
}
try (var ords = ordsBuilder.build()) {
addInput.add(0, ords);
}
}
}

private int addOnePosition(BytesRef tsid, long timestamp) {
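// Input arrives sorted (tsids ascending, timestamps non-increasing within a tsid, as
// asserted below), so comparing against the previous values is enough to detect a new group.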
boolean newGroup = false;
if (groupOrdinal == -1 || lastTsid.equals(tsid) == false) {
assert groupOrdinal == -1 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward ";
endTsidGroup();
tsidArray.append(tsid);
tsidArray.get(tsidArray.count - 1, lastTsid);
newGroup = true;
}
if (newGroup || timestamp != lastTimestamp) {
assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
timestampArray.append(timestamp);
lastTimestamp = timestamp;
groupOrdinal++;
currentTimestampCount++;
}
return groupOrdinal;
}

private void endTsidGroup() {
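// Flush the timestamp count of the tsid group that just ended; getKeys() calls this once
// more to flush the trailing group before building the key blocks.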
if (currentTimestampCount > 0) {
perTsidCountArray.append(currentTimestampCount);
currentTimestampCount = 0;
}
}
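
To make the append-only grouping concrete, here is a minimal, self-contained sketch of the same logic over plain Java collections (hypothetical names, no Elasticsearch types; the real implementation works on BigArrays-backed structures and BytesRef keys):

```java
import java.util.ArrayList;
import java.util.List;

// Mirrors TimeSeriesBlockHash#addOnePosition: because input pairs arrive sorted by tsid
// and then by timestamp, each group boundary is detected by comparing against the
// previous pair only, with no hash lookups.
class SortedGroupAssigner {
    private final List<String> tsids = new ArrayList<>();          // like tsidArray
    private final List<Long> timestamps = new ArrayList<>();       // like timestampArray
    private final List<Integer> perTsidCounts = new ArrayList<>(); // like perTsidCountArray
    private String lastTsid = null;
    private long lastTimestamp;
    private int currentTimestampCount = 0;
    private int groupOrdinal = -1;

    int addOnePosition(String tsid, long timestamp) {
        boolean newGroup = false;
        if (groupOrdinal == -1 || tsid.equals(lastTsid) == false) {
            endTsidGroup();
            tsids.add(tsid);
            lastTsid = tsid;
            newGroup = true;
        }
        if (newGroup || timestamp != lastTimestamp) {
            timestamps.add(timestamp);
            lastTimestamp = timestamp;
            groupOrdinal++;
            currentTimestampCount++;
        }
        return groupOrdinal;
    }

    void endTsidGroup() {
        if (currentTimestampCount > 0) {
            perTsidCounts.add(currentTimestampCount);
            currentTimestampCount = 0;
        }
    }

    public static void main(String[] args) {
        SortedGroupAssigner assigner = new SortedGroupAssigner();
        System.out.println(assigner.addOnePosition("a", 10)); // 0
        System.out.println(assigner.addOnePosition("a", 10)); // 0 (same group)
        System.out.println(assigner.addOnePosition("a", 9));  // 1 (new timestamp)
        System.out.println(assigner.addOnePosition("b", 10)); // 2 (new tsid)
        assigner.endTsidGroup(); // flush the final group
    }
}
```

After this run, tsids is [a, b], timestamps is [10, 9, 10], and perTsidCounts is [2, 1], which is exactly the state getKeys() turns into key blocks.
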

@Override
public ReleasableIterator<IntBlock> lookup(Page page, ByteSizeValue targetBlockSize) {
throw new UnsupportedOperationException("TODO");
}

@Override
public Block[] getKeys() {
-int positions = (int) intervalHash.size();
-BytesRefVector tsidHashes = null;
-LongVector timestampIntervals = null;
-try (
-BytesRefVector.Builder tsidHashesBuilder = blockFactory.newBytesRefVectorBuilder(positions);
-LongVector.FixedBuilder timestampIntervalsBuilder = blockFactory.newLongVectorFixedBuilder(positions)
-) {
-BytesRef scratch = new BytesRef();
-for (long i = 0; i < positions; i++) {
-BytesRef key1 = this.tsidHashes.get(intervalHash.getKey1(i), scratch);
-tsidHashesBuilder.appendBytesRef(key1);
-timestampIntervalsBuilder.appendLong((int) i, intervalHash.getKey2(i));
-}
-tsidHashes = tsidHashesBuilder.build();
-timestampIntervals = timestampIntervalsBuilder.build();
-} finally {
-if (timestampIntervals == null) {
-Releasables.closeExpectNoException(tsidHashes);
-}
-}
-return new Block[] { tsidHashes.asBlock(), timestampIntervals.asBlock() };
+endTsidGroup();
+final Block[] blocks = new Block[2];
+try {
+if (OrdinalBytesRefBlock.isDense(positionCount(), tsidArray.count)) {
+blocks[0] = buildTsidBlockWithOrdinal();
+} else {
+blocks[0] = buildTsidBlock();
+}
+blocks[1] = timestampArray.toBlock();
+return blocks;
+} finally {
+if (blocks[blocks.length - 1] == null) {
+Releasables.close(blocks);
+}
+}
}

private BytesRefBlock buildTsidBlockWithOrdinal() {
try (IntVector.FixedBuilder ordinalBuilder = blockFactory.newIntVectorFixedBuilder(positionCount())) {
for (int i = 0; i < tsidArray.count; i++) {
int numTimestamps = perTsidCountArray.array.get(i);
for (int t = 0; t < numTimestamps; t++) {
ordinalBuilder.appendInt(i);
}
}
final IntVector ordinalVector = ordinalBuilder.build();
BytesRefVector dictionary = null;
boolean success = false;
try {
dictionary = tsidArray.toVector();
var result = new OrdinalBytesRefVector(ordinalVector, dictionary).asBlock();
success = true;
return result;
} finally {
if (success == false) {
Releasables.close(ordinalVector, dictionary);
}
}
}
}
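
For intuition, the dictionary encoding built above can be sketched with plain arrays (hypothetical helper, not part of the PR): instead of repeating each tsid once per group, the key column is a dictionary of distinct tsids plus one ordinal per position.

```java
import java.util.Arrays;

class OrdinalSketch {
    // Expands per-tsid group counts into one dictionary ordinal per output position,
    // mirroring the loop in buildTsidBlockWithOrdinal.
    static int[] expandOrdinals(int[] perTsidCounts, int totalPositions) {
        int[] ordinals = new int[totalPositions];
        int pos = 0;
        for (int tsid = 0; tsid < perTsidCounts.length; tsid++) {
            for (int t = 0; t < perTsidCounts[tsid]; t++) {
                ordinals[pos++] = tsid;
            }
        }
        return ordinals;
    }

    public static void main(String[] args) {
        // tsids [a, b] with per-tsid counts [2, 1]: position i's key is dictionary[ordinals[i]]
        System.out.println(Arrays.toString(expandOrdinals(new int[] { 2, 1 }, 3))); // [0, 0, 1]
    }
}
```
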

private BytesRefBlock buildTsidBlock() {
try (BytesRefVector.Builder tsidBuilder = blockFactory.newBytesRefVectorBuilder(positionCount())) {
final BytesRef tsid = new BytesRef();
for (int i = 0; i < tsidArray.count; i++) {
tsidArray.array.get(i, tsid);
int numTimestamps = perTsidCountArray.array.get(i);
for (int t = 0; t < numTimestamps; t++) {
tsidBuilder.appendBytesRef(tsid);
}
}
return tsidBuilder.build().asBlock();
}
}

private int positionCount() {
return groupOrdinal + 1;
}

@Override
public IntVector nonEmpty() {
-long endExclusive = intervalHash.size();
+long endExclusive = positionCount();
return IntVector.range(0, Math.toIntExact(endExclusive), blockFactory);
}

@Override
public BitArray seenGroupIds(BigArrays bigArrays) {
-long size = intervalHash.size();
-return new SeenGroupIds.Range(0, Math.toIntExact(size)).seenGroupIds(bigArrays);
+return new Range(0, positionCount()).seenGroupIds(bigArrays);
}

public String toString() {
@@ -135,4 +200,80 @@ public String toString() {
+ groupOrdinal
+ "b}";
}

private class LongArrayWithSize implements Releasable {
private LongArray array;
private int count = 0;

LongArrayWithSize() {
this.array = blockFactory.bigArrays().newLongArray(1, false);
}

void append(long value) {
this.array = blockFactory.bigArrays().grow(array, count + 1);
this.array.set(count, value);
count++;
}

LongBlock toBlock() {
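// Transfer ownership of the backing array to the returned block: adjust the breaker by the
// difference between the block's accounting and the raw array's, then null the field so
// close() does not release memory the block now owns.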
LongBlock block = new LongBigArrayVector(array, count, blockFactory).asBlock();
blockFactory.adjustBreaker(block.ramBytesUsed() - RamUsageEstimator.sizeOf(array));
array = null;
return block;
}

@Override
public void close() {
Releasables.close(array);
}
}

private class IntArrayWithSize implements Releasable {
private IntArray array;
private int count = 0;

IntArrayWithSize() {
this.array = blockFactory.bigArrays().newIntArray(1, false);
}

void append(int value) {
this.array = blockFactory.bigArrays().grow(array, count + 1);
this.array.set(count, value);
count++;
}

@Override
public void close() {
Releasables.close(array);
}
}

private class BytesRefArrayWithSize implements Releasable {
private final BytesRefArray array;
private int count = 0;

BytesRefArrayWithSize() {
this.array = new BytesRefArray(1, blockFactory.bigArrays());
}

void append(BytesRef value) {
array.append(value);
count++;
}

void get(int index, BytesRef dest) {
array.get(index, dest);
}

BytesRefVector toVector() {
BytesRefVector vector = blockFactory.newBytesRefArrayVector(array, count);
blockFactory.adjustBreaker(vector.ramBytesUsed() - array.bigArraysRamBytesUsed());
return vector;
}

@Override
public void close() {
Releasables.close(array);
}
}
}
OrdinalBytesRefBlock.java
@@ -54,7 +54,11 @@ void writeOrdinalBlock(StreamOutput out) throws IOException {
* Returns true if this ordinal block is dense enough to enable optimizations using its ordinals
*/
public boolean isDense() {
-return ordinals.getTotalValueCount() * 2 / 3 >= bytes.getPositionCount();
+return isDense(ordinals.getTotalValueCount(), bytes.getPositionCount());
}
+
+public static boolean isDense(int totalPositions, int dictionarySize) {
+return totalPositions * 2L / 3L >= dictionarySize;
+}
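
As a worked example (using the argument order from the call sites above): 300 ordinal positions over a 150-entry dictionary give 300 * 2 / 3 = 200 >= 150, so the ordinal representation counts as dense, while 300 positions over a 250-entry dictionary give 200 < 250 and the dense path is skipped.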

public IntBlock getOrdinalsBlock() {