chore: Remove cursors, stream slicing, and resumable full refresh from declarative Retrievers #827
👋 Greetings, Airbyte Team Member!
Here are some helpful tips and reminders for your convenience.
Testing This CDK Version
You can test this version of the CDK using the following:
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@brian/remove_cursor_and_rfr_from_simple_retriever#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch brian/remove_cursor_and_rfr_from_simple_retriever
📝 Walkthrough
This PR completes the migration from legacy cursor-based state management to concurrent cursor patterns by deprecating `DeclarativeStream`, removing `stream_state` and cursor-related APIs from retrievers, eliminating legacy cursor factory methods, and updating the component factory to return `DefaultStream` instead of `DeclarativeStream`.
Sequence Diagram(s)
sequenceDiagram
participant Factory as ModelToComponentFactory
participant Stream as DefaultStream
participant Retriever as SimpleRetriever
participant Slicer as StreamSlicer
Note over Factory: Legacy cursor flow<br/>(Before)
Factory->>Retriever: create with cursor=DatetimeBasedCursor
Retriever->>Retriever: maintains cursor.state
Note over Factory: Concurrent cursor flow<br/>(After)
Factory->>Stream: create DefaultStream
Stream->>Retriever: create with stream_slicer
Retriever->>Slicer: get stream_slices()
Retriever->>Retriever: read_records with stream_slice only<br/>(no stream_state parameter)
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
Friendly suggestion: Wdyt about adding a brief migration guide or comment in `DeclarativeStream`'s deprecation notice pointing users to `DefaultStream`? That could help maintainers of downstream connectors understand the migration path more clearly. 🤔
Pre-merge checks and finishing touches: ✅ Passed checks (2 passed)
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- airbyte_cdk/legacy/sources/declarative/declarative_stream.py (2 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1 hunks)
- airbyte_cdk/sources/declarative/retrievers/async_retriever.py (2 hunks)
- airbyte_cdk/sources/declarative/retrievers/retriever.py (1 hunks)
- airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (8 hunks)
- unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py (6 hunks)
- unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (0 hunks)
- unit_tests/sources/declarative/retrievers/test_simple_retriever.py (11 hunks)
- unit_tests/sources/declarative/test_concurrent_declarative_source.py (6 hunks)
💤 Files with no reviewable changes (1)
- unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Applied to files:
- unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
- unit_tests/sources/declarative/test_concurrent_declarative_source.py
📚 Learning: 2025-01-13T23:39:15.457Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.
Applied to files:
- unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py
- unit_tests/sources/declarative/test_concurrent_declarative_source.py
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
- airbyte_cdk/legacy/sources/declarative/declarative_stream.py
🧬 Code graph analysis (5)
- airbyte_cdk/sources/declarative/retrievers/async_retriever.py (1)
  - airbyte_cdk/sources/types.py (1)
    - StreamSlice(75-169)
- airbyte_cdk/sources/declarative/retrievers/retriever.py (4)
  - airbyte_cdk/legacy/sources/declarative/declarative_stream.py (2)
    - state(107-108), state(111-118)
  - airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
    - state(199-219)
  - airbyte_cdk/sources/streams/http/http.py (2)
    - state(387-391), state(394-398)
  - airbyte_cdk/sources/streams/core.py (2)
    - state(75-86), state(90-91)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
  - airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
    - DefaultStream(17-123)
- airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
  - airbyte_cdk/sources/types.py (2)
    - Record(21-72), StreamSlice(75-169)
- unit_tests/sources/declarative/retrievers/test_simple_retriever.py (2)
  - airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py (1)
    - SinglePartitionRouter(13-57)
  - airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (9)
    - primary_key(280-282), primary_key(285-287), _next_page_token(289-308), _request_params(184-202), _request_body_data(204-223), _parse_records(459-469), read_records(438-457), _read_pages(343-431), _read_pages(510-533)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: SDM Docker Image Build
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: MyPy Check
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: preview_docs
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (14)
unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py (1)
1567-1567: LGTM! Test expectations correctly updated for new paginator signature.
The mock call expectations have been consistently updated across all test scenarios to reflect the removal of `stream_state` from the `_fetch_next_page` method signature. The new two-argument pattern (`stream_slice`, `next_page_token`) is correctly applied:
- Non-paginated, non-partitioned: `call({}, None)`
- Paginated, non-partitioned: `call({}, None)`, `call({}, {"next_page_token": "next"})`
- Non-paginated, partitioned: `call({"partition": "0"}, None)`, `call({"partition": "1"}, None)`
- Paginated, partitioned: `call({"partition": "0"}, None)`, `call({"partition": "0"}, {"next_page_token": "next"})`
This aligns well with the broader migration from cursor-based state management to stream_slicer-based pagination.
Also applies to: 1654-1654, 1738-1738, 1825-1829, 1915-1920, 2022-2027
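To make the two-argument call pattern concrete, here is a minimal sketch using `unittest.mock`. The `read_all` driver and the `"next"` response key are hypothetical stand-ins for illustration, not the CDK's implementation; only the `(stream_slice, next_page_token)` argument shape is taken from the review above.

```python
from unittest.mock import MagicMock, call


def read_all(fetch_next_page, stream_slices):
    """Hypothetical driver: pages through each slice using only
    (stream_slice, next_page_token), with no stream_state argument."""
    for stream_slice in stream_slices:
        next_page_token = None
        while True:
            response = fetch_next_page(stream_slice, next_page_token)
            token = response.get("next")
            if token is None:
                break
            next_page_token = {"next_page_token": token}


# Partition "0" has two pages, partition "1" has one.
fetch = MagicMock(side_effect=[{"next": "next"}, {}, {}])
read_all(fetch, [{"partition": "0"}, {"partition": "1"}])

expected_calls = [
    call({"partition": "0"}, None),
    call({"partition": "0"}, {"next_page_token": "next"}),
    call({"partition": "1"}, None),
]
fetch.assert_has_calls(expected_calls)
```

This mirrors the "paginated, partitioned" scenario: the first partition is fetched twice (second time with a token) and the second partition once.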
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (2)
8-8: Clean deprecation path with clear migration guidance.
The `@deprecated` decorator provides users with a clear message pointing to `DefaultStream` as the replacement. This is good practice for guiding users through the migration.
Also applies to: 32-32
202-203: Cursor removal aligns with the PR's state management migration.
The `get_cursor` method now consistently returns `None`, effectively disabling legacy cursor-based state management. This is the expected behavior given that `DeclarativeStream` is deprecated and the framework is moving away from cursor-based approaches in favor of concurrent patterns.
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (3)
122-125: Consistent migration to SinglePartitionRouter across all test cases.
All `SimpleRetriever` instantiations have been updated to use `stream_slicer=SinglePartitionRouter(parameters={})` instead of cursor-based slicing. This ensures tests exercise the new stateless, slicer-driven pagination flow.
Also applies to: 147-150, 438-441, 471-474, 506-509
216-223: Method signatures correctly updated to two-argument pattern.
The request option methods (`_request_params`, `_request_body_data`, `_request_headers`) now correctly accept `(stream_slice, next_page_token)` instead of the previous three-argument pattern. The test expectations have been updated accordingly, and the error handling logic remains intact.
Also applies to: 263-270, 384-391
444-447: Internal retriever methods aligned with new pagination API.
The `_read_pages` and `_parse_records` methods now follow the simplified two-argument signature without `stream_state`. All test assertions and mock verifications have been updated to match this new pattern. The changes maintain test coverage while correctly reflecting the removal of state management from the retriever layer.
Also applies to: 487-487, 513-513, 697-698, 708-709
airbyte_cdk/sources/declarative/retrievers/async_retriever.py (1)
14-14: State parameter deprecated with clear migration path.
The `stream_state` parameter is now passed as an empty dict with a clear comment indicating deprecation. This maintains backward compatibility with `record_selector.filter_and_transform` while signaling the migration away from state-based interpolation contexts.
Quick question: Is the `record_selector.filter_and_transform` method being updated in this PR or a follow-up to fully remove the `stream_state` parameter? Just want to ensure we have a complete deprecation path. Wdyt?
Also applies to: 93-93
unit_tests/sources/declarative/test_concurrent_declarative_source.py (5)
3293-3293: LGTM! Mock call updated to match new signature.
The test expectation correctly reflects the removal of the `stream_state` parameter from `SimpleRetriever._fetch_next_page`. The call now expects `(stream_slice, pagination_context)` instead of `(stream_slice, stream_state, pagination_context)`.
3380-3380: LGTM! Consistent mock call updates.
These changes follow the same pattern as the other updated tests, correctly removing the `stream_state` parameter from the expected calls.
Also applies to: 3464-3464
3551-3552: LGTM! Pagination test correctly updated.
The mock expectations properly reflect pagination handling in the new signature:
- First page: `call({}, None)` - no pagination context
- Subsequent page: `call({}, {"next_page_token": "next"})` - with pagination token
3638-3639: LGTM! Partition router test correctly updated.
The expectations correctly test partitioned streams with the new signature, where partition information is passed in the `stream_slice` parameter and no `stream_state` is needed.
3742-3744: LGTM! Combined pagination and partition router test correctly updated.
The mock expectations properly reflect the interaction between pagination and partition routing in the new signature, showing multiple pages for one partition and a single page for another.
airbyte_cdk/sources/declarative/retrievers/retriever.py (2)
41-48: LGTM! State getter properly deprecated.
The removal of `@abstractmethod` and the no-op implementation returning an empty dict `{}` correctly implements the deprecation pattern. Subclasses are no longer required to implement state management at the Retriever level, which aligns with the PR's goal of moving state management to the stream level.
50-57: LGTM! State setter properly deprecated.
The removal of `@abstractmethod` and the no-op implementation with `pass` correctly implements the deprecation pattern, matching the getter's approach. This ensures backward compatibility while signaling that state management is moving away from the Retriever level.
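As a rough illustration of the deprecation pattern described in these two comments, the sketch below uses only the standard library. `RetrieverSketch`, `MyRetriever`, and the warning text are hypothetical; the actual CDK class uses a `@deprecated` decorator whose exact import is not shown in this review.

```python
import warnings
from typing import Any, Mapping


class RetrieverSketch:
    """Hypothetical stand-in for the Retriever interface, not the CDK class."""

    @property
    def state(self) -> Mapping[str, Any]:
        # Deprecated getter: warns and returns an empty mapping.
        warnings.warn("State is managed at the stream level.", DeprecationWarning, stacklevel=2)
        return {}

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        # Deprecated setter: warns and intentionally discards the value.
        warnings.warn("State is managed at the stream level.", DeprecationWarning, stacklevel=2)


class MyRetriever(RetrieverSketch):
    """Subclass that does not implement state at all."""


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    retriever = MyRetriever()
    retriever.state = {"cursor": "2024-01-01"}  # silently ignored apart from the warning
    observed_state = retriever.state
```

Because neither accessor is abstract, `MyRetriever` instantiates without defining `state`, and legacy callers that still touch the property keep working.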
| @abstractmethod | ||
| @deprecated("Stream slicing is being moved to the stream level.") | ||
| def stream_slices(self) -> Iterable[Optional[StreamSlice]]: | ||
| """Returns the stream slices""" | ||
| """Does nothing as this method is deprecated, so underlying Retriever implementations | ||
| do not need to implement this. | ||
| """ | ||
| yield from [] |
🧩 Analysis chain
Potential inconsistency between @abstractmethod decorator and docstring.
The docstring says "underlying Retriever implementations do not need to implement this", but the @abstractmethod decorator (line 33) means that concrete subclasses must override this method or they cannot be instantiated. This seems contradictory.
Since the state property had its @abstractmethod removed (lines 41-48, 50-57), should stream_slices also have the decorator removed to match the deprecation pattern? Or if we want to keep it abstract, maybe update the docstring to clarify that subclasses should provide a minimal override that calls the parent implementation?
Something like:
@deprecated("Stream slicing is being moved to the stream level.")
def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
    """Deprecated no-op implementation. Subclasses should provide a minimal override
    that yields from the parent or returns an empty iterable.
    """
    yield from []
wdyt?
---
🏁 Script executed:
```shell
#!/bin/bash
# Inspect the retriever.py file to see the full context of both stream_slices and state
fd -t f "retriever.py" --path "*sources/declarative/retrievers*" | head -5
```
Length of output: 233
🏁 Script executed:
```shell
#!/bin/bash
# Get lines around stream_slices and state property to compare their decorators
rg -n -B5 -A10 "def stream_slices|@property" "airbyte_cdk/sources/declarative/retrievers/retriever.py" | head -80
```
Length of output: 839
🏁 Script executed:
```shell
#!/bin/bash
# Check if there are any subclasses of Retriever that override stream_slices
rg -n "class.*Retriever" "airbyte_cdk/sources/declarative/retrievers/" --type py | head -20
```
Length of output: 384
🏁 Script executed:
```shell
#!/bin/bash
# Search for implementations of stream_slices in subclasses
rg -n -B3 -A8 "def stream_slices" "airbyte_cdk/sources/declarative/retrievers/" --type py
```
Length of output: 1152
---
🏁 Script executed:
```shell
#!/bin/bash
# Check if Retriever uses ABC and class definition
rg -n -B2 -A2 "^class Retriever|from abc|import ABC" "airbyte_cdk/sources/declarative/retrievers/retriever.py"
```
Length of output: 214
🏁 Script executed:
```shell
#!/bin/bash
# Check if AsyncRetriever and SimpleRetriever can instantiate (look for __init__ or direct instantiation patterns)
rg -n -B5 -A15 "^class AsyncRetriever|^class SimpleRetriever" "airbyte_cdk/sources/declarative/retrievers/async_retriever.py" "airbyte_cdk/sources/declarative/retrievers/simple_retriever.py" | head -60
```
Length of output: 4285
🏁 Script executed:
```shell
#!/bin/bash
# Search for any attempt to override stream_slices in the entire codebase (not just retrievers dir)
rg -n "def stream_slices" --type py | grep -v "retriever.py:35"
```
Length of output: 3258
Remove the @abstractmethod decorator to match the deprecation pattern used for state.
You've spotted a real inconsistency! The Retriever class doesn't inherit from ABC (it's a plain class at line 14), which means the @abstractmethod decorator on stream_slices is essentially decorative—it doesn't enforce anything. This makes it contradictory alongside the docstring saying implementations "do not need to implement this."
The state property already demonstrates the correct pattern for deprecation: it uses @property and @deprecated without @abstractmethod, and neither AsyncRetriever nor SimpleRetriever override it. Stream slicing should follow the same approach.
Removing `@abstractmethod` from `stream_slices` would:
- Eliminate the misleading decorator from a non-ABC class
- Align with how `state` is handled
- Keep the no-op implementation clear without false enforcement signals
Wdyt, does removing the decorator make sense here?
🤖 Prompt for AI Agents
airbyte_cdk/sources/declarative/retrievers/retriever.py around lines 33 to 39:
the stream_slices method is decorated with @abstractmethod even though Retriever
is not an ABC and the method is intended to be a deprecated no-op; remove the
@abstractmethod decorator so the method mirrors the deprecated state property
pattern, leaving @deprecated and the generator implementation (yield from [])
intact to signal no enforcement is required.
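The point about `@abstractmethod` being unenforced on a non-ABC class can be checked with a small standalone sketch. The class names here are hypothetical and unrelated to the CDK; the behavior shown is standard Python: abstract methods are only enforced when the class uses `ABCMeta` (for example by inheriting from `ABC`).

```python
from abc import ABC, abstractmethod
from typing import Iterable, Optional


class PlainRetriever:
    """Plain class: @abstractmethod is only a marker here, nothing enforces it."""

    @abstractmethod
    def stream_slices(self) -> Iterable[Optional[dict]]:
        yield from []


class EnforcedRetriever(ABC):
    """ABC subclass: ABCMeta refuses to instantiate while abstract methods remain."""

    @abstractmethod
    def stream_slices(self) -> Iterable[Optional[dict]]:
        yield from []


class PlainChild(PlainRetriever):
    pass


class EnforcedChild(EnforcedRetriever):
    pass


# Works: the inherited no-op generator is used as-is.
plain_slices = list(PlainChild().stream_slices())

# Fails: stream_slices was never overridden on an ABC-based hierarchy.
try:
    EnforcedChild()
    enforced_error = None
except TypeError as error:
    enforced_error = error
```

Under that reading, dropping the decorator changes no runtime behavior for the plain class; it only removes a misleading signal.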
| yield from [] | ||
| else: | ||
| yield from self._read_pages(records_generator_fn, stream_state, stream_slice) | ||
| yield from self._read_pages(records_generator_fn, stream_slice) | ||
|
|
Fix the recursion path in LazySimpleRetriever._read_pages.
Line 533 now calls self._read_pages from inside LazySimpleRetriever._read_pages, so whenever child_response is absent we recurse forever and eventually blow the stack instead of delegating to the base pagination logic. Could you switch that call to super()._read_pages(...) so we forward to the parent implementation instead? wdyt?
- yield from self._read_pages(records_generator_fn, stream_slice)
+ yield from super()._read_pages(records_generator_fn, stream_slice)
🤖 Prompt for AI Agents
In airbyte_cdk/sources/declarative/retrievers/simple_retriever.py around lines
531 to 534, the recursive call inside LazySimpleRetriever._read_pages uses
self._read_pages(...) which causes infinite recursion when child_response is
absent; replace that call with super()._read_pages(records_generator_fn,
stream_slice) so the method delegates to the parent implementation (preserving
the same arguments and yield-from behavior) and avoid blowing the stack.
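The recursion concern can be reproduced in isolation. The sketch below uses simplified, hypothetical classes and signatures (the real `_read_pages` takes `records_generator_fn` and `stream_slice`); it only illustrates how `self._read_pages(...)` inside an override re-enters the override, while `super()._read_pages(...)` reaches the parent.

```python
class BaseRetriever:
    def _read_pages(self, stream_slice):
        # Stand-in for the parent pagination logic.
        yield ("base", stream_slice)


class RecursiveLazyRetriever(BaseRetriever):
    def _read_pages(self, stream_slice, child_response=None):
        if child_response is not None:
            yield child_response
        else:
            # Resolves to this same override, so it never reaches BaseRetriever.
            yield from self._read_pages(stream_slice)


class DelegatingLazyRetriever(BaseRetriever):
    def _read_pages(self, stream_slice, child_response=None):
        if child_response is not None:
            yield child_response
        else:
            # Explicitly forwards to the parent implementation.
            yield from super()._read_pages(stream_slice)


delegated = list(DelegatingLazyRetriever()._read_pages({"partition": "0"}))

try:
    list(RecursiveLazyRetriever()._read_pages({"partition": "0"}))
    hit_recursion_limit = False
except RecursionError:
    hit_recursion_limit = True
```

Whether the real `LazySimpleRetriever` actually hits this branch in practice is a separate question that this sketch does not answer.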
I know this is probably outside of the scope of this PR but this seems like a legitimate concern honestly... I would either document that this code isn't tested and we don't know why it's there
PyTest Results (Fast): 628 tests (-3 189), 616 ✅ (-3 189), 3m 33s ⏱️ (-3m 10s). For more details on these failures, see this check. Results for commit ef07959. ± Comparison against base commit 6504148. This pull request removes 3189 tests.
PyTest Results (Full): 3 814 tests, 3 800 ✅, 10m 50s ⏱️. For more details on these failures, see this check. Results for commit ef07959.
LGTM! I am a bit worried about test_when_read_then_call_stream_slices_only_once not passing as I would assume we call the stream slicer at least once but I assume it is because of the implementation of MockSource that it fails, not because of a logical problem with this PR
| next_page_token = {"next_page_token": initial_token} if initial_token is not None else None | ||
| return next_page_token | ||
|
|
||
| def _read_single_page( |
So beautiful 😍
The final nail in the coffin of the RFR mishap
At the root of things, there are 3 main things being cleaned up:
- `SimpleRetriever` no longer has a `cursor` field, since cursors are managed outside of the retriever in the concurrent CDK.
- `SimpleRetriever` no longer makes any reference to RFR using synthetic pagination cursors. This was never instantiated on components nor supported in the concurrent framework.
- Retriever implementations no longer define `state` getter/setters or `stream_slices()` and rely on the dummy definitions on the `Retriever` interface. Those are kept to maintain compatibility w/ legacy classes and are not used anyway.
In addition, `stream_state` is no longer passed through the various methods and into the interpolation context.
Risk:
The risk here is that `stream_state` is no longer referenceable in the interpolation context. However, I don't think it would work properly anyway, because stream state is not thread safe, so we made a large effort to remove it earlier this year. Only one community connector references it in our repo, but the risk would be custom connectors or OSS.
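For maintainers of custom connectors who want to check their exposure to this risk, one option is to scan a parsed manifest for string values that mention `stream_state`. The helper below is a hypothetical sketch, not part of the CDK, and the manifest shape is a simplified assumption for illustration.

```python
from typing import Any, Iterator


def find_stream_state_references(node: Any, path: str = "$") -> Iterator[str]:
    """Hypothetical helper: yield the path of every string value mentioning stream_state."""
    if isinstance(node, dict):
        for key, value in node.items():
            yield from find_stream_state_references(value, f"{path}.{key}")
    elif isinstance(node, list):
        for index, item in enumerate(node):
            yield from find_stream_state_references(item, f"{path}[{index}]")
    elif isinstance(node, str) and "stream_state" in node:
        yield path


# Simplified, made-up manifest fragment.
manifest = {
    "streams": [
        {"request_parameters": {"since": "{{ stream_state['updated_at'] }}"}},
        {"request_parameters": {"page_size": "100"}},
    ]
}

references = list(find_stream_state_references(manifest))
```

A plain substring match will also flag comments or descriptions that mention the name, so any hits still need a manual look.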