Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): add message endpoint for inspector #490

Merged
merged 33 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
381cdb9
feat(core): add message endpoint for inspector
Alejandro-Morales Aug 7, 2024
4e6ffd2
fix: ruff check
Alejandro-Morales Aug 7, 2024
8f53362
fix: ruff check
Alejandro-Morales Aug 7, 2024
f4acd8c
Merge branch 'main' into feat/message-inspector
Alejandro-Morales Aug 15, 2024
bd8d4cc
feat: move file storage to memory storage
Alejandro-Morales Aug 15, 2024
f13f0c4
fix: ruff check
Alejandro-Morales Aug 16, 2024
c899ca5
Merge branch 'main' into feat/message-inspector
Alejandro-Morales Aug 16, 2024
2a1e27d
updates on Florians comments
Alejandro-Morales Aug 16, 2024
613c257
ruff format
Alejandro-Morales Aug 16, 2024
7a78622
Merge branch 'main' into feat/message-inspector
Alejandro-Morales Aug 16, 2024
7dd0098
feat: updates on florians comments and agent_info endpoint
Alejandro-Morales Aug 19, 2024
b40656a
fix: delete unused imports
Alejandro-Morales Aug 19, 2024
474d5bd
fix: ruff format check
Alejandro-Morales Aug 19, 2024
d244919
Merge branch 'main' into feat/message-inspector
Alejandro-Morales Aug 19, 2024
c6e5e62
feat: add retention mechanism
Alejandro-Morales Aug 19, 2024
5dc4996
Merge branch 'main' into feat/message-inspector
Alejandro-Morales Aug 21, 2024
f43ab1c
feat: Bureau integration
Alejandro-Morales Aug 22, 2024
46aea4c
try adding server headers
Archento Aug 22, 2024
e37ee6a
fix: format
Alejandro-Morales Aug 22, 2024
05f08e8
fix: model dump agent endpoint list
jrriehl Aug 23, 2024
0e3c4d1
loosen access control rules
Archento Aug 23, 2024
724ede6
Merge branch 'main' into feat/message-inspector
Alejandro-Morales Aug 28, 2024
e65f152
feat: Refactoring based on new REST updates
Alejandro-Morales Aug 28, 2024
39f87b5
fix: test and ruff check
Alejandro-Morales Aug 28, 2024
ead3242
minor corrections
Alejandro-Morales Aug 28, 2024
01b075f
fix rest example, make inspector opt-out, enforce reserved routes, wr…
Archento Aug 29, 2024
887c3f7
fix test
Archento Aug 29, 2024
2999472
simplify AgentEndpoint model
Archento Aug 29, 2024
bec7141
relocate cache ref to dispenser
Archento Aug 30, 2024
a593e94
update docs
Archento Aug 30, 2024
4bcf5e8
fix: small context inconsistency
Archento Aug 30, 2024
0c72136
another small revert in context
Archento Aug 30, 2024
a28162e
refactor retention mechanism
Archento Aug 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions python/src/uagents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,11 @@ def __init__(

if not self._use_mailbox:
self._server = ASGIServer(
self._port, self._loop, self._queries, logger=self._logger
self._port,
self._loop,
self._queries,
self._dispenser,
logger=self._logger,
)

# define default error message handler
Expand Down Expand Up @@ -851,13 +855,13 @@ def include(self, protocol: Protocol, publish_manifest: Optional[bool] = False):
if schema_digest in self._signed_message_handlers:
raise RuntimeError("Unable to register duplicate message handler")
if schema_digest in protocol.signed_message_handlers:
self._signed_message_handlers[schema_digest] = (
protocol.signed_message_handlers[schema_digest]
)
self._signed_message_handlers[
schema_digest
] = protocol.signed_message_handlers[schema_digest]
elif schema_digest in protocol.unsigned_message_handlers:
self._unsigned_message_handlers[schema_digest] = (
protocol.unsigned_message_handlers[schema_digest]
)
self._unsigned_message_handlers[
schema_digest
] = protocol.unsigned_message_handlers[schema_digest]
else:
raise RuntimeError("Unable to lookup up message handler in protocol")

Expand Down Expand Up @@ -1147,7 +1151,10 @@ def __init__(
self._port = port or 8000
self._queries: Dict[str, asyncio.Future] = {}
self._logger = get_logger("bureau", log_level)
self._server = ASGIServer(self._port, self._loop, self._queries, self._logger)
self._dispenser = Dispenser()
self._server = ASGIServer(
self._port, self._loop, self._queries, self._dispenser, self._logger
)
self._use_mailbox = False

if agents is not None:
Expand Down
32 changes: 30 additions & 2 deletions python/src/uagents/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pydantic
import uvicorn
from requests.structures import CaseInsensitiveDict
from uagents.communication import enclose_response_raw
from uagents.communication import Dispenser, enclose_response_raw
from uagents.config import RESPONSE_TIME_HINT_SECONDS
from uagents.context import ERROR_MESSAGE_DIGEST
from uagents.crypto import is_user_address
Expand Down Expand Up @@ -44,6 +44,7 @@ def __init__(
port: int,
loop: asyncio.AbstractEventLoop,
queries: Dict[str, asyncio.Future],
dispenser: Dispenser,
logger: Optional[Logger] = None,
):
"""
Expand All @@ -58,6 +59,7 @@ def __init__(
self._port = int(port)
self._loop = loop
self._queries = queries
self._dispenser = dispenser
self._logger = logger or get_logger("server")
self._server = None

Expand Down Expand Up @@ -149,6 +151,26 @@ async def handle_missing_content_type(self, headers: CaseInsensitiveDict, send):
}
)

async def handle_get_messages(self, send):
"""
Handle retrieval of stored messages.
"""
messages = dispatcher.received_messages + self._dispenser.sent_messages
messages.sort(key=lambda x: x["timestamp"])
response = {"messages": [msg for msg in messages]}
await send(
{
"type": "http.response.start",
"status": 200,
"headers": [
[b"content-type", b"application/json"],
],
}
)
await send(
{"type": "http.response.body", "body": json.dumps(response).encode()}
)

async def serve(self):
"""
Start the server.
Expand All @@ -173,6 +195,13 @@ async def __call__(self, scope, receive, send): # pylint: disable=too-many-bra

assert scope["type"] == "http"

request_method = scope["method"]
path = scope["path"]

if request_method == "GET" and path == "/messages":
await self.handle_get_messages(send)
return

if scope["path"] != "/submit":
await send(
{
Expand All @@ -190,7 +219,6 @@ async def __call__(self, scope, receive, send): # pylint: disable=too-many-bra

headers = CaseInsensitiveDict(scope.get("headers", {}))

request_method = scope["method"]
if request_method == "HEAD":
await self.handle_readiness_probe(headers, send)
return
Expand Down
12 changes: 12 additions & 0 deletions python/src/uagents/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(self):
self._envelopes: asyncio.Queue[
Tuple[Envelope, List[str], asyncio.Future, bool]
] = asyncio.Queue()
self.sent_messages = []

def add_envelope(
self,
Expand Down Expand Up @@ -104,6 +105,17 @@ async def run(self):
sync=sync,
)
response_future.set_result(result)

self.sent_messages.append({
"timestamp": time(),
"envelope": {
"sender": env.sender,
"target": env.target,
"schema_digest": env.schema_digest,
"payload": env.decode_payload(),
"session": str(env.session),
}
})
except Exception as err:
LOGGER.error(f"Failed to send envelope: {err}")

Expand Down
12 changes: 12 additions & 0 deletions python/src/uagents/dispatch.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import uuid
from abc import ABC, abstractmethod
from time import time
from typing import Dict, Set

JsonStr = str
Expand All @@ -24,6 +25,7 @@ class Dispatcher:

def __init__(self):
self._sinks: Dict[str, Set[Sink]] = {}
self.received_messages = []

@property
def sinks(self) -> Dict[str, Set[Sink]]:
Expand Down Expand Up @@ -56,5 +58,15 @@ async def dispatch(
for handler in self._sinks.get(destination, set()):
await handler.handle_message(sender, schema_digest, message, session)

self.received_messages.append({
"timestamp": time(),
"envelope": {
"sender": sender,
"target": destination,
"schema_digest": schema_digest,
"payload": message,
"session": str(session),
}
})

dispatcher = Dispatcher()
Loading