Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ namespace hpx::execution::experimental {
error_types_of_t<Sender, Env, Variant>,
Variant<std::exception_ptr>>;

static constexpr bool sends_stopped = false;
static constexpr bool sends_stopped =
sends_stopped_of_v<Sender, Env>;
};

// clang-format off
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ namespace hpx::execution::experimental {
predecessor_sender_error_types<Variant>,
scheduler_sender_error_types<Variant>>;

static constexpr bool sends_stopped = false;
static constexpr bool sends_stopped =
sends_stopped_of_v<Sender, Env> ||
sends_stopped_of_v<scheduler_sender_type, Env>;
Comment on lines +64 to +66
};

// clang-format off
Expand Down
36 changes: 36 additions & 0 deletions libs/core/execution/tests/unit/algorithm_bulk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,5 +412,41 @@ int main()
HPX_TEST_EQ(custom_bulk_call_count, 3);
}

#if !defined(HPX_HAVE_STDEXEC)
// Completion signatures: bulk must inherit sends_stopped from its upstream.
// Before the fix, bulk_sender hard-coded sends_stopped = false regardless
// of the upstream sender. This section verifies the corrected behaviour.
{
// Upstream never sends stopped -> bulk also must not advertise stopped.
auto s_false = ex::bulk(ex::just(42), 5, [](int, int) {});
check_sends_stopped<false>(s_false);

// Upstream can send stopped -> bulk must propagate that into its
// completion signatures.
auto s_true = ex::bulk(stopped_sender_with_value_type{}, 5,
[](int, int) { HPX_TEST(false); });
check_sends_stopped<true>(s_true);

// Chained bulk: stopped propagates through every layer.
auto s_chain = ex::bulk(ex::bulk(stopped_sender_with_value_type{}, 5,
[](int, int) { HPX_TEST(false); }),
5, [](int, int) { HPX_TEST(false); });
check_sends_stopped<true>(s_chain);
}

// Runtime: set_stopped must reach the downstream receiver when the
// upstream sends a stopped signal through bulk.
{
std::atomic<bool> set_stopped_called{false};
auto s = ex::bulk(stopped_sender_with_value_type{}, 5,
[](int, int) { HPX_TEST(false); });

auto r = expect_stopped_receiver{set_stopped_called};
auto os = ex::connect(std::move(s), std::move(r));
ex::start(os);
HPX_TEST(set_stopped_called);
}
#endif

return hpx::util::report_errors();
}
Loading