diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 96bd67c32..7419b9dda 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -24,7 +24,6 @@ from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import ( PerPartitionWithGlobalCursor, ) -from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ConcurrencyLevel as ConcurrencyLevelModel, @@ -36,13 +35,11 @@ ModelToComponentFactory, ) from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter -from airbyte_cdk.sources.declarative.requesters import HttpRequester from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( DeclarativePartitionFactory, StreamSlicerPartitionGenerator, ) -from airbyte_cdk.sources.declarative.transformations.add_fields import AddFields from airbyte_cdk.sources.declarative.types import ConnectionDefinition from airbyte_cdk.sources.source import TState from airbyte_cdk.sources.streams import Stream @@ -321,9 +318,6 @@ def _group_streams( incremental_sync_component_definition and incremental_sync_component_definition.get("type", "") == DatetimeBasedCursorModel.__name__ - and self._stream_supports_concurrent_partition_processing( - declarative_stream=declarative_stream - ) and hasattr(declarative_stream.retriever, "stream_slicer") and isinstance( declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor @@ -390,9 +384,6 @@ def _is_datetime_incremental_without_partition_routing( and 
bool(incremental_sync_component_definition) and incremental_sync_component_definition.get("type", "") == DatetimeBasedCursorModel.__name__ - and self._stream_supports_concurrent_partition_processing( - declarative_stream=declarative_stream - ) and hasattr(declarative_stream.retriever, "stream_slicer") and ( isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor) @@ -400,72 +391,6 @@ def _is_datetime_incremental_without_partition_routing( ) ) - def _stream_supports_concurrent_partition_processing( - self, declarative_stream: DeclarativeStream - ) -> bool: - """ - Many connectors make use of stream_state during interpolation on a per-partition basis under the assumption that - state is updated sequentially. Because the concurrent CDK engine processes different partitions in parallel, - stream_state is no longer a thread-safe interpolation context. It would be a race condition because a cursor's - stream_state can be updated in any order depending on which stream partition's finish first. - - We should start to move away from depending on the value of stream_state for low-code components that operate - per-partition, but we need to gate this otherwise some connectors will be blocked from publishing. See the - cdk-migrations.md for the full list of connectors. - """ - - if isinstance(declarative_stream.retriever, SimpleRetriever) and isinstance( - declarative_stream.retriever.requester, HttpRequester - ): - http_requester = declarative_stream.retriever.requester - if "stream_state" in http_requester._path.string: - self.logger.warning( - f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe. 
Defaulting to synchronous processing" - ) - return False - - request_options_provider = http_requester._request_options_provider - if request_options_provider.request_options_contain_stream_state(): - self.logger.warning( - f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe. Defaulting to synchronous processing" - ) - return False - - record_selector = declarative_stream.retriever.record_selector - if isinstance(record_selector, RecordSelector): - if ( - record_selector.record_filter - and not isinstance( - record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator - ) - and "stream_state" in record_selector.record_filter.condition - ): - self.logger.warning( - f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the RecordFilter which is not thread-safe. Defaulting to synchronous processing" - ) - return False - - for add_fields in [ - transformation - for transformation in record_selector.transformations - if isinstance(transformation, AddFields) - ]: - for field in add_fields.fields: - if isinstance(field.value, str) and "stream_state" in field.value: - self.logger.warning( - f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the AddFields which is not thread-safe. Defaulting to synchronous processing" - ) - return False - if ( - isinstance(field.value, InterpolatedString) - and "stream_state" in field.value.string - ): - self.logger.warning( - f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the AddFields which is not thread-safe. 
Defaulting to synchronous processing" - ) - return False - return True - @staticmethod def _get_retriever( declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any] diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 55a6e02d9..b4eef5f03 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -82,7 +82,6 @@ definitions: - stream_interval - stream_partition - stream_slice - - stream_state examples: - "{{ record['updates'] }}" - "{{ record['MetaData']['LastUpdatedTime'] }}" @@ -1776,7 +1775,6 @@ definitions: - stream_interval - stream_partition - stream_slice - - stream_state examples: - "/products" - "/quotes/{{ stream_partition['id'] }}/quote_line_groups" @@ -1826,7 +1824,6 @@ definitions: - stream_interval - stream_partition - stream_slice - - stream_state examples: - | [{"clause": {"type": "timestamp", "operator": 10, "parameters": @@ -1844,7 +1841,6 @@ definitions: - stream_interval - stream_partition - stream_slice - - stream_state examples: - sort_order: "ASC" sort_field: "CREATED_AT" @@ -1865,7 +1861,6 @@ definitions: - stream_interval - stream_partition - stream_slice - - stream_state examples: - Output-Format: JSON - Version: "{{ config['version'] }}" @@ -1882,7 +1877,6 @@ definitions: - stream_interval - stream_partition - stream_slice - - stream_state examples: - unit: "day" - query: 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' @@ -2237,7 +2231,6 @@ definitions: interpolation_context: - config - record - - stream_state - stream_slice new: type: string @@ -2251,7 +2244,6 @@ definitions: interpolation_context: - config - record - - stream_state - stream_slice $parameters: type: object @@ -2901,7 +2893,6 @@ definitions: - stream_interval - stream_partition - stream_slice - - 
stream_state examples: - "{{ record['created_at'] >= stream_interval['start_time'] }}" - "{{ record.status in ['active', 'expired'] }}" @@ -3689,12 +3680,6 @@ interpolation: - title: stream_slice description: This variable is deprecated. Use stream_interval or stream_partition instead. type: object - - title: stream_state - description: The current state of the stream. The object's keys are defined by the incremental sync's cursor_field the and partition router's values. - type: object - examples: - - created_at: "2020-01-01 00:00:00.000+00:00" - - updated_at: "2020-01-02 00:00:00.000+00:00" macros: - title: now_utc description: Returns the current date and time in the UTC timezone. diff --git a/airbyte_cdk/sources/declarative/interpolation/jinja.py b/airbyte_cdk/sources/declarative/interpolation/jinja.py index 3bb9b0c20..8f8548aee 100644 --- a/airbyte_cdk/sources/declarative/interpolation/jinja.py +++ b/airbyte_cdk/sources/declarative/interpolation/jinja.py @@ -11,10 +11,12 @@ from jinja2.exceptions import UndefinedError from jinja2.sandbox import SandboxedEnvironment +from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.interpolation.filters import filters from airbyte_cdk.sources.declarative.interpolation.interpolation import Interpolation from airbyte_cdk.sources.declarative.interpolation.macros import macros from airbyte_cdk.sources.types import Config +from airbyte_cdk.utils import AirbyteTracedException class StreamPartitionAccessEnvironment(SandboxedEnvironment): @@ -36,6 +38,10 @@ def is_safe_attribute(self, obj: Any, attr: str, value: Any) -> bool: "stream_partition": "stream_slice", # Use stream_partition to access partition router's values } +_UNSUPPORTED_INTERPOLATION_VARIABLES: Mapping[str, str] = { + "stream_state": "`stream_state` is no longer supported for interpolation. We recommend using `stream_interval` instead. 
Please reference the CDK Migration Guide for more information.", +} + # These extensions are not installed so they're not currently a problem, # but we're still explicitly removing them from the jinja context. # At worst, this is documentation that we do NOT want to include these extensions because of the potential security risks @@ -95,6 +101,13 @@ def eval( elif equivalent in context: context[alias] = context[equivalent] + for variable_name in _UNSUPPORTED_INTERPOLATION_VARIABLES: + if variable_name in input_str: + raise AirbyteTracedException( + message=_UNSUPPORTED_INTERPOLATION_VARIABLES[variable_name], + internal_message=_UNSUPPORTED_INTERPOLATION_VARIABLES[variable_name], + failure_type=FailureType.config_error, + ) try: if isinstance(input_str, str): result = self._eval(input_str, context) diff --git a/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte_cdk/sources/declarative/requesters/http_requester.py index b206bd688..e8c446503 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -123,7 +123,6 @@ def get_path( next_page_token: Optional[Mapping[str, Any]], ) -> str: kwargs = { - "stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token, } diff --git a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_nested_request_input_provider.py b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_nested_request_input_provider.py index 6403417c9..4e175bb28 100644 --- a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_nested_request_input_provider.py +++ b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_nested_request_input_provider.py @@ -10,7 +10,7 @@ NestedMapping, ) from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString -from airbyte_cdk.sources.types import Config, StreamSlice, StreamState 
+from airbyte_cdk.sources.types import Config, StreamSlice @dataclass @@ -42,20 +42,17 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: def eval_request_inputs( self, - stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: """ Returns the request inputs to set on an outgoing HTTP request - :param stream_state: The stream state :param stream_slice: The stream slice :param next_page_token: The pagination token :return: The request inputs to set on an outgoing HTTP request """ kwargs = { - "stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token, } diff --git a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py index 0278df351..ed0e54c60 100644 --- a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py +++ b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py @@ -37,7 +37,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: def eval_request_inputs( self, - stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, valid_key_types: Optional[Tuple[Type[Any]]] = None, @@ -46,7 +45,6 @@ def eval_request_inputs( """ Returns the request inputs to set on an outgoing HTTP request - :param stream_state: The stream state :param stream_slice: The stream slice :param next_page_token: The pagination token :param valid_key_types: A tuple of types that the interpolator should allow @@ -54,7 +52,6 @@ def eval_request_inputs( :return: The request inputs to set on an outgoing HTTP request """ kwargs = { - "stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": 
next_page_token, } diff --git a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py index c327b83da..e14c64de0 100644 --- a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py +++ b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py @@ -5,8 +5,6 @@ from dataclasses import InitVar, dataclass, field from typing import Any, Mapping, MutableMapping, Optional, Union -from typing_extensions import deprecated - from airbyte_cdk.sources.declarative.interpolation.interpolated_nested_mapping import NestedMapping from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_nested_request_input_provider import ( InterpolatedNestedRequestInputProvider, @@ -17,7 +15,6 @@ from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import ( RequestOptionsProvider, ) -from airbyte_cdk.sources.source import ExperimentalClassWarning from airbyte_cdk.sources.types import Config, StreamSlice, StreamState RequestInput = Union[str, Mapping[str, str]] @@ -80,7 +77,6 @@ def get_request_params( next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: interpolated_value = self._parameter_interpolator.eval_request_inputs( - stream_state, stream_slice, next_page_token, valid_key_types=(str,), @@ -97,9 +93,7 @@ def get_request_headers( stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: - return self._headers_interpolator.eval_request_inputs( - stream_state, stream_slice, next_page_token - ) + return self._headers_interpolator.eval_request_inputs(stream_slice, next_page_token) def get_request_body_data( self, @@ -109,7 +103,6 @@ def get_request_body_data( next_page_token: Optional[Mapping[str, Any]] = None, ) -> 
Union[Mapping[str, Any], str]: return self._body_data_interpolator.eval_request_inputs( - stream_state, stream_slice, next_page_token, valid_key_types=(str,), @@ -123,42 +116,4 @@ def get_request_body_json( stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: - return self._body_json_interpolator.eval_request_inputs( - stream_state, stream_slice, next_page_token - ) - - @deprecated( - "This class is temporary and used to incrementally deliver low-code to concurrent", - category=ExperimentalClassWarning, - ) - def request_options_contain_stream_state(self) -> bool: - """ - Temporary helper method used as we move low-code streams to the concurrent framework. This method determines if - the InterpolatedRequestOptionsProvider has is a dependency on a non-thread safe interpolation context such as - stream_state. - """ - - return ( - self._check_if_interpolation_uses_stream_state(self.request_parameters) - or self._check_if_interpolation_uses_stream_state(self.request_headers) - or self._check_if_interpolation_uses_stream_state(self.request_body_data) - or self._check_if_interpolation_uses_stream_state(self.request_body_json) - ) - - @staticmethod - def _check_if_interpolation_uses_stream_state( - request_input: Optional[Union[RequestInput, NestedMapping]], - ) -> bool: - if not request_input: - return False - elif isinstance(request_input, str): - return "stream_state" in request_input - else: - for key, val in request_input.items(): - # Covers the case of RequestInput in the form of a string or Mapping[str, str]. It also covers the case - # of a NestedMapping where the value is a string. 
- # Note: Doesn't account for nested mappings for request_body_json, but I don't see stream_state used in that way - # in our code - if "stream_state" in key or (isinstance(val, str) and "stream_state" in val): - return True - return False + return self._body_json_interpolator.eval_request_inputs(stream_slice, next_page_token) diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index a5a8a71bc..a535a9b3d 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -133,7 +133,6 @@ def _get_request_options( mappings = [ paginator_method( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, ), @@ -141,7 +140,6 @@ def _get_request_options( if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests: mappings.append( stream_slicer_method( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, ) diff --git a/airbyte_cdk/sources/declarative/transformations/add_fields.py b/airbyte_cdk/sources/declarative/transformations/add_fields.py index 4c9d5366c..59e0c2aeb 100644 --- a/airbyte_cdk/sources/declarative/transformations/add_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/add_fields.py @@ -64,9 +64,9 @@ class AddFields(RecordTransformation): - path: ["shop_id"] value: "{{ config.shop_id }}" - # from state - - path: ["current_state"] - value: "{{ stream_state.cursor_field }}" # Or {{ stream_state['cursor_field'] }} + # from stream_interval + - path: ["date"] + value: "{{ stream_interval.start_date }}" # from record - path: ["unnested_value"] @@ -128,7 +128,7 @@ def transform( ) -> None: if config is None: config = {} - kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice} + kwargs = {"record": record, "stream_slice": stream_slice} for parsed_field in 
self._parsed_fields: valid_types = (parsed_field.value_type,) if parsed_field.value_type else None value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs) diff --git a/cdk-migrations.md b/cdk-migrations.md index 8173a5edd..dc39a8a67 100644 --- a/cdk-migrations.md +++ b/cdk-migrations.md @@ -1,10 +1,41 @@ # CDK Migration Guide +## Upgrading to 6.X.X + +Version 6.X.X of the CDK removes support for `stream_state` in the Jinja interpolation context. This change is breaking for any low-code connectors that use `stream_state` in the interpolation context. + +The following components are impacted by this change: + +- `HttpRequester` + - `request_parameters` + - `request_body_json` + - `request_body_data` + - `request_headers` +- `RecordFilter` +- `AddFields` + +Where applicable, we recommend updating to use `stream_interval` instead. + +### Example + +```yaml +# Before +record_filter: + type: RecordFilter + condition: "{{ stream_state['updated_at'] }}" + +# After +record_filter: + type: RecordFilter + condition: "{{ stream_interval['start_date'] }}" +``` + ## Upgrading to 6.28.0 Starting from version 6.28.0, the CDK no longer includes Pendulum as a transitive dependency. If your connector relies on Pendulum without explicitly declaring it as a dependency, you will need to add it to your connector's dependencies going forward. 
More info: + - https://deptry.com/rules-violations/#transitive-dependencies-dep003 ## Upgrading to 6.0.0 diff --git a/unit_tests/sources/declarative/datetime/test_min_max_datetime.py b/unit_tests/sources/declarative/datetime/test_min_max_datetime.py index 1cc0e6014..c00cdd2ac 100644 --- a/unit_tests/sources/declarative/datetime/test_min_max_datetime.py +++ b/unit_tests/sources/declarative/datetime/test_min_max_datetime.py @@ -22,13 +22,13 @@ ( "test_time_is_greater_than_min", "{{ config['older'] }}", - "{{ stream_state['newer'] }}", + "{{ stream_interval['start_date'] }}", "", new_date, ), ( "test_time_is_less_than_min", - "{{ stream_state['newer'] }}", + "{{ stream_interval['start_date'] }}", "{{ config['older'] }}", "", new_date, @@ -42,7 +42,7 @@ ), ( "test_time_is_greater_than_max", - "{{ stream_state['newer'] }}", + "{{ stream_interval['start_date'] }}", "", "{{ config['older'] }}", old_date, @@ -51,13 +51,13 @@ "test_time_is_less_than_max", "{{ config['older'] }}", "", - "{{ stream_state['newer'] }}", + "{{ stream_interval['start_date'] }}", old_date, ), ( "test_time_is_equal_to_min", - "{{ stream_state['newer'] }}", - "{{ stream_state['newer'] }}", + "{{ stream_interval['start_date'] }}", + "{{ stream_interval['start_date'] }}", "", new_date, ), @@ -65,7 +65,7 @@ "test_time_is_between_min_and_max", "{{ config['middle'] }}", "{{ config['older'] }}", - "{{ stream_state['newer'] }}", + "{{ stream_interval['start_date'] }}", middle_date, ), ( @@ -77,7 +77,7 @@ ), ( "test_max_newer_time_from_parameters", - "{{ stream_state['newer'] }}", + "{{ stream_interval['start_date'] }}", "", "{{ parameters['older'] }}", old_date, @@ -86,29 +86,29 @@ ) def test_min_max_datetime(test_name, date, min_date, max_date, expected_date): config = {"older": old_date, "middle": middle_date} - stream_state = {"newer": new_date} + stream_slice = {"start_date": new_date} parameters = {"newer": new_date, "older": old_date} min_max_date = MinMaxDatetime( datetime=date, 
min_datetime=min_date, max_datetime=max_date, parameters=parameters ) - actual_date = min_max_date.get_datetime(config, **{"stream_state": stream_state}) + actual_date = min_max_date.get_datetime(config, **{"stream_slice": stream_slice}) assert actual_date == datetime.datetime.strptime(expected_date, date_format) def test_custom_datetime_format(): config = {"older": "2021-01-01T20:12:19", "middle": "2022-01-01T20:12:19"} - stream_state = {"newer": "2022-06-24T20:12:19"} + stream_slice = {"newer": "2022-06-24T20:12:19"} min_max_date = MinMaxDatetime( datetime="{{ config['middle'] }}", datetime_format="%Y-%m-%dT%H:%M:%S", min_datetime="{{ config['older'] }}", - max_datetime="{{ stream_state['newer'] }}", + max_datetime="{{ stream_slice['newer'] }}", parameters={}, ) - actual_date = min_max_date.get_datetime(config, **{"stream_state": stream_state}) + actual_date = min_max_date.get_datetime(config, **{"stream_slice": stream_slice}) assert actual_date == datetime.datetime.strptime( "2022-01-01T20:12:19", "%Y-%m-%dT%H:%M:%S" @@ -117,16 +117,16 @@ def test_custom_datetime_format(): def test_format_is_a_number(): config = {"older": "20210101", "middle": "20220101"} - stream_state = {"newer": "20220624"} + stream_slice = {"newer": "20220624"} min_max_date = MinMaxDatetime( datetime="{{ config['middle'] }}", datetime_format="%Y%m%d", min_datetime="{{ config['older'] }}", - max_datetime="{{ stream_state['newer'] }}", + max_datetime="{{ stream_slice['newer'] }}", parameters={}, ) - actual_date = min_max_date.get_datetime(config, **{"stream_state": stream_state}) + actual_date = min_max_date.get_datetime(config, **{"stream_slice": stream_slice}) assert actual_date == datetime.datetime.strptime("20220101", "%Y%m%d").replace( tzinfo=datetime.timezone.utc diff --git a/unit_tests/sources/declarative/extractors/test_record_filter.py b/unit_tests/sources/declarative/extractors/test_record_filter.py index 5df391327..24956078f 100644 --- 
a/unit_tests/sources/declarative/extractors/test_record_filter.py +++ b/unit_tests/sources/declarative/extractors/test_record_filter.py @@ -56,7 +56,7 @@ "filter_template, records, expected_records", [ ( - "{{ record['created_at'] > stream_state['created_at'] }}", + "{{ record['created_at'] >= stream_interval.extra_fields['created_at'] }}", [ {"id": 1, "created_at": "06-06-21"}, {"id": 2, "created_at": "06-07-21"}, diff --git a/unit_tests/sources/declarative/extractors/test_record_selector.py b/unit_tests/sources/declarative/extractors/test_record_selector.py index 5ec883ad2..2d32bc75c 100644 --- a/unit_tests/sources/declarative/extractors/test_record_selector.py +++ b/unit_tests/sources/declarative/extractors/test_record_selector.py @@ -23,7 +23,7 @@ ( "test_with_extractor_and_filter", ["data"], - "{{ record['created_at'] > stream_state['created_at'] }}", + "{{ record['created_at'] > stream_interval.extra_fields['created_at'] }}", { "data": [ {"id": 1, "created_at": "06-06-21"}, @@ -80,7 +80,11 @@ def test_record_filter(test_name, field_path, filter_template, body, expected_da config = {"response_override": "stop_if_you_see_me"} parameters = {"parameters_field": "data", "created_at": "06-07-21"} stream_state = {"created_at": "06-06-21"} - stream_slice = StreamSlice(partition={}, cursor_slice={"last_seen": "06-10-21"}) + stream_slice = StreamSlice( + partition={}, + cursor_slice={"last_seen": "06-10-21"}, + extra_fields={"created_at": "06-06-21"}, + ) next_page_token = {"last_seen_id": 14} schema = create_schema() first_transformation = Mock(spec=RecordTransformation) diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index ef06676f5..3ec3bee99 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -143,7 +143,7 @@ 
"type": "RecordSelector", "extractor": {"type": "DpathExtractor", "field_path": ["comments"]}, "record_filter": { - "condition": "{{ record['updated_at'] >= stream_state.get('updated_at', config.get('start_date')) }}" + "condition": "{{ record['updated_at'] >= stream_interval.extra_fields.get('updated_at', config.get('start_date')) }}" }, }, "paginator": "#/definitions/retriever/paginator", @@ -2543,7 +2543,7 @@ def test_incremental_error( "type": "RecordSelector", "extractor": {"type": "DpathExtractor", "field_path": ["comments"]}, "record_filter": { - "condition": "{{ record['updated_at'] >= stream_state.get('updated_at', config.get('start_date')) }}" + "condition": "{{ record['updated_at'] >= stream_interval['extra_fields'].get('updated_at', config.get('start_date')) }}" }, }, "paginator": "#/definitions/retriever/paginator", diff --git a/unit_tests/sources/declarative/interpolation/test_jinja.py b/unit_tests/sources/declarative/interpolation/test_jinja.py index a99236324..7335e056b 100644 --- a/unit_tests/sources/declarative/interpolation/test_jinja.py +++ b/unit_tests/sources/declarative/interpolation/test_jinja.py @@ -10,6 +10,7 @@ from airbyte_cdk import StreamSlice from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation +from airbyte_cdk.utils import AirbyteTracedException interpolation = JinjaInterpolation() @@ -208,6 +209,11 @@ def test_invalid_jinja_statements(template_string): interpolation.eval(template_string, config=config) +def test_given_unsupported_jinja_expression_then_raises_airbyte_traced_exception(): + with pytest.raises(AirbyteTracedException): + interpolation.eval("{{ stream_state.get('some_field') }}", config={}) + + @pytest.mark.parametrize( "template_string", [ diff --git a/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py b/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py index b65f1f724..2cc6080e9 100644 --- 
a/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py +++ b/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py @@ -145,7 +145,7 @@ "type": "RecordSelector", "extractor": {"type": "DpathExtractor", "field_path": ["comments"]}, "record_filter": { - "condition": "{{ record['updated_at'] >= stream_state.get('updated_at', config.get('start_date')) }}" + "condition": "{{ record['updated_at'] >= stream_interval.get('start_date', config.get('start_date')) }}" }, }, "paginator": "#/definitions/retriever/paginator", @@ -1655,7 +1655,7 @@ def test_incremental_parent_state_no_incremental_dependency( "type": "RecordSelector", "extractor": {"type": "DpathExtractor", "field_path": ["comments"]}, "record_filter": { - "condition": "{{ record['updated_at'] >= stream_state.get('updated_at', config.get('start_date')) }}" + "condition": "{{ record['updated_at'] >= stream_interval.get('start_date', config.get('start_date')) }}" }, }, "paginator": "#/definitions/retriever/paginator", diff --git a/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py b/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py index d244cfc4c..2c646cf43 100644 --- a/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py +++ b/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py @@ -23,9 +23,9 @@ {"a_static_request_param": "a_static_value"}, ), ( - "test_value_depends_on_state", - {"read_from_state": "{{ stream_state['date'] }}"}, - {"read_from_state": "2021-01-01"}, + "test_value_depends_on_stream_interval", + {"read_from_stream_interval": "{{ stream_interval['start_date'] }}"}, + {"read_from_stream_interval": "2020-01-01"}, ), ( "test_value_depends_on_stream_slice", @@ -45,9 +45,9 @@ ( "test_parameter_is_interpolated", { - "{{ stream_state['date'] }} - 
{{stream_slice['start_date']}} - {{next_page_token['offset']}} - {{config['option']}}": "ABC" + "{{ stream_interval['start_date'] }} - {{stream_slice['start_date']}} - {{next_page_token['offset']}} - {{config['option']}}": "ABC" }, - {"2021-01-01 - 2020-01-01 - 12345 - OPTION": "ABC"}, + {"2020-01-01 - 2020-01-01 - 12345 - OPTION": "ABC"}, ), ("test_boolean_false_value", {"boolean_false": "{{ False }}"}, {"boolean_false": "False"}), ("test_integer_falsy_value", {"integer_falsy": "{{ 0 }}"}, {"integer_falsy": "0"}), @@ -76,11 +76,6 @@ def test_interpolated_request_params(test_name, input_request_params, expected_r {"a_static_request_param": "a_static_value"}, {"a_static_request_param": "a_static_value"}, ), - ( - "test_value_depends_on_state", - {"read_from_state": "{{ stream_state['date'] }}"}, - {"read_from_state": "2021-01-01"}, - ), ( "test_value_depends_on_stream_slice", {"read_from_slice": "{{ stream_slice['start_date'] }}"}, @@ -98,8 +93,8 @@ def test_interpolated_request_params(test_name, input_request_params, expected_r ), ( "test_interpolated_keys", - {"{{ stream_state['date'] }}": 123, "{{ config['option'] }}": "ABC"}, - {"2021-01-01": 123, "OPTION": "ABC"}, + {"{{ stream_interval['start_date'] }}": 123, "{{ config['option'] }}": "ABC"}, + {"2020-01-01": 123, "OPTION": "ABC"}, ), ("test_boolean_false_value", {"boolean_false": "{{ False }}"}, {"boolean_false": False}), ("test_integer_falsy_value", {"integer_falsy": "{{ 0 }}"}, {"integer_falsy": 0}), @@ -118,8 +113,8 @@ def test_interpolated_request_params(test_name, input_request_params, expected_r ), ( "test_nested_objects_interpolated keys", - {"nested": {"{{ stream_state['date'] }}": "{{ config['option'] }}"}}, - {"nested": {"2021-01-01": "OPTION"}}, + {"nested": {"{{ stream_interval['start_date'] }}": "{{ config['option'] }}"}}, + {"nested": {"2020-01-01": "OPTION"}}, ), ], ) @@ -156,8 +151,8 @@ def test_interpolated_request_json(test_name, input_request_json, expected_reque 
("test_defaults_to_empty_dict", None, {}), ( "test_interpolated_keys", - {"{{ stream_state['date'] }} - {{ next_page_token['offset'] }}": "ABC"}, - {"2021-01-01 - 12345": "ABC"}, + {"{{ stream_interval['start_date'] }} - {{ next_page_token['offset'] }}": "ABC"}, + {"2020-01-01 - 12345": "ABC"}, ), ], ) @@ -183,83 +178,3 @@ def test_error_on_create_for_both_request_json_and_data(): request_body_data=request_data, parameters={}, ) - - -@pytest.mark.parametrize( - "request_option_type,request_input,contains_state", - [ - pytest.param( - "request_parameter", - {"start": "{{ stream_state.get('start_date') }}"}, - True, - id="test_request_parameter_has_state", - ), - pytest.param( - "request_parameter", - {"start": "{{ slice_interval.get('start_date') }}"}, - False, - id="test_request_parameter_no_state", - ), - pytest.param( - "request_header", - {"start": "{{ stream_state.get('start_date') }}"}, - True, - id="test_request_header_has_state", - ), - pytest.param( - "request_header", - {"start": "{{ slice_interval.get('start_date') }}"}, - False, - id="test_request_header_no_state", - ), - pytest.param( - "request_body_data", - "[{'query': {'type': 'timestamp', 'value': stream_state.get('start_date')}}]", - True, - id="test_request_body_data_has_state", - ), - pytest.param( - "request_body_data", - "[{'query': {'type': 'timestamp', 'value': stream_interval.get('start_date')}}]", - False, - id="test_request_body_data_no_state", - ), - pytest.param( - "request_body_json", - {"start": "{{ stream_state.get('start_date') }}"}, - True, - id="test_request_body_json_has_state", - ), - pytest.param( - "request_body_json", - {"start": "{{ slice_interval.get('start_date') }}"}, - False, - id="test_request_request_body_json_no_state", - ), - ], -) -def test_request_options_contain_stream_state(request_option_type, request_input, contains_state): - request_options_provider: InterpolatedRequestOptionsProvider - match request_option_type: - case "request_parameter": - 
request_options_provider = InterpolatedRequestOptionsProvider( - config=config, request_parameters=request_input, parameters={} - ) - case "request_header": - request_options_provider = InterpolatedRequestOptionsProvider( - config=config, request_headers=request_input, parameters={} - ) - case "request_body_data": - request_options_provider = InterpolatedRequestOptionsProvider( - config=config, request_body_data=request_input, parameters={} - ) - case "request_body_json": - request_options_provider = InterpolatedRequestOptionsProvider( - config=config, request_body_json=request_input, parameters={} - ) - case _: - request_options_provider = InterpolatedRequestOptionsProvider( - config=config, parameters={} - ) - - assert request_options_provider.request_options_contain_stream_state() == contains_state diff --git a/unit_tests/sources/declarative/requesters/test_interpolated_request_input_provider.py b/unit_tests/sources/declarative/requesters/test_interpolated_request_input_provider.py index 6a332b528..5161fe765 100644 --- a/unit_tests/sources/declarative/requesters/test_interpolated_request_input_provider.py +++ b/unit_tests/sources/declarative/requesters/test_interpolated_request_input_provider.py @@ -45,7 +45,7 @@ def test_initialize_interpolated_mapping_request_input_provider( provider = InterpolatedRequestInputProvider( request_inputs=input_request_data, config=config, parameters=parameters ) - actual_request_data = provider.eval_request_inputs(stream_state={}, stream_slice=stream_slice) + actual_request_data = provider.eval_request_inputs(stream_slice=stream_slice) assert isinstance(provider._interpolator, InterpolatedMapping) assert actual_request_data == expected_request_data diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index fe03c6ad4..0b5778b7b 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ 
b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -129,7 +129,7 @@ def test_simple_retriever_full(mock_http_stream): retriever._next_page_token(response, last_page_size, last_record, last_page_token_value) == next_page_token ) - assert retriever._request_params(None, None, None) == {} + assert retriever._request_params(None, None) == {} assert retriever.stream_slices() == stream_slices diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 71874248d..188256b10 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -1589,38 +1589,6 @@ def test_concurrency_level_initial_number_partitions_to_generate_is_always_one_o assert source._concurrent_source._initial_number_partitions_to_generate == 1 -def test_streams_with_stream_state_interpolation_should_be_synchronous(): - manifest_with_stream_state_interpolation = copy.deepcopy(_MANIFEST) - - # Add stream_state interpolation to the location stream's HttpRequester - manifest_with_stream_state_interpolation["definitions"]["locations_stream"]["retriever"][ - "requester" - ]["request_parameters"] = { - "after": "{{ stream_state['updated_at'] }}", - } - - # Add a RecordFilter component that uses stream_state interpolation to the party member stream - manifest_with_stream_state_interpolation["definitions"]["party_members_stream"]["retriever"][ - "record_selector" - ]["record_filter"] = { - "type": "RecordFilter", - "condition": "{{ record.updated_at > stream_state['updated_at'] }}", - } - - source = ConcurrentDeclarativeSource( - source_config=manifest_with_stream_state_interpolation, - config=_CONFIG, - catalog=_CATALOG, - state=None, - ) - concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG) - - # 1 full refresh stream, 2 with parent stream without incremental dependency, 1 
stream with async retriever, 1 incremental with parent stream (palace_enemies) - assert len(concurrent_streams) == 5 - # 2 incremental stream with interpolation on state (locations and party_members) - assert len(synchronous_streams) == 2 - - def test_given_partition_routing_and_incremental_sync_then_stream_is_concurrent(): manifest = { "version": "5.0.0", diff --git a/unit_tests/sources/declarative/transformations/test_add_fields.py b/unit_tests/sources/declarative/transformations/test_add_fields.py index 507f60abc..929af28a6 100644 --- a/unit_tests/sources/declarative/transformations/test_add_fields.py +++ b/unit_tests/sources/declarative/transformations/test_add_fields.py @@ -86,22 +86,6 @@ {"k": "v", "k2": "in-n-out"}, id="set a value from the config using dot notation", ), - pytest.param( - {"k": "v"}, - [(["k2"], '{{ stream_state["cursor"] }}')], - None, - {"stream_state": {"cursor": "t0"}}, - {"k": "v", "k2": "t0"}, - id="set a value from the state using bracket notation", - ), - pytest.param( - {"k": "v"}, - [(["k2"], "{{ stream_state.cursor }}")], - None, - {"stream_state": {"cursor": "t0"}}, - {"k": "v", "k2": "t0"}, - id="set a value from the state using dot notation", - ), pytest.param( {"k": "v"}, [(["k2"], '{{ stream_slice["start_date"] }}')],