diff --git a/src/agents/atomdb_broker/AtomDBProxy.cc b/src/agents/atomdb_broker/AtomDBProxy.cc index 36ce9f55..91a15343 100644 --- a/src/agents/atomdb_broker/AtomDBProxy.cc +++ b/src/agents/atomdb_broker/AtomDBProxy.cc @@ -28,6 +28,7 @@ string AtomDBProxy::LINK = "LINK"; // Proxy Commands string AtomDBProxy::ADD_ATOMS = "add_atoms"; +string AtomDBProxy::DELETE_ATOMS = "delete_atoms"; string AtomDBProxy::START_STREAM = "start_stream"; string AtomDBProxy::END_STREAM = "end_stream"; @@ -87,7 +88,7 @@ vector AtomDBProxy::add_atoms(const vector& atoms, bool use_strea // should be re-enabled by default. if (use_streaming) { - set_stream(AtomDBProxy::START_STREAM, stream_info); + this->to_remote_peer(AtomDBProxy::START_STREAM, stream_info); } for (Atom* atom : atoms) { @@ -104,20 +105,29 @@ vector AtomDBProxy::add_atoms(const vector& atoms, bool use_strea args.insert(args.begin(), atom_type); handles.push_back(atom->handle()); if ((args.size() > AtomDBProxy::COMMAND_SIZE_LIMIT) || (atom == atoms.back())) { - Utils::retry_function([&]() { this->to_remote_peer(AtomDBProxy::ADD_ATOMS, args); }, - 5, - 2000, - "AtomDBProxy::add_atoms"); + this->to_remote_peer(AtomDBProxy::ADD_ATOMS, args); args.clear(); } } if (use_streaming) { - set_stream(AtomDBProxy::END_STREAM, {}); + this->to_remote_peer(AtomDBProxy::END_STREAM, {}); } return handles; } +void AtomDBProxy::delete_atoms(const vector& handles, bool delete_link_targets) { + vector args; + size_t chunk_size = AtomDBProxy::COMMAND_SIZE_LIMIT; + for (size_t i = 0; i < handles.size(); i += chunk_size) { + args.clear(); + size_t end = std::min(i + chunk_size, handles.size()); + args.insert(args.end(), handles.begin() + i, handles.begin() + end); + args.push_back(delete_link_targets ? "1" : "0"); + this->to_remote_peer(AtomDBProxy::DELETE_ATOMS, args); + } +} + // ------------------------------------------------------------------------------------------------- // Server-side API @@ -130,7 +140,7 @@ bool AtomDBProxy::from_remote_peer(const string& command, const vector& if (this->is_processing_buffer) { this->enqueue_request(args); } else { - this->handle_add_atoms(args); + this->add_atoms_callback(args); } return true; } else if (command == AtomDBProxy::START_STREAM) { @@ -141,13 +151,16 @@ bool AtomDBProxy::from_remote_peer(const string& command, const vector& LOG_INFO("Ending atom stream."); this->is_processing_buffer = false; return true; + } else if (command == AtomDBProxy::DELETE_ATOMS) { + this->delete_atoms_callback(args); + return true; } else { Utils::error("Invalid AtomDBProxy command: <" + command + ">"); return false; } } -void AtomDBProxy::handle_add_atoms(const vector& tokens) { +void AtomDBProxy::add_atoms_callback(const vector& tokens) { try { vector> atoms = build_atoms_from_tokens>(tokens, shared_ptr_atom_factory); @@ -161,6 +174,20 @@ void AtomDBProxy::handle_add_atoms(const vector& tokens) { } } +void AtomDBProxy::delete_atoms_callback(const vector& args) { + try { + if (args.size() < 1) { + Utils::error("No handles provided for deletion"); + } + vector handles(args.begin(), args.end() - 1); + bool delete_link_targets = args.back() == "1"; + uint deleted_count = this->atomdb->delete_atoms(handles, delete_link_targets); + LOG_INFO("Deleted " << deleted_count << " atoms"); + } catch (const exception& e) { + LOG_ERROR("Error processing delete_atoms command: " << e.what()); + } +} + void AtomDBProxy::enqueue_request(const vector& tokens) { auto atoms = build_atoms_from_tokens(tokens, raw_ptr_atom_factory); for (const auto& atom : atoms) { @@ -198,9 +225,4 @@ void AtomDBProxy::process_atom_batches() { } Utils::sleep(); } -} - -void AtomDBProxy::set_stream(const string& command, const vector& args) { - Utils::retry_function( - [&]() { this->to_remote_peer(command, args); }, 5, 2000, "AtomDBProxy::end_stream"); } \ No newline at end of file diff --git a/src/agents/atomdb_broker/AtomDBProxy.h b/src/agents/atomdb_broker/AtomDBProxy.h index 5d0d67ed..d6de053c 100644 --- a/src/agents/atomdb_broker/AtomDBProxy.h +++ b/src/agents/atomdb_broker/AtomDBProxy.h @@ -32,6 +32,7 @@ class AtomDBProxy : public BaseProxy { static int THREAD_POOL_SIZE; static size_t MAX_PENDING_ATOMS; static string ADD_ATOMS; + static string DELETE_ATOMS; static string START_STREAM; static string END_STREAM; static string NODE; @@ -76,8 +77,25 @@ class AtomDBProxy : public BaseProxy { * @return Vector of handles corresponding to each atom. */ vector add_atoms(const vector& atoms, bool use_streaming = false); - + /** + * @brief Send atoms to the remote peer and return their local handles. + * + * This serializes each atom into the RPC arguments and issues an + * ADD_ATOMS command to the remote peer. The returned vector contains the + * handle of each atom in the same order as the input. + * + * @param tokens Vector of tokenized atoms to be sent. + * @return Vector of handles corresponding to each atom. + */ vector add_atoms(const vector& tokens, bool use_streaming = false); + /** + * @brief Send a DELETE_ATOMS command to the remote peer. + * + * @param handles Vector of handles of atoms to be deleted. + * @param delete_link_targets Whether to delete link targets. + * @return The number of atoms deleted. + */ + void delete_atoms(const vector& handles, bool delete_link_targets = false); // --------------------------------------------------------------------------------------------- // server-side API @@ -98,14 +116,18 @@ class AtomDBProxy : public BaseProxy { private: /** - * @brief Handle an incoming ADD_ATOMS command from a remote peer. - * - * This will build Atom objects from the tokens and apply them to the - * local AtomDB instance. Any exception during processing will be - * reported back to the peer. + * @brief Callback to handle an incoming ADD_ATOMS command from a remote peer. + * @param args Command arguments (tokenized atoms) + */ + + void add_atoms_callback(const vector& args); + /** + * @brief Callback to handle an incoming DELETE_ATOMS command from a remote peer. + * @param args Command arguments (tokenized atoms), the last argument is delete_link_targets + * flag with "1" or "0". */ + void delete_atoms_callback(const vector& args); - void handle_add_atoms(const vector& args); template std::vector build_atoms_from_tokens(const vector& tokens, Factory&& factory) { std::vector atoms; @@ -141,7 +163,6 @@ class AtomDBProxy : public BaseProxy { } void enqueue_request(const vector& tokens); void process_atom_batches(); - void set_stream(const string& command, const vector& args); mutex api_mutex; shared_ptr atomdb; diff --git a/src/service_bus/BusCommandProxy.cc b/src/service_bus/BusCommandProxy.cc index a82b83fe..d384435c 100644 --- a/src/service_bus/BusCommandProxy.cc +++ b/src/service_bus/BusCommandProxy.cc @@ -77,7 +77,10 @@ void BusCommandProxy::setup_proxy_node(const string& client_id, const string& se void BusCommandProxy::to_remote_peer(const string& command, const vector& args) { LOG_DEBUG(this->proxy_node->node_id() << " is issuing proxy command <" << command << ">"); - this->proxy_node->to_remote_peer(command, args); + Utils::retry_function([&]() { this->proxy_node->to_remote_peer(command, args); }, + 6, + 500, + "BusCommandProxy::to_remote_peer"); } bool BusCommandProxy::from_remote_peer(const string& command, const vector& args) { diff --git a/src/tests/cpp/atomdb_broker_test.cc b/src/tests/cpp/atomdb_broker_test.cc index da35a1e2..29e98f7a 100644 --- a/src/tests/cpp/atomdb_broker_test.cc +++ b/src/tests/cpp/atomdb_broker_test.cc @@ -29,6 +29,7 @@ class AtomDBTestEnvironment : public ::testing::Environment { void SetUp() override { TestConfig::load_environment(); AtomDBSingleton::init(); + ServiceBusSingleton::init("0.0.0.0:52001", "", 52003, 52999); } void TearDown() override {} @@ -36,13 +37,58 @@ class AtomDBTestEnvironment : public ::testing::Environment { class AtomDBTest : public ::testing::Test { protected: + string query_agent_id; + string atomdb_broker_server_id; + string atomdb_broker_client_id; + vector added_atom_handles; + shared_ptr client_bus = nullptr; + shared_ptr query_processor = nullptr; + shared_ptr atomdb_processor = nullptr; + vector created_atoms; + void SetUp() override { auto atomdb = AtomDBSingleton::get_instance(); db = dynamic_pointer_cast(atomdb); ASSERT_NE(db, nullptr) << "Failed to cast AtomDB to RedisMongoDB"; + query_agent_id = "0.0.0.0:52000"; + atomdb_broker_server_id = "0.0.0.0:52001"; + atomdb_broker_client_id = "0.0.0.0:52002"; + shared_ptr service_bus = ServiceBusSingleton::get_instance(); + if (query_processor == nullptr) { + query_processor = make_shared(); + service_bus->register_processor(query_processor); + } + if (atomdb_processor == nullptr) { + atomdb_processor = make_shared(); + service_bus->register_processor(atomdb_processor); + } + if (client_bus == nullptr) { + client_bus = make_shared(atomdb_broker_client_id, atomdb_broker_server_id); + } + } + vector build_atoms(vector& tokens) { + vector atoms; + for (auto& token : tokens) { + if (token.find("NODE") == 0) { + auto parts = Utils::split(token, ' '); + parts.erase(parts.begin()); + atoms.push_back(new Node(parts)); + } else if (token.find("LINK") == 0) { + auto parts = Utils::split(token, ' '); + parts.erase(parts.begin()); + atoms.push_back(new Link(parts)); + } + } + created_atoms.insert(created_atoms.end(), atoms.begin(), atoms.end()); + return atoms; } - void TearDown() override {} + void TearDown() override { + for (auto atom : created_atoms) { + delete atom; + } + created_atoms.clear(); + } string timestamp() { auto now = std::chrono::system_clock::now(); @@ -54,29 +100,58 @@ class AtomDBTest : public ::testing::Test { }; TEST_F(AtomDBTest, AddAtoms) { - string query_agent_id = "0.0.0.0:52000"; - string atomdb_broker_server_id = "0.0.0.0:52001"; - string atomdb_broker_client_id = "0.0.0.0:52002"; + auto proxy = make_shared(); + + client_bus->issue_bus_command(proxy); + + string name1 = timestamp(); + string name2 = timestamp(); + string name3 = timestamp(); + + string node1 = Hasher::node_handle("Symbol", name1); + string node2 = Hasher::node_handle("Symbol", name2); + string node3 = Hasher::node_handle("Symbol", name3); + + vector tokens = {"NODE Symbol false 3 is_literal bool false " + name1, + "NODE Symbol false 3 is_literal bool true " + name2, + "NODE Symbol false 3 is_literal bool true " + name3, + "LINK Expression true 0 3 " + node1 + " " + node2 + " " + node3}; + + auto link = new Link("Expression", {node1, node2, node3}, true); + string link_handle = link->handle(); + + auto atoms = build_atoms(tokens); + + EXPECT_TRUE(atoms[0]->handle() == node1); + EXPECT_TRUE(atoms[1]->handle() == node2); + EXPECT_TRUE(atoms[2]->handle() == node3); + EXPECT_TRUE(atoms[3]->handle() == link_handle); - ServiceBusSingleton::init(query_agent_id, atomdb_broker_server_id, 52003, 52999); + EXPECT_FALSE(db->node_exists(node1)); + EXPECT_FALSE(db->node_exists(node2)); + EXPECT_FALSE(db->node_exists(node3)); + EXPECT_FALSE(db->link_exists(link_handle)); - shared_ptr service_bus = ServiceBusSingleton::get_instance(); - service_bus->register_processor(make_shared()); - Utils::sleep(500); + vector handles = proxy->add_atoms(atoms); + added_atom_handles.insert(added_atom_handles.end(), handles.begin(), handles.end()); - shared_ptr atomdb_broker_server_bus = - make_shared(atomdb_broker_server_id, query_agent_id); - Utils::sleep(500); - atomdb_broker_server_bus->register_processor(make_shared()); - Utils::sleep(500); + EXPECT_TRUE(handles[0] == node1); + EXPECT_TRUE(handles[1] == node2); + EXPECT_TRUE(handles[2] == node3); + EXPECT_TRUE(handles[3] == link_handle); - shared_ptr atomdb_broker_client_bus = - make_shared(atomdb_broker_client_id, atomdb_broker_server_id); - Utils::sleep(500); + Utils::sleep(2000); + + EXPECT_TRUE(db->node_exists(node1)); + EXPECT_TRUE(db->node_exists(node2)); + EXPECT_TRUE(db->node_exists(node3)); + EXPECT_TRUE(db->link_exists(link_handle)); +} +TEST_F(AtomDBTest, AddAtomsStreaming) { auto proxy = make_shared(); - atomdb_broker_client_bus->issue_bus_command(proxy); + client_bus->issue_bus_command(proxy); string name1 = timestamp(); string name2 = timestamp(); @@ -94,18 +169,7 @@ TEST_F(AtomDBTest, AddAtoms) { auto link = new Link("Expression", {node1, node2, node3}, true); string link_handle = link->handle(); - vector atoms; - for (auto& token : tokens) { - if (token.find("NODE") == 0) { - auto parts = Utils::split(token, ' '); - parts.erase(parts.begin()); - atoms.push_back(new Node(parts)); - } else if (token.find("LINK") == 0) { - auto parts = Utils::split(token, ' '); - parts.erase(parts.begin()); - atoms.push_back(new Link(parts)); - } - } + auto atoms = build_atoms(tokens); EXPECT_TRUE(atoms[0]->handle() == node1); EXPECT_TRUE(atoms[1]->handle() == node2); @@ -117,7 +181,8 @@ TEST_F(AtomDBTest, AddAtoms) { EXPECT_FALSE(db->node_exists(node3)); EXPECT_FALSE(db->link_exists(link_handle)); - vector handles = proxy->add_atoms(atoms); + vector handles = proxy->add_atoms(atoms, true); + added_atom_handles.insert(added_atom_handles.end(), handles.begin(), handles.end()); EXPECT_TRUE(handles[0] == node1); EXPECT_TRUE(handles[1] == node2); @@ -132,28 +197,24 @@ TEST_F(AtomDBTest, AddAtoms) { EXPECT_TRUE(db->link_exists(link_handle)); } -TEST_F(AtomDBTest, AddAtomsStreaming) { - string query_agent_id = "0.0.0.0:52000"; - string atomdb_broker_server_id = "0.0.0.0:52001"; - string atomdb_broker_client_id = "0.0.0.0:52002"; +TEST_F(AtomDBTest, DeleteAtoms) { + auto proxy = make_shared(); - shared_ptr service_bus = ServiceBusSingleton::get_instance(); - service_bus->register_processor(make_shared()); - Utils::sleep(500); + this->client_bus->issue_bus_command(proxy); - shared_ptr atomdb_broker_server_bus = - make_shared(atomdb_broker_server_id, query_agent_id); - Utils::sleep(500); - atomdb_broker_server_bus->register_processor(make_shared()); - Utils::sleep(500); + proxy->delete_atoms(added_atom_handles, false); - shared_ptr atomdb_broker_client_bus = - make_shared(atomdb_broker_client_id, atomdb_broker_server_id); - Utils::sleep(500); + Utils::sleep(2000); + + for (const auto& handle : added_atom_handles) { + EXPECT_FALSE(db->node_exists(handle)); + } +} +TEST_F(AtomDBTest, DeleteLinkTargetsTop) { auto proxy = make_shared(); - atomdb_broker_client_bus->issue_bus_command(proxy); + client_bus->issue_bus_command(proxy); string name1 = timestamp(); string name2 = timestamp(); @@ -171,18 +232,7 @@ TEST_F(AtomDBTest, AddAtomsStreaming) { auto link = new Link("Expression", {node1, node2, node3}, true); string link_handle = link->handle(); - vector atoms; - for (auto& token : tokens) { - if (token.find("NODE") == 0) { - auto parts = Utils::split(token, ' '); - parts.erase(parts.begin()); - atoms.push_back(new Node(parts)); - } else if (token.find("LINK") == 0) { - auto parts = Utils::split(token, ' '); - parts.erase(parts.begin()); - atoms.push_back(new Link(parts)); - } - } + auto atoms = build_atoms(tokens); EXPECT_TRUE(atoms[0]->handle() == node1); EXPECT_TRUE(atoms[1]->handle() == node2); @@ -194,19 +244,83 @@ TEST_F(AtomDBTest, AddAtomsStreaming) { EXPECT_FALSE(db->node_exists(node3)); EXPECT_FALSE(db->link_exists(link_handle)); - vector handles = proxy->add_atoms(atoms, true); + vector handles = proxy->add_atoms(atoms); - EXPECT_TRUE(handles[0] == node1); - EXPECT_TRUE(handles[1] == node2); - EXPECT_TRUE(handles[2] == node3); - EXPECT_TRUE(handles[3] == link_handle); + Utils::sleep(2000); + + EXPECT_TRUE(db->node_exists(node1)); + EXPECT_TRUE(db->node_exists(node2)); + EXPECT_TRUE(db->node_exists(node3)); + EXPECT_TRUE(db->link_exists(link_handle)); + + proxy->delete_atoms({link_handle}, true); + + Utils::sleep(2000); + EXPECT_FALSE(db->link_exists(link_handle)); + EXPECT_FALSE(db->node_exists(node1)); + EXPECT_FALSE(db->node_exists(node2)); + EXPECT_FALSE(db->node_exists(node3)); +} + +TEST_F(AtomDBTest, DeleteLinkTargetsBottom) { + auto proxy = make_shared(); + + client_bus->issue_bus_command(proxy); + + string name1 = timestamp(); + Utils::sleep(); + string name2 = timestamp(); + Utils::sleep(); + string name3 = timestamp(); + Utils::sleep(); + string name4 = timestamp(); + + string node1 = Hasher::node_handle("Symbol", name1); + string node2 = Hasher::node_handle("Symbol", name2); + string node3 = Hasher::node_handle("Symbol", name3); + string node4 = Hasher::node_handle("Symbol", name4); + + vector tokens = {"NODE Symbol false 3 is_literal bool false " + name1, + "NODE Symbol false 3 is_literal bool true " + name2, + "NODE Symbol false 3 is_literal bool true " + name3, + "NODE Symbol false 3 is_literal bool true " + name4, + "LINK Expression true 0 3 " + node1 + " " + node2 + " " + node3}; + + auto link = new Link("Expression", {node1, node2, node3}, true); + string link_handle = link->handle(); + + auto atoms = build_atoms(tokens); + + EXPECT_TRUE(atoms[0]->handle() == node1); + EXPECT_TRUE(atoms[1]->handle() == node2); + EXPECT_TRUE(atoms[2]->handle() == node3); + EXPECT_TRUE(atoms[3]->handle() == node4); + EXPECT_TRUE(atoms[4]->handle() == link_handle); + + EXPECT_FALSE(db->node_exists(node1)); + EXPECT_FALSE(db->node_exists(node2)); + EXPECT_FALSE(db->node_exists(node3)); + EXPECT_FALSE(db->node_exists(node4)); + EXPECT_FALSE(db->link_exists(link_handle)); + + vector handles = proxy->add_atoms(atoms); Utils::sleep(2000); EXPECT_TRUE(db->node_exists(node1)); EXPECT_TRUE(db->node_exists(node2)); EXPECT_TRUE(db->node_exists(node3)); + EXPECT_TRUE(db->node_exists(node4)); EXPECT_TRUE(db->link_exists(link_handle)); + + proxy->delete_atoms({node3}, true); + + Utils::sleep(2000); + EXPECT_FALSE(db->link_exists(link_handle)); + EXPECT_FALSE(db->node_exists(node1)); + EXPECT_FALSE(db->node_exists(node2)); + EXPECT_FALSE(db->node_exists(node3)); + EXPECT_TRUE(db->node_exists(node4)); } int main(int argc, char** argv) {