diff --git a/.github/workflows/python.yaml b/.github/workflows/python.yaml index c8a0728e0e..a7583eaa32 100644 --- a/.github/workflows/python.yaml +++ b/.github/workflows/python.yaml @@ -73,21 +73,19 @@ jobs: strategy: fail-fast: false matrix: - os: [ "ubuntu-latest", "macos-13", "windows-latest" ] + os: [ "ubuntu-latest", "macos-latest", "windows-latest" ] python-version: [ "3.9", "3.10", "3.11", "3.12", "3.13" ] module: [ "xinference" ] exclude: - - { os: macos-13, python-version: 3.10 } - - { os: macos-13, python-version: 3.11 } - - { os: macos-13, python-version: 3.12 } - - { os: macos-13, python-version: 3.13 } - { os: windows-latest, python-version: 3.10 } - { os: windows-latest, python-version: 3.11 } - { os: windows-latest, python-version: 3.12 } + - { os: macos-latest, python-version: 3.10 } + - { os: macos-latest, python-version: 3.11 } + - { os: macos-latest, python-version: 3.12 } include: - { os: self-hosted, module: gpu, python-version: "3.11"} - { os: macos-latest, module: metal, python-version: "3.10" } - - { os: macos-latest, python-version: "3.13" } steps: - name: Check out code diff --git a/doc/source/locale/zh_CN/LC_MESSAGES/user_guide/launch.po b/doc/source/locale/zh_CN/LC_MESSAGES/user_guide/launch.po index bcb925f19c..11d056877f 100644 --- a/doc/source/locale/zh_CN/LC_MESSAGES/user_guide/launch.po +++ b/doc/source/locale/zh_CN/LC_MESSAGES/user_guide/launch.po @@ -8,7 +8,7 @@ msgid "" msgstr "" "Project-Id-Version: Xinference \n" "Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2025-08-02 23:15+0800\n" +"POT-Creation-Date: 2025-12-12 18:34+0800\n" "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" "Last-Translator: FULL NAME \n" "Language: zh_CN\n" @@ -17,7 +17,7 @@ msgstr "" "MIME-Version: 1.0\n" "Content-Type: text/plain; charset=utf-8\n" "Content-Transfer-Encoding: 8bit\n" -"Generated-By: Babel 2.14.0\n" +"Generated-By: Babel 2.17.0\n" #: ../../source/user_guide/launch.rst:5 msgid "Model Launching Instructions" @@ -46,11 +46,102 @@ msgstr "" "两张 GPU 上。Xinference 会自动进行负载均衡,确保请求均匀分配到多张卡上。" "用户看到的仍是一个模型,这大大提升了整体资源利用率。" -#: ../../source/user_guide/launch.rst:18 +#: ../../source/user_guide/launch.rst:17 +msgid "Traditional Multi-Instance Deployment:" +msgstr "旧版本多实例部署:" + +#: ../../source/user_guide/launch.rst:19 +msgid "" +"Before v1.15.0:When you have multiple GPU cards, each capable of hosting" +" one model instance, you can set the number of instances equal to the " +"number of GPUs. For example:" +msgstr "" +"在v1.15.0版本前:当您拥有多张GPU显卡时,每张显卡可承载一个模型实例,此时" +"可将实例数量设置为等于GPU数量。例如:" + +#: ../../source/user_guide/launch.rst:21 +msgid "2 GPUs, 2 instances: Each GPU runs one model instance" +msgstr "2张GPU,2个实例:每张GPU运行一个模型实例" + +#: ../../source/user_guide/launch.rst:22 +msgid "4 GPUs, 4 instances: Each GPU runs one model instance" +msgstr "4张GPU,4个实例:每张GPU运行一个模型实例" + +#: ../../source/user_guide/launch.rst:26 +msgid "Introduce a new environment variable:" +msgstr "引入一个新的环境变量:" + +#: ../../source/user_guide/launch.rst:32 +msgid "" +"Control whether to enable the single GPU multi-copy feature Default " +"value: 1" +msgstr "控制是否启用单GPU多副本功能,默认值:1" + +#: ../../source/user_guide/launch.rst:35 +msgid "New Feature: Smart Replica Deployment" +msgstr "新功能:智能副本部署" + +#: ../../source/user_guide/launch.rst:37 +msgid "Single GPU Multi-Replica" +msgstr "单GPU多副本" + +#: ../../source/user_guide/launch.rst:39 +msgid "New Support: Run multiple model replicas even with just one GPU." 
+msgstr "新增支持:即使仅有一块GPU,也能运行多个模型副本。" + +#: ../../source/user_guide/launch.rst:41 +msgid "Scenario: You have 1 GPU with sufficient VRAM" +msgstr "场景:您拥有1个GPU且显存充足" + +#: ../../source/user_guide/launch.rst:42 +msgid "Configuration: Replica Count = 3, GPU Count = 1" +msgstr "配置:副本数量=3,GPU数量=1" + +#: ../../source/user_guide/launch.rst:43 +msgid "Result: 3 model instances running on the same GPU, sharing GPU resources" +msgstr "结果:3个模型实例,在同一GPU上运行,共享GPU资源" + +#: ../../source/user_guide/launch.rst:45 +msgid "Hybrid GPU Allocation" +msgstr "混合GPU分配" + +#: ../../source/user_guide/launch.rst:47 +msgid "" +"Smart Allocation: Number of replicas may differ from GPU count; system " +"intelligently distributes" +msgstr "智能分配: 副本数可以不等于GPU数量,系统会智能分配" + +#: ../../source/user_guide/launch.rst:49 +msgid "Scenario: You have 2 GPUs and need 3 replicas" +msgstr "场景: 你有2张GPU,需要3个副本" + +#: ../../source/user_guide/launch.rst:50 +msgid "Configuration: Replicas=3, GPUs=2" +msgstr "配置: 副本数=3,GPU数量=2" + +#: ../../source/user_guide/launch.rst:51 +msgid "Result: GPU0 runs 2 instances, GPU1 runs 1 instance" +msgstr "结果: GPU0运行2个实例,GPU1运行1个实例" + +#: ../../source/user_guide/launch.rst:54 +msgid "GPU Allocation Strategy" +msgstr "GPU分配策略" + +#: ../../source/user_guide/launch.rst:56 +msgid "" +"The current policy is *Idle Priority*: The scheduler always attempts to " +"assign replicas to the least utilized GPU. Use the " +"``XINFERENCE_LAUNCH_ALLOWED_GPUS`` parameter to restrict the range of " +"available GPUs." +msgstr "" +"当前策略为 *空闲优先* :调度器始终尝试将副本分配至最空闲的GPU。" +"使用 ``XINFERENCE_LAUNCH_ALLOWED_GPUS`` 参数限制可选GPU范围。" + +#: ../../source/user_guide/launch.rst:59 msgid "Set Environment Variables" msgstr "设置环境变量" -#: ../../source/user_guide/launch.rst:22 +#: ../../source/user_guide/launch.rst:63 msgid "" "Sometimes, we want to specify environment variables for a particular " "model at runtime. Since v1.8.1, Xinference provides the capability to " @@ -60,21 +151,21 @@ msgstr "" "有时我们希望在运行时为特定模型指定环境变量。从 v1.8.1 开始,Xinference " "提供了单独配置环境变量的功能,无需在启动 Xinference 前设置。" -#: ../../source/user_guide/launch.rst:25 +#: ../../source/user_guide/launch.rst:66 msgid "For Web UI." msgstr "针对 Web UI。" -#: ../../source/user_guide/launch.rst:31 +#: ../../source/user_guide/launch.rst:72 msgid "" "When using the command line, use ``--env`` to specify an environment " "variable." msgstr "命令行使用时,使用 ``--env`` 指定环境变量。" -#: ../../source/user_guide/launch.rst:33 +#: ../../source/user_guide/launch.rst:74 msgid "Example usage:" msgstr "示例用法:" -#: ../../source/user_guide/launch.rst:39 +#: ../../source/user_guide/launch.rst:80 msgid "" "Take vLLM as an example: it has versions V1 and V0, and by default, it " "automatically determines which version to use. If you want to force the " @@ -85,13 +176,37 @@ msgstr "" "在加载模型时强制通过设置 ``VLLM_USE_V1=0`` 来使用 V0,可以指定该环境变量" "。" -#: ../../source/user_guide/launch.rst:43 +#: ../../source/user_guide/launch.rst:84 msgid "Configuring Model Virtual Environment" msgstr "配置模型虚拟空间" -#: ../../source/user_guide/launch.rst:47 +#: ../../source/user_guide/launch.rst:88 msgid "" "For this part, please refer to :ref:`toggling virtual environments and " "customizing dependencies `." -msgstr "对于这部分,请参考 :ref:`开关虚拟空间和定制依赖 `。" +msgstr "" +"对于这部分,请参考 :ref:`开关虚拟空间和定制依赖 `。" + +#~ msgid "" +#~ "The current strategy is *idle-first " +#~ "with a first round spread*: the " +#~ "scheduler first tries to place one " +#~ "replica on each available GPU (always" +#~ " picking the emptiest unused GPU). 
" +#~ "Once every GPU has at least one" +#~ " replica, remaining replicas keep stacking" +#~ " onto the GPU that is currently " +#~ "the emptiest (single-GPU multi-replica" +#~ " is allowed). Use " +#~ "``XINFERENCE_LAUNCH_ALLOWED_GPUS`` to limit which" +#~ " GPUs can be chosen." +#~ msgstr "" +#~ "当前策略为 *空闲优先且首轮分散* :" +#~ "调度器首先尝试将每个副本分配至可用GPU" +#~ "(始终选择最空闲的未用GPU)。当每块" +#~ "GPU至少承载一个副本后,剩余副本将持续" +#~ "堆叠至当前最空闲的GPU(允许单GPU承载" +#~ "多个副本)。使用 ``XINFERENCE_LAUNCH_" +#~ "ALLOWED_GPUS`` 参数限制可选GPU范围。" diff --git a/doc/source/user_guide/launch.rst b/doc/source/user_guide/launch.rst index aac59bc321..de2738750a 100644 --- a/doc/source/user_guide/launch.rst +++ b/doc/source/user_guide/launch.rst @@ -14,6 +14,47 @@ you can set the replica count to 2. This way, two identical instances of the mod Xinference automatically load-balances requests to ensure even distribution across multiple GPUs. Meanwhile, users see it as a single model, which greatly improves overall resource utilization. +Traditional Multi-Instance Deployment: + +Before v1.15.0:When you have multiple GPU cards, each capable of hosting one model instance, you can set the number of instances equal to the number of GPUs. For example: + +- 2 GPUs, 2 instances: Each GPU runs one model instance +- 4 GPUs, 4 instances: Each GPU runs one model instance + +.. versionadded:: v1.15.0 + +Introduce a new environment variable: + +.. code-block:: bash + + XINFERENCE_ENABLE_SINGLE_GPU_MULTI_REPLICA + +Control whether to enable the single GPU multi-copy feature +Default value: 1 + +New Feature: Smart Replica Deployment + +1. Single GPU Multi-Replica + +New Support: Run multiple model replicas even with just one GPU. + +- Scenario: You have 1 GPU with sufficient VRAM +- Configuration: Replica Count = 3, GPU Count = 1 +- Result: 3 model instances running on the same GPU, sharing GPU resources + +2. Hybrid GPU Allocation + +Smart Allocation: Number of replicas may differ from GPU count; system intelligently distributes + +- Scenario: You have 2 GPUs and need 3 replicas +- Configuration: Replicas=3, GPUs=2 +- Result: GPU0 runs 2 instances, GPU1 runs 1 instance + +GPU Allocation Strategy +======================= + +The current policy is *Idle Priority*: The scheduler always attempts to assign replicas to the least utilized GPU. Use the ``XINFERENCE_LAUNCH_ALLOWED_GPUS`` parameter to restrict the range of available GPUs. + Set Environment Variables ========================= diff --git a/xinference/api/restful_api.py b/xinference/api/restful_api.py index 05508b15d9..bcfa6ad8b4 100644 --- a/xinference/api/restful_api.py +++ b/xinference/api/restful_api.py @@ -1269,11 +1269,29 @@ async def launch_model( if isinstance(gpu_idx, int): gpu_idx = [gpu_idx] - if gpu_idx: - if len(gpu_idx) % replica: + + # Check if single-GPU multi-replica is enabled + from ..constants import XINFERENCE_ENABLE_SINGLE_GPU_MULTI_REPLICA + + if XINFERENCE_ENABLE_SINGLE_GPU_MULTI_REPLICA: + # Enhanced replica validation with single-GPU multi-replica support + if gpu_idx and len(gpu_idx) > 1 and len(gpu_idx) % replica: + # Only keep the restriction when multiple GPUs are specified + raise HTTPException( + status_code=400, + detail="Invalid input. 
When using multiple GPUs, the count must be a multiple of replica.", + ) + # Allow single-GPU multi-replica deployment when enabled + if gpu_idx and len(gpu_idx) == 1 and replica > 1: + logger.info( + f"Single-GPU multi-replica deployment enabled: {replica} replicas on 1 GPU" + ) + else: + # Traditional behavior - strict multiple requirement + if gpu_idx and len(gpu_idx) % replica: raise HTTPException( status_code=400, - detail="Invalid input. Allocated gpu must be a multiple of replica.", + detail="Invalid input. Allocated gpu must be a multiple of replica. Set XINFERENCE_ENABLE_SINGLE_GPU_MULTI_REPLICA=1 to enable single-GPU multi-replica deployment.", ) if peft_model_config is not None: diff --git a/xinference/constants.py b/xinference/constants.py index 3b80eca472..cc712febab 100644 --- a/xinference/constants.py +++ b/xinference/constants.py @@ -34,6 +34,11 @@ XINFERENCE_ENV_SSE_PING_ATTEMPTS_SECONDS = "XINFERENCE_SSE_PING_ATTEMPTS_SECONDS" XINFERENCE_ENV_MAX_TOKENS = "XINFERENCE_MAX_TOKENS" XINFERENCE_ENV_ALLOWED_IPS = "XINFERENCE_ALLOWED_IPS" +XINFERENCE_ENV_LAUNCH_STRATEGY = "XINFERENCE_LAUNCH_STRATEGY" +XINFERENCE_ENV_LAUNCH_ALLOWED_GPUS = "XINFERENCE_LAUNCH_ALLOWED_GPUS" +XINFERENCE_ENV_ENABLE_SINGLE_GPU_MULTI_REPLICA = ( + "XINFERENCE_ENABLE_SINGLE_GPU_MULTI_REPLICA" +) XINFERENCE_ENV_BATCH_SIZE = "XINFERENCE_BATCH_SIZE" XINFERENCE_ENV_BATCH_INTERVAL = "XINFERENCE_BATCH_INTERVAL" @@ -114,5 +119,15 @@ def get_xinference_home() -> str: else None ) XINFERENCE_ALLOWED_IPS = os.getenv(XINFERENCE_ENV_ALLOWED_IPS) +XINFERENCE_ENABLE_SINGLE_GPU_MULTI_REPLICA = bool( + int(os.getenv(XINFERENCE_ENV_ENABLE_SINGLE_GPU_MULTI_REPLICA, "1")) +) # Enable by default +XINFERENCE_LAUNCH_STRATEGY = os.getenv(XINFERENCE_ENV_LAUNCH_STRATEGY, "idle_first") +_allowed_gpu_str = os.getenv(XINFERENCE_ENV_LAUNCH_ALLOWED_GPUS, "") +XINFERENCE_LAUNCH_ALLOWED_GPUS = ( + {int(x) for x in _allowed_gpu_str.split(",") if x.strip().isdigit()} + if _allowed_gpu_str + else None +) XINFERENCE_BATCH_SIZE = int(os.getenv(XINFERENCE_ENV_BATCH_SIZE, "32")) XINFERENCE_BATCH_INTERVAL = float(os.getenv(XINFERENCE_ENV_BATCH_INTERVAL, "0.003")) diff --git a/xinference/core/launch_strategy.py b/xinference/core/launch_strategy.py new file mode 100644 index 0000000000..c2e7fa057c --- /dev/null +++ b/xinference/core/launch_strategy.py @@ -0,0 +1,225 @@ +# Copyright 2022-2025 XProbe Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from dataclasses import dataclass +from typing import Dict, List, Mapping, Optional, Set, Tuple, Union + +from .utils import parse_replica_model_uid + +logger = logging.getLogger(__name__) + + +@dataclass +class LaunchModelSpec: + """Specification for model launch""" + + model_uid: str + n_gpu: int + model_name: Optional[str] = None + model_size: Optional[Union[int, str]] = None + model_format: Optional[str] = None + quantization: Optional[str] = None + + +class LaunchStrategy: + """ + Base class for launch strategies. + Concrete implementations should override allocate/release/is_idle. 
+ """ + + def allocate( + self, + spec: LaunchModelSpec, + total_gpu_devices: List[int], + user_specified_allocated_devices: Set[int], + allocated_gpus: Mapping[int, Set[str]], + ) -> List[int]: + raise NotImplementedError + + def release(self, model_uid: str, devices: List[int]) -> None: + raise NotImplementedError + + def is_idle(self) -> bool: + raise NotImplementedError + + +class IdleFirstLaunchStrategy(LaunchStrategy): + """Always place replicas onto the currently emptiest GPU.""" + + _DEFAULT_BOOKED_MB = 1024 # logical reservation per replica + + def __init__( + self, + total_gpu_devices: List[int], + allowed_devices: Optional[Set[int]] = None, + gpu_memory_info: Optional[Dict[int, Dict[str, Union[int, float]]]] = None, + model_spread_used_gpus: Optional[Dict[str, Set[int]]] = None, + active_model_counts: Optional[Dict[str, int]] = None, + ): + self._allowed_devices = allowed_devices + self._total_gpu_devices = self._filter_allowed(total_gpu_devices) + if gpu_memory_info is None: + raise ValueError("gpu_memory_info must be provided for launch strategy") + self._gpu_memory_info = gpu_memory_info + # Track which GPUs have been used in the first round for each model + self._model_spread_used_gpus: Dict[str, Set[int]] = ( + model_spread_used_gpus if model_spread_used_gpus is not None else {} + ) + # Track active replicas per base model to clean spread history + self._active_model_counts: Dict[str, int] = ( + active_model_counts if active_model_counts is not None else {} + ) + # Logical reservations (MB) per GPU for this strategy's base model + self._reserved_memory_mb: Dict[int, float] = {} + + def _filter_allowed(self, total_gpu_devices: List[int]) -> List[int]: + if self._allowed_devices is None: + return total_gpu_devices + return [dev for dev in total_gpu_devices if dev in self._allowed_devices] + + def _select_emptiest_gpu( + self, + candidates: List[int], + pending_gpu_counts: Dict[int, int], + allocated_gpus: Mapping[int, Set[str]], + ) -> Optional[int]: + if not candidates: + return None + + scored: List[Tuple[int, Union[int, float]]] = [] + for dev in candidates: + available = self._gpu_memory_info.get(dev, {}).get("available", 0) + # Deduct logical reservations to avoid stacking replicas too quickly + available -= self._reserved_memory_mb.get(dev, 0) + # Penalize GPUs already planned/allocated to avoid stacking too early + penalty = pending_gpu_counts.get(dev, 0) + len( + allocated_gpus.get(dev, set()) + ) + score = available - penalty + scored.append((dev, score)) + + # If scores are infinite (heartbeat missing => infinite available), + # fall back to smallest reserved/penalty; tie-break by GPU index. 
+ if any(val[1] == float("inf") for val in scored): + scored.sort( + key=lambda item: ( + self._reserved_memory_mb.get(item[0], 0.0) + + pending_gpu_counts.get(item[0], 0) + + len(allocated_gpus.get(item[0], set())), + item[0], + ) + ) + else: + scored.sort(key=lambda item: (-item[1], item[0])) + return scored[0][0] if scored else None + + def allocate( + self, + spec: LaunchModelSpec, + total_gpu_devices: List[int], + user_specified_allocated_devices: Set[int], + allocated_gpus: Mapping[int, Set[str]], + ) -> List[int]: + available_total = self._filter_allowed(total_gpu_devices) + if not available_total: + raise RuntimeError("No available slot found for the model") + + model_uid = spec.model_uid + try: + base_model_uid, _ = parse_replica_model_uid(model_uid) + except Exception: + base_model_uid = model_uid + n_gpu = spec.n_gpu + + pending_gpu_counts: Dict[int, int] = {} + selected: List[int] = [] + + while len(selected) < n_gpu: + # Always pick the emptiest eligible GPU (excludes user-specified ones) + candidate_pool = [ + dev + for dev in available_total + if dev not in user_specified_allocated_devices + ] + emptiest_gpu = self._select_emptiest_gpu( + candidate_pool, pending_gpu_counts, allocated_gpus + ) + if emptiest_gpu is None: + raise RuntimeError("No available slot found for the model") + + selected.append(emptiest_gpu) + pending_gpu_counts[emptiest_gpu] = ( + pending_gpu_counts.get(emptiest_gpu, 0) + 1 + ) + + # Persist spread history for compatibility with release bookkeeping + self._model_spread_used_gpus.setdefault(base_model_uid, set()).update(selected) + self._active_model_counts[base_model_uid] = ( + self._active_model_counts.get(base_model_uid, 0) + 1 + ) + # Reserve logical memory for selected GPUs + for dev in selected: + self._reserved_memory_mb[dev] = ( + self._reserved_memory_mb.get(dev, 0.0) + self._DEFAULT_BOOKED_MB + ) + return selected + + def release(self, model_uid: str, devices: List[int]) -> None: + try: + base_model_uid, _ = parse_replica_model_uid(model_uid) + except Exception: + base_model_uid = model_uid + count = self._active_model_counts.get(base_model_uid, 0) + if count <= 1: + self._active_model_counts.pop(base_model_uid, None) + self._model_spread_used_gpus.pop(base_model_uid, None) + for dev in devices: + if dev in self._reserved_memory_mb: + self._reserved_memory_mb[dev] -= self._DEFAULT_BOOKED_MB + if self._reserved_memory_mb[dev] <= 0: + self._reserved_memory_mb.pop(dev, None) + else: + self._active_model_counts[base_model_uid] = count - 1 + for dev in devices: + if dev in self._reserved_memory_mb: + self._reserved_memory_mb[dev] -= self._DEFAULT_BOOKED_MB + if self._reserved_memory_mb[dev] <= 0: + self._reserved_memory_mb.pop(dev, None) + + def is_idle(self) -> bool: + """Return True when no active models are tracked by this strategy.""" + return not self._active_model_counts + + +def create_launch_strategy( + strategy_name: str, + total_gpu_devices: List[int], + allowed_devices: Optional[Set[int]] = None, + gpu_memory_info: Optional[Dict[int, Dict[str, Union[int, float]]]] = None, + model_spread_used_gpus: Optional[Dict[str, Set[int]]] = None, + active_model_counts: Optional[Dict[str, int]] = None, +) -> IdleFirstLaunchStrategy: + normalized = strategy_name.lower() + if normalized != "idle_first": + logger.warning( + f"Unknown launch strategy '{strategy_name}', falling back to idle_first" + ) + return IdleFirstLaunchStrategy( + total_gpu_devices, + allowed_devices=allowed_devices, + gpu_memory_info=gpu_memory_info, + 
model_spread_used_gpus=model_spread_used_gpus, + active_model_counts=active_model_counts, + ) diff --git a/xinference/core/supervisor.py b/xinference/core/supervisor.py index d7e3e0a0ba..f24d3d14ec 100644 --- a/xinference/core/supervisor.py +++ b/xinference/core/supervisor.py @@ -30,6 +30,7 @@ List, Literal, Optional, + Set, Tuple, Type, Union, @@ -48,6 +49,7 @@ from ..core.status_guard import InstanceInfo, LaunchStatus from ..model.utils import get_engine_params_by_name from ..types import PeftModelConfig +from .launch_strategy import create_launch_strategy from .metrics import record_metrics from .resource import GPUStatus, ResourceStatus from .utils import ( @@ -899,6 +901,44 @@ def _get_worker_refs_by_ip(self, ip: str) -> List[xo.ActorRefType["WorkerActor"] ) return refs + def _build_gpu_memory_info( + self, worker_ref + ) -> Optional[Dict[int, Dict[str, float]]]: + """Use latest heartbeat data for GPU memory snapshot.""" + worker_status = self._worker_status.get(worker_ref.address) + if worker_status is None: + return None + gpu_info: Dict[int, Dict[str, float]] = {} + for dev, status in worker_status.status.items(): + if isinstance(status, GPUStatus) and str(dev).startswith("gpu-"): + try: + idx = int(str(dev).split("-", 1)[1]) + except Exception: + continue + gpu_info[idx] = { + "total": status.mem_total // (1024**2), + "used": status.mem_used // (1024**2), + "available": status.mem_free // (1024**2), + } + return gpu_info or None + + async def _install_strategy_on_worker(self, model_uid: str, worker_ref) -> None: + ctx = await worker_ref.get_launch_strategy_context() + gpu_memory_info = self._build_gpu_memory_info(worker_ref) + if gpu_memory_info is None: + # Heartbeat disabled or missing: assume all visible GPUs are available with "infinite" mem + gpu_memory_info = { + dev: {"total": float("inf"), "used": 0.0, "available": float("inf")} + for dev in ctx["total_gpu_devices"] + } + strategy = create_launch_strategy( + strategy_name=ctx["launch_strategy_name"], + total_gpu_devices=ctx["total_gpu_devices"], + allowed_devices=ctx["allowed_devices"], + gpu_memory_info=gpu_memory_info, + ) + await worker_ref.install_launch_strategy(model_uid, strategy) + @log_async(logger=logger) async def launch_builtin_model( self, @@ -1137,6 +1177,7 @@ async def _launch_model(): try: # Pre-fetch worker loads for balanced scheduling worker_candidates = [] + prepared_workers: Set[str] = set() if target_worker_refs: workers = target_worker_refs @@ -1185,6 +1226,11 @@ async def _launch_model(): _idx ].append(worker_ref) + # Prepare launch strategy per worker once before launching replicas + if worker_ref.address not in prepared_workers: + await self._install_strategy_on_worker(model_uid, worker_ref) + prepared_workers.add(worker_ref.address) + if enable_xavier and _idx == 0: """ Start the rank 0 model actor on the worker that holds the rank 1 replica, @@ -1356,6 +1402,7 @@ async def _launch_model(): "n_worker cannot be larger than the number of available workers." 
) try: + prepared_workers: Set[str] = set() for _idx, rep_model_uid in enumerate( iter_replica_model_uid(model_uid, replica) ): @@ -1372,6 +1419,11 @@ async def _launch_model(): ].replica_to_worker_refs[_idx].append(worker_ref) nonlocal model_type model_type = model_type or "LLM" + if worker_ref.address not in prepared_workers: + await self._install_strategy_on_worker( + model_uid, worker_ref + ) + prepared_workers.add(worker_ref.address) if i_worker > 1: assert ( driver_info is not None diff --git a/xinference/core/tests/test_worker.py b/xinference/core/tests/test_worker.py index b67f1011e7..2e44570436 100644 --- a/xinference/core/tests/test_worker.py +++ b/xinference/core/tests/test_worker.py @@ -18,10 +18,51 @@ import xoscar as xo from xoscar import MainActorPoolType, create_actor_pool, get_pool_config +from ..launch_strategy import IdleFirstLaunchStrategy from ..utils import merge_virtual_env_packages from ..worker import WorkerActor +class DeterministicIdleFirstLaunchStrategy(IdleFirstLaunchStrategy): + def _select_emptiest_gpu( + self, + candidates, + pending_gpu_counts, + allocated_gpus, + ): + """ + Deterministic tie-breaking for tests so we do not rely on real GPU state. + """ + if not candidates: + return None + + scored = [] + for dev in candidates: + available = self._gpu_memory_info.get(dev, {}).get("available", 0) + penalty = pending_gpu_counts.get(dev, 0) + len( + allocated_gpus.get(dev, set()) + ) + scored.append((dev, available - penalty)) + + # Prefer higher available memory, then lowest GPU index. + scored.sort(key=lambda item: (-item[1], item[0])) + return scored[0][0] + + +async def install_strategy_for_worker(worker, model_uid: str): + """ + Simulate supervisor-side strategy preparation for tests. + """ + ctx = await worker.get_launch_strategy_context() + gpu_memory_info = await worker.get_test_gpu_memory_info() + strategy = DeterministicIdleFirstLaunchStrategy( + ctx["total_gpu_devices"], + allowed_devices=ctx["allowed_devices"], + gpu_memory_info=gpu_memory_info, + ) + await worker.install_launch_strategy(model_uid, strategy) + + class MockWorkerActor(WorkerActor): def __init__( self, @@ -30,6 +71,19 @@ def __init__( cuda_devices: List[int], ): super().__init__(supervisor_address, main_pool, cuda_devices) + self._test_gpu_memory_info = { + idx: {"total": 24000.0, "used": 0.0, "available": 24000.0} + for idx in cuda_devices + } + + def _gather_initial_gpu_memory_info(self): + return self._test_gpu_memory_info + + def _create_launch_strategy_instance(self, gpu_memory_info=None): + return DeterministicIdleFirstLaunchStrategy( + self._total_gpu_devices, + gpu_memory_info=gpu_memory_info or self._test_gpu_memory_info, + ) async def __post_create__(self): pass @@ -46,6 +100,9 @@ def get_gpu_to_embedding_model_uids(self): def get_user_specified_gpu_to_model_uids(self): return self._user_specified_gpu_to_model_uids + def get_test_gpu_memory_info(self): + return self._test_gpu_memory_info + async def is_model_vllm_backend(self, model_uid): if model_uid.startswith("normal_"): return False @@ -104,17 +161,21 @@ async def test_allocate_cuda_devices(setup_pool): cuda_devices=[i for i in range(8)], ) + await install_strategy_for_worker(worker, "mock_model_1") devices = await worker.allocate_devices(model_uid="mock_model_1", n_gpu=1) assert devices == [0] - devices = await worker.allocate_devices(model_uid="mock_model_2", n_gpu=4) - assert devices == [1, 2, 3, 4] + await install_strategy_for_worker(worker, "mock_model_2") + devices = await 
worker.allocate_devices(model_uid="mock_model_2", n_gpu=2) + assert devices == [1, 2] - devices = await worker.allocate_devices(model_uid="mock_model_3", n_gpu=3) - assert devices == [5, 6, 7] + await install_strategy_for_worker(worker, "mock_model_3") + devices = await worker.allocate_devices(model_uid="mock_model_3", n_gpu=1) + assert devices == [3] - with pytest.raises(RuntimeError): - await worker.allocate_devices(model_uid="mock_model_4", n_gpu=5) + await install_strategy_for_worker(worker, "mock_model_4") + devices = await worker.allocate_devices(model_uid="mock_model_4", n_gpu=1) + assert devices == [4] @pytest.mark.asyncio @@ -131,18 +192,23 @@ async def test_terminate_model_flag(setup_pool): cuda_devices=[i for i in range(8)], ) + await install_strategy_for_worker(worker, "model_model_1") await worker.launch_builtin_model( "model_model_1", "mock_model_name", None, None, None, n_gpu=1 ) + await install_strategy_for_worker(worker, "model_model_2") await worker.launch_builtin_model( "model_model_2", "mock_model_name", None, None, None, n_gpu=4 ) + await install_strategy_for_worker(worker, "model_model_3") devices = await worker.allocate_devices(model_uid="model_model_3", n_gpu=3) assert devices == [5, 6, 7] await worker.release_devices(model_uid="model_model_3") + # ensure strategy is ready before relaunch + await install_strategy_for_worker(worker, "model_model_3") await worker.launch_builtin_model( "model_model_3", "mock_model_name", None, None, None, n_gpu=3 ) @@ -158,13 +224,17 @@ async def test_terminate_model_flag(setup_pool): assert len(pool_config["pools"]) == 3 # A main pool and 2 sub pools. gpu_to_model_id = await worker.get_gpu_to_model_uid() - for dev in devices: - assert "model_model_3" == gpu_to_model_id[dev] + model3_devices = [ + dev for dev, uids in gpu_to_model_id.items() if "model_model_3" in uids + ] + assert model3_devices + for dev in model3_devices: + assert "model_model_3" in gpu_to_model_id[dev] await worker.terminate_model("model_model_3") gpu_to_model_id = await worker.get_gpu_to_model_uid() - for dev in devices: - assert dev not in gpu_to_model_id + for dev in model3_devices: + assert "model_model_3" not in gpu_to_model_id.get(dev, set()) def test_merge_virtual_env_packages_override_and_append(): @@ -200,12 +270,14 @@ async def test_launch_embedding_model(setup_pool): ) # test embedding device candidates 1 + await install_strategy_for_worker(worker, "model_model_1") await worker.launch_builtin_model( "model_model_1", "mock_model_name", None, None, None, n_gpu=3 ) embedding_info = await worker.get_gpu_to_embedding_model_uids() assert len(embedding_info) == 0 + await install_strategy_for_worker(worker, "model_model_2") await worker.launch_builtin_model( "model_model_2", "mock_model_name", None, None, None, "embedding", n_gpu=1 ) @@ -217,6 +289,7 @@ async def test_launch_embedding_model(setup_pool): # test terminate LLM model, then launch embedding model await worker.terminate_model("model_model_1") + await install_strategy_for_worker(worker, "model_model_3") await worker.launch_builtin_model( "model_model_3", "mock_model_name", None, None, None, "embedding", n_gpu=1 ) @@ -229,20 +302,24 @@ async def test_launch_embedding_model(setup_pool): await worker.terminate_model("model_model_3") embedding_info = await worker.get_gpu_to_embedding_model_uids() assert len(embedding_info[0]) == 0 - assert len(embedding_info[3]) == 0 + assert len(embedding_info[1]) == 0 # test embedding device candidates 2 + await install_strategy_for_worker(worker, "model_model_1") await 
worker.launch_builtin_model( "model_model_1", "mock_model_name", None, None, None, n_gpu=2 ) + await install_strategy_for_worker(worker, "model_model_2") await worker.launch_builtin_model( "model_model_2", "mock_model_name", None, None, None, "embedding", n_gpu=1 ) + await install_strategy_for_worker(worker, "model_model_3") await worker.launch_builtin_model( "model_model_3", "mock_model_name", None, None, None, "embedding", n_gpu=1 ) + embedding_info = await worker.get_gpu_to_embedding_model_uids() assert 2 in embedding_info assert 3 in embedding_info assert len(embedding_info[2]) == 1 @@ -250,29 +327,34 @@ async def test_launch_embedding_model(setup_pool): assert "model_model_2" in embedding_info[2] assert "model_model_3" in embedding_info[3] + await install_strategy_for_worker(worker, "model_model_4") await worker.launch_builtin_model( "model_model_4", "mock_model_name", None, None, None, "embedding", n_gpu=1 ) - assert len(embedding_info[2]) == 2 + embedding_info = await worker.get_gpu_to_embedding_model_uids() + assert len(embedding_info[0]) == 1 + assert len(embedding_info[2]) == 1 assert len(embedding_info[3]) == 1 assert "model_model_2" in embedding_info[2] - assert "model_model_4" in embedding_info[2] assert "model_model_3" in embedding_info[3] + assert "model_model_4" in embedding_info[0] for i in range(1, 5): await worker.terminate_model(f"model_model_{i}") + embedding_info = await worker.get_gpu_to_embedding_model_uids() assert len(embedding_info[2]) == 0 assert len(embedding_info[3]) == 0 # test no slots for i in range(1, 5): + await install_strategy_for_worker(worker, f"model_model_{i}") await worker.launch_builtin_model( f"model_model_{i}", "mock_model_name", None, None, None, n_gpu=1 ) - with pytest.raises(RuntimeError): - await worker.launch_builtin_model( - "model_model_5", "mock_model_name", None, None, None, "embedding", n_gpu=1 - ) + await install_strategy_for_worker(worker, "model_model_5") + await worker.launch_builtin_model( + "model_model_5", "mock_model_name", None, None, None, "embedding", n_gpu=1 + ) # launch CPU would work await worker.launch_builtin_model( "model_model_5", "mock_model_name", None, None, None, "embedding", n_gpu=None @@ -297,19 +379,21 @@ async def test_launch_model_with_gpu_idx(setup_pool): assert (await xo.actor_ref(addr, WorkerActor.default_uid())).uid == b"worker" # test normal model + await install_strategy_for_worker(worker, "normal_model_model_1") await worker.launch_builtin_model( "normal_model_model_1", "mock_model_name", None, None, None, "LLM", n_gpu=1 ) llm_info = await worker.get_gpu_to_model_uid() - assert len(llm_info) == 1 assert 0 in llm_info + assert "normal_model_model_1" in llm_info[0] + await install_strategy_for_worker(worker, "model_model_2") await worker.launch_builtin_model( "model_model_2", "mock_model_name", None, None, None, "LLM", gpu_idx=[0] ) llm_info = await worker.get_gpu_to_model_uid() - assert len(llm_info) == 1 assert 0 in llm_info + assert "model_model_2" in llm_info[0] user_specified_info = await worker.get_user_specified_gpu_to_model_uids() assert len(user_specified_info) == 1 @@ -319,50 +403,63 @@ async def test_launch_model_with_gpu_idx(setup_pool): assert list(user_specified_info[0])[0][1] == "LLM" # test vllm model + await install_strategy_for_worker(worker, "vllm_model_model_3") await worker.launch_builtin_model( "vllm_model_model_3", "mock_model_name", None, None, None, "LLM", n_gpu=1 ) llm_info = await worker.get_gpu_to_model_uid() - assert len(llm_info) == 2 - assert 0 in llm_info - assert 1 in 
llm_info + vllm_gpu = next( + dev for dev, uids in llm_info.items() if "vllm_model_model_3" in uids + ) + assert vllm_gpu != 0 with pytest.raises(RuntimeError): await worker.launch_builtin_model( - "model_model_4", "mock_model_name", None, None, None, "LLM", gpu_idx=[1] + "model_model_4", + "mock_model_name", + None, + None, + None, + "LLM", + gpu_idx=[vllm_gpu], ) + target_gpu = next(dev for dev in [1, 2, 3] if dev != vllm_gpu) + await install_strategy_for_worker(worker, "model_model_4") await worker.launch_builtin_model( - "model_model_4", "mock_model_name", None, None, None, "LLM", gpu_idx=[2] + "model_model_4", + "mock_model_name", + None, + None, + None, + "LLM", + gpu_idx=[target_gpu], ) llm_info = await worker.get_gpu_to_model_uid() - assert len(llm_info) == 2 - assert 0 in llm_info - assert 1 in llm_info + assert target_gpu in llm_info + assert "model_model_4" in llm_info[target_gpu] user_specified_info = await worker.get_user_specified_gpu_to_model_uids() assert len(user_specified_info) == 2 assert 0 in user_specified_info - assert 2 in user_specified_info - assert len(user_specified_info[2]) == 1 - assert list(user_specified_info[2])[0][0] == "model_model_4" - assert list(user_specified_info[2])[0][1] == "LLM" + assert target_gpu in user_specified_info + assert len(user_specified_info[target_gpu]) == 1 + assert list(user_specified_info[target_gpu])[0][0] == "model_model_4" + assert list(user_specified_info[target_gpu])[0][1] == "LLM" # then launch a LLM without gpu_idx + await install_strategy_for_worker(worker, "normal_model_model_5") await worker.launch_builtin_model( "normal_model_model_5", "mock_model_name", None, None, None, "LLM", n_gpu=1 ) llm_info = await worker.get_gpu_to_model_uid() - assert len(llm_info) == 3 assert 0 in llm_info - assert 1 in llm_info - assert 3 in llm_info # launch without gpu_idx again, error - with pytest.raises(RuntimeError): - await worker.launch_builtin_model( - "normal_model_model_6", "mock_model_name", None, None, None, "LLM", n_gpu=1 - ) + await install_strategy_for_worker(worker, "normal_model_model_6") + await worker.launch_builtin_model( + "normal_model_model_6", "mock_model_name", None, None, None, "LLM", n_gpu=1 + ) # test terminate and cleanup await worker.terminate_model("normal_model_model_1") @@ -370,6 +467,7 @@ async def test_launch_model_with_gpu_idx(setup_pool): await worker.terminate_model("vllm_model_model_3") await worker.terminate_model("model_model_4") await worker.terminate_model("normal_model_model_5") + await worker.terminate_model("normal_model_model_6") llm_info = await worker.get_gpu_to_model_uid() assert len(llm_info) == 0 @@ -379,6 +477,7 @@ async def test_launch_model_with_gpu_idx(setup_pool): assert len(model_infos) == 0 # next, test with embedding models + await install_strategy_for_worker(worker, "embedding_1") await worker.launch_builtin_model( "embedding_1", "mock_model_name", None, None, None, "embedding", n_gpu=1 ) @@ -386,6 +485,7 @@ async def test_launch_model_with_gpu_idx(setup_pool): assert len(embedding_info) == 1 assert 0 in embedding_info + await install_strategy_for_worker(worker, "vllm_mock_model_2") await worker.launch_builtin_model( "vllm_mock_model_2", "mock_model_name", None, None, None, "LLM", gpu_idx=[0] ) @@ -399,48 +499,50 @@ async def test_launch_model_with_gpu_idx(setup_pool): assert list(user_specified_info[0])[0][1] == "LLM" # never choose gpu 0 again - with pytest.raises(RuntimeError): - await worker.launch_builtin_model( - "normal_mock_model_3", "mock_model_name", None, None, None, 
"LLM", n_gpu=4 - ) + await install_strategy_for_worker(worker, "normal_mock_model_3") + devices = await worker.allocate_devices(model_uid="normal_mock_model_3", n_gpu=4) + assert all(dev != 0 for dev in devices) # should be on gpu 1 + await install_strategy_for_worker(worker, "embedding_3") await worker.launch_builtin_model( "embedding_3", "mock_model_name", None, None, None, "embedding", n_gpu=1 ) # should be on gpu 0 - await worker.launch_builtin_model( - "rerank_4", "mock_model_name", None, None, None, "rerank", gpu_idx=[0] - ) + await install_strategy_for_worker(worker, "rerank_4") + with pytest.raises(RuntimeError): + await worker.launch_builtin_model( + "rerank_4", "mock_model_name", None, None, None, "rerank", gpu_idx=[0] + ) # should be on gpu 2 + await install_strategy_for_worker(worker, "embedding_5") await worker.launch_builtin_model( "embedding_5", "mock_model_name", None, None, None, "embedding", n_gpu=1 ) # should be on gpu 3 + await install_strategy_for_worker(worker, "rerank_6") await worker.launch_builtin_model( "rerank_6", "mock_model_name", None, None, None, "rerank", n_gpu=1 ) # should be on gpu 1, due to there are the fewest models on it + await install_strategy_for_worker(worker, "rerank_7") await worker.launch_builtin_model( "rerank_7", "mock_model_name", None, None, None, "rerank", n_gpu=1 ) embedding_info = await worker.get_gpu_to_embedding_model_uids() user_specified_info = await worker.get_user_specified_gpu_to_model_uids() - assert "rerank_7" in embedding_info[1] + rerank7_gpu = next( + dev for dev, uids in embedding_info.items() if "rerank_7" in uids + ) + assert rerank7_gpu != 0 assert len(embedding_info[0]) == 1 - assert len(user_specified_info[0]) == 2 - assert len(embedding_info[1]) == 2 - assert len(user_specified_info[1]) == 0 - assert len(embedding_info[2]) == 1 - assert len(user_specified_info[2]) == 0 - assert len(embedding_info[3]) == 1 - assert len(user_specified_info[3]) == 0 + assert len(user_specified_info[0]) == 1 + assert len(user_specified_info[rerank7_gpu]) == 0 # cleanup await worker.terminate_model("embedding_1") await worker.terminate_model("vllm_mock_model_2") await worker.terminate_model("embedding_3") - await worker.terminate_model("rerank_4") await worker.terminate_model("embedding_5") await worker.terminate_model("rerank_6") await worker.terminate_model("rerank_7") diff --git a/xinference/core/utils.py b/xinference/core/utils.py index 74da6f163b..914206d60f 100644 --- a/xinference/core/utils.py +++ b/xinference/core/utils.py @@ -294,12 +294,33 @@ def get_key(package: str) -> str: def assign_replica_gpu( _replica_model_uid: str, replica: int, gpu_idx: Optional[Union[int, List[int]]] ) -> Optional[List[int]]: + """ + Enhanced GPU assignment for replica models. + Supports single-GPU multi-replica deployment by intelligently allocating GPUs. 
+ """ model_uid, rep_id = parse_replica_model_uid(_replica_model_uid) rep_id, replica = int(rep_id), int(replica) + if isinstance(gpu_idx, int): gpu_idx = [gpu_idx] + if isinstance(gpu_idx, list) and gpu_idx: - return gpu_idx[rep_id::replica] + # When we have enough GPUs for round-robin allocation + if len(gpu_idx) >= replica: + return gpu_idx[rep_id::replica] + else: + # Support single-GPU multi-replica deployment + # All replicas will share the same GPU (or GPUs if more than 1 but less than replica count) + # This allows multiple replicas to run on the same GPU using memory-aware scheduling + if len(gpu_idx) == 1: + # Single GPU case - all replicas use the same GPU + return gpu_idx + else: + # Multiple GPUs but fewer than replicas - distribute as evenly as possible + # This enables better resource utilization + assigned_gpu = gpu_idx[rep_id % len(gpu_idx)] + return [assigned_gpu] + return gpu_idx diff --git a/xinference/core/worker.py b/xinference/core/worker.py index f87be367b5..f58402f2af 100644 --- a/xinference/core/worker.py +++ b/xinference/core/worker.py @@ -62,6 +62,7 @@ from ..utils import get_pip_config_args, get_real_path from .cache_tracker import CacheTrackerActor from .event import Event, EventCollectorActor, EventType +from .launch_strategy import LaunchModelSpec, LaunchStrategy from .metrics import launch_metrics_export_server, record_metrics from .resource import gather_node_info from .status_guard import StatusGuardActor @@ -145,7 +146,7 @@ def __init__( self._model_uid_to_model: Dict[str, xo.ActorRefType["ModelActor"]] = {} self._model_uid_to_model_spec: Dict[str, Dict[str, Any]] = {} self._model_uid_to_model_status: Dict[str, ModelStatus] = {} - self._gpu_to_model_uid: Dict[int, str] = {} + self._gpu_to_model_uid: Dict[int, Set[str]] = defaultdict(set) self._gpu_to_embedding_model_uids: Dict[int, Set[str]] = defaultdict(set) # Dict structure: gpu_index: {(replica_model_uid, model_type)} self._user_specified_gpu_to_model_uids: Dict[int, Set[Tuple[str, str]]] = ( @@ -154,6 +155,20 @@ def __init__( self._model_uid_to_addr: Dict[str, str] = {} self._model_uid_to_recover_count: Dict[str, Optional[int]] = {} self._model_uid_to_launch_args: Dict[str, Dict] = {} + # Share launch spread/replica counts across strategy instances + self._model_spread_used_gpus: Dict[str, Set[int]] = {} + self._active_model_counts: Dict[str, int] = {} + # Cached launch strategies per base model (installed by supervisor) + self._launch_strategies: Dict[str, Any] = {} + # Protect concurrent allocations/releases so bookings stay consistent + self._allocation_lock = threading.Lock() + from ..constants import ( + XINFERENCE_LAUNCH_ALLOWED_GPUS, + XINFERENCE_LAUNCH_STRATEGY, + ) + + self._launch_strategy_name = XINFERENCE_LAUNCH_STRATEGY + self._launch_allowed_gpus = XINFERENCE_LAUNCH_ALLOWED_GPUS if XINFERENCE_DISABLE_METRICS: logger.info( @@ -493,10 +508,12 @@ async def allocate_devices_for_embedding(self, model_uid: str) -> int: else: # need to judge that whether to have vllm model on this device has_vllm_model = False if _dev in self._gpu_to_model_uid: - existing_model_uid = self._gpu_to_model_uid[_dev] - has_vllm_model = await self.is_model_vllm_backend( - existing_model_uid - ) + for existing_model_uid in self._gpu_to_model_uid[_dev]: + has_vllm_model = await self.is_model_vllm_backend( + existing_model_uid + ) + if has_vllm_model: + break if ( not has_vllm_model and _dev in self._user_specified_gpu_to_model_uids @@ -521,7 +538,7 @@ async def allocate_devices_for_embedding(self, model_uid: str) -> 
int: if _dev in self._gpu_to_embedding_model_uids: existing_cnt += len(self._gpu_to_embedding_model_uids[_dev]) if _dev in self._gpu_to_model_uid: - existing_cnt += 1 + existing_cnt += len(self._gpu_to_model_uid[_dev]) if _dev in self._user_specified_gpu_to_model_uids: existing_cnt += len(self._user_specified_gpu_to_model_uids[_dev]) if min_cnt == -1 or existing_cnt < min_cnt: @@ -530,7 +547,8 @@ async def allocate_devices_for_embedding(self, model_uid: str) -> int: self._gpu_to_embedding_model_uids[device].add(model_uid) return device - def allocate_devices(self, model_uid: str, n_gpu: int) -> List[int]: + def _collect_user_specified_devices(self) -> Set[int]: + """收集用户指定且非 embedding/rerank 的占用卡""" user_specified_allocated_devices: Set[int] = set() for dev, model_infos in self._user_specified_gpu_to_model_uids.items(): allocated_non_embedding_rerank_models = False @@ -543,21 +561,87 @@ def allocate_devices(self, model_uid: str, n_gpu: int) -> List[int]: break if allocated_non_embedding_rerank_models: user_specified_allocated_devices.add(dev) - allocated_devices = set(self._gpu_to_model_uid.keys()).union( - user_specified_allocated_devices - ) - if n_gpu > len(self._total_gpu_devices) - len(allocated_devices): - raise RuntimeError("No available slot found for the model") - - devices: List[int] = [ - dev - for dev in self._total_gpu_devices - if dev not in self._gpu_to_model_uid - and dev not in user_specified_allocated_devices - ][:n_gpu] - for dev in devices: - self._gpu_to_model_uid[int(dev)] = model_uid + return user_specified_allocated_devices + def _get_base_model_uid(self, model_uid: str) -> str: + try: + base_model_uid, _ = parse_replica_model_uid(model_uid) + return base_model_uid + except Exception: + return model_uid + + def _get_launch_strategy(self, model_uid: str): + base_model_uid = self._get_base_model_uid(model_uid) + strategy = self._launch_strategies.get(base_model_uid) + if strategy is None: + raise RuntimeError( + f"Launch strategy for base model {base_model_uid} has not been installed" + ) + return strategy + + @log_async(logger=logger, level=logging.DEBUG) + async def get_launch_strategy_context(self) -> Dict[str, Any]: + """Provide supervisor with static launch strategy settings.""" + return { + "total_gpu_devices": self._total_gpu_devices, + "allowed_devices": self._launch_allowed_gpus, + "launch_strategy_name": self._launch_strategy_name, + } + + @log_async(logger=logger, level=logging.DEBUG) + async def install_launch_strategy( + self, model_uid: str, strategy: LaunchStrategy + ) -> None: + """Install supervisor-prepared launch strategy for a base model.""" + base_model_uid = self._get_base_model_uid(model_uid) + with self._allocation_lock: + if base_model_uid in self._launch_strategies: + return + if strategy is None: + raise ValueError("strategy is required to install launch strategy") + self._launch_strategies[base_model_uid] = strategy + + def allocate_devices(self, model_uid: str, n_gpu: int) -> List[int]: + spec = LaunchModelSpec(model_uid=model_uid, n_gpu=n_gpu) + strategy = self._get_launch_strategy(model_uid) + with self._allocation_lock: + devices = strategy.allocate( + spec=spec, + total_gpu_devices=self._total_gpu_devices, + user_specified_allocated_devices=self._collect_user_specified_devices(), + allocated_gpus=self._gpu_to_model_uid, + ) + for dev in devices: + self._gpu_to_model_uid[int(dev)].add(model_uid) + return sorted(devices) + + def allocate_devices_for_model( + self, + model_uid: str, + model_name: str, + model_size: Union[int, str], + 
model_format: Optional[str], + quantization: Optional[str], + n_gpu: int = 1, + ) -> List[int]: + spec = LaunchModelSpec( + model_uid=model_uid, + n_gpu=n_gpu, + model_name=model_name, + model_size=model_size, + model_format=model_format, + quantization=quantization, + ) + strategy = self._get_launch_strategy(model_uid) + with self._allocation_lock: + devices = strategy.allocate( + spec=spec, + total_gpu_devices=self._total_gpu_devices, + user_specified_allocated_devices=self._collect_user_specified_devices(), + allocated_gpus=self._gpu_to_model_uid, + ) + for dev in devices: + self._gpu_to_model_uid[int(dev)].add(model_uid) return sorted(devices) async def allocate_devices_with_gpu_idx( @@ -576,14 +660,14 @@ async def allocate_devices_with_gpu_idx( for idx in gpu_idx: existing_model_uids = [] if idx in self._gpu_to_model_uid: - rep_uid = self._gpu_to_model_uid[idx] - is_vllm_model = await self.is_model_vllm_backend(rep_uid) - if is_vllm_model: - raise RuntimeError( - f"GPU index {idx} has been occupied with a vLLM model: {rep_uid}, " - f"therefore cannot allocate GPU memory for a new model." - ) - existing_model_uids.append(rep_uid) + for rep_uid in self._gpu_to_model_uid[idx]: + is_vllm_model = await self.is_model_vllm_backend(rep_uid) + if is_vllm_model: + raise RuntimeError( + f"GPU index {idx} has been occupied with a vLLM model: {rep_uid}, " + f"therefore cannot allocate GPU memory for a new model." + ) + existing_model_uids.append(rep_uid) if idx in self._gpu_to_embedding_model_uids: existing_model_uids.extend(self._gpu_to_embedding_model_uids[idx]) @@ -595,32 +679,44 @@ async def allocate_devices_with_gpu_idx( for idx in gpu_idx: self._user_specified_gpu_to_model_uids[idx].add((model_uid, model_type)) + self._gpu_to_model_uid[idx].add(model_uid) return sorted(gpu_idx) def release_devices(self, model_uid: str): - devices = [ - dev - for dev in self._gpu_to_model_uid - if self._gpu_to_model_uid[dev] == model_uid - ] - for dev in devices: - del self._gpu_to_model_uid[dev] - - # check embedding - for dev in self._gpu_to_embedding_model_uids: - if model_uid in self._gpu_to_embedding_model_uids[dev]: - self._gpu_to_embedding_model_uids[dev].remove(model_uid) - - # check user-specified slots - for dev in self._user_specified_gpu_to_model_uids: - model_infos = list( - filter( - lambda x: x[0] == model_uid, - self._user_specified_gpu_to_model_uids[dev], - ) - ) - for model_info in model_infos: - self._user_specified_gpu_to_model_uids[dev].remove(model_info) + base_model_uid = self._get_base_model_uid(model_uid) + strategy = self._launch_strategies.get(base_model_uid) + with self._allocation_lock: + devices = [ + dev for dev, uids in self._gpu_to_model_uid.items() if model_uid in uids + ] + for dev in devices: + if model_uid in self._gpu_to_model_uid[dev]: + self._gpu_to_model_uid[dev].remove(model_uid) + if not self._gpu_to_model_uid[dev]: + del self._gpu_to_model_uid[dev] + + # check embedding + for dev in self._gpu_to_embedding_model_uids: + if model_uid in self._gpu_to_embedding_model_uids[dev]: + self._gpu_to_embedding_model_uids[dev].remove(model_uid) + + # check user-specified slots + for dev in list(self._user_specified_gpu_to_model_uids): + model_infos = [ + info + for info in self._user_specified_gpu_to_model_uids[dev] + if info[0] == model_uid + ] + for model_info in model_infos: + self._user_specified_gpu_to_model_uids[dev].remove(model_info) + if not self._user_specified_gpu_to_model_uids[dev]: + del self._user_specified_gpu_to_model_uids[dev] + + # Keep strategy bookkeeping in 
sync for spread logic
+        if strategy is not None:
+            strategy.release(model_uid, devices)
+            if strategy.is_idle():
+                self._launch_strategies.pop(base_model_uid, None)
 
     async def _create_subpool(
         self,
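
For reviewers who want to poke at the new scheduler outside the worker, here is a minimal standalone sketch of how `IdleFirstLaunchStrategy` (installed on workers by the supervisor via `install_launch_strategy`) picks devices. The GPU memory numbers, the `demo-*` model uids, and the hand-rolled `allocated` dict are made up for illustration; in the real path the supervisor builds `gpu_memory_info` from worker heartbeats, the worker maintains `_gpu_to_model_uid`, and `allowed_devices` is derived from `XINFERENCE_LAUNCH_ALLOWED_GPUS`. It assumes this patch is applied and `xinference` is importable.

```python
# Minimal sketch, not part of the diff: exercising IdleFirstLaunchStrategy directly.
from xinference.core.launch_strategy import LaunchModelSpec, create_launch_strategy

strategy = create_launch_strategy(
    "idle_first",
    total_gpu_devices=[0, 1],
    allowed_devices={0, 1},  # normally derived from XINFERENCE_LAUNCH_ALLOWED_GPUS
    gpu_memory_info={       # assumed numbers (MB); normally taken from heartbeats
        0: {"total": 24000, "used": 0, "available": 24000},
        1: {"total": 24000, "used": 0, "available": 24000},
    },
)

allocated = {}  # dev -> set of model uids, mirrors WorkerActor._gpu_to_model_uid


def launch(uid: str) -> list:
    devices = strategy.allocate(
        spec=LaunchModelSpec(model_uid=uid, n_gpu=1),
        total_gpu_devices=[0, 1],
        user_specified_allocated_devices=set(),
        allocated_gpus=allocated,
    )
    for dev in devices:
        allocated.setdefault(dev, set()).add(uid)
    return devices


# Three 1-GPU replicas on two equally empty GPUs: the 1024 MB logical
# reservation plus the per-GPU penalty spreads them 2/1, matching the
# "Replicas=3, GPUs=2" example in the updated docs.
print(launch("demo-0"))  # [0]
print(launch("demo-1"))  # [1]
print(launch("demo-2"))  # [0]
```

With these inputs the spread is deterministic: ties between equally empty GPUs are broken by the lower GPU index, and each placement books a logical reservation that steers the next replica away from an already loaded card.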
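The new settings in `constants.py` are plain environment variables read at import time, so their parsing can be sanity-checked with the short snippet below. The values are hypothetical (and `XINFERENCE_ENABLE_SINGLE_GPU_MULTI_REPLICA` already defaults to `1`); the variables must be set before `xinference.constants` is imported.

```python
# Hypothetical values; set them before importing xinference so constants.py sees them.
import os

os.environ["XINFERENCE_ENABLE_SINGLE_GPU_MULTI_REPLICA"] = "1"  # 1 is also the default
os.environ["XINFERENCE_LAUNCH_STRATEGY"] = "idle_first"         # currently the only strategy
os.environ["XINFERENCE_LAUNCH_ALLOWED_GPUS"] = "0,1"            # comma-separated GPU indices

from xinference import constants  # noqa: E402  (must come after the env setup above)

print(constants.XINFERENCE_ENABLE_SINGLE_GPU_MULTI_REPLICA)  # True
print(constants.XINFERENCE_LAUNCH_STRATEGY)                  # idle_first
print(constants.XINFERENCE_LAUNCH_ALLOWED_GPUS)              # {0, 1}
```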