Skip to content

Commit 30e3cad

Browse files
author
Hackathon User
committed
Implement P2300 get_scheduler bridge for parallel_executor
- Add executor_scheduler<Executor> adapter (header + module registration) - Add get_scheduler_t tag_invoke to parallel_policy_executor (both flat and hierarchical) - Add get_scheduler_t tag_invoke to sequenced_executor - Add unit tests for both executor_scheduler and parallel_executor_scheduler
1 parent 4d26638 commit 30e3cad

7 files changed

Lines changed: 295 additions & 4 deletions

File tree

libs/core/executors/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ set(executors_headers
2626
hpx/executors/execution_policy_parameters.hpp
2727
hpx/executors/execution_policy_scheduling_property.hpp
2828
hpx/executors/execution_policy.hpp
29+
hpx/executors/executor_scheduler.hpp
2930
hpx/executors/explicit_scheduler_executor.hpp
3031
hpx/executors/fork_join_executor.hpp
3132
hpx/executors/limiting_executor.hpp
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
// Copyright (c) 2026 The STE||AR-Group
2+
//
3+
// SPDX-License-Identifier: BSL-1.0
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
7+
/// \file parallel/executors/executor_scheduler.hpp
8+
9+
#pragma once
10+
11+
#include <hpx/config.hpp>
12+
#include <hpx/executors/post.hpp>
13+
#include <hpx/modules/errors.hpp>
14+
#include <hpx/modules/execution.hpp>
15+
#include <hpx/modules/execution_base.hpp>
16+
17+
#include <exception>
18+
#include <type_traits>
19+
#include <utility>
20+
21+
namespace hpx::execution::experimental {
22+
23+
// Forward declarations
24+
template <typename Executor>
25+
struct executor_scheduler;
26+
27+
template <typename Executor>
28+
struct executor_sender;
29+
30+
///////////////////////////////////////////////////////////////////////////
31+
template <typename Executor, typename Receiver>
32+
struct executor_operation_state
33+
{
34+
HPX_NO_UNIQUE_ADDRESS std::decay_t<Executor> exec_;
35+
HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver_;
36+
37+
template <typename Exec, typename Recv>
38+
executor_operation_state(Exec&& exec, Recv&& recv)
39+
: exec_(HPX_FORWARD(Exec, exec))
40+
, receiver_(HPX_FORWARD(Recv, recv))
41+
{
42+
}
43+
44+
executor_operation_state(executor_operation_state&&) = delete;
45+
executor_operation_state(executor_operation_state const&) = delete;
46+
executor_operation_state& operator=(
47+
executor_operation_state&&) = delete;
48+
executor_operation_state& operator=(
49+
executor_operation_state const&) = delete;
50+
51+
~executor_operation_state() = default;
52+
53+
friend void tag_invoke(start_t, executor_operation_state& os) noexcept
54+
{
55+
hpx::detail::try_catch_exception_ptr(
56+
[&]() {
57+
hpx::parallel::execution::post(os.exec_,
58+
[receiver = HPX_MOVE(os.receiver_)]() mutable {
59+
hpx::execution::experimental::set_value(
60+
HPX_MOVE(receiver));
61+
});
62+
},
63+
[&](std::exception_ptr ep) {
64+
hpx::execution::experimental::set_error(
65+
HPX_MOVE(os.receiver_), HPX_MOVE(ep));
66+
});
67+
}
68+
};
69+
70+
///////////////////////////////////////////////////////////////////////////
71+
template <typename Executor>
72+
struct executor_sender
73+
{
74+
HPX_NO_UNIQUE_ADDRESS std::decay_t<Executor> exec_;
75+
76+
using completion_signatures =
77+
hpx::execution::experimental::completion_signatures<
78+
hpx::execution::experimental::set_value_t(),
79+
hpx::execution::experimental::set_error_t(std::exception_ptr)>;
80+
81+
template <typename Env>
82+
friend auto tag_invoke(
83+
hpx::execution::experimental::get_completion_signatures_t,
84+
executor_sender const&, Env) noexcept -> completion_signatures;
85+
86+
template <typename Receiver>
87+
friend executor_operation_state<Executor, Receiver> tag_invoke(
88+
connect_t, executor_sender&& s, Receiver&& receiver)
89+
{
90+
return {HPX_MOVE(s.exec_), HPX_FORWARD(Receiver, receiver)};
91+
}
92+
93+
template <typename Receiver>
94+
friend executor_operation_state<Executor, Receiver> tag_invoke(
95+
connect_t, executor_sender const& s, Receiver&& receiver)
96+
{
97+
return {s.exec_, HPX_FORWARD(Receiver, receiver)};
98+
}
99+
};
100+
101+
///////////////////////////////////////////////////////////////////////////
102+
template <typename Executor>
103+
struct executor_scheduler
104+
{
105+
using executor_type = std::decay_t<Executor>;
106+
107+
HPX_NO_UNIQUE_ADDRESS executor_type exec_;
108+
109+
constexpr executor_scheduler() = default;
110+
111+
template <typename Exec,
112+
typename = std::enable_if_t<
113+
!std::is_same_v<std::decay_t<Exec>, executor_scheduler>>>
114+
explicit executor_scheduler(Exec&& exec)
115+
: exec_(HPX_FORWARD(Exec, exec))
116+
{
117+
}
118+
119+
constexpr bool operator==(executor_scheduler const& rhs) const noexcept
120+
{
121+
return exec_ == rhs.exec_;
122+
}
123+
124+
constexpr bool operator!=(executor_scheduler const& rhs) const noexcept
125+
{
126+
return !(*this == rhs);
127+
}
128+
129+
friend executor_sender<Executor> tag_invoke(
130+
schedule_t, executor_scheduler&& sched)
131+
{
132+
return {HPX_MOVE(sched.exec_)};
133+
}
134+
135+
friend executor_sender<Executor> tag_invoke(
136+
schedule_t, executor_scheduler const& sched)
137+
{
138+
return {sched.exec_};
139+
}
140+
};
141+
} // namespace hpx::execution::experimental

libs/core/executors/include/hpx/executors/parallel_executor.hpp

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <hpx/executors/detail/hierarchical_spawning.hpp>
1414
#include <hpx/executors/detail/index_queue_spawning.hpp>
1515
#include <hpx/executors/execution_policy_mappings.hpp>
16+
#include <hpx/executors/executor_scheduler.hpp>
1617
#include <hpx/modules/allocator_support.hpp>
1718
#include <hpx/modules/async_base.hpp>
1819
#include <hpx/modules/concepts.hpp>
@@ -259,7 +260,7 @@ namespace hpx::execution {
259260
#endif
260261

261262
using allocator_type = hpx::util::thread_local_caching_allocator<
262-
hpx::lockfree::variable_size_stack,
263+
hpx::lockfree::variable_size_stack, char,
263264
hpx::util::internal_allocator<>>;
264265
hpx::traits::detail::shared_state_ptr_t<result_type> p =
265266
lcos::detail::make_continuation_alloc_nounwrap<result_type>(
@@ -474,9 +475,7 @@ namespace hpx::execution {
474475
parallel_policy_executor& operator=(
475476
parallel_policy_executor&&) = default;
476477

477-
#if defined(__NVCC__) || defined(__CUDACC__)
478-
constexpr ~parallel_policy_executor() {}
479-
#endif
478+
~parallel_policy_executor() = default;
480479

481480
private:
482481
// property implementations
@@ -687,6 +686,15 @@ namespace hpx::execution {
687686
#endif
688687
}
689688
}
689+
690+
friend hpx::execution::experimental::executor_scheduler<
691+
parallel_policy_executor>
692+
tag_invoke(hpx::execution::experimental::get_scheduler_t,
693+
parallel_policy_executor const& exec)
694+
{
695+
return hpx::execution::experimental::executor_scheduler<
696+
parallel_policy_executor>(exec);
697+
}
690698
/// \endcond
691699

692700
public:
@@ -1015,6 +1023,15 @@ namespace hpx::execution {
10151023
#endif
10161024
}
10171025
}
1026+
1027+
friend hpx::execution::experimental::executor_scheduler<
1028+
parallel_policy_executor>
1029+
tag_invoke(hpx::execution::experimental::get_scheduler_t,
1030+
parallel_policy_executor const& exec)
1031+
{
1032+
return hpx::execution::experimental::executor_scheduler<
1033+
parallel_policy_executor>(exec);
1034+
}
10181035
/// \endcond
10191036

10201037
public:

libs/core/executors/include/hpx/executors/sequenced_executor.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
#include <hpx/config.hpp>
1212
#include <hpx/executors/execution_policy_mappings.hpp>
13+
#include <hpx/executors/executor_scheduler.hpp>
1314
#include <hpx/executors/parallel_executor.hpp>
1415
#include <hpx/modules/errors.hpp>
1516
#include <hpx/modules/execution.hpp>
@@ -236,6 +237,15 @@ namespace hpx::execution {
236237
#endif
237238
}
238239

240+
friend hpx::execution::experimental::executor_scheduler<
241+
sequenced_executor>
242+
tag_invoke(hpx::execution::experimental::get_scheduler_t,
243+
sequenced_executor const& exec)
244+
{
245+
return hpx::execution::experimental::executor_scheduler<
246+
sequenced_executor>(exec);
247+
}
248+
239249
private:
240250
friend class hpx::serialization::access;
241251

libs/core/executors/tests/unit/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ set(tests
99
annotation_property
1010
created_executor
1111
execution_policy_mappings
12+
executor_scheduler
1213
explicit_scheduler_executor
1314
fork_join_executor
1415
fork_join_executor_from
1516
limiting_executor
1617
parallel_executor
1718
parallel_executor_parameters
19+
parallel_executor_scheduler
1820
parallel_fork_executor
1921
parallel_policy_executor
2022
polymorphic_executor
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright (c) 2026 The STE||AR-Group
2+
//
3+
// SPDX-License-Identifier: BSL-1.0
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
7+
#include <hpx/execution.hpp>
8+
#include <hpx/future.hpp>
9+
#include <hpx/init.hpp>
10+
#include <hpx/modules/testing.hpp>
11+
12+
#include <cstdlib>
13+
#include <string>
14+
#include <utility>
15+
#include <vector>
16+
17+
///////////////////////////////////////////////////////////////////////////////
18+
void test_executor_scheduler_schedule()
19+
{
20+
using namespace hpx::execution::experimental;
21+
22+
hpx::execution::sequenced_executor exec;
23+
24+
// Retrieve a P2300-compliant scheduler from the legacy executor
25+
auto sched = get_scheduler(exec);
26+
27+
// Verify the scheduler satisfies the is_scheduler trait
28+
static_assert(is_scheduler_v<decltype(sched)>,
29+
"executor_scheduler must satisfy is_scheduler");
30+
31+
// Create a sender pipeline: schedule | then
32+
auto s = then(schedule(sched), []() { return 42; });
33+
34+
// Execute synchronously and verify the result
35+
auto result = hpx::this_thread::experimental::sync_wait(std::move(s));
36+
37+
HPX_TEST(result.has_value());
38+
HPX_TEST_EQ(hpx::get<0>(*result), 42);
39+
}
40+
41+
///////////////////////////////////////////////////////////////////////////////
42+
int hpx_main()
43+
{
44+
test_executor_scheduler_schedule();
45+
46+
return hpx::local::finalize();
47+
}
48+
49+
int main(int argc, char* argv[])
50+
{
51+
std::vector<std::string> const cfg = {"hpx.os_threads=all"};
52+
53+
hpx::local::init_params init_args;
54+
init_args.cfg = cfg;
55+
56+
HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv, init_args), 0,
57+
"HPX main exited with non-zero status");
58+
59+
return hpx::util::report_errors();
60+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright (c) 2026 The STE||AR-Group
2+
//
3+
// SPDX-License-Identifier: BSL-1.0
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
7+
#include <hpx/execution.hpp>
8+
#include <hpx/future.hpp>
9+
#include <hpx/init.hpp>
10+
#include <hpx/modules/testing.hpp>
11+
12+
#include <cstdlib>
13+
#include <string>
14+
#include <utility>
15+
#include <vector>
16+
17+
///////////////////////////////////////////////////////////////////////////////
18+
void test_parallel_executor_scheduler()
19+
{
20+
using namespace hpx::execution::experimental;
21+
22+
hpx::execution::parallel_executor exec;
23+
24+
// Retrieve a P2300-compliant scheduler from the legacy executor
25+
auto sched = get_scheduler(exec);
26+
27+
// Verify the scheduler satisfies the is_scheduler trait
28+
static_assert(is_scheduler_v<decltype(sched)>,
29+
"executor_scheduler must satisfy is_scheduler");
30+
31+
// Create a sender pipeline: schedule | then
32+
auto s = then(schedule(sched), []() { return 42; });
33+
34+
// Execute synchronously and verify the result
35+
auto result = hpx::this_thread::experimental::sync_wait(std::move(s));
36+
37+
HPX_TEST(result.has_value());
38+
HPX_TEST_EQ(hpx::get<0>(*result), 42);
39+
}
40+
41+
///////////////////////////////////////////////////////////////////////////////
42+
int hpx_main()
43+
{
44+
test_parallel_executor_scheduler();
45+
46+
return hpx::local::finalize();
47+
}
48+
49+
int main(int argc, char* argv[])
50+
{
51+
std::vector<std::string> const cfg = {"hpx.os_threads=all"};
52+
53+
hpx::local::init_params init_args;
54+
init_args.cfg = cfg;
55+
56+
HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv, init_args), 0,
57+
"HPX main exited with non-zero status");
58+
59+
return hpx::util::report_errors();
60+
}

0 commit comments

Comments
 (0)