diff --git a/src/atomdb/redis_mongodb/RedisContext.cc b/src/atomdb/redis_mongodb/RedisContext.cc index c2b9aa00..570d8934 100644 --- a/src/atomdb/redis_mongodb/RedisContext.cc +++ b/src/atomdb/redis_mongodb/RedisContext.cc @@ -39,6 +39,13 @@ redisReply* RedisContext::execute(const char* command) { } } +bool RedisContext::ping() { + redisReply* reply = this->execute("PING"); + if (reply == NULL) return false; + if (reply->type != REDIS_REPLY_STATUS) return false; + return string(reply->str) == string("PONG"); +} + void RedisContext::append_command(const char* command) { if (cluster_flag) { if (redisClusterAppendCommand(cluster_ctx, command) == REDIS_OK) { diff --git a/src/atomdb/redis_mongodb/RedisContext.h b/src/atomdb/redis_mongodb/RedisContext.h index c16add57..9541ea50 100644 --- a/src/atomdb/redis_mongodb/RedisContext.h +++ b/src/atomdb/redis_mongodb/RedisContext.h @@ -17,6 +17,7 @@ class RedisContext { void set_context(redisClusterContext* ctx); redisReply* execute(const char* command); + bool ping(); void append_command(const char* command); void flush_commands(); bool has_error() const; diff --git a/src/atomdb/redis_mongodb/RedisContextPool.cc b/src/atomdb/redis_mongodb/RedisContextPool.cc index 8463eab9..fbbc6945 100644 --- a/src/atomdb/redis_mongodb/RedisContextPool.cc +++ b/src/atomdb/redis_mongodb/RedisContextPool.cc @@ -32,7 +32,13 @@ shared_ptr RedisContextPool::acquire() { if (!pool.empty()) { auto ctx = pool.front(); pool.pop(); - return ctx; + if (ctx->ping()) { + LOG_DEBUG("Context is alive, returning to the caller."); + return shared_ptr(ctx.get(), + [this, ctx](RedisContext* ptr) { this->release(ctx); }); + } + LOG_DEBUG("Context is not alive, creating a new one."); + total_contexts--; } // Create a new context if the pool is empty @@ -83,9 +89,9 @@ shared_ptr RedisContextPool::acquire() { void RedisContextPool::release(shared_ptr ctx) { if (!ctx) return; - unique_lock lock(this->pool_mutex); pool.push(ctx); - total_contexts--; - 
pool_cond_var.notify_one(); + LOG_DEBUG("Context added to the pool (size: " << pool.size() + << ", contexts: " << to_string(total_contexts) << ")"); + pool_cond_var.notify_all(); } diff --git a/src/atomdb/redis_mongodb/RedisMongoDB.cc b/src/atomdb/redis_mongodb/RedisMongoDB.cc index da3bf3ae..a03dfea8 100644 --- a/src/atomdb/redis_mongodb/RedisMongoDB.cc +++ b/src/atomdb/redis_mongodb/RedisMongoDB.cc @@ -375,8 +375,15 @@ uint RedisMongoDB::get_next_score(const string& key) { void RedisMongoDB::set_next_score(const string& key, uint score) { if (SKIP_REDIS) return; - auto ctx = this->redis_pool->acquire(); + set_next_score_with_context(ctx, key, score); +} + +void RedisMongoDB::set_next_score_with_context(shared_ptr ctx, + const string& key, + uint score) { + if (SKIP_REDIS) return; + string command = "SET " + key + " " + to_string(score); redisReply* reply = ctx->execute(command.c_str()); if (reply == NULL) Utils::error("Redis error at set_next_score: <" + command + ">"); @@ -1012,8 +1019,11 @@ vector RedisMongoDB::add_links(const vector& links, LOG_DEBUG("Setting next scores: patterns=" + to_string(this->patterns_next_score.load()) + ", incoming=" + to_string(this->incoming_set_next_score.load())); - set_next_score(REDIS_PATTERNS_PREFIX + ":next_score", this->patterns_next_score.load()); - set_next_score(REDIS_INCOMING_PREFIX + ":next_score", this->incoming_set_next_score.load()); + + set_next_score_with_context( + ctx, REDIS_PATTERNS_PREFIX + ":next_score", this->patterns_next_score.load()); + set_next_score_with_context( + ctx, REDIS_INCOMING_PREFIX + ":next_score", this->incoming_set_next_score.load()); if (is_transactional) { lock_guard composite_type_hashes_map_lock(this->composite_type_hashes_map_mutex); diff --git a/src/atomdb/redis_mongodb/RedisMongoDB.h b/src/atomdb/redis_mongodb/RedisMongoDB.h index 03af54d1..8c12d723 100644 --- a/src/atomdb/redis_mongodb/RedisMongoDB.h +++ b/src/atomdb/redis_mongodb/RedisMongoDB.h @@ -170,6 +170,7 @@ class RedisMongoDB 
: public AtomDB { uint get_next_score(const string& key); void set_next_score(const string& key, uint score); + void set_next_score_with_context(shared_ptr ctx, const string& key, uint score); void reset_scores(); void add_pattern(const string& handle, const string& pattern_handle); diff --git a/src/main/tests_db_loader.cc b/src/main/tests_db_loader.cc index eaa094ce..cf8b7336 100644 --- a/src/main/tests_db_loader.cc +++ b/src/main/tests_db_loader.cc @@ -1,9 +1,16 @@ #include +#include +#include +#include #include #include +#include +#include #include "AtomDBSingleton.h" +#include "MettaParser.h" +#include "MettaParserActions.h" #include "TestConfig.h" #define LOG_LEVEL DEBUG_LEVEL @@ -12,6 +19,8 @@ using namespace std; using namespace atomdb; +using namespace metta; +using namespace atoms; void ctrl_c_handler(int) { cout << "Stopping tests_db_loader..." << endl; @@ -23,10 +32,14 @@ int main(int argc, char* argv[]) { TestConfig::load_environment(); TestConfig::set_atomdb_cache(false); - // OPTIONS="context atomdb_type num_links arity chunck_size" + // OPTIONS="context atomdb_type num_threads num_links arity chunck_size" // - // make run-tests-db-loader OPTIONS="test_1m_ redismongodb 1000000 3 5000" - // make run-tests-db-loader OPTIONS="test_1m_ morkdb 1000000 3 5000" + // make run-tests-db-loader OPTIONS="test_1m_ redismongodb 8 1000000 3 5000" + // make run-tests-db-loader OPTIONS="test_1m_ morkdb 8 1000000 3 5000" + // make run-tests-db-loader OPTIONS="test_1m_ redismongodb 8 1000000 3 5000" + + // From file + // make run-tests-db-loader OPTIONS="test_1m_ redismongodb 8 file /path/to/file.metta 5000" string context = ""; if (argc > 1) context = string(argv[1]); @@ -45,7 +58,7 @@ int main(int argc, char* argv[]) { signal(SIGINT, &ctrl_c_handler); signal(SIGTERM, &ctrl_c_handler); - if (argc > 5) { + if (argc > 4) { auto atomdb = AtomDBSingleton::get_instance(); auto db = dynamic_pointer_cast(atomdb); @@ -53,82 +66,254 @@ int main(int argc, char* argv[]) { 
db->drop_all(); LOG_INFO("Databases dropped"); - int num_links = Utils::string_to_int(argv[3]); - int arity = Utils::string_to_int(argv[4]); - int chunck_size = Utils::string_to_int(argv[5]); - - vector links; - vector nodes; - - STOP_WATCH_START(tests_db_loader); - - for (int i = 1; i <= num_links; i++) { - string metta_expression = "("; - vector node_names; - vector targets; - for (int j = 1; j <= arity; j++) { - string name = "add-links-" + to_string(i) + "-" + to_string(j); - auto node = new Node("Symbol", name); - targets.push_back(node->handle()); - nodes.push_back(node); - node_names.push_back(name); - metta_expression += name; - if (j != arity) { - metta_expression += " "; + int num_threads = Utils::string_to_int(argv[3]); + + string arg4 = string(argv[4]); + vector file_keywords = {"file", "--file", "FILE"}; + bool is_file_mode = + find(file_keywords.begin(), file_keywords.end(), arg4) != file_keywords.end(); + + if (is_file_mode) { + STOP_WATCH_START(tests_db_loader_from_file); + + if (argc < 6) { + Utils::error( + "File mode requires file path as argument. 
Usage: context atomdb_type num_threads " + "file " + " [chunk_size]"); + return 1; + } + + string file_path = argv[5]; + int chunk_size = 1000; // Batch size for adding atoms to DB + if (argc > 6) { + chunk_size = Utils::string_to_int(argv[6]); + } + + // First, read all lines from the file to determine line ranges for each thread + ifstream file(file_path); + if (!file.is_open()) { + Utils::error("Failed to open file: " + file_path); + return 1; + } + + // Read all lines first to determine line ranges for each thread + vector lines; + string line; + while (getline(file, line)) { + if (!line.empty()) { + lines.push_back(line); } } - metta_expression += ")"; - - auto link = new Link("Expression", targets, true, {}, metta_expression); - links.push_back(link); - - vector nested_targets = {targets[0], targets[1], link->handle()}; - string nested_metta_expression = - "(" + node_names[0] + " " + node_names[1] + " " + metta_expression + ")"; - auto link_with_nested = - new Link("Expression", nested_targets, true, {}, nested_metta_expression); - links.push_back(link_with_nested); - - if (i % chunck_size == 0) { - LOG_DEBUG("Adding " + to_string(nodes.size()) + " nodes"); - db->add_nodes(nodes, false, true); - LOG_DEBUG("Adding " + to_string(links.size()) + " links"); - db->add_links(links, false, true); - nodes.clear(); - links.clear(); + file.close(); + + if (lines.empty()) { + LOG_INFO("File is empty or contains no valid lines"); + return 0; } - } - if (!nodes.empty()) { - LOG_DEBUG("Final - Adding " + to_string(nodes.size()) + " nodes"); - db->add_nodes(nodes, false, true); - } - if (!links.empty()) { - LOG_DEBUG("Final - Adding " + to_string(links.size()) + " links"); - db->add_links(links, false, true); - } + // Calculate lines per thread + size_t lines_per_thread = lines.size() / num_threads; + size_t remainder = lines.size() % num_threads; - STOP_WATCH_FINISH(tests_db_loader, "TestsDBLoader"); - - // clang-format off - LinkSchema link_schema({ - "LINK_TEMPLATE", 
"Expression", "3", - "NODE", "Symbol", "add-links-1-1", - "NODE", "Symbol", "add-links-1-2", - "VARIABLE", "V" - }); - // clang-format on - - auto result = db->query_for_pattern(link_schema); - if (result->size() != 2) { - Utils::error("Expected 2 results, got " + to_string(result->size())); - return 1; - } + LOG_INFO("Processing " + to_string(lines.size()) + " lines with " + to_string(num_threads) + + " threads (chunk size: " + to_string(chunk_size) + " | lines per thread: " + + to_string(lines_per_thread) + " | remainder: " + to_string(remainder) + ")"); + + vector threads; + atomic total_atoms_processed(0); + + for (int i = 0; i < num_threads; i++) { + size_t start_line = i * lines_per_thread; + size_t end_line = start_line + lines_per_thread; + + // Remainder must be added to the last thread + if (i == num_threads - 1) { + end_line++; + } + + threads.emplace_back([&, start_line, end_line, i]() -> void { + auto thread_atomdb = AtomDBSingleton::get_instance(); + vector batch_atoms; + vector> + parser_actions_list; // Keep parser_actions alive + size_t thread_atoms_count = 0; + set handles; // Avoid deduplication + + for (size_t line_idx = start_line; line_idx < end_line; line_idx++) { + const string& metta_expr = lines[line_idx]; + + try { + // Create parser actions for this line + shared_ptr parser_actions = + make_shared(); + + // Parse the S-Expression + MettaParser parser(metta_expr, parser_actions); + bool parse_error = parser.parse(false); // Don't throw on error + + if (parse_error) { + LOG_ERROR("Thread " + to_string(i) + " failed to parse line " + + to_string(line_idx) + ": " + metta_expr); + continue; + } + + // Keep parser_actions alive by storing it + parser_actions_list.push_back(parser_actions); + + // Convert shared_ptr to Atom* for add_atoms + for (const auto& pair : parser_actions->handle_to_atom) { + // Deduplicate atoms by handle within this thread + if (handles.find(pair.second->handle()) != handles.end()) { + continue; + } + 
handles.insert(pair.second->handle()); + batch_atoms.push_back(pair.second.get()); + thread_atoms_count++; + + // Add atoms in batches + if (batch_atoms.size() >= static_cast(chunk_size)) { + thread_atomdb->add_atoms(batch_atoms, false, true); + batch_atoms.clear(); + // Clear old parser_actions that are no longer needed + // Keep only the last few to avoid memory buildup + if (parser_actions_list.size() > 10) { + parser_actions_list.erase(parser_actions_list.begin(), + parser_actions_list.end() - 5); + } + } + } + } catch (const exception& e) { + LOG_ERROR("Thread " + to_string(i) + " exception on line " + + to_string(line_idx) + ": " + string(e.what())); + } + } + + // Add remaining atoms + if (!batch_atoms.empty()) { + thread_atomdb->add_atoms(batch_atoms, false, true); + } + + total_atoms_processed += thread_atoms_count; + LOG_DEBUG("Thread " + to_string(i) + " processed " + to_string(thread_atoms_count) + + " atoms " + "(lines " + to_string(start_line) + "-" + + to_string(end_line - 1) + ")"); + }); + } + + // Wait for all threads to complete + for (auto& thread : threads) { + thread.join(); + } + + LOG_INFO("Completed processing. 
Total atoms processed: " + + to_string(total_atoms_processed.load())); + + STOP_WATCH_FINISH(tests_db_loader_from_file, "TestsDBLoaderFromFile"); + + } else { + int num_links = Utils::string_to_int(argv[4]); + int arity = Utils::string_to_int(argv[5]); + int chunck_size = Utils::string_to_int(argv[6]); + + int links_per_thread = num_links / num_threads; + int remainder = num_links % num_threads; + + vector threads; + + STOP_WATCH_START(tests_db_loader); + + for (int i = 0; i < num_threads; i++) { + threads.emplace_back([&, thread_id = i, links_per_thread, remainder]() -> void { + auto db = AtomDBSingleton::get_instance(); + + vector links; + vector nodes; + + int links_to_add = links_per_thread; + if (thread_id == num_threads - 1) { + links_to_add += remainder; + } + + LOG_DEBUG("[" + to_string(thread_id) + "] Adding " + to_string(links_to_add * 2) + + " links and " + to_string(links_to_add * arity) + " nodes"); + + for (int j = 1; j <= links_to_add; j++) { + string metta_expression = "("; + vector node_names; + vector targets; + for (int k = 1; k <= arity; k++) { + string name = "add-links-T" + to_string(thread_id) + "-" + to_string(j) + + "-" + to_string(k); + auto node = new Node("Symbol", name); + targets.push_back(node->handle()); + nodes.push_back(node); + node_names.push_back(name); + metta_expression += name; + if (k != arity) { + metta_expression += " "; + } + } + metta_expression += ")"; + + auto link = new Link("Expression", targets, true, {}, metta_expression); + links.push_back(link); + + vector nested_targets = {targets[0], targets[1], link->handle()}; + string nested_metta_expression = + "(" + node_names[0] + " " + node_names[1] + " " + metta_expression + ")"; + auto link_with_nested = + new Link("Expression", nested_targets, true, {}, nested_metta_expression); + links.push_back(link_with_nested); + + if (j % chunck_size == 0) { + db->add_nodes(nodes, false, true); + db->add_links(links, false, true); + nodes.clear(); + links.clear(); + } + } + + if 
(!nodes.empty()) { + LOG_DEBUG("[" + to_string(thread_id) + "] Final - Adding " + + to_string(nodes.size()) + " nodes"); + db->add_nodes(nodes, false, true); + } + if (!links.empty()) { + LOG_DEBUG("[" + to_string(thread_id) + "] Final - Adding " + + to_string(links.size()) + " links"); + db->add_links(links, false, true); + } + + // clang-format off + LinkSchema link_schema({ + "LINK_TEMPLATE", "Expression", "3", + "NODE", "Symbol", "add-links-T" + to_string(thread_id) + "-1-1", + "NODE", "Symbol", "add-links-T" + to_string(thread_id) + "-1-2", + "VARIABLE", "V" + }); + // clang-format on + + auto result = db->query_for_pattern(link_schema); + if (result->size() != 2) { + Utils::error("[" + to_string(thread_id) + "] Expected 2 results, got " + + to_string(result->size())); + return; + } + + auto iterator = result->get_iterator(); + char* handle; + while ((handle = iterator->next()) != nullptr) { + LOG_DEBUG("[" + to_string(thread_id) + "] Match: " + string(handle)); + } + }); + } + + for (auto& thread : threads) { + thread.join(); + } - auto iterator = result->get_iterator(); - char* handle; - while ((handle = iterator->next()) != nullptr) { - LOG_DEBUG("Match: " + string(handle)); + STOP_WATCH_FINISH(tests_db_loader, "TestsDBLoader"); } } else { LOG_INFO("Starting tests_db_loader (context: '" + context + "')"); diff --git a/src/scripts/run.sh b/src/scripts/run.sh index 61c5222d..bb5d7e3b 100755 --- a/src/scripts/run.sh +++ b/src/scripts/run.sh @@ -31,6 +31,7 @@ docker run --rm \ --name="${CONTAINER_NAME}" \ --network host \ --volume .:/opt/das \ + --volume /tmp:/tmp \ --workdir /opt/das \ $ENV_VARS \ "${IMAGE_NAME}" \ diff --git a/src/tests/cpp/redis_mongodb_test_2.cc b/src/tests/cpp/redis_mongodb_test_2.cc index 09dacb03..2dcba09d 100644 --- a/src/tests/cpp/redis_mongodb_test_2.cc +++ b/src/tests/cpp/redis_mongodb_test_2.cc @@ -215,11 +215,11 @@ TEST_F(RedisMongoDBTest, MongodbDocumentGetSize) { } TEST_F(RedisMongoDBTest, ConcurrentAddLinks) { - int num_links = 
5000; + int num_links = 2000; int arity = 3; - int chunck_size = 500; + int chunck_size = 250; - const int num_threads = 100; + const int num_threads = 120; vector<thread> threads; atomic<int> success_count{0};