Merged

Commits (66)
39c9cf9
Add blastoff
pentschev Oct 17, 2025
63fe70f
Multi-node support via SSH
pentschev Oct 30, 2025
fffc5a5
Add support to pass environment variables to spawned processes
pentschev Oct 30, 2025
593ab52
Disable output buffering
pentschev Oct 30, 2025
1f5aa97
Remove `initialized` synchronization file that causes deadlocks
pentschev Oct 30, 2025
700b5c0
Periodically refresh NFS directory cache
pentschev Oct 30, 2025
5a83c37
Add --tag-output support
pentschev Oct 30, 2025
adc5eeb
Support terminating remote SSH child processes
pentschev Oct 30, 2025
a2fac42
Omit verbose `sh kill` messages
pentschev Oct 30, 2025
2e05013
Fix broken early termination
pentschev Oct 30, 2025
e6b749b
Acknowledge terminate request
pentschev Oct 30, 2025
88ce281
Use std::chrono::milliseconds instead of int
pentschev Oct 31, 2025
629aa18
Use string_view where possible
pentschev Oct 31, 2025
71c87e2
Use std::filesystem::create_directories
pentschev Oct 31, 2025
696f3c6
Use rename and remove from std::filesystem
pentschev Oct 31, 2025
deba42a
Use std::filesystem to attempt refreshing NFS directories
pentschev Oct 31, 2025
decb3e8
Remove remove_dir_recursive/create_coord_dir in favor of std::filesystem
pentschev Oct 31, 2025
509b627
Replace usleep with sleep_for
pentschev Oct 31, 2025
0f9887c
Use dedicated signal handling thread
pentschev Oct 31, 2025
6c9be66
Close stdout/stderr pipes when termination begins
pentschev Oct 31, 2025
9c880ac
Fix per-line atomicity of output
pentschev Oct 31, 2025
6a38dde
Add rrun smoketests
pentschev Oct 31, 2025
fb1c1e9
Remove SSH support
pentschev Oct 31, 2025
1ec2650
Move UCXX implementation to new files, fix compile-time checks
pentschev Oct 31, 2025
6126103
Fix missing nodiscard
pentschev Oct 31, 2025
839973c
Update cpp/src/bootstrap/file_backend.cpp
KyleFromNVIDIA Oct 31, 2025
12b3947
Update cpp/tools/CMakeLists.txt
KyleFromNVIDIA Oct 31, 2025
8680224
Add and use Conda recipe for tools
KyleFromNVIDIA Oct 31, 2025
e4ae979
Fix header
KyleFromNVIDIA Oct 31, 2025
7342798
Remove ignore_run_exports
KyleFromNVIDIA Oct 31, 2025
1a3aae9
Revert "Remove ignore_run_exports"
KyleFromNVIDIA Oct 31, 2025
b88f010
Revert "Add and use Conda recipe for tools"
KyleFromNVIDIA Oct 31, 2025
07b7e3c
Put tools in librapidsmpf
KyleFromNVIDIA Oct 31, 2025
4aaa231
Merge remote-tracking branch 'upstream/main' into rrun
pentschev Oct 31, 2025
aa9eefc
Unify ucxx-bootstrap and ucxx
pentschev Oct 31, 2025
9204eaf
Install tools into librapidsmpf wheel
KyleFromNVIDIA Nov 3, 2025
6afb08a
Bring namespace open/close bracket closer to format
pentschev Nov 3, 2025
51d0f14
rapidsmpf component
KyleFromNVIDIA Nov 3, 2025
61ecf6a
Use Duration alias
pentschev Nov 3, 2025
8df332d
Simplify backend selection code
pentschev Nov 3, 2025
615c5ee
Fix older mentions
pentschev Nov 3, 2025
97ef6d5
Remove unnecessary break statement
pentschev Nov 3, 2025
9ba0e3d
Print FileBackend destructor errors to stderr
pentschev Nov 3, 2025
40465cc
Cleanup temporary directory during FileBackend destructor
pentschev Nov 3, 2025
2cd918f
Further cleanups
pentschev Nov 3, 2025
467a861
Merge remote-tracking branch 'origin/rrun' into rrun
pentschev Nov 3, 2025
28e9969
Merge remote-tracking branch 'upstream/main' into rrun
pentschev Nov 3, 2025
b6aba18
Document return of generate_session_id
pentschev Nov 3, 2025
db112cb
Use RAPIDSMPF_{EXPECTS,FAIL}
pentschev Nov 3, 2025
73fde7d
Use std::ignore instead of void cast
pentschev Nov 4, 2025
8ee9525
Use seconds instead of milliseconds as defaults
pentschev Nov 4, 2025
3aa29d9
Merge remote-tracking branch 'origin/rrun' into rrun
pentschev Nov 4, 2025
22e4742
Use Duration in wait_for_file
pentschev Nov 4, 2025
7df3104
Linting
pentschev Nov 4, 2025
d9fff6d
More linting
pentschev Nov 4, 2025
4926172
Clarify GPU indices
pentschev Nov 4, 2025
a02ffd5
Fix linting
pentschev Nov 4, 2025
1b7c558
Merge branch 'main' into rrun
pentschev Nov 4, 2025
a9563ad
More code formatting changes
pentschev Nov 4, 2025
e190b68
Merge branch 'main' into rrun
pentschev Nov 4, 2025
6325b32
Fix build errors
pentschev Nov 4, 2025
eafe45e
Fix build error attempt 2
pentschev Nov 4, 2025
9ff9d3a
Formatting
pentschev Nov 4, 2025
51e010c
Revert "Use RAPIDSMPF_{EXPECTS,FAIL}"
pentschev Nov 4, 2025
df05f11
Replace other uses of RAPIDSMPF_{EXPECTS,FAIL} with throw
pentschev Nov 4, 2025
55757a4
Add note on not using `RAPIDSMPF_{EXPECTS,FAIL}`
pentschev Nov 4, 2025
18 changes: 18 additions & 0 deletions README.md
@@ -80,6 +80,24 @@ The UCX test suite uses, for convenience, MPI to bootstrap, therefore we need to
mpirun -np 2 cpp/build/gtests/ucxx_tests
```

### rrun - Distributed Launcher

RapidsMPF includes `rrun`, a lightweight launcher that eliminates the MPI dependency for multi-GPU workloads. This is particularly useful for development, testing, and environments where MPI is not available.

#### Single-Node Usage

```bash
# Build rrun
cd cpp/build
cmake --build . --target rrun

# Launch 2 ranks on the local node
./tools/rrun -n 2 ./benchmarks/bench_comm -C ucxx -O all-to-all

# With verbose output and specific GPUs
./tools/rrun -v -n 4 -g 0,1,2,3 ./benchmarks/bench_comm -C ucxx
```

## Algorithms
### Table Shuffle Service
Example of an MPI program that uses the shuffler:
12 changes: 12 additions & 0 deletions ci/run_cpp_benchmark_smoketests.sh
@@ -40,3 +40,15 @@ for i in {0..2}; do
exit 1
fi
done

# Test with rrun

# Confirm no dependencies on OpenMPI variables
unset OMPI_ALLOW_RUN_AS_ROOT
unset OMPI_ALLOW_RUN_AS_ROOT_CONFIRM
unset OMPI_MCA_opal_cuda_support

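# -g 0,0,0 places all three ranks on GPU 0, so this smoketest needs only a single GPU.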
python "${TIMEOUT_TOOL_PATH}" 30 \
rrun -n 3 -g 0,0,0 ./bench_comm -m cuda -C ucxx
python "${TIMEOUT_TOOL_PATH}" 30 \
rrun --tag-output -n 3 -g 0,0,0 ./bench_comm -m cuda -C ucxx
1 change: 1 addition & 0 deletions conda/recipes/librapidsmpf/recipe.yaml
@@ -74,6 +74,7 @@ outputs:
content: |
cmake --install cpp/build
cmake --install cpp/build --component=benchmarking
cmake --install cpp/build --component=tools
dynamic_linking:
overlinking_behavior: "error"
prefix_detection:
9 changes: 8 additions & 1 deletion cpp/CMakeLists.txt
@@ -155,6 +155,8 @@ target_link_options(maybe_asan INTERFACE "$<$<BOOL:${RAPIDSMPF_ASAN}>:-fsanitize
add_library(
rapidsmpf
src/allgather/allgather.cpp
src/bootstrap/bootstrap.cpp
src/bootstrap/file_backend.cpp
src/buffer/buffer.cpp
src/buffer/pinned_memory_resource.cu
src/buffer/resource.cpp
@@ -189,7 +191,7 @@ if(RAPIDSMPF_HAVE_STREAMING)
)
endif()
if(RAPIDSMPF_HAVE_UCXX)
target_sources(rapidsmpf PRIVATE src/communicator/ucxx.cpp)
target_sources(rapidsmpf PRIVATE src/bootstrap/ucxx.cpp src/communicator/ucxx.cpp)
endif()
if(RAPIDSMPF_HAVE_MPI)
target_sources(rapidsmpf PRIVATE src/communicator/mpi.cpp)
@@ -323,6 +325,11 @@ if(RAPIDSMPF_BUILD_EXAMPLES)
add_subdirectory(examples)
endif()

# ##################################################################################################
# * add tools
# -------------------------------------------------------------------------------------
add_subdirectory(tools)

# ##################################################################################################
# * install targets -------------------------------------------------------------------------------
rapids_cmake_install_lib_dir(lib_dir)
93 changes: 70 additions & 23 deletions cpp/benchmarks/bench_comm.cpp
@@ -8,6 +8,8 @@

#include <mpi.h>

#include <rapidsmpf/bootstrap/bootstrap.hpp>
#include <rapidsmpf/bootstrap/ucxx.hpp>
#include <rapidsmpf/communicator/communicator.hpp>
#include <rapidsmpf/communicator/mpi.hpp>
#include <rapidsmpf/communicator/ucxx_utils.hpp>
@@ -27,12 +29,15 @@ using namespace rapidsmpf;

class ArgumentParser {
public:
ArgumentParser(int argc, char* const* argv) {
RAPIDSMPF_EXPECTS(mpi::is_initialized() == true, "MPI is not initialized");

int rank, nranks;
RAPIDSMPF_MPI(MPI_Comm_rank(MPI_COMM_WORLD, &rank));
RAPIDSMPF_MPI(MPI_Comm_size(MPI_COMM_WORLD, &nranks));
ArgumentParser(int argc, char* const* argv, bool use_mpi = true) {
int rank = 0;
int nranks = 1;

if (use_mpi) {
RAPIDSMPF_EXPECTS(mpi::is_initialized() == true, "MPI is not initialized");
RAPIDSMPF_MPI(MPI_Comm_rank(MPI_COMM_WORLD, &rank));
RAPIDSMPF_MPI(MPI_Comm_size(MPI_COMM_WORLD, &nranks));
}

try {
int option;
@@ -44,6 +49,8 @@
ss << "Usage: " << argv[0] << " [options]\n"
<< "Options:\n"
<< " -C <comm> Communicator {mpi, ucxx} (default: mpi)\n"
<< " ucxx automatically detects launcher (mpirun "
"or rrun)\n"
<< " -O <op> Operation {all-to-all} (default: "
"all-to-all)\n"
<< " -n <num> Message size in bytes (default: 1M)\n"
@@ -63,7 +70,11 @@
if (rank == 0) {
std::cerr << ss.str();
}
RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, 0));
if (use_mpi) {
RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, 0));
} else {
std::exit(0);
}
}
break;
case 'C':
@@ -73,7 +84,11 @@
std::cerr << "-C (Communicator) must be one of {mpi, ucxx}"
<< std::endl;
}
RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, -1));
if (use_mpi) {
RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, -1));
} else {
std::exit(-1);
}
}
break;
case 'O':
@@ -114,7 +129,11 @@
break;
#endif
case '?':
RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, -1));
if (use_mpi) {
RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, -1));
} else {
std::exit(-1);
}
break;
default:
RAPIDSMPF_FAIL("unknown option", std::invalid_argument);
@@ -127,7 +146,11 @@
if (rank == 0) {
std::cerr << "Error parsing arguments: " << e.what() << std::endl;
}
RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, -1));
if (use_mpi) {
RAPIDSMPF_MPI(MPI_Abort(MPI_COMM_WORLD, -1));
} else {
std::exit(-1);
}
}

if (rmm_mr == "cuda") {
@@ -229,27 +252,49 @@ Duration run(
}

int main(int argc, char** argv) {
// Explicitly initialize MPI with thread support, as this is needed for both mpi and
// ucxx communicators.
int provided;
RAPIDSMPF_MPI(MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided));

RAPIDSMPF_EXPECTS(
provided == MPI_THREAD_MULTIPLE,
"didn't get the requested thread level support: MPI_THREAD_MULTIPLE"
);
// Check whether we should use bootstrap mode with rrun. This is determined by
// checking for the RAPIDSMPF_RANK environment variable.
bool use_bootstrap = std::getenv("RAPIDSMPF_RANK") != nullptr;

int provided = 0;
if (!use_bootstrap) {
// Explicitly initialize MPI with thread support, as this is needed for both mpi
// and ucxx communicators.
RAPIDSMPF_MPI(MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided));

RAPIDSMPF_EXPECTS(
provided == MPI_THREAD_MULTIPLE,
"didn't get the requested thread level support: MPI_THREAD_MULTIPLE"
);
}

ArgumentParser args{argc, argv};
ArgumentParser args{argc, argv, !use_bootstrap};

// Initialize configuration options from environment variables.
rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables()};

std::shared_ptr<Communicator> comm;
if (args.comm_type == "mpi") {
if (use_bootstrap) {
std::cerr << "Error: MPI communicator requires MPI initialization. "
<< "Don't use with rrun or unset RAPIDSMPF_RANK." << std::endl;
return 1;
}
mpi::init(&argc, &argv);
comm = std::make_shared<MPI>(MPI_COMM_WORLD, options);
} else { // ucxx
comm = rapidsmpf::ucxx::init_using_mpi(MPI_COMM_WORLD, options);
} else if (args.comm_type == "ucxx") {
if (use_bootstrap) {
// Launched with rrun - use bootstrap backend
comm = rapidsmpf::bootstrap::create_ucxx_comm(
rapidsmpf::bootstrap::Backend::AUTO, options
);
} else {
// Launched with mpirun - use MPI bootstrap
comm = rapidsmpf::ucxx::init_using_mpi(MPI_COMM_WORLD, options);
}
} else {
std::cerr << "Error: Unknown communicator type: " << args.comm_type << std::endl;
return 1;
}

auto& log = comm->logger();
@@ -350,6 +395,8 @@ int main(int argc, char** argv) {
}
#endif

RAPIDSMPF_MPI(MPI_Finalize());
if (!use_bootstrap) {
RAPIDSMPF_MPI(MPI_Finalize());
}
return 0;
}
139 changes: 139 additions & 0 deletions cpp/include/rapidsmpf/bootstrap/bootstrap.hpp
@@ -0,0 +1,139 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <chrono>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>

#include <rapidsmpf/config.hpp>

namespace rapidsmpf {

namespace bootstrap {
Review comment (Member), suggested change:
Replace the nested `namespace rapidsmpf {` / `namespace bootstrap {` with `namespace rapidsmpf::bootstrap {`.
// Rank type (matches communicator::Rank)
using Rank = std::int32_t;

/**
* @brief Backend types for process coordination and bootstrapping.
*/
enum class Backend {
/**
* @brief Automatically detect the best backend based on environment.
*
* Detection order:
* 1. File-based (default fallback)
*/
AUTO,

/**
* @brief File-based coordination using a shared directory.
*
* Uses filesystem for rank coordination and address exchange. Works on single-node
* and multi-node with shared storage (e.g., NFS) via SSH. Requires RAPIDSMPF_RANK,
* RAPIDSMPF_NRANKS, RAPIDSMPF_COORD_DIR environment variables.
*/
FILE,
};

/**
* @brief Context information for the current process/rank.
*
* This structure contains the rank assignment and total rank count,
* along with additional metadata about the execution environment.
*/
struct Context {
/** @brief This process's rank (0-indexed). */
Rank rank;

/** @brief Total number of ranks in the job. */
Rank nranks;

/** @brief Backend used for coordination. */
Backend backend;

/** @brief Coordination directory (for FILE backend). */
std::optional<std::string> coord_dir;
};

/**
* @brief Initialize the bootstrap context from environment variables.
*
* This function reads environment variables to determine rank, nranks, and
* backend configuration. It should be called early in the application lifecycle.
*
* Environment variables checked (in order of precedence):
* - RAPIDSMPF_RANK: Explicitly set rank
* - RAPIDSMPF_NRANKS: Explicitly set total rank count
* - RAPIDSMPF_COORD_DIR: File-based coordination directory
*
* @param backend Backend to use (default: AUTO for auto-detection).
* @return Context object containing rank and coordination information.
* @throws std::runtime_error if environment is not properly configured.
*
* @code
* auto ctx = rapidsmpf::bootstrap::init();
* std::cout << "I am rank " << ctx.rank << " of " << ctx.nranks << std::endl;
* @endcode
*/
Context init(Backend backend = Backend::AUTO);

/**
* @brief Broadcast data from root rank to all other ranks.
*
* This is a helper function for broadcasting small amounts of data during
* bootstrapping. It uses the underlying backend's coordination mechanism.
*
* @param ctx Bootstrap context.
* @param data Data buffer to broadcast (both input on root, output on others).
* @param size Size of data in bytes.
* @param root Root rank performing the broadcast (default: 0).
*/
void broadcast(Context const& ctx, void* data, std::size_t size, Rank root = 0);

/**
* @brief Perform a barrier synchronization across all ranks.
*
* This ensures all ranks reach this point before any rank proceeds.
*
* @param ctx Bootstrap context.
*/
void barrier(Context const& ctx);

/**
* @brief Store a key-value pair in the coordination backend.
*
* This is useful for custom coordination beyond UCXX address exchange.
*
* @param ctx Bootstrap context.
* @param key Key name.
* @param value Value to store.
*/
void put(Context const& ctx, std::string const& key, std::string const& value);

/**
* @brief Retrieve a value from the coordination backend.
*
* This function blocks until the key is available or timeout occurs.
*
* @param ctx Bootstrap context.
* @param key Key name to retrieve.
* @param timeout Timeout duration.
* @return Value associated with the key.
* @throws std::runtime_error if key not found within timeout.
*/
std::string get(
Context const& ctx,
std::string const& key,
std::chrono::milliseconds timeout = std::chrono::milliseconds{30000}
Review comment (Member): Use `Duration` from `#include <rapidsmpf/utils.hpp>`.
Suggested change: `std::chrono::milliseconds timeout = std::chrono::milliseconds{30000}` → `Duration timeout = std::chrono::seconds{30}`.

Reply (Member, Author): Changed to `Duration`, but I think we need something more fine-grained than seconds; we might need to express values smaller than a second when implementing other backends.
);

} // namespace bootstrap

} // namespace rapidsmpf
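
As a reading aid, here is a minimal sketch of how the entry points declared above compose. It assumes the binary is launched by `rrun` (so `RAPIDSMPF_RANK`, `RAPIDSMPF_NRANKS`, and `RAPIDSMPF_COORD_DIR` are set for the FILE backend) and is compiled and linked against librapidsmpf; it is illustrative only, not code from this PR.

```cpp
#include <chrono>
#include <iostream>
#include <string>

#include <rapidsmpf/bootstrap/bootstrap.hpp>

int main() {
    namespace bs = rapidsmpf::bootstrap;

    // Reads RAPIDSMPF_RANK, RAPIDSMPF_NRANKS, and (for the FILE backend)
    // RAPIDSMPF_COORD_DIR from the environment.
    bs::Context ctx = bs::init(bs::Backend::AUTO);

    // Broadcast a small fixed-size value from rank 0 to all ranks.
    int magic = (ctx.rank == 0) ? 42 : 0;
    bs::broadcast(ctx, &magic, sizeof(magic), 0);

    // Key-value exchange: rank 0 publishes, the other ranks block in get()
    // until the key appears or the timeout expires.
    if (ctx.rank == 0) {
        bs::put(ctx, "greeting", "hello from rank 0");
    }
    std::string greeting = bs::get(ctx, "greeting", std::chrono::seconds{10});

    // Ensure every rank has read the key before any rank exits.
    bs::barrier(ctx);

    std::cout << "rank " << ctx.rank << "/" << ctx.nranks << ": " << greeting
              << " (magic=" << magic << ")" << std::endl;
    return 0;
}
```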