
fix(property chunking): Switch the ordering of page iteration and property chunking to process chunks first instead of pages first #487


Merged
6 commits, merged Apr 17, 2025

@@ -52,8 +52,10 @@ def get_request_property_chunks(
chunk_size = 0
for property_field in property_fields:
# If property_limit_type is not defined, we default to property_count which is just an incrementing count
# todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
property_field_size = (
len(property_field)
+ 1 # The +1 represents the extra character for the delimiter in between properties
if self.property_limit_type == PropertyLimitType.characters
else 1
)
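For context, here is a minimal, self-contained sketch of the character-based sizing used above (an illustrative helper named chunk_properties, not the CDK's get_request_property_chunks): each property contributes len(name) + 1 characters, the +1 covering the delimiter that will join the names in the request, so a chunk is flushed once adding the next property would push the running total past the limit.

from typing import Iterable, List

def chunk_properties(property_fields: Iterable[str], property_limit: int) -> List[List[str]]:
    # Group property names so that the sum of len(name) + 1 per chunk stays within property_limit.
    chunks: List[List[str]] = []
    current_chunk: List[str] = []
    chunk_size = 0
    for property_field in property_fields:
        property_field_size = len(property_field) + 1  # +1 for the delimiter between properties
        if current_chunk and chunk_size + property_field_size > property_limit:
            chunks.append(current_chunk)
            current_chunk = []
            chunk_size = 0
        current_chunk.append(property_field)
        chunk_size += property_field_size
    if current_chunk:
        chunks.append(current_chunk)
    return chunks

# With a limit of 12, "laurie" (6 + 1) plus "jaclyn" (6 + 1) would total 14, so every
# property lands in its own chunk, matching the new
# test_property_chunking_includes_extra_delimiter case added below.
assert chunk_properties(["laurie", "jaclyn", "kaitlin"], 12) == [["laurie"], ["jaclyn"], ["kaitlin"]]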
207 changes: 106 additions & 101 deletions airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
@@ -10,6 +10,7 @@
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Mapping,
@@ -367,14 +368,65 @@ def _read_pages(
{"next_page_token": initial_token} if initial_token is not None else None
)
while not pagination_complete:
response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
property_chunks: List[List[str]] = (
list(
self.additional_query_properties.get_request_property_chunks(
stream_slice=stream_slice
)
)
if self.additional_query_properties
else [
[]
] # A single empty property chunk represents the case where property chunking is not configured
)

merged_records: MutableMapping[str, Any] = defaultdict(dict)
last_page_size = 0
last_record: Optional[Record] = None
for record in records_generator_fn(response):
last_page_size += 1
last_record = record
yield record
response: Optional[requests.Response] = None
for properties in property_chunks:
if len(properties) > 0:
stream_slice = StreamSlice(
partition=stream_slice.partition or {},
cursor_slice=stream_slice.cursor_slice or {},
extra_fields={"query_properties": properties},
)

response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
for current_record in records_generator_fn(response):
if (
current_record
and self.additional_query_properties
and self.additional_query_properties.property_chunking
):
merge_key = (
self.additional_query_properties.property_chunking.get_merge_key(
current_record
)
)
if merge_key:
_deep_merge(merged_records[merge_key], current_record)
else:
# We should still emit records even if the record did not have a merge key
last_page_size += 1
last_record = current_record
yield current_record
else:
last_page_size += 1
last_record = current_record
yield current_record

if (
self.additional_query_properties
and self.additional_query_properties.property_chunking
):
for merged_record in merged_records.values():
record = Record(
data=merged_record, stream_name=self.name, associated_slice=stream_slice
)
last_page_size += 1
last_record = record
yield record

if not response:
pagination_complete = True
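
To make the reordering concrete, here is a small, runnable toy simulation of the loop structure above (hypothetical data and helper names, not the CDK classes): the outer loop walks pages, the inner loop walks property chunks, partial records sharing a merge key ("id" here) are deep-merged, and merged records are emitted only once all chunks for that page have been read.

from collections import defaultdict
from typing import Any, Dict, Iterator, List, MutableMapping

# Two pages; each page returns one partial-record response per property chunk.
pages: List[List[List[Dict[str, Any]]]] = [
    [  # page 1
        [{"id": "a", "first_name": "gentarou"}],
        [{"id": "a", "bracelet": "1"}],
    ],
    [  # page 2
        [{"id": "c", "first_name": "akane"}],
        [{"id": "c", "bracelet": "6"}],
    ],
]

def deep_merge(target: MutableMapping[str, Any], source: MutableMapping[str, Any]) -> None:
    # Same idea as _deep_merge further down: nested dictionaries are combined, not overwritten.
    for key, value in source.items():
        if key in target and isinstance(target[key], dict) and isinstance(value, dict):
            deep_merge(target[key], value)
        else:
            target[key] = value

def read_pages() -> Iterator[Dict[str, Any]]:
    for page in pages:                                   # outer loop: pages
        merged: MutableMapping[str, Any] = defaultdict(dict)
        for chunk_response in page:                      # inner loop: property chunks
            for record in chunk_response:
                deep_merge(merged[record["id"]], record)
        yield from merged.values()                       # emit after all chunks for the page are merged

print(list(read_pages()))
# [{'id': 'a', 'first_name': 'gentarou', 'bracelet': '1'},
#  {'id': 'c', 'first_name': 'akane', 'bracelet': '6'}]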
@@ -449,110 +501,43 @@ def read_records(
:param stream_slice: The stream slice to read data for
:return: The records read from the API source
"""

property_chunks = (
list(
self.additional_query_properties.get_request_property_chunks(
stream_slice=stream_slice
)
)
if self.additional_query_properties
else []
)
records_without_merge_key = []
merged_records: MutableMapping[str, Any] = defaultdict(dict)

_slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check

most_recent_record_from_slice = None
record_generator = partial(
self._parse_records,
stream_slice=stream_slice,
stream_state=self.state or {},
records_schema=records_schema,
)

if self.additional_query_properties:
for properties in property_chunks:
_slice = StreamSlice(
partition=_slice.partition or {},
cursor_slice=_slice.cursor_slice or {},
extra_fields={"query_properties": properties},
) # None-check

record_generator = partial(
self._parse_records,
stream_slice=_slice,
stream_state=self.state or {},
records_schema=records_schema,
)
if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
stream_state = self.state

for stream_data in self._read_pages(record_generator, self.state, _slice):
current_record = self._extract_record(stream_data, _slice)
if self.cursor and current_record:
self.cursor.observe(_slice, current_record)
# Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
# fetch more records. The platform deletes stream state for full refresh streams before starting a
# new job, so we don't need to worry about this value existing for the initial attempt
if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
return

# Latest record read, not necessarily within slice boundaries.
# TODO Remove once all custom components implement `observe` method.
# https://github.com/airbytehq/airbyte-internal-issues/issues/6955
most_recent_record_from_slice = self._get_most_recent_record(
most_recent_record_from_slice, current_record, _slice
)
yield from self._read_single_page(record_generator, stream_state, _slice)
else:
for stream_data in self._read_pages(record_generator, self.state, _slice):
current_record = self._extract_record(stream_data, _slice)
if self.cursor and current_record:
self.cursor.observe(_slice, current_record)

# Latest record read, not necessarily within slice boundaries.
# TODO Remove once all custom components implement `observe` method.
# https://github.com/airbytehq/airbyte-internal-issues/issues/6955
most_recent_record_from_slice = self._get_most_recent_record(
most_recent_record_from_slice, current_record, _slice
)
yield stream_data

if current_record and self.additional_query_properties.property_chunking:
merge_key = (
self.additional_query_properties.property_chunking.get_merge_key(
current_record
)
)
if merge_key:
merged_records[merge_key].update(current_record)
else:
# We should still emit records even if the record did not have a merge key
records_without_merge_key.append(current_record)
else:
yield stream_data
if self.cursor:
self.cursor.close_slice(_slice, most_recent_record_from_slice)

if len(merged_records) > 0:
yield from [
Record(data=merged_record, stream_name=self.name, associated_slice=stream_slice)
for merged_record in merged_records.values()
]
if len(records_without_merge_key) > 0:
yield from records_without_merge_key
else:
_slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check

most_recent_record_from_slice = None
record_generator = partial(
self._parse_records,
stream_slice=stream_slice,
stream_state=self.state or {},
records_schema=records_schema,
)

if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
stream_state = self.state

# Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
# fetch more records. The platform deletes stream state for full refresh streams before starting a
# new job, so we don't need to worry about this value existing for the initial attempt
if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
return

yield from self._read_single_page(record_generator, stream_state, _slice)
else:
for stream_data in self._read_pages(record_generator, self.state, _slice):
current_record = self._extract_record(stream_data, _slice)
if self.cursor and current_record:
self.cursor.observe(_slice, current_record)

# Latest record read, not necessarily within slice boundaries.
# TODO Remove once all custom components implement `observe` method.
# https://github.com/airbytehq/airbyte-internal-issues/issues/6955
most_recent_record_from_slice = self._get_most_recent_record(
most_recent_record_from_slice, current_record, _slice
)
yield stream_data

if self.cursor:
self.cursor.close_slice(_slice, most_recent_record_from_slice)
return
return

def _get_most_recent_record(
self,
@@ -639,6 +624,26 @@ def _to_partition_key(to_serialize: Any) -> str:
return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)


def _deep_merge(
target: MutableMapping[str, Any], source: Union[Record, MutableMapping[str, Any]]
) -> None:
"""
Recursively merge two dictionaries, combining nested dictionaries instead of overwriting them.

:param target: The dictionary to merge into (modified in place)
:param source: The dictionary to merge from
"""
for key, value in source.items():
if (
key in target
and isinstance(target[key], MutableMapping)
and isinstance(value, MutableMapping)
):
_deep_merge(target[key], value)
else:
target[key] = value
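
As a usage illustration (the inputs mirror the two partial responses for record "a" in the updated unit test below and are assumptions for the sake of the example), _deep_merge lets per-chunk responses for the same merge key be combined without losing nested fields:

merged: dict = {}
_deep_merge(merged, {"id": "a", "dict_field": {"key1": "value1", "affiliation": {"company": "cradle"}}})
_deep_merge(merged, {"dict_field": {"key2": "value2", "affiliation": {"industry": "pharmaceutical"}}})
# merged is now:
# {
#     "id": "a",
#     "dict_field": {
#         "key1": "value1",
#         "key2": "value2",
#         "affiliation": {"company": "cradle", "industry": "pharmaceutical"},
#     },
# }
# Non-mapping values (e.g. the "allies" list in the test data) would simply be overwritten
# by the later chunk, per the else branch above.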


@dataclass
class SimpleRetrieverTestReadDecorator(SimpleRetriever):
"""
@@ -43,10 +43,18 @@
["kate", "laurie", "jaclyn"],
None,
PropertyLimitType.characters,
10,
15,
[["kate", "laurie"], ["jaclyn"]],
id="test_property_chunking_limit_characters",
),
pytest.param(
["laurie", "jaclyn", "kaitlin"],
None,
PropertyLimitType.characters,
12,
[["laurie"], ["jaclyn"], ["kaitlin"]],
id="test_property_chunking_includes_extra_delimiter",
),
pytest.param(
[],
None,
30 changes: 27 additions & 3 deletions unit_tests/sources/declarative/retrievers/test_simple_retriever.py
@@ -1148,6 +1148,14 @@ def test_simple_retriever_with_additional_query_properties():
"last_name": "hongou",
"nonary": "second",
"bracelet": "1",
"dict_field": {
"key1": "value1",
"key2": "value2",
"affiliation": {
"company": "cradle",
"industry": "pharmaceutical",
},
},
},
associated_slice=None,
stream_name=stream_name,
@@ -1170,6 +1178,7 @@
"last_name": "kurashiki",
"nonary": "second",
"bracelet": "6",
"allies": ["aoi_kurashiki"],
},
associated_slice=None,
stream_name=stream_name,
@@ -1216,7 +1225,12 @@ def test_simple_retriever_with_additional_query_properties():
record_selector.select_records.side_effect = [
[
Record(
data={"id": "a", "first_name": "gentarou", "last_name": "hongou"},
data={
"id": "a",
"first_name": "gentarou",
"last_name": "hongou",
"dict_field": {"key1": "value1", "affiliation": {"company": "cradle"}},
},
associated_slice=None,
stream_name=stream_name,
),
@@ -1226,7 +1240,12 @@
stream_name=stream_name,
),
Record(
data={"id": "c", "first_name": "akane", "last_name": "kurashiki"},
data={
"id": "c",
"first_name": "akane",
"last_name": "kurashiki",
"allies": ["aoi_kurashiki"],
},
associated_slice=None,
stream_name=stream_name,
),
@@ -1263,7 +1282,12 @@
stream_name=stream_name,
),
Record(
data={"id": "a", "nonary": "second", "bracelet": "1"},
data={
"id": "a",
"nonary": "second",
"bracelet": "1",
"dict_field": {"key2": "value2", "affiliation": {"industry": "pharmaceutical"}},
},
associated_slice=None,
stream_name=stream_name,
),