diff --git a/configs/other_software/batch_system/slurm.yaml b/configs/other_software/batch_system/slurm.yaml index 441c69b22..2915920da 100644 --- a/configs/other_software/batch_system/slurm.yaml +++ b/configs/other_software/batch_system/slurm.yaml @@ -47,20 +47,30 @@ computer: output_flags: "--output=${thisrun_logfile} --error=${thisrun_logfile}" name_flag: "--job-name=${expid}" - taskset: false + hetjob_strategy: hetjob mt_launcher_flag: "" add_choose_heterogeneous_parallelization: true: cpu_bind: "none" - choose_taskset: + choose_hetjob_strategy: # Support for old taskset heterogeneous parallelization approach - true: + taskset: srun_execution_command: "${debugger_flags_prelauncher} ${launcher} ${launcher_flags} ${mt_launcher_flag}--multi-prog ${config_files.hostfile}" - # Support for new packjob heterogeneous parallelization approach - false: - - # kh 24.06.22 --hint=nomultithread is needed (in many cases) on levante to avoid hyperthreading + # Support for hetjob parallelization approach (heterogeneity handled with heterogeneous slurm job + srun steps, allows for heterogeneous resources) + hetjob: + launcher_flags_per_component: " + ${mt_launcher_flag} + --nodes=@nnodes@ + --ntasks=@nproc@ + --ntasks-per-node=@nproc_per_node@ + --cpus-per-task=@cpus_per_proc@ + --export=ALL,OMP_NUM_THREADS=@omp_num_threads@" + srun_execution_command: "${debugger_flags_prelauncher} ${launcher} ${launcher_flags} \\\n@components@" + launcher_comp_sep: " \\\n:" + # Support for srun steps approach (heterogeneity handled in the srun command) + srunsteps: + # kh 24.06.22 --hint=nomultithread is needed (in many cases) on levante to avoid hyperthreading launcher_flags_per_component: " ${mt_launcher_flag} --nodes=@nnodes@ diff --git a/src/esm_runscripts/batch_system.py b/src/esm_runscripts/batch_system.py index 8c0321b8a..a4e52a13a 100644 --- a/src/esm_runscripts/batch_system.py +++ b/src/esm_runscripts/batch_system.py @@ -50,7 +50,6 @@ def job_is_still_running(self, jobid): def add_pre_launcher_lines(self, config, cluster, runfile): return self.bs.add_pre_launcher_lines(config, cluster, runfile) - # TODO: remove it once it's not needed anymore (substituted by packjob) def write_het_par_wrappers(self, config): return self.bs.write_het_par_wrappers(config) @@ -375,26 +374,35 @@ def get_run_commands(config, subjob, batch_or_shell): # here or in compute.py? commands = [] if subjob.startswith("compute"): - if config["general"].get("submit_to_batch_system", True): - batch_system = config["computer"] - if "execution_command" in batch_system: - commands.append( - "time " - + batch_system["execution_command"] - + f" 2>&1{config['computer'].get('write_execution_log', '')} &" - ) - if config["general"].get("multi_srun"): - return self.bs.get_run_commands_multisrun(config, commands) + submit_to_batch_system = config["general"].get( + "submit_to_batch_system", True + ) + write_execution_log = config["computer"].get( + "write_execution_log", "" + ) + + if submit_to_batch_system: + execution_command_list = config["computer"].get( + "execution_command_list", + [config["computer"].get("execution_command")] + ) else: + execution_command_list = [] for model in config: if model == "computer": continue if "execution_command" in config[model]: - commands.append( - "time ./" - + config[model]["execution_command"] - + f" 2>&1{config['computer'].get('write_execution_log', '')} &" + execution_command_list.append( + config[model]["execution_command"] ) + + for execution_command in execution_command_list: + commands.append( + f"time {execution_command} 2>&1{write_execution_log} &" + ) + + if submit_to_batch_system and config["general"].get("multi_srun"): + return self.bs.get_run_commands_multisrun(config, commands) else: subjob_tasks = dataprocess.subjob_tasks(config, subjob, batch_or_shell) for task in subjob_tasks: @@ -460,10 +468,9 @@ def write_simple_runscript(config, cluster, batch_or_shell="batch"): if batch_or_shell == "batch": config = batch_system.calculate_requirements(config, cluster) - # TODO: remove it once it's not needed anymore (substituted by packjob) if cluster in reserved_jobtypes and config["computer"].get( - "taskset", False - ): + "hetjob_strategy", "hetjob" + ) == "taskset": config = config["general"]["batch"].write_het_par_wrappers(config) # Prepare launcher config = config["general"]["batch"].prepare_launcher(config, cluster) @@ -703,7 +710,7 @@ def find_openmp(config): config[model]["nproc"] = 1 return config - def het_par_launcher_lines(self, config, cluster): + def hetjob_single_launcher_command(self, config, cluster): """ Loops through the components to generate job launcher flags and execution commands, to be appended in substitution to the ``@components@`` tag, in @@ -747,6 +754,54 @@ def het_par_launcher_lines(self, config, cluster): .replace("@jobtype@", cluster) ) + def hetjob_concurrent_launcher_commands(self, config, cluster): + """ + Loops through the components to generate job launcher flags and execution + commands, to be appended in substitution to the ``@components@`` tag, in + ``computer.execution_command``, that would later be used in the writing of the + ``.run`` file, in ``batch_system.py``. + + Parameters + ---------- + config : dict + Configuration dictionary containing information about the experiment and + experiment directory. + cluster : str + Type of job cluster. + """ + component_lines = [] + # Read in the separator to be used in between component calls in the job + # launcher + sep = config["computer"].get("launcher_comp_sep", "\\\n ") + " " + # Loop through the components + for model in config["general"]["valid_model_names"]: + command = None + # Read in execution command + if "execution_command" in config[model]: + command = config[model]["execution_command"] + elif "executable" in config[model]: + command = config[model]["executable"] + # Prepare the MPMD commands + + # kh 24.06.22 workaround: filter hdmodel + if command and (command != "NONE"): + launcher = config["computer"].get("launcher") + launcher_flags = self.calc_launcher_flags(config, model, cluster) + component_lines.append(f"{launcher_flags} ./{command} ") + + execution_command = config["computer"]["execution_command"] + execution_command_list = [] + for component in component_lines: + # Replace the ``@components@`` tag with the component command + # and add the ``&`` at the end to run it in background + execution_command_list.append( + execution_command + .replace("@components@", component) + .replace("@jobtype@", cluster) + ) + + config["computer"]["execution_command_list"] = execution_command_list + @staticmethod def calc_launcher_flags(config, model, cluster): """ diff --git a/src/esm_runscripts/slurm.py b/src/esm_runscripts/slurm.py index 48a744e60..fa2e147f9 100644 --- a/src/esm_runscripts/slurm.py +++ b/src/esm_runscripts/slurm.py @@ -10,7 +10,7 @@ from loguru import logger import esm_parser -from esm_tools import user_note +from esm_tools import user_note, user_error class Slurm: @@ -69,12 +69,11 @@ def prepare_launcher(self, config, cluster): current_hostfile = self.path + "_" + run_type write_one_hostfile(current_hostfile, config) - if config["computer"].get( + heterogeneous_parallelization = config["computer"].get( "heterogeneous_parallelization", False - ) and not config["computer"].get("taskset", False): - # Prepare heterogeneous parallelization call - config["general"]["batch"].het_par_launcher_lines(config, cluster) - else: + ) + hetjob_strategy = config["computer"].get("hetjob_strategy", "hetjob") + if not heterogeneous_parallelization or hetjob_strategy == "taskset": # Standard/old way of running jobs with slurm self.write_one_hostfile(self.path, config) @@ -82,6 +81,16 @@ def prepare_launcher(self, config, cluster): config["general"]["work_dir"] + "/" + os.path.basename(self.path) ) shutil.copyfile(self.path, hostfile_in_work) + elif hetjob_strategy == "hetjob" or hetjob_strategy == "srunsteps": + # Prepare heterogeneous parallelization call for hetjob (one srun command + # per binary) + config["general"]["batch"].hetjob_single_launcher_command(config, cluster) + else: + user_error( + "hetjob strategy", + f"``{hetjob_strategy}`` is not a valid one. Choose one among " + f"``hetjob`` (default), ``srunsteps`` or ``taskset``.", + ) return config @@ -157,9 +166,8 @@ def add_pre_launcher_lines(self, config, cluster, runfile): (``runfile.write("")``). """ - # TODO: remove it once it's not needed anymore (substituted by packjob) if config["computer"].get("heterogeneous_parallelization", False): - if config["computer"].get("taskset", False): + if config["computer"].get("hetjob_strategy") == "taskset": self.add_hostlist_file_gen_lines(config, runfile) @staticmethod @@ -185,9 +193,10 @@ def het_par_headers(config, cluster, headers): for heterogeneous parallelization in SLURM. """ # Only modify the headers if ``heterogeneous_parallelization`` is ``True`` + hetjob_strategy = config["computer"].get("hetjob_strategy", "hetjob") if config["computer"].get( "heterogeneous_parallelization", False - ) and not config["computer"].get("taskset", False): + ) and hetjob_strategy not in ["taskset", "srunsteps"]: this_batch_system = config["computer"] # Get the variables to be modified for the headers nodes_flag = this_batch_system["nodes_flag"].split("=")[0] @@ -223,7 +232,6 @@ def het_par_headers(config, cluster, headers): return headers - # TODO: remove it once it's not needed anymore (substituted by packjob) @staticmethod def write_het_par_wrappers(config): cores_per_node = config["computer"]["partitions"]["compute"]["cores_per_node"] @@ -300,7 +308,6 @@ def write_het_par_wrappers(config): config[model]["execution_command_het_par"] = execution_command_het_par return config - # TODO: remove it once it's not needed anymore (substituted by packjob) @staticmethod def add_hostlist_file_gen_lines(config, runfile): cores_per_node = config["computer"]["partitions"]["compute"]["cores_per_node"] diff --git a/utils/model_time.sh b/utils/model_time.sh new file mode 100755 index 000000000..37a339ff3 --- /dev/null +++ b/utils/model_time.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +# Help flag +if [[ "$1" == "--help" || "$1" == "-h" ]]; then + echo "Usage: $0 " + echo "Example: $0 'rundir/test_*'" + exit 0 +fi + +SIM_PATTERN=$1 +FILE_PATTERN="${SIM_PATTERN}/log/${SIM_PATTERN}_*_compute_????????-????????_*.log" + +for file in $FILE_PATTERN; do + # Find real time in the file + REAL_TIME=$(grep -oP 'real\t.*' "$file" | head -n 1 | awk '{print $2}') + # Extract SIM + SIM=$(echo $file | grep -o '^[^/]*') + # Extract date from the file name + DATE=$(echo $file | grep -oP 'compute_\K[0-9]{8}-[0-9]{8}') + echo $SIM $DATE $REAL_TIME +done