Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/atomdb/redis_mongodb/RedisContextPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ shared_ptr<RedisContext> RedisContextPool::acquire() {
void RedisContextPool::release(shared_ptr<RedisContext> ctx) {
if (!ctx) return;

// Ping the context to check if it is still alive
redisReply* reply = ctx->execute("PING");

bool is_alive = false;
if (reply != NULL) {
if (reply->type == REDIS_REPLY_STATUS) is_alive = true;
freeReplyObject(reply);
}

if (!is_alive) {
Utils::error("Redis context is not alive. Not returning to pool.", false);
return;
}

unique_lock<mutex> lock(this->pool_mutex);
pool.push(ctx);
total_contexts--;
Expand Down
333 changes: 259 additions & 74 deletions src/main/tests_db_loader.cc
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
#include <signal.h>

#include <algorithm>
#include <atomic>
#include <fstream>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

#include "AtomDBSingleton.h"
#include "MettaParser.h"
#include "MettaParserActions.h"
#include "TestConfig.h"

#define LOG_LEVEL DEBUG_LEVEL
Expand All @@ -12,6 +19,8 @@

using namespace std;
using namespace atomdb;
using namespace metta;
using namespace atoms;

void ctrl_c_handler(int) {
cout << "Stopping tests_db_loader..." << endl;
Expand All @@ -23,10 +32,14 @@ int main(int argc, char* argv[]) {
TestConfig::load_environment();
TestConfig::set_atomdb_cache(false);

// OPTIONS="context atomdb_type num_links arity chunck_size"
// OPTIONS="context atomdb_type num_threads num_links arity chunk_size"
//
// make run-tests-db-loader OPTIONS="test_1m_ redismongodb 1000000 3 5000"
// make run-tests-db-loader OPTIONS="test_1m_ morkdb 1000000 3 5000"
// make run-tests-db-loader OPTIONS="test_1m_ redismongodb 8 1000000 3 5000"
// make run-tests-db-loader OPTIONS="test_1m_ morkdb 8 1000000 3 5000"
// make run-tests-db-loader OPTIONS="test_1m_ redismongodb 8 1000000 3 5000"

// From file
// make run-tests-db-loader OPTIONS="test_1m_ redismongodb 8 file /path/to/file.metta 5000"

string context = "";
if (argc > 1) context = string(argv[1]);
Expand All @@ -45,90 +58,262 @@ int main(int argc, char* argv[]) {
signal(SIGINT, &ctrl_c_handler);
signal(SIGTERM, &ctrl_c_handler);

if (argc > 5) {
if (argc > 4) {
auto atomdb = AtomDBSingleton::get_instance();
auto db = dynamic_pointer_cast<RedisMongoDB>(atomdb);

LOG_INFO("Dropping all databases...");
db->drop_all();
LOG_INFO("Databases dropped");

int num_links = Utils::string_to_int(argv[3]);
int arity = Utils::string_to_int(argv[4]);
int chunck_size = Utils::string_to_int(argv[5]);

vector<Link*> links;
vector<Node*> nodes;

STOP_WATCH_START(tests_db_loader);

for (int i = 1; i <= num_links; i++) {
string metta_expression = "(";
vector<string> node_names;
vector<string> targets;
for (int j = 1; j <= arity; j++) {
string name = "add-links-" + to_string(i) + "-" + to_string(j);
auto node = new Node("Symbol", name);
targets.push_back(node->handle());
nodes.push_back(node);
node_names.push_back(name);
metta_expression += name;
if (j != arity) {
metta_expression += " ";
int num_threads = Utils::string_to_int(argv[3]);

string arg4 = string(argv[4]);
vector<string> file_keywords = {"file", "--file", "FILE"};
bool is_file_mode =
find(file_keywords.begin(), file_keywords.end(), arg4) != file_keywords.end();

if (is_file_mode) {
STOP_WATCH_START(tests_db_loader_from_file);

if (argc < 6) {
Utils::error(
"File mode requires file path as argument. Usage: context atomdb_type num_threads "
"file "
"<file_path> [chunk_size]");
return 1;
}

string file_path = argv[5];
int chunk_size = 1000; // Batch size for adding atoms to DB
if (argc > 6) {
chunk_size = Utils::string_to_int(argv[6]);
}

// First, read all lines from the file to determine line ranges for each thread
ifstream file(file_path);
if (!file.is_open()) {
Utils::error("Failed to open file: " + file_path);
return 1;
}

// Read all lines first to determine line ranges for each thread
vector<string> lines;
string line;
while (getline(file, line)) {
if (!line.empty()) {
lines.push_back(line);
}
}
metta_expression += ")";

auto link = new Link("Expression", targets, true, {}, metta_expression);
links.push_back(link);

vector<string> nested_targets = {targets[0], targets[1], link->handle()};
string nested_metta_expression =
"(" + node_names[0] + " " + node_names[1] + " " + metta_expression + ")";
auto link_with_nested =
new Link("Expression", nested_targets, true, {}, nested_metta_expression);
links.push_back(link_with_nested);

if (i % chunck_size == 0) {
LOG_DEBUG("Adding " + to_string(nodes.size()) + " nodes");
db->add_nodes(nodes, false, true);
LOG_DEBUG("Adding " + to_string(links.size()) + " links");
db->add_links(links, false, true);
nodes.clear();
links.clear();
file.close();

if (lines.empty()) {
LOG_INFO("File is empty or contains no valid lines");
return 0;
}
}

if (!nodes.empty()) {
LOG_DEBUG("Final - Adding " + to_string(nodes.size()) + " nodes");
db->add_nodes(nodes, false, true);
}
if (!links.empty()) {
LOG_DEBUG("Final - Adding " + to_string(links.size()) + " links");
db->add_links(links, false, true);
}
// Calculate lines per thread
size_t lines_per_thread = lines.size() / num_threads;
size_t remainder = lines.size() % num_threads;

STOP_WATCH_FINISH(tests_db_loader, "TestsDBLoader");

// clang-format off
LinkSchema link_schema({
"LINK_TEMPLATE", "Expression", "3",
"NODE", "Symbol", "add-links-1-1",
"NODE", "Symbol", "add-links-1-2",
"VARIABLE", "V"
});
// clang-format on

auto result = db->query_for_pattern(link_schema);
if (result->size() != 2) {
Utils::error("Expected 2 results, got " + to_string(result->size()));
return 1;
}
LOG_INFO("Processing " + to_string(lines.size()) + " lines with " + to_string(num_threads) +
" threads (chunk size: " + to_string(chunk_size) + " | lines per thread: " +
to_string(lines_per_thread) + " | remainder: " + to_string(remainder) + ")");

vector<thread> threads;
atomic<size_t> total_atoms_processed(0);

for (int i = 0; i < num_threads; i++) {
// Thread i processes the half-open line range [start_line, end_line).
size_t start_line = i * lines_per_thread;
size_t end_line = start_line + lines_per_thread;

// lines.size() is not necessarily a multiple of num_threads: the last thread also takes
// the `remainder` leftover lines, so its end_line lands exactly on lines.size().
// (Incrementing by one instead would read past the end of `lines` when remainder == 0
// and drop lines when remainder > 1.)
if (i == num_threads - 1) {
    end_line += remainder;
}

threads.emplace_back([&, start_line, end_line, i]() -> void {
// Per-thread working state for the lines assigned to this worker.
auto thread_atomdb = AtomDBSingleton::get_instance();
// Raw pointers queued for the next add_atoms() call; the pointed-to atoms are held by the
// parser actions stored in parser_actions_list.
vector<Atom*> batch_atoms;
vector<shared_ptr<MettaParserActions>>
parser_actions_list; // Keep parser_actions alive
// Number of distinct atoms this thread has queued so far.
size_t thread_atoms_count = 0;
set<string> handles; // Handles already queued by this thread; used to skip duplicates

for (size_t line_idx = start_line; line_idx < end_line; line_idx++) {
const string& metta_expr = lines[line_idx];

try {
// Create parser actions for this line
shared_ptr<MettaParserActions> parser_actions =
make_shared<MettaParserActions>();

// Parse the S-Expression
MettaParser parser(metta_expr, parser_actions);
bool parse_error = parser.parse(false); // Don't throw on error

if (parse_error) {
LOG_ERROR("Thread " + to_string(i) + " failed to parse line " +
to_string(line_idx) + ": " + metta_expr);
continue;
}

// Keep parser_actions alive by storing it
parser_actions_list.push_back(parser_actions);

// Convert shared_ptr<Atom> to Atom* for add_atoms
for (const auto& pair : parser_actions->handle_to_atom) {
// Deduplicate atoms by handle within this thread
if (handles.find(pair.second->handle()) != handles.end()) {
continue;
}
handles.insert(pair.second->handle());
batch_atoms.push_back(pair.second.get());
thread_atoms_count++;

// Add atoms in batches
if (batch_atoms.size() >= static_cast<size_t>(chunk_size)) {
thread_atomdb->add_atoms(batch_atoms, false, true);
batch_atoms.clear();
// Clear old parser_actions that are no longer needed
// Keep only the last few to avoid memory buildup
if (parser_actions_list.size() > 10) {
parser_actions_list.erase(parser_actions_list.begin(),
parser_actions_list.end() - 5);
}
}
}
} catch (const exception& e) {
LOG_ERROR("Thread " + to_string(i) + " exception on line " +
to_string(line_idx) + ": " + string(e.what()));
}
}

// Add remaining atoms
if (!batch_atoms.empty()) {
thread_atomdb->add_atoms(batch_atoms, false, true);
}

total_atoms_processed += thread_atoms_count;
LOG_DEBUG("Thread " + to_string(i) + " processed " + to_string(thread_atoms_count) +
" atoms " + "(lines " + to_string(start_line) + "-" +
to_string(end_line - 1) + ")");
});
}

// Wait for all threads to complete
for (auto& thread : threads) {
thread.join();
}

LOG_INFO("Completed processing. Total atoms processed: " +
to_string(total_atoms_processed.load()));

STOP_WATCH_FINISH(tests_db_loader_from_file, "TestsDBLoaderFromFile");

} else {
int num_links = Utils::string_to_int(argv[4]);
int arity = Utils::string_to_int(argv[5]);
int chunck_size = Utils::string_to_int(argv[6]);

int links_per_thread = num_links / num_threads;
int remainder = num_links % num_threads;

vector<thread> threads;

STOP_WATCH_START(tests_db_loader);

for (int i = 0; i < num_threads; i++) {
threads.emplace_back([&, thread_id = i, links_per_thread, remainder]() -> void {
auto db = AtomDBSingleton::get_instance();

vector<Link*> links;
vector<Node*> nodes;

int links_to_add = links_per_thread;
if (thread_id == num_threads - 1) {
links_to_add += remainder;
}

LOG_DEBUG("[" + to_string(thread_id) + "] Adding " + to_string(links_to_add * 2) +
" links and " + to_string(links_to_add * arity) + " nodes");

for (int j = 1; j <= links_to_add; j++) {
string metta_expression = "(";
vector<string> node_names;
vector<string> targets;
for (int k = 1; k <= arity; k++) {
string name = "add-links-T" + to_string(thread_id) + "-" + to_string(j) +
"-" + to_string(k);
auto node = new Node("Symbol", name);
targets.push_back(node->handle());
nodes.push_back(node);
node_names.push_back(name);
metta_expression += name;
if (k != arity) {
metta_expression += " ";
}
}
metta_expression += ")";

auto link = new Link("Expression", targets, true, {}, metta_expression);
links.push_back(link);

vector<string> nested_targets = {targets[0], targets[1], link->handle()};
string nested_metta_expression =
"(" + node_names[0] + " " + node_names[1] + " " + metta_expression + ")";
auto link_with_nested =
new Link("Expression", nested_targets, true, {}, nested_metta_expression);
links.push_back(link_with_nested);

if (j % chunck_size == 0) {
db->add_nodes(nodes, false, true);
db->add_links(links, false, true);
nodes.clear();
links.clear();
}
}

if (!nodes.empty()) {
LOG_DEBUG("[" + to_string(thread_id) + "] Final - Adding " +
to_string(nodes.size()) + " nodes");
db->add_nodes(nodes, false, true);
}
if (!links.empty()) {
LOG_DEBUG("[" + to_string(thread_id) + "] Final - Adding " +
to_string(links.size()) + " links");
db->add_links(links, false, true);
}

// clang-format off
LinkSchema link_schema({
"LINK_TEMPLATE", "Expression", "3",
"NODE", "Symbol", "add-links-T" + to_string(thread_id) + "-1-1",
"NODE", "Symbol", "add-links-T" + to_string(thread_id) + "-1-2",
"VARIABLE", "V"
});
// clang-format on

auto result = db->query_for_pattern(link_schema);
if (result->size() != 2) {
Utils::error("[" + to_string(thread_id) + "] Expected 2 results, got " +
to_string(result->size()));
return;
}

auto iterator = result->get_iterator();
char* handle;
while ((handle = iterator->next()) != nullptr) {
LOG_DEBUG("[" + to_string(thread_id) + "] Match: " + string(handle));
}
});
}

for (auto& thread : threads) {
thread.join();
}

auto iterator = result->get_iterator();
char* handle;
while ((handle = iterator->next()) != nullptr) {
LOG_DEBUG("Match: " + string(handle));
STOP_WATCH_FINISH(tests_db_loader, "TestsDBLoader");
}
} else {
LOG_INFO("Starting tests_db_loader (context: '" + context + "')");
Expand Down
Loading