Skip to content

feat: Remove low-code cursor usage #62

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
46c4c88
Airbyte CDK: add should be synced
artem1205 Nov 18, 2024
32f6c3a
Merge remote-tracking branch 'origin/main' into artem1205/remove-low-…
artem1205 Nov 18, 2024
9c69e37
Airbyte CDK: fix per partition with global cursor should be synced
artem1205 Nov 18, 2024
1af1556
Airbyte CDK: add comment
artem1205 Nov 18, 2024
f278f7d
Airbyte CDK: ref tests to work with both concurrent and declarative c…
artem1205 Nov 18, 2024
95a31c0
Airbyte CDK: ref ClientSideIncrementalRecordFilterDecorator
artem1205 Nov 18, 2024
b488845
Airbyte CDK: fix mypy typing for reset
artem1205 Nov 19, 2024
32d2b28
CDK: fix test
artem1205 Nov 20, 2024
6842cc4
CDK: fix mypy
artem1205 Nov 20, 2024
0439a17
CDK: handle empty cursor
artem1205 Nov 25, 2024
3a2dfaf
CDK: add cursor test
artem1205 Nov 25, 2024
a714f87
CDK: merge declarative and concurrent Records
artem1205 Nov 26, 2024
858ce1e
CDK: add docstring
artem1205 Nov 27, 2024
74d7876
CDK: fix mypy
artem1205 Nov 27, 2024
27e0f41
CDK: fix mypy
artem1205 Nov 27, 2024
d4986f6
CDK: fix mypy
artem1205 Nov 27, 2024
3393a07
CDK: fix mypy
artem1205 Nov 27, 2024
96ca2a5
CDK: add comment
artem1205 Nov 28, 2024
411e413
Merge remote-tracking branch 'origin/main' into artem1205/remove-low-…
artem1205 Nov 28, 2024
e472905
CDK: sort imports
artem1205 Nov 28, 2024
00e50d8
CDK: fix PerPartitionWithGlobalCursor.should_be_synced
artem1205 Nov 28, 2024
d83fb25
CDK: add name to Record
artem1205 Nov 28, 2024
7ab7e06
CDK: set stream_name as required
artem1205 Nov 28, 2024
e7dcb55
CDK: remove deepcopy
artem1205 Nov 28, 2024
b529366
CDK: add sort option
artem1205 Nov 28, 2024
e62741f
CDK: log empty cursor field once per stream
artem1205 Nov 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
from airbyte_cdk.utils import AirbyteTracedException
Expand Down Expand Up @@ -147,11 +147,11 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
# AbstractStreams are expected to return data in the shape it should be emitted.
# Any transformation on the data should be done before reaching this point
message = stream_data_to_airbyte_message(
stream_name=record.partition.stream_name(),
stream_name=record.stream_name,
data_or_message=record.data,
is_file_transfer_message=record.is_file_transfer_message,
)
stream = self._stream_name_to_instance[record.partition.stream_name()]
stream = self._stream_name_to_instance[record.stream_name]

if message.type == MessageType.RECORD:
if self._record_counter[stream.name] == 0:
Expand Down
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/concurrent_source/concurrent_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.sources.streams.concurrent.partitions.types import (
PartitionCompleteSentinel,
QueueItem,
)
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger


Expand Down
54 changes: 6 additions & 48 deletions airbyte_cdk/sources/declarative/extractors/record_filter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import datetime
from dataclasses import InitVar, dataclass
from typing import Any, Iterable, Mapping, Optional, Union

Expand All @@ -11,7 +10,7 @@
PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState


@dataclass
Expand Down Expand Up @@ -68,37 +67,21 @@ def __init__(
self._date_time_based_cursor = date_time_based_cursor
self._substream_cursor = substream_cursor

@property
def _cursor_field(self) -> str:
return self._date_time_based_cursor.cursor_field.eval(self._date_time_based_cursor.config) # type: ignore # eval returns a string in this context

@property
def _start_date_from_config(self) -> datetime.datetime:
return self._date_time_based_cursor._start_datetime.get_datetime(
self._date_time_based_cursor.config
)

@property
def _end_datetime(self) -> datetime.datetime:
return self._date_time_based_cursor.select_best_end_datetime()

def filter_records(
self,
records: Iterable[Mapping[str, Any]],
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Iterable[Mapping[str, Any]]:
state_value = self._get_state_value(
stream_state, stream_slice or StreamSlice(partition={}, cursor_slice={})
)
filter_date: datetime.datetime = self._get_filter_date(state_value)
records = (
record
for record in records
if self._end_datetime
>= self._date_time_based_cursor.parse_date(record[self._cursor_field])
>= filter_date
if (self._substream_cursor or self._date_time_based_cursor).should_be_synced(
# Record is created on the fly to align with the cursor interface; the stream name is ignored as we don't need it here
# Record stream name is empty because it is not used during the filtering
Record(data=record, associated_slice=stream_slice, stream_name="")
)
)
if self.condition:
records = super().filter_records(
Expand All @@ -108,28 +91,3 @@ def filter_records(
next_page_token=next_page_token,
)
yield from records

def _get_state_value(
self, stream_state: StreamState, stream_slice: StreamSlice
) -> Optional[str]:
"""
Return cursor_value or None in case it was not found.
Cursor_value may be empty if:
1. It is an initial sync => no stream_state exist at all.
2. In Parent-child stream, and we already make initial sync, so stream_state is present.
During the second read, we receive one extra record from parent and therefore no stream_state for this record will be found.

:param StreamState stream_state: State
:param StreamSlice stream_slice: Current Stream slice
:return Optional[str]: cursor_value in case it was found, otherwise None.
"""
state = (self._substream_cursor or self._date_time_based_cursor).select_state(stream_slice)

return state.get(self._cursor_field) if state else None

def _get_filter_date(self, state_value: Optional[str]) -> datetime.datetime:
start_date_parsed = self._start_date_from_config
if state_value:
return max(start_date_parsed, self._date_time_based_cursor.parse_date(state_value))
else:
return start_date_parsed
35 changes: 31 additions & 4 deletions airbyte_cdk/sources/declarative/extractors/record_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, Optional
from typing import Any, Iterable, List, Mapping, Optional, Union

import requests

from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.models import SchemaNormalization
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
Expand Down Expand Up @@ -38,11 +39,34 @@ class RecordSelector(HttpSelector):
config: Config
parameters: InitVar[Mapping[str, Any]]
schema_normalization: TypeTransformer
name: str
_name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
record_filter: Optional[RecordFilter] = None
transformations: List[RecordTransformation] = field(default_factory=lambda: [])

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters
self._name = (
InterpolatedString(self._name, parameters=parameters)
if isinstance(self._name, str)
else self._name
)

@property # type: ignore
def name(self) -> str:
"""
:return: Stream name
"""
return (
str(self._name.eval(self.config))
if isinstance(self._name, InterpolatedString)
else self._name
)

@name.setter
def name(self, value: str) -> None:
if not isinstance(value, property):
self._name = value

def select_records(
self,
Expand Down Expand Up @@ -86,7 +110,7 @@ def filter_and_transform(
transformed_data = self._transform(filtered_data, stream_state, stream_slice)
normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema)
for data in normalized_data:
yield Record(data, stream_slice)
yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)

def _normalize_by_schema(
self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]
Expand Down Expand Up @@ -126,6 +150,9 @@ def _transform(
for record in records:
for transformation in self.transformations:
transformation.transform(
record, config=self.config, stream_state=stream_state, stream_slice=stream_slice
) # type: ignore # record has type Mapping[str, Any], but Dict[str, Any] expected
record, # type: ignore # record has type Mapping[str, Any], but Dict[str, Any] expected
config=self.config,
stream_state=stream_state,
stream_slice=stream_slice,
)
yield record
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,11 @@ def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
@staticmethod
def _convert_record_to_cursor_record(record: Record) -> Record:
return Record(
record.data,
StreamSlice(partition={}, cursor_slice=record.associated_slice.cursor_slice)
data=record.data,
stream_name=record.stream_name,
associated_slice=StreamSlice(
partition={}, cursor_slice=record.associated_slice.cursor_slice
)
if record.associated_slice
else None,
)
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,11 @@ def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
@staticmethod
def _convert_record_to_cursor_record(record: Record) -> Record:
return Record(
record.data,
StreamSlice(partition={}, cursor_slice=record.associated_slice.cursor_slice)
data=record.data,
stream_name=record.stream_name,
associated_slice=StreamSlice(
partition={}, cursor_slice=record.associated_slice.cursor_slice
)
if record.associated_slice
else None,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,7 @@ def get_request_body_json(
)

def should_be_synced(self, record: Record) -> bool:
return self._global_cursor.should_be_synced(
record
) or self._per_partition_cursor.should_be_synced(record)
return self._get_active_cursor().should_be_synced(record)

def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
return self._global_cursor.is_greater_than_or_equal(first, second)
Original file line number Diff line number Diff line change
Expand Up @@ -1781,6 +1781,7 @@ def create_record_selector(
self,
model: RecordSelectorModel,
config: Config,
name: str,
*,
transformations: List[RecordTransformation],
decoder: Optional[Decoder] = None,
Expand Down Expand Up @@ -1811,6 +1812,7 @@ def create_record_selector(

return RecordSelector(
extractor=extractor,
name=name,
config=config,
record_filter=record_filter,
transformations=transformations,
Expand Down Expand Up @@ -1881,6 +1883,7 @@ def create_simple_retriever(
)
record_selector = self._create_component_from_model(
model=model.record_selector,
name=name,
config=config,
decoder=decoder,
transformations=transformations,
Expand Down Expand Up @@ -2035,6 +2038,7 @@ def create_async_retriever(
requester=download_requester,
record_selector=RecordSelector(
extractor=ResponseToFileExtractor(),
name=name,
record_filter=None,
transformations=[],
schema_normalization=TypeTransformer(TransformConfig.NoTransform),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import (
PaginationStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
from airbyte_cdk.sources.types import Record


Expand All @@ -26,7 +27,11 @@ def is_met(self, record: Record) -> bool:


class CursorStopCondition(PaginationStopCondition):
def __init__(self, cursor: DeclarativeCursor):
def __init__(
self,
cursor: DeclarativeCursor
| ConcurrentCursor, # migrate to use both old and concurrent versions
):
self._cursor = cursor

def is_met(self, record: Record) -> bool:
Expand All @@ -47,8 +52,8 @@ def next_page_token(
return None
return self._delegate.next_page_token(response, last_page_size, last_record)

def reset(self) -> None:
self._delegate.reset()
def reset(self, reset_value: Optional[Any] = None) -> None:
self._delegate.reset(reset_value)

def get_page_size(self) -> Optional[int]:
return self._delegate.get_page_size()
Expand Down
15 changes: 11 additions & 4 deletions airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,9 @@ def _get_most_recent_record(
else:
return None

@staticmethod
def _extract_record(stream_data: StreamData, stream_slice: StreamSlice) -> Optional[Record]:
def _extract_record(
self, stream_data: StreamData, stream_slice: StreamSlice
) -> Optional[Record]:
"""
As we allow the output of _read_pages to be StreamData, it can be multiple things. Therefore, we need to filter out and normalize
to data to streamline the rest of the process.
Expand All @@ -478,9 +479,15 @@ def _extract_record(stream_data: StreamData, stream_slice: StreamSlice) -> Optio
# Record is not part of `StreamData` but is the most common implementation of `Mapping[str, Any]` which is part of `StreamData`
return stream_data
elif isinstance(stream_data, (dict, Mapping)):
return Record(dict(stream_data), stream_slice)
return Record(
data=dict(stream_data), associated_slice=stream_slice, stream_name=self.name
)
elif isinstance(stream_data, AirbyteMessage) and stream_data.record:
return Record(stream_data.record.data, stream_slice)
return Record(
data=stream_data.record.data, # type:ignore # AirbyteMessage always has record.data
associated_slice=stream_slice,
stream_name=self.name,
)
return None

# stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
from airbyte_cdk.sources.types import StreamSlice
from airbyte_cdk.sources.types import Record, StreamSlice
from airbyte_cdk.utils.slice_hasher import SliceHasher


Expand Down Expand Up @@ -59,7 +58,11 @@ def __init__(
def read(self) -> Iterable[Record]:
for stream_data in self._retriever.read_records(self._json_schema, self._stream_slice):
if isinstance(stream_data, Mapping):
yield Record(stream_data, self)
yield Record(
data=stream_data,
stream_name=self.stream_name(),
associated_slice=self._stream_slice,
)
else:
self._message_repository.emit_message(stream_data)

Expand Down
6 changes: 3 additions & 3 deletions airbyte_cdk/sources/file_based/stream/concurrent/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
)
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.sources.utils.slice_logger import SliceLogger

Expand Down Expand Up @@ -248,7 +248,7 @@ def read(self) -> Iterable[Record]:
self._stream.transformer.transform(
data_to_return, self._stream.get_json_schema()
)
yield Record(data_to_return, self)
yield Record(data=data_to_return, stream_name=self.stream_name())
elif (
isinstance(record_data, AirbyteMessage)
and record_data.type == Type.RECORD
Expand All @@ -266,7 +266,7 @@ def read(self) -> Iterable[Record]:
else:
yield Record(
data=record_message_data,
partition=self,
stream_name=self.stream_name(),
is_file_transfer_message=self._use_file_transfer(),
)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from airbyte_cdk.sources.file_based.types import StreamState
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from airbyte_cdk.sources.types import Record

if TYPE_CHECKING:
from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamPartition
Expand Down
Loading
Loading