Skip to content

Commit

Permalink
file-based: update interfaces to obtain schemas for file_permissions …
Browse files Browse the repository at this point in the history
…and identities
  • Loading branch information
aldogonzalez8 committed Feb 11, 2025
1 parent ad4c49c commit 750935c
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@

from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec

DELIVERY_TYPE_KEY = "delivery_type"
DELIVERY_TYPE_PERMISSION_TRANSFER_MODE_VALUE = "use_permissions_transfer"
PRESERVE_DIRECTORY_STRUCTURE_KEY = "preserve_directory_structure"
INCLUDE_IDENTITIES_STREAM_KEY = "include_identities_stream"


def use_file_transfer(parsed_config: AbstractFileBasedSpec) -> bool:
return (
hasattr(parsed_config.delivery_method, "delivery_type")
hasattr(parsed_config.delivery_method, DELIVERY_TYPE_KEY)
and parsed_config.delivery_method.delivery_type == "use_file_transfer"
)

Expand All @@ -27,7 +32,7 @@ def preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool:
"""
if (
use_file_transfer(parsed_config)
and hasattr(parsed_config.delivery_method, "preserve_directory_structure")
and hasattr(parsed_config.delivery_method, PRESERVE_DIRECTORY_STRUCTURE_KEY)
and parsed_config.delivery_method.preserve_directory_structure is not None
):
return parsed_config.delivery_method.preserve_directory_structure
Expand All @@ -36,15 +41,16 @@ def preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool:

def use_permissions_transfer(parsed_config: AbstractFileBasedSpec) -> bool:
return (
hasattr(parsed_config.delivery_method, "delivery_type")
and parsed_config.delivery_method.delivery_type == "use_permissions_transfer"
hasattr(parsed_config.delivery_method, DELIVERY_TYPE_KEY)
and parsed_config.delivery_method.delivery_type
== DELIVERY_TYPE_PERMISSION_TRANSFER_MODE_VALUE
)


def include_identities_stream(parsed_config: AbstractFileBasedSpec) -> bool:
if (
use_permissions_transfer(parsed_config)
and hasattr(parsed_config.delivery_method, "include_identities_stream")
and hasattr(parsed_config.delivery_method, INCLUDE_IDENTITIES_STREAM_KEY)
and parsed_config.delivery_method.include_identities_stream is not None
):
return parsed_config.delivery_method.include_identities_stream
Expand Down
11 changes: 6 additions & 5 deletions airbyte_cdk/sources/file_based/file_based_stream_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
use_file_transfer,
)
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import schemaless_schema


class FileReadMode(Enum):
Expand All @@ -29,8 +28,6 @@ class FileReadMode(Enum):

class AbstractFileBasedStreamReader(ABC):
DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
REMOTE_FILE_PERMISSIONS_SCHEMA = schemaless_schema
REMOTE_FILE_IDENTITY_SCHEMA = schemaless_schema

def __init__(self) -> None:
self._config = None
Expand Down Expand Up @@ -195,7 +192,7 @@ def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) ->
ACL Permissions from files.
"""
raise NotImplementedError(
f"{self.__class__.__name__} required to support get_file_acl_permissions(), please update REMOTE_FILE_PERMISSIONS_SCHEMA accordingly"
f"{self.__class__.__name__} required to support get_file_acl_permissions(), please update file_permissions_schema accordingly to obtain the required schema for each stream on the source implementation"
)

@abstractmethod
Expand All @@ -205,5 +202,9 @@ def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any
identities.
"""
raise NotImplementedError(
f"{self.__class__.__name__} required to support load_identity_groups(), please update REMOTE_FILE_IDENTITY_SCHEMA accordingly"
f"{self.__class__.__name__} required to support load_identity_groups(), please add schema for your identities stream in schemas folder"
)

@property
@abstractmethod
def file_permissions_schema(self) -> Dict[str, Any]: ...
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def _filter_schema_invalid_properties(
},
}
elif self.use_permissions_transfer:
return self.stream_reader.REMOTE_FILE_PERMISSIONS_SCHEMA
return self.stream_reader.file_permissions_schema
else:
return super()._filter_schema_invalid_properties(configured_catalog_json_schema)

Expand Down Expand Up @@ -314,7 +314,7 @@ def _get_raw_json_schema(self) -> JsonSchema:
if self.use_file_transfer:
return file_transfer_schema
elif self.use_permissions_transfer:
return self.stream_reader.REMOTE_FILE_PERMISSIONS_SCHEMA
return self.stream_reader.file_permissions_schema
elif self.config.input_schema:
return self.config.get_input_schema() # type: ignore
elif self.config.schemaless:
Expand Down
5 changes: 0 additions & 5 deletions airbyte_cdk/sources/file_based/stream/identities_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy
from airbyte_cdk.sources.file_based.exceptions import FileBasedErrorsCollector
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.streams.core import JsonSchema
from airbyte_cdk.sources.streams.permissions.identities import Identities


Expand Down Expand Up @@ -41,7 +40,3 @@ def primary_key(self) -> PrimaryKeyType:

def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
return self.stream_reader.load_identity_groups(logger=self.logger)

@cache
def get_json_schema(self) -> JsonSchema:
return self.stream_reader.REMOTE_FILE_IDENTITY_SCHEMA
4 changes: 4 additions & 0 deletions unit_tests/sources/file_based/in_memory_files_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) ->
def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
return [{}]

@property
def file_permissions_schema(self) -> Dict[str, Any]:
return {"type": "object", "properties": {}}

def open_file(
self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger
) -> IOBase:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) ->
def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
return [{}]

@property
def file_permissions_schema(self) -> Dict[str, Any]:
return {"type": "object", "properties": {}}


class TestSpec(AbstractFileBasedSpec):
@classmethod
Expand Down

0 comments on commit 750935c

Please sign in to comment.