From 0c8ee70061734cdbdeea9ee50a227e0e094e144c Mon Sep 17 00:00:00 2001 From: Daniel Villarreal <65949926+DANIELV95@users.noreply.github.com> Date: Wed, 12 Feb 2025 15:21:02 +0100 Subject: [PATCH 1/5] Update anvil.py - Added lines to print averaged gamma and phi values - Corrected for inputs longer than required (vil.shape[0] > ar_order + 2) --- pysteps/nowcasts/anvil.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pysteps/nowcasts/anvil.py b/pysteps/nowcasts/anvil.py index 9da0fb47e..c9dfbe774 100644 --- a/pysteps/nowcasts/anvil.py +++ b/pysteps/nowcasts/anvil.py @@ -23,6 +23,7 @@ from scipy.ndimage import gaussian_filter from pysteps import cascade, extrapolation from pysteps.nowcasts.utils import nowcast_main_loop +from pysteps.nowcasts import utils as nowcast_utils from pysteps.timeseries import autoregression from pysteps import utils @@ -184,7 +185,7 @@ def forecast( starttime_init = time.time() m, n = vil.shape[1:] - vil = vil.copy() + vil = vil[-(ar_order+2) :].copy() if rainrate is None and apply_rainrate_mask: rainrate_mask = vil[-1, :] < 0.1 @@ -273,6 +274,10 @@ def worker(vil, i): gamma[i, 0, :], gamma[i, 1, :] ) + nowcast_utils.print_corrcoefs( + np.nanmean(gamma, axis=(-2,-1)) + ) + # estimate the parameters of the ARI models phi = [] for i in range(n_cascade_levels): @@ -284,6 +289,10 @@ def worker(vil, i): phi_ = _estimate_ar1_params(gamma[i, :]) phi.append(phi_) + nowcast_utils.print_ar_params( + np.nanmean(np.array(phi), axis=(-2,-1)) + ) + vil_dec = vil_dec[:, -(ar_order + 1) :, :] if measure_time: @@ -339,7 +348,7 @@ def _check_inputs(vil, rainrate, velocity, timesteps, ar_order): "rainrate.shape = %s, but a two-dimensional array expected" % str(rainrate.shape) ) - if vil.shape[0] != ar_order + 2: + if vil.shape[0] < ar_order + 2: raise ValueError( "vil.shape[0] = %d, but vil.shape[0] = ar_order + 2 = %d required" % (vil.shape[0], ar_order + 2) From c9928fbc159425f87bf6f2b0a0d0c69ed1b4700b Mon Sep 17 00:00:00 2001 From: Daniel Eduardo Villarreal Jaime Date: Tue, 9 Sep 2025 10:55:00 +0200 Subject: [PATCH 2/5] Copy sprog.py as sprogloc.py to compare --- pysteps/nowcasts/sprogloc.py | 432 +++++++++++++++++++++++++++++++++++ 1 file changed, 432 insertions(+) create mode 100644 pysteps/nowcasts/sprogloc.py diff --git a/pysteps/nowcasts/sprogloc.py b/pysteps/nowcasts/sprogloc.py new file mode 100644 index 000000000..3742556e2 --- /dev/null +++ b/pysteps/nowcasts/sprogloc.py @@ -0,0 +1,432 @@ +""" +pysteps.nowcasts.sprog +====================== + +Implementation of the S-PROG method described in :cite:`Seed2003` + +.. autosummary:: + :toctree: ../generated/ + + forecast +""" + +import time + +import numpy as np + +from pysteps import cascade, extrapolation, utils +from pysteps.nowcasts import utils as nowcast_utils +from pysteps.nowcasts.utils import compute_percentile_mask, nowcast_main_loop +from pysteps.postprocessing import probmatching +from pysteps.timeseries import autoregression, correlation +from pysteps.utils.check_norain import check_norain + +try: + import dask + + DASK_IMPORTED = True +except ImportError: + DASK_IMPORTED = False + + +def forecast( + precip, + velocity, + timesteps, + precip_thr=None, + norain_thr=0.0, + n_cascade_levels=6, + extrap_method="semilagrangian", + decomp_method="fft", + bandpass_filter_method="gaussian", + ar_order=2, + conditional=False, + probmatching_method="cdf", + num_workers=1, + fft_method="numpy", + domain="spatial", + extrap_kwargs=None, + filter_kwargs=None, + measure_time=False, +): + """ + Generate a nowcast by using the Spectral Prognosis (S-PROG) method. + + Parameters + ---------- + precip: array-like + Array of shape (ar_order+1,m,n) containing the input precipitation fields + ordered by timestamp from oldest to newest. The time steps between + the inputs are assumed to be regular. + velocity: array-like + Array of shape (2,m,n) containing the x- and y-components of the + advection field. + The velocities are assumed to represent one time step between the + inputs. All values are required to be finite. + timesteps: int or list of floats + Number of time steps to forecast or a list of time steps for which the + forecasts are computed (relative to the input time step). The elements + of the list are required to be in ascending order. + precip_thr: float, required + The threshold value for minimum observable precipitation intensity. + norain_thr: float + Specifies the threshold value for the fraction of rainy (see above) pixels + in the radar rainfall field below which we consider there to be no rain. + Depends on the amount of clutter typically present. + Standard set to 0.0 + n_cascade_levels: int, optional + The number of cascade levels to use. Defaults to 6, see issue #385 + on GitHub. + extrap_method: str, optional + Name of the extrapolation method to use. See the documentation of + pysteps.extrapolation.interface. + decomp_method: {'fft'}, optional + Name of the cascade decomposition method to use. See the documentation + of pysteps.cascade.interface. + bandpass_filter_method: {'gaussian', 'uniform'}, optional + Name of the bandpass filter method to use with the cascade decomposition. + See the documentation of pysteps.cascade.interface. + ar_order: int, optional + The order of the autoregressive model to use. Must be >= 1. + conditional: bool, optional + If set to True, compute the statistics of the precipitation field + conditionally by excluding pixels where the values are + below the threshold precip_thr. + probmatching_method: {'cdf','mean',None}, optional + Method for matching the conditional statistics of the forecast field + (areas with precipitation intensity above the threshold precip_thr) with + those of the most recently observed one. 'cdf'=map the forecast CDF to the + observed one, 'mean'=adjust only the mean value, + None=no matching applied. + num_workers: int, optional + The number of workers to use for parallel computation. Applicable if dask + is enabled or pyFFTW is used for computing the FFT. + When num_workers>1, it is advisable to disable OpenMP by setting + the environment variable OMP_NUM_THREADS to 1. + This avoids slowdown caused by too many simultaneous threads. + fft_method: str, optional + A string defining the FFT method to use (see utils.fft.get_method). + Defaults to 'numpy' for compatibility reasons. If pyFFTW is installed, + the recommended method is 'pyfftw'. + domain: {"spatial", "spectral"} + If "spatial", all computations are done in the spatial domain (the + classical S-PROG model). If "spectral", the AR(2) models are applied + directly in the spectral domain to reduce memory footprint and improve + performance :cite:`PCH2019a`. + extrap_kwargs: dict, optional + Optional dictionary containing keyword arguments for the extrapolation + method. See the documentation of pysteps.extrapolation. + filter_kwargs: dict, optional + Optional dictionary containing keyword arguments for the filter method. + See the documentation of pysteps.cascade.bandpass_filters.py. + measure_time: bool + If set to True, measure, print and return the computation time. + + Returns + ------- + out: ndarray + A three-dimensional array of shape (num_timesteps,m,n) containing a time + series of forecast precipitation fields. The time series starts from + t0+timestep, where timestep is taken from the input precipitation fields + precip. If measure_time is True, the return value is a three-element + tuple containing the nowcast array, the initialization time of the + nowcast generator and the time used in the main loop (seconds). + + See also + -------- + pysteps.extrapolation.interface, pysteps.cascade.interface + + References + ---------- + :cite:`Seed2003`, :cite:`PCH2019a` + """ + + _check_inputs(precip, velocity, timesteps, ar_order) + + if extrap_kwargs is None: + extrap_kwargs = dict() + + if filter_kwargs is None: + filter_kwargs = dict() + + if np.any(~np.isfinite(velocity)): + raise ValueError("velocity contains non-finite values") + + if precip_thr is None: + raise ValueError("precip_thr required but not specified") + + print("Computing S-PROG nowcast") + print("------------------------") + print("") + + print("Inputs") + print("------") + print(f"input dimensions: {precip.shape[1]}x{precip.shape[2]}") + print("") + + print("Methods") + print("-------") + print(f"extrapolation: {extrap_method}") + print(f"bandpass filter: {bandpass_filter_method}") + print(f"decomposition: {decomp_method}") + print("conditional statistics: {}".format("yes" if conditional else "no")) + print(f"probability matching: {probmatching_method}") + print(f"FFT method: {fft_method}") + print(f"domain: {domain}") + print("") + + print("Parameters") + print("----------") + if isinstance(timesteps, int): + print(f"number of time steps: {timesteps}") + else: + print(f"time steps: {timesteps}") + print(f"parallel threads: {num_workers}") + print(f"number of cascade levels: {n_cascade_levels}") + print(f"order of the AR(p) model: {ar_order}") + print(f"precip. intensity threshold: {precip_thr}") + + if measure_time: + starttime_init = time.time() + else: + starttime_init = None + + fft = utils.get_method(fft_method, shape=precip.shape[1:], n_threads=num_workers) + + m, n = precip.shape[1:] + + # initialize the band-pass filter + filter_method = cascade.get_method(bandpass_filter_method) + bp_filter = filter_method((m, n), n_cascade_levels, **filter_kwargs) + + decomp_method, recomp_method = cascade.get_method(decomp_method) + + extrapolator_method = extrapolation.get_method(extrap_method) + + precip = precip[-(ar_order + 1) :, :, :].copy() + precip_min = np.nanmin(precip) + + # determine the domain mask from non-finite values + domain_mask = np.logical_or.reduce( + [~np.isfinite(precip[i, :]) for i in range(precip.shape[0])] + ) + + if check_norain(precip, precip_thr, norain_thr, None): + return nowcast_utils.zero_precipitation_forecast( + None, timesteps, precip, None, True, measure_time, starttime_init + ) + + # determine the precipitation threshold mask + if conditional: + mask_thr = np.logical_and.reduce( + [precip[i, :, :] >= precip_thr for i in range(precip.shape[0])] + ) + else: + mask_thr = None + + # initialize the extrapolator + x_values, y_values = np.meshgrid( + np.arange(precip.shape[2]), np.arange(precip.shape[1]) + ) + + xy_coords = np.stack([x_values, y_values]) + + extrap_kwargs = extrap_kwargs.copy() + extrap_kwargs["xy_coords"] = xy_coords + extrap_kwargs["allow_nonfinite_values"] = ( + True if np.any(~np.isfinite(precip)) else False + ) + + # advect the previous precipitation fields to the same position with the + # most recent one (i.e. transform them into the Lagrangian coordinates) + res = list() + + def f(precip, i): + return extrapolator_method( + precip[i, :], velocity, ar_order - i, "min", **extrap_kwargs + )[-1] + + for i in range(ar_order): + if not DASK_IMPORTED: + precip[i, :, :] = f(precip, i) + else: + res.append(dask.delayed(f)(precip, i)) + + if DASK_IMPORTED: + num_workers_ = len(res) if num_workers > len(res) else num_workers + precip = np.stack( + list(dask.compute(*res, num_workers=num_workers_)) + [precip[-1, :, :]] + ) + + # replace non-finite values with the minimum value + precip = precip.copy() + for i in range(precip.shape[0]): + precip[i, ~np.isfinite(precip[i, :])] = np.nanmin(precip[i, :]) + + # compute the cascade decompositions of the input precipitation fields + precip_decomp = [] + for i in range(ar_order + 1): + precip_ = decomp_method( + precip[i, :, :], + bp_filter, + mask=mask_thr, + fft_method=fft, + output_domain=domain, + normalize=True, + compute_stats=True, + compact_output=True, + ) + precip_decomp.append(precip_) + + # rearrange the cascade levels into a four-dimensional array of shape + # (n_cascade_levels,ar_order+1,m,n) for the autoregressive model + precip_cascades = nowcast_utils.stack_cascades( + precip_decomp, n_cascade_levels, convert_to_full_arrays=True + ) + + # compute lag-l temporal autocorrelation coefficients for each cascade level + gamma = np.empty((n_cascade_levels, ar_order)) + for i in range(n_cascade_levels): + if domain == "spatial": + gamma[i, :] = correlation.temporal_autocorrelation( + precip_cascades[i], mask=mask_thr + ) + else: + gamma[i, :] = correlation.temporal_autocorrelation( + precip_cascades[i], domain="spectral", x_shape=precip.shape[1:] + ) + + precip_cascades = nowcast_utils.stack_cascades( + precip_decomp, n_cascade_levels, convert_to_full_arrays=False + ) + + precip_decomp = precip_decomp[-1] + + nowcast_utils.print_corrcoefs(gamma) + + if ar_order == 2: + # adjust the lag-2 correlation coefficient to ensure that the AR(p) + # process is stationary + for i in range(n_cascade_levels): + gamma[i, 1] = autoregression.adjust_lag2_corrcoef2(gamma[i, 0], gamma[i, 1]) + + # estimate the parameters of the AR(p) model from the autocorrelation + # coefficients + phi = np.empty((n_cascade_levels, ar_order + 1)) + for i in range(n_cascade_levels): + phi[i, :] = autoregression.estimate_ar_params_yw(gamma[i, :]) + + nowcast_utils.print_ar_params(phi) + + # discard all except the p-1 last cascades because they are not needed for + # the AR(p) model + precip_cascades = [precip_cascades[i][-ar_order:] for i in range(n_cascade_levels)] + + if probmatching_method == "mean": + mu_0 = np.mean(precip[-1, :, :][precip[-1, :, :] >= precip_thr]) + else: + mu_0 = None + + # compute precipitation mask and wet area ratio + mask_p = precip[-1, :, :] >= precip_thr + war = 1.0 * np.sum(mask_p) / (precip.shape[1] * precip.shape[2]) + + if measure_time: + init_time = time.time() - starttime_init + + precip = precip[-1, :, :] + + print("Starting nowcast computation.") + + precip_forecast = [] + + state = {"precip_cascades": precip_cascades, "precip_decomp": precip_decomp} + params = { + "domain": domain, + "domain_mask": domain_mask, + "fft": fft, + "mu_0": mu_0, + "n_cascade_levels": n_cascade_levels, + "phi": phi, + "precip_0": precip, + "precip_min": precip_min, + "probmatching_method": probmatching_method, + "recomp_method": recomp_method, + "war": war, + } + + precip_forecast = nowcast_main_loop( + precip, + velocity, + state, + timesteps, + extrap_method, + _update, + extrap_kwargs=extrap_kwargs, + params=params, + measure_time=measure_time, + ) + if measure_time: + precip_forecast, mainloop_time = precip_forecast + + precip_forecast = np.stack(precip_forecast) + + if measure_time: + return precip_forecast, init_time, mainloop_time + else: + return precip_forecast + + +def _check_inputs(precip, velocity, timesteps, ar_order): + if precip.ndim != 3: + raise ValueError("precip must be a three-dimensional array") + if precip.shape[0] < ar_order + 1: + raise ValueError("precip.shape[0] < ar_order+1") + if velocity.ndim != 3: + raise ValueError("velocity must be a three-dimensional array") + if precip.shape[1:3] != velocity.shape[1:3]: + raise ValueError( + "dimension mismatch between precip and velocity: shape(precip)=%s, shape(velocity)=%s" + % (str(precip.shape), str(velocity.shape)) + ) + if isinstance(timesteps, list) and not sorted(timesteps) == timesteps: + raise ValueError("timesteps is not in ascending order") + + +def _update(state, params): + for i in range(params["n_cascade_levels"]): + state["precip_cascades"][i] = autoregression.iterate_ar_model( + state["precip_cascades"][i], params["phi"][i, :] + ) + + state["precip_decomp"]["cascade_levels"] = [ + state["precip_cascades"][i][-1, :] for i in range(params["n_cascade_levels"]) + ] + if params["domain"] == "spatial": + state["precip_decomp"]["cascade_levels"] = np.stack( + state["precip_decomp"]["cascade_levels"] + ) + + precip_forecast_recomp = params["recomp_method"](state["precip_decomp"]) + + if params["domain"] == "spectral": + precip_forecast_recomp = params["fft"].irfft2(precip_forecast_recomp) + + mask = compute_percentile_mask(precip_forecast_recomp, params["war"]) + precip_forecast_recomp[~mask] = params["precip_min"] + + if params["probmatching_method"] == "cdf": + # adjust the CDF of the forecast to match the most recently + # observed precipitation field + precip_forecast_recomp = probmatching.nonparam_match_empirical_cdf( + precip_forecast_recomp, params["precip_0"] + ) + elif params["probmatching_method"] == "mean": + mu_fct = np.mean(precip_forecast_recomp[mask]) + precip_forecast_recomp[mask] = ( + precip_forecast_recomp[mask] - mu_fct + params["mu_0"] + ) + + precip_forecast_recomp[params["domain_mask"]] = np.nan + + return precip_forecast_recomp, state From 4d077ca414e1cc72921348576435e7a0492d3a09 Mon Sep 17 00:00:00 2001 From: Daniel Eduardo Villarreal Jaime Date: Wed, 10 Sep 2025 10:35:49 +0200 Subject: [PATCH 3/5] Added Sprogloc method and small changes in correlation and autoregression functions with localization --- pysteps/timeseries/autoregression.py | 74 ++++++++++++++++++++++++++++ pysteps/timeseries/correlation.py | 14 +++--- 2 files changed, 81 insertions(+), 7 deletions(-) diff --git a/pysteps/timeseries/autoregression.py b/pysteps/timeseries/autoregression.py index 729525480..80213f9b0 100644 --- a/pysteps/timeseries/autoregression.py +++ b/pysteps/timeseries/autoregression.py @@ -553,6 +553,80 @@ def estimate_ar_params_yw_localized(gamma, d=0): return list(phi_out.reshape(np.hstack([[len(phi_out)], gamma[0].shape]))) +# optimized version of timeseries.autoregression.estimate_ar_params_yw_localized +# for an ARI(1,d) model +def estimate_ar1_params_yw_localized(gamma, d): + """ + Estimate AR(1) parameters for a given autocorrelation structure. + + Parameters: + - gamma (np.ndarray): The autocorrelation coefficients for the time series. + - d (int): The differencing order (0 for AR, 1 for ARI). + + Returns: + - phi (np.ndarray): Estimated AR(1) parameters. + """ + phi = [] + phi1 = gamma[0, :] + phi0 = np.sqrt(np.maximum(1.0 - phi1**2, 0)) + + if d == 0: + # AR(1,0) model (no differencing) + phi.append(phi1) + # Noise term + phi.append(phi0) + + elif d == 1: + # ARI(1,1) model (first differenced) + phi1_d = 1 + phi1 + phi2_d = -phi1 + phi.append(phi1_d) + phi.append(phi2_d) + # Noise term + phi.append(phi0) + + return np.array(phi) + + +# for an ARI(2,d) model +def estimate_ar2_params_yw_localized(gamma, d): + """ + Estimate AR(2) parameters for a given autocorrelation structure. + + Parameters: + - gamma (np.ndarray): The autocorrelation coefficients for the time series. + - d (int): The differencing order (0 for AR, 1 for ARI). + + Returns: + - phi (np.ndarray): Estimated AR(2) parameters. + """ + phi = [] + g1 = gamma[0, :] + g2 = gamma[1, :] + phi1 = (g1 * (1.0 - g2)) / (1.0 - g1**2) + phi2 = (g2 - g1**2) / (1.0 - g1**2) + phi0 = np.sqrt(np.maximum(1.0 - phi1 * g1 - phi2 * g2, 0)) + + if d == 0: + # AR(2,0) model (no differencing) + phi.append(phi1) + phi.append(phi2) + # Noise term + phi.append(phi0) + elif d == 1: + # ARI(2,1) model (first differenced) + phi1_d = 1 + phi1 + phi2_d = phi2 - phi1 + phi3_d = -phi2 + phi.append(phi1_d) + phi.append(phi2_d) + phi.append(phi3_d) + # Noise term + phi.append(phi0) + + return np.array(phi) + + def estimate_var_params_ols( x, p, d=0, check_stationarity=True, include_constant_term=False, h=0, lam=0.0 ): diff --git a/pysteps/timeseries/correlation.py b/pysteps/timeseries/correlation.py index a43b41a7e..c72189963 100644 --- a/pysteps/timeseries/correlation.py +++ b/pysteps/timeseries/correlation.py @@ -118,9 +118,12 @@ def temporal_autocorrelation( if window_radius == np.inf: cc = np.corrcoef(x[-1, :][mask], x[-(k + 2), :][mask])[0, 1] else: + #Compute global autocorrelation coefficient to fill nan values + ccg = np.corrcoef(x[-1, :][mask], x[-(k + 2), :][mask])[0, 1] cc = _moving_window_corrcoef( - x[-1, :], x[-(k + 2), :], window_radius, mask=mask + x[-1, :], x[-(k + 2), :], window_radius, window=window, mask=mask ) + cc[~np.isfinite(cc)] = ccg else: cc = spectral.corrcoef( x[-1, :, :], x[-(k + 2), :], x_shape, use_full_fft=use_full_fft @@ -237,14 +240,11 @@ def _moving_window_corrcoef(x, y, window_radius, window="gaussian", mask=None): if window == "gaussian": convol_filter = ndimage.gaussian_filter - else: + window_size = window_radius + else: #window == "uniform" convol_filter = ndimage.uniform_filter - - if window == "uniform": window_size = 2 * window_radius + 1 - else: - window_size = window_radius - + n = convol_filter(mask, window_size, mode="constant") * window_size**2 sx = convol_filter(x, window_size, mode="constant") * window_size**2 From 3a9fd51bc55ed354381c74f864a5eec6b1c8296c Mon Sep 17 00:00:00 2001 From: Daniel Eduardo Villarreal Jaime Date: Wed, 10 Sep 2025 10:37:33 +0200 Subject: [PATCH 4/5] Updated sprogloc method --- pysteps/nowcasts/sprogloc.py | 134 ++++++++++++++++++++++++----------- 1 file changed, 91 insertions(+), 43 deletions(-) diff --git a/pysteps/nowcasts/sprogloc.py b/pysteps/nowcasts/sprogloc.py index 3742556e2..23ee6d36f 100644 --- a/pysteps/nowcasts/sprogloc.py +++ b/pysteps/nowcasts/sprogloc.py @@ -1,8 +1,9 @@ """ -pysteps.nowcasts.sprog -====================== +pysteps.nowcasts.sprogloc +========================= -Implementation of the S-PROG method described in :cite:`Seed2003` +Implementation of the S-PROG Localized method described in :cite:`RRR2022`, +based in S-PROG method described in :cite:`Seed2003` .. autosummary:: :toctree: ../generated/ @@ -39,7 +40,11 @@ def forecast( extrap_method="semilagrangian", decomp_method="fft", bandpass_filter_method="gaussian", + gamma_filter_method="uniform", ar_order=2, + d_order=0, + ar_window_radius=None, + gamma_factor=1.0, conditional=False, probmatching_method="cdf", num_workers=1, @@ -50,12 +55,12 @@ def forecast( measure_time=False, ): """ - Generate a nowcast by using the Spectral Prognosis (S-PROG) method. + Generate a nowcast by using the Spectral Prognosis Localized (S-PROG-LOC) method. Parameters ---------- precip: array-like - Array of shape (ar_order+1,m,n) containing the input precipitation fields + Array of shape (ar_order+d_order+1,m,n) containing the input precipitation fields ordered by timestamp from oldest to newest. The time steps between the inputs are assumed to be regular. velocity: array-like @@ -87,7 +92,13 @@ def forecast( Name of the bandpass filter method to use with the cascade decomposition. See the documentation of pysteps.cascade.interface. ar_order: int, optional - The order of the autoregressive model to use. Must be >= 1. + The order of the autoregressive integrated model to use. Must be >= 1. + d_order: int, optional + The differencing order of the autoregressive integrated model to use. + Must be >= 0. If d == 0, then ARI(p,0) == AR(p), as used in S-PROG model. + ar_window_radius: int or list, optional + The radius of the window to use for determining the parameters of the + autoregressive integrated model. Set to None to disable localization. conditional: bool, optional If set to True, compute the statistics of the precipitation field conditionally by excluding pixels where the values are @@ -110,7 +121,7 @@ def forecast( the recommended method is 'pyfftw'. domain: {"spatial", "spectral"} If "spatial", all computations are done in the spatial domain (the - classical S-PROG model). If "spectral", the AR(2) models are applied + classical S-PROG model). If "spectral", the ARI(p,d) models are applied directly in the spectral domain to reduce memory footprint and improve performance :cite:`PCH2019a`. extrap_kwargs: dict, optional @@ -138,10 +149,10 @@ def forecast( References ---------- - :cite:`Seed2003`, :cite:`PCH2019a` + :cite:`Seed2003`, :cite:`PCH2019a`, :cite:`RRR2022` """ - _check_inputs(precip, velocity, timesteps, ar_order) + _check_inputs(precip, velocity, timesteps, ar_order, d_order, ar_window_radius) if extrap_kwargs is None: extrap_kwargs = dict() @@ -155,7 +166,16 @@ def forecast( if precip_thr is None: raise ValueError("precip_thr required but not specified") - print("Computing S-PROG nowcast") + # Fix length of ar_windows_radius for localization + if ar_window_radius is None: + ar_window_radius = np.full(n_cascade_levels, np.inf) + elif isinstance(ar_window_radius, int): + ar_window_radius = np.full(n_cascade_levels, ar_window_radius) + elif isinstance(ar_window_radius, list): + # Use user-defined window sizes + ar_window_radius = np.array(ar_window_radius) + + print("Computing S-PROG-LOC nowcast") print("------------------------") print("") @@ -183,7 +203,9 @@ def forecast( print(f"time steps: {timesteps}") print(f"parallel threads: {num_workers}") print(f"number of cascade levels: {n_cascade_levels}") - print(f"order of the AR(p) model: {ar_order}") + print(f"order of the ARI(p,d) model: {ar_order}") + print(f"differencing order of the ARI(p,d) model: {d_order}") + print(f"ARI(p,d) window radius: {ar_window_radius}") print(f"precip. intensity threshold: {precip_thr}") if measure_time: @@ -203,7 +225,7 @@ def forecast( extrapolator_method = extrapolation.get_method(extrap_method) - precip = precip[-(ar_order + 1) :, :, :].copy() + precip = precip[-(ar_order + d_order + 1) :, :, :].copy() precip_min = np.nanmin(precip) # determine the domain mask from non-finite values @@ -243,10 +265,10 @@ def forecast( def f(precip, i): return extrapolator_method( - precip[i, :], velocity, ar_order - i, "min", **extrap_kwargs + precip[i, :], velocity, ar_order + d_order - i, "min", **extrap_kwargs )[-1] - for i in range(ar_order): + for i in range(ar_order + d_order): if not DASK_IMPORTED: precip[i, :, :] = f(precip, i) else: @@ -265,7 +287,7 @@ def f(precip, i): # compute the cascade decompositions of the input precipitation fields precip_decomp = [] - for i in range(ar_order + 1): + for i in range(ar_order + d_order + 1): precip_ = decomp_method( precip[i, :, :], bp_filter, @@ -279,22 +301,44 @@ def f(precip, i): precip_decomp.append(precip_) # rearrange the cascade levels into a four-dimensional array of shape - # (n_cascade_levels,ar_order+1,m,n) for the autoregressive model + # (n_cascade_levels,ar_order+d_order+1,m,n) for the autoregressive integrated model precip_cascades = nowcast_utils.stack_cascades( precip_decomp, n_cascade_levels, convert_to_full_arrays=True ) - # compute lag-l temporal autocorrelation coefficients for each cascade level - gamma = np.empty((n_cascade_levels, ar_order)) + # compute localized lag-l temporal autocorrelation coefficients for each cascade level + gamma = np.empty((n_cascade_levels, ar_order, m, n)) for i in range(n_cascade_levels): - if domain == "spatial": - gamma[i, :] = correlation.temporal_autocorrelation( - precip_cascades[i], mask=mask_thr - ) - else: - gamma[i, :] = correlation.temporal_autocorrelation( - precip_cascades[i], domain="spectral", x_shape=precip.shape[1:] + gamma_ = np.array( + correlation.temporal_autocorrelation( + precip_cascades[i], + d=d_order, + domain=domain, + x_shape=(m, n), + mask=mask_thr, + window=gamma_filter_method, + window_radius=ar_window_radius[i], ) + ) + # Adjust shape if no localization + if gamma_.ndim == 1: + for j in range(len(gamma_)): + gamma[i, j] = np.full((m, n), gamma_[j]) + # Assign values if localization + elif gamma_.ndim == 3: + gamma[i, :] = gamma_ + + # Adjust autocorrelation coefficients by the coefficient factor + gamma *= gamma_factor + gamma[gamma >= 1] = 0.999999 + + if ar_order == 2: + # adjust the lag-2 correlation coefficient to ensure that the ARI(p,d) + # process is stationary + for i in range(n_cascade_levels): + gamma[i, 1] = autoregression.adjust_lag2_corrcoef2(gamma[i, 0], gamma[i, 1]) + + nowcast_utils.print_corrcoefs(np.nanmean(gamma, axis=(-2, -1))) precip_cascades = nowcast_utils.stack_cascades( precip_decomp, n_cascade_levels, convert_to_full_arrays=False @@ -302,25 +346,26 @@ def f(precip, i): precip_decomp = precip_decomp[-1] - nowcast_utils.print_corrcoefs(gamma) - - if ar_order == 2: - # adjust the lag-2 correlation coefficient to ensure that the AR(p) - # process is stationary - for i in range(n_cascade_levels): - gamma[i, 1] = autoregression.adjust_lag2_corrcoef2(gamma[i, 0], gamma[i, 1]) - - # estimate the parameters of the AR(p) model from the autocorrelation + # estimate the parameters of the ARI(p,d) model from the autocorrelation # coefficients - phi = np.empty((n_cascade_levels, ar_order + 1)) + phi = np.empty((n_cascade_levels, ar_order + d_order + 1, m, n)) for i in range(n_cascade_levels): - phi[i, :] = autoregression.estimate_ar_params_yw(gamma[i, :]) + if ar_order > 2 or d_order > 1: + phi[i, :] = autoregression.estimate_ar_params_yw_localized( + gamma[i], d=d_order + ) + elif ar_order == 2: + phi[i, :] = autoregression.estimate_ar2_params_yw_localized(gamma[i], d=d_order) + elif ar_order == 1: + phi[i, :] = autoregression.estimate_ar1_params_yw_localized(gamma[i], d=d_order) - nowcast_utils.print_ar_params(phi) + nowcast_utils.print_ar_params(np.nanmean(np.array(phi), axis=(-2, -1))) - # discard all except the p-1 last cascades because they are not needed for - # the AR(p) model - precip_cascades = [precip_cascades[i][-ar_order:] for i in range(n_cascade_levels)] + # discard all except the p+d-1 last cascades because they are not needed for + # the ARI(p,d) model + precip_cascades = [ + precip_cascades[i][-(ar_order + d_order) :] for i in range(n_cascade_levels) + ] if probmatching_method == "mean": mu_0 = np.mean(precip[-1, :, :][precip[-1, :, :] >= precip_thr]) @@ -377,11 +422,11 @@ def f(precip, i): return precip_forecast -def _check_inputs(precip, velocity, timesteps, ar_order): +def _check_inputs(precip, velocity, timesteps, ar_order, d_order, ar_window_radius): if precip.ndim != 3: raise ValueError("precip must be a three-dimensional array") - if precip.shape[0] < ar_order + 1: - raise ValueError("precip.shape[0] < ar_order+1") + if precip.shape[0] < ar_order + d_order + 1: + raise ValueError("precip.shape[0] < ar_order+d_order+1") if velocity.ndim != 3: raise ValueError("velocity must be a three-dimensional array") if precip.shape[1:3] != velocity.shape[1:3]: @@ -391,6 +436,9 @@ def _check_inputs(precip, velocity, timesteps, ar_order): ) if isinstance(timesteps, list) and not sorted(timesteps) == timesteps: raise ValueError("timesteps is not in ascending order") + if ar_window_radius is not None: + if not isinstance(ar_window_radius, (int, list)): + raise ValueError("ar_window_radius type must be None, int or list") def _update(state, params): From a7c16183d00f6d3eee599eafe5912da8c45b9125 Mon Sep 17 00:00:00 2001 From: Daniel Eduardo Villarreal Jaime Date: Wed, 10 Sep 2025 10:47:57 +0200 Subject: [PATCH 5/5] Update sprogloc variable descriptions --- pysteps/nowcasts/sprogloc.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pysteps/nowcasts/sprogloc.py b/pysteps/nowcasts/sprogloc.py index 23ee6d36f..c2fbb34bc 100644 --- a/pysteps/nowcasts/sprogloc.py +++ b/pysteps/nowcasts/sprogloc.py @@ -91,6 +91,8 @@ def forecast( bandpass_filter_method: {'gaussian', 'uniform'}, optional Name of the bandpass filter method to use with the cascade decomposition. See the documentation of pysteps.cascade.interface. + gamma_filter_method: {'gaussian', 'uniform'}, optional + Name of the filter method for localized gamma values. ar_order: int, optional The order of the autoregressive integrated model to use. Must be >= 1. d_order: int, optional @@ -99,6 +101,8 @@ def forecast( ar_window_radius: int or list, optional The radius of the window to use for determining the parameters of the autoregressive integrated model. Set to None to disable localization. + gamma_factor: float, optional + Gamma correction factor to adjust 'manually' lozalized gamma values. conditional: bool, optional If set to True, compute the statistics of the precipitation field conditionally by excluding pixels where the values are