Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
bcb3346
[Subnet Prioritization] Add prioritized|capacity-optimized-prioritize…
Allenz5 Jun 3, 2025
bd77a73
Merge branch 'develop' of github.com:Allenz5/aws-parallelcluster into…
Allenz5 Jun 3, 2025
96d1e01
Merge branch 'aws:develop' into develop
Allenz5 Jun 5, 2025
1360560
[Subnet Prioritization] Add test cases for instance allocation strate…
Allenz5 Jun 5, 2025
8cf10aa
[Subnet Prioritization] Update the default value and update policy of…
Allenz5 Jun 5, 2025
7f303dd
[Subnet Prioritization] Move AllocationStrategy Enum from pcluster.co…
Allenz5 Jun 6, 2025
26ddfec
[Subnet Prioritization] Add validator and validator test for enable_s…
Allenz5 Jun 6, 2025
13d8cfa
[Subnet Prioritization] Move AllocationStrategy Enum from cluster_con…
Allenz5 Jun 6, 2025
f8de4a5
Revert "[Subnet Prioritization] Move AllocationStrategy Enum from clu…
Allenz5 Jun 6, 2025
c7889cd
[Subnet Prioritization] Register enable_single_availability_zone_vali…
Allenz5 Jun 6, 2025
b6506cf
[Subnet Prioritization] Change default value of enable_single_availab…
Allenz5 Jun 9, 2025
a3b7672
[Subnet Prioritization] Add enable_single_availability_zone parameter…
Allenz5 Jun 9, 2025
33d3fd1
Merge branch 'aws:develop' into develop
Allenz5 Jun 9, 2025
406d5e8
[Subnet Prioritization] Fix format issues
Allenz5 Jun 11, 2025
dee33ca
Merge branch 'develop' of github.com:Allenz5/aws-parallelcluster into…
Allenz5 Jun 11, 2025
f2a0c7e
[Subnet Prioritization] Update CHANGELOG.md
Allenz5 Jun 11, 2025
2e94485
[Subnet Prioritization] Remove duplicated AllocationStrategy Enum
Allenz5 Jun 11, 2025
d2c2ba6
[Subnet Prioritization] Remove duplicated EnableSingleAvailabilityZon…
Allenz5 Jun 11, 2025
db7a9c5
[Subnet Prioritization] Update the failure message of InstancesAlloca…
Allenz5 Jun 11, 2025
99773c4
[Subnet Prioritization] Update enable_single_availability_zone_valida…
Allenz5 Jun 11, 2025
ce299af
[Subnet Prioritization] Update format
Allenz5 Jun 12, 2025
aa81737
[Subnet Prioritization] Fix EnableSingleAvailabilityZoneValidator
Allenz5 Jun 12, 2025
24129ff
[Subnet Prioritization] Fix format issue
Allenz5 Jun 12, 2025
444859a
[Subnet Prioritization] Add integration test for subnet prioritization
Allenz5 Jun 30, 2025
09c0170
[Subnet Prioritization] Update integration test for subnet prioritiza…
Allenz5 Jul 1, 2025
5117270
[Subnet Prioritization] Remove EnableSingleAvailabilityZone parameter…
Allenz5 Jul 7, 2025
ca4f2e7
[Subnet Prioritization] Update Integration Test
Allenz5 Jul 7, 2025
c033811
[Subnet Prioritization] Fix format issues
Allenz5 Jul 7, 2025
48ccdd0
[Subnet Prioritization] Remove EnableSingleAvailabilityZone from inte…
Allenz5 Jul 7, 2025
1f37961
Merge branch 'develop' into develop
Allenz5 Jul 7, 2025
80ab65b
[Subnet Prioritization] Move AllocationStrategy Enum from common.py b…
Allenz5 Jul 8, 2025
aafcfdf
[Subnet Prioritization] Update integration test and format
Allenz5 Jul 8, 2025
cc57784
Merge branch 'develop' of github.com:Allenz5/aws-parallelcluster into…
Allenz5 Jul 8, 2025
08e1901
[Subnet Prioritization] Remove custom hardcode settings from integrat…
Allenz5 Jul 8, 2025
f0b36fb
Merge branch 'develop' into develop
Allenz5 Jul 9, 2025
aed81d6
[Subnet Prioritization] Update format in instances_validators.py
Allenz5 Jul 9, 2025
ed7725c
Merge remote-tracking branch 'origin/develop' into develop
Allenz5 Jul 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ CHANGELOG

**CHANGES**
- Ubuntu 20.04 is no longer supported.
- Support prioritized and capacity-optimized-prioritized Allocation Strategy
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

- Support DCV on Amazon Linux 2023.
- Upgrade Python runtime used by Lambda functions to python3.12 (from python3.9).
- Remove `berkshelf`. All cookbooks are local and do not need `berkshelf` dependency management.
Expand Down
2 changes: 2 additions & 0 deletions cli/src/pcluster/config/cluster_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2578,6 +2578,8 @@ class AllocationStrategy(Enum):
LOWEST_PRICE = "lowest-price"
CAPACITY_OPTIMIZED = "capacity-optimized"
PRICE_CAPACITY_OPTIMIZED = "price-capacity-optimized"
PRIORITIZED = "prioritized"
CAPACITY_OPTIMIZED_PRIORITIZED = "capacity-optimized-prioritized"


class SlurmQueue(_CommonQueue):
Expand Down
28 changes: 23 additions & 5 deletions cli/src/pcluster/validators/instances_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,17 +221,22 @@ class InstancesAllocationStrategyValidator(Validator, _FlexibleInstanceTypesVali
"""Confirm Allocation Strategy matches with the Capacity Type."""

def _validate(self, compute_resource_name: str, capacity_type: Enum, allocation_strategy: Enum, **kwargs):
"""On-demand Capacity type only supports "lowest-price" allocation strategy."""
"""On-demand Capacity type only supports "lowest-price" and "prioritized" allocation strategy."""
valid_on_demand_allocation_strategy = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[CodeStyle] This is a constant dictionary, let's defined it in a common script, ideally where the constants about strategies are defined. There is not reason to redefine it every time this function gets called.

cluster_config.AllocationStrategy.LOWEST_PRICE,
cluster_config.AllocationStrategy.PRIORITIZED,
}
if (
capacity_type == cluster_config.CapacityType.ONDEMAND
and allocation_strategy
and allocation_strategy != cluster_config.AllocationStrategy.LOWEST_PRICE
and allocation_strategy not in valid_on_demand_allocation_strategy
):
alloc_strategy_msg = allocation_strategy.value if allocation_strategy else "not set"
self._add_failure(
f"Compute Resource {compute_resource_name} is using an OnDemand CapacityType but the Allocation "
f"Strategy specified is {alloc_strategy_msg}. OnDemand CapacityType can only use '"
f"{cluster_config.AllocationStrategy.LOWEST_PRICE.value}' allocation strategy.",
f"Compute Resource {compute_resource_name} is using an OnDemand CapacityType but "
f"the Allocation Strategy specified is {alloc_strategy_msg}. OnDemand CapacityType can only use '"
f"{cluster_config.AllocationStrategy.LOWEST_PRICE.value}' or "
f"'{cluster_config.AllocationStrategy.PRIORITIZED.value}' allocation strategy.",
FailureLevel.ERROR,
)
if capacity_type == cluster_config.CapacityType.CAPACITY_BLOCK and allocation_strategy:
Expand All @@ -241,6 +246,19 @@ def _validate(self, compute_resource_name: str, capacity_type: Enum, allocation_
"When using CAPACITY_BLOCK CapacityType, allocation strategy should not be set.",
FailureLevel.ERROR,
)
if (
capacity_type == cluster_config.CapacityType.SPOT
and allocation_strategy == cluster_config.AllocationStrategy.PRIORITIZED
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could take the chance here to generalize the check the same way we did for on demand, i,.e,. define in a constant the valid strategies for spot and check against that constant.

):
self._add_failure(
f"Compute Resource {compute_resource_name} is using a SPOT CapacityType but the "
f"Allocation Strategy specified is {allocation_strategy.value}. SPOT CapacityType can only use "
f"'{cluster_config.AllocationStrategy.LOWEST_PRICE.value}', "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to define the valid strategies for spot into a constant and concatenate those into the message

f"'{cluster_config.AllocationStrategy.CAPACITY_OPTIMIZED.value}', "
f"'{cluster_config.AllocationStrategy.PRICE_CAPACITY_OPTIMIZED.value}' "
f"or '{cluster_config.AllocationStrategy.CAPACITY_OPTIMIZED_PRIORITIZED.value}' allocation strategy.",
FailureLevel.ERROR,
)


class InstancesMemorySchedulingWarningValidator(Validator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Scheduling:
- Name: compute_resource2
InstanceType: c4.2xlarge
- Name: queue2
AllocationStrategy: "prioritized"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: double quotes are not required in yaml

Networking:
SubnetIds:
- subnet-23456789
Expand Down
45 changes: 38 additions & 7 deletions cli/tests/pcluster/validators/test_instances_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,31 +591,49 @@ def test_instances_networking_validator(
CapacityType.ONDEMAND,
AllocationStrategy.CAPACITY_OPTIMIZED,
"Compute Resource TestComputeResource is using an OnDemand CapacityType but the Allocation Strategy "
"specified is capacity-optimized. OnDemand CapacityType can only use 'lowest-price' allocation strategy.",
"specified is capacity-optimized. "
"OnDemand CapacityType can only use 'lowest-price' or 'prioritized' allocation strategy.",
),
(
CapacityType.ONDEMAND,
AllocationStrategy.PRICE_CAPACITY_OPTIMIZED,
"Compute Resource TestComputeResource is using an OnDemand CapacityType but the Allocation Strategy "
"specified is price-capacity-optimized. "
"OnDemand CapacityType can only use 'lowest-price' allocation strategy.",
"OnDemand CapacityType can only use 'lowest-price' or 'prioritized' allocation strategy.",
),
(
CapacityType.ONDEMAND,
AllocationStrategy.CAPACITY_OPTIMIZED_PRIORITIZED,
"Compute Resource TestComputeResource is using an OnDemand CapacityType but the Allocation "
"Strategy specified is capacity-optimized-prioritized. "
"OnDemand CapacityType can only use 'lowest-price' or 'prioritized' allocation strategy.",
),
(CapacityType.ONDEMAND, AllocationStrategy.PRIORITIZED, ""),
(CapacityType.ONDEMAND, AllocationStrategy.LOWEST_PRICE, ""),
(CapacityType.ONDEMAND, None, ""),
# Spot Capacity type supports both "lowest-price" and "capacity-optimized" allocation strategy
(CapacityType.SPOT, AllocationStrategy.LOWEST_PRICE, ""),
(CapacityType.SPOT, AllocationStrategy.CAPACITY_OPTIMIZED, ""),
(CapacityType.SPOT, AllocationStrategy.PRICE_CAPACITY_OPTIMIZED, ""),
(CapacityType.SPOT, AllocationStrategy.CAPACITY_OPTIMIZED_PRIORITIZED, ""),
(CapacityType.SPOT, None, ""),
(
CapacityType.SPOT,
AllocationStrategy.PRIORITIZED,
"Compute Resource TestComputeResource is using a SPOT CapacityType but the "
"Allocation Strategy specified is prioritized. SPOT CapacityType can only use "
"'lowest-price', "
"'capacity-optimized', "
"'price-capacity-optimized' "
"or 'capacity-optimized-prioritized' allocation strategy.",
),
# Capacity Block type supports does not support any allocation strategy
(
CapacityType.CAPACITY_BLOCK,
AllocationStrategy.CAPACITY_OPTIMIZED,
(
"Compute Resource TestComputeResource is using a CAPACITY_BLOCK CapacityType but the Allocation "
"Strategy specified is capacity-optimized. When using CAPACITY_BLOCK CapacityType, "
"allocation strategy should not be set."
),
"Compute Resource TestComputeResource is using a CAPACITY_BLOCK CapacityType but the Allocation "
"Strategy specified is capacity-optimized. When using CAPACITY_BLOCK CapacityType, "
"allocation strategy should not be set.",
),
(
CapacityType.CAPACITY_BLOCK,
Expand All @@ -632,6 +650,19 @@ def test_instances_networking_validator(
"Compute Resource TestComputeResource is using a CAPACITY_BLOCK CapacityType but the Allocation Strategy "
"specified is lowest-price. When using CAPACITY_BLOCK CapacityType, allocation strategy should not be set.",
),
(
CapacityType.CAPACITY_BLOCK,
AllocationStrategy.PRIORITIZED,
"Compute Resource TestComputeResource is using a CAPACITY_BLOCK CapacityType but the Allocation Strategy "
"specified is prioritized. When using CAPACITY_BLOCK CapacityType, allocation strategy should not be set.",
),
(
CapacityType.CAPACITY_BLOCK,
AllocationStrategy.CAPACITY_OPTIMIZED_PRIORITIZED,
"Compute Resource TestComputeResource is using a CAPACITY_BLOCK CapacityType but the Allocation Strategy "
"specified is capacity-optimized-prioritized. When using CAPACITY_BLOCK CapacityType, "
"allocation strategy should not be set.",
),
(CapacityType.CAPACITY_BLOCK, None, ""),
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import boto3
import pytest
from assertpy import assert_that
from assertpy import assert_that, soft_assertions
from cfn_stacks_factory import CfnStack
from constants import OSU_BENCHMARK_VERSION, UNSUPPORTED_OSES_FOR_DCV
from fabric import Connection
Expand All @@ -24,16 +24,19 @@
check_pcluster_list_cluster_log_streams,
generate_stack_name,
get_compute_nodes_instance_ids,
get_compute_nodes_subnet_ids,
get_username_for_os,
is_dcv_supported,
render_jinja_template,
)

from tests.common.assertions import (
assert_lambda_vpc_settings_are_correct,
assert_msg_in_log,
assert_no_errors_in_logs,
assert_no_msg_in_logs,
wait_for_num_instances_in_cluster,
wait_for_num_instances_in_queue,
)
from tests.common.osu_common import compile_osu
from tests.common.schedulers_common import SlurmCommands
Expand Down Expand Up @@ -238,3 +241,33 @@ def _run_mpi_jobs(mpi_variants, remote_command_executor, test_datadir, slurm_com
slurm_commands.assert_job_succeeded(job_id)
logging.info("Checking cluster has two nodes after running MPI jobs") # 1 static node + 1 dynamic node
assert_that(len(get_compute_nodes_instance_ids(cluster.cfn_name, region))).is_equal_to(2)


@pytest.mark.usefixtures("instance")
def test_cluster_with_subnet_prioritization(
region, os, pcluster_config_reader, clusters_factory, vpc_stack, scheduler_commands_factory
):
# Create cluster with subnet prioritization
init_config_file = pcluster_config_reader(config_file="pcluster.config.yaml")
cluster = clusters_factory(init_config_file)

remote_command_executor = RemoteCommandExecutor(cluster)
scheduler_commands = scheduler_commands_factory(remote_command_executor)
public_subnets = vpc_stack.get_all_public_subnets()
queues = ["queue1", "queue2"]
logging.info(f"Public subnets: {public_subnets}")
# Check that all instances are launched in the subnet with the highest priority
with soft_assertions():
for queue in queues:
scheduler_commands.submit_command("sleep 60", nodes=5, partition=queue)
wait_for_num_instances_in_queue(cluster.cfn_name, cluster.region, desired=5, queue=queue)

subnet_ids = get_compute_nodes_subnet_ids(cluster.cfn_name, region, node_type="Compute", queue_name=queue)
logging.info(f"Subnets: {subnet_ids}")
for subnet_id in subnet_ids:
assert_that(subnet_id).is_equal_to(public_subnets[0])

# Check that the CreateFleet request contains priorities for each subnet
slurm_resume_log = "/var/log/parallelcluster/slurm_resume.log"
assert_msg_in_log(remote_command_executor, slurm_resume_log, f"'SubnetId': '{public_subnets[0]}', 'Priority': 0.0")
assert_msg_in_log(remote_command_executor, slurm_resume_log, f"'SubnetId': '{public_subnets[1]}', 'Priority': 1.0")
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
Image:
Os: {{ os }}
HeadNode:
SharedStorageType: {{ shared_headnode_storage_type }}
InstanceType: {{ instance }}
Networking:
SubnetId: {{ public_subnet_id }}
Ssh:
KeyName: {{ key_name }}
Scheduling:
Scheduler: slurm
SlurmQueues:
- Name: queue1
CapacityType: ONDEMAND
AllocationStrategy: prioritized
ComputeResources:
- Name: queue1-i1
Instances:
- InstanceType: {{ instance }}
MinCount: 0
MaxCount: 10
Networking:
SubnetIds:
- {{ public_subnet_ids[0] }}
- {{ public_subnet_ids[1] }}
- Name: queue2
CapacityType: SPOT
AllocationStrategy: capacity-optimized-prioritized
ComputeResources:
- Name: queue2-i1
Instances:
- InstanceType: {{ instance }}
MinCount: 0
MaxCount: 10
Networking:
SubnetIds:
- {{ public_subnet_ids[0] }}
- {{ public_subnet_ids[1] }}
16 changes: 16 additions & 0 deletions tests/integration-tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,22 @@ def get_cluster_nodes_instance_ids(stack_name, region, instance_types=None, node
raise


def get_compute_nodes_subnet_ids(stack_name, region, instance_types=None, node_type=None, queue_name=None):
"""Return a list of cluster Instances Subnet Ids."""
try:
instances = describe_cluster_instances(
stack_name,
region,
filter_by_node_type=node_type,
filter_by_instance_types=instance_types,
filter_by_queue_name=queue_name,
)
return [instance["SubnetId"] for instance in instances]
except Exception as e:
logging.error("Failed retrieving instance ids with exception: %s", e)
raise


def get_compute_nodes_instance_ips(stack_name, region):
"""Return a list of compute Instances Ip's."""
try:
Expand Down
Loading