1717# b2 replication-accept destinationBucketName sourceKeyId [destinationKeyId]
1818# b2 replication-deny destinationBucketName sourceKeyId
1919
20+ from collections .abc import Iterable
2021from typing import ClassVar , List , Optional , Tuple
2122import itertools
2223import logging
2930
3031logger = logging .getLogger (__name__ )
3132
33+ try :
34+ Iterable [str ]
35+ except TypeError :
36+ Iterable = List # Remove after dropping Python 3.8
37+
3238
3339class ReplicationSetupHelper (metaclass = B2TraceMeta ):
34- """ class with various methods that help with repliction management """
40+ """ class with various methods that help with setting up repliction """
3541 PRIORITY_OFFSET : ClassVar [int ] = 5 #: how far to to put the new rule from the existing rules
3642 DEFAULT_PRIORITY : ClassVar [
3743 int
@@ -50,39 +56,38 @@ class ReplicationSetupHelper(metaclass=B2TraceMeta):
5056 'deleteFiles' ,
5157 )
5258
53- def __init__ (self , source_b2api : B2Api = None , destination_b2api : B2Api = None ):
54- assert source_b2api is not None or destination_b2api is not None
55- self .source_b2api = source_b2api
56- self .destination_b2api = destination_b2api
57-
5859 def setup_both (
5960 self ,
60- source_bucket_name : str ,
61+ source_bucket : Bucket ,
6162 destination_bucket : Bucket ,
6263 name : Optional [str ] = None , #: name for the new replication rule
6364 priority : int = None , #: priority for the new replication rule
6465 prefix : Optional [str ] = None ,
66+ include_existing_files : bool = False ,
6567 ) -> Tuple [Bucket , Bucket ]:
66- source_bucket = self . setup_source (
67- source_bucket_name ,
68- prefix ,
68+
69+ new_source_bucket = self . setup_source (
70+ source_bucket ,
6971 destination_bucket ,
72+ prefix ,
7073 name ,
7174 priority ,
75+ include_existing_files ,
7276 )
73- destination_bucket = self .setup_destination (
74- source_bucket .replication .as_replication_source .source_application_key_id ,
77+
78+ new_destination_bucket = self .setup_destination (
79+ new_source_bucket .replication .as_replication_source .source_application_key_id ,
7580 destination_bucket ,
7681 )
77- return source_bucket , destination_bucket
82+
83+ return new_source_bucket , new_destination_bucket
7884
7985 def setup_destination (
8086 self ,
8187 source_key_id : str ,
8288 destination_bucket : Bucket ,
8389 ) -> Bucket :
8490 api : B2Api = destination_bucket .api
85- destination_bucket = api .list_buckets (destination_bucket .name )[0 ] # fresh!
8691 if destination_bucket .replication is None or destination_bucket .replication .as_replication_source is None :
8792 source_configuration = None
8893 else :
@@ -145,28 +150,27 @@ def _get_destination_key(
145150 logger .debug ("no matching key found, making a new one" )
146151 key = cls ._create_destination_key (
147152 name = destination_bucket .name [:91 ] + '-replidst' ,
148- api = api ,
149- bucket_id = destination_bucket .id_ ,
153+ bucket = destination_bucket ,
150154 prefix = None ,
151155 )
152156 return keys_to_purge , key
153157
154158 def setup_source (
155159 self ,
156- source_bucket_name ,
157- prefix ,
160+ source_bucket : Bucket ,
158161 destination_bucket : Bucket ,
159- name ,
160- priority ,
162+ prefix : Optional [str ] = None ,
163+ name : Optional [str ] = None , #: name for the new replication rule
164+ priority : int = None , #: priority for the new replication rule
165+ include_existing_files : bool = False ,
161166 ) -> Bucket :
162- source_bucket : Bucket = self .source_b2api .list_buckets (source_bucket_name )[0 ] # fresh!
163167 if prefix is None :
164168 prefix = ""
165169
166170 try :
167- current_source_rrs = source_bucket .replication .as_replication_source .rules
171+ current_source_rules = source_bucket .replication .as_replication_source .rules
168172 except (NameError , AttributeError ):
169- current_source_rrs = []
173+ current_source_rules = []
170174 try :
171175 destination_configuration = source_bucket .replication .as_replication_destination
172176 except (NameError , AttributeError ):
@@ -176,27 +180,28 @@ def setup_source(
176180 source_bucket ,
177181 prefix ,
178182 source_bucket .replication ,
179- current_source_rrs ,
183+ current_source_rules ,
180184 )
181185 priority = self ._get_priority_for_new_rule (
186+ current_source_rules ,
182187 priority ,
183- current_source_rrs ,
184188 )
185- name = self ._get_name_for_new_rule (
189+ name = self ._get_new_rule_name (
190+ current_source_rules ,
191+ destination_bucket ,
186192 name ,
187- current_source_rrs ,
188- destination_bucket .name ,
189193 )
190194 new_rr = ReplicationRule (
191195 name = name ,
192196 priority = priority ,
193197 destination_bucket_id = destination_bucket .id_ ,
194198 file_name_prefix = prefix ,
199+ include_existing_files = include_existing_files ,
195200 )
196201 new_replication_configuration = ReplicationConfiguration (
197202 ReplicationSourceConfiguration (
198203 source_application_key_id = source_key .id_ ,
199- rules = current_source_rrs + [new_rr ],
204+ rules = current_source_rules + [new_rr ],
200205 ),
201206 destination_configuration ,
202207 )
@@ -208,10 +213,10 @@ def setup_source(
208213 @classmethod
209214 def _get_source_key (
210215 cls ,
211- source_bucket ,
212- prefix ,
216+ source_bucket : Bucket ,
217+ prefix : str ,
213218 current_replication_configuration : ReplicationConfiguration ,
214- current_source_rrs ,
219+ current_source_rules : Iterable [ ReplicationRule ] ,
215220 ) -> ApplicationKey :
216221 api = source_bucket .api
217222
@@ -227,9 +232,9 @@ def _get_source_key(
227232
228233 new_key = cls ._create_source_key (
229234 name = source_bucket .name [:91 ] + '-replisrc' ,
230- api = api ,
231- bucket_id = source_bucket . id_ ,
232- ) # no prefix!
235+ bucket = source_bucket ,
236+ prefix = prefix ,
237+ )
233238 return new_key
234239
235240 @classmethod
@@ -269,67 +274,74 @@ def _should_make_new_source_key(
269274 def _create_source_key (
270275 cls ,
271276 name : str ,
272- api : B2Api ,
273- bucket_id : str ,
277+ bucket : Bucket ,
274278 prefix : Optional [str ] = None ,
275279 ) -> ApplicationKey :
280+ # in this implementation we ignore the prefix and create a full key, because
281+ # if someone would need a different (wider) key later, all replication
282+ # destinations would have to start using new keys and it's not feasible
283+ # from organizational perspective, while the prefix of uploaded files can be
284+ # restricted on the rule level
285+ prefix = None
276286 capabilities = cls .DEFAULT_SOURCE_CAPABILITIES
277- return cls ._create_key (name , api , bucket_id , prefix , capabilities )
287+ return cls ._create_key (name , bucket , prefix , capabilities )
278288
279289 @classmethod
280290 def _create_destination_key (
281291 cls ,
282292 name : str ,
283- api : B2Api ,
284- bucket_id : str ,
293+ bucket : Bucket ,
285294 prefix : Optional [str ] = None ,
286295 ) -> ApplicationKey :
287296 capabilities = cls .DEFAULT_DESTINATION_CAPABILITIES
288- return cls ._create_key (name , api , bucket_id , prefix , capabilities )
297+ return cls ._create_key (name , bucket , prefix , capabilities )
289298
290299 @classmethod
291300 def _create_key (
292301 cls ,
293302 name : str ,
294- api : B2Api ,
295- bucket_id : str ,
303+ bucket : Bucket ,
296304 prefix : Optional [str ] = None ,
297305 capabilities = tuple (),
298306 ) -> ApplicationKey :
307+ api : B2Api = bucket .api
299308 return api .create_key (
300309 capabilities = capabilities ,
301310 key_name = name ,
302- bucket_id = bucket_id ,
311+ bucket_id = bucket . id_ ,
303312 name_prefix = prefix ,
304313 )
305314
306315 @classmethod
307- def _get_narrowest_common_prefix (cls , widen_to : List [str ]) -> str :
308- for path in widen_to :
309- pass # TODO
310- return ''
311-
312- @classmethod
313- def _get_priority_for_new_rule (cls , priority , current_source_rrs ):
314- # if there is no priority hint, look into current rules to determine the last priority and add a constant to it
316+ def _get_priority_for_new_rule (
317+ cls ,
318+ current_rules : Iterable [ReplicationRule ],
319+ priority : Optional [int ] = None ,
320+ ):
315321 if priority is not None :
316322 return priority
317- if current_source_rrs :
318- # TODO: maybe handle a case where the existing rrs need to have their priorities decreased to make space
319- existing_priority = max (rr .priority for rr in current_source_rrs )
323+ if current_rules :
324+ # ignore a case where the existing rrs need to have their priorities decreased to make space (max is 2**31-1)
325+ existing_priority = max (rr .priority for rr in current_rules )
320326 return min (existing_priority + cls .PRIORITY_OFFSET , cls .MAX_PRIORITY )
321327 return cls .DEFAULT_PRIORITY
322328
323329 @classmethod
324- def _get_name_for_new_rule (
325- cls , name : Optional [str ], current_source_rrs , destination_bucket_name
330+ def _get_new_rule_name (
331+ cls ,
332+ current_rules : Iterable [ReplicationRule ],
333+ destination_bucket : Bucket ,
334+ name : Optional [str ] = None ,
326335 ):
327336 if name is not None :
328337 return name
329- existing_names = set (rr .name for rr in current_source_rrs )
338+ existing_names = set (rr .name for rr in current_rules )
330339 suffixes = cls ._get_rule_name_candidate_suffixes ()
331340 while True :
332- candidate = '%s%s' % (destination_bucket_name , next (suffixes ))
341+ candidate = '%s%s' % (
342+ destination_bucket .name ,
343+ next (suffixes ),
344+ ) # use := after dropping 3.7
333345 if candidate not in existing_names :
334346 return candidate
335347
0 commit comments