Skip to content

Commit 7339d0a

Browse files
committed
count fds
1 parent 12a5f88 commit 7339d0a

File tree

7 files changed, with 33 additions and 11 deletions.

parsl/dataflow/dflow.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def __init__(self, config: Config) -> None:
9393
self.run_dir = make_rundir(config.run_dir)
9494

9595
if config.initialize_logging:
96-
parsl.set_file_logger("{}/parsl.log".format(self.run_dir), level=logging.DEBUG)
96+
_, self.logging_handler = parsl.set_file_logger("{}/parsl.log".format(self.run_dir), level=logging.DEBUG)
9797

9898
logger.info("Starting DataFlowKernel with config\n{}".format(config))
9999

@@ -1321,7 +1321,13 @@ def cleanup(self) -> None:
13211321
else:
13221322
logger.debug("Cleaning up non-default DFK - not unregistering")
13231323

1324-
logger.info("DFK cleanup complete")
1324+
# TODO: do this in parsl/log_utils.py
1325+
logger.info("DFK cleanup complete - removing parsl.log handler")
1326+
logger_to_remove = logging.getLogger("parsl")
1327+
logger_to_remove.removeHandler(self.logging_handler)
1328+
self.logging_handler.close()
1329+
1330+
logger.info("handler closed - is this going to break things?")
13251331

13261332
def checkpoint(self, tasks: Optional[Sequence[TaskRecord]] = None) -> str:
13271333
"""Checkpoint the dfk incrementally to a checkpoint file.

parsl/executors/taskvine/executor.py

+2
Original file line numberDiff line numberDiff line change
@@ -589,11 +589,13 @@ def shutdown(self, *args, **kwargs):
589589
# Join all processes before exiting
590590
logger.debug("Joining on submit process")
591591
self._submit_process.join()
592+
self._submit_process.close()
592593
logger.debug("Joining on collector thread")
593594
self._collector_thread.join()
594595
if self.worker_launch_method == 'factory':
595596
logger.debug("Joining on factory process")
596597
self._factory_process.join()
598+
self._factory_process.close()
597599

598600
# Shutdown multiprocessing queues
599601
self._ready_task_queue.close()

parsl/executors/workqueue/executor.py

+2
Original file line numberDiff line numberDiff line change
@@ -704,6 +704,8 @@ def shutdown(self, *args, **kwargs):
704704

705705
logger.debug("Joining on submit process")
706706
self.submit_process.join()
707+
self.submit_process.close()
708+
707709
logger.debug("Joining on collector thread")
708710
self.collector_thread.join()
709711

parsl/log_utils.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
"""
1313
import io
1414
import logging
15-
from typing import Optional
15+
from typing import Optional, Tuple
1616

1717
import typeguard
1818

@@ -65,7 +65,7 @@ def set_stream_logger(name: str = 'parsl',
6565
def set_file_logger(filename: str,
6666
name: str = 'parsl',
6767
level: int = logging.DEBUG,
68-
format_string: Optional[str] = None) -> logging.Logger:
68+
format_string: Optional[str] = None) -> Tuple[logging.Logger, logging.FileHandler]:
6969
"""Add a file log handler.
7070
7171
Args:
@@ -93,4 +93,4 @@ def set_file_logger(filename: str,
9393
futures_logger = logging.getLogger("concurrent.futures")
9494
futures_logger.addHandler(handler)
9595

96-
return logger
96+
return (logger, handler)

parsl/monitoring/monitoring.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -283,9 +283,9 @@ def close(self) -> None:
283283

284284
@wrap_with_logs
285285
def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]", run_dir: str) -> None:
286-
logger = set_file_logger("{}/monitoring_filesystem_radio.log".format(logdir),
287-
name="monitoring_filesystem_radio",
288-
level=logging.INFO)
286+
logger, _ = set_file_logger("{}/monitoring_filesystem_radio.log".format(logdir),
287+
name="monitoring_filesystem_radio",
288+
level=logging.INFO)
289289

290290
logger.info("Starting filesystem radio receiver")
291291
setproctitle("parsl: monitoring filesystem receiver")

parsl/monitoring/router.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,9 @@ def __init__(self,
6464
An event that the main Parsl process will set to signal that the monitoring router should shut down.
6565
"""
6666
os.makedirs(logdir, exist_ok=True)
67-
self.logger = set_file_logger("{}/monitoring_router.log".format(logdir),
68-
name="monitoring_router",
69-
level=logging_level)
67+
self.logger, _ = set_file_logger("{}/monitoring_router.log".format(logdir),
68+
name="monitoring_router",
69+
level=logging_level)
7070
self.logger.debug("Monitoring router starting")
7171

7272
self.hub_address = hub_address

parsl/tests/conftest.py

+12
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,9 @@ def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
180180
config = pytestconfig.getoption('config')[0]
181181

182182
if config != 'local':
183+
this_process = psutil.Process()
184+
start_fds = this_process.num_fds()
185+
logger.error(f"BENC: open fds: {start_fds}")
183186
assert threading.active_count() == 1, "precondition: only one thread can be running before this test: " + repr(threading.enumerate())
184187

185188
spec = importlib.util.spec_from_file_location('', config)
@@ -211,6 +214,9 @@ def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
211214
assert DataFlowKernelLoader._dfk is None
212215

213216
assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate())
217+
end_fds = this_process.num_fds()
218+
logger.error(f"BENC: end open fds: {end_fds} (vs {start_fds} at start)")
219+
assert start_fds == end_fds, "number of open fds changed across test run"
214220

215221
else:
216222
yield
@@ -275,6 +281,12 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session):
275281
logger.error(f"BENC: end threads: {threading.active_count()}")
276282

277283
assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate())
284+
end_fds = this_process.num_fds()
285+
logger.error(f"BENC: open fds END: {end_fds}")
286+
if end_fds > start_fds:
287+
logger.error(f"Open files (not all fds, though?): {this_process.open_files()!r}")
288+
os.system(f"ls -l /proc/{os.getpid()}/fd")
289+
pytest.fail("BENC: number of open fds increased across test")
278290

279291
else:
280292
yield

0 commit comments

Comments
 (0)