diff --git a/configs/esm_software/esm_runscripts/defaults.yaml b/configs/esm_software/esm_runscripts/defaults.yaml index 0a23ca5fa..acf397b40 100644 --- a/configs/esm_software/esm_runscripts/defaults.yaml +++ b/configs/esm_software/esm_runscripts/defaults.yaml @@ -9,12 +9,61 @@ general: # Defaults to be added to each model or component per_model_defaults: - file_movements: - default: - all_directions: copy - bin: - init_to_exp: copy - exp_to_run: copy - run_to_work: copy - work_to_run: copy + file_movements: + default: + all_directions: copy + bin: + init_to_exp: copy + exp_to_run: copy + run_to_work: copy + work_to_run: copy +workflow: + user_phases: None + default_cluster: sim_cluster + clusters: + sim_cluster: + first_task_in_queue: prepcompute + last_task_in_queue: tidy + next_run_triggered_by: tidy + phases: + - prepcompute + - compute + - tidy + + phases: + prepcompute: + phase_type: SimulationSetup + called_from: tidy + cluster: sim_cluster + name: prepcompute + next_submit: + - compute + nproc: 1 + order_in_cluster: sequential + run_after: tidy + run_before: compute + compute: + phase_type: compute + called_from: prepcompute + cluster: sim_cluster + name: compute + next_submit: + - tidy + #nproc: None + nproc: 1 + order_in_cluster: sequential + run_after: prepcompute + run_before: tidy + #run_on_queue: ${computer.partitions.pp.name} + tidy: + phase_type: SimulationSetup + called_from: compute + cluster: sim_cluster + name: tidy + next_submit: + - prepcompute + nproc: 1 + order_in_cluster: sequential + run_after: compute + run_before: prepcompute diff --git a/src/esm_plugin_manager/esm_plugin_manager.py b/src/esm_plugin_manager/esm_plugin_manager.py index 4db6f93b4..888b324fb 100644 --- a/src/esm_plugin_manager/esm_plugin_manager.py +++ b/src/esm_plugin_manager/esm_plugin_manager.py @@ -123,16 +123,30 @@ def check_plugin_availability(plugins): def work_through_recipe(recipe, plugins, config): + """ + Works through the esm_runscripts recipes and plugin 
recipes. + + Parameters + ---------- + recipe : dict # What is in these two dictionaries? Where do the entries are comming from? + plugins : dict + config : dict + + Returns + ------- + config : dict + """ if config.get("general", {}).get("debug_recipe", False): import pdb pdb.set_trace() recipes = recipe["recipe"] + # Loop over the recipe for index, workitem in enumerate(recipes, start=1): if config["general"].get("verbose", False): # diagnostic message of which recipe step is being executed message = ( - f"::: Executing the step: {workitem} " + f"::: START Executing the step: {workitem} " f"(step [{index}/{len(recipes)}] of the job: " f'{recipe["job_type"]})' ) @@ -162,6 +176,18 @@ def work_through_recipe(recipe, plugins, config): thismodule = importlib.util.module_from_spec(spec) spec.loader.exec_module(thismodule) config = getattr(thismodule, workitem)(config) + if config["general"].get("verbose", False): + # diagnostic message of which recipe step is being executed + message = ( + f"::: END Executing the step: {workitem} " + f"(step [{index}/{len(recipes)}] of the job: " + f'{recipe["job_type"]})' + ) + + print() + print("=" * len(message)) + print(message) + print("=" * len(message)) return config diff --git a/src/esm_runscripts/batch_system.py b/src/esm_runscripts/batch_system.py index 171383286..ad718ca81 100644 --- a/src/esm_runscripts/batch_system.py +++ b/src/esm_runscripts/batch_system.py @@ -1,17 +1,18 @@ +import copy import os -import textwrap -import sys +import pdb import stat -import copy +import sys +import textwrap import esm_environment - from esm_parser import find_variable, user_error, user_note -from . import helpers -from . import dataprocess -from . import prepare -from .slurm import Slurm + +from . 
import dataprocess, helpers, prepare from .pbs import Pbs +from .slurm import Slurm + +import pdb known_batch_systems = ["slurm", "pbs"] reserved_jobtypes = ["prepcompute", "compute", "prepare", "tidy", "inspect"] @@ -263,7 +264,7 @@ def calculate_requirements(config, cluster=None): if ( not cluster - or not cluster in config["general"]["workflow"]["subjob_clusters"] + or cluster not in config["general"]["workflow"]["subjob_clusters"] ): print(f"Unknown or unset cluster: {cluster}.") sys.exit(-1) @@ -324,7 +325,7 @@ def get_extra(config): ) elif isinstance(pre_run_commands, str): extras.append(pre_run_commands) - elif pre_run_commands == None: + elif pre_run_commands is None: continue else: user_error( @@ -346,7 +347,7 @@ def append_start_statement(config, subjob): config["general"]["run_number"], config["general"]["current_date"], config["general"]["jobid"], - "- start", + "- start from run script", ], timestampStr_from_Unix=True, ) @@ -362,7 +363,7 @@ def append_done_statement(config, subjob): config["general"]["run_number"], config["general"]["current_date"], config["general"]["jobid"], - "- done", + "- done from run script", ], timestampStr_from_Unix=True, ) @@ -371,9 +372,37 @@ def append_done_statement(config, subjob): @staticmethod def get_run_commands(config, subjob, batch_or_shell): # here or in compute.py? + """ + Creates the command of the specific phase to be put in the *.run file. + + This function is covering the following phase types: + - SimulationSetup: phases that are run as 'esm_runscripts' command + - batch: phases that are run via 'srun' command + - shell: phases that are run as shell scripts. The command is generated by + a function in the 'dataprocess' module. + + Special case: phase 'compute': + - This phase is of type 'batch' + + Todo: How about other phases of type batch? in dataprocess??? 
+ + Parameters + ---------- + config: dict + subjob: str + Name of phase + batch_or_shell: str + Type of phase (SimulationSetup, batch, shell) + + Returns + ------- + commands: list + List of command and arguments of a phase depending of its type. + """ commands = [] if subjob.startswith("compute"): + # for batch jobs if config["general"].get("submit_to_batch_system", True): batch_system = config["computer"] if "execution_command" in batch_system: @@ -384,6 +413,7 @@ def get_run_commands(config, subjob, batch_or_shell): # here or in compute.py? ) if config["general"].get("multi_srun"): return self.bs.get_run_commands_multisrun(config, commands) + # for shell scripts else: for model in config: if model == "computer": @@ -395,9 +425,50 @@ def get_run_commands(config, subjob, batch_or_shell): # here or in compute.py? + f" 2>&1{config['computer'].get('write_execution_log', '')} &" ) else: - subjob_tasks = dataprocess.subjob_tasks(config, subjob, batch_or_shell) - for task in subjob_tasks: - commands.append(task) + if batch_or_shell == "SimulationSetup": + # for phase type 'SimulationSetup' (e.g. 
prepcompute, tidy) + commands = [] + commands.append("esm_runscripts") + # add runscript with absolute path + runscript = config["general"]["runscript_abspath"] + commands.append(runscript) + # add experiment id + commands.append(f"-e {config['general']['expid']}") + # add phase + commands.append(f"--phase {subjob}") + # add task + commands.append(f"-t run_phase") + # add date + commands.append("-s " + config['general']['current_date'].format( + form=9, givenph=False, givenpm=False, givenps=False + )) + # add + commands.append(f"-r {str(config['general']['run_number'])}") + # add verbose and no message_of_the day argument + commands.append("-v --no-motd") + # add last-jobtype argument + commands.append(f"--last-jobtype {subjob}") + # add --open-ran or use_venv argument + if "--open-run" in config["general"]["original_command"] or not config[ + "general" + ].get("use_venv"): + commands.append(" --open-run") + elif "--contained-run" in config["general"][ + "original_command" + ] or config["general"].get("use_venv"): + commands.append("--contained-run") + else: + print("ERROR -- Not sure if you were in a contained or open run!") + print( + "ERROR -- See write_simple_runscript for the code causing this." + ) + sys.exit(1) + else: + # for all other phase types (batch, shell) except phase 'compute' + subjob_tasks = dataprocess.subjob_tasks(config, subjob, batch_or_shell) + # Why was this necessary? And not set commands directly? 
+ for task in subjob_tasks: + commands.append(task) return commands @@ -428,8 +499,132 @@ def get_submit_command(config, batch_or_shell, runfilename): return commands @staticmethod - def write_simple_runscript(config, cluster, batch_or_shell="batch"): + def write_run_batch_script(config, cluster, batch_or_shell="batch"): + + workflow = config["general"]["workflow"]["object"] + phases = workflow.clusters[cluster]["phases"] + self = config["general"]["batch"] + runfilename = batch_system.get_run_filename(config, cluster) + if config["general"]["verbose"]: + print("jobtype: ", config["general"]["jobtype"]) + print("writing run file for:", cluster) + + with open(runfilename, "w") as runfile: + config = batch_system.calculate_requirements(config, "compute") + # TODO: remove it once it's not needed anymore (substituted by packjob) + if ( + cluster in reserved_jobtypes + and config["computer"].get("taskset", False) + ): + config = config["general"]["batch"].write_het_par_wrappers(config) + # Prepare launcher + config = config["general"]["batch"].prepare_launcher(config, "compute") + # Initiate the header + header = batch_system.get_batch_header(config, "compute") + for line in header: + runfile.write(line + "\n") + runfile.write("\n") + # environment for each phase of a cluster + environment = batch_system.get_environment(config, "compute") + batch_system.write_env(config, environment, runfilename) + for line in environment: + runfile.write(line + "\n") + + # extra entries for each phase + extra = batch_system.get_extra(config) + for line in extra: + runfile.write(line + "\n") + + for phase in phases: + phase = workflow.get_workflow_phase_by_name(phase) + # Add actual commands + commands = batch_system.get_run_commands( + config, phase["name"], batch_or_shell + ) + # commands = clusterconf.get("data_task_list", []) + runfile.write("\n") + runfile.write("#********** Start phase " + phase["name"] + " *************\n") + runfile.write(self.append_start_statement(config, 
phase["name"]) + "\n") + runfile.write("\n") + +# if cluster in reserved_jobtypes: + config["general"]["batch"].add_pre_launcher_lines( + config, cluster, runfile + ) + + command = phase["run_command"] + if phase["phase_type"] == "SimulationSetup": + runfile.write("cd " + config["general"]["experiment_scripts_dir"] + "\n") + runfile.write(f"{command} --run-from-batch-script\n") + elif phase["phase_type"] == "compute": + runfile.write("cd " + config["general"]["thisrun_work_dir"] + "\n") + observe_call = ( + "esm_runscripts " + + config["general"]["started_from"] + + config["general"]["scriptname"] + + " -e " + + config["general"]["expid"] + + " -t observe" + + " --phase " + + phase["name"] + + " -p ${process}" + + " -s " + + config["general"]["current_date"].format( + form=9, givenph=False, givenpm=False, givenps=False + ) + + " -r " + + str(config["general"]["run_number"]) + + " -v " + + " --last-jobtype " + + config["general"]["jobtype"] + + " --open-run" + ) + runfile.write(f"{command}\n") + runfile.write("process=$!\n") + runfile.write("\n") + runfile.write("#********** Start to observe " + phase["name"] + " *************\n") + runfile.write(self.append_start_statement(config, "observe") + "\n") + runfile.write("cd " + config["general"]["experiment_scripts_dir"] + "\n") + runfile.write(f"{observe_call}\n") + runfile.write("\n") + runfile.write("wait\n") + runfile.write(self.append_done_statement(config, "observe") + "\n") + doneline = "echo " + line + " >> " + config["general"]["experiment_log_file"] + else: + runfile.write(f"{command}\n") + runfile.write(self.append_done_statement(config, phase["name"]) + "\n") + + # TODO: Check if end_or_experiment(config) -> can not do from . import resubmit will give error dont know why + if not config["general"]["next_date"] >= config["general"]["final_date"]: + runfile.write("\n") + runfile.write("#********** Call esm_runscripts to restart simulation. 
*************\n") + + expid = config["general"].get("expid", None) + scriptname = config["general"].get("scriptname", None) + current_date = config["general"].get("current_date", None) + current_date = current_date.format(form=9, givenph=False, givenpm=False, givenps=False) + + run_number = config["general"].get("run_number", None) + + runfile.write(self.append_start_statement(config, "restart") + "\n") + runfile.write(f"esm_runscripts {scriptname} -e {expid} -t restart --open-run -v --no-motd --run-from-batch-script --open-run -s {current_date} -r {run_number}") + runfile.write("\n") + runfile.write(self.append_done_statement(config, "restart") + "\n") + runfile.write("\n") + + runfile.write("\n") + runfile.write("wait\n") + + config["general"]["submit_command"] = batch_system.get_submit_command( + config, batch_or_shell, runfilename + ) + + return config + + + @staticmethod + def write_simple_runscript(config, cluster, batch_or_shell="batch"): # if no cluster is specified, work on the one we are in # if not cluster: # cluster = config["general"]["jobtype"] @@ -516,7 +711,7 @@ def write_simple_runscript(config, cluster, batch_or_shell="batch"): # dummy = 0 else: # "normal" case dummy = 0 - + # check if this cluster has has something to submit (next_submit not empty) if submits_another_job(config, cluster): # and batch_or_shell == "batch": # -j ? is that used somewhere? 
I don't think so, replaced by workflow # " -j "+ config["general"]["jobtype"] @@ -818,7 +1013,7 @@ def calc_launcher_flags(config, model, cluster): cpus_per_proc = config[model].get("cpus_per_proc", omp_num_threads) # Check for CPUs and OpenMP threads if omp_num_threads > cpus_per_proc: - esm_parser.user_error( + user_error( "OpenMP configuration", ( "The number of OpenMP threads cannot be larger than the number" @@ -830,7 +1025,7 @@ def calc_launcher_flags(config, model, cluster): elif "nproca" in config[model] and "nprocb" in config[model]: # ``nproca``/``nprocb`` not compatible with ``omp_num_threads`` if omp_num_threads > 1: - esm_parser.user_note( + user_note( "nproc", "``nproca``/``nprocb`` not compatible with ``omp_num_threads``", ) @@ -870,7 +1065,6 @@ def calc_launcher_flags(config, model, cluster): return launcher_flags - def submits_another_job(config, cluster): clusterconf = config["general"]["workflow"]["subjob_clusters"][cluster] if clusterconf.get("next_submit", []) == []: diff --git a/src/esm_runscripts/cli.py b/src/esm_runscripts/cli.py index 081bba182..ff0b7b14f 100644 --- a/src/esm_runscripts/cli.py +++ b/src/esm_runscripts/cli.py @@ -20,6 +20,8 @@ from esm_parser import user_error +import pdb + def parse_shargs(): """The arg parser for interactive use""" parser = argparse.ArgumentParser() @@ -95,10 +97,24 @@ def parse_shargs(): parser.add_argument( "-t", "--task", - help="The task to run. Choose from: prepcompute, post, couple, tidy", + help="The task to run. Choose from: start, restart, continue, ...", + default="start", + ) + + parser.add_argument( + "-a", + "--phase", + help="The workflow phase to run. Choose from: prepcompute, compute, tidy, ...", default="unknown", ) + parser.add_argument( + "-w", + "--workflow", + help="The workflow to run. 
Choose from: default (sim_cluster???), ...", + default="sim_cluster", + ) + parser.add_argument( "-i", "--inspect", @@ -109,7 +125,7 @@ def parse_shargs(): parser.add_argument( "-p", "--pid", - help="The PID of the task to observe.", + help="The PID of the phase to observe.", default=-666, ) @@ -161,6 +177,12 @@ def parse_shargs(): action="store_true", ) + parser.add_argument( + "--run-from-batch-script", + default=False, + action="store_true", + ) + return parser.parse_args() @@ -202,8 +224,12 @@ def main(): update_filetypes = parsed_args["update_filetypes"] if "expid" in parsed_args: expid = parsed_args["expid"] + if "phase" in parsed_args: + jobtype = parsed_args["phase"] +# if "workflow" in parsed_args: +# workflow = parsed_args["workflow"] if "task" in parsed_args: - jobtype = parsed_args["task"] + task = parsed_args["task"] if "verbose" in parsed_args: verbose = parsed_args["verbose"] if "inspect" in parsed_args: @@ -222,6 +248,8 @@ def main(): no_motd = parsed_args["no_motd"] if "ignore_config_warnings" in parsed_args: ignore_config_warnings = parsed_args["ignore_config_warnings"] + if "run_from_batch_script" in parsed_args: + run_from_batch_script = parsed_args["run_from_batch_script"] command_line_config = {} command_line_config["check"] = check @@ -233,6 +261,8 @@ def main(): command_line_config["current_date"] = start_date command_line_config["run_number"] = run_number command_line_config["jobtype"] = jobtype +# command_line_config["workflow"] = workflow + command_line_config["task"] = task command_line_config["last_jobtype"] = ARGS.last_jobtype command_line_config["verbose"] = verbose command_line_config["inspect"] = inspect @@ -241,6 +271,7 @@ def main(): command_line_config["ignore_config_warnings"] = ignore_config_warnings if modify_config_file: command_line_config["modify_config_file"] = modify_config_file + command_line_config["run_from_batch_script"] = run_from_batch_script # runscript_from_cmdline = filter(lambda x: x.endswith(".yaml"), sys.argv) 
# runscript_from_cmdline = list(runscript_from_cmdline)[0] @@ -282,4 +313,5 @@ def main(): # if not Setup.config['general']['submitted']: if not Setup.config["general"]["submitted"] and not no_motd: check_all_esm_packages() + Setup() diff --git a/src/esm_runscripts/dataprocess.py b/src/esm_runscripts/dataprocess.py index 174b8c937..a3c93799c 100644 --- a/src/esm_runscripts/dataprocess.py +++ b/src/esm_runscripts/dataprocess.py @@ -27,17 +27,20 @@ def subjob_tasks(config, subjob, batch_or_shell): task_list = [] subjob_config = config["general"]["workflow"]["subjobs"][subjob] - old_logfile = config["general"]["logfile_path"] - logfile_dir = os.path.dirname(old_logfile) - if config["general"]["setup_name"] in subjob: - bare_subjob = subjob.replace("_" + config["general"]["setup_name"], "") - else: - bare_subjob = subjob - logfile_name = os.path.basename(old_logfile).replace( - config["general"]["jobtype"], bare_subjob - ) + old_logfile = config["general"].get("logfile_path", None) + if old_logfile: + logfile_dir = os.path.dirname(old_logfile) + if config["general"]["setup_name"] in subjob: + bare_subjob = subjob.replace("_" + config["general"]["setup_name"], "") + else: + bare_subjob = subjob + logfile_name = os.path.basename(old_logfile).replace( + config["general"]["jobtype"], bare_subjob + ) - new_logfile = os.path.join(logfile_dir, logfile_name) + new_logfile = os.path.join(logfile_dir, logfile_name) + else: + new_logfile = "logfile_please_set_name.txt" scriptdir = subjob_config.get("script_dir", False) script = subjob_config.get("script", False) diff --git a/src/esm_runscripts/logfiles.py b/src/esm_runscripts/logfiles.py index 102c0d233..c38eae27e 100644 --- a/src/esm_runscripts/logfiles.py +++ b/src/esm_runscripts/logfiles.py @@ -11,10 +11,13 @@ def initialize_logfiles(config, org_jobtype): log_stuff = False if os.path.isdir(os.path.dirname(config["general"]["experiment_log_file"])): if not org_jobtype == "inspect": + #if not 
os.path.isfile(config["general"]["experiment_log_file"]): log_stuff = True + # Set name of logfile into config config = set_logfile_name(config, "") + # Writes some line to the logfile defined in config. if log_stuff: helpers.write_to_log( @@ -24,9 +27,10 @@ def initialize_logfiles(config, org_jobtype): logfile_run_number, str(config["general"]["current_date"]), str(config["general"]["jobid"]), - "- start", + "- start in initialize_logfiles", ], ) + # Creates a logfile object/handle for stdout/phase log file logfile = RuntimeLogger( config["general"]["logfile_path"], "w", @@ -35,6 +39,7 @@ def initialize_logfiles(config, org_jobtype): else: logfile = sys.stdout + # Writes logfile handle into global variable logfile_handle = logfile return config @@ -52,7 +57,7 @@ def finalize_logfiles(config, org_jobtype): logfile_run_number, str(config["general"]["current_date"]), str(config["general"]["jobid"]), - "- done", + "- done in finalize_logfiles", ], ) diff --git a/src/esm_runscripts/prepare.py b/src/esm_runscripts/prepare.py index c8007421c..37debc20d 100644 --- a/src/esm_runscripts/prepare.py +++ b/src/esm_runscripts/prepare.py @@ -68,10 +68,53 @@ def _read_date_file(config): date = config["general"].get("initial_date", "18500101") run_number = 1 write_file = True + + date_c = config["general"].get("current_date", None) + + if date_c is not None: + date_fdf = Date(date) + date_c = Date(str(config["general"]["current_date"])) + run_number_c = int(config["general"]["run_number"]) + last_jobtype = config["general"].get("last_jobtype", "") + isresubmitted = last_jobtype == config["general"]["jobtype"] + + if date_fdf != date_c: + + msg = ( + f"``Date`` and ``run_number`` are ``not`` taken from date file, " + f"but from command_line argument (provided by -s or --start_date). " + f"The given start_date ({date_c}) and run_number ({run_number_c}) " + f"are different from the values " + f"in the current date file of your experiment ({date}, {run_number}). 
" + f"Your experiment may now be in a non consecutive state. " + f"Please confirm if you want to continue:" + ) + esm_parser.user_note("Detached experiment:", msg) + proceed = "" + if isresubmitted: + proceed = questionary.select( + "Do you want to continue?", + choices=[ + f"Yes, with date from command line argument: {str(config['general']['current_date'])}", + f"Yes, with date from date file: {date}", + "No, cancel." + ]).ask() + + if 'Yes, with date from command line argument' in proceed: + date = str(date_c) + run_number = run_number_c + elif 'Yes, with date from date file' in proceed: + date = date + run_number = run_number + else: + esm_parser.user_note("The experiment will be cancelled:", f"You cancelled the experiment due to date discrepancies.") + sys.exit(1) + config["general"]["run_number"] = run_number config["general"]["current_date"] = date logging.info("current_date = %s", date) logging.info("run_number = %s", run_number) + return config @@ -274,7 +317,7 @@ def _initialize_calendar(config): if config["general"]["reset_calendar_to_last"]: config = find_last_prepared_run(config) config = set_most_dates(config) - if not "iterative_coupling" in config["general"]: + if "iterative_coupling" not in config["general"]: config["general"]["chunk_number"] = 1 if config["general"]["run_number"] == 1: @@ -346,7 +389,7 @@ def set_leapyear(config): config["general"]["leapyear"] = config[model]["leapyear"] break - if not "leapyear" in config["general"]: + if "leapyear" not in config["general"]: for model in config["general"]["valid_model_names"]: config[model]["leapyear"] = True config["general"]["leapyear"] = True @@ -634,39 +677,39 @@ def set_parent_info(config): # Make sure "ini_parent_dir" and "ini_restart_dir" both work: for model in config["general"]["valid_model_names"]: # If only ini_restart_* variables are used in runcscript, set ini_parent_* to the same values - if not "ini_parent_dir" in config[model]: + if "ini_parent_dir" not in config[model]: if 
"ini_restart_dir" in config[model]: config[model]["ini_parent_dir"] = config[model]["ini_restart_dir"] - if not "ini_parent_exp_id" in config[model]: + if "ini_parent_exp_id" not in config[model]: if "ini_restart_exp_id" in config[model]: config[model]["ini_parent_exp_id"] = config[model]["ini_restart_exp_id"] - if not "ini_parent_date" in config[model]: + if "ini_parent_date" not in config[model]: if "ini_restart_date" in config[model]: config[model]["ini_parent_date"] = config[model]["ini_restart_date"] # check if parent is defined in esm_tools style # (only given for setup) setup = config["general"]["setup_name"] - if not setup in config: + if setup not in config: setup = "general" if "ini_parent_exp_id" in config[setup]: for model in config["general"]["valid_model_names"]: - if not "ini_parent_exp_id" in config[model]: + if "ini_parent_exp_id" not in config[model]: config[model]["ini_parent_exp_id"] = config[setup]["ini_parent_exp_id"] if "ini_parent_date" in config[setup]: for model in config["general"]["valid_model_names"]: - if not "ini_parent_date" in config[model]: + if "ini_parent_date" not in config[model]: config[model]["ini_parent_date"] = config[setup]["ini_parent_date"] if "ini_parent_dir" in config[setup]: for model in config["general"]["valid_model_names"]: - if not "ini_parent_dir" in config[model]: + if "ini_parent_dir" not in config[model]: config[model]["ini_parent_dir"] = ( config[setup]["ini_parent_dir"] + "/" + model ) # Get correct parent info for model in config["general"]["valid_model_names"]: - if config[model]["lresume"] == True and config["general"]["run_number"] == 1: + if config[model]["lresume"] is True and config["general"]["run_number"] == 1: config[model]["parent_expid"] = config[model]["ini_parent_exp_id"] if "parent_date" not in config[model]: config[model]["parent_date"] = config[model]["ini_parent_date"] @@ -726,6 +769,7 @@ def add_vcs_info(config): yaml.dump(vcs_versions, f) return config + def 
check_vcs_info_against_last_run(config): """ Ensures that the version control info for two runs is identical between the @@ -777,7 +821,6 @@ def check_vcs_info_against_last_run(config): If you are **sure** that this is OK, you can set 'general.allow_vcs_differences' to True to avoid this check. """) - return config @@ -811,7 +854,7 @@ def initialize_batch_system(config): def initialize_coupler(config): - if config["general"]["standalone"] == False: + if config["general"]["standalone"] is False: from . import coupler base_dir = config["general"]["base_dir"] @@ -882,6 +925,7 @@ def check_config_for_warnings_errors(config): return config + def warn_error(config, trigger, note_function): """ Checks the ``sections`` of the ``config`` for a given ``trigger`` (``"error"`` or @@ -940,7 +984,7 @@ def warn_error(config, trigger, note_function): Method to report the note """ # Sufixes for the warning special case - if trigger=="warning": + if trigger == "warning": sufix_name = f" WARNING" else: sufix_name = f"" @@ -967,7 +1011,7 @@ def warn_error(config, trigger, note_function): # needs to halt, and the user has not defined the # ``--ignore-config-warnings`` flag in the ``esm_runscripts`` call if ( - trigger=="warning" + trigger == "warning" and config["general"].get("isinteractive") and action_info.get("ask_user_to_continue", False) and not config["general"].get("ignore_config_warnings", False) diff --git a/src/esm_runscripts/prepexp.py b/src/esm_runscripts/prepexp.py index 75d32dd84..1b983940a 100644 --- a/src/esm_runscripts/prepexp.py +++ b/src/esm_runscripts/prepexp.py @@ -179,7 +179,7 @@ def _call_esm_runscripts_internally(config, command, exedir): non_interaction_flags = [ "--no-motd", f"--last-jobtype {config['general']['jobtype']}", - f"-t {config['general']['jobtype']}" + f"--phase {config['general']['jobtype']}" ] for ni_flag in non_interaction_flags: # prevent continuous addition of ``ni_flag`` @@ -356,9 +356,15 @@ def initialize_experiment_logfile(config): 
it_coupled_model = config["general"]["iterative_coupled_model"] datestamp = config["general"]["run_datestamp"] - if config["general"]["run_number"] == 1: - if os.path.isfile(config["general"]["experiment_log_file"]): - os.remove(config["general"]["experiment_log_file"]) + fromdir = os.path.realpath(config["general"]["started_from"]) + scriptsdir = os.path.realpath(config["general"]["experiment_scripts_dir"]) + +# if (fromdir == scriptsdir): + # TODO: Check the next if statements + if not config["general"]["run_from_batch_script"]: + #if config["general"]["run_number"] == 1: + #if os.path.isfile(config["general"]["experiment_log_file"]): + # os.remove(config["general"]["experiment_log_file"]) log_msg = f"# Beginning of Experiment {expid}" write_to_log(config, [log_msg], message_sep="") @@ -370,7 +376,7 @@ def initialize_experiment_logfile(config): str(config["general"]["run_number"]), str(config["general"]["current_date"]), str(config["general"]["jobid"]), - "- start", + "- start in prepexp", ], ) diff --git a/src/esm_runscripts/resubmit.py b/src/esm_runscripts/resubmit.py index daa15047d..65cd081a8 100644 --- a/src/esm_runscripts/resubmit.py +++ b/src/esm_runscripts/resubmit.py @@ -5,8 +5,21 @@ from . import chunky_parts from . 
import workflow +import pdb def submit(config): + """ + Submits a jobscript to the batch system by calling os.system + + Arguments + --------- + config : dict + + Returns + ------- + config : dict + """ + if config["general"]["verbose"]: print("\n", 40 * "+ ") print("Submitting jobscript to batch system...") @@ -22,15 +35,43 @@ def submit(config): def resubmit_batch_or_shell(config, batch_or_shell, cluster=None): + """ + - Creates a submit_commant and sets it to config depending on kind of submission (batch or shell) + - Calls function submit to acually submitting the shell or batch command + + Arguments + --------- + config : dict + batch_or_shell : Bool + cluster : (optional) + + Returns + ------- + config : dict + """ + config = config["general"]["batch"].write_simple_runscript( config, cluster, batch_or_shell ) + # Checks, if not submitted with option -c in esm_runscript call (check run) if not check_if_check(config): config = submit(config) return config def resubmit_SimulationSetup(config, cluster=None): + """ + Resubmitting a workflow phase/cluster that is of type SimulationSetup + - Initialize the cluster as a new SimulationSetup object + + Arguments + --------- + config : dict + cluster : str (optional: name of cluster) + Returns + ------- + config : dict + """ monitor_file = logfiles.logfile_handle # Jobs that should be started directly from the compute job: @@ -60,6 +101,7 @@ def resubmit_SimulationSetup(config, cluster=None): cluster_obj.config[f"{cluster}_update_{jobtype}_config_before_resubmit"] ) + # Checks, if not submitted with option -c in esm_runscript call (check run) if not check_if_check(config): monitor_file.write(f"Calling {cluster} job:\n") @@ -69,14 +111,29 @@ def resubmit_SimulationSetup(config, cluster=None): def get_submission_type(cluster, config): - # Figure out if next job is resubmitted to batch system, - # just executed in shell or invoked as new SimulationSetup - # object + """ + Figure out if next job is + - resubmitted to batch 
system, + - just executed in shell or + - invoked as new SimulationSetup object + + Arguments + --------- + cluster : str (name of cluster) + config : dict + + Returns + ------- + submission_type : str + """ clusterconf = config["general"]["workflow"]["subjob_clusters"][cluster] if clusterconf.get("submit_to_batch_system", False): submission_type = "batch" + # This information should come from the config of the cluster/workflow phase + # This information is given in batch_or_shell attribute of workflow phase/cluster + # TODO: Make this a function of workflow manager??? elif cluster in ["newrun", "prepcompute", "tidy", "inspect", "viz"]: submission_type = "SimulationSetup" else: @@ -86,6 +143,17 @@ def get_submission_type(cluster, config): def end_of_experiment(config): + """ + Checks if it is the end of the experiment. + + Arguments + --------- + config + + Returns + ------- + True or False + """ if config["general"]["next_date"] >= config["general"]["final_date"]: monitor_file = logfiles.logfile_handle monitor_file.write("Reached the end of the simulation, quitting...\n") @@ -96,6 +164,17 @@ def end_of_experiment(config): def end_of_experiment_all_models(config): + """ + Checks if end of experiment is reached and everything is done + + Arguments + --------- + config : dict + + Returns + ------- + True or False + """ index = 1 expid = config["general"]["expid"] while "model" + str(index) in config["general"]["original_config"]: @@ -131,6 +210,17 @@ def end_of_experiment_all_models(config): def check_if_check(config): + """ + Will check if esm_runscripts has been called with option -c (check run only) + + Arguments + --------- + config : dict + + Returns + ------- + True or False + """ if config["general"]["check"]: print( "Actually not submitting anything, this job preparation was launched in 'check' mode (-c)." 
@@ -142,51 +232,129 @@ def check_if_check(config): def maybe_resubmit(config): - - jobtype = config["general"]["jobtype"] - - nextrun = resubmit_recursively(config, jobtype=jobtype) - - if nextrun: # submit list contains stuff from next run - - config = _increment_date_and_run_number(config) - config = _write_date_file(config) - - if end_of_experiment(config): - if config["general"].get("iterative_coupling", False): - if end_of_experiment_all_models(config): - return config - else: - # config = chunky_parts._update_chunk_date_file(config) - return config - - cluster = config["general"]["workflow"]["first_task_in_queue"] - nextrun = resubmit_recursively( - config, list_of_clusters=[cluster], nextrun_in=True + """ + If nextrun is started, + - calls function to increment date and run_number + - calls function to write new date file + If it recognizes that it was actually the last run + - returns if end of the experiment (if not iterative_coupling) + + Arguments + --------- + config : dict + + Returns + ------- + config : dict + """ + task = config["general"].get("task", None) + # if task is start, restart, run_workflow -> write new *.run file +# if end_of_experiment(config): +# print('test') + if task in ["start", "run_workflow", "restart"]: + jobtype = config["general"]["jobtype"] # current phase + workflow = config["general"]["workflow"]["object"] + default_cluster = workflow.default_cluster + phases = workflow.clusters[default_cluster]["phases"] + config = config["general"]["batch"].write_run_batch_script( + config, default_cluster, 'batch' ) + print("Create *.run file") + + if not check_if_check(config): + config = submit(config) + + elif task in ["run_phase"]: + # check if phase type compute + print("execute a phase") + + # TODO: Check if run from *.run file + # TODO: Create *.run file + + # check if nextrun starts???
+ # this resubmits any following jobtypes/phases until nextrun is true + # here nextrun is always set to true (if resubmit_recursively is finished) + + # cases: 1. it is the beginning of (next) run: + # - resubmit_recursively returns true but does not do anything except for returning true + # - check if end of simulation -> return + # - returns if iterative coupling, why ??? + # - if not end of simulation and not iterative_coupling -> calls itself again with nextrun_in=True which leads to case 2. + # 2. it is NOT the beginning of (next) run: + # it will start to loop over all remaining clusters to check if it can submit something (SimulationSetup, sbatch, shell) and do so, + # until first start of next run is reached. + # 3. nextrun is false if no entries in next_submit for that particular jobtype/cluster + + #nextrun = resubmit_recursively(config, jobtype=jobtype) + + #if nextrun: # submit list contains stuff from next run + + # config = _increment_date_and_run_number(config) + # config = _write_date_file(config) + + # if end_of_experiment(config): + # if config["general"].get("iterative_coupling", False): + # # If not iterative coupling + # # check if end of experiment for all models + # # if not??? + # if end_of_experiment_all_models(config): + # return config + # else: + # # config = chunky_parts._update_chunk_date_file(config) + # return config + + # #cluster = config["general"]["workflow"]["first_task_in_queue"] + # ## For what is nextrun here needed? + # #nextrun = resubmit_recursively( + # # config, list_of_clusters=[cluster], nextrun_in=True + # #) return config def resubmit_recursively(config, jobtype=None, list_of_clusters=None, nextrun_in=False): + """ + - Reads in a list of all clusters (next_submit) in a workflow of a given jobtype (if not passed as argument) + - Checks if cluster is going to be skipped + - Gets the submission_type of cluster and calls the corresponding resubmit function + - If cluster is skipped, calls this function again ???
+ - What is nextrun_in for? What if true? If within a run??? + - When could cluster be first_task_in_queue and nextrun_in=true? + + Arguments + --------- + config : dict + jobtype : (optional) + list_of_clusters: (optional) + nextrun_in: (optional) + + Returns + ------- + nextrun : Boolean + """ nextrun = False - + # get a list of clusters that follow the current jobtype if not list_of_clusters: list_of_clusters = config["general"]["workflow"]["subjob_clusters"][ jobtype ].get("next_submit", []) for cluster in list_of_clusters: + # if beginning of next run if ( cluster == config["general"]["workflow"]["first_task_in_queue"] and not nextrun_in ): nextrun = True + # if not at the beginning of a run else: + # and cluster is not going to be skipped if not workflow.skip_cluster(cluster, config): submission_type = get_submission_type(cluster, config) if submission_type == "SimulationSetup": + # create the SimulationSetup object for the this/next jobtype resubmit_SimulationSetup(config, cluster) + # or submits to batch or shell if not check run elif submission_type in ["batch", "shell"]: resubmit_batch_or_shell(config, submission_type, cluster) else: @@ -199,6 +367,20 @@ def resubmit_recursively(config, jobtype=None, list_of_clusters=None, nextrun_in def _increment_date_and_run_number(config): + """ + - Incrementing + - date by adding "delta_date" to "cuirrent_date" + - run_number by adding +1 + - Updating config + + Arguments + --------- + config : dict + + Returns + ------- + config : dict + """ config["general"]["run_number"] += 1 config["general"]["current_date"] += config["general"]["delta_date"] @@ -216,6 +398,17 @@ def _increment_date_and_run_number(config): def _write_date_file(config): # self, date_file=None): + """ + Writes new date file for experiment. 
+ + Arguments + --------- + config : dict + + Returns + ------- + config : dict + """ # monitor_file = config["general"]["logfile"] monitor_file = logfiles.logfile_handle diff --git a/src/esm_runscripts/sim_objects.py b/src/esm_runscripts/sim_objects.py index 41c8be5e3..cd465d393 100644 --- a/src/esm_runscripts/sim_objects.py +++ b/src/esm_runscripts/sim_objects.py @@ -15,6 +15,7 @@ import esm_parser +import pdb class SimulationSetup(object): @@ -125,43 +126,42 @@ def __call__(self, kill_after_submit=True): # self.pseudocall(kill_after_submit) # call to observe here.. org_jobtype = str(self.config["general"]["jobtype"]) - self.config = logfiles.initialize_logfiles(self.config, org_jobtype) - - if self.config["general"]["submitted"]: - old_stdout = sys.stdout - old_stderr = sys.stderr - sys.stdout = logfiles.logfile_handle - sys.stderr = logfiles.logfile_handle - - if self.config["general"]["jobtype"] == "prepcompute": - self.prepcompute() - elif self.config["general"]["jobtype"] == "tidy": - self.tidy() - elif self.config["general"]["jobtype"] == "viz": - self.viz() - elif self.config["general"]["jobtype"].startswith("observe"): - pid = self.config["general"]["command_line_config"].get( - "launcher_pid", -666 - ) - if not pid == -666: - self.observe() - - self.config["general"]["jobtype"] = self.config["general"][ - "jobtype" - ].replace("observe_", "") - # that last line is necessary so that maybe_resubmit knows which - # cluster to look up in the workflow - - else: - self.assembler() + # write *.run file + # submit batch script resubmit.maybe_resubmit(self.config) - self.config = logfiles.finalize_logfiles(self.config, org_jobtype) - if self.config["general"]["submitted"]: - sys.stdout = old_stdout - sys.stderr = old_stderr + if self.config["general"]["task"] == "run_phase": + # Writes to general log file and creates a (global) logile handle to logfile for current jobtype + self.config = logfiles.initialize_logfiles(self.config, org_jobtype) + + # if not check 
run??? + # set stdout and stderr to lofile + if self.config["general"]["submitted"]: + old_stdout = sys.stdout + old_stderr = sys.stderr + sys.stdout = logfiles.logfile_handle + sys.stderr = logfiles.logfile_handle + + if self.config["general"]["task"].startswith("observe"): + pid = self.config["general"]["command_line_config"].get( + "launcher_pid", -666 + ) + if not pid == -666: + self.observe() + else: + try: + getattr(self, self.config["general"]["jobtype"])() + except AttributeError: + print(f"No method for jobtype {self.config['general']['jobtype']} found.") + + # if this line is reached, the run is submitted and running or finished + self.config = logfiles.finalize_logfiles(self.config, org_jobtype) + + if self.config["general"]["submitted"]: + sys.stdout = old_stdout + sys.stderr = old_stderr if kill_after_submit: if self.config["general"].get("experiment_over", False): diff --git a/src/esm_runscripts/workflow.py b/src/esm_runscripts/workflow.py index b677ea507..13d3c0f55 100644 --- a/src/esm_runscripts/workflow.py +++ b/src/esm_runscripts/workflow.py @@ -1,8 +1,699 @@ -import sys, copy, os +import copy import esm_parser +from . import batch_system + +#import pygraphviz as pgv +import pdb + + +class Workflow: + """A workflow class.""" + + def __init__(self, workflow_yaml): + """ + Create a new workflow. + + Parameters + ---------- + workflow_yaml : dict + Dictionary from defaults.yaml to initialize workflow + for default phases. 
+ + Returns + ------- + none + """ + + # TODO: check if key is in workflow_yaml dict + self.phases = [] # list for default phases (defined in defaults.yaml) + self.user_phases = [] # list of user phases (collected by collect_all_user_phases) + self.clusters = {} # dictionary of clusters + + error = False + self.clusters = workflow_yaml.get('clusters', None) + self.default_cluster = workflow_yaml.get('default_cluster', None) + + # Initialize default workflow phases from defaults.yaml + for phase in workflow_yaml.get('phases', []): + self.phases.append(WorkflowPhase(workflow_yaml['phases'][phase])) + +# if "first_task_in_queue" in workflow_yaml: self.first_task_in_queue = workflow_yaml["first_task_in_queue"] +# else: error = True +# if "last_task_in_queue" in workflow_yaml: self.last_task_in_queue = workflow_yaml["last_task_in_queue"] +# else: error = True +# if "next_run_triggered_by" in workflow_yaml: self.next_run_triggered_by = workflow_yaml["next_run_triggered_by"] +# else: error = True +# if "default_cluster" in workflow_yaml: self.default_cluster = workflow_yaml["default_cluster"] +# else: error = True +# +# if error: +# err_msg = ( +# f"Missing workflow keywords. " +# f"Make sure the following keywords are set in defaults.yaml: " +# f"``first_task_in_queue``, ``last_task_in_queue``, ``next_run_triggered_by``." +# ) +# esm_parser.user_error("ERROR", err_msg) + + def get_workflow_phase_by_name(self, phase_name): + """ + Returns phase of phase_name + + Arguments + --------- + self : class Workflow + phase_name : str (name of the phase to be returned) + + Returns + ------- + phase : class phase or user_phase + """ + for phase in self.phases + self.user_phases: + if phase["name"] == phase_name: + return phase + + def get_phases_values_list(self, phase_type, keyword): + """ + Returns a certain attribute for all phases as a list.
+ + Parameters + ---------- + phase_type : str + ``default`` or ``user`` + keyword : str + + Returns + ------- + phases_values : list + """ + if phase_type == 'user': + phases_values = [phase[keyword] for phase in self.user_phases] + else: + phases_values = [phase[keyword] for phase in self.phases] + + return phases_values + + +# def set_default_nproc(self, config): +# """ +# Calculating the number of mpi tasks for default phases and each component/model/script +# +# Parameters +# ---------- +# config : dict +# +# Returns +# ------- +# self : Workflow object +# """ +# +# # Get the sum of all mpi tasks +# tasks = calc_number_of_tasks(config) +# +# # Write this number of tasks to phase, if +# # phase will be submitted to batch system +# for ind, phase in enumerate(self.phases): +# if phase["submit_to_batch_system"]: +# set_value(phase, "nproc", tasks) +# +# return self + + def set_workflow_attrib(self, attrib, value): + """ + Sets a workflow attribute. + + Parameters + ---------- + attrib : str + value : + + Returns + ------- + None + """ + + if type(getattr(self, attrib)).__name__ == list: + self.__dict__[attrib].append(value) + else: + self.__setattr__(attrib, value) + + def check_if_keyword_is_valid(self, keyword): + """ + Checks if the key given for a user workflow is valid. + Only keywords are allowed, that are already set during + initialization. + + Parameters + ---------- + keyword : str + + Returns + ------- + true or false + """ + + return hasattr(self, keyword) + + def collect_all_user_phases(self, config): + """ + Collect all workflows defined in config files. 
+ + Parameters + ---------- + self : Workflow object + config : dict + + Returns + ------- + self : Workflow object + """ + user_workflow_phases = [] + user_workflow_phases_names = [] + user_workflow_next_run_triggered_by = [] + for model in config: + if "workflow" in config[model]: + w_config = config[model]["workflow"] + if "phases" in w_config: + # check if there are still workflow keywords set (except 'phases') + for key, value in w_config.items(): + if not key == "phases": + err_msg = f"``{key}`` is not allowed to be set for a workflow." + esm_parser.user_error("ERROR", err_msg) + for phase_name in w_config["phases"]: + # each phase (of a model/setup) needs to have an unique name + # same phases of the same model/setup defined in different config files + # are overwritten by the usual config file hierarchy + # user phases are not alowed to have the same name as default phases (e.g. compute) + + # check if ``new_phase`` is already defined as a default phase + # look for the name of the current phase in the list of default phase names + # if found, raise exception + + if phase_name in self.get_phases_values_list("default", "name"): + err_msg = ( + f"The user phase ``{phase_name}`` " + f"has the same name as a default workflow phase. " + f"This is not allowed." + ) + esm_parser.user_error("ERROR", err_msg) + + # check if the name of the new user phase (for a model/setup) does not already exist + # (for another model/setup). + if phase_name in user_workflow_phases_names: + err_msg = ( + f"Two workflow phases have the same name " + f"``{phase_name}``." 
+ ) + esm_parser.user_error("ERROR", err_msg) + + # if user phase (for each setup/model) has a non-default and unique name + else: + phase_config = copy.deepcopy(w_config["phases"][phase_name]) + # add phase name + phase_config["name"] = phase_name + # set cluster of physe to default cluster if not set + phase_config["cluster"] = phase_config.get("cluster", self.default_cluster) + # make sure that batch_or_shell is set to batch if submit_to_batch is true + # should not be set by user. TODO: Remove from documentation + #if phase_config.get("submit_to_batch_system", False): + # phase_config["batch_or_shell"] = "batch" + # # check if run_on_queue is given if submit_to_sbatch is true +# # if not phase_config.get("run_on_queue", False): +# # err_msg = f"No value for target queue given by ``run_on_queue`` for phase ``{phase_name}``." +# # esm_parser.user_error("ERROR", err_msg) + #else: + # phase_config["batch_or_shell"] = "shell" + + # create a new user phase object for ``phase`` + new_phase = WorkflowPhase(phase_config) + + # append it to the list of user phases of the workflow + user_workflow_phases.append(new_phase) + user_workflow_phases_names.append(phase_name) + + # collect all user phases that are set to trigger the next run + if phase_config.get("trigger_next_run", False): + user_workflow_next_run_triggered_by.append(phase_name) + # check if more than one user phase has set trigger_next_run to true + #if len(user_workflow_next_run_triggered_by) > 1: + # err_msg = ( + # f"More than one phase is set to " + # f"trigger the next run: ``{user_workflow_next_run_triggered_by}``. " + # f"Only set ``trigger_next_run: True`` for one phase." 
+ # ) + # esm_parser.user_error("ERROR", err_msg) + #elif user_workflow_next_run_triggered_by: + # self.set_workflow_attrib("next_run_triggered_by", user_workflow_next_run_triggered_by[0]) + + # add user phases to workflow + self.set_workflow_attrib("user_phases", user_workflow_phases) + + # check if there are unknown phases, if yes, will give error exception + unknown_phases = self.check_unknown_phases() + if unknown_phases: + unknowns = ', '.join(unknown_phases) + err_msg = ( + f"Unknown phase(s) ``{unknowns}`` defined as ``run_before`` " + f"or ``run_after``." + ) + esm_parser.user_error("ERROR", err_msg) + + # check if run_after or run_before is set for each user phase + # if not, run_after will be set to last default phase + for user_phase in self.user_phases: + if not user_phase["run_before"] and not user_phase["run_after"]: + set_value(user_phase, "run_after", self.phases[-1]["name"]) + err_msg = ( + f"No value given for ``run_after`` or ``run_before`` " + f"of user phase ``{user_phase['name']}``. " + f"Please set either run_after or run_before." + ) + esm_parser.user_error("NOTE", err_msg) + + return self + + def cluster_phases(self): + """ + Merge phases into clusters. 
+ + - Appends new/user phases to cluster + - If a cluster is not defined for a new phase, it wil be appended to the default cluster (sim_cluster) + - If a cluster is defined for a new phase, it will be added to the workflow object + """ + clusters = self.clusters + # create an empty phases list for each new cluster + #for cluster_name in self.get_phases_values_list("default", "cluster") + self.get_phases_values_list("user", "cluster"): + for cluster_name in self.get_phases_values_list("user", "cluster"): + if cluster_name not in clusters: + clusters[cluster_name] = {"phases": []} + + # collect all new/user phases that are within the same cluster + #for phase in self.phases + self.user_phases: + # append user phases to their clusters and create new cluster + #for phase in self.user_phases: + # clusters[phase["cluster"]]["phases"].append(phase["name"]) + +# for cluster_name in clusters: +# nproc = nproc_sum = nproc_max = 0 +# # if only one phase in cluster +# if len(clusters[cluster_name]["phases"]) == 1: +# phase_name = clusters[cluster_name]["phases"][0] +# phase = self.get_workflow_phase_by_name(phase_name) +# clusters[cluster_name].update(phase) +# # if more than one phase are within the same cluster +# else: +# # fill in default phase keys for each cluster to cluster dictionary +# #clusters[cluster_name].update(WorkflowPhase({})) +# # create a list of all phases (dicts) that are within the same cluster +# phases_list = [] +# for phase_name in clusters[cluster_name]["phases"]: +# phases_list.append(self.get_workflow_phase_by_name(phase_name)) +# +# # check for inconsistencies of phase keywords within a cluster +# # collect all values for keywords of WorkflowPhase in a dictionary 'keywords' +# #keywords = {} +# #for key in WorkflowPhase({}): +# # keywords[key] = [] +# # # append keyword of a phase only if not already in keywords[key] +# # [keywords[key].append(item) for item in [phase[key] for phase in phases_list] if item not in keywords[key]] +# # # if there are 
no inconsistencies, all phases have the same values for a keyword 'key' +# # if len(keywords[key]) == 1: +# # clusters[cluster_name][key] = keywords[key][0] +# # # if different phases have set different values for the same keyword +# # else: +# # # if keyword is of type list, just add the list into the cluster +# # if type(clusters[cluster_name][key]) is list: +# # clusters[cluster_name][key] = keywords[key] +# # # otherwise select a single value for keyword +# # else: +# # # TODO: Explain this exception handling more +# # if key not in ["name", "script", "script_dir", "order_in_cluster", "nproc", "trigger_next_run"]: +# # err_msg = ( +# # f"Mismatch for {key}") +# # esm_parser.user_error("ERROR", err_msg) +# # elif key == "name": +# # # set keyword name to the name of the cluster +# # clusters[cluster_name]["name"] = cluster_name +# # elif key == "trigger_next_run": +# # # set key of cluster to True if key for any (at least one) of the phases is set to True +# # clusters[cluster_name][key] = any(keywords[key]) +## # elif key in ["script", "script_dir"]: +## # for ind, phase_name in enumerate(clusters[cluster_name]["phases"]): +## # phase = self.get_workflow_phase_by_name(phase_name) +## # phase_dict = {phase["name"]: {"script": phase["script"], "script_dir": phase["script_dir"]}} +## # clusters[cluster_name]["phases"][ind] = phase_dict +# # else: +# # # if key is set different for each phase in same cluster set to fill value (e.g. 
for script, scriptdir) +# # clusters[cluster_name][key] = "check phase" +# +# ## calculate nproc if cluster is to be submitted to sbatch system +# #for phase in phases_list: +# # nproc_sum += phase["nproc"] +# # nproc_max = max(phase["nproc"], nproc_max) +# +# # if clusters[cluster_name].get("submit_to_batch_system", False): +# # if phase["order_in_cluster"] == "concurrent": +# # if clusters[cluster_name]["order_in_cluster"] is None: +# # clusters[cluster_name]["order_in_cluster"] = "concurrent" +# # nproc = nproc_sum +# # else: +# # clusters[cluster_name]["order_in_cluster"] = "sequential" +# # nproc = nproc_max +# #clusters[cluster_name]["nproc"] = nproc +# # write clusters dictionary to workflow object attribute +# # update cluster phases +# cluster_2 = self.clusters +# self.set_workflow_attrib("clusters", clusters) + return self + + def write_to_config(self, config): + """ + Write to config. + TODO: Rename ``subjobs`` to ``phases``. But this needs changes also in resubmit.py and other files??? + TODO: Put workflow object into config. + """ + # Delete unnecessary config workflow entries (e.g. 
in general) + if "workflow" in config["general"]: + del config["general"]["workflow"] + + config["general"]["workflow"] = {} + config["general"]["workflow"].update(self.__dict__) + + # Write clusters + config["general"]["workflow"]["subjob_clusters"] = {} + for cluster in self.clusters: + config["general"]["workflow"]["subjob_clusters"][cluster] = {} + config["general"]["workflow"]["subjob_clusters"][cluster]["subjobs"] = [] + for phase_name in self.clusters[cluster]["phases"]: + config["general"]["workflow"]["subjob_clusters"][cluster]["subjobs"].append(phase_name) + for att in self.clusters[cluster]: + config["general"]["workflow"]["subjob_clusters"][cluster][att] = self.clusters[cluster][att] + + # Write subjobs/phases + config["general"]["workflow"]["subjobs"] = {} + for phase in self.phases + self.user_phases: + config["general"]["workflow"]["subjobs"][phase["name"]] = {} + for key, val in phase.items(): + config["general"]["workflow"]["subjobs"][phase["name"]][key] = val + + # Delete phases and user_phases + del config["general"]["workflow"]["phases"] + del config["general"]["workflow"]["user_phases"] + + + return config + + def check_user_workflow_dependency(self): + """ + Check whether the user defined workflow phases are independent + from each other or not. + + Arguments + --------- + self : Workflow object + + Returns + ------- + independent : bool (default: False) + """ + independent = False + user_phases_names = self.get_phases_values_list('user', 'name') + run_after_list = self.get_phases_values_list('user', 'run_after') + run_before_list = self.get_phases_values_list('user', 'run_before') + + # All user phases are independent from each other, if + # none of the ``user_phases_names`` are found in the union of + # ``run_before_list`` and ``run_after_list`` + # That means alls user phases can be run independent from each other. 
+ if not set(user_phases_names).intersection(set(run_after_list).union(set(run_before_list))): + independent = True + else: + # TODO: What to do in the other case? + independent = False + + return independent + + def check_unknown_phases(self): + """ + Check if any user phase keyword (run_after, run_before) points to an unknown workflow phase. + + Parameters + ---------- + self : Workflow object + + Returns + ------- + unknown_phases : set + """ + unknown_phases = [] + phases_names = self.get_phases_values_list('default', 'name') # list of names of all default phases + user_phases_names = self.get_phases_values_list('user', 'name') # list of names of all user phases + run_after = self.get_phases_values_list('user', 'run_after') # list of all run_after values for all user phases + run_before = self.get_phases_values_list('user', 'run_before') # list of all run_before values for all user phases + # Filter out all elements that are None + # ``filter(None, anylist)`` will filter out all items of anylist, + # for which ``if item`` is false (e.g. [], "", None, {}, ''). + # See also https://docs.python.org/3/library/functions.html#filter + run_after_list = list(filter(None, run_after)) + run_before_list = list(filter(None, run_before)) + # Get all phases that are defined as run_after or run_before, + # but do not exist as user or default phase. + # If unknown_phase is not empty, there is a user_phase that defines run_after + # or run_before for a non-existing phase. + unknown_phases = set(run_after_list).union(set(run_before_list)).difference(set(user_phases_names).union(set(phases_names))) + + return unknown_phases + + def order_phases_in_clusters(self): + """ + Put the phases and clusters in the right order.
+ + Parameters + ---------- + self : Workflow object + + Returns + ------- + self : Workflow object + """ +# correct workflow attributes (``last_task_in_queue``, ``first_task_in_queue``, ``next_run_triggered``) + + # next_run_triggered_by is always the last phase + + # check if next_triggered is set to a default or user phase + # if user phase + # get last default phase and correct next_submit and run_before + # get first default phase and correct run_after, called_from + # correct last_task_in_queue of workflow + + + clusters = self.clusters + for cluster_name in clusters: + cluster = clusters[cluster_name] + user_phases = self.user_phases + for user_phase in user_phases: + user_run_after = user_phase.get("run_after", None) + if user_run_after: + index_run_after = cluster["phases"].index(user_run_after) + print(len(cluster["phases"])) + if len(cluster["phases"]) > index_run_after+1: + if cluster["phases"][index_run_after+1] != user_phase["name"]: + cluster["phases"].insert(index_run_after+1, user_phase["name"]) + else: + cluster["phases"].append(user_phase["name"]) + #old_next_triggered = cluster["next_run_triggered_by"] + #triggered_next_run_phase = self.get_workflow_phase_by_name(old_next_triggered) +# if old_next_triggered not in self.get_phases_values_list("default", "name"): +# first_task_name = self.first_task_in_queue +# first_phase = self.get_workflow_phase_by_name(first_task_name) +# old_last_task_name = self.last_task_in_queue +# old_last_phase = self.get_workflow_phase_by_name(old_last_task_name) +# +# remove_value(old_last_phase, "next_submit", first_phase["name"]) +# set_value(old_last_phase, "next_submit", old_next_triggered) +# set_value(old_last_phase, "run_before", old_next_triggered) +# set_value(old_last_phase, "trigger_next_run", False) +# +# set_value(self.clusters[old_last_phase["cluster"]], "next_submit", triggered_next_run_phase["cluster"], if_not_in=True) +# set_value(self.clusters[old_last_phase["cluster"]], "run_before", 
triggered_next_run_phase["cluster"]) +# set_value(self.clusters[old_last_phase["cluster"]], "trigger_next_run", False) +# +# set_value(first_phase, "run_after", old_next_triggered) +# set_value(first_phase, "called_from" ,old_next_triggered) +# set_value(self.clusters[first_phase["cluster"]], "run_after", triggered_next_run_phase["cluster"]) +# set_value(self.clusters[first_phase["cluster"]], "called_from", triggered_next_run_phase["cluster"]) +# +# set_value(self.clusters[triggered_next_run_phase["cluster"]], "next_submit" , first_phase["cluster"]) +# self.clusters[triggered_next_run_phase["cluster"]]["run_before"] = first_phase["cluster"] +# self.clusters[triggered_next_run_phase["cluster"]]["run_after"] = old_last_phase["cluster"] +# +# self.set_workflow_attrib("last_task_in_queue", old_next_triggered) + + +# intergrate new user phases by correcting next_submit, called_from, run_after, run_before + + # Set "next_submit" and "called_from" + # "next_submit" which phase/cluster will be called next (run_after of the next phase) + # "called_from" name of previous phase, run_after of current phase + + # Create a dict of all phases and for all clusters with empty lists +# next_submits_phases = {} +# next_submits_clusters = {} +# for phase in self.phases + self.user_phases: +# next_submits_phases[phase["name"]] = [] +# next_submits_clusters[phase["cluster"]] = [] +# +# for phase2 in self.phases + self.user_phases: +# if phase2.get("run_after", None): +# if phase2["name"] not in next_submits_phases[phase2["run_after"]]: +# next_submits_phases[phase2["run_after"]].append(phase2["name"]) # use set_value ??? 
+# set_value(phase2, "called_from",phase2["run_after"]) +# if self.clusters[phase2["cluster"]].get("run_after", None): +# if phase2["cluster"] not in next_submits_clusters[self.clusters[phase2["cluster"]]["run_after"]]: +# next_submits_clusters[self.clusters[phase2["cluster"]]["run_after"]].append(phase2["cluster"]) +# set_value(self.clusters[phase2["cluster"]], "called_from", self.clusters[phase2["cluster"]]["run_after"]) +# else: +# # if only run_before is set, e.g. to add a phase at the beginning of a run +# if phase2.get("run_before", None): +# if phase2["run_before"] == self.first_task_in_queue: +# old_first_phase = self.get_workflow_phase_by_name(self.first_task_in_queue) +# last_phase = self.get_workflow_phase_by_name(self.last_task_in_queue) +# next_submits_phases[phase2["name"]].append(self.first_task_in_queue) +# if self.first_task_in_queue not in next_submits_clusters[phase2["cluster"]]: +# next_submits_clusters[phase2["cluster"]].append(self.first_task_in_queue) +# next_submits_clusters[self.last_task_in_queue].append(phase2["cluster"]) +# next_submits_phases[self.last_task_in_queue].append(phase2["name"]) +# next_submits_phases[self.last_task_in_queue].remove(self.first_task_in_queue) +# next_submits_clusters[last_phase["cluster"]].remove(old_first_phase["cluster"]) +# set_value(phase2, "run_after", self.last_task_in_queue) +# set_value(last_phase, "run_before", phase2["name"]) +# set_value(self.clusters[last_phase["cluster"]], "run_before", phase2["name"]) +# set_value(self.clusters[old_first_phase["cluster"]], "run_after", phase2["name"]) +# set_value(self.clusters[old_first_phase["cluster"]], "called_from", phase2["name"]) +# set_value(self.clusters[phase2["cluster"]], "called_from",last_phase["cluster"]) +# set_value(self.clusters[phase2["cluster"]], "run_after", last_phase["cluster"]) +# set_value(last_phase, "next_submit", phase2["name"]) +# +# self.set_workflow_attrib("first_task_in_queue", phase2["name"]) +# +# for cluster in self.clusters: +# 
if next_submits_clusters[cluster]: +# self.clusters[cluster]["next_submit"] = next_submits_clusters[cluster] +# +# for phase3 in self.phases + self.user_phases: +# if next_submits_phases[phase3["name"]]: +# phase3["next_submit"] = next_submits_phases[phase3["name"]] + + return self + + + def get_workflow_commands_for_run(self, config): + """ + Gets the command for each workflow phase and writes in into config. + + Parameters + ---------- + self: workflow object + config: dict + + Returns + ------- + config: dict + """ + phases = self.phases + self.user_phases + phase_type = "" + run_command = "" + run_commands = [] + + for phase in phases: + phase_type = phase.get("phase_type", None) + phase_name = phase.get("name", "") + run_command = ' '.join(batch_system.get_run_commands(config, phase_name, phase_type)) + print(run_command) + phase["run_command"] = run_command + run_commands.append(run_command) + + setattr(self, 'run_commands', run_commands) + # Write workflow object + config["general"]["workflow"]["object"] = self + return self + + + def prepend_newrun_job(self): + """ + - Creates a new cluster "newrun" if first_task_in_queue is not of + type 'SimulationSetup' + + Looks for subjob_cluster that are set by user workflow (not a 'SimulationSetup') + and are not of type 'SimulationSetup'. 
+ + Parameters + ---------- + self : Workflow object + + Returns + ------- + self : Workflow object + """ + first_task_name = self.first_task_in_queue + first_phase = self.get_workflow_phase_by_name(first_task_name) + + if not first_phase["batch_or_shell"] == "SimulationSetup": + + last_task_name = self.last_task_in_queue + last_phase = self.get_workflow_phase_by_name(last_task_name) + + # Create new default phase object + config_new_first_phase = { + "name": "newrun", + "next_submit": [first_phase["cluster"]], + "called_from": last_phase["cluster"], + "run_before": first_phase["cluster"], + "run_after": last_phase["cluster"], + "cluster": "newrun", + "batch_or_shell": "SimulationSetup", + "nproc": 1 + } + new_first_phase = WorkflowPhase(config_new_first_phase) + + # reset last_task attributes + set_value(last_phase, "next_submit", "newrun") + set_value(self.clusters[last_phase["cluster"]], "next_submit", "newrun", reset=True) + set_value(self.clusters[last_phase["cluster"]], "run_before", "newrun") + self.clusters[new_first_phase["cluster"]] = new_first_phase + set_value(self.clusters[new_first_phase["cluster"]], "phases", ["newrun"], new=True) + remove_value(last_phase, "next_submit", first_phase["cluster"]) + + # reset first_task attributes + set_value(first_phase, "called_from", "newrun") + set_value(first_phase, "run_after", "newrun") + set_value(self.clusters[first_phase["cluster"]], "called_from", "newrun") + set_value(self.clusters[first_phase["cluster"]], "run_after", "newrun") + + # reset workflow attributes + self.set_workflow_attrib("first_task_in_queue", "newrun") + + # Set new phase to beginning of default phase list + self.phases.insert(0, new_first_phase) + + return self + def skip_cluster(cluster, config): + """ + Checks if a phase/cluster can be skipped. 
+ Needed keywords: run_only, skip_chunk_number + Is called from resubmit.py + + Parameters + ---------- + self + config : dict + + Returns + ------- + True or False + """ gw_config = config["general"]["workflow"] clusterconf = gw_config["subjob_clusters"][cluster] @@ -36,398 +727,299 @@ def skip_cluster(cluster, config): return False -def assemble_workflow(config): - # - config = init_total_workflow(config) - config = collect_all_workflow_information(config) - config = complete_clusters(config) - config = order_clusters(config) - config = prepend_newrun_job(config) - - if config["general"]["jobtype"] == "unknown": - config["general"]["command_line_config"]["jobtype"] = config["general"][ - "workflow" - ]["first_task_in_queue"] - config["general"]["jobtype"] = config["general"]["workflow"][ - "first_task_in_queue" - ] - - return config - - -def display_nicely(config): - esm_parser.pprint_config(config["general"]["workflow"]) - return config - - -def prepend_newrun_job(config): - gw_config = config["general"]["workflow"] - first_cluster_name = gw_config["first_task_in_queue"] - first_cluster = gw_config["subjob_clusters"][first_cluster_name] - - if not first_cluster.get("batch_or_shell", "Error") == "SimulationSetup": - - last_cluster_name = gw_config["last_task_in_queue"] - last_cluster = gw_config["subjob_clusters"][last_cluster_name] - - new_first_cluster_name = "newrun" - new_first_cluster = { - "newrun": { - "called_from": last_cluster_name, - "run_before": first_cluster_name, - "next_submit": [first_cluster_name], - "subjobs": ["newrun_general"], - "batch_or_shell": "SimulationSetup", - } - } +class WorkflowPhase(dict): + """A workflow phase class.""" + + def __init__(self, phase): + # defaults + self["name"] = None + self["script"] = None + self["script_dir"] = None + self["nproc"] = 1 # needed + self["run_before"] = None + self["run_after"] = None +# self["trigger_next_run"] = False # needed +# self["submit_to_batch_system"] = False # needed + 
self["run_on_queue"] = None + self["cluster"] = None + self["next_submit"] = [] # needed + self["called_from"] = None # needed +# self["batch_or_shell"] = "SimulationSetup" # needed + self["phase_type"] = "SimulationSetup" # needed + self["order_in_cluster"] = None # needed ??? + self["run_only"] = None + self["skip_chunk_number"] = None + self["skip_run_number"] = None + self["call_function"] = None + self["env_preparation"] = None + self["run_command"] = None + + # check if phase keywords are valid + for key, value in phase.items(): + if key not in self: + err_msg = ( + f"``{key}`` of workflow phase " + f"``{phase['name']}`` is not a valid keyword " + f"of a workflow phase." + ) + esm_parser.user_error("ERROR", err_msg) - last_cluster["next_submit"].append("newrun") - last_cluster["next_submit"].remove(first_cluster_name) + super().__init__(phase) - first_cluster["called_from"] = "newrun" + # make sure batch_or_shell is batch for sbatch jobs +# if self.get("submit_to_batch_system", False): +# self["batch_or_shell"] = "batch" +# else: +# self["batch_or_shell"] = "shell" - gw_config["first_task_in_queue"] = "newrun" + # set cluster to phase name, if not given + if self.get("cluster", None) is None: + self["cluster"] = self["name"] - new_subjob = { - "newrun_general": { - "nproc": 1, - "called_from": last_cluster_name, - "run_before": first_cluster_name, - "next_submit": [first_cluster_name], - "subjob_cluster": "newrun", - } - } +def set_value(phase, keyword, value, if_not_in=False, reset=False, new=False): + """ + Set a value for a given keyword. + + Parameters + ---------- + phase : dict or phase object + Phase or cluster + keyword : str + value : str or list + if_not_in : boolean (optional) + False (default) - if value should always be appended. + True - if value should only be appended if not already in value list. + reset : boolean (optional) + False (default) - if only append to value list. + True - if value list should be reset with new value list. 
+ new : boolean (optional) + False (default) - for keywords that are already in phase. + True - if a new keyword should be created in phase and set to value. + """ + if not new: + if type(phase[keyword]) == list: + if if_not_in: + if value not in phase[keyword]: + phase[keyword].append(value) + elif reset: + phase[keyword] = [value] + else: + phase[keyword].append(value) + else: + phase[keyword] = value + else: + phase[keyword] = value - gw_config["subjob_clusters"].update(new_first_cluster) - gw_config["subjobs"].update(new_subjob) +def remove_value(phase, keyword, value): + """ + Remove value for keyword from phase. + + Parameters + ---------- + phase : dict or phase object + Phase or cluster + keyword : str + value : str + """ + if type(phase[keyword]) == list: + phase[keyword].remove(value) + else: + phase[keyword] = None - return config - # +def assemble_workflow(config): + from . import Workflow + """ + Assembles the workflow tasks. + Is called from the plugin recipe prepcompute. + Parameters + ---------- + config : dict -def order_clusters(config): - gw_config = config["general"]["workflow"] + Returns + ------- + config : dict + """ + # - Generate default workflow object and + # - initialize default workflow phases from defaults.yaml + workflow = init_default_workflow(config) - for subjob_cluster in gw_config["subjob_clusters"]: - if not "next_submit" in gw_config["subjob_clusters"][subjob_cluster]: - gw_config["subjob_clusters"][subjob_cluster]["next_submit"] = [] + # - Collect all user phases from runscript and config files + workflow = workflow.collect_all_user_phases(config) - for subjob_cluster in gw_config["subjob_clusters"]: - if not "run_after" in gw_config["subjob_clusters"][subjob_cluster]: - if not ("run_before" in gw_config["subjob_clusters"][subjob_cluster]): + # - Cluster phases + workflow = workflow.cluster_phases() - print(f"Don't know when to execute cluster {subjob_cluster}.") - print(gw_config) - sys.exit(-1) + # - Order user phases into 
default phases wrt. phase keywords + workflow = workflow.order_phases_in_clusters() - if "run_after" in gw_config["subjob_clusters"][subjob_cluster]: - if "run_before" in gw_config["subjob_clusters"][subjob_cluster]: - print( - f"Specifying both run_after and run_before for cluster {subjob_cluster} may lead to problems." - ) - print(f"Please choose.") - sys.exit(-1) - if ( - not gw_config["subjob_clusters"][subjob_cluster]["run_after"] - in gw_config["subjob_clusters"] - ): - print(f"Unknown cluster {gw_config['subjob_clusters'][subjob_cluster]['run_after']}.") - sys.exit(-1) - - calling_cluster = gw_config["subjob_clusters"][subjob_cluster]["run_after"] - - if ( - not subjob_cluster - in gw_config["subjob_clusters"][calling_cluster]["next_submit"] - ): - gw_config["subjob_clusters"][calling_cluster]["next_submit"].append( - subjob_cluster - ) - gw_config["subjob_clusters"][subjob_cluster][ - "called_from" - ] = calling_cluster - - if calling_cluster == gw_config["last_task_in_queue"]: - gw_config["last_task_in_queue"] = subjob_cluster - - if "run_before" in gw_config["subjob_clusters"][subjob_cluster]: - if ( - not gw_config["subjob_clusters"][subjob_cluster]["run_before"] - in gw_config["subjob_clusters"] - ): - print(f"Unknown cluster {gw_config['subjob_clusters'][subjob_cluster]['run_before']}.") - sys.exit(-1) - - called_cluster = gw_config["subjob_clusters"][subjob_cluster]["run_before"] - - if ( - not called_cluster - in gw_config["subjob_clusters"][subjob_cluster]["next_submit"] - ): - gw_config["subjob_clusters"][subjob_cluster]["next_submit"].append( - called_cluster - ) - gw_config["subjob_clusters"][called_cluster]["called_from"] = subjob_cluster + # - create new first phase of type SimulationSetup, if first_task_in_queue is + # a user phase (type batch or shell) + #workflow = workflow.prepend_newrun_job() - if called_cluster == gw_config["first_task_in_queue"]: - gw_config["first_task_in_queue"] = subjob_cluster - if "next_run_triggered_by" in 
gw_config:
-        gw_config["last_task_in_queue"] = gw_config["next_run_triggered_by"]
+    # - write the workflow to config
+    # - Remove old workflow from config
+    # - This needs to be done before getting the run commands,
+    #   because some functions need workflow to be already set in config
+    config = workflow.write_to_config(config)
-    first_cluster_name = gw_config["first_task_in_queue"]
-    first_cluster = gw_config["subjob_clusters"][first_cluster_name]
-    last_cluster_name = gw_config["last_task_in_queue"]
-    last_cluster = gw_config["subjob_clusters"][last_cluster_name]
+    workflow = workflow.get_workflow_commands_for_run(config)
+    
-    if not first_cluster_name in last_cluster.get("next_submit", ["Error"]):
-        last_cluster["next_submit"].append(first_cluster_name)
-    if not last_cluster_name in first_cluster.get("called_from", ["Error"]):
-        first_cluster["called_from"] = last_cluster_name
+    # Set "jobtype" for the first task???
+    # NOTE: This is either first default phase or
+    # newrun??? Can't this not be set in prepend_newrun then? 
+ #if config["general"]["jobtype"] == "unknown": + # config["general"]["command_line_config"]["jobtype"] = config["general"][ + # "workflow" + # ]["clusters"]["sim_cluster"].get("first_task_in_queue", None) + # config["general"]["jobtype"] = config["general"]["workflow"][ + # "clusters"]["sim_cluster"].get("first_task_in_queue", None) return config - -def complete_clusters(config): - gw_config = config["general"]["workflow"] - - # First, complete the matching subjobs <-> clusters - - for subjob in gw_config["subjobs"]: - subjob_cluster = gw_config["subjobs"][subjob]["subjob_cluster"] - if not subjob_cluster in gw_config["subjob_clusters"]: - gw_config["subjob_clusters"][subjob_cluster] = {} - - if not "subjobs" in gw_config["subjob_clusters"][subjob_cluster]: - gw_config["subjob_clusters"][subjob_cluster]["subjobs"] = [] - - gw_config["subjob_clusters"][subjob_cluster]["subjobs"].append(subjob) - - # Then, complete the resource information per cluster - # determine whether a cluster is to be submitted to a batch system - - for subjob_cluster in gw_config["subjob_clusters"]: - nproc_sum = nproc_max = 0 - clusterconf = gw_config["subjob_clusters"][subjob_cluster] - for subjob in clusterconf["subjobs"]: - subjobconf = gw_config["subjobs"][subjob] - - clusterconf = merge_single_entry_if_possible( - "submit_to_batch_system", subjobconf, clusterconf - ) - clusterconf = merge_single_entry_if_possible( - "order_in_cluster", subjobconf, clusterconf - ) - - if subjobconf.get("submit_to_batch_system", False): - clusterconf["batch_or_shell"] = "batch" - elif subjobconf.get("script", False): - clusterconf["batch_or_shell"] = "shell" - - clusterconf = merge_single_entry_if_possible( - "run_on_queue", subjobconf, clusterconf - ) - clusterconf = merge_single_entry_if_possible( - "run_after", subjobconf, clusterconf - ) - clusterconf = merge_single_entry_if_possible( - "run_before", subjobconf, clusterconf - ) - clusterconf = merge_single_entry_if_possible( - "run_only", 
subjobconf, clusterconf
-            )
-            clusterconf = merge_single_entry_if_possible(
-                "skip_run_number", subjobconf, clusterconf
-            )
-            clusterconf = merge_single_entry_if_possible(
-                "skip_chunk_number", subjobconf, clusterconf
-            )
-
-            nproc_sum += subjobconf.get("nproc", 1)
-            nproc_max = max(subjobconf.get("nproc", 1), nproc_max)
-
-        if not "submit_to_batch_system" in clusterconf:
-            clusterconf["submit_to_batch_system"] = False
+def init_default_workflow(config):
+    """
+    Initialize workflow and default phases from defaults.yaml
+    """
+    # 1. Generate default workflow object
+    # initialize the default workflow as Workflow object
+    # TODO: Where are these default phases defined? For now I placed it in
+    # esm_tools/configs/esm_software/esm_runscripts/defaults.yaml
+    if "defaults.yaml" in config["general"]:
+        if "workflow" in config["general"]["defaults.yaml"]:
+            workflow = config["general"]["defaults.yaml"]["workflow"]
+            phases = config["general"]["defaults.yaml"]["workflow"].get("phases", [])
         else:
-            if not "run_on_queue" in clusterconf:
-                print(
-                    f"Information on target queue is missing in cluster {clusterconf}." 
- ) - sys.exit(-1) - - if not clusterconf.get("batch_or_shell", False): - clusterconf["batch_or_shell"] = "SimulationSetup" + esm_parser.user_error("ERROR", "No default workflow defined.") + else: + workflow = [] + phases = [] - if not "order_in_cluster" in clusterconf: - clusterconf["order_in_cluster"] = "sequential" + if phases: + workflow = Workflow(workflow) - if clusterconf["order_in_cluster"] == "concurrent": - nproc = nproc_sum - else: - nproc = nproc_max - clusterconf["nproc"] = nproc + # can be removed: default WorkflowPhase are already initialized in initialization of workflow object + #for phase in phases: + # workflow.phases.append(WorkflowPhase(phases[phase])) + else: + esm_parser.user_error("ERROR", "No default workflow phases defined.") + # Note: Should this work also if no default phases are set in such a config + # file, but instead all workflow phases are defined in different configs + # and/or runscripts? + # Where could a user define a different (default) phase list? + # Or should this be changed in defaults.yaml as it is now? + + return workflow + + +#def calc_number_of_tasks(config): +# """ +# Calculates the total number of needed tasks +# in phase compute +# TODO: make this phase method??? Or recipe entry??? +# +# Parameters +# ---------- +# config : dict +# +# Returns +# ------- +# tasks : int +# Number of task for all models +# """ +# +# tasks = 0 +# for model in config["general"]["valid_model_names"]: +# if "nproc" in config[model]: +# tasks += config[model]["nproc"] +# elif "nproca" in config[model] and "nprocb" in config[model]: +# tasks += config[model]["nproca"] * config[model]["nprocb"] +# if "nprocar" in config[model] and "nprocbr" in config[model]: +# if ( +# config[model]["nprocar"] != "remove_from_namelist" +# and config[model]["nprocbr"] != "remove_from_namelist" +# ): +# tasks += config[model]["nprocar"] * config[model]["nprocbr"] +# return tasks + + +def display_workflow(config): + """ + Displays workflow sequence. 
- return config + Parameters + ---------- + config : dict + Returns + ------- + config : dict + """ -def merge_single_entry_if_possible(entry, sourceconf, targetconf): - if entry in sourceconf: - if entry in targetconf and not sourceconf[entry] == targetconf[entry]: - print(f"Mismatch found in {entry} for cluster {targetconf}") - sys.exit(-1) - targetconf[entry] = sourceconf[entry] - return targetconf - - -def init_total_workflow(config): - # add compute, tidy etc information already here! - - tasks = 0 - for model in config["general"]["valid_model_names"]: - if "nproc" in config[model]: - tasks += config[model]["nproc"] - elif "nproca" in config[model] and "nprocb" in config[model]: - tasks += config[model]["nproca"] * config[model]["nprocb"] - if "nprocar" in config[model] and "nprocbr" in config[model]: - if ( - config[model]["nprocar"] != "remove_from_namelist" - and config[model]["nprocbr"] != "remove_from_namelist" - ): - tasks += config[model]["nprocar"] * config[model]["nprocbr"] - - prepcompute = { - "prepcompute": { - "nproc": 1, - "run_before": "compute", - } - } - - compute = { - "compute": { - "nproc": tasks, - "run_before": "tidy", - "submit_to_batch_system": config["general"].get( - "submit_to_batch_system", True - ), - "run_on_queue": config["computer"]["partitions"]["compute"]["name"], - } - } - - # das ist nur vorübergehend - tidy = { - "tidy": { - "nproc": 1, - "run_after": "compute", - } - } - - if not "workflow" in config["general"]: - config["general"]["workflow"] = {} - if not "subjob_clusters" in config["general"]["workflow"]: - config["general"]["workflow"]["subjob_clusters"] = {} - if not "subjobs" in config["general"]["workflow"]: - config["general"]["workflow"]["subjobs"] = prepcompute - config["general"]["workflow"]["subjobs"].update(compute) - config["general"]["workflow"]["subjobs"].update(tidy) + display_nicely(config) + display_workflow_sequence(config) + + +def display_workflow_sequence(config, display=True): + + first_phase = 
config["general"]["workflow"]["first_task_in_queue"] + subjobs = config["general"]["workflow"]["subjob_clusters"][first_phase]["subjobs"] + # Note: next_submit points to the next cluster (not phase) + second_phase = config["general"]["workflow"]["subjobs"][first_phase]["next_submit"] + + workflow_order = f"``{first_phase}`` {subjobs}" + + # While first_phase (first_task_in_queue) is not to be called by the next phase (next_submit). + # In other words: If not last phase/cluster is reached. + while first_phase not in second_phase and second_phase: + sec_phase_str = "" + for sec_phase in second_phase: + if config["general"]["workflow"]["subjob_clusters"][sec_phase]["next_submit"]: + second_phase = config["general"]["workflow"]["subjob_clusters"][sec_phase]["next_submit"] + subjobs = config["general"]["workflow"]["subjob_clusters"][sec_phase]["subjobs"] + else: + subjobs = config["general"]["workflow"]["subjob_clusters"][sec_phase]["subjobs"] + if sec_phase_str == "": + sec_phase_str = f"{sec_phase_str} ``{sec_phase}`` {subjobs}" + else: + sec_phase_str = f"{sec_phase_str}, ``{sec_phase}`` {subjobs}" + workflow_order = f"{workflow_order} -> {sec_phase_str}" + # For last phase that would start the next run else: - if not "prepcompute" in config["general"]["workflow"]["subjobs"]: - config["general"]["workflow"]["subjobs"].update(prepcompute) - if not "compute" in config["general"]["workflow"]["subjobs"]: - config["general"]["workflow"]["subjobs"].update(compute) - if not "tidy" in config["general"]["workflow"]["subjobs"]: - config["general"]["workflow"]["subjobs"].update(tidy) - if not "last_task_in_queue" in config["general"]["workflow"]: - config["general"]["workflow"]["last_task_in_queue"] = "tidy" - if not "first_task_in_queue" in config["general"]["workflow"]: - config["general"]["workflow"]["first_task_in_queue"] = "prepcompute" - - if not "next_run_triggered_by" in config["general"]["workflow"]: - config["general"]["workflow"]["next_run_triggered_by"] = "tidy" - - 
return config + sec_phase_str = "" + # for all cluster in next_submit + for sec_phase in second_phase: + second_phase = config["general"]["workflow"]["subjob_clusters"][sec_phase]["next_submit"] + subjobs = config["general"]["workflow"]["subjob_clusters"][sec_phase]["subjobs"] + if sec_phase_str == "": + sec_phase_str = f"{sec_phase_str} ``{sec_phase}`` {subjobs}" + else: + sec_phase_str = f"{sec_phase_str} and ``{sec_phase}`` {subjobs}" + workflow_order = f"{workflow_order} -> {sec_phase_str}" + + if display: + esm_parser.user_note("Workflow sequence (cluster [phases])", f"{workflow_order}") + else: + workflow_order = workflow_order.replace("``", "") + return workflow_order -def collect_all_workflow_information(config): - for model in config: - if "workflow" in config[model]: - w_config = config[model]["workflow"] - gw_config = config["general"]["workflow"] +def display_nicely(config): + """ + Pretty prints the workflow configuration assembled in config["general"]. + Is called by e.g. ``esm_runscripts runscript.yaml -e -i workflow`` - if "subjob_clusters" in w_config: - for cluster in w_config["subjob_clusters"]: - if cluster in gw_config["subjob_clusters"]: - gw_config["subjob_clusters"][cluster] = merge_if_possible( - w_config["subjob_clusters"][cluster], - gw_config["subjob_clusters"][cluster], - ) - else: - gw_config["subjob_clusters"][cluster] = copy.deepcopy( - w_config["subjob_clusters"][cluster], - ) - - if "subjobs" in w_config: - ref_config = copy.deepcopy(w_config) - for subjob in list(copy.deepcopy(w_config["subjobs"])): - - # subjobs (other than clusters) should be model specific - gw_config["subjobs"][subjob + "_" + model] = copy.deepcopy( - w_config["subjobs"][subjob] - ) - if subjob in gw_config["subjobs"]: - del gw_config["subjobs"][subjob] - # make sure that the run_after and run_before refer to that cluster - for other_subjob in gw_config["subjobs"]: - if "run_after" in gw_config["subjobs"][other_subjob]: - if ( - 
gw_config["subjobs"][other_subjob]["run_after"] - == subjob - ): - gw_config["subjobs"][other_subjob][ - "run_after" - ] == subjob + "_" + model - if "run_before" in gw_config["subjobs"][other_subjob]: - if ( - gw_config["subjobs"][other_subjob]["run_before"] - == subjob - ): - gw_config["subjobs"][other_subjob][ - "run_before" - ] == subjob + "_" + model - - # if not in another cluster, each subjob gets its own - if ( - not "subjob_cluster" - in gw_config["subjobs"][subjob + "_" + model] - ): - gw_config["subjobs"][subjob + "_" + model][ - "subjob_cluster" - ] = subjob # + "_" + model - - if "next_run_triggered_by" in w_config: - if not gw_config["next_run_triggered_by"] in [ - "tidy", - w_config["next_run_triggered_by"], - ]: - print(f"Mismatch found setting next_run_triggered_by for workflow.") - sys.exit(-1) - else: - gw_config["next_run_triggered_by"] = w_config[ - "next_run_triggered_by" - ] + Parameters + ---------- + config : dict + Returns + ------- + config : dict + """ + esm_parser.pprint_config(config["general"]["workflow"]) return config - - -def merge_if_possible(source, target): - for entry in source: - if entry in target: - if not source[entry] == target[entry]: - print( - f"Mismatch while trying to merge subjob_clusters {source} into {target}" - ) - sys.exit(-1) - else: - target[entry] = source[entry] - return target