Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed mypy errors #480

Merged
merged 5 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions multiversx_sdk_cli/cli_deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ def check(args: Any):
if len(missing_dependencies):
raise errors.DependenciesMissing(missing_dependencies)
return
else:
module = dependencies.get_module_by_key(name)
tag_to_check: str = config.get_dependency_tag(module.key)

is_installed = check_module_is_installed(module, tag_to_check)
if is_installed and name != "rust":
logger.info(f"[{module.key} {tag_to_check}] is installed.")
return
elif not is_installed:
raise errors.DependencyMissing(module.key, tag_to_check)

module = dependencies.get_module_by_key(name)
tag_to_check = config.get_dependency_tag(module.key)

is_installed = check_module_is_installed(module, tag_to_check)
if is_installed:
logger.info(f"[{module.key} {tag_to_check}] is installed.")
return
elif not is_installed:
raise errors.DependencyMissing(module.key, tag_to_check)


def check_module_is_installed(module: DependencyModule, tag_to_check: str) -> bool:
Expand Down
32 changes: 16 additions & 16 deletions multiversx_sdk_cli/cli_shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,30 +318,28 @@ def prepare_account(args: Any):
return Account.new_from_pem(file_path=Path(args.pem), index=args.pem_index, hrp=hrp)
elif args.keyfile:
password = load_password(args)
account = Account.new_from_keystore(
return Account.new_from_keystore(
file_path=Path(args.keyfile),
password=password,
address_index=args.address_index,
hrp=hrp,
)
elif args.ledger:
account = LedgerAccount(address_index=args.ledger_address_index)
return LedgerAccount(address_index=args.ledger_address_index)
else:
raise errors.NoWalletProvided()

return account


def prepare_relayer_account(args: Any) -> IAccount:
hrp = config.get_address_hrp()

if args.relayer_ledger:
account = LedgerAccount(address_index=args.relayer_ledger_address_index)
return LedgerAccount(address_index=args.relayer_ledger_address_index)
if args.relayer_pem:
account = Account.new_from_pem(file_path=Path(args.relayer_pem), index=args.relayer_pem_index, hrp=hrp)
return Account.new_from_pem(file_path=Path(args.relayer_pem), index=args.relayer_pem_index, hrp=hrp)
elif args.relayer_keyfile:
password = load_password(args)
account = Account.new_from_keystore(
return Account.new_from_keystore(
file_path=Path(args.relayer_keyfile),
password=password,
address_index=args.relayer_address_index,
Expand All @@ -350,29 +348,25 @@ def prepare_relayer_account(args: Any) -> IAccount:
else:
raise errors.NoWalletProvided()

return account


def prepare_guardian_account(args: Any) -> IAccount:
hrp = config.get_address_hrp()

if args.guardian_pem:
account = Account.new_from_pem(file_path=Path(args.guardian_pem), index=args.guardian_pem_index, hrp=hrp)
return Account.new_from_pem(file_path=Path(args.guardian_pem), index=args.guardian_pem_index, hrp=hrp)
elif args.guardian_keyfile:
password = load_guardian_password(args)
account = Account.new_from_keystore(
return Account.new_from_keystore(
file_path=Path(args.guardian_keyfile),
password=password,
address_index=args.guardian_address_index,
hrp=hrp,
)
elif args.guardian_ledger:
account = LedgerAccount(address_index=args.relayer_ledger_address_index)
return LedgerAccount(address_index=args.relayer_ledger_address_index)
else:
raise errors.NoWalletProvided()

return account


def load_sender_account(args: Any) -> Union[IAccount, None]:
hrp = config.get_address_hrp()
Expand Down Expand Up @@ -416,7 +410,7 @@ def get_guardian_address(guardian: Union[IAccount, None], args: Any) -> Union[Ad
address_from_account = guardian.address if guardian else None
address_from_args = Address.new_from_bech32(args.guardian) if hasattr(args, "guardian") and args.guardian else None

if address_from_account and address_from_args and address_from_account != address_from_args:
if not _is_matching_address(address_from_account, address_from_args):
raise IncorrectWalletError("Guardian wallet does not match the guardian's address set in the arguments.")

return address_from_account or address_from_args
Expand All @@ -426,12 +420,18 @@ def get_relayer_address(relayer: Union[IAccount, None], args: Any) -> Union[Addr
address_from_account = relayer.address if relayer else None
address_from_args = Address.new_from_bech32(args.relayer) if hasattr(args, "relayer") and args.relayer else None

if address_from_account and address_from_args and address_from_account != address_from_args:
if not _is_matching_address(address_from_account, address_from_args):
raise IncorrectWalletError("Relayer wallet does not match the relayer's address set in the arguments.")

return address_from_account or address_from_args


def _is_matching_address(account_address: Union[Address, None], args_address: Union[Address, None]) -> bool:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if account_address and args_address and account_address != args_address:
return False
return True


def load_relayer_account(args: Any) -> Union[IAccount, None]:
hrp = config.get_address_hrp()

Expand Down
2 changes: 1 addition & 1 deletion multiversx_sdk_cli/cli_transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def setup_parser(args: list[str], subparsers: Any) -> Any:
f"Sign a previously saved transaction.{CLIOutputBuilder.describe()}",
)
# we add the wallet args, but don't make the args mandatory
cli_shared.add_wallet_args(args, sub, True)
cli_shared.add_wallet_args(args=args, sub=sub, skip_required_check=True)
cli_shared.add_infile_arg(sub, what="a previously saved transaction")
cli_shared.add_outfile_arg(sub, what="the signed transaction")
cli_shared.add_broadcast_args(sub)
Expand Down
67 changes: 21 additions & 46 deletions multiversx_sdk_cli/dns.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
from typing import Any, List, Optional, Protocol, Union
from typing import Any, List, Protocol

from Cryptodome.Hash import keccak
from multiversx_sdk import (
Address,
AddressComputer,
AwaitingOptions,
NetworkConfig,
TransactionOnNetwork,
TransactionsFactoryConfig,
SmartContractQuery,
SmartContractQueryResponse,
)

from multiversx_sdk_cli import cli_shared, utils
from multiversx_sdk_cli.config import get_address_hrp
from multiversx_sdk_cli.constants import ADDRESS_ZERO_HEX
from multiversx_sdk_cli.contracts import SmartContract
from multiversx_sdk_cli.transactions import do_prepare_transaction

MaxNumShards = 256
Expand All @@ -23,53 +20,39 @@

# fmt: off
class INetworkProvider(Protocol):
def query_contract(self, query: Any) -> Any:
...

def get_network_config(self) -> NetworkConfig:
...

def await_transaction_completed(
self, transaction_hash: Union[bytes, str], options: Optional[AwaitingOptions] = None
) -> TransactionOnNetwork:
def query_contract(self, query: SmartContractQuery) -> SmartContractQueryResponse:
...
# fmt: on


def resolve(name: str, proxy: INetworkProvider) -> Address:
name_arg = "0x{}".format(str.encode(name).hex())
dns_address = dns_address_for_name(name)

response = _query_contract(contract_address=dns_address, proxy=proxy, function="resolve", args=[name_arg])
response = _query_contract(contract_address=dns_address, proxy=proxy, function="resolve", args=[name.encode()])

if len(response) == 0:
if len(response.return_data_parts) == 0:
return Address.new_from_hex(ADDRESS_ZERO_HEX, get_address_hrp())

result = response[0].get("returnDataParts")[0]
return Address.new_from_hex(result, get_address_hrp())
result = response.return_data_parts[0]
return Address(result, get_address_hrp())


def validate_name(name: str, shard_id: int, proxy: INetworkProvider):
name_arg = "0x{}".format(str.encode(name).hex())
dns_address = compute_dns_address_for_shard_id(shard_id)

response = _query_contract(
contract_address=dns_address,
proxy=proxy,
function="validateName",
args=[name_arg],
args=[name.encode()],
)

response = response[0]

return_code = response["returnCode"]
return_code: str = response.return_code
if return_code == "ok":
print(f"name [{name}] is valid")
else:
print(f"name [{name}] is invalid")

print(response)


def register(args: Any):
args = utils.as_object(args)
Expand Down Expand Up @@ -107,22 +90,18 @@ def registration_cost(shard_id: int, proxy: INetworkProvider) -> int:
args=[],
)

response = response[0]

data = response["returnDataParts"][0]
data = response.return_data_parts[0]
if not data:
return 0
else:
return int("0x{}".format(data))
return int.from_bytes(data, byteorder="big", signed=False)


def version(shard_id: int, proxy: INetworkProvider) -> str:
dns_address = compute_dns_address_for_shard_id(shard_id)

response = _query_contract(contract_address=dns_address, proxy=proxy, function="version", args=[])

response = response[0]
return bytearray.fromhex(response["returnDataParts"][0]).decode()
return response.return_data_parts[0].decode()


def dns_address_for_name(name: str) -> Address:
Expand All @@ -147,15 +126,11 @@ def dns_register_data(name: str) -> str:
return "register@{}".format(name_enc.hex())


def _query_contract(contract_address: Address, proxy: INetworkProvider, function: str, args: List[Any]) -> List[Any]:
chain_id = proxy.get_network_config().chain_id
config = TransactionsFactoryConfig(chain_id)
contract = SmartContract(config)

return contract.query_contract(
contract_address=contract_address,
proxy=proxy,
function=function,
arguments=args,
should_prepare_args=False,
)
def _query_contract(
contract_address: Address,
proxy: INetworkProvider,
function: str,
args: List[bytes],
) -> SmartContractQueryResponse:
query = SmartContractQuery(contract=contract_address, function=function, arguments=args)
return proxy.query_contract(query)
7 changes: 4 additions & 3 deletions multiversx_sdk_cli/localnet/config_root.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@

from multiversx_sdk_cli.localnet import config_default
from multiversx_sdk_cli.localnet.config_part import ConfigPart
from multiversx_sdk_cli.localnet.config_sharding import Metashard, RegularShards
from multiversx_sdk_cli.localnet.constants import METACHAIN_ID
from multiversx_sdk_cli.localnet.node import Node

logger = logging.getLogger("localnet")


class ConfigRoot(ConfigPart):
def __init__(self):
def __init__(self) -> None:
self.general = config_default.general
self.software = config_default.software
self.metashard = config_default.metashard
self.shards = config_default.shards
self.metashard: Metashard = config_default.metashard
self.shards: RegularShards = config_default.shards
self.networking = config_default.networking

def get_name(self) -> str:
Expand Down
4 changes: 2 additions & 2 deletions multiversx_sdk_cli/localnet/config_software.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ def _verify(self):
f"In configuration section '{self.get_name()}', resolution is '{self.resolution.value}', but 'local_path' is not a directory: {self.local_path}"
)

def get_archive_download_folder(self):
def get_archive_download_folder(self) -> Path:
return self.archive_download_folder.expanduser().resolve()

def get_archive_extraction_folder(self):
def get_archive_extraction_folder(self) -> Path:
return self.archive_extraction_folder.expanduser().resolve()

def get_local_path(self):
Expand Down
10 changes: 5 additions & 5 deletions multiversx_sdk_cli/native_auth_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ def _get_current_block_hash_using_gateway(self) -> str:
round = self._get_current_round()
url = f"{self.config.gateway_url}/blocks/by-round/{round}"
response = self._execute_request(url)
blocks = response["data"]["blocks"]
block = [b for b in blocks if b["shard"] == self.config.block_hash_shard][0]
blocks: list[dict[str, Any]] = response["data"]["blocks"]
block: dict[str, str] = [b for b in blocks if b["shard"] == self.config.block_hash_shard][0]
return block["hash"]

def _get_current_round(self) -> int:
Expand All @@ -67,14 +67,14 @@ def _get_current_round(self) -> int:

url = f"{self.config.gateway_url}/network/status/{self.config.block_hash_shard}"
response = self._execute_request(url)
status = response["data"]["status"]
status: dict[str, int] = response["data"]["status"]

return status["erd_current_round"]

def _get_current_block_hash_using_api(self) -> str:
try:
url = f"{self.config.api_url}/blocks/latest?ttl={self.config.expiry_seconds}&fields=hash"
response = self._execute_request(url)
response: dict[str, str] = self._execute_request(url)
if response["hash"]:
return response["hash"]
except Exception:
Expand All @@ -88,7 +88,7 @@ def _get_current_block_hash_using_api_fallback(self) -> str:
if self.config.block_hash_shard:
url += f"&shard={self.config.block_hash_shard}"

response = self._execute_request(url)
response: list[dict[str, str]] = self._execute_request(url)
return response[0]["hash"]

def _encode_value(self, string: str) -> str:
Expand Down
5 changes: 3 additions & 2 deletions multiversx_sdk_cli/tests/test_cli_contracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,8 @@ def test_contract_query(capsys: Any):


def _read_stdout(capsys: Any) -> str:
return capsys.readouterr().out.strip()
stdout: str = capsys.readouterr().out.strip()
return stdout


def get_contract_address(capsys: Any):
Expand All @@ -633,5 +634,5 @@ def get_query_response(capsys: Any):

def get_transaction_data(capsys: Any) -> str:
out = _read_stdout(capsys)
output = json.loads(out)
output: dict[str, str] = json.loads(out)
return output["emittedTransactionData"]
3 changes: 2 additions & 1 deletion multiversx_sdk_cli/tests/test_cli_staking_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,8 @@ def test_withdraw(capsys: Any):


def _read_stdout(capsys: Any) -> str:
return capsys.readouterr().out.strip()
stdout: str = capsys.readouterr().out.strip()
return stdout


def get_transaction(capsys: Any):
Expand Down
3 changes: 2 additions & 1 deletion multiversx_sdk_cli/tests/test_cli_transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,4 +394,5 @@ def test_sign_transaction(capsys: Any):


def _read_stdout(capsys: Any) -> str:
return capsys.readouterr().out.strip()
stdout: str = capsys.readouterr().out.strip()
return stdout
7 changes: 3 additions & 4 deletions multiversx_sdk_cli/tests/test_cli_wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,7 @@ def test_wallet_convert_pem_to_pubkey(capsys: Any):
def test_wallet_convert_pem_to_secret_key(capsys: Any):
infile = testdata_path / "alice.pem"

main([
"wallet", "convert", "--infile", str(infile), "--in-format", "pem", "--out-format", "secret-key"
])
main(["wallet", "convert", "--infile", str(infile), "--in-format", "pem", "--out-format", "secret-key"])

out = _read_stdout(capsys).strip("Output:\n\n")
assert out == "413f42575f7f26fad3317a778771212fdb80245850981e48b58a4f25e344e8f9"
Expand Down Expand Up @@ -538,7 +536,8 @@ def _read_stdout_wallet_address(capsys: Any) -> str:


def _read_stdout(capsys: Any) -> str:
return capsys.readouterr().out.strip()
stdout: str = capsys.readouterr().out.strip()
return stdout


def _mock_getpass(monkeypatch: Any, password: str):
Expand Down
Loading
Loading