Skip to content

Commit b9d99f4

Browse files
cbenazetBerengerBerthoul
authored andcommitted
shell scheduler: fix items order to pick up the right item at the reception + step+1 in print (closes #10)
1 parent f48d06f commit b9d99f4

File tree

2 files changed

+11
-6
lines changed

2 files changed

+11
-6
lines changed

pytest_parallel/shell_static_scheduler.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from .algo import partition
1111
from .static_scheduler_utils import group_items_by_parallel_steps
1212
from mpi4py import MPI
13+
import numpy as np
1314

1415
def mark_skip(item, ntasks):
1516
n_proc_test = get_n_proc_for_test(item)
@@ -103,18 +104,22 @@ def submit_items(items_to_run, SCHEDULER_IP_ADDRESS, port, main_invoke_params, n
103104

104105
script = " & \\\n".join(cmds) + '\n'
105106
Path('.pytest_parallel').mkdir(exist_ok=True)
106-
script_path = f'.pytest_parallel/pytest_static_sched_{i_step}.sh'
107+
script_path = f'.pytest_parallel/pytest_static_sched_{i_step+1}.sh'
107108
with open(script_path,'w') as f:
108109
f.write(script)
109110

110111
current_permissions = stat.S_IMODE(os.lstat(script_path).st_mode)
111112
os.chmod(script_path, current_permissions | stat.S_IXUSR)
112113

113114
p = subprocess.Popen([script_path], shell=True, stdout=subprocess.PIPE)
114-
print(f'\nLaunching tests (step {i_step}/{n_step})...')
115+
print(f'\nLaunching tests (step {i_step+1}/{n_step})...')
115116
return p
116117

117118
def receive_items(items, session, socket, n_item_to_recv):
119+
# > Precondition: Items must keep their original order to pick up the right item at the reception
120+
original_indices = np.array([item.original_index for item in items])
121+
assert (original_indices==np.arange(len(items))).all()
122+
118123
while n_item_to_recv>0:
119124
conn, addr = socket.accept()
120125
with conn:
@@ -123,13 +128,13 @@ def receive_items(items, session, socket, n_item_to_recv):
123128
test_idx = test_info['test_idx']
124129
if test_info['fatal_error'] is not None:
125130
assert 0, f'{test_info["fatal_error"]}'
126-
item = items[test_idx]
131+
item = items[test_idx] # works because of precondition
127132
item.sub_comm = None
128133
item.info = test_info
129134

130135
# "run" the test (i.e. trigger PyTest pipeline but do not really run the code)
131136
nextitem = None # not known at this point
132-
run_item_test(items[test_idx], nextitem, session)
137+
run_item_test(item, nextitem, session)
133138
n_item_to_recv -= 1
134139

135140
class ShellStaticScheduler:

pytest_parallel/static_scheduler_utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
def group_items_by_parallel_steps(items, n_workers):
2-
items.sort(key=lambda item: item.n_proc, reverse=True)
2+
_items = sorted(items, key=lambda item: item.n_proc, reverse=True)
33

44
remaining_n_procs_by_step = []
55
items_by_step = []
66
items_to_skip = []
7-
for item in items:
7+
for item in _items:
88
if item.n_proc > n_workers:
99
items_to_skip += [item]
1010
else:

0 commit comments

Comments
 (0)