Skip to content

Commit

Permalink
file-based: allow connector to provide permissions and identities sch…
Browse files Browse the repository at this point in the history
…emas
  • Loading branch information
aldogonzalez8 committed Feb 11, 2025
1 parent a6d1b62 commit a7081d3
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 32 deletions.
7 changes: 5 additions & 2 deletions airbyte_cdk/sources/file_based/file_based_stream_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use_file_transfer,
)
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import schemaless_schema


class FileReadMode(Enum):
Expand All @@ -28,6 +29,8 @@ class FileReadMode(Enum):

class AbstractFileBasedStreamReader(ABC):
DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
REMOTE_FILE_PERMISSIONS_SCHEMA = schemaless_schema
REMOTE_FILE_IDENTITY_SCHEMA = schemaless_schema

def __init__(self) -> None:
self._config = None
Expand Down Expand Up @@ -192,7 +195,7 @@ def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) ->
ACL Permissions from files.
"""
raise NotImplementedError(
f"{self.__class__.__name__} required to support get_file_acl_permissions()"
f"{self.__class__.__name__} required to support get_file_acl_permissions(), please update REMOTE_FILE_PERMISSIONS_SCHEMA accordingly"
)

@abstractmethod
Expand All @@ -202,5 +205,5 @@ def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any
identities.
"""
raise NotImplementedError(
f"{self.__class__.__name__} required to support load_identity_groups()"
f"{self.__class__.__name__} required to support load_identity_groups(), please update REMOTE_FILE_IDENTITY_SCHEMA accordingly"
)
25 changes: 0 additions & 25 deletions airbyte_cdk/sources/file_based/schema_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,6 @@
"properties": {"data": {"type": "object"}, "file": {"type": "object"}},
}

remote_file_permissions_schema = {
"type": "object",
"properties": {
"id": {"type": "string"},
"file_path": {"type": "string"},
"allowed_identity_remote_ids": {"type": "array", "items": {"type": "string"}},
"publicly_accessible": {"type": "boolean"},
},
}

remote_file_identity_schema = {
"type": "object",
"properties": {
"id": {"type": "string"},
"remote_id": {"type": "string"},
"parent_id": {"type": ["null", "string"]},
"name": {"type": ["null", "string"]},
"description": {"type": ["null", "string"]},
"email_address": {"type": ["null", "string"]},
"member_email_addresses": {"type": ["null", "array"]},
"type": {"type": "string"},
"modified_at": {"type": "string"},
},
}


@total_ordering
class ComparableType(Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
SchemaType,
file_transfer_schema,
merge_schemas,
remote_file_permissions_schema,
schemaless_schema,
)
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
Expand Down Expand Up @@ -111,7 +110,7 @@ def _filter_schema_invalid_properties(
},
}
elif self.use_permissions_transfer:
return remote_file_permissions_schema
return self.stream_reader.REMOTE_FILE_PERMISSIONS_SCHEMA
else:
return super()._filter_schema_invalid_properties(configured_catalog_json_schema)

Expand Down Expand Up @@ -315,7 +314,7 @@ def _get_raw_json_schema(self) -> JsonSchema:
if self.use_file_transfer:
return file_transfer_schema
elif self.use_permissions_transfer:
return remote_file_permissions_schema
return self.stream_reader.REMOTE_FILE_PERMISSIONS_SCHEMA
elif self.config.input_schema:
return self.config.get_input_schema() # type: ignore
elif self.config.schemaless:
Expand Down
3 changes: 1 addition & 2 deletions airbyte_cdk/sources/file_based/stream/identities_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy
from airbyte_cdk.sources.file_based.exceptions import FileBasedErrorsCollector, FileBasedSourceError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.schema_helpers import remote_file_identity_schema
from airbyte_cdk.sources.file_based.types import StreamSlice
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.checkpoint import Cursor
Expand Down Expand Up @@ -89,7 +88,7 @@ def load_identity_groups(self) -> Iterable[Dict[str, Any]]:

@cache
def get_json_schema(self) -> JsonSchema:
return remote_file_identity_schema
return self.stream_reader.REMOTE_FILE_IDENTITY_SCHEMA

@property
def name(self) -> str:
Expand Down

0 comments on commit a7081d3

Please sign in to comment.