Skip to content

Commit 531d541

Browse files
committed
add sql -> logical plan support
* impl cte as work table * move SharedState to continuance * impl WorkTableState wip: readying pr to implement only logical plan
1 parent 128b2c6 commit 531d541

File tree

15 files changed

+323
-29
lines changed

15 files changed

+323
-29
lines changed

datafusion-cli/Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/common/src/dfschema.rs

+5
Original file line numberDiff line numberDiff line change
@@ -915,6 +915,11 @@ impl DFField {
915915
self.field = f.into();
916916
self
917917
}
918+
919+
pub fn with_qualifier(mut self, qualifier: impl Into<OwnedTableReference>) -> Self {
920+
self.qualifier = Some(qualifier.into());
921+
self
922+
}
918923
}
919924

920925
impl From<FieldRef> for DFField {

datafusion/core/src/datasource/cte.rs

+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! CteWorkTable implementation used for recursive queries
19+
20+
use std::any::Any;
21+
use std::sync::Arc;
22+
23+
use arrow::datatypes::SchemaRef;
24+
use async_trait::async_trait;
25+
use datafusion_common::not_impl_err;
26+
27+
use crate::{
28+
error::Result,
29+
logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown},
30+
physical_plan::ExecutionPlan,
31+
};
32+
33+
use datafusion_common::DataFusionError;
34+
35+
use crate::datasource::{TableProvider, TableType};
36+
use crate::execution::context::SessionState;
37+
38+
/// TODO: add docs
39+
pub struct CteWorkTable {
40+
name: String,
41+
table_schema: SchemaRef,
42+
}
43+
44+
impl CteWorkTable {
45+
/// TODO: add doc
46+
pub fn new(name: &str, table_schema: SchemaRef) -> Self {
47+
Self {
48+
name: name.to_owned(),
49+
table_schema,
50+
}
51+
}
52+
}
53+
54+
#[async_trait]
55+
impl TableProvider for CteWorkTable {
56+
fn as_any(&self) -> &dyn Any {
57+
self
58+
}
59+
60+
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
61+
None
62+
}
63+
64+
fn schema(&self) -> SchemaRef {
65+
self.table_schema.clone()
66+
}
67+
68+
fn table_type(&self) -> TableType {
69+
TableType::Temporary
70+
}
71+
72+
async fn scan(
73+
&self,
74+
_state: &SessionState,
75+
_projection: Option<&Vec<usize>>,
76+
_filters: &[Expr],
77+
_limit: Option<usize>,
78+
) -> Result<Arc<dyn ExecutionPlan>> {
79+
not_impl_err!("scan not implemented for CteWorkTable yet")
80+
}
81+
82+
fn supports_filter_pushdown(
83+
&self,
84+
_filter: &Expr,
85+
) -> Result<TableProviderFilterPushDown> {
86+
// TODO: should we support filter pushdown?
87+
Ok(TableProviderFilterPushDown::Unsupported)
88+
}
89+
}

datafusion/core/src/datasource/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
//! [`ListingTable`]: crate::datasource::listing::ListingTable
2121
2222
pub mod avro_to_arrow;
23+
pub mod cte;
2324
pub mod default_table_source;
2425
pub mod empty;
2526
pub mod file_format;

datafusion/core/src/execution/context/mod.rs

+10
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ mod parquet;
2626
use crate::{
2727
catalog::{CatalogList, MemoryCatalogList},
2828
datasource::{
29+
cte::CteWorkTable,
2930
function::{TableFunction, TableFunctionImpl},
3031
listing::{ListingOptions, ListingTable},
3132
provider::TableProviderFactory,
@@ -1899,6 +1900,15 @@ impl<'a> ContextProvider for SessionContextProvider<'a> {
18991900
Ok(provider_as_source(provider))
19001901
}
19011902

1903+
fn create_cte_work_table(
1904+
&self,
1905+
name: &str,
1906+
schema: SchemaRef,
1907+
) -> Result<Arc<dyn TableSource>> {
1908+
let table = Arc::new(CteWorkTable::new(name, schema));
1909+
Ok(provider_as_source(table))
1910+
}
1911+
19021912
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
19031913
self.state.scalar_functions().get(name).cloned()
19041914
}

datafusion/core/src/physical_planner.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ use datafusion_expr::expr::{
8787
use datafusion_expr::expr_rewriter::unnormalize_cols;
8888
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
8989
use datafusion_expr::{
90-
DescribeTable, DmlStatement, ScalarFunctionDefinition, StringifiedPlan, WindowFrame,
91-
WindowFrameBound, WriteOp,
90+
DescribeTable, DmlStatement, RecursiveQuery, ScalarFunctionDefinition,
91+
StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
9292
};
9393
use datafusion_physical_expr::expressions::Literal;
9494
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
@@ -1311,6 +1311,9 @@ impl DefaultPhysicalPlanner {
13111311
Ok(plan)
13121312
}
13131313
}
1314+
LogicalPlan::RecursiveQuery(RecursiveQuery { name: _, static_term: _, recursive_term: _, is_distinct: _,.. }) => {
1315+
not_impl_err!("Physical counterpart of RecursiveQuery is not implemented yet")
1316+
}
13141317
};
13151318
exec_plan
13161319
}.boxed()

datafusion/expr/src/logical_plan/builder.rs

+23
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ use datafusion_common::{
5555
ScalarValue, TableReference, ToDFSchema, UnnestOptions,
5656
};
5757

58+
use super::plan::RecursiveQuery;
59+
5860
/// Default table name for unnamed table
5961
pub const UNNAMED_TABLE: &str = "?table?";
6062

@@ -121,6 +123,27 @@ impl LogicalPlanBuilder {
121123
}))
122124
}
123125

126+
/// Convert a regular plan into a recursive query.
127+
pub fn to_recursive_query(
128+
&self,
129+
name: String,
130+
recursive_term: LogicalPlan,
131+
is_distinct: bool,
132+
) -> Result<Self> {
133+
// TODO: we need to do a bunch of validation here. Maybe more.
134+
if is_distinct {
135+
return Err(DataFusionError::NotImplemented(
136+
"Recursive queries with distinct is not supported".to_string(),
137+
));
138+
}
139+
Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
140+
name,
141+
static_term: Arc::new(self.plan.clone()),
142+
recursive_term: Arc::new(recursive_term),
143+
is_distinct,
144+
})))
145+
}
146+
124147
/// Create a values list based relation, and the schema is inferred from data, consuming
125148
/// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
126149
/// documentation for more details.

datafusion/expr/src/logical_plan/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ pub use plan::{
3636
projection_schema, Aggregate, Analyze, CrossJoin, DescribeTable, Distinct,
3737
DistinctOn, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint,
3838
JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare, Projection,
39-
Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
40-
ToStringifiedPlan, Union, Unnest, Values, Window,
39+
RecursiveQuery, Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias,
40+
TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
4141
};
4242
pub use statement::{
4343
SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd,

datafusion/expr/src/logical_plan/plan.rs

+46
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ pub enum LogicalPlan {
154154
/// Unnest a column that contains a nested list type such as an
155155
/// ARRAY. This is used to implement SQL `UNNEST`
156156
Unnest(Unnest),
157+
/// A variadic query (e.g. "Recursive CTEs")
158+
RecursiveQuery(RecursiveQuery),
157159
}
158160

159161
impl LogicalPlan {
@@ -191,6 +193,10 @@ impl LogicalPlan {
191193
LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(),
192194
LogicalPlan::Ddl(ddl) => ddl.schema(),
193195
LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
196+
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
197+
// we take the schema of the static term as the schema of the entire recursive query
198+
static_term.schema()
199+
}
194200
}
195201
}
196202

@@ -243,6 +249,10 @@ impl LogicalPlan {
243249
| LogicalPlan::TableScan(_) => {
244250
vec![self.schema()]
245251
}
252+
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
253+
// return only the schema of the static term
254+
static_term.all_schemas()
255+
}
246256
// return children schemas
247257
LogicalPlan::Limit(_)
248258
| LogicalPlan::Subquery(_)
@@ -384,6 +394,7 @@ impl LogicalPlan {
384394
.try_for_each(f),
385395
// plans without expressions
386396
LogicalPlan::EmptyRelation(_)
397+
| LogicalPlan::RecursiveQuery(_)
387398
| LogicalPlan::Subquery(_)
388399
| LogicalPlan::SubqueryAlias(_)
389400
| LogicalPlan::Limit(_)
@@ -430,6 +441,11 @@ impl LogicalPlan {
430441
LogicalPlan::Ddl(ddl) => ddl.inputs(),
431442
LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
432443
LogicalPlan::Prepare(Prepare { input, .. }) => vec![input],
444+
LogicalPlan::RecursiveQuery(RecursiveQuery {
445+
static_term,
446+
recursive_term,
447+
..
448+
}) => vec![static_term, recursive_term],
433449
// plans without inputs
434450
LogicalPlan::TableScan { .. }
435451
| LogicalPlan::Statement { .. }
@@ -510,6 +526,9 @@ impl LogicalPlan {
510526
cross.left.head_output_expr()
511527
}
512528
}
529+
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
530+
static_term.head_output_expr()
531+
}
513532
LogicalPlan::Union(union) => Ok(Some(Expr::Column(
514533
union.schema.fields()[0].qualified_column(),
515534
))),
@@ -835,6 +854,14 @@ impl LogicalPlan {
835854
};
836855
Ok(LogicalPlan::Distinct(distinct))
837856
}
857+
LogicalPlan::RecursiveQuery(RecursiveQuery {
858+
name, is_distinct, ..
859+
}) => Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
860+
name: name.clone(),
861+
static_term: Arc::new(inputs[0].clone()),
862+
recursive_term: Arc::new(inputs[1].clone()),
863+
is_distinct: *is_distinct,
864+
})),
838865
LogicalPlan::Analyze(a) => {
839866
assert!(expr.is_empty());
840867
assert_eq!(inputs.len(), 1);
@@ -1073,6 +1100,7 @@ impl LogicalPlan {
10731100
}),
10741101
LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
10751102
LogicalPlan::EmptyRelation(_) => Some(0),
1103+
LogicalPlan::RecursiveQuery(_) => None,
10761104
LogicalPlan::Subquery(_) => None,
10771105
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
10781106
LogicalPlan::Limit(Limit { fetch, .. }) => *fetch,
@@ -1408,6 +1436,11 @@ impl LogicalPlan {
14081436
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
14091437
match self.0 {
14101438
LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"),
1439+
LogicalPlan::RecursiveQuery(RecursiveQuery {
1440+
is_distinct, ..
1441+
}) => {
1442+
write!(f, "RecursiveQuery: is_distinct={}", is_distinct)
1443+
}
14111444
LogicalPlan::Values(Values { ref values, .. }) => {
14121445
let str_values: Vec<_> = values
14131446
.iter()
@@ -1718,6 +1751,19 @@ pub struct EmptyRelation {
17181751
pub schema: DFSchemaRef,
17191752
}
17201753

1754+
/// A variadic query operation
1755+
#[derive(Clone, PartialEq, Eq, Hash)]
1756+
pub struct RecursiveQuery {
1757+
/// Name of the query
1758+
pub name: String,
1759+
/// The static term
1760+
pub static_term: Arc<LogicalPlan>,
1761+
/// The recursive term
1762+
pub recursive_term: Arc<LogicalPlan>,
1763+
/// Distinction
1764+
pub is_distinct: bool,
1765+
}
1766+
17211767
/// Values expression. See
17221768
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
17231769
/// documentation for more details.

datafusion/optimizer/src/common_subexpr_eliminate.rs

+1
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@ impl OptimizerRule for CommonSubexprEliminate {
364364
| LogicalPlan::Dml(_)
365365
| LogicalPlan::Copy(_)
366366
| LogicalPlan::Unnest(_)
367+
| LogicalPlan::RecursiveQuery(_)
367368
| LogicalPlan::Prepare(_) => {
368369
// apply the optimization to all inputs of the plan
369370
utils::optimize_children(self, plan, config)?

datafusion/optimizer/src/optimize_projections.rs

+1
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ fn optimize_projections(
162162
.collect::<Vec<_>>()
163163
}
164164
LogicalPlan::EmptyRelation(_)
165+
| LogicalPlan::RecursiveQuery(_)
165166
| LogicalPlan::Statement(_)
166167
| LogicalPlan::Values(_)
167168
| LogicalPlan::Extension(_)

datafusion/proto/src/logical_plan/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -1702,6 +1702,9 @@ impl AsLogicalPlan for LogicalPlanNode {
17021702
LogicalPlan::DescribeTable(_) => Err(proto_error(
17031703
"LogicalPlan serde is not yet implemented for DescribeTable",
17041704
)),
1705+
LogicalPlan::RecursiveQuery(_) => Err(proto_error(
1706+
"LogicalPlan serde is not yet implemented for RecursiveQuery",
1707+
)),
17051708
}
17061709
}
17071710
}

datafusion/sql/src/planner.rs

+9
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,15 @@ pub trait ContextProvider {
6161
not_impl_err!("Table Functions are not supported")
6262
}
6363

64+
/// TODO: add doc
65+
fn create_cte_work_table(
66+
&self,
67+
_name: &str,
68+
_schema: SchemaRef,
69+
) -> Result<Arc<dyn TableSource>> {
70+
not_impl_err!("Recursive CTE is not supported")
71+
}
72+
6473
/// Getter for a UDF description
6574
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>>;
6675
/// Getter for a UDAF description

0 commit comments

Comments
 (0)