Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 90 additions & 4 deletions denonavr/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
AvrProcessingError,
AvrTimoutError,
)
from .rate_limiter import AdaptiveLimiter

if sys.version_info[:2] < (3, 11):
from async_timeout import timeout as asyncio_timeout
Expand Down Expand Up @@ -83,6 +84,11 @@ class HTTPXAsyncClient:
init=False,
)

rate_limiter: AdaptiveLimiter = attr.ib(
validator=attr.validators.instance_of(AdaptiveLimiter),
default=attr.Factory(AdaptiveLimiter),
)

def __hash__(self) -> int:
"""Hash the class using its ID that caching works."""
return id(self)
Expand All @@ -92,14 +98,20 @@ def __hash__(self) -> int:
async def async_get(
self,
url: str,
rate_limit_key: str,
timeout: float,
read_timeout: float,
*,
cache_id: Hashable = None,
record_latency: bool = True,
skip_rate_limiter: bool = False,
) -> httpx.Response:
"""Call GET endpoint of Denon AVR receiver asynchronously."""
client = self.client_getter()
start = time.monotonic()
try:
if not skip_rate_limiter:
await self.rate_limiter.acquire(rate_limit_key)
async with client.stream(
"GET", url, timeout=httpx.Timeout(timeout, read=read_timeout)
) as res:
Expand All @@ -110,23 +122,31 @@ async def async_get(
if self.is_default_async_client():
await client.aclose()

if record_latency:
self.rate_limiter.record_latency(rate_limit_key, start)
return res

@cache_result
@async_handle_receiver_exceptions
async def async_post(
self,
url: str,
rate_limit_key: str,
timeout: float,
read_timeout: float,
*,
content: Optional[bytes] = None,
data: Optional[Dict] = None,
cache_id: Hashable = None,
record_latency: bool = True,
skip_rate_limiter: bool = False,
) -> httpx.Response:
"""Call POST endpoint of Denon AVR receiver asynchronously."""
client = self.client_getter()
start = time.monotonic()
try:
if not skip_rate_limiter:
await self.rate_limiter.acquire(rate_limit_key)
async with client.stream(
"POST",
url,
Expand All @@ -141,12 +161,18 @@ async def async_post(
if self.is_default_async_client():
await client.aclose()

if record_latency:
self.rate_limiter.record_latency(rate_limit_key, start)
return res

def is_default_async_client(self) -> bool:
"""Check if default httpx.AsyncClient getter is used."""
return self.client_getter is get_default_async_client

async def aclose(self):
"""Close rate limiter when done."""
await self.rate_limiter.aclose()


@attr.s(auto_attribs=True, on_setattr=DENON_ATTR_SETATTR)
class DenonAVRApi:
Expand Down Expand Up @@ -175,22 +201,49 @@ class DenonAVRApi:
default=attr.Factory(HTTPXAsyncClient),
init=False,
)
_http_callback_tasks: Set[asyncio.Task] = attr.ib(default=attr.Factory(set))

async def async_get(
self,
request: str,
*,
port: Optional[int] = None,
cache_id: Hashable = None,
skip_confirmation: bool = False,
record_latency: bool = True,
skip_rate_limiter: bool = False,
) -> httpx.Response:
"""Call GET endpoint of Denon AVR receiver asynchronously."""
# Use default port of the receiver if no different port is specified
port = port if port is not None else self.port

endpoint = f"http://{self.host}:{port}{request}"

if skip_confirmation:
task = asyncio.create_task(
self.httpx_async_client.async_get(
endpoint,
self.host,
self.timeout,
self.read_timeout,
cache_id=cache_id,
record_latency=False,
skip_rate_limiter=skip_rate_limiter,
)
)
task.add_done_callback(
self._http_callback_tasks.discard
) # Prevent garbage collection
return httpx.Response(200, text="")

return await self.httpx_async_client.async_get(
endpoint, self.timeout, self.read_timeout, cache_id=cache_id
endpoint,
self.host,
self.timeout,
self.read_timeout,
cache_id=cache_id,
record_latency=record_latency,
skip_rate_limiter=skip_rate_limiter,
)

async def async_post(
Expand All @@ -210,17 +263,29 @@ async def async_post(

return await self.httpx_async_client.async_post(
endpoint,
self.host,
self.timeout,
self.read_timeout,
content=content,
data=data,
cache_id=cache_id,
record_latency=False,
skip_rate_limiter=True,
)

async def async_get_command(self, request: str) -> str:
async def async_get_command(
self,
request: str,
skip_confirmation: bool = False,
skip_rate_limiter: bool = False,
) -> str:
"""Send HTTP GET command to Denon AVR receiver asynchronously."""
# HTTP GET to endpoint
res = await self.async_get(request)
res = await self.async_get(
request,
skip_confirmation=skip_confirmation,
skip_rate_limiter=skip_rate_limiter,
)
# Return text
return res.text

Expand All @@ -229,7 +294,9 @@ async def async_get_xml(
) -> ET.Element:
"""Return XML data from HTTP GET endpoint asynchronously."""
# HTTP GET to endpoint
res = await self.async_get(request, cache_id=cache_id)
res = await self.async_get(
request, cache_id=cache_id, record_latency=False, skip_rate_limiter=True
)
# create ElementTree
try:
xml_root = fromstring(res.text)
Expand Down Expand Up @@ -494,6 +561,11 @@ class DenonAVRTelnetApi:
default=attr.Factory(list),
init=False,
)
_rate_limiter: AdaptiveLimiter = attr.ib(
validator=attr.validators.instance_of(AdaptiveLimiter),
default=attr.Factory(AdaptiveLimiter),
init=False,
)
_update_callback_tasks: Set[asyncio.Task] = attr.ib(default=attr.Factory(set))

def __attrs_post_init__(self) -> None:
Expand Down Expand Up @@ -635,6 +707,8 @@ async def _async_trigger_updates(self) -> None:
await self.async_send_commands(
*commands,
confirmation_timeout=0.2,
skip_rate_limiter=True,
record_latency=False,
)

def _schedule_monitor(self) -> None:
Expand Down Expand Up @@ -872,6 +946,9 @@ async def _async_send_command(
command: str,
skip_confirmation: bool = False,
confirmation_timeout: Optional[float] = None,
*,
record_latency: bool = True,
skip_rate_limiter: bool = False,
) -> None:
"""Send one telnet command to the receiver."""
if confirmation_timeout is None:
Expand All @@ -886,6 +963,9 @@ async def _async_send_command(
f"Error sending command {command}. Telnet connected: "
f"{self.connected}, Connection healthy: {self.healthy}"
)
start = time.monotonic()
if not skip_rate_limiter:
await self._rate_limiter.acquire(self.host)
self._protocol.write(f"{command}\r")
if not skip_confirmation:
try:
Expand All @@ -898,20 +978,26 @@ async def _async_send_command(
"Timeout waiting for confirmation of command: %s", command
)
finally:
if record_latency:
self._rate_limiter.record_latency(self.host, start)
self._send_confirmation_command = ""

async def async_send_commands(
self,
*commands: str,
skip_confirmation: bool = False,
confirmation_timeout: Optional[float] = None,
record_latency: bool = True,
skip_rate_limiter: bool = False,
) -> None:
"""Send telnet commands to the receiver."""
for command in commands:
await self._async_send_command(
command,
skip_confirmation=skip_confirmation,
confirmation_timeout=confirmation_timeout,
record_latency=record_latency,
skip_rate_limiter=skip_rate_limiter,
)

def send_commands(
Expand Down
36 changes: 27 additions & 9 deletions denonavr/foundation.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,9 @@ async def async_get_device_info(self) -> None:

device_info = None
try:
res = await self.api.async_get(command, port=port)
res = await self.api.async_get(
command, port=port, record_latency=False, skip_rate_limiter=True
)
except AvrTimoutError as err:
_LOGGER.debug("Timeout when getting device info: %s", err)
raise
Expand Down Expand Up @@ -1085,7 +1087,9 @@ async def async_cursor_up(self) -> None:
self.telnet_commands.command_cusor_up, skip_confirmation=True
)
else:
await self.api.async_get_command(self.urls.command_cusor_up)
await self.api.async_get_command(
self.urls.command_cusor_up, skip_confirmation=True
)

async def async_cursor_down(self) -> None:
"""Cursor Down on receiver."""
Expand All @@ -1094,7 +1098,9 @@ async def async_cursor_down(self) -> None:
self.telnet_commands.command_cusor_down, skip_confirmation=True
)
else:
await self.api.async_get_command(self.urls.command_cusor_down)
await self.api.async_get_command(
self.urls.command_cusor_down, skip_confirmation=True
)

async def async_cursor_left(self) -> None:
"""Cursor Left on receiver."""
Expand All @@ -1103,7 +1109,9 @@ async def async_cursor_left(self) -> None:
self.telnet_commands.command_cusor_left, skip_confirmation=True
)
else:
await self.api.async_get_command(self.urls.command_cusor_left)
await self.api.async_get_command(
self.urls.command_cusor_left, skip_confirmation=True
)

async def async_cursor_right(self) -> None:
"""Cursor Right on receiver."""
Expand All @@ -1112,7 +1120,9 @@ async def async_cursor_right(self) -> None:
self.telnet_commands.command_cusor_right, skip_confirmation=True
)
else:
await self.api.async_get_command(self.urls.command_cusor_right)
await self.api.async_get_command(
self.urls.command_cusor_right, skip_confirmation=True
)

async def async_cursor_enter(self) -> None:
"""Cursor Enter on receiver."""
Expand All @@ -1121,7 +1131,9 @@ async def async_cursor_enter(self) -> None:
self.telnet_commands.command_cusor_enter, skip_confirmation=True
)
else:
await self.api.async_get_command(self.urls.command_cusor_enter)
await self.api.async_get_command(
self.urls.command_cusor_enter, skip_confirmation=True
)

async def async_back(self) -> None:
"""Back command on receiver."""
Expand All @@ -1130,7 +1142,9 @@ async def async_back(self) -> None:
self.telnet_commands.command_back, skip_confirmation=True
)
else:
await self.api.async_get_command(self.urls.command_back)
await self.api.async_get_command(
self.urls.command_back, skip_confirmation=True
)

async def async_info(self) -> None:
"""Info OSD on receiver."""
Expand All @@ -1139,7 +1153,9 @@ async def async_info(self) -> None:
self.telnet_commands.command_info, skip_confirmation=True
)
else:
await self.api.async_get_command(self.urls.command_info)
await self.api.async_get_command(
self.urls.command_info, skip_confirmation=True
)

async def async_options(self) -> None:
"""Options menu on receiver."""
Expand All @@ -1148,7 +1164,9 @@ async def async_options(self) -> None:
self.telnet_commands.command_options, skip_confirmation=True
)
else:
await self.api.async_get_command(self.urls.command_options)
await self.api.async_get_command(
self.urls.command_options, skip_confirmation=True
)

async def async_settings_menu(self) -> None:
"""
Expand Down
Loading