Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ option(BUILD_SHARED_LIBS "Build RapidsMPF shared library" ON)
option(CUDA_STATIC_RUNTIME "Statically link the CUDA runtime" OFF)
option(RAPIDSMPF_CLANG_TIDY "Enable clang-tidy during compilation" OFF)
option(RAPIDSMPF_ASAN "Enable AddressSanitizer" OFF)
option(RAPIDSMPF_VERBOSE_INFO "Enable detail mode" OFF)

message(STATUS "librapidsmpf build options:")
message(STATUS " BUILD_MPI_SUPPORT : ${BUILD_MPI_SUPPORT}")
Expand All @@ -67,6 +68,7 @@ message(STATUS " BUILD_SHARED_LIBS : ${BUILD_SHARED_LIBS}")
message(STATUS " CUDA_STATIC_RUNTIME : ${CUDA_STATIC_RUNTIME}")
message(STATUS " RAPIDSMPF_CLANG_TIDY : ${RAPIDSMPF_CLANG_TIDY}")
message(STATUS " RAPIDSMPF_ASAN : ${RAPIDSMPF_ASAN}")
message(STATUS " RAPIDSMPF_VERBOSE_INFO : ${RAPIDSMPF_VERBOSE_INFO}")

# Copy options to our prefix to prevent upstream projects from modifying them.
set(RAPIDSMPF_HAVE_MPI ${BUILD_MPI_SUPPORT})
Expand Down Expand Up @@ -247,6 +249,7 @@ target_compile_definitions(
$<$<BOOL:${RAPIDSMPF_HAVE_STREAMING}>:RAPIDSMPF_HAVE_STREAMING>
$<$<BOOL:${RAPIDSMPF_HAVE_CUPTI}>:RAPIDSMPF_HAVE_CUPTI>
$<$<BOOL:${RAPIDSMPF_HAVE_NUMA}>:RAPIDSMPF_HAVE_NUMA>
$<$<BOOL:${RAPIDSMPF_VERBOSE_INFO}>:RAPIDSMPF_VERBOSE_INFO>
)

rapids_cuda_set_runtime(rapidsmpf USE_STATIC ${CUDA_STATIC_RUNTIME})
Expand Down
27 changes: 27 additions & 0 deletions cpp/include/rapidsmpf/nvtx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,33 @@ struct rapidsmpf_domain {
*/
#define RAPIDSMPF_NVTX_SCOPED_RANGE(...) RAPIDSMPF_NVTX_SCOPED_RANGE_IMPL(__VA_ARGS__)

/**
 * @brief Convenience macro for generating an NVTX scoped range in the `rapidsmpf` domain
 * that is only active in verbose builds.
 *
 * When `RAPIDSMPF_VERBOSE_INFO` is defined to a non-zero value at compile time, this
 * macro forwards its arguments unchanged to `RAPIDSMPF_NVTX_SCOPED_RANGE` and therefore
 * behaves identically to it. Otherwise (flag undefined or defined as 0) the macro
 * expands to nothing.
 *
 * @note In the disabled case the arguments are discarded by the preprocessor and are
 * never evaluated. Do not pass expressions whose side effects the surrounding code
 * relies on (e.g. `counter++`) unless that state is itself only used in verbose builds.
 *
 * Usage:
 * - `RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE(message)` - Annotate with message only
 * - `RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE(message, payload)` - Annotate with message and
 *   payload
 *
 * Example:
 * ```
 * void some_function(){
 *     RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE("detailed operation");
 *     RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE("detailed operation", count);
 *     ...
 * }
 * ```
 */
// `defined(...) &&` keeps the on/off result of a plain `#if RAPIDSMPF_VERBOSE_INFO`
// while avoiding evaluation of an undefined identifier (which `-Wundef` reports).
#if defined(RAPIDSMPF_VERBOSE_INFO) && RAPIDSMPF_VERBOSE_INFO
#define RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE(...) RAPIDSMPF_NVTX_SCOPED_RANGE(__VA_ARGS__)
#else
// Verbose annotations disabled: expand to nothing, dropping the arguments unevaluated.
#define RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE(...)
#endif

#define RAPIDSMPF_NVTX_MARKER_IMPL(msg, val) \
nvtx3::mark_in<rapidsmpf_domain>(nvtx3::event_attributes{ \
RAPIDSMPF_REGISTER_STRING(msg), nvtx3::payload{convert_to_64bit(val)} \
Expand Down
24 changes: 18 additions & 6 deletions cpp/src/shuffler/shuffler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class Shuffler::Progress {
* @return The progress state of the shuffler.
*/
ProgressThread::ProgressState operator()() {
RAPIDSMPF_NVTX_SCOPED_RANGE("Shuffler.Progress", p_iters++);
RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE("Shuffler.Progress", p_iters++);
auto const t0_event_loop = Clock::now();

// Tags for each stage of the shuffle
Expand All @@ -186,7 +186,7 @@ class Shuffler::Progress {
{
auto const t0_send_metadata = Clock::now();
auto ready_chunks = shuffler_.outgoing_postbox_.extract_all_ready();
RAPIDSMPF_NVTX_SCOPED_RANGE("meta_send", ready_chunks.size());
RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE("meta_send", ready_chunks.size());
for (auto&& chunk : ready_chunks) {
// All messages in the chunk maps to the same key (checked by the PostBox)
// thus we can use the partition ID of the first message in the chunk to
Expand Down Expand Up @@ -226,8 +226,10 @@ class Shuffler::Progress {
// `incoming_chunks_`.
{
auto const t0_metadata_recv = Clock::now();
RAPIDSMPF_NVTX_SCOPED_RANGE("meta_recv");
RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE("meta_recv");
#if RAPIDSMPF_VERBOSE_INFO
int i = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: now that this needs guards, rename i to something a bit more descriptive.

#endif
while (true) {
auto const [msg, src] = shuffler_.comm_->recv_any(metadata_tag);
if (msg) {
Expand All @@ -245,17 +247,23 @@ class Shuffler::Progress {
} else {
break;
}
#if RAPIDSMPF_VERBOSE_INFO
i++;
#endif
}
stats.add_duration_stat(
"event-loop-metadata-recv", Clock::now() - t0_metadata_recv
);
#if RAPIDSMPF_VERBOSE_INFO
RAPIDSMPF_NVTX_MARKER("meta_recv_iters", i);
#endif
Comment on lines +257 to +259
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, better than renaming i, how about adding a RAPIDSMPF_NVTX_MARKER_VERBOSE macro, then always defining i and letting RAPIDSMPF_NVTX_MARKER_VERBOSE determine whether to use i or not?

}

// Post receives for incoming chunks
{
RAPIDSMPF_NVTX_SCOPED_RANGE("post_chunk_recv", incoming_chunks_.size());
RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE(
"post_chunk_recv", incoming_chunks_.size()
);
auto const t0_post_incoming_chunk_recv = Clock::now();
for (auto it = incoming_chunks_.begin(); it != incoming_chunks_.end();) {
auto& [src, chunk] = *it;
Expand Down Expand Up @@ -342,7 +350,7 @@ class Shuffler::Progress {
// requested data.
{
auto const t0_init_gpu_data_send = Clock::now();
RAPIDSMPF_NVTX_SCOPED_RANGE(
RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE(
"init_gpu_send",
std::transform_reduce(
ready_ack_receives_.begin(),
Expand Down Expand Up @@ -379,7 +387,9 @@ class Shuffler::Progress {
// Check if any data in transit is finished.
{
auto const t0_check_future_finish = Clock::now();
RAPIDSMPF_NVTX_SCOPED_RANGE("check_fut_finish", in_transit_futures_.size());
RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE(
"check_fut_finish", in_transit_futures_.size()
);
if (!in_transit_futures_.empty()) {
std::vector<ChunkID> finished =
shuffler_.comm_->test_some(in_transit_futures_);
Expand Down Expand Up @@ -439,7 +449,9 @@ class Shuffler::Progress {
std::unordered_map<Rank, std::vector<std::unique_ptr<Communicator::Future>>>
ready_ack_receives_; ///< Receives matching ready for data messages.

#if RAPIDSMPF_VERBOSE_INFO
int64_t p_iters = 0; ///< Number of progress iterations (for NVTX)
#endif
};

std::vector<PartID> Shuffler::local_partitions(
Expand Down