-
Notifications
You must be signed in to change notification settings - Fork 67
feat: add min_issue to covidcast meta data #236
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
f723beb
f177818
6f01aa5
b2fce76
a581f0b
06aab57
a85adbb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -162,12 +162,12 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False | |
SET `is_latest_issue`=0 | ||
''' | ||
set_is_latest_issue_sql = f''' | ||
UPDATE | ||
UPDATE | ||
( | ||
SELECT `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, MAX(`issue`) AS `issue` | ||
FROM | ||
( | ||
SELECT DISTINCT `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value` | ||
SELECT DISTINCT `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value` | ||
FROM `{tmp_table_name}` | ||
) AS TMP | ||
LEFT JOIN `covidcast` | ||
|
@@ -176,7 +176,7 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False | |
) AS TMP | ||
LEFT JOIN `covidcast` | ||
USING (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, `issue`) | ||
SET `is_latest_issue`=1 | ||
SET `is_latest_issue`=1 | ||
''' | ||
|
||
# TODO: ^ do we want to reset `direction_updated_timestamp` and `direction` in the duplicate key case? | ||
|
@@ -333,7 +333,7 @@ def get_all_record_values_of_timeseries_with_potentially_stale_direction(self, t | |
# A query that selects all rows from `covidcast` that have latest issue-date | ||
# for any (time-series-key, time_value) with time_type='day'. | ||
latest_issues_sql = f''' | ||
SELECT | ||
SELECT | ||
`id`, | ||
`source`, | ||
`signal`, | ||
|
@@ -560,68 +560,69 @@ def get_covidcast_meta(self): | |
|
||
sql = 'SELECT `source`, `signal` FROM covidcast WHERE NOT `is_wip` GROUP BY `source`, `signal` ORDER BY `source` ASC, `signal` ASC;' | ||
self._cursor.execute(sql) | ||
for source, signal in [ss for ss in self._cursor]: #NOTE: this obfuscation protects the integrity of the cursor; using the cursor as a generator will cause contention w/ subsequent queries | ||
signals = [ss for ss in self._cursor] #NOTE: this obfuscation protects the integrity of the cursor; using the cursor as a generator will cause contention w/ subsequent queries | ||
for source, signal in signals: | ||
|
||
# calculate the min issues per combination | ||
sql = ''' | ||
SELECT | ||
`time_type`, | ||
`geo_type`, | ||
MIN(`issue`) as `min_issue` | ||
FROM | ||
`covidcast` | ||
WHERE | ||
`source` = %s AND | ||
`signal` = %s | ||
GROUP BY | ||
`time_type`, | ||
`geo_type` | ||
''' | ||
self._cursor.execute(sql, (source, signal)) | ||
min_issue_lookup = {f'{x[0]}:{x[1]}': x[2] for x in self._cursor} | ||
|
||
# calculate statistics for the latest issue entries | ||
sql = ''' | ||
SELECT | ||
t.`source` AS `data_source`, | ||
t.`signal`, | ||
t.`time_type`, | ||
t.`geo_type`, | ||
MIN(t.`time_value`) AS `min_time`, | ||
MAX(t.`time_value`) AS `max_time`, | ||
COUNT(DISTINCT t.`geo_value`) AS `num_locations`, | ||
`source` AS `data_source`, | ||
`signal`, | ||
`time_type`, | ||
`geo_type`, | ||
MIN(`time_value`) AS `min_time`, | ||
MAX(`time_value`) AS `max_time`, | ||
COUNT(DISTINCT `geo_value`) AS `num_locations`, | ||
MIN(`value`) AS `min_value`, | ||
MAX(`value`) AS `max_value`, | ||
ROUND(AVG(`value`),7) AS `mean_value`, | ||
ROUND(STD(`value`),7) AS `stdev_value`, | ||
MAX(`value_updated_timestamp`) AS `last_update`, | ||
MIN(`issue`) as `min_issue`, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i believe this is not going to work as intended, and instead will show the issue date of the least-recently-updated of the issues in a group, which i would argue is meaningless (plz correct me if im wrong). i think it will require a MIN(issue) inside the sub-SELECT and appropriate handling outside of that. also, some non-trivial test cases would be nice, with different issue values. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. true, I will think about it how to fix it. Moreover, we can think of using the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @melange396 can you take a look at the new statements. I split it into two, since I couldn't think of a fast way to handle both cases at the same time. Moreover, I switched the nested statement to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i suppose this will work, but the old SQL query would have done the trick if you added There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes but I think this version should be more efficient using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. after wrestling with and waiting for long-duration queries for the past week, i think it might be wise to test the performance of the one-query vs two-query approaches on the staging server. i can certainly help do this if necessary. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Apologies for being so late to this conversation. The use case for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i will go ahead and try this and other variants out on the staging copy of the database to check performance and runtimes |
||
MAX(`issue`) as `max_issue`, | ||
MIN(`lag`) as `min_lag`, | ||
MAX(`lag`) as `max_lag` | ||
FROM | ||
`covidcast` t | ||
JOIN | ||
( | ||
SELECT | ||
max(`issue`) `max_issue`, | ||
`time_type`, | ||
`time_value`, | ||
`source`, | ||
`signal`, | ||
`geo_type`, | ||
`geo_value` | ||
FROM | ||
`covidcast` | ||
WHERE | ||
`source` = %s AND | ||
`signal` = %s | ||
GROUP BY | ||
`time_value`, | ||
`time_type`, | ||
`geo_type`, | ||
`geo_value` | ||
) x | ||
ON | ||
x.`max_issue` = t.`issue` AND | ||
x.`time_type` = t.`time_type` AND | ||
x.`time_value` = t.`time_value` AND | ||
x.`source` = t.`source` AND | ||
x.`signal` = t.`signal` AND | ||
x.`geo_type` = t.`geo_type` AND | ||
x.`geo_value` = t.`geo_value` | ||
`covidcast` | ||
WHERE | ||
`source` = %s AND | ||
`signal` = %s AND | ||
`is_latest_issue` is TRUE | ||
GROUP BY | ||
t.`time_type`, | ||
t.`geo_type` | ||
`time_type`, | ||
`geo_type` | ||
ORDER BY | ||
t.`time_type` ASC, | ||
t.`geo_type` ASC | ||
`time_type` ASC, | ||
`geo_type` ASC | ||
''' | ||
self._cursor.execute(sql, (source, signal)) | ||
meta.extend(list(dict(zip(self._cursor.column_names,x)) for x in self._cursor)) | ||
|
||
for x in self._cursor: | ||
entry = dict(zip(self._cursor.column_names, x)) | ||
# merge in the min issue | ||
key = f"{entry['time_type']}:{entry['geo_type']}" | ||
entry['min_issue'] = min_issue_lookup.get(key, entry['max_issue']) | ||
meta.append(entry) | ||
return meta | ||
|
||
def update_covidcast_meta_cache(self, metadata): | ||
"""Updates the `covidcast_meta_cache` table.""" | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.