Skip to content

Commit

Permalink
file-based: create new stream for acls so we can save from if-else pa…
Browse files Browse the repository at this point in the history
…in in default file based stream
  • Loading branch information
aldogonzalez8 committed Feb 12, 2025
1 parent b74a005 commit 2c08bb4
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 35 deletions.
36 changes: 32 additions & 4 deletions airbyte_cdk/sources/file_based/file_based_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@
FileBasedFinalStateCursor,
)
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
from airbyte_cdk.sources.file_based.stream.permissions_file_based_stream import (
PermissionsFileBasedStream,
)
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository, MessageRepository
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
Expand Down Expand Up @@ -257,7 +260,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
message_repository=self.message_repository,
)
stream = FileBasedStreamFacade.create_from_stream(
stream=self._make_default_stream(
stream=self._make_file_based_stream(
stream_config=stream_config,
cursor=cursor,
parsed_config=parsed_config,
Expand Down Expand Up @@ -288,7 +291,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
CursorField(DefaultFileBasedStream.ab_last_mod_col),
)
stream = FileBasedStreamFacade.create_from_stream(
stream=self._make_default_stream(
stream=self._make_file_based_stream(
stream_config=stream_config,
cursor=cursor,
parsed_config=parsed_config,
Expand All @@ -300,7 +303,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
)
else:
cursor = self.cursor_cls(stream_config)
stream = self._make_default_stream(
stream = self._make_file_based_stream(
stream_config=stream_config,
cursor=cursor,
parsed_config=parsed_config,
Expand Down Expand Up @@ -334,9 +337,34 @@ def _make_default_stream(
cursor=cursor,
use_file_transfer=use_file_transfer(parsed_config),
preserve_directory_structure=preserve_directory_structure(parsed_config),
use_permissions_transfer=use_permissions_transfer(parsed_config),
)

def _make_permissions_stream(
    self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor]
) -> AbstractFileBasedStream:
    """
    Build a ``PermissionsFileBasedStream`` for the given stream configuration.

    The stream is wired with this source's reader, availability strategy,
    discovery policy, parsers and errors collector, plus the validation policy
    resolved for ``stream_config``.

    :param stream_config: configuration of the stream to create.
    :param cursor: cursor handed to the stream; may be ``None``.
    :return: the newly constructed permissions stream.
    """
    # Catalog schema registered under this stream's name, or None when there is none.
    catalog_schema = self.stream_schemas.get(stream_config.name)
    permissions_stream = PermissionsFileBasedStream(
        config=stream_config,
        catalog_schema=catalog_schema,
        stream_reader=self.stream_reader,
        availability_strategy=self.availability_strategy,
        discovery_policy=self.discovery_policy,
        parsers=self.parsers,
        validation_policy=self._validate_and_get_validation_policy(stream_config),
        errors_collector=self.errors_collector,
        cursor=cursor,
    )
    return permissions_stream

def _make_file_based_stream(
    self,
    stream_config: FileBasedStreamConfig,
    cursor: Optional[AbstractFileBasedCursor],
    parsed_config: AbstractFileBasedSpec,
) -> AbstractFileBasedStream:
    """
    Create the stream for ``stream_config``, choosing the implementation from ``parsed_config``.

    A permissions stream is built when ``use_permissions_transfer(parsed_config)``
    is truthy; otherwise the default file-based stream is built.

    :param stream_config: configuration of the stream to create.
    :param cursor: cursor handed to whichever stream is built; may be ``None``.
    :param parsed_config: parsed source spec used to select the stream type.
    :return: the stream produced by the selected factory method.
    """
    # Guard clause: the default stream is the common path.
    if not use_permissions_transfer(parsed_config):
        return self._make_default_stream(stream_config, cursor, parsed_config)
    return self._make_permissions_stream(stream_config, cursor)

def _make_identities_stream(
self,
) -> Stream:
Expand Down
31 changes: 0 additions & 31 deletions airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):

FILE_TRANSFER_KW = "use_file_transfer"
PRESERVE_DIRECTORY_STRUCTURE_KW = "preserve_directory_structure"
PERMISSIONS_TRANSFER_KW = "use_permissions_transfer"
FILES_KEY = "files"
DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
ab_last_mod_col = "_ab_source_file_last_modified"
Expand All @@ -57,7 +56,6 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
airbyte_columns = [ab_last_mod_col, ab_file_name_col]
use_file_transfer = False
preserve_directory_structure = True
use_permissions_transfer = False

def __init__(self, **kwargs: Any):
if self.FILE_TRANSFER_KW in kwargs:
Expand All @@ -66,8 +64,6 @@ def __init__(self, **kwargs: Any):
self.preserve_directory_structure = kwargs.pop(
self.PRESERVE_DIRECTORY_STRUCTURE_KW, True
)
if self.PERMISSIONS_TRANSFER_KW in kwargs:
self.use_permissions_transfer = kwargs.pop(self.PERMISSIONS_TRANSFER_KW, False)
super().__init__(**kwargs)

@property
Expand Down Expand Up @@ -109,8 +105,6 @@ def _filter_schema_invalid_properties(
self.ab_file_name_col: {"type": "string"},
},
}
elif self.use_permissions_transfer:
return self.stream_reader.file_permissions_schema
else:
return super()._filter_schema_invalid_properties(configured_catalog_json_schema)

Expand Down Expand Up @@ -193,29 +187,6 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte
yield stream_data_to_airbyte_message(
self.name, record, is_file_transfer_message=True
)
elif self.use_permissions_transfer:
try:
permissions_record = self.stream_reader.get_file_acl_permissions(
file, logger=self.logger
)
permissions_record = self.transform_record(
permissions_record, file, file_datetime_string
)
yield stream_data_to_airbyte_message(
self.name, permissions_record, is_file_transfer_message=False
)
except Exception as e:
self.logger.error(
f"Failed to retrieve permissions for file {file.uri}: {str(e)}"
)
yield AirbyteMessage(
type=MessageType.LOG,
log=AirbyteLogMessage(
level=Level.ERROR,
message=f"Error retrieving files permissions: stream={self.name} file={file.uri}",
stack_trace=traceback.format_exc(),
),
)
else:
for record in parser.parse_records(
self.config, file, self.stream_reader, self.logger, schema
Expand Down Expand Up @@ -313,8 +284,6 @@ def get_json_schema(self) -> JsonSchema:
def _get_raw_json_schema(self) -> JsonSchema:
if self.use_file_transfer:
return file_transfer_schema
elif self.use_permissions_transfer:
return self.stream_reader.file_permissions_schema
elif self.config.input_schema:
return self.config.get_input_schema() # type: ignore
elif self.config.schemaless:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import traceback
from typing import Any, Dict, Iterable

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.file_based.types import StreamSlice
from airbyte_cdk.sources.streams.core import JsonSchema
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.file_based.stream import DefaultFileBasedStream


class PermissionsFileBasedStream(DefaultFileBasedStream):
    """
    File-based stream that emits permissions (ACL) records for remote files.

    Both the schema and the permissions lookup are delegated to ``self.stream_reader``.
    """

    def _filter_schema_invalid_properties(
        self, configured_catalog_json_schema: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Return the reader's permissions schema.

        The ``configured_catalog_json_schema`` argument is not consulted.
        """
        permissions_schema = self.stream_reader.file_permissions_schema
        return permissions_schema

    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
        """
        Yield one permissions record message per file listed under ``stream_slice["files"]``.

        If fetching, transforming or emitting a file's record raises, the error is
        logged, an ERROR-level log message carrying the traceback is yielded instead,
        and iteration moves on to the next file.
        """
        for remote_file in stream_slice["files"]:
            # Formatted outside the try block, so a failure here propagates to the caller.
            last_modified = remote_file.last_modified.strftime(self.DATE_TIME_FORMAT)
            try:
                acl_record = self.stream_reader.get_file_acl_permissions(
                    remote_file, logger=self.logger
                )
                acl_record = self.transform_record(acl_record, remote_file, last_modified)
                yield stream_data_to_airbyte_message(
                    self.name, acl_record, is_file_transfer_message=False
                )
            except Exception as exc:
                self.logger.error(
                    f"Failed to retrieve permissions for file {remote_file.uri}: {str(exc)}"
                )
                # Built inside the handler so format_exc() captures the active exception.
                error_log = AirbyteLogMessage(
                    level=Level.ERROR,
                    message=f"Error retrieving files permissions: stream={self.name} file={remote_file.uri}",
                    stack_trace=traceback.format_exc(),
                )
                yield AirbyteMessage(type=MessageType.LOG, log=error_log)

    def _get_raw_json_schema(self) -> JsonSchema:
        """Return the reader's permissions schema as this stream's raw JSON schema."""
        raw_schema = self.stream_reader.file_permissions_schema
        return raw_schema

0 comments on commit 2c08bb4

Please sign in to comment.