-
Notifications
You must be signed in to change notification settings - Fork 67
use REPLACE INTO instead of INSERT INTO...UPDATE in covid_hosp acquisition #1356
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
8536431
e85a994
3ccd57d
a930bab
924b08e
a232e7b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -159,6 +159,26 @@ def insert_metadata(self, publication_date, revision, meta_json, logger=False): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(%s, %s, %s, %s, %s, NOW()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
''', (self.table_name, self.hhs_dataset_id, publication_date, revision, meta_json)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def remove_issues(self, issue_date): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# TODO: this is *VERY* incomplete! SQL statements are never even evaluated! | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# delete from metadata table where issue date matches | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
a = f"DELETE FROM `covid_hosp_meta` WHERE dataset_name='{self.table_name}' AND publication_date='{issue_date}'" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if self.aggregate_key_cols: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# TODO: restrict this to just UNIQUE columns from aggregate keys table? | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# create (empty) `some_temp_table` like `{self.table_name}_key` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
b = f"CREATE TABLE some_temp_table AS SELECT {self.aggregate_key_cols} FROM `{self.table_name}_key` WHERE FALSE" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# save aggregate keys from what we are about to delete | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
c = f"SELECT {self.aggregate_key_cols} INTO some_temp_table FROM `{self.table_name}` WHERE `{self.publication_col_name}`={issue_date} GROUP BY {self.aggregate_key_cols}" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# TODO: combine two SQL queries above into one? | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# delete from main data table where issue matches | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
d = f"DELETE FROM `{self.table_name}` WHERE `{self.publication_col_name}`={issue_date}" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if self.aggregate_key_cols: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# delete from saved aggregate keys where the key still exists | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
e = f"DELETE FROM some_temp_table JOIN `{self.table_name}` USING ({self.aggregate_key_cols})" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# delete from aggregate key table anything left in saved keys (which should be aggregate keys that only existed in the issue we deleted) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
f = f"DELETE FROM `{self.table_name}_key` JOIN some_temp_table USING ({self.aggregate_key_cols})" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
g = "DROP TABLE some_temp_table" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good suggestion, but this is just a work-in-progress for now... as you probably noticed, these are all just strings and the function doesnt actually accomplish anything. im gonna remove the method and save it for later. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def insert_dataset(self, publication_date, dataframe, logger=False): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"""Add a dataset to the database. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -184,19 +204,16 @@ def nan_safe_dtype(dtype, value): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for csv_name in self.key_columns: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
dataframe.loc[:, csv_name] = dataframe[csv_name].map(self.columns_and_types[csv_name].dtype) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
num_columns = 2 + len(dataframe_columns_and_types) + len(self.additional_fields) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
value_placeholders = ', '.join(['%s'] * num_columns) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
col_names = [f'`{i.sql_name}`' for i in dataframe_columns_and_types + self.additional_fields] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
columns = ', '.join(col_names) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
updates = ', '.join(f'{c}=new_values.{c}' for c in col_names) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# NOTE: list in `updates` presumes `publication_col_name` is part of the unique key and thus not needed in UPDATE | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sql = f'INSERT INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columns}) ' \ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
f'VALUES ({value_placeholders}) AS new_values ' \ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
f'ON DUPLICATE KEY UPDATE {updates}' | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
value_placeholders = ', '.join(['%s'] * (2 + len(col_names))) # extra 2 for `id` and `self.publication_col_name` cols | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
columnstring = ', '.join(col_names) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sql = f'REPLACE INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columnstring}) VALUES ({value_placeholders})' | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
id_and_publication_date = (0, publication_date) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
num_values = len(dataframe.index) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if logger: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logger.info('updating values', count=len(dataframe.index)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logger.info('updating values', count=num_values) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
n = 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
rows_affected = 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
many_values = [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
with self.new_cursor() as cursor: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for index, row in dataframe.iterrows(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -212,6 +229,7 @@ def nan_safe_dtype(dtype, value): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if n % 5_000 == 0: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cursor.executemany(sql, many_values) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
rows_affected += cursor.rowcount | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
many_values = [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
except Exception as e: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if logger: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -220,6 +238,11 @@ def nan_safe_dtype(dtype, value): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# insert final batch | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if many_values: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cursor.executemany(sql, many_values) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
rows_affected += cursor.rowcount | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if logger: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# NOTE: REPLACE INTO marks 2 rows affected for a "replace" (one for a delete and one for a re-insert) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# which allows us to count rows which were updated | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logger.info('rows affected', total=rows_affected, updated=rows_affected-num_values) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# deal with non/seldomly updated columns used like a fk table (if this database needs it) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if hasattr(self, 'AGGREGATE_KEY_COLS'): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Uh oh!
There was an error while loading. Please reload this page.