Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(file-based): sync file acl permissions and identities #260

Merged
merged 51 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
79c5f40
file-based: initial implementation to sync metadarecords
aldogonzalez8 Jan 23, 2025
4638f89
file-based: fix lint
aldogonzalez8 Jan 23, 2025
266c0cd
file-based: fix errors
aldogonzalez8 Jan 23, 2025
7bfb8c3
Auto-fix lint and format issues
Jan 23, 2025
88af543
file-based: remove abstract decorator
aldogonzalez8 Jan 23, 2025
7e6ca59
Merge branch 'main' into aldogonzalez8/sync-metadata-records2
aaronsteers Jan 24, 2025
edd6f69
file-based: fix check
aldogonzalez8 Jan 24, 2025
35e0e68
file-based: add identities stream and rename acl toggle
aldogonzalez8 Jan 26, 2025
0ae4267
Auto-fix lint and format issues
Jan 26, 2025
43e3ea3
file-based: fix annoying mypy issues
aldogonzalez8 Jan 26, 2025
4aee2c9
file-based: minor fix to schema
aldogonzalez8 Jan 26, 2025
7b5c245
file-based: add logger to load_identity_groups method
aldogonzalez8 Jan 27, 2025
f022b4c
file-based: simplify sync permissions schema
aldogonzalez8 Jan 27, 2025
597e458
file-based: remove unused config and fix unit tests
aldogonzalez8 Jan 27, 2025
2430eae
file-based: format record to have file last modified data
aldogonzalez8 Jan 29, 2025
24a93ba
file-based: create three toggle instead of option below transfer records
aldogonzalez8 Feb 3, 2025
40c1787
file-based: fix csv test
aldogonzalez8 Feb 3, 2025
36e0bca
Auto-fix lint and format issues
Feb 3, 2025
d30b1ad
file-based: make new methods abstract
aldogonzalez8 Feb 3, 2025
b5cf88c
file-based: add check for identities stream
aldogonzalez8 Feb 5, 2025
2a04f12
file-based: ruff format
aldogonzalez8 Feb 5, 2025
4c18889
Auto-fix lint and format issues
Feb 5, 2025
4e5bb90
Merge branch 'main' into aldogonzalez8/sync-metadata-records2
aldogonzalez8 Feb 5, 2025
1271e3a
file-based: add notImplemented error
aldogonzalez8 Feb 5, 2025
4105c3c
file-based: fix unit tests
aldogonzalez8 Feb 11, 2025
a6d1b62
file-based: add NotImplementedError
aldogonzalez8 Feb 11, 2025
a7081d3
file-based: allow connector to provide permissions and identities sch…
aldogonzalez8 Feb 11, 2025
7e4d73f
file-based: move permissions transfer mode to general with abstract I…
aldogonzalez8 Feb 11, 2025
0f5cc5f
Auto-fix lint and format issues
Feb 11, 2025
a36ec7d
file-based: remove unnecesary param in Abstract Identities
aldogonzalez8 Feb 11, 2025
ad4c49c
file-based: fix init return types
aldogonzalez8 Feb 11, 2025
750935c
file-based: update interfaces to obtain schemas for file_permissions …
aldogonzalez8 Feb 11, 2025
d4f2b1d
file-based: update transfer mode validations
aldogonzalez8 Feb 11, 2025
4962949
file-based: update docstrings for abstract methos
aldogonzalez8 Feb 11, 2025
ea31a2d
file-based: add method in stream reader to obtain Identities schema
aldogonzalez8 Feb 11, 2025
428c669
Auto-fix lint and format issues
Feb 11, 2025
791241c
file-based: implement get schema method/proerty in unit tests
aldogonzalez8 Feb 11, 2025
b74a005
file-based: update messages for not implemented errors
aldogonzalez8 Feb 11, 2025
2c08bb4
file-based: create new stream for acls so we can save from if-else pa…
aldogonzalez8 Feb 12, 2025
a71eb08
Auto-fix lint and format issues
Feb 12, 2025
a41934a
file-based: add unit tests for Permissions Stream
aldogonzalez8 Feb 12, 2025
b764d46
file-based: add more tests for permissions file based stream
aldogonzalez8 Feb 13, 2025
9566b3c
file-based: add docstrings to some methods
aldogonzalez8 Feb 13, 2025
8612d4b
file-based: add docstrings to some methods
aldogonzalez8 Feb 13, 2025
c94b704
file-based: user better example of schema for file permissions in test
aldogonzalez8 Feb 13, 2025
722c7e0
file-based: add tests for identities stream
aldogonzalez8 Feb 13, 2025
54d5ec6
Auto-fix lint and format issues
Feb 13, 2025
afdd60a
file-based: minor change in unit tests
aldogonzalez8 Feb 13, 2025
7b2ffce
file-based: move test to correct folder
aldogonzalez8 Feb 13, 2025
1b729d7
file-based: rename streams to follow pattern and add more docs to met…
aldogonzalez8 Feb 14, 2025
7924c3d
file-based: rename stream file to correct pattern
aldogonzalez8 Feb 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from airbyte_cdk import OneOfOptionConfig
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.specs.transfer_modes import DeliverPermissions
from airbyte_cdk.sources.utils import schema_helpers


Expand Down Expand Up @@ -65,7 +66,7 @@ class AbstractFileBasedSpec(BaseModel):
order=10,
)

delivery_method: Union[DeliverRecords, DeliverRawFiles] = Field(
delivery_method: Union[DeliverRecords, DeliverRawFiles, DeliverPermissions] = Field(
title="Delivery Method",
discriminator="delivery_type",
type="object",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import (
AbstractFileBasedSpec,
DeliverRawFiles,
)
from airbyte_cdk.sources.specs.transfer_modes import DeliverPermissions

DELIVERY_TYPE_KEY = "delivery_type"
DELIVERY_TYPE_PERMISSION_TRANSFER_MODE_VALUE = "use_permissions_transfer"
DELIVERY_TYPE_FILES_TRANSFER_MODE_VALUE = "use_file_transfer"
PRESERVE_DIRECTORY_STRUCTURE_KEY = "preserve_directory_structure"
INCLUDE_IDENTITIES_STREAM_KEY = "include_identities_stream"


def use_file_transfer(parsed_config: AbstractFileBasedSpec) -> bool:
"""Returns `True` if the configuration uses file transfer mode."""
return (
hasattr(parsed_config.delivery_method, DELIVERY_TYPE_KEY)
and parsed_config.delivery_method.delivery_type == DELIVERY_TYPE_FILES_TRANSFER_MODE_VALUE
)


def preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool:
"""
Determines whether to preserve directory structure during file transfer.

When enabled, files maintain their subdirectory paths in the destination.
When disabled, files are flattened to the root of the destination.

Args:
parsed_config: The parsed configuration containing delivery method settings

Returns:
True if directory structure should be preserved (default), False otherwise
"""
if (
use_file_transfer(parsed_config)
and hasattr(parsed_config.delivery_method, PRESERVE_DIRECTORY_STRUCTURE_KEY)
and isinstance(parsed_config.delivery_method, DeliverRawFiles)
):
return parsed_config.delivery_method.preserve_directory_structure
return True


def use_permissions_transfer(parsed_config: AbstractFileBasedSpec) -> bool:
"""
Determines whether to use permissions transfer to sync ACLs and Identities

Args:
parsed_config: The parsed configuration containing delivery method settings

Returns:
True if permissions transfer should be enabled, False otherwise
"""
return (
hasattr(parsed_config.delivery_method, DELIVERY_TYPE_KEY)
and parsed_config.delivery_method.delivery_type
== DELIVERY_TYPE_PERMISSION_TRANSFER_MODE_VALUE
)


def include_identities_stream(parsed_config: AbstractFileBasedSpec) -> bool:
"""
There are scenarios where user may not have access to identities but still is valuable to get ACLs

Args:
parsed_config: The parsed configuration containing delivery method settings

Returns:
True if we should include Identities stream.
"""
if (
use_permissions_transfer(parsed_config)
and hasattr(parsed_config.delivery_method, INCLUDE_IDENTITIES_STREAM_KEY)
and isinstance(parsed_config.delivery_method, DeliverPermissions)
):
return parsed_config.delivery_method.include_identities_stream
return False
107 changes: 70 additions & 37 deletions airbyte_cdk/sources/file_based/file_based_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
FileBasedStreamConfig,
ValidationPolicy,
)
from airbyte_cdk.sources.file_based.config.validate_config_transfer_modes import (
include_identities_stream,
preserve_directory_structure,
use_file_transfer,
use_permissions_transfer,
)
from airbyte_cdk.sources.file_based.discovery_policy import (
AbstractDiscoveryPolicy,
DefaultDiscoveryPolicy,
Expand All @@ -49,7 +55,12 @@
DEFAULT_SCHEMA_VALIDATION_POLICIES,
AbstractSchemaValidationPolicy,
)
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream, DefaultFileBasedStream
from airbyte_cdk.sources.file_based.stream import (
AbstractFileBasedStream,
DefaultFileBasedStream,
FileIdentitiesStream,
PermissionsFileBasedStream,
)
from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamFacade
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import (
AbstractConcurrentFileBasedCursor,
Expand All @@ -66,6 +77,7 @@
DEFAULT_CONCURRENCY = 100
MAX_CONCURRENCY = 100
INITIAL_N_PARTITIONS = MAX_CONCURRENCY // 2
IDENTITIES_STREAM = "identities"


class FileBasedSource(ConcurrentSourceAdapter, ABC):
Expand Down Expand Up @@ -157,13 +169,20 @@ def check_connection(
errors = []
tracebacks = []
for stream in streams:
if isinstance(stream, FileIdentitiesStream):
identity = next(iter(stream.load_identity_groups()))
if not identity:
errors.append(
"Unable to get identities for current configuration, please check your credentials"
)
continue
if not isinstance(stream, AbstractFileBasedStream):
raise ValueError(f"Stream {stream} is not a file-based stream.")
try:
parsed_config = self._get_parsed_config(config)
availability_method = (
stream.availability_strategy.check_availability
if self._use_file_transfer(parsed_config)
if use_file_transfer(parsed_config) or use_permissions_transfer(parsed_config)
else stream.availability_strategy.check_availability_and_parsability
)
(
Expand Down Expand Up @@ -239,7 +258,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
message_repository=self.message_repository,
)
stream = FileBasedStreamFacade.create_from_stream(
stream=self._make_default_stream(
stream=self._make_file_based_stream(
stream_config=stream_config,
cursor=cursor,
parsed_config=parsed_config,
Expand Down Expand Up @@ -270,7 +289,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
CursorField(DefaultFileBasedStream.ab_last_mod_col),
)
stream = FileBasedStreamFacade.create_from_stream(
stream=self._make_default_stream(
stream=self._make_file_based_stream(
stream_config=stream_config,
cursor=cursor,
parsed_config=parsed_config,
Expand All @@ -282,13 +301,17 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
)
else:
cursor = self.cursor_cls(stream_config)
stream = self._make_default_stream(
stream = self._make_file_based_stream(
stream_config=stream_config,
cursor=cursor,
parsed_config=parsed_config,
)

streams.append(stream)

if include_identities_stream(parsed_config):
identities_stream = self._make_identities_stream()
streams.append(identities_stream)
return streams

except ValidationError as exc:
Expand All @@ -310,8 +333,48 @@ def _make_default_stream(
validation_policy=self._validate_and_get_validation_policy(stream_config),
errors_collector=self.errors_collector,
cursor=cursor,
use_file_transfer=self._use_file_transfer(parsed_config),
preserve_directory_structure=self._preserve_directory_structure(parsed_config),
use_file_transfer=use_file_transfer(parsed_config),
preserve_directory_structure=preserve_directory_structure(parsed_config),
)

def _make_permissions_stream(
self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor]
) -> AbstractFileBasedStream:
return PermissionsFileBasedStream(
config=stream_config,
catalog_schema=self.stream_schemas.get(stream_config.name),
stream_reader=self.stream_reader,
availability_strategy=self.availability_strategy,
discovery_policy=self.discovery_policy,
parsers=self.parsers,
validation_policy=self._validate_and_get_validation_policy(stream_config),
errors_collector=self.errors_collector,
cursor=cursor,
)

def _make_file_based_stream(
self,
stream_config: FileBasedStreamConfig,
cursor: Optional[AbstractFileBasedCursor],
parsed_config: AbstractFileBasedSpec,
) -> AbstractFileBasedStream:
"""
Creates different streams depending on the type of the transfer mode selected
"""
if use_permissions_transfer(parsed_config):
return self._make_permissions_stream(stream_config, cursor)
# we should have a stream for File transfer mode to decouple from DefaultFileBasedStream
else:
return self._make_default_stream(stream_config, cursor, parsed_config)

def _make_identities_stream(
self,
) -> Stream:
return FileIdentitiesStream(
catalog_schema=self.stream_schemas.get(FileIdentitiesStream.IDENTITIES_STREAM_NAME),
stream_reader=self.stream_reader,
discovery_policy=self.discovery_policy,
errors_collector=self.errors_collector,
)

def _get_stream_from_catalog(
Expand Down Expand Up @@ -378,33 +441,3 @@ def _validate_input_schema(self, stream_config: FileBasedStreamConfig) -> None:
"`input_schema` and `schemaless` options cannot both be set",
model=FileBasedStreamConfig,
)

@staticmethod
def _use_file_transfer(parsed_config: AbstractFileBasedSpec) -> bool:
use_file_transfer = (
hasattr(parsed_config.delivery_method, "delivery_type")
and parsed_config.delivery_method.delivery_type == "use_file_transfer"
)
return use_file_transfer

@staticmethod
def _preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool:
"""
Determines whether to preserve directory structure during file transfer.

When enabled, files maintain their subdirectory paths in the destination.
When disabled, files are flattened to the root of the destination.

Args:
parsed_config: The parsed configuration containing delivery method settings

Returns:
True if directory structure should be preserved (default), False otherwise
"""
if (
FileBasedSource._use_file_transfer(parsed_config)
and hasattr(parsed_config.delivery_method, "preserve_directory_structure")
and parsed_config.delivery_method.preserve_directory_structure is not None
):
return parsed_config.delivery_method.preserve_directory_structure
return True
Loading
Loading