Skip to content

Commit 7622caa

Browse files
authored
Make result queue poll for shutdown, and tidy up at shutdown (#3709)
This poll happens at the configured htex poll period, which defaults to 10ms. Under heavy result load, this shouldn't result in much additional load: the poll loop will already be looping a lot to process the results. Under lower result load, there is a slight observable increase in CPU usage: a 30-second sleep task shows this before this PR: before: real 0m37.451s user 0m2.160s sys 0m0.376s run 2, user 2.160s run 3, user 2.116s and this after this PR: real 0m37.473s user 0m2.400s sys 0m0.557s Run 2, 2.457s Run 3, 2.452s At shutdown, the ZMQ socket for incoming results is closed. This reduces both the number of threads and number of file descriptors left behind by the `--config local` tests. For example: $ pytest parsl/tests/test_monitoring/ --config local Before this PR, at end of test: 32 threads, 451 fds open. After this PR, at end of test: 1 thread, 48 fds open. This is part of PR #3397 shutdown tidy-up. # Description Please include a summary of the change and (optionally) which issue is fixed. Please also include relevant motivation and context. # Changed Behaviour nothing should be really visible to normal users. Increased CPU usage in the above documented situations. ## Type of change - New feature - Code maintenance/cleanup
1 parent 7c2646e commit 7622caa

File tree

2 files changed

+28
-6
lines changed

2 files changed

+28
-6
lines changed

parsl/executors/high_throughput/executor.py

+15-2
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,9 @@ def __init__(self,
331331
interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD
332332
self.interchange_launch_cmd = interchange_launch_cmd
333333

334+
self._result_queue_thread_exit = threading.Event()
335+
self._result_queue_thread: Optional[threading.Thread] = None
336+
334337
radio_mode = "htex"
335338
enable_mpi_mode: bool = False
336339
mpi_launcher: str = "mpiexec"
@@ -455,9 +458,11 @@ def _result_queue_worker(self):
455458
"""
456459
logger.debug("Result queue worker starting")
457460

458-
while not self.bad_state_is_set:
461+
while not self.bad_state_is_set and not self._result_queue_thread_exit.is_set():
459462
try:
460-
msgs = self.incoming_q.get()
463+
msgs = self.incoming_q.get(timeout_ms=self.poll_period)
464+
if msgs is None: # timeout
465+
continue
461466

462467
except IOError as e:
463468
logger.exception("Caught broken queue with exception code {}: {}".format(e.errno, e))
@@ -515,6 +520,8 @@ def _result_queue_worker(self):
515520
else:
516521
raise BadMessage("Message received with unknown type {}".format(msg['type']))
517522

523+
logger.info("Closing result ZMQ pipe")
524+
self.incoming_q.close()
518525
logger.info("Result queue worker finished")
519526

520527
def _start_local_interchange_process(self) -> None:
@@ -817,6 +824,8 @@ def shutdown(self, timeout: float = 10.0):
817824

818825
logger.info("Attempting HighThroughputExecutor shutdown")
819826

827+
logger.info("Terminating interchange and result queue thread")
828+
self._result_queue_thread_exit.set()
820829
self.interchange_proc.terminate()
821830
try:
822831
self.interchange_proc.wait(timeout=timeout)
@@ -841,6 +850,10 @@ def shutdown(self, timeout: float = 10.0):
841850
logger.info("Closing command client")
842851
self.command_client.close()
843852

853+
logger.info("Waiting for result queue thread exit")
854+
if self._result_queue_thread:
855+
self._result_queue_thread.join()
856+
844857
logger.info("Finished HighThroughputExecutor shutdown attempt")
845858

846859
def get_usage_information(self):

parsl/executors/high_throughput/zmq_pipes.py

+13-4
Original file line numberDiff line numberDiff line change
@@ -206,12 +206,21 @@ def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None):
206206
self.port = self.results_receiver.bind_to_random_port(tcp_url(ip_address),
207207
min_port=port_range[0],
208208
max_port=port_range[1])
209+
self.poller = zmq.Poller()
210+
self.poller.register(self.results_receiver, zmq.POLLIN)
209211

210-
def get(self):
212+
def get(self, timeout_ms=None):
213+
"""Get a message from the queue, returning None if timeout expires
214+
without a message. timeout is measured in milliseconds.
215+
"""
211216
logger.debug("Waiting for ResultsIncoming message")
212-
m = self.results_receiver.recv_multipart()
213-
logger.debug("Received ResultsIncoming message")
214-
return m
217+
socks = dict(self.poller.poll(timeout=timeout_ms))
218+
if self.results_receiver in socks and socks[self.results_receiver] == zmq.POLLIN:
219+
m = self.results_receiver.recv_multipart()
220+
logger.debug("Received ResultsIncoming message")
221+
return m
222+
else:
223+
return None
215224

216225
def close(self):
217226
self.results_receiver.close()

0 commit comments

Comments
 (0)