Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 15 additions & 21 deletions fre/pp/frepp.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,17 +204,29 @@ def split_netcdf_wrapper(inputdir, outputdir, component, history_source, use_sub
@click.option('-v', '--variables', type = str, required=True,
help='''Specifies which variables in $file are split and written to $outputdir.
Either a string "all" or a comma-separated string of variable names ("tasmax,tasmin,pr")''')
def split_netcdf(file, outputdir, variables):
@click.option('-r', '--rename', is_flag=True, default=False,
help='After splitting, rename output files into a nested directory structure '
'organized by frequency and duration under $outputdir.')
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"During splitting", not "after splitting", to make it clear it's a single operation not two.

@click.option('-d', '--diag-manifest', type=str, required=False, default=None,
help='Path to FMS diag manifest file. Only used with --rename. '
'Required when input file has one timestep and no time bounds.')
def split_netcdf(file, outputdir, variables, rename, diag_manifest):
''' Splits a single netcdf file into one netcdf file per data variable and writes
files to $outputdir.
$variables is an option to filter the variables split out of $file and
written to $outputdir. If set to "all" (the default), all data variables
in $file are split and written to $outputdir; if set to a comma-separated
string of variable names, only the variable names in the string will be
split and written to $outputdir. If no variable names in $variables match
variables in $file, no files will be written to $outputdir.'''
variables in $file, no files will be written to $outputdir.

If --rename is set, split files are additionally reorganized into a nested
directory structure under $outputdir with frequency and duration
(e.g. atmos_daily/P1D/P6M/atmos_daily.00010101-00010630.temp.tile1.nc).'''
var_list = variables.split(",")
split_netcdf_script.split_file_xarray(file, outputdir, variables)
split_netcdf_script.split_file_xarray(file, outputdir, variables,
rename=rename,
diag_manifest=diag_manifest)


#fre pp ppval
Expand Down Expand Up @@ -271,21 +283,3 @@ def trigger(experiment, platform, target, time):
Start postprocessing history files that represent a specific chunk of time
"""
trigger_script.trigger(experiment, platform, target, time)

# fre pp rename-split
@pp_cli.command()
@click.option("-i", "--input-dir", type=str,
help="Input directory", required=True)
@click.option("-o", "--output-dir", type=str,
help="Output directory", required=True)
@click.option("-c", "--component", type=str,
help="Component name to process", required=True)
@click.option("-u", '--use-subdirs', is_flag=True, default=False,
help="Whether to search subdirs underneath $inputdir for netcdf files. Defaults to false. This option is used in flow.cylc when regridding.")
@click.option("-d", "--diag-manifest", type=str, required=False, default=None,
help="Path to FMS diag manifest associated with the component (history file). Optional, but required when the history file has one timestep and no time bounds.")
def rename_split(input_dir, output_dir, component, use_subdirs, diag_manifest):
"""
Create per-variable timeseries from shards
"""
rename_split_script.rename_split(input_dir, output_dir, component, use_subdirs, diag_manifest)
35 changes: 34 additions & 1 deletion fre/pp/split_netcdf_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import logging

from fre.app.helpers import get_variables
from fre.pp import rename_split_script


fre_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -140,20 +141,28 @@ def split_netcdf(inputDir, outputDir, component, history_source, use_subdirs,
fre_logger.info(f"split-netcdf-wrapper call complete, having split {files_split} files")
sys.exit(0) #check this

def split_file_xarray(infile, outfiledir, var_list='all'):
def split_file_xarray(infile, outfiledir, var_list='all', rename=False, diag_manifest=None):
'''
Given a netcdf infile containing one or more data variables,
writes out a separate file for each data variable in the file, including the
variable name in the filename.
if var_list if specified, only the vars in var_list are written to file;
if no vars in the file match the vars in var_list, no files are written.

If rename is True, split files are additionally reorganized into a nested
directory structure under outfiledir with frequency and duration
(e.g. atmos_daily/P1D/P6M/atmos_daily.00010101-00010630.temp.tile1.nc).

:param infile: input netcdf file
:type infile: string
:param outfiledir: writeable directory to which to write netcdf files
:type outfiledir: string
:param var_list: python list of string variable names or a string "all"
:type var_list: list of strings
:param rename: if True, reorganize split files into nested dirs
:type rename: bool
:param diag_manifest: path to FMS diag manifest file, used with rename
:type diag_manifest: string or None
'''
if not os.path.isdir(outfiledir):
fre_logger.info("creating output directory")
Expand Down Expand Up @@ -234,6 +243,30 @@ def matchlist(xstr):
data2.to_netcdf(var_out, encoding = var_encode)
fre_logger.debug(f"Wrote '{var_out}'")

if rename:
outpath = Path(outfiledir)
basename = Path(infile).stem
pattern = f"{basename}.*.nc"
split_files = list(outpath.glob(pattern))
renamed_files = []
try:
for split_file in split_files:
new_rel_path = rename_split_script.rename_file(split_file, diag_manifest)
new_full_path = outpath / new_rel_path
rename_split_script.link_or_copy(str(split_file), str(new_full_path))
renamed_files.append((split_file, new_full_path))
except Exception as exc:
fre_logger.error(f"Error renaming split files: {exc}")
fre_logger.error("Cleaning up partially renamed files")
for _, renamed_path in renamed_files:
if Path(renamed_path).exists():
Path(renamed_path).unlink()
raise
for split_file in split_files:
if split_file.exists():
split_file.unlink()
fre_logger.info(f"Renamed {len(split_files)} split files under {outfiledir}")

def get_max_ndims(dataset):
'''
Gets the maximum number of dimensions of a single var in an xarray Dataset object. Excludes coord vars, which should be single-dim anyway.
Expand Down
134 changes: 134 additions & 0 deletions fre/pp/tests/test_split_netcdf_rename.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
'''
Tests split-netcdf with --rename flag.
Tests the combined split + rename functionality that reorganizes
split netcdf files into a nested directory structure with frequency and duration.

Uses the existing split-netcdf test data (atmos_daily, ocean_static) to verify
the --rename flag behavior via direct import.
CLI tests (CliRunner) are in fre/tests/test_fre_pp_cli.py.
'''

import pytest
import subprocess
import os
from os import path as osp
import shutil
from pathlib import Path
from fre.pp import split_netcdf_script
from fre.pp import rename_split_script

test_dir = osp.realpath("fre/tests/test_files/ascii_files/split_netcdf")

cases = {"ts": {"dir": "atmos_daily.tile3",
"nc": "00010101.atmos_daily.tile3.nc",
"cdl": "00010101.atmos_daily.tile3.cdl"},
"static": {"dir": "ocean_static",
"nc": "00010101.ocean_static.nc",
"cdl": "00010101.ocean_static.cdl"}}


@pytest.fixture(scope="module")
def ncgen_setup():
'''Generates netcdf files from cdl test data needed for split+rename testing.'''
nc_files = []
for testcase in cases.keys():
cds = osp.join(test_dir, cases[testcase]["dir"])
nc_path = osp.join(cds, cases[testcase]["nc"])
cdl_path = osp.join(cds, cases[testcase]["cdl"])
subprocess.run(["ncgen3", "-k", "netCDF-4", "-o", nc_path, cdl_path],
check=True, capture_output=True)
nc_files.append(nc_path)
yield nc_files
for nc in nc_files:
if osp.isfile(nc):
os.unlink(nc)


def test_split_rename_import_ts(ncgen_setup, tmp_path):
'''Tests split+rename via direct import for timeseries data.

Uses split_file_xarray with rename=True directly.
'''
workdir = osp.join(test_dir, cases["ts"]["dir"])
infile = osp.join(workdir, cases["ts"]["nc"])
outfiledir = str(tmp_path / "import_ts")

split_netcdf_script.split_file_xarray(infile, outfiledir, "all",
rename=True)

outpath = Path(outfiledir)

# Verify no flat .nc files remain at root
root_nc_files = list(outpath.glob("*.nc"))
assert len(root_nc_files) == 0, f"Flat .nc files remain at root: {root_nc_files}"

# Verify nested structure was created
nested_nc_files = list(outpath.rglob("*.nc"))
assert len(nested_nc_files) > 0, "No .nc files found in nested structure"

# Verify component directory
component_dir = outpath / "atmos_daily"
assert component_dir.is_dir(), f"Component directory {component_dir} not found"

# Verify depth (component/freq/duration/file.nc)
for nc_file in nested_nc_files:
rel_path = nc_file.relative_to(outpath)
parts = rel_path.parts
assert len(parts) >= 4, \
f"File {nc_file} is not deep enough: {parts}"


def test_split_rename_import_static(ncgen_setup, tmp_path):
'''Tests split+rename via direct import for static data.

Uses split_file_xarray with rename=True directly.
'''
workdir = osp.join(test_dir, cases["static"]["dir"])
infile = osp.join(workdir, cases["static"]["nc"])
outfiledir = str(tmp_path / "import_static")

split_netcdf_script.split_file_xarray(infile, outfiledir, "all",
rename=True)

outpath = Path(outfiledir)

# Verify no flat .nc files remain at root
root_nc_files = list(outpath.glob("*.nc"))
assert len(root_nc_files) == 0, f"Flat .nc files remain at root: {root_nc_files}"

# Verify nested structure was created
nested_nc_files = list(outpath.rglob("*.nc"))
assert len(nested_nc_files) > 0, "No .nc files found in nested structure"

# Verify component directory
component_dir = outpath / "ocean_static"
assert component_dir.is_dir(), f"Component directory {component_dir} not found"

# Verify depth (component/P0Y/P0Y/file.nc)
for nc_file in nested_nc_files:
rel_path = nc_file.relative_to(outpath)
parts = rel_path.parts
assert len(parts) >= 4, \
f"File {nc_file} is not deep enough: {parts}"


def test_split_rename_without_flag(ncgen_setup, tmp_path):
'''Tests that split_file_xarray without rename produces flat output (no nesting).

This verifies backward compatibility.
'''
workdir = osp.join(test_dir, cases["ts"]["dir"])
infile = osp.join(workdir, cases["ts"]["nc"])
outfiledir = str(tmp_path / "no_rename")

split_netcdf_script.split_file_xarray(infile, outfiledir, "all")

outpath = Path(outfiledir)
# Verify flat .nc files exist at root (no nesting)
root_nc_files = list(outpath.glob("*.nc"))
assert len(root_nc_files) > 0, "No flat .nc files at root without rename"

# Verify no subdirectories were created
subdirs = [d for d in outpath.iterdir() if d.is_dir()]
assert len(subdirs) == 0, f"Subdirs created without rename: {subdirs}"

2 changes: 1 addition & 1 deletion fre/tests/test_files/rename-split/README
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
1. Notes on test data

rename-split-to-pp is called directly after split-netcdf in the fre workflow;
you shoudl be using split-netcdf output as input for rename-split-to-pp
you should be using split-netcdf output as input for rename-split-to-pp

netcdf files -> split-netcdf -> rename-split-to-pp

Expand Down
89 changes: 89 additions & 0 deletions fre/tests/test_fre_pp_cli.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import os
import shutil
import subprocess
from pathlib import Path

import pytest
from click.testing import CliRunner

from fre import fre
Expand Down Expand Up @@ -213,3 +215,90 @@ def test_cli_fre_pp_split_netcdf_opt_dne():
''' fre pp split-netcdf optionDNE '''
result = runner.invoke(fre.fre, args=["pp", "split-netcdf", "optionDNE"])
assert result.exit_code == 2

def test_cli_fre_pp_split_netcdf_rename_help():
''' fre pp split-netcdf --help includes --rename option '''
result = runner.invoke(fre.fre, args=["pp", "split-netcdf", "--help"])
assert result.exit_code == 0
assert "--rename" in result.output

def test_cli_fre_pp_split_netcdf_diag_manifest_help():
''' fre pp split-netcdf --help includes --diag-manifest option '''
result = runner.invoke(fre.fre, args=["pp", "split-netcdf", "--help"])
assert result.exit_code == 0
assert "--diag-manifest" in result.output
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and the new test above it do not test anything other than the existence of the click options!



#-- fre pp split-netcdf --rename (CLI functional tests)

SPLIT_NETCDF_TEST_DIR = os.path.realpath("fre/tests/test_files/ascii_files/split_netcdf")
SPLIT_RENAME_CASES = {
"ts": {"dir": "atmos_daily.tile3",
"nc": "00010101.atmos_daily.tile3.nc",
"cdl": "00010101.atmos_daily.tile3.cdl",
"component": "atmos_daily"},
"static": {"dir": "ocean_static",
"nc": "00010101.ocean_static.nc",
"cdl": "00010101.ocean_static.cdl",
"component": "ocean_static"}
}

@pytest.fixture(scope="module")
def split_rename_ncgen():
'''Generates netcdf files from cdl test data for split --rename CLI tests.'''
nc_files = []
for testcase in SPLIT_RENAME_CASES.values():
cds = os.path.join(SPLIT_NETCDF_TEST_DIR, testcase["dir"])
nc_path = os.path.join(cds, testcase["nc"])
cdl_path = os.path.join(cds, testcase["cdl"])
subprocess.run(["ncgen3", "-k", "netCDF-4", "-o", nc_path, cdl_path],
check=True, capture_output=True)
nc_files.append(nc_path)
yield nc_files
for nc in nc_files:
if os.path.isfile(nc):
os.unlink(nc)


@pytest.mark.parametrize("case_key", ["ts", "static"])
def test_cli_fre_pp_split_netcdf_rename_run(split_rename_ncgen, tmp_path, case_key):
''' fre pp split-netcdf --rename runs successfully via CLI '''
case = SPLIT_RENAME_CASES[case_key]
workdir = os.path.join(SPLIT_NETCDF_TEST_DIR, case["dir"])
infile = os.path.join(workdir, case["nc"])
outfiledir = str(tmp_path / f"rename_{case_key}")
result = runner.invoke(fre.fre, args=["pp", "split-netcdf",
"--file", infile,
"--outputdir", outfiledir,
"--variables", "all",
"--rename"])
assert result.exit_code == 0
outpath = Path(outfiledir)
component_dir = outpath / case["component"]
assert component_dir.is_dir(), f"Expected component directory {component_dir} not found"
root_nc_files = list(outpath.glob("*.nc"))
assert len(root_nc_files) == 0, f"Flat .nc files remain at root: {root_nc_files}"
nested_nc_files = list(outpath.rglob("*.nc"))
assert len(nested_nc_files) > 0, "No .nc files found in nested structure"
for nc_file in nested_nc_files:
rel_path = nc_file.relative_to(outpath)
assert len(rel_path.parts) >= 4, \
f"File {nc_file} not deep enough: {rel_path.parts}"


def test_cli_fre_pp_split_netcdf_no_rename(split_rename_ncgen, tmp_path):
''' fre pp split-netcdf without --rename produces flat output '''
case = SPLIT_RENAME_CASES["ts"]
workdir = os.path.join(SPLIT_NETCDF_TEST_DIR, case["dir"])
infile = os.path.join(workdir, case["nc"])
outfiledir = str(tmp_path / "no_rename")
result = runner.invoke(fre.fre, args=["pp", "split-netcdf",
"--file", infile,
"--outputdir", outfiledir,
"--variables", "all"])
assert result.exit_code == 0
outpath = Path(outfiledir)
root_nc_files = list(outpath.glob("*.nc"))
assert len(root_nc_files) > 0, "No flat .nc files at root without --rename"
subdirs = [d for d in outpath.iterdir() if d.is_dir()]
assert len(subdirs) == 0, f"Subdirs created without --rename: {subdirs}"
Loading