Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion miles/ray/rollout/rollout_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,10 @@ async def onload_kv(self):

# -------------------------- engine management -----------------------------

def get_updatable_engines_and_lock(self):
async def get_updatable_engines_and_lock(self):
"""Return engines eligible for weight updates."""
srv = self._get_updatable_server()
await srv.wait_all_engines_alive()
engines = [e.actor_handle for e in srv.engines] if srv else []
gpu_counts = srv.engine_gpu_counts if srv else []
gpu_offsets = srv.engine_gpu_offsets if srv else []
Expand Down
7 changes: 7 additions & 0 deletions miles/ray/rollout/rollout_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,10 @@ async def onload(self, tags: list[str] | None = None):

async def check_weights(self, action: str):
return await asyncio.gather(*[g.check_weights(action=action) for g in self.server_groups])

async def wait_all_engines_alive(self):
while True:
if all(e.is_alive for g in self.server_groups for e in g.all_engines):
return
await asyncio.sleep(2)
logger.info("wait_all_engines_alive looping...")
4 changes: 4 additions & 0 deletions miles/ray/rollout/server_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ def actor_handle(self) -> ray.actor.ActorHandle:
def is_allocated(self) -> bool:
return isinstance(self._state, _StateAllocatedBase)

@property
def is_alive(self) -> bool:
return isinstance(self._state, _StateAllocatedAlive)

# TODO: unify w/ trainer `change_state`
def _change_state(
self,
Expand Down
Loading