Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libs/core/executors/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ set(executors_headers
hpx/executors/execution_policy_parameters.hpp
hpx/executors/execution_policy_scheduling_property.hpp
hpx/executors/execution_policy.hpp
hpx/executors/executor_scheduler.hpp
hpx/executors/explicit_scheduler_executor.hpp
hpx/executors/fork_join_executor.hpp
hpx/executors/limiting_executor.hpp
Expand Down
141 changes: 141 additions & 0 deletions libs/core/executors/include/hpx/executors/executor_scheduler.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright (c) 2026 The STE||AR-Group
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

/// \file parallel/executors/executor_scheduler.hpp

#pragma once

#include <hpx/config.hpp>
#include <hpx/executors/post.hpp>
#include <hpx/modules/errors.hpp>
#include <hpx/modules/execution.hpp>
#include <hpx/modules/execution_base.hpp>

#include <exception>
#include <type_traits>
#include <utility>

namespace hpx::execution::experimental {

// Forward declarations
template <typename Executor>
struct executor_scheduler;

template <typename Executor>
struct executor_sender;

///////////////////////////////////////////////////////////////////////////
template <typename Executor, typename Receiver>
struct executor_operation_state
{
HPX_NO_UNIQUE_ADDRESS std::decay_t<Executor> exec_;
HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver_;

template <typename Exec, typename Recv>
executor_operation_state(Exec&& exec, Recv&& recv)
: exec_(HPX_FORWARD(Exec, exec))
, receiver_(HPX_FORWARD(Recv, recv))
{
}

executor_operation_state(executor_operation_state&&) = delete;
executor_operation_state(executor_operation_state const&) = delete;
executor_operation_state& operator=(
executor_operation_state&&) = delete;
executor_operation_state& operator=(
executor_operation_state const&) = delete;

~executor_operation_state() = default;

friend void tag_invoke(start_t, executor_operation_state& os) noexcept
{
hpx::detail::try_catch_exception_ptr(
[&]() {
hpx::parallel::execution::post(os.exec_,
[receiver = HPX_MOVE(os.receiver_)]() mutable {
hpx::execution::experimental::set_value(
HPX_MOVE(receiver));
});
},
[&](std::exception_ptr ep) {
hpx::execution::experimental::set_error(
HPX_MOVE(os.receiver_), HPX_MOVE(ep));
});
}
};

///////////////////////////////////////////////////////////////////////////
template <typename Executor>
struct executor_sender
{
HPX_NO_UNIQUE_ADDRESS std::decay_t<Executor> exec_;

using completion_signatures =
hpx::execution::experimental::completion_signatures<
hpx::execution::experimental::set_value_t(),
hpx::execution::experimental::set_error_t(std::exception_ptr)>;

template <typename Env>
friend auto tag_invoke(
hpx::execution::experimental::get_completion_signatures_t,
executor_sender const&, Env) noexcept -> completion_signatures;

template <typename Receiver>
friend executor_operation_state<Executor, Receiver> tag_invoke(
connect_t, executor_sender&& s, Receiver&& receiver)
{
return {HPX_MOVE(s.exec_), HPX_FORWARD(Receiver, receiver)};
}

template <typename Receiver>
friend executor_operation_state<Executor, Receiver> tag_invoke(
connect_t, executor_sender const& s, Receiver&& receiver)
{
return {s.exec_, HPX_FORWARD(Receiver, receiver)};
}
};

///////////////////////////////////////////////////////////////////////////
template <typename Executor>
struct executor_scheduler
{
using executor_type = std::decay_t<Executor>;

HPX_NO_UNIQUE_ADDRESS executor_type exec_;

constexpr executor_scheduler() = default;

template <typename Exec,
typename = std::enable_if_t<
!std::is_same_v<std::decay_t<Exec>, executor_scheduler>>>
explicit executor_scheduler(Exec&& exec)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please use requires() instead of SFINAE:

Suggested change
template <typename Exec,
typename = std::enable_if_t<
!std::is_same_v<std::decay_t<Exec>, executor_scheduler>>>
explicit executor_scheduler(Exec&& exec)
template <typename Exec>
requires(!std::is_same_v<std::decay_t<Exec>, executor_scheduler>)
explicit executor_scheduler(Exec&& exec)

: exec_(HPX_FORWARD(Exec, exec))
{
}

constexpr bool operator==(executor_scheduler const& rhs) const noexcept
{
return exec_ == rhs.exec_;
}

constexpr bool operator!=(executor_scheduler const& rhs) const noexcept
{
return !(*this == rhs);
}

friend executor_sender<Executor> tag_invoke(
schedule_t, executor_scheduler&& sched)
{
return {HPX_MOVE(sched.exec_)};
}

friend executor_sender<Executor> tag_invoke(
schedule_t, executor_scheduler const& sched)
{
return {sched.exec_};
}
};
} // namespace hpx::execution::experimental
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include <hpx/config.hpp>
#include <hpx/executors/execution_policy_mappings.hpp>
#include <hpx/executors/executor_scheduler.hpp>
#include <hpx/executors/parallel_executor.hpp>
#include <hpx/modules/errors.hpp>
#include <hpx/modules/execution.hpp>
Expand Down Expand Up @@ -236,6 +237,15 @@ namespace hpx::execution {
#endif
}

friend hpx::execution::experimental::executor_scheduler<
sequenced_executor>
tag_invoke(hpx::execution::experimental::get_scheduler_t,
sequenced_executor const& exec)
{
return hpx::execution::experimental::executor_scheduler<
sequenced_executor>(exec);
}

private:
friend class hpx::serialization::access;

Expand Down
1 change: 1 addition & 0 deletions libs/core/executors/tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ set(tests
annotation_property
created_executor
execution_policy_mappings
executor_scheduler
explicit_scheduler_executor
fork_join_executor
fork_join_executor_from
Expand Down
60 changes: 60 additions & 0 deletions libs/core/executors/tests/unit/executor_scheduler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) 2026 The STE||AR-Group
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/execution.hpp>
#include <hpx/future.hpp>
#include <hpx/init.hpp>
#include <hpx/modules/testing.hpp>

#include <cstdlib>
#include <string>
#include <utility>
#include <vector>

///////////////////////////////////////////////////////////////////////////////
void test_executor_scheduler_schedule()
{
using namespace hpx::execution::experimental;

hpx::execution::sequenced_executor exec;

// Retrieve a P2300-compliant scheduler from the legacy executor
auto sched = get_scheduler(exec);

// Verify the scheduler satisfies the is_scheduler trait
static_assert(is_scheduler_v<decltype(sched)>,
"executor_scheduler must satisfy is_scheduler");

// Create a sender pipeline: schedule | then
auto s = then(schedule(sched), []() { return 42; });

// Execute synchronously and verify the result
auto result = hpx::this_thread::experimental::sync_wait(std::move(s));

HPX_TEST(result.has_value());
HPX_TEST_EQ(hpx::get<0>(*result), 42);
}

///////////////////////////////////////////////////////////////////////////////
int hpx_main()
{
test_executor_scheduler_schedule();

return hpx::local::finalize();
}

int main(int argc, char* argv[])
{
std::vector<std::string> const cfg = {"hpx.os_threads=all"};

hpx::local::init_params init_args;
init_args.cfg = cfg;

HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv, init_args), 0,
"HPX main exited with non-zero status");

return hpx::util::report_errors();
}
Loading