From c673c2c9b1edf69a3aad0e1201e4df296f760403 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Fri, 7 Feb 2025 17:27:14 -0800 Subject: [PATCH 01/18] small poc of the new IncrementingCountCursor that reuses the ConcurrentCursor implementation --- .../concurrent_declarative_source.py | 32 ++++- .../declarative_component_schema.yaml | 39 ++++++ .../models/declarative_component_schema.py | 26 +++- .../parsers/model_to_component_factory.py | 118 ++++++++++++++++++ .../abstract_stream_state_converter.py | 3 +- ...crementing_count_stream_state_converter.py | 92 ++++++++++++++ 6 files changed, 303 insertions(+), 7 deletions(-) create mode 100644 airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index d4ecc0084..73b35c7cd 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -32,6 +32,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( DatetimeBasedCursor as DatetimeBasedCursorModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + IncrementingCountCursor as IncrementingCountCursorModel, +) from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( ModelToComponentFactory, ) @@ -249,15 +252,27 @@ def _group_streams( stream_slicer=declarative_stream.retriever.stream_slicer, ) else: - cursor = ( - self._constructor.create_concurrent_cursor_from_datetime_based_cursor( + cursor: ConcurrentCursor + if ( + incremental_sync_component_definition.get("type") + == IncrementingCountCursorModel.__name__ + ): + cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor( + model_type=IncrementingCountCursorModel, + component_definition=incremental_sync_component_definition, + # type: ignore # Not None because of the if condition above + stream_name=declarative_stream.name, + stream_namespace=declarative_stream.namespace, + config=config or {}, + ) + else: + cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor( model_type=DatetimeBasedCursorModel, component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above stream_name=declarative_stream.name, stream_namespace=declarative_stream.namespace, config=config or {}, ) - ) partition_generator = StreamSlicerPartitionGenerator( partition_factory=DeclarativePartitionFactory( declarative_stream.name, @@ -385,14 +400,21 @@ def _is_datetime_incremental_without_partition_routing( return ( incremental_sync_component_definition is not None and bool(incremental_sync_component_definition) - and incremental_sync_component_definition.get("type", "") - == DatetimeBasedCursorModel.__name__ + and ( + incremental_sync_component_definition.get("type", "") + == DatetimeBasedCursorModel.__name__ + or incremental_sync_component_definition.get("type", "") + == IncrementingCountCursorModel.__name__ + ) and self._stream_supports_concurrent_partition_processing( declarative_stream=declarative_stream ) and hasattr(declarative_stream.retriever, "stream_slicer") and ( isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor) + or isinstance( + declarative_stream.retriever.stream_slicer, IncrementingCountCursorModel + ) or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter) ) ) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 072a1efcd..ccaee9d3c 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -776,6 +776,44 @@ definitions: type: type: string enum: [LegacyToPerPartitionStateMigration] + IncrementingCountCursor: + title: Incrementing Count Cursor + description: Cursor that allows for incremental sync according to a continuously increasing number + type: object + required: + - type + - cursor_field + properties: + type: + type: string + enum: [IncrementingCountCursor] + cursor_field: + title: Cursor Field + description: The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. You can use a combination of Add Field and Remove Field transformations to move the nested field to the top. + type: string + interpolation_context: + - config + examples: + - "created_at" + - "{{ config['record_cursor'] }}" + start_value: + title: Start Value + description: The value that determines the earliest record that should be synced. + anyOf: + - type: string + - type: integer + interpolation_context: + - config + examples: + - 0 + - "{{ config['start_value'] }}" + start_value_option: + title: Inject Start Value Into Outgoing HTTP Request + description: Optionally configures how the start datetime will be sent in requests to the source API. + "$ref": "#/definitions/RequestOption" + $parameters: + type: object + additionalProperties: true DatetimeBasedCursor: title: Datetime Based Cursor description: Cursor to provide incremental capabilities over datetime. @@ -1317,6 +1355,7 @@ definitions: anyOf: - "$ref": "#/definitions/CustomIncrementalSync" - "$ref": "#/definitions/DatetimeBasedCursor" + - "$ref": "#/definitions/IncrementingCountCursor" name: title: Name description: The stream name. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index fe29cee2c..8e87fa55d 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1485,6 +1485,28 @@ class AuthFlow(BaseModel): oauth_config_specification: Optional[OAuthConfigSpecification] = None +class IncrementingCountCursor(BaseModel): + type: Literal["IncrementingCountCursor"] + cursor_field: str = Field( + ..., + description="The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. You can use a combination of Add Field and Remove Field transformations to move the nested field to the top.", + examples=["created_at", "{{ config['record_cursor'] }}"], + title="Cursor Field", + ) + start_value: Optional[Union[str, int]] = Field( + None, + description="The value that determines the earliest record that should be synced.", + examples=[0, "{{ config['start_value'] }}"], + title="Start Value", + ) + start_value_option: Optional[RequestOption] = Field( + None, + description="Optionally configures how the start datetime will be sent in requests to the source API.", + title="Inject Start Value Into Outgoing HTTP Request", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class DatetimeBasedCursor(BaseModel): type: Literal["DatetimeBasedCursor"] clamping: Optional[Clamping] = Field( @@ -1846,7 +1868,9 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( + incremental_sync: Optional[ + Union[CustomIncrementalSync, DatetimeBasedCursor, IncrementingCountCursor] + ] = Field( None, description="Component used to fetch data incrementally based on a time field in the data.", title="Incremental Sync", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index a664b8530..45286aaab 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -244,6 +244,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpResponseFilter as HttpResponseFilterModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + IncrementingCountCursor as IncrementingCountCursorModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( InlineSchemaLoader as InlineSchemaLoaderModel, ) @@ -483,6 +486,9 @@ CustomFormatConcurrentStreamStateConverter, DateTimeStreamStateConverter, ) +from airbyte_cdk.sources.streams.concurrent.state_converters.incrementing_count_stream_state_converter import ( + IncrementingCountStreamStateConverter, +) from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction from airbyte_cdk.sources.types import Config from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer @@ -573,6 +579,7 @@ def _init_mappings(self) -> None: FlattenFieldsModel: self.create_flatten_fields, DpathFlattenFieldsModel: self.create_dpath_flatten_fields, IterableDecoderModel: self.create_iterable_decoder, + IncrementingCountCursorModel: self.create_incrementing_count_cursor, XmlDecoderModel: self.create_xml_decoder, JsonFileSchemaLoaderModel: self.create_json_file_schema_loader, DynamicSchemaLoaderModel: self.create_dynamic_schema_loader, @@ -1159,6 +1166,77 @@ def create_concurrent_cursor_from_datetime_based_cursor( clamping_strategy=clamping_strategy, ) + def create_concurrent_cursor_from_incrementing_count_cursor( + self, + model_type: Type[BaseModel], + component_definition: ComponentDefinition, + stream_name: str, + stream_namespace: Optional[str], + config: Config, + message_repository: Optional[MessageRepository] = None, + **kwargs: Any, + ) -> ConcurrentCursor: + # Per-partition incremental streams can dynamically create child cursors which will pass their current + # state via the stream_state keyword argument. Incremental syncs without parent streams use the + # incoming state and connector_state_manager that is initialized when the component factory is created + stream_state = ( + self._connector_state_manager.get_stream_state(stream_name, stream_namespace) + if "stream_state" not in kwargs + else kwargs["stream_state"] + ) + + component_type = component_definition.get("type") + if component_definition.get("type") != model_type.__name__: + raise ValueError( + f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead" + ) + + incrementing_count_cursor_model = model_type.parse_obj(component_definition) + + if not isinstance(incrementing_count_cursor_model, IncrementingCountCursorModel): + raise ValueError( + f"Expected {model_type.__name__} component, but received {incrementing_count_cursor_model.__class__.__name__}" + ) + + interpolated_start_value = ( + InterpolatedString.create( + incrementing_count_cursor_model.start_value, + parameters=incrementing_count_cursor_model.parameters or {}, + ) + if incrementing_count_cursor_model.start_value + else 0 + ) + + interpolated_cursor_field = InterpolatedString.create( + incrementing_count_cursor_model.cursor_field, + parameters=incrementing_count_cursor_model.parameters or {}, + ) + cursor_field = CursorField(interpolated_cursor_field.eval(config=config)) + + # todo brian: what do we do about the state converter, is there a way to signal valid integer and what + # should the end provider be when the end time is an unknown uncapped value? + + # hmmmmmm what if we create a new uncapped_end_date_provider which gives back infinite and that indicates + # to the concurrent cursor to return an empty slice interval and we create a connector_state_converter + # that basically just returns an integer + + connector_state_converter = IncrementingCountStreamStateConverter( + is_sequential_state=True, # ConcurrentPerPartitionCursor only works with sequential state + ) + + return ConcurrentCursor( + stream_name=stream_name, + stream_namespace=stream_namespace, + stream_state=stream_state, + message_repository=message_repository or self._message_repository, + connector_state_manager=self._connector_state_manager, + connector_state_converter=connector_state_converter, + cursor_field=cursor_field, + slice_boundary_fields=None, + start=interpolated_start_value, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_provider=connector_state_converter.get_end_provider(), # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + ) + def _assemble_weekday(self, weekday: str) -> Weekday: match weekday: case "MONDAY": @@ -1589,6 +1667,31 @@ def create_declarative_stream( config=config, parameters=model.parameters or {}, ) + elif model.incremental_sync and isinstance( + model.incremental_sync, IncrementingCountCursorModel + ): + cursor_model = model.incremental_sync + + start_time_option = ( + self._create_component_from_model( + cursor_model.start_value_option, + config, + parameters=cursor_model.parameters or {}, + ) + if cursor_model.start_value_option + else None + ) + + # The concurrent engine defaults the start/end fields on the slice to "start" and "end", but + # the default DatetimeBasedRequestOptionsProvider() sets them to start_time/end_time + partition_field_start = "start" + + request_options_provider = DatetimeBasedRequestOptionsProvider( + start_time_option=start_time_option, + partition_field_start=partition_field_start, + config=config, + parameters=model.parameters or {}, + ) else: request_options_provider = None @@ -2071,6 +2174,21 @@ def create_json_line_parser( ) -> JsonLineParser: return JsonLineParser(encoding=model.encoding) + @staticmethod + def create_incrementing_count_cursor( + model: IncrementingCountCursorModel, config: Config, **kwargs: Any + ) -> DatetimeBasedCursor: + # This should not actually get used anywhere at runtime, but needed to add this to pass checks since + # we still parse models into components. The issue is that there's no runtime implementation of a + # IncrementingCountCursor. Maybe this should just be a pass or returns None? + return DatetimeBasedCursor( + cursor_field=model.cursor_field, + datetime_format="%Y-%m-%d", + start_datetime="2024-12-12", + config=config, + parameters={}, + ) + @staticmethod def create_iterable_decoder( model: IterableDecoderModel, config: Config, **kwargs: Any diff --git a/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py b/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py index 987915317..ccff41ba7 100644 --- a/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py +++ b/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py @@ -4,7 +4,7 @@ from abc import ABC, abstractmethod from enum import Enum -from typing import TYPE_CHECKING, Any, List, MutableMapping, Optional, Tuple +from typing import TYPE_CHECKING, Any, Callable, List, MutableMapping, Optional, Tuple if TYPE_CHECKING: from airbyte_cdk.sources.streams.concurrent.cursor import CursorField @@ -12,6 +12,7 @@ class ConcurrencyCompatibleStateType(Enum): date_range = "date-range" + integer = "integer" class AbstractStreamStateConverter(ABC): diff --git a/airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py b/airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py new file mode 100644 index 000000000..06cd5321e --- /dev/null +++ b/airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py @@ -0,0 +1,92 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from typing import Any, Callable, MutableMapping, Optional, Tuple + +from airbyte_cdk.sources.streams.concurrent.cursor import CursorField +from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import ( + AbstractStreamStateConverter, + ConcurrencyCompatibleStateType, +) + + +class IncrementingCountStreamStateConverter(AbstractStreamStateConverter): + def _from_state_message(self, value: Any) -> Any: + return value + + def _to_state_message(self, value: Any) -> Any: + return value + + @classmethod + def get_end_provider(cls) -> Callable[[], float]: + return lambda: float("inf") # i sort of hate that this is a float, shakes fist at python + + def convert_from_sequential_state( + self, + cursor_field: "CursorField", # to deprecate as it is only needed for sequential state + stream_state: MutableMapping[str, Any], + start: Optional[Any], + ) -> Tuple[Any, MutableMapping[str, Any]]: + """ + Convert the state message to the format required by the ConcurrentCursor. + + e.g. + { + "state_type": ConcurrencyCompatibleStateType.date_range.value, + "metadata": { … }, + "slices": [ + {"start": "10", "end": "2021-01-18T21:18:20.000+00:00"}, + ] + } + """ + sync_start = self._get_sync_start(cursor_field, stream_state, start) + if self.is_state_message_compatible(stream_state): + return sync_start, stream_state + + # Create a slice to represent the records synced during prior syncs. + # The start and end are the same to avoid confusion as to whether the records for this slice + # were actually synced + slices = [ + { + self.START_KEY: start if start is not None else sync_start, + self.END_KEY: sync_start, # this may not be relevant anymore + self.MOST_RECENT_RECORD_KEY: sync_start, + } + ] + + return sync_start, { + "state_type": ConcurrencyCompatibleStateType.integer.value, + "slices": slices, + "legacy": stream_state, + } + + def parse_value(self, value: int) -> int: # should we use a string which is "higher fidelity + return value + + @property + def zero_value(self) -> int: + return 0 + + def increment(self, value: int) -> int: + return value + 1 + + def output_format(self, value: int) -> int: + return value + + def _get_sync_start( + self, + cursor_field: CursorField, + stream_state: MutableMapping[str, Any], + start: Optional[int], + ) -> int: + sync_start = start if start is not None else self.zero_value + prev_sync_low_water_mark = ( + stream_state[cursor_field.cursor_field_key] + if cursor_field.cursor_field_key in stream_state + else None + ) + if prev_sync_low_water_mark and prev_sync_low_water_mark >= sync_start: + return prev_sync_low_water_mark + else: + return sync_start From cfe98955a9609e1006ae3ec6a50892166a5a0f9a Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Tue, 18 Feb 2025 18:20:44 +0100 Subject: [PATCH 02/18] CDK: ref add comment Signed-off-by: Artem Inzhyyants --- .../declarative/concurrent_declarative_source.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 73b35c7cd..b5588b304 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -402,9 +402,7 @@ def _is_datetime_incremental_without_partition_routing( and bool(incremental_sync_component_definition) and ( incremental_sync_component_definition.get("type", "") - == DatetimeBasedCursorModel.__name__ - or incremental_sync_component_definition.get("type", "") - == IncrementingCountCursorModel.__name__ + in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__) ) and self._stream_supports_concurrent_partition_processing( declarative_stream=declarative_stream @@ -412,9 +410,11 @@ def _is_datetime_incremental_without_partition_routing( and hasattr(declarative_stream.retriever, "stream_slicer") and ( isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor) - or isinstance( - declarative_stream.retriever.stream_slicer, IncrementingCountCursorModel - ) + # IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor + # add isntance check here if we want to have a IncrementingCountCursor + # or isinstance( + # declarative_stream.retriever.stream_slicer, IncrementingCountCursor + # ) or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter) ) ) From e8765ebdd82e23acd32871d1b6960c2fafdd0358 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Tue, 18 Feb 2025 18:20:56 +0100 Subject: [PATCH 03/18] CDK: add test Signed-off-by: Artem Inzhyyants --- ...test_incrementing_count_state_converter.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 unit_tests/sources/streams/concurrent/test_incrementing_count_state_converter.py diff --git a/unit_tests/sources/streams/concurrent/test_incrementing_count_state_converter.py b/unit_tests/sources/streams/concurrent/test_incrementing_count_state_converter.py new file mode 100644 index 000000000..460d094be --- /dev/null +++ b/unit_tests/sources/streams/concurrent/test_incrementing_count_state_converter.py @@ -0,0 +1,29 @@ +from airbyte_cdk.sources.streams.concurrent.cursor import CursorField +from airbyte_cdk.sources.streams.concurrent.state_converters.incrementing_count_stream_state_converter import ( + IncrementingCountStreamStateConverter, +) + + +def test_convert_from_sequential_state(): + converter = IncrementingCountStreamStateConverter( + is_sequential_state=True, + ) + + _, conversion = converter.convert_from_sequential_state(CursorField("id"), {"id": 12345}, 0) + + assert conversion["state_type"] == "integer" + assert conversion["legacy"] == {"id": 12345} + assert len(conversion["slices"]) == 1 + assert conversion["slices"][0] == {"end": 12345, "most_recent_cursor_value": 12345, "start": 0} + + +def test_convert_to_sequential_state(): + converter = IncrementingCountStreamStateConverter( + is_sequential_state=True, + ) + concurrent_state = { + "legacy": {"id": 12345}, + "slices": [{"end": 12345, "most_recent_cursor_value": 12345, "start": 0}], + "state_type": "integer", + } + assert converter.convert_to_state_message(CursorField("id"), concurrent_state) == {"id": 12345} From 642e856d8112a831b8feae35a549c694b83f2537 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Wed, 19 Feb 2025 16:26:52 +0100 Subject: [PATCH 04/18] CDK: ref Signed-off-by: Artem Inzhyyants --- .../incrementing_count_stream_state_converter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py b/airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py index 06cd5321e..a392113b1 100644 --- a/airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py +++ b/airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py @@ -20,7 +20,7 @@ def _to_state_message(self, value: Any) -> Any: @classmethod def get_end_provider(cls) -> Callable[[], float]: - return lambda: float("inf") # i sort of hate that this is a float, shakes fist at python + return lambda: float("inf") def convert_from_sequential_state( self, @@ -61,7 +61,7 @@ def convert_from_sequential_state( "legacy": stream_state, } - def parse_value(self, value: int) -> int: # should we use a string which is "higher fidelity + def parse_value(self, value: int) -> int: return value @property From 8e8182590174ceddba757734f79d4f1341129837 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 21 Feb 2025 17:45:28 +0100 Subject: [PATCH 05/18] CDK: add comment Signed-off-by: Artem Inzhyyants --- .../sources/declarative/parsers/model_to_component_factory.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 45286aaab..977c3bb50 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2180,7 +2180,8 @@ def create_incrementing_count_cursor( ) -> DatetimeBasedCursor: # This should not actually get used anywhere at runtime, but needed to add this to pass checks since # we still parse models into components. The issue is that there's no runtime implementation of a - # IncrementingCountCursor. Maybe this should just be a pass or returns None? + # IncrementingCountCursor. + # A known and expected issue with this stub is running a check with the declared IncrementingCountCursor because it is run without ConcurrentCursor. return DatetimeBasedCursor( cursor_field=model.cursor_field, datetime_format="%Y-%m-%d", From 8a706407978fae55b7c9392ff6805cf62a86c2b2 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 21 Feb 2025 18:42:15 +0100 Subject: [PATCH 06/18] CDK: ref Signed-off-by: Artem Inzhyyants --- .../sources/declarative/concurrent_declarative_source.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 19b46df28..4d8d63802 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -416,9 +416,6 @@ def _is_datetime_incremental_without_partition_routing( incremental_sync_component_definition.get("type", "") in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__) ) - and self._stream_supports_concurrent_partition_processing( - declarative_stream=declarative_stream - ) and hasattr(declarative_stream.retriever, "stream_slicer") and ( isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor) From bc31f47c2edc593e86ef17d411eb37a678077074 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 21 Feb 2025 19:14:39 +0100 Subject: [PATCH 07/18] CDK: fix mypy Signed-off-by: Artem Inzhyyants --- .../sources/declarative/concurrent_declarative_source.py | 8 ++++---- .../incrementing_count_stream_state_converter.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 4d8d63802..5006c9d81 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -257,15 +257,15 @@ def _group_streams( stream_slicer=declarative_stream.retriever.stream_slicer, ) else: - cursor: ConcurrentCursor + # cursor: ConcurrentCursor if ( - incremental_sync_component_definition.get("type") + incremental_sync_component_definition + and incremental_sync_component_definition.get("type") == IncrementingCountCursorModel.__name__ ): cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor( model_type=IncrementingCountCursorModel, - component_definition=incremental_sync_component_definition, - # type: ignore # Not None because of the if condition above + component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above stream_name=declarative_stream.name, stream_namespace=declarative_stream.namespace, config=config or {}, diff --git a/airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py b/airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py index a392113b1..fecc984bc 100644 --- a/airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py +++ b/airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py @@ -81,7 +81,7 @@ def _get_sync_start( start: Optional[int], ) -> int: sync_start = start if start is not None else self.zero_value - prev_sync_low_water_mark = ( + prev_sync_low_water_mark: Optional[int] = ( stream_state[cursor_field.cursor_field_key] if cursor_field.cursor_field_key in stream_state else None From 1d862c17738bf25b824dcf854acf96037f56582f Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 24 Feb 2025 15:07:58 +0100 Subject: [PATCH 08/18] CDK: fix mypy Signed-off-by: Artem Inzhyyants --- .../declarative/parsers/model_to_component_factory.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 55598f30b..16922798e 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1230,7 +1230,7 @@ def create_concurrent_cursor_from_incrementing_count_cursor( interpolated_start_value = ( InterpolatedString.create( - incrementing_count_cursor_model.start_value, + incrementing_count_cursor_model.start_value, # type: ignore parameters=incrementing_count_cursor_model.parameters or {}, ) if incrementing_count_cursor_model.start_value @@ -1703,15 +1703,15 @@ def create_declarative_stream( elif model.incremental_sync and isinstance( model.incremental_sync, IncrementingCountCursorModel ): - cursor_model = model.incremental_sync + cursor_model: IncrementingCountCursorModel = model.incremental_sync # type: ignore start_time_option = ( self._create_component_from_model( - cursor_model.start_value_option, + cursor_model.start_value_option, # type: ignore: mypy still thinks cursor_model of type DatetimeBasedCursor config, parameters=cursor_model.parameters or {}, ) - if cursor_model.start_value_option + if cursor_model.start_value_option # type: ignore: mypy still thinks cursor_model of type DatetimeBasedCursor else None ) From d6eae1490cb869e1452fc287a45b61d13ec79d69 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 24 Feb 2025 15:34:13 +0100 Subject: [PATCH 09/18] CDK: fix mypy Signed-off-by: Artem Inzhyyants --- .../sources/declarative/parsers/model_to_component_factory.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 16922798e..91e98e3f5 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1707,11 +1707,11 @@ def create_declarative_stream( start_time_option = ( self._create_component_from_model( - cursor_model.start_value_option, # type: ignore: mypy still thinks cursor_model of type DatetimeBasedCursor + cursor_model.start_value_option, # type: ignore # mypy still thinks cursor_model of type DatetimeBasedCursor config, parameters=cursor_model.parameters or {}, ) - if cursor_model.start_value_option # type: ignore: mypy still thinks cursor_model of type DatetimeBasedCursor + if cursor_model.start_value_option # type: ignore # mypy still thinks cursor_model of type DatetimeBasedCursor else None ) From 710698ea3d6eb594b3b90c3fd8e812a3d51b5c09 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 24 Feb 2025 18:14:30 +0100 Subject: [PATCH 10/18] CDK: ref Signed-off-by: Artem Inzhyyants --- .../sources/declarative/concurrent_declarative_source.py | 1 - .../declarative/parsers/model_to_component_factory.py | 7 ------- 2 files changed, 8 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 5006c9d81..3bfaa69cd 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -257,7 +257,6 @@ def _group_streams( stream_slicer=declarative_stream.retriever.stream_slicer, ) else: - # cursor: ConcurrentCursor if ( incremental_sync_component_definition and incremental_sync_component_definition.get("type") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 91e98e3f5..47cd00f42 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1243,13 +1243,6 @@ def create_concurrent_cursor_from_incrementing_count_cursor( ) cursor_field = CursorField(interpolated_cursor_field.eval(config=config)) - # todo brian: what do we do about the state converter, is there a way to signal valid integer and what - # should the end provider be when the end time is an unknown uncapped value? - - # hmmmmmm what if we create a new uncapped_end_date_provider which gives back infinite and that indicates - # to the concurrent cursor to return an empty slice interval and we create a connector_state_converter - # that basically just returns an integer - connector_state_converter = IncrementingCountStreamStateConverter( is_sequential_state=True, # ConcurrentPerPartitionCursor only works with sequential state ) From 394dc6bbad172b1925a300ce73d93662aa99a3c8 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Wed, 26 Feb 2025 14:34:32 +0100 Subject: [PATCH 11/18] CDK: rename method Signed-off-by: Artem Inzhyyants --- .../sources/declarative/concurrent_declarative_source.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 3bfaa69cd..c92ffb150 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -225,7 +225,7 @@ def _group_streams( and not incremental_sync_component_definition ) - if self._is_datetime_incremental_without_partition_routing( + if self._is_concurrent_cursor_incremental_without_partition_routing( declarative_stream, incremental_sync_component_definition ): stream_state = self._connector_state_manager.get_stream_state( @@ -403,7 +403,7 @@ def _group_streams( return concurrent_streams, synchronous_streams - def _is_datetime_incremental_without_partition_routing( + def _is_concurrent_cursor_incremental_without_partition_routing( self, declarative_stream: DeclarativeStream, incremental_sync_component_definition: Mapping[str, Any] | None, @@ -419,7 +419,7 @@ def _is_datetime_incremental_without_partition_routing( and ( isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor) # IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor - # add isntance check here if we want to have a IncrementingCountCursor + # add isintance check here if we want to create a Declarative IncrementingCountCursor # or isinstance( # declarative_stream.retriever.stream_slicer, IncrementingCountCursor # ) From cf170d5ff6f4d83c78b26d333c8c9aaf4da48732 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Wed, 26 Feb 2025 14:52:12 +0100 Subject: [PATCH 12/18] CDK: ref Signed-off-by: Artem Inzhyyants --- .../sources/declarative/declarative_component_schema.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 7e6af7a60..d2a68d8d1 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -779,7 +779,7 @@ definitions: enum: [LegacyToPerPartitionStateMigration] IncrementingCountCursor: title: Incrementing Count Cursor - description: Cursor that allows for incremental sync according to a continuously increasing number + description: Cursor that allows for incremental sync according to a continuously increasing integer. type: object required: - type From d4f5d37665daaafa510830d8545cf6113ec181e6 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 27 Feb 2025 11:27:35 +0100 Subject: [PATCH 13/18] CDK: ref Signed-off-by: Artem Inzhyyants --- .../sources/declarative/declarative_component_schema.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index d2a68d8d1..be7224a63 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -810,7 +810,7 @@ definitions: - "{{ config['start_value'] }}" start_value_option: title: Inject Start Value Into Outgoing HTTP Request - description: Optionally configures how the start datetime will be sent in requests to the source API. + description: Optionally configures how the start value will be sent in requests to the source API. "$ref": "#/definitions/RequestOption" $parameters: type: object From 0cddc3a4069f44468aaa99a88fc49b0b6539544a Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 27 Feb 2025 11:46:14 +0100 Subject: [PATCH 14/18] CDK: ref Signed-off-by: Artem Inzhyyants --- .../sources/declarative/models/declarative_component_schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 3203f3821..c1e4d95c3 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1524,7 +1524,7 @@ class IncrementingCountCursor(BaseModel): ) start_value_option: Optional[RequestOption] = Field( None, - description="Optionally configures how the start datetime will be sent in requests to the source API.", + description="Optionally configures how the start value will be sent in requests to the source API.", title="Inject Start Value Into Outgoing HTTP Request", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") From 263e6626b79d79ad491ab3ca3b6e506c33a59b08 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 27 Feb 2025 18:18:32 +0100 Subject: [PATCH 15/18] CDK: add test for concurrent sate cursor Signed-off-by: Artem Inzhyyants --- .../test_concurrent_declarative_source.py | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 188256b10..db016ffc6 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -4,6 +4,7 @@ import copy import json +import math from datetime import datetime, timedelta, timezone from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union from unittest.mock import patch @@ -43,6 +44,9 @@ from airbyte_cdk.sources.streams.checkpoint import Cursor from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream +from airbyte_cdk.sources.streams.concurrent.state_converters.incrementing_count_stream_state_converter import ( + IncrementingCountStreamStateConverter, +) from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.types import Record, StreamSlice from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse @@ -230,6 +234,16 @@ "inject_into": "request_parameter", }, }, + "incremental_counting_cursor": { + "type": "IncrementingCountCursor", + "cursor_field": "id", + "start_value": 0, + "start_time_option": { + "type": "RequestOption", + "field_name": "since_id", + "inject_into": "request_parameter", + }, + }, "base_stream": {"retriever": {"$ref": "#/definitions/retriever"}}, "base_incremental_stream": { "retriever": { @@ -238,6 +252,13 @@ }, "incremental_sync": {"$ref": "#/definitions/incremental_cursor"}, }, + "base_incremental_counting_stream": { + "retriever": { + "$ref": "#/definitions/retriever", + "requester": {"$ref": "#/definitions/requester"}, + }, + "incremental_sync": {"$ref": "#/definitions/incremental_counting_cursor"}, + }, "party_members_stream": { "$ref": "#/definitions/base_incremental_stream", "retriever": { @@ -527,6 +548,35 @@ }, }, }, + "incremental_counting_stream": { + "$ref": "#/definitions/base_incremental_counting_stream", + "retriever": { + "$ref": "#/definitions/base_incremental_counting_stream/retriever", + "record_selector": {"$ref": "#/definitions/selector"}, + }, + "$parameters": { + "name": "incremental_counting_stream", + "primary_key": "id", + "path": "/party_members", + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "https://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "description": "The identifier", + "type": ["null", "string"], + }, + "name": { + "description": "The name of the party member", + "type": ["null", "string"], + }, + }, + }, + }, + }, }, "streams": [ "#/definitions/party_members_stream", @@ -536,6 +586,7 @@ "#/definitions/arcana_personas_stream", "#/definitions/palace_enemies_stream", "#/definitions/async_job_stream", + "#/definitions/incremental_counting_stream", ], "check": {"stream_names": ["party_members", "locations"]}, "concurrency_level": { @@ -756,6 +807,20 @@ def test_create_concurrent_cursor(): "state_type": "date-range", } + incremental_counting_stream = concurrent_streams[7] + assert isinstance(incremental_counting_stream, DefaultStream) + incremental_counting_cursor = incremental_counting_stream.cursor + + assert isinstance(incremental_counting_cursor, ConcurrentCursor) + assert isinstance( + incremental_counting_cursor._connector_state_converter, + IncrementingCountStreamStateConverter, + ) + assert incremental_counting_cursor._stream_name == "incremental_counting_stream" + assert incremental_counting_cursor._cursor_field.cursor_field_key == "id" + assert incremental_counting_cursor._start == 0 + assert incremental_counting_cursor._end_provider() == math.inf + def test_check(): """ From 075ac16fcfd9f6c4c0d4c62b21d9e3322f77c62e Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 27 Feb 2025 19:13:46 +0100 Subject: [PATCH 16/18] CDK: fix test Signed-off-by: Artem Inzhyyants --- .../sources/declarative/test_concurrent_declarative_source.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index db016ffc6..9b17ef0ec 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -709,9 +709,9 @@ def test_group_streams(): ) concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG) - # 1 full refresh stream, 2 incremental streams, 1 substream w/o incremental, 1 list based substream w/o incremental + # 1 full refresh stream, 3 incremental streams, 1 substream w/o incremental, 1 list based substream w/o incremental # 1 async job stream, 1 substream w/ incremental - assert len(concurrent_streams) == 7 + assert len(concurrent_streams) == 8 ( concurrent_stream_0, concurrent_stream_1, From 6fe3d638affb593b984a68b49bd2f920c72adf47 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 27 Feb 2025 23:05:49 +0100 Subject: [PATCH 17/18] CDK: fix test Signed-off-by: Artem Inzhyyants --- .../sources/declarative/test_concurrent_declarative_source.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 9b17ef0ec..e168be104 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -720,6 +720,7 @@ def test_group_streams(): concurrent_stream_4, concurrent_stream_5, concurrent_stream_6, + concurrent_stream_7, ) = concurrent_streams assert isinstance(concurrent_stream_0, DefaultStream) assert concurrent_stream_0.name == "party_members" @@ -735,6 +736,8 @@ def test_group_streams(): assert concurrent_stream_5.name == "palace_enemies" assert isinstance(concurrent_stream_6, DefaultStream) assert concurrent_stream_6.name == "async_job_stream" + assert isinstance(concurrent_stream_7, DefaultStream) + assert concurrent_stream_7.name == "incremental_counting_stream" @freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) From bc8c5838d75b5e6b81f988e1e64869ded0f4003e Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 27 Feb 2025 23:10:16 +0100 Subject: [PATCH 18/18] CDK: fix test Signed-off-by: Artem Inzhyyants --- .../sources/declarative/test_concurrent_declarative_source.py | 1 + 1 file changed, 1 insertion(+) diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index e168be104..45813c4f2 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -876,6 +876,7 @@ def test_discover(): "arcana_personas", "palace_enemies", "async_job_stream", + "incremental_counting_stream", } source = ConcurrentDeclarativeSource(