Add aliveness to rollout engine state #941
Code Review
This pull request introduces more granular state management for rollout engines by distinguishing between 'allocated-uninitialized' and 'alive' states. It updates the ServerEngine class with new state transitions and modifies ServerGroup to track specific engine indices during startup and recovery. Review feedback indicates that the mark_alive call in rollout_server.py is premature because it occurs before the asynchronous initialization of Ray actors is complete. Additionally, a suggestion was made to update the start_engines docstring to accurately reflect its new return type and behavior.
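For readers following the summary, the two states can be pictured with a minimal sketch. Every name here (`EngineState`, `EngineSlot`, the free function `mark_alive`) is a hypothetical stand-in chosen for illustration, not the PR's actual `ServerEngine` or `ServerGroup` code, and rejecting a repeated transition is an assumed design choice.

```python
from enum import Enum


class EngineState(Enum):
    # State names follow the review summary; the enum itself is hypothetical.
    ALLOCATED_UNINITIALIZED = "allocated-uninitialized"
    ALIVE = "alive"


class EngineSlot:
    """Hypothetical stand-in for one engine entry (not the PR's ServerEngine)."""

    def __init__(self) -> None:
        # A freshly created engine has resources but has not finished init.
        self.state = EngineState.ALLOCATED_UNINITIALIZED

    def mark_alive(self) -> None:
        # Assumed rule: only an allocated-uninitialized engine may become alive.
        if self.state is not EngineState.ALLOCATED_UNINITIALIZED:
            raise RuntimeError(f"cannot mark alive from state {self.state.value!r}")
        self.state = EngineState.ALIVE


def mark_alive(slots: list[EngineSlot], engine_indices: list[int]) -> None:
    """Mark only the listed engines alive, leaving the others untouched."""
    for index in engine_indices:
        slots[index].mark_alive()
```

Tracking explicit indices rather than a count lets a caller mark exactly the engines it started, which matters during recovery when only some slots are being replaced.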
  )
- handles, _ = group.start_engines(port_cursors)
+ handles, new_engine_indices = group.start_engines(port_cursors)
+ group.mark_alive(engine_indices=new_engine_indices)
The mark_alive call here is premature. group.start_engines returns Ray ObjectRefs for the asynchronous init calls, which are only resolved later at line 86 via ray.get(all_init_handles). Marking the engines as alive before they have finished initializing is inconsistent with the state's intended meaning. Additionally, per repository rules, when waiting for a server process to start, checking process liveness is not sufficient; the check must also verify that the server is actively listening for connections on its designated port.
References
- When waiting for a server process to start, checking process liveness (e.g., is_alive()) is not sufficient. The check must also verify that the server is actively listening for connections on its designated port, for instance by attempting a socket connection or making an HTTP request.
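The port check described in the reference above could look roughly like the following. This is a sketch using only the standard library; the helper name `wait_until_listening` and its parameters are assumptions for illustration and do not come from the repository.

```python
import socket
import time


def wait_until_listening(
    host: str,
    port: int,
    timeout_s: float = 30.0,
    interval_s: float = 0.1,
) -> bool:
    """Return True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # A successful connect means something is accepting on the port,
            # which is a stronger signal than the process merely being alive.
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            # Connection refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval_s)
```

Under this reading of the comment, `group.mark_alive(engine_indices=new_engine_indices)` would move to after `ray.get(all_init_handles)` has returned, and optionally after a check like this one has succeeded for each new engine's port.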
def start_engines(self, port_cursors: PortCursors) -> tuple[list, list[int]]:
    """Create Ray actors, allocate ports, and fire ``engine.init()`` without waiting.

    Returns ``(init_handles, curr_num_new_engines)`` where *init_handles* is a list
    of Ray ObjectRefs and *port_cursors* maps node index -> next free port.
    """
The docstring for start_engines is outdated. It still refers to curr_num_new_engines (an integer) as the second return value, but the method now returns new_engine_indices (a list of integers). Additionally, it incorrectly implies that port_cursors is part of the return value, whereas it is modified in-place.
- def start_engines(self, port_cursors: PortCursors) -> tuple[list, list[int]]:
-     """Create Ray actors, allocate ports, and fire ``engine.init()`` without waiting.
-     Returns ``(init_handles, curr_num_new_engines)`` where *init_handles* is a list
-     of Ray ObjectRefs and *port_cursors* maps node index -> next free port.
-     """
+ def start_engines(self, port_cursors: PortCursors) -> tuple[list, list[int]]:
+     """Create Ray actors, allocate ports, and fire ``engine.init()`` without waiting.
+     Returns ``(init_handles, new_engine_indices)`` where *init_handles* is a list
+     of Ray ObjectRefs and *new_engine_indices* is a list of indices of the new engines.
+     *port_cursors* is modified in place and is not part of the return value.
+     """