From f8aa7ee9b39e5f2591aa44a9ae07ddfdaf3388b0 Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Sat, 16 Sep 2023 20:24:56 -0700 Subject: [PATCH 01/15] partway through porting over isidentical's work --- datafusion/execution/src/task.rs | 40 +++++ datafusion/physical-plan/src/continuance.rs | 166 ++++++++++++++++++++ testing | 2 +- 3 files changed, 207 insertions(+), 1 deletion(-) create mode 100644 datafusion/physical-plan/src/continuance.rs diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs index 52c183b1612c..cc05a49123e6 100644 --- a/datafusion/execution/src/task.rs +++ b/datafusion/execution/src/task.rs @@ -33,6 +33,14 @@ use crate::{ runtime_env::{RuntimeConfig, RuntimeEnv}, }; +use arrow::{error::Result as ArrowResult, record_batch::RecordBatch}; +use futures::channel::mpsc::Receiver as SingleChannelReceiver; +// use futures::lock::Mutex; +use parking_lot::Mutex; +// use futures:: + +type RelationHandler = SingleChannelReceiver>; + /// Task Execution Context /// /// A [`TaskContext`] contains the state available during a single @@ -56,6 +64,8 @@ pub struct TaskContext { window_functions: HashMap>, /// Runtime environment associated with this task context runtime: Arc, + /// Registered relation handlers + relation_handlers: Mutex>, } impl Default for TaskContext { @@ -72,6 +82,7 @@ impl Default for TaskContext { aggregate_functions: HashMap::new(), window_functions: HashMap::new(), runtime: Arc::new(runtime), + relation_handlers: Mutex::new(HashMap::new()), } } } @@ -99,6 +110,7 @@ impl TaskContext { aggregate_functions, window_functions, runtime, + relation_handlers: Mutex::new(HashMap::new()), } } @@ -171,6 +183,34 @@ impl TaskContext { self.runtime = runtime; self } + + /// Register a new relation handler. If a handler with the same name already exists + /// this function will return an error. + pub fn push_relation_handler( + &self, + name: String, + handler: RelationHandler, + ) -> Result<()> { + let mut handlers = self.relation_handlers.lock(); + if handlers.contains_key(&name) { + return Err(DataFusionError::Internal(format!( + "Relation handler {} already registered", + name + ))); + } + handlers.insert(name, handler); + Ok(()) + } + + /// Retrieve the relation handler for the given name. It will remove the handler from + /// the storage if it exists, and return it as is. + pub fn pop_relation_handler(&self, name: String) -> Result { + let mut handlers = self.relation_handlers.lock(); + + handlers.remove(name.as_str()).ok_or_else(|| { + DataFusionError::Internal(format!("Relation handler {} not registered", name)) + }) + } } impl FunctionRegistry for TaskContext { diff --git a/datafusion/physical-plan/src/continuance.rs b/datafusion/physical-plan/src/continuance.rs new file mode 100644 index 000000000000..1b2d6ce93db1 --- /dev/null +++ b/datafusion/physical-plan/src/continuance.rs @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines the continuance query plan + +use std::any::Any; +use std::sync::Arc; + +// use crate::error::{DataFusionError, Result}; +// use crate::physical_plan::{ +// DisplayFormatType, Distribution, ExecutionPlan, Partitioning, +// }; +use arrow::datatypes::SchemaRef; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::Partitioning; + +use crate::{DisplayAs, DisplayFormatType, ExecutionPlan}; + +use super::expressions::PhysicalSortExpr; +use super::stream::RecordBatchReceiverStream; +use super::{ + metrics::{ExecutionPlanMetricsSet, MetricsSet}, + SendableRecordBatchStream, Statistics, +}; +use datafusion_common::{DataFusionError, Result}; + +// use crate::exe::context::TaskContext; + +/// A temporary "working table" operation wehre the input data will be +/// taken from the named handle during the execution and will be re-published +/// as is (kind of like a mirror). +/// +/// Most notably used in the implementation of recursive queries where the +/// underlying relation does not exist yet but the data will come as the previous +/// term is evaluated. +#[derive(Debug)] +pub struct ContinuanceExec { + /// Name of the relation handler + name: String, + /// The schema of the stream + schema: SchemaRef, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, +} + +impl ContinuanceExec { + /// Create a new execution plan for a continuance stream. The given relation + /// handler must exist in the task context before calling [`execute`] on this + /// plan. + pub fn new(name: String, schema: SchemaRef) -> Self { + Self { + name, + schema, + metrics: ExecutionPlanMetricsSet::new(), + } + } +} + +impl DisplayAs for ContinuanceExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + // TODO: add more details + write!(f, "ContinuanceExec: name={}", self.name) + } + } + } +} + +impl ExecutionPlan for ContinuanceExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn children(&self) -> Vec> { + vec![] + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + fn maintains_input_order(&self) -> Vec { + vec![false] + } + + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false] + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + Ok(Arc::new(ContinuanceExec::new( + self.name.clone(), + self.schema.clone(), + ))) + } + + /// This plan does not come with any special streams, but rather we use + /// the existing [`RecordBatchReceiverStream`] to receive the data from + /// the registered handle. + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + // Continuance streams must be the plan base. + if partition != 0 { + return Err(DataFusionError::Internal(format!( + "ContinuanceExec got an invalid partition {} (expected 0)", + partition + ))); + } + + // let stream = Box::pin(CombinedRecordBatchStream::new( + // self.schema(), + // input_stream_vec, + // )); + // return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics))); + + // The relation handler must be already registered by the + // parent op. + let receiver = context.pop_relation_handler(self.name.clone())?; + // TODO: this looks wrong. + Ok(RecordBatchReceiverStream::builder(self.schema.clone(), 1).build()) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn statistics(&self) -> Statistics { + Statistics::default() + } +} + +#[cfg(test)] +mod tests {} diff --git a/testing b/testing index 98fceecd024d..37f29510ce97 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 98fceecd024dccd2f8a00e32fc144975f218acf4 +Subproject commit 37f29510ce97cd491b8e6ed75866c6533a5ea2a1 From 3d6ee5e779ff342f813502384ed33c25ec4744c7 Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Sat, 16 Sep 2023 22:29:33 -0700 Subject: [PATCH 02/15] Continuing implementation with fixes and improvements Lint fixes --- datafusion-cli/Cargo.lock | 2 + datafusion/common/src/dfschema.rs | 5 + datafusion/core/src/physical_planner.rs | 28 +- datafusion/execution/Cargo.toml | 8 + datafusion/execution/src/task.rs | 7 +- datafusion/expr/src/logical_plan/builder.rs | 35 ++ datafusion/expr/src/logical_plan/mod.rs | 5 +- datafusion/expr/src/logical_plan/plan.rs | 69 ++++ .../optimizer/src/common_subexpr_eliminate.rs | 2 + datafusion/physical-plan/Cargo.toml | 14 +- datafusion/physical-plan/src/continuance.rs | 33 +- datafusion/physical-plan/src/filter.rs | 4 +- datafusion/physical-plan/src/lib.rs | 2 + .../physical-plan/src/recursive_query.rs | 355 ++++++++++++++++++ datafusion/proto/src/logical_plan/mod.rs | 6 + datafusion/sql/src/query.rs | 140 ++++++- 16 files changed, 675 insertions(+), 40 deletions(-) create mode 100644 datafusion/physical-plan/src/recursive_query.rs diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 06bc14c5b656..40b1cccc5658 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1202,6 +1202,7 @@ dependencies = [ "parking_lot", "rand", "tempfile", + "tokio", "url", ] @@ -1293,6 +1294,7 @@ dependencies = [ "pin-project-lite", "rand", "tokio", + "tokio-stream", "uuid", ] diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 52cd85675824..c14337fcadf5 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -897,6 +897,11 @@ impl DFField { self.field = f.into(); self } + + pub fn with_qualifier(mut self, qualifier: impl Into) -> Self { + self.qualifier = Some(qualifier.into()); + self + } } impl From for DFField { diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 82d96c98e688..faabc1a1e88b 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -47,6 +47,7 @@ use crate::physical_expr::create_physical_expr; use crate::physical_optimizer::optimizer::PhysicalOptimizerRule; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::physical_plan::analyze::AnalyzeExec; +use crate::physical_plan::continuance::ContinuanceExec; use crate::physical_plan::empty::EmptyExec; use crate::physical_plan::explain::ExplainExec; use crate::physical_plan::expressions::{Column, PhysicalSortExpr}; @@ -58,6 +59,7 @@ use crate::physical_plan::joins::{ use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::projection::ProjectionExec; +use crate::physical_plan::recursive_query::RecursiveQueryExec; use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::union::UnionExec; @@ -89,7 +91,8 @@ use datafusion_expr::expr::{ use datafusion_expr::expr_rewriter::{unalias, unnormalize_cols}; use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary; use datafusion_expr::{ - DescribeTable, DmlStatement, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp, + DescribeTable, DmlStatement, NamedRelation, RecursiveQuery, StringifiedPlan, + WindowFrame, WindowFrameBound, WriteOp, }; use datafusion_physical_expr::expressions::Literal; use datafusion_sql::utils::window_expr_common_partition_keys; @@ -1311,6 +1314,29 @@ impl DefaultPhysicalPlanner { Ok(plan) } } + LogicalPlan::RecursiveQuery(RecursiveQuery { name, static_term, recursive_term, is_distinct }) => { + let static_term = self.create_initial_plan(static_term, session_state).await?; + let recursive_term = self.create_initial_plan(recursive_term, session_state).await?; + + Ok(Arc::new(RecursiveQueryExec::new(name.clone(), static_term, recursive_term, *is_distinct))) + } + LogicalPlan::NamedRelation(NamedRelation {name, schema}) => { + // Named relations is how we represent access to any sort of dynamic data provider. They + // differ from tables in the sense that they can start existing dynamically during the + // execution of a query and then disappear before it even finishes. + // + // This system allows us to replicate the tricky behavior of classical databases where a + // temporary "working table" (as it is called in Postgres) can be used when dealing with + // complex operations (such as recursive CTEs) and then can be dropped. Since DataFusion + // at its core is heavily stream-based and vectorized, we try to avoid using 'real' tables + // and let the streams take care of the data flow in this as well. + + // Since the actual "input"'s will be only available to us at runtime (through task context) + // we can't really do any sort of meaningful validation here. + let schema = SchemaRef::new(schema.as_ref().to_owned().into()); + Ok(Arc::new(ContinuanceExec::new(name.clone(), schema))) + } + }; exec_plan }.boxed() diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index e9bb87e9f8ac..06b064f6b7b8 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -46,3 +46,11 @@ parking_lot = { workspace = true } rand = { workspace = true } tempfile = { workspace = true } url = { workspace = true } +tokio = { version = "1.28", features = [ + "macros", + "rt", + "rt-multi-thread", + "sync", + "fs", + "parking_lot", +] } diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs index cc05a49123e6..31a4df246946 100644 --- a/datafusion/execution/src/task.rs +++ b/datafusion/execution/src/task.rs @@ -33,13 +33,14 @@ use crate::{ runtime_env::{RuntimeConfig, RuntimeEnv}, }; -use arrow::{error::Result as ArrowResult, record_batch::RecordBatch}; -use futures::channel::mpsc::Receiver as SingleChannelReceiver; +use arrow::record_batch::RecordBatch; +// use futures::channel::mpsc::Receiver as SingleChannelReceiver; +use tokio::sync::mpsc::Receiver as SingleChannelReceiver; // use futures::lock::Mutex; use parking_lot::Mutex; // use futures:: -type RelationHandler = SingleChannelReceiver>; +type RelationHandler = SingleChannelReceiver>; /// Task Execution Context /// diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index c4ff9fe95435..4a3904c576a3 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -55,6 +55,8 @@ use datafusion_common::{ ToDFSchema, UnnestOptions, }; +use super::plan::{NamedRelation, RecursiveQuery}; + /// Default table name for unnamed table pub const UNNAMED_TABLE: &str = "?table?"; @@ -121,6 +123,39 @@ impl LogicalPlanBuilder { })) } + /// A named temporary relation with a schema. + /// + /// This is used to represent a relation that does not exist at the + /// planning stage, but will be created at execution time with the + /// given schema. + pub fn named_relation(name: &str, schema: DFSchemaRef) -> Self { + Self::from(LogicalPlan::NamedRelation(NamedRelation { + name: name.to_string(), + schema, + })) + } + + /// Convert a regular plan into a recursive query. + pub fn to_recursive_query( + &self, + name: String, + recursive_term: LogicalPlan, + is_distinct: bool, + ) -> Result { + // TODO: we need to do a bunch of validation here. Maybe more. + if is_distinct { + return Err(DataFusionError::NotImplemented( + "Recursive queries with distinct is not supported".to_string(), + )); + } + Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery { + name, + static_term: Arc::new(self.plan.clone()), + recursive_term: Arc::new(recursive_term), + is_distinct, + }))) + } + /// Create a values list based relation, and the schema is inferred from data, consuming /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html) /// documentation for more details. diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 51d78cd721b6..b75c1620119d 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -35,8 +35,9 @@ pub use dml::{DmlStatement, WriteOp}; pub use plan::{ Aggregate, Analyze, CrossJoin, DescribeTable, Distinct, DistinctOn, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, - Partitioning, PlanType, Prepare, Projection, Repartition, Sort, StringifiedPlan, - Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, + NamedRelation, Partitioning, PlanType, Prepare, Projection, RecursiveQuery, + Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan, + ToStringifiedPlan, Union, Unnest, Values, Window, }; pub use statement::{ SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index a024824c7a5a..fd1e16e33160 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -108,6 +108,8 @@ pub enum LogicalPlan { /// produces 0 or 1 row. This is used to implement SQL `SELECT` /// that has no values in the `FROM` clause. EmptyRelation(EmptyRelation), + /// A named temporary relation with a schema. + NamedRelation(NamedRelation), /// Produces the output of running another query. This is used to /// implement SQL subqueries Subquery(Subquery), @@ -150,6 +152,8 @@ pub enum LogicalPlan { /// Unnest a column that contains a nested list type such as an /// ARRAY. This is used to implement SQL `UNNEST` Unnest(Unnest), + /// A variadic query (e.g. "Recursive CTEs") + RecursiveQuery(RecursiveQuery), } impl LogicalPlan { @@ -187,6 +191,11 @@ impl LogicalPlan { LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(), LogicalPlan::Ddl(ddl) => ddl.schema(), LogicalPlan::Unnest(Unnest { schema, .. }) => schema, + LogicalPlan::NamedRelation(NamedRelation { schema, .. }) => schema, + LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => { + // we take the schema of the static term as the schema of the entire recursive query + static_term.schema() + } } } @@ -229,6 +238,7 @@ impl LogicalPlan { LogicalPlan::Explain(_) | LogicalPlan::Analyze(_) | LogicalPlan::EmptyRelation(_) + | LogicalPlan::NamedRelation(_) | LogicalPlan::Ddl(_) | LogicalPlan::Dml(_) | LogicalPlan::Copy(_) @@ -239,6 +249,10 @@ impl LogicalPlan { | LogicalPlan::TableScan(_) => { vec![self.schema()] } + LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => { + // return only the schema of the static term + static_term.all_schemas() + } // return children schemas LogicalPlan::Limit(_) | LogicalPlan::Subquery(_) @@ -380,6 +394,9 @@ impl LogicalPlan { .try_for_each(f), // plans without expressions LogicalPlan::EmptyRelation(_) + | LogicalPlan::NamedRelation(_) + // TODO: not sure if this should go here + | LogicalPlan::RecursiveQuery(_) | LogicalPlan::Subquery(_) | LogicalPlan::SubqueryAlias(_) | LogicalPlan::Limit(_) @@ -426,8 +443,14 @@ impl LogicalPlan { LogicalPlan::Ddl(ddl) => ddl.inputs(), LogicalPlan::Unnest(Unnest { input, .. }) => vec![input], LogicalPlan::Prepare(Prepare { input, .. }) => vec![input], + LogicalPlan::RecursiveQuery(RecursiveQuery { + static_term, + recursive_term, + .. + }) => vec![static_term, recursive_term], // plans without inputs LogicalPlan::TableScan { .. } + | LogicalPlan::NamedRelation(_) | LogicalPlan::Statement { .. } | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Values { .. } @@ -506,6 +529,9 @@ impl LogicalPlan { cross.left.head_output_expr() } } + LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => { + static_term.head_output_expr() + } LogicalPlan::Union(union) => Ok(Some(Expr::Column( union.schema.fields()[0].qualified_column(), ))), @@ -525,6 +551,7 @@ impl LogicalPlan { } LogicalPlan::Subquery(_) => Ok(None), LogicalPlan::EmptyRelation(_) + | LogicalPlan::NamedRelation(_) | LogicalPlan::Prepare(_) | LogicalPlan::Statement(_) | LogicalPlan::Values(_) @@ -863,6 +890,14 @@ impl LogicalPlan { }; Ok(LogicalPlan::Distinct(distinct)) } + LogicalPlan::RecursiveQuery(RecursiveQuery { + name, is_distinct, .. + }) => Ok(LogicalPlan::RecursiveQuery(RecursiveQuery { + name: name.clone(), + static_term: Arc::new(inputs[0].clone()), + recursive_term: Arc::new(inputs[1].clone()), + is_distinct: *is_distinct, + })), LogicalPlan::Analyze(a) => { assert!(expr.is_empty()); assert_eq!(inputs.len(), 1); @@ -901,6 +936,7 @@ impl LogicalPlan { })) } LogicalPlan::EmptyRelation(_) + | LogicalPlan::NamedRelation(_) | LogicalPlan::Ddl(_) | LogicalPlan::Statement(_) => { // All of these plan types have no inputs / exprs so should not be called @@ -1096,6 +1132,9 @@ impl LogicalPlan { }), LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch, LogicalPlan::EmptyRelation(_) => Some(0), + // TODO: not sure if this is correct + LogicalPlan::NamedRelation(_) => None, + LogicalPlan::RecursiveQuery(_) => None, LogicalPlan::Subquery(_) => None, LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(), LogicalPlan::Limit(Limit { fetch, .. }) => *fetch, @@ -1452,6 +1491,14 @@ impl LogicalPlan { fn fmt(&self, f: &mut Formatter) -> fmt::Result { match self.0 { LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"), + LogicalPlan::NamedRelation(NamedRelation { name, .. }) => { + write!(f, "NamedRelation: {}", name) + } + LogicalPlan::RecursiveQuery(RecursiveQuery { + is_distinct, .. + }) => { + write!(f, "RecursiveQuery: is_distinct={}", is_distinct) + } LogicalPlan::Values(Values { ref values, .. }) => { let str_values: Vec<_> = values .iter() @@ -1762,6 +1809,28 @@ pub struct EmptyRelation { pub schema: DFSchemaRef, } +/// A named temporary relation with a known schema. +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct NamedRelation { + /// The relation name + pub name: String, + /// The schema description + pub schema: DFSchemaRef, +} + +/// A variadic query operation +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct RecursiveQuery { + /// Name of the query + pub name: String, + /// The static term + pub static_term: Arc, + /// The recursive term + pub recursive_term: Arc, + /// Distinction + pub is_distinct: bool, +} + /// Values expression. See /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html) /// documentation for more details. diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index f5ad767c5016..9bb7e5820a01 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -362,6 +362,8 @@ impl OptimizerRule for CommonSubexprEliminate { | LogicalPlan::Dml(_) | LogicalPlan::Copy(_) | LogicalPlan::Unnest(_) + | LogicalPlan::NamedRelation(_) + | LogicalPlan::RecursiveQuery(_) | LogicalPlan::Prepare(_) => { // apply the optimization to all inputs of the plan utils::optimize_children(self, plan, config)? diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 6c761fc9687c..9ed5aae77460 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -33,7 +33,9 @@ name = "datafusion_physical_plan" path = "src/lib.rs" [dependencies] -ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } +ahash = { version = "0.8", default-features = false, features = [ + "runtime-rng", +] } arrow = { workspace = true } arrow-array = { workspace = true } arrow-buffer = { workspace = true } @@ -56,8 +58,16 @@ pin-project-lite = "^0.2.7" rand = { workspace = true } tokio = { version = "1.28", features = ["sync", "fs", "parking_lot"] } uuid = { version = "^1.2", features = ["v4"] } +tokio-stream = { version = "0.1.14" } [dev-dependencies] rstest = { workspace = true } termtree = "0.4.1" -tokio = { version = "1.28", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] } +tokio = { version = "1.28", features = [ + "macros", + "rt", + "rt-multi-thread", + "sync", + "fs", + "parking_lot", +] } diff --git a/datafusion/physical-plan/src/continuance.rs b/datafusion/physical-plan/src/continuance.rs index 1b2d6ce93db1..3ed2a79465f4 100644 --- a/datafusion/physical-plan/src/continuance.rs +++ b/datafusion/physical-plan/src/continuance.rs @@ -27,26 +27,32 @@ use std::sync::Arc; use arrow::datatypes::SchemaRef; use datafusion_execution::TaskContext; use datafusion_physical_expr::Partitioning; +use tokio_stream::wrappers::ReceiverStream; +use crate::stream::RecordBatchStreamAdapter; use crate::{DisplayAs, DisplayFormatType, ExecutionPlan}; use super::expressions::PhysicalSortExpr; -use super::stream::RecordBatchReceiverStream; + use super::{ metrics::{ExecutionPlanMetricsSet, MetricsSet}, SendableRecordBatchStream, Statistics, }; use datafusion_common::{DataFusionError, Result}; +// use tokio::stream::; // use crate::exe::context::TaskContext; -/// A temporary "working table" operation wehre the input data will be +/// A temporary "working table" operation where the input data will be /// taken from the named handle during the execution and will be re-published /// as is (kind of like a mirror). /// /// Most notably used in the implementation of recursive queries where the /// underlying relation does not exist yet but the data will come as the previous -/// term is evaluated. +/// term is evaluated. This table will be used such that the recursive plan +/// will register a receiver in the task context and this plan will use that +/// receiver to get the data and stream it back up so that the batches are available +/// in the next iteration. #[derive(Debug)] pub struct ContinuanceExec { /// Name of the relation handler @@ -59,7 +65,7 @@ pub struct ContinuanceExec { impl ContinuanceExec { /// Create a new execution plan for a continuance stream. The given relation - /// handler must exist in the task context before calling [`execute`] on this + /// handler must exist in the task context before calling [`ContinuanceExec::execute`] on this /// plan. pub fn new(name: String, schema: SchemaRef) -> Self { Self { @@ -78,7 +84,6 @@ impl DisplayAs for ContinuanceExec { ) -> std::fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - // TODO: add more details write!(f, "ContinuanceExec: name={}", self.name) } } @@ -125,7 +130,7 @@ impl ExecutionPlan for ContinuanceExec { } /// This plan does not come with any special streams, but rather we use - /// the existing [`RecordBatchReceiverStream`] to receive the data from + /// the existing [`RecordBatchStreamAdapter`] to receive the data from /// the registered handle. fn execute( &self, @@ -140,25 +145,21 @@ impl ExecutionPlan for ContinuanceExec { ))); } - // let stream = Box::pin(CombinedRecordBatchStream::new( - // self.schema(), - // input_stream_vec, - // )); - // return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics))); - // The relation handler must be already registered by the // parent op. let receiver = context.pop_relation_handler(self.name.clone())?; - // TODO: this looks wrong. - Ok(RecordBatchReceiverStream::builder(self.schema.clone(), 1).build()) + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema.clone(), + ReceiverStream::new(receiver), + ))) } fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } - fn statistics(&self) -> Statistics { - Statistics::default() + fn statistics(&self) -> Result { + Ok(Statistics::new_unknown(&self.schema())) } } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index b6cd9fe79c85..6ee9a95f97f0 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -317,13 +317,13 @@ impl Stream for FilterExecStream { match self.input.poll_next_unpin(cx) { Poll::Ready(value) => match value { Some(Ok(batch)) => { - let timer = self.baseline_metrics.elapsed_compute().timer(); + // let timer = self.baseline_metrics.elapsed_compute().timer(); let filtered_batch = batch_filter(&batch, &self.predicate)?; // skip entirely filtered batches if filtered_batch.num_rows() == 0 { continue; } - timer.done(); + // timer.done(); poll = Poll::Ready(Some(Ok(filtered_batch))); break; } diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index e5cd5e674cb1..e2959ef8d34b 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -49,6 +49,7 @@ pub mod analyze; pub mod coalesce_batches; pub mod coalesce_partitions; pub mod common; +pub mod continuance; pub mod display; pub mod empty; pub mod explain; @@ -59,6 +60,7 @@ pub mod limit; pub mod memory; pub mod metrics; pub mod projection; +pub mod recursive_query; pub mod repartition; pub mod sorts; pub mod stream; diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs new file mode 100644 index 000000000000..b500dec41510 --- /dev/null +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -0,0 +1,355 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines the recursive query plan + +use std::any::Any; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::{DataFusionError, Result}; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::Partitioning; +use futures::{Stream, StreamExt}; +use tokio::sync::mpsc; + +use super::expressions::PhysicalSortExpr; +use super::metrics::BaselineMetrics; +use super::RecordBatchStream; +use super::{ + metrics::{ExecutionPlanMetricsSet, MetricsSet}, + SendableRecordBatchStream, Statistics, +}; +use arrow::error::ArrowError; +use tokio::sync::mpsc::{Receiver, Sender}; + +// use crate::execution::context::TaskContext; +use crate::{DisplayAs, DisplayFormatType, ExecutionPlan}; + +/// Recursive query execution plan. +/// +/// This plan has two components: a base part (the static term) and +/// a dynamic part (the recursive term). The execution will start from +/// the base, and as long as the previous iteration produced at least +/// a single new row (taking care of the distinction) the recursive +/// part will be continuously executed. +/// +/// Before each execution of the dynamic part, the rows from the previous +/// iteration will be available in a "working table" (not a real table, +/// can be only accessed using a continuance operation). +/// +/// Note that there won't be any limit or checks applied to detect +/// an infinite recursion, so it is up to the planner to ensure that +/// it won't happen. +#[derive(Debug)] +pub struct RecursiveQueryExec { + /// Name of the query handler + name: String, + /// The base part (static term) + static_term: Arc, + /// The dynamic part (recursive term) + recursive_term: Arc, + /// Distinction + is_distinct: bool, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, +} + +impl RecursiveQueryExec { + /// Create a new RecursiveQueryExec + pub fn new( + name: String, + static_term: Arc, + recursive_term: Arc, + is_distinct: bool, + ) -> Self { + RecursiveQueryExec { + name, + static_term, + recursive_term, + is_distinct, + metrics: ExecutionPlanMetricsSet::new(), + } + } +} + +impl ExecutionPlan for RecursiveQueryExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.static_term.schema() + } + + fn children(&self) -> Vec> { + vec![self.static_term.clone(), self.recursive_term.clone()] + } + + // Distribution on a recursive query is really tricky to handle. + // For now, we are going to use a single partition but in the + // future we might find a better way to handle this. + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + // TODO: control these hints and see whether we can + // infer some from the child plans (static/recurisve terms). + fn maintains_input_order(&self) -> Vec { + vec![false, false] + } + + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false, false] + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(RecursiveQueryExec::new( + self.name.clone(), + children[0].clone(), + children[1].clone(), + self.is_distinct, + ))) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + // TODO: we might be able to handle multiple partitions in the future. + if partition != 0 { + return Err(DataFusionError::Internal(format!( + "RecursiveQueryExec got an invalid partition {} (expected 0)", + partition + ))); + } + + let static_stream = self.static_term.execute(partition, context.clone())?; + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + Ok(Box::pin(RecursiveQueryStream::new( + context, + self.name.clone(), + self.recursive_term.clone(), + static_stream, + baseline_metrics, + ))) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn statistics(&self) -> Result { + Ok(Statistics::new_unknown(&self.schema())) + } +} + +impl DisplayAs for RecursiveQueryExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "RecursiveQueryExec: is_distinct={}", self.is_distinct) + } + } + } +} + +/// The actual logic of the recursive queries happens during the streaming +/// process. A simplified version of the algorithm is the following: +/// +/// buffer = [] +/// +/// while batch := static_stream.next(): +/// buffer.push(batch) +/// yield buffer +/// +/// while buffer.len() > 0: +/// sender, receiver = Channel() +/// register_continuation(handle_name, receiver) +/// sender.send(buffer.drain()) +/// recursive_stream = recursive_term.execute() +/// while batch := recursive_stream.next(): +/// buffer.append(batch) +/// yield buffer +/// +struct RecursiveQueryStream { + /// The context to be used for managing handlers & executing new tasks + task_context: Arc, + /// Name of the relation handler to be used by the recursive term + name: String, + /// The dynamic part (recursive term) as is (without being executed) + recursive_term: Arc, + /// The static part (static term) as a stream. If the processing of this + /// part is completed, then it will be None. + static_stream: Option, + /// The dynamic part (recursive term) as a stream. If the processing of this + /// part has not started yet, or has been completed, then it will be None. + recursive_stream: Option, + /// The schema of the output. + schema: SchemaRef, + /// In-memory buffer for storing a copy of the current results. Will be + /// cleared after each iteration. + buffer: Vec, + // /// Metrics. + _baseline_metrics: BaselineMetrics, +} + +impl RecursiveQueryStream { + /// Create a new recursive query stream + fn new( + task_context: Arc, + name: String, + recursive_term: Arc, + static_stream: SendableRecordBatchStream, + baseline_metrics: BaselineMetrics, + ) -> Self { + let schema = static_stream.schema(); + Self { + task_context, + name, + recursive_term, + static_stream: Some(static_stream), + recursive_stream: None, + schema, + buffer: vec![], + _baseline_metrics: baseline_metrics, + } + } + + /// Push a clone of the given batch to the in memory buffer, and then return + /// a poll with it. + fn push_batch( + mut self: std::pin::Pin<&mut Self>, + batch: RecordBatch, + ) -> Poll>> { + self.buffer.push(batch.clone()); + Poll::Ready(Some(Ok(batch))) + } + + /// Start polling for the next iteration, will be called either after the static term + /// is completed or another term is completed. It will follow the algorithm above on + /// to check whether the recursion has ended. + fn poll_next_iteration( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let total_length = self + .buffer + .iter() + .fold(0, |acc, batch| acc + batch.num_rows()); + + if total_length == 0 { + return Poll::Ready(None); + } + + // The initial capacity of the channels is the same as the number of partitions + // we currently hold in the buffer. + let (sender, receiver): ( + Sender>, + Receiver>, + ) = mpsc::channel(self.buffer.len() + 1); + + // There shouldn't be any handlers with this name, since the execution of recursive + // term will immediately consume the relation handler. + self.task_context + .push_relation_handler(self.name.clone(), receiver)?; + + // This part heavily assumes that the buffer is not going to change. Maybe we + // should use a mutex? + for batch in self.buffer.drain(..) { + match sender.try_send(Ok(batch.clone())) { + Ok(_) => {} + Err(e) => { + return Poll::Ready(Some(Err(DataFusionError::ArrowError( + ArrowError::from_external_error(Box::new(e)), + )))); + } + } + } + + // We always execute (and re-execute iteratively) the first partition. + // Downstream plans should not expect any partitioning. + let partition = 0; + + self.recursive_stream = Some( + self.recursive_term + .execute(partition, self.task_context.clone())?, + ); + self.poll_next(cx) + } +} + +impl Stream for RecursiveQueryStream { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + // TODO: we should use this poll to record some metrics! + if let Some(static_stream) = &mut self.static_stream { + // While the static term's stream is available, we'll be forwarding the batches from it (also + // saving them for the initial iteration of the recursive term). + let poll = static_stream.poll_next_unpin(cx); + match &poll { + Poll::Ready(None) => { + // Once this is done, we can start running the setup for the recursive term. + self.static_stream = None; + self.poll_next_iteration(cx) + } + Poll::Ready(Some(Ok(batch))) => self.push_batch(batch.clone()), + _ => poll, + } + } else if let Some(recursive_stream) = &mut self.recursive_stream { + let poll = recursive_stream.poll_next_unpin(cx); + match &poll { + Poll::Ready(None) => { + self.recursive_stream = None; + self.poll_next_iteration(cx) + } + Poll::Ready(Some(Ok(batch))) => self.push_batch(batch.clone()), + _ => poll, + } + } else { + Poll::Ready(None) + } + } +} + +impl RecordBatchStream for RecursiveQueryStream { + /// Get the schema + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +#[cfg(test)] +mod tests {} diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 851f062bd51f..f0e059b6f3ae 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1523,6 +1523,12 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlan::DescribeTable(_) => Err(proto_error( "LogicalPlan serde is not yet implemented for DescribeTable", )), + LogicalPlan::NamedRelation(_) => Err(proto_error( + "LogicalPlan serde is not yet implemented for NamedRelation", + )), + LogicalPlan::RecursiveQuery(_) => Err(proto_error( + "LogicalPlan serde is not yet implemented for RecursiveQuery", + )), } } } diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 832e2da9c6ec..5d4b913626c6 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -20,13 +20,14 @@ use std::sync::Arc; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_common::{ - not_impl_err, plan_err, sql_err, Constraints, DataFusionError, Result, ScalarValue, + plan_err, sql_err, Constraints, DFSchema, DataFusionError, Result, ScalarValue, }; use datafusion_expr::{ CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder, }; use sqlparser::ast::{ - Expr as SQLExpr, Offset as SQLOffset, OrderByExpr, Query, SetExpr, Value, + Expr as SQLExpr, Offset as SQLOffset, OrderByExpr, Query, SetExpr, SetOperator, + SetQuantifier, Value, }; use sqlparser::parser::ParserError::ParserError; @@ -52,10 +53,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let set_expr = query.body; if let Some(with) = query.with { // Process CTEs from top to bottom - // do not allow self-references - if with.recursive { - return not_impl_err!("Recursive CTEs are not supported"); - } + let is_recursive = with.recursive; for cte in with.cte_tables { // A `WITH` block can't use the same name more than once @@ -65,16 +63,130 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { "WITH query name {cte_name:?} specified more than once" ))); } - // create logical plan & pass backreferencing CTEs - // CTE expr don't need extend outer_query_schema - let logical_plan = - self.query_to_plan(*cte.query, &mut planner_context.clone())?; + let cte_query = cte.query; + if is_recursive { + match *cte_query.body { + SetExpr::SetOperation { + op: SetOperator::Union, + left, + right, + set_quantifier, + } => { + let distinct = set_quantifier != SetQuantifier::All; + + // Each recursive CTE consists from two parts in the logical plan: + // 1. A static term (the left hand side on the SQL, where the + // referencing to the same CTE is not allowed) + // + // 2. A recursive term (the right hand side, and the recursive + // part) + + // Since static term does not have any specific properties, it can + // be compiled as if it was a regular expression. This will + // allow us to infer the schema to be used in the recursive term. + + // ---------- Step 1: Compile the static term ------------------ + let static_plan = self + .set_expr_to_plan(*left, &mut planner_context.clone())?; + + // Since the recursive CTEs include a component that references a + // table with its name, like the example below: + // + // WITH RECURSIVE values(n) AS ( + // SELECT 1 as n -- static term + // UNION ALL + // SELECT n + 1 + // FROM values -- self reference + // WHERE n < 100 + // ) + // + // We need a temporary 'relation' to be referenced and used. PostgreSQL + // calls this a 'working table', but it is entirely an implementation + // detail and a 'real' table with that name might not even exist (as + // in the case of DataFusion). + // + // Since we can't simply register a table during planning stage (it is + // an execution problem), we'll use a relation object that preserves the + // schema of the input perfectly and also knows which recursive CTE it is + // bound to. + + // ---------- Step 2: Create a temporary relation ------------------ + // Step 2.1: Create a schema for the temporary relation + let static_fields = static_plan.schema().fields().clone(); + let static_metadata = static_plan.schema().metadata().clone(); + + let named_relation_schema = DFSchema::new_with_metadata( + // take the fields from the static plan + // but add the cte_name as the qualifier + // so that we can access the fields in the recursive term using + // the cte_name as the qualifier (e.g. table.id) + static_fields + .into_iter() + .map(|field| { + if field.qualifier().is_some() { + field + } else { + field.with_qualifier(cte_name.clone()) + } + }) + .collect(), + static_metadata, + )?; - // Each `WITH` block can change the column names in the last - // projection (e.g. "WITH table(t1, t2) AS SELECT 1, 2"). - let logical_plan = self.apply_table_alias(logical_plan, cte.alias)?; + // Step 2.2: Create a temporary relation logical plan that will be used + // as the input to the recursive term + let named_relation = LogicalPlanBuilder::named_relation( + cte_name.as_str(), + Arc::new(named_relation_schema), + ) + .build()?; - planner_context.insert_cte(cte_name, logical_plan); + // Step 2.3: Register the temporary relation in the planning context + // For all the self references in the variadic term, we'll replace it + // with the temporary relation we created above by temporarily registering + // it as a CTE. This temporary relation in the planning context will be + // replaced by the actual CTE plan once we're done with the planning. + planner_context.insert_cte(cte_name.clone(), named_relation); + + // ---------- Step 3: Compile the recursive term ------------------ + // this uses the named_relation we inserted above to resolve the + // relation. This ensures that the recursive term uses the named relation logical plan + // and thus the 'continuance' physical plan as its input and source + let recursive_plan = self + .set_expr_to_plan(*right, &mut planner_context.clone())?; + + // ---------- Step 4: Create the final plan ------------------ + // Step 4.1: Compile the final plan + let final_plan = LogicalPlanBuilder::from(static_plan) + .to_recursive_query( + cte_name.clone(), + recursive_plan, + distinct, + )? + .build()?; + + // Step 4.2: Remove the temporary relation from the planning context and replace it + // with the final plan. + planner_context.insert_cte(cte_name.clone(), final_plan); + } + _ => { + return Err(DataFusionError::SQL(ParserError( + "Invalid recursive CTE".to_string(), + ))); + } + }; + } else { + // create logical plan & pass backreferencing CTEs + // CTE expr don't need extend outer_query_schema + let logical_plan = + self.query_to_plan(*cte_query, &mut planner_context.clone())?; + + // Each `WITH` block can change the column names in the last + // projection (e.g. "WITH table(t1, t2) AS SELECT 1, 2"). + let logical_plan = self.apply_table_alias(logical_plan, cte.alias)?; + + planner_context.insert_cte(cte_name, logical_plan); + } } } let plan = self.set_expr_to_plan(*(set_expr.clone()), planner_context)?; From 968491260eb8ff156db8107a080071151d037f40 Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Wed, 20 Sep 2023 16:54:53 -0700 Subject: [PATCH 03/15] ensure that repartitions are not added immediately after RecursiveExec in the physical-plan --- datafusion/physical-plan/src/recursive_query.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index b500dec41510..b1a816abf571 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -119,6 +119,13 @@ impl ExecutionPlan for RecursiveQueryExec { vec![false, false] } + fn required_input_distribution(&self) -> Vec { + vec![ + datafusion_physical_expr::Distribution::SinglePartition, + datafusion_physical_expr::Distribution::SinglePartition, + ] + } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { None } From d2628d342d18f276f156d8fc7e0a49a836d4e747 Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Sun, 24 Sep 2023 21:32:15 -0700 Subject: [PATCH 04/15] add trivial sqllogictest --- datafusion/sqllogictest/test_files/cte.slt | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index c62b56584682..12d9f74775d9 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -19,3 +19,24 @@ query II select * from (WITH source AS (select 1 as e) SELECT * FROM source) t1, (WITH source AS (select 1 as e) SELECT * FROM source) t2 ---- 1 1 + +query I rowsort +WITH RECURSIVE nodes AS ( + SELECT 1 as id + UNION ALL + SELECT id + 1 as id + FROM nodes + WHERE id < 10 +) +SELECT * FROM nodes +---- +1 +10 +2 +3 +4 +5 +6 +7 +8 +9 From e8d78c8e85c10be4be48d7eb733f1d170e3a1555 Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Mon, 25 Sep 2023 08:47:40 -0700 Subject: [PATCH 05/15] more recursive tests --- datafusion/sqllogictest/test_files/cte.slt | 78 ++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index 12d9f74775d9..d11b7ac0e8a1 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -20,6 +20,7 @@ select * from (WITH source AS (select 1 as e) SELECT * FROM source) t1, (WITH ---- 1 1 +# trivial recursive CTE works query I rowsort WITH RECURSIVE nodes AS ( SELECT 1 as id @@ -40,3 +41,80 @@ SELECT * FROM nodes 7 8 9 + +# setup +statement ok +CREATE EXTERNAL TABLE beg_account_balance STORED as CSV WITH HEADER ROW LOCATION '../../testing/data/csv/recursive_query_account_beg_2.csv' + +# setup +statement ok +CREATE EXTERNAL TABLE account_balance_growth STORED as CSV WITH HEADER ROW LOCATION '../../testing/data/csv/recursive_query_account_growth_2.csv' + +# recursive CTE with static term derived from table works +query ITI rowsort +WITH RECURSIVE balances AS ( + SELECT * from beg_account_balance + UNION ALL + SELECT time + 1 as time, name, account_balance + 10 as account_balance + FROM balances + WHERE time < 10 +) +SELECT * FROM balances +---- +1 John 100 +1 Tim 200 +10 John 190 +10 Tim 290 +2 John 110 +2 Tim 210 +3 John 120 +3 Tim 220 +4 John 130 +4 Tim 230 +5 John 140 +5 Tim 240 +6 John 150 +6 Tim 250 +7 John 160 +7 Tim 260 +8 John 170 +8 Tim 270 +9 John 180 +9 Tim 280 + + +# recursive CTE with recursive join works +query ITI +WITH RECURSIVE balances AS ( + SELECT time as time, name as name, account_balance as account_balance + FROM beg_account_balance + UNION ALL + SELECT time + 1 as time, balances.name, account_balance + account_balance_growth.account_growth as account_balance + FROM balances + JOIN account_balance_growth + ON balances.name = account_balance_growth.name + WHERE time < 10 +) +SELECT * FROM balances +ORDER BY time, name +---- +1 John 100 +1 Tim 200 +2 John 103 +2 Tim 220 +3 John 106 +3 Tim 240 +4 John 109 +4 Tim 260 +5 John 112 +5 Tim 280 +6 John 115 +6 Tim 300 +7 John 118 +7 Tim 320 +8 John 121 +8 Tim 340 +9 John 124 +9 Tim 360 +10 John 127 +10 Tim 380 From 36463cee6d519870b133ba10a64dc4f347c47ac2 Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Sat, 18 Nov 2023 16:58:10 -0800 Subject: [PATCH 06/15] remove test that asserts recursive cte should fail --- datafusion/physical-plan/src/continuance.rs | 7 ------- datafusion/physical-plan/src/filter.rs | 4 ++-- datafusion/physical-plan/src/recursive_query.rs | 1 - datafusion/sql/tests/sql_integration.rs | 16 ---------------- datafusion/sqllogictest/test_files/cte.slt | 2 +- testing | 2 +- 6 files changed, 4 insertions(+), 28 deletions(-) diff --git a/datafusion/physical-plan/src/continuance.rs b/datafusion/physical-plan/src/continuance.rs index 3ed2a79465f4..b4fd3ba31985 100644 --- a/datafusion/physical-plan/src/continuance.rs +++ b/datafusion/physical-plan/src/continuance.rs @@ -20,10 +20,6 @@ use std::any::Any; use std::sync::Arc; -// use crate::error::{DataFusionError, Result}; -// use crate::physical_plan::{ -// DisplayFormatType, Distribution, ExecutionPlan, Partitioning, -// }; use arrow::datatypes::SchemaRef; use datafusion_execution::TaskContext; use datafusion_physical_expr::Partitioning; @@ -39,9 +35,6 @@ use super::{ SendableRecordBatchStream, Statistics, }; use datafusion_common::{DataFusionError, Result}; -// use tokio::stream::; - -// use crate::exe::context::TaskContext; /// A temporary "working table" operation where the input data will be /// taken from the named handle during the execution and will be re-published diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 6ee9a95f97f0..b6cd9fe79c85 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -317,13 +317,13 @@ impl Stream for FilterExecStream { match self.input.poll_next_unpin(cx) { Poll::Ready(value) => match value { Some(Ok(batch)) => { - // let timer = self.baseline_metrics.elapsed_compute().timer(); + let timer = self.baseline_metrics.elapsed_compute().timer(); let filtered_batch = batch_filter(&batch, &self.predicate)?; // skip entirely filtered batches if filtered_batch.num_rows() == 0 { continue; } - // timer.done(); + timer.done(); poll = Poll::Ready(Some(Ok(filtered_batch))); break; } diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index b1a816abf571..b3aa1c7156ec 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -39,7 +39,6 @@ use super::{ use arrow::error::ArrowError; use tokio::sync::mpsc::{Receiver, Sender}; -// use crate::execution::context::TaskContext; use crate::{DisplayAs, DisplayFormatType, ExecutionPlan}; /// Recursive query execution plan. diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 4c2bad1c719e..10b8c9c72f1a 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -1395,22 +1395,6 @@ fn select_array_no_common_type() { ); } -#[test] -fn recursive_ctes() { - let sql = " - WITH RECURSIVE numbers AS ( - select 1 as n - UNION ALL - select n + 1 FROM numbers WHERE N < 10 - ) - select * from numbers;"; - let err = logical_plan(sql).expect_err("query should have failed"); - assert_eq!( - "This feature is not implemented: Recursive CTEs are not supported", - err.strip_backtrace() - ); -} - #[test] fn select_array_non_literal_type() { let sql = "SELECT [now()]"; diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index d11b7ac0e8a1..e0def49f047a 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -48,7 +48,7 @@ CREATE EXTERNAL TABLE beg_account_balance STORED as CSV WITH HEADER ROW LOCATION # setup statement ok -CREATE EXTERNAL TABLE account_balance_growth STORED as CSV WITH HEADER ROW LOCATION '../../testing/data/csv/recursive_query_account_growth_2.csv' +CREATE EXTERNAL TABLE account_balance_growth STORED as CSV WITH HEADER ROW LOCATION '../../testing/data/csv/recursive_query_account_growth_3.csv' # recursive CTE with static term derived from table works query ITI rowsort diff --git a/testing b/testing index 37f29510ce97..bb8b92eb0ba7 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 37f29510ce97cd491b8e6ed75866c6533a5ea2a1 +Subproject commit bb8b92eb0ba7d9d1ae2348f454d97dd361d36ade From 584bd922dbbd9d4925790a800359708a924642d3 Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Sat, 18 Nov 2023 17:09:53 -0800 Subject: [PATCH 07/15] additional cte test --- datafusion/sqllogictest/test_files/cte.slt | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index e0def49f047a..52ea127e1cf6 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -118,3 +118,16 @@ ORDER BY time, name 9 Tim 360 10 John 127 10 Tim 380 + +# recursive CTE with aggregations works +query I rowsort +WITH RECURSIVE nodes AS ( + SELECT 1 as id + UNION ALL + SELECT id + 1 as id + FROM nodes + WHERE id < 10 +) +SELECT sum(id) FROM nodes +---- +55 From c5a168220c126f39e3976c089821501d6f160a15 Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Sat, 16 Sep 2023 22:29:33 -0700 Subject: [PATCH 08/15] Continuing implementation with fixes and improvements Lint fixes --- datafusion/expr/src/logical_plan/builder.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 4a3904c576a3..2d58657849e2 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -57,6 +57,8 @@ use datafusion_common::{ use super::plan::{NamedRelation, RecursiveQuery}; +use super::plan::{NamedRelation, RecursiveQuery}; + /// Default table name for unnamed table pub const UNNAMED_TABLE: &str = "?table?"; From abb8009208f15e6c683862506ab50f9b592456da Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Wed, 20 Sep 2023 15:25:33 -0700 Subject: [PATCH 09/15] compiles and tests pass --- .../physical_optimizer/coalesce_batches.rs | 117 ++++++++++++++++-- .../enforce_distribution.rs | 70 ++++++++++- .../core/src/physical_optimizer/utils.rs | 9 ++ 3 files changed, 184 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs index 7b66ca529094..c785ccde7350 100644 --- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs @@ -19,6 +19,7 @@ //! in bigger batches to avoid overhead with small batches use crate::config::ConfigOptions; +use crate::physical_optimizer::utils::get_plan_string; use crate::{ error::Result, physical_optimizer::PhysicalOptimizerRule, @@ -27,9 +28,14 @@ use crate::{ repartition::RepartitionExec, Partitioning, }, }; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::{DynTreeNode, Transformed, TreeNode, VisitRecursion}; +use datafusion_physical_plan::ExecutionPlan; +use itertools::Itertools; +use std::fmt::{self, Formatter}; use std::sync::Arc; +use super::utils::is_recursive_query; + /// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that /// are produced by highly selective filters #[derive(Default)] @@ -41,6 +47,94 @@ impl CoalesceBatches { Self::default() } } + +struct CoalesceContext { + plan: Arc, + // keep track of whether we've encountered a RecursiveQuery + has_recursive_ancestor: bool, +} + +impl CoalesceContext { + fn new(plan: Arc) -> Self { + Self { + has_recursive_ancestor: is_recursive_query(&plan), + plan, + } + } + + fn new_descendent(&self, descendent_plan: Arc) -> Self { + Self { + has_recursive_ancestor: self.has_recursive_ancestor + || is_recursive_query(&descendent_plan), + plan: descendent_plan, + } + } + + /// Computes distribution tracking contexts for every child of the plan. + fn children(&self) -> Vec { + self.plan + .children() + .into_iter() + .map(|child| self.new_descendent(child)) + .collect() + } +} + +impl TreeNode for CoalesceContext { + fn apply_children(&self, op: &mut F) -> Result + where + F: FnMut(&Self) -> Result, + { + for child in self.children() { + match op(&child)? { + VisitRecursion::Continue => {} + VisitRecursion::Skip => return Ok(VisitRecursion::Continue), + VisitRecursion::Stop => return Ok(VisitRecursion::Stop), + } + } + Ok(VisitRecursion::Continue) + } + + fn map_children(self, transform: F) -> Result + where + F: FnMut(Self) -> Result, + { + let children = self.children(); + if children.is_empty() { + Ok(self) + } else { + let new_children = children + .into_iter() + .map(transform) + .collect::>>()?; + + Ok(self.new_descendent( + self.plan.with_new_arc_children( + self.plan.clone(), + new_children + .into_iter() + .map(|CoalesceContext { plan, .. }| plan) + .collect(), + )?, + )) + } + } +} + +/// implement Display method for `DistributionContext` struct. +impl fmt::Display for CoalesceContext { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let plan_string = get_plan_string(&self.plan); + write!(f, "plan: {:?}", plan_string)?; + write!( + f, + "has_recursive_ancestor: {:?}", + self.has_recursive_ancestor + )?; + write!(f, "") + } +} + impl PhysicalOptimizerRule for CoalesceBatches { fn optimize( &self, @@ -48,12 +142,16 @@ impl PhysicalOptimizerRule for CoalesceBatches { config: &ConfigOptions, ) -> Result> { if !config.execution.coalesce_batches { - return Ok(plan); + // return Ok(plan); } let target_batch_size = config.execution.batch_size; - plan.transform_up(&|plan| { - let plan_any = plan.as_any(); + let ctx = CoalesceContext::new(plan); + let CoalesceContext { plan, .. } = ctx.transform_up(&|ctx| { + if ctx.has_recursive_ancestor { + // return Ok(Transformed::No(ctx)); + } + let plan_any = ctx.plan.as_any(); // The goal here is to detect operators that could produce small batches and only // wrap those ones with a CoalesceBatchesExec operator. An alternate approach here // would be to build the coalescing logic directly into the operators @@ -71,14 +169,15 @@ impl PhysicalOptimizerRule for CoalesceBatches { }) .unwrap_or(false); if wrap_in_coalesce { - Ok(Transformed::Yes(Arc::new(CoalesceBatchesExec::new( - plan, - target_batch_size, + Ok(Transformed::Yes(ctx.new_descendent(Arc::new( + CoalesceBatchesExec::new(ctx.plan.clone(), target_batch_size), )))) } else { - Ok(Transformed::No(plan)) + Ok(Transformed::No(ctx)) } - }) + })?; + + Ok(plan) } fn name(&self) -> &str { diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 4aedc3b0d1a9..243d03ec6738 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -59,6 +59,8 @@ use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAg use itertools::izip; +use super::utils::is_recursive_query; + /// The `EnforceDistribution` rule ensures that distribution requirements are /// met. In doing so, this rule will increase the parallelism in the plan by /// introducing repartitioning operators to the physical plan. @@ -1088,6 +1090,7 @@ fn remove_dist_changing_operators( let DistributionContext { mut plan, mut distribution_onwards, + has_recursive_ancestor, } = distribution_context; // Remove any distribution changing operators at the beginning: @@ -1107,9 +1110,34 @@ fn remove_dist_changing_operators( Ok(DistributionContext { plan, distribution_onwards, + has_recursive_ancestor, }) } +/// Changes each child of the `dist_context.plan` such that they no longer +/// use order preserving variants, if no ordering is required at the output +/// of the physical plan (there is no global ordering requirement by the query). +fn update_plan_to_remove_unnecessary_final_order( + dist_context: DistributionContext, +) -> Result> { + let DistributionContext { + plan, + distribution_onwards, + .. + } = dist_context; + let new_children = izip!(plan.children(), distribution_onwards) + .map(|(mut child, mut dist_onward)| { + replace_order_preserving_variants(&mut child, &mut dist_onward)?; + Ok(child) + }) + .collect::>>()?; + if !new_children.is_empty() { + plan.with_new_children(new_children) + } else { + Ok(plan) + } +} + /// Updates the physical plan `input` by using `dist_onward` replace order preserving operator variants /// with their corresponding operators that do not preserve order. It is a wrapper for `replace_order_preserving_variants_helper` fn replace_order_preserving_variants( @@ -1176,6 +1204,13 @@ fn ensure_distribution( dist_context: DistributionContext, config: &ConfigOptions, ) -> Result> { + let mut dist_context = dist_context; + if is_recursive_query(&dist_context.plan) { + dist_context.has_recursive_ancestor = true; + } + if dist_context.has_recursive_ancestor { + return Ok(Transformed::No(dist_context)); + } let target_partitions = config.execution.target_partitions; // When `false`, round robin repartition will not be added to increase parallelism let enable_round_robin = config.optimizer.enable_round_robin_repartition; @@ -1196,7 +1231,8 @@ fn ensure_distribution( let DistributionContext { mut plan, mut distribution_onwards, - } = remove_dist_changing_operators(dist_context)?; + has_recursive_ancestor, + } = remove_unnecessary_repartition(dist_context)?; if let Some(exec) = plan.as_any().downcast_ref::() { if let Some(updated_window) = get_best_fitting_window( @@ -1365,6 +1401,7 @@ fn ensure_distribution( plan.with_new_children(new_children)? }, distribution_onwards, + has_recursive_ancestor: has_recursive_ancestor || is_recursive_query(&plan), }; Ok(Transformed::Yes(new_distribution_context)) } @@ -1379,6 +1416,8 @@ struct DistributionContext { /// Keep track of associations for each child of the plan. If `None`, /// there is no distribution changing operator in its descendants. distribution_onwards: Vec>, + // keep track of whether we've encountered a RecursiveQuery + has_recursive_ancestor: bool, } impl DistributionContext { @@ -1386,11 +1425,22 @@ impl DistributionContext { fn new(plan: Arc) -> Self { let length = plan.children().len(); DistributionContext { + has_recursive_ancestor: is_recursive_query(&plan), plan, distribution_onwards: vec![None; length], } } + fn new_from_plan_with_parent( + parent: Arc, + cur_plan: Arc, + ) -> Self { + let mut ctx = Self::new(cur_plan); + ctx.has_recursive_ancestor = + is_recursive_query(&parent) || ctx.has_recursive_ancestor; + ctx + } + /// Constructs a new context from children contexts. fn new_from_children_nodes( children_nodes: Vec, @@ -1410,6 +1460,7 @@ impl DistributionContext { // that change distribution, or preserves the existing // distribution (starting from an operator that change distribution). distribution_onwards, + .. } = context; if plan.children().is_empty() { // Plan has no children, there is nothing to propagate. @@ -1461,7 +1512,9 @@ impl DistributionContext { } }) .collect(); + Ok(DistributionContext { + has_recursive_ancestor: is_recursive_query(&parent_plan), plan: with_new_children_if_necessary(parent_plan, children_plans)?.into(), distribution_onwards, }) @@ -1472,7 +1525,15 @@ impl DistributionContext { self.plan .children() .into_iter() - .map(DistributionContext::new) + .map(|child| { + let mut ctx = DistributionContext::new_from_plan_with_parent( + self.plan.clone(), + child, + ); + ctx.has_recursive_ancestor = + self.has_recursive_ancestor || ctx.has_recursive_ancestor; + ctx + }) .collect() } } @@ -1504,7 +1565,10 @@ impl TreeNode for DistributionContext { .into_iter() .map(transform) .collect::>>()?; - DistributionContext::new_from_children_nodes(children_nodes, self.plan) + let mut ctx = + DistributionContext::new_from_children_nodes(children_nodes, self.plan)?; + ctx.has_recursive_ancestor |= self.has_recursive_ancestor; + Ok(ctx) } } } diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index 530df374ca7c..077e87336ab2 100644 --- a/datafusion/core/src/physical_optimizer/utils.rs +++ b/datafusion/core/src/physical_optimizer/utils.rs @@ -17,6 +17,11 @@ //! Collection of utility functions that are leveraged by the query optimizer rules +use datafusion_expr::RecursiveQuery; +use datafusion_physical_plan::recursive_query::RecursiveQueryExec; +use itertools::concat; +use std::borrow::Borrow; +use std::collections::HashSet; use std::fmt; use std::fmt::Formatter; use std::sync::Arc; @@ -155,6 +160,10 @@ pub fn is_repartition(plan: &Arc) -> bool { plan.as_any().is::() } +pub fn is_recursive_query(plan: &Arc) -> bool { + plan.as_any().is::() +} + /// Utility function yielding a string representation of the given [`ExecutionPlan`]. pub fn get_plan_string(plan: &Arc) -> Vec { let formatted = displayable(plan.as_ref()).indent(true).to_string(); From 879716fe56ebcdf0c4cfd63f6880528ef2bb21f6 Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Wed, 20 Sep 2023 15:56:36 -0700 Subject: [PATCH 10/15] cleanup in enforce_distribution.rs --- .../physical_optimizer/coalesce_batches.rs | 8 ++- .../enforce_distribution.rs | 49 +++++++++---------- 2 files changed, 29 insertions(+), 28 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs index c785ccde7350..e3b3f5060f4c 100644 --- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs @@ -55,6 +55,8 @@ struct CoalesceContext { } impl CoalesceContext { + /// Only use this method at the root of the plan. + /// All other contexts should be created using `new_descendent`. fn new(plan: Arc) -> Self { Self { has_recursive_ancestor: is_recursive_query(&plan), @@ -62,6 +64,8 @@ impl CoalesceContext { } } + /// Creates a new context for a descendent of this context. + /// The descendent will inherit the `has_recursive_ancestor` flag from this context. fn new_descendent(&self, descendent_plan: Arc) -> Self { Self { has_recursive_ancestor: self.has_recursive_ancestor @@ -142,14 +146,14 @@ impl PhysicalOptimizerRule for CoalesceBatches { config: &ConfigOptions, ) -> Result> { if !config.execution.coalesce_batches { - // return Ok(plan); + return Ok(plan); } let target_batch_size = config.execution.batch_size; let ctx = CoalesceContext::new(plan); let CoalesceContext { plan, .. } = ctx.transform_up(&|ctx| { if ctx.has_recursive_ancestor { - // return Ok(Transformed::No(ctx)); + return Ok(Transformed::No(ctx)); } let plan_any = ctx.plan.as_any(); // The goal here is to detect operators that could produce small batches and only diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 243d03ec6738..6f77a50c6fed 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1204,10 +1204,6 @@ fn ensure_distribution( dist_context: DistributionContext, config: &ConfigOptions, ) -> Result> { - let mut dist_context = dist_context; - if is_recursive_query(&dist_context.plan) { - dist_context.has_recursive_ancestor = true; - } if dist_context.has_recursive_ancestor { return Ok(Transformed::No(dist_context)); } @@ -1401,7 +1397,7 @@ fn ensure_distribution( plan.with_new_children(new_children)? }, distribution_onwards, - has_recursive_ancestor: has_recursive_ancestor || is_recursive_query(&plan), + has_recursive_ancestor, }; Ok(Transformed::Yes(new_distribution_context)) } @@ -1422,6 +1418,8 @@ struct DistributionContext { impl DistributionContext { /// Creates an empty context. + /// Only use this method at the root of the plan. + /// All other contexts should be created using `new_descendent`. fn new(plan: Arc) -> Self { let length = plan.children().len(); DistributionContext { @@ -1431,13 +1429,19 @@ impl DistributionContext { } } - fn new_from_plan_with_parent( - parent: Arc, - cur_plan: Arc, - ) -> Self { - let mut ctx = Self::new(cur_plan); - ctx.has_recursive_ancestor = - is_recursive_query(&parent) || ctx.has_recursive_ancestor; + /// Creates a new context from a descendent plan. + /// Importantly, this function propagates the `has_recursive_ancestor` flag. + fn new_descendent(&self, descendent_plan: Arc) -> Self { + let mut ctx = Self::new(descendent_plan); + ctx.has_recursive_ancestor |= self.has_recursive_ancestor; + ctx + } + + /// Creates a new context from a descendent context. + /// Importantly, this function propagates the `has_recursive_ancestor` flag. + fn new_descendent_from_ctx(&self, ctx: Self) -> Self { + let mut ctx = ctx; + ctx.has_recursive_ancestor |= self.has_recursive_ancestor; ctx } @@ -1512,7 +1516,6 @@ impl DistributionContext { } }) .collect(); - Ok(DistributionContext { has_recursive_ancestor: is_recursive_query(&parent_plan), plan: with_new_children_if_necessary(parent_plan, children_plans)?.into(), @@ -1525,15 +1528,7 @@ impl DistributionContext { self.plan .children() .into_iter() - .map(|child| { - let mut ctx = DistributionContext::new_from_plan_with_parent( - self.plan.clone(), - child, - ); - ctx.has_recursive_ancestor = - self.has_recursive_ancestor || ctx.has_recursive_ancestor; - ctx - }) + .map(|child| self.new_descendent(child)) .collect() } } @@ -1565,10 +1560,12 @@ impl TreeNode for DistributionContext { .into_iter() .map(transform) .collect::>>()?; - let mut ctx = - DistributionContext::new_from_children_nodes(children_nodes, self.plan)?; - ctx.has_recursive_ancestor |= self.has_recursive_ancestor; - Ok(ctx) + + DistributionContext::new_from_children_nodes( + children_nodes, + self.plan.clone(), + ) + .map(|ctx| self.new_descendent_from_ctx(ctx)) } } } From 7f982ee267721d596135274bf42af4b45326b20e Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Wed, 20 Sep 2023 17:05:25 -0700 Subject: [PATCH 11/15] updated variable names to be more descriptive --- .../core/src/physical_optimizer/coalesce_batches.rs | 3 ++- .../src/physical_optimizer/enforce_distribution.rs | 12 ++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs index e3b3f5060f4c..035a2854f80c 100644 --- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs @@ -67,8 +67,9 @@ impl CoalesceContext { /// Creates a new context for a descendent of this context. /// The descendent will inherit the `has_recursive_ancestor` flag from this context. fn new_descendent(&self, descendent_plan: Arc) -> Self { + let ancestor = self; Self { - has_recursive_ancestor: self.has_recursive_ancestor + has_recursive_ancestor: ancestor.has_recursive_ancestor || is_recursive_query(&descendent_plan), plan: descendent_plan, } diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 6f77a50c6fed..f8aae5567862 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1432,16 +1432,20 @@ impl DistributionContext { /// Creates a new context from a descendent plan. /// Importantly, this function propagates the `has_recursive_ancestor` flag. fn new_descendent(&self, descendent_plan: Arc) -> Self { - let mut ctx = Self::new(descendent_plan); - ctx.has_recursive_ancestor |= self.has_recursive_ancestor; - ctx + let ancestor = self; + + let mut new_ctx = Self::new(descendent_plan); + new_ctx.has_recursive_ancestor |= ancestor.has_recursive_ancestor; + new_ctx } /// Creates a new context from a descendent context. /// Importantly, this function propagates the `has_recursive_ancestor` flag. fn new_descendent_from_ctx(&self, ctx: Self) -> Self { + let ancestor = self; + let mut ctx = ctx; - ctx.has_recursive_ancestor |= self.has_recursive_ancestor; + ctx.has_recursive_ancestor |= ancestor.has_recursive_ancestor; ctx } From a8688d24dee2723e3222ebf20148b06cebebefcb Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Wed, 20 Sep 2023 20:24:16 -0700 Subject: [PATCH 12/15] wip --- datafusion/core/src/physical_optimizer/coalesce_batches.rs | 1 - .../core/src/physical_optimizer/enforce_distribution.rs | 5 +++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs index 035a2854f80c..a238099be1ee 100644 --- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs @@ -126,7 +126,6 @@ impl TreeNode for CoalesceContext { } } -/// implement Display method for `DistributionContext` struct. impl fmt::Display for CoalesceContext { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { let plan_string = get_plan_string(&self.plan); diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index f8aae5567862..5201e2ef8f6d 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1579,6 +1579,11 @@ impl fmt::Display for DistributionContext { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { let plan_string = get_plan_string(&self.plan); write!(f, "plan: {:?}", plan_string)?; + write!( + f, + "has_recursive_ancestor: {:?}", + self.has_recursive_ancestor, + )?; for (idx, child) in self.distribution_onwards.iter().enumerate() { if let Some(child) = child { write!(f, "idx:{:?}, exec_tree:{}", idx, child)?; From 2e58eaaf168464454f2bd2aecd22fa7ff8d95c5b Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Sat, 18 Nov 2023 14:00:32 -0800 Subject: [PATCH 13/15] fix rebase artifact --- datafusion/expr/src/logical_plan/builder.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 2d58657849e2..4a3904c576a3 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -57,8 +57,6 @@ use datafusion_common::{ use super::plan::{NamedRelation, RecursiveQuery}; -use super::plan::{NamedRelation, RecursiveQuery}; - /// Default table name for unnamed table pub const UNNAMED_TABLE: &str = "?table?"; From 7550b29fcb2b3aecc0d5d99d3d4da2e4654e68b2 Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Sat, 18 Nov 2023 14:07:28 -0800 Subject: [PATCH 14/15] refactor fixes --- .../enforce_distribution.rs | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 5201e2ef8f6d..d6374eec1e2d 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1114,29 +1114,29 @@ fn remove_dist_changing_operators( }) } -/// Changes each child of the `dist_context.plan` such that they no longer -/// use order preserving variants, if no ordering is required at the output -/// of the physical plan (there is no global ordering requirement by the query). -fn update_plan_to_remove_unnecessary_final_order( - dist_context: DistributionContext, -) -> Result> { - let DistributionContext { - plan, - distribution_onwards, - .. - } = dist_context; - let new_children = izip!(plan.children(), distribution_onwards) - .map(|(mut child, mut dist_onward)| { - replace_order_preserving_variants(&mut child, &mut dist_onward)?; - Ok(child) - }) - .collect::>>()?; - if !new_children.is_empty() { - plan.with_new_children(new_children) - } else { - Ok(plan) - } -} +// /// Changes each child of the `dist_context.plan` such that they no longer +// /// use order preserving variants, if no ordering is required at the output +// /// of the physical plan (there is no global ordering requirement by the query). +// fn update_plan_to_remove_unnecessary_final_order( +// dist_context: DistributionContext, +// ) -> Result> { +// let DistributionContext { +// plan, +// distribution_onwards, +// .. +// } = dist_context; +// let new_children = izip!(plan.children(), distribution_onwards) +// .map(|(mut child, mut dist_onward)| { +// replace_order_preserving_variants(&mut child, &mut dist_onward)?; +// Ok(child) +// }) +// .collect::>>()?; +// if !new_children.is_empty() { +// plan.with_new_children(new_children) +// } else { +// Ok(plan) +// } +// } /// Updates the physical plan `input` by using `dist_onward` replace order preserving operator variants /// with their corresponding operators that do not preserve order. It is a wrapper for `replace_order_preserving_variants_helper` @@ -1228,7 +1228,7 @@ fn ensure_distribution( mut plan, mut distribution_onwards, has_recursive_ancestor, - } = remove_unnecessary_repartition(dist_context)?; + } = remove_dist_changing_operators(dist_context)?; if let Some(exec) = plan.as_any().downcast_ref::() { if let Some(updated_window) = get_best_fitting_window( From ce3cdcdbc034ebc69b2270e00fb1df2b26386e15 Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Sat, 18 Nov 2023 14:09:14 -0800 Subject: [PATCH 15/15] wip: remove dead code --- .../enforce_distribution.rs | 24 ------------------- 1 file changed, 24 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index d6374eec1e2d..114e1176bc24 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1114,30 +1114,6 @@ fn remove_dist_changing_operators( }) } -// /// Changes each child of the `dist_context.plan` such that they no longer -// /// use order preserving variants, if no ordering is required at the output -// /// of the physical plan (there is no global ordering requirement by the query). -// fn update_plan_to_remove_unnecessary_final_order( -// dist_context: DistributionContext, -// ) -> Result> { -// let DistributionContext { -// plan, -// distribution_onwards, -// .. -// } = dist_context; -// let new_children = izip!(plan.children(), distribution_onwards) -// .map(|(mut child, mut dist_onward)| { -// replace_order_preserving_variants(&mut child, &mut dist_onward)?; -// Ok(child) -// }) -// .collect::>>()?; -// if !new_children.is_empty() { -// plan.with_new_children(new_children) -// } else { -// Ok(plan) -// } -// } - /// Updates the physical plan `input` by using `dist_onward` replace order preserving operator variants /// with their corresponding operators that do not preserve order. It is a wrapper for `replace_order_preserving_variants_helper` fn replace_order_preserving_variants(