Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 64 additions & 3 deletions src/storage/src/source/mysql/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ use std::sync::Arc;

use differential_dataflow::AsCollection;
use futures::TryStreamExt;
use itertools::Itertools;
use mysql_async::prelude::Queryable;
use mysql_async::{IsolationLevel, Row as MySqlRow, TxOpts};
use mz_mysql_util::{pack_mysql_row, query_sys_var, MySqlError, ER_NO_SUCH_TABLE};
Expand Down Expand Up @@ -404,9 +405,8 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(

let mut snapshot_staged = 0;
for (table, outputs) in &reader_snapshot_table_info {
let query = format!("SELECT * FROM {}", table);
trace!(%id, "timely-{worker_id} reading snapshot from \
table '{table}'");
let query = build_snapshot_query(outputs);
trace!(%id, "timely-{worker_id} reading snapshot query='{}'", query);
let mut results = tx.exec_stream(query, ()).await?;
let mut count = 0;
while let Some(row) = results.try_next().await? {
Expand Down Expand Up @@ -517,6 +517,22 @@ where
Ok(total)
}

/// Builds the SQL query to be used for creating the snapshot using the first entry in outputs.
///
/// The column list and the table name are both taken from the first entry of `outputs`; any
/// further entries are not consulted. Each column name is wrapped in backticks and the quoted
/// names are joined with `", "`.
///
/// Expect `outputs` to contain entries for a single table, and to have at least 1 entry.
/// Expect that each MySqlTableDesc entry contains all columns described in information_schema.columns.
///
/// # Panics
///
/// Panics with the message "MySQL table info" if `outputs` is empty.
///
/// NOTE(review): a backtick occurring inside a column name is not escaped here, so the quoting
/// relies on column names never containing a backtick — confirm this is guaranteed upstream or
/// add identifier escaping.
#[must_use]
fn build_snapshot_query(outputs: &[SourceOutputInfo]) -> String {
// Only the first output is inspected; all outputs are expected to describe the same table.
let info = outputs.first().expect("MySQL table info");
let columns = info
.desc
.columns
.iter()
// Quote each column name as a backtick-delimited identifier.
.map(|col| format!("`{}`", col.name))
.join(", ");
// `table_name` is rendered through its `Display` implementation.
format!("SELECT {} FROM {}", columns, info.table_name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(sorry for the drive-by review) Do we have to worry about SQL injection here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmm.. good point, I made an unfortunate assumption that the existing format! was good enough.

I'll update the snapshot command and collect_table_statistics to use prepared statements.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did wonder about that during review and concluded that we don't have to be worried: to perform an injection you would have to somehow create weirdly named columns, which means you already have admin-level access to the database. My understanding is that we can't use prepared statements here, since those only accept placeholders for values, whereas here we want placeholders for column names. If it's possible and easy to do, we should maybe do it.

Copy link
Contributor Author

@martykulma martykulma Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have done this kind of thing in PG by building dynamic statements using virtual sets for the identifiers, but I'm not as familiar with MySQL, and this is proving challenging to try to implement via prepared statements.

The `mysql_async` client doesn't provide a `quote_identifier`, but I can write one based on the stored procedure implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

}

#[derive(Default)]
struct TableStatistics {
count_latency: f64,
Expand All @@ -541,3 +557,48 @@ where

Ok(stats)
}

#[cfg(test)]
mod tests {
    use super::*;
    use mz_mysql_util::{MySqlColumnDesc, MySqlTableDesc};
    use timely::progress::Antichain;

    /// Passing two outputs that describe the same table must still produce a single
    /// `SELECT` listing each column once, followed by the table name.
    #[mz_ore::test]
    fn snapshot_query_duplicate_table() {
        let schema = String::from("myschema");
        let name = String::from("mytable");

        // One column descriptor per name; type and metadata are left unset.
        let mut columns = Vec::new();
        for col in ["c1", "c2", "c3"].iter() {
            columns.push(MySqlColumnDesc {
                name: col.to_string(),
                column_type: None,
                meta: None,
            });
        }

        let info = SourceOutputInfo {
            output_index: 1, // ignored
            table_name: MySqlTableName(schema.clone(), name.clone()),
            desc: MySqlTableDesc {
                schema_name: schema.clone(),
                name: name.clone(),
                columns,
                keys: BTreeSet::default(),
            },
            text_columns: vec![],
            exclude_columns: vec![],
            initial_gtid_set: Antichain::default(),
            resume_upper: Antichain::default(),
        };

        let expected = format!("SELECT `c1`, `c2`, `c3` FROM `{}`.`{}`", &schema, &name);
        // The same output twice simulates several outputs for one upstream table.
        let outputs = [info.clone(), info];
        let actual = build_snapshot_query(&outputs);
        assert_eq!(expected, actual);
    }
}
41 changes: 26 additions & 15 deletions test/mysql-cdc/invisible-columns.td
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,31 @@ DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;

CREATE TABLE t1 (f1 INT, f2 INT INVISIBLE, f3 INT INVISIBLE);
INSERT INTO t1 (f1, f2, f3) VALUES (10, 20, 30);
SET sql_generate_invisible_primary_key=ON;
CREATE TABLE t1 (f1 INT, f2 INT INVISIBLE, f3 DATE INVISIBLE, f4 INT INVISIBLE);
INSERT INTO t1 (f1, f2, f3, f4) VALUES (10, 20, '2025-01-28', 6);
INSERT INTO t1 VALUES (11);

# TODO: database-issues#7782 (invisible columns not supported)
# > CREATE SOURCE mz_source FROM MYSQL CONNECTION mysql_conn;
# > CREATE TABLE t1 FROM SOURCE mz_source (REFERENCE public.t1);
#
# > SELECT * FROM t1;
# 10
# 11
#
# $ mysql-execute name=mysql
# ALTER TABLE t1 ALTER COLUMN f2 SET VISIBLE;
#
# ! SELECT * FROM t1;
# contains:incompatible schema change
> CREATE SOURCE mz_source FROM MYSQL CONNECTION mysql_conn;
> CREATE TABLE t1 FROM SOURCE mz_source (REFERENCE public.t1) WITH (TEXT COLUMNS (f3), EXCLUDE COLUMNS (f4));

> SELECT * FROM t1;
1 10 20 "2025-01-28"
2 11 <null> <null>

$ mysql-execute name=mysql
ALTER TABLE t1 ALTER COLUMN f2 SET VISIBLE;
INSERT INTO t1 (f1, f2, f3, f4) VALUES (111, 222, '2025-01-29', 6);
INSERT INTO t1 (f1, f2) VALUES (1111, 2222);

> SELECT * from t1;
1 10 20 "2025-01-28"
2 11 <null> <null>
3 111 222 "2025-01-29"
4 1111 2222 <null>

$ mysql-execute name=mysql
ALTER TABLE t1 DROP COLUMN f2;

! SELECT * FROM t1;
contains:incompatible schema change
6 changes: 4 additions & 2 deletions test/mysql-cdc/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import threading
from textwrap import dedent

from materialize import buildkite
from materialize import MZ_ROOT, buildkite
from materialize.mysql_util import (
retrieve_invalid_ssl_context_for_mysql,
retrieve_ssl_context_for_mysql,
Expand Down Expand Up @@ -99,7 +99,9 @@ def workflow_cdc(c: Composition, parser: WorkflowArgumentParser) -> None:

matching_files = []
for filter in args.filter:
matching_files.extend(glob.glob(filter, root_dir="test/mysql-cdc"))
matching_files.extend(
glob.glob(filter, root_dir=MZ_ROOT / "test" / "mysql-cdc")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this fix! I've been bitten by it!

)
sharded_files: list[str] = sorted(
buildkite.shard_list(matching_files, lambda file: file)
)
Expand Down