From 78fda5c0a7b0e0156e58782328099806671455b2 Mon Sep 17 00:00:00 2001
From: Nick Tyler
Date: Mon, 17 Mar 2025 10:44:56 -0700
Subject: [PATCH 1/6] Use itertools batched to get long job lists

---
 parsl/providers/slurm/slurm.py | 35 ++++++++++++++++++++--------------
 1 file changed, 21 insertions(+), 14 deletions(-)

diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py
index cf50058522..4826eca20d 100644
--- a/parsl/providers/slurm/slurm.py
+++ b/parsl/providers/slurm/slurm.py
@@ -4,6 +4,7 @@
 import re
 import time
 from typing import Any, Dict, Optional
+import itertools
 
 import typeguard
 
@@ -148,6 +149,8 @@ def __init__(self,
         self.qos = qos
         self.constraint = constraint
         self.clusters = clusters
+        # Used to batch requests to sacct/squeue for long job lists
+        self._job_batch_size = 50
         self.scheduler_options = scheduler_options + '\n'
         if exclusive:
             self.scheduler_options += "#SBATCH --exclusive\n"
@@ -199,22 +202,26 @@ def _status(self):
         Returns:
               [status...] : Status list of all jobs
         '''
-        job_id_list = ','.join(
-            [jid for jid, job in self.resources.items() if not job['status'].terminal]
-        )
-        if not job_id_list:
+
+        if len(self.resources.items()) == 0:
             logger.debug('No active jobs, skipping status update')
             return
-
-        cmd = self._cmd.format(job_id_list)
-        logger.debug("Executing %s", cmd)
-        retcode, stdout, stderr = self.execute_wait(cmd)
-        logger.debug("sacct/squeue returned %s %s", stdout, stderr)
-
-        # Execute_wait failed. Do no update
-        if retcode != 0:
-            logger.warning("sacct/squeue failed with non-zero exit code {}".format(retcode))
-            return
+
+        job_list_batches = itertools.batched(self.resources.items(), self._job_batch_size)
+        stdout = ""
+        for job_batch in job_list_batches:
+            job_id_list = ','.join(
+                [jid for jid, job in job_batch if not job['status'].terminal]
+            )
+            cmd = self._cmd.format(job_id_list)
+            logger.debug("Executing %s", cmd)
+            retcode, _stdout, stderr = self.execute_wait(cmd)
+            logger.debug("sacct/squeue returned %s %s", _stdout, stderr)
+            stdout += _stdout
+            # Execute_wait failed. Do not update
+            if retcode != 0:
+                logger.warning("sacct/squeue failed with non-zero exit code {}".format(retcode))
+                return
 
         jobs_missing = set(self.resources.keys())
         for line in stdout.split('\n'):
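The core idea of this patch, sketched standalone: chunk the live job IDs so that no single sacct/squeue invocation has to name every job. A minimal sketch, assuming a Python 3.12+ interpreter where `itertools.batched` exists; the `sacct` command template below is illustrative, not copied from the provider:

```python
# Sketch only: batch job IDs so each scheduler call stays short.
from itertools import batched

job_ids = [str(1000000 + i) for i in range(120)]

for batch in batched(job_ids, 50):
    # Illustrative command template; the provider formats its own _cmd string.
    cmd = "sacct -X --noheader --format=jobid,state --job '{}'".format(",".join(batch))
    print(cmd)  # prints three commands naming 50, 50, and 20 jobs respectively
```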
From a2495659a64843d4e757e5a45e28c7818c6a0f90 Mon Sep 17 00:00:00 2001
From: Nick Tyler
Date: Mon, 17 Mar 2025 18:02:10 -0700
Subject: [PATCH 2/6] batched is only available in newer versions of Python

---
 parsl/providers/slurm/slurm.py | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py
index 4826eca20d..86afa02bff 100644
--- a/parsl/providers/slurm/slurm.py
+++ b/parsl/providers/slurm/slurm.py
@@ -4,7 +4,21 @@
 import re
 import time
 from typing import Any, Dict, Optional
-import itertools
+try:
+    from itertools import batched
+except AttributeError:
+    # https://docs.python.org/3.13/library/itertools.html#itertools.batched
+    from itertools import islice
+    def batched(iterable, n, *, strict=False):
+        # batched('ABCDEFG', 3) → ABC DEF G
+        if n < 1:
+            raise ValueError('n must be at least one')
+        iterator = iter(iterable)
+        while batch := tuple(islice(iterator, n)):
+            if strict and len(batch) != n:
+                raise ValueError('batched(): incomplete batch')
+            yield batch
+
 
 import typeguard
 
@@ -207,7 +221,7 @@ def _status(self):
             logger.debug('No active jobs, skipping status update')
             return
 
-        job_list_batches = itertools.batched(self.resources.items(), self._job_batch_size)
+        job_list_batches = batched(self.resources.items(), self._job_batch_size)
         stdout = ""
         for job_batch in job_list_batches:
             job_id_list = ','.join(
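For reviewers who have not used `batched` before, this is the behaviour the vendored fallback above is meant to reproduce (a sketch assuming that `batched` definition is in scope):

```python
# Semantics per the itertools docs:
assert list(batched('ABCDEFG', 3)) == [('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)]
assert list(batched([], 3)) == []  # empty input yields no batches
# With strict=True, a short final batch raises instead of being yielded:
# batched('ABCDEFG', 3, strict=True) -> ValueError('batched(): incomplete batch')
```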
From 3ca6da7f45ae1e0d96b8c179e8c716782c615ae6 Mon Sep 17 00:00:00 2001
From: Nick Tyler
Date: Mon, 17 Mar 2025 18:18:29 -0700
Subject: [PATCH 3/6] Fix imports

---
 parsl/providers/slurm/slurm.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py
index 86afa02bff..5cfb7c57ad 100644
--- a/parsl/providers/slurm/slurm.py
+++ b/parsl/providers/slurm/slurm.py
@@ -4,9 +4,10 @@
 import re
 import time
 from typing import Any, Dict, Optional
+
 try:
     from itertools import batched
-except AttributeError:
+except ImportError:
     # https://docs.python.org/3.13/library/itertools.html#itertools.batched
     from itertools import islice
     def batched(iterable, n, *, strict=False):
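The one-word fix above is load-bearing: a name that cannot be imported raises `ImportError`, not `AttributeError`, so with the previous guard the fallback never ran and the module simply failed to import on Python 3.11 and older. A minimal sketch of the corrected probe:

```python
# On Python <= 3.11, `from itertools import batched` raises ImportError,
# which this guard now catches so a local backport can be used instead.
try:
    from itertools import batched
except ImportError:
    batched = None  # stand-in here; the patch defines the real backport

print(batched)  # stdlib function on 3.12+, the fallback on older interpreters
```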
From 376daf61104f9abed0a4ebabb286b51dd16bb70e Mon Sep 17 00:00:00 2001
From: Nick Tyler
Date: Mon, 17 Mar 2025 18:26:08 -0700
Subject: [PATCH 4/6] Fix flake8 lint

---
 parsl/providers/slurm/slurm.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py
index 5cfb7c57ad..a5006745d0 100644
--- a/parsl/providers/slurm/slurm.py
+++ b/parsl/providers/slurm/slurm.py
@@ -10,6 +10,7 @@
 except ImportError:
     # https://docs.python.org/3.13/library/itertools.html#itertools.batched
     from itertools import islice
+
     def batched(iterable, n, *, strict=False):
         # batched('ABCDEFG', 3) → ABC DEF G
         if n < 1:
@@ -19,7 +20,7 @@ def batched(iterable, n, *, strict=False):
         if strict and len(batch) != n:
             raise ValueError('batched(): incomplete batch')
         yield batch
-
+
 
 import typeguard
 
@@ -217,11 +218,11 @@ def _status(self):
         Returns:
               [status...] : Status list of all jobs
         '''
-
+
         if len(self.resources.items()) == 0:
             logger.debug('No active jobs, skipping status update')
             return
-
+
         job_list_batches = batched(self.resources.items(), self._job_batch_size)
         stdout = ""
         for job_batch in job_list_batches:
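For anyone parsing the backport's loop: `islice` consumes the next `n` items of the underlying iterator on each pass, and the walrus-assigned tuple becomes empty, hence falsy, once the iterator is exhausted. A standalone illustration:

```python
from itertools import islice

it = iter(range(7))
print(tuple(islice(it, 3)))  # (0, 1, 2) -- islice consumes the next 3 items
print(tuple(islice(it, 3)))  # (3, 4, 5)
print(tuple(islice(it, 3)))  # (6,)      -- the short final batch
print(tuple(islice(it, 3)))  # ()        -- empty tuple is falsy, so the while loop stops
```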
From bd8dd29c535af067c11b93966049880e51c6484c Mon Sep 17 00:00:00 2001
From: Nick Tyler
Date: Tue, 18 Mar 2025 08:37:37 -0700
Subject: [PATCH 5/6] Change to a function without try-except

---
 parsl/providers/slurm/slurm.py | 229 +++++++++++++++++----------------
 1 file changed, 120 insertions(+), 109 deletions(-)

diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py
index a5006745d0..cdd3c86a80 100644
--- a/parsl/providers/slurm/slurm.py
+++ b/parsl/providers/slurm/slurm.py
@@ -3,25 +3,9 @@
 import os
 import re
 import time
+from itertools import islice
 from typing import Any, Dict, Optional
 
-try:
-    from itertools import batched
-except ImportError:
-    # https://docs.python.org/3.13/library/itertools.html#itertools.batched
-    from itertools import islice
-
-    def batched(iterable, n, *, strict=False):
-        # batched('ABCDEFG', 3) → ABC DEF G
-        if n < 1:
-            raise ValueError('n must be at least one')
-        iterator = iter(iterable)
-        while batch := tuple(islice(iterator, n)):
-            if strict and len(batch) != n:
-                raise ValueError('batched(): incomplete batch')
-            yield batch
-
-
 import typeguard
 
 from parsl.jobs.states import JobState, JobStatus
@@ -36,37 +20,51 @@ def batched(iterable, n, *, strict=False):
 
 # From https://slurm.schedmd.com/sacct.html#SECTION_JOB-STATE-CODES
 sacct_translate_table = {
-    'PENDING': JobState.PENDING,
-    'RUNNING': JobState.RUNNING,
-    'CANCELLED': JobState.CANCELLED,
-    'COMPLETED': JobState.COMPLETED,
-    'FAILED': JobState.FAILED,
-    'NODE_FAIL': JobState.FAILED,
-    'BOOT_FAIL': JobState.FAILED,
-    'DEADLINE': JobState.TIMEOUT,
-    'TIMEOUT': JobState.TIMEOUT,
-    'REVOKED': JobState.FAILED,
-    'OUT_OF_MEMORY': JobState.FAILED,
-    'SUSPENDED': JobState.HELD,
-    'PREEMPTED': JobState.TIMEOUT,
-    'REQUEUED': JobState.PENDING
+    "PENDING": JobState.PENDING,
+    "RUNNING": JobState.RUNNING,
+    "CANCELLED": JobState.CANCELLED,
+    "COMPLETED": JobState.COMPLETED,
+    "FAILED": JobState.FAILED,
+    "NODE_FAIL": JobState.FAILED,
+    "BOOT_FAIL": JobState.FAILED,
+    "DEADLINE": JobState.TIMEOUT,
+    "TIMEOUT": JobState.TIMEOUT,
+    "REVOKED": JobState.FAILED,
+    "OUT_OF_MEMORY": JobState.FAILED,
+    "SUSPENDED": JobState.HELD,
+    "PREEMPTED": JobState.TIMEOUT,
+    "REQUEUED": JobState.PENDING,
 }
 
 squeue_translate_table = {
-    'PD': JobState.PENDING,
-    'R': JobState.RUNNING,
-    'CA': JobState.CANCELLED,
-    'CF': JobState.PENDING,  # (configuring),
-    'CG': JobState.RUNNING,  # (completing),
-    'CD': JobState.COMPLETED,
-    'F': JobState.FAILED,  # (failed),
-    'TO': JobState.TIMEOUT,  # (timeout),
-    'NF': JobState.FAILED,  # (node failure),
-    'RV': JobState.FAILED,  # (revoked) and
-    'SE': JobState.FAILED  # (special exit state)
+    "PD": JobState.PENDING,
+    "R": JobState.RUNNING,
+    "CA": JobState.CANCELLED,
+    "CF": JobState.PENDING,  # (configuring),
+    "CG": JobState.RUNNING,  # (completing),
+    "CD": JobState.COMPLETED,
+    "F": JobState.FAILED,  # (failed),
+    "TO": JobState.TIMEOUT,  # (timeout),
+    "NF": JobState.FAILED,  # (node failure),
+    "RV": JobState.FAILED,  # (revoked) and
+    "SE": JobState.FAILED,  # (special exit state)
 }
 
 
+def batched(iterable, n):
+    """Batched
+    Turns an iterable into batches of size n.
+    This code is from
+    https://docs.python.org/3.12/library/itertools.html#itertools.batched
+    and on Python 3.12+ it can be replaced with
+    itertools.batched
+    """
+    if n < 1:
+        raise ValueError("n must be at least one")
+    iterator = iter(iterable)
+    while batch := tuple(islice(iterator, n)):
+        yield batch
+
+
 class SlurmProvider(ClusterProvider, RepresentationMixin):
     """Slurm Execution Provider
 
@@ -116,6 +114,12 @@ class SlurmProvider(ClusterProvider, RepresentationMixin):
         symbolic group for the job ID.
     worker_init : str
         Command to be run before starting a worker, such as 'module load Anaconda; source activate env'.
+    cmd_timeout : int (Default = 10)
+        Number of seconds to wait for slurm commands to finish. For schedulers with many jobs this
+        may need to be increased to wait longer for scheduler information.
+    status_batch_size: ine (Default = 50)
+        Number of jobs to batch together in calls to the scheduler status. For schedulers
+        with many jobs this may need to be decreased to get jobs in smaller batches.
     exclusive : bool (Default = True)
         Requests nodes which are not shared with other running jobs.
     launcher : Launcher
@@ -126,36 +130,41 @@ class SlurmProvider(ClusterProvider, RepresentationMixin):
     """
 
     @typeguard.typechecked
-    def __init__(self,
-                 partition: Optional[str] = None,
-                 account: Optional[str] = None,
-                 qos: Optional[str] = None,
-                 constraint: Optional[str] = None,
-                 clusters: Optional[str] = None,
-                 nodes_per_block: int = 1,
-                 cores_per_node: Optional[int] = None,
-                 mem_per_node: Optional[int] = None,
-                 init_blocks: int = 1,
-                 min_blocks: int = 0,
-                 max_blocks: int = 1,
-                 parallelism: float = 1,
-                 walltime: str = "00:10:00",
-                 scheduler_options: str = '',
-                 regex_job_id: str = r"Submitted batch job (?P<id>\S*)",
-                 worker_init: str = '',
-                 cmd_timeout: int = 10,
-                 exclusive: bool = True,
-                 launcher: Launcher = SingleNodeLauncher()):
-        label = 'slurm'
-        super().__init__(label,
-                         nodes_per_block,
-                         init_blocks,
-                         min_blocks,
-                         max_blocks,
-                         parallelism,
-                         walltime,
-                         cmd_timeout=cmd_timeout,
-                         launcher=launcher)
+    def __init__(
+        self,
+        partition: Optional[str] = None,
+        account: Optional[str] = None,
+        qos: Optional[str] = None,
+        constraint: Optional[str] = None,
+        clusters: Optional[str] = None,
+        nodes_per_block: int = 1,
+        cores_per_node: Optional[int] = None,
+        mem_per_node: Optional[int] = None,
+        init_blocks: int = 1,
+        min_blocks: int = 0,
+        max_blocks: int = 1,
+        parallelism: float = 1,
+        walltime: str = "00:10:00",
+        scheduler_options: str = "",
+        regex_job_id: str = r"Submitted batch job (?P<id>\S*)",
+        worker_init: str = "",
+        cmd_timeout: int = 10,
+        status_batch_size: int = 50,
+        exclusive: bool = True,
+        launcher: Launcher = SingleNodeLauncher(),
+    ):
+        label = "slurm"
+        super().__init__(
+            label,
+            nodes_per_block,
+            init_blocks,
+            min_blocks,
+            max_blocks,
+            parallelism,
+            walltime,
+            cmd_timeout=cmd_timeout,
+            launcher=launcher,
+        )
 
         self.partition = partition
         self.cores_per_node = cores_per_node
@@ -166,8 +175,8 @@ def __init__(self,
         self.constraint = constraint
         self.clusters = clusters
         # Used to batch requests to sacct/squeue for long job lists
-        self._job_batch_size = 50
-        self.scheduler_options = scheduler_options + '\n'
+        self.status_batch_size = status_batch_size
+        self.scheduler_options = scheduler_options + "\n"
         if exclusive:
             self.scheduler_options += "#SBATCH --exclusive\n"
         if partition:
@@ -182,7 +191,7 @@ def __init__(self,
             self.scheduler_options += "#SBATCH --clusters={}\n".format(clusters)
 
         self.regex_job_id = regex_job_id
-        self.worker_init = worker_init + '\n'
+        self.worker_init = worker_init + "\n"
         # Check if sacct works and if not fall back to squeue
         cmd = "sacct -X"
         logger.debug("Executing %s", cmd)
@@ -210,25 +219,23 @@ def __init__(self,
             self._translate_table = squeue_translate_table
 
     def _status(self):
-        '''Returns the status list for a list of job_ids
+        """Returns the status list for a list of job_ids
 
         Args:
               self
 
         Returns:
               [status...] : Status list of all jobs
-        '''
+        """
 
         if len(self.resources.items()) == 0:
-            logger.debug('No active jobs, skipping status update')
+            logger.debug("No active jobs, skipping status update")
             return
 
-        job_list_batches = batched(self.resources.items(), self._job_batch_size)
+        job_list_batches = batched(self.resources.items(), self.status_batch_size)
         stdout = ""
         for job_batch in job_list_batches:
-            job_id_list = ','.join(
-                [jid for jid, job in job_batch if not job['status'].terminal]
-            )
+            job_id_list = ",".join([jid for jid, job in job_batch if not job["status"].terminal])
             cmd = self._cmd.format(job_id_list)
             logger.debug("Executing %s", cmd)
             retcode, _stdout, stderr = self.execute_wait(cmd)
@@ -240,7 +247,7 @@ def _status(self):
                 return
 
         jobs_missing = set(self.resources.keys())
-        for line in stdout.split('\n'):
+        for line in stdout.split("\n"):
             if not line:
                 # Blank line
                 continue
@@ -252,9 +259,11 @@ def _status(self):
                 logger.warning(f"Slurm status {slurm_state} is not recognized")
             status = self._translate_table.get(slurm_state, JobState.UNKNOWN)
             logger.debug("Updating job {} with slurm status {} to parsl state {!s}".format(job_id, slurm_state, status))
-            self.resources[job_id]['status'] = JobStatus(status,
-                                                         stdout_path=self.resources[job_id]['job_stdout_path'],
-                                                         stderr_path=self.resources[job_id]['job_stderr_path'])
+            self.resources[job_id]["status"] = JobStatus(
+                status,
+                stdout_path=self.resources[job_id]["job_stdout_path"],
+                stderr_path=self.resources[job_id]["job_stderr_path"],
+            )
             jobs_missing.remove(job_id)
 
         # sacct can get job info after jobs have completed so this path shouldn't be hit
@@ -262,9 +271,11 @@ def _status(self):
         # blanks for missing jobs, we might lose some information about why the jobs failed.
         for missing_job in jobs_missing:
             logger.debug("Updating missing job {} to completed status".format(missing_job))
-            self.resources[missing_job]['status'] = JobStatus(
-                JobState.COMPLETED, stdout_path=self.resources[missing_job]['job_stdout_path'],
-                stderr_path=self.resources[missing_job]['job_stderr_path'])
+            self.resources[missing_job]["status"] = JobStatus(
+                JobState.COMPLETED,
+                stdout_path=self.resources[missing_job]["job_stdout_path"],
+                stderr_path=self.resources[missing_job]["job_stderr_path"],
+            )
 
     def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> str:
         """Submit the command as a slurm job.
@@ -286,12 +297,12 @@ def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> s
         scheduler_options = self.scheduler_options
         worker_init = self.worker_init
         if self.mem_per_node is not None:
-            scheduler_options += '#SBATCH --mem={}g\n'.format(self.mem_per_node)
-            worker_init += 'export PARSL_MEMORY_GB={}\n'.format(self.mem_per_node)
+            scheduler_options += "#SBATCH --mem={}g\n".format(self.mem_per_node)
+            worker_init += "export PARSL_MEMORY_GB={}\n".format(self.mem_per_node)
         if self.cores_per_node is not None:
             cpus_per_task = math.floor(self.cores_per_node / tasks_per_node)
-            scheduler_options += '#SBATCH --cpus-per-task={}'.format(cpus_per_task)
-            worker_init += 'export PARSL_CORES={}\n'.format(cpus_per_task)
+            scheduler_options += "#SBATCH --cpus-per-task={}".format(cpus_per_task)
+            worker_init += "export PARSL_CORES={}\n".format(cpus_per_task)
 
         job_name = "{0}.{1}".format(job_name, time.time())
 
@@ -315,9 +326,7 @@ def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> s
         job_config["job_stderr_path"] = job_stderr_path
 
         # Wrap the command
-        job_config["user_script"] = self.launcher(command,
-                                                  tasks_per_node,
-                                                  self.nodes_per_block)
+        job_config["user_script"] = self.launcher(command, tasks_per_node, self.nodes_per_block)
 
         logger.debug("Writing submit script")
         self._write_submit_script(template_string, script_path, job_name, job_config)
@@ -325,15 +334,16 @@ def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> s
         retcode, stdout, stderr = self.execute_wait("sbatch {0}".format(script_path))
 
         if retcode == 0:
-            for line in stdout.split('\n'):
+            for line in stdout.split("\n"):
                 match = re.match(self.regex_job_id, line)
                 if match:
                     job_id = match.group("id")
-                    self.resources[job_id] = {'job_id': job_id,
-                                              'status': JobStatus(JobState.PENDING),
-                                              'job_stdout_path': job_stdout_path,
-                                              'job_stderr_path': job_stderr_path,
-                                              }
+                    self.resources[job_id] = {
+                        "job_id": job_id,
+                        "status": JobStatus(JobState.PENDING),
+                        "job_stdout_path": job_stdout_path,
+                        "job_stderr_path": job_stderr_path,
+                    }
                     return job_id
             else:
                 logger.error("Could not read job ID from submit command standard output.")
@@ -343,29 +353,30 @@ def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> s
                 "Could not read job ID from submit command standard output",
                 stdout=stdout,
                 stderr=stderr,
-                retcode=retcode
+                retcode=retcode,
             )
         else:
             logger.error("Submit command failed")
             logger.error("Retcode:%s STDOUT:%s STDERR:%s", retcode, stdout.strip(), stderr.strip())
             raise SubmitException(
-                job_name, "Could not read job ID from submit command standard output",
+                job_name,
+                "Could not read job ID from submit command standard output",
                 stdout=stdout,
                 stderr=stderr,
-                retcode=retcode
+                retcode=retcode,
             )
 
     def cancel(self, job_ids):
-        ''' Cancels the jobs specified by a list of job ids
+        """Cancels the jobs specified by a list of job ids
 
         Args:
              job_ids : [<job_id> ...]
 
         Returns :
              [True/False...] : If the cancel operation fails the entire
             list will be False.
-        '''
+        """
 
-        job_id_list = ' '.join(job_ids)
+        job_id_list = " ".join(job_ids)
 
         # Make the command to cancel jobs
         _cmd = "scancel"
@@ -377,7 +388,7 @@ def cancel(self, job_ids):
         rets = None
         if retcode == 0:
             for jid in job_ids:
-                self.resources[jid]['status'] = JobStatus(JobState.CANCELLED)  # Setting state to cancelled
+                self.resources[jid]["status"] = JobStatus(JobState.CANCELLED)  # Setting state to cancelled
             rets = [True for i in job_ids]
         else:
             rets = [False for i in job_ids]
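With this patch the batch size becomes a public constructor argument rather than a private constant. A hedged usage sketch; the partition, walltime, and sizing values here are made up for illustration:

```python
from parsl.providers import SlurmProvider

provider = SlurmProvider(
    partition="debug",        # illustrative value
    walltime="00:30:00",      # illustrative value
    cmd_timeout=60,           # more headroom for sacct/squeue on a busy scheduler
    status_batch_size=25,     # smaller batches -> shorter command line per status poll
)
```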
From b12241a919bfc4842f37544931fcd3df4ca1c338 Mon Sep 17 00:00:00 2001
From: Nick Tyler
Date: Tue, 18 Mar 2025 10:19:47 -0700
Subject: [PATCH 6/6] Fix spelling

---
 parsl/providers/slurm/slurm.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py
index cdd3c86a80..cce63cc202 100644
--- a/parsl/providers/slurm/slurm.py
+++ b/parsl/providers/slurm/slurm.py
@@ -117,7 +117,7 @@ class SlurmProvider(ClusterProvider, RepresentationMixin):
     cmd_timeout : int (Default = 10)
         Number of seconds to wait for slurm commands to finish. For schedulers with many jobs this
         may need to be increased to wait longer for scheduler information.
-    status_batch_size: ine (Default = 50)
+    status_batch_size: int (Default = 50)
         Number of jobs to batch together in calls to the scheduler status. For schedulers
         with many jobs this may need to be decreased to get jobs in smaller batches.
     exclusive : bool (Default = True)
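A closing sanity check on the series: each status poll now issues ceil(n_jobs / status_batch_size) short scheduler commands instead of one command whose length grows without bound. A sketch using a helper of the same shape as the one added in PATCH 5/6:

```python
import math
from itertools import islice

def batched(iterable, n):
    # same shape as the helper in PATCH 5/6, minus the n < 1 guard
    iterator = iter(iterable)
    while batch := tuple(islice(iterator, n)):
        yield batch

job_ids = [str(i) for i in range(1203)]
calls = sum(1 for _ in batched(job_ids, 50))
assert calls == math.ceil(1203 / 50) == 25  # 25 short commands, not one huge one
```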