
Commit 48ad42c

committed
new strategy
1 parent 486131f commit 48ad42c

5 files changed: +149 additions, -118 deletions

xinference/core/launch_strategy.py

Lines changed: 21 additions & 32 deletions
@@ -16,7 +16,6 @@
 from dataclasses import dataclass
 from typing import Dict, List, Mapping, Optional, Set, Tuple, Union

-from ..device_utils import update_gpu_memory_info
 from .utils import parse_replica_model_uid

 logger = logging.getLogger(__name__)
@@ -57,10 +56,7 @@ def is_idle(self) -> bool:


 class IdleFirstLaunchStrategy(LaunchStrategy):
-    """
-    Prefer the GPU running Xinference, otherwise keep allocating onto the emptiest
-    remaining GPU.
-    """
+    """Always place replicas onto the currently emptiest GPU."""

     _DEFAULT_BOOKED_MB = 1024  # logical reservation per replica

@@ -104,17 +100,29 @@ def _select_emptiest_gpu(

         scored: List[Tuple[int, Union[int, float]]] = []
         for dev in candidates:
-            update_gpu_memory_info(self._gpu_memory_info, dev, logger=logger)
             available = self._gpu_memory_info.get(dev, {}).get("available", 0)
             # Deduct logical reservations to avoid stacking replicas too quickly
             available -= self._reserved_memory_mb.get(dev, 0)
             # Penalize GPUs already planned/allocated to avoid stacking too early
             penalty = pending_gpu_counts.get(dev, 0) + len(
                 allocated_gpus.get(dev, set())
             )
-            scored.append((dev, available - penalty))
-
-        scored.sort(key=lambda item: (-item[1], item[0]))
+            score = available - penalty
+            scored.append((dev, score))
+
+        # If scores are infinite (heartbeat missing => infinite available),
+        # fall back to smallest reserved/penalty; tie-break by GPU index.
+        if any(val[1] == float("inf") for val in scored):
+            scored.sort(
+                key=lambda item: (
+                    self._reserved_memory_mb.get(item[0], 0.0)
+                    + pending_gpu_counts.get(item[0], 0)
+                    + len(allocated_gpus.get(item[0], set())),
+                    item[0],
+                )
+            )
+        else:
+            scored.sort(key=lambda item: (-item[1], item[0]))
         return scored[0][0] if scored else None

     def allocate(
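For context on the selection change above: when every candidate reports a finite available figure, the list is sorted by score (available minus reservations minus penalty) in descending order, ties broken by GPU index; when any score is `float("inf")` (the supervisor's fallback when heartbeat data is missing), raw scores are not comparable, so the sort falls back to the smallest reservation-plus-penalty. A minimal, self-contained sketch of that two-mode pick; the `pick_gpu` function and the plain dicts are illustrative stand-ins, not code from the repository:

```python
from typing import Dict, List, Optional, Set, Tuple


def pick_gpu(
    candidates: List[int],
    available_mb: Dict[int, float],   # per-GPU available memory in MiB; may be float("inf")
    reserved_mb: Dict[int, float],    # logical reservations already booked per GPU
    pending: Dict[int, int],          # replicas planned on each GPU in this round
    allocated: Dict[int, Set[str]],   # replicas already running per GPU
) -> Optional[int]:
    scored: List[Tuple[int, float]] = []
    for dev in candidates:
        penalty = pending.get(dev, 0) + len(allocated.get(dev, set()))
        score = available_mb.get(dev, 0.0) - reserved_mb.get(dev, 0.0) - penalty
        scored.append((dev, score))
    if not scored:
        return None
    if any(score == float("inf") for _, score in scored):
        # No usable memory numbers: prefer the GPU with the fewest
        # reservations/planned/active replicas, tie-broken by index.
        scored.sort(
            key=lambda item: (
                reserved_mb.get(item[0], 0.0)
                + pending.get(item[0], 0)
                + len(allocated.get(item[0], set())),
                item[0],
            )
        )
    else:
        scored.sort(key=lambda item: (-item[1], item[0]))
    return scored[0][0]


# With heartbeat data: GPU 1 has more free memory, so it wins.
print(pick_gpu([0, 1], {0: 8000.0, 1: 16000.0}, {}, {}, {}))  # 1
# Without heartbeat data (infinite availability): GPU 0 already carries a reservation, so GPU 1 wins.
print(pick_gpu([0, 1], {0: float("inf"), 1: float("inf")}, {0: 1024.0}, {}, {}))  # 1
```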
@@ -133,36 +141,18 @@ def allocate(
             base_model_uid, _ = parse_replica_model_uid(model_uid)
         except Exception:
             base_model_uid = model_uid
-        used_in_spread = self._model_spread_used_gpus.setdefault(base_model_uid, set())
         n_gpu = spec.n_gpu

         pending_gpu_counts: Dict[int, int] = {}
         selected: List[int] = []

         while len(selected) < n_gpu:
-            # Prefer truly idle GPUs first: those without existing allocations
-            unoccupied_gpus = [
+            # Always pick the emptiest eligible GPU (excludes user-specified ones)
+            candidate_pool = [
                 dev
                 for dev in available_total
                 if dev not in user_specified_allocated_devices
-                and not allocated_gpus.get(dev)
             ]
-            spreading_phase = bool(unoccupied_gpus) and len(used_in_spread) < len(
-                unoccupied_gpus
-            )
-            if spreading_phase:
-                # First round: try to place replicas on distinct, unoccupied GPUs
-                candidate_pool = [
-                    dev for dev in unoccupied_gpus if dev not in used_in_spread
-                ]
-                if not candidate_pool:
-                    candidate_pool = [dev for dev in unoccupied_gpus]
-            else:
-                candidate_pool = [
-                    dev
-                    for dev in available_total
-                    if dev not in user_specified_allocated_devices
-                ]
             emptiest_gpu = self._select_emptiest_gpu(
                 candidate_pool, pending_gpu_counts, allocated_gpus
             )
@@ -173,10 +163,9 @@ def allocate(
             pending_gpu_counts[emptiest_gpu] = (
                 pending_gpu_counts.get(emptiest_gpu, 0) + 1
             )
-            used_in_spread.add(emptiest_gpu)

-        # Persist spread history for this base model
-        self._model_spread_used_gpus[base_model_uid] = used_in_spread
+        # Persist spread history for compatibility with release bookkeeping
+        self._model_spread_used_gpus.setdefault(base_model_uid, set()).update(selected)
         self._active_model_counts[base_model_uid] = (
             self._active_model_counts.get(base_model_uid, 0) + 1
         )
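Although the explicit "spreading phase" is gone, replicas still tend to fan out: each GPU chosen in the current allocation round increments `pending_gpu_counts`, which lowers that GPU's score on the next pick. A hedged, self-contained toy run of that effect with made-up memory figures (the real strategy also subtracts logical reservations and counts already-running replicas):

```python
from typing import Dict, List

# Three GPUs with equal free memory (MiB); nothing reserved or running yet.
available: Dict[int, float] = {0: 20000.0, 1: 20000.0, 2: 20000.0}
pending: Dict[int, int] = {}

selected: List[int] = []
for replica in range(4):
    # Score = available minus the count of replicas already planned on that GPU.
    scores = {dev: available[dev] - pending.get(dev, 0) for dev in available}
    dev = min(scores, key=lambda d: (-scores[d], d))  # highest score, lowest index on ties
    pending[dev] = pending.get(dev, 0) + 1
    selected.append(dev)

print(selected)  # [0, 1, 2, 0]: the pending penalty spreads replicas before stacking begins
```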

xinference/core/supervisor.py

Lines changed: 52 additions & 3 deletions
@@ -30,6 +30,7 @@
     List,
     Literal,
     Optional,
+    Set,
     Tuple,
     Type,
     Union,
@@ -48,6 +49,7 @@
 from ..core.status_guard import InstanceInfo, LaunchStatus
 from ..model.utils import get_engine_params_by_name
 from ..types import PeftModelConfig
+from .launch_strategy import create_launch_strategy
 from .metrics import record_metrics
 from .resource import GPUStatus, ResourceStatus
 from .utils import (
@@ -899,6 +901,44 @@ def _get_worker_refs_by_ip(self, ip: str) -> List[xo.ActorRefType["WorkerActor"]
         )
         return refs

+    def _build_gpu_memory_info(
+        self, worker_ref
+    ) -> Optional[Dict[int, Dict[str, float]]]:
+        """Use latest heartbeat data for GPU memory snapshot."""
+        worker_status = self._worker_status.get(worker_ref.address)
+        if worker_status is None:
+            return None
+        gpu_info: Dict[int, Dict[str, float]] = {}
+        for dev, status in worker_status.status.items():
+            if isinstance(status, GPUStatus) and str(dev).startswith("gpu-"):
+                try:
+                    idx = int(str(dev).split("-", 1)[1])
+                except Exception:
+                    continue
+                gpu_info[idx] = {
+                    "total": status.mem_total // (1024**2),
+                    "used": status.mem_used // (1024**2),
+                    "available": status.mem_free // (1024**2),
+                }
+        return gpu_info or None
+
+    async def _install_strategy_on_worker(self, model_uid: str, worker_ref) -> None:
+        ctx = await worker_ref.get_launch_strategy_context()
+        gpu_memory_info = self._build_gpu_memory_info(worker_ref)
+        if gpu_memory_info is None:
+            # Heartbeat disabled or missing: assume all visible GPUs are available with "infinite" mem
+            gpu_memory_info = {
+                dev: {"total": float("inf"), "used": 0.0, "available": float("inf")}
+                for dev in ctx["total_gpu_devices"]
+            }
+        strategy = create_launch_strategy(
+            strategy_name=ctx["launch_strategy_name"],
+            total_gpu_devices=ctx["total_gpu_devices"],
+            allowed_devices=ctx["allowed_devices"],
+            gpu_memory_info=gpu_memory_info,
+        )
+        await worker_ref.install_launch_strategy(model_uid, strategy)
+
     @log_async(logger=logger)
     async def launch_builtin_model(
         self,
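The heartbeat snapshot built above converts byte counts from `GPUStatus` into MiB, keyed by the numeric index parsed from identifiers like "gpu-0", and the supervisor substitutes an "infinitely empty" snapshot when no heartbeat data exists. A minimal sketch of the same conversion on plain values; `FakeGPUStatus` and the sample numbers are illustrative, not the actual `GPUStatus` type:

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class FakeGPUStatus:
    mem_total: int  # bytes
    mem_used: int   # bytes
    mem_free: int   # bytes


def build_gpu_memory_info(
    status_map: Dict[str, FakeGPUStatus]
) -> Optional[Dict[int, Dict[str, float]]]:
    info: Dict[int, Dict[str, float]] = {}
    for dev, status in status_map.items():
        if not dev.startswith("gpu-"):
            continue
        try:
            idx = int(dev.split("-", 1)[1])
        except ValueError:
            continue
        info[idx] = {
            "total": status.mem_total // (1024**2),      # bytes -> MiB
            "used": status.mem_used // (1024**2),
            "available": status.mem_free // (1024**2),
        }
    return info or None


snapshot = build_gpu_memory_info(
    {"gpu-0": FakeGPUStatus(24 * 1024**3, 4 * 1024**3, 20 * 1024**3)}
)
print(snapshot)  # {0: {'total': 24576, 'used': 4096, 'available': 20480}}

# Shape of the no-heartbeat fallback: every visible GPU looks infinitely empty,
# which is what triggers the strategy's reservation-based tie-break sort.
no_heartbeat = {
    dev: {"total": float("inf"), "used": 0.0, "available": float("inf")} for dev in (0, 1)
}
```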
@@ -1096,9 +1136,6 @@ async def _launch_one_model(worker_ref, _replica_model_uid, rank: int):
            model_type = model_type or "LLM"

            try:
-                # Ensure per-base-model launch strategy is ready on worker before concurrent launches
-                await worker_ref.ensure_launch_strategy(model_uid)
-
                subpool_address = await worker_ref.launch_builtin_model(
                    model_uid=_replica_model_uid,
                    model_name=model_name,
@@ -1140,6 +1177,7 @@ async def _launch_model():
            try:
                # Pre-fetch worker loads for balanced scheduling
                worker_candidates = []
+                prepared_workers: Set[str] = set()

                if target_worker_refs:
                    workers = target_worker_refs
@@ -1188,6 +1226,11 @@ async def _launch_model():
                            _idx
                        ].append(worker_ref)

+                        # Prepare launch strategy per worker once before launching replicas
+                        if worker_ref.address not in prepared_workers:
+                            await self._install_strategy_on_worker(model_uid, worker_ref)
+                            prepared_workers.add(worker_ref.address)
+
                        if enable_xavier and _idx == 0:
                            """
                            Start the rank 0 model actor on the worker that holds the rank 1 replica,
@@ -1359,6 +1402,7 @@ async def _launch_model():
                    "n_worker cannot be larger than the number of available workers."
                )
            try:
+                prepared_workers: Set[str] = set()
                for _idx, rep_model_uid in enumerate(
                    iter_replica_model_uid(model_uid, replica)
                ):
@@ -1375,6 +1419,11 @@ async def _launch_model():
                ].replica_to_worker_refs[_idx].append(worker_ref)
                nonlocal model_type
                model_type = model_type or "LLM"
+                if worker_ref.address not in prepared_workers:
+                    await self._install_strategy_on_worker(
+                        model_uid, worker_ref
+                    )
+                    prepared_workers.add(worker_ref.address)
                if i_worker > 1:
                    assert (
                        driver_info is not None
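Both launch paths guard strategy installation with a `prepared_workers` set, so each worker receives the strategy at most once even when several replicas land on the same worker. A tiny illustration of the idiom; the `install` callable and the addresses are stand-ins, not the actual actor call:

```python
from typing import Set


def install(address: str) -> None:
    print(f"installing launch strategy on {address}")


prepared_workers: Set[str] = set()
for address in ["10.0.0.1:9999", "10.0.0.1:9999", "10.0.0.2:9999"]:
    if address not in prepared_workers:
        install(address)                # runs once per distinct worker address
        prepared_workers.add(address)
# prints two lines: one for 10.0.0.1:9999, one for 10.0.0.2:9999
```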
