Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions velox/exec/WindowPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@
const std::vector<std::pair<column_index_t, core::SortOrder>>& sortKeyInfo)
: WindowPartition(data, {}, inputMapping, sortKeyInfo, true, false) {}

// Constructs a partition that is not backed by a RowContainer: 'data_' is null
// and 'partition_' is left default-constructed (an explicit 'partition_()'
// initializer would be redundant). 'complete_' starts out as the negation of
// 'partial'.
//
// @param inputMapping Stored as-is in 'inputMapping_'.
// @param sortKeyInfo Stored as-is in 'sortKeyInfo_'.
// @param partial Stored in 'partial_'; also determines the initial value of
// 'complete_'.
WindowPartition::WindowPartition(
    const std::vector<column_index_t>& inputMapping,
    const std::vector<std::pair<column_index_t, core::SortOrder>>& sortKeyInfo,
    bool partial)
    : partial_(partial),
      data_(nullptr),
      complete_(!partial),
      inputMapping_(inputMapping),
      sortKeyInfo_(sortKeyInfo) {}

void WindowPartition::addRows(const std::vector<char*>& rows) {
checkPartial();
rows_.insert(rows_.end(), rows.begin(), rows.end());
Expand Down
35 changes: 26 additions & 9 deletions velox/exec/WindowPartition.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ namespace facebook::velox::exec {

class WindowPartition {
public:
virtual ~WindowPartition() = default;

/// The WindowPartition is used by the Window operator and WindowFunction
/// objects to access the underlying data and columns of a partition of rows.
/// The WindowPartition is constructed by WindowBuild from the input data.
Expand Down Expand Up @@ -57,20 +59,21 @@ class WindowPartition {
sortKeyInfo);

/// Adds remaining input 'rows' for a partial window partition.
void addRows(const std::vector<char*>& rows);
virtual void addRows(const std::vector<char*>& rows);

/// Removes the first 'numRows' rows in 'rows_' from a partial window partition
/// after they have been processed.
void removeProcessedRows(vector_size_t numRows);
virtual void removeProcessedRows(vector_size_t numRows);

/// Returns the number of rows in the current WindowPartition.
vector_size_t numRows() const {
virtual vector_size_t numRows() const {
return partition_.size();
}

/// Returns the number of rows in a window partition remaining for data
/// processing.
vector_size_t numRowsForProcessing(vector_size_t partitionOffset) const;
virtual vector_size_t numRowsForProcessing(
vector_size_t partitionOffset) const;

bool complete() const {
return complete_;
Expand All @@ -90,7 +93,7 @@ class WindowPartition {
/// Copies the values at 'columnIndex' into 'result' (starting at
/// 'resultOffset') for the absolute partition row positions in the
/// 'rowNumbers' array. Negative row positions are copied as nulls.
void extractColumn(
virtual void extractColumn(
int32_t columnIndex,
folly::Range<const vector_size_t*> rowNumbers,
vector_size_t resultOffset,
Expand All @@ -99,7 +102,7 @@ class WindowPartition {
/// Copies the values at 'columnIndex' into 'result' (starting at
/// 'resultOffset') for 'numRows' starting at positions 'partitionOffset'
/// in the partition input data.
void extractColumn(
virtual void extractColumn(
int32_t columnIndex,
vector_size_t partitionOffset,
vector_size_t numRows,
Expand All @@ -109,7 +112,7 @@ class WindowPartition {
/// Extracts null positions at 'columnIndex' into 'nullsBuffer' for
/// 'numRows' starting at positions 'partitionOffset' in the partition
/// input data.
void extractNulls(
virtual void extractNulls(
int32_t columnIndex,
vector_size_t partitionOffset,
vector_size_t numRows,
Expand Down Expand Up @@ -141,7 +144,7 @@ class WindowPartition {
/// keys). So peerStart and peerEnd of the last row of this call are returned
/// to be passed as prevPeerStart and prevPeerEnd to the subsequent
/// call to computePeerBuffers.
std::pair<vector_size_t, vector_size_t> computePeerBuffers(
virtual std::pair<vector_size_t, vector_size_t> computePeerBuffers(
vector_size_t start,
vector_size_t end,
vector_size_t prevPeerStart,
Expand All @@ -161,7 +164,7 @@ class WindowPartition {
/// @param validFrames SelectivityVector to keep track of valid frames.
/// This function unselects rows in validFrames whose frame bounds are NaN and
/// are therefore invalid.
void computeKRangeFrameBounds(
virtual void computeKRangeFrameBounds(
bool isStartBound,
bool isPreceding,
column_index_t frameColumn,
Expand All @@ -171,6 +174,20 @@ class WindowPartition {
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames) const;

protected:
// Constructs a non-RowContainer-backed partition for subclasses that provide
// storage-specific accessors.
WindowPartition(
const std::vector<column_index_t>& inputMapping,
const std::vector<std::pair<column_index_t, core::SortOrder>>&
sortKeyInfo,
bool partial);

// Returns a read-only reference to 'sortKeyInfo_', the list of
// (column index, sort order) pairs held by this partition.
const std::vector<std::pair<column_index_t, core::SortOrder>>& sortKeyInfo()
const {
return sortKeyInfo_;
}

private:
WindowPartition(
RowContainer* data,
Expand Down
12 changes: 12 additions & 0 deletions velox/exec/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,18 @@ target_link_libraries(
Folly::follybenchmark
)

# Benchmark executable built from RowsStreamingWindowBenchmark.cpp.
add_executable(velox_rows_streaming_window_benchmark RowsStreamingWindowBenchmark.cpp)

target_link_libraries(
velox_rows_streaming_window_benchmark
velox_aggregates
velox_exec
velox_exec_test_lib
velox_vector_test_lib
velox_window
Folly::follybenchmark
)

add_executable(velox_mark_sorted_benchmark MarkSortedBenchmark.cpp)

target_link_libraries(
Expand Down
204 changes: 204 additions & 0 deletions velox/exec/benchmarks/RowsStreamingWindowBenchmark.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <folly/Benchmark.h>
#include <folly/init/Init.h>

#include "velox/common/memory/Memory.h"
#include "velox/core/QueryConfig.h"
#include "velox/exec/Cursor.h"
#include "velox/exec/OperatorType.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h"
#include "velox/functions/prestosql/window/WindowFunctionsRegistration.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
using namespace facebook::velox::test;

namespace {

constexpr int32_t kRowsPerVector = 10'000;
constexpr int32_t kOutputBatchRows = 1'000;

class RowsStreamingWindowBenchmark : public VectorTestBase {
public:
void addBenchmarks() {
addBenchmarksForSize("10K", 1, 10'000);
addBenchmarksForSize("100K", 10, 25'000);
addBenchmarksForSize("1M", 100, 25'000);
}

private:
struct TestCase {

Check warning on line 47 in velox/exec/benchmarks/RowsStreamingWindowBenchmark.cpp

View workflow job for this annotation

GitHub Actions / Build with GCC / Linux release with adapters

cppcoreguidelines-pro-type-member-init

constructor does not initialize these fields: numRows
std::string name;
std::vector<RowVectorPtr> data;
core::PlanNodePtr plan;
int64_t numRows;
};

std::vector<RowVectorPtr> makeData(
int32_t numVectors,
int32_t rowsPerPartition) {
std::vector<RowVectorPtr> data;
data.reserve(numVectors);
const auto rowType = ROW({"p", "s", "v"}, {INTEGER(), INTEGER(), BIGINT()});
for (auto vectorIndex = 0; vectorIndex < numVectors; ++vectorIndex) {
data.push_back(makeRowVector(
rowType->names(),
{
makeFlatVector<int32_t>(
kRowsPerVector,
[vectorIndex, rowsPerPartition](auto row) {
const auto globalRow = vectorIndex * kRowsPerVector + row;
return globalRow / rowsPerPartition;
}),
makeFlatVector<int32_t>(
kRowsPerVector,
[vectorIndex, rowsPerPartition](auto row) {
const auto globalRow = vectorIndex * kRowsPerVector + row;
return globalRow % rowsPerPartition;
}),
makeFlatVector<int64_t>(
kRowsPerVector, [](auto row) { return row % 97; }),
}));
}
return data;
}

void addBenchmarksForSize(
const std::string& sizeName,
int32_t numVectors,
int32_t rowsPerPartition) {
auto data = makeData(numVectors, rowsPerPartition);
addBenchmark(
"rowsStreamingWindowRank_" + sizeName,
data,
{"rank() over (partition by p order by s)"});
addBenchmark(
"rowsStreamingWindowSum_" + sizeName,
data,
{"sum(v) over (partition by p order by s)"});
addBenchmark(
"rowsStreamingWindowRankAndSum_" + sizeName,
data,
{"rank() over (partition by p order by s)",
"sum(v) over (partition by p order by s)"});
addBenchmark(
"rowsStreamingWindowSevenFunctions_" + sizeName,
data,
{"rank() over (partition by p order by s)",
"dense_rank() over (partition by p order by s)",
"row_number() over (partition by p order by s)",
"sum(v) over (partition by p order by s)",
"count(v) over (partition by p order by s)",
"min(v) over (partition by p order by s)",
"max(v) over (partition by p order by s)"});
}

void addBenchmark(
const std::string& name,
const std::vector<RowVectorPtr>& data,
const std::vector<std::string>& windowFunctions) {
auto testCase = std::make_unique<TestCase>();
testCase->name = name;
testCase->data = data;
testCase->numRows = data.size() * kRowsPerVector;

Check warning on line 120 in velox/exec/benchmarks/RowsStreamingWindowBenchmark.cpp

View workflow job for this annotation

GitHub Actions / Build with GCC / Linux release with adapters

bugprone-narrowing-conversions

narrowing conversion from 'size_type' (aka 'unsigned long') to signed type 'int64_t' (aka 'long') is implementation-defined
testCase->plan = exec::test::PlanBuilder()
.values(testCase->data)
.streamingWindow(windowFunctions)
.planNode();

const auto* test = testCase.get();
folly::addBenchmark(
__FILE__,
name,
[this, test](folly::UserCounters& counters, unsigned iterations) {
CpuWallTiming windowTiming;
uint64_t numResultRows = 0;
for (auto i = 0; i < iterations; ++i) {
numResultRows += runOnce(*test, windowTiming);
}

BENCHMARK_SUSPEND {
counters["rows"] = folly::UserMetric(
static_cast<int64_t>(test->numRows),
folly::UserMetric::Type::METRIC);
counters["windowCpu"] = folly::UserMetric(
static_cast<double>(windowTiming.cpuNanos) / iterations / 1e9,
folly::UserMetric::Type::TIME);
counters["windowWall"] = folly::UserMetric(
static_cast<double>(windowTiming.wallNanos) / iterations / 1e9,
folly::UserMetric::Type::TIME);
folly::doNotOptimizeAway(numResultRows);
}
return iterations;
});

cases_.push_back(std::move(testCase));
}

uint64_t runOnce(const TestCase& test, CpuWallTiming& windowTiming) const {
std::unique_ptr<TaskCursor> cursor;
BENCHMARK_SUSPEND {
CursorParameters params;
params.planNode = test.plan;
params.serialExecution = true;
params.queryConfigs = {
{core::QueryConfig::kPreferredOutputBatchRows,
std::to_string(kOutputBatchRows)}};
cursor = TaskCursor::create(params);
}

uint64_t numResultRows = 0;
while (cursor->moveNext()) {
numResultRows += cursor->current()->size();
}

BENCHMARK_SUSPEND {
VELOX_CHECK_EQ(numResultRows, test.numRows);
const auto stats = cursor->task()->taskStats();
for (const auto& pipeline : stats.pipelineStats) {
for (const auto& op : pipeline.operatorStats) {
if (op.operatorType == OperatorType::kWindow) {
windowTiming.add(op.addInputTiming);
windowTiming.add(op.getOutputTiming);
}
}
}
}
return numResultRows;
}

std::vector<std::unique_ptr<TestCase>> cases_;
};

std::unique_ptr<RowsStreamingWindowBenchmark> benchmark;

} // namespace

int main(int argc, char** argv) {
folly::Init init(&argc, &argv);

Check warning on line 195 in velox/exec/benchmarks/RowsStreamingWindowBenchmark.cpp

View workflow job for this annotation

GitHub Actions / Build with GCC / Linux release with adapters

misc-const-correctness

variable 'init' of type 'folly::Init' can be declared 'const'
memory::MemoryManager::initialize(memory::MemoryManager::Options{});
aggregate::prestosql::registerAllAggregateFunctions();
window::prestosql::registerAllWindowFunctions();
benchmark = std::make_unique<RowsStreamingWindowBenchmark>();
benchmark->addBenchmarks();
folly::runBenchmarks();
benchmark.reset();
return 0;
}
1 change: 1 addition & 0 deletions velox/exec/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ set(
AssignUniqueIdTest.cpp
FilterProjectTest.cpp
AsyncConnectorTest.cpp
VectorWindowPartitionTest.cpp
)

set(
Expand Down
Loading
Loading