Skip to content

Commit

Permalink
task scheduler, api & socket
Browse files Browse the repository at this point in the history
  • Loading branch information
anibalsolon committed Aug 8, 2019
1 parent 70cbdf3 commit 6b341dd
Show file tree
Hide file tree
Showing 4 changed files with 299 additions and 114 deletions.
8 changes: 5 additions & 3 deletions src/theodore/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def parse_args(args):
scheduler_parser = subparsers.add_parser('scheduler')
scheduler_parser.register('action', 'extend', ExtendAction)
scheduler_parser.add_argument('--address', action='store', type=str, default='localhost')
scheduler_parser.add_argument('--port', action='store', type=int, default=8080)
scheduler_parser.add_argument('--port', action='store', type=int, default=3333)
scheduler_parser.add_argument('--backend', nargs='+', action='extend', choices=['docker', 'singularity'])

parsed = parser.parse_args(args)
Expand All @@ -73,11 +73,13 @@ def main(args):
print(args)

if args.command == 'scheduler':
from theodore.scheduler import start
from theodore.scheduler.api import start
from theodore.scheduler import Scheduler

# TODO Backend check for availability

start(args.address, args.port, args.backend or ['docker'])
scheduler = Scheduler(args.backend or ['docker'])
start(args.address, args.port, scheduler)

elif args.command == 'run':
pass
Expand Down
131 changes: 105 additions & 26 deletions src/theodore/backends/docker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import time
import glob
import yaml
import docker
Expand All @@ -11,9 +12,10 @@
from base64 import b64decode, b64encode

from ..utils import string_types
from ..scheduler import Schedule
from ..scheduler import Schedule, SubSchedule
from .backend import Backend

from tornado import httpclient


class Docker(Backend):
Expand All @@ -29,7 +31,7 @@ def __init__(self, scheduler):
self.scheduler = scheduler

def schedule(self, pipeline_config, data_config):
    """Queue a top-level Docker schedule on the backend's scheduler.

    The scheduler drives the schedule's run() generator, which in turn
    spawns a data-config build step and then per-subject runs.
    """
    # Post-commit version: the entry point is DockerSchedule (which yields a
    # DockerDataConfigSchedule child), not DockerDataConfigSchedule directly.
    self.scheduler.schedule(DockerSchedule(self, pipeline_config, data_config))


class DockerRun(object):
Expand All @@ -45,7 +47,6 @@ def status(self):
except Exception as e:
return 'stopped'
status = self.container.status
print('CONTAINER STATUS', status)
status_map = {
'created': 'starting',
'restarting': 'running',
Expand All @@ -63,17 +64,44 @@ def status(self):

class DockerSchedule(Schedule):

def __init__(self, backend, pipeline_config=None, data_config=None, parent=None):
    """Root schedule for a Docker-backed pipeline run.

    Parameters
    ----------
    backend : the Docker backend that owns this schedule.
    pipeline_config : optional pipeline configuration passed to children.
    data_config : optional data configuration; when present, run() yields
        a DockerDataConfigSchedule child to expand it into subjects.
    parent : optional parent schedule (for sub-schedule nesting).
    """
    super(DockerSchedule, self).__init__(backend=backend, parent=parent)
    self.pipeline_config = pipeline_config
    self.data_config = data_config
    # Stable identity for this schedule, exposed via the uid property.
    self._uid = str(uuid.uuid4())
    # Container-run handle; None until a run is started.
    self._run = None

@property
def uid(self):
    """Stable unique identifier (UUID4 string) assigned at construction."""
    return self._uid

@property
def logs(self):
    """Return a single synthetic log entry representing the schedule itself."""
    entry = {
        'id': 'schedule',
        'hash': 'schedule',
    }
    return [entry]

def run(self):
    """Generator of child schedules.

    Yields a ('data_config', schedule) pair when a data configuration was
    provided; otherwise yields nothing.
    """
    if not self.data_config:
        return
    child = DockerDataConfigSchedule(
        self.backend,
        self.pipeline_config,
        self.data_config,
        parent=self,
    )
    yield ('data_config', child)


class DockerSubjectSchedule(DockerSchedule):

def __init__(self, backend, pipeline_config, subject, parent=None):
    """Schedule a run for a single subject.

    `subject` is presumably one entry from the data config expanded by
    DockerDataConfigSchedule — TODO confirm against that class's run().
    """
    super(DockerSubjectSchedule, self).__init__(backend=backend, parent=parent)
    self.pipeline_config = pipeline_config
    self.subject = subject
    # Container-run handle; None until the run is started (see run()).
    self._run = None

@staticmethod
def _remap_files(subject):
mapping = {}
Expand All @@ -94,17 +122,17 @@ def _remap_files(subject):

elif isinstance(subject, dict):
for key, val in subject.items():
subject[key], submapping = DockerSchedule._remap_files(val)
subject[key], submapping = DockerSubjectSchedule._remap_files(val)
mapping.update(submapping)
return subject, mapping

elif isinstance(subject, list):
for key, val in enumerate(subject):
subject[key], submapping = DockerSchedule._remap_files(val)
subject[key], submapping = DockerSubjectSchedule._remap_files(val)
mapping.update(submapping)
return subject, mapping

def start(self):
def run(self):
config_folder = tempfile.mkdtemp()
output_folder = tempfile.mkdtemp()

Expand Down Expand Up @@ -147,22 +175,49 @@ def status(self):
return "unstarted"
else:
return self._run.status

@property
def logs(self):
    """Best-effort fetch of log metadata from the running container.

    NOTE(review): as written this always returns [] — `response` is parsed
    from the container's HTTP endpoint but never returned or stored.
    Presumably work-in-progress; confirm the intended payload.
    """
    if not self._run:
        return []

    try:
        # Refresh container attrs so the port mapping read below is current.
        self._run.container.reload()
    except Exception as e:
        return []

    # The container is expected to expose a log/status service on 8080;
    # bail out when that port is not published to the host.
    if '8080/tcp' not in self._run.container.attrs['NetworkSettings']['Ports']:
        return []

    port = int(self._run.container.attrs['NetworkSettings']['Ports']['8080/tcp'][0]['HostPort'])

    http_client = httpclient.HTTPClient()

    try:
        response = json.loads(http_client.fetch("http://localhost:%d/" % port).body.decode('utf-8'))
    except Exception as e:
        print(e)
    http_client.close()

    return []


class DockerDataConfigSchedule(DockerSchedule):

class DockerDataConfigSchedule(Schedule):
_start = None
_finish = None

def __init__(self, backend, pipeline_config, data_config, parent=None):
    """Schedule the container run that expands a data config into subjects.

    Parameters
    ----------
    backend : the Docker backend that owns this schedule.
    pipeline_config : pipeline configuration forwarded to per-subject runs.
    data_config : the data configuration to expand.
    parent : optional parent schedule.
    """
    super(DockerDataConfigSchedule, self).__init__(backend=backend, parent=parent)
    self.pipeline_config = pipeline_config
    self.data_config = data_config
    # Container-run handle; None until run() starts the container.
    # uid/_uid now come from the DockerSchedule base class.
    self._run = None
def run(self):

self._start = time.time()

def start(self):
self._output_folder = tempfile.mkdtemp()

volumes = {
Expand Down Expand Up @@ -198,22 +253,46 @@ def start(self):

self._run.container.wait()

def teardown(self):
# self._run.container.attrs['State']['ExitCode']
try:
files = glob.glob(os.path.join(self._output_folder, 'cpac_data_config_*.yml'))
if not files:
return
with open(files[0]) as f:
for subject in yaml.load(f):
self.backend.scheduler.schedule(DockerSchedule(self.backend, self.pipeline_config, subject))
if files:
with open(files[0]) as f:
for subject in yaml.load(f):
subject_id = []
if 'site_id' in subject:
subject_id += [subject['site_id']]
if 'subject_id' in subject:
subject_id += [subject['subject_id']]
if 'unique_id' in subject:
subject_id += [subject['unique_id']]

yield (
'_'.join(subject_id),
DockerSubjectSchedule(self.backend, self.pipeline_config, subject, parent=self)
)
finally:
shutil.rmtree(self._output_folder)

self._finish = time.time()

@property
def status(self):
    """Container status string, or "unstarted" before the run begins."""
    return self._run.status if self._run else "unstarted"


@property
def logs(self):
    """One log record for the data-config step, with timestamps when known."""
    record = {
        'id': 'data_config',
        'hash': 'data_config',
    }
    # Attach whichever timing marks have been recorded so far.
    for field, stamp in (('start', self._start), ('finish', self._finish)):
        if stamp is not None:
            record[field] = stamp
    return [record]
Loading

0 comments on commit 6b341dd

Please sign in to comment.