@@ -39,7 +39,8 @@ use mz_repr::optimize::OptimizerFeatureOverrides;
3939use mz_repr:: refresh_schedule:: { RefreshEvery , RefreshSchedule } ;
4040use mz_repr:: role_id:: RoleId ;
4141use mz_repr:: {
42- strconv, ColumnName , ColumnType , GlobalId , RelationDesc , RelationType , ScalarType , Timestamp ,
42+ preserves_order, strconv, ColumnName , ColumnType , GlobalId , RelationDesc , RelationType ,
43+ ScalarType , Timestamp ,
4344} ;
4445use mz_sql_parser:: ast:: display:: comma_separated;
4546use mz_sql_parser:: ast:: {
@@ -175,6 +176,23 @@ const MAX_NUM_COLUMNS: usize = 256;
175176const MANAGED_REPLICA_PATTERN : std:: sync:: LazyLock < regex:: Regex > =
176177 std:: sync:: LazyLock :: new ( || regex:: Regex :: new ( r"^r(\d)+$" ) . unwrap ( ) ) ;
177178
179+ /// Given a relation desc and a column list, checks that:
180+ /// - the column list is a prefix of the desc;
181+ /// - all the listed columns are types that have meaningful Persist-level ordering.
182+ fn check_partition_by ( desc : & RelationDesc , partition_by : & [ ColumnName ] ) -> Result < ( ) , PlanError > {
183+ for ( idx, ( ( desc_name, desc_type) , partition_name) ) in
184+ desc. iter ( ) . zip ( partition_by. iter ( ) ) . enumerate ( )
185+ {
186+ if desc_name != partition_name {
187+ sql_bail ! ( "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})" ) ;
188+ }
189+ if !preserves_order ( & desc_type. scalar_type ) {
190+ sql_bail ! ( "PARTITION BY column {partition_name} has unsupported type" ) ;
191+ }
192+ }
193+ Ok ( ( ) )
194+ }
195+
178196pub fn describe_create_database (
179197 _: & StatementContext ,
180198 _: CreateDatabaseStatement ,
@@ -2588,6 +2606,14 @@ pub fn plan_create_materialized_view(
25882606 seen : _,
25892607 } : MaterializedViewOptionExtracted = stmt. with_options . try_into ( ) ?;
25902608
2609+ if let Some ( partition_by) = partition_by {
2610+ let partition_by: Vec < _ > = partition_by
2611+ . into_iter ( )
2612+ . map ( normalize:: column_name)
2613+ . collect ( ) ;
2614+ check_partition_by ( & desc, & partition_by) ?;
2615+ }
2616+
25912617 let refresh_schedule = {
25922618 let mut refresh_schedule = RefreshSchedule :: default ( ) ;
25932619 let mut on_commits_seen = 0 ;
0 commit comments