fix: (low code) run state migrations for concurrent streams #316
/autofix
📝 Walkthrough
The changes introduce a new static method …
Changes
Actionable comments posted: 2
🧹 Nitpick comments (2)
unit_tests/sources/declarative/custom_state_migration.py (2)
13-25: Consider adding docstrings to improve code documentation. The class and its methods would benefit from docstrings explaining their purpose and behavior. For example:
```diff
 class CustomStateMigration(StateMigration):
+    """Handles state migration for declarative streams with partitioned states.
+
+    This class evaluates cursor fields against the provided configuration and
+    creates a new migrated state with specific partition types.
+    """
     declarative_stream: DeclarativeStream
     config: Config
```
26-28: Consider making should_migrate more selective. The method always returns True, which means migration will be attempted for all states. Should we add some validation to determine when migration is actually needed? wdyt?
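A minimal sketch of a more selective should_migrate, assuming the legacy state is a flat mapping keyed by the cursor field (for example updated_at) and that the migrated state is the per-partition shape with a "states" list. SelectiveStateMigration and its constructor arguments are hypothetical stand-ins for the test fixture, not the CDK's StateMigration API.

```python
from typing import Any, List, Mapping


class SelectiveStateMigration:
    """Hypothetical stand-in for the test's CustomStateMigration.

    Migrates a legacy flat state such as {"updated_at": "..."} into the
    per-partition format and leaves every other shape alone.
    """

    def __init__(self, cursor_field: str, partition_types: List[str]) -> None:
        self.cursor_field = cursor_field
        self.partition_types = partition_types

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        # Empty or already per-partition state: nothing to migrate.
        if not stream_state or "states" in stream_state:
            return False
        # Only legacy state that carries the cursor value is migrated.
        return self.cursor_field in stream_state

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        cursor_value = stream_state[self.cursor_field]
        # Fan the single legacy cursor out to one entry per partition type.
        return {
            "states": [
                {"partition": {"type": partition_type}, "cursor": {self.cursor_field: cursor_value}}
                for partition_type in self.partition_types
            ]
        }
```

With this guard, feeding an already-migrated state back in makes should_migrate return False, so re-running the migration is a no-op.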
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
- unit_tests/sources/declarative/custom_state_migration.py (1 hunk)
- unit_tests/sources/declarative/test_concurrent_declarative_source.py (1 hunk)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
[error] 229-229: Incompatible types in assignment (expression has type "Mapping[str, Any]", variable has type "MutableMapping[str, Any]")
[error] 339-339: Incompatible types in assignment (expression has type "Mapping[str, Any]", variable has type "MutableMapping[str, Any]")
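Both linter errors come from assigning the Mapping returned by a migration to a variable annotated as MutableMapping. A minimal sketch of the usual fix, wrapping the result in dict() (the approach a later review in this thread confirms), which satisfies the annotation and gives the caller its own mutable copy. migrate_state and MigrateFn are hypothetical names used only for illustration.

```python
from typing import Any, Callable, Mapping, MutableMapping

# Hypothetical alias for a migration step: takes a read-only Mapping, returns one.
MigrateFn = Callable[[Mapping[str, Any]], Mapping[str, Any]]


def migrate_state(stream_state: MutableMapping[str, Any], migrate: MigrateFn) -> MutableMapping[str, Any]:
    # `stream_state = migrate(stream_state)` is what mypy rejects: a Mapping
    # cannot be assigned to a variable declared as MutableMapping.
    # Wrapping the result in dict() yields a MutableMapping and a fresh copy.
    stream_state = dict(migrate(stream_state))
    return stream_state
```

Because dict() copies, later mutations by the caller do not leak back into whatever object the migration returned.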
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Validate PR title
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (2)
unit_tests/sources/declarative/custom_state_migration.py (1)
29-47: Verify the migrated state structure. The migration creates a fixed state with two partitions of type "type_1" and "type_2". Let's ensure this structure aligns with the expected state format.
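One way to act on this check is a small structural validator in the test. A sketch, assuming the per-partition format the fixture produces (a non-empty "states" list whose entries each hold a "partition" mapping and a "cursor" mapping containing the cursor field); is_per_partition_state is a hypothetical helper, not part of the CDK.

```python
from collections.abc import Mapping
from typing import Any


def is_per_partition_state(state: Mapping[str, Any], cursor_field: str) -> bool:
    """Hypothetical check for {"states": [{"partition": {...}, "cursor": {...}}, ...]}."""
    states = state.get("states")
    if not isinstance(states, list) or not states:
        return False
    for entry in states:
        if not isinstance(entry, Mapping):
            return False
        partition, cursor = entry.get("partition"), entry.get("cursor")
        # Every entry needs a partition mapping and a cursor holding the cursor field.
        if not isinstance(partition, Mapping) or not isinstance(cursor, Mapping):
            return False
        if cursor_field not in cursor:
            return False
    return True
```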
unit_tests/sources/declarative/test_concurrent_declarative_source.py (1)
1234-1461: LGTM! Comprehensive test coverage for state migration. The test thoroughly validates:
- State migration configuration
- HTTP request mocking
- State transformation verification
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Outdated
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Outdated
I think there is something missing in my explanation during our sync. This PR updates the state that is used on the retriever (which seems to cover this hack) but not the one on the cursor. I'm not even sure the hack needs to be maintained at that point. @brianjlai what do you think?
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Outdated
unit_tests/sources/declarative/test_concurrent_declarative_source.py
Outdated
@maxi297 Yeah, that sounds correct. I think the missing piece here is that we also need to run a state migration after we read in state from … That will also require that we pass the migrations in to the model factory. Or, an idea that I kind of like: when we parse the manifest into the DatetimeBasedCursorModel, we also append the state migrations to the pydantic model, so we have it available to run from …
Edit: actually, we don't want to modify the schema to change the model, so I'm being dumb. Maybe we just pass it in as a parameter to …
/autofix
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
6-6: Consider organizing imports alphabetically? The pipeline indicates that the import block is unsorted. Would you like me to help organize them alphabetically, wdyt?
```diff
-from typing import Any, Generic, Iterator, List, Mapping, Optional, Tuple, MutableMapping
+from typing import (
+    Any,
+    Generic,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Optional,
+    Tuple,
+)
```
📜 Review details
📒 Files selected for processing (2)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4 hunks)
- unit_tests/sources/declarative/test_concurrent_declarative_source.py (1 hunk)
🚧 Files skipped from review as they are similar to previous changes (1)
- unit_tests/sources/declarative/test_concurrent_declarative_source.py
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
[warning] 5-5: Import block is un-sorted or un-formatted. Organize imports.
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
528-537: LGTM! Clean implementation of state migration. The static method effectively handles state migrations and properly converts the immutable mapping to a mutable one using dict(). Nice job implementing the feedback from previous reviews!
227-227: LGTM! Well-placed integration points. The state migration is correctly applied at both points where stream state is retrieved, ensuring proper state handling for both datetime incremental streams and streams with concurrent partition processing.
Also applies to: 335-335
Actionable comments posted: 0
🔭 Outside diff range comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
1197-1262: Should we apply stream_state_migrations to stream_state here as well? In create_concurrent_cursor_from_perpartition_cursor, we pass stream_state_migrations to the cursor factory, but it seems we are not applying the migrations to stream_state before using it. Unlike create_concurrent_cursor_from_datetime_based_cursor, we're not applying the migrations directly in this method. Should we apply the migrations to stream_state here to ensure consistent state handling across methods? Wdyt?
🧹 Nitpick comments (6)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
946-946: Should we specify a more precise type for stream_state_migrations? Currently, stream_state_migrations is typed as Optional[List[Any]]. Specifying a more precise type, such as Optional[List[StateMigration]], could improve type checking and readability. Wdyt?
957-961: Can we refactor the state migration logic into a helper method? The logic for applying stream_state_migrations to stream_state is (or should be) similar in both create_concurrent_cursor_from_datetime_based_cursor and create_concurrent_cursor_from_perpartition_cursor. Would it make sense to extract this into a helper method to avoid duplication and ensure consistency? Wdyt?
Also applies to: 1197-1199
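The two nitpicks above (a precise type and a shared helper) can be sketched together. A structural Protocol gives stream_state_migrations a checkable type without importing a concrete base class, and one helper applies the migrations so both create_concurrent_cursor_from_* methods behave the same. StateMigrationLike and RenameKeyMigration are hypothetical names; annotating with the CDK's own StateMigration class may be the better choice in practice. The loop mirrors the apply_stream_state_migrations body discussed in this PR.

```python
from typing import Any, List, Mapping, MutableMapping, Optional, Protocol


class StateMigrationLike(Protocol):
    """Hypothetical structural type: anything exposing should_migrate/migrate."""

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool: ...

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: ...


def apply_stream_state_migrations(
    stream_state_migrations: Optional[List[StateMigrationLike]],
    stream_state: MutableMapping[str, Any],
) -> MutableMapping[str, Any]:
    """Shared helper both cursor factory methods could call before building a cursor."""
    # Migrations run in list order; each sees the output of the previous one.
    for state_migration in stream_state_migrations or []:
        if state_migration.should_migrate(stream_state):
            stream_state = dict(state_migration.migrate(stream_state))
    return stream_state


class RenameKeyMigration:
    """Example migration (hypothetical): renames one top-level key."""

    def __init__(self, old_key: str, new_key: str) -> None:
        self.old_key, self.new_key = old_key, new_key

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        return self.old_key in stream_state

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        migrated = {k: v for k, v in stream_state.items() if k != self.old_key}
        migrated[self.new_key] = stream_state[self.old_key]
        return migrated
```

Because Protocol matching is structural, RenameKeyMigration satisfies StateMigrationLike for a type checker without inheriting from it.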
unit_tests/sources/declarative/test_concurrent_declarative_source.py (3)
1234-1383: Consider adding more test cases for state migration scenarios. The test verifies basic state migration functionality, but we could make it more comprehensive. What do you think about adding test cases for:
- Multiple state migrations
- Failed state migrations
- Empty state migrations
Also, would you like me to help generate these additional test cases?
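If those cases are added, they can be exercised against the migration loop alone, without HTTP mocking. A plain-assert sketch in pytest style; apply_migrations is a hypothetical local copy of the loop, AddKey and Boom are test doubles, and letting a failing migration's exception propagate is an assumption of this sketch rather than confirmed CDK behavior.

```python
from typing import Any, List, Mapping, MutableMapping, Optional


def apply_migrations(migrations: Optional[List[Any]], state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
    # Hypothetical local copy of the migration loop under test.
    for migration in migrations or []:
        if migration.should_migrate(state):
            state = dict(migration.migrate(state))
    return state


class AddKey:
    """Test double: adds key=value only if the key is missing."""

    def __init__(self, key: str, value: Any) -> None:
        self.key, self.value = key, value

    def should_migrate(self, state: Mapping[str, Any]) -> bool:
        return self.key not in state

    def migrate(self, state: Mapping[str, Any]) -> Mapping[str, Any]:
        return {**state, self.key: self.value}


class Boom:
    """Test double: a migration that always fails."""

    def should_migrate(self, state: Mapping[str, Any]) -> bool:
        return True

    def migrate(self, state: Mapping[str, Any]) -> Mapping[str, Any]:
        raise ValueError("cannot migrate")


def test_multiple_migrations_run_in_order() -> None:
    state = apply_migrations([AddKey("a", 1), AddKey("a", 2), AddKey("b", 3)], {})
    # The second AddKey("a", ...) is skipped because the first one already set "a".
    assert state == {"a": 1, "b": 3}


def test_empty_or_missing_migrations_leave_state_untouched() -> None:
    original = {"updated_at": "2024-08-21"}
    assert apply_migrations([], original) is original
    assert apply_migrations(None, original) is original


def test_failed_migration_propagates() -> None:
    try:
        apply_migrations([Boom()], {"updated_at": "2024-08-21"})
    except ValueError as error:
        assert "cannot migrate" in str(error)
    else:
        raise AssertionError("expected ValueError")
```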
1362-1371: Consider using a more descriptive variable name for clarity. The variable state_blob could be more descriptive. What do you think about renaming it to initial_state_blob or unmigrated_state_blob to better reflect its purpose? wdyt?
1376-1382: Consider adding descriptive error messages for assertions. The assertions could benefit from more descriptive error messages. What do you think about:
```diff
-    assert (
-        concurrent_streams[0].cursor.state.get("state") != state_blob.__dict__
-    ), "State was not migrated."
-    assert concurrent_streams[0].cursor.state.get("states") == [
-        {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_1"}},
-        {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_2"}},
-    ], "State was migrated, but actual state don't match expected"
+    assert (
+        concurrent_streams[0].cursor.state.get("state") != state_blob.__dict__
+    ), "Expected state to be migrated but it remained unchanged"
+    assert concurrent_streams[0].cursor.state.get("states") == [
+        {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_1"}},
+        {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_2"}},
+    ], "Expected migrated state to have two partitions with updated_at='2024-08-21' but got different state"
```
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
3284-3340: The test looks good but could be more comprehensive! The test effectively validates the basic state migration functionality. However, we could make it more robust by adding a few more test cases. What do you think about:
- Testing when should_migrate returns False?
- Testing with multiple state migrations to ensure they're applied in order?
- Adding assertions to verify the input state is not modified?
Here's a suggested enhancement to make the test more comprehensive:
```diff
 def test_create_concurrent_cursor_from_datetime_based_cursor_runs_state_migrations():
     class DummyStateMigration:
         def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
             return True

         def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
             updated_at = stream_state["updated_at"]
             return {
                 "states": [
                     {
                         "partition": {"type": "type_1"},
                         "cursor": {"updated_at": updated_at},
                     },
                     {
                         "partition": {"type": "type_2"},
                         "cursor": {"updated_at": updated_at},
                     },
                 ]
             }

+    class NoOpStateMigration:
+        def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
+            return False
+
+        def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
+            return stream_state
+
     stream_name = "test"
     config = {
         "start_time": "2024-08-01T00:00:00.000000Z",
         "end_time": "2024-09-01T00:00:00.000000Z",
     }
     stream_state = {"updated_at": "2025-01-01T00:00:00.000000Z"}
+    original_state = stream_state.copy()

     connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True)
     connector_state_manager = ConnectorStateManager()

     cursor_component_definition = {
         "type": "DatetimeBasedCursor",
         "cursor_field": "updated_at",
         "datetime_format": "%Y-%m-%dT%H:%M:%S.%fZ",
         "start_datetime": "{{ config['start_time'] }}",
         "end_datetime": "{{ config['end_time'] }}",
         "partition_field_start": "custom_start",
         "partition_field_end": "custom_end",
         "step": "P10D",
         "cursor_granularity": "PT0.000001S",
         "lookback_window": "P3D",
     }

     concurrent_cursor = (
         connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
             state_manager=connector_state_manager,
             model_type=DatetimeBasedCursorModel,
             component_definition=cursor_component_definition,
             stream_name=stream_name,
             stream_namespace=None,
             config=config,
             stream_state=stream_state,
-            stream_state_migrations=[DummyStateMigration()],
+            stream_state_migrations=[NoOpStateMigration(), DummyStateMigration()],
         )
     )

     assert concurrent_cursor.state["states"] == [
         {"cursor": {"updated_at": stream_state["updated_at"]}, "partition": {"type": "type_1"}},
         {"cursor": {"updated_at": stream_state["updated_at"]}, "partition": {"type": "type_2"}},
     ]
+    assert stream_state == original_state, "Input state should not be modified"
```
📜 Review details
📒 Files selected for processing (3)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (6 hunks)
- unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1 hunk)
- unit_tests/sources/declarative/test_concurrent_declarative_source.py (1 hunk)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
957-961: Applying state migrations correctly. The logic for applying state migrations to stream_state looks good. This ensures that any necessary migrations are applied before processing. Nice work!
@maxi297 @brianjlai
One comment on the scope of the migration for per-partition cursors. I think we can make it even more useful.
Just small nits, but I'm good with this one. Thanks, Daryna!
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
937-946: Would you consider adding type hints and a docstring? wdyt? The new static method could benefit from:
- More specific type hints in place of Any (e.g., StateMigration)
- A docstring explaining the method's purpose, parameters, and return value
```diff
     @staticmethod
-    def apply_stream_state_migrations(
-        stream_state_migrations: List[Any] | None, stream_state: MutableMapping[str, Any]
-    ) -> MutableMapping[str, Any]:
+    def apply_stream_state_migrations(
+        stream_state_migrations: List[StateMigration] | None,
+        stream_state: MutableMapping[str, Any]
+    ) -> MutableMapping[str, Any]:
+        """Apply a list of state migrations to the given stream state.
+
+        Args:
+            stream_state_migrations: List of state migrations to apply
+            stream_state: The current stream state to migrate
+
+        Returns:
+            The migrated stream state
+        """
         if stream_state_migrations:
             for state_migration in stream_state_migrations:
                 if state_migration.should_migrate(stream_state):
                     stream_state = dict(state_migration.migrate(stream_state))
         return stream_state
```
📜 Review details
📒 Files selected for processing (1)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (6 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
948-969: LGTM! The state migration integration looks good. The addition of the stream_state_migrations parameter and its application to the stream state before processing is well-placed and follows the existing pattern of optional parameters.
1194-1257: Could you verify if passing migrations twice is intentional? wdyt? The migrations are applied in two places:
- Passed to the cursor factory (line 1253)
- Applied directly to the stream state (line 1256)
Is this double application necessary, or should we only apply the migrations once? If it's intentional, consider adding a comment explaining why both are needed.
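Whether the double application matters depends on should_migrate. A sketch, assuming the same migration list can reach the same state twice: a guarded migration is idempotent, while one whose should_migrate always returns True (as in the unit-test fixture discussed above) breaks on the second pass because the legacy key is gone. GuardedMigration, AlwaysMigration, apply, and applying_twice_raises are all hypothetical names.

```python
from typing import Any, List, Mapping, MutableMapping

LEGACY_STATE = {"updated_at": "2024-08-21"}


class GuardedMigration:
    """Hypothetical migration that only fires on legacy (flat) state."""

    def should_migrate(self, state: Mapping[str, Any]) -> bool:
        return "updated_at" in state and "states" not in state

    def migrate(self, state: Mapping[str, Any]) -> Mapping[str, Any]:
        return {"states": [{"partition": {"type": "type_1"}, "cursor": {"updated_at": state["updated_at"]}}]}


class AlwaysMigration(GuardedMigration):
    """Same migrate(), but should_migrate always returns True, like the test fixture."""

    def should_migrate(self, state: Mapping[str, Any]) -> bool:
        return True


def apply(migrations: List[Any], state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
    for migration in migrations:
        if migration.should_migrate(state):
            state = dict(migration.migrate(state))
    return state


def applying_twice_raises(migration: Any) -> bool:
    """Run the migration on legacy state, then again on its own output."""
    once = apply([migration], dict(LEGACY_STATE))
    try:
        apply([migration], once)
    except KeyError:
        # The second pass looked up "updated_at", which the first pass removed.
        return True
    return False
```

If both application points are kept, a guard like GuardedMigration.should_migrate is what makes the second pass harmless.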
1765-1765: LGTM! The state migrations are correctly passed to the cursor creation. The addition of the stream_state_migrations parameter is consistent with the changes in other methods.
What
Related to migrating the Klaviyo campaigns stream to low code: airbytehq/airbyte#51551
The concurrent framework gets the stream_state from the state manager rather than from the DeclarativeStream, which is where state migrations are run.
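A simplified sketch of the gap and the fix, assuming a state manager that hands back the raw persisted state: because the concurrent path builds its cursor from that raw state, migrations attached to the DeclarativeStream never run unless they are applied explicitly before the cursor is created. InMemoryStateManager, LegacyToPartitioned, build_cursor_state, and the "campaigns" stream name are hypothetical; InMemoryStateManager is only a stand-in for the CDK's ConnectorStateManager and does not reproduce its API.

```python
from typing import Any, Dict, List, Mapping, MutableMapping, Optional, Tuple


class InMemoryStateManager:
    """Hypothetical stand-in for the connector state manager."""

    def __init__(self, states: Dict[Tuple[str, Optional[str]], Mapping[str, Any]]) -> None:
        self._states = states

    def get_stream_state(self, stream_name: str, namespace: Optional[str]) -> MutableMapping[str, Any]:
        # Returns a copy of the raw persisted state: no migrations have been applied.
        return dict(self._states.get((stream_name, namespace), {}))


class LegacyToPartitioned:
    """Hypothetical migration from {"updated_at": ...} to per-partition state."""

    def should_migrate(self, state: Mapping[str, Any]) -> bool:
        return "updated_at" in state and "states" not in state

    def migrate(self, state: Mapping[str, Any]) -> Mapping[str, Any]:
        return {"states": [{"partition": {"type": "type_1"}, "cursor": {"updated_at": state["updated_at"]}}]}


def build_cursor_state(
    state_manager: InMemoryStateManager,
    stream_name: str,
    stream_state_migrations: Optional[List[Any]] = None,
) -> MutableMapping[str, Any]:
    stream_state = state_manager.get_stream_state(stream_name, None)
    # The fix: run the stream's migrations on the state read from the manager,
    # before it is handed to the concurrent cursor.
    for migration in stream_state_migrations or []:
        if migration.should_migrate(stream_state):
            stream_state = dict(migration.migrate(stream_state))
    return stream_state
```

Without the migrations argument, the cursor would receive the legacy flat state unchanged, which is the behavior this PR addresses.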
How
Added state_migration.migrate(stream_state) to the concurrent declarative source. Added a unit test.
Summary by CodeRabbit
New Features
Tests