Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Removes stream_state interpolation from CDK #320

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 0 additions & 75 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ConcurrencyLevel as ConcurrencyLevelModel,
Expand All @@ -36,13 +35,11 @@
ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
from airbyte_cdk.sources.declarative.requesters import HttpRequester
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
DeclarativePartitionFactory,
StreamSlicerPartitionGenerator,
)
from airbyte_cdk.sources.declarative.transformations.add_fields import AddFields
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
from airbyte_cdk.sources.source import TState
from airbyte_cdk.sources.streams import Stream
Expand Down Expand Up @@ -320,9 +317,6 @@ def _group_streams(
incremental_sync_component_definition
and incremental_sync_component_definition.get("type", "")
== DatetimeBasedCursorModel.__name__
and self._stream_supports_concurrent_partition_processing(
declarative_stream=declarative_stream
)
and hasattr(declarative_stream.retriever, "stream_slicer")
and isinstance(
declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
Expand Down Expand Up @@ -387,82 +381,13 @@ def _is_datetime_incremental_without_partition_routing(
and bool(incremental_sync_component_definition)
and incremental_sync_component_definition.get("type", "")
== DatetimeBasedCursorModel.__name__
and self._stream_supports_concurrent_partition_processing(
declarative_stream=declarative_stream
)
and hasattr(declarative_stream.retriever, "stream_slicer")
and (
isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
)
)

def _stream_supports_concurrent_partition_processing(
self, declarative_stream: DeclarativeStream
) -> bool:
"""
Many connectors make use of stream_state during interpolation on a per-partition basis under the assumption that
state is updated sequentially. Because the concurrent CDK engine processes different partitions in parallel,
stream_state is no longer a thread-safe interpolation context. It would be a race condition because a cursor's
stream_state can be updated in any order depending on which stream partition's finish first.

We should start to move away from depending on the value of stream_state for low-code components that operate
per-partition, but we need to gate this otherwise some connectors will be blocked from publishing. See the
cdk-migrations.md for the full list of connectors.
"""

if isinstance(declarative_stream.retriever, SimpleRetriever) and isinstance(
declarative_stream.retriever.requester, HttpRequester
):
http_requester = declarative_stream.retriever.requester
if "stream_state" in http_requester._path.string:
self.logger.warning(
f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe. Defaulting to synchronous processing"
)
return False

request_options_provider = http_requester._request_options_provider
if request_options_provider.request_options_contain_stream_state():
self.logger.warning(
f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe. Defaulting to synchronous processing"
)
return False

record_selector = declarative_stream.retriever.record_selector
if isinstance(record_selector, RecordSelector):
if (
record_selector.record_filter
and not isinstance(
record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
)
and "stream_state" in record_selector.record_filter.condition
):
self.logger.warning(
f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the RecordFilter which is not thread-safe. Defaulting to synchronous processing"
)
return False

for add_fields in [
transformation
for transformation in record_selector.transformations
if isinstance(transformation, AddFields)
]:
for field in add_fields.fields:
if isinstance(field.value, str) and "stream_state" in field.value:
self.logger.warning(
f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the AddFields which is not thread-safe. Defaulting to synchronous processing"
)
return False
if (
isinstance(field.value, InterpolatedString)
and "stream_state" in field.value.string
):
self.logger.warning(
f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the AddFields which is not thread-safe. Defaulting to synchronous processing"
)
return False
return True

@staticmethod
def _get_retriever(
declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ definitions:
- stream_interval
- stream_partition
- stream_slice
- stream_state
examples:
- "{{ record['updates'] }}"
- "{{ record['MetaData']['LastUpdatedTime'] }}"
Expand Down Expand Up @@ -794,7 +793,7 @@ definitions:
description: This option is used to adjust the upper and lower boundaries of each datetime window to beginning and end of the provided target period (day, week, month)
type: object
required:
- target
- target
properties:
target:
title: Target
Expand Down Expand Up @@ -1611,7 +1610,6 @@ definitions:
- stream_interval
- stream_partition
- stream_slice
- stream_state
examples:
- "/products"
- "/quotes/{{ stream_partition['id'] }}/quote_line_groups"
Expand Down Expand Up @@ -1661,7 +1659,6 @@ definitions:
- stream_interval
- stream_partition
- stream_slice
- stream_state
examples:
- |
[{"clause": {"type": "timestamp", "operator": 10, "parameters":
Expand All @@ -1679,7 +1676,6 @@ definitions:
- stream_interval
- stream_partition
- stream_slice
- stream_state
examples:
- sort_order: "ASC"
sort_field: "CREATED_AT"
Expand All @@ -1700,7 +1696,6 @@ definitions:
- stream_interval
- stream_partition
- stream_slice
- stream_state
examples:
- Output-Format: JSON
- Version: "{{ config['version'] }}"
Expand All @@ -1717,7 +1712,6 @@ definitions:
- stream_interval
- stream_partition
- stream_slice
- stream_state
examples:
- unit: "day"
- query: 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
Expand Down Expand Up @@ -2072,7 +2066,6 @@ definitions:
interpolation_context:
- config
- record
- stream_state
- stream_slice
new:
type: string
Expand All @@ -2086,7 +2079,6 @@ definitions:
interpolation_context:
- config
- record
- stream_state
- stream_slice
$parameters:
type: object
Expand Down Expand Up @@ -2753,7 +2745,6 @@ definitions:
- stream_interval
- stream_partition
- stream_slice
- stream_state
examples:
- "{{ record['created_at'] >= stream_interval['start_time'] }}"
- "{{ record.status in ['active', 'expired'] }}"
Expand Down
13 changes: 13 additions & 0 deletions airbyte_cdk/sources/declarative/interpolation/jinja.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
from jinja2.exceptions import UndefinedError
from jinja2.sandbox import SandboxedEnvironment

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.interpolation.filters import filters
from airbyte_cdk.sources.declarative.interpolation.interpolation import Interpolation
from airbyte_cdk.sources.declarative.interpolation.macros import macros
from airbyte_cdk.sources.types import Config
from airbyte_cdk.utils import AirbyteTracedException


class StreamPartitionAccessEnvironment(SandboxedEnvironment):
Expand All @@ -36,6 +38,10 @@ def is_safe_attribute(self, obj: Any, attr: str, value: Any) -> bool:
"stream_partition": "stream_slice", # Use stream_partition to access partition router's values
}

_UNSUPPORTED_INTERPOLATION_VARIABLES: Mapping[str, str] = {
"stream_state": "`stream_state` is no longer supported for interpolation. We recommend using `stream_interval` instead. Please reference the CDK Migration Guide for more information.",
}

# These extensions are not installed so they're not currently a problem,
# but we're still explicitly removing them from the jinja context.
# At worst, this is documentation that we do NOT want to include these extensions because of the potential security risks
Expand Down Expand Up @@ -95,6 +101,13 @@ def eval(
elif equivalent in context:
context[alias] = context[equivalent]

for variable_name in _UNSUPPORTED_INTERPOLATION_VARIABLES:
if variable_name in input_str:
raise AirbyteTracedException(
message=_UNSUPPORTED_INTERPOLATION_VARIABLES[variable_name],
internal_message=_UNSUPPORTED_INTERPOLATION_VARIABLES[variable_name],
failure_type=FailureType.config_error,
)
try:
if isinstance(input_str, str):
result = self._eval(input_str, context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ def get_path(
next_page_token: Optional[Mapping[str, Any]],
) -> str:
kwargs = {
"stream_state": stream_state,
"stream_slice": stream_slice,
"next_page_token": next_page_token,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
NestedMapping,
)
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
from airbyte_cdk.sources.types import Config, StreamSlice


@dataclass
Expand Down Expand Up @@ -42,20 +42,17 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

def eval_request_inputs(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
"""
Returns the request inputs to set on an outgoing HTTP request

:param stream_state: The stream state
:param stream_slice: The stream slice
:param next_page_token: The pagination token
:return: The request inputs to set on an outgoing HTTP request
"""
kwargs = {
"stream_state": stream_state,
"stream_slice": stream_slice,
"next_page_token": next_page_token,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

def eval_request_inputs(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
valid_key_types: Optional[Tuple[Type[Any]]] = None,
Expand All @@ -46,15 +45,13 @@ def eval_request_inputs(
"""
Returns the request inputs to set on an outgoing HTTP request

:param stream_state: The stream state
:param stream_slice: The stream slice
:param next_page_token: The pagination token
:param valid_key_types: A tuple of types that the interpolator should allow
:param valid_value_types: A tuple of types that the interpolator should allow
:return: The request inputs to set on an outgoing HTTP request
"""
kwargs = {
"stream_state": stream_state,
"stream_slice": stream_slice,
"next_page_token": next_page_token,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
from dataclasses import InitVar, dataclass, field
from typing import Any, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

from airbyte_cdk.sources.declarative.interpolation.interpolated_nested_mapping import NestedMapping
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_nested_request_input_provider import (
InterpolatedNestedRequestInputProvider,
Expand All @@ -17,7 +15,6 @@
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import (
RequestOptionsProvider,
)
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState

RequestInput = Union[str, Mapping[str, str]]
Expand Down Expand Up @@ -80,7 +77,6 @@ def get_request_params(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
interpolated_value = self._parameter_interpolator.eval_request_inputs(
stream_state,
stream_slice,
next_page_token,
valid_key_types=(str,),
Expand All @@ -97,9 +93,7 @@ def get_request_headers(
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._headers_interpolator.eval_request_inputs(
stream_state, stream_slice, next_page_token
)
return self._headers_interpolator.eval_request_inputs(stream_slice, next_page_token)

def get_request_body_data(
self,
Expand All @@ -109,7 +103,6 @@ def get_request_body_data(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
return self._body_data_interpolator.eval_request_inputs(
stream_state,
stream_slice,
next_page_token,
valid_key_types=(str,),
Expand All @@ -123,42 +116,4 @@ def get_request_body_json(
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._body_json_interpolator.eval_request_inputs(
stream_state, stream_slice, next_page_token
)

@deprecated(
"This class is temporary and used to incrementally deliver low-code to concurrent",
category=ExperimentalClassWarning,
)
def request_options_contain_stream_state(self) -> bool:
"""
Temporary helper method used as we move low-code streams to the concurrent framework. This method determines if
the InterpolatedRequestOptionsProvider has is a dependency on a non-thread safe interpolation context such as
stream_state.
"""

return (
self._check_if_interpolation_uses_stream_state(self.request_parameters)
or self._check_if_interpolation_uses_stream_state(self.request_headers)
or self._check_if_interpolation_uses_stream_state(self.request_body_data)
or self._check_if_interpolation_uses_stream_state(self.request_body_json)
)

@staticmethod
def _check_if_interpolation_uses_stream_state(
request_input: Optional[Union[RequestInput, NestedMapping]],
) -> bool:
if not request_input:
return False
elif isinstance(request_input, str):
return "stream_state" in request_input
else:
for key, val in request_input.items():
# Covers the case of RequestInput in the form of a string or Mapping[str, str]. It also covers the case
# of a NestedMapping where the value is a string.
# Note: Doesn't account for nested mappings for request_body_json, but I don't see stream_state used in that way
# in our code
if "stream_state" in key or (isinstance(val, str) and "stream_state" in val):
return True
return False
return self._body_json_interpolator.eval_request_inputs(stream_slice, next_page_token)
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def transform(
) -> None:
if config is None:
config = {}
kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice}
kwargs = {"record": record, "stream_slice": stream_slice}
for parsed_field in self._parsed_fields:
valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs)
Expand Down
Loading
Loading