Skip to content

Commit b74b57c

Browse files
committed
schedule_backlog_tasks is now updated to fetch all tasks in the backlog_queue and then attempt to schedule them avoiding the infinite loop.
* Adding regression test `test_tiny_large_loop` that triggers `RecursionError: maximum recursion depth exceeded while getting the repr of an object`
1 parent 1b1e6a2 commit b74b57c

File tree

2 files changed

+45
-8
lines changed

2 files changed

+45
-8
lines changed

parsl/executors/high_throughput/mpi_resource_management.py

+13-8
Original file line numberDiff line numberDiff line change
@@ -191,14 +191,19 @@ def put_task(self, task_package: dict):
191191

192192
def _schedule_backlog_tasks(self):
193193
"""Attempt to schedule backlogged tasks"""
194-
try:
195-
prioritized_task = self._backlog_queue.get(block=False)
196-
self.put_task(prioritized_task.task)
197-
except queue.Empty:
198-
return
199-
else:
200-
# Keep attempting to schedule tasks till we are out of resources
201-
self._schedule_backlog_tasks()
194+
195+
# Separate fetching tasks from the _backlog_queue and scheduling them
196+
# since tasks that failed to schedule will be pushed to the _backlog_queue
197+
backlogged_tasks = []
198+
while True:
199+
try:
200+
prioritized_task = self._backlog_queue.get(block=False)
201+
backlogged_tasks.append(prioritized_task.task)
202+
except queue.Empty:
203+
break
204+
205+
for backlogged_task in backlogged_tasks:
206+
self.put_task(backlogged_task)
202207

203208
def get_result(self, block: bool = True, timeout: Optional[float] = None):
204209
"""Return result and relinquish provisioned nodes"""

parsl/tests/test_mpi_apps/test_mpi_scheduler.py

+32
Original file line numberDiff line numberDiff line change
@@ -186,3 +186,35 @@ def test_hashable_backlog_queue():
186186
task_package = {"task_id": i, "buffer": mock_task_buffer}
187187
scheduler.put_task(task_package)
188188
assert scheduler._backlog_queue.qsize() == 2, "Expected 2 backlogged tasks"
189+
190+
191+
@pytest.mark.local
192+
def test_tiny_large_loop():
193+
"""Run a set of tiny and large tasks in a loop"""
194+
195+
task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
196+
scheduler = MPITaskScheduler(task_q, result_q)
197+
198+
assert scheduler.available_nodes
199+
assert len(scheduler.available_nodes) == 8
200+
201+
assert scheduler._free_node_counter.value == 8
202+
203+
for i in range(10):
204+
num_nodes = 2 if i % 2 == 0 else 8
205+
mock_task_buffer = pack_res_spec_apply_message("func", "args", "kwargs",
206+
resource_specification={
207+
"num_nodes": num_nodes,
208+
"ranks_per_node": 2
209+
})
210+
task_package = {"task_id": i, "buffer": mock_task_buffer}
211+
scheduler.put_task(task_package)
212+
213+
for i in range(10):
214+
task = task_q.get(timeout=30)
215+
result_pkl = pickle.dumps(
216+
{"task_id": task["task_id"], "type": "result", "buffer": "RESULT BUF"})
217+
result_q.put(result_pkl)
218+
got_result = scheduler.get_result(True, 1)
219+
220+
assert got_result == result_pkl

0 commit comments

Comments
 (0)