Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 47 additions & 12 deletions BOSSVeloxEngine/Source/BOSSVeloxEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,8 @@ getColumns(ComplexExpression&& expression, memory::MemoryPool* pool) {
[pool, &indices, &columnName]<typename T>(boss::Span<T>&& typedSpan) -> VectorPtr {
if constexpr(std::is_same_v<T, int32_t> || std::is_same_v<T, int64_t> ||
std::is_same_v<T, double_t> || std::is_same_v<T, int32_t const> ||
std::is_same_v<T, int64_t const> || std::is_same_v<T, double_t const>) {
std::is_same_v<T, int64_t const> ||
std::is_same_v<T, double_t const>) {
return spanToVelox<T>(std::move(typedSpan), pool, indices);
} else {
throw std::runtime_error(
Expand Down Expand Up @@ -809,7 +810,7 @@ static std::vector<std::string> expressionToProjections(ComplexExpression&& e) {
}

PlanBuilder Engine::buildOperatorPipeline(
ComplexExpression&& e, std::vector<std::pair<core::PlanNodeId, size_t>>& scanIds,
ComplexExpression&& e, std::vector<std::tuple<core::PlanNodeId, size_t, size_t>>& scanIds,
memory::MemoryPool& pool, std::shared_ptr<core::PlanNodeIdGenerator>& planNodeIdGenerator,
int& tableCnt, int& joinCnt) {
if(e.getHead().getName() == "Table" || e.getHead().getName() == "Gather" ||
Expand All @@ -829,6 +830,7 @@ PlanBuilder Engine::buildOperatorPipeline(

core::PlanNodeId scanId;
auto numSpans = spanRowCountVec.size();
auto numRows = std::accumulate(spanRowCountVec.begin(), spanRowCountVec.end(), 0);
auto plan = PlanBuilder(planNodeIdGenerator)
.startTableScan()
.outputType(tableSchema)
Expand All @@ -838,7 +840,7 @@ PlanBuilder Engine::buildOperatorPipeline(
.assignments(assignmentsMap)
.endTableScan()
.capturePlanNodeId(scanId);
scanIds.emplace_back(scanId, numSpans);
scanIds.emplace_back(scanId, numSpans, numRows);
return std::move(plan);
}
if(e.getHead().getName() == "Project") {
Expand Down Expand Up @@ -889,7 +891,12 @@ PlanBuilder Engine::buildOperatorPipeline(
it == itEnd ? std::vector<std::string>{} : expressionToOneSideKeys(std::move(secondArg));
auto asExpr = get<ComplexExpression>(it == itEnd ? std::move(secondArg) : std::move(*it));
auto aggregates = expressionToProjections(std::move(asExpr));
return inputPlan.singleAggregation(groupKeysStr, aggregates);
if(maxThreads < 2) {
return inputPlan.singleAggregation(groupKeysStr, aggregates);
}
return inputPlan.partialAggregation(groupKeysStr, aggregates)
.localPartition(groupKeysStr)
.finalAggregation();
}
if(e.getHead() == "Order"_ || e.getHead() == "OrderBy"_ || e.getHead() == "Sort"_ ||
e.getHead() == "SortBy"_) {
Expand All @@ -898,7 +905,8 @@ PlanBuilder Engine::buildOperatorPipeline(
auto inputPlan = buildOperatorPipeline(get<ComplexExpression>(std::move(*it++)), scanIds, pool,
planNodeIdGenerator, tableCnt, joinCnt);
auto groupKeysStr = expressionToOneSideKeys(std::move(*it));
return inputPlan.orderBy(groupKeysStr, true).localMerge(groupKeysStr);
return inputPlan.localPartition(std::vector<std::string>{})
.orderBy(groupKeysStr, false);//.localMerge(groupKeysStr);
}
if(e.getHead() == "Top"_ || e.getHead() == "TopN"_) {
auto [head, unused_, dynamics, unused2_] = std::move(e).decompose();
Expand All @@ -908,7 +916,10 @@ PlanBuilder Engine::buildOperatorPipeline(
auto groupKeysStr = expressionToOneSideKeys(std::move(*it++));
auto limit = std::holds_alternative<int32_t>(*it) ? std::get<int32_t>(std::move(*it))
: std::get<int64_t>(std::move(*it));
return inputPlan.topN(groupKeysStr, limit, true).localMerge(groupKeysStr);
return inputPlan.localPartition(std::vector<std::string>{})
.orderBy(groupKeysStr, false)//.localMerge(groupKeysStr)
.limit(0, limit, false);
//.topN(groupKeysStr, limit, true).localMerge(groupKeysStr);
}
if(e.getHead() == "Let"_) {
auto [head, unused_, dynamics, unused2_] = std::move(e).decompose();
Expand Down Expand Up @@ -963,19 +974,38 @@ boss::Expression Engine::evaluate(boss::ComplexExpression&& e) {

if(e.getHead().getName() == "Set") {
auto param = std::get<Symbol>(e.getDynamicArguments()[0]);
if(param == "maxThreads"_) {
if(param == "MaxThreads"_) {
maxThreads = std::holds_alternative<int32_t>(e.getDynamicArguments()[1])
? std::get<int32_t>(e.getDynamicArguments()[1])
: std::get<int64_t>(e.getDynamicArguments()[1]);
return true;
}
if(param == "internalBatchNumRows"_) {
if(param == "NumDrivers"_) {
numDrivers = std::holds_alternative<int32_t>(e.getDynamicArguments()[1])
? std::get<int32_t>(e.getDynamicArguments()[1])
: std::get<int64_t>(e.getDynamicArguments()[1]);
return true;
}
if(param == "InputBatchNumSplits"_) {
inputBatchNumSplits = std::holds_alternative<int32_t>(e.getDynamicArguments()[1])
? std::get<int32_t>(e.getDynamicArguments()[1])
: std::get<int64_t>(e.getDynamicArguments()[1]);
return true;
}
if(param == "InputBatchNumRows"_) {
// overrides inputBatchNumSplits if > 0
inputBatchNumRows = std::holds_alternative<int32_t>(e.getDynamicArguments()[1])
? std::get<int32_t>(e.getDynamicArguments()[1])
: std::get<int64_t>(e.getDynamicArguments()[1]);
return true;
}
if(param == "InternalBatchNumRows"_) {
internalBatchNumRows = std::holds_alternative<int32_t>(e.getDynamicArguments()[1])
? std::get<int32_t>(e.getDynamicArguments()[1])
: std::get<int64_t>(e.getDynamicArguments()[1]);
return true;
}
if(param == "minimumOutputBatchNumRows"_) {
if(param == "MinimumOutputBatchNumRows"_) {
minimumOutputBatchNumRows = std::holds_alternative<int32_t>(e.getDynamicArguments()[1])
? std::get<int32_t>(e.getDynamicArguments()[1])
: std::get<int64_t>(e.getDynamicArguments()[1]);
Expand All @@ -1000,7 +1030,7 @@ boss::Expression Engine::evaluate(boss::ComplexExpression&& e) {

boss::expressions::ExpressionArguments columns;
auto evalAndAddOutputSpans = [&, this](auto&& e) {
auto scanIds = std::vector<std::pair<core::PlanNodeId, size_t>>{};
auto scanIds = std::vector<std::tuple<core::PlanNodeId, size_t, size_t>>{};
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
int tableCnt = 0;
int joinCnt = 0;
Expand All @@ -1009,7 +1039,11 @@ boss::Expression Engine::evaluate(boss::ComplexExpression&& e) {

auto params = std::make_unique<CursorParameters>();
params->planNode = plan.planNode();
params->maxDrivers = std::max(1, (maxThreads / (joinCnt + 1)) - 1);
if(numDrivers > 0) {
params->maxDrivers = numDrivers;
} else {
params->maxDrivers = std::max(1, (maxThreads / (joinCnt + 1)) - 1);
}
params->copyResult = false;
std::shared_ptr<folly::Executor> executor;
if(maxThreads < 2) {
Expand All @@ -1028,7 +1062,8 @@ boss::Expression Engine::evaluate(boss::ComplexExpression&& e) {
std::make_shared<core::QueryCtx>(executor.get(), core::QueryConfig{std::move(config)});

std::unique_ptr<TaskCursor> cursor;
auto results = veloxRunQueryParallel(*params, cursor, scanIds);
auto results =
veloxRunQueryParallel(*params, cursor, scanIds, inputBatchNumRows, inputBatchNumSplits);
if(!cursor) {
throw std::runtime_error("Query terminated with error");
}
Expand Down
7 changes: 5 additions & 2 deletions BOSSVeloxEngine/Source/BOSSVeloxEngine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ __declspec(dllexport) void reset();

// #define USE_NEW_TABLE_FORMAT

// #define TAKE_OWNERSHIP_OF_TASK_POOLS // requires velox patch to Task.h
#define TAKE_OWNERSHIP_OF_TASK_POOLS // requires velox patch to Task.h

// #define DebugInfo

Expand Down Expand Up @@ -48,12 +48,15 @@ class Engine {
threadPools_;

int32_t maxThreads = 1;
int32_t numDrivers = 0; // overrides maxThreads if > 0
int32_t inputBatchNumSplits = 64;
int32_t inputBatchNumRows = 0; // overrides numSplits if > 0
int32_t internalBatchNumRows = 0;
int32_t minimumOutputBatchNumRows = 0;
bool hashAdaptivityEnabled = true;

PlanBuilder buildOperatorPipeline(ComplexExpression&& e,
std::vector<std::pair<core::PlanNodeId, size_t>>& scanIds,
std::vector<std::tuple<core::PlanNodeId, size_t, size_t>>& scanIds,
memory::MemoryPool& pool,
std::shared_ptr<core::PlanNodeIdGenerator>& planNodeIdGenerator,
int& tableCnt, int& joinCnt);
Expand Down
55 changes: 47 additions & 8 deletions BOSSVeloxEngine/Source/BossConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,50 @@ void BossDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
currentSplit_ = std::dynamic_pointer_cast<BossConnectorSplit>(split);
VELOX_CHECK_NOT_NULL(currentSplit_, "Wrong type of split for BossDataSource.");

spanCountIdx_ = currentSplit_->partNumber;
splitOffset_ = 0;
splitEnd_ = bossSpanRowCountVec_.at(spanCountIdx_) - splitOffset_;
int totalRows = std::accumulate(bossSpanRowCountVec_.begin(), bossSpanRowCountVec_.end(), 0);
float rowsPerPart = (float)totalRows / currentSplit_->totalParts;

int currentSplitRowStart = rowsPerPart * currentSplit_->partNumber;
int currentSplitRowEnd = rowsPerPart * (currentSplit_->partNumber + 1);

if(currentSplitRowStart >= totalRows) {
spanCountIdx_ = currentSplit_->totalParts - 1;
splitOffset_ = splitEnd_ = 0;
return;
}

if(currentSplit_->totalParts <= bossSpanRowCountVec_.size()) {
spanCountIdx_ = currentSplit_->partNumber;
splitOffset_ = 0;
splitEnd_ = bossSpanRowCountVec_.at(spanCountIdx_);
} else {
spanCountIdx_ = 0;
for(auto const& currRowCount : bossSpanRowCountVec_) {
if(currentSplitRowStart < currRowCount) {
// Found the span that contains the start of the current split.
if(currentSplitRowStart < rowsPerPart) {
splitOffset_ = 0; // a little bigger to match the beginning of the span
} else {
splitOffset_ = currentSplitRowStart;
}
if(currentSplitRowEnd > currRowCount) {
splitEnd_ = currRowCount; // a little smaller to match the end of the span
} else {
splitEnd_ = currentSplitRowEnd;
}
break;
}
spanCountIdx_++;
currentSplitRowStart -= currRowCount;
currentSplitRowEnd -= currRowCount;
}
}

#ifdef DebugInfo
std::cout << "addSplit for table " << bossTableName_ << std::endl;
std::cout << " totalRows: " << totalRows << std::endl;
std::cout << " totalParts: " << currentSplit_->totalParts << std::endl;
std::cout << " partNumber: " << currentSplit_->partNumber << std::endl;
std::cout << " bossSpanRowCountVec_.size(): " << bossSpanRowCountVec_.size() << std::endl;
std::cout << " spanCountIdx_: " << spanCountIdx_ << std::endl;
std::cout << " splitOffset_: " << splitOffset_ << std::endl;
Expand All @@ -66,10 +104,6 @@ void BossDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
}

RowVectorPtr BossDataSource::getBossData(uint64_t length) {
#ifdef DebugInfo
std::cout << "getBossData: spanCountIdx_=" << spanCountIdx_ << " splitOffset_=" << splitOffset_
<< " length=" << length << std::endl;
#endif
assert(splitOffset_ <= INT_MAX);
assert(length <= INT_MAX);

Expand All @@ -87,9 +121,14 @@ RowVectorPtr BossDataSource::getBossData(uint64_t length) {
std::optional<RowVectorPtr> BossDataSource::next(uint64_t size, ContinueFuture& /*future*/) {
VELOX_CHECK_NOT_NULL(currentSplit_, "No split to process. Call addSplit() first.");

auto maxRows = std::min(size, (splitEnd_ - splitOffset_));
// auto maxRows = std::min(size, (splitEnd_ - splitOffset_));
auto maxRows = splitEnd_ - splitOffset_;
auto outputVector = getBossData(maxRows);

#ifdef DebugInfo
std::cout << "requested size: " << size << ", maxRows: " << maxRows << std::endl;
#endif // DebugInfo

// If the split is exhausted.
if(!outputVector || outputVector->size() == 0) {
currentSplit_ = nullptr;
Expand Down
15 changes: 11 additions & 4 deletions BOSSVeloxEngine/Source/BridgeVelox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,22 @@ std::vector<RowVectorPtr> myReadCursor(CursorParameters const& params,

std::vector<RowVectorPtr>
veloxRunQueryParallel(CursorParameters const& params, std::unique_ptr<TaskCursor>& cursor,
std::vector<std::pair<core::PlanNodeId, size_t>> const& scanIds) {
std::vector<std::tuple<core::PlanNodeId, size_t, size_t>> const& scanIds,
size_t batchSize, size_t numSplits) {
try {
bool noMoreSplits = false;
auto addSplits = [&](exec::Task* task) {
if(!noMoreSplits) {
for(auto const& [scanId, numSpans] : scanIds) {
for(size_t i = 0; i < numSpans; ++i) {
for(auto const& [scanId, numSpans, totalNumRows] : scanIds) {
if(batchSize > 0) {
numSplits = 2 + totalNumRows / batchSize;
}
if(numSplits < numSpans) {
numSplits = numSpans;
}
for(size_t i = 0; i < numSplits; ++i) {
task->addSplit(scanId, exec::Split(std::make_shared<BossConnectorSplit>(
kBossConnectorId, numSpans, i)));
kBossConnectorId, numSplits, i)));
}
task->noMoreSplits(scanId);
}
Expand Down
3 changes: 2 additions & 1 deletion BOSSVeloxEngine/Source/BridgeVelox.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ BufferPtr importFromBossAsOwnerBuffer(BossArray&& bossArray);

std::vector<RowVectorPtr>
veloxRunQueryParallel(CursorParameters const& params, std::unique_ptr<TaskCursor>& cursor,
std::vector<std::pair<core::PlanNodeId, size_t>> const& scanIds);
std::vector<std::tuple<core::PlanNodeId, size_t, size_t>> const& scanIds,
size_t numRows, size_t numSplits);

void veloxPrintResults(std::vector<RowVectorPtr> const& results);

Expand Down
30 changes: 26 additions & 4 deletions Benchmarks/BOSSBenchmarks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ static int BENCHMARK_NUM_WARMPUP_ITERATIONS = 3;
static bool MONETDB_MULTITHREADING = true;
static int DUCKDB_MAX_THREADS = 100;

static int BOSS_MAX_THREADS = 20;
static int VELOX_NUM_DRIVERS = 0; // > 0 : overrides BOSS_MAX_THREADS
static int VELOX_NUM_SPLITS = 512;
static int VELOX_BATCH_SIZE = 100000; // > 0: overrides VELOX_NUM_SPLITS

static bool USE_FIXED_POINT_NUMERIC_TYPE = false;

static bool DISABLE_MMAP_CACHE = false;
Expand Down Expand Up @@ -153,6 +158,10 @@ static void initBOSSEngine_TPCH(int dataSize, int64_t storageBlockSize) {
checkForErrors(eval("Set"_("ArrayFireEngineCopyDataIn"_, BENCHMARK_DATA_COPY_IN)));
checkForErrors(eval("Set"_("ArrayFireEngineCopyDataOut"_, BENCHMARK_DATA_COPY_OUT)));
checkForErrors(eval("Set"_("DisableGatherOperator"_, DISABLE_GATHER_OPERATOR)));
checkForErrors(eval("Set"_("MaxThreads"_, BOSS_MAX_THREADS)));
checkForErrors(eval("Set"_("NumDrivers"_, VELOX_NUM_DRIVERS)));
checkForErrors(eval("Set"_("InputBatchNumSplits"_, VELOX_NUM_SPLITS)));
checkForErrors(eval("Set"_("InputBatchNumRows"_, VELOX_BATCH_SIZE)));

checkForErrors(
eval("CreateTable"_("LINEITEM"_, "l_orderkey"_, "l_partkey"_, "l_suppkey"_, "l_linenumber"_,
Expand Down Expand Up @@ -819,7 +828,6 @@ static auto& monetdbQueries() {
" nation,"s
" o_year desc;"s},
{TPCH_Q18, "select"s
" c_name,"s
" c_custkey,"s
" o_orderkey,"s
" o_orderdate,"s
Expand Down Expand Up @@ -943,7 +951,6 @@ static auto& duckdbQueries() {
" nation,"
" o_year DESC;"},
{TPCH_Q18, "SELECT"
" c_name,"
" c_custkey,"
" o_orderkey,"
" o_orderdate,"
Expand Down Expand Up @@ -1254,6 +1261,22 @@ void initAndRunBenchmarks(int argc, char** argv) {
if(++i < argc) {
DUCKDB_MAX_THREADS = atoi(argv[i]);
}
} else if(std::string("--max-threads") == argv[i]) {
if(++i < argc) {
BOSS_MAX_THREADS = atoi(argv[i]);
}
} else if(std::string("--num-drivers") == argv[i]) {
if(++i < argc) {
VELOX_NUM_DRIVERS = atoi(argv[i]);
}
} else if(std::string("--velox-num-splits") == argv[i]) {
if(++i < argc) {
VELOX_NUM_SPLITS = atoi(argv[i]);
}
} else if(std::string("--velox-batch-size") == argv[i]) {
if(++i < argc) {
VELOX_BATCH_SIZE = atoi(argv[i]);
}
} else if(std::string("--monetdb-enable-multithreading") == argv[i]) {
MONETDB_MULTITHREADING = true;
} else if(std::string("--fixed-point-numeric-type") == argv[i]) {
Expand All @@ -1269,8 +1292,7 @@ void initAndRunBenchmarks(int argc, char** argv) {
}
}
// register TPC-H benchmarks
for(int dataSize :
std::vector<int>{1, 10, 100, 1000, 2000, 5000, 10000, 20000, 50000, 100000}) {
for(int dataSize : std::vector<int>{1, 10, 100, 1000, 2000, 5000, 10000, 20000, 50000, 100000}) {
for(int engine = ENGINE_START; engine < ENGINE_END; ++engine) {
for(int64_t blockSize :
(BENCHMARK_STORAGE_BLOCK_SIZE && engine == BOSS
Expand Down