fix: restore stream_state aliasing in jinja.py and remove stream_interval parameters

Co-Authored-By: [email protected] <[email protected]>
devin-ai-integration[bot] and natikgadzhi committed Feb 10, 2025
1 parent a4be945 commit 11269f2
Showing 15 changed files with 19 additions and 52 deletions.
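
Net effect: stream_interval is no longer threaded through the extractors and transformations as an explicit parameter; it goes back to being a pure Jinja alias for stream_state. A minimal sketch of the restored behavior, assuming the eval signature implied by the jinja.py diff below:

    from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation

    interpolation = JinjaInterpolation()
    # Templates may use either keyword: the restored alias exposes whatever is
    # passed as stream_state under the stream_interval name as well.
    value = interpolation.eval(
        "{{ stream_interval['updated_at'] }}",
        config={},
        stream_state={"updated_at": "2025-02-10"},
    )
    assert value == "2025-02-10"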
5 changes: 1 addition & 4 deletions airbyte_cdk/sources/declarative/extractors/record_filter.py
@@ -37,14 +37,12 @@ def filter_records(
         stream_state: StreamState,
         stream_slice: Optional[StreamSlice] = None,
         next_page_token: Optional[Mapping[str, Any]] = None,
-        stream_interval: Optional[Mapping[str, Any]] = None,
     ) -> Iterable[Mapping[str, Any]]:
         kwargs = {
             "stream_state": stream_state,
             "stream_slice": stream_slice,
             "next_page_token": next_page_token,
             "stream_slice.extra_fields": stream_slice.extra_fields if stream_slice else {},
-            "stream_interval": stream_interval or {},
         }
         for record in records:
             if self._filter_interpolator.eval(self.config, record=record, **kwargs):
@@ -73,7 +71,6 @@ def filter_records(
         stream_state: StreamState,
         stream_slice: Optional[StreamSlice] = None,
         next_page_token: Optional[Mapping[str, Any]] = None,
-        stream_interval: Optional[Mapping[str, Any]] = None,
     ) -> Iterable[Mapping[str, Any]]:
         records = (
             record
@@ -90,6 +87,6 @@ def filter_records(
                 stream_state=stream_state,
                 stream_slice=stream_slice,
                 next_page_token=next_page_token,
-                stream_interval=stream_interval,
+
             )
         yield from records
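
With stream_interval gone from filter_records, filter conditions still work when written against either keyword: the kwargs above carry stream_state, and the Jinja alias re-derives stream_interval from it. A sketch mirroring the updated unit tests further down (RecordFilter's constructor arguments are assumed here):

    from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter

    record_filter = RecordFilter(
        config={},
        parameters={},
        condition="{{ record['created_at'] > stream_interval['created_at'] }}",
    )
    records = [
        {"id": 1, "created_at": "06-06-21"},
        {"id": 2, "created_at": "06-07-21"},
    ]
    # stream_interval in the condition resolves through the alias to stream_state.
    kept = list(record_filter.filter_records(records, stream_state={"created_at": "06-06-21"}))
    assert kept == [{"id": 2, "created_at": "06-07-21"}]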
13 changes: 3 additions & 10 deletions airbyte_cdk/sources/declarative/extractors/record_selector.py
@@ -73,7 +73,6 @@ def select_records(
         records_schema: Mapping[str, Any],
         stream_slice: Optional[StreamSlice] = None,
         next_page_token: Optional[Mapping[str, Any]] = None,
-        stream_interval: Optional[Dict[str, Any]] = None,
     ) -> Iterable[Record]:
         """
         Selects records from the response
@@ -82,12 +81,11 @@
         :param records_schema: json schema of records to return
         :param stream_slice: The stream slice
         :param next_page_token: The paginator token
-        :param stream_interval: The stream interval for incremental sync values
         :return: List of Records selected from the response
         """
         all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response)
         yield from self.filter_and_transform(
-            all_data, stream_state, records_schema, stream_slice, next_page_token, stream_interval
+            all_data, stream_state, records_schema, stream_slice, next_page_token
         )

     def filter_and_transform(
@@ -97,7 +95,6 @@ def filter_and_transform(
         records_schema: Mapping[str, Any],
         stream_slice: Optional[StreamSlice] = None,
         next_page_token: Optional[Mapping[str, Any]] = None,
-        stream_interval: Optional[Dict[str, Any]] = None,
     ) -> Iterable[Record]:
         """
         There is an issue with the selector as of 2024-08-30: it does technology-agnostic processing like filtering, transformation and
@@ -108,10 +105,10 @@
         share the logic of doing transformations on a set of records.
         """
         filtered_data = self._filter(
-            all_data, stream_state, stream_slice, next_page_token, stream_interval
+            all_data, stream_state, stream_slice, next_page_token
         )
         transformed_data = self._transform(
-            filtered_data, stream_state, stream_slice, stream_interval
+            filtered_data, stream_state, stream_slice
         )
         normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema)
         for data in normalized_data:
@@ -135,15 +132,13 @@ def _filter(
         stream_state: StreamState,
         stream_slice: Optional[StreamSlice],
         next_page_token: Optional[Mapping[str, Any]],
-        stream_interval: Optional[Dict[str, Any]] = None,
     ) -> Iterable[Mapping[str, Any]]:
         if self.record_filter:
             yield from self.record_filter.filter_records(
                 records,
                 stream_state=stream_state,
                 stream_slice=stream_slice,
                 next_page_token=next_page_token,
-                stream_interval=stream_interval,
             )
         else:
             yield from records
@@ -153,7 +148,6 @@ def _transform(
         records: Iterable[Mapping[str, Any]],
         stream_state: StreamState,
         stream_slice: Optional[StreamSlice] = None,
-        stream_interval: Optional[Dict[str, Any]] = None,
     ) -> Iterable[Mapping[str, Any]]:
         for record in records:
             for transformation in self.transformations:
@@ -162,6 +156,5 @@ def _transform(
                     config=self.config,
                     stream_state=stream_state,
                     stream_slice=stream_slice,
-                    stream_interval=stream_interval,
                 )
             yield record
8 changes: 2 additions & 6 deletions airbyte_cdk/sources/declarative/interpolation/jinja.py
@@ -38,6 +38,7 @@ def is_safe_attribute(self, obj: Any, attr: str, value: Any) -> bool:

 # These aliases are used to deprecate existing keywords without breaking all existing connectors.
 _ALIASES = {
+    "stream_interval": "stream_state",  # Use stream_interval to access incremental sync values
     "stream_partition": "stream_slice",  # Use stream_partition to access partition router's values
 }

@@ -95,12 +96,7 @@ def eval(
         context = {"config": config, **additional_parameters}

         for alias, equivalent in _ALIASES.items():
-            if alias in context:
-                # This is unexpected. We could ignore or log a warning, but failing loudly should result in fewer surprises
-                raise ValueError(
-                    f"Found reserved keyword {alias} in interpolation context. This is unexpected and indicative of a bug in the CDK."
-                )
-            elif equivalent in context:
+            if equivalent in context:
                 context[alias] = context[equivalent]

         try:
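Dropping the ValueError guard changes one edge case: a stream_interval key already present in the context no longer fails loudly; if stream_state is also present, the alias loop silently overwrites it. A worked example of the restored loop, using the _ALIASES mapping from the diff:

    _ALIASES = {"stream_interval": "stream_state", "stream_partition": "stream_slice"}

    context = {"config": {}, "stream_state": {"cursor": "A"}, "stream_interval": {"cursor": "B"}}
    for alias, equivalent in _ALIASES.items():
        if equivalent in context:
            context[alias] = context[equivalent]
    # The caller-supplied stream_interval value is replaced by the aliased one.
    assert context["stream_interval"] == {"cursor": "A"}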
@@ -40,24 +40,22 @@ def eval_request_inputs(
         stream_state: Optional[StreamState] = None,
         stream_slice: Optional[StreamSlice] = None,
         next_page_token: Optional[Mapping[str, Any]] = None,
-        stream_interval: Optional[Dict[str, Any]] = None,
         valid_key_types: Optional[Tuple[Type[Any]]] = None,
         valid_value_types: Optional[Tuple[Type[Any], ...]] = None,
     ) -> Mapping[str, Any]:
         """
         Returns the request inputs to set on an outgoing HTTP request

-        :param stream_state: The stream state (deprecated, use stream_interval instead)
+        :param stream_state: The stream state
         :param stream_slice: The stream slice
         :param next_page_token: The pagination token
-        :param stream_interval: The stream interval for incremental sync values
         :param valid_key_types: A tuple of types that the interpolator should allow
         :param valid_value_types: A tuple of types that the interpolator should allow
         :return: The request inputs to set on an outgoing HTTP request
         """
         kwargs = {
             "stream_slice": stream_slice,
-            "stream_interval": stream_interval or stream_state,  # Use stream_state as stream_interval for backward compatibility
+            "stream_state": stream_state,
             "next_page_token": next_page_token,
         }
         interpolated_value = self._interpolator.eval(  # type: ignore  # self._interpolator is always initialized with a value and will not be None
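Request-option templates keep working unchanged: eval_request_inputs now passes stream_state into the interpolator, and the alias makes stream_interval resolve to the same mapping. A sketch using InterpolatedString, whose create/eval signatures are assumed from the CDK's interpolation package:

    from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString

    since = InterpolatedString.create("{{ stream_interval['updated_at'] }}", parameters={})
    # stream_state is forwarded into the Jinja context; the alias covers stream_interval.
    assert since.eval(config={}, stream_state={"updated_at": "2025-02-10"}) == "2025-02-10"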
7 changes: 0 additions & 7 deletions airbyte_cdk/sources/declarative/transformations/add_fields.py
@@ -45,7 +45,6 @@ class AddFields(RecordTransformation):
     This transformation has access to the following contextual values:
         record: the record about to be output by the connector
         config: the input configuration provided to a connector
-        stream_interval: the stream interval for incremental sync values
         stream_slice: the current stream slice being read
@@ -65,10 +64,6 @@ class AddFields(RecordTransformation):
           - path: ["shop_id"]
             value: "{{ config.shop_id }}"

-          # from stream_interval
-          - path: ["current_state"]
-            value: "{{ stream_interval.cursor_field }}"
-
           # from record
           - path: ["unnested_value"]
             value: {{ record.nested.field }}
@@ -127,14 +122,12 @@ def transform(
         config: Optional[Config] = None,
         stream_state: Optional[StreamState] = None,
         stream_slice: Optional[StreamSlice] = None,
-        stream_interval: Optional[Dict[str, Any]] = None,
     ) -> None:
         if config is None:
             config = {}
         kwargs = {
             "record": record,
             "stream_slice": stream_slice,
-            "stream_interval": stream_interval,
         }
         for parsed_field in self._parsed_fields:
             valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
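The stream_interval example removed from the AddFields docstring still works as a template, only via the alias rather than a dedicated transform() argument. Checked directly against the interpolator (Jinja dot access on a dict falls back to item lookup):

    from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation

    interp = JinjaInterpolation()
    value = interp.eval(
        "{{ stream_interval.cursor_field }}",
        config={},
        record={"id": 1},
        stream_state={"cursor_field": "06-07-21"},
    )
    assert value == "06-07-21"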
@@ -41,7 +41,6 @@ def transform(
         config: Optional[Config] = None,
         stream_state: Optional[StreamState] = None,
         stream_slice: Optional[StreamSlice] = None,
-        stream_interval: Optional[Dict[str, Any]] = None,
     ) -> None:
         path = [path.eval(self.config) for path in self._field_path]
         if "*" in path:
@@ -22,7 +22,6 @@ def transform(
         config: Optional[Config] = None,
         stream_state: Optional[StreamState] = None,
         stream_slice: Optional[StreamSlice] = None,
-        stream_interval: Optional[Dict[str, Any]] = None,
     ) -> None:
         transformed_record = self.flatten_record(record)
         record.clear()
@@ -41,15 +41,13 @@ def transform(
         config: Optional[Config] = None,
         stream_state: Optional[StreamState] = None,
         stream_slice: Optional[StreamSlice] = None,
-        stream_interval: Optional[Dict[str, Any]] = None,
     ) -> None:
         if config is None:
             config = {}

         kwargs = {
             "record": record,
             "stream_slice": stream_slice,
-            "stream_interval": stream_interval,
         }
         old_key = str(self._old.eval(config, **kwargs))
         new_key = str(self._new.eval(config, **kwargs))
@@ -20,7 +20,6 @@ def transform(
         config: Optional[Config] = None,
         stream_state: Optional[StreamState] = None,
         stream_slice: Optional[StreamSlice] = None,
-        stream_interval: Optional[Dict[str, Any]] = None,
     ) -> None:
         for key in set(record.keys()):
             record[key.lower()] = record.pop(key)
@@ -26,7 +26,6 @@ def transform(
         config: Optional[Config] = None,
         stream_state: Optional[StreamState] = None,
         stream_slice: Optional[StreamSlice] = None,
-        stream_interval: Optional[Dict[str, Any]] = None,
     ) -> None:
         transformed_record = self._transform_record(record)
         record.clear()
@@ -57,7 +57,6 @@ def transform(
         config: Optional[Config] = None,
         stream_state: Optional[StreamState] = None,
         stream_slice: Optional[StreamSlice] = None,
-        stream_interval: Optional[Dict[str, Any]] = None,
     ) -> None:
         """
         :param record: The record to be transformed
@@ -22,7 +22,6 @@ def transform(
         config: Optional[Mapping[str, Any]] = None,
         stream_state: Optional[Mapping[str, Any]] = None,
         stream_slice: Optional[StreamSlice] = None,
-        stream_interval: Optional[Dict[str, Any]] = None,
     ) -> None:
         """
         Transform a record by adding, deleting, or mutating fields directly from the record reference passed in argument.
@@ -31,7 +30,6 @@ def transform(
         :param config: The user-provided configuration as specified by the source's spec
         :param stream_state: The stream state
         :param stream_slice: The stream slice
-        :param stream_interval: The stream interval for incremental sync values
         :return: The transformed record
         """

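For custom transformations, the base-class contract is now the four-argument form shown above. A hypothetical subclass against the slimmed signature (the package-level import is assumed):

    from typing import Any, Dict, Optional

    from airbyte_cdk.sources.declarative.transformations import RecordTransformation


    class UppercaseIds(RecordTransformation):
        """Hypothetical transformation written against the restored signature."""

        def transform(
            self,
            record: Dict[str, Any],
            config: Optional[Dict[str, Any]] = None,
            stream_state: Optional[Dict[str, Any]] = None,
            stream_slice: Optional[Any] = None,
        ) -> None:
            # Mutate the record in place, per the base-class contract.
            if "id" in record:
                record["id"] = str(record["id"]).upper()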
12 changes: 6 additions & 6 deletions unit_tests/sources/declarative/datetime/test_min_max_datetime.py
@@ -86,20 +86,20 @@
 )
 def test_min_max_datetime(test_name, date, min_date, max_date, expected_date):
     config = {"older": old_date, "middle": middle_date}
-    stream_interval = {"newer": new_date}
+    stream_state = {"newer": new_date}
     parameters = {"newer": new_date, "older": old_date}

     min_max_date = MinMaxDatetime(
         datetime=date, min_datetime=min_date, max_datetime=max_date, parameters=parameters
     )
-    actual_date = min_max_date.get_datetime(config, **{"stream_interval": stream_interval})
+    actual_date = min_max_date.get_datetime(config, **{"stream_interval": stream_state})

     assert actual_date == datetime.datetime.strptime(expected_date, date_format)


 def test_custom_datetime_format():
     config = {"older": "2021-01-01T20:12:19", "middle": "2022-01-01T20:12:19"}
-    stream_interval = {"newer": "2022-06-24T20:12:19"}
+    stream_state = {"newer": "2022-06-24T20:12:19"}

     min_max_date = MinMaxDatetime(
         datetime="{{ config['middle'] }}",
@@ -108,7 +108,7 @@ def test_custom_datetime_format():
         max_datetime="{{ stream_interval['newer'] }}",
         parameters={},
     )
-    actual_date = min_max_date.get_datetime(config, **{"stream_interval": stream_interval})
+    actual_date = min_max_date.get_datetime(config, **{"stream_interval": stream_state})

     assert actual_date == datetime.datetime.strptime(
         "2022-01-01T20:12:19", "%Y-%m-%dT%H:%M:%S"
@@ -117,7 +117,7 @@ def test_custom_datetime_format():

 def test_format_is_a_number():
     config = {"older": "20210101", "middle": "20220101"}
-    stream_interval = {"newer": "20220624"}
+    stream_state = {"newer": "20220624"}

     min_max_date = MinMaxDatetime(
         datetime="{{ config['middle'] }}",
@@ -126,7 +126,7 @@ def test_format_is_a_number():
         max_datetime="{{ stream_interval['newer'] }}",
         parameters={},
     )
-    actual_date = min_max_date.get_datetime(config, **{"stream_interval": stream_interval})
+    actual_date = min_max_date.get_datetime(config, **{"stream_interval": stream_state})

     assert actual_date == datetime.datetime.strptime("20220101", "%Y%m%d").replace(
         tzinfo=datetime.timezone.utc
@@ -56,7 +56,7 @@
     "filter_template, records, expected_records",
     [
         (
-            "{{ record['created_at'] > stream_interval['created_at'] }}",
+            "{{ record['created_at'] > stream_state['created_at'] }}",
             [
                 {"id": 1, "created_at": "06-06-21"},
                 {"id": 2, "created_at": "06-07-21"},
@@ -116,7 +116,7 @@ def test_record_filter(
 ):
     config = {"response_override": "stop_if_you_see_me"}
     parameters = {"created_at": "06-07-21"}
-    stream_interval = {"created_at": "06-06-21"}
+    stream_state = {"created_at": "06-06-21"}
     stream_slice = StreamSlice(
         partition={},
         cursor_slice={"last_seen": "06-10-21"},
@@ -128,10 +128,9 @@
     actual_records = list(
         record_filter.filter_records(
             records,
-            stream_state={},
+            stream_state=stream_state,
             stream_slice=stream_slice,
             next_page_token=next_page_token,
-            stream_interval=stream_interval,
         )
     )
     assert actual_records == expected_records
@@ -23,7 +23,7 @@
         (
             "test_with_extractor_and_filter",
             ["data"],
-            "{{ record['created_at'] > stream_interval['created_at'] }}",
+            "{{ record['created_at'] > stream_state['created_at'] }}",
             {
                 "data": [
                     {"id": 1, "created_at": "06-06-21"},
@@ -79,7 +79,7 @@
 def test_record_filter(test_name, field_path, filter_template, body, expected_data):
     config = {"response_override": "stop_if_you_see_me"}
     parameters = {"parameters_field": "data", "created_at": "06-07-21"}
-    stream_interval = {"created_at": "06-06-21"}
+    stream_state = {"created_at": "06-06-21"}
     stream_slice = StreamSlice(partition={}, cursor_slice={"last_seen": "06-10-21"})
     next_page_token = {"last_seen_id": 14}
     schema = create_schema()
