Skip to content

Commit

Permalink
Re-Optimize cross joins with single build batch (facebookincubator#10726
Browse files Browse the repository at this point in the history
)

Summary:
Pull Request resolved: facebookincubator#10726

As a follow up from:
.
facebookincubator#10690
.
We can also optimize the case of a cross join with a single build batch by
simply wrapping probe and build inputs in dictionaries, thereby avoiding copies
and not producing output vectors that are too small.
.
Check in-code header comments for more details.
.
This is a re-submission of the PR
.
facebookincubator#10695
.
which was recently reverted. There is additional logic required to add this
optimization for joins other than cross-joins. Re-adding only for cross-joins
for now to unblock.

Will add it for non-cross-joins along with more tests as a follow up.

Reviewed By: Yuhta

Differential Revision: D61164691

fbshipit-source-id: 3f7dfadfdaad464f22d0dea45f556b17435fb7c8
  • Loading branch information
pedroerp authored and facebook-github-bot committed Aug 13, 2024
1 parent ba1bb9e commit 5c55c63
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 75 deletions.
169 changes: 119 additions & 50 deletions velox/exec/NestedLoopJoinProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,6 @@ bool NestedLoopJoinProbe::getBuildData(ContinueFuture* future) {
}

buildVectors_ = std::move(buildData);

for (const auto& build : buildVectors_.value()) {
buildRowCount_ += build->size();
}
buildSideEmpty_ = (buildRowCount_ == 0);
return true;
}

Expand Down Expand Up @@ -276,13 +271,7 @@ RowVectorPtr NestedLoopJoinProbe::generateOutput() {

bool NestedLoopJoinProbe::advanceProbe() {
if (hasProbedAllBuildData()) {
// For cross joins, if there is a single record on the build side, we return
// batches containing all probe records from `input_` at a time.
if (isCrossJoin() && buildRowCount_ == 1) {
probeRow_ = input_->size();
} else {
++probeRow_;
}
probeRow_ += probeRowCount_;
probeRowHasMatch_ = false;
buildIndex_ = 0;

Expand Down Expand Up @@ -319,9 +308,8 @@ bool NestedLoopJoinProbe::addToOutput() {
}

// If this is a cross join, there is no filter to evaluate. We can just
// return the output vector directly, which is composed of the build
// projections at `probeRow_` (as constants), and current vector of the
// build side. Also don't need to bother about adding mismatched rows.
// return the output vector directly. Also don't need to bother about adding
// mismatched rows.
if (isCrossJoin()) {
output_ = getNextCrossProductBatch(
currentBuild, outputType_, identityProjections_, buildProjections_);
Expand Down Expand Up @@ -384,13 +372,13 @@ void NestedLoopJoinProbe::prepareOutput() {
}
std::vector<VectorPtr> localColumns(outputType_->size());

probeIndices_ = allocateIndices(outputBatchSize_, pool());
rawProbeIndices_ = probeIndices_->asMutable<vector_size_t>();
probeOutputIndices_ = allocateIndices(outputBatchSize_, pool());
rawProbeOutputIndices_ = probeOutputIndices_->asMutable<vector_size_t>();

for (const auto& projection : identityProjections_) {
localColumns[projection.outputChannel] = BaseVector::wrapInDictionary(
{},
probeIndices_,
probeOutputIndices_,
outputBatchSize_,
input_->childAt(projection.inputChannel));
}
Expand Down Expand Up @@ -435,47 +423,128 @@ RowVectorPtr NestedLoopJoinProbe::getNextCrossProductBatch(
const std::vector<IdentityProjection>& probeProjections,
const std::vector<IdentityProjection>& buildProjections) {
VELOX_CHECK_GT(buildVector->size(), 0);

// TODO: For now we only enable the build optimizations in cross-joins, but we
// should allow it for other join types as well.
if (isCrossJoin() && isSingleBuildRow()) {
return genCrossProductSingleBuildRow(
buildVector, outputType, probeProjections, buildProjections);
} else if (isCrossJoin() && isSingleBuildVector()) {
return genCrossProductSingleBuildVector(
buildVector, outputType, probeProjections, buildProjections);
} else {
return genCrossProductMultipleBuildVectors(
buildVector, outputType, probeProjections, buildProjections);
}
}

RowVectorPtr NestedLoopJoinProbe::genCrossProductSingleBuildRow(
const RowVectorPtr& buildVector,
const RowTypePtr& outputType,
const std::vector<IdentityProjection>& probeProjections,
const std::vector<IdentityProjection>& buildProjections) {
VELOX_CHECK(isSingleBuildRow());

std::vector<VectorPtr> projectedChildren(outputType->size());
size_t numOutputRows = 0;
size_t numOutputRows = input_->size();
probeRowCount_ = input_->size();

// If it's a cross join and there is a single build record, we use the entire
// probe batch `input_` and the single build record wrapped as a constant.
if (isCrossJoin() && buildRowCount_ == 1) {
numOutputRows = input_->size();
// Project columns from the probe side.
projectChildren(
projectedChildren, input_, probeProjections, numOutputRows, nullptr);

// Project columns from the probe side.
projectChildren(
projectedChildren, input_, probeProjections, numOutputRows, nullptr);
// Wrap projections from the build side as constants.
for (const auto [inputChannel, outputChannel] : buildProjections) {
projectedChildren[outputChannel] = BaseVector::wrapInConstant(
numOutputRows, 0, buildVector->childAt(inputChannel));
}
return std::make_shared<RowVector>(
pool(), outputType, nullptr, numOutputRows, std::move(projectedChildren));
}

// Wrap projections from the build side as constants.
for (const auto [inputChannel, outputChannel] : buildProjections) {
projectedChildren[outputChannel] = BaseVector::wrapInConstant(
numOutputRows, 0, buildVector->childAt(inputChannel));
}
RowVectorPtr NestedLoopJoinProbe::genCrossProductSingleBuildVector(
const RowVectorPtr& buildVector,
const RowTypePtr& outputType,
const std::vector<IdentityProjection>& probeProjections,
const std::vector<IdentityProjection>& buildProjections) {
VELOX_CHECK(isSingleBuildVector());
std::vector<VectorPtr> projectedChildren(outputType->size());
vector_size_t buildRowCount = buildVector->size();

// Calculate how many probe rows we can cover without exceeding
// outputBatchSize_.
if (buildRowCount > outputBatchSize_) {
probeRowCount_ = 1;
} else {
numOutputRows = buildVector->size();

// Project columns from the build side.
projectChildren(
projectedChildren,
buildVector,
buildProjections,
numOutputRows,
nullptr);

// Wrap projections from the probe side as constants.
for (const auto [inputChannel, outputChannel] : probeProjections) {
projectedChildren[outputChannel] = BaseVector::wrapInConstant(
numOutputRows, probeRow_, input_->childAt(inputChannel));
}
probeRowCount_ = std::min(
(vector_size_t)outputBatchSize_ / buildRowCount,
input_->size() - probeRow_);
}
size_t numOutputRows = probeRowCount_ * buildRowCount;

// Generate probe dictionary indices.
auto rawProbeIndices =
initializeRowNumberMapping(probeIndices_, numOutputRows, pool());
for (auto i = 0; i < probeRowCount_; ++i) {
std::fill(
rawProbeIndices.begin() + i * buildRowCount,
rawProbeIndices.begin() + (i + 1) * buildRowCount,
probeRow_ + i);
}

// Generate build dictionary indices.
auto rawBuildIndices_ =
initializeRowNumberMapping(buildIndices_, numOutputRows, pool());
for (auto i = 0; i < probeRowCount_; ++i) {
std::iota(
rawBuildIndices_.begin() + i * buildRowCount,
rawBuildIndices_.begin() + (i + 1) * buildRowCount,
0);
}

projectChildren(
projectedChildren,
input_,
probeProjections,
numOutputRows,
probeIndices_);
projectChildren(
projectedChildren,
buildVector,
buildProjections,
numOutputRows,
buildIndices_);

return std::make_shared<RowVector>(
pool(), outputType, nullptr, numOutputRows, std::move(projectedChildren));
}

// Fallback cross-product generation: pairs the single probe row at
// `probeRow_` (wrapped as a constant) with every row of `buildVector`. The
// resulting batch has exactly `buildVector->size()` rows and consumes one
// probe row, which is recorded in `probeRowCount_`.
RowVectorPtr NestedLoopJoinProbe::genCrossProductMultipleBuildVectors(
    const RowVectorPtr& buildVector,
    const RowTypePtr& outputType,
    const std::vector<IdentityProjection>& probeProjections,
    const std::vector<IdentityProjection>& buildProjections) {
  // Only the current probe row is covered by this batch.
  probeRowCount_ = 1;

  const size_t outputRowCount = buildVector->size();
  std::vector<VectorPtr> outputColumns(outputType->size());

  // Build-side columns go through projectChildren() with a null mapping.
  projectChildren(
      outputColumns, buildVector, buildProjections, outputRowCount, nullptr);

  // Probe-side columns repeat the value at `probeRow_` for every output row.
  for (const auto& projection : probeProjections) {
    outputColumns[projection.outputChannel] = BaseVector::wrapInConstant(
        outputRowCount, probeRow_, input_->childAt(projection.inputChannel));
  }

  return std::make_shared<RowVector>(
      pool(), outputType, nullptr, outputRowCount, std::move(outputColumns));
}

void NestedLoopJoinProbe::addOutputRow(vector_size_t buildRow) {
// Probe side is always a dictionary; just populate the index.
rawProbeIndices_[numOutputRows_] = probeRow_;
rawProbeOutputIndices_[numOutputRows_] = probeRow_;

// For the build side, we accumulate the ranges to copy, then copy all of them
// at once. If records are consecutive and can have a single copy range run.
Expand All @@ -501,7 +570,7 @@ void NestedLoopJoinProbe::copyBuildValues(const RowVectorPtr& buildVector) {

void NestedLoopJoinProbe::addProbeMismatchRow() {
// Probe side is always a dictionary; just populate the index.
rawProbeIndices_[numOutputRows_] = probeRow_;
rawProbeOutputIndices_[numOutputRows_] = probeRow_;

// Null out build projections.
for (const auto& projection : buildProjections_) {
Expand Down Expand Up @@ -534,7 +603,7 @@ void NestedLoopJoinProbe::finishProbeInput() {
// From now on, the probe side has been fully processed. Check now if this is
// a right or full outer join, and hence we may need to start emitting build
// mismatch records.
if (!needsBuildMismatch(joinType_) || buildSideEmpty_) {
if (!needsBuildMismatch(joinType_) || isBuildSideEmpty()) {
setState(ProbeOperatorState::kFinish);
return;
}
Expand Down Expand Up @@ -590,7 +659,7 @@ RowVectorPtr NestedLoopJoinProbe::getBuildMismatchedOutput(
// product but the build or probe side is empty, there could still be
// mismatched rows from the other side.
if (matched.isAllSelected() ||
(isCrossJoin() && !probeSideEmpty_ && !buildSideEmpty_)) {
(isCrossJoin() && !probeSideEmpty_ && !isBuildSideEmpty())) {
return nullptr;
}

Expand Down
96 changes: 72 additions & 24 deletions velox/exec/NestedLoopJoinProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ namespace facebook::velox::exec {
/// To produce output, the operator processes each probe record from probe
/// input, using the following steps:
///
/// 1. Materialize a cross product by wrapping each probe record (as a constant)
/// to each build vector.
/// 1. Materialize a cross-product batch across probe and build.
/// 2. Evaluate the join condition.
/// 3. Add key matches to the output.
/// 4. Once all build vectors are processed for a particular probe row, check if
Expand All @@ -49,15 +48,23 @@ namespace facebook::velox::exec {
/// collect all build matches at the end, and emit any records that haven't
/// been matched by any of the peers.
///
/// The output always contains dictionaries wrapped around probe columns, and
/// copies for build columns. The only exception are cases when the build side
/// contains a single record. In that case, each probe batch will be wrapped
/// with the single build record (as a constant).
/// There are three different cases for the generation of cross-product across
/// probe and build (#1 above):
///
/// The buid-side copies are done lazily; it first accumulates the ranges to be
/// copied, then performs the copies in batch, column-by-column. It produces at
/// most `outputBatchSize_` records, but it may produce fewer since the output
/// needs to follow the probe vector boundaries.
/// a) If build side has a single row, simply wrap that row as a constant and
/// produce it along with probe batches.
///
/// b) If build side has a single batch, produce a dictionary wrapped across
/// probe and build rows, covering as many probe rows as allowed by
/// `outputBatchSize_` (maximum number of records to produce per batch).
///
/// c) If build side has multiple vectors, take one probe row at a time,
/// wrapping it as a constant, and produce it along with build batches.
///
/// If needed, build-side copies are done lazily: the operator first
/// accumulates the ranges to be copied, then performs the copies in batch,
/// column-by-column. It produces at most `outputBatchSize_` records, but it may
/// produce fewer since the output needs to follow the probe vector boundaries.
class NestedLoopJoinProbe : public Operator {
public:
NestedLoopJoinProbe(
Expand Down Expand Up @@ -140,14 +147,8 @@ class NestedLoopJoinProbe : public Operator {
}

// Generates the next batch of a cross product between probe and build. It
// handles two cases:
//
// #1. Use the current probe record being processed (`probeRow_` from
// `input_`) for probe projections, and the columns from buildVector for build
// projections.
// #2. For cross joins, if there is a single build record, it uses the columns
// from the current probe batch (`input_`), and the single build record
// wrapped as a constant.
// should be used as the entry point, and will internally delegate to one of
// the three functions below.
//
// Output projections can be specified so that this function can be used to
// generate both filter input and actual output (in case there is no join
Expand All @@ -158,6 +159,30 @@ class NestedLoopJoinProbe : public Operator {
const std::vector<IdentityProjection>& probeProjections,
const std::vector<IdentityProjection>& buildProjections);

// Generates a cross product batch when there is a single build row (probe
// batch plus build row as a constant).
RowVectorPtr genCrossProductSingleBuildRow(
const RowVectorPtr& buildVector,
const RowTypePtr& outputType,
const std::vector<IdentityProjection>& probeProjections,
const std::vector<IdentityProjection>& buildProjections);

// Generates a cross product batch when there is a single build vector (probe
// and build batch wrapped in a dictionary).
RowVectorPtr genCrossProductSingleBuildVector(
const RowVectorPtr& buildVector,
const RowTypePtr& outputType,
const std::vector<IdentityProjection>& probeProjections,
const std::vector<IdentityProjection>& buildProjections);

// As a fallback, combine the current probe row with as much build data as
// possible (probe row as constant, and flat copied data for build records).
RowVectorPtr genCrossProductMultipleBuildVectors(
const RowVectorPtr& buildVector,
const RowTypePtr& outputType,
const std::vector<IdentityProjection>& probeProjections,
const std::vector<IdentityProjection>& buildProjections);

// Add a single record to `output_` based on buildRow from buildVector, and
// the current probeRow and probe vector (input_). Probe side projections are
// zero-copy (dictionary indices), and build side projections are marked to be
Expand Down Expand Up @@ -209,6 +234,24 @@ class NestedLoopJoinProbe : public Operator {
return joinCondition_ == nullptr;
}

// True if the build side consists of exactly one vector. In that case probe
// and build batches can be wrapped into dictionaries, producing as many
// combinations of probe and build rows per output batch as
// `outputBatchSize_` allows.
// NOTE(review): dereferences `buildVectors_` without checking that it holds a
// value; presumably only called once build data is available - confirm.
bool isSingleBuildVector() const {
  const auto& vectors = *buildVectors_;
  return vectors.size() == 1;
}

// True if the build side holds no vectors at all (`buildVectors_` is empty).
// NOTE(review): dereferences `buildVectors_` without checking that it holds a
// value; presumably only called once build data is available - confirm.
bool isBuildSideEmpty() const {
  const auto& vectors = *buildVectors_;
  return vectors.empty();
}

// True if the build side is exactly one vector holding exactly one row. Such
// a row can simply be attached to probe batches as a constant.
bool isSingleBuildRow() const {
  if (!isSingleBuildVector()) {
    return false;
  }
  return buildVectors_->front()->size() == 1;
}

// Wraps rows of 'data' that are not selected in 'matched' and projects
// to the output according to 'projections'. 'nullProjections' is used to
// create null column vectors in output for outer join. 'unmatchedMapping' is
Expand Down Expand Up @@ -238,9 +281,15 @@ class NestedLoopJoinProbe : public Operator {
// Number of output rows in the current output batch.
vector_size_t numOutputRows_{0};

// Dictionary indices for probe columns.
// Dictionary indices for probe columns used to generate cross-product.
BufferPtr probeIndices_;
vector_size_t* rawProbeIndices_;

// Dictionary indices for probe columns for output vector.
BufferPtr probeOutputIndices_;
vector_size_t* rawProbeOutputIndices_;

// Dictionary indices for build columns.
BufferPtr buildIndices_;

// Join condition expression.

Expand Down Expand Up @@ -268,6 +317,9 @@ class NestedLoopJoinProbe : public Operator {
// Probe row being currently processed (related to `input_`).
vector_size_t probeRow_{0};

// How many probe rows are being processed by the current batch.
vector_size_t probeRowCount_{1};

// Whether the current probeRow_ has produced a match. Used for left and full
// outer joins.
bool probeRowHasMatch_{false};
Expand All @@ -287,10 +339,6 @@ class NestedLoopJoinProbe : public Operator {

// Stores the data for build vectors (right side of the join).
std::optional<std::vector<RowVectorPtr>> buildVectors_;
bool buildSideEmpty_{false};

// Total number of records from the build side (across all vectors).
vector_size_t buildRowCount_{0};

// Index into `buildVectors_` for the build vector being currently processed.
size_t buildIndex_{0};
Expand Down
Loading

0 comments on commit 5c55c63

Please sign in to comment.