Skip to content

Commit c01cf11

Browse files
committed
TSDB codec
1 parent 073ca0e commit c01cf11

File tree

1 file changed

+134
-32
lines changed

1 file changed

+134
-32
lines changed

Diff for: server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesProducer.java

+134-32
Original file line number | Diff line number | Diff line change
@@ -33,16 +33,21 @@
3333
import org.apache.lucene.store.ByteArrayDataInput;
3434
import org.apache.lucene.store.ChecksumIndexInput;
3535
import org.apache.lucene.store.DataInput;
36+
import org.apache.lucene.store.FilterIndexInput;
3637
import org.apache.lucene.store.IndexInput;
3738
import org.apache.lucene.store.RandomAccessInput;
3839
import org.apache.lucene.util.BytesRef;
40+
import org.apache.lucene.util.BytesRefBuilder;
3941
import org.apache.lucene.util.LongValues;
4042
import org.apache.lucene.util.compress.LZ4;
4143
import org.apache.lucene.util.packed.DirectMonotonicReader;
4244
import org.apache.lucene.util.packed.PackedInts;
45+
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
46+
import org.elasticsearch.core.CheckedFunction;
4347
import org.elasticsearch.core.IOUtils;
4448

4549
import java.io.IOException;
50+
import java.util.Arrays;
4651
import java.util.HashMap;
4752
import java.util.Map;
4853

@@ -51,11 +56,11 @@
5156
import static org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat.TERMS_DICT_BLOCK_LZ4_SHIFT;
5257

5358
public class ES87TSDBDocValuesProducer extends DocValuesProducer {
54-
private final Map<String, NumericEntry> numerics;
55-
private final Map<String, BinaryEntry> binaries;
56-
private final Map<String, SortedEntry> sorted;
57-
private final Map<String, SortedSetEntry> sortedSets;
58-
private final Map<String, SortedNumericEntry> sortedNumerics;
59+
private final Map<String, EntryOrBytes<NumericEntry>> numerics;
60+
private final Map<String, EntryOrBytes<BinaryEntry>> binaries;
61+
private final Map<String, EntryOrBytes<SortedEntry>> sorted;
62+
private final Map<String, EntryOrBytes<SortedSetEntry>> sortedSets;
63+
private final Map<String, EntryOrBytes<SortedNumericEntry>> sortedNumerics;
5964
private final Map<String, DocValuesSkipperEntry> skippers;
6065
private final IndexInput data;
6166
private final int maxDoc;
@@ -72,7 +77,6 @@ public class ES87TSDBDocValuesProducer extends DocValuesProducer {
7277
this.skippers = new HashMap<>();
7378
this.maxDoc = state.segmentInfo.maxDoc();
7479
this.merging = false;
75-
7680
// read in the entries from the metadata file.
7781
int version = -1;
7882
String metaName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, metaExtension);
@@ -130,11 +134,11 @@ public class ES87TSDBDocValuesProducer extends DocValuesProducer {
130134
}
131135

132136
private ES87TSDBDocValuesProducer(
133-
Map<String, NumericEntry> numerics,
134-
Map<String, BinaryEntry> binaries,
135-
Map<String, SortedEntry> sorted,
136-
Map<String, SortedSetEntry> sortedSets,
137-
Map<String, SortedNumericEntry> sortedNumerics,
137+
Map<String, EntryOrBytes<NumericEntry>> numerics,
138+
Map<String, EntryOrBytes<BinaryEntry>> binaries,
139+
Map<String, EntryOrBytes<SortedEntry>> sorted,
140+
Map<String, EntryOrBytes<SortedSetEntry>> sortedSets,
141+
Map<String, EntryOrBytes<SortedNumericEntry>> sortedNumerics,
138142
Map<String, DocValuesSkipperEntry> skippers,
139143
IndexInput data,
140144
int maxDoc,
@@ -160,13 +164,19 @@ public DocValuesProducer getMergeInstance() {
160164

161165
@Override
162166
public NumericDocValues getNumeric(FieldInfo field) throws IOException {
163-
NumericEntry entry = numerics.get(field.name);
167+
var entry = getOrDecode(numerics.get(field.name), bytes -> readNumeric(new ByteArrayIndexInput("numeric", bytes)));
168+
if (entry == null) {
169+
return null;
170+
}
164171
return getNumeric(entry, -1);
165172
}
166173

167174
@Override
168175
public BinaryDocValues getBinary(FieldInfo field) throws IOException {
169-
BinaryEntry entry = binaries.get(field.name);
176+
var entry = getOrDecode(binaries.get(field.name), bytes -> readBinary(new ByteArrayIndexInput("binary", bytes)));
177+
if (entry == null) {
178+
return null;
179+
}
170180
if (entry.docsWithFieldOffset == -2) {
171181
return DocValues.emptyBinary();
172182
}
@@ -320,7 +330,10 @@ public boolean advanceExact(int target) throws IOException {
320330

321331
@Override
322332
public SortedDocValues getSorted(FieldInfo field) throws IOException {
323-
SortedEntry entry = sorted.get(field.name);
333+
var entry = getOrDecode(sorted.get(field.name), bytes -> readSorted(new ByteArrayIndexInput("sorted", bytes)));
334+
if (entry == null) {
335+
return null;
336+
}
324337
return getSorted(entry);
325338
}
326339

@@ -675,13 +688,22 @@ public int docFreq() throws IOException {
675688

676689
@Override
677690
public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
678-
SortedNumericEntry entry = sortedNumerics.get(field.name);
691+
var entry = getOrDecode(
692+
sortedNumerics.get(field.name),
693+
bytes -> readSortedNumeric(new ByteArrayIndexInput("sorted_numeric", bytes))
694+
);
695+
if (entry == null) {
696+
return null;
697+
}
679698
return getSortedNumeric(entry, -1);
680699
}
681700

682701
@Override
683702
public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
684-
SortedSetEntry entry = sortedSets.get(field.name);
703+
var entry = getOrDecode(sortedSets.get(field.name), bytes -> readSortedSet(new ByteArrayIndexInput("sorted_set", bytes)));
704+
if (entry == null) {
705+
return null;
706+
}
685707
if (entry.singleValueEntry != null) {
686708
return DocValues.singleton(getSorted(entry.singleValueEntry));
687709
}
@@ -861,38 +883,85 @@ public void close() throws IOException {
861883
data.close();
862884
}
863885

886+
private static class StoringBytesIndexInput extends FilterIndexInput {
887+
final BytesRefBuilder buffer = new BytesRefBuilder();
888+
boolean tracking = false;
889+
890+
StoringBytesIndexInput(IndexInput in) {
891+
super("storing_bytes", in);
892+
}
893+
894+
void startTracking() {
895+
buffer.clear();
896+
tracking = true;
897+
}
898+
899+
byte[] endTracking() {
900+
tracking = false;
901+
BytesRef out = buffer.get();
902+
return Arrays.copyOf(out.bytes, out.length);
903+
}
904+
905+
@Override
906+
public byte readByte() throws IOException {
907+
byte b = super.readByte();
908+
if (tracking) {
909+
buffer.append(b);
910+
}
911+
return b;
912+
}
913+
914+
@Override
915+
public void readBytes(byte[] b, int offset, int len) throws IOException {
916+
super.readBytes(b, offset, len);
917+
if (tracking) {
918+
buffer.append(b, offset, len);
919+
}
920+
}
921+
922+
@Override
923+
public void skipBytes(long numBytes) throws IOException {
924+
super.skipBytes(numBytes);
925+
if (tracking) {
926+
byte[] empty = new byte[Math.toIntExact(numBytes)];
927+
buffer.append(empty, 0, empty.length);
928+
}
929+
}
930+
}
931+
864932
private void readFields(IndexInput meta, FieldInfos infos) throws IOException {
865-
for (int fieldNumber = meta.readInt(); fieldNumber != -1; fieldNumber = meta.readInt()) {
933+
StoringBytesIndexInput input = new StoringBytesIndexInput(meta);
934+
for (int fieldNumber = input.readInt(); fieldNumber != -1; fieldNumber = input.readInt()) {
866935
FieldInfo info = infos.fieldInfo(fieldNumber);
867936
if (info == null) {
868-
throw new CorruptIndexException("Invalid field number: " + fieldNumber, meta);
937+
throw new CorruptIndexException("Invalid field number: " + fieldNumber, input);
869938
}
870-
byte type = meta.readByte();
939+
byte type = input.readByte();
871940
if (info.docValuesSkipIndexType() != DocValuesSkipIndexType.NONE) {
872-
skippers.put(info.name, readDocValueSkipperMeta(meta));
941+
skippers.put(info.name, readDocValueSkipperMeta(input));
873942
}
943+
input.startTracking();
874944
if (type == ES87TSDBDocValuesFormat.NUMERIC) {
875-
numerics.put(info.name, readNumeric(meta));
945+
readNumeric(input);
946+
numerics.put(info.name, new EntryOrBytes<>(input.endTracking()));
876947
} else if (type == ES87TSDBDocValuesFormat.BINARY) {
877-
binaries.put(info.name, readBinary(meta));
948+
readBinary(input);
949+
binaries.put(info.name, new EntryOrBytes<>(input.endTracking()));
878950
} else if (type == ES87TSDBDocValuesFormat.SORTED) {
879-
sorted.put(info.name, readSorted(meta));
951+
readSorted(input);
952+
sorted.put(info.name, new EntryOrBytes<>(input.endTracking()));
880953
} else if (type == ES87TSDBDocValuesFormat.SORTED_SET) {
881-
sortedSets.put(info.name, readSortedSet(meta));
954+
readSortedSet(input);
955+
sortedSets.put(info.name, new EntryOrBytes<>(input.endTracking()));
882956
} else if (type == ES87TSDBDocValuesFormat.SORTED_NUMERIC) {
883-
sortedNumerics.put(info.name, readSortedNumeric(meta));
957+
readSortedNumeric(input);
958+
sortedNumerics.put(info.name, new EntryOrBytes<>(input.endTracking()));
884959
} else {
885-
throw new CorruptIndexException("invalid type: " + type, meta);
960+
throw new CorruptIndexException("invalid type: " + type, input);
886961
}
887962
}
888963
}
889964

890-
private static NumericEntry readNumeric(IndexInput meta) throws IOException {
891-
NumericEntry entry = new NumericEntry();
892-
readNumeric(meta, entry);
893-
return entry;
894-
}
895-
896965
private static DocValuesSkipperEntry readDocValueSkipperMeta(IndexInput meta) throws IOException {
897966
long offset = meta.readLong();
898967
long length = meta.readLong();
@@ -904,6 +973,12 @@ private static DocValuesSkipperEntry readDocValueSkipperMeta(IndexInput meta) th
904973
return new DocValuesSkipperEntry(offset, length, minValue, maxValue, docCount, maxDocID);
905974
}
906975

976+
private static NumericEntry readNumeric(IndexInput meta) throws IOException {
977+
NumericEntry entry = new NumericEntry();
978+
readNumeric(meta, entry);
979+
return entry;
980+
}
981+
907982
private static void readNumeric(IndexInput meta, NumericEntry entry) throws IOException {
908983
entry.docsWithFieldOffset = meta.readLong();
909984
entry.docsWithFieldLength = meta.readLong();
@@ -1480,4 +1555,31 @@ private static class TermsDictEntry {
14801555
int maxBlockLength;
14811556
}
14821557

1558+
private static class EntryOrBytes<E> {
1559+
private Object v;
1560+
1561+
EntryOrBytes(E v) {
1562+
this.v = v;
1563+
}
1564+
1565+
EntryOrBytes(byte[] v) {
1566+
this.v = v;
1567+
}
1568+
}
1569+
1570+
@SuppressWarnings("unchecked")
1571+
private <E> E getOrDecode(EntryOrBytes<E> entry, CheckedFunction<byte[], E, IOException> decoder) throws IOException {
1572+
if (entry == null) {
1573+
return null;
1574+
}
1575+
final Object current = entry.v;
1576+
assert current != null;
1577+
if (current instanceof byte[] bytes) {
1578+
E e = decoder.apply(bytes);
1579+
entry.v = e;
1580+
return e;
1581+
}
1582+
return (E) current;
1583+
}
1584+
14831585
}

0 commit comments

Comments
 (0)