Skip to content

feat: check for request_option mapping conflicts in individual components #328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.utils.mapping_helpers import _validate_component_request_option_paths


@dataclass
Expand Down Expand Up @@ -122,6 +123,10 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
if not self.cursor_datetime_formats:
self.cursor_datetime_formats = [self.datetime_format]

_validate_component_request_option_paths(
self.config, self.start_time_option, self.end_time_option
)

def get_stream_state(self) -> StreamState:
return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {} # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
)
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.utils.mapping_helpers import (
_validate_component_request_option_paths,
)


@dataclass
Expand Down Expand Up @@ -113,6 +116,13 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
if isinstance(self.url_base, str):
self.url_base = InterpolatedString(string=self.url_base, parameters=parameters)

if self.page_token_option and not isinstance(self.page_token_option, RequestPath):
_validate_component_request_option_paths(
self.config,
self.page_size_option,
self.page_token_option,
)

def get_initial_token(self) -> Optional[Any]:
"""
Return the page token that should be used for the first request of a stream
Expand Down
45 changes: 43 additions & 2 deletions airbyte_cdk/utils/mapping_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
import copy
from typing import Any, Dict, List, Mapping, Optional, Union

from airbyte_cdk.sources.declarative.requesters.request_option import (
RequestOption,
RequestOptionType,
)
from airbyte_cdk.sources.types import Config


def _merge_mappings(
target: Dict[str, Any],
Expand Down Expand Up @@ -33,13 +39,17 @@ def _merge_mappings(
if isinstance(target_value, dict) and isinstance(source_value, dict):
# Only body_json supports nested_structures
if not allow_same_value_merge:
raise ValueError(f"Duplicate keys found: {'.'.join(current_path)}")
raise ValueError(
f"Request body collision, duplicate keys detected at key path: {'.'.join(current_path)}. Please ensure that all keys in the request are unique."
)
# If both are dictionaries, recursively merge them
_merge_mappings(target_value, source_value, current_path, allow_same_value_merge)

elif not allow_same_value_merge or target_value != source_value:
# If same key has different values, that's a conflict
raise ValueError(f"Duplicate keys found: {'.'.join(current_path)}")
raise ValueError(
f"Request body collision, duplicate keys detected at key path: {'.'.join(current_path)}. Please ensure that all keys in the request are unique."
)
else:
# No conflict, just copy the value (using deepcopy for nested structures)
target[key] = copy.deepcopy(source_value)
Expand Down Expand Up @@ -102,3 +112,34 @@ def combine_mappings(
_merge_mappings(result, mapping, allow_same_value_merge=allow_same_value_merge)

return result


def _validate_component_request_option_paths(
    config: Config, *request_options: Optional[RequestOption]
) -> None:
    """
    Validate that a component's request options do not inject into conflicting paths.

    The options are grouped by their ``inject_into`` target. For every group that
    holds more than one option, each option injects a distinct placeholder value
    into its own empty mapping and the resulting mappings are merged with
    ``combine_mappings``. Placeholder values are used because the real values
    might not be available at init time.

    :param config: connector config, forwarded to ``RequestOption.inject_into_request``
    :param request_options: the options to check; falsy entries (e.g. ``None``) are ignored
    :raises ValueError: propagated unchanged from ``combine_mappings`` when two options
        of the same injection type collide
    """
    grouped_options: Dict[RequestOptionType, List[RequestOption]] = {}
    for option in request_options:
        if option:
            grouped_options.setdefault(option.inject_into, []).append(option)

    for inject_type, options in grouped_options.items():
        # A single option cannot conflict with itself.
        if len(options) <= 1:
            continue

        option_dicts: List[Optional[Union[Mapping[str, Any], str]]] = []
        for i, option in enumerate(options):
            option_dict: Dict[str, Any] = {}
            # Indexed placeholder values guarantee the injected values differ, so two
            # options writing to the same path are reported even for body_json, where
            # identical values are allowed to merge.
            option.inject_into_request(option_dict, f"dummy_value_{i}", config)
            option_dicts.append(option_dict)

        # The ValueError raised by combine_mappings already carries the collision
        # message, so it is left to propagate instead of being re-wrapped in a new
        # ValueError whose only argument would be the original exception object.
        combine_mappings(
            option_dicts, allow_same_value_merge=(inject_type == RequestOptionType.body_json)
        )
Original file line number Diff line number Diff line change
Expand Up @@ -447,3 +447,29 @@ def test_paginator_with_page_option_no_page_size():
parameters={},
),
)


def test_request_option_mapping_validator():
    """
    Building a DefaultPaginator whose page size and page token options both use the
    body_json field path ["variables", "limit"] is expected to raise a ValueError.
    """
    strategy = PageIncrement(
        config={},
        page_size=1,
        start_from_page=0,
        parameters={},
        inject_on_first_request=True,
    )

    def build_paginator():
        # Everything is constructed lazily so it all happens inside pytest.raises.
        size_option = RequestOption(
            field_path=["variables", "limit"],
            inject_into=RequestOptionType.body_json,
            parameters={},
        )
        token_option = RequestOption(
            field_path=["variables", "limit"],
            inject_into=RequestOptionType.body_json,
            parameters={},
        )
        return DefaultPaginator(
            page_size_option=size_option,
            page_token_option=token_option,
            pagination_strategy=strategy,
            config=MagicMock(),
            url_base=MagicMock(),
            parameters={},
        )

    with pytest.raises(ValueError):
        build_paginator()
120 changes: 113 additions & 7 deletions unit_tests/utils/test_mapping_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import pytest

from airbyte_cdk.utils.mapping_helpers import combine_mappings
from airbyte_cdk.utils.mapping_helpers import (
RequestOption,
RequestOptionType,
_validate_component_request_option_paths,
combine_mappings,
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -46,14 +51,14 @@ def test_string_handling(test_name, mappings, expected_result, expected_error):
@pytest.mark.parametrize(
"test_name, mappings, expected_error",
[
("duplicate_keys_same_value", [{"a": 1}, {"a": 1}], "Duplicate keys found"),
("duplicate_keys_different_value", [{"a": 1}, {"a": 2}], "Duplicate keys found"),
("duplicate_keys_same_value", [{"a": 1}, {"a": 1}], "duplicate keys detected"),
("duplicate_keys_different_value", [{"a": 1}, {"a": 2}], "duplicate keys detected"),
(
"nested_structure_not_allowed",
[{"a": {"b": 1}}, {"a": {"c": 2}}],
"Duplicate keys found",
"duplicate keys detected",
),
("any_nesting_not_allowed", [{"a": {"b": 1}}, {"a": {"d": 2}}], "Duplicate keys found"),
("any_nesting_not_allowed", [{"a": {"b": 1}}, {"a": {"d": 2}}], "duplicate keys detected"),
],
)
def test_non_body_json_requests(test_name, mappings, expected_error):
Expand Down Expand Up @@ -96,13 +101,13 @@ def test_non_body_json_requests(test_name, mappings, expected_error):
"nested_conflict",
[{"a": {"b": 1}}, {"a": {"b": 2}}],
None,
"Duplicate keys found",
"duplicate keys detected",
),
(
"type_conflict",
[{"a": 1}, {"a": {"b": 2}}],
None,
"Duplicate keys found",
"duplicate keys detected",
),
],
)
Expand All @@ -113,3 +118,104 @@ def test_body_json_requests(test_name, mappings, expected_result, expected_error
combine_mappings(mappings, allow_same_value_merge=True)
else:
assert combine_mappings(mappings, allow_same_value_merge=True) == expected_result


@pytest.fixture
def mock_config() -> dict[str, str]:
    """Provide a minimal config mapping for the request option validation tests."""
    config = {"test": "config"}
    return config


@pytest.mark.parametrize(
    "test_name, option1, option2, should_raise",
    [
        (
            "different_fields",
            RequestOption(
                field_name="field1",
                inject_into=RequestOptionType.body_json,
                parameters={},
            ),
            RequestOption(
                field_name="field2",
                inject_into=RequestOptionType.body_json,
                parameters={},
            ),
            False,
        ),
        (
            "same_field_name_header",
            RequestOption(
                field_name="field",
                inject_into=RequestOptionType.header,
                parameters={},
            ),
            RequestOption(
                field_name="field",
                inject_into=RequestOptionType.header,
                parameters={},
            ),
            True,
        ),
        (
            "different_nested_paths",
            RequestOption(
                field_path=["data", "query1", "limit"],
                inject_into=RequestOptionType.body_json,
                parameters={},
            ),
            RequestOption(
                field_path=["data", "query2", "limit"],
                inject_into=RequestOptionType.body_json,
                parameters={},
            ),
            False,
        ),
        (
            "same_nested_paths",
            RequestOption(
                field_path=["data", "query", "limit"],
                inject_into=RequestOptionType.body_json,
                parameters={},
            ),
            RequestOption(
                field_path=["data", "query", "limit"],
                inject_into=RequestOptionType.body_json,
                parameters={},
            ),
            True,
        ),
        (
            "different_inject_types",
            RequestOption(
                field_name="field",
                inject_into=RequestOptionType.header,
                parameters={},
            ),
            RequestOption(
                field_name="field",
                inject_into=RequestOptionType.body_json,
                parameters={},
            ),
            False,
        ),
    ],
)
def test_request_option_validation(test_name, option1, option2, should_raise, mock_config):
    """
    Check pairs of request options against the validator.

    Cases flagged with should_raise must fail with a ValueError whose message matches
    "duplicate keys detected"; all other cases must validate without raising.
    """
    if not should_raise:
        # Non-conflicting pair: the call itself is the assertion.
        _validate_component_request_option_paths(mock_config, option1, option2)
        return

    with pytest.raises(ValueError, match="duplicate keys detected"):
        _validate_component_request_option_paths(mock_config, option1, option2)


@pytest.mark.parametrize(
    "test_name, options",
    [
        (
            "none_options",
            [
                None,
                RequestOption(
                    field_name="field",
                    inject_into=RequestOptionType.header,
                    parameters={},
                ),
                None,
            ],
        ),
        (
            "single_option",
            [
                RequestOption(
                    field_name="field",
                    inject_into=RequestOptionType.header,
                    parameters={},
                ),
            ],
        ),
        ("all_none", [None, None, None]),
        ("empty_list", []),
    ],
)
def test_edge_cases(test_name, options, mock_config):
    """
    Run the validator on inputs with at most one real option (None entries, a single
    option, or nothing at all); the test passes when the call does not raise.
    """
    _validate_component_request_option_paths(mock_config, *options)
Loading