From 750935cb981bfbd6210b099b727247dd81860d31 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Tue, 11 Feb 2025 13:24:52 -0600 Subject: [PATCH] file-based: update interfaces to obtain schemas for file_permissions and identities --- .../config/validate_config_transfer_modes.py | 16 +++++++++++----- .../file_based/file_based_stream_reader.py | 11 ++++++----- .../stream/default_file_based_stream.py | 4 ++-- .../file_based/stream/identities_stream.py | 5 ----- .../sources/file_based/in_memory_files_source.py | 4 ++++ .../file_based/test_file_based_stream_reader.py | 4 ++++ 6 files changed, 27 insertions(+), 17 deletions(-) diff --git a/airbyte_cdk/sources/file_based/config/validate_config_transfer_modes.py b/airbyte_cdk/sources/file_based/config/validate_config_transfer_modes.py index f14c36899..b3198fb39 100644 --- a/airbyte_cdk/sources/file_based/config/validate_config_transfer_modes.py +++ b/airbyte_cdk/sources/file_based/config/validate_config_transfer_modes.py @@ -4,10 +4,15 @@ from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec +DELIVERY_TYPE_KEY = "delivery_type" +DELIVERY_TYPE_PERMISSION_TRANSFER_MODE_VALUE = "use_permissions_transfer" +PRESERVE_DIRECTORY_STRUCTURE_KEY = "preserve_directory_structure" +INCLUDE_IDENTITIES_STREAM_KEY = "include_identities_stream" + def use_file_transfer(parsed_config: AbstractFileBasedSpec) -> bool: return ( - hasattr(parsed_config.delivery_method, "delivery_type") + hasattr(parsed_config.delivery_method, DELIVERY_TYPE_KEY) and parsed_config.delivery_method.delivery_type == "use_file_transfer" ) @@ -27,7 +32,7 @@ def preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool: """ if ( use_file_transfer(parsed_config) - and hasattr(parsed_config.delivery_method, "preserve_directory_structure") + and hasattr(parsed_config.delivery_method, PRESERVE_DIRECTORY_STRUCTURE_KEY) and parsed_config.delivery_method.preserve_directory_structure is not None ): return parsed_config.delivery_method.preserve_directory_structure @@ -36,15 +41,16 @@ def preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool: def use_permissions_transfer(parsed_config: AbstractFileBasedSpec) -> bool: return ( - hasattr(parsed_config.delivery_method, "delivery_type") - and parsed_config.delivery_method.delivery_type == "use_permissions_transfer" + hasattr(parsed_config.delivery_method, DELIVERY_TYPE_KEY) + and parsed_config.delivery_method.delivery_type + == DELIVERY_TYPE_PERMISSION_TRANSFER_MODE_VALUE ) def include_identities_stream(parsed_config: AbstractFileBasedSpec) -> bool: if ( use_permissions_transfer(parsed_config) - and hasattr(parsed_config.delivery_method, "include_identities_stream") + and hasattr(parsed_config.delivery_method, INCLUDE_IDENTITIES_STREAM_KEY) and parsed_config.delivery_method.include_identities_stream is not None ): return parsed_config.delivery_method.include_identities_stream diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index f3e6cacf6..4b53a6068 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -19,7 +19,6 @@ use_file_transfer, ) from airbyte_cdk.sources.file_based.remote_file import RemoteFile -from airbyte_cdk.sources.file_based.schema_helpers import schemaless_schema class FileReadMode(Enum): @@ -29,8 +28,6 @@ class FileReadMode(Enum): class AbstractFileBasedStreamReader(ABC): DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" - REMOTE_FILE_PERMISSIONS_SCHEMA = schemaless_schema - REMOTE_FILE_IDENTITY_SCHEMA = schemaless_schema def __init__(self) -> None: self._config = None @@ -195,7 +192,7 @@ def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> ACL Permissions from files. """ raise NotImplementedError( - f"{self.__class__.__name__} required to support get_file_acl_permissions(), please update REMOTE_FILE_PERMISSIONS_SCHEMA accordingly" + f"{self.__class__.__name__} required to support get_file_acl_permissions(), please update file_permissions_schema accordingly to obtain the required schema for each stream on the source implementation" ) @abstractmethod @@ -205,5 +202,9 @@ def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any identities. """ raise NotImplementedError( - f"{self.__class__.__name__} required to support load_identity_groups(), please update REMOTE_FILE_IDENTITY_SCHEMA accordingly" + f"{self.__class__.__name__} required to support load_identity_groups(), please add schema for your identities stream in schemas folder" ) + + @property + @abstractmethod + def file_permissions_schema(self) -> Dict[str, Any]: ... diff --git a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index 2efdf8eab..9bb91b2ca 100644 --- a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -110,7 +110,7 @@ def _filter_schema_invalid_properties( }, } elif self.use_permissions_transfer: - return self.stream_reader.REMOTE_FILE_PERMISSIONS_SCHEMA + return self.stream_reader.file_permissions_schema else: return super()._filter_schema_invalid_properties(configured_catalog_json_schema) @@ -314,7 +314,7 @@ def _get_raw_json_schema(self) -> JsonSchema: if self.use_file_transfer: return file_transfer_schema elif self.use_permissions_transfer: - return self.stream_reader.REMOTE_FILE_PERMISSIONS_SCHEMA + return self.stream_reader.file_permissions_schema elif self.config.input_schema: return self.config.get_input_schema() # type: ignore elif self.config.schemaless: diff --git a/airbyte_cdk/sources/file_based/stream/identities_stream.py b/airbyte_cdk/sources/file_based/stream/identities_stream.py index 0dc7cd3c6..4215367a3 100644 --- a/airbyte_cdk/sources/file_based/stream/identities_stream.py +++ b/airbyte_cdk/sources/file_based/stream/identities_stream.py @@ -9,7 +9,6 @@ from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy from airbyte_cdk.sources.file_based.exceptions import FileBasedErrorsCollector from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader -from airbyte_cdk.sources.streams.core import JsonSchema from airbyte_cdk.sources.streams.permissions.identities import Identities @@ -41,7 +40,3 @@ def primary_key(self) -> PrimaryKeyType: def load_identity_groups(self) -> Iterable[Dict[str, Any]]: return self.stream_reader.load_identity_groups(logger=self.logger) - - @cache - def get_json_schema(self) -> JsonSchema: - return self.stream_reader.REMOTE_FILE_IDENTITY_SCHEMA diff --git a/unit_tests/sources/file_based/in_memory_files_source.py b/unit_tests/sources/file_based/in_memory_files_source.py index 8f4a3d2ca..244d58c88 100644 --- a/unit_tests/sources/file_based/in_memory_files_source.py +++ b/unit_tests/sources/file_based/in_memory_files_source.py @@ -151,6 +151,10 @@ def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]: return [{}] + @property + def file_permissions_schema(self) -> Dict[str, Any]: + return {"type": "object", "properties": {}} + def open_file( self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger ) -> IOBase: diff --git a/unit_tests/sources/file_based/test_file_based_stream_reader.py b/unit_tests/sources/file_based/test_file_based_stream_reader.py index 6830a31d6..d27055384 100644 --- a/unit_tests/sources/file_based/test_file_based_stream_reader.py +++ b/unit_tests/sources/file_based/test_file_based_stream_reader.py @@ -90,6 +90,10 @@ def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]: return [{}] + @property + def file_permissions_schema(self) -> Dict[str, Any]: + return {"type": "object", "properties": {}} + class TestSpec(AbstractFileBasedSpec): @classmethod