diff --git a/denonavr/api.py b/denonavr/api.py index e737b02..3376268 100644 --- a/denonavr/api.py +++ b/denonavr/api.py @@ -47,6 +47,7 @@ AvrProcessingError, AvrTimoutError, ) +from .rate_limiter import AdaptiveLimiter if sys.version_info[:2] < (3, 11): from async_timeout import timeout as asyncio_timeout @@ -83,6 +84,11 @@ class HTTPXAsyncClient: init=False, ) + rate_limiter: AdaptiveLimiter = attr.ib( + validator=attr.validators.instance_of(AdaptiveLimiter), + default=attr.Factory(AdaptiveLimiter), + ) + def __hash__(self) -> int: """Hash the class using its ID that caching works.""" return id(self) @@ -92,14 +98,20 @@ def __hash__(self) -> int: async def async_get( self, url: str, + rate_limit_key: str, timeout: float, read_timeout: float, *, cache_id: Hashable = None, + record_latency: bool = True, + skip_rate_limiter: bool = False, ) -> httpx.Response: """Call GET endpoint of Denon AVR receiver asynchronously.""" client = self.client_getter() + start = time.monotonic() try: + if not skip_rate_limiter: + await self.rate_limiter.acquire(rate_limit_key) async with client.stream( "GET", url, timeout=httpx.Timeout(timeout, read=read_timeout) ) as res: @@ -110,6 +122,8 @@ async def async_get( if self.is_default_async_client(): await client.aclose() + if record_latency: + self.rate_limiter.record_latency(rate_limit_key, start) return res @cache_result @@ -117,16 +131,22 @@ async def async_get( async def async_post( self, url: str, + rate_limit_key: str, timeout: float, read_timeout: float, *, content: Optional[bytes] = None, data: Optional[Dict] = None, cache_id: Hashable = None, + record_latency: bool = True, + skip_rate_limiter: bool = False, ) -> httpx.Response: """Call POST endpoint of Denon AVR receiver asynchronously.""" client = self.client_getter() + start = time.monotonic() try: + if not skip_rate_limiter: + await self.rate_limiter.acquire(rate_limit_key) async with client.stream( "POST", url, @@ -141,12 +161,18 @@ async def async_post( if self.is_default_async_client(): await client.aclose() + if record_latency: + self.rate_limiter.record_latency(rate_limit_key, start) return res def is_default_async_client(self) -> bool: """Check if default httpx.AsyncClient getter is used.""" return self.client_getter is get_default_async_client + async def aclose(self): + """Close rate limiter when done.""" + await self.rate_limiter.aclose() + @attr.s(auto_attribs=True, on_setattr=DENON_ATTR_SETATTR) class DenonAVRApi: @@ -175,6 +201,7 @@ class DenonAVRApi: default=attr.Factory(HTTPXAsyncClient), init=False, ) + _http_callback_tasks: Set[asyncio.Task] = attr.ib(default=attr.Factory(set)) async def async_get( self, @@ -182,6 +209,9 @@ async def async_get( *, port: Optional[int] = None, cache_id: Hashable = None, + skip_confirmation: bool = False, + record_latency: bool = True, + skip_rate_limiter: bool = False, ) -> httpx.Response: """Call GET endpoint of Denon AVR receiver asynchronously.""" # Use default port of the receiver if no different port is specified @@ -189,8 +219,31 @@ async def async_get( endpoint = f"http://{self.host}:{port}{request}" + if skip_confirmation: + task = asyncio.create_task( + self.httpx_async_client.async_get( + endpoint, + self.host, + self.timeout, + self.read_timeout, + cache_id=cache_id, + record_latency=False, + skip_rate_limiter=skip_rate_limiter, + ) + ) + task.add_done_callback( + self._http_callback_tasks.discard + ) # Prevent garbage collection + return httpx.Response(200, text="") + return await self.httpx_async_client.async_get( - endpoint, self.timeout, self.read_timeout, cache_id=cache_id + endpoint, + self.host, + self.timeout, + self.read_timeout, + cache_id=cache_id, + record_latency=record_latency, + skip_rate_limiter=skip_rate_limiter, ) async def async_post( @@ -210,17 +263,29 @@ async def async_post( return await self.httpx_async_client.async_post( endpoint, + self.host, self.timeout, self.read_timeout, content=content, data=data, cache_id=cache_id, + record_latency=False, + skip_rate_limiter=True, ) - async def async_get_command(self, request: str) -> str: + async def async_get_command( + self, + request: str, + skip_confirmation: bool = False, + skip_rate_limiter: bool = False, + ) -> str: """Send HTTP GET command to Denon AVR receiver asynchronously.""" # HTTP GET to endpoint - res = await self.async_get(request) + res = await self.async_get( + request, + skip_confirmation=skip_confirmation, + skip_rate_limiter=skip_rate_limiter, + ) # Return text return res.text @@ -229,7 +294,9 @@ async def async_get_xml( ) -> ET.Element: """Return XML data from HTTP GET endpoint asynchronously.""" # HTTP GET to endpoint - res = await self.async_get(request, cache_id=cache_id) + res = await self.async_get( + request, cache_id=cache_id, record_latency=False, skip_rate_limiter=True + ) # create ElementTree try: xml_root = fromstring(res.text) @@ -494,6 +561,11 @@ class DenonAVRTelnetApi: default=attr.Factory(list), init=False, ) + _rate_limiter: AdaptiveLimiter = attr.ib( + validator=attr.validators.instance_of(AdaptiveLimiter), + default=attr.Factory(AdaptiveLimiter), + init=False, + ) _update_callback_tasks: Set[asyncio.Task] = attr.ib(default=attr.Factory(set)) def __attrs_post_init__(self) -> None: @@ -635,6 +707,8 @@ async def _async_trigger_updates(self) -> None: await self.async_send_commands( *commands, confirmation_timeout=0.2, + skip_rate_limiter=True, + record_latency=False, ) def _schedule_monitor(self) -> None: @@ -872,6 +946,9 @@ async def _async_send_command( command: str, skip_confirmation: bool = False, confirmation_timeout: Optional[float] = None, + *, + record_latency: bool = True, + skip_rate_limiter: bool = False, ) -> None: """Send one telnet command to the receiver.""" if confirmation_timeout is None: @@ -886,6 +963,9 @@ async def _async_send_command( f"Error sending command {command}. Telnet connected: " f"{self.connected}, Connection healthy: {self.healthy}" ) + start = time.monotonic() + if not skip_rate_limiter: + await self._rate_limiter.acquire(self.host) self._protocol.write(f"{command}\r") if not skip_confirmation: try: @@ -898,6 +978,8 @@ async def _async_send_command( "Timeout waiting for confirmation of command: %s", command ) finally: + if record_latency: + self._rate_limiter.record_latency(self.host, start) self._send_confirmation_command = "" async def async_send_commands( @@ -905,6 +987,8 @@ async def async_send_commands( *commands: str, skip_confirmation: bool = False, confirmation_timeout: Optional[float] = None, + record_latency: bool = True, + skip_rate_limiter: bool = False, ) -> None: """Send telnet commands to the receiver.""" for command in commands: @@ -912,6 +996,8 @@ async def async_send_commands( command, skip_confirmation=skip_confirmation, confirmation_timeout=confirmation_timeout, + record_latency=record_latency, + skip_rate_limiter=skip_rate_limiter, ) def send_commands( diff --git a/denonavr/foundation.py b/denonavr/foundation.py index 7ee6c42..16f5bbd 100644 --- a/denonavr/foundation.py +++ b/denonavr/foundation.py @@ -655,7 +655,9 @@ async def async_get_device_info(self) -> None: device_info = None try: - res = await self.api.async_get(command, port=port) + res = await self.api.async_get( + command, port=port, record_latency=False, skip_rate_limiter=True + ) except AvrTimoutError as err: _LOGGER.debug("Timeout when getting device info: %s", err) raise @@ -1085,7 +1087,9 @@ async def async_cursor_up(self) -> None: self.telnet_commands.command_cusor_up, skip_confirmation=True ) else: - await self.api.async_get_command(self.urls.command_cusor_up) + await self.api.async_get_command( + self.urls.command_cusor_up, skip_confirmation=True + ) async def async_cursor_down(self) -> None: """Cursor Down on receiver.""" @@ -1094,7 +1098,9 @@ async def async_cursor_down(self) -> None: self.telnet_commands.command_cusor_down, skip_confirmation=True ) else: - await self.api.async_get_command(self.urls.command_cusor_down) + await self.api.async_get_command( + self.urls.command_cusor_down, skip_confirmation=True + ) async def async_cursor_left(self) -> None: """Cursor Left on receiver.""" @@ -1103,7 +1109,9 @@ async def async_cursor_left(self) -> None: self.telnet_commands.command_cusor_left, skip_confirmation=True ) else: - await self.api.async_get_command(self.urls.command_cusor_left) + await self.api.async_get_command( + self.urls.command_cusor_left, skip_confirmation=True + ) async def async_cursor_right(self) -> None: """Cursor Right on receiver.""" @@ -1112,7 +1120,9 @@ async def async_cursor_right(self) -> None: self.telnet_commands.command_cusor_right, skip_confirmation=True ) else: - await self.api.async_get_command(self.urls.command_cusor_right) + await self.api.async_get_command( + self.urls.command_cusor_right, skip_confirmation=True + ) async def async_cursor_enter(self) -> None: """Cursor Enter on receiver.""" @@ -1121,7 +1131,9 @@ async def async_cursor_enter(self) -> None: self.telnet_commands.command_cusor_enter, skip_confirmation=True ) else: - await self.api.async_get_command(self.urls.command_cusor_enter) + await self.api.async_get_command( + self.urls.command_cusor_enter, skip_confirmation=True + ) async def async_back(self) -> None: """Back command on receiver.""" @@ -1130,7 +1142,9 @@ async def async_back(self) -> None: self.telnet_commands.command_back, skip_confirmation=True ) else: - await self.api.async_get_command(self.urls.command_back) + await self.api.async_get_command( + self.urls.command_back, skip_confirmation=True + ) async def async_info(self) -> None: """Info OSD on receiver.""" @@ -1139,7 +1153,9 @@ async def async_info(self) -> None: self.telnet_commands.command_info, skip_confirmation=True ) else: - await self.api.async_get_command(self.urls.command_info) + await self.api.async_get_command( + self.urls.command_info, skip_confirmation=True + ) async def async_options(self) -> None: """Options menu on receiver.""" @@ -1148,7 +1164,9 @@ async def async_options(self) -> None: self.telnet_commands.command_options, skip_confirmation=True ) else: - await self.api.async_get_command(self.urls.command_options) + await self.api.async_get_command( + self.urls.command_options, skip_confirmation=True + ) async def async_settings_menu(self) -> None: """ diff --git a/denonavr/rate_limiter.py b/denonavr/rate_limiter.py new file mode 100644 index 0000000..fd19376 --- /dev/null +++ b/denonavr/rate_limiter.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Adaptive, protocol-agnostic per-destination rate limiter built on aiolimiter. + +Single-process only. Excludes fire-and-forget calls (skip_confirmation) from RTT stats. +""" + +from __future__ import annotations + +import asyncio +import contextlib +import time +from typing import Dict, Optional + +from aiolimiter import AsyncLimiter + + +class EWMALatency: + """Track EWMA of response times per destination.""" + + def __init__(self, alpha: float = 0.2) -> None: + """Initialize EWMA response times tracker.""" + self.alpha = alpha + self._value: Optional[float] = None + + def update(self, sample: float) -> float: + """Update EWMA with new sample and return current value.""" + if sample < 0: + raise ValueError("Latency sample must be non-negative") + if self._value is None: + self._value = sample + else: + self._value = self.alpha * sample + (1 - self.alpha) * self._value + return self._value + + @property + def value(self) -> float: + """Return current EWMA value, or 0.0 if uninitialized.""" + return self._value or 0.0 + + +class AdaptiveLimiter: + """ + Adaptive token-bucket rate limiter per key. + + Parameters: + - initial_wait_ms: initial wait time between requests in milliseconds (> 0) + - min_wait_ms: minimum wait time between requests in milliseconds (> 0) + - max_wait_ms: maximum wait time between requests in milliseconds (>= min_wait_ms) + - k: scaling constant used in target_rate = k / avg_rtt (> 0) + - adjust_interval: seconds between rate recalculations (> 0) + - alpha: EWMA smoothing factor in (0,1] + + Behavior: + - The limiter updates each destination's rate every adjust_interval seconds + based on the EWMA of observed latencies (excluding skip_confirmation calls). + - Burst protection is inherent in the token bucket model. + """ + + def __init__( + self, + *, + initial_wait_ms: float = 100.0, + min_wait_ms: float = 100.0, + max_wait_ms: float = 200.0, + k: float = 2.0, + adjust_interval: float = 2.0, + alpha: float = 0.2, + ) -> None: + """Initialize AdaptiveLimiter with given parameters.""" + # Validate wait params + if initial_wait_ms <= 0 or min_wait_ms <= 0 or max_wait_ms <= 0: + raise ValueError("wait values must be > 0") + if min_wait_ms > max_wait_ms: + raise ValueError("min_wait_ms must be <= max_wait_ms") + + # Validate EWMA and adjust params + if k <= 0: + raise ValueError("k must be > 0") + if adjust_interval <= 0: + raise ValueError("adjust_interval must be > 0") + if not 0 < alpha <= 1: + raise ValueError("alpha must be in (0, 1]") + + # Convert waits to rates (req/s): rate = 1000 / wait_ms + initial_rate = 1000.0 / initial_wait_ms + self._min_rate = 1000.0 / max_wait_ms # max wait -> min rate + self._max_rate = 1000.0 / min_wait_ms # min wait -> max rate + self._initial_rate = float(initial_rate) + + self._k = float(k) + self._adjust_interval = float(adjust_interval) + self._alpha = float(alpha) + self._limiters: Dict[str, AsyncLimiter] = {} + self._latencies: Dict[str, EWMALatency] = {} + self._locks: Dict[str, asyncio.Lock] = {} + self._tasks: Dict[str, asyncio.Task] = {} + self._init_lock = asyncio.Lock() + self._shutdown_event = asyncio.Event() + + def _target_rate(self, avg_rtt: float) -> float: + if avg_rtt <= 0: + return self._initial_rate + target = self._k / avg_rtt + return max(self._min_rate, min(self._max_rate, target)) + + async def _ensure_key(self, key: str) -> None: + # Fast path: key already exists + if key in self._limiters: + return + + # Slow path: need to initialize, use lock to prevent duplicate initialization + async with self._init_lock: + # Double-check after acquiring lock (another coroutine may have initialized) + if key not in self._limiters: + # Start limiter at configured initial rate + self._limiters[key] = AsyncLimiter(self._initial_rate, time_period=1) + self._latencies[key] = EWMALatency(self._alpha) + self._locks[key] = asyncio.Lock() + # Background adjuster + self._tasks[key] = asyncio.create_task(self._adjust_loop(key)) + + async def _adjust_loop(self, key: str) -> None: + try: + while not self._shutdown_event.is_set(): + await asyncio.sleep(self._adjust_interval) + if self._shutdown_event.is_set(): + break + avg = self._latencies[key].value + new_rate = self._target_rate(avg) + # AsyncLimiter does not expose direct rate mutation; + # recreate it atomically under a lock to avoid races. + async with self._locks[key]: + self._limiters[key] = AsyncLimiter(new_rate, time_period=1) + except asyncio.CancelledError: + pass + + async def acquire(self, destination: str) -> None: + """Acquire a token for the given destination.""" + await self._ensure_key(destination) + # Acquire with current limiter. Use lock to avoid swap race. + async with self._locks[destination]: + limiter = self._limiters[destination] + await limiter.acquire() + + def record_latency(self, destination: str, start: float) -> None: + """ + Record RTT after a call. + + Safe to call even if key doesn't exist yet; if the key is not present, + the call is silently ignored and no action is taken. + """ + seconds = time.monotonic() - start + # Only record if destination already initialized + if latency_tracker := self._latencies.get(destination): + latency_tracker.update(seconds) + + async def aclose(self) -> None: + """Clean up background tasks.""" + # Cancel background tasks + self._shutdown_event.set() + for task in list(self._tasks.values()): + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + self._tasks.clear() diff --git a/denonavr/volume.py b/denonavr/volume.py index 8f1b874..e95bfdd 100644 --- a/denonavr/volume.py +++ b/denonavr/volume.py @@ -310,7 +310,7 @@ async def async_volume_up(self) -> None: ) else: await self._device.api.async_get_command( - self._device.urls.command_volume_up + self._device.urls.command_volume_up, skip_confirmation=True ) async def async_volume_down(self) -> None: @@ -321,7 +321,7 @@ async def async_volume_down(self) -> None: ) else: await self._device.api.async_get_command( - self._device.urls.command_volume_down + self._device.urls.command_volume_down, skip_confirmation=True ) async def async_set_volume(self, volume: float) -> None: diff --git a/requirements.txt b/requirements.txt index dc050a4..8231af1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,4 @@ ftfy>=6.1.1 httpx>=0.21.0 netifaces>=0.11.0 async-timeout>=4.0.2; python_version < "3.11" +aiolimiter>=1.2.1