Skip to content
66 changes: 28 additions & 38 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

97 changes: 97 additions & 0 deletions datafusion/core/src/datasource/cte_worktable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! CteWorkTable implementation used for recursive queries

use std::any::Any;
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::not_impl_err;

use crate::{
error::Result,
logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown},
physical_plan::ExecutionPlan,
};

use datafusion_common::DataFusionError;

use crate::datasource::{TableProvider, TableType};
use crate::execution::context::SessionState;

/// The temporary working table where the previous iteration of a recursive query is stored
/// Naming is based on PostgreSQL's implementation.
/// See here for more details: www.postgresql.org/docs/11/queries-with.html#id-1.5.6.12.5.4
pub struct CteWorkTable {
/// The name of the CTE work table
// WIP, see https://github.com/apache/arrow-datafusion/issues/462
#[allow(dead_code)]
name: String,
/// This schema must be shared across both the static and recursive terms of a recursive query
table_schema: SchemaRef,
}

impl CteWorkTable {
/// construct a new CteWorkTable with the given name and schema
/// This schema must match the schema of the recursive term of the query
/// Since the scan method will contain an physical plan that assumes this schema
pub fn new(name: &str, table_schema: SchemaRef) -> Self {
Self {
name: name.to_owned(),
table_schema,
}
}
}

#[async_trait]
impl TableProvider for CteWorkTable {
fn as_any(&self) -> &dyn Any {
self
}

fn get_logical_plan(&self) -> Option<&LogicalPlan> {
None
}

fn schema(&self) -> SchemaRef {
self.table_schema.clone()
}

fn table_type(&self) -> TableType {
TableType::Temporary
}

async fn scan(
&self,
_state: &SessionState,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("scan not implemented for CteWorkTable yet")
}

fn supports_filter_pushdown(
&self,
_filter: &Expr,
) -> Result<TableProviderFilterPushDown> {
// TODO: should we support filter pushdown?
Ok(TableProviderFilterPushDown::Unsupported)
}
}
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
//! [`ListingTable`]: crate::datasource::listing::ListingTable

pub mod avro_to_arrow;
pub mod cte_worktable;
pub mod default_table_source;
pub mod empty;
pub mod file_format;
Expand Down
13 changes: 13 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod parquet;
use crate::{
catalog::{CatalogList, MemoryCatalogList},
datasource::{
cte_worktable::CteWorkTable,
function::{TableFunction, TableFunctionImpl},
listing::{ListingOptions, ListingTable},
provider::TableProviderFactory,
Expand Down Expand Up @@ -1899,6 +1900,18 @@ impl<'a> ContextProvider for SessionContextProvider<'a> {
Ok(provider_as_source(provider))
}

/// Create a new CTE work table for a recursive CTE logical plan
/// This table will be used in conjunction with a Worktable physical plan
/// to read and write each iteration of a recursive CTE
fn create_cte_work_table(
&self,
name: &str,
schema: SchemaRef,
) -> Result<Arc<dyn TableSource>> {
let table = Arc::new(CteWorkTable::new(name, schema));
Ok(provider_as_source(table))
}

fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
self.state.scalar_functions().get(name).cloned()
}
Expand Down
7 changes: 5 additions & 2 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ use datafusion_expr::expr::{
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
DescribeTable, DmlStatement, ScalarFunctionDefinition, StringifiedPlan, WindowFrame,
WindowFrameBound, WriteOp,
DescribeTable, DmlStatement, RecursiveQuery, ScalarFunctionDefinition,
StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
Expand Down Expand Up @@ -1290,6 +1290,9 @@ impl DefaultPhysicalPlanner {
Ok(plan)
}
}
LogicalPlan::RecursiveQuery(RecursiveQuery { name: _, static_term: _, recursive_term: _, is_distinct: _,.. }) => {
not_impl_err!("Physical counterpart of RecursiveQuery is not implemented yet")
}
};
exec_plan
}.boxed()
Expand Down
24 changes: 24 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ use datafusion_common::{
ScalarValue, TableReference, ToDFSchema, UnnestOptions,
};

use super::plan::RecursiveQuery;

/// Default table name for unnamed table
pub const UNNAMED_TABLE: &str = "?table?";

Expand Down Expand Up @@ -121,6 +123,28 @@ impl LogicalPlanBuilder {
}))
}

/// Convert a regular plan into a recursive query.
/// `is_distinct` indicates whether the recursive term should be de-duplicated (`UNION`) after each iteration or not (`UNION ALL`).
pub fn to_recursive_query(
&self,
name: String,
recursive_term: LogicalPlan,
is_distinct: bool,
) -> Result<Self> {
// TODO: we need to do a bunch of validation here. Maybe more.
if is_distinct {
return Err(DataFusionError::NotImplemented(
"Recursive queries with a distinct 'UNION' (in which the previous iteration's results will be de-duplicated) is not supported".to_string(),
));
}
Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
name,
static_term: Arc::new(self.plan.clone()),
recursive_term: Arc::new(recursive_term),
is_distinct,
})))
}

/// Create a values list based relation, and the schema is inferred from data, consuming
/// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
Expand Down
Loading