Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ http_archive = use_repo_rule("@bazel_tools//tools/build_defs/repo:http.bzl", "ht

http_archive(
name = "com_github_singnet_das_proto",
strip_prefix = "das-proto-0.1.18",
urls = ["https://github.com/singnet/das-proto/archive/refs/tags/0.1.18.tar.gz"],
strip_prefix = "das-proto-0.1.19",
urls = ["https://github.com/singnet/das-proto/archive/refs/tags/0.1.19.tar.gz"],
)

# Deps for formatting and linting BUILD files
Expand Down
2 changes: 1 addition & 1 deletion src/distributed_algorithm_node/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ cc_library(
"//commons:commons_lib",
"@com_github_grpc_grpc//:grpc++",
"@com_github_grpc_grpc//:grpc++_reflection",
"@com_github_singnet_das_proto//:atom_space_node_cc_grpc",
"@com_github_singnet_das_proto//:distributed_algorithm_node_cc_grpc",
],
)

Expand Down
30 changes: 8 additions & 22 deletions src/distributed_algorithm_node/MessageBroker.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include "MessageBroker.h"

#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
Expand All @@ -6,16 +8,10 @@
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/security/credentials.h>

#include "common.pb.h"

// TODO: Once das-proto is updated, update atom_space_node to distributed_algorithm_node

// #include "distributed_algorithm_node.grpc.pb.h"
// #include "distributed_algorithm_node.pb.h"
#include "MessageBroker.h"
#include "Utils.h"
#include "atom_space_node.grpc.pb.h"
#include "atom_space_node.pb.h"
#include "common.pb.h"
#include "distributed_algorithm_node.grpc.pb.h"
#include "distributed_algorithm_node.pb.h"

#define LOG_LEVEL INFO_LEVEL
#include "Logger.h"
Expand Down Expand Up @@ -214,11 +210,7 @@ void SynchronousGRPC::inbox_thread_method(shared_ptr<StoppableThread> monitor) {
this->peers_mutex.lock();
for (auto target : this->peers) {
if (visited.find(target) == visited.end()) {
// TODO: Once das-proto is updated, update atom_space_node to
// distributed_algorithm_node auto stub =
// dasproto::DistributedAlgorithmNode::NewStub(grpc::CreateChannel(target,
// grpc::InsecureChannelCredentials()));
auto stub = dasproto::AtomSpaceNode::NewStub(
auto stub = dasproto::DistributedAlgorithmNode::NewStub(
grpc::CreateChannel(target, grpc::InsecureChannelCredentials()));
stub->execute_message(&(context[cursor]), *message_data, &(reply[cursor]));
cursor++;
Expand Down Expand Up @@ -377,10 +369,7 @@ void SynchronousGRPC::send(const string& command, const vector<string>& args, co
message_data.set_is_broadcast(false);
dasproto::Empty reply;
grpc::ClientContext context;
// TODO: Once das-proto is updated, update atom_space_node to distributed_algorithm_node
// auto stub = dasproto::DistributedAlgorithmNode::NewStub(grpc::CreateChannel(recipient,
// grpc::InsecureChannelCredentials()));
auto stub = dasproto::AtomSpaceNode::NewStub(
auto stub = dasproto::DistributedAlgorithmNode::NewStub(
grpc::CreateChannel(recipient, grpc::InsecureChannelCredentials()));
stub->execute_message(&context, message_data, &reply);
}
Expand All @@ -404,10 +393,7 @@ void SynchronousGRPC::broadcast(const string& command, const vector<string>& arg
message_data.set_sender(this->node_id);
message_data.set_is_broadcast(true);
message_data.add_visited_recipients(this->node_id);
// TODO: Once das-proto is updated, update atom_space_node to distributed_algorithm_node
// auto stub = dasproto::DistributedAlgorithmNode::NewStub(grpc::CreateChannel(peer_id,
// grpc::InsecureChannelCredentials()));
auto stub = dasproto::AtomSpaceNode::NewStub(
auto stub = dasproto::DistributedAlgorithmNode::NewStub(
grpc::CreateChannel(peer_id, grpc::InsecureChannelCredentials()));
stub->execute_message(&(context[cursor]), message_data, &(reply[cursor]));
cursor++;
Expand Down
10 changes: 2 additions & 8 deletions src/distributed_algorithm_node/MessageBroker.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,11 @@
#include <unordered_set>
#include <vector>

// TODO: Once das-proto is updated, update atom_space_node to distributed_algorithm_node

#include "atom_space_node.grpc.pb.h"
// #include "distributed_algorithm_node.grpc.pb.h"

#include "Message.h"
#include "SharedQueue.h"
#include "Stoppable.h"
#include "StoppableThread.h"
#include "distributed_algorithm_node.grpc.pb.h"

using namespace std;
using namespace commons;
Expand Down Expand Up @@ -246,9 +242,7 @@ class SynchronousSharedRAM : public MessageBroker {
* answer is supposed to be implemented as a separate Message going back from the target node to
* the node that originated the request.
*/
// TODO: Once das-proto is updated, update atom_space_node to distributed_algorithm_node
// class SynchronousGRPC : public MessageBroker, public dasproto::DistributedAlgorithmNode::Service {
class SynchronousGRPC : public MessageBroker, public dasproto::AtomSpaceNode::Service {
class SynchronousGRPC : public MessageBroker, public dasproto::DistributedAlgorithmNode::Service {
public:
/**
* Basic constructor
Expand Down
11 changes: 5 additions & 6 deletions src/tests/cpp/pattern_matching_query_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void check_query(const string& query_tag,

// giving time to the server to close the previous connection
// otherwise the test fails with "Node ID already in the network"
Utils::sleep(3000);
Utils::sleep();

client_bus->issue_bus_command(proxy2);
while (!proxy2->finished()) {
Expand All @@ -113,7 +113,7 @@ void check_query(const string& query_tag,

// giving time to the server to close the previous connection
// otherwise the test fails with "Node ID already in the network"
Utils::sleep(3000);
Utils::sleep();

if (metta_expression != "") {
client_bus->issue_bus_command(proxy3);
Expand Down Expand Up @@ -147,11 +147,11 @@ TEST(PatternMatchingQuery, queries) {
string peer2_id = "localhost:40042";

ServiceBus* server_bus = new ServiceBus(peer1_id);
Utils::sleep(500);
Utils::sleep();
server_bus->register_processor(make_shared<PatternMatchingQueryProcessor>());
Utils::sleep(500);
Utils::sleep();
ServiceBus* client_bus = new ServiceBus(peer2_id, peer1_id);
Utils::sleep(500);
Utils::sleep();

// clang-format off
vector<string> q1 = {
Expand Down Expand Up @@ -284,6 +284,5 @@ TEST(PatternMatchingQuery, queries) {
}
EXPECT_EQ(count, 1);

Utils::sleep(2000);
// clang-format on
}