Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move ZMQ monitoring radio initialization into HTEX #3813

Merged
merged 1 commit into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
executor.run_dir = self.run_dir
if self.monitoring:
executor.hub_address = self.monitoring.hub_address
executor.hub_zmq_port = self.monitoring.hub_zmq_port
executor.monitoring_messages = self.monitoring.resource_msgs
executor.submit_monitoring_radio = self.monitoring.radio
if hasattr(executor, 'provider'):
if hasattr(executor.provider, 'script_dir'):
Expand Down
28 changes: 16 additions & 12 deletions parsl/executors/base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from __future__ import annotations

import os
from abc import ABCMeta, abstractmethod
from concurrent.futures import Future
from multiprocessing.queues import Queue
from typing import Any, Callable, Dict, Optional

from typing_extensions import Literal, Self

from parsl.monitoring.radios.base import MonitoringRadioSender
from parsl.monitoring.types import TaggedMonitoringMessage


class ParslExecutor(metaclass=ABCMeta):
Expand Down Expand Up @@ -42,6 +46,16 @@ class ParslExecutor(metaclass=ABCMeta):
invariant, not co-variant, and it looks like @typeguard cannot be
persuaded otherwise. So if you're implementing an executor and want to
@typeguard the constructor, you'll have to use List[Any] here.

The DataFlowKernel will set these two attributes before calling .start(),
if monitoring is enabled:
Comment on lines +50 to +51
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stylistically, I might switch the clauses. (But by no means a blocker.)

If monitoring is enabled, the DataFlowKernel will set the following two ...


monitoring_messages: Optional[Queue[TaggedMonitoringMessage]] - an executor
can send messages to the monitoring hub by putting them into
this queue.

submit_monitoring_radio: Optional[MonitoringRadioSender] - an executor can
send messages to the monitoring hub by sending them using this sender.
"""
Comment on lines +53 to 59
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get that it's what's in place now, but when the typing is already included in the function signature, I've gotten into the habit of not repeating it in the list of documented parameters, so that there's a single point of authority and/or a single place to update later. A thought.


label: str = "undefined"
Expand All @@ -51,13 +65,13 @@ def __init__(
self,
*,
hub_address: Optional[str] = None,
hub_zmq_port: Optional[int] = None,
monitoring_messages: Optional[Queue[TaggedMonitoringMessage]] = None,
submit_monitoring_radio: Optional[MonitoringRadioSender] = None,
run_dir: str = ".",
run_id: Optional[str] = None,
):
self.hub_address = hub_address
self.hub_zmq_port = hub_zmq_port
self.monitoring_messages = monitoring_messages
self.submit_monitoring_radio = submit_monitoring_radio
self.run_dir = os.path.abspath(run_dir)
self.run_id = run_id
Expand Down Expand Up @@ -136,16 +150,6 @@ def hub_address(self) -> Optional[str]:
def hub_address(self, value: Optional[str]) -> None:
self._hub_address = value

@property
def hub_zmq_port(self) -> Optional[int]:
"""Port to the Hub for monitoring.
"""
return self._hub_zmq_port

@hub_zmq_port.setter
def hub_zmq_port(self, value: Optional[int]) -> None:
self._hub_zmq_port = value

@property
def submit_monitoring_radio(self) -> Optional[MonitoringRadioSender]:
"""Local radio for sending monitoring messages
Expand Down
17 changes: 17 additions & 0 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
)
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
from parsl.monitoring.radios.zmq_router import ZMQRadioReceiver, start_zmq_receiver
from parsl.process_loggers import wrap_with_logs
from parsl.providers import LocalProvider
from parsl.providers.base import ExecutionProvider
Expand Down Expand Up @@ -334,6 +335,10 @@ def __init__(self,
self._result_queue_thread_exit = threading.Event()
self._result_queue_thread: Optional[threading.Thread] = None

self.zmq_monitoring: Optional[ZMQRadioReceiver]
self.zmq_monitoring = None
self.hub_zmq_port = None

radio_mode = "htex"
enable_mpi_mode: bool = False
mpi_launcher: str = "mpiexec"
Expand Down Expand Up @@ -427,6 +432,15 @@ def start(self):
self.loopback_address, self.interchange_port_range, self.cert_dir
)

if self.monitoring_messages is not None:
self.zmq_monitoring = start_zmq_receiver(monitoring_messages=self.monitoring_messages,
loopback_address=self.loopback_address,
port_range=self.interchange_port_range,
logdir=self.logdir,
worker_debug=self.worker_debug,
)
self.hub_zmq_port = self.zmq_monitoring.port

self._result_queue_thread = None
self._start_result_queue_thread()
self._start_local_interchange_process()
Expand Down Expand Up @@ -861,6 +875,9 @@ def shutdown(self, timeout: float = 10.0):
if self._result_queue_thread:
self._result_queue_thread.join()

if self.zmq_monitoring:
self.zmq_monitoring.close()

logger.info("Finished HighThroughputExecutor shutdown attempt")

def get_usage_information(self):
Expand Down
5 changes: 5 additions & 0 deletions parsl/monitoring/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@
class MonitoringHubStartError(ParslError):
    """Error whose string form reports that the monitoring hub failed to start.

    The message is fixed and does not depend on any constructor arguments.
    """

    def __str__(self) -> str:
        """Return the fixed, human-readable description of this failure."""
        description = "Hub failed to start"
        return description


class MonitoringRouterStartError(ParslError):
    """Error whose string form reports that a monitoring router failed to start.

    The message is fixed and does not depend on any constructor arguments.
    NOTE(review): no raise site is visible in this view; presumably raised by
    the router start-up path -- confirm against the caller.
    """

    def __str__(self) -> str:
        """Return the fixed, human-readable description of this failure."""
        description = "Monitoring router failed to start"
        return description
62 changes: 17 additions & 45 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
import multiprocessing.synchronize as ms
import os
import queue
import warnings
from multiprocessing.queues import Queue
from typing import TYPE_CHECKING, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Optional, Union

import typeguard

from parsl.monitoring.errors import MonitoringHubStartError
from parsl.monitoring.radios.filesystem_router import filesystem_router_starter
from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
from parsl.monitoring.radios.udp_router import udp_router_starter
from parsl.monitoring.radios.zmq_router import zmq_router_starter
from parsl.monitoring.types import TaggedMonitoringMessage
from parsl.multiprocessing import (
SizedQueue,
Expand Down Expand Up @@ -41,7 +41,7 @@ class MonitoringHub(RepresentationMixin):
def __init__(self,
hub_address: str,
hub_port: Optional[int] = None,
hub_port_range: Tuple[int, int] = (55050, 56000),
hub_port_range: Any = None,

workflow_name: Optional[str] = None,
workflow_version: Optional[str] = None,
Expand All @@ -60,12 +60,11 @@ def __init__(self,
Note that despite the similar name, this is not related to
hub_port_range.
Default: None
hub_port_range : tuple(int, int)
The port range for a ZMQ channel from an executor process
(for example, the interchange in the High Throughput Executor)
to deliver monitoring messages to the monitoring router.
Note that despite the similar name, this is not related to hub_port.
Default: (55050, 56000)
hub_port_range : unused
Unused, but retained until 2025-09-14 to avoid configuration errors.
This value previously configured one ZMQ channel inside the
HighThroughputExecutor. That ZMQ channel is now configured by the
interchange_port_range parameter of HighThroughputExecutor.
workflow_name : str
The name for the workflow. Default to the name of the parsl script
workflow_version : str
Expand All @@ -92,6 +91,13 @@ def __init__(self,

self.hub_address = hub_address
self.hub_port = hub_port

if hub_port_range is not None:
message = "Instead of MonitoringHub.hub_port_range, Use HighThroughputExecutor.interchange_port_range"
warnings.warn(message, DeprecationWarning)
logger.warning(message)
# This is used by RepresentationMixin so needs to exist as an attribute
# even though now it is otherwise unused.
self.hub_port_range = hub_port_range

self.logging_endpoint = logging_endpoint
Expand Down Expand Up @@ -123,13 +129,10 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
# in the future, Queue will allow runtime subscripts.

if TYPE_CHECKING:
zmq_comm_q: Queue[Union[int, str]]
udp_comm_q: Queue[Union[int, str]]
else:
zmq_comm_q: Queue
udp_comm_q: Queue

zmq_comm_q = SizedQueue(maxsize=10)
udp_comm_q = SizedQueue(maxsize=10)

self.resource_msgs: Queue[TaggedMonitoringMessage]
Expand All @@ -138,20 +141,6 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
self.router_exit_event: ms.Event
self.router_exit_event = SpawnEvent()

self.zmq_router_proc = SpawnProcess(target=zmq_router_starter,
kwargs={"comm_q": zmq_comm_q,
"resource_msgs": self.resource_msgs,
"exit_event": self.router_exit_event,
"hub_address": self.hub_address,
"zmq_port_range": self.hub_port_range,
"run_dir": dfk_run_dir,
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
},
name="Monitoring-ZMQ-Router-Process",
daemon=True,
)
self.zmq_router_proc.start()

self.udp_router_proc = SpawnProcess(target=udp_router_starter,
kwargs={"comm_q": udp_comm_q,
"resource_msgs": self.resource_msgs,
Expand Down Expand Up @@ -180,8 +169,8 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
daemon=True,
)
self.dbm_proc.start()
logger.info("Started ZMQ router process %s, UDP router process %s and DBM process %s",
self.zmq_router_proc.pid, self.udp_router_proc.pid, self.dbm_proc.pid)
logger.info("Started UDP router process %s and DBM process %s",
self.udp_router_proc.pid, self.dbm_proc.pid)

self.filesystem_proc = SpawnProcess(target=filesystem_router_starter,
args=(self.resource_msgs, dfk_run_dir, self.router_exit_event),
Expand All @@ -193,20 +182,6 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No

self.radio = MultiprocessingQueueRadioSender(self.resource_msgs)

try:
zmq_comm_q_result = zmq_comm_q.get(block=True, timeout=120)
zmq_comm_q.close()
zmq_comm_q.join_thread()
except queue.Empty:
logger.error("Monitoring ZMQ Router has not reported port in 120s. Aborting")
raise MonitoringHubStartError()

if isinstance(zmq_comm_q_result, str):
logger.error("MonitoringRouter sent an error message: %s", zmq_comm_q_result)
raise RuntimeError(f"MonitoringRouter failed to start: {zmq_comm_q_result}")

self.hub_zmq_port = zmq_comm_q_result

try:
udp_comm_q_result = udp_comm_q.get(block=True, timeout=120)
udp_comm_q.close()
Expand Down Expand Up @@ -235,9 +210,6 @@ def close(self) -> None:
logger.info("Setting router termination event")
self.router_exit_event.set()

logger.info("Waiting for ZMQ router to terminate")
join_terminate_close_proc(self.zmq_router_proc)

logger.info("Waiting for UDP router to terminate")
join_terminate_close_proc(self.udp_router_proc)

Expand Down
Loading
Loading