diff --git a/python/docs/api/uagents/communication.md b/python/docs/api/uagents/communication.md index e0a53273..8bd61b1a 100644 --- a/python/docs/api/uagents/communication.md +++ b/python/docs/api/uagents/communication.md @@ -80,17 +80,6 @@ Method to send an exchange envelope. Union[MsgStatus, Envelope]: Either the status of the message or the response envelope. - - -#### dispatch`_`sync`_`response`_`envelope - -```python -async def dispatch_sync_response_envelope( - env: Envelope) -> Union[MsgStatus, Envelope] -``` - -Dispatch a synchronous response envelope locally. - #### send`_`message`_`raw diff --git a/python/docs/api/uagents/context.md b/python/docs/api/uagents/context.md index 2a46794c..fa52d726 100644 --- a/python/docs/api/uagents/context.md +++ b/python/docs/api/uagents/context.md @@ -315,10 +315,12 @@ Please use the `ctx.agent.address` property instead. #### send ```python -async def send(destination: str, - message: Model, - sync: bool = False, - timeout: int = DEFAULT_ENVELOPE_TIMEOUT_SECONDS) -> MsgStatus +async def send( + destination: str, + message: Model, + sync: bool = False, + timeout: int = DEFAULT_ENVELOPE_TIMEOUT_SECONDS +) -> Union[MsgStatus, Envelope] ``` This is the pro-active send method which is used in on_event and @@ -374,10 +376,12 @@ Initialize the ExternalContext instance and attributes needed from the InternalC #### send ```python -async def send(destination: str, - message: Model, - sync: bool = False, - timeout: int = DEFAULT_ENVELOPE_TIMEOUT_SECONDS) -> MsgStatus +async def send( + destination: str, + message: Model, + sync: bool = False, + timeout: int = DEFAULT_ENVELOPE_TIMEOUT_SECONDS +) -> Union[MsgStatus, Envelope] ``` Send a message to the specified destination. diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index 95711930..9e329c1a 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -442,6 +442,7 @@ def _build_context(self) -> InternalContext: interval_messages=self._interval_messages, wallet_messaging_client=self._wallet_messaging_client, logger=self._logger, + models=self._models, ) def _initialize_wallet_and_identity(self, seed, name, wallet_key_derivation_index): diff --git a/python/src/uagents/asgi.py b/python/src/uagents/asgi.py index 899f8962..ecd6ea79 100644 --- a/python/src/uagents/asgi.py +++ b/python/src/uagents/asgi.py @@ -1,6 +1,6 @@ import asyncio import json -from datetime import datetime +from datetime import datetime, timezone from logging import Logger from typing import ( Any, @@ -18,7 +18,7 @@ from requests.structures import CaseInsensitiveDict from uagents.communication import enclose_response_raw -from uagents.config import RESPONSE_TIME_HINT_SECONDS +from uagents.config import DEFAULT_ENVELOPE_TIMEOUT_SECONDS, RESPONSE_TIME_HINT_SECONDS from uagents.context import ERROR_MESSAGE_DIGEST from uagents.crypto import is_user_address from uagents.dispatch import dispatcher @@ -374,10 +374,17 @@ async def __call__(self, scope, receive, send): # pylint: disable=too-many-bra # wait for any queries to be resolved if expects_response: - response_msg, schema_digest = await self._queries[env.sender] - if (env.expires is not None) and ( - datetime.now() > datetime.fromtimestamp(env.expires) - ): + timeout = ( + env.expires - datetime.now(timezone.utc).timestamp() + if env.expires + else DEFAULT_ENVELOPE_TIMEOUT_SECONDS + ) + try: + response_msg, schema_digest = await asyncio.wait_for( + self._queries[env.sender], + timeout, + ) + except asyncio.TimeoutError: response_msg = ErrorMessage( error="Query envelope expired" ).model_dump_json() diff --git a/python/src/uagents/communication.py b/python/src/uagents/communication.py index d43f5477..6e7b666d 100644 --- a/python/src/uagents/communication.py +++ b/python/src/uagents/communication.py @@ -3,7 +3,7 @@ import asyncio import logging import uuid -from time import time +from datetime import datetime, timezone from typing import List, Optional, Tuple, Type, Union import aiohttp @@ -138,7 +138,7 @@ async def send_exchange_envelope( ) if not verified: continue - return await dispatch_sync_response_envelope(env) + return env return MsgStatus( status=DeliveryStatus.DELIVERED, detail="Message successfully delivered via HTTP", @@ -165,27 +165,6 @@ async def send_exchange_envelope( ) -async def dispatch_sync_response_envelope(env: Envelope) -> Union[MsgStatus, Envelope]: - """Dispatch a synchronous response envelope locally.""" - # If there are no sinks registered, return the envelope back to the caller - if len(dispatcher.sinks) == 0: - return env - await dispatcher.dispatch_msg( - env.sender, - env.target, - env.schema_digest, - env.decode_payload(), - env.session, - ) - return MsgStatus( - status=DeliveryStatus.DELIVERED, - detail="Sync message successfully delivered via HTTP", - destination=env.target, - endpoint="", - session=env.session, - ) - - async def send_message_raw( destination: str, message_schema_digest: str, @@ -243,7 +222,7 @@ async def send_message_raw( target=destination_address, session=uuid.uuid4(), schema_digest=message_schema_digest, - expires=int(time()) + timeout, + expires=int(datetime.now(timezone.utc).timestamp()) + timeout, ) env.encode_payload(message_body) if not is_user_address(sender_address) and isinstance(sender, Identity): diff --git a/python/src/uagents/context.py b/python/src/uagents/context.py index d2c8158d..4657d999 100644 --- a/python/src/uagents/context.py +++ b/python/src/uagents/context.py @@ -6,7 +6,7 @@ import logging import uuid from abc import ABC, abstractmethod -from time import time +from datetime import datetime, timezone from typing import ( TYPE_CHECKING, Any, @@ -27,7 +27,6 @@ from uagents.communication import ( Dispenser, dispatch_local_message, - dispatch_sync_response_envelope, ) from uagents.config import ( ALMANAC_API_URL, @@ -262,6 +261,7 @@ def __init__( interval_messages: Optional[Set[str]] = None, wallet_messaging_client: Optional[Any] = None, logger: Optional[logging.Logger] = None, + models: Optional[Dict[str, Type[Model]]] = None, ): self._agent = agent self._storage = storage @@ -273,6 +273,7 @@ def __init__( self._interval_messages = interval_messages self._wallet_messaging_client = wallet_messaging_client self._outbound_messages: Dict[str, Tuple[JsonStr, str]] = {} + self._models = models @property def agent(self) -> AgentRepresentation: @@ -382,7 +383,8 @@ async def broadcast( ] ) log(self.logger, logging.DEBUG, f"Sent {len(futures)} messages") - return futures + + return futures # type: ignore def _is_valid_interval_message(self, schema_digest: str) -> bool: """ @@ -404,7 +406,7 @@ async def send( message: Model, sync: bool = False, timeout: int = DEFAULT_ENVELOPE_TIMEOUT_SECONDS, - ) -> MsgStatus: + ) -> Union[MsgStatus, Model]: """ This is the pro-active send method which is used in on_event and on_interval methods. In these methods, interval messages are set but @@ -441,7 +443,7 @@ async def send_raw( timeout: int = DEFAULT_ENVELOPE_TIMEOUT_SECONDS, protocol_digest: Optional[str] = None, queries: Optional[Dict[str, asyncio.Future]] = None, - ) -> MsgStatus: + ) -> Union[MsgStatus, Model]: # Extract address from destination agent identifier if present _, parsed_name, parsed_address = parse_identifier(destination) @@ -489,7 +491,7 @@ async def send_raw( ) # Calculate when the envelope expires - expires = int(time()) + timeout + expires = int(datetime.now(timezone.utc).timestamp()) + timeout # Handle external dispatch of messages env = Envelope( @@ -522,7 +524,14 @@ async def send_raw( ) if isinstance(result, Envelope): - return await dispatch_sync_response_envelope(result) + model_class: Optional[Type[Model]] = self._models.get(result.schema_digest) + if model_class is None: + log(self.logger, logging.DEBUG, "unexpected sync reply") + else: + try: + result = model_class.parse_raw(result.decode_payload()) + except Exception as ex: + log(self.logger, logging.ERROR, f"Unable to parse message: {ex}") return result @@ -626,7 +635,7 @@ async def send( message: Model, sync: bool = False, timeout: int = DEFAULT_ENVELOPE_TIMEOUT_SECONDS, - ) -> MsgStatus: + ) -> Union[MsgStatus, Model]: """ Send a message to the specified destination.