diff --git a/.gitignore b/.gitignore
index e73a52a..7d44098 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,8 +1,59 @@
-*.pyc
-*.pyo
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
 
-*.egg
-gridmap.egg-info/
-dist/
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+env/
 build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+.hypothesis/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+
+# Sphinx documentation
 docs/_build/
+
+# PyBuilder
+target/
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..21dd671
--- /dev/null
+++ b/README.md
@@ -0,0 +1,33 @@
+# GridMap
+
+This is a WIP fork of the original gridmap project, created to achieve
+compatibility with the TORQUE and PBS grid engines.
+
+### Requirements
+
+Jobs can only be submitted from nodes that are allowed to do so (i.e., nodes
+that can run `qsub`).
+
+A few environment variables need to be set for GridMap to work:
+
+export DRMAA_LIBRARY_PATH=*path to libdrmaa.so, e.g. pbs_drmaa/libs/libdrmaa.so for PBS*
+
+export DEFAULT_TEMP_DIR="/local/$USER/"
+
+export USE_MEM_FREE=TRUE
+
+export SMTP_SERVER="unimail.tu-dortmund.de"
+
+export ERROR_MAIL_RECIPIENT="your.email@address.com"
+
+export ERROR_MAIL_SENDER="torque@hpc-main3.phido.physik.tu-dortmund.de"
+
+export SEND_ERROR_MAIL=TRUE
+
+### Python Requirements
+
+- drmaa
+- psutil
+- pyzmq
+- Python 3.4+
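For reviewers, a minimal end-to-end sketch of the API this fork exposes, using only names visible in this diff (`Job`, `process_jobs`, and the new `engine`/`walltime` keywords). The queue name, memory, and walltime values are site-specific placeholders, not defaults of the package:

```python
from gridmap import Job, process_jobs

def compute(x):
    return x * x

# engine='TORQUE' routes native_specification to the PBS-style flags added
# in gridmap/job.py below; queue/mem_free/walltime values are placeholders.
jobs = [Job(compute, [i], queue='batch', engine='TORQUE',
            mem_free='500m', walltime='00:10:00')
        for i in range(4)]
results = process_jobs(jobs)
print(results)  # [0, 1, 4, 9]
```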
diff --git a/README.rst b/README.rst
deleted file mode 100644
index a0c91db..0000000
--- a/README.rst
+++ /dev/null
@@ -1,58 +0,0 @@
-GridMap
------------
-
-.. image:: https://img.shields.io/travis/pygridtools/gridmap/stable.svg
-   :alt: Build status
-   :target: https://travis-ci.org/pygridtools/gridmap
-
-.. image:: https://img.shields.io/coveralls/pygridtools/gridmap/stable.svg
-   :target: https://coveralls.io/r/pygridtools/gridmap
-
-.. image:: https://img.shields.io/pypi/dm/gridmap.svg
-   :target: https://warehouse.python.org/project/gridmap/
-   :alt: PyPI downloads
-
-.. image:: https://img.shields.io/pypi/v/gridmap.svg
-   :target: https://warehouse.python.org/project/gridmap/
-   :alt: Latest version on PyPI
-
-.. image:: https://img.shields.io/pypi/l/gridmap.svg
-   :alt: License
-
-A package to allow you to easily create jobs on the cluster directly from
-Python. You can directly map Python functions onto the cluster without needing
-to write any wrapper code yourself.
-
-This is the ETS fork of an older project called `Python Grid `__. Unlike the older
-version, it is Python 2/3 compatible. Another major difference is that you can
-change the configuration via environment variables instead of having to modify
-a Python file in your ``site-packages`` directory. We've also fixed some bugs.
-
-For some examples of how to use it, check out ``map_reduce.py`` (for a simple
-example of how you can map a function onto the cluster) and ``manual.py`` (for
-an example of how you can create list of jobs yourself) in the examples folder.
-
-For complete documentation `read the docs `__.
-
-*NOTE*: You cannot use GridMap on a machine that is not allowed to submit jobs
-(e.g., slave nodes).
-
-Requirements
-~~~~~~~~~~~~
-
-- `drmaa `__
-- `psutil `__
-- `pyzmq `__
-- Python 2.7+
-
-Acknowledgments
-~~~~~~~~~~~~~~~
-
-Thank you to `Max-Planck-Society `__ and
-`Educational Testing Service `__ for
-funding the development of GridMap.
-
-Changelog
-~~~~~~~~~
-
-See `GitHub releases `__.
diff --git a/examples/manual.py b/examples/manual.py
index 937ad92..3b5f314 100755
--- a/examples/manual.py
+++ b/examples/manual.py
@@ -32,8 +32,15 @@
 from datetime import datetime
 
 from gridmap import Job, process_jobs
-
-
+import argparse
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--engine', help='Name of the grid engine you are using.',
+                    choices=['TORQUE', 'PBS', 'SGE'], default='SGE')
+parser.add_argument('--queue', help='Name of the queue you want to send jobs '
+                                    'to.', default='all.q')
+parser.add_argument('--vmem', help='Amount of memory to use on a node.',
+                    default='200m')
+parser.add_argument('--port', help='The port through which to communicate '
+                                   'with the JobMonitor.',
+                    default=None, type=int)
+parser.add_argument('--local', help='Flag indicating whether the jobs should '
+                                    'run locally instead of on the cluster.',
+                    action='store_true')
+parser.add_argument('--logging', type=str, choices=['INFO', 'DEBUG', 'WARN'],
+                    help='Logging verbosity level.', default='INFO')
+
+
 def sleep_walk(secs):
     '''
     Pass the time by adding numbers until the specified number of seconds has
@@ -46,18 +53,18 @@
         num = num + 1
 
 
-def compute_factorial(n):
+def compute_factorial(n, sleep=10):
     """
     computes factorial of n
     """
-    sleep_walk(10)
+    sleep_walk(sleep)
     ret = 1
     for i in range(n):
        ret = ret * (i + 1)
     return ret
 
 
-def make_jobs():
+def make_jobs(engine, queue, vmem):
     """
     creates a list of Job objects, which carry all information needed
@@ -68,7 +75,7 @@
     """
 
     # set up list of arguments
-    inputvec = [[3], [5], [10], [20]]
+    inputvec = [[3, 10], [5, 20], [10, 10], [20, 20]]
 
     # create empty job vector
     jobs = []
@@ -77,7 +84,8 @@
     for arg in inputvec:
         # The default queue used by the Job class is all.q. You must specify
         # the `queue` keyword argument if that is not the name of your queue.
-        job = Job(compute_factorial, arg, queue='all.q')
+        job = Job(compute_factorial, arg, queue=queue, engine=engine,
+                  mem_free=vmem)
         jobs.append(job)
 
     return jobs
@@ -88,21 +96,37 @@
     """
     run a set of jobs on cluster
     """
+    args = parser.parse_args()
+    engine = args.engine
+    queue = args.queue
+    vmem = args.vmem
+    port = args.port
+    local = args.local
+    level = args.logging
+
+    if level == 'DEBUG':
+        level = logging.DEBUG
+    elif level == 'WARN':
+        level = logging.WARN
+    elif level == 'INFO':
+        level = logging.INFO
+
     logging.captureWarnings(True)
-    logging.basicConfig(format=('%(asctime)s - %(name)s - %(levelname)s - ' +
-                                '%(message)s'), level=logging.INFO)
+    logging.basicConfig(format=('%(asctime)s - %(name)s - %(levelname)s - '
+                                '%(message)s'), level=level)
 
     print("=====================================")
     print("======== Submit and Wait ========")
-    print("=====================================")
-    print("")
+    print("=====================================\n")
 
-    functionJobs = make_jobs()
+    functionJobs = make_jobs(engine, queue, vmem)
+
+    if local:
+        print('Running jobs locally')
+    else:
+        print("Sending function jobs to cluster engine: {}. "
+              "Into queue: {}\n".format(engine, queue))
 
-    print("sending function jobs to cluster")
-    print("")
-    job_outputs = process_jobs(functionJobs, max_processes=4)
+    job_outputs = process_jobs(functionJobs, max_processes=4, port=port,
+                               local=local)
 
     print("results from each job")
     for (i, result) in enumerate(job_outputs):
@@ -111,4 +135,3 @@
 
 if __name__ == "__main__":
     main()
-
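With the new argparse options above, the example can be pointed at a TORQUE/PBS queue from the command line, e.g. `python examples/manual.py --engine TORQUE --queue batch --vmem 500m --logging DEBUG` (the queue name `batch` is a placeholder for your site's queue), or run entirely on the submission host with `python examples/manual.py --local`.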
diff --git a/gridmap/conf.py b/gridmap/conf.py
index bfc7d5f..14a018e 100644
--- a/gridmap/conf.py
+++ b/gridmap/conf.py
@@ -75,10 +75,11 @@
 try:
     import drmaa
     DRMAA_PRESENT = True
-except (ImportError, OSError, RuntimeError):
+except (ImportError, OSError, RuntimeError) as e:
     logger = logging.getLogger(__name__)
     logger.warning('Could not import drmaa. Only local multiprocessing ' +
                    'supported.')
+    logger.warning(str(e))
     DRMAA_PRESENT = False
 
 # plot cpu and mem usage and send via email
diff --git a/gridmap/job.py b/gridmap/job.py
index 46e8abd..7fe27cb 100644
--- a/gridmap/job.py
+++ b/gridmap/job.py
@@ -44,6 +44,8 @@
 import sys
 import traceback
 import functools
+import tempfile
+import signal
 from datetime import datetime
 from email.mime.multipart import MIMEMultipart
 from email.mime.text import MIMEText
@@ -56,6 +58,11 @@
                      SMTPDataError)
 
 import zmq
+# added for checking of string instances in both Python 2 and 3
+try:
+    basestring
+except NameError:
+    basestring = str
 
 from gridmap.conf import (CHECK_FREQUENCY, CREATE_PLOTS, DEFAULT_QUEUE,
                           DRMAA_PRESENT, ERROR_MAIL_RECIPIENT,
@@ -77,6 +84,8 @@ class DRMAANotPresentException(ImportError):
                        JobControlAction, JOB_IDS_SESSION_ALL, Session,
                        TIMEOUT_NO_WAIT)
+    import drmaa.errors
+
 # Python 2.x backward compatibility
 if sys.version_info < (3, 0):
     range = xrange
@@ -116,11 +125,12 @@ class Job(object):
                  'cause_of_death', 'num_resubmits', 'home_address',
                  'log_stderr_fn', 'log_stdout_fn', 'timestamp', 'host_name',
                  'heart_beat', 'track_mem', 'track_cpu', 'interpreting_shell',
-                 'copy_env')
+                 'copy_env', 'engine', 'walltime')
 
     def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G",
                  name='gridmap_job', num_slots=1, queue=DEFAULT_QUEUE,
-                 interpreting_shell=None, copy_env=True, add_env=None):
+                 interpreting_shell=None, copy_env=True, add_env=None,
+                 engine='SGE', walltime=None):
         """
         Initializes a new Job.
@@ -141,6 +151,9 @@ def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G",
         :type num_slots: int
         :param queue: SGE queue to schedule job on.
         :type queue: str
+        :param engine: Indicates compatibility with a grid engine: either
+                       SGE or TORQUE / PBS.
+        :type engine: str
         :param interpreting_shell: The interpreting shell for the job
         :type interpreting_shell: str
         :param copy_env: copy environment from master node to worker node?
@@ -171,11 +184,14 @@
         self.ret = _JOB_NOT_FINISHED
         self.num_slots = num_slots
         self.mem_free = mem_free
+        self.walltime = walltime
         self.white_list = []
         self.name = name.replace(' ', '_')
         self.queue = queue
+        self.engine = engine
         self.interpreting_shell = interpreting_shell
         self.copy_env = copy_env
+
         # Save copy of environment variables
         self.environment = {}
         def _add_env(env_vars):
@@ -242,6 +258,7 @@ def execute(self):
         contain a pickled version of it. Input data is removed after
         execution to save space.
         """
+
         try:
             self.ret = self.function(*self.args, **self.kwlist)
         except Exception as exception:
@@ -254,18 +271,39 @@
     def native_specification(self):
         """
         define python-style getter
         """
+        pbs = (self.engine == "TORQUE" or self.engine == "PBS")
+        sge = (self.engine == "SGE")
+
+        ret = ""
+
+        if sge:
+            ret = "-shell yes"
+            if self.interpreting_shell:
+                ret += " -S {}".format(self.interpreting_shell)
+
+            ret += " -b yes"
 
-        ret = "-shell yes"
-        if self.interpreting_shell:
-            ret += " -S {}".format(self.interpreting_shell)
-        ret += " -b yes"
-        if self.mem_free and USE_MEM_FREE:
-            ret += " -l mem_free={}".format(self.mem_free)
         if self.num_slots and self.num_slots > 1:
-            ret += " -pe smp {}".format(self.num_slots)
+            if sge:
+                ret += " -pe smp {}".format(self.num_slots)
+            if pbs:
+                ret += " -l nodes=1:ppn={}".format(self.num_slots)
+
+        if self.mem_free and USE_MEM_FREE:
+            if sge:
+                ret += " -l mem_free={}".format(self.mem_free)
+            if pbs:
+                ret += " -l vmem={}".format(self.mem_free)
+
         if self.white_list:
-            ret += " -l h={}".format('|'.join(self.white_list))
+            if sge:
+                ret += " -l h={}".format('|'.join(self.white_list))
+
+        if self.walltime:
+            if pbs:
+                ret += " -l walltime={}".format(self.walltime)
+
         if self.queue:
             ret += " -q {}".format(self.queue)
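To make the branching above concrete, here is what `native_specification` should produce for the two engine families, assuming `USE_MEM_FREE` is enabled and that the method is exposed as a property (per its "python-style getter" docstring). The expected strings in the comments follow directly from the code above; the queue names are placeholders:

```python
from gridmap import Job

def f(x):
    return x

sge_job = Job(f, [1], num_slots=4, mem_free='2G', queue='all.q')
pbs_job = Job(f, [1], num_slots=4, mem_free='2G', queue='batch',
              engine='TORQUE', walltime='01:00:00')
print(sge_job.native_specification)
# -shell yes -b yes -pe smp 4 -l mem_free=2G -q all.q
print(pbs_job.native_specification)
#  -l nodes=1:ppn=4 -l vmem=2G -l walltime=01:00:00 -q batch
# (note the stray leading space in the PBS case, since ret starts empty)
```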
""" + try: self.ret = self.function(*self.args, **self.kwlist) except Exception as exception: @@ -254,18 +271,39 @@ def native_specification(self): """ define python-style getter """ + pbs = (self.engine == "TORQUE" or self.engine == "PBS") + sge = (self.engine == "SGE") + + ret = "" + + if sge: + ret = "-shell yes" + if self.interpreting_shell: + ret += " -S {}".format(self.interpreting_shell) + + ret += " -b yes" - ret = "-shell yes" - if self.interpreting_shell: - ret += " -S {}".format(self.interpreting_shell) - ret += " -b yes" - if self.mem_free and USE_MEM_FREE: - ret += " -l mem_free={}".format(self.mem_free) if self.num_slots and self.num_slots > 1: - ret += " -pe smp {}".format(self.num_slots) + if sge: + ret += " -pe smp {}".format(self.num_slots) + if pbs: + ret += " -l nodes=1:ppn={}".format(self.num_slots) + + if self.mem_free and USE_MEM_FREE: + if sge: + ret += " -l mem_free={}".format(self.mem_free) + if pbs: + ret += " -l vmem={}".format(self.mem_free) + if self.white_list: - ret += " -l h={}".format('|'.join(self.white_list)) + if sge: + ret += " -l h={}".format('|'.join(self.white_list)) + + if self.walltime: + if pbs: + ret += " -l walltime={}".format(self.walltime) + if self.queue: ret += " -q {}".format(self.queue) @@ -280,7 +318,7 @@ class JobMonitor(object): """ Job monitor that communicates with other nodes via 0MQ. """ - def __init__(self, temp_dir=DEFAULT_TEMP_DIR): + def __init__(self, temp_dir=DEFAULT_TEMP_DIR, port=None): """ set up socket """ @@ -307,7 +345,12 @@ def __init__(self, temp_dir=DEFAULT_TEMP_DIR): self.interface = "tcp://%s" % (self.ip_address) # bind to random port and remember it - self.port = self.socket.bind_to_random_port(self.interface) + if port is None: + self.port = self.socket.bind_to_random_port(self.interface) + else: + self.port = port + self.socket.bind("{}:{}".format(self.interface, port)) + self.home_address = "%s:%i" % (self.interface, self.port) self.logger.info("Setting up JobMonitor on %s", self.home_address) @@ -315,7 +358,7 @@ def __init__(self, temp_dir=DEFAULT_TEMP_DIR): # uninitialized field (set in check method) self.jobs = [] self.ids = [] - self.session_id = None + self.session = None self.id_to_job = {} def __enter__(self): @@ -333,28 +376,44 @@ def __exit__(self, exc_type, exc_value, exc_tb): self.socket.close() # Clean up if we have a valid session - if self.session_id is not None: - with Session(self.session_id) as session: + if self.session is not None: # If we encounter an exception, kill all jobs - if exc_type is not None: - self.logger.info('Encountered %s, so killing all jobs.', - exc_type.__name__) - # try to kill off all old jobs - try: - session.control(JOB_IDS_SESSION_ALL, - JobControlAction.TERMINATE) - except InvalidJobException: - self.logger.debug("Could not kill all jobs for " + - "session.", exc_info=True) - - # Get rid of job info to prevent memory leak + if exc_type is not None: + self.logger.info('Encountered %s, so killing all jobs.', exc_type.__name__) + # try to kill off all old jobs try: - session.synchronize([JOB_IDS_SESSION_ALL], TIMEOUT_NO_WAIT, - dispose=True) - except ExitTimeoutException: + self.logger.info('Sending Terminate for all jobs on Session {} '.format(self.session)) + self.session.control(JOB_IDS_SESSION_ALL, JobControlAction.TERMINATE) + + except InvalidJobException: + self.logger.warn("Could not kill all jobs for session.", exc_info=True) + + except drmaa.errors.InternalException: + #cleanup in finaly clause below + self.logger.warn("Could not kill all jobs for session.") 
@@ -333,28 +376,44 @@ def __exit__(self, exc_type, exc_value, exc_tb):
         self.socket.close()
 
         # Clean up if we have a valid session
-        if self.session_id is not None:
-            with Session(self.session_id) as session:
-                # If we encounter an exception, kill all jobs
-                if exc_type is not None:
-                    self.logger.info('Encountered %s, so killing all jobs.',
-                                     exc_type.__name__)
-                    # try to kill off all old jobs
-                    try:
-                        session.control(JOB_IDS_SESSION_ALL,
-                                        JobControlAction.TERMINATE)
-                    except InvalidJobException:
-                        self.logger.debug("Could not kill all jobs for " +
-                                          "session.", exc_info=True)
-
-                # Get rid of job info to prevent memory leak
-                try:
-                    session.synchronize([JOB_IDS_SESSION_ALL], TIMEOUT_NO_WAIT,
-                                        dispose=True)
-                except ExitTimeoutException:
-                    pass
+        if self.session is not None:
+            # If we encounter an exception, kill all jobs
+            if exc_type is not None:
+                self.logger.info('Encountered %s, so killing all jobs.',
+                                 exc_type.__name__)
+                # try to kill off all old jobs
+                try:
+                    self.logger.info('Sending TERMINATE for all jobs on '
+                                     'session {}'.format(self.session))
+                    self.session.control(JOB_IDS_SESSION_ALL,
+                                         JobControlAction.TERMINATE)
+                except InvalidJobException:
+                    self.logger.warning("Could not kill all jobs for "
+                                        "session.", exc_info=True)
+                except drmaa.errors.InternalException:
+                    # cleanup happens in the finally clause below
+                    self.logger.warning("Could not kill all jobs for session.")
+
+            # Get rid of job info to prevent memory leak
+            try:
+                self.session.synchronize([JOB_IDS_SESSION_ALL],
+                                         TIMEOUT_NO_WAIT, dispose=True)
+            except ExitTimeoutException:
+                pass
+            finally:
+                # clean up remaining jobs that could not be killed via
+                # JOB_IDS_SESSION_ALL
+                for job in self.jobs:
+                    try:
+                        self.logger.info("Killing job {}".format(job.id))
+                        self.session.control(job.id,
+                                             JobControlAction.TERMINATE)
+                    except Exception:
+                        pass
+
+            self.logger.info('Exiting drmaa session {}'.format(self.session))
+            self.session.exit()
 
-    def check(self, session_id, jobs):
+    def check(self, session, jobs):
         """
         serves input and output data
         """
@@ -362,8 +421,8 @@
         self.jobs = jobs
         self.id_to_job = {job.id: job for job in self.jobs}
 
-        # keep track of DRMAA session_id (for resubmissions)
-        self.session_id = session_id
+        # keep track of DRMAA session (for resubmissions)
+        self.session = session
 
         # determines in which interval to check if jobs are alive
         self.logger.debug('Starting local hearbeat')
@@ -392,8 +451,7 @@
             if msg["command"] == "fetch_input":
                 return_msg = self.id_to_job[job_id]
                 job.timestamp = datetime.now()
-                self.logger.debug("Received input request from %s",
-                                  job_id)
+                self.logger.debug("Received input request from %s", job_id)
 
             if msg["command"] == "store_output":
                 # be nice
@@ -405,13 +463,13 @@
                     # copy relevant fields
                     job.ret = tmp_job.ret
                     job.traceback = tmp_job.traceback
-                    self.logger.info("Received output from %s",
-                                     job_id)
+                    self.logger.info("Received output from %s", job_id)
+                    if isinstance(job.ret, basestring) and len(job.ret) < 1000:
+                        self.logger.info("Output was {}".format(job.ret))
                 # Returned exception instead of job, so store that
                 elif isinstance(msg["data"], tuple):
                     job.ret, job.traceback = msg["data"]
-                    self.logger.info("Received exception from %s",
-                                     job_id)
+                    self.logger.info("Received exception from %s", job_id)
                 else:
                     self.logger.error(("Received message with " +
                                        "invalid data: %s"), msg)
@@ -431,6 +489,10 @@
                 return_msg = "all good"
                 job.timestamp = datetime.now()
 
+                running_jobs = [j for j in self.jobs
+                                if (len(j.track_cpu) > 0 and
+                                    isinstance(j.ret, basestring) and
+                                    j.ret == _JOB_NOT_FINISHED)]
+                usage = [j.track_cpu[-1][0] for j in running_jobs]
+                print("Total CPU usage: {}% for a total of {} running "
+                      "jobs.".format(sum(usage), len(running_jobs)), end='\r')
+
             if msg["command"] == "get_job":
                 # serve job for display
                 return_msg = job
@@ -467,7 +529,7 @@ def check_if_alive(self):
         for job in self.jobs:
 
             # noting was returned yet
-            if job.ret == _JOB_NOT_FINISHED:
+            if isinstance(job.ret, basestring) and job.ret == _JOB_NOT_FINISHED:
 
                 # exclude first-timers
                 if job.timestamp is not None:
@@ -518,7 +580,7 @@
                     old_id = job.id
                     job.track_cpu = []
                     job.track_mem = []
-                    handle_resubmit(self.session_id, job, temp_dir=self.temp_dir)
+                    handle_resubmit(self.session, job, temp_dir=self.temp_dir)
                     # Update job ID if successfully resubmitted
                     self.logger.info('Resubmitted job %s; it now has ID %s',
                                      old_id,
@@ -533,18 +595,19 @@ def all_jobs_done(self):
         """
         checks for all jobs if they are done
         """
+
+        def condition(retval):
+            return (isinstance(retval, basestring) and
+                    retval == _JOB_NOT_FINISHED)
+
+        running_jobs = [job for job in self.jobs if condition(job.ret)]
+        num_jobs = len(self.jobs)
+
         if self.logger.getEffectiveLevel() == logging.DEBUG:
-            num_jobs = len(self.jobs)
-            num_completed = sum((job.ret != _JOB_NOT_FINISHED and
-                                 not isinstance(job.ret, Exception))
-                                for job in self.jobs)
-            self.logger.debug('%i out of %i jobs completed', num_completed,
+            self.logger.debug('%i out of %i jobs completed',
+                              num_jobs - len(running_jobs),
                               num_jobs)
 
         # exceptions will be handled in check_if_alive
-        return all((job.ret != _JOB_NOT_FINISHED and not isinstance(job.ret,
-                                                                    Exception))
-                   for job in self.jobs)
+        return len(running_jobs) == 0
 
 
 def _send_mail(subject, body_text, attachments=None):
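The repeated `isinstance(job.ret, basestring) and job.ret == _JOB_NOT_FINISHED` pattern guards the sentinel comparison. One plausible motivation (my reading, not stated in the diff) is that job results such as numpy arrays overload `==` elementwise, so comparing them to a string directly is unreliable. A standalone illustration:

```python
def is_unfinished(ret, sentinel="job not finished"):
    # The sentinel text here is illustrative, not gridmap's actual value.
    # Check the type first: results like numpy arrays overload ==, so
    # `ret == sentinel` alone would not reliably yield a single bool.
    return isinstance(ret, str) and ret == sentinel

print(is_unfinished("job not finished"))  # True
print(is_unfinished([1, 2, 3]))           # False, without comparing at all
```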
@@ -570,7 +633,7 @@ def _send_mail(subject, body_text, attachments=None):
     msg["From"] = ERROR_MAIL_SENDER
     msg["To"] = ERROR_MAIL_RECIPIENT
 
-    logger.info('Email body: %s', body_text)
+    logger.info('Email sent with subject: {}'.format(subject))
 
     body_msg = MIMEText(body_text)
     msg.attach(body_msg)
@@ -638,7 +701,9 @@ def send_error_mail(job):
     time = [HEARTBEAT_FREQUENCY * i for i in range(len(job.track_mem))]
 
     # attack mem plot
-    img_mem_fn = os.path.join('/tmp', "{}_mem.png".format(job.id))
+    temp_dir = tempfile.mkdtemp()
+
+    img_mem_fn = os.path.join(temp_dir, "{}_mem.png".format(job.id))
     plt.figure(1)
     plt.plot(time, job.track_mem, "-o")
     plt.xlabel("time (s)")
@@ -653,7 +718,7 @@
         attachments.append(img_mem_attachement)
 
     # attach cpu plot
-    img_cpu_fn = os.path.join("/tmp", "{}_cpu.png".format(job.id))
+    img_cpu_fn = os.path.join(temp_dir, "{}_cpu.png".format(job.id))
     plt.figure(2)
     plt.plot(time, [cpu_load for cpu_load, _ in job.track_cpu], "-o")
     plt.xlabel("time (s)")
@@ -676,7 +741,7 @@
         os.unlink(img_mem_fn)
 
 
-def handle_resubmit(session_id, job, temp_dir=DEFAULT_TEMP_DIR):
+def handle_resubmit(session, job, temp_dir=DEFAULT_TEMP_DIR):
     """
     heuristic to determine if the job should be resubmitted
@@ -703,7 +768,7 @@
         job.num_resubmits += 1
         job.cause_of_death = ""
 
-        _resubmit(session_id, job, temp_dir)
+        _resubmit(session, job, temp_dir)
     else:
         raise JobException(("Job {0} ({1}) failed after {2} " +
                             "resubmissions").format(job.name, job.id,
@@ -771,19 +836,19 @@ def _submit_jobs(jobs, home_address, temp_dir=DEFAULT_TEMP_DIR,
                  white_list=None,
 
-    :returns: Session ID
+    :returns: the live DRMAA session the jobs were submitted in
     """
-    with Session() as session:
-        for job in jobs:
-            # set job white list
-            job.white_list = white_list
+    session = Session()
+    session.initialize()
+    for job in jobs:
+        # set job white list
+        job.white_list = white_list
 
-            # remember address of submission host
-            job.home_address = home_address
+        # remember address of submission host
+        job.home_address = home_address
 
-            # append jobs
-            _append_job_to_session(session, job, temp_dir=temp_dir, quiet=quiet)
+        # append jobs
+        _append_job_to_session(session, job, temp_dir=temp_dir, quiet=quiet)
 
-        sid = session.contact
-        return sid
+    return session
 
 
 def _append_job_to_session(session, job, temp_dir=DEFAULT_TEMP_DIR, quiet=True):
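`_submit_jobs` now creates one long-lived DRMAA session and returns the session object, instead of the old `with Session() as session:` block that returned only the contact string; `JobMonitor.__exit__` above is what eventually calls `session.exit()`. The bare lifecycle, sketched with the real `drmaa` package API and with submission details elided:

```python
import drmaa

session = drmaa.Session()
session.initialize()  # replaces entering the old `with Session()` block
try:
    pass              # submit jobs, hand the session to the JobMonitor...
finally:
    session.exit()    # ...which in this diff happens in JobMonitor.__exit__
```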
@@ -844,7 +909,7 @@ def _append_job_to_session(session, job, temp_dir=DEFAULT_TEMP_DIR, quiet=True):
 def process_jobs(jobs, temp_dir=DEFAULT_TEMP_DIR, white_list=None, quiet=True,
-                 max_processes=1, local=False, require_cluster=False):
+                 max_processes=1, local=False, require_cluster=False,
+                 port=None):
     """
     Take a list of jobs and process them on the cluster.
 
@@ -870,8 +935,9 @@
 
     :returns: List of Job results
     """
+
+    logger = logging.getLogger(__name__)
     if (not local and not DRMAA_PRESENT):
-        logger = logging.getLogger(__name__)
         if require_cluster:
             raise DRMAANotPresentException(
                 'Could not import drmaa, but cluster access required.'
@@ -881,23 +947,37 @@
 
     if not local:
         # initialize monitor to get port number
-        with JobMonitor(temp_dir=temp_dir) as monitor:
-            # get interface and port
-            home_address = monitor.home_address
+        try:
+            with JobMonitor(temp_dir=temp_dir, port=port) as monitor:
+                # get interface and port
+                home_address = monitor.home_address
+
+                # job_id field is attached to each job object
+                session = _submit_jobs(jobs, home_address, temp_dir=temp_dir,
+                                       white_list=white_list, quiet=quiet)
+
+                # handling of inputs, outputs and heartbeats
+                logger.info("Started DRMAA session {}".format(session))
+                monitor.check(session, jobs)
+        except KeyboardInterrupt:
+            print("JobMonitor killed.")
+            print("Caught KeyboardInterrupt. Returning intermediate results.")
+            return [job.ret for job in jobs]
+
+        if SEND_ERROR_MAIL:
+            send_completion_mail(name="gridmap job", jobs=jobs)
 
-            # job_id field is attached to each job object
-            sid = _submit_jobs(jobs, home_address, temp_dir=temp_dir,
-                               white_list=white_list, quiet=quiet)
-            # handling of inputs, outputs and heartbeats
-            monitor.check(sid, jobs)
+        print("returning results so far")
+        return [job.ret for job in jobs]
     else:
         _process_jobs_locally(jobs, max_processes=max_processes)
+        return [job.ret for job in jobs]
 
-    return [job.ret for job in jobs]
 
-def _resubmit(session_id, job, temp_dir):
+def _resubmit(session, job, temp_dir):
     """
     Resubmit a failed job.
 
@@ -907,17 +987,15 @@
     logger.info("starting resubmission process")
 
     if DRMAA_PRESENT:
-        # append to session
-        with Session(session_id) as session:
-            # try to kill off old job
-            try:
-                session.control(job.id, JobControlAction.TERMINATE)
-                logger.info("zombie job killed")
-            except Exception:
-                logger.error("Could not kill job with SGE id %s", job.id,
-                             exc_info=True)
-            # create new job
-            _append_job_to_session(session, job, temp_dir=temp_dir)
+        # try to kill off old job
+        try:
+            session.control(job.id, JobControlAction.TERMINATE)
+            logger.info("zombie job killed")
+        except Exception:
+            logger.error("Could not kill job with SGE id %s", job.id,
+                         exc_info=True)
+        # create new job
+        _append_job_to_session(session, job, temp_dir=temp_dir)
     else:
         logger.error("Could not restart job because we're in local mode.")
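A short usage sketch of the reworked `process_jobs` (the jobs and the port number are placeholders): a caller can now pin the monitor to a known port, and a Ctrl-C returns partial results instead of losing the whole run:

```python
from gridmap import Job, process_jobs

jobs = [Job(sum, [[1, 2, 3]]), Job(sum, [[4, 5]])]
# port=5555 is illustrative; on KeyboardInterrupt, process_jobs now returns
# whatever it has collected, with unfinished jobs still holding the sentinel.
results = process_jobs(jobs, port=5555, max_processes=2)
print(results)
```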
"log_file" in job.heart_beat: + log_file_attachement = MIMEText(job.heart_beat['log_file']) + log_file_attachement.add_header('Content-Disposition', 'attachment', + filename='{}_log.txt'.format(job.id)) + attachments.append(log_file_attachement) # Send mail - _send_mail(subject, body_text) + _send_mail(subject, body_text, attachments=attachments) diff --git a/gridmap/runner.py b/gridmap/runner.py index 48c8cd2..2527290 100644 --- a/gridmap/runner.py +++ b/gridmap/runner.py @@ -188,10 +188,22 @@ def _run_job(job_id, address): """ # create heart beat process logger = logging.getLogger(__name__) + parent_pid = os.getpid() + log_path = None + if 'SGE_STDERR_PATH' in os.environ: + log_path = os.environ['SGE_STDERR_PATH'] + + if log_path is None and 'PBS_O_WORKDIR' in os.environ: + n = 'log_{}.out'.format(os.environ['PBS_JOBID'], ) + log_path = os.path.join(os.environ['PBS_O_WORKDIR'], n) + + if log_path is None: + log_path='./log.out' + heart = multiprocessing.Process(target=_heart_beat, args=(job_id, address, parent_pid, - os.environ['SGE_STDERR_PATH'], + log_path, HEARTBEAT_FREQUENCY)) logger.info("Starting heart beat") heart.start() @@ -272,12 +284,17 @@ def _main(): logger.info("Appended {0} to PYTHONPATH".format(args.module_dir)) sys.path.insert(0, args.module_dir) + current_job_id = 0 + if 'JOB_ID' in os.environ: + current_job_id = os.environ['JOB_ID'] + else: + current_job_id = os.environ['PBS_JOBID'] logger.debug("Job ID: %s\tHome address: %s\tModule dir: %s", - os.environ['JOB_ID'], + current_job_id, args.home_address, args.module_dir) # Process the database and get job started - _run_job(os.environ['JOB_ID'], args.home_address) + _run_job(current_job_id, args.home_address) if __name__ == "__main__": diff --git a/gridmap/version.py b/gridmap/version.py index 53a3201..24ae398 100644 --- a/gridmap/version.py +++ b/gridmap/version.py @@ -28,5 +28,5 @@ :organization: ETS ''' -__version__ = '0.13.0' +__version__ = '0.13.1' VERSION = tuple(int(x) for x in __version__.split('.')) diff --git a/setup.py b/setup.py index 523a236..30b05ba 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ def readme(): - with open('README.rst') as f: + with open('README.md') as f: return f.read()