diff --git a/3rd_party_slots/python_client/hyperon_das/service_clients/atomdb.py b/3rd_party_slots/python_client/hyperon_das/service_clients/atomdb.py index eaa84943..b7db125f 100644 --- a/3rd_party_slots/python_client/hyperon_das/service_clients/atomdb.py +++ b/3rd_party_slots/python_client/hyperon_das/service_clients/atomdb.py @@ -7,8 +7,12 @@ class AtomDBProxy(BaseProxy): + COMMAND_SIZE_LIMIT = 50 + # Proxy Commands ADD_ATOMS = "add_atoms" + START_STREAM = "start_stream" + END_STREAM = "end_stream" def __init__(self) -> None: super().__init__() @@ -63,7 +67,7 @@ def flush(): return atoms - def add_atoms(self, atoms: list[Atom]) -> list[str]: + def add_atoms(self, atoms: list[Atom], use_streaming: bool = False) -> list[str]: """ Serialize atoms, send `ADD_ATOMS` to the remote peer, and return their handles. @@ -79,6 +83,10 @@ def add_atoms(self, atoms: list[Atom]) -> list[str]: """ args = [] handles = [] + stream_info = [str(len(atoms))] + + if use_streaming: + self.to_remote_peer(self.START_STREAM, stream_info) for atom in atoms: atom_type = "NODE" if atom.arity() == 0 else "LINK" @@ -86,6 +94,11 @@ def add_atoms(self, atoms: list[Atom]) -> list[str]: atom.tokenize(args) handles.append(atom.handle) - self.to_remote_peer(self.ADD_ATOMS, args) + if len(args) > self.COMMAND_SIZE_LIMIT or atom == atoms[-1]: + self.to_remote_peer(self.ADD_ATOMS, args) + args.clear() + + if use_streaming: + self.to_remote_peer(self.END_STREAM, []) return handles diff --git a/3rd_party_slots/python_client/tests/integration/test_atomdb.py b/3rd_party_slots/python_client/tests/integration/test_atomdb.py index 7bc83a9b..cbf0664c 100644 --- a/3rd_party_slots/python_client/tests/integration/test_atomdb.py +++ b/3rd_party_slots/python_client/tests/integration/test_atomdb.py @@ -1,4 +1,5 @@ import concurrent.futures +import time import uuid from hyperon_das.commons.atoms import Link, Node @@ -76,6 +77,8 @@ def test_atomdb_proxy_simple_client(): proxy.add_atoms(atoms) + time.sleep(1) # Wait a moment for the data to be committed to the database + decoder = AtomDecoder() node_db1 = decoder.get_atom(node1.handle) diff --git a/3rd_party_slots/python_client/tests/integration/utils.py b/3rd_party_slots/python_client/tests/integration/utils.py index 27ca4e59..a3cffa59 100644 --- a/3rd_party_slots/python_client/tests/integration/utils.py +++ b/3rd_party_slots/python_client/tests/integration/utils.py @@ -6,9 +6,9 @@ from hyperon_das.logger import log DAS_MONGODB_HOSTNAME = "0.0.0.0" -DAS_MONGODB_PORT = 28000 -DAS_MONGODB_USERNAME = "dbadmin" -DAS_MONGODB_PASSWORD = "dassecret" +DAS_MONGODB_PORT = 40021 +DAS_MONGODB_USERNAME = "admin" +DAS_MONGODB_PASSWORD = "admin" class AtomDecoder(HandleDecoder):