2 changes: 1 addition & 1 deletion pyproject.toml
@@ -32,7 +32,7 @@ dependencies = [
"python-multipart>=0.0.16",
"filelock",
"psutil",
"gpuhunt==0.1.8",
"gpuhunt==0.1.10",
"argcomplete>=3.5.0",
"ignore-python>=0.2.0",
"orjson",
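This pin bump matters because the backend change below starts importing `KNOWN_AMD_GPUS`, which older gpuhunt releases don't ship. A minimal environment check (assuming the distribution name matches the import name):

```python
# Sanity check, not part of the PR: verify the installed gpuhunt release
# ships the symbol compute.py starts importing in this change.
from importlib.metadata import version

assert version("gpuhunt") == "0.1.10"  # the pin from pyproject.toml

from gpuhunt import KNOWN_AMD_GPUS  # new import used by compute.py

print(f"{len(KNOWN_AMD_GPUS)} known AMD GPU models")
```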
243 changes: 160 additions & 83 deletions src/dstack/_internal/core/backends/kubernetes/compute.py
@@ -5,7 +5,7 @@
from enum import Enum
from typing import List, Optional, Tuple

from gpuhunt import KNOWN_NVIDIA_GPUS, AcceleratorVendor
from gpuhunt import KNOWN_AMD_GPUS, KNOWN_NVIDIA_GPUS, AcceleratorVendor
from kubernetes import client

from dstack._internal.core.backends.base.compute import (
@@ -59,19 +59,31 @@
logger = get_logger(__name__)

JUMP_POD_SSH_PORT = 22

NVIDIA_GPU_NAME_TO_GPU_INFO = {gpu.name: gpu for gpu in KNOWN_NVIDIA_GPUS}
NVIDIA_GPU_NAMES = NVIDIA_GPU_NAME_TO_GPU_INFO.keys()
DUMMY_REGION = "-"

NVIDIA_GPU_RESOURCE = "nvidia.com/gpu"
NVIDIA_GPU_COUNT_LABEL = f"{NVIDIA_GPU_RESOURCE}.count"
NVIDIA_GPU_PRODUCT_LABEL = f"{NVIDIA_GPU_RESOURCE}.product"
NVIDIA_GPU_NODE_TAINT = NVIDIA_GPU_RESOURCE
NVIDIA_GPU_PRODUCT_LABEL = f"{NVIDIA_GPU_RESOURCE}.product"

AMD_GPU_RESOURCE = "amd.com/gpu"
AMD_GPU_NODE_TAINT = AMD_GPU_RESOURCE
# The oldest but still supported label format and the safest option; see the commit message:
# https://github.com/ROCm/k8s-device-plugin/commit/c0b0231b391a56bc9da4f362d561e25e960d7a48
# E.g., beta.amd.com/gpu.device-id.74b5=4 - A node with four MI300X VF (0x74b5) GPUs
# We cannot rely on the beta.amd.com/gpu.product-name.* label, as it may be missing, see the issue:
# https://github.com/ROCm/k8s-device-plugin/issues/112
AMD_GPU_DEVICE_ID_LABEL_PREFIX = f"beta.{AMD_GPU_RESOURCE}.device-id."

# Taints we know and tolerate when creating our objects, e.g., the jump pod.
TOLERATED_NODE_TAINTS = (NVIDIA_GPU_NODE_TAINT,)
TOLERATED_NODE_TAINTS = (NVIDIA_GPU_NODE_TAINT, AMD_GPU_NODE_TAINT)

DUMMY_REGION = "-"
NVIDIA_GPU_NAME_TO_GPU_INFO = {gpu.name: gpu for gpu in KNOWN_NVIDIA_GPUS}
NVIDIA_GPU_NAMES = NVIDIA_GPU_NAME_TO_GPU_INFO.keys()

AMD_GPU_DEVICE_ID_TO_GPU_INFO = {
device_id: gpu_info for gpu_info in KNOWN_AMD_GPUS for device_id in gpu_info.device_ids
}
AMD_GPU_NAME_TO_DEVICE_IDS = {gpu.name: gpu.device_ids for gpu in KNOWN_AMD_GPUS}
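The mappings above reduce AMD GPU detection to hex-parsing a label suffix. A self-contained sketch of that lookup, using a hypothetical `AMDGPUInfo` stand-in for gpuhunt's real entries (the MI300X device ids 0x74a1/0x74b5 come from the tests at the bottom of this PR):

```python
# Standalone sketch of the device-id label decoding; AMDGPUInfo is a
# hypothetical stand-in for gpuhunt's KNOWN_AMD_GPUS entries.
from typing import NamedTuple, Optional

AMD_GPU_DEVICE_ID_LABEL_PREFIX = "beta.amd.com/gpu.device-id."

class AMDGPUInfo(NamedTuple):
    name: str
    memory: int  # GiB
    device_ids: tuple[int, ...]

KNOWN_AMD_GPUS = [AMDGPUInfo("MI300X", 192, (0x74A1, 0x74B5))]
AMD_GPU_DEVICE_ID_TO_GPU_INFO = {
    device_id: gpu_info
    for gpu_info in KNOWN_AMD_GPUS
    for device_id in gpu_info.device_ids
}

def decode_device_id_label(label: str) -> Optional[AMDGPUInfo]:
    # "beta.amd.com/gpu.device-id.74b5" -> 0x74B5 -> MI300X
    if not label.startswith(AMD_GPU_DEVICE_ID_LABEL_PREFIX):
        return None
    _, _, suffix = label.rpartition(".")
    return AMD_GPU_DEVICE_ID_TO_GPU_INFO.get(int(suffix, 16))

assert decode_device_id_label("beta.amd.com/gpu.device-id.74b5").name == "MI300X"
```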


class Operator(str, Enum):
@@ -112,21 +112,15 @@ def get_offers_by_requirements(
nodes = get_value(node_list, ".items", list[client.V1Node], required=True)
for node in nodes:
try:
labels = get_value(node, ".metadata.labels", dict[str, str]) or {}
name = get_value(node, ".metadata.name", str, required=True)
cpus = _parse_cpu(
get_value(node, ".status.allocatable['cpu']", str, required=True)
)
cpu_arch = normalize_arch(
get_value(node, ".status.node_info.architecture", str)
).to_cpu_architecture()
memory_mib = _parse_memory(
get_value(node, ".status.allocatable['memory']", str, required=True)
)
gpus, _ = _get_gpus_from_node_labels(labels)
disk_size_mib = _parse_memory(
get_value(node, ".status.allocatable['ephemeral-storage']", str, required=True)
)
allocatable = get_value(node, ".status.allocatable", dict[str, str], required=True)
cpus = _parse_cpu(allocatable["cpu"])
memory_mib = _parse_memory(allocatable["memory"])
disk_size_mib = _parse_memory(allocatable["ephemeral-storage"])
gpus = _get_node_gpus(node)
except (AttributeError, KeyError, ValueError) as e:
logger.exception("Failed to process node: %s: %s", type(e).__name__, e)
continue
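With the switch to reading `.status.allocatable` as a single dict, `_parse_cpu` and `_parse_memory` receive raw Kubernetes quantity strings such as `"16"`, `"7500m"`, or `"64Gi"`. Those helpers are not part of this diff, so the following is only an assumption about the parsing they have to do:

```python
# Hypothetical quantity parsing; dstack's real _parse_cpu/_parse_memory
# live elsewhere in compute.py and may handle more suffixes.
def _parse_cpu_sketch(value: str) -> int:
    # "7500m" -> 7 whole cores; "16" -> 16
    if value.endswith("m"):
        return int(value[:-1]) // 1000
    return int(value)

def _parse_memory_sketch(value: str) -> int:
    # Returns MiB: "64Gi" -> 65536, "131072Ki" -> 128, bare bytes -> MiB
    factors = {"Ki": 1 / 1024, "Mi": 1, "Gi": 1024, "Ti": 1024**2}
    for suffix, factor in factors.items():
        if value.endswith(suffix):
            return int(float(value[: -len(suffix)]) * factor)
    return int(value) // 2**20

assert _parse_cpu_sketch("7500m") == 7
assert _parse_memory_sketch("64Gi") == 64 * 1024
assert _parse_memory_sketch("131072Ki") == 128
```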
@@ -217,59 +223,18 @@ def run_job(
"GPU is requested but the offer has no GPUs:"
f" {gpu_spec=} {instance_offer=}",
)
offer_gpu = offer_gpus[0]
matching_gpu_label_values: set[str] = set()
# We cannot generate an expected GPU label value from the Gpu model instance
# as the actual values may have additional components (socket, memory type, etc.)
# that we don't preserve in the Gpu model, e.g., "NVIDIA-H100-80GB-HBM3".
# Moreover, a single Gpu may match multiple label values.
# As a workaround, we iterate and process all node labels once again (we already
# processed them in `get_offers_by_requirements()`).
node_list = call_api_method(
self.api.list_node,
client.V1NodeList,
)
nodes = get_value(node_list, ".items", list[client.V1Node], required=True)
for node in nodes:
labels = get_value(node, ".metadata.labels", dict[str, str])
if not labels:
continue
gpus, gpu_label_value = _get_gpus_from_node_labels(labels)
if not gpus or gpu_label_value is None:
continue
if gpus[0] == offer_gpu:
matching_gpu_label_values.add(gpu_label_value)
if not matching_gpu_label_values:
raise ComputeError(
f"GPU is requested but no matching GPU labels found: {gpu_spec=}"
)
logger.debug(
"Requesting %d GPU(s), node labels: %s", gpu_min, matching_gpu_label_values
)
# TODO: support other GPU vendors
resources_requests[NVIDIA_GPU_RESOURCE] = str(gpu_min)
resources_limits[NVIDIA_GPU_RESOURCE] = str(gpu_min)
node_affinity = client.V1NodeAffinity(
required_during_scheduling_ignored_during_execution=[
client.V1NodeSelectorTerm(
match_expressions=[
client.V1NodeSelectorRequirement(
key=NVIDIA_GPU_PRODUCT_LABEL,
operator=Operator.IN,
values=list(matching_gpu_label_values),
),
],
),
],
gpu_resource, node_affinity, node_taint = _get_pod_spec_parameters_for_gpu(
self.api, offer_gpus[0]
)
logger.debug("Requesting GPU resource: %s=%d", gpu_resource, gpu_min)
resources_requests[gpu_resource] = resources_limits[gpu_resource] = str(gpu_min)
# It should be NoSchedule, but we also add a NoExecute toleration just in case.
for effect in [TaintEffect.NO_SCHEDULE, TaintEffect.NO_EXECUTE]:
tolerations.append(
client.V1Toleration(
key=NVIDIA_GPU_NODE_TAINT, operator=Operator.EXISTS, effect=effect
key=node_taint, operator=Operator.EXISTS, effect=effect
)
)
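For a concrete picture, requesting two NVIDIA GPUs through the refactored code above produces resource requests/limits and tolerations roughly like this (illustrative values; raw strings stand in for the module's Operator/TaintEffect enums):

```python
# Illustrative only: the pod spec fragments produced for gpu_min=2 on NVIDIA.
from kubernetes import client

gpu_resource, gpu_min = "nvidia.com/gpu", 2

resources = client.V1ResourceRequirements(
    requests={gpu_resource: str(gpu_min)},
    limits={gpu_resource: str(gpu_min)},
)
tolerations = [
    client.V1Toleration(key=gpu_resource, operator="Exists", effect=effect)
    for effect in ("NoSchedule", "NoExecute")
]
```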

if (memory_min := resources_spec.memory.min) is not None:
resources_requests["memory"] = _render_memory(memory_min)
if (
@@ -331,7 +296,9 @@ def run_job(
volume_mounts=volume_mounts,
)
],
affinity=node_affinity,
affinity=client.V1Affinity(
node_affinity=node_affinity,
),
tolerations=tolerations,
volumes=volumes_,
),
@@ -550,34 +517,144 @@ def _render_memory(memory: Memory) -> str:
return f"{float(memory)}Gi"


def _get_gpus_from_node_labels(labels: dict[str, str]) -> tuple[list[Gpu], Optional[str]]:
def _get_node_gpus(node: client.V1Node) -> list[Gpu]:
node_name = get_value(node, ".metadata.name", str, required=True)
allocatable = get_value(node, ".status.allocatable", dict[str, str], required=True)
labels = get_value(node, ".metadata.labels", dict[str, str]) or {}
for gpu_resource, gpu_getter in (
(NVIDIA_GPU_RESOURCE, _get_nvidia_gpu_from_node_labels),
(AMD_GPU_RESOURCE, _get_amd_gpu_from_node_labels),
):
_gpu_count = allocatable.get(gpu_resource)
if not _gpu_count:
continue
gpu_count = int(_gpu_count)
if gpu_count < 1:
continue
gpu = gpu_getter(labels)
if gpu is None:
logger.warning(
"Node %s: GPU resource found, but failed to detect its model: %s=%d",
node_name,
gpu_resource,
gpu_count,
)
return []
return [gpu] * gpu_count
logger.debug("Node %s: no GPU resource found", node_name)
return []


def _get_nvidia_gpu_from_node_labels(labels: dict[str, str]) -> Optional[Gpu]:
# We rely on https://github.com/NVIDIA/k8s-device-plugin/tree/main/docs/gpu-feature-discovery
# to detect gpus. Note that "nvidia.com/gpu.product" is not a short gpu name like "T4" or
# "A100" but a product name like "Tesla-T4" or "A100-SXM4-40GB".
# Thus, we convert the product name to a known gpu name.
# TODO: support other GPU vendors
gpu_count = labels.get(NVIDIA_GPU_COUNT_LABEL)
gpu_product = labels.get(NVIDIA_GPU_PRODUCT_LABEL)
if gpu_count is None or gpu_product is None:
return [], None
gpu_count = int(gpu_count)
gpu_name = None
for known_gpu_name in NVIDIA_GPU_NAMES:
if known_gpu_name.lower() in gpu_product.lower().split("-"):
gpu_name = known_gpu_name
if gpu_product is None:
return None
for gpu_name in NVIDIA_GPU_NAMES:
if gpu_name.lower() in gpu_product.lower().split("-"):
break
if gpu_name is None:
return [], None
else:
return None
gpu_info = NVIDIA_GPU_NAME_TO_GPU_INFO[gpu_name]
gpu_memory = gpu_info.memory * 1024
# A100 may come in two variants
if "40GB" in gpu_product:
gpu_memory = 40 * 1024
gpus = [
Gpu(vendor=AcceleratorVendor.NVIDIA, name=gpu_name, memory_mib=gpu_memory)
for _ in range(gpu_count)
]
return gpus, gpu_product
return Gpu(vendor=AcceleratorVendor.NVIDIA, name=gpu_name, memory_mib=gpu_memory)


def _get_amd_gpu_from_node_labels(labels: dict[str, str]) -> Optional[Gpu]:
# (AMDGPUInfo.name, AMDGPUInfo.memory) pairs
gpus: set[tuple[str, int]] = set()
for label in labels:
if not label.startswith(AMD_GPU_DEVICE_ID_LABEL_PREFIX):
continue
_, _, _device_id = label.rpartition(".")
device_id = int(_device_id, 16)
gpu_info = AMD_GPU_DEVICE_ID_TO_GPU_INFO.get(device_id)
if gpu_info is None:
logger.warning("Unknown AMD GPU device id: %X", device_id)
continue
gpus.add((gpu_info.name, gpu_info.memory))
if not gpus:
return None
if len(gpus) == 1:
gpu_name, gpu_memory_gib = next(iter(gpus))
return Gpu(vendor=AcceleratorVendor.AMD, name=gpu_name, memory_mib=gpu_memory_gib * 1024)
logger.warning("Multiple AMD GPU models detected: %s, ignoring all GPUs", gpus)
return None


def _get_pod_spec_parameters_for_gpu(
api: client.CoreV1Api, gpu: Gpu
) -> tuple[str, client.V1NodeAffinity, str]:
gpu_vendor = gpu.vendor
assert gpu_vendor is not None
if gpu_vendor == AcceleratorVendor.NVIDIA:
node_affinity = _get_nvidia_gpu_node_affinity(api, gpu)
return NVIDIA_GPU_RESOURCE, node_affinity, NVIDIA_GPU_NODE_TAINT
if gpu_vendor == AcceleratorVendor.AMD:
node_affinity = _get_amd_gpu_node_affinity(gpu)
return AMD_GPU_RESOURCE, node_affinity, AMD_GPU_NODE_TAINT
raise ComputeError(f"Unsupported GPU vendor: {gpu_vendor}")


def _get_nvidia_gpu_node_affinity(api: client.CoreV1Api, gpu: Gpu) -> client.V1NodeAffinity:
matching_gpu_label_values: set[str] = set()
# We cannot generate an expected GPU label value from the Gpu model instance
# as the actual values may have additional components (socket, memory type, etc.)
# that we don't preserve in the Gpu model, e.g., "NVIDIA-H100-80GB-HBM3".
# Moreover, a single Gpu may match multiple label values.
# As a workaround, we iterate and process all node labels once again (we already
# processed them in `get_offers_by_requirements()`).
node_list = call_api_method(api.list_node, client.V1NodeList)
nodes = get_value(node_list, ".items", list[client.V1Node], required=True)
for node in nodes:
labels = get_value(node, ".metadata.labels", dict[str, str]) or {}
if _get_nvidia_gpu_from_node_labels(labels) == gpu:
matching_gpu_label_values.add(labels[NVIDIA_GPU_PRODUCT_LABEL])
if not matching_gpu_label_values:
raise ComputeError(f"NVIDIA GPU is requested but no matching GPU labels found: {gpu=}")
logger.debug("Selecting nodes by labels %s for NVIDIA %s", matching_gpu_label_values, gpu.name)
return client.V1NodeAffinity(
required_during_scheduling_ignored_during_execution=client.V1NodeSelector(
node_selector_terms=[
client.V1NodeSelectorTerm(
match_expressions=[
client.V1NodeSelectorRequirement(
key=NVIDIA_GPU_PRODUCT_LABEL,
operator=Operator.IN,
values=list(matching_gpu_label_values),
),
],
),
],
),
)


def _get_amd_gpu_node_affinity(gpu: Gpu) -> client.V1NodeAffinity:
device_ids = AMD_GPU_NAME_TO_DEVICE_IDS.get(gpu.name)
if device_ids is None:
raise ComputeError(f"AMD GPU is requested but no matching device ids found: {gpu=}")
return client.V1NodeAffinity(
required_during_scheduling_ignored_during_execution=client.V1NodeSelector(
node_selector_terms=[
client.V1NodeSelectorTerm(
match_expressions=[
client.V1NodeSelectorRequirement(
key=f"{AMD_GPU_DEVICE_ID_LABEL_PREFIX}{device_id:x}",
operator=Operator.EXISTS,
),
],
)
for device_id in device_ids
],
),
)


def _continue_setup_jump_pod(
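Worth noting about `_get_amd_gpu_node_affinity`: each device id becomes its own node selector term, and Kubernetes ORs terms together, so a node carrying any one of the model's device-id labels matches. A minimal sketch of the structure it builds for MI300X (device ids assumed per the tests below):

```python
# Illustrative: the affinity generated for an MI300X request, assuming
# gpuhunt maps MI300X to device ids 0x74A1 and 0x74B5.
from kubernetes import client

affinity = client.V1Affinity(
    node_affinity=client.V1NodeAffinity(
        required_during_scheduling_ignored_during_execution=client.V1NodeSelector(
            node_selector_terms=[
                client.V1NodeSelectorTerm(
                    match_expressions=[
                        client.V1NodeSelectorRequirement(
                            key=f"beta.amd.com/gpu.device-id.{device_id:x}",
                            operator="Exists",
                        )
                    ]
                )
                for device_id in (0x74A1, 0x74B5)
            ]
        )
    )
)
```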
65 changes: 47 additions & 18 deletions src/tests/_internal/core/backends/kubernetes/test_compute.py
@@ -1,24 +1,53 @@
from dstack._internal.core.backends.kubernetes.compute import _get_gpus_from_node_labels
import logging

import pytest
from gpuhunt import AcceleratorVendor

from dstack._internal.core.backends.kubernetes.compute import (
_get_amd_gpu_from_node_labels,
_get_nvidia_gpu_from_node_labels,
)
from dstack._internal.core.models.instances import Gpu


class TestGetGPUsFromNodeLabels:
class TestGetNvidiaGPUFromNodeLabels:
def test_returns_none_if_no_labels(self):
assert _get_nvidia_gpu_from_node_labels({}) is None

def test_returns_correct_memory_for_different_A100(self):
assert _get_nvidia_gpu_from_node_labels(
{"nvidia.com/gpu.product": "A100-SXM4-40GB"}
) == Gpu(vendor=AcceleratorVendor.NVIDIA, name="A100", memory_mib=40 * 1024)

assert _get_nvidia_gpu_from_node_labels(
{"nvidia.com/gpu.product": "A100-SXM4-80GB"}
) == Gpu(vendor=AcceleratorVendor.NVIDIA, name="A100", memory_mib=80 * 1024)


class TestGetAMDGPUFromNodeLabels:
def test_returns_no_gpus_if_no_labels(self):
assert _get_gpus_from_node_labels({}) == ([], None)
assert _get_amd_gpu_from_node_labels({}) is None

def test_returns_no_gpus_if_missing_labels(self):
assert _get_gpus_from_node_labels({"nvidia.com/gpu.count": 1}) == ([], None)
def test_returns_known_gpu(self):
assert _get_amd_gpu_from_node_labels({"beta.amd.com/gpu.device-id.74b5": "4"}) == Gpu(
vendor=AcceleratorVendor.AMD, name="MI300X", memory_mib=192 * 1024
)

def test_returns_correct_memory_for_different_A100(self):
assert _get_gpus_from_node_labels(
{
"nvidia.com/gpu.count": 1,
"nvidia.com/gpu.product": "A100-SXM4-40GB",
}
) == ([Gpu(name="A100", memory_mib=40 * 1024)], "A100-SXM4-40GB")
assert _get_gpus_from_node_labels(
{
"nvidia.com/gpu.count": 1,
"nvidia.com/gpu.product": "A100-SXM4-80GB",
}
) == ([Gpu(name="A100", memory_mib=80 * 1024)], "A100-SXM4-80GB")
def test_returns_known_gpu_if_multiple_device_ids_match_the_same_gpu(self):
# 4x AMD Instinct MI300X VF + 4x AMD Instinct MI300X
labels = {"beta.amd.com/gpu.device-id.74b5": "4", "beta.amd.com/gpu.device-id.74a1": "4"}
assert _get_amd_gpu_from_node_labels(labels) == Gpu(
vendor=AcceleratorVendor.AMD, name="MI300X", memory_mib=192 * 1024
)

def test_returns_none_if_device_id_is_unknown(self, caplog: pytest.LogCaptureFixture):
caplog.set_level(logging.WARNING)
assert _get_amd_gpu_from_node_labels({"beta.amd.com/gpu.device-id.ffff": "4"}) is None
assert "Unknown AMD GPU device id: FFFF" in caplog.text

def test_returns_none_if_multiple_gpu_models(self, caplog: pytest.LogCaptureFixture):
caplog.set_level(logging.WARNING)
# 4x AMD Instinct MI300X VF + 4x AMD Instinct MI325X
labels = {"beta.amd.com/gpu.device-id.74b5": "4", "beta.amd.com/gpu.device-id.74a5": "4"}
assert _get_amd_gpu_from_node_labels(labels) is None
assert "Multiple AMD GPU models detected" in caplog.text