Skip to content

Commit

Permalink
✨ Add task manager and priority scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
elbeno committed Jan 17, 2024
1 parent 7b7679f commit 4ff71bb
Show file tree
Hide file tree
Showing 8 changed files with 557 additions and 3 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ project(
include(cmake/get_cpm.cmake)
cpmaddpackage("gh:intel/cicd-repo-infrastructure#main")

add_versioned_package("gh:intel/cpp-std-extensions#0540d81")
add_versioned_package("gh:intel/cpp-baremetal-concurrency#8d49b6d")
add_versioned_package("gh:intel/cpp-std-extensions#8cebc04")
add_versioned_package("gh:intel/cpp-baremetal-concurrency#659771e")
add_versioned_package("gh:boostorg/mp11#boost-1.83.0")

add_library(async INTERFACE)
Expand Down
99 changes: 99 additions & 0 deletions include/async/schedulers/priority_scheduler.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#pragma once

#include <async/concepts.hpp>
#include <async/env.hpp>
#include <async/schedulers/task_manager_interface.hpp>
#include <async/stop_token.hpp>
#include <async/tags.hpp>
#include <async/type_traits.hpp>

#include <concepts>
#include <type_traits>
#include <utility>

namespace async {
namespace task_mgr {
template <priority_t P, typename Rcvr> struct op_state : single_linked_task {
template <typename R>
requires std::same_as<Rcvr, std::remove_cvref_t<R>>
constexpr explicit(true) op_state(R &&r) : rcvr{std::forward<R>(r)} {}

auto start() -> void {
if (check_stopped()) {
std::move(rcvr).set_stopped();
} else {
detail::enqueue_task(*this, P);
}
}

auto run() -> void final {
if (check_stopped()) {
std::move(rcvr).set_stopped();
} else {
std::move(rcvr).set_value();
}
}

[[no_unique_address]] Rcvr rcvr;

private:
auto check_stopped() -> bool {
if constexpr (not unstoppable_token<stop_token_of_t<env_of_t<Rcvr>>>) {
return get_stop_token(get_env(rcvr)).stop_requested();
}
return false;
}
};
} // namespace task_mgr

template <priority_t P> class fixed_priority_scheduler {

class env {
[[nodiscard]] friend constexpr auto
tag_invoke(get_completion_scheduler_t<set_value_t>, env) noexcept
-> fixed_priority_scheduler {
return {};
}
};

struct sender {
using is_sender = void;

private:
template <typename S, receiver_from<sender> R>
requires std::same_as<sender, std::remove_cvref_t<S>>
[[nodiscard]] friend constexpr auto tag_invoke(connect_t, S &&, R &&r) {
return task_mgr::op_state<P, std::remove_cvref_t<R>>{
std::forward<R>(r)};
}

[[nodiscard]] friend constexpr auto tag_invoke(get_env_t,
sender) noexcept -> env {
return {};
}

template <typename Env>
[[nodiscard]] friend constexpr auto
tag_invoke(get_completion_signatures_t, sender, Env const &) noexcept
-> completion_signatures<set_value_t(), set_stopped_t()> {
return {};
}

template <typename Env>
requires unstoppable_token<stop_token_of_t<Env>>
[[nodiscard]] friend constexpr auto
tag_invoke(get_completion_signatures_t, sender, Env const &) noexcept
-> completion_signatures<set_value_t()> {
return {};
}
};

[[nodiscard]] friend constexpr auto operator==(fixed_priority_scheduler,
fixed_priority_scheduler)
-> bool = default;

public:
[[nodiscard]] constexpr static auto schedule() -> sender { return {}; }
};

} // namespace async
49 changes: 49 additions & 0 deletions include/async/schedulers/task.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#pragma once

#include <stdx/concepts.hpp>

#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>

namespace async {
// NOLINTNEXTLINE(cppcoreguidelines-virtual-class-destructor)
struct task_base {
bool pending{};
virtual auto run() -> void = 0;
};

// NOLINTNEXTLINE(*-virtual-class-destructor,*-special-member-functions)
struct single_linked_task : task_base {
constexpr single_linked_task() = default;
constexpr single_linked_task(single_linked_task &&) = delete;
single_linked_task *next{};

private:
[[nodiscard]] friend constexpr auto
operator==(single_linked_task const &lhs, single_linked_task const &rhs) {
return std::addressof(lhs) == std::addressof(rhs);
}
};

template <stdx::callable F, typename ArgTuple, typename Base>
struct task : Base {
constexpr explicit(true) task(F const &f) : func(f) {}
constexpr explicit(true) task(F &&f) : func(std::move(f)) {}

template <typename... Args> auto bind_front(Args &&...args) -> task & {
[&]<std::size_t... Is>(std::index_sequence<Is...>) {
((get<Is>(bound_args) = std::forward<Args>(args)), ...);
}(std::index_sequence_for<Args...>{});
return *this;
}

auto run() -> void final { bound_args.apply(func); }

[[no_unique_address]] F func;
[[no_unique_address]] ArgTuple bound_args{};
};

using priority_t = std::uint8_t;
} // namespace async
70 changes: 70 additions & 0 deletions include/async/schedulers/task_manager.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#pragma once

#include <async/schedulers/task_manager_interface.hpp>
#include <conc/concurrency.hpp>

#include <stdx/function_traits.hpp>
#include <stdx/intrusive_forward_list.hpp>
#include <stdx/tuple.hpp>

#include <array>
#include <atomic>
#include <concepts>
#include <cstddef>
#include <memory>
#include <type_traits>
#include <utility>

namespace async {
template <typename F> [[nodiscard]] constexpr auto create_priority_task(F &&f) {
using func_t = std::remove_cvref_t<F>;
using args_t = stdx::decayed_args_t<func_t, stdx::tuple>;
return task<func_t, args_t, single_linked_task>{std::forward<F>(f)};
}

namespace detail {
template <typename T>
concept scheduler_hal = requires {
{ T::schedule(priority_t{}) } -> std::same_as<void>;
};
}

template <detail::scheduler_hal S, std::size_t NumPriorities>
class priority_task_manager {
struct mutex;
std::array<stdx::intrusive_forward_list<single_linked_task>, NumPriorities>
task_queues{};
std::atomic<int> task_count{};

public:
auto enqueue_task(single_linked_task &task, priority_t p) -> bool {
return conc::call_in_critical_section<mutex>([&]() -> bool {
auto const added = not std::exchange(task.pending, true);
if (added) {
++task_count;
task_queues[p].push_back(std::addressof(task));
S::schedule(p);
}
return added;
});
}

template <priority_t P>
auto service_tasks() -> void
requires(P <= NumPriorities)
{
auto &q = task_queues[P];
while (not std::empty(q)) {
auto &task = q.front();
conc::call_in_critical_section<mutex>([&]() {
q.pop_front();
task.pending = false;
});
task.run();
--task_count;
}
}

[[nodiscard]] auto is_idle() const -> bool { return task_count == 0; }
};
} // namespace async
70 changes: 70 additions & 0 deletions include/async/schedulers/task_manager_interface.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#pragma once

#include <async/schedulers/task.hpp>

#include <stdx/type_traits.hpp>

#include <concepts>
#include <type_traits>
#include <utility>

namespace async {
template <typename T>
concept task_manager = requires(T &t, single_linked_task &task) {
{ t.enqueue_task(task, priority_t{}) } -> std::convertible_to<bool>;
{ t.template service_tasks<priority_t{}>() } -> std::same_as<void>;
{ t.is_idle() } -> std::convertible_to<bool>;
};

namespace detail {
struct undefined_task_manager {
template <typename... Args> static auto enqueue_task(Args &&...) -> bool {
static_assert(stdx::always_false_v<Args...>,
"Inject a task manager by specializing "
"async::injected_task_manager.");
return false;
}

template <priority_t P> static auto service_tasks() -> void {
static_assert(
stdx::always_false_v<std::integral_constant<priority_t, P>>,
"Inject a task manager by specializing "
"async::injected_task_manager.");
}

template <typename... Args> static auto is_idle(Args &&...) -> bool {
static_assert(stdx::always_false_v<Args...>,
"Inject a task manager by specializing "
"async::injected_task_manager.");
return true;
}
};
static_assert(task_manager<undefined_task_manager>);
} // namespace detail

template <typename...>
inline auto injected_task_manager = detail::undefined_task_manager{};

namespace task_mgr {
namespace detail {
template <typename... DummyArgs, typename... Args>
requires(sizeof...(DummyArgs) == 0)
auto enqueue_task(Args &&...args) -> bool {
return injected_task_manager<DummyArgs...>.enqueue_task(
std::forward<Args>(args)...);
}
} // namespace detail

template <priority_t P, typename... DummyArgs>
requires(sizeof...(DummyArgs) == 0)
auto service_tasks() -> void {
return injected_task_manager<DummyArgs...>.template service_tasks<P>();
}

template <typename... DummyArgs>
requires(sizeof...(DummyArgs) == 0)
auto is_idle() -> bool {
return injected_task_manager<DummyArgs...>.is_idle();
}
} // namespace task_mgr
} // namespace async
3 changes: 2 additions & 1 deletion test/schedulers/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
add_tests(inline_scheduler runloop_scheduler thread_scheduler)
add_tests(inline_scheduler priority_scheduler runloop_scheduler task_manager
thread_scheduler)
89 changes: 89 additions & 0 deletions test/schedulers/priority_scheduler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#include "detail/common.hpp"

#include <async/concepts.hpp>
#include <async/just_result_of.hpp>
#include <async/on.hpp>
#include <async/schedulers/priority_scheduler.hpp>
#include <async/schedulers/task_manager.hpp>

#include <stdx/concepts.hpp>

#include <catch2/catch_test_macros.hpp>

namespace {
struct hal {
static auto schedule(async::priority_t) {}
};

using task_manager_t = async::priority_task_manager<hal, 8>;
} // namespace

template <> inline auto async::injected_task_manager<> = task_manager_t{};

TEST_CASE("fixed_priority_scheduler fulfils concept", "[priority_scheduler]") {
static_assert(async::scheduler<async::fixed_priority_scheduler<0>>);
}

TEST_CASE("fixed_priority_scheduler sender advertises nothing",
"[priority_scheduler]") {
static_assert(async::sender_of<
decltype(async::fixed_priority_scheduler<0>::schedule()),
async::set_value_t()>);
}

TEST_CASE("sender has the fixed_priority_scheduler as its completion scheduler",
"[priority_scheduler]") {
using S = async::fixed_priority_scheduler<0>;
auto s = S::schedule();
auto cs =
async::get_completion_scheduler<async::set_value_t>(async::get_env(s));
static_assert(std::same_as<decltype(cs), S>);
}

TEST_CASE("fixed_priority_scheduler schedules tasks", "[priority_scheduler]") {
auto s = async::fixed_priority_scheduler<0>{};
int var{};
async::sender auto sndr =
async::on(s, async::just_result_of([&] { var = 42; }));
auto op = async::connect(sndr, universal_receiver{});

async::task_mgr::service_tasks<0>();
CHECK(var == 0);

op.start();
async::task_mgr::service_tasks<0>();
CHECK(var == 42);
CHECK(async::task_mgr::is_idle());
}

TEST_CASE("fixed_priority_scheduler is cancellable before start",
"[priority_scheduler]") {
auto s = async::fixed_priority_scheduler<0>{};
int var{};
async::sender auto sndr =
async::on(s, async::just_result_of([&] { var = 42; }));
auto r = stoppable_receiver{[&] { var = 17; }};
auto op = async::connect(sndr, r);

r.request_stop();
op.start();
async::task_mgr::service_tasks<0>();
CHECK(var == 17);
CHECK(async::task_mgr::is_idle());
}

TEST_CASE("fixed_priority_scheduler is cancellable after start",
"[priority_scheduler]") {
auto s = async::fixed_priority_scheduler<0>{};
int var{};
async::sender auto sndr =
async::on(s, async::just_result_of([&] { var = 42; }));
auto r = stoppable_receiver{[&] { var = 17; }};
auto op = async::connect(sndr, r);

op.start();
r.request_stop();
async::task_mgr::service_tasks<0>();
CHECK(var == 17);
CHECK(async::task_mgr::is_idle());
}
Loading

0 comments on commit 4ff71bb

Please sign in to comment.