Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Removes stream_state interpolation from CDK #320

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ definitions:
- stream_interval
- stream_partition
- stream_slice
- stream_state
examples:
- "{{ record['updates'] }}"
- "{{ record['MetaData']['LastUpdatedTime'] }}"
Expand Down Expand Up @@ -794,7 +793,7 @@ definitions:
description: This option is used to adjust the upper and lower boundaries of each datetime window to beginning and end of the provided target period (day, week, month)
type: object
required:
- target
- target
properties:
target:
title: Target
Expand Down Expand Up @@ -1611,7 +1610,6 @@ definitions:
- stream_interval
- stream_partition
- stream_slice
- stream_state
examples:
- "/products"
- "/quotes/{{ stream_partition['id'] }}/quote_line_groups"
Expand Down Expand Up @@ -1661,7 +1659,6 @@ definitions:
- stream_interval
- stream_partition
- stream_slice
- stream_state
examples:
- |
[{"clause": {"type": "timestamp", "operator": 10, "parameters":
Expand All @@ -1679,7 +1676,6 @@ definitions:
- stream_interval
- stream_partition
- stream_slice
- stream_state
examples:
- sort_order: "ASC"
sort_field: "CREATED_AT"
Expand All @@ -1700,7 +1696,6 @@ definitions:
- stream_interval
- stream_partition
- stream_slice
- stream_state
examples:
- Output-Format: JSON
- Version: "{{ config['version'] }}"
Expand All @@ -1717,7 +1712,6 @@ definitions:
- stream_interval
- stream_partition
- stream_slice
- stream_state
examples:
- unit: "day"
- query: 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
Expand Down Expand Up @@ -2072,7 +2066,6 @@ definitions:
interpolation_context:
- config
- record
- stream_state
- stream_slice
new:
type: string
Expand All @@ -2086,7 +2079,6 @@ definitions:
interpolation_context:
- config
- record
- stream_state
- stream_slice
$parameters:
type: object
Expand Down Expand Up @@ -2753,7 +2745,6 @@ definitions:
- stream_interval
- stream_partition
- stream_slice
- stream_state
examples:
- "{{ record['created_at'] >= stream_interval['start_time'] }}"
- "{{ record.status in ['active', 'expired'] }}"
Expand Down
13 changes: 13 additions & 0 deletions airbyte_cdk/sources/declarative/interpolation/jinja.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from airbyte_cdk.sources.declarative.interpolation.interpolation import Interpolation
from airbyte_cdk.sources.declarative.interpolation.macros import macros
from airbyte_cdk.sources.types import Config
from airbyte_cdk.models import FailureType
from airbyte_cdk.utils import AirbyteTracedException


class StreamPartitionAccessEnvironment(SandboxedEnvironment):
Expand All @@ -36,6 +38,10 @@ def is_safe_attribute(self, obj: Any, attr: str, value: Any) -> bool:
"stream_partition": "stream_slice", # Use stream_partition to access partition router's values
}

# Interpolation variable names that are rejected, mapped to the user-facing
# explanation shown when one is encountered. The evaluation code iterates these
# keys and raises an AirbyteTracedException (failure type config_error) carrying
# the mapped message when a key is found in the input string.
# NOTE(review): the lookup is a plain substring test on the raw template, so any
# input merely containing the text "stream_state" is rejected - confirm intended.
_UNSUPPORTED_INTERPOLATION_VARIABLES: Mapping[str, str] = {
"stream_state": "`stream_state` is no longer supported for interpolation. We recommend using `stream_interval` instead. Please reference the CDK Migration Guide for more information.",
}

# These extensions are not installed so they're not currently a problem,
# but we're still explicitly removing them from the jinja context.
# At worst, this is documentation that we do NOT want to include these extensions because of the potential security risks
Expand Down Expand Up @@ -95,6 +101,13 @@ def eval(
elif equivalent in context:
context[alias] = context[equivalent]

for variable_name in _UNSUPPORTED_INTERPOLATION_VARIABLES:
if variable_name in input_str:
raise AirbyteTracedException(
message=_UNSUPPORTED_INTERPOLATION_VARIABLES[variable_name],
internal_message=_UNSUPPORTED_INTERPOLATION_VARIABLES[variable_name],
failure_type=FailureType.config_error,
)
try:
if isinstance(input_str, str):
result = self._eval(input_str, context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ def get_path(
next_page_token: Optional[Mapping[str, Any]],
) -> str:
kwargs = {
"stream_state": stream_state,
"stream_slice": stream_slice,
"next_page_token": next_page_token,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

def eval_request_inputs(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
valid_key_types: Optional[Tuple[Type[Any]]] = None,
Expand All @@ -54,7 +53,6 @@ def eval_request_inputs(
:return: The request inputs to set on an outgoing HTTP request
"""
kwargs = {
"stream_state": stream_state,
"stream_slice": stream_slice,
"next_page_token": next_page_token,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
from dataclasses import InitVar, dataclass, field
from typing import Any, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

from airbyte_cdk.sources.declarative.interpolation.interpolated_nested_mapping import NestedMapping
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_nested_request_input_provider import (
InterpolatedNestedRequestInputProvider,
Expand All @@ -17,7 +15,6 @@
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import (
RequestOptionsProvider,
)
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState

RequestInput = Union[str, Mapping[str, str]]
Expand Down Expand Up @@ -80,7 +77,6 @@ def get_request_params(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
interpolated_value = self._parameter_interpolator.eval_request_inputs(
stream_state,
stream_slice,
next_page_token,
valid_key_types=(str,),
Expand All @@ -98,7 +94,7 @@ def get_request_headers(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._headers_interpolator.eval_request_inputs(
stream_state, stream_slice, next_page_token
stream_slice, next_page_token
)

def get_request_body_data(
Expand All @@ -109,7 +105,6 @@ def get_request_body_data(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
return self._body_data_interpolator.eval_request_inputs(
stream_state,
stream_slice,
next_page_token,
valid_key_types=(str,),
Expand All @@ -126,39 +121,3 @@ def get_request_body_json(
return self._body_json_interpolator.eval_request_inputs(
stream_state, stream_slice, next_page_token
)

@deprecated(
    "This class is temporary and used to incrementally deliver low-code to concurrent",
    category=ExperimentalClassWarning,
)
def request_options_contain_stream_state(self) -> bool:
    """
    Report whether any configured request option references `stream_state`.

    Temporary helper used while low-code streams are moved to the concurrent
    framework. It checks the request parameters, headers, body data and body
    JSON, in that order, for a dependency on a non-thread-safe interpolation
    context such as stream_state.

    :return: True as soon as one of the four request options mentions
        `stream_state`, False if none of them do.
    """
    # Attributes are looked up by name inside the generator so that, like the
    # original `or` chain, later options are not touched once a match is found.
    option_attribute_names = (
        "request_parameters",
        "request_headers",
        "request_body_data",
        "request_body_json",
    )
    return any(
        self._check_if_interpolation_uses_stream_state(getattr(self, attribute_name))
        for attribute_name in option_attribute_names
    )

@staticmethod
def _check_if_interpolation_uses_stream_state(
    request_input: Optional[Union[RequestInput, NestedMapping]],
) -> bool:
    """
    Tell whether a request input mentions `stream_state` anywhere in its text.

    The check is a plain substring search for "stream_state" and is applied to:
      * the input itself when it is a string,
      * every string key and string value when it is a mapping,
      * every nested mapping or list/tuple found among the values, at any depth
        (for example a deeply nested request_body_json).

    :param request_input: A string, a (possibly nested) mapping, or None.
    :return: True if "stream_state" occurs in any inspected string, otherwise
        False. Empty or missing input yields False.
    """
    if not request_input:
        return False

    def _mentions_stream_state(node: Any) -> bool:
        # Walk the structure depth-first; anything that is not a string,
        # mapping or list/tuple (numbers, booleans, None) cannot reference it.
        if isinstance(node, str):
            return "stream_state" in node
        if isinstance(node, Mapping):
            return any(
                # Guard the key check so non-string keys do not raise TypeError.
                (isinstance(key, str) and "stream_state" in key)
                or _mentions_stream_state(val)
                for key, val in node.items()
            )
        if isinstance(node, (list, tuple)):
            return any(_mentions_stream_state(item) for item in node)
        return False

    return _mentions_stream_state(request_input)
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def transform(
) -> None:
if config is None:
config = {}
kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice}
kwargs = {"record": record, "stream_slice": stream_slice}
for parsed_field in self._parsed_fields:
valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def test_initialize_interpolated_mapping_request_input_provider(
provider = InterpolatedRequestInputProvider(
request_inputs=input_request_data, config=config, parameters=parameters
)
actual_request_data = provider.eval_request_inputs(stream_state={}, stream_slice=stream_slice)
actual_request_data = provider.eval_request_inputs(stream_slice=stream_slice)

assert isinstance(provider._interpolator, InterpolatedMapping)
assert actual_request_data == expected_request_data
Loading