Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(file-based): new AbstractFileBasedStreamPermissionsReader #402

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion airbyte_cdk/sources/file_based/file_based_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
FileBasedErrorsCollector,
FileBasedSourceError,
)
from airbyte_cdk.sources.file_based.file_based_stream_permissions_reader import (
AbstractFileBasedStreamPermissionsReader,
)
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types import default_parsers
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
Expand Down Expand Up @@ -100,8 +103,10 @@ def __init__(
cursor_cls: Type[
Union[AbstractConcurrentFileBasedCursor, AbstractFileBasedCursor]
] = FileBasedConcurrentCursor,
stream_permissions_reader: Optional[AbstractFileBasedStreamPermissionsReader] = None,
):
self.stream_reader = stream_reader
self.stream_permissions_reader = stream_permissions_reader
self.spec_class = spec_class
self.config = config
self.catalog = catalog
Expand Down Expand Up @@ -234,6 +239,8 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
try:
parsed_config = self._get_parsed_config(config)
self.stream_reader.config = parsed_config
if self.stream_permissions_reader:
self.stream_permissions_reader.config = parsed_config
streams: List[Stream] = []
for stream_config in parsed_config.streams:
# Like state_manager, `catalog_stream` may be None during `check`
Expand Down Expand Up @@ -337,9 +344,23 @@ def _make_default_stream(
preserve_directory_structure=preserve_directory_structure(parsed_config),
)

def _ensure_permissions_reader_available(self) -> None:
    """
    Validate that a stream permissions reader was supplied to this source.

    Raises:
        ValueError: if `self.stream_permissions_reader` is None.
    """
    # Compare against None explicitly: a reader implementation that defines
    # `__bool__` or `__len__` must not be mistaken for "not provided".
    if self.stream_permissions_reader is None:
        raise ValueError(
            "Stream permissions reader is required for streams that use permissions transfer mode."
        )

def _make_permissions_stream(
self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor]
) -> AbstractFileBasedStream:
"""
Creates a stream that reads permissions from files.
"""
self._ensure_permissions_reader_available()
return PermissionsFileBasedStream(
config=stream_config,
catalog_schema=self.stream_schemas.get(stream_config.name),
Expand All @@ -350,6 +371,7 @@ def _make_permissions_stream(
validation_policy=self._validate_and_get_validation_policy(stream_config),
errors_collector=self.errors_collector,
cursor=cursor,
stream_permissions_reader=self.stream_permissions_reader, # type: ignore
)

def _make_file_based_stream(
Expand All @@ -370,9 +392,10 @@ def _make_file_based_stream(
def _make_identities_stream(
    self,
) -> Stream:
    """
    Build the identities stream for this source.

    Raises:
        ValueError: if no stream permissions reader was provided (raised by
            `_ensure_permissions_reader_available`).
    """
    # Fail fast with a clear error instead of handing a missing reader to the stream.
    self._ensure_permissions_reader_available()
    # NOTE(review): both readers are forwarded here; confirm that
    # FileIdentitiesStream still accepts `stream_reader`.
    return FileIdentitiesStream(
        catalog_schema=self.stream_schemas.get(FileIdentitiesStream.IDENTITIES_STREAM_NAME),
        stream_reader=self.stream_reader,
        stream_permissions_reader=self.stream_permissions_reader,  # type: ignore
        discovery_policy=self.discovery_policy,
        errors_collector=self.errors_collector,
    )
Expand Down
123 changes: 123 additions & 0 deletions airbyte_cdk/sources/file_based/file_based_stream_permissions_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, Optional

from airbyte_cdk.sources.file_based import AbstractFileBasedSpec
from airbyte_cdk.sources.file_based.remote_file import RemoteFile


class AbstractFileBasedStreamPermissionsReader(ABC):
    """
    This class is responsible for reading file permissions and identities from a source.

    Concrete readers must implement the `config` setter, `get_file_acl_permissions`,
    `load_identity_groups`, `file_permissions_schema` and `identities_schema`.
    """

    def __init__(self) -> None:
        # Stays None until the `config` setter is called.
        self._config: Optional[AbstractFileBasedSpec] = None

    @property
    def config(self) -> Optional[AbstractFileBasedSpec]:
        """Return the config previously stored on this reader, or None if none was set."""
        return self._config

    @config.setter
    @abstractmethod
    def config(self, value: AbstractFileBasedSpec) -> None:
        """
        FileBasedSource reads the config from disk and parses it, and once parsed, the source sets
        the config on its stream permissions reader.

        Note: FileBasedSource only requires the keys defined in the abstract config, whereas
        concrete implementations of this reader will require keys that (for example) allow it to
        authenticate with the 3rd party.

        Therefore, the config setter of concrete AbstractFileBasedStreamPermissionsReader
        implementations should assert that `value` is of the correct config type for that type of
        reader.
        """
        ...

    @abstractmethod
    def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
        """
        This function should return the allow list for a given file, i.e. the list of all
        identities and their permission levels associated with it.

        e.g.
        def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger):
            api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
            result = api_conn.get_file_permissions_info(file.id)
            return MyPermissionsModel(
                id=result["id"],
                access_control_list = result["access_control_list"],
                is_public = result["is_public"],
            ).dict()
        """
        ...

    @abstractmethod
    def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
        """
        This function should return the identities in a determined "space" or "domain" where the
        file metadata (ACLs) are fetched and the ACL items (identities) exist.

        e.g.
        def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
            api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
            users_api = api_conn.users()
            groups_api = api_conn.groups()
            members_api = api_conn.members()
            for user in users_api.list():
                yield my_identity_model(id=user.id, name=user.name, email_address=user.email, type="user").dict()
            for group in groups_api.list():
                group_obj = my_identity_model(id=group.id, name=group.name, email_address=group.email, type="group")
                for member in members_api.list(group=group):
                    group_obj.member_email_addresses = group_obj.member_email_addresses or []
                    group_obj.member_email_addresses.append(member.email)
                yield group_obj.dict()
        """
        ...

    @property
    @abstractmethod
    def file_permissions_schema(self) -> Dict[str, Any]:
        """
        This property should return the JSON schema for the file permissions stream.

        e.g.
        def file_permissions_schema(self) -> Dict[str, Any]:
            # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/file_permissions.json
            return {
                  "type": "object",
                  "properties": {
                    "id": { "type": "string" },
                    "file_path": { "type": "string" },
                    "access_control_list": {
                      "type": "array",
                      "items": { "type": "string" }
                    },
                    "publicly_accessible": { "type": "boolean" }
                  }
                }
        """
        ...

    @property
    @abstractmethod
    def identities_schema(self) -> Dict[str, Any]:
        """
        This property should return the JSON schema for the file identities stream.

        e.g.
        def identities_schema(self) -> Dict[str, Any]:
            # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/identities.json
            return {
              "type": "object",
              "properties": {
                "id": { "type": "string" },
                "remote_id": { "type": "string" },
                "name": { "type": ["null", "string"] },
                "email_address": { "type": ["null", "string"] },
                "member_email_addresses": { "type": ["null", "array"] },
                "type": { "type": "string" },
              }
            }
        """
        ...
94 changes: 0 additions & 94 deletions airbyte_cdk/sources/file_based/file_based_stream_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,97 +184,3 @@ def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> Li
makedirs(path.dirname(local_file_path), exist_ok=True)
absolute_file_path = path.abspath(local_file_path)
return [file_relative_path, local_file_path, absolute_file_path]

@abstractmethod
def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
    """
    This function should return the allow list for a given file, i.e. the list of all identities
    and their permission levels associated with it.

    e.g.
    def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger):
        api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
        result = api_conn.get_file_permissions_info(file.id)
        return MyPermissionsModel(
            id=result["id"],
            access_control_list = result["access_control_list"],
            is_public = result["is_public"],
        ).dict()

    Raises:
        NotImplementedError: always, in this base implementation; the message names the
            concrete class.
    """
    raise NotImplementedError(
        f"{self.__class__.__name__} does not implement get_file_acl_permissions(). To support ACL permissions, implement this method and update file_permissions_schema."
    )

@abstractmethod
def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
    """
    This function should return the identities in a determined "space" or "domain" where the
    file metadata (ACLs) are fetched and the ACL items (identities) exist.

    e.g.
    def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
        api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
        users_api = api_conn.users()
        groups_api = api_conn.groups()
        members_api = api_conn.members()
        for user in users_api.list():
            yield my_identity_model(id=user.id, name=user.name, email_address=user.email, type="user").dict()
        for group in groups_api.list():
            group_obj = my_identity_model(id=group.id, name=group.name, email_address=group.email, type="group")
            for member in members_api.list(group=group):
                group_obj.member_email_addresses = group_obj.member_email_addresses or []
                group_obj.member_email_addresses.append(member.email)
            yield group_obj.dict()

    Raises:
        NotImplementedError: always, in this base implementation; the message names the
            concrete class.
    """
    raise NotImplementedError(
        f"{self.__class__.__name__} does not implement load_identity_groups(). To support identities, implement this method and update identities_schema."
    )

@property
@abstractmethod
def file_permissions_schema(self) -> Dict[str, Any]:
    """
    This property should return the JSON schema for the file permissions stream.

    e.g.
    def file_permissions_schema(self) -> Dict[str, Any]:
        # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/file_permissions.json
        return {
              "type": "object",
              "properties": {
                "id": { "type": "string" },
                "file_path": { "type": "string" },
                "access_control_list": {
                  "type": "array",
                  "items": { "type": "string" }
                },
                "publicly_accessible": { "type": "boolean" }
              }
            }

    Raises:
        NotImplementedError: always, in this base implementation; the message names the
            concrete class.
    """
    raise NotImplementedError(
        f"{self.__class__.__name__} does not implement file_permissions_schema, please return json schema for your permissions streams."
    )

@property
@abstractmethod
def identities_schema(self) -> Dict[str, Any]:
    """
    This property should return the JSON schema for the file identities stream.

    e.g.
    def identities_schema(self) -> Dict[str, Any]:
        # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/identities.json
        return {
          "type": "object",
          "properties": {
            "id": { "type": "string" },
            "remote_id": { "type": "string" },
            "name": { "type": ["null", "string"] },
            "email_address": { "type": ["null", "string"] },
            "member_email_addresses": { "type": ["null", "array"] },
            "type": { "type": "string" },
          }
        }

    Raises:
        NotImplementedError: always, in this base implementation; the message names the
            concrete class.
    """
    raise NotImplementedError(
        f"{self.__class__.__name__} does not implement identities_schema, please return json schema for your identities stream."
    )
12 changes: 7 additions & 5 deletions airbyte_cdk/sources/file_based/stream/identities_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy
from airbyte_cdk.sources.file_based.exceptions import FileBasedErrorsCollector
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_based_stream_permissions_reader import (
AbstractFileBasedStreamPermissionsReader,
)
from airbyte_cdk.sources.streams.core import JsonSchema
from airbyte_cdk.sources.streams.permissions.identities_stream import IdentitiesStream

Expand All @@ -24,13 +26,13 @@ class FileIdentitiesStream(IdentitiesStream):
def __init__(
self,
catalog_schema: Optional[Mapping[str, Any]],
stream_reader: AbstractFileBasedStreamReader,
stream_permissions_reader: AbstractFileBasedStreamPermissionsReader,
discovery_policy: AbstractDiscoveryPolicy,
errors_collector: FileBasedErrorsCollector,
) -> None:
super().__init__()
self.catalog_schema = catalog_schema
self.stream_reader = stream_reader
self.stream_permissions_reader = stream_permissions_reader
self._discovery_policy = discovery_policy
self.errors_collector = errors_collector
self._cursor: MutableMapping[str, Any] = {}
Expand All @@ -40,8 +42,8 @@ def primary_key(self) -> PrimaryKeyType:
return None

def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
    """
    Return the identities supplied by the stream permissions reader.

    Delegates to `self.stream_permissions_reader.load_identity_groups`, passing this stream's
    logger.
    """
    # Identities come from the permissions reader only; the earlier `stream_reader` call made
    # the line below it unreachable.
    return self.stream_permissions_reader.load_identity_groups(logger=self.logger)

@cache
def get_json_schema(self) -> JsonSchema:
    """
    Return the identities schema exposed by the stream permissions reader.

    NOTE(review): `cache` is presumably `functools.cache`, in which case the result is memoised
    per stream instance; confirm against the file's imports.
    """
    # The schema comes from the permissions reader only; the earlier `stream_reader` return made
    # the line below it unreachable.
    return self.stream_permissions_reader.identities_schema
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.file_based.file_based_stream_permissions_reader import (
AbstractFileBasedStreamPermissionsReader,
)
from airbyte_cdk.sources.file_based.stream import DefaultFileBasedStream
from airbyte_cdk.sources.file_based.types import StreamSlice
from airbyte_cdk.sources.streams.core import JsonSchema
Expand All @@ -26,10 +29,16 @@ class PermissionsFileBasedStream(DefaultFileBasedStream):
and schema definition, while this class handles the streaming interface.
"""

def __init__(
    self, stream_permissions_reader: AbstractFileBasedStreamPermissionsReader, **kwargs: Any
) -> None:
    """
    Args:
        stream_permissions_reader: reader stored on the stream as
            `self.stream_permissions_reader`.
        **kwargs: forwarded unchanged to the parent class constructor.
    """
    super().__init__(**kwargs)
    self.stream_permissions_reader = stream_permissions_reader

def _filter_schema_invalid_properties(
    self, configured_catalog_json_schema: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Return the file permissions schema from the stream permissions reader.

    `configured_catalog_json_schema` is not used: the reader's schema is returned as is.
    """
    # The schema comes from the permissions reader only; the earlier `stream_reader` return made
    # the line below it unreachable.
    return self.stream_permissions_reader.file_permissions_schema

def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
"""
Expand All @@ -40,7 +49,7 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte
no_permissions = False
file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
try:
permissions_record = self.stream_reader.get_file_acl_permissions(
permissions_record = self.stream_permissions_reader.get_file_acl_permissions(
file, logger=self.logger
)
if not permissions_record:
Expand Down Expand Up @@ -82,4 +91,4 @@ def _get_raw_json_schema(self) -> JsonSchema:
Returns:
The file permissions schema that defines the structure of permission records
"""
return self.stream_reader.file_permissions_schema
return self.stream_permissions_reader.file_permissions_schema
Loading
Loading