From c6a0c77ca0b4f6c40b58e3d63c41ea8c5b7e548e Mon Sep 17 00:00:00 2001 From: arpittkhandelwal Date: Fri, 1 May 2026 10:38:30 +0530 Subject: [PATCH] fix(execution): correct sends_stopped in bulk and schedule_from completion signatures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit bulk_sender and schedule_from_sender hard-coded sends_stopped = false in their generate_completion_signatures struct, even though their internal receivers unconditionally propagate set_stopped from the upstream sender to the downstream receiver. This breaks the Sender/Receiver contract (P2300): a downstream algorithm that trusts sends_stopped=false may not install a stop handler, causing set_stopped to arrive on an unprepared receiver — potential undefined behaviour. Fix bulk_sender to inherit sends_stopped from the upstream: static constexpr bool sends_stopped = sends_stopped_of_v; Fix schedule_from_sender to reflect that cancellation can arrive from either the predecessor sender or the scheduler sender: static constexpr bool sends_stopped = sends_stopped_of_v || sends_stopped_of_v; Note: as_sender and keep_future correctly retain sends_stopped=false because HPX futures have no cancellation mechanism — their receivers never call set_stopped, so the existing value is accurate and is not changed. Add tests to algorithm_bulk.cpp that verify: - bulk over a non-stopping upstream still reports sends_stopped=false - bulk over a stopped_sender_with_value_type now reports sends_stopped=true - Chained bulk layers correctly propagate sends_stopped=true - At runtime, set_stopped is delivered to the downstream receiver --- .../include/hpx/execution/algorithms/bulk.hpp | 3 +- .../execution/algorithms/schedule_from.hpp | 4 ++- .../execution/tests/unit/algorithm_bulk.cpp | 36 +++++++++++++++++++ 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/libs/core/execution/include/hpx/execution/algorithms/bulk.hpp b/libs/core/execution/include/hpx/execution/algorithms/bulk.hpp index ac18f0fe369e..55608b6a8ba3 100644 --- a/libs/core/execution/include/hpx/execution/algorithms/bulk.hpp +++ b/libs/core/execution/include/hpx/execution/algorithms/bulk.hpp @@ -61,7 +61,8 @@ namespace hpx::execution::experimental { error_types_of_t, Variant>; - static constexpr bool sends_stopped = false; + static constexpr bool sends_stopped = + sends_stopped_of_v; }; // clang-format off diff --git a/libs/core/execution/include/hpx/execution/algorithms/schedule_from.hpp b/libs/core/execution/include/hpx/execution/algorithms/schedule_from.hpp index 2b6b53cd32a9..17cf36a94763 100644 --- a/libs/core/execution/include/hpx/execution/algorithms/schedule_from.hpp +++ b/libs/core/execution/include/hpx/execution/algorithms/schedule_from.hpp @@ -61,7 +61,9 @@ namespace hpx::execution::experimental { predecessor_sender_error_types, scheduler_sender_error_types>; - static constexpr bool sends_stopped = false; + static constexpr bool sends_stopped = + sends_stopped_of_v || + sends_stopped_of_v; }; // clang-format off diff --git a/libs/core/execution/tests/unit/algorithm_bulk.cpp b/libs/core/execution/tests/unit/algorithm_bulk.cpp index a7057e392cc8..9e3984ef1037 100644 --- a/libs/core/execution/tests/unit/algorithm_bulk.cpp +++ b/libs/core/execution/tests/unit/algorithm_bulk.cpp @@ -412,5 +412,41 @@ int main() HPX_TEST_EQ(custom_bulk_call_count, 3); } +#if !defined(HPX_HAVE_STDEXEC) + // Completion signatures: bulk must inherit sends_stopped from its upstream. + // Before the fix, bulk_sender hard-coded sends_stopped = false regardless + // of the upstream sender. This section verifies the corrected behaviour. + { + // Upstream never sends stopped -> bulk also must not advertise stopped. + auto s_false = ex::bulk(ex::just(42), 5, [](int, int) {}); + check_sends_stopped(s_false); + + // Upstream can send stopped -> bulk must propagate that into its + // completion signatures. + auto s_true = ex::bulk(stopped_sender_with_value_type{}, 5, + [](int, int) { HPX_TEST(false); }); + check_sends_stopped(s_true); + + // Chained bulk: stopped propagates through every layer. + auto s_chain = ex::bulk(ex::bulk(stopped_sender_with_value_type{}, 5, + [](int, int) { HPX_TEST(false); }), + 5, [](int, int) { HPX_TEST(false); }); + check_sends_stopped(s_chain); + } + + // Runtime: set_stopped must reach the downstream receiver when the + // upstream sends a stopped signal through bulk. + { + std::atomic set_stopped_called{false}; + auto s = ex::bulk(stopped_sender_with_value_type{}, 5, + [](int, int) { HPX_TEST(false); }); + + auto r = expect_stopped_receiver{set_stopped_called}; + auto os = ex::connect(std::move(s), std::move(r)); + ex::start(os); + HPX_TEST(set_stopped_called); + } +#endif + return hpx::util::report_errors(); }