Set default older_than to tomorrow #1224

Merged: 14 commits, Nov 14, 2023

168 changes: 114 additions & 54 deletions integrations/acquisition/covid_hosp/state_daily/test_scenarios.py
@@ -47,62 +47,122 @@ def setUp(self):
cur.execute('delete from api_user')
cur.execute('insert into api_user(api_key, email) values("key", "email")')

@freeze_time("2021-03-16")
def test_acquire_dataset(self):
"""Acquire a new dataset."""
def get_modified_dataset(self, critical_staffing_shortage_today_yes, reporting_cutoff_start):
"""Get a simplified version of a test dataset.

# make sure the data does not yet exist
with self.subTest(name='no data yet'):
response = Epidata.covid_hosp('MA', Epidata.range(20200101, 20210101))
self.assertEqual(response['result'], -2, response)
Only WY data is modified. The issue date is specified in the metadata file.
"""
df = self.test_utils.load_sample_dataset()
df_new = pd.DataFrame(df[df["state"] == "WY"], columns=df.columns).reset_index(drop=True)
df_new["critical_staffing_shortage_today_yes"] = critical_staffing_shortage_today_yes
df_new["reporting_cutoff_start"] = reporting_cutoff_start
return df_new

# acquire sample data into local database
# mock out network calls to external hosts
with self.subTest(name='first acquisition'), \
patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()) as mock_fetch_meta, \
patch.object(Network, 'fetch_dataset', side_effect=[self.test_utils.load_sample_dataset("dataset0.csv"), # dataset for 3/13
self.test_utils.load_sample_dataset("dataset0.csv"), # first dataset for 3/15
self.test_utils.load_sample_dataset()] # second dataset for 3/15
) as mock_fetch:
acquired = Update.run()
self.assertTrue(acquired)
self.assertEqual(mock_fetch_meta.call_count, 1)

# make sure the data now exists
with self.subTest(name='initial data checks'):
response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101))
self.assertEqual(response['result'], 1)
self.assertEqual(len(response['epidata']), 1)
row = response['epidata'][0]
self.assertEqual(row['state'], 'WY')
self.assertEqual(row['date'], 20201209)
self.assertEqual(row['issue'], 20210315)
self.assertEqual(row['critical_staffing_shortage_today_yes'], 8)
self.assertEqual(row['total_patients_hospitalized_confirmed_influenza_covid_coverage'], 56)
actual = row['inpatient_bed_covid_utilization']
expected = 0.11729857819905214
self.assertAlmostEqual(actual, expected)
self.assertIsNone(row['critical_staffing_shortage_today_no'])

# expect 61 fields per row (63 database columns, except `id` and `record_type`)
self.assertEqual(len(row), 118)

with self.subTest(name='all date batches acquired'):
response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101), issues=20210313)
self.assertEqual(response['result'], 1)

# re-acquisition of the same dataset should be a no-op
with self.subTest(name='second acquisition'), \
patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()) as mock_fetch_meta, \
patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset()) as mock_fetch:
acquired = Update.run()
self.assertFalse(acquired)
def test_acquire_dataset(self):
"""Acquire a new dataset."""

# make sure the data still exists
with self.subTest(name='final data checks'):
response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101))
self.assertEqual(response['result'], 1)
self.assertEqual(len(response['epidata']), 1)
with freeze_time("2021-03-15"):
# make sure the data does not yet exist
with self.subTest(name='no data yet'):
response = Epidata.covid_hosp('MA', Epidata.range(20200101, 20210101))
self.assertEqual(response['result'], -2, response)

# acquire sample data into local database
# mock out network calls to external hosts
# issues: 3/13, 3/15
with self.subTest(name='first acquisition'), \
patch.object(Network, 'fetch_metadata',
return_value=self.test_utils.load_sample_metadata("metadata.csv")) as mock_fetch_meta, \
patch.object(Network, 'fetch_dataset', side_effect=[
self.test_utils.load_sample_dataset(),
self.test_utils.load_sample_dataset()
]) as mock_fetch:
acquired = Update.run()
self.assertTrue(acquired)
self.assertEqual(mock_fetch_meta.call_count, 1)

# make sure the data now exists
with self.subTest(name='initial data checks'):
response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101))
self.assertEqual(response['result'], 1)
self.assertEqual(len(response['epidata']), 1)
row = response['epidata'][0]
self.assertEqual(row['state'], 'WY')
self.assertEqual(row['date'], 20201209)
self.assertEqual(row['issue'], 20210315) # include today's data by default
self.assertEqual(row['critical_staffing_shortage_today_yes'], 8)
self.assertEqual(row['total_patients_hospitalized_confirmed_influenza_covid_coverage'], 56)
self.assertIsNone(row['critical_staffing_shortage_today_no'])

# expect 61 fields per row (63 database columns, except `id` and `record_type`)
self.assertEqual(len(row), 118)

with self.subTest(name='all date batches acquired'):
response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101), issues=20210313)
self.assertEqual(response['result'], 1)

# re-acquisition of the same dataset should be a no-op
# issues: 3/13, 3/15
with self.subTest(name='second acquisition'), \
patch.object(Network, 'fetch_metadata',
return_value=self.test_utils.load_sample_metadata("metadata.csv")) as mock_fetch_meta, \
patch.object(Network, 'fetch_dataset', side_effect=[
self.test_utils.load_sample_dataset(),
self.test_utils.load_sample_dataset()
]) as mock_fetch:
acquired = Update.run()
self.assertFalse(acquired)

# make sure the data still exists
response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101))
self.assertEqual(response['result'], 1)
self.assertEqual(len(response['epidata']), 1)

with freeze_time("2021-03-16"):
# simulate issue posted after yesterday's run
with self.subTest(name='late issue posted'), \
patch.object(Network, 'fetch_metadata',
return_value=self.test_utils.load_sample_metadata("metadata2.csv")) as mock_fetch_meta, \
patch.object(Network, 'fetch_dataset', side_effect=[
self.get_modified_dataset(critical_staffing_shortage_today_yes = 9, reporting_cutoff_start="2020-12-09"),
self.get_modified_dataset(critical_staffing_shortage_today_yes = 10, reporting_cutoff_start="2020-12-09"),
self.get_modified_dataset(critical_staffing_shortage_today_yes = 11, reporting_cutoff_start="2020-12-10"),
self.get_modified_dataset(critical_staffing_shortage_today_yes = 12, reporting_cutoff_start="2020-12-10"),
]) as mock_fetch:
acquired = Update.run()
self.assertTrue(acquired)
self.assertEqual(mock_fetch_meta.call_count, 1)

# make sure everything was filed correctly
with self.subTest(name='late issue data checks'):
response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101))
self.assertEqual(response['result'], 1)
self.assertEqual(len(response['epidata']), 2)

# should have data from 03-15 00:00:01AM
row = response['epidata'][0]
self.assertEqual(row['state'], 'WY')
self.assertEqual(row['date'], 20201209)
self.assertEqual(row['issue'], 20210315) # include today's data by default
self.assertEqual(row['critical_staffing_shortage_today_yes'], 10)
self.assertEqual(row['total_patients_hospitalized_confirmed_influenza_covid_coverage'], 56)
self.assertIsNone(row['critical_staffing_shortage_today_no'])

# should have data from 03-16 00:00:01AM
row = response['epidata'][1]
self.assertEqual(row['state'], 'WY')
self.assertEqual(row['date'], 20201210)
self.assertEqual(row['issue'], 20210316) # include today's data by default
self.assertEqual(row['critical_staffing_shortage_today_yes'], 12)
self.assertEqual(row['total_patients_hospitalized_confirmed_influenza_covid_coverage'], 56)
self.assertIsNone(row['critical_staffing_shortage_today_no'])

# expect 61 fields per row (63 database columns, except `id` and `record_type`)
self.assertEqual(len(row), 118)

with self.subTest(name='all date batches acquired'):
response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101), issues=20210316)
self.assertEqual(response['result'], 1)


@freeze_time("2021-03-16")
@@ -121,7 +181,7 @@ def test_acquire_specific_issue(self):
self.assertEqual(pre_max_issue, pd.Timestamp('1900-01-01 00:00:00'))
with self.subTest(name='first acquisition'), \
patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()) as mock_fetch_meta, \
patch.object(Network, 'fetch_dataset', side_effect=[self.test_utils.load_sample_dataset("dataset0.csv")]
patch.object(Network, 'fetch_dataset', side_effect=[self.test_utils.load_sample_dataset()]
) as mock_fetch:
acquired = Utils.update_dataset(Database,
Network,
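The rewritten test drives two acquisition runs under frozen clocks (2021-03-15, then 2021-03-16), so the default `older_than` of "tomorrow" moves forward between runs and a revision posted after the first run is picked up by the second. A minimal sketch of that pattern, using a hypothetical `fake_acquire()` stand-in for `Update.run()`:

```python
# Sketch only: `fake_acquire` is a hypothetical stand-in for Update.run(); it just
# reports the date bounds a daily run would use under the frozen clock.
import datetime
from freezegun import freeze_time

def fake_acquire():
    today = datetime.date.today()
    older_than = today + datetime.timedelta(days=1)  # new default: "tomorrow"
    return today, older_than

with freeze_time("2021-03-15"):
    assert fake_acquire() == (datetime.date(2021, 3, 15), datetime.date(2021, 3, 16))

with freeze_time("2021-03-16"):
    # a revision posted after the 3/15 run now falls inside the window
    assert fake_acquire() == (datetime.date(2021, 3, 16), datetime.date(2021, 3, 17))
```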
8 changes: 6 additions & 2 deletions src/acquisition/covid_hosp/common/database.py
@@ -186,9 +186,13 @@ def nan_safe_dtype(dtype, value):

num_columns = 2 + len(dataframe_columns_and_types) + len(self.additional_fields)
value_placeholders = ', '.join(['%s'] * num_columns)
columns = ', '.join(f'`{i.sql_name}`' for i in dataframe_columns_and_types + self.additional_fields)
col_names = [f'`{i.sql_name}`' for i in dataframe_columns_and_types + self.additional_fields]
columns = ', '.join(col_names)
updates = ', '.join(f'{c}=new_values.{c}' for c in col_names)
# NOTE: list in `updates` presumes `publication_col_name` is part of the unique key and thus not needed in UPDATE
sql = f'INSERT INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columns}) ' \
f'VALUES ({value_placeholders})'
f'VALUES ({value_placeholders}) AS new_values ' \
f'ON DUPLICATE KEY UPDATE {updates}'
id_and_publication_date = (0, publication_date)
if logger:
logger.info('updating values', count=len(dataframe.index))
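The insert statement now ends with an ON DUPLICATE KEY UPDATE clause (using the MySQL 8.0.19+ row-alias form `VALUES (...) AS new_values`), so re-ingesting a revision that is already present overwrites the stored row instead of failing. A hedged sketch of the statement shape, using a hypothetical table and column list rather than the real covid_hosp schema:

```python
# Illustrative only: `example_hosp` and its columns are hypothetical; the unique key
# is assumed to include `issue`, which is why `issue` is left out of the UPDATE list.
cols = ["state", "date", "critical_staffing_shortage_today_yes"]
col_names = [f"`{c}`" for c in cols]
placeholders = ", ".join(["%s"] * (2 + len(cols)))  # id, issue, then the data columns
updates = ", ".join(f"{c}=new_values.{c}" for c in col_names)
sql = (
    f"INSERT INTO `example_hosp` (`id`, `issue`, {', '.join(col_names)}) "
    f"VALUES ({placeholders}) AS new_values "
    f"ON DUPLICATE KEY UPDATE {updates}"
)
print(sql)
```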
54 changes: 41 additions & 13 deletions src/acquisition/covid_hosp/common/utils.py
@@ -188,33 +188,61 @@ def update_dataset(database, network, newer_than=None, older_than=None):
Whether a new dataset was acquired.
"""
logger = database.logger()

metadata = network.fetch_metadata(logger=logger)
datasets = []
with database.connect() as db:
max_issue = db.get_max_issue(logger=logger)

older_than = datetime.datetime.today().date() if newer_than is None else older_than
newer_than = max_issue if newer_than is None else newer_than
# daily runs specify no bounds; patching runs specify at least one bound
is_patch_run = any(bound is not None for bound in (newer_than, older_than))
if is_patch_run:
logger.warn('running update_dataset() as a "patch" with some specific date bound[s] specified;'
' this will include and overwrite any revisions that were already collected.',
newer_than=newer_than, older_than=older_than)
if older_than is None:
# by default, include days "older than tomorrow" which thus includes "today"
older_than = (datetime.datetime.today().date() + datetime.timedelta(days=1))
if newer_than is None:
# by default, include days "newer than the day before the last update"
# which thus includes the day of the last update (in case there are new updates
# that day which were published after the one we already ingested)
with database.connect() as db:
max_issue = db.get_max_issue(logger=logger)
newer_than = (max_issue - datetime.timedelta(days=1))
logger.info("looking up issues in date range", newer_than=newer_than, older_than=older_than)
daily_issues = Utils.issues_to_fetch(metadata, newer_than, older_than, logger=logger)
if not daily_issues:
logger.info("no new issues; nothing to do")
logger.info("no issues found in date range; nothing to do")
return False
for issue, revisions in daily_issues.items():
issue_int = int(issue.strftime("%Y%m%d"))
# download the dataset and add it to the database
dataset = Utils.merge_by_key_cols([network.fetch_dataset(url, logger=logger) for url, _ in revisions],
db.KEY_COLS,
logger=logger)
# add metadata to the database
# download dataset(s) and save associated metadata
dataset_list = []
all_metadata = []
for url, index in revisions:
all_metadata.append((url, metadata.loc[index].reset_index().to_json()))
with database.connect() as db:
already_in_db = db.contains_revision(url)
if already_in_db:
logger.info(f"already collected revision: {url}")
if is_patch_run or not already_in_db:
logger.info(f"including dataset revision: {url}")
dataset_list.append(network.fetch_dataset(url, logger=logger))
all_metadata.append((url, metadata.loc[index].reset_index().to_json()))
if not dataset_list:
# we already had everything for this issue or the issue was empty:
# move on to the next issue
continue
dataset = Utils.merge_by_key_cols(dataset_list,
database.KEY_COLS,
logger=logger)
datasets.append((
issue_int,
dataset,
all_metadata
))
tot_revs = sum(len(revisions) for revisions in daily_issues.values())
logger.info(f"{len(daily_issues)} issues checked w/ {tot_revs} revisions, resulting in {len(datasets)} datasets.")
if not datasets:
logger.info("nothing to do, exiting")
return False
with database.connect() as db:
for issue_int, dataset, all_metadata in datasets:
db.insert_dataset(issue_int, dataset, logger=logger)
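This is the core of the change: when the caller passes no bounds (the daily run), `older_than` defaults to tomorrow so that issues published today are included, and `newer_than` defaults to the day before the last ingested issue so that late revisions from that same day are re-checked; passing either bound marks the run as a patch, which re-ingests and overwrites revisions already collected. A minimal sketch of the defaulting, with `max_issue` standing in for `db.get_max_issue()`:

```python
import datetime

def resolve_bounds(newer_than=None, older_than=None, max_issue=None):
    """Sketch of the new defaulting; both bounds are treated as exclusive."""
    # daily runs pass no bounds; patch runs pass at least one
    is_patch_run = any(b is not None for b in (newer_than, older_than))
    if older_than is None:
        # "older than tomorrow" thus includes today
        older_than = datetime.date.today() + datetime.timedelta(days=1)
    if newer_than is None:
        # "newer than the day before the last update" thus includes the day of the
        # last update, catching revisions published later that same day
        newer_than = max_issue - datetime.timedelta(days=1)
    return is_patch_run, newer_than, older_than

# daily run: issues with newer_than < issue < older_than are fetched
print(resolve_bounds(max_issue=datetime.date(2021, 3, 15)))
# patch run: an explicit bound re-ingests and overwrites already-collected revisions
print(resolve_bounds(newer_than=datetime.date(2021, 3, 10), max_issue=datetime.date(2021, 3, 15)))
```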
2 changes: 1 addition & 1 deletion testdata/acquisition/covid_hosp/state_daily/dataset.csv
@@ -51,4 +51,4 @@ MI,30,129,4,32,127,4,41,159,23598,163,18003,163,3812,159,376,163,162,159,9,159,9
MN,21,116,2,26,111,2,63,138,10358,139,7558,139,1516,138,182,139,70,138,3,138,2,138,806,139,346,138,355,139,1490,138,1358,139,26,138,21,138,1019,139,0.7296775439273991,139,7558,10358,0.2082417582417582,138,1516,7280,0.151630326065213,138,1516,9998,0.365364308342133,138,346,947,0.7909715407262021,139,806,1019,2020-12-09,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56
MO,47,78,16,62,63,16,22,137,17433,141,13521,141,2611,137,315,141,239,137,5,137,18,137,1615,141,645,137,604,141,2546,137,2307,141,65,137,26,137,1931,141,0.7755980037859233,141,13521,17433,0.1964487247009254,137,2611,13291,0.1523959610109146,137,2611,17133,0.3456591639871382,137,645,1866,0.8363542206110823,141,1615,1931,2020-12-09,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56
MS,21,85,2,30,76,2,12,106,8799,108,5637,108,1254,106,142,108,30,106,3,106,5,106,718,108,338,106,263,108,1234,106,1066,108,20,106,5,106,881,108,0.6406409819297647,108,5637,8799,0.2250134577426879,106,1254,5573,0.143922873866636,106,1254,8713,0.3953216374269006,106,338,855,0.8149829738933031,108,718,881,2020-12-09,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56
WY,8,,2,7,22,2,5,29,1729,31,856,31,198,29,26,31,15,29,0,29,0,29,58,31,32,29,32,31,196,29,189,31,2,29,2,29,137,31,0.4950838635049161,31,856,1729,0.2362768496420047,29,198,838,0.1172985781990521,29,198,1688,0.2519685039370078,29,32,127,0.4233576642335766,31,58,137,2020/12/09,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56
WY,8,,2,7,22,2,5,29,1729,31,856,31,198,29,26,31,15,29,0,29,0,29,58,31,32,29,32,31,196,29,189,31,2,29,2,29,137,31,0.4950838635049161,31,856,1729,0.2362768496420047,29,198,838,0.1172985781990521,29,198,1688,0.2519685039370078,29,32,127,0.4233576642335766,31,58,137,2020-12-09,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56