@@ -90,9 +90,12 @@ use std::sync::Arc;
9090
9191use differential_dataflow:: AsCollection ;
9292use futures:: TryStreamExt ;
93+ use itertools:: Itertools ;
9394use mysql_async:: prelude:: Queryable ;
9495use mysql_async:: { IsolationLevel , Row as MySqlRow , TxOpts } ;
95- use mz_mysql_util:: { pack_mysql_row, query_sys_var, MySqlError , ER_NO_SUCH_TABLE } ;
96+ use mz_mysql_util:: {
97+ pack_mysql_row, query_sys_var, quote_identifier, MySqlError , ER_NO_SUCH_TABLE ,
98+ } ;
9699use mz_ore:: cast:: CastFrom ;
97100use mz_ore:: future:: InTask ;
98101use mz_ore:: iter:: IteratorExt ;
@@ -404,9 +407,8 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
404407
405408 let mut snapshot_staged = 0 ;
406409 for ( table, outputs) in & reader_snapshot_table_info {
407- let query = format ! ( "SELECT * FROM {}" , table) ;
408- trace ! ( %id, "timely-{worker_id} reading snapshot from \
409- table '{table}'") ;
410+ let query = build_snapshot_query ( outputs) ;
411+ trace ! ( %id, "timely-{worker_id} reading snapshot query='{}'" , query) ;
410412 let mut results = tx. exec_stream ( query, ( ) ) . await ?;
411413 let mut count = 0 ;
412414 while let Some ( row) = results. try_next ( ) . await ? {
@@ -517,6 +519,29 @@ where
517519 Ok ( total)
518520}
519521
522+ /// Builds the SQL query to be used for creating the snapshot using the first entry in outputs.
523+ ///
524+ /// Expect `outputs` to contain entries for a single table, and to have at least 1 entry.
525+ /// Expect that each MySqlTableDesc entry contains all columns described in information_schema.columns.
526+ #[ must_use]
527+ fn build_snapshot_query ( outputs : & [ SourceOutputInfo ] ) -> String {
528+ let info = outputs. first ( ) . expect ( "MySQL table info" ) ;
529+ for output in & outputs[ 1 ..] {
530+ assert ! (
531+ info. desc. columns == output. desc. columns,
532+ "Mismatch in table descriptions for {}" ,
533+ info. table_name
534+ ) ;
535+ }
536+ let columns = info
537+ . desc
538+ . columns
539+ . iter ( )
540+ . map ( |col| quote_identifier ( & col. name ) )
541+ . join ( ", " ) ;
542+ format ! ( "SELECT {} FROM {}" , columns, info. table_name)
543+ }
544+
520545#[ derive( Default ) ]
521546struct TableStatistics {
522547 count_latency : f64 ,
@@ -541,3 +566,48 @@ where
541566
542567 Ok ( stats)
543568}
569+
570+ #[ cfg( test) ]
571+ mod tests {
572+ use super :: * ;
573+ use mz_mysql_util:: { MySqlColumnDesc , MySqlTableDesc } ;
574+ use timely:: progress:: Antichain ;
575+
576+ #[ mz_ore:: test]
577+ fn snapshot_query_duplicate_table ( ) {
578+ let schema_name = "myschema" . to_string ( ) ;
579+ let table_name = "mytable" . to_string ( ) ;
580+ let table = MySqlTableName ( schema_name. clone ( ) , table_name. clone ( ) ) ;
581+ let columns = [ "c1" , "c2" , "c3" ]
582+ . iter ( )
583+ . map ( |col| MySqlColumnDesc {
584+ name : col. to_string ( ) ,
585+ column_type : None ,
586+ meta : None ,
587+ } )
588+ . collect :: < Vec < _ > > ( ) ;
589+ let desc = MySqlTableDesc {
590+ schema_name : schema_name. clone ( ) ,
591+ name : table_name. clone ( ) ,
592+ columns,
593+ keys : BTreeSet :: default ( ) ,
594+ } ;
595+ let info = SourceOutputInfo {
596+ output_index : 1 , // ignored
597+ table_name : table. clone ( ) ,
598+ desc,
599+ text_columns : vec ! [ ] ,
600+ exclude_columns : vec ! [ ] ,
601+ initial_gtid_set : Antichain :: default ( ) ,
602+ resume_upper : Antichain :: default ( ) ,
603+ } ;
604+ let query = build_snapshot_query ( & [ info. clone ( ) , info] ) ;
605+ assert_eq ! (
606+ format!(
607+ "SELECT `c1`, `c2`, `c3` FROM `{}`.`{}`" ,
608+ & schema_name, & table_name
609+ ) ,
610+ query
611+ ) ;
612+ }
613+ }
0 commit comments