diff --git a/README.md b/README.md index 3704f37bb..120cd7184 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,24 @@ The UCX test suite uses, for convenience, MPI to bootstrap, therefore we need to mpirun -np 2 cpp/build/gtests/ucxx_tests ``` +### rrun - Distributed Launcher + +RapidsMPF includes `rrun`, a lightweight launcher that eliminates the MPI dependency for multi-GPU workloads. This is particularly useful for development, testing, and environments where MPI is not available. + +#### Single-Node Usage + +```bash +# Build rrun +cd cpp/build +cmake --build . --target rrun + +# Launch 2 ranks in the local node +./tools/rrun -n 2 ./benchmarks/bench_comm -C ucxx -O all-to-all + +# With verbose output and specific GPUs +./tools/rrun -v -n 4 -g 0,1,2,3 ./benchmarks/bench_comm -C ucxx +``` + ## Algorithms ### Table Shuffle Service Example of a MPI program that uses the shuffler: diff --git a/ci/run_cpp_benchmark_smoketests.sh b/ci/run_cpp_benchmark_smoketests.sh index d27b04692..579cf8109 100755 --- a/ci/run_cpp_benchmark_smoketests.sh +++ b/ci/run_cpp_benchmark_smoketests.sh @@ -40,3 +40,15 @@ for i in {0..2}; do exit 1 fi done + +# Test with rrun + +# Confirm no dependencies on OpenMPI variables +unset OMPI_ALLOW_RUN_AS_ROOT +unset OMPI_ALLOW_RUN_AS_ROOT_CONFIRM +unset OMPI_MCA_opal_cuda_support + +python "${TIMEOUT_TOOL_PATH}" 30 \ + rrun -n 3 -g 0,0,0 ./bench_comm -m cuda -C ucxx +python "${TIMEOUT_TOOL_PATH}" 30 \ + rrun --tag-output -n 3 -g 0,0,0 ./bench_comm -m cuda -C ucxx diff --git a/conda/recipes/librapidsmpf/recipe.yaml b/conda/recipes/librapidsmpf/recipe.yaml index 2715fd928..8170b0f77 100644 --- a/conda/recipes/librapidsmpf/recipe.yaml +++ b/conda/recipes/librapidsmpf/recipe.yaml @@ -74,6 +74,7 @@ outputs: content: | cmake --install cpp/build cmake --install cpp/build --component=benchmarking + cmake --install cpp/build --component=tools dynamic_linking: overlinking_behavior: "error" prefix_detection: diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 9c5ea3d66..58d5b81fd 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -155,6 +155,8 @@ target_link_options(maybe_asan INTERFACE "$<$:-fsanitize add_library( rapidsmpf src/allgather/allgather.cpp + src/bootstrap/bootstrap.cpp + src/bootstrap/file_backend.cpp src/buffer/buffer.cpp src/buffer/pinned_memory_resource.cu src/buffer/resource.cpp @@ -190,7 +192,7 @@ if(RAPIDSMPF_HAVE_STREAMING) ) endif() if(RAPIDSMPF_HAVE_UCXX) - target_sources(rapidsmpf PRIVATE src/communicator/ucxx.cpp) + target_sources(rapidsmpf PRIVATE src/bootstrap/ucxx.cpp src/communicator/ucxx.cpp) endif() if(RAPIDSMPF_HAVE_MPI) target_sources(rapidsmpf PRIVATE src/communicator/mpi.cpp) @@ -324,6 +326,11 @@ if(RAPIDSMPF_BUILD_EXAMPLES) add_subdirectory(examples) endif() +# ################################################################################################## +# * add tools +# ------------------------------------------------------------------------------------- +add_subdirectory(tools) + # ################################################################################################## # * install targets ------------------------------------------------------------------------------- rapids_cmake_install_lib_dir(lib_dir) diff --git a/cpp/benchmarks/bench_comm.cpp b/cpp/benchmarks/bench_comm.cpp index a8a0b5884..3218b3a96 100644 --- a/cpp/benchmarks/bench_comm.cpp +++ b/cpp/benchmarks/bench_comm.cpp @@ -8,6 +8,8 @@ #include +#include +#include #include #include #include @@ -27,12 +29,15 @@ using namespace rapidsmpf; class ArgumentParser { public: - ArgumentParser(int argc, char* const* argv) { - RAPIDSMPF_EXPECTS(mpi::is_initialized() == true, "MPI is not initialized"); - - int rank, nranks; - RAPIDSMPF_MPI(MPI_Comm_rank(MPI_COMM_WORLD, &rank)); - RAPIDSMPF_MPI(MPI_Comm_size(MPI_COMM_WORLD, &nranks)); + ArgumentParser(int argc, char* const* argv, bool use_mpi = true) { + int rank = 0; + int nranks = 1; + + if (use_mpi) { + RAPIDSMPF_EXPECTS(mpi::is_initialized() == true, "MPI is not initialized"); + RAPIDSMPF_MPI(MPI_Comm_rank(MPI_COMM_WORLD, &rank)); + RAPIDSMPF_MPI(MPI_Comm_size(MPI_COMM_WORLD, &nranks)); + } try { int option; @@ -44,6 +49,8 @@ class ArgumentParser { ss << "Usage: " << argv[0] << " [options]\n" << "Options:\n" << " -C Communicator {mpi, ucxx} (default: mpi)\n" + << " ucxx automatically detects launcher (mpirun " + "or rrun)\n" << " -O Operation {all-to-all} (default: " "all-to-all)\n" << " -n Message size in bytes (default: 1M)\n" @@ -63,7 +70,11 @@ class ArgumentParser { if (rank == 0) { std::cerr << ss.str(); } - RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, 0)); + if (use_mpi) { + RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, 0)); + } else { + std::exit(0); + } } break; case 'C': @@ -73,7 +84,11 @@ class ArgumentParser { std::cerr << "-C (Communicator) must be one of {mpi, ucxx}" << std::endl; } - RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, -1)); + if (use_mpi) { + RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, -1)); + } else { + std::exit(-1); + } } break; case 'O': @@ -114,7 +129,11 @@ class ArgumentParser { break; #endif case '?': - RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, -1)); + if (use_mpi) { + RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, -1)); + } else { + std::exit(-1); + } break; default: RAPIDSMPF_FAIL("unknown option", std::invalid_argument); @@ -127,7 +146,11 @@ class ArgumentParser { if (rank == 0) { std::cerr << "Error parsing arguments: " << e.what() << std::endl; } - RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, -1)); + if (use_mpi) { + RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, -1)); + } else { + std::exit(-1); + } } if (rmm_mr == "cuda") { @@ -229,27 +252,49 @@ Duration run( } int main(int argc, char** argv) { - // Explicitly initialize MPI with thread support, as this is needed for both mpi and - // ucxx communicators. - int provided; - RAPIDSMPF_MPI(MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided)); - - RAPIDSMPF_EXPECTS( - provided == MPI_THREAD_MULTIPLE, - "didn't get the requested thread level support: MPI_THREAD_MULTIPLE" - ); + // Check if we should use bootstrap mode with rrun + // This is determined by checking for RAPIDSMPF_RANK environment variable + bool use_bootstrap = std::getenv("RAPIDSMPF_RANK") != nullptr; + + int provided = 0; + if (!use_bootstrap) { + // Explicitly initialize MPI with thread support, as this is needed for both mpi + // and ucxx communicators. + RAPIDSMPF_MPI(MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided)); + + RAPIDSMPF_EXPECTS( + provided == MPI_THREAD_MULTIPLE, + "didn't get the requested thread level support: MPI_THREAD_MULTIPLE" + ); + } - ArgumentParser args{argc, argv}; + ArgumentParser args{argc, argv, !use_bootstrap}; // Initialize configuration options from environment variables. rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables()}; std::shared_ptr comm; if (args.comm_type == "mpi") { + if (use_bootstrap) { + std::cerr << "Error: MPI communicator requires MPI initialization. " + << "Don't use with rrun or unset RAPIDSMPF_RANK." << std::endl; + return 1; + } mpi::init(&argc, &argv); comm = std::make_shared(MPI_COMM_WORLD, options); - } else { // ucxx - comm = rapidsmpf::ucxx::init_using_mpi(MPI_COMM_WORLD, options); + } else if (args.comm_type == "ucxx") { + if (use_bootstrap) { + // Launched with rrun - use bootstrap backend + comm = rapidsmpf::bootstrap::create_ucxx_comm( + rapidsmpf::bootstrap::Backend::AUTO, options + ); + } else { + // Launched with mpirun - use MPI bootstrap + comm = rapidsmpf::ucxx::init_using_mpi(MPI_COMM_WORLD, options); + } + } else { + std::cerr << "Error: Unknown communicator type: " << args.comm_type << std::endl; + return 1; } auto& log = comm->logger(); @@ -350,6 +395,8 @@ int main(int argc, char** argv) { } #endif - RAPIDSMPF_MPI(MPI_Finalize()); + if (!use_bootstrap) { + RAPIDSMPF_MPI(MPI_Finalize()); + } return 0; } diff --git a/cpp/include/rapidsmpf/bootstrap/bootstrap.hpp b/cpp/include/rapidsmpf/bootstrap/bootstrap.hpp new file mode 100644 index 000000000..4a9def3bb --- /dev/null +++ b/cpp/include/rapidsmpf/bootstrap/bootstrap.hpp @@ -0,0 +1,139 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include + +namespace rapidsmpf::bootstrap { + +/// @brief Type alias for communicator::Rank +using Rank = std::int32_t; + +/// @brief Type alias for duration type +using rapidsmpf::Duration; + +/** + * @brief Backend types for process coordination and bootstrapping. + */ +enum class Backend { + /** + * @brief Automatically detect the best backend based on environment. + * + * Detection order: + * 1. File-based (default fallback) + */ + AUTO, + + /** + * @brief File-based coordination using a shared directory. + * + * Uses filesystem for rank coordination and address exchange. Works on single-node + * and multi-node with shared storage (e.g., NFS) via SSH. Requires RAPIDSMPF_RANK, + * RAPIDSMPF_NRANKS, RAPIDSMPF_COORD_DIR environment variables. + */ + FILE, +}; + +/** + * @brief Context information for the current process/rank. + * + * This structure contains the rank assignment and total rank count, + * along with additional metadata about the execution environment. + */ +struct Context { + /** @brief This process's rank (0-indexed). */ + Rank rank; + + /** @brief Total number of ranks in the job. */ + Rank nranks; + + /** @brief Backend used for coordination. */ + Backend backend; + + /** @brief Coordination directory (for FILE backend). */ + std::optional coord_dir; +}; + +/** + * @brief Initialize the bootstrap context from environment variables. + * + * This function reads environment variables to determine rank, nranks, and + * backend configuration. It should be called early in the application lifecycle. + * + * Environment variables checked (in order of precedence): + * - RAPIDSMPF_RANK: Explicitly set rank + * - RAPIDSMPF_NRANKS: Explicitly set total rank count + * - RAPIDSMPF_COORD_DIR: File-based coordination directory + * + * @param backend Backend to use (default: AUTO for auto-detection). + * @return Context object containing rank and coordination information. + * @throws std::runtime_error if environment is not properly configured. + * + * @code + * auto ctx = rapidsmpf::bootstrap::init(); + * std::cout << "I am rank " << ctx.rank << " of " << ctx.nranks << std::endl; + * @endcode + */ +Context init(Backend backend = Backend::AUTO); + +/** + * @brief Broadcast data from root rank to all other ranks. + * + * This is a helper function for broadcasting small amounts of data during + * bootstrapping. It uses the underlying backend's coordination mechanism. + * + * @param ctx Bootstrap context. + * @param data Data buffer to broadcast (both input on root, output on others). + * @param size Size of data in bytes. + * @param root Root rank performing the broadcast (default: 0). + */ +void broadcast(Context const& ctx, void* data, std::size_t size, Rank root = 0); + +/** + * @brief Perform a barrier synchronization across all ranks. + * + * This ensures all ranks reach this point before any rank proceeds. + * + * @param ctx Bootstrap context. + */ +void barrier(Context const& ctx); + +/** + * @brief Store a key-value pair in the coordination backend. + * + * This is useful for custom coordination beyond UCXX address exchange. + * + * @param ctx Bootstrap context. + * @param key Key name. + * @param value Value to store. + */ +void put(Context const& ctx, std::string const& key, std::string const& value); + +/** + * @brief Retrieve a value from the coordination backend. + * + * This function blocks until the key is available or timeout occurs. + * + * @param ctx Bootstrap context. + * @param key Key name to retrieve. + * @param timeout Timeout duration. + * @return Value associated with the key. + * @throws std::runtime_error if key not found within timeout. + */ +std::string get( + Context const& ctx, + std::string const& key, + Duration timeout = std::chrono::seconds{30} +); + +} // namespace rapidsmpf::bootstrap diff --git a/cpp/include/rapidsmpf/bootstrap/file_backend.hpp b/cpp/include/rapidsmpf/bootstrap/file_backend.hpp new file mode 100644 index 000000000..87590da03 --- /dev/null +++ b/cpp/include/rapidsmpf/bootstrap/file_backend.hpp @@ -0,0 +1,140 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include + +#include + +namespace rapidsmpf::bootstrap::detail { + +/** + * @brief File-based coordination backend implementation. + * + * This class implements coordination using a shared directory on the filesystem. + * It creates lock files and data files to coordinate between ranks. + * + * Directory structure: + * ``` + * / + * ├── rank__alive # Created by each rank to signal presence + * ├── kv/ + * │ ├── # Key-value pairs + * │ └── + * └── barriers/ + * └── barrier_ # Barrier synchronization + * ``` + */ +class FileBackend { + public: + /** + * @brief Construct a file backend. + * + * @param ctx Bootstrap context containing rank and coordination directory. + */ + explicit FileBackend(Context ctx); + + ~FileBackend(); + + /** + * @brief Store a key-value pair. + * + * @param key Key name. + * @param value Value to store. + */ + void put(std::string const& key, std::string const& value); + + /** + * @brief Retrieve a value, blocking until available or timeout occurs. + * + * @param key Key name. + * @param timeout Timeout duration. + * @return Value associated with key. + */ + std::string get(std::string const& key, Duration timeout); + + /** + * @brief Perform a barrier synchronization. + * + * All ranks must call this before any rank proceeds. + */ + void barrier(); + + /** + * @brief Broadcast data from root to all ranks. + * + * @param data Data buffer. + * @param size Size in bytes. + * @param root Root rank. + */ + void broadcast(void* data, std::size_t size, Rank root); + + private: + Context ctx_; + std::string coord_dir_; + std::string kv_dir_; + std::string barrier_dir_; + std::size_t barrier_count_{0}; + + /** + * @brief Get path for a key-value file. + * + * @param key Key name + */ + [[nodiscard]] std::string get_kv_path(std::string const& key) const; + + /** + * @brief Get path for a barrier file. + * + * @param barrier_id Unique barrier identifier. + */ + [[nodiscard]] std::string get_barrier_path(std::size_t barrier_id) const; + + /** + * @brief Get path for rank alive file. + * + * @param rank Rank to retrieve file. + */ + [[nodiscard]] std::string get_rank_alive_path(Rank rank) const; + + /** + * @brief Wait for a file to exist. + * + * @param path File path. + * @param timeout Timeout duration. + * @return true if file exists within timeout, false otherwise. + */ + bool wait_for_file( + std::string const& path, Duration timeout = std::chrono::seconds{30} + ); + + /** + * @brief Write string to file atomically. + * + * @param path Path to file. + * @param content Content to write. + */ + void write_file(std::string const& path, std::string const& content); + + /** + * @brief Read string from file. + * + * @param path Path to file. + */ + std::string read_file(std::string const& path); + + /** + * @brief Clean up coordination directory after all ranks are done. + * + * This method can be called by all ranks, but only rank 0 performs the actual + * cleanup. Rank 0 waits for all other ranks to finish before removing the + * coordination directory. Non-zero ranks will return immediately (no-op). + */ + void cleanup_coordination_directory(); +}; + +} // namespace rapidsmpf::bootstrap::detail diff --git a/cpp/include/rapidsmpf/bootstrap/ucxx.hpp b/cpp/include/rapidsmpf/bootstrap/ucxx.hpp new file mode 100644 index 000000000..30ef58386 --- /dev/null +++ b/cpp/include/rapidsmpf/bootstrap/ucxx.hpp @@ -0,0 +1,53 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#ifdef RAPIDSMPF_HAVE_UCXX + +#include + +#include + +namespace rapidsmpf { + +namespace ucxx { +class UCXX; +} + +namespace bootstrap { + +/** + * @brief Create a UCXX communicator using the bootstrap backend. + * + * This function creates a fully initialized UCXX communicator by: + * 1. Initializing the bootstrap context (rank, nranks) + * 2. If rank 0: Creating UCXX root and publishing its address + * 3. If rank != 0: Retrieving root address and connecting + * 4. Performing a barrier to ensure all ranks are connected + * + * The function handles all coordination transparently based on the detected + * or specified backend. + * + * @param backend Backend to use (default: AUTO for auto-detection). + * @param options Configuration options for the UCXX communicator. + * @return Shared pointer to initialized UCXX communicator. + * @throws std::runtime_error if initialization fails. + * + * @code + * auto comm = rapidsmpf::bootstrap::create_ucxx_comm(); + * comm->logger().print("Hello from rank " + std::to_string(comm->rank())); + * @endcode + */ +std::shared_ptr create_ucxx_comm( + Backend backend = Backend::AUTO, config::Options options = config::Options{} +); + +} // namespace bootstrap +} // namespace rapidsmpf + +#endif // RAPIDSMPF_HAVE_UCXX diff --git a/cpp/src/bootstrap/bootstrap.cpp b/cpp/src/bootstrap/bootstrap.cpp new file mode 100644 index 000000000..4beceaa88 --- /dev/null +++ b/cpp/src/bootstrap/bootstrap.cpp @@ -0,0 +1,174 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include + +#include +#include +#include + +// NOTE: Do not use RAPIDSMPF_EXPECTS or RAPIDSMPF_FAIL in this file. +// Using these macros introduces a CUDA dependency via rapidsmpf/error.hpp. +// Prefer throwing standard exceptions instead. + +namespace rapidsmpf::bootstrap { +namespace { + +/** + * @brief Get environment variable as string. + */ +std::optional getenv_optional(std::string_view name) { + // std::getenv requires a null-terminated string; construct a std::string + // to ensure this even when called with a non-literal std::string_view. + char const* value = std::getenv(std::string{name}.c_str()); + if (value == nullptr) { + return std::nullopt; + } + return std::string{value}; +} + +/** + * @brief Parse integer from environment variable. + */ +std::optional getenv_int(std::string_view name) { + auto value = getenv_optional(name); + if (!value) { + return std::nullopt; + } + try { + return std::stoi(*value); + } catch (...) { + throw std::runtime_error( + std::string{"Failed to parse integer from environment variable "} + + std::string{name} + ": " + *value + ); + } +} + +/** + * @brief Detect backend from environment variables. + */ +Backend detect_backend() { + // Check for file-based coordination + if (getenv_optional("RAPIDSMPF_COORD_DIR")) { + return Backend::FILE; + } + + // Default to file-based + return Backend::FILE; +} +} // namespace + +Context init(Backend backend) { + Context ctx; + ctx.backend = (backend == Backend::AUTO) ? detect_backend() : backend; + + // Get rank and nranks based on backend + switch (ctx.backend) { + case Backend::FILE: + { + // Require explicit RAPIDSMPF_RANK and RAPIDSMPF_NRANKS + auto rank_opt = getenv_int("RAPIDSMPF_RANK"); + auto nranks_opt = getenv_int("RAPIDSMPF_NRANKS"); + auto coord_dir_opt = getenv_optional("RAPIDSMPF_COORD_DIR"); + + if (!rank_opt.has_value()) { + throw std::runtime_error( + "RAPIDSMPF_RANK environment variable not set. " + "Set it or use a launcher like 'rrun'." + ); + } + + if (!nranks_opt.has_value()) { + throw std::runtime_error( + "RAPIDSMPF_NRANKS environment variable not set. " + "Set it or use a launcher like 'rrun'." + ); + } + + if (!coord_dir_opt.has_value()) { + throw std::runtime_error( + "RAPIDSMPF_COORD_DIR environment variable not set. " + "Set it or use a launcher like 'rrun'." + ); + } + + ctx.rank = static_cast(*rank_opt); + ctx.nranks = static_cast(*nranks_opt); + ctx.coord_dir = *coord_dir_opt; + + if (!(ctx.rank >= 0 && ctx.rank < ctx.nranks)) { + throw std::runtime_error( + "Invalid rank: RAPIDSMPF_RANK=" + std::to_string(ctx.rank) + + " must be in range [0, " + std::to_string(ctx.nranks) + ")" + ); + } + break; + } + case Backend::AUTO: + { + // Should have been resolved above + throw std::logic_error("Backend::AUTO should have been resolved"); + } + } + return ctx; +} + +void broadcast(Context const& ctx, void* data, std::size_t size, Rank root) { + switch (ctx.backend) { + case Backend::FILE: + { + detail::FileBackend backend{ctx}; + backend.broadcast(data, size, root); + break; + } + default: + throw std::runtime_error("broadcast not implemented for this backend"); + } +} + +void barrier(Context const& ctx) { + switch (ctx.backend) { + case Backend::FILE: + { + detail::FileBackend backend{ctx}; + backend.barrier(); + break; + } + default: + throw std::runtime_error("barrier not implemented for this backend"); + } +} + +void put(Context const& ctx, std::string const& key, std::string const& value) { + switch (ctx.backend) { + case Backend::FILE: + { + detail::FileBackend backend{ctx}; + backend.put(key, value); + break; + } + default: + throw std::runtime_error("put not implemented for this backend"); + } +} + +std::string get(Context const& ctx, std::string const& key, Duration timeout) { + switch (ctx.backend) { + case Backend::FILE: + { + detail::FileBackend backend{ctx}; + return backend.get(key, timeout); + } + default: + throw std::runtime_error("get not implemented for this backend"); + } +} + +} // namespace rapidsmpf::bootstrap diff --git a/cpp/src/bootstrap/file_backend.cpp b/cpp/src/bootstrap/file_backend.cpp new file mode 100644 index 000000000..ddadbb05d --- /dev/null +++ b/cpp/src/bootstrap/file_backend.cpp @@ -0,0 +1,293 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +// NOTE: Do not use RAPIDSMPF_EXPECTS or RAPIDSMPF_FAIL in this file. +// Using these macros introduces a CUDA dependency via rapidsmpf/error.hpp. +// Prefer throwing standard exceptions instead. + +namespace rapidsmpf::bootstrap::detail { + +FileBackend::FileBackend(Context ctx) : ctx_{std::move(ctx)} { + if (!ctx_.coord_dir.has_value()) { + throw std::runtime_error("FileBackend requires coord_dir in context"); + } + + coord_dir_ = *ctx_.coord_dir; + kv_dir_ = coord_dir_ + "/kv"; + barrier_dir_ = coord_dir_ + "/barriers"; + + try { + std::filesystem::create_directories(coord_dir_); + std::filesystem::create_directories(kv_dir_); + std::filesystem::create_directories(barrier_dir_); + } catch (std::exception const& e) { + throw std::runtime_error( + "Failed to initialize coordination directory structure: " + + std::string{e.what()} + ); + } + + // Create rank alive file + write_file(get_rank_alive_path(ctx_.rank), std::to_string(getpid())); + + // Note: Do not block in the constructor. Ranks only create their alive file + // and continue. Synchronization occurs where needed (e.g., get/put/barrier). +} + +FileBackend::~FileBackend() { + // Clean up rank alive file + try { + std::error_code ec; + if (!std::filesystem::remove(get_rank_alive_path(ctx_.rank), ec) && ec) { + std::cerr << "Error removing rank alive file: " << ec.message() << std::endl; + } + } catch (const std::exception& e) { + std::cerr << "Exception during rank alive file cleanup: " << e.what() + << std::endl; + } + cleanup_coordination_directory(); +} + +void FileBackend::put(std::string const& key, std::string const& value) { + std::string path = get_kv_path(key); + write_file(get_kv_path(key), value); +} + +std::string FileBackend::get(std::string const& key, Duration timeout) { + std::string path = get_kv_path(key); + auto timeout_ms = std::chrono::duration_cast(timeout); + if (!wait_for_file(path, timeout_ms)) { + throw std::runtime_error( + "Key '" + key + "' not available within " + std::to_string(timeout.count()) + + "s timeout" + ); + } + return read_file(path); +} + +void FileBackend::barrier() { + std::size_t barrier_id = barrier_count_++; + std::string my_barrier_file = + get_barrier_path(barrier_id) + "." + std::to_string(ctx_.rank); + + // Each rank creates its barrier file + write_file(my_barrier_file, "1"); + + // Wait for all other ranks + for (Rank r = 0; r < ctx_.nranks; ++r) { + if (r == ctx_.rank) { + continue; + } + + std::string other_barrier_file = + get_barrier_path(barrier_id) + "." + std::to_string(r); + if (!wait_for_file(other_barrier_file, std::chrono::milliseconds{60000})) { + throw std::runtime_error( + "Barrier timeout: rank " + std::to_string(r) + " did not arrive" + ); + } + } + + // Clean up our barrier file + std::error_code ec; + std::filesystem::remove(my_barrier_file, ec); +} + +void FileBackend::broadcast(void* data, std::size_t size, Rank root) { + if (ctx_.rank == root) { + // Root writes data + std::string bcast_data{static_cast(data), size}; + put("broadcast_" + std::to_string(root), bcast_data); + } else { + // Non-root reads data + std::string bcast_data = + get("broadcast_" + std::to_string(root), std::chrono::seconds{30}); + if (bcast_data.size() != size) { + throw std::runtime_error( + "Broadcast size mismatch: expected " + std::to_string(size) + ", got " + + std::to_string(bcast_data.size()) + ); + } + std::memcpy(data, bcast_data.data(), size); + } + barrier(); +} + +std::string FileBackend::get_kv_path(std::string const& key) const { + return kv_dir_ + "/" + key; +} + +std::string FileBackend::get_barrier_path(std::size_t barrier_id) const { + return barrier_dir_ + "/barrier_" + std::to_string(barrier_id); +} + +std::string FileBackend::get_rank_alive_path(Rank rank) const { + return coord_dir_ + "/rank_" + std::to_string(rank) + "_alive"; +} + +bool FileBackend::wait_for_file(std::string const& path, Duration timeout) { + auto start = std::chrono::steady_clock::now(); + auto poll_interval = std::chrono::milliseconds{10}; + + // NFS visibility aid: derive parent directory to refresh its metadata + std::string parent_dir; + { + auto pos = path.find_last_of('/'); + if (pos != std::string::npos && pos > 0) { + parent_dir = path.substr(0, pos); + } + } + auto last_dir_scan = start - std::chrono::milliseconds{1000}; + + while (true) { + std::error_code ec; + if (std::filesystem::exists(path, ec)) { + return true; // File exists + } + + auto elapsed = std::chrono::steady_clock::now() - start; + if (elapsed >= timeout) { + return false; // Timeout + } + + // Hint NFS to refresh directory cache: status and occasionally iterate directory + // on parent. Without this remote processes may timeout to spawn because NFS never + // refreshes. + if (!parent_dir.empty()) { + std::ignore = std::filesystem::status(parent_dir, ec); + + auto now = std::chrono::steady_clock::now(); + if (now - last_dir_scan >= std::chrono::milliseconds{500}) { + std::filesystem::directory_iterator it( + parent_dir, + std::filesystem::directory_options::skip_permission_denied, + ec + ); + if (!ec) { + for (; it != std::filesystem::directory_iterator(); ++it) { + std::ignore = + it->path(); // no-op; traversal nudges directory cache + } + } + last_dir_scan = now; + } + } + + // Sleep before next poll + std::this_thread::sleep_for(poll_interval); + + // Exponential backoff up to 100ms + if (poll_interval < std::chrono::milliseconds{100}) { + poll_interval = std::min(poll_interval * 2, std::chrono::milliseconds{100}); + } + } +} + +void FileBackend::write_file(std::string const& path, std::string const& content) { + std::string tmp_path = path + ".tmp." + std::to_string(getpid()); + + // Write to temporary file + std::ofstream ofs(tmp_path, std::ios::binary | std::ios::trunc); + if (!ofs) { + throw std::runtime_error("Failed to open temporary file: " + tmp_path); + } + ofs << content; + ofs.close(); + + // Atomic rename + std::error_code ec; + std::filesystem::rename(tmp_path, path, ec); + if (ec) { + std::error_code rm_ec; + std::filesystem::remove(tmp_path, rm_ec); // Clean up temp file + throw std::runtime_error( + "Failed to rename " + tmp_path + " to " + path + ": " + ec.message() + ); + } +} + +std::string FileBackend::read_file(std::string const& path) { + std::ifstream ifs(path, std::ios::binary); + if (!ifs) { + throw std::runtime_error("Failed to open file for reading: " + path); + } + + std::stringstream buffer; + buffer << ifs.rdbuf(); + return buffer.str(); +} + +void FileBackend::cleanup_coordination_directory() { + // Only rank 0 performs cleanup; other ranks return immediately + if (ctx_.rank != 0) { + return; + } + + // Wait for all other ranks to clean up their alive files + auto cleanup_timeout = std::chrono::seconds{30}; + auto start = std::chrono::steady_clock::now(); + auto poll_interval = std::chrono::milliseconds{100}; + + bool all_ranks_done = false; + while (!all_ranks_done) { + all_ranks_done = true; + + // Check if all other ranks' alive files are gone + for (Rank r = 0; r < ctx_.nranks; ++r) { + if (r == ctx_.rank) { + continue; + } + + std::error_code ec; + std::string alive_path = get_rank_alive_path(r); + if (std::filesystem::exists(alive_path, ec)) { + all_ranks_done = false; + break; + } + } + + if (all_ranks_done) { + break; + } + + // Check timeout + auto elapsed = std::chrono::steady_clock::now() - start; + if (elapsed >= cleanup_timeout) { + std::cerr << "Warning: Timeout waiting for all ranks to finish. " + << "Some alive files may still exist. Proceeding with cleanup." + << std::endl; + break; + } + + // Sleep before next poll + std::this_thread::sleep_for(poll_interval); + } + + // Clean up the entire coordination directory + try { + std::error_code ec; + if (std::filesystem::remove_all(coord_dir_, ec) == 0 && ec) { + std::cerr << "Warning: Failed to remove coordination directory '" + << coord_dir_ << "': " << ec.message() << std::endl; + } + } catch (const std::exception& e) { + std::cerr << "Exception during coordination directory cleanup: " << e.what() + << std::endl; + } +} +} // namespace rapidsmpf::bootstrap::detail diff --git a/cpp/src/bootstrap/ucxx.cpp b/cpp/src/bootstrap/ucxx.cpp new file mode 100644 index 000000000..97832db3f --- /dev/null +++ b/cpp/src/bootstrap/ucxx.cpp @@ -0,0 +1,57 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#ifdef RAPIDSMPF_HAVE_UCXX + +#include +#include + +#include + +#include +#include +#include + +namespace rapidsmpf::bootstrap { + +std::shared_ptr create_ucxx_comm(Backend backend, config::Options options) { + auto ctx = init(backend); + + // Ensure CUDA context is created before UCX is initialized + cudaFree(nullptr); + + std::shared_ptr comm; + + if (ctx.rank == 0) { + // Create root UCXX communicator + auto ucxx_initialized_rank = + ucxx::init(nullptr, ctx.nranks, std::nullopt, options); + comm = std::make_shared(std::move(ucxx_initialized_rank), options); + + // Get the listener address and publish + auto listener_address = comm->listener_address(); + auto root_worker_address_str = + std::get>(listener_address.address) + ->getString(); + put(ctx, "ucxx_root_address", root_worker_address_str); + } else { + // Worker ranks retrieve the root address and connect + auto root_worker_address_str = + get(ctx, "ucxx_root_address", std::chrono::seconds{30}); + auto root_worker_address = + ::ucxx::createAddressFromString(root_worker_address_str); + + auto ucxx_initialized_rank = + ucxx::init(nullptr, ctx.nranks, root_worker_address, options); + comm = std::make_shared(std::move(ucxx_initialized_rank), options); + } + comm->barrier(); + return comm; +} +} // namespace rapidsmpf::bootstrap + +#endif // RAPIDSMPF_HAVE_UCXX diff --git a/cpp/tools/CMakeLists.txt b/cpp/tools/CMakeLists.txt new file mode 100644 index 000000000..540e38d0f --- /dev/null +++ b/cpp/tools/CMakeLists.txt @@ -0,0 +1,24 @@ +# ================================================================================= +# cmake-format: off +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 +# cmake-format: on +# ================================================================================= + +# rrun launcher tool +add_executable(rrun "rrun.cpp") +set_target_properties( + rrun + PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${RAPIDSMPF_BINARY_DIR}/tools" + CXX_STANDARD 20 + CXX_STANDARD_REQUIRED ON +) +target_include_directories(rrun PRIVATE "$") +target_compile_options(rrun PRIVATE "$<$:${RAPIDSMPF_CXX_FLAGS}>") +target_link_libraries(rrun PRIVATE Threads::Threads $ maybe_asan) +install( + TARGETS rrun + COMPONENT tools + DESTINATION bin + EXCLUDE_FROM_ALL +) diff --git a/cpp/tools/example_hostfile.txt b/cpp/tools/example_hostfile.txt new file mode 100644 index 000000000..4bfb759f9 --- /dev/null +++ b/cpp/tools/example_hostfile.txt @@ -0,0 +1,22 @@ +# Example hostfile for rrun multi-node launches +# Format: hostname [slots=N] [gpus=list] +# +# Lines starting with # are comments + +# Example 1: Node with 4 slots and 4 GPUs explicitly listed +# node1 slots=4 gpus=0,1,2,3 + +# Example 2: Node with 4 slots, GPUs will be auto-assigned +# node2 slots=4 + +# Example 3: Node with default 1 slot +# node3 + +# Localhost example for testing (works on single node) +localhost slots=2 gpus=0,1 + +# For actual multi-node deployment, replace with your node names: +# dgx01 slots=8 gpus=0,1,2,3,4,5,6,7 +# dgx02 slots=8 gpus=0,1,2,3,4,5,6,7 +# dgx03 slots=8 gpus=0,1,2,3,4,5,6,7 +# dgx04 slots=8 gpus=0,1,2,3,4,5,6,7 diff --git a/cpp/tools/rrun.cpp b/cpp/tools/rrun.cpp new file mode 100644 index 000000000..93379905d --- /dev/null +++ b/cpp/tools/rrun.cpp @@ -0,0 +1,604 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * @brief Process launcher for multi-GPU applications (single-node). + * + * rrun is a lightweight alternative to mpirun that: + * - Launches multiple processes locally without requiring MPI + * - Automatically assigns GPUs to ranks + * - Provides file-based coordination for inter-process synchronization + * - Tags process output with rank numbers (--tag-output feature) + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +// NOTE: Do not use RAPIDSMPF_EXPECTS or RAPIDSMPF_FAIL in this file. +// Using these macros introduces a CUDA dependency via rapidsmpf/error.hpp. +// Prefer throwing standard exceptions instead. + +namespace { + +static std::mutex output_mutex; + +/** + * @brief Configuration for the rrun launcher. + */ +struct Config { + int nranks{1}; // Total number of ranks + std::string app_binary; // Application binary path + std::vector app_args; // Arguments to pass to application + std::vector gpus; // GPU IDs to use + std::string coord_dir; // Coordination directory + std::map env_vars; // Environment variables to pass + bool verbose{false}; // Verbose output + bool cleanup{true}; // Cleanup coordination directory on exit + bool tag_output{false}; // Tag output with rank number +}; + +/** + * @brief Generate a random session ID for coordination directory. + */ +std::string generate_session_id() { + static char const chars[] = + "0123456789" + "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, sizeof(chars) - 2); + + std::string result; + result.reserve(8); + for (int i = 0; i < 8; ++i) { + result += chars[dis(gen)]; + } + return result; +} + +/** + * @brief Detect available GPUs on the system. + * + * Currently using nvidia-smi to detect GPUs. This may be replaced with NVML in the + * future. + * + * @return Vector of monotonically increasing GPU indices, as observed in nvidia-smi. + */ +std::vector detect_gpus() { + // Use nvidia-smi to detect GPUs + FILE* pipe = + popen("nvidia-smi --query-gpu=index --format=csv,noheader 2>/dev/null", "r"); + if (!pipe) { + std::cerr << "Warning: Could not detect GPUs using nvidia-smi" << std::endl; + return {}; + } + + std::vector gpus; + char buffer[128]; + while (fgets(buffer, sizeof(buffer), pipe) != nullptr) { + int gpu_id; + if (sscanf(buffer, "%d", &gpu_id) == 1) { + gpus.push_back(gpu_id); + } + } + pclose(pipe); + return gpus; +} + +/** + * @brief Print usage information. + */ +void print_usage(std::string_view prog_name) { + std::cout + << "rrun - RapidsMPF Process Launcher\n\n" + << "Usage: " << prog_name << " [options] [app_args...]\n\n" + << "Single-Node Options:\n" + << " -n Number of ranks to launch (required)\n" + << " -g Comma-separated list of GPU IDs (e.g., 0,1,2,3)\n" + << " If not specified, auto-detect available GPUs\n\n" + << "Common Options:\n" + << " -d Coordination directory (default: /tmp/rrun_)\n" + << " --tag-output Tag stdout and stderr with rank number\n" + << " -x, --set-env \n" + << " Set environment variable for all ranks\n" + << " Can be specified multiple times\n" + << " -v Verbose output\n" + << " --no-cleanup Don't cleanup coordination directory on exit\n" + << " -h, --help Display this help message\n\n" + << "Environment Variables:\n" + << " CUDA_VISIBLE_DEVICES is set for each rank based on GPU assignment\n" + << " Additional environment variables can be passed with -x/--set-env\n\n" + << "Single-Node Examples:\n" + << " # Launch 2 ranks with auto-detected GPUs:\n" + << " rrun -n 2 ./bench_comm -C ucxx -O all-to-all\n\n" + << " # Launch 4 ranks on specific GPUs:\n" + << " rrun -n 4 -g 0,1,2,3 ./bench_comm -C ucxx\n\n" + << " # Launch with custom environment variables:\n" + << " rrun -n 2 -x UCX_TLS=cuda_copy,cuda_ipc,rc,tcp -x MY_VAR=value " + "./bench_comm\n\n" + << std::endl; +} + +/** + * @brief Parse GPU list from comma-separated string. + */ +std::vector parse_gpu_list(std::string const& gpu_str) { + std::vector gpus; + std::stringstream ss(gpu_str); + std::string item; + while (std::getline(ss, item, ',')) { + try { + gpus.push_back(std::stoi(item)); + } catch (...) { + throw std::runtime_error("Invalid GPU ID: " + item); + } + } + return gpus; +} + +/** + * @brief Parse command-line arguments. + */ +Config parse_args(int argc, char* argv[]) { + Config cfg; + int i = 1; + while (i < argc) { + std::string arg = argv[i]; + + if (arg == "-h" || arg == "--help") { + print_usage(argv[0]); + exit(0); + } else if (arg == "-n") { + if (i + 1 >= argc) { + throw std::runtime_error("Missing argument for -n"); + } + cfg.nranks = std::stoi(argv[++i]); + if (cfg.nranks <= 0) { + throw std::runtime_error( + "Invalid number of ranks: " + std::to_string(cfg.nranks) + ); + } + } else if (arg == "-g") { + if (i + 1 >= argc) { + throw std::runtime_error("Missing argument for -g"); + } + cfg.gpus = parse_gpu_list(argv[++i]); + } else if (arg == "--tag-output") { + cfg.tag_output = true; + } else if (arg == "-d") { + if (i + 1 >= argc) { + throw std::runtime_error("Missing argument for -d"); + } + cfg.coord_dir = argv[++i]; + } else if (arg == "-x" || arg == "--set-env") { + if (i + 1 >= argc) { + throw std::runtime_error("Missing argument for -x/--set-env"); + } + std::string env_spec = argv[++i]; + auto eq_pos = env_spec.find('='); + if (eq_pos == std::string::npos) { + throw std::runtime_error( + "Invalid environment variable format: " + env_spec + + ". Expected VAR=value" + ); + } + std::string var_name = env_spec.substr(0, eq_pos); + std::string var_value = env_spec.substr(eq_pos + 1); + if (var_name.empty()) { + throw std::runtime_error("Empty environment variable name"); + } + cfg.env_vars[var_name] = var_value; + } else if (arg == "-v") { + cfg.verbose = true; + } else if (arg == "--no-cleanup") { + cfg.cleanup = false; + } else if (arg[0] == '-') { + throw std::runtime_error("Unknown option: " + arg); + } else { + // First non-option argument is the application binary + cfg.app_binary = arg; + // Rest are application arguments + for (int j = i + 1; j < argc; ++j) { + cfg.app_args.push_back(argv[j]); + } + break; + } + ++i; + } + + // Validate configuration + if (cfg.app_binary.empty()) { + throw std::runtime_error("Missing application binary"); + } + + // Single-node mode validation + if (cfg.nranks <= 0) { + throw std::runtime_error("Number of ranks (-n) must be specified and positive"); + } + + // Auto-detect GPUs if not specified + if (cfg.gpus.empty()) { + cfg.gpus = detect_gpus(); + if (cfg.gpus.empty()) { + std::cerr + << "Warning: No GPUs detected. CUDA_VISIBLE_DEVICES will not be set." + << std::endl; + } + } + + // Validate GPU count vs rank count + if (!cfg.gpus.empty() && cfg.nranks > static_cast(cfg.gpus.size())) { + std::cerr << "Warning: Number of ranks (" << cfg.nranks + << ") exceeds number of GPUs (" << cfg.gpus.size() + << "). Multiple ranks will share GPUs." << std::endl; + } + + // Generate coordination directory if not specified + if (cfg.coord_dir.empty()) { + cfg.coord_dir = "/tmp/rrun_" + generate_session_id(); + } + + return cfg; +} + +/** + * @brief Helper to fork a child with stdout/stderr redirected to pipes. + * + * @param out_fd_stdout The file descriptor for stdout. + * @param out_fd_stderr The file descriptor for stderr. + * @param combine_stderr If true, stderr is redirected to stdout pipe. + * @param child_body The function to execute in the child process. Must not throw, must + * not return. Must only call exit() or _exit() if an error occurs. + * @returns Child pid. + */ +pid_t fork_with_piped_stdio( + int* out_fd_stdout, + int* out_fd_stderr, + bool combine_stderr, + std::function child_body +) { + if (out_fd_stdout) + *out_fd_stdout = -1; + if (out_fd_stderr) + *out_fd_stderr = -1; + + int pipe_out[2] = {-1, -1}; + int pipe_err[2] = {-1, -1}; + if (pipe(pipe_out) < 0) + throw std::runtime_error( + "Failed to create stdout pipe: " + std::string{std::strerror(errno)} + ); + if (!combine_stderr) { + if (pipe(pipe_err) < 0) { + close(pipe_out[0]); + close(pipe_out[1]); + throw std::runtime_error( + "Failed to create stderr pipe: " + std::string{std::strerror(errno)} + ); + } + } + + pid_t pid = fork(); + if (pid < 0) { + close(pipe_out[0]); + close(pipe_out[1]); + if (!combine_stderr) { + close(pipe_err[0]); + close(pipe_err[1]); + } + throw std::runtime_error("Failed to fork: " + std::string{std::strerror(errno)}); + } else if (pid == 0) { + // Child: redirect stdout/stderr + std::ignore = dup2(pipe_out[1], STDOUT_FILENO); + std::ignore = dup2(combine_stderr ? pipe_out[1] : pipe_err[1], STDERR_FILENO); + close(pipe_out[0]); + close(pipe_out[1]); + if (!combine_stderr) { + close(pipe_err[0]); + close(pipe_err[1]); + } + + // Unbuffered output + setvbuf(stdout, nullptr, _IONBF, 0); + setvbuf(stderr, nullptr, _IONBF, 0); + + // Execute child body (should not return) + child_body(); + _exit(127); + } + + // Parent: return read fds + close(pipe_out[1]); + if (out_fd_stdout) + *out_fd_stdout = pipe_out[0]; + else + close(pipe_out[0]); + if (!combine_stderr) { + close(pipe_err[1]); + if (out_fd_stderr) + *out_fd_stderr = pipe_err[0]; + else + close(pipe_err[0]); + } + return pid; +} + +/** + * @brief Launch a single rank locally (fork-based). + */ +pid_t launch_rank_local( + Config const& cfg, int rank, int* out_fd_stdout, int* out_fd_stderr +) { + return fork_with_piped_stdio( + out_fd_stdout, + out_fd_stderr, + /*combine_stderr*/ false, + [&cfg, rank]() { + // Set custom environment variables first (can be overridden by specific vars) + for (auto const& env_pair : cfg.env_vars) { + setenv(env_pair.first.c_str(), env_pair.second.c_str(), 1); + } + + // Set environment variables + setenv("RAPIDSMPF_RANK", std::to_string(rank).c_str(), 1); + setenv("RAPIDSMPF_NRANKS", std::to_string(cfg.nranks).c_str(), 1); + setenv("RAPIDSMPF_COORD_DIR", cfg.coord_dir.c_str(), 1); + + // Set CUDA_VISIBLE_DEVICES if GPUs are available + if (!cfg.gpus.empty()) { + int gpu_id = cfg.gpus[static_cast(rank) % cfg.gpus.size()]; + setenv("CUDA_VISIBLE_DEVICES", std::to_string(gpu_id).c_str(), 1); + } + + // Prepare arguments for execvp + std::vector exec_args; + exec_args.push_back(const_cast(cfg.app_binary.c_str())); + for (auto const& arg : cfg.app_args) { + exec_args.push_back(const_cast(arg.c_str())); + } + exec_args.push_back(nullptr); + + execvp(cfg.app_binary.c_str(), exec_args.data()); + std::cerr << "Failed to execute " << cfg.app_binary << ": " + << std::strerror(errno) << std::endl; + _exit(1); + } + ); +} + +/** + * @brief Wait for all child processes and check their exit status. + */ +int wait_for_ranks(std::vector const& pids) { + int overall_status = 0; + + for (size_t i = 0; i < pids.size(); ++i) { + int status; + while (true) { + pid_t result = waitpid(pids[i], &status, 0); + + if (result < 0) { + if (errno == EINTR) { + // Retry waitpid for the same pid + continue; + } + std::cerr << "Error waiting for rank " << i << ": " + << std::strerror(errno) << std::endl; + overall_status = 1; + break; + } + + if (WIFEXITED(status)) { + int exit_code = WEXITSTATUS(status); + if (exit_code != 0) { + std::cerr << "Rank " << i << " (PID " << pids[i] + << ") exited with code " << exit_code << std::endl; + overall_status = exit_code; + } + } else if (WIFSIGNALED(status)) { + int signal = WTERMSIG(status); + std::cerr << "Rank " << i << " (PID " << pids[i] + << ") terminated by signal " << signal << std::endl; + overall_status = 128 + signal; + } + break; + } + } + + return overall_status; +} +} // namespace + +int main(int argc, char* argv[]) { + try { + // Parse arguments + Config cfg = parse_args(argc, argv); + + if (cfg.verbose) { + std::cout << "rrun configuration:\n"; + std::cout << " Mode: Single-node\n" + << " GPUs: "; + if (cfg.gpus.empty()) { + std::cout << "(none)\n"; + } else { + for (size_t i = 0; i < cfg.gpus.size(); ++i) { + if (i > 0) + std::cout << ", "; + std::cout << cfg.gpus[i]; + } + std::cout << "\n"; + } + if (cfg.tag_output) { + std::cout << " Tag Output: Yes\n"; + } + std::cout << " Ranks: " << cfg.nranks << "\n" + << " Application: " << cfg.app_binary << "\n" + << " Coord Dir: " << cfg.coord_dir << "\n" + << " Cleanup: " << (cfg.cleanup ? "yes" : "no") << "\n"; + if (!cfg.env_vars.empty()) { + std::cout << " Env Vars: "; + bool first = true; + for (auto const& env_pair : cfg.env_vars) { + if (!first) + std::cout << " "; + std::cout << env_pair.first << "=" << env_pair.second << "\n"; + first = false; + } + } + std::cout << std::endl; + } + + std::filesystem::create_directories(cfg.coord_dir); + + std::vector pids; + pids.reserve(static_cast(cfg.nranks)); + + // Block SIGINT/SIGTERM in this thread; a dedicated thread will handle them. + sigset_t signal_set; + sigemptyset(&signal_set); + sigaddset(&signal_set, SIGINT); + sigaddset(&signal_set, SIGTERM); + sigprocmask(SIG_BLOCK, &signal_set, nullptr); + + // Output suppression flag and forwarder threads + auto suppress_output = std::make_shared>(false); + std::vector forwarders; + forwarders.reserve(static_cast(cfg.nranks) * 2); + + // Helper to start a forwarder thread for a given fd + auto start_forwarder = [&](int fd, int rank, bool to_stderr) { + if (fd < 0) { + return; + } + forwarders.emplace_back([fd, rank, to_stderr, &cfg, suppress_output]() { + FILE* stream = fdopen(fd, "r"); + if (!stream) { + close(fd); + return; + } + std::string tag = + cfg.tag_output ? ("[" + std::to_string(rank) + "] ") : std::string{}; + char buffer[4096]; + while (fgets(buffer, sizeof(buffer), stream) != nullptr) { + if (suppress_output->load(std::memory_order_relaxed)) { + // Discard further lines after suppression + continue; + } + FILE* out = to_stderr ? stderr : stdout; + { + std::lock_guard lock(output_mutex); + if (!tag.empty()) { + fputs(tag.c_str(), out); + } + fputs(buffer, out); + fflush(out); + } + } + fclose(stream); + }); + }; + + // Single-node local mode + for (int rank = 0; rank < cfg.nranks; ++rank) { + int fd_out = -1; + int fd_err = -1; + pid_t pid = launch_rank_local(cfg, rank, &fd_out, &fd_err); + pids.push_back(pid); + + if (cfg.verbose) { + std::cout << "Launched rank " << rank << " (PID " << pid << ")"; + if (!cfg.gpus.empty()) { + std::cout << " on GPU " + << cfg.gpus[static_cast(rank) % cfg.gpus.size()]; + } + std::cout << std::endl; + } + // Parent-side forwarders for local stdout and stderr + start_forwarder(fd_out, rank, false); + start_forwarder(fd_err, rank, true); + } + + // Start a signal-waiting thread to forward signals. + std::thread([signal_set, &pids, suppress_output]() mutable { + for (;;) { + int sig = 0; + int rc = sigwait(&signal_set, &sig); + if (rc != 0) { + return; + } + // Stop printing further output immediately + suppress_output->store(true, std::memory_order_relaxed); + // Forward signal to all local children + for (pid_t pid : pids) { + std::ignore = kill(pid, sig); + } + } + }).detach(); + + if (cfg.verbose) { + std::cout << "\nAll ranks launched. Waiting for completion...\n" << std::endl; + } + + // Wait for all ranks to complete + int exit_status = wait_for_ranks(pids); + + // Join forwarders before cleanup + for (auto& th : forwarders) { + if (th.joinable()) { + th.join(); + } + } + + if (cfg.cleanup) { + if (cfg.verbose) { + std::cout << "Cleaning up coordination directory: " << cfg.coord_dir + << std::endl; + } + std::error_code ec; + std::filesystem::remove_all(cfg.coord_dir, ec); + if (ec) { + std::cerr << "Warning: Failed to cleanup directory: " << cfg.coord_dir + << ": " << ec.message() << std::endl; + } + } else if (cfg.verbose) { + std::cout << "Coordination directory preserved: " << cfg.coord_dir + << std::endl; + } + + if (cfg.verbose && exit_status == 0) { + std::cout << "\nAll ranks completed successfully." << std::endl; + } + + return exit_status; + + } catch (std::exception const& e) { + std::cerr << "Error: " << e.what() << std::endl; + std::cerr << "Run with -h or --help for usage information." << std::endl; + return 1; + } +} diff --git a/python/librapidsmpf/pyproject.toml b/python/librapidsmpf/pyproject.toml index abfb80a71..6135a0992 100644 --- a/python/librapidsmpf/pyproject.toml +++ b/python/librapidsmpf/pyproject.toml @@ -61,6 +61,7 @@ sdist.reproducible = true wheel.packages = ["librapidsmpf"] wheel.install-dir = "librapidsmpf" wheel.py-api = "py3" +install.components = ["Unspecified", "rapidsmpf", "tools"] [tool.scikit-build.metadata.version] provider = "scikit_build_core.metadata.regex"