Adaptive broadcast to partitioned #23206

Open: gaurav8297 wants to merge 2 commits into master from gaurav8297/adaptive_broadcast_to_partitioned

Member

@gaurav8297 gaurav8297 commented Aug 31, 2024

Description

Issue: #23182

This rule converts a broadcast join to a partitioned join at runtime, which can significantly reduce memory usage and, in some cases, improve performance.

With this rule, a query whose broadcast join would exhaust memory can be fixed at runtime.
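
To illustrate why the conversion reduces memory, here is a minimal sketch of the per-node build-side footprint under each distribution. It assumes the build side is spread evenly across workers with no skew. The class and method names are hypothetical and are not part of this PR, and the 80 GB build side in `main` is an illustrative figure, not a measurement.

```java
// Hypothetical sketch, not Trino code: rough per-node memory for the join build side.
final class JoinBuildMemorySketch {
    // Broadcast (replicated) join: every worker holds a full copy of the build side.
    static long broadcastBytesPerNode(long buildSideBytes) {
        return buildSideBytes;
    }

    // Partitioned join: the build side is hash-partitioned across workers,
    // so each worker holds roughly 1/nodes of it (assumes no data skew).
    static long partitionedBytesPerNode(long buildSideBytes, int nodes) {
        if (nodes <= 0) {
            throw new IllegalArgumentException("nodes must be positive");
        }
        return (buildSideBytes + nodes - 1) / nodes; // ceiling division
    }

    public static void main(String[] args) {
        long buildSideBytes = 80_000_000_000L; // illustrative value only
        int nodes = 8;
        System.out.println("broadcast per node:   " + broadcastBytesPerNode(buildSideBytes));
        System.out.println("partitioned per node: " + partitionedBytesPerNode(buildSideBytes, nodes));
    }
}
```

The partitioned plan pays for an extra exchange on the probe side, which is why the rule weighs cost before switching.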

Example (TPCDS):

set session join_distribution_type='BROADCAST';

SELECT
sum(ss.ss_quantity), sum(ss.ss_list_price), sum(ss.ss_coupon_amt), sum(cs.cs_wholesale_cost), sum(cs.cs_list_price),
sum(cs.cs_sales_price), sum(cs.cs_ext_sales_price),
sum(cs.cs_net_paid_inc_tax), sum(cs.cs_net_paid_inc_ship),
sum(cs.cs_net_profit), sum(cs.cs_ext_tax), sum(cs.cs_coupon_amt),
sum(cs.cs_ext_ship_cost), sum(cs.cs_net_paid), sum(cs.cs_net_paid_inc_tax),
sum(cs.cs_net_paid_inc_ship), sum(cs.cs_call_center_sk),
sum(cs.cs_net_paid_inc_ship_tax), sum(cs.cs_net_profit),
sum(cs.cs_call_center_sk), sum(cs.cs_warehouse_sk), sum(cs.cs_bill_hdemo_sk)
FROM store_sales AS ss
LEFT JOIN catalog_sales AS cs ON (ss.ss_customer_sk=cs.cs_bill_customer_sk AND ss.ss_sold_date_sk = cs.cs_sold_date_sk);

Before:

The query fails due to high memory usage:

Query 20240808_203757_00002_ivgw3, FAILED, 8 nodes
Splits: 4,410 total, 4,333 done (98.25%)
1:55 [13.8B rows, 694GB] [120M rows/s, 6.03GB/s]

Query 20240808_203757_00002_ivgw3 failed: Cannot allocate enough memory for task 
20240808_203757_00002_ivgw3.1.44.0. Reported peak memory reservation: 77323190690B. Maximum possible reservation: 77309411328B.
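
For readers who want the two byte counts in more familiar units, the arithmetic can be sketched as follows. The constants are copied from the error message above; the class name is hypothetical. The limit works out to exactly 72 GiB, and the peak reservation exceeded it by 13,779,362 bytes (about 13 MiB).

```java
// Hypothetical helper: unit conversion for the figures in the error message above.
final class MemoryLimitArithmetic {
    static final long PEAK_RESERVATION_BYTES = 77_323_190_690L; // "Reported peak memory reservation"
    static final long MAX_RESERVATION_BYTES = 77_309_411_328L;  // "Maximum possible reservation"

    // Bytes by which the peak reservation exceeded the limit.
    static long overshootBytes() {
        return PEAK_RESERVATION_BYTES - MAX_RESERVATION_BYTES;
    }

    // Converts bytes to GiB (1 GiB = 2^30 bytes).
    static double toGiB(long bytes) {
        return bytes / (double) (1L << 30);
    }

    public static void main(String[] args) {
        System.out.println("limit (GiB):       " + toGiB(MAX_RESERVATION_BYTES));
        System.out.println("overshoot (bytes): " + overshootBytes());
    }
}
```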

After:

Query 20240808_204327_00005_ivgw3, FINISHED, 8 nodes
Splits: 9,818 total, 9,818 done (100.00%)
41.83 [4.32B rows, 59.7GB] [103M rows/s, 1.43GB/s]

Additional context and related issues

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

@cla-bot cla-bot bot added the cla-signed label Aug 31, 2024
@gaurav8297 gaurav8297 force-pushed the gaurav8297/adaptive_broadcast_to_partitioned branch 2 times, most recently from 2296bcb to 857c672 Compare September 5, 2024 06:56
@gaurav8297 gaurav8297 changed the title [WIP] Adaptive broadcast to partitioned Adaptive broadcast to partitioned Sep 5, 2024
@gaurav8297 gaurav8297 marked this pull request as ready for review September 5, 2024 06:57
@gaurav8297 gaurav8297 force-pushed the gaurav8297/adaptive_broadcast_to_partitioned branch from 857c672 to 3631135 Compare September 5, 2024 06:59
@gaurav8297
Member Author

@losipiuk I'm still adding more tests. But you can take a look.

@gaurav8297 gaurav8297 requested a review from losipiuk September 5, 2024 07:02
@gaurav8297 gaurav8297 force-pushed the gaurav8297/adaptive_broadcast_to_partitioned branch 4 times, most recently from 0d9a9c0 to 5aa5d0f Compare September 5, 2024 20:17
* RemoteExchangeNodes that can be reused instead of adding a new one. For instance, this will be helpful in
* cases where either side of the join has union nodes.
*/
public class AdaptiveBroadcastToPartitionedJoin
Member

General request, for this and the adaptive reordering PRs:
Can you add a means of tracking how often the optimizers trigger?
Let's have a metric for each optimizer that is incremented whenever the rule triggers.
It would also be great to list all adaptive optimizers that triggered for a query, with some context (which stage, etc.), in the query completion event, so we can do offline analysis.

Member Author

@gaurav8297 gaurav8297 Sep 12, 2024

Sure, I'll create a PR for this. We already record metrics through JMX as part of the planner, but we still need to expose them through the query completion event.
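
As a rough illustration of the per-rule trigger counter requested above, a minimal sketch could look like this. It is not the existing JMX-based mechanism mentioned in the reply; the class and method names are hypothetical, and rule names are passed as plain strings for simplicity.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: counts how often each adaptive rule fires.
final class AdaptiveRuleTriggerStats {
    private final Map<String, LongAdder> triggers = new ConcurrentHashMap<>();

    // Call whenever a rule actually rewrites the plan.
    void recordTrigger(String ruleName) {
        triggers.computeIfAbsent(ruleName, name -> new LongAdder()).increment();
    }

    // Number of times the rule has fired so far; 0 if it never fired.
    long count(String ruleName) {
        LongAdder adder = triggers.get(ruleName);
        return adder == null ? 0 : adder.sum();
    }

    public static void main(String[] args) {
        AdaptiveRuleTriggerStats stats = new AdaptiveRuleTriggerStats();
        stats.recordTrigger("AdaptiveBroadcastToPartitionedJoin");
        System.out.println(stats.count("AdaptiveBroadcastToPartitionedJoin"));
    }
}
```

A per-query variant of the same map, keyed additionally by stage, is one way the list of triggered optimizers could be attached to a completion event.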

if (mustReplicate(node, context)) {
return Result.empty();
}
boolean isExtraRemoteExchangeNeededAtProbeSide = captures.getOptional(LEFT_EXCHANGE_NODE).isEmpty();
Member

How does having a remote exchange on the left side ensure that we do not need to repartition again?
You are checking whether there is a remote exchange with FIXED_ARBITRARY_DISTRIBUTION. Where do we guarantee that the data is actually distributed according to the join keys?

Member

Oh, OK. The meaning of that is:

  • are we changing an existing exchange or adding a new one?

Member Author

Added a comment.

{
DataSize joinMaxBroadcastTableSize = getJoinMaxBroadcastTableSize(context.getSession());

PlanNodeWithCost replicatedJoinCost = getJoinNodeWithCost(
Member

Remind me: where do we take into account that some progress has already been made with the current plan shape, and that if we replan we need to start from scratch?

Member Author

Right now we only consider the cost of adding a new exchange.

Member Author

We consider a subplan finished once it is at least 20% done and do not change it; if it is less than 20% done, we always restart. There's an open issue about handling speculative execution with adaptive planning: #23180
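
The threshold described in this reply can be sketched as below. This is an illustration only: the names are hypothetical, the comment does not say how progress is measured, and treating exactly 20% as "finished" is an assumption.

```java
// Hypothetical sketch of the 20% rule described above; not Trino's implementation.
final class ReplanDecisionSketch {
    static final double FINISHED_THRESHOLD = 0.20;

    enum Decision { KEEP_CURRENT_PLAN, RESTART_WITH_NEW_PLAN }

    // completedFraction is the share of the subplan already done, in [0, 1].
    static Decision decide(double completedFraction) {
        if (completedFraction < 0.0 || completedFraction > 1.0) {
            throw new IllegalArgumentException("completedFraction must be in [0, 1]");
        }
        // At or above the threshold the subplan is treated as finished and left alone
        // (assumption: the boundary value counts as finished).
        return completedFraction >= FINISHED_THRESHOLD
                ? Decision.KEEP_CURRENT_PLAN
                : Decision.RESTART_WITH_NEW_PLAN;
    }

    public static void main(String[] args) {
        System.out.println(decide(0.05));
        System.out.println(decide(0.50));
    }
}
```

Note that the sketch does not weigh work already done below the threshold, which matches the reply: only the cost of the new exchange is considered.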

@gaurav8297 gaurav8297 force-pushed the gaurav8297/adaptive_broadcast_to_partitioned branch 2 times, most recently from 217f09e to 8a157c7 Compare September 12, 2024 05:42
@@ -484,8 +486,12 @@ public PlanNode visitRemoteSource(RemoteSourceNode node, RewriteContext<Fragment
else if (node.getExchangeType() == ExchangeNode.Type.REPARTITION) {
for (SubPlan child : completedChildren) {
PartitioningScheme partitioningScheme = child.getFragment().getOutputPartitioningScheme();
PartitioningHandle handle = partitioningScheme.getPartitioning().getHandle();
if (handle.equals(FIXED_BROADCAST_DISTRIBUTION)) {
Member Author

@losipiuk Can you take a look at this? It seems hacky, but I'm not sure what the best way to do this is.

Member

The root cause is that the handle describes not only how data is distributed but also how it is consumed, which is irrelevant when you look at fragment output.
But I do not see an easy way out of that without turning lots of things around.

Maybe this is the best we can get.

Can you explain the proposed change in more detail: the extra PlanNode that adaptive planning could use in place of RemoteSourceNode? How does that simplify things?

cc: @martint

Member Author

@gaurav8297 gaurav8297 Sep 28, 2024

Another way to solve the problem is to introduce a new plan node specifically for the AdaptivePlan source. Instead of using RemoteSourceNode, we can create a new node that carries additional information such as partitionHandle, which the PlanFragmenter can then use. The adaptive planner rules would add this extra information. That eliminates the need for the if condition and simplifies the PlanFragmenter code.

Additionally, we currently use RemoteSourceNode during AdaptivePlanning, which it is not intended for.

cc @martint @losipiuk

Member

Yeah - that sounds fine to me.

@gaurav8297 gaurav8297 force-pushed the gaurav8297/adaptive_broadcast_to_partitioned branch from 8a157c7 to 7b7b18a Compare September 12, 2024 06:01
if (node.getScope() == LOCAL) {
return rewriteSources(this, node, globalContext);
}
verify(node.getScope() == REMOTE && node.getType() == REPLICATE);
Member

Should you also verify that node.partitioningColumns was not set, or that using buildSymbols as the partitioning columns is compatible with what we had previously? (buildSymbols would need to be a subset of the previous set.)
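
The check suggested in this comment might be sketched as follows. Symbols are modeled as plain strings and the class and method names are hypothetical; the actual types in the PR differ, so this only illustrates the "not set, or subset" condition.

```java
import java.util.HashSet;
import java.util.List;

// Hypothetical sketch of the suggested verification; not code from this PR.
final class PartitioningCompatibilitySketch {
    // Returns true when the exchange had no partitioning columns set,
    // or when every build symbol is among the previously set columns.
    static boolean isCompatible(List<String> previousPartitioningColumns, List<String> buildSymbols) {
        if (previousPartitioningColumns.isEmpty()) {
            return true; // nothing was set, so there is nothing to conflict with
        }
        return new HashSet<>(previousPartitioningColumns).containsAll(buildSymbols);
    }

    public static void main(String[] args) {
        System.out.println(isCompatible(List.of(), List.of("cs_bill_customer_sk")));
        System.out.println(isCompatible(List.of("a", "b"), List.of("a")));
        System.out.println(isCompatible(List.of("a"), List.of("a", "c")));
    }
}
```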

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Oct 21, 2024
Closing this pull request, as it has been stale for six weeks. Feel free to re-open at any time.

@github-actions github-actions bot closed this Nov 12, 2024
@mosabua
Member

mosabua commented Nov 13, 2024

Added the performance label. @martint FYI.

Also, @gaurav8297 and @losipiuk, please reopen if you plan to continue, and add the stale-ignore label.

@losipiuk losipiuk reopened this Nov 13, 2024
@losipiuk losipiuk added the stale-ignore label Nov 13, 2024
Labels
cla-signed, performance, stale, stale-ignore
3 participants