Skip to content

Commit a786921

Browse files
matthewgappalamb
andauthored
Recursive CTEs: Stage 2 - add support for sql -> logical plan generation (#8839)
* add config flag for recursive ctes update docs from script update slt test for doc change * restore testing pin * add sql -> logical plan support * impl cte as work table * move SharedState to continuance * impl WorkTableState wip: readying pr to implement only logical plan fix sql integration test wip: add sql test for logical plan wip: format test assertion * wip: remove uncessary with qualifier method some docs more docs * Add comments to `RecursiveQuery` * Update datfusion-cli Cargo.lock * Fix clippy * better errors and comments * add doc comment with rationale for create_cte_worktable method * wip: tweak --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 3a9e23d commit a786921

File tree

14 files changed

+431
-67
lines changed

14 files changed

+431
-67
lines changed

datafusion-cli/Cargo.lock

Lines changed: 28 additions & 38 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! CteWorkTable implementation used for recursive queries
19+
20+
use std::any::Any;
21+
use std::sync::Arc;
22+
23+
use arrow::datatypes::SchemaRef;
24+
use async_trait::async_trait;
25+
use datafusion_common::not_impl_err;
26+
27+
use crate::{
28+
error::Result,
29+
logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown},
30+
physical_plan::ExecutionPlan,
31+
};
32+
33+
use datafusion_common::DataFusionError;
34+
35+
use crate::datasource::{TableProvider, TableType};
36+
use crate::execution::context::SessionState;
37+
38+
/// The temporary working table where the previous iteration of a recursive query is stored
39+
/// Naming is based on PostgreSQL's implementation.
40+
/// See here for more details: www.postgresql.org/docs/11/queries-with.html#id-1.5.6.12.5.4
41+
pub struct CteWorkTable {
42+
/// The name of the CTE work table
43+
// WIP, see https://github.com/apache/arrow-datafusion/issues/462
44+
#[allow(dead_code)]
45+
name: String,
46+
/// This schema must be shared across both the static and recursive terms of a recursive query
47+
table_schema: SchemaRef,
48+
}
49+
50+
impl CteWorkTable {
51+
/// construct a new CteWorkTable with the given name and schema
52+
/// This schema must match the schema of the recursive term of the query
53+
/// Since the scan method will contain an physical plan that assumes this schema
54+
pub fn new(name: &str, table_schema: SchemaRef) -> Self {
55+
Self {
56+
name: name.to_owned(),
57+
table_schema,
58+
}
59+
}
60+
}
61+
62+
#[async_trait]
63+
impl TableProvider for CteWorkTable {
64+
fn as_any(&self) -> &dyn Any {
65+
self
66+
}
67+
68+
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
69+
None
70+
}
71+
72+
fn schema(&self) -> SchemaRef {
73+
self.table_schema.clone()
74+
}
75+
76+
fn table_type(&self) -> TableType {
77+
TableType::Temporary
78+
}
79+
80+
async fn scan(
81+
&self,
82+
_state: &SessionState,
83+
_projection: Option<&Vec<usize>>,
84+
_filters: &[Expr],
85+
_limit: Option<usize>,
86+
) -> Result<Arc<dyn ExecutionPlan>> {
87+
not_impl_err!("scan not implemented for CteWorkTable yet")
88+
}
89+
90+
fn supports_filter_pushdown(
91+
&self,
92+
_filter: &Expr,
93+
) -> Result<TableProviderFilterPushDown> {
94+
// TODO: should we support filter pushdown?
95+
Ok(TableProviderFilterPushDown::Unsupported)
96+
}
97+
}

datafusion/core/src/datasource/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
//! [`ListingTable`]: crate::datasource::listing::ListingTable
2121
2222
pub mod avro_to_arrow;
23+
pub mod cte_worktable;
2324
pub mod default_table_source;
2425
pub mod empty;
2526
pub mod file_format;

datafusion/core/src/execution/context/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ mod parquet;
2626
use crate::{
2727
catalog::{CatalogList, MemoryCatalogList},
2828
datasource::{
29+
cte_worktable::CteWorkTable,
2930
function::{TableFunction, TableFunctionImpl},
3031
listing::{ListingOptions, ListingTable},
3132
provider::TableProviderFactory,
@@ -1899,6 +1900,18 @@ impl<'a> ContextProvider for SessionContextProvider<'a> {
18991900
Ok(provider_as_source(provider))
19001901
}
19011902

1903+
/// Create a new CTE work table for a recursive CTE logical plan
1904+
/// This table will be used in conjunction with a Worktable physical plan
1905+
/// to read and write each iteration of a recursive CTE
1906+
fn create_cte_work_table(
1907+
&self,
1908+
name: &str,
1909+
schema: SchemaRef,
1910+
) -> Result<Arc<dyn TableSource>> {
1911+
let table = Arc::new(CteWorkTable::new(name, schema));
1912+
Ok(provider_as_source(table))
1913+
}
1914+
19021915
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
19031916
self.state.scalar_functions().get(name).cloned()
19041917
}

datafusion/core/src/physical_planner.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ use datafusion_expr::expr::{
8787
use datafusion_expr::expr_rewriter::unnormalize_cols;
8888
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
8989
use datafusion_expr::{
90-
DescribeTable, DmlStatement, ScalarFunctionDefinition, StringifiedPlan, WindowFrame,
91-
WindowFrameBound, WriteOp,
90+
DescribeTable, DmlStatement, RecursiveQuery, ScalarFunctionDefinition,
91+
StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
9292
};
9393
use datafusion_physical_expr::expressions::Literal;
9494
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
@@ -1290,6 +1290,9 @@ impl DefaultPhysicalPlanner {
12901290
Ok(plan)
12911291
}
12921292
}
1293+
LogicalPlan::RecursiveQuery(RecursiveQuery { name: _, static_term: _, recursive_term: _, is_distinct: _,.. }) => {
1294+
not_impl_err!("Physical counterpart of RecursiveQuery is not implemented yet")
1295+
}
12931296
};
12941297
exec_plan
12951298
}.boxed()

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ use datafusion_common::{
5555
ScalarValue, TableReference, ToDFSchema, UnnestOptions,
5656
};
5757

58+
use super::plan::RecursiveQuery;
59+
5860
/// Default table name for unnamed table
5961
pub const UNNAMED_TABLE: &str = "?table?";
6062

@@ -121,6 +123,28 @@ impl LogicalPlanBuilder {
121123
}))
122124
}
123125

126+
/// Convert a regular plan into a recursive query.
127+
/// `is_distinct` indicates whether the recursive term should be de-duplicated (`UNION`) after each iteration or not (`UNION ALL`).
128+
pub fn to_recursive_query(
129+
&self,
130+
name: String,
131+
recursive_term: LogicalPlan,
132+
is_distinct: bool,
133+
) -> Result<Self> {
134+
// TODO: we need to do a bunch of validation here. Maybe more.
135+
if is_distinct {
136+
return Err(DataFusionError::NotImplemented(
137+
"Recursive queries with a distinct 'UNION' (in which the previous iteration's results will be de-duplicated) is not supported".to_string(),
138+
));
139+
}
140+
Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
141+
name,
142+
static_term: Arc::new(self.plan.clone()),
143+
recursive_term: Arc::new(recursive_term),
144+
is_distinct,
145+
})))
146+
}
147+
124148
/// Create a values list based relation, and the schema is inferred from data, consuming
125149
/// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
126150
/// documentation for more details.

0 commit comments

Comments
 (0)