From 4ff71bbb87782f9212db5574d6aa8ab95c27c754 Mon Sep 17 00:00:00 2001 From: Ben Deane Date: Wed, 17 Jan 2024 15:11:49 -0700 Subject: [PATCH] :sparkles: Add task manager and priority scheduler --- CMakeLists.txt | 4 +- .../async/schedulers/priority_scheduler.hpp | 99 ++++++++++ include/async/schedulers/task.hpp | 49 +++++ include/async/schedulers/task_manager.hpp | 70 +++++++ .../schedulers/task_manager_interface.hpp | 70 +++++++ test/schedulers/CMakeLists.txt | 3 +- test/schedulers/priority_scheduler.cpp | 89 +++++++++ test/schedulers/task_manager.cpp | 176 ++++++++++++++++++ 8 files changed, 557 insertions(+), 3 deletions(-) create mode 100644 include/async/schedulers/priority_scheduler.hpp create mode 100644 include/async/schedulers/task.hpp create mode 100644 include/async/schedulers/task_manager.hpp create mode 100644 include/async/schedulers/task_manager_interface.hpp create mode 100644 test/schedulers/priority_scheduler.cpp create mode 100644 test/schedulers/task_manager.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 0af3054..d31ac3a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,8 +11,8 @@ project( include(cmake/get_cpm.cmake) cpmaddpackage("gh:intel/cicd-repo-infrastructure#main") -add_versioned_package("gh:intel/cpp-std-extensions#0540d81") -add_versioned_package("gh:intel/cpp-baremetal-concurrency#8d49b6d") +add_versioned_package("gh:intel/cpp-std-extensions#8cebc04") +add_versioned_package("gh:intel/cpp-baremetal-concurrency#659771e") add_versioned_package("gh:boostorg/mp11#boost-1.83.0") add_library(async INTERFACE) diff --git a/include/async/schedulers/priority_scheduler.hpp b/include/async/schedulers/priority_scheduler.hpp new file mode 100644 index 0000000..8bc738d --- /dev/null +++ b/include/async/schedulers/priority_scheduler.hpp @@ -0,0 +1,99 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace async { +namespace task_mgr { +template struct op_state : single_linked_task { + template + requires std::same_as> + constexpr explicit(true) op_state(R &&r) : rcvr{std::forward(r)} {} + + auto start() -> void { + if (check_stopped()) { + std::move(rcvr).set_stopped(); + } else { + detail::enqueue_task(*this, P); + } + } + + auto run() -> void final { + if (check_stopped()) { + std::move(rcvr).set_stopped(); + } else { + std::move(rcvr).set_value(); + } + } + + [[no_unique_address]] Rcvr rcvr; + + private: + auto check_stopped() -> bool { + if constexpr (not unstoppable_token>>) { + return get_stop_token(get_env(rcvr)).stop_requested(); + } + return false; + } +}; +} // namespace task_mgr + +template class fixed_priority_scheduler { + + class env { + [[nodiscard]] friend constexpr auto + tag_invoke(get_completion_scheduler_t, env) noexcept + -> fixed_priority_scheduler { + return {}; + } + }; + + struct sender { + using is_sender = void; + + private: + template R> + requires std::same_as> + [[nodiscard]] friend constexpr auto tag_invoke(connect_t, S &&, R &&r) { + return task_mgr::op_state>{ + std::forward(r)}; + } + + [[nodiscard]] friend constexpr auto tag_invoke(get_env_t, + sender) noexcept -> env { + return {}; + } + + template + [[nodiscard]] friend constexpr auto + tag_invoke(get_completion_signatures_t, sender, Env const &) noexcept + -> completion_signatures { + return {}; + } + + template + requires unstoppable_token> + [[nodiscard]] friend constexpr auto + tag_invoke(get_completion_signatures_t, sender, Env const &) noexcept + -> completion_signatures { + return {}; + } + }; + + [[nodiscard]] friend constexpr auto operator==(fixed_priority_scheduler, + fixed_priority_scheduler) + -> bool = default; + + public: + [[nodiscard]] constexpr static auto schedule() -> sender { return {}; } +}; + +} // namespace async diff --git a/include/async/schedulers/task.hpp b/include/async/schedulers/task.hpp new file mode 100644 index 0000000..322a0cb --- /dev/null +++ b/include/async/schedulers/task.hpp @@ -0,0 +1,49 @@ +#pragma once + +#include + +#include +#include +#include +#include + +namespace async { +// NOLINTNEXTLINE(cppcoreguidelines-virtual-class-destructor) +struct task_base { + bool pending{}; + virtual auto run() -> void = 0; +}; + +// NOLINTNEXTLINE(*-virtual-class-destructor,*-special-member-functions) +struct single_linked_task : task_base { + constexpr single_linked_task() = default; + constexpr single_linked_task(single_linked_task &&) = delete; + single_linked_task *next{}; + + private: + [[nodiscard]] friend constexpr auto + operator==(single_linked_task const &lhs, single_linked_task const &rhs) { + return std::addressof(lhs) == std::addressof(rhs); + } +}; + +template +struct task : Base { + constexpr explicit(true) task(F const &f) : func(f) {} + constexpr explicit(true) task(F &&f) : func(std::move(f)) {} + + template auto bind_front(Args &&...args) -> task & { + [&](std::index_sequence) { + ((get(bound_args) = std::forward(args)), ...); + }(std::index_sequence_for{}); + return *this; + } + + auto run() -> void final { bound_args.apply(func); } + + [[no_unique_address]] F func; + [[no_unique_address]] ArgTuple bound_args{}; +}; + +using priority_t = std::uint8_t; +} // namespace async diff --git a/include/async/schedulers/task_manager.hpp b/include/async/schedulers/task_manager.hpp new file mode 100644 index 0000000..edd9b70 --- /dev/null +++ b/include/async/schedulers/task_manager.hpp @@ -0,0 +1,70 @@ +#pragma once + +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace async { +template [[nodiscard]] constexpr auto create_priority_task(F &&f) { + using func_t = std::remove_cvref_t; + using args_t = stdx::decayed_args_t; + return task{std::forward(f)}; +} + +namespace detail { +template +concept scheduler_hal = requires { + { T::schedule(priority_t{}) } -> std::same_as; +}; +} + +template +class priority_task_manager { + struct mutex; + std::array, NumPriorities> + task_queues{}; + std::atomic task_count{}; + + public: + auto enqueue_task(single_linked_task &task, priority_t p) -> bool { + return conc::call_in_critical_section([&]() -> bool { + auto const added = not std::exchange(task.pending, true); + if (added) { + ++task_count; + task_queues[p].push_back(std::addressof(task)); + S::schedule(p); + } + return added; + }); + } + + template + auto service_tasks() -> void + requires(P <= NumPriorities) + { + auto &q = task_queues[P]; + while (not std::empty(q)) { + auto &task = q.front(); + conc::call_in_critical_section([&]() { + q.pop_front(); + task.pending = false; + }); + task.run(); + --task_count; + } + } + + [[nodiscard]] auto is_idle() const -> bool { return task_count == 0; } +}; +} // namespace async diff --git a/include/async/schedulers/task_manager_interface.hpp b/include/async/schedulers/task_manager_interface.hpp new file mode 100644 index 0000000..f195da8 --- /dev/null +++ b/include/async/schedulers/task_manager_interface.hpp @@ -0,0 +1,70 @@ +#pragma once + +#include + +#include + +#include +#include +#include + +namespace async { +template +concept task_manager = requires(T &t, single_linked_task &task) { + { t.enqueue_task(task, priority_t{}) } -> std::convertible_to; + { t.template service_tasks() } -> std::same_as; + { t.is_idle() } -> std::convertible_to; +}; + +namespace detail { +struct undefined_task_manager { + template static auto enqueue_task(Args &&...) -> bool { + static_assert(stdx::always_false_v, + "Inject a task manager by specializing " + "async::injected_task_manager."); + return false; + } + + template static auto service_tasks() -> void { + static_assert( + stdx::always_false_v>, + "Inject a task manager by specializing " + "async::injected_task_manager."); + } + + template static auto is_idle(Args &&...) -> bool { + static_assert(stdx::always_false_v, + "Inject a task manager by specializing " + "async::injected_task_manager."); + return true; + } +}; +static_assert(task_manager); +} // namespace detail + +template +inline auto injected_task_manager = detail::undefined_task_manager{}; + +namespace task_mgr { +namespace detail { +template + requires(sizeof...(DummyArgs) == 0) +auto enqueue_task(Args &&...args) -> bool { + return injected_task_manager.enqueue_task( + std::forward(args)...); +} +} // namespace detail + +template + requires(sizeof...(DummyArgs) == 0) +auto service_tasks() -> void { + return injected_task_manager.template service_tasks

(); +} + +template + requires(sizeof...(DummyArgs) == 0) +auto is_idle() -> bool { + return injected_task_manager.is_idle(); +} +} // namespace task_mgr +} // namespace async diff --git a/test/schedulers/CMakeLists.txt b/test/schedulers/CMakeLists.txt index 62881aa..fe0f5a6 100644 --- a/test/schedulers/CMakeLists.txt +++ b/test/schedulers/CMakeLists.txt @@ -1 +1,2 @@ -add_tests(inline_scheduler runloop_scheduler thread_scheduler) +add_tests(inline_scheduler priority_scheduler runloop_scheduler task_manager + thread_scheduler) diff --git a/test/schedulers/priority_scheduler.cpp b/test/schedulers/priority_scheduler.cpp new file mode 100644 index 0000000..3dff597 --- /dev/null +++ b/test/schedulers/priority_scheduler.cpp @@ -0,0 +1,89 @@ +#include "detail/common.hpp" + +#include +#include +#include +#include +#include + +#include + +#include + +namespace { +struct hal { + static auto schedule(async::priority_t) {} +}; + +using task_manager_t = async::priority_task_manager; +} // namespace + +template <> inline auto async::injected_task_manager<> = task_manager_t{}; + +TEST_CASE("fixed_priority_scheduler fulfils concept", "[priority_scheduler]") { + static_assert(async::scheduler>); +} + +TEST_CASE("fixed_priority_scheduler sender advertises nothing", + "[priority_scheduler]") { + static_assert(async::sender_of< + decltype(async::fixed_priority_scheduler<0>::schedule()), + async::set_value_t()>); +} + +TEST_CASE("sender has the fixed_priority_scheduler as its completion scheduler", + "[priority_scheduler]") { + using S = async::fixed_priority_scheduler<0>; + auto s = S::schedule(); + auto cs = + async::get_completion_scheduler(async::get_env(s)); + static_assert(std::same_as); +} + +TEST_CASE("fixed_priority_scheduler schedules tasks", "[priority_scheduler]") { + auto s = async::fixed_priority_scheduler<0>{}; + int var{}; + async::sender auto sndr = + async::on(s, async::just_result_of([&] { var = 42; })); + auto op = async::connect(sndr, universal_receiver{}); + + async::task_mgr::service_tasks<0>(); + CHECK(var == 0); + + op.start(); + async::task_mgr::service_tasks<0>(); + CHECK(var == 42); + CHECK(async::task_mgr::is_idle()); +} + +TEST_CASE("fixed_priority_scheduler is cancellable before start", + "[priority_scheduler]") { + auto s = async::fixed_priority_scheduler<0>{}; + int var{}; + async::sender auto sndr = + async::on(s, async::just_result_of([&] { var = 42; })); + auto r = stoppable_receiver{[&] { var = 17; }}; + auto op = async::connect(sndr, r); + + r.request_stop(); + op.start(); + async::task_mgr::service_tasks<0>(); + CHECK(var == 17); + CHECK(async::task_mgr::is_idle()); +} + +TEST_CASE("fixed_priority_scheduler is cancellable after start", + "[priority_scheduler]") { + auto s = async::fixed_priority_scheduler<0>{}; + int var{}; + async::sender auto sndr = + async::on(s, async::just_result_of([&] { var = 42; })); + auto r = stoppable_receiver{[&] { var = 17; }}; + auto op = async::connect(sndr, r); + + op.start(); + r.request_stop(); + async::task_mgr::service_tasks<0>(); + CHECK(var == 17); + CHECK(async::task_mgr::is_idle()); +} diff --git a/test/schedulers/task_manager.cpp b/test/schedulers/task_manager.cpp new file mode 100644 index 0000000..80b9ae2 --- /dev/null +++ b/test/schedulers/task_manager.cpp @@ -0,0 +1,176 @@ +#include "detail/common.hpp" + +#include + +#include + +#include +#include +#include + +namespace { +struct hal { + static inline std::vector calls{}; + static auto schedule(async::priority_t p) { calls.push_back(p); } +}; + +using task_manager_t = async::priority_task_manager; + +std::function interrupt_fn{}; + +struct test_concurrency_policy { + struct interrupt { + ~interrupt() { + if (interrupt_fn) { + interrupt_fn(); + } + } + }; + + template + static auto call_in_critical_section(F &&f, Pred &&...) -> decltype(auto) { + [[maybe_unused]] interrupt raii_interrupt{}; + return std::forward(f)(); + } +}; +} // namespace + +template <> inline auto conc::injected_policy<> = test_concurrency_policy{}; + +TEST_CASE("create task with lvalue", "[task_manager]") { + int var{}; + auto l = [&] { var = 42; }; + auto task = async::create_priority_task(l); + task.run(); + CHECK(var == 42); +} + +TEST_CASE("create task with rvalue", "[task_manager]") { + int var{}; + auto task = async::create_priority_task([&] { var = 42; }); + task.run(); + CHECK(var == 42); +} + +TEST_CASE("run a task with bound args", "[task_manager]") { + int var{}; + auto task = async::create_priority_task([&](int x) { var = x; }); + task.bind_front(42).run(); + CHECK(var == 42); +} + +TEST_CASE("tasks have reference equality", "[task_manager]") { + auto t = async::create_priority_task([] {}); + auto u = async::create_priority_task([] {}); + CHECK(t == t); + CHECK(t != u); +} + +TEST_CASE("nothing pending", "[task_manager]") { + auto m = task_manager_t{}; + CHECK(m.is_idle()); +} + +TEST_CASE("queue a task", "[task_manager]") { + hal::calls.clear(); + auto m = task_manager_t{}; + auto task = async::create_priority_task([] {}); + m.enqueue_task(task, 3); + CHECK(not m.is_idle()); + REQUIRE(hal::calls.size() == 1); + CHECK(hal::calls[0] == 3); +} + +TEST_CASE("run a queued task", "[task_manager]") { + hal::calls.clear(); + auto m = task_manager_t{}; + int var{}; + auto task = async::create_priority_task([&] { var = 42; }); + m.enqueue_task(task, 3); + CHECK(not m.is_idle()); + REQUIRE(hal::calls.size() == 1); + CHECK(hal::calls[0] == 3); + m.service_tasks<3>(); + CHECK(var == 42); +} + +TEST_CASE("queueing a task is idempotent", "[task_manager]") { + hal::calls.clear(); + auto m = task_manager_t{}; + int var{}; + auto task = async::create_priority_task([&] { var = 42; }); + CHECK(m.enqueue_task(task, 3)); + CHECK(not m.enqueue_task(task, 3)); + REQUIRE(hal::calls.size() == 1); + m.service_tasks<3>(); + CHECK(var == 42); + CHECK(m.is_idle()); +} + +TEST_CASE("run tasks in FIFO order", "[task_manager]") { + auto m = task_manager_t{}; + int var{1}; + auto task1 = async::create_priority_task([&] { var *= 2; }); + auto task2 = async::create_priority_task([&] { var += 2; }); + CHECK(m.enqueue_task(task1, 0)); + CHECK(m.enqueue_task(task2, 0)); + m.service_tasks<0>(); + CHECK(var == 4); + CHECK(m.is_idle()); +} + +TEST_CASE("manager is not idle during a running task", "[task_manager]") { + auto m = task_manager_t{}; + int var{}; + auto task = async::create_priority_task([&] { + CHECK(not m.is_idle()); + var = 42; + }); + CHECK(m.enqueue_task(task, 0)); + m.service_tasks<0>(); + CHECK(var == 42); + CHECK(m.is_idle()); +} + +TEST_CASE("task can requeue itself", "[task_manager]") { + auto m = task_manager_t{}; + int var{}; + + auto task = async::create_priority_task( + [&](task_manager_t *mgr, async::single_linked_task *t) { + if (var++ == 0) { + CHECK(mgr->enqueue_task(*t, 0)); + } + }); + CHECK(m.enqueue_task(task.bind_front(&m, &task), 0)); + m.service_tasks<0>(); + CHECK(var == 2); + CHECK(m.is_idle()); +} + +TEST_CASE("don't run a queued task of a different priority", "[task_manager]") { + auto m = task_manager_t{}; + int var{}; + auto task = async::create_priority_task([&] { var = 42; }); + m.enqueue_task(task, 1); + CHECK(not m.is_idle()); + m.service_tasks<0>(); + CHECK(not m.is_idle()); + CHECK(var == 0); +} + +TEST_CASE("queue a task on interrupt during servicing", "[task_manager]") { + auto m = task_manager_t{}; + int var{}; + auto task = async::create_priority_task([&] { ++var; }); + m.enqueue_task(task, 1); + + interrupt_fn = [&] { + interrupt_fn = {}; + m.enqueue_task(task, 1); + }; + + m.service_tasks<1>(); + CHECK(var == 2); + CHECK(m.is_idle()); +}