Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed mypy errors #480

Merged
merged 5 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions multiversx_sdk_cli/cli_deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ def check(args: Any):
if len(missing_dependencies):
raise errors.DependenciesMissing(missing_dependencies)
return
else:
module = dependencies.get_module_by_key(name)
tag_to_check: str = config.get_dependency_tag(module.key)

is_installed = check_module_is_installed(module, tag_to_check)
if is_installed and name != "rust":
logger.info(f"[{module.key} {tag_to_check}] is installed.")
return
elif not is_installed:
raise errors.DependencyMissing(module.key, tag_to_check)

module = dependencies.get_module_by_key(name)
tag_to_check = config.get_dependency_tag(module.key)

is_installed = check_module_is_installed(module, tag_to_check)
if is_installed:
logger.info(f"[{module.key} {tag_to_check}] is installed.")
return
elif not is_installed:
raise errors.DependencyMissing(module.key, tag_to_check)


def check_module_is_installed(module: DependencyModule, tag_to_check: str) -> bool:
Expand Down
32 changes: 16 additions & 16 deletions multiversx_sdk_cli/cli_shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,30 +318,28 @@ def prepare_account(args: Any):
return Account.new_from_pem(file_path=Path(args.pem), index=args.pem_index, hrp=hrp)
elif args.keyfile:
password = load_password(args)
account = Account.new_from_keystore(
return Account.new_from_keystore(
file_path=Path(args.keyfile),
password=password,
address_index=args.address_index,
hrp=hrp,
)
elif args.ledger:
account = LedgerAccount(address_index=args.ledger_address_index)
return LedgerAccount(address_index=args.ledger_address_index)
else:
raise errors.NoWalletProvided()

return account


def prepare_relayer_account(args: Any) -> IAccount:
hrp = config.get_address_hrp()

if args.relayer_ledger:
account = LedgerAccount(address_index=args.relayer_ledger_address_index)
return LedgerAccount(address_index=args.relayer_ledger_address_index)
if args.relayer_pem:
account = Account.new_from_pem(file_path=Path(args.relayer_pem), index=args.relayer_pem_index, hrp=hrp)
return Account.new_from_pem(file_path=Path(args.relayer_pem), index=args.relayer_pem_index, hrp=hrp)
elif args.relayer_keyfile:
password = load_password(args)
account = Account.new_from_keystore(
return Account.new_from_keystore(
file_path=Path(args.relayer_keyfile),
password=password,
address_index=args.relayer_address_index,
Expand All @@ -350,29 +348,25 @@ def prepare_relayer_account(args: Any) -> IAccount:
else:
raise errors.NoWalletProvided()

return account


def prepare_guardian_account(args: Any) -> IAccount:
hrp = config.get_address_hrp()

if args.guardian_pem:
account = Account.new_from_pem(file_path=Path(args.guardian_pem), index=args.guardian_pem_index, hrp=hrp)
return Account.new_from_pem(file_path=Path(args.guardian_pem), index=args.guardian_pem_index, hrp=hrp)
elif args.guardian_keyfile:
password = load_guardian_password(args)
account = Account.new_from_keystore(
return Account.new_from_keystore(
file_path=Path(args.guardian_keyfile),
password=password,
address_index=args.guardian_address_index,
hrp=hrp,
)
elif args.guardian_ledger:
account = LedgerAccount(address_index=args.relayer_ledger_address_index)
return LedgerAccount(address_index=args.relayer_ledger_address_index)
else:
raise errors.NoWalletProvided()

return account


def load_sender_account(args: Any) -> Union[IAccount, None]:
hrp = config.get_address_hrp()
Expand Down Expand Up @@ -416,7 +410,7 @@ def get_guardian_address(guardian: Union[IAccount, None], args: Any) -> Union[Ad
address_from_account = guardian.address if guardian else None
address_from_args = Address.new_from_bech32(args.guardian) if hasattr(args, "guardian") and args.guardian else None

if address_from_account and address_from_args and address_from_account != address_from_args:
if not _is_matching_address(address_from_account, address_from_args):
raise IncorrectWalletError("Guardian wallet does not match the guardian's address set in the arguments.")

return address_from_account or address_from_args
Expand All @@ -426,12 +420,18 @@ def get_relayer_address(relayer: Union[IAccount, None], args: Any) -> Union[Addr
address_from_account = relayer.address if relayer else None
address_from_args = Address.new_from_bech32(args.relayer) if hasattr(args, "relayer") and args.relayer else None

if address_from_account and address_from_args and address_from_account != address_from_args:
if not _is_matching_address(address_from_account, address_from_args):
raise IncorrectWalletError("Relayer wallet does not match the relayer's address set in the arguments.")

return address_from_account or address_from_args


def _is_matching_address(account_address: Union[Address, None], args_address: Union[Address, None]) -> bool:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if account_address and args_address and account_address != args_address:
return False
return True


def load_relayer_account(args: Any) -> Union[IAccount, None]:
hrp = config.get_address_hrp()

Expand Down
2 changes: 1 addition & 1 deletion multiversx_sdk_cli/cli_transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def setup_parser(args: list[str], subparsers: Any) -> Any:
f"Sign a previously saved transaction.{CLIOutputBuilder.describe()}",
)
# we add the wallet args, but don't make the args mandatory
cli_shared.add_wallet_args(args, sub, True)
cli_shared.add_wallet_args(args=args, sub=sub, skip_required_check=True)
cli_shared.add_infile_arg(sub, what="a previously saved transaction")
cli_shared.add_outfile_arg(sub, what="the signed transaction")
cli_shared.add_broadcast_args(sub)
Expand Down
67 changes: 21 additions & 46 deletions multiversx_sdk_cli/dns.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
from typing import Any, List, Optional, Protocol, Union
from typing import Any, List, Protocol

from Cryptodome.Hash import keccak
from multiversx_sdk import (
Address,
AddressComputer,
AwaitingOptions,
NetworkConfig,
TransactionOnNetwork,
TransactionsFactoryConfig,
SmartContractQuery,
SmartContractQueryResponse,
)

from multiversx_sdk_cli import cli_shared, utils
from multiversx_sdk_cli.config import get_address_hrp
from multiversx_sdk_cli.constants import ADDRESS_ZERO_HEX
from multiversx_sdk_cli.contracts import SmartContract
from multiversx_sdk_cli.transactions import do_prepare_transaction

MaxNumShards = 256
Expand All @@ -23,53 +20,39 @@

# fmt: off
class INetworkProvider(Protocol):
def query_contract(self, query: Any) -> Any:
...

def get_network_config(self) -> NetworkConfig:
...

def await_transaction_completed(
self, transaction_hash: Union[bytes, str], options: Optional[AwaitingOptions] = None
) -> TransactionOnNetwork:
def query_contract(self, query: SmartContractQuery) -> SmartContractQueryResponse:
...
# fmt: on


def resolve(name: str, proxy: INetworkProvider) -> Address:
name_arg = "0x{}".format(str.encode(name).hex())
dns_address = dns_address_for_name(name)

response = _query_contract(contract_address=dns_address, proxy=proxy, function="resolve", args=[name_arg])
response = _query_contract(contract_address=dns_address, proxy=proxy, function="resolve", args=[name.encode()])

if len(response) == 0:
if len(response.return_data_parts) == 0:
return Address.new_from_hex(ADDRESS_ZERO_HEX, get_address_hrp())

result = response[0].get("returnDataParts")[0]
return Address.new_from_hex(result, get_address_hrp())
result = response.return_data_parts[0]
return Address(result, get_address_hrp())


def validate_name(name: str, shard_id: int, proxy: INetworkProvider):
name_arg = "0x{}".format(str.encode(name).hex())
dns_address = compute_dns_address_for_shard_id(shard_id)

response = _query_contract(
contract_address=dns_address,
proxy=proxy,
function="validateName",
args=[name_arg],
args=[name.encode()],
)

response = response[0]

return_code = response["returnCode"]
return_code: str = response.return_code
if return_code == "ok":
print(f"name [{name}] is valid")
else:
print(f"name [{name}] is invalid")

print(response)


def register(args: Any):
args = utils.as_object(args)
Expand Down Expand Up @@ -107,22 +90,18 @@ def registration_cost(shard_id: int, proxy: INetworkProvider) -> int:
args=[],
)

response = response[0]

data = response["returnDataParts"][0]
data = response.return_data_parts[0]
if not data:
return 0
else:
return int("0x{}".format(data))
return int.from_bytes(data, byteorder="big", signed=False)


def version(shard_id: int, proxy: INetworkProvider) -> str:
dns_address = compute_dns_address_for_shard_id(shard_id)

response = _query_contract(contract_address=dns_address, proxy=proxy, function="version", args=[])

response = response[0]
return bytearray.fromhex(response["returnDataParts"][0]).decode()
return response.return_data_parts[0].decode()


def dns_address_for_name(name: str) -> Address:
Expand All @@ -147,15 +126,11 @@ def dns_register_data(name: str) -> str:
return "register@{}".format(name_enc.hex())


def _query_contract(contract_address: Address, proxy: INetworkProvider, function: str, args: List[Any]) -> List[Any]:
chain_id = proxy.get_network_config().chain_id
config = TransactionsFactoryConfig(chain_id)
contract = SmartContract(config)

return contract.query_contract(
contract_address=contract_address,
proxy=proxy,
function=function,
arguments=args,
should_prepare_args=False,
)
def _query_contract(
contract_address: Address,
proxy: INetworkProvider,
function: str,
args: List[bytes],
) -> SmartContractQueryResponse:
query = SmartContractQuery(contract=contract_address, function=function, arguments=args)
return proxy.query_contract(query)
4 changes: 4 additions & 0 deletions multiversx_sdk_cli/localnet/config_root.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@

from multiversx_sdk_cli.localnet import config_default
from multiversx_sdk_cli.localnet.config_part import ConfigPart
from multiversx_sdk_cli.localnet.config_sharding import Metashard, RegularShards
from multiversx_sdk_cli.localnet.constants import METACHAIN_ID
from multiversx_sdk_cli.localnet.node import Node

logger = logging.getLogger("localnet")


class ConfigRoot(ConfigPart):
shards: RegularShards
metashard: Metashard
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe move to constructor?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


def __init__(self):
self.general = config_default.general
self.software = config_default.software
Expand Down
4 changes: 2 additions & 2 deletions multiversx_sdk_cli/localnet/config_software.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ def _verify(self):
f"In configuration section '{self.get_name()}', resolution is '{self.resolution.value}', but 'local_path' is not a directory: {self.local_path}"
)

def get_archive_download_folder(self):
def get_archive_download_folder(self) -> Path:
return self.archive_download_folder.expanduser().resolve()

def get_archive_extraction_folder(self):
def get_archive_extraction_folder(self) -> Path:
return self.archive_extraction_folder.expanduser().resolve()

def get_local_path(self):
Expand Down
10 changes: 5 additions & 5 deletions multiversx_sdk_cli/native_auth_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ def _get_current_block_hash_using_gateway(self) -> str:
round = self._get_current_round()
url = f"{self.config.gateway_url}/blocks/by-round/{round}"
response = self._execute_request(url)
blocks = response["data"]["blocks"]
block = [b for b in blocks if b["shard"] == self.config.block_hash_shard][0]
blocks: list[dict[str, Any]] = response["data"]["blocks"]
block: dict[str, str] = [b for b in blocks if b["shard"] == self.config.block_hash_shard][0]
return block["hash"]

def _get_current_round(self) -> int:
Expand All @@ -67,14 +67,14 @@ def _get_current_round(self) -> int:

url = f"{self.config.gateway_url}/network/status/{self.config.block_hash_shard}"
response = self._execute_request(url)
status = response["data"]["status"]
status: dict[str, int] = response["data"]["status"]

return status["erd_current_round"]

def _get_current_block_hash_using_api(self) -> str:
try:
url = f"{self.config.api_url}/blocks/latest?ttl={self.config.expiry_seconds}&fields=hash"
response = self._execute_request(url)
response: dict[str, str] = self._execute_request(url)
if response["hash"]:
return response["hash"]
except Exception:
Expand All @@ -88,7 +88,7 @@ def _get_current_block_hash_using_api_fallback(self) -> str:
if self.config.block_hash_shard:
url += f"&shard={self.config.block_hash_shard}"

response = self._execute_request(url)
response: list[dict[str, str]] = self._execute_request(url)
return response[0]["hash"]

def _encode_value(self, string: str) -> str:
Expand Down
5 changes: 3 additions & 2 deletions multiversx_sdk_cli/tests/test_cli_contracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,8 @@ def test_contract_query(capsys: Any):


def _read_stdout(capsys: Any) -> str:
return capsys.readouterr().out.strip()
stdout: str = capsys.readouterr().out.strip()
return stdout


def get_contract_address(capsys: Any):
Expand All @@ -633,5 +634,5 @@ def get_query_response(capsys: Any):

def get_transaction_data(capsys: Any) -> str:
out = _read_stdout(capsys)
output = json.loads(out)
output: dict[str, str] = json.loads(out)
return output["emittedTransactionData"]
3 changes: 2 additions & 1 deletion multiversx_sdk_cli/tests/test_cli_staking_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,8 @@ def test_withdraw(capsys: Any):


def _read_stdout(capsys: Any) -> str:
return capsys.readouterr().out.strip()
stdout: str = capsys.readouterr().out.strip()
return stdout


def get_transaction(capsys: Any):
Expand Down
3 changes: 2 additions & 1 deletion multiversx_sdk_cli/tests/test_cli_transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,4 +394,5 @@ def test_sign_transaction(capsys: Any):


def _read_stdout(capsys: Any) -> str:
return capsys.readouterr().out.strip()
stdout: str = capsys.readouterr().out.strip()
return stdout
7 changes: 3 additions & 4 deletions multiversx_sdk_cli/tests/test_cli_wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,7 @@ def test_wallet_convert_pem_to_pubkey(capsys: Any):
def test_wallet_convert_pem_to_secret_key(capsys: Any):
infile = testdata_path / "alice.pem"

main([
"wallet", "convert", "--infile", str(infile), "--in-format", "pem", "--out-format", "secret-key"
])
main(["wallet", "convert", "--infile", str(infile), "--in-format", "pem", "--out-format", "secret-key"])

out = _read_stdout(capsys).strip("Output:\n\n")
assert out == "413f42575f7f26fad3317a778771212fdb80245850981e48b58a4f25e344e8f9"
Expand Down Expand Up @@ -538,7 +536,8 @@ def _read_stdout_wallet_address(capsys: Any) -> str:


def _read_stdout(capsys: Any) -> str:
return capsys.readouterr().out.strip()
stdout: str = capsys.readouterr().out.strip()
return stdout


def _mock_getpass(monkeypatch: Any, password: str):
Expand Down
2 changes: 1 addition & 1 deletion multiversx_sdk_cli/tests/test_shared.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Any

import pytest

Check failure on line 3 in multiversx_sdk_cli/tests/test_shared.py

View workflow job for this annotation

GitHub Actions / runner / mypy

[mypy] reported by reviewdog 🐶 Cannot find implementation or library stub for module named "pytest" [import-not-found] Raw Output: /home/runner/work/mx-sdk-py-cli/mx-sdk-py-cli/multiversx_sdk_cli/tests/test_shared.py:3:1: error: Cannot find implementation or library stub for module named "pytest" [import-not-found]

from multiversx_sdk_cli.cli_shared import prepare_chain_id_in_args
from multiversx_sdk_cli.errors import ArgumentsNotProvidedError
Expand All @@ -10,7 +10,7 @@
pass


def test_prepare_chain_id_in_args():
def test_prepare_chain_id_in_args() -> None:
args: Any = Args()
args.chain = None
args.proxy = None
Expand Down
Loading
Loading