From 6b341dd57a3a0b31625a17c06c404d03113dc227 Mon Sep 17 00:00:00 2001
From: anibalsolon
Date: Thu, 8 Aug 2019 15:14:29 -0400
Subject: [PATCH] task scheduler, api & socket

---
 src/theodore/__main__.py           |   8 +-
 src/theodore/backends/docker.py    | 131 +++++++++++++++++++-----
 src/theodore/scheduler/__init__.py | 155 +++++++++++++----------------
 src/theodore/scheduler/api.py      | 119 ++++++++++++++++++++++
 4 files changed, 299 insertions(+), 114 deletions(-)
 create mode 100644 src/theodore/scheduler/api.py

diff --git a/src/theodore/__main__.py b/src/theodore/__main__.py
index 89df929..e275bb5 100644
--- a/src/theodore/__main__.py
+++ b/src/theodore/__main__.py
@@ -51,7 +51,7 @@ def parse_args(args):
     scheduler_parser = subparsers.add_parser('scheduler')
     scheduler_parser.register('action', 'extend', ExtendAction)
     scheduler_parser.add_argument('--address', action='store', type=str, default='localhost')
-    scheduler_parser.add_argument('--port', action='store', type=int, default=8080)
+    scheduler_parser.add_argument('--port', action='store', type=int, default=3333)
     scheduler_parser.add_argument('--backend', nargs='+', action='extend', choices=['docker', 'singularity'])
 
     parsed = parser.parse_args(args)
@@ -73,11 +73,13 @@ def main(args):
     print(args)
 
     if args.command == 'scheduler':
-        from theodore.scheduler import start
+        from theodore.scheduler.api import start
+        from theodore.scheduler import Scheduler
 
         # TODO Backend check for availability
-        start(args.address, args.port, args.backend or ['docker'])
+        scheduler = Scheduler(args.backend or ['docker'])
+        start(args.address, args.port, scheduler)
 
     elif args.command == 'run':
         pass
 
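The scheduler subcommand now builds a Scheduler and hands it to the Tornado app instead of passing the backend list straight to start(). A minimal sketch of exercising the new entry point from Python, assuming parse_args() returns the parsed namespace (its return statement falls outside the hunk above):

    from theodore.__main__ import main, parse_args

    # same as running the `scheduler` subcommand from a shell;
    # serves the API on localhost:3333 (the new default port) until interrupted
    main(parse_args(['scheduler', '--backend', 'docker']))
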
diff --git a/src/theodore/backends/docker.py b/src/theodore/backends/docker.py
index 296c480..eb2d47e 100644
--- a/src/theodore/backends/docker.py
+++ b/src/theodore/backends/docker.py
@@ -1,4 +1,5 @@
 import os
+import time
 import glob
 import yaml
 import docker
@@ -11,9 +12,10 @@
 from base64 import b64decode, b64encode
 
 from ..utils import string_types
-from ..scheduler import Schedule
+from ..scheduler import Schedule, SubSchedule
 from .backend import Backend
+from tornado import httpclient
 
 
 class Docker(Backend):
 
@@ -29,7 +31,7 @@ def __init__(self, scheduler):
         self.scheduler = scheduler
 
     def schedule(self, pipeline_config, data_config):
-        self.scheduler.schedule(DockerDataConfigSchedule(self, pipeline_config, data_config))
+        self.scheduler.schedule(DockerSchedule(self, pipeline_config, data_config))
 
 
 class DockerRun(object):
@@ -45,7 +47,6 @@ def status(self):
         except Exception as e:
             return 'stopped'
         status = self.container.status
-        print('CONTAINER STATUS', status)
         status_map = {
             'created': 'starting',
             'restarting': 'running',
@@ -63,17 +64,44 @@ def status(self):
 
 class DockerSchedule(Schedule):
 
-    def __init__(self, backend, pipeline_config, subject):
-        self.backend = backend
+    def __init__(self, backend, pipeline_config=None, data_config=None, parent=None):
+        super(DockerSchedule, self).__init__(backend=backend, parent=parent)
         self.pipeline_config = pipeline_config
-        self.subject = subject
+        self.data_config = data_config
         self._uid = str(uuid.uuid4())
-        self._run = None
 
     @property
     def uid(self):
         return self._uid
 
+    @property
+    def logs(self):
+        return [{
+            'id': 'schedule',
+            'hash': 'schedule',
+        }]
+
+    def run(self):
+        if self.data_config:
+            yield (
+                'data_config',
+                DockerDataConfigSchedule(
+                    self.backend,
+                    self.pipeline_config,
+                    self.data_config,
+                    parent=self
+                )
+            )
+
+
+class DockerSubjectSchedule(DockerSchedule):
+
+    def __init__(self, backend, pipeline_config, subject, parent=None):
+        super(DockerSubjectSchedule, self).__init__(backend=backend, parent=parent)
+        self.pipeline_config = pipeline_config
+        self.subject = subject
+        self._run = None
+
     @staticmethod
     def _remap_files(subject):
         mapping = {}
@@ -94,17 +122,17 @@ def _remap_files(subject):
 
         elif isinstance(subject, dict):
             for key, val in subject.items():
-                subject[key], submapping = DockerSchedule._remap_files(val)
+                subject[key], submapping = DockerSubjectSchedule._remap_files(val)
                 mapping.update(submapping)
             return subject, mapping
 
         elif isinstance(subject, list):
             for key, val in enumerate(subject):
-                subject[key], submapping = DockerSchedule._remap_files(val)
+                subject[key], submapping = DockerSubjectSchedule._remap_files(val)
                 mapping.update(submapping)
             return subject, mapping
 
-    def start(self):
+    def run(self):
         config_folder = tempfile.mkdtemp()
         output_folder = tempfile.mkdtemp()
 
@@ -147,22 +175,49 @@ def status(self):
             return "unstarted"
         else:
             return self._run.status
+
+    @property
+    def logs(self):
+
+        if not self._run:
+            return []
+
+        try:
+            self._run.container.reload()
+        except Exception as e:
+            return []
+
+        if '8080/tcp' not in self._run.container.attrs['NetworkSettings']['Ports']:
+            return []
+
+        port = int(self._run.container.attrs['NetworkSettings']['Ports']['8080/tcp'][0]['HostPort'])
+
+        http_client = httpclient.HTTPClient()
+
+        try:
+            response = json.loads(http_client.fetch("http://localhost:%d/" % port).body.decode('utf-8'))
+        except Exception as e:
+            print(e)
+        http_client.close()
+
+        return []
+
+
+class DockerDataConfigSchedule(DockerSchedule):
 
-class DockerDataConfigSchedule(Schedule):
+    _start = None
+    _finish = None
 
-    def __init__(self, backend, pipeline_config, data_config):
-        self.backend = backend
+    def __init__(self, backend, pipeline_config, data_config, parent=None):
+        super(DockerDataConfigSchedule, self).__init__(backend=backend, parent=parent)
         self.pipeline_config = pipeline_config
         self.data_config = data_config
-        self._uid = str(uuid.uuid4())
         self._run = None
 
-    @property
-    def uid(self):
-        return self._uid
+    def run(self):
+
+        self._start = time.time()
 
-    def start(self):
         self._output_folder = tempfile.mkdtemp()
 
         volumes = {
@@ -198,22 +253,46 @@ def start(self):
 
         self._run.container.wait()
 
-    def teardown(self):
-        # self._run.container.attrs['State']['ExitCode']
         try:
             files = glob.glob(os.path.join(self._output_folder, 'cpac_data_config_*.yml'))
-            if not files:
-                return
-            with open(files[0]) as f:
-                for subject in yaml.load(f):
-                    self.backend.scheduler.schedule(DockerSchedule(self.backend, self.pipeline_config, subject))
+            if files:
+                with open(files[0]) as f:
+                    for subject in yaml.load(f):
+                        subject_id = []
+                        if 'site_id' in subject:
+                            subject_id += [subject['site_id']]
+                        if 'subject_id' in subject:
+                            subject_id += [subject['subject_id']]
+                        if 'unique_id' in subject:
+                            subject_id += [subject['unique_id']]
+
+                        yield (
+                            '_'.join(subject_id),
+                            DockerSubjectSchedule(self.backend, self.pipeline_config, subject, parent=self)
+                        )
         finally:
             shutil.rmtree(self._output_folder)
 
+        self._finish = time.time()
+
     @property
     def status(self):
         if not self._run:
             return "unstarted"
         else:
             return self._run.status
-
+
+    @property
+    def logs(self):
+        log = {
+            'id': 'data_config',
+            'hash': 'data_config',
+        }
+
+        if self._start is not None:
+            log['start'] = self._start
+
+        if self._finish is not None:
+            log['finish'] = self._finish
+
+        return [log]
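With this change the Docker backend no longer schedules the data-config run directly: it submits a root DockerSchedule whose run() generator yields a DockerDataConfigSchedule, which in turn yields one DockerSubjectSchedule per subject found in the generated data config. A rough end-to-end sketch of driving it (the Scheduler comes from scheduler/__init__.py below; the two file paths are placeholders, not part of this patch):

    from theodore.scheduler import Scheduler
    from theodore.backends.docker import Docker

    scheduler = Scheduler(['docker'])
    backend = Docker(scheduler)

    # submits the root DockerSchedule; children are spawned as its run()
    # generator (and their run() generators) yield them to the scheduler
    backend.schedule('/tmp/pipeline.yml', '/tmp/data_config.yml')  # placeholder paths

    # nested log tree built from Scheduler._schedules, rooted at the None entry
    print(scheduler.logs)
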
diff --git a/src/theodore/scheduler/__init__.py b/src/theodore/scheduler/__init__.py
index 4948777..c0a1cfb 100644
--- a/src/theodore/scheduler/__init__.py
+++ b/src/theodore/scheduler/__init__.py
@@ -1,118 +1,103 @@
-import tornado.escape
-import tornado.web
-import tornado.ioloop
-from tornado.options import define, options
-from tornado.concurrent import run_on_executor
+from theodore import __version__
 
 import time
 import yaml
+import collections
 
 from concurrent.futures import ThreadPoolExecutor  # `pip install futures` for python2
 
-MAX_WORKERS = 4
+
+
+MAX_WORKERS = 1
 
 
 class Schedule:
 
-    def __init__(self):
-        self.subschedules = []
+    def __init__(self, backend, parent=None):
+        self.backend = backend
+        self.parent = parent
 
-    def start(self):
+    def run(self):
         raise NotImplementedError
 
-    def teardown(self):
-        pass
+    def __hash__(self):
+        return hash(self.uid)
+
+    def __eq__(self, other):
+        return self.uid == other.uid
 
+    @property
+    def uid(self):
+        raise NotImplementedError
+
+    @property
     def status(self):
         raise NotImplementedError
 
+    @property
+    def logs(self):
+        raise NotImplementedError
+
 
 class Scheduler:
 
-    executor = None
-
-    def __init__(self):
-        self._schedules = []
+    def __init__(self, backends=[]):
+        self._schedules = {
+            None: {"children": {}, "parent": None, "schedule": None}  # Root
+        }
         self.executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
 
-    def schedule(self, schedule):
-        assert isinstance(schedule, Schedule)
-        self._schedules += [schedule]
-        self.executor.submit(self.run_scheduled, schedule)
+    def schedule(self, schedule, parent=None):
+        assert isinstance(schedule, (Schedule, collections.Mapping))
+        if isinstance(schedule, collections.Mapping):
+            for sid, s in schedule.items():
+                self._schedules[s] = {"children": {}, "parent": parent, "schedule": s}
+                self._schedules[parent]["children"][sid] = self._schedules[s]
+                self.executor.submit(self.run_scheduled, s)
+        else:
+            self._schedules[schedule] = {"children": {}, "parent": parent, "schedule": schedule}
+            self._schedules[parent]["children"][schedule.uid] = self._schedules[schedule]
+            self.executor.submit(self.run_scheduled, schedule)
 
     def run_scheduled(self, schedule):
         try:
-            schedule.start()
-        except:
-            import traceback
-            traceback.print_exc()
-            pass
-        try:
-            schedule.teardown()
+            for yid, y in schedule.run():
+                if isinstance(y, Schedule):
+                    self.schedule({
+                        yid: y
+                    }, parent=schedule)
         except:
             import traceback
             traceback.print_exc()
 
     @property
     def statuses(self):
-        return [
-            { "uid": _s.uid, "status": _s.status }
-            for _s in self._schedules
-        ]
-
-
-class JSONBaseHandler(tornado.web.RequestHandler):
-
-    def prepare(self):
-        super(JSONBaseHandler, self).prepare()
-        self.json_data = None
-        if self.request.body:
-            try:
-                self.json_data = tornado.escape.json_decode(self.request.body)
-            except ValueError:
-                pass
-
-    def get_argument(self, arg, default=None):
-        if self.request.method in ['POST', 'PUT'] and self.json_data:
-            return self.json_data.get(arg, default)
-        else:
-            return super(JSONBaseHandler, self).get_argument(arg, default)
-
-
-class MainHandler(JSONBaseHandler):
-    def get(self):
-        self.finish("hello_theodore")
-
+        statuses = {}
+        # for _s in sorted(self._schedules, key=lambda x: 0 if x.parent is None else 1):
+        #     statuses[_s.uid] = {"status": _s.status}
+        #     if _s.parent:
+        #         if "schedules" not in statuses[_s.parent.uid]:
+        #             statuses[_s.parent.uid]["schedules"] = {}
+        #         statuses[_s.parent.uid]["schedules"][_s.uid] = statuses[_s.uid]
 
-class BackendsHandler(JSONBaseHandler):
-    def get(self):
-        self.finish({"backends": self.application.settings.get('backends')})
+        return statuses
 
-
-class ScheduleHandler(JSONBaseHandler):
-    def get(self):
-        self.finish("""
-            ...
-""")
-
-    def post(self):
-        if not self.request.files.get('data_config'):
-            self.clear()
-            self.set_status(400)
-            return self.finish()
-
-        data_config = yaml.load(self.request.files['data_config'][0]['body'])
-        self.finish({'subjects': len(data_config)})
-
-
-def start(address, port, backends):
-    app = tornado.web.Application([
-        (r"/", MainHandler),
-        (r"/schedule", ScheduleHandler),
-        (r"/backends", BackendsHandler),
-    ], backends=backends)
-    app.listen(address=address, port=port)
-    tornado.ioloop.IOLoop.current().start()
+
+    @property
+    def logs(self):
+
+        def __transverse_logs(root):
+            node = {}
+            if root["schedule"]:
+                node["logs"] = root["schedule"].logs
+            node["children"] = {}
+            for id, s in root["children"].items():
+                node["children"][id] = __transverse_logs(s)
+            return node
+
+        # for _s in sorted(self._schedules, key=lambda x: 0 if x.parent is None else 1):
+        #     logs[_s.uid] = {"logs": _s.logs}
+        #     if _s.parent:
+        #         if "schedules" not in logs[_s.parent.uid]:
+        #             logs[_s.parent.uid]["schedules"] = {}
+        #         logs[_s.parent.uid]["schedules"][_s.uid] = logs[_s.uid]
+
+        return __transverse_logs(self._schedules[None])
diff --git a/src/theodore/scheduler/api.py b/src/theodore/scheduler/api.py
new file mode 100644
index 0000000..6852572
--- /dev/null
+++ b/src/theodore/scheduler/api.py
@@ -0,0 +1,119 @@
+import tornado.escape
+import tornado.web
+import tornado.websocket
+import tornado.ioloop
+import tornado.autoreload
+from tornado.options import define, options
+from tornado.concurrent import run_on_executor
+
+from theodore import __version__
+
+import time
+import json
+import yaml
+import collections
+
+
+class TheoBaseHandler(tornado.web.RequestHandler):
+
+    def set_default_headers(self):
+        self.set_header("Access-Control-Allow-Origin", "*")
+        self.set_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS')
+
+    def prepare(self):
+        super(TheoBaseHandler, self).prepare()
+        self.json_data = None
+        if self.request.body:
+            try:
+                self.json_data = tornado.escape.json_decode(self.request.body)
+            except ValueError:
+                pass
+
+    def get_argument(self, arg, default=None):
+        if self.request.method in ['POST', 'PUT'] and self.json_data:
+            return self.json_data.get(arg, default)
+        else:
+            return super(TheoBaseHandler, self).get_argument(arg, default)
+
+
+class MainHandler(TheoBaseHandler):
+    def get(self):
+        self.finish({
+            "api": "theodore",
+            "version": __version__,
+        })
+
+
+class BackendsHandler(TheoBaseHandler):
+    def get(self):
+        self.finish({"backends": self.application.settings.get('backends')})
+
+
+class ExecutionScheduleHandler(TheoBaseHandler):
+    def post(self):
+
+        if not self.request.files.get('data_config'):
+            self.clear()
+            self.set_status(400)
+            return self.finish()
+
+        data_config = yaml.load(self.request.files['data_config'][0]['body'])
+        self.finish({'subjects': len(data_config)})
+
+
+class DataScheduleHandler(TheoBaseHandler):
+    def post(self):
+
+        if not self.request.files.get('data_config'):
+            self.clear()
+            self.set_status(400)
+            return self.finish()
+
+        data_config = yaml.load(self.request.files['data_config'][0]['body'])
+        self.finish({'subjects': len(data_config)})
+
+
+class WatchScheduleHandler(tornado.websocket.WebSocketHandler):
+
+    def open(self):
+        print("New client connected")
+        self.write_message(json.dumps({'connected': True}))
+        self.loop = tornado.ioloop.PeriodicCallback(
+            self.check_periodically,
+            1000,
+            io_loop=tornado.ioloop.IOLoop.instance()
+        )
+        self.loop.start()
+
+    def on_message(self, message):
+        self.write_message(u"You said: " + message + " -> " + str(self.application.settings.get('scheduler')))
+
+    def on_close(self):
+        print("Client disconnected")
+
+    def check_periodically(self):
+        try:
+            self.write_message(json.dumps({'time': time.time()}))
+        except tornado.websocket.WebSocketClosedError:
+            self.loop.stop()
+
+    def check_origin(self, origin):
+        print(origin)
+        return True
+
+
+def start(address, port, scheduler):
+    app = tornado.web.Application(
+        [
+            (r"/", MainHandler),
+            (r"/schedule/watch", WatchScheduleHandler),
+            (r"/schedule/data", DataScheduleHandler),
+            (r"/schedule/pipeline", ExecutionScheduleHandler),
+            (r"/backends", BackendsHandler),
+        ],
+        scheduler=scheduler
+    )
+
+    app.listen(address=address, port=port)
+    tornado.autoreload.start()
+    tornado.ioloop.IOLoop.current().start()
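For reference, the endpoints registered above can be poked like this, using the defaults from __main__.py (localhost:3333). The WebSocket handler currently greets the client, echoes messages, and emits a timestamp every second; the client below is only a sketch:

    import json

    from tornado import gen, httpclient, ioloop, websocket


    @gen.coroutine
    def poke_api():
        http = httpclient.AsyncHTTPClient()

        # GET /  ->  {"api": "theodore", "version": ...}
        info = yield http.fetch('http://localhost:3333/')
        print(json.loads(info.body.decode('utf-8')))

        # GET /backends  ->  {"backends": ...}; the Application settings above
        # only carry `scheduler`, so this currently returns {"backends": null}
        backends = yield http.fetch('http://localhost:3333/backends')
        print(json.loads(backends.body.decode('utf-8')))

        # watch the scheduler over the websocket
        conn = yield websocket.websocket_connect('ws://localhost:3333/schedule/watch')
        conn.write_message('hello')
        for _ in range(3):
            message = yield conn.read_message()  # {"connected": true}, the echo, {"time": ...}
            print(message)


    if __name__ == '__main__':
        ioloop.IOLoop.current().run_sync(poke_api)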