Skip to content

feat: Finalize support for RightMark join + Mark join swap #16488

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion/common/src/join_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ impl JoinType {
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::RightAnti
| JoinType::LeftMark
| JoinType::RightMark
)
}
}
Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ async fn test_left_mark_join_1k_filtered() {
.await
}

// todo: add JoinTestType::HjSmj after Right mark SortMergeJoin support
#[tokio::test]
async fn test_right_mark_join_1k() {
JoinFuzzTestCase::new(
Expand All @@ -314,7 +313,7 @@ async fn test_right_mark_join_1k() {
JoinType::RightMark,
None,
)
.run_test(&[NljHj], false)
.run_test(&[HjSmj, NljHj], false)
.await
}

Expand All @@ -326,7 +325,7 @@ async fn test_right_mark_join_1k_filtered() {
JoinType::RightMark,
Some(Box::new(col_lt_col_filter)),
)
.run_test(&[NljHj], false)
.run_test(&[HjSmj, NljHj], false)
.await
}

Expand Down
59 changes: 58 additions & 1 deletion datafusion/core/tests/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,61 @@ async fn test_join_with_swap_semi() {
}
}

#[tokio::test]
async fn test_join_with_swap_mark() {
let join_types = [JoinType::LeftMark];
for join_type in join_types {
let (big, small) = create_big_and_small();

let join = HashJoinExec::try_new(
Arc::clone(&big),
Arc::clone(&small),
vec![(
Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()),
Arc::new(Column::new_with_schema("small_col", &small.schema()).unwrap()),
)],
None,
&join_type,
None,
PartitionMode::Partitioned,
NullEquality::NullEqualsNothing,
)
.unwrap();

let original_schema = join.schema();

let optimized_join = JoinSelection::new()
.optimize(Arc::new(join), &ConfigOptions::new())
.unwrap();

let swapped_join = optimized_join
.as_any()
.downcast_ref::<HashJoinExec>()
.expect(
"A proj is not required to swap columns back to their original order",
);

assert_eq!(swapped_join.schema().fields().len(), 2);
assert_eq!(
swapped_join
.left()
.partition_statistics(None)
.unwrap()
.total_byte_size,
Precision::Inexact(8192)
);
assert_eq!(
swapped_join
.right()
.partition_statistics(None)
.unwrap()
.total_byte_size,
Precision::Inexact(2097152)
);
assert_eq!(original_schema, swapped_join.schema());
}
}

/// Compare the input plan with the plan after running the probe order optimizer.
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr) => {
Expand Down Expand Up @@ -577,8 +632,10 @@ async fn test_nl_join_with_swap(join_type: JoinType) {
join_type,
case::left_semi(JoinType::LeftSemi),
case::left_anti(JoinType::LeftAnti),
case::left_mark(JoinType::LeftMark),
case::right_semi(JoinType::RightSemi),
case::right_anti(JoinType::RightAnti)
case::right_anti(JoinType::RightAnti),
case::right_mark(JoinType::RightMark)
)]
#[tokio::test]
async fn test_nl_join_with_swap_no_proj(join_type: JoinType) {
Expand Down
5 changes: 4 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1652,7 +1652,10 @@ pub fn build_join_schema(
);

let (schema1, schema2) = match join_type {
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => (left, right),
JoinType::Right
| JoinType::RightSemi
| JoinType::RightAnti
| JoinType::RightMark => (left, right),
_ => (right, left),
};

Expand Down
15 changes: 10 additions & 5 deletions datafusion/physical-optimizer/src/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ pub fn hash_join_swap_subrule(
| JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::LeftMark
)
{
input = swap_join_according_to_unboundedness(hash_join)?;
Expand All @@ -549,10 +550,10 @@ pub fn hash_join_swap_subrule(

/// This function swaps sides of a hash join to make it runnable even if one of
/// its inputs are infinite. Note that this is not always possible; i.e.
/// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`] and
/// [`JoinType::RightSemi`] can not run with an unbounded left side, even if
/// we swap join sides. Therefore, we do not consider them here.
/// This function is crate public as it is useful for downstream projects
/// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`],
/// [`JoinType::RightSemi`], and [`JoinType::RightMark`] can not run with an
/// unbounded left side, even if we swap join sides. Therefore, we do not consider
/// them here. This function is crate public as it is useful for downstream projects
/// to implement, or experiment with, their own join selection rules.
pub(crate) fn swap_join_according_to_unboundedness(
hash_join: &HashJoinExec,
Expand All @@ -562,7 +563,11 @@ pub(crate) fn swap_join_according_to_unboundedness(
match (*partition_mode, *join_type) {
(
_,
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | JoinType::Full,
JoinType::Right
| JoinType::RightSemi
| JoinType::RightAnti
| JoinType::RightMark
| JoinType::Full,
) => internal_err!("{join_type} join cannot be swapped for unbounded input."),
(PartitionMode::Partitioned, _) => {
hash_join.swap_inputs(PartitionMode::Partitioned)
Expand Down
5 changes: 4 additions & 1 deletion datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,13 +618,16 @@ impl HashJoinExec {
partition_mode,
self.null_equality(),
)?;
// In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again

// In case of Anti/Semi/Mark joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again
if matches!(
self.join_type(),
JoinType::LeftSemi
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::RightAnti
| JoinType::LeftMark
| JoinType::RightMark
) || self.projection.is_some()
{
Ok(Arc::new(new_join))
Expand Down
4 changes: 3 additions & 1 deletion datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,14 +371,16 @@ impl NestedLoopJoinExec {
),
)?;

// For Semi/Anti joins, swap result will produce same output schema,
// For Semi/Anti/Mark joins, swap result will produce same output schema,
// no need to wrap them into additional projection
let plan: Arc<dyn ExecutionPlan> = if matches!(
self.join_type(),
JoinType::LeftSemi
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::RightAnti
| JoinType::LeftMark
| JoinType::RightMark
) || self.projection.is_some()
{
Arc::new(new_join)
Expand Down
Loading