Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/mysql-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,33 @@ pub enum MySqlError {
MySql(#[from] mysql_async::Error),
}

/// Quotes MySQL identifiers. [See MySQL quote_identifier()](https://github.com/mysql/mysql-sys/blob/master/functions/quote_identifier.sql)
pub fn quote_identifier(identifier: &str) -> String {
let mut escaped = identifier.replace("`", "``");
escaped.insert(0, '`');
escaped.push('`');
escaped
}

// NOTE: this error was renamed between MySQL 5.7 and 8.0
// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html#error_er_source_fatal_error_reading_binlog
// https://dev.mysql.com/doc/mysql-errors/5.7/en/server-error-reference.html#error_er_master_fatal_error_reading_binlog
pub const ER_SOURCE_FATAL_ERROR_READING_BINLOG_CODE: u16 = 1236;

// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html#error_er_no_such_table
pub const ER_NO_SUCH_TABLE: u16 = 1146;

#[cfg(test)]
mod tests {

use super::quote_identifier;
#[mz_ore::test]
fn test_identifier_quoting() {
let expected = vec!["`a`", "`naughty``sql`", "```;naughty;sql;```"];
let input = ["a", "naughty`sql", "`;naughty;sql;`"]
.iter()
.map(|raw_str| quote_identifier(raw_str))
.collect::<Vec<_>>();
assert_eq!(expected, input);
}
}
8 changes: 7 additions & 1 deletion src/storage/src/source/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use std::rc::Rc;

use differential_dataflow::AsCollection;
use itertools::Itertools;
use mz_mysql_util::quote_identifier;
use mz_ore::cast::CastFrom;
use mz_repr::Diff;
use mz_repr::GlobalId;
Expand Down Expand Up @@ -338,7 +339,12 @@ impl MySqlTableName {

impl fmt::Display for MySqlTableName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "`{}`.`{}`", self.0, self.1)
write!(
f,
"{}.{}",
quote_identifier(&self.0),
quote_identifier(&self.1)
)
}
}

Expand Down
80 changes: 76 additions & 4 deletions src/storage/src/source/mysql/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,12 @@ use std::sync::Arc;

use differential_dataflow::AsCollection;
use futures::TryStreamExt;
use itertools::Itertools;
use mysql_async::prelude::Queryable;
use mysql_async::{IsolationLevel, Row as MySqlRow, TxOpts};
use mz_mysql_util::{pack_mysql_row, query_sys_var, MySqlError, ER_NO_SUCH_TABLE};
use mz_mysql_util::{
pack_mysql_row, query_sys_var, quote_identifier, MySqlError, ER_NO_SUCH_TABLE,
};
use mz_ore::cast::CastFrom;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
Expand Down Expand Up @@ -404,9 +407,8 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(

let mut snapshot_staged = 0;
for (table, outputs) in &reader_snapshot_table_info {
let query = format!("SELECT * FROM {}", table);
trace!(%id, "timely-{worker_id} reading snapshot from \
table '{table}'");
let query = build_snapshot_query(outputs);
trace!(%id, "timely-{worker_id} reading snapshot query='{}'", query);
let mut results = tx.exec_stream(query, ()).await?;
let mut count = 0;
while let Some(row) = results.try_next().await? {
Expand Down Expand Up @@ -517,6 +519,31 @@ where
Ok(total)
}

/// Builds the SQL query to be used for creating the snapshot using the first entry in outputs.
///
/// Expect `outputs` to contain entries for a single table, and to have at least 1 entry.
/// Expect that each MySqlTableDesc entry contains all columns described in information_schema.columns.
#[must_use]
fn build_snapshot_query(outputs: &[SourceOutputInfo]) -> String {
let info = outputs.first().expect("MySQL table info");
if outputs.len() > 1
&& !outputs
.iter()
.skip(1)
.map(|other_info| &other_info.desc.columns)
.all(|other_columns| *other_columns == info.desc.columns)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small nit: in idiomatic Rust, at least in the MZ codebase, it usually better to avoid long chains of iterator transformations with either a short chain if possible or just a imperative for loop. This functional style becomes harder to read after a certain point.

So I would write this either as:

assert!(outputs.iter().all(|o| info.desc.columns == o.desc.columns), "Mismatch...");

or:

for output in outputs[1..] {
   assert!(info.desc.columns == output.desc.columns), "Mismatch...");
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, thank you!

{
panic!("Mismatch in table descriptions for {}", info.table_name);
}
let columns = info
.desc
.columns
.iter()
.map(|col| quote_identifier(&col.name))
.join(", ");
format!("SELECT {} FROM {}", columns, info.table_name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(sorry for the drive-by review) Do we have to worry about SQL injection here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmm.. good point, I made an unfortunate assumption that the existing format! was good enough.

I'll update the snapshot command and collect_table_statistics to use prepared statements.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did wonder about that during review and concluded that we don't have to be worried as to perform an injection you have to somehow create weirdly named columns, which means you already admin level have access to the database. My understanding is that we can't use prepared statements here since with those you can put placeholders for values but here we want to put placeholders for column names. If it's possible and easy to do we should maybe do it

Copy link
Contributor Author

@martykulma martykulma Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have done this kind of thing in PG by building dynamic statements using virtual sets for the identifiers, but I'm not as familiar with MySQL, and this is proving challenging to try to implement via prepared statements.

mysql_async client doesn't provide a quote_identifier, but I can write one based on the stored procedure implementation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

}

#[derive(Default)]
struct TableStatistics {
count_latency: f64,
Expand All @@ -541,3 +568,48 @@ where

Ok(stats)
}

#[cfg(test)]
mod tests {
use super::*;
use mz_mysql_util::{MySqlColumnDesc, MySqlTableDesc};
use timely::progress::Antichain;

#[mz_ore::test]
fn snapshot_query_duplicate_table() {
let schema_name = "myschema".to_string();
let table_name = "mytable".to_string();
let table = MySqlTableName(schema_name.clone(), table_name.clone());
let columns = ["c1", "c2", "c3"]
.iter()
.map(|col| MySqlColumnDesc {
name: col.to_string(),
column_type: None,
meta: None,
})
.collect::<Vec<_>>();
let desc = MySqlTableDesc {
schema_name: schema_name.clone(),
name: table_name.clone(),
columns,
keys: BTreeSet::default(),
};
let info = SourceOutputInfo {
output_index: 1, // ignored
table_name: table.clone(),
desc,
text_columns: vec![],
exclude_columns: vec![],
initial_gtid_set: Antichain::default(),
resume_upper: Antichain::default(),
};
let query = build_snapshot_query(&[info.clone(), info]);
assert_eq!(
format!(
"SELECT `c1`, `c2`, `c3` FROM `{}`.`{}`",
&schema_name, &table_name
),
query
);
}
}
41 changes: 26 additions & 15 deletions test/mysql-cdc/invisible-columns.td
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,31 @@ DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;

CREATE TABLE t1 (f1 INT, f2 INT INVISIBLE, f3 INT INVISIBLE);
INSERT INTO t1 (f1, f2, f3) VALUES (10, 20, 30);
SET sql_generate_invisible_primary_key=ON;
CREATE TABLE t1 (f1 INT, f2 INT INVISIBLE, f3 DATE INVISIBLE, f4 INT INVISIBLE);
INSERT INTO t1 (f1, f2, f3, f4) VALUES (10, 20, '2025-01-28', 6);
INSERT INTO t1 VALUES (11);

# TODO: database-issues#7782 (invisible columns not supported)
# > CREATE SOURCE mz_source FROM MYSQL CONNECTION mysql_conn;
# > CREATE TABLE t1 FROM SOURCE mz_source (REFERENCE public.t1);
#
# > SELECT * FROM t1;
# 10
# 11
#
# $ mysql-execute name=mysql
# ALTER TABLE t1 ALTER COLUMN f2 SET VISIBLE;
#
# ! SELECT * FROM t1;
# contains:incompatible schema change
> CREATE SOURCE mz_source FROM MYSQL CONNECTION mysql_conn;
> CREATE TABLE t1 FROM SOURCE mz_source (REFERENCE public.t1) WITH (TEXT COLUMNS (f3), EXCLUDE COLUMNS (f4));

> SELECT * FROM t1;
1 10 20 "2025-01-28"
2 11 <null> <null>

$ mysql-execute name=mysql
ALTER TABLE t1 ALTER COLUMN f2 SET VISIBLE;
INSERT INTO t1 (f1, f2, f3, f4) VALUES (111, 222, '2025-01-29', 6);
INSERT INTO t1 (f1, f2) VALUES (1111, 2222);

> SELECT * from t1;
1 10 20 "2025-01-28"
2 11 <null> <null>
3 111 222 "2025-01-29"
4 1111 2222 <null>

$ mysql-execute name=mysql
ALTER TABLE t1 DROP COLUMN f2;

! SELECT * FROM t1;
contains:incompatible schema change
6 changes: 4 additions & 2 deletions test/mysql-cdc/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import threading
from textwrap import dedent

from materialize import buildkite
from materialize import MZ_ROOT, buildkite
from materialize.mysql_util import (
retrieve_invalid_ssl_context_for_mysql,
retrieve_ssl_context_for_mysql,
Expand Down Expand Up @@ -99,7 +99,9 @@ def workflow_cdc(c: Composition, parser: WorkflowArgumentParser) -> None:

matching_files = []
for filter in args.filter:
matching_files.extend(glob.glob(filter, root_dir="test/mysql-cdc"))
matching_files.extend(
glob.glob(filter, root_dir=MZ_ROOT / "test" / "mysql-cdc")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this fix! I've been bitten by it!

)
sharded_files: list[str] = sorted(
buildkite.shard_list(matching_files, lambda file: file)
)
Expand Down
34 changes: 34 additions & 0 deletions test/mysql-cdc/quoted-identifier.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

#
# identifiers that need proper quoting
#

> CREATE SECRET mysqlpass AS '${arg.mysql-root-password}'
> CREATE CONNECTION mysql_conn TO MYSQL (
HOST mysql,
USER root,
PASSWORD SECRET mysqlpass
)

$ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password}

$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
CREATE TABLE `t``1` (`f``1` INT)
INSERT INTO `t``1` VALUES(1);

> CREATE SOURCE mz_source FROM MYSQL CONNECTION mysql_conn;
> CREATE TABLE t1 FROM SOURCE mz_source (REFERENCE public."t`1");

> SELECT * FROM t1;
1