Skip to content

Commit 893a621

Browse files
author
Hackathon User
committed
Implement P2300 get_scheduler bridge for parallel_executor
- Add executor_scheduler<Executor> adapter (header + module registration) - Add get_scheduler_t tag_invoke to parallel_policy_executor (both flat and hierarchical) - Add get_scheduler_t tag_invoke to sequenced_executor - Add unit tests for both executor_scheduler and parallel_executor_scheduler
1 parent 4d26638 commit 893a621

7 files changed

Lines changed: 294 additions & 4 deletions

File tree

libs/core/executors/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ set(executors_headers
2626
hpx/executors/execution_policy_parameters.hpp
2727
hpx/executors/execution_policy_scheduling_property.hpp
2828
hpx/executors/execution_policy.hpp
29+
hpx/executors/executor_scheduler.hpp
2930
hpx/executors/explicit_scheduler_executor.hpp
3031
hpx/executors/fork_join_executor.hpp
3132
hpx/executors/limiting_executor.hpp
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
// Copyright (c) 2026 The STE||AR-Group
2+
//
3+
// SPDX-License-Identifier: BSL-1.0
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
7+
/// \file parallel/executors/executor_scheduler.hpp
8+
9+
#pragma once
10+
11+
#include <hpx/config.hpp>
12+
#include <hpx/modules/errors.hpp>
13+
#include <hpx/modules/execution.hpp>
14+
#include <hpx/modules/execution_base.hpp>
15+
16+
#include <exception>
17+
#include <type_traits>
18+
#include <utility>
19+
20+
namespace hpx::execution::experimental {
21+
22+
// Forward declarations
23+
template <typename Executor>
24+
struct executor_scheduler;
25+
26+
template <typename Executor>
27+
struct executor_sender;
28+
29+
///////////////////////////////////////////////////////////////////////////
30+
template <typename Executor, typename Receiver>
31+
struct executor_operation_state
32+
{
33+
HPX_NO_UNIQUE_ADDRESS std::decay_t<Executor> exec_;
34+
HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver_;
35+
36+
template <typename Exec, typename Recv>
37+
executor_operation_state(Exec&& exec, Recv&& recv)
38+
: exec_(HPX_FORWARD(Exec, exec))
39+
, receiver_(HPX_FORWARD(Recv, recv))
40+
{
41+
}
42+
43+
executor_operation_state(executor_operation_state&&) = delete;
44+
executor_operation_state(executor_operation_state const&) = delete;
45+
executor_operation_state& operator=(
46+
executor_operation_state&&) = delete;
47+
executor_operation_state& operator=(
48+
executor_operation_state const&) = delete;
49+
50+
~executor_operation_state() = default;
51+
52+
friend void tag_invoke(start_t, executor_operation_state& os) noexcept
53+
{
54+
hpx::detail::try_catch_exception_ptr(
55+
[&]() {
56+
hpx::parallel::execution::post(os.exec_,
57+
[receiver = HPX_MOVE(os.receiver_)]() mutable {
58+
hpx::execution::experimental::set_value(
59+
HPX_MOVE(receiver));
60+
});
61+
},
62+
[&](std::exception_ptr ep) {
63+
hpx::execution::experimental::set_error(
64+
HPX_MOVE(os.receiver_), HPX_MOVE(ep));
65+
});
66+
}
67+
};
68+
69+
///////////////////////////////////////////////////////////////////////////
70+
template <typename Executor>
71+
struct executor_sender
72+
{
73+
HPX_NO_UNIQUE_ADDRESS std::decay_t<Executor> exec_;
74+
75+
using completion_signatures =
76+
hpx::execution::experimental::completion_signatures<
77+
hpx::execution::experimental::set_value_t(),
78+
hpx::execution::experimental::set_error_t(std::exception_ptr)>;
79+
80+
template <typename Env>
81+
friend auto tag_invoke(
82+
hpx::execution::experimental::get_completion_signatures_t,
83+
executor_sender const&, Env) noexcept -> completion_signatures;
84+
85+
template <typename Receiver>
86+
friend executor_operation_state<Executor, Receiver> tag_invoke(
87+
connect_t, executor_sender&& s, Receiver&& receiver)
88+
{
89+
return {HPX_MOVE(s.exec_), HPX_FORWARD(Receiver, receiver)};
90+
}
91+
92+
template <typename Receiver>
93+
friend executor_operation_state<Executor, Receiver> tag_invoke(
94+
connect_t, executor_sender const& s, Receiver&& receiver)
95+
{
96+
return {s.exec_, HPX_FORWARD(Receiver, receiver)};
97+
}
98+
};
99+
100+
///////////////////////////////////////////////////////////////////////////
101+
template <typename Executor>
102+
struct executor_scheduler
103+
{
104+
using executor_type = std::decay_t<Executor>;
105+
106+
HPX_NO_UNIQUE_ADDRESS executor_type exec_;
107+
108+
constexpr executor_scheduler() = default;
109+
110+
template <typename Exec,
111+
typename = std::enable_if_t<
112+
!std::is_same_v<std::decay_t<Exec>, executor_scheduler>>>
113+
explicit executor_scheduler(Exec&& exec)
114+
: exec_(HPX_FORWARD(Exec, exec))
115+
{
116+
}
117+
118+
constexpr bool operator==(executor_scheduler const& rhs) const noexcept
119+
{
120+
return exec_ == rhs.exec_;
121+
}
122+
123+
constexpr bool operator!=(executor_scheduler const& rhs) const noexcept
124+
{
125+
return !(*this == rhs);
126+
}
127+
128+
friend executor_sender<Executor> tag_invoke(
129+
schedule_t, executor_scheduler&& sched)
130+
{
131+
return {HPX_MOVE(sched.exec_)};
132+
}
133+
134+
friend executor_sender<Executor> tag_invoke(
135+
schedule_t, executor_scheduler const& sched)
136+
{
137+
return {sched.exec_};
138+
}
139+
};
140+
} // namespace hpx::execution::experimental

libs/core/executors/include/hpx/executors/parallel_executor.hpp

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <hpx/executors/detail/hierarchical_spawning.hpp>
1414
#include <hpx/executors/detail/index_queue_spawning.hpp>
1515
#include <hpx/executors/execution_policy_mappings.hpp>
16+
#include <hpx/executors/executor_scheduler.hpp>
1617
#include <hpx/modules/allocator_support.hpp>
1718
#include <hpx/modules/async_base.hpp>
1819
#include <hpx/modules/concepts.hpp>
@@ -259,7 +260,7 @@ namespace hpx::execution {
259260
#endif
260261

261262
using allocator_type = hpx::util::thread_local_caching_allocator<
262-
hpx::lockfree::variable_size_stack,
263+
hpx::lockfree::variable_size_stack, char,
263264
hpx::util::internal_allocator<>>;
264265
hpx::traits::detail::shared_state_ptr_t<result_type> p =
265266
lcos::detail::make_continuation_alloc_nounwrap<result_type>(
@@ -474,9 +475,7 @@ namespace hpx::execution {
474475
parallel_policy_executor& operator=(
475476
parallel_policy_executor&&) = default;
476477

477-
#if defined(__NVCC__) || defined(__CUDACC__)
478-
constexpr ~parallel_policy_executor() {}
479-
#endif
478+
~parallel_policy_executor() = default;
480479

481480
private:
482481
// property implementations
@@ -687,6 +686,15 @@ namespace hpx::execution {
687686
#endif
688687
}
689688
}
689+
690+
friend hpx::execution::experimental::executor_scheduler<
691+
parallel_policy_executor>
692+
tag_invoke(hpx::execution::experimental::get_scheduler_t,
693+
parallel_policy_executor const& exec)
694+
{
695+
return hpx::execution::experimental::executor_scheduler<
696+
parallel_policy_executor>(exec);
697+
}
690698
/// \endcond
691699

692700
public:
@@ -1015,6 +1023,15 @@ namespace hpx::execution {
10151023
#endif
10161024
}
10171025
}
1026+
1027+
friend hpx::execution::experimental::executor_scheduler<
1028+
parallel_policy_executor>
1029+
tag_invoke(hpx::execution::experimental::get_scheduler_t,
1030+
parallel_policy_executor const& exec)
1031+
{
1032+
return hpx::execution::experimental::executor_scheduler<
1033+
parallel_policy_executor>(exec);
1034+
}
10181035
/// \endcond
10191036

10201037
public:

libs/core/executors/include/hpx/executors/sequenced_executor.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
#include <hpx/config.hpp>
1212
#include <hpx/executors/execution_policy_mappings.hpp>
13+
#include <hpx/executors/executor_scheduler.hpp>
1314
#include <hpx/executors/parallel_executor.hpp>
1415
#include <hpx/modules/errors.hpp>
1516
#include <hpx/modules/execution.hpp>
@@ -236,6 +237,15 @@ namespace hpx::execution {
236237
#endif
237238
}
238239

240+
friend hpx::execution::experimental::executor_scheduler<
241+
sequenced_executor>
242+
tag_invoke(hpx::execution::experimental::get_scheduler_t,
243+
sequenced_executor const& exec)
244+
{
245+
return hpx::execution::experimental::executor_scheduler<
246+
sequenced_executor>(exec);
247+
}
248+
239249
private:
240250
friend class hpx::serialization::access;
241251

libs/core/executors/tests/unit/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ set(tests
99
annotation_property
1010
created_executor
1111
execution_policy_mappings
12+
executor_scheduler
1213
explicit_scheduler_executor
1314
fork_join_executor
1415
fork_join_executor_from
1516
limiting_executor
1617
parallel_executor
1718
parallel_executor_parameters
19+
parallel_executor_scheduler
1820
parallel_fork_executor
1921
parallel_policy_executor
2022
polymorphic_executor
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright (c) 2026 The STE||AR-Group
2+
//
3+
// SPDX-License-Identifier: BSL-1.0
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
7+
#include <hpx/execution.hpp>
8+
#include <hpx/future.hpp>
9+
#include <hpx/init.hpp>
10+
#include <hpx/modules/testing.hpp>
11+
12+
#include <cstdlib>
13+
#include <string>
14+
#include <utility>
15+
#include <vector>
16+
17+
///////////////////////////////////////////////////////////////////////////////
18+
void test_executor_scheduler_schedule()
19+
{
20+
using namespace hpx::execution::experimental;
21+
22+
hpx::execution::sequenced_executor exec;
23+
24+
// Retrieve a P2300-compliant scheduler from the legacy executor
25+
auto sched = get_scheduler(exec);
26+
27+
// Verify the scheduler satisfies the is_scheduler trait
28+
static_assert(is_scheduler_v<decltype(sched)>,
29+
"executor_scheduler must satisfy is_scheduler");
30+
31+
// Create a sender pipeline: schedule | then
32+
auto s = then(schedule(sched), []() { return 42; });
33+
34+
// Execute synchronously and verify the result
35+
auto result = hpx::this_thread::experimental::sync_wait(std::move(s));
36+
37+
HPX_TEST(result.has_value());
38+
HPX_TEST_EQ(hpx::get<0>(*result), 42);
39+
}
40+
41+
///////////////////////////////////////////////////////////////////////////////
42+
int hpx_main()
43+
{
44+
test_executor_scheduler_schedule();
45+
46+
return hpx::local::finalize();
47+
}
48+
49+
int main(int argc, char* argv[])
50+
{
51+
std::vector<std::string> const cfg = {"hpx.os_threads=all"};
52+
53+
hpx::local::init_params init_args;
54+
init_args.cfg = cfg;
55+
56+
HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv, init_args), 0,
57+
"HPX main exited with non-zero status");
58+
59+
return hpx::util::report_errors();
60+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright (c) 2026 The STE||AR-Group
2+
//
3+
// SPDX-License-Identifier: BSL-1.0
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
7+
#include <hpx/execution.hpp>
8+
#include <hpx/future.hpp>
9+
#include <hpx/init.hpp>
10+
#include <hpx/modules/testing.hpp>
11+
12+
#include <cstdlib>
13+
#include <string>
14+
#include <utility>
15+
#include <vector>
16+
17+
///////////////////////////////////////////////////////////////////////////////
18+
void test_parallel_executor_scheduler()
19+
{
20+
using namespace hpx::execution::experimental;
21+
22+
hpx::execution::parallel_executor exec;
23+
24+
// Retrieve a P2300-compliant scheduler from the legacy executor
25+
auto sched = get_scheduler(exec);
26+
27+
// Verify the scheduler satisfies the is_scheduler trait
28+
static_assert(is_scheduler_v<decltype(sched)>,
29+
"executor_scheduler must satisfy is_scheduler");
30+
31+
// Create a sender pipeline: schedule | then
32+
auto s = then(schedule(sched), []() { return 42; });
33+
34+
// Execute synchronously and verify the result
35+
auto result = hpx::this_thread::experimental::sync_wait(std::move(s));
36+
37+
HPX_TEST(result.has_value());
38+
HPX_TEST_EQ(hpx::get<0>(*result), 42);
39+
}
40+
41+
///////////////////////////////////////////////////////////////////////////////
42+
int hpx_main()
43+
{
44+
test_parallel_executor_scheduler();
45+
46+
return hpx::local::finalize();
47+
}
48+
49+
int main(int argc, char* argv[])
50+
{
51+
std::vector<std::string> const cfg = {"hpx.os_threads=all"};
52+
53+
hpx::local::init_params init_args;
54+
init_args.cfg = cfg;
55+
56+
HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv, init_args), 0,
57+
"HPX main exited with non-zero status");
58+
59+
return hpx::util::report_errors();
60+
}

0 commit comments

Comments
 (0)