diff --git a/ci/ci_test_env.yml b/ci/ci_test_env.yml index cee805149..08db99442 100644 --- a/ci/ci_test_env.yml +++ b/ci/ci_test_env.yml @@ -26,6 +26,7 @@ dependencies: - PyWavelets - pandas - scikit-image + - scikit-learn - rasterio - gdal # Test dependencies diff --git a/doc/source/pysteps_reference/blending.rst b/doc/source/pysteps_reference/blending.rst index 35fe47c01..09e6c294b 100644 --- a/doc/source/pysteps_reference/blending.rst +++ b/doc/source/pysteps_reference/blending.rst @@ -6,6 +6,8 @@ Implementation of blending methods for blending (ensemble) nowcasts with Numeric .. automodule:: pysteps.blending.interface .. automodule:: pysteps.blending.clim +.. automodule:: pysteps.blending.ens_kalman_filter_blending +.. automodule:: pysteps.blending.ens_kalman_filter_methods .. automodule:: pysteps.blending.linear_blending .. automodule:: pysteps.blending.skill_scores .. automodule:: pysteps.blending.steps diff --git a/doc/source/pysteps_reference/utils.rst b/doc/source/pysteps_reference/utils.rst index 0622e6bbe..6682dc669 100644 --- a/doc/source/pysteps_reference/utils.rst +++ b/doc/source/pysteps_reference/utils.rst @@ -13,7 +13,8 @@ Implementation of miscellaneous utility functions. .. automodule:: pysteps.utils.fft .. automodule:: pysteps.utils.images .. automodule:: pysteps.utils.interpolate +.. automodule:: pysteps.utils.pca +.. automodule:: pysteps.utils.reprojection .. automodule:: pysteps.utils.spectral .. automodule:: pysteps.utils.tapering .. automodule:: pysteps.utils.transformation -.. automodule:: pysteps.utils.reprojection diff --git a/doc/source/references.bib b/doc/source/references.bib index ee26c6d5b..287590040 100644 --- a/doc/source/references.bib +++ b/doc/source/references.bib @@ -388,3 +388,14 @@ @ARTICLE{Imhoff2023 YEAR = 2023, DOI = "10.1002/qj.4461" } + +@ARTICLE{Nerini2019MWR, + title = {A {Reduced}-{Space} {Ensemble} {Kalman} {Filter} {Approach} for {Flow}-{Dependent} {Integration} of {Radar} {Extrapolation} {Nowcasts} and {NWP} {Precipitation} {Ensembles}}, + volume = {147}, + doi = {10.1175/MWR-D-18-0258.1}, + number = {3}, + journal = {Monthly Weather Review}, + author = {D. Nerini and L. Foresti and D. Leuenberger and S. Robert and U. 
Germann}, + year = {2019}, + pages = {987--1006}, +} diff --git a/doc/source/user_guide/example_data.rst b/doc/source/user_guide/example_data.rst index e1e279b21..db683abe3 100644 --- a/doc/source/user_guide/example_data.rst +++ b/doc/source/user_guide/example_data.rst @@ -76,6 +76,7 @@ directory structure looks like this:: ├── KNMI ├── OPERA ├── bom + ├── dwd ├── fmi ├── mch diff --git a/environment.yml b/environment.yml index 63a4ad496..bfa7bfdb8 100644 --- a/environment.yml +++ b/environment.yml @@ -3,7 +3,7 @@ channels: - conda-forge - defaults dependencies: - - python>=3.11 + - python>=3.10 - jsmin - jsonschema - matplotlib diff --git a/environment_dev.yml b/environment_dev.yml index f911dae27..7ea35ad4f 100644 --- a/environment_dev.yml +++ b/environment_dev.yml @@ -4,7 +4,7 @@ channels: - conda-forge - defaults dependencies: - - python>=3.11 + - python>=3.10 - pip - jsmin - jsonschema @@ -29,5 +29,6 @@ dependencies: - pre_commit - cartopy>=0.18 - scikit-image + - scikit-learn - pandas - rasterio diff --git a/examples/ens_kalman_filter_blended_forecast.py b/examples/ens_kalman_filter_blended_forecast.py new file mode 100644 index 000000000..ccc65ba01 --- /dev/null +++ b/examples/ens_kalman_filter_blended_forecast.py @@ -0,0 +1,291 @@ +# -*- coding: utf-8 -*- +""" +Blended forecast +==================== + +This tutorial shows how to construct a blended forecast between an ensemble nowcast +and an ensemble Numerical Weather Prediction (NWP) rainfall forecast using the +Reduced-Space Ensemble Kalman filter approach in :cite:`Nerini2019MWR`. The used +datasets are from the German Weather Service (DWD). + +""" + +import os +from datetime import datetime, timedelta + +import numpy as np +from matplotlib import pyplot as plt + +import pysteps +from pysteps import io, rcparams, blending +from pysteps.visualization import plot_precip_field +import pysteps_nwp_importers + + +################################################################################ +# Read the radar images and the NWP forecast +# ------------------------------------------ +# +# First, we import a sequence of 4 images of 5-minute radar composites +# and the corresponding NWP rainfall forecast that was available at that time. +# +# You need the pysteps-data archive downloaded and the pystepsrc file +# configured with the data_source paths pointing to data folders. +# Additionally, the pysteps-nwp-importers plugin needs to be installed, see +# https://github.com/pySTEPS/pysteps-nwp-importers. + +# Selected case +date_radar = datetime.strptime("202506041645", "%Y%m%d%H%M") +# The last NWP forecast was issued at 16:00 - the blending tool will be able +# to find the correct lead times itself. 
+date_nwp = datetime.strptime("202506041600", "%Y%m%d%H%M") +radar_data_source = rcparams.data_sources["dwd"] +nwp_data_source = rcparams.data_sources["dwd_nwp"] + + +############################################################################### +# Load the data from the archive +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +root_path = radar_data_source["root_path"] +path_fmt = radar_data_source["path_fmt"] +fn_pattern = radar_data_source["fn_pattern"] +fn_ext = radar_data_source["fn_ext"] +importer_name = radar_data_source["importer"] +importer_kwargs = radar_data_source["importer_kwargs"] +timestep_radar = radar_data_source["timestep"] + +# Find the radar files in the archive +fns = io.find_by_date( + date_radar, + root_path, + path_fmt, + fn_pattern, + fn_ext, + timestep_radar, + num_prev_files=2, +) + +# Read the radar composites (which are already in mm/h) +importer = io.get_method(importer_name, "importer") +radar_precip, _, radar_metadata = io.read_timeseries(fns, importer, **importer_kwargs) + +# Import the NWP data +filename = os.path.join( + nwp_data_source["root_path"], + datetime.strftime(date_nwp, nwp_data_source["path_fmt"]), + datetime.strftime(date_nwp, nwp_data_source["fn_pattern"]) + + "." + + nwp_data_source["fn_ext"], +) +nwp_importer = io.get_method("dwd_nwp", "importer") +kwargs = { + "varname": "lsprate", + "grid_file_path": "./aux/grid_files/dwd/icon/R19B07/icon_grid_0047_R19B07_L.nc", +} +nwp_precip, _, nwp_metadata = nwp_importer(filename, **kwargs) + + +################################################################################ +# Pre-processing steps +# -------------------- + +# Set the zerovalue and precipitation thresholds (these are fixed from DWD) +prec_thr = 0.049 +zerovalue = 0.027 + +# Transform the zerovalue and precipitation thresholds to dBR +log_thr_prec = 10.0 * np.log10(prec_thr) +log_thr_min = 10.0 * np.log10(zerovalue) + +# Reproject the DWD ICON NWP data onto a regular grid +nwp_metadata["clon"] = nwp_precip["longitude"].values +nwp_metadata["clat"] = nwp_precip["latitude"].values +# We change the time step from the DWD NWP data to 15 min (it is actually 5 min) +# to have a longer forecast horizon available for this example, as pysteps_data +# only contains 1 hour of DWD forecast data (to minimize storage). +nwp_metadata["accutime"] = 15.0 +nwp_precip = ( + nwp_precip.values * 3.0 +) # (to account for the change in time step from 5 to 15 min) + +# Reproject ID2 data onto a regular grid +nwp_precip_rprj, nwp_metadata_rprj = ( + pysteps_nwp_importers.importer_dwd_nwp.unstructured2regular( + nwp_precip, nwp_metadata, radar_metadata + ) +) + +# Make sure the units are in mm/h +converter = pysteps.utils.get_method("mm/h") +radar_precip, radar_metadata = converter( + radar_precip, radar_metadata +) # The radar data should already be in mm/h +nwp_precip_rprj, nwp_metadata_rprj = converter(nwp_precip_rprj, nwp_metadata_rprj) + +# Threshold the data +radar_precip[radar_precip < prec_thr] = 0.0 +nwp_precip_rprj[nwp_precip_rprj < prec_thr] = 0.0 + +# Plot the radar rainfall field and the first time step and first ensemble member +# of the NWP forecast. 
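# Both fields are shown in mm/h on the radar grid, as the NWP forecast has been
# reprojected onto the radar grid above.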
+date_str = datetime.strftime(date_radar, "%Y-%m-%d %H:%M") +plt.figure(figsize=(10, 5)) +plt.subplot(121) +plot_precip_field( + radar_precip[-1, :, :], + geodata=radar_metadata, + title=f"Radar observation at {date_str}", + colorscale="STEPS-NL", +) +plt.subplot(122) +plot_precip_field( + nwp_precip_rprj[0, 0, :, :], + geodata=nwp_metadata_rprj, + title=f"NWP forecast at {date_str}", + colorscale="STEPS-NL", +) +plt.tight_layout() +plt.show() + +# transform the data to dB +transformer = pysteps.utils.get_method("dB") +radar_precip, radar_metadata = transformer( + radar_precip, radar_metadata, threshold=prec_thr, zerovalue=log_thr_min +) +nwp_precip_rprj, nwp_metadata_rprj = transformer( + nwp_precip_rprj, nwp_metadata_rprj, threshold=prec_thr, zerovalue=log_thr_min +) + + +############################################################################### +# Determine the velocity fields +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# +# In contrast to the STEPS blending method, no motion field for the NWP fields +# is needed in the ensemble kalman filter blending approach. + +# Estimate the motion vector field +oflow_method = pysteps.motion.get_method("lucaskanade") +velocity_radar = oflow_method(radar_precip) + + +################################################################################ +# The blended forecast +# ~~~~~~~~~~~~~~~~~~~~ + +# Set the timestamps for radar_precip and nwp_precip_rprj +timestamps_radar = np.array( + sorted( + [ + date_radar - timedelta(minutes=i * timestep_radar) + for i in range(len(radar_precip)) + ] + ) +) +timestamps_nwp = np.array( + sorted( + [ + date_nwp + timedelta(minutes=i * int(nwp_metadata_rprj["accutime"])) + for i in range(nwp_precip_rprj.shape[0]) + ] + ) +) + +# Set the combination kwargs +combination_kwargs = dict( + n_tapering=0, # No. of principal components of the ens. forecast for which the cov. matrix is computed + non_precip_mask=True, # Specifies whether the computation should be truncated on grid boxes where at least a minimum number of ens. members forecast precipitation. + n_ens_prec=1, # Minimum number of ens. members that forecast precip for the above-mentioned mask. + lien_criterion=True, # Specifies wheter the Lien criterion should be applied. + n_lien=10, # Minimum number of ensemble members that forecast precipitation for the Lien criterion (equals half the ens. members here) + prob_matching="iterative", # The type of probability matching used. + inflation_factor_bg=1.8, # Inflation factor of the background (NWC) covariance matrix. (this value indicates a faster convergence towards the NWP ensemble) + inflation_factor_obs=1.0, # Inflation factor of the observation (NWP) covariance matrix. + offset_bg=0.0, # Offset of the background (NWC) covariance matrix. + offset_obs=0.0, # Offset of the observation (NWP) covariance matrix. + nwp_hres_eff=14.0, # Effective horizontal resolution of the utilized NWP model (in km here). + sampling_prob_source="ensemble", # Computation method of the sampling probability for the probability matching. 'ensemble' computes this probability as the ratio between the ensemble differences. + use_accum_sampling_prob=False, # Specifies whether the current sampling probability should be used for the probability matching or a probability integrated over the previous forecast time. 
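    # ensure_full_nwp_weight is not set here, so its default (True) applies and the
    # combined forecast converges to the pure NWP forecast at the end of the horizon.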
+) + + +# Call the PCA EnKF method +blending_method = blending.get_method("pca_enkf") +precip_forecast = blending_method( + obs_precip=radar_precip, # Radar data in dBR + obs_timestamps=timestamps_radar, # Radar timestamps + nwp_precip=nwp_precip_rprj, # NWP in dBR + nwp_timestamps=timestamps_nwp, # NWP timestamps + velocity=velocity_radar, # Velocity vector field + forecast_horizon=120, # Forecast length (horizon) in minutes - only a short forecast horizon due to the limited dataset length stored here. + issuetime=date_radar, # Forecast issue time as datetime object + n_ens_members=20, # No. of ensemble members + precip_mask_dilation=1, # Dilation of precipitation mask in grid boxes + n_cascade_levels=6, # No. of cascade levels + precip_thr=log_thr_prec, # Precip threshold + norain_thr=log_thr_min, # Zerovalue + num_workers=4, # No. of parallel threads + noise_stddev_adj="auto", # Standard deviation adjustment + noise_method="ssft", # SSFT as noise method + enable_combination=True, # Enable combination + noise_kwargs={"win_size": (512, 512), "win_fun": "hann", "overlap": 0.5}, + extrap_kwargs={"interp_order": 3, "map_coordinates_mode": "nearest"}, + combination_kwargs=combination_kwargs, + filter_kwargs={"include_mean": True}, +) + +# Transform the data back into mm/h +precip_forecast, _ = converter(precip_forecast, radar_metadata) +radar_precip, _ = converter(radar_precip, radar_metadata) +nwp_precip, _ = converter(nwp_precip_rprj, nwp_metadata_rprj) + + +################################################################################ +# Visualize the output +# ~~~~~~~~~~~~~~~~~~~~ +# +# The NWP rainfall forecast has a much lower weight than the radar-based +# extrapolation # forecast at the issue time of the forecast (+0 min). Therefore, +# the first time steps consist mostly of the extrapolation. However, near the end +# of the forecast (+180 min), the NWP share in the blended forecast has become +# the more dominant contribution to the forecast and thus the forecast starts +# to resemble the NWP forecast. + +fig = plt.figure(figsize=(5, 12)) + +leadtimes_min = [15, 30, 45, 60, 90, 120] +n_leadtimes = len(leadtimes_min) +for n, leadtime in enumerate(leadtimes_min): + # Nowcast with blending into NWP + plt.subplot(n_leadtimes, 2, n * 2 + 1) + plot_precip_field( + precip_forecast[0, int(leadtime / timestep_radar) - 1, :, :], + geodata=radar_metadata, + title=f"Blended +{leadtime} min", + axis="off", + colorscale="STEPS-NL", + colorbar=False, + ) + + # Raw NWP forecast + plt.subplot(n_leadtimes, 2, n * 2 + 2) + plot_precip_field( + nwp_precip[int(leadtime / int(nwp_metadata_rprj["accutime"])) - 1, 0, :, :], + geodata=nwp_metadata_rprj, + title=f"NWP +{leadtime} min", + axis="off", + colorscale="STEPS-NL", + colorbar=False, + ) + + +################################################################################ +# References +# ~~~~~~~~~~ +# + +# Nerini, D., Foresti, L., Leuenberger, D., Robert, S., Germann, U. 2019. "A +# Reduced-Space Ensemble Kalman Filter Approach for Flow-Dependent Integration +# of Radar Extrapolation Nowcasts and NWP Precipitation Ensembles." Monthly +# Weather Review 147(3): 987-1006. https://doi.org/10.1175/MWR-D-18-0258.1. 
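################################################################################
# Optional check of the convergence towards NWP
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#
# A minimal sketch that reuses the variables defined above in this example: the
# domain- and ensemble-averaged rain rate of the blended forecast should move
# towards that of the NWP forecast with increasing lead time.

for leadtime in leadtimes_min:
    blended_mean = np.nanmean(precip_forecast[:, int(leadtime / timestep_radar) - 1])
    nwp_mean = np.nanmean(
        nwp_precip[int(leadtime / int(nwp_metadata_rprj["accutime"])) - 1]
    )
    print(
        f"+{leadtime:3d} min: blended mean {blended_mean:.3f} mm/h, "
        f"NWP mean {nwp_mean:.3f} mm/h"
    )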
diff --git a/examples/blended_forecast.py b/examples/steps_blended_forecast.py similarity index 100% rename from examples/blended_forecast.py rename to examples/steps_blended_forecast.py diff --git a/pysteps/blending/ens_kalman_filter_methods.py b/pysteps/blending/ens_kalman_filter_methods.py new file mode 100644 index 000000000..8f58dac1d --- /dev/null +++ b/pysteps/blending/ens_kalman_filter_methods.py @@ -0,0 +1,635 @@ +# -*- coding: utf-8 -*- +""" +pysteps.blending.ens_kalman_filter_methods +============================================= +Methods to calculate the ensemble Kalman filter based correction methods for blending +between nowcast and NWP. +The core of the method occurs in the EnsembleKalmanFilter class. The specific method +to use this core class can be selected. Currently, only the implementation of the +ensemble Kalman filter from :cite:`Nerini2019MWR` is available. + +Additional keyword arguments for the ensemble Kalman filter are: + +n_tapering: int, (0) + Number of grid boxes/principal components of the ensemble forecast for which + the covariance matrix is computed. Defaults to 0. +non_precip_mask: bool, (True) + Flag to specify whether the computation should be truncated on grid boxes where at + least a minimum number (configurable) of ensemble members forecast precipitation. + Defaults to True. +n_ens_prec: int, (1) + Minimum number of ensemble members that forecast precipitation for the above + mentioned mask. Defaults to 1. +lien_criterion: bool, (True) + Flag to specify whether Lien criterion (Lien et al., 2013) should be applied for + the computation of the update step within the ensemble Kalman filter. Defaults to + True. +n_lien: int, (n_ens_members/2) + Minimum number of ensemble members that forecast precipitation for the Lien + criterion. Defaults to half of the ensemble members. +prob_matching: str, {'iterative','post_forecast','none'} + Specify the probability matching method that should be applied as an additional + processing step of the forecast computation. Defaults to 'iterative'. +inflation_factor_bg: float, (1.0) + Inflation factor of the background (NWC) covariance matrix. This factor increases + the covariances of the background ensemble and, thus, supports a faster convergence + towards the observation ensemble (NWP). Defaults to 1.0. +inflation_factor_obs: float, (1.0) + Inflation factor of the observation (NWP) covariance matrix. This factor increases + the covariances of the observation ensemble (NWP) and, thus, supports a slower + convergence towards the observation ensemble. Defaults to 1.0. +offset_bg: float, (0.0) + Offset of the background (NWC) covariance matrix. This offset supports a faster + convergence towards the observation ensemble (NWP) by linearly increasing all + elements of the background covariance matrix. Defaults to 0.0. +offset_obs: float, (0.0) + Offset of the observation (NWP) covariance matrix. This offset supports a slower + convergence towards the observation ensemble (NWP) by linearly incerasing all + elements of the observation covariance matrix. Defaults to 0.0. +nwp_hres_eff: float + Effective horizontal resolution of the utilized NWP model. +sampling_prob_source: str, {'ensemble','explained_var'} + Computation method of the sampling probability for the probability matching. + 'ensemble' computes this probability as the ratio between the ensemble differences + of analysis_ensemble - background_ensemble and observation_ensemble - background_ensemble. 
+ 'explained_var' uses the sum of the Kalman gain weighted by the explained variance ratio. +use_accum_sampling_prob: bool, (False) + Flag to specify whether the current sampling probability should be used for the + probability matching or a probability integrated over the previous forecast time. + Defaults to True. +ensure_full_nwp_weight: bool, (True) + Flag to specify whether the end of the combination should be represent the pure NWP + forecast. Defaults to True. +""" + + +import numpy as np + +from pysteps import utils +from pysteps.postprocessing import probmatching + +try: + import dask + + DASK_IMPORTED = True +except ImportError: + DASK_IMPORTED = False + + +class EnsembleKalmanFilter: + + def __init__(self, config, params): + + self._config = config + + # Check for combination kwargs in params + self.__n_tapering = params.combination_kwargs.get("n_tapering", 0) + self.__non_precip_mask = params.combination_kwargs.get("non_precip_mask", True) + self.__n_ens_prec = params.combination_kwargs.get("n_ens_prec", 1) + self.__lien_criterion = params.combination_kwargs.get("lien_criterion", True) + self.__n_lien = params.combination_kwargs.get( + "n_lien", self._config.n_ens_members // 2 + ) + + print("Initialize ensemble Kalman filter") + print("=================================") + print("") + + print(f"Non-tapered diagonals: {self.__n_tapering}") + print(f"Non precip mask: {self.__non_precip_mask}") + print(f"No. ens mems with precipitation: {self.__n_ens_prec}") + print(f"Lien Criterion: {self.__lien_criterion}") + print(f"No. ens mems with precip (Lien): {self.__n_lien}") + print("") + + def update( + self, + background_ensemble: np.ndarray, + observation_ensemble: np.ndarray, + inflation_factor_bg: float, + inflation_factor_obs: float, + offset_bg: float, + offset_obs: float, + background_ensemble_valid_lien: np.ndarray | None = None, + observation_ensemble_valid_lien: np.ndarray | None = None, + ): + """ + Compute the ensemble Kalman filter update step. + + Parameters + ---------- + background_ensemble: np.ndarray + Two-dimensional array of shape (n_ens, n_pc) containing the background + ensemble that corresponds to the Nowcast ensemble forecast. + observation_ensemble: np.ndarray + Two-dimensional array of shape (n_ens, n_pc) containing the observations + that correspond to the NWP ensemble forecast. + inflation_factor_bg: float + Inflation factor of the background ensemble covariance matrix. + inflation_factor_obs: float + Inflation factor of the observation covariance matrix. + offset_bg: float + Offset of the background ensemble covariance matrix. + offset_obs: float + Offset of the observation covariance matrix. + + Other Parameters + ---------------- + background_ensemble_valid_lien: np.ndarray + Two-dimensional array of shape (n_ens, n_pc) containing the background + ensemble that consists only of grid boxes at which the Lien criterion is + satisfied. + observation_ensemble_valid_lien: np.ndarray + Two-dimensional array of shape (n_ens, n_pc) containing the observations + that consists only of grid boxes at which the Lien criterion is satisfied. + + Returns + ------- + analysis_ensemble: np.ndarray + Two-dimensional array of shape (n_ens, n_pc) containing the updated + analysis matrix. + """ + + # If the masked background and observation arrays are given, compute the + # covariance matrices P and R only on these values. + if ( + background_ensemble_valid_lien is not None + and observation_ensemble_valid_lien is not None + ): + # Equation 13 in Nerini et al. 
(2019) + P = self.get_covariance_matrix( + background_ensemble_valid_lien, + inflation_factor=inflation_factor_bg, + offset=offset_bg, + ) + # Equation 14 in Nerini et al. (2019) + R = self.get_covariance_matrix( + observation_ensemble_valid_lien, + inflation_factor=inflation_factor_obs, + offset=offset_obs, + ) + # Otherwise use the complete arrays. + else: + # Equation 13 in Nerini et al. (2019) + P = self.get_covariance_matrix( + background_ensemble, + inflation_factor=inflation_factor_bg, + offset=offset_bg, + ) + # Equation 14 in Nerini et al. (2019) + R = self.get_covariance_matrix( + observation_ensemble, + inflation_factor=inflation_factor_obs, + offset=offset_obs, + ) + + # Estimate the Kalman gain (eq. 15 in Nerini et al., 2019) + self.K = np.dot(P, np.linalg.inv(P + R)) + + # Update the background ensemble (eq. 16 in Nerini et al., 2019) + analysis_ensemble = background_ensemble.T + np.dot( + self.K, (observation_ensemble - background_ensemble).T + ) + + return analysis_ensemble + + def get_covariance_matrix( + self, forecast_array: np.ndarray, inflation_factor: float, offset: float + ): + """ + Compute the covariance matrix of a given ensemble forecast along the grid boxes + or principal components as it is done by Eq. 13 and 14 in Nerini et al., 2019. + + Parameters + ---------- + forecast_array: np.ndarray + Two-dimensional array of shape (n_ens, n_pc) containing an ensemble + forecast of one lead time. + inflation_factor: float + Factor to increase the covariance and therefore the ensemble spread. + offset: float + Offset to shift the covariance. + + Returns + ------- + Cov: np.ndarray + Two-dimensional array of shape (n_pc, n_pc) containg the covariance matrix + of the given ensemble forecast. + """ + + # Compute the ensemble mean + ensemble_mean = np.mean(forecast_array, axis=0) + # Center the ensemble forecast and multiply with the given inflation factor + centered_ensemble = (forecast_array - ensemble_mean) * inflation_factor + # Compute the covariance matrix and add the respective offset and filter + # unwanted diagonals, respectively. + Cov = ( + 1 + / (forecast_array.shape[0] - 1) + * np.dot(centered_ensemble.T, centered_ensemble) + + offset + ) * self.get_tapering(forecast_array.shape[1]) + + return Cov + + def get_tapering(self, n: int): + """ + Create a window function to clip unwanted diagonals of the covariance matrix. + + Parameters + ---------- + n: integer + Number of grid boxes/principal components of the ensemble forecast for that + the covariance matrix is computed. + + Returns + ------- + window_function: np.ndarray + Two-dimensional array of shape (n_pc, n_pc) containing the window function + to filter unwanted diagonals of the covariance matrix. + """ + + # Create an n-dimensional I-matrix as basis of the window function + window_function = np.eye(n) + # Get the weightings of a hanning window function with respect to the number of + # diagonals that on want to keep + hanning_values = np.hanning(self.__n_tapering * 2 + 1)[ + (self.__n_tapering + 1) : + ] + + # Add the respective values to I + for d in range(self.__n_tapering): + window_function += np.diag(np.ones(n - d - 1) * hanning_values[d], k=d + 1) + window_function += np.diag(np.ones(n - d - 1) * hanning_values[d], k=-d - 1) + + return window_function + + def get_precipitation_mask(self, forecast_array: np.ndarray): + """ + Create the set of grid boxes where at least a minimum number (configurable) + of ensemble members forecast precipitation. 
+ + Parameters + ---------- + forecast_array: np.ndarray + Two-dimensional array of shape (n_ens, n_grid) containg the ensemble + forecast for one lead time. + + Returns + ------- + idx_prec: np.ndarray + One-dimensional array of shape (n_grid) that is set to True if the minimum + number of ensemble members predict precipitation. + """ + + # Check the number of ensemble members forecast precipitation at each grid box. + forecast_array_sum = np.sum( + forecast_array >= self._config.precip_threshold, axis=0 + ) + + # If the masking of areas without precipitation is requested, mask grid boxes + # where less ensemble members predict precipitation than the set limit n_ens_prec. + if self.__non_precip_mask == True: + idx_prec = forecast_array_sum >= self.__n_ens_prec + # Else, set all to True. + else: + idx_prec = np.ones_like(forecast_array_sum).astype(bool) + + return idx_prec + + def get_lien_criterion(self, nwc_ensemble: np.ndarray, nwp_ensemble: np.ndarray): + """ + Create the set of grid boxes where the Lien criterion is satisfied (Lien et + al., 2013) and thus, at least half (configurable) of the ensemble members of + each forecast (Nowcast and NWP) predict precipitation. + + Parameters + ---------- + nwc_ensemble: np.ndarray + Two-dimensional array (n_ens, n_grid) containing the nowcast ensemble + forecast for one lead time. + nwp_ensemble: np.ndarray + Two-dimensional array (n_ens, n_grid) containg the NWP ensemble forecast + for one lead time. + + Returns + ------- + idx_lien: np.ndarray + One-dimensional array of shape (n_grid) that is set to True at grid boxes + where the Lien criterion is satisfied. + """ + + # Check the number of ensemble members forecast precipitation at each grid box. + nwc_ensemble_sum = np.sum(nwc_ensemble >= self._config.precip_threshold, axis=0) + nwp_ensemble_sum = np.sum(nwp_ensemble >= self._config.precip_threshold, axis=0) + + # If the masking of areas without precipitation is requested, mask grid boxes + # where less ensemble members predict precipitation than the set limit of n_ens_fc_prec. + if self.__lien_criterion: + idx_lien = np.logical_and( + nwc_ensemble_sum >= self.__n_lien, nwp_ensemble_sum >= self.__n_lien + ) + # Else, set all to True. + else: + idx_lien = np.ones_like(nwc_ensemble_sum).astype(bool) + + return idx_lien + + def get_weighting_for_probability_matching( + self, + background_ensemble: np.ndarray, + analysis_ensemble: np.ndarray, + observation_ensemble: np.ndarray, + ): + """ + Compute the weighting between background (nowcast) and observation (NWP) ensemble + that results to the updated analysis ensemble in physical space for an optional + probability matching. See equation 17 in Nerini et al. (2019). + + Parameters + ---------- + background_ensemble: np.ndarray + Two-dimensional array of shape (n_ens, n_grid) containing the background + ensemble (Original nowcast). + analysis_ensemble: np.ndarray + Two-dimensional array of shape (n_ens, n_grid) containing the updated + analysis ensemble. + observation_ensemble: np.ndarray + Two-dimensional array of shape (n_ens, n_grid) containing the observation + ensemble (NWP). + + Returns + ------- + prob_matching_weight: float + A weighting of which elements of the input ensemble contributed to the + updated analysis ensemble with respect to observation_ensemble. Therefore, 0 + means that the contribution comes entirely from the background_ensemble (the + original nowcast). 1 means that the contribution comes entirely from the + observation_ensemble (the NWP forecast). 
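        For example, if at every grid box the analysis ensemble lies a quarter of the
        way from the background ensemble towards the observation ensemble (and the two
        input ensembles differ everywhere), the returned weight is 0.25.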
+ """ + + # Compute the sum of differences between analysis_ensemble and background_ensemble + # as well as observation_ensemble and background_ensemble along the grid boxes. + w1 = np.sum(analysis_ensemble - background_ensemble, axis=0) + w2 = np.sum(observation_ensemble - background_ensemble, axis=0) + + # Check for infinitesimal differences between w1 and w2 as well as 0. + w_close = np.isclose(w1, w2) + w_zero = np.logical_and(w_close, np.isclose(w2, 0.0)) + + # Compute the fraction of w1 and w2 and set values on grid boxes marked by + # w_close or w_zero to 1 and 0, respectively. + prob_matching_weight = np.zeros_like(w1) + prob_matching_weight[~w_zero] = w1[~w_zero] / w2[~w_zero] + prob_matching_weight[w_close] = 1.0 + + # Even now we have at some grid boxes weights outside the range between 0 + # and 1. Therefore, we leave them out in the calculation of the averaged + # weighting. + valid_values = np.logical_and( + prob_matching_weight >= 0.0, prob_matching_weight <= 1.0 + ) + prob_matching_weight = np.nanmean(prob_matching_weight[valid_values]) + + # If there is no finite prob_matching_weight, we are switching to the NWP + if not np.isfinite(prob_matching_weight): + prob_matching_weight = 1.0 + + return prob_matching_weight + + +class MaskedEnKF(EnsembleKalmanFilter): + + def __init__(self, config, params): + + EnsembleKalmanFilter.__init__(self, config, params) + self.__params = params + + # Read arguments from combination kwargs or set standard values if kwargs not + # given + self.__iterative_prob_matching = self.__params.combination_kwargs.get( + "iterative_prob_matching", True + ) + self.__inflation_factor_bg = self.__params.combination_kwargs.get( + "inflation_factor_bg", 1.0 + ) + self.__inflation_factor_obs = self.__params.combination_kwargs.get( + "inflation_factor_obs", 1.0 + ) + self.__offset_bg = self.__params.combination_kwargs.get("offset_bg", 0.0) + self.__offset_obs = self.__params.combination_kwargs.get("offset_obs", 0.0) + self.__sampling_prob_source = self.__params.combination_kwargs.get( + "sampling_prob_source", "ensemble" + ) + self.__use_accum_sampling_prob = self.__params.combination_kwargs.get( + "use_accum_sampling_prob", False + ) + self.__ensure_full_nwp_weight = self.__params.combination_kwargs.get( + "ensure_full_nwp_weight", True + ) + + self.__sampling_probability = 0.0 + self.__accumulated_sampling_prob = 0.0 + self.__degradation_timestep = 0.2 + self.__inflation_factor_obs_tmp = 1.0 + + print("Initialize masked ensemble Kalman filter") + print("========================================") + print("") + + print(f"Iterative probability matching: {self.__iterative_prob_matching}") + print(f"Background inflation factor: {self.__inflation_factor_bg}") + print(f"Observation inflation factor: {self.__inflation_factor_obs}") + print(f"Background offset: {self.__offset_bg}") + print(f"Observation offset: {self.__offset_obs}") + print(f"Sampling probability source: {self.__sampling_prob_source}") + print(f"Use accum. sampling probability: {self.__use_accum_sampling_prob}") + print(f"Ensure full NWP weight: {self.__ensure_full_nwp_weight}") + + return + + def correct_step( + self, background_ensemble, observation_ensemble, resampled_forecast + ): + """ + Prepare input ensembles of Nowcast and NWP for the ensemble Kalman filter + update step. + + Parameters + ---------- + background_ensemble: np.ndarray + Three-dimensional array of shape (n_ens, m, n) containing the background + (Nowcast) ensemble forecast for one timestep. 
This data is used as background + ensemble in the ensemble Kalman filter. + observation_ensemble: np.ndarray + Three-dimensional array of shape (n_ens, m, n) containing the observation + (NWP) ensemble forecast for one timestep. This data is used as observation + ensemble in the ensemble Kalman filter. + resampled_forecast: np.ndarray + Three-dimensional array of shape (n_ens, m, n) containing the resampled (post- + processed) ensemble forecast for one timestep. + + Returns + ------- + analysis_ensemble: np.ndarray + Three-dimensional array of shape (n_ens, m, n) containing the Nowcast + ensemble forecast corrected by NWP ensemble data. + resampled_forecast: np.ndarray + Three-dimensional array of shape (n_ens, m, n) containing the resampled (post- + processed) ensemble forecast for one timestep. + """ + + # Get indices with predicted precipitation. + idx_prec = np.logical_or( + self.get_precipitation_mask(background_ensemble), + self.get_precipitation_mask(observation_ensemble), + ) + + # Get indices with satisfied Lien criterion and truncate the array onto the + # precipitation area. + idx_lien = self.get_lien_criterion(background_ensemble, observation_ensemble)[ + idx_prec + ] + + # Stack both ensemble forecasts and truncate them onto the precipitation area. + forecast_ens_stacked = np.vstack((background_ensemble, observation_ensemble))[ + :, idx_prec + ] + + # Remove possible non-finite values + forecast_ens_stacked[~np.isfinite(forecast_ens_stacked)] = ( + self._config.norain_threshold + ) + + # Check whether there are more rainy grid boxes as two times the ensemble + # members + if np.sum(idx_prec) <= forecast_ens_stacked.shape[0]: + # If this is the case, the NWP ensemble forecast is returned + return observation_ensemble + + # Transform both ensemble forecasts into the PC space. + kwargs = {"n_components": forecast_ens_stacked.shape[0], "svd_solver": "full"} + forecast_ens_stacked_pc, pca_params = utils.pca.pca_transform( + forecast_ens=forecast_ens_stacked, get_params=True, **kwargs + ) + + # And do that transformation also for the Lien criterion masked values. + forecast_ens_lien_pc = utils.pca.pca_transform( + forecast_ens=forecast_ens_stacked, + mask=idx_lien, + pca_params=pca_params, + **kwargs, + ) + + if not np.isclose(self.__accumulated_sampling_prob, 1.0, rtol=1e-2): + self.__inflation_factor_obs_tmp = ( + self.__inflation_factor_obs + - self.__accumulated_sampling_prob * (self.__inflation_factor_obs - 1.0) + ) + else: + self.__inflation_factor_obs_tmp = np.cos(self.__degradation_timestep) + self.__degradation_timestep += 0.2 + + # Get the updated background ensemble (Nowcast ensemble) in PC space. + analysis_ensemble_pc = self.update( + background_ensemble=forecast_ens_stacked_pc[: background_ensemble.shape[0]], + observation_ensemble=forecast_ens_stacked_pc[ + background_ensemble.shape[0] : + ], + inflation_factor_bg=self.__inflation_factor_bg, + inflation_factor_obs=self.__inflation_factor_obs_tmp, + offset_bg=self.__offset_bg, + offset_obs=self.__offset_obs, + background_ensemble_valid_lien=forecast_ens_lien_pc[ + : background_ensemble.shape[0] + ], + observation_ensemble_valid_lien=forecast_ens_lien_pc[ + background_ensemble.shape[0] : + ], + ) + + # Transform the analysis ensemble back into physical space. + analysis_ensemble = utils.pca.pca_backtransform( + forecast_ens_pc=analysis_ensemble_pc.T, pca_params=pca_params + ) + + # Get the sampling probability either based on the ensembles... 
+ if self.__sampling_prob_source == "ensemble": + sampling_probability_single_step = ( + self.get_weighting_for_probability_matching( + background_ensemble=forecast_ens_stacked[ + : background_ensemble.shape[0] + ][:, idx_lien], + analysis_ensemble=analysis_ensemble[:, idx_lien], + observation_ensemble=forecast_ens_stacked[ + background_ensemble.shape[0] : + ][:, idx_lien], + ) + ) + # ...or based on the explained variance weighted Kalman gain. + elif self.__sampling_prob_source == "explained_var": + sampling_probability_single_step = np.sum( + np.diag(self.K) * pca_params["explained_variance"] + ) + else: + raise ValueError( + f"Sampling probability source should be either 'ensemble' or 'explained_var', but is {self.__sampling_prob_source}!" + ) + + # Adjust sampling probability when the accumulation flag is set + if self.__use_accum_sampling_prob == True: + self.__sampling_probability = ( + 1 - sampling_probability_single_step + ) * self.__sampling_probability + sampling_probability_single_step + else: + self.__sampling_probability = sampling_probability_single_step + + # The accumulation is divided for cases one would not use the accumulated + # sampling probability for the probability matching, but still wants to have + # the pure NWP forecast at the end of a combined forecast. + if self.__ensure_full_nwp_weight == True: + self.__accumulated_sampling_prob = ( + 1 - sampling_probability_single_step + ) * self.__accumulated_sampling_prob + sampling_probability_single_step + + print(f"Sampling probability: {self.__sampling_probability:1.4f}") + + # Apply probability matching to the analysis ensemble + if self.__iterative_prob_matching: + + def worker(j): + # Get the combined distribution based on the input weight + resampled_forecast[j] = probmatching.resample_distributions( + first_array=background_ensemble[j], + second_array=observation_ensemble[j], + probability_first_array=1 - self.__sampling_probability, + ).reshape(self.__params.len_y, self.__params.len_x) + + dask_worker_collection = [] + + if DASK_IMPORTED and self._config.n_ens_members > 1: + for j in range(self._config.n_ens_members): + dask_worker_collection.append(dask.delayed(worker)(j)) + dask.compute( + *dask_worker_collection, + num_workers=self.__params.num_ensemble_workers, + ) + else: + for j in range(self._config.n_ens_members): + worker(j) + + dask_worker_collection = None + + # Set analysis ensemble into the Nowcast ensemble + background_ensemble[:, idx_prec] = analysis_ensemble + + return background_ensemble, resampled_forecast + + def get_inflation_factor_obs(self): + """ + Helper function for ensuring the full NWP weight at the end of a combined + forecast. If an accumulated sampling probability of 1 is reached, the + observation inflation factor is reduced to 0 by a cosine function. 
+ """ + + return self.__inflation_factor_obs_tmp
diff --git a/pysteps/blending/interface.py b/pysteps/blending/interface.py index 226b15a10..50fede386 100644 --- a/pysteps/blending/interface.py +++ b/pysteps/blending/interface.py @@ -15,11 +15,13 @@ from pysteps.blending import linear_blending from pysteps.blending import steps +from pysteps.blending import pca_ens_kalman_filter _blending_methods = dict() _blending_methods["linear_blending"] = linear_blending.forecast _blending_methods["salient_blending"] = partial(linear_blending.forecast, saliency=True) _blending_methods["steps"] = steps.forecast +_blending_methods["pca_enkf"] = pca_ens_kalman_filter.forecast def get_method(name): @@ -47,6 +49,9 @@ def get_method(name): | | :cite:`SPN2013`. The blending weights approach | | | currently follows :cite:`BPS2006`. | +------------------+------------------------------------------------------+ + | pca_enkf | the reduced-space EnKF combination method described | + | | in :cite:`Nerini2019MWR`. | + +------------------+------------------------------------------------------+ """ if isinstance(name, str): name = name.lower()
diff --git a/pysteps/blending/pca_ens_kalman_filter.py b/pysteps/blending/pca_ens_kalman_filter.py new file mode 100644 index 000000000..b8f6879e2 --- /dev/null +++ b/pysteps/blending/pca_ens_kalman_filter.py @@ -0,0 +1,1674 @@ +# -*- coding: utf-8 -*- +""" +pysteps.blending.pca_ens_kalman_filter +====================================== + +Implementation of the reduced-space ensemble Kalman filter method described in +:cite:`Nerini2019MWR`. The nowcast is iteratively corrected by NWP data utilizing +an ensemble Kalman filter in PCA space. The reduced-space ensemble Kalman filter +method consists of the following main steps: + + Initialization Step + =================== + #. Set the radar rainfall fields in a Lagrangian space. + #. Perform the cascade decomposition for the input radar rainfall fields. + #. Estimate AR parameters for the extrapolation nowcast and noise cascade. + #. Initialize the noise method and precompute a set of noise fields. + #. Initialize forecast models equal to the number of ensemble members. + #. Initialize the ensemble Kalman filter method. + #. Start the forecasting loop: + + Forecast Step + ============= + #. Decompose rainfall forecast field of the previous timestep. + #. Update the common precipitation mask of Nowcast and NWP fields for noise + imprint. + #. Iterate AR model. + #. Recompose rainfall forecast field. + #. (optional) Apply probability matching. + #. Extrapolate the recomposed rainfall field to the current timestep. + + Correction Step + =============== + #. Get grid boxes where rainfall is forecast. + #. Reduce Nowcast and NWP ensemble onto these grid boxes and apply principal + component analysis to further reduce the dimensionality. + #. Apply update step of ensemble Kalman filter. + + #. Set no data values in final forecast fields. + #. The original approach iterates between the forecast and correction steps. However, to + reduce smoothing effects in this implementation, a pure forecast step is computed at the first forecast time step and, afterwards, the method alternates between the correction and forecast steps. The mentioned smoothing effects arise due to the NWP effective horizontal resolution and due to the spatial decomposition at each forecast time step. 
+""" +import time +import datetime +from copy import deepcopy + +import numpy as np +from scipy.ndimage import ( + binary_dilation, + gaussian_filter, +) + +from pysteps import blending, cascade, extrapolation, noise, utils +from pysteps.nowcasts import utils as nowcast_utils +from pysteps.timeseries import autoregression, correlation +from pysteps.blending.ens_kalman_filter_methods import MaskedEnKF +from pysteps.postprocessing import probmatching +from pysteps.utils.check_norain import check_norain + +try: + import dask + + DASK_IMPORTED = True +except ImportError: + DASK_IMPORTED = False + +from dataclasses import dataclass, field +from typing import Any, Callable + + +@dataclass(frozen=True) +class EnKFCombinationConfig: + """ + Parameters + ---------- + + n_ens_members: int + The number of ensemble members to generate. This number should always be + equal to the number of NWP ensemble members / number of NWP models. + n_cascade_levels: int + The number of cascade levels to use. Defaults to 6, + see issue #385 on GitHub. + precip_threshold: float + Specifies the threshold value for minimum observable precipitation + intensity. + norain_threshold: float + Specifies the threshold value for the fraction of rainy (see above) pixels + in the radar rainfall field below which we consider there to be no rain. + Depends on the amount of clutter typically present. + precip_mask_dilation: int + Number of grid boxes by which the precipitation mask should be extended per + timestep. + extrapolation_method: str + Name of the extrapolation method to use. See the documentation of + :py:mod:`pysteps.extrapolation.interface`. + decomposition_method: str, {'fft'} + Name of the cascade decomposition method to use. See the documentation + of :py:mod:`pysteps.cascade.interface`. + bandpass_filter_method: str, {'gaussian', 'uniform'} + Name of the bandpass filter method to use with the cascade decomposition. + See the documentation of :py:mod:`pysteps.cascade.interface`. + noise_method: str, {'parametric','nonparametric','ssft','nested',None} + Name of the noise generator to use for perturbating the precipitation + field. See the documentation of :py:mod:`pysteps.noise.interface`. If set to + None, no noise is generated. + enkf_method: str, {'masked_enkf'} + Name of the ensemble Kalman filter method to use for the correction step. + Currently, only 'masked_enkf' is implemented. This method corresponds to the + reduced-space ensemble Kalman filter method described by Nerini et al., 2019. + enable_combination: bool + Flag to specify whether the correction step or only the forecast steps should + be processed. + noise_stddev_adj: str, {'auto','fixed',None} + Optional adjustment for the standard deviations of the noise fields added + to each cascade level. This is done to compensate incorrect std. dev. + estimates of casace levels due to presence of no-rain areas. 'auto'=use + the method implemented in :py:func:`pysteps.noise.utils. + compute_noise_stddev_adjs`. + 'fixed'= use the formula given in :cite:`BPS2006` (eq. 6), None=disable + noise std. dev adjustment. + ar_order: int + The order of the autoregressive model to use. Currently, only AR(1) is + implemented. + seed: int + Optional seed number for the random generators. + num_workers: int + The number of workers to use for parallel computation. Applicable if dask + is enabled or pyFFTW is used for computing the FFT. When num_workers>1, it + is advisable to disable OpenMP by setting the environment variable + OMP_NUM_THREADS to 1. 
This avoids slowdown caused by too many simultaneous + threads. + fft_method: str + A string defining the FFT method to use (see FFT methods in + :py:func:`pysteps.utils.interface.get_method`). + Defaults to 'numpy' for compatibility reasons. If pyFFTW is installed, + the recommended method is 'pyfftw'. + domain: str, {"spatial", "spectral"} + If "spatial", all computations are done in the spatial domain (the + classical STEPS model). If "spectral", the AR(2) models and stochastic + perturbations are applied directly in the spectral domain to reduce + memory footprint and improve performance :cite:`PCH2019b`. + extrapolation_kwargs: dict + Optional dictionary containing keyword arguments for the extrapolation + method. See the documentation of :py:func:`pysteps.extrapolation.interface`. + filter_kwargs: dict + Optional dictionary containing keyword arguments for the filter method. + See the documentation of :py:mod:`pysteps.cascade.bandpass_filters`. + noise_kwargs: dict + Optional dictionary containing keyword arguments for the initializer of + the noise generator. See the documentation of :py:mod:`pysteps.noise. + fftgenerators`. + combination_kwargs: dict + Optional dictionary containing keyword arguments for the initializer of the + correction step. Options are: {nwp_hres_eff: float, the effective horizontal + resolution of the utilized NWP model; prob_matching: str, specifies the + probability matching method that should be applied}. See the documentation of + :py:mod:`pysteps.blending.ens_kalman_filter_methods`. + measure_time: bool + If set to True, measure, print and return the computation time. + callback: function, optional + Optional function that is called after computation of each time step of + the nowcast. The function takes one argument: a three-dimensional array + of shape (n_ens_members,h,w), where h and w are the height and width + of the input field precip, respectively. This can be used, for instance, + writing the outputs into files. + return_output: bool + Set to False to disable returning the outputs as numpy arrays. This can + save memory if the intermediate results are written to output files using + the callback function. (Call back function is currently not implemented.) + n_noise_fields: int + Number of precomputed noise fields. A number of 30 is adequate to generate + sufficient spread in the Nowcast. 
+ """ + + n_ens_members: int + n_cascade_levels: int + precip_threshold: float | None + norain_threshold: float + precip_mask_dilation: int + extrapolation_method: str + decomposition_method: str + bandpass_filter_method: str + noise_method: str | None + enkf_method: str | None + enable_combination: bool + noise_stddev_adj: str | None + ar_order: int + seed: int | None + num_workers: int + fft_method: str + domain: str + extrapolation_kwargs: dict[str, Any] = field(default_factory=dict) + filter_kwargs: dict[str, Any] = field(default_factory=dict) + noise_kwargs: dict[str, Any] = field(default_factory=dict) + combination_kwargs: dict[str, Any] = field(default_factory=dict) + measure_time: bool = False + callback: Any | None = None + return_output: bool = True + n_noise_fields: int = 30 + + +@dataclass +class EnKFCombinationParams: + noise_std_coeffs: np.ndarray | None = None + bandpass_filter: Any | None = None + fft: Any | None = None + perturbation_generator: Callable[..., np.ndarray] | None = None + noise_generator: Callable[..., np.ndarray] | None = None + PHI: np.ndarray | None = None + extrapolation_method: Callable[..., Any] | None = None + decomposition_method: Callable[..., dict] | None = None + recomposition_method: Callable[..., np.ndarray] | None = None + fft_objs: list[Any] = field(default_factory=list) + xy_coordinates: np.ndarray | None = None + precip_threshold: float | None = None + mask_threshold: np.ndarray | None = None + num_ensemble_workers: int | None = None + domain_mask: np.ndarray | None = None + extrapolation_kwargs: dict | None = None + filter_kwargs: dict | None = None + noise_kwargs: dict | None = None + combination_kwargs: dict | None = None + len_y: int | None = None + len_x: int | None = None + no_rain_case: str | None = None + + +class ForecastInitialization: + """ + Class to bundle the steps necessary for the forecast initialization. + These steps are: + + #. Set the radar rainfall fields in a Lagrangian space. + #. Perform the cascade decomposition for the input radar rainfall fields. + #. Estimate AR parameters for the extrapolation nowcast and noise cascade. + #. Initialize the noise method and precompute a set of noise fields. + """ + + def __init__( + self, + enkf_combination_config: EnKFCombinationConfig, + enkf_combination_params: EnKFCombinationParams, + obs_precip: np.ndarray, + obs_velocity: np.ndarray, + ): + self.__config = enkf_combination_config + self.__params = enkf_combination_params + + self.__obs_precip = obs_precip + self.__obs_velocity = obs_velocity + + # Measure time for initialization. + if self.__config.measure_time: + self.__start_time_init = time.time() + + self.__initialize_nowcast_components() + + self.__prepare_radar_data_and_ar_parameters() + + self.__initialize_noise() + + self.__initialize_noise_field_pool() + + if self.__config.measure_time: + print( + f"Elapsed time for initialization: {time.time() - self.__start_time_init}" + ) + + # Initialize FFT, bandpass filters, decomposition methods, and extrapolation + # method. 
+ def __initialize_nowcast_components(self): + + # Initialize number of ensemble workers + self.__params.num_ensemble_workers = min( + self.__config.n_ens_members, + self.__config.num_workers, + ) + + # Extract the spatial dimensions of the observed precipitation (x, y) + self.__params.len_y, self.__params.len_x = self.__obs_precip.shape[1:] + + # Generate the mesh grid for spatial coordinates + x_values, y_values = np.meshgrid( + np.arange(self.__params.len_x), + np.arange(self.__params.len_y), + ) + self.__params.xy_coordinates = np.stack([x_values, y_values]) + + # Initialize FFT method + self.__params.fft = utils.get_method( + self.__config.fft_method, + shape=( + self.__params.len_y, + self.__params.len_x, + ), + n_threads=self.__config.num_workers, + ) + + # Initialize the band-pass filter for the cascade decomposition + filter_method = cascade.get_method(self.__config.bandpass_filter_method) + self.__params.bandpass_filter = filter_method( + (self.__params.len_y, self.__params.len_x), + self.__config.n_cascade_levels, + **(self.__params.filter_kwargs or {}), + ) + + # Get the decomposition method (e.g., FFT) + ( + self.__params.decomposition_method, + self.__params.recomposition_method, + ) = cascade.get_method(self.__config.decomposition_method) + + # Get the extrapolation method (e.g., semilagrangian) + self.__params.extrapolation_method = extrapolation.get_method( + self.__config.extrapolation_method + ) + + # Determine the domain mask from non-finite values in the precipitation data + self.__params.domain_mask = np.logical_or.reduce( + [ + ~np.isfinite(self.__obs_precip[i, :]) + for i in range(self.__obs_precip.shape[0]) + ] + ) + + print("Nowcast components initialized successfully.") + + # Prepare radar precipitation fields for nowcasting and estimate the AR + # parameters. + def __prepare_radar_data_and_ar_parameters(self): + """ + Prepare radar and NWP precipitation fields for nowcasting. + This includes generating a threshold mask, transforming fields into + Lagrangian coordinates, cascade decomposing/recomposing, and checking + for zero-precip areas. The results are stored in class attributes. + + Estimate autoregressive (AR) parameters for the radar rainfall field. If + precipitation exists, compute temporal auto-correlations; otherwise, use + predefined climatological values. Adjust coefficients if necessary and + estimate AR model parameters. + """ + + # Start with the radar rainfall fields. We want the fields in a Lagrangian + # space. Advect the previous precipitation fields to the same position with + # the most recent one (i.e. transform them into the Lagrangian coordinates). 
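        # Working in Lagrangian coordinates removes the advection component from the
        # time series, so the temporal auto-correlations estimated below describe the
        # growth and decay of precipitation features rather than their motion.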
+ self.__params.extrapolation_kwargs["xy_coords"] = self.__params.xy_coordinates + self.__params.extrapolation_kwargs["outval"] = ( + self.__config.precip_threshold - 2.0 + ) + res = [] + + def transform_to_lagrangian(precip, i): + return self.__params.extrapolation_method( + precip[i, :, :], + self.__obs_velocity, + self.__config.ar_order - i, + allow_nonfinite_values=True, + **self.__params.extrapolation_kwargs.copy(), + )[-1] + + if not DASK_IMPORTED: + # Process each earlier precipitation field directly + for i in range(self.__config.ar_order): + self.__obs_precip[i, :, :] = transform_to_lagrangian( + self.__obs_precip, i + ) + else: + # Use Dask delayed for parallelization if DASK_IMPORTED is True + for i in range(self.__config.ar_order): + res.append(dask.delayed(transform_to_lagrangian)(self.__obs_precip, i)) + num_workers_ = ( + len(res) + if self.__config.num_workers > len(res) + else self.__config.num_workers + ) + self.__obs_precip = np.stack( + list(dask.compute(*res, num_workers=num_workers_)) + + [self.__obs_precip[-1, :, :]] + ) + + # Mask the observations + obs_mask = np.logical_or( + ~np.isfinite(self.__obs_precip), + self.__obs_precip < self.__config.precip_threshold, + ) + self.__obs_precip[obs_mask] = self.__config.precip_threshold - 2.0 + + # Compute the cascade decompositions of the input precipitation fields + precip_forecast_decomp = [] + for i in range(self.__config.ar_order + 1): + precip_forecast = self.__params.decomposition_method( + self.__obs_precip[i, :, :], + self.__params.bandpass_filter, + mask=self.__params.mask_threshold, + fft_method=self.__params.fft, + output_domain=self.__config.domain, + normalize=True, + compute_stats=True, + compact_output=False, + ) + precip_forecast_decomp.append(precip_forecast) + + # Rearrange the cascaded into a four-dimensional array of shape + # (n_cascade_levels,ar_order+1,m,n) for the autoregressive model + self.precip_cascades = nowcast_utils.stack_cascades( + precip_forecast_decomp, self.__config.n_cascade_levels + ) + + # Set the mean and standard deviations based on the most recent field. 
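        # (these statistics are also needed later when the forecast cascades are
        # recomposed into rainfall fields)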
+ precip_forecast_decomp = precip_forecast_decomp[-1] + self.mean_extrapolation = np.array(precip_forecast_decomp["means"]) + self.std_extrapolation = np.array(precip_forecast_decomp["stds"]) + + if self.__params.no_rain_case == "obs": + + GAMMA = np.ones((self.__config.n_cascade_levels, self.__config.ar_order)) + + else: + + # If there are values in the radar fields, compute the auto-correlations + GAMMA = np.empty((self.__config.n_cascade_levels, self.__config.ar_order)) + + # compute lag-l temporal auto-correlation coefficients for each cascade level + for i in range(self.__config.n_cascade_levels): + GAMMA[i, :] = correlation.temporal_autocorrelation( + self.precip_cascades[i], mask=self.__params.mask_threshold + ) + + # Print the GAMMA value + nowcast_utils.print_corrcoefs(GAMMA) + + if self.__config.ar_order == 2: + # Adjust the lag-2 correlation coefficient to ensure that the AR(p) + # process is stationary + for i in range(self.__config.n_cascade_levels): + GAMMA[i, 1] = autoregression.adjust_lag2_corrcoef2( + GAMMA[i, 0], GAMMA[i, 1] + ) + + # Estimate the parameters of the AR(p) model from the auto-correlation + # coefficients + self.__params.PHI = np.empty( + (self.__config.n_cascade_levels, self.__config.ar_order + 1) + ) + for i in range(self.__config.n_cascade_levels): + self.__params.PHI[i, :] = autoregression.estimate_ar_params_yw(GAMMA[i, :]) + + nowcast_utils.print_ar_params(self.__params.PHI) + + # Initialize the noise generation and get n_noise_fields. + def __initialize_noise(self): + """ + Initialize noise-based perturbations if configured, computing any required + adjustment coefficients and setting up the perturbation generator. + """ + if ( + self.__config.noise_method is not None + and self.__params.no_rain_case != "obs" + ): + + # get methods for perturbations + init_noise, self.__params.noise_generator = noise.get_method( + self.__config.noise_method + ) + + self.__precip_noise_input = self.__obs_precip.copy() + + # initialize the perturbation generator for the precipitation field + self.__params.perturbation_generator = init_noise( + self.__precip_noise_input, + fft_method=self.__params.fft, + **self.__params.noise_kwargs, + ) + + if self.__config.noise_stddev_adj == "auto": + print("Computing noise adjustment coefficients... ", end="", flush=True) + precip_forecast_min = np.min(self.__precip_noise_input) + self.__params.noise_std_coeffs = noise.utils.compute_noise_stddev_adjs( + self.__precip_noise_input[-1, :, :], + self.__params.precip_threshold, + precip_forecast_min, + self.__params.bandpass_filter, + self.__params.decomposition_method, + self.__params.perturbation_generator, + self.__params.noise_generator, + 20, + conditional=True, + num_workers=self.__config.num_workers, + seed=self.__config.seed, + ) + + elif self.__config.noise_stddev_adj == "fixed": + f = lambda k: 1.0 / (0.75 + 0.09 * k) + self.__params.noise_std_coeffs = [ + f(k) for k in range(1, self.__config.n_cascade_levels + 1) + ] + else: + self.__params.noise_std_coeffs = np.ones(self.__config.n_cascade_levels) + + if self.__config.noise_stddev_adj is not None: + print(f"noise std. dev. coeffs: {self.__params.noise_std_coeffs}") + + else: + self.__params.perturbation_generator = None + self.__params.noise_generator = None + self.__params.noise_std_coeffs = None + + # Create a pool of n noise fields. + def __initialize_noise_field_pool(self): + """ + Initialize a pool of noise fields avoiding the separate generation of noise fields for each + time step and ensemble member. 
A pool of 30 fields is sufficient to generate adequate spread + in the nowcast for combination. + """ + self.noise_field_pool = np.zeros( + ( + self.__config.n_noise_fields, + self.__config.n_cascade_levels, + self.__params.len_y, + self.__params.len_x, + ) + ) + + # Get a seed value for each ensemble member + seed = self.__config.seed + if self.__config.noise_method is not None: + self.__randgen_precip = [] + # for j in range(self.__config.n_ens_members): + for j in range(self.__config.n_noise_fields): + rs = np.random.RandomState(seed) + self.__randgen_precip.append(rs) + seed = rs.randint(0, high=1e9) + + # Get the decomposition method + self.__params.fft_objs = [] + for _ in range(self.__config.n_noise_fields): + self.__params.fft_objs.append( + utils.get_method( + self.__config.fft_method, + shape=self.precip_cascades.shape[-2:], + ) + ) + + if self.__params.noise_generator is not None: + + # Determine the noise field for each ensemble member + for j in range(self.__config.n_noise_fields): + epsilon = self.__params.noise_generator( + self.__params.perturbation_generator, + randstate=self.__randgen_precip[j], + fft_method=self.__params.fft_objs[j], + domain=self.__config.domain, + ) + # Decompose the noise field into a cascade + self.noise_field_pool[j] = self.__params.decomposition_method( + epsilon, + self.__params.bandpass_filter, + fft_method=self.__params.fft_objs[j], + input_domain=self.__config.domain, + output_domain=self.__config.domain, + compute_stats=False, + normalize=True, + compact_output=True, + )["cascade_levels"] + + +class ForecastState: + """ + Common memory of ForecastModel instances. + """ + + def __init__( + self, + enkf_combination_config: EnKFCombinationConfig, + enkf_combination_params: EnKFCombinationParams, + noise_field_pool: np.ndarray, + latest_obs: np.ndarray, + precip_mask: np.ndarray, + ): + + self.config = enkf_combination_config + self.params = enkf_combination_params + self.noise_field_pool = noise_field_pool + self.precip_mask = np.repeat( + precip_mask[None, :], self.config.n_ens_members, axis=0 + ) + + latest_obs[~np.isfinite(latest_obs)] = self.config.precip_threshold - 2.0 + self.nwc_prediction = np.repeat( + latest_obs[None, :, :], self.config.n_ens_members, axis=0 + ) + self.fc_resampled = np.repeat( + latest_obs[None, :, :], self.config.n_ens_members, axis=0 + ) + self.nwc_prediction_btf = self.nwc_prediction.copy() + + self.final_combined_forecast = [] + + return + + +class ForecastModel: + """ + Class to manage the forecast step of each ensemble member. + """ + + def __init__( + self, + forecast_state: ForecastState, + precip_cascades: np.ndarray, + velocity: np.ndarray, + mu: np.ndarray, + sigma: np.ndarray, + ens_member: int, + ): + + # Initialize instance variables + self.__forecast_state = forecast_state + self.__precip_cascades = precip_cascades + self.__velocity = velocity + + self.__mu = mu + self.__sigma = sigma + + self.__previous_displacement = np.zeros( + (2, self.__forecast_state.params.len_y, self.__forecast_state.params.len_x) + ) + + # Get NWP effective horizontal resolution and type of probability matching from + # combination kwargs. + # It's not the best practice to mix parameters. Maybe the cascade mask as well + # as the probability matching should be implemented at another location. 
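+        # The combination kwargs are a plain dictionary; the test suite, for
+        # instance, passes nwp_hres_eff=14.0 and prob_matching="iterative" via
+        # combination_kwargs. Missing keys fall back to the defaults used below
+        # (0.0 and "iterative").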
+ self.__nwp_hres_eff = self.__forecast_state.params.combination_kwargs.get( + "nwp_hres_eff", 0.0 + ) + self.__prob_matching = self.__forecast_state.params.combination_kwargs.get( + "prob_matching", "iterative" + ) + + # Get spatial scales whose central wavelengths are above the effective + # horizontal resolution of the NWP model. + # Factor 3 on the effective resolution is similar to that factor of the + # localization of AR parameters and scaling parameters. + self.__resolution_mask = ( + self.__forecast_state.params.len_y + / self.__forecast_state.params.bandpass_filter["central_wavenumbers"] + >= self.__nwp_hres_eff * 3.0 + ) + + self.__ens_member = ens_member + + # Bundle single steps of the forecast. + def run_forecast_step(self, nwp, is_correction_timestep=False): + + # Decompose precipitation field. + self.__decompose(is_correction_timestep) + + # Update precipitation mask. + self.__update_precip_mask(nwp=nwp) + + # Iterate through the AR process. + self.__iterate() + + # Recompose the precipitation field for the correction step. + self.__forecast_state.nwc_prediction[self.__ens_member] = ( + blending.utils.recompose_cascade( + combined_cascade=self.__precip_cascades[:, -1], + combined_mean=self.__mu, + combined_sigma=self.__sigma, + ) + ) + + # Apply probability matching + if self.__prob_matching == "iterative": + self.__probability_matching() + + # Extrapolate the precipitation field onto the position of the current timestep. + self.__advect() + + # Create the resulting precipitation field and set no data area. In future, when + # transformation between linear and logarithmic scale will be necessary, it will be + # implemented in this function. + def backtransform(self): + + # Set the resulting field as shallow copy of the field that is used + # continuously for forecast computation. + self.__forecast_state.nwc_prediction_btf[self.__ens_member] = ( + self.__forecast_state.nwc_prediction[self.__ens_member] + ) + + # Set no data area + self.__set_no_data() + + # Call spatial decomposition function and compute an adjusted standard deviation of + # each spatial scale at timesteps where NWP information is incorporated. + def __decompose(self, is_correction_timestep): + + # Call spatial decomposition method. + precip_extrap_decomp = self.__forecast_state.params.decomposition_method( + self.__forecast_state.nwc_prediction[self.__ens_member], + self.__forecast_state.params.bandpass_filter, + fft_method=self.__forecast_state.params.fft_objs[self.__ens_member], + input_domain=self.__forecast_state.config.domain, + output_domain=self.__forecast_state.config.domain, + compute_stats=False, + normalize=True, + compact_output=False, + ) + + # Set decomposed field onto the latest precipitation cascade. + self.__precip_cascades[:, -1] = precip_extrap_decomp["cascade_levels"] + + # If NWP information is incorporated, use the current mean of the decomposed + # field and adjust standard deviation on spatial scales that have a central + # wavelength below the effective horizontal resolution of the NWP model. + if is_correction_timestep: + # Set the mean of the spatial scales onto the mean values of the currently + # decomposed field. + self.__mu = np.array(precip_extrap_decomp["means"]) + # Compute the standard deviation evolved by an AR(1)-process. 
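+            # For an AR(1) model x_t = PHI[0] * x_{t-1} + PHI[1] * eps_t, the
+            # variance evolves as sigma_t^2 = PHI[0]^2 * sigma_{t-1}^2
+            # + PHI[1]^2 * Var(eps), where Var(eps) is given by the squared noise
+            # standard deviation coefficient of each cascade level.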
+ self.__sigma = np.sqrt( + self.__forecast_state.params.PHI[:, 0] ** 2.0 * self.__sigma**2.0 + + self.__forecast_state.params.PHI[:, 1] ** 2.0 + * self.__forecast_state.params.noise_std_coeffs**2.0 + ) + + # Use the standard deviations of the currently decomposed field for spatial + # scales above the effective horizontal resolution of the NWP model. + self.__sigma[self.__resolution_mask] = np.array( + precip_extrap_decomp["stds"] + )[self.__resolution_mask] + # Else, keep mean and standard deviation constant for pure nowcasting forecast steps. + # It's not necessary but describes better the handling of the scaling + # parameters. + else: + self.__mu = self.__mu + self.__sigma = self.__sigma + + # Call extrapolation function to extrapolate the precipitation field onto the + # position of the current timestep. + def __advect(self): + + # Since previous displacement is the sum of displacement over all previous + # timesteps, we have to compute the differences between the displacements to + # get the motion vector field for one time step. + displacement_tmp = self.__previous_displacement.copy() + + # Call the extrapolation method + ( + self.__forecast_state.nwc_prediction[self.__ens_member], + self.__previous_displacement, + ) = self.__forecast_state.params.extrapolation_method( + self.__forecast_state.nwc_prediction[self.__ens_member], + self.__velocity, + [1], + allow_nonfinite_values=True, + displacement_previous=self.__previous_displacement, + **self.__forecast_state.params.extrapolation_kwargs, + ) + + # Get the difference of the previous displacement field. + self.__previous_displacement -= displacement_tmp + + # Get a noise field out of the respective pool and iterate through the AR(1) + # process. + def __iterate(self): + + # Get a noise field out of the noise field pool and multiply it with + # precipitation mask and the standard deviation coefficients. + epsilon = ( + self.__forecast_state.noise_field_pool[ + np.random.randint(self.__forecast_state.config.n_noise_fields) + ] + * self.__forecast_state.precip_mask[self.__ens_member][None, :, :] + * self.__forecast_state.params.noise_std_coeffs[:, None, None] + ) + + # Iterate through the AR(1) process for each cascade level. + for i in range(self.__forecast_state.config.n_cascade_levels): + + self.__precip_cascades[i] = autoregression.iterate_ar_model( + self.__precip_cascades[i], + self.__forecast_state.params.PHI[i], + epsilon[i], + ) + + # Update the precipitation mask for the forecast step by incorporating areas + # where the NWP model forecast precipitation. + def __update_precip_mask(self, nwp): + + # Get the area where the NWP ensemble member forecast precipitation above + # precipitation threshold and dilate it by a configurable range. + precip_mask = ( + binary_dilation( + nwp > self.__forecast_state.config.precip_threshold, + structure=np.ones( + ( + self.__forecast_state.config.precip_mask_dilation, + self.__forecast_state.config.precip_mask_dilation, + ), + dtype=int, + ), + ) + * 1.0 + ) + # Get the area where the combined member forecast precipitation above the + # precipitation threshold and dilate it by a configurable range. + precip_mask += ( + binary_dilation( + self.__forecast_state.nwc_prediction[self.__ens_member] + > self.__forecast_state.config.precip_threshold, + structure=np.ones( + ( + self.__forecast_state.config.precip_mask_dilation, + self.__forecast_state.config.precip_mask_dilation, + ), + dtype=int, + ), + ) + * 1.0 + ) + # Set values above 1 to 1 for conversion into bool. 
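+        # The two dilated masks are summed, so grid points where both the NWP
+        # member and the combined member forecast precipitation reach a value of 2;
+        # clipping to 1 keeps the field in [0, 1] before the smoothing and the
+        # conversion to bool below.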
+ precip_mask[precip_mask >= 1.0] = 1.0 + # Some additional dilation of the precipitation mask. + precip_mask = gaussian_filter(precip_mask, (1, 1)) + # Set the mask outside the radar domain to 0. + precip_mask[self.__forecast_state.params.domain_mask] = 0.0 + # Convert mask into bool. + self.__forecast_state.precip_mask[self.__ens_member] = np.array( + precip_mask, dtype=bool + ) + + # Apply probability matching + def __probability_matching(self): + + # Apply probability matching + self.__forecast_state.nwc_prediction[self.__ens_member] = ( + probmatching.nonparam_match_empirical_cdf( + self.__forecast_state.nwc_prediction[self.__ens_member], + self.__forecast_state.fc_resampled[self.__ens_member], + ) + ) + + # Set no data area in the resulting precipitation field. + def __set_no_data(self): + + self.__forecast_state.nwc_prediction_btf[self.__ens_member][ + self.__forecast_state.params.domain_mask + ] = np.nan + + +class EnKFCombinationNowcaster: + def __init__( + self, + obs_precip: np.ndarray, + obs_timestamps: np.ndarray, + nwp_precip: np.ndarray, + nwp_timestamps: np.ndarray, + obs_velocity: np.ndarray, + fc_period: int, + fc_init: datetime.datetime, + enkf_combination_config: EnKFCombinationConfig, + ): + """ + Initialize EnKFCombinationNowcaster with inputs and configurations. + """ + # Store inputs + self.__obs_precip = obs_precip + self.__nwp_precip = nwp_precip + self.__obs_velocity = obs_velocity + self.__fc_period = fc_period + self.__fc_init = fc_init + + # Store config + self.__config = enkf_combination_config + + # Initialize Params + self.__params = EnKFCombinationParams() + + # Store input timestamps + self.__obs_timestamps = obs_timestamps + self.__nwp_timestamps = nwp_timestamps + + def compute_forecast(self): + """ + Generate a combined nowcast ensemble by using the reduced-space ensemble Kalman + filter method. + + Parameters + ---------- + obs_precip: np.ndarray + Array of shape (ar_order+1,m,n) containing the observed input precipitation + fields ordered by timestamp from oldest to newst. The time steps between + the inputs are assumed to be regular. + obs_timestamps: np.ndarray + Array of shape (ar_order+1) containing the corresponding time stamps of + observed input precipitation fields as datetime objects. + nwp_precip: np.ndarray + Array of shape (n_ens,n_times,m,n) containing the (NWP) ensemble model + forecast. + nwp_timestamps: np.ndarray + Array of shape (n_times) containing the corresponding time stamps of the + (NWP) ensemble model forecast as datetime objects. + obs_velocity: np.ndarray + Array of shape (2,m,n) containing the x- and y-components of the advection + field. The velocities are based on the observed input precipitation fields + and are assumed to represent one time step between the inputs. All values + are required to be finite. + fc_period: int + Forecast range in minutes. + fc_init: datetime object + Issuetime of the combined forecast to compute. + enkf_combination_config: EnKFCombinationConfig + Provides a set of configuration parameters for the nowcast ensemble + generation. + + Returns + ------- + out: np.ndarray + If return_output is True, a four-dimensional array of shape + (n_ens_members,num_timesteps,m,n) containing a time series of forecast + precipitation fields for each ensemble member. Otherwise, a None value + is returned. The time series starts from t0. The timestep is taken from the + input precipitation fields precip. 
+
+        See also
+        --------
+        :py:mod:`pysteps.extrapolation.interface`, :py:mod:`pysteps.cascade.interface`,
+        :py:mod:`pysteps.noise.interface`,
+        :py:func:`pysteps.noise.utils.compute_noise_stddev_adjs`
+
+        References
+        ----------
+        :cite:`Nerini2019MWR`
+
+        Notes
+        -----
+        1. The combination method currently supports only an AR(1) process for the
+        forecast step.
+        """
+
+        # Check for the inputs.
+        self.__check_inputs()
+
+        # Check timestamps of radar and nwp input and determine forecast and correction
+        # timesteps as well as the temporal resolution
+        self.__check_input_timestamps()
+
+        # Check whether there is no precipitation in the observation but in the NWP,
+        # or the other way around
+        self.__check_no_rain_case()
+
+        # Print forecast information.
+        self.__print_forecast_info()
+
+        # Initialize and compute the forecast initialization.
+        self.FI = ForecastInitialization(
+            self.__config, self.__params, self.__obs_precip, self.__obs_velocity
+        )
+
+        # NWP: Set values below precip thr and nonfinite values to norain thr.
+        nwp_mask = np.logical_or(
+            ~np.isfinite(self.__nwp_precip),
+            self.__nwp_precip < self.__config.precip_threshold,
+        )
+        self.__nwp_precip[nwp_mask] = self.__config.precip_threshold - 2.0
+
+        # Set an initial precipitation mask for the NWC models.
+        precip_mask = binary_dilation(
+            self.__obs_precip[-1] > self.__config.precip_threshold,
+            structure=np.ones(
+                (self.__config.precip_mask_dilation, self.__config.precip_mask_dilation)
+            ),
+        )
+
+        # Initialize an instance of NWC forecast model class for each ensemble member.
+        self.FS = ForecastState(
+            enkf_combination_config=self.__config,
+            enkf_combination_params=self.__params,
+            noise_field_pool=self.FI.noise_field_pool,
+            latest_obs=self.__obs_precip[-1, :, :],
+            precip_mask=precip_mask.copy(),
+        )
+
+        self.FC_Models = {}
+        for j in range(self.__config.n_ens_members):
+            FC = ForecastModel(
+                forecast_state=self.FS,
+                precip_cascades=deepcopy(self.FI.precip_cascades),
+                velocity=self.__obs_velocity,
+                mu=deepcopy(self.FI.mean_extrapolation),
+                sigma=deepcopy(self.FI.std_extrapolation),
+                ens_member=j,
+            )
+            self.FC_Models[j] = FC
+
+        # Initialize the combination model.
+        if self.__config.enkf_method == "masked_enkf":
+            kalman_filter_model = MaskedEnKF
+        else:
+            raise ValueError(
+                "Currently, only 'masked_enkf' is implemented as ensemble "
+                "Kalman filter method!"
+            )
+        self.KalmanFilterModel = kalman_filter_model(self.__config, self.__params)
+
+        # Start the main forecast loop.
+        self.__integrated_nowcast_main_loop()
+
+        # Stack and return the forecast output.
+        if self.__config.return_output:
+            self.FS.final_combined_forecast = np.array(
+                self.FS.final_combined_forecast
+            ).swapaxes(0, 1)
+
+            if self.__config.measure_time:
+                return (
+                    self.FS.final_combined_forecast,
+                    self.__fc_init,
+                    self.__mainloop_time,
+                )
+            return self.FS.final_combined_forecast
+
+        # Else, return None
+        return None
+
+    def __check_inputs(self):
+        """
+        Validate the user's input.
+ """ + + # Check dimensions of obs precip + if self.__obs_precip.ndim != 3: + raise ValueError( + "Precipitation observation must be a three-dimensional " + "array of shape (ar_order + 1, m, n)" + ) + if self.__obs_precip.shape[0] < self.__config.ar_order + 1: + raise ValueError( + f"Precipitation observation must have at least " + f"{self.__config.ar_order + 1} time steps in the first" + f"dimension to match the autoregressive order " + f"(ar_order={self.__config.ar_order})" + ) + + # If it is necessary, slice the precipitation field to only use the last + # ar_order +1 time steps. + if self.__obs_precip.shape[0] > self.__config.ar_order + 1: + self.__obs_precip = np.delete( + self.__obs_precip, + np.arange( + 0, self.__obs_precip.shape[0] - (self.__config.ar_order + 1), 1 + ), + axis=0, + ) + + # Check NWP data dimensions + NWP_shape = self.__nwp_precip.shape + NWP_timestamps_len = len(self.__nwp_timestamps) + if not NWP_timestamps_len in NWP_shape: + raise ValueError( + f"nwp_timestamps has not the same length as NWP data!" + f"nwp_timestamps length: {NWP_timestamps_len}" + f"nwp_precip shape: {NWP_shape}" + ) + + # Ensure that model has shape: [n_ens_members, t, y, x] + # n_ens_members and t can sometimes be swapped when using grib datasets. + # Check for temporal resolution of NWP data + if NWP_shape[0] == NWP_timestamps_len: + self.__nwp_precip = self.__nwp_precip.swapaxes(0, 1) + + # Check dimensions of obs velocity + if self.__obs_velocity.ndim != 3: + raise ValueError( + "The velocity field must be a three-dimensional array of shape (2, m, n)" + ) + + # Check whether the spatial dimensions match between obs precip and + # obs velocity + if self.__obs_precip.shape[1:3] != self.__obs_velocity.shape[1:3]: + raise ValueError( + f"Spatial dimension of Precipitation observation and the" + "velocity field do not match: " + f"{self.__obs_precip.shape[1:3]} vs. {self.__obs_velocity.shape[1:3]}" + ) + + # Check velocity field for non-finite values + if np.any(~np.isfinite(self.__obs_velocity)): + raise ValueError("Velocity contains non-finite values") + + # Check whether there are extrapolation kwargs + if self.__config.extrapolation_kwargs is None: + self.__params.extrapolation_kwargs = dict() + else: + self.__params.extrapolation_kwargs = deepcopy( + self.__config.extrapolation_kwargs + ) + + # Check whether there are filter kwargs + if self.__config.filter_kwargs is None: + self.__params.filter_kwargs = dict() + else: + self.__params.filter_kwargs = deepcopy(self.__config.filter_kwargs) + + # Check for noise kwargs + if self.__config.noise_kwargs is None: + self.__params.noise_kwargs = {"win_fun": "tukey"} + else: + self.__params.noise_kwargs = deepcopy(self.__config.noise_kwargs) + + # Check for combination kwargs + if self.__config.combination_kwargs is None: + self.__params.combination_kwargs = dict() + else: + self.__params.combination_kwargs = deepcopy( + self.__config.combination_kwargs + ) + + # Set the precipitation threshold also in params + self.__params.precip_threshold = self.__config.precip_threshold + + # Check for the standard deviation adjustment of the noise fields + if self.__config.noise_stddev_adj not in ["auto", "fixed", None]: + raise ValueError( + f"Unknown noise_std_dev_adj method {self.__config.noise_stddev_adj}. 
" + "Must be 'auto', 'fixed', or None" + ) + + def __check_input_timestamps(self): + """ + Check for timestamps of radar data and NWP data, determine forecasts and + correction timesteps as well as the temporal resolution of the combined forecast + """ + + # Check for temporal resolution of radar data + obs_time_diff = np.unique(np.diff(self.__obs_timestamps)) + if obs_time_diff.size > 1: + raise ValueError( + "Observation data has a different temporal resolution or " + "observations are missing!" + ) + self.__temporal_res = int(obs_time_diff[0].total_seconds() / 60) + + # Check for temporal resolution of NWP data + nwp_time_diff = np.unique(np.diff(self.__nwp_timestamps)) + if nwp_time_diff.size > 1: + raise ValueError( + "NWP data has a different temporal resolution or some time steps are missing!" + ) + nwp_temporal_res = int(nwp_time_diff[0].total_seconds() / 60) + + # Check whether all necessary timesteps are included in the observation + if self.__obs_timestamps[-1] != self.__fc_init: + raise ValueError( + "The last observation timestamp differs from forecast issue time!" + ) + if self.__obs_timestamps.size < self.__config.ar_order + 1: + raise ValueError( + f"Precipitation observation must have at least " + f"{self.__config.ar_order + 1} time steps in the first" + f"dimension to match the autoregressive order " + f"(ar_order={self.__config.ar_order})" + ) + + # Check whether the NWP forecasts includes the combined forecast range + if np.logical_or( + self.__fc_init < self.__nwp_timestamps[0], + self.__fc_init > self.__nwp_timestamps[-1], + ): + raise ValueError("Forecast issue time is not included in the NWP forecast!") + + max_nwp_fc_period = ( + self.__nwp_timestamps.size + - np.where(self.__nwp_timestamps == self.__fc_init)[0][0] + - 1 + ) * nwp_temporal_res + if max_nwp_fc_period < self.__fc_period - nwp_temporal_res: + raise ValueError( + "The remaining NWP forecast is not sufficient for the combined forecast period" + ) + + # Truncate the NWP dataset if there sufficient remaining timesteps are available + self.__nwp_precip = np.delete( + self.__nwp_precip, + np.logical_or( + self.__nwp_timestamps <= self.__fc_init, + self.__nwp_timestamps + > self.__fc_init + datetime.timedelta(minutes=self.__fc_period), + ), + axis=1, + ) + + # Define forecast and correction timesteps assuming that temporal resolution of + # the combined forecast is equal to that of the radar data + self.__forecast_leadtimes = np.arange( + 0, self.__fc_period + 1, self.__temporal_res + ) + trunc_nwp_timestamps = ( + self.__nwp_timestamps[ + np.logical_and( + self.__nwp_timestamps > self.__fc_init, + self.__nwp_timestamps + <= self.__fc_init + datetime.timedelta(minutes=self.__fc_period), + ) + ] + - self.__fc_init + ) + self.__correction_leadtimes = np.array( + [int(timestamp.total_seconds() / 60) for timestamp in trunc_nwp_timestamps] + ) + + def __check_no_rain_case(self): + + print("Test for no rain cases") + print("======================") + print("") + + # Check for zero input fields in the radar and NWP data. + zero_precip_radar = check_norain( + self.__obs_precip, + self.__config.precip_threshold, + self.__config.norain_threshold, + self.__params.noise_kwargs["win_fun"], + ) + # The norain fraction threshold used for nwp is the default value of 0.0, + # since nwp does not suffer from clutter. 
+ zero_precip_nwp_forecast = check_norain( + self.__nwp_precip, + self.__config.precip_threshold, + self.__config.norain_threshold, + self.__params.noise_kwargs["win_fun"], + ) + + # If there is no precipitation in the observation, set no_rain_case to "obs" + # and use only the NWP ensemble forecast + if zero_precip_radar: + self.__params.no_rain_case = "obs" + # If there is no precipitation at the first usable NWP forecast timestep, but + # in the observation, compute an extrapolation forecast + elif zero_precip_nwp_forecast: + self.__params.no_rain_case = "nwp" + # Otherwise, set no_rain_case to 'none' and compute combined forecast as usual + else: + self.__params.no_rain_case = "none" + + return + + def __print_forecast_info(self): + """ + Print information about the forecast configuration, including inputs, methods, + and parameters. + """ + print("Reduced-space ensemble Kalman filter") + print("====================================") + print("") + + print("Inputs") + print("------") + print(f"Forecast issue time: {self.__fc_init.isoformat()}") + print( + f"Input dimensions: {self.__obs_precip.shape[1]}x{self.__obs_precip.shape[2]}" + ) + print(f"Temporal resolution: {self.__temporal_res} minutes") + print("") + + print("NWP and blending inputs") + print("-----------------------") + print(f"Number of (NWP) models: {self.__nwp_precip.shape[0]}") + print("") + + print("Methods") + print("-------") + print( + f"Extrapolation: {self.__config.extrapolation_method}" + ) + print( + f"Bandpass filter: {self.__config.bandpass_filter_method}" + ) + print( + f"Decomposition: {self.__config.decomposition_method}" + ) + print(f"Noise generator: {self.__config.noise_method}") + print( + f"Noise adjustment: {'yes' if self.__config.noise_stddev_adj else 'no'}" + ) + + print(f"EnKF implementation: {self.__config.enkf_method}") + + print(f"FFT method: {self.__config.fft_method}") + print(f"Domain: {self.__config.domain}") + print("") + + print("Parameters") + print("----------") + print(f"Forecast length in min: {self.__fc_period}") + print(f"Ensemble size: {self.__config.n_ens_members}") + print(f"Parallel threads: {self.__config.num_workers}") + print(f"Number of cascade levels: {self.__config.n_cascade_levels}") + print(f"Order of the AR(p) model: {self.__config.ar_order}") + print("") + + print(f"No rain forecast: {self.__params.no_rain_case}") + + def __integrated_nowcast_main_loop(self): + + if self.__config.measure_time: + starttime_mainloop = time.time() + + self.__params.extrapolation_kwargs["return_displacement"] = True + is_correction_timestep = False + + for t, fc_leadtime in enumerate(self.__forecast_leadtimes): + if self.__config.measure_time: + starttime = time.time() + + # Check whether forecast time step is also a correction time step. + is_correction_timestep = ( + self.__forecast_leadtimes[t - 1] in self.__correction_leadtimes + and t > 1 + and np.logical_and( + self.__config.enable_combination, + self.__params.no_rain_case != "nwp", + ) + ) + + # Check whether forecast time step is a nowcasting time step. + is_nowcasting_timestep = t > 0 + + # Check whether full NWP weight is reached. 
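+            # Once the observation inflation factor returned by the Kalman filter
+            # drops to 0.02 or below, the nowcast no longer contributes noticeably
+            # and the remaining lead times are filled with the plain NWP ensemble.
+            # The same applies when the radar observations contain no rain.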
+ is_full_nwp_weight = ( + self.KalmanFilterModel.get_inflation_factor_obs() <= 0.02 + or self.__params.no_rain_case == "obs" + ) + + # If full NWP weight is reached, set pure NWP ensemble forecast in combined + # forecast output + if is_full_nwp_weight: + + # Set t_corr to the first available NWP data timestep and that is 0 + try: + t_corr + except NameError: + t_corr = 0 + + print(f"Full NWP weight is reached for lead time + {fc_leadtime} min") + if is_correction_timestep: + t_corr = np.where( + self.__correction_leadtimes == self.__forecast_leadtimes[t - 1] + )[0][0] + self.FS.nwc_prediction = self.__nwp_precip[:, t_corr] + + # Otherwise compute the combined forecast. + else: + print(f"Computing combination for lead time + {fc_leadtime} min") + self.__forecast_loop(t, is_correction_timestep, is_nowcasting_timestep) + + # Apply back transformation + for FC_Model in self.FC_Models.values(): + FC_Model.backtransform() + + self.__write_output() + + if self.__config.measure_time: + _ = self.__measure_time("timestep", starttime) + else: + print("...done.") + + if self.__config.measure_time: + self.__mainloop_time = time.time() - starttime_mainloop + print( + f"Elapsed time for computing forecast: {(self.__mainloop_time / 60.0):.4} min" + ) + + return + + def __forecast_loop(self, t, is_correction_timestep, is_nowcasting_timestep): + + # If the temporal resolution of the NWP data is equal to those of the + # observation, the correction step can be applied after the forecast + # step for the current forecast leadtime. + # However, if the temporal resolution is different, the correction step + # has to be applied before the forecast step to avoid smoothing effects + # in the resulting precipitation fields. + if is_correction_timestep: + t_corr = np.where( + self.__correction_leadtimes == self.__forecast_leadtimes[t - 1] + )[0][0] + + self.FS.nwc_prediction, self.FS.fc_resampled = ( + self.KalmanFilterModel.correct_step( + self.FS.nwc_prediction, + self.__nwp_precip[:, t_corr], + self.FS.fc_resampled, + ) + ) + + # Run nowcasting time step + if is_nowcasting_timestep: + + # Set t_corr to the first available NWP data timestep and that is 0 + try: + t_corr + except NameError: + t_corr = 0 + + def worker(j): + + self.FC_Models[j].run_forecast_step( + nwp=self.__nwp_precip[j, t_corr], + is_correction_timestep=is_correction_timestep, + ) + + dask_worker_collection = [] + + if DASK_IMPORTED and self.__config.n_ens_members > 1: + for j in range(self.__config.n_ens_members): + dask_worker_collection.append(dask.delayed(worker)(j)) + dask.compute( + *dask_worker_collection, + num_workers=self.__params.num_ensemble_workers, + ) + else: + for j in range(self.__config.n_ens_members): + worker(j) + + dask_worker_collection = None + + def __write_output(self): + + if ( + self.__config.callback is not None + and self.FS.nwc_prediction_btf.shape[1] > 0 + ): + self.__config.callback(self.FS.nwc_prediction_btf) + + if self.__config.return_output: + + self.FS.final_combined_forecast.append(self.FS.nwc_prediction_btf.copy()) + + def __measure_time(self, label, start_time): + """ + Measure and print the time taken for a specific part of the process. + + Parameters: + - label: A description of the part of the process being measured. + - start_time: The timestamp when the process started (from time.time()). 
+ """ + if self.__config.measure_time: + elapsed_time = time.time() - start_time + print(f"{label} took {elapsed_time:.2f} seconds.") + return elapsed_time + return None + + +def forecast( + obs_precip, + obs_timestamps, + nwp_precip, + nwp_timestamps, + velocity, + forecast_horizon, + issuetime, + n_ens_members, + precip_mask_dilation=1, + n_cascade_levels=6, + precip_thr=-10.0, + norain_thr=0.01, + extrap_method="semilagrangian", + decomp_method="fft", + bandpass_filter_method="gaussian", + noise_method="nonparametric", + enkf_method="masked_enkf", + enable_combination=True, + noise_stddev_adj=None, + ar_order=1, + callback=None, + return_output=True, + seed=None, + num_workers=1, + fft_method="numpy", + domain="spatial", + extrap_kwargs=None, + filter_kwargs=None, + noise_kwargs=None, + combination_kwargs=None, + measure_time=False, +): + """ + Generate a combined nowcast ensemble by using the reduced-space ensemble Kalman + filter method. + + Parameters + ---------- + obs_precip: np.ndarray + Array of shape (ar_order+1,m,n) containing the observed input precipitation + fields ordered by timestamp from oldest to newst. The time steps between + the inputs are assumed to be regular. + obs_timestamps: np.ndarray + Array of shape (ar_order+1) containing the corresponding time stamps of + observed input precipitation fields as datetime objects. + nwp_precip: np.ndarray + Array of shape (n_ens,n_times,m,n) containing the (NWP) ensemble model + forecast. + nwp_timestamps: np.ndarray + Array of shape (n_times) containing the corresponding time stamps of the + (NWP) ensemble model forecast as datetime objects. + velocity: np.ndarray + Array of shape (2,m,n) containing the x- and y-components of the advection + field. The velocities are based on the observed input precipitation fields + and are assumed to represent one time step between the inputs. All values + are required to be finite. + forecast_horizon: int + The length of the forecast horizon (the length of the forecast) in minutes. + issuetime: datetime object + Issuetime of the combined forecast to compute. + n_ens_members: int + The number of ensemble members to generate. This number should always be + equal to or larger than the number of NWP ensemble members / number of + NWP models. + precip_mask_dilation: int + Range by which the precipitation mask within the forecast step should be + extended per time step. Defaults to 1. + n_cascade_levels: int, optional + The number of cascade levels to use. Defaults to 6, see issue #385 on GitHub. + precip_thr: float, optional + pecifies the threshold value for minimum observable precipitation + intensity. Required if mask_method is not None or conditional is True. + Defaults to -10.0. + norain_thr: float + Specifies the threshold value for the fraction of rainy (see above) pixels + in the radar rainfall field below which we consider there to be no rain. + Depends on the amount of clutter typically present. Defaults to -15.0. + extrap_method: str, optional + Name of the extrapolation method to use. See the documentation of + :py:mod:`pysteps.extrapolation.interface`. Defaults to 'semilagrangian'. + decomp_method: {'fft'}, optional + Name of the cascade decomposition method to use. See the documentation + of :py:mod:`pysteps.cascade.interface`. Defaults to 'fft'. + bandpass_filter_method: {'gaussian', 'uniform'}, optional + Name of the bandpass filter method to use with the cascade decomposition. + See the documentation of :py:mod:`pysteps.cascade.interface`. Defaults to + 'guassian'. 
+    noise_method: {'parametric','nonparametric','ssft','nested',None}, optional
+        Name of the noise generator to use for perturbing the precipitation
+        field. See the documentation of :py:mod:`pysteps.noise.interface`. If set to
+        None, no noise is generated. Defaults to 'nonparametric'.
+    enkf_method: {'masked_enkf'}, optional
+        Name of the ensemble Kalman filter method to use for the correction step.
+        Currently, only the 'masked_enkf' method is implemented, which corresponds to
+        the reduced-space ensemble Kalman filter technique described in Nerini et al.
+        (2019). Defaults to 'masked_enkf'.
+    enable_combination: bool, optional
+        Flag to specify whether the correction step should be applied or a pure
+        nowcasting ensemble should be computed. Defaults to True.
+    noise_stddev_adj: {'auto','fixed',None}, optional
+        Optional adjustment for the standard deviations of the noise fields added
+        to each cascade level. This is done to compensate for incorrect std. dev.
+        estimates of cascade levels due to the presence of no-rain areas. 'auto'=use
+        the method implemented in
+        :py:func:`pysteps.noise.utils.compute_noise_stddev_adjs`.
+        'fixed'=use the formula given in :cite:`BPS2006` (eq. 6), None=disable
+        noise std. dev. adjustment.
+    ar_order: int, optional
+        The order of the autoregressive model to use. Must be 1, since only this order
+        is currently implemented.
+    callback: function, optional
+        Optional function that is called after computation of each time step of
+        the nowcast. The function takes one argument: a three-dimensional array
+        of shape (n_ens_members,h,w), where h and w are the height and width
+        of the input field precip, respectively. This can be used, for instance,
+        for writing the outputs into files.
+    return_output: bool, optional
+        Set to False to disable returning the outputs as numpy arrays. This can
+        save memory if the intermediate results are written to output files using
+        the callback function.
+    seed: int, optional
+        Optional seed number for the random generators.
+    num_workers: int, optional
+        The number of workers to use for parallel computation. Applicable if dask
+        is enabled or pyFFTW is used for computing the FFT. When num_workers>1, it
+        is advisable to disable OpenMP by setting the environment variable
+        OMP_NUM_THREADS to 1. This avoids slowdown caused by too many simultaneous
+        threads.
+    fft_method: str, optional
+        A string defining the FFT method to use (see FFT methods in
+        :py:func:`pysteps.utils.interface.get_method`).
+        Defaults to 'numpy' for compatibility reasons. If pyFFTW is installed,
+        the recommended method is 'pyfftw'.
+    domain: {"spatial", "spectral"}
+        If "spatial", all computations are done in the spatial domain (the
+        classical STEPS model). If "spectral", the AR models and stochastic
+        perturbations are applied directly in the spectral domain to reduce
+        memory footprint and improve performance :cite:`PCH2019b`.
+    extrap_kwargs: dict, optional
+        Optional dictionary containing keyword arguments for the extrapolation
+        method. See the documentation of :py:mod:`pysteps.extrapolation.interface`.
+    filter_kwargs: dict, optional
+        Optional dictionary containing keyword arguments for the filter method.
+        See the documentation of :py:mod:`pysteps.cascade.bandpass_filters`.
+    noise_kwargs: dict, optional
+        Optional dictionary containing keyword arguments for the initializer of
+        the noise generator. See the documentation of
+        :py:mod:`pysteps.noise.fftgenerators`.
+    combination_kwargs: dict, optional
+        Optional dictionary containing keyword arguments for the initializer of the
+        ensemble Kalman filter method.
See the documentation of + :py:mod:`pysteps.blending.ens_kalman_filter_methods`. + measure_time: bool + If set to True, measure, print and return the computation time. + + Returns + ------- + out: np.ndarray + If return_output is True, a four-dimensional array of shape + (n_ens_members,num_timesteps,m,n) containing a time series of forecast + precipitation fields for each ensemble member. Otherwise, a None value + is returned. The time series starts from t0. The timestep is taken from the + input precipitation fields precip. + + See also + -------- + :py:mod:`pysteps.extrapolation.interface`, :py:mod:`pysteps.cascade.interface`, + :py:mod:`pysteps.noise.interface`, :py:func:`pysteps.noise.utils. + compute_noise_stddev_adjs` + + References + ---------- + :cite:`Nerini2019` + + Notes + ----- + 1. The combination method currently supports only an AR(1) process for the + forecast step. + """ + + combination_config = EnKFCombinationConfig( + n_ens_members=n_ens_members, + n_cascade_levels=n_cascade_levels, + precip_threshold=precip_thr, + norain_threshold=norain_thr, + precip_mask_dilation=precip_mask_dilation, + extrapolation_method=extrap_method, + decomposition_method=decomp_method, + bandpass_filter_method=bandpass_filter_method, + noise_method=noise_method, + enkf_method=enkf_method, + enable_combination=enable_combination, + noise_stddev_adj=noise_stddev_adj, + ar_order=ar_order, + seed=seed, + num_workers=num_workers, + fft_method=fft_method, + domain=domain, + extrapolation_kwargs=extrap_kwargs, + filter_kwargs=filter_kwargs, + noise_kwargs=noise_kwargs, + combination_kwargs=combination_kwargs, + measure_time=measure_time, + callback=callback, + return_output=return_output, + n_noise_fields=30, + ) + + combination_nowcaster = EnKFCombinationNowcaster( + obs_precip=obs_precip, + obs_timestamps=obs_timestamps, + nwp_precip=nwp_precip, + nwp_timestamps=nwp_timestamps, + obs_velocity=velocity, + fc_period=forecast_horizon, + fc_init=issuetime, + enkf_combination_config=combination_config, + ) + + forecast_enkf_combination = combination_nowcaster.compute_forecast() + + return forecast_enkf_combination diff --git a/pysteps/io/interface.py b/pysteps/io/interface.py index 58b684ea3..b1dc74855 100644 --- a/pysteps/io/interface.py +++ b/pysteps/io/interface.py @@ -149,6 +149,8 @@ def get_method(name, method_type): | | archive containing precipitation intensity | | | composites. | +--------------+------------------------------------------------------+ + | dwd_hdf5 | HDF5 file format used by DWD. | + +--------------+------------------------------------------------------+ | fmi_geotiff | GeoTIFF files used in the Finnish Meteorological | | | Institute (FMI) archive, containing reflectivity | | | composites (dBZ). 
| diff --git a/pysteps/tests/test_blending_pca_ens_kalman_filter.py b/pysteps/tests/test_blending_pca_ens_kalman_filter.py new file mode 100644 index 000000000..65df77917 --- /dev/null +++ b/pysteps/tests/test_blending_pca_ens_kalman_filter.py @@ -0,0 +1,269 @@ +# -*- coding: utf-8 -*- + +import datetime + +import numpy as np +import pytest + +from pysteps import blending, motion, utils + +# fmt: off +pca_enkf_arg_values = [ + # Standard setting + (20,30,0,-60,False,False,5,5,0.05,0.01,"ssft","masked_enkf",True,None,1.0,"ensemble",False,False), + # Coarser NWP temporal resolution + (20,30,0,-60,False,False,5,15,0.05,0.01,"ssft","masked_enkf",True,None,1.0,"ensemble",False,False), + # Coarser Obs temporal resolution + (20,30,0,-60,False,False,10,5,0.05,0.01,"ssft","masked_enkf",True,None,1.0,"ensemble",False,False), + # Larger shift of the NWP init + (20,30,0,-30,False,False,5,5,0.05,0.01,"ssft","masked_enkf",True,None,1.0,"ensemble",False,False), + # Zero rain case in observation + (20,30,0,-60,True,False,5,5,0.05,0.01,"ssft","masked_enkf",True,None,1.0,"ensemble",False,False), + # Zero rain case in NWP + (20,30,0,-60,False,True,5,5,0.05,0.01,"ssft","masked_enkf",True,None,1.0,"ensemble",False,False), + # Zero rain in both + (20,30,0,-60,True,True,5,5,0.05,0.01,"ssft","masked_enkf",True,None,1.0,"ensemble",False,False), + # Accumulated sampling probability + (20,30,0,-60,False,False,5,5,0.05,0.01,"ssft","masked_enkf",True,None,1.0,"ensemble",True,False), + # Use full NWP weight + (20,30,0,-60,False,False,5,5,0.05,0.01,"ssft","masked_enkf",True,None,1.0,"ensemble",False,True), + # Both + (20,30,0,-60,False,False,5,5,0.05,0.01,"ssft","masked_enkf",True,None,1.0,"ensemble",True,True), + # Explained variance as sampling probability source + (20,30,0,-60,False,False,5,5,0.05,0.01,"ssft","masked_enkf",True,None,1.0,"explained_var",False,False), + # No combination + (20,30,0,-60,False,False,5,5,0.05,0.01,"ssft","masked_enkf",False,None,1.0,"ensemble",False,False), + # Standard deviation adjustment + (20,30,0,-60,False,False,5,5,0.05,0.01,"ssft","masked_enkf",True,"auto",1.0,"ensemble",False,False), + # Other number of ensemble members + (10,30,0,-60,False,False,5,5,0.05,0.01,"ssft","masked_enkf",True,None,1.0,"ensemble",False,False), + # Other forecast length + (20,35,0,-60,False,False,5,5,0.05,0.01,"ssft","masked_enkf",True,None,1.0,"ensemble",False,False), + # Other noise method + (20,30,0,-60,False,False,5,5,0.05,0.01,"nonparametric","masked_enkf",True,None,1.0,"ensemble",False,False),] +# fmt: on + +pca_enkf_arg_names = ( + "n_ens_members", + "forecast_length", + "forecast_shift_radar", + "forecast_shift_nwp", + "zero_radar", + "zero_nwp", + "temporal_res_radar", + "temporal_res_nwp", + "thr_prec", + "norain_thr", + "noise_method", + "enkf_method", + "enable_combination", + "noise_stddev_adj", + "inflation_factor_bg", + "sampling_prob_source", + "use_accum_sampling_prob", + "ensure_full_nwp_weight", +) + + +@pytest.mark.parametrize(pca_enkf_arg_names, pca_enkf_arg_values) +def test_pca_enkf_combination( + n_ens_members, + forecast_length, + forecast_shift_radar, + forecast_shift_nwp, + zero_radar, + zero_nwp, + temporal_res_radar, + temporal_res_nwp, + thr_prec, + norain_thr, + noise_method, + enkf_method, + enable_combination, + noise_stddev_adj, + inflation_factor_bg, + sampling_prob_source, + use_accum_sampling_prob, + ensure_full_nwp_weight, +): + pytest.importorskip("sklearn") + + # Set forecast init + forecast_init = datetime.datetime(2025, 6, 4, 17, 0) + + # Initialize dummy radar data 
+ radar_precip = np.zeros((2, 200, 200)) + if not zero_radar: + for i in range(radar_precip.shape[0]): + a = 5 * i + radar_precip[i, 5 + a : 100 - a, 30 + a : 180 - a] = 0.1 + radar_precip[i, 10 + a : 105 - a, 35 + a : 178 - a] = 0.5 + radar_precip[i, 15 + a : 110 - a, 40 + a : 176 - a] = 0.5 + radar_precip[i, 20 + a : 115 - a, 45 + a : 174 - a] = 5.0 + radar_precip[i, 25 + a : 120 - a, 50 + a : 172 - a] = 5.0 + radar_precip[i, 30 + a : 125 - a, 55 + a : 170 - a] = 4.5 + radar_precip[i, 35 + a : 130 - a, 60 + a : 168 - a] = 4.5 + radar_precip[i, 40 + a : 135 - a, 65 + a : 166 - a] = 4.0 + radar_precip[i, 45 + a : 140 - a, 70 + a : 164 - a] = 1.0 + radar_precip[i, 50 + a : 145 - a, 75 + a : 162 - a] = 0.5 + radar_precip[i, 55 + a : 150 - a, 80 + a : 160 - a] = 0.5 + radar_precip[i, 60 + a : 155 - a, 85 + a : 158 - a] = 0.1 + + radar_precip_timestamps = np.array( + sorted( + [ + forecast_init + + datetime.timedelta(minutes=forecast_shift_radar) + - datetime.timedelta(minutes=i * temporal_res_radar) + for i in range(radar_precip.shape[0]) + ] + ) + ) + + # Initialize dummy NWP data + nwp_precip = np.zeros((n_ens_members, 20, 200, 200)) + if not zero_nwp: + for n_model in range(n_ens_members): + for i in range(nwp_precip.shape[1]): + a = 2 * n_model + b = 2 * i + nwp_precip[n_model, i, 20 + b : 160 - b, 30 + a : 180 - a] = 0.1 + nwp_precip[n_model, i, 22 + b : 162 - b, 35 + a : 178 - a] = 0.1 + nwp_precip[n_model, i, 24 + b : 164 - b, 40 + a : 176 - a] = 1.0 + nwp_precip[n_model, i, 26 + b : 166 - b, 45 + a : 174 - a] = 5.0 + nwp_precip[n_model, i, 28 + b : 168 - b, 50 + a : 172 - a] = 5.0 + nwp_precip[n_model, i, 30 + b : 170 - b, 35 + a : 170 - a] = 4.5 + nwp_precip[n_model, i, 32 + b : 172 - b, 40 + a : 168 - a] = 4.5 + nwp_precip[n_model, i, 34 + b : 174 - b, 45 + a : 166 - a] = 4.0 + nwp_precip[n_model, i, 36 + b : 176 - b, 50 + a : 164 - a] = 2.0 + nwp_precip[n_model, i, 38 + b : 178 - b, 55 + a : 162 - a] = 1.0 + nwp_precip[n_model, i, 40 + b : 180 - b, 60 + a : 160 - a] = 0.5 + nwp_precip[n_model, i, 42 + b : 182 - b, 65 + a : 158 - a] = 0.1 + + nwp_precip_timestamps = np.array( + sorted( + [ + forecast_init + + datetime.timedelta(minutes=forecast_shift_nwp) + + datetime.timedelta(minutes=i * temporal_res_nwp) + for i in range(nwp_precip.shape[1]) + ] + ) + ) + + # Metadata of dummy data is necessary for data conversion + metadata = dict() + metadata["unit"] = "mm" + metadata["transformation"] = "dB" + metadata["accutime"] = 5.0 + metadata["transform"] = None + metadata["zerovalue"] = 0.0 + metadata["threshold"] = thr_prec + metadata["zr_a"] = 200.0 + metadata["zr_b"] = 1.6 + + # Converting the input data + # Thresholding + radar_precip[radar_precip < metadata["threshold"]] = 0.0 + nwp_precip[nwp_precip < metadata["threshold"]] = 0.0 + + # Convert the data + converter = utils.get_method("mm/h") + radar_precip, _ = converter(radar_precip, metadata) + nwp_precip, metadata = converter(nwp_precip, metadata) + + # Transform the data + transformer = utils.get_method(metadata["transformation"]) + radar_precip, _ = transformer(radar_precip, metadata) + nwp_precip, metadata = transformer(nwp_precip, metadata) + + # Set NaN equal to zero + radar_precip[~np.isfinite(radar_precip)] = metadata["zerovalue"] + nwp_precip[~np.isfinite(nwp_precip)] = metadata["zerovalue"] + + assert ( + np.any(~np.isfinite(radar_precip)) == False + ), "There are still infinite values in the input radar data" + assert ( + np.any(~np.isfinite(nwp_precip)) == False + ), "There are still infinite values in the NWP data" + 
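+    # The dummy fields are now thresholded, converted to mm/h and transformed to
+    # dB; any non-finite values (e.g. from the logarithmic transform of zero rain
+    # rates) have been replaced so that the optical-flow and blending steps below
+    # receive finite input fields.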
+ # Initialize radar velocity + oflow_method = motion.get_method("LK") + radar_velocity = oflow_method(radar_precip) + + # Set the combination kwargs + combination_kwargs = dict( + n_tapering=0, + non_precip_mask=True, + n_ens_prec=1, + lien_criterion=True, + n_lien=10, + prob_matching="iterative", + inflation_factor_bg=inflation_factor_bg, + inflation_factor_obs=1.0, + offset_bg=0.0, + offset_obs=0.0, + nwp_hres_eff=14.0, + sampling_prob_source=sampling_prob_source, + use_accum_sampling_prob=use_accum_sampling_prob, + ensure_full_nwp_weight=ensure_full_nwp_weight, + ) + + # Call the reduced-spaced ensemble Kalman filter approach. + combined_forecast = blending.pca_ens_kalman_filter.forecast( + obs_precip=radar_precip, + obs_timestamps=radar_precip_timestamps, + nwp_precip=nwp_precip, + nwp_timestamps=nwp_precip_timestamps, + velocity=radar_velocity, + forecast_horizon=forecast_length, + issuetime=forecast_init, + n_ens_members=n_ens_members, + precip_mask_dilation=1, + n_cascade_levels=6, + precip_thr=metadata["threshold"], + norain_thr=norain_thr, + extrap_method="semilagrangian", + decomp_method="fft", + bandpass_filter_method="gaussian", + noise_method=noise_method, + enkf_method=enkf_method, + enable_combination=enable_combination, + noise_stddev_adj=noise_stddev_adj, + ar_order=1, + callback=None, + return_output=True, + seed=None, + num_workers=1, + fft_method="numpy", + domain="spatial", + extrap_kwargs=None, + filter_kwargs=None, + noise_kwargs=None, + combination_kwargs=combination_kwargs, + measure_time=False, + ) + + assert combined_forecast.ndim == 4, "Wrong amount of dimensions in forecast output" + assert ( + combined_forecast.shape[0] == n_ens_members + ), "Wrong amount of output ensemble members in forecast output" + assert ( + combined_forecast.shape[1] == forecast_length // temporal_res_radar + 1 + ), "Wrong amount of output time steps in forecast output" + + # Transform the data back into mm/h + combined_forecast, _ = converter(combined_forecast, metadata) + + assert ( + combined_forecast.ndim == 4 + ), "Wrong amount of dimensions in converted forecast output" + assert ( + combined_forecast.shape[0] == n_ens_members + ), "Wrong amount of output ensemble members in converted forecast output" + assert ( + combined_forecast.shape[1] == forecast_length // temporal_res_radar + 1 + ), "Wrong amount of output time steps in converted forecast output" + + return diff --git a/pysteps/tests/test_utils_pca.py b/pysteps/tests/test_utils_pca.py new file mode 100644 index 000000000..be8488286 --- /dev/null +++ b/pysteps/tests/test_utils_pca.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- + +import pytest +import numpy as np +from pysteps.utils import pca + + +pca_arg_values = ( + (10, 10), + (20, 20), + (10, 5), + (20, 15), +) + +pca_arg_names = ("len_y", "n_components") + + +@pytest.mark.parametrize(pca_arg_names, pca_arg_values) +def test_pca(len_y, n_components): + + pytest.importorskip("sklearn") + + precip_field = np.zeros((len_y, 200, 200)) + for i in range(len_y): + a = 3 * i + b = 2 * i + precip_field[i, 20 + b : 160 - b, 30 + a : 180 - a] = 0.1 + precip_field[i, 22 + b : 162 - b, 35 + a : 178 - a] = 0.1 + precip_field[i, 24 + b : 164 - b, 40 + a : 176 - a] = 1.0 + precip_field[i, 26 + b : 166 - b, 45 + a : 174 - a] = 5.0 + precip_field[i, 28 + b : 168 - b, 50 + a : 172 - a] = 5.0 + precip_field[i, 30 + b : 170 - b, 35 + a : 170 - a] = 4.5 + precip_field[i, 32 + b : 172 - b, 40 + a : 168 - a] = 4.5 + precip_field[i, 34 + b : 174 - b, 45 + a : 166 - a] = 4.0 + precip_field[i, 36 + 
b : 176 - b, 50 + a : 164 - a] = 2.0 + precip_field[i, 38 + b : 178 - b, 55 + a : 162 - a] = 1.0 + precip_field[i, 40 + b : 180 - b, 60 + a : 160 - a] = 0.5 + precip_field[i, 42 + b : 182 - b, 65 + a : 158 - a] = 0.1 + + precip_field = precip_field.reshape( + len_y, precip_field.shape[1] * precip_field.shape[2] + ) + + kwargs = {"n_components": n_components, "svd_solver": "full"} + precip_field_pc, pca_params = pca.pca_transform( + forecast_ens=precip_field, get_params=True, **kwargs + ) + + assert precip_field_pc.shape == (len_y, n_components) + assert pca_params["principal_components"].shape[1] == precip_field.shape[1] + assert pca_params["mean"].shape[0] == precip_field.shape[1] + + precip_field_backtransformed = pca.pca_backtransform( + precip_field_pc, pca_params=pca_params + ) + + # These fields are only equal if the full PCA is computed + if len_y == n_components: + assert np.sum(np.abs(precip_field_backtransformed - precip_field)) < 1e-6 diff --git a/pysteps/timeseries/autoregression.py b/pysteps/timeseries/autoregression.py index 729525480..0617fd3ab 100644 --- a/pysteps/timeseries/autoregression.py +++ b/pysteps/timeseries/autoregression.py @@ -1051,8 +1051,8 @@ def iterate_ar_model(x, phi, eps=None): if eps is not None and eps.shape != x.shape[1:]: raise ValueError( - "dimension mismatch between x and eps: x.shape=%s, eps.shape[1:]=%s" - % (str(x.shape), str(eps.shape[1:])) + "dimension mismatch between x and eps: x[1:].shape=%s, eps.shape=%s" + % (str(x[1:].shape), str(eps.shape)) ) x_new = 0.0 diff --git a/pysteps/utils/__init__.py b/pysteps/utils/__init__.py index 9594a75ae..a1a37c393 100644 --- a/pysteps/utils/__init__.py +++ b/pysteps/utils/__init__.py @@ -8,7 +8,8 @@ from .interface import get_method from .interpolate import * from .fft import * +from .pca import * +from .reprojection import * from .spectral import * from .tapering import * from .transformation import * -from .reprojection import * diff --git a/pysteps/utils/interface.py b/pysteps/utils/interface.py index ded0c3094..a1d9facce 100644 --- a/pysteps/utils/interface.py +++ b/pysteps/utils/interface.py @@ -18,6 +18,7 @@ from . import fft from . import images from . import interpolate +from . import pca from . import reprojection from . import spectral from . import tapering @@ -109,6 +110,18 @@ def get_method(name, **kwargs): Additional keyword arguments are passed to the initializer of the FFT methods, see utils.fft. 
+    Principal component analysis methods:
+
+    +-------------------+-----------------------------------------------------+
+    |       Name        |              Description                            |
+    +===================+=====================================================+
+    | pca_transform     | Transform a two-dimensional array into principal   |
+    |                   | component (PC) space                                |
+    +-------------------+-----------------------------------------------------+
+    | pca_backtransform | Transform a given principal component trans-       |
+    |                   | formation back into physical space                 |
+    +-------------------+-----------------------------------------------------+
+
     Reprojection methods:
 
     +-------------------+-----------------------------------------------------+
@@ -197,6 +210,10 @@ def donothing(R, metadata=None, *args, **kwargs):
     methods_objects["rbfinterp2d"] = interpolate.rbfinterp2d
     methods_objects["idwinterp2d"] = interpolate.idwinterp2d
 
+    # pca methods
+    methods_objects["pca_transform"] = pca.pca_transform
+    methods_objects["pca_backtransform"] = pca.pca_backtransform
+
     # reprojection methods
     methods_objects["reproject_grids"] = reprojection.reproject_grids
diff --git a/pysteps/utils/pca.py b/pysteps/utils/pca.py
new file mode 100644
index 000000000..fe052270c
--- /dev/null
+++ b/pysteps/utils/pca.py
@@ -0,0 +1,179 @@
+# -*- coding: utf-8 -*-
+"""
+pysteps.utils.pca
+
+Principal component analysis for pysteps.
+
+.. autosummary::
+    :toctree: ../generated/
+
+    pca_transform
+    pca_backtransform
+"""
+
+import numpy as np
+from pysteps.exceptions import MissingOptionalDependency
+
+try:
+    from sklearn import decomposition
+
+    SKLEARN_IMPORTED = True
+except ImportError:
+    SKLEARN_IMPORTED = False
+
+
+def pca_transform(
+    forecast_ens: np.ndarray,
+    mask: np.ndarray | None = None,
+    pca_params: dict | None = None,
+    get_params: bool = False,
+    **kwargs: dict,
+):
+    """
+    Transform a two-dimensional array into PC space.
+
+    Parameters
+    ----------
+    forecast_ens: np.ndarray
+        Two-dimensional array of shape (n_ens, n_gridpoints) containing the
+        precipitation ensemble forecast that should be decomposed in principal component
+        (PC) space.
+
+    Other Parameters
+    ----------------
+    mask: np.ndarray
+        Optional mask to transform only grid points at which at least 10 ensemble
+        members have forecast precipitation to fulfill the Lien criterion (Lien et al.,
+        2013) that is mentioned in Nerini et al., 2019. Defaults to None.
+    pca_params: dict
+        Optional output dictionary containing the preconstructed Principal Component
+        Analysis, since this construction is performed on the full precipitation
+        forecast dataset. Defaults to None.
+    get_params: bool
+        Optional flag whether pca_params should be output or not. Defaults to False.
+    n_components: int
+        Number of principal components.
+    svd_solver: {'auto', 'full', 'covariance_eigh', 'arpack', 'randomized'}
+        Solver for the singular vector decomposition. For a detailed description see
+        the documentation of sklearn.decomposition.PCA.
+
+    Returns
+    -------
+    forecast_ens_pc: np.ndarray
+        Two-dimensional array of shape (n_ens, n_components) containing the input data
+        transformed into PC space. If no mask is given as input, the full dataset is
+        transformed, otherwise only the mask-filtered values are transformed.
+    pca_params: dict (optional)
+        If the respective flag (get_params) is set to True, a dictionary is returned
+        containing:
+        -principal_components: np.ndarray
+            Two-dimensional array of shape (n_components, n_features) containing the
+            principal components in feature space.
+        -mean: np.ndarray
+            One-dimensional array of shape (n_features) containing the per-feature
+            empirical mean estimated from the input data.
+        -explained_variance: np.ndarray
+            One-dimensional array of shape (n_components) containing the explained
+            variance ratio of each principal component.
+    """
+
+    # Test import of sklearn
+    if not SKLEARN_IMPORTED:
+        raise MissingOptionalDependency(
+            "scikit-learn package is required for principal component analysis "
+            "but it is not installed"
+        )
+
+    # Input data have to be two-dimensional
+    if forecast_ens.ndim != 2:
+        raise ValueError("Input array should be two-dimensional!")
+
+    if pca_params is None:
+        # Check whether n_components and svd_solver are given as keyword arguments
+        n_components = kwargs.get("n_components", forecast_ens.shape[0])
+        svd_solver = kwargs.get("svd_solver", "full")
+
+        # Initialize PCA and fit it to the input data
+        pca = decomposition.PCA(n_components=n_components, svd_solver=svd_solver)
+        pca.fit(forecast_ens)
+
+        # Create output dictionary and save principal components and mean
+        pca_params = {}
+        pca_params["principal_components"] = pca.components_
+        pca_params["mean"] = pca.mean_
+        pca_params["explained_variance"] = pca.explained_variance_ratio_
+
+    else:
+        # If output dict is given, check whether principal components and mean are included
+        if not "principal_components" in pca_params.keys():
+            raise KeyError("pca_params is not None but has no key 'principal_components'!")
+        if not "mean" in pca_params.keys():
+            raise KeyError("pca_params is not None but has no key 'mean'!")
+
+        # Check whether PC and mean have the correct shape
+        if forecast_ens.shape[1] != len(pca_params["mean"]):
+            raise ValueError("pca mean does not have the same length as the input array!")
+        if forecast_ens.shape[1] != pca_params["principal_components"].shape[1]:
+            raise ValueError(
+                "principal components do not have the same length as the input array"
+            )
+
+    # If no mask is given, transform the full input data into PC space.
+    if mask is None:
+        forecast_ens_pc = np.dot(
+            (forecast_ens - pca_params["mean"]), pca_params["principal_components"].T
+        )
+    else:
+        forecast_ens_pc = np.dot(
+            (forecast_ens[:, mask] - pca_params["mean"][mask]),
+            pca_params["principal_components"][:, mask].T,
+        )
+
+    if get_params:
+        return forecast_ens_pc, pca_params
+    else:
+        return forecast_ens_pc
+
+
+def pca_backtransform(forecast_ens_pc: np.ndarray, pca_params: dict):
+    """
+    Transform a given PC transformation back into physical space.
+
+    Parameters
+    ----------
+    forecast_ens_pc: np.ndarray
+        Two-dimensional array of shape (n_ens, n_components) containing the full input
+        data transformed into PC space.
+    pca_params: dict
+        Dictionary as returned by pca_transform, containing:
+        -principal_components: np.ndarray
+            Two-dimensional array of shape (n_components, n_features) containing the
+            principal components in feature space.
+        -mean: np.ndarray
+            One-dimensional array of shape (n_features) containing the per-feature
+            empirical mean estimated from the input data.
+
+    Returns
+    -------
+    forecast_ens: np.ndarray
+        Two-dimensional array of shape (n_ens, n_gridpoints) containing the
+        backtransformed precipitation forecast.
+    """
+
+    # Check that the given pca_params contains principal components and mean
+    if "principal_components" not in pca_params:
+        raise KeyError("pca_params has no key 'principal_components'!")
+    if "mean" not in pca_params:
+        raise KeyError("pca_params has no key 'mean'!")
+
+    # Check whether the principal components and forecast_ens_pc have matching shapes
+    if forecast_ens_pc.shape[1] != pca_params["principal_components"].shape[0]:
+        raise ValueError(
+            "The number of components in forecast_ens_pc does not match the "
+            "principal components in pca_params!"
+        )
+
+    # Transform forecast_ens_pc back into physical space.
+    forecast_ens = (
+        np.dot(forecast_ens_pc, pca_params["principal_components"]) + pca_params["mean"]
+    )
+
+    return forecast_ens
diff --git a/pysteps/utils/reprojection.py b/pysteps/utils/reprojection.py
index 6144a52c0..1d070550d 100644
--- a/pysteps/utils/reprojection.py
+++ b/pysteps/utils/reprojection.py
@@ -12,6 +12,7 @@ reproject_grids
 """
 
 from pysteps.exceptions import MissingOptionalDependency
+from scipy.interpolate import griddata
 import numpy as np
 
 
@@ -23,6 +24,13 @@ except ImportError:
     RASTERIO_IMPORTED = False
 
 
+try:
+    import pyproj
+
+    PYPROJ_IMPORTED = True
+except ImportError:
+    PYPROJ_IMPORTED = False
+
 
 def reproject_grids(src_array, dst_array, metadata_src, metadata_dst):
     """
@@ -118,3 +126,115 @@ def reproject_grids(src_array, dst_array, metadata_src, metadata_dst):
             metadata[key] = metadata_dst[key]
 
     return r_rprj, metadata
+
+
+def unstructured2regular(src_array, metadata_src, metadata_dst):
+    """
+    Reproject unstructured data onto a regular grid, assuming that the src data
+    and the dst grid share the same projection.
+
+    Parameters
+    ----------
+    src_array: np.ndarray
+        Three-dimensional array of shape (t, n_ens, n_gridcells) containing a
+        time series of precipitation ensemble forecasts. These precipitation
+        fields will be reprojected.
+    metadata_src: dict
+        Metadata dictionary containing the projection, clon, clat, and ngridcells
+        attributes of the src_array, as described in the documentation of
+        :py:mod:`pysteps.io.importers`.
+    metadata_dst: dict
+        Metadata dictionary containing the projection, x- and ypixelsize, x1, x2,
+        y1, y2, and yorigin attributes of the dst_array.
+
+    Returns
+    -------
+    tuple
+        A tuple containing:
+        - r_rprj: np.ndarray
+            Four-dimensional array of shape (t, n_ens, y, x) containing the
+            precipitation fields of src_array, but reprojected to the grid
+            of dst_array.
+        - metadata: dict
+            Dictionary containing geospatial metadata such as:
+            - 'projection': PROJ.4 string defining the projection of the
+              destination grid.
+            - 'xpixelsize', 'ypixelsize': Pixel size in meters.
+            - 'x1', 'y1': Cartesian coordinates of the lower-left corner.
+            - 'x2', 'y2': Cartesian coordinates of the upper-right corner.
+            - 'cartesian_unit': Unit of the coordinate system (meters).
+    """
+
+    if not PYPROJ_IMPORTED:
+        raise MissingOptionalDependency(
+            "pyproj package is required to reproject DWD's NWP data "
+            "but it is not installed"
+        )
+
+    if "clon" not in metadata_src:
+        raise KeyError("Center longitude (clon) is missing in metadata_src")
+    if "clat" not in metadata_src:
+        raise KeyError("Center latitude (clat) is missing in metadata_src")
+
+    # Get the number of grid cells and their indices
+    Nc = metadata_src["clon"].shape[0]
+    ic_in = np.arange(Nc)
+
+    # Get Cartesian coordinates of the destination grid
+    x_dst = np.arange(
+        np.float32(metadata_dst["x1"]),
+        np.float32(metadata_dst["x2"]),
+        metadata_dst["xpixelsize"],
+    )
+
+    y_dst = np.arange(
+        np.float32(metadata_dst["y1"]),
+        np.float32(metadata_dst["y2"]),
+        metadata_dst["ypixelsize"],
+    )
+
+    # Create the destination grid
+    if metadata_dst["yorigin"] == "upper":
+        y_dst = y_dst[::-1]
+    xx_dst, yy_dst = np.meshgrid(x_dst, y_dst)
+    s_out = yy_dst.shape
+
+    # Project the source cell centers (clon, clat), assuming that src and dst
+    # share the same projection
+    pr = pyproj.Proj(metadata_dst["projection"])
+    x_src, y_src = pr(metadata_src["clon"], metadata_src["clat"])
+
+    # Create arrays of x-y pairs for the interpolation
+    P_in = np.stack((x_src, y_src)).T
+    P_out = np.array((xx_dst.flatten(), yy_dst.flatten())).T
+
+    # Nearest-neighbor interpolation of the source cell indices onto the
+    # destination grid
+    ic_out = (
+        griddata(P_in, ic_in.flatten(), P_out, method="nearest")
+        .reshape(s_out)
+        .astype(int)
+    )
+
+    # Apply the index mapping to all time steps and ensemble members
+    r_rprj = np.array(
+        [
+            [src_array[i, j][ic_out] for j in range(src_array.shape[1])]
+            for i in range(src_array.shape[0])
+        ]
+    )
+
+    # Update the src metadata
+    metadata = metadata_src.copy()
+
+    for key in [
+        "projection",
+        "yorigin",
+        "xpixelsize",
+        "ypixelsize",
+        "x1",
+        "x2",
+        "y1",
+        "y2",
+        "cartesian_unit",
+    ]:
+        metadata[key] = metadata_dst[key]
+
+    return r_rprj, metadata
diff --git a/requirements_dev.txt b/requirements_dev.txt
index f1ff6a845..c49ea2d18 100644
--- a/requirements_dev.txt
+++ b/requirements_dev.txt
@@ -1,5 +1,5 @@
 # Base dependencies
-python>=3.11
+python>=3.10
 numpy
 opencv-python
 pillow
@@ -16,6 +16,7 @@
 pyfftw
 cartopy>=0.18
 h5py
 scikit-image
+scikit-learn
 pandas
 rasterio
diff --git a/tox.ini b/tox.ini
index af8938841..86a2e4b1d 100644
--- a/tox.ini
+++ b/tox.ini
@@ -27,6 +27,7 @@ deps =
     pyfftw
     h5py
     PyWavelets
+    scikit-learn
     gitpython
     pytest
     pytest-cov
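
Usage note (not part of the diff): the snippet below is a minimal round-trip sketch of the new pca_transform/pca_backtransform utilities. It assumes this branch and scikit-learn are installed; the ensemble size and number of grid points are invented for illustration. With the default number of components (the ensemble size), the back-transform reconstructs the input up to numerical precision.

    import numpy as np

    from pysteps.utils import pca

    # Synthetic ensemble: 10 members, 5000 flattened grid points (values made up)
    rng = np.random.default_rng(seed=42)
    forecast_ens = rng.gamma(shape=1.0, scale=2.0, size=(10, 5000))

    # Forward transform; get_params=True also returns the fitted PCA parameters
    # (principal components, mean, explained variance ratio)
    forecast_ens_pc, pca_params = pca.pca_transform(forecast_ens, get_params=True)

    # Back-transform into physical space with the same parameters
    forecast_ens_rec = pca.pca_backtransform(forecast_ens_pc, pca_params)

    # With n_components equal to the ensemble size, the round trip is exact
    print(np.allclose(forecast_ens, forecast_ens_rec))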
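
Similarly, a sketch of how the new unstructured2regular helper could be called (also not part of the diff; it additionally requires scipy and pyproj). All metadata values below are made up purely for illustration: a handful of unstructured cell centres in western Germany and a small UTM destination grid. In practice, metadata_src and metadata_dst would come from the pysteps importers and the pysteps-nwp-importers plugin.

    import numpy as np

    from pysteps.utils.reprojection import unstructured2regular

    # Hypothetical unstructured source grid: 4 cell centres given as lon/lat
    metadata_src = {
        "projection": "+proj=utm +zone=32 +datum=WGS84 +units=m +no_defs",
        "ngridcells": 4,
        "clon": np.array([6.0, 6.5, 7.0, 7.5]),  # degrees east (made up)
        "clat": np.array([50.0, 50.5, 51.0, 51.5]),  # degrees north (made up)
    }

    # Hypothetical regular destination grid (UTM zone 32N, 10 km spacing)
    metadata_dst = {
        "projection": "+proj=utm +zone=32 +datum=WGS84 +units=m +no_defs",
        "x1": 300000.0,
        "x2": 500000.0,
        "y1": 5500000.0,
        "y2": 5700000.0,
        "xpixelsize": 10000.0,
        "ypixelsize": 10000.0,
        "yorigin": "upper",
        "cartesian_unit": "m",
    }

    # Random "forecast" of shape (t, n_ens, n_gridcells); values are made up
    src_array = np.random.default_rng(0).gamma(1.0, 2.0, size=(2, 3, 4))

    # Nearest-neighbour reprojection onto the 20 x 20 regular grid
    r_rprj, metadata = unstructured2regular(src_array, metadata_src, metadata_dst)
    print(r_rprj.shape)  # (2, 3, 20, 20)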