Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 22 additions & 29 deletions libs/core/async_mpi/include/hpx/async_mpi/transform_mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ namespace hpx::mpi::experimental {
HPX_CXX_CORE_EXPORT template <typename R, typename F>
struct transform_mpi_receiver
{
using is_receiver = void;
using receiver_concept = hpx::execution::experimental::receiver_t;
HPX_NO_UNIQUE_ADDRESS std::decay_t<R> r;
HPX_NO_UNIQUE_ADDRESS std::decay_t<F> f;

Expand All @@ -85,55 +85,52 @@ namespace hpx::mpi::experimental {
}

template <typename E>
friend constexpr void tag_invoke(
hpx::execution::experimental::set_error_t,
transform_mpi_receiver&& r, E&& e) noexcept
void set_error(E&& e) && noexcept
{
hpx::execution::experimental::set_error(
HPX_MOVE(r.r), HPX_FORWARD(E, e));
HPX_MOVE(r), HPX_FORWARD(E, e));
}

friend constexpr void tag_invoke(
hpx::execution::experimental::set_stopped_t,
transform_mpi_receiver&& r) noexcept
void set_stopped() && noexcept
{
hpx::execution::experimental::set_stopped(HPX_MOVE(r.r));
};
hpx::execution::experimental::set_stopped(HPX_MOVE(r));
}

template <typename... Ts,
typename = std::enable_if_t<
hpx::is_invocable_v<F, Ts..., MPI_Request*>>>
friend constexpr void tag_invoke(
hpx::execution::experimental::set_value_t,
transform_mpi_receiver&& r, Ts&&... ts) noexcept
void set_value(Ts&&... ts) && noexcept
{
hpx::detail::try_catch_exception_ptr(
[&]() {
if constexpr (std::is_void_v<util::invoke_result_t<F,
Ts..., MPI_Request*>>)
{
MPI_Request request;
HPX_INVOKE(r.f, ts..., &request);
HPX_INVOKE(f, ts..., &request);
// When the return type is void, there is no value
// to forward to the receiver
set_value_request_callback_void(
request, HPX_MOVE(r.r), HPX_FORWARD(Ts, ts)...);
request, HPX_MOVE(r), HPX_FORWARD(Ts, ts)...);
}
else
{
MPI_Request request;
// When the return type is non-void, we have to
// forward the value to the receiver
auto&& result = HPX_INVOKE(
r.f, HPX_FORWARD(Ts, ts)..., &request);
// forward the value to the receiver. Pass `ts...`
// unforwarded into `f` so the same arguments can
// be moved once into the keep-alive callback
// below; this matches the void branch above and
// avoids a double-move when `Ts...` are rvalues.
auto&& result = HPX_INVOKE(f, ts..., &request);
set_value_request_callback_non_void(request,
HPX_MOVE(r.r), HPX_MOVE(result),
HPX_MOVE(r), HPX_MOVE(result),
HPX_FORWARD(Ts, ts)...);
}
},
[&](std::exception_ptr ep) {
hpx::execution::experimental::set_error(
HPX_MOVE(r.r), HPX_MOVE(ep));
HPX_MOVE(r), HPX_MOVE(ep));
});
}
};
Expand Down Expand Up @@ -204,22 +201,18 @@ namespace hpx::mpi::experimental {
// clang-format on

template <typename R>
friend constexpr auto tag_invoke(
hpx::execution::experimental::connect_t,
transform_mpi_sender& s, R&& r)
constexpr auto connect(R&& r) &
{
return hpx::execution::experimental::connect(
s.s, transform_mpi_receiver<R, F>(HPX_FORWARD(R, r), s.f));
s, transform_mpi_receiver<R, F>(HPX_FORWARD(R, r), f));
}

template <typename R>
friend constexpr auto tag_invoke(
hpx::execution::experimental::connect_t,
transform_mpi_sender&& s, R&& r)
constexpr auto connect(R&& r) &&
{
return hpx::execution::experimental::connect(HPX_MOVE(s.s),
return hpx::execution::experimental::connect(HPX_MOVE(s),
transform_mpi_receiver<R, F>(
HPX_FORWARD(R, r), HPX_MOVE(s.f)));
HPX_FORWARD(R, r), HPX_MOVE(f)));
}
};
} // namespace detail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,6 @@ namespace hpx::execution::experimental {

} // namespace detail

namespace detail {

HPX_CXX_CORE_EXPORT template <typename Sender, typename Enable = void>
struct has_completion_signatures : std::false_type
{
};

HPX_CXX_CORE_EXPORT template <typename Sender>
struct has_completion_signatures<Sender,
std::void_t<
typename remove_cv_ref_t<Sender>::completion_signatures>>
: std::true_type
{
};
} // namespace detail
// A sender is a type that is describing an asynchronous operation. The
// operation itself might not have started yet. In order to get the result
// of this asynchronous operation, a sender needs to be connected to a
Expand Down
1 change: 0 additions & 1 deletion libs/core/execution_base/tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ set(tests
get_env
execute_may_block_caller
stdexec
test_tag_invoke_only_completion_signatures
)

if(HPX_WITH_CXX20_COROUTINES)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,6 @@ void test_awaitable_sender1(Signatures&&, Awaiter&&)
static_assert(ex::is_awaitable_v<awaitable_sender_1<Awaiter>>);

awaitable_sender_1<Awaiter> s;
static_assert(!hpx::meta::value<
ex::detail::has_completion_signatures<awaitable_sender_1<Awaiter>>>);
static_assert(std::is_same_v<decltype(ex::get_completion_signatures(
s, ex::empty_env{})),
Signatures>);
Expand Down
66 changes: 7 additions & 59 deletions libs/core/execution_base/tests/unit/coroutine_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,46 +177,6 @@ int main()
{
namespace ex = hpx::execution::experimental;

// clang-format off
#if !defined(HPX_HAVE_STDEXEC)
{
// clang-format off
static_assert(
std::is_same_v<ex::single_sender_value_t<non_awaitable_sender<decltype(
signature_all(std::exception_ptr(), int()))>>,
int>);
static_assert(
std::is_same_v<ex::single_sender_value_t<non_awaitable_sender<decltype(
signature_all(std::exception_ptr()))>>,
void>);
// clang-format on
}
#endif
// clang-format on

// single sender value
#if !defined(HPX_HAVE_STDEXEC)
{
static_assert(std::is_same_v<
ex::single_sender_value_t<awaitable_sender_1<awaiter>>, bool>);
static_assert(std::is_same_v<
ex::single_sender_value_t<awaitable_sender_1<hpx::suspend_always>>,
void>);
}
#endif

// connect awaitable
#if !defined(HPX_HAVE_STDEXEC)
{
// Verify that connect_awaitable and connect return the same operation state type
static_assert(std::is_same_v<decltype(ex::connect_awaitable(
awaitable_sender_1<awaiter>{},
recv_set_value{})),
decltype(ex::connect(
awaitable_sender_1<awaiter>{}, recv_set_value{}))>);
}
#endif

// Promise env
{
static_assert(ex::is_awaiter_v<awaiter>);
Expand Down Expand Up @@ -255,25 +215,13 @@ int main()
awaiter>);
}

// Operation base
#if !defined(HPX_HAVE_STDEXEC)
{
using operation_type = decltype(ex::connect(
awaitable_sender_1<awaiter>{}, recv_set_value{}));
static_assert(ex::is_operation_state_v<operation_type>);
}
#endif

// Connect result type
#if !defined(HPX_HAVE_STDEXEC)
{
using operation_type = decltype(ex::connect(
awaitable_sender_1<awaiter>{}, recv_set_value{}));
static_assert(std::is_same_v<
ex::connect_result_t<awaitable_sender_1<awaiter>, recv_set_value>,
operation_type>);
}
#endif
// Note: tests for `single_sender_value_t<non_awaitable_sender<...>>`,
// `single_sender_value_t<awaitable_sender_1<...>>`, `connect_awaitable`
// and `connect_result_t<awaitable_sender_1, ...>` were removed in the
// post-stdexec cleanup. Under stdexec, awaitables are not standalone
// senders outside a coroutine context (they require
// `with_awaitable_senders`), so those tests relied on HPX's removed
// awaitable-as-sender path and are no longer applicable.

// As awaitable
{
Expand Down
Loading
Loading