diff --git a/velox/common/base/PrefixSortConfig.h b/velox/common/base/PrefixSortConfig.h index 34adee1b7914..27048174d857 100644 --- a/velox/common/base/PrefixSortConfig.h +++ b/velox/common/base/PrefixSortConfig.h @@ -22,16 +22,16 @@ namespace facebook::velox::common { /// Specifies the config for prefix-sort. struct PrefixSortConfig { - explicit PrefixSortConfig( - int64_t maxNormalizedKeySize, - int32_t threshold = 130) - : maxNormalizedKeySize(maxNormalizedKeySize), threshold(threshold) {} + PrefixSortConfig() = default; + + PrefixSortConfig(int64_t _maxNormalizedKeySize, int32_t _threshold) + : maxNormalizedKeySize(_maxNormalizedKeySize), threshold(_threshold) {} /// Max number of bytes can store normalized keys in prefix-sort buffer per - /// entry. - const int64_t maxNormalizedKeySize; + /// entry. Same with QueryConfig kPrefixSortNormalizedKeyMaxBytes. + int64_t maxNormalizedKeySize{128}; /// PrefixSort will have performance regression when the dateset is too small. - const int32_t threshold; + int32_t threshold{130}; }; } // namespace facebook::velox::common diff --git a/velox/common/base/SpillConfig.cpp b/velox/common/base/SpillConfig.cpp index eb02a00b4f1d..dd428a41ec7b 100644 --- a/velox/common/base/SpillConfig.cpp +++ b/velox/common/base/SpillConfig.cpp @@ -34,6 +34,7 @@ SpillConfig::SpillConfig( uint64_t _maxSpillRunRows, uint64_t _writerFlushThresholdSize, const std::string& _compressionKind, + std::optional _prefixSortConfig, const std::string& _fileCreateConfig) : getSpillDirPathCb(std::move(_getSpillDirPathCb)), updateAndCheckSpillLimitCb(std::move(_updateAndCheckSpillLimitCb)), @@ -52,6 +53,7 @@ SpillConfig::SpillConfig( maxSpillRunRows(_maxSpillRunRows), writerFlushThresholdSize(_writerFlushThresholdSize), compressionKind(common::stringToCompressionKind(_compressionKind)), + prefixSortConfig(_prefixSortConfig), fileCreateConfig(_fileCreateConfig) { VELOX_USER_CHECK_GE( spillableReservationGrowthPct, diff --git a/velox/common/base/SpillConfig.h b/velox/common/base/SpillConfig.h index d6ced347e787..09790dff2d7d 100644 --- a/velox/common/base/SpillConfig.h +++ b/velox/common/base/SpillConfig.h @@ -20,6 +20,7 @@ #include #include +#include "velox/common/base/PrefixSortConfig.h" #include "velox/common/compression/Compression.h" namespace facebook::velox::common { @@ -61,6 +62,7 @@ struct SpillConfig { uint64_t _maxSpillRunRows, uint64_t _writerFlushThresholdSize, const std::string& _compressionKind, + std::optional _prefixSortConfig = std::nullopt, const std::string& _fileCreateConfig = {}); /// Returns the spilling level with given 'startBitOffset' and @@ -135,6 +137,10 @@ struct SpillConfig { /// CompressionKind when spilling, CompressionKind_NONE means no compression. common::CompressionKind compressionKind; + /// Prefix sort config when spilling, enable prefix sort when this config is + /// set, otherwise, fallback to timsort. + std::optional prefixSortConfig; + /// Custom options passed to velox::FileSystem to create spill WriteFile. std::string fileCreateConfig; }; diff --git a/velox/common/base/tests/SpillConfigTest.cpp b/velox/common/base/tests/SpillConfigTest.cpp index 837f0098c47c..9949486a4862 100644 --- a/velox/common/base/tests/SpillConfigTest.cpp +++ b/velox/common/base/tests/SpillConfigTest.cpp @@ -22,7 +22,14 @@ using namespace facebook::velox::common; using namespace facebook::velox::exec; -TEST(SpillConfig, spillLevel) { +class SpillConfigTest : public testing::TestWithParam { + protected: + const bool enablePrefixSort_{GetParam()}; + const std::optional prefixSortConfig_ = + enablePrefixSort_ ? std::optional({}) : std::nullopt; +}; + +TEST_P(SpillConfigTest, spillLevel) { const uint8_t kInitialBitOffset = 16; const uint8_t kNumPartitionsBits = 3; const SpillConfig config( @@ -40,7 +47,8 @@ TEST(SpillConfig, spillLevel) { 0, 0, 0, - "none"); + "none", + prefixSortConfig_); struct { uint8_t bitOffset; // Indicates an invalid if 'expectedLevel' is negative. @@ -70,7 +78,7 @@ TEST(SpillConfig, spillLevel) { } } -TEST(SpillConfig, spillLevelLimit) { +TEST_P(SpillConfigTest, spillLevelLimit) { struct { uint8_t startBitOffset; int32_t numBits; @@ -125,7 +133,8 @@ TEST(SpillConfig, spillLevelLimit) { testData.maxSpillLevel, 0, 0, - "none"); + "none", + prefixSortConfig_); ASSERT_EQ( testData.expectedExceeds, @@ -133,7 +142,7 @@ TEST(SpillConfig, spillLevelLimit) { } } -TEST(SpillConfig, spillableReservationPercentages) { +TEST_P(SpillConfigTest, spillableReservationPercentages) { struct { uint32_t growthPct; int32_t minPct; @@ -171,7 +180,8 @@ TEST(SpillConfig, spillableReservationPercentages) { 0, 1'000'000, 0, - "none"); + "none", + prefixSortConfig_); }; if (testData.expectedError) { @@ -183,3 +193,8 @@ TEST(SpillConfig, spillableReservationPercentages) { } } } + +VELOX_INSTANTIATE_TEST_SUITE_P( + SpillConfigTest, + SpillConfigTest, + testing::ValuesIn({false, true})); diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index b39df5397910..7ceccda275a4 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -254,6 +254,12 @@ class QueryConfig { static constexpr const char* kSpillCompressionKind = "spill_compression_codec"; + /// Enable the prefix sort or fallback to timsort in spill. The prefix sort is + /// faster than timsort but requires the memory to build prefix data, which + /// may cause out of memory. + static constexpr const char* kSpillEnablePrefixSort = + "spill_enable_prefix_sort"; + /// Specifies spill write buffer size in bytes. The spiller tries to buffer /// serialized spill data up to the specified size before write to storage /// underneath for io efficiency. If it is set to zero, then spill write @@ -656,6 +662,10 @@ class QueryConfig { return get(kSpillCompressionKind, "none"); } + bool spillEnablePrefixSort() const { + return get(kSpillEnablePrefixSort, false); + } + uint64_t spillWriteBufferSize() const { // The default write buffer size set to 1MB. return get(kSpillWriteBufferSize, 1L << 20); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 89355ebdb32b..e6fe264a7cac 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -349,6 +349,11 @@ Spilling - Specifies the compression algorithm type to compress the spilled data before write to disk to trade CPU for IO efficiency. The supported compression codecs are: ZLIB, SNAPPY, LZO, ZSTD, LZ4 and GZIP. NONE means no compression. + * - spill_enable_prefix_sort + - bool + - false + - Enable the prefix sort or fallback to timsort in spill. The prefix sort is faster than timsort but requires the + memory to build prefix data, which might have potential risk of running out of server memory. * - spiller_start_partition_bit - integer - 29 diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index dee9b97ebec4..9156672ea32e 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -141,6 +141,9 @@ std::optional DriverCtx::makeSpillConfig( queryConfig.maxSpillRunRows(), queryConfig.writerFlushThresholdBytes(), queryConfig.spillCompressionKind(), + queryConfig.spillEnablePrefixSort() + ? std::optional(prefixSortConfig()) + : std::nullopt, queryConfig.spillFileCreateConfig()); } diff --git a/velox/exec/PrefixSort.cpp b/velox/exec/PrefixSort.cpp index ad075bdacf36..a0205d71a35c 100644 --- a/velox/exec/PrefixSort.cpp +++ b/velox/exec/PrefixSort.cpp @@ -217,10 +217,10 @@ void PrefixSort::extractRowToPrefix(char* row, char* prefix) { // static. uint32_t PrefixSort::maxRequiredBytes( - memory::MemoryPool* pool, RowContainer* rowContainer, const std::vector& compareFlags, - const velox::common::PrefixSortConfig& config) { + const velox::common::PrefixSortConfig& config, + memory::MemoryPool* pool) { if (rowContainer->numRows() < config.threshold) { return 0; } @@ -235,6 +235,23 @@ uint32_t PrefixSort::maxRequiredBytes( return prefixSort.maxRequiredBytes(); } +// static +void PrefixSort::stdSort( + std::vector>& rows, + RowContainer* rowContainer, + const std::vector& compareFlags) { + std::sort( + rows.begin(), rows.end(), [&](const char* leftRow, const char* rightRow) { + for (auto i = 0; i < compareFlags.size(); ++i) { + if (auto result = rowContainer->compare( + leftRow, rightRow, i, compareFlags[i])) { + return result < 0; + } + } + return false; + }); +} + uint32_t PrefixSort::maxRequiredBytes() { const auto numRows = rowContainer_->numRows(); const auto numPages = diff --git a/velox/exec/PrefixSort.h b/velox/exec/PrefixSort.h index a565694d1c9d..d012204cc938 100644 --- a/velox/exec/PrefixSort.h +++ b/velox/exec/PrefixSort.h @@ -23,25 +23,6 @@ namespace facebook::velox::exec { -namespace detail { - -FOLLY_ALWAYS_INLINE void stdSort( - std::vector>& rows, - RowContainer* rowContainer, - const std::vector& compareFlags) { - std::sort( - rows.begin(), rows.end(), [&](const char* leftRow, const char* rightRow) { - for (auto i = 0; i < compareFlags.size(); ++i) { - if (auto result = rowContainer->compare( - leftRow, rightRow, i, compareFlags[i])) { - return result < 0; - } - } - return false; - }); -} -}; // namespace detail - /// The layout of prefix-sort buffer, a prefix entry includes: /// 1. normalized keys /// 2. non-normalized data ptr for semi-normalized types such as @@ -126,16 +107,16 @@ class PrefixSort { const std::vector& compareFlags, const velox::common::PrefixSortConfig& config) { if (rowContainer->numRows() < config.threshold) { - detail::stdSort(rows, rowContainer, compareFlags); + stdSort(rows, rowContainer, compareFlags); return; } VELOX_DCHECK_EQ(rowContainer->keyTypes().size(), compareFlags.size()); const auto sortLayout = PrefixSortLayout::makeSortLayout( rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeySize); // All keys can not normalize, skip the binary string compare opt. - // Putting this outside sort-internal helps with inline std-sort. + // Putting this outside sort-internal helps with stdSort. if (sortLayout.noNormalizedKeys) { - detail::stdSort(rows, rowContainer, compareFlags); + stdSort(rows, rowContainer, compareFlags); return; } @@ -143,16 +124,24 @@ class PrefixSort { prefixSort.sortInternal(rows); } - /// The stdsort won't require bytes while prefixsort may require buffers + /// The std::sort won't require bytes while prefix sort may require buffers /// such as prefix data. The logic is similar to the above function - /// PrefixSort::sort but returns the maxmium buffer the sort may need. + /// PrefixSort::sort but returns the maximum buffer the sort may need. static uint32_t maxRequiredBytes( - memory::MemoryPool* pool, RowContainer* rowContainer, const std::vector& compareFlags, - const velox::common::PrefixSortConfig& config); + const velox::common::PrefixSortConfig& config, + memory::MemoryPool* pool); private: + /// Fallback to stdSort when prefix sort conditions such as config and memory + /// are not satisfied. stdSort provides >2X performance win than std::sort for + /// user experienced data. + static void stdSort( + std::vector>& rows, + RowContainer* rowContainer, + const std::vector& compareFlags); + // Estimates the memory required for prefix sort such as prefix buffer and // swap buffer. uint32_t maxRequiredBytes(); diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index f8cd69496172..ccaf9b153fea 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -298,7 +298,7 @@ void SortBuffer::ensureSortFits() { uint64_t sortBufferToReserve = numInputRows_ * sizeof(char*) + PrefixSort::maxRequiredBytes( - pool_, data_.get(), sortCompareFlags_, prefixSortConfig_); + data_.get(), sortCompareFlags_, prefixSortConfig_, pool_); { memory::ReclaimableSectionGuard guard(nonReclaimableSection_); if (pool_->maybeReserve(sortBufferToReserve)) { diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp index 2c1e72847b9e..571c2634b907 100644 --- a/velox/exec/SortWindowBuild.cpp +++ b/velox/exec/SortWindowBuild.cpp @@ -161,7 +161,7 @@ void SortWindowBuild::ensureSortFits() { uint64_t sortBufferToReserve = numRows_ * (sizeof(char*) + sizeof(vector_size_t)) + PrefixSort::maxRequiredBytes( - pool_, data_.get(), compareFlags_, prefixSortConfig_); + data_.get(), compareFlags_, prefixSortConfig_, pool_); { memory::ReclaimableSectionGuard guard(nonReclaimableSection_); if (pool_->maybeReserve(sortBufferToReserve)) { diff --git a/velox/exec/Spill.cpp b/velox/exec/Spill.cpp index f7776566864c..cc58f1fdf541 100644 --- a/velox/exec/Spill.cpp +++ b/velox/exec/Spill.cpp @@ -23,6 +23,20 @@ using facebook::velox::common::testutil::TestValue; namespace facebook::velox::exec { +namespace { +// Returns the CompareFlags vector whose size is equal to numSortKeys. Fill in +// with default CompareFlags() if 'compareFlags' is empty. +const std::vector getCompareFlagsOrDefault( + const std::vector& compareFlags, + int32_t numSortKeys) { + VELOX_DCHECK(compareFlags.empty() || compareFlags.size() == numSortKeys); + if (compareFlags.size() == numSortKeys) { + return compareFlags; + } + return std::vector(numSortKeys); +} +} // namespace + void SpillMergeStream::pop() { if (++index_ >= size_) { setNextBatch(); @@ -74,6 +88,7 @@ SpillState::SpillState( uint64_t targetFileSize, uint64_t writeBufferSize, common::CompressionKind compressionKind, + const std::optional& prefixSortConfig, memory::MemoryPool* pool, folly::Synchronized* stats, const std::string& fileCreateConfig) @@ -82,10 +97,12 @@ SpillState::SpillState( fileNamePrefix_(fileNamePrefix), maxPartitions_(maxPartitions), numSortKeys_(numSortKeys), - sortCompareFlags_(sortCompareFlags), + sortCompareFlags_( + getCompareFlagsOrDefault(sortCompareFlags, numSortKeys)), targetFileSize_(targetFileSize), writeBufferSize_(writeBufferSize), compressionKind_(compressionKind), + prefixSortConfig_(prefixSortConfig), fileCreateConfig_(fileCreateConfig), pool_(pool), stats_(stats), diff --git a/velox/exec/Spill.h b/velox/exec/Spill.h index e51c052410b9..6f1ca38a5ccd 100644 --- a/velox/exec/Spill.h +++ b/velox/exec/Spill.h @@ -343,6 +343,7 @@ class SpillState { uint64_t targetFileSize, uint64_t writeBufferSize, common::CompressionKind compressionKind, + const std::optional& prefixSortConfig, memory::MemoryPool* pool, folly::Synchronized* stats, const std::string& fileCreateConfig = {}); @@ -369,6 +370,10 @@ class SpillState { return compressionKind_; } + const std::optional& prefixSortConfig() const { + return prefixSortConfig_; + } + const std::vector& sortCompareFlags() const { return sortCompareFlags_; } @@ -439,6 +444,7 @@ class SpillState { const uint64_t targetFileSize_; const uint64_t writeBufferSize_; const common::CompressionKind compressionKind_; + const std::optional prefixSortConfig_; const std::string fileCreateConfig_; memory::MemoryPool* const pool_; folly::Synchronized* const stats_; diff --git a/velox/exec/Spiller.cpp b/velox/exec/Spiller.cpp index 9f1f1513ba1c..a970217e70e5 100644 --- a/velox/exec/Spiller.cpp +++ b/velox/exec/Spiller.cpp @@ -21,6 +21,7 @@ #include "velox/common/testutil/TestValue.h" #include "velox/exec/Aggregate.h" #include "velox/exec/HashJoinBridge.h" +#include "velox/exec/PrefixSort.h" #include "velox/external/timsort/TimSort.hpp" using facebook::velox::common::testutil::TestValue; @@ -56,6 +57,7 @@ Spiller::Spiller( std::numeric_limits::max(), spillConfig->writeBufferSize, spillConfig->compressionKind, + spillConfig->prefixSortConfig, spillConfig->executor, spillConfig->maxSpillRunRows, spillConfig->fileCreateConfig, @@ -91,6 +93,7 @@ Spiller::Spiller( std::numeric_limits::max(), spillConfig->writeBufferSize, spillConfig->compressionKind, + spillConfig->prefixSortConfig, spillConfig->executor, spillConfig->maxSpillRunRows, spillConfig->fileCreateConfig, @@ -122,6 +125,7 @@ Spiller::Spiller( std::numeric_limits::max(), spillConfig->writeBufferSize, spillConfig->compressionKind, + spillConfig->prefixSortConfig, spillConfig->executor, spillConfig->maxSpillRunRows, spillConfig->fileCreateConfig, @@ -154,6 +158,7 @@ Spiller::Spiller( spillConfig->maxFileSize, spillConfig->writeBufferSize, spillConfig->compressionKind, + spillConfig->prefixSortConfig, spillConfig->executor, 0, spillConfig->fileCreateConfig, @@ -187,6 +192,7 @@ Spiller::Spiller( spillConfig->maxFileSize, spillConfig->writeBufferSize, spillConfig->compressionKind, + spillConfig->prefixSortConfig, spillConfig->executor, spillConfig->maxSpillRunRows, spillConfig->fileCreateConfig, @@ -216,6 +222,7 @@ Spiller::Spiller( spillConfig->maxFileSize, spillConfig->writeBufferSize, spillConfig->compressionKind, + spillConfig->prefixSortConfig, spillConfig->executor, spillConfig->maxSpillRunRows, spillConfig->fileCreateConfig, @@ -237,6 +244,7 @@ Spiller::Spiller( uint64_t targetFileSize, uint64_t writeBufferSize, common::CompressionKind compressionKind, + const std::optional& prefixSortConfig, folly::Executor* executor, uint64_t maxSpillRunRows, const std::string& fileCreateConfig, @@ -259,6 +267,7 @@ Spiller::Spiller( targetFileSize, writeBufferSize, compressionKind, + prefixSortConfig, memory::spillMemoryPool(), spillStats, fileCreateConfig) { @@ -421,13 +430,24 @@ void Spiller::ensureSorted(SpillRun& run) { uint64_t sortTimeNs{0}; { NanosecondTimer timer(&sortTimeNs); - gfx::timsort( - run.rows.begin(), - run.rows.end(), - [&](const char* left, const char* right) { - return container_->compareRows( - left, right, state_.sortCompareFlags()) < 0; - }); + + if (!state_.prefixSortConfig().has_value()) { + gfx::timsort( + run.rows.begin(), + run.rows.end(), + [&](const char* left, const char* right) { + return container_->compareRows( + left, right, state_.sortCompareFlags()) < 0; + }); + } else { + PrefixSort::sort( + run.rows, + memory::spillMemoryPool(), + container_, + state_.sortCompareFlags(), + state_.prefixSortConfig().value()); + } + run.sorted = true; } diff --git a/velox/exec/Spiller.h b/velox/exec/Spiller.h index 33bbcb1f5c66..91a1352a205b 100644 --- a/velox/exec/Spiller.h +++ b/velox/exec/Spiller.h @@ -215,6 +215,7 @@ class Spiller { uint64_t targetFileSize, uint64_t writeBufferSize, common::CompressionKind compressionKind, + const std::optional& prefixSortConfig, folly::Executor* executor, uint64_t maxSpillRunRows, const std::string& fileCreateConfig, diff --git a/velox/exec/tests/PrefixSortTest.cpp b/velox/exec/tests/PrefixSortTest.cpp index 4c97adb2d800..eedacf131a15 100644 --- a/velox/exec/tests/PrefixSortTest.cpp +++ b/velox/exec/tests/PrefixSortTest.cpp @@ -61,13 +61,13 @@ class PrefixSortTest : public exec::test::OperatorTestBase { const std::shared_ptr sortPool = rootPool_->addLeafChild("prefixsort"); const auto maxBytes = PrefixSort::maxRequiredBytes( - sortPool.get(), &rowContainer, compareFlags, common::PrefixSortConfig{ 1024, // Set threshold to 0 to enable prefix-sort in small dataset. - 0}); + 0}, + sortPool.get()); const auto beforeBytes = sortPool->peakBytes(); ASSERT_EQ(sortPool->peakBytes(), 0); // Use PrefixSort to sort rows. diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index 1ea0390ed5c2..8ebb04f9b0fb 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -32,7 +32,8 @@ using namespace facebook::velox::memory; namespace facebook::velox::functions::test { -class SortBufferTest : public OperatorTestBase { +class SortBufferTest : public OperatorTestBase, + public testing::WithParamInterface { protected: void SetUp() override { OperatorTestBase::SetUp(); @@ -62,11 +63,17 @@ class SortBufferTest : public OperatorTestBase { 0, 0, 0, - "none"); + "none", + spillPrefixSortConfig_); } + const bool enableSpillPrefixSort_{GetParam()}; const velox::common::PrefixSortConfig prefixSortConfig_ = velox::common::PrefixSortConfig{std::numeric_limits::max(), 130}; + const std::optional spillPrefixSortConfig_ = + enableSpillPrefixSort_ + ? std::optional(prefixSortConfig_) + : std::nullopt; const RowTypePtr inputType_ = ROW( {{"c0", BIGINT()}, @@ -96,7 +103,7 @@ class SortBufferTest : public OperatorTestBase { folly::Random::DefaultGenerator rng_; }; -TEST_F(SortBufferTest, singleKey) { +TEST_P(SortBufferTest, singleKey) { struct { std::vector sortCompareFlags; std::vector expectedResult; @@ -158,7 +165,7 @@ TEST_F(SortBufferTest, singleKey) { } } -TEST_F(SortBufferTest, multipleKeys) { +TEST_P(SortBufferTest, multipleKeys) { auto sortBuffer = std::make_unique( inputType_, sortColumnIndices_, @@ -188,7 +195,7 @@ TEST_F(SortBufferTest, multipleKeys) { } // TODO: enable it later with test utility to compare the sorted result. -TEST_F(SortBufferTest, DISABLED_randomData) { +TEST_P(SortBufferTest, DISABLED_randomData) { struct { RowTypePtr inputType; std::vector sortColumnIndices; @@ -265,7 +272,7 @@ TEST_F(SortBufferTest, DISABLED_randomData) { } } -TEST_F(SortBufferTest, batchOutput) { +TEST_P(SortBufferTest, batchOutput) { struct { bool triggerSpill; std::vector numInputRows; @@ -312,7 +319,8 @@ TEST_F(SortBufferTest, batchOutput) { 0, 0, 0, - "none"); + "none", + prefixSortConfig_); folly::Synchronized spillStats; auto sortBuffer = std::make_unique( inputType_, @@ -361,7 +369,7 @@ TEST_F(SortBufferTest, batchOutput) { } } -TEST_F(SortBufferTest, spill) { +TEST_P(SortBufferTest, spill) { struct { bool spillEnabled; bool memoryReservationFailure; @@ -408,7 +416,8 @@ TEST_F(SortBufferTest, spill) { 0, 0, 0, - "none"); + "none", + prefixSortConfig_); folly::Synchronized spillStats; auto sortBuffer = std::make_unique( inputType_, @@ -463,7 +472,7 @@ TEST_F(SortBufferTest, spill) { } } -DEBUG_ONLY_TEST_F(SortBufferTest, spillDuringInput) { +DEBUG_ONLY_TEST_P(SortBufferTest, spillDuringInput) { auto spillDirectory = exec::test::TempDirectoryPath::create(); const auto spillConfig = getSpillConfig(spillDirectory->getPath()); folly::Synchronized spillStats; @@ -520,7 +529,7 @@ DEBUG_ONLY_TEST_F(SortBufferTest, spillDuringInput) { } } -DEBUG_ONLY_TEST_F(SortBufferTest, spillDuringOutput) { +DEBUG_ONLY_TEST_P(SortBufferTest, spillDuringOutput) { auto spillDirectory = exec::test::TempDirectoryPath::create(); const auto spillConfig = getSpillConfig(spillDirectory->getPath()); folly::Synchronized spillStats; @@ -572,7 +581,7 @@ DEBUG_ONLY_TEST_F(SortBufferTest, spillDuringOutput) { } } -DEBUG_ONLY_TEST_F(SortBufferTest, reserveMemorySortGetOutput) { +DEBUG_ONLY_TEST_P(SortBufferTest, reserveMemorySortGetOutput) { for (bool spillEnabled : {false, true}) { SCOPED_TRACE(fmt::format("spillEnabled {}", spillEnabled)); @@ -626,7 +635,7 @@ DEBUG_ONLY_TEST_F(SortBufferTest, reserveMemorySortGetOutput) { } } -DEBUG_ONLY_TEST_F(SortBufferTest, reserveMemorySort) { +DEBUG_ONLY_TEST_P(SortBufferTest, reserveMemorySort) { struct { bool usePrefixSort; bool spillEnabled; @@ -676,7 +685,7 @@ DEBUG_ONLY_TEST_F(SortBufferTest, reserveMemorySort) { } } -TEST_F(SortBufferTest, emptySpill) { +TEST_P(SortBufferTest, emptySpill) { const std::shared_ptr fuzzerPool = memory::memoryManager()->addLeafPool("emptySpillSource"); @@ -704,4 +713,9 @@ TEST_F(SortBufferTest, emptySpill) { ASSERT_TRUE(spillStats.rlock()->empty()); } } + +VELOX_INSTANTIATE_TEST_SUITE_P( + SortBufferTest, + SortBufferTest, + testing::ValuesIn({false, true})); } // namespace facebook::velox::functions::test diff --git a/velox/exec/tests/SpillTest.cpp b/velox/exec/tests/SpillTest.cpp index 0ed215e2feb6..170ef2563bc5 100644 --- a/velox/exec/tests/SpillTest.cpp +++ b/velox/exec/tests/SpillTest.cpp @@ -53,7 +53,12 @@ class TestRuntimeStatWriter : public BaseRuntimeStatWriter { }; } // namespace -class SpillTest : public ::testing::TestWithParam, +struct TestParam { + common::CompressionKind compressionKind; + bool enablePrefixSort; +}; + +class SpillTest : public ::testing::TestWithParam, public facebook::velox::test::VectorTestBase { public: explicit SpillTest() @@ -66,6 +71,17 @@ class SpillTest : public ::testing::TestWithParam, setThreadLocalRunTimeStatWriter(nullptr); } + static std::vector getTestParams() { + static std::vector testParams = { + {common::CompressionKind::CompressionKind_NONE, false}, + {common::CompressionKind::CompressionKind_ZLIB, true}, + {common::CompressionKind::CompressionKind_SNAPPY, false}, + {common::CompressionKind::CompressionKind_ZSTD, true}, + {common::CompressionKind::CompressionKind_LZ4, false}, + {common::CompressionKind::CompressionKind_GZIP, true}}; + return testParams; + } + protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); @@ -80,7 +96,8 @@ class SpillTest : public ::testing::TestWithParam, } filesystems::registerLocalFileSystem(); rng_.seed(1); - compressionKind_ = GetParam(); + compressionKind_ = GetParam().compressionKind; + enablePrefixSort_ = GetParam().enablePrefixSort; } uint8_t randPartitionBitOffset() { @@ -156,16 +173,22 @@ class SpillTest : public ::testing::TestWithParam, // the batch number of the vector in the partition. When read back, both // partitions produce an ascending sequence of integers without gaps. spillStats_.wlock()->reset(); + const std::optional prefixSortConfig = + enablePrefixSort_ + ? std::optional(common::PrefixSortConfig()) + : std::nullopt; + const int32_t numSortKeys = 1; state_ = std::make_unique( [&]() -> const std::string& { return tempDir_->getPath(); }, updateSpilledBytesCb_, fileNamePrefix_, numPartitions, - 1, + numSortKeys, compareFlags, targetFileSize, writeBufferSize, compressionKind_, + prefixSortConfig, pool(), &spillStats_); ASSERT_EQ(targetFileSize, state_->targetFileSize()); @@ -174,6 +197,7 @@ class SpillTest : public ::testing::TestWithParam, ASSERT_EQ(spillStats_.rlock()->spilledPartitions, 0); ASSERT_TRUE(state_->spilledPartitionSet().empty()); ASSERT_EQ(compressionKind_, state_->compressionKind()); + ASSERT_EQ(state_->sortCompareFlags().size(), numSortKeys); for (auto partition = 0; partition < state_->maxPartitions(); ++partition) { ASSERT_FALSE(state_->isPartitionSpilled(partition)); @@ -435,6 +459,7 @@ class SpillTest : public ::testing::TestWithParam, std::shared_ptr tempDir_; memory::MemoryAllocator* allocator_; common::CompressionKind compressionKind_; + bool enablePrefixSort_; std::vector> values_; std::vector> batchesByPartition_; std::string fileNamePrefix_; @@ -478,7 +503,10 @@ TEST_P(SpillTest, spillTimestamp) { Timestamp{-1, 17'123'456}, Timestamp{Timestamp::kMaxSeconds, Timestamp::kMaxNanos}, Timestamp{Timestamp::kMinSeconds, 0}}; - + const std::optional prefixSortConfig = + enablePrefixSort_ + ? std::optional(common::PrefixSortConfig()) + : std::nullopt; SpillState state( [&]() -> const std::string& { return tempDirectory->getPath(); }, updateSpilledBytesCb_, @@ -489,6 +517,7 @@ TEST_P(SpillTest, spillTimestamp) { 1024, 0, compressionKind_, + prefixSortConfig, pool(), &spillStats_); int partitionIndex = 0; @@ -868,10 +897,4 @@ TEST(SpillTest, scopedSpillInjectionRegex) { INSTANTIATE_TEST_SUITE_P( SpillTestSuite, SpillTest, - ::testing::Values( - common::CompressionKind::CompressionKind_NONE, - common::CompressionKind::CompressionKind_ZLIB, - common::CompressionKind::CompressionKind_SNAPPY, - common::CompressionKind::CompressionKind_ZSTD, - common::CompressionKind::CompressionKind_LZ4, - common::CompressionKind::CompressionKind_GZIP)); + ::testing::ValuesIn(SpillTest::getTestParams())); diff --git a/velox/exec/tests/SpillerTest.cpp b/velox/exec/tests/SpillerTest.cpp index f842c39f4976..83d6d7983e86 100644 --- a/velox/exec/tests/SpillerTest.cpp +++ b/velox/exec/tests/SpillerTest.cpp @@ -57,16 +57,19 @@ struct TestParam { // write path is executed inline with spiller control code path. int poolSize; common::CompressionKind compressionKind; + bool enablePrefixSort; core::JoinType joinType; TestParam( Spiller::Type _type, int _poolSize, common::CompressionKind _compressionKind, + bool _enablePrefixSort, core::JoinType _joinType) : type(_type), poolSize(_poolSize), compressionKind(_compressionKind), + enablePrefixSort(_enablePrefixSort), joinType(_joinType) {} std::string toString() const { @@ -75,6 +78,7 @@ struct TestParam { Spiller::typeName(type), poolSize, compressionKindToString(compressionKind), + std::to_string(enablePrefixSort), joinTypeName(joinType)); } }; @@ -90,10 +94,18 @@ struct TestParamsBuilder { static_cast(numSpillerTypes % 6); for (int poolSize : {0, 8}) { params.emplace_back( - type, poolSize, compressionKind, core::JoinType::kRight); + type, + poolSize, + compressionKind, + poolSize % 2, + core::JoinType::kRight); if (type == Spiller::Type::kHashJoinBuild) { params.emplace_back( - type, poolSize, compressionKind, core::JoinType::kLeft); + type, + poolSize, + compressionKind, + poolSize % 2, + core::JoinType::kLeft); } } } @@ -137,6 +149,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { type_(param.type), executorPoolSize_(param.poolSize), compressionKind_(param.compressionKind), + enablePrefixSort_(param.enablePrefixSort), joinType_(param.joinType), spillProbedFlag_( type_ == Spiller::Type::kHashJoinBuild && @@ -557,6 +570,10 @@ class SpillerTest : public exec::test::RowContainerTestBase { spillConfig_.readBufferSize = readBufferSize; spillConfig_.executor = executor(); spillConfig_.compressionKind = compressionKind_; + enablePrefixSort_ ? spillConfig_.prefixSortConfig = + std::optional( + common::PrefixSortConfig()) + : spillConfig_.prefixSortConfig = std::nullopt; spillConfig_.maxSpillRunRows = maxSpillRunRows; spillConfig_.maxFileSize = targetFileSize; spillConfig_.fileCreateConfig = {}; @@ -1101,6 +1118,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { const Spiller::Type type_; const int32_t executorPoolSize_; const common::CompressionKind compressionKind_; + const bool enablePrefixSort_; const core::JoinType joinType_; const bool spillProbedFlag_; const HashBitRange hashBits_; diff --git a/velox/exec/tests/WindowTest.cpp b/velox/exec/tests/WindowTest.cpp index a269e67c81df..23981e743c14 100644 --- a/velox/exec/tests/WindowTest.cpp +++ b/velox/exec/tests/WindowTest.cpp @@ -39,7 +39,12 @@ class WindowTest : public OperatorTestBase { filesystems::registerLocalFileSystem(); } - common::SpillConfig getSpillConfig(const std::string& spillDir) const { + common::SpillConfig getSpillConfig( + const std::string& spillDir, + bool enablePrefixSort) const { + const auto prefixSortConfig = enablePrefixSort + ? std::optional(common::PrefixSortConfig()) + : std::nullopt; return common::SpillConfig( [spillDir]() -> const std::string& { return spillDir; }, [&](uint64_t) {}, @@ -55,7 +60,8 @@ class WindowTest : public OperatorTestBase { 0, 0, 0, - "none"); + "none", + prefixSortConfig); } const std::shared_ptr executor_{ @@ -597,7 +603,9 @@ DEBUG_ONLY_TEST_F(WindowTest, reserveMemorySort) { struct { bool usePrefixSort; bool spillEnabled; - } testSettings[] = {{false, true}, {true, false}, {true, true}}; + bool enableSpillPrefixSort; + } testSettings[] = { + {false, true, false}, {true, false, true}, {true, true, false}}; const vector_size_t size = 1'000; auto prefixSortData = makeRowVector( @@ -638,11 +646,16 @@ DEBUG_ONLY_TEST_F(WindowTest, reserveMemorySort) { .window({"row_number() over (partition by p order by s)"}) .planNode()); - for (const auto [usePrefixSort, spillEnabled] : testSettings) { + for (const auto [usePrefixSort, spillEnabled, enableSpillPrefixSort] : + testSettings) { SCOPED_TRACE(fmt::format( - "usePrefixSort: {}, spillEnabled: {}, ", usePrefixSort, spillEnabled)); + "usePrefixSort: {}, spillEnabled: {}, enableSpillPrefixSort: {}", + usePrefixSort, + spillEnabled, + enableSpillPrefixSort)); auto spillDirectory = exec::test::TempDirectoryPath::create(); - auto spillConfig = getSpillConfig(spillDirectory->getPath()); + auto spillConfig = + getSpillConfig(spillDirectory->getPath(), enableSpillPrefixSort); folly::Synchronized spillStats; const auto plan = usePrefixSort ? prefixSortPlan : nonPrefixSortPlan; velox::common::PrefixSortConfig prefixSortConfig =