Skip to content

[persist] PARTITION BY for Persist-backed collections #30355

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/repr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub use crate::relation::{
VersionedRelationDesc,
};
pub use crate::row::collection::{ProtoRowCollection, RowCollection, SortedRowCollectionIter};
pub use crate::row::encode::{RowColumnarDecoder, RowColumnarEncoder};
pub use crate::row::encode::{preserves_order, RowColumnarDecoder, RowColumnarEncoder};
pub use crate::row::iter::{IntoRowIterator, RowIterator};
pub use crate::row::{
datum_list_size, datum_size, datums_size, read_datum, row_size, DatumList, DatumMap,
Expand Down
70 changes: 70 additions & 0 deletions src/repr/src/row/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,52 @@ mod fixed_binary_sizes {
}
use fixed_binary_sizes::*;

/// Returns true iff the ordering of the "raw" and Persist-encoded versions of this columm would match:
/// ie. `sort(encode(column)) == encode(sort(column))`. This encoding has been designed so that this
/// is true for many types.
pub fn preserves_order(scalar_type: &ScalarType) -> bool {
Comment on lines +87 to +90
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way to add tests for this to make sure it's correct? Also, when adding a new type, how do we know whether to return true or false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good callout! I've added some lines to our property test that checks the sorted column is also sorted according to the schema when preserves_order is true.

Also when adding a new type, how do we know whether to return true or false?

I think the idea is that whoever is designing a new encoding for a type should make a reasonable effort to ensure this property is true... so they should be in a good position to know whether they managed it or not. (And hopefully now the test will keep them honest!)

match scalar_type {
ScalarType::Bool
| ScalarType::Int16
| ScalarType::Int32
| ScalarType::Int64
| ScalarType::UInt16
| ScalarType::UInt32
| ScalarType::UInt64
| ScalarType::Float32
| ScalarType::Float64
| ScalarType::Numeric { .. }
| ScalarType::Date
| ScalarType::Time
| ScalarType::Timestamp { .. }
| ScalarType::TimestampTz { .. }
| ScalarType::Interval
| ScalarType::Bytes
| ScalarType::String
| ScalarType::Uuid
| ScalarType::MzTimestamp
| ScalarType::MzAclItem
| ScalarType::AclItem => true,
ScalarType::Record { fields, .. } => fields
.iter()
.all(|(_, field_type)| preserves_order(&field_type.scalar_type)),
ScalarType::PgLegacyChar
| ScalarType::PgLegacyName
| ScalarType::Char { .. }
| ScalarType::VarChar { .. }
| ScalarType::Jsonb
| ScalarType::Array(_)
| ScalarType::List { .. }
| ScalarType::Oid
| ScalarType::Map { .. }
| ScalarType::RegProc
| ScalarType::RegType
| ScalarType::RegClass
| ScalarType::Int2Vector
| ScalarType::Range { .. } => false,
}
}

/// An encoder for a column of [`Datum`]s.
#[derive(Debug)]
struct DatumEncoder {
Expand Down Expand Up @@ -2232,6 +2278,30 @@ mod tests {
let ord_col = ::arrow::compute::take(&col, &indices, None).expect("takeable");
assert_eq!(row_col.as_ref(), ord_col.as_ref());

// Check that our order matches the datum-native order when `preserves_order` is true.
let ordered_prefix_len = desc
.iter()
.take_while(|(_, c)| preserves_order(&c.scalar_type))
.count();

let decoder = <RelationDesc as Schema2<Row>>::decoder_any(desc, ord_col.as_ref()).unwrap();
let is_sorted = (0..ord_col.len())
.map(|i| {
let mut row = Row::default();
decoder.decode(i, &mut row);
row
})
.is_sorted_by(|a, b| {
let a_prefix = a.iter().take(ordered_prefix_len);
let b_prefix = b.iter().take(ordered_prefix_len);
a_prefix.cmp(b_prefix).is_le()
});
assert!(
is_sorted,
"ordering should be consistent on preserves_order columns: {:#?}",
desc.iter().take(ordered_prefix_len).collect_vec()
);

// Check that our size estimates are consistent.
assert_eq!(
ord.goodbytes(),
Expand Down
3 changes: 3 additions & 0 deletions src/sql-parser/src/ast/defs/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::ast::{
pub enum MaterializedViewOptionName {
/// The `ASSERT NOT NULL [=] <ident>` option.
AssertNotNull,
PartitionBy,
RetainHistory,
/// The `REFRESH [=] ...` option.
Refresh,
Expand All @@ -41,6 +42,7 @@ impl AstDisplay for MaterializedViewOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
MaterializedViewOptionName::PartitionBy => f.write_str("PARTITION BY"),
MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
}
Expand All @@ -56,6 +58,7 @@ impl WithOptionName for MaterializedViewOptionName {
fn redact_value(&self) -> bool {
match self {
MaterializedViewOptionName::AssertNotNull
| MaterializedViewOptionName::PartitionBy
| MaterializedViewOptionName::RetainHistory
| MaterializedViewOptionName::Refresh => false,
}
Expand Down
6 changes: 5 additions & 1 deletion src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3801,12 +3801,16 @@ impl<'a> Parser<'a> {
fn parse_materialized_view_option_name(
&mut self,
) -> Result<MaterializedViewOptionName, ParserError> {
let option = self.expect_one_of_keywords(&[ASSERT, RETAIN, REFRESH])?;
let option = self.expect_one_of_keywords(&[ASSERT, PARTITION, RETAIN, REFRESH])?;
let name = match option {
ASSERT => {
self.expect_keywords(&[NOT, NULL])?;
MaterializedViewOptionName::AssertNotNull
}
PARTITION => {
self.expect_keyword(BY)?;
MaterializedViewOptionName::PartitionBy
}
RETAIN => {
self.expect_keyword(HISTORY)?;
MaterializedViewOptionName::RetainHistory
Expand Down
15 changes: 15 additions & 0 deletions src/sql-parser/tests/testdata/ddl
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,21 @@ CREATE MATERIALIZED VIEW v IN CLUSTER [1] AS SELECT 1
=>
CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: Some(Resolved("1")), query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] })

parse-statement
CREATE MATERIALIZED VIEW v (n) WITH (PARTITION BY (n)) AS SELECT 1
----
CREATE MATERIALIZED VIEW v (n) WITH (PARTITION BY = (n)) AS SELECT 1
=>
CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [Ident("n")], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [MaterializedViewOption { name: PartitionBy, value: Some(Sequence([UnresolvedItemName(UnresolvedItemName([Ident("n")]))])) }] })

parse-statement
CREATE MATERIALIZED VIEW v (n, m) WITH (PARTITION BY (n, m)) AS SELECT (1, 2);
----
CREATE MATERIALIZED VIEW v (n, m) WITH (PARTITION BY = (n, m)) AS SELECT ROW(1, 2)
=>
CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [Ident("n"), Ident("m")], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Row { exprs: [Value(Number("1")), Value(Number("2"))] }, alias: None }], from: [], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [MaterializedViewOption { name: PartitionBy, value: Some(Sequence([UnresolvedItemName(UnresolvedItemName([Ident("n")])), UnresolvedItemName(UnresolvedItemName([Ident("m")]))])) }] })


parse-statement
CREATE MATERIALIZED VIEW v WITH (REFRESH EVERY '1 day', ASSERT NOT NULL x) AS SELECT * FROM t;
----
Expand Down
35 changes: 32 additions & 3 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
strconv, ColumnName, ColumnType, GlobalId, RelationDesc, RelationType, ScalarType, Timestamp,
preserves_order, strconv, ColumnName, ColumnType, GlobalId, RelationDesc, RelationType,
ScalarType, Timestamp,
};
use mz_sql_parser::ast::display::comma_separated;
use mz_sql_parser::ast::{
Expand Down Expand Up @@ -159,8 +160,8 @@ use crate::plan::{
WebhookHeaders, WebhookValidation,
};
use crate::session::vars::{
self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_KAFKA_SINK_HEADERS,
ENABLE_KAFKA_SINK_PARTITION_BY, ENABLE_REFRESH_EVERY_MVS,
self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
ENABLE_KAFKA_SINK_HEADERS, ENABLE_KAFKA_SINK_PARTITION_BY, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

Expand All @@ -175,6 +176,23 @@ const MAX_NUM_COLUMNS: usize = 256;
const MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());

/// Given a relation desc and a column list, checks that:
/// - the column list is a prefix of the desc;
/// - all the listed columns are types that have meaningful Persist-level ordering.
fn check_partition_by(desc: &RelationDesc, partition_by: &[ColumnName]) -> Result<(), PlanError> {
    // `zip` below stops at the shorter side, so a column list longer than the
    // relation would otherwise be silently truncated instead of rejected, even
    // though such a list can never be a prefix of the relation's columns.
    let arity = desc.iter().count();
    if partition_by.len() > arity {
        sql_bail!(
            "PARTITION BY columns should be a prefix of the relation's columns (expected at most {arity} columns, got {})",
            partition_by.len()
        );
    }
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc.iter().zip(partition_by.iter()).enumerate()
    {
        if desc_name != partition_name {
            sql_bail!("PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})");
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}

pub fn describe_create_database(
_: &StatementContext,
_: CreateDatabaseStatement,
Expand Down Expand Up @@ -2582,11 +2600,21 @@ pub fn plan_create_materialized_view(

let MaterializedViewOptionExtracted {
assert_not_null,
partition_by,
retain_history,
refresh,
seen: _,
}: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;

if let Some(partition_by) = partition_by {
scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
let partition_by: Vec<_> = partition_by
.into_iter()
.map(normalize::column_name)
.collect();
check_partition_by(&desc, &partition_by)?;
}

let refresh_schedule = {
let mut refresh_schedule = RefreshSchedule::default();
let mut on_commits_seen = 0;
Expand Down Expand Up @@ -2812,6 +2840,7 @@ pub fn plan_create_materialized_view(
generate_extracted_config!(
MaterializedViewOption,
(AssertNotNull, Ident, AllowMultiple),
(PartitionBy, Vec<Ident>),
(RetainHistory, OptionalDuration),
(Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);
Expand Down
6 changes: 6 additions & 0 deletions src/sql/src/session/vars/definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1866,6 +1866,12 @@ feature_flags!(
default: false,
enable_for_item_parsing: true,
},
{
name: enable_collection_partition_by,
desc: "PARTITION BY",
default: false,
enable_for_item_parsing: true,
},
{
name: enable_multi_worker_storage_persist_sink,
desc: "multi-worker storage persist sink",
Expand Down
35 changes: 35 additions & 0 deletions test/testdrive/partition-by.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Tests for the new PARTITION BY syntax for persisted collections.

# First, check that the flag is disabled by default.

! CREATE MATERIALIZED VIEW integers (n) WITH (PARTITION BY (n)) AS VALUES (3), (2), (1);
contains:PARTITION BY

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_collection_partition_by = true

> CREATE MATERIALIZED VIEW integers (n) WITH (PARTITION BY (n)) AS VALUES (3), (2), (1);

> CREATE MATERIALIZED VIEW integers_strings (n, m) WITH (PARTITION BY (n, m))
AS VALUES (3, 'three'), (2, 'two'), (1, 'one');

! CREATE MATERIALIZED VIEW out_of_order (n, m) WITH (PARTITION BY (m, n))
AS VALUES (3, 'three'), (2, 'two'), (1, 'one');
contains:PARTITION BY columns should be a prefix

! CREATE MATERIALIZED VIEW out_of_order (n, m) WITH (PARTITION BY (m))
AS VALUES (3, 'three'), (2, 'two'), (1, 'one');
contains:PARTITION BY columns should be a prefix

! CREATE MATERIALIZED VIEW unsupported_type (n, m) WITH (PARTITION BY (n, m))
AS VALUES (3, '[3]'::json), (2, '[2]'::json), (1, '[1]'::json);
contains:PARTITION BY column m has unsupported type
Loading