diff --git a/libs/core/execution/include/hpx/execution/algorithms/ensure_started.hpp b/libs/core/execution/include/hpx/execution/algorithms/ensure_started.hpp index b4a124a2e1cc..c749d20653ee 100644 --- a/libs/core/execution/include/hpx/execution/algorithms/ensure_started.hpp +++ b/libs/core/execution/include/hpx/execution/algorithms/ensure_started.hpp @@ -1,5 +1,6 @@ // Copyright (c) 2021 ETH Zurich // Copyright (c) 2022 Hartmut Kaiser +// Copyright (c) 2026 The STE||AR-Group // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -13,62 +14,438 @@ #include #else +#include #include #include #include -#include +#include +#include +#include +#include +#include #include #include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include +#include #include namespace hpx::execution::experimental { - // execution::ensure_started is used to eagerly start the execution of a - // sender, while also providing a way to attach further work to execute once - // it has completed. - // - // Once ensure_started returns, it is known that the provided sender has - // been connected and start has been called on the resulting operation state - // (see 5.2 Operation states represent work); in other words, the work - // described by the provided sender has been submitted for execution on the - // appropriate execution contexts. Returns a sender which completes when the - // provided sender completes and sends values equivalent to those of the - // provided sender. - // - // If the returned sender is destroyed before execution::connect() is - // called, or if execution::connect() is called but the returned - // operation-state is destroyed before execution::start() is called, then a - // stop-request is sent to the eagerly launched operation and the operation - // is detached and will run to completion in the background. Its result will - // be discarded when it eventually completes. - // - // Note that the application will need to make sure that resources are kept - // alive in the case that the operation detaches. e.g. by holding a - // std::shared_ptr to those resources or otherwise having some out-of-band - // way to signal completion of the operation so that resource release can be - // sequenced after the completion. - // + namespace detail { + + enum class ensure_started_state_enum + { + empty, + started, + completed + }; + + template + struct ensure_started_error_visitor + { + HPX_NO_UNIQUE_ADDRESS std::decay_t& receiver; + + template + void operator()(Error const& error) noexcept + { + hpx::execution::experimental::set_error( + HPX_MOVE(receiver), error); + } + }; + + template + struct ensure_started_value_visitor + { + HPX_NO_UNIQUE_ADDRESS std::decay_t& receiver; + + template + void operator()(Ts const& ts) noexcept + { + hpx::invoke_fused( + hpx::bind_front(hpx::execution::experimental::set_value, + HPX_MOVE(receiver)), + ts); + } + }; + + template + struct ensure_started_sender + { + using is_sender = void; + + template + struct value_types_helper + { + using const_type = + hpx::util::detail::transform_t; + using type = hpx::util::detail::transform_t; + }; + + template + struct generate_completion_signatures + { + template