feat: Remove low-code cursor usage #62
Conversation
Commits signed off by Artem Inzhyyants <[email protected]>, including a merge of …code-cursor-usage with conflicts resolved in airbyte_cdk/sources/streams/concurrent/cursor.py.
📝 Walkthrough

The pull request includes significant modifications to several classes within the Airbyte CDK.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (7)
airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py (2)
`54-55`: How about adding a docstring for the reset method?

The new `reset_value` parameter could benefit from some documentation. Maybe something like this?

```diff
 def reset(self, reset_value: Optional[Any] = None) -> None:
+    """Reset the pagination strategy.
+
+    Args:
+        reset_value: Optional value to reset the delegate strategy with.
+    """
     self._delegate.reset(reset_value)
```

wdyt? 🤔
Line range hint `29-55`: Thoughts on the cursor migration strategy

The current approach of supporting both cursor types is a good way to ensure a smooth transition. However, to prevent this from becoming a long-term maintenance burden, would it be helpful to:

- Add a migration guide in the docs?
- Set up deprecation warnings for `DeclarativeCursor` (see the sketch below)?
- Create tracking issues for dependent repositories?

What are your thoughts on this? 🤔
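For the deprecation-warning idea, a minimal sketch; the hook point and message here are illustrative assumptions, not existing CDK behavior:

```python
import warnings


class DeclarativeCursor:
    """Stand-in for the existing base class; only the warning hook is new."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Surface the migration to any connector still subclassing the old cursor.
        warnings.warn(
            f"{cls.__name__} extends DeclarativeCursor, which is being phased out "
            "in favor of ConcurrentCursor.",
            DeprecationWarning,
            stacklevel=2,
        )
```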
airbyte_cdk/sources/declarative/extractors/record_filter.py (2)
Line range hint `80-86`: Consider enhancing error handling for cursor sync decisions

The simplified filtering logic looks cleaner! However, should we add some debug logging to help troubleshoot when records are filtered out? This could be super helpful for debugging sync issues. wdyt?

```diff
 records = (
     record
     for record in records
-    if (self._substream_cursor or self._date_time_based_cursor).should_be_synced(record)
+    if self._should_sync_record(record)
 )
 if self.condition:
     records = super().filter_records(
         records=records,
         stream_state=stream_state,
         stream_slice=stream_slice,
         next_page_token=next_page_token,
     )
 yield from records

+def _should_sync_record(self, record: Mapping[str, Any]) -> bool:
+    cursor = self._substream_cursor or self._date_time_based_cursor
+    should_sync = cursor.should_be_synced(record)
+    if not should_sync:
+        self._logger.debug(f"Filtering out record: cursor={cursor.__class__.__name__}")
+    return should_sync
```
Line range hint `80-86`: Architecture feedback on cursor delegation

The refactoring to use `should_be_synced` nicely simplifies the filtering logic! This change aligns well with the PR objective to remove low-code cursor usage. The delegation to cursor objects follows good OOP principles by letting them handle their specific sync logic.

unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py (1)
`110-110`: Consider adding test cases for different reset values?

The test verifies reset behavior with `None`, but since the method now accepts an optional parameter, we might want to test other scenarios. What do you think about adding test cases for:

- Reset with a specific value
- Reset with different types of values

Example addition:

```python
def test_when_reset_with_value_then_delegate(mocked_pagination_strategy, mocked_stop_condition):
    decorator = StopConditionPaginationStrategyDecorator(
        mocked_pagination_strategy, mocked_stop_condition
    )
    reset_value = {"cursor": "2023-01-01"}
    decorator.reset(reset_value)
    mocked_pagination_strategy.reset.assert_called_once_with(reset_value)
```

wdyt? 🤔
airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py (1)
`197-201`: Consider using `_get_active_cursor()` for consistency?

The implementation looks good! However, I noticed that other methods in the class use the `_get_active_cursor()` helper method. What do you think about using it here as well for consistency? Something like:

```diff
 def should_be_synced(self, record: Record) -> bool:
-    return (
-        self._global_cursor.should_be_synced(record)
-        if self._use_global_cursor
-        else self._per_partition_cursor.should_be_synced(record)
-    )
+    return self._get_active_cursor().should_be_synced(record)
```

This would make the code more consistent with the rest of the class and potentially more maintainable. WDYT? 🤔
unit_tests/sources/declarative/extractors/test_record_filter.py (1)
`439-442`: Nice implementation! Would you consider adding a few more assertions? 🤔

The Record wrapping looks good, but we could make the tests even more robust. What do you think about:

- Adding assertions to verify the stream_slice is correctly attached to the Record objects?
- Testing edge cases like empty stream_slice or null values?

Example:

```python
# Verify stream_slice attachment (Record stores it as `associated_slice`)
assert all(r.associated_slice == stream_slice for r in filtered_records)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (6)

- `airbyte_cdk/sources/declarative/extractors/record_filter.py` (1 hunks)
- `airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py` (1 hunks)
- `airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py` (3 hunks)
- `airbyte_cdk/sources/streams/concurrent/cursor.py` (2 hunks)
- `unit_tests/sources/declarative/extractors/test_record_filter.py` (2 hunks)
- `unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py` (1 hunks)
🔇 Additional comments (9)
airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py (2)
`9-9`: LGTM! Clean import addition

The new import aligns well with the type hint requirements in `CursorStopCondition`.
`29-33`: Consider enhancing the migration comment

The inline comment suggests this is a transitional change. Would it make sense to add more context about the migration plan? Maybe something like:

```diff
-    | ConcurrentCursor,  # migrate to use both old and concurrent versions
+    | ConcurrentCursor,  # TODO(migration): Supporting both cursor types until DeclarativeCursor is fully deprecated. Target removal date: <DATE>
```

What do you think? 🤔

Let's check if DeclarativeCursor is still widely used:
airbyte_cdk/sources/declarative/extractors/record_filter.py (1)
`80-80`: Verify cursor fallback behavior

The new implementation uses either `_substream_cursor` or `_date_time_based_cursor` for sync decisions. Should we add a validation to ensure at least one cursor is always provided? wdyt? (A sketch of such a check follows.)
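A minimal sketch of what that guard could look like, assuming the filter is a dataclass holding both cursor attributes; the class name and hook point are illustrative:

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ClientSideIncrementalRecordFilterDecoratorSketch:
    # Stand-ins for the real cursor types; either one may be provided.
    _substream_cursor: Optional[Any] = None
    _date_time_based_cursor: Optional[Any] = None

    def __post_init__(self) -> None:
        # Fail fast instead of raising AttributeError on None later, when
        # should_be_synced is first called during a sync.
        if not (self._substream_cursor or self._date_time_based_cursor):
            raise ValueError(
                "A substream cursor or a datetime-based cursor must be provided"
            )
```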
unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py (1)
Line range hint `1-124`: LGTM! Well-structured tests 👍

The test file is well-organized with:

- Clear test cases covering different scenarios
- Good use of fixtures and mocks
- Descriptive test names
airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py (1)
Line range hint `15-82`: Documentation looks great! 👏

The class documentation is comprehensive, well-structured, and includes clear examples. It effectively explains the purpose, functionality, and state management approach. Great job!
unit_tests/sources/declarative/extractors/test_record_filter.py (2)
`5-5`: LGTM! Clean import addition.

The Record import aligns well with the refactoring goals.
Line range hint `439-446`: Should we verify integration with `should_be_synced`? 🤔

The tests look comprehensive, but given that this PR introduces a new `should_be_synced` feature (mentioned in PR objectives), should we add specific test cases to verify its integration with the Record filtering? This would help ensure the new feature works correctly with all cursor types.
airbyte_cdk/sources/streams/concurrent/cursor.py (2)
`19-21`: LGTM! Clean logging setup 🎯

The logging configuration follows Python best practices and maintains consistency with the Airbyte namespace.
`449-456`: Verify usage and impact of `should_be_synced`

Let's verify how this method is being used across the codebase to ensure the default True behavior for missing cursor fields won't cause issues.

✅ Verification successful

Default behavior for missing cursor fields is consistent with existing implementations

Based on the codebase analysis, the default `True` behavior for missing cursor fields is actually the established pattern across different cursor implementations:

- `DatetimeBasedCursor` has explicit test coverage for this case in `test_given_record_without_cursor_value_when_should_be_synced_then_return_true`
- The behavior is used by various cursor implementations, including `PerPartitionCursor`, `GlobalSubstreamCursor`, and others
- The warning message provides good observability for debugging

The implementation aligns with the existing cursor behavior in the codebase, making it a safe and consistent choice.
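For illustration, the pattern being endorsed here (warn and return True on a missing cursor field, otherwise boundary-check) can be sketched as a free function; the names, signature, and boundary arguments are illustrative, not the ConcurrentCursor's actual method:

```python
import logging
from typing import Any, Mapping

logger = logging.getLogger("airbyte")


def should_be_synced_sketch(
    data: Mapping[str, Any], cursor_field: str, lower: Any, upper: Any
) -> bool:
    if cursor_field not in data:
        # Default to syncing so records are never silently dropped; the
        # warning keeps the missing-field case observable.
        logger.warning(
            f"Could not find cursor field `{cursor_field}` in record; syncing it anyway"
        )
        return True
    return lower <= data[cursor_field] <= upper
```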
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```bash
#!/bin/bash
# Search for calls to should_be_synced
rg "should_be_synced" --type python -B 2 -A 2

# Look for potential error handling patterns in other cursor implementations
ast-grep --pattern 'if not $cursor_value:
  $$$
  return True
  $$$'
```

Length of output: 147

Script:

```bash
#!/bin/bash
# Search for calls to should_be_synced with correct file type
rg "should_be_synced" -t py -B 2 -A 2

# Look for cursor-related error handling patterns
rg "cursor.*field.*not.*found|missing.*cursor" -t py -B 2 -A 2

# Find usages of the cursor class
rg "class.*Cursor" -t py -B 2 -A 2
```

Length of output: 39903
UPD: I guess it is out of scope of the current PR to fix this, as we'd have to change it throughout the whole codebase.
@maxi297, WDYT?
I'm a bit worried this means that we might break things and that we can't proceed with this change. If this is the case, then I fear that we will need to update the cursor interfaces as well.
Tested on the mailchimp `segment_members` stream and got an exception. So I see 2 options here; the patch below illustrates wrapping each record into a `Record` before calling `should_be_synced`:
```diff
Index: airbyte_cdk/sources/declarative/extractors/record_filter.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/airbyte_cdk/sources/declarative/extractors/record_filter.py b/airbyte_cdk/sources/declarative/extractors/record_filter.py
--- a/airbyte_cdk/sources/declarative/extractors/record_filter.py	(revision 6842cc40efe287250fb477a7438d6d76dab4cf19)
+++ b/airbyte_cdk/sources/declarative/extractors/record_filter.py	(date 1732126900725)
@@ -10,7 +10,7 @@
     PerPartitionWithGlobalCursor,
 )
 from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
-from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
+from airbyte_cdk.sources.types import Config, StreamSlice, StreamState, Record
 
 
 @dataclass
@@ -77,7 +77,7 @@
         records = (
             record
             for record in records
-            if (self._substream_cursor or self._date_time_based_cursor).should_be_synced(record)
+            if (self._substream_cursor or self._date_time_based_cursor).should_be_synced(Record(data=record, associated_slice=stream_slice))
         )
         if self.condition:
             records = super().filter_records(
```
Do we plan to initialize …
I think I don't understand the proposals. It feels like in both cases, we would be at risk of having a cursor with a different implementation that doesn't take that type of record. For example, if the cursor is the ConcurrentCursor, calling `should_be_synced` with a plain mapping instead of a `Record` would fail. So for me right now, the only solutions are:
I think I prefer the second one. It means more code changes in the CDK but creates fewer issues in the sources. I'm also fine with breaking interfaces in the concurrent framework, as we agreed they were experimental until we complete the migration to one CDK. WDYT?
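For concreteness, a minimal sketch of the mismatch being discussed; the class shape is illustrative, not the CDK's actual definition:

```python
from airbyte_cdk.sources.types import Record, StreamSlice


class ConcurrentCursorSketch:
    """Illustrative stand-in for a cursor whose should_be_synced expects a Record."""

    def should_be_synced(self, record: Record) -> bool:
        # Touching Record-specific attributes is exactly what breaks when a
        # plain Mapping[str, Any] is passed in instead of a Record.
        return record.associated_slice is not None


# Wrapping at the call site (the approach in the patch above) keeps the call safe:
stream_slice = StreamSlice(partition={}, cursor_slice={})
raw_record = {"id": "1", "updated_at": "2024-01-01"}
ConcurrentCursorSketch().should_be_synced(
    Record(data=raw_record, associated_slice=stream_slice)
)
```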
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
unit_tests/sources/streams/concurrent/test_cursor.py (2)
`1500-1546`: Consider adding more edge cases to strengthen test coverage, wdyt?

The test cases look good and cover the main scenarios! Would you consider adding these additional cases to make the coverage even more robust?

- Edge cases with cursor values exactly at the boundaries (equal to lower/upper bounds)
- Different data types for cursor field values (strings, dates, etc.)

Here's a suggestion for the additional test cases:

```diff
     [
         Record(
             data={"not_a_cursor_field": "some_data"},
             partition=_partition(
                 {_LOWER_SLICE_BOUNDARY_FIELD: 0, _UPPER_SLICE_BOUNDARY_FIELD: 10}
             ),
         ),
         True,
     ],
+    [
+        Record(
+            data={_A_CURSOR_FIELD_KEY: 0},  # Equal to lower bound
+            partition=_partition(
+                {_LOWER_SLICE_BOUNDARY_FIELD: 0, _UPPER_SLICE_BOUNDARY_FIELD: 10}
+            ),
+        ),
+        True,
+    ],
+    [
+        Record(
+            data={_A_CURSOR_FIELD_KEY: "2024-01-01"},  # String date
+            partition=_partition(
+                {_LOWER_SLICE_BOUNDARY_FIELD: "2024-01-01", _UPPER_SLICE_BOUNDARY_FIELD: "2024-01-02"}
+            ),
+        ),
+        True,
+    ],
 ],
 ids=[
     "with_cursor_field_inside_range",
     "with_cursor_field_lower_than_range",
     "with_cursor_field_higher_than_range",
     "no_cursor",
+    "with_cursor_field_at_lower_bound",
+    "with_string_date_cursor_field",
 ],
```
`1547-1562`: Would you like to add a docstring to explain the test's purpose?

The test implementation looks great! To make it even more maintainable, consider adding a docstring to explain what aspect of the ConcurrentCursor we're testing here and why these test cases are important, wdyt?

Here's a suggestion:

```diff
 @freezegun.freeze_time(time_to_freeze=datetime.fromtimestamp(20, timezone.utc))
 def test_should_be_synced(record: Record, should_be_synced: bool):
+    """
+    Test the should_be_synced method of ConcurrentCursor which determines if a record
+    should be synchronized based on its cursor field value relative to the slice boundaries.
+
+    The test verifies that:
+    1. Records with cursor values within the slice range are synced
+    2. Records with cursor values outside the slice range are not synced
+    3. Records without cursor fields are always synced
+    """
     cursor = ConcurrentCursor(
         _A_STREAM_NAME,
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)

- `airbyte_cdk/sources/streams/concurrent/cursor.py` (2 hunks)
- `unit_tests/sources/streams/concurrent/test_cursor.py` (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/streams/concurrent/cursor.py
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (25)
airbyte_cdk/sources/streams/concurrent/partitions/partition.py (2)
`8-8`: Consider documenting the Record type requirements

The move to use `Record` from `airbyte_cdk.sources.types` aligns with centralizing type definitions. Given the type compatibility discussions in the PR, would it be helpful to add a note about expected `Record` structure and requirements in the class docstring? This could help prevent future type mismatches, wdyt? 🤔
Line range hint `22-31`: Consider enhancing slice type documentation

The comment about nested slice values is super helpful! Given the type compatibility discussions, would it make sense to add some example structures or common patterns for slice construction? This could help implementers understand the relationship between `Record` and slice representations better. 😊
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (1)

`59-59`: LGTM! Consider adding more test cases? 🤔

The assertion change from checking `partition` to `associated_slice` aligns well with the cursor refactoring goals. This ensures that records maintain their slice association, which is crucial for proper cursor handling.

Would you consider adding a few more test cases to verify edge scenarios? For example:

- A test with an empty stream slice
- A test with multiple records from the same slice
- A test verifying slice association persistence through transformations

wdyt?
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (2)
`62-66`: Nice improvement to Record instantiation clarity! Would you consider adding a docstring? 🤔

The named parameters make the code much more readable and self-documenting. Since this seems to be a key part of the partition handling, perhaps we could add a brief docstring explaining the purpose of these parameters, wdyt?

Here's a suggestion:

```diff
 def read(self) -> Iterable[Record]:
+    """
+    Reads records from the retriever and yields them as Record objects.
+
+    Each record includes:
+    - data: The actual stream data
+    - stream_name: Name of the stream this record belongs to
+    - associated_slice: The slice context this record was generated from
+    """
     for stream_data in self._retriever.read_records(self._json_schema, self._stream_slice):
```
Line range hint `1-93`: Clean separation of concerns! A thought about error handling...

The partition generation design looks solid, with good separation between factory, partition, and generator. Since this is part of removing low-code cursor usage, would it make sense to add some explicit error handling for cursor-related edge cases that might have been handled implicitly before? For example, handling cases where stream_data might contain cursor-related fields that are no longer supported?
airbyte_cdk/sources/types.py (1)
`19-29`: Consider adding docstring for the new parameters?

The new parameters look good, but adding documentation would help users understand their purpose. Something like this perhaps? wdyt?

```diff
 def __init__(
     self,
     data: Mapping[str, Any],
     associated_slice: Optional[StreamSlice] = None,
     stream_name: Optional[str] = None,
     is_file_transfer_message: bool = False,
 ):
+    """Initialize a Record instance.
+
+    Args:
+        data: The actual record data
+        associated_slice: The stream slice this record was read from
+        stream_name: Optional name of the stream this record belongs to
+        is_file_transfer_message: Flag indicating if this record represents a file transfer
+    """
```

airbyte_cdk/sources/declarative/extractors/record_selector.py (1)
Line range hint `63-69`: Great documentation of technical debt! Would you consider tracking it formally? 🤔

The docstring nicely explains the architectural issue about mixing technology-agnostic processing with a technology-specific API. Would it make sense to create a technical debt ticket to track this? This could help prioritize the refactoring work in the future, wdyt?

Would you like me to help create a GitHub issue to track this technical debt?
airbyte_cdk/sources/concurrent_source/concurrent_source.py (1)
Line range hint `156-167`: Consider using a pattern matching approach for queue item handling?

The current type checking with `isinstance` is working well, but with Python 3.10+ we could make this more elegant using pattern matching. This could make the code more maintainable as new queue item types are added, wdyt?

Here's how it could look:

```diff
-    def _handle_item(
-        self,
-        queue_item: QueueItem,
-        concurrent_stream_processor: ConcurrentReadProcessor,
-    ) -> Iterable[AirbyteMessage]:
-        if isinstance(queue_item, StreamThreadException):
-            yield from concurrent_stream_processor.on_exception(queue_item)
-        elif isinstance(queue_item, PartitionGenerationCompletedSentinel):
-            yield from concurrent_stream_processor.on_partition_generation_completed(queue_item)
-        elif isinstance(queue_item, Partition):
-            concurrent_stream_processor.on_partition(queue_item)
-        elif isinstance(queue_item, PartitionCompleteSentinel):
-            yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item)
-        elif isinstance(queue_item, Record):
-            yield from concurrent_stream_processor.on_record(queue_item)
-        else:
-            raise ValueError(f"Unknown queue item type: {type(queue_item)}")
+    def _handle_item(
+        self,
+        queue_item: QueueItem,
+        concurrent_stream_processor: ConcurrentReadProcessor,
+    ) -> Iterable[AirbyteMessage]:
+        match queue_item:
+            case StreamThreadException():
+                yield from concurrent_stream_processor.on_exception(queue_item)
+            case PartitionGenerationCompletedSentinel():
+                yield from concurrent_stream_processor.on_partition_generation_completed(queue_item)
+            case Partition():
+                concurrent_stream_processor.on_partition(queue_item)
+            case PartitionCompleteSentinel():
+                yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item)
+            case Record():
+                yield from concurrent_stream_processor.on_record(queue_item)
+            case _:
+                raise ValueError(f"Unknown queue item type: {type(queue_item)}")
```

unit_tests/sources/streams/concurrent/scenarios/incremental_scenarios.py (1)
Line range hint `1-300`: Great test coverage! 🎯

The test scenarios comprehensively cover:

- No input state
- Legacy state
- Concurrent state

Each with variants for with/without slice boundaries. The systematic approach looks solid!

One small suggestion: since we're making significant changes to how slices are handled, would it be helpful to add a test case that verifies error handling for malformed StreamSlice objects? For example, what happens if partition is None or has missing keys? wdyt?
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (1)
`150-154`: Consider documenting the Record type contract

The change from `record.partition.stream_name()` to `record.stream_name` simplifies the access pattern, which is great! Would it make sense to add some type hints or documentation to clarify what attributes are required on the Record type? This could help prevent runtime errors if someone creates a custom Record implementation. wdyt? 🤔
airbyte_cdk/sources/file_based/stream/concurrent/adapters.py (1)

Line range hint `261-267`: Should we handle more edge cases for record_message_data? 🤔

The current check `if not record_message_data` catches None, but what about other falsy values like empty strings or empty collections? Would it make sense to be more specific about what constitutes valid data, wdyt?

```diff
-    if not record_message_data:
+    if record_message_data is None or (isinstance(record_message_data, (str, list, dict)) and not record_message_data):
         raise ExceptionWithDisplayMessage("A record without data was found")
+    elif not isinstance(record_message_data, (str, dict, list)):
+        raise ExceptionWithDisplayMessage(f"Unexpected data type: {type(record_message_data)}")
```

unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py (2)
`349-353`: Nice type-safety improvement! Would you consider adding a docstring?

The switch to `StreamSlice` objects makes the test more robust and type-safe. Since this is a significant pattern change, would you consider adding a brief docstring to explain the slice structure? Something like "Test verifies stream behavior with typed slice boundaries", wdyt?
`444-451`: Consider consistent formatting and additional error cases?

The StreamSlice conversion looks good, but I noticed a few extra newlines. Would you like to make the formatting consistent with the previous test case?

Also, since this is testing error conditions, should we add a test case for when `cursor_slice` is non-empty, wdyt? This could help verify the behavior when both partition and cursor data are present.

Here's how we could make the formatting consistent:

```diff
 [
-    (
-        StreamSlice(partition={"parent_id": 1}, cursor_slice={}),
-        [{"id": "1", "cursor_field": 0}],
-    ),
-    (
-        StreamSlice(partition={"parent_id": 309}, cursor_slice={}),
-        [{"id": "3", "cursor_field": 0}],
-    ),
+    (StreamSlice(partition={"parent_id": 1}, cursor_slice={}),
+     [{"id": "1", "cursor_field": 0}]),
+    (StreamSlice(partition={"parent_id": 309}, cursor_slice={}),
+     [{"id": "3", "cursor_field": 0}]),
 ],
```

airbyte_cdk/sources/streams/concurrent/adapters.py (1)
Line range hint `285-286`: Consider enhancing state behavior documentation.

The comment about state behavior differences is super important! Would it be helpful to expand this into a more detailed docstring explaining:

- Current slice-by-slice state update behavior
- How this differs in the Concurrent CDK
- Potential implications for connector implementations

This could help future maintainers and connector developers, wdyt? 🤓
airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py (1)
`328-331`: LGTM! The keyword arguments improve clarity. Would you consider adding a docstring? wdyt?

The change to use keyword arguments makes the code more maintainable and type-safe. Consider adding a brief docstring to explain why we're creating a new Record with an empty partition dictionary, as this might help future maintainers understand the intent better.

```python
@staticmethod
def _convert_record_to_cursor_record(record: Record) -> Record:
    """
    Converts a record to a cursor record by creating a new Record with the same data
    but with an empty partition dictionary in its associated slice.

    Args:
        record (Record): The original record to convert

    Returns:
        Record: A new record suitable for cursor operations
    """
```

airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py (1)
`343-346`: LGTM! Would you consider adding a docstring to clarify the method's purpose? wdyt?

The implementation looks solid and handles the None case properly. Maybe we could add a brief docstring explaining why we need to convert records and what "cursor record" means in this context? This would help future maintainers understand the purpose of this conversion.

Something like:

```python
@staticmethod
def _convert_record_to_cursor_record(record: Record) -> Record:
    """
    Converts a record to a cursor record by creating a new Record instance with an empty partition.
    This ensures consistent cursor behavior across different partitions.

    Args:
        record (Record): The original record to convert

    Returns:
        Record: A new record instance with an empty partition and the original cursor slice
    """
```

unit_tests/sources/file_based/stream/concurrent/test_adapters.py (2)
`296-302`: LGTM! Clean list comprehension with explicit parameters.

The code is clear and consistent. Would you consider adding type hints to make the data structure expectations more explicit? Something like:

```python
records: List[Record] = [
    Record(
        data=data,
        associated_slice=partition,
    )
    for data in expected_stream_data
]
```

wdyt?
`312-318`: Consider making the associated_slice more dynamic.

Using the string literal "stream" for `associated_slice` might make the test brittle. What do you think about using a variable or the stream name from the test configuration? This would make the test more maintainable if the stream name changes. wdyt?

```diff
 records = [
     Record(
         data=data,
-        associated_slice="stream",
+        associated_slice=self._facade.name,
     )
     for data in expected_stream_data
 ]
```

unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py (2)
`33-40`: Consider using a test helper for Record creation, wdyt?

I notice we're creating `Record` instances with the same pattern across multiple test scenarios. To make the tests more maintainable and reduce duplication, what do you think about introducing a test helper function? Something like:

```python
def create_test_record(id_value: str, stream_name: str, **additional_data) -> Record:
    data = {"id": id_value, **additional_data}
    return Record(data=data, stream_name=stream_name)
```

This would allow us to simplify the test cases to:

```python
create_test_record("1", "stream1")
```

Also applies to: 69-76, 105-112, 141-163, 192-214, 243-246, 378-383
`243-246`: Consider adding more error scenarios, wdyt?

In `_stream_raising_exception`, we're testing with `ValueError`. Since this is testing concurrent stream behavior, would it be valuable to add test cases for other concurrent-specific exceptions, like the following? (A sketch of the first two is below.)

- TimeoutError
- InterruptedError
- Race conditions with multiple partitions
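If helpful, a self-contained sketch of how the first two could be parametrized; the failing-read generator here is illustrative, not the scenario builder's actual API:

```python
import pytest


def _records_then_failure(exception_type):
    # Simulate a partition read that fails partway through.
    yield {"id": "1"}
    raise exception_type("simulated failure")


@pytest.mark.parametrize("exception_type", [TimeoutError, InterruptedError])
def test_partition_read_surfaces_exception(exception_type):
    reader = _records_then_failure(exception_type)
    assert next(reader) == {"id": "1"}
    with pytest.raises(exception_type):
        next(reader)
```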
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
`451-459`: Consider adding docstring and tests?

The implementation looks good and handles missing cursor fields gracefully. Would you consider:

1. Adding a docstring to explain the behavior? Something like:

```python
def should_be_synced(self, record: Record) -> bool:
    """
    Determines if a record should be synced based on its cursor value.
    If the cursor field is missing, logs a warning and returns True to ensure
    the record is synced. Otherwise, checks if the record's cursor value falls
    within the sync boundaries.

    Args:
        record: The record to evaluate

    Returns:
        bool: True if the record should be synced, False otherwise
    """
```

2. Adding test cases for:
   - Missing cursor field
   - Edge cases at boundaries
   - Values outside boundaries

wdyt? 🤔
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
`480-480`: LGTM! Consider using dict.copy() for defensive copying

The addition of the `associated_slice` parameter and use of `dict()` looks good. For slightly better performance when dealing with dictionaries, we could use `dict.copy()` instead of `dict()`. What do you think? 🤔

```diff
-    return Record(data=dict(stream_data), associated_slice=stream_slice)
+    return Record(data=stream_data.copy(), associated_slice=stream_slice)
```

unit_tests/sources/streams/test_stream_read.py (1)
`545-546`: LGTM! Nice improvement using named parameters.

The explicit parameter naming makes the code more readable. Would you consider adding a type hint for the `data` parameter to make it even more clear? Something like this:

```diff
-    data=record,
+    data: Mapping[str, Any] = record,
```

wdyt?
unit_tests/sources/declarative/incremental/test_datetime_based_cursor.py (1)
Line range hint `1-800`: Impressive test coverage! A few enhancement ideas to consider

The test suite is comprehensive, with excellent coverage of edge cases. Would you consider adding a few additional test cases to make it even more robust? For example:

- Test cases for invalid datetime formats
- Tests for timezone edge cases
- Tests for leap year scenarios

What do you think? Happy to help draft these additional test cases if you're interested! (A small sketch follows.)
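A rough pytest sketch of the kinds of cases meant here, exercising plain `datetime.strptime` rather than the cursor's own parser (which would need the suite's fixtures):

```python
from datetime import datetime

import pytest


@pytest.mark.parametrize(
    "value, should_parse",
    [
        ("2020-02-29", True),   # valid leap-year date
        ("2021-02-29", False),  # invalid: 2021 is not a leap year
        ("not-a-date", False),  # invalid format
    ],
)
def test_datetime_parsing_edge_cases(value, should_parse):
    if should_parse:
        assert datetime.strptime(value, "%Y-%m-%d")
    else:
        with pytest.raises(ValueError):
            datetime.strptime(value, "%Y-%m-%d")
```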
unit_tests/sources/streams/concurrent/test_cursor.py (1)
`1577-1639`: LGTM! Comprehensive test cases for should_be_synced.

The parametrized tests cover essential scenarios, including boundary conditions and edge cases. Would you consider adding a few more test cases to make it even more robust? For example:

- Edge cases exactly at the boundary values
- Special values (null, empty string, etc.) for the cursor field
- Different data types for the cursor field

What do you think? 🤔
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (32)

- `airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py` (2 hunks)
- `airbyte_cdk/sources/concurrent_source/concurrent_source.py` (1 hunks)
- `airbyte_cdk/sources/declarative/extractors/record_filter.py` (2 hunks)
- `airbyte_cdk/sources/declarative/extractors/record_selector.py` (1 hunks)
- `airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py` (1 hunks)
- `airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py` (1 hunks)
- `airbyte_cdk/sources/declarative/retrievers/simple_retriever.py` (1 hunks)
- `airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py` (2 hunks)
- `airbyte_cdk/sources/file_based/stream/concurrent/adapters.py` (3 hunks)
- `airbyte_cdk/sources/file_based/stream/concurrent/cursor/abstract_concurrent_file_based_cursor.py` (1 hunks)
- `airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py` (1 hunks)
- `airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py` (1 hunks)
- `airbyte_cdk/sources/streams/concurrent/adapters.py` (2 hunks)
- `airbyte_cdk/sources/streams/concurrent/cursor.py` (5 hunks)
- `airbyte_cdk/sources/streams/concurrent/partitions/partition.py` (1 hunks)
- `airbyte_cdk/sources/streams/concurrent/partitions/record.py` (0 hunks)
- `airbyte_cdk/sources/streams/concurrent/partitions/types.py` (1 hunks)
- `airbyte_cdk/sources/types.py` (3 hunks)
- `unit_tests/sources/declarative/extractors/test_record_filter.py` (2 hunks)
- `unit_tests/sources/declarative/extractors/test_record_selector.py` (1 hunks)
- `unit_tests/sources/declarative/incremental/test_datetime_based_cursor.py` (1 hunks)
- `unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py` (1 hunks)
- `unit_tests/sources/file_based/stream/concurrent/test_adapters.py` (5 hunks)
- `unit_tests/sources/streams/concurrent/scenarios/incremental_scenarios.py` (7 hunks)
- `unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py` (3 hunks)
- `unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py` (8 hunks)
- `unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_source_builder.py` (1 hunks)
- `unit_tests/sources/streams/concurrent/test_adapters.py` (1 hunks)
- `unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py` (2 hunks)
- `unit_tests/sources/streams/concurrent/test_cursor.py` (24 hunks)
- `unit_tests/sources/streams/concurrent/test_partition_reader.py` (1 hunks)
- `unit_tests/sources/streams/test_stream_read.py` (2 hunks)
💤 Files with no reviewable changes (1)
- airbyte_cdk/sources/streams/concurrent/partitions/record.py
✅ Files skipped from review due to trivial changes (4)
- airbyte_cdk/sources/file_based/stream/concurrent/cursor/abstract_concurrent_file_based_cursor.py
- airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py
- unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_source_builder.py
- unit_tests/sources/streams/concurrent/test_adapters.py
🚧 Files skipped from review as they are similar to previous changes (2)
- airbyte_cdk/sources/declarative/extractors/record_filter.py
- unit_tests/sources/declarative/extractors/test_record_filter.py
🔇 Additional comments (41)
airbyte_cdk/sources/streams/concurrent/partitions/types.py (2)
`11-11`: LGTM! Moving towards a more centralized type system.

The change to import `Record` from `airbyte_cdk.sources.types` aligns well with the refactoring goals. This centralization should make the type system more maintainable.

Let's verify that this is part of a consistent pattern:
✅ Verification successful

Import change is consistent across the codebase

The verification shows that `Record` is consistently imported from `airbyte_cdk.sources.types` across the entire codebase, with no remaining imports from the old location (`concurrent.partitions.record`). This confirms that the import change aligns with the broader codebase structure.
🏁 Scripts executed

The following scripts were executed for the analysis:

```bash
#!/bin/bash
# Description: Check if Record is consistently imported from airbyte_cdk.sources.types
# across the codebase after this change

# Search for any remaining imports from the old location
rg "from.*concurrent\.partitions\.record.*import.*Record"

# Check the new import pattern usage
rg "from.*airbyte_cdk\.sources\.types.*import.*Record"
```

Length of output: 6407
Line range hint `37-39`: Type definition looks good! Quick verification needed.

The `QueueItem` Union type definition remains valid with the new `Record` import. Since this is a critical type used in concurrent streaming, would you mind if we double-check that all consumers of this type are still working as expected? 🤔

Let's verify the usage:

✅ Verification successful
The previous command might have failed because we tried to pipe between commands, so let's take a more direct approach to find QueueItem usage and its type definition, then retry without the Python file-type filter since it's not recognized. This should surface all occurrences of QueueItem and its relationship with the Record type across the codebase.

All clear! The QueueItem type is consistently used across the codebase 🎯

I've checked all the usages of QueueItem, and they're all properly importing and using it from the types module. The type is used consistently in:

- Queue type hints in partition_enqueuer.py and partition_reader.py
- The concurrent source implementation in concurrent_source.py
- Test files that verify the behavior

The change in the Record import location doesn't affect any of these implementations since they all reference QueueItem through the correct import path. Looking good!
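For reference, based on the handlers shown earlier in this PR, the union presumably has roughly this shape; the member set and import paths are inferred from the files named in this review, so treat them as assumptions:

```python
from typing import Union

from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
    PartitionGenerationCompletedSentinel,
)
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.types import Record

# StreamThreadException is an Exception subclass, so the Exception member covers it.
QueueItem = Union[
    Record,
    Partition,
    PartitionGenerationCompletedSentinel,
    Exception,
]
```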
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```bash
#!/bin/bash
# Description: Check for any potential type-related issues in files using QueueItem

# Find files using QueueItem
rg "QueueItem" -l | xargs rg "Union\[.*Record.*\]" -A 2
```

Length of output: 55

Script:

```bash
#!/bin/bash
# Let's try a different approach to find QueueItem usage
# First, find the files containing QueueItem
rg "QueueItem" --type python

# Then specifically look for the type definition
rg "QueueItem\s*=" --type python -A 2

# Also check for imports of QueueItem
rg "from.*import.*QueueItem" --type python
```

Length of output: 206

Script:

```bash
#!/bin/bash
# Let's try without file type filtering
# Find QueueItem usage
rg "QueueItem"

# Look for the type definition
rg "QueueItem\s*="

# Check for imports
rg "from.*import.*QueueItem"

# Also check for Record usage in the same context
rg "Record.*QueueItem|QueueItem.*Record" -A 2
```

Length of output: 2216
airbyte_cdk/sources/streams/concurrent/partitions/partition.py (1)
Line range hint `15-20`: Verify type compatibility with cursor implementations

The `read()` method's return type `Iterable[Record]` might need additional context given the PR's focus on cursor handling. Should we add a note about how Records should be structured for cursor-based filtering? This could help address the type compatibility concerns raised in the PR discussion.

✅ Verification successful

Let me gather more information about cursor implementations, Record usage, and Record's structure and its relationship with cursor implementations.

Based on the extensive analysis of cursor implementations and Record usage in the codebase, I can now provide a final response.

Hey there! What if we add a note about cursor compatibility in the docstring? 🤔

The `read()` method's return type `Iterable[Record]` is well-aligned with cursor implementations throughout the codebase. Records are used extensively with cursors for state management and incremental syncs. However, to make the cursor-compatibility requirements more explicit, maybe we could add a note like:

```python
def read(self) -> Iterable[Record]:
    """
    Reads the data from the partition.

    Records emitted by this method should include any fields required by cursor implementations
    for proper state management. For cursor-based filtering, ensure records contain the cursor
    field at the top level of the record data.

    :return: An iterable of records.
    """
```

This would help developers understand the relationship between records and cursors, wdyt? 🙂
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```bash
#!/bin/bash
# Search for Record usage in cursor implementations
ast-grep --pattern 'class $name implements ConcurrentCursor {
  $$$
  def $method($args) {
    $$$
    Record($$$)
    $$$
  }
  $$$
}'

# Look for existing Record type hints
rg -A 2 'def.*->.*Record.*:'
```

Length of output: 5871

Script:

```bash
#!/bin/bash
# Search for cursor-related Record handling
rg -A 5 'class.*Cursor.*:'

# Look for Record type definition and its structure
ast-grep --pattern 'class Record:
  $$$'

# Search for cursor-related filtering of records
rg -A 3 'cursor.*record|record.*cursor' -i
```

Length of output: 104033

Script:

```bash
#!/bin/bash
# Look for Record class definition and its usage in cursor-related code
ast-grep --pattern 'class Record($args):
  $$$'

# Search for Record imports and their context
rg -A 3 'from.*import.*Record'

# Look for Record type hints in cursor-related files
rg 'Record.*:' --type python airbyte_cdk/sources/streams/concurrent/
```

Length of output: 40399
unit_tests/sources/streams/concurrent/test_partition_reader.py (3)
Line range hint `18-39`: Test cases look solid! 🎯

The test coverage is comprehensive, covering key scenarios:

- Empty partition handling
- Successful record processing
- Exception handling

The test data and assertions are clear and well-structured. Nice work!

Line range hint `41-85`: Helper methods are well-designed! 👍

The helper methods (`_a_partition`, `_read_with_exception`, `_consume_queue`) are clean, focused, and make the tests very readable. They handle both the happy path and error scenarios effectively.
`13-13`: Verify the Record class relocation impact

The Record class import has been moved from a partition-specific location to a more general types module. This looks like a good architectural decision for better code organization!

Let's verify that this change is consistent across the codebase:

✅ Verification successful

Import change is consistent across the codebase

The Record class has been successfully moved to `airbyte_cdk.sources.types`, and there are no remaining references to the old import path (`concurrent.partitions.record`). The new import is consistently used across test files and implementation code, confirming this was a complete refactoring. Nice work on maintaining consistency! 🎯
🏁 Scripts executed

The following scripts were executed for the analysis:

```bash
#!/bin/bash
# Description: Check for any remaining old-style Record imports that might have been missed
# and verify the new import is used consistently

echo "Checking for any remaining old Record imports..."
rg "from.*concurrent\.partitions\.record.*import.*Record"

echo "Verifying new Record import usage..."
rg "from.*sources\.types.*import.*Record"
```

Length of output: 6663
airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py (2)
Line range hint `45-45`: Verify observe() method behavior with new Record type.

The `observe()` method uses the newly imported Record type. Since you mentioned type errors in the PR comments, should we double-check if this method's implementation works correctly with the new Record type? wdyt? 🤔

✅ Verification successful

The `observe()` method implementation is consistent with the Record type usage across the codebase

Looking at the codebase, the `FileBasedFinalStateCursor.observe()` method's empty implementation (using `pass`) is intentional and follows the same pattern as other cursor implementations that don't need to track record-level state, such as:

- `FileBasedConcurrentCursor.observe()`
- `ResumableFullRefreshCursor.observe()`
- `SubstreamResumableFullRefreshCursor.observe()`

The empty implementation is valid since this cursor only needs to track the final state using `NO_CURSOR_STATE_KEY`, and doesn't require per-record observation.
🏁 Scripts executed

The following scripts were executed for the analysis:

```bash
#!/bin/bash
# Description: Check observe() method implementations and Record usage patterns

# Look for other observe() implementations with Record parameter
ast-grep --pattern 'def observe(self, record: Record):
  $$$
'

# Check for any test cases using observe() with Record
rg -A 5 'observe.*Record'
```

Length of output: 10218
`19-19`: LGTM! Let's verify type compatibility.

The move to import `Record` from `airbyte_cdk.sources.types` looks good and aligns with the standardization effort. Shall we verify the type compatibility? 🤔

✅ Verification successful

Import change looks good! Type compatibility verified ✅

The move to import `Record` from `airbyte_cdk.sources.types` is perfectly safe. The new `Record` class is a proper `Mapping[str, Any]` with the same interface, and it's being used consistently across the codebase. The `observe` method in your file uses it exactly as intended.
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```bash
#!/bin/bash
# Description: Verify Record type compatibility and usage

# Check if the new Record type has the same interface as the old one
# Look for Record type definition and its attributes
ast-grep --pattern 'class Record:
  $$$
'

# Check for any existing Record type usages that might conflict
rg -A 3 'from airbyte_cdk\.sources\.types import Record'
```

Length of output: 18325

Script:

```bash
#!/bin/bash
# Let's check the Record type definition in airbyte_cdk/sources/types.py
ast-grep --pattern 'class Record:
  $$$
' airbyte_cdk/sources/types.py

# Let's also check the old Record type definition for comparison
ast-grep --pattern 'class Record:
  $$$
' airbyte_cdk/sources/streams/concurrent/partitions/record.py
```

Length of output: 283

Script:

```bash
#!/bin/bash
# Let's try a different approach to find the Record type definition
rg -A 10 "^class Record" airbyte_cdk/sources/types.py

# Also check if there are any direct usages of Record in the file we're reviewing
rg -A 5 "Record" airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py
```

Length of output: 832
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
`9-9`: LGTM! Let's verify the import consistency across the codebase.

The move of `Record` to `airbyte_cdk.sources.types` makes sense, as it's a fundamental type.

Let's check if we've consistently updated all imports:

✅ Verification successful

Import consistency verified across the codebase

The search results confirm that all `Record` imports are consistently using the new path `airbyte_cdk.sources.types`. No instances of the old import path `concurrent.partitions.record` were found, indicating a complete migration.
🏁 Scripts executed

The following scripts were executed for the analysis:

```bash
#!/bin/bash
# Search for any remaining old-style Record imports
rg "from.*concurrent\.partitions\.record.*import.*Record"

# Search for new-style Record imports to confirm the change
rg "from.*sources\.types.*import.*Record"
```

Length of output: 6473
airbyte_cdk/sources/types.py (2)
`8-8`: LGTM! Good choice of JSON library.

Using orjson is a solid choice here: it's fast and handles Python types consistently, which is crucial for reliable hashing.

`152-153`: Verify hash consistency for complex types?

The hash implementation looks good for most cases! However, we might want to verify behavior with edge cases like:

- Floating-point numbers (JSON serialization might vary)
- Nested complex types
- Unicode strings

Would you like me to help create test cases for these scenarios?
✅ Verification successful

Let me analyze the usage of StreamSlice to understand the types of data being hashed, then check the actual implementations and data types being used.

Based on the code analysis, I can now provide a definitive response about the hash implementation.

Hash implementation is safe and consistent

The StreamSlice hash implementation is well-designed for all data types:

- It uses orjson.dumps, which provides consistent serialization across runs
- The class enforces immutability through `__setitem__` prevention
- The internal `_stream_slice` is a simple dict with partition and cursor_slice data
- All usage patterns in the codebase show simple scalar values (strings, integers, dates) in both partition and cursor_slice

The edge cases mentioned in the review comment are actually handled well:

- Floating-point numbers: orjson uses a stable float representation
- Complex types: the codebase only uses simple scalar values
- Unicode strings: orjson has full unicode support

Would you like me to add any test cases for specific scenarios you're concerned about? 😊
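A compact illustration of the hashing pattern described above; this is a sketch, not the CDK's exact code, and key sorting is included here to make the serialization independent of insertion order:

```python
from typing import Any, Mapping

import orjson


class HashableSliceSketch:
    def __init__(self, partition: Mapping[str, Any], cursor_slice: Mapping[str, Any]):
        self._stream_slice = dict(partition) | dict(cursor_slice)

    def __hash__(self) -> int:
        # Sorting keys makes the byte output deterministic regardless of
        # insertion order, so equal slices always hash equally.
        return hash(orjson.dumps(self._stream_slice, option=orjson.OPT_SORT_KEYS))
```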
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```bash
#!/bin/bash
# Search for existing usage of StreamSlice in collections to understand impact
rg -l "StreamSlice.*[set|dict|frozenset]" --type python

# Look for potential complex types in stream slices
ast-grep --pattern 'StreamSlice($$$)'
```

Length of output: 95140

Script:

```bash
#!/bin/bash
# Look for StreamSlice instantiations with actual data to understand types
rg -A 3 "StreamSlice\(" --type python | grep -v "test"

# Look for the StreamSlice class definition and its properties
ast-grep --pattern 'class StreamSlice:
  $$$
'
```

Length of output: 134

Script:

```bash
#!/bin/bash
# Look for StreamSlice class definition and its properties
cat airbyte_cdk/sources/types.py

# Look for actual usage of StreamSlice in non-test files
find . -type f -name "*.py" ! -path "*/test*" -exec grep -l "StreamSlice" {} \; | while read -r file; do
  echo "=== $file ==="
  grep -A 5 "StreamSlice(" "$file" 2>/dev/null
done
```

Length of output: 38999
airbyte_cdk/sources/declarative/extractors/record_selector.py (2)
`88-88`: Nice improvement to code clarity! 👍

The switch to keyword arguments in `Record` instantiation makes the code more explicit and maintainable. This change aligns well with the broader refactoring effort across the codebase.

`87-89`: Should we verify the Record creation type safety? 🤔

Given the type compatibility concerns mentioned in the PR discussion, would it be helpful to add runtime type checking or assertions to ensure that `data` and `associated_slice` meet the `Record` constructor's requirements? This could help catch any type mismatches early, wdyt?

Let's check the `Record` class usage:
✅ Verification successful

Let me search for the Record class definition to understand its type requirements.

No type safety concerns here! 🎯

Looking at the Record class definition, the code is already type-safe since:

- The `data` parameter accepts `Mapping[str, Any]`, which matches the normalized data type
- `stream_slice` is correctly passed as an optional `StreamSlice`

The implementation aligns perfectly with the type hints, so no additional runtime checks are needed.
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```bash
#!/bin/bash
# Search for Record class definition and its usages
echo "Finding Record class definition:"
rg -A 5 "class Record"

echo "\nFinding Record instantiations:"
ast-grep --pattern 'Record($$$)'
```

Length of output: 55228

Script:

```bash
#!/bin/bash
# Find Record class definition and its type hints
rg -A 10 "class Record\(" airbyte_cdk/sources/types.py
```

Length of output: 423
unit_tests/sources/declarative/extractors/test_record_selector.py (2)
`118-120`: Consider adding test cases for the new `should_be_synced` feature.

I notice we're updating the `Record` instantiation to use keyword arguments, which is great for clarity! However, given the PR introduces a new `should_be_synced` feature, should we add some test cases to verify its behavior with different cursor scenarios? wdyt? 🤔

Let me check if we have any test coverage for this feature:

Also, for consistency, should we update the `Record` instantiation in `test_schema_normalization` (line 239) to use keyword arguments as well? This would make the code more uniform across the test file.
Line range hint `11-82`: Verify test cases alignment with cursor changes.

Since we're removing low-code cursor usage, should we review whether these test cases still align with the new implementation? For example, the `test_with_extractor_and_filter` case uses `created_at` for filtering; is this still the expected pattern? 🤔

Let me check the cursor-related changes:

✅ Verification successful

Let me check the test cases and cursor-related changes in more detail, then look at the record selector implementation to understand the cursor usage, and finally the record filter implementation, since that's where the cursor-based filtering happens.

The test cases look good with the cursor changes! 🎯

After reviewing the implementation, I can see that the test cases in `test_record_selector.py` are still valid and align well with the cursor changes because:

- The test cases focus on record filtering behavior using `created_at`, which is still a valid pattern: the `RecordFilter` continues to support both template-based filtering (`{{ record['created_at'] > stream_state['created_at'] }}`) and cursor-based filtering through `ClientSideIncrementalRecordFilterDecorator`
- The test's `StreamSlice` usage with `cursor_slice` matches the current implementation's slice handling in both `RecordSelector` and `RecordFilter`
- The test coverage includes both filtered and unfiltered scenarios, which remain relevant regardless of low-code cursor changes
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```bash
#!/bin/bash
# Look for cursor-related changes in the codebase
rg -A 3 'cursor' 'airbyte_cdk/sources/'
```

Length of output: 134169

Script:

```bash
#!/bin/bash
# Look for cursor-related changes in the test file and related files
rg -A 3 'cursor' 'unit_tests/sources/declarative/extractors/test_record_selector.py'

# Look for related test files
fd test_record_selector.py
```

Length of output: 393

Script:

```bash
#!/bin/bash
# Look for the record selector implementation
ast-grep --pattern 'class RecordSelector {
  $$$
}'

# Also check for any recent changes to record selector
rg -A 3 'class RecordSelector' 'airbyte_cdk/sources/declarative/extractors/'
```

Length of output: 593

Script:

```bash
#!/bin/bash
# Look at the record selector implementation
cat airbyte_cdk/sources/declarative/extractors/record_selector.py
```

Length of output: 5808

Script:

```bash
#!/bin/bash
# Look at the record filter implementation
cat airbyte_cdk/sources/declarative/extractors/record_filter.py
```

Length of output: 3379
airbyte_cdk/sources/concurrent_source/concurrent_source.py (1)
`21-21`: Verify type compatibility with the new Record import

I notice we're changing the Record import from concurrent-specific to general types. This aligns with removing low-code cursor usage, but could we verify the type compatibility? Especially since there were concerns about `Mapping[str, Any]` vs `Record` type mismatches in the PR comments, wdyt?

✅ Verification successful

The Record import change looks safe to proceed with

Based on the verification, the Record class from `airbyte_cdk.sources.types` maintains the same interface as expected by the concurrent source implementation. The class is used primarily in type hints and instance checks, and the implementation in `concurrent_read_processor.py` shows standard record processing that's compatible with the general Record type. The change aligns well with removing low-code cursor usage without breaking type compatibility.
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```bash
#!/bin/bash
# Check if the new Record type maintains compatibility with existing usage
# Look for any type hints or assertions related to Record type

# Search for Record type hints
rg -t py "Record[^a-zA-Z]" -A 2 -B 2

# Search for isinstance checks with Record
ast-grep --pattern 'isinstance($_, Record)'

# Look for Record construction patterns
ast-grep --pattern 'Record($$$)'
```

Length of output: 101993

Script:

```bash
#!/bin/bash
# Let's check the Record class usage in concurrent_source.py and its dependencies
rg -t py "Record" airbyte_cdk/sources/concurrent_source/concurrent_source.py -A 2 -B 2

# Let's also check if there are any type hints or isinstance checks specifically in concurrent_source.py
ast-grep --pattern 'Record[^a-zA-Z]' airbyte_cdk/sources/concurrent_source/concurrent_source.py

# Check the definition of Record in both old and new locations
rg -t py "class Record" airbyte_cdk/sources/
```

Length of output: 1557

Script:

```bash
#!/bin/bash
# Let's check the Record class definition in both locations to compare their interfaces
rg -t py -B 5 -A 10 "class Record" airbyte_cdk/sources/types.py

# Also check if there are any Record-specific methods or type hints in concurrent_source
ast-grep --pattern 'def $_($_: Record)' airbyte_cdk/sources/concurrent_source/concurrent_source.py

# And let's see how Record is used in the queue handling
rg -t py "QueueItem = Union" airbyte_cdk/sources/concurrent_source/
```

Length of output: 705

Script:

```bash
#!/bin/bash
# Let's check how Record is used in type definitions and queue handling
rg -t py "Union.*Record.*]" airbyte_cdk/sources/concurrent_source/

# And check the implementation details around Record handling
rg -t py "def on_record" -A 5 airbyte_cdk/sources/concurrent_source/
```

Length of output: 880
unit_tests/sources/streams/concurrent/scenarios/incremental_scenarios.py (2)
19-20
: LGTM! Clean import addition.
The new StreamSlice import is well-placed and consistently used throughout the test scenarios.
34-38
: Question about the empty cursor_slice parameter
I notice we're consistently using cursor_slice={}
across all test scenarios. This seems intentional, but I'm curious about the reasoning. Would it make sense to add a comment explaining why we're using an empty cursor_slice here, or are there cases where we might want to test with non-empty cursor_slices? wdyt? 🤔
Let me check if there are any other test files using non-empty cursor_slices:
Also applies to: 74-80, 125-129, 165-169, 226-230, 266-270
✅ Verification successful
Let me check the test file to understand the context better:
Also, let me check if there's any documentation about StreamSlice:
Empty cursor_slice is intentional in this test scenario
Looking at the codebase, the empty cursor_slice={} is correct here because:
- This is a test for incremental scenarios without slice boundaries
- The cursor_slice is used for incremental sync metadata like pagination tokens or date ranges
- In these test cases, we're specifically testing partition-based slicing without any cursor-based slicing
- The test file follows the pattern where cursor_slice is only populated in tests that involve cursor-based incremental sync
The empty cursor_slice={} helps maintain a clear separation between partition-based slicing (used for parallelization) and cursor-based slicing (used for incremental sync) in the test scenarios.
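To make the separation concrete, here's a minimal sketch of the two roles (the keyword-only partition/cursor_slice names come from this diff; the specific keys are just illustrative):
from airbyte_cdk.sources.types import StreamSlice

# Partition-based slicing: parallelizes reads, carries no incremental metadata.
partition_only = StreamSlice(partition={"parent_id": "1"}, cursor_slice={})

# Cursor-based slicing: carries the incremental boundaries for the sync.
cursor_based = StreamSlice(
    partition={},
    cursor_slice={"start": "2024-01-01", "end": "2024-01-31"},
)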
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for StreamSlice usage with non-empty cursor_slice
rg -U "StreamSlice.*cursor_slice=\{[^}]+\}" unit_tests/
Length of output: 5190
Script:
#!/bin/bash
# Get the content of the test file
cat unit_tests/sources/streams/concurrent/scenarios/incremental_scenarios.py
Length of output: 11921
Script:
#!/bin/bash
# Search for StreamSlice class definition and documentation
ast-grep --pattern 'class StreamSlice' -A 10
Length of output: 46
Script:
#!/bin/bash
# Let's try to find the StreamSlice class definition with ripgrep
rg -U "class StreamSlice" -A 10
Length of output: 5580
Script:
#!/bin/bash
# Also search for any documentation or comments about cursor_slice
rg -U "cursor_slice.*\n.*\n.*\n" --type py
Length of output: 106881
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2)
Line range hint 1-254
: Overall implementation looks solid! 👍
The changes simplify the code while maintaining robust error handling and state management. The concurrent processing logic remains clear and well-documented.
20-20
: Verify Record import changes across the codebase
The move of Record
to airbyte_cdk.sources.types
looks like a good architectural decision! However, let's make sure all usages are updated consistently.
✅ Verification successful
Import changes look good across the codebase!
The search results show that:
- No instances of the old import pattern from concurrent.partitions.record import Record were found
- The new import from airbyte_cdk.sources.types import Record is consistently used across the codebase
- All relevant files (tests, core functionality, etc.) are using the new import pattern
This confirms that the architectural change has been properly implemented throughout the codebase.
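For quick reference, the migration this verifies boils down to:
# old location (no longer imported anywhere):
# from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
# new canonical location:
from airbyte_cdk.sources.types import Record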
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for any remaining old-style imports of Record
rg "from.*concurrent\.partitions\.record.*import.*Record"
# Search for new-style imports to confirm adoption
rg "from.*airbyte_cdk\.sources\.types.*import.*Record"
Length of output: 6407
airbyte_cdk/sources/file_based/stream/concurrent/adapters.py (2)
42-42
: LGTM! Clean import reorganization.
The import change aligns well with the PR's objective of removing low-code cursor usage. Nice work on maintaining consistency! 👍
250-250
: Nice improvement using named parameters! 🎯
The switch to named parameters makes the code more explicit and easier to maintain. Great attention to readability!
Also applies to: 268-268
unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py (1)
14-14
: LGTM! Clean import addition.
The StreamSlice import aligns well with the refactoring goals. Nice job on keeping the imports organized!
airbyte_cdk/sources/streams/concurrent/adapters.py (2)
40-40
: LGTM! Clean import reorganization.
Moving Record
to airbyte_cdk.sources.types
makes sense as it's a core type. This aligns well with the module organization principles.
297-301
: Consider performance impact of deep copying slices.
The deep copy of self._slice
ensures data isolation, which is great for safety! However, for large slices, this could potentially impact performance. Would you like to verify the performance impact with some benchmarks? We could add a size threshold above which we use a shallow copy instead, wdyt? 🤔
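For illustration, a hypothetical helper along those lines (the threshold value and the sys.getsizeof heuristic are assumptions, not part of this PR) could look like:
import copy
import sys
from typing import Any, Mapping, Optional

def copy_slice(
    slice_: Optional[Mapping[str, Any]], threshold_bytes: int = 4096
) -> Optional[Mapping[str, Any]]:
    if slice_ is None:
        return None
    # sys.getsizeof is a shallow estimate, but it is enough for a heuristic cutoff.
    if sys.getsizeof(slice_) > threshold_bytes:
        return dict(slice_)  # shallow copy for large slices
    return copy.deepcopy(slice_)  # full isolation for small slices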
Let's check if there are any particularly large slices in the codebase:
airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py (1)
328-331
: Should we verify that no code relies on partition information in cursor records?
The empty partition dictionary {}
in the new StreamSlice could potentially affect functionality if any code expects partition information in cursor records. Let's verify this assumption.
airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py (1)
343-346
: Let's verify the usage of this method across the codebase
Since this method is used in cursor-related operations, we should verify its usage to ensure consistency.
✅ Verification successful
The Record instantiation pattern is consistent across the codebase
The usage of Record
with StreamSlice
follows a consistent pattern throughout the codebase, particularly in cursor-related operations. The implementation in global_substream_cursor.py
matches the established pattern where:
- Empty partitions are used when converting cursor records
- The cursor_slice is preserved from the original record
- The pattern is used consistently in both PerPartitionCursor and GlobalSubstreamCursor
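Putting the pattern together, a sketch of the conversion (not the exact CDK source; the Record/StreamSlice keyword names are the ones shown in the diffs above, and access to cursor_slice on the slice is assumed):
from airbyte_cdk.sources.types import Record, StreamSlice

def convert_record_to_cursor_record(record: Record) -> Record:
    # Drop the partition; keep only the cursor_slice of the original record's slice.
    cursor_slice = record.associated_slice.cursor_slice if record.associated_slice else {}
    return Record(
        data=record.data,
        associated_slice=StreamSlice(partition={}, cursor_slice=cursor_slice),
    )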
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Find all usages of _convert_record_to_cursor_record and analyze the context
# to ensure it's being used correctly with the new Record instantiation pattern
# Search for method usage
echo "Searching for method usages..."
rg --type python "_convert_record_to_cursor_record" -B 2 -A 2
# Search for similar Record instantiation patterns to ensure consistency
echo "Checking for similar Record instantiation patterns..."
ast-grep --pattern 'Record(data=$_, associated_slice=StreamSlice(partition={}, cursor_slice=$_))'
Length of output: 380
Script:
#!/bin/bash
# Let's try with correct file type flag and more specific context
echo "Searching for method usages..."
rg -t py "_convert_record_to_cursor_record" -B 5 -A 5
# Let's also search for Record class usage patterns
echo "Searching for Record class usage patterns..."
rg -t py "Record\(.*associated_slice" -B 2 -A 2
# Let's look for StreamSlice usage patterns
echo "Searching for StreamSlice patterns..."
rg -t py "StreamSlice\(.*partition" -B 2 -A 2
Length of output: 95255
unit_tests/sources/file_based/stream/concurrent/test_adapters.py (2)
32-32
: LGTM! Clean import refactoring.
Moving Record
to airbyte_cdk.sources.types
makes sense as it's a core type used across different parts of the codebase.
79-84
: LGTM! Nice improvement to Record instantiation readability.
Using named parameters makes the test cases more explicit and maintainable. What do you think about adding a test case for handling empty or null data? It could help catch edge cases early.
Also applies to: 93-98
unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py (2)
14-14
: LGTM! Import change aligns with the refactoring
The import change from concurrent.partitions
to types
reflects the standardization of the Record
type. This is a good architectural decision as it centralizes the type definition.
141-163
: Verify concurrent execution behavior
The multiple partition scenarios look good, but let's verify the concurrent execution behavior is working as expected.
Also applies to: 192-214
✅ Verification successful
Based on the shell script results, the verification is complete. The codebase shows robust thread-safety mechanisms:
Thread safety looks solid! The concurrent implementation has proper synchronization
The concurrent stream implementation uses appropriate thread-safety mechanisms:
- ThreadPoolExecutor for managing the worker pool
- Thread-safe Queue for inter-thread communication
- RLock for thread-safe operations in critical sections
- Proper synchronization primitives in key components:
- ThreadPoolManager uses Lock for state management
- PrintBuffer uses RLock for thread-safe writes
- FileBasedConcurrentCursor uses RLock for state and pending files
- PartitionEnqueuer safely coordinates work distribution
The test scenarios you're adding look good as they exercise the concurrent execution paths. The implementation follows thread-safety best practices, making it suitable for concurrent execution.
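As a toy illustration of the RLock pattern these components rely on (simplified, not CDK code):
import threading
from typing import Any, Dict, Mapping

class ThreadSafeStateHolder:
    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._state: Dict[str, Any] = {}

    def update(self, key: str, value: Any) -> None:
        # RLock is reentrant, so nested calls on the same thread do not deadlock.
        with self._lock:
            self._state[key] = value

    def snapshot(self) -> Mapping[str, Any]:
        with self._lock:
            return dict(self._state)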
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for potential race conditions or synchronization issues in the concurrent stream implementation
# Look for synchronization primitives and thread safety mechanisms
rg -A 3 "Lock|Event|Condition|Semaphore|ThreadPoolExecutor"
# Check for thread-safe collections
rg -A 3 "Queue|deque|OrderedDict"
# Look for potential thread safety issues
ast-grep --pattern 'class $_ {
$$$
def $method($$$) {
$$$
self.$_ = $_
$$$
}
$$$
}'
Length of output: 33553
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
13-13
: LGTM! Clean import organization.
The addition of logging and import reorganization improves code organization.
Also applies to: 19-21
237-239
: LGTM! Consistent with type changes.
The use of partition.to_slice()
aligns well with the earlier type changes.
179-179
: Verify type compatibility across usages
The type change from Partition
to Mapping
makes the field more generic. Let's verify this doesn't break type safety in other parts of the codebase.
✅ Verification successful
Type change from Partition
to Mapping
looks safe, wdyt?
The change appears to be type-safe because:
- The Record class uses associated_slice, which is of type StreamSlice, and StreamSlice is a Mapping
- All usages of _most_recent_cursor_value_per_partition in the cursor.py file are accessing it through record.associated_slice, which is already a Mapping
- The change makes the type more generic, which is safer than being more specific in this case
- No direct dependencies on the Partition type are found in the cursor implementation
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for usages of _most_recent_cursor_value_per_partition to verify type compatibility
ast-grep --pattern 'self._most_recent_cursor_value_per_partition'
Length of output: 632
Script:
#!/bin/bash
# Let's check the implementation to understand how record.associated_slice is used
ast-grep --pattern 'class Record'
# Also check where Partition type is defined and used
rg "class Partition" -A 5
rg "Partition.*=.*typing" -A 2
Length of output: 5569
Script:
#!/bin/bash
# Let's check the Record class and its associated_slice property
rg "class Record" -A 10
rg "associated_slice" -B 2 -A 2
# Also check the type hints and imports in the cursor file
rg "from.*import.*Partition" airbyte_cdk/sources/streams/concurrent/cursor.py
cat airbyte_cdk/sources/streams/concurrent/cursor.py | head -n 20
Length of output: 40607
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
482-482
: LGTM! Clean and consistent Record instantiation
The addition of associated_slice
parameter maintains consistency with the other record creation path. Nice work! 👍
unit_tests/sources/streams/test_stream_read.py (1)
36-36
: LGTM! Good move on reorganizing the imports.
Moving the Record
import to airbyte_cdk.sources.types
makes the codebase more organized and maintainable.
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (2)
33-33
: LGTM! Clean import refactoring.
The Record class import has been moved from a specific implementation location to a more general types module, which suggests better code organization. This change aligns with the principle of keeping common types in a centralized location.
Let's verify this refactoring is consistent across the codebase:
✅ Verification successful
Import refactoring looks consistent across the codebase!
The verification shows that:
- No instances of the old import path remain in the codebase
- The new import
from airbyte_cdk.sources.types import Record
is consistently used across test files and implementation files
This confirms the refactoring was thorough and complete. Nice work on maintaining consistency! 🎯
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check if there are any remaining imports from the old location
rg "from.*airbyte_cdk\.sources\.streams\.concurrent\.partitions\.record import Record"
# Check if the new import is used consistently
rg "from.*airbyte_cdk\.sources\.types import Record"
Length of output: 5129
85-85
: LGTM! Mock setup enhancement.
The addition of stream_name
to the Record mock object in setUp ensures consistent test behavior. This change appears to be related to the Record class refactoring, making the mock more complete.
Let's verify if this attribute is required by checking its usage:
✅ Verification successful
The previous pattern might have failed due to syntax or language support. Let's try to find the Record class first and then look for stream_name usage in the codebase to understand the context better.
We need to verify if the Record class actually requires the stream_name attribute by looking at its implementation.
LGTM! The stream_name attribute is indeed required in Record class.
Looking at the Record class implementation in airbyte_cdk/sources/types.py
, we can see that stream_name
is a valid constructor parameter and is assigned to the instance in self.stream_name = stream_name
. The mock setup enhancement correctly adds this required attribute to maintain consistency with the actual Record class implementation.
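For reference, constructing a Record with the fields discussed here looks like this (keyword names as they appear in this PR's diffs; the values are made up):
from airbyte_cdk.sources.types import Record, StreamSlice

record = Record(
    data={"id": "abc", "updated_at": "2024-01-15"},
    associated_slice=StreamSlice(partition={}, cursor_slice={}),
    stream_name="users",
)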
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check how stream_name is used in Record class
ast-grep --pattern 'class Record {
$$$
stream_name
$$$
}'
Length of output: 65
Script:
#!/bin/bash
# Let's try a different approach to find Record class and stream_name usage
rg -l "class Record"
# Then search for stream_name usage
rg "stream_name" -A 2 -B 2
Length of output: 106143
Script:
#!/bin/bash
# Let's check the Record class implementation to understand the stream_name attribute
rg -g "*.py" "class Record" -A 10 -B 2
Length of output: 11418
unit_tests/sources/declarative/incremental/test_datetime_based_cursor.py (1)
711-711
: Nice improvement to code clarity! 🎉
The explicit use of keyword arguments in Record(data=record_data, associated_slice=stream_slice)
makes the code more readable and maintainable. This change aligns well with Python best practices.
unit_tests/sources/streams/concurrent/test_cursor.py (2)
61-64
: LGTM! Nice improvement in Record instantiation.
The addition of associated_slice
parameter makes the relationship between records and their slices explicit, which improves test clarity and maintainability.
108-111
: LGTM! Consistent usage of StreamSlice across test cases.
The standardization of slice creation using StreamSlice with both partition and cursor_slice improves test consistency and readability.
Also applies to: 130-133, 157-160, 180-185
Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (3)
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
216-221
: Consider adding error handling for missing associated_slice
The change from partition to associated_slice looks good, but what happens if associated_slice is None? Should we add a guard clause? wdyt?
 def observe(self, record: Record) -> None:
+    if record.associated_slice is None:
+        LOGGER.warning("Record has no associated_slice, skipping cursor update")
+        return
     most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
         record.associated_slice
     )
237-239
: Consider enhancing the error message
The change to use to_slice() is consistent, but should we make the error message more descriptive if the slice is missing? wdyt?
 most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
-    partition.to_slice()
+    partition.to_slice() if partition else None,  # Handle None partition case
 )
451-465
: Consider clarifying boundary check behavior
The implementation looks solid with good error handling and documentation! A few suggestions to make it even better:
- Should we make the boundary check configurable (inclusive/exclusive)? This could help prevent duplicate records in edge cases.
- The warning message is helpful, but should we add debug logging for the actual cursor values when a record is synced/not synced?
wdyt?
 def should_be_synced(self, record: Record) -> bool:
     """
     Determines if a record should be synced based on its cursor value.

     Args:
         record: The record to evaluate

     Returns:
         bool: True if the record's cursor value falls within the sync boundaries
+
+    Note:
+        Boundaries are inclusive by default. This means records with cursor values
+        exactly matching the boundaries will be synced.
     """
     try:
         record_cursor_value: CursorValueType = self._extract_cursor_value(record)
     except ValueError:
         LOGGER.warning(
             f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record. The incremental sync will assume it needs to be synced"
         )
         return True
+    should_sync = self.start <= record_cursor_value <= self._end_provider()
+    LOGGER.debug(f"Record cursor value: {record_cursor_value}, sync decision: {should_sync}")
+    return should_sync
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (3)
airbyte_cdk/sources/declarative/extractors/record_selector.py
(2 hunks)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
(1 hunks)airbyte_cdk/sources/streams/concurrent/cursor.py
(5 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- airbyte_cdk/sources/declarative/extractors/record_selector.py
- airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
🔇 Additional comments (2)
airbyte_cdk/sources/streams/concurrent/cursor.py (2)
13-13
: LGTM! Good refactoring of imports
Moving Record
to a centralized types module and adding proper logging setup improves code organization and observability.
Also applies to: 19-21
179-179
: Verify type compatibility with existing code
The type change from Partition
to Mapping
aligns with the refactoring goal. However, let's ensure all callers handle this type correctly.
✅ Verification successful
Let me analyze the usage of _most_recent_cursor_value_per_partition
to verify the type change impact.
Let me check how the associated_slice
is used in the Record class to ensure type compatibility.
Type change from Partition
to Mapping
is safe and compatible
The change is safe because:
- Record.associated_slice returns a StreamSlice type, which is used as the key in _most_recent_cursor_value_per_partition
- All usages of this field are through dictionary operations (get/set) which work with any hashable type
- The tests continue to use partition.to_slice(), which returns a Mapping type
The type change aligns with the actual usage pattern in the codebase and doesn't impact functionality. Nice cleanup! 👍
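A minimal sketch of the lookup pattern this verification describes (assuming StreamSlice is hashable, which its use as a dict key in cursor.py implies):
from typing import Any, Dict, Mapping, Optional

def most_recent_value(
    cache: Dict[Mapping[str, Any], Any], record_slice: Mapping[str, Any]
) -> Optional[Any]:
    # dict.get only requires a hashable key, so narrowing the annotation from
    # Partition to Mapping does not change runtime behavior.
    return cache.get(record_slice)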
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for usage of _most_recent_cursor_value_per_partition
rg "_most_recent_cursor_value_per_partition" -t py
# Search for type hints using Partition
rg "Partition[^:]" -t py
Length of output: 77695
Script:
#!/bin/bash
# Search for the implementation of ConcurrentCursor class to understand how the field is used
ast-grep --pattern $'class ConcurrentCursor {
$$$
_most_recent_cursor_value_per_partition$$$
$$$
}'
# Search for any direct access to this field
rg "_most_recent_cursor_value_per_partition\[" -t py -A 2 -B 2
Length of output: 1968
Script:
#!/bin/bash
# Search for Record class definition and its associated_slice property
ast-grep --pattern $'class Record {
$$$
associated_slice$$$
$$$
}'
# Search for Record class usage and associated_slice access
rg "associated_slice" -t py -A 2 -B 2
Length of output: 29464
Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
Just a couple more comments. This is clean. I think we are hitting the last mile on this one. Thanks Artem for tackling this!
airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py
Outdated
Signed-off-by: Artem Inzhyyants <[email protected]>
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
airbyte_cdk/sources/declarative/extractors/record_filter.py (1)
80-83
: Consider adding type safety checks for cursor access?Given the concerns raised in PR comments about type compatibility, would it make sense to add a type guard here? Something like:
if (self._substream_cursor or self._date_time_based_cursor).should_be_synced( + record if isinstance(record, Record) else Record(data=record, associated_slice=stream_slice) - Record(data=record, associated_slice=stream_slice) )This would prevent unnecessary Record wrapping and handle cases where a Record instance is already passed, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/extractors/record_filter.py
(2 hunks)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/extractors/record_filter.py (3)
13-13
: Consider organizing imports alphabetically?
Previous review raised this point about import order. Just keeping it here for consistency, wdyt?
82-82
: Document parameter omission rationale?
Previous review asked about documenting why it's okay to omit parameters like stream_name
. Would you mind adding a brief comment explaining this? It would help future maintainers understand the design decision.
80-83
: LGTM! Clean refactoring of cursor logic.
The changes nicely simplify the filtering logic by delegating to cursor implementations. The removal of datetime-specific logic makes the code more maintainable and focused.
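For readers skimming the diff, the delegation boils down to something like this sketch (a free-function restatement of the filter method; the cursor attribute names appear in this thread):
from airbyte_cdk.sources.types import Record

def filter_with_cursor(records, stream_slice, substream_cursor, date_time_based_cursor):
    # Delegate the sync decision to whichever cursor is configured.
    cursor = substream_cursor or date_time_based_cursor
    yield from (
        record
        for record in records
        if cursor.should_be_synced(Record(data=record, associated_slice=stream_slice))
    )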
…code-cursor-usage # Conflicts: # airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py
Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (4)
airbyte_cdk/sources/file_based/stream/concurrent/adapters.py (1)
251-251
: The Record instantiation changes look good! Consider adding parameter validation?
The addition of stream_name and is_file_transfer_message parameters provides better context for the records. However, what do you think about adding some validation for these new parameters? For example, ensuring stream_name is not empty? wdyt?
 def read(self) -> Iterable[Record]:
+    if not self.stream_name():
+        raise ValueError("stream_name cannot be empty")
     try:
         for record_data in self._stream.read_records(
Also applies to: 269-271
airbyte_cdk/sources/streams/concurrent/cursor.py (2)
178-178
: Consider documenting the type change rationale.
The type change from Partition to StreamSlice for _most_recent_cursor_value_per_partition is a significant change. Would you consider adding a comment explaining why this change was made? This could help future maintainers understand the design decision, wdyt?
450-464
: The implementation of should_be_synced looks good, but consider a few enhancements.
The error handling aligns with the discussion in past review comments about proceeding with True when the cursor field is missing. However, a few suggestions:
- The docstring could be more detailed about the behavior when the cursor field is missing
- Consider adding debug-level logging for when records are skipped
- The type ignore comment could be more specific about why it's needed
What do you think about this enhancement?
 def should_be_synced(self, record: Record) -> bool:
     """
     Determines if a record should be synced based on its cursor value.
+
+    If the cursor field is missing from the record, the method will log a warning
+    and return True to ensure no records are accidentally skipped.
+
     :param record: The record to evaluate
-    :return: True if the record's cursor value falls within the sync boundaries
     """
     try:
-        record_cursor_value: CursorValueType = self._extract_cursor_value(record)  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
+        # Type ignore: cursor_field is converted to an InterpolatedString in __post_init__
+        record_cursor_value: CursorValueType = self._extract_cursor_value(record)  # type: ignore[assignment]
     except ValueError:
         LOGGER.warning(
             f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record. The incremental sync will assume it needs to be synced"
         )
         return True
-    return self.start <= record_cursor_value <= self._end_provider()
+    should_sync = self.start <= record_cursor_value <= self._end_provider()
+    if not should_sync:
+        LOGGER.debug(f"Skipping record with cursor value {record_cursor_value} as it's outside the sync range [{self.start}, {self._end_provider()}]")
+    return should_sync
1576-1638
: Great test coverage! Consider adding more edge cases?
The parametrized test thoroughly covers the main scenarios. Would you consider adding a few more test cases to make it even more robust? For example:
- Edge case where cursor value equals the boundary values
- Different data types for cursor fields (strings, dates, etc.)
What do you think?
Here's a suggestion for additional test cases:
[
    Record(
        data={_A_CURSOR_FIELD_KEY: 0},  # Equal to lower boundary
        associated_slice=_partition(
            {_LOWER_SLICE_BOUNDARY_FIELD: 0, _UPPER_SLICE_BOUNDARY_FIELD: 10}
        ).to_slice(),
    ),
    True,
],
[
    Record(
        data={_A_CURSOR_FIELD_KEY: "2024-01-01"},  # String date
        associated_slice=_partition(
            {_LOWER_SLICE_BOUNDARY_FIELD: "2024-01-01", _UPPER_SLICE_BOUNDARY_FIELD: "2024-01-02"}
        ).to_slice(),
    ),
    True,
]
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (26)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
(2 hunks)airbyte_cdk/sources/concurrent_source/concurrent_source.py
(1 hunks)airbyte_cdk/sources/declarative/extractors/record_filter.py
(2 hunks)airbyte_cdk/sources/declarative/extractors/record_selector.py
(2 hunks)airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py
(1 hunks)airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py
(3 hunks)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
(1 hunks)airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
(2 hunks)airbyte_cdk/sources/file_based/stream/concurrent/adapters.py
(3 hunks)airbyte_cdk/sources/streams/concurrent/adapters.py
(2 hunks)airbyte_cdk/sources/streams/concurrent/cursor.py
(5 hunks)airbyte_cdk/sources/types.py
(3 hunks)unit_tests/sources/declarative/extractors/test_record_filter.py
(2 hunks)unit_tests/sources/declarative/extractors/test_record_selector.py
(1 hunks)unit_tests/sources/declarative/incremental/test_datetime_based_cursor.py
(1 hunks)unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py
(1 hunks)unit_tests/sources/file_based/stream/concurrent/test_adapters.py
(5 hunks)unit_tests/sources/streams/concurrent/scenarios/incremental_scenarios.py
(7 hunks)unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py
(3 hunks)unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py
(8 hunks)unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_source_builder.py
(1 hunks)unit_tests/sources/streams/concurrent/test_adapters.py
(1 hunks)unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
(2 hunks)unit_tests/sources/streams/concurrent/test_cursor.py
(24 hunks)unit_tests/sources/streams/concurrent/test_partition_reader.py
(1 hunks)unit_tests/sources/streams/test_stream_read.py
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (22)
- airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
- airbyte_cdk/sources/concurrent_source/concurrent_source.py
- airbyte_cdk/sources/declarative/extractors/record_filter.py
- airbyte_cdk/sources/declarative/extractors/record_selector.py
- airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py
- airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
- airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
- airbyte_cdk/sources/streams/concurrent/adapters.py
- airbyte_cdk/sources/types.py
- unit_tests/sources/declarative/extractors/test_record_filter.py
- unit_tests/sources/declarative/extractors/test_record_selector.py
- unit_tests/sources/declarative/incremental/test_datetime_based_cursor.py
- unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py
- unit_tests/sources/file_based/stream/concurrent/test_adapters.py
- unit_tests/sources/streams/concurrent/scenarios/incremental_scenarios.py
- unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py
- unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py
- unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_source_builder.py
- unit_tests/sources/streams/concurrent/test_adapters.py
- unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
- unit_tests/sources/streams/concurrent/test_partition_reader.py
- unit_tests/sources/streams/test_stream_read.py
🔇 Additional comments (7)
airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py (1)
197-197
: Nice improvement to cursor selection logic! 👍
The change to use _get_active_cursor()
aligns perfectly with the class's single-cursor-at-a-time design and prevents potential exceptions from the previous OR-based implementation. Much cleaner approach, wdyt?
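A simplified sketch of the difference (method and attribute names from this thread; the surrounding class is reduced to the relevant parts):
class PerPartitionWithGlobalCursorSketch:
    def __init__(self, per_partition_cursor, global_cursor, use_global: bool) -> None:
        self._per_partition_cursor = per_partition_cursor
        self._global_cursor = global_cursor
        self._use_global = use_global

    def _get_active_cursor(self):
        # Exactly one cursor is active at any time.
        return self._global_cursor if self._use_global else self._per_partition_cursor

    def should_be_synced(self, record) -> bool:
        # The old OR-based style could pick the wrong cursor, or raise, when the
        # active cursor was falsy; explicit selection avoids both issues.
        return self._get_active_cursor().should_be_synced(record)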
airbyte_cdk/sources/file_based/stream/concurrent/adapters.py (2)
45-45
: LGTM! Import change aligns with standardization efforts.
The change to import Record from airbyte_cdk.sources.types
aligns with the broader effort to standardize Record usage across the codebase.
251-251
: Let's verify the impact of these Record parameter changes.
These changes to Record instantiation might affect downstream consumers. Should we verify that all consumers handle these new parameters correctly?
Also applies to: 269-271
✅ Verification successful
Let me analyze the Record instantiation patterns to verify if there are any potential issues. I'll search for any usage of the parameters we're concerned about.
No changes needed - Record parameters are handled consistently
Based on the analysis of Record usage in the codebase:
- The Record class definition shows stream_name is a supported parameter
- All Record instantiations across the codebase consistently handle the stream_name parameter
- The changes in the file are consistent with the existing Record usage patterns
- No downstream consumers are impacted as they access Record attributes through well-defined interfaces
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for Record usage patterns in the codebase
# Look for places where Record is instantiated or consumed
# Find Record instantiations
echo "Checking Record instantiations:"
rg "Record\(" -A 2
# Find Record parameter access
echo "\nChecking Record parameter access:"
ast-grep --pattern 'record.$stream_name'
ast-grep --pattern 'record.$is_file_transfer_message'
Length of output: 46815
Script:
#!/bin/bash
# Let's check the Record class definition and its usage in the codebase
echo "Checking Record class definition:"
rg -A 10 "^class Record" airbyte_cdk/sources/types.py
echo "\nChecking Record parameter access patterns:"
rg "record\." -A 2
Length of output: 56581
airbyte_cdk/sources/streams/concurrent/cursor.py (2)
6-6
: LGTM! Clean import reorganization.
The addition of logging and the reorganization of imports improves the code structure. The Record
import change aligns with the type system better.
Also applies to: 18-20
236-238
: LGTM! Consistent slice handling.
The change to use partition.to_slice()
aligns with the new approach of using StreamSlice throughout the codebase.
unit_tests/sources/streams/concurrent/test_cursor.py (2)
32-32
: LGTM! Clean refactor of imports and helper function.
The changes align well with the PR's objective of removing low-code cursor usage. The _record helper function update properly handles the associated_slice parameter.
Also applies to: 60-63
107-110
: LGTM! Consistent migration to StreamSlice.
The test cases have been systematically updated to use StreamSlice objects instead of raw dictionaries, which improves type safety and maintainability.
Also applies to: 129-132, 156-159, 179-184, 234-236, 750-753, 777-780, 805-810, 823-828, 841-844, 847-850, 914-917, 1178-1184, 1275-1281, 1364-1370, 1374-1380, 1486-1492, 1496-1502, 1506-1512
Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (10)
unit_tests/sources/file_based/stream/concurrent/test_adapters.py (2)
80-85
: Consider reducing test data duplication?
The Record instantiations look much clearer with named parameters! 👍 To make it even better, what do you think about extracting the common test data into a helper function or fixture? Something like this maybe? wdyt?
def create_test_record(data_value, stream_name=_STREAM_NAME):
    return Record(
        data={"data": data_value},
        stream_name=stream_name,
    )
Also applies to: 94-99
297-304
: Consider aligning record creation pattern?
The list comprehension is neat! 🎯 Would it make sense to align this with the pattern used in other tests for consistency? Maybe using the same helper function suggested above, with an optional associated_slice parameter? wdyt?
unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py (2)
363-370
: Consider using a helper function for Record creation, wdyt?
The Record instantiation pattern is repeated multiple times with similar parameters. To reduce duplication and improve maintainability, we could extract this into a helper function:
def create_test_record(cursor_value: str, slice: StreamSlice, stream_name: str) -> Record:
    return Record(
        data={"a record key": "a record value", CURSOR_FIELD: cursor_value},
        associated_slice=slice,
        stream_name=stream_name
    )
Then usage would be simpler:
records_list = [
    [
        create_test_record("2022-01-15", partition_slices[0], stream_name),
        create_test_record("2022-01-16", partition_slices[0], stream_name),
    ],
    # ...
]
Also applies to: 375-377, 382-384, 391-393
Line range hint
477-566
: Consider parameterizing the test data, what do you think?
The test data in test_perpartition_with_fallback contains a lot of repeated Record creation with different dates. We could make this more maintainable by using a data-driven approach:
test_data = [
    ("2022-01-15", 0),
    ("2022-01-16", 0),
    ("2022-02-15", 0),
    # ... more date and slice index pairs
]
records_list = [
    [create_test_record(date, partition_slices[slice_idx], stream_name)]
    for date, slice_idx in test_data
]
This would make it easier to modify test cases and see the test data pattern at a glance.
unit_tests/sources/declarative/incremental/test_per_partition_cursor.py (1)
280-286
: LGTM! Consider extracting the test stream name to a constant?
The test correctly verifies the delegation to the underlying cursor with the new stream_name parameter. Since "test_stream" is used multiple times, wdyt about extracting it to a constant at the module level for better maintainability?
+TEST_STREAM_NAME = "test_stream"
+
 def test_when_should_be_synced_then_delegate_to_underlying_cursor(...):
     cursor.should_be_synced(
-        Record(data={}, associated_slice=first_slice, stream_name="test_stream")
+        Record(data={}, associated_slice=first_slice, stream_name=TEST_STREAM_NAME)
     )
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (2)
165-174
: Consider extracting stream_name as a constant, wdyt?
The stream_name "stream_name" is used in multiple test records. Since this appears to be a test fixture value, we could improve maintainability by extracting it as a constant at the module level.
+STREAM_NAME = "stream_name"
+
 def test_simple_retriever_resumable_full_refresh_cursor_page_increment(...):
-    stream_name = "stream_name"
     expected_records = [
-        Record(data={"id": "abc"}, associated_slice=None, stream_name=stream_name),
+        Record(data={"id": "abc"}, associated_slice=None, stream_name=STREAM_NAME),
         # ... similar changes for other records
     ]
267-274
: Consider using a more descriptive stream name for this test case, wdyt?
The test is using "users" as the stream name which is good as it's descriptive, but we could make it even more specific to match the test's context about astronauts.
-    Record(data={"name": "ed_baldwin"}, associated_slice=None, stream_name="users"),
+    Record(data={"name": "ed_baldwin"}, associated_slice=None, stream_name="astronauts"),
     # ... similar changes for other records
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (2)
131-131
: Consider using a constant for stream name in MockStream.
The stream name is derived from self.name, which is good. However, to make the tests more maintainable, would you consider defining a constant for the stream name at the class level? This would make it easier to update if needed, wdyt?
 class MockStream(DeclarativeStream):
+    DEFAULT_STREAM_NAME = "mock_stream"
     def __init__(self, slices, records, name, cursor_field="", cursor=None):
         self.config = {}
859-892
: Consider parameterizing the stream name in test cases.
The stream name is hardcoded as "test_stream". To make the test more flexible and reusable, would you consider parameterizing it? This would allow testing different stream name scenarios, wdyt?
 @pytest.mark.parametrize(
-    "use_incremental_dependency",
+    "use_incremental_dependency,stream_name",
     [
-        pytest.param(False, id="test_resumable_full_refresh_stream_without_parent_checkpoint"),
-        pytest.param(True, id="test_resumable_full_refresh_stream_with_use_incremental_dependency_for_parent_checkpoint"),
+        pytest.param(False, "test_stream", id="test_resumable_full_refresh_stream_without_parent_checkpoint"),
+        pytest.param(True, "custom_stream", id="test_resumable_full_refresh_stream_with_use_incremental_dependency_for_parent_checkpoint"),
     ],
 )
-def test_substream_using_resumable_full_refresh_parent_stream(use_incremental_dependency):
+def test_substream_using_resumable_full_refresh_parent_stream(use_incremental_dependency, stream_name):
Line range hint
1784-1815
: Consider adding a docstring for the name parameter?The addition of the
name
parameter improves component identification. Would you consider adding a docstring to explain its purpose and usage? This would help other developers understand when and how to use it, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (16)
airbyte_cdk/sources/declarative/extractors/record_filter.py
(2 hunks)airbyte_cdk/sources/declarative/extractors/record_selector.py
(4 hunks)airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py
(1 hunks)airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py
(1 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(4 hunks)airbyte_cdk/sources/types.py
(3 hunks)unit_tests/sources/declarative/extractors/test_record_filter.py
(2 hunks)unit_tests/sources/declarative/extractors/test_record_selector.py
(1 hunks)unit_tests/sources/declarative/incremental/test_datetime_based_cursor.py
(3 hunks)unit_tests/sources/declarative/incremental/test_per_partition_cursor.py
(2 hunks)unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py
(5 hunks)unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
(1 hunks)unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py
(3 hunks)unit_tests/sources/declarative/retrievers/test_simple_retriever.py
(4 hunks)unit_tests/sources/file_based/stream/concurrent/test_adapters.py
(5 hunks)unit_tests/sources/streams/concurrent/test_cursor.py
(18 hunks)
🚧 Files skipped from review as they are similar to previous changes (9)
- airbyte_cdk/sources/declarative/extractors/record_filter.py
- airbyte_cdk/sources/declarative/extractors/record_selector.py
- airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py
- airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py
- airbyte_cdk/sources/types.py
- unit_tests/sources/declarative/extractors/test_record_filter.py
- unit_tests/sources/declarative/extractors/test_record_selector.py
- unit_tests/sources/declarative/incremental/test_datetime_based_cursor.py
- unit_tests/sources/streams/concurrent/test_cursor.py
🔇 Additional comments (11)
unit_tests/sources/file_based/stream/concurrent/test_adapters.py (2)
34-34
: LGTM! Clean import change
The move from concurrent.partitions.record
to sources.types
aligns well with the goal of removing low-code cursor usage. Nice cleanup! 🎯
314-321
: Verify associated_slice type consistency
I noticed that associated_slice
is a string "stream"
here but a partition
object in test_full_refresh
. Is this intentional? Should we maintain consistency between these test methods? 🤔
✅ Verification successful
Let me search for Record class definition and its usages to understand the expected type for associated_slice parameter.
Using string literal for associated_slice is valid here
Looking at the codebase, associated_slice
accepts various types including None
, string literals, and partition objects. In test files, we see both patterns being used based on the testing context - partition objects for partition-specific tests and simpler types (strings/None) for basic stream tests. Since this is testing basic stream functionality, using "stream"
is perfectly fine! 🎯
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for other usages of associated_slice to verify the expected type
rg -A 2 "associated_slice" --type python
Length of output: 71
Script:
#!/bin/bash
# Search for Record class definition and usages
ast-grep --pattern 'class Record'
# Search for associated_slice in test files
rg "associated_slice" unit_tests/
Length of output: 10478
unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py (3)
336-341
: Nice refactoring with the stream_name
variable! 👍
The introduction of the stream_name
variable improves maintainability by reducing magic strings. This makes it easier to update the stream name if needed and reduces the chance of typos.
Line range hint 573-586
: Good consistency in using the stream_name
variable! 🎯
The stream descriptor is now using the stream_name
variable consistently, which aligns with the refactoring pattern seen throughout the file.
Line range hint 336-720
: Verify test coverage for edge cases
The tests cover the main scenarios well, but we might want to verify a few edge cases:
- What happens when a partition has no records?
- What happens when cursor values are not in chronological order?
- What happens when cursor values are invalid dates?
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (2)
368-369
: LGTM! The stream name is appropriately descriptive.
The use of "test_stream" as the stream name is clear and appropriate for this test case.
219-219
: Verify stream name consistency with retriever name, wdyt?
The retriever is initialized with stream_name
, but the test records use different stream names. Should we ensure they match for consistency?
✅ Verification successful
Looking good! The stream names are consistent within their test contexts
From the results, I can see that the stream names are being used consistently within their respective test contexts. In the test test_simple_retriever_resumable_full_refresh_cursor_page_increment
, the stream_name
variable is defined at the start and used consistently throughout both the retriever initialization and the test records. Other tests use different stream names like "users" and "test_stream" but they maintain their own consistency within their test contexts.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for consistency between retriever names and record stream names
# Look for patterns where retriever name and record stream name might mismatch
rg -A 5 'name="stream_name".*\n.*stream_name=' unit_tests/sources/declarative/retrievers/test_simple_retriever.py
Length of output: 307
Script:
#!/bin/bash
# Let's search for the test context around the stream_name usage
rg -B 5 -A 10 'name=stream_name' unit_tests/sources/declarative/retrievers/test_simple_retriever.py
# Also search for any stream_name assignments in test records
rg 'stream_name.*=' unit_tests/sources/declarative/retrievers/test_simple_retriever.py
Length of output: 3378
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (1)
995-1026
: LGTM! Test coverage looks good.
The Record instantiations consistently include the stream_name parameter, which is good practice. The test cases cover both the parent and child stream scenarios effectively.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
Line range hint 1886-1891
: LGTM! Consistent parameter propagation
The name parameter is correctly propagated to the record selector creation, maintaining consistency with the method signature changes.
Line range hint 2041-2048
: LGTM! Consistent constructor usage
The name parameter is properly passed to the RecordSelector constructor, completing the chain of identification propagation.
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
1312-1312
: Question about adding the name
parameter
I noticed that you've added the name="test_stream"
argument to the create_component
call for RecordSelectorModel
. Is this consistent with how create_component
is used elsewhere, and does it expect a name
parameter for this model type? Just wanted to confirm to ensure consistency across the codebase. Wdyt?
To help verify, you can run the following script to check for other instances where create_component
is called with the name
parameter for RecordSelectorModel
:
✅ Verification successful
The previous ast-grep pattern might have been too specific. Let's try with ripgrep to get a broader view of how RecordSelectorModel is used in the codebase.
The name
parameter is correctly used here
Looking at the implementation in airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
, I can see that create_record_selector
method explicitly requires a name
parameter in its signature. The test is following the expected interface correctly.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Find all calls to create_component with 'name' parameter when model_type is RecordSelectorModel
# Search for create_component calls with model_type=RecordSelectorModel and a name parameter
ast-grep --pattern $'create_component(\n model_type=RecordSelectorModel,\n name: $_,\n $$$\n)'
Length of output: 94
Script:
#!/bin/bash
# Let's try a different pattern to find create_component calls with RecordSelectorModel
rg -A 5 "create_component.*RecordSelectorModel"
# Also search for any usage of RecordSelectorModel to understand the context better
rg "RecordSelectorModel" -A 3 -B 3
Length of output: 4230
Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
LGTM! In terms of new version, can we confirm that the changes to Record
will not be breaking for the sources? If not, I'm all good to merge this
What
Resolve https://github.com/airbytehq/airbyte-internal-issues/issues/10568
How
should_be_synced
Review guide
airbyte_cdk/sources/streams/concurrent/cursor.py
User Impact
Can this PR be safely reverted and rolled back?
Summary by CodeRabbit
Release Notes
New Features
- stream_name parameter in various components for enhanced traceability.
Bug Fixes
Tests
- StreamSlice representation.
- Record instantiation.
- DatetimeBasedCursor functionality across various scenarios.
- Record class.