Skip to content

Commit f0a57f1

Browse files
feat(file-based): new AbstractFileBasedStreamPermissionsReader (#402)
1 parent 741a2a0 commit f0a57f1

7 files changed

+186
-113
lines changed

airbyte_cdk/sources/file_based/file_based_source.py

+24-1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@
4848
FileBasedErrorsCollector,
4949
FileBasedSourceError,
5050
)
51+
from airbyte_cdk.sources.file_based.file_based_stream_permissions_reader import (
52+
AbstractFileBasedStreamPermissionsReader,
53+
)
5154
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
5255
from airbyte_cdk.sources.file_based.file_types import default_parsers
5356
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
@@ -100,8 +103,10 @@ def __init__(
100103
cursor_cls: Type[
101104
Union[AbstractConcurrentFileBasedCursor, AbstractFileBasedCursor]
102105
] = FileBasedConcurrentCursor,
106+
stream_permissions_reader: Optional[AbstractFileBasedStreamPermissionsReader] = None,
103107
):
104108
self.stream_reader = stream_reader
109+
self.stream_permissions_reader = stream_permissions_reader
105110
self.spec_class = spec_class
106111
self.config = config
107112
self.catalog = catalog
@@ -234,6 +239,8 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
234239
try:
235240
parsed_config = self._get_parsed_config(config)
236241
self.stream_reader.config = parsed_config
242+
if self.stream_permissions_reader:
243+
self.stream_permissions_reader.config = parsed_config
237244
streams: List[Stream] = []
238245
for stream_config in parsed_config.streams:
239246
# Like state_manager, `catalog_stream` may be None during `check`
@@ -337,9 +344,23 @@ def _make_default_stream(
337344
preserve_directory_structure=preserve_directory_structure(parsed_config),
338345
)
339346

347+
def _ensure_permissions_reader_available(self) -> None:
348+
"""
349+
Validates that a stream permissions reader is available.
350+
Raises a ValueError if the reader is not provided.
351+
"""
352+
if not self.stream_permissions_reader:
353+
raise ValueError(
354+
"Stream permissions reader is required for streams that use permissions transfer mode."
355+
)
356+
340357
def _make_permissions_stream(
341358
self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor]
342359
) -> AbstractFileBasedStream:
360+
"""
361+
Creates a stream that reads permissions from files.
362+
"""
363+
self._ensure_permissions_reader_available()
343364
return PermissionsFileBasedStream(
344365
config=stream_config,
345366
catalog_schema=self.stream_schemas.get(stream_config.name),
@@ -350,6 +371,7 @@ def _make_permissions_stream(
350371
validation_policy=self._validate_and_get_validation_policy(stream_config),
351372
errors_collector=self.errors_collector,
352373
cursor=cursor,
374+
stream_permissions_reader=self.stream_permissions_reader, # type: ignore
353375
)
354376

355377
def _make_file_based_stream(
@@ -370,9 +392,10 @@ def _make_file_based_stream(
370392
def _make_identities_stream(
371393
self,
372394
) -> Stream:
395+
self._ensure_permissions_reader_available()
373396
return FileIdentitiesStream(
374397
catalog_schema=self.stream_schemas.get(FileIdentitiesStream.IDENTITIES_STREAM_NAME),
375-
stream_reader=self.stream_reader,
398+
stream_permissions_reader=self.stream_permissions_reader, # type: ignore
376399
discovery_policy=self.discovery_policy,
377400
errors_collector=self.errors_collector,
378401
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import logging
6+
from abc import ABC, abstractmethod
7+
from typing import Any, Dict, Iterable, Optional
8+
9+
from airbyte_cdk.sources.file_based import AbstractFileBasedSpec
10+
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
11+
12+
13+
class AbstractFileBasedStreamPermissionsReader(ABC):
14+
"""
15+
This class is responsible for reading file permissions and Identities from a source.
16+
"""
17+
18+
def __init__(self) -> None:
19+
self._config = None
20+
21+
@property
22+
def config(self) -> Optional[AbstractFileBasedSpec]:
23+
return self._config
24+
25+
@config.setter
26+
@abstractmethod
27+
def config(self, value: AbstractFileBasedSpec) -> None:
28+
"""
29+
FileBasedSource reads the config from disk and parses it, and once parsed, the source sets the config on its StreamReader.
30+
31+
Note: FileBasedSource only requires the keys defined in the abstract config, whereas concrete implementations of StreamReader
32+
will require keys that (for example) allow it to authenticate with the 3rd party.
33+
34+
Therefore, concrete implementations of AbstractFileBasedStreamPermissionsReader's's config setter should assert that `value` is of the correct
35+
config type for that type of StreamReader.
36+
"""
37+
...
38+
39+
@abstractmethod
40+
def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
41+
"""
42+
This function should return the allow list for a given file, i.e. the list of all identities and their permission levels associated with it
43+
44+
e.g.
45+
def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger):
46+
api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
47+
result = api_conn.get_file_permissions_info(file.id)
48+
return MyPermissionsModel(
49+
id=result["id"],
50+
access_control_list = result["access_control_list"],
51+
is_public = result["is_public"],
52+
).dict()
53+
"""
54+
...
55+
56+
@abstractmethod
57+
def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
58+
"""
59+
This function should return the Identities in a determined "space" or "domain" where the file metadata (ACLs) are fetched and ACLs items (Identities) exists.
60+
61+
e.g.
62+
def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
63+
api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
64+
users_api = api_conn.users()
65+
groups_api = api_conn.groups()
66+
members_api = self.google_directory_service.members()
67+
for user in users_api.list():
68+
yield my_identity_model(id=user.id, name=user.name, email_address=user.email, type="user").dict()
69+
for group in groups_api.list():
70+
group_obj = my_identity_model(id=group.id, name=groups.name, email_address=user.email, type="group").dict()
71+
for member in members_api.list(group=group):
72+
group_obj.member_email_addresses = group_obj.member_email_addresses or []
73+
group_obj.member_email_addresses.append(member.email)
74+
yield group_obj.dict()
75+
"""
76+
...
77+
78+
@property
79+
@abstractmethod
80+
def file_permissions_schema(self) -> Dict[str, Any]:
81+
"""
82+
This function should return the permissions schema for file permissions stream.
83+
84+
e.g.
85+
def file_permissions_schema(self) -> Dict[str, Any]:
86+
# you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/identities.json
87+
return {
88+
"type": "object",
89+
"properties": {
90+
"id": { "type": "string" },
91+
"file_path": { "type": "string" },
92+
"access_control_list": {
93+
"type": "array",
94+
"items": { "type": "string" }
95+
},
96+
"publicly_accessible": { "type": "boolean" }
97+
}
98+
}
99+
"""
100+
...
101+
102+
@property
103+
@abstractmethod
104+
def identities_schema(self) -> Dict[str, Any]:
105+
"""
106+
This function should return the identities schema for file identity stream.
107+
108+
e.g.
109+
def identities_schema(self) -> Dict[str, Any]:
110+
# you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/identities.json
111+
return {
112+
"type": "object",
113+
"properties": {
114+
"id": { "type": "string" },
115+
"remote_id": { "type": "string" },
116+
"name": { "type": ["null", "string"] },
117+
"email_address": { "type": ["null", "string"] },
118+
"member_email_addresses": { "type": ["null", "array"] },
119+
"type": { "type": "string" },
120+
}
121+
}
122+
"""
123+
...

airbyte_cdk/sources/file_based/file_based_stream_reader.py

-94
Original file line numberDiff line numberDiff line change
@@ -184,97 +184,3 @@ def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> Li
184184
makedirs(path.dirname(local_file_path), exist_ok=True)
185185
absolute_file_path = path.abspath(local_file_path)
186186
return [file_relative_path, local_file_path, absolute_file_path]
187-
188-
@abstractmethod
189-
def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
190-
"""
191-
This function should return the allow list for a given file, i.e. the list of all identities and their permission levels associated with it
192-
193-
e.g.
194-
def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger):
195-
api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
196-
result = api_conn.get_file_permissions_info(file.id)
197-
return MyPermissionsModel(
198-
id=result["id"],
199-
access_control_list = result["access_control_list"],
200-
is_public = result["is_public"],
201-
).dict()
202-
"""
203-
raise NotImplementedError(
204-
f"{self.__class__.__name__} does not implement get_file_acl_permissions(). To support ACL permissions, implement this method and update file_permissions_schema."
205-
)
206-
207-
@abstractmethod
208-
def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
209-
"""
210-
This function should return the Identities in a determined "space" or "domain" where the file metadata (ACLs) are fetched and ACLs items (Identities) exists.
211-
212-
e.g.
213-
def load_identity_groups(self, logger: logging.Logger) -> Dict[str, Any]:
214-
api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
215-
users_api = api_conn.users()
216-
groups_api = api_conn.groups()
217-
members_api = self.google_directory_service.members()
218-
for user in users_api.list():
219-
yield my_identity_model(id=user.id, name=user.name, email_address=user.email, type="user").dict()
220-
for group in groups_api.list():
221-
group_obj = my_identity_model(id=group.id, name=groups.name, email_address=user.email, type="group").dict()
222-
for member in members_api.list(group=group):
223-
group_obj.member_email_addresses = group_obj.member_email_addresses or []
224-
group_obj.member_email_addresses.append(member.email)
225-
yield group_obj.dict()
226-
"""
227-
raise NotImplementedError(
228-
f"{self.__class__.__name__} does not implement load_identity_groups(). To support identities, implement this method and update identities_schema."
229-
)
230-
231-
@property
232-
@abstractmethod
233-
def file_permissions_schema(self) -> Dict[str, Any]:
234-
"""
235-
This function should return the permissions schema for file permissions stream.
236-
237-
e.g.
238-
def file_permissions_schema(self) -> Dict[str, Any]:
239-
# you can also follow the patter we have for python connectors and have a json file and read from there e.g. schemas/identities.json
240-
return {
241-
"type": "object",
242-
"properties": {
243-
"id": { "type": "string" },
244-
"file_path": { "type": "string" },
245-
"access_control_list": {
246-
"type": "array",
247-
"items": { "type": "string" }
248-
},
249-
"publicly_accessible": { "type": "boolean" }
250-
}
251-
}
252-
"""
253-
raise NotImplementedError(
254-
f"{self.__class__.__name__} does not implement file_permissions_schema, please return json schema for your permissions streams."
255-
)
256-
257-
@property
258-
@abstractmethod
259-
def identities_schema(self) -> Dict[str, Any]:
260-
"""
261-
This function should return the identities schema for file identity stream.
262-
263-
e.g.
264-
def identities_schema(self) -> Dict[str, Any]:
265-
# you can also follow the patter we have for python connectors and have a json file and read from there e.g. schemas/identities.json
266-
return {
267-
"type": "object",
268-
"properties": {
269-
"id": { "type": "string" },
270-
"remote_id": { "type": "string" },
271-
"name": { "type": ["null", "string"] },
272-
"email_address": { "type": ["null", "string"] },
273-
"member_email_addresses": { "type": ["null", "array"] },
274-
"type": { "type": "string" },
275-
}
276-
}
277-
"""
278-
raise NotImplementedError(
279-
f"{self.__class__.__name__} does not implement identities_schema, please return json schema for your identities stream."
280-
)

airbyte_cdk/sources/file_based/stream/identities_stream.py

+7-5
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
99
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy
1010
from airbyte_cdk.sources.file_based.exceptions import FileBasedErrorsCollector
11-
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
11+
from airbyte_cdk.sources.file_based.file_based_stream_permissions_reader import (
12+
AbstractFileBasedStreamPermissionsReader,
13+
)
1214
from airbyte_cdk.sources.streams.core import JsonSchema
1315
from airbyte_cdk.sources.streams.permissions.identities_stream import IdentitiesStream
1416

@@ -24,13 +26,13 @@ class FileIdentitiesStream(IdentitiesStream):
2426
def __init__(
2527
self,
2628
catalog_schema: Optional[Mapping[str, Any]],
27-
stream_reader: AbstractFileBasedStreamReader,
29+
stream_permissions_reader: AbstractFileBasedStreamPermissionsReader,
2830
discovery_policy: AbstractDiscoveryPolicy,
2931
errors_collector: FileBasedErrorsCollector,
3032
) -> None:
3133
super().__init__()
3234
self.catalog_schema = catalog_schema
33-
self.stream_reader = stream_reader
35+
self.stream_permissions_reader = stream_permissions_reader
3436
self._discovery_policy = discovery_policy
3537
self.errors_collector = errors_collector
3638
self._cursor: MutableMapping[str, Any] = {}
@@ -40,8 +42,8 @@ def primary_key(self) -> PrimaryKeyType:
4042
return None
4143

4244
def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
43-
return self.stream_reader.load_identity_groups(logger=self.logger)
45+
return self.stream_permissions_reader.load_identity_groups(logger=self.logger)
4446

4547
@cache
4648
def get_json_schema(self) -> JsonSchema:
47-
return self.stream_reader.identities_schema
49+
return self.stream_permissions_reader.identities_schema

airbyte_cdk/sources/file_based/stream/permissions_file_based_stream.py

+12-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77

88
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level
99
from airbyte_cdk.models import Type as MessageType
10+
from airbyte_cdk.sources.file_based.file_based_stream_permissions_reader import (
11+
AbstractFileBasedStreamPermissionsReader,
12+
)
1013
from airbyte_cdk.sources.file_based.stream import DefaultFileBasedStream
1114
from airbyte_cdk.sources.file_based.types import StreamSlice
1215
from airbyte_cdk.sources.streams.core import JsonSchema
@@ -26,10 +29,16 @@ class PermissionsFileBasedStream(DefaultFileBasedStream):
2629
and schema definition, while this class handles the streaming interface.
2730
"""
2831

32+
def __init__(
33+
self, stream_permissions_reader: AbstractFileBasedStreamPermissionsReader, **kwargs: Any
34+
):
35+
super().__init__(**kwargs)
36+
self.stream_permissions_reader = stream_permissions_reader
37+
2938
def _filter_schema_invalid_properties(
3039
self, configured_catalog_json_schema: Dict[str, Any]
3140
) -> Dict[str, Any]:
32-
return self.stream_reader.file_permissions_schema
41+
return self.stream_permissions_reader.file_permissions_schema
3342

3443
def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
3544
"""
@@ -40,7 +49,7 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte
4049
no_permissions = False
4150
file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
4251
try:
43-
permissions_record = self.stream_reader.get_file_acl_permissions(
52+
permissions_record = self.stream_permissions_reader.get_file_acl_permissions(
4453
file, logger=self.logger
4554
)
4655
if not permissions_record:
@@ -82,4 +91,4 @@ def _get_raw_json_schema(self) -> JsonSchema:
8291
Returns:
8392
The file permissions schema that defines the structure of permission records
8493
"""
85-
return self.stream_reader.file_permissions_schema
94+
return self.stream_permissions_reader.file_permissions_schema

0 commit comments

Comments
 (0)