Skip to content

Commit b530f78

Browse files
execution: Address hkaiser review on as_sender scheduler overload
- Replace enable_if_t with C++20 requires clause in constructor - Deduplicate as_sender_sender_with_scheduler specializations into primary template - Use requires(is_future_v<Future>) to cover both hpx::future and hpx::shared_future Signed-off-by: Shivansh <singhshivansh023@gmail.com> Signed-off-by: Shivansh Singh <singhshivansh023@gmail.com>
1 parent 6db954c commit b530f78

3 files changed

Lines changed: 130 additions & 101 deletions

File tree

libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp

Lines changed: 18 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -241,16 +241,14 @@ namespace hpx::execution::experimental {
241241
// get_completion_scheduler<set_value_t> and obtain the scheduler
242242
// that originated the work.
243243
HPX_CXX_CORE_EXPORT template <typename Future, typename Scheduler>
244-
struct as_sender_sender_with_scheduler;
245-
246-
template <typename T, typename Scheduler>
247-
struct as_sender_sender_with_scheduler<hpx::future<T>, Scheduler>
248-
: public as_sender_sender_base<hpx::future<T>>
244+
requires hpx::traits::is_future_v<std::decay_t<Future>>
245+
struct as_sender_sender_with_scheduler
246+
: public as_sender_sender_base<std::decay_t<Future>>
249247
{
250248
using sender_concept = hpx::execution::experimental::sender_t;
251-
using future_type = hpx::future<T>;
249+
using future_type = std::decay_t<Future>;
252250
using scheduler_type = std::decay_t<Scheduler>;
253-
using base_type = as_sender_sender_base<hpx::future<T>>;
251+
using base_type = as_sender_sender_base<std::decay_t<Future>>;
254252
using base_type::future_;
255253

256254
HPX_NO_UNIQUE_ADDRESS scheduler_type scheduler_;
@@ -261,107 +259,28 @@ namespace hpx::execution::experimental {
261259
{
262260
scheduler_type const& sched;
263261

264-
auto query(hpx::execution::experimental::get_domain_t)
265-
const noexcept
266-
{
267-
return hpx::execution::experimental::get_domain(sched);
268-
}
269-
270-
template <typename CPO>
271-
requires std::is_same_v<CPO,
272-
hpx::execution::experimental::
273-
set_value_t> ||
274-
std::is_same_v<CPO,
275-
hpx::execution::experimental::set_stopped_t>
276262
auto query(
277-
hpx::execution::experimental::
278-
get_completion_scheduler_t<CPO>) const noexcept
279-
{
280-
return sched;
281-
}
282-
};
283-
284-
template <typename Future_, typename Scheduler_,
285-
typename = std::enable_if_t<!std::is_same_v<
286-
std::decay_t<Future_>,
287-
as_sender_sender_with_scheduler>>>
288-
explicit as_sender_sender_with_scheduler(
289-
Future_&& future, Scheduler_&& scheduler)
290-
: base_type{HPX_FORWARD(Future_, future)}
291-
, scheduler_(HPX_FORWARD(Scheduler_, scheduler))
292-
{
293-
}
294-
295-
as_sender_sender_with_scheduler(
296-
as_sender_sender_with_scheduler&&) = default;
297-
as_sender_sender_with_scheduler& operator=(
298-
as_sender_sender_with_scheduler&&) = default;
299-
as_sender_sender_with_scheduler(
300-
as_sender_sender_with_scheduler const&) = delete;
301-
as_sender_sender_with_scheduler& operator=(
302-
as_sender_sender_with_scheduler const&) = delete;
303-
304-
template <typename Self, typename... Env>
305-
static consteval auto get_completion_signatures() noexcept ->
306-
typename base_type::completion_signatures
307-
{
308-
return {};
309-
}
310-
311-
template <typename Receiver>
312-
auto connect(Receiver&& receiver) &&
313-
{
314-
return as_sender_operation_state<Receiver, future_type>{
315-
HPX_FORWARD(Receiver, receiver), HPX_MOVE(future_)};
316-
}
317-
318-
constexpr auto get_env() const noexcept
319-
{
320-
return env{scheduler_};
321-
}
322-
};
323-
324-
template <typename T, typename Scheduler>
325-
struct as_sender_sender_with_scheduler<hpx::shared_future<T>, Scheduler>
326-
: public as_sender_sender_base<hpx::shared_future<T>>
327-
{
328-
using sender_concept = hpx::execution::experimental::sender_t;
329-
using future_type = hpx::shared_future<T>;
330-
using scheduler_type = std::decay_t<Scheduler>;
331-
using base_type =
332-
as_sender_sender_base<hpx::shared_future<T>>;
333-
using base_type::future_;
334-
335-
HPX_NO_UNIQUE_ADDRESS scheduler_type scheduler_;
336-
337-
struct env
338-
{
339-
scheduler_type const& sched;
340-
341-
auto query(hpx::execution::experimental::get_domain_t)
342-
const noexcept
263+
hpx::execution::experimental::get_domain_t) const noexcept
343264
{
344265
return hpx::execution::experimental::get_domain(sched);
345266
}
346267

347268
template <typename CPO>
348269
requires std::is_same_v<CPO,
349-
hpx::execution::experimental::
350-
set_value_t> ||
351-
std::is_same_v<CPO,
352-
hpx::execution::experimental::set_stopped_t>
270+
hpx::execution::experimental::set_value_t> ||
271+
std::is_same_v<CPO,
272+
hpx::execution::experimental::set_stopped_t>
353273
auto query(
354-
hpx::execution::experimental::
355-
get_completion_scheduler_t<CPO>) const noexcept
274+
hpx::execution::experimental::get_completion_scheduler_t<
275+
CPO>) const noexcept
356276
{
357277
return sched;
358278
}
359279
};
360280

361-
template <typename Future_, typename Scheduler_,
362-
typename = std::enable_if_t<!std::is_same_v<
363-
std::decay_t<Future_>,
364-
as_sender_sender_with_scheduler>>>
281+
template <typename Future_, typename Scheduler_>
282+
requires(!std::is_same_v<std::decay_t<Future_>,
283+
as_sender_sender_with_scheduler>)
365284
explicit as_sender_sender_with_scheduler(
366285
Future_&& future, Scheduler_&& scheduler)
367286
: base_type{HPX_FORWARD(Future_, future)}
@@ -432,15 +351,13 @@ namespace hpx::execution::experimental {
432351
// environment exposes the given scheduler as completion scheduler.
433352
template <typename Future, typename Scheduler>
434353
requires hpx::traits::is_future_v<std::decay_t<Future>> &&
435-
hpx::execution::experimental::scheduler<
436-
std::decay_t<Scheduler>>
354+
hpx::execution::experimental::scheduler<std::decay_t<Scheduler>>
437355
constexpr HPX_FORCEINLINE auto operator()(
438356
Future&& future, Scheduler&& scheduler) const
439357
{
440-
return detail::as_sender_sender_with_scheduler<
441-
std::decay_t<Future>, std::decay_t<Scheduler>>(
442-
HPX_FORWARD(Future, future),
443-
HPX_FORWARD(Scheduler, scheduler));
358+
return detail::as_sender_sender_with_scheduler<std::decay_t<Future>,
359+
std::decay_t<Scheduler>>(
360+
HPX_FORWARD(Future, future), HPX_FORWARD(Scheduler, scheduler));
444361
}
445362

446363
constexpr HPX_FORCEINLINE auto operator()() const

libs/core/execution/tests/unit/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
set(tests
88
algorithm_as_sender
9+
algorithm_as_sender_with_scheduler
910
algorithm_bulk
1011
algorithm_continues_on
1112
algorithm_ensure_started
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Copyright (c) 2026 The STE||AR-Group
2+
//
3+
// SPDX-License-Identifier: BSL-1.0
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
7+
#include <hpx/execution.hpp>
8+
#include <hpx/executors/thread_pool_scheduler.hpp>
9+
#include <hpx/future.hpp>
10+
#include <hpx/init.hpp>
11+
#include <hpx/modules/futures.hpp>
12+
#include <hpx/modules/testing.hpp>
13+
14+
namespace ex = hpx::execution::experimental;
15+
namespace tt = hpx::this_thread::experimental;
16+
17+
void test_as_sender_with_scheduler_basic()
18+
{
19+
// Test: future + scheduler -> sender with env
20+
auto f = hpx::async([] { return 42; });
21+
ex::thread_pool_scheduler sched{};
22+
auto sender = ex::as_sender(std::move(f), sched);
23+
// Verify compilation and basic properties
24+
(void) sender;
25+
}
26+
27+
void test_as_sender_with_scheduler_get_env()
28+
{
29+
// Test: env exposes the scheduler
30+
auto f = hpx::async([] { return 42; });
31+
ex::thread_pool_scheduler sched{};
32+
auto sender = ex::as_sender(std::move(f), sched);
33+
auto env = ex::get_env(sender);
34+
// Verify env can be queried for scheduler
35+
auto sched_from_env = ex::get_completion_scheduler<ex::set_value_t>(env);
36+
HPX_TEST(sched == sched_from_env);
37+
}
38+
39+
void test_as_sender_with_scheduler_in_pipeline()
40+
{
41+
// Full end-to-end test
42+
auto f = hpx::async([] { return 42; });
43+
ex::thread_pool_scheduler sched{};
44+
45+
auto [result] =
46+
tt::sync_wait(ex::as_sender(std::move(f), sched) | ex::then([](int x) {
47+
return x * 2;
48+
})).value();
49+
50+
HPX_TEST_EQ(result, 84);
51+
}
52+
53+
void test_as_sender_with_scheduler_shared_future()
54+
{
55+
// Test shared_future variant (copyable)
56+
auto f = hpx::make_shared_future(hpx::async([] { return 42; }));
57+
ex::thread_pool_scheduler sched{};
58+
auto sender = ex::as_sender(f, sched); // f is lvalue, not moved
59+
60+
auto [result] =
61+
tt::sync_wait(sender | ex::then([](int x) { return x * 2; })).value();
62+
63+
HPX_TEST_EQ(result, 84);
64+
}
65+
66+
void test_as_sender_with_scheduler_void()
67+
{
68+
// Test void-returning futures
69+
bool executed = false;
70+
auto f = hpx::async([&] { executed = true; });
71+
ex::thread_pool_scheduler sched{};
72+
auto sender = ex::as_sender(std::move(f), sched);
73+
tt::sync_wait(std::move(sender));
74+
HPX_TEST(executed);
75+
}
76+
77+
void test_as_sender_with_scheduler_error()
78+
{
79+
// Test exception propagation
80+
auto f = hpx::async([]() -> int {
81+
throw std::runtime_error("test error");
82+
return 42;
83+
});
84+
ex::thread_pool_scheduler sched{};
85+
auto sender = ex::as_sender(std::move(f), sched);
86+
87+
bool caught = false;
88+
try
89+
{
90+
tt::sync_wait(std::move(sender));
91+
HPX_TEST(false); // Should have thrown
92+
}
93+
catch (std::runtime_error const& e)
94+
{
95+
caught = true;
96+
HPX_TEST_EQ(std::string(e.what()), "test error");
97+
}
98+
HPX_TEST(caught);
99+
}
100+
101+
int main()
102+
{
103+
test_as_sender_with_scheduler_basic();
104+
test_as_sender_with_scheduler_get_env();
105+
test_as_sender_with_scheduler_in_pipeline();
106+
test_as_sender_with_scheduler_shared_future();
107+
test_as_sender_with_scheduler_void();
108+
test_as_sender_with_scheduler_error();
109+
110+
return hpx::util::report_errors();
111+
}

0 commit comments

Comments
 (0)