diff --git a/libs/core/executors/CMakeLists.txt b/libs/core/executors/CMakeLists.txt index 8deb1494338..9149359ac1c 100644 --- a/libs/core/executors/CMakeLists.txt +++ b/libs/core/executors/CMakeLists.txt @@ -26,6 +26,8 @@ set(executors_headers hpx/executors/execution_policy_parameters.hpp hpx/executors/execution_policy_scheduling_property.hpp hpx/executors/execution_policy.hpp + hpx/executors/executor_scheduler.hpp + hpx/executors/executor_scheduler_bulk.hpp hpx/executors/explicit_scheduler_executor.hpp hpx/executors/fork_join_executor.hpp hpx/executors/limiting_executor.hpp diff --git a/libs/core/executors/include/hpx/executors/executor_scheduler.hpp b/libs/core/executors/include/hpx/executors/executor_scheduler.hpp new file mode 100644 index 00000000000..6949b4d2777 --- /dev/null +++ b/libs/core/executors/include/hpx/executors/executor_scheduler.hpp @@ -0,0 +1,148 @@ +// Copyright (c) 2026 The STE||AR-Group +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +/// \file parallel/executors/executor_scheduler.hpp + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include + +namespace hpx::execution::experimental { + + // Forward declarations + template + struct executor_scheduler; + + template + struct executor_sender; + + /////////////////////////////////////////////////////////////////////////// + template + struct executor_operation_state + { + HPX_NO_UNIQUE_ADDRESS std::decay_t exec_; + HPX_NO_UNIQUE_ADDRESS std::decay_t receiver_; + + template + executor_operation_state(Exec&& exec, Recv&& recv) + : exec_(HPX_FORWARD(Exec, exec)) + , receiver_(HPX_FORWARD(Recv, recv)) + { + } + + executor_operation_state(executor_operation_state&&) = delete; + executor_operation_state(executor_operation_state const&) = delete; + executor_operation_state& operator=( + executor_operation_state&&) = delete; + executor_operation_state& operator=( + executor_operation_state const&) = delete; + + ~executor_operation_state() = default; + + friend void tag_invoke(start_t, executor_operation_state& os) noexcept + { + hpx::detail::try_catch_exception_ptr( + [&]() { + hpx::parallel::execution::post(os.exec_, + [receiver = HPX_MOVE(os.receiver_)]() mutable { + hpx::execution::experimental::set_value( + HPX_MOVE(receiver)); + }); + }, + [&](std::exception_ptr ep) { + hpx::execution::experimental::set_error( + HPX_MOVE(os.receiver_), HPX_MOVE(ep)); + }); + } + }; + + /////////////////////////////////////////////////////////////////////////// + template + struct executor_sender + { + HPX_NO_UNIQUE_ADDRESS std::decay_t exec_; + + using completion_signatures = + hpx::execution::experimental::completion_signatures< + hpx::execution::experimental::set_value_t(), + hpx::execution::experimental::set_error_t(std::exception_ptr)>; + + template + friend auto tag_invoke( + hpx::execution::experimental::get_completion_signatures_t, + executor_sender const&, Env) noexcept -> completion_signatures; + + friend constexpr auto tag_invoke( + hpx::execution::experimental::get_completion_scheduler_t< + hpx::execution::experimental::set_value_t>, + executor_sender const& s) noexcept + { + return executor_scheduler{s.exec_}; + } + + template + friend executor_operation_state tag_invoke( + connect_t, executor_sender&& s, Receiver&& receiver) + { + return {HPX_MOVE(s.exec_), HPX_FORWARD(Receiver, receiver)}; + } + + template + friend executor_operation_state tag_invoke( + connect_t, executor_sender const& s, Receiver&& receiver) + { + return {s.exec_, HPX_FORWARD(Receiver, receiver)}; + } + }; + + /////////////////////////////////////////////////////////////////////////// + template + struct executor_scheduler + { + using executor_type = std::decay_t; + + HPX_NO_UNIQUE_ADDRESS executor_type exec_; + + constexpr executor_scheduler() = default; + + template , executor_scheduler>>> + explicit executor_scheduler(Exec&& exec) + : exec_(HPX_FORWARD(Exec, exec)) + { + } + + constexpr bool operator==(executor_scheduler const& rhs) const noexcept + { + return exec_ == rhs.exec_; + } + + constexpr bool operator!=(executor_scheduler const& rhs) const noexcept + { + return !(*this == rhs); + } + + friend executor_sender tag_invoke( + schedule_t, executor_scheduler&& sched) + { + return {HPX_MOVE(sched.exec_)}; + } + + friend executor_sender tag_invoke( + schedule_t, executor_scheduler const& sched) + { + return {sched.exec_}; + } + }; +} // namespace hpx::execution::experimental diff --git a/libs/core/executors/include/hpx/executors/executor_scheduler_bulk.hpp b/libs/core/executors/include/hpx/executors/executor_scheduler_bulk.hpp new file mode 100644 index 00000000000..52ce7a62e4e --- /dev/null +++ b/libs/core/executors/include/hpx/executors/executor_scheduler_bulk.hpp @@ -0,0 +1,222 @@ +// Copyright (c) 2026 The STE||AR-Group +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#if defined(HPX_HAVE_STDEXEC) +#include +#endif + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace hpx::execution::experimental { + + namespace detail { + template + struct executor_bulk_receiver + { + HPX_NO_UNIQUE_ADDRESS std::decay_t exec_; + HPX_NO_UNIQUE_ADDRESS std::decay_t receiver_; + HPX_NO_UNIQUE_ADDRESS std::decay_t shape_; + HPX_NO_UNIQUE_ADDRESS std::decay_t f_; + + template + friend void tag_invoke( + set_error_t, executor_bulk_receiver&& r, Error&& error) noexcept + { + hpx::execution::experimental::set_error( + HPX_MOVE(r.receiver_), HPX_FORWARD(Error, error)); + } + + friend void tag_invoke( + set_stopped_t, executor_bulk_receiver&& r) noexcept + { + hpx::execution::experimental::set_stopped( + HPX_MOVE(r.receiver_)); + } + + template + friend void tag_invoke( + set_value_t, executor_bulk_receiver&& r, Ts&&... ts) noexcept + { + hpx::detail::try_catch_exception_ptr( + [&]() { + hpx::parallel::execution::bulk_sync_execute( + r.exec_, r.f_, r.shape_, ts...); + + hpx::execution::experimental::set_value( + HPX_MOVE(r.receiver_), HPX_FORWARD(Ts, ts)...); + }, + [&](std::exception_ptr ep) { + hpx::execution::experimental::set_error( + HPX_MOVE(r.receiver_), HPX_MOVE(ep)); + }); + } + }; + + template + struct executor_bulk_sender + { + HPX_NO_UNIQUE_ADDRESS std::decay_t exec_; + HPX_NO_UNIQUE_ADDRESS std::decay_t sender_; + HPX_NO_UNIQUE_ADDRESS std::decay_t shape_; + HPX_NO_UNIQUE_ADDRESS std::decay_t f_; + +#if defined(HPX_HAVE_STDEXEC) + using sender_concept = hpx::execution::experimental::sender_t; + + template + friend auto tag_invoke( + hpx::execution::experimental::get_completion_signatures_t, + executor_bulk_sender const&, Env const&) -> hpx::execution:: + experimental::transform_completion_signatures_of>; + + struct env + { + std::decay_t const& pred_snd; + std::decay_t const& exec; + + template + requires( + meta::value> && + hpx::execution::experimental::detail:: + has_completion_scheduler_v>) + friend constexpr auto tag_invoke( + hpx::execution::experimental::get_completion_scheduler_t< + CPO> + tag, + env const& e) noexcept + { + return tag( + hpx::execution::experimental::get_env(e.pred_snd)); + } + + friend constexpr auto tag_invoke( + hpx::execution::experimental::get_completion_scheduler_t< + hpx::execution::experimental::set_value_t>, + env const& e) noexcept + { + return hpx::execution::experimental::executor_scheduler< + Executor>{e.exec}; + } + }; + + friend constexpr auto tag_invoke( + hpx::execution::experimental::get_env_t, + executor_bulk_sender const& s) noexcept + { + return env{s.sender_, s.exec_}; + } +#else + template + struct generate_completion_signatures + { + template