Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/run-coverage-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
DAS_MONGODB_USERNAME: dbadmin
DAS_MONGODB_PASSWORD: dassecret
DAS_MORK_HOSTNAME: localhost
DAS_MORK_PORT: 8000
DAS_MORK_PORT: 40022
steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down Expand Up @@ -66,7 +66,7 @@ jobs:
run: |-
make run-mork-server > /dev/null 2>&1 &

until curl --silent http://localhost:8000/status/-; do
until curl --silent http://localhost:40022/status/-; do
echo "Waiting MORK…"
sleep 1
done
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/run-unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
DAS_MONGODB_USERNAME: admin
DAS_MONGODB_PASSWORD: admin
DAS_MORK_HOSTNAME: localhost
DAS_MORK_PORT: 8000
DAS_MORK_PORT: 40022
steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down Expand Up @@ -66,7 +66,7 @@ jobs:
run: |-
make run-mork-server > /dev/null 2>&1 &

until curl --silent http://localhost:8000/status/-; do
until curl --silent http://localhost:40022/status/-; do
echo "Waiting MORK…"
sleep 1
done
Expand Down
14 changes: 13 additions & 1 deletion src/atomdb/morkdb/MorkDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ vector<string> MorkDB::add_links(const vector<atoms::Link*>& links,
bool throw_if_exists,
bool is_transactional) {
if (links.empty()) {
if (is_transactional) {
lock_guard<mutex> composite_type_hashes_map_lock(this->composite_type_hashes_map_mutex);
this->composite_type_hashes_map.clear();
}
return {};
}

Expand All @@ -210,8 +214,11 @@ vector<string> MorkDB::add_links(const vector<atoms::Link*>& links,
}

map<string, vector<string>> composite_type_entries_map;
map<string, string> composite_type_hashes_map_copy;
if (is_transactional) {
this->build_composite_type_entries_map(links, composite_type_entries_map);
lock_guard<mutex> composite_type_hashes_map_lock(this->composite_type_hashes_map_mutex);
composite_type_hashes_map_copy = this->composite_type_hashes_map;
} else {
this->check_existing_targets(links);
}
Expand Down Expand Up @@ -239,7 +246,7 @@ vector<string> MorkDB::add_links(const vector<atoms::Link*>& links,
if (is_transactional) {
mongodb_doc = make_shared<atomdb_api_types::MongodbDocument>(
link,
this->composite_type_hashes_map[link_handle],
composite_type_hashes_map_copy[link_handle],
composite_type_entries_map[link_handle]);
} else {
mongodb_doc = make_shared<atomdb_api_types::MongodbDocument>(link, *this);
Expand All @@ -258,6 +265,11 @@ vector<string> MorkDB::add_links(const vector<atoms::Link*>& links,
this->mork_client->post(metta_expressions, "$x", "$x");
}

if (is_transactional) {
lock_guard<mutex> composite_type_hashes_map_lock(this->composite_type_hashes_map_mutex);
this->composite_type_hashes_map.clear();
}

return handles;
}

Expand Down
4 changes: 3 additions & 1 deletion src/main/tests_db_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ int main(int argc, char* argv[]) {

for (int i = 1; i <= num_links; i++) {
string metta_expression = "(";
vector<string> node_names;
vector<string> targets;
for (int j = 1; j <= arity; j++) {
string name = "add-links-" + to_string(i) + "-" + to_string(j);
auto node = new Node("Symbol", name);
targets.push_back(node->handle());
nodes.push_back(node);
node_names.push_back(name);
metta_expression += name;
if (j != arity) {
metta_expression += " ";
Expand All @@ -82,7 +84,7 @@ int main(int argc, char* argv[]) {

vector<string> nested_targets = {targets[0], targets[1], link->handle()};
string nested_metta_expression =
"(" + targets[0] + " " + targets[1] + " " + metta_expression + ")";
"(" + node_names[0] + " " + node_names[1] + " " + metta_expression + ")";
auto link_with_nested =
new Link("Expression", nested_targets, true, {}, nested_metta_expression);
links.push_back(link_with_nested);
Expand Down
6 changes: 3 additions & 3 deletions src/scripts/mork_loader.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ else
FILE=$1
fi

IMAGE_NAME="trueagi/das:mork-loader-0.10.2"
IMAGE_NAME="trueagi/das:mork-loader-1.0.0"
CONTAINER_NAME="das-mork-loader-$(date +%Y%m%d%H%M%S)"

docker run --rm \
--name="${CONTAINER_NAME}" \
--network host \
--env MORK_SERVER_ADDR=${DAS_MORK_HOSTNAME:-localhost} \
--env MORK_SERVER_PORT=${DAS_MORK_PORT:-8000} \
--env MORK_SERVER_ADDR=${DAS_MORK_HOSTNAME:-0.0.0.0} \
--env MORK_SERVER_PORT=${DAS_MORK_PORT:-40022} \
--volume "${FILE}":/app/file.metta \
--volume `pwd`/src/scripts/mork_loader.py:/app/mork_loader.py \
--workdir /app \
Expand Down
4 changes: 3 additions & 1 deletion src/scripts/mork_server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

set -eoux pipefail

IMAGE_NAME="trueagi/das:mork-server-0.10.2"
IMAGE_NAME="trueagi/das:mork-server-1.0.0"
CONTAINER_NAME="das-mork-server-$(date +%Y%m%d%H%M%S)"

docker run --rm \
--name="${CONTAINER_NAME}" \
--network host \
-e MORK_SERVER_ADDR=0.0.0.0 \
-e MORK_SERVER_PORT=${DAS_MORK_PORT:-40022} \
"${IMAGE_NAME}" "$@"

sleep 1
Expand Down
4 changes: 2 additions & 2 deletions src/tests/cpp/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ cc_test(

cc_test(
name = "redis_mongodb_test_2",
size = "small",
size = "medium",
srcs = [
"redis_mongodb_test_2.cc",
"test_utils.cc",
Expand Down Expand Up @@ -690,7 +690,7 @@ cc_test(

cc_test(
name = "morkdb_test",
size = "small",
size = "medium",
srcs = ["morkdb_test.cc"],
copts = [
"-Iexternal/gtest/googletest/include",
Expand Down
79 changes: 79 additions & 0 deletions src/tests/cpp/morkdb_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,85 @@ TEST_F(MorkDBTest, AddLinksWithDuplicateTargets) {
EXPECT_FALSE(db->delete_link(link->handle(), true));
}

TEST_F(MorkDBTest, ConcurrentAddLinks) {
int num_links = 5000;
int arity = 3;
int chunck_size = 500;

const int num_threads = 100;
vector<thread> threads;
atomic<int> success_count{0};

auto worker = [&](int thread_id) {
try {
vector<Node*> nodes;
vector<Link*> links;
for (int i = 1; i <= num_links; i++) {
string metta_expression = "(";
vector<string> targets;
vector<string> node_names;
for (int j = 1; j <= arity; j++) {
string name = "ConcurrentAddLinks-" + to_string(thread_id) + "-" + to_string(i) +
"-" + to_string(j);
auto node = new Node("Symbol", name);
targets.push_back(node->handle());
nodes.push_back(node);
node_names.push_back(name);
metta_expression += name;
if (j != arity) metta_expression += " ";
}
metta_expression += ")";

auto link = new Link("Expression", targets, true, {}, metta_expression);
links.push_back(link);

vector<string> nested_targets = {targets[0], targets[1], link->handle()};
string nested_metta_expression =
"(" + node_names[0] + " " + node_names[1] + " " + metta_expression + ")";
auto link_with_nested =
new Link("Expression", nested_targets, true, {}, nested_metta_expression);
links.push_back(link_with_nested);

if (i % chunck_size == 0) {
db->add_nodes(nodes, false, true);
db->add_links(links, false, true);
nodes.clear();
links.clear();
}
}

if (!nodes.empty()) db->add_nodes(nodes, false, true);
if (!links.empty()) db->add_links(links, false, true);

success_count++;
} catch (const exception& e) {
cout << "Thread " << thread_id << " failed with error: " << e.what() << endl;
}
};

for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(worker, i);
}

for (auto& t : threads) {
t.join();
}

EXPECT_EQ(success_count, num_threads);

// clang-format off
LinkSchema link_schema({
"LINK_TEMPLATE", "Expression", "3",
"NODE", "Symbol", "ConcurrentAddLinks-0-1-1",
"NODE", "Symbol", "ConcurrentAddLinks-0-1-2",
"VARIABLE", "V"
});
// clang-format on

auto result = db->query_for_pattern(link_schema);
EXPECT_EQ(result->size(), 2);
}

int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
::testing::AddGlobalTestEnvironment(new MorkDBTestEnvironment());
Expand Down
69 changes: 69 additions & 0 deletions src/tests/cpp/redis_mongodb_test_2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,75 @@ TEST_F(RedisMongoDBTest, MongodbDocumentGetSize) {
EXPECT_EQ(link4_document->get_size("targets"), 10);
}

TEST_F(RedisMongoDBTest, ConcurrentAddLinks) {
int num_links = 5000;
int arity = 3;
int chunck_size = 500;

const int num_threads = 100;
vector<thread> threads;
atomic<int> success_count{0};

auto worker = [&](int thread_id) {
try {
vector<Node*> nodes;
vector<Link*> links;
for (int i = 1; i <= num_links; i++) {
vector<string> targets;
for (int j = 1; j <= arity; j++) {
string name = "ConcurrentAddLinks-" + to_string(thread_id) + "-" + to_string(i) +
"-" + to_string(j);
auto node = new Node("Symbol", name);
targets.push_back(node->handle());
nodes.push_back(node);
}
auto link = new Link("Expression", targets);
links.push_back(link);

vector<string> nested_targets = {targets[0], targets[1], link->handle()};
auto link_with_nested = new Link("Expression", nested_targets);
links.push_back(link_with_nested);

if (i % chunck_size == 0) {
db2->add_nodes(nodes, false, true);
db2->add_links(links, false, true);
nodes.clear();
links.clear();
}
}

if (!nodes.empty()) db2->add_nodes(nodes, false, true);
if (!links.empty()) db2->add_links(links, false, true);

success_count++;
} catch (const exception& e) {
cout << "Thread " << thread_id << " failed with error: " << e.what() << endl;
}
};

for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(worker, i);
}

for (auto& t : threads) {
t.join();
}

EXPECT_EQ(success_count, num_threads);

// clang-format off
LinkSchema link_schema({
"LINK_TEMPLATE", "Expression", "3",
"NODE", "Symbol", "ConcurrentAddLinks-0-1-1",
"NODE", "Symbol", "ConcurrentAddLinks-0-1-2",
"VARIABLE", "V"
});
// clang-format on

auto result = db2->query_for_pattern(link_schema);
EXPECT_EQ(result->size(), 2);
}

int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
::testing::AddGlobalTestEnvironment(new RedisMongoDBTestEnvironment());
Expand Down
2 changes: 1 addition & 1 deletion src/tests/cpp/test_commons/TestConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ string TestConfig::DAS_MONGODB_USERNAME = "admin";
string TestConfig::DAS_MONGODB_PASSWORD = "admin";
string TestConfig::DAS_DISABLE_ATOMDB_CACHE = "false";
string TestConfig::DAS_MORK_HOSTNAME = "localhost";
string TestConfig::DAS_MORK_PORT = "8000";
string TestConfig::DAS_MORK_PORT = "40022";

void TestConfig::load_environment(bool replace_existing) {
setenv("DAS_REDIS_HOSTNAME", DAS_REDIS_HOSTNAME.c_str(), replace_existing);
Expand Down