Skip to content
This repository was archived by the owner on Jan 20, 2023. It is now read-only.

Commit 62b0349

Browse files
authored
Remove hard-coded PartitionMode from Ballista serde (apache#637)
1 parent e86f8e9 commit 62b0349

File tree

5 files changed, with 53 additions and 10 deletions.

ballista/rust/core/proto/ballista.proto

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -565,12 +565,17 @@ message CsvScanExecNode {
565565
repeated string filename = 8;
566566
}
567567

568+
enum PartitionMode {
569+
COLLECT_LEFT = 0;
570+
PARTITIONED = 1;
571+
}
572+
568573
message HashJoinExecNode {
569574
PhysicalPlanNode left = 1;
570575
PhysicalPlanNode right = 2;
571576
repeated JoinOn on = 3;
572577
JoinType join_type = 4;
573-
578+
PartitionMode partition_mode = 6;
574579
}
575580

576581
message PhysicalColumn {

ballista/rust/core/src/serde/physical_plan/from_proto.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,12 +356,24 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
356356
protobuf::JoinType::Semi => JoinType::Semi,
357357
protobuf::JoinType::Anti => JoinType::Anti,
358358
};
359+
let partition_mode =
360+
protobuf::PartitionMode::from_i32(hashjoin.partition_mode)
361+
.ok_or_else(|| {
362+
proto_error(format!(
363+
"Received a HashJoinNode message with unknown PartitionMode {}",
364+
hashjoin.partition_mode
365+
))
366+
})?;
367+
let partition_mode = match partition_mode {
368+
protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
369+
protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
370+
};
359371
Ok(Arc::new(HashJoinExec::try_new(
360372
left,
361373
right,
362374
on,
363375
&join_type,
364-
PartitionMode::CollectLeft,
376+
partition_mode,
365377
)?))
366378
}
367379
PhysicalPlanType::ShuffleReader(shuffle_reader) => {

ballista/rust/core/src/serde/physical_plan/mod.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,29 @@ mod roundtrip_tests {
8888
Column::new("col", schema_right.index_of("col")?),
8989
)];
9090

91-
roundtrip_test(Arc::new(HashJoinExec::try_new(
92-
Arc::new(EmptyExec::new(false, Arc::new(schema_left))),
93-
Arc::new(EmptyExec::new(false, Arc::new(schema_right))),
94-
on,
95-
&JoinType::Inner,
96-
PartitionMode::CollectLeft,
97-
)?))
91+
let schema_left = Arc::new(schema_left);
92+
let schema_right = Arc::new(schema_right);
93+
for join_type in &[
94+
JoinType::Inner,
95+
JoinType::Left,
96+
JoinType::Right,
97+
JoinType::Full,
98+
JoinType::Anti,
99+
JoinType::Semi,
100+
] {
101+
for partition_mode in
102+
&[PartitionMode::Partitioned, PartitionMode::CollectLeft]
103+
{
104+
roundtrip_test(Arc::new(HashJoinExec::try_new(
105+
Arc::new(EmptyExec::new(false, schema_left.clone())),
106+
Arc::new(EmptyExec::new(false, schema_right.clone())),
107+
on.clone(),
108+
&join_type,
109+
*partition_mode,
110+
)?))?;
111+
}
112+
}
113+
Ok(())
98114
}
99115

100116
#[test]

ballista/rust/core/src/serde/physical_plan/to_proto.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use datafusion::physical_plan::expressions::{
3434
use datafusion::physical_plan::expressions::{CastExpr, TryCastExpr};
3535
use datafusion::physical_plan::filter::FilterExec;
3636
use datafusion::physical_plan::hash_aggregate::AggregateMode;
37-
use datafusion::physical_plan::hash_join::HashJoinExec;
37+
use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
3838
use datafusion::physical_plan::hash_utils::JoinType;
3939
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
4040
use datafusion::physical_plan::parquet::ParquetExec;
@@ -143,13 +143,18 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
143143
JoinType::Semi => protobuf::JoinType::Semi,
144144
JoinType::Anti => protobuf::JoinType::Anti,
145145
};
146+
let partition_mode = match exec.partition_mode() {
147+
PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
148+
PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
149+
};
146150
Ok(protobuf::PhysicalPlanNode {
147151
physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
148152
protobuf::HashJoinExecNode {
149153
left: Some(Box::new(left)),
150154
right: Some(Box::new(right)),
151155
on,
152156
join_type: join_type.into(),
157+
partition_mode: partition_mode.into(),
153158
},
154159
))),
155160
})

datafusion/src/physical_plan/hash_join.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ impl HashJoinExec {
177177
&self.join_type
178178
}
179179

180+
/// The partitioning mode of this hash join
181+
pub fn partition_mode(&self) -> &PartitionMode {
182+
&self.mode
183+
}
184+
180185
/// Calculates column indices and left/right placement on input / output schemas and jointype
181186
fn column_indices_from_schema(&self) -> ArrowResult<Vec<ColumnIndex>> {
182187
let (primary_is_left, primary_schema, secondary_schema) = match self.join_type {

0 commit comments

Comments
 (0)