Support stopping engines externally and map cell_id to concrete engines #944
fzyzcjy wants to merge 14 commits into rollout_ft/26
A cell is nodes_per_engine consecutive engines from ServerGroup.all_engines. cell_id is a flat index across all servers and groups.
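The definition above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual `server_cell.py`: `FakeServerGroup`, `FakeRolloutServer`, and `build_cell_map` are hypothetical stand-ins, and the sketch assumes `nodes_per_engine` is readable from each group and that cells are numbered in the iteration order of the `servers` dict.

```python
from dataclasses import dataclass, field


@dataclass
class CellIndexer:
    srv_key: str
    group_index: int
    engine_indices: list[int]


# Hypothetical stand-ins for the real classes; only the fields used here.
@dataclass
class FakeServerGroup:
    all_engines: list[str]
    nodes_per_engine: int


@dataclass
class FakeRolloutServer:
    server_groups: list[FakeServerGroup] = field(default_factory=list)


def build_cell_map(servers: dict[str, FakeRolloutServer]) -> list[CellIndexer]:
    """Return a list whose position is the flat cell_id (assumed ordering)."""
    cells: list[CellIndexer] = []
    for srv_key, server in servers.items():
        for group_index, group in enumerate(server.server_groups):
            step = group.nodes_per_engine
            # Chunk the group's engines into runs of `step` consecutive indices.
            for start in range(0, len(group.all_engines), step):
                stop = min(start + step, len(group.all_engines))
                cells.append(CellIndexer(srv_key, group_index, list(range(start, stop))))
    return cells


servers = {
    "actor": FakeRolloutServer([FakeServerGroup(["e0", "e1", "e2", "e3"], nodes_per_engine=2)]),
    "ref": FakeRolloutServer([FakeServerGroup(["e0", "e1"], nodes_per_engine=1)]),
}
cell_map = build_cell_map(servers)
```

With these made-up inputs, the "actor" group contributes two cells of two engines each and the "ref" group contributes two single-engine cells, so `cell_id` runs from 0 to 3 across both servers.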
Code Review
This pull request introduces functionality to stop specific cells within the rollout manager by mapping cell IDs to their corresponding server and engine indices. A new utility module, server_cell.py, was added to handle this mapping logic, and a stop_cell method was implemented in RolloutManager. The review feedback suggests renaming the mapping function for better clarity, optimizing the stop_cell method by caching the cell ID map to avoid redundant computations, and implementing input validation to prevent out-of-bounds access.
    engine_indices: list[int]


def get_cell_indexer_of_id_map(servers: dict[str, RolloutServer]) -> list[CellIndexer]:
The function name get_cell_indexer_of_id_map is confusing and grammatically awkward. Since it returns a list where the index represents the cell_id, a more descriptive name like get_cell_id_to_indexer_map would improve readability and maintainability.
Suggested change:
- def get_cell_indexer_of_id_map(servers: dict[str, RolloutServer]) -> list[CellIndexer]:
+ def get_cell_id_to_indexer_map(servers: dict[str, RolloutServer]) -> list[CellIndexer]:
from miles.ray.rollout.rollout_data_conversion import postprocess_rollout_data
from miles.ray.rollout.rollout_server import RolloutServer, start_rollout_servers
from miles.ray.rollout.router_manager import start_session_server
from miles.ray.rollout.server_cell import get_cell_indexer_of_id_map

async def stop_cell(self, cell_id: int):
    idx = get_cell_indexer_of_id_map(self.servers)[cell_id]
    group = self.servers[idx.srv_key].server_groups[idx.group_index]
    group.stop_engines(engine_indices=idx.engine_indices)
Recomputing the cell mapping on every call to stop_cell is inefficient, especially if the number of engines is large. Additionally, the cell_id should be validated to ensure it is within the valid range and to prevent unexpected behavior with negative indices (which Python lists allow). Caching the mapping lazily on the instance is a good way to optimize this since self.servers is static after initialization.
Suggested change:
- async def stop_cell(self, cell_id: int):
-     idx = get_cell_indexer_of_id_map(self.servers)[cell_id]
-     group = self.servers[idx.srv_key].server_groups[idx.group_index]
-     group.stop_engines(engine_indices=idx.engine_indices)
+ async def stop_cell(self, cell_id: int):
+     if not hasattr(self, "_cell_id_to_indexer"):
+         self._cell_id_to_indexer = get_cell_id_to_indexer_map(self.servers)
+     if not (0 <= cell_id < len(self._cell_id_to_indexer)):
+         raise IndexError(f"cell_id {cell_id} is out of range (0-{len(self._cell_id_to_indexer) - 1})")
+     idx = self._cell_id_to_indexer[cell_id]
+     group = self.servers[idx.srv_key].server_groups[idx.group_index]
+     group.stop_engines(engine_indices=idx.engine_indices)
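As an alternative way to express the same caching and range check, here is a small standalone sketch that uses `functools.cached_property` instead of the `hasattr` test. `CellLookup` and its `build` callable are hypothetical and stand in for the manager and the mapping function. Like the suggestion, it is only valid under the assumption that the server layout does not change after initialization.

```python
from functools import cached_property


class CellLookup:
    """Hypothetical helper; `build` stands in for the cell-mapping function."""

    def __init__(self, build):
        self._build = build
        self.build_calls = 0  # Counts how often the map is actually computed.

    @cached_property
    def cell_map(self):
        # Runs once per instance; later accesses reuse the stored list.
        self.build_calls += 1
        return self._build()

    def get(self, cell_id: int):
        # Reject negatives explicitly: a plain list would accept -1 silently.
        if not 0 <= cell_id < len(self.cell_map):
            raise IndexError(f"cell_id {cell_id} is out of range for {len(self.cell_map)} cells")
        return self.cell_map[cell_id]


lookup = CellLookup(lambda: ["cell-a", "cell-b", "cell-c"])
first = lookup.get(0)
last = lookup.get(2)
try:
    lookup.get(-1)
    rejected_negative = False
except IndexError:
    rejected_negative = True
```

The explicit bounds check matters because `some_list[-1]` is legal Python and would otherwise resolve to the last cell rather than fail.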