Skip to content

Commit

Permalink
fix: remove stream_state from request options interpolation
Browse files Browse the repository at this point in the history
  • Loading branch information
devin-ai-integration[bot] and natikgadzhi committed Feb 7, 2025
1 parent e7a6989 commit 74e37b9
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Mapping, Optional, Union
from typing import Any, Dict, Mapping, Optional, Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_nested_mapping import (
InterpolatedNestedMapping,
Expand Down Expand Up @@ -45,18 +45,20 @@ def eval_request_inputs(
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
stream_interval: Optional[Dict[str, Any]] = None,
) -> Mapping[str, Any]:
"""
Returns the request inputs to set on an outgoing HTTP request
:param stream_state: The stream state
:param stream_state: The stream state (deprecated, use stream_interval instead)
:param stream_slice: The stream slice
:param next_page_token: The pagination token
:param stream_interval: The stream interval for incremental sync values
:return: The request inputs to set on an outgoing HTTP request
"""
kwargs = {
"stream_state": stream_state,
"stream_slice": stream_slice,
"stream_interval": stream_state, # Use stream_state as stream_interval for backward compatibility
"next_page_token": next_page_token,
}
return self._interpolator.eval(self.config, **kwargs) # type: ignore # self._interpolator is always initialized with a value and will not be None
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Mapping, Optional, Tuple, Type, Union
from typing import Any, Dict, Mapping, Optional, Tuple, Type, Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
Expand Down Expand Up @@ -40,22 +40,24 @@ def eval_request_inputs(
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
stream_interval: Optional[Dict[str, Any]] = None,
valid_key_types: Optional[Tuple[Type[Any]]] = None,
valid_value_types: Optional[Tuple[Type[Any], ...]] = None,
) -> Mapping[str, Any]:
"""
Returns the request inputs to set on an outgoing HTTP request
:param stream_state: The stream state
:param stream_state: The stream state (deprecated, use stream_interval instead)
:param stream_slice: The stream slice
:param next_page_token: The pagination token
:param stream_interval: The stream interval for incremental sync values
:param valid_key_types: A tuple of types that the interpolator should allow
:param valid_value_types: A tuple of types that the interpolator should allow
:return: The request inputs to set on an outgoing HTTP request
"""
kwargs = {
"stream_state": stream_state,
"stream_slice": stream_slice,
"stream_interval": stream_state, # Use stream_state as stream_interval for backward compatibility
"next_page_token": next_page_token,
}
interpolated_value = self._interpolator.eval( # type: ignore # self._interpolator is always initialized with a value and will not be None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,32 +133,7 @@ def get_request_body_json(
)
def request_options_contain_stream_state(self) -> bool:
"""
Temporary helper method used as we move low-code streams to the concurrent framework. This method determines if
the InterpolatedRequestOptionsProvider has is a dependency on a non-thread safe interpolation context such as
stream_state.
Returns whether any of the request options (parameters, headers, body_data, body_json) contain a reference to
stream_state, which is no longer supported.
"""

return (
self._check_if_interpolation_uses_stream_state(self.request_parameters)
or self._check_if_interpolation_uses_stream_state(self.request_headers)
or self._check_if_interpolation_uses_stream_state(self.request_body_data)
or self._check_if_interpolation_uses_stream_state(self.request_body_json)
)

@staticmethod
def _check_if_interpolation_uses_stream_state(
request_input: Optional[Union[RequestInput, NestedMapping]],
) -> bool:
if not request_input:
return False
elif isinstance(request_input, str):
return "stream_state" in request_input
else:
for key, val in request_input.items():
# Covers the case of RequestInput in the form of a string or Mapping[str, str]. It also covers the case
# of a NestedMapping where the value is a string.
# Note: Doesn't account for nested mappings for request_body_json, but I don't see stream_state used in that way
# in our code
if "stream_state" in key or (isinstance(val, str) and "stream_state" in val):
return True
return False

0 comments on commit 74e37b9

Please sign in to comment.