diff --git a/src/repr/src/lib.rs b/src/repr/src/lib.rs index aed31792baebd..4be5579eefe79 100644 --- a/src/repr/src/lib.rs +++ b/src/repr/src/lib.rs @@ -59,7 +59,7 @@ pub use crate::relation::{ VersionedRelationDesc, }; pub use crate::row::collection::{ProtoRowCollection, RowCollection, SortedRowCollectionIter}; -pub use crate::row::encode::{RowColumnarDecoder, RowColumnarEncoder}; +pub use crate::row::encode::{preserves_order, RowColumnarDecoder, RowColumnarEncoder}; pub use crate::row::iter::{IntoRowIterator, RowIterator}; pub use crate::row::{ datum_list_size, datum_size, datums_size, read_datum, row_size, DatumList, DatumMap, diff --git a/src/repr/src/row/encode.rs b/src/repr/src/row/encode.rs index 8b288e711e65c..7d8ccada60af5 100644 --- a/src/repr/src/row/encode.rs +++ b/src/repr/src/row/encode.rs @@ -84,6 +84,52 @@ mod fixed_binary_sizes { } use fixed_binary_sizes::*; +/// Returns true iff the ordering of the "raw" and Persist-encoded versions of this columm would match: +/// ie. `sort(encode(column)) == encode(sort(column))`. This encoding has been designed so that this +/// is true for many types. +pub fn preserves_order(scalar_type: &ScalarType) -> bool { + match scalar_type { + ScalarType::Bool + | ScalarType::Int16 + | ScalarType::Int32 + | ScalarType::Int64 + | ScalarType::UInt16 + | ScalarType::UInt32 + | ScalarType::UInt64 + | ScalarType::Float32 + | ScalarType::Float64 + | ScalarType::Numeric { .. } + | ScalarType::Date + | ScalarType::Time + | ScalarType::Timestamp { .. } + | ScalarType::TimestampTz { .. } + | ScalarType::Interval + | ScalarType::Bytes + | ScalarType::String + | ScalarType::Uuid + | ScalarType::MzTimestamp + | ScalarType::MzAclItem + | ScalarType::AclItem => true, + ScalarType::Record { fields, .. } => fields + .iter() + .all(|(_, field_type)| preserves_order(&field_type.scalar_type)), + ScalarType::PgLegacyChar + | ScalarType::PgLegacyName + | ScalarType::Char { .. } + | ScalarType::VarChar { .. } + | ScalarType::Jsonb + | ScalarType::Array(_) + | ScalarType::List { .. } + | ScalarType::Oid + | ScalarType::Map { .. } + | ScalarType::RegProc + | ScalarType::RegType + | ScalarType::RegClass + | ScalarType::Int2Vector + | ScalarType::Range { .. } => false, + } +} + /// An encoder for a column of [`Datum`]s. #[derive(Debug)] struct DatumEncoder { @@ -2232,6 +2278,30 @@ mod tests { let ord_col = ::arrow::compute::take(&col, &indices, None).expect("takeable"); assert_eq!(row_col.as_ref(), ord_col.as_ref()); + // Check that our order matches the datum-native order when `preserves_order` is true. + let ordered_prefix_len = desc + .iter() + .take_while(|(_, c)| preserves_order(&c.scalar_type)) + .count(); + + let decoder = >::decoder_any(desc, ord_col.as_ref()).unwrap(); + let is_sorted = (0..ord_col.len()) + .map(|i| { + let mut row = Row::default(); + decoder.decode(i, &mut row); + row + }) + .is_sorted_by(|a, b| { + let a_prefix = a.iter().take(ordered_prefix_len); + let b_prefix = b.iter().take(ordered_prefix_len); + a_prefix.cmp(b_prefix).is_le() + }); + assert!( + is_sorted, + "ordering should be consistent on preserves_order columns: {:#?}", + desc.iter().take(ordered_prefix_len).collect_vec() + ); + // Check that our size estimates are consistent. assert_eq!( ord.goodbytes(), diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs index 927f95bffe7b1..79a259ff73283 100644 --- a/src/sql-parser/src/ast/defs/ddl.rs +++ b/src/sql-parser/src/ast/defs/ddl.rs @@ -32,6 +32,7 @@ use crate::ast::{ pub enum MaterializedViewOptionName { /// The `ASSERT NOT NULL [=] ` option. AssertNotNull, + PartitionBy, RetainHistory, /// The `REFRESH [=] ...` option. Refresh, @@ -41,6 +42,7 @@ impl AstDisplay for MaterializedViewOptionName { fn fmt(&self, f: &mut AstFormatter) { match self { MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"), + MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"), MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"), MaterializedViewOptionName::Refresh => f.write_str("REFRESH"), } @@ -56,6 +58,7 @@ impl WithOptionName for MaterializedViewOptionName { fn redact_value(&self) -> bool { match self { MaterializedViewOptionName::AssertNotNull + | MaterializedViewOptionName::PartitionBy | MaterializedViewOptionName::RetainHistory | MaterializedViewOptionName::Refresh => false, } diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 6e7ed82558695..1b6202cc7645e 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -3801,12 +3801,16 @@ impl<'a> Parser<'a> { fn parse_materialized_view_option_name( &mut self, ) -> Result { - let option = self.expect_one_of_keywords(&[ASSERT, RETAIN, REFRESH])?; + let option = self.expect_one_of_keywords(&[ASSERT, PARTITION, RETAIN, REFRESH])?; let name = match option { ASSERT => { self.expect_keywords(&[NOT, NULL])?; MaterializedViewOptionName::AssertNotNull } + PARTITION => { + self.expect_keyword(BY)?; + MaterializedViewOptionName::PartitionBy + } RETAIN => { self.expect_keyword(HISTORY)?; MaterializedViewOptionName::RetainHistory diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index 49a6709635499..a6f4a098edd8f 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -445,6 +445,21 @@ CREATE MATERIALIZED VIEW v IN CLUSTER [1] AS SELECT 1 => CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: Some(Resolved("1")), query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }) +parse-statement +CREATE MATERIALIZED VIEW v (n) WITH (PARTITION BY (n)) AS SELECT 1 +---- +CREATE MATERIALIZED VIEW v (n) WITH (PARTITION BY = (n)) AS SELECT 1 +=> +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [Ident("n")], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [MaterializedViewOption { name: PartitionBy, value: Some(Sequence([UnresolvedItemName(UnresolvedItemName([Ident("n")]))])) }] }) + +parse-statement +CREATE MATERIALIZED VIEW v (n, m) WITH (PARTITION BY (n, m)) AS SELECT (1, 2); +---- +CREATE MATERIALIZED VIEW v (n, m) WITH (PARTITION BY = (n, m)) AS SELECT ROW(1, 2) +=> +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [Ident("n"), Ident("m")], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Row { exprs: [Value(Number("1")), Value(Number("2"))] }, alias: None }], from: [], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [MaterializedViewOption { name: PartitionBy, value: Some(Sequence([UnresolvedItemName(UnresolvedItemName([Ident("n")])), UnresolvedItemName(UnresolvedItemName([Ident("m")]))])) }] }) + + parse-statement CREATE MATERIALIZED VIEW v WITH (REFRESH EVERY '1 day', ASSERT NOT NULL x) AS SELECT * FROM t; ---- diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index 59f6e7105f3bd..af06e6ef3d012 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -39,7 +39,8 @@ use mz_repr::optimize::OptimizerFeatureOverrides; use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule}; use mz_repr::role_id::RoleId; use mz_repr::{ - strconv, ColumnName, ColumnType, GlobalId, RelationDesc, RelationType, ScalarType, Timestamp, + preserves_order, strconv, ColumnName, ColumnType, GlobalId, RelationDesc, RelationType, + ScalarType, Timestamp, }; use mz_sql_parser::ast::display::comma_separated; use mz_sql_parser::ast::{ @@ -159,8 +160,8 @@ use crate::plan::{ WebhookHeaders, WebhookValidation, }; use crate::session::vars::{ - self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_KAFKA_SINK_HEADERS, - ENABLE_KAFKA_SINK_PARTITION_BY, ENABLE_REFRESH_EVERY_MVS, + self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY, + ENABLE_KAFKA_SINK_HEADERS, ENABLE_KAFKA_SINK_PARTITION_BY, ENABLE_REFRESH_EVERY_MVS, }; use crate::{names, parse}; @@ -175,6 +176,23 @@ const MAX_NUM_COLUMNS: usize = 256; const MANAGED_REPLICA_PATTERN: std::sync::LazyLock = std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap()); +/// Given a relation desc and a column list, checks that: +/// - the column list is a prefix of the desc; +/// - all the listed columns are types that have meaningful Persist-level ordering. +fn check_partition_by(desc: &RelationDesc, partition_by: &[ColumnName]) -> Result<(), PlanError> { + for (idx, ((desc_name, desc_type), partition_name)) in + desc.iter().zip(partition_by.iter()).enumerate() + { + if desc_name != partition_name { + sql_bail!("PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"); + } + if !preserves_order(&desc_type.scalar_type) { + sql_bail!("PARTITION BY column {partition_name} has unsupported type"); + } + } + Ok(()) +} + pub fn describe_create_database( _: &StatementContext, _: CreateDatabaseStatement, @@ -2582,11 +2600,21 @@ pub fn plan_create_materialized_view( let MaterializedViewOptionExtracted { assert_not_null, + partition_by, retain_history, refresh, seen: _, }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?; + if let Some(partition_by) = partition_by { + scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?; + let partition_by: Vec<_> = partition_by + .into_iter() + .map(normalize::column_name) + .collect(); + check_partition_by(&desc, &partition_by)?; + } + let refresh_schedule = { let mut refresh_schedule = RefreshSchedule::default(); let mut on_commits_seen = 0; @@ -2812,6 +2840,7 @@ pub fn plan_create_materialized_view( generate_extracted_config!( MaterializedViewOption, (AssertNotNull, Ident, AllowMultiple), + (PartitionBy, Vec), (RetainHistory, OptionalDuration), (Refresh, RefreshOptionValue, AllowMultiple) ); diff --git a/src/sql/src/session/vars/definitions.rs b/src/sql/src/session/vars/definitions.rs index 05eb40a2ec6b7..aabdc5f9150e1 100644 --- a/src/sql/src/session/vars/definitions.rs +++ b/src/sql/src/session/vars/definitions.rs @@ -1866,6 +1866,12 @@ feature_flags!( default: false, enable_for_item_parsing: true, }, + { + name: enable_collection_partition_by, + desc: "PARTITION BY", + default: false, + enable_for_item_parsing: true, + }, { name: enable_multi_worker_storage_persist_sink, desc: "multi-worker storage persist sink", diff --git a/test/testdrive/partition-by.td b/test/testdrive/partition-by.td new file mode 100644 index 0000000000000..98d73b975b6a0 --- /dev/null +++ b/test/testdrive/partition-by.td @@ -0,0 +1,35 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Tests for the new PARTITION BY syntax for persisted collections. + +# First, check that the flag is disabled by default. + +! CREATE MATERIALIZED VIEW integers (n) WITH (PARTITION BY (n)) AS VALUES (3), (2), (1); +contains:PARTITION BY + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET enable_collection_partition_by = true + +> CREATE MATERIALIZED VIEW integers (n) WITH (PARTITION BY (n)) AS VALUES (3), (2), (1); + +> CREATE MATERIALIZED VIEW integers_strings (n, m) WITH (PARTITION BY (n, m)) + AS VALUES (3, 'three'), (2, 'two'), (1, 'one'); + +! CREATE MATERIALIZED VIEW out_of_order (n, m) WITH (PARTITION BY (m, n)) + AS VALUES (3, 'three'), (2, 'two'), (1, 'one'); +contains:PARTITION BY columns should be a prefix + +! CREATE MATERIALIZED VIEW out_of_order (n, m) WITH (PARTITION BY (m)) + AS VALUES (3, 'three'), (2, 'two'), (1, 'one'); +contains:PARTITION BY columns should be a prefix + +! CREATE MATERIALIZED VIEW unsupported_type (n, m) WITH (PARTITION BY (n, m)) + AS VALUES (3, '[3]'::json), (2, '[2]'::json), (1, '[1]'::json); +contains:PARTITION BY column m has unsupported type