Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNR] Unify existing RLE dictionary decoders #24728

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
package com.facebook.presto.parquet.batchreader.decoders;

import com.facebook.presto.parquet.batchreader.decoders.rle.BaseRLEBitPackedDecoder;
import com.facebook.presto.parquet.batchreader.decoders.rle.GenericRLEDictionaryValuesDecoder;
import org.apache.parquet.io.ParquetDecodingException;

import java.io.IOException;
Expand All @@ -22,7 +22,7 @@
import static com.google.common.base.Preconditions.checkState;

public class DefinitionLevelDecoder
extends BaseRLEBitPackedDecoder
extends GenericRLEDictionaryValuesDecoder
{
public DefinitionLevelDecoder(int valueCount, int bitWidth, InputStream inputStream)
{
Expand All @@ -40,16 +40,16 @@ public void readNext(int[] values, int offset, int length)
int destinationIndex = offset;
int remainingToCopy = length;
while (remainingToCopy > 0) {
if (currentCount == 0) {
if (getCurrentCount() == 0) {
if (!decode()) {
break;
}
}

int chunkSize = Math.min(remainingToCopy, currentCount);
switch (mode) {
int chunkSize = Math.min(remainingToCopy, getCurrentCount());
switch (getCurrentMode()) {
case RLE: {
int rleValue = currentValue;
int rleValue = getDecodedInt();
int endIndex = destinationIndex + chunkSize;
while (destinationIndex < endIndex) {
values[destinationIndex] = rleValue;
Expand All @@ -58,14 +58,15 @@ public void readNext(int[] values, int offset, int length)
break;
}
case PACKED: {
System.arraycopy(currentBuffer, currentBuffer.length - currentCount, values, destinationIndex, chunkSize);
int[] decodedInts = getDecodedInts();
System.arraycopy(decodedInts, decodedInts.length - getCurrentCount(), values, destinationIndex, chunkSize);
destinationIndex += chunkSize;
break;
}
default:
throw new ParquetDecodingException("not a valid mode " + mode);
throw new ParquetDecodingException("not a valid mode " + getCurrentMode());
}
currentCount -= chunkSize;
decrementCurrentCount(chunkSize);
remainingToCopy -= chunkSize;
}
checkState(remainingToCopy == 0, "Failed to copy the requested number of DLs");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
package com.facebook.presto.parquet.batchreader.decoders;

import com.facebook.presto.parquet.batchreader.decoders.rle.BaseRLEBitPackedDecoder;
import com.facebook.presto.parquet.batchreader.decoders.rle.GenericRLEDictionaryValuesDecoder;
import org.apache.parquet.io.ParquetDecodingException;

import java.io.IOException;
Expand All @@ -25,7 +25,7 @@
* Definition Level decoder for non-nested types where the values are either 0 or 1
*/
public class FlatDefinitionLevelDecoder
extends BaseRLEBitPackedDecoder
extends GenericRLEDictionaryValuesDecoder
{
public FlatDefinitionLevelDecoder(int valueCount, InputStream inputStream)
{
Expand All @@ -44,36 +44,36 @@ public int readNext(boolean[] values, int offset, int length)
int destinationIndex = offset;
int remainingToCopy = length;
while (remainingToCopy > 0) {
if (currentCount == 0) {
if (getCurrentCount() == 0) {
if (!decode()) {
break;
}
}

int chunkSize = Math.min(remainingToCopy, currentCount);
int chunkSize = Math.min(remainingToCopy, getCurrentCount());
int endIndex = destinationIndex + chunkSize;
switch (mode) {
switch (getCurrentMode()) {
case RLE: {
boolean rleValue = currentValue == 0;
boolean rleValue = getDecodedInt() == 0;
while (destinationIndex < endIndex) {
values[destinationIndex++] = rleValue;
}
nonNullCount += currentValue * chunkSize;
nonNullCount += getDecodedInt() * chunkSize;
break;
}
case PACKED: {
int[] buffer = currentBuffer;
for (int sourceIndex = buffer.length - currentCount; destinationIndex < endIndex; sourceIndex++, destinationIndex++) {
int[] buffer = getDecodedInts();
for (int sourceIndex = buffer.length - getCurrentCount(); destinationIndex < endIndex; sourceIndex++, destinationIndex++) {
final int value = buffer[sourceIndex];
values[destinationIndex] = value == 0;
nonNullCount += value;
}
break;
}
default:
throw new ParquetDecodingException("not a valid mode " + mode);
throw new ParquetDecodingException("not a valid mode " + getCurrentMode());
}
currentCount -= chunkSize;
decrementCurrentCount(chunkSize);
remainingToCopy -= chunkSize;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package com.facebook.presto.parquet.batchreader.decoders;

import com.facebook.presto.parquet.batchreader.decoders.rle.BaseRLEBitPackedDecoder;
import com.facebook.presto.parquet.batchreader.decoders.rle.GenericRLEDictionaryValuesDecoder;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.parquet.io.ParquetDecodingException;
import org.openjdk.jol.info.ClassLayout;
Expand All @@ -25,9 +25,9 @@
import static io.airlift.slice.SizeOf.sizeOf;

public class RepetitionLevelDecoder
extends BaseRLEBitPackedDecoder
extends GenericRLEDictionaryValuesDecoder
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(BaseRLEBitPackedDecoder.class).instanceSize();
private static final int INSTANCE_SIZE = ClassLayout.parseClass(GenericRLEDictionaryValuesDecoder.class).instanceSize();

private int remaining;
private int currentOffsetPackedBuffer;
Expand All @@ -53,29 +53,29 @@ public int readNext(IntList repetitionLevels, int batchSize)
break;
}

switch (mode) {
switch (getCurrentMode()) {
case RLE: {
int rleValue = currentValue;
int rleValue = getDecodedInt();
if (rleValue == 0) {
int chunkSize = Math.min(remainingToCopy, currentCount);
int chunkSize = Math.min(remainingToCopy, getCurrentCount());
for (int i = 0; i < chunkSize; i++) {
repetitionLevels.add(0);
}
currentCount -= chunkSize;
decrementCurrentCount(chunkSize);
remaining -= chunkSize;
remainingToCopy -= chunkSize;
}
else {
remaining -= currentCount;
for (int i = 0; i < currentCount; i++) {
remaining -= getCurrentCount();
for (int i = 0; i < getCurrentCount(); i++) {
repetitionLevels.add(rleValue);
}
currentCount = 0;
decrementCurrentCount(getCurrentCount());
}
break;
}
case PACKED: {
final int[] localBuffer = currentBuffer;
final int[] localBuffer = getDecodedInts();
do {
int rlValue = localBuffer[currentOffsetPackedBuffer];
currentOffsetPackedBuffer = currentOffsetPackedBuffer + 1;
Expand All @@ -86,11 +86,11 @@ public int readNext(IntList repetitionLevels, int batchSize)
remaining--;
}
while (currentOffsetPackedBuffer < endOffsetPackedBuffer && remainingToCopy > 0);
currentCount = endOffsetPackedBuffer - currentOffsetPackedBuffer;
decrementCurrentCount(endOffsetPackedBuffer - currentOffsetPackedBuffer);
break;
}
default:
throw new ParquetDecodingException("not a valid mode " + mode);
throw new ParquetDecodingException("not a valid mode " + getCurrentMode());
}
}
return batchSize - remainingToCopy;
Expand All @@ -99,19 +99,19 @@ public int readNext(IntList repetitionLevels, int batchSize)
@Override
public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE + sizeOf(currentBuffer);
return INSTANCE_SIZE + sizeOf(getDecodedInts());
}

private boolean ensureBlockAvailable()
throws IOException
{
if (currentCount == 0) {
if (getCurrentCount() == 0) {
if (!decode()) {
return false;
}
currentCount = Math.min(remaining, currentCount);
decrementCurrentCount(Math.min(remaining, getCurrentCount()));
currentOffsetPackedBuffer = 0;
endOffsetPackedBuffer = currentCount;
endOffsetPackedBuffer = getCurrentCount();
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import static io.airlift.slice.SizeOf.sizeOf;

public class BinaryRLEDictionaryValuesDecoder
extends BaseRLEBitPackedDecoder
extends GenericRLEDictionaryValuesDecoder
implements BinaryValuesDecoder
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(BinaryRLEDictionaryValuesDecoder.class).instanceSize();
Expand All @@ -48,17 +48,17 @@ public ValueBuffer readNext(int length)
int bufferSize = 0;
int remainingToCopy = length;
while (remainingToCopy > 0) {
if (currentCount == 0) {
if (getCurrentCount() == 0) {
if (!decode()) {
break;
}
}

int numEntriesToFill = Math.min(remainingToCopy, currentCount);
int numEntriesToFill = Math.min(remainingToCopy, getCurrentCount());
int endIndex = destinationIndex + numEntriesToFill;
switch (mode) {
switch (getCurrentMode()) {
case RLE: {
final int rleValue = currentValue;
final int rleValue = getDecodedInt();
final int rleValueLength = dictionary.getLength(rleValue);
while (destinationIndex < endIndex) {
dictionaries[destinationIndex++] = rleValue;
Expand All @@ -67,19 +67,19 @@ public ValueBuffer readNext(int length)
break;
}
case PACKED: {
final int[] localBuffer = currentBuffer;
final int[] localBuffer = getDecodedInts();
final BinaryBatchDictionary localDictionary = dictionary;
for (int srcIndex = currentBuffer.length - currentCount; destinationIndex < endIndex; srcIndex++, destinationIndex++) {
for (int srcIndex = localBuffer.length - getCurrentCount(); destinationIndex < endIndex; srcIndex++, destinationIndex++) {
int dictionaryId = localBuffer[srcIndex];
dictionaries[destinationIndex] = dictionaryId;
bufferSize += localDictionary.getLength(dictionaryId);
}
break;
}
default:
throw new ParquetDecodingException("not a valid mode " + this.mode);
throw new ParquetDecodingException("not a valid mode " + getCurrentMode());
}
currentCount -= numEntriesToFill;
decrementCurrentCount(numEntriesToFill);
remainingToCopy -= numEntriesToFill;
}

Expand Down Expand Up @@ -110,14 +110,14 @@ public void skip(int length)
{
int remaining = length;
while (remaining > 0) {
if (currentCount == 0) {
if (getCurrentCount() == 0) {
if (!decode()) {
break;
}
}

int chunkSize = Math.min(remaining, currentCount);
currentCount -= chunkSize;
int chunkSize = Math.min(remaining, getCurrentCount());
decrementCurrentCount(chunkSize);
remaining -= chunkSize;
}
checkState(remaining == 0, "Invalid read size request");
Expand Down
Loading
Loading