Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make MPITaskScheduler prioritize large tasks #3805

Open
wants to merge 1 commit into
base: fix_mpi_infinite_loop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion parsl/executors/high_throughput/mpi_resource_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ def put_task(self, task_package: dict):
_f, _args, _kwargs, resource_spec = unpack_res_spec_apply_message(task_package["buffer"])

nodes_needed = resource_spec.get("num_nodes")
prioritized_task = PrioritizedTask(priority=nodes_needed,
# Prioritize large jobs
prioritized_task = PrioritizedTask(priority=-1 * nodes_needed,
task=task_package,
unpacked_task=(_f, _args, _kwargs, resource_spec),
nodes_needed=nodes_needed)
Expand Down
36 changes: 36 additions & 0 deletions parsl/tests/test_mpi_apps/test_mpi_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
import pickle
import random
from unittest import mock

import pytest
Expand Down Expand Up @@ -218,3 +219,38 @@ def test_tiny_large_loop():
got_result = scheduler.get_result(True, 1)

assert got_result == result_pkl


@pytest.mark.local
def test_larger_jobs_prioritized():
"""Larger jobs should be scheduled first"""

task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
scheduler = MPITaskScheduler(task_q, result_q)

max_nodes = len(scheduler.available_nodes)

# The first task will get scheduled with all the nodes,
# and the remainder hits the backlog queue.
node_request_list = [max_nodes] + [random.randint(1, 4) for _i in range(8)]

for task_id, num_nodes in enumerate(node_request_list):
mock_task_buffer = pack_res_spec_apply_message("func", "args", "kwargs",
resource_specification={
"num_nodes": num_nodes,
"ranks_per_node": 2
})
task_package = {"task_id": task_id, "buffer": mock_task_buffer}
scheduler.put_task(task_package)

# Confirm that the tasks are sorted in decreasing order
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test checks that tests coming out of the queue are sorted in priority order (that is, is the priority part of PrioritizedTask implemented correctly).

It doesn't check that they are sorted in num_nodes order, I think? (for example, introduce a bug into the calculation at line 200 and I think this test will still pass, even though the code is buggy?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I'll update this test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just pushed the updated test that tests for the backlog queue returning tasks in decreasing order of nodes requested.

output_priority = []
for i in range(len(node_request_list) - 1):
p_task = scheduler._backlog_queue.get()
output_priority.append(p_task.nodes_needed)

# Remove the first large job that blocks the nodes and forces following
# tasks into backlog
expected_priority = node_request_list[1:]
expected_priority.sort(reverse=True)
assert expected_priority == output_priority, "Expected nodes in decreasing sorted order"
Loading