diff --git a/tests/test_evaluation.py b/tests/test_evaluation.py
index a01aef4..46d2566 100644
--- a/tests/test_evaluation.py
+++ b/tests/test_evaluation.py
@@ -125,11 +125,11 @@ def test_evaluation_by_ID_for_forecast_step_invalid_input():
         benchmark.fcst_train[0]
     )  # ensure ID column in dataframe with single time series
     # calculate metrics by ID for selected forecast step
-    with pytest.raises(AssertionError):
+    with pytest.raises(ValueError):
         calculate_metrics_by_ID_for_forecast_step(
             fcst_df=fcst_test_peyton, df_historic=fcst_train_peyton, forecast_step_in_focus=1, freq="D"
         )
-    with pytest.raises(AssertionError):
+    with pytest.raises(ValueError):
         calculate_metrics_by_ID_for_forecast_step(
             fcst_df=fcst_test_peyton,
             df_historic=fcst_train_peyton,
diff --git a/tests/test_models.py b/tests/test_models.py
index 10ff28a..6acb581 100644
--- a/tests/test_models.py
+++ b/tests/test_models.py
@@ -186,7 +186,7 @@ def test_seasonal_naive_model(dataset_input, model_classes_and_params_input):
 @pytest.mark.parametrize(*decorator_input)
 def test_seasonal_naive_model_invalid_input(dataset_input, model_classes_and_params_input):
-    log.info("Test invalid model input - Raise Assertion")
+    log.info("Test invalid model input - Raise ValueError")
     peyton_manning_df = pd.read_csv(PEYTON_FILE, nrows=NROWS)
     dataset_list = [
         Dataset(
@@ -209,7 +209,7 @@ def test_seasonal_naive_model_invalid_input(dataset_input, model_classes_and_par
         num_processes=1,
     )
-    with pytest.raises(AssertionError):
+    with pytest.raises(ValueError):
         _, _ = benchmark.run()
     log.info("#### Done with test_seasonal_naive_model_invalid_input")
@@ -345,6 +345,6 @@ def test_check_min_input_len():
         save_dir=SAVE_DIR,
         num_processes=1,
    )
-    with pytest.raises(AssertionError):
+    with pytest.raises(ValueError):
        results_train, results_test = benchmark.run()
     log.info("#### test_check_min_input_len")
diff --git a/tot/benchmark.py b/tot/benchmark.py
index e5956a7..453f7cf 100644
--- a/tot/benchmark.py
+++ b/tot/benchmark.py
@@ -11,6 +11,7 @@
 import pandas as pd

 from tot.datasets.dataset import Dataset
+from tot.error_utils import raise_if
 from tot.experiment import CrossValidationExperiment, Experiment, SimpleExperiment
 from tot.models.models import Model
@@ -98,8 +99,10 @@ def run(self, verbose=True):
                 log.info("exp {}/{}: {}".format(i + 1, len(self.experiments), exp.experiment_name))
         log.info("---- Staring Series of {} Experiments ----".format(len(self.experiments)))
         if self.num_processes > 1 and len(self.experiments) > 1:
-            if not all([exp.num_processes == 1 for exp in self.experiments]):
-                raise ValueError("can not set multiprocessing in experiments and Benchmark.")
+            raise_if(
+                not all([exp.num_processes == 1 for exp in self.experiments]),
+                "Cannot set multiprocessing in Experiments and Benchmark.",
+            )
             with Pool(self.num_processes) as pool:
                 args_list = [(exp, verbose, i + 1) for i, exp in enumerate(self.experiments)]
                 pool.map_async(
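Review note: the conversion pattern above replaces inline `if`/`raise` blocks (and bare asserts) with the `raise_if` helper added in `tot/error_utils.py` later in this diff. A minimal sketch of the semantics, with a hypothetical per-experiment setting standing in for `exp.num_processes`:

```python
# Minimal stand-in for tot.error_utils.raise_if (the real helper is added later in this diff).
def raise_if(condition: bool, message: str):
    if condition:
        raise ValueError(message)

# Hypothetical per-experiment settings mirroring the Benchmark.run guard.
experiment_num_processes = [1, 1, 4]
try:
    raise_if(
        not all(n == 1 for n in experiment_num_processes),
        "Cannot set multiprocessing in Experiments and Benchmark.",
    )
except ValueError as err:
    print(err)  # Cannot set multiprocessing in Experiments and Benchmark.
```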
diff --git a/tot/df_utils.py b/tot/df_utils.py
index 7c11d6b..f22a801 100644
--- a/tot/df_utils.py
+++ b/tot/df_utils.py
@@ -4,6 +4,8 @@
 import numpy as np
 import pandas as pd

+from tot.error_utils import raise_data_validation_error_if, raise_if
+
 log = logging.getLogger("tot.df_utils")
@@ -25,14 +27,12 @@ def convert_to_datetime(series: pd.Series) -> pd.Series:
         ValueError
             if input series contains NaN values or has timezone specified
     """
-    if series.isnull().any():
-        raise ValueError("Found NaN in column ds.")
+    raise_if(series.isnull().any(), "Found NaN in column ds.")
     if series.dtype == np.int64:
         series = series.astype(str)
     if not np.issubdtype(series.dtype, np.datetime64):
         series = pd.to_datetime(series)
-    if series.dt.tz is not None:
-        raise ValueError("Column ds has timezone specified, which is not supported. Remove timezone.")
+    raise_if(series.dt.tz is not None, "Column ds has timezone specified, which is not supported. Remove timezone.")
     return series
@@ -55,17 +55,10 @@ def _split_df(df: pd.DataFrame, test_percentage: Union[float, int]) -> Tuple[pd.
         Tuple[pd.DataFrame, pd.DataFrame]
             A tuple containing the training DataFrame and the validation DataFrame.
     """
-    # Receives df with single ID column
-    assert len(df["ID"].unique()) == 1
+    _validate_single_ID_df(df)
+
     n_samples = len(df)
-    if 0.0 < test_percentage < 1.0:
-        n_valid = max(1, int(n_samples * test_percentage))
-    else:
-        assert test_percentage >= 1
-        assert type(test_percentage) == int
-        n_valid = test_percentage
-    n_train = n_samples - n_valid
-    assert n_train >= 1
+    n_train = _calculate_n_train(n_samples, test_percentage)
     split_idx_train = n_train
     split_idx_val = split_idx_train
@@ -75,6 +68,56 @@ def _split_df(df: pd.DataFrame, test_percentage: Union[float, int]) -> Tuple[pd.
     return df_train, df_val


+def _validate_single_ID_df(df: pd.DataFrame) -> None:
+    """Check that the DataFrame contains a single time series, i.e. a single unique ID.
+
+    Parameters
+    ----------
+    df : pd.DataFrame
+        DataFrame to be validated.
+
+    Raises
+    ------
+    ValueError
+        If the DataFrame contains multiple IDs.
+    """
+    raise_if(len(df["ID"].unique()) != 1, "DataFrame must contain a single time series (a single unique ID).")
+
+
+def _calculate_n_train(n_samples: int, test_size: Union[float, int]) -> int:
+    """Calculate the number of train samples.
+
+    Parameters
+    ----------
+    n_samples : int
+        Number of samples in the DataFrame.
+    test_size : float, int
+        The percentage or number of samples to be used for validation.
+        If the value is between 0 and 1, it is interpreted as a percentage of the total number of samples.
+        If the value is greater than or equal to 1, it is interpreted as the number of samples to be used for
+        validation.
+
+    Returns
+    -------
+    int
+        Number of train samples to be used in the split.
+
+    Raises
+    ------
+    ValueError
+        If test size is not a float in range (0.0, 1.0) or an integer in range [1, len(df)).
+    """
+    if 0.0 < test_size < 1.0:
+        n_valid = max(1, int(n_samples * test_size))
+    else:
+        raise_if(
+            type(test_size) != int or not 1 <= test_size < n_samples,
+            "Test size must be a float in range (0.0, 1.0) or an integer in range [1, len(df)).",
+        )
+        n_valid = test_size
+
+    return int(n_samples - n_valid)
+
+
 def split_df(
     df: pd.DataFrame, test_percentage: Union[float, int] = 0.25, local_split: bool = True
 ) -> Tuple[pd.DataFrame, pd.DataFrame]:
@@ -122,7 +165,7 @@ def __crossvalidation_split_df(df, k, fold_pct, fold_overlap_pct=0.0):
     Parameters
     ----------
     df : pd.DataFrame
-        data
+        data with a single ID column
     k : int
         number of CV folds
     fold_pct : float
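Review note: a quick sketch of the two `_calculate_n_train` branches, assuming the implementation above; a float in (0, 1) is interpreted as a fraction, an integer as an absolute validation count:

```python
n_samples = 100
for test_size in (0.25, 25):
    # fraction branch vs. absolute-count branch, as in _calculate_n_train above
    if 0.0 < test_size < 1.0:
        n_valid = max(1, int(n_samples * test_size))
    else:
        n_valid = test_size
    print(test_size, "->", n_samples - n_valid)  # both print 75
```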
" + samples_per_fold, samples_overlap = _calculate_cv_params(total_samples, k, fold_pct, fold_overlap_pct) folds = [] df_fold = df.copy(deep=True) for i in range(k, 0, -1): - df_train, df_val = split_df(df_fold, test_percentage=samples_fold) + df_train, df_val = split_df(df_fold, test_percentage=samples_per_fold) folds.append((df_train, df_val)) - split_idx = len(df_fold) - samples_fold + samples_overlap + split_idx = len(df_fold) - samples_per_fold + samples_overlap df_fold = df_fold.iloc[:split_idx].reset_index(drop=True) folds = folds[::-1] return folds +def _calculate_cv_params(total_samples: int, k: int, fold_pct: float, fold_overlap_pct: float) -> Tuple[int, int]: + """Return validated cross validation arguments. + + Parameters + ---------- + total_samples : int + number of data samples + k : int + number of CV folds + fold_pct : float + percentage of overall samples to be in each fold + fold_overlap_pct : float + percentage of overlap between the validation folds + + Returns + ------- + tuple (samples_per_fold, samples_overlap) + + samples fold + + samples overlap + + Raises + ------- + ValueError + If samples overlap is bigger than samples fold. + ValueError + If test percentage too large and there are not enough train samples. + """ + samples_per_fold = max(1, int(fold_pct * total_samples)) + samples_overlap = int(fold_overlap_pct * samples_per_fold) + raise_if(samples_overlap > samples_per_fold, "Samples overlap is bigger than samples fold") + + min_train = total_samples - samples_per_fold - (k - 1) * (samples_per_fold - samples_overlap) + raise_if( + min_train < samples_per_fold, + "Test percentage too large. Not enough train samples. Select smaller test " "percentage.", + ) + return samples_per_fold, samples_overlap + + def _crossvalidation_split_df( df, received_single_time_series, k, fold_pct, fold_overlap_pct=0.0, global_model_cv_type="global-time" ): @@ -194,6 +270,10 @@ def _crossvalidation_split_df( training data validation data + Raises + ------- + ValueError + If invalid type of crossvalidation is selected. """ if received_single_time_series: folds = ( @@ -339,21 +419,14 @@ def _crossvalidation_with_time_threshold(df, k, fold_pct, fold_overlap_pct=0.0): validation data """ df_merged = merge_dataframes(df) - total_samples = len(df_merged) - samples_fold = max(1, int(fold_pct * total_samples)) - samples_overlap = int(fold_overlap_pct * samples_fold) - assert samples_overlap < samples_fold - min_train = total_samples - samples_fold - (k - 1) * (samples_fold - samples_overlap) - assert ( - min_train >= samples_fold - ), "Test percentage too large. Not enough train samples. Select smaller test percentage. 
" + samples_per_fold, samples_overlap = _calculate_cv_params(len(df_merged), k, fold_pct, fold_overlap_pct) folds = [] df_fold, _, _, _ = prep_or_copy_df(df) for i in range(k, 0, -1): - threshold_time_stamp = find_time_threshold(df_fold, samples_fold) + threshold_time_stamp = find_time_threshold(df_fold, samples_per_fold) df_train, df_val = split_considering_timestamp(df_fold, threshold_time_stamp=threshold_time_stamp) folds.append((df_train, df_val)) - split_idx = len(df_merged) - samples_fold + samples_overlap + split_idx = len(df_merged) - samples_per_fold + samples_overlap df_merged = df_merged[:split_idx].reset_index(drop=True) threshold_time_stamp = df_merged["ds"].iloc[-1] df_fold_aux = pd.DataFrame() @@ -440,11 +513,18 @@ def merge_dataframes(df: pd.DataFrame) -> pd.DataFrame: ------- pd.Dataframe Dataframe with concatenated time series (sorted 'ds', duplicates removed, index reset) + + Raises + ------- + ValueError + If df is not an instance of pd.DataFrame. + ValueError + If df does not contain 'ID' column. + """ - if not isinstance(df, pd.DataFrame): - raise ValueError("Can not join other than pd.DataFrames") - if "ID" not in df.columns: - raise ValueError("df does not contain 'ID' column") + raise_if(not isinstance(df, pd.DataFrame), "Can not join other than pd.DataFrames") + raise_if("ID" not in df.columns, "df does not contain 'ID' column") + df_merged = df.copy(deep=True).drop("ID", axis=1) df_merged = df_merged.sort_values("ds") df_merged = df_merged.drop_duplicates(subset=["ds"]) @@ -470,13 +550,8 @@ def find_time_threshold(df, valid_p): """ df_merged = merge_dataframes(df) n_samples = len(df_merged) - if 0.0 < valid_p < 1.0: - n_valid = max(1, int(n_samples * valid_p)) - else: - assert valid_p >= 1 - assert type(valid_p) == int - n_valid = valid_p - n_train = n_samples - n_valid + n_train = _calculate_n_train(n_samples, valid_p) + threshold_time_stamp = df_merged.loc[n_train, "ds"] log.debug("Time threshold: ", threshold_time_stamp) return threshold_time_stamp @@ -529,12 +604,13 @@ def _check_min_df_len(df, min_len): Raises ------ - AssertionError + ValueError If the dataframe does not have at least `min_len` rows. """ - assert ( - df.groupby("ID").apply(lambda x: len(x) > min_len).all() - ), "Input time series has not enough sample to fit an predict the model." + raise_if( + df.groupby("ID").apply(lambda x: len(x) < min_len).any(), + "Input time series has not enough sample to " "fit an predict the model.", + ) def add_first_inputs_to_df(samples: int, df_train: pd.DataFrame, df_test: pd.DataFrame) -> pd.DataFrame: @@ -704,8 +780,7 @@ def _handle_missing_data(df, freq): pd.DataFrame preprocessed dataframe """ - # Receives df with single ID column - assert len(df["ID"].unique()) == 1 + _validate_single_ID_df(df) # set imput parameters: impute_linear = 10 @@ -796,41 +871,48 @@ def check_single_dataframe(df, check_y): Returns ------- pd.DataFrame + + Raises + ------- + ValueError + If Dataframe has no rows. + ValueError + If Dataframe does not have columns 'ds' with the dates. + ValueError + If NaN is found in column 'ds'. + ValueError + If column 'ds' has timezone specified, which is not supported. + ValueError + If column 'ds' has duplicate values. 
""" - # Receives df with single ID column - assert len(df["ID"].unique()) == 1 - if df.shape[0] == 0: - raise ValueError("Dataframe has no rows.") - if "ds" not in df: - raise ValueError('Dataframe must have columns "ds" with the dates.') - if df.loc[:, "ds"].isnull().any(): - raise ValueError("Found NaN in column ds.") + _validate_single_ID_df(df) + + raise_if(df.shape[0] == 0, "Dataframe has no rows.") + raise_if("ds" not in df, 'Dataframe must have columns "ds" with the dates.') + raise_if(df.loc[:, "ds"].isnull().any(), "Found NaN in column ds.") + if df["ds"].dtype == np.int64: df["ds"] = df.loc[:, "ds"].astype(str) if pd.api.types.is_string_dtype(df["ds"]): df["ds"] = pd.to_datetime(df.loc[:, "ds"]) if not np.issubdtype(df["ds"].dtype, np.datetime64): df["ds"] = pd.to_datetime(df.loc[:, "ds"]) - if df["ds"].dt.tz is not None: - raise ValueError("Column ds has timezone specified, which is not supported. Remove timezone.") - if len(df.ds.unique()) != len(df.ds): - raise ValueError("Column ds has duplicate values. Please remove duplicates.") + + raise_if(df["ds"].dt.tz is not None, "Column ds has timezone specified, which is not supported. Remove timezone.") + raise_if(len(df.ds.unique()) != len(df.ds), "Column ds has duplicate values. Please remove duplicates.") columns = [] if check_y: columns.append("y") for name in columns: - if name not in df: - raise ValueError(f"Column {name!r} missing from dataframe") - if df.loc[df.loc[:, name].notnull()].shape[0] < 1: - raise ValueError(f"Dataframe column {name!r} only has NaN rows.") + raise_if(name not in df, f"Column {name!r} missing from dataframe") + raise_if(df.loc[df.loc[:, name].notnull()].shape[0] < 1, f"Dataframe column {name!r} only has NaN rows.") if not np.issubdtype(df[name].dtype, np.number): df.loc[:, name] = pd.to_numeric(df.loc[:, name]) if np.isinf(df.loc[:, name].values).any(): df.loc[:, name] = df[name].replace([np.inf, -np.inf], np.nan) - if df.loc[df.loc[:, name].notnull()].shape[0] < 1: - raise ValueError(f"Dataframe column {name!r} only has NaN rows.") + raise_if(df.loc[df.loc[:, name].notnull()].shape[0] < 1, f"Dataframe column {name!r} only has NaN rows.") if df.index.name == "ds": df.index.name = None @@ -874,12 +956,19 @@ def prep_or_copy_df(df): df or dict containing data Returns ------- - pd.DataFrames + pd.DataFrame df with ID col bool whether the ID col was present bool - wheter it is a single time series + whether it is a single time series + + Raises + ------- + ValueError + If df is None. + ValueError + If df type is invalid. """ received_ID_col = False received_single_time_series = True @@ -925,7 +1014,7 @@ def return_df_in_original_format(df, received_ID_col=False, received_single_time """ new_df = df.copy(deep=True) if not received_ID_col and received_single_time_series: - assert len(new_df["ID"].unique()) == 1 + _validate_single_ID_df(df) new_df.drop("ID", axis=1, inplace=True) log.info("Returning df with no ID column") return new_df @@ -948,13 +1037,19 @@ def unfold_dict_of_folds(folds_dict, k): training data validation data + Raises + ------- + DataValidationError + If number of folds in folds_dict does not correspond to k. 
""" folds = [] df_train = pd.DataFrame() df_test = pd.DataFrame() for j in range(0, k): for key in folds_dict: - assert k == len(folds_dict[key]) + raise_data_validation_error_if( + k != len(folds_dict[key]), "Number of folds in folds_dict does not " "correspond to k" + ) df_train = pd.concat((df_train, folds_dict[key][j][0]), ignore_index=True) df_test = pd.concat((df_test, folds_dict[key][j][1]), ignore_index=True) folds.append((df_train, df_test)) diff --git a/tot/error_utils.py b/tot/error_utils.py new file mode 100644 index 0000000..435bff3 --- /dev/null +++ b/tot/error_utils.py @@ -0,0 +1,43 @@ +def _raise_if(condition: bool, exception: Exception): + if condition: + raise exception + + +def raise_if(condition: bool, message: str): + """Check the condition and throw an error if True. + + Parameters + ---------- + condition : bool + condition to raise an exception + message : string + exception message + + Raises + ------- + ValueError + If condition is True. + """ + _raise_if(condition, ValueError(message)) + + +def raise_data_validation_error_if(condition: bool, message: str): + """Check the condition and throw an error if True. + + Parameters + ---------- + condition : bool + condition to raise an exception + message : string + exception message + + Raises + ------- + DataValidationError + If condition is true. + """ + _raise_if(condition, DataValidationError(message)) + + +class DataValidationError(Exception): + pass diff --git a/tot/evaluation/metric_utils.py b/tot/evaluation/metric_utils.py index 34ad02f..aac12c4 100644 --- a/tot/evaluation/metric_utils.py +++ b/tot/evaluation/metric_utils.py @@ -3,6 +3,7 @@ import numpy as np import pandas as pd +from tot.error_utils import raise_if from tot.evaluation.metrics import ERROR_FUNCTIONS @@ -54,8 +55,8 @@ def calculate_metrics_by_ID_for_forecast_step( 1 2.5 1.5 2 2.5 1.5 """ - assert metrics is not None, "Please specify a list of metrics to evaluate." - assert freq is not None, "Please specify the frequency of the data." + raise_if(metrics is None, "Please specify a list of metrics to evaluate.") + raise_if(freq is None, "Please specify the frequency of the data.") # calculate the specified metrics for every ID and every forecast step metrics_df_all_IDs = fcst_df.groupby("ID").apply( lambda x: _calc_metrics_for_single_ID_and_every_fcst_step( diff --git a/tot/evaluation/metrics.py b/tot/evaluation/metrics.py index 4450a43..86f6570 100644 --- a/tot/evaluation/metrics.py +++ b/tot/evaluation/metrics.py @@ -55,9 +55,6 @@ def _calc_mase( where: MAE = mean(|actual - forecast|) where: NaiveMAE = mean(|actual_[i] - actual_[i-1]|) """ - assert ( - truth_train is not None and len(truth_train) > 1 - ), "Please provide the actual values of the training data for MASE calculation." mae = _calc_mae(predictions, truth) naive_mae = _calc_mae(np.array(truth_train[:-1]), np.array(truth_train[1:])) return np.divide(mae, 1e-9 + naive_mae) @@ -76,9 +73,6 @@ def _calc_rmsse( where: RMSE = sqrt(mean((actual - forecast)^2)) where: NaiveMSE = sqrt(mean((actual_[i] - actual_[i-1])^2)) """ - assert ( - truth_train is not None and len(truth_train) > 1 - ), "Please provide the actual values of the training data for RMSSE calculation." 
     rmse = _calc_rmse(predictions, truth)
     naive_rmse = _calc_rmse(np.array(truth_train[:-1]), np.array(truth_train[1:]))
     return np.divide(rmse, 1e-9 + naive_rmse)
@@ -115,9 +109,6 @@ def __calc_mae_seasonal_naive(
     truth_train: np.ndarray = None,
     freq: Optional[str] = None,
 ) -> float:
-    assert (
-        truth_train is not None and len(truth_train) > 1
-    ), "Please provide the actual values of the training data for sMASE calculation."
     # convert frequency str to int
     K = FREQ_TO_SEASON_LENGTH[freq]
     # calculate seasonal forecast
diff --git a/tot/models/models_naive.py b/tot/models/models_naive.py
index 9ce6d9a..ee0c8f8 100644
--- a/tot/models/models_naive.py
+++ b/tot/models/models_naive.py
@@ -5,6 +5,7 @@
 import pandas as pd

 from tot.df_utils import _check_min_df_len, add_first_inputs_to_df, drop_first_inputs_from_df
+from tot.error_utils import raise_if
 from tot.models.models import Model
 from tot.models.utils import _convert_seasonality_to_season_length, _get_seasons, _predict_seasonal_naive
@@ -46,7 +47,7 @@ def __post_init__(self):
         model_params = deepcopy(self.params)
         model_params.pop("_data_params")
         self.n_forecasts = model_params["n_forecasts"]
-        assert self.n_forecasts >= 1, "Model parameter n_forecasts must be >=1. "
+        raise_if(self.n_forecasts < 1, "Model parameter n_forecasts must be >=1.")
         self.season_length = None

         # always select seasonality provided by dataset first
@@ -60,14 +61,17 @@ def __post_init__(self):
             )
         elif "season_length" in model_params:
             self.season_length = model_params["season_length"]  # for seasonal naive season_length is input parameter
-        assert self.season_length is not None, (
+
+        raise_if(
+            self.season_length is None,
             "Dataset does not provide a seasonality. Assign a seasonality to each of the datasets "
             "OR input desired season_length as model parameter to be used for all datasets "
-            "without specified seasonality."
+            "without specified seasonality.",
+        )
+        raise_if(
+            self.season_length <= 1,
+            "season_length must be >1 for SeasonalNaiveModel. For season_length=1 select NaiveModel instead.",
         )
-        assert (
-            self.season_length > 1
-        ), "season_length must be >1 for SeasonalNaiveModel. For season_length=1 select NaiveModel instead."

     def fit(self, df: pd.DataFrame, freq: str):
         pass
@@ -134,6 +138,10 @@ class NaiveModel(SeasonalNaiveModel):
     ----------
     n_forecasts : int
         number of steps ahead of prediction time step to forecast
+
+    Raises
+    ------
+    ValueError
+        If model parameter n_forecasts is less than 1.
     """

     model_name: str = "NaiveModel"
@@ -144,5 +152,5 @@ def __post_init__(self):
         model_params = deepcopy(self.params)
         model_params.pop("_data_params")
         self.n_forecasts = model_params["n_forecasts"]
-        assert self.n_forecasts >= 1, "Model parameter n_forecasts must be >=1. "
+        raise_if(self.n_forecasts < 1, "Model parameter n_forecasts must be >=1.")
         self.season_length = 1  # season_length=1 for NaiveModel
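Review note: a hypothetical sketch of the seasonal-naive idea these parameters configure (not the library's `_predict_seasonal_naive`), showing why `season_length > 1` and `n_forecasts >= 1` are required:

```python
import numpy as np

season_length, n_forecasts = 7, 3
history = np.arange(1, 15)             # two weeks of made-up daily data
last_season = history[-season_length:]
forecast = np.tile(last_season, 2)[:n_forecasts]  # repeat the last observed season
print(forecast)  # [ 8  9 10]
```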
" + raise_if(self.n_forecasts < 1, "Model parameter n_forecasts must be >=1.") self.season_length = 1 # season_length=1 for NaiveModel diff --git a/tot/models/models_neuralprophet.py b/tot/models/models_neuralprophet.py index 405aa28..788c304 100644 --- a/tot/models/models_neuralprophet.py +++ b/tot/models/models_neuralprophet.py @@ -7,6 +7,7 @@ from neuralprophet import NeuralProphet, TorchProphet from tot.df_utils import _check_min_df_len, add_first_inputs_to_df, drop_first_inputs_from_df, prep_or_copy_df +from tot.error_utils import raise_if from tot.models.models import Model from tot.models.utils import _get_seasons @@ -174,10 +175,11 @@ def __post_init__(self): # TorchProphet does not support uncertainty model_params.update({"interval_width": 0}) # TorchProphet does not support n_forecasts>1 and n_lags>0 - if "n_forecasts" in model_params: - assert model_params.n_forecasts == 1, "TorchProphet does not support n_forecasts >1." - if "n_lags" in model_params: - assert model_params.n_lags == 0, "TorchProphet does not support n_lags >0." + raise_if( + "n_forecasts" in model_params and model_params["n_forecasts"] > 1, + "TorchProphet does not support " "n_forecasts >1.", + ) + raise_if("n_lags" in model_params and model_params["n_lags"] > 0, "TorchProphet does not support n_lags >0.") self.model = self.model_class(**model_params) if custom_seasonalities is not None: diff --git a/tot/models/utils.py b/tot/models/utils.py index 5738da5..a45e494 100644 --- a/tot/models/utils.py +++ b/tot/models/utils.py @@ -6,6 +6,8 @@ import pandas as pd from darts import TimeSeries +from tot.df_utils import _validate_single_ID_df + log = logging.getLogger("tot.utils") FREQ_TO_SEASON_STEP_MAPPING = { @@ -59,7 +61,8 @@ def reshape_raw_predictions_to_forecast_df( e.g. yhat3 is the prediction for this datetime, predicted 3 steps ago, "3 steps old". """ - assert len(df["ID"].unique()) == 1 + _validate_single_ID_df(df) + cols = ["ds", "y", "ID"] # cols to keep from df fcst_df = pd.concat((df[cols],), axis=1) # create a line for each forecast_lag @@ -127,7 +130,6 @@ def convert_df_to_TimeSeries(df, freq) -> TimeSeries: time series to be fitted or predicted """ - # Receives df with single ID column received_single_ts = len(df["ID"].unique()) == 1 if not received_single_ts: @@ -233,8 +235,7 @@ def _predict_single_raw_seasonal_naive(df, season_length, n_forecasts): np.array array containing the predictions """ - # Receives df with single ID column - assert len(df["ID"].unique()) == 1 + _validate_single_ID_df(df) dates = df["ds"].iloc[season_length : -n_forecasts + 1].reset_index(drop=True) # assemble last values based on season_length diff --git a/tot/plot_utils.py b/tot/plot_utils.py index 82752dd..4ac348c 100644 --- a/tot/plot_utils.py +++ b/tot/plot_utils.py @@ -1,5 +1,4 @@ import logging -from typing import Optional import arrow import numpy as np @@ -7,10 +6,10 @@ from plotly_resampler import register_plotly_resampler, unregister_plotly_resampler from tot.df_utils import prep_or_copy_df +from tot.error_utils import raise_if log = logging.getLogger("tot.plot") - # UI Configuration prediction_color = "#2d92ff" actual_color = "black" @@ -37,95 +36,28 @@ } -def log_value_error_invalid_plotting_backend_input(): - raise ValueError( - "Selected plotting backend invalid. Set plotting backend to one of the " - "valid options 'plotly','plotly-auto','plotly-resampler'." - ) - - -def log_value_error_invalid_highlight_forecast_input(): - raise ValueError( - "input for highlight_forecast invalid. 
diff --git a/tot/plot_utils.py b/tot/plot_utils.py
index 82752dd..4ac348c 100644
--- a/tot/plot_utils.py
+++ b/tot/plot_utils.py
@@ -1,5 +1,4 @@
 import logging
-from typing import Optional

 import arrow
 import numpy as np
@@ -7,10 +6,10 @@
 from plotly_resampler import register_plotly_resampler, unregister_plotly_resampler

 from tot.df_utils import prep_or_copy_df
+from tot.error_utils import raise_if

 log = logging.getLogger("tot.plot")

-
 # UI Configuration
 prediction_color = "#2d92ff"
 actual_color = "black"
@@ -37,95 +36,28 @@
 }


-def log_value_error_invalid_plotting_backend_input():
-    raise ValueError(
-        "Selected plotting backend invalid. Set plotting backend to one of the "
-        "valid options 'plotly','plotly-auto','plotly-resampler'."
-    )
-
-
-def log_value_error_invalid_highlight_forecast_input():
-    raise ValueError(
-        "input for highlight_forecast invalid. Set highlight_forecast step equal to"
-        " or smaller than the prediction horizon"
-    )
-
-
-def log_warning_resampler_invalid_env():
-    log.warning(
-        "Warning: plotly-resampler not supported for the environment you are using. "
-        "Consider switching plotting_backend to 'plotly' or 'matplotlib "
-    )
-
-
-def log_warning_resampler_switch_to_valid_env():
-    log.warning(
-        "Warning: plotly-resampler not supported for the environment you are using. "
-        "Plotting backend automatically switched to 'plotly' without resampling "
-    )
-
-
-def validate_current_env_for_resampler(auto: bool = False) -> Optional[bool]:
+def is_jupyter_notebook():
     """
-    Validate the current environment to check if it is a valid environment for "plotly-resampler" and if invalid
-    trigger warning message.
+    Determine if the code is being executed in a Jupyter notebook environment.

-    Parameters
-    ----------
-    auto: bool, optional
-        If True, the function will automatically switch to a valid environment if the current environment is not
-        valid. If False, the function will return None if the current environment is not valid.
     Returns
     -------
     bool :
-        True if the current environment is a valid environment to run the code, False if the current environment is
-        not a valid environment to run the code. None if the current environment is not a valid environment to run
-        the code and the function did not switch to a valid environment.
+        True if the code is being executed in a Jupyter notebook, False otherwise.
     """
-
     from IPython import get_ipython

     if "google.colab" in str(get_ipython()):
-        if auto:
-            log_warning_resampler_switch_to_valid_env()
-            return False
-        else:
-            log_warning_resampler_invalid_env()
-            return None
-    else:
-        if is_notebook():
-            return True
-        else:
-            if auto:
-                log_warning_resampler_switch_to_valid_env()
-                return False
-            else:
-                log_warning_resampler_invalid_env()
-                return None
-
-
-def is_notebook():
-    """
-    Determine if the code is being executed in a Jupyter notebook environment.
+        return False

-    Returns
-    -------
-    bool :
-        True if the code is being executed in a Jupyter notebook, False otherwise.
-    """
     try:
         from IPython.core.getipython import get_ipython

-        if "ipykernel" not in str(get_ipython()):  # pragma: no cover
-            return False
+        return "ipykernel" in str(get_ipython())  # pragma: no cover

-    except ImportError:
-        return False
-    except AttributeError:
+    except (ImportError, AttributeError):
         return False

-    return True
-

 def select_plotting_backend(plotting_backend):
     """
@@ -146,12 +78,20 @@ def select_plotting_backend(plotting_backend):
         The new plotting backend.
     """
     if plotting_backend is None:
-        if validate_current_env_for_resampler(auto=True):
+        if is_jupyter_notebook():
             plotting_backend = "plotly-resampler"
         else:
+            log.warning(
+                "Warning: plotly-resampler not supported for the environment you are using. "
+                "Plotting backend is set to 'plotly' without resampling."
+            )
             plotting_backend = "plotly"
     elif plotting_backend == "plotly-resampler":
-        validate_current_env_for_resampler()
+        if not is_jupyter_notebook():
+            log.warning(
+                "Warning: plotly-resampler not supported for the environment you are using. "
+                "Consider switching plotting_backend to 'plotly' or 'matplotlib'."
+            )
     return plotting_backend.lower()
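Review note: a condensed sketch of the fallback logic `select_plotting_backend` implements, with the notebook check stubbed out as a parameter:

```python
def select_backend_sketch(plotting_backend, in_notebook):
    # None -> auto-select; resampling only works inside a notebook environment
    if plotting_backend is None:
        plotting_backend = "plotly-resampler" if in_notebook else "plotly"
    return plotting_backend.lower()

print(select_backend_sketch(None, in_notebook=False))               # plotly
print(select_backend_sketch("PLOTLY-RESAMPLER", in_notebook=True))  # plotly-resampler
```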
" + "Consider switching plotting_backend to 'plotly' or 'matplotlib " + ) return plotting_backend.lower() @@ -174,8 +114,11 @@ def validate_plotting_backend_input(plotting_backend): None """ valid_plotting_backends = [None, "plotly", "plotly-resampler"] - if plotting_backend not in valid_plotting_backends: - log_value_error_invalid_plotting_backend_input() + raise_if( + plotting_backend not in valid_plotting_backends, + "Selected plotting backend invalid. Set plotting backend to one of the " + "valid options 'plotly','plotly-auto','plotly-resampler'.", + ) def validate_highlight_forecast_input(highlight_forecast, fcst): @@ -199,8 +142,13 @@ def validate_highlight_forecast_input(highlight_forecast, fcst): None """ n_yhat = len([col for col in fcst.columns if "yhat" in col]) - if highlight_forecast is not None and highlight_forecast > n_yhat: - log_value_error_invalid_highlight_forecast_input() + is_highlight_forecast_valid = highlight_forecast is None or highlight_forecast < n_yhat + raise_if( + not is_highlight_forecast_valid, + "Input for highlight_forecast invalid. " + "Set highlight_forecast step equal to " + " or smaller than the prediction horizon", + ) def validate_df_name_input(df_name, fcst): @@ -221,7 +169,7 @@ def validate_df_name_input(df_name, fcst): Raises ------ - AssertionError + ValueError If the input DataFrame contains more than one time series and the df_name argument is not provided, or if the specified df_name is not present in the DataFrame. @@ -229,9 +177,12 @@ def validate_df_name_input(df_name, fcst): fcst, received_ID_col, received_single_time_series, _ = prep_or_copy_df(fcst) if not received_single_time_series: if df_name not in fcst["ID"].unique(): - assert ( - len(fcst["ID"].unique()) > 1 - ), "Many time series are present in the pd.DataFrame (more than one ID). Please, especify ID to be plotted." + raise_if( + len(fcst["ID"].unique()) > 1, + "Many time series are present in the pd.DataFrame (more than one " + "ID). Please, " + "especify ID to be plotted.", + ) fcst = fcst[fcst["ID"] == df_name].copy(deep=True) log.info(f"Plotting data from ID {df_name}") return fcst