From a50f9b35be8b3e5c413ba4416368035808f7a4cb Mon Sep 17 00:00:00 2001 From: Ron Frederick Date: Fri, 26 Jul 2024 20:12:25 -0700 Subject: [PATCH] Improve handling of deprecated ciphers This commit eliminates deprecation warnings from cryptography 43.0.0, and automates the handling of "decrepit" ciphers going forward to avoid needing to update AsyncSSH each time a new cipher is retired. --- asyncssh/crypto/cipher.py | 78 ++++++++++++++++++++++++--------------- 1 file changed, 48 insertions(+), 30 deletions(-) diff --git a/asyncssh/crypto/cipher.py b/asyncssh/crypto/cipher.py index 68badae..23134e3 100644 --- a/asyncssh/crypto/cipher.py +++ b/asyncssh/crypto/cipher.py @@ -1,4 +1,4 @@ -# Copyright (c) 2014-2021 by Ron Frederick and others. +# Copyright (c) 2014-2024 by Ron Frederick and others. # # This program and the accompanying materials are made available under # the terms of the Eclipse Public License v2.0 which accompanies this @@ -20,22 +20,23 @@ """A shim around PyCA for accessing symmetric ciphers needed by AsyncSSH""" +from types import ModuleType from typing import Any, MutableMapping, Optional, Tuple import warnings from cryptography.exceptions import InvalidTag from cryptography.hazmat.primitives.ciphers import Cipher, CipherContext from cryptography.hazmat.primitives.ciphers.aead import AESGCM -from cryptography.hazmat.primitives.ciphers.algorithms import AES, ARC4 -from cryptography.hazmat.primitives.ciphers.algorithms import TripleDES from cryptography.hazmat.primitives.ciphers.modes import CBC, CTR -with warnings.catch_warnings(): - warnings.simplefilter('ignore') +import cryptography.hazmat.primitives.ciphers.algorithms as _algs + +_decrepit_algs: Optional[ModuleType] - from cryptography.hazmat.primitives.ciphers.algorithms import Blowfish - from cryptography.hazmat.primitives.ciphers.algorithms import CAST5 - from cryptography.hazmat.primitives.ciphers.algorithms import SEED +try: + import cryptography.hazmat.decrepit.ciphers.algorithms as _decrepit_algs +except ImportError: + _decrepit_algs = None _CipherAlgs = Tuple[Any, Any, int] @@ -140,27 +141,44 @@ def get_cipher_params(cipher_name: str) -> _CipherParams: _cipher_alg_list = ( - ('aes128-cbc', AES, CBC, 0, 16, 16, 16), - ('aes192-cbc', AES, CBC, 0, 24, 16, 16), - ('aes256-cbc', AES, CBC, 0, 32, 16, 16), - ('aes128-ctr', AES, CTR, 0, 16, 16, 16), - ('aes192-ctr', AES, CTR, 0, 24, 16, 16), - ('aes256-ctr', AES, CTR, 0, 32, 16, 16), - ('aes128-gcm', None, None, 0, 16, 12, 16), - ('aes256-gcm', None, None, 0, 32, 12, 16), - ('arcfour', ARC4, None, 0, 16, 1, 1), - ('arcfour40', ARC4, None, 0, 5, 1, 1), - ('arcfour128', ARC4, None, 1536, 16, 1, 1), - ('arcfour256', ARC4, None, 1536, 32, 1, 1), - ('blowfish-cbc', Blowfish, CBC, 0, 16, 8, 8), - ('cast128-cbc', CAST5, CBC, 0, 16, 8, 8), - ('des-cbc', TripleDES, CBC, 0, 8, 8, 8), - ('des2-cbc', TripleDES, CBC, 0, 16, 8, 8), - ('des3-cbc', TripleDES, CBC, 0, 24, 8, 8), - ('seed-cbc', SEED, CBC, 0, 16, 16, 16) + ('aes128-cbc', 'AES', CBC, 0, 16, 16, 16), + ('aes192-cbc', 'AES', CBC, 0, 24, 16, 16), + ('aes256-cbc', 'AES', CBC, 0, 32, 16, 16), + ('aes128-ctr', 'AES', CTR, 0, 16, 16, 16), + ('aes192-ctr', 'AES', CTR, 0, 24, 16, 16), + ('aes256-ctr', 'AES', CTR, 0, 32, 16, 16), + ('aes128-gcm', None, None, 0, 16, 12, 16), + ('aes256-gcm', None, None, 0, 32, 12, 16), + ('arcfour', 'ARC4', None, 0, 16, 1, 1), + ('arcfour40', 'ARC4', None, 0, 5, 1, 1), + ('arcfour128', 'ARC4', None, 1536, 16, 1, 1), + ('arcfour256', 'ARC4', None, 1536, 32, 1, 1), + ('blowfish-cbc', 'Blowfish', CBC, 0, 16, 8, 8), + ('cast128-cbc', 'CAST5', CBC, 0, 16, 8, 8), + ('des-cbc', 'TripleDES', CBC, 0, 8, 8, 8), + ('des2-cbc', 'TripleDES', CBC, 0, 16, 8, 8), + ('des3-cbc', 'TripleDES', CBC, 0, 24, 8, 8), + ('seed-cbc', 'SEED', CBC, 0, 16, 16, 16) ) -for _cipher_name, _cipher, _mode, _initial_bytes, \ - _key_size, _iv_size, _block_size in _cipher_alg_list: - _cipher_algs[_cipher_name] = (_cipher, _mode, _initial_bytes) - register_cipher(_cipher_name, _key_size, _iv_size, _block_size) +with warnings.catch_warnings(): + warnings.simplefilter('ignore') + + for _cipher_name, _alg, _mode, _initial_bytes, \ + _key_size, _iv_size, _block_size in _cipher_alg_list: + if _alg: + try: + _cipher = getattr(_algs, _alg) + except AttributeError as exc: # pragma: no cover + if _decrepit_algs: + try: + _cipher = getattr(_decrepit_algs, _alg) + except AttributeError: + raise exc from None + else: + raise + else: + _cipher = None + + _cipher_algs[_cipher_name] = (_cipher, _mode, _initial_bytes) + register_cipher(_cipher_name, _key_size, _iv_size, _block_size)