Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
06617ca
Add communication interface
pentschev Aug 14, 2025
f60e76c
Make ShufflerCommunicationInterface stateful
pentschev Aug 14, 2025
4cb58d4
Fix build errors
pentschev Aug 14, 2025
89bd2aa
Fix segfault
pentschev Aug 14, 2025
94a1c34
Fix logic errors that caused tests failures
pentschev Aug 14, 2025
a05498e
Add statistics back
pentschev Aug 14, 2025
0cc2ea1
Add missing log
pentschev Aug 14, 2025
9d2351a
Add stats for total communication
pentschev Aug 14, 2025
c7007ff
Rename default to TagShufflerCommunication
pentschev Aug 14, 2025
8f32166
Fix build errors
pentschev Aug 14, 2025
ff5db84
Make class names more consistent
pentschev Aug 14, 2025
3e0c826
Mark appropriate methods nodiscard
pentschev Aug 14, 2025
131d952
Use std::ranges::all_of
pentschev Aug 15, 2025
44e73f4
Use std::ranges::all_of
pentschev Aug 15, 2025
088896a
Merge remote-tracking branch 'upstream/branch-25.10' into comms-inter…
pentschev Sep 24, 2025
fc765e0
Add MessageInterface
pentschev Sep 24, 2025
ec1dfb8
Refactor CommunicationInterface for better generalization
pentschev Sep 24, 2025
67deec8
Add ChunkMessageAdapter for MessageInterface
pentschev Sep 24, 2025
d926526
Update Shuffler for the refactored CommunicationInterface
pentschev Sep 24, 2025
9e4efd3
Remove leftover from conflict resolution
pentschev Sep 24, 2025
1cb32ec
Update test_dask stats names
pentschev Sep 24, 2025
68023c5
Merge remote-tracking branch 'upstream/branch-25.10' into comms-inter…
pentschev Sep 25, 2025
05bbcee
Add missing test_dask stat name
pentschev Sep 25, 2025
cc4fe76
Merge branch 'branch-25.12' into comms-interface
pentschev Sep 25, 2025
de81ed8
Merge remote-tracking branch 'upstream/branch-25.12' into comms-inter…
pentschev Oct 13, 2025
f76962e
Merge remote-tracking branch 'origin/comms-interface' into comms-inte…
pentschev Oct 13, 2025
702583a
Fix build errors
pentschev Oct 13, 2025
1e48429
Merge branch 'main' into comms-interface
pentschev Oct 20, 2025
470bbe1
Merge remote-tracking branch 'upstream/main' into comms-interface
pentschev Oct 21, 2025
0434304
Merge remote-tracking branch 'origin/comms-interface' into comms-inte…
pentschev Oct 21, 2025
aea3eaa
Remove ack messages
pentschev Oct 21, 2025
ded9147
Simplify message interface
pentschev Oct 21, 2025
5da980d
Rename to send_messages/receive_messages
pentschev Oct 22, 2025
a42daf1
Add tests
pentschev Oct 22, 2025
984164b
Revert CommunicationInterface changes from shuffler
pentschev Oct 22, 2025
49d41f2
Merge branch 'main' into comms-interface
pentschev Oct 22, 2025
00446e2
Merge remote-tracking branch 'upstream/main' into comms-interface
pentschev Oct 23, 2025
ed4eb9b
Move communication state description to TagCommunicationInterface
pentschev Oct 23, 2025
73c8a32
Improve docs and comments
pentschev Oct 23, 2025
4f1b1c8
Rename CommunicationInterface to MetadataPayloadExchange
pentschev Oct 27, 2025
8fdb66c
Move Message into MetadataPayloadExchange
pentschev Oct 27, 2025
adbdf41
Merge remote-tracking branch 'upstream/main' into comms-interface
pentschev Oct 27, 2025
683ef9a
Use rank from Communicator
pentschev Oct 28, 2025
9461b2a
Truncated metadata is fatal
pentschev Oct 28, 2025
40154f3
Add missing curly braces
pentschev Oct 28, 2025
f0c4743
Use std::erase_if
pentschev Oct 28, 2025
6d5bdce
Improve MetadataPayloadExchange description
pentschev Oct 28, 2025
08cba54
Use wrapper for Message internal implementation's details
pentschev Oct 28, 2025
db546ef
Merge remote-tracking branch 'upstream/main' into comms-interface
pentschev Oct 28, 2025
6c7fd1c
Prevent unnecessary use of set_data
pentschev Oct 28, 2025
bb53e29
Simplify test_some
pentschev Oct 28, 2025
ec76b02
Add send_message method
pentschev Oct 28, 2025
4de6ba9
Add note on thread-safety
pentschev Oct 28, 2025
1c6510a
Remove unnecessary after release_data call
pentschev Oct 28, 2025
f8d3b44
Ensure message receive ordering per rank
pentschev Oct 28, 2025
d514cd3
Remove unnecessary release_data check
pentschev Oct 28, 2025
0d28a8c
Handle completed metadata immediately
pentschev Oct 28, 2025
7183e2e
Simplify test checks
pentschev Oct 28, 2025
8daa244
Make Message methods constexpr
pentschev Oct 28, 2025
b16797f
Make attributes const
pentschev Oct 28, 2025
5000215
Release metadata to reuse the same buffer
pentschev Oct 28, 2025
04cd976
Pass allocate_buffer_fn by constref
pentschev Oct 28, 2025
dbef5b7
Merge branch 'main' into comms-interface
pentschev Oct 28, 2025
0fdd8d0
Rename send_messages/receive_messages to send/recv
pentschev Oct 29, 2025
f82f9fa
Clarify is_idle docs
pentschev Oct 29, 2025
7728a30
Check that data is not set before setting
pentschev Oct 29, 2025
ffa9541
Move allocate_buffer_fn into MetadataPayloadExchange constructor
pentschev Oct 29, 2025
014dc44
Document constructor
pentschev Oct 29, 2025
2eeb521
Move MetadataPayloadExchange into subdirectory
pentschev Oct 29, 2025
df51b4e
Clarify ordering guarantees in TagMetadataPayloadExchange
pentschev Oct 29, 2025
be525ed
Move ordering doc to MetdataPayloadExchange
pentschev Oct 31, 2025
8b5e0a0
Merge remote-tracking branch 'upstream/main' into comms-interface
pentschev Oct 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ add_library(
src/buffer/resource.cpp
src/buffer/spill_manager.cpp
src/communicator/communicator.cpp
src/communicator/metadata_payload_exchange/core.cpp
src/communicator/metadata_payload_exchange/tag.cpp
src/communicator/single.cpp
src/config.cpp
src/cuda_event.cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: Apache-2.0
*/
#pragma once

#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include <rapidsmpf/buffer/buffer.hpp>
#include <rapidsmpf/buffer/resource.hpp>
#include <rapidsmpf/communicator/communicator.hpp>
#include <rapidsmpf/statistics.hpp>

namespace rapidsmpf::communicator {

/**
* @brief Interface for exchanging serialized metadata and payload between ranks.
*
* The `MetadataPayloadExchange` class defines an abstract interface for transmitting
* messages that contain both serialized metadata and a data payload. This abstraction
* simplifies scenarios where metadata and payload must be exchanged together as a
* single logical unit.
*
* Concrete implementations, such as `TagMetadataPayloadExchange`, use the
* `Communicator` to implement this interface. In the future, other implementations
* may leverage specialized features beyond the basic `Communicator` API to further
* optimize this communication pattern.
*
* @note This class is not thread-safe. All methods must be called from the same thread.
*
* @note All concrete implementations are expected to provide a constructor with
* the following signature:
* @code
* DerivedMetadataPayloadExchange(
* std::shared_ptr<Communicator> comm,
* OpID op_id,
* std::function<std::unique_ptr<Buffer>(std::size_t)> allocate_buffer_fn,
* std::shared_ptr<Statistics> statistics
* );
* @endcode
*/
class MetadataPayloadExchange {
public:
/**
* @brief Message class for communication.
*
* This class contains the essential information needed for communication:
* data payload, metadata, and peer rank (source/destination).
*/
class Message {
public:
/**
* @brief Construct a new Message.
*
* @param peer_rank Destination (outgoing) or source (incoming) rank.
* @param metadata Serialized metadata.
* @param data Data buffer (can be nullptr for metadata-only messages).
*/
Message(
Rank peer_rank,
std::vector<std::uint8_t> metadata,
std::unique_ptr<Buffer> data = nullptr
);

/**
* @brief Get the destination rank for outgoing or source rank for incoming
* messages.
*
* @return The rank of the destination or source.
*/
[[nodiscard]] constexpr Rank peer_rank() const {
return peer_rank_;
}

/**
* @brief Get the serialized metadata for this message.
*
* This metadata is sent first to inform the receiver about the incoming message.
*
* @return The serialized metadata.
*/
[[nodiscard]] constexpr std::vector<std::uint8_t> const& metadata() const {
return metadata_;
}

/**
* @brief Release ownership of the metadata.
*
* This is typically called when transferring metadata to the communication layer.
*
* @return Metadata with ownership transferred.
*/
[[nodiscard]] std::vector<std::uint8_t> release_metadata() {
return std::move(metadata_);
}

/**
* @brief Get the data buffer for this message.
*
* @return The data buffer, or nullptr if no data.
*/
[[nodiscard]] Buffer const* data() const;

/**
* @brief Release ownership of the data buffer.
*
* This is typically called when transferring a buffer to the communication layer.
*
* @return Data buffer with ownership transferred, or nullptr if no data.
*/
[[nodiscard]] std::unique_ptr<Buffer> release_data();

/**
* @brief Set the data buffer for this message.
*
* This method can be used by implementations to update the data buffer.
*
* @param buffer Data buffer to be set.
*/
void set_data(std::unique_ptr<Buffer> buffer);

private:
Rank peer_rank_;
std::vector<std::uint8_t> metadata_;
std::unique_ptr<Buffer> data_;
};

virtual ~MetadataPayloadExchange() = default;

/**
* @brief Send a single message to a remote rank.
*
* Takes ownership of a ready message and manages its transmission, including
* metadata sending and coordination of data transfer.
*
* The messages sent from the calling process to a destination remote rank are
* guaranteed to be received in the same order as they were sent. No ordering is
* guaranteed between messages sent to different remote ranks.
*
* @param message Message ready to be sent to a remote rank.
*/
virtual void send(std::unique_ptr<Message> message) = 0;

/**
* @brief Send messages to remote ranks.
*
* Takes ownership of ready messages and manages their transmission, including
* metadata sending and coordination of data transfer.
*
* The messages sent from the calling process to a destination remote rank are
* guaranteed to be received in the same order as they were sent. No ordering is
* guaranteed between messages sent to different remote ranks.
*
* @param messages Vector of messages ready to be sent to remote ranks.
*/
virtual void send(std::vector<std::unique_ptr<Message>>&& messages) = 0;

/**
* @brief Receive messages from remote ranks.
*
* The messages received by the calling process are guaranteed to be received in the
* same order as they were sent by the source remote rank. No ordering is guaranteed
* between messages received from different remote ranks.
*
* @return Vector of completed messages ready for local processing.
*/
[[nodiscard]] virtual std::vector<std::unique_ptr<Message>> recv() = 0;

/**
* @brief Check if the communication layer is currently idle.
*
* Indicates whether there are any active or pending communication operations.
* A return value of `true` means the exchange is idling, i.e. no operations
* are currently in progress. However, new send/receive requests may still be
* submitted in the future; this does not imply that all communication has been
* fully finalized or globally synchronized.
*
* @return `true` if the communication layer is idle; `false` if activity is ongoing.
*/
[[nodiscard]] virtual bool is_idle() const = 0;
};


} // namespace rapidsmpf::communicator
150 changes: 150 additions & 0 deletions cpp/include/rapidsmpf/communicator/metadata_payload_exchange/tag.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: Apache-2.0
*/
#pragma once

#include <cstdint>
#include <functional>
#include <memory>
#include <unordered_map>
#include <vector>

#include <rapidsmpf/buffer/buffer.hpp>
#include <rapidsmpf/communicator/communicator.hpp>
#include <rapidsmpf/communicator/metadata_payload_exchange/core.hpp>
#include <rapidsmpf/statistics.hpp>

namespace rapidsmpf::communicator {

/**
* @brief Tag-based implementation of MetadataPayloadExchange.
*
* This implementation provides the same communication protocol as
* TagMetadataPayloadExchange but works with the abstract Message.
*/
class TagMetadataPayloadExchange : public MetadataPayloadExchange {
public:
/**
* @brief Constructor for TagMetadataPayloadExchange.
*
* @param comm The communicator to use for operations.
* @param op_id The operation ID for tagging messages.
* @param allocate_buffer_fn Function to allocate buffers for incoming data.
* @param statistics The statistics to use for tracking communication operations.
*/
TagMetadataPayloadExchange(
std::shared_ptr<Communicator> comm,
OpID op_id,
std::function<std::unique_ptr<Buffer>(std::size_t)> allocate_buffer_fn,
std::shared_ptr<Statistics> statistics
);

/**
* @copydoc MetadataPayloadExchange::send
*
* @throw std::runtime_error if a message is sent to itself or if an outgoing
* message already exists.
*/
void send(std::unique_ptr<Message> message) override;

// clang-format off
/**
* @copydoc MetadataPayloadExchange::send(std::vector<std::unique_ptr<Message>>&& messages);
*
* @throw std::runtime_error if a message is sent to itself or if an outgoing
* message already exists.
*/
// clang-format on
void send(std::vector<std::unique_ptr<Message>>&& messages) override;

/**
* @copydoc MetadataPayloadExchange::recv
*
* Advances the communication state machine by:
* - Receiving incoming message metadata
* - Setting up data transfers
* - Handling completed data transfers
* - Cleaning up completed operations
*/
std::vector<std::unique_ptr<Message>> recv() override;

/**
* @copydoc MetadataPayloadExchange::is_idle
*/
bool is_idle() const override;

private:
/**
* @brief Internal wrapper for tracking protocol-specific state.
*
* This struct wraps a Message with protocol-specific tracking information
* that is used internally by TagMetadataPayloadExchange but not exposed
* through the public interface.
*/
struct TagMessage {
std::unique_ptr<Message> message;
std::uint64_t message_id{0};
std::size_t expected_payload_size{0};

TagMessage(
std::unique_ptr<Message> msg, std::uint64_t id = 0, std::size_t size = 0
)
: message(std::move(msg)), message_id(id), expected_payload_size(size) {}
};

// Core communication infrastructure
std::shared_ptr<Communicator> comm_;
Tag const metadata_tag_;
Tag const gpu_data_tag_;
std::function<std::unique_ptr<Buffer>(std::size_t)> allocate_buffer_fn_;

// Communication state containers
std::vector<std::unique_ptr<Communicator::Future>>
fire_and_forget_; ///< Ongoing "fire-and-forget" operations (non-blocking sends).
std::unordered_map<Rank, std::vector<TagMessage>>
incoming_messages_; ///< Messages ready to be received, grouped by rank.
std::unordered_map<std::uint64_t, TagMessage>
in_transit_messages_; ///< Messages currently in transit.
std::unordered_map<std::uint64_t, std::unique_ptr<Communicator::Future>>
in_transit_futures_; ///< Futures corresponding to in-transit messages.

// Statistics tracking
std::shared_ptr<Statistics> statistics_;

// Sequential message ID generator
std::uint64_t next_message_id_{0};

/**
* @brief Receive metadata for incoming messages.
*/
void receive_metadata();

/**
* @brief Setup data receives for incoming messages.
*
* @return A vector of completed metadata-only messages.
*
* @throw std::runtime_error if an in-transit message or future is not found, or
* if a data buffer is not available.
*/
std::vector<std::unique_ptr<Message>> setup_data_receives();

/**
* @brief Complete data transfers for in-transit messages.
*
* @return A vector of completed messages.
*
* @throw std::runtime_error if an in-transit message or future is not found, or
* if a data buffer is not available
*/
std::vector<std::unique_ptr<Message>> complete_data_transfers();

/**
* @brief Cleanup completed operations (fire-and-forget sends and receives).
*/
void cleanup_completed_operations();
};


} // namespace rapidsmpf::communicator
30 changes: 30 additions & 0 deletions cpp/src/communicator/metadata_payload_exchange/core.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: Apache-2.0
*/

#include <rapidsmpf/communicator/metadata_payload_exchange/core.hpp>
#include <rapidsmpf/error.hpp>

namespace rapidsmpf::communicator {

MetadataPayloadExchange::Message::Message(
Rank peer_rank, std::vector<std::uint8_t> metadata, std::unique_ptr<Buffer> data
)
: peer_rank_(peer_rank), metadata_(std::move(metadata)), data_(std::move(data)) {}

Buffer const* MetadataPayloadExchange::Message::data() const {
return data_.get();
}

std::unique_ptr<Buffer> MetadataPayloadExchange::Message::release_data() {
return std::move(data_);
}

void MetadataPayloadExchange::Message::set_data(std::unique_ptr<Buffer> buffer) {
RAPIDSMPF_EXPECTS(data_ == nullptr, "data already set");
data_ = std::move(buffer);
}


} // namespace rapidsmpf::communicator
Loading