Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions flocs_runners/linc_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(
self.configdict = {}
self.outdir = outdir
self.cluster = detect_compute_cluster()
self.mspath = mspath

filedir = os.path.join(mspath, f"*{ms_suffix}")
logger.info(f"Searching {filedir}")
Expand Down Expand Up @@ -152,13 +153,13 @@ def tune_to_cluster(self, runner: str):
logger.info("We are on local scratch, bombs away")
logger.info(f"max-dp3-threads {self.configdict['max_dp3_threads']} -> 1")
self.configdict["max_dp3_threads"] = 1
elif runner == "cwltool" and self.configdict["scheduler"] == "slurm":
elif runner == "cwltool" and self.full_config["scheduler"] == "slurm":
logger.info("We are submitting a Slurm job. Hijacking config with known good settings:")
logger.info(f"slurm-cores {self.configdict['slurm_cores']} -> 32")
logger.info(f"slurm-time {self.configdict['slurm_time']} -> 4:00:00")
logger.info(f"slurm-cores {self.full_config['slurm_cores']} -> 32")
logger.info(f"slurm-time {self.full_config['slurm_time']} -> 4:00:00")
logger.info(f"max-dp3-threads {self.configdict['max_dp3_threads']} -> 1")
self.configdict["slurm_cores"] = 32
self.configdict["slurm_time"] = "4:00:00"
self.full_config["slurm_cores"] = 32
self.full_config["slurm_time"] = "4:00:00"
self.configdict["max_dp3_threads"] = 1

def move_results_from_rundir(self):
Expand Down Expand Up @@ -236,6 +237,7 @@ def run_workflow(
self.restarting = True
logger.info(f"Attempting to restart existing workflow from {self.rundir}.")
self.setup_apptainer_variables(self.rundir)
self.tune_to_cluster(runner)
logger.info(f"Running workflow with {runner} under {scheduler} in {self.rundir}")

if runner == "cwltool":
Expand All @@ -253,19 +255,27 @@ def run_workflow(
cmd += f"{self.configfile}"

if scheduler == "slurm":
wrapped_cmd = add_slurm_skeleton(
contents=cmd,
job_name=f"LINC_{self.mode.value}",
cluster=self.cluster,
**slurm_params,
)
if self.cluster == "spider":
slurm_params["cores"] = self.full_config["slurm_cores"]
slurm_params["time"] = self.full_config["slurm_time"]
wrapped_cmd = add_slurm_skeleton(
contents=cmd,
job_name=f"LINC_{self.mode.value}",
cluster=self.cluster,
**slurm_params,
)
with open("temp_jobscript.sh", "w") as f:
f.write(wrapped_cmd)
logger.info("Written temporary jobscript to temp_jobscript.sh")
out = subprocess.check_output(["bash", "temp_jobscript.sh"]).decode("utf-8")
out = subprocess.check_output(["bash", "temp_jobscript.sh", self.mspath, self.outdir]).decode("utf-8")
print(out)
else:
wrapped_cmd = add_slurm_skeleton(
contents=cmd,
job_name=f"LINC_{self.mode.value}",
cluster=self.cluster,
**slurm_params,
)
with open("temp_jobscript.sh", "w") as f:
f.write(wrapped_cmd)
logger.info("Written temporary jobscript to temp_jobscript.sh")
Expand Down Expand Up @@ -772,12 +782,13 @@ def calibrator(
"toil_jobstore",
"use_node_scratch",
]
config.full_config = args.copy()
args_for_linc = args.copy()
for key in unneeded_keys:
args_for_linc.pop(key)
for key, val in args_for_linc.items():
config.add_entry(key, val)
config.tune_to_cluster(args["runner"])
#config.tune_to_cluster(runner)
config.save(f"mslist_{config.obsid}_LINC_calibrator.json")
if args["record_toil_stats"] and args["runner"] != "toil":
logger.critical("--record-toil-stats needs '--runner toil'.")
Expand Down
3 changes: 2 additions & 1 deletion flocs_runners/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def add_slurm_skeleton(
memory: int = 0,
cluster: str = "",
):
if cluster == "spider_disabled":
if cluster == "spider":
if "calibrator" in job_name:
wrapped = rf"""sbatch <<EOT
#!/usr/bin/bash
Expand All @@ -200,6 +200,7 @@ def add_slurm_skeleton(
cd \$TMPDIR
rsync -avP \$RUNDIR/LINC_calib* $(realpath $2)
rm -rf \$RUNDIR
EOT
"""
elif "target" in job_name:
wrapped = rf"""sbatch <<EOT
Expand Down
Loading