-
Notifications
You must be signed in to change notification settings - Fork 203
/
Copy pathconfig.py
192 lines (174 loc) · 9.54 KB
/
config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import logging
from typing import Callable, Iterable, Optional, Sequence, Union
import typeguard
from typing_extensions import Literal
from parsl.dataflow.dependency_resolvers import DependencyResolver
from parsl.dataflow.memoization import Memoizer
from parsl.dataflow.taskrecord import TaskRecord
from parsl.errors import ConfigurationError
from parsl.executors.base import ParslExecutor
from parsl.executors.threads import ThreadPoolExecutor
from parsl.monitoring import MonitoringHub
from parsl.usage_tracking.api import UsageInformation
from parsl.usage_tracking.levels import DISABLED as USAGE_TRACKING_DISABLED
from parsl.usage_tracking.levels import LEVEL_3 as USAGE_TRACKING_LEVEL_3
from parsl.utils import RepresentationMixin
logger = logging.getLogger(__name__)
class Config(RepresentationMixin, UsageInformation):
    """
    Specification of Parsl configuration options.

    Parameters
    ----------
    executors : sequence of ParslExecutor, optional
        List (or other iterable) of `ParslExecutor` instances to use for executing tasks.
        Default is (:class:`~parsl.executors.threads.ThreadPoolExecutor()`,).
    app_cache : bool, optional
        Enable app caching. Default is True.
    memoizer : Memoizer, optional
        Plugin point for a custom memoizer implementation used for app result
        caching. Default is None.
    checkpoint_files : sequence of str, optional
        List of paths to checkpoint files. See :func:`parsl.utils.get_all_checkpoints` and
        :func:`parsl.utils.get_last_checkpoint` for helpers. Default is None.
    checkpoint_mode : str, optional
        Checkpoint mode to use, can be ``'dfk_exit'``, ``'task_exit'``, ``'periodic'`` or ``'manual'``.
        If set to `None`, checkpointing will be disabled. Default is None.
    checkpoint_period : str, optional
        Time interval (in "HH:MM:SS") at which to checkpoint completed tasks. Only has an effect if
        ``checkpoint_mode='periodic'``.
    dependency_resolver: plugin point for custom dependency resolvers. Default: only resolve Futures,
        using the `SHALLOW_DEPENDENCY_RESOLVER`.
    exit_mode: str, optional
        When Parsl is used as a context manager (using ``with parsl.load`` syntax) then this parameter
        controls what will happen to running tasks and exceptions at exit. The options are:

        * ``cleanup``: cleanup the DFK on exit without waiting for any tasks
        * ``skip``: skip all shutdown behaviour when exiting the context manager
        * ``wait``: wait for all tasks to complete when exiting normally, but exit immediately when exiting due to an exception.

        Default is ``cleanup``.
    garbage_collect : bool. optional.
        Delete task records from DFK when tasks have completed. Default: True
    internal_tasks_max_threads : int, optional
        Maximum number of threads to allocate for submit side internal tasks such as some data transfers
        or @joinapps
        Default is 10.
    monitoring : MonitoringHub, optional
        The config to use for database monitoring. Default is None which does not log to a database.
    retries : int, optional
        Set the number of retries (or available retry budget when using retry_handler) in case of failure. Default is 0.
    retry_handler : function, optional
        A user pluggable handler to decide if/how a task retry should happen.
        If no handler is specified, then each task failure incurs a retry cost
        of 1.
    run_dir : str, optional
        Path to run directory. Default is 'runinfo'.
    std_autopath : function, optional
        Sets the function used to generate stdout/stderr specifications when parsl.AUTO_LOGPATH is used. If no function
        is specified, generates paths that look like: ``rundir/NNN/task_logs/X/task_{id}_{name}{label}.{out/err}``
    strategy : str, optional
        Strategy to use for scaling blocks according to workflow needs. Can be 'simple', 'htex_auto_scale', 'none'
        or `None`.
        If 'none' or `None`, dynamic scaling will be disabled. Default is 'simple'. The literal value `None` is
        deprecated.
    strategy_period : float or int, optional
        How often the scaling strategy should be executed. Default is 5 seconds.
    max_idletime : float, optional
        The maximum idle time allowed for an executor before strategy could shut down unused blocks. Default is 120.0 seconds.
    usage_tracking : int, optional
        Set this field to 1, 2, or 3 to opt-in to Parsl's usage tracking system.
        The value represents the level of usage tracking detail to be collected.
        Setting this field to 0 will disable usage tracking. Default (this field is not set): usage tracking is not enabled.
        Parsl only collects minimal, non personally-identifiable,
        information used for reporting to our funding agencies.
    project_name: str, optional
        Option to deanonymize usage tracking data.
        If set, this value will be used as the project name in the usage tracking data and placed on the leaderboard.
    initialize_logging : bool, optional
        Make DFK optionally not initialize any logging. Log messages
        will still be passed into the python logging system under the
        ``parsl`` logger name, but the logging system will not by default
        perform any further log system configuration. Most noticeably,
        it will not create a parsl.log logfile. The use case for this
        is when parsl is used as a library in a bigger system which
        wants to configure logging in a way that makes sense for that
        bigger system as a whole.
    """

    @typeguard.typechecked
    def __init__(self,
                 executors: Optional[Iterable[ParslExecutor]] = None,
                 app_cache: bool = True,
                 memoizer: Optional[Memoizer] = None,
                 checkpoint_files: Optional[Sequence[str]] = None,
                 checkpoint_mode: Union[None,
                                        Literal['task_exit'],
                                        Literal['periodic'],
                                        Literal['dfk_exit'],
                                        Literal['manual']] = None,
                 checkpoint_period: Optional[str] = None,
                 dependency_resolver: Optional[DependencyResolver] = None,
                 exit_mode: Literal['cleanup', 'skip', 'wait'] = 'cleanup',
                 garbage_collect: bool = True,
                 internal_tasks_max_threads: int = 10,
                 retries: int = 0,
                 retry_handler: Optional[Callable[[Exception, TaskRecord], float]] = None,
                 run_dir: str = 'runinfo',
                 std_autopath: Optional[Callable] = None,
                 strategy: Optional[str] = 'simple',
                 strategy_period: Union[float, int] = 5,
                 max_idletime: float = 120.0,
                 monitoring: Optional[MonitoringHub] = None,
                 usage_tracking: int = 0,
                 project_name: Optional[str] = None,
                 initialize_logging: bool = True) -> None:
        # Materialize the iterable; fall back to a single local thread pool
        # when no executors were supplied (None or an empty iterable).
        executors = tuple(executors or [])
        if not executors:
            executors = (ThreadPoolExecutor(),)
        self._executors: Sequence[ParslExecutor] = executors
        self._validate_executors()

        self.memoizer = memoizer
        self.app_cache = app_cache
        self.checkpoint_files = checkpoint_files
        self.checkpoint_mode = checkpoint_mode
        # Warn (at debug level) when a checkpoint_period is supplied but the
        # chosen checkpoint_mode means it will be ignored.
        if checkpoint_period is not None:
            if checkpoint_mode is None:
                logger.debug('The requested `checkpoint_period={}` will have no effect because `checkpoint_mode=None`'.format(
                    checkpoint_period)
                )
            elif checkpoint_mode != 'periodic':
                logger.debug("Requested checkpoint period of {} only has an effect with checkpoint_mode='periodic'".format(
                    checkpoint_period)
                )
        # Periodic checkpointing needs a period; default to every 30 minutes.
        if checkpoint_mode == 'periodic' and checkpoint_period is None:
            checkpoint_period = "00:30:00"
        self.checkpoint_period = checkpoint_period
        self.dependency_resolver = dependency_resolver
        self.exit_mode = exit_mode
        self.garbage_collect = garbage_collect
        self.internal_tasks_max_threads = internal_tasks_max_threads
        self.retries = retries
        self.retry_handler = retry_handler
        self.run_dir = run_dir
        self.strategy = strategy
        self.strategy_period = strategy_period
        self.max_idletime = max_idletime
        # Validate before storing so an out-of-range level raises without
        # leaving a partially-configured attribute behind.
        self.validate_usage_tracking(usage_tracking)
        self.usage_tracking = usage_tracking
        self.project_name = project_name
        self.initialize_logging = initialize_logging
        self.monitoring = monitoring
        self.std_autopath: Optional[Callable] = std_autopath

    @property
    def executors(self) -> Sequence[ParslExecutor]:
        """The executors this configuration will run tasks on (read-only)."""
        return self._executors

    def _validate_executors(self) -> None:
        """Raise ConfigurationError if executors are missing or share a label."""
        if len(self.executors) == 0:
            raise ConfigurationError('At least one executor must be specified')

        # One-pass duplicate detection (keeps first-seen order of duplicates),
        # replacing the previous O(n^2) slice-based scan.
        seen = set()
        duplicates = []
        for label in (e.label for e in self.executors):
            if label in seen:
                duplicates.append(label)
            seen.add(label)
        if len(duplicates) > 0:
            raise ConfigurationError('Executors must have unique labels ({})'.format(
                ', '.join(['label={}'.format(repr(d)) for d in duplicates])))

    def validate_usage_tracking(self, level: int) -> None:
        """Raise ConfigurationError unless ``level`` is within [0, 3]."""
        if not USAGE_TRACKING_DISABLED <= level <= USAGE_TRACKING_LEVEL_3:
            raise ConfigurationError(
                f"Usage Tracking values must be 0, 1, 2, or 3 and not {level}"
            )

    def get_usage_information(self) -> dict:
        """Return the minimal, non-identifying fields reported by usage tracking."""
        return {"executors_len": len(self.executors),
                "dependency_resolver": self.dependency_resolver is not None}