From 620b647cbe0641d64f621ee07b7295ce3e995a3e Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Thu, 19 Sep 2024 13:09:18 -0400
Subject: [PATCH 01/13] first implementation in progress

---
 .../delphi_quidel_covidtest/patch.py          | 82 +++++++++++++++++++
 .../delphi_quidel_covidtest/pull.py           | 34 ++++----
 2 files changed, 101 insertions(+), 15 deletions(-)
 create mode 100644 quidel_covidtest/delphi_quidel_covidtest/patch.py

diff --git a/quidel_covidtest/delphi_quidel_covidtest/patch.py b/quidel_covidtest/delphi_quidel_covidtest/patch.py
new file mode 100644
index 000000000..cf9e778b3
--- /dev/null
+++ b/quidel_covidtest/delphi_quidel_covidtest/patch.py
@@ -0,0 +1,82 @@
+"""
+This module is used for patching data in the delphi_quidel_covidtest package.
+
+To use this module, you need to specify the range of issue dates in params.json, like so:
+
+{
+  "common": {
+    ...
+  },
+  "validation": {
+    ...
+  },
+  "patch": {
+    "patch_dir": "/Users/minhkhuele/Desktop/delphi/covidcast-indicators/quidel_covidtest/AprilPatch",
+    "start_issue": "2024-04-20",
+    "end_issue": "2024-04-21"
+  }
+}
+
+It will generate data for that range of issue dates, and store them in batch issue format:
+[name-of-patch]/issue_[issue-date]/quidel_covidtest/actual_data_file.csv
+"""
+import time
+from datetime import datetime, timedelta
+from os import makedirs
+
+from delphi_utils import get_structured_logger, read_params
+
+from .pull import preprocess_new_data
+from .run import run_module
+
+def grab_source(params, logger):
+    start_time = time.time()
+    cache_dir = params['input_cache_dir']
+    df, _ = preprocess_new_data(params["patch"]["start_issue"], params["patch"]["end_issue"], params, params["test_mode"], logger)
+    end_issue = params["patch"]["end_issue"].strip("-")
+    df.to_csv(f"{cache_dir}/pulled_until_{end_issue}.csv", index=False)
+    logger.info("Completed cache file update",
+                end_date = end_issue.strftime('%Y-%m-%d'),
+                elapsed_time_in_seconds = round(time.time() - start_time, 2))
+
+
+
+def patch():
+    """
+    Run the quidel_covidtest indicator for a range of issue dates.
+ + The range of issue dates is specified in params.json using the following keys: + - "patch": Only used for patching data + - "start_date": str, YYYY-MM-DD format, first issue date + - "end_date": str, YYYY-MM-DD format, last issue date + - "patch_dir": str, directory to write all issues output + """ + params = read_params() + logger = get_structured_logger("delphi_quidel_covidtest.patch", filename=params["common"]["log_filename"]) + + start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d") + end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d") + + logger.info( + "Starting patching", + patch_directory=params["patch"]["patch_dir"], + start_issue=start_issue.strftime("%Y-%m-%d"), + end_issue=end_issue.strftime("%Y-%m-%d"), + ) + + makedirs(params["patch"]["patch_dir"], exist_ok=True) + + current_issue = start_issue + + logger.info("Running issue", issue_date=current_issue.strftime("%Y-%m-%d")) + + current_issue_yyyymmdd = current_issue.strftime("%Y%m%d") + current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/quidel_covidtest""" + makedirs(f"{current_issue_dir}", exist_ok=True) + params["common"]["export_dir"] = f"""{current_issue_dir}""" + + run_module(params, logger) + + +if __name__ == "__main__": + patch() diff --git a/quidel_covidtest/delphi_quidel_covidtest/pull.py b/quidel_covidtest/delphi_quidel_covidtest/pull.py index 560f89456..d07cdf4f8 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/pull.py +++ b/quidel_covidtest/delphi_quidel_covidtest/pull.py @@ -295,21 +295,25 @@ def pull_quidel_covidtest(params, logger): else: pull_end_date = datetime.strptime(params["pull_end_date"], '%Y-%m-%d') - # Pull data from the file at 5 digit zipcode level - # Use _end_date to check the most recent date that we received data - df, _end_date = preprocess_new_data( - pull_start_date, pull_end_date, params, test_mode, logger) - - # Utilize previously stored data - if previous_df is not None: - df = pd.concat( - [previous_df, df] - ).groupby( - ["timestamp", "zip"] - ).sum( - numeric_only=True - ).reset_index( - ) + if not params.get("custom_run", False): + # Pull data from the file at 5 digit zipcode level + # Use _end_date to check the most recent date that we received data + df, _end_date = preprocess_new_data( + pull_start_date, pull_end_date, params, test_mode, logger) + + # Utilize previously stored data + if previous_df is not None: + df = pd.concat( + [previous_df, df] + ).groupby( + ["timestamp", "zip"] + ).sum( + numeric_only=True + ).reset_index( + ) + else: + df = previous_df[previous_df["timestamp"] == params["pull_start_date"]] + _end_date = datetime.strptime(params["params"]["end_issue"], '%Y-%m-%d') return df, _end_date def check_export_end_date(input_export_end_date, _end_date, From c7b117da38f611f15a9a0149c5b2a4671f46c328 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Mon, 23 Sep 2024 10:15:23 -0400 Subject: [PATCH 02/13] in progress --- .../delphi_quidel_covidtest/patch.py | 52 +++++++++++++------ .../delphi_quidel_covidtest/pull.py | 4 +- .../delphi_quidel_covidtest/run.py | 12 +++-- 3 files changed, 46 insertions(+), 22 deletions(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/patch.py b/quidel_covidtest/delphi_quidel_covidtest/patch.py index cf9e778b3..75126dd4a 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/patch.py +++ b/quidel_covidtest/delphi_quidel_covidtest/patch.py @@ -23,6 +23,7 @@ import time from datetime import datetime, timedelta from os import makedirs +from pathlib import 
Path from delphi_utils import get_structured_logger, read_params @@ -30,17 +31,35 @@ from .run import run_module def grab_source(params, logger): + """ + Grab source data for patch range ahead of time. + + Parameters + ---------- + params + logger + + Returns + ------- + + """ + start_issue = params["patch"]["start_issue"] + end_issue = params["patch"]["end_issue"] + end_date = datetime.strptime(end_issue, "%Y-%m-%d") + + cache_dir = params["indicator"]['input_cache_dir'] + filename = f"{cache_dir}/pulled_until_{(end_date + timedelta(days=1)).strftime('%Y%m%d')}.csv" + if Path(filename).is_file(): + return start_time = time.time() - cache_dir = params['input_cache_dir'] - df, _ = preprocess_new_data(params["patch"]["start_issue"], params["patch"]["end_issue"], params, params["test_mode"], logger) - end_issue = params["patch"]["end_issue"].strip("-") - df.to_csv(f"{cache_dir}/pulled_until_{end_issue}.csv", index=False) + start_date = datetime.strptime(start_issue, "%Y-%m-%d") + df, _ = preprocess_new_data(start_date, end_date, params["indicator"], params["indicator"]["test_mode"], logger) + df.to_csv(filename, index=False) logger.info("Completed cache file update", - end_date = end_issue.strftime('%Y-%m-%d'), + start_issue=start_issue, + end_issue = end_issue, elapsed_time_in_seconds = round(time.time() - start_time, 2)) - - def patch(): """ Run the quidel_covidtest indicator for a range of issue dates. @@ -63,19 +82,22 @@ def patch(): start_issue=start_issue.strftime("%Y-%m-%d"), end_issue=end_issue.strftime("%Y-%m-%d"), ) - + params["common"]["custom_run"] = True makedirs(params["patch"]["patch_dir"], exist_ok=True) - + grab_source(params, logger) current_issue = start_issue - logger.info("Running issue", issue_date=current_issue.strftime("%Y-%m-%d")) + while current_issue <= end_issue: + logger.info("Running issue", issue_date=current_issue.strftime("%Y-%m-%d")) - current_issue_yyyymmdd = current_issue.strftime("%Y%m%d") - current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/quidel_covidtest""" - makedirs(f"{current_issue_dir}", exist_ok=True) - params["common"]["export_dir"] = f"""{current_issue_dir}""" + current_issue_yyyymmdd = current_issue.strftime("%Y%m%d") + current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/quidel_covidtest""" + makedirs(f"{current_issue_dir}", exist_ok=True) + params["common"]["export_dir"] = f"""{current_issue_dir}""" + params["indicator"]["pull_start_date"] = current_issue.strftime("%Y-%m-%d") - run_module(params, logger) + run_module(params, logger) + current_issue += timedelta(days=1) if __name__ == "__main__": diff --git a/quidel_covidtest/delphi_quidel_covidtest/pull.py b/quidel_covidtest/delphi_quidel_covidtest/pull.py index d07cdf4f8..fea327975 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/pull.py +++ b/quidel_covidtest/delphi_quidel_covidtest/pull.py @@ -313,7 +313,7 @@ def pull_quidel_covidtest(params, logger): ) else: df = previous_df[previous_df["timestamp"] == params["pull_start_date"]] - _end_date = datetime.strptime(params["params"]["end_issue"], '%Y-%m-%d') + _end_date = pull_start_date return df, _end_date def check_export_end_date(input_export_end_date, _end_date, @@ -347,7 +347,7 @@ def check_export_end_date(input_export_end_date, _end_date, def check_export_start_date(export_start_date, export_end_date, export_day_range): """ - Ensure that the starte date, end date, and day range are mutually consistent. 
+ Ensure that the start date, end date, and day range are mutually consistent. Parameters: export_start_date: str diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index e6974b6aa..b010df8b7 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -92,7 +92,7 @@ def generate_and_export_for_parent_geo(geo_groups, geo_data, res_key, smooth, de remove_null_samples=True) # for parent geo, remove null sample size return dates -def run_module(params: Dict[str, Any]): +def run_module(params: Dict[str, Any], logger=None): """Run the quidel_covidtest indicator. The `params` argument is expected to have the following structure: @@ -117,9 +117,10 @@ def run_module(params: Dict[str, Any]): - "test_mode": bool, whether we are running in test mode """ start_time = time.time() - logger = get_structured_logger( - __name__, filename=params["common"].get("log_filename"), - log_exceptions=params["common"].get("log_exceptions", True)) + if logger is None: + logger = get_structured_logger( + __name__, filename=params["common"].get("log_filename"), + log_exceptions=params["common"].get("log_exceptions", True)) stats = [] # Log at program exit in case of an exception, otherwise after successful completion atexit.register(log_exit, start_time, stats, logger) @@ -224,7 +225,8 @@ def run_module(params: Dict[str, Any]): # Export the cache file if the pipeline runs successfully. # Otherwise, don't update the cache file - update_cache_file(df, _end_date, cache_dir, logger) + if not params["common"].get("custom_run", False): + update_cache_file(df, _end_date, cache_dir, logger) # Log stats now instead of at program exit atexit.unregister(log_exit) log_exit(start_time, stats, logger) From 507c2b26ad723f11d9c126156a626f36d2d32a80 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 26 Sep 2024 12:44:06 -0400 Subject: [PATCH 03/13] in progress --- .../delphi_quidel_covidtest/constants.py | 5 ++ .../delphi_quidel_covidtest/patch.py | 44 +++-------- .../delphi_quidel_covidtest/pull.py | 79 ++++++++----------- quidel_covidtest/tests/test_patch.py | 0 4 files changed, 49 insertions(+), 79 deletions(-) create mode 100644 quidel_covidtest/tests/test_patch.py diff --git a/quidel_covidtest/delphi_quidel_covidtest/constants.py b/quidel_covidtest/delphi_quidel_covidtest/constants.py index 8e4d37cb2..0bc2cff98 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/constants.py +++ b/quidel_covidtest/delphi_quidel_covidtest/constants.py @@ -1,4 +1,6 @@ """Registry for constants.""" +from datetime import datetime + # global constants MIN_OBS = 50 # minimum number of observations in order to compute a proportion. POOL_DAYS = 7 # number of days in the past (including today) to pool over @@ -49,3 +51,6 @@ "age_65plus", "age_0_17", ] + +FULL_BKFILL_START_DATE = datetime(2020, 5, 26) + diff --git a/quidel_covidtest/delphi_quidel_covidtest/patch.py b/quidel_covidtest/delphi_quidel_covidtest/patch.py index 75126dd4a..96c13dc95 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/patch.py +++ b/quidel_covidtest/delphi_quidel_covidtest/patch.py @@ -27,38 +27,8 @@ from delphi_utils import get_structured_logger, read_params -from .pull import preprocess_new_data from .run import run_module - -def grab_source(params, logger): - """ - Grab source data for patch range ahead of time. 
- - Parameters - ---------- - params - logger - - Returns - ------- - - """ - start_issue = params["patch"]["start_issue"] - end_issue = params["patch"]["end_issue"] - end_date = datetime.strptime(end_issue, "%Y-%m-%d") - - cache_dir = params["indicator"]['input_cache_dir'] - filename = f"{cache_dir}/pulled_until_{(end_date + timedelta(days=1)).strftime('%Y%m%d')}.csv" - if Path(filename).is_file(): - return - start_time = time.time() - start_date = datetime.strptime(start_issue, "%Y-%m-%d") - df, _ = preprocess_new_data(start_date, end_date, params["indicator"], params["indicator"]["test_mode"], logger) - df.to_csv(filename, index=False) - logger.info("Completed cache file update", - start_issue=start_issue, - end_issue = end_issue, - elapsed_time_in_seconds = round(time.time() - start_time, 2)) +from .constants import END_FROM_TODAY_MINUS def patch(): """ @@ -82,11 +52,13 @@ def patch(): start_issue=start_issue.strftime("%Y-%m-%d"), end_issue=end_issue.strftime("%Y-%m-%d"), ) - params["common"]["custom_run"] = True makedirs(params["patch"]["patch_dir"], exist_ok=True) - grab_source(params, logger) + export_day_range = params["indicator"]["export_day_range"] + current_issue = start_issue + export_day_range += END_FROM_TODAY_MINUS + while current_issue <= end_issue: logger.info("Running issue", issue_date=current_issue.strftime("%Y-%m-%d")) @@ -94,11 +66,13 @@ def patch(): current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/quidel_covidtest""" makedirs(f"{current_issue_dir}", exist_ok=True) params["common"]["export_dir"] = f"""{current_issue_dir}""" - params["indicator"]["pull_start_date"] = current_issue.strftime("%Y-%m-%d") + calculated_start_date = current_issue - timedelta(export_day_range) + calculated_end_date = current_issue + params["indicator"]["pull_start_date"] = calculated_start_date.strftime("%Y-%m-%d") + params["indicator"]["pull_end_date"] = calculated_end_date.strftime("%Y-%m-%d") run_module(params, logger) current_issue += timedelta(days=1) - if __name__ == "__main__": patch() diff --git a/quidel_covidtest/delphi_quidel_covidtest/pull.py b/quidel_covidtest/delphi_quidel_covidtest/pull.py index fea327975..4ea99a139 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/pull.py +++ b/quidel_covidtest/delphi_quidel_covidtest/pull.py @@ -9,15 +9,17 @@ import pandas as pd import numpy as np -from .constants import AGE_GROUPS +from .constants import AGE_GROUPS, FULL_BKFILL_START_DATE -def get_from_s3(start_date, end_date, bucket, logger): +def get_from_s3(params, start_date, end_date, logger): """ Get raw data from aws s3 bucket. 
Args: + params: dict + read from params.json start_date: datetime.datetime pull data from file tagged with date on/after the start date end_date: datetime.datetime @@ -30,6 +32,15 @@ def get_from_s3(start_date, end_date, bucket, logger): df: pd.DataFrame time_flag: datetime.datetime """ + # connect aws s3 bucket + aws_access_key_id = params["aws_credentials"]["aws_access_key_id"] + aws_secret_access_key = params["aws_credentials"]["aws_secret_access_key"] + bucket_name = params["bucket_name"] + + s3 = boto3.resource('s3', aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key) + bucket = s3.Bucket(bucket_name) + time_flag = None selected_columns = ['SofiaSerNum', 'TestDate', 'Facility', 'City', 'State', 'Zip', 'PatientAge', 'Result1', @@ -118,7 +129,7 @@ def fix_date(df, logger): df["timestamp"].values[mask] = df["StorageDate"].values[mask] return df -def preprocess_new_data(start_date, end_date, params, test_mode, logger): +def preprocess_new_data(start_date, end_date, params, logger): """ Pull and pre-process Quidel Covid Test data. @@ -133,8 +144,6 @@ def preprocess_new_data(start_date, end_date, params, test_mode, logger): pull data from file tagged with date on/before the end date params: dict read from params.json - test_mode: bool - pull raw data from s3 or not logger: logging.Logger The structured logger. output: @@ -142,23 +151,8 @@ def preprocess_new_data(start_date, end_date, params, test_mode, logger): time_flag: datetime.date: the actual pull end date on which we successfully pull the data """ - if test_mode: - test_data_dir = "./test_data/test_data.csv" - df, time_flag = pd.read_csv( - test_data_dir, - parse_dates=["StorageDate", "TestDate"] - ), datetime(2020, 8, 17) - else: - # connect aws s3 bucket - aws_access_key_id = params["aws_credentials"]["aws_access_key_id"] - aws_secret_access_key = params["aws_credentials"]["aws_secret_access_key"] - bucket_name = params["bucket_name"] - - s3 = boto3.resource('s3', aws_access_key_id=aws_access_key_id, - aws_secret_access_key=aws_secret_access_key) - bucket = s3.Bucket(bucket_name) - # Get new data from s3 - df, time_flag = get_from_s3(start_date, end_date, bucket, logger) + # Get new data from s3 + df, time_flag = get_from_s3(params, start_date, end_date, logger) # No new data can be pulled if time_flag is None: @@ -283,37 +277,34 @@ def pull_quidel_covidtest(params, logger): """ cache_dir = params["input_cache_dir"] - test_mode = params["test_mode"] - # pull new data only that has not been ingested previous_df, pull_start_date = check_intermediate_file( cache_dir, datetime.strptime(params["pull_start_date"], '%Y-%m-%d')) + if params["pull_start_date"] != "": + pull_start_date = datetime.strptime(params["pull_start_date"], '%Y-%m-%d') + if params["pull_end_date"] == "": pull_end_date = datetime.today() else: pull_end_date = datetime.strptime(params["pull_end_date"], '%Y-%m-%d') - if not params.get("custom_run", False): - # Pull data from the file at 5 digit zipcode level - # Use _end_date to check the most recent date that we received data - df, _end_date = preprocess_new_data( - pull_start_date, pull_end_date, params, test_mode, logger) - - # Utilize previously stored data - if previous_df is not None: - df = pd.concat( - [previous_df, df] - ).groupby( - ["timestamp", "zip"] - ).sum( - numeric_only=True - ).reset_index( - ) - else: - df = previous_df[previous_df["timestamp"] == params["pull_start_date"]] - _end_date = pull_start_date + # Pull data from the file at 5 digit zipcode level + # Use 
_end_date to check the most recent date that we received data + df, _end_date = preprocess_new_data( + pull_start_date, pull_end_date, params, logger) + + # Utilize previously stored data + if previous_df is not None: + df = pd.concat( + [previous_df, df] + ).groupby( + ["timestamp", "zip"] + ).sum( + numeric_only=True + ).reset_index( + ) return df, _end_date def check_export_end_date(input_export_end_date, _end_date, @@ -363,7 +354,7 @@ def check_export_start_date(export_start_date, export_end_date, """ if export_start_date == "": - export_start_date = datetime(2020, 5, 26) + export_start_date = FULL_BKFILL_START_DATE else: export_start_date = datetime.strptime(export_start_date, '%Y-%m-%d') # Only export data from -50 days to -5 days diff --git a/quidel_covidtest/tests/test_patch.py b/quidel_covidtest/tests/test_patch.py new file mode 100644 index 000000000..e69de29bb From 71a5f6c0507d1f7d3f1524ea4da1c84aecbaa56e Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 26 Sep 2024 12:46:26 -0400 Subject: [PATCH 04/13] making test more robust in progress --- quidel_covidtest/tests/conftest.py | 59 +++++++++++++++++++++++----- quidel_covidtest/tests/test_patch.py | 8 ++++ quidel_covidtest/tests/test_pull.py | 3 +- quidel_covidtest/tests/test_run.py | 51 +++++++++--------------- 4 files changed, 77 insertions(+), 44 deletions(-) diff --git a/quidel_covidtest/tests/conftest.py b/quidel_covidtest/tests/conftest.py index c5ebcf630..67d4ccbee 100644 --- a/quidel_covidtest/tests/conftest.py +++ b/quidel_covidtest/tests/conftest.py @@ -1,16 +1,57 @@ # -*- coding: utf-8 -*- - +from datetime import datetime from os.path import join import os +from pathlib import Path + +import mock +import pandas as pd import pytest +from mock.mock import patch + +import delphi_quidel_covidtest.run + +TEST_DIR = Path(__file__).parent +SOURCE_DIR = Path(__file__).parent.parent +@pytest.fixture(scope='session') +def params(): + PARAMS = { + "common": { + "export_dir": f"{TEST_DIR}/receiving" + }, + "indicator": { + "static_file_dir": f"{SOURCE_DIR}/static", + "input_cache_dir": f"{TEST_DIR}/cache", + "backfill_dir": f"{TEST_DIR}/backfill", + "backfill_merge_day": 0, + "export_start_date": "2020-06-30", + "export_end_date": "", + "pull_start_date": "2020-07-09", + "pull_end_date":"", + "export_day_range":40, + "aws_credentials": { + "aws_access_key_id": "", + "aws_secret_access_key": "" + }, + "bucket_name": "", + "wip_signal": "", + "test_mode": True + } + } + return PARAMS +@pytest.fixture(scope="session", autouse=True) +def mock_get_from_s3(): + with patch("delphi_quidel_covidtest.pull.get_from_s3") as m: + test_data_dir = "./test_data/test_data.csv" + time_flag = datetime(2020, 8, 17) + df = pd.read_csv( + test_data_dir, + parse_dates=["StorageDate", "TestDate"] + ) + m.return_value = df, time_flag + yield m @pytest.fixture(scope="session") -def clean_receiving_dir(): - # Clean receiving directory - for fname in os.listdir("receiving"): - if ".csv" in fname: - os.remove(join("receiving", fname)) - for fname in os.listdir("cache"): - if ".csv" in fname: - os.remove(join("cache", fname)) +def run_as_module(params, mock_get_from_s3): + delphi_quidel_covidtest.run.run_module(params) diff --git a/quidel_covidtest/tests/test_patch.py b/quidel_covidtest/tests/test_patch.py index e69de29bb..e5f72b90c 100644 --- a/quidel_covidtest/tests/test_patch.py +++ b/quidel_covidtest/tests/test_patch.py @@ -0,0 +1,8 @@ + +class TestPatch: + def generate_expected_dates(self, params,): + date_dict = { + "2020-07-19": [""] + } + def 
test_patch(self): + pass \ No newline at end of file diff --git a/quidel_covidtest/tests/test_pull.py b/quidel_covidtest/tests/test_pull.py index a3436392b..8177df1a5 100644 --- a/quidel_covidtest/tests/test_pull.py +++ b/quidel_covidtest/tests/test_pull.py @@ -38,8 +38,7 @@ def test_fix_date(self): datetime(2020, 6, 11), datetime(2020, 7, 2)]) class TestingPullData: - def test_pull_quidel_covidtest(self): - + def test_pull_quidel_covidtest(self, mock_get_from_s3): df, _ = pull_quidel_covidtest({ "static_file_dir": "../static", "input_cache_dir": "./cache", diff --git a/quidel_covidtest/tests/test_run.py b/quidel_covidtest/tests/test_run.py index 4e15bea91..cc054dfc2 100644 --- a/quidel_covidtest/tests/test_run.py +++ b/quidel_covidtest/tests/test_run.py @@ -1,49 +1,25 @@ """Tests for running the quidel covidtest indicator.""" +from itertools import product +import os from os import listdir from os.path import join +from pathlib import Path import pandas as pd import numpy as np from delphi_utils import add_prefix from delphi_quidel_covidtest.constants import PARENT_GEO_RESOLUTIONS, NONPARENT_GEO_RESOLUTIONS, \ - SENSORS -from delphi_quidel_covidtest.run import run_module - + SENSORS, AGE_GROUPS class TestRun: """Tests for run_module().""" - PARAMS = { - "common": { - "export_dir": "./receiving" - }, - "indicator": { - "static_file_dir": "../static", - "input_cache_dir": "./cache", - "backfill_dir": "./backfill", - "backfill_merge_day": 0, - "export_start_date": "2020-06-30", - "export_end_date": "", - "pull_start_date": "2020-07-09", - "pull_end_date":"", - "export_day_range":40, - "aws_credentials": { - "aws_access_key_id": "", - "aws_secret_access_key": "" - }, - "bucket_name": "", - "wip_signal": "", - "test_mode": True - } - } - - def test_output_files(self, clean_receiving_dir): + def test_output_files(self, run_as_module, params): """Tests that the proper files are output.""" # Test output exists - run_module(self.PARAMS) - csv_files = [i for i in listdir("receiving") if i.endswith(".csv")] + csv_files = [i for i in listdir(params["common"]["export_dir"]) if i.endswith(".csv")] dates = [ "20200718", @@ -52,16 +28,20 @@ def test_output_files(self, clean_receiving_dir): ] geos = PARENT_GEO_RESOLUTIONS + NONPARENT_GEO_RESOLUTIONS sensors = add_prefix(SENSORS, - wip_signal=self.PARAMS["indicator"]["wip_signal"], + wip_signal=params["indicator"]["wip_signal"], prefix="wip_") + full_sensor = [f"{sensor}_{age}" for sensor, age in product(sensors, AGE_GROUPS)] expected_files = [] for date in dates: - for geo in geos: + for geo in PARENT_GEO_RESOLUTIONS: for sensor in sensors: expected_files += [date + "_" + geo + "_" + sensor + ".csv"] + for geo in NONPARENT_GEO_RESOLUTIONS: + for sensor in full_sensor: + expected_files += [date + "_" + geo + "_" + sensor + ".csv"] - assert set(expected_files).issubset(set(csv_files)) + assert set(expected_files) == (set(csv_files)) assert '20200721_state_covid_ag_raw_pct_positive.csv' not in csv_files assert '20200722_state_covid_ag_raw_pct_positive.csv' not in csv_files @@ -105,3 +85,8 @@ def test_output_files(self, clean_receiving_dir): if ".csv" in fname: flag = 1 assert flag is not None + + for files in Path(params["common"]["export_dir"]).glob("*.csv"): + os.remove(files) + for files in Path(params["indicator"]["input_cache_dir"]).glob("*.csv"): + os.remove(files) From 9c4f5ab91bdda6d09fd9e09db3751c3c0b54c351 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Tue, 1 Oct 2024 18:36:25 -0400 Subject: [PATCH 05/13] using constants instead of params --- 
quidel_covidtest/delphi_quidel_covidtest/pull.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/pull.py b/quidel_covidtest/delphi_quidel_covidtest/pull.py index 4ea99a139..31704c3fb 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/pull.py +++ b/quidel_covidtest/delphi_quidel_covidtest/pull.py @@ -276,13 +276,15 @@ def pull_quidel_covidtest(params, logger): """ cache_dir = params["input_cache_dir"] + if params["pull_start_date"] == "": + params["pull_start_date"] = FULL_BKFILL_START_DATE # pull new data only that has not been ingested previous_df, pull_start_date = check_intermediate_file( cache_dir, - datetime.strptime(params["pull_start_date"], '%Y-%m-%d')) + params["pull_start_date"]) - if params["pull_start_date"] != "": + if params["pull_start_date"] != FULL_BKFILL_START_DATE: pull_start_date = datetime.strptime(params["pull_start_date"], '%Y-%m-%d') if params["pull_end_date"] == "": From b2bbb81a51e9340d2cfa5b2016238c0c48985e8c Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Mon, 28 Oct 2024 12:21:08 -0400 Subject: [PATCH 06/13] implimenting --- .../delphi_quidel_covidtest/backfill.py | 101 ++++++++++++++---- .../delphi_quidel_covidtest/run.py | 8 +- 2 files changed, 83 insertions(+), 26 deletions(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/backfill.py b/quidel_covidtest/delphi_quidel_covidtest/backfill.py index 7f7aba06d..bc1b6b48e 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/backfill.py +++ b/quidel_covidtest/delphi_quidel_covidtest/backfill.py @@ -1,8 +1,12 @@ # -*- coding: utf-8 -*- """Store backfill data.""" +import calendar import os import glob -from datetime import datetime +import re +import shutil +from datetime import datetime, timedelta +from typing import Union import pandas as pd @@ -11,7 +15,7 @@ gmpr = GeoMapper() -def store_backfill_file(df, _end_date, backfill_dir): +def store_backfill_file(df, _end_date, backfill_dir, logger): """ Store county level backfill data into backfill_dir. @@ -59,6 +63,7 @@ def store_backfill_file(df, _end_date, backfill_dir): 'num_age_0_17', 'den_age_0_17'] backfilldata = backfilldata.loc[backfilldata["time_value"] >= _start_date, selected_columns] + logger.info("Filtering source data", startdate=_start_date, enddate=_end_date) backfilldata["lag"] = [(_end_date - x).days for x in backfilldata["time_value"]] backfilldata["time_value"] = backfilldata.time_value.dt.strftime("%Y-%m-%d") backfilldata["issue_date"] = datetime.strftime(_end_date, "%Y-%m-%d") @@ -70,19 +75,73 @@ def store_backfill_file(df, _end_date, backfill_dir): "state_id": "string" }) - path = backfill_dir + \ - "/quidel_covidtest_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d") + filename = "/quidel_covidtest_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d") + path = f"{backfill_dir}\{filename}" # Store intermediate file into the backfill folder - backfilldata.to_parquet(path, index=False) + try: + backfilldata.to_parquet(path, index=False) + logger.info("Stored source data in parquet", filename=filename) + except: + logger.info("Failed to store source data in parquet") + return path -def merge_backfill_file(backfill_dir, backfill_merge_day, today, - test_mode=False, check_nd=25): + +def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logger): + """ + Merge existing backfill with the patch data included. This function is specifically run for patching. + + When the indicator fails for some reason or another, there's a gap in the backfill files. 
+ The patch to fill in the missing dates happens later down the line when the backfill files are already merged. + This function takes the merged files with the missing date, insert the particular date, and merge back the file. + Parameters + ---------- + issue_date : datetime + The most recent date when the raw data is received + backfill_dir : str + specified path to store backfill files. + backfill_file : str + specific file add to merged backfill file. """ - Merge ~4 weeks' backfill data into one file. + new_files = glob.glob(backfill_dir + "/quidel_covidtest_*") + + def get_file_with_date(files) -> Union[str, None]: + for filename in files: + # need to only match files with 6 digits for merged files + pattern = re.findall(r"_(\d{6,6})\.parquet", filename) + if pattern: + file_month = datetime.strptime(pattern[0], "%Y%m").replace(day=1) + end_date = (file_month + timedelta(days=32)).replace(day=1) + if issue_date >= file_month and issue_date < end_date: + return filename + return "" + + file_name = get_file_with_date(new_files) + + if len(file_name) == 0: + logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d")) + return + + logger.info("Adding missing date to merged file", issue_date=issue_date, filename=backfill_file, merged_filename=file_name) + + # Start to merge files + merge_file = f"{file_name.split('.')[0]}_after_merge.parquet" + try: + shutil.copyfile(file_name, merge_file) + existing_df = pd.read_parquet(merge_file, engine="pyarrow") + df = pd.read_parquet(backfill_file, engine="pyarrow") + merged_df = pd.concat([existing_df, df]).sort_values(["time_value", "fips"]) + merged_df.to_parquet(merge_file, index=False) + os.remove(file_name) + os.rename(merge_file, file_name) + # pylint: disable=W0703: + except Exception as e: + os.remove(merge_file) + logger.error(e) + return - Usually this function should merge 28 days' data into a new file so as to - save the reading time when running the backfill pipelines. We set a softer - threshold to allow flexibility in data delivery. +def merge_backfill_file(backfill_dir, today, logger, test_mode=False): + """ + Merge month's backfill data into one file. Parameters ---------- @@ -90,17 +149,12 @@ def merge_backfill_file(backfill_dir, backfill_merge_day, today, The most recent date when the raw data is received backfill_dir : str specified path to store backfill files. - backfill_merge_day: int - The day of a week that we used to merge the backfill files. e.g. 0 - is Monday. test_mode: bool - check_nd: int - The criteria of the number of unmerged files. 
Ideally, we want the - number to be 28, but we use a looser criteria from practical - considerations """ - new_files = glob.glob(backfill_dir + "/quidel_covidtest_as_of_*") + previous_month = (today.replace(day=1) - timedelta(days=1)).strftime("%Y%m") + new_files = glob.glob(backfill_dir + f"/quidel_covidtest_as_of_{previous_month}*") if len(new_files) == 0: # if no any daily file is stored + logger.info("No new files to merge; skipping merging") return def get_date(file_link): @@ -115,16 +169,21 @@ def get_date(file_link): # Check whether to merge # Check the number of files that are not merged - if today.weekday() != backfill_merge_day or (today-earliest_date).days <= check_nd: + date_list = list(map(get_date, new_files)) + latest_date = max(date_list) + num_of_days_in_month = calendar.monthrange(latest_date.year, latest_date.month)[1] + if len(date_list) < num_of_days_in_month: + logger.info("Not enough days, skipping merging", n_file_days=len(date_list)) return # Start to merge files + logger.info(f"Merging files", start_date=date_list[0], end_date=date_list[-1]) pdList = [] for fn in new_files: df = pd.read_parquet(fn, engine='pyarrow') pdList.append(df) merged_file = pd.concat(pdList).sort_values(["time_value", "fips"]) - path = backfill_dir + "/quidel_covidtest_from_%s_to_%s.parquet"%( + path = backfill_dir + f"/quidel_covidtest_{datetime.strftime(latest_date, '%Y%m')}.parquet"%( datetime.strftime(earliest_date, "%Y%m%d"), datetime.strftime(latest_date, "%Y%m%d")) merged_file.to_parquet(path, index=False) diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index b010df8b7..97364093c 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -139,15 +139,13 @@ def run_module(params: Dict[str, Any], logger=None): backfill_dir = params["indicator"]["backfill_dir"] backfill_merge_day = params["indicator"]["backfill_merge_day"] - # Merge 4 weeks' data into one file to save runtime - # Notice that here we don't check the _end_date(receive date) - # since we always want such merging happens on a certain day of a week - merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today()) + # Merge a month's data into one file to save runtime + merge_backfill_file(backfill_dir, datetime.today(), logger) if _end_date is None: logger.info("The data is up-to-date. 
Currently, no new data to be ingested.") return # Store the backfill intermediate file - store_backfill_file(df, _end_date, backfill_dir) + store_backfill_file(df, _end_date, backfill_dir, logger) export_end_date = check_export_end_date( export_end_date, _end_date, END_FROM_TODAY_MINUS) From 21289c960528c31f545eb3f86b21b6bd379b34d5 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Tue, 29 Oct 2024 10:17:12 -0400 Subject: [PATCH 07/13] making test more robust in progress --- quidel_covidtest/tests/test_backfill.py | 154 ++++++++++++++++++------ 1 file changed, 115 insertions(+), 39 deletions(-) diff --git a/quidel_covidtest/tests/test_backfill.py b/quidel_covidtest/tests/test_backfill.py index 27e0d01bc..3499d93a5 100644 --- a/quidel_covidtest/tests/test_backfill.py +++ b/quidel_covidtest/tests/test_backfill.py @@ -1,24 +1,20 @@ +import calendar import logging import os import glob -from datetime import datetime - +from datetime import datetime, timedelta +from pathlib import Path import pandas as pd -from delphi_quidel_covidtest.pull import pull_quidel_covidtest - +from delphi_utils.logger import get_structured_logger from delphi_quidel_covidtest.backfill import (store_backfill_file, - merge_backfill_file) - -END_FROM_TODAY_MINUS = 5 -EXPORT_DAY_RANGE = 40 + merge_backfill_file, merge_existing_backfill_files) -TEST_LOGGER = logging.getLogger() -backfill_dir="./backfill" - -class TestBackfill: - - df, _end_date = pull_quidel_covidtest({ +TEST_PATH = Path(__file__).parent +PARAMS = { + "indicator": { + "backfill_dir": f"{TEST_PATH}/backfill", + "drop_date": "2020-06-11", "static_file_dir": "../static", "input_cache_dir": "./cache", "export_start_date": "2020-06-30", @@ -29,19 +25,33 @@ class TestBackfill: "aws_access_key_id": "", "aws_secret_access_key": "" }, - "bucket_name": "", - "wip_signal": "", - "test_mode": True - }, TEST_LOGGER) - - def test_store_backfill_file(self): - - store_backfill_file(self.df, datetime(2020, 1, 1), backfill_dir) + "bucket_name": "", + "wip_signal": "", + "test_mode": True + }, + } +DATA_FILEPATH = f"{PARAMS['indicator']['input_cache_dir']}/pulled_until_20200817.csv" +backfill_dir = PARAMS["indicator"]["backfill_dir"] + +class TestBackfill: + _end_date = datetime.strptime(DATA_FILEPATH.split("_")[2].split(".")[0], + '%Y%m%d') + timedelta(days=1) + df = pd.read_csv(DATA_FILEPATH, sep=",", parse_dates=["timestamp"]) + + def cleanup(self): + for file in glob.glob(f"{backfill_dir}/*.parquet"): + os.remove(file) + assert file not in os.listdir(backfill_dir) + + def test_store_backfill_file(self, caplog): + caplog.set_level(logging.INFO) + logger = get_structured_logger() + + store_backfill_file(self.df, datetime(2020, 1, 1), backfill_dir, logger) fn = "quidel_covidtest_as_of_20200101.parquet" assert fn in os.listdir(backfill_dir) backfill_df = pd.read_parquet(backfill_dir + "/"+ fn, engine='pyarrow') - selected_columns = ['time_value', 'fips', 'state_id', 'den_total', 'num_total', 'num_age_0_4', 'den_age_0_4', @@ -52,32 +62,34 @@ def test_store_backfill_file(self): 'num_age_0_17', 'den_age_0_17', 'lag', 'issue_date'] assert set(selected_columns) == set(backfill_df.columns) - - os.remove(backfill_dir + "/" + fn) - assert fn not in os.listdir(backfill_dir) - - def test_merge_backfill_file(self): - + + assert fn in os.listdir(backfill_dir) + assert "Stored backfill data in parquet" in caplog.text + + self.cleanup() + def test_merge_backfill_file(self, caplog): + caplog.set_level(logging.INFO) + logger = get_structured_logger() + today = datetime.today() fn = 
"quidel_covidtest_from_20200817_to_20200820.parquet" assert fn not in os.listdir(backfill_dir) - # Check the when no daily file stored + # Check when no daily file stored today = datetime(2020, 8, 20) - merge_backfill_file(backfill_dir, today.weekday(), today, test_mode=True, check_nd=8) + merge_backfill_file(backfill_dir, today, logger, test_mode=True) assert fn not in os.listdir(backfill_dir) + assert "No new files to merge; skipping merging" in caplog.text + for d in range(17, 21): dropdate = datetime(2020, 8, d) store_backfill_file(self.df, dropdate, backfill_dir) - - # Check the when the merged file is not generated - today = datetime(2020, 8, 20) - merge_backfill_file(backfill_dir, today.weekday(), today, test_mode=True, check_nd=8) - assert fn not in os.listdir(backfill_dir) # Generate the merged file, but not delete it - merge_backfill_file(backfill_dir, today.weekday(), today, test_mode=True, check_nd=2) + today = datetime(2020, 7, 1) + monkeypatch.setattr(calendar, 'monthrange', lambda x, y: (1, 4)) + merge_backfill_file(backfill_dir, today, logger, test_mode=True,) assert fn in os.listdir(backfill_dir) # Read daily file @@ -101,6 +113,70 @@ def test_merge_backfill_file(self): assert expected.shape[0] == merged.shape[0] assert expected.shape[1] == merged.shape[1] - os.remove(backfill_dir + "/" + fn) - assert fn not in os.listdir(backfill_dir) + self.cleanup() + + def test_merge_existing_backfill_files(self, caplog): + issue_date = datetime(year=2020, month=6, day=13) + issue_date_str = issue_date.strftime("%Y%m%d") + caplog.set_level(logging.INFO) + logger = get_structured_logger() + def prep_backfill_data(): + # Generate backfill daily files + for d in range(11, 15): + dropdate = datetime(2020, 6, d) + store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) + + today = datetime(2020, 7, 1) + # creating expected file + merge_backfill_file(backfill_dir, today, logger, + test_mode=True) + original = f"{backfill_dir}/claims_hosp_202006.parquet" + os.rename(original, f"{backfill_dir}/expected.parquet") + + # creating backfill without issue date + os.remove(f"{backfill_dir}/claims_hosp_as_of_{issue_date_str}.parquet") + merge_backfill_file(backfill_dir, today, logger, + test_mode=True) + + old_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*") + for file in old_files: + os.remove(file) + + prep_backfill_data() + file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir, logger) + merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger) + + assert "Adding missing date to merged file" in caplog.text + + expected = pd.read_parquet(f"{backfill_dir}/expected.parquet") + merged = pd.read_parquet(f"{backfill_dir}/claims_hosp_202006.parquet") + + check = pd.concat([merged, expected]).drop_duplicates(keep=False) + + assert len(check) == 0 + + self.cleanup() + + + def test_merge_existing_backfill_files_no_call(self, caplog): + issue_date = datetime(year=2020, month=5, day=20) + caplog.set_level(logging.INFO) + logger = get_structured_logger() + def prep_backfill_data(): + # Generate backfill daily files + for d in range(11, 15): + dropdate = datetime(2020, 6, d) + store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) + + today = datetime(2020, 6, 14) + # creating expected file + merge_backfill_file(backfill_dir, today, logger, + test_mode=True) + + prep_backfill_data() + file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir, logger) + merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger) 
+ assert "Issue date has no matching merged files" in caplog.text + self.cleanup() + From 161b21dee95a60908544058f8551289a6dd3a364 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Tue, 29 Oct 2024 10:17:36 -0400 Subject: [PATCH 08/13] cleaning up run --- quidel_covidtest/delphi_quidel_covidtest/run.py | 1 - 1 file changed, 1 deletion(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index 97364093c..e6f05a45c 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -137,7 +137,6 @@ def run_module(params: Dict[str, Any], logger=None): # (generate files). if params["indicator"].get("generate_backfill_files", True): backfill_dir = params["indicator"]["backfill_dir"] - backfill_merge_day = params["indicator"]["backfill_merge_day"] # Merge a month's data into one file to save runtime merge_backfill_file(backfill_dir, datetime.today(), logger) From be805487b4924bbb2da027c6ad94836461caf6a2 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Tue, 29 Oct 2024 19:41:22 -0400 Subject: [PATCH 09/13] finishing up with tests --- .../delphi_quidel_covidtest/backfill.py | 11 ++-- quidel_covidtest/tests/test_backfill.py | 64 +++++++++---------- 2 files changed, 36 insertions(+), 39 deletions(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/backfill.py b/quidel_covidtest/delphi_quidel_covidtest/backfill.py index bc1b6b48e..bb2e89e44 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/backfill.py +++ b/quidel_covidtest/delphi_quidel_covidtest/backfill.py @@ -75,13 +75,13 @@ def store_backfill_file(df, _end_date, backfill_dir, logger): "state_id": "string" }) - filename = "/quidel_covidtest_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d") - path = f"{backfill_dir}\{filename}" + filename = "quidel_covidtest_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d") + path = f"{backfill_dir}/{filename}" # Store intermediate file into the backfill folder try: backfilldata.to_parquet(path, index=False) logger.info("Stored source data in parquet", filename=filename) - except: + except Exception as e: logger.info("Failed to store source data in parquet") return path @@ -170,7 +170,6 @@ def get_date(file_link): # Check whether to merge # Check the number of files that are not merged date_list = list(map(get_date, new_files)) - latest_date = max(date_list) num_of_days_in_month = calendar.monthrange(latest_date.year, latest_date.month)[1] if len(date_list) < num_of_days_in_month: logger.info("Not enough days, skipping merging", n_file_days=len(date_list)) @@ -183,9 +182,7 @@ def get_date(file_link): df = pd.read_parquet(fn, engine='pyarrow') pdList.append(df) merged_file = pd.concat(pdList).sort_values(["time_value", "fips"]) - path = backfill_dir + f"/quidel_covidtest_{datetime.strftime(latest_date, '%Y%m')}.parquet"%( - datetime.strftime(earliest_date, "%Y%m%d"), - datetime.strftime(latest_date, "%Y%m%d")) + path = backfill_dir + f"/quidel_covidtest_{datetime.strftime(latest_date, '%Y%m')}.parquet" merged_file.to_parquet(path, index=False) # Delete daily files once we have the merged one. 
diff --git a/quidel_covidtest/tests/test_backfill.py b/quidel_covidtest/tests/test_backfill.py index 3499d93a5..9dc45ceb6 100644 --- a/quidel_covidtest/tests/test_backfill.py +++ b/quidel_covidtest/tests/test_backfill.py @@ -50,6 +50,7 @@ def test_store_backfill_file(self, caplog): store_backfill_file(self.df, datetime(2020, 1, 1), backfill_dir, logger) fn = "quidel_covidtest_as_of_20200101.parquet" assert fn in os.listdir(backfill_dir) + assert "Stored source data in parquet" in caplog.text backfill_df = pd.read_parquet(backfill_dir + "/"+ fn, engine='pyarrow') selected_columns = ['time_value', 'fips', 'state_id', @@ -61,18 +62,15 @@ def test_store_backfill_file(self, caplog): 'num_age_65plus', 'den_age_65plus', 'num_age_0_17', 'den_age_0_17', 'lag', 'issue_date'] - assert set(selected_columns) == set(backfill_df.columns) - + assert set(selected_columns) == set(backfill_df.columns) assert fn in os.listdir(backfill_dir) - assert "Stored backfill data in parquet" in caplog.text self.cleanup() - def test_merge_backfill_file(self, caplog): + def test_merge_backfill_file(self, caplog, monkeypatch): caplog.set_level(logging.INFO) logger = get_structured_logger() - today = datetime.today() - fn = "quidel_covidtest_from_20200817_to_20200820.parquet" + fn = "quidel_covidtest_202008.parquet" assert fn not in os.listdir(backfill_dir) # Check when no daily file stored @@ -80,20 +78,20 @@ def test_merge_backfill_file(self, caplog): merge_backfill_file(backfill_dir, today, logger, test_mode=True) assert fn not in os.listdir(backfill_dir) assert "No new files to merge; skipping merging" in caplog.text - for d in range(17, 21): - dropdate = datetime(2020, 8, d) - store_backfill_file(self.df, dropdate, backfill_dir) - + dropdate = datetime(2020, 8, d) + store_backfill_file(self.df, dropdate, backfill_dir, logger) + # Generate the merged file, but not delete it - today = datetime(2020, 7, 1) + today = datetime(2020, 9, 1) monkeypatch.setattr(calendar, 'monthrange', lambda x, y: (1, 4)) merge_backfill_file(backfill_dir, today, logger, test_mode=True,) assert fn in os.listdir(backfill_dir) + assert "Merging files" in caplog.text # Read daily file - new_files = glob.glob(backfill_dir + "/quidel_covidtest*.parquet") + new_files = glob.glob(backfill_dir + "/quidel_covidtest_as_of*.parquet") pdList = [] for file in new_files: if "from" in file: @@ -101,9 +99,6 @@ def test_merge_backfill_file(self, caplog): df = pd.read_parquet(file, engine='pyarrow') pdList.append(df) os.remove(file) - new_files = glob.glob(backfill_dir + "/quidel_covidtest*.parquet") - assert len(new_files) == 1 - expected = pd.concat(pdList).sort_values(["time_value", "fips"]) # Read the merged file @@ -115,46 +110,50 @@ def test_merge_backfill_file(self, caplog): self.cleanup() - def test_merge_existing_backfill_files(self, caplog): - issue_date = datetime(year=2020, month=6, day=13) + def test_merge_existing_backfill_files(self, caplog, monkeypatch): + issue_date = datetime(year=2020, month=7, day=20) issue_date_str = issue_date.strftime("%Y%m%d") caplog.set_level(logging.INFO) logger = get_structured_logger() def prep_backfill_data(): # Generate backfill daily files - for d in range(11, 15): - dropdate = datetime(2020, 6, d) - store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) + for d in range(18, 24): + dropdate = datetime(2020, 7, d) + df_part = self.df[self.df['timestamp'] == dropdate] + store_backfill_file(df_part, dropdate, backfill_dir, logger) - today = datetime(2020, 7, 1) + today = datetime(2020, 8, 1) # creating 
expected file + monkeypatch.setattr(calendar, 'monthrange', lambda x, y: (1, 4)) merge_backfill_file(backfill_dir, today, logger, test_mode=True) - original = f"{backfill_dir}/claims_hosp_202006.parquet" + original = f"{backfill_dir}/quidel_covidtest_202007.parquet" os.rename(original, f"{backfill_dir}/expected.parquet") # creating backfill without issue date - os.remove(f"{backfill_dir}/claims_hosp_as_of_{issue_date_str}.parquet") + issue_date_filename = f"{backfill_dir}/quidel_covidtest_as_of_{issue_date_str}.parquet" + os.remove(issue_date_filename) merge_backfill_file(backfill_dir, today, logger, test_mode=True) - old_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*") + old_files = glob.glob(backfill_dir + "/quidel_covidtest_as_of_*") for file in old_files: os.remove(file) prep_backfill_data() - file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir, logger) + + df_to_add = self.df[self.df['timestamp'] == issue_date] + file_to_add = store_backfill_file(df_to_add, issue_date, backfill_dir, logger) merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger) assert "Adding missing date to merged file" in caplog.text expected = pd.read_parquet(f"{backfill_dir}/expected.parquet") - merged = pd.read_parquet(f"{backfill_dir}/claims_hosp_202006.parquet") + merged = pd.read_parquet(f"{backfill_dir}/quidel_covidtest_202007.parquet") check = pd.concat([merged, expected]).drop_duplicates(keep=False) assert len(check) == 0 - self.cleanup() @@ -164,17 +163,18 @@ def test_merge_existing_backfill_files_no_call(self, caplog): logger = get_structured_logger() def prep_backfill_data(): # Generate backfill daily files - for d in range(11, 15): - dropdate = datetime(2020, 6, d) - store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) + for d in range(18, 24): + dropdate = datetime(2020, 7, d) + df_part = self.df[self.df["timestamp"] == dropdate] + store_backfill_file(df_part, dropdate, backfill_dir, logger) - today = datetime(2020, 6, 14) + today = datetime(2020, 8, 1) # creating expected file merge_backfill_file(backfill_dir, today, logger, test_mode=True) prep_backfill_data() - file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir, logger) + file_to_add = store_backfill_file(self.df, issue_date, backfill_dir, logger) merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger) assert "Issue date has no matching merged files" in caplog.text self.cleanup() From 85f0185fe86d220cbc9f2074702c36758a4cb9ad Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Wed, 30 Oct 2024 11:57:53 -0400 Subject: [PATCH 10/13] adding patch code --- .../delphi_quidel_covidtest/patch.py | 2 +- quidel_covidtest/tests/test_patch.py | 37 ++++++++++++++++--- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/patch.py b/quidel_covidtest/delphi_quidel_covidtest/patch.py index 96c13dc95..4a8e54269 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/patch.py +++ b/quidel_covidtest/delphi_quidel_covidtest/patch.py @@ -57,7 +57,7 @@ def patch(): current_issue = start_issue - export_day_range += END_FROM_TODAY_MINUS + export_day_range -= END_FROM_TODAY_MINUS while current_issue <= end_issue: logger.info("Running issue", issue_date=current_issue.strftime("%Y-%m-%d")) diff --git a/quidel_covidtest/tests/test_patch.py b/quidel_covidtest/tests/test_patch.py index e5f72b90c..43b61ec46 100644 --- a/quidel_covidtest/tests/test_patch.py +++ b/quidel_covidtest/tests/test_patch.py @@ -1,8 +1,33 @@ +from 
unittest.mock import patch as mock_patch +from delphi_quidel_covidtest.patch import patch +import os +import shutil -class TestPatch: - def generate_expected_dates(self, params,): - date_dict = { - "2020-07-19": [""] - } +class TestPatchModule: def test_patch(self): - pass \ No newline at end of file + with mock_patch('delphi_quidel_covidtest.patch.get_structured_logger') as mock_get_structured_logger, \ + mock_patch('delphi_quidel_covidtest.patch.read_params') as mock_read_params, \ + mock_patch('delphi_quidel_covidtest.patch.run_module') as mock_run_module: + + mock_read_params.return_value = { + "common": { + "log_filename": "test.log" + }, + "indicator": { + "export_day_range": 40, + }, + "patch": { + "start_issue": "2021-01-01", + "end_issue": "2021-01-02", + "patch_dir": "./patch_dir" + } + } + + patch() + + assert os.path.isdir('./patch_dir') + assert os.path.isdir('./patch_dir/issue_20210101/quidel-covidtest') + assert os.path.isdir('./patch_dir/issue_20210102/quidel-covidtest') + + # Clean up the created directories after the test + shutil.rmtree(mock_read_params.return_value["patch"]["patch_dir"]) \ No newline at end of file From 684e423770641d1b2a4c32221d0dd62db1b80c98 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Wed, 30 Oct 2024 11:58:17 -0400 Subject: [PATCH 11/13] clean up --- quidel_covidtest/delphi_quidel_covidtest/backfill.py | 1 - 1 file changed, 1 deletion(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/backfill.py b/quidel_covidtest/delphi_quidel_covidtest/backfill.py index bb2e89e44..814c9635c 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/backfill.py +++ b/quidel_covidtest/delphi_quidel_covidtest/backfill.py @@ -164,7 +164,6 @@ def get_date(file_link): return datetime.strptime(fn, "%Y%m%d") date_list = list(map(get_date, new_files)) - earliest_date = min(date_list) latest_date = max(date_list) # Check whether to merge From c95d3c24491bf4a18b4c7ee540f5d493440988b9 Mon Sep 17 00:00:00 2001 From: nmdefries <42820733+nmdefries@users.noreply.github.com> Date: Fri, 6 Dec 2024 19:21:16 -0500 Subject: [PATCH 12/13] linting --- quidel_covidtest/delphi_quidel_covidtest/backfill.py | 12 +++++++----- .../delphi_quidel_covidtest/constants.py | 1 - quidel_covidtest/delphi_quidel_covidtest/patch.py | 2 -- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/quidel_covidtest/delphi_quidel_covidtest/backfill.py b/quidel_covidtest/delphi_quidel_covidtest/backfill.py index 814c9635c..a7591c284 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/backfill.py +++ b/quidel_covidtest/delphi_quidel_covidtest/backfill.py @@ -81,7 +81,7 @@ def store_backfill_file(df, _end_date, backfill_dir, logger): try: backfilldata.to_parquet(path, index=False) logger.info("Stored source data in parquet", filename=filename) - except Exception as e: + except Exception: # pylint: disable=W0703 logger.info("Failed to store source data in parquet") return path @@ -105,6 +105,7 @@ def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logge new_files = glob.glob(backfill_dir + "/quidel_covidtest_*") def get_file_with_date(files) -> Union[str, None]: + # pylint: disable=R1716 for filename in files: # need to only match files with 6 digits for merged files pattern = re.findall(r"_(\d{6,6})\.parquet", filename) @@ -113,6 +114,7 @@ def get_file_with_date(files) -> Union[str, None]: end_date = (file_month + timedelta(days=32)).replace(day=1) if issue_date >= file_month and issue_date < end_date: return filename + # pylint: enable=R1716 return "" file_name = 
get_file_with_date(new_files) @@ -121,7 +123,8 @@ def get_file_with_date(files) -> Union[str, None]: logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d")) return - logger.info("Adding missing date to merged file", issue_date=issue_date, filename=backfill_file, merged_filename=file_name) + logger.info("Adding missing date to merged file", issue_date=issue_date, + filename=backfill_file, merged_filename=file_name) # Start to merge files merge_file = f"{file_name.split('.')[0]}_after_merge.parquet" @@ -133,8 +136,7 @@ def get_file_with_date(files) -> Union[str, None]: merged_df.to_parquet(merge_file, index=False) os.remove(file_name) os.rename(merge_file, file_name) - # pylint: disable=W0703: - except Exception as e: + except Exception as e: # pylint: disable=W0703 os.remove(merge_file) logger.error(e) return @@ -175,7 +177,7 @@ def get_date(file_link): return # Start to merge files - logger.info(f"Merging files", start_date=date_list[0], end_date=date_list[-1]) + logger.info("Merging files", start_date=date_list[0], end_date=date_list[-1]) pdList = [] for fn in new_files: df = pd.read_parquet(fn, engine='pyarrow') diff --git a/quidel_covidtest/delphi_quidel_covidtest/constants.py b/quidel_covidtest/delphi_quidel_covidtest/constants.py index 0bc2cff98..6caf51863 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/constants.py +++ b/quidel_covidtest/delphi_quidel_covidtest/constants.py @@ -53,4 +53,3 @@ ] FULL_BKFILL_START_DATE = datetime(2020, 5, 26) - diff --git a/quidel_covidtest/delphi_quidel_covidtest/patch.py b/quidel_covidtest/delphi_quidel_covidtest/patch.py index 4a8e54269..9b5dd1ee8 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/patch.py +++ b/quidel_covidtest/delphi_quidel_covidtest/patch.py @@ -20,10 +20,8 @@ It will generate data for that range of issue dates, and store them in batch issue format: [name-of-patch]/issue_[issue-date]/quidel_covidtest/actual_data_file.csv """ -import time from datetime import datetime, timedelta from os import makedirs -from pathlib import Path from delphi_utils import get_structured_logger, read_params From 7c350a9053fd383dfad195ad3945c4e9d976c725 Mon Sep 17 00:00:00 2001 From: nmdefries <42820733+nmdefries@users.noreply.github.com> Date: Fri, 6 Dec 2024 19:52:13 -0500 Subject: [PATCH 13/13] require mock --- quidel_covidtest/setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/quidel_covidtest/setup.py b/quidel_covidtest/setup.py index 82c80832a..8c40e103c 100644 --- a/quidel_covidtest/setup.py +++ b/quidel_covidtest/setup.py @@ -7,6 +7,7 @@ "darker[isort]~=2.1.1", "delphi-utils", "imap-tools", + "mock", "numpy", "openpyxl", "pandas",
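A note for reviewers on exercising the new patch flow end to end. The sketch below is illustrative only: it assumes you run it from the quidel_covidtest/ directory with a working params.json template already in place (valid "common"/"indicator" settings, AWS credentials, and so on, which this snippet does not supply), and the "patch" values are made-up examples.

```python
# Sketch only: drives the patch loop added in this series.
# Assumes quidel_covidtest/ is the working directory so that
# delphi_utils.read_params() picks up params.json from here.
import json
from pathlib import Path

params = json.loads(Path("params.json").read_text())
# Hypothetical example range; start_issue and end_issue are inclusive.
params["patch"] = {
    "patch_dir": "./AprilPatch",
    "start_issue": "2024-04-20",
    "end_issue": "2024-04-21",
}
Path("params.json").write_text(json.dumps(params, indent=2))

# Equivalent to: env/bin/python -m delphi_quidel_covidtest.patch
from delphi_quidel_covidtest.patch import patch
patch()
# Expected layout, per the module docstring:
#   AprilPatch/issue_20240420/quidel_covidtest/<exported CSVs>
#   AprilPatch/issue_20240421/quidel_covidtest/<exported CSVs>
```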
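One more note on the backfill changes: daily files (quidel_covidtest_as_of_YYYYMMDD.parquet) and merged monthly files (quidel_covidtest_YYYYMM.parquet) are told apart by filename alone. merge_existing_backfill_files finds the month bucket for an issue date via the six-digit stamp, while merge_backfill_file separately waits (via calendar.monthrange) until a full month of daily files exists before merging. Below is a self-contained sketch of the month-matching rule as I read it from the diff; the file names are hypothetical.

```python
import re
from datetime import datetime, timedelta

def month_file_for(issue_date, files):
    """Return the merged monthly file whose month contains issue_date, else ""."""
    for filename in files:
        # Merged monthly files carry exactly six digits (YYYYMM) before
        # ".parquet"; daily files have eight (YYYYMMDD) and do not match.
        stamp = re.findall(r"_(\d{6})\.parquet", filename)
        if stamp:
            month_start = datetime.strptime(stamp[0], "%Y%m")
            next_month = (month_start + timedelta(days=32)).replace(day=1)
            if month_start <= issue_date < next_month:
                return filename
    return ""

files = ["quidel_covidtest_202007.parquet",
         "quidel_covidtest_as_of_20200719.parquet"]  # hypothetical names
assert month_file_for(datetime(2020, 7, 20), files) == "quidel_covidtest_202007.parquet"
assert month_file_for(datetime(2020, 8, 2), files) == ""
```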