Conversation

@pentschev
Member

@pentschev pentschev commented Aug 14, 2025

Move communication operations out of Shuffler::ProgressThread into a new abstract CommunicationInterface class that handles all communication in terms of a Message class. This separates the communication routine from the shuffler more clearly, which will make the progress operator considerably smaller (to be done in a follow-up PR), and it lets the communication interface be extended independently of the shuffler, for example with an Active Message-based communicator. It also generalizes the concept of a "message": instead of transferring chunks, all the CommunicationInterface knows about are messages, represented by the Message class, which combines metadata, payload and the src/dst rank.

Currently only a TagCommunicationInterface is implemented. It reproduces exactly the behavior of the shuffler progress, except that it removes the need for the ack message. Support for Active Messages will be handled in a separate PR.

Also removes ack messages. Closes #475, closes #535.
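To make the description concrete, here is a minimal sketch of what an abstract interface built around a message type could look like. Everything in it is an assumption for illustration: the member names `send`, `recv`, `is_idle`, `peer_rank`, `metadata` and `data` are borrowed from the snippets quoted later in this thread, but the signatures, the `Rank` alias and the `LoopbackInterface` class are hypothetical and are not the code in this PR.

```cpp
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical rank type, for illustration only.
using Rank = int;

// A message bundles metadata, an optional payload, and the peer rank.
class Message {
  public:
    Message(
        Rank peer,
        std::vector<std::uint8_t> metadata,
        std::unique_ptr<std::vector<std::uint8_t>> data = nullptr
    )
        : peer_rank_{peer}, metadata_{std::move(metadata)}, data_{std::move(data)} {}

    Rank peer_rank() const { return peer_rank_; }
    std::vector<std::uint8_t> const& metadata() const { return metadata_; }
    // Null when the message carries metadata only.
    std::vector<std::uint8_t> const* data() const { return data_.get(); }

  private:
    Rank peer_rank_;
    std::vector<std::uint8_t> metadata_;
    std::unique_ptr<std::vector<std::uint8_t>> data_;
};

// The abstract interface only ever sees messages, never shuffler chunks.
class CommunicationInterface {
  public:
    virtual ~CommunicationInterface() = default;
    virtual void send(std::unique_ptr<Message> msg) = 0;
    virtual std::vector<std::unique_ptr<Message>> recv() = 0;
    virtual bool is_idle() const = 0;
};

// Toy single-process implementation: every sent message is received locally.
class LoopbackInterface final : public CommunicationInterface {
  public:
    void send(std::unique_ptr<Message> msg) override {
        queue_.push_back(std::move(msg));
    }

    // Hand over everything queued so far and leave the queue empty.
    std::vector<std::unique_ptr<Message>> recv() override {
        return std::exchange(queue_, {});
    }

    bool is_idle() const override { return queue_.empty(); }

  private:
    std::vector<std::unique_ptr<Message>> queue_;
};
```

A tag-based or Active Message-based implementation would derive from the same base class, which is what would let the shuffler stay unaware of the transport.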

@pentschev pentschev self-assigned this Aug 14, 2025
@pentschev pentschev added the feature request (New feature or request) and non-breaking (Introduces a non-breaking change) labels Aug 14, 2025
@copy-pr-bot

copy-pr-bot bot commented Aug 14, 2025

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.

Contributors can view more details about this message here.

@pentschev
Member Author

/ok to test

@pentschev
Member Author

/ok to test

@pentschev
Member Author

/ok to test

@pentschev pentschev changed the title from "Separate shuffler's communication into separate interface" to "Separate shuffler's communication into new interface" Aug 15, 2025
@pentschev pentschev marked this pull request as ready for review September 24, 2025 20:47
@pentschev pentschev requested a review from a team as a code owner September 24, 2025 20:47
@pentschev
Member Author

@madsbk @wence- @nirandaperera I think I have addressed everything, please let me know if I missed something.

Member

@madsbk madsbk left a comment

Looks great, @pentschev. Besides a small suggestion, the only remaining comment I have is about organization.

I think we should move this to a subdir, something like:

rapidsmpf/communicator/utils/metadata_payload_exchange/core.hpp
rapidsmpf/communicator/utils/metadata_payload_exchange/tag.hpp
rapidsmpf/communicator/utils/metadata_payload_exchange/active_messaging.hpp

@pentschev
Member Author

> Looks great, @pentschev. Besides a small suggestion, the only remaining comment I have is about organization.
>
> I think we should move this to a subdir, something like:
>
> rapidsmpf/communicator/utils/metadata_payload_exchange/core.hpp
> rapidsmpf/communicator/utils/metadata_payload_exchange/tag.hpp
> rapidsmpf/communicator/utils/metadata_payload_exchange/active_messaging.hpp

Done in 2eeb521. I left it out of utils though: I think that gets too deep and doesn't really add much value, but let me know if you feel strongly about that.

@pentschev
Member Author

Anything else @wence- @nirandaperera ?

Comment on lines +131 to +143
std::vector<std::unique_ptr<MetadataPayloadExchange::Message>> received_messages;
for (int iter = 0; iter < 10 && received_messages.empty(); ++iter) {
    auto messages = comm_interface->recv();
    received_messages.insert(
        received_messages.end(),
        std::make_move_iterator(messages.begin()),
        std::make_move_iterator(messages.end())
    );

    if (!received_messages.empty())
        break;
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
Contributor

This test could fail if everything is very slow, right? We're not actually waiting for completion.

Also, nit, use std::ranges::move(messages, std::back_inserter(received_messages))

Comment on lines +176 to +188
std::vector<std::unique_ptr<MetadataPayloadExchange::Message>> received_messages;
for (int iter = 0; iter < 50 && received_messages.empty(); ++iter) {
    auto messages = comm_interface->recv();
    received_messages.insert(
        received_messages.end(),
        std::make_move_iterator(messages.begin()),
        std::make_move_iterator(messages.end())
    );

    if (!received_messages.empty())
        break;
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
Contributor

Again here this loop makes me worried, since it could randomly fail if the other side is very slow.

Comment on lines +80 to +85
void wait_for_communication_complete() {
    for (int iter = 0; iter < 100 && !comm_interface->is_idle(); ++iter) {
        comm_interface->recv();
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}
Contributor

This can spuriously exit without communication being completed.

This suggests that we need a way to actually progress until everything is done, but we currently don't have that?

Contributor

I think this is not possible in general because we can't know on the receive side that someone has sent us a message until we get it...
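The spurious-exit concern can be illustrated with a small stand-in. In the sketch below, `FakeComm` is a hypothetical object that only becomes idle after a fixed number of `recv()` calls; it is not the interface from this PR, and the two helper functions are assumptions written for this illustration.

```cpp
#include <thread>

// Hypothetical stand-in: becomes idle only after `remaining` calls to recv().
struct FakeComm {
    int remaining;

    void recv() {
        if (remaining > 0) {
            --remaining;
        }
    }

    bool is_idle() const { return remaining == 0; }
};

// Bounded polling, as in the helper under review: it may return while work is
// still outstanding. The return value says whether the interface went idle.
template <typename Comm>
bool wait_bounded(Comm& comm, int max_iters) {
    for (int iter = 0; iter < max_iters && !comm.is_idle(); ++iter) {
        comm.recv();
    }
    return comm.is_idle();
}

// Unbounded polling: returns only once idle, and hangs if that never happens.
template <typename Comm>
void wait_until_idle(Comm& comm) {
    while (!comm.is_idle()) {
        comm.recv();
        std::this_thread::yield();
    }
}
```

Even the unbounded loop only waits for local idleness. As noted in the comment above, a receiver cannot know that a peer intends to send something until the message arrives, so a test that expects a known number of messages would have to loop on that count instead.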

Comment on lines +145 to +151
if (comm->rank() == peer_rank) {
    EXPECT_EQ(received_messages.size(), 1);
    auto& msg = received_messages[0];
    EXPECT_EQ(msg->peer_rank(), 0);
    EXPECT_EQ(msg->metadata(), test_metadata);
    EXPECT_EQ(msg->data(), nullptr);
}
Contributor

This never fires, right? Because peer_rank is defined as (comm->rank() + 1) % nranks so it is never true that comm->rank() == peer_rank.
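The arithmetic behind this observation can be checked at compile time. The sketch below is an illustration only: `next_rank` and `prev_rank` are hypothetical helper names, with `next_rank` using the same formula as `peer_rank` in the test.

```cpp
// Neighbours on a ring of `nranks` ranks.
constexpr int next_rank(int rank, int nranks) {
    return (rank + 1) % nranks;
}

constexpr int prev_rank(int rank, int nranks) {
    return (rank + nranks - 1) % nranks;
}

// With two or more ranks, a rank is never its own next neighbour, so a
// branch guarded by `rank == next_rank(rank, nranks)` cannot run.
static_assert(next_rank(0, 2) == 1);
static_assert(next_rank(1, 2) == 0);
static_assert(next_rank(3, 4) == 0);

// Only in the single-rank case does a rank send to itself.
static_assert(next_rank(0, 1) == 0);

// A rank that sends to its next neighbour receives from its previous one.
static_assert(prev_rank(next_rank(2, 4), 4) == 2);
```

So the guarded expectations would only ever run in a single-rank job, which is presumably not the case the test was written for.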

Comment on lines +131 to +143
std::vector<std::unique_ptr<MetadataPayloadExchange::Message>> received_messages;
for (int iter = 0; iter < 10 && received_messages.empty(); ++iter) {
    auto messages = comm_interface->recv();
    received_messages.insert(
        received_messages.end(),
        std::make_move_iterator(messages.begin()),
        std::make_move_iterator(messages.end())
    );

    if (!received_messages.empty())
        break;
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
Contributor

In this test, rank 0 sends to rank 1, and everyone else "does nothing".

But the test is not really following that flow.

I think we want something like:

if (comm->nranks() != 2) {
    GTEST_SKIP() << "Only for two ranks";
}

if (comm->rank() == 0) {
    send(message);
} else {
    auto received = ...;
    while (received.empty()) {
        std::ranges::move(comm_interface->recv(), std::back_inserter(received));
        std::this_thread::yield();
    }
    EXPECT_EQ(received.size(), 1);
    EXPECT_EQ(msg->peer_rank(), 0);
    ...
}

But now I am worried about rank-0 finishing the test with stuff still in flight.

Contributor

Ah, wait, rank-0 must also call recv repeatedly because that's the only thing that actually progresses the underlying tag communication.

This is very counterintuitive.

Comment on lines +107 to +111
completed_messages.insert(
    completed_messages.end(),
    std::make_move_iterator(completed_data.begin()),
    std::make_move_iterator(completed_data.end())
);
Contributor

std::ranges::move(completed_data, std::back_inserter(completed_messages));

Contributor

@wence- wence- left a comment

Sorry, reading through the usage in the tests gives me some pause for thought.

It seems like the sender must call recv repeatedly to progress sends (even if they're never receiving anything). Do I have this right?

@pentschev
Member Author

pentschev commented Oct 31, 2025

@wence- I have pushed multiple changes that do what you requested in your latest comments and what we've discussed offline. Here's a summary of the changes:

  1. Add a progress() method to allow progressing the state of MetadataPayloadExchange independent of recv()
  2. recv() now only returns messages that were previously received during progress()
  3. Address test issues
    1. Use a ring topology instead of the (also previously broken) root-to-all pattern: send data to the "next rank", receive data from the "previous rank"
    2. Remove timeouts: they were an overly eager optimization and could also lead to incomplete processing, as you noticed. If tests hang, this will cause a deadlock, but that is how most of our tests are currently written; we can always revisit and add timeouts later if they begin to show signs of being problematic
  4. Fix a bug with message ordering that was introduced by some of the changes made while addressing previous change requests in this PR
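The split between `progress()` and `recv()` described in items 1 and 2 can be modelled with a toy in-memory class. This is a sketch under loud assumptions: `ToyExchange` is hypothetical, uses strings in place of real messages, and delivers everything to itself. It only shows the calling contract, not the MetadataPayloadExchange implementation.

```cpp
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-process model of the progress()/recv() contract.
class ToyExchange {
  public:
    // Queue a message; nothing is delivered until progress() runs.
    void send(std::string msg) { outgoing_.push_back(std::move(msg)); }

    // Drive communication: here, "deliver" every queued message locally.
    void progress() {
        while (!outgoing_.empty()) {
            completed_.push_back(std::move(outgoing_.front()));
            outgoing_.pop_front();
        }
    }

    // Return only what earlier progress() calls completed. This never
    // drives communication by itself.
    std::vector<std::string> recv() { return std::exchange(completed_, {}); }

    bool is_idle() const { return outgoing_.empty(); }

  private:
    std::deque<std::string> outgoing_;
    std::vector<std::string> completed_;
};
```

With this shape a pure sender calls `progress()` until `is_idle()` holds and never needs to call `recv()`, which is the counterintuitive coupling raised earlier in the review.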

I hope this now addresses all the concerns, and more importantly does it all correctly.

One more thing: I will start pushing back against changes to the Tag communication flow in this PR. My initial idea was to start simple by porting over the Shuffler comms implementation. While some of the changes requested previously were indeed important to make MetadataPayloadExchange better, others were deeply involved in changing the implementation details of the Tag code, which led me to make mistakes that are not necessarily well tested in this PR because the Shuffler is not currently using the changes here. I have now spent over an hour addressing item 4, which came from a change that was not necessary from an interface perspective. So for any changes that pertain not to the interface but to the Tag API implementation, let's make them in a follow-up PR where needed.

Labels

feature request New feature or request non-breaking Introduces a non-breaking change

Development

Successfully merging this pull request may close these issues.

Remove need for ack messages in shuffler implementation

4 participants