Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
4ba36c0
chore: add test
duongcongtoai Feb 3, 2025
79eaca3
chore: more progress
duongcongtoai Feb 10, 2025
7ed0831
temp
duongcongtoai Mar 18, 2025
cc97879
Merge remote-tracking branch 'origin/main' into 14554-unnest-subquery…
duongcongtoai Mar 18, 2025
5096937
Merge remote-tracking branch 'origin/main' into 14554-unnest-subquery…
duongcongtoai Apr 10, 2025
68fd9ca
chore: some work
duongcongtoai Apr 16, 2025
ace332e
chore: some work on indexed algebra
duongcongtoai Apr 27, 2025
da8980c
chore: more progress
duongcongtoai May 4, 2025
483e3ac
chore: impl projection pull up
duongcongtoai May 4, 2025
f14b145
chore: complete unnesting simple subquery
duongcongtoai May 6, 2025
0cd8143
chore: correct join condition
duongcongtoai May 8, 2025
cc3e01c
chore: handle exist query
duongcongtoai May 8, 2025
9b5daa2
test: in sq test
duongcongtoai May 10, 2025
f26baf8
test: exist with no dependent column
duongcongtoai May 10, 2025
37852c1
test: exist with dependent columns
duongcongtoai May 10, 2025
2544478
Merge remote-tracking branch 'origin/main' into 14554-subquery-unnest…
duongcongtoai May 10, 2025
e984a55
chore: remove redundant clone
duongcongtoai May 11, 2025
94aba08
feat: dummy implementation for aggregation
duongcongtoai May 13, 2025
0f039fe
feat: handle count bug
duongcongtoai May 15, 2025
898bdc4
feat: add sq alias step
duongcongtoai May 16, 2025
1a600b6
test: simple count decorrelate
duongcongtoai May 16, 2025
6ce21b3
chore: some work to support multiple subqueries per level
duongcongtoai May 17, 2025
67923d4
feat: support multiple subqueries decorrelation untested
duongcongtoai May 19, 2025
64538cc
feat: correct node rewriting rule
duongcongtoai May 19, 2025
957403f
fix: subquery alias
duongcongtoai May 19, 2025
a465459
fix: adjust test case expectation
duongcongtoai May 19, 2025
479ae64
feat: convert sq to dependent joins
duongcongtoai May 24, 2025
2171e52
feat: impl dependent join rewriter
duongcongtoai May 24, 2025
9d26437
chore: clean up unused function
duongcongtoai May 24, 2025
24d1223
chore: clean up debug slt
duongcongtoai May 24, 2025
3533cd1
chore: simple logical plan type for dependent join
duongcongtoai May 24, 2025
e1002f8
fix: recursive dependent join rewrite
duongcongtoai May 24, 2025
7ba92f1
Merge remote-tracking branch 'origin/main' into 14554-subquery-unnest…
duongcongtoai May 24, 2025
e3c77d6
chore: some more note on further implementation
duongcongtoai May 24, 2025
1ae0926
chore: lint
duongcongtoai May 24, 2025
d15c2aa
chore: clippy
duongcongtoai May 24, 2025
e5baf2c
fix: test
duongcongtoai May 25, 2025
11dbb80
doc: draw diagram
duongcongtoai May 25, 2025
5856213
fix: proto
duongcongtoai May 25, 2025
a3f11a8
chore: revert unrelated change
duongcongtoai May 25, 2025
e2d9d14
chore: lint
duongcongtoai May 25, 2025
b298426
fix: subtrait
duongcongtoai May 25, 2025
cb1a757
fix: subtrait again
duongcongtoai May 25, 2025
baef066
fix: fail test
duongcongtoai May 25, 2025
a07b3b0
chore: clippy
duongcongtoai May 25, 2025
32db3a9
chore: add depth and data_type to correlated columns
duongcongtoai May 26, 2025
50d26f3
chore: rm snapshot
duongcongtoai May 26, 2025
b09e370
Merge branch 'main' into 14554-subquery-unnest-framework
duongcongtoai May 26, 2025
28dc7a4
feat: support alias and join
duongcongtoai May 26, 2025
cf830cb
feat: add lateral join fields to dependent join
duongcongtoai May 26, 2025
95994da
feat: rewrite lateral join
duongcongtoai May 27, 2025
9745a4f
feat: rewrite projection
duongcongtoai May 28, 2025
c2bf4d3
refactor: split rewrite logic
duongcongtoai May 28, 2025
c083501
feat: impl other api of logical plan for dependent join
duongcongtoai May 28, 2025
9512ccc
chore: rm debug file
duongcongtoai May 28, 2025
9c53364
dummy implement for flatten dependent join
irenjj Jun 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,12 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: Analyze must be root of the plan"
)
}
LogicalPlan::DependentJoin(_) => {
return internal_err!(
"Optimizors have not completely remove dependent join"
)
}
LogicalPlan::DelimGet(_) => todo!()
};
Ok(exec_node)
}
Expand Down
64 changes: 63 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use crate::{

use super::dml::InsertOp;
use super::plan::{ColumnUnnestList, ExplainFormat};
use super::DependentJoin;
use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use datafusion_common::display::ToStringifiedPlan;
Expand Down Expand Up @@ -717,7 +718,7 @@ impl LogicalPlanBuilder {
)
})
.collect::<Result<Vec<_>>>()?;
curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
curr_plan.with_new_exprs_inputs(curr_plan.expressions(), new_inputs)
}
}
}
Expand Down Expand Up @@ -880,6 +881,47 @@ impl LogicalPlanBuilder {
))))
}

/// Build a dependent join provided a subquery plan
/// this function should only be used by the optimizor
/// a dependent join node will provides all columns belonging to the LHS
/// and one additional column as the result of evaluating the subquery on the RHS
/// under the name "subquery_name.output"
pub fn dependent_join(
self,
right: LogicalPlan,
correlated_columns: Vec<(usize, Expr)>,
subquery_expr: Option<Expr>,
subquery_depth: usize,
subquery_name: String,
lateral_join_condition: Option<(JoinType, Expr)>,
) -> Result<Self> {
let left = self.build()?;
let schema = left.schema();
// TODO: for lateral join, output schema is similar to a normal join
let qualified_fields = schema
.iter()
.map(|(q, f)| (q.cloned(), Arc::clone(f)))
.chain(
subquery_expr
.iter()
.map(|expr| subquery_output_field(&subquery_name, expr)),
)
.collect();
let metadata = schema.metadata().clone();
let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;

Ok(Self::new(LogicalPlan::DependentJoin(DependentJoin {
schema: DFSchemaRef::new(dfschema),
left: Arc::new(left),
right: Arc::new(right),
correlated_columns,
subquery_expr,
subquery_name,
subquery_depth,
lateral_join_condition,
})))
}

/// Apply a join to `right` using explicitly specified columns and an
/// optional filter expression.
///
Expand Down Expand Up @@ -1544,6 +1586,26 @@ fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
)
}

/// Describes the extra column a dependent join emits for its subquery.
///
/// The column is always named `output` and is qualified with
/// `subquery_alias` as a bare table reference. Its data type depends on the
/// kind of subquery expression:
///
/// * `Expr::InSubquery` and `Expr::Exists` produce a `Boolean` column.
/// * `Expr::ScalarSubquery` takes the data type of the first field of the
///   subquery plan's schema.
///
/// # Panics
///
/// Panics via `unreachable!()` if `subquery_expr` is any other expression
/// variant.
fn subquery_output_field(
    subquery_alias: &str,
    subquery_expr: &Expr,
) -> (Option<TableReference>, Arc<Field>) {
    let output_type = match subquery_expr {
        Expr::InSubquery(_) | Expr::Exists(_) => DataType::Boolean,
        Expr::ScalarSubquery(scalar) => {
            scalar.subquery.schema().field(0).data_type().clone()
        }
        _ => unreachable!(),
    };

    // TODO: check nullability
    // The field is currently always declared non-nullable.
    let output = Field::new("output", output_type, false);
    let qualifier = TableReference::bare(subquery_alias);

    (Some(qualifier), Arc::new(output))
}

/// Creates a schema for a join operation.
/// The fields from the left side are first
pub fn build_join_schema(
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {

object
}
LogicalPlan::DependentJoin(..) => json!({}),
LogicalPlan::Join(Join {
on: ref keys,
filter,
Expand Down Expand Up @@ -650,6 +651,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
"StructColumn": expr_vec_fmt!(struct_type_columns),
})
}
LogicalPlan::DelimGet(_) => todo!(),
}
}
}
Expand Down
11 changes: 6 additions & 5 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ pub use ddl::{
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct,
DistinctOn, EmptyRelation, Explain, ExplainFormat, Extension, FetchType, Filter,
Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
projection_schema, Aggregate, Analyze, ColumnUnnestList, DependentJoin,
DescribeTable, Distinct, DistinctOn, EmptyRelation, Explain, ExplainFormat,
Extension, FetchType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, Projection, RecursiveQuery, Repartition, SkipType, Sort,
StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
Unnest, Values, Window,DelimGet,
};
pub use statement::{
Deallocate, Execute, Prepare, SetVariable, Statement, TransactionAccessMode,
Expand Down
Loading
Loading