Skip to content

Commit f9614d0

Browse files
committed
Continuing implementation with fixes and improvements
Lint fixes
1 parent 7d3565a commit f9614d0

File tree

17 files changed

+656
-38
lines changed

17 files changed

+656
-38
lines changed

datafusion-cli/Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/common/src/dfschema.rs

+5
Original file line numberDiff line numberDiff line change
@@ -753,6 +753,11 @@ impl DFField {
753753
self.field = f.into();
754754
self
755755
}
756+
757+
pub fn with_qualifier(mut self, qualifier: impl Into<OwnedTableReference>) -> Self {
758+
self.qualifier = Some(qualifier.into());
759+
self
760+
}
756761
}
757762

758763
impl From<FieldRef> for DFField {

datafusion/core/src/physical_planner.rs

+28-1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use crate::physical_expr::create_physical_expr;
5050
use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
5151
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
5252
use crate::physical_plan::analyze::AnalyzeExec;
53+
use crate::physical_plan::continuance::ContinuanceExec;
5354
use crate::physical_plan::explain::ExplainExec;
5455
use crate::physical_plan::expressions::{Column, PhysicalSortExpr};
5556
use crate::physical_plan::filter::FilterExec;
@@ -58,6 +59,7 @@ use crate::physical_plan::joins::SortMergeJoinExec;
5859
use crate::physical_plan::joins::{CrossJoinExec, NestedLoopJoinExec};
5960
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
6061
use crate::physical_plan::projection::ProjectionExec;
62+
use crate::physical_plan::recursive_query::RecursiveQueryExec;
6163
use crate::physical_plan::repartition::RepartitionExec;
6264
use crate::physical_plan::sorts::sort::SortExec;
6365
use crate::physical_plan::unnest::UnnestExec;
@@ -87,7 +89,9 @@ use datafusion_expr::expr::{
8789
};
8890
use datafusion_expr::expr_rewriter::{unalias, unnormalize_cols};
8991
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
90-
use datafusion_expr::{DescribeTable, DmlStatement, StringifiedPlan, WriteOp};
92+
use datafusion_expr::{
93+
DescribeTable, DmlStatement, NamedRelation, RecursiveQuery, StringifiedPlan, WriteOp,
94+
};
9195
use datafusion_expr::{WindowFrame, WindowFrameBound};
9296
use datafusion_physical_expr::expressions::Literal;
9397
use datafusion_sql::utils::window_expr_common_partition_keys;
@@ -1316,6 +1320,29 @@ impl DefaultPhysicalPlanner {
13161320
Ok(plan)
13171321
}
13181322
}
1323+
LogicalPlan::RecursiveQuery(RecursiveQuery { name, static_term, recursive_term, is_distinct }) => {
1324+
let static_term = self.create_initial_plan(static_term, session_state).await?;
1325+
let recursive_term = self.create_initial_plan(recursive_term, session_state).await?;
1326+
1327+
Ok(Arc::new(RecursiveQueryExec::new(name.clone(), static_term, recursive_term, *is_distinct)))
1328+
}
1329+
LogicalPlan::NamedRelation(NamedRelation {name, schema}) => {
1330+
// Named relations are how we represent access to any sort of dynamic data provider. They
1331+
// differ from tables in the sense that they can start existing dynamically during the
1332+
// execution of a query and then disappear before it even finishes.
1333+
//
1334+
// This system allows us to replicate the tricky behavior of classical databases where a
1335+
// temporary "working table" (as it is called in Postgres) can be used when dealing with
1336+
// complex operations (such as recursive CTEs) and then can be dropped. Since DataFusion
1337+
// at its core is heavily stream-based and vectorized, we try to avoid using 'real' tables
1338+
// and let the streams take care of the data flow in this case as well.
1339+
1340+
// Since the actual inputs will only be available to us at runtime (through the task context),
1341+
// we can't really do any sort of meaningful validation here.
1342+
let schema = SchemaRef::new(schema.as_ref().to_owned().into());
1343+
Ok(Arc::new(ContinuanceExec::new(name.clone(), schema)))
1344+
}
1345+
13191346
};
13201347
exec_plan
13211348
}.boxed()

datafusion/execution/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,5 @@ object_store = "0.7.0"
4545
parking_lot = "0.12"
4646
rand = "0.8"
4747
tempfile = "3"
48+
tokio = { version = "1.28", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
4849
url = "2.2"

datafusion/execution/src/task.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,14 @@ use crate::{
3333
runtime_env::{RuntimeConfig, RuntimeEnv},
3434
};
3535

36-
use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
37-
use futures::channel::mpsc::Receiver as SingleChannelReceiver;
36+
use arrow::record_batch::RecordBatch;
37+
// use futures::channel::mpsc::Receiver as SingleChannelReceiver;
38+
use tokio::sync::mpsc::Receiver as SingleChannelReceiver;
3839
// use futures::lock::Mutex;
3940
use parking_lot::Mutex;
4041
// use futures::
4142

42-
type RelationHandler = SingleChannelReceiver<ArrowResult<RecordBatch>>;
43+
type RelationHandler = SingleChannelReceiver<Result<RecordBatch>>;
4344

4445
/// Task Execution Context
4546
///

datafusion/expr/src/logical_plan/builder.rs

+35
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ use std::collections::{HashMap, HashSet};
5454
use std::convert::TryFrom;
5555
use std::sync::Arc;
5656

57+
use super::plan::{NamedRelation, RecursiveQuery};
58+
5759
/// Default table name for unnamed table
5860
pub const UNNAMED_TABLE: &str = "?table?";
5961

@@ -120,6 +122,39 @@ impl LogicalPlanBuilder {
120122
}))
121123
}
122124

125+
/// A named temporary relation with a schema.
126+
///
127+
/// This is used to represent a relation that does not exist at the
128+
/// planning stage, but will be created at execution time with the
129+
/// given schema.
130+
pub fn named_relation(name: &str, schema: DFSchemaRef) -> Self {
131+
Self::from(LogicalPlan::NamedRelation(NamedRelation {
132+
name: name.to_string(),
133+
schema,
134+
}))
135+
}
136+
137+
/// Convert a regular plan into a recursive query.
138+
pub fn to_recursive_query(
139+
&self,
140+
name: String,
141+
recursive_term: LogicalPlan,
142+
is_distinct: bool,
143+
) -> Result<Self> {
144+
// TODO: we need to do a bunch of validation here. Maybe more.
145+
if is_distinct {
146+
return Err(DataFusionError::NotImplemented(
147+
"Recursive queries with distinct is not supported".to_string(),
148+
));
149+
}
150+
Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
151+
name,
152+
static_term: Arc::new(self.plan.clone()),
153+
recursive_term: Arc::new(recursive_term),
154+
is_distinct,
155+
})))
156+
}
157+
123158
/// Create a values list based relation, and the schema is inferred from data, consuming
124159
/// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
125160
/// documentation for more details.

datafusion/expr/src/logical_plan/mod.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,10 @@ pub use ddl::{
3434
pub use dml::{DmlStatement, WriteOp};
3535
pub use plan::{
3636
Aggregate, Analyze, CrossJoin, DescribeTable, Distinct, EmptyRelation, Explain,
37-
Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning,
38-
PlanType, Prepare, Projection, Repartition, Sort, StringifiedPlan, Subquery,
39-
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
37+
Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, NamedRelation,
38+
Partitioning, PlanType, Prepare, Projection, RecursiveQuery, Repartition, Sort,
39+
StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
40+
Unnest, Values, Window,
4041
};
4142
pub use statement::{
4243
SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd,

datafusion/expr/src/logical_plan/plan.rs

+69
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ pub enum LogicalPlan {
110110
/// produces 0 or 1 row. This is used to implement SQL `SELECT`
111111
/// that has no values in the `FROM` clause.
112112
EmptyRelation(EmptyRelation),
113+
/// A named temporary relation with a schema.
114+
NamedRelation(NamedRelation),
113115
/// Produces the output of running another query. This is used to
114116
/// implement SQL subqueries
115117
Subquery(Subquery),
@@ -152,6 +154,8 @@ pub enum LogicalPlan {
152154
/// Unnest a column that contains a nested list type such as an
153155
/// ARRAY. This is used to implement SQL `UNNEST`
154156
Unnest(Unnest),
157+
/// A recursive query (e.g. "Recursive CTEs"), made up of a static term and a recursive term
158+
RecursiveQuery(RecursiveQuery),
155159
}
156160

157161
impl LogicalPlan {
@@ -188,6 +192,11 @@ impl LogicalPlan {
188192
LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(),
189193
LogicalPlan::Ddl(ddl) => ddl.schema(),
190194
LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
195+
LogicalPlan::NamedRelation(NamedRelation { schema, .. }) => schema,
196+
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
197+
// we take the schema of the static term as the schema of the entire recursive query
198+
static_term.schema()
199+
}
191200
}
192201
}
193202

@@ -230,6 +239,7 @@ impl LogicalPlan {
230239
LogicalPlan::Explain(_)
231240
| LogicalPlan::Analyze(_)
232241
| LogicalPlan::EmptyRelation(_)
242+
| LogicalPlan::NamedRelation(_)
233243
| LogicalPlan::Ddl(_)
234244
| LogicalPlan::Dml(_)
235245
| LogicalPlan::Copy(_)
@@ -240,6 +250,10 @@ impl LogicalPlan {
240250
| LogicalPlan::TableScan(_) => {
241251
vec![self.schema()]
242252
}
253+
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
254+
// return only the schemas of the static term
255+
static_term.all_schemas()
256+
}
243257
// return children schemas
244258
LogicalPlan::Limit(_)
245259
| LogicalPlan::Subquery(_)
@@ -371,6 +385,9 @@ impl LogicalPlan {
371385
}
372386
// plans without expressions
373387
LogicalPlan::EmptyRelation(_)
388+
| LogicalPlan::NamedRelation(_)
389+
// TODO: not sure if this should go here
390+
| LogicalPlan::RecursiveQuery(_)
374391
| LogicalPlan::Subquery(_)
375392
| LogicalPlan::SubqueryAlias(_)
376393
| LogicalPlan::Limit(_)
@@ -415,8 +432,14 @@ impl LogicalPlan {
415432
LogicalPlan::Ddl(ddl) => ddl.inputs(),
416433
LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
417434
LogicalPlan::Prepare(Prepare { input, .. }) => vec![input],
435+
LogicalPlan::RecursiveQuery(RecursiveQuery {
436+
static_term,
437+
recursive_term,
438+
..
439+
}) => vec![static_term, recursive_term],
418440
// plans without inputs
419441
LogicalPlan::TableScan { .. }
442+
| LogicalPlan::NamedRelation(_)
420443
| LogicalPlan::Statement { .. }
421444
| LogicalPlan::EmptyRelation { .. }
422445
| LogicalPlan::Values { .. }
@@ -492,6 +515,9 @@ impl LogicalPlan {
492515
cross.left.head_output_expr()
493516
}
494517
}
518+
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
519+
static_term.head_output_expr()
520+
}
495521
LogicalPlan::Union(union) => Ok(Some(Expr::Column(
496522
union.schema.fields()[0].qualified_column(),
497523
))),
@@ -511,6 +537,7 @@ impl LogicalPlan {
511537
}
512538
LogicalPlan::Subquery(_) => Ok(None),
513539
LogicalPlan::EmptyRelation(_)
540+
| LogicalPlan::NamedRelation(_)
514541
| LogicalPlan::Prepare(_)
515542
| LogicalPlan::Statement(_)
516543
| LogicalPlan::Values(_)
@@ -839,6 +866,14 @@ impl LogicalPlan {
839866
input: Arc::new(inputs[0].clone()),
840867
}))
841868
}
869+
LogicalPlan::RecursiveQuery(RecursiveQuery {
870+
name, is_distinct, ..
871+
}) => Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
872+
name: name.clone(),
873+
static_term: Arc::new(inputs[0].clone()),
874+
recursive_term: Arc::new(inputs[1].clone()),
875+
is_distinct: *is_distinct,
876+
})),
842877
LogicalPlan::Analyze(a) => {
843878
assert!(expr.is_empty());
844879
assert_eq!(inputs.len(), 1);
@@ -877,6 +912,7 @@ impl LogicalPlan {
877912
}))
878913
}
879914
LogicalPlan::EmptyRelation(_)
915+
| LogicalPlan::NamedRelation(_)
880916
| LogicalPlan::Ddl(_)
881917
| LogicalPlan::Statement(_) => {
882918
// All of these plan types have no inputs / exprs so should not be called
@@ -1040,6 +1076,9 @@ impl LogicalPlan {
10401076
}),
10411077
LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
10421078
LogicalPlan::EmptyRelation(_) => Some(0),
1079+
// TODO: not sure if this is correct
1080+
LogicalPlan::NamedRelation(_) => None,
1081+
LogicalPlan::RecursiveQuery(_) => None,
10431082
LogicalPlan::Subquery(_) => None,
10441083
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
10451084
LogicalPlan::Limit(Limit { fetch, .. }) => *fetch,
@@ -1387,6 +1426,14 @@ impl LogicalPlan {
13871426
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
13881427
match self.0 {
13891428
LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"),
1429+
LogicalPlan::NamedRelation(NamedRelation { name, .. }) => {
1430+
write!(f, "NamedRelation: {}", name)
1431+
}
1432+
LogicalPlan::RecursiveQuery(RecursiveQuery {
1433+
is_distinct, ..
1434+
}) => {
1435+
write!(f, "RecursiveQuery: is_distinct={}", is_distinct)
1436+
}
13901437
LogicalPlan::Values(Values { ref values, .. }) => {
13911438
let str_values: Vec<_> = values
13921439
.iter()
@@ -1685,6 +1732,28 @@ pub struct EmptyRelation {
16851732
pub schema: DFSchemaRef,
16861733
}
16871734

1735+
/// A named temporary relation with a known schema.
1736+
#[derive(Clone, PartialEq, Eq, Hash)]
1737+
pub struct NamedRelation {
1738+
/// The relation name
1739+
pub name: String,
1740+
/// The schema description
1741+
pub schema: DFSchemaRef,
1742+
}
1743+
1744+
/// A recursive query operation, made up of a static term and a recursive term
1745+
#[derive(Clone, PartialEq, Eq, Hash)]
1746+
pub struct RecursiveQuery {
1747+
/// Name of the query
1748+
pub name: String,
1749+
/// The static term
1750+
pub static_term: Arc<LogicalPlan>,
1751+
/// The recursive term
1752+
pub recursive_term: Arc<LogicalPlan>,
1753+
/// Whether the query is marked as distinct
1754+
pub is_distinct: bool,
1755+
}
1756+
16881757
/// Values expression. See
16891758
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
16901759
/// documentation for more details.

datafusion/optimizer/src/common_subexpr_eliminate.rs

+2
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,8 @@ impl OptimizerRule for CommonSubexprEliminate {
366366
| LogicalPlan::Dml(_)
367367
| LogicalPlan::Copy(_)
368368
| LogicalPlan::Unnest(_)
369+
| LogicalPlan::NamedRelation(_)
370+
| LogicalPlan::RecursiveQuery(_)
369371
| LogicalPlan::Prepare(_) => {
370372
// apply the optimization to all inputs of the plan
371373
utils::optimize_children(self, plan, config)?

datafusion/physical-plan/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,5 @@ tempfile = "3"
6060
#[dev-dependencies]
6161
termtree = "0.4.1"
6262
tokio = { version = "1.28", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
63+
tokio-stream = { version = "0.1.14" }
6364
uuid = { version = "^1.2", features = ["v4"] }

0 commit comments

Comments
 (0)