diff --git a/velox/exec/WindowPartition.cpp b/velox/exec/WindowPartition.cpp index f2b8c6b5f47..48aa7308621 100644 --- a/velox/exec/WindowPartition.cpp +++ b/velox/exec/WindowPartition.cpp @@ -55,6 +55,17 @@ WindowPartition::WindowPartition( const std::vector>& sortKeyInfo) : WindowPartition(data, {}, inputMapping, sortKeyInfo, true, false) {} +WindowPartition::WindowPartition( + const std::vector& inputMapping, + const std::vector>& sortKeyInfo, + bool partial) + : partial_(partial), + data_(nullptr), + partition_(), + complete_(!partial), + inputMapping_(inputMapping), + sortKeyInfo_(sortKeyInfo) {} + void WindowPartition::addRows(const std::vector& rows) { checkPartial(); rows_.insert(rows_.end(), rows.begin(), rows.end()); diff --git a/velox/exec/WindowPartition.h b/velox/exec/WindowPartition.h index f8a871710d8..9980034196c 100644 --- a/velox/exec/WindowPartition.h +++ b/velox/exec/WindowPartition.h @@ -30,6 +30,8 @@ namespace facebook::velox::exec { class WindowPartition { public: + virtual ~WindowPartition() = default; + /// The WindowPartition is used by the Window operator and WindowFunction /// objects to access the underlying data and columns of a partition of rows. /// The WindowPartition is constructed by WindowBuild from the input data. @@ -57,20 +59,21 @@ class WindowPartition { sortKeyInfo); /// Adds remaining input 'rows' for a partial window partition. - void addRows(const std::vector& rows); + virtual void addRows(const std::vector& rows); /// Removes the first 'numRows' in 'rows_' from a partial window partition /// after been processed. - void removeProcessedRows(vector_size_t numRows); + virtual void removeProcessedRows(vector_size_t numRows); /// Returns the number of rows in the current WindowPartition. - vector_size_t numRows() const { + virtual vector_size_t numRows() const { return partition_.size(); } /// Returns the number of rows in a window partition remaining for data /// processing. - vector_size_t numRowsForProcessing(vector_size_t partitionOffset) const; + virtual vector_size_t numRowsForProcessing( + vector_size_t partitionOffset) const; bool complete() const { return complete_; @@ -90,7 +93,7 @@ class WindowPartition { /// Copies the values at 'columnIndex' into 'result' (starting at /// 'resultOffset') for the absolute partition row positions in the /// 'rowNumbers' array. Negative row positions are copied as nulls. - void extractColumn( + virtual void extractColumn( int32_t columnIndex, folly::Range rowNumbers, vector_size_t resultOffset, @@ -99,7 +102,7 @@ class WindowPartition { /// Copies the values at 'columnIndex' into 'result' (starting at /// 'resultOffset') for 'numRows' starting at positions 'partitionOffset' /// in the partition input data. - void extractColumn( + virtual void extractColumn( int32_t columnIndex, vector_size_t partitionOffset, vector_size_t numRows, @@ -109,7 +112,7 @@ class WindowPartition { /// Extracts null positions at 'columnIndex' into 'nullsBuffer' for /// 'numRows' starting at positions 'partitionOffset' in the partition /// input data. - void extractNulls( + virtual void extractNulls( int32_t columnIndex, vector_size_t partitionOffset, vector_size_t numRows, @@ -141,7 +144,7 @@ class WindowPartition { /// keys). So peerStart and peerEnd of the last row of this call are returned /// to be passed as prevPeerStart and prevPeerEnd to the subsequent /// call to computePeerBuffers. - std::pair computePeerBuffers( + virtual std::pair computePeerBuffers( vector_size_t start, vector_size_t end, vector_size_t prevPeerStart, @@ -161,7 +164,7 @@ class WindowPartition { /// @param validFrames SelectivityVector to keep track of valid frames. /// This function unselect rows in validFrames where the frame bounds are NaN /// that are invalid. - void computeKRangeFrameBounds( + virtual void computeKRangeFrameBounds( bool isStartBound, bool isPreceding, column_index_t frameColumn, @@ -171,6 +174,20 @@ class WindowPartition { vector_size_t* rawFrameBounds, SelectivityVector& validFrames) const; + protected: + // Constructs a non-RowContainer-backed partition for subclasses that provide + // storage-specific accessors. + WindowPartition( + const std::vector& inputMapping, + const std::vector>& + sortKeyInfo, + bool partial); + + const std::vector>& sortKeyInfo() + const { + return sortKeyInfo_; + } + private: WindowPartition( RowContainer* data, diff --git a/velox/exec/benchmarks/CMakeLists.txt b/velox/exec/benchmarks/CMakeLists.txt index 1ccecdf49d5..b4dfc653f62 100644 --- a/velox/exec/benchmarks/CMakeLists.txt +++ b/velox/exec/benchmarks/CMakeLists.txt @@ -167,6 +167,18 @@ target_link_libraries( Folly::follybenchmark ) +add_executable(velox_rows_streaming_window_benchmark RowsStreamingWindowBenchmark.cpp) + +target_link_libraries( + velox_rows_streaming_window_benchmark + velox_aggregates + velox_exec + velox_exec_test_lib + velox_vector_test_lib + velox_window + Folly::follybenchmark +) + add_executable(velox_mark_sorted_benchmark MarkSortedBenchmark.cpp) target_link_libraries( diff --git a/velox/exec/benchmarks/RowsStreamingWindowBenchmark.cpp b/velox/exec/benchmarks/RowsStreamingWindowBenchmark.cpp new file mode 100644 index 00000000000..b4a6a306043 --- /dev/null +++ b/velox/exec/benchmarks/RowsStreamingWindowBenchmark.cpp @@ -0,0 +1,204 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "velox/common/memory/Memory.h" +#include "velox/core/QueryConfig.h" +#include "velox/exec/Cursor.h" +#include "velox/exec/OperatorType.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" +#include "velox/functions/prestosql/window/WindowFunctionsRegistration.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::test; + +namespace { + +constexpr int32_t kRowsPerVector = 10'000; +constexpr int32_t kOutputBatchRows = 1'000; + +class RowsStreamingWindowBenchmark : public VectorTestBase { + public: + void addBenchmarks() { + addBenchmarksForSize("10K", 1, 10'000); + addBenchmarksForSize("100K", 10, 25'000); + addBenchmarksForSize("1M", 100, 25'000); + } + + private: + struct TestCase { + std::string name; + std::vector data; + core::PlanNodePtr plan; + int64_t numRows; + }; + + std::vector makeData( + int32_t numVectors, + int32_t rowsPerPartition) { + std::vector data; + data.reserve(numVectors); + const auto rowType = ROW({"p", "s", "v"}, {INTEGER(), INTEGER(), BIGINT()}); + for (auto vectorIndex = 0; vectorIndex < numVectors; ++vectorIndex) { + data.push_back(makeRowVector( + rowType->names(), + { + makeFlatVector( + kRowsPerVector, + [vectorIndex, rowsPerPartition](auto row) { + const auto globalRow = vectorIndex * kRowsPerVector + row; + return globalRow / rowsPerPartition; + }), + makeFlatVector( + kRowsPerVector, + [vectorIndex, rowsPerPartition](auto row) { + const auto globalRow = vectorIndex * kRowsPerVector + row; + return globalRow % rowsPerPartition; + }), + makeFlatVector( + kRowsPerVector, [](auto row) { return row % 97; }), + })); + } + return data; + } + + void addBenchmarksForSize( + const std::string& sizeName, + int32_t numVectors, + int32_t rowsPerPartition) { + auto data = makeData(numVectors, rowsPerPartition); + addBenchmark( + "rowsStreamingWindowRank_" + sizeName, + data, + {"rank() over (partition by p order by s)"}); + addBenchmark( + "rowsStreamingWindowSum_" + sizeName, + data, + {"sum(v) over (partition by p order by s)"}); + addBenchmark( + "rowsStreamingWindowRankAndSum_" + sizeName, + data, + {"rank() over (partition by p order by s)", + "sum(v) over (partition by p order by s)"}); + addBenchmark( + "rowsStreamingWindowSevenFunctions_" + sizeName, + data, + {"rank() over (partition by p order by s)", + "dense_rank() over (partition by p order by s)", + "row_number() over (partition by p order by s)", + "sum(v) over (partition by p order by s)", + "count(v) over (partition by p order by s)", + "min(v) over (partition by p order by s)", + "max(v) over (partition by p order by s)"}); + } + + void addBenchmark( + const std::string& name, + const std::vector& data, + const std::vector& windowFunctions) { + auto testCase = std::make_unique(); + testCase->name = name; + testCase->data = data; + testCase->numRows = data.size() * kRowsPerVector; + testCase->plan = exec::test::PlanBuilder() + .values(testCase->data) + .streamingWindow(windowFunctions) + .planNode(); + + const auto* test = testCase.get(); + folly::addBenchmark( + __FILE__, + name, + [this, test](folly::UserCounters& counters, unsigned iterations) { + CpuWallTiming windowTiming; + uint64_t numResultRows = 0; + for (auto i = 0; i < iterations; ++i) { + numResultRows += runOnce(*test, windowTiming); + } + + BENCHMARK_SUSPEND { + counters["rows"] = folly::UserMetric( + static_cast(test->numRows), + folly::UserMetric::Type::METRIC); + counters["windowCpu"] = folly::UserMetric( + static_cast(windowTiming.cpuNanos) / iterations / 1e9, + folly::UserMetric::Type::TIME); + counters["windowWall"] = folly::UserMetric( + static_cast(windowTiming.wallNanos) / iterations / 1e9, + folly::UserMetric::Type::TIME); + folly::doNotOptimizeAway(numResultRows); + } + return iterations; + }); + + cases_.push_back(std::move(testCase)); + } + + uint64_t runOnce(const TestCase& test, CpuWallTiming& windowTiming) const { + std::unique_ptr cursor; + BENCHMARK_SUSPEND { + CursorParameters params; + params.planNode = test.plan; + params.serialExecution = true; + params.queryConfigs = { + {core::QueryConfig::kPreferredOutputBatchRows, + std::to_string(kOutputBatchRows)}}; + cursor = TaskCursor::create(params); + } + + uint64_t numResultRows = 0; + while (cursor->moveNext()) { + numResultRows += cursor->current()->size(); + } + + BENCHMARK_SUSPEND { + VELOX_CHECK_EQ(numResultRows, test.numRows); + const auto stats = cursor->task()->taskStats(); + for (const auto& pipeline : stats.pipelineStats) { + for (const auto& op : pipeline.operatorStats) { + if (op.operatorType == OperatorType::kWindow) { + windowTiming.add(op.addInputTiming); + windowTiming.add(op.getOutputTiming); + } + } + } + } + return numResultRows; + } + + std::vector> cases_; +}; + +std::unique_ptr benchmark; + +} // namespace + +int main(int argc, char** argv) { + folly::Init init(&argc, &argv); + memory::MemoryManager::initialize(memory::MemoryManager::Options{}); + aggregate::prestosql::registerAllAggregateFunctions(); + window::prestosql::registerAllWindowFunctions(); + benchmark = std::make_unique(); + benchmark->addBenchmarks(); + folly::runBenchmarks(); + benchmark.reset(); + return 0; +} diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index bda084e8ad9..f3bef79ac0d 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -149,6 +149,7 @@ set( AssignUniqueIdTest.cpp FilterProjectTest.cpp AsyncConnectorTest.cpp + VectorWindowPartitionTest.cpp ) set( diff --git a/velox/exec/tests/VectorWindowPartitionTest.cpp b/velox/exec/tests/VectorWindowPartitionTest.cpp new file mode 100644 index 00000000000..62251c0f6ef --- /dev/null +++ b/velox/exec/tests/VectorWindowPartitionTest.cpp @@ -0,0 +1,331 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/window/VectorWindowPartition.h" +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/exec/WindowFunction.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +#include + +using namespace facebook::velox; +using namespace facebook::velox::exec; + +namespace facebook::velox::exec::test { +namespace { + +class VectorWindowPartitionTest : public testing::Test, + public velox::test::VectorTestBase { + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{}); + } + + window::VectorWindowPartition makePartition( + std::vector inputChannels, + std::vector> sortKeyInfo = + {}) { + std::vector inputMapping(inputChannels.size()); + for (auto i = 0; i < inputChannels.size(); ++i) { + inputMapping[inputChannels[i]] = i; + } + return window::VectorWindowPartition{ + inputChannels, std::move(inputMapping), std::move(sortKeyInfo), pool()}; + } +}; + +TEST_F(VectorWindowPartitionTest, extractsColumnsAcrossBlocksAndRanges) { + auto firstBlock = makeRowVector({makeFlatVector({1, 2, 3, 4})}); + auto secondBlock = makeRowVector({makeFlatVector({5, 6, 7})}); + + auto partition = makePartition({0}); + partition.addRows(firstBlock, 1, 4); + partition.addRows(secondBlock, 0, 2); + + auto result = makeFlatVector(5); + partition.extractColumn(0, 0, 5, 0, result); + velox::test::assertEqualVectors( + makeFlatVector({2, 3, 4, 5, 6}), result); +} + +TEST_F(VectorWindowPartitionTest, rejectsInvalidBlocks) { + auto data = makeRowVector({makeFlatVector({1, 2, 3})}); + auto partition = makePartition({0}); + + VELOX_ASSERT_THROW(partition.addRows(nullptr, 0, 1), "Input vector"); + VELOX_ASSERT_THROW(partition.addRows(data, 2, 1), "startRow"); + VELOX_ASSERT_THROW(partition.addRows(data, 0, 4), "endRow"); +} + +TEST_F(VectorWindowPartitionTest, extractsRandomRowsAndNullRows) { + auto firstBlock = makeRowVector({makeFlatVector({1, 2, 3})}); + auto secondBlock = makeRowVector({makeFlatVector({4, 5, 6})}); + + auto partition = makePartition({0}); + partition.addRows(firstBlock, 0, firstBlock->size()); + partition.addRows(secondBlock, 0, secondBlock->size()); + + std::vector rowNumbers{5, WindowFunction::kNullRow, 0, 3}; + auto result = makeFlatVector(rowNumbers.size()); + partition.extractColumn( + 0, folly::Range(rowNumbers.data(), rowNumbers.size()), 0, result); + + EXPECT_EQ(result->valueAt(0), 6); + EXPECT_TRUE(result->isNullAt(1)); + EXPECT_EQ(result->valueAt(2), 1); + EXPECT_EQ(result->valueAt(3), 4); +} + +TEST_F(VectorWindowPartitionTest, extractsRandomRowsAfterRemoval) { + auto data = makeRowVector({makeFlatVector({1, 2, 3, 4, 5, 6})}); + std::vector rowNumbers{2, 3, WindowFunction::kNullRow}; + auto expected = makeNullableFlatVector( + std::vector>{3, 4, std::nullopt}); + + auto vectorPartition = makePartition({0}); + vectorPartition.addRows(data, 0, data->size()); + vectorPartition.removeProcessedRows(2); + auto vectorResult = makeFlatVector(rowNumbers.size()); + vectorPartition.extractColumn( + 0, folly::Range(rowNumbers.data(), rowNumbers.size()), 0, vectorResult); + velox::test::assertEqualVectors(expected, vectorResult); +} + +TEST_F(VectorWindowPartitionTest, extractsNullsAcrossBlocksAfterRemoval) { + auto firstBlock = makeRowVector( + {"s", "v"}, + { + makeFlatVector({1, 2, 3}), + makeNullableFlatVector( + std::vector>{10, std::nullopt, 30}), + }); + auto secondBlock = makeRowVector( + {"s", "v"}, + { + makeFlatVector({4, 5, 6}), + makeNullableFlatVector(std::vector>{ + std::nullopt, 50, std::nullopt}), + }); + + auto partition = makePartition({0, 1}, {{0, core::SortOrder{true, true}}}); + partition.addRows(firstBlock, 0, firstBlock->size()); + partition.addRows(secondBlock, 0, secondBlock->size()); + partition.removeProcessedRows(2); + + auto nulls = allocateNulls(4, pool_.get()); + partition.extractNulls(1, 2, 4, nulls); + const auto* rawNulls = nulls->as(); + EXPECT_FALSE(bits::isBitSet(rawNulls, 0)); + EXPECT_TRUE(bits::isBitSet(rawNulls, 1)); + EXPECT_FALSE(bits::isBitSet(rawNulls, 2)); + EXPECT_TRUE(bits::isBitSet(rawNulls, 3)); + + auto frameStarts = AlignedBuffer::allocate(2, pool_.get()); + auto frameEnds = AlignedBuffer::allocate(2, pool_.get()); + auto* rawFrameStarts = frameStarts->asMutable(); + auto* rawFrameEnds = frameEnds->asMutable(); + rawFrameStarts[0] = 2; + rawFrameEnds[0] = 3; + rawFrameStarts[1] = 4; + rawFrameEnds[1] = 5; + + auto frameNulls = allocateNulls(0, pool_.get()); + const auto frameNullRange = partition.extractNulls( + 1, SelectivityVector(2), frameStarts, frameEnds, &frameNulls); + ASSERT_TRUE(frameNullRange.has_value()); + EXPECT_EQ(frameNullRange->first, 2); + EXPECT_EQ(frameNullRange->second, 4); + + rawNulls = frameNulls->as(); + EXPECT_FALSE(bits::isBitSet(rawNulls, 0)); + EXPECT_TRUE(bits::isBitSet(rawNulls, 1)); + EXPECT_FALSE(bits::isBitSet(rawNulls, 2)); + EXPECT_TRUE(bits::isBitSet(rawNulls, 3)); +} + +TEST_F(VectorWindowPartitionTest, computesPeerBuffersAfterRemoval) { + auto firstBlock = makeRowVector({makeFlatVector({10, 10, 10})}); + auto secondBlock = makeRowVector({makeFlatVector({10, 20})}); + + auto partition = makePartition({0}, {{0, core::SortOrder{true, true}}}); + partition.addRows(firstBlock, 0, firstBlock->size()); + partition.addRows(secondBlock, 0, secondBlock->size()); + partition.removeProcessedRows(2); + + std::vector peerStarts(3); + std::vector peerEnds(3); + const auto peerBounds = partition.computePeerBuffers( + 2, 5, 0, 2, peerStarts.data(), peerEnds.data()); + + EXPECT_THAT(peerStarts, ::testing::ElementsAre(0, 0, 4)); + EXPECT_THAT(peerEnds, ::testing::ElementsAre(3, 3, 4)); + EXPECT_EQ(peerBounds.first, 4); + EXPECT_EQ(peerBounds.second, 5); +} + +TEST_F(VectorWindowPartitionTest, previousRowDoesNotRetainProcessedInput) { + auto partition = makePartition({0}, {{0, core::SortOrder{true, true}}}); + + std::weak_ptr processedInput; + { + auto firstBlock = makeRowVector({makeFlatVector({10})}); + processedInput = firstBlock; + partition.addRows(firstBlock, 0, firstBlock->size()); + } + + partition.removeProcessedRows(1); + EXPECT_TRUE(processedInput.expired()); + + auto secondBlock = makeRowVector({makeFlatVector({10})}); + partition.addRows(secondBlock, 0, secondBlock->size()); + + std::vector peerStarts(1); + std::vector peerEnds(1); + const auto peerBounds = partition.computePeerBuffers( + 1, 2, 0, 1, peerStarts.data(), peerEnds.data()); + + EXPECT_THAT(peerStarts, ::testing::ElementsAre(0)); + EXPECT_THAT(peerEnds, ::testing::ElementsAre(1)); + EXPECT_EQ(peerBounds.first, 0); + EXPECT_EQ(peerBounds.second, 2); +} + +TEST_F( + VectorWindowPartitionTest, + skipsEmptyBlocksAndExtractsAfterRepeatedRemoval) { + auto firstBlock = makeRowVector({makeFlatVector({1, 2, 3})}); + auto secondBlock = makeRowVector({makeFlatVector({4, 5})}); + + auto partition = makePartition({0}); + partition.addRows(firstBlock, 0, firstBlock->size()); + partition.addRows(secondBlock, 0, 0); + partition.addRows(secondBlock, 2, 2); + partition.removeProcessedRows(2); + partition.addRows(secondBlock, 0, secondBlock->size()); + partition.removeProcessedRows(2); + + EXPECT_EQ(partition.numRows(), 1); + auto result = makeFlatVector(1); + partition.extractColumn(0, 4, 1, 0, result); + velox::test::assertEqualVectors(makeFlatVector({5}), result); +} + +TEST_F( + VectorWindowPartitionTest, + computesKRangeFrameBoundsAfterRemovalFromSecondBlock) { + auto firstBlock = makeRowVector( + {"s", "bound"}, + { + makeFlatVector({10, 20}), + makeFlatVector({5, 15}), + }); + auto secondBlock = makeRowVector( + {"s", "bound"}, + { + makeFlatVector({30, 40, 50}), + makeFlatVector({25, 35, 45}), + }); + + auto partition = makePartition({0, 1}, {{0, core::SortOrder{true, true}}}); + partition.addRows(firstBlock, 0, firstBlock->size()); + partition.addRows(secondBlock, 0, secondBlock->size()); + partition.removeProcessedRows(2); + + std::vector peerStarts(3); + std::vector peerEnds(3); + partition.computePeerBuffers(2, 5, 0, 0, peerStarts.data(), peerEnds.data()); + + std::vector frameBounds(3); + SelectivityVector validFrames(3, true); + partition.computeKRangeFrameBounds( + true, true, 1, 2, 3, peerStarts.data(), frameBounds.data(), validFrames); + + EXPECT_THAT(peerStarts, ::testing::ElementsAre(2, 3, 4)); + EXPECT_THAT(peerEnds, ::testing::ElementsAre(2, 3, 4)); + EXPECT_THAT(frameBounds, ::testing::ElementsAre(2, 3, 4)); + EXPECT_TRUE(validFrames.isAllSelected()); +} + +TEST_F(VectorWindowPartitionTest, computesKRangeFrameBoundsForNullOrderValues) { + auto firstBlock = makeRowVector( + {"s", "bound"}, + { + makeNullableFlatVector( + std::vector>{std::nullopt, std::nullopt}), + makeNullableFlatVector( + std::vector>{std::nullopt, std::nullopt}), + }); + auto secondBlock = makeRowVector( + {"s", "bound"}, + { + makeNullableFlatVector( + std::vector>{10, 20}), + makeNullableFlatVector( + std::vector>{10, 15}), + }); + + auto partition = makePartition({0, 1}, {{0, core::SortOrder{true, true}}}); + partition.addRows(firstBlock, 0, firstBlock->size()); + partition.addRows(secondBlock, 0, secondBlock->size()); + + std::vector peerStarts(4); + std::vector peerEnds(4); + partition.computePeerBuffers(0, 4, 0, 0, peerStarts.data(), peerEnds.data()); + + std::vector frameBounds(4); + SelectivityVector validFrames(4, true); + partition.computeKRangeFrameBounds( + true, true, 1, 0, 4, peerStarts.data(), frameBounds.data(), validFrames); + + EXPECT_THAT(peerStarts, ::testing::ElementsAre(0, 0, 2, 3)); + EXPECT_THAT(peerEnds, ::testing::ElementsAre(1, 1, 2, 3)); + EXPECT_THAT(frameBounds, ::testing::ElementsAre(0, 0, 2, 3)); + EXPECT_TRUE(validFrames.isAllSelected()); +} + +TEST_F(VectorWindowPartitionTest, computesKRangeFrameBoundsAcrossBlocks) { + auto firstBlock = makeRowVector( + {"s", "bound"}, + { + makeFlatVector({1, 1}), + makeFlatVector({1, 1}), + }); + auto secondBlock = makeRowVector( + {"s", "bound"}, + { + makeFlatVector({2, 3}), + makeFlatVector({2, 3}), + }); + + auto partition = makePartition({0, 1}, {{0, core::SortOrder{true, true}}}); + partition.addRows(firstBlock, 0, firstBlock->size()); + partition.addRows(secondBlock, 0, secondBlock->size()); + + std::vector peerStarts(4); + std::vector peerEnds(4); + partition.computePeerBuffers(0, 4, 0, 0, peerStarts.data(), peerEnds.data()); + + std::vector frameBounds(4); + SelectivityVector validFrames(4, true); + partition.computeKRangeFrameBounds( + true, false, 1, 0, 4, peerStarts.data(), frameBounds.data(), validFrames); + + EXPECT_THAT(frameBounds, ::testing::ElementsAre(0, 0, 2, 3)); + EXPECT_TRUE(validFrames.isAllSelected()); +} + +} // namespace +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/WindowTest.cpp b/velox/exec/tests/WindowTest.cpp index 992ddf48b5d..25d6ab9609c 100644 --- a/velox/exec/tests/WindowTest.cpp +++ b/velox/exec/tests/WindowTest.cpp @@ -15,6 +15,7 @@ */ #include "velox/exec/Window.h" #include +#include #include "velox/common/base/Exceptions.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileSystems.h" @@ -27,6 +28,9 @@ #include "velox/exec/window/RowsStreamingWindowBuild.h" #include "velox/exec/window/SortWindowBuild.h" #include "velox/functions/prestosql/window/WindowFunctionsRegistration.h" +#include "velox/vector/LazyVector.h" +#include "velox/vector/SimpleVector.h" +#include "velox/vector/tests/utils/VectorMaker.h" using namespace facebook::velox::exec::test; @@ -75,6 +79,16 @@ class WindowTest : public OperatorTestBase { tsan_atomic nonReclaimableSection_{false}; }; +class TestingRowsStreamingWindowBuild + : public window::RowsStreamingWindowBuild { + public: + using window::RowsStreamingWindowBuild::RowsStreamingWindowBuild; + + bool testingHasRowContainer() const { + return data_ != nullptr; + } +}; + TEST_F(WindowTest, spill) { const vector_size_t size = 1'000; auto data = makeRowVector( @@ -437,6 +451,539 @@ DEBUG_ONLY_TEST_F(WindowTest, valuesRowsStreamingWindowBuild) { ASSERT_TRUE(isStreamCreated.load()); } +DEBUG_ONLY_TEST_F(WindowTest, encodedRowsStreamingWindowBuild) { + const vector_size_t size = 120; + const vector_size_t baseSize = size * 2; + const auto indices = makeIndices(size, [](auto row) { return row * 2; }); + auto data = makeRowVector( + {"p", "s", "v"}, + { + wrapInDictionary( + indices, + size, + makeFlatVector( + baseSize, [](auto row) { return (row / 2) / 24; })), + wrapInDictionary( + indices, + size, + makeFlatVector( + baseSize, [](auto row) { return ((row / 2) % 24) / 3; })), + wrapInDictionary( + indices, + size, + makeFlatVector( + baseSize, + [](auto row) { return (row / 2) % 7 + 1; }, + [](auto row) { + return row % 2 == 0 && (row / 2) % 11 == 0; + })), + }); + + createDuckDbTable({data}); + + const std::vector kClauses = { + "rank() over (partition by p order by s)", + "dense_rank() over (partition by p order by s)", + "sum(v) over (partition by p order by s)"}; + + auto plan = PlanBuilder() + .values(split(data, 7)) + .orderBy({"p", "s"}, false) + .streamingWindow(kClauses) + .planNode(); + + std::atomic_bool isStreamCreated{false}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::window::RowsStreamingWindowBuild::RowsStreamingWindowBuild", + std::function( + [&](window::RowsStreamingWindowBuild* windowBuild) { + isStreamCreated.store(true); + })); + + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .config(core::QueryConfig::kPreferredOutputBatchRows, "4") + .config(core::QueryConfig::kMaxOutputBatchRows, "4") + .assertResults( + "SELECT *, rank() over (partition by p order by s), dense_rank() over (partition by p order by s), sum(v) over (partition by p order by s) FROM tmp"); + ASSERT_TRUE(isStreamCreated.load()); +} + +TEST_F(WindowTest, rowsStreamingWindowBuildDoesNotMaterializeRows) { + auto data = makeRowVector( + {"p", "s", "v"}, + { + makeFlatVector({1, 1, 1, 2, 2, 2}), + makeFlatVector({1, 1, 2, 1, 1, 2}), + makeFlatVector({10, 20, 30, 40, 50, 60}), + }); + + auto plan = PlanBuilder() + .values({data}) + .orderBy({"p", "s"}, false) + .streamingWindow( + {"rank() over (partition by p order by s)", + "sum(v) over (partition by p order by s)"}) + .planNode(); + auto windowNode = std::dynamic_pointer_cast(plan); + ASSERT_NE(windowNode, nullptr); + + TestingRowsStreamingWindowBuild windowBuild( + windowNode, pool(), nullptr, &nonReclaimableSection_); + windowBuild.setNumRowsPerOutput(2); + windowBuild.addInput(data); + windowBuild.noMoreInput(); + + ASSERT_FALSE(windowBuild.testingHasRowContainer()); +} + +TEST_F(WindowTest, rowsStreamingWindowBuildLoadsOnlyBoundaryColumns) { + const vector_size_t size = 6; + + auto makeLazyColumn = [&](const TypePtr& type, + std::function loader) { + return std::make_shared( + pool(), + type, + size, + std::make_unique( + [loader = std::move(loader)](RowSet /*rows*/) { + return loader(); + })); + }; + + auto partitionKey = makeLazyColumn( + INTEGER(), [&]() { return makeFlatVector({1, 1, 1, 2, 2, 2}); }); + auto sortKey = makeLazyColumn( + INTEGER(), [&]() { return makeFlatVector({1, 1, 2, 1, 1, 2}); }); + auto payload = makeLazyColumn(BIGINT(), [&]() { + return makeFlatVector({10, 20, 30, 40, 50, 60}); + }); + + auto data = makeRowVector({"p", "s", "v"}, {partitionKey, sortKey, payload}); + auto plan = PlanBuilder() + .values({data}) + .orderBy({"p", "s"}, false) + .streamingWindow({"rank() over (partition by p order by s)"}) + .planNode(); + auto windowNode = std::dynamic_pointer_cast(plan); + ASSERT_NE(windowNode, nullptr); + + TestingRowsStreamingWindowBuild windowBuild( + windowNode, pool(), nullptr, &nonReclaimableSection_); + windowBuild.setNumRowsPerOutput(2); + windowBuild.addInput(data); + + EXPECT_TRUE(partitionKey->isLoaded()); + EXPECT_TRUE(sortKey->isLoaded()); + EXPECT_FALSE(payload->isLoaded()); +} + +TEST_F( + WindowTest, + rowsStreamingWindowBuildLoadsFunctionOnlyLazyColumnDuringApply) { + const vector_size_t size = 6; + auto payload = std::make_shared( + pool(), + BIGINT(), + size, + std::make_unique([&](RowSet /*rows*/) { + return makeFlatVector({10, 20, 30, 40, 50, 60}); + })); + + auto data = makeRowVector( + {"p", "s", "v"}, + { + makeFlatVector({1, 1, 1, 1, 1, 1}), + makeFlatVector({1, 2, 3, 4, 5, 6}), + payload, + }); + auto plan = PlanBuilder() + .values({data}) + .orderBy({"p", "s"}, false) + .streamingWindow({"sum(v) over (partition by p order by s)"}) + .planNode(); + auto windowNode = std::dynamic_pointer_cast(plan); + ASSERT_NE(windowNode, nullptr); + + TestingRowsStreamingWindowBuild windowBuild( + windowNode, pool(), nullptr, &nonReclaimableSection_); + windowBuild.setNumRowsPerOutput(10); + windowBuild.addInput(data); + EXPECT_FALSE(payload->isLoaded()); + windowBuild.noMoreInput(); + + auto partition = windowBuild.nextPartition(); + HashStringAllocator stringAllocator{pool()}; + const auto& function = windowNode->windowFunctions()[0]; + auto windowFunction = WindowFunction::create( + function.functionCall->name(), + {WindowFunctionArg{BIGINT(), nullptr, 2}}, + function.functionCall->type(), + function.ignoreNulls, + pool(), + &stringAllocator, + core::QueryConfig({})); + windowFunction->resetPartition(partition.get()); + + auto peerStarts = AlignedBuffer::allocate(size, pool()); + auto peerEnds = AlignedBuffer::allocate(size, pool()); + auto frameStarts = AlignedBuffer::allocate(size, pool()); + auto frameEnds = AlignedBuffer::allocate(size, pool()); + auto* rawPeerStarts = peerStarts->asMutable(); + auto* rawPeerEnds = peerEnds->asMutable(); + auto* rawFrameStarts = frameStarts->asMutable(); + auto* rawFrameEnds = frameEnds->asMutable(); + for (auto i = 0; i < size; ++i) { + rawPeerStarts[i] = i; + rawPeerEnds[i] = i; + rawFrameStarts[i] = 0; + rawFrameEnds[i] = i; + } + + auto result = BaseVector::create(function.functionCall->type(), size, pool()); + windowFunction->apply( + peerStarts, + peerEnds, + frameStarts, + frameEnds, + SelectivityVector(size), + 0, + result); + + EXPECT_TRUE(payload->isLoaded()); + velox::test::assertEqualVectors( + makeFlatVector({10, 30, 60, 100, 150, 210}), result); +} + +TEST_F(WindowTest, rowsStreamingWindowBuildRetainsEncodedRows) { + const vector_size_t size = 12; + const vector_size_t baseSize = size * 2; + const auto indices = makeIndices(size, [](auto row) { return row * 2; }); + auto data = makeRowVector( + {"p", "s", "v"}, + { + wrapInDictionary(indices, size, makeConstant(1, baseSize)), + wrapInDictionary( + indices, + size, + makeFlatVector( + baseSize, [](auto row) { return (row / 2) / 3; })), + wrapInDictionary( + indices, + size, + makeFlatVector( + baseSize, + [](auto row) { return (row / 2) + 10; }, + [](auto row) { return (row / 2) == 5; })), + }); + + auto inputs = split(data, 5); + auto plan = PlanBuilder() + .values(inputs) + .orderBy({"p", "s"}, false) + .streamingWindow( + {"rank() over (partition by p order by s)", + "sum(v) over (partition by p order by s)"}) + .planNode(); + auto windowNode = std::dynamic_pointer_cast(plan); + ASSERT_NE(windowNode, nullptr); + + TestingRowsStreamingWindowBuild windowBuild( + windowNode, pool(), nullptr, &nonReclaimableSection_); + windowBuild.setNumRowsPerOutput(4); + for (const auto& input : inputs) { + windowBuild.addInput(input); + } + windowBuild.noMoreInput(); + + ASSERT_FALSE(windowBuild.testingHasRowContainer()); + + std::vector rowNumbers{0, 5, -1, 11}; + auto result = BaseVector::create(BIGINT(), 0, pool()); + windowBuild.nextPartition()->extractColumn( + 2, folly::Range(rowNumbers.data(), rowNumbers.size()), 0, result); + + ASSERT_EQ(result->size(), 4); + EXPECT_EQ(result->as>()->valueAt(0), 10); + EXPECT_TRUE(result->isNullAt(1)); + EXPECT_TRUE(result->isNullAt(2)); + EXPECT_EQ(result->as>()->valueAt(3), 21); +} + +TEST_F(WindowTest, rowsStreamingWindowBuildExtractsNegativeRowsAsNull) { + auto data = makeRowVector( + {"p", "s", "v"}, + { + makeFlatVector({1, 1, 1}), + makeFlatVector({1, 2, 3}), + makeFlatVector({10, 20, 30}), + }); + + auto plan = PlanBuilder() + .values({data}) + .orderBy({"p", "s"}, false) + .streamingWindow({"rank() over (partition by p order by s)"}) + .planNode(); + auto windowNode = std::dynamic_pointer_cast(plan); + ASSERT_NE(windowNode, nullptr); + + TestingRowsStreamingWindowBuild windowBuild( + windowNode, pool(), nullptr, &nonReclaimableSection_); + windowBuild.setNumRowsPerOutput(10); + windowBuild.addInput(data); + windowBuild.noMoreInput(); + + std::vector rowNumbers{0, -2, 2}; + auto result = BaseVector::create(BIGINT(), 0, pool()); + windowBuild.nextPartition()->extractColumn( + 2, folly::Range(rowNumbers.data(), rowNumbers.size()), 0, result); + + ASSERT_EQ(result->size(), 3); + EXPECT_EQ(result->as>()->valueAt(0), 10); + EXPECT_TRUE(result->isNullAt(1)); + EXPECT_EQ(result->as>()->valueAt(2), 30); +} + +TEST_F(WindowTest, rowsStreamingWindowBuildKRangeFrameNanBounds) { + const auto kNan = std::numeric_limits::quiet_NaN(); + auto data = makeRowVector( + {"s0", "bound"}, + { + makeFlatVector({1.0, 2.0, 3.0, kNan}), + makeFlatVector({kNan, 2.0, kNan, kNan}), + }); + + auto plan = PlanBuilder() + .values({data}) + .orderBy({"s0"}, false) + .streamingWindow( + {"rank() over (order by s0 range between bound preceding " + "and current row)"}) + .planNode(); + auto windowNode = std::dynamic_pointer_cast(plan); + ASSERT_NE(windowNode, nullptr); + + TestingRowsStreamingWindowBuild windowBuild( + windowNode, pool(), nullptr, &nonReclaimableSection_); + windowBuild.setNumRowsPerOutput(10); + windowBuild.addInput(data); + windowBuild.noMoreInput(); + + auto partition = windowBuild.nextPartition(); + partition->removeProcessedRows(2); + + const auto startRow = 2; + const auto numRows = data->size() - startRow; + std::vector peerStarts(numRows); + std::vector peerEnds(numRows); + partition->computePeerBuffers( + startRow, data->size(), 0, 0, peerStarts.data(), peerEnds.data()); + + std::vector frameBounds(numRows); + SelectivityVector validFrames(numRows, true); + partition->computeKRangeFrameBounds( + true, + true, + 1, + startRow, + numRows, + peerStarts.data(), + frameBounds.data(), + validFrames); + + EXPECT_FALSE(validFrames.isValid(0)); + EXPECT_TRUE(validFrames.isValid(1)); +} + +TEST_F(WindowTest, rowsStreamingWindowBuildKRangeFramePeerBounds) { + auto data = makeRowVector( + {"s0", "bound"}, + { + makeFlatVector({1, 1, 2, 3}), + makeFlatVector({1, 1, 2, 3}), + }); + + auto plan = + PlanBuilder() + .values({data}) + .orderBy({"s0"}, false) + .streamingWindow({"rank() over (order by s0 range between bound " + "following and unbounded following)"}) + .planNode(); + auto windowNode = std::dynamic_pointer_cast(plan); + ASSERT_NE(windowNode, nullptr); + + TestingRowsStreamingWindowBuild windowBuild( + windowNode, pool(), nullptr, &nonReclaimableSection_); + windowBuild.setNumRowsPerOutput(10); + windowBuild.addInput(data); + windowBuild.noMoreInput(); + + auto partition = windowBuild.nextPartition(); + const auto numRows = data->size(); + std::vector peerStarts(numRows); + std::vector peerEnds(numRows); + partition->computePeerBuffers( + 0, numRows, 0, 0, peerStarts.data(), peerEnds.data()); + + std::vector frameBounds(numRows); + SelectivityVector validFrames(numRows, true); + partition->computeKRangeFrameBounds( + true, + false, + 1, + 0, + numRows, + peerStarts.data(), + frameBounds.data(), + validFrames); + + EXPECT_EQ(frameBounds[0], 0); + EXPECT_EQ(frameBounds[1], 0); + EXPECT_EQ(frameBounds[2], 2); + EXPECT_EQ(frameBounds[3], 3); +} + +TEST_F(WindowTest, rowsStreamingWindowBuildPeerContinuesAcrossInputBatches) { + auto firstBatch = makeRowVector( + {"p", "s"}, + { + makeFlatVector({1, 1}), + makeFlatVector({10, 10}), + }); + auto secondBatch = makeRowVector( + {"p", "s"}, + { + makeFlatVector({1, 1}), + makeFlatVector({10, 10}), + }); + + auto plan = + PlanBuilder() + .values({firstBatch, secondBatch}) + .orderBy({"p", "s"}, false) + .streamingWindow({"rank() over (partition by p order by s rows " + "unbounded preceding)"}) + .planNode(); + auto windowNode = std::dynamic_pointer_cast(plan); + ASSERT_NE(windowNode, nullptr); + + TestingRowsStreamingWindowBuild windowBuild( + windowNode, pool(), nullptr, &nonReclaimableSection_); + windowBuild.setNumRowsPerOutput(2); + windowBuild.addInput(firstBatch); + windowBuild.addInput(secondBatch); + + auto partition = windowBuild.nextPartition(); + std::vector peerStarts(2); + std::vector peerEnds(2); + auto peerBounds = partition->computePeerBuffers( + 0, 2, 0, 0, peerStarts.data(), peerEnds.data()); + EXPECT_THAT(peerStarts, ::testing::ElementsAre(0, 0)); + EXPECT_THAT(peerEnds, ::testing::ElementsAre(1, 1)); + EXPECT_EQ(peerBounds.first, 0); + EXPECT_EQ(peerBounds.second, 2); + + partition->removeProcessedRows(2); + windowBuild.noMoreInput(); + + peerBounds = partition->computePeerBuffers( + 2, + 4, + peerBounds.first, + peerBounds.second, + peerStarts.data(), + peerEnds.data()); + EXPECT_THAT(peerStarts, ::testing::ElementsAre(0, 0)); + EXPECT_THAT(peerEnds, ::testing::ElementsAre(3, 3)); + EXPECT_EQ(peerBounds.first, 0); + EXPECT_EQ(peerBounds.second, 4); +} + +TEST_F(WindowTest, rowsStreamingWindowBuildKRangeFrameSearchBounds) { + auto testSearchBounds = [&](const RowVectorPtr& data, + const std::string& sortKey, + const std::string& windowFunction) { + auto plan = PlanBuilder() + .values({data}) + .orderBy({sortKey}, false) + .streamingWindow({windowFunction}) + .planNode(); + auto windowNode = std::dynamic_pointer_cast(plan); + ASSERT_NE(windowNode, nullptr); + + TestingRowsStreamingWindowBuild windowBuild( + windowNode, pool(), nullptr, &nonReclaimableSection_); + windowBuild.setNumRowsPerOutput(10); + auto splitData = split(data, 4); + for (const auto& input : splitData) { + windowBuild.addInput(input); + } + windowBuild.noMoreInput(); + + auto partition = windowBuild.nextPartition(); + partition->removeProcessedRows(2); + + const auto startRow = 2; + const auto endRow = data->size(); + const auto numRows = endRow - startRow; + std::vector peerStarts(numRows); + std::vector peerEnds(numRows); + partition->computePeerBuffers( + startRow, endRow, 0, 0, peerStarts.data(), peerEnds.data()); + + std::vector frameBounds(numRows); + SelectivityVector validFrames(numRows, true); + partition->computeKRangeFrameBounds( + true, + true, + 1, + startRow, + numRows, + peerStarts.data(), + frameBounds.data(), + validFrames); + EXPECT_THAT(frameBounds, ::testing::ElementsAre(2, 2, 3, 4, 5, 6)); + + validFrames.resizeFill(numRows, true); + partition->computeKRangeFrameBounds( + false, + false, + 2, + startRow, + numRows, + peerEnds.data(), + frameBounds.data(), + validFrames); + EXPECT_THAT(frameBounds, ::testing::ElementsAre(3, 4, 5, 6, 9, 9)); + }; + + testSearchBounds( + makeRowVector( + {"s0", "preceding_bound", "following_bound"}, + { + makeFlatVector({0, 10, 20, 30, 40, 50, 60, 70}), + makeFlatVector({-15, -5, 5, 15, 25, 35, 45, 55}), + makeFlatVector({15, 25, 35, 45, 55, 65, 75, 85}), + }), + "s0", + "rank() over (order by s0 range between preceding_bound preceding " + "and current row)"); + + testSearchBounds( + makeRowVector( + {"s0", "preceding_bound", "following_bound"}, + { + makeFlatVector({70, 60, 50, 40, 30, 20, 10, 0}), + makeFlatVector({85, 75, 65, 55, 45, 35, 25, 15}), + makeFlatVector({55, 45, 35, 25, 15, 5, -5, -15}), + }), + "s0 DESC", + "rank() over (order by s0 desc range between preceding_bound preceding " + "and current row)"); +} + TEST_F(WindowTest, prePartitionedSortBuild) { const vector_size_t size = 1'000; const int numPartitions = 37; diff --git a/velox/exec/window/CMakeLists.txt b/velox/exec/window/CMakeLists.txt index 24e9d6e6b58..e5dae87aad0 100644 --- a/velox/exec/window/CMakeLists.txt +++ b/velox/exec/window/CMakeLists.txt @@ -17,17 +17,22 @@ velox_add_library( AggregateWindow.cpp PartitionStreamingWindowBuild.cpp RowsStreamingWindowBuild.cpp + SingleRowValues.cpp SortWindowBuild.cpp SubPartitionedSortWindowBuild.cpp + VectorWindowPartition.cpp WindowBuild.cpp HEADERS AggregateWindow.h KRangeFrameBound.h PartitionStreamingWindowBuild.h PeerGroupComputation.h + RowRange.h RowsStreamingWindowBuild.h + SingleRowValues.h SortWindowBuild.h SubPartitionedSortWindowBuild.h + VectorWindowPartition.h WindowBuild.h WindowPartitionAccessor.h ) diff --git a/velox/exec/window/RowRange.h b/velox/exec/window/RowRange.h new file mode 100644 index 00000000000..840e8795e7b --- /dev/null +++ b/velox/exec/window/RowRange.h @@ -0,0 +1,55 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/common/base/Exceptions.h" +#include "velox/vector/ComplexVector.h" + +#include + +namespace facebook::velox::exec::window { + +/// Represents a contiguous row range [startRow, endRow) from an input vector. +struct RowRange { + /// Creates a range over 'input' and checks that [startRow, endRow) is a valid + /// row range within 'input'. + RowRange(RowVectorPtr input, vector_size_t startRow, vector_size_t endRow) + : input(std::move(input)), startRow(startRow), endRow(endRow) { + VELOX_CHECK_NOT_NULL(this->input, "Input vector must not be null"); + VELOX_CHECK_LE( + startRow, endRow, "startRow must be less than or equal to endRow"); + VELOX_CHECK_LE( + endRow, + this->input->size(), + "endRow must be less than or equal to input size"); + } + + /// Input vector that owns the rows in this range. + RowVectorPtr input; + + /// First row in 'input', inclusive. + vector_size_t startRow; + + /// Last row in 'input', exclusive. + vector_size_t endRow; + + /// Number of rows in the range. + vector_size_t size() const { + return endRow - startRow; + } +}; + +} // namespace facebook::velox::exec::window diff --git a/velox/exec/window/RowsStreamingWindowBuild.cpp b/velox/exec/window/RowsStreamingWindowBuild.cpp index e85fd9ac57c..0ba48c67bcc 100644 --- a/velox/exec/window/RowsStreamingWindowBuild.cpp +++ b/velox/exec/window/RowsStreamingWindowBuild.cpp @@ -16,11 +16,14 @@ #include "velox/exec/window/RowsStreamingWindowBuild.h" #include "velox/common/testutil/TestValue.h" -#include "velox/exec/WindowFunction.h" +#include "velox/exec/window/VectorWindowPartition.h" + +#include namespace facebook::velox::exec::window { namespace { + bool hasRangeFrame(const std::shared_ptr& windowNode) { for (const auto& function : windowNode->windowFunctions()) { if (function.frame.type == core::WindowNode::WindowType::kRange) { @@ -29,6 +32,45 @@ bool hasRangeFrame(const std::shared_ptr& windowNode) { } return false; } + +void appendUnique( + std::vector& channels, + column_index_t channel) { + if (std::find(channels.begin(), channels.end(), channel) == channels.end()) { + channels.push_back(channel); + } +} + +// Returns the deduplicated input channels referenced by 'keyInfo', in +// first-seen order. +std::vector keyChannels( + const std::vector>& keyInfo, + const std::vector& inputChannels) { + std::vector channels; + channels.reserve(keyInfo.size()); + for (const auto& key : keyInfo) { + appendUnique(channels, inputChannels[key.first]); + } + return channels; +} + +// Returns the deduplicated input channels referenced by the partition and sort +// keys, in first-seen order. +std::vector keyChannels( + const std::vector>& + partitionKeyInfo, + const std::vector>& sortKeyInfo, + const std::vector& inputChannels) { + std::vector channels; + channels.reserve(partitionKeyInfo.size() + sortKeyInfo.size()); + for (const auto& key : partitionKeyInfo) { + appendUnique(channels, inputChannels[key.first]); + } + for (const auto& key : sortKeyInfo) { + appendUnique(channels, inputChannels[key.first]); + } + return channels; +} } // namespace RowsStreamingWindowBuild::RowsStreamingWindowBuild( @@ -37,9 +79,13 @@ RowsStreamingWindowBuild::RowsStreamingWindowBuild( const common::SpillConfig* spillConfig, tsan_atomic* nonReclaimableSection) : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection), - hasRangeFrame_(hasRangeFrame(windowNode)) { - initializeRowContainer(pool); - initializeDecodedInputVectors(); + hasRangeFrame_(hasRangeFrame(windowNode)), + partitionKeyValues_(keyChannels(partitionKeyInfo_, inputChannels_), pool), + peerKeyValues_(keyChannels(sortKeyInfo_, inputChannels_), pool), + boundaryKeyChannels_( + keyChannels(partitionKeyInfo_, sortKeyInfo_, inputChannels_)), + pool_(pool) { + VELOX_CHECK_NOT_NULL(pool_); velox::common::testutil::TestValue::adjust( "facebook::velox::exec::window::RowsStreamingWindowBuild::RowsStreamingWindowBuild", this); @@ -53,62 +99,75 @@ bool RowsStreamingWindowBuild::needsInput() { void RowsStreamingWindowBuild::ensureInputPartition() { if (windowPartitions_.empty() || windowPartitions_.back()->complete()) { windowPartitions_.emplace_back( - std::make_shared( - data_.get(), inversedInputChannels_, sortKeyInfo_)); + std::make_shared( + inputChannels_, inversedInputChannels_, sortKeyInfo_, pool_)); } } void RowsStreamingWindowBuild::addPartitionInputs(bool finished) { - if (inputRows_.empty()) { + if (currentRanges_.empty()) { + if (finished && !windowPartitions_.empty() && + !windowPartitions_.back()->complete()) { + windowPartitions_.back()->setComplete(); + } return; } ensureInputPartition(); - windowPartitions_.back()->addRows(inputRows_); + auto partition = + std::static_pointer_cast(windowPartitions_.back()); + for (const auto& range : currentRanges_) { + partition->addRows(range.input, range.startRow, range.endRow); + } if (finished) { windowPartitions_.back()->setComplete(); } - inputRows_.clear(); - inputRows_.shrink_to_fit(); + currentRanges_.clear(); + pendingRowCount_ = 0; } void RowsStreamingWindowBuild::addInput(RowVectorPtr input) { - for (auto i = 0; i < inputChannels_.size(); ++i) { - decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i])); - } + loadBoundaryColumns(input); + vector_size_t rangeStart = 0; for (auto row = 0; row < input->size(); ++row) { - char* newRow = data_->newRow(); - - for (auto col = 0; col < input->childrenSize(); ++col) { - data_->store(decodedInputVectors_[col], row, newRow, col); - } - - if (previousRow_ != nullptr && - compareRowsWithKeys(previousRow_, newRow, partitionKeyInfo_)) { + const bool hasPreviousRow = row > 0 || partitionKeyValues_.hasValue(); + if (isNewPartition(input, row)) { + flushRange(input, rangeStart, row); addPartitionInputs(true); + rangeStart = row; } - - if (previousRow_ != nullptr && inputRows_.size() >= numRowsPerOutput_) { + if (hasPreviousRow && pendingRowCount_ >= numRowsPerOutput_) { // Needs to wait the peer group ready for range frame. if (hasRangeFrame_) { - if (compareRowsWithKeys(previousRow_, newRow, sortKeyInfo_)) { + if (isNewPeerGroup(input, row)) { + flushRange(input, rangeStart, row); addPartitionInputs(false); + rangeStart = row; } } else { + flushRange(input, rangeStart, row); addPartitionInputs(false); + rangeStart = row; } } - inputRows_.push_back(newRow); - previousRow_ = newRow; + ++pendingRowCount_; + } + + flushRange(input, rangeStart, input->size()); + if (input->size() > 0) { + partitionKeyValues_.capture(input, input->size() - 1); + peerKeyValues_.capture(input, input->size() - 1); } } void RowsStreamingWindowBuild::noMoreInput() { addPartitionInputs(true); + partitionKeyValues_.reset(); + peerKeyValues_.reset(); } std::shared_ptr RowsStreamingWindowBuild::nextPartition() { @@ -139,4 +198,58 @@ bool RowsStreamingWindowBuild::hasNextPartition() { return false; } +void RowsStreamingWindowBuild::flushRange( + const RowVectorPtr& input, + vector_size_t start, + vector_size_t end) { + if (start >= end) { + return; + } + currentRanges_.emplace_back(input, start, end); +} + +bool RowsStreamingWindowBuild::isNewPartition( + const RowVectorPtr& input, + vector_size_t row) const { + if (row == 0 && !partitionKeyValues_.hasValue()) { + return false; + } + return !compareRowsEqual(input, row, partitionKeyInfo_, partitionKeyValues_); +} + +bool RowsStreamingWindowBuild::isNewPeerGroup( + const RowVectorPtr& input, + vector_size_t row) const { + if (row == 0 && !peerKeyValues_.hasValue()) { + return false; + } + return !compareRowsEqual(input, row, sortKeyInfo_, peerKeyValues_); +} + +bool RowsStreamingWindowBuild::compareRowsEqual( + const RowVectorPtr& input, + vector_size_t row, + const std::vector>& keyInfo, + const SingleRowValues& previousValues) const { + if (row == 0) { + return previousValues.equals(input, row); + } + + for (const auto& key : keyInfo) { + const auto inputColumn = inputChannels_[key.first]; + if (!input->childAt(inputColumn) + ->equalValueAt(input->childAt(inputColumn).get(), row - 1, row)) { + return false; + } + } + return true; +} + +void RowsStreamingWindowBuild::loadBoundaryColumns( + const RowVectorPtr& input) const { + for (const auto channel : boundaryKeyChannels_) { + input->childAt(channel)->loadedVector(); + } +} + } // namespace facebook::velox::exec::window diff --git a/velox/exec/window/RowsStreamingWindowBuild.h b/velox/exec/window/RowsStreamingWindowBuild.h index efd04a1df67..18481bd036f 100644 --- a/velox/exec/window/RowsStreamingWindowBuild.h +++ b/velox/exec/window/RowsStreamingWindowBuild.h @@ -16,13 +16,19 @@ #pragma once +#include "velox/exec/window/RowRange.h" +#include "velox/exec/window/SingleRowValues.h" #include "velox/exec/window/WindowBuild.h" +#include +#include +#include + namespace facebook::velox::exec::window { /// Unlike PartitionStreamingWindowBuild, RowsStreamingWindowBuild is capable of /// processing window functions as rows arrive within a single partition, -/// without the need to wait for the entirewindow partition to be ready. This +/// without the need to wait for the entire window partition to be ready. This /// approach can significantly reduce memory usage, especially when a single /// partition contains a large amount of data. It is particularly suited for /// optimizing rank, dense_rank and row_number functions, as well as aggregate @@ -54,27 +60,63 @@ class RowsStreamingWindowBuild : public WindowBuild { bool needsInput() override; private: + // Flushes rows in [start, end) from 'input' as a vector row range. + void + flushRange(const RowVectorPtr& input, vector_size_t start, vector_size_t end); + // Adds input rows to the current partition, or creates a new partition if it // does not exist. void addPartitionInputs(bool finished); - // Invoked before add input to ensure there is an open (in-complete) partition + // Invoked before add input to ensure there is an open (incomplete) partition // to accept new input. The function creates a new one at the tail of // 'windowPartitions_' if it is empty or the last partition is already // completed. void ensureInputPartition(); + // Returns true if 'row' starts a new partition relative to the previous row. + bool isNewPartition(const RowVectorPtr& input, vector_size_t row) const; + + // Returns true if 'row' starts a new peer group relative to the previous row. + bool isNewPeerGroup(const RowVectorPtr& input, vector_size_t row) const; + + // Compares 'row' with the previous row using the specified key columns. + // 'previousValues' holds the captured values of the previous input vector's + // last row over the same keys; it is used only when 'row' is 0. + bool compareRowsEqual( + const RowVectorPtr& input, + vector_size_t row, + const std::vector>& keyInfo, + const SingleRowValues& previousValues) const; + + // Loads only key columns needed to detect partition and peer boundaries. + void loadBoundaryColumns(const RowVectorPtr& input) const; + // Sets to true if this window node has range frames. const bool hasRangeFrame_; - // Points to the input rows in the current partition. - std::vector inputRows_; + // Ranges of input rows buffered for the current partition. + std::vector currentRanges_; + + // Partition-key values from the last row of the previous input vector, used + // to detect partition boundaries across vectors. + SingleRowValues partitionKeyValues_; + + // Sort-key values from the last row of the previous input vector, used to + // detect peer-group boundaries across vectors. + SingleRowValues peerKeyValues_; + + // Original input channels used to detect partition and peer boundaries. + std::vector boundaryKeyChannels_; + + // Pool used to create window partitions. + memory::MemoryPool* const pool_; - // Used to compare rows based on partitionKeys. - char* previousRow_ = nullptr; + // Number of rows accumulated since the last partial flush. + vector_size_t pendingRowCount_{0}; - /// The output gets next partition from the head of 'windowPartitions_' and - /// input adds to the next partition from the tail of 'windowPartitions_'. + // The output gets next partition from the head of 'windowPartitions_' and + // input adds to the next partition from the tail of 'windowPartitions_'. std::deque> windowPartitions_; }; diff --git a/velox/exec/window/SingleRowValues.cpp b/velox/exec/window/SingleRowValues.cpp new file mode 100644 index 00000000000..fad80199d18 --- /dev/null +++ b/velox/exec/window/SingleRowValues.cpp @@ -0,0 +1,59 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/exec/window/SingleRowValues.h" + +#include "velox/common/base/Exceptions.h" + +namespace facebook::velox::exec::window { + +SingleRowValues::SingleRowValues( + std::vector channels, + memory::MemoryPool* pool) + : channels_(std::move(channels)), pool_(pool) { + VELOX_CHECK_NOT_NULL(pool_); +} + +void SingleRowValues::capture(const RowVectorPtr& input, vector_size_t row) { + VELOX_CHECK_NOT_NULL(input); + + values_.clear(); + values_.reserve(channels_.size()); + for (const auto channel : channels_) { + auto value = BaseVector::create(input->childAt(channel)->type(), 1, pool_); + value->copy(input->childAt(channel).get(), 0, row, 1); + values_.push_back(std::move(value)); + } + hasValue_ = true; +} + +void SingleRowValues::reset() { + hasValue_ = false; + values_.clear(); +} + +bool SingleRowValues::equals(const RowVectorPtr& input, vector_size_t row) + const { + VELOX_CHECK(hasValue_); + for (auto i = 0; i < channels_.size(); ++i) { + const auto channel = channels_[i]; + if (!values_[i]->equalValueAt(input->childAt(channel).get(), 0, row)) { + return false; + } + } + return true; +} + +} // namespace facebook::velox::exec::window diff --git a/velox/exec/window/SingleRowValues.h b/velox/exec/window/SingleRowValues.h new file mode 100644 index 00000000000..1635afc3512 --- /dev/null +++ b/velox/exec/window/SingleRowValues.h @@ -0,0 +1,66 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/core/PlanNode.h" +#include "velox/vector/BaseVector.h" + +#include + +namespace facebook::velox::exec::window { + +/// Stores copies of selected column values from a single row for later +/// comparison. Behaves like std::optional: it is empty after construction, +/// capture() populates it with a row, reset() clears it, and hasValue() +/// reports whether it currently holds a row. +class SingleRowValues { + public: + /// 'channels' are the input columns to copy and compare. 'pool' allocates the + /// copied values and must outlive this object. + SingleRowValues( + std::vector channels, + memory::MemoryPool* pool); + + /// Copies the configured columns from 'input' row 'row'. + void capture(const RowVectorPtr& input, vector_size_t row); + + /// Clears the stored values. + void reset(); + + /// Returns true if a row has been captured. + bool hasValue() const { + return hasValue_; + } + + /// Returns true if the stored values equal 'input' row 'row' over the + /// configured columns. + bool equals(const RowVectorPtr& input, vector_size_t row) const; + + private: + // Input columns copied and compared. + const std::vector channels_; + + // Pool used to allocate copied values. The owner must outlive this object. + memory::MemoryPool* const pool_; + + // True if values_ holds a captured row. + bool hasValue_{false}; + + // Copied one-row values, one per entry in channels_. + std::vector values_; +}; + +} // namespace facebook::velox::exec::window diff --git a/velox/exec/window/VectorWindowPartition.cpp b/velox/exec/window/VectorWindowPartition.cpp new file mode 100644 index 00000000000..114e91c2830 --- /dev/null +++ b/velox/exec/window/VectorWindowPartition.cpp @@ -0,0 +1,388 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/exec/window/VectorWindowPartition.h" +#include "velox/exec/window/KRangeFrameBound.h" +#include "velox/exec/window/PeerGroupComputation.h" +#include "velox/vector/SimpleVector.h" + +#include +#include +#include + +namespace facebook::velox::exec::window { + +namespace { + +// Points to a row in a retained input vector. +struct RowReference { + // Input vector that owns the referenced row. + RowVectorPtr input; + + // Row number in 'input'. + vector_size_t row; +}; + +void appendUnique( + std::vector& channels, + column_index_t channel) { + if (std::find(channels.begin(), channels.end(), channel) == channels.end()) { + channels.push_back(channel); + } +} + +// Returns the deduplicated input channels referenced by the sort keys, in +// first-seen order. +std::vector keyChannels( + const std::vector>& keyInfo, + const std::vector& inputChannels) { + std::vector channels; + channels.reserve(keyInfo.size()); + for (const auto& key : keyInfo) { + appendUnique(channels, inputChannels[key.first]); + } + return channels; +} + +} // namespace + +VectorWindowPartition::VectorWindowPartition( + const std::vector& inputChannels, + const std::vector& inputMapping, + const std::vector>& sortKeyInfo, + memory::MemoryPool* pool) + : WindowPartition(inputMapping, sortKeyInfo, true), + previousRow_(keyChannels(sortKeyInfo, inputChannels), pool), + inputChannels_(inputChannels) {} + +void VectorWindowPartition::addRows(const std::vector& /*rows*/) { + VELOX_FAIL("VectorWindowPartition does not support RowContainer rows"); +} + +void VectorWindowPartition::addRows( + const RowVectorPtr& input, + vector_size_t startRow, + vector_size_t endRow) { + RowRange range{input, startRow, endRow}; + if (range.size() == 0) { + return; + } + + ranges_.push_back(range); + rangePrefixSums_.push_back(rangePrefixSums_.back() + range.size()); + totalRows_ += range.size(); +} + +void VectorWindowPartition::removeProcessedRows(vector_size_t numRows) { + VELOX_CHECK_LE(numRows, totalRows_); + if (numRows == 0) { + return; + } + + if (complete() && numRows == totalRows_) { + previousRow_.reset(); + } else { + const auto [rangeIndex, localRow] = findRange(numRows - 1); + previousRow_.capture(ranges_[rangeIndex].input, localRow); + } + + auto remaining = numRows; + while (remaining > 0) { + auto& range = ranges_.front(); + const auto rangeSize = range.size(); + if (remaining >= rangeSize) { + ranges_.pop_front(); + remaining -= rangeSize; + } else { + range.startRow += remaining; + remaining = 0; + } + } + + startRow_ += numRows; + rebuildPrefixSums(); +} + +void VectorWindowPartition::extractColumn( + int32_t columnIndex, + vector_size_t partitionOffset, + vector_size_t numRows, + vector_size_t resultOffset, + const VectorPtr& result) const { + VELOX_CHECK_GE(partitionOffset, startRow_); + if (numRows == 0) { + return; + } + + result->resize(resultOffset + numRows); + + auto [rangeIndex, localRow] = findRange(partitionOffset - startRow_); + auto remaining = numRows; + auto outputOffset = resultOffset; + while (remaining > 0) { + const auto& range = ranges_[rangeIndex]; + const auto numRowsToCopy = std::min(range.endRow - localRow, remaining); + result->copy( + range.input->childAt(columnIndex).get(), + outputOffset, + localRow, + numRowsToCopy); + + outputOffset += numRowsToCopy; + remaining -= numRowsToCopy; + if (remaining > 0) { + ++rangeIndex; + localRow = ranges_[rangeIndex].startRow; + } + } +} + +void VectorWindowPartition::extractColumn( + int32_t columnIndex, + folly::Range rowNumbers, + vector_size_t resultOffset, + const VectorPtr& result) const { + if (rowNumbers.empty()) { + return; + } + + result->resize(resultOffset + rowNumbers.size()); + + for (auto i = 0; i < rowNumbers.size(); ++i) { + const auto rowNumber = rowNumbers[i]; + if (rowNumber < 0) { + result->setNull(resultOffset + i, true); + continue; + } + + VELOX_CHECK_GE(rowNumber, startRow_); + const auto [rangeIndex, localRow] = findRange(rowNumber - startRow_); + result->copy( + ranges_[rangeIndex].input->childAt(columnIndex).get(), + resultOffset + i, + localRow, + 1); + } +} + +void VectorWindowPartition::extractNulls( + int32_t columnIndex, + vector_size_t partitionOffset, + vector_size_t numRows, + const BufferPtr& nullsBuffer) const { + VELOX_CHECK_GE(partitionOffset, startRow_); + if (numRows == 0) { + return; + } + + auto* rawNulls = nullsBuffer->asMutable(); + bits::fillBits(rawNulls, 0, numRows, false); + + auto [rangeIndex, localRow] = findRange(partitionOffset - startRow_); + vector_size_t processedRows = 0; + while (processedRows < numRows) { + const auto& range = ranges_[rangeIndex]; + const auto input = range.input->childAt(columnIndex); + const auto numRowsToProcess = + std::min(range.endRow - localRow, numRows - processedRows); + + for (auto i = 0; i < numRowsToProcess; ++i) { + if (input->isNullAt(localRow + i)) { + bits::setBit(rawNulls, processedRows + i, true); + } + } + + processedRows += numRowsToProcess; + if (processedRows < numRows) { + ++rangeIndex; + localRow = ranges_[rangeIndex].startRow; + } + } +} + +class VectorWindowPartition::VectorAccessor { + public: + explicit VectorAccessor(const VectorWindowPartition& partition) + : partition_(partition) {} + + vector_size_t startRow() const { + return partition_.startRow_; + } + + vector_size_t partitionEnd() const { + return partition_.startRow_ + partition_.totalRows_; + } + + bool hasPreviousRow() const { + return partition_.previousRow_.hasValue(); + } + + bool previousRowEquals(vector_size_t row) const { + const auto rowRef = rowAt(row); + return partition_.previousRow_.equals(rowRef.input, rowRef.row); + } + + bool rowsEqual(vector_size_t lhs, vector_size_t rhs) const { + return rowsEqual(rowAt(lhs), rowAt(rhs), partition_.sortKeyInfo()); + } + + std::optional compareFrameValue( + vector_size_t orderByRow, + vector_size_t frameValueRow, + column_index_t frameColumn, + const CompareFlags& flags) const { + const auto orderByRef = rowAt(orderByRow); + const auto frameValueRef = rowAt(frameValueRow); + return orderByRef.input->childAt(orderByColumn()) + ->compare( + frameValueRef.input->childAt(frameColumn).get(), + orderByRef.row, + frameValueRef.row, + flags); + } + + bool frameValueIsNull(vector_size_t row, column_index_t frameColumn) const { + const auto rowRef = rowAt(row); + return rowRef.input->childAt(frameColumn)->isNullAt(rowRef.row); + } + + bool orderByValueIsNull(vector_size_t row) const { + const auto rowRef = rowAt(row); + return rowRef.input->childAt(orderByColumn())->isNullAt(rowRef.row); + } + + template + bool frameValueIsNan(vector_size_t row, column_index_t frameColumn) const { + const auto rowRef = rowAt(row); + return partition_.isNanAt( + rowRef.input->childAt(frameColumn), rowRef.row); + } + + template + bool orderByValueIsNan(vector_size_t row) const { + const auto rowRef = rowAt(row); + return partition_.isNanAt( + rowRef.input->childAt(orderByColumn()), rowRef.row); + } + + private: + column_index_t orderByColumn() const { + return partition_.inputChannels_[partition_.sortKeyInfo()[0].first]; + } + + // Returns a reference to the absolute partition row. + RowReference rowAt(vector_size_t row) const { + VELOX_CHECK_GE(row, partition_.startRow_); + const auto [rangeIndex, localRow] = + partition_.findRange(row - partition_.startRow_); + return {partition_.ranges_[rangeIndex].input, localRow}; + } + + // Returns true if two retained rows are equal over the specified keys. + bool rowsEqual( + const RowReference& lhs, + const RowReference& rhs, + const std::vector>& keyInfo) + const { + for (const auto& key : keyInfo) { + const auto inputColumn = partition_.inputChannels_[key.first]; + if (!lhs.input->childAt(inputColumn) + ->equalValueAt( + rhs.input->childAt(inputColumn).get(), lhs.row, rhs.row)) { + return false; + } + } + return true; + } + + const VectorWindowPartition& partition_; +}; + +std::pair +VectorWindowPartition::computePeerBuffers( + vector_size_t start, + vector_size_t end, + vector_size_t prevPeerStart, + vector_size_t prevPeerEnd, + vector_size_t* rawPeerStarts, + vector_size_t* rawPeerEnds) { + VectorAccessor rows{*this}; + auto result = PeerGroupComputation::compute( + rows, start, end, prevPeerStart, prevPeerEnd, rawPeerStarts, rawPeerEnds); + if (result.previousRowConsumed) { + previousRow_.reset(); + } + return {result.peerStart, result.peerEnd}; +} + +template +bool VectorWindowPartition::isNanAt(const VectorPtr& vector, vector_size_t row) + const { + return !vector->isNullAt(row) && + std::isnan(vector->loadedVector()->as>()->valueAt(row)); +} + +void VectorWindowPartition::computeKRangeFrameBounds( + bool isStartBound, + bool isPreceding, + column_index_t frameColumn, + vector_size_t startRow, + vector_size_t numRows, + const vector_size_t* rawPeerBounds, + vector_size_t* rawFrameBounds, + SelectivityVector& validFrames) const { + if (numRows == 0) { + return; + } + + const auto sortOrder = sortKeyInfo()[0].second; + const auto frameType = ranges_.front().input->childAt(frameColumn)->type(); + + VectorAccessor rows{*this}; + KRangeFrameBound::compute( + rows, + isStartBound, + isPreceding, + frameColumn, + sortOrder, + frameType, + startRow, + numRows, + rawPeerBounds, + rawFrameBounds, + validFrames); +} + +std::pair VectorWindowPartition::findRange( + vector_size_t row) const { + VELOX_CHECK_LT(row, totalRows_); + + const auto it = + std::upper_bound(rangePrefixSums_.begin(), rangePrefixSums_.end(), row); + const size_t rangeIndex = std::distance(rangePrefixSums_.begin(), it) - 1; + const auto offsetInRange = row - rangePrefixSums_[rangeIndex]; + return {rangeIndex, ranges_[rangeIndex].startRow + offsetInRange}; +} + +void VectorWindowPartition::rebuildPrefixSums() { + rangePrefixSums_.clear(); + rangePrefixSums_.push_back(0); + for (const auto& range : ranges_) { + rangePrefixSums_.push_back(rangePrefixSums_.back() + range.size()); + } + totalRows_ = rangePrefixSums_.back(); +} + +} // namespace facebook::velox::exec::window diff --git a/velox/exec/window/VectorWindowPartition.h b/velox/exec/window/VectorWindowPartition.h new file mode 100644 index 00000000000..6a567071dd5 --- /dev/null +++ b/velox/exec/window/VectorWindowPartition.h @@ -0,0 +1,138 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/exec/WindowPartition.h" +#include "velox/exec/window/RowRange.h" +#include "velox/exec/window/SingleRowValues.h" + +#include +#include +#include + +namespace facebook::velox::exec::window { + +/// Provides WindowPartition accessors over retained input vectors. +class VectorWindowPartition : public WindowPartition { + public: + /// Constructs a partial window partition over retained input vector ranges. + VectorWindowPartition( + const std::vector& inputChannels, + const std::vector& inputMapping, + const std::vector>& + sortKeyInfo, + memory::MemoryPool* pool); + + /// Returns the number of retained rows in this partition. + vector_size_t numRows() const override { + return totalRows_; + } + + /// Returns the number of retained rows available for processing. + vector_size_t numRowsForProcessing( + vector_size_t /*partitionOffset*/) const override { + return totalRows_; + } + + /// Rejects RowContainer rows because this partition stores vector ranges. + void addRows(const std::vector& rows) override; + + /// Adds a retained input vector row range to this partition. + void addRows( + const RowVectorPtr& input, + vector_size_t startRow, + vector_size_t endRow); + + /// Removes processed rows from the front of this partition. + void removeProcessedRows(vector_size_t numRows) override; + + /// Extracts a contiguous row range from retained input vectors. + void extractColumn( + int32_t columnIndex, + vector_size_t partitionOffset, + vector_size_t numRows, + vector_size_t resultOffset, + const VectorPtr& result) const override; + + /// Extracts arbitrary row positions from retained input vectors. + void extractColumn( + int32_t columnIndex, + folly::Range rowNumbers, + vector_size_t resultOffset, + const VectorPtr& result) const override; + + /// Extracts null positions from a retained input vector column. + void extractNulls( + int32_t columnIndex, + vector_size_t partitionOffset, + vector_size_t numRows, + const BufferPtr& nullsBuffer) const override; + + using WindowPartition::extractNulls; + + /// Computes peer group bounds for retained input vector rows. + std::pair computePeerBuffers( + vector_size_t start, + vector_size_t end, + vector_size_t prevPeerStart, + vector_size_t prevPeerEnd, + vector_size_t* rawPeerStarts, + vector_size_t* rawPeerEnds) override; + + /// Computes k range frame bounds for retained input vector rows. + void computeKRangeFrameBounds( + bool isStartBound, + bool isPreceding, + column_index_t frameColumn, + vector_size_t startRow, + vector_size_t numRows, + const vector_size_t* rawPeerStarts, + vector_size_t* rawFrameBounds, + SelectivityVector& validFrames) const override; + + private: + class VectorAccessor; + + // Returns true if the vector value at 'row' is NaN. + template + bool isNanAt(const VectorPtr& vector, vector_size_t row) const; + + // Finds the retained range and local row for a partition-relative row. + std::pair findRange(vector_size_t row) const; + + // Rebuilds range prefix sums after processed rows are removed. + void rebuildPrefixSums(); + + // Retained input vector row ranges. + std::deque ranges_; + + // Prefix sums of retained row counts by range. + std::vector rangePrefixSums_{0}; + + // Number of retained rows in this partition. + vector_size_t totalRows_{0}; + + // Absolute partition offset of the first retained row. + vector_size_t startRow_{0}; + + // Last row from the previously processed range, if needed for peer grouping. + SingleRowValues previousRow_; + + // Maps window input columns to retained input vector columns. + const std::vector inputChannels_; +}; + +} // namespace facebook::velox::exec::window