✨ feat(Ray): Enhance Ray #2847

Open · wants to merge 3 commits into base: main

1 change: 1 addition & 0 deletions CONTRIBUTORS.md
@@ -125,6 +125,7 @@ Guidelines for modifications:
* Ziqi Fan
* Zoe McCarthy
* David Leon
* Song Yi

## Acknowledgements

19 changes: 17 additions & 2 deletions docs/source/features/ray.rst
@@ -46,7 +46,7 @@ specifying the ``--num_workers`` argument for resource-wrapped jobs, or ``--num_
for tuning jobs, which is especially critical for parallel aggregate
job processing on local/virtual multi-GPU machines. Tuning jobs assume homogeneous node resource composition for nodes with GPUs.

The two following files contain the core functionality of the Ray integration.
The three following files contain the core functionality of the Ray integration.

.. dropdown:: scripts/reinforcement_learning/ray/wrap_resources.py
:icon: code
@@ -62,6 +62,12 @@ The two following files contain the core functionality of the Ray integration.
:language: python
:emphasize-lines: 18-53

.. dropdown:: scripts/reinforcement_learning/ray/task_runner.py
:icon: code

.. literalinclude:: ../../../scripts/reinforcement_learning/ray/task_runner.py
:language: python
:emphasize-lines: 13-59

The following script can be used to submit aggregate
jobs to one or more Ray cluster(s), which can be used for
@@ -73,7 +79,7 @@ resource requirements.

.. literalinclude:: ../../../scripts/reinforcement_learning/ray/submit_job.py
:language: python
:emphasize-lines: 12-53
:emphasize-lines: 13-61

The following script can be used to extract KubeRay cluster information for aggregate job submission.

@@ -151,6 +157,15 @@ Submitting resource-wrapped individual jobs instead of automatic tuning runs is
:language: python
:emphasize-lines: 14-66

Supports specifying per-task resources and setting ``py_modules`` and ``pip`` packages for each run.

Review comment from @garylvov (Collaborator), Jul 15, 2025:

I'd also contrast this with submit_job.py. It should be clear to the user that these scripts have similar capabilities, and that yours has extra benefits. Maybe your script should be mentioned directly after the submit_job.py script as well, to group the similar scripts together instead of separating them with other scripts.

I'd write something along the lines of:

Similar to submit_job.py, the following script submits aggregate jobs to a Ray cluster while supporting YAML-based resource specification (instead of command line arguments) and py_modules/pip modules through Ray.


.. dropdown:: scripts/reinforcement_learning/ray/task_runner.py
:icon: code

.. literalinclude:: ../../../scripts/reinforcement_learning/ray/task_runner.py
:language: python
:emphasize-lines: 9-55

Transferring files from the running container can be done as follows.

.. code-block:: bash
10 changes: 9 additions & 1 deletion scripts/reinforcement_learning/ray/submit_job.py
@@ -26,7 +26,11 @@
creates several individual jobs when started on a cluster. Alternatively, an aggregate job
could be a :file:`../wrap_resources.py` resource-wrapped job,
which may contain several individual sub-jobs separated by
the + delimiter.
the + delimiter. An aggregate job could also be a :file:`../task_runner.py` multi-task submission job,
where each sub-job and its resource requirements are defined in a YAML configuration file.
In this mode, :file:`../task_runner.py` reads the YAML file (via ``--task_cfg``) and
submits all defined sub-tasks to the Ray cluster, supporting per-job resource specification and
real-time streaming of sub-job outputs.

If there are more aggregate jobs than cluster(s), aggregate jobs will be submitted
as clusters become available via the defined relation above. If there are less aggregate job(s)
@@ -48,6 +52,10 @@
# Example: Submitting resource wrapped job
python3 scripts/reinforcement_learning/ray/submit_job.py --aggregate_jobs wrap_resources.py --test

# Example: Submitting tasks with specific resources, with support for pip packages and py_modules
# Relative paths may be used for task_cfg and py_modules if the files are placed in the
# scripts/reinforcement_learning/ray directory, which is uploaded to the cluster.
python3 scripts/reinforcement_learning/ray/submit_job.py --aggregate_jobs task_runner.py --task_cfg tasks.yaml
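# Note: the expected tasks.yaml format (a tasks list with name/py_args/num_gpus/num_cpus/memory entries,
# plus optional pip and py_modules lists) is documented in the module docstring of task_runner.py.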

# For all command line arguments
python3 scripts/reinforcement_learning/ray/submit_job.py -h
"""
159 changes: 159 additions & 0 deletions scripts/reinforcement_learning/ray/task_runner.py
@@ -0,0 +1,159 @@
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse
import sys
import yaml

import ray
import util

"""
This script dispatches one or more user-defined Python tasks to workers in a Ray cluster.
Each task, with its resource requirements and execution parameters, is described in a YAML configuration file.
You may specify the desired number of CPUs, GPUs, and memory allocation for each task in the config file.

Key features:
- Flexible resource management per task via config fields (`num_gpus`, `num_cpus`, `memory`).
- Real-time output streaming (stdout/stderr) for each task.
- Parallel execution of multiple tasks across cluster resources.

Tasks are distributed and scheduled according to Ray’s built-in resource manager.

Typical usage:
---------------

.. code-block:: bash

    # Print help and argument details:
    python task_runner.py -h

    # Submit tasks defined in a YAML file to the Ray cluster (auto-detects Ray head address):
    python task_runner.py --task_cfg /path/to/tasks.yaml

YAML configuration example:
---------------------------

.. code-block:: yaml

    pip: ["xxx"]
    py_modules: ["my_package/my_package"]
    tasks:
      - name: "task1"

Collaborator commented:
I'd also add a comment here showing what a task1 could be

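        # Example task: a 2-GPU distributed RSL-RL training run (Isaac-Cartpole-v0), defined by py_args below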
        py_args: "-m torch.distributed.run --nnodes=1 --nproc_per_node=2 --rdzv_endpoint=localhost:29501 /workspace/isaaclab/scripts/reinforcement_learning/rsl_rl/train.py --task=Isaac-Cartpole-v0 --max_iterations 200 --headless --distributed"
        num_gpus: 2
        num_cpus: 10
        memory: 10737418240
      - name: "task2"
        py_args: "script.py --option arg"
        num_gpus: 0
        num_cpus: 1
        memory: "10*1024*1024*1024"

- `pip`: List of pip packages to install.

Collaborator commented:
A little pedantic, but I'd say something like an empty list means no additional packages above what is included in the base.

- `py_args`: Arguments passed to the Python executable for this task.
- `num_gpus`, `num_cpus`: Number of GPUs/CPUs to allocate. Can be integer or a string like `"2*2"`.
- `memory`: Amount of memory (bytes) to allocate. Can be integer or a string like `"10*1024*1024*1024"`.

To stop all tasks early, press Ctrl+C; the script will cancel all running Ray tasks.
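
Programmatic use (an illustrative sketch, mirroring what ``main()`` below does):

.. code-block:: python

    import yaml

    with open("tasks.yaml") as f:  # "tasks.yaml" is a placeholder path
        cfg = yaml.safe_load(f)
    # run_tasks is defined in this module
    run_tasks(ray_address="auto", pip=cfg.get("pip"), py_modules=cfg.get("py_modules"), tasks=cfg["tasks"])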
"""


def parse_args():
    parser = argparse.ArgumentParser(description="Run tasks from a YAML config file.")
    parser.add_argument("--task_cfg", type=str, required=True, help="Path to the YAML task file.")
    parser.add_argument("--ray_address", type=str, default="auto", help="The Ray cluster address.")
    parser.add_argument(
        "--test",
        action="store_true",
        help=(
            "Run nvidia-smi test instead of the arbitrary job, "
            "which can be used as a sanity check prior to any jobs to check "
            "that GPU resources are correctly isolated."
        ),
    )
    return parser.parse_args()


def parse_task_opt(task):
    """Extract Ray resource options (num_gpus, num_cpus, memory) from a task config entry.

    String values such as "2*2" or "10*1024*1024*1024" are evaluated as arithmetic expressions;
    numeric values are passed through unchanged.
    """
    opts = {}
    if "num_gpus" in task:
        opts["num_gpus"] = eval(task["num_gpus"]) if isinstance(task["num_gpus"], str) else task["num_gpus"]
    if "num_cpus" in task:
        opts["num_cpus"] = eval(task["num_cpus"]) if isinstance(task["num_cpus"], str) else task["num_cpus"]
    if "memory" in task:
        opts["memory"] = eval(task["memory"]) if isinstance(task["memory"], str) else task["memory"]
    return opts
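
# Illustrative example: parse_task_opt({"name": "t", "num_gpus": "2*1", "memory": "10*1024*1024*1024"})
# returns {"num_gpus": 2, "memory": 10737418240}; integer values are passed through unchanged.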


@ray.remote
def remote_execute_job(job_cmd: str, identifier_string: str, test_mode: bool) -> str | dict:
    return util.execute_job(
        job_cmd=job_cmd,
        identifier_string=identifier_string,
        test_mode=test_mode,
        log_all_output=True,  # log_all_output=True streams the job's output in real time
    )
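
# Note: per-task resources are attached at call time via remote_execute_job.options(**opts).remote(...)
# in run_tasks below, so Ray schedules each task onto a worker that satisfies the requested
# num_gpus / num_cpus / memory.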


def run_tasks(ray_address, pip, py_modules, tasks, test_mode=False):
    if not tasks:
        print("[WARNING]: no tasks to submit")
        return

    if not ray.is_initialized():
        try:
            ray.init(
                address=ray_address,
                log_to_driver=True,
                runtime_env={
                    "pip": pip,
                    "py_modules": py_modules,
                },
            )
        except Exception as e:
            raise RuntimeError(f"Failed to initialize Ray: {e}") from e

    # Submit each task as a Ray remote call with its own resource options.
    task_results = []
    for task in tasks:
        opts = parse_task_opt(task)
        task_cmd = " ".join([sys.executable, *task["py_args"].split()])
        print(f"[INFO] submitting task {task['name']} with opts={opts}: {task_cmd}")
        task_results.append(remote_execute_job.options(**opts).remote(task_cmd, task["name"], test_mode))

    try:
        results = ray.get(task_results)
        for i, result in enumerate(results):
            print(f"[INFO]: Task {tasks[i]['name']} result: \n{result}")
        print("[INFO]: all tasks completed.")
    except KeyboardInterrupt:
        print("[INFO]: keyboard interrupt received, cancelling all running tasks")
        for future in task_results:
            ray.cancel(future, force=True)
        print("[INFO]: all tasks cancelled.")
        sys.exit(1)
    except Exception as e:
        print(f"[ERROR]: error while running tasks: {e}")
        raise


def main():
    args = parse_args()
    try:
        with open(args.task_cfg) as f:
            config = yaml.safe_load(f)
    except Exception as e:
        raise SystemExit(f"error while loading task config: {e}")
    tasks = config["tasks"]
    py_modules = config.get("py_modules")
    pip = config.get("pip")
    run_tasks(
        ray_address=args.ray_address,
        pip=pip,
        py_modules=py_modules,
        tasks=tasks,
        test_mode=args.test,
    )


if __name__ == "__main__":
    main()