diff --git a/async_semaphore.cpp b/async_semaphore.cpp index 57b93c8..7d68bd0 100644 --- a/async_semaphore.cpp +++ b/async_semaphore.cpp @@ -93,6 +93,14 @@ bot(int n, async_semaphore &sem, std::chrono::milliseconds deadline) errors++; \ } + +#define check_le(X, Y) \ + if (X > Y) \ + { \ + printf(#X " <= " #Y " failed: %d <= %d\n", X, Y); \ + errors++; \ + } + int test_value() { int errors = 0; @@ -122,11 +130,53 @@ int test_value() return errors; } +int test_sync() +{ + asio::io_context ioc; + int errors = 0; + async_semaphore se2{ioc.get_executor(), 3}; //allow at most three in parallel + + std::vector order; // isn't 100% defined! + + static int concurrent = 0; + + auto op = + [&](int id, auto && token) + { + return asio::co_spawn(ioc, [&, id]() -> asio::awaitable + { + check_le(concurrent, 3); + concurrent ++; + printf("Entered %d\n", id); + + asio::steady_timer tim{co_await asio::this_coro::executor, std::chrono::milliseconds{10}}; + co_await tim.async_wait(asio::use_awaitable); + printf("Exited %d\n", id); + concurrent --; + }, std::move(token)); + }; + + synchronized(se2, std::bind(op, 0, std::placeholders::_1), asio::detached); + synchronized(se2, std::bind(op, 2, std::placeholders::_1), asio::detached); + synchronized(se2, std::bind(op, 4, std::placeholders::_1), asio::detached); + synchronized(se2, std::bind(op, 6, std::placeholders::_1), asio::detached); + synchronized(se2, std::bind(op, 8, std::placeholders::_1), asio::detached); + synchronized(se2, std::bind(op, 10, std::placeholders::_1), asio::detached); + synchronized(se2, std::bind(op, 12, std::placeholders::_1), asio::detached); + synchronized(se2, std::bind(op, 14, std::placeholders::_1), asio::detached); + + ioc.run(); + + return errors; + +} + int main() { int res = 0; res += test_value(); + res += test_sync(); auto ioc = asio::io_context(ASIO_CONCURRENCY_HINT_UNSAFE); auto sem = async_semaphore(ioc.get_executor(), 10); @@ -139,6 +189,10 @@ main() { return std::chrono::milliseconds(dist(eng)); }; for (int i = 0; i < 100; i += 2) co_spawn(ioc, bot(i, sem, random_time()), detached); + + ioc.run(); + + return res; } diff --git a/include/asioex/async_semaphore.hpp b/include/asioex/async_semaphore.hpp index 2f83fc3..a5313e0 100644 --- a/include/asioex/async_semaphore.hpp +++ b/include/asioex/async_semaphore.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -128,6 +129,77 @@ struct basic_async_semaphore : async_semaphore_base using async_semaphore = basic_async_semaphore<>; +template +struct synchronized_op; + +template +struct synchronized_op +{ + basic_async_semaphore & sm; + Op op; + + struct semaphore_tag {}; + struct op_tag {}; + + static auto make_error_impl(error_code ec, error_code *) + { + return ec; + } + + static auto make_error_impl(error_code ec, std::exception_ptr *) + { + return std::make_exception_ptr(std::system_error(ec)); + } + + static auto make_error(error_code ec) + { + return make_error_impl(ec, static_cast(nullptr)); + } + + template + void operator()(Self && self) // init + { + if (self.get_cancellation_state().cancelled() != asio::cancellation_type::none) + return std::move(self).complete(make_error(asio::error::operation_aborted), Args{}...); + + sm.async_acquire( + asio::experimental::prepend(std::move(self), semaphore_tag{})); + } + + template + void operator()(Self && self, semaphore_tag, error_code ec) // semaphore obtained + { + std::move(op)(asio::experimental::prepend(std::move(self), op_tag{})); + } + + template + void operator()(Self && self, op_tag, Args_ && ... args ) // semaphore obtained + { + sm.release(); + std::move(self).complete(std::forward(args)...); + } +}; + + + +/// Function to run OPs only when the semaphore can be acquired. +/// That way an artificial number of processes can run in parallel. +template()(asio::experimental::detail::deferred_signature_probe{}))::type) + CompletionToken + ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(Executor)> +auto synchronized(basic_async_semaphore & sm, + Op && op, + CompletionToken && completion_token) +{ + using sig_t = typename decltype(std::declval()(asio::experimental::detail::deferred_signature_probe{}))::type; + + using cop = synchronized_op, sig_t>; + + return asio::async_compose(cop{sm, std::forward(op)}, completion_token, sm); + +} + } // namespace asioex #endif