Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 243 additions & 28 deletions libs/core/execution/include/hpx/execution/algorithms/split.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,19 @@ namespace hpx::execution::experimental {
}
};

// Hands a ready-to-run continuation to the execution context on
// which it should complete.
//
// The base implementation has no scheduler attached and therefore
// invokes the continuation synchronously, inline in the caller.
// Derived shared states (e.g. shared_state_scheduler) override this
// to route the continuation through their scheduler instead, so that
// the P2300 get_completion_scheduler guarantee also holds for
// late-arriving subscribers, i.e. receivers that are added after
// predecessor_done has already been set.
//
// continuation: callable taken by rvalue reference. The base
//               implementation invokes it once as an lvalue and does
//               not move from it.
virtual void schedule_completion(
continuation_type&& continuation)
{
// No scheduler to honour here: run the continuation inline.
continuation();
}

virtual void set_predecessor_done()
{
predecessor_done = true;
Expand Down Expand Up @@ -325,7 +338,7 @@ namespace hpx::execution::experimental {

if (!continuations.empty())
{
for (auto const& continuation : continuations)
for (auto& continuation : continuations)
{
continuation();
}
Expand All @@ -345,13 +358,18 @@ namespace hpx::execution::experimental {
// If we read predecessor_done here it means that one of
// set_error/set_stopped/set_value has been called and
// values/errors have been stored into the shared state.
// We can trigger the continuation directly.
// TODO: Should this preserve the scheduler? It does not
// if we call set_* inline.
hpx::visit(
done_error_value_visitor<Receiver>{
HPX_FORWARD(Receiver, receiver)},
v);
// We dispatch the completion through schedule_completion
// so that any attached scheduler is honoured, satisfying
// the P2300 get_completion_scheduler contract for late
// subscribers.
schedule_completion([this,
receiver = HPX_FORWARD(Receiver,
receiver)]() mutable {
hpx::visit(
done_error_value_visitor<Receiver>{
HPX_MOVE(receiver)},
v);
});
}
else
{
Expand All @@ -364,13 +382,18 @@ namespace hpx::execution::experimental {
{
// By the time the lock has been taken,
// predecessor_done might already be true and we can
// release the lock early and call the continuation
// directly again.
// release the lock early and dispatch through
// schedule_completion to honour the scheduler.
l.unlock();
hpx::visit(
done_error_value_visitor<Receiver>{
HPX_FORWARD(Receiver, receiver)},
v);
schedule_completion(
[this,
receiver = HPX_FORWARD(
Receiver, receiver)]() mutable {
hpx::visit(
done_error_value_visitor<Receiver>{
HPX_MOVE(receiver)},
v);
});
}
else
{
Expand All @@ -380,7 +403,8 @@ namespace hpx::execution::experimental {
// other threads may also try to add continuations
// to the vector and the vector is not threadsafe in
// itself. The continuation will be called later
// when set_error/set_stopped/set_value is called.
// when set_error/set_stopped/set_value is called
// (via set_predecessor_done).
continuations.emplace_back(
[this,
receiver = HPX_FORWARD(
Expand Down Expand Up @@ -454,16 +478,158 @@ namespace hpx::execution::experimental {
}
};

// shared_state_scheduler wraps a generic Scheduler and overrides
// schedule_completion so that late-arriving subscribers receive
// their completion signal dispatched on the scheduler's execution
// context, preserving the P2300 get_completion_scheduler contract.
//
// Note: this is intentionally separate from shared_state_run_loop
// to avoid adding a run_loop dependency for general schedulers.
template <typename Sched>
struct shared_state_scheduler : shared_state
{
HPX_NO_UNIQUE_ADDRESS std::decay_t<Sched> sched;

using continuation_type =
typename shared_state::continuation_type;
using base_alloc_type = typename shared_state::allocator_type;

struct schedule_op_holder;

struct schedule_receiver
{
hpx::intrusive_ptr<schedule_op_holder> holder;

// Value completion for the schedule() sender this receiver is
// connected to: runs the continuation stored in the holder.
//
// The function is noexcept, so an exception escaping the
// continuation terminates the program.
friend void tag_invoke(
set_value_t, schedule_receiver&& r) noexcept
{
// Move the owning pointer out of the receiver into a local so that
// the holder (and the continuation stored inside it) stays alive for
// the duration of the call. The local drops its reference when it
// goes out of scope at the end of this function.
auto h = HPX_MOVE(r.holder);
// Invoke the continuation as an rvalue.
HPX_MOVE(h->cont)();
}

// Error completion for the schedule() sender this receiver is
// connected to. The error value is ignored and the process is ended
// via std::terminate(); the stored continuation is neither invoked
// nor released here.
template <typename Error>
[[noreturn]] friend void tag_invoke(
set_error_t, schedule_receiver&&, Error&&) noexcept
{
std::terminate();
}

friend void tag_invoke(
set_stopped_t, schedule_receiver&& r) noexcept
{
r.holder.reset();
Comment thread
arpittkhandelwal marked this conversation as resolved.
Outdated
}

// Environment query: this receiver always reports a
// default-constructed empty_env.
friend empty_env tag_invoke(
get_env_t, schedule_receiver const&) noexcept
{
return {};
}
};

using schedule_sender_type = hpx::util::invoke_result_t<
hpx::execution::experimental::schedule_t,
std::decay_t<Sched>&>;
using op_state_type =
connect_result_t<schedule_sender_type, schedule_receiver>;

struct schedule_op_holder
{
using holder_alloc_type =
typename std::allocator_traits<base_alloc_type>::
template rebind_alloc<schedule_op_holder>;

continuation_type cont;
hpx::util::atomic_count ref_count{0};
HPX_NO_UNIQUE_ADDRESS holder_alloc_type alloc;
op_state_type op_state;

schedule_op_holder(continuation_type&& c,
std::decay_t<Sched>& s, holder_alloc_type const& a)
: cont(HPX_MOVE(c))
, alloc(a)
, op_state(hpx::execution::experimental::connect(
hpx::execution::experimental::schedule(s),
schedule_receiver{
hpx::intrusive_ptr<schedule_op_holder>(this)}))
{
Comment on lines +551 to +555
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

schedule_op_holder constructs schedule_receiver with intrusive_ptr<schedule_op_holder>(this) while ref_count starts at 0. If schedule(s) or connect(...) throws during op_state construction, the temporary intrusive_ptr can decrement the count back to 0 and call intrusive_ptr_release on a partially-constructed object, which is undefined behaviour. Consider establishing an owning ref before creating the receiver (e.g., set ref_count to 1 and pass intrusive_ptr(this, /*add_ref=*/false), then create the external owner with add_ref=false), or restructure so op_state is created after the owner intrusive_ptr exists.

Copilot uses AI. Check for mistakes.
}

// Reference-count hook for hpx::intrusive_ptr<schedule_op_holder>:
// registers one additional owner by incrementing ref_count.
// p: holder whose count is incremented; dereferenced without a
//    null check.
friend void intrusive_ptr_add_ref(
schedule_op_holder* p) noexcept
{
p->ref_count.increment();
}

// Reference-count hook for hpx::intrusive_ptr<schedule_op_holder>:
// drops one owner and, if that was the last one, destroys the holder
// and returns its storage to the allocator it carries.
// p: holder whose count is decremented; dereferenced without a null
//    check.
friend void intrusive_ptr_release(
    schedule_op_holder* p) noexcept
{
    // Other owners remain: nothing to clean up.
    if (p->ref_count.decrement() != 0)
    {
        return;
    }

    using holder_alloc_traits = std::allocator_traits<holder_alloc_type>;

    // NOTE(review): the acquire fence presumably pairs with a release
    // in atomic_count::decrement() so that writes made by other owners
    // are visible before destruction -- confirm.
    std::atomic_thread_fence(std::memory_order_acquire);

    // Copy the allocator out of the object first: the member copy is
    // destroyed together with *p, but is still needed to deallocate.
    holder_alloc_type holder_alloc(p->alloc);
    holder_alloc_traits::destroy(holder_alloc, p);
    holder_alloc_traits::deallocate(holder_alloc, p, 1);
}
};

// Constructs the shared state for `sender` and stores the scheduler
// that schedule_completion dispatches through.
//
// sender:     predecessor sender, forwarded to the shared_state base.
// alloc:      allocator, passed on to the shared_state base.
// scheduler_: scheduler taken by value and moved into `sched`.
//
// The none_of constraint removes this overload when Sender_ decays to
// shared_state_scheduler itself (presumably so that the template does
// not compete with copy/move construction -- confirm).
// clang-format off
template <typename Sender_,
HPX_CONCEPT_REQUIRES_(
meta::value<meta::none_of<
shared_state_scheduler, std::decay_t<Sender_>>>
)>
// clang-format on
shared_state_scheduler(Sender_&& sender,
typename shared_state::allocator_type const& alloc,
std::decay_t<Sched> scheduler_)
: shared_state(HPX_FORWARD(Sender_, sender), alloc)
, sched(HPX_MOVE(scheduler_))
{
}

// Defaulted destructor; `override` documents (and enforces) that the
// shared_state base destructor is virtual.
~shared_state_scheduler() override = default;

void schedule_completion(
continuation_type&& continuation) override
{
using holder_alloc_type =
typename schedule_op_holder::holder_alloc_type;
using holder_alloc_traits =
std::allocator_traits<holder_alloc_type>;
using holder_unique_ptr =
std::unique_ptr<schedule_op_holder,
util::allocator_deleter<holder_alloc_type>>;

holder_alloc_type holder_alloc(this->alloc);
holder_unique_ptr p(
holder_alloc_traits::allocate(holder_alloc, 1),
hpx::util::allocator_deleter<holder_alloc_type>{
holder_alloc});
holder_alloc_traits::construct(holder_alloc, p.get(),
HPX_MOVE(continuation), sched, holder_alloc);

hpx::intrusive_ptr<schedule_op_holder> owner(p.release());
hpx::execution::experimental::start(owner->op_state);
Comment on lines +597 to +616
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shared_state_scheduler::schedule_completion is invoked from split_sender::operation_state::start (which is noexcept). However this implementation performs allocations and calls schedule/connect, any of which may throw; an exception escaping here will trigger std::terminate and can make this path much less robust than the previous inline completion. Consider making schedule_completion noexcept and handling failures explicitly (e.g., catch and either invoke the continuation inline as a fallback, or terminate with a clear message).

Suggested change
continuation_type&& continuation) override
{
using holder_alloc_type =
typename schedule_op_holder::holder_alloc_type;
using holder_alloc_traits =
std::allocator_traits<holder_alloc_type>;
using holder_unique_ptr =
std::unique_ptr<schedule_op_holder,
util::allocator_deleter<holder_alloc_type>>;
holder_alloc_type holder_alloc(this->alloc);
holder_unique_ptr p(
holder_alloc_traits::allocate(holder_alloc, 1),
hpx::util::allocator_deleter<holder_alloc_type>{
holder_alloc});
holder_alloc_traits::construct(holder_alloc, p.get(),
HPX_MOVE(continuation), sched, holder_alloc);
hpx::intrusive_ptr<schedule_op_holder> owner(p.release());
hpx::execution::experimental::start(owner->op_state);
continuation_type&& continuation) noexcept override
{
try
{
using holder_alloc_type =
typename schedule_op_holder::holder_alloc_type;
using holder_alloc_traits =
std::allocator_traits<holder_alloc_type>;
using holder_unique_ptr =
std::unique_ptr<schedule_op_holder,
util::allocator_deleter<holder_alloc_type>>;
holder_alloc_type holder_alloc(this->alloc);
holder_unique_ptr p(
holder_alloc_traits::allocate(holder_alloc, 1),
hpx::util::allocator_deleter<holder_alloc_type>{
holder_alloc});
holder_alloc_traits::construct(holder_alloc, p.get(),
HPX_MOVE(continuation), sched, holder_alloc);
hpx::intrusive_ptr<schedule_op_holder> owner(
p.release());
hpx::execution::experimental::start(owner->op_state);
}
catch (...)
{
try
{
HPX_MOVE(continuation)();
}
catch (...)
{
std::terminate();
}
}

Copilot uses AI. Check for mistakes.
}
};

hpx::intrusive_ptr<shared_state> state;

template <typename Sender_, typename Scheduler_ = no_scheduler>
template <typename Sender_, typename Scheduler_ = no_scheduler,
typename = std::enable_if_t<
!is_scheduler_v<std::decay_t<Scheduler_>> ||
std::is_same_v<std::decay_t<Scheduler_>,
run_loop_scheduler>>>
split_sender(Sender_&& sender, Allocator const& allocator,
Scheduler_&& scheduler = Scheduler_{})
: scheduler(HPX_FORWARD(Scheduler_, scheduler))
{
using allocator_type = Allocator;
using other_allocator = typename std::allocator_traits<
allocator_type>::template rebind_alloc<shared_state>;
Allocator>::template rebind_alloc<shared_state>;
using allocator_traits = std::allocator_traits<other_allocator>;
using unique_ptr = std::unique_ptr<shared_state,
util::allocator_deleter<other_allocator>>;
Expand All @@ -476,9 +642,37 @@ namespace hpx::execution::experimental {
alloc, p.get(), HPX_FORWARD(Sender_, sender), allocator);
state = p.release();

// Eager submission means that we start the predecessor
// operation state already when creating the sender. We don't
// wait for another receiver to be connected.
if constexpr (Type == submission_type::eager)
{
state->start();
}
}

template <typename Sender_, typename Sched_,
std::enable_if_t<is_scheduler_v<std::decay_t<Sched_>> &&
!std::is_same_v<std::decay_t<Sched_>,
run_loop_scheduler>,
int> = 0>
split_sender(
Sender_&& sender, Allocator const& allocator, Sched_&& sched)
: scheduler(HPX_FORWARD(Sched_, sched))
{
using sched_shared_state =
shared_state_scheduler<std::decay_t<Sched_>>;
using other_allocator = typename std::allocator_traits<
Allocator>::template rebind_alloc<sched_shared_state>;
using allocator_traits = std::allocator_traits<other_allocator>;
using unique_ptr = std::unique_ptr<sched_shared_state,
util::allocator_deleter<other_allocator>>;

other_allocator alloc(allocator);
unique_ptr p(allocator_traits::allocate(alloc, 1),
hpx::util::allocator_deleter<other_allocator>{alloc});

allocator_traits::construct(alloc, p.get(),
HPX_FORWARD(Sender_, sender), allocator, scheduler);
state = p.release();

if constexpr (Type == submission_type::eager)
{
state->start();
Expand All @@ -490,10 +684,8 @@ namespace hpx::execution::experimental {
run_loop_scheduler const& sched)
: scheduler(sched)
{
using allocator_type = Allocator;
using other_allocator =
typename std::allocator_traits<allocator_type>::
template rebind_alloc<shared_state_run_loop>;
using other_allocator = typename std::allocator_traits<
Allocator>::template rebind_alloc<shared_state_run_loop>;
using allocator_traits = std::allocator_traits<other_allocator>;
using unique_ptr = std::unique_ptr<shared_state_run_loop,
util::allocator_deleter<other_allocator>>;
Expand All @@ -507,9 +699,6 @@ namespace hpx::execution::experimental {
sched.get_run_loop());
state = p.release();

// Eager submission means that we start the predecessor
// operation state already when creating the sender. We don't
// wait for another receiver to be connected.
if constexpr (Type == submission_type::eager)
{
state->start();
Expand Down Expand Up @@ -635,6 +824,32 @@ namespace hpx::execution::experimental {
HPX_FORWARD(Sender, sender), allocator, sched};
}

// Scheduler-aware split for generic (non-run_loop) schedulers.
// Dispatches completions for late-arriving subscribers through the
// provided scheduler, satisfying the P2300 get_completion_scheduler
// contract. This overload is selected when passing a scheduler
// explicitly: tag_invoke(split, my_scheduler, sender, allocator).
//
// scheduler: scheduler to complete on; must satisfy is_scheduler_v
//            and must not be a run_loop_scheduler.
// sender:    predecessor sender; must satisfy is_sender_v.
// allocator: allocator handed to the split_sender; defaults to a
//            value-initialized Allocator.
//
// Returns a lazily submitted detail::split_sender parameterised on
// the decayed scheduler type.
//
// NOTE(review): is_scheduler_v is evaluated on the deduced Scheduler
// (a reference type when an lvalue is passed), whereas the is_same
// check below and the split_sender constructor constraints apply
// std::decay_t first -- confirm that is_scheduler_v accepts reference
// types, otherwise lvalue schedulers will not select this overload.
// clang-format off
template <typename Scheduler, typename Sender,
typename Allocator = hpx::util::internal_allocator<>,
HPX_CONCEPT_REQUIRES_(
hpx::execution::experimental::is_scheduler_v<Scheduler> &&
!std::is_same_v<std::decay_t<Scheduler>,
hpx::execution::experimental::run_loop_scheduler> &&
hpx::execution::experimental::is_sender_v<Sender> &&
hpx::traits::is_allocator_v<Allocator>
)>
// clang-format on
friend constexpr HPX_FORCEINLINE auto tag_invoke(split_t,
Scheduler&& scheduler, Sender&& sender,
Allocator const& allocator = {})
{
return detail::split_sender<Sender, Allocator,
detail::submission_type::lazy, std::decay_t<Scheduler>>{
HPX_FORWARD(Sender, sender), allocator,
HPX_FORWARD(Scheduler, scheduler)};
}

// clang-format off
template <typename Sender,
typename Allocator = hpx::util::internal_allocator<>,
Expand Down
1 change: 1 addition & 0 deletions libs/core/execution/tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ set(tests
algorithm_let_value
algorithm_run_loop
algorithm_split
algorithm_split_scheduler
algorithm_start_detached
algorithm_sync_wait
algorithm_sync_wait_with_variant
Expand Down
Loading
Loading