diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
index 1dd5b962c..c92ffb150 100644
--- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
+++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
@@ -31,6 +31,9 @@
 from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
     DatetimeBasedCursor as DatetimeBasedCursorModel,
 )
+from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
+    IncrementingCountCursor as IncrementingCountCursorModel,
+)
 from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
     ModelToComponentFactory,
 )
@@ -222,7 +225,7 @@ def _group_streams(
                     and not incremental_sync_component_definition
                 )
 
-                if self._is_datetime_incremental_without_partition_routing(
+                if self._is_concurrent_cursor_incremental_without_partition_routing(
                     declarative_stream, incremental_sync_component_definition
                 ):
                     stream_state = self._connector_state_manager.get_stream_state(
@@ -254,15 +257,26 @@
                             stream_slicer=declarative_stream.retriever.stream_slicer,
                         )
                     else:
-                        cursor = (
-                            self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
+                        if (
+                            incremental_sync_component_definition
+                            and incremental_sync_component_definition.get("type")
+                            == IncrementingCountCursorModel.__name__
+                        ):
+                            cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor(
+                                model_type=IncrementingCountCursorModel,
+                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
+                                stream_name=declarative_stream.name,
+                                stream_namespace=declarative_stream.namespace,
+                                config=config or {},
+                            )
+                        else:
+                            cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
                                 model_type=DatetimeBasedCursorModel,
                                 component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
                                 stream_name=declarative_stream.name,
                                 stream_namespace=declarative_stream.namespace,
                                 config=config or {},
                             )
-                        )
                     partition_generator = StreamSlicerPartitionGenerator(
                         partition_factory=DeclarativePartitionFactory(
                             declarative_stream.name,
@@ -389,7 +403,7 @@
 
         return concurrent_streams, synchronous_streams
 
-    def _is_datetime_incremental_without_partition_routing(
+    def _is_concurrent_cursor_incremental_without_partition_routing(
        self,
        declarative_stream: DeclarativeStream,
        incremental_sync_component_definition: Mapping[str, Any] | None,
@@ -397,11 +411,18 @@
        return (
            incremental_sync_component_definition is not None
            and bool(incremental_sync_component_definition)
-            and incremental_sync_component_definition.get("type", "")
-            == DatetimeBasedCursorModel.__name__
+            and (
+                incremental_sync_component_definition.get("type", "")
+                in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__)
+            )
            and hasattr(declarative_stream.retriever, "stream_slicer")
            and (
                isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
+                # The IncrementingCountCursorModel is currently hardcoded to resolve to a
+                # DatetimeBasedCursor stub; add an isinstance check here if we ever create a
+                # declarative IncrementingCountCursor:
+                # or isinstance(
+                #     declarative_stream.retriever.stream_slicer, IncrementingCountCursor
+                # )
                or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
            )
        )
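Note: the grouping logic above selects the concurrent path for either cursor type by comparing the manifest's `type` string against the generated model class names. A minimal sketch of that check in isolation (the imports are the ones added in this file; the `definition` mapping is a hypothetical stand-in for `incremental_sync_component_definition`):

```python
# Sketch only, not part of the diff.
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DatetimeBasedCursor as DatetimeBasedCursorModel,
    IncrementingCountCursor as IncrementingCountCursorModel,
)

# Hypothetical incremental_sync component definition from a manifest.
definition = {"type": "IncrementingCountCursor", "cursor_field": "id"}

# The widened check accepts both cursor models by class name.
assert definition.get("type", "") in (
    DatetimeBasedCursorModel.__name__,
    IncrementingCountCursorModel.__name__,
)
```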
diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
index 5bd110c4e..be7224a63 100644
--- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
+++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -777,6 +777,44 @@ definitions:
       type:
         type: string
         enum: [LegacyToPerPartitionStateMigration]
+  IncrementingCountCursor:
+    title: Incrementing Count Cursor
+    description: Cursor that enables incremental syncs based on a continuously increasing integer.
+    type: object
+    required:
+      - type
+      - cursor_field
+    properties:
+      type:
+        type: string
+        enum: [IncrementingCountCursor]
+      cursor_field:
+        title: Cursor Field
+        description: The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. You can use a combination of Add Field and Remove Field transformations to move the nested field to the top.
+        type: string
+        interpolation_context:
+          - config
+        examples:
+          - "created_at"
+          - "{{ config['record_cursor'] }}"
+      start_value:
+        title: Start Value
+        description: The value that determines the earliest record that should be synced.
+        anyOf:
+          - type: string
+          - type: integer
+        interpolation_context:
+          - config
+        examples:
+          - 0
+          - "{{ config['start_value'] }}"
+      start_value_option:
+        title: Inject Start Value Into Outgoing HTTP Request
+        description: Optionally configures how the start value will be sent in requests to the source API.
+        "$ref": "#/definitions/RequestOption"
+      $parameters:
+        type: object
+        additionalProperties: true
   DatetimeBasedCursor:
     title: Datetime Based Cursor
     description: Cursor to provide incremental capabilities over datetime.
@@ -1318,6 +1356,7 @@ definitions:
         anyOf:
           - "$ref": "#/definitions/CustomIncrementalSync"
           - "$ref": "#/definitions/DatetimeBasedCursor"
+          - "$ref": "#/definitions/IncrementingCountCursor"
       name:
         title: Name
        description: The stream name.
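For reference, this is the shape of an `incremental_sync` block a manifest could declare against the schema definition above, written here as the equivalent Python mapping; all field values are hypothetical:

```python
# Hypothetical manifest fragment matching the IncrementingCountCursor schema above.
incremental_sync = {
    "type": "IncrementingCountCursor",
    "cursor_field": "id",
    # start_value supports config interpolation, or a plain integer such as 0.
    "start_value": "{{ config['start_value'] }}",
    # Optionally inject the start value into outgoing requests.
    "start_value_option": {
        "type": "RequestOption",
        "field_name": "since_id",
        "inject_into": "request_parameter",
    },
}
```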
diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
index 7ff18fa1d..c1e4d95c3 100644
--- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
+++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
@@ -1508,6 +1508,28 @@ class AuthFlow(BaseModel):
     oauth_config_specification: Optional[OAuthConfigSpecification] = None
 
 
+class IncrementingCountCursor(BaseModel):
+    type: Literal["IncrementingCountCursor"]
+    cursor_field: str = Field(
+        ...,
+        description="The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. You can use a combination of Add Field and Remove Field transformations to move the nested field to the top.",
+        examples=["created_at", "{{ config['record_cursor'] }}"],
+        title="Cursor Field",
+    )
+    start_value: Optional[Union[str, int]] = Field(
+        None,
+        description="The value that determines the earliest record that should be synced.",
+        examples=[0, "{{ config['start_value'] }}"],
+        title="Start Value",
+    )
+    start_value_option: Optional[RequestOption] = Field(
+        None,
+        description="Optionally configures how the start value will be sent in requests to the source API.",
+        title="Inject Start Value Into Outgoing HTTP Request",
+    )
+    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+
+
 class DatetimeBasedCursor(BaseModel):
     type: Literal["DatetimeBasedCursor"]
     clamping: Optional[Clamping] = Field(
@@ -1948,7 +1970,9 @@ class Config:
         description="Component used to coordinate how records are extracted across stream slices and request pages.",
         title="Retriever",
     )
-    incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field(
+    incremental_sync: Optional[
+        Union[CustomIncrementalSync, DatetimeBasedCursor, IncrementingCountCursor]
+    ] = Field(
         None,
         description="Component used to fetch data incrementally based on a time field in the data.",
         title="Incremental Sync",
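The generated model can be exercised on its own. A short sketch using `parse_obj`, the pydantic v1 entry point these generated models expose; the component definition is hypothetical:

```python
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    IncrementingCountCursor,
)

# Parse a hypothetical manifest component into the generated model.
model = IncrementingCountCursor.parse_obj(
    {"type": "IncrementingCountCursor", "cursor_field": "id", "start_value": 0}
)
assert model.cursor_field == "id"
assert model.start_value == 0
assert model.start_value_option is None  # optional request option was not declared
```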
diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
index 452c4e84a..2adb738fe 100644
--- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
+++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -245,6 +245,9 @@
 from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
     HttpResponseFilter as HttpResponseFilterModel,
 )
+from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
+    IncrementingCountCursor as IncrementingCountCursorModel,
+)
 from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
     InlineSchemaLoader as InlineSchemaLoaderModel,
 )
@@ -496,6 +499,9 @@
     CustomFormatConcurrentStreamStateConverter,
     DateTimeStreamStateConverter,
 )
+from airbyte_cdk.sources.streams.concurrent.state_converters.incrementing_count_stream_state_converter import (
+    IncrementingCountStreamStateConverter,
+)
 from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
 from airbyte_cdk.sources.types import Config
 from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
@@ -584,6 +590,7 @@ def _init_mappings(self) -> None:
             FlattenFieldsModel: self.create_flatten_fields,
             DpathFlattenFieldsModel: self.create_dpath_flatten_fields,
             IterableDecoderModel: self.create_iterable_decoder,
+            IncrementingCountCursorModel: self.create_incrementing_count_cursor,
             XmlDecoderModel: self.create_xml_decoder,
             JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
             DynamicSchemaLoaderModel: self.create_dynamic_schema_loader,
@@ -1189,6 +1196,70 @@ def create_concurrent_cursor_from_datetime_based_cursor(
             clamping_strategy=clamping_strategy,
         )
 
+    def create_concurrent_cursor_from_incrementing_count_cursor(
+        self,
+        model_type: Type[BaseModel],
+        component_definition: ComponentDefinition,
+        stream_name: str,
+        stream_namespace: Optional[str],
+        config: Config,
+        message_repository: Optional[MessageRepository] = None,
+        **kwargs: Any,
+    ) -> ConcurrentCursor:
+        # Per-partition incremental streams can dynamically create child cursors, which pass their
+        # current state via the stream_state keyword argument. Incremental syncs without parent
+        # streams use the incoming state and the connector_state_manager that is initialized when
+        # the component factory is created.
+        stream_state = (
+            self._connector_state_manager.get_stream_state(stream_name, stream_namespace)
+            if "stream_state" not in kwargs
+            else kwargs["stream_state"]
+        )
+
+        component_type = component_definition.get("type")
+        if component_definition.get("type") != model_type.__name__:
+            raise ValueError(
+                f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead"
+            )
+
+        incrementing_count_cursor_model = model_type.parse_obj(component_definition)
+
+        if not isinstance(incrementing_count_cursor_model, IncrementingCountCursorModel):
+            raise ValueError(
+                f"Expected {model_type.__name__} component, but received {incrementing_count_cursor_model.__class__.__name__}"
+            )
+
+        interpolated_start_value = (
+            InterpolatedString.create(
+                incrementing_count_cursor_model.start_value,  # type: ignore
+                parameters=incrementing_count_cursor_model.parameters or {},
+            )
+            if incrementing_count_cursor_model.start_value
+            else 0
+        )
+
+        interpolated_cursor_field = InterpolatedString.create(
+            incrementing_count_cursor_model.cursor_field,
+            parameters=incrementing_count_cursor_model.parameters or {},
+        )
+        cursor_field = CursorField(interpolated_cursor_field.eval(config=config))
+
+        connector_state_converter = IncrementingCountStreamStateConverter(
+            is_sequential_state=True,  # ConcurrentPerPartitionCursor only works with sequential state
+        )
+
+        return ConcurrentCursor(
+            stream_name=stream_name,
+            stream_namespace=stream_namespace,
+            stream_state=stream_state,
+            message_repository=message_repository or self._message_repository,
+            connector_state_manager=self._connector_state_manager,
+            connector_state_converter=connector_state_converter,
+            cursor_field=cursor_field,
+            slice_boundary_fields=None,
+            start=interpolated_start_value,  # type: ignore  # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
+            end_provider=connector_state_converter.get_end_provider(),  # type: ignore  # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
+        )
+
     def _assemble_weekday(self, weekday: str) -> Weekday:
         match weekday:
             case "MONDAY":
@@ -1622,6 +1693,31 @@ def create_declarative_stream(
                 config=config,
                 parameters=model.parameters or {},
             )
+        elif model.incremental_sync and isinstance(
+            model.incremental_sync, IncrementingCountCursorModel
+        ):
+            cursor_model: IncrementingCountCursorModel = model.incremental_sync  # type: ignore
+
+            start_time_option = (
+                self._create_component_from_model(
+                    cursor_model.start_value_option,  # type: ignore  # mypy still thinks cursor_model is of type DatetimeBasedCursor
+                    config,
+                    parameters=cursor_model.parameters or {},
+                )
+                if cursor_model.start_value_option  # type: ignore  # mypy still thinks cursor_model is of type DatetimeBasedCursor
+                else None
+            )
+
+            # The concurrent engine defaults the start/end fields on the slice to "start" and "end", but
+            # the default DatetimeBasedRequestOptionsProvider() sets them to start_time/end_time
+            partition_field_start = "start"
+
+            request_options_provider = DatetimeBasedRequestOptionsProvider(
+                start_time_option=start_time_option,
+                partition_field_start=partition_field_start,
+                config=config,
+                parameters=model.parameters or {},
+            )
         else:
             request_options_provider = None
@@ -2111,6 +2207,22 @@ def create_gzip_decoder(
             stream_response=False if self._emit_connector_builder_messages else True,
         )
 
+    @staticmethod
+    def create_incrementing_count_cursor(
+        model: IncrementingCountCursorModel, config: Config, **kwargs: Any
+    ) -> DatetimeBasedCursor:
+        # This should never be used at runtime, but we still parse models into components, so this
+        # stub is needed to pass those checks: there is no runtime implementation of an
+        # IncrementingCountCursor.
+        # A known and expected limitation of the stub is that a check run against a declared
+        # IncrementingCountCursor executes without the ConcurrentCursor.
+        return DatetimeBasedCursor(
+            cursor_field=model.cursor_field,
+            datetime_format="%Y-%m-%d",
+            start_datetime="2024-12-12",
+            config=config,
+            parameters={},
+        )
+
     @staticmethod
     def create_iterable_decoder(
         model: IterableDecoderModel, config: Config, **kwargs: Any
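A usage sketch for the new factory method, mirroring the call site in `_group_streams`. It assumes a `ModelToComponentFactory` constructed with defaults, as the unit tests do; the component definition and stream name are hypothetical:

```python
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    IncrementingCountCursor as IncrementingCountCursorModel,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)

factory = ModelToComponentFactory()
cursor = factory.create_concurrent_cursor_from_incrementing_count_cursor(
    model_type=IncrementingCountCursorModel,
    component_definition={
        "type": "IncrementingCountCursor",
        "cursor_field": "id",
        "start_value": 0,
    },
    stream_name="users",
    stream_namespace=None,
    config={},
)
assert cursor._start == 0  # start_value of 0 is falsy, so the default of 0 applies
assert cursor._end_provider() == float("inf")  # the integer converter's open-ended bound
```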
diff --git a/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py b/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py
index 987915317..ccff41ba7 100644
--- a/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py
+++ b/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py
@@ -4,7 +4,7 @@
 
 from abc import ABC, abstractmethod
 from enum import Enum
-from typing import TYPE_CHECKING, Any, List, MutableMapping, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Callable, List, MutableMapping, Optional, Tuple
 
 if TYPE_CHECKING:
     from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
@@ -12,6 +12,7 @@
 
 class ConcurrencyCompatibleStateType(Enum):
     date_range = "date-range"
+    integer = "integer"
 
 
 class AbstractStreamStateConverter(ABC):
diff --git a/airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py b/airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py
new file mode 100644
index 000000000..fecc984bc
--- /dev/null
+++ b/airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py
@@ -0,0 +1,92 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+from typing import Any, Callable, MutableMapping, Optional, Tuple
+
+from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
+from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
+    AbstractStreamStateConverter,
+    ConcurrencyCompatibleStateType,
+)
+
+
+class IncrementingCountStreamStateConverter(AbstractStreamStateConverter):
+    def _from_state_message(self, value: Any) -> Any:
+        return value
+
+    def _to_state_message(self, value: Any) -> Any:
+        return value
+
+    @classmethod
+    def get_end_provider(cls) -> Callable[[], float]:
+        return lambda: float("inf")
+
+    def convert_from_sequential_state(
+        self,
+        cursor_field: "CursorField",  # to deprecate as it is only needed for sequential state
+        stream_state: MutableMapping[str, Any],
+        start: Optional[Any],
+    ) -> Tuple[Any, MutableMapping[str, Any]]:
+        """
+        Convert the state message to the format required by the ConcurrentCursor.
+
+        e.g.
+        {
+            "state_type": ConcurrencyCompatibleStateType.integer.value,
+            "metadata": { … },
+            "slices": [
+                {"start": 0, "end": 12345, "most_recent_cursor_value": 12345},
+            ]
+        }
+        """
+        sync_start = self._get_sync_start(cursor_field, stream_state, start)
+        if self.is_state_message_compatible(stream_state):
+            return sync_start, stream_state
+
+        # Create a slice to represent the records synced during prior syncs.
+        # The slice spans from the configured start (or the sync start, when no start was
+        # configured) up to the sync start derived from the previous sync's state.
+        slices = [
+            {
+                self.START_KEY: start if start is not None else sync_start,
+                self.END_KEY: sync_start,  # this may not be relevant anymore
+                self.MOST_RECENT_RECORD_KEY: sync_start,
+            }
+        ]
+
+        return sync_start, {
+            "state_type": ConcurrencyCompatibleStateType.integer.value,
+            "slices": slices,
+            "legacy": stream_state,
+        }
+
+    def parse_value(self, value: int) -> int:
+        return value
+
+    @property
+    def zero_value(self) -> int:
+        return 0
+
+    def increment(self, value: int) -> int:
+        return value + 1
+
+    def output_format(self, value: int) -> int:
+        return value
+
+    def _get_sync_start(
+        self,
+        cursor_field: CursorField,
+        stream_state: MutableMapping[str, Any],
+        start: Optional[int],
+    ) -> int:
+        sync_start = start if start is not None else self.zero_value
+        prev_sync_low_water_mark: Optional[int] = (
+            stream_state[cursor_field.cursor_field_key]
+            if cursor_field.cursor_field_key in stream_state
+            else None
+        )
+        if prev_sync_low_water_mark and prev_sync_low_water_mark >= sync_start:
+            return prev_sync_low_water_mark
+        else:
+            return sync_start
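A worked example of the conversion path, consistent with the unit tests at the end of this diff: a legacy sequential state with a configured start of 0 becomes a single integer-typed slice, and the sync start is the previous sync's high-water mark:

```python
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.sources.streams.concurrent.state_converters.incrementing_count_stream_state_converter import (
    IncrementingCountStreamStateConverter,
)

converter = IncrementingCountStreamStateConverter(is_sequential_state=True)
sync_start, state = converter.convert_from_sequential_state(
    CursorField("id"), {"id": 12345}, 0
)

assert sync_start == 12345  # the prior high-water mark wins over the configured start of 0
assert state["state_type"] == "integer"
assert state["slices"] == [{"start": 0, "end": 12345, "most_recent_cursor_value": 12345}]
assert state["legacy"] == {"id": 12345}  # the original sequential state is preserved
```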
diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py
index 188256b10..45813c4f2 100644
--- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py
+++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py
@@ -4,6 +4,7 @@
 
 import copy
 import json
+import math
 from datetime import datetime, timedelta, timezone
 from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union
 from unittest.mock import patch
@@ -43,6 +44,9 @@
 from airbyte_cdk.sources.streams.checkpoint import Cursor
 from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
 from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
+from airbyte_cdk.sources.streams.concurrent.state_converters.incrementing_count_stream_state_converter import (
+    IncrementingCountStreamStateConverter,
+)
 from airbyte_cdk.sources.streams.core import StreamData
 from airbyte_cdk.sources.types import Record, StreamSlice
 from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
@@ -230,6 +234,16 @@
                 "inject_into": "request_parameter",
             },
         },
+        "incremental_counting_cursor": {
+            "type": "IncrementingCountCursor",
+            "cursor_field": "id",
+            "start_value": 0,
+            "start_value_option": {
+                "type": "RequestOption",
+                "field_name": "since_id",
+                "inject_into": "request_parameter",
+            },
+        },
         "base_stream": {"retriever": {"$ref": "#/definitions/retriever"}},
         "base_incremental_stream": {
             "retriever": {
                 "$ref": "#/definitions/retriever",
                 "requester": {"$ref": "#/definitions/requester"},
             },
             "incremental_sync": {"$ref": "#/definitions/incremental_cursor"},
         },
+        "base_incremental_counting_stream": {
+            "retriever": {
+                "$ref": "#/definitions/retriever",
+                "requester": {"$ref": "#/definitions/requester"},
+            },
+            "incremental_sync": {"$ref": "#/definitions/incremental_counting_cursor"},
+        },
         "party_members_stream": {
             "$ref": "#/definitions/base_incremental_stream",
             "retriever": {
@@ -527,6 +548,35 @@
             },
         },
+        "incremental_counting_stream": {
+            "$ref": "#/definitions/base_incremental_counting_stream",
+            "retriever": {
+                "$ref": "#/definitions/base_incremental_counting_stream/retriever",
+                "record_selector": {"$ref": "#/definitions/selector"},
+            },
+            "$parameters": {
+                "name": "incremental_counting_stream",
+                "primary_key": "id",
+                "path": "/party_members",
+            },
+            "schema_loader": {
+                "type": "InlineSchemaLoader",
+                "schema": {
+                    "$schema": "https://json-schema.org/draft-07/schema#",
+                    "type": "object",
+                    "properties": {
+                        "id": {
+                            "description": "The identifier",
+                            "type": ["null", "string"],
+                        },
+                        "name": {
+                            "description": "The name of the party member",
+                            "type": ["null", "string"],
+                        },
+                    },
+                },
+            },
+        },
     },
     "streams": [
         "#/definitions/party_members_stream",
@@ -536,6 +586,7 @@
         "#/definitions/arcana_personas_stream",
         "#/definitions/palace_enemies_stream",
         "#/definitions/async_job_stream",
+        "#/definitions/incremental_counting_stream",
     ],
     "check": {"stream_names": ["party_members", "locations"]},
     "concurrency_level": {
@@ -658,9 +709,9 @@ def test_group_streams():
     )
     concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG)
 
-    # 1 full refresh stream, 2 incremental streams, 1 substream w/o incremental, 1 list based substream w/o incremental
+    # 1 full refresh stream, 3 incremental streams, 1 substream w/o incremental, 1 list based substream w/o incremental
     # 1 async job stream, 1 substream w/ incremental
-    assert len(concurrent_streams) == 7
+    assert len(concurrent_streams) == 8
     (
         concurrent_stream_0,
         concurrent_stream_1,
         concurrent_stream_2,
         concurrent_stream_3,
         concurrent_stream_4,
         concurrent_stream_5,
         concurrent_stream_6,
+        concurrent_stream_7,
     ) = concurrent_streams
     assert isinstance(concurrent_stream_0, DefaultStream)
     assert concurrent_stream_0.name == "party_members"
@@ -684,6 +736,8 @@ def test_group_streams():
     assert concurrent_stream_5.name == "palace_enemies"
     assert isinstance(concurrent_stream_6, DefaultStream)
     assert concurrent_stream_6.name == "async_job_stream"
+    assert isinstance(concurrent_stream_7, DefaultStream)
+    assert concurrent_stream_7.name == "incremental_counting_stream"
 
 
 @freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc))
@@ -756,6 +810,20 @@ def test_create_concurrent_cursor():
         "state_type": "date-range",
     }
 
+    incremental_counting_stream = concurrent_streams[7]
+    assert isinstance(incremental_counting_stream, DefaultStream)
+    incremental_counting_cursor = incremental_counting_stream.cursor
+
+    assert isinstance(incremental_counting_cursor, ConcurrentCursor)
+    assert isinstance(
+        incremental_counting_cursor._connector_state_converter,
+        IncrementingCountStreamStateConverter,
+    )
+    assert incremental_counting_cursor._stream_name == "incremental_counting_stream"
+    assert incremental_counting_cursor._cursor_field.cursor_field_key == "id"
+    assert incremental_counting_cursor._start == 0
+    assert incremental_counting_cursor._end_provider() == math.inf
+
 
 def test_check():
     """
@@ -808,6 +876,7 @@ def test_discover():
         "arcana_personas",
         "palace_enemies",
         "async_job_stream",
+        "incremental_counting_stream",
     }
 
     source = ConcurrentDeclarativeSource(
diff --git a/unit_tests/sources/streams/concurrent/test_incrementing_count_state_converter.py b/unit_tests/sources/streams/concurrent/test_incrementing_count_state_converter.py
new file mode 100644
index 000000000..460d094be
--- /dev/null
+++ b/unit_tests/sources/streams/concurrent/test_incrementing_count_state_converter.py
@@ -0,0 +1,29 @@
+from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
+from airbyte_cdk.sources.streams.concurrent.state_converters.incrementing_count_stream_state_converter import (
+    IncrementingCountStreamStateConverter,
+)
+
+
+def test_convert_from_sequential_state():
+    converter = IncrementingCountStreamStateConverter(
+        is_sequential_state=True,
+    )
+
+    _, conversion = converter.convert_from_sequential_state(CursorField("id"), {"id": 12345}, 0)
+
+    assert conversion["state_type"] == "integer"
+    assert conversion["legacy"] == {"id": 12345}
+    assert len(conversion["slices"]) == 1
+    assert conversion["slices"][0] == {"end": 12345, "most_recent_cursor_value": 12345, "start": 0}
+
+
+def test_convert_to_sequential_state():
+    converter = IncrementingCountStreamStateConverter(
+        is_sequential_state=True,
+    )
+    concurrent_state = {
+        "legacy": {"id": 12345},
+        "slices": [{"end": 12345, "most_recent_cursor_value": 12345, "start": 0}],
+        "state_type": "integer",
+    }
+    assert converter.convert_to_state_message(CursorField("id"), concurrent_state) == {"id": 12345}
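For completeness, a sketch of the converter's primitive helpers in isolation; the expected values follow directly from the implementation above:

```python
from airbyte_cdk.sources.streams.concurrent.state_converters.incrementing_count_stream_state_converter import (
    IncrementingCountStreamStateConverter,
)

converter = IncrementingCountStreamStateConverter(is_sequential_state=True)
assert converter.zero_value == 0          # default sync start when no state or start exists
assert converter.increment(41) == 42      # integer cursors advance by one
assert converter.parse_value(7) == 7      # values pass through unchanged
assert converter.get_end_provider()() == float("inf")  # the range is open-ended
```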