Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion 3rd_party_slots/python_client/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ ENV POETRY_VIRTUALENVS_IN_PROJECT=false

# Create the proto directory and download the .proto files
RUN mkdir -p /proto_files \
&& curl -o /proto_files/atom_space_node.proto https://raw.githubusercontent.com/singnet/das-proto/refs/heads/master/atom_space_node.proto \
&& curl -o /proto_files/distributed_algorithm_node.proto https://raw.githubusercontent.com/singnet/das-proto/refs/heads/master/distributed_algorithm_node.proto \
&& curl -o /proto_files/common.proto https://raw.githubusercontent.com/singnet/das-proto/refs/heads/master/common.proto

RUN useradd -ms /bin/bash builder
Expand Down
2 changes: 1 addition & 1 deletion 3rd_party_slots/python_client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pip install dist/python_client-0.1.0*.whl

If you prefer to build without Docker, you can do so locally, though this requires manual adjustments.

**Note:** The Docker method is preferred for its consistency, as the local build requires manual path adjustments and assumes the proto/ directory contains the necessary .proto files (atom_space_node.proto and common.proto).
**Note:** The Docker method is preferred for its consistency, as the local build requires manual path adjustments and assumes the proto/ directory contains the necessary .proto files (distributed_algorithm_node.proto and common.proto).

1. Install Poetry: [Follow the instructions at Poetry Installation](https://python-poetry.org/docs/#installation)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import grpc

from hyperon_das._grpc import atom_space_node_pb2
from hyperon_das._grpc.atom_space_node_pb2_grpc import AtomSpaceNodeStub
from hyperon_das._grpc import distributed_algorithm_node_pb2
from hyperon_das._grpc.distributed_algorithm_node_pb2_grpc import DistributedAlgorithmNodeStub
from hyperon_das.logger import log


Expand Down Expand Up @@ -81,9 +81,9 @@ def send_bus_command(self, command: str, args: list[str]) -> None:

def send(self, command: str, args: list[str], target_id: str) -> None:
with grpc.insecure_channel(target_id) as channel:
stub = AtomSpaceNodeStub(channel)
stub = DistributedAlgorithmNodeStub(channel)
log.debug(f"Sending command: {command} with args: {args} to target: {target_id}")
message = atom_space_node_pb2.MessageData(
message = distributed_algorithm_node_pb2.MessageData(
command=command, args=args, sender=self.node_id, is_broadcast=False, visited_recipients=[]
)
try:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import argparse
import sys
import time

from hyperon_das.commons.helpers import str_2_bool, tokenize_preserve_quotes
Expand Down
30 changes: 16 additions & 14 deletions 3rd_party_slots/python_client/hyperon_das/service_bus/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

import grpc

import hyperon_das._grpc.atom_space_node_pb2 as atom__space__node__pb2
import hyperon_das._grpc.atom_space_node_pb2_grpc as atom__space__node__pb2__grpc
import hyperon_das._grpc.common_pb2 as common__pb2
from hyperon_das._grpc.atom_space_node_pb2_grpc import AtomSpaceNodeStub
import hyperon_das._grpc.distributed_algorithm_node_pb2 as distributed__algorithm__node__pb2
import hyperon_das._grpc.distributed_algorithm_node_pb2_grpc as distributed__algorithm__node__pb2__grpc
from hyperon_das._grpc.distributed_algorithm_node_pb2_grpc import DistributedAlgorithmNodeStub
from hyperon_das.logger import log
from hyperon_das.service_bus.port_pool import PortPool

Expand All @@ -18,7 +18,7 @@ def __init__(self, command: str = None, args: list[str] = None) -> None:
self.command = command
self.args = args or []
self.proxy_port: int = 0
self.proxy_node: 'AtomSpaceNodeManager' = None
self.proxy_node: 'DistributedAlgorithmNodeManager' = None
self.requestor_id: str = None
self.serial: int = None

Expand All @@ -38,9 +38,9 @@ def setup_proxy_node(self, client_id: str = "", server_id: str = "") -> None:
id = self.requestor_id
requestor_host = id.split(":")[0]
requestor_id = requestor_host + ":" + str(self.proxy_port)
self.proxy_node = AtomSpaceNodeManager(node_id=requestor_id, server_id=server_id, proxy=self)
self.proxy_node = DistributedAlgorithmNodeManager(node_id=requestor_id, server_id=server_id, proxy=self)
else:
self.proxy_node = AtomSpaceNodeManager(node_id=client_id, server_id=server_id, proxy=self)
self.proxy_node = DistributedAlgorithmNodeManager(node_id=client_id, server_id=server_id, proxy=self)
self.proxy_node.peer_id = server_id
self.proxy_node.start()

Expand All @@ -58,7 +58,7 @@ def to_remote_peer(self, command: str, args: list[str]) -> None:
self.proxy_node.to_remote_peer(command, args)


class AtomSpaceNodeManager:
class DistributedAlgorithmNodeManager:
"""Manages the AtomSpace node, including the gRPC server lifecycle."""

PROXY_COMMAND = "bus_command_proxy"
Expand All @@ -68,11 +68,13 @@ def __init__(self, node_id: str, server_id: str, proxy: BusCommandProxy) -> None
self.peer_id = None
self.node_id = node_id
self.server_id = server_id
self.servicer = AtomSpaceNodeServicer(proxy)
self.servicer = DistributedAlgorithmNodeServicer(proxy)

def start(self):
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
atom__space__node__pb2__grpc.add_AtomSpaceNodeServicer_to_server(self.servicer, self.server)
distributed__algorithm__node__pb2__grpc.add_DistributedAlgorithmNodeServicer_to_server(
self.servicer, self.server
)
self.server.add_insecure_port(self.node_id)
self.server.start()

Expand All @@ -97,9 +99,9 @@ def to_remote_peer(self, command: str, args: list[str]) -> None:
new_args.append(command)

with grpc.insecure_channel(self.peer_id) as channel:
stub = AtomSpaceNodeStub(channel)
stub = DistributedAlgorithmNodeStub(channel)
log.debug(f"Sending command: {self.PROXY_COMMAND} to target: {self.peer_id}")
message = atom__space__node__pb2.MessageData(
message = distributed__algorithm__node__pb2.MessageData(
command=self.PROXY_COMMAND,
args=new_args,
sender=self.node_id,
Expand All @@ -112,7 +114,7 @@ def to_remote_peer(self, command: str, args: list[str]) -> None:
log.error(f"Failed to send message: {e}")


class AtomSpaceNodeServicer(atom__space__node__pb2__grpc.AtomSpaceNodeServicer):
class DistributedAlgorithmNodeServicer(distributed__algorithm__node__pb2__grpc.DistributedAlgorithmNodeServicer):
"""Implements the gRPC service for the AtomSpace node."""

def __init__(self, proxy: BusCommandProxy):
Expand All @@ -121,9 +123,9 @@ def __init__(self, proxy: BusCommandProxy):
def ping(self, request=None, context=None):
return common__pb2.Ack(error=False, msg="ack")

def execute_message(self, request: atom__space__node__pb2.MessageData, context=None):
def execute_message(self, request: distributed__algorithm__node__pb2.MessageData, context=None):
log.info(
f"Remote command: <{request.command}> arrived at AtomSpaceNodeServicer {self.proxy.proxy_node.node_id}"
f"Remote command: <{request.command}> arrived at DistributedAlgorithmNodeServicer {self.proxy.proxy_node.node_id}"
)
log.debug(
f"Request command: {request.command}, args: {request.args}, sender: {request.sender}, is_broadcast: {request.is_broadcast}, visited_recipients: {request.visited_recipients}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


class AtomDBProxy(BaseProxy):
COMMAND_SIZE_LIMIT = 50
COMMAND_SIZE_LIMIT = 5000

# Proxy Commands
ADD_ATOMS = "add_atoms"
Expand Down
2 changes: 1 addition & 1 deletion 3rd_party_slots/python_client/proto
6 changes: 3 additions & 3 deletions 3rd_party_slots/python_client/scripts/build_grpc_files.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,20 @@ else
PROTO_DIR=$1
fi

# git submodule update --init --recursive
# git submodule update --remote 3rd_party_slots/python_client/proto

poetry run \
python -m grpc_tools.protoc \
--proto_path="$PROTO_DIR" \
--python_out=$(pwd)/hyperon_das/_grpc \
--pyi_out=$(pwd)/hyperon_das/_grpc \
--grpc_python_out=$(pwd)/hyperon_das/_grpc \
"$PROTO_DIR"/atom_space_node.proto \
"$PROTO_DIR"/distributed_algorithm_node.proto \
"$PROTO_DIR"/common.proto

find "$(pwd)/hyperon_das/_grpc" -name '*_pb2*.py' | while read -r file; do
sed -i \
-e "s/import atom_space_node_pb2/import hyperon_das._grpc.atom_space_node_pb2/g" \
-e "s/import distributed_algorithm_node_pb2/import hyperon_das._grpc.distributed_algorithm_node_pb2/g" \
-e "s/import common_pb2/import hyperon_das._grpc.common_pb2/g" \
"$file"
done
22 changes: 13 additions & 9 deletions 3rd_party_slots/python_client/tests/unit/test_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
import grpc
import pytest

import hyperon_das._grpc.atom_space_node_pb2 as grpc_pb2
import hyperon_das._grpc.common_pb2 as common_pb2
import hyperon_das._grpc.distributed_algorithm_node_pb2 as grpc_pb2
from hyperon_das.query_answer import QueryAnswer
from hyperon_das.service_bus.proxy import AtomSpaceNodeManager, AtomSpaceNodeServicer, BusCommandProxy
from hyperon_das.service_bus.proxy import (
BusCommandProxy,
DistributedAlgorithmNodeManager,
DistributedAlgorithmNodeServicer,
)
from hyperon_das.service_clients.pattern_matching_query import PatternMatchingQueryProxy


Expand All @@ -29,7 +33,7 @@ def test_setup_proxy_node_raises_if_no_port(self):
with pytest.raises(RuntimeError):
proxy.setup_proxy_node()

@patch('hyperon_das.service_bus.proxy.AtomSpaceNodeManager')
@patch('hyperon_das.service_bus.proxy.DistributedAlgorithmNodeManager')
def test_setup_proxy_node_with_requestor_id(self, MockManager):
proxy = DummyProxy()
proxy.proxy_port = 1111
Expand All @@ -40,7 +44,7 @@ def test_setup_proxy_node_with_requestor_id(self, MockManager):
MockManager.assert_called_with(node_id=expected_node_id, server_id="srv", proxy=proxy)
instance.start.assert_called_once()

@patch('hyperon_das.service_bus.proxy.AtomSpaceNodeManager')
@patch('hyperon_das.service_bus.proxy.DistributedAlgorithmNodeManager')
def test_setup_proxy_node_with_client_override(self, MockManager):
proxy = DummyProxy()
proxy.proxy_port = 2222
Expand Down Expand Up @@ -110,7 +114,7 @@ def test_process_message_and_counters(self, proxy):
assert proxy2.abort_flag


class TestAtomSpaceNodeManager:
class TestDistributedAlgorithmNodeManager:
@pytest.fixture(autouse=True)
def stub_grpc(self, monkeypatch):
fake_server = MagicMock()
Expand All @@ -122,25 +126,25 @@ def stub_grpc(self, monkeypatch):
return fake_server

def test_start_adds_port_and_starts(self, stub_grpc):
mgr = AtomSpaceNodeManager(node_id="host:1234", server_id="srv", proxy=None)
mgr = DistributedAlgorithmNodeManager(node_id="host:1234", server_id="srv", proxy=None)
mgr.start()
stub_grpc.add_insecure_port.assert_called_with("host:1234")
stub_grpc.start.assert_called_once()

def test_stop_and_wait(self, stub_grpc):
mgr = AtomSpaceNodeManager(node_id="h:1", server_id="s", proxy=None)
mgr = DistributedAlgorithmNodeManager(node_id="h:1", server_id="s", proxy=None)
mgr.server = stub_grpc
mgr.stop(grace=5)
mgr.wait_for_termination()
stub_grpc.stop.assert_called_with(5)
stub_grpc.wait_for_termination.assert_called_once()


class TestAtomSpaceNodeServicer:
class TestDistributedAlgorithmNodeServicer:
@pytest.fixture(autouse=True)
def servicer(self):
proxy = MagicMock()
return AtomSpaceNodeServicer(proxy)
return DistributedAlgorithmNodeServicer(proxy)

def test_ping(self, servicer):
ack = servicer.ping()
Expand Down
2 changes: 1 addition & 1 deletion docs/database-adapter-users-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ If everything is configured correctly, the adapter will connect to the remote da
2025-12-03 14:06:44 | [INFO] | Port range: [50000 : 50999]
2025-12-03 14:06:44 | [INFO] | 0.0.0.0:9999 is issuing BUS command <atomdb>
2025-12-03 14:06:44 | [INFO] | BUS node 0.0.0.0:9999 is routing command <atomdb> to localhost:40007
2025-12-03 14:06:44 | [INFO] | Remote command: <node_joined_network> arrived at AtomSpaceNodeServicer 0.0.0.0:50000
2025-12-03 14:06:44 | [INFO] | Remote command: <node_joined_network> arrived at DistributedAlgorithmNodeServicer 0.0.0.0:50000
2025-12-03 14:06:51 | [INFO] | Mapped 879/87882 (1%) rows from the public.cvterm table
2025-12-03 14:06:52 | [INFO] | Mapped 1758/87882 (2%) rows from the public.cvterm table
2025-12-03 14:06:53 | [INFO] | Mapped 2637/87882 (3%) rows from the public.cvterm table
Expand Down