Skip to content

Commit d79e7b8

Browse files
authored
Move ZMQ monitoring radio initialization into HTEX (#3813)
This is part of an ongoing project to move monitoring message forwarding into relevant executors, with executors only initializing whichever routers are needed. In this step of the project: The ZMQ radio/router is only used by the high throughput executor to receive messages from the interchange. This PR makes the initialization of that look more like other interchange<->submit side ZMQ initialization, rather than like monitoring system initialization. This does not prevent future components from using monitoring-over-ZMQ: those components would initialize ZMQ routing themselves. API-wise: Parsl executors lose the hub_zmq_port attribute which was configured by the DFK after initializing monitoring. As a replacement, Parsl executors gain a monitoring_messages attribute, which is the multiprocessing.Queue where executors should send monitoring messages when they have received them from their own components. This queue already existed between monitoring components, but is now exposed as an interface for other Parsl components. Configuration-wise: Several ZMQ-related monitoring configuration options now come from HTEX configuration, rather than from MonitoringHub configuration. There are no new configuration options. * Choice of ZMQ port now comes from the same range as other HTEX ZMQ ports, rather than from the port range in MonitoringHub. This means the minimum number of available ports there is now 4, instead of 3. * ZMQ router log files now go into the High Throughput Executor's log directory, usually a subdirectory of the main log directory. * ZMQ log verbosity is configured by High Throughput Executor's worker_debug configuration parameter, rather than by MonitoringHub.monitoring_debug. * The ZMQ router will now listen on the interchange loopback address, configured by HighThroughputExecutor.loopback_address, rather than on all addresses. ## Type of change - New feature - Code maintenance/cleanup
1 parent 3e10e38 commit d79e7b8

File tree

10 files changed

+167
-78
lines changed

10 files changed

+167
-78
lines changed

parsl/dataflow/dflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1129,7 +1129,7 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
11291129
executor.run_dir = self.run_dir
11301130
if self.monitoring:
11311131
executor.hub_address = self.monitoring.hub_address
1132-
executor.hub_zmq_port = self.monitoring.hub_zmq_port
1132+
executor.monitoring_messages = self.monitoring.resource_msgs
11331133
executor.submit_monitoring_radio = self.monitoring.radio
11341134
if hasattr(executor, 'provider'):
11351135
if hasattr(executor.provider, 'script_dir'):

parsl/executors/base.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1+
from __future__ import annotations
2+
13
import os
24
from abc import ABCMeta, abstractmethod
35
from concurrent.futures import Future
6+
from multiprocessing.queues import Queue
47
from typing import Any, Callable, Dict, Optional
58

69
from typing_extensions import Literal, Self
710

811
from parsl.monitoring.radios.base import MonitoringRadioSender
12+
from parsl.monitoring.types import TaggedMonitoringMessage
913

1014

1115
class ParslExecutor(metaclass=ABCMeta):
@@ -42,6 +46,16 @@ class ParslExecutor(metaclass=ABCMeta):
4246
invariant, not co-variant, and it looks like @typeguard cannot be
4347
persuaded otherwise. So if you're implementing an executor and want to
4448
@typeguard the constructor, you'll have to use List[Any] here.
49+
50+
The DataFlowKernel will set these two attributes before calling .start(),
51+
if monitoring is enabled:
52+
53+
monitoring_messages: Optional[Queue[TaggedMonitoringMessage]] - an executor
54+
can send messages to the monitoring hub by putting them into
55+
this queue.
56+
57+
submit_monitoring_radio: Optional[MonitoringRadioSender] - an executor can
58+
send messages to the monitoring hub by sending them using this sender.
4559
"""
4660

4761
label: str = "undefined"
@@ -51,13 +65,13 @@ def __init__(
5165
self,
5266
*,
5367
hub_address: Optional[str] = None,
54-
hub_zmq_port: Optional[int] = None,
68+
monitoring_messages: Optional[Queue[TaggedMonitoringMessage]] = None,
5569
submit_monitoring_radio: Optional[MonitoringRadioSender] = None,
5670
run_dir: str = ".",
5771
run_id: Optional[str] = None,
5872
):
5973
self.hub_address = hub_address
60-
self.hub_zmq_port = hub_zmq_port
74+
self.monitoring_messages = monitoring_messages
6175
self.submit_monitoring_radio = submit_monitoring_radio
6276
self.run_dir = os.path.abspath(run_dir)
6377
self.run_id = run_id
@@ -136,16 +150,6 @@ def hub_address(self) -> Optional[str]:
136150
def hub_address(self, value: Optional[str]) -> None:
137151
self._hub_address = value
138152

139-
@property
140-
def hub_zmq_port(self) -> Optional[int]:
141-
"""Port to the Hub for monitoring.
142-
"""
143-
return self._hub_zmq_port
144-
145-
@hub_zmq_port.setter
146-
def hub_zmq_port(self, value: Optional[int]) -> None:
147-
self._hub_zmq_port = value
148-
149153
@property
150154
def submit_monitoring_radio(self) -> Optional[MonitoringRadioSender]:
151155
"""Local radio for sending monitoring messages

parsl/executors/high_throughput/executor.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
)
3030
from parsl.executors.status_handling import BlockProviderExecutor
3131
from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
32+
from parsl.monitoring.radios.zmq_router import ZMQRadioReceiver, start_zmq_receiver
3233
from parsl.process_loggers import wrap_with_logs
3334
from parsl.providers import LocalProvider
3435
from parsl.providers.base import ExecutionProvider
@@ -334,6 +335,10 @@ def __init__(self,
334335
self._result_queue_thread_exit = threading.Event()
335336
self._result_queue_thread: Optional[threading.Thread] = None
336337

338+
self.zmq_monitoring: Optional[ZMQRadioReceiver]
339+
self.zmq_monitoring = None
340+
self.hub_zmq_port = None
341+
337342
radio_mode = "htex"
338343
enable_mpi_mode: bool = False
339344
mpi_launcher: str = "mpiexec"
@@ -427,6 +432,15 @@ def start(self):
427432
self.loopback_address, self.interchange_port_range, self.cert_dir
428433
)
429434

435+
if self.monitoring_messages is not None:
436+
self.zmq_monitoring = start_zmq_receiver(monitoring_messages=self.monitoring_messages,
437+
loopback_address=self.loopback_address,
438+
port_range=self.interchange_port_range,
439+
logdir=self.logdir,
440+
worker_debug=self.worker_debug,
441+
)
442+
self.hub_zmq_port = self.zmq_monitoring.port
443+
430444
self._result_queue_thread = None
431445
self._start_result_queue_thread()
432446
self._start_local_interchange_process()
@@ -861,6 +875,9 @@ def shutdown(self, timeout: float = 10.0):
861875
if self._result_queue_thread:
862876
self._result_queue_thread.join()
863877

878+
if self.zmq_monitoring:
879+
self.zmq_monitoring.close()
880+
864881
logger.info("Finished HighThroughputExecutor shutdown attempt")
865882

866883
def get_usage_information(self):

parsl/monitoring/errors.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,8 @@
44
class MonitoringHubStartError(ParslError):
55
def __str__(self) -> str:
66
return "Hub failed to start"
7+
8+
9+
class MonitoringRouterStartError(ParslError):
10+
def __str__(self) -> str:
11+
return "Monitoring router failed to start"

parsl/monitoring/monitoring.py

Lines changed: 17 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@
44
import multiprocessing.synchronize as ms
55
import os
66
import queue
7+
import warnings
78
from multiprocessing.queues import Queue
8-
from typing import TYPE_CHECKING, Optional, Tuple, Union
9+
from typing import TYPE_CHECKING, Any, Optional, Union
910

1011
import typeguard
1112

1213
from parsl.monitoring.errors import MonitoringHubStartError
1314
from parsl.monitoring.radios.filesystem_router import filesystem_router_starter
1415
from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
1516
from parsl.monitoring.radios.udp_router import udp_router_starter
16-
from parsl.monitoring.radios.zmq_router import zmq_router_starter
1717
from parsl.monitoring.types import TaggedMonitoringMessage
1818
from parsl.multiprocessing import (
1919
SizedQueue,
@@ -41,7 +41,7 @@ class MonitoringHub(RepresentationMixin):
4141
def __init__(self,
4242
hub_address: str,
4343
hub_port: Optional[int] = None,
44-
hub_port_range: Tuple[int, int] = (55050, 56000),
44+
hub_port_range: Any = None,
4545

4646
workflow_name: Optional[str] = None,
4747
workflow_version: Optional[str] = None,
@@ -60,12 +60,11 @@ def __init__(self,
6060
Note that despite the similar name, this is not related to
6161
hub_port_range.
6262
Default: None
63-
hub_port_range : tuple(int, int)
64-
The port range for a ZMQ channel from an executor process
65-
(for example, the interchange in the High Throughput Executor)
66-
to deliver monitoring messages to the monitoring router.
67-
Note that despite the similar name, this is not related to hub_port.
68-
Default: (55050, 56000)
63+
hub_port_range : unused
64+
Unused, but retained until 2025-09-14 to avoid configuration errors.
65+
This value previously configured one ZMQ channel inside the
66+
HighThroughputExecutor. That ZMQ channel is now configured by the
67+
interchange_port_range parameter of HighThroughputExecutor.
6968
workflow_name : str
7069
The name for the workflow. Default to the name of the parsl script
7170
workflow_version : str
@@ -92,6 +91,13 @@ def __init__(self,
9291

9392
self.hub_address = hub_address
9493
self.hub_port = hub_port
94+
95+
if hub_port_range is not None:
96+
message = "Instead of MonitoringHub.hub_port_range, Use HighThroughputExecutor.interchange_port_range"
97+
warnings.warn(message, DeprecationWarning)
98+
logger.warning(message)
99+
# This is used by RepresentationMixin so needs to exist as an attribute
100+
# even though now it is otherwise unused.
95101
self.hub_port_range = hub_port_range
96102

97103
self.logging_endpoint = logging_endpoint
@@ -123,13 +129,10 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
123129
# in the future, Queue will allow runtime subscripts.
124130

125131
if TYPE_CHECKING:
126-
zmq_comm_q: Queue[Union[int, str]]
127132
udp_comm_q: Queue[Union[int, str]]
128133
else:
129-
zmq_comm_q: Queue
130134
udp_comm_q: Queue
131135

132-
zmq_comm_q = SizedQueue(maxsize=10)
133136
udp_comm_q = SizedQueue(maxsize=10)
134137

135138
self.resource_msgs: Queue[TaggedMonitoringMessage]
@@ -138,20 +141,6 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
138141
self.router_exit_event: ms.Event
139142
self.router_exit_event = SpawnEvent()
140143

141-
self.zmq_router_proc = SpawnProcess(target=zmq_router_starter,
142-
kwargs={"comm_q": zmq_comm_q,
143-
"resource_msgs": self.resource_msgs,
144-
"exit_event": self.router_exit_event,
145-
"hub_address": self.hub_address,
146-
"zmq_port_range": self.hub_port_range,
147-
"run_dir": dfk_run_dir,
148-
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
149-
},
150-
name="Monitoring-ZMQ-Router-Process",
151-
daemon=True,
152-
)
153-
self.zmq_router_proc.start()
154-
155144
self.udp_router_proc = SpawnProcess(target=udp_router_starter,
156145
kwargs={"comm_q": udp_comm_q,
157146
"resource_msgs": self.resource_msgs,
@@ -180,8 +169,8 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
180169
daemon=True,
181170
)
182171
self.dbm_proc.start()
183-
logger.info("Started ZMQ router process %s, UDP router process %s and DBM process %s",
184-
self.zmq_router_proc.pid, self.udp_router_proc.pid, self.dbm_proc.pid)
172+
logger.info("Started UDP router process %s and DBM process %s",
173+
self.udp_router_proc.pid, self.dbm_proc.pid)
185174

186175
self.filesystem_proc = SpawnProcess(target=filesystem_router_starter,
187176
args=(self.resource_msgs, dfk_run_dir, self.router_exit_event),
@@ -193,20 +182,6 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
193182

194183
self.radio = MultiprocessingQueueRadioSender(self.resource_msgs)
195184

196-
try:
197-
zmq_comm_q_result = zmq_comm_q.get(block=True, timeout=120)
198-
zmq_comm_q.close()
199-
zmq_comm_q.join_thread()
200-
except queue.Empty:
201-
logger.error("Monitoring ZMQ Router has not reported port in 120s. Aborting")
202-
raise MonitoringHubStartError()
203-
204-
if isinstance(zmq_comm_q_result, str):
205-
logger.error("MonitoringRouter sent an error message: %s", zmq_comm_q_result)
206-
raise RuntimeError(f"MonitoringRouter failed to start: {zmq_comm_q_result}")
207-
208-
self.hub_zmq_port = zmq_comm_q_result
209-
210185
try:
211186
udp_comm_q_result = udp_comm_q.get(block=True, timeout=120)
212187
udp_comm_q.close()
@@ -235,9 +210,6 @@ def close(self) -> None:
235210
logger.info("Setting router termination event")
236211
self.router_exit_event.set()
237212

238-
logger.info("Waiting for ZMQ router to terminate")
239-
join_terminate_close_proc(self.zmq_router_proc)
240-
241213
logger.info("Waiting for UDP router to terminate")
242214
join_terminate_close_proc(self.udp_router_proc)
243215

0 commit comments

Comments
 (0)