Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions miles/ray/rollout/addr_allocator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def next_base_port(self) -> int:
return max(self._values.values()) if self._values else 15000


# NOTE: May re-implement this in a potentially easier way if needed
def allocate_rollout_engine_addr_and_ports_normal(
*,
args,
Expand Down
14 changes: 7 additions & 7 deletions miles/ray/rollout/server_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def start_engines(self, port_cursors: PortCursors) -> list:

RolloutRayActor = ray.remote(SGLangEngine)

rollout_engines = []
new_engines = []
for i in range(len(self.all_engines)):
if self.all_engines[i] is not None:
continue
Expand Down Expand Up @@ -117,23 +117,23 @@ def start_engines(self, port_cursors: PortCursors) -> list:
num_gpus_per_engine=self.num_gpus_per_engine,
)

rollout_engines.append((global_rank, rollout_engine))
new_engines.append((global_rank, rollout_engine))
self.all_engines[i] = rollout_engine

self.num_new_engines = len(rollout_engines)
self.num_new_engines = len(new_engines)

if self.num_new_engines == 0:
return []

if self.args.rollout_external:
addr_and_ports = allocate_rollout_engine_addr_and_ports_external(
args=self.args, rollout_engines=rollout_engines
args=self.args, rollout_engines=new_engines
)
else:
base_port = port_cursors.next_base_port()
addr_and_ports, next_port_cursors = allocate_rollout_engine_addr_and_ports_normal(
args=self.args,
rollout_engines=rollout_engines,
rollout_engines=new_engines,
worker_type=self.worker_type,
num_gpus_per_engine=self.num_gpus_per_engine,
rank_offset=self.rank_offset,
Expand All @@ -143,11 +143,11 @@ def start_engines(self, port_cursors: PortCursors) -> list:

init_handles = [
engine.init.remote(
**(addr_and_ports[rank]),
**addr_and_ports[index],
router_ip=self.router_ip,
router_port=self.router_port,
)
for rank, engine in rollout_engines
for index, engine in new_engines
Comment on lines +146 to +150
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Renaming `rank` to `index` here makes the name less descriptive and potentially misleading. In distributed computing, "rank" is the standard term for a process's identifier within a group, whereas "index" usually denotes a position in a sequence (such as a 0-based list index). Because the value unpacked from `new_engines` is the `global_rank` (see the `new_engines.append((global_rank, rollout_engine))` call on line 120), it is better to keep the name `rank` or `global_rank`, which reflects the value's actual semantics and stays consistent with the rest of the codebase.

Suggested change
**addr_and_ports[index],
router_ip=self.router_ip,
router_port=self.router_port,
)
for rank, engine in rollout_engines
for index, engine in new_engines
**addr_and_ports[rank],
router_ip=self.router_ip,
router_port=self.router_port,
)
for rank, engine in new_engines

]
return init_handles

Expand Down
Loading