From 84b3c6bffc7280b11dc894bf87e640501ffcfb33 Mon Sep 17 00:00:00 2001 From: Rui Mo Date: Fri, 17 Oct 2025 10:41:22 +0100 Subject: [PATCH] [11067] Support scan filter for decimal in ORC --- .../reader/SelectiveDecimalColumnReader.cpp | 155 ++++++++++++++++-- .../reader/SelectiveDecimalColumnReader.h | 19 ++- velox/dwio/dwrf/test/E2EFilterTest.cpp | 56 +++++++ velox/exec/tests/TableScanTest.cpp | 155 +++++++++++++++++- velox/exec/tests/data/decimal.orc | Bin 0 -> 738 bytes 5 files changed, 371 insertions(+), 14 deletions(-) create mode 100644 velox/exec/tests/data/decimal.orc diff --git a/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp index ec570ae05b7f..cdb844a41b76 100644 --- a/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp @@ -75,16 +75,17 @@ void SelectiveDecimalColumnReader::seekToRowGroup(int64_t index) { template template -void SelectiveDecimalColumnReader::readHelper(RowSet rows) { - vector_size_t numRows = rows.back() + 1; +void SelectiveDecimalColumnReader::readHelper( + const common::Filter* filter, + RowSet rows) { ExtractToReader extractValues(this); - common::AlwaysTrue filter; + common::AlwaysTrue alwaysTrue; DirectRleColumnVisitor< int64_t, common::AlwaysTrue, decltype(extractValues), kDense> - visitor(filter, this, rows, extractValues); + visitor(alwaysTrue, this, rows, extractValues); // decode scale stream if (version_ == velox::dwrf::RleVersion_1) { @@ -104,14 +105,135 @@ void SelectiveDecimalColumnReader::readHelper(RowSet rows) { // reset numValues_ before reading values numValues_ = 0; valueSize_ = sizeof(DataT); + vector_size_t numRows = rows.back() + 1; ensureValuesCapacity(numRows); // decode value stream facebook::velox::dwio::common:: ColumnVisitor - valueVisitor(filter, this, rows, extractValues); + valueVisitor(alwaysTrue, this, rows, extractValues); decodeWithVisitor>(valueDecoder_.get(), valueVisitor); readOffset_ += numRows; + + // Fill decimals before applying filter. + fillDecimals(); + + // 'nullsInReadRange_' is the nulls for the entire read range, and if the row + // set is not dense, result nulls should be allocated, which represents the + // nulls for the selected rows before filtering. + const auto rawNulls = nullsInReadRange_ + ? (kDense ? nullsInReadRange_->as() : rawResultNulls_) + : nullptr; + // Process filter. + process(filter, rows, rawNulls); +} + +template +void SelectiveDecimalColumnReader::processNulls( + bool isNull, + const RowSet& rows, + const uint64_t* rawNulls) { + if (!rawNulls) { + return; + } + returnReaderNulls_ = false; + anyNulls_ = !isNull; + allNull_ = isNull; + + auto rawDecimal = values_->asMutable(); + auto rawScale = scaleBuffer_->asMutable(); + + vector_size_t idx = 0; + if (isNull) { + for (vector_size_t i = 0; i < numValues_; i++) { + if (bits::isBitNull(rawNulls, i)) { + bits::setNull(rawResultNulls_, idx); + addOutputRow(rows[i]); + idx++; + } + } + } else { + for (vector_size_t i = 0; i < numValues_; i++) { + if (!bits::isBitNull(rawNulls, i)) { + bits::setNull(rawResultNulls_, idx, false); + rawDecimal[idx] = rawDecimal[i]; + rawScale[idx] = rawScale[i]; + addOutputRow(rows[i]); + idx++; + } + } + } +} + +template +void SelectiveDecimalColumnReader::processFilter( + const common::Filter* filter, + const RowSet& rows, + const uint64_t* rawNulls) { + VELOX_CHECK_NOT_NULL(filter, "Filter must not be null."); + returnReaderNulls_ = false; + anyNulls_ = false; + allNull_ = true; + + vector_size_t idx = 0; + auto rawDecimal = values_->asMutable(); + for (vector_size_t i = 0; i < numValues_; i++) { + if (rawNulls && bits::isBitNull(rawNulls, i)) { + if (filter->testNull()) { + bits::setNull(rawResultNulls_, idx); + addOutputRow(rows[i]); + anyNulls_ = true; + idx++; + } + } else { + bool tested; + if constexpr (std::is_same_v) { + tested = filter->testInt64(rawDecimal[i]); + } else { + tested = filter->testInt128(rawDecimal[i]); + } + + if (tested) { + if (rawNulls) { + bits::setNull(rawResultNulls_, idx, false); + } + rawDecimal[idx] = rawDecimal[i]; + addOutputRow(rows[i]); + allNull_ = false; + idx++; + } + } + } +} + +template +void SelectiveDecimalColumnReader::process( + const common::Filter* filter, + const RowSet& rows, + const uint64_t* rawNulls) { + if (!filter) { + // No filter and "hasDeletion" is false so input rows will be + // reused. + return; + } + + switch (filter->kind()) { + case common::FilterKind::kIsNull: + processNulls(true, rows, rawNulls); + break; + case common::FilterKind::kIsNotNull: { + if (rawNulls) { + processNulls(false, rows, rawNulls); + } else { + for (vector_size_t i = 0; i < numValues_; i++) { + addOutputRow(rows[i]); + } + } + break; + } + default: + processFilter(filter, rows, rawNulls); + } } template @@ -119,14 +241,22 @@ void SelectiveDecimalColumnReader::read( int64_t offset, const RowSet& rows, const uint64_t* incomingNulls) { - VELOX_CHECK(!scanSpec_->filter()); VELOX_CHECK(!scanSpec_->valueHook()); prepareRead(offset, rows, incomingNulls); + if (DictionaryValues::hasFilter(scanSpec_->filter()) && + (!resultNulls_ || !resultNulls_->unique() || + resultNulls_->capacity() * 8 < rows.size())) { + // Make sure a dedicated resultNulls_ is allocated with enough capacity as + // RleDecoder always assumes it is available. + resultNulls_ = AlignedBuffer::allocate(rows.size(), memoryPool_); + rawResultNulls_ = resultNulls_->asMutable(); + } + rawValues_ = values_->asMutable(); bool isDense = rows.back() == rows.size() - 1; if (isDense) { - readHelper(rows); + readHelper(scanSpec_->filter(), rows); } else { - readHelper(rows); + readHelper(scanSpec_->filter(), rows); } } @@ -134,16 +264,17 @@ template void SelectiveDecimalColumnReader::getValues( const RowSet& rows, VectorPtr* result) { + getIntValues(rows, requestedType_, result); +} + +template +void SelectiveDecimalColumnReader::fillDecimals() { auto nullsPtr = resultNulls() ? resultNulls()->template as() : nullptr; auto scales = scaleBuffer_->as(); auto values = values_->asMutable(); - DecimalUtil::fillDecimals( values, nullsPtr, values, scales, numValues_, scale_); - - rawValues_ = values_->asMutable(); - getIntValues(rows, requestedType_, result); } template class SelectiveDecimalColumnReader; diff --git a/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h b/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h index 67a82b051e36..338d8ac4756f 100644 --- a/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h @@ -49,7 +49,24 @@ class SelectiveDecimalColumnReader : public SelectiveColumnReader { private: template - void readHelper(RowSet rows); + void readHelper(const common::Filter* filter, RowSet rows); + + // Process IsNull and IsNotNull filters. + void processNulls(bool isNull, const RowSet& rows, const uint64_t* rawNulls); + + // Process filters on decimal values. + void processFilter( + const common::Filter* filter, + const RowSet& rows, + const uint64_t* rawNulls); + + // Dispatch to the respective filter processing based on the filter type. + void process( + const common::Filter* filter, + const RowSet& rows, + const uint64_t* rawNulls); + + void fillDecimals(); std::unique_ptr> valueDecoder_; std::unique_ptr> scaleDecoder_; diff --git a/velox/dwio/dwrf/test/E2EFilterTest.cpp b/velox/dwio/dwrf/test/E2EFilterTest.cpp index 43b67e91e550..d9ebb9f52e62 100644 --- a/velox/dwio/dwrf/test/E2EFilterTest.cpp +++ b/velox/dwio/dwrf/test/E2EFilterTest.cpp @@ -241,6 +241,62 @@ TEST_F(E2EFilterTest, floatAndDouble) { false); } +TEST_F(E2EFilterTest, DISABLED_shortDecimal) { + // ORC write functionality is not yet supported. Enable this test once it + // becomes available and set the file format to ORC at that time. + // options.format = DwrfFormat::kOrc; + const std::unordered_map types = { + {"shortdecimal_val:decimal(8, 5)", DECIMAL(8, 5)}, + {"shortdecimal_val:decimal(10, 5)", DECIMAL(10, 5)}, + {"shortdecimal_val:decimal(17, 5)", DECIMAL(17, 5)}}; + + for (const auto& pair : types) { + testWithTypes( + pair.first, + [&]() { + makeIntDistribution( + "shortdecimal_val", + 10, // min + 100, // max + 22, // repeats + 19, // rareFrequency + -999, // rareMin + 30000, // rareMax + true); + }, + false, + {"shortdecimal_val"}, + 20); + } +} + +TEST_F(E2EFilterTest, DISABLED_longDecimal) { + // ORC write functionality is not yet supported. Enable this test once it + // becomes available and set the file format to ORC at that time. + // options.format = DwrfFormat::kOrc; + const std::unordered_map types = { + {"longdecimal_val:decimal(30, 10)", DECIMAL(30, 10)}, + {"longdecimal_val:decimal(37, 15)", DECIMAL(37, 15)}}; + for (const auto& pair : types) { + testWithTypes( + pair.first, + [&]() { + makeIntDistribution( + "longdecimal_val", + 10, // min + 100, // max + 22, // repeats + 19, // rareFrequency + -999, // rareMin + 30000, // rareMax + true); + }, + false, + {"longdecimal_val"}, + 20); + } +} + TEST_F(E2EFilterTest, stringDirect) { testutil::TestValue::enable(); bool coverage[2][2]{}; diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index c53fdc16c7b3..cef3712f3489 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -36,6 +36,7 @@ #include "velox/connectors/hive/HiveDataSource.h" #include "velox/connectors/hive/HivePartitionFunction.h" #include "velox/dwio/common/tests/utils/DataFiles.h" +#include "velox/dwio/orc/reader/OrcReader.h" #include "velox/exec/Cursor.h" #include "velox/exec/Exchange.h" #include "velox/exec/PlanNodeStats.h" @@ -46,6 +47,7 @@ #include "velox/exec/tests/utils/TableScanTestBase.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/expression/ExprToSubfieldFilter.h" +#include "velox/functions/lib/IsNull.h" #include "velox/type/Timestamp.h" #include "velox/type/Type.h" #include "velox/type/tests/SubfieldFiltersBuilder.h" @@ -72,7 +74,12 @@ void verifyCacheStats( EXPECT_EQ(cacheStats.numLookups, numLookups); } -class TableScanTest : public TableScanTestBase {}; +class TableScanTest : public TableScanTestBase { + void SetUp() override { + TableScanTestBase::SetUp(); + orc::registerOrcReaderFactory(); + } +}; TEST_F(TableScanTest, allColumns) { auto vectors = makeVectors(10, 1'000); @@ -6152,5 +6159,151 @@ TEST_F(TableScanTest, parallelUnitLoader) { ASSERT_GT(stats.count("waitForUnitReadyNanos"), 0); } +TEST_F(TableScanTest, shortDecimalFilter) { + functions::registerIsNotNullFunction("isnotnull"); + + std::vector> values = { + 123456789123456789L, + 987654321123456L, + std::nullopt, + 2000000000000000L, + 5000000000000000L, + 987654321987654321L, + 100000000000000L, + 1230000000123456L, + 120000000123456L, + std::nullopt}; + auto rowVector = makeRowVector( + {"a"}, + { + makeNullableFlatVector(values, DECIMAL(18, 6)), + }); + createDuckDbTable({rowVector}); + + auto filePath = facebook::velox::test::getDataFilePath( + "velox/exec/tests", "data/decimal.orc"); + auto split = exec::test::HiveConnectorSplitBuilder(filePath) + .start(0) + .length(fs::file_size(filePath)) + .fileFormat(dwio::common::FileFormat::ORC) + .build(); + + auto rowType = rowVector->rowType(); + // Is not null. + auto op = + PlanBuilder().tableScan(rowType, {}, "isnotnull(a)", rowType).planNode(); + assertQuery(op, split, "SELECT a FROM tmp where a is not null"); + + // Is null. + op = PlanBuilder().tableScan(rowType, {}, "is_null(a)", rowType).planNode(); + assertQuery(op, split, "SELECT a FROM tmp where a is null"); + + // BigintRange. + op = + PlanBuilder() + .tableScan( + rowType, + {}, + "a > 2000000000.0::DECIMAL(18, 6) and a < 6000000000.0::DECIMAL(18, 6)", + rowType) + .planNode(); + assertQuery( + op, + split, + "SELECT a FROM tmp where a > 2000000000.0 and a < 6000000000.0"); + + // NegatedBigintRange. + op = + PlanBuilder() + .tableScan( + rowType, + {}, + "not(a between 2000000000.0::DECIMAL(18, 6) and 6000000000.0::DECIMAL(18, 6))", + rowType) + .planNode(); + assertQuery( + op, + split, + "SELECT a FROM tmp where a < 2000000000.0 or a > 6000000000.0"); +} + +TEST_F(TableScanTest, longDecimalFilter) { + functions::registerIsNotNullFunction("isnotnull"); + + std::vector> shortValues = { + 123456789123456789L, + 987654321123456L, + std::nullopt, + 2000000000000000L, + 5000000000000000L, + 987654321987654321L, + 100000000000000L, + 1230000000123456L, + 120000000123456L, + std::nullopt}; + + std::vector> longValues = { + HugeInt::parse("123456789123456789123456789" + std::string(9, '0')), + HugeInt::parse("987654321123456789" + std::string(9, '0')), + std::nullopt, + HugeInt::parse("2" + std::string(37, '0')), + HugeInt::parse("5" + std::string(37, '0')), + HugeInt::parse("987654321987654321987654321" + std::string(9, '0')), + HugeInt::parse("1" + std::string(26, '0')), + HugeInt::parse("123000000012345678" + std::string(10, '0')), + HugeInt::parse("120000000123456789" + std::string(9, '0')), + HugeInt::parse("9" + std::string(37, '0'))}; + + auto rowVector = makeRowVector( + {"a", "b"}, + { + makeNullableFlatVector(shortValues, DECIMAL(18, 6)), + makeNullableFlatVector(longValues, DECIMAL(38, 18)), + }); + createDuckDbTable({rowVector}); + + auto filePath = facebook::velox::test::getDataFilePath( + "velox/exec/tests", "data/decimal.orc"); + auto split = exec::test::HiveConnectorSplitBuilder(filePath) + .start(0) + .length(fs::file_size(filePath)) + .fileFormat(dwio::common::FileFormat::ORC) + .build(); + + auto outputType = ROW({"b"}, {DECIMAL(38, 18)}); + auto dataColumns = rowVector->rowType(); + + auto op = PlanBuilder() + .tableScan(outputType, {}, "isnotnull(b)", dataColumns) + .planNode(); + assertQuery(op, split, "SELECT b FROM tmp where b is not null"); + + // Is null. + op = PlanBuilder() + .tableScan(outputType, {}, "is_null(b)", dataColumns) + .planNode(); + assertQuery(op, split, "SELECT b FROM tmp where b is null"); + + // HugeintRange. + op = + PlanBuilder() + .tableScan( + outputType, + {}, + "b > 2000000000.0::DECIMAL(38, 18) and b < 6000000000.0::DECIMAL(38, 18)", + dataColumns) + .planNode(); + assertQuery( + op, + split, + "SELECT b FROM tmp where b > 2000000000.0 and b < 6000000000.0"); + + // Test filter column not being projected out. + op = PlanBuilder() + .tableScan(outputType, {}, "a is null", dataColumns) + .planNode(); + assertQuery(op, split, "SELECT b FROM tmp WHERE a is null"); +} + } // namespace } // namespace facebook::velox::exec diff --git a/velox/exec/tests/data/decimal.orc b/velox/exec/tests/data/decimal.orc new file mode 100644 index 0000000000000000000000000000000000000000..b89662b65f6880650cb8ec86b8bf285be1af4adc GIT binary patch literal 738 zcmaKqUr19?9LIm>-rafbag)2_pwkMnQ6fED?%ki;wui6;Sy4K~XhaZGEF*9>p{Ihw zvOt7PDGDkOqp%r_K=i4af+0g8Dl|y2l!%DfLoHt{x_6V3g5do4{y3c97rvkKYi?<9 z0T4U2VG@HA06~TLi3~2Zi`HW7t0A(kIZ>QeheLafV8HLw%|F@a~oTaG`MX%lJ0ln?1?<9t=*5Wo^8#f1x;QH~|_f<{&p$GRkEwqmf+U z=1$JCwA%Rs<0fSkBRZ9RjP?QWIQu+k4&*W~DF^2x)E)(W) z>$)D+T%M@v^9O>45kv8V1RioiHmyaG7t&S`S`n