-
Notifications
You must be signed in to change notification settings - Fork 21
Add RapidsMPF launcher rrun
#616
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Thanks @madsbk , I think I've addressed/responded to everything. Could you take another look when you have a chance? |
| // Write to temporary file | ||
| std::ofstream ofs(tmp_path, std::ios::binary | std::ios::trunc); | ||
| if (!ofs) { | ||
| throw std::runtime_error("Failed to open temporary file: " + tmp_path); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ping
| if (!ifs) { | ||
| throw std::runtime_error("Failed to open file for reading: " + path); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use RAPIDSMPF_EXPECTS()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also fixed in db112cb
cpp/tools/rrun.cpp
Outdated
| suppress_output->store(true, std::memory_order_relaxed); | ||
| // Forward signal to all local children | ||
| for (pid_t pid : pids) { | ||
| (void)kill(pid, sig); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| (void)kill(pid, sig); | |
| std::ignore = kill(pid, sig); |
cpp/tools/rrun.cpp
Outdated
| { | ||
| std::error_code ec; | ||
| std::filesystem::remove_all(cfg.coord_dir, ec); | ||
| if (ec) { | ||
| std::cerr << "Warning: Failed to cleanup directory: " << cfg.coord_dir | ||
| << ": " << ec.message() << std::endl; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| { | |
| std::error_code ec; | |
| std::filesystem::remove_all(cfg.coord_dir, ec); | |
| if (ec) { | |
| std::cerr << "Warning: Failed to cleanup directory: " << cfg.coord_dir | |
| << ": " << ec.message() << std::endl; | |
| } | |
| } | |
| std::error_code ec; | |
| std::filesystem::remove_all(cfg.coord_dir, ec); | |
| if (ec) { | |
| std::cerr << "Warning: Failed to cleanup directory: " << cfg.coord_dir | |
| << ": " << ec.message() << std::endl; | |
| } |
Co-authored-by: Mads R. B. Kristensen <[email protected]>
Co-authored-by: Mads R. B. Kristensen <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I've addressed your new comments now @madsbk . Sorry again for missing to push some of the changes from yesterday.
| // Write to temporary file | ||
| std::ofstream ofs(tmp_path, std::ios::binary | std::ios::trunc); | ||
| if (!ofs) { | ||
| throw std::runtime_error("Failed to open temporary file: " + tmp_path); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I committed these changes locally yesterday but didn't push them. 🤦
Fixed in db112cb
| if (!ifs) { | ||
| throw std::runtime_error("Failed to open file for reading: " + path); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also fixed in db112cb
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @pentschev
cpp/src/bootstrap/file_backend.cpp
Outdated
| { | ||
| std::error_code ec; | ||
| std::filesystem::rename(tmp_path, path, ec); | ||
| if (ec) { | ||
| std::error_code rm_ec; | ||
| std::filesystem::remove(tmp_path, rm_ec); // Clean up temp file | ||
| RAPIDSMPF_FAIL( | ||
| "Failed to rename " + tmp_path + " to " + path + ": " + ec.message(), | ||
| std::runtime_error | ||
| ); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| { | |
| std::error_code ec; | |
| std::filesystem::rename(tmp_path, path, ec); | |
| if (ec) { | |
| std::error_code rm_ec; | |
| std::filesystem::remove(tmp_path, rm_ec); // Clean up temp file | |
| RAPIDSMPF_FAIL( | |
| "Failed to rename " + tmp_path + " to " + path + ": " + ec.message(), | |
| std::runtime_error | |
| ); | |
| } | |
| } | |
| std::error_code ec; | |
| std::filesystem::rename(tmp_path, path, ec); | |
| if (ec) { | |
| std::error_code rm_ec; | |
| std::filesystem::remove(tmp_path, rm_ec); // Clean up temp file | |
| RAPIDSMPF_FAIL( | |
| "Failed to rename " + tmp_path + " to " + path + ": " + ec.message(), | |
| std::runtime_error | |
| ); | |
| } |
cpp/tools/rrun.cpp
Outdated
| std::vector<int> gpus; | ||
| std::stringstream ss(gpu_str); | ||
| std::string item; | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cpp/tools/rrun.cpp
Outdated
| RAPIDSMPF_FAIL("Invalid GPU ID: " + item, std::runtime_error); | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Co-authored-by: Mads R. B. Kristensen <[email protected]>
This reverts commit db112cb.
|
@madsbk as discussed offline, we can't use |
Introduce a new launcher for RapidsMPF without dependencies on
mpirun. The new launcher currently namedrrun(as in RapidsMPF run) introduces basic functionality to launch processes locally to run on multiple-GPUs while providing an extension to allow it to support different backends, the only existing backend supports file-based synchronization, in the future we plan to extend it to support other synchronization mechanisms including over-the-network via specialized service, Slurm, as well as Kubernetes and others that may be requested. Additionally, it will support automatic discovery and configuration of system topology per process (i.e., per GPU).This initial implementation only supports local node multi-GPU, the implementation for multi-node multi-GPU already exists using SSH to spawn multiple processes but was split into a different PR to make this shorter.