diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 02a18f22c916..ec8572c7ae45 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1112,6 +1112,69 @@ impl DataFrame { ) } + /// Sorts the DataFrame within each partition using the specified expressions. + /// + /// This function performs a local sort within each partition of the DataFrame, + /// meaning that the sorting is done independently for each partition without + /// merging the results across partitions. This is more efficient than a global + /// sort when you only need data sorted within each partition. + /// + /// # Arguments + /// + /// * `expr` - A vector of expressions to sort by. Each expression can be a column name + /// or a more complex expression. The expressions are evaluated in order, with + /// earlier expressions taking precedence over later ones. + /// + /// # Returns + /// + /// Returns a new DataFrame with the data sorted within each partition according + /// to the specified expressions. 
+ /// + /// # Example + /// + /// ``` + /// # use datafusion::prelude::*; + /// # use datafusion::error::Result; + /// # use datafusion_common::assert_batches_sorted_eq; + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let ctx = SessionContext::new(); + /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?; + /// // First repartition the data + /// let df = df.repartition(Partitioning::RoundRobinBatch(2))?; + /// // Then sort within each partition + /// let df = df.sort_by_within_partitions(vec![ + /// col("a"), // a ASC + /// col("b"), // b ASC + /// ])?; + /// let expected = vec![ + /// "+---+---+---+", + /// "| a | b | c |", + /// "+---+---+---+", + /// "| 7 | 8 | 9 |", + /// "| 4 | 5 | 6 |", + /// "| 1 | 2 | 3 |", + /// "+---+---+---+", + /// ]; + /// # assert_batches_sorted_eq!(expected, &df.collect().await?); + /// # Ok(()) + /// # } + /// ``` + /// + /// # Note + /// + /// - This operation maintains the existing partitioning of the data + /// - The sort order is not guaranteed across partitions + /// - For a global sort across all partitions, use [`sort_by()`](Self::sort_by) instead + /// + pub fn sort_by_within_partitions(self, expr: Vec<Expr>) -> Result<DataFrame> { + self.sort_within_partitions( + expr.into_iter() + .map(|e| e.sort(true, false)) + .collect::<Vec<SortExpr>>(), + ) + } + /// Sort the DataFrame by the specified sorting expressions. /// /// Note that any expression can be turned into @@ -1152,6 +1215,70 @@ impl DataFrame { projection_requires_validation: self.projection_requires_validation, }) } + /// Sorts the DataFrame within each partition using the specified expressions. + /// + /// This function performs a local sort within each partition of the DataFrame, + /// meaning that the sorting is done independently for each partition without + /// merging the results across partitions. This is more efficient than a global + /// sort when you only need data sorted within each partition. 
+ /// + /// # Arguments + /// + /// * `expr` - A vector of expressions to sort by. Each expression can be a column name + /// or a more complex expression. The expressions are evaluated in order, with + /// earlier expressions taking precedence over later ones. + /// + /// # Returns + /// + /// Returns a new DataFrame with the data sorted within each partition according + /// to the specified expressions. + /// + /// # Example + /// + /// ``` + /// # use datafusion::prelude::*; + /// # use datafusion::error::Result; + /// # use datafusion_common::assert_batches_sorted_eq; + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let ctx = SessionContext::new(); + /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?; + /// // First repartition the data + /// let df = df.repartition(Partitioning::RoundRobinBatch(2))?; + /// // Then sort within each partition + /// let df = df.sort_within_partitions(vec![ + /// col("a").sort(false, true), // a DESC, nulls first + /// col("b").sort(true, false), // b ASC, nulls last + /// ])?; + /// let expected = vec![ + /// "+---+---+---+", + /// "| a | b | c |", + /// "+---+---+---+", + /// "| 7 | 8 | 9 |", + /// "| 4 | 5 | 6 |", + /// "| 1 | 2 | 3 |", + /// "+---+---+---+", + /// ]; + /// # assert_batches_sorted_eq!(expected, &df.collect().await?); + /// # Ok(()) + /// # } + /// ``` + /// + /// # Note + /// + /// - This operation maintains the existing partitioning of the data + /// - The sort order is not guaranteed across partitions + /// - For a global sort across all partitions, use [`sort()`](Self::sort) instead + pub fn sort_within_partitions(self, expr: Vec<SortExpr>) -> Result<DataFrame> { + let plan = LogicalPlanBuilder::from(self.plan) + .sort_within_partitions(expr)? 
+ .build()?; + Ok(DataFrame { + session_state: self.session_state, + plan, + projection_requires_validation: self.projection_requires_validation, + }) + } /// Join this `DataFrame` with another `DataFrame` using explicitly specified /// columns and an optional filter expression. diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 8bf513a55a66..802d3ca479bf 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -841,10 +841,20 @@ impl DefaultPhysicalPlanner { .collect::<Result<Vec<_>>>()?; Partitioning::Hash(runtime_expr, *n) } - LogicalPartitioning::DistributeBy(_) => { - return not_impl_err!( - "Physical plan does not support DistributeBy partitioning" - ); + LogicalPartitioning::DistributeBy(expr) => { + let n = + session_state.config().options().execution.target_partitions; + let runtime_expr = expr + .iter() + .map(|e| { + self.create_physical_expr( + e, + input_dfschema, + session_state, + ) + }) + .collect::<Result<Vec<_>>>()?; + Partitioning::Hash(runtime_expr, n) } }; Arc::new(RepartitionExec::try_new( @@ -853,7 +863,11 @@ )?) } LogicalPlan::Sort(Sort { - expr, input, fetch, .. + expr, + input, + fetch, + preserve_partitioning, + .. 
}) => { let physical_input = children.one()?; let input_dfschema = input.as_ref().schema(); @@ -867,7 +881,9 @@ "SortExec requires at least one sort expression" ); }; - let new_sort = SortExec::new(ordering, physical_input).with_fetch(*fetch); + let new_sort = SortExec::new(ordering, physical_input) + .with_preserve_partitioning(*preserve_partitioning) + .with_fetch(*fetch); Arc::new(new_sort) } LogicalPlan::Subquery(_) => todo!(), diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 93dd6c2b89fc..edbc36759e04 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -779,7 +779,14 @@ self, sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone, ) -> Result<Self> { - self.sort_with_limit(sorts, None) + self.sort_with_limit(sorts, None, false) + } + + pub fn sort_within_partitions( + self, + sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone, + ) -> Result<Self> { + self.sort_with_limit(sorts, None, true) + } /// Apply a sort @@ -787,6 +794,7 @@ self, sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone, fetch: Option<usize>, + preserve_partitioning: bool, ) -> Result<Self> { let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?; @@ -812,6 +820,7 @@ expr: normalize_sorts(sorts, &self.plan)?, input: self.plan, fetch, + preserve_partitioning, }))); } @@ -829,6 +838,7 @@ expr: normalize_sorts(sorts, &plan)?, input: Arc::new(plan), fetch, + preserve_partitioning: false, }); Projection::try_new(new_expr, Arc::new(sort_plan)) diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs index f1e455f46db3..33840649b12b 100644 --- a/datafusion/expr/src/logical_plan/display.rs +++ b/datafusion/expr/src/logical_plan/display.rs @@ -473,10 +473,16 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { "Aggregates": expr_vec_fmt!(aggr_expr) }) } - LogicalPlan::Sort(Sort { expr, fetch, .. 
}) => { + LogicalPlan::Sort(Sort { + expr, + fetch, + preserve_partitioning, + .. + }) => { let mut object = json!({ "Node Type": "Sort", "Sort Key": expr_vec_fmt!(expr), + "Preserve Partitioning": preserve_partitioning, }); if let Some(fetch) = fetch { diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 876c14f1000f..cf4d6ef33ad6 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -879,6 +879,7 @@ impl LogicalPlan { LogicalPlan::Sort(Sort { expr: sort_expr, fetch, + preserve_partitioning, .. }) => { let input = self.only_input(inputs)?; @@ -890,6 +891,7 @@ .collect(), input: Arc::new(input), fetch: *fetch, + preserve_partitioning: *preserve_partitioning, })) } LogicalPlan::Join(Join { @@ -1872,7 +1874,7 @@ expr_vec_fmt!(group_expr), expr_vec_fmt!(aggr_expr) ), - LogicalPlan::Sort(Sort { expr, fetch, .. }) => { + LogicalPlan::Sort(Sort { expr, fetch, preserve_partitioning, .. }) => { write!(f, "Sort: ")?; for (i, expr_item) in expr.iter().enumerate() { if i > 0 { @@ -1883,6 +1885,9 @@ if let Some(a) = fetch { write!(f, ", fetch={a}")?; } + if *preserve_partitioning { + write!(f, ", preserve_ordering={preserve_partitioning}")?; + } Ok(()) } @@ -3689,6 +3694,9 @@ pub struct Sort { pub input: Arc<LogicalPlan>, /// Optional fetch limit pub fetch: Option<usize>, + /// Preserve partitions of input plan. If false, the input partitions + /// will be sorted and merged into a single output partition. 
+ pub preserve_partitioning: bool, } /// Join two logical plans on one or more join columns diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 527248ad39c2..63b11a87bae1 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -121,9 +121,19 @@ impl TreeNode for LogicalPlan { schema, }) }), - LogicalPlan::Sort(Sort { expr, input, fetch }) => input - .map_elements(f)? - .update_data(|input| LogicalPlan::Sort(Sort { expr, input, fetch })), + LogicalPlan::Sort(Sort { + expr, + input, + fetch, + preserve_partitioning, + }) => input.map_elements(f)?.update_data(|input| { + LogicalPlan::Sort(Sort { + expr, + input, + fetch, + preserve_partitioning, + }) + }), LogicalPlan::Join(Join { left, right, @@ -574,9 +584,19 @@ impl LogicalPlan { null_equality, }) }), - LogicalPlan::Sort(Sort { expr, input, fetch }) => expr - .map_elements(f)? - .update_data(|expr| LogicalPlan::Sort(Sort { expr, input, fetch })), + LogicalPlan::Sort(Sort { + expr, + input, + fetch, + preserve_partitioning, + }) => expr.map_elements(f)?.update_data(|expr| { + LogicalPlan::Sort(Sort { + expr, + input, + fetch, + preserve_partitioning, + }) + }), LogicalPlan::Extension(Extension { node }) => { // would be nice to avoid this copy -- maybe can // update extension to just observer Exprs diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index b5a3e9a2d585..77c75d91a2a1 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -1298,6 +1298,7 @@ mod test { expr: vec![sort_expr], input: Arc::new(plan), fetch: None, + preserve_partitioning: false, }); // Plan C: no coerce @@ -1421,6 +1422,7 @@ mod test { expr: vec![sort_expr], input: Arc::new(plan), fetch: None, + preserve_partitioning: false, }); // Plan C: no coerce diff --git 
a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 6a49e5d22087..dffa6b83f2a4 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -96,7 +96,12 @@ impl CommonSubexprEliminate { sort: Sort, config: &dyn OptimizerConfig, ) -> Result> { - let Sort { expr, input, fetch } = sort; + let Sort { + expr, + input, + fetch, + preserve_partitioning, + } = sort; let input = Arc::unwrap_or_clone(input); let (sort_expressions, sort_params): (Vec<_>, Vec<(_, _)>) = expr .into_iter() @@ -117,6 +122,7 @@ impl CommonSubexprEliminate { .collect(), input: Arc::new(new_input), fetch, + preserve_partitioning, }) }); Ok(new_sort) diff --git a/datafusion/optimizer/src/eliminate_duplicated_expr.rs b/datafusion/optimizer/src/eliminate_duplicated_expr.rs index a6651df938a7..eb80e4faf03e 100644 --- a/datafusion/optimizer/src/eliminate_duplicated_expr.rs +++ b/datafusion/optimizer/src/eliminate_duplicated_expr.rs @@ -86,6 +86,7 @@ impl OptimizerRule for EliminateDuplicatedExpr { expr: unique_exprs, input: sort.input, fetch: sort.fetch, + preserve_partitioning: sort.preserve_partitioning, }))) } LogicalPlan::Aggregate(agg) => { diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 64789f5de0d2..a92ba8e90a08 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -139,6 +139,7 @@ message SortNode { repeated SortExprNode expr = 2; // Maximum number of highest/lowest rows to fetch; negative means no limit int64 fetch = 3; + bool preserve_partitioning = 4; } message RepartitionNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 92309ea6a5cb..15c376454e5c 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -19583,6 +19583,9 @@ impl serde::Serialize for SortNode { 
if self.fetch != 0 { len += 1; } + if self.preserve_partitioning { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.SortNode", len)?; if let Some(v) = self.input.as_ref() { struct_ser.serialize_field("input", v)?; @@ -19595,6 +19598,9 @@ impl serde::Serialize for SortNode { #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("fetch", ToString::to_string(&self.fetch).as_str())?; } + if self.preserve_partitioning { + struct_ser.serialize_field("preservePartitioning", &self.preserve_partitioning)?; + } struct_ser.end() } } @@ -19608,6 +19614,8 @@ impl<'de> serde::Deserialize<'de> for SortNode { "input", "expr", "fetch", + "preserve_partitioning", + "preservePartitioning", ]; #[allow(clippy::enum_variant_names)] @@ -19615,6 +19623,7 @@ impl<'de> serde::Deserialize<'de> for SortNode { Input, Expr, Fetch, + PreservePartitioning, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -19639,6 +19648,7 @@ impl<'de> serde::Deserialize<'de> for SortNode { "input" => Ok(GeneratedField::Input), "expr" => Ok(GeneratedField::Expr), "fetch" => Ok(GeneratedField::Fetch), + "preservePartitioning" | "preserve_partitioning" => Ok(GeneratedField::PreservePartitioning), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -19661,6 +19671,7 @@ impl<'de> serde::Deserialize<'de> for SortNode { let mut input__ = None; let mut expr__ = None; let mut fetch__ = None; + let mut preserve_partitioning__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::Input => { @@ -19683,12 +19694,19 @@ Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) ; } + GeneratedField::PreservePartitioning => { + if preserve_partitioning__.is_some() { + return Err(serde::de::Error::duplicate_field("preservePartitioning")); + } + preserve_partitioning__ = Some(map_.next_value()?); + } } } Ok(SortNode { input: input__, expr: expr__.unwrap_or_default(), fetch: fetch__.unwrap_or_default(), + preserve_partitioning: preserve_partitioning__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index b0fc0ce60436..752729557e00 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -202,6 +202,8 @@ pub struct SortNode { /// Maximum number of highest/lowest rows to fetch; negative means no limit #[prost(int64, tag = "3")] pub fetch: i64, + #[prost(bool, tag = "4")] + pub preserve_partitioning: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct RepartitionNode { diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 1acf1ee27bfe..4acd2dbae3dd 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -542,7 +542,7 @@ impl AsLogicalPlan for LogicalPlanNode { from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?; let fetch: Option<usize> = sort.fetch.try_into().ok(); LogicalPlanBuilder::from(input) - .sort_with_limit(sort_expr, fetch)? + .sort_with_limit(sort_expr, fetch, false)? 
.build() } LogicalPlanType::Repartition(repartition) => { @@ -1416,7 +1416,12 @@ impl AsLogicalPlan for LogicalPlanNode { ))), }) } - LogicalPlan::Sort(Sort { input, expr, fetch }) => { + LogicalPlan::Sort(Sort { + input, + expr, + fetch, + preserve_partitioning, + }) => { let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan( input.as_ref(), extension_codec, @@ -1429,6 +1434,7 @@ impl AsLogicalPlan for LogicalPlanNode { input: Some(Box::new(input)), expr: sort_expr, fetch: fetch.map(|f| f as i64).unwrap_or(-1i64), + preserve_partitioning: *preserve_partitioning, }, ))), }) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index b50fbf68129c..aba71ba562da 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -47,7 +47,8 @@ use datafusion_expr::{ use indexmap::IndexMap; use sqlparser::ast::{ visit_expressions_mut, Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, - OrderBy, SelectItemQualifiedWildcardKind, WildcardAdditionalOptions, WindowType, + OrderBy, OrderByExpr, OrderByOptions, SelectItemQualifiedWildcardKind, + WildcardAdditionalOptions, WindowType, }; use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins}; @@ -60,9 +61,6 @@ impl SqlToRel<'_, S> { planner_context: &mut PlannerContext, ) -> Result { // Check for unsupported syntax first - if !select.cluster_by.is_empty() { - return not_impl_err!("CLUSTER BY"); - } if !select.lateral_views.is_empty() { return not_impl_err!("LATERAL VIEWS"); } @@ -72,8 +70,26 @@ impl SqlToRel<'_, S> { if select.top.is_some() { return not_impl_err!("TOP"); } - if !select.sort_by.is_empty() { - return not_impl_err!("SORT BY"); + + if query_order_by.is_some() { + if !select.cluster_by.is_empty() { + return plan_err!("ORDER BY and CLUSTER BY cannot be used together"); + } + if !select.distribute_by.is_empty() { + return plan_err!("ORDER BY and DISTRIBUTE BY cannot be used together"); + } + if !select.sort_by.is_empty() { + return 
plan_err!("ORDER BY and SORT BY cannot be used together"); + } + } + + if !select.cluster_by.is_empty() { + if !select.sort_by.is_empty() { + return plan_err!("CLUSTER BY and SORT BY cannot be used together"); + } + if !select.distribute_by.is_empty() { + return plan_err!("CLUSTER BY and DISTRIBUTE BY cannot be used together"); + } } // Process `from` clause @@ -274,10 +290,11 @@ impl SqlToRel<'_, S> { }?; // DISTRIBUTE BY - let plan = if !select.distribute_by.is_empty() { + let plan = if !select.distribute_by.is_empty() || !select.cluster_by.is_empty() { let x = select .distribute_by .iter() + .chain(select.cluster_by.iter()) .map(|e| { self.sql_expr_to_logical_expr( e.clone(), @@ -293,6 +310,34 @@ impl SqlToRel<'_, S> { plan }; + // SORT BY + let plan = if !select.sort_by.is_empty() || !select.cluster_by.is_empty() { + let sort_by_rex = self.order_by_to_sort_expr( + select + .sort_by + .into_iter() + .chain(select.cluster_by.iter().map(|e| OrderByExpr { + expr: e.clone(), + options: OrderByOptions { + asc: Some(true), + nulls_first: None, + }, + with_fill: None, + })) + .collect(), + projected_plan.schema().as_ref(), + planner_context, + true, + Some(plan.schema().as_ref()), + )?; + let sort_by_rex = normalize_sorts(sort_by_rex, &projected_plan)?; + LogicalPlanBuilder::from(plan) + .sort_within_partitions(sort_by_rex)? + .build()? 
+ } else { + plan + }; + self.order_by(plan, order_by_rex) } diff --git a/datafusion/sql/src/unparser/ast.rs b/datafusion/sql/src/unparser/ast.rs index d9ade822aa00..64cdf246d16c 100644 --- a/datafusion/sql/src/unparser/ast.rs +++ b/datafusion/sql/src/unparser/ast.rs @@ -143,7 +143,7 @@ pub struct SelectBuilder { group_by: Option, cluster_by: Vec, distribute_by: Vec, - sort_by: Vec, + sort_by: Vec, having: Option, named_window: Vec, qualify: Option, @@ -260,7 +260,7 @@ impl SelectBuilder { self.distribute_by = value; self } - pub fn sort_by(&mut self, value: Vec) -> &mut Self { + pub fn sort_by(&mut self, value: Vec) -> &mut Self { self.sort_by = value; self } diff --git a/datafusion/sql/src/unparser/rewrite.rs b/datafusion/sql/src/unparser/rewrite.rs index aa480cf4fff9..f053c1a28359 100644 --- a/datafusion/sql/src/unparser/rewrite.rs +++ b/datafusion/sql/src/unparser/rewrite.rs @@ -75,6 +75,7 @@ pub(super) fn normalize_union_schema(plan: &LogicalPlan) -> Result expr: rewrite_sort_expr_for_union(sort.expr)?, input: sort.input, fetch: sort.fetch, + preserve_partitioning: sort.preserve_partitioning, }))) } _ => Ok(Transformed::no(plan)), diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index b4697c2fe473..91bbf0a3ad68 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -1610,7 +1610,7 @@ fn test_sort_with_push_down_fetch() -> Result<()> { let plan = table_scan(Some("t1"), &schema, None)? .project(vec![col("id"), col("age")])? - .sort_with_limit(vec![col("age").sort(true, true)], Some(10))? + .sort_with_limit(vec![col("age").sort(true, true)], Some(10), false)? 
.build()?; let sql = plan_to_sql(&plan)?; diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index c82239d9b455..4ba0dc9d79d4 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4182,11 +4182,36 @@ fn test_select_distinct_order_by() { ); } +#[test] +fn test_select_sort_by() { + let sql = "SELECT id,age from person SORT BY id"; + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r#" +Sort: person.id ASC NULLS LAST, preserve_ordering=true + Projection: person.id, person.age + TableScan: person +"# + ); +} + +#[test] +fn test_select_cluster_by() { + let sql = "SELECT id,age from person CLUSTER BY id"; + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r#" +Sort: person.id ASC NULLS LAST, preserve_ordering=true + Repartition: DistributeBy(person.id) + Projection: person.id, person.age + TableScan: person +"# + ); +} + #[rstest] -#[case::select_cluster_by_unsupported( - "SELECT customer_name, sum(order_total) as total_order_amount FROM orders CLUSTER BY customer_name", - "This feature is not implemented: CLUSTER BY" -)] #[case::select_lateral_view_unsupported( "SELECT id, number FROM person LATERAL VIEW explode(numbers) exploded_table AS number", "This feature is not implemented: LATERAL VIEWS" @@ -4199,10 +4224,6 @@ fn test_select_distinct_order_by() { "SELECT TOP (5) * FROM person", "This feature is not implemented: TOP" )] -#[case::select_sort_by_unsupported( - "SELECT * FROM person SORT BY id", - "This feature is not implemented: SORT BY" -)] #[test] fn test_select_unsupported_syntax_errors(#[case] sql: &str, #[case] error: &str) { let err = logical_plan(sql).unwrap_err(); diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 9e67018ecd0b..2e79d27ed555 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ 
-4145,7 +4145,10 @@ logical_plan 02)--Projection: multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b, sum(multiple_ordered_table_with_pk.d) AS sum1 03)----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b]], aggr=[[sum(CAST(multiple_ordered_table_with_pk.d AS Int64))]] 04)------TableScan: multiple_ordered_table_with_pk projection=[a, b, c, d] -physical_plan_error This feature is not implemented: Physical plan does not support DistributeBy partitioning +physical_plan +01)ProjectionExec: expr=[a@1 as a, b@2 as b, sum(multiple_ordered_table_with_pk.d)@3 as sum1] +02)--AggregateExec: mode=Single, gby=[c@2 as c, a@0 as a, b@1 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], constraints=[PrimaryKey([3])], file_type=csv, has_header=true # union with aggregate query TT diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 3fc90a6459f2..951c07c8ea5e 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -397,10 +397,19 @@ ORDER BY sum(value) + sum(value); 2022-01-01T01:00:00 4 2022-01-02T00:00:00 6 -## SORT BY is not supported -statement error DataFusion error: This feature is not implemented: SORT BY +query IP rowsort select * from t SORT BY time; +---- +1 2022-01-01T00:00:30 +2 2022-01-01T01:00:10 +3 2022-01-02T00:00:20 +query IP rowsort +select * from t CLUSTER BY time; +---- +1 2022-01-01T00:00:30 +2 2022-01-01T01:00:10 +3 2022-01-02T00:00:20 # distinct on a column not in the select list should not work statement error DataFusion error: Error during planning: For SELECT DISTINCT, ORDER BY expressions t\.time must appear in select list 
diff --git a/datafusion/substrait/src/logical_plan/producer/rel/sort_rel.rs b/datafusion/substrait/src/logical_plan/producer/rel/sort_rel.rs index aaa8be163560..f379e20edca1 100644 --- a/datafusion/substrait/src/logical_plan/producer/rel/sort_rel.rs +++ b/datafusion/substrait/src/logical_plan/producer/rel/sort_rel.rs @@ -27,7 +27,12 @@ pub fn from_sort( producer: &mut impl SubstraitProducer, sort: &Sort, ) -> datafusion::common::Result> { - let Sort { expr, input, fetch } = sort; + let Sort { + expr, + input, + fetch, + preserve_partitioning: _, // TODO: implement preserve_partitioning after substrait supports it + } = sort; let sort_fields = expr .iter() .map(|e| substrait_sort_field(producer, e, input.schema()))