Skip to content

Commit

Permalink
set-up client and docker runner api
Browse files Browse the repository at this point in the history
  • Loading branch information
anibalsolon committed Aug 30, 2019
1 parent 3e9253a commit 3162ab5
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 82 deletions.
47 changes: 28 additions & 19 deletions src/theodore/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, items)


def address(str):
addr, port = str.split(':')
port = int(port)
return addr, port


def parse_args(args):
parser = argparse.ArgumentParser(
description="theodore: a C-PAC utility"
Expand Down Expand Up @@ -50,10 +56,16 @@ def parse_args(args):

scheduler_parser = subparsers.add_parser('scheduler')
scheduler_parser.register('action', 'extend', ExtendAction)
scheduler_parser.add_argument('--address', action='store', type=str, default='localhost')
scheduler_parser.add_argument('--port', action='store', type=int, default=3333)
scheduler_parser.add_argument('--address', action='store', type=address, default='localhost:3333')
scheduler_parser.add_argument('--backend', nargs='+', action='extend', choices=['docker', 'singularity'])

run_parser = subparsers.add_parser('run')
run_parser.register('action', 'extend', ExtendAction)
run_parser.add_argument('--address', action='store', type=address)
run_parser.add_argument('--backend', choices=['docker', 'singularity'])
run_parser.add_argument('data_config')
run_parser.add_argument('pipeline', nargs='?')

parsed = parser.parse_args(args)

return parsed
Expand All @@ -66,33 +78,30 @@ def setup_logging(loglevel):


def main(args):
args = parse_args(args)
command = args[0]
args = parse_args(args[1:])
setup_logging(args.loglevel)
_logger.debug("Script starting...")

if args.command == 'scheduler':
from theodore.scheduler.api import start
from theodore.scheduler import Scheduler
from theodore.backends import docker
from theodore.scheduler.process import start_scheduler
start_scheduler(args.address, args.backend)

# TODO Backend check for availability
elif args.command == 'run':

backends = args.backend or ['docker']
clients = {}
if 'docker' in backends:
clients['docker'] = docker.Docker
if not args.address:
from theodore.scheduler.process import spawn_scheduler
spawn_scheduler(args.address, args.backend)

scheduler = Scheduler(clients, clients_priority=backends)
start(args.address, args.port, scheduler)
from theodore.scheduler.client import schedule, wait
from theodore.scheduler import SCHEDULER_ADDRESS

elif args.command == 'run':
pass

_logger.info("Script ends here")
scheduler = args.address or SCHEDULER_ADDRESS

schedule(scheduler, args.backend, args.data_config, args.pipeline)


def run():
main(sys.argv[1:])
main(sys.argv)


if __name__ == "__main__":
Expand Down
19 changes: 17 additions & 2 deletions src/theodore/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from .docker import Docker, DockerDataSettingsSchedule
from .docker import (
Docker,
DockerDataSettingsSchedule,
DockerDataConfigSchedule
)


class BackendMapper(object):
Expand All @@ -9,7 +13,6 @@ def __init__(self, **kwargs):
self.parameters = kwargs



class DataSettingsSchedule(BackendMapper):

_clients = {
Expand All @@ -24,3 +27,15 @@ def __call__(self, backend, parent=None):
)


class DataConfigSchedule(BackendMapper):

_clients = {
Docker: DockerDataConfigSchedule
}

def __call__(self, backend, parent=None):
return self._clients[backend.__class__](
backend=backend,
**self.parameters,
parent=parent
)
45 changes: 25 additions & 20 deletions src/theodore/backends/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

class Docker(Backend):

tag = 'latest'
tag = 'nightly'

def __init__(self, scheduler):
self.client = docker.from_env()
Expand All @@ -31,8 +31,8 @@ def __init__(self, scheduler):
raise "Could not connect to Docker"
self.scheduler = scheduler

def schedule(self, pipeline_config, data_config):
self.scheduler.schedule(DockerSchedule(self, pipeline_config, data_config))
def schedule(self, pipeline, data_config):
self.scheduler.schedule(DockerSchedule(self, pipeline, data_config))


class DockerRun(object):
Expand Down Expand Up @@ -64,9 +64,9 @@ def status(self):

class DockerSchedule(Schedule):

def __init__(self, backend, pipeline_config=None, data_config=None, parent=None):
def __init__(self, backend, pipeline=None, data_config=None, parent=None):
super(DockerSchedule, self).__init__(backend=backend, parent=parent)
self.pipeline_config = pipeline_config
self.pipeline = pipeline
self.data_config = data_config
self._uid = str(uuid.uuid4())
self._results = {}
Expand Down Expand Up @@ -112,7 +112,7 @@ def run(self):
'data_config',
DockerDataConfigSchedule(
self.backend,
self.pipeline_config,
self.pipeline,
self.data_config,
parent=self
)
Expand All @@ -121,9 +121,9 @@ def run(self):

class DockerSubjectSchedule(DockerSchedule):

def __init__(self, backend, pipeline_config, subject, parent=None):
def __init__(self, backend, pipeline, subject, parent=None):
super(DockerSubjectSchedule, self).__init__(backend=backend, parent=parent)
self.pipeline_config = pipeline_config
self.pipeline = pipeline
self.subject = subject
self._run = None

Expand Down Expand Up @@ -161,10 +161,10 @@ def run(self):
config_folder = tempfile.mkdtemp()
output_folder = tempfile.mkdtemp()

if self.pipeline_config is not None:
new_pipeline_config = os.path.join(config_folder, 'pipeline.yml')
shutil.copy(self.pipeline_config, new_pipeline_config)
pipeline_config = new_pipeline_config
if self.pipeline is not None:
new_pipeline = os.path.join(config_folder, 'pipeline.yml')
shutil.copy(self.pipeline, new_pipeline)
pipeline = new_pipeline

volumes = {
'/tmp': {'bind': '/scratch', 'mode': 'rw'},
Expand All @@ -176,11 +176,16 @@ def run(self):
b64encode(yaml.dump([self.subject], default_flow_style=False).encode("utf-8")).decode("utf-8")

# TODO handle local databases, transverse subject dict to get folder mappings
command = ['/', '/output', 'participant',
'--monitoring',
'--data_config_file', subject]
command = [
'/', '/output', 'participant',
'--monitoring',
'--skip_bids_validator',
'--save_working_dir',
'--data_config_file',
subject
]

if self.pipeline_config:
if self.pipeline:
command += ['--pipeline_file', '/config/pipeline.yml']

self._run = DockerRun(self.backend.client.containers.run(
Expand Down Expand Up @@ -233,9 +238,9 @@ class DockerDataConfigSchedule(DockerSchedule):
_start = None
_finish = None

def __init__(self, backend, pipeline_config, data_config, parent=None):
def __init__(self, backend, pipeline, data_config, parent=None):
super(DockerDataConfigSchedule, self).__init__(backend=backend, parent=parent)
self.pipeline_config = pipeline_config
self.pipeline = pipeline
self.data_config = data_config
self._run = None

Expand Down Expand Up @@ -292,8 +297,8 @@ def run(self):
subject_id += [subject['unique_id']]

yield (
'_'.join(subject_id),
DockerSubjectSchedule(self.backend, self.pipeline_config, subject, parent=self)
'/'.join(subject_id),
DockerSubjectSchedule(self.backend, self.pipeline, subject, parent=self)
)
finally:
shutil.rmtree(self._output_folder)
Expand Down
33 changes: 17 additions & 16 deletions src/theodore/scheduler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@


MAX_WORKERS = 1
SCHEDULER_ADDRESS = ('localhost', 3333)


class Schedule:
Expand All @@ -28,7 +29,7 @@ def __eq__(self, other):
@property
def uid(self):
raise NotImplementedError

@property
def status(self):
raise NotImplementedError
Expand All @@ -53,6 +54,9 @@ def status(self):
for k, v in self.children.items():
status['children'][str(k)] = v.status
return status

def __getitem__(self, key):
return self.children[key]


class Scheduler:
Expand All @@ -72,6 +76,8 @@ def __getitem__(self, key):

def schedule(self, schedule, parent=None, client=None):

from theodore.backends import BackendMapper

if client:
assert client in self.clients
client = self.clients[client]
Expand All @@ -81,16 +87,17 @@ def schedule(self, schedule, parent=None, client=None):
if isinstance(schedule, collections.Mapping):
schedules = {}
for sid, s in schedule.items():
s = s(client)
self._schedules[s] = ScheduleTree(name=sid, parent=parent, schedule=s)
if isinstance(s, BackendMapper):
s = s(client)
self._schedules.children[s] = ScheduleTree(name=sid, parent=parent, schedule=s)
if parent:
self._schedules[parent]["children"][sid] = self
self._schedules[parent].children[sid] = self._schedules[s]
schedules[sid] = s
self.executor.submit(self.run_scheduled, s)
return schedules
else:
schedule = schedule(client)
if isinstance(schedule, BackendMapper):
schedule = schedule(client)
self._schedules.children[schedule] = ScheduleTree(name=schedule.uid, parent=parent, schedule=schedule)
if parent:
self._schedules[parent].children[schedule] = self._schedules[schedule]
Expand All @@ -116,9 +123,10 @@ def run_scheduled(self, schedule):
if not isinstance(y, Schedule):
continue

self.schedule({
yid: y
}, parent=schedule)
self.schedule(
{ yid: y },
parent=schedule
)

if sid in self._watchers:
for watcher in self._watchers[sid]:
Expand All @@ -131,7 +139,7 @@ def run_scheduled(self, schedule):
function=watcher["function"],
children=watcher["children"],
)

if sid in self._watchers:
for watcher in self._watchers[sid]:
function = watcher["function"]
Expand Down Expand Up @@ -162,11 +170,4 @@ def __transverse_logs(root):
node["children"][id] = __transverse_logs(s)
return node

# for _s in sorted(self._schedules, key=lambda x: 0 if x.parent is None else 1):
# logs[_s.uid] = {"logs": _s.logs}
# if _s.parent:
# if "schedules" not in logs[_s.parent.uid]:
# logs[_s.parent.uid]["schedules"] = {}
# logs[_s.parent.uid]["schedules"][_s.uid] = logs[_s.uid]

return __transverse_logs(self._schedules[None])
Loading

0 comments on commit 3162ab5

Please sign in to comment.