Skip to content

Commit 55d46e1

Browse files
committed
Prioritize large jobs by setting priority to negative of nodes requested by MPI task
1 parent 27ad9df commit 55d46e1

File tree

2 files changed

+38
-1
lines changed

2 files changed

+38
-1
lines changed

parsl/executors/high_throughput/mpi_resource_management.py

+2-1
Original file line number | Diff line number | Diff line change
@@ -196,7 +196,8 @@ def put_task(self, task_package: dict):
196196
_f, _args, _kwargs, resource_spec = unpack_res_spec_apply_message(task_package["buffer"])
197197

198198
nodes_needed = resource_spec.get("num_nodes")
199-
prioritized_task = PrioritizedTask(priority=nodes_needed,
199+
# Prioritize large jobs: negating the node count makes bigger requests leave the backlog ahead of smaller ones
200+
prioritized_task = PrioritizedTask(priority=-1 * nodes_needed,
200201
task=task_package,
201202
unpacked_task=(_f, _args, _kwargs, resource_spec),
202203
nodes_needed=nodes_needed)

parsl/tests/test_mpi_apps/test_mpi_scheduler.py

+36
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
import logging
22
import os
33
import pickle
4+
import random
45
from unittest import mock
56

67
import pytest
@@ -218,3 +219,38 @@ def test_tiny_large_loop():
218219
got_result = scheduler.get_result(True, 1)
219220

220221
assert got_result == result_pkl
222+
223+
224+
@pytest.mark.local
def test_larger_jobs_prioritized():
    """Backlogged tasks should come off the queue from largest node request to smallest."""

    task_q = SpawnContext.Queue()
    result_q = SpawnContext.Queue()
    scheduler = MPITaskScheduler(task_q, result_q)

    max_nodes = len(scheduler.available_nodes)

    # The leading request asks for every available node, so it is the one
    # that gets scheduled; the eight randomly sized requests that follow are
    # expected to land in the backlog queue.
    backlog_requests = [random.randint(1, 4) for _ in range(8)]
    node_request_list = [max_nodes, *backlog_requests]

    for task_id, num_nodes in enumerate(node_request_list):
        resource_spec = {"num_nodes": num_nodes, "ranks_per_node": 2}
        buffer = pack_res_spec_apply_message("func", "args", "kwargs",
                                             resource_specification=resource_spec)
        scheduler.put_task({"task_id": task_id, "buffer": buffer})

    # Drain one backlog entry per queued request, recording the node count
    # of each in the order the queue hands them back.
    dequeued_sizes = [
        scheduler._backlog_queue.get().nodes_needed
        for _ in backlog_requests
    ]

    # The blocking first request never reaches the backlog, so only the
    # remaining requests are compared, largest first.
    expected_sizes = sorted(backlog_requests, reverse=True)
    assert expected_sizes == dequeued_sizes, "Expected nodes in decreasing sorted order"

0 commit comments

Comments
 (0)