Skip to content

Release/1.5.1 #171

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Aug 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## 1.5.1 /2025-08-05
* query multiple/decoding fix by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/168
* Fix reconnection logic by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/169

**Full Changelog**: https://github.com/opentensor/async-substrate-interface/compare/v1.5.0...v1.5.1

## 1.5.0 /2025-08-04
* ConcurrencyError fix by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/162
* Added better typing by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/163
Expand Down
72 changes: 51 additions & 21 deletions async_substrate_interface/async_substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ def __init__(
options: Optional[dict] = None,
_log_raw_websockets: bool = False,
retry_timeout: float = 60.0,
max_retries: int = 5,
):
"""
Websocket manager object. Allows for the use of a single websocket connection by multiple
Expand All @@ -536,6 +537,10 @@ def __init__(
max_subscriptions: Maximum number of subscriptions per websocket connection
max_connections: Maximum number of connections total
shutdown_timer: Number of seconds to shut down websocket connection after last use
options: Options to pass to the websocket connection
_log_raw_websockets: Whether to log raw websockets in the "raw_websocket" logger
retry_timeout: Timeout in seconds to retry websocket connection
max_retries: Maximum number of retries following a timeout
"""
# TODO allow setting max concurrent connections and rpc subscriptions per connection
self.ws_url = ws_url
Expand All @@ -555,6 +560,7 @@ def __init__(
self._options = options if options else {}
self._log_raw_websockets = _log_raw_websockets
self._in_use_ids = set()
self._max_retries = max_retries

@property
def state(self):
Expand All @@ -575,7 +581,6 @@ async def loop_time() -> float:
async def _cancel(self):
try:
self._send_recv_task.cancel()
await self._send_recv_task
await self.ws.close()
except (
AttributeError,
Expand Down Expand Up @@ -616,19 +621,30 @@ async def _handler(self, ws: ClientConnection) -> None:
)
loop = asyncio.get_running_loop()
should_reconnect = False
is_retry = False
for task in pending:
task.cancel()
for task in done:
if isinstance(task.result(), (asyncio.TimeoutError, ConnectionClosed)):
task_res = task.result()
if isinstance(
task_res, (asyncio.TimeoutError, ConnectionClosed, TimeoutError)
):
should_reconnect = True
if isinstance(task_res, (asyncio.TimeoutError, TimeoutError)):
self._attempts += 1
is_retry = True
if should_reconnect is True:
for original_id, payload in list(self._inflight.items()):
self._received[original_id] = loop.create_future()
to_send = json.loads(payload)
await self._sending.put(to_send)
logger.info("Timeout occurred. Reconnecting.")
if is_retry:
# Otherwise the connection was just closed due to no activity, which should not count against retries
logger.info(
f"Timeout occurred. Reconnecting. Attempt {self._attempts} of {self._max_retries}"
)
await self.connect(True)
await self._handler(ws=ws)
await self._handler(ws=self.ws)
elif isinstance(e := recv_task.result(), Exception):
return e
elif isinstance(e := send_task.result(), Exception):
Expand Down Expand Up @@ -689,15 +705,22 @@ async def _start_receiving(self, ws: ClientConnection) -> Exception:
recd = await asyncio.wait_for(
ws.recv(decode=False), timeout=self.retry_timeout
)
# reset the counter once we successfully receive something back
self._attempts = 0
await self._recv(recd)
except Exception as e:
logger.exception("Start receiving exception", exc_info=e)
if isinstance(e, ssl.SSLError):
e = ConnectionClosed
for fut in self._received.values():
if not fut.done():
fut.set_exception(e)
fut.cancel()
if not isinstance(
e, (asyncio.TimeoutError, TimeoutError, ConnectionClosed)
):
logger.exception("Websocket receiving exception", exc_info=e)
for fut in self._received.values():
if not fut.done():
fut.set_exception(e)
fut.cancel()
else:
logger.debug("Timeout occurred. Reconnecting.")
return e

async def _start_sending(self, ws) -> Exception:
Expand All @@ -713,14 +736,21 @@ async def _start_sending(self, ws) -> Exception:
raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}")
await ws.send(to_send)
except Exception as e:
logger.exception("Start sending exception", exc_info=e)
if to_send is not None:
self._received[to_send["id"]].set_exception(e)
self._received[to_send["id"]].cancel()
if isinstance(e, ssl.SSLError):
e = ConnectionClosed
if not isinstance(
e, (asyncio.TimeoutError, TimeoutError, ConnectionClosed)
):
logger.exception("Websocket sending exception", exc_info=e)
if to_send is not None:
self._received[to_send["id"]].set_exception(e)
self._received[to_send["id"]].cancel()
else:
for i in self._received.keys():
self._received[i].set_exception(e)
self._received[i].cancel()
else:
for i in self._received.keys():
self._received[i].set_exception(e)
self._received[i].cancel()
logger.debug("Timeout occurred. Reconnecting.")
return e

async def send(self, payload: dict) -> str:
Expand Down Expand Up @@ -784,9 +814,9 @@ async def retrieve(self, item_id: str) -> Optional[dict]:
if item is not None:
if item.done():
self.max_subscriptions.release()
res = item.result()
del self._received[item_id]

return item.result()
return res
else:
try:
return self._received_subscriptions[item_id].get_nowait()
Expand Down Expand Up @@ -860,6 +890,7 @@ def __init__(
},
shutdown_timer=ws_shutdown_timer,
retry_timeout=self.retry_timeout,
max_retries=max_retries,
)
else:
self.ws = AsyncMock(spec=Websocket)
Expand Down Expand Up @@ -1165,7 +1196,7 @@ async def get_runtime_for_version(
async def _get_runtime_for_version(
self, runtime_version: int, block_hash: Optional[str] = None
) -> Runtime:
runtime_config = RuntimeConfigurationObject()
runtime_config = RuntimeConfigurationObject(ss58_format=self.ss58_format)
runtime_config.clear_type_registry()
runtime_config.update_type_registry(load_type_registry_preset(name="core"))

Expand Down Expand Up @@ -2337,7 +2368,7 @@ async def _make_rpc_request(
request_manager.add_request(item_id, payload["id"])

while True:
for item_id in list(request_manager.response_map.keys()):
for item_id in request_manager.unresponded():
if (
item_id not in request_manager.responses
or asyncio.iscoroutinefunction(result_handler)
Expand Down Expand Up @@ -2368,7 +2399,6 @@ async def _make_rpc_request(
runtime=runtime,
force_legacy_decode=force_legacy_decode,
)

request_manager.add_response(
item_id, decoded_response, complete
)
Expand Down
2 changes: 1 addition & 1 deletion async_substrate_interface/sync_substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -1924,7 +1924,7 @@ def _make_rpc_request(
_received[response["params"]["subscription"]] = response
else:
raise SubstrateRequestException(response)
for item_id in list(request_manager.response_map.keys()):
for item_id in request_manager.unresponded():
if item_id not in request_manager.responses or isinstance(
result_handler, Callable
):
Expand Down
9 changes: 9 additions & 0 deletions async_substrate_interface/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,15 @@ def get_results(self) -> RequestResults:
request_id: info["results"] for request_id, info in self.responses.items()
}

def unresponded(self):
"""
Yields items from response_map whose corresponding response is missing or incomplete.
"""
for item_id, request_id in list(self.response_map.items()):
response_info = self.responses.get(request_id)
if response_info is None or not response_info["complete"]:
yield item_id


@dataclass
class Preprocessed:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "async-substrate-interface"
version = "1.5.0"
version = "1.5.1"
description = "Asyncio library for interacting with substrate. Mostly API-compatible with py-substrate-interface"
readme = "README.md"
license = { file = "LICENSE" }
Expand Down
31 changes: 30 additions & 1 deletion tests/integration_tests/test_async_substrate_interface.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import time

import pytest
Expand Down Expand Up @@ -126,9 +127,37 @@ async def test_get_events_proper_decoding():
async with AsyncSubstrateInterface(ARCHIVE_ENTRYPOINT) as substrate:
all_events = await substrate.get_events(block_hash=block_hash)
event = all_events[1]
print(type(event["attributes"]))
assert event["attributes"] == (
"5G1NjW9YhXLadMWajvTkfcJy6up3yH2q1YzMXDTi6ijanChe",
30,
"0xa6b4e5c8241d60ece0c25056b19f7d21ae845269fc771ad46bf3e011865129a5",
)


@pytest.mark.asyncio
async def test_query_multiple():
block = 6153277
cks = [
"5FH9AQM4kqbkdC9jyV5FrdEWVYt41nkhFstop7Vhyfb9ZsXt",
"5GQxLKxjZWNZDsghmYcw7P6ahC7XJCjx1WD94WGh92quSycx",
"5EcaPiDT1cv951SkCFsvdHDs2yAEUWhJDuRP9mHb343WnaVn",
]
async with AsyncSubstrateInterface(ARCHIVE_ENTRYPOINT) as substrate:
block_hash = await substrate.get_block_hash(block_id=block)
assert await substrate.query_multiple(
params=cks,
module="SubtensorModule",
storage_function="OwnedHotkeys",
block_hash=block_hash,
)


@pytest.mark.asyncio
async def test_reconnection():
async with AsyncSubstrateInterface(
ARCHIVE_ENTRYPOINT, ss58_format=42, retry_timeout=8.0
) as substrate:
await asyncio.sleep(9) # sleep for longer than the retry timeout
bh = await substrate.get_chain_finalised_head()
assert isinstance(bh, str)
assert isinstance(await substrate.get_block_number(bh), int)
18 changes: 17 additions & 1 deletion tests/integration_tests/test_substrate_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,25 @@ def test_get_events_proper_decoding():
with SubstrateInterface(ARCHIVE_ENTRYPOINT) as substrate:
all_events = substrate.get_events(block_hash=block_hash)
event = all_events[1]
print(type(event["attributes"]))
assert event["attributes"] == (
"5G1NjW9YhXLadMWajvTkfcJy6up3yH2q1YzMXDTi6ijanChe",
30,
"0xa6b4e5c8241d60ece0c25056b19f7d21ae845269fc771ad46bf3e011865129a5",
)


def test_query_multiple():
block = 6153277
cks = [
"5FH9AQM4kqbkdC9jyV5FrdEWVYt41nkhFstop7Vhyfb9ZsXt",
"5GQxLKxjZWNZDsghmYcw7P6ahC7XJCjx1WD94WGh92quSycx",
"5EcaPiDT1cv951SkCFsvdHDs2yAEUWhJDuRP9mHb343WnaVn",
]
with SubstrateInterface(ARCHIVE_ENTRYPOINT) as substrate:
block_hash = substrate.get_block_hash(block_id=block)
assert substrate.query_multiple(
params=cks,
module="SubtensorModule",
storage_function="OwnedHotkeys",
block_hash=block_hash,
)
9 changes: 5 additions & 4 deletions tests/unit_tests/asyncio_/test_substrate_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pytest
from websockets.exceptions import InvalidURI
from websockets.protocol import State

from async_substrate_interface.async_substrate import AsyncSubstrateInterface
from async_substrate_interface.types import ScaleObj
Expand Down Expand Up @@ -103,17 +104,17 @@ async def test_websocket_shutdown_timer():
async with AsyncSubstrateInterface("wss://lite.sub.latent.to:443") as substrate:
await substrate.get_chain_head()
await asyncio.sleep(6)
assert (
substrate.ws._initialized is False
) # connection should have closed automatically
assert (
substrate.ws.state is State.CLOSED
) # connection should have closed automatically

# using custom ws shutdown timer of 10.0 seconds
async with AsyncSubstrateInterface(
"wss://lite.sub.latent.to:443", ws_shutdown_timer=10.0
) as substrate:
await substrate.get_chain_head()
await asyncio.sleep(6) # same sleep time as before
assert substrate.ws._initialized is True # connection should still be open
assert substrate.ws.state is State.OPEN # connection should still be open


@pytest.mark.asyncio
Expand Down
Loading