|
| 1 | +"""Signer interface and example interface implementations. |
| 2 | +
|
| 3 | +The goal of this module is to provide a signing interface supporting multiple |
| 4 | +signing implementations and a couple of example implementations. |
| 5 | +
|
| 6 | +""" |
| 7 | + |
| 8 | +import logging |
| 9 | +from typing import Optional |
| 10 | +from urllib import parse |
| 11 | + |
| 12 | +import securesystemslib.hash as sslib_hash |
| 13 | +from securesystemslib import exceptions |
| 14 | +from securesystemslib.signer._key import Key |
| 15 | +from securesystemslib.signer._signer import SecretsHandler, Signature, Signer |
| 16 | + |
| 17 | +logger = logging.getLogger(__name__) |
| 18 | + |
| 19 | +GCP_IMPORT_ERROR = None |
| 20 | +try: |
| 21 | + from google.cloud import kms |
| 22 | +except ImportError: |
| 23 | + GCP_IMPORT_ERROR = ( |
| 24 | + "google-cloud-kms library required to sign with Google Cloud keys." |
| 25 | + ) |
| 26 | + |
| 27 | + |
| 28 | +class GCPSigner(Signer): |
| 29 | + """Google Cloud KMS Signer |
| 30 | +
|
| 31 | + This Signer uses Google Cloud KMS to sign: the payload is hashed locally, |
| 32 | + but the signature is created on the KMS. |
| 33 | +
|
| 34 | + The signer uses "ambient" credentials: typically environment var |
| 35 | + GOOGLE_APPLICATION_CREDENTIALS that points to a file with valid |
| 36 | + credentials. These will be found by google.cloud.kms, see |
| 37 | + https://cloud.google.com/docs/authentication/getting-started |
| 38 | + (and https://github.com/google-github-actions/auth for the relevant |
| 39 | + GitHub action). |
| 40 | +
|
| 41 | + Arguments: |
| 42 | + gcp_keyid: Fully qualified GCP KMS key name, like |
| 43 | + projects/python-tuf-kms/locations/global/keyRings/securesystemslib-tests/cryptoKeys/ecdsa-sha2-nistp256/cryptoKeyVersions/1 |
| 44 | + public_key: The related public key instance |
| 45 | +
|
| 46 | + Raises: |
| 47 | + UnsupportedAlgorithmError: The payload hash algorithm is unsupported. |
| 48 | + UnsupportedLibraryError: google.cloud.kms was not found |
| 49 | + Various errors from google.cloud modules: e.g. |
| 50 | + google.auth.exceptions.DefaultCredentialsError if ambient |
| 51 | + credentials are not found |
| 52 | + """ |
| 53 | + |
| 54 | + SCHEME = "gcpkms" |
| 55 | + |
| 56 | + def __init__(self, gcp_keyid: str, public_key: Key): |
| 57 | + if GCP_IMPORT_ERROR: |
| 58 | + raise exceptions.UnsupportedLibraryError(GCP_IMPORT_ERROR) |
| 59 | + |
| 60 | + self.hash_algorithm = self._get_hash_algorithm(public_key) |
| 61 | + self.gcp_keyid = gcp_keyid |
| 62 | + self.public_key = public_key |
| 63 | + self.client = kms.KeyManagementServiceClient() |
| 64 | + |
| 65 | + @classmethod |
| 66 | + def from_priv_key_uri( |
| 67 | + cls, |
| 68 | + priv_key_uri: str, |
| 69 | + public_key: Key, |
| 70 | + secrets_handler: Optional[SecretsHandler] = None, |
| 71 | + ) -> "GCPSigner": |
| 72 | + uri = parse.urlparse(priv_key_uri) |
| 73 | + |
| 74 | + if uri.scheme != cls.SCHEME: |
| 75 | + raise ValueError(f"GCPSigner does not support {priv_key_uri}") |
| 76 | + |
| 77 | + return cls(uri.path, public_key) |
| 78 | + |
| 79 | + @staticmethod |
| 80 | + def _get_hash_algorithm(public_key: Key) -> str: |
| 81 | + """Helper function to return payload hash algorithm used for this key""" |
| 82 | + |
| 83 | + # TODO: This could be a public abstract method on Key so that GCPSigner |
| 84 | + # would not be tied to a specific Key implementation -- not all keys |
| 85 | + # have a pre hash algorithm though. |
| 86 | + if public_key.keytype == "rsa": |
| 87 | + # hash algorithm is encoded as last scheme portion |
| 88 | + algo = public_key.scheme.split("-")[-1] |
| 89 | + if public_key.keytype in [ |
| 90 | + "ecdsa", |
| 91 | + "ecdsa-sha2-nistp256", |
| 92 | + "ecdsa-sha2-nistp384", |
| 93 | + ]: |
| 94 | + # nistp256 uses sha-256, nistp384 uses sha-384 |
| 95 | + bits = public_key.scheme.split("-nistp")[-1] |
| 96 | + algo = f"sha{bits}" |
| 97 | + |
| 98 | + # trigger UnsupportedAlgorithm if appropriate |
| 99 | + _ = sslib_hash.digest(algo) |
| 100 | + return algo |
| 101 | + |
| 102 | + def sign(self, payload: bytes) -> Signature: |
| 103 | + """Signs payload with Google Cloud KMS. |
| 104 | +
|
| 105 | + Arguments: |
| 106 | + payload: bytes to be signed. |
| 107 | +
|
| 108 | + Raises: |
| 109 | + Various errors from google.cloud modules. |
| 110 | +
|
| 111 | + Returns: |
| 112 | + Signature. |
| 113 | + """ |
| 114 | + # NOTE: request and response can contain CRC32C of the digest/sig: |
| 115 | + # Verifying could be useful but would require another dependency... |
| 116 | + |
| 117 | + hasher = sslib_hash.digest(self.hash_algorithm) |
| 118 | + hasher.update(payload) |
| 119 | + digest = {self.hash_algorithm: hasher.digest()} |
| 120 | + request = {"name": self.gcp_keyid, "digest": digest} |
| 121 | + |
| 122 | + logger.debug("signing request %s", request) |
| 123 | + response = self.client.asymmetric_sign(request) |
| 124 | + logger.debug("signing response %s", response) |
| 125 | + |
| 126 | + return Signature(self.public_key.keyid, response.signature.hex()) |
0 commit comments