-
Notifications
You must be signed in to change notification settings - Fork 494
mysql: add support for invisible columns #31239
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
All contributors have signed the CLA. |
|
I have read the Contributor License Agreement (CLA) and I hereby sign the CLA. |
f1ec013 to
bb7369e
Compare
| .iter() | ||
| .map(|col_info| format!("`{}`", col_info.name)) | ||
| }) | ||
| .join(", "); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, I think this might be doing a different thing than what we need. Here you iterate over all outputs and concatenate the name of the columns from each one of them. So if I understand correctly, if the same table is ingested twice in Materialize we will end up with a query like SELECT col1, col2, col1, col2 and transfer twice the amount of data. Am I reading this right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
excellent point - I hadn't considered that case.
Now that I'm looking back at it, this may also run into time-of-check/time-of-use issues. There's no guarantee that the table definition is the same at the time we issue the select. Which means we need to identify the columns in the transaction that we run the query.. I'll make that change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no guarantee that the table definition is the same at the time we issue the select.
That's true, but also fine, because Materialize expects a certain schema that has already been committed in the catalog. So we should explicitly request the columns that we expect to be there, and if they aren't we'll get an error from the MySQL client and report it.
Even if we learned the new schema in the dataflow there isn't anything else we can do at that point. If the user has changed the upstream schema after having created the ingestion in Materialize then the only way out is to drop and recreate the tables in Materialize.
I think what we want to do here is collect the set of columns that each output needs (using something like a BTreeSet) and then make sure that we decode and project the columns in the right order downstream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 When we build MySqlTableDesc.columns, the intention appears to be that all columns will be in the struct. For ignored columns, column_type is set to None, but the name is maintained. So I expect it's possible to use any entry from outputs for the column names in the table description. I'll check if we have test coverage for it, if not, I'll add something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've updated the PR to just use the first entry rather than a BTreeSet based on my note above. Let me know if that sits well with you, thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good to me! Let's also add an assertion that all the outputs contain the same column names in the same order to ensure a panic if this assumption ever changes
| for filter in args.filter: | ||
| matching_files.extend(glob.glob(filter, root_dir="test/mysql-cdc")) | ||
| matching_files.extend( | ||
| glob.glob(filter, root_dir=MZ_ROOT / "test" / "mysql-cdc") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this fix! I've been bitten by it!
| .iter() | ||
| .map(|col| format!("`{}`", col.name)) | ||
| .join(", "); | ||
| format!("SELECT {} FROM {}", columns, info.table_name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(sorry for the drive-by review) Do we have to worry about SQL injection here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mmm.. good point, I made an unfortunate assumption that the existing format! was good enough.
I'll update the snapshot command and collect_table_statistics to use prepared statements.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did wonder about that during review and concluded that we don't have to be worried as to perform an injection you have to somehow create weirdly named columns, which means you already admin level have access to the database. My understanding is that we can't use prepared statements here since with those you can put placeholders for values but here we want to put placeholders for column names. If it's possible and easy to do we should maybe do it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have done this kind of thing in PG by building dynamic statements using virtual sets for the identifiers, but I'm not as familiar with MySQL, and this is proving challenging to try to implement via prepared statements.
mysql_async client doesn't provide a quote_identifier, but I can write one based on the stored procedure implementation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice!
petrosagg
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great!
src/mysql-util/src/lib.rs
Outdated
| let input = ["a", "naughty`sql", "`;naughty;sql;`"].iter().map(|raw_str| quote_identifier(&raw_str)).collect::<Vec<_>>(); | ||
| assert_eq!(expected, input); | ||
| } | ||
| } No newline at end of file |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like your editor doesn't add trailing new lines, this might generate spurious diffs. Not blocking for this PR but something to configure
| if outputs.len() > 1 | ||
| && !outputs | ||
| .iter() | ||
| .skip(1) | ||
| .map(|other_info| &other_info.desc.columns) | ||
| .all(|other_columns| *other_columns == info.desc.columns) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
small nit: in idiomatic Rust, at least in the MZ codebase, it usually better to avoid long chains of iterator transformations with either a short chain if possible or just a imperative for loop. This functional style becomes harder to read after a certain point.
So I would write this either as:
assert!(outputs.iter().all(|o| info.desc.columns == o.desc.columns), "Mismatch...");or:
for output in outputs[1..] {
assert!(info.desc.columns == output.desc.columns), "Mismatch...");
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got it, thank you!
| .iter() | ||
| .map(|col| format!("`{}`", col.name)) | ||
| .join(", "); | ||
| format!("SELECT {} FROM {}", columns, info.table_name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice!
Noticed and fixed for one test by Marty in MaterializeInc#31239
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding a test. I have added a platform checks check, nightly run with it: https://buildkite.com/materialize/nightly/builds/11054
e065a5f to
fe6897e
Compare
|
I find it slightly suspicious that the MySQL CDC resumption test timed out twice on this PR: https://buildkite.com/materialize/test/builds/98149#0194d5c5-ceb8-4010-9f7f-a41aa8ce2acf |
hmm.. this PR changes how we query for snapshot from That |
|
hm, https://github.com/MaterializeInc/database-issues/issues/8884 seems a bit in the weeds since it involves a failover scenario so I wouldn't dig into it at the moment. I compared the set of dataflow restarts that happened in this run vs one from main that succeeds and this is the difference: -INFO mz_storage::healthcheck: Broadcasting suspend-and-restart command because of Some((MySql, Stalled { error: "Input/output error: Input/output error: Connection refused (os error 111): Input/output error: Connection refused (os error 111): Connection refused (os error 111)", hint: None, should_halt: true })) after 5s delay
+INFO mz_storage::healthcheck: Broadcasting suspend-and-restart command because of Some((MySql, Stalled { error: "Input/output error: Input/output error: connection closed: Input/output error: connection closed: connection closed", hint: None, should_halt: true })) after 5s delay
+INFO mz_storage::healthcheck: Broadcasting suspend-and-restart command because of Some((MySql, Stalled { error: "Input/output error: Input/output error: connection closed: Input/output error: connection closed: connection closed", hint: None, should_halt: true })) after 5s delay
+INFO mz_storage::healthcheck: Broadcasting suspend-and-restart command because of Some((MySql, Stalled { error: "Input/output error: Input/output error: connection closed: Input/output error: connection closed: connection closed", hint: None, should_halt: true })) after 5s delay(For Marty: these logs are in each buildkite run under the "Artifacts" tab and named "services.log") So it looks like in this run the dataflow found the MySQL service down more times. The run on main takes 24 minutes which is already close to the 30 minute timeout. At the same time these three restarts should take less than 6 minutes to complete so it's not that they are a complete explanation. Given the change is pretty simple I'm inclined to just merge this with the increased timeout without further investigation. |
To support invisible columns in MySQL, explicitly request columns in snapshot select statement. And while I'm here, I corrected the glob root path in mzcompose.py to get the tests to run.
Motivation
Adds support for invisible columns in mysql
Tips for reviewer
Checklist
$T ⇔ Proto$Tmapping (possibly in a backwards-incompatible way), then it is tagged with aT-protolabel.