From 98583c4525f27ce4815e882b00948bcaa8b4654f Mon Sep 17 00:00:00 2001
From: brianjlai
Date: Wed, 16 Apr 2025 05:58:10 -0700
Subject: [PATCH 1/6] switch the ordering of page iteration and property
 chunking: process chunks first instead of pages first

---
 .../retrievers/simple_retriever.py | 187 ++++++++----------
 1 file changed, 86 insertions(+), 101 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
index a739a628a..8aabdc4bd 100644
--- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
+++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
@@ -367,14 +367,66 @@ def _read_pages(
             {"next_page_token": initial_token} if initial_token is not None else None
         )
         while not pagination_complete:
-            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
+            property_chunks = (
+                list(
+                    self.additional_query_properties.get_request_property_chunks(
+                        stream_slice=stream_slice
+                    )
+                )
+                if self.additional_query_properties
+                else [
+                    None
+                ]  # A single None property chunk represents the case where property chunking is not configured
+            )
+            records_without_merge_key = []
+            merged_records: MutableMapping[str, Any] = defaultdict(dict)
             last_page_size = 0
             last_record: Optional[Record] = None
-            for record in records_generator_fn(response):
-                last_page_size += 1
-                last_record = record
-                yield record
+            response: Optional[requests.Response] = None
+            for properties in property_chunks:
+                if properties:
+                    stream_slice = StreamSlice(
+                        partition=stream_slice.partition or {},
+                        cursor_slice=stream_slice.cursor_slice or {},
+                        extra_fields={"query_properties": properties},
+                    )
+
+                response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
+                for current_record in records_generator_fn(response):
+                    if (
+                        current_record
+                        and self.additional_query_properties
+                        and self.additional_query_properties.property_chunking
+                    ):
+                        merge_key = (
+                            self.additional_query_properties.property_chunking.get_merge_key(
+                                current_record
+                            )
+                        )
+                        if merge_key:
+                            merged_records[merge_key].update(current_record)
+                        else:
+                            # We should still emit records even if the record did not have a merge key
+                            last_page_size += 1
+                            last_record = current_record
+                            yield current_record
+                    else:
+                        last_page_size += 1
+                        last_record = current_record
+                        yield current_record
+
+            if (
+                self.additional_query_properties
+                and self.additional_query_properties.property_chunking
+            ):
+                for merged_record in merged_records.values():
+                    record = Record(
+                        data=merged_record, stream_name=self.name, associated_slice=stream_slice
+                    )
+                    last_page_size += 1
+                    last_record = record
+                    yield record
 
             if not response:
                 pagination_complete = True
@@ -449,110 +501,43 @@ def read_records(
         :param stream_slice: The stream slice to read data for
         :return: The records read from the API source
         """
-
-        property_chunks = (
-            list(
-                self.additional_query_properties.get_request_property_chunks(
-                    stream_slice=stream_slice
-                )
-            )
-            if self.additional_query_properties
-            else []
-        )
-        records_without_merge_key = []
-        merged_records: MutableMapping[str, Any] = defaultdict(dict)
         _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check
+        most_recent_record_from_slice = None
+        record_generator = partial(
+            self._parse_records,
+            stream_slice=stream_slice,
+            stream_state=self.state or {},
+            records_schema=records_schema,
+        )
 
-        if self.additional_query_properties:
-            for properties in property_chunks:
-                _slice = StreamSlice(
-                    partition=_slice.partition or {},
-                    cursor_slice=_slice.cursor_slice or {},
-                    extra_fields={"query_properties": properties},
-                )  # None-check
-
-                record_generator = partial(
-                    self._parse_records,
-                    stream_slice=_slice,
-                    stream_state=self.state or {},
-                    records_schema=records_schema,
-                )
+        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
+            stream_state = self.state
 
-                for stream_data in self._read_pages(record_generator, self.state, _slice):
-                    current_record = self._extract_record(stream_data, _slice)
-                    if self.cursor and current_record:
-                        self.cursor.observe(_slice, current_record)
+            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
+            # fetch more records. The platform deletes stream state for full refresh streams before starting a
+            # new job, so we don't need to worry about this value existing for the initial attempt
+            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
+                return
 
-                    # Latest record read, not necessarily within slice boundaries.
-                    # TODO Remove once all custom components implement `observe` method.
-                    # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
-                    most_recent_record_from_slice = self._get_most_recent_record(
-                        most_recent_record_from_slice, current_record, _slice
-                    )
+            yield from self._read_single_page(record_generator, stream_state, _slice)
+        else:
+            for stream_data in self._read_pages(record_generator, self.state, _slice):
+                current_record = self._extract_record(stream_data, _slice)
+                if self.cursor and current_record:
+                    self.cursor.observe(_slice, current_record)
+
+                # Latest record read, not necessarily within slice boundaries.
+                # TODO Remove once all custom components implement `observe` method.
+                # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
+                most_recent_record_from_slice = self._get_most_recent_record(
+                    most_recent_record_from_slice, current_record, _slice
+                )
+                yield stream_data
 
-                    if current_record and self.additional_query_properties.property_chunking:
-                        merge_key = (
-                            self.additional_query_properties.property_chunking.get_merge_key(
-                                current_record
-                            )
-                        )
-                        if merge_key:
-                            merged_records[merge_key].update(current_record)
-                        else:
-                            # We should still emit records even if the record did not have a merge key
-                            records_without_merge_key.append(current_record)
-                    else:
-                        yield stream_data
 
             if self.cursor:
                 self.cursor.close_slice(_slice, most_recent_record_from_slice)
-
-            if len(merged_records) > 0:
-                yield from [
-                    Record(data=merged_record, stream_name=self.name, associated_slice=stream_slice)
-                    for merged_record in merged_records.values()
-                ]
-            if len(records_without_merge_key) > 0:
-                yield from records_without_merge_key
-        else:
-            _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check
-
-            most_recent_record_from_slice = None
-            record_generator = partial(
-                self._parse_records,
-                stream_slice=stream_slice,
-                stream_state=self.state or {},
-                records_schema=records_schema,
-            )
-
-            if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
-                stream_state = self.state
-
-                # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
-                # fetch more records. The platform deletes stream state for full refresh streams before starting a
-                # new job, so we don't need to worry about this value existing for the initial attempt
-                if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
-                    return
-
-                yield from self._read_single_page(record_generator, stream_state, _slice)
-            else:
-                for stream_data in self._read_pages(record_generator, self.state, _slice):
-                    current_record = self._extract_record(stream_data, _slice)
-                    if self.cursor and current_record:
-                        self.cursor.observe(_slice, current_record)
-
-                    # Latest record read, not necessarily within slice boundaries.
-                    # TODO Remove once all custom components implement `observe` method.
-                    # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
-                    most_recent_record_from_slice = self._get_most_recent_record(
-                        most_recent_record_from_slice, current_record, _slice
-                    )
-                    yield stream_data
-
-                if self.cursor:
-                    self.cursor.close_slice(_slice, most_recent_record_from_slice)
-            return
+        return
 
     def _get_most_recent_record(
         self,
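Note on PATCH 1/6: the nesting of the two loops is inverted. Instead of iterating property chunks in read_records and paginating inside each chunk, _read_pages now requests every property chunk for the current page token before the paginator advances, merging partial records from different chunks page by page. A rough sketch of the resulting control flow, using hypothetical stand-ins (fetch_page, merge_key_of) rather than real CDK APIs:

    from collections import defaultdict
    from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

    def read_chunks_then_pages(
        property_chunks: List[List[str]],
        fetch_page: Callable[[List[str], Optional[str]], Tuple[List[Dict[str, Any]], Optional[str]]],
        merge_key_of: Callable[[Dict[str, Any]], Optional[str]],
    ) -> Iterable[Dict[str, Any]]:
        page_token: Optional[str] = ""  # sentinel for the first request
        while page_token is not None:
            merged: Dict[str, Dict[str, Any]] = defaultdict(dict)
            next_token: Optional[str] = None
            # Every property chunk is requested for the same page before
            # pagination advances, so partial records can be merged here.
            for chunk in property_chunks:
                records, next_token = fetch_page(chunk, page_token)
                for record in records:
                    key = merge_key_of(record)
                    if key is None:
                        yield record  # no merge key: emit the partial record as-is
                    else:
                        merged[key].update(record)
            yield from merged.values()  # one combined record per merge key and page
            page_token = next_token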
From fe7c45804c6da2e720786ab33b05fbda7e28293a Mon Sep 17 00:00:00 2001
From: brianjlai
Date: Wed, 16 Apr 2025 06:14:28 -0700
Subject: [PATCH 2/6] fix mypy

---
 .../sources/declarative/retrievers/simple_retriever.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
index 8aabdc4bd..43ab89eb1 100644
--- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
+++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
@@ -367,7 +367,7 @@ def _read_pages(
             {"next_page_token": initial_token} if initial_token is not None else None
         )
         while not pagination_complete:
-            property_chunks = (
+            property_chunks: List[List[str]] = (
                 list(
                     self.additional_query_properties.get_request_property_chunks(
                         stream_slice=stream_slice
@@ -375,17 +375,16 @@ def _read_pages(
                     )
                 )
                 if self.additional_query_properties
                 else [
-                    None
-                ]  # A single None property chunk represents the case where property chunking is not configured
+                    []
+                ]  # A single empty property chunk represents the case where property chunking is not configured
             )
-            records_without_merge_key = []
             merged_records: MutableMapping[str, Any] = defaultdict(dict)
             last_page_size = 0
             last_record: Optional[Record] = None
             response: Optional[requests.Response] = None
             for properties in property_chunks:
-                if properties:
+                if len(properties) > 0:
                     stream_slice = StreamSlice(
                         partition=stream_slice.partition or {},
                         cursor_slice=stream_slice.cursor_slice or {},
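Note on PATCH 2/6: replacing the None sentinel with an empty list lets the variable be typed List[List[str]] for mypy, and the loop distinguishes "chunking not configured" with len(properties) > 0 instead of a truthiness test on an Optional. A minimal illustration of the sentinel's behavior (assumed shape for demonstration, not CDK code):

    from typing import List

    # [[]] means the page loop body runs exactly once while skipping the
    # per-chunk slice rewrite, which models "no property chunking".
    property_chunks: List[List[str]] = [[]]
    for properties in property_chunks:
        if len(properties) > 0:
            print("request limited to properties:", properties)
        else:
            print("single request without property selection")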
From ea877a8c9803e6c45beef56a6de67d04fe68b06b Mon Sep 17 00:00:00 2001
From: Anatolii Yatsuk
Date: Thu, 17 Apr 2025 15:15:48 +0300
Subject: [PATCH 3/6] Fix merging records for property chunking

---
 .../declarative/retrievers/simple_retriever.py | 17 ++++++++++++++++-
 .../retrievers/test_simple_retriever.py        | 18 ++++++++++++++++--
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
index 43ab89eb1..cb87da19c 100644
--- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
+++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
@@ -10,6 +10,7 @@
 from typing import (
     Any,
     Callable,
+    Dict,
     Iterable,
     List,
     Mapping,
@@ -404,7 +405,7 @@ def _read_pages(
                             )
                         )
                         if merge_key:
-                            merged_records[merge_key].update(current_record)
+                            deep_merge(merged_records[merge_key], current_record)
                         else:
                             # We should still emit records even if the record did not have a merge key
                             last_page_size += 1
@@ -623,6 +624,20 @@ def _to_partition_key(to_serialize: Any) -> str:
     return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)
 
 
+def deep_merge(target: Dict[str, Any], source: Union[Record, Dict[str, Any]]) -> None:
+    """
+    Recursively merge two dictionaries, combining nested dictionaries instead of overwriting them.
+
+    :param target: The dictionary to merge into (modified in place)
+    :param source: The dictionary to merge from
+    """
+    for key, value in source.items():
+        if key in target and isinstance(target[key], dict) and isinstance(value, dict):
+            deep_merge(target[key], value)
+        else:
+            target[key] = value
+
+
 @dataclass
 class SimpleRetrieverTestReadDecorator(SimpleRetriever):
     """
diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py
index 0425b4e84..36e539442 100644
--- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py
+++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py
@@ -1148,6 +1148,10 @@ def test_simple_retriever_with_additional_query_properties():
                 "last_name": "hongou",
                 "nonary": "second",
                 "bracelet": "1",
+                "dict_field": {
+                    "key1": "value1",
+                    "key2": "value2",
+                },
             },
             associated_slice=None,
             stream_name=stream_name,
@@ -1216,7 +1220,12 @@ def test_simple_retriever_with_additional_query_properties():
     record_selector.select_records.side_effect = [
         [
             Record(
-                data={"id": "a", "first_name": "gentarou", "last_name": "hongou"},
+                data={
+                    "id": "a",
+                    "first_name": "gentarou",
+                    "last_name": "hongou",
+                    "dict_field": {"key1": "value1"},
+                },
                 associated_slice=None,
                 stream_name=stream_name,
             ),
@@ -1263,7 +1272,12 @@ def test_simple_retriever_with_additional_query_properties():
                 stream_name=stream_name,
             ),
             Record(
-                data={"id": "a", "nonary": "second", "bracelet": "1"},
+                data={
+                    "id": "a",
+                    "nonary": "second",
+                    "bracelet": "1",
+                    "dict_field": {"key2": "value2"},
+                },
                 associated_slice=None,
                 stream_name=stream_name,
             ),
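Note on PATCH 3/6: dict.update replaces nested values wholesale, so two property chunks that both contribute keys under the same nested field would clobber each other; the recursive helper merges them instead. A worked example with the same semantics as the deep_merge function added above:

    from typing import Any, Dict

    def deep_merge(target: Dict[str, Any], source: Dict[str, Any]) -> None:
        # Recursively merge source into target, combining nested dicts in place.
        for key, value in source.items():
            if key in target and isinstance(target[key], dict) and isinstance(value, dict):
                deep_merge(target[key], value)
            else:
                target[key] = value

    merged = {"id": "a", "dict_field": {"key1": "value1"}}
    deep_merge(merged, {"id": "a", "dict_field": {"key2": "value2"}})
    assert merged == {"id": "a", "dict_field": {"key1": "value1", "key2": "value2"}}
    # merged.update(...) would have dropped "key1" from dict_field.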
From 90a247fc0f55051d3010376fc06971ed5bb7e3a4 Mon Sep 17 00:00:00 2001
From: brianjlai
Date: Thu, 17 Apr 2025 08:17:20 -0700
Subject: [PATCH 4/6] extra test cases and change types

---
 .../declarative/retrievers/simple_retriever.py | 14 ++++++++++----
 .../retrievers/test_simple_retriever.py        | 16 +++++++++++++---
 2 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
index cb87da19c..3aa3ffb87 100644
--- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
+++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
@@ -405,7 +405,7 @@ def _read_pages(
                             )
                         )
                         if merge_key:
-                            deep_merge(merged_records[merge_key], current_record)
+                            _deep_merge(merged_records[merge_key], current_record)
                         else:
                             # We should still emit records even if the record did not have a merge key
                             last_page_size += 1
@@ -624,7 +624,9 @@ def _to_partition_key(to_serialize: Any) -> str:
     return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)
 
 
-def deep_merge(target: Dict[str, Any], source: Union[Record, Dict[str, Any]]) -> None:
+def _deep_merge(
+    target: MutableMapping[str, Any], source: Union[Record, MutableMapping[str, Any]]
+) -> None:
     """
     Recursively merge two dictionaries, combining nested dictionaries instead of overwriting them.
 
@@ -632,8 +634,12 @@ def deep_merge(target: Dict[str, Any], source: Union[Record, Dict[str, Any]]) ->
     :param source: The dictionary to merge from
     """
     for key, value in source.items():
-        if key in target and isinstance(target[key], dict) and isinstance(value, dict):
-            deep_merge(target[key], value)
+        if (
+            key in target
+            and isinstance(target[key], MutableMapping)
+            and isinstance(value, MutableMapping)
+        ):
+            _deep_merge(target[key], value)
         else:
             target[key] = value
 
diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py
index 36e539442..ff09df2ca 100644
--- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py
+++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py
@@ -1151,6 +1151,10 @@ def test_simple_retriever_with_additional_query_properties():
                 "dict_field": {
                     "key1": "value1",
                     "key2": "value2",
+                    "affiliation": {
+                        "company": "cradle",
+                        "industry": "pharmaceutical",
+                    },
                 },
             },
             associated_slice=None,
@@ -1174,6 +1178,7 @@ def test_simple_retriever_with_additional_query_properties():
                 "last_name": "kurashiki",
                 "nonary": "second",
                 "bracelet": "6",
+                "allies": ["aoi_kurashiki"],
             },
             associated_slice=None,
             stream_name=stream_name,
@@ -1224,7 +1229,7 @@ def test_simple_retriever_with_additional_query_properties():
                     "id": "a",
                     "first_name": "gentarou",
                     "last_name": "hongou",
-                    "dict_field": {"key1": "value1"},
+                    "dict_field": {"key1": "value1", "affiliation": {"company": "cradle"}},
                 },
                 associated_slice=None,
                 stream_name=stream_name,
             ),
             Record(
-                data={"id": "c", "first_name": "akane", "last_name": "kurashiki"},
+                data={
+                    "id": "c",
+                    "first_name": "akane",
+                    "last_name": "kurashiki",
+                    "allies": ["aoi_kurashiki"],
+                },
                 associated_slice=None,
                 stream_name=stream_name,
             ),
@@ -1276,7 +1286,7 @@ def test_simple_retriever_with_additional_query_properties():
             Record(
                 data={
                     "id": "a",
                     "nonary": "second",
                     "bracelet": "1",
-                    "dict_field": {"key2": "value2"},
+                    "dict_field": {"key2": "value2", "affiliation": {"industry": "pharmaceutical"}},
                 },
                 associated_slice=None,
                 stream_name=stream_name,
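Note on PATCH 4/6: besides making the helper module-private, the isinstance checks now test against the MutableMapping ABC rather than dict, so mapping types that are not dict subclasses also merge recursively. An illustrative example only (CustomMapping is a hypothetical type, not part of the CDK):

    from collections import UserDict
    from collections.abc import MutableMapping

    class CustomMapping(UserDict):  # hypothetical mapping type
        pass

    m = CustomMapping({"a": 1})
    print(isinstance(m, dict))            # False: UserDict does not subclass dict
    print(isinstance(m, MutableMapping))  # True: the ABC check still matches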
From a2478af2ba82c4210fa1b08ff24f176e467cbfd8 Mon Sep 17 00:00:00 2001
From: Anatolii Yatsuk
Date: Thu, 17 Apr 2025 20:30:50 +0300
Subject: [PATCH 5/6] Fix property field size counting by counting delimiter
 too

---
 .../requesters/query_properties/property_chunking.py      | 3 ++-
 .../requesters/query_properties/test_property_chunking.py | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py
index 53f387775..b997a3d93 100644
--- a/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py
+++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py
@@ -52,8 +52,9 @@ def get_request_property_chunks(
         chunk_size = 0
         for property_field in property_fields:
             # If property_limit_type is not defined, we default to property_count which is just an incrementing count
+            # +2 for the comma ToDo: Add possibility to specify parameter representation and take it into account in property_field_size
            property_field_size = (
-                len(property_field)
+                len(property_field) + 2
                 if self.property_limit_type == PropertyLimitType.characters
                 else 1
             )
diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py b/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py
index d05c66df6..6afc231bb 100644
--- a/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py
+++ b/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py
@@ -44,7 +44,7 @@
             None,
             PropertyLimitType.characters,
             10,
-            [["kate", "laurie"], ["jaclyn"]],
+            [["kate"], ["laurie"], ["jaclyn"]],
             id="test_property_chunking_limit_characters",
         ),
         pytest.param(
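Note on PATCH 5/6 above and PATCH 6/6 below: when the property limit is measured in characters, each field's cost must include the delimiter that joins it to its neighbors in the request, since n comma-joined fields cost sum(len(field)) + (n - 1). Patch 5 approximates this with +2 per field; patch 6 tightens it to +1, which over-counts by exactly one trailing delimiter. A simplified sketch of the chunking loop showing how the overhead changes the splits used in the tests (assumed helper for illustration, not the CDK implementation):

    from typing import List

    def chunk_by_characters(fields: List[str], limit: int, overhead: int) -> List[List[str]]:
        chunks: List[List[str]] = []
        current: List[str] = []
        size = 0
        for field in fields:
            cost = len(field) + overhead  # overhead approximates the delimiter
            if current and size + cost > limit:
                chunks.append(current)
                current, size = [], 0
            current.append(field)
            size += cost
        if current:
            chunks.append(current)
        return chunks

    # Patch 5 (+2), limit 10: every field becomes its own chunk.
    print(chunk_by_characters(["kate", "laurie", "jaclyn"], 10, 2))
    # Patch 6 (+1), limit 15: "kate,laurie" (11 characters) fits in one chunk.
    print(chunk_by_characters(["kate", "laurie", "jaclyn"], 15, 1))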
From 5c6ff1c4a39a9e9928b931f0b699d8bf592dc565 Mon Sep 17 00:00:00 2001
From: brianjlai
Date: Thu, 17 Apr 2025 14:37:05 -0700
Subject: [PATCH 6/6] adjust property chunking splitting count and tests

---
 .../requesters/query_properties/property_chunking.py      |  5 +++--
 .../query_properties/test_property_chunking.py            | 12 ++++++++++--
 2 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py
index b997a3d93..1020c0827 100644
--- a/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py
+++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py
@@ -52,9 +52,10 @@ def get_request_property_chunks(
         chunk_size = 0
         for property_field in property_fields:
             # If property_limit_type is not defined, we default to property_count which is just an incrementing count
-            # +2 for the comma ToDo: Add possibility to specify parameter representation and take it into account in property_field_size
+            # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
             property_field_size = (
-                len(property_field) + 2
+                len(property_field)
+                + 1  # The +1 represents the extra character for the delimiter in between properties
                 if self.property_limit_type == PropertyLimitType.characters
                 else 1
             )
diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py b/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py
index 6afc231bb..381bb07a6 100644
--- a/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py
+++ b/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py
@@ -43,10 +43,18 @@
             ["kate", "laurie", "jaclyn"],
             None,
             PropertyLimitType.characters,
-            10,
-            [["kate"], ["laurie"], ["jaclyn"]],
+            15,
+            [["kate", "laurie"], ["jaclyn"]],
             id="test_property_chunking_limit_characters",
         ),
+        pytest.param(
+            ["laurie", "jaclyn", "kaitlin"],
+            None,
+            PropertyLimitType.characters,
+            12,
+            [["laurie"], ["jaclyn"], ["kaitlin"]],
+            id="test_property_chunking_includes_extra_delimiter",
+        ),
         pytest.param(
             [],
             None,