diff --git a/src/commons/Utils.cc b/src/commons/Utils.cc index 13471e87..efb8dcfe 100644 --- a/src/commons/Utils.cc +++ b/src/commons/Utils.cc @@ -36,6 +36,17 @@ bool Utils::flip_coin(double true_probability) { return (rand() % f) < lround(true_probability * f); } +unsigned int Utils::uint_rand(unsigned int closed_lower_bound, unsigned int open_upper_bound) { + if (open_upper_bound <= closed_lower_bound) { + Utils::error("Invalid bounds"); + } + return (rand() % (open_upper_bound - closed_lower_bound)) + closed_lower_bound; +} + +unsigned int Utils::uint_rand(unsigned int open_upper_bound) { + return Utils::uint_rand(0, open_upper_bound); +} + void Utils::sleep(unsigned int milliseconds) { this_thread::sleep_for(chrono::milliseconds(milliseconds)); } diff --git a/src/commons/Utils.h b/src/commons/Utils.h index 7c72e3ac..933ea478 100644 --- a/src/commons/Utils.h +++ b/src/commons/Utils.h @@ -52,6 +52,8 @@ class Utils { static void error(string msg, bool throw_flag = true); static bool flip_coin(double true_probability = 0.5); + static unsigned int uint_rand(unsigned int open_upper_bound); + static unsigned int uint_rand(unsigned int closed_lower_bound, unsigned int open_upper_bound); static void sleep(unsigned int milliseconds = 100); static string get_environment(string const& key); static map parse_config(string const& config_path); diff --git a/src/commons/processor/BUILD b/src/commons/processor/BUILD index 16cd2140..a7d08879 100644 --- a/src/commons/processor/BUILD +++ b/src/commons/processor/BUILD @@ -5,10 +5,14 @@ package(default_visibility = ["//visibility:public"]) cc_library( name = "processor_lib", srcs = [ + "DedicatedThread.cc", "Processor.cc", + "ThreadPool.cc", ], hdrs = [ + "DedicatedThread.h", "Processor.h", + "ThreadPool.h", ], includes = ["."], deps = [ diff --git a/src/commons/processor/DedicatedThread.cc b/src/commons/processor/DedicatedThread.cc new file mode 100644 index 00000000..ac7e636e --- /dev/null +++ b/src/commons/processor/DedicatedThread.cc @@ -0,0 +1,70 @@ +#ifndef LOG_LEVEL +#define LOG_LEVEL INFO_LEVEL +#endif +#include "DedicatedThread.h" + +#include "Logger.h" +#include "Utils.h" + +using namespace processor; +using namespace commons; + +// ------------------------------------------------------------------------------------------------- +// Public methods + +DedicatedThread::DedicatedThread(const string& id, ThreadMethod* job) : Processor(id) { + this->job = job; + this->start_flag = false; + this->stop_flag = false; +} + +DedicatedThread::~DedicatedThread() {} + +void DedicatedThread::setup() { + this->thread_object = new thread(&DedicatedThread::thread_method, this); + Processor::setup(); +} + +void DedicatedThread::start() { + if (is_setup()) { + this->api_mutex.lock(); + this->start_flag = true; + this->api_mutex.unlock(); + } + Processor::start(); +} + +void DedicatedThread::stop() { + this->api_mutex.lock(); + this->stop_flag = true; + this->api_mutex.unlock(); + Processor::stop(); + LOG_DEBUG("Joining DedicatedThread " + this->to_string() + "..."); + this->thread_object->join(); + LOG_DEBUG("Joined DedicatedThread " + this->to_string() ". Deleting thread object."); + delete this->thread_object; +} + +// ------------------------------------------------------------------------------------------------- +// Private methods + +bool DedicatedThread::started() { + lock_guard semaphore(this->api_mutex); + return this->start_flag; +} + +bool DedicatedThread::stopped() { + lock_guard semaphore(this->api_mutex); + return this->stop_flag; +} + +void DedicatedThread::thread_method() { + while (!started()) { + Utils::sleep(); + } + do { + if (!this->job->thread_one_step()) { + Utils::sleep(); + }; + } while (!stopped()); +} diff --git a/src/commons/processor/DedicatedThread.h b/src/commons/processor/DedicatedThread.h new file mode 100644 index 00000000..d5cafd43 --- /dev/null +++ b/src/commons/processor/DedicatedThread.h @@ -0,0 +1,41 @@ +#pragma once + +#include +#include + +#include "Processor.h" + +using namespace std; + +namespace processor { + +class ThreadMethod { + public: + virtual bool thread_one_step() = 0; +}; + +/** + * + */ +class DedicatedThread : public Processor { + public: + DedicatedThread(const string& id, ThreadMethod* job); + virtual ~DedicatedThread(); + + virtual void setup(); + virtual void start(); + virtual void stop(); + + private: + void thread_method(); + bool inline started(); + bool inline stopped(); + + ThreadMethod* job; + thread* thread_object; + bool start_flag; + bool stop_flag; + mutex api_mutex; +}; + +} // namespace processor diff --git a/src/commons/processor/Processor.h b/src/commons/processor/Processor.h index dd18c529..85e2658f 100644 --- a/src/commons/processor/Processor.h +++ b/src/commons/processor/Processor.h @@ -16,7 +16,7 @@ class Processor { enum State { UNDEFINED = 0, WAITING_SETUP, WAITING_START, WAITING_STOP, FINISHED }; Processor(const string& id); - ~Processor(); + virtual ~Processor(); static void bind_subprocessor(shared_ptr root, shared_ptr child); diff --git a/src/commons/processor/ThreadPool.cc b/src/commons/processor/ThreadPool.cc new file mode 100644 index 00000000..5aa4814d --- /dev/null +++ b/src/commons/processor/ThreadPool.cc @@ -0,0 +1,85 @@ +#include "ThreadPool.h" + +#include "Utils.h" + +using namespace processor; +using namespace commons; + +// ------------------------------------------------------------------------------------------------- +// Public methods + +ThreadPool::ThreadPool(const string& id, unsigned int num_threads) : Processor(id) { + this->num_threads = num_threads; + this->active_tasks = 0; +} + +ThreadPool::~ThreadPool() {} + +void ThreadPool::setup() { Processor::setup(); } + +void ThreadPool::start() { + for (unsigned int i = 0; i < this->num_threads; ++i) { + this->workers.emplace_back([this, i] { + while (true) { + function task; + { + unique_lock lock(this->queue_mutex); + this->condition.wait(lock, + [this] { return this->stop_flag || !this->tasks.empty(); }); + if (this->stop_flag && this->tasks.empty()) return; + + task = move(this->tasks.front()); + this->tasks.pop(); + ++this->active_tasks; + } + task(); + { + unique_lock lock(this->queue_mutex); + --this->active_tasks; + if (this->tasks.empty() && this->active_tasks == 0) { + this->done_condition.notify_all(); + } + } + } + }); + } + Processor::start(); +} + +void ThreadPool::stop() { + { + unique_lock lock(queue_mutex); + this->stop_flag = true; + } + condition.notify_all(); + for (thread& worker : workers) worker.join(); + Processor::stop(); +} + +void ThreadPool::enqueue(function task) { + if (is_running()) { + { + unique_lock lock(queue_mutex); + tasks.emplace(move(task)); + } + condition.notify_one(); + } else { + Utils::error("Attempt to add a job to ThreadPool " + this->to_string() + + " which has not being started"); + } +} + +int ThreadPool::size() { + unique_lock lock(queue_mutex); + return tasks.size(); +} + +bool ThreadPool::empty() { + unique_lock lock(queue_mutex); + return tasks.empty(); +} + +void ThreadPool::wait() { + unique_lock lock(queue_mutex); + done_condition.wait(lock, [this] { return tasks.empty() && active_tasks == 0; }); +} diff --git a/src/commons/processor/ThreadPool.h b/src/commons/processor/ThreadPool.h new file mode 100644 index 00000000..57a36c94 --- /dev/null +++ b/src/commons/processor/ThreadPool.h @@ -0,0 +1,49 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Processor.h" + +using namespace std; + +namespace processor { + +/** + * + */ +class ThreadPool : public Processor { + public: + ThreadPool(const string& id, unsigned int num_threads); + virtual ~ThreadPool(); + + virtual void setup(); + virtual void start(); + virtual void stop(); + + void enqueue(function task); + int size(); + bool empty(); + void wait(); + + private: + unsigned int num_threads; + + vector workers; + queue> tasks; + mutex queue_mutex; + condition_variable condition; + condition_variable done_condition; + unsigned int active_tasks; + bool stop_flag = false; +}; + +} // namespace processor diff --git a/src/tests/cpp/processor_test.cc b/src/tests/cpp/processor_test.cc index e126833a..ad186e17 100644 --- a/src/tests/cpp/processor_test.cc +++ b/src/tests/cpp/processor_test.cc @@ -2,8 +2,14 @@ #include +#include "DedicatedThread.h" +#include "Logger.h" +#include "Utils.h" +#include "processor/ThreadPool.h" + using namespace std; using namespace processor; +using namespace commons; TEST(ProcessorTest, basics) { Processor p1("blah"); @@ -100,3 +106,113 @@ TEST(ProcessorTest, subprocessors) { } } } + +TEST(ProcessorTest, dedicated_thread) { + class TestThreadMethod : public ThreadMethod { + public: + mutex api_mutex; + bool cycle_flag; + unsigned int count; + TestThreadMethod() { + this->count = 0; + this->cycle_flag = true; + } + void set_cycle_flag(bool value) { + lock_guard semaphore(this->api_mutex); + this->cycle_flag = value; + } + bool get_cycle_flag() { + lock_guard semaphore(this->api_mutex); + return this->cycle_flag; + } + unsigned int get_count() { + lock_guard semaphore(this->api_mutex); + return this->count; + } + bool thread_one_step() { + if (get_cycle_flag()) { + this->api_mutex.lock(); + this->count++; + this->api_mutex.unlock(); + set_cycle_flag(false); + } + return false; + } + }; + + TestThreadMethod thread_method; + DedicatedThread dedicated_thread("blah", &thread_method); + EXPECT_EQ(thread_method.get_count(), 0); + dedicated_thread.setup(); + EXPECT_EQ(thread_method.get_count(), 0); + Utils::sleep(1000); + EXPECT_EQ(thread_method.get_count(), 0); + dedicated_thread.start(); + Utils::sleep(1000); + EXPECT_EQ(thread_method.get_count(), 1); + Utils::sleep(1000); + EXPECT_EQ(thread_method.get_count(), 1); + thread_method.set_cycle_flag(true); + Utils::sleep(1000); + EXPECT_EQ(thread_method.get_count(), 2); + dedicated_thread.stop(); + Utils::sleep(1000); + EXPECT_EQ(thread_method.get_count(), 2); +} + +TEST(ProcessorTest, thread_pool) { + mutex count_mutex; + int count = 0; + + auto job_inc = [&count, &count_mutex]() { + count_mutex.lock(); + count++; + count_mutex.unlock(); + Utils::sleep(Utils::uint_rand(10)); + }; + + auto job_dec = [&count, &count_mutex]() { + count_mutex.lock(); + count--; + count_mutex.unlock(); + Utils::sleep(Utils::uint_rand(10)); + }; + + ThreadPool* pool; + for (unsigned int n_threads : {1, 2, 4, 10, 100}) { + for (int n_inc : {100, 200, 300}) { + for (int n_dec : {50, 150, 250}) { + count = 0; + pool = new ThreadPool("Pool:" + std::to_string(n_threads), n_threads); + pool->setup(); + pool->start(); + int count_inc = 0; + int count_dec = 0; + for (int i = 0; i < (n_inc + n_dec); i++) { + if (count_inc == n_inc) { + pool->enqueue(job_dec); + count_dec++; + } else if (count_dec == n_dec) { + pool->enqueue(job_inc); + count_inc++; + } else { + if (Utils::flip_coin(n_inc / ((double) n_inc + n_dec))) { + pool->enqueue(job_inc); + count_inc++; + } else { + pool->enqueue(job_dec); + count_dec++; + } + } + } + EXPECT_EQ(count_inc, n_inc); + EXPECT_EQ(count_dec, n_dec); + pool->wait(); + LOG_INFO("n_threads: " << n_threads << " n_inc: " << n_inc << " n_dec: " << n_dec); + EXPECT_EQ(count, (n_inc - n_dec)); + pool->stop(); + delete (pool); + } + } + } +}