Skip to content

Commit 219de0c

Browse files
committed
fix issue where CTE could not be referenced more than 1 time
wip: fixes after rebase but tpcds_physical_q54 keeps overflowing its stack
1 parent 38e95dd commit 219de0c

File tree

7 files changed

+150
-59
lines changed

7 files changed

+150
-59
lines changed

datafusion/core/src/physical_planner.rs

+54-27
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
2020
use std::collections::HashMap;
2121
use std::fmt::Write;
22-
use std::sync::Arc;
22+
use std::sync::atomic::AtomicI32;
23+
use std::sync::{Arc, OnceLock};
2324

2425
use crate::datasource::file_format::arrow::ArrowFormat;
2526
use crate::datasource::file_format::avro::AvroFormat;
@@ -89,8 +90,8 @@ use datafusion_expr::expr::{
8990
use datafusion_expr::expr_rewriter::unnormalize_cols;
9091
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
9192
use datafusion_expr::{
92-
DescribeTable, DmlStatement, ScalarFunctionDefinition, StringifiedPlan, WindowFrame,
93-
WindowFrameBound, WriteOp, NamedRelation, RecursiveQuery,
93+
DescribeTable, DmlStatement, NamedRelation, RecursiveQuery, ScalarFunctionDefinition,
94+
StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
9495
};
9596
use datafusion_physical_expr::expressions::Literal;
9697
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
@@ -452,11 +453,13 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
452453
logical_plan: &LogicalPlan,
453454
session_state: &SessionState,
454455
) -> Result<Arc<dyn ExecutionPlan>> {
456+
reset_recursive_cte_physical_plan_branch_number();
457+
455458
match self.handle_explain(logical_plan, session_state).await? {
456459
Some(plan) => Ok(plan),
457460
None => {
458461
let plan = self
459-
.create_initial_plan(logical_plan, session_state)
462+
.create_initial_plan(logical_plan, session_state, None)
460463
.await?;
461464
self.optimize_internal(plan, session_state, |_, _| {})
462465
}
@@ -487,6 +490,21 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
487490
}
488491
}
489492

493+
static RECURSIVE_CTE_PHYSICAL_PLAN_BRANCH: OnceLock<AtomicI32> = OnceLock::new();
494+
495+
fn new_recursive_cte_physical_plan_branch_number() -> u32 {
496+
let counter = RECURSIVE_CTE_PHYSICAL_PLAN_BRANCH
497+
.get_or_init(|| AtomicI32::new(0))
498+
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
499+
counter as u32
500+
}
501+
502+
fn reset_recursive_cte_physical_plan_branch_number() {
503+
RECURSIVE_CTE_PHYSICAL_PLAN_BRANCH
504+
.get_or_init(|| AtomicI32::new(0))
505+
.store(0, std::sync::atomic::Ordering::SeqCst);
506+
}
507+
490508
impl DefaultPhysicalPlanner {
491509
/// Create a physical planner that uses `extension_planners` to
492510
/// plan user-defined logical nodes [`LogicalPlan::Extension`].
@@ -507,6 +525,7 @@ impl DefaultPhysicalPlanner {
507525
&'a self,
508526
logical_plans: impl IntoIterator<Item = &'a LogicalPlan> + Send + 'a,
509527
session_state: &'a SessionState,
528+
ctx: Option<&'a String>,
510529
) -> BoxFuture<'a, Result<Vec<Arc<dyn ExecutionPlan>>>> {
511530
async move {
512531
// First build futures with as little references as possible, then performing some stream magic.
@@ -519,7 +538,7 @@ impl DefaultPhysicalPlanner {
519538
.into_iter()
520539
.enumerate()
521540
.map(|(idx, lp)| async move {
522-
let plan = self.create_initial_plan(lp, session_state).await?;
541+
let plan = self.create_initial_plan(lp, session_state, ctx).await?;
523542
Ok((idx, plan)) as Result<_>
524543
})
525544
.collect::<Vec<_>>();
@@ -548,6 +567,7 @@ impl DefaultPhysicalPlanner {
548567
&'a self,
549568
logical_plan: &'a LogicalPlan,
550569
session_state: &'a SessionState,
570+
ctx: Option<&'a String>,
551571
) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
552572
async move {
553573
let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan {
@@ -572,7 +592,7 @@ impl DefaultPhysicalPlanner {
572592
single_file_output,
573593
copy_options,
574594
}) => {
575-
let input_exec = self.create_initial_plan(input, session_state).await?;
595+
let input_exec = self.create_initial_plan(input, session_state, ctx).await?;
576596
let parsed_url = ListingTableUrl::parse(output_url)?;
577597
let object_store_url = parsed_url.object_store();
578598

@@ -620,7 +640,7 @@ impl DefaultPhysicalPlanner {
620640
let name = table_name.table();
621641
let schema = session_state.schema_for_ref(table_name)?;
622642
if let Some(provider) = schema.table(name).await {
623-
let input_exec = self.create_initial_plan(input, session_state).await?;
643+
let input_exec = self.create_initial_plan(input, session_state, ctx).await?;
624644
provider.insert_into(session_state, input_exec, false).await
625645
} else {
626646
return exec_err!(
@@ -637,7 +657,7 @@ impl DefaultPhysicalPlanner {
637657
let name = table_name.table();
638658
let schema = session_state.schema_for_ref(table_name)?;
639659
if let Some(provider) = schema.table(name).await {
640-
let input_exec = self.create_initial_plan(input, session_state).await?;
660+
let input_exec = self.create_initial_plan(input, session_state, ctx).await?;
641661
provider.insert_into(session_state, input_exec, true).await
642662
} else {
643663
return exec_err!(
@@ -678,7 +698,7 @@ impl DefaultPhysicalPlanner {
678698
);
679699
}
680700

681-
let input_exec = self.create_initial_plan(input, session_state).await?;
701+
let input_exec = self.create_initial_plan(input, session_state, ctx).await?;
682702

683703
// at this moment we are guaranteed by the logical planner
684704
// to have all the window_expr to have equal sort key
@@ -774,7 +794,7 @@ impl DefaultPhysicalPlanner {
774794
..
775795
}) => {
776796
// Initially need to perform the aggregate and then merge the partitions
777-
let input_exec = self.create_initial_plan(input, session_state).await?;
797+
let input_exec = self.create_initial_plan(input, session_state, ctx).await?;
778798
let physical_input_schema = input_exec.schema();
779799
let logical_input_schema = input.as_ref().schema();
780800

@@ -848,7 +868,7 @@ impl DefaultPhysicalPlanner {
848868
)?))
849869
}
850870
LogicalPlan::Projection(Projection { input, expr, .. }) => {
851-
let input_exec = self.create_initial_plan(input, session_state).await?;
871+
let input_exec = self.create_initial_plan(input, session_state, ctx).await?;
852872
let input_schema = input.as_ref().schema();
853873

854874
let physical_exprs = expr
@@ -900,7 +920,7 @@ impl DefaultPhysicalPlanner {
900920
)?))
901921
}
902922
LogicalPlan::Filter(filter) => {
903-
let physical_input = self.create_initial_plan(&filter.input, session_state).await?;
923+
let physical_input = self.create_initial_plan(&filter.input, session_state, ctx).await?;
904924
let input_schema = physical_input.as_ref().schema();
905925
let input_dfschema = filter.input.schema();
906926

@@ -914,16 +934,16 @@ impl DefaultPhysicalPlanner {
914934
let filter = FilterExec::try_new(runtime_expr, physical_input)?;
915935
Ok(Arc::new(filter.with_default_selectivity(selectivity)?))
916936
}
917-
LogicalPlan::Union(Union { inputs, .. }) => {
918-
let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?;
937+
LogicalPlan::Union(Union { inputs, schema: _ }) => {
938+
let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state, ctx).await?;
919939

920940
Ok(Arc::new(UnionExec::new(physical_plans)))
921941
}
922942
LogicalPlan::Repartition(Repartition {
923943
input,
924944
partitioning_scheme,
925945
}) => {
926-
let physical_input = self.create_initial_plan(input, session_state).await?;
946+
let physical_input = self.create_initial_plan(input, session_state, ctx).await?;
927947
let input_schema = physical_input.schema();
928948
let input_dfschema = input.as_ref().schema();
929949
let physical_partitioning = match partitioning_scheme {
@@ -954,7 +974,7 @@ impl DefaultPhysicalPlanner {
954974
)?))
955975
}
956976
LogicalPlan::Sort(Sort { expr, input, fetch, .. }) => {
957-
let physical_input = self.create_initial_plan(input, session_state).await?;
977+
let physical_input = self.create_initial_plan(input, session_state, ctx).await?;
958978
let input_schema = physical_input.as_ref().schema();
959979
let input_dfschema = input.as_ref().schema();
960980
let sort_expr = expr
@@ -1045,12 +1065,12 @@ impl DefaultPhysicalPlanner {
10451065
};
10461066

10471067
return self
1048-
.create_initial_plan(&join_plan, session_state)
1068+
.create_initial_plan(&join_plan, session_state, ctx)
10491069
.await;
10501070
}
10511071

10521072
// All equi-join keys are columns now, create physical join plan
1053-
let left_right = self.create_initial_plan_multi([left.as_ref(), right.as_ref()], session_state).await?;
1073+
let left_right = self.create_initial_plan_multi([left.as_ref(), right.as_ref()], session_state, ctx).await?;
10541074
let [physical_left, physical_right]: [Arc<dyn ExecutionPlan>; 2] = left_right.try_into().map_err(|_| DataFusionError::Internal("`create_initial_plan_multi` is broken".to_string()))?;
10551075
let left_df_schema = left.schema();
10561076
let right_df_schema = right.schema();
@@ -1185,7 +1205,7 @@ impl DefaultPhysicalPlanner {
11851205
}
11861206
}
11871207
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
1188-
let left_right = self.create_initial_plan_multi([left.as_ref(), right.as_ref()], session_state).await?;
1208+
let left_right = self.create_initial_plan_multi([left.as_ref(), right.as_ref()], session_state, ctx).await?;
11891209
let [left, right]: [Arc<dyn ExecutionPlan>; 2] = left_right.try_into().map_err(|_| DataFusionError::Internal("`create_initial_plan_multi` is broken".to_string()))?;
11901210
Ok(Arc::new(CrossJoinExec::new(left, right)))
11911211
}
@@ -1203,10 +1223,10 @@ impl DefaultPhysicalPlanner {
12031223
SchemaRef::new(schema.as_ref().to_owned().into()),
12041224
))),
12051225
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
1206-
self.create_initial_plan(input, session_state).await
1226+
self.create_initial_plan(input, session_state, ctx).await
12071227
}
12081228
LogicalPlan::Limit(Limit { input, skip, fetch, .. }) => {
1209-
let input = self.create_initial_plan(input, session_state).await?;
1229+
let input = self.create_initial_plan(input, session_state, ctx).await?;
12101230

12111231
// GlobalLimitExec requires a single partition for input
12121232
let input = if input.output_partitioning().partition_count() == 1 {
@@ -1224,7 +1244,7 @@ impl DefaultPhysicalPlanner {
12241244
Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
12251245
}
12261246
LogicalPlan::Unnest(Unnest { input, column, schema, options }) => {
1227-
let input = self.create_initial_plan(input, session_state).await?;
1247+
let input = self.create_initial_plan(input, session_state, ctx).await?;
12281248
let column_exec = schema.index_of_column(column)
12291249
.map(|idx| Column::new(&column.name, idx))?;
12301250
let schema = SchemaRef::new(schema.as_ref().to_owned().into());
@@ -1277,7 +1297,7 @@ impl DefaultPhysicalPlanner {
12771297
"Unsupported logical plan: Analyze must be root of the plan"
12781298
),
12791299
LogicalPlan::Extension(e) => {
1280-
let physical_inputs = self.create_initial_plan_multi(e.node.inputs(), session_state).await?;
1300+
let physical_inputs = self.create_initial_plan_multi(e.node.inputs(), session_state, ctx).await?;
12811301

12821302
let mut maybe_plan = None;
12831303
for planner in &self.extension_planners {
@@ -1314,12 +1334,17 @@ impl DefaultPhysicalPlanner {
13141334
}
13151335
}
13161336
LogicalPlan::RecursiveQuery(RecursiveQuery { name, static_term, recursive_term, is_distinct }) => {
1317-
let static_term = self.create_initial_plan(static_term, session_state).await?;
1318-
let recursive_term = self.create_initial_plan(recursive_term, session_state).await?;
1337+
let name = format!("{}-{}", name, new_recursive_cte_physical_plan_branch_number());
1338+
1339+
let ctx = Some(&name);
1340+
1341+
let static_term = self.create_initial_plan(static_term, session_state, ctx).await?;
1342+
let recursive_term = self.create_initial_plan(recursive_term, session_state, ctx).await?;
13191343

13201344
Ok(Arc::new(RecursiveQueryExec::new(name.clone(), static_term, recursive_term, *is_distinct)))
13211345
}
1322-
LogicalPlan::NamedRelation(NamedRelation {name, schema}) => {
1346+
LogicalPlan::NamedRelation(NamedRelation {schema, ..}) => {
1347+
let name = ctx.expect("NamedRelation must have a context that contains the recursive query's branch name");
13231348
// Named relations is how we represent access to any sort of dynamic data provider. They
13241349
// differ from tables in the sense that they can start existing dynamically during the
13251350
// execution of a query and then disappear before it even finishes.
@@ -1866,6 +1891,8 @@ impl DefaultPhysicalPlanner {
18661891
logical_plan: &LogicalPlan,
18671892
session_state: &SessionState,
18681893
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1894+
reset_recursive_cte_physical_plan_branch_number();
1895+
18691896
if let LogicalPlan::Explain(e) = logical_plan {
18701897
use PlanType::*;
18711898
let mut stringified_plans = vec![];
@@ -1881,7 +1908,7 @@ impl DefaultPhysicalPlanner {
18811908

18821909
if !config.logical_plan_only && e.logical_optimization_succeeded {
18831910
match self
1884-
.create_initial_plan(e.plan.as_ref(), session_state)
1911+
.create_initial_plan(e.plan.as_ref(), session_state, None)
18851912
.await
18861913
{
18871914
Ok(input) => {

datafusion/optimizer/src/optimize_projections.rs

+2
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ fn optimize_projections(
162162
.collect::<Vec<_>>()
163163
}
164164
LogicalPlan::EmptyRelation(_)
165+
| LogicalPlan::NamedRelation(_)
166+
| LogicalPlan::RecursiveQuery(_)
165167
| LogicalPlan::Statement(_)
166168
| LogicalPlan::Values(_)
167169
| LogicalPlan::Extension(_)

datafusion/physical-plan/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,4 @@ uuid = { version = "^1.2", features = ["v4"] }
6363
[dev-dependencies]
6464
rstest = { workspace = true }
6565
termtree = "0.4.1"
66+
tokio = { version = "1.28", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }

datafusion/physical-plan/src/recursive_query.rs

+1
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ impl RecursiveQueryStream {
296296
Err(e) => {
297297
return Poll::Ready(Some(Err(DataFusionError::ArrowError(
298298
ArrowError::from_external_error(Box::new(e)),
299+
None,
299300
))));
300301
}
301302
}

datafusion/sql/src/query.rs

+12-10
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
133133
static_metadata,
134134
)?;
135135

136+
let name = cte_name.clone();
137+
136138
// Step 2.2: Create a temporary relation logical plan that will be used
137139
// as the input to the recursive term
138140
let named_relation = LogicalPlanBuilder::named_relation(
139-
cte_name.as_str(),
141+
&name,
140142
Arc::new(named_relation_schema),
141143
)
142144
.build()?;
@@ -157,22 +159,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
157159

158160
// ---------- Step 4: Create the final plan ------------------
159161
// Step 4.1: Compile the final plan
160-
let final_plan = LogicalPlanBuilder::from(static_plan)
161-
.to_recursive_query(
162-
cte_name.clone(),
163-
recursive_plan,
164-
distinct,
165-
)?
162+
let logical_plan = LogicalPlanBuilder::from(static_plan)
163+
.to_recursive_query(name, recursive_plan, distinct)?
166164
.build()?;
167165

166+
let final_plan =
167+
self.apply_table_alias(logical_plan, cte.alias)?;
168+
168169
// Step 4.2: Remove the temporary relation from the planning context and replace it
169170
// with the final plan.
170171
planner_context.insert_cte(cte_name.clone(), final_plan);
171172
}
172173
_ => {
173-
return Err(DataFusionError::SQL(ParserError(
174-
"Invalid recursive CTE".to_string(),
175-
)));
174+
return Err(DataFusionError::SQL(
175+
ParserError("Invalid recursive CTE".to_string()),
176+
None,
177+
));
176178
}
177179
};
178180
} else {

datafusion/sql/tests/sql_integration.rs

-22
Original file line numberDiff line numberDiff line change
@@ -1387,28 +1387,6 @@ fn select_interval_out_of_range() {
13871387
);
13881388
}
13891389

1390-
#[test]
1391-
fn select_array_no_common_type() {
1392-
let sql = "SELECT [1, true, null]";
1393-
let err = logical_plan(sql).expect_err("query should have failed");
1394-
1395-
// HashSet doesn't guarantee order
1396-
assert_contains!(
1397-
err.strip_backtrace(),
1398-
"This feature is not implemented: Arrays with different types are not supported: "
1399-
);
1400-
}
1401-
1402-
#[test]
1403-
fn select_array_non_literal_type() {
1404-
let sql = "SELECT [now()]";
1405-
let err = logical_plan(sql).expect_err("query should have failed");
1406-
assert_eq!(
1407-
"This feature is not implemented: Arrays with elements other than literal are not supported: now()",
1408-
err.strip_backtrace()
1409-
);
1410-
}
1411-
14121390
#[test]
14131391
fn select_simple_aggregate_with_groupby_and_column_is_in_aggregate_and_groupby() {
14141392
quick_test(

0 commit comments

Comments
 (0)