Skip to content

Commit

Permalink
Fix the result mismatch in RowsStreamingWindowBuild (facebookincubato…
Browse files Browse the repository at this point in the history
…r#10979)

Summary:
For a range frame, all rows of the current peer group must be available before the window function computation starts.

Pull Request resolved: facebookincubator#10979

Reviewed By: kagamiori

Differential Revision: D62622816

Pulled By: xiaoxmeng

fbshipit-source-id: 1a9911da416c867c9e295242a05d0f33fbc2e22d
  • Loading branch information
JkSelf authored and facebook-github-bot committed Sep 13, 2024
1 parent 17eb5a1 commit e13c591
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 2 deletions.
24 changes: 22 additions & 2 deletions velox/exec/RowsStreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,28 @@

#include "velox/exec/RowsStreamingWindowBuild.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/exec/WindowFunction.h"

namespace facebook::velox::exec {

namespace {
// Returns true if at least one window function of 'windowNode' declares a
// frame of type core::WindowNode::WindowType::kRange; returns false otherwise,
// including when the node has no window functions.
bool hasRangeFrame(const std::shared_ptr<const core::WindowNode>& windowNode) {
  const auto rangeType = core::WindowNode::WindowType::kRange;
  for (const auto& windowFunction : windowNode->windowFunctions()) {
    if (windowFunction.frame.type != rangeType) {
      continue;
    }
    // The first range frame found is enough to answer the question.
    return true;
  }
  return false;
}
} // namespace

RowsStreamingWindowBuild::RowsStreamingWindowBuild(
const std::shared_ptr<const core::WindowNode>& windowNode,
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection)
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection),
hasRangeFrame_(hasRangeFrame(windowNode)) {
velox::common::testutil::TestValue::adjust(
"facebook::velox::exec::RowsStreamingWindowBuild::RowsStreamingWindowBuild",
this);
Expand Down Expand Up @@ -68,7 +81,14 @@ void RowsStreamingWindowBuild::addInput(RowVectorPtr input) {
}

if (previousRow_ != nullptr && inputRows_.size() >= numRowsPerOutput_) {
addPartitionInputs(false);
// For a range frame, wait until the current peer group is complete.
if (hasRangeFrame_) {
if (compareRowsWithKeys(previousRow_, newRow, sortKeyInfo_)) {
addPartitionInputs(false);
}
} else {
addPartitionInputs(false);
}
}

inputRows_.push_back(newRow);
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/RowsStreamingWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ class RowsStreamingWindowBuild : public WindowBuild {
// does not exist.
void addPartitionInputs(bool finished);

// True if this window node has at least one range frame.
const bool hasRangeFrame_;

// Points to the input rows in the current partition.
std::vector<char*> inputRows_;

Expand Down
34 changes: 34 additions & 0 deletions velox/exec/tests/WindowTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,40 @@ TEST_F(WindowTest, rowBasedStreamingWindowOOM) {
testWindowBuild(false);
}

// Checks that a streaming window aggregate produces the same result as DuckDB
// when every input row carries the same sort key value and the output batch
// limits are configured to be very small. Also verifies, through a TestValue
// hook, that a RowsStreamingWindowBuild was constructed while the query ran.
DEBUG_ONLY_TEST_F(WindowTest, aggWindowResultMismatch) {
  // 4500 rows: 'id' is the row number, 'order_num' is the constant 1.
  auto input = makeRowVector(
      {"id", "order_num"},
      {makeFlatVector<int64_t>(4500, [](auto row) { return row; }),
       makeConstant(1, 4500)});

  const std::vector<std::string> windowClauses = {
      "sum(order_num) over (order by order_num DESC)"};

  auto plan = PlanBuilder()
                  .values({input})
                  .orderBy({"order_num"}, false)
                  .streamingWindow(windowClauses)
                  .planNode();

  // Reference results come from DuckDB over the same data.
  createDuckDbTable({input});

  // Flipped by the hook below once the streaming build's constructor runs.
  std::atomic_bool streamingBuildCreated{false};
  SCOPED_TESTVALUE_SET(
      "facebook::velox::exec::RowsStreamingWindowBuild::RowsStreamingWindowBuild",
      std::function<void(RowsStreamingWindowBuild*)>(
          [&](RowsStreamingWindowBuild* /*unused*/) {
            streamingBuildCreated.store(true);
          }));

  // NOTE(review): the tiny batch limits presumably make the operator try to
  // emit output before all peer rows have arrived - confirm against the
  // operator implementation.
  AssertQueryBuilder(plan, duckDbQueryRunner_)
      .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024")
      .config(core::QueryConfig::kPreferredOutputBatchRows, "2")
      .config(core::QueryConfig::kMaxOutputBatchRows, "2")
      .assertResults(
          "SELECT *, sum(order_num) over (order by order_num DESC) FROM tmp");

  ASSERT_TRUE(streamingBuildCreated.load());
}

DEBUG_ONLY_TEST_F(WindowTest, rankRowStreamingWindowBuild) {
auto data = makeRowVector(
{"c1"},
Expand Down

0 comments on commit e13c591

Please sign in to comment.