Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/commons/Utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ bool Utils::flip_coin(double true_probability) {
return (rand() % f) < lround(true_probability * f);
}

unsigned int Utils::uint_rand(unsigned int closed_lower_bound, unsigned int open_upper_bound) {
if (open_upper_bound <= closed_lower_bound) {
Utils::error("Invalid bounds");
}
return (rand() % (open_upper_bound - closed_lower_bound)) + closed_lower_bound;
}

unsigned int Utils::uint_rand(unsigned int open_upper_bound) {
return Utils::uint_rand(0, open_upper_bound);
}

void Utils::sleep(unsigned int milliseconds) {
this_thread::sleep_for(chrono::milliseconds(milliseconds));
}
Expand Down
2 changes: 2 additions & 0 deletions src/commons/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class Utils {

static void error(string msg, bool throw_flag = true);
static bool flip_coin(double true_probability = 0.5);
static unsigned int uint_rand(unsigned int open_upper_bound);
static unsigned int uint_rand(unsigned int closed_lower_bound, unsigned int open_upper_bound);
static void sleep(unsigned int milliseconds = 100);
static string get_environment(string const& key);
static map<string, string> parse_config(string const& config_path);
Expand Down
4 changes: 4 additions & 0 deletions src/commons/processor/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ package(default_visibility = ["//visibility:public"])
cc_library(
name = "processor_lib",
srcs = [
"DedicatedThread.cc",
"Processor.cc",
"ThreadPool.cc",
],
hdrs = [
"DedicatedThread.h",
"Processor.h",
"ThreadPool.h",
],
includes = ["."],
deps = [
Expand Down
70 changes: 70 additions & 0 deletions src/commons/processor/DedicatedThread.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#ifndef LOG_LEVEL
#define LOG_LEVEL INFO_LEVEL
#endif
#include "DedicatedThread.h"

#include "Logger.h"
#include "Utils.h"

using namespace processor;
using namespace commons;

// -------------------------------------------------------------------------------------------------
// Public methods

DedicatedThread::DedicatedThread(const string& id, ThreadMethod* job) : Processor(id) {
this->job = job;
this->start_flag = false;
this->stop_flag = false;
}

DedicatedThread::~DedicatedThread() {}

void DedicatedThread::setup() {
this->thread_object = new thread(&DedicatedThread::thread_method, this);
Processor::setup();
}

void DedicatedThread::start() {
if (is_setup()) {
this->api_mutex.lock();
this->start_flag = true;
this->api_mutex.unlock();
}
Processor::start();
}

void DedicatedThread::stop() {
this->api_mutex.lock();
this->stop_flag = true;
this->api_mutex.unlock();
Processor::stop();
LOG_DEBUG("Joining DedicatedThread " + this->to_string() + "...");
this->thread_object->join();
LOG_DEBUG("Joined DedicatedThread " + this->to_string() ". Deleting thread object.");
delete this->thread_object;
}

// -------------------------------------------------------------------------------------------------
// Private methods

bool DedicatedThread::started() {
lock_guard<mutex> semaphore(this->api_mutex);
return this->start_flag;
}

bool DedicatedThread::stopped() {
lock_guard<mutex> semaphore(this->api_mutex);
return this->stop_flag;
}

void DedicatedThread::thread_method() {
while (!started()) {
Utils::sleep();
}
do {
if (!this->job->thread_one_step()) {
Utils::sleep();
};
} while (!stopped());
}
41 changes: 41 additions & 0 deletions src/commons/processor/DedicatedThread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include <mutex>
#include <thread>

#include "Processor.h"

using namespace std;

namespace processor {

class ThreadMethod {
public:
virtual bool thread_one_step() = 0;
};

/**
*
*/
class DedicatedThread : public Processor {
public:
DedicatedThread(const string& id, ThreadMethod* job);
virtual ~DedicatedThread();

virtual void setup();
virtual void start();
virtual void stop();

private:
void thread_method();
bool inline started();
bool inline stopped();

ThreadMethod* job;
thread* thread_object;
bool start_flag;
bool stop_flag;
mutex api_mutex;
};

} // namespace processor
2 changes: 1 addition & 1 deletion src/commons/processor/Processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Processor {
enum State { UNDEFINED = 0, WAITING_SETUP, WAITING_START, WAITING_STOP, FINISHED };

Processor(const string& id);
~Processor();
virtual ~Processor();

static void bind_subprocessor(shared_ptr<Processor> root, shared_ptr<Processor> child);

Expand Down
85 changes: 85 additions & 0 deletions src/commons/processor/ThreadPool.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#include "ThreadPool.h"

#include "Utils.h"

using namespace processor;
using namespace commons;

// -------------------------------------------------------------------------------------------------
// Public methods

ThreadPool::ThreadPool(const string& id, unsigned int num_threads) : Processor(id) {
this->num_threads = num_threads;
this->active_tasks = 0;
}

ThreadPool::~ThreadPool() {}

void ThreadPool::setup() { Processor::setup(); }

void ThreadPool::start() {
for (unsigned int i = 0; i < this->num_threads; ++i) {
this->workers.emplace_back([this, i] {
while (true) {
function<void()> task;
{
unique_lock<mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this] { return this->stop_flag || !this->tasks.empty(); });
if (this->stop_flag && this->tasks.empty()) return;

task = move(this->tasks.front());
this->tasks.pop();
++this->active_tasks;
}
task();
{
unique_lock<mutex> lock(this->queue_mutex);
--this->active_tasks;
if (this->tasks.empty() && this->active_tasks == 0) {
this->done_condition.notify_all();
}
}
}
});
}
Processor::start();
}

void ThreadPool::stop() {
{
unique_lock<mutex> lock(queue_mutex);
this->stop_flag = true;
}
condition.notify_all();
for (thread& worker : workers) worker.join();
Processor::stop();
}

void ThreadPool::enqueue(function<void()> task) {
if (is_running()) {
{
unique_lock<mutex> lock(queue_mutex);
tasks.emplace(move(task));
}
condition.notify_one();
} else {
Utils::error("Attempt to add a job to ThreadPool " + this->to_string() +
" which has not being started");
}
}

int ThreadPool::size() {
unique_lock<mutex> lock(queue_mutex);
return tasks.size();
}

bool ThreadPool::empty() {
unique_lock<mutex> lock(queue_mutex);
return tasks.empty();
}

void ThreadPool::wait() {
unique_lock<mutex> lock(queue_mutex);
done_condition.wait(lock, [this] { return tasks.empty() && active_tasks == 0; });
}
49 changes: 49 additions & 0 deletions src/commons/processor/ThreadPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#pragma once

#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>

#include "Processor.h"

using namespace std;

namespace processor {

/**
*
*/
class ThreadPool : public Processor {
public:
ThreadPool(const string& id, unsigned int num_threads);
virtual ~ThreadPool();

virtual void setup();
virtual void start();
virtual void stop();

void enqueue(function<void()> task);
int size();
bool empty();
void wait();

private:
unsigned int num_threads;

vector<thread> workers;
queue<function<void()>> tasks;
mutex queue_mutex;
condition_variable condition;
condition_variable done_condition;
unsigned int active_tasks;
bool stop_flag = false;
};

} // namespace processor
Loading