Skip to content

Commit 7886dcd

Browse files
committed
Quentin has experienced a glaring race condition in make_rundir when launching multiple workflows at once.
It's unclear to me why it's *so bad* in his situation, but the race condition exists nonetheless, and this PR complicates rundir creation to accept that it might be run multiple times concurrently.
1 parent 42c0e2e commit 7886dcd

File tree

2 files changed

+45
-21
lines changed

2 files changed

+45
-21
lines changed

parsl/dataflow/errors.py

+4
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,7 @@ def __init__(self, dependent_exceptions_tids: Sequence[Tuple[BaseException, Opti
6363
def __str__(self) -> str:
6464
dep_tids = [tid for (exception, tid) in self.dependent_exceptions_tids]
6565
return "Join failure for task {} with failed join dependencies from tasks {}".format(self.task_id, dep_tids)
66+
67+
68+
class RundirCreateError(ParslError):
    """Error type for a failure to create a run directory."""

parsl/dataflow/rundirs.py

+41-21
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
import os
22
from glob import glob
33
import logging
4+
import random
5+
import time
6+
7+
from parsl.dataflow.errors import RundirCreateError
48

59
logger = logging.getLogger(__name__)
610

711

8-
def make_rundir(path: str) -> str:
12+
def make_rundir(path: str, *, max_tries: int = 3) -> str:
    """Create the next numbered run directory under ``path`` and return it.

    ``path`` itself is created when it does not exist. Run directories are
    children of ``path`` named with zero-padded integers (``000``, ``001``,
    ...); the new directory is numbered one above the highest number already
    present, or ``000`` when there is none.

    This function may be run by several processes at once against the same
    ``path``. When another caller creates the chosen directory first, the
    existing directories are listed again and creation is retried after a
    randomised back-off that grows on every attempt.

    Kwargs:
        - path (str): String path to a specific run dir
        - max_tries (int): Number of creation attempts before giving up.
          At least one attempt is always made.

    Returns:
        The absolute path of the newly created run directory.

    Raises:
        FileExistsError: when the last permitted attempt still collided with
            an already existing directory.
    """
    backoff_time_s = 1 + random.random()

    os.makedirs(path, exist_ok=True)

    # try_count is 1-based for human readability
    try_count = 1
    while True:

        # Python 3.10 introduces root_dir argument to glob which in future
        # can be used to simplify this code, something like:
        # prev_rundirs = glob("[0-9]*[0-9]", root_dir=path)
        full_prev_rundirs = glob(os.path.join(path, "[0-9]*[0-9]"))

        # The glob pattern only pins the first and last characters to digits,
        # so an unrelated entry such as "1-notes-2" also matches. Keep only
        # the purely numeric names, which int() is guaranteed to parse.
        used_indices = [
            int(name)
            for name in map(os.path.basename, full_prev_rundirs)
            if name.isdecimal()
        ]

        next_index = max(used_indices, default=-1) + 1

        current_rundir = os.path.join(path, '{0:03}'.format(next_index))

        try:
            # Deliberately no exist_ok here: a FileExistsError is the signal
            # that a concurrent caller claimed this number first.
            os.makedirs(current_rundir)
        except FileExistsError:
            logger.warning("Could not create rundir %s on try %s", current_rundir, try_count)

            if try_count >= max_tries:
                raise

            logger.debug("Backing off %ss", backoff_time_s)
            time.sleep(backoff_time_s)
            # The random factor keeps callers that collided once from
            # retrying in lock-step and colliding again.
            backoff_time_s *= 2 + random.random()
            try_count += 1
        else:
            logger.debug("rundir created: %s", current_rundir)
            return os.path.abspath(current_rundir)

    # this should never be reached - the above loop should have either returned
    # or raised an exception on the last try
    raise RundirCreateError()

0 commit comments

Comments
 (0)