
Commit 486131f

new strategy
1 parent d973a54 commit 486131f

5 files changed: 174 additions & 127 deletions

xinference/core/launch_strategy.py

Lines changed: 67 additions & 17 deletions

@@ -16,7 +16,7 @@
 from dataclasses import dataclass
 from typing import Dict, List, Mapping, Optional, Set, Tuple, Union
 
-from ..device_utils import initialize_gpu_memory_info, update_gpu_memory_info
+from ..device_utils import update_gpu_memory_info
 from .utils import parse_replica_model_uid
 
 logger = logging.getLogger(__name__)
@@ -34,12 +34,36 @@ class LaunchModelSpec:
     quantization: Optional[str] = None
 
 
-class IdleFirstLaunchStrategy:
+class LaunchStrategy:
+    """
+    Base class for launch strategies.
+    Concrete implementations should override allocate/release/is_idle.
+    """
+
+    def allocate(
+        self,
+        spec: LaunchModelSpec,
+        total_gpu_devices: List[int],
+        user_specified_allocated_devices: Set[int],
+        allocated_gpus: Mapping[int, Set[str]],
+    ) -> List[int]:
+        raise NotImplementedError
+
+    def release(self, model_uid: str, devices: List[int]) -> None:
+        raise NotImplementedError
+
+    def is_idle(self) -> bool:
+        raise NotImplementedError
+
+
+class IdleFirstLaunchStrategy(LaunchStrategy):
     """
     Prefer the GPU running Xinference, otherwise keep allocating onto the emptiest
     remaining GPU.
     """
 
+    _DEFAULT_BOOKED_MB = 1024  # logical reservation per replica
+
     def __init__(
         self,
         total_gpu_devices: List[int],
@@ -50,9 +74,9 @@ def __init__(
     ):
         self._allowed_devices = allowed_devices
         self._total_gpu_devices = self._filter_allowed(total_gpu_devices)
-        self._gpu_memory_info = gpu_memory_info or initialize_gpu_memory_info(
-            self._total_gpu_devices, logger=logger
-        )
+        if gpu_memory_info is None:
+            raise ValueError("gpu_memory_info must be provided for launch strategy")
+        self._gpu_memory_info = gpu_memory_info
         # Track which GPUs have been used in the first round for each model
         self._model_spread_used_gpus: Dict[str, Set[int]] = (
             model_spread_used_gpus if model_spread_used_gpus is not None else {}
@@ -61,6 +85,8 @@ def __init__(
         self._active_model_counts: Dict[str, int] = (
             active_model_counts if active_model_counts is not None else {}
         )
+        # Logical reservations (MB) per GPU for this strategy's base model
+        self._reserved_memory_mb: Dict[int, float] = {}
 
     def _filter_allowed(self, total_gpu_devices: List[int]) -> List[int]:
         if self._allowed_devices is None:
@@ -80,13 +106,15 @@ def _select_emptiest_gpu(
         for dev in candidates:
             update_gpu_memory_info(self._gpu_memory_info, dev, logger=logger)
             available = self._gpu_memory_info.get(dev, {}).get("available", 0)
+            # Deduct logical reservations to avoid stacking replicas too quickly
+            available -= self._reserved_memory_mb.get(dev, 0)
             # Penalize GPUs already planned/allocated to avoid stacking too early
             penalty = pending_gpu_counts.get(dev, 0) + len(
                 allocated_gpus.get(dev, set())
             )
             scored.append((dev, available - penalty))
 
-        scored.sort(key=lambda item: item[1], reverse=True)
+        scored.sort(key=lambda item: (-item[1], item[0]))
         return scored[0][0] if scored else None
 
     def allocate(
@@ -112,20 +140,23 @@ def allocate(
         selected: List[int] = []
 
         while len(selected) < n_gpu:
-            # If some GPUs haven't received a replica for this model yet, try them first
-            if len(used_in_spread) < len(available_total):
+            # Prefer truly idle GPUs first: those without existing allocations
+            unoccupied_gpus = [
+                dev
+                for dev in available_total
+                if dev not in user_specified_allocated_devices
+                and not allocated_gpus.get(dev)
+            ]
+            spreading_phase = bool(unoccupied_gpus) and len(used_in_spread) < len(
+                unoccupied_gpus
+            )
+            if spreading_phase:
+                # First round: try to place replicas on distinct, unoccupied GPUs
                 candidate_pool = [
-                    dev
-                    for dev in available_total
-                    if dev not in user_specified_allocated_devices
-                    and dev not in used_in_spread
+                    dev for dev in unoccupied_gpus if dev not in used_in_spread
                 ]
                 if not candidate_pool:
-                    candidate_pool = [
-                        dev
-                        for dev in available_total
-                        if dev not in user_specified_allocated_devices
-                    ]
+                    candidate_pool = [dev for dev in unoccupied_gpus]
             else:
                 candidate_pool = [
                     dev
@@ -149,6 +180,11 @@ def allocate(
         self._active_model_counts[base_model_uid] = (
             self._active_model_counts.get(base_model_uid, 0) + 1
         )
+        # Reserve logical memory for selected GPUs
+        for dev in selected:
+            self._reserved_memory_mb[dev] = (
+                self._reserved_memory_mb.get(dev, 0.0) + self._DEFAULT_BOOKED_MB
+            )
         return selected
 
     def release(self, model_uid: str, devices: List[int]) -> None:
@@ -160,8 +196,22 @@ def release(self, model_uid: str, devices: List[int]) -> None:
         if count <= 1:
             self._active_model_counts.pop(base_model_uid, None)
             self._model_spread_used_gpus.pop(base_model_uid, None)
+            for dev in devices:
+                if dev in self._reserved_memory_mb:
+                    self._reserved_memory_mb[dev] -= self._DEFAULT_BOOKED_MB
+                    if self._reserved_memory_mb[dev] <= 0:
+                        self._reserved_memory_mb.pop(dev, None)
         else:
             self._active_model_counts[base_model_uid] = count - 1
+            for dev in devices:
+                if dev in self._reserved_memory_mb:
+                    self._reserved_memory_mb[dev] -= self._DEFAULT_BOOKED_MB
+                    if self._reserved_memory_mb[dev] <= 0:
+                        self._reserved_memory_mb.pop(dev, None)
+
+    def is_idle(self) -> bool:
+        """Return True when no active models are tracked by this strategy."""
+        return not self._active_model_counts
 
 
 def create_launch_strategy(
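
To make the new allocate/release/is_idle contract and the logical-reservation scoring easier to follow, here is a minimal, self-contained sketch of the same idle-first idea. It is illustrative only: the 1024 MB booking constant mirrors _DEFAULT_BOOKED_MB above, everything else is simplified, and it does not import xinference.

from typing import Dict, List, Set

BOOKED_MB = 1024  # logical reservation per replica, matching _DEFAULT_BOOKED_MB above

def pick_emptiest_gpu(
    available_mb: Dict[int, float],
    reserved_mb: Dict[int, float],
    allocated: Dict[int, Set[str]],
    candidates: List[int],
) -> int:
    scored = []
    for dev in candidates:
        # Effective headroom = measured free memory minus logical reservations,
        # with a small penalty per model already placed on the device.
        headroom = available_mb.get(dev, 0.0) - reserved_mb.get(dev, 0.0)
        penalty = len(allocated.get(dev, set()))
        scored.append((dev, headroom - penalty))
    # Highest score wins; ties fall back to the lowest GPU index.
    scored.sort(key=lambda item: (-item[1], item[0]))
    return scored[0][0]

# GPU 1 has more free memory, but an earlier 2 * BOOKED_MB reservation tips the pick to GPU 0.
print(pick_emptiest_gpu({0: 8000.0, 1: 9000.0}, {1: 2.0 * BOOKED_MB}, {}, [0, 1]))  # -> 0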

xinference/core/supervisor.py

Lines changed: 3 additions & 0 deletions

@@ -1096,6 +1096,9 @@ async def _launch_one_model(worker_ref, _replica_model_uid, rank: int):
             model_type = model_type or "LLM"
 
             try:
+                # Ensure per-base-model launch strategy is ready on worker before concurrent launches
+                await worker_ref.ensure_launch_strategy(model_uid)
+
                 subpool_address = await worker_ref.launch_builtin_model(
                     model_uid=_replica_model_uid,
                     model_name=model_name,
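
This one-line supervisor change is about ordering: the worker's per-base-model strategy is created once before the replicas are launched concurrently, so every replica launch books against the same strategy object. A small asyncio sketch of that ordering pattern, with hypothetical stand-ins rather than the real xoscar actor interfaces:

import asyncio

class FakeWorker:
    # Hypothetical stand-in for the worker actor; only the call ordering is the point.
    def __init__(self):
        self.strategies = {}

    async def ensure_launch_strategy(self, model_uid: str):
        # Create the shared per-base-model strategy exactly once, up front.
        self.strategies.setdefault(model_uid, {"placed": []})

    async def launch_builtin_model(self, model_uid: str, replica: int):
        # Every concurrent replica launch sees the same pre-created strategy state.
        self.strategies[model_uid]["placed"].append(replica)

async def main():
    worker = FakeWorker()
    await worker.ensure_launch_strategy("my-model")  # done before the concurrent launches
    await asyncio.gather(
        *(worker.launch_builtin_model("my-model", r) for r in range(3))
    )
    print(sorted(worker.strategies["my-model"]["placed"]))  # [0, 1, 2]

asyncio.run(main())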

xinference/core/tests/test_worker.py

Lines changed: 6 additions & 5 deletions

@@ -44,7 +44,7 @@ def _select_emptiest_gpu(
             )
             scored.append((dev, available - penalty))
 
-        # Prefer higher available memory, then the lowest GPU index.
+        # Prefer higher available memory, then lowest GPU index.
         scored.sort(key=lambda item: (-item[1], item[0]))
         return scored[0][0]
 
@@ -62,12 +62,13 @@ def __init__(
             for idx in cuda_devices
         }
 
-    def _create_launch_strategy_instance(self):
+    def _gather_initial_gpu_memory_info(self):
+        return self._test_gpu_memory_info
+
+    def _create_launch_strategy_instance(self, gpu_memory_info=None):
         return DeterministicIdleFirstLaunchStrategy(
             self._total_gpu_devices,
-            gpu_memory_info=self._test_gpu_memory_info,
-            model_spread_used_gpus=self._model_spread_used_gpus,
-            active_model_counts=self._active_model_counts,
+            gpu_memory_info=gpu_memory_info or self._test_gpu_memory_info,
        )
 
     async def __post_create__(self):
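
The test override works by pinning the memory snapshot that the worker would otherwise gather from NVML, which keeps the strategy's device choices reproducible. A tiny sketch of that seam with illustrative names (not the actual test classes):

class WorkerBase:
    # Illustrative base: in production this would query NVML for a live snapshot.
    def _gather_initial_gpu_memory_info(self):
        raise NotImplementedError

class DeterministicWorker(WorkerBase):
    def _gather_initial_gpu_memory_info(self):
        # Fixed snapshot (MB) so strategy decisions are reproducible in tests.
        return {0: {"available": 16000.0}, 1: {"available": 16000.0}}

print(DeterministicWorker()._gather_initial_gpu_memory_info()[0]["available"])  # 16000.0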

xinference/core/worker.py

Lines changed: 97 additions & 50 deletions

@@ -158,6 +158,10 @@ def __init__(
         # Share launch spread/replica counts across strategy instances
         self._model_spread_used_gpus: Dict[str, Set[int]] = {}
         self._active_model_counts: Dict[str, int] = {}
+        # Cached launch strategies per base model
+        self._launch_strategies: Dict[str, Any] = {}
+        # Protect concurrent allocations/releases so bookings stay consistent
+        self._allocation_lock = threading.Lock()
         from ..constants import (
             XINFERENCE_LAUNCH_ALLOWED_GPUS,
             XINFERENCE_LAUNCH_STRATEGY,
@@ -559,7 +563,7 @@ def _collect_user_specified_devices(self) -> Set[int]:
             user_specified_allocated_devices.add(dev)
         return user_specified_allocated_devices
 
-    def _create_launch_strategy_instance(self):
+    def _gather_initial_gpu_memory_info(self) -> Optional[Dict[int, Dict[str, float]]]:
         # Try to seed strategy with current GPU memory snapshot from NVML
         initial_gpu_memory_info: Optional[Dict[int, Dict[str, float]]] = None
         try:
@@ -576,27 +580,64 @@ def _create_launch_strategy_instance(self):
             initial_gpu_memory_info = gpu_info or None
         except Exception:
             initial_gpu_memory_info = None
+        return initial_gpu_memory_info
 
+    def _create_launch_strategy_instance(
+        self, gpu_memory_info: Optional[Dict[int, Dict[str, float]]] = None
+    ):
+        if gpu_memory_info is None:
+            raise ValueError("gpu_memory_info is required to create launch strategy")
         return create_launch_strategy(
             strategy_name=self._launch_strategy_name,
             total_gpu_devices=self._total_gpu_devices,
             allowed_devices=self._launch_allowed_gpus,
-            gpu_memory_info=initial_gpu_memory_info,
-            model_spread_used_gpus=self._model_spread_used_gpus,
-            active_model_counts=self._active_model_counts,
+            gpu_memory_info=gpu_memory_info,
+        )
+
+    def _get_base_model_uid(self, model_uid: str) -> str:
+        try:
+            base_model_uid, _ = parse_replica_model_uid(model_uid)
+            return base_model_uid
+        except Exception:
+            return model_uid
+
+    def _get_or_create_launch_strategy(self, model_uid: str):
+        base_model_uid = self._get_base_model_uid(model_uid)
+        strategy = self._launch_strategies.get(base_model_uid)
+        if strategy is not None:
+            return strategy
+        strategy = self._create_launch_strategy_instance(
+            gpu_memory_info=self._gather_initial_gpu_memory_info()
         )
+        self._launch_strategies[base_model_uid] = strategy
+        return strategy
+
+    def ensure_launch_strategy(self, model_uid: str):
+        """
+        Ensure a launch strategy exists for the given base model.
+        This is intended to be triggered from supervisor before concurrent launches.
+        """
+        base_model_uid = self._get_base_model_uid(model_uid)
+        with self._allocation_lock:
+            if base_model_uid in self._launch_strategies:
+                return
+            strategy = self._create_launch_strategy_instance(
+                gpu_memory_info=self._gather_initial_gpu_memory_info()
            )
+            self._launch_strategies[base_model_uid] = strategy
 
     def allocate_devices(self, model_uid: str, n_gpu: int) -> List[int]:
         spec = LaunchModelSpec(model_uid=model_uid, n_gpu=n_gpu)
-        strategy = self._create_launch_strategy_instance()
-        devices = strategy.allocate(
-            spec=spec,
-            total_gpu_devices=self._total_gpu_devices,
-            user_specified_allocated_devices=self._collect_user_specified_devices(),
-            allocated_gpus=self._gpu_to_model_uid,
-        )
-        for dev in devices:
-            self._gpu_to_model_uid[int(dev)].add(model_uid)
+        strategy = self._get_or_create_launch_strategy(model_uid)
+        with self._allocation_lock:
+            devices = strategy.allocate(
+                spec=spec,
+                total_gpu_devices=self._total_gpu_devices,
+                user_specified_allocated_devices=self._collect_user_specified_devices(),
+                allocated_gpus=self._gpu_to_model_uid,
+            )
+            for dev in devices:
+                self._gpu_to_model_uid[int(dev)].add(model_uid)
         return sorted(devices)
 
     def allocate_devices_for_model(
@@ -616,15 +657,16 @@ def allocate_devices_for_model(
             model_format=model_format,
             quantization=quantization,
         )
-        strategy = self._create_launch_strategy_instance()
-        devices = strategy.allocate(
-            spec=spec,
-            total_gpu_devices=self._total_gpu_devices,
-            user_specified_allocated_devices=self._collect_user_specified_devices(),
-            allocated_gpus=self._gpu_to_model_uid,
-        )
-        for dev in devices:
-            self._gpu_to_model_uid[int(dev)].add(model_uid)
+        strategy = self._get_or_create_launch_strategy(model_uid)
+        with self._allocation_lock:
+            devices = strategy.allocate(
+                spec=spec,
+                total_gpu_devices=self._total_gpu_devices,
+                user_specified_allocated_devices=self._collect_user_specified_devices(),
+                allocated_gpus=self._gpu_to_model_uid,
+            )
+            for dev in devices:
+                self._gpu_to_model_uid[int(dev)].add(model_uid)
         return sorted(devices)
 
     async def allocate_devices_with_gpu_idx(
@@ -666,35 +708,40 @@ async def allocate_devices_with_gpu_idx(
         return sorted(gpu_idx)
 
     def release_devices(self, model_uid: str):
-        devices = [
-            dev for dev, uids in self._gpu_to_model_uid.items() if model_uid in uids
-        ]
-        for dev in devices:
-            if model_uid in self._gpu_to_model_uid[dev]:
-                self._gpu_to_model_uid[dev].remove(model_uid)
-            if not self._gpu_to_model_uid[dev]:
-                del self._gpu_to_model_uid[dev]
-
-        # check embedding
-        for dev in self._gpu_to_embedding_model_uids:
-            if model_uid in self._gpu_to_embedding_model_uids[dev]:
-                self._gpu_to_embedding_model_uids[dev].remove(model_uid)
-
-        # check user-specified slots
-        for dev in list(self._user_specified_gpu_to_model_uids):
-            model_infos = [
-                info
-                for info in self._user_specified_gpu_to_model_uids[dev]
-                if info[0] == model_uid
+        base_model_uid = self._get_base_model_uid(model_uid)
+        strategy = self._launch_strategies.get(base_model_uid)
+        with self._allocation_lock:
+            devices = [
+                dev for dev, uids in self._gpu_to_model_uid.items() if model_uid in uids
             ]
-            for model_info in model_infos:
-                self._user_specified_gpu_to_model_uids[dev].remove(model_info)
-            if not self._user_specified_gpu_to_model_uids[dev]:
-                del self._user_specified_gpu_to_model_uids[dev]
-
-        # Keep strategy bookkeeping in sync for spread logic
-        strategy = self._create_launch_strategy_instance()
-        strategy.release(model_uid, devices)
+            for dev in devices:
+                if model_uid in self._gpu_to_model_uid[dev]:
+                    self._gpu_to_model_uid[dev].remove(model_uid)
+                if not self._gpu_to_model_uid[dev]:
+                    del self._gpu_to_model_uid[dev]
+
+            # check embedding
+            for dev in self._gpu_to_embedding_model_uids:
+                if model_uid in self._gpu_to_embedding_model_uids[dev]:
+                    self._gpu_to_embedding_model_uids[dev].remove(model_uid)
+
+            # check user-specified slots
+            for dev in list(self._user_specified_gpu_to_model_uids):
+                model_infos = [
+                    info
+                    for info in self._user_specified_gpu_to_model_uids[dev]
+                    if info[0] == model_uid
+                ]
+                for model_info in model_infos:
+                    self._user_specified_gpu_to_model_uids[dev].remove(model_info)
+                if not self._user_specified_gpu_to_model_uids[dev]:
+                    del self._user_specified_gpu_to_model_uids[dev]
+
+            # Keep strategy bookkeeping in sync for spread logic
+            if strategy is not None:
+                strategy.release(model_uid, devices)
+                if strategy.is_idle():
+                    self._launch_strategies.pop(base_model_uid, None)
 
     async def _create_subpool(
         self,
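
Taken together, the worker now keeps one strategy per base model behind a lock and drops it once the last replica is released. A condensed standalone sketch of that cache-and-lock pattern (simplified; create_launch_strategy and replica-uid parsing are stubbed out):

import threading
from typing import Dict

class StrategyCache:
    # Simplified mirror of the worker's _launch_strategies + _allocation_lock pair.
    def __init__(self):
        self._lock = threading.Lock()
        self._strategies: Dict[str, object] = {}

    def get_or_create(self, base_model_uid: str) -> object:
        # Checked under the lock so concurrent replica launches share one strategy.
        with self._lock:
            strategy = self._strategies.get(base_model_uid)
            if strategy is None:
                strategy = object()  # stand-in for create_launch_strategy(...)
                self._strategies[base_model_uid] = strategy
            return strategy

    def release(self, base_model_uid: str, is_idle: bool) -> None:
        # Mirrors release_devices: drop the strategy once its last replica is gone.
        with self._lock:
            if is_idle:
                self._strategies.pop(base_model_uid, None)

cache = StrategyCache()
assert cache.get_or_create("my-model") is cache.get_or_create("my-model")
cache.release("my-model", is_idle=True)
assert "my-model" not in cache._strategies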
