diff --git a/velox/exec/RowsStreamingWindowBuild.cpp b/velox/exec/RowsStreamingWindowBuild.cpp index 81d4a4f8d00a..8c72e0babf93 100644 --- a/velox/exec/RowsStreamingWindowBuild.cpp +++ b/velox/exec/RowsStreamingWindowBuild.cpp @@ -16,15 +16,28 @@ #include "velox/exec/RowsStreamingWindowBuild.h" #include "velox/common/testutil/TestValue.h" +#include "velox/exec/WindowFunction.h" namespace facebook::velox::exec { +namespace { +bool hasRangeFrame(const std::shared_ptr& windowNode) { + for (const auto& function : windowNode->windowFunctions()) { + if (function.frame.type == core::WindowNode::WindowType::kRange) { + return true; + } + } + return false; +} +} // namespace + RowsStreamingWindowBuild::RowsStreamingWindowBuild( const std::shared_ptr& windowNode, velox::memory::MemoryPool* pool, const common::SpillConfig* spillConfig, tsan_atomic* nonReclaimableSection) - : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) { + : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection), + hasRangeFrame_(hasRangeFrame(windowNode)) { velox::common::testutil::TestValue::adjust( "facebook::velox::exec::RowsStreamingWindowBuild::RowsStreamingWindowBuild", this); @@ -68,7 +81,14 @@ void RowsStreamingWindowBuild::addInput(RowVectorPtr input) { } if (previousRow_ != nullptr && inputRows_.size() >= numRowsPerOutput_) { - addPartitionInputs(false); + // For a range frame, wait until the peer group is ready. + if (hasRangeFrame_) { + if (compareRowsWithKeys(previousRow_, newRow, sortKeyInfo_)) { + addPartitionInputs(false); + } + } else { + addPartitionInputs(false); + } } inputRows_.push_back(newRow); diff --git a/velox/exec/RowsStreamingWindowBuild.h b/velox/exec/RowsStreamingWindowBuild.h index c003f1e6a3f9..ceb8ada9b6ac 100644 --- a/velox/exec/RowsStreamingWindowBuild.h +++ b/velox/exec/RowsStreamingWindowBuild.h @@ -63,6 +63,9 @@ class RowsStreamingWindowBuild : public WindowBuild { // does not exist. 
void addPartitionInputs(bool finished); + // True if this window node has at least one range frame. + const bool hasRangeFrame_; + // Points to the input rows in the current partition. std::vector inputRows_; diff --git a/velox/exec/tests/WindowTest.cpp b/velox/exec/tests/WindowTest.cpp index 2bf71f57d36d..21df032f58a4 100644 --- a/velox/exec/tests/WindowTest.cpp +++ b/velox/exec/tests/WindowTest.cpp @@ -141,6 +141,40 @@ TEST_F(WindowTest, rowBasedStreamingWindowOOM) { testWindowBuild(false); } +DEBUG_ONLY_TEST_F(WindowTest, aggWindowResultMismatch) { + auto data = makeRowVector( + {"id", "order_num"}, + {makeFlatVector(4500, [](auto row) { return row; }), + makeConstant(1, 4500)}); + + createDuckDbTable({data}); + + const std::vector kClauses = { + "sum(order_num) over (order by order_num DESC)"}; + + auto plan = PlanBuilder() + .values({data}) + .orderBy({"order_num"}, false) + .streamingWindow(kClauses) + .planNode(); + + std::atomic_bool isStreamCreated{false}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::RowsStreamingWindowBuild::RowsStreamingWindowBuild", + std::function( + [&](RowsStreamingWindowBuild* windowBuild) { + isStreamCreated.store(true); + })); + + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .config(core::QueryConfig::kPreferredOutputBatchRows, "2") + .config(core::QueryConfig::kMaxOutputBatchRows, "2") + .assertResults( + "SELECT *, sum(order_num) over (order by order_num DESC) FROM tmp"); + ASSERT_TRUE(isStreamCreated.load()); +} + DEBUG_ONLY_TEST_F(WindowTest, rankRowStreamingWindowBuild) { auto data = makeRowVector( {"c1"},