Skip to content

Commit 12a5f88

Browse files
committed
Make UDP radio receiver shutdown period configurable
.. and use that new parameter in stderr/out tests which don't make use of the UDP radio.
1 parent d5c1fe1 commit 12a5f88

File tree

3 files changed

+17
-4
lines changed

3 files changed

+17
-4
lines changed

parsl/monitoring/monitoring.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ def __init__(self,
4848
logdir: Optional[str] = None,
4949
monitoring_debug: bool = False,
5050
resource_monitoring_enabled: bool = True,
51-
resource_monitoring_interval: float = 30): # in seconds
51+
resource_monitoring_interval: float = 30, # in seconds
52+
udp_atexit_timeout: float = 3):
5253
"""
5354
Parameters
5455
----------
@@ -87,6 +88,10 @@ def __init__(self,
8788
If set to 0, only start and end information will be logged, and no periodic monitoring will
8889
be made.
8990
Default: 30 seconds
91+
udp_atexit_timeout : float
92+
The amount of time in seconds to wait for more UDP messages at shutdown, after the last DFK
93+
workflow message is received.
94+
9095
"""
9196

9297
if _db_manager_excepts:
@@ -106,6 +111,8 @@ def __init__(self,
106111
self.resource_monitoring_enabled = resource_monitoring_enabled
107112
self.resource_monitoring_interval = resource_monitoring_interval
108113

114+
self.udp_atexit_timeout = udp_atexit_timeout
115+
109116
def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:
110117

111118
logger.debug("Starting MonitoringHub")
@@ -161,6 +168,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
161168
"zmq_port_range": self.hub_port_range,
162169
"logdir": self.logdir,
163170
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
171+
"udp_atexit_timeout": self.udp_atexit_timeout
164172
},
165173
name="Monitoring-Router-Process",
166174
daemon=True,

parsl/monitoring/router.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def __init__(self,
3232
monitoring_hub_address: str = "127.0.0.1",
3333
logdir: str = ".",
3434
logging_level: int = logging.INFO,
35-
atexit_timeout: int = 3, # in seconds
35+
atexit_timeout: float, # in seconds
3636
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
3737
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
3838
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
@@ -55,7 +55,8 @@ def __init__(self,
5555
logging_level : int
5656
Logging level as defined in the logging module. Default: logging.INFO
5757
atexit_timeout : float, optional
58-
The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received.
58+
The amount of time in seconds to wait for more UDP messages at shutdown, after the last DFK
59+
workflow message is received.
5960
*_msgs : Queue
6061
Four multiprocessing queues to receive messages, routed by type tag, and sometimes modified according to type tag.
6162
@@ -214,6 +215,8 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
214215
udp_port: Optional[int],
215216
zmq_port_range: Tuple[int, int],
216217

218+
udp_atexit_timeout: float,
219+
217220
logdir: str,
218221
logging_level: int) -> None:
219222
setproctitle("parsl: monitoring router")
@@ -227,7 +230,8 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
227230
node_msgs=node_msgs,
228231
block_msgs=block_msgs,
229232
resource_msgs=resource_msgs,
230-
exit_event=exit_event)
233+
exit_event=exit_event,
234+
atexit_timeout=udp_atexit_timeout)
231235
except Exception as e:
232236
logger.error("MonitoringRouter construction failed.", exc_info=True)
233237
comm_q.put(f"Monitoring router construction failed: {e}")

parsl/tests/test_monitoring/test_stdouterr.py

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ def fresh_config(run_dir):
3838
monitoring=MonitoringHub(
3939
hub_address="localhost",
4040
hub_port=55055,
41+
udp_atexit_timeout=0
4142
)
4243
)
4344

0 commit comments

Comments
 (0)