Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 15 additions & 21 deletions fre/pp/frepp.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,17 +204,29 @@ def split_netcdf_wrapper(inputdir, outputdir, component, history_source, use_sub
@click.option('-v', '--variables', type = str, required=True,
help='''Specifies which variables in $file are split and written to $outputdir.
Either a string "all" or a comma-separated string of variable names ("tasmax,tasmin,pr")''')
def split_netcdf(file, outputdir, variables):
@click.option('-r', '--rename', is_flag=True, default=False,
help='After splitting, rename output files into a nested directory structure '
'organized by frequency and duration under $outputdir.')
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"During splitting", not "after splitting", to make it clear it's a single operation not two.

@click.option('-d', '--diag-manifest', type=str, required=False, default=None,
help='Path to FMS diag manifest file. Only used with --rename. '
'Required when input file has one timestep and no time bounds.')
def split_netcdf(file, outputdir, variables, rename, diag_manifest):
    ''' Splits a single netcdf file into one netcdf file per data variable and writes
    files to $outputdir.
    $variables is an option to filter the variables split out of $file and
    written to $outputdir. If set to "all", all data variables in $file are
    split and written to $outputdir; if set to a comma-separated
    string of variable names, only the variable names in the string will be
    split and written to $outputdir. If no variable names in $variables match
    variables in $file, no files will be written to $outputdir.

    If --rename is set, split files are additionally reorganized into a nested
    directory structure under $outputdir with frequency and duration
    (e.g. atmos_daily/P1D/P6M/atmos_daily.00010101-00010630.temp.tile1.nc).'''
    # $variables is passed through as the raw string: split_file_xarray does the
    # comma-splitting itself, so no pre-processing is needed here.
    split_netcdf_script.split_file_xarray(file, outputdir, variables,
                                          rename=rename,
                                          diag_manifest=diag_manifest)


#fre pp ppval
Expand Down Expand Up @@ -271,21 +283,3 @@ def trigger(experiment, platform, target, time):
Start postprocessing history files that represent a specific chunk of time
"""
trigger_script.trigger(experiment, platform, target, time)

# fre pp rename-split
@pp_cli.command()
@click.option("-i", "--input-dir", type=str,
help="Input directory", required=True)
@click.option("-o", "--output-dir", type=str,
help="Output directory", required=True)
@click.option("-c", "--component", type=str,
help="Component name to process", required=True)
@click.option("-u", '--use-subdirs', is_flag=True, default=False,
help="Whether to search subdirs underneath $inputdir for netcdf files. Defaults to false. This option is used in flow.cylc when regridding.")
@click.option("-d", "--diag-manifest", type=str, required=False, default=None,
help="Path to FMS diag manifest associated with the component (history file). Optional, but required when the history file has one timestep and no time bounds.")
def rename_split(input_dir, output_dir, component, use_subdirs, diag_manifest):
    """
    Create per-variable timeseries from shards

    Thin CLI wrapper: all work is delegated to rename_split_script.rename_split.
    input_dir/output_dir/component come from the required -i/-o/-c options;
    use_subdirs (-u flag) controls whether subdirectories under input_dir are
    searched; diag_manifest (-d) is an optional path to an FMS diag manifest.
    """
    rename_split_script.rename_split(input_dir, output_dir, component, use_subdirs, diag_manifest)
220 changes: 127 additions & 93 deletions fre/pp/split_netcdf_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import logging

from fre.app.helpers import get_variables
from fre.pp import rename_split_script


fre_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -140,99 +141,132 @@ def split_netcdf(inputDir, outputDir, component, history_source, use_subdirs,
fre_logger.info(f"split-netcdf-wrapper call complete, having split {files_split} files")
sys.exit(0) #check this

def split_file_xarray(infile, outfiledir, var_list='all'):
    '''
    Given a netcdf infile containing one or more data variables,
    writes out a separate file for each data variable in the file, including the
    variable name in the filename.
    If var_list is specified, only the vars in var_list are written to file;
    if no vars in the file match the vars in var_list, no files are written.

    :param infile: input netcdf file
    :type infile: string
    :param outfiledir: writeable directory to which to write netcdf files
    :type outfiledir: string
    :param var_list: python list of string variable names or a string "all"
    :type var_list: list of strings
    :raises OSError: if infile is not an existing file
    '''
    if not os.path.isdir(outfiledir):
        fre_logger.info("creating output directory")
        os.makedirs(outfiledir)

    if not os.path.isfile(infile):
        fre_logger.error(f"error: input file {infile} not found. Please check the path.")
        raise OSError(f"error: input file {infile} not found. Please check the path.")

    # decoding is disabled so values and encodings stay exactly as stored on disk
    dataset = xr.load_dataset(infile, decode_cf=False, decode_times=False, decode_coords="all")
    allvars = dataset.data_vars.keys()

    #If you have a file of 3 or more dim vars, 2d-or-fewer vars are likely to be
    #metadata vars; if your file is 2d vars, 1d vars are likely to be metadata.
    max_ndims = get_max_ndims(dataset)
    if max_ndims >= 3:
        varsize = 2
    else:
        varsize = 1
    fre_logger.debug(f"varsize: {varsize}")
    #note: netcdf dimensions and xarray coords are NOT ALWAYS THE SAME THING.
    #If they were, I could get away with the following:
    #var_zerovars = [v for v in datavars if not len(dataset[v].coords) > 0])
    #instead of this:
    var_shortvars = [v for v in allvars if (len(dataset[v].shape) < varsize) and v not in dataset._coord_names]
    #having a variable listed as both a metadata var and a coordinate var seems to
    #lead to the weird adding a _FillValue behavior
    fre_logger.info(f"var patterns: {VAR_PATTERNS}")
    fre_logger.info(f"1 or 2-d vars: {var_shortvars}")
    #both combined gets you a decent list of non-diagnostic variables
    var_exclude = list(set(VAR_PATTERNS + [str(el) for el in var_shortvars] ))
    def matchlist(xstr):
        ''' checks a string for matches in a list of patterns

        xstr: string to search for matches
        var_exclude: list of patterns defined in VAR_EXCLUDE'''
        allmatch = [re.search(el, xstr)for el in var_exclude]
        #If there's at least one match in the var_exclude list (average_bnds is OK)
        #(non-matches collapse into a single None entry in the set, hence > 1)
        return len(list(set(allmatch))) > 1
    metavars = [el for el in allvars if matchlist(el)]
    datavars = [el for el in allvars if not matchlist(el)]
    fre_logger.debug(f"metavars: {metavars}")
    fre_logger.debug(f"datavars: {datavars}")
    fre_logger.debug(f"var filter list: {var_list}")

    #datavars does 2 things: keep track of which vars to write, and tell xarray
    #which vars to drop. we need to separate those things for the variable filtering.
    if var_list == "all":
        write_vars = datavars
    else:
        if isinstance(var_list, str):
            var_list = var_list.split(",")
        var_list = list(set(var_list))
        write_vars = [el for el in datavars if el in var_list]
    fre_logger.debug(f"intersection of datavars and var_list: {write_vars}")

    if len(write_vars) <= 0:
        fre_logger.info(f"No data variables found in {infile}; no writes take place.")
    else:
        vc_encode = set_coord_encoding(dataset, dataset._coord_names)
        for variable in write_vars:
            fre_logger.info(f"splitting var {variable}")
            #drop all data vars (diagnostics) that are not the current var of interest
            #but KEEP the metadata vars
            #(seriously, we need the time_bnds)
            # NOTE(review): 'is not' is an identity check; it holds here only because
            # write_vars contains the very same str objects as datavars - '!=' would be safer.
            data2 = dataset.drop_vars([el for el in datavars if el is not variable])
            v_encode= set_var_encoding(dataset, metavars)
            #combine 2 dicts into 1 dict - should be no shared keys,
            #so the merge is straightforward
            var_encode = {**vc_encode, **v_encode}
            fre_logger.debug(f"var_encode settings: {var_encode}")
            #Encoding principles for xarray:
            # - no coords have a _FillValue
            # - Everything is written out with THE SAME precision it was read in
            # - Everything has THE SAME UNITS as it did when it was read in
            var_outfile = fre_outfile_name(os.path.basename(infile), variable)
            var_out = os.path.join(outfiledir, os.path.basename(var_outfile))
            data2.to_netcdf(var_out, encoding = var_encode)
            fre_logger.debug(f"Wrote '{var_out}'")
def split_file_xarray(infile, outfiledir, var_list='all', rename=False, diag_manifest=None):
    '''
    Given a netcdf infile containing one or more data variables,
    writes out a separate file for each data variable in the file, including the
    variable name in the filename.
    If var_list is specified, only the vars in var_list are written to file;
    if no vars in the file match the vars in var_list, no files are written.

    If rename is True, split files are additionally reorganized into a nested
    directory structure under outfiledir with frequency and duration (e.g. if
    outfiledir=atmos_daily, a complete path/dir structure might look like
    atmos_daily/P1D/P6M/atmos_daily.00010101-00010630.temp.tile1.nc).

    :param infile: input netcdf file
    :type infile: string
    :param outfiledir: writeable directory to which to write netcdf files
    :type outfiledir: string
    :param var_list: python list of string variable names or a string "all"
    :type var_list: list of strings
    :param rename: if True, reorganize split files into nested dirs
    :type rename: bool
    :param diag_manifest: path to FMS diag manifest file, used with rename
    :type diag_manifest: string or None
    :raises OSError: if infile is not an existing file
    '''
    if not os.path.isdir(outfiledir):
        fre_logger.info("creating output directory")
        os.makedirs(outfiledir)

    if not os.path.isfile(infile):
        fre_logger.error(f"error: input file {infile} not found. Please check the path.")
        raise OSError(f"error: input file {infile} not found. Please check the path.")

    # decoding is disabled so values and encodings stay exactly as stored on disk
    dataset = xr.load_dataset(infile, decode_cf=False, decode_times=False, decode_coords="all")
    allvars = dataset.data_vars.keys()

    #If you have a file of 3 or more dim vars, 2d-or-fewer vars are likely to be
    #metadata vars; if your file is 2d vars, 1d vars are likely to be metadata.
    max_ndims = get_max_ndims(dataset)
    if max_ndims >= 3:
        varsize = 2
    else:
        varsize = 1
    fre_logger.debug(f"varsize: {varsize}")
    #note: netcdf dimensions and xarray coords are NOT ALWAYS THE SAME THING.
    #If they were, I could get away with the following:
    #var_zerovars = [v for v in datavars if not len(dataset[v].coords) > 0])
    #instead of this:
    var_shortvars = [v for v in allvars if (len(dataset[v].shape) < varsize) and v not in dataset._coord_names]
    #having a variable listed as both a metadata var and a coordinate var seems to
    #lead to the weird adding a _FillValue behavior
    fre_logger.info(f"var patterns: {VAR_PATTERNS}")
    fre_logger.info(f"1 or 2-d vars: {var_shortvars}")
    #both combined gets you a decent list of non-diagnostic variables
    var_exclude = list(set(VAR_PATTERNS + [str(el) for el in var_shortvars]))
    def matchlist(xstr):
        ''' checks a string for matches in a list of patterns

        xstr: string to search for matches
        var_exclude: list of patterns defined in VAR_EXCLUDE'''
        allmatch = [re.search(el, xstr) for el in var_exclude]
        #If there's at least one match in the var_exclude list (average_bnds is OK)
        #(non-matches collapse into a single None entry in the set, hence > 1)
        return len(list(set(allmatch))) > 1
    metavars = [el for el in allvars if matchlist(el)]
    datavars = [el for el in allvars if not matchlist(el)]
    fre_logger.debug(f"metavars: {metavars}")
    fre_logger.debug(f"datavars: {datavars}")
    fre_logger.debug(f"var filter list: {var_list}")

    #datavars does 2 things: keep track of which vars to write, and tell xarray
    #which vars to drop. we need to separate those things for the variable filtering.
    if var_list == "all":
        write_vars = datavars
    else:
        if isinstance(var_list, str):
            var_list = var_list.split(",")
        var_list = list(set(var_list))
        write_vars = [el for el in datavars if el in var_list]
    fre_logger.debug(f"intersection of datavars and var_list: {write_vars}")

    if not write_vars:
        fre_logger.info(f"No data variables found in {infile}; no writes take place.")
    else:
        #Encoding principles for xarray:
        # - no coords have a _FillValue
        # - Everything is written out with THE SAME precision it was read in
        # - Everything has THE SAME UNITS as it did when it was read in
        #Both encoding dicts depend only on the (unmodified) dataset, so build
        #them once outside the loop; should be no shared keys, so the merge is
        #straightforward.
        vc_encode = set_coord_encoding(dataset, dataset._coord_names)
        v_encode = set_var_encoding(dataset, metavars)
        var_encode = {**vc_encode, **v_encode}
        fre_logger.debug(f"var_encode settings: {var_encode}")
        for variable in write_vars:
            fre_logger.info(f"splitting var {variable}")
            #drop all data vars (diagnostics) that are not the current var of interest
            #but KEEP the metadata vars
            #(seriously, we need the time_bnds)
            #'!=' (value equality) rather than 'is not' (identity): behavior is the
            #same for these interned names but equality is the robust comparison.
            data2 = dataset.drop_vars([el for el in datavars if el != variable])
            var_outfile = fre_outfile_name(os.path.basename(infile), variable)
            var_out = os.path.join(outfiledir, os.path.basename(var_outfile))
            data2.to_netcdf(var_out, encoding = var_encode)
            fre_logger.debug(f"Wrote '{var_out}'")

    if rename:
        outpath = Path(outfiledir)
        basename = Path(infile).stem
        #split files all start with the input file's stem, e.g. stem.varname.nc
        pattern = f"{basename}.*.nc"
        split_files = list(outpath.glob(pattern))
        renamed_files = []
        try:
            for split_file in split_files:
                #rename_file decides the frequency/duration subdirectory for the file
                new_rel_path = rename_split_script.rename_file(split_file, diag_manifest)
                new_full_path = outpath / new_rel_path
                rename_split_script.link_or_copy(str(split_file), str(new_full_path))
                renamed_files.append((split_file, new_full_path))
        except Exception as exc:
            #roll back any files already placed so a failure leaves no partial tree
            fre_logger.error(f"Error renaming split files: {exc}")
            fre_logger.error("Cleaning up partially renamed files")
            for _, renamed_path in renamed_files:
                if Path(renamed_path).exists():
                    Path(renamed_path).unlink()
            raise
        #all renames succeeded: remove the flat originals
        for split_file in split_files:
            if split_file.exists():
                split_file.unlink()
        fre_logger.info(f"Renamed {len(split_files)} split files under {outfiledir}")

def get_max_ndims(dataset):
'''
Expand Down
Loading
Loading