diff --git a/airbyte_cdk/sources/declarative/extractors/record_filter.py b/airbyte_cdk/sources/declarative/extractors/record_filter.py index 888498c8d..df11dda1b 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_filter.py +++ b/airbyte_cdk/sources/declarative/extractors/record_filter.py @@ -37,14 +37,12 @@ def filter_records( stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, - stream_interval: Optional[Mapping[str, Any]] = None, ) -> Iterable[Mapping[str, Any]]: kwargs = { "stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token, "stream_slice.extra_fields": stream_slice.extra_fields if stream_slice else {}, - "stream_interval": stream_interval or {}, } for record in records: if self._filter_interpolator.eval(self.config, record=record, **kwargs): @@ -73,7 +71,6 @@ def filter_records( stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, - stream_interval: Optional[Mapping[str, Any]] = None, ) -> Iterable[Mapping[str, Any]]: records = ( record @@ -90,6 +87,6 @@ def filter_records( stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, - stream_interval=stream_interval, + ) yield from records diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py index 2941de1b9..d3a8e4eae 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -73,7 +73,6 @@ def select_records( records_schema: Mapping[str, Any], stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, - stream_interval: Optional[Dict[str, Any]] = None, ) -> Iterable[Record]: """ Selects records from the response @@ -82,12 +81,11 @@ def select_records( :param records_schema: json schema of records to return :param stream_slice: The stream slice :param next_page_token: The paginator token - :param stream_interval: The stream interval for incremental sync values :return: List of Records selected from the response """ all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response) yield from self.filter_and_transform( - all_data, stream_state, records_schema, stream_slice, next_page_token, stream_interval + all_data, stream_state, records_schema, stream_slice, next_page_token ) def filter_and_transform( @@ -97,7 +95,6 @@ def filter_and_transform( records_schema: Mapping[str, Any], stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, - stream_interval: Optional[Dict[str, Any]] = None, ) -> Iterable[Record]: """ There is an issue with the selector as of 2024-08-30: it does technology-agnostic processing like filtering, transformation and @@ -108,10 +105,10 @@ def filter_and_transform( share the logic of doing transformations on a set of records. """ filtered_data = self._filter( - all_data, stream_state, stream_slice, next_page_token, stream_interval + all_data, stream_state, stream_slice, next_page_token ) transformed_data = self._transform( - filtered_data, stream_state, stream_slice, stream_interval + filtered_data, stream_state, stream_slice ) normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema) for data in normalized_data: @@ -135,7 +132,6 @@ def _filter( stream_state: StreamState, stream_slice: Optional[StreamSlice], next_page_token: Optional[Mapping[str, Any]], - stream_interval: Optional[Dict[str, Any]] = None, ) -> Iterable[Mapping[str, Any]]: if self.record_filter: yield from self.record_filter.filter_records( @@ -143,7 +139,6 @@ def _filter( stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, - stream_interval=stream_interval, ) else: yield from records @@ -153,7 +148,6 @@ def _transform( records: Iterable[Mapping[str, Any]], stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, - stream_interval: Optional[Dict[str, Any]] = None, ) -> Iterable[Mapping[str, Any]]: for record in records: for transformation in self.transformations: @@ -162,6 +156,5 @@ def _transform( config=self.config, stream_state=stream_state, stream_slice=stream_slice, - stream_interval=stream_interval, ) yield record diff --git a/airbyte_cdk/sources/declarative/interpolation/jinja.py b/airbyte_cdk/sources/declarative/interpolation/jinja.py index bdc3a0857..3d941ab98 100644 --- a/airbyte_cdk/sources/declarative/interpolation/jinja.py +++ b/airbyte_cdk/sources/declarative/interpolation/jinja.py @@ -38,6 +38,7 @@ def is_safe_attribute(self, obj: Any, attr: str, value: Any) -> bool: # These aliases are used to deprecate existing keywords without breaking all existing connectors. _ALIASES = { + "stream_interval": "stream_state", # Use stream_interval to access incremental sync values "stream_partition": "stream_slice", # Use stream_partition to access partition router's values } @@ -95,12 +96,7 @@ def eval( context = {"config": config, **additional_parameters} for alias, equivalent in _ALIASES.items(): - if alias in context: - # This is unexpected. We could ignore or log a warning, but failing loudly should result in fewer surprises - raise ValueError( - f"Found reserved keyword {alias} in interpolation context. This is unexpected and indicative of a bug in the CDK." - ) - elif equivalent in context: + if equivalent in context: context[alias] = context[equivalent] try: diff --git a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py index 9054c6318..d44571daa 100644 --- a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py +++ b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py @@ -40,24 +40,22 @@ def eval_request_inputs( stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, - stream_interval: Optional[Dict[str, Any]] = None, valid_key_types: Optional[Tuple[Type[Any]]] = None, valid_value_types: Optional[Tuple[Type[Any], ...]] = None, ) -> Mapping[str, Any]: """ Returns the request inputs to set on an outgoing HTTP request - :param stream_state: The stream state (deprecated, use stream_interval instead) + :param stream_state: The stream state :param stream_slice: The stream slice :param next_page_token: The pagination token - :param stream_interval: The stream interval for incremental sync values :param valid_key_types: A tuple of types that the interpolator should allow :param valid_value_types: A tuple of types that the interpolator should allow :return: The request inputs to set on an outgoing HTTP request """ kwargs = { "stream_slice": stream_slice, - "stream_interval": stream_interval or stream_state, # Use stream_state as stream_interval for backward compatibility + "stream_state": stream_state, "next_page_token": next_page_token, } interpolated_value = self._interpolator.eval( # type: ignore # self._interpolator is always initialized with a value and will not be None diff --git a/airbyte_cdk/sources/declarative/transformations/add_fields.py b/airbyte_cdk/sources/declarative/transformations/add_fields.py index 4de8885ff..c7e67cc1b 100644 --- a/airbyte_cdk/sources/declarative/transformations/add_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/add_fields.py @@ -45,7 +45,6 @@ class AddFields(RecordTransformation): This transformation has access to the following contextual values: record: the record about to be output by the connector config: the input configuration provided to a connector - stream_interval: the stream interval for incremental sync values stream_slice: the current stream slice being read @@ -65,10 +64,6 @@ class AddFields(RecordTransformation): - path: ["shop_id"] value: "{{ config.shop_id }}" - # from stream_interval - - path: ["current_state"] - value: "{{ stream_interval.cursor_field }}" - # from record - path: ["unnested_value"] value: {{ record.nested.field }} @@ -127,14 +122,12 @@ def transform( config: Optional[Config] = None, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, - stream_interval: Optional[Dict[str, Any]] = None, ) -> None: if config is None: config = {} kwargs = { "record": record, "stream_slice": stream_slice, - "stream_interval": stream_interval, } for parsed_field in self._parsed_fields: valid_types = (parsed_field.value_type,) if parsed_field.value_type else None diff --git a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py index 29c0d5711..893034ecd 100644 --- a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py @@ -41,7 +41,6 @@ def transform( config: Optional[Config] = None, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, - stream_interval: Optional[Dict[str, Any]] = None, ) -> None: path = [path.eval(self.config) for path in self._field_path] if "*" in path: diff --git a/airbyte_cdk/sources/declarative/transformations/flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/flatten_fields.py index 3d1d33ceb..59d8874f7 100644 --- a/airbyte_cdk/sources/declarative/transformations/flatten_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/flatten_fields.py @@ -22,7 +22,6 @@ def transform( config: Optional[Config] = None, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, - stream_interval: Optional[Dict[str, Any]] = None, ) -> None: transformed_record = self.flatten_record(record) record.clear() diff --git a/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py b/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py index bff645c92..d48529a14 100644 --- a/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py +++ b/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py @@ -41,7 +41,6 @@ def transform( config: Optional[Config] = None, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, - stream_interval: Optional[Dict[str, Any]] = None, ) -> None: if config is None: config = {} @@ -49,7 +48,6 @@ def transform( kwargs = { "record": record, "stream_slice": stream_slice, - "stream_interval": stream_interval, } old_key = str(self._old.eval(config, **kwargs)) new_key = str(self._new.eval(config, **kwargs)) diff --git a/airbyte_cdk/sources/declarative/transformations/keys_to_lower_transformation.py b/airbyte_cdk/sources/declarative/transformations/keys_to_lower_transformation.py index a37b9a737..ea74fd326 100644 --- a/airbyte_cdk/sources/declarative/transformations/keys_to_lower_transformation.py +++ b/airbyte_cdk/sources/declarative/transformations/keys_to_lower_transformation.py @@ -20,7 +20,6 @@ def transform( config: Optional[Config] = None, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, - stream_interval: Optional[Dict[str, Any]] = None, ) -> None: for key in set(record.keys()): record[key.lower()] = record.pop(key) diff --git a/airbyte_cdk/sources/declarative/transformations/keys_to_snake_transformation.py b/airbyte_cdk/sources/declarative/transformations/keys_to_snake_transformation.py index da3559c4a..0abc8cb38 100644 --- a/airbyte_cdk/sources/declarative/transformations/keys_to_snake_transformation.py +++ b/airbyte_cdk/sources/declarative/transformations/keys_to_snake_transformation.py @@ -26,7 +26,6 @@ def transform( config: Optional[Config] = None, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, - stream_interval: Optional[Dict[str, Any]] = None, ) -> None: transformed_record = self._transform_record(record) record.clear() diff --git a/airbyte_cdk/sources/declarative/transformations/remove_fields.py b/airbyte_cdk/sources/declarative/transformations/remove_fields.py index a383da917..14226ce37 100644 --- a/airbyte_cdk/sources/declarative/transformations/remove_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/remove_fields.py @@ -57,7 +57,6 @@ def transform( config: Optional[Config] = None, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, - stream_interval: Optional[Dict[str, Any]] = None, ) -> None: """ :param record: The record to be transformed diff --git a/airbyte_cdk/sources/declarative/transformations/transformation.py b/airbyte_cdk/sources/declarative/transformations/transformation.py index ba28e0b9e..206ea9edd 100644 --- a/airbyte_cdk/sources/declarative/transformations/transformation.py +++ b/airbyte_cdk/sources/declarative/transformations/transformation.py @@ -22,7 +22,6 @@ def transform( config: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[StreamSlice] = None, - stream_interval: Optional[Dict[str, Any]] = None, ) -> None: """ Transform a record by adding, deleting, or mutating fields directly from the record reference passed in argument. @@ -31,7 +30,6 @@ def transform( :param config: The user-provided configuration as specified by the source's spec :param stream_state: The stream state :param stream_slice: The stream slice - :param stream_interval: The stream interval for incremental sync values :return: The transformed record """ diff --git a/unit_tests/sources/declarative/datetime/test_min_max_datetime.py b/unit_tests/sources/declarative/datetime/test_min_max_datetime.py index 0e470166f..104add557 100644 --- a/unit_tests/sources/declarative/datetime/test_min_max_datetime.py +++ b/unit_tests/sources/declarative/datetime/test_min_max_datetime.py @@ -86,20 +86,20 @@ ) def test_min_max_datetime(test_name, date, min_date, max_date, expected_date): config = {"older": old_date, "middle": middle_date} - stream_interval = {"newer": new_date} + stream_state = {"newer": new_date} parameters = {"newer": new_date, "older": old_date} min_max_date = MinMaxDatetime( datetime=date, min_datetime=min_date, max_datetime=max_date, parameters=parameters ) - actual_date = min_max_date.get_datetime(config, **{"stream_interval": stream_interval}) + actual_date = min_max_date.get_datetime(config, **{"stream_interval": stream_state}) assert actual_date == datetime.datetime.strptime(expected_date, date_format) def test_custom_datetime_format(): config = {"older": "2021-01-01T20:12:19", "middle": "2022-01-01T20:12:19"} - stream_interval = {"newer": "2022-06-24T20:12:19"} + stream_state = {"newer": "2022-06-24T20:12:19"} min_max_date = MinMaxDatetime( datetime="{{ config['middle'] }}", @@ -108,7 +108,7 @@ def test_custom_datetime_format(): max_datetime="{{ stream_interval['newer'] }}", parameters={}, ) - actual_date = min_max_date.get_datetime(config, **{"stream_interval": stream_interval}) + actual_date = min_max_date.get_datetime(config, **{"stream_interval": stream_state}) assert actual_date == datetime.datetime.strptime( "2022-01-01T20:12:19", "%Y-%m-%dT%H:%M:%S" @@ -117,7 +117,7 @@ def test_custom_datetime_format(): def test_format_is_a_number(): config = {"older": "20210101", "middle": "20220101"} - stream_interval = {"newer": "20220624"} + stream_state = {"newer": "20220624"} min_max_date = MinMaxDatetime( datetime="{{ config['middle'] }}", @@ -126,7 +126,7 @@ def test_format_is_a_number(): max_datetime="{{ stream_interval['newer'] }}", parameters={}, ) - actual_date = min_max_date.get_datetime(config, **{"stream_interval": stream_interval}) + actual_date = min_max_date.get_datetime(config, **{"stream_interval": stream_state}) assert actual_date == datetime.datetime.strptime("20220101", "%Y%m%d").replace( tzinfo=datetime.timezone.utc diff --git a/unit_tests/sources/declarative/extractors/test_record_filter.py b/unit_tests/sources/declarative/extractors/test_record_filter.py index b20af7751..5df391327 100644 --- a/unit_tests/sources/declarative/extractors/test_record_filter.py +++ b/unit_tests/sources/declarative/extractors/test_record_filter.py @@ -56,7 +56,7 @@ "filter_template, records, expected_records", [ ( - "{{ record['created_at'] > stream_interval['created_at'] }}", + "{{ record['created_at'] > stream_state['created_at'] }}", [ {"id": 1, "created_at": "06-06-21"}, {"id": 2, "created_at": "06-07-21"}, @@ -116,7 +116,7 @@ def test_record_filter( ): config = {"response_override": "stop_if_you_see_me"} parameters = {"created_at": "06-07-21"} - stream_interval = {"created_at": "06-06-21"} + stream_state = {"created_at": "06-06-21"} stream_slice = StreamSlice( partition={}, cursor_slice={"last_seen": "06-10-21"}, @@ -128,10 +128,9 @@ def test_record_filter( actual_records = list( record_filter.filter_records( records, - stream_state={}, + stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, - stream_interval=stream_interval, ) ) assert actual_records == expected_records diff --git a/unit_tests/sources/declarative/extractors/test_record_selector.py b/unit_tests/sources/declarative/extractors/test_record_selector.py index d0e3b431a..060799230 100644 --- a/unit_tests/sources/declarative/extractors/test_record_selector.py +++ b/unit_tests/sources/declarative/extractors/test_record_selector.py @@ -23,7 +23,7 @@ ( "test_with_extractor_and_filter", ["data"], - "{{ record['created_at'] > stream_interval['created_at'] }}", + "{{ record['created_at'] > stream_state['created_at'] }}", { "data": [ {"id": 1, "created_at": "06-06-21"}, @@ -79,7 +79,7 @@ def test_record_filter(test_name, field_path, filter_template, body, expected_data): config = {"response_override": "stop_if_you_see_me"} parameters = {"parameters_field": "data", "created_at": "06-07-21"} - stream_interval = {"created_at": "06-06-21"} + stream_state = {"created_at": "06-06-21"} stream_slice = StreamSlice(partition={}, cursor_slice={"last_seen": "06-10-21"}) next_page_token = {"last_seen_id": 14} schema = create_schema()