diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 20af1bb..19cc69e 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -22,6 +22,7 @@ cast, ) +import websockets.exceptions from bt_decode import MetadataV15, PortableRegistry, decode as decode_by_type_string from scalecodec.base import ScaleBytes, ScaleType, RuntimeConfigurationObject from scalecodec.type_registry import load_type_registry_preset @@ -599,6 +600,7 @@ async def _cancel(self): async def connect(self, force=False): async with self._lock: + logger.debug(f"Websocket connecting to {self.ws_url}") if self._sending is None or self._sending.empty(): self._sending = asyncio.Queue() if self._exit_task: @@ -723,8 +725,10 @@ async def _start_receiving(self, ws: ClientConnection) -> Exception: if not fut.done(): fut.set_exception(e) fut.cancel() + elif isinstance(e, websockets.exceptions.ConnectionClosedOK): + logger.debug("Websocket connection closed.") else: - logger.debug("Timeout occurred. Reconnecting.") + logger.debug(f"Timeout occurred. Reconnecting.") return e async def _start_sending(self, ws) -> Exception: @@ -753,6 +757,8 @@ async def _start_sending(self, ws) -> Exception: for i in self._received.keys(): self._received[i].set_exception(e) self._received[i].cancel() + elif isinstance(e, websockets.exceptions.ConnectionClosedOK): + logger.debug("Websocket connection closed.") else: logger.debug("Timeout occurred. Reconnecting.") return e @@ -2370,6 +2376,9 @@ async def _make_rpc_request( for payload in payloads: item_id = await ws.send(payload["payload"]) request_manager.add_request(item_id, payload["id"]) + logger.debug( + f"Submitted payload ID {payload['id']} with websocket ID {item_id}: {payload}" + ) while True: for item_id in request_manager.unresponded(): @@ -2390,6 +2399,10 @@ async def _make_rpc_request( ) subscription_added = True except KeyError: + logger.error( + f"Error received from subtensor for {item_id}: {response}\n" + f"Currently received responses: {request_manager.get_results()}" + ) raise SubstrateRequestException(str(response)) ( decoded_response, @@ -2406,6 +2419,20 @@ async def _make_rpc_request( request_manager.add_response( item_id, decoded_response, complete ) + if ( + len(stringified_response := str(decoded_response)) + < 2_000 + ): + output_response = stringified_response + # avoids clogging logs up needlessly (esp for Metadata stuff) + else: + output_response = ( + f"{stringified_response[:2_000]} (truncated)" + ) + logger.debug( + f"Received response for item ID {item_id}:\n{output_response}\n" + f"Complete: {complete}" + ) if request_manager.is_complete: break diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index 2c961b2..504f77d 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -632,6 +632,7 @@ def name(self): def connect(self, init=False): if init is True: try: + logger.debug(f"Websocket connecting to {self.chain_endpoint}") return connect(self.chain_endpoint, max_size=self.ws_max_size) except (ConnectionError, socket.gaierror) as e: raise ConnectionError(e) @@ -640,6 +641,7 @@ def connect(self, init=False): return self.ws else: try: + logger.debug(f"Websocket reconnecting to {self.chain_endpoint}") self.ws = connect(self.chain_endpoint, max_size=self.ws_max_size) return self.ws except (ConnectionError, socket.gaierror) as e: @@ -1902,6 +1904,9 @@ def _make_rpc_request( raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}") ws.send(json.dumps(to_send)) request_manager.add_request(item_id, payload["id"]) + logger.debug( + f"Submitted payload ID {payload['id']} with websocket ID {item_id}: {payload}" + ) while True: try: @@ -1948,6 +1953,10 @@ def _make_rpc_request( subscription_added = True except KeyError: raise SubstrateRequestException(str(response)) + logger.error( + f"Error received from subtensor for {item_id}: {response}\n" + f"Currently received responses: {request_manager.get_results()}" + ) decoded_response, complete = self._process_response( response, item_id, @@ -1959,6 +1968,17 @@ def _make_rpc_request( request_manager.add_response( item_id, decoded_response, complete ) + if len(stringified_response := str(decoded_response)) < 2_000: + output_response = stringified_response + # avoids clogging logs up needlessly (esp for Metadata stuff) + else: + output_response = ( + f"{stringified_response[:2_000]} (truncated)" + ) + logger.debug( + f"Received response for item ID {item_id}:\n{output_response}\n" + f"Complete: {complete}" + ) if request_manager.is_complete: break