Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 59 additions & 6 deletions src/commons/processor/Processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Processor::Processor(const string& id) {
}
this->current_state = WAITING_SETUP;
this->id = id;
this->parent_processor = nullptr;
}

Processor::~Processor() {
Expand All @@ -23,38 +24,89 @@ Processor::~Processor() {
}
}

void Processor::bind_subprocessor(shared_ptr<Processor> root, shared_ptr<Processor> child) {
// static method
if (root->is_setup() || child->is_setup()) {
Utils::error("Can't bind processors after setup() has been called");
}
if (child->parent_processor != nullptr) {
Utils::error("Invalid attempt to bind processor " + child->to_string() + " to " +
root->to_string() + ". It's already bound to " +
child->parent_processor->to_string());
}
root->subprocessors.push_back(child);
child->parent_processor = root;
root->add_subprocessor(child);
child->set_parent(root);
}

void Processor::setup() {
lock_guard<mutex> semaphore(this->api_mutex);
this->api_mutex.lock();
check_state("setup", WAITING_SETUP);
this->current_state = WAITING_START;
this->api_mutex.unlock();
for (auto subprocess : this->subprocessors) {
if (!subprocess->is_setup()) {
subprocess->setup();
}
}
if ((this->parent_processor != nullptr) && !this->parent_processor->is_setup()) {
this->parent_processor->setup();
}
}

void Processor::start() {
lock_guard<mutex> semaphore(this->api_mutex);
this->api_mutex.lock();
check_state("start", WAITING_START);
this->current_state = WAITING_STOP;
this->api_mutex.unlock();
for (auto subprocess : this->subprocessors) {
if (!subprocess->is_running()) {
subprocess->start();
}
}
if ((this->parent_processor != nullptr) && !this->parent_processor->is_running()) {
this->parent_processor->start();
}
}

void Processor::stop() {
lock_guard<mutex> semaphore(this->api_mutex);
this->api_mutex.lock();
check_state("stop", WAITING_STOP);
this->current_state = FINISHED;
this->api_mutex.unlock();
for (auto subprocess : this->subprocessors) {
if (!subprocess->is_finished()) {
subprocess->stop();
}
}
if ((this->parent_processor != nullptr) && !this->parent_processor->is_finished()) {
this->parent_processor->stop();
}
this->subprocessors.clear();
this->parent_processor = nullptr;
}

bool Processor::is_setup() {
lock_guard<mutex> semaphore(this->api_mutex);
return (this->current_state > WAITING_SETUP);
}

bool Processor::is_running() {
lock_guard<mutex> semaphore(this->api_mutex);
return (this->current_state == WAITING_STOP);
}

bool Processor::is_finished() {
lock_guard<mutex> semaphore(this->api_mutex);
return (this->current_state == FINISHED);
}

string Processor::to_string() { return this->id; }
void Processor::add_subprocessor(shared_ptr<Processor> other) {}

// -------------------------------------------------------------------------------------------------
// Private methods
void Processor::set_parent(shared_ptr<Processor> other) {}

string Processor::to_string() { return this->id; }

void Processor::check_state(const string& action, State state) {
if (this->current_state != state) {
Expand All @@ -74,6 +126,7 @@ void Processor::check_state(const string& action, State state) {
} else {
error_message += "Processor is in unexpected state: " + this->current_state;
}
this->api_mutex.unlock();
Utils::error(error_message);
}
}
12 changes: 12 additions & 0 deletions src/commons/processor/Processor.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <memory>
#include <mutex>
#include <vector>

using namespace std;

Expand All @@ -15,10 +17,18 @@ class Processor {

Processor(const string& id);
~Processor();

static void bind_subprocessor(shared_ptr<Processor> root, shared_ptr<Processor> child);

virtual void setup();
virtual void start();
virtual void stop();

virtual void add_subprocessor(shared_ptr<Processor> other);
virtual void set_parent(shared_ptr<Processor> other);

bool is_setup();
bool is_running();
bool is_finished();
string to_string();

Expand All @@ -27,6 +37,8 @@ class Processor {

State current_state;
string id;
vector<shared_ptr<Processor>> subprocessors;
shared_ptr<Processor> parent_processor;
mutex api_mutex;
};

Expand Down
57 changes: 57 additions & 0 deletions src/tests/cpp/processor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,92 @@ TEST(ProcessorTest, basics) {
EXPECT_EQ(p1.to_string(), "blah");

EXPECT_FALSE(p1.is_setup());
EXPECT_FALSE(p1.is_running());
EXPECT_FALSE(p1.is_finished());

EXPECT_THROW(p1.start(), runtime_error);
EXPECT_THROW(p1.stop(), runtime_error);
EXPECT_FALSE(p1.is_setup());
EXPECT_FALSE(p1.is_running());
EXPECT_FALSE(p1.is_finished());
p1.setup();
EXPECT_TRUE(p1.is_setup());
EXPECT_FALSE(p1.is_running());
EXPECT_FALSE(p1.is_finished());

EXPECT_THROW(p1.setup(), runtime_error);
EXPECT_THROW(p1.stop(), runtime_error);
EXPECT_TRUE(p1.is_setup());
EXPECT_FALSE(p1.is_running());
EXPECT_FALSE(p1.is_finished());
p1.start();
EXPECT_TRUE(p1.is_setup());
EXPECT_TRUE(p1.is_running());
EXPECT_FALSE(p1.is_finished());

EXPECT_THROW(p1.setup(), runtime_error);
EXPECT_THROW(p1.start(), runtime_error);
EXPECT_TRUE(p1.is_setup());
EXPECT_TRUE(p1.is_running());
EXPECT_FALSE(p1.is_finished());
p1.stop();
EXPECT_TRUE(p1.is_setup());
EXPECT_FALSE(p1.is_running());
EXPECT_TRUE(p1.is_finished());

EXPECT_THROW(p1.setup(), runtime_error);
EXPECT_THROW(p1.start(), runtime_error);
EXPECT_THROW(p1.stop(), runtime_error);
EXPECT_TRUE(p1.is_setup());
EXPECT_FALSE(p1.is_running());
EXPECT_TRUE(p1.is_finished());
}

TEST(ProcessorTest, subprocessors) {
shared_ptr<Processor> a, b, c, d, e, f, g, h;
vector<shared_ptr<Processor>> nodes;
shared_ptr<Processor> p;
for (unsigned int i = 0; i < 8; i++) {
a = make_shared<Processor>("a");
b = make_shared<Processor>("b");
c = make_shared<Processor>("c");
d = make_shared<Processor>("d");
e = make_shared<Processor>("e");
f = make_shared<Processor>("f");
g = make_shared<Processor>("g");
h = make_shared<Processor>("h");
nodes = {a, b, c, d, e, f, g, h};
Processor::bind_subprocessor(a, c); // a //
Processor::bind_subprocessor(a, d); // / \ //
Processor::bind_subprocessor(d, e); // c d //
Processor::bind_subprocessor(d, f); // / | \ //
Processor::bind_subprocessor(d, g); // e f g //
Processor::bind_subprocessor(e, b); // | | //
Processor::bind_subprocessor(f, h); // b h //

p = nodes[i % nodes.size()];
for (auto cursor : nodes) {
EXPECT_FALSE(cursor->is_setup());
EXPECT_FALSE(cursor->is_running());
EXPECT_FALSE(cursor->is_finished());
}
p->setup();
for (auto cursor : nodes) {
EXPECT_TRUE(cursor->is_setup());
EXPECT_FALSE(cursor->is_running());
EXPECT_FALSE(cursor->is_finished());
}
p->start();
for (auto cursor : nodes) {
EXPECT_TRUE(cursor->is_setup());
EXPECT_TRUE(cursor->is_running());
EXPECT_FALSE(cursor->is_finished());
}
p->stop();
for (auto cursor : nodes) {
EXPECT_TRUE(cursor->is_setup());
EXPECT_FALSE(cursor->is_running());
EXPECT_TRUE(cursor->is_finished());
}
}
}