Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mysql: add support for invisible columns #31239

Merged
merged 10 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,7 @@ steps:
- id: checks-0dt-restart-entire-mz-forced-migrations-azurite
label: "Checks 0dt restart of the entire Mz with forced migrations with :azure: blob store"
depends_on: build-aarch64
timeout_in_minutes: 120
timeout_in_minutes: 180
parallelism: 2
agents:
queue: hetzner-aarch64-16cpu-32gb
Expand Down
2 changes: 1 addition & 1 deletion ci/test/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ steps:

- id: mysql-cdc-resumption
label: "MySQL CDC resumption tests"
parallelism: 4
parallelism: 6
depends_on: build-aarch64
timeout_in_minutes: 30
inputs: [test/mysql-cdc-resumption]
Expand Down
101 changes: 101 additions & 0 deletions misc/python/materialize/checks/all_checks/mysql_cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,5 +480,106 @@ def validate(self) -> Testdrive:
)


@externally_idempotent(False)
class MySqlInvisibleColumn(Check):
def _can_run(self, e: Executor) -> bool:
return self.base_version > MzVersion.parse_mz("v0.132.0-dev")

def initialize(self) -> Testdrive:
return Testdrive(
dedent(
f"""
$ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

$ mysql-execute name=mysql
# create the database if it does not exist yet but do not drop it
CREATE DATABASE IF NOT EXISTS public;
USE public;

CREATE USER mysql4 IDENTIFIED BY 'mysql';
GRANT REPLICATION SLAVE ON *.* TO mysql4;
GRANT ALL ON public.* TO mysql4;

DROP TABLE IF EXISTS mysql_invisible_table;

CREATE TABLE mysql_invisible_table (f1 INT, f2 FLOAT INVISIBLE, f3 DATE INVISIBLE, f4 TEXT INVISIBLE);

INSERT INTO mysql_invisible_table (f1, f2, f3, f4) VALUES (1, 0.1, '2025-01-01', 'one');

> CREATE SECRET mysql_invisible_pass AS 'mysql';
> CREATE CONNECTION mysql_invisible_conn TO MYSQL (
HOST 'mysql',
USER mysql4,
PASSWORD SECRET mysql_invisible_pass
)

> CREATE SOURCE mysql_invisible_source
FROM MYSQL CONNECTION mysql_invisible_conn;
> CREATE TABLE mysql_invisible_table FROM SOURCE mysql_invisible_source (REFERENCE public.mysql_invisible_table);

# Return all rows
> CREATE MATERIALIZED VIEW mysql_invisible_view AS
SELECT * FROM mysql_invisible_table
"""
)
)

def manipulate(self) -> list[Testdrive]:
return [
Testdrive(dedent(s))
for s in [
f"""
$ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

$ mysql-execute name=mysql
USE public;
INSERT INTO mysql_invisible_table (f1, f2, f3, f4) VALUES (2, 0.2, '2025-02-02', 'two');
""",
f"""
$ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

$ mysql-execute name=mysql
USE public;
INSERT INTO mysql_invisible_table (f1, f2, f3, f4) VALUES (3, 0.3, '2025-03-03', 'three');
""",
]
]

def validate(self) -> Testdrive:
return Testdrive(
dedent(
f"""
> SELECT * FROM mysql_invisible_table;
1 0.1 2025-01-01 one
2 0.2 2025-02-02 two
3 0.3 2025-03-03 three

$ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

$ mysql-execute name=mysql
USE public;
ALTER TABLE mysql_invisible_table ALTER COLUMN f2 SET VISIBLE;
INSERT INTO mysql_invisible_table (f1, f2, f3, f4) VALUES (4, 0.4, '2025-04-04', 'four');

> SELECT * FROM mysql_invisible_table;
1 0.1 2025-01-01 one
2 0.2 2025-02-02 two
3 0.3 2025-03-03 three
4 0.4 2025-04-04 four

# Rollback the last INSERTs so that validate() can be called multiple times
$ mysql-execute name=mysql
DELETE FROM mysql_invisible_table WHERE f1 = 4;
ALTER TABLE mysql_invisible_table ALTER COLUMN f2 SET INVISIBLE;

> SELECT * FROM mysql_invisible_table;
1 0.1 2025-01-01 one
2 0.2 2025-02-02 two
3 0.3 2025-03-03 three
"""
)
)


def remove_target_cluster_from_explain(sql: str) -> str:
return re.sub(r"\n\s*Target cluster: \w+\n", "", sql)
23 changes: 23 additions & 0 deletions src/mysql-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,33 @@ pub enum MySqlError {
MySql(#[from] mysql_async::Error),
}

/// Quotes MySQL identifiers. [See MySQL quote_identifier()](https://github.com/mysql/mysql-sys/blob/master/functions/quote_identifier.sql)
pub fn quote_identifier(identifier: &str) -> String {
let mut escaped = identifier.replace("`", "``");
escaped.insert(0, '`');
escaped.push('`');
escaped
}

// NOTE: this error was renamed between MySQL 5.7 and 8.0
// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html#error_er_source_fatal_error_reading_binlog
// https://dev.mysql.com/doc/mysql-errors/5.7/en/server-error-reference.html#error_er_master_fatal_error_reading_binlog
pub const ER_SOURCE_FATAL_ERROR_READING_BINLOG_CODE: u16 = 1236;

// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html#error_er_no_such_table
pub const ER_NO_SUCH_TABLE: u16 = 1146;

#[cfg(test)]
mod tests {

use super::quote_identifier;
#[mz_ore::test]
fn test_identifier_quoting() {
let expected = vec!["`a`", "`naughty``sql`", "```;naughty;sql;```"];
let input = ["a", "naughty`sql", "`;naughty;sql;`"]
.iter()
.map(|raw_str| quote_identifier(raw_str))
.collect::<Vec<_>>();
assert_eq!(expected, input);
}
}
8 changes: 7 additions & 1 deletion src/storage/src/source/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use std::rc::Rc;

use differential_dataflow::AsCollection;
use itertools::Itertools;
use mz_mysql_util::quote_identifier;
use mz_ore::cast::CastFrom;
use mz_repr::Diff;
use mz_repr::GlobalId;
Expand Down Expand Up @@ -338,7 +339,12 @@ impl MySqlTableName {

impl fmt::Display for MySqlTableName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "`{}`.`{}`", self.0, self.1)
write!(
f,
"{}.{}",
quote_identifier(&self.0),
quote_identifier(&self.1)
)
}
}

Expand Down
78 changes: 74 additions & 4 deletions src/storage/src/source/mysql/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,12 @@ use std::sync::Arc;

use differential_dataflow::AsCollection;
use futures::TryStreamExt;
use itertools::Itertools;
use mysql_async::prelude::Queryable;
use mysql_async::{IsolationLevel, Row as MySqlRow, TxOpts};
use mz_mysql_util::{pack_mysql_row, query_sys_var, MySqlError, ER_NO_SUCH_TABLE};
use mz_mysql_util::{
pack_mysql_row, query_sys_var, quote_identifier, MySqlError, ER_NO_SUCH_TABLE,
};
use mz_ore::cast::CastFrom;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
Expand Down Expand Up @@ -404,9 +407,8 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(

let mut snapshot_staged = 0;
for (table, outputs) in &reader_snapshot_table_info {
let query = format!("SELECT * FROM {}", table);
trace!(%id, "timely-{worker_id} reading snapshot from \
table '{table}'");
let query = build_snapshot_query(outputs);
trace!(%id, "timely-{worker_id} reading snapshot query='{}'", query);
let mut results = tx.exec_stream(query, ()).await?;
let mut count = 0;
while let Some(row) = results.try_next().await? {
Expand Down Expand Up @@ -517,6 +519,29 @@ where
Ok(total)
}

/// Builds the SQL query to be used for creating the snapshot using the first entry in outputs.
///
/// Expect `outputs` to contain entries for a single table, and to have at least 1 entry.
/// Expect that each MySqlTableDesc entry contains all columns described in information_schema.columns.
#[must_use]
fn build_snapshot_query(outputs: &[SourceOutputInfo]) -> String {
let info = outputs.first().expect("MySQL table info");
for output in &outputs[1..] {
assert!(
info.desc.columns == output.desc.columns,
"Mismatch in table descriptions for {}",
info.table_name
);
}
let columns = info
.desc
.columns
.iter()
.map(|col| quote_identifier(&col.name))
.join(", ");
format!("SELECT {} FROM {}", columns, info.table_name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(sorry for the drive-by review) Do we have to worry about SQL injection here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmm.. good point, I made an unfortunate assumption that the existing format! was good enough.

I'll update the snapshot command and collect_table_statistics to use prepared statements.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did wonder about that during review and concluded that we don't have to be worried as to perform an injection you have to somehow create weirdly named columns, which means you already admin level have access to the database. My understanding is that we can't use prepared statements here since with those you can put placeholders for values but here we want to put placeholders for column names. If it's possible and easy to do we should maybe do it

Copy link
Contributor Author

@martykulma martykulma Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have done this kind of thing in PG by building dynamic statements using virtual sets for the identifiers, but I'm not as familiar with MySQL, and this is proving challenging to try to implement via prepared statements.

mysql_async client doesn't provide a quote_identifier, but I can write one based on the stored procedure implementation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

}

#[derive(Default)]
struct TableStatistics {
count_latency: f64,
Expand All @@ -541,3 +566,48 @@ where

Ok(stats)
}

#[cfg(test)]
mod tests {
use super::*;
use mz_mysql_util::{MySqlColumnDesc, MySqlTableDesc};
use timely::progress::Antichain;

#[mz_ore::test]
fn snapshot_query_duplicate_table() {
let schema_name = "myschema".to_string();
let table_name = "mytable".to_string();
let table = MySqlTableName(schema_name.clone(), table_name.clone());
let columns = ["c1", "c2", "c3"]
.iter()
.map(|col| MySqlColumnDesc {
name: col.to_string(),
column_type: None,
meta: None,
})
.collect::<Vec<_>>();
let desc = MySqlTableDesc {
schema_name: schema_name.clone(),
name: table_name.clone(),
columns,
keys: BTreeSet::default(),
};
let info = SourceOutputInfo {
output_index: 1, // ignored
table_name: table.clone(),
desc,
text_columns: vec![],
exclude_columns: vec![],
initial_gtid_set: Antichain::default(),
resume_upper: Antichain::default(),
};
let query = build_snapshot_query(&[info.clone(), info]);
assert_eq!(
format!(
"SELECT `c1`, `c2`, `c3` FROM `{}`.`{}`",
&schema_name, &table_name
),
query
);
}
}
41 changes: 26 additions & 15 deletions test/mysql-cdc/invisible-columns.td
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,31 @@ DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;

CREATE TABLE t1 (f1 INT, f2 INT INVISIBLE, f3 INT INVISIBLE);
INSERT INTO t1 (f1, f2, f3) VALUES (10, 20, 30);
SET sql_generate_invisible_primary_key=ON;
CREATE TABLE t1 (f1 INT, f2 INT INVISIBLE, f3 DATE INVISIBLE, f4 INT INVISIBLE);
INSERT INTO t1 (f1, f2, f3, f4) VALUES (10, 20, '2025-01-28', 6);
INSERT INTO t1 VALUES (11);

# TODO: database-issues#7782 (invisible columns not supported)
# > CREATE SOURCE mz_source FROM MYSQL CONNECTION mysql_conn;
# > CREATE TABLE t1 FROM SOURCE mz_source (REFERENCE public.t1);
#
# > SELECT * FROM t1;
# 10
# 11
#
# $ mysql-execute name=mysql
# ALTER TABLE t1 ALTER COLUMN f2 SET VISIBLE;
#
# ! SELECT * FROM t1;
# contains:incompatible schema change
> CREATE SOURCE mz_source FROM MYSQL CONNECTION mysql_conn;
> CREATE TABLE t1 FROM SOURCE mz_source (REFERENCE public.t1) WITH (TEXT COLUMNS (f3), EXCLUDE COLUMNS (f4));

> SELECT * FROM t1;
1 10 20 "2025-01-28"
2 11 <null> <null>

$ mysql-execute name=mysql
ALTER TABLE t1 ALTER COLUMN f2 SET VISIBLE;
INSERT INTO t1 (f1, f2, f3, f4) VALUES (111, 222, '2025-01-29', 6);
INSERT INTO t1 (f1, f2) VALUES (1111, 2222);

> SELECT * from t1;
1 10 20 "2025-01-28"
2 11 <null> <null>
3 111 222 "2025-01-29"
4 1111 2222 <null>

$ mysql-execute name=mysql
ALTER TABLE t1 DROP COLUMN f2;

! SELECT * FROM t1;
contains:incompatible schema change
6 changes: 4 additions & 2 deletions test/mysql-cdc/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import threading
from textwrap import dedent

from materialize import buildkite
from materialize import MZ_ROOT, buildkite
from materialize.mysql_util import (
retrieve_invalid_ssl_context_for_mysql,
retrieve_ssl_context_for_mysql,
Expand Down Expand Up @@ -99,7 +99,9 @@ def workflow_cdc(c: Composition, parser: WorkflowArgumentParser) -> None:

matching_files = []
for filter in args.filter:
matching_files.extend(glob.glob(filter, root_dir="test/mysql-cdc"))
matching_files.extend(
glob.glob(filter, root_dir=MZ_ROOT / "test" / "mysql-cdc")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this fix! I've been bitten by it!

)
sharded_files: list[str] = sorted(
buildkite.shard_list(matching_files, lambda file: file)
)
Expand Down
34 changes: 34 additions & 0 deletions test/mysql-cdc/quoted-identifier.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

#
# identifiers that need proper quoting
#

> CREATE SECRET mysqlpass AS '${arg.mysql-root-password}'
> CREATE CONNECTION mysql_conn TO MYSQL (
HOST mysql,
USER root,
PASSWORD SECRET mysqlpass
)

$ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password}

$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
CREATE TABLE `t``1` (`f``1` INT)
INSERT INTO `t``1` VALUES(1);

> CREATE SOURCE mz_source FROM MYSQL CONNECTION mysql_conn;
> CREATE TABLE t1 FROM SOURCE mz_source (REFERENCE public."t`1");

> SELECT * FROM t1;
1