6 changes: 3 additions & 3 deletions src/config.py
@@ -10,9 +10,9 @@ def parse_args():
parser.add_argument('--work_path', type=str, default="/data/lilu/PHydro")

# data
parser.add_argument('--reuse_input', type=bool, default=False)
parser.add_argument('--reuse_input', type=bool, default=True)
parser.add_argument('--use_ancillary', type=bool, default=True)
parser.add_argument('--seq_len', type=int, default=365)
parser.add_argument('--seq_len', type=int, default=100)
parser.add_argument('--interval', type=int, default=365)
parser.add_argument('--window_size', type=int, default=0)
parser.add_argument('--num_out', type=int, default=6)
@@ -23,7 +23,7 @@ def parse_args():
# model
parser.add_argument('--model_name', type=str, default="hard_multi_tasks_v1")
parser.add_argument('--learning_rate', type=float, default=0.001)
parser.add_argument('--epochs', type=int, default=300)
parser.add_argument('--epochs', type=int, default=200)
parser.add_argument('--niter', type=int, default=100)
parser.add_argument('--hidden_size', type=int, default=64)
parser.add_argument('--alpha', type=float, default=0.5)
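A note on the boolean flags in this file: argparse's type=bool applies Python's built-in bool() to the raw argument string, so any non-empty value, including "False", parses as True. The new defaults work, but the flags cannot be switched off from the command line. A minimal sketch of the usual workaround; the str2bool helper is an illustration, not part of this PR:

import argparse

def str2bool(v):
    # argparse hands the type callable the raw string, and
    # bool("False") == True, so map the accepted spellings explicitly.
    if isinstance(v, bool):
        return v
    if v.lower() in ("yes", "true", "t", "1"):
        return True
    if v.lower() in ("no", "false", "f", "0"):
        return False
    raise argparse.ArgumentTypeError("boolean value expected")

parser = argparse.ArgumentParser()
parser.add_argument('--reuse_input', type=str2bool, default=True)
parser.add_argument('--use_ancillary', type=str2bool, default=True)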
211 changes: 44 additions & 167 deletions src/data.py
@@ -3,11 +3,13 @@

Author: Lu Li
11/1/2022 - First edition
11/1/2022 - V1.0
12/9/2022 - V2.0
"""

import json
import numpy as np
from utils import make_CoLM_soil_depth


class Dataset():
@@ -20,24 +22,26 @@ def __init__(self, cfg, mode):
self.window_size = cfg["window_size"]
self.num_out = cfg["num_out"]
self.mode = mode
# calculate soil depth in CoLM
depth, zi = make_CoLM_soil_depth() # depth in m, zi in cm
self.soil = [70, 210, 720, 10*(zi[9]-100)] # layer thickness in mm; the first three layers span the top 100 cm

def fit(self):
# load input data [nt,ngrid,...]
# load input data with shape [nt, ngrid, ...]
forcing, hydro, ancillary = self._load_input()

# remove unreasonable runoff
hydro = self._remove_rnof_outlier(hydro, threshold=20)

# Correct water balance [nt-1,ngrid,...]
# NOTE: CoLM soil moisture is available as a daily
# mean; however, the delta(swc) in the water balance
# should be swc(24)-swc(0). Thus we need to
# correct the water balance by adding the residual
# of the water balance into precipitation.
# NOTE: the CoLM water balance is defined as
# P = ET + R + delta(SWC+ldew+wa+scv).
# In our study we focus on simulating ET,
# R and SWC, so we treat `wa`... as static variables.
# Thus, to enforce water balance, we need to
# remove the residual of the water balance from P.
forcing, hydro, aux = self._correct_mass_conserve(forcing, hydro)

# remove unreasonable runoff
# NOTE: 30 mm/day may not be the best setting, but we didn't
# find a better way to remove unreasonable runoff.
hydro = self._remove_outlier(hydro, threshold=30)

# get scaler
if self.mode == 'train':
scaler = self._get_z_scaler(forcing, hydro)
@@ -61,62 +65,29 @@ def fit(self):
ancillary = np.tile(ancillary[np.newaxis],(forcing.shape[0],1,1))
forcing = np.concatenate([forcing, ancillary], axis=-1)

# trans nt and ngrid [ngrid, nt, nfeat]
# transpose to shape [ngrid, nt, nfeat]
forcing = np.transpose(forcing, (1, 0, 2))
hydro = np.transpose(hydro, (1, 0, 2))
aux = np.transpose(aux, (1, 0))

# make training/test data
if self.mode == 'train':
return forcing, hydro, aux
elif self.mode == 'test':
forcing, hydro, aux = self._make_inference_data(forcing, hydro, aux, self.seq_len)
return forcing, hydro, aux
return forcing, hydro, aux

def _load_input(self):
forcing = np.load(self.inputs_path+"forcing_gd_9km_{}.npy".format(self.mode))
hydro = np.load(self.inputs_path+"hydro_gd_9km_{}.npy".format(self.mode))
ancillary = np.load(self.inputs_path+"ancil_gd_9km.npy")
forcing = np.load(self.inputs_path+"gd_9km_forcing_{}.npy".format(self.mode))
hydro = np.load(self.inputs_path+"gd_9km_hydrology_{}.npy".format(self.mode))
ancillary = np.load(self.inputs_path+"gd_9km_ancillary.npy")
return forcing, hydro, ancillary

def _correct_mass_conserve(self, forcing, hydro):
soil_depth = [70, 210, 720, 1864.6] # mm
hydro_prev, hydro, forcing = hydro[:-1], hydro[1:], forcing[1:]
swvl, swvl_prev = 0, 0
for i in range(4):
swvl += hydro[:, :, i]*soil_depth[i]
swvl_prev += hydro_prev[:,:,i]*soil_depth[i]
mc_in = np.nansum(forcing[:,:,:2], axis=-1) + swvl_prev
mc_out = swvl + np.nansum(hydro[:,:,4:], axis=-1)
diff = mc_in - mc_out

# if diff>0, then add to runoff;
# if diff<0, then add to precipitation;
for i in range(diff.shape[0]):
for j in range(diff.shape[1]):
tmp = diff[i,j]
if tmp < 0:
forcing[i,j,0] = forcing[i,j,0]-diff[i,j]
else:
hydro[i,j,-1] = hydro[i,j,-1]+diff[i,j]
# get mass in
aux = np.nansum(forcing[:,:,:2], axis=-1) + swvl_prev
return forcing, hydro, aux
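The element-wise routing above can be written without the double loop. A vectorized sketch with the same semantics (negative residuals are added to precipitation, everything else to the last runoff channel):

# diff has shape [nt-1, ngrid]; forcing channel 0 is precipitation
# and hydro channel -1 is runoff, matching the loop above.
neg = diff < 0
forcing[:, :, 0] = np.where(neg, forcing[:, :, 0] - diff, forcing[:, :, 0])
hydro[:, :, -1] = np.where(neg, hydro[:, :, -1], hydro[:, :, -1] + diff)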

def _remove_outlier(self, hydro, threshold=30):
"""
std = np.nanstd(input, axis=(0), keepdims=True) # (1, ngrids, nfeat)
mean = np.nanmean(input, axis=(0), keepdims=True) # (1, ngrids, nfeat)
input[np.where(input > (mean+3*std))] = np.nan
input[np.where(input < (mean-3*std))] = np.nan
self.remove_outlier = True
"""
"""
# @(Zhongwang Wei): remove unreasonable runoff > 200 mm/day and
# interpolate using the two adjacent days
def _remove_rnof_outlier(self, hydro, threshold=20):
# NOTE: In CoLM, some runoff values are larger than 1e3 mm/h; this is
# caused by the parameterization of soil moisture, which cannot
# be fixed for now. Thus I remove runoff larger than 20 mm/h to
# ensure the robustness of the results.
rnof = hydro[:, :, -1]
rnof[rnof > threshold] = np.nan
nt, ngrid, nout = hydro.shape
# @(Zhongwang Wei): interpolate using the two adjacent days
"""
for i in range(ngrid):
tmp = rnof[:, i]
if np.isnan(tmp).any():
@@ -129,20 +100,24 @@ def _remove_outlier(self, hydro, threshold=30):
else:
tmp[j] = np.nanmean(tmp) # (tmp[j-1]+tmp[j+1])/2
rnof[:, i] = tmp
hydro[:, :, -1] = rnof
"""
rnof = hydro[:, :, -1]
rnof[rnof > threshold] = np.nan
hydro[:, :, -1] = rnof
return hydro
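For reference, a compact version of the adjacent-day gap filling that the disabled block describes, assuming rnof has shape [nt, ngrid]; np.interp fills interior NaNs linearly (for a single missing day this equals the mean of the two neighbours) and clamps edge gaps to the nearest valid value:

def fill_nan_runoff(rnof):
    # linearly interpolate NaN gaps, one grid cell at a time
    t = np.arange(rnof.shape[0])
    for i in range(rnof.shape[1]):
        col = rnof[:, i]
        bad = np.isnan(col)
        if bad.any() and (~bad).any():
            col[bad] = np.interp(t[bad], t[~bad], col[~bad])
    return rnof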

def _get_minmax_scaler(self, x, y):
scaler = {}
scaler["x_min"] = np.nanmin(x, axis=(0), keepdims=True).tolist()
scaler["x_max"] = np.nanmax(x, axis=(0), keepdims=True).tolist()
scaler["y_min"] = np.nanmin(y, axis=(0), keepdims=True).tolist()
scaler["y_max"] = np.nanmax(y, axis=(0), keepdims=True).tolist()
return scaler
def _correct_mass_conserve(self, forcing, hydro):
hydro_prev, hydro, forcing = hydro[:-1], hydro[1:], forcing[1:]
# calculate mass in and mass out
swvl, swvl_prev = 0, 0
for i in range(4):
swvl += hydro[:, :, i]*self.soil[i]
swvl_prev += hydro_prev[:,:,i]*self.soil[i]
mc_in = np.sum(forcing[:,:,:2], axis=-1) + swvl_prev
mc_out = swvl + np.sum(hydro[:,:,4:], axis=-1)
# calculate the mass-balance residual caused by wa, ldew, scv, xerror
diff = mc_in - mc_out
# get mass in after removing the balance residual
aux = np.sum(forcing[:,:,:2], axis=-1) - diff
return forcing, hydro, aux
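A toy example of the residual bookkeeping above (values made up): with P = 10, SWC_prev = 100, SWC = 103, ET = 4 and R = 2, all in mm, the residual is diff = (10 + 100) - (103 + 4 + 2) = 1 mm, the part of the balance explained by wa, ldew, scv and xerror. Then aux = 10 - 1 = 9 mm, and the corrected balance aux + SWC_prev = SWC + ET + R closes exactly (109 = 109).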

def _get_z_scaler(self, x, y):
scaler = {}
@@ -162,6 +137,7 @@ def _load_scaler(self, inputs_path):
return scaler

def _z_normalize(self, input, scaler, is_feat):
"""normalize features using pre-computed statistics."""
if is_feat:
input = (input - np.array(scaler["x_mean"])) / (
np.array(scaler["x_std"]))
@@ -170,107 +146,8 @@ def _z_normalize(self, input, scaler, is_feat):
np.array(scaler["y_std"]))
return input

def _minmax_normalize(self, input, scaler, is_feat):
"""normalize features using pre-computed statistics."""
if is_feat:
input = (input - np.array(scaler["x_min"])) / (
np.array(scaler["x_max"])-np.array(scaler["x_min"]))
else:
input = (input - np.array(scaler["y_min"])) / (
np.array(scaler["y_max"])-np.array(scaler["y_min"]))
return input

def _spatial_normalize(self, static):
# (ngrid, nfeat) for static data
mean = np.nanmean(static, axis=(0), keepdims=True)
std = np.nanstd(static, axis=(0), keepdims=True)
return (static-mean)/std

def _make_inference_data(self,
x,
y,
aux,
seq_len=365,
interval=1,
window_size=0):
x_, y_, aux_ = [], [], []
for i in range(x.shape[0]):
tmpx, tmpy, tmp_aux = self._reshape_1d_data(
x[i], y[i], aux[i], seq_len, interval, window_size)
x_.append(tmpx)
y_.append(tmpy)
aux_.append(tmp_aux)
# (ngrids, nsamples, seq_len, nfeat)
return np.stack(x_, axis=0), np.stack(y_, axis=0), np.stack(aux_, axis=0)

def _reshape_1d_data(self,
x,
y,
aux,
seq_len=365,
interval=1,
window_size=0):
"""reshape data into LSTM many-to-one input samples

Parameters
----------
x : np.ndarray
Input features of shape [num_samples, num_features]
y : np.ndarray
Output feature of shape [num_samples, 1]
seq_len : int
Length of the requested input sequences.
interval : int
Step between the start indices of consecutive samples.
window_size : int
Lead window between the end of x and the target y.

Returns
-------
x_new: np.ndarray
shape [num_samples*, seq_len, num_features], where
num_samples* equals num_samples - seq_len + 1 because
each sample needs a warm-up sequence at the beginning
y_new: np.ndarray
The target value for each sample in x_new
"""
num_samples, num_features = x.shape
_, num_out = y.shape
n = (num_samples-seq_len+1) // interval
x_new = np.zeros((n, seq_len, num_features))*np.nan
y_new = np.zeros((n, num_out))*np.nan
aux_new = np.zeros((n, ))*np.nan

for i in range(n):
x_new[i] = x[i*interval:i*interval+seq_len]
y_new[i] = y[i*interval+seq_len-1]
aux_new[i] = aux[i*interval+seq_len-1]
return x_new, y_new, aux_new
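For interval=1 and window_size=0, the windowing above has a zero-copy NumPy equivalent; a sketch assuming NumPy >= 1.20:

from numpy.lib.stride_tricks import sliding_window_view

# x: [num_samples, num_features], y: [num_samples, num_out]
# n = num_samples - seq_len + 1 windows; window i is x[i:i+seq_len]
# and its label is the value at the window's last step, y[i+seq_len-1].
x_new = sliding_window_view(x, seq_len, axis=0).transpose(0, 2, 1)
y_new = y[seq_len - 1:]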

def __split_into_batch(self, X, y, seq_len=365, offset=1, window_size=0):
"""split training data into batches with size of batch_size

Params
------
offset: [float] 0-1, how to offset the batches (e.g., 0.5 means that
the first batch will be 0-365 and the second will be 182-547)
"""
#(nt_, ngrid, nfeat)
x_batchs, y_batchs = [], []
for i in range(int(1 / offset)):
start = int(i * offset * seq_len)
idx = np.arange(start, y.shape[0]+1, seq_len)
split_x = np.split(X, indices_or_sections=idx,
axis=0) # (seq_len,ngrid,nfeat)
split_y = np.split(y, indices_or_sections=idx, axis=0)
# keep only full-length chunks; partial ones at the edges are dropped
for s in split_x:
if s.shape[0] == seq_len:
x_batchs.append(s)
for s in split_y:
if s.shape[0] == seq_len:
y_batchs.append(s)
x_batchs = np.concatenate(x_batchs, axis=1)
# (seq_len,ngrid*nyears*1/offset,nfeat)
y_batchs = np.concatenate(y_batchs, axis=1)
return np.transpose(x_batchs, (1, 0, 2)), np.transpose(y_batchs, (1, 0, 2))
return (static-mean)/std
12 changes: 6 additions & 6 deletions src/data_gen.py
@@ -27,19 +27,19 @@ def load_train_data(cfg, x, y, aux, scaler):
return x, y, aux, mean, std


def load_test_data(cfg, x, y, aux, scaler):
def load_test_data(cfg, x, y, aux, scaler, stride=1):
ngrid, nt, _ = x.shape
n = (nt-cfg["seq_len"]+1)
n = (nt-cfg["seq_len"]+1)//stride
x_new = np.zeros((ngrid, n, cfg["seq_len"], cfg["num_feat"]))*np.nan
y_new = np.zeros((ngrid, n, y.shape[-1]))*np.nan
aux_new = np.zeros((ngrid, n))*np.nan
mean, std = np.array(scaler["y_mean"]), np.array(scaler["y_std"]) #(1, ngrid, nout)
mean, std = np.array(scaler["y_mean"]), np.array(scaler["y_std"])
mean = np.transpose(mean, (1,0,2))
std = np.transpose(std, (1,0,2))
for i in range(n):
x_new[:,i] = x[:,i:i+cfg["seq_len"]]
y_new[:,i] = y[:,i+cfg["seq_len"]-1]
aux_new[:,i] = aux[:,i+cfg["seq_len"]-1]
x_new[:,i] = x[:,i*stride:i*stride+cfg["seq_len"]]
y_new[:,i] = y[:,i*stride+cfg["seq_len"]-1]
aux_new[:,i] = aux[:,i*stride+cfg["seq_len"]-1]
return x_new, y_new, aux_new, np.tile(mean, (1, n, 1)), np.tile(std, (1,n,1))
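A usage sketch of the new stride argument; the cfg values and shapes below are illustrative assumptions. With nt = 465 and seq_len = 100, stride=1 yields n = 366 overlapping windows per grid cell, while stride=50 keeps every fiftieth start, n = (465 - 100 + 1) // 50 = 7, shrinking the test arrays by roughly the same factor:

cfg = {"seq_len": 100, "num_feat": 8}
ngrid, nt, nout = 4, 465, 6
x = np.random.rand(ngrid, nt, cfg["num_feat"])
y = np.random.rand(ngrid, nt, nout)
aux = np.random.rand(ngrid, nt)
scaler = {"y_mean": np.zeros((1, ngrid, nout)).tolist(),
          "y_std": np.ones((1, ngrid, nout)).tolist()}
x_new, y_new, aux_new, mean, std = load_test_data(cfg, x, y, aux, scaler, stride=50)
# x_new.shape == (4, 7, 100, 8); window i starts at step i*50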

