Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions src/agents/atomdb_broker/AtomDBProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ AtomDBProcessor::~AtomDBProcessor() {
LOG_INFO("Waiting for thread management to finish...");
this->thread_management.join();
LOG_INFO("Stopping all query threads...");
lock_guard<mutex> semaphore(this->query_threads_mutex);
lock_guard<mutex> lock(this->query_threads_mutex);
for (auto& pair : this->query_threads) {
LOG_INFO("Stopping thread: " << pair.first);
pair.second->stop();
Expand All @@ -39,8 +39,9 @@ AtomDBProcessor::~AtomDBProcessor() {
shared_ptr<BusCommandProxy> AtomDBProcessor::factory_empty_proxy() { return make_shared<AtomDBProxy>(); }

void AtomDBProcessor::run_command(shared_ptr<BusCommandProxy> proxy) {
lock_guard<mutex> semaphore(this->query_threads_mutex);
lock_guard<mutex> lock(this->query_threads_mutex);
auto atomdb_proxy = dynamic_pointer_cast<AtomDBProxy>(proxy);
atomdb_proxy->init_server_side();
string thread_id = "thread<" + proxy->my_id() + "_" + std::to_string(proxy->get_serial()) + ">";
LOG_DEBUG("Starting new thread: " << thread_id << " to run command: <" << proxy->get_command()
<< ">");
Expand Down Expand Up @@ -74,19 +75,28 @@ void AtomDBProcessor::thread_process_one_query(shared_ptr<StoppableThread> monit
LOG_DEBUG("Command finished: <" << command << ">");

LOG_INFO("Aborting command: <" << command << "> in thread: " << monitor->get_id());
this->query_threads[monitor->get_id()]->stop(false);
proxy->shutdown_server_side();
{
lock_guard<mutex> lock(this->query_threads_mutex);
auto it = this->query_threads.find(monitor->get_id());
if (it != this->query_threads.end()) {
it->second->stop(false);
}
}
}

void AtomDBProcessor::manage_finished_threads() {
while (!this->stop_flag) {
for (auto it = this->query_threads.begin(); it != this->query_threads.end();) {
if (it->second->stopped()) {
lock_guard<mutex> semaphore(this->query_threads_mutex);
LOG_DEBUG("Removing finished thread: " << it->first);
it->second->stop();
it = this->query_threads.erase(it);
} else {
++it;
{
lock_guard<mutex> lock(this->query_threads_mutex);
for (auto it = this->query_threads.begin(); it != this->query_threads.end();) {
if (it->second->stopped()) {
LOG_DEBUG("Removing finished thread: " << it->first);
it->second->stop();
it = this->query_threads.erase(it);
} else {
++it;
}
}
}
Utils::sleep(1000);
Expand Down
47 changes: 24 additions & 23 deletions src/agents/atomdb_broker/AtomDBProxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,9 @@ string AtomDBProxy::END_STREAM = "end_stream";
// -------------------------------------------------------------------------------------------------
// Constructor and destructor

AtomDBProxy::AtomDBProxy() : BaseProxy() {
this->command = ServiceBus::ATOMDB;
this->atomdb = AtomDBSingleton::get_instance();
this->is_processing_buffer = false;
this->processing_queue = new SharedQueue(AtomDBProxy::MAX_PENDING_ATOMS * 2);
this->thread_pool = new ThreadPool(AtomDBProxy::THREAD_POOL_SIZE);
this->processing_thread = thread(&AtomDBProxy::process_atom_batches, this);
}
AtomDBProxy::AtomDBProxy() : BaseProxy() { this->command = ServiceBus::ATOMDB; }

AtomDBProxy::~AtomDBProxy() {
LOG_DEBUG("Shuting down AtomDBProxy, waiting for pending atoms to be processed...");
while (!this->processing_queue->empty()) {
Utils::sleep(100);
}
this->thread_pool->wait();
this->stop_processing = true;
this->processing_thread.join();
delete this->processing_queue;
delete this->thread_pool;
this->abort();
LOG_DEBUG("AtomDBProxy shut down complete.");
}
AtomDBProxy::~AtomDBProxy() {}

// -------------------------------------------------------------------------------------------------
// Client-side API
Expand Down Expand Up @@ -131,6 +112,26 @@ void AtomDBProxy::delete_atoms(const vector<string>& handles, bool delete_link_t
// -------------------------------------------------------------------------------------------------
// Server-side API

void AtomDBProxy::init_server_side() {
this->atomdb = AtomDBSingleton::get_instance();
this->is_processing_buffer = false;
this->processing_queue = make_shared<SharedQueue>(MAX_PENDING_ATOMS * 2);
this->thread_pool = make_shared<ThreadPool>(AtomDBProxy::THREAD_POOL_SIZE);
this->processing_thread = thread(&AtomDBProxy::process_atom_batches, this);
}

void AtomDBProxy::shutdown_server_side() {
LOG_DEBUG("Shutting down AtomDBProxy, waiting for pending atoms to be processed...");
while (!this->processing_queue->empty()) {
Utils::sleep(100);
}
this->thread_pool->wait();
this->stop_processing = true;
this->processing_thread.join();
this->abort();
LOG_DEBUG("AtomDBProxy shut down complete.");
}

void AtomDBProxy::untokenize(vector<string>& tokens) { BaseProxy::untokenize(tokens); }

bool AtomDBProxy::from_remote_peer(const string& command, const vector<string>& args) {
Expand Down Expand Up @@ -190,10 +191,10 @@ void AtomDBProxy::delete_atoms_callback(const vector<string>& args) {

void AtomDBProxy::enqueue_request(const vector<string>& tokens) {
auto atoms = build_atoms_from_tokens<Atom*>(tokens, raw_ptr_atom_factory);
unique_lock<mutex> lock(this->api_mutex);
for (const auto& atom : atoms) {
this->processing_queue->enqueue((void*) atom);
}
unique_lock<mutex> lock(this->api_mutex);
this->pending_atoms_count += atoms.size();
}

Expand All @@ -215,7 +216,7 @@ void AtomDBProxy::process_atom_batches() {
}
this->pending_atoms_count -= atoms.size();
lock.unlock();
auto job = [this, atoms]() {
auto job = [this, atoms = std::move(atoms)]() {
this->atomdb->add_atoms(atoms, false, true);
for (auto& atom : atoms) {
delete atom;
Expand Down
10 changes: 7 additions & 3 deletions src/agents/atomdb_broker/AtomDBProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class AtomDBProxy : public BaseProxy {
*
* @param handles Vector of handles of atoms to be deleted.
* @param delete_link_targets Whether to delete link targets.
* @return The number of atoms deleted.

*/
void delete_atoms(const vector<string>& handles, bool delete_link_targets = false);

Expand All @@ -114,6 +114,10 @@ class AtomDBProxy : public BaseProxy {
*/
virtual void untokenize(vector<string>& tokens) override;

void init_server_side();

void shutdown_server_side();

private:
/**
* @brief Callback to handle an incoming ADD_ATOMS command from a remote peer.
Expand Down Expand Up @@ -170,10 +174,10 @@ class AtomDBProxy : public BaseProxy {
static const size_t BATCH_SIZE;
static const size_t COMMAND_SIZE_LIMIT;
bool is_processing_buffer = false;
SharedQueue* processing_queue = nullptr;
shared_ptr<SharedQueue> processing_queue = nullptr;
size_t pending_atoms_count = 0;
thread processing_thread;
ThreadPool* thread_pool;
shared_ptr<ThreadPool> thread_pool;
bool stop_processing = false;
};

Expand Down