Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
// Copyright (c) 2021 ETH Zurich
// Copyright (c) 2022-2025 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include <hpx/config.hpp>
#if defined(HPX_HAVE_STDEXEC)
#include <hpx/modules/execution_base.hpp>
#else

#include <hpx/execution/algorithms/detail/partial_algorithm.hpp>
#include <hpx/modules/concepts.hpp>
#include <hpx/modules/datastructures.hpp>
#include <hpx/modules/errors.hpp>
#include <hpx/modules/execution_base.hpp>
#include <hpx/modules/tag_invoke.hpp>
#include <hpx/modules/type_support.hpp>

#include <exception>
#include <optional>
#include <type_traits>
#include <utility>

namespace hpx::execution::experimental {

namespace detail {

// Maps set_value(Ts...) -> set_value(optional<decay_t<T>>)
// set_stopped() -> set_value(nullopt)
// set_error(E) -> set_error(E)
HPX_CXX_CORE_EXPORT template <typename Receiver, typename T>
struct stopped_as_optional_receiver
{
HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;

template <typename Error>
friend void tag_invoke(set_error_t,
stopped_as_optional_receiver&& r, Error&& error) noexcept
{
hpx::execution::experimental::set_error(
HPX_MOVE(r.receiver), HPX_FORWARD(Error, error));
}

friend void tag_invoke(
set_stopped_t, stopped_as_optional_receiver&& r) noexcept
{
hpx::execution::experimental::set_value(
HPX_MOVE(r.receiver), std::optional<T>(std::nullopt));
}

friend void tag_invoke(set_value_t,
stopped_as_optional_receiver&& r, T&& value) noexcept
{
hpx::detail::try_catch_exception_ptr(
[&]() {
hpx::execution::experimental::set_value(
HPX_MOVE(r.receiver),
std::optional<T>(HPX_MOVE(value)));
},
[&](std::exception_ptr ep) {
hpx::execution::experimental::set_error(
HPX_MOVE(r.receiver), HPX_MOVE(ep));
});
}

friend void tag_invoke(set_value_t,
stopped_as_optional_receiver&& r, T const& value) noexcept
{
hpx::detail::try_catch_exception_ptr(
[&]() {
hpx::execution::experimental::set_value(
HPX_MOVE(r.receiver), std::optional<T>(value));
},
[&](std::exception_ptr ep) {
hpx::execution::experimental::set_error(
HPX_MOVE(r.receiver), HPX_MOVE(ep));
});
}

friend auto tag_invoke(
get_env_t, stopped_as_optional_receiver const& r) noexcept
-> decltype(hpx::execution::experimental::get_env(r.receiver))
{
return hpx::execution::experimental::get_env(r.receiver);
}
};

// Extracts the single value type T from a sender's value_types.
// stopped_as_optional requires exactly one value set (a single T).
template <typename Sender, typename Env>
struct stopped_as_optional_value
{
template <typename... Ts>
struct pack
{
};

// single-value case: set_value(T) -> optional<T>
template <typename T>
static T extract(pack<pack<T>>);

using type =
decltype(extract(value_types_of_t<Sender, Env, pack, pack>{}));
};

template <typename Sender, typename Env>
using stopped_as_optional_value_t =
typename stopped_as_optional_value<Sender, Env>::type;

HPX_CXX_CORE_EXPORT template <typename Sender>
struct stopped_as_optional_sender
{
using is_sender = void;
HPX_NO_UNIQUE_ADDRESS std::decay_t<Sender> sender;

template <typename Env>
struct generate_completion_signatures
{
using value_type = stopped_as_optional_value_t<Sender, Env>;

template <template <typename...> typename Tuple,
template <typename...> typename Variant>
using value_types = Variant<Tuple<std::optional<value_type>>>;

template <template <typename...> typename Variant>
using error_types = hpx::util::detail::unique_concat_t<
error_types_of_t<Sender, Env, Variant>,
Variant<std::exception_ptr>>;

// stopped is consumed and turned into a value
static constexpr bool sends_stopped = false;
};

template <typename Env>
friend auto tag_invoke(get_completion_signatures_t,
stopped_as_optional_sender const&, Env) noexcept
-> generate_completion_signatures<Env>;

// clang-format off
template <typename CPO,
HPX_CONCEPT_REQUIRES_(
meta::value<meta::one_of<
std::decay_t<CPO>, set_value_t, set_stopped_t>> &&
detail::has_completion_scheduler_v<CPO, Sender>
)>
// clang-format on
friend constexpr auto tag_invoke(
hpx::execution::experimental::get_completion_scheduler_t<CPO>
tag,
stopped_as_optional_sender const& s)
{
return tag(s.sender);
}

template <typename Receiver>
friend auto tag_invoke(
connect_t, stopped_as_optional_sender&& s, Receiver&& receiver)
{
using value_type =
stopped_as_optional_value_t<Sender, empty_env>;
return hpx::execution::experimental::connect(HPX_MOVE(s.sender),
stopped_as_optional_receiver<Receiver, value_type>{
HPX_FORWARD(Receiver, receiver)});
}

template <typename Receiver>
friend auto tag_invoke(
connect_t, stopped_as_optional_sender& s, Receiver&& receiver)
{
using value_type =
stopped_as_optional_value_t<Sender, empty_env>;
return hpx::execution::experimental::connect(s.sender,
stopped_as_optional_receiver<Receiver, value_type>{
HPX_FORWARD(Receiver, receiver)});
}
};
} // namespace detail

// stopped_as_optional maps a sender's stopped channel into the value
// channel, wrapping the result in std::optional. If the predecessor
// completes with set_stopped(), the returned sender completes with
// set_value(std::nullopt). If the predecessor completes with
// set_value(T), the returned sender completes with
// set_value(std::optional<T>(t)). Errors pass through unchanged.
HPX_CXX_CORE_EXPORT inline constexpr struct stopped_as_optional_t final
: hpx::functional::detail::tag_priority<stopped_as_optional_t>
{
private:
// clang-format off
template <typename Sender,
HPX_CONCEPT_REQUIRES_(
is_sender_v<Sender> &&
experimental::detail::is_completion_scheduler_tag_invocable_v<
hpx::execution::experimental::set_value_t,
Sender, stopped_as_optional_t
>
)>
// clang-format on
friend constexpr HPX_FORCEINLINE auto tag_override_invoke(
stopped_as_optional_t, Sender&& sender)
{
auto scheduler =
hpx::execution::experimental::get_completion_scheduler<
hpx::execution::experimental::set_value_t>(sender);

return hpx::functional::tag_invoke(stopped_as_optional_t{},
HPX_MOVE(scheduler), HPX_FORWARD(Sender, sender));
}

// clang-format off
template <typename Sender,
HPX_CONCEPT_REQUIRES_(
is_sender_v<Sender>
)>
// clang-format on
friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
stopped_as_optional_t, Sender&& sender)
{
return detail::stopped_as_optional_sender<Sender>{
HPX_FORWARD(Sender, sender)};
}

friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
stopped_as_optional_t)
{
return detail::partial_algorithm<stopped_as_optional_t>{};
}
} stopped_as_optional{};
} // namespace hpx::execution::experimental

#endif
1 change: 1 addition & 0 deletions libs/core/execution/tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ set(tests
algorithm_run_loop
algorithm_split
algorithm_start_detached
algorithm_stopped_as_optional
algorithm_sync_wait
algorithm_sync_wait_with_variant
algorithm_then
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) 2021 ETH Zurich
// Copyright (c) 2022 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>
#include <hpx/execution/algorithms/stopped_as_optional.hpp>
#include <hpx/init.hpp>
#include <hpx/modules/execution.hpp>
#include <hpx/modules/testing.hpp>

#include <atomic>
#include <exception>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

namespace ex = hpx::execution::experimental;

void test_value_passes_through()
{
auto result = hpx::this_thread::experimental::sync_wait(
ex::stopped_as_optional(ex::just(42)));

HPX_TEST(result.has_value());
auto opt = hpx::get<0>(*result);
HPX_TEST(opt.has_value());
HPX_TEST_EQ(*opt, 42);
}

void test_value_string()
{
auto result = hpx::this_thread::experimental::sync_wait(
ex::stopped_as_optional(ex::just(std::string("hello"))));

HPX_TEST(result.has_value());
auto opt = hpx::get<0>(*result);
HPX_TEST(opt.has_value());
HPX_TEST_EQ(*opt, std::string("hello"));
}

void test_error_propagates()
{
bool caught = false;
try
{
auto s = ex::then(ex::just(),
[]() -> int { throw std::runtime_error("test error"); });
hpx::this_thread::experimental::sync_wait(
ex::stopped_as_optional(std::move(s)));
}
catch (std::runtime_error const&)
{
caught = true;
}
HPX_TEST(caught);
}

void test_pipe_operator()
{
auto result = hpx::this_thread::experimental::sync_wait(
ex::just(99) | ex::stopped_as_optional());

HPX_TEST(result.has_value());
auto opt = hpx::get<0>(*result);
HPX_TEST(opt.has_value());
HPX_TEST_EQ(*opt, 99);
}

int hpx_main()
{
test_value_passes_through();
test_value_string();
test_error_propagates();
test_pipe_operator();
return hpx::local::finalize();
}

int main(int argc, char* argv[])
{
std::vector<std::string> const cfg = {"hpx.os_threads=all"};
hpx::local::init_params init_args;
init_args.cfg = cfg;

HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv, init_args), 0,
"HPX main exited with non-zero status");

return hpx::util::report_errors();
}
Loading