Pylint alerts corrections as part of an intervention experiment 1560 #1568

Open
wants to merge 37 commits into base: dev
Changes from all commits
Commits (37)
93c259e
src\maintenance\covidcast_meta_cache_updater.py superfluous-parens
evidencebp Dec 14, 2024
8282f0b
src\server\_query.py comparison-of-constants
evidencebp Dec 14, 2024
24c0557
src\acquisition\quidel\quidel.py too-many-branches
evidencebp Dec 14, 2024
bc23b26
src\acquisition\cdcp\cdc_upload.py too-many-statements
evidencebp Dec 14, 2024
f217d2f
src\server\_printer.py too-many-return-statements
evidencebp Dec 14, 2024
f20aeea
src\acquisition\covidcast\csv_importer.py too-many-return-statements
evidencebp Dec 14, 2024
d2c4147
src\acquisition\covid_hosp\common\database.py too-many-branches
evidencebp Dec 14, 2024
b25edb7
src\server\endpoints\covidcast.py too-many-statements
evidencebp Dec 14, 2024
1310e15
src\acquisition\wiki\wiki_download.py too-many-branches
evidencebp Dec 15, 2024
fa85df2
src\acquisition\ght\ght_update.py too-many-statements
evidencebp Dec 16, 2024
06b088a
src\acquisition\nidss\taiwan_update.py wildcard-import
evidencebp Dec 16, 2024
a3251c5
src\client\delphi_epidata.py broad-exception-caught
evidencebp Dec 16, 2024
6f627b9
src\acquisition\kcdc\kcdc_update.py broad-exception-caught
evidencebp Dec 16, 2024
8d1c155
src\server\endpoints\sensors.py line-too-long
evidencebp Dec 16, 2024
e229426
src\acquisition\paho\paho_db_update.py line-too-long
evidencebp Dec 16, 2024
3accb56
src\server\_limiter.py line-too-long
evidencebp Dec 16, 2024
241a518
src\server\covidcast_issues_migration\proc_db_backups_pd.py line-too-…
evidencebp Dec 16, 2024
a07e7ca
src\maintenance\remove_outdated_keys.py line-too-long
evidencebp Dec 16, 2024
42612bf
src\acquisition\wiki\wiki_util.py line-too-long
evidencebp Dec 16, 2024
18cef97
src\acquisition\fluview\impute_missing_values.py line-too-long
evidencebp Dec 16, 2024
a3099a5
scripts\report_missing_covidcast_meta.py line-too-long
evidencebp Dec 16, 2024
c28759d
src\server\endpoints\fluview_meta.py line-too-long
evidencebp Dec 16, 2024
dd809dc
src\server\endpoints\covidcast_meta.py line-too-long
evidencebp Dec 16, 2024
15402fd
src\server\utils\__init__.py line-too-long
evidencebp Dec 16, 2024
937012f
src\maintenance\update_last_usage.py line-too-long
evidencebp Dec 16, 2024
cddf0af
src\acquisition\ght\google_health_trends.py line-too-long
evidencebp Dec 16, 2024
f14852c
src\server\endpoints\covid_hosp_facility_lookup.py line-too-long
evidencebp Dec 16, 2024
a921f7b
src\server\_pandas.py line-too-long
evidencebp Dec 16, 2024
99d03a5
src\server\endpoints\covidcast_utils\meta.py line-too-long
evidencebp Dec 16, 2024
106d5c0
src\server\main.py line-too-long
evidencebp Dec 16, 2024
282188e
src\server\_exceptions.py line-too-long
evidencebp Dec 16, 2024
33b5e0b
src\acquisition\twtr\healthtweets.py line-too-long
evidencebp Dec 16, 2024
59a0ba1
src\maintenance\signal_dash_data_generator.py line-too-long
evidencebp Dec 16, 2024
35488ff
src\server\endpoints\covid_hosp_facility.py line-too-long
evidencebp Dec 16, 2024
d24a641
src\server\endpoints\delphi.py line-too-long
evidencebp Dec 16, 2024
a0f9a3b
src\common\covidcast_row.py line-too-long
evidencebp Dec 16, 2024
7f8b16c
src\acquisition\covid_hosp\common\utils.py line-too-long
evidencebp Dec 16, 2024
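The 37 commits above each address a single pylint message in a single file: superfluous-parens, comparison-of-constants, too-many-branches, too-many-statements, too-many-return-statements, wildcard-import, broad-exception-caught, and line-too-long. A minimal sketch of reproducing just these alerts locally, assuming a recent pylint is installed (the broad-exception-caught name replaced broad-except in newer releases; the target path below is only an example):

# Run only the checks touched by this PR against one of the edited files.
# Sketch only; not part of the PR.
from pylint.lint import Run

CHECKS = ",".join([
    "superfluous-parens", "comparison-of-constants", "too-many-branches",
    "too-many-statements", "too-many-return-statements", "wildcard-import",
    "broad-exception-caught", "line-too-long",
])

Run(["--disable=all", f"--enable={CHECKS}", "src/server/_printer.py"], exit=False)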
7 changes: 6 additions & 1 deletion scripts/report_missing_covidcast_meta.py
@@ -38,7 +38,12 @@ def compute_missing_signals() -> List[Tuple[Tuple[str, str], Dict]]:


def gen_row(source: str, signal: str, info: Dict) -> Dict:
is_weighted = signal.startswith('smoothed_w') and not (signal.startswith('smoothed_wa') or signal.startswith('smoothed_we') or signal.startswith('smoothed_wi') or signal.startswith('smoothed_wo') or signal.startswith('smoothed_wu'))
is_weighted = (signal.startswith('smoothed_w')
and not (signal.startswith('smoothed_wa')
or signal.startswith('smoothed_we')
or signal.startswith('smoothed_wi')
or signal.startswith('smoothed_wo')
or signal.startswith('smoothed_wu')))
base_name = signal.replace('smoothed_w', 'smoothed_') if is_weighted else signal
bool_str = lambda x: 'TRUE' if x else 'FALSE'

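On the wrapped condition above: str.startswith also accepts a tuple of prefixes, so an equivalent form stays within the line limit without the chained or-clauses (an alternative sketch, not the change made in this commit):

# Equivalent to the multi-line boolean above, using the tuple form of startswith.
NON_WEIGHTED_PREFIXES = ('smoothed_wa', 'smoothed_we', 'smoothed_wi',
                         'smoothed_wo', 'smoothed_wu')
is_weighted = (signal.startswith('smoothed_w')
               and not signal.startswith(NON_WEIGHTED_PREFIXES))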
137 changes: 69 additions & 68 deletions src/acquisition/cdcp/cdc_upload.py
@@ -161,80 +161,81 @@
`total` = %s
"""

# insert (or update) table `cdc`
def insert_cdc(cur, date, page, state, num):
cur.execute(sql_cdc, (date, page, state, num, num))

# insert (or update) table `cdc_meta`
def insert_cdc_meta(cur, date, state, total):
cur.execute(sql_cdc_meta, (date, date, state, total, total))

# loop over rows until the header row is found
def find_header(reader):
for row in reader:
if len(row) > 0 and row[0] == "Date":
return True
return False

# parse csv files for `cdc` and `cdc_meta`
def parse_csv(cur, meta):
def handler(cur, reader):
if not find_header(reader):
raise Exception("header not found")
count = 0
cols = 3 if meta else 4
for row in reader:
if len(row) != cols:
continue
if meta:
(a, c, d) = row
else:
(a, b, c, d) = row
c = c[:-16]
if c not in STATES:
continue
a = datetime.strptime(a, "%b %d, %Y").strftime("%Y-%m-%d")
c = STATES[c]
d = int(d)
if meta:
insert_cdc_meta(cur, a, c, d)
else:
insert_cdc(cur, a, b, c, d)
count += 1
return count

return handler


# recursively open zip files
def parse_zip(cur, zf, level=1):
for name in zf.namelist():
prefix = " " * level
print(prefix, name)
if name[-4:] == ".zip":
with zf.open(name) as temp:
with ZipFile(io.BytesIO(temp.read())) as zf2:
parse_zip(cur, zf2, level + 1)
elif name[-4:] == ".csv":
handler = None
if "Flu Pages by Region" in name:
handler = parse_csv(cur, False)
elif "Regions for all CDC" in name:
handler = parse_csv(cur, True)
else:
print(prefix, " (skipped)")
if handler is not None:
with zf.open(name) as temp:
count = handler(cur, csv.reader(io.StringIO(str(temp.read(), "utf-8"))))
print(prefix, f" {int(count)} rows")
else:
print(prefix, " (ignored)")

def upload(test_mode):
# connect
u, p = secrets.db.epi
cnx = mysql.connector.connect(user=u, password=p, database="epidata")
cur = cnx.cursor()

# insert (or update) table `cdc`
def insert_cdc(date, page, state, num):
cur.execute(sql_cdc, (date, page, state, num, num))

# insert (or update) table `cdc_meta`
def insert_cdc_meta(date, state, total):
cur.execute(sql_cdc_meta, (date, date, state, total, total))

# loop over rows until the header row is found
def find_header(reader):
for row in reader:
if len(row) > 0 and row[0] == "Date":
return True
return False

# parse csv files for `cdc` and `cdc_meta`
def parse_csv(meta):
def handler(reader):
if not find_header(reader):
raise Exception("header not found")
count = 0
cols = 3 if meta else 4
for row in reader:
if len(row) != cols:
continue
if meta:
(a, c, d) = row
else:
(a, b, c, d) = row
c = c[:-16]
if c not in STATES:
continue
a = datetime.strptime(a, "%b %d, %Y").strftime("%Y-%m-%d")
c = STATES[c]
d = int(d)
if meta:
insert_cdc_meta(a, c, d)
else:
insert_cdc(a, b, c, d)
count += 1
return count

return handler

# recursively open zip files
def parse_zip(zf, level=1):
for name in zf.namelist():
prefix = " " * level
print(prefix, name)
if name[-4:] == ".zip":
with zf.open(name) as temp:
with ZipFile(io.BytesIO(temp.read())) as zf2:
parse_zip(zf2, level + 1)
elif name[-4:] == ".csv":
handler = None
if "Flu Pages by Region" in name:
handler = parse_csv(False)
elif "Regions for all CDC" in name:
handler = parse_csv(True)
else:
print(prefix, " (skipped)")
if handler is not None:
with zf.open(name) as temp:
count = handler(csv.reader(io.StringIO(str(temp.read(), "utf-8"))))
print(prefix, f" {int(count)} rows")
else:
print(prefix, " (ignored)")

# find, parse, and move zip files
zip_files = glob.glob("/common/cdc_stage/*.zip")
@@ -244,7 +245,7 @@ def parse_zip(zf, level=1):
print("parsing...")
for f in zip_files:
with ZipFile(f) as zf:
parse_zip(zf)
parse_zip(cur, zf)
print("moving...")
for f in zip_files:
src = f
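The cdc_upload.py commit resolves too-many-statements on upload() by hoisting its nested helpers to module level and passing the database cursor explicitly. Condensed, upload() is then left with roughly this shape (a sketch assembled from the diff above; the subsequent move/commit steps are unchanged and elided here):

def upload(test_mode):
    # connect
    u, p = secrets.db.epi
    cnx = mysql.connector.connect(user=u, password=p, database="epidata")
    cur = cnx.cursor()

    # find and parse zip files; the module-level parse_zip now receives the cursor
    zip_files = glob.glob("/common/cdc_stage/*.zip")
    print("parsing...")
    for f in zip_files:
        with ZipFile(f) as zf:
            parse_zip(cur, zf)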
73 changes: 42 additions & 31 deletions src/acquisition/covid_hosp/common/database.py
@@ -192,37 +192,14 @@ def nan_safe_dtype(dtype, value):
num_values = len(dataframe.index)
if logger:
logger.info('updating values', count=num_values)
n = 0
rows_affected = 0
many_values = []
with self.new_cursor() as cursor:
for index, row in dataframe.iterrows():
values = []
for c in dataframe_columns_and_types:
values.append(nan_safe_dtype(c.dtype, row[c.csv_name]))
many_values.append(id_and_publication_date +
tuple(values) +
tuple(i.csv_name for i in self.additional_fields))
n += 1
# insert in batches because one at a time is slow and all at once makes
# the connection drop :(
if n % 5_000 == 0:
try:
cursor.executemany(sql, many_values)
rows_affected += cursor.rowcount
many_values = []
except Exception as e:
if logger:
logger.error('error on insert', publ_date=publication_date, in_lines=(n-5_000, n), index=index, values=values, exception=e)
raise e
# insert final batch
if many_values:
cursor.executemany(sql, many_values)
rows_affected += cursor.rowcount
if logger:
# NOTE: REPLACE INTO marks 2 rows affected for a "replace" (one for a delete and one for a re-insert)
# which allows us to count rows which were updated
logger.info('rows affected', total=rows_affected, updated=rows_affected-num_values)
self._process_rows(publication_date
, dataframe
, logger
, dataframe_columns_and_types
, nan_safe_dtype
, sql
, id_and_publication_date
, num_values)

# deal with non/seldomly updated columns used like a fk table (if this database needs it)
if hasattr(self, 'AGGREGATE_KEY_COLS'):
@@ -261,6 +238,40 @@ def nan_safe_dtype(dtype, value):
with self.new_cursor() as cur:
cur.executemany(ak_insert_sql, ak_data)

def _process_rows(self, publication_date, dataframe, logger, dataframe_columns_and_types, nan_safe_dtype, sql
, id_and_publication_date, num_values):
n = 0
rows_affected = 0
many_values = []
with self.new_cursor() as cursor:
for index, row in dataframe.iterrows():
values = []
for c in dataframe_columns_and_types:
values.append(nan_safe_dtype(c.dtype, row[c.csv_name]))
many_values.append(id_and_publication_date +
tuple(values) +
tuple(i.csv_name for i in self.additional_fields))
n += 1
# insert in batches because one at a time is slow and all at once makes
# the connection drop :(
if n % 5_000 == 0:
try:
cursor.executemany(sql, many_values)
rows_affected += cursor.rowcount
many_values = []
except Exception as e:
if logger:
logger.error('error on insert', publ_date=publication_date, in_lines=(n-5_000, n), index=index, values=values, exception=e)
raise e
# insert final batch
if many_values:
cursor.executemany(sql, many_values)
rows_affected += cursor.rowcount
if logger:
# NOTE: REPLACE INTO marks 2 rows affected for a "replace" (one for a delete and one for a re-insert)
# which allows us to count rows which were updated
logger.info('rows affected', total=rows_affected, updated=rows_affected-num_values)


def get_max_issue(self, logger=False):
"""Fetch the most recent issue.
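The database.py commit only relocates the batching loop into a _process_rows helper; the insert strategy itself is unchanged. Reduced to its essentials, the pattern looks like this (a standalone sketch assuming a DB-API cursor and pre-built row tuples, not code from this repo):

def batched_executemany(cursor, sql, rows, batch_size=5_000):
    """Insert in fixed-size batches: row-by-row is slow, and one huge
    executemany can drop the connection."""
    pending, affected = [], 0
    for row in rows:
        pending.append(row)
        if len(pending) == batch_size:
            cursor.executemany(sql, pending)
            affected += cursor.rowcount
            pending = []
    if pending:  # final partial batch
        cursor.executemany(sql, pending)
        affected += cursor.rowcount
    return affected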
8 changes: 6 additions & 2 deletions src/acquisition/covid_hosp/common/utils.py
@@ -126,7 +126,10 @@ def issues_to_fetch(metadata, newer_than, older_than, logger=False):
if logger:
if n_beyond > 0:
logger.info("issues available beyond selection", on_or_newer=older_than, count=n_beyond)
logger.info("issues selected", newer_than=str(newer_than), older_than=str(older_than), count=n_selected)
logger.info("issues selected"
, newer_than=str(newer_than)
, older_than=str(older_than)
, count=n_selected)
return daily_issues

@staticmethod
@@ -239,7 +242,8 @@ def update_dataset(database, network, newer_than=None, older_than=None):
all_metadata
))
tot_revs = sum(len(revisions) for revisions in daily_issues.values())
logger.info(f"{len(daily_issues)} issues checked w/ {tot_revs} revisions, resulting in {len(datasets)} datasets.")
logger.info(f"{len(daily_issues)} issues checked w/ {tot_revs} revisions"
+ f", resulting in {len(datasets)} datasets.")
if not datasets:
logger.info("nothing to do, exiting")
return False
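One note on the wrapped log message in update_dataset: adjacent string literals inside a call concatenate implicitly in Python, so the explicit + between the two f-strings is optional (an equivalent alternative, not the form used in the PR):

logger.info(f"{len(daily_issues)} issues checked w/ {tot_revs} revisions"
            f", resulting in {len(datasets)} datasets.")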