execution: fix split scheduler preservation for late subscribers (P2300) #7165
arpittkhandelwal wants to merge 6 commits into TheHPXProject:master
When split(scheduler, sender) is used and the predecessor has already
completed (predecessor_done == true) by the time add_continuation() is
called by a new subscriber, the downstream completion signal was fired
inline on whatever thread called add_continuation(). This violated the
P2300 get_completion_scheduler<CPO> contract, which requires that
completions on the set_value and set_stopped signals are dispatched by
the scheduler passed to split.
The code itself acknowledged this with a TODO comment:
// TODO: Should this preserve the scheduler? It does not
// if we call set_* inline.
Fix:
* Add a virtual schedule_completion(continuation_type&&) method to
shared_state with a default implementation that fires inline (preserving
existing behaviour for the scheduler-free case).
* Replace the two "fire inline" paths inside add_continuation (the
predecessor_done fast-path and the lock-then-done path) with calls to
schedule_completion, so all completion dispatch goes through a single
overridable hook.
* Add shared_state_scheduler<Sched> — a new subclass of shared_state —
that overrides schedule_completion to post the continuation through
schedule(sched). The operation state is kept alive via a self-owning
intrusive_ptr-based holder (mirroring the pattern in start_detached.hpp),
so the async lifetime is correct regardless of how quickly the thread
pool processes the work item.
* Add a second constructor overload to split_sender for generic
(non-run_loop) schedulers that allocates shared_state_scheduler instead
of plain shared_state.
* Add algorithm_split_scheduler unit test that covers:
- Basic split with no scheduler (regression guard)
- split with thread_pool_scheduler: late subscriber receives value on
the pool, not inline
- Multiple concurrent late subscribers all receive the value
- ensure_started (eager submission) is unaffected
No behavioural change for the scheduler-free split or the run_loop split;
only the generic-scheduler path gains the new subclass.
Signed-off-by: arpittkhandelwal <arpitkhandelwal810@gmail.com>
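
For readers who want the shape of the hook without the HPX machinery, here is a minimal single-threaded sketch of the design described above. It is not the code from split.hpp: `manual_scheduler`, `post` and `run_all` are hypothetical stand-ins for a real scheduler, `continuation_type` is assumed to be a plain `std::function<void()>`, and all locking is omitted.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

using continuation_type = std::function<void()>;

// Hypothetical stand-in for a scheduler: posted work is queued and only
// runs when the owning context calls run_all().
struct manual_scheduler
{
    std::vector<continuation_type> queue;

    void post(continuation_type f) { queue.push_back(std::move(f)); }

    void run_all()
    {
        std::vector<continuation_type> pending;
        pending.swap(queue);
        for (auto& f : pending)
            f();
    }
};

struct shared_state
{
    bool predecessor_done = false;
    std::vector<continuation_type> continuations;

    virtual ~shared_state() = default;

    // Default hook: fire inline, as in the scheduler-free case.
    virtual void schedule_completion(continuation_type&& c) { c(); }

    void add_continuation(continuation_type c)
    {
        assert(c && "continuation must be callable");
        if (predecessor_done)
            schedule_completion(std::move(c));    // late subscriber
        else
            continuations.push_back(std::move(c));
    }

    // Predecessor finished: run everything that subscribed in time.
    void set_done()
    {
        predecessor_done = true;
        for (auto& c : continuations)
            c();
        continuations.clear();
    }
};

struct shared_state_scheduler : shared_state
{
    manual_scheduler& sched;

    explicit shared_state_scheduler(manual_scheduler& s) : sched(s) {}

    // Override: late completions are posted, never run on the caller.
    void schedule_completion(continuation_type&& c) override
    {
        sched.post(std::move(c));
    }
};
```

With the plain `shared_state`, a continuation added after `set_done()` has already run by the time `add_continuation` returns. With `shared_state_scheduler`, it stays queued until the scheduler's context calls `run_all()`, which is the behaviour the late-subscriber test is meant to pin down.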
@arpittkhandelwal What's the difference to #6911?

I think the main difference between this PR and #6911 is where the 'source of truth' for the scheduler lives. PR #6911 takes a receiver-centric approach: at the moment of connection it looks downstream to see whether the receiver has a preferred scheduler. This is a nice safety net, but it doesn't address the core problem: split(scheduler, sender) currently ignores the scheduler it was explicitly given whenever a subscriber arrives late. With this PR, split_sender follows the P2300 contract by completing on the provided scheduler, even if the receiver has no environment or scheduler of its own.

@isidorostsa could you have a look, please?

@isidorostsa ping
Pull request overview
Fixes a P2300 correctness issue in execution::split where late subscribers (connecting after the predecessor has completed) could receive completion inline instead of via the sender’s completion scheduler.
Changes:
- Introduces shared_state::schedule_completion and routes late-subscriber completions through it.
- Adds shared_state_scheduler<Sched> to dispatch late completions via schedule(sched) for generic schedulers.
- Adds a new unit test target algorithm_split_scheduler.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| libs/core/execution/include/hpx/execution/algorithms/split.hpp | Adds scheduler-aware completion routing for late subscribers and new generic-scheduler split support. |
| libs/core/execution/tests/unit/algorithm_split_scheduler.cpp | Adds regression coverage intended to exercise the late-subscriber scheduler-preservation behavior. |
| libs/core/execution/tests/unit/CMakeLists.txt | Registers the new unit test. |
    , op_state(hpx::execution::experimental::connect(
          hpx::execution::experimental::schedule(s),
          schedule_receiver{
              hpx::intrusive_ptr<schedule_op_holder>(this)}))
    {
schedule_op_holder constructs schedule_receiver with intrusive_ptr<schedule_op_holder>(this) while ref_count starts at 0. If schedule(s) or connect(...) throws during op_state construction, the temporary intrusive_ptr can decrement the count back to 0 and call intrusive_ptr_release on a partially-constructed object, which is undefined behaviour. Consider establishing an owning ref before creating the receiver (e.g., set ref_count to 1 and pass intrusive_ptr(this, /add_ref=/false), then create the external owner with add_ref=false), or restructure so op_state is created after the owner intrusive_ptr exists.
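
The second option mentioned above (create the operation state only after an owning pointer exists) can be sketched without HPX as follows. Everything here is hypothetical scaffolding: `ref_counted` and `ref_ptr` stand in for the holder's ref_count and hpx::intrusive_ptr, `fake_op_state` stands in for the result of connect(schedule(s), receiver), and the `fail` flag simulates a throwing connect.

```cpp
#include <cassert>
#include <optional>
#include <stdexcept>
#include <utility>

// Hypothetical minimal intrusive count and handle.
struct ref_counted
{
    int ref_count = 0;
    virtual ~ref_counted() = default;
};

class ref_ptr
{
    ref_counted* p_ = nullptr;

public:
    ref_ptr() = default;
    explicit ref_ptr(ref_counted* p) noexcept : p_(p) { if (p_) ++p_->ref_count; }
    ref_ptr(ref_ptr const& o) noexcept : p_(o.p_) { if (p_) ++p_->ref_count; }
    ref_ptr(ref_ptr&& o) noexcept : p_(std::exchange(o.p_, nullptr)) {}
    ref_ptr& operator=(ref_ptr o) noexcept { std::swap(p_, o.p_); return *this; }
    ~ref_ptr() { if (p_ && --p_->ref_count == 0) delete p_; }
};

// Stand-in for the connected operation state: its receiver keeps the
// holder alive, and "connect" may throw.
struct fake_op_state
{
    ref_ptr self;

    fake_op_state(ref_ptr s, bool fail) : self(std::move(s))
    {
        if (fail)
            throw std::runtime_error("connect failed");
    }

    // Completion drops the self-reference.
    void complete() noexcept { self = ref_ptr{}; }
};

struct op_holder : ref_counted
{
    static int live;                          // number of live holders
    std::optional<fake_op_state> op_state;    // built in a second phase

    op_holder() { ++live; }
    ~op_holder() override { --live; }
};
int op_holder::live = 0;

// The owner exists before the op state is built, so a throwing connect
// only ever unwinds a fully constructed holder.
inline bool make_and_complete(bool fail_connect)
{
    op_holder* h = new op_holder;
    ref_ptr owner(h);
    assert(h->ref_count == 1);
    try
    {
        h->op_state.emplace(owner, fail_connect);    // may throw
    }
    catch (std::runtime_error const&)
    {
        return false;    // owner releases the holder exactly once
    }
    h->op_state->complete();    // simulate the scheduled work finishing
    return true;
}
```

In this arrangement the temporary reference created for the receiver can never be the last one while the holder is still under construction, so a failure in the second phase is cleaned up by the ordinary owner.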
    continuation_type&& continuation) override
    {
        using holder_alloc_type =
            typename schedule_op_holder::holder_alloc_type;
        using holder_alloc_traits =
            std::allocator_traits<holder_alloc_type>;
        using holder_unique_ptr =
            std::unique_ptr<schedule_op_holder,
                util::allocator_deleter<holder_alloc_type>>;

        holder_alloc_type holder_alloc(this->alloc);
        holder_unique_ptr p(
            holder_alloc_traits::allocate(holder_alloc, 1),
            hpx::util::allocator_deleter<holder_alloc_type>{
                holder_alloc});
        holder_alloc_traits::construct(holder_alloc, p.get(),
            HPX_MOVE(continuation), sched, holder_alloc);

        hpx::intrusive_ptr<schedule_op_holder> owner(p.release());
        hpx::execution::experimental::start(owner->op_state);
shared_state_scheduler::schedule_completion is invoked from split_sender::operation_state::start (which is noexcept). However this implementation performs allocations and calls schedule/connect, any of which may throw; an exception escaping here will trigger std::terminate and can make this path much less robust than the previous inline completion. Consider making schedule_completion noexcept and handling failures explicitly (e.g., catch and either invoke the continuation inline as a fallback, or terminate with a clear message).
Suggested change:

    continuation_type&& continuation) noexcept override
    {
        try
        {
            using holder_alloc_type =
                typename schedule_op_holder::holder_alloc_type;
            using holder_alloc_traits =
                std::allocator_traits<holder_alloc_type>;
            using holder_unique_ptr =
                std::unique_ptr<schedule_op_holder,
                    util::allocator_deleter<holder_alloc_type>>;
            holder_alloc_type holder_alloc(this->alloc);
            holder_unique_ptr p(
                holder_alloc_traits::allocate(holder_alloc, 1),
                hpx::util::allocator_deleter<holder_alloc_type>{
                    holder_alloc});
            holder_alloc_traits::construct(holder_alloc, p.get(),
                HPX_MOVE(continuation), sched, holder_alloc);
            hpx::intrusive_ptr<schedule_op_holder> owner(
                p.release());
            hpx::execution::experimental::start(owner->op_state);
        }
        catch (...)
        {
            try
            {
                HPX_MOVE(continuation)();
            }
            catch (...)
            {
                std::terminate();
            }
        }
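
A stripped-down, HPX-free sketch of the same noexcept-with-inline-fallback shape is given below. The names are hypothetical: `post_or_throw` models the allocate/schedule/connect steps that may throw, and the `queue` vector stands in for the scheduler. The sketch assumes the failing step leaves the continuation untouched, which is why it is passed by reference and only moved from on the success path.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <utility>
#include <vector>

using continuation_type = std::function<void()>;

// Hypothetical dispatch step that may throw. The failure modelled here
// happens before `c` is moved from, so the caller can still invoke it.
inline void post_or_throw(std::vector<continuation_type>& queue,
    continuation_type& c, bool fail)
{
    if (fail)
        throw std::runtime_error("scheduling failed");
    queue.push_back(std::move(c));
}

inline void schedule_completion(std::vector<continuation_type>& queue,
    continuation_type&& c, bool fail) noexcept
{
    assert(c && "continuation must be callable");
    try
    {
        post_or_throw(queue, c, fail);
    }
    catch (...)
    {
        // Fallback: complete inline on the calling thread. If the
        // continuation itself throws, the noexcept specifier turns that
        // into std::terminate.
        c();
    }
}
```

Whether an inline fallback or a hard termination is the better failure mode is a policy decision for the PR; the sketch only shows that the fallback needs a continuation that is still valid when the catch block runs.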
Hey @arpittkhandelwal, thanks for the PR!
Problem Statement
This PR addresses a P2300 correctness violation in the split algorithm. Previously, the split sender failed to preserve the associated scheduler when a receiver connected after the predecessor had already completed (the "late subscriber" scenario). In these cases, the completion signal was fired inline, bypassing the execution context guaranteed by the sender's completion scheduler.
Proposed Changes
* Virtualized Completion Hooks: Introduced a virtual void schedule_completion(continuation_type&&) method on the shared_state base class. This allows subclasses to reroute completion signals through the appropriate execution context.
* Scheduler-Aware shared_state: Implemented shared_state_scheduler, a new subclass that captures the attached scheduler and overrides schedule_completion to dispatch stored continuations via hpx::execution::experimental::schedule(sched).
* Safe Asynchronous Management: Implemented a self-owning schedule_op_holder to manage the lifetime of the schedule() operation state.
* Memory Safety: Adopted the standard HPX allocator pattern, rebinding the shared_state allocator to handle internal task metadata without raw new/delete.
* Race Prevention: Added an intrusive_ptr owner guard before calling start() to prevent use-after-free if a scheduler executes synchronously.
* CPO & Dispatch Refactoring: Updated split_t overloads to support generic schedulers, enabling both automatic scheduler discovery and explicit injection.
* Cleaned up constructor SFINAE in split_sender to handle no_scheduler, run_loop_scheduler, and generic Scheduler types without ambiguity.
Verification Results
* New Test Suite: Added algorithm_split_scheduler.cpp, which specifically targets the late-subscriber race condition.
* Regression Testing: Verified that the legacy no_scheduler and run_loop paths remain unaffected.
* Performance: Used HPX_NO_UNIQUE_ADDRESS and intrusive pointers to keep the per-holder metadata overhead small.
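
To illustrate the owner guard from the Race Prevention item, here is a small sketch under simplifying assumptions: std::shared_ptr stands in for hpx::intrusive_ptr, `holder` is a hypothetical self-owning object, and `start()` models a scheduler that runs the work synchronously inside the call.

```cpp
#include <cassert>
#include <memory>

// Hypothetical self-owning holder.
struct holder
{
    static int live;                 // number of live holders
    std::shared_ptr<holder> self;    // self-ownership, dropped on completion
    bool completed = false;

    holder() { ++live; }
    ~holder() { --live; }

    // Stand-in for start(op_state) on a scheduler that runs work inline.
    void start()
    {
        assert(self && "holder must own itself before start");
        completed = true;    // the continuation would run here
        self.reset();        // completion releases the self-reference
        // Without another owner, *this would already be destroyed here.
    }
};
int holder::live = 0;

// Returns true if the holder was still alive right after start() returned.
inline bool start_with_guard()
{
    auto owner = std::make_shared<holder>();    // guard held by the caller
    owner->self = owner;                        // holder now owns itself
    owner->start();                             // completes synchronously
    return holder::live == 1 && owner->completed;
}
```

Because the guard outlives the call to `start()`, the holder is destroyed only when `owner` goes out of scope, even though completion has already dropped the self-reference.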