Skip to content

Commit 254d8ba

Browse files
committed
count fds
1 parent 72c8dd5 commit 254d8ba

File tree

5 files changed

+26
-11
lines changed

5 files changed

+26
-11
lines changed

parsl/dataflow/dflow.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def __init__(self, config: Config) -> None:
9090
self.run_dir = make_rundir(config.run_dir)
9191

9292
if config.initialize_logging:
93-
parsl.set_file_logger("{}/parsl.log".format(self.run_dir), level=logging.DEBUG)
93+
_, self.logging_handler = parsl.set_file_logger("{}/parsl.log".format(self.run_dir), level=logging.DEBUG)
9494

9595
logger.info("Starting DataFlowKernel with config\n{}".format(config))
9696

@@ -1264,7 +1264,13 @@ def cleanup(self) -> None:
12641264
else:
12651265
logger.debug("Cleaning up non-default DFK - not unregistering")
12661266

1267-
logger.info("DFK cleanup complete")
1267+
# TODO: do this in parsl/logutils.py
1268+
logger.info("DFK cleanup complete - removing parsl.log handler")
1269+
logger_to_remove = logging.getLogger("parsl")
1270+
logger_to_remove.removeHandler(self.logging_handler)
1271+
self.logging_handler.close()
1272+
1273+
logger.info("handler closed - is this going to break things?")
12681274

12691275
def checkpoint(self, tasks: Optional[Sequence[TaskRecord]] = None) -> str:
12701276
"""Checkpoint the dfk incrementally to a checkpoint file.

parsl/log_utils.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
"""
1313
import io
1414
import logging
15-
from typing import Optional
15+
from typing import Optional, Tuple
1616

1717
import typeguard
1818

@@ -65,7 +65,7 @@ def set_stream_logger(name: str = 'parsl',
6565
def set_file_logger(filename: str,
6666
name: str = 'parsl',
6767
level: int = logging.DEBUG,
68-
format_string: Optional[str] = None) -> logging.Logger:
68+
format_string: Optional[str] = None) -> Tuple[logging.Logger, logging.FileHandler]:
6969
"""Add a file log handler.
7070
7171
Args:
@@ -93,4 +93,4 @@ def set_file_logger(filename: str,
9393
futures_logger = logging.getLogger("concurrent.futures")
9494
futures_logger.addHandler(handler)
9595

96-
return logger
96+
return (logger, handler)

parsl/monitoring/monitoring.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -252,9 +252,9 @@ def close(self) -> None:
252252

253253
@wrap_with_logs
254254
def filesystem_receiver(q: Queue[TaggedMonitoringMessage], run_dir: str) -> None:
255-
logger = set_file_logger(f"{run_dir}/monitoring_filesystem_radio.log",
256-
name="monitoring_filesystem_radio",
257-
level=logging.INFO)
255+
logger, _ = set_file_logger(f"{run_dir}/monitoring_filesystem_radio.log",
256+
name="monitoring_filesystem_radio",
257+
level=logging.INFO)
258258

259259
logger.info("Starting filesystem radio receiver")
260260
setproctitle("parsl: monitoring filesystem receiver")

parsl/monitoring/router.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ def __init__(self,
6060
An event that the main Parsl process will set to signal that the monitoring router should shut down.
6161
"""
6262
os.makedirs(run_dir, exist_ok=True)
63-
self.logger = set_file_logger(f"{run_dir}/monitoring_router.log",
64-
name="monitoring_router",
65-
level=logging_level)
63+
self.logger, _ = set_file_logger(f"{run_dir}/monitoring_router.log",
64+
name="monitoring_router",
65+
level=logging_level)
6666
self.logger.debug("Monitoring router starting")
6767

6868
self.hub_address = hub_address

parsl/tests/conftest.py

+9
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,9 @@ def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
217217
logger.error(f"BENC: end open fds: {end_fds}")
218218

219219
assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate())
220+
end_fds = this_process.num_fds()
221+
logger.error(f"BENC: end open fds: {end_fds} (vs {start_fds} at start)")
222+
assert start_fds == end_fds, "number of open fds changed across test run"
220223

221224
else:
222225
yield
@@ -281,6 +284,12 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session):
281284
logger.error(f"BENC: end threads: {threading.active_count()}")
282285

283286
assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate())
287+
end_fds = this_process.num_fds()
288+
logger.error(f"BENC: open fds END: {end_fds}")
289+
if end_fds > start_fds:
290+
logger.error(f"Open files (not all fds, though?): {this_process.open_files()!r}")
291+
os.system(f"ls -l /proc/{os.getpid()}/fd")
292+
pytest.fail("BENC: number of open fds increased across test")
284293

285294
else:
286295
yield

0 commit comments

Comments
 (0)