Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 35 additions & 13 deletions src/agents/atomdb_broker/AtomDBProxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ string AtomDBProxy::LINK = "LINK";

// Proxy Commands
string AtomDBProxy::ADD_ATOMS = "add_atoms";
string AtomDBProxy::DELETE_ATOMS = "delete_atoms";
string AtomDBProxy::START_STREAM = "start_stream";
string AtomDBProxy::END_STREAM = "end_stream";

Expand Down Expand Up @@ -87,7 +88,7 @@ vector<string> AtomDBProxy::add_atoms(const vector<Atom*>& atoms, bool use_strea
// should be re-enabled by default.

if (use_streaming) {
set_stream(AtomDBProxy::START_STREAM, stream_info);
this->to_remote_peer(AtomDBProxy::START_STREAM, stream_info);
}

for (Atom* atom : atoms) {
Expand All @@ -104,20 +105,29 @@ vector<string> AtomDBProxy::add_atoms(const vector<Atom*>& atoms, bool use_strea
args.insert(args.begin(), atom_type);
handles.push_back(atom->handle());
if ((args.size() > AtomDBProxy::COMMAND_SIZE_LIMIT) || (atom == atoms.back())) {
Utils::retry_function([&]() { this->to_remote_peer(AtomDBProxy::ADD_ATOMS, args); },
5,
2000,
"AtomDBProxy::add_atoms");
this->to_remote_peer(AtomDBProxy::ADD_ATOMS, args);
args.clear();
}
}
if (use_streaming) {
set_stream(AtomDBProxy::END_STREAM, {});
this->to_remote_peer(AtomDBProxy::END_STREAM, {});
}

return handles;
}

void AtomDBProxy::delete_atoms(const vector<string>& handles, bool delete_link_targets) {
vector<string> args;
size_t chunk_size = AtomDBProxy::COMMAND_SIZE_LIMIT;
for (size_t i = 0; i < handles.size(); i += chunk_size) {
args.clear();
size_t end = std::min(i + chunk_size, handles.size());
args.insert(args.end(), handles.begin() + i, handles.begin() + end);
args.push_back(delete_link_targets ? "1" : "0");
this->to_remote_peer(AtomDBProxy::DELETE_ATOMS, args);
}
}

// -------------------------------------------------------------------------------------------------
// Server-side API

Expand All @@ -130,7 +140,7 @@ bool AtomDBProxy::from_remote_peer(const string& command, const vector<string>&
if (this->is_processing_buffer) {
this->enqueue_request(args);
} else {
this->handle_add_atoms(args);
this->add_atoms_callback(args);
}
return true;
} else if (command == AtomDBProxy::START_STREAM) {
Expand All @@ -141,13 +151,16 @@ bool AtomDBProxy::from_remote_peer(const string& command, const vector<string>&
LOG_INFO("Ending atom stream.");
this->is_processing_buffer = false;
return true;
} else if (command == AtomDBProxy::DELETE_ATOMS) {
this->delete_atoms_callback(args);
return true;
} else {
Utils::error("Invalid AtomDBProxy command: <" + command + ">");
return false;
}
}

void AtomDBProxy::handle_add_atoms(const vector<string>& tokens) {
void AtomDBProxy::add_atoms_callback(const vector<string>& tokens) {
try {
vector<shared_ptr<Atom>> atoms =
build_atoms_from_tokens<shared_ptr<Atom>>(tokens, shared_ptr_atom_factory);
Expand All @@ -161,6 +174,20 @@ void AtomDBProxy::handle_add_atoms(const vector<string>& tokens) {
}
}

void AtomDBProxy::delete_atoms_callback(const vector<string>& args) {
try {
if (args.size() < 1) {
Utils::error("No handles provided for deletion");
}
vector<string> handles(args.begin(), args.end() - 1);
bool delete_link_targets = args.back() == "1";
uint deleted_count = this->atomdb->delete_atoms(handles, delete_link_targets);
LOG_INFO("Deleted " << deleted_count << " atoms");
} catch (const exception& e) {
LOG_ERROR("Error processing delete_atoms command: " << e.what());
}
}

void AtomDBProxy::enqueue_request(const vector<string>& tokens) {
auto atoms = build_atoms_from_tokens<Atom*>(tokens, raw_ptr_atom_factory);
for (const auto& atom : atoms) {
Expand Down Expand Up @@ -198,9 +225,4 @@ void AtomDBProxy::process_atom_batches() {
}
Utils::sleep();
}
}

void AtomDBProxy::set_stream(const string& command, const vector<string>& args) {
Utils::retry_function(
[&]() { this->to_remote_peer(command, args); }, 5, 2000, "AtomDBProxy::end_stream");
}
37 changes: 29 additions & 8 deletions src/agents/atomdb_broker/AtomDBProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class AtomDBProxy : public BaseProxy {
static int THREAD_POOL_SIZE;
static size_t MAX_PENDING_ATOMS;
static string ADD_ATOMS;
static string DELETE_ATOMS;
static string START_STREAM;
static string END_STREAM;
static string NODE;
Expand Down Expand Up @@ -76,8 +77,25 @@ class AtomDBProxy : public BaseProxy {
* @return Vector of handles corresponding to each atom.
*/
vector<string> add_atoms(const vector<Atom*>& atoms, bool use_streaming = false);

/**
* @brief Send atoms to the remote peer and return their local handles.
*
* This serializes each atom into the RPC arguments and issues an
* ADD_ATOMS command to the remote peer. The returned vector contains the
* handle of each atom in the same order as the input.
*
* @param tokens Vector of tokenized atoms to be sent.
* @return Vector of handles corresponding to each atom.
*/
vector<string> add_atoms(const vector<string>& tokens, bool use_streaming = false);
/**
* @brief Send a DELETE_ATOMS command to the remote peer.
*
* @param handles Vector of handles of atoms to be deleted.
* @param delete_link_targets Whether to delete link targets.
* @return The number of atoms deleted.
*/
void delete_atoms(const vector<string>& handles, bool delete_link_targets = false);

// ---------------------------------------------------------------------------------------------
// server-side API
Expand All @@ -98,14 +116,18 @@ class AtomDBProxy : public BaseProxy {

private:
/**
* @brief Handle an incoming ADD_ATOMS command from a remote peer.
*
* This will build Atom objects from the tokens and apply them to the
* local AtomDB instance. Any exception during processing will be
* reported back to the peer.
* @brief Callback to handle an incoming ADD_ATOMS command from a remote peer.
* @param args Command arguments (tokenized atoms)
*/

void add_atoms_callback(const vector<string>& args);
/**
* @brief Callback to handle an incoming DELETE_ATOMS command from a remote peer.
* @param args Command arguments (tokenized atoms), the last argument is delete_link_targets
* flag with "1" or "0".
*/
void delete_atoms_callback(const vector<string>& args);

void handle_add_atoms(const vector<string>& args);
template <typename AtomDataType, typename Factory>
std::vector<AtomDataType> build_atoms_from_tokens(const vector<string>& tokens, Factory&& factory) {
std::vector<AtomDataType> atoms;
Expand Down Expand Up @@ -141,7 +163,6 @@ class AtomDBProxy : public BaseProxy {
}
void enqueue_request(const vector<string>& tokens);
void process_atom_batches();
void set_stream(const string& command, const vector<string>& args);

mutex api_mutex;
shared_ptr<AtomDB> atomdb;
Expand Down
5 changes: 4 additions & 1 deletion src/service_bus/BusCommandProxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ void BusCommandProxy::setup_proxy_node(const string& client_id, const string& se

void BusCommandProxy::to_remote_peer(const string& command, const vector<string>& args) {
LOG_DEBUG(this->proxy_node->node_id() << " is issuing proxy command <" << command << ">");
this->proxy_node->to_remote_peer(command, args);
Utils::retry_function([&]() { this->proxy_node->to_remote_peer(command, args); },
6,
500,
"BusCommandProxy::to_remote_peer");
}

bool BusCommandProxy::from_remote_peer(const string& command, const vector<string>& args) {
Expand Down
Loading