From f77f934c04079c1002f32aa0d36f9c077f2c179e Mon Sep 17 00:00:00 2001 From: James Riehl Date: Wed, 11 Dec 2024 16:07:19 +0000 Subject: [PATCH 1/7] chore(core): always return envelope on sync responses --- python/src/uagents/communication.py | 23 +---------------------- python/src/uagents/context.py | 11 ++++------- 2 files changed, 5 insertions(+), 29 deletions(-) diff --git a/python/src/uagents/communication.py b/python/src/uagents/communication.py index d43f5477..2d6eb9e7 100644 --- a/python/src/uagents/communication.py +++ b/python/src/uagents/communication.py @@ -138,7 +138,7 @@ async def send_exchange_envelope( ) if not verified: continue - return await dispatch_sync_response_envelope(env) + return env return MsgStatus( status=DeliveryStatus.DELIVERED, detail="Message successfully delivered via HTTP", @@ -165,27 +165,6 @@ async def send_exchange_envelope( ) -async def dispatch_sync_response_envelope(env: Envelope) -> Union[MsgStatus, Envelope]: - """Dispatch a synchronous response envelope locally.""" - # If there are no sinks registered, return the envelope back to the caller - if len(dispatcher.sinks) == 0: - return env - await dispatcher.dispatch_msg( - env.sender, - env.target, - env.schema_digest, - env.decode_payload(), - env.session, - ) - return MsgStatus( - status=DeliveryStatus.DELIVERED, - detail="Sync message successfully delivered via HTTP", - destination=env.target, - endpoint="", - session=env.session, - ) - - async def send_message_raw( destination: str, message_schema_digest: str, diff --git a/python/src/uagents/context.py b/python/src/uagents/context.py index d2c8158d..df0dea53 100644 --- a/python/src/uagents/context.py +++ b/python/src/uagents/context.py @@ -27,7 +27,6 @@ from uagents.communication import ( Dispenser, dispatch_local_message, - dispatch_sync_response_envelope, ) from uagents.config import ( ALMANAC_API_URL, @@ -382,7 +381,8 @@ async def broadcast( ] ) log(self.logger, logging.DEBUG, f"Sent {len(futures)} messages") - return futures + + return futures # type: ignore def _is_valid_interval_message(self, schema_digest: str) -> bool: """ @@ -404,7 +404,7 @@ async def send( message: Model, sync: bool = False, timeout: int = DEFAULT_ENVELOPE_TIMEOUT_SECONDS, - ) -> MsgStatus: + ) -> Union[MsgStatus, Envelope]: """ This is the pro-active send method which is used in on_event and on_interval methods. In these methods, interval messages are set but @@ -441,7 +441,7 @@ async def send_raw( timeout: int = DEFAULT_ENVELOPE_TIMEOUT_SECONDS, protocol_digest: Optional[str] = None, queries: Optional[Dict[str, asyncio.Future]] = None, - ) -> MsgStatus: + ) -> Union[MsgStatus, Envelope]: # Extract address from destination agent identifier if present _, parsed_name, parsed_address = parse_identifier(destination) @@ -521,9 +521,6 @@ async def send_raw( session=self._session, ) - if isinstance(result, Envelope): - return await dispatch_sync_response_envelope(result) - return result def _queue_envelope( From a533c59fdd41391b75130c1e365a96120d5bcde7 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Wed, 11 Dec 2024 16:22:24 +0000 Subject: [PATCH 2/7] feat: add timeout to sync response --- python/docs/api/uagents/communication.md | 11 ----------- python/docs/api/uagents/context.md | 10 ++++++---- python/src/uagents/asgi.py | 6 ++++-- 3 files changed, 10 insertions(+), 17 deletions(-) diff --git a/python/docs/api/uagents/communication.md b/python/docs/api/uagents/communication.md index e0a53273..8bd61b1a 100644 --- a/python/docs/api/uagents/communication.md +++ b/python/docs/api/uagents/communication.md @@ -80,17 +80,6 @@ Method to send an exchange envelope. Union[MsgStatus, Envelope]: Either the status of the message or the response envelope. - - -#### dispatch`_`sync`_`response`_`envelope - -```python -async def dispatch_sync_response_envelope( - env: Envelope) -> Union[MsgStatus, Envelope] -``` - -Dispatch a synchronous response envelope locally. - #### send`_`message`_`raw diff --git a/python/docs/api/uagents/context.md b/python/docs/api/uagents/context.md index 2a46794c..3a0bfad3 100644 --- a/python/docs/api/uagents/context.md +++ b/python/docs/api/uagents/context.md @@ -315,10 +315,12 @@ Please use the `ctx.agent.address` property instead. #### send ```python -async def send(destination: str, - message: Model, - sync: bool = False, - timeout: int = DEFAULT_ENVELOPE_TIMEOUT_SECONDS) -> MsgStatus +async def send( + destination: str, + message: Model, + sync: bool = False, + timeout: int = DEFAULT_ENVELOPE_TIMEOUT_SECONDS +) -> Union[MsgStatus, Envelope] ``` This is the pro-active send method which is used in on_event and diff --git a/python/src/uagents/asgi.py b/python/src/uagents/asgi.py index d3d14814..21c67240 100644 --- a/python/src/uagents/asgi.py +++ b/python/src/uagents/asgi.py @@ -18,7 +18,7 @@ from requests.structures import CaseInsensitiveDict from uagents.communication import enclose_response_raw -from uagents.config import RESPONSE_TIME_HINT_SECONDS +from uagents.config import DEFAULT_ENVELOPE_TIMEOUT_SECONDS, RESPONSE_TIME_HINT_SECONDS from uagents.context import ERROR_MESSAGE_DIGEST from uagents.crypto import is_user_address from uagents.dispatch import dispatcher @@ -361,7 +361,9 @@ async def __call__(self, scope, receive, send): # pylint: disable=too-many-bra # wait for any queries to be resolved if expects_response: - response_msg, schema_digest = await self._queries[env.sender] + response_msg, schema_digest = await asyncio.wait_for( + self._queries[env.sender], DEFAULT_ENVELOPE_TIMEOUT_SECONDS + ) if (env.expires is not None) and ( datetime.now() > datetime.fromtimestamp(env.expires) ): From 7eb4609933dc5b9eebdf72e4dcef07ab78b6e310 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Wed, 11 Dec 2024 16:41:27 +0000 Subject: [PATCH 3/7] chore: set timeout to envelope expiry --- python/src/uagents/asgi.py | 17 +++++++++++------ python/src/uagents/communication.py | 4 ++-- python/src/uagents/context.py | 2 +- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/python/src/uagents/asgi.py b/python/src/uagents/asgi.py index 21c67240..0e5e993d 100644 --- a/python/src/uagents/asgi.py +++ b/python/src/uagents/asgi.py @@ -1,6 +1,6 @@ import asyncio import json -from datetime import datetime +from datetime import datetime, timezone from logging import Logger from typing import ( Any, @@ -361,12 +361,17 @@ async def __call__(self, scope, receive, send): # pylint: disable=too-many-bra # wait for any queries to be resolved if expects_response: - response_msg, schema_digest = await asyncio.wait_for( - self._queries[env.sender], DEFAULT_ENVELOPE_TIMEOUT_SECONDS + timeout = ( + env.expires - datetime.now(timezone.utc).timestamp() + if env.expires + else None ) - if (env.expires is not None) and ( - datetime.now() > datetime.fromtimestamp(env.expires) - ): + try: + response_msg, schema_digest = await asyncio.wait_for( + self._queries[env.sender], + timeout or DEFAULT_ENVELOPE_TIMEOUT_SECONDS, + ) + except asyncio.TimeoutError: response_msg = ErrorMessage( error="Query envelope expired" ).model_dump_json() diff --git a/python/src/uagents/communication.py b/python/src/uagents/communication.py index 2d6eb9e7..6e7b666d 100644 --- a/python/src/uagents/communication.py +++ b/python/src/uagents/communication.py @@ -3,7 +3,7 @@ import asyncio import logging import uuid -from time import time +from datetime import datetime, timezone from typing import List, Optional, Tuple, Type, Union import aiohttp @@ -222,7 +222,7 @@ async def send_message_raw( target=destination_address, session=uuid.uuid4(), schema_digest=message_schema_digest, - expires=int(time()) + timeout, + expires=int(datetime.now(timezone.utc).timestamp()) + timeout, ) env.encode_payload(message_body) if not is_user_address(sender_address) and isinstance(sender, Identity): diff --git a/python/src/uagents/context.py b/python/src/uagents/context.py index df0dea53..6fbdcbb6 100644 --- a/python/src/uagents/context.py +++ b/python/src/uagents/context.py @@ -623,7 +623,7 @@ async def send( message: Model, sync: bool = False, timeout: int = DEFAULT_ENVELOPE_TIMEOUT_SECONDS, - ) -> MsgStatus: + ) -> Union[MsgStatus, Envelope]: """ Send a message to the specified destination. From 0f24bd7dddf09528ce15b3a75d6c113903868a61 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Wed, 11 Dec 2024 16:45:11 +0000 Subject: [PATCH 4/7] docs: generate --- python/docs/api/uagents/context.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/python/docs/api/uagents/context.md b/python/docs/api/uagents/context.md index 3a0bfad3..fa52d726 100644 --- a/python/docs/api/uagents/context.md +++ b/python/docs/api/uagents/context.md @@ -376,10 +376,12 @@ Initialize the ExternalContext instance and attributes needed from the InternalC #### send ```python -async def send(destination: str, - message: Model, - sync: bool = False, - timeout: int = DEFAULT_ENVELOPE_TIMEOUT_SECONDS) -> MsgStatus +async def send( + destination: str, + message: Model, + sync: bool = False, + timeout: int = DEFAULT_ENVELOPE_TIMEOUT_SECONDS +) -> Union[MsgStatus, Envelope] ``` Send a message to the specified destination. From f59d9a442efb2203bee056c58c1313293ffa9632 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Wed, 11 Dec 2024 16:47:00 +0000 Subject: [PATCH 5/7] chore: minor timeout update --- python/src/uagents/asgi.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/src/uagents/asgi.py b/python/src/uagents/asgi.py index 0e5e993d..cabf27da 100644 --- a/python/src/uagents/asgi.py +++ b/python/src/uagents/asgi.py @@ -364,12 +364,12 @@ async def __call__(self, scope, receive, send): # pylint: disable=too-many-bra timeout = ( env.expires - datetime.now(timezone.utc).timestamp() if env.expires - else None + else DEFAULT_ENVELOPE_TIMEOUT_SECONDS ) try: response_msg, schema_digest = await asyncio.wait_for( self._queries[env.sender], - timeout or DEFAULT_ENVELOPE_TIMEOUT_SECONDS, + timeout, ) except asyncio.TimeoutError: response_msg = ErrorMessage( From 16772eb28d770b3d643bd949332f141df1044ebb Mon Sep 17 00:00:00 2001 From: James Riehl Date: Wed, 11 Dec 2024 17:25:24 +0000 Subject: [PATCH 6/7] chore: timestamp update --- python/src/uagents/context.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/src/uagents/context.py b/python/src/uagents/context.py index 6fbdcbb6..2f64670e 100644 --- a/python/src/uagents/context.py +++ b/python/src/uagents/context.py @@ -6,7 +6,7 @@ import logging import uuid from abc import ABC, abstractmethod -from time import time +from datetime import datetime, timezone from typing import ( TYPE_CHECKING, Any, @@ -489,7 +489,7 @@ async def send_raw( ) # Calculate when the envelope expires - expires = int(time()) + timeout + expires = int(datetime.now(timezone.utc).timestamp()) + timeout # Handle external dispatch of messages env = Envelope( From f61798033792c54a7deddb76e494eed6e87e9a4f Mon Sep 17 00:00:00 2001 From: Dacksus Date: Fri, 13 Dec 2024 12:03:14 +0100 Subject: [PATCH 7/7] include models in context to build the response model to a sync call --- python/src/uagents/agent.py | 1 + python/src/uagents/context.py | 18 +++++++++++++++--- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index 95711930..9e329c1a 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -442,6 +442,7 @@ def _build_context(self) -> InternalContext: interval_messages=self._interval_messages, wallet_messaging_client=self._wallet_messaging_client, logger=self._logger, + models=self._models, ) def _initialize_wallet_and_identity(self, seed, name, wallet_key_derivation_index): diff --git a/python/src/uagents/context.py b/python/src/uagents/context.py index 2f64670e..4657d999 100644 --- a/python/src/uagents/context.py +++ b/python/src/uagents/context.py @@ -261,6 +261,7 @@ def __init__( interval_messages: Optional[Set[str]] = None, wallet_messaging_client: Optional[Any] = None, logger: Optional[logging.Logger] = None, + models: Optional[Dict[str, Type[Model]]] = None, ): self._agent = agent self._storage = storage @@ -272,6 +273,7 @@ def __init__( self._interval_messages = interval_messages self._wallet_messaging_client = wallet_messaging_client self._outbound_messages: Dict[str, Tuple[JsonStr, str]] = {} + self._models = models @property def agent(self) -> AgentRepresentation: @@ -404,7 +406,7 @@ async def send( message: Model, sync: bool = False, timeout: int = DEFAULT_ENVELOPE_TIMEOUT_SECONDS, - ) -> Union[MsgStatus, Envelope]: + ) -> Union[MsgStatus, Model]: """ This is the pro-active send method which is used in on_event and on_interval methods. In these methods, interval messages are set but @@ -441,7 +443,7 @@ async def send_raw( timeout: int = DEFAULT_ENVELOPE_TIMEOUT_SECONDS, protocol_digest: Optional[str] = None, queries: Optional[Dict[str, asyncio.Future]] = None, - ) -> Union[MsgStatus, Envelope]: + ) -> Union[MsgStatus, Model]: # Extract address from destination agent identifier if present _, parsed_name, parsed_address = parse_identifier(destination) @@ -521,6 +523,16 @@ async def send_raw( session=self._session, ) + if isinstance(result, Envelope): + model_class: Optional[Type[Model]] = self._models.get(result.schema_digest) + if model_class is None: + log(self.logger, logging.DEBUG, "unexpected sync reply") + else: + try: + result = model_class.parse_raw(result.decode_payload()) + except Exception as ex: + log(self.logger, logging.ERROR, f"Unable to parse message: {ex}") + return result def _queue_envelope( @@ -623,7 +635,7 @@ async def send( message: Model, sync: bool = False, timeout: int = DEFAULT_ENVELOPE_TIMEOUT_SECONDS, - ) -> Union[MsgStatus, Envelope]: + ) -> Union[MsgStatus, Model]: """ Send a message to the specified destination.