
Commit b1a540d

Merge pull request #1224 from cmu-delphi/krivard/covid_hosp-older_than
Look for new issues "older than" tomorrow (instead of older than today), and from later in the same day as the most recently imported issue (instead of from the day after).
2 parents 6ce89a9 + b2fb357 commit b1a540d
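
In effect, this change widens the default acquisition window on both ends. A minimal sketch of the new defaults (`default_window` is a hypothetical helper; per the diff, `max_issue` is the issue date of the most recently imported dataset, and both bounds are exclusive):

    import datetime

    def default_window(max_issue: datetime.date, today: datetime.date):
      # re-check the day of the last import, in case a newer revision was
      # published later that same day
      newer_than = max_issue - datetime.timedelta(days=1)
      # "older than tomorrow" includes issues published today
      older_than = today + datetime.timedelta(days=1)
      return newer_than, older_than

    # example: last import on 3/15, daily run on 3/16 looks for issues 3/15 and 3/16
    print(default_window(datetime.date(2021, 3, 15), datetime.date(2021, 3, 16)))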

8 files changed: +208 -145 lines

integrations/acquisition/covid_hosp/state_daily/test_scenarios.py (+114 -54)
@@ -47,62 +47,122 @@ def setUp(self):
       cur.execute('delete from api_user')
       cur.execute('insert into api_user(api_key, email) values("key", "email")')

-  @freeze_time("2021-03-16")
-  def test_acquire_dataset(self):
-    """Acquire a new dataset."""
+  def get_modified_dataset(self, critical_staffing_shortage_today_yes, reporting_cutoff_start):
+    """Get a simplified version of a test dataset.

-    # make sure the data does not yet exist
-    with self.subTest(name='no data yet'):
-      response = Epidata.covid_hosp('MA', Epidata.range(20200101, 20210101))
-      self.assertEqual(response['result'], -2, response)
+    Only WY data is modified. The issue date is specified in the metadata file.
+    """
+    df = self.test_utils.load_sample_dataset()
+    df_new = pd.DataFrame(df[df["state"] == "WY"], columns=df.columns).reset_index(drop=True)
+    df_new["critical_staffing_shortage_today_yes"] = critical_staffing_shortage_today_yes
+    df_new["reporting_cutoff_start"] = reporting_cutoff_start
+    return df_new

-    # acquire sample data into local database
-    # mock out network calls to external hosts
-    with self.subTest(name='first acquisition'), \
-         patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()) as mock_fetch_meta, \
-         patch.object(Network, 'fetch_dataset', side_effect=[self.test_utils.load_sample_dataset("dataset0.csv"),  # dataset for 3/13
-                                                             self.test_utils.load_sample_dataset("dataset0.csv"),  # first dataset for 3/15
-                                                             self.test_utils.load_sample_dataset()]  # second dataset for 3/15
-         ) as mock_fetch:
-      acquired = Update.run()
-      self.assertTrue(acquired)
-      self.assertEqual(mock_fetch_meta.call_count, 1)
-
-    # make sure the data now exists
-    with self.subTest(name='initial data checks'):
-      response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101))
-      self.assertEqual(response['result'], 1)
-      self.assertEqual(len(response['epidata']), 1)
-      row = response['epidata'][0]
-      self.assertEqual(row['state'], 'WY')
-      self.assertEqual(row['date'], 20201209)
-      self.assertEqual(row['issue'], 20210315)
-      self.assertEqual(row['critical_staffing_shortage_today_yes'], 8)
-      self.assertEqual(row['total_patients_hospitalized_confirmed_influenza_covid_coverage'], 56)
-      actual = row['inpatient_bed_covid_utilization']
-      expected = 0.11729857819905214
-      self.assertAlmostEqual(actual, expected)
-      self.assertIsNone(row['critical_staffing_shortage_today_no'])
-
-      # expect 61 fields per row (63 database columns, except `id` and `record_type`)
-      self.assertEqual(len(row), 118)
-
-    with self.subTest(name='all date batches acquired'):
-      response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101), issues=20210313)
-      self.assertEqual(response['result'], 1)
-
-    # re-acquisition of the same dataset should be a no-op
-    with self.subTest(name='second acquisition'), \
-         patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()) as mock_fetch_meta, \
-         patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset()) as mock_fetch:
-      acquired = Update.run()
-      self.assertFalse(acquired)
+  def test_acquire_dataset(self):
+    """Acquire a new dataset."""

-    # make sure the data still exists
-    with self.subTest(name='final data checks'):
-      response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101))
-      self.assertEqual(response['result'], 1)
-      self.assertEqual(len(response['epidata']), 1)
+    with freeze_time("2021-03-15"):
+      # make sure the data does not yet exist
+      with self.subTest(name='no data yet'):
+        response = Epidata.covid_hosp('MA', Epidata.range(20200101, 20210101))
+        self.assertEqual(response['result'], -2, response)
+
+      # acquire sample data into local database
+      # mock out network calls to external hosts
+      # issues: 3/13, 3/15
+      with self.subTest(name='first acquisition'), \
+           patch.object(Network, 'fetch_metadata',
+                        return_value=self.test_utils.load_sample_metadata("metadata.csv")) as mock_fetch_meta, \
+           patch.object(Network, 'fetch_dataset', side_effect=[
+             self.test_utils.load_sample_dataset(),
+             self.test_utils.load_sample_dataset()
+           ]) as mock_fetch:
+        acquired = Update.run()
+        self.assertTrue(acquired)
+        self.assertEqual(mock_fetch_meta.call_count, 1)
+
+      # make sure the data now exists
+      with self.subTest(name='initial data checks'):
+        response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101))
+        self.assertEqual(response['result'], 1)
+        self.assertEqual(len(response['epidata']), 1)
+        row = response['epidata'][0]
+        self.assertEqual(row['state'], 'WY')
+        self.assertEqual(row['date'], 20201209)
+        self.assertEqual(row['issue'], 20210315)  # include today's data by default
+        self.assertEqual(row['critical_staffing_shortage_today_yes'], 8)
+        self.assertEqual(row['total_patients_hospitalized_confirmed_influenza_covid_coverage'], 56)
+        self.assertIsNone(row['critical_staffing_shortage_today_no'])
+
+        # expect 61 fields per row (63 database columns, except `id` and `record_type`)
+        self.assertEqual(len(row), 118)
+
+      with self.subTest(name='all date batches acquired'):
+        response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101), issues=20210313)
+        self.assertEqual(response['result'], 1)
+
+      # re-acquisition of the same dataset should be a no-op
+      # issues: 3/13, 3/15
+      with self.subTest(name='second acquisition'), \
+           patch.object(Network, 'fetch_metadata',
+                        return_value=self.test_utils.load_sample_metadata("metadata.csv")) as mock_fetch_meta, \
+           patch.object(Network, 'fetch_dataset', side_effect=[
+             self.test_utils.load_sample_dataset(),
+             self.test_utils.load_sample_dataset()
+           ]) as mock_fetch:
+        acquired = Update.run()
+        self.assertFalse(acquired)
+
+      # make sure the data still exists
+      response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101))
+      self.assertEqual(response['result'], 1)
+      self.assertEqual(len(response['epidata']), 1)
+
+    with freeze_time("2021-03-16"):
+      # simulate issue posted after yesterday's run
+      with self.subTest(name='late issue posted'), \
+           patch.object(Network, 'fetch_metadata',
+                        return_value=self.test_utils.load_sample_metadata("metadata2.csv")) as mock_fetch_meta, \
+           patch.object(Network, 'fetch_dataset', side_effect=[
+             self.get_modified_dataset(critical_staffing_shortage_today_yes=9, reporting_cutoff_start="2020-12-09"),
+             self.get_modified_dataset(critical_staffing_shortage_today_yes=10, reporting_cutoff_start="2020-12-09"),
+             self.get_modified_dataset(critical_staffing_shortage_today_yes=11, reporting_cutoff_start="2020-12-10"),
+             self.get_modified_dataset(critical_staffing_shortage_today_yes=12, reporting_cutoff_start="2020-12-10"),
+           ]) as mock_fetch:
+        acquired = Update.run()
+        self.assertTrue(acquired)
+        self.assertEqual(mock_fetch_meta.call_count, 1)
+
+      # make sure everything was filed correctly
+      with self.subTest(name='late issue data checks'):
+        response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101))
+        self.assertEqual(response['result'], 1)
+        self.assertEqual(len(response['epidata']), 2)
+
+        # should have data from 03-15 00:00:01AM
+        row = response['epidata'][0]
+        self.assertEqual(row['state'], 'WY')
+        self.assertEqual(row['date'], 20201209)
+        self.assertEqual(row['issue'], 20210315)  # include today's data by default
+        self.assertEqual(row['critical_staffing_shortage_today_yes'], 10)
+        self.assertEqual(row['total_patients_hospitalized_confirmed_influenza_covid_coverage'], 56)
+        self.assertIsNone(row['critical_staffing_shortage_today_no'])
+
+        # should have data from 03-16 00:00:01AM
+        row = response['epidata'][1]
+        self.assertEqual(row['state'], 'WY')
+        self.assertEqual(row['date'], 20201210)
+        self.assertEqual(row['issue'], 20210316)  # include today's data by default
+        self.assertEqual(row['critical_staffing_shortage_today_yes'], 12)
+        self.assertEqual(row['total_patients_hospitalized_confirmed_influenza_covid_coverage'], 56)
+        self.assertIsNone(row['critical_staffing_shortage_today_no'])
+
+        # expect 61 fields per row (63 database columns, except `id` and `record_type`)
+        self.assertEqual(len(row), 118)
+
+      with self.subTest(name='all date batches acquired'):
+        response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101), issues=20210316)
+        self.assertEqual(response['result'], 1)


   @freeze_time("2021-03-16")
@@ -121,7 +181,7 @@ def test_acquire_specific_issue(self):
     self.assertEqual(pre_max_issue, pd.Timestamp('1900-01-01 00:00:00'))
     with self.subTest(name='first acquisition'), \
          patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()) as mock_fetch_meta, \
-         patch.object(Network, 'fetch_dataset', side_effect=[self.test_utils.load_sample_dataset("dataset0.csv")]
+         patch.object(Network, 'fetch_dataset', side_effect=[self.test_utils.load_sample_dataset()]
          ) as mock_fetch:
       acquired = Utils.update_dataset(Database,
                                       Network,

src/acquisition/covid_hosp/common/database.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,13 @@ def nan_safe_dtype(dtype, value):
186186

187187
num_columns = 2 + len(dataframe_columns_and_types) + len(self.additional_fields)
188188
value_placeholders = ', '.join(['%s'] * num_columns)
189-
columns = ', '.join(f'`{i.sql_name}`' for i in dataframe_columns_and_types + self.additional_fields)
189+
col_names = [f'`{i.sql_name}`' for i in dataframe_columns_and_types + self.additional_fields]
190+
columns = ', '.join(col_names)
191+
updates = ', '.join(f'{c}=new_values.{c}' for c in col_names)
192+
# NOTE: list in `updates` presumes `publication_col_name` is part of the unique key and thus not needed in UPDATE
190193
sql = f'INSERT INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columns}) ' \
191-
f'VALUES ({value_placeholders})'
194+
f'VALUES ({value_placeholders}) AS new_values ' \
195+
f'ON DUPLICATE KEY UPDATE {updates}'
192196
id_and_publication_date = (0, publication_date)
193197
if logger:
194198
logger.info('updating values', count=len(dataframe.index))
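
The statement built here is now an upsert. A sketch of what the assembled SQL looks like, with hypothetical table and column names (note that the `VALUES ... AS new_values` row alias requires MySQL 8.0.19 or later):

    # hypothetical column list standing in for dataframe_columns_and_types + additional_fields
    col_names = ['`state`', '`date`', '`critical_staffing_shortage_today_yes`']
    updates = ', '.join(f'{c}=new_values.{c}' for c in col_names)
    placeholders = ', '.join(['%s'] * (2 + len(col_names)))
    sql = (f"INSERT INTO `t` (`id`, `publication_date`, {', '.join(col_names)}) "
           f"VALUES ({placeholders}) AS new_values "
           f"ON DUPLICATE KEY UPDATE {updates}")
    # yields:
    # INSERT INTO `t` (`id`, `publication_date`, `state`, `date`, ...) VALUES (%s, %s, %s, %s, %s)
    #   AS new_values ON DUPLICATE KEY UPDATE `state`=new_values.`state`, `date`=new_values.`date`, ...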

src/acquisition/covid_hosp/common/utils.py

+41-13
Original file line numberDiff line numberDiff line change
@@ -188,33 +188,61 @@ def update_dataset(database, network, newer_than=None, older_than=None):
188188
Whether a new dataset was acquired.
189189
"""
190190
logger = database.logger()
191-
191+
192192
metadata = network.fetch_metadata(logger=logger)
193193
datasets = []
194-
with database.connect() as db:
195-
max_issue = db.get_max_issue(logger=logger)
196-
197-
older_than = datetime.datetime.today().date() if newer_than is None else older_than
198-
newer_than = max_issue if newer_than is None else newer_than
194+
# daily runs specify no bounds; patching runs specify at least one bound
195+
is_patch_run = any(bound is not None for bound in (newer_than, older_than))
196+
if is_patch_run:
197+
logger.warn('runing update_dataset() as a "patch" with some specific date bound[s] specified;'
198+
' this will include and overwrite any revisions that were already collected.',
199+
newer_than=newer_than, older_than=older_than)
200+
if older_than is None:
201+
# by default, include days "older than tomorrow" which thus includes "today"
202+
older_than = (datetime.datetime.today().date() + datetime.timedelta(days=1))
203+
if newer_than is None:
204+
# by default, include days "newer than the day before the last update"
205+
# which thus includes the day of the last update (in case there are new updates
206+
# that day which were published after the one we already ingested)
207+
with database.connect() as db:
208+
max_issue = db.get_max_issue(logger=logger)
209+
newer_than = (max_issue - datetime.timedelta(days=1))
210+
logger.info("looking up issues in date range", newer_than=newer_than, older_than=older_than)
199211
daily_issues = Utils.issues_to_fetch(metadata, newer_than, older_than, logger=logger)
200212
if not daily_issues:
201-
logger.info("no new issues; nothing to do")
213+
logger.info("no issues found in date range; nothing to do")
202214
return False
203215
for issue, revisions in daily_issues.items():
204216
issue_int = int(issue.strftime("%Y%m%d"))
205-
# download the dataset and add it to the database
206-
dataset = Utils.merge_by_key_cols([network.fetch_dataset(url, logger=logger) for url, _ in revisions],
207-
db.KEY_COLS,
208-
logger=logger)
209-
# add metadata to the database
217+
# download dataset(s) and save associated metadata
218+
dataset_list = []
210219
all_metadata = []
211220
for url, index in revisions:
212-
all_metadata.append((url, metadata.loc[index].reset_index().to_json()))
221+
with database.connect() as db:
222+
already_in_db = db.contains_revision(url)
223+
if already_in_db:
224+
logger.info(f"already collected revision: {url}")
225+
if is_patch_run or not already_in_db:
226+
logger.info(f"including dataset revision: {url}")
227+
dataset_list.append(network.fetch_dataset(url, logger=logger))
228+
all_metadata.append((url, metadata.loc[index].reset_index().to_json()))
229+
if not dataset_list:
230+
# we already had everything for this issue or the issue was empty:
231+
# move on to the next issue
232+
continue
233+
dataset = Utils.merge_by_key_cols(dataset_list,
234+
database.KEY_COLS,
235+
logger=logger)
213236
datasets.append((
214237
issue_int,
215238
dataset,
216239
all_metadata
217240
))
241+
tot_revs = sum(len(revisions) for revisions in daily_issues.values())
242+
logger.info(f"{len(daily_issues)} issues checked w/ {tot_revs} revisions, resulting in {len(datasets)} datasets.")
243+
if not datasets:
244+
logger.info("nothing to do, exiting")
245+
return False
218246
with database.connect() as db:
219247
for issue_int, dataset, all_metadata in datasets:
220248
db.insert_dataset(issue_int, dataset, logger=logger)

testdata/acquisition/covid_hosp/state_daily/dataset.csv

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,4 @@ MI,30,129,4,32,127,4,41,159,23598,163,18003,163,3812,159,376,163,162,159,9,159,9
5151
MN,21,116,2,26,111,2,63,138,10358,139,7558,139,1516,138,182,139,70,138,3,138,2,138,806,139,346,138,355,139,1490,138,1358,139,26,138,21,138,1019,139,0.7296775439273991,139,7558,10358,0.2082417582417582,138,1516,7280,0.151630326065213,138,1516,9998,0.365364308342133,138,346,947,0.7909715407262021,139,806,1019,2020-12-09,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56
5252
MO,47,78,16,62,63,16,22,137,17433,141,13521,141,2611,137,315,141,239,137,5,137,18,137,1615,141,645,137,604,141,2546,137,2307,141,65,137,26,137,1931,141,0.7755980037859233,141,13521,17433,0.1964487247009254,137,2611,13291,0.1523959610109146,137,2611,17133,0.3456591639871382,137,645,1866,0.8363542206110823,141,1615,1931,2020-12-09,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56
5353
MS,21,85,2,30,76,2,12,106,8799,108,5637,108,1254,106,142,108,30,106,3,106,5,106,718,108,338,106,263,108,1234,106,1066,108,20,106,5,106,881,108,0.6406409819297647,108,5637,8799,0.2250134577426879,106,1254,5573,0.143922873866636,106,1254,8713,0.3953216374269006,106,338,855,0.8149829738933031,108,718,881,2020-12-09,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56
54-
WY,8,,2,7,22,2,5,29,1729,31,856,31,198,29,26,31,15,29,0,29,0,29,58,31,32,29,32,31,196,29,189,31,2,29,2,29,137,31,0.4950838635049161,31,856,1729,0.2362768496420047,29,198,838,0.1172985781990521,29,198,1688,0.2519685039370078,29,32,127,0.4233576642335766,31,58,137,2020/12/09,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56
54+
WY,8,,2,7,22,2,5,29,1729,31,856,31,198,29,26,31,15,29,0,29,0,29,58,31,32,29,32,31,196,29,189,31,2,29,2,29,137,31,0.4950838635049161,31,856,1729,0.2362768496420047,29,198,838,0.1172985781990521,29,198,1688,0.2519685039370078,29,32,127,0.4233576642335766,31,58,137,2020-12-09,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56
