diff --git a/.github/workflows/run-coverage-check.yml b/.github/workflows/run-coverage-check.yml index d01b5533..4b4d2ff1 100644 --- a/.github/workflows/run-coverage-check.yml +++ b/.github/workflows/run-coverage-check.yml @@ -18,7 +18,7 @@ jobs: DAS_MONGODB_USERNAME: dbadmin DAS_MONGODB_PASSWORD: dassecret DAS_MORK_HOSTNAME: localhost - DAS_MORK_PORT: 8000 + DAS_MORK_PORT: 40022 steps: - name: Checkout uses: actions/checkout@v4 @@ -66,7 +66,7 @@ jobs: run: |- make run-mork-server > /dev/null 2>&1 & - until curl --silent http://localhost:8000/status/-; do + until curl --silent http://localhost:40022/status/-; do echo "Waiting MORK…" sleep 1 done diff --git a/.github/workflows/run-unit-tests.yml b/.github/workflows/run-unit-tests.yml index 45e5b57c..725531b7 100644 --- a/.github/workflows/run-unit-tests.yml +++ b/.github/workflows/run-unit-tests.yml @@ -18,7 +18,7 @@ jobs: DAS_MONGODB_USERNAME: admin DAS_MONGODB_PASSWORD: admin DAS_MORK_HOSTNAME: localhost - DAS_MORK_PORT: 8000 + DAS_MORK_PORT: 40022 steps: - name: Checkout uses: actions/checkout@v4 @@ -66,7 +66,7 @@ jobs: run: |- make run-mork-server > /dev/null 2>&1 & - until curl --silent http://localhost:8000/status/-; do + until curl --silent http://localhost:40022/status/-; do echo "Waiting MORK…" sleep 1 done diff --git a/src/atomdb/morkdb/MorkDB.cc b/src/atomdb/morkdb/MorkDB.cc index acb0835b..bd3a01f0 100644 --- a/src/atomdb/morkdb/MorkDB.cc +++ b/src/atomdb/morkdb/MorkDB.cc @@ -192,6 +192,10 @@ vector MorkDB::add_links(const vector& links, bool throw_if_exists, bool is_transactional) { if (links.empty()) { + if (is_transactional) { + lock_guard composite_type_hashes_map_lock(this->composite_type_hashes_map_mutex); + this->composite_type_hashes_map.clear(); + } return {}; } @@ -210,8 +214,11 @@ vector MorkDB::add_links(const vector& links, } map> composite_type_entries_map; + map composite_type_hashes_map_copy; if (is_transactional) { this->build_composite_type_entries_map(links, composite_type_entries_map); + lock_guard composite_type_hashes_map_lock(this->composite_type_hashes_map_mutex); + composite_type_hashes_map_copy = this->composite_type_hashes_map; } else { this->check_existing_targets(links); } @@ -239,7 +246,7 @@ vector MorkDB::add_links(const vector& links, if (is_transactional) { mongodb_doc = make_shared( link, - this->composite_type_hashes_map[link_handle], + composite_type_hashes_map_copy[link_handle], composite_type_entries_map[link_handle]); } else { mongodb_doc = make_shared(link, *this); @@ -258,6 +265,11 @@ vector MorkDB::add_links(const vector& links, this->mork_client->post(metta_expressions, "$x", "$x"); } + if (is_transactional) { + lock_guard composite_type_hashes_map_lock(this->composite_type_hashes_map_mutex); + this->composite_type_hashes_map.clear(); + } + return handles; } diff --git a/src/main/tests_db_loader.cc b/src/main/tests_db_loader.cc index 027201f9..eaa094ce 100644 --- a/src/main/tests_db_loader.cc +++ b/src/main/tests_db_loader.cc @@ -64,12 +64,14 @@ int main(int argc, char* argv[]) { for (int i = 1; i <= num_links; i++) { string metta_expression = "("; + vector node_names; vector targets; for (int j = 1; j <= arity; j++) { string name = "add-links-" + to_string(i) + "-" + to_string(j); auto node = new Node("Symbol", name); targets.push_back(node->handle()); nodes.push_back(node); + node_names.push_back(name); metta_expression += name; if (j != arity) { metta_expression += " "; @@ -82,7 +84,7 @@ int main(int argc, char* argv[]) { vector nested_targets = {targets[0], targets[1], link->handle()}; string nested_metta_expression = - "(" + targets[0] + " " + targets[1] + " " + metta_expression + ")"; + "(" + node_names[0] + " " + node_names[1] + " " + metta_expression + ")"; auto link_with_nested = new Link("Expression", nested_targets, true, {}, nested_metta_expression); links.push_back(link_with_nested); diff --git a/src/scripts/mork_loader.sh b/src/scripts/mork_loader.sh index 5171ccfc..5cb729a0 100755 --- a/src/scripts/mork_loader.sh +++ b/src/scripts/mork_loader.sh @@ -9,14 +9,14 @@ else FILE=$1 fi -IMAGE_NAME="trueagi/das:mork-loader-0.10.2" +IMAGE_NAME="trueagi/das:mork-loader-1.0.0" CONTAINER_NAME="das-mork-loader-$(date +%Y%m%d%H%M%S)" docker run --rm \ --name="${CONTAINER_NAME}" \ --network host \ - --env MORK_SERVER_ADDR=${DAS_MORK_HOSTNAME:-localhost} \ - --env MORK_SERVER_PORT=${DAS_MORK_PORT:-8000} \ + --env MORK_SERVER_ADDR=${DAS_MORK_HOSTNAME:-0.0.0.0} \ + --env MORK_SERVER_PORT=${DAS_MORK_PORT:-40022} \ --volume "${FILE}":/app/file.metta \ --volume `pwd`/src/scripts/mork_loader.py:/app/mork_loader.py \ --workdir /app \ diff --git a/src/scripts/mork_server.sh b/src/scripts/mork_server.sh index b7c29eb9..13a62114 100755 --- a/src/scripts/mork_server.sh +++ b/src/scripts/mork_server.sh @@ -2,12 +2,14 @@ set -eoux pipefail -IMAGE_NAME="trueagi/das:mork-server-0.10.2" +IMAGE_NAME="trueagi/das:mork-server-1.0.0" CONTAINER_NAME="das-mork-server-$(date +%Y%m%d%H%M%S)" docker run --rm \ --name="${CONTAINER_NAME}" \ --network host \ + -e MORK_SERVER_ADDR=0.0.0.0 \ + -e MORK_SERVER_PORT=${DAS_MORK_PORT:-40022} \ "${IMAGE_NAME}" "$@" sleep 1 diff --git a/src/tests/cpp/BUILD b/src/tests/cpp/BUILD index 77f1b769..217d2ed9 100644 --- a/src/tests/cpp/BUILD +++ b/src/tests/cpp/BUILD @@ -662,7 +662,7 @@ cc_test( cc_test( name = "redis_mongodb_test_2", - size = "small", + size = "medium", srcs = [ "redis_mongodb_test_2.cc", "test_utils.cc", @@ -690,7 +690,7 @@ cc_test( cc_test( name = "morkdb_test", - size = "small", + size = "medium", srcs = ["morkdb_test.cc"], copts = [ "-Iexternal/gtest/googletest/include", diff --git a/src/tests/cpp/morkdb_test.cc b/src/tests/cpp/morkdb_test.cc index 6872a9f1..ba2e99f6 100644 --- a/src/tests/cpp/morkdb_test.cc +++ b/src/tests/cpp/morkdb_test.cc @@ -298,6 +298,85 @@ TEST_F(MorkDBTest, AddLinksWithDuplicateTargets) { EXPECT_FALSE(db->delete_link(link->handle(), true)); } +TEST_F(MorkDBTest, ConcurrentAddLinks) { + int num_links = 5000; + int arity = 3; + int chunck_size = 500; + + const int num_threads = 100; + vector threads; + atomic success_count{0}; + + auto worker = [&](int thread_id) { + try { + vector nodes; + vector links; + for (int i = 1; i <= num_links; i++) { + string metta_expression = "("; + vector targets; + vector node_names; + for (int j = 1; j <= arity; j++) { + string name = "ConcurrentAddLinks-" + to_string(thread_id) + "-" + to_string(i) + + "-" + to_string(j); + auto node = new Node("Symbol", name); + targets.push_back(node->handle()); + nodes.push_back(node); + node_names.push_back(name); + metta_expression += name; + if (j != arity) metta_expression += " "; + } + metta_expression += ")"; + + auto link = new Link("Expression", targets, true, {}, metta_expression); + links.push_back(link); + + vector nested_targets = {targets[0], targets[1], link->handle()}; + string nested_metta_expression = + "(" + node_names[0] + " " + node_names[1] + " " + metta_expression + ")"; + auto link_with_nested = + new Link("Expression", nested_targets, true, {}, nested_metta_expression); + links.push_back(link_with_nested); + + if (i % chunck_size == 0) { + db->add_nodes(nodes, false, true); + db->add_links(links, false, true); + nodes.clear(); + links.clear(); + } + } + + if (!nodes.empty()) db->add_nodes(nodes, false, true); + if (!links.empty()) db->add_links(links, false, true); + + success_count++; + } catch (const exception& e) { + cout << "Thread " << thread_id << " failed with error: " << e.what() << endl; + } + }; + + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back(worker, i); + } + + for (auto& t : threads) { + t.join(); + } + + EXPECT_EQ(success_count, num_threads); + + // clang-format off + LinkSchema link_schema({ + "LINK_TEMPLATE", "Expression", "3", + "NODE", "Symbol", "ConcurrentAddLinks-0-1-1", + "NODE", "Symbol", "ConcurrentAddLinks-0-1-2", + "VARIABLE", "V" + }); + // clang-format on + + auto result = db->query_for_pattern(link_schema); + EXPECT_EQ(result->size(), 2); +} + int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); ::testing::AddGlobalTestEnvironment(new MorkDBTestEnvironment()); diff --git a/src/tests/cpp/redis_mongodb_test_2.cc b/src/tests/cpp/redis_mongodb_test_2.cc index bdf80ade..09dacb03 100644 --- a/src/tests/cpp/redis_mongodb_test_2.cc +++ b/src/tests/cpp/redis_mongodb_test_2.cc @@ -214,6 +214,75 @@ TEST_F(RedisMongoDBTest, MongodbDocumentGetSize) { EXPECT_EQ(link4_document->get_size("targets"), 10); } +TEST_F(RedisMongoDBTest, ConcurrentAddLinks) { + int num_links = 5000; + int arity = 3; + int chunck_size = 500; + + const int num_threads = 100; + vector threads; + atomic success_count{0}; + + auto worker = [&](int thread_id) { + try { + vector nodes; + vector links; + for (int i = 1; i <= num_links; i++) { + vector targets; + for (int j = 1; j <= arity; j++) { + string name = "ConcurrentAddLinks-" + to_string(thread_id) + "-" + to_string(i) + + "-" + to_string(j); + auto node = new Node("Symbol", name); + targets.push_back(node->handle()); + nodes.push_back(node); + } + auto link = new Link("Expression", targets); + links.push_back(link); + + vector nested_targets = {targets[0], targets[1], link->handle()}; + auto link_with_nested = new Link("Expression", nested_targets); + links.push_back(link_with_nested); + + if (i % chunck_size == 0) { + db2->add_nodes(nodes, false, true); + db2->add_links(links, false, true); + nodes.clear(); + links.clear(); + } + } + + if (!nodes.empty()) db2->add_nodes(nodes, false, true); + if (!links.empty()) db2->add_links(links, false, true); + + success_count++; + } catch (const exception& e) { + cout << "Thread " << thread_id << " failed with error: " << e.what() << endl; + } + }; + + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back(worker, i); + } + + for (auto& t : threads) { + t.join(); + } + + EXPECT_EQ(success_count, num_threads); + + // clang-format off + LinkSchema link_schema({ + "LINK_TEMPLATE", "Expression", "3", + "NODE", "Symbol", "ConcurrentAddLinks-0-1-1", + "NODE", "Symbol", "ConcurrentAddLinks-0-1-2", + "VARIABLE", "V" + }); + // clang-format on + + auto result = db2->query_for_pattern(link_schema); + EXPECT_EQ(result->size(), 2); +} + int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); ::testing::AddGlobalTestEnvironment(new RedisMongoDBTestEnvironment()); diff --git a/src/tests/cpp/test_commons/TestConfig.cc b/src/tests/cpp/test_commons/TestConfig.cc index a35aede1..5bbccfa0 100644 --- a/src/tests/cpp/test_commons/TestConfig.cc +++ b/src/tests/cpp/test_commons/TestConfig.cc @@ -9,7 +9,7 @@ string TestConfig::DAS_MONGODB_USERNAME = "admin"; string TestConfig::DAS_MONGODB_PASSWORD = "admin"; string TestConfig::DAS_DISABLE_ATOMDB_CACHE = "false"; string TestConfig::DAS_MORK_HOSTNAME = "localhost"; -string TestConfig::DAS_MORK_PORT = "8000"; +string TestConfig::DAS_MORK_PORT = "40022"; void TestConfig::load_environment(bool replace_existing) { setenv("DAS_REDIS_HOSTNAME", DAS_REDIS_HOSTNAME.c_str(), replace_existing);