diff --git a/pyproject.toml b/pyproject.toml
index 3e9292d31..c6f20d662 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -32,7 +32,7 @@ dependencies = [
     "python-multipart>=0.0.16",
     "filelock",
     "psutil",
-    "gpuhunt==0.1.8",
+    "gpuhunt==0.1.10",
     "argcomplete>=3.5.0",
     "ignore-python>=0.2.0",
     "orjson",
diff --git a/src/dstack/_internal/core/backends/kubernetes/compute.py b/src/dstack/_internal/core/backends/kubernetes/compute.py
index 9668a17f3..87a4fb805 100644
--- a/src/dstack/_internal/core/backends/kubernetes/compute.py
+++ b/src/dstack/_internal/core/backends/kubernetes/compute.py
@@ -5,7 +5,7 @@
 from enum import Enum
 from typing import List, Optional, Tuple
 
-from gpuhunt import KNOWN_NVIDIA_GPUS, AcceleratorVendor
+from gpuhunt import KNOWN_AMD_GPUS, KNOWN_NVIDIA_GPUS, AcceleratorVendor
 from kubernetes import client
 
 from dstack._internal.core.backends.base.compute import (
@@ -59,19 +59,31 @@ logger = get_logger(__name__)
 
 JUMP_POD_SSH_PORT = 22
 
-
-NVIDIA_GPU_NAME_TO_GPU_INFO = {gpu.name: gpu for gpu in KNOWN_NVIDIA_GPUS}
-NVIDIA_GPU_NAMES = NVIDIA_GPU_NAME_TO_GPU_INFO.keys()
+DUMMY_REGION = "-"
 
 NVIDIA_GPU_RESOURCE = "nvidia.com/gpu"
-NVIDIA_GPU_COUNT_LABEL = f"{NVIDIA_GPU_RESOURCE}.count"
-NVIDIA_GPU_PRODUCT_LABEL = f"{NVIDIA_GPU_RESOURCE}.product"
 NVIDIA_GPU_NODE_TAINT = NVIDIA_GPU_RESOURCE
+NVIDIA_GPU_PRODUCT_LABEL = f"{NVIDIA_GPU_RESOURCE}.product"
+
+AMD_GPU_RESOURCE = "amd.com/gpu"
+AMD_GPU_NODE_TAINT = AMD_GPU_RESOURCE
+# The oldest but still supported label format, the safest option, see the commit message:
+# https://github.com/ROCm/k8s-device-plugin/commit/c0b0231b391a56bc9da4f362d561e25e960d7a48
+# E.g., beta.amd.com/gpu.device-id.74b5=4 - A node with four MI300X VF (0x74b5) GPUs
+# We cannot rely on the beta.amd.com/gpu.product-name.* label, as it may be missing, see the issue:
+# https://github.com/ROCm/k8s-device-plugin/issues/112
+AMD_GPU_DEVICE_ID_LABEL_PREFIX = f"beta.{AMD_GPU_RESOURCE}.device-id."
 
 # Taints we know and tolerate when creating our objects, e.g., the jump pod.
-TOLERATED_NODE_TAINTS = (NVIDIA_GPU_NODE_TAINT,)
+TOLERATED_NODE_TAINTS = (NVIDIA_GPU_NODE_TAINT, AMD_GPU_NODE_TAINT)
 
-DUMMY_REGION = "-"
+NVIDIA_GPU_NAME_TO_GPU_INFO = {gpu.name: gpu for gpu in KNOWN_NVIDIA_GPUS}
+NVIDIA_GPU_NAMES = NVIDIA_GPU_NAME_TO_GPU_INFO.keys()
+
+AMD_GPU_DEVICE_ID_TO_GPU_INFO = {
+    device_id: gpu_info for gpu_info in KNOWN_AMD_GPUS for device_id in gpu_info.device_ids
+}
+AMD_GPU_NAME_TO_DEVICE_IDS = {gpu.name: gpu.device_ids for gpu in KNOWN_AMD_GPUS}
 
 
 class Operator(str, Enum):
@@ -112,21 +124,15 @@ def get_offers_by_requirements(
         nodes = get_value(node_list, ".items", list[client.V1Node], required=True)
         for node in nodes:
             try:
-                labels = get_value(node, ".metadata.labels", dict[str, str]) or {}
                 name = get_value(node, ".metadata.name", str, required=True)
-                cpus = _parse_cpu(
-                    get_value(node, ".status.allocatable['cpu']", str, required=True)
-                )
                 cpu_arch = normalize_arch(
                     get_value(node, ".status.node_info.architecture", str)
                 ).to_cpu_architecture()
-                memory_mib = _parse_memory(
-                    get_value(node, ".status.allocatable['memory']", str, required=True)
-                )
-                gpus, _ = _get_gpus_from_node_labels(labels)
-                disk_size_mib = _parse_memory(
-                    get_value(node, ".status.allocatable['ephemeral-storage']", str, required=True)
-                )
+                allocatable = get_value(node, ".status.allocatable", dict[str, str], required=True)
+                cpus = _parse_cpu(allocatable["cpu"])
+                memory_mib = _parse_memory(allocatable["memory"])
+                disk_size_mib = _parse_memory(allocatable["ephemeral-storage"])
+                gpus = _get_node_gpus(node)
             except (AttributeError, KeyError, ValueError) as e:
                 logger.exception("Failed to process node: %s: %s", type(e).__name__, e)
                 continue
@@ -217,59 +223,18 @@ def run_job(
                     "GPU is requested but the offer has no GPUs:"
                     f" {gpu_spec=} {instance_offer=}",
                 )
-            offer_gpu = offer_gpus[0]
-            matching_gpu_label_values: set[str] = set()
-            # We cannot generate an expected GPU label value from the Gpu model instance
-            # as the actual values may have additional components (socket, memory type, etc.)
-            # that we don't preserve in the Gpu model, e.g., "NVIDIA-H100-80GB-HBM3".
-            # Moreover, a single Gpu may match multiple label values.
-            # As a workaround, we iterate and process all node labels once again (we already
-            # processed them in `get_offers_by_requirements()`).
-            node_list = call_api_method(
-                self.api.list_node,
-                client.V1NodeList,
-            )
-            nodes = get_value(node_list, ".items", list[client.V1Node], required=True)
-            for node in nodes:
-                labels = get_value(node, ".metadata.labels", dict[str, str])
-                if not labels:
-                    continue
-                gpus, gpu_label_value = _get_gpus_from_node_labels(labels)
-                if not gpus or gpu_label_value is None:
-                    continue
-                if gpus[0] == offer_gpu:
-                    matching_gpu_label_values.add(gpu_label_value)
-            if not matching_gpu_label_values:
-                raise ComputeError(
-                    f"GPU is requested but no matching GPU labels found: {gpu_spec=}"
-                )
-            logger.debug(
-                "Requesting %d GPU(s), node labels: %s", gpu_min, matching_gpu_label_values
-            )
-            # TODO: support other GPU vendors
-            resources_requests[NVIDIA_GPU_RESOURCE] = str(gpu_min)
-            resources_limits[NVIDIA_GPU_RESOURCE] = str(gpu_min)
-            node_affinity = client.V1NodeAffinity(
-                required_during_scheduling_ignored_during_execution=[
-                    client.V1NodeSelectorTerm(
-                        match_expressions=[
-                            client.V1NodeSelectorRequirement(
-                                key=NVIDIA_GPU_PRODUCT_LABEL,
-                                operator=Operator.IN,
-                                values=list(matching_gpu_label_values),
-                            ),
-                        ],
-                    ),
-                ],
+            gpu_resource, node_affinity, node_taint = _get_pod_spec_parameters_for_gpu(
+                self.api, offer_gpus[0]
             )
+            logger.debug("Requesting GPU resource: %s=%d", gpu_resource, gpu_min)
+            resources_requests[gpu_resource] = resources_limits[gpu_resource] = str(gpu_min)
             # It should be NoSchedule, but we also add NoExecute toleration just in case.
             for effect in [TaintEffect.NO_SCHEDULE, TaintEffect.NO_EXECUTE]:
                 tolerations.append(
                     client.V1Toleration(
-                        key=NVIDIA_GPU_NODE_TAINT, operator=Operator.EXISTS, effect=effect
+                        key=node_taint, operator=Operator.EXISTS, effect=effect
                     )
                 )
-
         if (memory_min := resources_spec.memory.min) is not None:
             resources_requests["memory"] = _render_memory(memory_min)
         if (
@@ -331,7 +296,9 @@ def run_job(
                         volume_mounts=volume_mounts,
                     )
                 ],
-                affinity=node_affinity,
+                affinity=client.V1Affinity(
+                    node_affinity=node_affinity,
+                ),
                 tolerations=tolerations,
                 volumes=volumes_,
             ),
@@ -550,34 +517,144 @@ def _render_memory(memory: Memory) -> str:
     return f"{float(memory)}Gi"
 
 
-def _get_gpus_from_node_labels(labels: dict[str, str]) -> tuple[list[Gpu], Optional[str]]:
+def _get_node_gpus(node: client.V1Node) -> list[Gpu]:
+    node_name = get_value(node, ".metadata.name", str, required=True)
+    allocatable = get_value(node, ".status.allocatable", dict[str, str], required=True)
+    labels = get_value(node, ".metadata.labels", dict[str, str]) or {}
+    for gpu_resource, gpu_getter in (
+        (NVIDIA_GPU_RESOURCE, _get_nvidia_gpu_from_node_labels),
+        (AMD_GPU_RESOURCE, _get_amd_gpu_from_node_labels),
+    ):
+        _gpu_count = allocatable.get(gpu_resource)
+        if not _gpu_count:
+            continue
+        gpu_count = int(_gpu_count)
+        if gpu_count < 1:
+            continue
+        gpu = gpu_getter(labels)
+        if gpu is None:
+            logger.warning(
+                "Node %s: GPU resource found, but failed to detect its model: %s=%d",
+                node_name,
+                gpu_resource,
+                gpu_count,
+            )
+            return []
+        return [gpu] * gpu_count
+    logger.debug("Node %s: no GPU resource found", node_name)
+    return []
+
+
+def _get_nvidia_gpu_from_node_labels(labels: dict[str, str]) -> Optional[Gpu]:
     # We rely on https://github.com/NVIDIA/k8s-device-plugin/tree/main/docs/gpu-feature-discovery
     # to detect gpus. Note that "nvidia.com/gpu.product" is not a short gpu name like "T4" or
     # "A100" but a product name like "Tesla-T4" or "A100-SXM4-40GB".
     # Thus, we convert the product name to a known gpu name.
-    # TODO: support other GPU vendors
-    gpu_count = labels.get(NVIDIA_GPU_COUNT_LABEL)
     gpu_product = labels.get(NVIDIA_GPU_PRODUCT_LABEL)
-    if gpu_count is None or gpu_product is None:
-        return [], None
-    gpu_count = int(gpu_count)
-    gpu_name = None
-    for known_gpu_name in NVIDIA_GPU_NAMES:
-        if known_gpu_name.lower() in gpu_product.lower().split("-"):
-            gpu_name = known_gpu_name
+    if gpu_product is None:
+        return None
+    for gpu_name in NVIDIA_GPU_NAMES:
+        if gpu_name.lower() in gpu_product.lower().split("-"):
             break
-    if gpu_name is None:
-        return [], None
+    else:
+        return None
     gpu_info = NVIDIA_GPU_NAME_TO_GPU_INFO[gpu_name]
     gpu_memory = gpu_info.memory * 1024
     # A100 may come in two variants
     if "40GB" in gpu_product:
         gpu_memory = 40 * 1024
-    gpus = [
-        Gpu(vendor=AcceleratorVendor.NVIDIA, name=gpu_name, memory_mib=gpu_memory)
-        for _ in range(gpu_count)
-    ]
-    return gpus, gpu_product
+    return Gpu(vendor=AcceleratorVendor.NVIDIA, name=gpu_name, memory_mib=gpu_memory)
+
+
+def _get_amd_gpu_from_node_labels(labels: dict[str, str]) -> Optional[Gpu]:
+    # (AMDGPUInfo.name, AMDGPUInfo.memory) pairs
+    gpus: set[tuple[str, int]] = set()
+    for label in labels:
+        if not label.startswith(AMD_GPU_DEVICE_ID_LABEL_PREFIX):
+            continue
+        _, _, _device_id = label.rpartition(".")
+        device_id = int(_device_id, 16)
+        gpu_info = AMD_GPU_DEVICE_ID_TO_GPU_INFO.get(device_id)
+        if gpu_info is None:
+            logger.warning("Unknown AMD GPU device id: %X", device_id)
+            continue
+        gpus.add((gpu_info.name, gpu_info.memory))
+    if not gpus:
+        return None
+    if len(gpus) == 1:
+        gpu_name, gpu_memory_gib = next(iter(gpus))
+        return Gpu(vendor=AcceleratorVendor.AMD, name=gpu_name, memory_mib=gpu_memory_gib * 1024)
+    logger.warning("Multiple AMD GPU models detected: %s, ignoring all GPUs", gpus)
+    return None
+
+
+def _get_pod_spec_parameters_for_gpu(
+    api: client.CoreV1Api, gpu: Gpu
+) -> tuple[str, client.V1NodeAffinity, str]:
+    gpu_vendor = gpu.vendor
+    assert gpu_vendor is not None
+    if gpu_vendor == AcceleratorVendor.NVIDIA:
+        node_affinity = _get_nvidia_gpu_node_affinity(api, gpu)
+        return NVIDIA_GPU_RESOURCE, node_affinity, NVIDIA_GPU_NODE_TAINT
+    if gpu_vendor == AcceleratorVendor.AMD:
+        node_affinity = _get_amd_gpu_node_affinity(gpu)
+        return AMD_GPU_RESOURCE, node_affinity, AMD_GPU_NODE_TAINT
+    raise ComputeError(f"Unsupported GPU vendor: {gpu_vendor}")
+
+
+def _get_nvidia_gpu_node_affinity(api: client.CoreV1Api, gpu: Gpu) -> client.V1NodeAffinity:
+    matching_gpu_label_values: set[str] = set()
+    # We cannot generate an expected GPU label value from the Gpu model instance
+    # as the actual values may have additional components (socket, memory type, etc.)
+    # that we don't preserve in the Gpu model, e.g., "NVIDIA-H100-80GB-HBM3".
+    # Moreover, a single Gpu may match multiple label values.
+    # As a workaround, we iterate and process all node labels once again (we already
+    # processed them in `get_offers_by_requirements()`).
+    node_list = call_api_method(api.list_node, client.V1NodeList)
+    nodes = get_value(node_list, ".items", list[client.V1Node], required=True)
+    for node in nodes:
+        labels = get_value(node, ".metadata.labels", dict[str, str]) or {}
+        if _get_nvidia_gpu_from_node_labels(labels) == gpu:
+            matching_gpu_label_values.add(labels[NVIDIA_GPU_PRODUCT_LABEL])
+    if not matching_gpu_label_values:
+        raise ComputeError(f"NVIDIA GPU is requested but no matching GPU labels found: {gpu=}")
+    logger.debug("Selecting nodes by labels %s for NVIDIA %s", matching_gpu_label_values, gpu.name)
+    return client.V1NodeAffinity(
+        required_during_scheduling_ignored_during_execution=client.V1NodeSelector(
+            node_selector_terms=[
+                client.V1NodeSelectorTerm(
+                    match_expressions=[
+                        client.V1NodeSelectorRequirement(
+                            key=NVIDIA_GPU_PRODUCT_LABEL,
+                            operator=Operator.IN,
+                            values=list(matching_gpu_label_values),
+                        ),
+                    ],
+                ),
+            ],
+        ),
+    )
+
+
+def _get_amd_gpu_node_affinity(gpu: Gpu) -> client.V1NodeAffinity:
+    device_ids = AMD_GPU_NAME_TO_DEVICE_IDS.get(gpu.name)
+    if device_ids is None:
+        raise ComputeError(f"AMD GPU is requested but no matching device ids found: {gpu=}")
+    return client.V1NodeAffinity(
+        required_during_scheduling_ignored_during_execution=client.V1NodeSelector(
+            node_selector_terms=[
+                client.V1NodeSelectorTerm(
+                    match_expressions=[
+                        client.V1NodeSelectorRequirement(
+                            key=f"{AMD_GPU_DEVICE_ID_LABEL_PREFIX}{device_id:x}",
+                            operator=Operator.EXISTS,
+                        ),
+                    ],
+                )
+                for device_id in device_ids
+            ],
+        ),
+    )
 
 
 def _continue_setup_jump_pod(
diff --git a/src/tests/_internal/core/backends/kubernetes/test_compute.py b/src/tests/_internal/core/backends/kubernetes/test_compute.py
index 5736e35a0..f0bcbbc17 100644
--- a/src/tests/_internal/core/backends/kubernetes/test_compute.py
+++ b/src/tests/_internal/core/backends/kubernetes/test_compute.py
@@ -1,24 +1,53 @@
-from dstack._internal.core.backends.kubernetes.compute import _get_gpus_from_node_labels
+import logging
+
+import pytest
+from gpuhunt import AcceleratorVendor
+
+from dstack._internal.core.backends.kubernetes.compute import (
+    _get_amd_gpu_from_node_labels,
+    _get_nvidia_gpu_from_node_labels,
+)
 from dstack._internal.core.models.instances import Gpu
 
 
-class TestGetGPUsFromNodeLabels:
+class TestGetNvidiaGPUFromNodeLabels:
+    def test_returns_none_if_no_labels(self):
+        assert _get_nvidia_gpu_from_node_labels({}) is None
+
+    def test_returns_correct_memory_for_different_A100(self):
+        assert _get_nvidia_gpu_from_node_labels(
+            {"nvidia.com/gpu.product": "A100-SXM4-40GB"}
+        ) == Gpu(vendor=AcceleratorVendor.NVIDIA, name="A100", memory_mib=40 * 1024)
+
+        assert _get_nvidia_gpu_from_node_labels(
+            {"nvidia.com/gpu.product": "A100-SXM4-80GB"}
+        ) == Gpu(vendor=AcceleratorVendor.NVIDIA, name="A100", memory_mib=80 * 1024)
+
+
+class TestGetAMDGPUFromNodeLabels:
     def test_returns_no_gpus_if_no_labels(self):
-        assert _get_gpus_from_node_labels({}) == ([], None)
+        assert _get_amd_gpu_from_node_labels({}) is None
 
-    def test_returns_no_gpus_if_missing_labels(self):
-        assert _get_gpus_from_node_labels({"nvidia.com/gpu.count": 1}) == ([], None)
+    def test_returns_known_gpu(self):
+        assert _get_amd_gpu_from_node_labels({"beta.amd.com/gpu.device-id.74b5": "4"}) == Gpu(
+            vendor=AcceleratorVendor.AMD, name="MI300X", memory_mib=192 * 1024
+        )
 
-    def test_returns_correct_memory_for_different_A100(self):
-        assert _get_gpus_from_node_labels(
-            {
-                "nvidia.com/gpu.count": 1,
-                "nvidia.com/gpu.product": "A100-SXM4-40GB",
-            }
-        ) == ([Gpu(name="A100", memory_mib=40 * 1024)], "A100-SXM4-40GB")
-        assert _get_gpus_from_node_labels(
-            {
-                "nvidia.com/gpu.count": 1,
-                "nvidia.com/gpu.product": "A100-SXM4-80GB",
-            }
-        ) == ([Gpu(name="A100", memory_mib=80 * 1024)], "A100-SXM4-80GB")
+    def test_returns_known_gpu_if_multiple_device_ids_match_the_same_gpu(self):
+        # 4x AMD Instinct MI300X VF + 4x AMD Instinct MI300X
+        labels = {"beta.amd.com/gpu.device-id.74b5": "4", "beta.amd.com/gpu.device-id.74a1": "4"}
+        assert _get_amd_gpu_from_node_labels(labels) == Gpu(
+            vendor=AcceleratorVendor.AMD, name="MI300X", memory_mib=192 * 1024
+        )
+
+    def test_returns_none_if_device_id_is_unknown(self, caplog: pytest.LogCaptureFixture):
+        caplog.set_level(logging.WARNING)
+        assert _get_amd_gpu_from_node_labels({"beta.amd.com/gpu.device-id.ffff": "4"}) is None
+        assert "Unknown AMD GPU device id: FFFF" in caplog.text
+
+    def test_returns_none_if_multiple_gpu_models(self, caplog: pytest.LogCaptureFixture):
+        caplog.set_level(logging.WARNING)
+        # 4x AMD Instinct MI300X VF + 4x AMD Instinct MI325X
+        labels = {"beta.amd.com/gpu.device-id.74b5": "4", "beta.amd.com/gpu.device-id.74a5": "4"}
+        assert _get_amd_gpu_from_node_labels(labels) is None
+        assert "Multiple AMD GPU models detected" in caplog.text