diff --git a/configs/esm_software/esm_runscripts/defaults.yaml b/configs/esm_software/esm_runscripts/defaults.yaml index 0a23ca5fa..acf397b40 100644 --- a/configs/esm_software/esm_runscripts/defaults.yaml +++ b/configs/esm_software/esm_runscripts/defaults.yaml @@ -9,12 +9,61 @@ general: # Defaults to be added to each model or component per_model_defaults: - file_movements: - default: - all_directions: copy - bin: - init_to_exp: copy - exp_to_run: copy - run_to_work: copy - work_to_run: copy + file_movements: + default: + all_directions: copy + bin: + init_to_exp: copy + exp_to_run: copy + run_to_work: copy + work_to_run: copy +workflow: + user_phases: None + default_cluster: sim_cluster + clusters: + sim_cluster: + first_task_in_queue: prepcompute + last_task_in_queue: tidy + next_run_triggered_by: tidy + phases: + - prepcompute + - compute + - tidy + + phases: + prepcompute: + phase_type: SimulationSetup + called_from: tidy + cluster: sim_cluster + name: prepcompute + next_submit: + - compute + nproc: 1 + order_in_cluster: sequential + run_after: tidy + run_before: compute + compute: + phase_type: compute + called_from: prepcompute + cluster: sim_cluster + name: compute + next_submit: + - tidy + #nproc: None + nproc: 1 + order_in_cluster: sequential + run_after: prepcompute + run_before: tidy + #run_on_queue: ${computer.partitions.pp.name} + tidy: + phase_type: SimulationSetup + called_from: compute + cluster: sim_cluster + name: tidy + next_submit: + - prepcompute + nproc: 1 + order_in_cluster: sequential + run_after: compute + run_before: prepcompute diff --git a/configs/esm_software/esm_runscripts/esm_plugins.yaml b/configs/esm_software/esm_runscripts/esm_plugins.yaml index 47158d88e..1ddcd35e5 100644 --- a/configs/esm_software/esm_runscripts/esm_plugins.yaml +++ b/configs/esm_software/esm_runscripts/esm_plugins.yaml @@ -1,6 +1,6 @@ # Mappings of functions/methods to parent ESM python libraries: # tells ESM-Tools in which library and file/sublibrary can find 
the functions -# of the recipies +# of the recipes # (prescribed in configs/esm_software/esm_runscripts/esm_runscripts.yaml). # # Core (not an external plugin) @@ -25,8 +25,8 @@ core: - "initialize_batch_system" - "initialize_coupler" - "set_logfile" - #- "add_vcs_info" - #- "check_vcs_info_against_last_run" + - "add_vcs_info" + - "check_vcs_info_against_last_run" - "check_config_for_warnings_errors" prepexp: @@ -111,3 +111,6 @@ core: workflow: - "assemble_workflow" + postprocess: + - "convert_to_zarr" + diff --git a/configs/esm_software/esm_runscripts/esm_runscripts.yaml b/configs/esm_software/esm_runscripts/esm_runscripts.yaml index 8e425a98e..c55619c62 100644 --- a/configs/esm_software/esm_runscripts/esm_runscripts.yaml +++ b/configs/esm_software/esm_runscripts/esm_runscripts.yaml @@ -1,4 +1,4 @@ -# Default recipies +# Default recipes # ESM-Tools uses config/esm_software/esm_runscripts/esm_plugins.yaml to understand # where to look for each of this function/methods (the steps within each recipy, e.g. # "_read_date_file"). @@ -49,7 +49,7 @@ choose_job_type: - "copy_tools_to_thisrun" # The next step will call esm_runscripts again from the experiment folder, # if the current folder is not the experiment folder already. - # If esm_runscripts will be excuted, the following step will be skipped, since + # If esm_runscripts will be executed, the following step will be skipped, since # there is a sys.exit() after the esm_runscripts call. - "call_esm_runscripts_from_prepexp" # The following step will be skipped, if not in experiment folder. 
diff --git a/runscripts/fesom2/fesom2.6-albedo-reference.yaml b/runscripts/fesom2/fesom2.6-albedo-reference.yaml new file mode 100644 index 000000000..3af430553 --- /dev/null +++ b/runscripts/fesom2/fesom2.6-albedo-reference.yaml @@ -0,0 +1,25 @@ +general: + user: !ENV ${USER} + account: "computing.computing" + setup_name: fesom + compute_time: "08:00:00" + initial_date: "1958-01-01" + final_date: "1958-03-31" + project_base: !ENV ${PROJECT_BASE} + base_dir: "${general.project_base}/experiments" + nyear: 0 + nmonth: 1 + use_venv: False + +fesom: + version: 2.6 + model_dir: "${general.project_base}/model-codes/fesom-2.6/" + mesh_dir: /albedo/pool/FESOM2/core2/ + restart_rate: 1 + restart_first: 1 + restart_unit: "m" + resolution: "CORE2" + lresume: false + time_step: 1800 + # FIXME(@pgierz, @hpc-team): We have FESOM and FESOM2 in the albedo pool, this should be unified... + forcing_data_dir: "/albedo/pool/FESOM/forcing/" diff --git a/src/esm_plugin_manager/esm_plugin_manager.py b/src/esm_plugin_manager/esm_plugin_manager.py index 02d18ecaf..82adef679 100644 --- a/src/esm_plugin_manager/esm_plugin_manager.py +++ b/src/esm_plugin_manager/esm_plugin_manager.py @@ -8,12 +8,11 @@ import esm_parser import esm_profile -from esm_parser import yaml_file_to_dict def read_recipe(recipe, additional_dict, needs_parse=True): if needs_parse: - recipe = yaml_file_to_dict(recipe) + recipe = esm_parser.yaml_file_to_dict(recipe) recipe.update(additional_dict) esm_parser.basic_choose_blocks(recipe, recipe) esm_parser.recursive_run_function( @@ -26,7 +25,7 @@ def read_recipe(recipe, additional_dict, needs_parse=True): def read_plugin_information(plugins_bare, recipe, needs_parse=True): # pluginfile = esm_plugins.yaml if needs_parse: - plugins_bare = yaml_file_to_dict(plugins_bare) + plugins_bare = esm_parser.yaml_file_to_dict(plugins_bare) extra_info = ["location", "git-url"] plugins = {} for workitem in recipe["recipe"]: @@ -62,6 +61,7 @@ def read_plugin_information(plugins_bare, 
recipe, needs_parse=True): if found: break + breakpoint() attach_installed_plugins_to_all(plugins) return plugins @@ -127,17 +127,31 @@ def check_plugin_availability(plugins): def work_through_recipe(recipe, plugins, config): + """ + Works through the esm_runscripts recipes and plugin recipes. + + Parameters + ---------- + recipe : dict # What is in these two dictionaries? Where are the entries coming from? + plugins : dict + config : dict + + Returns + ------- + config : dict + """ if config.get("general", {}).get("debug_recipe", False): import pdb pdb.set_trace() recipes = recipe["recipe"] recipe_name = recipe["job_type"] + # Loop over the recipe for index, workitem in enumerate(recipes, start=1): if config["general"].get("verbose", False): # diagnostic message of which recipe step is being executed message = ( - f"::: Executing the step: {workitem} " + f"::: START Executing the step: {workitem} " f"(step [{index}/{len(recipes)}] of the job: " f'{recipe["job_type"]})' ) @@ -187,6 +201,19 @@ def work_through_recipe(recipe, plugins, config): config = timed_workitem_callable(config) else: config = getattr(thismodule, workitem)(config) + config = getattr(thismodule, workitem)(config) + if config["general"].get("verbose", False): + # diagnostic message of which recipe step is being executed + message = ( + f"::: END Executing the step: {workitem} " + f"(step [{index}/{len(recipes)}] of the job: " + f'{recipe["job_type"]})' + ) + + logger.info() + logger.info("=" * len(message)) + logger.info(message) + logger.info("=" * len(message)) return config diff --git a/src/esm_runscripts/batch_system.py b/src/esm_runscripts/batch_system.py index f0cd44a58..825a4521d 100644 --- a/src/esm_runscripts/batch_system.py +++ b/src/esm_runscripts/batch_system.py @@ -1,12 +1,14 @@ import copy import os +import pdb import stat import sys import textwrap +from loguru import logger + import esm_environment from esm_parser import find_variable, user_error, user_note -from loguru import 
logger from . import dataprocess, helpers, prepare from .pbs import Pbs @@ -21,7 +23,6 @@ class UnknownBatchSystemError(Exception): class batch_system: - # all wrappers to slurm, pbs and co as esm_runscript # should be written independent of actual batch system def __init__(self, config, name): @@ -323,7 +324,7 @@ def get_extra(config): ) elif isinstance(pre_run_commands, str): extras.append(pre_run_commands) - elif pre_run_commands == None: + elif pre_run_commands is None: continue else: user_error( @@ -345,7 +346,7 @@ def append_start_statement(config, subjob): config["general"]["run_number"], config["general"]["current_date"], config["general"]["jobid"], - "- start", + "- start from run script", ], timestampStr_from_Unix=True, ) @@ -361,7 +362,7 @@ def append_done_statement(config, subjob): config["general"]["run_number"], config["general"]["current_date"], config["general"]["jobid"], - "- done", + "- done from run script", ], timestampStr_from_Unix=True, ) @@ -370,9 +371,37 @@ def append_done_statement(config, subjob): @staticmethod def get_run_commands(config, subjob, batch_or_shell): # here or in compute.py? + """ + Creates the command of the specific phase to be put in the *.run file. + + This function covers the following phase types: + - SimulationSetup: phases that are run as 'esm_runscripts' command + - batch: phases that are run via 'srun' command + - shell: phases that are run as shell scripts. The command is generated by + a function in the 'dataprocess' module. + + Special case: phase 'compute': + - This phase is of type 'batch' + + Todo: How about other phases of type batch? in dataprocess??? + + Parameters + ---------- + config: dict + subjob: str + Name of phase + batch_or_shell: str + Type of phase (SimulationSetup, batch, shell) + + Returns + ------- + commands: list + List of command and arguments of a phase depending on its type. 
+ """ commands = [] if subjob.startswith("compute"): + # for batch jobs if config["general"].get("submit_to_batch_system", True): batch_system = config["computer"] if "execution_command" in batch_system: @@ -383,6 +412,7 @@ def get_run_commands(config, subjob, batch_or_shell): # here or in compute.py? ) if config["general"].get("multi_srun"): return self.bs.get_run_commands_multisrun(config, commands) + # for shell scripts else: for model in config: if model == "computer": @@ -394,9 +424,53 @@ def get_run_commands(config, subjob, batch_or_shell): # here or in compute.py? + f" 2>&1{config['computer'].get('write_execution_log', '')} &" ) else: - subjob_tasks = dataprocess.subjob_tasks(config, subjob, batch_or_shell) - for task in subjob_tasks: - commands.append(task) + if batch_or_shell == "SimulationSetup": + # for phase type 'SimulationSetup' (e.g. prepcompute, tidy) + commands = [] + commands.append("esm_runscripts") + # add runscript with absolute path + runscript = config["general"]["runscript_abspath"] + commands.append(runscript) + # add experiment id + commands.append(f"-e {config['general']['expid']}") + # add phase + commands.append(f"--phase {subjob}") + # add task + commands.append(f"-t run_phase") + # add date + commands.append( + "-s " + + config["general"]["current_date"].format( + form=9, givenph=False, givenpm=False, givenps=False + ) + ) + # add run number + commands.append(f"-r {str(config['general']['run_number'])}") + # add verbose and no-message-of-the-day arguments + commands.append("-v --no-motd") + # add last-jobtype argument + commands.append(f"--last-jobtype {subjob}") + # add --open-run or --contained-run argument + if "--open-run" in config["general"]["original_command"] or not config[ + "general" + ].get("use_venv"): + commands.append(" --open-run") + elif "--contained-run" in config["general"][ + "original_command" + ] or config["general"].get("use_venv"): + commands.append("--contained-run") + else: + print("ERROR -- Not sure if you were in a contained 
or open run!") + print( + "ERROR -- See write_simple_runscript for the code causing this." + ) + sys.exit(1) + else: + # for all other phase types (batch, shell) except phase 'compute' + subjob_tasks = dataprocess.subjob_tasks(config, subjob, batch_or_shell) + # Why was this necessary? And not set commands directly? + for task in subjob_tasks: + commands.append(task) return commands @@ -427,8 +501,150 @@ def get_submit_command(config, batch_or_shell, runfilename): return commands @staticmethod - def write_simple_runscript(config, cluster, batch_or_shell="batch"): + def write_run_batch_script(config, cluster, batch_or_shell="batch"): + workflow = config["general"]["workflow"]["object"] + phases = workflow.clusters[cluster]["phases"] + + self = config["general"]["batch"] + runfilename = batch_system.get_run_filename(config, cluster) + if config["general"]["verbose"]: + print("jobtype: ", config["general"]["jobtype"]) + print("writing run file for:", cluster) + + with open(runfilename, "w") as runfile: + config = batch_system.calculate_requirements(config, "compute") + # TODO: remove it once it's not needed anymore (substituted by packjob) + if cluster in reserved_jobtypes and config["computer"].get( + "taskset", False + ): + config = config["general"]["batch"].write_het_par_wrappers(config) + # Prepare launcher + config = config["general"]["batch"].prepare_launcher(config, "compute") + # Initiate the header + header = batch_system.get_batch_header(config, "compute") + for line in header: + runfile.write(line + "\n") + runfile.write("\n") + # environment for each phase of a cluster + environment = batch_system.get_environment(config, "compute") + batch_system.write_env(config, environment, runfilename) + for line in environment: + runfile.write(line + "\n") + + # extra entries for each phase + extra = batch_system.get_extra(config) + for line in extra: + runfile.write(line + "\n") + + for phase in phases: + phase = workflow.get_workflow_phase_by_name(phase) + # Add 
actual commands + commands = batch_system.get_run_commands( + config, phase["name"], batch_or_shell + ) + # commands = clusterconf.get("data_task_list", []) + runfile.write("\n") + runfile.write( + "#********** Start phase " + phase["name"] + " *************\n" + ) + runfile.write(self.append_start_statement(config, phase["name"]) + "\n") + runfile.write("\n") + + # if cluster in reserved_jobtypes: + config["general"]["batch"].add_pre_launcher_lines( + config, cluster, runfile + ) + + command = phase["run_command"] + if phase["phase_type"] == "SimulationSetup": + runfile.write( + "cd " + config["general"]["experiment_scripts_dir"] + "\n" + ) + runfile.write(f"{command} --run-from-batch-script\n") + elif phase["phase_type"] == "compute": + runfile.write("cd " + config["general"]["thisrun_work_dir"] + "\n") + observe_call = ( + "esm_runscripts " + + config["general"]["started_from"] + + config["general"]["scriptname"] + + " -e " + + config["general"]["expid"] + + " -t observe" + + " --phase " + + phase["name"] + + " -p ${process}" + + " -s " + + config["general"]["current_date"].format( + form=9, givenph=False, givenpm=False, givenps=False + ) + + " -r " + + str(config["general"]["run_number"]) + + " -v " + + " --last-jobtype " + + config["general"]["jobtype"] + + " --open-run" + ) + runfile.write(f"{command}\n") + runfile.write("process=$!\n") + runfile.write("\n") + runfile.write( + "#********** Start to observe " + + phase["name"] + + " *************\n" + ) + runfile.write(self.append_start_statement(config, "observe") + "\n") + runfile.write( + "cd " + config["general"]["experiment_scripts_dir"] + "\n" + ) + runfile.write(f"{observe_call}\n") + runfile.write("\n") + runfile.write("wait\n") + runfile.write(self.append_done_statement(config, "observe") + "\n") + doneline = ( + "echo " + + line + + " >> " + + config["general"]["experiment_log_file"] + ) + else: + runfile.write(f"{command}\n") + runfile.write(self.append_done_statement(config, phase["name"]) + "\n") 
+ # TODO: Check if end_or_experiment(config) -> can not do from . import resubmit will give error dont know why + if not config["general"]["next_date"] >= config["general"]["final_date"]: + runfile.write("\n") + runfile.write( + "#********** Call esm_runscripts to restart simulation. *************\n" + ) + + expid = config["general"].get("expid", None) + scriptname = config["general"].get("scriptname", None) + current_date = config["general"].get("current_date", None) + current_date = current_date.format( + form=9, givenph=False, givenpm=False, givenps=False + ) + + run_number = config["general"].get("run_number", None) + + runfile.write(self.append_start_statement(config, "restart") + "\n") + runfile.write( + f"esm_runscripts {scriptname} -e {expid} -t restart --open-run -v --no-motd --run-from-batch-script --open-run -s {current_date} -r {run_number}" + ) + runfile.write("\n") + runfile.write(self.append_done_statement(config, "restart") + "\n") + runfile.write("\n") + + runfile.write("\n") + runfile.write("wait\n") + + config["general"]["submit_command"] = batch_system.get_submit_command( + config, batch_or_shell, runfilename + ) + + return config + + @staticmethod + def write_simple_runscript(config, cluster, batch_or_shell="batch"): # if no cluster is specified, work on the one we are in # if not cluster: # cluster = config["general"]["jobtype"] @@ -453,10 +669,8 @@ def write_simple_runscript(config, cluster, batch_or_shell="batch"): logger.debug(f"writing run file for: {cluster}") with open(runfilename, "w") as runfile: - # batch header (if any) if batch_or_shell == "batch": - config = batch_system.calculate_requirements(config, cluster) # TODO: remove it once it's not needed anymore (substituted by packjob) if cluster in reserved_jobtypes and config["computer"].get( @@ -480,7 +694,6 @@ def write_simple_runscript(config, cluster, batch_or_shell="batch"): if clusterconf: for subjob in clusterconf["subjobs"]: - # environment for each subjob of a cluster 
environment = batch_system.get_environment(config, subjob) batch_system.write_env(config, environment, runfilename) @@ -513,7 +726,7 @@ def write_simple_runscript(config, cluster, batch_or_shell="batch"): # dummy = 0 else: # "normal" case dummy = 0 - + # check if this cluster has something to submit (next_submit not empty) if submits_another_job(config, cluster): # and batch_or_shell == "batch": # -j ? is that used somewhere? I don't think so, replaced by workflow # " -j "+ config["general"]["jobtype"] @@ -814,7 +1027,7 @@ def calc_launcher_flags(config, model, cluster): cpus_per_proc = config[model].get("cpus_per_proc", omp_num_threads) # Check for CPUs and OpenMP threads if omp_num_threads > cpus_per_proc: - esm_parser.user_error( + user_error( "OpenMP configuration", ( "The number of OpenMP threads cannot be larger than the number" @@ -826,7 +1039,7 @@ def calc_launcher_flags(config, model, cluster): elif "nproca" in config[model] and "nprocb" in config[model]: # ``nproca``/``nprocb`` not compatible with ``omp_num_threads`` if omp_num_threads > 1: - esm_parser.user_note( + user_note( "nproc", "``nproca``/``nprocb`` not compatible with ``omp_num_threads``", ) @@ -834,7 +1047,6 @@ def calc_launcher_flags(config, model, cluster): cpus_per_proc = 1 omp_num_threads = 1 else: - # kh 22.06.22 defensive (user_error/user_note could also be added here) nproc = 0 cpus_per_proc = 0 diff --git a/src/esm_runscripts/cli.py b/src/esm_runscripts/cli.py index 69ea10154..61cb096c7 100644 --- a/src/esm_runscripts/cli.py +++ b/src/esm_runscripts/cli.py @@ -10,6 +10,7 @@ import argparse import logging import os +import pdb import sys from loguru import logger @@ -18,7 +19,7 @@ from esm_parser import user_error from .helpers import SmartSink -from .sim_objects import * +from .sim_objects import Simulation def parse_shargs(): @@ -96,10 +97,24 @@ def parse_shargs(): parser.add_argument( "-t", "--task", - help="The task to run. 
Choose from: prepcompute, post, couple, tidy", + help="The task to run. Choose from: start, restart, continue, ...", + default="start", + ) + + parser.add_argument( + "-a", + "--phase", + help="The workflow phase to run. Choose from: prepcompute, compute, tidy, ...", default="unknown", ) + parser.add_argument( + "-w", + "--workflow", + help="The workflow to run. Choose from: default (sim_cluster???), ...", + default="sim_cluster", + ) + parser.add_argument( "-i", "--inspect", @@ -110,7 +125,7 @@ def parse_shargs(): parser.add_argument( "-p", "--pid", - help="The PID of the task to observe.", + help="The PID of the phase to observe.", default=-666, ) @@ -162,6 +177,12 @@ def parse_shargs(): action="store_true", ) + parser.add_argument( + "--run-from-batch-script", + default=False, + action="store_true", + ) + return parser.parse_args() @@ -202,8 +223,12 @@ def main(): update_filetypes = parsed_args["update_filetypes"] if "expid" in parsed_args: expid = parsed_args["expid"] + if "phase" in parsed_args: + jobtype = parsed_args["phase"] + # if "workflow" in parsed_args: + # workflow = parsed_args["workflow"] if "task" in parsed_args: - jobtype = parsed_args["task"] + task = parsed_args["task"] if "verbose" in parsed_args: verbose = parsed_args["verbose"] if "inspect" in parsed_args: @@ -224,6 +249,8 @@ def main(): no_motd = parsed_args["no_motd"] if "ignore_config_warnings" in parsed_args: ignore_config_warnings = parsed_args["ignore_config_warnings"] + if "run_from_batch_script" in parsed_args: + run_from_batch_script = parsed_args["run_from_batch_script"] command_line_config = {} command_line_config["check"] = check @@ -235,6 +262,8 @@ def main(): command_line_config["current_date"] = start_date command_line_config["run_number"] = run_number command_line_config["jobtype"] = jobtype + # command_line_config["workflow"] = workflow + command_line_config["task"] = task command_line_config["last_jobtype"] = ARGS.last_jobtype command_line_config["verbose"] = verbose 
command_line_config["inspect"] = inspect @@ -243,6 +272,7 @@ def main(): command_line_config["ignore_config_warnings"] = ignore_config_warnings if modify_config_file: command_line_config["modify_config_file"] = modify_config_file + command_line_config["run_from_batch_script"] = run_from_batch_script # runscript_from_cmdline = filter(lambda x: x.endswith(".yaml"), sys.argv) # runscript_from_cmdline = list(runscript_from_cmdline)[0] @@ -282,8 +312,7 @@ def main(): else: logger.add(sys.stdout, level="INFO", format="{message}") - setup = SimulationSetup(command_line_config=command_line_config) - # if not Setup.config['general']['submitted']: - if not setup.config["general"]["submitted"] and not no_motd: + this_sim = Simulation(command_line_config=command_line_config) + if not this_sim.config["general"]["submitted"] and not no_motd: check_all_esm_packages() - setup() + this_sim.run() diff --git a/src/esm_runscripts/config_initialization.py b/src/esm_runscripts/config_initialization.py index e12cfed5e..7e8fa2cf8 100644 --- a/src/esm_runscripts/config_initialization.py +++ b/src/esm_runscripts/config_initialization.py @@ -1,5 +1,6 @@ import copy import os +import pdb import sys import esm_parser @@ -47,7 +48,6 @@ def init_iterative_coupling(command_line_config, user_config): ][0] scriptname = user_config["general"]["original_config"][next_model]["runscript"] - # command_line_config["scriptname"] = os.path.join(user_config["general"]["started_from"], scriptname) new_command_line_config = copy.deepcopy(command_line_config) new_command_line_config["scriptname"] = scriptname new_command_line_config["runscript_abspath"] = os.path.join( @@ -58,7 +58,7 @@ def init_iterative_coupling(command_line_config, user_config): user_config = esm_parser.new_deep_update(user_config, model_config) # Set the ``iterative_coupled_model`` string, to add the model name to the - # run_ folder, finished_config.yaml, etc., to avoid overwritting with the + # run_ folder, finished_config.yaml, etc., to 
avoid overwriting with the # files of other offline coupled models user_config["general"][ "iterative_coupled_model" @@ -151,7 +151,7 @@ def get_user_config_from_command_line(command_line_config): If there is a problem with the parsing of the runscript """ - # Read the content of the runscrip + # Read the content of the runscript try: user_config = esm_parser.initialize_from_yaml( command_line_config["runscript_abspath"] @@ -160,7 +160,7 @@ def get_user_config_from_command_line(command_line_config): # ``check_for_empty_components`` in ``yaml_to_dict.py``) catch the sys.exit. except SystemExit as sysexit: sys.exit(sysexit) - except: + except Exception: # any other exception esm_parser.user_error( "Syntax error", f"An error occurred while reading the config file " @@ -221,7 +221,7 @@ def init_interactive_info(config, command_line_config): def get_total_config_from_user_config(user_config): """ - Finds the version of the setup in ``user_config`` instanciates the ``config`` with + Finds the version of the setup in ``user_config`` instantiates the ``config`` with ``esm_parser.ConfigSetup`` which appends all the information from the config files required for this simulation and stores it in ``config``. 
diff --git a/src/esm_runscripts/dataprocess.py b/src/esm_runscripts/dataprocess.py index 174b8c937..a3c93799c 100644 --- a/src/esm_runscripts/dataprocess.py +++ b/src/esm_runscripts/dataprocess.py @@ -27,17 +27,20 @@ def subjob_tasks(config, subjob, batch_or_shell): task_list = [] subjob_config = config["general"]["workflow"]["subjobs"][subjob] - old_logfile = config["general"]["logfile_path"] - logfile_dir = os.path.dirname(old_logfile) - if config["general"]["setup_name"] in subjob: - bare_subjob = subjob.replace("_" + config["general"]["setup_name"], "") - else: - bare_subjob = subjob - logfile_name = os.path.basename(old_logfile).replace( - config["general"]["jobtype"], bare_subjob - ) + old_logfile = config["general"].get("logfile_path", None) + if old_logfile: + logfile_dir = os.path.dirname(old_logfile) + if config["general"]["setup_name"] in subjob: + bare_subjob = subjob.replace("_" + config["general"]["setup_name"], "") + else: + bare_subjob = subjob + logfile_name = os.path.basename(old_logfile).replace( + config["general"]["jobtype"], bare_subjob + ) - new_logfile = os.path.join(logfile_dir, logfile_name) + new_logfile = os.path.join(logfile_dir, logfile_name) + else: + new_logfile = "logfile_please_set_name.txt" scriptdir = subjob_config.get("script_dir", False) script = subjob_config.get("script", False) diff --git a/src/esm_runscripts/logfiles.py b/src/esm_runscripts/logfiles.py index f8353df0c..9b7e92e45 100644 --- a/src/esm_runscripts/logfiles.py +++ b/src/esm_runscripts/logfiles.py @@ -15,10 +15,13 @@ def initialize_logfiles(config, org_jobtype): log_stuff = False if os.path.isdir(os.path.dirname(config["general"]["experiment_log_file"])): if not org_jobtype == "inspect": + #if not os.path.isfile(config["general"]["experiment_log_file"]): log_stuff = True + # Set name of logfile into config config = set_logfile_name(config, "") + # Writes some line to the logfile defined in config. 
if log_stuff: helpers.write_to_log( @@ -28,9 +31,10 @@ def initialize_logfiles(config, org_jobtype): logfile_run_number, str(config["general"]["current_date"]), str(config["general"]["jobid"]), - "- start", + "- start in initialize_logfiles", ], ) + # Creates a logfile object/handle for stdout/phase log file logfile = RuntimeLogger( config["general"]["logfile_path"], "w", @@ -39,6 +43,7 @@ def initialize_logfiles(config, org_jobtype): else: logfile = sys.stdout + # Writes logfile handle into global variable logfile_handle = logfile return config @@ -56,7 +61,7 @@ def finalize_logfiles(config, org_jobtype): logfile_run_number, str(config["general"]["current_date"]), str(config["general"]["jobid"]), - "- done", + "- done in finalize_logfiles", ], ) diff --git a/src/esm_runscripts/observe.py b/src/esm_runscripts/observe.py index 11f9a3438..c8c4995dd 100644 --- a/src/esm_runscripts/observe.py +++ b/src/esm_runscripts/observe.py @@ -3,7 +3,6 @@ import time import psutil - from loguru import logger from . import database_actions, helpers, logfiles diff --git a/src/esm_runscripts/old_workflow.py b/src/esm_runscripts/old_workflow.py new file mode 100644 index 000000000..e99352b9f --- /dev/null +++ b/src/esm_runscripts/old_workflow.py @@ -0,0 +1,782 @@ +def should_skip_cluster(cluster, config): + """ + Determine whether a specific cluster should be skipped based on the provided configuration. + + Parameters + ---------- + cluster : str + The name of the cluster to check. + config : dict + A dictionary containing various configuration settings. + + Returns + ------- + bool + True if the cluster should be skipped, False otherwise. + + Notes + ----- + The function evaluates several conditions to decide if the cluster should be skipped: + 1. If `run_only` in the cluster configuration is set to "last_run_in_chunk" and `last_run_in_chunk` in the general configuration is not True. + 2. 
If `run_only` in the cluster configuration is set to "first_run_in_chunk" and `first_run_in_chunk` in the general configuration is not True. + 3. If `skip_chunk_number` in the cluster configuration matches `chunk_number` in the general configuration. + 4. If `skip_run_number` in the cluster configuration matches `run_number` in the general configuration. + + If none of these conditions are met, the function returns False, indicating that the cluster should not be skipped. + """ + general_config = config["general"] + workflow_config = general_config["workflow"] + cluster_config = workflow_config["clusters"][cluster] + + run_only_on = cluster_config.get("run_only") + + if run_only_on == "last_run_in_chunk" and not general_config.get( + "last_run_in_chunk", False + ): + return True + + if run_only_on == "first_run_in_chunk" and not general_config.get( + "first_run_in_chunk", False + ): + return True + + skip_chunk_number = cluster_config.get("skip_chunk_number") + if skip_chunk_number == general_config.get("chunk_number"): + return True + + skip_run_number = cluster_config.get("skip_run_number") + if skip_run_number == general_config.get("run_number"): + return True + + return False + + +def assemble_workflow(config): + config = init_total_workflow(config) + config = collect_all_workflow_information(config) + config = complete_clusters(config) + config = order_clusters(config) + config = prepend_newrun_job(config) + config = handle_unknown_jobtype(config) + return config + + +def handle_unknown_jobtype(config): + """ + Update the jobtype in the configuration if it is set to 'unknown'. + + Args: + config (dict): The configuration dictionary. + + Returns: + dict: The updated configuration dictionary. 
+ """ + if config["general"]["jobtype"] == "unknown": + first_task = config["general"]["workflow"]["first_task_in_queue"] + config["general"]["command_line_config"]["jobtype"] = first_task + config["general"]["jobtype"] = first_task + return config + + +def display_nicely(config): + esm_parser.pprint_config(config["general"]["workflow"]) + return config + + +def prepend_newrun_job(config): + gw_config = config["general"]["workflow"] + first_cluster_name = gw_config["first_task_in_queue"] + first_cluster = gw_config["clusters"][first_cluster_name] + + if not first_cluster.get("submission_type", "Error") == "sim_object": + last_cluster_name = gw_config["last_task_in_queue"] + last_cluster = gw_config["clusters"][last_cluster_name] + + new_first_cluster_name = "newrun" + new_first_cluster = { + "newrun": { + "called_from": last_cluster_name, + "run_before": first_cluster_name, + "next_submit": [first_cluster_name], + "jobs": ["newrun_general"], + "submission_type": "sim_object", + } + } + + last_cluster["next_submit"].append("newrun") + last_cluster["next_submit"].remove(first_cluster_name) + + first_cluster["called_from"] = "newrun" + + gw_config["first_task_in_queue"] = "newrun" + + new_job = { + "newrun_general": { + "nproc": 1, + "called_from": last_cluster_name, + "run_before": first_cluster_name, + "next_submit": [first_cluster_name], + "job_cluster": "newrun", + } + } + + gw_config["clusters"].update(new_first_cluster) + gw_config["jobs"].update(new_job) + + return config + + +def order_clusters(config): + gw_config = config["general"]["workflow"] + + initialize_next_submit(gw_config) + validate_and_set_dependencies(gw_config) + handle_next_run_triggered_by_and_last_task_in_queue(gw_config) + ensure_first_and_last_clusters_linked(gw_config) + + return config + + +def initialize_next_submit(gw_config): + for job_cluster in gw_config["clusters"]: + if "next_submit" not in gw_config["clusters"][job_cluster]: + gw_config["clusters"][job_cluster]["next_submit"] = [] 
def append_to_next_submit(gw_config, source_cluster, target_cluster):
    """
    Register ``target_cluster`` as a successor of ``source_cluster``.

    Parameters
    ----------
    gw_config : dict
        The general workflow configuration holding the ``clusters`` mapping.
    source_cluster : str
        Name of the cluster whose ``next_submit`` list is extended.
    target_cluster : str
        Name to add; it is skipped when already listed.
    """
    successors = gw_config["clusters"][source_cluster]["next_submit"]
    if target_cluster in successors:
        return
    successors.append(target_cluster)
def complete_job_cluster_assignments(gw_config):
    """
    Register every job in the ``jobs`` list of the cluster it belongs to.

    Parameters
    ----------
    gw_config : dict
        The general workflow configuration with the ``jobs`` and ``clusters``
        mappings; ``clusters`` is modified in place.

    Notes
    -----
    The cluster of a job is read from its ``job_cluster`` entry. Clusters that
    are not known yet are created as empty dictionaries, and a missing ``jobs``
    list is created before the job name is appended.
    """
    clusters = gw_config["clusters"]
    for job_name, job_conf in gw_config["jobs"].items():
        cluster_entry = clusters.setdefault(job_conf["job_cluster"], {})
        cluster_entry.setdefault("jobs", []).append(job_name)
def calculate_nproc(clusterconf, gw_config):
    """
    Add up and find the maximum of the processor counts of a cluster's jobs.

    Parameters
    ----------
    clusterconf : dict
        The configuration of one job cluster; its ``jobs`` entry lists the
        job names to look at.
    gw_config : dict
        The general workflow configuration holding the ``jobs`` mapping.

    Returns
    -------
    nproc_sum : int
        Sum of ``nproc`` over all jobs of the cluster.
    nproc_max : int
        Largest ``nproc`` of any single job of the cluster.

    Notes
    -----
    Each job is first passed to ``merge_job_entries`` together with the
    cluster configuration. A job without ``nproc`` counts as 1.
    """
    total = 0
    largest = 0
    for job_name in clusterconf["jobs"]:
        job_settings = gw_config["jobs"][job_name]
        merge_job_entries(clusterconf, job_settings)
        job_nproc = job_settings.get("nproc", 1)
        total += job_nproc
        largest = max(job_nproc, largest)
    return total, largest
def set_default_job_values(clusterconf):
    """
    Set default values for missing configuration entries in a job cluster.

    Parameters
    ----------
    clusterconf : dict
        The configuration dictionary for a specific job cluster; modified in
        place.

    Raises
    ------
    WorkflowUnknownQueueError
        If the cluster is to be submitted to the batch system but does not
        name a target queue (``run_on_queue``).

    Notes
    -----
    Defaults applied: ``submit_to_batch_system`` is ``False``,
    ``submission_type`` is ``"sim_object"`` and ``order_in_cluster`` is
    ``"sequential"``.
    """
    if "submit_to_batch_system" not in clusterconf:
        clusterconf["submit_to_batch_system"] = False
    elif clusterconf["submit_to_batch_system"] and "run_on_queue" not in clusterconf:
        # A queue is only needed when the cluster really goes to the batch
        # system; an explicit ``False`` is treated as "not batch", as in
        # merge_job_entries.
        raise WorkflowUnknownQueueError(
            f"Information on target queue is missing in cluster {clusterconf}."
        )

    if not clusterconf.get("submission_type", False):
        clusterconf["submission_type"] = "sim_object"

    clusterconf.setdefault("order_in_cluster", "sequential")
def create_compute_stage(tasks, config):
    """
    Build the ``compute`` stage entry for the workflow manager.

    Parameters
    ----------
    tasks : int
        Number of tasks, stored as ``nproc`` of the stage.
    config : dict
        The configuration dictionary. ``general.submit_to_batch_system``
        (default ``True``) and ``computer.partitions.compute.name`` are read.

    Returns
    -------
    dict
        A dictionary with the single key ``"compute"`` holding the stage
        configuration.
    """
    general = config["general"]
    compute_partition = config["computer"]["partitions"]["compute"]
    stage = {
        "nproc": tasks,
        "run_before": "tidy",
        "submit_to_batch_system": general.get("submit_to_batch_system", True),
        "run_on_queue": compute_partition["name"],
    }
    return {"compute": stage}
def merge_jobs(w_config, gw_config, model):
    """
    Merge jobs from a model-specific workflow configuration into the general one.

    Parameters
    ----------
    w_config : dict
        The model-specific workflow configuration.
    gw_config : dict
        The general workflow configuration; its ``jobs`` mapping is modified
        in place.
    model : str
        The name of the model.

    Notes
    -----
    Every job ``<job>`` of the model is copied to ``<job>_<model>`` in the
    general configuration, and an entry named plain ``<job>`` is removed from
    it. Afterwards ``update_run_references`` and ``assign_job_cluster`` are
    called for the job.
    """
    if "jobs" not in w_config:
        return

    # Diagnostic output only, hence debug level rather than critical.
    logger.debug(f"Merging workflow jobs of model {model}")
    # Iterate over a snapshot of the names so that the loop is unaffected by
    # the changes made to the job dictionaries below.
    for job in list(w_config["jobs"]):
        logger.debug(f"Merging job {job} as {job}_{model}")
        gw_config["jobs"][f"{job}_{model}"] = copy.deepcopy(w_config["jobs"][job])
        if job in gw_config["jobs"]:
            del gw_config["jobs"][job]
        update_run_references(gw_config, job, model)
        assign_job_cluster(gw_config, job, model)
def assign_job_cluster(gw_config, job, model):
    """
    Give the model-specific job a cluster if it does not name one itself.

    Parameters
    ----------
    gw_config : dict
        The general workflow configuration; the entry ``<job>_<model>`` of
        its ``jobs`` mapping is modified in place.
    job : str
        The original job name, which is also used as the fallback cluster
        name.
    model : str
        The name of the model.
    """
    model_job = gw_config["jobs"][f"{job}_{model}"]
    model_job.setdefault("job_cluster", job)
def merge_clusters(source_wf, target_wf):
    """
    Bring the clusters of a model workflow into the general workflow.

    Parameters
    ----------
    source_wf : dict
        The model workflow; a missing ``clusters`` entry means nothing to do.
    target_wf : dict
        The general workflow; its ``clusters`` mapping is modified in place.

    Notes
    -----
    A cluster unknown to the target is stored as a deep copy. A cluster known
    to both is replaced by the result of ``safe_merge``.
    """
    incoming = source_wf.get("clusters", {})
    for name in incoming:
        definition = incoming[name]
        known = target_wf["clusters"]
        if name not in known:
            known[name] = copy.deepcopy(definition)
            continue
        known[name] = safe_merge(definition, known[name])
def update_references_in_config(config, rename_map):
    """
    Update references in a single configuration block.

    Parameters
    ----------
    config : dict
        A job or cluster configuration; modified in place.
    rename_map : dict
        Mapping from original job names to their renamed counterparts.

    Notes
    -----
    The entries ``run_after``, ``run_before`` and ``called_from`` are looked
    at. A reference may be a single name or a list of names; names that are
    not in ``rename_map`` are kept as they are.
    """
    for ref_type in ("run_after", "run_before", "called_from"):
        if ref_type not in config:
            continue
        reference = config[ref_type]
        if isinstance(reference, list):
            # A list cannot be used as a dictionary key, so rename its
            # members one by one.
            config[ref_type] = [rename_map.get(name, name) for name in reference]
        else:
            config[ref_type] = rename_map.get(reference, reference)
def merge_if_possible(source, target):
    """
    Merge source dictionary into target dictionary, ensuring no conflicts.

    Parameters
    ----------
    source : dict
        The source dictionary to merge from.
    target : dict
        The target dictionary to merge into; modified in place.

    Returns
    -------
    dict
        The updated target dictionary with merged entries from the source.

    Raises
    ------
    WorkflowMismatchError
        If a key is present in both dictionaries with different values. The
        target is left untouched in that case.
    """
    # Check everything before changing anything, so that a conflict cannot
    # leave the target half merged.
    conflicts = [
        key for key, value in source.items() if key in target and target[key] != value
    ]
    if conflicts:
        raise WorkflowMismatchError(
            f"Mismatch while trying to merge clusters {source} into {target}"
        )
    target.update(source)
    return target
"ignore", - ] + """Runs the post-processing job""" helpers.evaluate(config, "postprocess", "post_recipe") return config -def _assemble_postprocess_tasks(config): - """ - Generates all tasks for post processing which will be written to the run file. +def convert_to_zarr(config): + """Converts the output to zarr format using xarray""" + if not config["general"].get("convert_to_zarr", False): + logger.info("Skipping conversion to zarr") + return config + logger.info("Converting output to zarr via xarray") - Parameters - ---------- - post_file - File handle to which information should be written. - - Returns - ------- - post_task_list : list - The list of post commands which will be executed. These are written - to the run file. - """ - postfile = config["general"]["post_file"] - - post_task_list = [] - for component in config["general"]["valid_model_names"]: - post_file.write(40 * "+ " + "\n") - post_file.write("Generating post-processing tasks for: %s \n" % component) - - post_task_list.append("\n#Postprocessing %s\n" % component) - post_task_list.append( - "cd " + config[component]["experiment_outdata_dir"] + "\n" - ) - - pconfig_tasks = config[component].get("postprocess_tasks", {}) - post_file.write("Configuration for post processing: %s \n" % pconfig_tasks) - for outfile in pconfig_tasks: - post_file.write("Generating task to create: %s \n" % outfile) - ofile_config = pconfig_tasks[outfile] - # TODO(PG): This can be cleaned up. I probably actually want a - # ChainMap here for more than just the bottom... 
- # - # Run CDO tasks (default) - task_definition = ( - config[component] - .get("postprocess_task_definitions", {}) - .get(ofile_config["post_process"]) - ) - method_definition = ( - config[component] - .get("postprocess_method_definitions", {}) - .get(task_definition["method"]) - ) - - program = method_definition.get("program", task_definition["method"]) - - possible_args = method_definition.get("possible_args", []) - required_args = method_definition.get("required_args", []) - - possible_flags = method_definition.get("possible_flags", []) - required_flags = method_definition.get("required_flags", []) - - outfile_flags = ofile_config.get("flags") - outfile_args = ofile_config.get("args") - - task_def_flags = task_definition.get("flags") - task_def_args = task_definition.get("args") - - args = collections.ChainMap(outfile_args, task_def_args) - flags = outfile_flags + task_def_flags - flags = ["-" + flag for flag in flags] - - # See here: https://stackoverflow.com/questions/21773866/how-to-sort-a-dictionary-based-on-a-list-in-python - all_call_things = { - "program": program, - "outfile": outfile, - **args, - "flags": flags, - } - logger.info(all_call_things) - index_map = {v: i for i, v in enumerate(method_definition["call_order"])} - call_list = sorted( - all_call_things.items(), key=lambda pair: index_map[pair[0]] - ) - call = [] - for call_id, call_part in call_list: - if isinstance(call_part, str): - call.append(call_part) - elif isinstance(call_part, list): - call.append(" ".join(call_part)) - else: - raise TypeError( - "Something straaaange happened. Consider starting the debugger." - ) - post_file.write(" ".join(call) + "\n") - post_task_list.append(" ".join(call)) - post_task_list.append("cd -\n") - config["general"]["post_task_list"] = post_task_list return config - - -# ????? 
-# def write_simple_postscript(config): -# batch_system.write_simple_runscript(config) -# return config diff --git a/src/esm_runscripts/prepare.py b/src/esm_runscripts/prepare.py index 9615091f1..04707a762 100644 --- a/src/esm_runscripts/prepare.py +++ b/src/esm_runscripts/prepare.py @@ -5,12 +5,12 @@ import questionary import yaml +from loguru import logger import esm_parser import esm_utilities from esm_calendar import Calendar, Date from esm_plugin_manager import install_missing_plugins -from loguru import logger from . import batch_system, helpers @@ -69,10 +69,56 @@ def _read_date_file(config): date = config["general"].get("initial_date", "18500101") run_number = 1 write_file = True + + date_c = config["general"].get("current_date", None) + + if date_c is not None: + date_fdf = Date(date) + date_c = Date(str(config["general"]["current_date"])) + run_number_c = int(config["general"]["run_number"]) + last_jobtype = config["general"].get("last_jobtype", "") + isresubmitted = last_jobtype == config["general"]["jobtype"] + + if date_fdf != date_c: + msg = ( + f"``Date`` and ``run_number`` are ``not`` taken from date file, " + f"but from command_line argument (provided by -s or --start_date). " + f"The given start_date ({date_c}) and run_number ({run_number_c}) " + f"are different from the values " + f"in the current date file of your experiment ({date}, {run_number}). " + f"Your experiment may now be in a non consecutive state. 
" + f"Please confirm if you want to continue:" + ) + esm_parser.user_note("Detached experiment:", msg) + proceed = "" + if isresubmitted: + proceed = questionary.select( + "Do you want to continue?", + choices=[ + f"Yes, with date from command line argument: {str(config['general']['current_date'])}", + f"Yes, with date from date file: {date}", + "No, cancel.", + ], + ).ask() + + if "Yes, with date from command line argument" in proceed: + date = str(date_c) + run_number = run_number_c + elif "Yes, with date from date file" in proceed: + date = date + run_number = run_number + else: + esm_parser.user_note( + "The experiment will be cancelled:", + f"You cancelled the experiment due to date discrepancies.", + ) + sys.exit(1) + config["general"]["run_number"] = run_number config["general"]["current_date"] = date logging.info("current_date = %s", date) logging.info("run_number = %s", run_number) + return config @@ -93,7 +139,6 @@ def check_model_lresume(config): model, user_lresume, config, [], [] ) if isinstance(user_lresume, str): - if user_lresume == "0" or user_lresume.upper() == "FALSE": user_lresume = False elif user_lresume == "1" or user_lresume.upper() == "TRUE": @@ -275,7 +320,7 @@ def _initialize_calendar(config): if config["general"]["reset_calendar_to_last"]: config = find_last_prepared_run(config) config = set_most_dates(config) - if not "iterative_coupling" in config["general"]: + if "iterative_coupling" not in config["general"]: config["general"]["chunk_number"] = 1 if config["general"]["run_number"] == 1: @@ -347,7 +392,7 @@ def set_leapyear(config): config["general"]["leapyear"] = config[model]["leapyear"] break - if not "leapyear" in config["general"]: + if "leapyear" not in config["general"]: for model in config["general"]["valid_model_names"]: config[model]["leapyear"] = True config["general"]["leapyear"] = True @@ -355,7 +400,6 @@ def set_leapyear(config): def set_overall_calendar(config): - # set the overall calendar if 
config["general"]["leapyear"]: config["general"]["calendar"] = Calendar(1) @@ -365,7 +409,6 @@ def set_overall_calendar(config): def find_last_prepared_run(config): - calendar = config["general"]["calendar"] current_date = Date(config["general"]["current_date"], calendar) initial_date = Date(config["general"]["initial_date"], calendar) @@ -409,7 +452,6 @@ def find_last_prepared_run(config): def set_most_dates(config): - calendar = config["general"]["calendar"] if isinstance(config["general"]["current_date"], Date): current_date = config["general"]["current_date"] @@ -471,23 +513,25 @@ def _add_all_folders(config): all_filetypes = [ "analysis", "config", + "couple", + "ignore", "log", "mon", - "couple", + "post", "scripts", - "ignore", - "unknown", "src", + "unknown", ] config["general"]["out_filetypes"] = [ "analysis", + "ignore", "log", "mon", - "scripts", - "ignore", - "unknown", "outdata", + "post", "restart_out", + "scripts", + "unknown", ] config["general"]["in_filetypes"] = [ "scripts", @@ -500,7 +544,7 @@ def _add_all_folders(config): config["general"]["reusable_filetypes"] = config["general"].get( "reusable_filetypes", ["bin", "src"] ) - # Define the files that could be reusable accross runs (external files) + # Define the files that could be reusable across runs (external files) config["general"]["potentially_reusable_filetypes"] = ( all_filetypes + config["general"]["in_filetypes"] ) @@ -533,16 +577,17 @@ def _add_all_folders(config): "analysis", "bin", "config", + "couple", "forcing", + "ignore", "input", - "couple", "log", "mon", "outdata", + "post", "restart_in", "restart_out", "viz", - "ignore", ] config["general"]["all_model_filetypes"] = all_model_filetypes @@ -632,39 +677,39 @@ def set_parent_info(config): # Make sure "ini_parent_dir" and "ini_restart_dir" both work: for model in config["general"]["valid_model_names"]: # If only ini_restart_* variables are used in runcscript, set ini_parent_* to the same values - if not "ini_parent_dir" in 
config[model]: + if "ini_parent_dir" not in config[model]: if "ini_restart_dir" in config[model]: config[model]["ini_parent_dir"] = config[model]["ini_restart_dir"] - if not "ini_parent_exp_id" in config[model]: + if "ini_parent_exp_id" not in config[model]: if "ini_restart_exp_id" in config[model]: config[model]["ini_parent_exp_id"] = config[model]["ini_restart_exp_id"] - if not "ini_parent_date" in config[model]: + if "ini_parent_date" not in config[model]: if "ini_restart_date" in config[model]: config[model]["ini_parent_date"] = config[model]["ini_restart_date"] # check if parent is defined in esm_tools style # (only given for setup) setup = config["general"]["setup_name"] - if not setup in config: + if setup not in config: setup = "general" if "ini_parent_exp_id" in config[setup]: for model in config["general"]["valid_model_names"]: - if not "ini_parent_exp_id" in config[model]: + if "ini_parent_exp_id" not in config[model]: config[model]["ini_parent_exp_id"] = config[setup]["ini_parent_exp_id"] if "ini_parent_date" in config[setup]: for model in config["general"]["valid_model_names"]: - if not "ini_parent_date" in config[model]: + if "ini_parent_date" not in config[model]: config[model]["ini_parent_date"] = config[setup]["ini_parent_date"] if "ini_parent_dir" in config[setup]: for model in config["general"]["valid_model_names"]: - if not "ini_parent_dir" in config[model]: + if "ini_parent_dir" not in config[model]: config[model]["ini_parent_dir"] = ( config[setup]["ini_parent_dir"] + "/" + model ) # Get correct parent info for model in config["general"]["valid_model_names"]: - if config[model]["lresume"] == True and config["general"]["run_number"] == 1: + if config[model]["lresume"] is True and config["general"]["run_number"] == 1: config[model]["parent_expid"] = config[model]["ini_parent_exp_id"] if "parent_date" not in config[model]: config[model]["parent_date"] = config[model]["ini_parent_date"] @@ -793,7 +838,6 @@ def finalize_config(config): def 
add_submission_info(config): - bs = batch_system.batch_system(config, config["computer"]["batch_system"]) submitted = bs.check_if_submitted() @@ -808,7 +852,6 @@ def add_submission_info(config): def initialize_batch_system(config): - config["general"]["batch"] = batch_system.batch_system( config, config["computer"]["batch_system"] ) @@ -817,7 +860,7 @@ def initialize_batch_system(config): def initialize_coupler(config): - if config["general"]["standalone"] == False: + if config["general"]["standalone"] is False: from . import coupler base_dir = config["general"]["base_dir"] diff --git a/src/esm_runscripts/prepexp.py b/src/esm_runscripts/prepexp.py index b0fbe44f5..0d50c12b5 100644 --- a/src/esm_runscripts/prepexp.py +++ b/src/esm_runscripts/prepexp.py @@ -6,10 +6,10 @@ import questionary from colorama import Fore +from loguru import logger import esm_parser import esm_tools -from loguru import logger from . import filelists from .helpers import end_it_all, evaluate, write_to_log @@ -181,7 +181,7 @@ def _call_esm_runscripts_internally(config, command, exedir): non_interaction_flags = [ "--no-motd", f"--last-jobtype {config['general']['jobtype']}", - f"-t {config['general']['jobtype']}", + f"--phase {config['general']['jobtype']}", ] for ni_flag in non_interaction_flags: # prevent continuous addition of ``ni_flag`` @@ -355,9 +355,15 @@ def initialize_experiment_logfile(config): it_coupled_model = config["general"]["iterative_coupled_model"] datestamp = config["general"]["run_datestamp"] - if config["general"]["run_number"] == 1: - if os.path.isfile(config["general"]["experiment_log_file"]): - os.remove(config["general"]["experiment_log_file"]) + fromdir = os.path.realpath(config["general"]["started_from"]) + scriptsdir = os.path.realpath(config["general"]["experiment_scripts_dir"]) + +# if (fromdir == scriptsdir): + # TODO: Check the next if statements + if not config["general"]["run_from_batch_script"]: + #if config["general"]["run_number"] == 1: + #if 
os.path.isfile(config["general"]["experiment_log_file"]): + # os.remove(config["general"]["experiment_log_file"]) log_msg = f"# Beginning of Experiment {expid}" write_to_log(config, [log_msg], message_sep="") @@ -369,7 +375,7 @@ def initialize_experiment_logfile(config): str(config["general"]["run_number"]), str(config["general"]["current_date"]), str(config["general"]["jobid"]), - "- start", + "- start in prepexp", ], ) diff --git a/src/esm_runscripts/prev_run.py b/src/esm_runscripts/prev_run.py index 7583872e9..9033083fa 100644 --- a/src/esm_runscripts/prev_run.py +++ b/src/esm_runscripts/prev_run.py @@ -3,16 +3,16 @@ import questionary import yaml +from loguru import logger import esm_parser from esm_calendar import Calendar, Date -from loguru import logger class PrevRunInfo(dict): """ A dictionary subclass to access information from the previous run. The object is - created in the ``SimulationSetup`` class in ``self.config["prev_run"]``. The idea + created in the ``Simulation`` class in ``self.config["prev_run"]``. The idea behind this class is that variables from the previous run can be called from the yaml files with the same syntax as one would do for the current run. @@ -64,7 +64,6 @@ def __init__(self, config={}, prev_config=None): # Counter for debuggin self._prev_config_count = 0 - def components_with_prev_run(self): """ Lists components containning variables using the ``prev_run`` feature. 
Reading diff --git a/src/esm_runscripts/resubmit.py b/src/esm_runscripts/resubmit.py index 1b6130afd..7adbf2f29 100644 --- a/src/esm_runscripts/resubmit.py +++ b/src/esm_runscripts/resubmit.py @@ -1,4 +1,5 @@ import os +import pdb from loguru import logger @@ -6,6 +7,17 @@ def submit(config): + """ + Submits a jobscript to the batch system by calling os.system + + Arguments + --------- + config : dict + + Returns + ------- + config : dict + """ logger.debug("\n", 40 * "+ ") logger.info("Submitting jobscript to batch system...") logger.info(f"Output written by {config['computer']['batch_system']}:") @@ -18,15 +30,43 @@ def submit(config): def resubmit_batch_or_shell(config, batch_or_shell, cluster=None): + """ + - Creates a submit_command and sets it to config depending on kind of submission (batch or shell) + - Calls function submit to actually submit the shell or batch command + + Arguments + --------- + config : dict + batch_or_shell : Bool + cluster : (optional) + + Returns + ------- + config : dict + """ + config = config["general"]["batch"].write_simple_runscript( config, cluster, batch_or_shell ) + # Checks, if not submitted with option -c in esm_runscript call (check run) if not check_if_check(config): config = submit(config) return config -def resubmit_SimulationSetup(config, cluster=None): +def resubmit_Simulation(config, cluster=None): + """ + Resubmitting a workflow phase/cluster that is of type SimulationSetup + - Initialize the cluster as a new SimulationSetup object + + Arguments + --------- + config : dict + cluster : str (optional: name of cluster) + Returns + ------- + config : dict + """ monitor_file = logfiles.logfile_handle # Jobs that should be started directly from the compute job: @@ -41,9 +81,9 @@ def resubmit_SimulationSetup(config, cluster=None): # NOTE(PG) Non top level import to avoid circular dependency: os.chdir(config["general"]["started_from"]) - from .sim_objects import SimulationSetup + from .sim_objects import Simulation - 
cluster_obj = SimulationSetup(command_line_config) + cluster_obj = Simulation(command_line_config) monitor_file.write(f"{cluster} object built....\n") @@ -56,6 +96,7 @@ def resubmit_SimulationSetup(config, cluster=None): cluster_obj.config[f"{cluster}_update_{jobtype}_config_before_resubmit"] ) + # Checks, if not submitted with option -c in esm_runscript call (check run) if not check_if_check(config): monitor_file.write(f"Calling {cluster} job:\n") config["general"]["experiment_over"] = cluster_obj(kill_after_submit=False) @@ -64,16 +105,31 @@ def resubmit_SimulationSetup(config, cluster=None): def get_submission_type(cluster, config): - # Figure out if next job is resubmitted to batch system, - # just executed in shell or invoked as new SimulationSetup - # object + """ + Figure out if next job is + - resubmitted to batch system, + - just executed in shell or + - invoked as new SimulationSetup object + + Arguments + --------- + cluster : str (name of cluster) + config : dict + + Returns + ------- + submission_type : str + """ clusterconf = config["general"]["workflow"]["subjob_clusters"][cluster] if clusterconf.get("submit_to_batch_system", False): submission_type = "batch" + # This information should come from the config of the cluster/workflow phase + # This information is given in batch_or_shell attribute of workflow phase/cluster + # TODO: Make this a function of workflow manager??? elif cluster in ["newrun", "prepcompute", "tidy", "inspect", "viz"]: - submission_type = "SimulationSetup" + submission_type = "Simulation" else: submission_type = "shell" @@ -81,6 +137,17 @@ def get_submission_type(cluster, config): def end_of_experiment(config): + """ + Checks if it is the end of the experiment. 
+ + Arguments + --------- + config + + Returns + ------- + True or False + """ if config["general"]["next_date"] >= config["general"]["final_date"]: monitor_file = logfiles.logfile_handle monitor_file.write("Reached the end of the simulation, quitting...\n") @@ -91,6 +158,17 @@ def end_of_experiment(config): def end_of_experiment_all_models(config): + """ + Checks if end of experiment is reached and everything is done + + Arguments + --------- + config : dict + + Returns + ------- + True or False + """ index = 1 expid = config["general"]["expid"] while "model" + str(index) in config["general"]["original_config"]: @@ -126,6 +204,17 @@ def end_of_experiment_all_models(config): def check_if_check(config): + """ + Will check if esm_runscripts has been called with option -c (check run only) + + Arguments + --------- + config : dict + + Returns + ------- + True or False + """ if config["general"]["check"]: logger.info( "Actually not submitting anything, this job preparation was launched in 'check' mode (-c)." 
@@ -136,49 +225,126 @@ def check_if_check(config): def maybe_resubmit(config): - jobtype = config["general"]["jobtype"] - - nextrun = resubmit_recursively(config, jobtype=jobtype) - - if nextrun: # submit list contains stuff from next run - config = _increment_date_and_run_number(config) - config = _write_date_file(config) - - if end_of_experiment(config): - if config["general"].get("iterative_coupling", False): - if end_of_experiment_all_models(config): - return config - else: - # config = chunky_parts._update_chunk_date_file(config) - return config - - cluster = config["general"]["workflow"]["first_task_in_queue"] - nextrun = resubmit_recursively( - config, list_of_clusters=[cluster], nextrun_in=True + """ + If nextrun is started, + - calls function to increment date and run_number + - calls function to write new date file + If it recognizes that it was actually the last run + - returns if end of the experiment (if not iterative_coupling) + + Arguments + --------- + config : dict + + Returns + ------- + config : dict + """ + task = config["general"].get("task", None) + # if task is start, restart, run_workflow -> write new *.run file + # if end_of_experiment(config): + # print('test') + if task in ["start", "run_workflow", "restart"]: + jobtype = config["general"]["jobtype"] # current phase + workflow = config["general"]["workflow"]["object"] + default_cluster = workflow.default_cluster + phases = workflow.clusters[default_cluster]["phases"] + config = config["general"]["batch"].write_run_batch_script( + config, default_cluster, "batch" ) + print("Create *.run file") + + if not check_if_check(config): + config = submit(config) + + elif task in ["run_phase"]: + # check if phase type compute + print("execute a phase") + + # TODO: Check if run from *.run file + # TODO: Create *.run file + + # check if nextrun starts??? 
+ # this resubmits any following jobtypes/phases until nextrun is true + # here nextrun is always set to true (if resubmit_recursively is finished) + + # cases: 1. it is the beginning of (next) run: + # - resubmit_recursively returns true but does not do anything except for returning true + # - check if end of simulation -> return + # - returns if iterative coupling, why ??? + # - if not end of simulation and not iterative_coupling -> calls itself again with nextrun_in=True which leads to case 2. + # 2. it is NOT the beginning of (next) run: + # it will start to loop over all remaining clusters to check if it can submit something (SimulationSetup, sbatch, shell) and do so, + # until first start of next run is reached. + # 3. nextrun is false if no entries in next_submit for that particular jobtype/cluster + + # nextrun = resubmit_recursively(config, jobtype=jobtype) + + # if nextrun: # submit list contains stuff from next run + + # config = _increment_date_and_run_number(config) + # config = _write_date_file(config) + + # if end_of_experiment(config): + # if config["general"].get("iterative_coupling", False): + # # If not iterative coupling + # # check if end of experiment for all models + # # if not??? + # if end_of_experiment_all_models(config): + # return config + # else: + # # config = chunky_parts._update_chunk_date_file(config) + # return config + + # #cluster = config["general"]["workflow"]["first_task_in_queue"] + # ## For what is nextrun here needed? + # #nextrun = resubmit_recursively( + # # config, list_of_clusters=[cluster], nextrun_in=True + # #) return config def resubmit_recursively(config, jobtype=None, list_of_clusters=None, nextrun_in=False): + """ + - Reads in a list of all clusters (next_submit) in a workflow of a given jobtype (if not passed as argument) + - Checks if cluster is going to be skipped + - Gets the submission_type of cluster and calls the corresponding resubmit function + - If cluster is skipped, calls this function again ??? 
+ - What is nextrun_in for? What if true? If within a run??? + - When could cluster be first_task_in_queue and nextrun_in=true? + + Arguments + --------- + config : dict + jobtype : (optional) + list_of_clusters: (optional) + nextrun_in: (optional) + + Returns + ------- + nextrun : Boolean + """ nextrun = False - + # get a list of clusters that follow the current jobtype if not list_of_clusters: list_of_clusters = config["general"]["workflow"]["subjob_clusters"][ jobtype ].get("next_submit", []) for cluster in list_of_clusters: + # if beginning of next run if ( cluster == config["general"]["workflow"]["first_task_in_queue"] and not nextrun_in ): nextrun = True + # if not at the beginning of a run else: - if not workflow.skip_cluster(cluster, config): + if not workflow.should_skip_cluster(cluster, config): submission_type = get_submission_type(cluster, config) - if submission_type == "SimulationSetup": - resubmit_SimulationSetup(config, cluster) + if submission_type == "Simulation": + resubmit_Simulation(config, cluster) elif submission_type in ["batch", "shell"]: resubmit_batch_or_shell(config, submission_type, cluster) else: @@ -191,6 +357,20 @@ def resubmit_recursively(config, jobtype=None, list_of_clusters=None, nextrun_in def _increment_date_and_run_number(config): + """ + - Incrementing + - date by adding "delta_date" to "current_date" + - run_number by adding +1 + - Updating config + + Arguments + --------- + config : dict + + Returns + ------- + config : dict + """ config["general"]["run_number"] += 1 config["general"]["current_date"] += config["general"]["delta_date"] @@ -208,6 +388,17 @@ def _increment_date_and_run_number(config): def _write_date_file(config): # self, date_file=None): + """ + Writes new date file for experiment. 
+ + Arguments + --------- + config : dict + + Returns + ------- + config : dict + """ # monitor_file = config["general"]["logfile"] monitor_file = logfiles.logfile_handle diff --git a/src/esm_runscripts/sim_objects.py b/src/esm_runscripts/sim_objects.py index 0b257bced..51910ddc4 100644 --- a/src/esm_runscripts/sim_objects.py +++ b/src/esm_runscripts/sim_objects.py @@ -2,21 +2,21 @@ Documentation goes here """ +import pdb import sys from loguru import logger -import esm_parser from esm_tools import __version__ from . import (config_initialization, helpers, logfiles, prepare, prepexp, - prev_run, resubmit, workflow) + prev_run, resubmit) -class SimulationSetup(object): +class Simulation: def __init__(self, command_line_config=None, user_config=None): """ - Initializes the ``SimulationSetup`` object, and prepares the ``self.config`` by + Initializes the ``Simulation`` object, and prepares the ``self.config`` by taking the information from the ``command_line_config`` and/or the ``user_config`` and expanding it with the configuration files from `ESM-Tools` (in `esm_tools/configs`), and then running the ``prepare`` recipe. In essence, @@ -56,7 +56,7 @@ def __init__(self, command_line_config=None, user_config=None): # 1. Check that at least one input is given if not command_line_config and not user_config: raise ValueError( - "SimulationSetup needs to be initialized with either " + "Simulation needs to be initialized with either " "command_line_config or user_config." ) @@ -121,43 +121,43 @@ def __call__(self, kill_after_submit=True): # self.pseudocall(kill_after_submit) # call to observe here.. 
org_jobtype = str(self.config["general"]["jobtype"]) - self.config = logfiles.initialize_logfiles(self.config, org_jobtype) - - if self.config["general"]["submitted"]: - old_stdout = sys.stdout - old_stderr = sys.stderr - sys.stdout = logfiles.logfile_handle - sys.stderr = logfiles.logfile_handle - - if self.config["general"]["jobtype"] == "prepcompute": - self.prepcompute() - elif self.config["general"]["jobtype"] == "tidy": - self.tidy() - elif self.config["general"]["jobtype"] == "viz": - self.viz() - elif self.config["general"]["jobtype"].startswith("observe"): - pid = self.config["general"]["command_line_config"].get( - "launcher_pid", -666 - ) - if not pid == -666: - self.observe() - - self.config["general"]["jobtype"] = self.config["general"][ - "jobtype" - ].replace("observe_", "") - # that last line is necessary so that maybe_resubmit knows which - # cluster to look up in the workflow - - else: - self.assembler() + # write *.run file + # submit batch script resubmit.maybe_resubmit(self.config) - self.config = logfiles.finalize_logfiles(self.config, org_jobtype) - - if self.config["general"]["submitted"]: - sys.stdout = old_stdout - sys.stderr = old_stderr + if self.config["general"]["task"] == "run_phase": + # Writes to general log file and creates a (global) logfile handle to logfile for current jobtype + self.config = logfiles.initialize_logfiles(self.config, org_jobtype) + + # if not check run??? + # set stdout and stderr to logfile + if self.config["general"]["submitted"]: + old_stdout = sys.stdout + old_stderr = sys.stderr + sys.stdout = logfiles.logfile_handle + sys.stderr = logfiles.logfile_handle + + if self.config["general"]["task"].startswith("observe"): + pid = self.config["general"]["command_line_config"].get( + "launcher_pid", -666 + ) + if not pid == -666: + self.observe() + else: + try: + getattr(self, self.config["general"]["jobtype"])() + except AttributeError: + print( + f"No method for jobtype {self.config['general']['jobtype']} found." 
+ ) + + # if this line is reached, the run is submitted and running or finished + self.config = logfiles.finalize_logfiles(self.config, org_jobtype) + + if self.config["general"]["submitted"]: + sys.stdout = old_stdout + sys.stderr = old_stderr if kill_after_submit: if self.config["general"].get("experiment_over", False): @@ -166,6 +166,9 @@ def __call__(self, kill_after_submit=True): return self.config["general"].get("experiment_over", False) + # NOTE(PG): Alias for __call__ as run, since I find sim.run() more readable than sim() + run = __call__ + ######################### OBSERVE ############################################################# def observe(self): @@ -210,37 +213,27 @@ def inspect(self): ################################### PREPCOMPUTE ############################################################# def prepcompute(self): - """ - All steps needed for a model computation. - - Parameters - ---------- - kill_after_submit : bool - Default ``True``. If set, the entire Python instance is killed with - a ``sys.exit()`` as the very last after job submission. - """ + """Prepares for a compute job""" from . import prepcompute self.config = prepcompute.run_job(self.config) ################################### VIZ ############################################################# - def viz(self): - """ - Starts the Viz job. - - Parameters - ---------- - kill_after_submit: bool - Default ``True``. If set, the entire Python instance is killed with ``sys.exit()``. - """ + """Starts the Viz job.""" # NOTE(PG): Local import, not everyone will have viz yet... import esm_viz as viz self.config = viz.run_job(self.config) - ######################### HELPERS ############################################################# + ################################### POST ############################################################# + def post(self): + """Starts the Post job.""" + from . 
import postprocess + + self.config = postprocess.run_job(self.config) + ######################### HELPERS ############################################################# def store_prev_objects(self): self.config.prev_objects = ["prev_run"] self.config.prev_objects.extend( diff --git a/src/esm_runscripts/workflow-scratch.py b/src/esm_runscripts/workflow-scratch.py new file mode 100644 index 000000000..2d315e070 --- /dev/null +++ b/src/esm_runscripts/workflow-scratch.py @@ -0,0 +1,26 @@ +import copy +import os +import sys + +from loguru import logger + +import esm_parser + + +def assemble_workflow(config): + # + config = init_total_workflow(config) + config = collect_all_workflow_information(config) + config = complete_clusters(config) + config = order_clusters(config) + config = prepend_newrun_job(config) + + if config["general"]["jobtype"] == "unknown": + config["general"]["command_line_config"]["jobtype"] = config["general"][ + "workflow" + ]["first_task_in_queue"] + config["general"]["jobtype"] = config["general"]["workflow"][ + "first_task_in_queue" + ] + + return config diff --git a/src/esm_runscripts/workflow.py b/src/esm_runscripts/workflow.py index 336043a1e..8840b8f80 100644 --- a/src/esm_runscripts/workflow.py +++ b/src/esm_runscripts/workflow.py @@ -1,443 +1,222 @@ -import copy -import os -import sys - -import esm_parser +""" +The ESM-Tools Workflow Manager + +Terminology +----------- +job: An encapsulated esm-tools workflow such as tidy, prepexp, prepcompute, + designated by a Prefect Flow (using the @flow decorator) +cluster: A collection of jobs that are grouped together (flow of flows) --> all + written down in 1 HPC Job Scheduler script, better name soon... 
+task: A single unit of work within a job +""" + +from enum import Enum + +import matplotlib.pyplot as plt +import networkx as nx +import randomname from loguru import logger +from prefect import flow +from rich.columns import Columns +from rich.console import Console, ConsoleOptions, RenderResult +from rich.panel import Panel +from rich.text import Text -def skip_cluster(cluster, config): - gw_config = config["general"]["workflow"] - clusterconf = gw_config["subjob_clusters"][cluster] - - """ - print(f"run_only {clusterconf.get('run_only', 'Error') }") - print(f"skip_chunk_number {clusterconf.get('skip_chunk_number', -999)}") - print(f"skip_run_number {clusterconf.get('skip_run_number', -999)}") - print(f"chunk_number {config['general'].get('chunk_number', -998)}") - print(f"run_number {config['general'].get('run_number', -998)}") - print(f"last_run_in_chunk {config['general']['last_run_in_chunk']}") - print(f"first_run_in_chunk {config['general']['first_run_in_chunk']}") - """ - - if clusterconf.get("run_only", "Error") == "last_run_in_chunk" and not config[ - "general" - ].get("last_run_in_chunk", False): - return True - if clusterconf.get("run_only", "Error") == "first_run_in_chunk" and not config[ - "general" - ].get("first_run_in_chunk", False): - return True - if clusterconf.get("skip_chunk_number", -999) == config["general"].get( - "chunk_number", -998 - ): - return True - if clusterconf.get("skip_run_number", -999) == config["general"].get( - "run_number", -998 - ): - return True - - return False - - -def assemble_workflow(config): - # - config = init_total_workflow(config) - config = collect_all_workflow_information(config) - config = complete_clusters(config) - config = order_clusters(config) - config = prepend_newrun_job(config) - - if config["general"]["jobtype"] == "unknown": - config["general"]["command_line_config"]["jobtype"] = config["general"][ - "workflow" - ]["first_task_in_queue"] - config["general"]["jobtype"] = config["general"]["workflow"][ 
- "first_task_in_queue" - ] - - return config +class SubmissionType(Enum): + """Enum for the different submission types""" + BATCH = "batch" + SHELL = "shell" + SIM_OBJECT = "sim_object" -def display_nicely(config): - esm_parser.pprint_config(config["general"]["workflow"]) - return config - - -def prepend_newrun_job(config): - gw_config = config["general"]["workflow"] - first_cluster_name = gw_config["first_task_in_queue"] - first_cluster = gw_config["subjob_clusters"][first_cluster_name] - - if not first_cluster.get("batch_or_shell", "Error") == "SimulationSetup": - - last_cluster_name = gw_config["last_task_in_queue"] - last_cluster = gw_config["subjob_clusters"][last_cluster_name] - - new_first_cluster_name = "newrun" - new_first_cluster = { - "newrun": { - "called_from": last_cluster_name, - "run_before": first_cluster_name, - "next_submit": [first_cluster_name], - "subjobs": ["newrun_general"], - "batch_or_shell": "SimulationSetup", - } + def __rich__(self) -> str: + colors = { + SubmissionType.BATCH: "blue", + SubmissionType.SHELL: "green", + SubmissionType.SIM_OBJECT: "magenta", } - - last_cluster["next_submit"].append("newrun") - last_cluster["next_submit"].remove(first_cluster_name) - - first_cluster["called_from"] = "newrun" - - gw_config["first_task_in_queue"] = "newrun" - - new_subjob = { - "newrun_general": { - "nproc": 1, - "called_from": last_cluster_name, - "run_before": first_cluster_name, - "next_submit": [first_cluster_name], - "subjob_cluster": "newrun", - } + return Text(self.value, style=colors[self]) + + +class Workflow: + """A cyclable collection of clusters""" + + +class Cluster: + """A collection of jobs""" + + def __init__(self, jobs=None, name=None): + self.name = name or randomname.get_name() + self.jobs = jobs or [] + + self._call_order = [] + self.G = nx.DiGraph() + self.order_jobs() + + def order_jobs(self): + """Order jobs using topological sorting and detect execution levels.""" + self.G.clear() + job_lookup = {job.name: job for 
job in self.jobs} + + # Add nodes + for job in self.jobs: + self.G.add_node(job.name, submission_type=job.submission_type) + + # Add edges based on dependencies + for job in self.jobs: + if job.run_after: + for dep in job.run_after: + if dep not in job_lookup: + raise ValueError(f"Dependency '{dep}' not found in jobs!") + self.G.add_edge(job_lookup[dep].name, job.name) + + if job.run_before: + for dep in job.run_before: + if dep not in job_lookup: + raise ValueError(f"Dependency '{dep}' not found in jobs!") + self.G.add_edge(job.name, job_lookup[dep].name) + + # Detect cycles + if not nx.is_directed_acyclic_graph(self.G): + raise ValueError("Circular dependency detected in job ordering!") + + # Perform topological sort + sorted_job_names = list(nx.topological_sort(self.G)) + self._call_order = [job_lookup[name] for name in sorted_job_names] + + # Compute job levels (execution layers) + levels = {job_name: 0 for job_name in sorted_job_names} + for job in sorted_job_names: + for dep in self.G.predecessors(job): + levels[job] = max(levels[job], levels[dep] + 1) + + # Store levels as node attributes + nx.set_node_attributes(self.G, levels, "subset") + + # Create a dictionary of levels and their jobs + self._call_order_by_level = {} + for job_name, level in levels.items(): + if level not in self._call_order_by_level: + self._call_order_by_level[level] = [] + self._call_order_by_level[level].append(job_lookup[job_name]) + + logger.warning(f"Job execution order: {sorted_job_names}") + + def draw_graph(self): + """Draw the job dependency graph with execution levels and colored nodes.""" + plt.figure(figsize=(10, 6)) + + # Define colors for different SubmissionTypes + type_colors = { + SubmissionType.BATCH: "blue", + SubmissionType.SHELL: "green", + SubmissionType.SIM_OBJECT: "magenta", } + node_colors = [ + type_colors[self.G.nodes[node]["submission_type"]] for node in self.G.nodes + ] - gw_config["subjob_clusters"].update(new_first_cluster) - 
gw_config["subjobs"].update(new_subjob) - - return config - - # - - -def order_clusters(config): - gw_config = config["general"]["workflow"] - - for subjob_cluster in gw_config["subjob_clusters"]: - if "next_submit" not in gw_config["subjob_clusters"][subjob_cluster]: - gw_config["subjob_clusters"][subjob_cluster]["next_submit"] = [] - - for subjob_cluster in gw_config["subjob_clusters"]: - if "run_after" not in gw_config["subjob_clusters"][subjob_cluster]: - if not ("run_before" in gw_config["subjob_clusters"][subjob_cluster]): - - logger.error(f"Don't know when to execute cluster {subjob_cluster}.") - logger.error(gw_config) - sys.exit(-1) - - if "run_after" in gw_config["subjob_clusters"][subjob_cluster]: - if "run_before" in gw_config["subjob_clusters"][subjob_cluster]: - logger.error( - f"Specifying both run_after and run_before for cluster {subjob_cluster} may lead to problems." - ) - logger.error(f"Please choose.") - sys.exit(-1) - if ( - not gw_config["subjob_clusters"][subjob_cluster]["run_after"] - in gw_config["subjob_clusters"] - ): - logger.error( - f"Unknown cluster {gw_config['subjob_clusters'][subjob_cluster]['run_after']}." - ) - sys.exit(-1) - - calling_cluster = gw_config["subjob_clusters"][subjob_cluster]["run_after"] - - if ( - subjob_cluster - not in gw_config["subjob_clusters"][calling_cluster]["next_submit"] - ): - gw_config["subjob_clusters"][calling_cluster]["next_submit"].append( - subjob_cluster - ) - gw_config["subjob_clusters"][subjob_cluster][ - "called_from" - ] = calling_cluster - - if calling_cluster == gw_config["last_task_in_queue"]: - gw_config["last_task_in_queue"] = subjob_cluster - - if "run_before" in gw_config["subjob_clusters"][subjob_cluster]: - if ( - not gw_config["subjob_clusters"][subjob_cluster]["run_before"] - in gw_config["subjob_clusters"] - ): - logger.error( - f"Unknown cluster {gw_config['subjob_clusters'][subjob_cluster]['run_before']}." 
- ) - sys.exit(-1) - - called_cluster = gw_config["subjob_clusters"][subjob_cluster]["run_before"] - - if ( - called_cluster - not in gw_config["subjob_clusters"][subjob_cluster]["next_submit"] - ): - gw_config["subjob_clusters"][subjob_cluster]["next_submit"].append( - called_cluster - ) - gw_config["subjob_clusters"][called_cluster]["called_from"] = subjob_cluster - - if called_cluster == gw_config["first_task_in_queue"]: - gw_config["first_task_in_queue"] = subjob_cluster - - if "next_run_triggered_by" in gw_config: - gw_config["last_task_in_queue"] = gw_config["next_run_triggered_by"] - - first_cluster_name = gw_config["first_task_in_queue"] - first_cluster = gw_config["subjob_clusters"][first_cluster_name] - last_cluster_name = gw_config["last_task_in_queue"] - last_cluster = gw_config["subjob_clusters"][last_cluster_name] - - if first_cluster_name not in last_cluster.get("next_submit", ["Error"]): - last_cluster["next_submit"].append(first_cluster_name) - if last_cluster_name not in first_cluster.get("called_from", ["Error"]): - first_cluster["called_from"] = last_cluster_name - - return config - - -def complete_clusters(config): - gw_config = config["general"]["workflow"] - - # First, complete the matching subjobs <-> clusters - - for subjob in gw_config["subjobs"]: - subjob_cluster = gw_config["subjobs"][subjob]["subjob_cluster"] - if subjob_cluster not in gw_config["subjob_clusters"]: - gw_config["subjob_clusters"][subjob_cluster] = {} - - if "subjobs" not in gw_config["subjob_clusters"][subjob_cluster]: - gw_config["subjob_clusters"][subjob_cluster]["subjobs"] = [] - - gw_config["subjob_clusters"][subjob_cluster]["subjobs"].append(subjob) - - # Then, complete the resource information per cluster - # determine whether a cluster is to be submitted to a batch system - - for subjob_cluster in gw_config["subjob_clusters"]: - nproc_sum = nproc_max = 0 - clusterconf = gw_config["subjob_clusters"][subjob_cluster] - for subjob in clusterconf["subjobs"]: - 
subjobconf = gw_config["subjobs"][subjob] - - clusterconf = merge_single_entry_if_possible( - "submit_to_batch_system", subjobconf, clusterconf - ) - clusterconf = merge_single_entry_if_possible( - "order_in_cluster", subjobconf, clusterconf - ) - - if subjobconf.get("submit_to_batch_system", False): - clusterconf["batch_or_shell"] = "batch" - elif subjobconf.get("script", False): - clusterconf["batch_or_shell"] = "shell" - - clusterconf = merge_single_entry_if_possible( - "run_on_queue", subjobconf, clusterconf - ) - clusterconf = merge_single_entry_if_possible( - "run_after", subjobconf, clusterconf - ) - clusterconf = merge_single_entry_if_possible( - "run_before", subjobconf, clusterconf - ) - clusterconf = merge_single_entry_if_possible( - "run_only", subjobconf, clusterconf - ) - clusterconf = merge_single_entry_if_possible( - "skip_run_number", subjobconf, clusterconf - ) - clusterconf = merge_single_entry_if_possible( - "skip_chunk_number", subjobconf, clusterconf + # Position nodes based on execution level stored in "subset" + pos = nx.multipartite_layout(self.G, subset_key="subset") + + # Draw the graph + nx.draw( + self.G, + pos, + with_labels=True, + node_color=node_colors, + edge_color="gray", + arrows=True, + node_size=2000, + font_size=10, + ) + + plt.title(f"Dependency Graph for {self.name}") + plt.show() + + def add_job(self, job): + self.jobs.append(job) + self.order_jobs() + + def __rich_console__( + self, console: Console, options: ConsoleOptions + ) -> RenderResult: + job_panels = [] + for job in self.jobs: + job_panels.append(job) + job_columns = Columns(job_panels) + panel = Panel( + job_columns, + title="Cluster", + title_align="left", + subtitle=self.name, + border_style="blue", + ) + yield panel + + +class Job: + """One phase of esm-tools""" + + def __init__( + self, + name=None, + steps=None, + submission_type=None, + nproc=None, + run_after=None, + run_before=None, + script=None, + order_in_cluster="sequential", + ): + self.name = name 
or randomname.get_name() + self.steps = steps or [] + self.submission_type = submission_type or SubmissionType.SIM_OBJECT + self.nproc = nproc or 1 + self.run_after = run_after + self.run_before = run_before + self.script = script + self.order_in_cluster = order_in_cluster + + # Sanity checks: run_after and run_before must be lists + if self.run_after and not isinstance(self.run_after, list): + self.run_after = [self.run_after] + if self.run_before and not isinstance(self.run_before, list): + self.run_before = [self.run_before] + + # Sanity checks: steps must be callable + for step in self.steps: + if not callable(step): + raise ValueError(f"Step '{step}' is not callable!") + + # Sanity check: if submission type is script, script must be provided + if self.submission_type == SubmissionType.SHELL and not self.script: + raise ValueError("Submission type is shell but no script provided!") + + def __call__(self, config): + @flow(name=self.name) + def job_task(config): + for step in self.steps: + config = step(config) + return config + + return job_task(config) + + def __rich__(self) -> RenderResult: + step_list = Text() + for index, step in enumerate(self.steps): + step_list.append( + f"* [{index+1}/{len(self.steps)}] {step.__name__}\n", style="bold" ) - - nproc_sum += subjobconf.get("nproc", 1) - nproc_max = max(subjobconf.get("nproc", 1), nproc_max) - - if "submit_to_batch_system" not in clusterconf: - clusterconf["submit_to_batch_system"] = False - else: - if "run_on_queue" not in clusterconf: - logger.error( - f"Information on target queue is missing in cluster {clusterconf}." 
- ) - sys.exit(-1) - - if not clusterconf.get("batch_or_shell", False): - clusterconf["batch_or_shell"] = "SimulationSetup" - - if "order_in_cluster" not in clusterconf: - clusterconf["order_in_cluster"] = "sequential" - - if clusterconf["order_in_cluster"] == "concurrent": - nproc = nproc_sum - else: - nproc = nproc_max - clusterconf["nproc"] = nproc - - return config - - -def merge_single_entry_if_possible(entry, sourceconf, targetconf): - if entry in sourceconf: - if entry in targetconf and not sourceconf[entry] == targetconf[entry]: - logger.error(f"Mismatch found in {entry} for cluster {targetconf}") - sys.exit(-1) - targetconf[entry] = sourceconf[entry] - return targetconf - - -def init_total_workflow(config): - # add compute, tidy etc information already here! - - tasks = 0 - for model in config["general"]["valid_model_names"]: - if "nproc" in config[model]: - tasks += config[model]["nproc"] - elif "nproca" in config[model] and "nprocb" in config[model]: - tasks += config[model]["nproca"] * config[model]["nprocb"] - if "nprocar" in config[model] and "nprocbr" in config[model]: - if ( - config[model]["nprocar"] != "remove_from_namelist" - and config[model]["nprocbr"] != "remove_from_namelist" - ): - tasks += config[model]["nprocar"] * config[model]["nprocbr"] - - prepcompute = { - "prepcompute": { - "nproc": 1, - "run_before": "compute", - } - } - - compute = { - "compute": { - "nproc": tasks, - "run_before": "tidy", - "submit_to_batch_system": config["general"].get( - "submit_to_batch_system", True - ), - "run_on_queue": config["computer"]["partitions"]["compute"]["name"], - } - } - - # das ist nur vorübergehend - tidy = { - "tidy": { - "nproc": 1, - "run_after": "compute", - } - } - - if "workflow" not in config["general"]: - config["general"]["workflow"] = {} - if "subjob_clusters" not in config["general"]["workflow"]: - config["general"]["workflow"]["subjob_clusters"] = {} - if "subjobs" not in config["general"]["workflow"]: - 
config["general"]["workflow"]["subjobs"] = prepcompute - config["general"]["workflow"]["subjobs"].update(compute) - config["general"]["workflow"]["subjobs"].update(tidy) - else: - if "prepcompute" not in config["general"]["workflow"]["subjobs"]: - config["general"]["workflow"]["subjobs"].update(prepcompute) - if "compute" not in config["general"]["workflow"]["subjobs"]: - config["general"]["workflow"]["subjobs"].update(compute) - if "tidy" not in config["general"]["workflow"]["subjobs"]: - config["general"]["workflow"]["subjobs"].update(tidy) - if "last_task_in_queue" not in config["general"]["workflow"]: - config["general"]["workflow"]["last_task_in_queue"] = "tidy" - if "first_task_in_queue" not in config["general"]["workflow"]: - config["general"]["workflow"]["first_task_in_queue"] = "prepcompute" - - if "next_run_triggered_by" not in config["general"]["workflow"]: - config["general"]["workflow"]["next_run_triggered_by"] = "tidy" - - return config - - -def collect_all_workflow_information(config): - - for model in config: - if "workflow" in config[model]: - w_config = config[model]["workflow"] - gw_config = config["general"]["workflow"] - - if "subjob_clusters" in w_config: - for cluster in w_config["subjob_clusters"]: - if cluster in gw_config["subjob_clusters"]: - gw_config["subjob_clusters"][cluster] = merge_if_possible( - w_config["subjob_clusters"][cluster], - gw_config["subjob_clusters"][cluster], - ) - else: - gw_config["subjob_clusters"][cluster] = copy.deepcopy( - w_config["subjob_clusters"][cluster], - ) - - if "subjobs" in w_config: - ref_config = copy.deepcopy(w_config) - for subjob in list(copy.deepcopy(w_config["subjobs"])): - - # subjobs (other than clusters) should be model specific - gw_config["subjobs"][subjob + "_" + model] = copy.deepcopy( - w_config["subjobs"][subjob] - ) - if subjob in gw_config["subjobs"]: - del gw_config["subjobs"][subjob] - # make sure that the run_after and run_before refer to that cluster - for other_subjob in 
gw_config["subjobs"]: - if "run_after" in gw_config["subjobs"][other_subjob]: - if ( - gw_config["subjobs"][other_subjob]["run_after"] - == subjob - ): - gw_config["subjobs"][other_subjob][ - "run_after" - ] == subjob + "_" + model - if "run_before" in gw_config["subjobs"][other_subjob]: - if ( - gw_config["subjobs"][other_subjob]["run_before"] - == subjob - ): - gw_config["subjobs"][other_subjob][ - "run_before" - ] == subjob + "_" + model - - # if not in another cluster, each subjob gets its own - if ( - "subjob_cluster" - not in gw_config["subjobs"][subjob + "_" + model] - ): - gw_config["subjobs"][subjob + "_" + model][ - "subjob_cluster" - ] = subjob # + "_" + model - - if "next_run_triggered_by" in w_config: - if not gw_config["next_run_triggered_by"] in [ - "tidy", - w_config["next_run_triggered_by"], - ]: - logger.error( - f"Mismatch found setting next_run_triggered_by for workflow." - ) - sys.exit(-1) - else: - gw_config["next_run_triggered_by"] = w_config[ - "next_run_triggered_by" - ] - - return config - - -def merge_if_possible(source, target): - for entry in source: - if entry in target: - if not source[entry] == target[entry]: - logger.error( - f"Mismatch while trying to merge subjob_clusters {source} into {target}" - ) - sys.exit(-1) - else: - target[entry] = source[entry] - return target + step_list.append("\n\n") + step_list.append("Submission type: ", style="dim") + step_list.append(self.submission_type.__rich__()) + panel = Panel.fit( + step_list, + title="Job", + title_align="left", + subtitle=self.name, + border_style="green", + ) + return panel diff --git a/tests/test_esm_runscripts/test_workflow.py b/tests/test_esm_runscripts/test_workflow.py new file mode 100644 index 000000000..5d0a84775 --- /dev/null +++ b/tests/test_esm_runscripts/test_workflow.py @@ -0,0 +1,642 @@ +import copy + +import pytest + +from esm_runscripts.workflow import (WorkflowError, + collect_all_workflow_information, + complete_clusters, init_total_workflow, + 
merge_if_possible, + merge_single_entry_if_possible, + order_clusters, prepend_newrun_job, + should_skip_cluster) + + +def test_should_skip_cluster_last_run_in_chunk(): + config = { + "general": { + "workflow": { + "clusters": {"test_cluster": {"run_only": "last_run_in_chunk"}} + }, + "last_run_in_chunk": False, + } + } + assert should_skip_cluster("test_cluster", config) is True + + +def test_should_skip_cluster_first_run_in_chunk(): + config = { + "general": { + "workflow": { + "clusters": {"test_cluster": {"run_only": "first_run_in_chunk"}} + }, + "first_run_in_chunk": False, + } + } + assert should_skip_cluster("test_cluster", config) is True + + +def test_should_skip_cluster_skip_chunk_number(): + config = { + "general": { + "workflow": {"clusters": {"test_cluster": {"skip_chunk_number": 1}}}, + "chunk_number": 1, + } + } + assert should_skip_cluster("test_cluster", config) is True + + +def test_should_skip_cluster_skip_run_number(): + config = { + "general": { + "workflow": {"clusters": {"test_cluster": {"skip_run_number": 1}}}, + "run_number": 1, + } + } + assert should_skip_cluster("test_cluster", config) is True + + +def test_should_skip_cluster_no_skip(): + config = { + "general": { + "workflow": {"clusters": {"test_cluster": {}}}, + "last_run_in_chunk": True, + "first_run_in_chunk": True, + "chunk_number": 2, + "run_number": 2, + } + } + assert should_skip_cluster("test_cluster", config) is False + + +def test_init_total_workflow_minimal_config(): + config = { + "general": {"valid_model_names": [], "workflow": {}}, + "computer": {"partitions": {"compute": {"name": "default_queue"}}}, + } + expected = { + "general": { + "valid_model_names": [], + "workflow": { + "clusters": {}, + "jobs": { + "prepcompute": {"nproc": 1, "run_before": "compute"}, + "compute": { + "nproc": 0, + "run_before": "tidy", + "submit_to_batch_system": True, + "run_on_queue": "default_queue", + }, + "tidy": {"nproc": 1, "run_after": "compute"}, + }, + "last_task_in_queue": "tidy", + 
"first_task_in_queue": "prepcompute", + "next_run_triggered_by": "tidy", + }, + }, + "computer": {"partitions": {"compute": {"name": "default_queue"}}}, + } + result = init_total_workflow(config) + assert result == expected + + +def test_init_total_workflow_config_with_nproc(): + config = { + "general": {"valid_model_names": ["model1"], "workflow": {}}, + "model1": {"nproc": 4}, + "computer": {"partitions": {"compute": {"name": "default_queue"}}}, + } + expected = { + "general": { + "valid_model_names": ["model1"], + "workflow": { + "clusters": {}, + "jobs": { + "prepcompute": {"nproc": 1, "run_before": "compute"}, + "compute": { + "nproc": 4, + "run_before": "tidy", + "submit_to_batch_system": True, + "run_on_queue": "default_queue", + }, + "tidy": {"nproc": 1, "run_after": "compute"}, + }, + "last_task_in_queue": "tidy", + "first_task_in_queue": "prepcompute", + "next_run_triggered_by": "tidy", + }, + }, + "model1": {"nproc": 4}, + "computer": {"partitions": {"compute": {"name": "default_queue"}}}, + } + result = init_total_workflow(config) + assert result == expected + + +def test_init_total_workflow_config_with_nproca_nprocb(): + config = { + "general": {"valid_model_names": ["model1"], "workflow": {}}, + "model1": {"nproca": 2, "nprocb": 3}, + "computer": {"partitions": {"compute": {"name": "default_queue"}}}, + } + expected = { + "general": { + "valid_model_names": ["model1"], + "workflow": { + "clusters": {}, + "jobs": { + "prepcompute": {"nproc": 1, "run_before": "compute"}, + "compute": { + "nproc": 6, + "run_before": "tidy", + "submit_to_batch_system": True, + "run_on_queue": "default_queue", + }, + "tidy": {"nproc": 1, "run_after": "compute"}, + }, + "last_task_in_queue": "tidy", + "first_task_in_queue": "prepcompute", + "next_run_triggered_by": "tidy", + }, + }, + "model1": {"nproca": 2, "nprocb": 3}, + "computer": {"partitions": {"compute": {"name": "default_queue"}}}, + } + result = init_total_workflow(config) + assert result == expected + + +def 
test_init_total_workflow_config_with_nprocar_nprocbr(): + config = { + "general": {"valid_model_names": ["model1"], "workflow": {}}, + "model1": {"nproca": 2, "nprocb": 3, "nprocar": 1, "nprocbr": 2}, + "computer": {"partitions": {"compute": {"name": "default_queue"}}}, + } + expected = { + "general": { + "valid_model_names": ["model1"], + "workflow": { + "clusters": {}, + "jobs": { + "prepcompute": {"nproc": 1, "run_before": "compute"}, + "compute": { + "nproc": 8, + "run_before": "tidy", + "submit_to_batch_system": True, + "run_on_queue": "default_queue", + }, + "tidy": {"nproc": 1, "run_after": "compute"}, + }, + "last_task_in_queue": "tidy", + "first_task_in_queue": "prepcompute", + "next_run_triggered_by": "tidy", + }, + }, + "model1": {"nproca": 2, "nprocb": 3, "nprocar": 1, "nprocbr": 2}, + "computer": {"partitions": {"compute": {"name": "default_queue"}}}, + } + result = init_total_workflow(config) + assert result == expected + + +def test_prepend_newrun_job(): + config = { + "general": { + "workflow": { + "first_task_in_queue": "cluster1", + "last_task_in_queue": "cluster2", + "clusters": { + "cluster1": { + "submission_type": "OtherTask", + "called_from": None, + "next_submit": [], + }, + "cluster2": { + "submission_type": "OtherTask", + "called_from": None, + "next_submit": ["cluster1"], + }, + }, + "jobs": {}, + } + } + } + + expected_config = { + "general": { + "workflow": { + "first_task_in_queue": "newrun", + "last_task_in_queue": "cluster2", + "clusters": { + "cluster1": { + "submission_type": "OtherTask", + "called_from": "newrun", + "next_submit": [], + }, + "cluster2": { + "submission_type": "OtherTask", + "called_from": None, + "next_submit": ["newrun"], + }, + "newrun": { + "called_from": "cluster2", + "run_before": "cluster1", + "next_submit": ["cluster1"], + "jobs": ["newrun_general"], + "submission_type": "sim_object", + }, + }, + "jobs": { + "newrun_general": { + "nproc": 1, + "called_from": "cluster2", + "run_before": "cluster1", + 
"next_submit": ["cluster1"], + "job_cluster": "newrun", + } + }, + } + } + } + + result = prepend_newrun_job(config) + assert result == expected_config + + +def test_prepend_newrun_job_no_change(): + config = { + "general": { + "workflow": { + "first_task_in_queue": "cluster1", + "last_task_in_queue": "cluster2", + "clusters": { + "cluster1": { + "submission_type": "sim_object", + "called_from": None, + "next_submit": [], + }, + "cluster2": { + "submission_type": "OtherTask", + "called_from": None, + "next_submit": ["cluster1"], + }, + }, + "jobs": {}, + } + } + } + + expected_config = config.copy() + + result = prepend_newrun_job(config) + assert result == expected_config + + +def test_merge_single_entry_if_possible_entry_in_source_not_in_target(): + sourceconf = {"key1": "value1"} + targetconf = {} + entry = "key1" + result = merge_single_entry_if_possible(entry, sourceconf, targetconf) + assert result == {"key1": "value1"} + + +def test_merge_single_entry_if_possible_entry_in_both_matching(): + sourceconf = {"key1": "value1"} + targetconf = {"key1": "value1"} + entry = "key1" + result = merge_single_entry_if_possible(entry, sourceconf, targetconf) + assert result == {"key1": "value1"} + + +def test_merge_single_entry_if_possible_entry_in_both_mismatching(): + sourceconf = {"key1": "value1"} + targetconf = {"key1": "value2"} + entry = "key1" + with pytest.raises(WorkflowError): + merge_single_entry_if_possible(entry, sourceconf, targetconf) + + +def test_merge_if_possible_no_conflict(): + source = {"a": 1, "b": 2} + target = {"c": 3, "d": 4} + expected = {"c": 3, "d": 4, "a": 1, "b": 2} + result = merge_if_possible(source, target) + assert result == expected + + +def test_merge_if_possible_with_conflict(): + source = {"a": 1, "b": 2} + target = {"a": 2, "c": 3} + with pytest.raises(WorkflowError): + merge_if_possible(source, target) + + +def test_merge_if_possible_partial_conflict(): + source = {"a": 1, "b": 2} + target = {"a": 1, "c": 3} + expected = {"a": 1, 
"c": 3, "b": 2} + result = merge_if_possible(source, target) + assert result == expected + + +@pytest.fixture +def sample_config(): + return { + "model1": { + "workflow": { + "clusters": { + "cluster1": {"key1": "value1"}, + }, + "jobs": { + "jobA": {"run_before": "jobB"}, + "jobB": {"run_after": "jobA", "run_before": "jobC"}, + "jobC": {"run_after": "jobB"}, + }, + "next_run_triggered_by": "trigger1", + } + }, + "general": { + "workflow": { + "clusters": { + "cluster1": {"key1": "general_value1"}, + }, + "jobs": { + "job2": {}, + "job3": {}, + }, + "next_run_triggered_by": "tidy", + } + }, + } + + +def test_collect_all_workflow_information(sample_config): + config = copy.deepcopy(sample_config) + updated_config = collect_all_workflow_information(config) + + # Test clusters merging + assert ( + updated_config["general"]["workflow"]["clusters"]["cluster1"]["key1"] + == "value1" + ) + + # Test jobs renaming and copying + assert "job1_model1" in updated_config["general"]["workflow"]["jobs"] + assert "job1" not in updated_config["general"]["workflow"]["jobs"] + + # Test run_after and run_before renaming + assert ( + updated_config["general"]["workflow"]["jobs"]["job1_model1"]["run_after"] + == "job2" + ) + assert ( + updated_config["general"]["workflow"]["jobs"]["job1_model1"]["run_before"] + == "job3" + ) + + # Test job_cluster assignment + assert ( + updated_config["general"]["workflow"]["jobs"]["job1_model1"]["job_cluster"] + == "job1" + ) + + # Test next_run_triggered_by + assert updated_config["general"]["workflow"]["next_run_triggered_by"] == "trigger1" + + +def test_collect_all_workflow_information_mismatch(sample_config): + config = copy.deepcopy(sample_config) + config["model1"]["workflow"]["next_run_triggered_by"] = "mismatch_trigger" + + with pytest.raises(SystemExit): + collect_all_workflow_information(config) + + +def test_order_clusters_minimal_config(): + config = { + "general": { + "workflow": { + "clusters": {}, + "jobs": {}, + "first_task_in_queue": 
"first_task", + "last_task_in_queue": "last_task", + } + } + } + expected = { + "general": { + "workflow": { + "clusters": {}, + "jobs": {}, + "first_task_in_queue": "first_task", + "last_task_in_queue": "last_task", + } + } + } + result = order_clusters(config) + assert result == expected + + +def test_order_clusters_with_run_after(): + config = { + "general": { + "workflow": { + "clusters": { + "cluster1": {"run_after": "cluster2"}, + "cluster2": {}, + }, + "jobs": {}, + "first_task_in_queue": "cluster2", + "last_task_in_queue": "cluster1", + } + } + } + expected = { + "general": { + "workflow": { + "clusters": { + "cluster1": { + "run_after": "cluster2", + "called_from": "cluster2", + "next_submit": [], + }, + "cluster2": {"next_submit": ["cluster1"]}, + }, + "jobs": {}, + "first_task_in_queue": "cluster2", + "last_task_in_queue": "cluster1", + } + } + } + result = order_clusters(config) + assert result == expected + + +def test_order_clusters_with_run_before(): + config = { + "general": { + "workflow": { + "clusters": { + "cluster1": {"run_before": "cluster2"}, + "cluster2": {}, + }, + "jobs": {}, + "first_task_in_queue": "cluster1", + "last_task_in_queue": "cluster2", + } + } + } + expected = { + "general": { + "workflow": { + "clusters": { + "cluster1": {"run_before": "cluster2", "next_submit": ["cluster2"]}, + "cluster2": {"called_from": "cluster1"}, + }, + "jobs": {}, + "first_task_in_queue": "cluster1", + "last_task_in_queue": "cluster2", + } + } + } + result = order_clusters(config) + assert result == expected + + +def test_order_clusters_with_run_after_and_run_before(): + config = { + "general": { + "workflow": { + "clusters": { + "cluster1": {"run_after": "cluster2"}, + "cluster2": {"run_before": "cluster3"}, + "cluster3": {}, + }, + "jobs": {}, + "first_task_in_queue": "cluster2", + "last_task_in_queue": "cluster1", + } + } + } + expected = { + "general": { + "workflow": { + "clusters": { + "cluster1": { + "run_after": "cluster2", + "called_from": 
"cluster2", + "next_submit": [], + }, + "cluster2": { + "run_before": "cluster3", + "next_submit": ["cluster1", "cluster3"], + }, + "cluster3": {"called_from": "cluster2"}, + }, + "jobs": {}, + "first_task_in_queue": "cluster2", + "last_task_in_queue": "cluster1", + } + } + } + result = order_clusters(config) + assert result == expected + + +def test_complete_clusters(): + config = { + "general": { + "workflow": { + "jobs": { + "newexp": { + "job_cluster": "cluster0", + "nproc": 1, + "submit_to_batch_system": False, + "submission_type": "sim_object", + }, + "prepexp": { + "job_cluster": "cluster0", + "nproc": 1, + "submit_to_batch_system": False, + "submission_type": "sim_object", + }, + "prepcompute": { + "job_cluster": "cluster0", + "nproc": 1, + "submit_to_batch_system": False, + "submission_type": "sim_object", + }, + "write_next_cluster": { + "job_cluster": "cluster0", + "nproc": 1, + "submit_to_batch_system": False, + "submission_type": "sim_object", + }, + "compute": { + "job_cluster": "cluster1", + "nproc": 288, + "submit_to_batch_system": True, + "submission_type": "batch", + }, + "tidy": { + "job_cluster": "cluster1", + "nproc": 1, + "submit_to_batch_system": True, + "submission_type": "sim_object", + }, + "prepcompute": { + "job_cluster": "cluster1", + "nproc": 1, + "submit_to_batch_system": False, + "submission_type": "sim_object", + }, + }, + "clusters": {}, + } + } + } + + expected_config = { + "general": { + "workflow": { + "jobs": { + "job1": { + "job_cluster": "cluster1", + "nproc": 2, + "submit_to_batch_system": True, + "script": True, + }, + "job2": { + "job_cluster": "cluster1", + "nproc": 4, + "submit_to_batch_system": False, + "script": True, + }, + "job3": { + "job_cluster": "cluster2", + "nproc": 1, + "submit_to_batch_system": False, + "script": False, + }, + }, + "clusters": { + "cluster1": { + "jobs": ["job1", "job2"], + "submission_type": "batch", + "submit_to_batch_system": True, + "order_in_cluster": "sequential", + "nproc": 4, + }, + 
"cluster2": { + "jobs": ["job3"], + "submission_type": "sim_object", + "submit_to_batch_system": False, + "order_in_cluster": "sequential", + "nproc": 1, + }, + }, + } + } + } + + result = complete_clusters(config) + assert result == expected_config