From 4247a94265d1f45d0463eab2390ebd902a5b183d Mon Sep 17 00:00:00 2001 From: rany Date: Wed, 6 Sep 2023 18:35:04 +0300 Subject: [PATCH 1/3] Improve Local Mode Reliability Local mode devices cannot handle more than 1 concurrent connection to the HTTP server. This means that if an additional connection is made when another one is pending or is in the process of being cleaned up by the device, it will return a connection reset error. ==Mention of Failed Attempt== An approach where we acquire a lock and wait 100ms whenever we use the send() method was attempted but did not work consistently on all devices. My theory is that depending on the processing on the device, it takes a different amount of time for it to reallocate resources to the next new connection. Therefore, simply retrying has worked just fine in my testing. It also has the added advantage of working despite another HA instance, app, etc. taking advantage of local mode. Signed-off-by: rany --- .../sonoff/core/ewelink/local.py | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/custom_components/sonoff/core/ewelink/local.py b/custom_components/sonoff/core/ewelink/local.py index 8a7067d5..db08775a 100644 --- a/custom_components/sonoff/core/ewelink/local.py +++ b/custom_components/sonoff/core/ewelink/local.py @@ -5,6 +5,7 @@ """ import asyncio import base64 +import errno import ipaddress import json import logging @@ -162,6 +163,7 @@ async def send( command: str = None, sequence: str = None, timeout: int = 5, + cre_retry_counter: int = 10, ): # known commands for DIY: switch, startup, pulse, sledonline # other commands: switch, switches, transmit, dimmable, light, fan @@ -231,8 +233,31 @@ async def send( _LOGGER.debug(f"{log} !! 
Can't connect: {e}") return "E#CON" + except aiohttp.ClientOSError as e: + if e.errno != errno.ECONNRESET: + _LOGGER.debug(log, exc_info=e) + return "E#COE" # ClientOSError + + # This happens because the device's web server is not multi-threaded + # and can only process one request at a time. Therefore, if the + # device is busy processing another request, it will close the + # connection for the new request and we will get this error. + # + # It appears that the device takes some time to process a new request + # after the previous one was closed, which caused a locking approach + # to not work across different devices. Simply retrying on this error + # a few times seems to fortunately work reliably, so we'll do that. + + _LOGGER.debug(f"{log} !! ConnectionResetError") + if cre_retry_counter > 0: + await asyncio.sleep(0.1) + return await self.send( + device, params, command, sequence, timeout, cre_retry_counter - 1 + ) + + return "E#CRE" # ConnectionResetError + except ( - aiohttp.ClientOSError, aiohttp.ServerDisconnectedError, asyncio.CancelledError, ) as e: From e0d9706476809f261b817ae743f1e4b91fdebb70 Mon Sep 17 00:00:00 2001 From: rany Date: Wed, 6 Sep 2023 18:41:09 +0300 Subject: [PATCH 2/3] Use time.time_ns() to generate sequence number Converting time.time() to an int and then converting that to ms risks resulting in the same sequence number for a given second. While we have safeguards against this already, it is better to prevent this scenario altogether and just convert to ms properly. 
Signed-off-by: rany --- custom_components/sonoff/core/ewelink/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/custom_components/sonoff/core/ewelink/base.py b/custom_components/sonoff/core/ewelink/base.py index 8d87ce76..114a4745 100644 --- a/custom_components/sonoff/core/ewelink/base.py +++ b/custom_components/sonoff/core/ewelink/base.py @@ -43,7 +43,7 @@ def __init__(self, session: ClientSession): @staticmethod def sequence() -> str: """Return sequnce counter in ms. Always unique.""" - t = int(time.time()) * 1000 + t = time.time_ns() // 1_000_000 if t > XRegistryBase._sequence: XRegistryBase._sequence = t else: From 64e969fb162047ba0d7bd99c98c09365cb9c9fd4 Mon Sep 17 00:00:00 2001 From: rany Date: Wed, 6 Sep 2023 18:44:52 +0300 Subject: [PATCH 3/3] Acquire lock when performing comparisons against/returning of sequence Prevent the same sequence number from being returned for different threads as this might cause hard-to-detect issues. Signed-off-by: rany --- custom_components/sonoff/core/ewelink/__init__.py | 2 +- custom_components/sonoff/core/ewelink/base.py | 14 ++++++++------ custom_components/sonoff/core/ewelink/cloud.py | 2 +- custom_components/sonoff/core/ewelink/local.py | 2 +- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/custom_components/sonoff/core/ewelink/__init__.py b/custom_components/sonoff/core/ewelink/__init__.py index c2a0a7f7..722932a7 100644 --- a/custom_components/sonoff/core/ewelink/__init__.py +++ b/custom_components/sonoff/core/ewelink/__init__.py @@ -98,7 +98,7 @@ async def send( ignored if params empty :param timeout_lan: optional custom LAN timeout """ - seq = self.sequence() + seq = await self.sequence() if "parent" in device: main_device = device["parent"] diff --git a/custom_components/sonoff/core/ewelink/base.py b/custom_components/sonoff/core/ewelink/base.py index 114a4745..985ec792 100644 --- a/custom_components/sonoff/core/ewelink/base.py +++ b/custom_components/sonoff/core/ewelink/base.py @@ 
-35,20 +35,22 @@ class XDevice(TypedDict, total=False): class XRegistryBase: dispatcher: dict[str, list[Callable]] = None _sequence: int = 0 + _sequence_lock: asyncio.Lock = asyncio.Lock() def __init__(self, session: ClientSession): self.dispatcher = {} self.session = session @staticmethod - def sequence() -> str: + async def sequence() -> str: """Return sequnce counter in ms. Always unique.""" t = time.time_ns() // 1_000_000 - if t > XRegistryBase._sequence: - XRegistryBase._sequence = t - else: - XRegistryBase._sequence += 1 - return str(XRegistryBase._sequence) + async with XRegistryBase._sequence_lock: + if t > XRegistryBase._sequence: + XRegistryBase._sequence = t + else: + XRegistryBase._sequence += 1 + return str(XRegistryBase._sequence) def dispatcher_connect(self, signal: str, target: Callable) -> Callable: targets = self.dispatcher.setdefault(signal, []) diff --git a/custom_components/sonoff/core/ewelink/cloud.py b/custom_components/sonoff/core/ewelink/cloud.py index ff01dbf3..dc7e7d50 100644 --- a/custom_components/sonoff/core/ewelink/cloud.py +++ b/custom_components/sonoff/core/ewelink/cloud.py @@ -220,7 +220,7 @@ async def send( self.last_ts = time.time() if sequence is None: - sequence = self.sequence() + sequence = await self.sequence() log += sequence # https://coolkit-technologies.github.io/eWeLink-API/#/en/APICenterV2?id=websocket-update-device-status diff --git a/custom_components/sonoff/core/ewelink/local.py b/custom_components/sonoff/core/ewelink/local.py index db08775a..6754e6d3 100644 --- a/custom_components/sonoff/core/ewelink/local.py +++ b/custom_components/sonoff/core/ewelink/local.py @@ -174,7 +174,7 @@ async def send( command = next(iter(params)) payload = { - "sequence": sequence or self.sequence(), + "sequence": sequence or await self.sequence(), "deviceid": device["deviceid"], "selfApikey": "123", "data": params or {},