Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions miles/ray/rollout/server_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def start_engines(self, port_cursors: PortCursors) -> tuple[list, list[int]]:
]
return init_handles, new_engine_indices

def stop_engines(self, rollout_engine_id: int):
async def stop_engines(self, rollout_engine_id: int):
logger.info(f"Killing server group {rollout_engine_id}...")
for i in range(
rollout_engine_id * self.nodes_per_engine,
Expand All @@ -166,7 +166,7 @@ def stop_engines(self, rollout_engine_id: int):
if engine.is_allocated:
logger.info(f"Shutting down and killing engine at index {i}")
try:
ray.get(engine.actor_handle.shutdown.remote())
await engine.actor_handle.shutdown.remote()
ray.kill(engine.actor_handle)
logger.info(f"Successfully killed engine at index {i}")
except Exception as e:
Expand Down
3 changes: 2 additions & 1 deletion miles/utils/health_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import ray

from miles.ray.rollout.server_group import ServerGroup
from miles.utils.async_utils import run

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -154,6 +155,6 @@ def _check_engine_health(self, rollout_engine_id, engine) -> None:
logger.error(
f"Health check failed for rollout engine {rollout_engine_id} (ray timeout or error). Killing actor. Exception: {e}"
)
self._server_group.stop_engines(rollout_engine_id=rollout_engine_id)
run(self._server_group.stop_engines(rollout_engine_id=rollout_engine_id))
else:
logger.debug(f"Health check passed for rollout engine {rollout_engine_id}")
Loading