diff --git a/custom_components/sonoff/core/ewelink/cloud.py b/custom_components/sonoff/core/ewelink/cloud.py index 36e688f..cbbc33b 100644 --- a/custom_components/sonoff/core/ewelink/cloud.py +++ b/custom_components/sonoff/core/ewelink/cloud.py @@ -293,41 +293,6 @@ def sign(msg: bytes) -> bytes: ).digest() -# noinspection PyProtectedMember -class WebSocket: - """Default asyncio.WebSocket keep-alive only incoming messages with heartbeats. - This is helpful if messages from the server don't come very often. - - With this changes we also keep-alive outgoing messages with heartbeats. - This is helpful if our messages to the server are not sent very often. - """ - - def __init__(self, ws: ClientWebSocketResponse): - self._heartbeat: float = ws._heartbeat - self._heartbeat_cb: asyncio.TimerHandle | None = None - self.ws = ws - - def __aiter__(self): - return self.ws - - async def __anext__(self): - return await self.ws.__anext__() - - async def receive_json(self): - return await self.ws.receive_json() - - async def send_json(self, data: dict): - if self._heartbeat_cb: - self._heartbeat_cb.cancel() - self._heartbeat_cb = None - - self._heartbeat_cb = self.ws._loop.call_later( - self._heartbeat, self.ws._send_heartbeat - ) - - await self.ws.send_json(data) - - class XRegistryCloud(ResponseWaiter, XRegistryBase): auth: dict | None = None devices: dict[str, dict] = None @@ -336,7 +301,7 @@ class XRegistryCloud(ResponseWaiter, XRegistryBase): region: str = None task: asyncio.Task | None = None - ws: WebSocket = None + ws: ClientWebSocketResponse = None @property def host(self) -> str: @@ -545,6 +510,8 @@ async def run_forever(self, **kwargs): try: msg: WSMessage async for msg in self.ws: + if msg.data == "pong": + continue resp = json.loads(msg.data) _ = asyncio.create_task(self._process_ws_msg(resp)) except Exception as e: @@ -557,10 +524,9 @@ async def connect(self) -> bool: resp = await r.json() # we can use IP, but using domain because security - ws = await self.session.ws_connect( + self.ws = await self.session.ws_connect( f"wss://{resp['domain']}:{resp['port']}/api/ws", heartbeat=90 ) - self.ws = WebSocket(ws) # https://coolkit-technologies.github.io/eWeLink-API/#/en/APICenterV2?id=websocket-handshake ts = time.time() @@ -592,7 +558,7 @@ async def connect(self) -> bool: raise Exception(resp) if (config := resp.get("config")) and config.get("hb"): - self.ws._heartbeat = config.get("hbInterval") + asyncio.create_task(_ping(self.ws, config.get("hbInterval"))) return True @@ -636,3 +602,13 @@ async def _process_ws_msg(self, data: dict): else: _LOGGER.warning(f"UNKNOWN cloud msg: {data}") + + +# https://coolkit-technologies.github.io/eWeLink-API/#/en/OAuth2.0?id=websocket-handshake +async def _ping(ws: ClientWebSocketResponse, heartbeat: int): + try: + while heartbeat: + await asyncio.sleep(heartbeat) + await ws.send_str("ping") + except: + pass