diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
index c92ffb150..99cb37cd5 100644
--- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
+++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
@@ -162,6 +162,10 @@ def read(
         else:
             filtered_catalog = catalog
 
+        # There is no need to run a read for synchronous streams if none exist in the catalog.
+        if not filtered_catalog.streams:
+            return
+
         yield from super().read(logger, config, filtered_catalog, state)
 
     def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
@@ -201,6 +205,18 @@ def _group_streams(
             # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
             # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
             # so we need to treat them as synchronous
+
+            if name_to_stream_mapping[declarative_stream.name]["type"] == "StateDelegatingStream":
+                stream_state = self._connector_state_manager.get_stream_state(
+                    stream_name=declarative_stream.name, namespace=declarative_stream.namespace
+                )
+
+                name_to_stream_mapping[declarative_stream.name] = (
+                    name_to_stream_mapping[declarative_stream.name]["incremental_stream"]
+                    if stream_state
+                    else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
+                )
+
             if isinstance(declarative_stream, DeclarativeStream) and (
                 name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
                 == "SimpleRetriever"
diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
index 6cd9998c7..18b1f7ec9 100644
--- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
+++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -24,7 +24,9 @@ properties:
   streams:
     type: array
     items:
-      "$ref": "#/definitions/DeclarativeStream"
+      anyOf:
+        - "$ref": "#/definitions/DeclarativeStream"
+        - "$ref": "#/definitions/StateDelegatingStream"
   dynamic_streams:
     type: array
     items:
@@ -2877,7 +2879,9 @@ definitions:
       stream:
         title: Parent Stream
         description: Reference to the parent stream.
-        "$ref": "#/definitions/DeclarativeStream"
+        anyOf:
+          - "$ref": "#/definitions/DeclarativeStream"
+          - "$ref": "#/definitions/StateDelegatingStream"
       partition_field:
         title: Current Parent Key Value Identifier
         description: While iterating over parent records during a sync, the parent_key value can be referenced by using this field.
@@ -3150,6 +3154,36 @@ definitions:
     $parameters:
       type: object
       additionalProperties: true
+  StateDelegatingStream:
+    description: (This component is experimental. Use at your own risk.) Orchestrates the retriever's usage based on the state value.
+    type: object
+    required:
+      - type
+      - name
+      - full_refresh_stream
+      - incremental_stream
+    properties:
+      type:
+        type: string
+        enum: [StateDelegatingStream]
+      name:
+        title: Name
+        description: The stream name.
+        type: string
+        default: ""
+        examples:
+          - "Users"
+      full_refresh_stream:
+        title: Full Refresh Stream
+        description: Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.
+        "$ref": "#/definitions/DeclarativeStream"
+      incremental_stream:
+        title: Incremental Stream
+        description: Component used to coordinate how records are extracted across stream slices and request pages when the state is provided.
+        "$ref": "#/definitions/DeclarativeStream"
+      $parameters:
+        type: object
+        additionalProperties: true
   SimpleRetriever:
     description: Retrieves records by synchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router.
     type: object
diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py
index d3afb1396..e9ce991b5 100644
--- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py
+++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py
@@ -30,6 +30,9 @@
     DeclarativeStream as DeclarativeStreamModel,
 )
 from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
+from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
+    StateDelegatingStream as StateDelegatingStreamModel,
+)
 from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
     get_registered_components_module,
 )
@@ -143,7 +146,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
 
         source_streams = [
             self._constructor.create_component(
-                DeclarativeStreamModel,
+                StateDelegatingStreamModel
+                if stream_config.get("type") == StateDelegatingStreamModel.__name__
+                else DeclarativeStreamModel,
                 stream_config,
                 config,
                 emit_connector_builder_messages=self._emit_connector_builder_messages,
@@ -162,7 +167,15 @@ def _initialize_cache_for_parent_streams(
         def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None:
             for parent_config in parent_configs:
                 parent_streams.add(parent_config["stream"]["name"])
-                parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
+                if parent_config["stream"]["type"] == "StateDelegatingStream":
+                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
+                        "use_cache"
+                    ] = True
+                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
+                        "use_cache"
+                    ] = True
+                else:
+                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
 
         for stream_config in stream_configs:
             if stream_config.get("incremental_sync", {}).get("parent_stream"):
@@ -185,7 +198,15 @@ def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None:
 
         for stream_config in stream_configs:
             if stream_config["name"] in parent_streams:
-                stream_config["retriever"]["requester"]["use_cache"] = True
+                if stream_config["type"] == "StateDelegatingStream":
+                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
+                        True
+                    )
+                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
+                        True
+                    )
+                else:
+                    stream_config["retriever"]["requester"]["use_cache"] = True
 
         return stream_configs
diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
index a49b66c03..9d6a93c21 100644
--- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
+++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
@@ -1860,7 +1860,7 @@ class Config:
     type: Literal["DeclarativeSource"]
     check: Union[CheckStream, CheckDynamicStream]
-    streams: List[DeclarativeStream]
+    streams: List[Union[DeclarativeStream, StateDelegatingStream]]
     dynamic_streams: Optional[List[DynamicDeclarativeStream]] = None
     version: str = Field(
         ...,
@@ -1887,7 +1887,7 @@ class Config:
     type: Literal["DeclarativeSource"]
     check: Union[CheckStream, CheckDynamicStream]
-    streams: Optional[List[DeclarativeStream]] = None
+    streams: Optional[List[Union[DeclarativeStream, StateDelegatingStream]]] = None
     dynamic_streams: List[DynamicDeclarativeStream]
     version: str = Field(
         ...,
@@ -2201,7 +2201,7 @@ class ParentStreamConfig(BaseModel):
         examples=["id", "{{ config['parent_record_id'] }}"],
         title="Parent Key",
     )
-    stream: DeclarativeStream = Field(
+    stream: Union[DeclarativeStream, StateDelegatingStream] = Field(
         ..., description="Reference to the parent stream.", title="Parent Stream"
     )
     partition_field: str = Field(
@@ -2228,6 +2228,22 @@ class ParentStreamConfig(BaseModel):
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
 
 
+class StateDelegatingStream(BaseModel):
+    type: Literal["StateDelegatingStream"]
+    name: str = Field(..., description="The stream name.", examples=["Users"], title="Name")
+    full_refresh_stream: DeclarativeStream = Field(
+        ...,
+        description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.",
+        title="Full Refresh Stream",
+    )
+    incremental_stream: DeclarativeStream = Field(
+        ...,
+        description="Component used to coordinate how records are extracted across stream slices and request pages when the state is provided.",
+        title="Incremental Stream",
+    )
+    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+
+
 class SimpleRetriever(BaseModel):
     type: Literal["SimpleRetriever"]
     record_selector: RecordSelector = Field(
@@ -2413,5 +2429,6 @@ class DynamicDeclarativeStream(BaseModel):
 DeclarativeStream.update_forward_refs()
 SessionTokenAuthenticator.update_forward_refs()
 DynamicSchemaLoader.update_forward_refs()
+ParentStreamConfig.update_forward_refs()
 SimpleRetriever.update_forward_refs()
 AsyncRetriever.update_forward_refs()
diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
index 1c2289c17..232824425 100644
--- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
+++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -351,6 +351,9 @@
     SimpleRetriever as SimpleRetrieverModel,
 )
 from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
+from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
+    StateDelegatingStream as StateDelegatingStreamModel,
+)
 from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
     StreamConfig as StreamConfigModel,
 )
@@ -615,6 +618,7 @@ def _init_mappings(self) -> None:
             LegacySessionTokenAuthenticatorModel: self.create_legacy_session_token_authenticator,
             SelectiveAuthenticatorModel: self.create_selective_authenticator,
             SimpleRetrieverModel: self.create_simple_retriever,
+            StateDelegatingStreamModel: self.create_state_delegating_stream,
             SpecModel: self.create_spec,
             SubstreamPartitionRouterModel: self.create_substream_partition_router,
             WaitTimeFromHeaderModel: self.create_wait_time_from_header,
@@ -1771,8 +1775,13 @@ def create_declarative_stream(
 
     def _build_stream_slicer_from_partition_router(
         self,
-        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+        model: Union[
+            AsyncRetrieverModel,
+            CustomRetrieverModel,
+            SimpleRetrieverModel,
+        ],
         config: Config,
+        stream_name: Optional[str] = None,
     ) -> Optional[PartitionRouter]:
         if (
             hasattr(model, "partition_router")
@@ -1780,95 +1789,65 @@
             and model.partition_router
         ):
             stream_slicer_model = model.partition_router
-
             if isinstance(stream_slicer_model, list):
                 return CartesianProductStreamSlicer(
                     [
-                        self._create_component_from_model(model=slicer, config=config)
+                        self._create_component_from_model(
+                            model=slicer, config=config, stream_name=stream_name or ""
+                        )
                         for slicer in stream_slicer_model
                     ],
                     parameters={},
                 )
             else:
-                return self._create_component_from_model(model=stream_slicer_model, config=config)  # type: ignore[no-any-return]
-                # Will be created PartitionRouter as stream_slicer_model is model.partition_router
+                return self._create_component_from_model(  # type: ignore[no-any-return]  # A PartitionRouter will be created, since stream_slicer_model is model.partition_router
+                    model=stream_slicer_model, config=config, stream_name=stream_name or ""
+                )
         return None
 
-    def _build_resumable_cursor_from_paginator(
+    def _build_incremental_cursor(
         self,
-        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
-        stream_slicer: Optional[StreamSlicer],
-    ) -> Optional[StreamSlicer]:
-        if hasattr(model, "paginator") and model.paginator and not stream_slicer:
-            # For the regular Full-Refresh streams, we use the high level `ResumableFullRefreshCursor`
-            return ResumableFullRefreshCursor(parameters={})
-        return None
-
-    def _merge_stream_slicers(
-        self, model: DeclarativeStreamModel, config: Config
+        model: DeclarativeStreamModel,
+        stream_slicer: Optional[PartitionRouter],
+        config: Config,
     ) -> Optional[StreamSlicer]:
-        stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)
-
         if model.incremental_sync and stream_slicer:
             if model.retriever.type == "AsyncRetriever":
-                if model.incremental_sync.type != "DatetimeBasedCursor":
-                    # We are currently in a transition to the Concurrent CDK and AsyncRetriever can only work with the support or unordered slices (for example, when we trigger reports for January and February, the report in February can be completed first). Once we have support for custom concurrent cursor or have a new implementation available in the CDK, we can enable more cursors here.
-                    raise ValueError(
-                        "AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet"
-                    )
-                if stream_slicer:
-                    return self.create_concurrent_cursor_from_perpartition_cursor(  # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
-                        state_manager=self._connector_state_manager,
-                        model_type=DatetimeBasedCursorModel,
-                        component_definition=model.incremental_sync.__dict__,
-                        stream_name=model.name or "",
-                        stream_namespace=None,
-                        config=config or {},
-                        stream_state={},
-                        partition_router=stream_slicer,
-                    )
-                return self.create_concurrent_cursor_from_datetime_based_cursor(  # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
+                return self.create_concurrent_cursor_from_perpartition_cursor(  # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
+                    state_manager=self._connector_state_manager,
                     model_type=DatetimeBasedCursorModel,
                     component_definition=model.incremental_sync.__dict__,
                     stream_name=model.name or "",
                     stream_namespace=None,
                     config=config or {},
+                    stream_state={},
+                    partition_router=stream_slicer,
                 )
 
             incremental_sync_model = model.incremental_sync
-            if (
+            cursor_component = self._create_component_from_model(
+                model=incremental_sync_model, config=config
+            )
+            is_global_cursor = (
                 hasattr(incremental_sync_model, "global_substream_cursor")
                 and incremental_sync_model.global_substream_cursor
-            ):
-                cursor_component = self._create_component_from_model(
-                    model=incremental_sync_model, config=config
-                )
+            )
+
+            if is_global_cursor:
                 return GlobalSubstreamCursor(
                     stream_cursor=cursor_component, partition_router=stream_slicer
                 )
-            else:
-                cursor_component = self._create_component_from_model(
-                    model=incremental_sync_model, config=config
-                )
-                return PerPartitionWithGlobalCursor(
-                    cursor_factory=CursorFactory(
-                        lambda: self._create_component_from_model(
-                            model=incremental_sync_model, config=config
-                        ),
+            return PerPartitionWithGlobalCursor(
+                cursor_factory=CursorFactory(
+                    lambda: self._create_component_from_model(
+                        model=incremental_sync_model, config=config
                     ),
-                    partition_router=stream_slicer,
-                    stream_cursor=cursor_component,
-                )
+                ),
+                partition_router=stream_slicer,
+                stream_cursor=cursor_component,
+            )
 
         elif model.incremental_sync:
             if model.retriever.type == "AsyncRetriever":
-                if model.incremental_sync.type != "DatetimeBasedCursor":
-                    # We are currently in a transition to the Concurrent CDK and AsyncRetriever can only work with the support or unordered slices (for example, when we trigger reports for January and February, the report in February can be completed first). Once we have support for custom concurrent cursor or have a new implementation available in the CDK, we can enable more cursors here.
-                    raise ValueError(
-                        "AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet"
-                    )
-                if model.retriever.partition_router:
-                    # Note that this development is also done in parallel to the per partition development which once merged we could support here by calling `create_concurrent_cursor_from_perpartition_cursor`
-                    raise ValueError("Per partition state is not supported yet for AsyncRetriever")
                 return self.create_concurrent_cursor_from_datetime_based_cursor(  # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
                     model_type=DatetimeBasedCursorModel,
                     component_definition=model.incremental_sync.__dict__,
@@ -1877,13 +1856,21 @@ def _merge_stream_slicers(
                     config=config or {},
                     stream_state_migrations=model.state_migrations,
                 )
-            return (
-                self._create_component_from_model(model=model.incremental_sync, config=config)
-                if model.incremental_sync
-                else None
-            )
-        elif self._disable_resumable_full_refresh:
-            return stream_slicer
+            return self._create_component_from_model(model=model.incremental_sync, config=config)  # type: ignore[no-any-return]  # A Cursor will be created, since the model here is model.incremental_sync
+        return None
+
+    def _build_resumable_cursor(
+        self,
+        model: Union[
+            AsyncRetrieverModel,
+            CustomRetrieverModel,
+            SimpleRetrieverModel,
+        ],
+        stream_slicer: Optional[PartitionRouter],
+    ) -> Optional[StreamSlicer]:
+        if hasattr(model, "paginator") and model.paginator and not stream_slicer:
+            # For the regular Full-Refresh streams, we use the high level `ResumableFullRefreshCursor`
+            return ResumableFullRefreshCursor(parameters={})
         elif stream_slicer:
             # For the Full-Refresh sub-streams, we use the nested `ChildPartitionResumableFullRefreshCursor`
             return PerPartitionCursor(
                 cursor_factory=CursorFactory(
                     create_function=partial(ChildPartitionResumableFullRefreshCursor, {})
                 ),
                 partition_router=stream_slicer,
             )
-        return self._build_resumable_cursor_from_paginator(model.retriever, stream_slicer)
+        return None
+
+    def _merge_stream_slicers(
+        self, model: DeclarativeStreamModel, config: Config
+    ) -> Optional[StreamSlicer]:
+        retriever_model = model.retriever
+
+        if retriever_model.type == "AsyncRetriever":
+            is_not_datetime_cursor = (
+                model.incremental_sync.type != "DatetimeBasedCursor"
+                if model.incremental_sync
+                else None
+            )
+            is_partition_router = (
+                bool(retriever_model.partition_router) if model.incremental_sync else None
+            )
+
+            if is_not_datetime_cursor:
+                # We are currently in a transition to the Concurrent CDK and AsyncRetriever can only work with the
+                # support of unordered slices (for example, when we trigger reports for January and February, the
+                # report in February can be completed first). Once we have support for a custom concurrent cursor or
+                # have a new implementation available in the CDK, we can enable more cursors here.
+                raise ValueError(
+                    "AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet."
+                )
+
+            if is_partition_router:
+                # Note that this development is also done in parallel to the per partition development which once merged
+                # we could support here by calling create_concurrent_cursor_from_perpartition_cursor
+                raise ValueError("Per partition state is not supported yet for AsyncRetriever.")
+
+        stream_slicer = self._build_stream_slicer_from_partition_router(retriever_model, config)
+
+        if model.incremental_sync:
+            return self._build_incremental_cursor(model, stream_slicer, config)
+
+        return (
+            stream_slicer
+            if self._disable_resumable_full_refresh
+            else self._build_resumable_cursor(retriever_model, stream_slicer)
+        )
 
     def create_default_error_handler(
         self, model: DefaultErrorHandlerModel, config: Config, **kwargs: Any
@@ -2153,9 +2180,7 @@ def create_dynamic_schema_loader(
         self, model: DynamicSchemaLoaderModel, config: Config, **kwargs: Any
     ) -> DynamicSchemaLoader:
         stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)
-        combined_slicers = self._build_resumable_cursor_from_paginator(
-            model.retriever, stream_slicer
-        )
+        combined_slicers = self._build_resumable_cursor(model.retriever, stream_slicer)
 
         schema_transformations = []
         if model.schema_transformations:
@@ -2456,7 +2481,9 @@ def create_page_increment(
 
     def create_parent_stream_config(
         self, model: ParentStreamConfigModel, config: Config, **kwargs: Any
     ) -> ParentStreamConfig:
-        declarative_stream = self._create_component_from_model(model.stream, config=config)
+        declarative_stream = self._create_component_from_model(
+            model.stream, config=config, **kwargs
+        )
         request_option = (
             self._create_component_from_model(model.request_option, config=config)
             if model.request_option
@@ -2697,6 +2724,29 @@ def create_simple_retriever(
             parameters=model.parameters or {},
         )
 
+    def create_state_delegating_stream(
+        self,
+        model: StateDelegatingStreamModel,
+        config: Config,
+        has_parent_state: Optional[bool] = None,
+        **kwargs: Any,
+    ) -> DeclarativeStream:
+        if (
+            model.full_refresh_stream.name != model.name
+            or model.name != model.incremental_stream.name
+        ):
+            raise ValueError(
+                f"The state_delegating_stream, full_refresh_stream, and incremental_stream must all have the same name. Instead got {model.name}, {model.full_refresh_stream.name}, and {model.incremental_stream.name}."
+            )
+
+        stream_model = (
+            model.incremental_stream
+            if self._connector_state_manager.get_stream_state(model.name, None) or has_parent_state
+            else model.full_refresh_stream
+        )
+
+        return self._create_component_from_model(stream_model, config=config, **kwargs)  # type: ignore[no-any-return]  # A DeclarativeStream will be created, since stream_model is a stream definition
+
     def _create_async_job_status_mapping(
         self, model: AsyncJobStatusMapModel, config: Config, **kwargs: Any
     ) -> Mapping[str, AsyncJobStatus]:
@@ -2923,7 +2973,7 @@ def create_substream_partition_router(
             parent_stream_configs.extend(
                 [
                     self._create_message_repository_substream_wrapper(
-                        model=parent_stream_config, config=config
+                        model=parent_stream_config, config=config, **kwargs
                     )
                     for parent_stream_config in model.parent_stream_configs
                 ]
@@ -2936,7 +2986,7 @@ def create_substream_partition_router(
         )
 
     def _create_message_repository_substream_wrapper(
-        self, model: ParentStreamConfigModel, config: Config
+        self, model: ParentStreamConfigModel, config: Config, **kwargs: Any
     ) -> Any:
         substream_factory = ModelToComponentFactory(
             limit_pages_fetched_per_slice=self._limit_pages_fetched_per_slice,
@@ -2950,7 +3000,16 @@ def _create_message_repository_substream_wrapper(
                 self._evaluate_log_level(self._emit_connector_builder_messages),
             ),
         )
-        return substream_factory._create_component_from_model(model=model, config=config)
+
+        # This flag will be used exclusively for StateDelegatingStream when a parent stream is created
+        has_parent_state = bool(
+            self._connector_state_manager.get_stream_state(kwargs.get("stream_name", ""), None)
+            if model.incremental_dependency
+            else False
+        )
+        return substream_factory._create_component_from_model(
+            model=model, config=config, has_parent_state=has_parent_state, **kwargs
+        )
 
     @staticmethod
     def create_wait_time_from_header(
@@ -3006,9 +3065,7 @@ def create_http_components_resolver(
         self, model: HttpComponentsResolverModel, config: Config
     ) -> Any:
         stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)
-        combined_slicers = self._build_resumable_cursor_from_paginator(
-            model.retriever, stream_slicer
-        )
+        combined_slicers = self._build_resumable_cursor(model.retriever, stream_slicer)
 
         retriever = self._create_component_from_model(
             model=model.retriever,
diff --git a/airbyte_cdk/sources/declarative/retrievers/__init__.py b/airbyte_cdk/sources/declarative/retrievers/__init__.py
index 177d141a3..2c3bbc7d5 100644
--- a/airbyte_cdk/sources/declarative/retrievers/__init__.py
+++ b/airbyte_cdk/sources/declarative/retrievers/__init__.py
@@ -9,4 +9,9 @@
     SimpleRetrieverTestReadDecorator,
 )
 
-__all__ = ["Retriever", "SimpleRetriever", "SimpleRetrieverTestReadDecorator", "AsyncRetriever"]
+__all__ = [
+    "Retriever",
+    "SimpleRetriever",
+    "SimpleRetrieverTestReadDecorator",
+    "AsyncRetriever",
+]
diff --git a/unit_tests/sources/declarative/test_state_delegating_stream.py b/unit_tests/sources/declarative/test_state_delegating_stream.py
new file mode 100644
index 000000000..1239fe653
--- /dev/null
+++ b/unit_tests/sources/declarative/test_state_delegating_stream.py
@@ -0,0 +1,255 @@
+#
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+#
+
+import json
+from typing import Optional
+from unittest.mock import MagicMock
+
+import freezegun
+
+from airbyte_cdk.models import (
+    AirbyteStateBlob,
+    AirbyteStateMessage,
+    AirbyteStateType,
+    AirbyteStreamState,
+    ConfiguredAirbyteCatalog,
+    ConfiguredAirbyteStream,
+    DestinationSyncMode,
+    StreamDescriptor,
+    Type,
+)
+from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
+    ConcurrentDeclarativeSource,
+)
+from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
+
+_CONFIG = {"start_date": "2024-07-01T00:00:00.000Z"}
+_MANIFEST = {
+    "version": "6.0.0",
+    "type": "DeclarativeSource",
+    "check": {"type": "CheckStream", "stream_names": ["TestStream"]},
+    "definitions": {
+        "TestStream": {
+            "type": "StateDelegatingStream",
+            "name": "TestStream",
+            "full_refresh_stream": {
+                "type": "DeclarativeStream",
+                "name": "TestStream",
+                "primary_key": [],
+                "schema_loader": {
+                    "type": "InlineSchemaLoader",
+                    "schema": {
+                        "$schema": "http://json-schema.org/schema#",
+                        "properties": {},
+                        "type": "object",
+                    },
+                },
+                "retriever": {
+                    "type": "SimpleRetriever",
+                    "requester": {
+                        "type": "HttpRequester",
+                        "url_base": "https://api.test.com",
+                        "path": "/items",
+                        "http_method": "GET",
+                    },
+                    "record_selector": {
+                        "type": "RecordSelector",
+                        "extractor": {"type": "DpathExtractor", "field_path": []},
+                    },
+                },
+                "incremental_sync": {
+                    "type": "DatetimeBasedCursor",
+                    "start_datetime": {
+                        "datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}"
+                    },
+                    "end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"},
+                    "datetime_format": "%Y-%m-%d",
+                    "cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"],
+                    "cursor_field": "updated_at",
+                },
+            },
+            "incremental_stream": {
+                "type": "DeclarativeStream",
+                "name": "TestStream",
+                "primary_key": [],
+                "schema_loader": {
+                    "type": "InlineSchemaLoader",
+                    "schema": {
+                        "$schema": "http://json-schema.org/schema#",
+                        "properties": {},
+                        "type": "object",
+                    },
+                },
+                "retriever": {
+                    "type": "SimpleRetriever",
+                    "requester": {
+                        "type": "HttpRequester",
+                        "url_base": "https://api.test.com",
+                        "path": "/items_with_filtration",
+                        "http_method": "GET",
+                    },
+                    "record_selector": {
+                        "type": "RecordSelector",
+                        "extractor": {"type": "DpathExtractor", "field_path": []},
+                    },
+                },
+                "incremental_sync": {
+                    "type": "DatetimeBasedCursor",
+                    "start_datetime": {
+                        "datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}"
+                    },
+                    "end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"},
+                    "datetime_format": "%Y-%m-%d",
+                    "cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"],
+                    "cursor_granularity": "P1D",
+                    "step": "P15D",
+                    "cursor_field": "updated_at",
+                    "start_time_option": {
+                        "type": "RequestOption",
+                        "field_name": "start",
+                        "inject_into": "request_parameter",
+                    },
+                    "end_time_option": {
+                        "type": "RequestOption",
+                        "field_name": "end",
+                        "inject_into": "request_parameter",
+                    },
+                },
+            },
+        },
+    },
+    "streams": [{"$ref": "#/definitions/TestStream"}],
+    "spec": {
+        "connection_specification": {
+            "$schema": "http://json-schema.org/draft-07/schema#",
+            "type": "object",
+            "required": [],
+            "properties": {},
+            "additionalProperties": True,
+        },
+        "documentation_url": "https://example.org",
+        "type": "Spec",
+    },
+}
+
+
+def to_configured_stream(
+    stream,
+    sync_mode=None,
+    destination_sync_mode=DestinationSyncMode.append,
+    cursor_field=None,
+    primary_key=None,
+) -> ConfiguredAirbyteStream:
+    return ConfiguredAirbyteStream(
+        stream=stream,
+        sync_mode=sync_mode,
+        destination_sync_mode=destination_sync_mode,
+        cursor_field=cursor_field,
+        primary_key=primary_key,
+    )
+
+
+def to_configured_catalog(
+    configured_streams,
+) -> ConfiguredAirbyteCatalog:
+    return ConfiguredAirbyteCatalog(streams=configured_streams)
+
+
+def create_configured_catalog(
+    source: ConcurrentDeclarativeSource, config: dict
+) -> ConfiguredAirbyteCatalog:
+    """
+    Discovers streams from the source and converts them into a configured catalog.
+    """
+    actual_catalog = source.discover(logger=source.logger, config=config)
+    configured_streams = [
+        to_configured_stream(stream, primary_key=stream.source_defined_primary_key)
+        for stream in actual_catalog.streams
+    ]
+    return to_configured_catalog(configured_streams)
+
+
+def get_records(
+    source: ConcurrentDeclarativeSource,
+    config: dict,
+    catalog: ConfiguredAirbyteCatalog,
+    state: Optional[list] = None,
+) -> list:
+    """
+    Reads records from the source given a configuration, catalog, and optional state.
+    Returns a list of record data dictionaries.
+    """
+    return [
+        message.record.data
+        for message in source.read(logger=MagicMock(), config=config, catalog=catalog, state=state)
+        if message.type == Type.RECORD
+    ]
+
+
+@freezegun.freeze_time("2024-07-15")
+def test_full_refresh_retriever():
+    with HttpMocker() as http_mocker:
+        http_mocker.get(
+            HttpRequest(url="https://api.test.com/items"),
+            HttpResponse(
+                body=json.dumps(
+                    [
+                        {"id": 1, "name": "item_1", "updated_at": "2024-07-13"},
+                        {"id": 2, "name": "item_2", "updated_at": "2024-07-13"},
+                    ]
+                )
+            ),
+        )
+
+        source = ConcurrentDeclarativeSource(
+            source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None
+        )
+        configured_catalog = create_configured_catalog(source, _CONFIG)
+
+        # Test full data retrieval (without state)
+        full_records = get_records(source, _CONFIG, configured_catalog)
+        expected_full = [
+            {"id": 1, "name": "item_1", "updated_at": "2024-07-13"},
+            {"id": 2, "name": "item_2", "updated_at": "2024-07-13"},
+        ]
+        assert expected_full == full_records
+
+
+@freezegun.freeze_time("2024-07-15")
+def test_incremental_retriever():
+    with HttpMocker() as http_mocker:
+        http_mocker.get(
+            HttpRequest(
+                url="https://api.test.com/items_with_filtration?start=2024-07-13&end=2024-07-15"
+            ),
+            HttpResponse(
+                body=json.dumps(
+                    [
+                        {"id": 3, "name": "item_3", "updated_at": "2024-02-01"},
+                        {"id": 4, "name": "item_4", "updated_at": "2024-02-01"},
+                    ]
+                )
+            ),
+        )
+
+        state = [
+            AirbyteStateMessage(
+                type=AirbyteStateType.STREAM,
+                stream=AirbyteStreamState(
+                    stream_descriptor=StreamDescriptor(name="TestStream", namespace=None),
+                    stream_state=AirbyteStateBlob(updated_at="2024-07-13"),
+                ),
+            )
+        ]
+        source = ConcurrentDeclarativeSource(
+            source_config=_MANIFEST, config=_CONFIG, catalog=None, state=state
+        )
+        configured_catalog = create_configured_catalog(source, _CONFIG)
+
+        # Test incremental data retrieval (with state)
+        incremental_records = get_records(source, _CONFIG, configured_catalog, state)
+        expected_incremental = [
+            {"id": 3, "name": "item_3", "updated_at": "2024-02-01"},
+            {"id": 4, "name": "item_4", "updated_at": "2024-02-01"},
+        ]
+        assert expected_incremental == incremental_records
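+
+
+# A minimal extra check, sketched here as an assumption on top of this change set (it is
+# not part of the original PR): regardless of which underlying stream would serve the
+# read, discovery should surface a StateDelegatingStream as a single catalog entry under
+# the shared name. It reuses only helpers and fixtures already defined in this file.
+@freezegun.freeze_time("2024-07-15")
+def test_discover_returns_single_stream():
+    source = ConcurrentDeclarativeSource(
+        source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None
+    )
+
+    actual_catalog = source.discover(logger=source.logger, config=_CONFIG)
+
+    # The full_refresh_stream and incremental_stream collapse into one stream named "TestStream"
+    assert len(actual_catalog.streams) == 1
+    assert actual_catalog.streams[0].name == "TestStream"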