feat: update ManifestDeclarativeSource
to invoke declarative config migrations/transformations/validations
#561
…nd-spec-class-for-config-migrations
Actionable comments posted: 0
🧹 Nitpick comments (1)
unit_tests/sources/declarative/test_manifest_declarative_source.py (1)
2284-2335: Good test for config transformations!
This test confirms that transformations defined in the spec are correctly applied to the configuration. Should we also add a test that verifies transformation of invalid values, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (4)
- airbyte_cdk/sources/declarative/manifest_declarative_source.py (8 hunks)
- airbyte_cdk/sources/declarative/spec/spec.py (3 hunks)
- unit_tests/sources/declarative/spec/test_spec.py (0 hunks)
- unit_tests/sources/declarative/test_manifest_declarative_source.py (3 hunks)
💤 Files with no reviewable changes (1)
- unit_tests/sources/declarative/spec/test_spec.py
🚧 Files skipped from review as they are similar to previous changes (2)
- airbyte_cdk/sources/declarative/manifest_declarative_source.py
- airbyte_cdk/sources/declarative/spec/spec.py
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (7)
unit_tests/sources/declarative/test_manifest_declarative_source.py (7)
12-12: Import addition looks good!
Added `Mock` and `mock_open` imports to support the new tests for configuration migration and transformation functionality.
33-35: ModelToComponentFactory import looks good!
Clean addition of the import needed for creating test components.
2107-2141: Well-structured fixture for migration testing!
Nice job creating a comprehensive fixture that mocks all the necessary components for testing the migration functionality. This approach keeps the tests clean and focused on behavior rather than setup details.
2144-2211: Great test for config migration!
This test thoroughly verifies that unmigrated configs are properly processed:
- Migration is applied (CRSC → Coruscant)
- Control message is emitted
- Migrated config is persisted to filesystem
- All expected functions are called
The test structure makes the expected behavior very clear. Well done!
2214-2281: Nice negative test case!
Excellent complementary test to verify that no migration side-effects occur when the config is already in the expected format. This thorough verification of "do nothing" behavior is an important edge case.
2338-2400: Solid positive validation test!
Good test to verify that valid configurations pass validation without raising exceptions.
2402-2463: Well-written negative validation test!
This test nicely complements the positive test by verifying that invalid configurations properly trigger validation errors. The setup makes it clear what's expected to fail.
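For readers who want to see the shape of the two migration tests described above, here is a minimal, self-contained sketch of the same pattern built on `mock_open`. It does not use the CDK: `migrate_and_persist` and `expand_planet_code` are hypothetical stand-ins for the source's migration step, and the `planet` key and the `/fake/config.json` path are made up for illustration.

```python
import json
from typing import Any, Callable, Mapping, MutableMapping
from unittest.mock import mock_open, patch


def expand_planet_code(config: MutableMapping[str, Any]) -> None:
    """Hypothetical migration: replace a legacy code with its full name, in place."""
    if config.get("planet") == "CRSC":
        config["planet"] = "Coruscant"


def migrate_and_persist(
    config: Mapping[str, Any],
    config_path: str,
    migrate: Callable[[MutableMapping[str, Any]], None],
) -> dict:
    """Hypothetical stand-in: migrate a copy and write it back only if it changed."""
    migrated = dict(config)
    migrate(migrated)
    if migrated != dict(config):
        with open(config_path, "w") as f:
            json.dump(migrated, f)
    return migrated


def run_with_mocked_filesystem(config: Mapping[str, Any]):
    """Run the migration with open() mocked; return the result, the written text, and the mock."""
    opener = mock_open()
    with patch("builtins.open", opener):
        result = migrate_and_persist(config, "/fake/config.json", expand_planet_code)
    # json.dump writes in chunks, so join every write() call made on the file handle.
    written = "".join(call.args[0] for call in opener.return_value.write.call_args_list)
    return result, written, opener
```

With an unmigrated config the mock records exactly one `open(..., "w")` call and the migrated JSON; with an already-migrated config, `opener.assert_not_called()` captures the "do nothing" case.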
Title changed: "AirbyteEntrypoint and ManifestDeclarativeSource to invoke declarative config migrations/transformations/validations" → "ManifestDeclarativeSource to invoke declarative config migrations/transformations/validations"
Actionable comments posted: 0
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
11-11: There's an unused import here
I noticed the `cast` import seems to be missing from the import statement. This matches a past review comment, so it's likely intentional, but just wanted to double-check if you're planning to use it, wdyt?
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (3)
147-174: Nice implementation of config migration and transformation!
The new config handling with migration, transformation and persistence looks good. I particularly like the error handling that resets the config if migration fails.
A few thoughts to consider:
- Would adding file operation error handling (like permission errors) be helpful?
- Have you considered potential thread safety issues if multiple instances try to write to the same config file?
166-167: Is printing to stdout the best approach here?
Directly printing serialized messages to stdout might interfere with other logging or output. Would it make sense to use a dedicated logger or provide a way to consume these messages differently, wdyt?
    - for message in self.message_repository.consume_queue():
    -     print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
    + # Use a dedicated logger or provide a way to consume these messages
    + for message in self.message_repository.consume_queue():
    +     self.logger.info(f"Emitted control message: {orjson.dumps(AirbyteMessageSerializer.dump(message)).decode()}")
169-170: Consider enhancing error logging
The current error logging only includes the string representation of the exception. Would adding more context and possibly the traceback be helpful for debugging complex issues?
    - self.logger.error(f"Error migrating config: {str(e)}")
    + self.logger.error(f"Error migrating config: {str(e)}", exc_info=True)
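To illustrate what `exc_info=True` changes, here is a small sketch using only the standard `logging` module. The helper `migrate_or_reset` is hypothetical (it is not the PR's code); it mirrors the "reset the config if migration fails" behaviour mentioned above and logs the failure with the traceback attached.

```python
import logging
from typing import Any, Callable, MutableMapping

logger = logging.getLogger("config_migration_sketch")


def migrate_or_reset(
    config: MutableMapping[str, Any],
    migrate: Callable[[MutableMapping[str, Any]], None],
) -> MutableMapping[str, Any]:
    """Apply `migrate` in place; on failure, log with traceback and restore the original values."""
    original = dict(config)
    try:
        migrate(config)
    except Exception as e:
        # exc_info=True attaches the active exception (type, value, traceback) to the log record,
        # so handlers can render the full stack instead of just the message.
        logger.error(f"Error migrating config: {e}", exc_info=True)
        config.clear()
        config.update(original)
    return config
```

Inside an `except` block, `logger.exception(...)` is an equivalent shorthand for `logger.error(..., exc_info=True)`.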
📜 Review details
📒 Files selected for processing (1)
- airbyte_cdk/sources/declarative/manifest_declarative_source.py (9 hunks)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (2)
292-294: Good addition of validation before stream creation
Adding validation before creating streams is a great safety measure to ensure the config is valid.
395-397: Nice optimization in the spec method
Using the cached `_spec_component` to avoid redundant component creation is a good optimization.
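The two points above (validate before building streams, reuse a cached spec component) can be sketched together. This is an illustration under stated assumptions, not the CDK implementation: `SourceSketch`, `build_spec`, and `validate` are hypothetical names, and the fallback in `spec()` is invented for the example.

```python
from typing import Any, Callable, List, Mapping, Optional


class SourceSketch:
    """Hypothetical stand-in for a declarative source; not the CDK class."""

    def __init__(
        self,
        manifest: Mapping[str, Any],
        config: Mapping[str, Any],
        build_spec: Callable[[Mapping[str, Any]], Any],
        validate: Callable[[Mapping[str, Any]], None],
    ) -> None:
        self._config = config
        self._build_spec = build_spec
        self._validate = validate
        spec = manifest.get("spec")
        # Build the spec component once, and only if the manifest declares one.
        self._spec_component: Optional[Any] = build_spec(spec) if spec else None

    def streams(self) -> List[str]:
        # Fail fast: validate the config before any stream is constructed.
        self._validate(self._config)
        return [f"stream_for_{self._config['planet']}"]

    def spec(self) -> Any:
        # Reuse the cached component; build a fallback only when none was cached.
        if self._spec_component is not None:
            return self._spec_component
        return self._build_spec({})
```

Repeated `spec()` calls return the same object, and an invalid config surfaces as an exception from `streams()` before any stream exists.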
Pull Request Overview
This PR updates the ManifestDeclarativeSource to invoke declarative config migrations, transformations, and validations. Key changes include:
- Adding an optional config_path parameter to ManifestDeclarativeSource and integrating config migration logic in its constructor.
- Updating the Spec.migrate_config method to change its signature and behavior (removing file I/O and control message emission).
- Adjusting unit tests to reflect the modified migration, transformation, and validation flows.
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| unit_tests/sources/declarative/test_manifest_declarative_source.py | Added tests to verify migration behavior, control message emission, and transformation of config values. |
| unit_tests/sources/declarative/spec/test_spec.py | Removed migration-related tests that are no longer relevant following the changes in Spec. |
| airbyte_cdk/sources/declarative/spec/spec.py | Updated config migration logic to simply apply transformations on the provided config without performing file I/O. |
| airbyte_cdk/sources/declarative/manifest_declarative_source.py | Refactored the migration logic within `__init__` to handle an optional config_path and integrate transformation and validation as part of source initialization. |
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/yaml_declarative_source.py (1)
29-31: Consider updating docstring to include the new parameter?
The docstring only describes the `path_to_yaml` parameter but doesn't mention the newly added `config_path` parameter. Would it be helpful to add documentation for this parameter as well, explaining its purpose for config migration and transformation? wdyt?
📜 Review details
📒 Files selected for processing (3)
- airbyte_cdk/cli/source_declarative_manifest/_run.py (2 hunks)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
- airbyte_cdk/sources/declarative/yaml_declarative_source.py (2 hunks)
✅ Files skipped from review due to trivial changes (1)
- airbyte_cdk/cli/source_declarative_manifest/_run.py
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/yaml_declarative_source.py (2)
27-27: New parameter added to support config migration and transformation.
The new `config_path` parameter aligns with the PR objectives to allow invoking config migrations, transformations, and validations. The change is consistent with similar updates in related classes.
40-40: Parameter properly forwarded to parent class.
The `config_path` parameter is correctly passed to the parent class constructor, which will handle the actual config migration and validation logic.
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
77-77: New parameter added to support config migration and transformation.
The new `config_path` parameter aligns with the PR objectives to allow invoking config migrations, transformations, and validations. The change is consistent with similar updates in related classes.
100-100: Parameter properly forwarded to parent class.
The `config_path` parameter is correctly passed to the parent class `ManifestDeclarativeSource` constructor, which will handle the actual config migration and validation logic.
68-78: Parameter added consistently with existing pattern.
The new parameter follows the same pattern as other optional parameters in this method, maintaining consistency in the codebase. This change enables the class to properly support configuration migration, transformation, and validation features.
Some stylistic suggestions, but I think it's pretty close. nice work!
Actionable comments posted: 0
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
11-11: Clean imports - removed unused `cast`
The removal of the unused `cast` import keeps the import list clean and aligned with what's actually used in the code.
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
216-234: Consider simplifying the conditional logic in _migrate_config
The nested conditions seem redundant - we check for `config_path` twice.
    - if config_path and config:
    -     self._spec_component.migrate_config(mutable_config)
    -     if mutable_config != config:
    -         if config_path:
    -             with open(config_path, "w") as f:
    -                 json.dump(mutable_config, f)
    + if config_path and config:
    +     self._spec_component.migrate_config(mutable_config)
    +     if mutable_config != config:
    +         with open(config_path, "w") as f:
    +             json.dump(mutable_config, f)
Also, would it make sense to add error handling for file operations? What happens if the write fails?
📜 Review details
📒 Files selected for processing (1)
- airbyte_cdk/sources/declarative/manifest_declarative_source.py (10 hunks)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (5)
13-13: New imports support config migration functionality
These new imports enable the config migration workflow:
- `orjson` for efficient JSON serialization
- `create_connector_config_control_message` for emitting config changes
- `AirbyteMessageSerializer` for serializing messages to stdout
The imports are properly aligned with the new migration functionality being added.
Also applies to: 19-19, 34-34
106-106: Added config_path parameter
The new `config_path` parameter enables persisting migrated configs back to disk, which is essential for the migration workflow.
Also applies to: 116-116
147-154: Improved spec component handling and config processing
This refactored implementation properly:
- Caches the spec component
- Creates a mutable config copy
- Applies migrations
- Transforms the config
- Stores the result
This is a solid improvement over the previous approach, centralizing the spec processing logic.
291-293: Good addition: validating config before creating streams
Adding validation before stream creation ensures that configuration errors are caught early, preventing runtime errors during stream creation. This is a great improvement to the error handling flow.
394-396: Optimized spec generation
Using the cached `_spec_component` when available is a good optimization that avoids recreating the spec component unnecessarily.
Actionable comments posted: 1
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
219-237: 🛠️ Refactor suggestion
Implement safer comparison in config migration.
The comparison `mutable_config != config` might not behave as expected when `config` is None. Consider normalizing both to dictionaries before comparison, or using a more robust comparison approach.
      def _migrate_config(
          self,
          config_path: Optional[str],
          mutable_config: MutableMapping[str, Any],
          config: Optional[Mapping[str, Any]],
      ) -> None:
          if config_path and config and self._spec_component:
              self._spec_component.migrate_config(mutable_config)
    -         if mutable_config != config:
    +         # Convert both to dicts for comparison to handle None case
    +         if dict(mutable_config) != dict(config):
                  if config_path:
                      with open(config_path, "w") as f:
                          json.dump(mutable_config, f)
Also, the check for `config_path` on line 228 is redundant since it's already checked in the outer if statement on line 225. Simplifying this would make the code cleaner, wdyt?
      def _migrate_config(
          self,
          config_path: Optional[str],
          mutable_config: MutableMapping[str, Any],
          config: Optional[Mapping[str, Any]],
      ) -> None:
          if config_path and config and self._spec_component:
              self._spec_component.migrate_config(mutable_config)
              if dict(mutable_config) != dict(config):
    -             if config_path:
    -                 with open(config_path, "w") as f:
    -                     json.dump(mutable_config, f)
    +             with open(config_path, "w") as f:
    +                 json.dump(mutable_config, f)
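One detail worth spelling out on the comparison: `dict(None)` raises `TypeError`, so wrapping both sides in `dict(...)` only handles the `None` case if `None` is normalized first (in the suggestion it is the outer `config` truthiness check that prevents this). A minimal sketch of a standalone comparison helper follows; the name `config_changed` is hypothetical and not part of the PR.

```python
from types import MappingProxyType
from typing import Any, Mapping, Optional


def config_changed(
    original: Optional[Mapping[str, Any]],
    migrated: Mapping[str, Any],
) -> bool:
    """Return True when `migrated` differs from `original`, treating None as an empty config."""
    # dict(None) raises TypeError, so fall back to an empty mapping before converting.
    return dict(migrated) != dict(original or {})


# Read-only mappings compare by content once both sides are normalized to dicts.
frozen = MappingProxyType({"planet": "Coruscant"})
unchanged = config_changed(frozen, {"planet": "Coruscant"})
```

Keeping the comparison in one small function also makes the `None` and read-only-mapping cases easy to unit test on their own.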
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (2)
147-157: Consider refactoring initialization logic into dedicated helper methods.
The initialization logic for `_spec_component` and config migration/transformation could be extracted into dedicated helper methods to improve readability. This would also make the constructor less cluttered with implementation details.
      def __init__(
          self,
          source_config: ConnectionDefinition,
          *,
          config: Mapping[str, Any] | None = None,
          debug: bool = False,
          emit_connector_builder_messages: bool = False,
          component_factory: Optional[ModelToComponentFactory] = None,
          migrate_manifest: Optional[bool] = False,
          normalize_manifest: Optional[bool] = False,
          config_path: Optional[str] = None,
      ) -> None:
          ...
          self._source_config = self._pre_process_manifest(dict(source_config))
          self._validate_source()
          self._post_process_manifest()
    -     self._config: Mapping[str, Any]
    -     self._spec_component: Optional[Spec]
    -     spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
    -     self._spec_component = (
    -         self._constructor.create_component(SpecModel, spec, dict()) if spec else None
    -     )
    -     mutable_config = dict(config) if config else {}
    -     self._migrate_config(config_path, mutable_config, config)
    -     if self._spec_component:
    -         self._spec_component.transform_config(mutable_config)
    -     self._config = mutable_config
    +     self._spec_component = self._initialize_spec_component()
    +     self._config = self._initialize_config(config, config_path)
          ...

    + def _initialize_spec_component(self) -> Optional[Spec]:
    +     """Initialize the spec component from the manifest's spec block if available."""
    +     spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
    +     return self._constructor.create_component(SpecModel, spec, dict()) if spec else None
    +
    + def _initialize_config(self, config: Optional[Mapping[str, Any]], config_path: Optional[str]) -> Mapping[str, Any]:
    +     """Initialize and process the configuration, including migration and transformation."""
    +     mutable_config = dict(config) if config else {}
    +     self._migrate_config(config_path, mutable_config, config)
    +     if self._spec_component:
    +         self._spec_component.transform_config(mutable_config)
    +     return mutable_config
What do you think?
225-237: Add docstring to the `_migrate_config` method.
The `_migrate_config` method lacks a docstring explaining its purpose, parameters, and behavior. Adding a docstring would improve code maintainability.
      def _migrate_config(
          self,
          config_path: Optional[str],
          mutable_config: MutableMapping[str, Any],
          config: Optional[Mapping[str, Any]],
      ) -> None:
    +     """
    +     Migrates the configuration if necessary and persists changes.
    +
    +     Args:
    +         config_path: Path to the config file to write migrated config to, if changed
    +         mutable_config: A mutable copy of the configuration to be migrated
    +         config: The original configuration to compare against after migration
    +
    +     If the configuration is changed during migration, the updated config is:
    +     1. Written back to the config file if config_path is provided
    +     2. Emitted as a connector config control message
    +     3. All queued messages are serialized and printed to stdout
    +     """
          if config_path and config and self._spec_component:
📜 Review details
📒 Files selected for processing (1)
- airbyte_cdk/sources/declarative/manifest_declarative_source.py (10 hunks)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (5)
11-11: The `cast` import was removed - improvement confirmed.
The updated import statement correctly removes the unused `cast` import while maintaining all the necessary types. This is a good cleanup to keep imports tidy.
13-14: New imports support config migration and control message functionality.
These imports enable the new config migration, transformation, and validation features, especially:
- `orjson` for efficient JSON serialization
- Control message creation
- Message serialization
The imports are appropriately scoped to the required functionality.
Also applies to: 19-20, 34-35
106-106: New optional `config_path` parameter looks good.
This parameter allows the connector to persist migrated configuration changes back to the source file, which is essential for the migration functionality.
Also applies to: 116-117
294-296: Config validation in streams method is a good improvement.
Adding validation of the config in the streams method ensures that the config is valid before creating streams, which helps catch configuration errors earlier in the process.
397-399: Optimization by using cached spec component.
The modification to use the cached spec component is a good optimization, avoiding redundant spec component creation. The conditional fallback ensures compatibility when the spec component is not available.
a few discussion points as mentioned, but doesn't need to be blocking. 🚢
- the coderabbit suggestion to properly catch error handling on the file write
Actionable comments posted: 1
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
225-226: 🛠️ Refactor suggestion
Add error handling for file operations
The file write operation lacks error handling, which could lead to unhandled exceptions if the file cannot be written (e.g., due to permission issues or disk space).
    - with open(config_path, "w") as f:
    -     json.dump(mutable_config, f)
    + try:
    +     with open(config_path, "w") as f:
    +         json.dump(mutable_config, f)
    + except (IOError, OSError) as e:
    +     self.logger.error(f"Failed to write migrated config to {config_path}: {e}")
    +     # Continue processing even if file write fails
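Going one step beyond the try/except above, a write-then-rename pattern also avoids leaving a truncated config behind if the write fails partway, which touches on the earlier question about several instances writing the same file. The sketch below uses only the standard library; `write_config_safely` is a hypothetical helper, not something in the CDK, and whether the rename is fully atomic depends on the platform and filesystem.

```python
import json
import logging
import os
import tempfile
from typing import Any, Mapping

logger = logging.getLogger("config_persistence_sketch")


def write_config_safely(config_path: str, config: Mapping[str, Any]) -> bool:
    """Write `config` as JSON to `config_path`; return False instead of raising on I/O errors."""
    directory = os.path.dirname(os.path.abspath(config_path))
    tmp_path = None
    try:
        # Write to a temp file in the same directory, then swap it in with os.replace,
        # so readers see either the old file or the complete new one.
        fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
        with os.fdopen(fd, "w") as f:
            json.dump(dict(config), f)
        os.replace(tmp_path, config_path)
        return True
    except OSError as e:
        logger.error(f"Failed to write migrated config to {config_path}: {e}")
        return False
    finally:
        # Clean up the temp file if the swap did not happen.
        if tmp_path is not None and os.path.exists(tmp_path):
            os.remove(tmp_path)
```

Note that `tempfile.mkstemp` creates the file readable and writable only by the current user, so the replaced config may end up with different permissions than the original.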
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (2)
223-233: Add logging for the config migration process
When a config is migrated, it would be helpful to log this event, especially since it triggers file writes and message emissions. This aids debugging and provides an audit trail of configuration changes.
      if mutable_config != config:
    +     self.logger.info("Config migration applied - updating configuration")
          if config_path:
              with open(config_path, "w") as f:
                  json.dump(mutable_config, f)
    +         self.logger.debug(f"Migrated config written to {config_path}")
          self.message_repository.emit_message(
              create_connector_config_control_message(mutable_config)
          )
          # We have no mechanism for consuming the queue, so we print the messages to stdout
          for message in self.message_repository.consume_queue():
              print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
215-237: Consider refactoring the migration logic into smaller functions
The `_migrate_and_transform_config` method handles multiple responsibilities: migrating, saving, message emission, and transformation. Breaking these down into smaller, focused methods would improve readability and testability.
What do you think about refactoring this into smaller methods like:
- `_migrate_config` - Just handles migration
- `_persist_migrated_config` - Handles saving and message emission
- `_transform_config` - Handles transformation
This would make each step more testable and maintainable. wdyt?
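As a rough illustration of that split, here is a self-contained sketch. It is not the CDK code: `SpecSketch` and `ConfigPipeline` are hypothetical, the migration and transformation rules are invented, and persistence is injected as a callable so each step can be exercised without touching the filesystem or stdout.

```python
from typing import Any, Callable, Mapping, MutableMapping, Optional


class SpecSketch:
    """Hypothetical spec exposing the two hooks named in this PR; the rules are invented."""

    def migrate_config(self, config: MutableMapping[str, Any]) -> None:
        if config.get("planet") == "CRSC":
            config["planet"] = "Coruscant"

    def transform_config(self, config: MutableMapping[str, Any]) -> None:
        config.setdefault("page_size", 100)


class ConfigPipeline:
    """Each step is its own small method so it can be tested in isolation."""

    def __init__(self, spec: SpecSketch, persist: Callable[[Mapping[str, Any]], None]) -> None:
        self._spec = spec
        self._persist = persist  # e.g. write to disk and emit a control message

    def _migrate_config(self, config: MutableMapping[str, Any]) -> bool:
        """Migrate in place and report whether anything changed."""
        before = dict(config)
        self._spec.migrate_config(config)
        return config != before

    def _persist_migrated_config(self, config: Mapping[str, Any]) -> None:
        self._persist(dict(config))

    def _transform_config(self, config: MutableMapping[str, Any]) -> None:
        self._spec.transform_config(config)

    def run(self, config: Optional[Mapping[str, Any]]) -> Mapping[str, Any]:
        mutable_config = dict(config or {})
        if self._migrate_config(mutable_config):
            self._persist_migrated_config(mutable_config)
        self._transform_config(mutable_config)
        return mutable_config
```

The ordering follows the flow described in this thread: migrate, persist only if the migration changed something, then transform, so the persisted config is the migrated one without the runtime-only transformations.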
📜 Review details
📒 Files selected for processing (1)
- airbyte_cdk/sources/declarative/manifest_declarative_source.py (10 hunks)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (4)
11-11: Clean import statement!
The removal of unused `cast` from imports helps maintain clean code - nice optimization.
147-153: Well-structured initialization of spec component and config
Initializing `_spec_component` once in the constructor is a good optimization to avoid recreating it in the `spec()` method. The delegation to a dedicated config migration method also keeps the constructor clean.
294-296: Great addition of config validation in streams method
Adding validation before creating streams ensures the config is valid at runtime. This helps catch configuration issues early and provides better error messages to users.
397-399: Nice optimization in spec method
Using the cached `_spec_component` instead of recreating it is a good optimization that avoids unnecessary object creation.
What
- Adds `config_path` param to `ManifestDeclarativeSource`, then builds the spec component in the source's `__init__` and then migrates config, if needed.
- Validates the config in the `streams()` method
Summary by CodeRabbit
New Features
Bug Fixes
Tests
Chores