diff --git a/securesystemslib/signer/_azure_signer.py b/securesystemslib/signer/_azure_signer.py index a55b96c5..966448ae 100644 --- a/securesystemslib/signer/_azure_signer.py +++ b/securesystemslib/signer/_azure_signer.py @@ -6,7 +6,7 @@ import securesystemslib.hash as sslib_hash from securesystemslib.exceptions import UnsupportedLibraryError -from securesystemslib.signer._key import Key, SSlibKey +from securesystemslib.signer._key import SSlibKey from securesystemslib.signer._signer import SecretsHandler, Signature, Signer from securesystemslib.signer._utils import compute_default_keyid @@ -65,7 +65,7 @@ class AzureSigner(Signer): SCHEME = "azurekms" - def __init__(self, az_key_uri: str, public_key: Key): + def __init__(self, az_key_uri: str, public_key: SSlibKey): if AZURE_IMPORT_ERROR: raise UnsupportedLibraryError(AZURE_IMPORT_ERROR) @@ -78,14 +78,14 @@ def __init__(self, az_key_uri: str, public_key: Key): self.signature_algorithm = self._get_signature_algorithm( public_key, ) - self.hash_algorithm = self._get_hash_algorithm(public_key) + self.hash_algorithm = public_key.get_hash_algorithm_str() except UnsupportedKeyType as e: logger.info("Key %s has unsupported key type or unsupported elliptic curve") raise e self._public_key = public_key @property - def public_key(self) -> Key: + def public_key(self) -> SSlibKey: return self._public_key @staticmethod @@ -128,7 +128,7 @@ def _create_crypto_client( raise e @staticmethod - def _get_signature_algorithm(public_key: Key) -> "SignatureAlgorithm": + def _get_signature_algorithm(public_key: SSlibKey) -> "SignatureAlgorithm": """Return SignatureAlgorithm after parsing the public key""" if public_key.keytype != "ecdsa": logger.info("only EC keys are supported for now") @@ -147,23 +147,6 @@ def _get_signature_algorithm(public_key: Key) -> "SignatureAlgorithm": raise UnsupportedKeyType("Unsupported curve supplied by key") - @staticmethod - def _get_hash_algorithm(public_key: "Key") -> str: - """Return the hash algorithm used by the public key""" - # Format is "ecdsa-sha2-nistp256" - comps = public_key.scheme.split("-") - if len(comps) != 3: # noqa: PLR2004 - raise UnsupportedKeyType("Invalid scheme found") - - if comps[2] == "nistp256": - return "sha256" - if comps[2] == "nistp384": - return "sha384" - if comps[2] == "nistp521": - return "sha512" - - raise UnsupportedKeyType("Unsupported curve supplied by key") - @staticmethod def _get_keytype_and_scheme(crv: str) -> Tuple[str, str]: if crv == KeyCurveName.p_256: @@ -179,7 +162,7 @@ def _get_keytype_and_scheme(crv: str) -> Tuple[str, str]: def from_priv_key_uri( cls, priv_key_uri: str, - public_key: Key, + public_key: SSlibKey, secrets_handler: Optional[SecretsHandler] = None, ) -> "AzureSigner": uri = parse.urlparse(priv_key_uri) @@ -191,7 +174,7 @@ def from_priv_key_uri( return cls(az_key_uri, public_key) @classmethod - def import_(cls, az_vault_name: str, az_key_name: str) -> Tuple[str, Key]: + def import_(cls, az_vault_name: str, az_key_name: str) -> Tuple[str, SSlibKey]: """Load key and signer details from KMS Returns the private key uri and the public key. This method should only diff --git a/securesystemslib/signer/_crypto_signer.py b/securesystemslib/signer/_crypto_signer.py index 31ff5bc5..c6acd2f7 100644 --- a/securesystemslib/signer/_crypto_signer.py +++ b/securesystemslib/signer/_crypto_signer.py @@ -11,6 +11,7 @@ from securesystemslib.signer._signature import Signature from securesystemslib.signer._signer import SecretsHandler, Signer +# ruff: noqa: F401 CRYPTO_IMPORT_ERROR = None try: from cryptography.hazmat.primitives.asymmetric.ec import ( @@ -77,33 +78,6 @@ class _NoSignArgs: _ECDSA_KEYTYPES = ["ecdsa", "ecdsa-sha2-nistp256"] -def _get_hash_algorithm(name: str) -> "HashAlgorithm": - """Helper to return hash algorithm for name.""" - algorithm: HashAlgorithm - if name == "sha224": - algorithm = SHA224() - if name == "sha256": - algorithm = SHA256() - if name == "sha384": - algorithm = SHA384() - if name == "sha512": - algorithm = SHA512() - - return algorithm - - -def _get_rsa_padding(name: str, hash_algorithm: "HashAlgorithm") -> "AsymmetricPadding": - """Helper to return rsa signature padding for name.""" - padding: AsymmetricPadding - if name == "pss": - padding = PSS(mgf=MGF1(hash_algorithm), salt_length=PSS.DIGEST_LENGTH) - - if name == "pkcs1v15": - padding = PKCS1v15() - - return padding - - class CryptoSigner(Signer): """PYCA/cryptography Signer implementations. @@ -155,9 +129,8 @@ def __init__( if not isinstance(private_key, RSAPrivateKey): raise ValueError(f"invalid rsa key: {type(private_key)}") - padding_name, hash_name = public_key.scheme.split("-")[1:] - hash_algo = _get_hash_algorithm(hash_name) - padding = _get_rsa_padding(padding_name, hash_algo) + hash_algo = public_key.get_hash_algorithm() + padding = public_key.get_padding_name(hash_algo, PSS.DIGEST_LENGTH) self._sign_args = _RSASignArgs(padding, hash_algo) self._private_key = private_key diff --git a/securesystemslib/signer/_gcp_signer.py b/securesystemslib/signer/_gcp_signer.py index 4beaf829..8844faae 100644 --- a/securesystemslib/signer/_gcp_signer.py +++ b/securesystemslib/signer/_gcp_signer.py @@ -6,7 +6,7 @@ import securesystemslib.hash as sslib_hash from securesystemslib import exceptions -from securesystemslib.signer._key import Key, SSlibKey +from securesystemslib.signer._key import SSlibKey from securesystemslib.signer._signer import SecretsHandler, Signature, Signer from securesystemslib.signer._utils import compute_default_keyid @@ -55,24 +55,24 @@ class GCPSigner(Signer): SCHEME = "gcpkms" - def __init__(self, gcp_keyid: str, public_key: Key): + def __init__(self, gcp_keyid: str, public_key: SSlibKey): if GCP_IMPORT_ERROR: raise exceptions.UnsupportedLibraryError(GCP_IMPORT_ERROR) - self.hash_algorithm = self._get_hash_algorithm(public_key) + self.hash_algorithm = public_key.get_hash_algorithm_str() self.gcp_keyid = gcp_keyid self._public_key = public_key self.client = kms.KeyManagementServiceClient() @property - def public_key(self) -> Key: + def public_key(self) -> SSlibKey: return self._public_key @classmethod def from_priv_key_uri( cls, priv_key_uri: str, - public_key: Key, + public_key: SSlibKey, secrets_handler: Optional[SecretsHandler] = None, ) -> "GCPSigner": uri = parse.urlparse(priv_key_uri) @@ -83,7 +83,7 @@ def from_priv_key_uri( return cls(uri.path, public_key) @classmethod - def import_(cls, gcp_keyid: str) -> Tuple[str, Key]: + def import_(cls, gcp_keyid: str) -> Tuple[str, SSlibKey]: """Load key and signer details from KMS Returns the private key uri and the public key. This method should only @@ -155,33 +155,6 @@ def _get_keytype_and_scheme(algorithm: int) -> Tuple[str, str]: } return keytypes_and_schemes[algorithm] - @staticmethod - def _get_hash_algorithm(public_key: Key) -> str: - """Helper function to return payload hash algorithm used for this key""" - - # TODO: This could be a public abstract method on Key so that GCPSigner - # would not be tied to a specific Key implementation -- not all keys - # have a pre hash algorithm though. - if public_key.keytype == "rsa": - # hash algorithm is encoded as last scheme portion - algo = public_key.scheme.split("-")[-1] - elif public_key.keytype in [ - "ecdsa", - "ecdsa-sha2-nistp256", - "ecdsa-sha2-nistp384", - ]: - # nistp256 uses sha-256, nistp384 uses sha-384 - bits = public_key.scheme.split("-nistp")[-1] - algo = f"sha{bits}" - else: - raise exceptions.UnsupportedAlgorithmError( - f"Unsupported key type {public_key.keytype} in key {public_key.keyid}" - ) - - # trigger UnsupportedAlgorithm if appropriate - _ = sslib_hash.digest(algo) - return algo - def sign(self, payload: bytes) -> Signature: """Signs payload with Google Cloud KMS. diff --git a/securesystemslib/signer/_gpg_signer.py b/securesystemslib/signer/_gpg_signer.py index eae78aea..622e0d06 100644 --- a/securesystemslib/signer/_gpg_signer.py +++ b/securesystemslib/signer/_gpg_signer.py @@ -55,6 +55,18 @@ def verify_signature(self, signature: Signature, data: bytes) -> None: f"Unknown failure to verify signature by {self.keyid}" ) from e + def get_hash_algorithm_str(self) -> None: + raise NotImplementedError + + def get_hash_algorithm(self) -> None: + raise NotImplementedError + + def get_padding_name_str(self) -> None: + raise NotImplementedError + + def get_padding_name(self, hash_algorithm: None, salt_length: None) -> None: + raise NotImplementedError + class GPGSigner(Signer): """OpenPGP Signer diff --git a/securesystemslib/signer/_key.py b/securesystemslib/signer/_key.py index 88faeb35..842c913e 100644 --- a/securesystemslib/signer/_key.py +++ b/securesystemslib/signer/_key.py @@ -4,11 +4,13 @@ from abc import ABCMeta, abstractmethod from typing import Any, Dict, Optional, Tuple, Type, cast +import securesystemslib.hash as sslib_hash from securesystemslib._vendor.ed25519.ed25519 import ( SignatureMismatch, checkvalid, ) from securesystemslib.exceptions import ( + UnsupportedAlgorithmError, UnsupportedLibraryError, UnverifiedSignatureError, VerificationError, @@ -56,6 +58,11 @@ logger = logging.getLogger(__name__) + +class UnsupportedKeyType(Exception): # noqa: N818 + pass + + # NOTE Key dispatch table is defined here so it's usable by Key, # but is populated in __init__.py (and can be appended by users). KEY_FOR_TYPE_AND_SCHEME: Dict[Tuple[str, str], Type] = {} @@ -301,35 +308,6 @@ def from_crypto( return SSlibKey(keyid, keytype, scheme, keyval) - @staticmethod - def _get_hash_algorithm(name: str) -> "HashAlgorithm": - """Helper to return hash algorithm for name.""" - algorithm: HashAlgorithm - if name == "sha224": - algorithm = SHA224() - if name == "sha256": - algorithm = SHA256() - if name == "sha384": - algorithm = SHA384() - if name == "sha512": - algorithm = SHA512() - - return algorithm - - @staticmethod - def _get_rsa_padding( - name: str, hash_algorithm: "HashAlgorithm" - ) -> "AsymmetricPadding": - """Helper to return rsa signature padding for name.""" - padding: AsymmetricPadding - if name == "pss": - padding = PSS(mgf=MGF1(hash_algorithm), salt_length=PSS.AUTO) - - if name == "pkcs1v15": - padding = PKCS1v15() - - return padding - def _verify_ed25519_fallback(self, signature: bytes, data: bytes) -> None: """Helper to verify ed25519 sig if pyca/cryptography is unavailable.""" try: @@ -364,9 +342,8 @@ def _validate_curve(key, curve): ]: key = cast(RSAPublicKey, self._crypto_key()) _validate_type(key, RSAPublicKey) - padding_name, hash_name = self.scheme.split("-")[1:] - hash_algorithm = self._get_hash_algorithm(hash_name) - padding = self._get_rsa_padding(padding_name, hash_algorithm) + hash_algorithm = self.get_hash_algorithm() + padding = self.get_padding_name(hash_algorithm, PSS.AUTO) key.verify(signature, data, padding, hash_algorithm) elif ( @@ -426,3 +403,75 @@ def verify_signature(self, signature: Signature, data: bytes) -> None: raise VerificationError( f"Unknown failure to verify signature by {self.keyid}" ) from e + + def get_hash_algorithm_str(self) -> str: + """Returns the hash algorithm from the key scheme as a string.""" + # key scheme should always be of format xxx-xxx-xxx + comps = self.scheme.split("-") + if len(comps) != 3: # noqa: PLR2004 + raise UnsupportedKeyType("Invalid scheme found") + + if self.keytype == "rsa": + # hash algorithm is encoded as last scheme portion + hash_algo = self.scheme.split("-")[-1] + elif self.keytype in [ + "ecdsa", + "ecdsa-sha2-nistp256", + "ecdsa-sha2-nistp384", + ]: + # nistp256 uses sha-256, nistp384 uses sha-384 + bits = self.scheme.split("-nistp")[-1] + hash_algo = f"sha{bits}" + else: + raise UnsupportedAlgorithmError( + f"Unsupported key type {self.keytype} in key {self.keyid}" + ) + + # trigger UnsupportedAlgorithm if appropriate + _ = sslib_hash.digest(hash_algo) + + return hash_algo + + def get_hash_algorithm(self) -> "HashAlgorithm": + """Returns the hash algorithm from the key scheme as a HashAlgorithm""" + name = self.get_hash_algorithm_str() + algorithm: HashAlgorithm + if name == "sha224": + algorithm = SHA224() + if name == "sha256": + algorithm = SHA256() + if name == "sha384": + algorithm = SHA384() + if name == "sha512": + algorithm = SHA512() + + return algorithm + + def get_padding_name_str(self) -> str: + """Returns the padding name from the key scheme as a string""" + padding_name = self.scheme.split("-")[1] + return padding_name + + def get_padding_name( + self, hash_algorithm: "HashAlgorithm", salt_length: Any + ) -> "AsymmetricPadding": + """Returns the padding name from the key scheme as a AsymmetricPadding + + Args: + hash_algorithm: the hash algorithm used as a HashAlgorithm + object, only for PSS. + selt_length: the salt length to use for PSS. + PSS.AUTO or PSS.DIGEST_LENGTH + + Returns: + AsymmetricPadding + + """ + name = self.get_padding_name_str() + padding: AsymmetricPadding + if name == "pss": + padding = PSS(mgf=MGF1(hash_algorithm), salt_length=salt_length) + if name == "pkcs1v15": + padding = PKCS1v15() + + return padding diff --git a/securesystemslib/signer/_signer.py b/securesystemslib/signer/_signer.py index 54ec8747..148c05ba 100644 --- a/securesystemslib/signer/_signer.py +++ b/securesystemslib/signer/_signer.py @@ -4,7 +4,7 @@ from abc import ABCMeta, abstractmethod from typing import Callable, Dict, Optional, Type -from securesystemslib.signer._key import Key +from securesystemslib.signer._key import Key, SSlibKey from securesystemslib.signer._signature import Signature logger = logging.getLogger(__name__) @@ -80,7 +80,7 @@ def sign(self, payload: bytes) -> Signature: def from_priv_key_uri( cls, priv_key_uri: str, - public_key: Key, + public_key: SSlibKey, secrets_handler: Optional[SecretsHandler] = None, ) -> "Signer": """Factory constructor for a given private key URI