|
9 | 9 | an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
10 | 10 | specific language governing permissions and limitations under the License.
|
11 | 11 | """
|
| 12 | +import abc |
12 | 13 | import typing
|
13 | 14 |
|
14 | 15 | from bkcrypto.asymmetric.ciphers import BaseAsymmetricCipher
|
|
17 | 18 | from bkcrypto.symmetric.ciphers import BaseSymmetricCipher
|
18 | 19 | from bkcrypto.symmetric.options import SymmetricOptions
|
19 | 20 |
|
20 |
| -from .init_configs import AsymmetricCipherInitConfig, SymmetricCipherInitConfig |
| 21 | +from .init_configs import CipherInitConfig |
21 | 22 | from .settings import crypto_settings
|
22 | 23 |
|
23 | 24 |
|
@@ -47,54 +48,86 @@ def get_symmetric_cipher(
|
47 | 48 | )
|
48 | 49 |
|
49 | 50 |
|
50 |
| -class SymmetricCipherManager: |
51 |
| - _cache: typing.Optional[typing.Dict[str, BaseSymmetricCipher]] = None |
| 51 | +class BaseCipherManager(abc.ABC): |
52 | 52 |
|
53 |
| - def __init__(self): |
54 |
| - self._cache: [str, BaseSymmetricCipher] = {} |
| 53 | + _cache: typing.Dict[str, typing.Any] = None |
55 | 54 |
|
56 |
| - def cipher( |
57 |
| - self, using: typing.Optional[str] = None, cipher_type: typing.Optional[str] = None |
58 |
| - ) -> BaseSymmetricCipher: |
| 55 | + def __init__(self): |
| 56 | + self._cache = {} |
59 | 57 |
|
| 58 | + def _get_init_config(self, using: typing.Optional[str] = None) -> CipherInitConfig: |
60 | 59 | using: str = using or "default"
|
61 |
| - if using not in crypto_settings.SYMMETRIC_CIPHERS: |
| 60 | + init_configs: typing.Dict[str, CipherInitConfig] = self._get_init_configs_from_settings() |
| 61 | + if using not in init_configs: |
62 | 62 | raise RuntimeError(f"Invalid using {using}")
|
| 63 | + return init_configs[using] |
| 64 | + |
| 65 | + @abc.abstractmethod |
| 66 | + def _get_init_configs_from_settings(self) -> typing.Dict[str, CipherInitConfig]: |
| 67 | + raise NotImplementedError |
| 68 | + |
| 69 | + @abc.abstractmethod |
| 70 | + def _get_cipher_type_from_settings(self) -> str: |
| 71 | + raise NotImplementedError |
63 | 72 |
|
64 |
| - cipher_type: str = cipher_type or crypto_settings.SYMMETRIC_CIPHER_TYPE |
| 73 | + @abc.abstractmethod |
| 74 | + def _get_cipher(self, cipher_type: str, init_config: CipherInitConfig): |
| 75 | + raise NotImplementedError |
| 76 | + |
| 77 | + def _cipher(self, using: typing.Optional[str] = None, cipher_type: typing.Optional[str] = None): |
| 78 | + |
| 79 | + # try to get cipher from cache |
| 80 | + cipher_type: str = cipher_type or self._get_cipher_type_from_settings() |
65 | 81 | cache_key: str = f"{using}-{cipher_type}"
|
66 | 82 | if cache_key in self._cache:
|
67 | 83 | return self._cache[cache_key]
|
68 | 84 |
|
69 |
| - init_config: SymmetricCipherInitConfig = crypto_settings.SYMMETRIC_CIPHERS[using] |
70 |
| - cipher: BaseSymmetricCipher = get_symmetric_cipher(**init_config.as_get_cipher_params(cipher_type)) |
71 |
| - self._cache[cache_key] = cipher |
72 |
| - return cipher |
| 85 | + # create & cache instance |
| 86 | + init_config: CipherInitConfig = self._get_init_config(using=using) |
| 87 | + self._cache[cache_key] = self._get_cipher(cipher_type, init_config) |
| 88 | + return self._cache[cache_key] |
73 | 89 |
|
| 90 | + @abc.abstractmethod |
| 91 | + def cipher(self, using: typing.Optional[str] = None, cipher_type: typing.Optional[str] = None): |
| 92 | + raise NotImplementedError |
74 | 93 |
|
75 |
| -class AsymmetricCipherManager: |
76 |
| - _cache: typing.Optional[typing.Dict[str, BaseAsymmetricCipher]] = None |
77 | 94 |
|
78 |
| - def __init__(self): |
79 |
| - self._cache: [str, BaseAsymmetricCipher] = {} |
| 95 | +class SymmetricCipherManager(BaseCipherManager): |
| 96 | + |
| 97 | + _cache: typing.Optional[typing.Dict[str, BaseSymmetricCipher]] = None |
| 98 | + |
| 99 | + def _get_init_configs_from_settings(self) -> typing.Dict[str, CipherInitConfig]: |
| 100 | + return crypto_settings.SYMMETRIC_CIPHERS |
| 101 | + |
| 102 | + def _get_cipher_type_from_settings(self) -> str: |
| 103 | + return crypto_settings.SYMMETRIC_CIPHER_TYPE |
| 104 | + |
| 105 | + def _get_cipher(self, cipher_type: str, init_config: CipherInitConfig) -> BaseSymmetricCipher: |
| 106 | + return get_symmetric_cipher(**init_config.as_get_cipher_params(cipher_type)) |
80 | 107 |
|
81 | 108 | def cipher(
|
82 | 109 | self, using: typing.Optional[str] = None, cipher_type: typing.Optional[str] = None
|
83 |
| - ) -> BaseAsymmetricCipher: |
| 110 | + ) -> BaseSymmetricCipher: |
| 111 | + return self._cipher(using, cipher_type) |
84 | 112 |
|
85 |
| - using: str = using or "default" |
86 |
| - if using not in crypto_settings.ASYMMETRIC_CIPHERS: |
87 |
| - raise RuntimeError(f"Invalid using {using}") |
88 | 113 |
|
89 |
| - cipher_type: str = cipher_type or crypto_settings.ASYMMETRIC_CIPHER_TYPE |
90 |
| - cache_key: str = f"{using}-{cipher_type}" |
91 |
| - if cache_key in self._cache: |
92 |
| - return self._cache[cache_key] |
| 114 | +class AsymmetricCipherManager(BaseCipherManager): |
93 | 115 |
|
94 |
| - init_config: AsymmetricCipherInitConfig = crypto_settings.ASYMMETRIC_CIPHERS[using] |
95 |
| - cipher: BaseAsymmetricCipher = get_asymmetric_cipher(**init_config.as_get_cipher_params(cipher_type)) |
96 |
| - self._cache[cache_key] = cipher |
97 |
| - return cipher |
| 116 | + _cache: typing.Optional[typing.Dict[str, BaseAsymmetricCipher]] = None |
| 117 | + |
| 118 | + def _get_init_configs_from_settings(self) -> typing.Dict[str, CipherInitConfig]: |
| 119 | + return crypto_settings.ASYMMETRIC_CIPHERS |
| 120 | + |
| 121 | + def _get_cipher_type_from_settings(self) -> str: |
| 122 | + return crypto_settings.ASYMMETRIC_CIPHER_TYPE |
| 123 | + |
| 124 | + def _get_cipher(self, cipher_type: str, init_config: CipherInitConfig) -> BaseAsymmetricCipher: |
| 125 | + return get_asymmetric_cipher(**init_config.as_get_cipher_params(cipher_type)) |
| 126 | + |
| 127 | + def cipher( |
| 128 | + self, using: typing.Optional[str] = None, cipher_type: typing.Optional[str] = None |
| 129 | + ) -> BaseAsymmetricCipher: |
| 130 | + return self._cipher(using, cipher_type) |
98 | 131 |
|
99 | 132 |
|
100 | 133 | symmetric_cipher_manager = SymmetricCipherManager()
|
|
0 commit comments