Skip to content

Commit bf998bd

Browse files
authored
fix(OffsetIncrement Pagination Strategy): Fix bug where streams that use record filtering do not paginate correctly (#484)
1 parent 9de6cef commit bf998bd

File tree

5 files changed

+146
-18
lines changed

5 files changed

+146
-18
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+26-2
Original file line numberDiff line numberDiff line change
@@ -2066,6 +2066,7 @@ def create_default_paginator(
20662066
config: Config,
20672067
*,
20682068
url_base: str,
2069+
extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None,
20692070
decoder: Optional[Decoder] = None,
20702071
cursor_used_for_stop_condition: Optional[DeclarativeCursor] = None,
20712072
) -> Union[DefaultPaginator, PaginatorTestReadDecorator]:
@@ -2087,7 +2088,10 @@ def create_default_paginator(
20872088
else None
20882089
)
20892090
pagination_strategy = self._create_component_from_model(
2090-
model=model.pagination_strategy, config=config, decoder=decoder_to_use
2091+
model=model.pagination_strategy,
2092+
config=config,
2093+
decoder=decoder_to_use,
2094+
extractor_model=extractor_model,
20912095
)
20922096
if cursor_used_for_stop_condition:
20932097
pagination_strategy = StopConditionPaginationStrategyDecorator(
@@ -2584,7 +2588,12 @@ def create_oauth_authenticator(
25842588
)
25852589

25862590
def create_offset_increment(
2587-
self, model: OffsetIncrementModel, config: Config, decoder: Decoder, **kwargs: Any
2591+
self,
2592+
model: OffsetIncrementModel,
2593+
config: Config,
2594+
decoder: Decoder,
2595+
extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None,
2596+
**kwargs: Any,
25882597
) -> OffsetIncrement:
25892598
if isinstance(decoder, PaginationDecoderDecorator):
25902599
inner_decoder = decoder.decoder
@@ -2599,10 +2608,24 @@ def create_offset_increment(
25992608
self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(inner_decoder))
26002609
)
26012610

2611+
# Ideally we would instantiate the runtime extractor from highest most level (in this case the SimpleRetriever)
2612+
# so that it can be shared by OffSetIncrement and RecordSelector. However, due to how we instantiate the
2613+
# decoder with various decorators here, but not in create_record_selector, it is simpler to retain existing
2614+
# behavior by having two separate extractors with identical behavior since they use the same extractor model.
2615+
# When we have more time to investigate we can look into reusing the same component.
2616+
extractor = (
2617+
self._create_component_from_model(
2618+
model=extractor_model, config=config, decoder=decoder_to_use
2619+
)
2620+
if extractor_model
2621+
else None
2622+
)
2623+
26022624
return OffsetIncrement(
26032625
page_size=model.page_size,
26042626
config=config,
26052627
decoder=decoder_to_use,
2628+
extractor=extractor,
26062629
inject_on_first_request=model.inject_on_first_request or False,
26072630
parameters=model.parameters or {},
26082631
)
@@ -2966,6 +2989,7 @@ def create_simple_retriever(
29662989
model=model.paginator,
29672990
config=config,
29682991
url_base=url_base,
2992+
extractor_model=model.record_selector.extractor,
29692993
decoder=decoder,
29702994
cursor_used_for_stop_condition=cursor_used_for_stop_condition,
29712995
)

airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py

+10
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
JsonDecoder,
1313
PaginationDecoderDecorator,
1414
)
15+
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import RecordExtractor
1516
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1617
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import (
1718
PaginationStrategy,
@@ -46,6 +47,7 @@ class OffsetIncrement(PaginationStrategy):
4647
config: Config
4748
page_size: Optional[Union[str, int]]
4849
parameters: InitVar[Mapping[str, Any]]
50+
extractor: Optional[RecordExtractor]
4951
decoder: Decoder = field(
5052
default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
5153
)
@@ -75,6 +77,14 @@ def next_page_token(
7577
) -> Optional[Any]:
7678
decoded_response = next(self.decoder.decode(response))
7779

80+
if self.extractor:
81+
page_size_from_response = len(list(self.extractor.extract_records(response=response)))
82+
# The extractor could return 0 records which is valid, but evaluates to False. Our fallback in other
83+
# cases as the best effort option is to use the incoming last_page_size
84+
last_page_size = (
85+
page_size_from_response if page_size_from_response is not None else last_page_size
86+
)
87+
7888
# Stop paginating when there are fewer records than the page size or the current page has no records
7989
if (
8090
self._page_size

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

+18-2
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor as DatetimeBasedCursorModel
6767
from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel
6868
from airbyte_cdk.sources.declarative.models import DefaultPaginator as DefaultPaginatorModel
69+
from airbyte_cdk.sources.declarative.models import DpathExtractor as DpathExtractorModel
6970
from airbyte_cdk.sources.declarative.models import (
7071
GroupingPartitionRouter as GroupingPartitionRouterModel,
7172
)
@@ -1831,6 +1832,7 @@ def test_create_default_paginator():
18311832
component_definition=paginator_manifest,
18321833
config=input_config,
18331834
url_base="https://airbyte.io",
1835+
extractor_model=DpathExtractor(field_path=["results"], config=input_config, parameters={}),
18341836
decoder=JsonDecoder(parameters={}),
18351837
)
18361838

@@ -1968,6 +1970,7 @@ def test_create_default_paginator():
19681970
DefaultPaginator(
19691971
pagination_strategy=OffsetIncrement(
19701972
page_size=10,
1973+
extractor=None,
19711974
config={"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]},
19721975
parameters={},
19731976
),
@@ -2641,18 +2644,31 @@ def test_create_offset_increment():
26412644
page_size=10,
26422645
inject_on_first_request=True,
26432646
)
2647+
2648+
expected_extractor = DpathExtractor(field_path=["results"], config=input_config, parameters={})
2649+
extractor_model = DpathExtractorModel(
2650+
type="DpathExtractor", field_path=expected_extractor.field_path
2651+
)
2652+
26442653
expected_strategy = OffsetIncrement(
2645-
page_size=10, inject_on_first_request=True, parameters={}, config=input_config
2654+
page_size=10,
2655+
inject_on_first_request=True,
2656+
extractor=expected_extractor,
2657+
parameters={},
2658+
config=input_config,
26462659
)
26472660

26482661
strategy = factory.create_offset_increment(
2649-
model, input_config, decoder=JsonDecoder(parameters={})
2662+
model, input_config, extractor_model=extractor_model, decoder=JsonDecoder(parameters={})
26502663
)
26512664

26522665
assert strategy.page_size == expected_strategy.page_size
26532666
assert strategy.inject_on_first_request == expected_strategy.inject_on_first_request
26542667
assert strategy.config == input_config
26552668

2669+
assert isinstance(strategy.extractor, DpathExtractor)
2670+
assert strategy.extractor.field_path == expected_extractor.field_path
2671+
26562672

26572673
class MyCustomSchemaLoader(SchemaLoader):
26582674
def get_json_schema(self) -> Mapping[str, Any]:

unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py

+36-5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import requests
1010

1111
from airbyte_cdk.sources.declarative.decoders import JsonDecoder, XmlDecoder
12+
from airbyte_cdk.sources.declarative.extractors import DpathExtractor
1213
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
1314
from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import (
1415
DefaultPaginator,
@@ -327,7 +328,13 @@ def test_initial_token_with_offset_pagination():
327328
)
328329
url_base = "https://airbyte.io"
329330
config = {}
330-
strategy = OffsetIncrement(config={}, page_size=2, parameters={}, inject_on_first_request=True)
331+
strategy = OffsetIncrement(
332+
config={},
333+
page_size=2,
334+
extractor=DpathExtractor(field_path=[], parameters={}, config={}),
335+
parameters={},
336+
inject_on_first_request=True,
337+
)
331338
paginator = DefaultPaginator(
332339
strategy,
333340
config,
@@ -348,7 +355,13 @@ def test_initial_token_with_offset_pagination():
348355
"pagination_strategy,last_page_size,expected_next_page_token,expected_second_next_page_token",
349356
[
350357
pytest.param(
351-
OffsetIncrement(config={}, page_size=10, parameters={}, inject_on_first_request=True),
358+
OffsetIncrement(
359+
config={},
360+
page_size=10,
361+
extractor=DpathExtractor(field_path=["results"], parameters={}, config={}),
362+
parameters={},
363+
inject_on_first_request=True,
364+
),
352365
10,
353366
{"next_page_token": 10},
354367
{"next_page_token": 20},
@@ -373,10 +386,23 @@ def test_no_inject_on_first_request_offset_pagination(
373386
"""
374387
Validate that the stateless next_page_token() works when the first page does not inject the value
375388
"""
376-
389+
response_body = {
390+
"results": [
391+
{"id": 1},
392+
{"id": 2},
393+
{"id": 3},
394+
{"id": 4},
395+
{"id": 5},
396+
{"id": 6},
397+
{"id": 7},
398+
{"id": 8},
399+
{"id": 9},
400+
{"id": 10},
401+
]
402+
}
377403
response = requests.Response()
378404
response.headers = {"A_HEADER": "HEADER_VALUE"}
379-
response._content = {}
405+
response._content = json.dumps(response_body).encode("utf-8")
380406

381407
last_record = Record(data={}, stream_name="test")
382408

@@ -430,7 +456,12 @@ def test_limit_page_fetched():
430456

431457

432458
def test_paginator_with_page_option_no_page_size():
433-
pagination_strategy = OffsetIncrement(config={}, page_size=None, parameters={})
459+
pagination_strategy = OffsetIncrement(
460+
config={},
461+
page_size=None,
462+
extractor=DpathExtractor(field_path=[], parameters={}, config={}),
463+
parameters={},
464+
)
434465

435466
with pytest.raises(ValueError):
436467
(

unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py

+56-9
Original file line numberDiff line numberDiff line change
@@ -8,57 +8,80 @@
88
import pytest
99
import requests
1010

11+
from airbyte_cdk.sources.declarative.extractors import DpathExtractor
1112
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import (
1213
OffsetIncrement,
1314
)
1415

1516

1617
@pytest.mark.parametrize(
17-
"page_size, parameters, last_page_size, last_record, last_page_token_value, expected_next_page_token, expected_offset",
18+
"page_size, parameters, response_results, last_page_size, last_record, last_page_token_value, expected_next_page_token, expected_offset",
1819
[
19-
pytest.param("2", {}, 2, {"id": 1}, 4, 6, 2, id="test_same_page_size"),
20-
pytest.param(2, {}, 2, {"id": 1}, 4, 6, 2, id="test_same_page_size"),
20+
pytest.param(
21+
"2", {}, [{"id": 1}, {"id": 2}], 2, {"id": 2}, 4, 6, 2, id="test_same_page_size"
22+
),
23+
pytest.param(
24+
2, {}, [{"id": 1}, {"id": 2}], 2, {"id": 2}, 4, 6, 2, id="test_same_page_size"
25+
),
2126
pytest.param(
2227
"{{ parameters['page_size'] }}",
2328
{"page_size": 3},
29+
[{"id": 1}, {"id": 2}],
2430
2,
2531
{"id": 1},
2632
3,
2733
None,
2834
0,
2935
id="test_larger_page_size",
3036
),
31-
pytest.param(None, {}, 0, [], 3, None, 0, id="test_stop_if_no_records"),
37+
pytest.param(None, {}, [], 0, [], 3, None, 0, id="test_stop_if_no_records"),
3238
pytest.param(
3339
"{{ response['page_metadata']['limit'] }}",
3440
{},
41+
[{"id": 1}, {"id": 2}],
3542
2,
36-
{"id": 1},
43+
{"id": 2},
3744
3,
3845
None,
3946
0,
4047
id="test_page_size_from_response",
4148
),
4249
pytest.param(
43-
2, {}, 2, {"id": 1}, None, 2, 2, id="test_get_second_page_with_first_page_not_injected"
50+
2,
51+
{},
52+
[{"id": 1}, {"id": 2}],
53+
2,
54+
{"id": 2},
55+
None,
56+
2,
57+
2,
58+
id="test_get_second_page_with_first_page_not_injected",
4459
),
4560
],
4661
)
4762
def test_offset_increment_paginator_strategy(
4863
page_size,
4964
parameters,
65+
response_results,
5066
last_page_size,
5167
last_record,
5268
last_page_token_value,
5369
expected_next_page_token,
5470
expected_offset,
5571
):
56-
paginator_strategy = OffsetIncrement(page_size=page_size, parameters=parameters, config={})
72+
extractor = DpathExtractor(field_path=["results"], parameters={}, config={})
73+
paginator_strategy = OffsetIncrement(
74+
page_size=page_size, extractor=extractor, parameters=parameters, config={}
75+
)
5776

5877
response = requests.Response()
5978

6079
response.headers = {"A_HEADER": "HEADER_VALUE"}
61-
response_body = {"next": "https://airbyte.io/next_url", "page_metadata": {"limit": 5}}
80+
response_body = {
81+
"results": response_results,
82+
"next": "https://airbyte.io/next_url",
83+
"page_metadata": {"limit": 5},
84+
}
6285
response._content = json.dumps(response_body).encode("utf-8")
6386

6487
next_page_token = paginator_strategy.next_page_token(
@@ -73,9 +96,28 @@ def test_offset_increment_paginator_strategy(
7396
assert expected_next_page_token == next_page_token
7497

7598

99+
def test_offset_increment_response_without_record_path():
100+
extractor = DpathExtractor(field_path=["results"], parameters={}, config={})
101+
paginator_strategy = OffsetIncrement(page_size=2, extractor=extractor, parameters={}, config={})
102+
103+
response = requests.Response()
104+
105+
response.headers = {"A_HEADER": "HEADER_VALUE"}
106+
response_body = {"next": "https://airbyte.io/next_url", "page_metadata": {"limit": 5}}
107+
response._content = json.dumps(response_body).encode("utf-8")
108+
109+
next_page_token = paginator_strategy.next_page_token(response, 2, None, 4)
110+
assert next_page_token is None
111+
112+
# Validate that the PaginationStrategy is stateless and calling next_page_token() again returns the same result
113+
next_page_token = paginator_strategy.next_page_token(response, 2, None, 4)
114+
assert next_page_token is None
115+
116+
76117
def test_offset_increment_paginator_strategy_rises():
77118
paginator_strategy = OffsetIncrement(
78119
page_size="{{ parameters['page_size'] }}",
120+
extractor=DpathExtractor(field_path=["results"], parameters={}, config={}),
79121
parameters={"page_size": "invalid value"},
80122
config={},
81123
)
@@ -94,8 +136,13 @@ def test_offset_increment_paginator_strategy_rises():
94136
def test_offset_increment_paginator_strategy_initial_token(
95137
inject_on_first_request: bool, expected_initial_token: Optional[Any]
96138
):
139+
extractor = DpathExtractor(field_path=[""], parameters={}, config={})
97140
paginator_strategy = OffsetIncrement(
98-
page_size=20, parameters={}, config={}, inject_on_first_request=inject_on_first_request
141+
page_size=20,
142+
extractor=extractor,
143+
parameters={},
144+
config={},
145+
inject_on_first_request=inject_on_first_request,
99146
)
100147

101148
assert paginator_strategy.initial_token == expected_initial_token

0 commit comments

Comments
 (0)