
Commit

Allow pushdown of dynamic filters through HashAggregation (facebookincubator#10988)

Summary:
Currently, HashAggregation adds the group-key channels to its identity
projections only when the aggregation is distinct. As a result, even when the
group key matches the join key, dynamic filters generated by HashProbe cannot
be pushed down through HashAggregation. This PR enables dynamic filter
pushdown through HashAggregation when the group key and the join key are the
same.

This optimization applies to plans like:

```
- HashProbe(k1 = k2,...)
  - HashAggregation(k1, sum(x))
    - TableScan
```
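For reference, the qualifying shape can be expressed with Velox's PlanBuilder test utility, as the new test in this commit does. The sketch below is illustrative only: the helper name and signature are made up for this example, the column names mirror the test (c0/c1 on the probe side, u0 on the build side), and it assumes the PlanBuilder headers from velox/exec/tests/utils are available.

```cpp
// Illustrative sketch only, mirroring the test added in this commit; assumes
// the Velox PlanBuilder test utility and a pre-built build-side plan.
#include <memory>

#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;
using namespace facebook::velox::exec::test;

core::PlanNodePtr makeGroupByBelowJoinPlan(
    const RowTypePtr& probeType, // e.g. ROW({"c0", "c1"}, {INTEGER(), BIGINT()})
    const core::PlanNodePtr& buildSide,
    const std::shared_ptr<core::PlanNodeIdGenerator>& idGenerator,
    memory::MemoryPool* pool) {
  // TableScan -> partial group-by on c0 -> hash join on c0 = u0. Because c0 is
  // both the grouping key and the probe-side join key, the dynamic filter
  // produced by the join can now be pushed all the way down to the TableScan.
  return PlanBuilder(idGenerator, pool)
      .tableScan(probeType)
      .partialAggregation({"c0"}, {"sum(c1)"})
      .hashJoin(
          {"c0"},
          {"u0"},
          buildSide,
          /*filter=*/"",
          /*outputLayout=*/{"c0", "a0"},
          core::JoinType::kInner)
      .planNode();
}
```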

Pull Request resolved: facebookincubator#10988

Reviewed By: kevinwilfong

Differential Revision: D63035508

Pulled By: Yuhta

fbshipit-source-id: 7fb336adb17154c54658a7db518a94b79157efce
xiaodouchen authored and facebook-github-bot committed Sep 20, 2024
1 parent 023be97 commit 7463706
Showing 3 changed files with 82 additions and 5 deletions.
6 changes: 2 additions & 4 deletions velox/exec/HashAggregation.cpp
```diff
@@ -81,10 +81,8 @@ void HashAggregation::initialize() {
         core::AggregationNode::stepName(aggregationNode_->step()));
   }
 
-  if (isDistinct_) {
-    for (auto i = 0; i < hashers.size(); ++i) {
-      identityProjections_.emplace_back(hashers[i]->channel(), i);
-    }
+  for (auto i = 0; i < hashers.size(); ++i) {
+    identityProjections_.emplace_back(hashers[i]->channel(), i);
   }
 
   std::optional<column_index_t> groupIdChannel;
```
5 changes: 4 additions & 1 deletion velox/exec/Operator.h
```diff
@@ -454,7 +454,10 @@ class Operator : public BaseRuntimeStatWriter {
   }
 
   /// Returns a list of identity projections, e.g. columns that are projected
-  /// as-is possibly after applying a filter.
+  /// as-is possibly after applying a filter. Used to allow pushdown of dynamic
+  /// filters generated by HashProbe into the TableScan. Examples of identity
+  /// projections: all columns in FilterProject(only filters), group-by keys in
+  /// HashAggregation.
   const std::vector<IdentityProjection>& identityProjections() const {
     return identityProjections_;
   }
```
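To make the doc comment above concrete, here is a self-contained conceptual sketch, not Velox's actual implementation: the simplified IdentityProjection stand-in and the translateUpstream helper are assumptions made for illustration. It shows how an identity projection lets a dynamic filter keyed on an operator's output column be mapped back to the corresponding input column so it can keep moving upstream toward the TableScan.

```cpp
// Conceptual illustration only (not Velox's implementation): a dynamic filter
// produced by a join on a probe-side output column can pass through an
// upstream operator only if that column is among the operator's identity
// projections.
#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

using column_index_t = uint32_t; // simplified stand-in for Velox's type

struct IdentityProjection {
  column_index_t inputChannel;  // column index in the operator's input
  column_index_t outputChannel; // same column's index in the operator's output
};

// Returns the input channel feeding `outputChannel` if it is identity-projected,
// or std::nullopt if the operator blocks the pushdown.
std::optional<column_index_t> translateUpstream(
    const std::vector<IdentityProjection>& projections,
    column_index_t outputChannel) {
  for (const auto& p : projections) {
    if (p.outputChannel == outputChannel) {
      return p.inputChannel;
    }
  }
  return std::nullopt;
}

int main() {
  // HashAggregation with input (c0, c1) and output (c0, sum(c1)): after this
  // commit, grouping key c0 is registered as {inputChannel=0, outputChannel=0}.
  std::vector<IdentityProjection> aggProjections = {{0, 0}};

  // The probe-side join key is output channel 0 of the aggregation.
  auto upstreamChannel = translateUpstream(aggProjections, 0);
  if (upstreamChannel.has_value()) {
    std::cout << "filter on output channel 0 maps to input channel "
              << *upstreamChannel << "; push down toward TableScan\n";
  } else {
    std::cout << "pushdown blocked at this operator\n";
  }
  return 0;
}
```

With the HashAggregation change above, the grouping keys always appear in this list, so a join whose probe key equals the grouping key can translate its filter through the aggregation instead of stopping there.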
76 changes: 76 additions & 0 deletions velox/exec/tests/HashJoinTest.cpp
```diff
@@ -921,6 +921,13 @@ class HashJoinTest : public HiveConnectorTestBase {
     return stats[operatorIndex].runtimeStats[statsName];
   }
 
+  // Returns the operator index for the given plan node id. Only used in the
+  // probe-side pipeline, where plan node ids map to operators one-to-one: plan
+  // node ids start from "1" while operator indices start from 0.
+  static int32_t getOperatorIndex(const core::PlanNodeId& planNodeId) {
+    return folly::to<int32_t>(planNodeId) - 1;
+  }
+
   static core::JoinType flipJoinType(core::JoinType joinType) {
     switch (joinType) {
       case core::JoinType::kInner:
@@ -5177,6 +5184,75 @@ TEST_F(HashJoinTest, dynamicFiltersAppliedToPreloadedSplits) {
       .run();
 }
 
+TEST_F(HashJoinTest, dynamicFiltersPushDownThroughAgg) {
+  const int32_t numRowsProbe = 300;
+  const int32_t numRowsBuild = 100;
+
+  // Create probe data.
+  std::vector<RowVectorPtr> probeVectors{makeRowVector({
+      makeFlatVector<int32_t>(numRowsProbe, [&](auto row) { return row - 10; }),
+      makeFlatVector<int64_t>(numRowsProbe, folly::identity),
+  })};
+  std::shared_ptr<TempFilePath> probeFile = TempFilePath::create();
+  writeToFile(probeFile->getPath(), probeVectors);
+
+  // Create build data.
+  std::vector<RowVectorPtr> buildVectors{makeRowVector(
+      {"u0"}, {makeFlatVector<int32_t>(numRowsBuild, [&](auto row) {
+        return 35 + 2 * (row + numRowsBuild / 5);
+      })})};
+
+  createDuckDbTable("t", probeVectors);
+  createDuckDbTable("u", buildVectors);
+
+  auto probeType = ROW({"c0", "c1"}, {INTEGER(), BIGINT()});
+  auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
+  auto buildSide =
+      PlanBuilder(planNodeIdGenerator).values(buildVectors).planNode();
+
+  // Inner join.
+  core::PlanNodeId scanNodeId;
+  core::PlanNodeId joinNodeId;
+  core::PlanNodeId aggNodeId;
+  auto op = PlanBuilder(planNodeIdGenerator, pool_.get())
+                .tableScan(probeType)
+                .capturePlanNodeId(scanNodeId)
+                .partialAggregation({"c0"}, {"sum(c1)"})
+                .capturePlanNodeId(aggNodeId)
+                .hashJoin(
+                    {"c0"},
+                    {"u0"},
+                    buildSide,
+                    "",
+                    {"c0", "a0"},
+                    core::JoinType::kInner)
+                .capturePlanNodeId(joinNodeId)
+                .planNode();
+
+  SplitInput splitInput = {
+      {scanNodeId, {Split(makeHiveConnectorSplit(probeFile->getPath()))}}};
+  HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
+      .planNode(std::move(op))
+      .inputSplits(splitInput)
+      .injectSpill(false)
+      .checkSpillStats(false)
+      .referenceQuery("SELECT c0, sum(c1) FROM t, u WHERE c0 = u0 group by c0")
+      .verifier([&](const std::shared_ptr<Task>& task, bool hasSpill) {
+        auto planStats = toPlanStats(task->taskStats());
+        auto dynamicFilterStats = planStats.at(scanNodeId).dynamicFilterStats;
+        ASSERT_EQ(
+            1, getFiltersProduced(task, getOperatorIndex(joinNodeId)).sum);
+        ASSERT_EQ(
+            1, getFiltersAccepted(task, getOperatorIndex(scanNodeId)).sum);
+        ASSERT_LT(
+            getInputPositions(task, getOperatorIndex(aggNodeId)), numRowsProbe);
+        ASSERT_EQ(
+            dynamicFilterStats.producerNodeIds,
+            std::unordered_set({joinNodeId}));
+      })
+      .run();
+}
+
 // Verify the size of the join output vectors when projecting build-side
 // variable-width column.
 TEST_F(HashJoinTest, memoryUsage) {
```
