Skip to content

Commit aaf00f3

Browse files
author
Hackathon User
committed
Implement P2300 bulk adapter for HPX executors
- Integrated hpx::execution::experimental::bulk with internal bulk_sync_execute - Implemented robust tag_invoke(connect_t) with full const-correctness - Fixed receiver reference collapsing using std::decay_t - Added executor_algorithm_bulk unit tests for parallel and sequenced execution
1 parent f0b1af2 commit aaf00f3

7 files changed

Lines changed: 500 additions & 3 deletions

File tree

libs/core/executors/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ set(executors_headers
2626
hpx/executors/execution_policy_parameters.hpp
2727
hpx/executors/execution_policy_scheduling_property.hpp
2828
hpx/executors/execution_policy.hpp
29+
hpx/executors/executor_scheduler.hpp
30+
hpx/executors/executor_scheduler_bulk.hpp
2931
hpx/executors/explicit_scheduler_executor.hpp
3032
hpx/executors/fork_join_executor.hpp
3133
hpx/executors/limiting_executor.hpp
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// Copyright (c) 2026 The STE||AR-Group
2+
//
3+
// SPDX-License-Identifier: BSL-1.0
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
7+
/// \file parallel/executors/executor_scheduler.hpp
8+
9+
#pragma once
10+
11+
#include <hpx/config.hpp>
12+
#include <hpx/modules/errors.hpp>
13+
#include <hpx/modules/execution.hpp>
14+
#include <hpx/modules/execution_base.hpp>
15+
16+
#include <exception>
17+
#include <type_traits>
18+
#include <utility>
19+
20+
namespace hpx::execution::experimental {
21+
22+
// Forward declarations
23+
template <typename Executor>
24+
struct executor_scheduler;
25+
26+
template <typename Executor>
27+
struct executor_sender;
28+
29+
///////////////////////////////////////////////////////////////////////////
30+
template <typename Executor, typename Receiver>
31+
struct executor_operation_state
32+
{
33+
HPX_NO_UNIQUE_ADDRESS std::decay_t<Executor> exec_;
34+
HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver_;
35+
36+
template <typename Exec, typename Recv>
37+
executor_operation_state(Exec&& exec, Recv&& recv)
38+
: exec_(HPX_FORWARD(Exec, exec))
39+
, receiver_(HPX_FORWARD(Recv, recv))
40+
{
41+
}
42+
43+
executor_operation_state(executor_operation_state&&) = delete;
44+
executor_operation_state(executor_operation_state const&) = delete;
45+
executor_operation_state& operator=(
46+
executor_operation_state&&) = delete;
47+
executor_operation_state& operator=(
48+
executor_operation_state const&) = delete;
49+
50+
~executor_operation_state() = default;
51+
52+
friend void tag_invoke(start_t, executor_operation_state& os) noexcept
53+
{
54+
hpx::detail::try_catch_exception_ptr(
55+
[&]() {
56+
hpx::parallel::execution::post(os.exec_,
57+
[receiver = HPX_MOVE(os.receiver_)]() mutable {
58+
hpx::execution::experimental::set_value(
59+
HPX_MOVE(receiver));
60+
});
61+
},
62+
[&](std::exception_ptr ep) {
63+
hpx::execution::experimental::set_error(
64+
HPX_MOVE(os.receiver_), HPX_MOVE(ep));
65+
});
66+
}
67+
};
68+
69+
///////////////////////////////////////////////////////////////////////////
70+
template <typename Executor>
71+
struct executor_sender
72+
{
73+
HPX_NO_UNIQUE_ADDRESS std::decay_t<Executor> exec_;
74+
75+
using completion_signatures =
76+
hpx::execution::experimental::completion_signatures<
77+
hpx::execution::experimental::set_value_t(),
78+
hpx::execution::experimental::set_error_t(std::exception_ptr)>;
79+
80+
template <typename Env>
81+
friend auto tag_invoke(
82+
hpx::execution::experimental::get_completion_signatures_t,
83+
executor_sender const&, Env) noexcept -> completion_signatures;
84+
85+
friend constexpr auto tag_invoke(
86+
hpx::execution::experimental::get_completion_scheduler_t<
87+
hpx::execution::experimental::set_value_t>,
88+
executor_sender const& s) noexcept
89+
{
90+
return executor_scheduler<Executor>{s.exec_};
91+
}
92+
93+
template <typename Receiver>
94+
friend executor_operation_state<Executor, Receiver> tag_invoke(
95+
connect_t, executor_sender&& s, Receiver&& receiver)
96+
{
97+
return {HPX_MOVE(s.exec_), HPX_FORWARD(Receiver, receiver)};
98+
}
99+
100+
template <typename Receiver>
101+
friend executor_operation_state<Executor, Receiver> tag_invoke(
102+
connect_t, executor_sender const& s, Receiver&& receiver)
103+
{
104+
return {s.exec_, HPX_FORWARD(Receiver, receiver)};
105+
}
106+
};
107+
108+
///////////////////////////////////////////////////////////////////////////
109+
template <typename Executor>
110+
struct executor_scheduler
111+
{
112+
using executor_type = std::decay_t<Executor>;
113+
114+
HPX_NO_UNIQUE_ADDRESS executor_type exec_;
115+
116+
constexpr executor_scheduler() = default;
117+
118+
template <typename Exec,
119+
typename = std::enable_if_t<
120+
!std::is_same_v<std::decay_t<Exec>, executor_scheduler>>>
121+
explicit executor_scheduler(Exec&& exec)
122+
: exec_(HPX_FORWARD(Exec, exec))
123+
{
124+
}
125+
126+
constexpr bool operator==(executor_scheduler const& rhs) const noexcept
127+
{
128+
return exec_ == rhs.exec_;
129+
}
130+
131+
constexpr bool operator!=(executor_scheduler const& rhs) const noexcept
132+
{
133+
return !(*this == rhs);
134+
}
135+
136+
friend executor_sender<Executor> tag_invoke(
137+
schedule_t, executor_scheduler&& sched)
138+
{
139+
return {HPX_MOVE(sched.exec_)};
140+
}
141+
142+
friend executor_sender<Executor> tag_invoke(
143+
schedule_t, executor_scheduler const& sched)
144+
{
145+
return {sched.exec_};
146+
}
147+
};
148+
} // namespace hpx::execution::experimental
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
// Copyright (c) 2026 The STE||AR-Group
2+
//
3+
// SPDX-License-Identifier: BSL-1.0
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
7+
#pragma once
8+
9+
#include <hpx/config.hpp>
10+
11+
#if defined(HPX_HAVE_STDEXEC)
12+
#include <hpx/execution_base/stdexec_forward.hpp>
13+
#endif
14+
15+
#include <hpx/execution/algorithms/bulk.hpp>
16+
#include <hpx/execution_base/completion_scheduler.hpp>
17+
#include <hpx/execution_base/completion_signatures.hpp>
18+
#include <hpx/execution_base/execution.hpp>
19+
#include <hpx/execution_base/receiver.hpp>
20+
#include <hpx/execution_base/sender.hpp>
21+
#include <hpx/executors/executor_scheduler.hpp>
22+
#include <hpx/modules/errors.hpp>
23+
24+
#include <exception>
25+
#include <type_traits>
26+
#include <utility>
27+
28+
namespace hpx::execution::experimental {
29+
30+
namespace detail {
31+
template <typename Executor, typename Receiver, typename Shape,
32+
typename F>
33+
struct executor_bulk_receiver
34+
{
35+
HPX_NO_UNIQUE_ADDRESS std::decay_t<Executor> exec_;
36+
HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver_;
37+
HPX_NO_UNIQUE_ADDRESS std::decay_t<Shape> shape_;
38+
HPX_NO_UNIQUE_ADDRESS std::decay_t<F> f_;
39+
40+
template <typename Error>
41+
friend void tag_invoke(
42+
set_error_t, executor_bulk_receiver&& r, Error&& error) noexcept
43+
{
44+
hpx::execution::experimental::set_error(
45+
HPX_MOVE(r.receiver_), HPX_FORWARD(Error, error));
46+
}
47+
48+
friend void tag_invoke(
49+
set_stopped_t, executor_bulk_receiver&& r) noexcept
50+
{
51+
hpx::execution::experimental::set_stopped(
52+
HPX_MOVE(r.receiver_));
53+
}
54+
55+
template <typename... Ts>
56+
friend void tag_invoke(
57+
set_value_t, executor_bulk_receiver&& r, Ts&&... ts) noexcept
58+
{
59+
hpx::detail::try_catch_exception_ptr(
60+
[&]() {
61+
hpx::parallel::execution::bulk_sync_execute(
62+
r.exec_, r.f_, r.shape_, ts...);
63+
64+
hpx::execution::experimental::set_value(
65+
HPX_MOVE(r.receiver_), HPX_FORWARD(Ts, ts)...);
66+
},
67+
[&](std::exception_ptr ep) {
68+
hpx::execution::experimental::set_error(
69+
HPX_MOVE(r.receiver_), HPX_MOVE(ep));
70+
});
71+
}
72+
};
73+
74+
template <typename Executor, typename Sender, typename Shape,
75+
typename F>
76+
struct executor_bulk_sender
77+
{
78+
HPX_NO_UNIQUE_ADDRESS std::decay_t<Executor> exec_;
79+
HPX_NO_UNIQUE_ADDRESS std::decay_t<Sender> sender_;
80+
HPX_NO_UNIQUE_ADDRESS std::decay_t<Shape> shape_;
81+
HPX_NO_UNIQUE_ADDRESS std::decay_t<F> f_;
82+
83+
#if defined(HPX_HAVE_STDEXEC)
84+
using sender_concept = hpx::execution::experimental::sender_t;
85+
86+
template <typename Env>
87+
friend auto tag_invoke(
88+
hpx::execution::experimental::get_completion_signatures_t,
89+
executor_bulk_sender const&, Env const&) -> hpx::execution::
90+
experimental::transform_completion_signatures_of<Sender, Env,
91+
hpx::execution::experimental::completion_signatures<
92+
hpx::execution::experimental::set_error_t(
93+
std::exception_ptr)>>;
94+
95+
struct env
96+
{
97+
std::decay_t<Sender> const& pred_snd;
98+
std::decay_t<Executor> const& exec;
99+
100+
template <typename CPO>
101+
requires(
102+
meta::value<meta::one_of<CPO,
103+
hpx::execution::experimental::set_error_t,
104+
hpx::execution::experimental::set_stopped_t>> &&
105+
hpx::execution::experimental::detail::
106+
has_completion_scheduler_v<CPO,
107+
std::decay_t<Sender>>)
108+
friend constexpr auto tag_invoke(
109+
hpx::execution::experimental::get_completion_scheduler_t<
110+
CPO>
111+
tag,
112+
env const& e) noexcept
113+
{
114+
return tag(
115+
hpx::execution::experimental::get_env(e.pred_snd));
116+
}
117+
118+
friend constexpr auto tag_invoke(
119+
hpx::execution::experimental::get_completion_scheduler_t<
120+
hpx::execution::experimental::set_value_t>,
121+
env const& e) noexcept
122+
{
123+
return hpx::execution::experimental::executor_scheduler<
124+
Executor>{e.exec};
125+
}
126+
};
127+
128+
friend constexpr auto tag_invoke(
129+
hpx::execution::experimental::get_env_t,
130+
executor_bulk_sender const& s) noexcept
131+
{
132+
return env{s.sender_, s.exec_};
133+
}
134+
#else
135+
template <typename Env>
136+
struct generate_completion_signatures
137+
{
138+
template <template <typename...> typename Tuple,
139+
template <typename...> typename Variant>
140+
using value_types =
141+
value_types_of_t<Sender, Env, Tuple, Variant>;
142+
143+
template <template <typename...> typename Variant>
144+
using error_types = hpx::util::detail::unique_concat_t<
145+
error_types_of_t<Sender, Env, Variant>,
146+
Variant<std::exception_ptr>>;
147+
148+
static constexpr bool sends_stopped =
149+
sends_stopped_of_v<Sender, Env>;
150+
};
151+
152+
template <typename Env>
153+
friend auto tag_invoke(
154+
hpx::execution::experimental::get_completion_signatures_t,
155+
executor_bulk_sender const&, Env)
156+
-> generate_completion_signatures<Env>;
157+
158+
template <typename CPO>
159+
friend constexpr auto tag_invoke(
160+
hpx::execution::experimental::get_completion_scheduler_t<CPO>
161+
tag,
162+
executor_bulk_sender const& s)
163+
{
164+
return tag(s.sender_);
165+
}
166+
167+
friend constexpr auto tag_invoke(
168+
hpx::execution::experimental::get_completion_scheduler_t<
169+
hpx::execution::experimental::set_value_t>,
170+
executor_bulk_sender const& s)
171+
{
172+
return hpx::execution::experimental::executor_scheduler<
173+
Executor>{s.exec_};
174+
}
175+
#endif
176+
177+
template <typename Receiver>
178+
friend auto tag_invoke(connect_t,
179+
executor_bulk_sender<Executor, Sender, Shape, F>&& s,
180+
Receiver&& receiver)
181+
{
182+
return hpx::execution::experimental::connect(
183+
HPX_MOVE(s.sender_),
184+
executor_bulk_receiver<Executor, std::decay_t<Receiver>,
185+
Shape, F>{HPX_MOVE(s.exec_),
186+
HPX_FORWARD(Receiver, receiver), HPX_MOVE(s.shape_),
187+
HPX_MOVE(s.f_)});
188+
}
189+
190+
template <typename Receiver>
191+
friend auto tag_invoke(connect_t,
192+
executor_bulk_sender<Executor, Sender, Shape, F>& s,
193+
Receiver&& receiver)
194+
{
195+
return hpx::execution::experimental::connect(s.sender_,
196+
executor_bulk_receiver<Executor, std::decay_t<Receiver>,
197+
Shape, F>{s.exec_, HPX_FORWARD(Receiver, receiver),
198+
s.shape_, s.f_});
199+
}
200+
201+
template <typename Receiver>
202+
friend auto tag_invoke(connect_t,
203+
executor_bulk_sender<Executor, Sender, Shape, F> const& s,
204+
Receiver&& receiver)
205+
{
206+
return hpx::execution::experimental::connect(s.sender_,
207+
executor_bulk_receiver<Executor, std::decay_t<Receiver>,
208+
Shape, F>{s.exec_, HPX_FORWARD(Receiver, receiver),
209+
s.shape_, s.f_});
210+
}
211+
};
212+
} // namespace detail
213+
214+
template <typename Executor, typename Sender, typename Shape, typename F>
215+
auto tag_invoke(bulk_t, executor_scheduler<Executor> const& sched,
216+
Sender&& sender, Shape const& shape, F&& f)
217+
{
218+
return detail::executor_bulk_sender<Executor, std::decay_t<Sender>,
219+
Shape, std::decay_t<F>>{
220+
sched.exec_, HPX_FORWARD(Sender, sender), shape, HPX_FORWARD(F, f)};
221+
}
222+
} // namespace hpx::execution::experimental

0 commit comments

Comments
 (0)