diff --git a/CHANGELOG.md b/CHANGELOG.md index 38ff45b1f..cff84aa3f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Refactored `sync.file.*File` and `sync.file.*FileVersion` to `sync.path.*SyncPath` * Refactored `FileVersionInfo` to `FileVersion` * `ScanPoliciesManager` exclusion interface changed +* `B2Api` unittests for v0, v1 and v2 are now common +* `B2Api.cancel_large_file` returns a `FileIdAndName` object instead of a `FileVersion` object in v2 +* `FileVersion` has a mandatory `api` parameter +* `B2Folder` holds a handle to B2Api +* `Bucket` unit tests for v1 and v2 are now common ## [1.8.0] - 2021-05-21 @@ -192,8 +197,7 @@ has changed. ### Added Initial official release of SDK as a separate package (until now it was a part of B2 CLI) -[Unreleased]: https://github.com/Backblaze/b2-sdk-python/compare/v1.8.0...HEAD -[1.8.0]: https://github.com/Backblaze/b2-sdk-python/compare/v1.7.0...v1.8.0 +[Unreleased]: https://github.com/Backblaze/b2-sdk-python/compare/v1.7.0...HEAD [1.7.0]: https://github.com/Backblaze/b2-sdk-python/compare/v1.6.0...v1.7.0 [1.6.0]: https://github.com/Backblaze/b2-sdk-python/compare/v1.5.0...v1.6.0 [1.5.0]: https://github.com/Backblaze/b2-sdk-python/compare/v1.4.0...v1.5.0 diff --git a/b2sdk/api.py b/b2sdk/api.py index c6c1288fb..0eb9c1817 100644 --- a/b2sdk/api.py +++ b/b2sdk/api.py @@ -45,15 +45,16 @@ def url_for_api(info, api_name): class Services(object): """ Gathers objects that provide high level logic over raw api usage. """ - def __init__(self, session, max_upload_workers=10, max_copy_workers=10): + def __init__(self, api, max_upload_workers=10, max_copy_workers=10): """ Initialize Services object using given session. - :param b2sdk.v1.Session session: + :param b2sdk.v1.B2Api api: :param int max_upload_workers: a number of upload threads :param int max_copy_workers: a number of copy threads """ - self.session = session + self.api = api + self.session = api.session self.large_file = LargeFileServices(self) self.download_manager = DownloadManager(self) self.upload_manager = UploadManager(self, max_upload_workers=max_upload_workers) @@ -121,7 +122,7 @@ def __init__( """ self.session = self.SESSION_CLASS(account_info=account_info, cache=cache, raw_api=raw_api) self.services = Services( - self.session, + self, max_upload_workers=max_upload_workers, max_copy_workers=max_copy_workers, ) @@ -394,28 +395,22 @@ def list_parts(self, file_id, start_part_number=None, batch_size=None): ) # delete/cancel - def cancel_large_file(self, file_id): + def cancel_large_file(self, file_id: str) -> FileIdAndName: """ Cancel a large file upload. - - :param str file_id: a file ID - :rtype: None """ return self.services.large_file.cancel_large_file(file_id) - def delete_file_version(self, file_id, file_name): + def delete_file_version(self, file_id: str, file_name: str) -> FileIdAndName: """ Permanently and irrevocably delete one version of a file. - - :param str file_id: a file ID - :param str file_name: a file name - :rtype: FileIdAndName """ # filename argument is not first, because one day it may become optional response = self.session.delete_file_version(file_id, file_name) - assert response['fileId'] == file_id - assert response['fileName'] == file_name - return FileIdAndName(file_id, file_name) + file_id_and_name = FileIdAndName.from_cancel_or_delete_response(response) + assert file_id_and_name.file_id == file_id + assert file_id_and_name.file_name == file_name + return file_id_and_name # download def get_download_url_for_fileid(self, file_id): diff --git a/b2sdk/bucket.py b/b2sdk/bucket.py index d4205d4b8..bf97996a7 100644 --- a/b2sdk/bucket.py +++ b/b2sdk/bucket.py @@ -231,7 +231,9 @@ def get_file_info_by_id(self, file_id: str) -> FileVersion: :param str file_id: the id of the file who's info will be retrieved. :rtype: generator[b2sdk.v1.FileVersionInfo] """ - return self.FILE_VERSION_FACTORY.from_api_response(self.api.get_file_info(file_id)) + return self.FILE_VERSION_FACTORY.from_api_response( + self.api, self.api.get_file_info(file_id) + ) def get_file_info_by_name(self, file_name: str) -> FileVersion: """ @@ -242,7 +244,7 @@ def get_file_info_by_name(self, file_name: str) -> FileVersion: """ try: return self.FILE_VERSION_FACTORY.from_response_headers( - self.api.session.get_file_info_by_name(self.name, file_name) + self.api, self.api.session.get_file_info_by_name(self.name, file_name) ) except FileOrBucketNotFound: raise FileNotPresent(bucket_name=self.name, file_id_or_name=file_name) @@ -290,7 +292,7 @@ def list_file_versions(self, file_name, fetch_count=None): ) for entry in response['files']: - file_version = self.FILE_VERSION_FACTORY.from_api_response(entry) + file_version = self.FILE_VERSION_FACTORY.from_api_response(self.api, entry) if file_version.file_name != file_name: # All versions for the requested file name have been listed. return @@ -350,7 +352,7 @@ def ls(self, folder_to_list='', show_versions=False, recursive=False, fetch_coun else: response = session.list_file_names(self.id_, start_file_name, fetch_count, prefix) for entry in response['files']: - file_version = self.FILE_VERSION_FACTORY.from_api_response(entry) + file_version = self.FILE_VERSION_FACTORY.from_api_response(self.api, entry) if not file_version.file_name.startswith(prefix): # We're past the files we care about return @@ -788,7 +790,7 @@ def hide_file(self, file_name): :rtype: b2sdk.v1.FileVersionInfo """ response = self.api.session.hide_file(self.id_, file_name) - return self.FILE_VERSION_FACTORY.from_api_response(response) + return self.FILE_VERSION_FACTORY.from_api_response(self.api, response) def copy( self, diff --git a/b2sdk/file_version.py b/b2sdk/file_version.py index 16563bad1..20a30cc3e 100644 --- a/b2sdk/file_version.py +++ b/b2sdk/file_version.py @@ -8,10 +8,8 @@ # ###################################################################### -from typing import Optional - from .encryption.setting import EncryptionSetting, EncryptionSettingFactory -from .file_lock import FileRetentionSetting, LegalHold +from .file_lock import FileRetentionSetting, LegalHold, NO_RETENTION_FILE_SETTING from .raw_api import SRC_LAST_MODIFIED_MILLIS @@ -36,6 +34,7 @@ class FileVersion: __slots__ = [ 'id_', + 'api', 'file_name', 'size', 'content_type', @@ -52,6 +51,7 @@ class FileVersion: def __init__( self, + api, id_, file_name, size, @@ -60,14 +60,12 @@ def __init__( file_info, upload_timestamp, action, - content_md5=None, - server_side_encryption: Optional[EncryptionSetting] = None, # TODO: make it mandatory in v2 - file_retention: Optional[ - FileRetentionSetting - ] = None, # TODO: in v2 change the default value to NO_RETENTION_FILE_SETTING - legal_hold: Optional[LegalHold - ] = None, # TODO: in v2 change the default value to LegalHold.UNSET + content_md5, + server_side_encryption: EncryptionSetting, + file_retention: FileRetentionSetting = NO_RETENTION_FILE_SETTING, + legal_hold: LegalHold = LegalHold.UNSET, ): + self.api = api self.id_ = id_ self.file_name = file_name self.size = size @@ -93,6 +91,8 @@ def as_dict(self): 'fileName': self.file_name, 'fileInfo': self.file_info, 'legalHold': self.legal_hold.to_dict_repr() if self.legal_hold is not None else None, + 'serverSideEncryption': self.server_side_encryption.as_dict(), + 'fileRetention': self.file_retention.as_dict(), } if self.size is not None: @@ -107,10 +107,6 @@ def as_dict(self): result['contentSha1'] = self.content_sha1 if self.content_md5 is not None: result['contentMd5'] = self.content_md5 - if self.server_side_encryption is not None: # this is for backward compatibility of interface only, b2sdk always sets it - result['serverSideEncryption'] = self.server_side_encryption.as_dict() - if self.file_retention is not None: # this is for backward compatibility of interface only, b2sdk always sets it - result['fileRetention'] = self.file_retention.as_dict() return result def __eq__(self, other): @@ -120,6 +116,12 @@ def __eq__(self, other): return False return True + def __repr__(self): + return '%s(%s)' % ( + self.__class__.__name__, + ', '.join(repr(getattr(self, attr)) for attr in self.__slots__) + ) + class FileVersionFactory(object): """ @@ -127,7 +129,7 @@ class FileVersionFactory(object): """ @classmethod - def from_api_response(cls, file_version_dict, force_action=None): + def from_api_response(cls, api, file_version_dict, force_action=None): """ Turn this: @@ -182,6 +184,7 @@ def from_api_response(cls, file_version_dict, force_action=None): legal_hold = LegalHold.from_file_version_dict(file_version_dict) return FileVersion( + api, id_, file_name, size, @@ -197,21 +200,9 @@ def from_api_response(cls, file_version_dict, force_action=None): ) @classmethod - def from_cancel_large_file_response(cls, response): - return FileVersion( - response['fileId'], - response['fileName'], - 0, # size - 'unknown', - 'none', - {}, - 0, # upload timestamp - 'cancel' - ) - - @classmethod - def from_response_headers(cls, headers): + def from_response_headers(cls, api, headers): return FileVersion( + api=api, id_=headers.get('x-bz-file-id'), file_name=headers.get('x-bz-file-name'), size=headers.get('content-length'), @@ -220,6 +211,7 @@ def from_response_headers(cls, headers): file_info=None, upload_timestamp=headers.get('x-bz-upload-timestamp'), action=None, + content_md5=None, server_side_encryption=EncryptionSettingFactory.from_response_headers(headers), file_retention=FileRetentionSetting.from_response_headers(headers), legal_hold=LegalHold.from_response_headers(headers), @@ -230,16 +222,23 @@ class FileIdAndName(object): """ A structure which represents a B2 cloud file with just `file_name` and `fileId` attributes. - Used to return data from calls to :py:meth:`b2sdk.v1.Bucket.delete_file_version`. - - :ivar str ~.file_id: ``fileId`` - :ivar str ~.file_name: full file name (with path) + Used to return data from calls to b2_delete_file_version and b2_cancel_large_file. """ - def __init__(self, file_id, file_name): + def __init__(self, file_id: str, file_name: str): self.file_id = file_id self.file_name = file_name + @classmethod + def from_cancel_or_delete_response(cls, response): + return cls(response['fileId'], response['fileName']) + def as_dict(self): """ represents the object as a dict which looks almost exactly like the raw api output for delete_file_version """ return {'action': 'delete', 'fileId': self.file_id, 'fileName': self.file_name} + + def __eq__(self, other): + return (self.file_id == other.file_id and self.file_name == other.file_name) + + def __repr__(self): + return '%s(%s, %s)' % (self.__class__.__name__, repr(self.file_id), repr(self.file_name)) diff --git a/b2sdk/large_file/services.py b/b2sdk/large_file/services.py index d2ab69943..39f66cf05 100644 --- a/b2sdk/large_file/services.py +++ b/b2sdk/large_file/services.py @@ -12,7 +12,7 @@ from b2sdk.encryption.setting import EncryptionSetting from b2sdk.file_lock import FileRetentionSetting, LegalHold -from b2sdk.file_version import FileVersionFactory +from b2sdk.file_version import FileIdAndName from b2sdk.large_file.part import PartFactory from b2sdk.large_file.unfinished_large_file import UnfinishedLargeFile @@ -95,7 +95,7 @@ def start_large_file( :param str,None content_type: the MIME type, or ``None`` to accept the default based on file extension of the B2 file name :param dict,None file_info: a file info to store with the file or ``None`` to not store anything :param b2sdk.v1.EncryptionSetting encryption: encryption settings (``None`` if unknown) - :param bool legal_hold: legal hold setting + :param b2sdk.v1.LegalHold legal_hold: legal hold setting :param b2sdk.v1.FileRetentionSetting file_retention: file retention setting """ return UnfinishedLargeFile( @@ -111,12 +111,9 @@ def start_large_file( ) # delete/cancel - def cancel_large_file(self, file_id): + def cancel_large_file(self, file_id: str) -> FileIdAndName: """ Cancel a large file upload. - - :param str file_id: a file ID - :rtype: None """ response = self.services.session.cancel_large_file(file_id) - return FileVersionFactory.from_cancel_large_file_response(response) + return FileIdAndName.from_cancel_or_delete_response(response) diff --git a/b2sdk/sync/folder.py b/b2sdk/sync/folder.py index e30ba781a..bec2bd58b 100644 --- a/b2sdk/sync/folder.py +++ b/b2sdk/sync/folder.py @@ -307,6 +307,7 @@ def __init__(self, bucket_name, folder_name, api): self.bucket_name = bucket_name self.folder_name = folder_name self.bucket = api.get_bucket_by_name(bucket_name) + self.api = api self.prefix = '' if self.folder_name == '' else self.folder_name + '/' def all_files(self, reporter, policies_manager=DEFAULT_SCAN_MANAGER): diff --git a/b2sdk/transfer/emerge/executor.py b/b2sdk/transfer/emerge/executor.py index 2bef9e014..8ab6cae69 100644 --- a/b2sdk/transfer/emerge/executor.py +++ b/b2sdk/transfer/emerge/executor.py @@ -220,7 +220,7 @@ def execute_plan(self, emerge_plan): # Finish the large file response = self.services.session.finish_large_file(file_id, part_sha1_array) - return FileVersionFactory.from_api_response(response) + return FileVersionFactory.from_api_response(self.services.api, response) def _execute_step(self, execution_step): semaphore = self._semaphore diff --git a/b2sdk/transfer/outbound/copy_manager.py b/b2sdk/transfer/outbound/copy_manager.py index 3f4e79544..ebecb00c1 100644 --- a/b2sdk/transfer/outbound/copy_manager.py +++ b/b2sdk/transfer/outbound/copy_manager.py @@ -216,7 +216,7 @@ def _copy_small_file( legal_hold=legal_hold, file_retention=file_retention, ) - file_info = FileVersionFactory.from_api_response(response) + file_info = FileVersionFactory.from_api_response(self.services.api, response) if progress_listener is not None: progress_listener.bytes_completed(file_info.size) diff --git a/b2sdk/transfer/outbound/upload_manager.py b/b2sdk/transfer/outbound/upload_manager.py index 61b983a8f..52c1e01ae 100644 --- a/b2sdk/transfer/outbound/upload_manager.py +++ b/b2sdk/transfer/outbound/upload_manager.py @@ -248,7 +248,7 @@ def _upload_small_file( content_sha1 = input_stream.hash assert content_sha1 == response[ 'contentSha1'], '%s != %s' % (content_sha1, response['contentSha1']) - return FileVersionFactory.from_api_response(response) + return FileVersionFactory.from_api_response(self.services.api, response) except B2Error as e: if not e.should_retry_upload(): diff --git a/b2sdk/v1/api.py b/b2sdk/v1/api.py index 1489c8b97..32296b078 100644 --- a/b2sdk/v1/api.py +++ b/b2sdk/v1/api.py @@ -10,12 +10,14 @@ from b2sdk import _v2 as v2 from .bucket import Bucket, BucketFactory +from .file_version import FileVersionInfo, file_version_info_from_id_and_name from .session import B2Session # override to use legacy no-request method of creating a bucket from bucket_id and retain `check_bucket_restrictions` # public API method # and to use v1.Bucket +# and to retain cancel_large_file return type class B2Api(v2.B2Api): SESSION_CLASS = staticmethod(B2Session) BUCKET_FACTORY_CLASS = staticmethod(BucketFactory) @@ -42,3 +44,7 @@ def check_bucket_restrictions(self, bucket_name): :raises b2sdk.v1.exception.RestrictedBucket: if the account is not allowed to use this bucket """ self.check_bucket_name_restrictions(bucket_name) + + def cancel_large_file(self, file_id: str) -> FileVersionInfo: + file_id_and_name = super().cancel_large_file(file_id) + return file_version_info_from_id_and_name(file_id_and_name) diff --git a/b2sdk/v1/bucket.py b/b2sdk/v1/bucket.py index 79e89f698..c218446af 100644 --- a/b2sdk/v1/bucket.py +++ b/b2sdk/v1/bucket.py @@ -8,9 +8,8 @@ # ###################################################################### -from typing import Optional - from .file_version import translate_single_file_version, FileVersionInfoFactory +from typing import Optional from b2sdk import _v2 as v2 from b2sdk.utils import validate_b2_file_name @@ -94,6 +93,9 @@ def start_large_file( create_file_stream = translate_single_file_version(v2.Bucket.create_file_stream) copy = translate_single_file_version(v2.Bucket.copy) + _create_file = translate_single_file_version(v2.Bucket._create_file) + copy = translate_single_file_version(v2.Bucket.copy) + class BucketFactory(v2.BucketFactory): BUCKET_CLASS = staticmethod(Bucket) diff --git a/b2sdk/v1/file_version.py b/b2sdk/v1/file_version.py index 5dfb4c395..b79b6d1e2 100644 --- a/b2sdk/v1/file_version.py +++ b/b2sdk/v1/file_version.py @@ -8,16 +8,66 @@ # ###################################################################### +from typing import Optional import datetime import functools from b2sdk import _v2 as v2 +from ..raw_api import SRC_LAST_MODIFIED_MILLIS -# override to retain old formatting methods +# Override to retain legacy class name, __init__ signature, slots +# and old formatting methods class FileVersionInfo(v2.FileVersion): + __slots__ = [ + 'id_', + 'file_name', + 'size', + 'content_type', + 'content_sha1', + 'content_md5', + 'file_info', + 'upload_timestamp', + 'action', + 'server_side_encryption', + 'mod_time_millis', + ] + LS_ENTRY_TEMPLATE = '%83s %6s %10s %8s %9d %s' # order is file_id, action, date, time, size, name + def __init__( + self, + id_, + file_name, + size, + content_type, + content_sha1, + file_info, + upload_timestamp, + action, + content_md5=None, + server_side_encryption: Optional[v2.EncryptionSetting] = None, + file_retention: Optional[v2.FileRetentionSetting] = None, + legal_hold: Optional[v2.LegalHold] = None, + ): + self.id_ = id_ + self.file_name = file_name + self.size = size + self.content_type = content_type + self.content_sha1 = content_sha1 + self.content_md5 = content_md5 + self.file_info = file_info or {} + self.upload_timestamp = upload_timestamp + self.action = action + self.server_side_encryption = server_side_encryption + self.legal_hold = legal_hold + self.file_retention = file_retention + + if SRC_LAST_MODIFIED_MILLIS in self.file_info: + self.mod_time_millis = int(self.file_info[SRC_LAST_MODIFIED_MILLIS]) + else: + self.mod_time_millis = self.upload_timestamp + def format_ls_entry(self): dt = datetime.datetime.utcfromtimestamp(self.upload_timestamp / 1000) date_str = dt.strftime('%Y-%m-%d') @@ -71,9 +121,19 @@ def inner(*a, **kw): class FileVersionInfoFactory(v2.FileVersionFactory): from_api_response = translate_single_file_version(v2.FileVersionFactory.from_api_response) - from_cancel_large_file_response = translate_single_file_version( - v2.FileVersionFactory.from_cancel_large_file_response - ) from_response_headers = translate_single_file_version( v2.FileVersionFactory.from_response_headers ) + + +def file_version_info_from_id_and_name(file_id_and_name: v2.FileIdAndName): + return FileVersionInfo( + id_=file_id_and_name.file_id, + file_name=file_id_and_name.file_name, + size=0, + content_type='unknown', + content_sha1='none', + file_info={}, + upload_timestamp=0, + action='cancel', + ) diff --git a/test/unit/api/__init__.py b/test/unit/api/__init__.py new file mode 100644 index 000000000..47b261ae7 --- /dev/null +++ b/test/unit/api/__init__.py @@ -0,0 +1,9 @@ +###################################################################### +# +# File: test/unit/api/__init__.py +# +# Copyright 2021 Backblaze Inc. All Rights Reserved. +# +# License https://www.backblaze.com/using_b2_code.html +# +###################################################################### diff --git a/test/unit/v1/test_api.py b/test/unit/api/test_api.py similarity index 58% rename from test/unit/v1/test_api.py rename to test/unit/api/test_api.py index 53b1b6e5e..3c5d026ff 100644 --- a/test/unit/v1/test_api.py +++ b/test/unit/api/test_api.py @@ -1,6 +1,6 @@ ###################################################################### # -# File: test/unit/v1/test_api.py +# File: test/unit/api/test_api.py # # Copyright 2021 Backblaze Inc. All Rights Reserved. # @@ -8,23 +8,31 @@ # ###################################################################### -from ..test_base import TestBase +import pytest -from .deps import B2Api -from .deps import DummyCache -from .deps import EncryptionAlgorithm -from .deps import EncryptionMode -from .deps import EncryptionSetting -from .deps import FileRetentionSetting -from .deps import InMemoryAccountInfo -from .deps import LegalHold -from .deps import RawSimulator -from .deps import RetentionMode -from .deps import NO_RETENTION_FILE_SETTING -from .deps_exception import RestrictedBucket +import apiver_deps +from apiver_deps import B2Api +from apiver_deps import DummyCache +from apiver_deps import EncryptionAlgorithm +from apiver_deps import EncryptionMode +from apiver_deps import EncryptionSetting +from apiver_deps import FileIdAndName +from apiver_deps import FileRetentionSetting +from apiver_deps import InMemoryAccountInfo +from apiver_deps import LegalHold +from apiver_deps import RawSimulator +from apiver_deps import RetentionMode +from apiver_deps import NO_RETENTION_FILE_SETTING +from apiver_deps_exception import RestrictedBucket +if apiver_deps.V <= 1: + from apiver_deps import FileVersionInfo as VFileVersion +else: + from apiver_deps import FileVersion as VFileVersion -class TestApi(TestBase): + +class TestApi: + @pytest.fixture(autouse=True) def setUp(self): self.account_info = InMemoryAccountInfo() self.cache = DummyCache() @@ -32,26 +40,59 @@ def setUp(self): self.api = B2Api(self.account_info, self.cache, self.raw_api) (self.application_key_id, self.master_key) = self.raw_api.create_account() - def test_list_buckets(self): + @pytest.mark.parametrize( + 'expected_delete_bucket_output', + [ + pytest.param(None, marks=pytest.mark.apiver(from_ver=1)), + pytest.param( + { + 'accountId': 'account-0', + 'bucketName': 'bucket2', + 'bucketId': 'bucket_1', + 'bucketType': 'allPrivate', + 'bucketInfo': {}, + 'corsRules': [], + 'lifecycleRules': [], + 'options': set(), + 'revision': 1, + 'defaultServerSideEncryption': + { + 'isClientAuthorizedToRead': True, + 'value': { + 'mode': 'none' + }, + }, + 'fileLockConfiguration': + { + 'isClientAuthorizedToRead': True, + 'value': + { + 'defaultRetention': { + 'mode': None, + 'period': None + }, + 'isFileLockEnabled': None + } + }, + }, + marks=pytest.mark.apiver(to_ver=0) + ), + ], + ) + def test_list_buckets(self, expected_delete_bucket_output): self._authorize_account() self.api.create_bucket('bucket1', 'allPrivate') bucket2 = self.api.create_bucket('bucket2', 'allPrivate') delete_output = self.api.delete_bucket(bucket2) - assert delete_output is None, delete_output + assert delete_output == expected_delete_bucket_output self.api.create_bucket('bucket3', 'allPrivate') - self.assertEqual( - ['bucket1', 'bucket3'], - [b.name for b in self.api.list_buckets()], - ) + assert [b.name for b in self.api.list_buckets()] == ['bucket1', 'bucket3'] def test_list_buckets_with_name(self): self._authorize_account() self.api.create_bucket('bucket1', 'allPrivate') self.api.create_bucket('bucket2', 'allPrivate') - self.assertEqual( - ['bucket1'], - [b.name for b in self.api.list_buckets(bucket_name='bucket1')], - ) + assert [b.name for b in self.api.list_buckets(bucket_name='bucket1')] == ['bucket1'] def test_buckets_with_encryption(self): self._authorize_account() @@ -94,15 +135,9 @@ def test_buckets_with_encryption(self): for b in self.api.list_buckets() # scan again with new key } - self.assertEqual( - buckets['bucket1'].default_server_side_encryption, - unknown_encryption, - ) + assert buckets['bucket1'].default_server_side_encryption == unknown_encryption - self.assertEqual( - buckets['bucket2'].default_server_side_encryption, - unknown_encryption, - ) + assert buckets['bucket2'].default_server_side_encryption == unknown_encryption def _check_if_bucket_is_encrypted(self, bucket_name, should_be_encrypted): buckets = {b.name: b for b in self.api.list_buckets()} @@ -116,32 +151,17 @@ def _verify_if_bucket_is_encrypted(self, bucket, should_be_encrypted): ) no_encryption = EncryptionSetting(mode=EncryptionMode.NONE,) if not should_be_encrypted: - self.assertEqual( - bucket.default_server_side_encryption, - no_encryption, - ) + assert bucket.default_server_side_encryption == no_encryption else: - self.assertEqual( - bucket.default_server_side_encryption, - sse_b2_aes, - ) - self.assertEqual( - bucket.default_server_side_encryption.mode, - EncryptionMode.SSE_B2, - ) - self.assertEqual( - bucket.default_server_side_encryption.algorithm, - EncryptionAlgorithm.AES256, - ) + assert bucket.default_server_side_encryption == sse_b2_aes + assert bucket.default_server_side_encryption.mode == EncryptionMode.SSE_B2 + assert bucket.default_server_side_encryption.algorithm == EncryptionAlgorithm.AES256 def test_list_buckets_with_id(self): self._authorize_account() bucket = self.api.create_bucket('bucket1', 'allPrivate') self.api.create_bucket('bucket2', 'allPrivate') - self.assertEqual( - ['bucket1'], - [b.name for b in self.api.list_buckets(bucket_id=bucket.id_)], - ) + assert [b.name for b in self.api.list_buckets(bucket_id=bucket.id_)] == ['bucket1'] def test_reauthorize_with_app_key(self): # authorize and create a key @@ -163,20 +183,14 @@ def test_list_buckets_with_restriction(self): self.api.create_bucket('bucket2', 'allPrivate') key = self.api.create_key(['listBuckets'], 'key1', bucket_id=bucket1.id_) self.api.authorize_account('production', key['applicationKeyId'], key['applicationKey']) - self.assertEqual( - ['bucket1'], - [b.name for b in self.api.list_buckets(bucket_name=bucket1.name)], - ) + assert [b.name for b in self.api.list_buckets(bucket_name=bucket1.name)] == ['bucket1'] def test_get_bucket_by_name_with_bucket_restriction(self): self._authorize_account() bucket1 = self.api.create_bucket('bucket1', 'allPrivate') key = self.api.create_key(['listBuckets'], 'key1', bucket_id=bucket1.id_) self.api.authorize_account('production', key['applicationKeyId'], key['applicationKey']) - self.assertEqual( - bucket1.id_, - self.api.get_bucket_by_name('bucket1').id_, - ) + assert self.api.get_bucket_by_name('bucket1').id_ == bucket1.id_ def test_list_buckets_with_restriction_and_wrong_name(self): self._authorize_account() @@ -184,10 +198,9 @@ def test_list_buckets_with_restriction_and_wrong_name(self): bucket2 = self.api.create_bucket('bucket2', 'allPrivate') key = self.api.create_key(['listBuckets'], 'key1', bucket_id=bucket1.id_) self.api.authorize_account('production', key['applicationKeyId'], key['applicationKey']) - with self.assertRaises( - RestrictedBucket, 'Application key is restricted to bucket: bucket1' - ): + with pytest.raises(RestrictedBucket) as excinfo: self.api.list_buckets(bucket_name=bucket2.name) + assert str(excinfo.value) == 'Application key is restricted to bucket: bucket1' def test_list_buckets_with_restriction_and_no_name(self): self._authorize_account() @@ -195,10 +208,9 @@ def test_list_buckets_with_restriction_and_no_name(self): self.api.create_bucket('bucket2', 'allPrivate') key = self.api.create_key(['listBuckets'], 'key1', bucket_id=bucket1.id_) self.api.authorize_account('production', key['applicationKeyId'], key['applicationKey']) - with self.assertRaises( - RestrictedBucket, 'Application key is restricted to bucket: bucket1' - ): + with pytest.raises(RestrictedBucket) as excinfo: self.api.list_buckets() + assert str(excinfo.value) == 'Application key is restricted to bucket: bucket1' def test_list_buckets_with_restriction_and_wrong_id(self): self._authorize_account() @@ -206,10 +218,9 @@ def test_list_buckets_with_restriction_and_wrong_id(self): self.api.create_bucket('bucket2', 'allPrivate') key = self.api.create_key(['listBuckets'], 'key1', bucket_id=bucket1.id_) self.api.authorize_account('production', key['applicationKeyId'], key['applicationKey']) - with self.assertRaises( - RestrictedBucket, 'Application key is restricted to bucket: %s' % (bucket1.id_,) - ): + with pytest.raises(RestrictedBucket) as excinfo: self.api.list_buckets(bucket_id='not the one bound to the key') + assert str(excinfo.value) == 'Application key is restricted to bucket: %s' % (bucket1.id_,) def _authorize_account(self): self.api.authorize_account('production', self.application_key_id, self.master_key) @@ -218,18 +229,47 @@ def test_update_file_retention(self): self._authorize_account() bucket = self.api.create_bucket('bucket1', 'allPrivate', is_file_lock_enabled=True) created_file = bucket.upload_bytes(b'hello world', 'file') - self.assertEqual(created_file.file_retention, NO_RETENTION_FILE_SETTING) + assert created_file.file_retention == NO_RETENTION_FILE_SETTING new_retention = FileRetentionSetting(RetentionMode.COMPLIANCE, 100) self.api.update_file_retention(created_file.id_, created_file.file_name, new_retention) file_version = bucket.get_file_info_by_id(created_file.id_) - self.assertEqual(new_retention, file_version.file_retention) + assert new_retention == file_version.file_retention def test_update_legal_hold(self): self._authorize_account() bucket = self.api.create_bucket('bucket1', 'allPrivate', is_file_lock_enabled=True) created_file = bucket.upload_bytes(b'hello world', 'file') - self.assertEqual(created_file.legal_hold, LegalHold.UNSET) + assert created_file.legal_hold == LegalHold.UNSET new_legal_hold = LegalHold.ON self.api.update_file_legal_hold(created_file.id_, created_file.file_name, new_legal_hold) file_version = bucket.get_file_info_by_id(created_file.id_) - self.assertEqual(new_legal_hold, file_version.legal_hold) + assert new_legal_hold == file_version.legal_hold + + @pytest.mark.apiver(from_ver=2) + def test_cancel_large_file_v2(self): + self._authorize_account() + bucket = self.api.create_bucket('bucket1', 'allPrivate') + unfinished_large_file = self.api.services.large_file.start_large_file( + bucket.id_, 'a_large_file' + ) + cancel_result = self.api.cancel_large_file(unfinished_large_file.file_id) + assert cancel_result == FileIdAndName('9999', 'a_large_file') + + @pytest.mark.apiver(to_ver=1) + def test_cancel_large_file_v1(self): + self._authorize_account() + bucket = self.api.create_bucket('bucket1', 'allPrivate') + unfinished_large_file = self.api.services.large_file.start_large_file( + bucket.id_, 'a_large_file' + ) + cancel_result = self.api.cancel_large_file(unfinished_large_file.file_id) + assert cancel_result == VFileVersion( + id_='9999', + file_name='a_large_file', + size=0, + content_type='unknown', + content_sha1='none', + file_info={}, + upload_timestamp=0, + action='cancel', + ) diff --git a/test/unit/bucket/__init__.py b/test/unit/bucket/__init__.py new file mode 100644 index 000000000..9d7c06249 --- /dev/null +++ b/test/unit/bucket/__init__.py @@ -0,0 +1,9 @@ +###################################################################### +# +# File: test/unit/bucket/__init__.py +# +# Copyright 2021 Backblaze Inc. All Rights Reserved. +# +# License https://www.backblaze.com/using_b2_code.html +# +###################################################################### diff --git a/test/unit/v1/test_bucket.py b/test/unit/bucket/test_bucket.py similarity index 95% rename from test/unit/v1/test_bucket.py rename to test/unit/bucket/test_bucket.py index e127a11c5..d91b83750 100644 --- a/test/unit/v1/test_bucket.py +++ b/test/unit/bucket/test_bucket.py @@ -1,6 +1,6 @@ ###################################################################### # -# File: test/unit/v1/test_bucket.py +# File: test/unit/bucket/test_bucket.py # # Copyright 2019 Backblaze Inc. All Rights Reserved. # @@ -17,7 +17,7 @@ from ..test_base import TestBase -from .deps_exception import ( +from apiver_deps_exception import ( AlreadyFailed, B2Error, B2RequestTimeoutDuringUpload, @@ -30,21 +30,27 @@ FileSha1Mismatch, SSECKeyError, ) -from .deps import B2Api -from .deps import LargeFileUploadState -from .deps import DownloadDestBytes, PreSeekedDownloadDest -from .deps import FileVersionInfo -from .deps import MetadataDirectiveMode -from .deps import Part -from .deps import AbstractProgressListener -from .deps import StubAccountInfo, RawSimulator, BucketSimulator, FakeResponse, FileSimulator -from .deps import ParallelDownloader -from .deps import SimpleDownloader -from .deps import UploadSourceBytes -from .deps import hex_sha1_of_bytes, TempDir -from .deps import EncryptionAlgorithm, EncryptionSetting, EncryptionMode, EncryptionKey, SSE_NONE, SSE_B2_AES -from .deps import CopySource, UploadSourceLocalFile, WriteIntent -from .deps import FileRetentionSetting, LegalHold, RetentionMode, NO_RETENTION_FILE_SETTING +from apiver_deps import B2Api +from apiver_deps import LargeFileUploadState +from apiver_deps import DownloadDestBytes, PreSeekedDownloadDest +from apiver_deps import MetadataDirectiveMode +from apiver_deps import Part +from apiver_deps import AbstractProgressListener +from apiver_deps import StubAccountInfo, RawSimulator, BucketSimulator, FakeResponse, FileSimulator +from apiver_deps import ParallelDownloader +from apiver_deps import SimpleDownloader +from apiver_deps import UploadSourceBytes +from apiver_deps import hex_sha1_of_bytes, TempDir +from apiver_deps import EncryptionAlgorithm, EncryptionSetting, EncryptionMode, EncryptionKey, SSE_NONE, SSE_B2_AES +from apiver_deps import CopySource, UploadSourceLocalFile, WriteIntent +from apiver_deps import FileRetentionSetting, LegalHold, RetentionMode, NO_RETENTION_FILE_SETTING +import apiver_deps +if apiver_deps.V <= 1: + from apiver_deps import FileVersionInfo as VFileVersionInfo +else: + from apiver_deps import FileVersion as VFileVersionInfo + +pytestmark = [pytest.mark.apiver(from_ver=1)] SSE_C_AES = EncryptionSetting( mode=EncryptionMode.SSE_C, @@ -211,10 +217,12 @@ def __call__(self, *args, **kwargs): class TestListParts(TestCaseWithBucket): + @pytest.mark.apiver(to_ver=1) def testEmpty(self): file1 = self.bucket.start_large_file('file1.txt', 'text/plain', {}) self.assertEqual([], list(self.bucket.list_parts(file1.file_id, batch_size=1))) + @pytest.mark.apiver(to_ver=1) def testThree(self): file1 = self.bucket.start_large_file('file1.txt', 'text/plain', {}) content = b'hello world' @@ -239,6 +247,7 @@ def testThree(self): class TestUploadPart(TestCaseWithBucket): + @pytest.mark.apiver(to_ver=1) def test_error_in_state(self): file1 = self.bucket.start_large_file('file1.txt', 'text/plain', {}) content = b'hello world' @@ -259,10 +268,12 @@ class TestListUnfinished(TestCaseWithBucket): def test_empty(self): self.assertEqual([], list(self.bucket.list_unfinished_large_files())) + @pytest.mark.apiver(to_ver=1) def test_one(self): file1 = self.bucket.start_large_file('file1.txt', 'text/plain', {}) self.assertEqual([file1], list(self.bucket.list_unfinished_large_files())) + @pytest.mark.apiver(to_ver=1) def test_three(self): file1 = self.bucket.start_large_file('file1.txt', 'text/plain', {}) file2 = self.bucket.start_large_file('file2.txt', 'text/plain', {}) @@ -271,6 +282,7 @@ def test_three(self): [file1, file2, file3], list(self.bucket.list_unfinished_large_files(batch_size=1)) ) + @pytest.mark.apiver(to_ver=1) def test_prefix(self): self.bucket.start_large_file('fileA', 'text/plain', {}) file2 = self.bucket.start_large_file('fileAB', 'text/plain', {}) @@ -294,7 +306,7 @@ def test_version_by_name(self): info = self.bucket.get_file_info_by_name('a') - self.assertIsInstance(info, FileVersionInfo) + self.assertIsInstance(info, VFileVersionInfo) expected = ( a_id, 'a', 11, None, 'b2/x-auto', 'none', NO_RETENTION_FILE_SETTING, LegalHold.UNSET ) @@ -353,7 +365,7 @@ def test_version_by_id(self): info = self.bucket.get_file_info_by_id(b_id) - self.assertIsInstance(info, FileVersionInfo) + self.assertIsInstance(info, VFileVersionInfo) expected = (b_id, 'b', 11, 'upload', 'b2/x-auto', 'none') actual = ( info.id_, info.file_name, info.size, info.action, info.content_type, @@ -418,6 +430,7 @@ def test_three_files_multiple_versions(self): ] self.assertEqual(expected, actual) + @pytest.mark.apiver(to_ver=1) def test_started_large_file(self): self.bucket.start_large_file('hello.txt') expected = [('hello.txt', 0, 'start', None)] @@ -548,18 +561,21 @@ def test_encryption(self): class TestCopyFile(TestCaseWithBucket): + @pytest.mark.apiver(to_ver=1) def test_copy_without_optional_params(self): file_id = self._make_file() self.bucket.copy_file(file_id, 'hello_new.txt') expected = [('hello.txt', 11, 'upload', None), ('hello_new.txt', 11, 'copy', None)] self.assertBucketContents(expected, '', show_versions=True) + @pytest.mark.apiver(to_ver=1) def test_copy_with_range(self): file_id = self._make_file() self.bucket.copy_file(file_id, 'hello_new.txt', bytes_range=(3, 9)) expected = [('hello.txt', 11, 'upload', None), ('hello_new.txt', 6, 'copy', None)] self.assertBucketContents(expected, '', show_versions=True) + @pytest.mark.apiver(to_ver=1) def test_copy_with_invalid_metadata(self): file_id = self._make_file() try: @@ -578,6 +594,7 @@ def test_copy_with_invalid_metadata(self): expected = [('hello.txt', 11, 'upload', None)] self.assertBucketContents(expected, '', show_versions=True) + @pytest.mark.apiver(to_ver=1) def test_copy_with_invalid_metadata_replace(self): file_id = self._make_file() try: @@ -595,6 +612,7 @@ def test_copy_with_invalid_metadata_replace(self): expected = [('hello.txt', 11, 'upload', None)] self.assertBucketContents(expected, '', show_versions=True) + @pytest.mark.apiver(to_ver=1) def test_copy_with_replace_metadata(self): file_id = self._make_file() self.bucket.copy_file( @@ -613,6 +631,7 @@ def test_copy_with_replace_metadata(self): ] self.assertEqual(expected, actual) + @pytest.mark.apiver(to_ver=1) def test_copy_with_unsatisfied_range(self): file_id = self._make_file() try: @@ -630,6 +649,7 @@ def test_copy_with_unsatisfied_range(self): expected = [('hello.txt', 11, 'upload', None)] self.assertBucketContents(expected, '', show_versions=True) + @pytest.mark.apiver(to_ver=1) def test_copy_with_different_bucket(self): source_bucket = self.api.create_bucket('source-bucket', 'allPublic') file_id = self._make_file(source_bucket) @@ -783,7 +803,7 @@ def test_copy_encryption(self): ]: with self.subTest(kwargs=kwargs, length=length, data=data): file_info = self.bucket.copy(**kwargs, new_file_name='new_file', length=length) - self.assertTrue(isinstance(file_info, FileVersionInfo)) + self.assertTrue(isinstance(file_info, VFileVersionInfo)) self.assertEqual(file_info.server_side_encryption, expected_encryption) def _make_file(self, bucket=None): @@ -796,7 +816,7 @@ class TestUpload(TestCaseWithBucket): def test_upload_bytes(self): data = b'hello world' file_info = self.bucket.upload_bytes(data, 'file1') - self.assertTrue(isinstance(file_info, FileVersionInfo)) + self.assertTrue(isinstance(file_info, VFileVersionInfo)) self._check_file_contents('file1', data) self.assertEqual(file_info.server_side_encryption, SSE_NONE) @@ -813,13 +833,13 @@ def test_upload_bytes_file_retention(self): def test_upload_bytes_sse_b2(self): data = b'hello world' file_info = self.bucket.upload_bytes(data, 'file1', encryption=SSE_B2_AES) - self.assertTrue(isinstance(file_info, FileVersionInfo)) + self.assertTrue(isinstance(file_info, VFileVersionInfo)) self.assertEqual(file_info.server_side_encryption, SSE_B2_AES) def test_upload_bytes_sse_c(self): data = b'hello world' file_info = self.bucket.upload_bytes(data, 'file1', encryption=SSE_C_AES) - self.assertTrue(isinstance(file_info, FileVersionInfo)) + self.assertTrue(isinstance(file_info, VFileVersionInfo)) self.assertEqual(SSE_C_AES_NO_SECRET, file_info.server_side_encryption) def test_upload_local_file_sse_b2(self): @@ -828,7 +848,7 @@ def test_upload_local_file_sse_b2(self): data = b'hello world' write_file(path, data) file_info = self.bucket.upload_local_file(path, 'file1', encryption=SSE_B2_AES) - self.assertTrue(isinstance(file_info, FileVersionInfo)) + self.assertTrue(isinstance(file_info, VFileVersionInfo)) self.assertEqual(file_info.server_side_encryption, SSE_B2_AES) self._check_file_contents('file1', data) @@ -838,7 +858,7 @@ def test_upload_local_file_sse_c(self): data = b'hello world' write_file(path, data) file_info = self.bucket.upload_local_file(path, 'file1', encryption=SSE_C_AES) - self.assertTrue(isinstance(file_info, FileVersionInfo)) + self.assertTrue(isinstance(file_info, VFileVersionInfo)) self.assertEqual(SSE_C_AES_NO_SECRET, file_info.server_side_encryption) self._check_file_contents('file1', data) @@ -872,7 +892,7 @@ def test_upload_local_file(self): write_file(path, data) file_info = self.bucket.upload_local_file(path, 'file1') self._check_file_contents('file1', data) - self.assertTrue(isinstance(file_info, FileVersionInfo)) + self.assertTrue(isinstance(file_info, VFileVersionInfo)) self.assertEqual(file_info.server_side_encryption, SSE_NONE) print(file_info.as_dict()) self.assertEqual(file_info.as_dict()['serverSideEncryption'], {'mode': 'none'}) @@ -1081,7 +1101,7 @@ def test_create_remote(self): ], file_name='created_file' ) - self.assertIsInstance(created_file, FileVersionInfo) + self.assertIsInstance(created_file, VFileVersionInfo) actual = ( created_file.id_, created_file.file_name, created_file.size, created_file.server_side_encryption @@ -1106,7 +1126,7 @@ def test_create_remote_encryption(self): file_name='created_file_%s' % (len(data),), encryption=SSE_C_AES ) - self.assertIsInstance(created_file, FileVersionInfo) + self.assertIsInstance(created_file, VFileVersionInfo) actual = ( created_file.id_, created_file.file_name, created_file.size, created_file.server_side_encryption diff --git a/test/unit/conftest.py b/test/unit/conftest.py index 2510007c8..fe94258ba 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -108,6 +108,22 @@ def test_illegal_regex(self, exc_class, exc_msg): with pytest.raises(exc_class, match=exc_msg): error_raising_function() + If a test is merked with "apiver" multiple times, it will be skipped if at least one of the apiver conditions + cause it to be skipped. E.g. if a test module is marked with + + .. code-block:: python + + pytestmark = [pytest.mark.apiver(from_ver=1)] + + and a test function is marked with + + .. code-block:: python + + @pytest.mark.apiver(to_ver=1) + def test_function(self): + ... + + the test function will be run only for apiver=v1 """ for mark in item.iter_markers(name='apiver'): if mark.args and mark.kwargs: diff --git a/test/unit/sync/fixtures.py b/test/unit/sync/fixtures.py index 2fbbfdf30..c140d172e 100644 --- a/test/unit/sync/fixtures.py +++ b/test/unit/sync/fixtures.py @@ -41,6 +41,14 @@ def _file_versions(self, name, mod_times, size=10): times are hides. It's a hack, but it works. """ + if apiver_deps.V <= 1: + mandatory_kwargs = {} + else: + mandatory_kwargs = { + 'api': None, + 'content_md5': 'content_md5', + 'server_side_encryption': None, + } return [ VFileVersion( id_='id_%s_%d' % (name[0], abs(mod_time)), @@ -51,6 +59,7 @@ def _file_versions(self, name, mod_times, size=10): file_info={'in_b2': 'yes'}, content_type='text/plain', content_sha1='content_sha1', + **mandatory_kwargs, ) for mod_time in mod_times ] # yapf disable diff --git a/test/unit/v0/test_api.py b/test/unit/v0/test_api.py deleted file mode 100644 index cd1d3c03a..000000000 --- a/test/unit/v0/test_api.py +++ /dev/null @@ -1,178 +0,0 @@ -###################################################################### -# -# File: test/unit/v0/test_api.py -# -# Copyright 2019 Backblaze Inc. All Rights Reserved. -# -# License https://www.backblaze.com/using_b2_code.html -# -###################################################################### - -from ..test_base import TestBase - -from .deps import B2Api -from .deps import DummyCache -from .deps import InMemoryAccountInfo -from .deps import LegalHold, FileRetentionSetting, RetentionMode, NO_RETENTION_FILE_SETTING -from .deps import RawSimulator -from .deps_exception import RestrictedBucket - - -class TestApi(TestBase): - def setUp(self): - self.account_info = InMemoryAccountInfo() - self.cache = DummyCache() - self.raw_api = RawSimulator() - self.api = B2Api(self.account_info, self.cache, self.raw_api) - (self.application_key_id, self.master_key) = self.raw_api.create_account() - - def test_list_buckets(self): - self._authorize_account() - self.api.create_bucket('bucket1', 'allPrivate') - bucket2 = self.api.create_bucket('bucket2', 'allPrivate') - delete_output = self.api.delete_bucket(bucket2) - expected = { - 'accountId': 'account-0', - 'bucketName': 'bucket2', - 'bucketId': 'bucket_1', - 'bucketType': 'allPrivate', - 'bucketInfo': {}, - 'corsRules': [], - 'lifecycleRules': [], - 'options': set(), - 'revision': 1, - 'defaultServerSideEncryption': - { - 'isClientAuthorizedToRead': True, - 'value': { - 'mode': 'none' - }, - }, - 'fileLockConfiguration': - { - 'isClientAuthorizedToRead': True, - 'value': - { - 'defaultRetention': { - 'mode': None, - 'period': None - }, - 'isFileLockEnabled': None - } - }, - } # yapf: off - assert delete_output == expected, delete_output - self.api.create_bucket('bucket3', 'allPrivate') - self.assertEqual( - ['bucket1', 'bucket3'], - [b.name for b in self.api.list_buckets()], - ) - - def test_list_buckets_with_name(self): - self._authorize_account() - self.api.create_bucket('bucket1', 'allPrivate') - self.api.create_bucket('bucket2', 'allPrivate') - self.assertEqual( - ['bucket1'], - [b.name for b in self.api.list_buckets(bucket_name='bucket1')], - ) - - def test_list_buckets_with_id(self): - self._authorize_account() - bucket = self.api.create_bucket('bucket1', 'allPrivate') - self.api.create_bucket('bucket2', 'allPrivate') - self.assertEqual( - ['bucket1'], - [b.name for b in self.api.list_buckets(bucket_id=bucket.id_)], - ) - - def test_reauthorize_with_app_key(self): - # authorize and create a key - self._authorize_account() - key = self.api.create_key(['listBuckets'], 'key1') - - # authorize with the key - self.api.authorize_account('production', key['applicationKeyId'], key['applicationKey']) - - # expire the auth token we just got - self.raw_api.expire_auth_token(self.account_info.get_account_auth_token()) - - # listing buckets should work, after it re-authorizes - self.api.list_buckets() - - def test_list_buckets_with_restriction(self): - self._authorize_account() - bucket1 = self.api.create_bucket('bucket1', 'allPrivate') - self.api.create_bucket('bucket2', 'allPrivate') - key = self.api.create_key(['listBuckets'], 'key1', bucket_id=bucket1.id_) - self.api.authorize_account('production', key['applicationKeyId'], key['applicationKey']) - self.assertEqual( - ['bucket1'], - [b.name for b in self.api.list_buckets(bucket_name=bucket1.name)], - ) - - def test_get_bucket_by_name_with_bucket_restriction(self): - self._authorize_account() - bucket1 = self.api.create_bucket('bucket1', 'allPrivate') - key = self.api.create_key(['listBuckets'], 'key1', bucket_id=bucket1.id_) - self.api.authorize_account('production', key['applicationKeyId'], key['applicationKey']) - self.assertEqual( - bucket1.id_, - self.api.get_bucket_by_name('bucket1').id_, - ) - - def test_list_buckets_with_restriction_and_wrong_name(self): - self._authorize_account() - bucket1 = self.api.create_bucket('bucket1', 'allPrivate') - bucket2 = self.api.create_bucket('bucket2', 'allPrivate') - key = self.api.create_key(['listBuckets'], 'key1', bucket_id=bucket1.id_) - self.api.authorize_account('production', key['applicationKeyId'], key['applicationKey']) - with self.assertRaises( - RestrictedBucket, 'Application key is restricted to bucket: bucket1' - ): - self.api.list_buckets(bucket_name=bucket2.name) - - def test_list_buckets_with_restriction_and_no_name(self): - self._authorize_account() - bucket1 = self.api.create_bucket('bucket1', 'allPrivate') - self.api.create_bucket('bucket2', 'allPrivate') - key = self.api.create_key(['listBuckets'], 'key1', bucket_id=bucket1.id_) - self.api.authorize_account('production', key['applicationKeyId'], key['applicationKey']) - with self.assertRaises( - RestrictedBucket, 'Application key is restricted to bucket: bucket1' - ): - self.api.list_buckets() - - def test_list_buckets_with_restriction_and_wrong_id(self): - self._authorize_account() - bucket1 = self.api.create_bucket('bucket1', 'allPrivate') - self.api.create_bucket('bucket2', 'allPrivate') - key = self.api.create_key(['listBuckets'], 'key1', bucket_id=bucket1.id_) - self.api.authorize_account('production', key['applicationKeyId'], key['applicationKey']) - with self.assertRaises( - RestrictedBucket, 'Application key is restricted to bucket: %s' % (bucket1.id_,) - ): - self.api.list_buckets(bucket_id='not the one bound to the key') - - def _authorize_account(self): - self.api.authorize_account('production', self.application_key_id, self.master_key) - - def test_update_file_retention(self): - self._authorize_account() - bucket = self.api.create_bucket('bucket1', 'allPrivate', is_file_lock_enabled=True) - created_file = bucket.upload_bytes(b'hello world', 'file') - self.assertEqual(created_file.file_retention, NO_RETENTION_FILE_SETTING) - new_retention = FileRetentionSetting(RetentionMode.COMPLIANCE, 100) - self.api.update_file_retention(created_file.id_, created_file.file_name, new_retention) - file_version = bucket.get_file_info_by_id(created_file.id_) - self.assertEqual(new_retention, file_version.file_retention) - - def test_update_legal_hold(self): - self._authorize_account() - bucket = self.api.create_bucket('bucket1', 'allPrivate', is_file_lock_enabled=True) - created_file = bucket.upload_bytes(b'hello world', 'file') - self.assertEqual(created_file.legal_hold, LegalHold.UNSET) - new_legal_hold = LegalHold.ON - self.api.update_file_legal_hold(created_file.id_, created_file.file_name, new_legal_hold) - file_version = bucket.get_file_info_by_id(created_file.id_) - self.assertEqual(new_legal_hold, file_version.legal_hold)