From 7cadc32321b9cc40fcff05b85ed0a96fc8d2d16b Mon Sep 17 00:00:00 2001 From: Archento Date: Thu, 17 Oct 2024 10:55:48 +0200 Subject: [PATCH 1/6] add: new agent and protocol class with rate limit ability --- python/src/uagents/agent.py | 11 +- .../uagents/experimental/quota/__init__.py | 244 ++++++++++++++++++ 2 files changed, 251 insertions(+), 4 deletions(-) create mode 100644 python/src/uagents/experimental/quota/__init__.py diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index 2e7794b46..0760755e1 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -393,10 +393,7 @@ def __init__( logger=self._logger, ) - # define default error message handler - @self.on_message(ErrorMessage) - async def _handle_error_message(ctx: Context, sender: str, msg: ErrorMessage): - ctx.logger.exception(f"Received error message from {sender}: {msg.error}") + self._add_error_message_handler() # define default rest message handlers if agent inspector is enabled if enable_agent_inspector: @@ -417,6 +414,12 @@ async def _handle_get_messages(_ctx: Context): self._init_done = True + def _add_error_message_handler(self): + # define default error message handler + @self.on_message(ErrorMessage) + async def _handle_error_message(ctx: Context, sender: str, msg: ErrorMessage): + ctx.logger.exception(f"Received error message from {sender}: {msg.error}") + def _build_context(self) -> InternalContext: """ An internal method to build the context for the agent. diff --git a/python/src/uagents/experimental/quota/__init__.py b/python/src/uagents/experimental/quota/__init__.py new file mode 100644 index 000000000..35ff0c66a --- /dev/null +++ b/python/src/uagents/experimental/quota/__init__.py @@ -0,0 +1,244 @@ +""" +This agent class can be used to rate limit your message handlers. + +The rate limiter uses the agents storage to keep track of the number of requests +made by another agent within a given time window. If the number of requests exceeds +a specified limit, the rate limiter will block further requests until the time +window resets. + +> Default: 6 requests per hour + +Usage examples: + +This message handler is rate limited with default values: +```python +@agent.on_message(ExampleMessage) +async def handle(ctx: Context, sender: str, msg: ExampleMessage): + ... +``` + +This message handler is rate limited with custom window size and request limit: +```python +@agent.on_message(ExampleMessage, window_size_minutes=30, max_requests=10) +async def handle(ctx: Context, sender: str, msg: ExampleMessage): + ... +``` + +Also applies to protocol message handlers if you import the QuotaProtocol class: +```python +protocol = QuotaProtocol( + quota_callback=agent.add_request, + name="quota_proto", + version=agent._version, +) + +@protocol.on_message(ExampleMessage) +async def handle(ctx: Context, sender: str, msg: ExampleMessage): + ... + +agent.include(protocol) +``` +""" + +import functools +import time +from typing import Callable, Optional, Set, Type, Union + +from pydantic import BaseModel + +from uagents import Agent, Context, Model, Protocol +from uagents.models import ErrorMessage +from uagents.types import MessageCallback + +WINDOW_SIZE_MINUTES = 60 +MAX_REQUESTS = 6 + + +class Usage(BaseModel): + time_window_start: float + window_size_minutes: int + requests: int + max_requests: int + + +class QuotaProtocol(Protocol): + def __init__(self, quota_callback: Callable, *args, **kwargs): + super().__init__(*args, **kwargs) + self.quota_callback = quota_callback + + def on_message( + self, + model: Type[Model], + replies: Optional[Union[Type[Model], Set[Type[Model]]]] = None, + allow_unverified: Optional[bool] = False, + window_size_minutes: int = WINDOW_SIZE_MINUTES, + max_requests: int = MAX_REQUESTS, + ): + """ + Overwritten decorator to register a message handler for the protocol + including rate limiting. + + Args: + model (Type[Model]): The message model type. + replies (Optional[Union[Type[Model], Set[Type[Model]]]], optional): The associated + reply types. Defaults to None. + allow_unverified (Optional[bool], optional): Whether to allow unverified messages. + Defaults to False. + window_size_minutes (int, optional): The size of the time window in minutes. + max_requests (int, optional): The maximum number of requests allowed in the time window. + + Returns: + Callable: The decorator to register the message handler. + """ + + def decorator_on_message(func: MessageCallback): + handler = self.wrap(func, window_size_minutes, max_requests) + self._add_message_handler(model, handler, replies, allow_unverified) + return handler + + return decorator_on_message + + def wrap( + self, + func: MessageCallback, + window_size_minutes: int = WINDOW_SIZE_MINUTES, + max_requests: int = MAX_REQUESTS, + ) -> MessageCallback: + """ + Decorator to wrap a function with rate limiting. + + Args: + func: The function to wrap with rate limiting + window_size_minutes: The size of the time window in minutes + max_requests: The maximum number of requests allowed in the time window + + Returns: + Callable: The decorated + """ + + @functools.wraps(func) + async def decorator(ctx: Context, sender: str, msg: Type[Model]): + if self.quota_callback( + sender, func.__name__, window_size_minutes, max_requests + ): + result = await func(ctx, sender, msg) + else: + result = await ctx.send( + sender, + ErrorMessage( + error=( + f"Rate limit exceeded for {msg.schema()["title"]}. " + f"This endpoint allows for {max_requests} calls per " + f"{window_size_minutes} minutes. Try again later." + ) + ), + ) + return result + + return decorator # type: ignore + + +class QuotaAgent(Agent): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._protocol = QuotaProtocol( + quota_callback=self.add_request, name=self._name, version=self._version + ) + + # only necessary because this feature is not implemented in the core + @self.on_message(ErrorMessage) + async def _handle_error_message(ctx: Context, sender: str, msg: ErrorMessage): + ctx.logger.exception(f"Received error message from {sender}: {msg.error}") + + def on_message( + self, + model: Type[Model], + replies: Optional[Union[Type[Model], Set[Type[Model]]]] = None, + allow_unverified: Optional[bool] = False, + window_size_minutes: int = WINDOW_SIZE_MINUTES, + max_requests: int = MAX_REQUESTS, + ): + """ + Overwritten decorator to register an message handler for the provided + message model including rate limiting. + + Args: + model (Type[Model]): The message model. + replies (Optional[Union[Type[Model], Set[Type[Model]]]]): Optional reply models. + allow_unverified (Optional[bool]): Allow unverified messages. + window_size_minutes (int): The size of the time window in minutes. + max_requests (int): The maximum number of requests allowed in the time window. + + Returns: + Callable: The decorator function for registering message handlers. + + """ + + return self._protocol.on_message( + model, replies, allow_unverified, window_size_minutes, max_requests + ) + + def _add_error_message_handler(self): + # This would not be necessary if this feature was implemented in the core + pass + + def _clean_usage(self, usage: dict[str, dict]): + """ + Remove all time windows that are older than the current time window. + + Args: + usage: The usage dictionary to clean + """ + now = int(time.time()) + for key in list(usage.keys()): + if (now - usage[key]["time_window_start"]) > usage[key][ + "window_size_minutes" + ] * 60: + del usage[key] + + def add_request( + self, + agent_address: str, + function_name: str, + window_size_minutes: int, + max_requests: int, + ) -> bool: + """ + Add a request to the rate limiter if the current time is still within the + time window since the beginning of the most recent time window. Otherwise, + reset the time window and add the request. + + Args: + agent_address: The address of the agent making the request + + Returns: + False if the maximum number of requests has been exceeded, True otherwise + """ + + now = int(time.time()) + + usage = self.storage.get(agent_address) or {} + + if function_name in usage: + quota = Usage(**usage[function_name]) + if (now - quota.time_window_start) <= window_size_minutes * 60: + if quota.requests >= max_requests: + return False + quota.requests += 1 + else: + quota.time_window_start = now + quota.requests = 1 + usage[function_name] = quota.model_dump() + else: + usage[function_name] = Usage( + time_window_start=now, + window_size_minutes=window_size_minutes, + requests=1, + max_requests=max_requests, + ).model_dump() + + self._clean_usage(usage) + + self.storage.set(agent_address, usage) + + return True From 5e339667eda773ba0ccdc28d54a9aa4ddf731dcd Mon Sep 17 00:00:00 2001 From: Archento Date: Thu, 17 Oct 2024 11:04:22 +0200 Subject: [PATCH 2/6] small correction in docstring --- python/src/uagents/experimental/quota/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/src/uagents/experimental/quota/__init__.py b/python/src/uagents/experimental/quota/__init__.py index 35ff0c66a..282e7c0c5 100644 --- a/python/src/uagents/experimental/quota/__init__.py +++ b/python/src/uagents/experimental/quota/__init__.py @@ -24,7 +24,8 @@ async def handle(ctx: Context, sender: str, msg: ExampleMessage): ... ``` -Also applies to protocol message handlers if you import the QuotaProtocol class: +Also applies to protocol message handlers if you import the QuotaProtocol class +and use it with the Quota agents callback function: ```python protocol = QuotaProtocol( quota_callback=agent.add_request, From 51096302ebce60ce2405b2426ce3607ed7cb9c06 Mon Sep 17 00:00:00 2001 From: Archento Date: Thu, 17 Oct 2024 22:04:06 +0200 Subject: [PATCH 3/6] move quota logic completely to Protocol and start implementing an access control list --- .../uagents/experimental/quota/__init__.py | 164 ++++++++++++------ 1 file changed, 108 insertions(+), 56 deletions(-) diff --git a/python/src/uagents/experimental/quota/__init__.py b/python/src/uagents/experimental/quota/__init__.py index 282e7c0c5..ee24c804d 100644 --- a/python/src/uagents/experimental/quota/__init__.py +++ b/python/src/uagents/experimental/quota/__init__.py @@ -25,30 +25,31 @@ async def handle(ctx: Context, sender: str, msg: ExampleMessage): ``` Also applies to protocol message handlers if you import the QuotaProtocol class -and use it with the Quota agents callback function: +and include it in your agent: ```python -protocol = QuotaProtocol( - quota_callback=agent.add_request, +quota_protocol = QuotaProtocol( + storage_reference=agent.storage, name="quota_proto", version=agent._version, ) -@protocol.on_message(ExampleMessage) +@quota_protocol.on_message(ExampleMessage) async def handle(ctx: Context, sender: str, msg: ExampleMessage): ... -agent.include(protocol) +agent.include(quota_protocol) ``` """ import functools import time -from typing import Callable, Optional, Set, Type, Union +from typing import Optional, Set, Type, Union from pydantic import BaseModel from uagents import Agent, Context, Model, Protocol from uagents.models import ErrorMessage +from uagents.storage import StorageAPI from uagents.types import MessageCallback WINDOW_SIZE_MINUTES = 60 @@ -62,10 +63,61 @@ class Usage(BaseModel): max_requests: int +class AccessControlList(BaseModel): + default: bool + allowed: list[str] + blocked: list[str] + + class QuotaProtocol(Protocol): - def __init__(self, quota_callback: Callable, *args, **kwargs): + def __init__( + self, + storage_reference: StorageAPI, + *args, + acl: Optional[AccessControlList] = None, + **kwargs, + ): super().__init__(*args, **kwargs) - self.quota_callback = quota_callback + self.storage_ref = storage_reference + self.acl = acl or AccessControlList(default=True, allowed=[], blocked=[]) + + @property + def access_control_list(self) -> AccessControlList: + return self.acl + + def set_access_control_default(self, allowed: bool = True): + """ + Set the default access control rule. + + Args: + allowed: Whether the default rule should allow or block + """ + self.acl.default = allowed + + def add_agent_to_acl(self, agent_address: str, allowed: bool = True): + """ + Add an agent to the access control list. + + Args: + agent_address: The address of the agent to add to the ACL + allowed: Whether the agent should be allowed or blocked + """ + if allowed: + self.acl.allowed.append(agent_address) + else: + self.acl.blocked.append(agent_address) + + def remove_agent_from_acl(self, agent_address: str): + """ + Remove an agent from the access control list. + + Args: + agent_address: The address of the agent to remove from the ACL + """ + if agent_address in self.acl.allowed: + self.acl.allowed.remove(agent_address) + if agent_address in self.acl.blocked: + self.acl.blocked.remove(agent_address) def on_message( self, @@ -119,7 +171,7 @@ def wrap( @functools.wraps(func) async def decorator(ctx: Context, sender: str, msg: Type[Model]): - if self.quota_callback( + if self.add_request( sender, func.__name__, window_size_minutes, max_requests ): result = await func(ctx, sender, msg) @@ -138,51 +190,6 @@ async def decorator(ctx: Context, sender: str, msg: Type[Model]): return decorator # type: ignore - -class QuotaAgent(Agent): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._protocol = QuotaProtocol( - quota_callback=self.add_request, name=self._name, version=self._version - ) - - # only necessary because this feature is not implemented in the core - @self.on_message(ErrorMessage) - async def _handle_error_message(ctx: Context, sender: str, msg: ErrorMessage): - ctx.logger.exception(f"Received error message from {sender}: {msg.error}") - - def on_message( - self, - model: Type[Model], - replies: Optional[Union[Type[Model], Set[Type[Model]]]] = None, - allow_unverified: Optional[bool] = False, - window_size_minutes: int = WINDOW_SIZE_MINUTES, - max_requests: int = MAX_REQUESTS, - ): - """ - Overwritten decorator to register an message handler for the provided - message model including rate limiting. - - Args: - model (Type[Model]): The message model. - replies (Optional[Union[Type[Model], Set[Type[Model]]]]): Optional reply models. - allow_unverified (Optional[bool]): Allow unverified messages. - window_size_minutes (int): The size of the time window in minutes. - max_requests (int): The maximum number of requests allowed in the time window. - - Returns: - Callable: The decorator function for registering message handlers. - - """ - - return self._protocol.on_message( - model, replies, allow_unverified, window_size_minutes, max_requests - ) - - def _add_error_message_handler(self): - # This would not be necessary if this feature was implemented in the core - pass - def _clean_usage(self, usage: dict[str, dict]): """ Remove all time windows that are older than the current time window. @@ -218,7 +225,7 @@ def add_request( now = int(time.time()) - usage = self.storage.get(agent_address) or {} + usage = self.storage_ref.get(agent_address) or {} if function_name in usage: quota = Usage(**usage[function_name]) @@ -240,6 +247,51 @@ def add_request( self._clean_usage(usage) - self.storage.set(agent_address, usage) + self.storage_ref.set(agent_address, usage) return True + + +class QuotaAgent(Agent): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._protocol = QuotaProtocol( + storage_reference=self.storage, name=self._name, version=self._version + ) + + # only necessary because this feature is not implemented in the core + @self.on_message(ErrorMessage) + async def _handle_error_message(ctx: Context, sender: str, msg: ErrorMessage): + ctx.logger.exception(f"Received error message from {sender}: {msg.error}") + + def on_message( + self, + model: Type[Model], + replies: Optional[Union[Type[Model], Set[Type[Model]]]] = None, + allow_unverified: Optional[bool] = False, + window_size_minutes: int = WINDOW_SIZE_MINUTES, + max_requests: int = MAX_REQUESTS, + ): + """ + Overwritten decorator to register an message handler for the provided + message model including rate limiting. + + Args: + model (Type[Model]): The message model. + replies (Optional[Union[Type[Model], Set[Type[Model]]]]): Optional reply models. + allow_unverified (Optional[bool]): Allow unverified messages. + window_size_minutes (int): The size of the time window in minutes. + max_requests (int): The maximum number of requests allowed in the time window. + + Returns: + Callable: The decorator function for registering message handlers. + + """ + + return self._protocol.on_message( + model, replies, allow_unverified, window_size_minutes, max_requests + ) + + def _add_error_message_handler(self): + # This would not be necessary if this feature was implemented in the core + pass From b0bc324d40b1c531833c6725705e96ac836ed872 Mon Sep 17 00:00:00 2001 From: Archento Date: Fri, 18 Oct 2024 09:51:57 +0200 Subject: [PATCH 4/6] add: Access control list functionality --- python/src/uagents/agent.py | 11 +- .../uagents/experimental/quota/__init__.py | 198 ++++++++---------- 2 files changed, 86 insertions(+), 123 deletions(-) diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index 0760755e1..2e7794b46 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -393,7 +393,10 @@ def __init__( logger=self._logger, ) - self._add_error_message_handler() + # define default error message handler + @self.on_message(ErrorMessage) + async def _handle_error_message(ctx: Context, sender: str, msg: ErrorMessage): + ctx.logger.exception(f"Received error message from {sender}: {msg.error}") # define default rest message handlers if agent inspector is enabled if enable_agent_inspector: @@ -414,12 +417,6 @@ async def _handle_get_messages(_ctx: Context): self._init_done = True - def _add_error_message_handler(self): - # define default error message handler - @self.on_message(ErrorMessage) - async def _handle_error_message(ctx: Context, sender: str, msg: ErrorMessage): - ctx.logger.exception(f"Received error message from {sender}: {msg.error}") - def _build_context(self) -> InternalContext: """ An internal method to build the context for the agent. diff --git a/python/src/uagents/experimental/quota/__init__.py b/python/src/uagents/experimental/quota/__init__.py index ee24c804d..fd615c900 100644 --- a/python/src/uagents/experimental/quota/__init__.py +++ b/python/src/uagents/experimental/quota/__init__.py @@ -1,5 +1,5 @@ """ -This agent class can be used to rate limit your message handlers. +This Protocol class can be used to rate limit `on_message` message handlers. The rate limiter uses the agents storage to keep track of the number of requests made by another agent within a given time window. If the number of requests exceeds @@ -8,37 +8,56 @@ > Default: 6 requests per hour -Usage examples: +Additionally, the protocol can be used to set access control rules for handlers +allowing or blocking specific agents from accessing the handler. +The default access control rule can be set to allow or block all agents. -This message handler is rate limited with default values: -```python -@agent.on_message(ExampleMessage) -async def handle(ctx: Context, sender: str, msg: ExampleMessage): - ... -``` +Both rules can work together to provide a secure and rate-limited environment for +message handlers. -This message handler is rate limited with custom window size and request limit: -```python -@agent.on_message(ExampleMessage, window_size_minutes=30, max_requests=10) -async def handle(ctx: Context, sender: str, msg: ExampleMessage): - ... -``` -Also applies to protocol message handlers if you import the QuotaProtocol class -and include it in your agent: +Usage examples: + ```python +from uagents.experimental.quota import AccessControlList, QuotaProtocol, RateLimit + quota_protocol = QuotaProtocol( storage_reference=agent.storage, name="quota_proto", version=agent._version, -) +) # Initialize the QuotaProtocol instance + +# This message handler is rate limited with default values +@quota_protocol.on_message(ExampleMessage1) +async def handle(ctx: Context, sender: str, msg: ExampleMessage1): + ... + +# This message handler is rate limited with custom window size and request limit +@agent.on_message(ExampleMessage2, rate_limit=RateLimit(window_size_minutes=1, max_requests=3)) +async def handle(ctx: Context, sender: str, msg: ExampleMessage2): + ... -@quota_protocol.on_message(ExampleMessage) -async def handle(ctx: Context, sender: str, msg: ExampleMessage): +# This message handler has access control rules set +@agent.on_message(ExampleMessage2, acl=AccessControlList(default=False, allowed={"agent1"})) +async def handle(ctx: Context, sender: str, msg: ExampleMessage2): ... agent.include(quota_protocol) ``` + +Tip: The `AccessControlList` object can be used to set access control rules during +runtime. This can be useful for dynamic access control rules based on the state of the +agent or the network. +```python +acl = AccessControlList(default=True, allowed={""}, blocked={""}) + +@proto.on_message(model=Message, access_control_list=acl) +async def message_handler(ctx: Context, sender: str, msg: Message): + if REASON_TO_BLOCK: + acl.blocked.add(sender) + ctx.logger.info(f"Received message from {sender}: {msg.text}") +``` + """ import functools @@ -47,7 +66,7 @@ async def handle(ctx: Context, sender: str, msg: ExampleMessage): from pydantic import BaseModel -from uagents import Agent, Context, Model, Protocol +from uagents import Context, Model, Protocol from uagents.models import ErrorMessage from uagents.storage import StorageAPI from uagents.types import MessageCallback @@ -63,69 +82,47 @@ class Usage(BaseModel): max_requests: int +class RateLimit(BaseModel): + window_size_minutes: int + max_requests: int + + class AccessControlList(BaseModel): default: bool - allowed: list[str] - blocked: list[str] + allowed: set[str] + blocked: set[str] class QuotaProtocol(Protocol): def __init__( self, storage_reference: StorageAPI, - *args, - acl: Optional[AccessControlList] = None, - **kwargs, + name: Optional[str] = None, + version: Optional[str] = None, + default_rate_limit: Optional[RateLimit] = None, ): - super().__init__(*args, **kwargs) - self.storage_ref = storage_reference - self.acl = acl or AccessControlList(default=True, allowed=[], blocked=[]) - - @property - def access_control_list(self) -> AccessControlList: - return self.acl - - def set_access_control_default(self, allowed: bool = True): - """ - Set the default access control rule. - - Args: - allowed: Whether the default rule should allow or block """ - self.acl.default = allowed - - def add_agent_to_acl(self, agent_address: str, allowed: bool = True): - """ - Add an agent to the access control list. + Initialize a QuotaProtocol instance. Args: - agent_address: The address of the agent to add to the ACL - allowed: Whether the agent should be allowed or blocked + storage_reference (StorageAPI): The storage reference to use for rate limiting. + name (Optional[str], optional): The name of the protocol. Defaults to None. + version (Optional[str], optional): The version of the protocol. Defaults to None. + acl (Optional[AccessControlList], optional): The access control list. Defaults to None. """ - if allowed: - self.acl.allowed.append(agent_address) - else: - self.acl.blocked.append(agent_address) - - def remove_agent_from_acl(self, agent_address: str): - """ - Remove an agent from the access control list. - - Args: - agent_address: The address of the agent to remove from the ACL - """ - if agent_address in self.acl.allowed: - self.acl.allowed.remove(agent_address) - if agent_address in self.acl.blocked: - self.acl.blocked.remove(agent_address) + super().__init__(name=name, version=version) + self.storage_ref = storage_reference + self.default_rate_limit = default_rate_limit or RateLimit( + window_size_minutes=WINDOW_SIZE_MINUTES, max_requests=MAX_REQUESTS + ) def on_message( self, model: Type[Model], replies: Optional[Union[Type[Model], Set[Type[Model]]]] = None, allow_unverified: Optional[bool] = False, - window_size_minutes: int = WINDOW_SIZE_MINUTES, - max_requests: int = MAX_REQUESTS, + rate_limit: Optional[RateLimit] = None, + access_control_list: Optional[AccessControlList] = None, ): """ Overwritten decorator to register a message handler for the protocol @@ -145,7 +142,7 @@ def on_message( """ def decorator_on_message(func: MessageCallback): - handler = self.wrap(func, window_size_minutes, max_requests) + handler = self.wrap(func, rate_limit, access_control_list) self._add_message_handler(model, handler, replies, allow_unverified) return handler @@ -154,8 +151,8 @@ def decorator_on_message(func: MessageCallback): def wrap( self, func: MessageCallback, - window_size_minutes: int = WINDOW_SIZE_MINUTES, - max_requests: int = MAX_REQUESTS, + rate_limit: Optional[RateLimit] = None, + acl: Optional[AccessControlList] = None, ) -> MessageCallback: """ Decorator to wrap a function with rate limiting. @@ -168,11 +165,25 @@ def wrap( Returns: Callable: The decorated """ + if acl is None: + acl = AccessControlList(default=True, allowed=set(), blocked=set()) @functools.wraps(func) async def decorator(ctx: Context, sender: str, msg: Type[Model]): - if self.add_request( - sender, func.__name__, window_size_minutes, max_requests + if (acl.default and sender in acl.blocked) or ( + not acl.default and sender not in acl.allowed + ): + return await ctx.send( + sender, + ErrorMessage( + error=("You are not allowed to access this endpoint.") + ), + ) + if not rate_limit or self.add_request( + sender, + func.__name__, + rate_limit.window_size_minutes, + rate_limit.max_requests, ): result = await func(ctx, sender, msg) else: @@ -181,8 +192,8 @@ async def decorator(ctx: Context, sender: str, msg: Type[Model]): ErrorMessage( error=( f"Rate limit exceeded for {msg.schema()["title"]}. " - f"This endpoint allows for {max_requests} calls per " - f"{window_size_minutes} minutes. Try again later." + f"This endpoint allows for {rate_limit.max_requests} calls per " + f"{rate_limit. window_size_minutes} minutes. Try again later." ) ), ) @@ -250,48 +261,3 @@ def add_request( self.storage_ref.set(agent_address, usage) return True - - -class QuotaAgent(Agent): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._protocol = QuotaProtocol( - storage_reference=self.storage, name=self._name, version=self._version - ) - - # only necessary because this feature is not implemented in the core - @self.on_message(ErrorMessage) - async def _handle_error_message(ctx: Context, sender: str, msg: ErrorMessage): - ctx.logger.exception(f"Received error message from {sender}: {msg.error}") - - def on_message( - self, - model: Type[Model], - replies: Optional[Union[Type[Model], Set[Type[Model]]]] = None, - allow_unverified: Optional[bool] = False, - window_size_minutes: int = WINDOW_SIZE_MINUTES, - max_requests: int = MAX_REQUESTS, - ): - """ - Overwritten decorator to register an message handler for the provided - message model including rate limiting. - - Args: - model (Type[Model]): The message model. - replies (Optional[Union[Type[Model], Set[Type[Model]]]]): Optional reply models. - allow_unverified (Optional[bool]): Allow unverified messages. - window_size_minutes (int): The size of the time window in minutes. - max_requests (int): The maximum number of requests allowed in the time window. - - Returns: - Callable: The decorator function for registering message handlers. - - """ - - return self._protocol.on_message( - model, replies, allow_unverified, window_size_minutes, max_requests - ) - - def _add_error_message_handler(self): - # This would not be necessary if this feature was implemented in the core - pass From afbdf17538a5104ceb942e6a782e5ee0b13db439 Mon Sep 17 00:00:00 2001 From: Archento Date: Fri, 18 Oct 2024 10:06:54 +0200 Subject: [PATCH 5/6] small fix --- .../uagents/experimental/quota/__init__.py | 35 ++++++++++++------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/python/src/uagents/experimental/quota/__init__.py b/python/src/uagents/experimental/quota/__init__.py index fd615c900..b5462ca38 100644 --- a/python/src/uagents/experimental/quota/__init__.py +++ b/python/src/uagents/experimental/quota/__init__.py @@ -6,7 +6,7 @@ a specified limit, the rate limiter will block further requests until the time window resets. -> Default: 6 requests per hour +> Default: Not rate limited, but you can set a default during initialization. Additionally, the protocol can be used to set access control rules for handlers allowing or blocking specific agents from accessing the handler. @@ -21,25 +21,33 @@ ```python from uagents.experimental.quota import AccessControlList, QuotaProtocol, RateLimit +# Initialize the QuotaProtocol instance quota_protocol = QuotaProtocol( storage_reference=agent.storage, name="quota_proto", version=agent._version, -) # Initialize the QuotaProtocol instance + # default_rate_limit=RateLimit(window_size_minutes=1, max_requests=3), # Optional +) -# This message handler is rate limited with default values +# This message handler is not rate limited @quota_protocol.on_message(ExampleMessage1) async def handle(ctx: Context, sender: str, msg: ExampleMessage1): ... # This message handler is rate limited with custom window size and request limit -@agent.on_message(ExampleMessage2, rate_limit=RateLimit(window_size_minutes=1, max_requests=3)) +@quota_protocol.on_message( + ExampleMessage2, + rate_limit=RateLimit(window_size_minutes=1, max_requests=3), +) async def handle(ctx: Context, sender: str, msg: ExampleMessage2): ... # This message handler has access control rules set -@agent.on_message(ExampleMessage2, acl=AccessControlList(default=False, allowed={"agent1"})) -async def handle(ctx: Context, sender: str, msg: ExampleMessage2): +@quota_protocol.on_message( + ExampleMessage3, + acl=AccessControlList(default=False, allowed={""}), +) +async def handle(ctx: Context, sender: str, msg: ExampleMessage3): ... agent.include(quota_protocol) @@ -112,9 +120,7 @@ def __init__( """ super().__init__(name=name, version=version) self.storage_ref = storage_reference - self.default_rate_limit = default_rate_limit or RateLimit( - window_size_minutes=WINDOW_SIZE_MINUTES, max_requests=MAX_REQUESTS - ) + self.default_rate_limit = default_rate_limit def on_message( self, @@ -134,8 +140,9 @@ def on_message( reply types. Defaults to None. allow_unverified (Optional[bool], optional): Whether to allow unverified messages. Defaults to False. - window_size_minutes (int, optional): The size of the time window in minutes. - max_requests (int, optional): The maximum number of requests allowed in the time window. + rate_limit (Optional[RateLimit], optional): The rate limit to apply. Defaults to None. + access_control_list (Optional[AccessControlList], optional): The access control list to + apply. Returns: Callable: The decorator to register the message handler. @@ -159,8 +166,8 @@ def wrap( Args: func: The function to wrap with rate limiting - window_size_minutes: The size of the time window in minutes - max_requests: The maximum number of requests allowed in the time window + rate_limit: The rate limit to apply + acl: The access control list to apply Returns: Callable: The decorated @@ -168,6 +175,8 @@ def wrap( if acl is None: acl = AccessControlList(default=True, allowed=set(), blocked=set()) + rate_limit = rate_limit or self.default_rate_limit + @functools.wraps(func) async def decorator(ctx: Context, sender: str, msg: Type[Model]): if (acl.default and sender in acl.blocked) or ( From 96c8ff4b6429e5c8eff3a5785a8358b3c21f7fd3 Mon Sep 17 00:00:00 2001 From: Archento Date: Fri, 18 Oct 2024 18:15:19 +0200 Subject: [PATCH 6/6] Update python/src/uagents/experimental/quota/__init__.py Co-authored-by: James Riehl <33920192+jrriehl@users.noreply.github.com> --- python/src/uagents/experimental/quota/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/src/uagents/experimental/quota/__init__.py b/python/src/uagents/experimental/quota/__init__.py index b5462ca38..7b48bad09 100644 --- a/python/src/uagents/experimental/quota/__init__.py +++ b/python/src/uagents/experimental/quota/__init__.py @@ -202,7 +202,7 @@ async def decorator(ctx: Context, sender: str, msg: Type[Model]): error=( f"Rate limit exceeded for {msg.schema()["title"]}. " f"This endpoint allows for {rate_limit.max_requests} calls per " - f"{rate_limit. window_size_minutes} minutes. Try again later." + f"{rate_limit.window_size_minutes} minutes. Try again later." ) ), )