Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 36 additions & 20 deletions fre/pp/frepp.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,17 +204,51 @@ def split_netcdf_wrapper(inputdir, outputdir, component, history_source, use_sub
@click.option('-v', '--variables', type = str, required=True,
help='''Specifies which variables in $file are split and written to $outputdir.
Either a string "all" or a comma-separated string of variable names ("tasmax,tasmin,pr")''')
def split_netcdf(file, outputdir, variables):
@click.option('-r', '--rename', is_flag=True, default=False,
help='After splitting, rename output files into a nested directory structure '
'organized by frequency and duration under $outputdir.')
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"During splitting", not "after splitting", to make it clear it's a single operation not two.

@click.option('-d', '--diag-manifest', type=str, required=False, default=None,
help='Path to FMS diag manifest file. Only used with --rename. '
'Required when input file has one timestep and no time bounds.')
def split_netcdf(file, outputdir, variables, rename, diag_manifest):
''' Splits a single netcdf file into one netcdf file per data variable and writes
files to $outputdir.
$variables is an option to filter the variables split out of $file and
written to $outputdir. If set to "all" (the default), all data variables
in $file are split and written to $outputdir; if set to a comma-separated
string of variable names, only the variable names in the string will be
split and written to $outputdir. If no variable names in $variables match
variables in $file, no files will be written to $outputdir.'''
variables in $file, no files will be written to $outputdir.

If --rename is set, split files are additionally reorganized into a nested
directory structure under $outputdir with frequency and duration
(e.g. atmos_daily/P1D/P6M/atmos_daily.00010101-00010630.temp.tile1.nc).'''
from pathlib import Path
var_list = variables.split(",")
split_netcdf_script.split_file_xarray(file, outputdir, variables)
if rename:
outpath = Path(outputdir)
basename = Path(file).stem
pattern = f"{basename}.*.nc"
split_files = list(outpath.glob(pattern))
renamed_files = []
try:
for split_file in split_files:
new_rel_path = rename_split_script.rename_file(split_file, diag_manifest)
new_full_path = outpath / new_rel_path
rename_split_script.link_or_copy(str(split_file), str(new_full_path))
renamed_files.append((split_file, new_full_path))
except Exception as exc:
fre_logger.error(f"Error renaming split files: {exc}")
fre_logger.error("Cleaning up partially renamed files")
for _, renamed_path in renamed_files:
if Path(renamed_path).exists():
Path(renamed_path).unlink()
raise
for split_file in split_files:
if split_file.exists():
split_file.unlink()
fre_logger.info(f"Renamed {len(split_files)} split files under {outputdir}")


#fre pp ppval
Expand Down Expand Up @@ -271,21 +305,3 @@ def trigger(experiment, platform, target, time):
Start postprocessing history files that represent a specific chunk of time
"""
trigger_script.trigger(experiment, platform, target, time)

# fre pp rename-split
@pp_cli.command()
@click.option(
    "-i", "--input-dir",
    type=str,
    required=True,
    help="Input directory",
)
@click.option(
    "-o", "--output-dir",
    type=str,
    required=True,
    help="Output directory",
)
@click.option(
    "-c", "--component",
    type=str,
    required=True,
    help="Component name to process",
)
@click.option(
    "-u", "--use-subdirs",
    is_flag=True,
    default=False,
    help=("Whether to search subdirs underneath $inputdir for netcdf files. "
          "Defaults to false. "
          "This option is used in flow.cylc when regridding."),
)
@click.option(
    "-d", "--diag-manifest",
    type=str,
    required=False,
    default=None,
    help=("Path to FMS diag manifest associated with the component (history file). "
          "Optional, but required when the history file has one timestep "
          "and no time bounds."),
)
def rename_split(input_dir, output_dir, component, use_subdirs, diag_manifest):
    """
    Create per-variable timeseries from shards
    """
    # Thin CLI shim: the parsed options are handed, in declaration order,
    # straight to the implementation in rename_split_script.
    rename_split_script.rename_split(
        input_dir,
        output_dir,
        component,
        use_subdirs,
        diag_manifest,
    )
234 changes: 234 additions & 0 deletions fre/pp/tests/test_split_netcdf_rename.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
'''
Tests split-netcdf with --rename flag.
Tests the combined split + rename functionality that reorganizes
split netcdf files into a nested directory structure with frequency and duration.

Uses the existing split-netcdf test data (atmos_daily, ocean_static) to verify
the --rename flag behavior via both CLI (CliRunner) and direct import.
'''

import pytest
import re
import subprocess
import os
from os import path as osp
import pathlib
from pathlib import Path
from fre import fre
from fre.pp import split_netcdf_script
from fre.pp import rename_split_script

import click
from click.testing import CliRunner
runner = CliRunner()

# Directory holding the split-netcdf fixtures; the relative path is resolved
# against the current working directory when this module is imported.
test_dir = osp.realpath("fre/tests/test_files/ascii_files/split_netcdf")

# One entry per test case: the fixture subdirectory, the netcdf file that is
# generated from the cdl, and the cdl source itself.
cases = {
    "ts": {
        "dir": "atmos_daily.tile3",
        "nc": "00010101.atmos_daily.tile3.nc",
        "cdl": "00010101.atmos_daily.tile3.cdl",
    },
    "static": {
        "dir": "ocean_static",
        "nc": "00010101.ocean_static.nc",
        "cdl": "00010101.ocean_static.cdl",
    },
}

# Absolute fixture directories, timeseries case first and static case second.
casedirs = [osp.join(test_dir, cases[kind]["dir"]) for kind in ("ts", "static")]

# Every output directory created by these tests starts with this prefix.
rename_outdir_prefix = "new_rename_"


def test_split_rename_setup():
    '''Generates the netcdf inputs for the split+rename tests from their cdl sources.

    Builds one ncgen3 command per entry in ``cases`` and then runs them in
    order; every invocation must finish with return code 0.
    '''
    commands = [
        ["ncgen3", "-k", "netCDF-4", "-o",
         osp.join(test_dir, case["dir"], case["nc"]),
         osp.join(test_dir, case["dir"], case["cdl"])]
        for case in cases.values()
    ]
    # check=True makes subprocess.run raise CalledProcessError on a non-zero
    # exit, so a failing ncgen3 aborts the test before the final assert.
    return_codes = [
        subprocess.run(command, check=True, capture_output=True).returncode
        for command in commands
    ]
    assert all(code == 0 for code in return_codes)


@pytest.mark.parametrize(
    "workdir,infile,outfiledir,varlist",
    [
        pytest.param(casedirs[0], cases["ts"]["nc"],
                     rename_outdir_prefix + "ts_all", "all",
                     id="rename_ts_all"),
        pytest.param(casedirs[1], cases["static"]["nc"],
                     rename_outdir_prefix + "static_all", "all",
                     id="rename_static_all"),
    ],
)
def test_split_rename_cli_run(workdir, infile, outfiledir, varlist):
    '''Runs ``fre pp split-netcdf --rename`` through the click CliRunner.

    The only assertion is that the command exits with code 0; the command
    output and any captured exception are printed to help debugging.
    '''
    import traceback

    input_path = osp.join(workdir, infile)
    output_path = osp.join(workdir, outfiledir)
    cli_args = [
        "pp", "split-netcdf",
        "--file", input_path,
        "--outputdir", output_path,
        "--variables", varlist,
        "--rename",
    ]
    result = runner.invoke(fre.fre, args=cli_args)
    print(result.output)
    # CliRunner captures exceptions instead of propagating them; dump the
    # traceback so a failure is diagnosable from the pytest output.
    error = result.exception
    if error:
        traceback.print_exception(type(error), error, error.__traceback__)
    assert result.exit_code == 0


@pytest.mark.parametrize(
    "workdir,outfiledir,expected_component",
    [
        pytest.param(casedirs[0], rename_outdir_prefix + "ts_all", "atmos_daily",
                     id="rename_ts_structure"),
        pytest.param(casedirs[1], rename_outdir_prefix + "static_all", "ocean_static",
                     id="rename_static_structure"),
    ],
)
def test_split_rename_cli_structure(workdir, outfiledir, expected_component):
    '''Checks the directory layout left behind by a split+rename run.

    Looks at the same output directories that test_split_rename_cli_run
    writes to and asserts that:
      - a directory named after the component exists under the output dir,
      - no .nc file sits directly at the root of the output dir,
      - at least one .nc file exists somewhere below the output dir.
    '''
    outpath = Path(osp.join(workdir, outfiledir))

    # The component directory is the top level of the nested structure
    component_dir = outpath / expected_component
    assert component_dir.is_dir(), f"Expected component directory {component_dir} not found"

    # Nothing flat may be left at the root once the files have been renamed
    root_nc_files = [*outpath.glob("*.nc")]
    assert not root_nc_files, f"Flat .nc files remain at root: {root_nc_files}"

    # The renamed files have to live somewhere below the root
    nested_nc_files = [*outpath.rglob("*.nc")]
    assert nested_nc_files, "No .nc files found in nested structure"


@pytest.mark.parametrize(
    "workdir,outfiledir",
    [
        pytest.param(casedirs[0], rename_outdir_prefix + "ts_all",
                     id="rename_ts_freq"),
        pytest.param(casedirs[1], rename_outdir_prefix + "static_all",
                     id="rename_static_freq"),
    ],
)
def test_split_rename_cli_freq_dirs(workdir, outfiledir):
    '''Checks that every renamed file sits deep enough in the nested structure.

    Each .nc file below the output directory must have a relative path of at
    least four parts (three directory levels plus the file name, i.e.
    component/freq/duration/file.nc). Only the depth is asserted here; the
    directory names themselves are not inspected.
    '''
    outpath = Path(osp.join(workdir, outfiledir))

    nc_files = [*outpath.rglob("*.nc")]
    assert len(nc_files) > 0

    for nc_file in nc_files:
        # parts counts the file name too, hence 4 rather than 3
        parts = nc_file.relative_to(outpath).parts
        assert len(parts) >= 4, \
            f"File {nc_file} is not deep enough: {parts}"


def test_split_rename_import_run():
    '''Exercises split+rename through the python API rather than the CLI.

    Splits the timeseries fixture with split_file_xarray, then for every split
    file asks rename_file for its new relative path, places it there with
    link_or_copy and removes the flat original. Afterwards the output dir must
    hold no flat .nc files, at least one nested .nc file, and an
    ``atmos_daily`` component directory.
    '''
    infile = osp.join(casedirs[0], cases["ts"]["nc"])
    outfiledir = osp.join(casedirs[0], rename_outdir_prefix + "import_ts")

    # Step 1: split into one flat file per variable
    split_netcdf_script.split_file_xarray(infile, outfiledir, "all")

    # Step 2: find what the split produced (named <input stem>.<something>.nc)
    outpath = Path(outfiledir)
    split_files = [*outpath.glob(f"{Path(infile).stem}.*.nc")]
    assert len(split_files) > 0, "No split files were created"

    # Step 3: move each split file to its nested location
    for flat_file in split_files:
        destination = outpath / rename_split_script.rename_file(flat_file)
        rename_split_script.link_or_copy(str(flat_file), str(destination))
        flat_file.unlink()

    leftover_flat = [*outpath.glob("*.nc")]
    assert not leftover_flat, f"Flat .nc files remain at root: {leftover_flat}"

    nested = [*outpath.rglob("*.nc")]
    assert nested, "No .nc files found in nested structure"

    component_dir = outpath / "atmos_daily"
    assert component_dir.is_dir(), f"Component directory {component_dir} not found"


def test_split_rename_without_flag():
    '''Checks that split-netcdf without --rename still writes flat output.

    Backward-compatibility guard: the command must exit with code 0, leave
    .nc files directly in the output directory, and create no subdirectories.
    '''
    infile = osp.join(casedirs[0], cases["ts"]["nc"])
    outfiledir = osp.join(casedirs[0], rename_outdir_prefix + "no_rename")

    cli_args = [
        "pp", "split-netcdf",
        "--file", infile,
        "--outputdir", outfiledir,
        "--variables", "all",
    ]
    result = runner.invoke(fre.fre, args=cli_args)
    assert result.exit_code == 0

    outpath = Path(outfiledir)

    # Split files are expected directly at the root of the output dir
    flat_files = [*outpath.glob("*.nc")]
    assert flat_files, "No flat .nc files at root without --rename"

    # ...and no nesting of any kind may have been introduced
    subdirs = [entry for entry in outpath.iterdir() if entry.is_dir()]
    assert not subdirs, f"Subdirs created without --rename: {subdirs}"


def test_split_rename_cleanup():
    '''Removes the files and directories produced by the split+rename tests.

    Deletes every .nc file found anywhere under ``test_dir``, then removes
    each directory whose name starts with ``rename_outdir_prefix`` together
    with all directories nested inside it. Finally asserts that none of the
    collected files or prefixed directories still exist.
    '''
    nc_paths = []
    output_dirs = []
    for root, dirnames, filenames in os.walk(test_dir):
        nc_paths.extend(osp.join(root, fname)
                        for fname in filenames if fname.endswith(".nc"))
        output_dirs.extend(osp.join(root, dname)
                           for dname in dirnames if dname.startswith(rename_outdir_prefix))

    for nc_path in nc_paths:
        Path(nc_path).unlink()

    # Gather each output dir plus everything below it; reverse lexical order
    # puts a child path ahead of its parent, so rmdir always sees an
    # already-emptied directory.
    doomed_dirs = [
        root
        for output_dir in output_dirs
        for root, _dirnames, _filenames in os.walk(output_dir, topdown=False)
    ]
    for directory in sorted(doomed_dirs, reverse=True):
        if osp.isdir(directory):
            Path(directory).rmdir()

    files_gone = [not osp.isfile(nc_path) for nc_path in nc_paths]
    dirs_gone = [not osp.isdir(output_dir) for output_dir in output_dirs]
    assert all(files_gone) and all(dirs_gone)
2 changes: 1 addition & 1 deletion fre/tests/test_files/rename-split/README
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
1. Notes on test data

rename-split-to-pp is called directly after split-netcdf in the fre workflow;
you shoudl be using split-netcdf output as input for rename-split-to-pp
you should be using split-netcdf output as input for rename-split-to-pp

netcdf files -> split-netcdf -> rename-split-to-pp

Expand Down
12 changes: 12 additions & 0 deletions fre/tests/test_fre_pp_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,15 @@ def test_cli_fre_pp_split_netcdf_opt_dne():
''' fre pp split-netcdf optionDNE '''
result = runner.invoke(fre.fre, args=["pp", "split-netcdf", "optionDNE"])
assert result.exit_code == 2

def test_cli_fre_pp_split_netcdf_rename_help():
    ''' fre pp split-netcdf --help lists the --rename option '''
    help_args = ["pp", "split-netcdf", "--help"]
    help_result = runner.invoke(fre.fre, args=help_args)
    # Only the presence of the option in the help text is checked here
    assert help_result.exit_code == 0
    assert "--rename" in help_result.output

def test_cli_fre_pp_split_netcdf_diag_manifest_help():
    ''' fre pp split-netcdf --help lists the --diag-manifest option '''
    help_args = ["pp", "split-netcdf", "--help"]
    help_result = runner.invoke(fre.fre, args=help_args)
    # Only the presence of the option in the help text is checked here
    assert help_result.exit_code == 0
    assert "--diag-manifest" in help_result.output
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and the new test above it do not test anything other than the existence of the click options!

Loading