Split covid_hosp into daily & timeseries tables #1126

Open · wants to merge 42 commits into base: dev

Commits (42)
e37407b
[WIP] Split covid_hosp into daily & timeseries tables
rzats Apr 7, 2023
0647157
Fix API endpoint
rzats Apr 7, 2023
e2680e6
Alias
rzats Apr 7, 2023
beb6b6f
Test changes
rzats Apr 7, 2023
569c396
no date
rzats Apr 7, 2023
c2ee818
Remove test checks
rzats Apr 7, 2023
2033478
truncate more instead
rzats Apr 7, 2023
23313b5
no D/T
rzats Apr 7, 2023
926cf16
aliases?
rzats Apr 7, 2023
321415c
alt subqueries
rzats Apr 7, 2023
f5718ca
workaround?
rzats Apr 7, 2023
edf11a1
workaround? 2
rzats Apr 7, 2023
927cbd1
Undo test changes
rzats Apr 12, 2023
bfe3394
Add missing lines
rzats Apr 12, 2023
62fe29b
Test AUTO_INCREMENT
rzats Apr 12, 2023
015eaf4
Review fixes
rzats Apr 12, 2023
28873b7
SQL rewrite
rzats Apr 19, 2023
f6aec5d
aliases + indent
rzats Apr 19, 2023
ef55754
simplify
rzats Apr 19, 2023
199ad09
Update migration docs
rzats May 10, 2023
db925ef
Try alternate recent issues query
rzats May 12, 2023
bc5d736
Optimized query
rzats May 17, 2023
30581b1
row filter
rzats May 17, 2023
3893f62
aliases
rzats May 17, 2023
128fafc
what if it's an index problem
rzats May 19, 2023
3e671b4
and what if this is an index problem too
rzats May 19, 2023
adc8487
rewrite #1
rzats May 19, 2023
1e7654d
fixup partition
rzats May 19, 2023
560e356
fixup union all
rzats May 19, 2023
69f9897
Try without CTEs
rzats May 19, 2023
acf9b1f
no brackets
rzats May 19, 2023
0719c96
revert to more performant query
rzats May 19, 2023
1f47ada
refactor a little
rzats May 19, 2023
c147287
refactor a little more
rzats May 19, 2023
098d590
don't forget record_type
rzats May 19, 2023
efb1a70
try inner join
rzats May 19, 2023
3d07161
fixup tables
rzats May 19, 2023
d8db5c2
and no caps here
rzats May 19, 2023
cecf957
aliases...
rzats May 19, 2023
046f123
aliases
rzats May 19, 2023
355abfd
Comment & cleanup
rzats May 19, 2023
c6016d1
Misc. fixes to SQL
rzats Jun 2, 2023
@@ -41,6 +41,7 @@ def setUp(self):
# clear relevant tables
with Database.connect() as db:
with db.new_cursor() as cur:
cur.execute('truncate table covid_hosp_state_daily')
cur.execute('truncate table covid_hosp_state_timeseries')
cur.execute('truncate table covid_hosp_meta')

@@ -37,6 +37,7 @@ def setUp(self):
# clear relevant tables
with Database.connect() as db:
with db.new_cursor() as cur:
cur.execute('truncate table covid_hosp_state_daily')
cur.execute('truncate table covid_hosp_state_timeseries')
cur.execute('truncate table covid_hosp_meta')

31 changes: 19 additions & 12 deletions integrations/server/test_covid_hosp.py
@@ -25,14 +25,21 @@ def setUp(self):
# clear relevant tables
with Database.connect() as db:
with db.new_cursor() as cur:
cur.execute('truncate table covid_hosp_state_daily')
cur.execute('truncate table covid_hosp_state_timeseries')
cur.execute('truncate table covid_hosp_meta')


def insert_issue(self, cur, issue, value, record_type):
so_many_nulls = ', '.join(['null'] * 57)
def insert_timeseries(self, cur, issue, value):
so_many_nulls = ', '.join(['null'] * 114)
cur.execute(f'''insert into covid_hosp_state_timeseries values (
0, {issue}, 'PA', 20201118, {value}, {so_many_nulls}, '{record_type}', {so_many_nulls}
0, {issue}, 'PA', 20201118, {value}, {so_many_nulls}
)''')

def insert_daily(self, cur, issue, value):
so_many_nulls = ', '.join(['null'] * 114)
cur.execute(f'''insert into covid_hosp_state_daily values (
0, {issue}, 'PA', 20201118, {value}, {so_many_nulls}
)''')

def test_query_by_issue(self):
@@ -42,10 +49,10 @@ def test_query_by_issue(self):
with db.new_cursor() as cur:
# inserting out of order to test server-side order by
# also inserting two for 20201201 to test tiebreaker.
self.insert_issue(cur, 20201201, 123, 'T')
self.insert_issue(cur, 20201201, 321, 'D')
self.insert_issue(cur, 20201203, 789, 'T')
self.insert_issue(cur, 20201202, 456, 'T')
self.insert_timeseries(cur, 20201201, 123)
self.insert_daily(cur, 20201201, 321)
self.insert_timeseries(cur, 20201203, 789)
self.insert_timeseries(cur, 20201202, 456)

# request without issue (defaulting to latest issue)
with self.subTest(name='no issue (latest)'):
@@ -86,11 +93,11 @@ def test_query_by_issue(self):
def test_query_by_as_of(self):
with Database.connect() as db:
with db.new_cursor() as cur:
self.insert_issue(cur, 20201101, 0, 'T')
self.insert_issue(cur, 20201102, 1, 'D')
self.insert_issue(cur, 20201103, 2, 'D')
self.insert_issue(cur, 20201103, 3, 'T')
self.insert_issue(cur, 20201104, 4, 'T')
self.insert_timeseries(cur, 20201101, 0)
self.insert_daily(cur, 20201102, 1)
self.insert_daily(cur, 20201103, 2)
self.insert_timeseries(cur, 20201103, 3)
self.insert_timeseries(cur, 20201104, 4)

with self.subTest(name='as_of with multiple issues'):
response = Epidata.covid_hosp('PA', 20201118, as_of=20201103)
16 changes: 5 additions & 11 deletions src/acquisition/covid_hosp/common/database.py
@@ -22,8 +22,7 @@ def __init__(self,
table_name=None,
hhs_dataset_id=None,
columns_and_types=None,
key_columns=None,
additional_fields=None):
key_columns=None):
"""Create a new Database object.

Parameters
@@ -37,22 +36,18 @@
columns_and_types : tuple[str, str, Callable]
List of 3-tuples of (CSV header name, SQL column name, data type) for
all the columns in the CSV file.
additional_fields : tuple[str]
List of 2-tuples of (value, SQL column name) for additional fields to include
at the end of the row which are not present in the CSV data.
"""

self.connection = connection
self.table_name = table_name
self.hhs_dataset_id = hhs_dataset_id
self.publication_col_name = "issue" if table_name == 'covid_hosp_state_timeseries' else \
self.publication_col_name = "issue" if table_name == 'covid_hosp_state_timeseries' or table_name == "covid_hosp_state_daily" else \
Collaborator:

this line is getting a little gnarly, but we will fix it with the other yaml-schema changes so you can leave it for now

'publication_date'
self.columns_and_types = {
c.csv_name: c
for c in (columns_and_types if columns_and_types is not None else [])
}
self.key_columns = key_columns if key_columns is not None else []
self.additional_fields = additional_fields if additional_fields is not None else []

@classmethod
def logger(database_class):
@@ -184,9 +179,9 @@ def nan_safe_dtype(dtype, value):
for csv_name in self.key_columns:
dataframe.loc[:, csv_name] = dataframe[csv_name].map(self.columns_and_types[csv_name].dtype)

num_columns = 2 + len(dataframe_columns_and_types) + len(self.additional_fields)
num_columns = 2 + len(dataframe_columns_and_types)
value_placeholders = ', '.join(['%s'] * num_columns)
columns = ', '.join(f'`{i.sql_name}`' for i in dataframe_columns_and_types + self.additional_fields)
columns = ', '.join(f'`{i.sql_name}`' for i in dataframe_columns_and_types)
sql = f'INSERT INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columns}) ' \
f'VALUES ({value_placeholders})'
id_and_publication_date = (0, publication_date)
@@ -200,8 +195,7 @@ def nan_safe_dtype(dtype, value):
for c in dataframe_columns_and_types:
values.append(nan_safe_dtype(c.dtype, row[c.csv_name]))
many_values.append(id_and_publication_date +
tuple(values) +
tuple(i.csv_name for i in self.additional_fields))
tuple(values))
n += 1
# insert in batches because one at a time is slow and all at once makes
# the connection drop :(
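The collaborator's point about the gnarly conditional above could eventually be served by a table-driven lookup. A minimal sketch of that idea, not part of this PR (the `_PUBLICATION_COLS` name is hypothetical):

```python
# Hypothetical sketch: resolve the publication column with a lookup table
# instead of a chained conditional. Unlisted tables fall back to the
# 'publication_date' column, matching the current else-branch.
_PUBLICATION_COLS = {
    'covid_hosp_state_timeseries': 'issue',
    'covid_hosp_state_daily': 'issue',
}

def publication_col_name(table_name: str) -> str:
    return _PUBLICATION_COLS.get(table_name, 'publication_date')
```

The PR keeps the conditional for now, since the upcoming yaml-schema changes will rework this code anyway.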
6 changes: 2 additions & 4 deletions src/acquisition/covid_hosp/state_daily/database.py
@@ -7,8 +7,7 @@

class Database(BaseDatabase):

# note we share a database with state_timeseries
TABLE_NAME = 'covid_hosp_state_timeseries'
TABLE_NAME = 'covid_hosp_state_daily'
KEY_COLS = ['state', 'reporting_cutoff_start']
# These are 3-tuples of (CSV header name, SQL db column name, data type) for
# all the columns in the CSV file.
@@ -226,5 +225,4 @@ def __init__(self, *args, **kwargs):
table_name=Database.TABLE_NAME,
hhs_dataset_id=Network.DATASET_ID,
columns_and_types=Database.ORDERED_CSV_COLUMNS,
key_columns=Database.KEY_COLS,
additional_fields=[Columndef('D', 'record_type', None)])
key_columns=Database.KEY_COLS)
3 changes: 1 addition & 2 deletions src/acquisition/covid_hosp/state_timeseries/database.py
@@ -225,5 +225,4 @@ def __init__(self, *args, **kwargs):
table_name=Database.TABLE_NAME,
hhs_dataset_id=Network.DATASET_ID,
columns_and_types=Database.ORDERED_CSV_COLUMNS,
key_columns=Database.KEY_COLS,
additional_fields=[Columndef('T', 'record_type', None)])
key_columns=Database.KEY_COLS)
45 changes: 26 additions & 19 deletions src/ddl/covid_hosp.sql
@@ -61,9 +61,7 @@ CREATE TABLE `covid_hosp_meta` (


/*
`covid_hosp_state_timeseries` stores the versioned "state timeseries" dataset,
which contains data from both the time series data and the daily snapshot files.

`covid_hosp_state_timeseries` stores time series data from the versioned "state timeseries" dataset.
Data is public under the Open Data Commons Open Database License (ODbL).

+------------------------------------------------------------------+---------+------+-----+---------+----------------+
@@ -131,7 +129,6 @@ Data is public under the Open Data Commons Open Database License (ODbL).
| adult_icu_bed_utilization_coverage | int(11) | YES | | NULL | |
| adult_icu_bed_utilization_numerator | int(11) | YES | | NULL | |
| adult_icu_bed_utilization_denominator | int(11) | YES | | NULL | |
| record_type | char(1) | NO | MUL | NULL | |
+------------------------------------------------------------------+---------+------+-----+---------+----------------+

- `id`
@@ -366,14 +363,6 @@ For daily snapshot files, there is a `reporting_cutoff_start` value,
defined as "Look back date start - The latest reports from each hospital
is summed for this report starting with this date." We place this value
into the `date` column.

We also add a column `record_type` that specifies if a row came from a
time series file or a daily snapshot file. "T" = time series and
"D" = daily snapshot. When both a time series and a daily snapshot row
have the same issue/date/state but different values, we tiebreak by
taking the daily snapshot value. This is done with a window function that
sorts by the record_type field, ascending, and so it is important that "D"
comes before "T".
*/

CREATE TABLE `covid_hosp_state_timeseries` (
@@ -439,7 +428,6 @@ CREATE TABLE `covid_hosp_state_timeseries` (
`adult_icu_bed_utilization_coverage` INT,
`adult_icu_bed_utilization_numerator` INT,
`adult_icu_bed_utilization_denominator` INT,
`record_type` CHAR(1) NOT NULL,
-- new columns added Oct 10
`geocoded_state` VARCHAR(32),
`previous_day_admission_adult_covid_confirmed_18_19` INT,
@@ -500,15 +488,34 @@ CREATE TABLE `covid_hosp_state_timeseries` (
`total_patients_hospitalized_confirmed_influenza_coverage` INT,
PRIMARY KEY (`id`),
-- for uniqueness
-- for fast lookup of most recent issue for a given state, date, and record type
UNIQUE KEY `issue_by_state_and_date` (`state`, `date`, `issue`, `record_type`),
-- for fast lookup of a time-series for a given state, issue, and record type
KEY `date_by_issue_and_state` (`issue`, `state`, `date`, `record_type`),
-- for fast lookup of all states for a given date, issue, and record_type
KEY `state_by_issue_and_date` (`issue`, `date`, `state`, `record_type`)
-- for fast lookup of most recent issue for a given state and date
UNIQUE KEY `issue_by_state_and_date` (`state`, `date`, `issue`),
-- for fast lookup of a time-series for a given state and issue
KEY `date_by_issue_and_state` (`issue`, `state`, `date`),
-- for fast lookup of all states for a given date and issue
KEY `state_by_issue_and_date` (`issue`, `date`, `state`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


/*
`covid_hosp_state_daily` stores the daily snapshot files from the versioned
"state timeseries" dataset.
Its schema is equivalent to `covid_hosp_state_timeseries`.
*/
CREATE TABLE `covid_hosp_state_daily` (
-- for uniqueness
PRIMARY KEY (`id`),
-- for fast lookup of most recent issue for a given state and date
UNIQUE KEY `issue_by_state_and_date` (`state`, `date`, `issue`),
-- for fast lookup of a time-series for a given state and issue
KEY `date_by_issue_and_state` (`issue`, `state`, `date`),
-- for fast lookup of all states for a given date and issue
KEY `state_by_issue_and_date` (`issue`, `date`, `state`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
SELECT * FROM covid_hosp_state_timeseries;
-- AUTO_INCREMENT is not preserved by `CREATE TABLE ... SELECT`; re-add it
ALTER TABLE covid_hosp_state_daily MODIFY id INT NOT NULL AUTO_INCREMENT;

/*
`covid_hosp_facility` stores the versioned "facility" dataset.

24 changes: 24 additions & 0 deletions src/ddl/migrations/covid_hosp_state_split_tables.sql
@@ -0,0 +1,24 @@
-- 1. Add new state_daily table mirroring state_timeseries table

CREATE TABLE `covid_hosp_state_daily` (
-- for uniqueness
PRIMARY KEY (`id`),
-- for fast lookup of most recent issue for a given state and date
UNIQUE KEY `issue_by_state_and_date` (`state`, `date`, `issue`),
-- for fast lookup of a time-series for a given state and issue
KEY `date_by_issue_and_state` (`issue`, `state`, `date`),
-- for fast lookup of all states for a given date and issue
KEY `state_by_issue_and_date` (`issue`, `date`, `state`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
SELECT * FROM covid_hosp_state_timeseries WHERE record_type='D';
-- AUTO_INCREMENT is not preserved by `CREATE TABLE ... SELECT`; re-add it
ALTER TABLE covid_hosp_state_daily MODIFY id INT NOT NULL AUTO_INCREMENT;

-- 2. Remove data with incorrect record_type from timeseries table (D records were moved to daily)

DELETE FROM `covid_hosp_state_timeseries` WHERE record_type='D';

-- 3. Remove the record_type column from both tables

ALTER TABLE `covid_hosp_state_daily` DROP COLUMN record_type;
ALTER TABLE `covid_hosp_state_timeseries` DROP COLUMN record_type;
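Between steps 2 and 3, while `record_type` still exists, a sanity check can confirm the split moved every daily row. A sketch, assuming the `Database` helper used by the integration tests above is available; this is not part of the migration script:

```python
# Sanity-check sketch: after the DELETE in step 2, no 'D' rows should remain
# in the timeseries table, and the daily table should be non-empty.
# Run this before step 3 drops the record_type column.
with Database.connect() as db:
    with db.new_cursor() as cur:
        cur.execute(
            "SELECT COUNT(*) FROM covid_hosp_state_timeseries "
            "WHERE record_type = 'D'")
        leftover, = cur.fetchone()
        assert leftover == 0, f'{leftover} daily rows left in timeseries table'

        cur.execute('SELECT COUNT(*) FROM covid_hosp_state_daily')
        migrated, = cur.fetchone()
        assert migrated > 0, 'no rows were copied into the daily table'
```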
67 changes: 57 additions & 10 deletions src/server/endpoints/covid_hosp_state_timeseries.py
@@ -152,19 +152,66 @@ def handle():
q.where_integers("date", dates)
q.where_strings("state", states)

# These queries prioritize the daily value if there is both a time series and daily value for a given issue/date/state.
# Further details: https://github.com/cmu-delphi/delphi-epidata/pull/336/files#diff-097d4969fdc9ac1f722809e85f3dc59ad371b66011861a50d15fcc605839c63dR364-R368
if issues is not None:
Collaborator:

Suggested change:
-        if issues is not None:
+        if issues:

does this work? if so, it's a little easier to comprehend. same for the `if as_of is not None:` statement below.

Contributor:

more specifically -- do we always want to treat falsy values of issues as if issues was None? 0, False, [], "", {}, etc

we don't encourage users to submit 0 as an issue, but that doesn't mean they couldn't do it
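For concreteness, the two checks disagree exactly when `issues` is falsy but not `None`; a toy illustration of the difference under discussion:

```python
# Toy illustration: `is not None` vs. truthiness for the issues parameter.
def branch_is_not_none(issues):
    return 'issue filter' if issues is not None else 'latest issue'

def branch_truthy(issues):
    return 'issue filter' if issues else 'latest issue'

print(branch_is_not_none([]))  # 'issue filter': an empty list still filters
print(branch_truthy([]))       # 'latest issue': falsy [] silently falls through
print(branch_is_not_none(0))   # 'issue filter': issue 0 is odd but possible
print(branch_truthy(0))        # 'latest issue'
```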

# Filter for all matching issues
q.where_integers("issue", issues)
# final query using specific issues
query = f"WITH c as (SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY date, state, issue ORDER BY record_type) `row` FROM {q.table} WHERE {q.conditions_clause}) SELECT {q.fields_clause} FROM {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}"
elif as_of is not None:
sub_condition_asof = "(issue <= :as_of)"
q.params["as_of"] = as_of
query = f"WITH c as (SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY date, state ORDER BY issue DESC, record_type) `row` FROM {q.table} WHERE {q.conditions_clause} AND {sub_condition_asof}) SELECT {q.fields_clause} FROM {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}"

# Get all issues matching the conditions from daily & timeseries
union_subquery = f'''
(
SELECT *, 'D' AS record_type FROM `covid_hosp_state_daily` AS {q.alias} WHERE {q.conditions_clause}
UNION ALL
SELECT *, 'T' AS record_type FROM `covid_hosp_state_timeseries` AS {q.alias} WHERE {q.conditions_clause}
) AS {q.alias}'''

# Prioritize rows with record_type='D' for each issue/date/state group
query = f'''
SELECT {q.fields_clause} FROM (
SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY issue, date, state ORDER BY record_type) AS `row`
FROM {union_subquery}
) AS {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}
'''
else:
# final query using most recent issues
subquery = f"(SELECT max(`issue`) `max_issue`, `date`, `state` FROM {q.table} WHERE {q.conditions_clause} GROUP BY `date`, `state`) x"
condition = f"x.`max_issue` = {q.alias}.`issue` AND x.`date` = {q.alias}.`date` AND x.`state` = {q.alias}.`state`"
query = f"WITH c as (SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY date, state, issue ORDER BY record_type) `row` FROM {q.table} JOIN {subquery} ON {condition}) select {q.fields_clause} FROM {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}"
# Filter for most recent issues
cond_clause = q.conditions_clause
if as_of is not None:
# ...Filter for most recent issues before a given as_of
cond_clause += " AND (issue <= :as_of)"
q.params["as_of"] = as_of

join_condition = f"{q.alias}.state = x.state AND {q.alias}.date = x.date AND {q.alias}.issue = x.max_issue"

# Get the rows from the daily & timeseries tables with the highest issue value within each state/date group
join_daily = f'''
SELECT {q.fields_clause}, 'D' AS record_type FROM `covid_hosp_state_daily` AS {q.alias}
JOIN (
SELECT {q.alias}.state, {q.alias}.date, MAX({q.alias}.issue) AS max_issue
FROM `covid_hosp_state_daily` AS {q.alias}
WHERE {cond_clause}
GROUP BY {q.alias}.state, {q.alias}.date
) AS x
ON {join_condition}
'''
join_timeseries = f'''
SELECT {q.fields_clause}, 'T' AS record_type FROM `covid_hosp_state_timeseries` AS {q.alias}
JOIN (
SELECT {q.alias}.state, {q.alias}.date, MAX(issue) AS max_issue
FROM `covid_hosp_state_timeseries` AS {q.alias}
WHERE {cond_clause}
GROUP BY {q.alias}.state, {q.alias}.date
) AS x
ON {join_condition}
'''

# Combine daily & timeseries queries, getting the combined latest issues (and prioritizing rows with record_type='D' in a tie)
query = f'''
SELECT {q.fields_clause} FROM (
SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY state, date ORDER BY issue DESC, record_type) AS `row`
FROM ({join_daily} UNION ALL {join_timeseries}) AS {q.alias}
) AS {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}
'''

# send query
return execute_query(query, q.params, fields_string, fields_int, fields_float)
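The tiebreak above is exercised by `test_query_by_issue`: for issue 20201201 the tests insert both a timeseries row (value 123) and a daily row (value 321), and the daily row must win. A usage sketch against the Epidata client, mirroring that test:

```python
# Sketch mirroring test_query_by_issue: with both record types present for
# issue 20201201, ORDER BY record_type puts 'D' before 'T', so the daily
# snapshot row (inserted with value 321) is the one returned.
response = Epidata.covid_hosp('PA', 20201118, issues=20201201)
row = response['epidata'][0]
assert row['issue'] == 20201201  # the daily row won the tiebreak
```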
2 changes: 1 addition & 1 deletion tests/acquisition/covid_hosp/state_daily/test_database.py
@@ -50,7 +50,7 @@ def test_insert_dataset(self):
17, 18, 19, 20, 21, 22, 23, 31, 24, 25, 15, 26, 27, 28, 29, 30, 31, 32,
33, 34, 35, 36, 37, 38, 39, 40, 41, 29, 42, 43, 44, 45, 0, 29, 0, 29,
46, 47, 48, 49, 50, 51, 52, 58, 31, 32, 29, 32, 31, 196, 29, 189, 31,
53, 54, 55, 56, 2, 29, 2, 29, 137, 31, 'D')
53, 54, 55, 56, 2, 29, 2, 29, 137, 31)
self.assertEqual(len(last_query_values), len(expected_query_values))

for actual, expected in zip(last_query_values, expected_query_values):
@@ -48,7 +48,7 @@ def test_insert_dataset(self):
24, 25, 13, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
41, 26, 42, 43, 44, 45, 0, 21, 0, 22, 46, 47, 48, 49, 50, 51, 52, 49,
28, 10, 26, 7, 28, 17, 26, 14, 28, 53, 54, 55, 56, 0, 26, 0, 26,
114, 28, 'T')
114, 28)
self.assertEqual(len(last_query_values), len(expected_query_values))

for actual, expected in zip(last_query_values, expected_query_values):
Expand Down