From 3f916ba07c20c888fb646562ad08ee2b022b2403 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Tue, 15 Apr 2025 18:32:58 +0200 Subject: [PATCH 1/5] [WIP] Proof of concept for fallback networks. Not tested or working with Async. Not perfect with sync. --- bittensor/core/subtensor.py | 18 +++- bittensor/core/types.py | 181 +++++++++++++++++++++++++++++++++++- 2 files changed, 195 insertions(+), 4 deletions(-) diff --git a/bittensor/core/subtensor.py b/bittensor/core/subtensor.py index bcb1e256c1..f32b537aec 100644 --- a/bittensor/core/subtensor.py +++ b/bittensor/core/subtensor.py @@ -81,7 +81,7 @@ SS58_FORMAT, TYPE_REGISTRY, ) -from bittensor.core.types import ParamWithTypes, SubtensorMixin +from bittensor.core.types import ParamWithTypes, SubtensorMixin, RetrySubstrate from bittensor.utils import ( Certificate, decode_hex_identity_dict, @@ -92,6 +92,7 @@ u16_normalized_float, u64_normalized_float, unlock_key, + determine_chain_endpoint_and_network, ) from bittensor.utils.balance import ( Balance, @@ -117,6 +118,8 @@ def __init__( config: Optional["Config"] = None, _mock: bool = False, log_verbose: bool = False, + fallback_chains: Optional[list[str]] = None, + retry_forever: bool = False, ): """ Initializes an instance of the Subtensor class. @@ -126,10 +129,13 @@ def __init__( config (Optional[Config]): Configuration object for the AsyncSubtensor instance. _mock: Whether this is a mock instance. Mainly just for use in testing. log_verbose (bool): Enables or disables verbose logging. + fallback_chains: list of chain urls to try if the initial one fails + retry_forever: whether to continuously try the chains indefinitely if timeout failure Raises: Any exceptions raised during the setup, configuration, or connection process. """ + fallback_chains_ = fallback_chains or [] if config is None: config = self.config() self._config = copy.deepcopy(config) @@ -143,13 +149,19 @@ def __init__( f"Connecting to network: [blue]{self.network}[/blue], " f"chain_endpoint: [blue]{self.chain_endpoint}[/blue]> ..." ) - self.substrate = SubstrateInterface( - url=self.chain_endpoint, + fallback_chain_urls = [ + determine_chain_endpoint_and_network(x)[1] for x in fallback_chains_ + ] + self.substrate = RetrySubstrate( + substrate=SubstrateInterface, + main_url=self.chain_endpoint, ss58_format=SS58_FORMAT, type_registry=TYPE_REGISTRY, use_remote_preset=True, chain_name="Bittensor", _mock=_mock, + fallback_chains=fallback_chain_urls, + retry_forever=retry_forever, ) if self.log_verbose: logging.info( diff --git a/bittensor/core/types.py b/bittensor/core/types.py index db5ab0b24e..2c677e5876 100644 --- a/bittensor/core/types.py +++ b/bittensor/core/types.py @@ -1,6 +1,12 @@ from abc import ABC import argparse -from typing import TypedDict, Optional +from functools import partial +from itertools import cycle +from typing import TypedDict, Optional, Union, Type + +from async_substrate_interface.sync_substrate import SubstrateInterface +from async_substrate_interface.async_substrate import AsyncSubstrateInterface +from async_substrate_interface.errors import MaxRetriesExceeded from bittensor.utils import networking, Certificate from bittensor.utils.btlogging import logging @@ -9,6 +15,179 @@ from bittensor.core.chain_data import NeuronInfo, NeuronInfoLite from bittensor.utils import determine_chain_endpoint_and_network +SubstrateClass = Type[Union[SubstrateInterface, AsyncSubstrateInterface]] + + +class RetrySubstrate: + def __init__( + self, + substrate: SubstrateClass, + main_url: str, + ss58_format: int, + type_registry: dict, + use_remote_preset: bool, + chain_name: str, + _mock: bool, + fallback_chains: Optional[list[str]] = None, + retry_forever: bool = False, + ): + self._substrate_class: SubstrateClass = substrate + self.ss58_format: int = ss58_format + self.type_registry: dict = type_registry + self.use_remote_preset: bool = use_remote_preset + self.chain_name: str = chain_name + self._mock = _mock + self.fallback_chains = ( + iter(fallback_chains) + if not retry_forever + else cycle(fallback_chains + [main_url]) + ) + initialized = False + for chain_url in [main_url] + fallback_chains: + try: + self._substrate = self._substrate_class( + url=chain_url, + ss58_format=ss58_format, + type_registry=type_registry, + use_remote_preset=use_remote_preset, + chain_name=chain_name, + _mock=_mock, + ) + initialized = True + break + except ConnectionError: + continue + if not initialized: + raise ConnectionError( + f"Unable to connect at any chains specified: {[main_url]+fallback_chains}" + ) + + # retries + + # TODO: properties that need retry logic + # properties + # version + # token_decimals + # token_symbol + # name + + self._get_block_handler = partial(self._retry, "_get_block_handler") + self.apply_type_registry_presets = partial( + self._retry, "apply_type_registry_presets" + ) + self.close = partial(self._retry, "close") + self.compose_call = partial(self._retry, "compose_call") + self.connect = partial(self._retry, "connect") + self.create_scale_object = partial(self._retry, "create_scale_object") + self.create_signed_extrinsic = partial(self._retry, "create_signed_extrinsic") + self.create_storage_key = partial(self._retry, "create_storage_key") + self.decode_scale = partial(self._retry, "decode_scale") + self.encode_scale = partial(self._retry, "encode_scale") + self.extension_call = partial(self._retry, "extension_call") + self.filter_events = partial(self._retry, "filter_events") + self.filter_extrinsics = partial(self._retry, "filter_extrinsics") + self.generate_signature_payload = partial( + self._retry, "generate_signature_payload" + ) + self.get_account_next_index = partial(self._retry, "get_account_next_index") + self.get_account_nonce = partial(self._retry, "get_account_nonce") + self.get_block = partial(self._retry, "get_block") + self.get_block_hash = partial(self._retry, "get_block_hash") + self.get_block_header = partial(self._retry, "get_block_header") + self.get_block_metadata = partial(self._retry, "get_block_metadata") + self.get_block_number = partial(self._retry, "get_block_number") + self.get_block_runtime_info = partial(self._retry, "get_block_runtime_info") + self.get_block_runtime_version_for = partial( + self._retry, "get_block_runtime_version_for" + ) + self.get_block_timestamp = partial(self._retry, "get_block_timestamp") + self.get_chain_finalised_head = partial(self._retry, "get_chain_finalised_head") + self.get_chain_head = partial(self._retry, "get_chain_head") + self.get_constant = partial(self._retry, "get_constant") + self.get_events = partial(self._retry, "get_events") + self.get_extrinsics = partial(self._retry, "get_extrinsics") + self.get_metadata_call_function = partial( + self._retry, "get_metadata_call_function" + ) + self.get_metadata_constant = partial(self._retry, "get_metadata_constant") + self.get_metadata_error = partial(self._retry, "get_metadata_error") + self.get_metadata_errors = partial(self._retry, "get_metadata_errors") + self.get_metadata_module = partial(self._retry, "get_metadata_module") + self.get_metadata_modules = partial(self._retry, "get_metadata_modules") + self.get_metadata_runtime_call_function = partial( + self._retry, "get_metadata_runtime_call_function" + ) + self.get_metadata_runtime_call_functions = partial( + self._retry, "get_metadata_runtime_call_functions" + ) + self.get_metadata_storage_function = partial( + self._retry, "get_metadata_storage_function" + ) + self.get_metadata_storage_functions = partial( + self._retry, "get_metadata_storage_functions" + ) + self.get_parent_block_hash = partial(self._retry, "get_parent_block_hash") + self.get_payment_info = partial(self._retry, "get_payment_info") + self.get_storage_item = partial(self._retry, "get_storage_item") + self.get_type_definition = partial(self._retry, "get_type_definition") + self.get_type_registry = partial(self._retry, "get_type_registry") + self.init_runtime = partial(self._retry, "init_runtime") + self.initialize = partial(self._retry, "initialize") + self.is_valid_ss58_address = partial(self._retry, "is_valid_ss58_address") + self.load_runtime = partial(self._retry, "load_runtime") + self.make_payload = partial(self._retry, "make_payload") + self.query = partial(self._retry, "query") + self.query_map = partial(self._retry, "query_map") + self.query_multi = partial(self._retry, "query_multi") + self.query_multiple = partial(self._retry, "query_multiple") + self.reload_type_registry = partial(self._retry, "reload_type_registry") + self.retrieve_extrinsic_by_hash = partial( + self._retry, "retrieve_extrinsic_by_hash" + ) + self.retrieve_extrinsic_by_identifier = partial( + self._retry, "retrieve_extrinsic_by_identifier" + ) + self.rpc_request = partial(self._retry, "rpc_request") + self.runtime_call = partial(self._retry, "runtime_call") + self.search_block_number = partial(self._retry, "search_block_number") + self.serialize_constant = partial(self._retry, "serialize_constant") + self.serialize_module_call = partial(self._retry, "serialize_module_call") + self.serialize_module_error = partial(self._retry, "serialize_module_error") + self.serialize_module_event = partial(self._retry, "serialize_module_event") + self.serialize_storage_item = partial(self._retry, "serialize_storage_item") + self.ss58_decode = partial(self._retry, "ss58_decode") + self.ss58_encode = partial(self._retry, "ss58_encode") + self.submit_extrinsic = partial(self._retry, "submit_extrinsic") + self.subscribe_block_headers = partial(self._retry, "subscribe_block_headers") + self.supports_rpc_method = partial(self._retry, "supports_rpc_method") + self.ws = self._substrate.ws + + def _retry(self, method, *args, **kwargs): + try: + method_ = getattr(self._substrate, method) + return method_(*args, **kwargs) + except MaxRetriesExceeded: + try: + next_network = next(self.fallback_chains) + logging.error( + f"Max retries exceeded with {self._substrate.url}. Retrying with {next_network}." + ) + self._substrate = self._substrate_class( + url=next_network, + ss58_format=self.ss58_format, + type_registry=self.type_registry, + use_remote_preset=self.use_remote_preset, + chain_name=self.chain_name, + _mock=self._mock, + ) + method_ = getattr(self._substrate, method) + return self._retry(method_(*args, **kwargs)) + except StopIteration: + logging.error( + f"Max retries exceeded with {self._substrate.url}. No more fallback chains." + ) + raise MaxRetriesExceeded + class SubtensorMixin(ABC): network: str From a1e8e2918a7e933e52c41746ef27b48ee05a00ca Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 18 Apr 2025 14:26:26 +0200 Subject: [PATCH 2/5] Async seems to be working --- bittensor/core/async_subtensor.py | 18 ++- bittensor/core/axon.py | 30 ++--- bittensor/core/types.py | 198 ++++++++++++++++++------------ 3 files changed, 148 insertions(+), 98 deletions(-) diff --git a/bittensor/core/async_subtensor.py b/bittensor/core/async_subtensor.py index 1bd3ce9820..51170ab1d6 100644 --- a/bittensor/core/async_subtensor.py +++ b/bittensor/core/async_subtensor.py @@ -79,7 +79,7 @@ ) from bittensor.core.metagraph import AsyncMetagraph from bittensor.core.settings import version_as_int, TYPE_REGISTRY -from bittensor.core.types import ParamWithTypes, SubtensorMixin +from bittensor.core.types import ParamWithTypes, SubtensorMixin, RetrySubstrate from bittensor.utils import ( Certificate, decode_hex_identity_dict, @@ -90,6 +90,7 @@ u16_normalized_float, u64_normalized_float, unlock_key, + determine_chain_endpoint_and_network, ) from bittensor.utils.balance import ( Balance, @@ -115,6 +116,8 @@ def __init__( config: Optional["Config"] = None, _mock: bool = False, log_verbose: bool = False, + fallback_chains: Optional[list[str]] = None, + retry_forever: bool = False, ): """ Initializes an instance of the AsyncSubtensor class. @@ -124,10 +127,13 @@ def __init__( config (Optional[Config]): Configuration object for the AsyncSubtensor instance. _mock: Whether this is a mock instance. Mainly just for use in testing. log_verbose (bool): Enables or disables verbose logging. + fallback_chains: list of chain urls to try if the initial one fails + retry_forever: whether to continuously try the chains indefinitely if timeout failure Raises: Any exceptions raised during the setup, configuration, or connection process. """ + fallback_chains_ = fallback_chains or [] if config is None: config = AsyncSubtensor.config() self._config = copy.deepcopy(config) @@ -135,6 +141,9 @@ def __init__( network, self._config ) self._mock = _mock + fallback_chain_urls = [ + determine_chain_endpoint_and_network(x)[1] for x in fallback_chains_ + ] self.log_verbose = log_verbose self._check_and_log_network_settings() @@ -143,13 +152,16 @@ def __init__( f"Connecting to network: [blue]{self.network}[/blue], " f"chain_endpoint: [blue]{self.chain_endpoint}[/blue]..." ) - self.substrate = AsyncSubstrateInterface( - url=self.chain_endpoint, + self.substrate = RetrySubstrate( + AsyncSubstrateInterface, + main_url=self.chain_endpoint, ss58_format=SS58_FORMAT, type_registry=TYPE_REGISTRY, use_remote_preset=True, chain_name="Bittensor", _mock=_mock, + fallback_chains=fallback_chain_urls, + retry_forever=retry_forever, ) if self.log_verbose: logging.info( diff --git a/bittensor/core/axon.py b/bittensor/core/axon.py index 56b55e60eb..54817ccdfd 100644 --- a/bittensor/core/axon.py +++ b/bittensor/core/axon.py @@ -504,9 +504,9 @@ def verify_custom(synapse: MyCustomSynapse): ) param_class = first_param.annotation - assert issubclass(param_class, Synapse), ( - "The first argument of forward_fn must inherit from bittensor.Synapse" - ) + assert issubclass( + param_class, Synapse + ), "The first argument of forward_fn must inherit from bittensor.Synapse" request_name = param_class.__name__ async def endpoint(*args, **kwargs): @@ -580,19 +580,19 @@ async def endpoint(*args, **kwargs): blacklist_sig = Signature( expected_params, return_annotation=Tuple[bool, str] ) - assert signature(blacklist_fn) == blacklist_sig, ( - f"The blacklist_fn function must have the signature: blacklist( synapse: {request_name} ) -> tuple[bool, str]" - ) + assert ( + signature(blacklist_fn) == blacklist_sig + ), f"The blacklist_fn function must have the signature: blacklist( synapse: {request_name} ) -> tuple[bool, str]" if priority_fn: priority_sig = Signature(expected_params, return_annotation=float) - assert signature(priority_fn) == priority_sig, ( - f"The priority_fn function must have the signature: priority( synapse: {request_name} ) -> float" - ) + assert ( + signature(priority_fn) == priority_sig + ), f"The priority_fn function must have the signature: priority( synapse: {request_name} ) -> float" if verify_fn: verify_sig = Signature(expected_params, return_annotation=None) - assert signature(verify_fn) == verify_sig, ( - f"The verify_fn function must have the signature: verify( synapse: {request_name} ) -> None" - ) + assert ( + signature(verify_fn) == verify_sig + ), f"The verify_fn function must have the signature: verify( synapse: {request_name} ) -> None" # Store functions in appropriate attribute dictionaries self.forward_class_types[request_name] = param_class @@ -747,9 +747,9 @@ def check_config(cls, config: "Config"): Raises: AssertionError: If the axon or external ports are not in range [1024, 65535] """ - assert 1024 < config.axon.port < 65535, ( - "Axon port must be in range [1024, 65535]" - ) + assert ( + 1024 < config.axon.port < 65535 + ), "Axon port must be in range [1024, 65535]" assert config.axon.external_port is None or ( 1024 < config.axon.external_port < 65535 diff --git a/bittensor/core/types.py b/bittensor/core/types.py index 2c677e5876..9f79b8d49d 100644 --- a/bittensor/core/types.py +++ b/bittensor/core/types.py @@ -1,3 +1,4 @@ +import asyncio from abc import ABC import argparse from functools import partial @@ -31,6 +32,7 @@ def __init__( fallback_chains: Optional[list[str]] = None, retry_forever: bool = False, ): + fallback_chains = fallback_chains or [] self._substrate_class: SubstrateClass = substrate self.ss58_format: int = ss58_format self.type_registry: dict = type_registry @@ -56,7 +58,7 @@ def __init__( initialized = True break except ConnectionError: - continue + logging.warning(f"Unable to connect to {chain_url}") if not initialized: raise ConnectionError( f"Unable to connect at any chains specified: {[main_url]+fallback_chains}" @@ -71,107 +73,140 @@ def __init__( # token_symbol # name - self._get_block_handler = partial(self._retry, "_get_block_handler") - self.apply_type_registry_presets = partial( - self._retry, "apply_type_registry_presets" + retry = ( + self._async_retry + if self._substrate_class == AsyncSubstrateInterface + else self._retry ) - self.close = partial(self._retry, "close") - self.compose_call = partial(self._retry, "compose_call") - self.connect = partial(self._retry, "connect") - self.create_scale_object = partial(self._retry, "create_scale_object") - self.create_signed_extrinsic = partial(self._retry, "create_signed_extrinsic") - self.create_storage_key = partial(self._retry, "create_storage_key") - self.decode_scale = partial(self._retry, "decode_scale") - self.encode_scale = partial(self._retry, "encode_scale") - self.extension_call = partial(self._retry, "extension_call") - self.filter_events = partial(self._retry, "filter_events") - self.filter_extrinsics = partial(self._retry, "filter_extrinsics") - self.generate_signature_payload = partial( - self._retry, "generate_signature_payload" - ) - self.get_account_next_index = partial(self._retry, "get_account_next_index") - self.get_account_nonce = partial(self._retry, "get_account_nonce") - self.get_block = partial(self._retry, "get_block") - self.get_block_hash = partial(self._retry, "get_block_hash") - self.get_block_header = partial(self._retry, "get_block_header") - self.get_block_metadata = partial(self._retry, "get_block_metadata") - self.get_block_number = partial(self._retry, "get_block_number") - self.get_block_runtime_info = partial(self._retry, "get_block_runtime_info") + + self._get_block_handler = partial(retry, "_get_block_handler") + self.apply_type_registry_presets = partial(retry, "apply_type_registry_presets") + self.close = partial(retry, "close") + self.compose_call = partial(retry, "compose_call") + self.connect = partial(retry, "connect") + self.create_scale_object = partial(retry, "create_scale_object") + self.create_signed_extrinsic = partial(retry, "create_signed_extrinsic") + self.create_storage_key = partial(retry, "create_storage_key") + self.decode_scale = partial(retry, "decode_scale") + self.encode_scale = partial(retry, "encode_scale") + self.extension_call = partial(retry, "extension_call") + self.filter_events = partial(retry, "filter_events") + self.filter_extrinsics = partial(retry, "filter_extrinsics") + self.generate_signature_payload = partial(retry, "generate_signature_payload") + self.get_account_next_index = partial(retry, "get_account_next_index") + self.get_account_nonce = partial(retry, "get_account_nonce") + self.get_block = partial(retry, "get_block") + self.get_block_hash = partial(retry, "get_block_hash") + self.get_block_header = partial(retry, "get_block_header") + self.get_block_metadata = partial(retry, "get_block_metadata") + self.get_block_number = partial(retry, "get_block_number") + self.get_block_runtime_info = partial(retry, "get_block_runtime_info") self.get_block_runtime_version_for = partial( - self._retry, "get_block_runtime_version_for" - ) - self.get_block_timestamp = partial(self._retry, "get_block_timestamp") - self.get_chain_finalised_head = partial(self._retry, "get_chain_finalised_head") - self.get_chain_head = partial(self._retry, "get_chain_head") - self.get_constant = partial(self._retry, "get_constant") - self.get_events = partial(self._retry, "get_events") - self.get_extrinsics = partial(self._retry, "get_extrinsics") - self.get_metadata_call_function = partial( - self._retry, "get_metadata_call_function" + retry, "get_block_runtime_version_for" ) - self.get_metadata_constant = partial(self._retry, "get_metadata_constant") - self.get_metadata_error = partial(self._retry, "get_metadata_error") - self.get_metadata_errors = partial(self._retry, "get_metadata_errors") - self.get_metadata_module = partial(self._retry, "get_metadata_module") - self.get_metadata_modules = partial(self._retry, "get_metadata_modules") + self.get_block_timestamp = partial(retry, "get_block_timestamp") + self.get_chain_finalised_head = partial(retry, "get_chain_finalised_head") + self.get_chain_head = partial(retry, "get_chain_head") + self.get_constant = partial(retry, "get_constant") + self.get_events = partial(retry, "get_events") + self.get_extrinsics = partial(retry, "get_extrinsics") + self.get_metadata_call_function = partial(retry, "get_metadata_call_function") + self.get_metadata_constant = partial(retry, "get_metadata_constant") + self.get_metadata_error = partial(retry, "get_metadata_error") + self.get_metadata_errors = partial(retry, "get_metadata_errors") + self.get_metadata_module = partial(retry, "get_metadata_module") + self.get_metadata_modules = partial(retry, "get_metadata_modules") self.get_metadata_runtime_call_function = partial( - self._retry, "get_metadata_runtime_call_function" + retry, "get_metadata_runtime_call_function" ) self.get_metadata_runtime_call_functions = partial( - self._retry, "get_metadata_runtime_call_functions" + retry, "get_metadata_runtime_call_functions" ) self.get_metadata_storage_function = partial( - self._retry, "get_metadata_storage_function" + retry, "get_metadata_storage_function" ) self.get_metadata_storage_functions = partial( - self._retry, "get_metadata_storage_functions" - ) - self.get_parent_block_hash = partial(self._retry, "get_parent_block_hash") - self.get_payment_info = partial(self._retry, "get_payment_info") - self.get_storage_item = partial(self._retry, "get_storage_item") - self.get_type_definition = partial(self._retry, "get_type_definition") - self.get_type_registry = partial(self._retry, "get_type_registry") - self.init_runtime = partial(self._retry, "init_runtime") - self.initialize = partial(self._retry, "initialize") - self.is_valid_ss58_address = partial(self._retry, "is_valid_ss58_address") - self.load_runtime = partial(self._retry, "load_runtime") - self.make_payload = partial(self._retry, "make_payload") - self.query = partial(self._retry, "query") - self.query_map = partial(self._retry, "query_map") - self.query_multi = partial(self._retry, "query_multi") - self.query_multiple = partial(self._retry, "query_multiple") - self.reload_type_registry = partial(self._retry, "reload_type_registry") - self.retrieve_extrinsic_by_hash = partial( - self._retry, "retrieve_extrinsic_by_hash" + retry, "get_metadata_storage_functions" ) + self.get_parent_block_hash = partial(retry, "get_parent_block_hash") + self.get_payment_info = partial(retry, "get_payment_info") + self.get_storage_item = partial(retry, "get_storage_item") + self.get_type_definition = partial(retry, "get_type_definition") + self.get_type_registry = partial(retry, "get_type_registry") + self.init_runtime = partial(retry, "init_runtime") + self.initialize = partial(retry, "initialize") + self.is_valid_ss58_address = partial(retry, "is_valid_ss58_address") + self.load_runtime = partial(retry, "load_runtime") + self.make_payload = partial(retry, "make_payload") + self.query = partial(retry, "query") + self.query_map = partial(retry, "query_map") + self.query_multi = partial(retry, "query_multi") + self.query_multiple = partial(retry, "query_multiple") + self.reload_type_registry = partial(retry, "reload_type_registry") + self.retrieve_extrinsic_by_hash = partial(retry, "retrieve_extrinsic_by_hash") self.retrieve_extrinsic_by_identifier = partial( - self._retry, "retrieve_extrinsic_by_identifier" + retry, "retrieve_extrinsic_by_identifier" ) - self.rpc_request = partial(self._retry, "rpc_request") - self.runtime_call = partial(self._retry, "runtime_call") - self.search_block_number = partial(self._retry, "search_block_number") - self.serialize_constant = partial(self._retry, "serialize_constant") - self.serialize_module_call = partial(self._retry, "serialize_module_call") - self.serialize_module_error = partial(self._retry, "serialize_module_error") - self.serialize_module_event = partial(self._retry, "serialize_module_event") - self.serialize_storage_item = partial(self._retry, "serialize_storage_item") - self.ss58_decode = partial(self._retry, "ss58_decode") - self.ss58_encode = partial(self._retry, "ss58_encode") - self.submit_extrinsic = partial(self._retry, "submit_extrinsic") - self.subscribe_block_headers = partial(self._retry, "subscribe_block_headers") - self.supports_rpc_method = partial(self._retry, "supports_rpc_method") + self.rpc_request = partial(retry, "rpc_request") + self.runtime_call = partial(retry, "runtime_call") + self.search_block_number = partial(retry, "search_block_number") + self.serialize_constant = partial(retry, "serialize_constant") + self.serialize_module_call = partial(retry, "serialize_module_call") + self.serialize_module_error = partial(retry, "serialize_module_error") + self.serialize_module_event = partial(retry, "serialize_module_event") + self.serialize_storage_item = partial(retry, "serialize_storage_item") + self.ss58_decode = partial(retry, "ss58_decode") + self.ss58_encode = partial(retry, "ss58_encode") + self.submit_extrinsic = partial(retry, "submit_extrinsic") + self.subscribe_block_headers = partial(retry, "subscribe_block_headers") + self.supports_rpc_method = partial(retry, "supports_rpc_method") self.ws = self._substrate.ws def _retry(self, method, *args, **kwargs): try: method_ = getattr(self._substrate, method) return method_(*args, **kwargs) - except MaxRetriesExceeded: + except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: try: next_network = next(self.fallback_chains) + if e.__class__ == MaxRetriesExceeded: + logging.error( + f"Max retries exceeded with {self._substrate.url}. Retrying with {next_network}." + ) + else: + print(f"Connection error. Trying again with {next_network}") + self._substrate = self._substrate_class( + url=next_network, + ss58_format=self.ss58_format, + type_registry=self.type_registry, + use_remote_preset=self.use_remote_preset, + chain_name=self.chain_name, + _mock=self._mock, + ) + method_ = getattr(self._substrate, method) + return self._retry(method_(*args, **kwargs)) + except StopIteration: logging.error( - f"Max retries exceeded with {self._substrate.url}. Retrying with {next_network}." + f"Max retries exceeded with {self._substrate.url}. No more fallback chains." ) + raise MaxRetriesExceeded + + async def _async_retry(self, method, *args, **kwargs): + try: + method_ = getattr(self._substrate, method) + if asyncio.iscoroutinefunction(method_): + return await method_(*args, **kwargs) + else: + return method_(*args, **kwargs) + except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: + try: + next_network = next(self.fallback_chains) + if e.__class__ == MaxRetriesExceeded: + logging.error( + f"Max retries exceeded with {self._substrate.url}. Retrying with {next_network}." + ) + else: + print(f"Connection error. Trying again with {next_network}") self._substrate = self._substrate_class( url=next_network, ss58_format=self.ss58_format, @@ -181,7 +216,10 @@ def _retry(self, method, *args, **kwargs): _mock=self._mock, ) method_ = getattr(self._substrate, method) - return self._retry(method_(*args, **kwargs)) + if asyncio.iscoroutinefunction(method_): + return await method_(*args, **kwargs) + else: + return method_(*args, **kwargs) except StopIteration: logging.error( f"Max retries exceeded with {self._substrate.url}. No more fallback chains." From f590e5a804dd4bfbdf9e3f52ccf0a57ce026f8f3 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 18 Apr 2025 15:44:58 +0200 Subject: [PATCH 3/5] code cleanup --- bittensor/core/types.py | 49 ++++++++++++++++------------------------- 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/bittensor/core/types.py b/bittensor/core/types.py index 9f79b8d49d..833130b853 100644 --- a/bittensor/core/types.py +++ b/bittensor/core/types.py @@ -168,21 +168,7 @@ def _retry(self, method, *args, **kwargs): return method_(*args, **kwargs) except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: try: - next_network = next(self.fallback_chains) - if e.__class__ == MaxRetriesExceeded: - logging.error( - f"Max retries exceeded with {self._substrate.url}. Retrying with {next_network}." - ) - else: - print(f"Connection error. Trying again with {next_network}") - self._substrate = self._substrate_class( - url=next_network, - ss58_format=self.ss58_format, - type_registry=self.type_registry, - use_remote_preset=self.use_remote_preset, - chain_name=self.chain_name, - _mock=self._mock, - ) + self._reinstantiate_substrate() method_ = getattr(self._substrate, method) return self._retry(method_(*args, **kwargs)) except StopIteration: @@ -200,21 +186,7 @@ async def _async_retry(self, method, *args, **kwargs): return method_(*args, **kwargs) except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: try: - next_network = next(self.fallback_chains) - if e.__class__ == MaxRetriesExceeded: - logging.error( - f"Max retries exceeded with {self._substrate.url}. Retrying with {next_network}." - ) - else: - print(f"Connection error. Trying again with {next_network}") - self._substrate = self._substrate_class( - url=next_network, - ss58_format=self.ss58_format, - type_registry=self.type_registry, - use_remote_preset=self.use_remote_preset, - chain_name=self.chain_name, - _mock=self._mock, - ) + self._reinstantiate_substrate(e) method_ = getattr(self._substrate, method) if asyncio.iscoroutinefunction(method_): return await method_(*args, **kwargs) @@ -226,6 +198,23 @@ async def _async_retry(self, method, *args, **kwargs): ) raise MaxRetriesExceeded + def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: + next_network = next(self.fallback_chains) + if e.__class__ == MaxRetriesExceeded: + logging.error( + f"Max retries exceeded with {self._substrate.url}. Retrying with {next_network}." + ) + else: + print(f"Connection error. Trying again with {next_network}") + self._substrate = self._substrate_class( + url=next_network, + ss58_format=self.ss58_format, + type_registry=self.type_registry, + use_remote_preset=self.use_remote_preset, + chain_name=self.chain_name, + _mock=self._mock, + ) + class SubtensorMixin(ABC): network: str From c4f99f5cffe4e38efb9bb095b270c61012b70e3c Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 25 Apr 2025 15:54:56 +0200 Subject: [PATCH 4/5] [WIP] moving to async-substrate-interface --- bittensor/core/types.py | 199 ---------------------------------------- 1 file changed, 199 deletions(-) diff --git a/bittensor/core/types.py b/bittensor/core/types.py index 833130b853..4137b9541d 100644 --- a/bittensor/core/types.py +++ b/bittensor/core/types.py @@ -16,205 +16,6 @@ from bittensor.core.chain_data import NeuronInfo, NeuronInfoLite from bittensor.utils import determine_chain_endpoint_and_network -SubstrateClass = Type[Union[SubstrateInterface, AsyncSubstrateInterface]] - - -class RetrySubstrate: - def __init__( - self, - substrate: SubstrateClass, - main_url: str, - ss58_format: int, - type_registry: dict, - use_remote_preset: bool, - chain_name: str, - _mock: bool, - fallback_chains: Optional[list[str]] = None, - retry_forever: bool = False, - ): - fallback_chains = fallback_chains or [] - self._substrate_class: SubstrateClass = substrate - self.ss58_format: int = ss58_format - self.type_registry: dict = type_registry - self.use_remote_preset: bool = use_remote_preset - self.chain_name: str = chain_name - self._mock = _mock - self.fallback_chains = ( - iter(fallback_chains) - if not retry_forever - else cycle(fallback_chains + [main_url]) - ) - initialized = False - for chain_url in [main_url] + fallback_chains: - try: - self._substrate = self._substrate_class( - url=chain_url, - ss58_format=ss58_format, - type_registry=type_registry, - use_remote_preset=use_remote_preset, - chain_name=chain_name, - _mock=_mock, - ) - initialized = True - break - except ConnectionError: - logging.warning(f"Unable to connect to {chain_url}") - if not initialized: - raise ConnectionError( - f"Unable to connect at any chains specified: {[main_url]+fallback_chains}" - ) - - # retries - - # TODO: properties that need retry logic - # properties - # version - # token_decimals - # token_symbol - # name - - retry = ( - self._async_retry - if self._substrate_class == AsyncSubstrateInterface - else self._retry - ) - - self._get_block_handler = partial(retry, "_get_block_handler") - self.apply_type_registry_presets = partial(retry, "apply_type_registry_presets") - self.close = partial(retry, "close") - self.compose_call = partial(retry, "compose_call") - self.connect = partial(retry, "connect") - self.create_scale_object = partial(retry, "create_scale_object") - self.create_signed_extrinsic = partial(retry, "create_signed_extrinsic") - self.create_storage_key = partial(retry, "create_storage_key") - self.decode_scale = partial(retry, "decode_scale") - self.encode_scale = partial(retry, "encode_scale") - self.extension_call = partial(retry, "extension_call") - self.filter_events = partial(retry, "filter_events") - self.filter_extrinsics = partial(retry, "filter_extrinsics") - self.generate_signature_payload = partial(retry, "generate_signature_payload") - self.get_account_next_index = partial(retry, "get_account_next_index") - self.get_account_nonce = partial(retry, "get_account_nonce") - self.get_block = partial(retry, "get_block") - self.get_block_hash = partial(retry, "get_block_hash") - self.get_block_header = partial(retry, "get_block_header") - self.get_block_metadata = partial(retry, "get_block_metadata") - self.get_block_number = partial(retry, "get_block_number") - self.get_block_runtime_info = partial(retry, "get_block_runtime_info") - self.get_block_runtime_version_for = partial( - retry, "get_block_runtime_version_for" - ) - self.get_block_timestamp = partial(retry, "get_block_timestamp") - self.get_chain_finalised_head = partial(retry, "get_chain_finalised_head") - self.get_chain_head = partial(retry, "get_chain_head") - self.get_constant = partial(retry, "get_constant") - self.get_events = partial(retry, "get_events") - self.get_extrinsics = partial(retry, "get_extrinsics") - self.get_metadata_call_function = partial(retry, "get_metadata_call_function") - self.get_metadata_constant = partial(retry, "get_metadata_constant") - self.get_metadata_error = partial(retry, "get_metadata_error") - self.get_metadata_errors = partial(retry, "get_metadata_errors") - self.get_metadata_module = partial(retry, "get_metadata_module") - self.get_metadata_modules = partial(retry, "get_metadata_modules") - self.get_metadata_runtime_call_function = partial( - retry, "get_metadata_runtime_call_function" - ) - self.get_metadata_runtime_call_functions = partial( - retry, "get_metadata_runtime_call_functions" - ) - self.get_metadata_storage_function = partial( - retry, "get_metadata_storage_function" - ) - self.get_metadata_storage_functions = partial( - retry, "get_metadata_storage_functions" - ) - self.get_parent_block_hash = partial(retry, "get_parent_block_hash") - self.get_payment_info = partial(retry, "get_payment_info") - self.get_storage_item = partial(retry, "get_storage_item") - self.get_type_definition = partial(retry, "get_type_definition") - self.get_type_registry = partial(retry, "get_type_registry") - self.init_runtime = partial(retry, "init_runtime") - self.initialize = partial(retry, "initialize") - self.is_valid_ss58_address = partial(retry, "is_valid_ss58_address") - self.load_runtime = partial(retry, "load_runtime") - self.make_payload = partial(retry, "make_payload") - self.query = partial(retry, "query") - self.query_map = partial(retry, "query_map") - self.query_multi = partial(retry, "query_multi") - self.query_multiple = partial(retry, "query_multiple") - self.reload_type_registry = partial(retry, "reload_type_registry") - self.retrieve_extrinsic_by_hash = partial(retry, "retrieve_extrinsic_by_hash") - self.retrieve_extrinsic_by_identifier = partial( - retry, "retrieve_extrinsic_by_identifier" - ) - self.rpc_request = partial(retry, "rpc_request") - self.runtime_call = partial(retry, "runtime_call") - self.search_block_number = partial(retry, "search_block_number") - self.serialize_constant = partial(retry, "serialize_constant") - self.serialize_module_call = partial(retry, "serialize_module_call") - self.serialize_module_error = partial(retry, "serialize_module_error") - self.serialize_module_event = partial(retry, "serialize_module_event") - self.serialize_storage_item = partial(retry, "serialize_storage_item") - self.ss58_decode = partial(retry, "ss58_decode") - self.ss58_encode = partial(retry, "ss58_encode") - self.submit_extrinsic = partial(retry, "submit_extrinsic") - self.subscribe_block_headers = partial(retry, "subscribe_block_headers") - self.supports_rpc_method = partial(retry, "supports_rpc_method") - self.ws = self._substrate.ws - - def _retry(self, method, *args, **kwargs): - try: - method_ = getattr(self._substrate, method) - return method_(*args, **kwargs) - except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: - try: - self._reinstantiate_substrate() - method_ = getattr(self._substrate, method) - return self._retry(method_(*args, **kwargs)) - except StopIteration: - logging.error( - f"Max retries exceeded with {self._substrate.url}. No more fallback chains." - ) - raise MaxRetriesExceeded - - async def _async_retry(self, method, *args, **kwargs): - try: - method_ = getattr(self._substrate, method) - if asyncio.iscoroutinefunction(method_): - return await method_(*args, **kwargs) - else: - return method_(*args, **kwargs) - except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: - try: - self._reinstantiate_substrate(e) - method_ = getattr(self._substrate, method) - if asyncio.iscoroutinefunction(method_): - return await method_(*args, **kwargs) - else: - return method_(*args, **kwargs) - except StopIteration: - logging.error( - f"Max retries exceeded with {self._substrate.url}. No more fallback chains." - ) - raise MaxRetriesExceeded - - def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: - next_network = next(self.fallback_chains) - if e.__class__ == MaxRetriesExceeded: - logging.error( - f"Max retries exceeded with {self._substrate.url}. Retrying with {next_network}." - ) - else: - print(f"Connection error. Trying again with {next_network}") - self._substrate = self._substrate_class( - url=next_network, - ss58_format=self.ss58_format, - type_registry=self.type_registry, - use_remote_preset=self.use_remote_preset, - chain_name=self.chain_name, - _mock=self._mock, - ) - class SubtensorMixin(ABC): network: str From 0e2f934b81612bd86490adfb119bb7f0604720d5 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 25 Apr 2025 17:39:26 +0200 Subject: [PATCH 5/5] [WIP] moving to async-substrate-interface --- bittensor/core/async_subtensor.py | 3 ++- bittensor/core/subtensor.py | 3 ++- bittensor/core/types.py | 8 +------- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/bittensor/core/async_subtensor.py b/bittensor/core/async_subtensor.py index 51170ab1d6..94759d4a22 100644 --- a/bittensor/core/async_subtensor.py +++ b/bittensor/core/async_subtensor.py @@ -9,6 +9,7 @@ import numpy as np import scalecodec from async_substrate_interface import AsyncSubstrateInterface +async_substrate_interface.substrate_addons import RetrySubstrate from bittensor_commit_reveal import get_encrypted_commitment from bittensor_wallet.utils import SS58_FORMAT from numpy.typing import NDArray @@ -79,7 +80,7 @@ ) from bittensor.core.metagraph import AsyncMetagraph from bittensor.core.settings import version_as_int, TYPE_REGISTRY -from bittensor.core.types import ParamWithTypes, SubtensorMixin, RetrySubstrate +from bittensor.core.types import ParamWithTypes, SubtensorMixin from bittensor.utils import ( Certificate, decode_hex_identity_dict, diff --git a/bittensor/core/subtensor.py b/bittensor/core/subtensor.py index 5e92f7b38d..a3abec48ff 100644 --- a/bittensor/core/subtensor.py +++ b/bittensor/core/subtensor.py @@ -7,6 +7,7 @@ import scalecodec from async_substrate_interface.errors import SubstrateRequestException from async_substrate_interface.sync_substrate import SubstrateInterface +from async_substrate_interface.substrate_addons import RetrySubstrate from async_substrate_interface.types import ScaleObj from bittensor_commit_reveal import get_encrypted_commitment from numpy.typing import NDArray @@ -81,7 +82,7 @@ SS58_FORMAT, TYPE_REGISTRY, ) -from bittensor.core.types import ParamWithTypes, SubtensorMixin, RetrySubstrate +from bittensor.core.types import ParamWithTypes, SubtensorMixin from bittensor.utils import ( Certificate, decode_hex_identity_dict, diff --git a/bittensor/core/types.py b/bittensor/core/types.py index 4137b9541d..c7ae4e2e3d 100644 --- a/bittensor/core/types.py +++ b/bittensor/core/types.py @@ -1,13 +1,7 @@ -import asyncio from abc import ABC import argparse -from functools import partial -from itertools import cycle -from typing import TypedDict, Optional, Union, Type +from typing import TypedDict, Optional -from async_substrate_interface.sync_substrate import SubstrateInterface -from async_substrate_interface.async_substrate import AsyncSubstrateInterface -from async_substrate_interface.errors import MaxRetriesExceeded from bittensor.utils import networking, Certificate from bittensor.utils.btlogging import logging