Skip to content

Commit

Permalink
file-based: move permissions transfer mode to general with abstract I…
Browse files Browse the repository at this point in the history
…dentitie stream, so file based Identity stream becomes an implementation
  • Loading branch information
aldogonzalez8 committed Feb 11, 2025
1 parent a7081d3 commit 7e4d73f
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,10 @@

from airbyte_cdk import OneOfOptionConfig
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.specs.transfer_modes import DeliverPermissions
from airbyte_cdk.sources.utils import schema_helpers


class DeliverPermissions(BaseModel):
class Config(OneOfOptionConfig):
title = "Replicate Permissions ACL"
description = "Sends one identity stream and one for more permissions (ACL) streams to the destination. This data can be used in downstream systems to recreate permission restrictions mirroring the original source."
discriminator = "delivery_type"

delivery_type: Literal["use_permissions_transfer"] = Field(
"use_permissions_transfer", const=True
)

include_identities_stream: bool = Field(
title="Include Identity Stream",
description="This data can be used in downstream systems to recreate permission restrictions mirroring the original source",
default=True,
)


class DeliverRecords(BaseModel):
class Config(OneOfOptionConfig):
title = "Replicate Records"
Expand Down
9 changes: 4 additions & 5 deletions airbyte_cdk/sources/file_based/file_based_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
from airbyte_cdk.sources.file_based.stream import (
AbstractFileBasedStream,
DefaultFileBasedStream,
IdentitiesStream,
FileIdentities,
)
from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamFacade
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import (
Expand All @@ -67,7 +67,6 @@
FileBasedFinalStateCursor,
)
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
from airbyte_cdk.sources.file_based.stream.identities_stream import IDENTITIES_STREAM_NAME
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository, MessageRepository
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
Expand Down Expand Up @@ -169,7 +168,7 @@ def check_connection(
errors = []
tracebacks = []
for stream in streams:
if isinstance(stream, IdentitiesStream):
if isinstance(stream, FileIdentities):
identity = next(iter(stream.load_identity_groups()))
if not identity:
errors.append(
Expand Down Expand Up @@ -341,8 +340,8 @@ def _make_default_stream(
def _make_identities_stream(
self,
) -> Stream:
return IdentitiesStream(
catalog_schema=self.stream_schemas.get(IDENTITIES_STREAM_NAME),
return FileIdentities(
catalog_schema=self.stream_schemas.get(FileIdentities.IDENTITIES_STREAM_NAME),
stream_reader=self.stream_reader,
discovery_policy=self.discovery_policy,
errors_collector=self.errors_collector,
Expand Down
4 changes: 2 additions & 2 deletions airbyte_cdk/sources/file_based/stream/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from airbyte_cdk.sources.file_based.stream.abstract_file_based_stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.default_file_based_stream import DefaultFileBasedStream
from airbyte_cdk.sources.file_based.stream.identities_stream import IdentitiesStream
from airbyte_cdk.sources.file_based.stream.identities_stream import FileIdentities

__all__ = ["AbstractFileBasedStream", "DefaultFileBasedStream", "IdentitiesStream"]
__all__ = ["AbstractFileBasedStream", "DefaultFileBasedStream", "FileIdentities"]
59 changes: 4 additions & 55 deletions airbyte_cdk/sources/file_based/stream/identities_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,18 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import traceback
from functools import cache
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional
from typing import Any, Dict, Iterable, Mapping, MutableMapping, Optional

from airbyte_protocol_dataclasses.models import SyncMode

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.streams.permissions.identities import Identities
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy
from airbyte_cdk.sources.file_based.exceptions import FileBasedErrorsCollector, FileBasedSourceError
from airbyte_cdk.sources.file_based.exceptions import FileBasedErrorsCollector
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.types import StreamSlice
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.checkpoint import Cursor
from airbyte_cdk.sources.streams.core import JsonSchema
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

IDENTITIES_STREAM_NAME = "identities"


class IdentitiesStream(Stream):
class FileIdentities(Identities):
"""
The identities stream. A full refresh stream to sync identities from a certain domain.
The stream reader manage the logic to get such data, which is implemented on connector side.
Expand All @@ -46,53 +35,13 @@ def __init__(
self.errors_collector = errors_collector
self._cursor: MutableMapping[str, Any] = {}

@property
def state(self) -> MutableMapping[str, Any]:
return self._cursor

@state.setter
def state(self, value: MutableMapping[str, Any]) -> None:
"""State setter, accept state serialized by state getter."""
self._cursor = value

@property
def primary_key(self) -> PrimaryKeyType:
return None

def read_records(
self,
sync_mode: SyncMode,
cursor_field: Optional[List[str]] = None,
stream_slice: Optional[StreamSlice] = None,
stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
try:
identity_groups = self.load_identity_groups()
for record in identity_groups:
yield stream_data_to_airbyte_message(self.name, record)
except AirbyteTracedException as exc:
# Re-raise the exception to stop the whole sync immediately as this is a fatal error
raise exc
except Exception:
yield AirbyteMessage(
type=MessageType.LOG,
log=AirbyteLogMessage(
level=Level.ERROR,
message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name}",
stack_trace=traceback.format_exc(),
),
)

def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
return self.stream_reader.load_identity_groups(logger=self.logger)

@cache
def get_json_schema(self) -> JsonSchema:
return self.stream_reader.REMOTE_FILE_IDENTITY_SCHEMA

@property
def name(self) -> str:
return IDENTITIES_STREAM_NAME

def get_cursor(self) -> Optional[Cursor]:
return None
25 changes: 25 additions & 0 deletions airbyte_cdk/sources/specs/transfer_modes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from typing import Literal

from pydantic.v1 import AnyUrl, BaseModel, Field
from airbyte_cdk import OneOfOptionConfig


class DeliverPermissions(BaseModel):
class Config(OneOfOptionConfig):
title = "Replicate Permissions ACL"
description = "Sends one identity stream and one for more permissions (ACL) streams to the destination. This data can be used in downstream systems to recreate permission restrictions mirroring the original source."
discriminator = "delivery_type"

delivery_type: Literal["use_permissions_transfer"] = Field(
"use_permissions_transfer", const=True
)

include_identities_stream: bool = Field(
title="Include Identity Stream",
description="This data can be used in downstream systems to recreate permission restrictions mirroring the original source",
default=True,
)
81 changes: 81 additions & 0 deletions airbyte_cdk/sources/streams/permissions/identities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import traceback
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional

from airbyte_protocol_dataclasses.models import SyncMode

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.checkpoint import Cursor
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

DEFAULT_IDENTITIES_STREAM_NAME = "identities"


class Identities(Stream, ABC):
"""
The identities stream. A full refresh stream to sync identities from a certain domain.
The load_identity_groups method manage the logic to get such data.
"""

IDENTITIES_STREAM_NAME = DEFAULT_IDENTITIES_STREAM_NAME

is_resumable = False

def __init__(
self,
catalog_schema: Optional[Mapping[str, Any]],
):
super().__init__()
self.catalog_schema = catalog_schema
self._cursor: MutableMapping[str, Any] = {}

@property
def state(self) -> MutableMapping[str, Any]:
return self._cursor

@state.setter
def state(self, value: MutableMapping[str, Any]) -> None:
"""State setter, accept state serialized by state getter."""
self._cursor = value

def read_records(
self,
sync_mode: SyncMode,
cursor_field: Optional[List[str]] = None,
stream_slice: Optional[Mapping[str, Any]] = None,
stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
try:
identity_groups = self.load_identity_groups()
for record in identity_groups:
yield stream_data_to_airbyte_message(self.name, record)
except AirbyteTracedException as exc:
# Re-raise the exception to stop the whole sync immediately as this is a fatal error
raise exc
except Exception as e:
yield AirbyteMessage(
type=MessageType.LOG,
log=AirbyteLogMessage(
level=Level.ERROR,
message=f"Error trying to read identities: {e} stream={self.name}",
stack_trace=traceback.format_exc(),
),
)

@abstractmethod
def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
raise NotImplementedError("Implement this method to read identity records")

@property
def name(self) -> str:
return self.IDENTITIES_STREAM_NAME

def get_cursor(self) -> Optional[Cursor]:
return None

0 comments on commit 7e4d73f

Please sign in to comment.