diff --git a/fre/pp/frepp.py b/fre/pp/frepp.py index 2aca15004..825b738e8 100644 --- a/fre/pp/frepp.py +++ b/fre/pp/frepp.py @@ -229,7 +229,13 @@ def split_netcdf_wrapper(inputdir, outputdir, component, history_source, use_sub @click.option('-v', '--variables', type = str, required=True, help='''Specifies which variables in $file are split and written to $outputdir. Either a string "all" or a comma-separated string of variable names ("tasmax,tasmin,pr")''') -def split_netcdf(file, outputdir, variables): +@click.option('-r', '--rename', is_flag=True, default=False, + help='During splitting, rename output files into a nested directory structure ' + 'organized by frequency and duration under $outputdir.') +@click.option('-d', '--diag-manifest', type=str, required=False, default=None, + help='Path to FMS diag manifest file. Only used with --rename. ' + 'Required when input file has one timestep and no time bounds.') +def split_netcdf(file, outputdir, variables, rename, diag_manifest): ''' Splits a single netcdf file into one netcdf file per data variable and writes files to $outputdir. $variables is an option to filter the variables split out of $file and @@ -237,9 +243,15 @@ def split_netcdf(file, outputdir, variables): in $file are split and written to $outputdir; if set to a comma-separated string of variable names, only the variable names in the string will be split and written to $outputdir. If no variable names in $variables match - variables in $file, no files will be written to $outputdir.''' + variables in $file, no files will be written to $outputdir. + + If --rename is set, split files are additionally reorganized into a nested + directory structure under $outputdir with frequency and duration + (e.g. 
atmos_daily/P1D/P6M/atmos_daily.00010101-00010630.temp.tile1.nc).''' var_list = variables.split(",") - split_netcdf_script.split_file_xarray(file, outputdir, variables) + split_netcdf_script.split_file_xarray(file, outputdir, variables, + rename=rename, + diag_manifest=diag_manifest) #fre pp ppval diff --git a/fre/pp/split_netcdf_script.py b/fre/pp/split_netcdf_script.py index ed08688d6..7618302d9 100644 --- a/fre/pp/split_netcdf_script.py +++ b/fre/pp/split_netcdf_script.py @@ -20,6 +20,7 @@ import yaml from fre.app.helpers import get_variables +from fre.pp import rename_split_script fre_logger = logging.getLogger(__name__) @@ -160,7 +161,141 @@ def split_netcdf(inputDir, outputDir, component, history_source, use_subdirs, fre_logger.info(f"split-netcdf-wrapper call complete, having split {files_split} files") sys.exit(0) #check this -def split_file_xarray(infile, outfiledir, var_list='all'): +def _compute_renamed_path(filename, decoded_dataset, variable, diag_manifest=None): + ''' + Compute the renamed relative path for a split output file from in-memory data. + + Uses an already-opened (time-decoded) dataset to determine frequency, + duration, and date range so that the caller can write directly to the + final path without an intermediate flat file. + + :param filename: the split output filename (e.g. 00010101.atmos_daily.tile3.temp.nc) + :type filename: string + :param decoded_dataset: xarray Dataset opened with time decoding enabled + :type decoded_dataset: xarray.Dataset + :param variable: the data variable name + :type variable: string + :param diag_manifest: path to FMS diag manifest file + :type diag_manifest: string or None + :return: relative path for the renamed file + :rtype: pathlib.Path + ''' + stem = Path(filename).stem + parts = stem.split('.') + + if len(parts) == 4: + date, label, tile, var = parts + elif len(parts) == 3: + date, label, var = parts + tile = None + else: + raise ValueError( + f"File '{filename}' cannot be parsed. 
" + f"Expected format: 'date.label.var.nc' or 'date.label.tile.var.nc'") + + # determine if variable depends on time (non-static) + if 'time' in decoded_dataset[variable].dims: + is_static = False + number_of_timesteps = decoded_dataset.sizes['time'] + else: + is_static = True + number_of_timesteps = 0 + + if is_static: + if tile is not None: + newfile_base = f"{label}.{var}.{tile}.nc" + else: + newfile_base = f"{label}.{var}.nc" + return Path(label) / 'P0Y' / 'P0Y' / newfile_base + elif number_of_timesteps >= 2: + first_timestep = decoded_dataset.time.values[0] + second_timestep = decoded_dataset.time.values[1] + last_timestep = decoded_dataset.time.values[-1] + freq_label, format_ = rename_split_script.get_freq_and_format_from_two_dates( + first_timestep, second_timestep) + freq = second_timestep - first_timestep + cell_methods = decoded_dataset[variable].attrs.get('cell_methods') + if cell_methods == "time: point": + date1 = first_timestep - freq + date2 = last_timestep - freq + else: + date1 = first_timestep - freq / 2.0 + date2 = last_timestep - freq / 2.0 + duration = rename_split_script.get_duration_from_two_dates(date1, date2) + else: + time_bounds_name = decoded_dataset.time.attrs.get('bounds') + if time_bounds_name: + time_bounds = decoded_dataset[time_bounds_name] + first_timestep = time_bounds[0].values[0] + second_timestep = time_bounds[0].values[1] + freq_label, format_ = rename_split_script.get_freq_and_format_from_two_dates( + first_timestep, second_timestep) + freq = second_timestep - first_timestep + date1 = first_timestep + date2 = date1 + (number_of_timesteps - 1) * freq + duration = rename_split_script.get_duration_from_two_dates(date1, date2 - freq) + else: + if diag_manifest is not None: + if Path(diag_manifest).exists(): + fre_logger.info(f"Using diag manifest '{diag_manifest}'") + with open(diag_manifest, 'r') as f: + yaml_data = yaml.safe_load(f) + duration = None + for diag_file in yaml_data["diag_files"]: + if diag_file["file_name"] == 
label: + if diag_file["freq_units"] == "years": + duration = f"P{diag_file['freq']}Y" + format_ = "%Y" + elif diag_file["freq_units"] == "months": + if diag_file['freq'] == 12: + duration = "P1Y" + format_ = "%Y" + else: + duration = f"P{diag_file['freq']}M" + format_ = "%Y%m" + else: + raise Exception( + f"Diag manifest found but frequency units " + f"{diag_file['freq_units']} are unexpected; " + f"expected 'years' or 'months'.") + if duration is not None: + duration_object = rename_split_script.duration_parser.parse(duration) + else: + raise Exception( + f"File label '{label}' not found in diag manifest " + f"'{diag_manifest}'") + freq_label = duration + date1 = rename_split_script.time_parser.parse(date) + one_month = rename_split_script.duration_parser.parse('P1M') + date2 = date1 + duration_object - one_month + else: + raise FileNotFoundError( + f"Diag manifest '{diag_manifest}' does not exist") + elif 'annual' in label: + date1 = rename_split_script.time_parser.parse(date) + one_month = rename_split_script.duration_parser.parse('P1M') + duration = "P1Y" + duration_object = rename_split_script.duration_parser.parse(duration) + date2 = date1 + duration_object - one_month + format_ = "%Y" + freq_label = duration + else: + raise ValueError( + f"Diag manifest required to process file with one timestep " + f"and no time bounds") + + date1_str = date1.strftime(format_) + date2_str = date2.strftime(format_) + + if tile is not None: + newfile_base = f"{label}.{date1_str}-{date2_str}.{var}.{tile}.nc" + else: + newfile_base = f"{label}.{date1_str}-{date2_str}.{var}.nc" + + return Path(label) / freq_label / duration / newfile_base + + +def split_file_xarray(infile, outfiledir, var_list='all', rename=False, diag_manifest=None): ''' Given a netcdf infile containing one or more data variables, writes out a separate file for each data variable in the file, including the @@ -168,12 +303,21 @@ def split_file_xarray(infile, outfiledir, var_list='all'): if var_list if specified, 
only the vars in var_list are written to file; if no vars in the file match the vars in var_list, no files are written. + If rename is True, each split file is written directly to a nested + directory structure under outfiledir with frequency and duration (e.g. for + a file labeled atmos_daily, the path relative to outfiledir might look like + atmos_daily/P1D/P6M/atmos_daily.00010101-00010630.temp.tile1.nc). + :param infile: input netcdf file :type infile: string :param outfiledir: writeable directory to which to write netcdf files :type outfiledir: string :param var_list: python list of string variable names or a string "all" :type var_list: list of strings + :param rename: if True, write split files directly into nested dirs + :type rename: bool + :param diag_manifest: path to FMS diag manifest file, used with rename + :type diag_manifest: string or None ''' if not os.path.isdir(outfiledir): fre_logger.info("creating output directory") @@ -233,26 +377,64 @@ def matchlist(xstr): if len(write_vars) <= 0: fre_logger.info(f"No data variables found in {infile}; no writes take place.") else: - vc_encode = set_coord_encoding(dataset, dataset._coord_names) - for variable in write_vars: - fre_logger.info(f"splitting var {variable}") - #drop all data vars (diagnostics) that are not the current var of interest - #but KEEP the metadata vars - #(seriously, we need the time_bnds) - data2 = dataset.drop_vars([el for el in datavars if el is not variable]) - v_encode= set_var_encoding(dataset, metavars) - #combine 2 dicts into 1 dict - should be no shared keys, - #so the merge is straightforward - var_encode = {**vc_encode, **v_encode} - fre_logger.debug(f"var_encode settings: {var_encode}") - #Encoding principles for xarray: - # - no coords have a _FillValue - # - Everything is written out with THE SAME precision it was read in - # - Everything has THE SAME UNITS as it did when it was read in - var_outfile = fre_outfile_name(os.path.basename(infile), variable) - var_out = 
os.path.join(outfiledir, os.path.basename(var_outfile)) - data2.to_netcdf(var_out, encoding = var_encode) - fre_logger.debug(f"Wrote '{var_out}'") + # When rename is requested, open a second copy with time decoding + # so we can compute destination paths before writing + decoded_dataset = None + if rename: + decoded_dataset = xr.open_dataset(infile) + + try: + vc_encode = set_coord_encoding(dataset, dataset._coord_names) + for variable in write_vars: + fre_logger.info(f"splitting var {variable}") + #drop all data vars (diagnostics) that are not the current var of interest + #but KEEP the metadata vars + #(seriously, we need the time_bnds) + data2 = dataset.drop_vars([el for el in datavars if el is not variable]) + v_encode= set_var_encoding(dataset, metavars) + #combine 2 dicts into 1 dict - should be no shared keys, + #so the merge is straightforward + var_encode = {**vc_encode, **v_encode} + fre_logger.debug(f"var_encode settings: {var_encode}") + #Encoding principles for xarray: + # - no coords have a _FillValue + # - Everything is written out with THE SAME precision it was read in + # - Everything has THE SAME UNITS as it did when it was read in + var_outfile = fre_outfile_name(os.path.basename(infile), variable) + if rename: + # Compute final path and write directly there (no intermediate file) + new_rel_path = _compute_renamed_path( + var_outfile, decoded_dataset, variable, diag_manifest) + var_out = os.path.join(outfiledir, str(new_rel_path)) + os.makedirs(os.path.dirname(var_out), exist_ok=True) + else: + var_out = os.path.join(outfiledir, os.path.basename(var_outfile)) + data2.to_netcdf(var_out, encoding = var_encode) + fre_logger.debug(f"Wrote '{var_out}'") + finally: + if decoded_dataset is not None: + decoded_dataset.close() +## NOTE(review): pre-rename loop from main, kept commented out during conflict resolution; it duplicates the non-rename path of the try block above - delete once confirmed unneeded +# vc_encode = set_coord_encoding(dataset, dataset._coord_names) +# for variable in write_vars: +# fre_logger.info(f"splitting var {variable}") +# #drop all data vars 
(diagnostics) that are not the current var of interest +# #but KEEP the metadata vars +# #(seriously, we need the time_bnds) +# data2 = dataset.drop_vars([el for el in datavars if el is not variable]) +# v_encode= set_var_encoding(dataset, metavars) +# #combine 2 dicts into 1 dict - should be no shared keys, +# #so the merge is straightforward +# var_encode = {**vc_encode, **v_encode} +# fre_logger.debug(f"var_encode settings: {var_encode}") +# #Encoding principles for xarray: +# # - no coords have a _FillValue +# # - Everything is written out with THE SAME precision it was read in +# # - Everything has THE SAME UNITS as it did when it was read in +# var_outfile = fre_outfile_name(os.path.basename(infile), variable) +# var_out = os.path.join(outfiledir, os.path.basename(var_outfile)) +# data2.to_netcdf(var_out, encoding = var_encode) +# fre_logger.debug(f"Wrote '{var_out}'") def get_max_ndims(dataset): ''' diff --git a/fre/pp/tests/test_split_netcdf.py b/fre/pp/tests/test_split_netcdf.py index 6272757c7..d3cc79f63 100644 --- a/fre/pp/tests/test_split_netcdf.py +++ b/fre/pp/tests/test_split_netcdf.py @@ -240,3 +240,112 @@ def test_split_file_cleanup(): dir_deleted = [not osp.isdir(el) for el in newdir] el_deleted = [not osp.isdir(el) for el in netcdf_files] assert all(el_deleted + dir_deleted) + + +#-- split-netcdf with --rename flag (direct import tests) -- + +# scope=module means that this is invoked once for the tests that run from this file +@pytest.fixture(scope="module") +def ncgen_setup(): + '''Generates netcdf files from cdl test data needed for split+rename testing.''' + nc_files = [] + for testcase in cases.keys(): + cds = osp.join(test_dir, cases[testcase]["dir"]) + nc_path = osp.join(cds, cases[testcase]["nc"]) + cdl_path = osp.join(cds, cases[testcase]["cdl"]) + subprocess.run(["ncgen3", "-k", "netCDF-4", "-o", nc_path, cdl_path], + check=True, capture_output=True) + nc_files.append(nc_path) + yield nc_files + for nc in nc_files: + if osp.isfile(nc): + 
os.unlink(nc) + + +def test_split_rename_import_ts(ncgen_setup, tmp_path): + '''Tests split+rename via direct import for timeseries data. + + Uses split_file_xarray with rename=True directly. + ''' + workdir = osp.join(test_dir, cases["ts"]["dir"]) + infile = osp.join(workdir, cases["ts"]["nc"]) + outfiledir = str(tmp_path / "import_ts") + + split_netcdf_script.split_file_xarray(infile, outfiledir, "all", + rename=True) + + outpath = Path(outfiledir) + + # Verify no flat .nc files remain at root + root_nc_files = list(outpath.glob("*.nc")) + assert len(root_nc_files) == 0, f"Flat .nc files remain at root: {root_nc_files}" + + # Verify nested structure was created + nested_nc_files = list(outpath.rglob("*.nc")) + assert len(nested_nc_files) > 0, "No .nc files found in nested structure" + + # Verify component directory + component_dir = outpath / "atmos_daily" + assert component_dir.is_dir(), f"Component directory {component_dir} not found" + + # Verify depth (component/freq/duration/file.nc) + for nc_file in nested_nc_files: + rel_path = nc_file.relative_to(outpath) + parts = rel_path.parts + assert len(parts) >= 4, \ + f"File {nc_file} is not deep enough: {parts}" + + +def test_split_rename_import_static(ncgen_setup, tmp_path): + '''Tests split+rename via direct import for static data. + + Uses split_file_xarray with rename=True directly. 
+ ''' + workdir = osp.join(test_dir, cases["static"]["dir"]) + infile = osp.join(workdir, cases["static"]["nc"]) + outfiledir = str(tmp_path / "import_static") + + split_netcdf_script.split_file_xarray(infile, outfiledir, "all", + rename=True) + + outpath = Path(outfiledir) + + # Verify no flat .nc files remain at root + root_nc_files = list(outpath.glob("*.nc")) + assert len(root_nc_files) == 0, f"Flat .nc files remain at root: {root_nc_files}" + + # Verify nested structure was created + nested_nc_files = list(outpath.rglob("*.nc")) + assert len(nested_nc_files) > 0, "No .nc files found in nested structure" + + # Verify component directory + component_dir = outpath / "ocean_static" + assert component_dir.is_dir(), f"Component directory {component_dir} not found" + + # Verify depth (component/P0Y/P0Y/file.nc) + for nc_file in nested_nc_files: + rel_path = nc_file.relative_to(outpath) + parts = rel_path.parts + assert len(parts) >= 4, \ + f"File {nc_file} is not deep enough: {parts}" + + +def test_split_rename_without_flag(ncgen_setup, tmp_path): + '''Tests that split_file_xarray without rename produces flat output (no nesting). + + This verifies backward compatibility. 
+ ''' + workdir = osp.join(test_dir, cases["ts"]["dir"]) + infile = osp.join(workdir, cases["ts"]["nc"]) + outfiledir = str(tmp_path / "no_rename") + + split_netcdf_script.split_file_xarray(infile, outfiledir, "all") + + outpath = Path(outfiledir) + # Verify flat .nc files exist at root (no nesting) + root_nc_files = list(outpath.glob("*.nc")) + assert len(root_nc_files) > 0, "No flat .nc files at root without rename" + + # Verify no subdirectories were created + subdirs = [d for d in outpath.iterdir() if d.is_dir()] + assert len(subdirs) == 0, f"Subdirs created without rename: {subdirs}" diff --git a/fre/tests/test_files/rename-split/README b/fre/tests/test_files/rename-split/README index b366d870f..d477c23bb 100644 --- a/fre/tests/test_files/rename-split/README +++ b/fre/tests/test_files/rename-split/README @@ -5,7 +5,7 @@ 1. Notes on test data rename-split-to-pp is called directly after split-netcdf in the fre workflow; -you shoudl be using split-netcdf output as input for rename-split-to-pp +you should be using split-netcdf output as input for rename-split-to-pp netcdf files -> split-netcdf -> rename-split-to-pp diff --git a/fre/tests/test_fre_pp_cli.py b/fre/tests/test_fre_pp_cli.py index 1c4df7e6e..3ee3d1401 100644 --- a/fre/tests/test_fre_pp_cli.py +++ b/fre/tests/test_fre_pp_cli.py @@ -1,7 +1,9 @@ import os import shutil +import subprocess from pathlib import Path +import pytest from click.testing import CliRunner from fre import fre @@ -213,3 +215,77 @@ def test_cli_fre_pp_split_netcdf_opt_dne(): ''' fre pp split-netcdf optionDNE ''' result = runner.invoke(fre.fre, args=["pp", "split-netcdf", "optionDNE"]) assert result.exit_code == 2 + +#-- fre pp split-netcdf --rename (CLI functional tests) + +SPLIT_NETCDF_TEST_DIR = os.path.realpath("fre/tests/test_files/ascii_files/split_netcdf") +SPLIT_RENAME_CASES = { + "ts": {"dir": "atmos_daily.tile3", + "nc": "00010101.atmos_daily.tile3.nc", + "cdl": "00010101.atmos_daily.tile3.cdl", + "component": "atmos_daily"}, 
+ "static": {"dir": "ocean_static", + "nc": "00010101.ocean_static.nc", + "cdl": "00010101.ocean_static.cdl", + "component": "ocean_static"} +} + +@pytest.fixture(scope="module") +def split_rename_ncgen(): + '''Generates netcdf files from cdl test data for split --rename CLI tests.''' + nc_files = [] + for testcase in SPLIT_RENAME_CASES.values(): + cds = os.path.join(SPLIT_NETCDF_TEST_DIR, testcase["dir"]) + nc_path = os.path.join(cds, testcase["nc"]) + cdl_path = os.path.join(cds, testcase["cdl"]) + subprocess.run(["ncgen3", "-k", "netCDF-4", "-o", nc_path, cdl_path], + check=True, capture_output=True) + nc_files.append(nc_path) + yield nc_files + for nc in nc_files: + if os.path.isfile(nc): + os.unlink(nc) + + +@pytest.mark.parametrize("case_key", ["ts", "static"]) +def test_cli_fre_pp_split_netcdf_rename_run(split_rename_ncgen, tmp_path, case_key): + ''' fre pp split-netcdf --rename runs successfully via CLI ''' + case = SPLIT_RENAME_CASES[case_key] + workdir = os.path.join(SPLIT_NETCDF_TEST_DIR, case["dir"]) + infile = os.path.join(workdir, case["nc"]) + outfiledir = str(tmp_path / f"rename_{case_key}") + result = runner.invoke(fre.fre, args=["pp", "split-netcdf", + "--file", infile, + "--outputdir", outfiledir, + "--variables", "all", + "--rename"]) + assert result.exit_code == 0 + outpath = Path(outfiledir) + component_dir = outpath / case["component"] + assert component_dir.is_dir(), f"Expected component directory {component_dir} not found" + root_nc_files = list(outpath.glob("*.nc")) + assert len(root_nc_files) == 0, f"Flat .nc files remain at root: {root_nc_files}" + nested_nc_files = list(outpath.rglob("*.nc")) + assert len(nested_nc_files) > 0, "No .nc files found in nested structure" + for nc_file in nested_nc_files: + rel_path = nc_file.relative_to(outpath) + assert len(rel_path.parts) >= 4, \ + f"File {nc_file} not deep enough: {rel_path.parts}" + + +def test_cli_fre_pp_split_netcdf_no_rename(split_rename_ncgen, tmp_path): + ''' fre pp split-netcdf 
without --rename produces flat output ''' + case = SPLIT_RENAME_CASES["ts"] + workdir = os.path.join(SPLIT_NETCDF_TEST_DIR, case["dir"]) + infile = os.path.join(workdir, case["nc"]) + outfiledir = str(tmp_path / "no_rename") + result = runner.invoke(fre.fre, args=["pp", "split-netcdf", + "--file", infile, + "--outputdir", outfiledir, + "--variables", "all"]) + assert result.exit_code == 0 + outpath = Path(outfiledir) + root_nc_files = list(outpath.glob("*.nc")) + assert len(root_nc_files) > 0, "No flat .nc files at root without --rename" + subdirs = [d for d in outpath.iterdir() if d.is_dir()] + assert len(subdirs) == 0, f"Subdirs created without --rename: {subdirs}"