diff --git a/src/agents/atomdb_broker/AtomDBProcessor.cc b/src/agents/atomdb_broker/AtomDBProcessor.cc index 500cab23..de1c954e 100644 --- a/src/agents/atomdb_broker/AtomDBProcessor.cc +++ b/src/agents/atomdb_broker/AtomDBProcessor.cc @@ -26,7 +26,7 @@ AtomDBProcessor::~AtomDBProcessor() { LOG_INFO("Waiting for thread management to finish..."); this->thread_management.join(); LOG_INFO("Stopping all query threads..."); - lock_guard semaphore(this->query_threads_mutex); + lock_guard lock(this->query_threads_mutex); for (auto& pair : this->query_threads) { LOG_INFO("Stopping thread: " << pair.first); pair.second->stop(); @@ -39,8 +39,9 @@ AtomDBProcessor::~AtomDBProcessor() { shared_ptr AtomDBProcessor::factory_empty_proxy() { return make_shared(); } void AtomDBProcessor::run_command(shared_ptr proxy) { - lock_guard semaphore(this->query_threads_mutex); + lock_guard lock(this->query_threads_mutex); auto atomdb_proxy = dynamic_pointer_cast(proxy); + atomdb_proxy->init_server_side(); string thread_id = "thread<" + proxy->my_id() + "_" + std::to_string(proxy->get_serial()) + ">"; LOG_DEBUG("Starting new thread: " << thread_id << " to run command: <" << proxy->get_command() << ">"); @@ -74,19 +75,28 @@ void AtomDBProcessor::thread_process_one_query(shared_ptr monit LOG_DEBUG("Command finished: <" << command << ">"); LOG_INFO("Aborting command: <" << command << "> in thread: " << monitor->get_id()); - this->query_threads[monitor->get_id()]->stop(false); + proxy->shutdown_server_side(); + { + lock_guard lock(this->query_threads_mutex); + auto it = this->query_threads.find(monitor->get_id()); + if (it != this->query_threads.end()) { + it->second->stop(false); + } + } } void AtomDBProcessor::manage_finished_threads() { while (!this->stop_flag) { - for (auto it = this->query_threads.begin(); it != this->query_threads.end();) { - if (it->second->stopped()) { - lock_guard semaphore(this->query_threads_mutex); - LOG_DEBUG("Removing finished thread: " << it->first); - it->second->stop(); - it = this->query_threads.erase(it); - } else { - ++it; + { + lock_guard lock(this->query_threads_mutex); + for (auto it = this->query_threads.begin(); it != this->query_threads.end();) { + if (it->second->stopped()) { + LOG_DEBUG("Removing finished thread: " << it->first); + it->second->stop(); + it = this->query_threads.erase(it); + } else { + ++it; + } } } Utils::sleep(1000); diff --git a/src/agents/atomdb_broker/AtomDBProxy.cc b/src/agents/atomdb_broker/AtomDBProxy.cc index 91a15343..fcd20c42 100644 --- a/src/agents/atomdb_broker/AtomDBProxy.cc +++ b/src/agents/atomdb_broker/AtomDBProxy.cc @@ -35,28 +35,9 @@ string AtomDBProxy::END_STREAM = "end_stream"; // ------------------------------------------------------------------------------------------------- // Constructor and destructor -AtomDBProxy::AtomDBProxy() : BaseProxy() { - this->command = ServiceBus::ATOMDB; - this->atomdb = AtomDBSingleton::get_instance(); - this->is_processing_buffer = false; - this->processing_queue = new SharedQueue(AtomDBProxy::MAX_PENDING_ATOMS * 2); - this->thread_pool = new ThreadPool(AtomDBProxy::THREAD_POOL_SIZE); - this->processing_thread = thread(&AtomDBProxy::process_atom_batches, this); -} +AtomDBProxy::AtomDBProxy() : BaseProxy() { this->command = ServiceBus::ATOMDB; } -AtomDBProxy::~AtomDBProxy() { - LOG_DEBUG("Shuting down AtomDBProxy, waiting for pending atoms to be processed..."); - while (!this->processing_queue->empty()) { - Utils::sleep(100); - } - this->thread_pool->wait(); - this->stop_processing = true; - this->processing_thread.join(); - delete this->processing_queue; - delete this->thread_pool; - this->abort(); - LOG_DEBUG("AtomDBProxy shut down complete."); -} +AtomDBProxy::~AtomDBProxy() {} // ------------------------------------------------------------------------------------------------- // Client-side API @@ -131,6 +112,26 @@ void AtomDBProxy::delete_atoms(const vector& handles, bool delete_link_t // ------------------------------------------------------------------------------------------------- // Server-side API +void AtomDBProxy::init_server_side() { + this->atomdb = AtomDBSingleton::get_instance(); + this->is_processing_buffer = false; + this->processing_queue = make_shared(MAX_PENDING_ATOMS * 2); + this->thread_pool = make_shared(AtomDBProxy::THREAD_POOL_SIZE); + this->processing_thread = thread(&AtomDBProxy::process_atom_batches, this); +} + +void AtomDBProxy::shutdown_server_side() { + LOG_DEBUG("Shutting down AtomDBProxy, waiting for pending atoms to be processed..."); + while (!this->processing_queue->empty()) { + Utils::sleep(100); + } + this->thread_pool->wait(); + this->stop_processing = true; + this->processing_thread.join(); + this->abort(); + LOG_DEBUG("AtomDBProxy shut down complete."); +} + void AtomDBProxy::untokenize(vector& tokens) { BaseProxy::untokenize(tokens); } bool AtomDBProxy::from_remote_peer(const string& command, const vector& args) { @@ -190,10 +191,10 @@ void AtomDBProxy::delete_atoms_callback(const vector& args) { void AtomDBProxy::enqueue_request(const vector& tokens) { auto atoms = build_atoms_from_tokens(tokens, raw_ptr_atom_factory); + unique_lock lock(this->api_mutex); for (const auto& atom : atoms) { this->processing_queue->enqueue((void*) atom); } - unique_lock lock(this->api_mutex); this->pending_atoms_count += atoms.size(); } @@ -215,7 +216,7 @@ void AtomDBProxy::process_atom_batches() { } this->pending_atoms_count -= atoms.size(); lock.unlock(); - auto job = [this, atoms]() { + auto job = [this, atoms = std::move(atoms)]() { this->atomdb->add_atoms(atoms, false, true); for (auto& atom : atoms) { delete atom; diff --git a/src/agents/atomdb_broker/AtomDBProxy.h b/src/agents/atomdb_broker/AtomDBProxy.h index d6de053c..bb7a9847 100644 --- a/src/agents/atomdb_broker/AtomDBProxy.h +++ b/src/agents/atomdb_broker/AtomDBProxy.h @@ -93,7 +93,7 @@ class AtomDBProxy : public BaseProxy { * * @param handles Vector of handles of atoms to be deleted. * @param delete_link_targets Whether to delete link targets. - * @return The number of atoms deleted. + */ void delete_atoms(const vector& handles, bool delete_link_targets = false); @@ -114,6 +114,10 @@ class AtomDBProxy : public BaseProxy { */ virtual void untokenize(vector& tokens) override; + void init_server_side(); + + void shutdown_server_side(); + private: /** * @brief Callback to handle an incoming ADD_ATOMS command from a remote peer. @@ -170,10 +174,10 @@ class AtomDBProxy : public BaseProxy { static const size_t BATCH_SIZE; static const size_t COMMAND_SIZE_LIMIT; bool is_processing_buffer = false; - SharedQueue* processing_queue = nullptr; + shared_ptr processing_queue = nullptr; size_t pending_atoms_count = 0; thread processing_thread; - ThreadPool* thread_pool; + shared_ptr thread_pool; bool stop_processing = false; };