Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 120 additions & 46 deletions ush/python/pygfs/task/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import shutil
import tarfile
from logging import getLogger
from typing import List
from typing import List, Tuple, Union
from wxflow import (AttrDict, FileHandler, Hsi, Htar, Task, to_timedelta,
chgrp, get_gid, logit, mkdir_p, parse_j2yaml, rm_p, rmdir,
strftime, to_YMDH, which, chdir, ProcessError, save_as_yaml,
Expand Down Expand Up @@ -749,77 +749,151 @@ def _pop_git_info(self, arch_dict: AttrDict) -> None:

return

def _normalize_arch_cyc(self, arch_cyc: Union[int, List[int], Tuple[int, ...]]) -> List[int]:
"""
Normalizes ARCH_CYC configuration to a list of integers.

Parameters
----------
arch_cyc : int, list of int, or tuple of int
Cycle hour(s) for archiving

Returns
-------
List[int]
List of cycle hours as integers

Raises
------
ValueError
If arch_cyc is not an int, list, or tuple, or contains non-integer values
"""
if isinstance(arch_cyc, int):
return [arch_cyc]
elif isinstance(arch_cyc, (list, tuple)):
try:
return [int(cyc) for cyc in arch_cyc]
except (ValueError, TypeError) as e:
raise ValueError(f"ARCH_CYC list must contain only integers: {e}")
else:
raise ValueError("ARCH_CYC must be an int or list/tuple of ints.")

def _arch_warm_start_increments(self, arch_dict: AttrDict) -> bool:
    """
    Determine whether warm restart increments should be archived for the current cycle.

    Increments are archived when the current cycle hour is one of the configured
    ARCH_CYC hours and the number of whole days between SDATE and the offset cycle
    (current_cycle + assim_freq hours) is a multiple of ARCH_FCSTICFREQ.

    Parameters
    ----------
    arch_dict : AttrDict
        Dictionary containing configuration options, including:
        - ARCH_CYC (int or list of int): Valid cycle hours for archiving
        - ARCH_FCSTICFREQ (int): Frequency in days for archiving forecast ICs
        - current_cycle (datetime): The current cycle datetime
        - SDATE (datetime): Reference start date
        - assim_freq (int or str): Assimilation frequency in hours

    Returns
    -------
    bool
        True if warm restart increments should be archived, False otherwise.

    Raises
    ------
    ValueError
        If ARCH_CYC or ARCH_FCSTICFREQ is malformed, or if the values needed
        for the date calculation are missing or invalid.
    """
    # Normalize ARCH_CYC to a list of integers
    cycle_hours = self._normalize_arch_cyc(arch_dict.ARCH_CYC)

    # Wrong cycle hour: nothing else needs to be validated
    cycle_HH = int(strftime(arch_dict.current_cycle, "%H"))
    if cycle_HH not in cycle_hours:
        return False

    # Validate archiving frequency
    try:
        fcsticfreq = int(arch_dict.ARCH_FCSTICFREQ)
    except (ValueError, TypeError) as e:
        raise ValueError(f"ARCH_FCSTICFREQ must be an integer: {e}") from e

    # A non-positive frequency disables archiving (and avoids a modulo by zero below)
    if fcsticfreq <= 0:
        return False

    # The day count is measured from SDATE to the cycle offset by assim_freq hours.
    # The subtraction is kept inside the try so that an unusable SDATE is reported
    # as a configuration error rather than a bare TypeError.
    try:
        assim_freq = int(arch_dict.assim_freq)
        ics_offset_cycle = add_to_datetime(
            arch_dict.current_cycle,
            to_timedelta(f"+{assim_freq}H")
        )
        days_since_sdate = (ics_offset_cycle - arch_dict.SDATE).days
    except (AttributeError, KeyError, ValueError, TypeError) as e:
        raise ValueError(f"Invalid configuration for date calculations: {e}") from e

    # Right cycle hour; archive only on days that match the frequency
    return days_since_sdate % fcsticfreq == 0

def _arch_warm_restart_ics(self, arch_dict: AttrDict) -> bool:
    """
    Determine whether warm initial conditions (ICs) should be archived for the current cycle.

    For runs whose (lower-cased) RUN starts with "gdas" or "enkfgdas", each archive
    cycle hour is shifted back by the assimilation frequency (modulo 24), as ICs lag
    forecast increments by that amount.

    Parameters
    ----------
    arch_dict : AttrDict
        Dictionary containing configuration options, including:
        - ARCH_CYC (int or list of int): Target cycle hour(s) for archiving
        - ARCH_WARMICFREQ (int): Frequency in days for archiving warm ICs
        - current_cycle (datetime): The current cycle datetime
        - SDATE (datetime): Reference start date
        - assim_freq (int or str): Assimilation frequency in hours
        - RUN (str): Run type identifier (e.g., "gdas", "gfs")

    Returns
    -------
    bool
        True if warm ICs should be archived, False otherwise.

    Raises
    ------
    ValueError
        If a required entry is missing from arch_dict or cannot be converted,
        or if ARCH_CYC is malformed.
    """
    HOURS_PER_DAY = 24

    # Extract and validate basic configuration
    try:
        cycle_HH = int(strftime(arch_dict.current_cycle, "%H"))
        SDATE = arch_dict.SDATE
        RUN = arch_dict.RUN.lower()
        assim_freq = int(arch_dict.assim_freq)
        warmicfreq = int(arch_dict.ARCH_WARMICFREQ)
    except (AttributeError, KeyError, ValueError, TypeError) as e:
        raise ValueError(f"Invalid or missing configuration in arch_dict: {e}") from e

    # A non-positive frequency disables archiving (and avoids a modulo by zero below)
    if warmicfreq <= 0:
        return False

    # Normalize ARCH_CYC to a list of integers
    cycle_hours = self._normalize_arch_cyc(arch_dict.ARCH_CYC)

    # GDAS and EnKFGDAS ICs lag forecast increments by assim_freq hours;
    # the modulo wraps hours that would fall before 00 back into the previous day.
    if RUN.startswith(("gdas", "enkfgdas")):
        target_hours = [(cyc_hour - assim_freq) % HOURS_PER_DAY for cyc_hour in cycle_hours]
    else:
        target_hours = cycle_hours

    # Check if current cycle hour matches any adjusted cycle hour
    if cycle_HH not in target_hours:
        return False

    # Right cycle hour; archive only on days that match the frequency
    days_since_sdate = (arch_dict.current_cycle - SDATE).days
    return days_since_sdate % warmicfreq == 0

def _arch_restart(self, arch_dict: AttrDict) -> bool:
Expand Down
Loading