Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions fre/pp/frepp.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,17 +229,29 @@ def split_netcdf_wrapper(inputdir, outputdir, component, history_source, use_sub
@click.option('-v', '--variables', type = str, required=True,
help='''Specifies which variables in $file are split and written to $outputdir.
Either a string "all" or a comma-separated string of variable names ("tasmax,tasmin,pr")''')
def split_netcdf(file, outputdir, variables):
@click.option('-r', '--rename', is_flag=True, default=False,
help='During splitting, rename output files into a nested directory structure '
'organized by frequency and duration under $outputdir.')
@click.option('-d', '--diag-manifest', type=str, required=False, default=None,
help='Path to FMS diag manifest file. Only used with --rename. '
'Required when input file has one timestep and no time bounds.')
def split_netcdf(file, outputdir, variables, rename, diag_manifest):
''' Splits a single netcdf file into one netcdf file per data variable and writes
files to $outputdir.
$variables is an option to filter the variables split out of $file and
written to $outputdir. If set to "all" (the default), all data variables
in $file are split and written to $outputdir; if set to a comma-separated
string of variable names, only the variable names in the string will be
split and written to $outputdir. If no variable names in $variables match
variables in $file, no files will be written to $outputdir.'''
variables in $file, no files will be written to $outputdir.

If --rename is set, split files are additionally reorganized into a nested
directory structure under $outputdir with frequency and duration
(e.g. atmos_daily/P1D/P6M/atmos_daily.00010101-00010630.temp.tile1.nc).'''
var_list = variables.split(",")
split_netcdf_script.split_file_xarray(file, outputdir, variables)
split_netcdf_script.split_file_xarray(file, outputdir, variables,
rename=rename,
diag_manifest=diag_manifest)


#fre pp ppval
Expand Down
224 changes: 203 additions & 21 deletions fre/pp/split_netcdf_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import yaml

from fre.app.helpers import get_variables
from fre.pp import rename_split_script


fre_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -160,20 +161,163 @@ def split_netcdf(inputDir, outputDir, component, history_source, use_subdirs,
fre_logger.info(f"split-netcdf-wrapper call complete, having split {files_split} files")
sys.exit(0) #check this

def split_file_xarray(infile, outfiledir, var_list='all'):
def _compute_renamed_path(filename, decoded_dataset, variable, diag_manifest=None):
'''
Compute the renamed relative path for a split output file from in-memory data.

Uses an already-opened (time-decoded) dataset to determine frequency,
duration, and date range so that the caller can write directly to the
final path without an intermediate flat file.

:param filename: the split output filename (e.g. 00010101.atmos_daily.tile3.temp.nc)
:type filename: string
:param decoded_dataset: xarray Dataset opened with time decoding enabled
:type decoded_dataset: xarray.Dataset
:param variable: the data variable name
:type variable: string
:param diag_manifest: path to FMS diag manifest file
:type diag_manifest: string or None
:return: relative path for the renamed file
:rtype: pathlib.Path
'''
stem = Path(filename).stem
parts = stem.split('.')

if len(parts) == 4:
date, label, tile, var = parts
elif len(parts) == 3:
date, label, var = parts
tile = None
else:
raise ValueError(
f"File '{filename}' cannot be parsed. "
f"Expected format: 'date.label.var.nc' or 'date.label.tile.var.nc'")

# determine if variable depends on time (non-static)
if 'time' in decoded_dataset[variable].dims:
is_static = False
number_of_timesteps = decoded_dataset.sizes['time']
else:
is_static = True
number_of_timesteps = 0

if is_static:
if tile is not None:
newfile_base = f"{label}.{var}.{tile}.nc"
else:
newfile_base = f"{label}.{var}.nc"
return Path(label) / 'P0Y' / 'P0Y' / newfile_base
elif number_of_timesteps >= 2:
first_timestep = decoded_dataset.time.values[0]
second_timestep = decoded_dataset.time.values[1]
last_timestep = decoded_dataset.time.values[-1]
freq_label, format_ = rename_split_script.get_freq_and_format_from_two_dates(
first_timestep, second_timestep)
freq = second_timestep - first_timestep
cell_methods = decoded_dataset[variable].attrs.get('cell_methods')
if cell_methods == "time: point":
date1 = first_timestep - freq
date2 = last_timestep - freq
else:
date1 = first_timestep - freq / 2.0
date2 = last_timestep - freq / 2.0
duration = rename_split_script.get_duration_from_two_dates(date1, date2)
else:
time_bounds_name = decoded_dataset.time.attrs.get('bounds')
if time_bounds_name:
time_bounds = decoded_dataset[time_bounds_name]
first_timestep = time_bounds[0].values[0]
second_timestep = time_bounds[0].values[1]
freq_label, format_ = rename_split_script.get_freq_and_format_from_two_dates(
first_timestep, second_timestep)
freq = second_timestep - first_timestep
date1 = first_timestep
date2 = date1 + (number_of_timesteps - 1) * freq
duration = rename_split_script.get_duration_from_two_dates(date1, date2 - freq)
else:
if diag_manifest is not None:
if Path(diag_manifest).exists():
fre_logger.info(f"Using diag manifest '{diag_manifest}'")
with open(diag_manifest, 'r') as f:
yaml_data = yaml.safe_load(f)
duration = None
for diag_file in yaml_data["diag_files"]:
if diag_file["file_name"] == label:
if diag_file["freq_units"] == "years":
duration = f"P{diag_file['freq']}Y"
format_ = "%Y"
elif diag_file["freq_units"] == "months":
if diag_file['freq'] == 12:
duration = "P1Y"
format_ = "%Y"
else:
duration = f"P{diag_file['freq']}M"
format_ = "%Y%m"
else:
raise Exception(
f"Diag manifest found but frequency units "
f"{diag_file['freq_units']} are unexpected; "
f"expected 'years' or 'months'.")
if duration is not None:
duration_object = rename_split_script.duration_parser.parse(duration)
else:
raise Exception(
f"File label '{label}' not found in diag manifest "
f"'{diag_manifest}'")
freq_label = duration
date1 = rename_split_script.time_parser.parse(date)
one_month = rename_split_script.duration_parser.parse('P1M')
date2 = date1 + duration_object - one_month
else:
raise FileNotFoundError(
f"Diag manifest '{diag_manifest}' does not exist")
elif 'annual' in label:
date1 = rename_split_script.time_parser.parse(date)
one_month = rename_split_script.duration_parser.parse('P1M')
duration = "P1Y"
duration_object = rename_split_script.duration_parser.parse(duration)
date2 = date1 + duration_object - one_month
format_ = "%Y"
freq_label = duration
else:
raise ValueError(
f"Diag manifest required to process file with one timestep "
f"and no time bounds")

date1_str = date1.strftime(format_)
date2_str = date2.strftime(format_)

if tile is not None:
newfile_base = f"{label}.{date1_str}-{date2_str}.{var}.{tile}.nc"
else:
newfile_base = f"{label}.{date1_str}-{date2_str}.{var}.nc"

return Path(label) / freq_label / duration / newfile_base


def split_file_xarray(infile, outfiledir, var_list='all', rename=False, diag_manifest=None):
'''
Given a netcdf infile containing one or more data variables,
writes out a separate file for each data variable in the file, including the
variable name in the filename.
if var_list if specified, only the vars in var_list are written to file;
if no vars in the file match the vars in var_list, no files are written.

If rename is True, each split file is written directly to a nested
directory structure under outfiledir with frequency and duration (e.g. if
outfiledir=atmos_daily, a complete path/dir structure might look like
atmos_daily/P1D/P6M/atmos_daily.00010101-00010630.temp.tile1.nc).

:param infile: input netcdf file
:type infile: string
:param outfiledir: writeable directory to which to write netcdf files
:type outfiledir: string
:param var_list: python list of string variable names or a string "all"
:type var_list: list of strings
:param rename: if True, write split files directly into nested dirs
:type rename: bool
:param diag_manifest: path to FMS diag manifest file, used with rename
:type diag_manifest: string or None
'''
if not os.path.isdir(outfiledir):
fre_logger.info("creating output directory")
Expand Down Expand Up @@ -233,26 +377,64 @@ def matchlist(xstr):
if len(write_vars) <= 0:
fre_logger.info(f"No data variables found in {infile}; no writes take place.")
else:
vc_encode = set_coord_encoding(dataset, dataset._coord_names)
for variable in write_vars:
fre_logger.info(f"splitting var {variable}")
#drop all data vars (diagnostics) that are not the current var of interest
#but KEEP the metadata vars
#(seriously, we need the time_bnds)
data2 = dataset.drop_vars([el for el in datavars if el is not variable])
v_encode= set_var_encoding(dataset, metavars)
#combine 2 dicts into 1 dict - should be no shared keys,
#so the merge is straightforward
var_encode = {**vc_encode, **v_encode}
fre_logger.debug(f"var_encode settings: {var_encode}")
#Encoding principles for xarray:
# - no coords have a _FillValue
# - Everything is written out with THE SAME precision it was read in
# - Everything has THE SAME UNITS as it did when it was read in
var_outfile = fre_outfile_name(os.path.basename(infile), variable)
var_out = os.path.join(outfiledir, os.path.basename(var_outfile))
data2.to_netcdf(var_out, encoding = var_encode)
fre_logger.debug(f"Wrote '{var_out}'")
# When rename is requested, open a second copy with time decoding
# so we can compute destination paths before writing
decoded_dataset = None
if rename:
decoded_dataset = xr.open_dataset(infile)

try:
vc_encode = set_coord_encoding(dataset, dataset._coord_names)
for variable in write_vars:
fre_logger.info(f"splitting var {variable}")
#drop all data vars (diagnostics) that are not the current var of interest
#but KEEP the metadata vars
#(seriously, we need the time_bnds)
data2 = dataset.drop_vars([el for el in datavars if el is not variable])
v_encode= set_var_encoding(dataset, metavars)
#combine 2 dicts into 1 dict - should be no shared keys,
#so the merge is straightforward
var_encode = {**vc_encode, **v_encode}
fre_logger.debug(f"var_encode settings: {var_encode}")
#Encoding principles for xarray:
# - no coords have a _FillValue
# - Everything is written out with THE SAME precision it was read in
# - Everything has THE SAME UNITS as it did when it was read in
var_outfile = fre_outfile_name(os.path.basename(infile), variable)
if rename:
# Compute final path and write directly there (no intermediate file)
new_rel_path = _compute_renamed_path(
var_outfile, decoded_dataset, variable, diag_manifest)
var_out = os.path.join(outfiledir, str(new_rel_path))
os.makedirs(os.path.dirname(var_out), exist_ok=True)
else:
var_out = os.path.join(outfiledir, os.path.basename(var_outfile))
data2.to_netcdf(var_out, encoding = var_encode)
fre_logger.debug(f"Wrote '{var_out}'")
finally:
if decoded_dataset is not None:
decoded_dataset.close()
## from main, may be needed, added during conflict resolution
# vc_encode = set_coord_encoding(dataset, dataset._coord_names)
# for variable in write_vars:
# fre_logger.info(f"splitting var {variable}")
# #drop all data vars (diagnostics) that are not the current var of interest
# #but KEEP the metadata vars
# #(seriously, we need the time_bnds)
# data2 = dataset.drop_vars([el for el in datavars if el is not variable])
# v_encode= set_var_encoding(dataset, metavars)
# #combine 2 dicts into 1 dict - should be no shared keys,
# #so the merge is straightforward
# var_encode = {**vc_encode, **v_encode}
# fre_logger.debug(f"var_encode settings: {var_encode}")
# #Encoding principles for xarray:
# # - no coords have a _FillValue
# # - Everything is written out with THE SAME precision it was read in
# # - Everything has THE SAME UNITS as it did when it was read in
# var_outfile = fre_outfile_name(os.path.basename(infile), variable)
# var_out = os.path.join(outfiledir, os.path.basename(var_outfile))
# data2.to_netcdf(var_out, encoding = var_encode)
# fre_logger.debug(f"Wrote '{var_out}'")

def get_max_ndims(dataset):
'''
Expand Down
Loading
Loading