Skip to content

Commit

Permalink
Merge pull request #62 from cedadev/i_did_shepard_merges_wrong
Browse files Browse the repository at this point in the history
Initial commit of all files
  • Loading branch information
dwest77a authored Mar 3, 2025
2 parents 18f0e9d + 7a40269 commit b72796c
Show file tree
Hide file tree
Showing 10 changed files with 232 additions and 23 deletions.
1 change: 1 addition & 0 deletions padocc/core/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def get_status(tb: list) -> str:

if status_fh is not None:
status_fh.update_status(phase, status, jobid=jobid)
status_fh.close()

if subset_bypass:
logger.error('\n'.join(tb))
Expand Down
12 changes: 12 additions & 0 deletions padocc/core/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def __init__(
:param dryrun: (bool) If True will prevent output files being generated
or updated and instead will demonstrate commands that would otherwise happen.
:param thorough: (bool) From args.quality - if True will create all files
from scratch, otherwise saved refs from previous runs will be loaded.
"""
Expand Down Expand Up @@ -139,6 +140,17 @@ def _set_fh_kwargs(
self._dryrun = dryrun or self._dryrun
self._thorough = thorough or self._thorough

def clear_loggers(
        ignore: "list[str] | None" = None
    ):
    """
    Remove all handlers from every logger currently registered with the
    logging module, except loggers named in ``ignore``.

    Useful for releasing file handles between repeated batch cycles so
    handlers do not accumulate across runs.

    :param ignore: (list) Names of loggers to leave untouched.
    """

    ignore = ignore or []
    for name in logging.root.manager.loggerDict:
        if name in ignore:
            continue
        lg = logging.getLogger(name)
        # Pop only handlers attached directly to this logger.
        # NOTE: `while lg.hasHandlers()` is wrong here — it also returns
        # True when an *ancestor* (e.g. the root logger) has handlers,
        # which made `lg.handlers[0]` raise IndexError once this
        # logger's own handler list was empty.
        while lg.handlers:
            lg.removeHandler(lg.handlers[0])

def reset_file_handler(
logger : logging.Logger,
verbose : int,
Expand Down
1 change: 0 additions & 1 deletion padocc/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
'scan',
'compute',
'validate',
'catalog'
]

BASE_CFG = {
Expand Down
3 changes: 2 additions & 1 deletion padocc/groups/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def run(
proj_code,
mode=mode,
logid=logid,
label=phase,
label=f'{self._label}_{phase}',
fh=fh,
bypass=bypass,
run_kwargs=run_kwargs,
Expand Down Expand Up @@ -337,6 +337,7 @@ def _scan_config(
proj_code, self.workdir, groupID=self.groupID,
verbose=self._verbose, bypass=bypass,
dryrun=self._dryrun, **kwargs)

status = so.run(mode=mode, **self.fh_kwargs, **run_kwargs)
so.save_files()
return status
Expand Down
12 changes: 12 additions & 0 deletions padocc/groups/mixins/evaluations.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,18 @@ def summarise_data(self, repeat_id: str = 'main', func: Callable = print):

func('\n'.join(ot))

def get_codes_by_status(
self,
repeat_id: str = 'main',
) -> dict:
"""
Public Method for just getting the status dict
for a group.
"""

status_dict, _ = self._get_status_dict(repeat_id)
return status_dict

def summarise_status(
self,
repeat_id: str = 'main',
Expand Down
204 changes: 186 additions & 18 deletions padocc/groups/shepard.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@
import argparse
import glob
import json
import time
from datetime import datetime
import os
from typing import Union

import yaml

from padocc.core.logs import LoggedOperation
from padocc.core.logs import LoggedOperation, clear_loggers
from padocc.core.utils import phases, BypassSwitch

from .group import GroupOperation

Expand All @@ -26,11 +29,37 @@
'substitutions':['a','b']
}

class ShepardTask:
    """
    Record describing one shepard progression job: advance the
    successful project codes of a group from ``old_phase`` to the
    next phase in the padocc pipeline.
    """

    def __init__(self, fid: int, groupID: str, old_phase: str, num_codes: int):
        """
        :param fid: (int) Index of the flock within the batch's flock
            list. (Previously annotated ``str``, but callers pass the
            ``enumerate`` index and use it to subscript a list.)
        :param groupID: (str) ID of the group this task belongs to.
        :param old_phase: (str) Phase whose successful codes progress.
        :param num_codes: (int) Number of successful project codes.
        :raises ValueError: If ``old_phase`` is not in ``phases``.
        """

        self.fid = fid
        self.groupID = groupID
        self.old_phase = old_phase
        # Next phase in pipeline order; raises IndexError if old_phase
        # is already the final phase (callers only pass phases that
        # have a successor).
        self.new_phase = phases[phases.index(old_phase) + 1]
        self.num_codes = num_codes

class ShepardOperator(LoggedOperation):
"""
Operator class for Shepard deployments
"""

def __init__(
self,
mode: Union[str, None] = None,
conf: Union[dict,str,None] = None,
verbose: int = 0
) -> None:

self.log_label = 'shepard-deploy'

super().__init__(label=self.log_label,verbose=verbose)

def __init__(self, conf: Union[dict,None] = None, verbose: int = 0) -> None:
self.mode = mode

self.conf = self._load_config(conf)
if isinstance(conf, str):
self.conf = self._load_config(conf)
else:
self.conf = conf

if self.conf is None:
raise NotImplementedError(
Expand All @@ -42,33 +71,169 @@ def __init__(self, conf: Union[dict,None] = None, verbose: int = 0) -> None:
raise ValueError(
'Missing "flock_dir" from config.'
)

super().__init__(label='shepard-deploy',verbose=verbose)

# Shepard Files
# - workdir for operations.
# - path to a group file.

def run_batch(self, batch_limit: int = 100):
@property
def bypass(self):
"""
Standard bypass switch for shepard operations
"""
return BypassSwitch('DFLS')

@property
def cycle_limit(self):
"""
Limit for cycling operations
"""
return 1000

@property
def cycle_delay(self):
"""
Delay between cycling operations
"""
return 10

def activate(self, mode: str = None):
"""
Main operation function to activate the deployment
"""

mode = mode or self.mode

if mode == 'batch':
self.logger.info('Running in single batch mode')
self.run_batch()
else:
self.logger.info('Running in continuous cycle mode')
for cycle in range(1, self.cycle_limit+1):
self.logger.info(f'Cycle {cycle}/{self.cycle_limit}')
self.run_batch(cycle=cycle)
time.sleep(self.cycle_delay)

self.logger.info('Operation complete')

def run_batch(
self,
batch_limit: int = 100,
cycle: int = 1) -> None:
"""
Run a batch of processes.
"""

batch_limit = self.conf.get('batch_limit',None) or batch_limit

# Initialise all groups if needed (outside of batch limit)

flock = self._init_all_flocks()

flocks = self._init_all_flocks()
self.logger.info("All flocks initialised")

task_list, total_processes = self._assemble_task_list(flocks, batch_limit)

current = datetime.strftime(datetime.now(), "%y/%m/%d %H:%M:%S")

if len(task_list) == 0:
self.logger.info(f'No processes identified: {current}')
return

self.logger.info(
f'Shepard Batch {cycle}: {current} ({total_processes} processes)'
)
for task in task_list:
self.logger.info(
f' > Group: {task.groupID}, '
f'Progression: {task.old_phase} -> '
f'{task.new_phase} [{task.num_codes}]'
)

self.logger.info('Starting processing jobs')

for task in task_list:
flock = flocks[task.fid]
self._process_task(task, flock)

self.logger.info('Finished processing jobs')

del flocks
del task_list
#clear_loggers(ignore=[self.log_label])

def _process_task(
self,
task: ShepardTask,
flock: GroupOperation):
"""
Process Individual Tasks
"""

# Create Repeat ID for the given task
# Execute the correct phase with that given repeat ID

new_repeat_id = f'progression_{task.new_phase}'

flock.repeat_by_status(
'Success',
new_repeat_id,
task.old_phase
)

flock.run(
task.new_phase,
repeat_id=new_repeat_id,
bypass=self.bypass
)

def _assemble_task_list(
self,
flocks: list[GroupOperation],
batch_limit: int) -> tuple:
"""
Assemble the task list for the retrieved flocks
"""

task_list = []
proj_count = 0
for fid, flock in enumerate(flocks):
status_dict = flock.get_codes_by_status()

for phase in ['init','scan','compute']:
if 'Success' not in status_dict[phase]:
continue

num_codes = len(status_dict[phase]['Success'])

if num_codes == 0:
continue

task_list.append(
ShepardTask(fid, flock.groupID, phase, num_codes)
)

proj_count += num_codes

if proj_count > batch_limit:
break

return task_list, proj_count

def _init_all_flocks(self):
"""
Initialise and find all flocks
"""
shepard_files = self.find_flocks()
missed_flocks = []
shp_flock = []
for idx, shp in enumerate(shepard_files):
self.logger.info(f'Instantiating flock {idx+1}: {shp}')
self.logger.info(f'Discovering {len(shepard_files)} flocks')
for idx, flock_path in enumerate(shepard_files):
flock_file = flock_path.split('/')[-1]
try:
fconf = self.open_flock(shp)
fconf = self.open_flock(flock_path)
self.logger.info(f' > Accessed flock {idx+1}')
except ValueError as err:
missed_flocks.append((shp, err))
missed_flocks.append((flock_path, err))
continue

flock = GroupOperation(
Expand All @@ -79,9 +244,10 @@ def _init_all_flocks(self):
)

if not flock.datasets.get():
self.logger.info(f' > Creating flock {idx+1}: {flock_file}')
flock.init_from_file(fconf['group_file'], substitutions=fconf['substitutions'])
else:
self.logger.info(f'Skipped existing flock: {fconf["groupID"]}')
self.logger.debug(f' > Skipped creating existing flock: {fconf["groupID"]}')

shp_flock.append(flock)

Expand All @@ -104,7 +270,7 @@ def find_flocks(self):
f'Flock Directory: {self.flock_dir} - inaccessible.'
)

return glob.glob(f'{self.flock_dir}/**/*.shp', recursive=True)
return glob.glob(f'{self.flock_dir}/*.shp', recursive=True)

def _load_config(self, conf: str) -> Union[dict,None]:
"""
Expand All @@ -118,30 +284,32 @@ def _load_config(self, conf: str) -> Union[dict,None]:
config = yaml.safe_load(f)
return config
else:
self.logger.error(f'Config file {conf} unreachable')
return None
raise FileNotFoundError(f'Config file {conf} unreachable')

def _get_cmdline_args():
    """
    Get command line arguments passed to shepard.

    :returns: (dict) Keyword arguments (mode, conf, verbose) suitable
        for passing straight to the ShepardOperator constructor.
    """

    parser = argparse.ArgumentParser(description='Entrypoint for SHEPARD module')
    parser.add_argument('mode', type=str, help='Operational mode, either `batch` or `continuous`')
    parser.add_argument('--conf',type=str, help='Config file as part of deployment')
    parser.add_argument('-v','--verbose', action='count', default=0, help='Set level of verbosity for logs')

    args = parser.parse_args()

    # NOTE: a bad merge left both the old `'verbose': args.verbose}`
    # closing line and the new trailing-comma form in place; only the
    # new form is kept here.
    return {
        'mode': args.mode,
        'conf': args.conf,
        'verbose': args.verbose,
    }

def main():
    """Command-line entrypoint: build the operator and activate it."""

    cli_kwargs = _get_cmdline_args()

    operator = ShepardOperator(**cli_kwargs)
    operator.activate()

if __name__ == '__main__':
main()
1 change: 0 additions & 1 deletion padocc/tests/data_creator/flock_conf.yaml

This file was deleted.

1 change: 1 addition & 0 deletions padocc/tests/data_creator/shepard_conf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
flock_dir: "/Users/daniel.westwood/cedadev/padocc_branches/shepard_dev/padocc/padocc/tests/data_creator"
4 changes: 2 additions & 2 deletions padocc/tests/data_creator/test-flock.shp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"workdir":"/Users/daniel.westwood/cedadev/padocc/padocc/tests/auto_testdata_dir",
"workdir":"/Users/daniel.westwood/cedadev/padocc_branches/shepard_dev/padocc/padocc/tests/auto_testdata_dir",
"groupID":"padocc-test-suite",
"group_file":"/Users/daniel.westwood/cedadev/padocc/padocc/tests/data_creator/Aggs.csv",
"group_file":"/Users/daniel.westwood/cedadev/padocc_branches/shepard_dev/padocc/padocc/tests/data_creator/Aggs.csv",
"substitutions":{
"init_file": {
"/home/users/dwest77/cedadev/":"/Users/daniel.westwood/cedadev/"
Expand Down
Loading

0 comments on commit b72796c

Please sign in to comment.