fix(property chunking): Switch the ordering page iteration and property chunking process chunks first instead of pages first #487
📝 Walkthrough
The changes focus on the …
Sequence Diagram(s)
sequenceDiagram
participant User
participant SimpleRetriever
participant Cursor
User->>SimpleRetriever: read_records()
alt Using ResumableFullRefreshCursor
SimpleRetriever->>Cursor: is_full_refresh_sync_complete()
alt Sync complete
SimpleRetriever-->>User: return (no records)
else Not complete
SimpleRetriever->>SimpleRetriever: _read_pages() (single page)
SimpleRetriever-->>User: yield records
end
else Standard Mode
SimpleRetriever->>SimpleRetriever: _read_pages()
loop For each page
SimpleRetriever->>Cursor: observe(record)
SimpleRetriever-->>User: yield record
end
SimpleRetriever->>Cursor: close_slice()
end
sequenceDiagram
participant SimpleRetriever
participant PropertyChunker
participant PageFetcher
SimpleRetriever->>PropertyChunker: get property chunks
loop For each property chunk
PropertyChunker->>PageFetcher: fetch page with chunk
PageFetcher-->>SimpleRetriever: return records
alt Merge key enabled
SimpleRetriever->>SimpleRetriever: merge records by key (using _deep_merge)
else No merge key
SimpleRetriever-->>User: yield records
end
end
alt Merge key enabled
SimpleRetriever-->>User: yield merged records
end
Actionable comments posted: 2
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
387-394: Clarify handling of stream_slice for each property chunk.
Here, we reassign `stream_slice` if `properties` is present. If the original slice is needed elsewhere, a fresh slice instance might be clearer. Would you be open to creating a new variable for the updated slice to avoid shadowing the original? wdyt?
514-536: Check property chunking coverage with ResumableFullRefreshCursor.
These lines cleanly handle the RFR scenario. However, would you consider adding a test case ensuring property chunking also works when resuming a full refresh? wdyt?
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
[error] 378-378: mypy: List item 0 has incompatible type "None"; expected "list[str]". (list-item)
[error] 382-382: mypy: Need type annotation for "records_without_merge_key" (hint: "records_without_merge_key: list[<type>] = ..."). (var-annotated)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
395-418: Verify usage of records_without_merge_key.
We instantiate `records_without_merge_key` but never add any records to it. Would you consider either removing it or integrating it into the record merging logic to store records lacking a merge key? wdyt?
420-430: Confirm aggregation of merged records.
The logic for yielding merged records looks good. One small question: could partial merges occur if a record with the same merge key is split across multiple pages? If so, do we need to persist partial merges across pages? wdyt?
505-512: No concerns with setting default _slice.
Defining `_slice` when `stream_slice` is `None` and partially applying `_parse_records` is a clean solution. Great job!
540-540: No further changes needed.
This return statement appears to be a standard final exit. Looks good!
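The partial application mentioned in the 505-512 comment can be illustrated with `functools.partial`. This is only a minimal sketch: `parse_records`, `bind_parser`, and their parameters are hypothetical stand-ins, not the CDK's actual `_parse_records` signature.

```python
from functools import partial
from typing import Any, Callable, Iterable, Mapping, Optional


def parse_records(
    response: Optional[Mapping[str, Any]],
    stream_state: Mapping[str, Any],
    stream_slice: Mapping[str, Any],
) -> Iterable[Mapping[str, Any]]:
    """Hypothetical parser: tags each record in the response with its slice."""
    if response is None:
        return
    for record in response.get("records", []):
        yield {**record, "slice": dict(stream_slice)}


def bind_parser(
    stream_slice: Optional[Mapping[str, Any]] = None,
) -> Callable[[Optional[Mapping[str, Any]]], Iterable[Mapping[str, Any]]]:
    # Default to an empty slice when none is given, then bind the arguments
    # that stay constant so callers only need to pass the response.
    _slice = stream_slice or {}
    return partial(parse_records, stream_state={}, stream_slice=_slice)


parser = bind_parser()
records = list(parser({"records": [{"id": 1}, {"id": 2}]}))
```

Binding the slice once keeps the page-reading loop free of repeated keyword arguments, which is presumably the duplication the comment refers to.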
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
382-382: Would you consider adding a type annotation for merged_records?
Mypy might flag this in the future. Adding a type annotation would improve code clarity and help with static analysis.
- merged_records: MutableMapping[str, Any] = defaultdict(dict)
+ merged_records: MutableMapping[str, MutableMapping[str, Any]] = defaultdict(dict)
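As a small standalone illustration of the suggested annotation (the key and field values here are made up for the example), the nested mapping type describes a dictionary of partially assembled records keyed by merge-key value:

```python
from collections import defaultdict
from typing import Any, MutableMapping

# Outer key: merge-key value; inner mapping: the record being assembled.
merged_records: MutableMapping[str, MutableMapping[str, Any]] = defaultdict(dict)

# Hypothetical fields arriving from two different property chunks.
merged_records["42"]["name"] = "Ada"
merged_records["42"]["email"] = "ada@example.com"
```

With the inner type spelled out, a type checker can verify that each value is itself a mutable mapping rather than an arbitrary `Any`.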
🔇 Additional comments (8)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (8)
378-379: Good fix for the type issue.
Your change from `[None]` to `[]` correctly addresses the mypy error flagged in the previous review. This ensures proper typing compatibility for the property chunks.
370-380: Improved property chunks initialization approach.
The refactored property chunking logic looks good. By moving this initialization to the beginning of the pagination loop, you're setting up the horizontal processing pattern that improves record emission timing.
386-417: Well-implemented property chunking loop.
The horizontal chunking approach (processing all chunks for the current page before moving to the next page) is well implemented. I like how you:
- Properly update the stream_slice with the current properties
- Handle records with and without merge keys differently
- Yield records without merge keys immediately while accumulating those with merge keys
This change achieves the PR's goal of allowing earlier record emission.
418-429: Good handling of merged records after chunk processing.
This code properly handles the emission of merged records after all chunks for a page have been processed. This ensures records with the same merge key are properly combined before moving to the next page.
505-512: Nice refactoring of record parsing logic.
Creating a partial function for record parsing makes the code cleaner and removes duplication. Good improvement!
513-522: Good addition of early termination check for ResumableFullRefreshCursor.
The addition of the early termination check for streams with a ResumableFullRefreshCursor is a nice enhancement that prevents unnecessary processing when a sync is already complete.
523-535: Clean refactoring of the main record processing logic.
The refactored code is cleaner and more focused. By moving the property chunking and merging logic to `_read_pages`, you've simplified this method while maintaining the same functionality.
370-429: Implementation aligns perfectly with PR objective.
Your refactoring successfully changes the property chunking approach from vertical (all pages for a chunk) to horizontal (all chunks for a page), which should significantly improve the time to first record emission. Nice work!
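The per-page merge described in these comments can be sketched roughly as follows. This is an assumption-laden illustration, not the CDK's code: `deep_merge` and `merge_chunks_for_page` are hypothetical helpers, and the record shape (an `id` merge key with a nested `properties` object) is invented for the example.

```python
import copy
from collections import defaultdict
from typing import Any, Iterable, List, Mapping, MutableMapping


def deep_merge(target: MutableMapping[str, Any], source: Mapping[str, Any]) -> None:
    """Recursively merge source into target, in place (hypothetical helper)."""
    for key, value in source.items():
        if isinstance(value, Mapping) and isinstance(target.get(key), MutableMapping):
            deep_merge(target[key], value)
        else:
            # Copy so later merges never mutate the caller's input records.
            target[key] = copy.deepcopy(value)


def merge_chunks_for_page(
    chunk_responses: Iterable[List[Mapping[str, Any]]], merge_key: str
) -> List[MutableMapping[str, Any]]:
    """Combine the records returned for each property chunk of a single page."""
    merged: MutableMapping[str, MutableMapping[str, Any]] = defaultdict(dict)
    for records in chunk_responses:
        for record in records:
            deep_merge(merged[str(record[merge_key])], record)
    return list(merged.values())


# One page fetched twice, once per (hypothetical) property chunk.
page = [
    [{"id": 1, "properties": {"name": "Ada"}}, {"id": 2, "properties": {"name": "Bob"}}],
    [{"id": 1, "properties": {"email": "ada@example.com"}}, {"id": 2, "properties": {"email": "bob@example.com"}}],
]
result = merge_chunks_for_page(page, "id")
```

Because the accumulator is created per page, each page's merged records can be emitted before the next page is requested.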
What
While implementing the three HubSpot property history streams, which require property chunking, I noticed a very long pause between the start of the sync and the first emitted record whenever property chunking was enabled.
This change refactors the `SimpleRetriever` so that we process each of the property chunks of the current page horizontally instead of vertically.
How
In the code as I originally wrote it, the property chunk is the outer loop and pagination is the inner loop. We take the first property chunk and paginate all the way to the end, then move to the second property chunk and paginate all the way to the end, and so on for every chunk. As a result, we effectively cannot emit any records until we reach the last property chunk.
This change reorders how we process pages and chunks: we start with the current page, fetch each property chunk for that page, then emit the complete merged records. We then continue to the next page. This is done by moving property chunking out of `SimpleRetriever.read_records()` and into `SimpleRetriever._read_pages()`.
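The difference between the two loop orders can be sketched with a toy fetcher. Everything here is hypothetical (the function names, the `fake_fetch` callable, and the fixed `num_pages`); real pagination discovers the next page from each response rather than knowing the page count up front.

```python
from typing import Any, Callable, Dict, Iterator, List, Tuple

Record = Dict[str, Any]
# Hypothetical fetcher: (page number, property chunk) -> records on that page.
Fetch = Callable[[int, List[str]], List[Record]]


def read_chunk_outer(fetch: Fetch, chunks: List[List[str]], num_pages: int) -> Iterator[Record]:
    """Original order: chunk is the outer loop, so nothing is complete until the last chunk."""
    merged: Dict[Any, Record] = {}
    for chunk in chunks:
        for page in range(num_pages):
            for record in fetch(page, chunk):
                merged.setdefault(record["id"], {}).update(record)
    yield from merged.values()


def read_page_outer(fetch: Fetch, chunks: List[List[str]], num_pages: int) -> Iterator[Record]:
    """New order: page is the outer loop, so merged records are emitted page by page."""
    for page in range(num_pages):
        merged: Dict[Any, Record] = {}
        for chunk in chunks:
            for record in fetch(page, chunk):
                merged.setdefault(record["id"], {}).update(record)
        yield from merged.values()


def requests_before_first_record(reader: Callable[..., Iterator[Record]]) -> Tuple[int, Record]:
    calls: List[Tuple[int, Tuple[str, ...]]] = []

    def fake_fetch(page: int, chunk: List[str]) -> List[Record]:
        calls.append((page, tuple(chunk)))
        return [{"id": page, **{prop: f"{prop}-{page}" for prop in chunk}}]

    first = next(reader(fake_fetch, [["name"], ["email"]], num_pages=3))
    return len(calls), first


old_requests, old_first = requests_before_first_record(read_chunk_outer)
new_requests, new_first = requests_before_first_record(read_page_outer)
```

In this toy setup with two chunks and three pages, the chunk-outer reader issues all six requests before its first record, while the page-outer reader needs only the two requests for the first page; both produce the same merged record.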
Note:
There are no changes to tests. I had already written test cases for each of the property chunking and no-chunking scenarios, and since this is a refactor, the intent is for there to be no functional impact on which records are emitted.