Skip to content

Commit dff09ac

Browse files
WIP - add work_loop
1 parent 1b98e35 commit dff09ac

File tree

3 files changed

+156
-33
lines changed

3 files changed

+156
-33
lines changed

src/agents/atomdb_broker/AtomDBProcessor.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ void AtomDBProcessor::run_command(shared_ptr<BusCommandProxy> proxy) {
2727
lock_guard<mutex> semaphore(this->query_threads_mutex);
2828
auto atomdb_proxy = dynamic_pointer_cast<AtomDBProxy>(proxy);
2929
string thread_id = "thread<" + proxy->my_id() + "_" + std::to_string(proxy->get_serial()) + ">";
30-
LOG_INFO("Starting new thread: " << thread_id << " to run command: <" << proxy->get_command()
31-
<< ">");
30+
LOG_DEBUG("Starting new thread: " << thread_id << " to run command: <" << proxy->get_command()
31+
<< ">");
3232
if (this->query_threads.find(thread_id) != this->query_threads.end()) {
3333
Utils::error("Invalid thread id: " + thread_id);
3434
} else {
@@ -45,7 +45,7 @@ void AtomDBProcessor::thread_process_one_query(shared_ptr<StoppableThread> monit
4545
proxy->untokenize(proxy->args);
4646
string command = proxy->get_command();
4747
if (command == ServiceBus::ATOMDB) {
48-
LOG_INFO("Processing ATOMDB command...");
48+
LOG_DEBUG("Processing ATOMDB command...");
4949
while (!proxy->is_aborting()) {
5050
Utils::sleep();
5151
}

src/agents/atomdb_broker/AtomDBProxy.cc

Lines changed: 139 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include "AtomDBProxy.h"
22

3+
#include <numeric>
4+
35
#include "AtomDBSingleton.h"
46
#include "BaseProxy.h"
57
#include "Link.h"
@@ -17,22 +19,41 @@ using namespace commons;
1719

1820
// -------------------------------------------------------------------------------------------------
1921
// Static constants
22+
queue<vector<Atom*>> ready_batches;
2023

21-
const size_t AtomDBProxy::BATCH_SIZE = 5000;
24+
const size_t AtomDBProxy::BATCH_SIZE = 100000;
25+
const size_t AtomDBProxy::NUM_THREADS = 10;
2226

2327
// Proxy Commands
2428
string AtomDBProxy::ADD_ATOMS = "add_atoms";
29+
string AtomDBProxy::FLUSH_ATOMS = "flush_atoms";
2530

2631
// -------------------------------------------------------------------------------------------------
2732
// Constructor and destructor
2833

2934
AtomDBProxy::AtomDBProxy() : BaseProxy() {
3035
this->command = ServiceBus::ATOMDB;
3136
this->atomdb = AtomDBSingleton::get_instance();
37+
38+
for (int i = 0; i < NUM_THREADS; ++i) {
39+
this->workers.emplace_back(&AtomDBProxy::worker_loop, this);
40+
}
3241
}
3342

3443
AtomDBProxy::~AtomDBProxy() {
3544
LOG_INFO("Shutdown AtomDBProxy...");
45+
flush_atoms();
46+
{
47+
unique_lock<mutex> lock(this->queue_mutex);
48+
this->stop_processing = true;
49+
}
50+
this->queue_condition.notify_all();
51+
52+
for (thread& worker : this->workers) {
53+
if (worker.joinable()) {
54+
worker.join();
55+
}
56+
}
3657
this->abort();
3758
}
3859

@@ -85,41 +106,15 @@ bool AtomDBProxy::from_remote_peer(const string& command, const vector<string>&
85106
} else if (command == AtomDBProxy::ADD_ATOMS) {
86107
handle_add_atoms(args);
87108
return true;
109+
} else if (command == AtomDBProxy::FLUSH_ATOMS) {
110+
flush_atoms();
111+
return true;
88112
} else {
89113
Utils::error("Invalid AtomDBProxy command: <" + command + ">");
90114
return false;
91115
}
92116
}
93117

94-
void AtomDBProxy::handle_add_atoms(const vector<string>& tokens) {
95-
vector<Atom*> atoms;
96-
try {
97-
atoms = build_atoms_from_tokens(tokens);
98-
LOG_INFO("Processing " << atoms.size() << " atoms...");
99-
100-
if (atoms.empty()) {
101-
LOG_INFO("No atoms were built from tokens. Nothing to process.");
102-
return;
103-
}
104-
105-
this->atomdb->add_atoms(atoms, false, true);
106-
107-
LOG_DEBUG("Cleaning up " << atoms.size() << " atom pointers after successful processing.");
108-
for (Atom* atom : atoms) {
109-
delete atom;
110-
}
111-
atoms.clear();
112-
113-
LOG_INFO("Finished processing all atoms.");
114-
115-
} catch (const exception& e) {
116-
LOG_ERROR("Error processing atoms: " << e.what());
117-
for (Atom* atom : atoms) {
118-
delete atom;
119-
}
120-
}
121-
}
122-
123118
vector<Atom*> AtomDBProxy::build_atoms_from_tokens(const vector<string>& tokens) {
124119
vector<Atom*> atoms;
125120
string current;
@@ -148,3 +143,117 @@ vector<Atom*> AtomDBProxy::build_atoms_from_tokens(const vector<string>& tokens)
148143

149144
return atoms;
150145
}
146+
147+
// void AtomDBProxy::handle_add_atoms(const vector<string>& tokens) {
148+
// vector<Atom*> atoms;
149+
// try {
150+
// atoms = build_atoms_from_tokens(tokens);
151+
// LOG_INFO("Processing " << atoms.size() << " atoms...");
152+
153+
// if (atoms.empty()) {
154+
// LOG_INFO("No atoms were built from tokens. Nothing to process.");
155+
// return;
156+
// }
157+
158+
// this->atomdb->add_atoms(atoms, false, true);
159+
160+
// LOG_DEBUG("Cleaning up " << atoms.size() << " atom pointers after successful processing.");
161+
// for (Atom* atom : atoms) {
162+
// delete atom;
163+
// }
164+
// atoms.clear();
165+
166+
// LOG_INFO("Finished processing all atoms.");
167+
168+
// } catch (const exception& e) {
169+
// LOG_ERROR("Error processing atoms: " << e.what());
170+
// for (Atom* atom : atoms) {
171+
// delete atom;
172+
// }
173+
// }
174+
// }
175+
176+
void AtomDBProxy::handle_add_atoms(const vector<string>& tokens) {
177+
vector<Atom*> atoms = build_atoms_from_tokens(tokens);
178+
LOG_INFO("Received " << atoms.size() << " atoms from peer " << this->peer_id());
179+
add_work(move(atoms));
180+
}
181+
182+
void AtomDBProxy::flush_atoms() {
183+
unique_lock<mutex> lock(this->queue_mutex);
184+
185+
if (this->work_queue.empty()) {
186+
LOG_INFO("[Flush] Received flush command, but accumulator is empty. Nothing to do.");
187+
return;
188+
}
189+
190+
LOG_INFO("[Flush] Received flush command. Flushing " << this->work_queue.size()
191+
<< " remaining batches from accumulator.");
192+
193+
size_t total_remaining = accumulate(
194+
this->work_queue.begin(), this->work_queue.end(), size_t{0}, [](size_t sum, const auto& batch) {
195+
return sum + batch.size();
196+
});
197+
198+
vector<Atom*> final_batch;
199+
final_batch.reserve(total_remaining);
200+
for (auto& batch : this->work_queue) {
201+
final_batch.insert(
202+
final_batch.end(), make_move_iterator(batch.begin()), make_move_iterator(batch.end()));
203+
}
204+
205+
this->work_queue.clear();
206+
ready_batches.push(move(final_batch));
207+
208+
this->queue_condition.notify_one();
209+
}
210+
211+
void AtomDBProxy::add_work(vector<Atom*> atoms) {
212+
if (atoms.empty()) return;
213+
unique_lock<mutex> lock(this->queue_mutex);
214+
this->work_queue.push_back(move(atoms));
215+
216+
size_t total_in_queue = accumulate(
217+
this->work_queue.begin(), this->work_queue.end(), size_t{0}, [](size_t sum, const auto& batch) {
218+
return sum + batch.size();
219+
});
220+
221+
if (total_in_queue < BATCH_SIZE) return;
222+
223+
LOG_DEBUG("[Accumulator] Batch target reached. Total: " << total_in_queue
224+
<< ". Creating super-batch.");
225+
226+
vector<Atom*> final_batch;
227+
final_batch.reserve(total_in_queue);
228+
229+
for (auto& batch : this->work_queue) {
230+
final_batch.insert(
231+
final_batch.end(), make_move_iterator(batch.begin()), make_move_iterator(batch.end()));
232+
}
233+
234+
this->work_queue.clear();
235+
ready_batches.push(move(final_batch));
236+
this->queue_condition.notify_one();
237+
}
238+
239+
void AtomDBProxy::worker_loop() {
240+
while (true) {
241+
vector<Atom*> batch_to_process;
242+
{
243+
unique_lock<mutex> lock(this->queue_mutex);
244+
this->queue_condition.wait(
245+
lock, [this] { return this->stop_processing || !ready_batches.empty(); });
246+
if (this->stop_processing && ready_batches.empty()) return;
247+
batch_to_process = move(ready_batches.front());
248+
ready_batches.pop();
249+
}
250+
LOG_INFO("[Thread " << this_thread::get_id() << "] Processing batch with "
251+
<< batch_to_process.size() << " atoms.");
252+
try {
253+
this->atomdb->add_atoms(batch_to_process, false, true);
254+
LOG_INFO("[Thread " << this_thread::get_id() << "] batch processed successfully.");
255+
} catch (const exception& e) {
256+
LOG_ERROR("Error processing batch: " << e.what());
257+
}
258+
}
259+
}

src/agents/atomdb_broker/AtomDBProxy.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
#pragma once
22

3+
#include <condition_variable>
34
#include <memory>
45
#include <mutex>
56
#include <string>
7+
#include <thread>
68
#include <vector>
79

810
#include "Atom.h"
@@ -28,6 +30,7 @@ class AtomDBProxy : public BaseProxy {
2830
// ---------------------------------------------------------------------------------------------
2931
// Proxy Commands
3032
static string ADD_ATOMS;
33+
static string FLUSH_ATOMS;
3134

3235
// ---------------------------------------------------------------------------------------------
3336
// Constructor and destructor
@@ -114,11 +117,22 @@ class AtomDBProxy : public BaseProxy {
114117
* reported back to the peer.
115118
*/
116119
void handle_add_atoms(const vector<string>& args);
120+
void flush_atoms();
121+
void add_work(vector<Atom*> atoms);
122+
void worker_loop();
117123

118124
mutex api_mutex;
119125
shared_ptr<AtomDB> atomdb;
120126

121127
static const size_t BATCH_SIZE;
128+
static const size_t NUM_THREADS;
129+
130+
vector<thread> workers;
131+
bool stop_processing = false;
132+
133+
vector<vector<Atom*>> work_queue;
134+
mutex queue_mutex;
135+
condition_variable queue_condition;
122136
};
123137

124138
} // namespace atomdb_broker

0 commit comments

Comments
 (0)