Skip to content

Commit

Permalink
file-based: add method in stream reader to obtain Identities schema
Browse files Browse the repository at this point in the history
  • Loading branch information
aldogonzalez8 committed Feb 11, 2025
1 parent 4962949 commit ea31a2d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
17 changes: 14 additions & 3 deletions airbyte_cdk/sources/file_based/file_based_stream_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) ->
ACL Permissions from files.
"""
raise NotImplementedError(
f"{self.__class__.__name__} required to support ACL permissions, please update file_permissions_schema accordingly to obtain the required schema for each stream on the source implementation"
f"{self.__class__.__name__} required to support ACL permissions, please update file_permissions_schema accordingly."
)

@abstractmethod
Expand All @@ -202,7 +202,7 @@ def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any
identities.
"""
raise NotImplementedError(
f"{self.__class__.__name__} required to support identities, please add schema for your identities stream in schemas folder"
f"{self.__class__.__name__} required to support identities, please update identities_schema."
)

@property
Expand All @@ -213,5 +213,16 @@ def file_permissions_schema(self) -> Dict[str, Any]:
ACL Permissions from files.
"""
raise NotImplementedError(
f"{self.__class__.__name__} required to support ACL Permissions, please return required json schema for your permissions streams"
f"{self.__class__.__name__} required to support ACL Permissions, please return required json schema for your permissions streams."
)

@property
@abstractmethod
def identities_schema(self) -> Dict[str, Any]:
"""
This is required for connectors that will support syncing
identities.
"""
raise NotImplementedError(
f"{self.__class__.__name__} required to support fetch Identities, please return required json schema for your Identities stream."
)
5 changes: 5 additions & 0 deletions airbyte_cdk/sources/file_based/stream/identities_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from airbyte_cdk.sources.file_based.exceptions import FileBasedErrorsCollector
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.streams.permissions.identities import Identities
from airbyte_cdk.sources.streams.core import JsonSchema


class FileIdentities(Identities):
Expand Down Expand Up @@ -40,3 +41,7 @@ def primary_key(self) -> PrimaryKeyType:

def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
return self.stream_reader.load_identity_groups(logger=self.logger)

@cache
def get_json_schema(self) -> JsonSchema:
return self.stream_reader.identities_schema

0 comments on commit ea31a2d

Please sign in to comment.