From ef4d95cb7e307fcd0eab652ea42604687ae8621e Mon Sep 17 00:00:00 2001 From: Andre Senna <“andre.senna@gmail.com”> Date: Tue, 9 Dec 2025 09:47:12 -0300 Subject: [PATCH 1/2] Added sub-processors api --- src/commons/processor/Processor.cc | 63 +++++++++++++++++++++++++++--- src/commons/processor/Processor.h | 12 ++++++ src/tests/cpp/processor_test.cc | 57 +++++++++++++++++++++++++++ 3 files changed, 126 insertions(+), 6 deletions(-) diff --git a/src/commons/processor/Processor.cc b/src/commons/processor/Processor.cc index 40741206..d8c04417 100644 --- a/src/commons/processor/Processor.cc +++ b/src/commons/processor/Processor.cc @@ -14,6 +14,7 @@ Processor::Processor(const string& id) { } this->current_state = WAITING_SETUP; this->id = id; + this->parent_processor = nullptr; } Processor::~Processor() { @@ -23,22 +24,65 @@ Processor::~Processor() { } } +void Processor::bind_subprocessor(shared_ptr root, shared_ptr child) { + // static method + if (root->is_setup() || child->is_setup()) { + Utils::error("Can't bind processors after setup() has been called"); + } + if (child->parent_processor != nullptr) { + Utils::error("Invalid attempt to bind processor " + child->to_string() + " to " + root->to_string() + ". It's already bound to " + child->parent_processor->to_string()); + } + root->subprocessors.push_back(child); + child->parent_processor = root; + root->add_subprocessor(child); + child->set_parent(root); +} + void Processor::setup() { - lock_guard semaphore(this->api_mutex); + this->api_mutex.lock(); check_state("setup", WAITING_SETUP); this->current_state = WAITING_START; + this->api_mutex.unlock(); + for (auto subprocess : this->subprocessors) { + if (! subprocess->is_setup()) { + subprocess->setup(); + } + } + if ((this->parent_processor != nullptr) && ! this->parent_processor->is_setup()) { + this->parent_processor->setup(); + } } void Processor::start() { - lock_guard semaphore(this->api_mutex); + this->api_mutex.lock(); check_state("start", WAITING_START); this->current_state = WAITING_STOP; + this->api_mutex.unlock(); + for (auto subprocess : this->subprocessors) { + if (! subprocess->is_running()) { + subprocess->start(); + } + } + if ((this->parent_processor != nullptr) && ! this->parent_processor->is_running()) { + this->parent_processor->start(); + } } void Processor::stop() { - lock_guard semaphore(this->api_mutex); + this->api_mutex.lock(); check_state("stop", WAITING_STOP); this->current_state = FINISHED; + this->api_mutex.unlock(); + for (auto subprocess : this->subprocessors) { + if (! subprocess->is_finished()) { + subprocess->stop(); + } + } + if ((this->parent_processor != nullptr) && ! this->parent_processor->is_finished()) { + this->parent_processor->stop(); + } + this->subprocessors.clear(); + this->parent_processor = nullptr; } bool Processor::is_setup() { @@ -46,15 +90,21 @@ bool Processor::is_setup() { return (this->current_state > WAITING_SETUP); } +bool Processor::is_running() { + lock_guard semaphore(this->api_mutex); + return (this->current_state == WAITING_STOP); +} + bool Processor::is_finished() { lock_guard semaphore(this->api_mutex); return (this->current_state == FINISHED); } -string Processor::to_string() { return this->id; } +void Processor::add_subprocessor(shared_ptr other) {} -// ------------------------------------------------------------------------------------------------- -// Private methods +void Processor::set_parent(shared_ptr other) {} + +string Processor::to_string() { return this->id; } void Processor::check_state(const string& action, State state) { if (this->current_state != state) { @@ -74,6 +124,7 @@ void Processor::check_state(const string& action, State state) { } else { error_message += "Processor is in unexpected state: " + this->current_state; } + this->api_mutex.unlock(); Utils::error(error_message); } } diff --git a/src/commons/processor/Processor.h b/src/commons/processor/Processor.h index 798cc5ed..dd18c529 100644 --- a/src/commons/processor/Processor.h +++ b/src/commons/processor/Processor.h @@ -1,6 +1,8 @@ #pragma once +#include #include +#include using namespace std; @@ -15,10 +17,18 @@ class Processor { Processor(const string& id); ~Processor(); + + static void bind_subprocessor(shared_ptr root, shared_ptr child); + virtual void setup(); virtual void start(); virtual void stop(); + + virtual void add_subprocessor(shared_ptr other); + virtual void set_parent(shared_ptr other); + bool is_setup(); + bool is_running(); bool is_finished(); string to_string(); @@ -27,6 +37,8 @@ class Processor { State current_state; string id; + vector> subprocessors; + shared_ptr parent_processor; mutex api_mutex; }; diff --git a/src/tests/cpp/processor_test.cc b/src/tests/cpp/processor_test.cc index 35aa24dd..36662a25 100644 --- a/src/tests/cpp/processor_test.cc +++ b/src/tests/cpp/processor_test.cc @@ -11,35 +11,92 @@ TEST(ProcessorTest, basics) { EXPECT_EQ(p1.to_string(), "blah"); EXPECT_FALSE(p1.is_setup()); + EXPECT_FALSE(p1.is_running()); EXPECT_FALSE(p1.is_finished()); EXPECT_THROW(p1.start(), runtime_error); EXPECT_THROW(p1.stop(), runtime_error); EXPECT_FALSE(p1.is_setup()); + EXPECT_FALSE(p1.is_running()); EXPECT_FALSE(p1.is_finished()); p1.setup(); EXPECT_TRUE(p1.is_setup()); + EXPECT_FALSE(p1.is_running()); EXPECT_FALSE(p1.is_finished()); EXPECT_THROW(p1.setup(), runtime_error); EXPECT_THROW(p1.stop(), runtime_error); EXPECT_TRUE(p1.is_setup()); + EXPECT_FALSE(p1.is_running()); EXPECT_FALSE(p1.is_finished()); p1.start(); EXPECT_TRUE(p1.is_setup()); + EXPECT_TRUE(p1.is_running()); EXPECT_FALSE(p1.is_finished()); EXPECT_THROW(p1.setup(), runtime_error); EXPECT_THROW(p1.start(), runtime_error); EXPECT_TRUE(p1.is_setup()); + EXPECT_TRUE(p1.is_running()); EXPECT_FALSE(p1.is_finished()); p1.stop(); EXPECT_TRUE(p1.is_setup()); + EXPECT_FALSE(p1.is_running()); EXPECT_TRUE(p1.is_finished()); EXPECT_THROW(p1.setup(), runtime_error); EXPECT_THROW(p1.start(), runtime_error); EXPECT_THROW(p1.stop(), runtime_error); EXPECT_TRUE(p1.is_setup()); + EXPECT_FALSE(p1.is_running()); EXPECT_TRUE(p1.is_finished()); } + +TEST(ProcessorTest, subprocessors) { + shared_ptr a, b, c, d, e, f, g, h; + vector> nodes; + shared_ptr p; + for (unsigned int i = 0; i < 8; i++) { + a = make_shared("a"); + b = make_shared("b"); + c = make_shared("c"); + d = make_shared("d"); + e = make_shared("e"); + f = make_shared("f"); + g = make_shared("g"); + h = make_shared("h"); + nodes = {a, b, c, d, e, f, g, h}; + Processor::bind_subprocessor(a, c); // a // + Processor::bind_subprocessor(a, d); // / \ // + Processor::bind_subprocessor(d, e); // c d // + Processor::bind_subprocessor(d, f); // / | \ // + Processor::bind_subprocessor(d, g); // e f g // + Processor::bind_subprocessor(e, b); // | | // + Processor::bind_subprocessor(f, h); // b h // + + p = nodes[i % nodes.size()]; + for (auto cursor : nodes) { + EXPECT_FALSE(cursor->is_setup()); + EXPECT_FALSE(cursor->is_running()); + EXPECT_FALSE(cursor->is_finished()); + } + p->setup(); + for (auto cursor : nodes) { + EXPECT_TRUE(cursor->is_setup()); + EXPECT_FALSE(cursor->is_running()); + EXPECT_FALSE(cursor->is_finished()); + } + p->start(); + for (auto cursor : nodes) { + EXPECT_TRUE(cursor->is_setup()); + EXPECT_TRUE(cursor->is_running()); + EXPECT_FALSE(cursor->is_finished()); + } + p->stop(); + for (auto cursor : nodes) { + EXPECT_TRUE(cursor->is_setup()); + EXPECT_FALSE(cursor->is_running()); + EXPECT_TRUE(cursor->is_finished()); + } + } +} From a5b8035c2ae9eeefb559fff6fc9edc8b1314b960 Mon Sep 17 00:00:00 2001 From: Andre Senna <“andre.senna@gmail.com”> Date: Tue, 9 Dec 2025 09:49:59 -0300 Subject: [PATCH 2/2] Format fixes --- src/commons/processor/Processor.cc | 16 +++++++++------- src/tests/cpp/processor_test.cc | 14 +++++++------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/commons/processor/Processor.cc b/src/commons/processor/Processor.cc index d8c04417..14ab42fc 100644 --- a/src/commons/processor/Processor.cc +++ b/src/commons/processor/Processor.cc @@ -30,7 +30,9 @@ void Processor::bind_subprocessor(shared_ptr root, shared_ptrparent_processor != nullptr) { - Utils::error("Invalid attempt to bind processor " + child->to_string() + " to " + root->to_string() + ". It's already bound to " + child->parent_processor->to_string()); + Utils::error("Invalid attempt to bind processor " + child->to_string() + " to " + + root->to_string() + ". It's already bound to " + + child->parent_processor->to_string()); } root->subprocessors.push_back(child); child->parent_processor = root; @@ -44,11 +46,11 @@ void Processor::setup() { this->current_state = WAITING_START; this->api_mutex.unlock(); for (auto subprocess : this->subprocessors) { - if (! subprocess->is_setup()) { + if (!subprocess->is_setup()) { subprocess->setup(); } } - if ((this->parent_processor != nullptr) && ! this->parent_processor->is_setup()) { + if ((this->parent_processor != nullptr) && !this->parent_processor->is_setup()) { this->parent_processor->setup(); } } @@ -59,11 +61,11 @@ void Processor::start() { this->current_state = WAITING_STOP; this->api_mutex.unlock(); for (auto subprocess : this->subprocessors) { - if (! subprocess->is_running()) { + if (!subprocess->is_running()) { subprocess->start(); } } - if ((this->parent_processor != nullptr) && ! this->parent_processor->is_running()) { + if ((this->parent_processor != nullptr) && !this->parent_processor->is_running()) { this->parent_processor->start(); } } @@ -74,11 +76,11 @@ void Processor::stop() { this->current_state = FINISHED; this->api_mutex.unlock(); for (auto subprocess : this->subprocessors) { - if (! subprocess->is_finished()) { + if (!subprocess->is_finished()) { subprocess->stop(); } } - if ((this->parent_processor != nullptr) && ! this->parent_processor->is_finished()) { + if ((this->parent_processor != nullptr) && !this->parent_processor->is_finished()) { this->parent_processor->stop(); } this->subprocessors.clear(); diff --git a/src/tests/cpp/processor_test.cc b/src/tests/cpp/processor_test.cc index 36662a25..e126833a 100644 --- a/src/tests/cpp/processor_test.cc +++ b/src/tests/cpp/processor_test.cc @@ -66,13 +66,13 @@ TEST(ProcessorTest, subprocessors) { g = make_shared("g"); h = make_shared("h"); nodes = {a, b, c, d, e, f, g, h}; - Processor::bind_subprocessor(a, c); // a // - Processor::bind_subprocessor(a, d); // / \ // - Processor::bind_subprocessor(d, e); // c d // - Processor::bind_subprocessor(d, f); // / | \ // - Processor::bind_subprocessor(d, g); // e f g // - Processor::bind_subprocessor(e, b); // | | // - Processor::bind_subprocessor(f, h); // b h // + Processor::bind_subprocessor(a, c); // a // + Processor::bind_subprocessor(a, d); // / \ // + Processor::bind_subprocessor(d, e); // c d // + Processor::bind_subprocessor(d, f); // / | \ // + Processor::bind_subprocessor(d, g); // e f g // + Processor::bind_subprocessor(e, b); // | | // + Processor::bind_subprocessor(f, h); // b h // p = nodes[i % nodes.size()]; for (auto cursor : nodes) {