diff --git a/dispy.py b/dispy.py index fc553ae..0e2f61b 100755 --- a/dispy.py +++ b/dispy.py @@ -1865,8 +1865,8 @@ def _terminate_scheduler(self, coro=None): if not ext_ip_addr: break - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock = AsyncSocket(sock, blocking=True) + sock = AsyncSocket(socket.socket(socket.AF_INET, socket.SOCK_STREAM), blocking=True, + keyfile=self.keyfile, certfile=self.certfile) sock.connect((self.scheduler_ip_addr, scheduler_port)) sock.sendall(self._cluster.auth_code) sock.send_msg('CLIENT:' + serialize({'version':_dispy_version, 'ip_addr':ext_ip_addr, diff --git a/dispynode.py b/dispynode.py index 1e50bb7..5813bc2 100755 --- a/dispynode.py +++ b/dispynode.py @@ -296,7 +296,7 @@ def send_pong_msg(self, addr, auth_code, coro=None): # TODO: process each message as separate Coro, so # exceptions are contained? if msg.startswith('PING:'): - if self.num_cpus != self.avail_cpus or self.scheduler['ip'] != None: + if self.num_cpus != self.avail_cpus: logger.debug('Busy (%s/%s); ignoring ping message from %s:%s', self.num_cpus, self.avail_cpus, addr[0], addr[1]) continue diff --git a/dispyscheduler.py b/dispyscheduler.py index 874ef88..59b1810 100755 --- a/dispyscheduler.py +++ b/dispyscheduler.py @@ -31,6 +31,7 @@ import traceback import tempfile import cPickle as pickle +import uuid from dispy import _Compute, DispyJob, _DispyJob_, _Node, _JobReply, \ num_min, _parse_nodes, _node_ipaddr, _XferFile, _dispy_version @@ -615,7 +616,7 @@ def _job_request_task(self, msg): except: logger.debug('Ignoring job request from %s', addr[0]) return - _job.uid = id(_job) + _job.uid = uuid.uuid4().hex setattr(_job, 'node', None) job = type('DispyJob', (), {'status':DispyJob.Created, 'start_time':None, 'end_time':None}) @@ -1392,7 +1393,7 @@ def cleanup_computation(self, cluster): scheduler = _Scheduler(**config) while True: try: - sys.stdin.readline() + time.sleep(60) except KeyboardInterrupt: # TODO: terminate even if jobs are scheduled? logger.info('Interrupted; terminating')