-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathfile_based_stream_permissions_reader.py
123 lines (106 loc) · 4.93 KB
/
file_based_stream_permissions_reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, Optional
from airbyte_cdk.sources.file_based import AbstractFileBasedSpec
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
class AbstractFileBasedStreamPermissionsReader(ABC):
    """
    Abstract interface for reading file permissions (ACLs) and identities from a source.

    Concrete subclasses must provide:
      - a `config` setter,
      - `get_file_acl_permissions` and `load_identity_groups`,
      - the `file_permissions_schema` and `identities_schema` properties.
    """

    def __init__(self) -> None:
        # Annotated explicitly: without the annotation type checkers infer the attribute
        # as `None`, which conflicts with the `config` property's declared return type and
        # with subclasses assigning a parsed spec to it.
        self._config: Optional[AbstractFileBasedSpec] = None

    @property
    def config(self) -> Optional[AbstractFileBasedSpec]:
        """Return the config currently stored on this reader; `None` until one has been set."""
        return self._config

    @config.setter
    @abstractmethod
    def config(self, value: AbstractFileBasedSpec) -> None:
        """
        FileBasedSource reads the config from disk and parses it, and once parsed, the source sets the config
        on its StreamReader.

        Note: FileBasedSource only requires the keys defined in the abstract config, whereas concrete
        implementations of StreamReader will require keys that (for example) allow it to authenticate with
        the 3rd party.

        Therefore, the config setter of a concrete AbstractFileBasedStreamPermissionsReader implementation
        should assert that `value` is of the correct config type for that type of StreamReader.
        """
        ...

    @abstractmethod
    def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
        """
        This function should return the allow list for a given file, i.e. the list of all identities and
        their permission levels associated with it.

        e.g.
        def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger):
            api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
            result = api_conn.get_file_permissions_info(file.id)
            return MyPermissionsModel(
                id=result["id"],
                access_control_list=result["access_control_list"],
                is_public=result["is_public"],
            ).dict()
        """
        ...

    @abstractmethod
    def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
        """
        This function should return the identities in a determined "space" or "domain", i.e. the one where
        the file metadata (ACLs) is fetched and where the ACL items (identities) exist.

        e.g.
        def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
            api_conn = some_api.conn(credentials=SOME_CREDENTIALS)
            users_api = api_conn.users()
            groups_api = api_conn.groups()
            members_api = api_conn.members()
            for user in users_api.list():
                yield my_identity_model(id=user.id, name=user.name, email_address=user.email, type="user").dict()
            for group in groups_api.list():
                group_obj = my_identity_model(id=group.id, name=group.name, email_address=group.email, type="group")
                for member in members_api.list(group=group):
                    group_obj.member_email_addresses = group_obj.member_email_addresses or []
                    group_obj.member_email_addresses.append(member.email)
                yield group_obj.dict()
        """
        ...

    @property
    @abstractmethod
    def file_permissions_schema(self) -> Dict[str, Any]:
        """
        This function should return the permissions schema for the file permissions stream.

        e.g.
        def file_permissions_schema(self) -> Dict[str, Any]:
            # you can also follow the pattern we have for python connectors and have a json file and
            # read from there e.g. schemas/file_permissions.json
            return {
                "type": "object",
                "properties": {
                    "id": { "type": "string" },
                    "file_path": { "type": "string" },
                    "access_control_list": {
                        "type": "array",
                        "items": { "type": "string" }
                    },
                    "publicly_accessible": { "type": "boolean" }
                }
            }
        """
        ...

    @property
    @abstractmethod
    def identities_schema(self) -> Dict[str, Any]:
        """
        This function should return the identities schema for the file identity stream.

        e.g.
        def identities_schema(self) -> Dict[str, Any]:
            # you can also follow the pattern we have for python connectors and have a json file and
            # read from there e.g. schemas/identities.json
            return {
                "type": "object",
                "properties": {
                    "id": { "type": "string" },
                    "remote_id": { "type": "string" },
                    "name": { "type": ["null", "string"] },
                    "email_address": { "type": ["null", "string"] },
                    "member_email_addresses": { "type": ["null", "array"] },
                    "type": { "type": "string" },
                }
            }
        """
        ...