Skip to content

Commit f4194e7

Browse files
authored
Merge pull request #30355 from bkirwi/partition-by
[persist] PARTITION BY for Persist-backed collections
2 parents ad9a093 + 1725762 commit f4194e7

File tree

8 files changed

+167
-5
lines changed

8 files changed

+167
-5
lines changed

src/repr/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub use crate::relation::{
5959
VersionedRelationDesc,
6060
};
6161
pub use crate::row::collection::{ProtoRowCollection, RowCollection, SortedRowCollectionIter};
62-
pub use crate::row::encode::{RowColumnarDecoder, RowColumnarEncoder};
62+
pub use crate::row::encode::{preserves_order, RowColumnarDecoder, RowColumnarEncoder};
6363
pub use crate::row::iter::{IntoRowIterator, RowIterator};
6464
pub use crate::row::{
6565
datum_list_size, datum_size, datums_size, read_datum, row_size, DatumList, DatumMap,

src/repr/src/row/encode.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,52 @@ mod fixed_binary_sizes {
8484
}
8585
use fixed_binary_sizes::*;
8686

87+
/// Returns true iff the ordering of the "raw" and Persist-encoded versions of this columm would match:
88+
/// ie. `sort(encode(column)) == encode(sort(column))`. This encoding has been designed so that this
89+
/// is true for many types.
90+
pub fn preserves_order(scalar_type: &ScalarType) -> bool {
91+
match scalar_type {
92+
ScalarType::Bool
93+
| ScalarType::Int16
94+
| ScalarType::Int32
95+
| ScalarType::Int64
96+
| ScalarType::UInt16
97+
| ScalarType::UInt32
98+
| ScalarType::UInt64
99+
| ScalarType::Float32
100+
| ScalarType::Float64
101+
| ScalarType::Numeric { .. }
102+
| ScalarType::Date
103+
| ScalarType::Time
104+
| ScalarType::Timestamp { .. }
105+
| ScalarType::TimestampTz { .. }
106+
| ScalarType::Interval
107+
| ScalarType::Bytes
108+
| ScalarType::String
109+
| ScalarType::Uuid
110+
| ScalarType::MzTimestamp
111+
| ScalarType::MzAclItem
112+
| ScalarType::AclItem => true,
113+
ScalarType::Record { fields, .. } => fields
114+
.iter()
115+
.all(|(_, field_type)| preserves_order(&field_type.scalar_type)),
116+
ScalarType::PgLegacyChar
117+
| ScalarType::PgLegacyName
118+
| ScalarType::Char { .. }
119+
| ScalarType::VarChar { .. }
120+
| ScalarType::Jsonb
121+
| ScalarType::Array(_)
122+
| ScalarType::List { .. }
123+
| ScalarType::Oid
124+
| ScalarType::Map { .. }
125+
| ScalarType::RegProc
126+
| ScalarType::RegType
127+
| ScalarType::RegClass
128+
| ScalarType::Int2Vector
129+
| ScalarType::Range { .. } => false,
130+
}
131+
}
132+
87133
/// An encoder for a column of [`Datum`]s.
88134
#[derive(Debug)]
89135
struct DatumEncoder {
@@ -2232,6 +2278,30 @@ mod tests {
22322278
let ord_col = ::arrow::compute::take(&col, &indices, None).expect("takeable");
22332279
assert_eq!(row_col.as_ref(), ord_col.as_ref());
22342280

2281+
// Check that our order matches the datum-native order when `preserves_order` is true.
2282+
let ordered_prefix_len = desc
2283+
.iter()
2284+
.take_while(|(_, c)| preserves_order(&c.scalar_type))
2285+
.count();
2286+
2287+
let decoder = <RelationDesc as Schema2<Row>>::decoder_any(desc, ord_col.as_ref()).unwrap();
2288+
let is_sorted = (0..ord_col.len())
2289+
.map(|i| {
2290+
let mut row = Row::default();
2291+
decoder.decode(i, &mut row);
2292+
row
2293+
})
2294+
.is_sorted_by(|a, b| {
2295+
let a_prefix = a.iter().take(ordered_prefix_len);
2296+
let b_prefix = b.iter().take(ordered_prefix_len);
2297+
a_prefix.cmp(b_prefix).is_le()
2298+
});
2299+
assert!(
2300+
is_sorted,
2301+
"ordering should be consistent on preserves_order columns: {:#?}",
2302+
desc.iter().take(ordered_prefix_len).collect_vec()
2303+
);
2304+
22352305
// Check that our size estimates are consistent.
22362306
assert_eq!(
22372307
ord.goodbytes(),

src/sql-parser/src/ast/defs/ddl.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use crate::ast::{
3232
pub enum MaterializedViewOptionName {
3333
/// The `ASSERT NOT NULL [=] <ident>` option.
3434
AssertNotNull,
35+
PartitionBy,
3536
RetainHistory,
3637
/// The `REFRESH [=] ...` option.
3738
Refresh,
@@ -41,6 +42,7 @@ impl AstDisplay for MaterializedViewOptionName {
4142
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
4243
match self {
4344
MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
45+
MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
4446
MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
4547
MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
4648
}
@@ -56,6 +58,7 @@ impl WithOptionName for MaterializedViewOptionName {
5658
fn redact_value(&self) -> bool {
5759
match self {
5860
MaterializedViewOptionName::AssertNotNull
61+
| MaterializedViewOptionName::PartitionBy
5962
| MaterializedViewOptionName::RetainHistory
6063
| MaterializedViewOptionName::Refresh => false,
6164
}

src/sql-parser/src/parser.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3801,12 +3801,16 @@ impl<'a> Parser<'a> {
38013801
fn parse_materialized_view_option_name(
38023802
&mut self,
38033803
) -> Result<MaterializedViewOptionName, ParserError> {
3804-
let option = self.expect_one_of_keywords(&[ASSERT, RETAIN, REFRESH])?;
3804+
let option = self.expect_one_of_keywords(&[ASSERT, PARTITION, RETAIN, REFRESH])?;
38053805
let name = match option {
38063806
ASSERT => {
38073807
self.expect_keywords(&[NOT, NULL])?;
38083808
MaterializedViewOptionName::AssertNotNull
38093809
}
3810+
PARTITION => {
3811+
self.expect_keyword(BY)?;
3812+
MaterializedViewOptionName::PartitionBy
3813+
}
38103814
RETAIN => {
38113815
self.expect_keyword(HISTORY)?;
38123816
MaterializedViewOptionName::RetainHistory

src/sql-parser/tests/testdata/ddl

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,21 @@ CREATE MATERIALIZED VIEW v IN CLUSTER [1] AS SELECT 1
445445
=>
446446
CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: Some(Resolved("1")), query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] })
447447

448+
parse-statement
449+
CREATE MATERIALIZED VIEW v (n) WITH (PARTITION BY (n)) AS SELECT 1
450+
----
451+
CREATE MATERIALIZED VIEW v (n) WITH (PARTITION BY = (n)) AS SELECT 1
452+
=>
453+
CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [Ident("n")], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [MaterializedViewOption { name: PartitionBy, value: Some(Sequence([UnresolvedItemName(UnresolvedItemName([Ident("n")]))])) }] })
454+
455+
parse-statement
456+
CREATE MATERIALIZED VIEW v (n, m) WITH (PARTITION BY (n, m)) AS SELECT (1, 2);
457+
----
458+
CREATE MATERIALIZED VIEW v (n, m) WITH (PARTITION BY = (n, m)) AS SELECT ROW(1, 2)
459+
=>
460+
CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [Ident("n"), Ident("m")], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Row { exprs: [Value(Number("1")), Value(Number("2"))] }, alias: None }], from: [], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [MaterializedViewOption { name: PartitionBy, value: Some(Sequence([UnresolvedItemName(UnresolvedItemName([Ident("n")])), UnresolvedItemName(UnresolvedItemName([Ident("m")]))])) }] })
461+
462+
448463
parse-statement
449464
CREATE MATERIALIZED VIEW v WITH (REFRESH EVERY '1 day', ASSERT NOT NULL x) AS SELECT * FROM t;
450465
----

src/sql/src/plan/statement/ddl.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ use mz_repr::optimize::OptimizerFeatureOverrides;
3939
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
4040
use mz_repr::role_id::RoleId;
4141
use mz_repr::{
42-
strconv, ColumnName, ColumnType, GlobalId, RelationDesc, RelationType, ScalarType, Timestamp,
42+
preserves_order, strconv, ColumnName, ColumnType, GlobalId, RelationDesc, RelationType,
43+
ScalarType, Timestamp,
4344
};
4445
use mz_sql_parser::ast::display::comma_separated;
4546
use mz_sql_parser::ast::{
@@ -159,8 +160,8 @@ use crate::plan::{
159160
WebhookHeaders, WebhookValidation,
160161
};
161162
use crate::session::vars::{
162-
self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_KAFKA_SINK_HEADERS,
163-
ENABLE_KAFKA_SINK_PARTITION_BY, ENABLE_REFRESH_EVERY_MVS,
163+
self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
164+
ENABLE_KAFKA_SINK_HEADERS, ENABLE_KAFKA_SINK_PARTITION_BY, ENABLE_REFRESH_EVERY_MVS,
164165
};
165166
use crate::{names, parse};
166167

@@ -175,6 +176,23 @@ const MAX_NUM_COLUMNS: usize = 256;
175176
const MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
176177
std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
177178

179+
/// Given a relation desc and a column list, checks that:
180+
/// - the column list is a prefix of the desc;
181+
/// - all the listed columns are types that have meaningful Persist-level ordering.
182+
fn check_partition_by(desc: &RelationDesc, partition_by: &[ColumnName]) -> Result<(), PlanError> {
183+
for (idx, ((desc_name, desc_type), partition_name)) in
184+
desc.iter().zip(partition_by.iter()).enumerate()
185+
{
186+
if desc_name != partition_name {
187+
sql_bail!("PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})");
188+
}
189+
if !preserves_order(&desc_type.scalar_type) {
190+
sql_bail!("PARTITION BY column {partition_name} has unsupported type");
191+
}
192+
}
193+
Ok(())
194+
}
195+
178196
pub fn describe_create_database(
179197
_: &StatementContext,
180198
_: CreateDatabaseStatement,
@@ -2582,11 +2600,21 @@ pub fn plan_create_materialized_view(
25822600

25832601
let MaterializedViewOptionExtracted {
25842602
assert_not_null,
2603+
partition_by,
25852604
retain_history,
25862605
refresh,
25872606
seen: _,
25882607
}: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
25892608

2609+
if let Some(partition_by) = partition_by {
2610+
scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2611+
let partition_by: Vec<_> = partition_by
2612+
.into_iter()
2613+
.map(normalize::column_name)
2614+
.collect();
2615+
check_partition_by(&desc, &partition_by)?;
2616+
}
2617+
25902618
let refresh_schedule = {
25912619
let mut refresh_schedule = RefreshSchedule::default();
25922620
let mut on_commits_seen = 0;
@@ -2812,6 +2840,7 @@ pub fn plan_create_materialized_view(
28122840
generate_extracted_config!(
28132841
MaterializedViewOption,
28142842
(AssertNotNull, Ident, AllowMultiple),
2843+
(PartitionBy, Vec<Ident>),
28152844
(RetainHistory, OptionalDuration),
28162845
(Refresh, RefreshOptionValue<Aug>, AllowMultiple)
28172846
);

src/sql/src/session/vars/definitions.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1866,6 +1866,12 @@ feature_flags!(
18661866
default: false,
18671867
enable_for_item_parsing: true,
18681868
},
1869+
{
1870+
name: enable_collection_partition_by,
1871+
desc: "PARTITION BY",
1872+
default: false,
1873+
enable_for_item_parsing: true,
1874+
},
18691875
{
18701876
name: enable_multi_worker_storage_persist_sink,
18711877
desc: "multi-worker storage persist sink",

test/testdrive/partition-by.td

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Copyright Materialize, Inc. and contributors. All rights reserved.
2+
#
3+
# Use of this software is governed by the Business Source License
4+
# included in the LICENSE file at the root of this repository.
5+
#
6+
# As of the Change Date specified in that file, in accordance with
7+
# the Business Source License, use of this software will be governed
8+
# by the Apache License, Version 2.0.
9+
10+
# Tests for the new PARTITION BY syntax for persisted collections.
11+
12+
# First, check that the flag is disabled by default.
13+
14+
! CREATE MATERIALIZED VIEW integers (n) WITH (PARTITION BY (n)) AS VALUES (3), (2), (1);
15+
contains:PARTITION BY
16+
17+
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
18+
ALTER SYSTEM SET enable_collection_partition_by = true
19+
20+
> CREATE MATERIALIZED VIEW integers (n) WITH (PARTITION BY (n)) AS VALUES (3), (2), (1);
21+
22+
> CREATE MATERIALIZED VIEW integers_strings (n, m) WITH (PARTITION BY (n, m))
23+
AS VALUES (3, 'three'), (2, 'two'), (1, 'one');
24+
25+
! CREATE MATERIALIZED VIEW out_of_order (n, m) WITH (PARTITION BY (m, n))
26+
AS VALUES (3, 'three'), (2, 'two'), (1, 'one');
27+
contains:PARTITION BY columns should be a prefix
28+
29+
! CREATE MATERIALIZED VIEW out_of_order (n, m) WITH (PARTITION BY (m))
30+
AS VALUES (3, 'three'), (2, 'two'), (1, 'one');
31+
contains:PARTITION BY columns should be a prefix
32+
33+
! CREATE MATERIALIZED VIEW unsupported_type (n, m) WITH (PARTITION BY (n, m))
34+
AS VALUES (3, '[3]'::json), (2, '[2]'::json), (1, '[1]'::json);
35+
contains:PARTITION BY column m has unsupported type

0 commit comments

Comments
 (0)