@@ -184,19 +184,16 @@ def nan_safe_dtype(dtype, value):
184
184
for csv_name in self .key_columns :
185
185
dataframe .loc [:, csv_name ] = dataframe [csv_name ].map (self .columns_and_types [csv_name ].dtype )
186
186
187
- num_columns = 2 + len (dataframe_columns_and_types ) + len (self .additional_fields )
188
- value_placeholders = ', ' .join (['%s' ] * num_columns )
189
187
col_names = [f'`{ i .sql_name } `' for i in dataframe_columns_and_types + self .additional_fields ]
190
- columns = ', ' .join (col_names )
191
- updates = ', ' .join (f'{ c } =new_values.{ c } ' for c in col_names )
192
- # NOTE: list in `updates` presumes `publication_col_name` is part of the unique key and thus not needed in UPDATE
193
- sql = f'INSERT INTO `{ self .table_name } ` (`id`, `{ self .publication_col_name } `, { columns } ) ' \
194
- f'VALUES ({ value_placeholders } ) AS new_values ' \
195
- f'ON DUPLICATE KEY UPDATE { updates } '
188
+ value_placeholders = ', ' .join (['%s' ] * (2 + len (col_names ))) # extra 2 for `id` and `self.publication_col_name` cols
189
+ columnstring = ', ' .join (col_names )
190
+ sql = f'REPLACE INTO `{ self .table_name } ` (`id`, `{ self .publication_col_name } `, { columnstring } ) VALUES ({ value_placeholders } )'
196
191
id_and_publication_date = (0 , publication_date )
192
+ num_values = len (dataframe .index )
197
193
if logger :
198
- logger .info ('updating values' , count = len ( dataframe . index ) )
194
+ logger .info ('updating values' , count = num_values )
199
195
n = 0
196
+ rows_affected = 0
200
197
many_values = []
201
198
with self .new_cursor () as cursor :
202
199
for index , row in dataframe .iterrows ():
@@ -212,6 +209,7 @@ def nan_safe_dtype(dtype, value):
212
209
if n % 5_000 == 0 :
213
210
try :
214
211
cursor .executemany (sql , many_values )
212
+ rows_affected += cursor .rowcount
215
213
many_values = []
216
214
except Exception as e :
217
215
if logger :
@@ -220,6 +218,11 @@ def nan_safe_dtype(dtype, value):
220
218
# insert final batch
221
219
if many_values :
222
220
cursor .executemany (sql , many_values )
221
+ rows_affected += cursor .rowcount
222
+ if logger :
223
+ # NOTE: REPLACE INTO marks 2 rows affected for a "replace" (one for a delete and one for a re-insert)
224
+ # which allows us to count rows which were updated
225
+ logger .info ('rows affected' , total = rows_affected , updated = rows_affected - num_values )
223
226
224
227
# deal with non/seldomly updated columns used like a fk table (if this database needs it)
225
228
if hasattr (self , 'AGGREGATE_KEY_COLS' ):
0 commit comments