diff --git a/flocs_runners/linc_runner.py b/flocs_runners/linc_runner.py index 0116000..503955f 100644 --- a/flocs_runners/linc_runner.py +++ b/flocs_runners/linc_runner.py @@ -51,6 +51,7 @@ def __init__( self.configdict = {} self.outdir = outdir self.cluster = detect_compute_cluster() + self.mspath = mspath filedir = os.path.join(mspath, f"*{ms_suffix}") logger.info(f"Searching {filedir}") @@ -152,13 +153,13 @@ def tune_to_cluster(self, runner: str): logger.info("We are on local scratch, bombs away") logger.info(f"max-dp3-threads {self.configdict['max_dp3_threads']} -> 1") self.configdict["max_dp3_threads"] = 1 - elif runner == "cwltool" and self.configdict["scheduler"] == "slurm": + elif runner == "cwltool" and self.full_config["scheduler"] == "slurm": logger.info("We are submitting a Slurm job. Hijacking config with known good settings:") - logger.info(f"slurm-cores {self.configdict['slurm_cores']} -> 32") - logger.info(f"slurm-time {self.configdict['slurm_time']} -> 4:00:00") + logger.info(f"slurm-cores {self.full_config['slurm_cores']} -> 32") + logger.info(f"slurm-time {self.full_config['slurm_time']} -> 4:00:00") logger.info(f"max-dp3-threads {self.configdict['max_dp3_threads']} -> 1") - self.configdict["slurm_cores"] = 32 - self.configdict["slurm_time"] = "4:00:00" + self.full_config["slurm_cores"] = 32 + self.full_config["slurm_time"] = "4:00:00" self.configdict["max_dp3_threads"] = 1 def move_results_from_rundir(self): @@ -236,6 +237,7 @@ def run_workflow( self.restarting = True logger.info(f"Attempting to restart existing workflow from {self.rundir}.") self.setup_apptainer_variables(self.rundir) + self.tune_to_cluster(runner) logger.info(f"Running workflow with {runner} under {scheduler} in {self.rundir}") if runner == "cwltool": @@ -253,19 +255,27 @@ def run_workflow( cmd += f"{self.configfile}" if scheduler == "slurm": - wrapped_cmd = add_slurm_skeleton( - contents=cmd, - job_name=f"LINC_{self.mode.value}", - cluster=self.cluster, - **slurm_params, - ) if self.cluster == "spider": + slurm_params["cores"] = self.full_config["slurm_cores"] + slurm_params["time"] = self.full_config["slurm_time"] + wrapped_cmd = add_slurm_skeleton( + contents=cmd, + job_name=f"LINC_{self.mode.value}", + cluster=self.cluster, + **slurm_params, + ) with open("temp_jobscript.sh", "w") as f: f.write(wrapped_cmd) logger.info("Written temporary jobscript to temp_jobscript.sh") - out = subprocess.check_output(["bash", "temp_jobscript.sh"]).decode("utf-8") + out = subprocess.check_output(["bash", "temp_jobscript.sh", self.mspath, self.outdir]).decode("utf-8") print(out) else: + wrapped_cmd = add_slurm_skeleton( + contents=cmd, + job_name=f"LINC_{self.mode.value}", + cluster=self.cluster, + **slurm_params, + ) with open("temp_jobscript.sh", "w") as f: f.write(wrapped_cmd) logger.info("Written temporary jobscript to temp_jobscript.sh") @@ -772,12 +782,13 @@ def calibrator( "toil_jobstore", "use_node_scratch", ] + config.full_config = args.copy() args_for_linc = args.copy() for key in unneeded_keys: args_for_linc.pop(key) for key, val in args_for_linc.items(): config.add_entry(key, val) - config.tune_to_cluster(args["runner"]) + #config.tune_to_cluster(runner) config.save(f"mslist_{config.obsid}_LINC_calibrator.json") if args["record_toil_stats"] and args["runner"] != "toil": logger.critical("--record-toil-stats needs '--runner toil'.") diff --git a/flocs_runners/utils.py b/flocs_runners/utils.py index 2d72ede..07f2db4 100644 --- a/flocs_runners/utils.py +++ b/flocs_runners/utils.py @@ -186,7 +186,7 @@ def add_slurm_skeleton( memory: int = 0, cluster: str = "", ): - if cluster == "spider_disabled": + if cluster == "spider": if "calibrator" in job_name: wrapped = rf"""sbatch <