feat: destination discover PoC and adding sync modes #527

Open · wants to merge 22 commits into main

Conversation

@maxi297 (Contributor) commented May 1, 2025

What

This PR contains various changes needed for DA. The changes are:

  • Cleanup to allow destinations to call parse_args without having to instantiate a destination object (I also added support for debug logging while doing this)
  • Adding the discover command for destinations
  • Adding more features to build a JSON schema:
    • Ability to set allow_additional_properties == False
    • Ability to add fields other than type within each property object
  • Ability to configure DestinationSyncMode in the configured catalog
  • Fix regarding the HTTP matcher
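As a rough standard-library sketch of the first two bullets (this is not the CDK's actual implementation; subcommand and flag names follow the change summary below), destination argument parsing with the new discover command and debug flag could look like:

```python
import argparse
from typing import List


def parse_args(args: List[str]) -> argparse.Namespace:
    # Top-level function so destinations can parse args without first
    # instantiating a destination object; every subcommand accepts --debug.
    parent = argparse.ArgumentParser(add_help=False)
    parent.add_argument("--debug", action="store_true", help="enable debug logging")

    main_parser = argparse.ArgumentParser()
    subparsers = main_parser.add_subparsers(dest="command", required=True)

    subparsers.add_parser("spec", parents=[parent])

    check = subparsers.add_parser("check", parents=[parent])
    check.add_argument("--config", required=True)

    # The new destination-side discover command only needs a config.
    discover = subparsers.add_parser("discover", parents=[parent])
    discover.add_argument("--config", required=True)

    write = subparsers.add_parser("write", parents=[parent])
    write.add_argument("--config", required=True)
    write.add_argument("--catalog", required=True)

    return main_parser.parse_args(args)
```

With this shape, `parse_args(["discover", "--config", "config_path"])` yields `command="discover"`, `config="config_path"`, `debug=False`, matching the expectations used in the updated tests further down.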

For more context, the following PRs can be useful:

Summary by CodeRabbit

  • New Features
    • Added a "discover" command to support catalog discovery for destinations.
    • Enabled configurable handling of additional properties in dynamic JSON schemas, including support for property field inferrers.
    • Allowed setting destination sync mode and object name in stream configuration.
    • Extended Airbyte message format to include destination catalog information.
  • Bug Fixes
    • Improved JSON parsing error handling in HTTP request comparison to prevent exceptions.
  • Tests
    • Added tests for additional property inferrer and schema generation scenarios.
    • Expanded request matcher tests to handle non-mapping bodies.
    • Updated destination argument parsing tests to verify debug flag inclusion.
  • Documentation
    • Marked the additional property fields inferrer as experimental and deprecated.

@@ -72,7 +72,11 @@ def _to_mapping(
        elif isinstance(body, bytes):
            return json.loads(body.decode())  # type: ignore # assumes return type of Mapping[str, Any]
        elif isinstance(body, str):
            return json.loads(body)  # type: ignore # assumes return type of Mapping[str, Any]
        try:

Without this addition, depending on the order of evaluation, the test test_given_on_match_is_mapping_but_not_input_when_matches_then_return_false would fail.

@maxi297 maxi297 requested a review from aaronsteers May 2, 2025 15:20
maxi297 commented May 2, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made, those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

@maxi297 maxi297 changed the title [WIP] destination discover PoC [WIP] destination discover PoC and adding sync modes May 2, 2025
@maxi297 maxi297 requested a review from lazebnyi May 5, 2025 19:04
@maxi297 maxi297 marked this pull request as ready for review May 5, 2025 19:06

coderabbitai bot commented May 5, 2025

📝 Walkthrough

Walkthrough

This update adds a "discover" command to the Destination CLI, refactors argument parsing, introduces an abstract discover() method, and extends command handling. It also adds an inferrer mechanism and configurable additional_properties in DynamicSchemaLoader, enhances test coverage for these features, improves error handling in HTTP request tests, and extends test utilities for destination sync modes and schema scenarios. Additionally, it updates imports and dependencies accordingly.

Changes

File(s) Change Summary
airbyte_cdk/destinations/destination.py Added "discover" CLI command; refactored argument parsing into a top-level function; introduced abstract discover() method; extended run_cmd() to handle "discover" and yield catalog messages.
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py Added AdditionalPropertyFieldsInferrer abstract class; added inferrer and allow_additional_properties options; modified _transform() and get_json_schema() to incorporate additional fields and configurable additionalProperties.
airbyte_cdk/test/catalog_builder.py Added with_destination_sync_mode() and with_destination_object_name() methods; changed default sync_mode in with_stream() to SyncMode.full_refresh.
airbyte_cdk/test/mock_http/request.py Improved error handling in _to_mapping() to catch JSON decode errors and return None.
unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py Added helper for mock retriever, TestAdditionalPropertyFieldsInferrer, and tests for additional property inferrer and allow_additional_properties.
unit_tests/test/mock_http/test_request.py Added test for mismatched body types when the expected body is a mapping but the actual body is a string.
unit_tests/destinations/test_destination.py Updated test parameterization to include "debug": False in expected parsed args for CLI commands.
airbyte_cdk/models/__init__.py Added imports for DestinationCatalog and DestinationOperation from airbyte_protocol.
pyproject.toml Changed dependency for airbyte-protocol-models-dataclasses to a fixed exact version "0.16.0".
airbyte_cdk/models/airbyte_protocol_serializers.py Reorganized imports: moved serpyco_rs import below protocol dataclasses imports.
airbyte_cdk/logger.py Updated import sources for Airbyte models and serializers to airbyte_protocol_dataclasses.models, reordered imports for clarity.
airbyte_cdk/models/airbyte_protocol.py Added custom AirbyteStateBlob class to fix protocol issues; redeclared related state dataclasses; extended AirbyteMessage with optional destination_catalog field.
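The new helpers in `catalog_builder.py` extend a fluent builder; a minimal stand-alone sketch of that pattern (the real builder works on protocol dataclasses rather than plain dicts, and `DestinationSyncMode` here is a local stand-in for the protocol enum):

```python
from enum import Enum
from typing import Any, Dict


class DestinationSyncMode(Enum):  # stand-in for the airbyte-protocol enum
    append = "append"
    overwrite = "overwrite"
    append_dedup = "append_dedup"


class ConfiguredAirbyteStreamBuilder:
    """Minimal stand-in illustrating the fluent pattern of the real builder."""

    def __init__(self) -> None:
        self._stream: Dict[str, Any] = {
            "destination_sync_mode": DestinationSyncMode.append.value,
        }

    def with_destination_sync_mode(
        self, sync_mode: DestinationSyncMode
    ) -> "ConfiguredAirbyteStreamBuilder":
        self._stream["destination_sync_mode"] = sync_mode.value
        return self  # returning self keeps the API chainable

    def with_destination_object_name(
        self, name: str
    ) -> "ConfiguredAirbyteStreamBuilder":
        self._stream["destination_object_name"] = name
        return self

    def build(self) -> Dict[str, Any]:
        return dict(self._stream)
```

Each setter returns `self`, so tests can configure a stream in a single chained expression.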

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant CLI
    participant Destination
    participant discover()
    participant AirbyteCatalog

    User->>CLI: Run "discover" command with --config
    CLI->>Destination: parse_args("discover", --config)
    CLI->>Destination: run_cmd(parsed_args)
    Destination->>discover(): discover()
    discover()-->>Destination: AirbyteCatalog
    Destination->>CLI: yield AirbyteMessage(type=CATALOG, catalog)
    CLI->>User: Output catalog message
sequenceDiagram
    participant Loader as DynamicSchemaLoader
    participant Inferrer as AdditionalPropertyFieldsInferrer

    Loader->>Loader: get_json_schema()
    alt additional_property_fields_inferrer is set
        Loader->>Inferrer: infer(property_definition)
        Inferrer-->>Loader: additional_fields
        Loader->>Loader: Merge additional_fields into property_definition
    end
    Loader->>Loader: Set "additionalProperties" per allow_additional_properties
    Loader-->>Caller: Return JSON schema

Suggested reviewers

  • aldogonzalez8

Would you like to add more documentation or usage examples for the new "discover" command and schema inferrer, wdyt?


@coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (6)
airbyte_cdk/test/catalog_builder.py (1)

37-41: Add minimal doc-string and type hint to the new builder helper?

with_destination_sync_mode() is handy! For consistency with with_sync_mode(), would you like to:

  1. add a short doc-string describing the param, and
  2. return self with an explicit -> "ConfiguredAirbyteStreamBuilder" (already present but worth keeping)?

Small thing, but it keeps the fluent API self-documenting – wdyt?

airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (2)

185-191: Potential key override when merging inferred fields

value.update(extra_fields) will silently overwrite keys such as "type" if an inferrer returns a colliding key.
Should we detect collisions and either raise or log a warning to avoid hard-to-debug schema issues? Something like:

for k in extra_fields:
    if k in value:
        logger.warning("Overwriting %s in inferred property fields", k)
value.update(extra_fields)

Would that be helpful, wdyt?
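A self-contained, runnable version of that suggestion (the free-function name `merge_inferred_fields` is made up for illustration; the loader inlines this logic):

```python
import logging
from typing import Any, MutableMapping

logger = logging.getLogger(__name__)


def merge_inferred_fields(
    value: MutableMapping[str, Any],
    extra_fields: MutableMapping[str, Any],
) -> None:
    # Warn before overwriting existing keys such as "type", so colliding
    # inferrer output is easy to trace instead of silently changing the schema.
    for key in extra_fields:
        if key in value:
            logger.warning("Overwriting %s in inferred property fields", key)
    value.update(extra_fields)
```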


295-303: Minor typing nit – remove the type: ignore?

Since _get_airbyte_type now returns Dict[str, Any], the mypy ignore on the deepcopy return may no longer be needed.
Could we try dropping the # type: ignore and see if the type checker is happy? wdyt?

airbyte_cdk/destinations/destination.py (2)

30-90: Duplicated CLI parser – reuse existing helper to avoid drift?

The new parse_args() is almost identical to the one in airbyte_cdk/entrypoint.py.
Would it make sense to extract a shared helper (e.g., cdk.cli_utils.parse_destination_args) so that future flag changes don’t need to be duplicated in two places? wdyt?


138-141: Static wrapper seems redundant

Since the free-function parse_args already exists in the module, the @staticmethod wrapper could be removed and callers can import the function directly, reducing one level of indirection.
Is the extra layer intentional for API stability, or could we drop it, wdyt?

unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1)

426-429: Well-structured mock helper function.

This utility function elegantly creates a mock Retriever that returns the specified HTTP response body, making the new tests cleaner and more readable. What do you think about adding a docstring to clarify its purpose for future contributors? wdyt?

def _mock_schema_loader_retriever(http_response_body) -> Retriever:
+    """
+    Create a mock Retriever that returns the specified HTTP response body.
+
+    Args:
+        http_response_body: The HTTP response body to be returned by the retriever
+
+    Returns:
+        A mock Retriever instance
+    """
     retriever = Mock(spec=Retriever)
     retriever.read_records.return_value = iter([http_response_body])
     return retriever
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ce2a7bb and f2c993e.

📒 Files selected for processing (6)
  • airbyte_cdk/destinations/destination.py (4 hunks)
  • airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (8 hunks)
  • airbyte_cdk/test/catalog_builder.py (2 hunks)
  • airbyte_cdk/test/mock_http/request.py (1 hunks)
  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (3 hunks)
  • unit_tests/test/mock_http/test_request.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (3)
airbyte_cdk/test/mock_http/request.py (1)
airbyte_cdk/test/mock_http/response.py (1)
  • body (19-20)
airbyte_cdk/destinations/destination.py (6)
airbyte_cdk/entrypoint.py (2)
  • parse_args (65-118)
  • discover (224-233)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
  • discover (175-181)
unit_tests/test_entrypoint.py (1)
  • discover (55-56)
airbyte_cdk/sources/abstract_source.py (1)
  • discover (85-90)
unit_tests/sources/test_source.py (2)
  • discover (47-48)
  • catalog (70-93)
airbyte_cdk/models/airbyte_protocol.py (1)
  • AirbyteMessage (79-88)
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (3)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • ComplexFieldType (806-808)
airbyte_cdk/sources/source.py (1)
  • ExperimentalClassWarning (24-25)
unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1)
  • infer (436-437)
⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (12)
airbyte_cdk/test/mock_http/request.py (1)

75-79: Excellent error handling improvement!

This is a great addition for making the code more robust by handling JSON decode errors gracefully. Instead of raising an exception when a string isn't valid JSON, returning None is appropriate since the comparison should fail anyway when one body is a mapping and another isn't. This change helps prevent test failures that would happen depending on the order of evaluation.

The comment you included also explains the reasoning clearly, which is helpful for future maintainers.

unit_tests/test/mock_http/test_request.py (1)

160-165: Good test case addition!

This test effectively verifies that when one request has a mapping body and the other has a non-JSON string body, the matches method correctly returns False. It's a nice complement to the error handling added in the _to_mapping method.

It's worth noting that without the error handling added to the _to_mapping method, this test could fail depending on which request is evaluated first when checking for a match. Great job on covering this edge case!

airbyte_cdk/test/catalog_builder.py (1)

5-6: Import looks good – just a quick check on dependency boundaries

Nice addition 👍. Just make sure DestinationSyncMode is available in all test environments (some downstream projects run the test helpers without the full protocol package).
Maybe add a defensive try / except ImportError around the import, or document the extra requirement, wdyt?

airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (2)

160-162: Default for allow_additional_properties – safe by default?

The new flag defaults to True. For many connectors we’d like schemas to be strict unless the author explicitly opts-in.
Have we considered defaulting to False to follow “secure by default” principles, or at least logging a warning when it’s True, wdyt?
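To make the trade-off concrete, here is what the flag changes in the emitted schema (a plain-dict sketch for illustration, not the loader's actual code):

```python
from typing import Any, Dict


def build_schema(
    properties: Dict[str, Any],
    allow_additional_properties: bool = True,
) -> Dict[str, Any]:
    # Sketch of how the flag is wired straight into the output JSON schema:
    # True lets records carry undeclared fields; False makes the schema strict.
    return {
        "$schema": "https://json-schema.org/draft-07/schema#",
        "type": "object",
        "additionalProperties": allow_additional_properties,
        "properties": properties,
    }
```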


196-200: Great touch exposing additionalProperties

Just a 👍 that wiring this flag straight into the output schema is elegant and keeps the JSON Schema minimal.

airbyte_cdk/destinations/destination.py (1)

164-166: Forward config to discover() for symmetry

If we adopt the earlier suggestion to pass config, we’d need:

-            yield AirbyteMessage(type=Type.CATALOG, catalog=self.discover())
+            yield AirbyteMessage(type=Type.CATALOG, catalog=self.discover(config=config))

Keeps runtime behaviour consistent – wdyt?

unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (6)

7-8: Imports properly updated for the new functionality.

The additional imports make sense for supporting the new test features related to type handling in additional properties and mocking. Good job keeping imports organized.


18-26: Comprehensive imports for the new schema testing capabilities.

Clear organization of imports to support the new testing functionality. The explicit import of AdditionalPropertyFieldsInferrer makes the code's intent easier to follow.


32-32: Good use of a constant for parameters.

Creating a consistent _ANY_PARAMETERS constant helps maintain uniformity across tests. Nice touch!


432-437: Well-implemented test inferrer class.

This test implementation of AdditionalPropertyFieldsInferrer is clean and focused - it returns predefined properties which simplifies testing. The interface is clear and follows good practices.


476-507: Thorough test for additional properties configuration.

The test clearly demonstrates that when allow_additional_properties=False and no inferrer is provided, the schema correctly sets additionalProperties: False while maintaining the expected property definitions.

The setup is clean with good separation of concerns between this test and the previous one, allowing for focused testing of each feature.


440-473:

✅ Verification successful

Comprehensive test for property field inferrer functionality.

This test effectively verifies that additional property fields are correctly merged into the schema and that additionalProperties is set to False when configured. The use of dictionary union operator on line 447 is clean and modern.

Note: The dictionary merge operator (|) requires Python 3.9+. Is this compatibility requirement intentional and documented elsewhere in the project? wdyt?


🏁 Script executed:

#!/bin/bash
# Check the required Python version in setup.py or similar files

echo "Checking Python version requirements..."
cat setup.py | grep -E "python_requires|python_version"

Length of output: 187


Let’s check if the project actually pins Python 3.9+ in its config files (e.g. pyproject.toml, setup.cfg, tox.ini):


🏁 Script executed:

#!/bin/bash
# Searching for Python version constraints in project config files
echo "Searching for 'python_requires' or 'requires-python' across the repository..."
rg -n 'python_requires|requires-python' .
echo "Searching for version specifiers '>=3.' across the repository..."
rg -n '>= *3\.[0-9]+' .

Length of output: 21953


Python 3.9+ support is already guaranteed

The project’s pyproject.toml specifies python = ">=3.10,<3.13", so using the | operator is fully supported and intentional. No further changes needed here.

• See pyproject.toml (line 32): python = ">=3.10,<3.13"

Comment on lines +116 to +146
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
class AdditionalPropertyFieldsInferrer(ABC):
    """
    Infers additional fields to be added to each property. For example, if this inferrer returns {"toto": "tata"}, a property that would have looked like this:
    ```
    "properties": {
        "Id": {
            "type": ["null", "string"],
        },
        <...>
    }
    ```
    ... will look like this:
    ```
    "properties": {
        "Id": {
            "type": ["null", "string"],
            "toto": "tata"
        },
        <...>
    }
    ```
    """

    @abstractmethod
    def infer(self, property_definition: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        """
        Infers additional property fields from the given property definition.
        """
        pass
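The docstring's before/after example can be exercised end to end with a throwaway concrete inferrer (`StaticFieldsInferrer` is hypothetical, and the ABC is re-declared locally so the sketch stands alone; the merge loop mirrors the loader's `value.update(extra_fields)` step):

```python
from abc import ABC, abstractmethod
from typing import Any, MutableMapping


class AdditionalPropertyFieldsInferrer(ABC):  # local re-declaration for the sketch
    @abstractmethod
    def infer(self, property_definition: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        ...


class StaticFieldsInferrer(AdditionalPropertyFieldsInferrer):
    """Hypothetical inferrer that adds the same extra fields to every property."""

    def __init__(self, extra_fields: MutableMapping[str, Any]) -> None:
        self._extra_fields = extra_fields

    def infer(self, property_definition: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        # Return a fresh dict so callers are free to merge it without aliasing.
        return dict(self._extra_fields)


# Mimic the loader's merge step on the docstring's example:
properties = {"Id": {"type": ["null", "string"]}}
inferrer = StaticFieldsInferrer({"toto": "tata"})
for value in properties.values():
    value.update(inferrer.infer(value))
```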

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider making the inferrer contract read-only

infer() receives a mutable property_definition. Accidental in-place edits by an inferrer could leak into the final schema.
Would switching the parameter type from MutableMapping to Mapping (and passing a copy) help protect against that?

-    def infer(self, property_definition: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
+    def infer(self, property_definition: Mapping[str, Any]) -> Mapping[str, Any]:

That way implementers must return a new dict, reducing side-effects – wdyt?

@maxi297 maxi297 changed the title [WIP] destination discover PoC and adding sync modes feat: destination discover PoC and adding sync modes May 5, 2025
Comment on lines +138 to +140
@staticmethod
def parse_args(args: List[str]) -> argparse.Namespace:
    return parse_args(args)

In terms of patterns, I think my preference here, and my proposed best practice (eventually), is that we make this a class method on the base connector class. I would call it "launch", and then every connector class would be able to invoke itself. In theory, all sources and destinations could share the same implementation across connectors of the same type, and unless the connector needs something special (a code smell anyway), the "cls" input arg should be all you need in order to instantiate a connector using CLI args.

I don't think we need to tackle all the scope here in this PR, but your implementation is already very close to what I think is the ideal state, so I'll mention it here as a non-blocking proposal.

@maxi297 (author) replied:

That makes sense to me. I started diverging a bit from the current solution because run_cmd is not static, so we need to instantiate the object before calling it, but the way we instantiate the destination depends on which method of the protocol is called (see this piece of code for an example).

So do we agree that the launch method would be static? If so, I think we are moving in the right direction and I can start adding this in later changes. If not, I'll need more information about what you had in mind.

@aaronsteers commented May 6, 2025

@maxi297 - The difference between a static method and a class method for this use case is just that a class method knows what class it is (static methods don't get a cls input), and that class method implementations can be inherited by subclasses. So, yes, agreed the method is static in the sense that it doesn't need the object, but in order to not need to implement it on each class, I think making it a class method is slightly cleaner in the long run. Sorry if I'm not good at explaining this, and please don't block on my comment - either way, it's a step in the right direction, I think, if the class knows how to instantiate itself, without needing an external class or function in order to be instantiated. 👍


@maxi297 - Here's the early implementation I wrote for S3 a while back...

    @classmethod
    def launch(cls, args: list[str] | None = None) -> None:
        """Launch the source using the provided CLI args.

        If no args are provided, the launch args will be inferred automatically.

        In the future, we should consider moving this method to the Connector base class,
        so that all sources and destinations can launch themselves and so none of this
        code needs to live in the connector itself.
        """
        args = args or sys.argv[1:]
        catalog_path = AirbyteEntrypoint.extract_catalog(args)
        # TODO: Delete if not needed:
        # config_path = AirbyteEntrypoint.extract_config(args)
        # state_path = AirbyteEntrypoint.extract_state(args)

        source = cls.create(
            configured_catalog_path=Path(catalog_path) if catalog_path else None,
        )
        # The following function will wrap the execution in proper error handling.
        # Failures prior to here may not emit proper Airbyte TRACE or CONNECTION_STATUS messages.
        launch(
            source=source,
            args=args,
        )

This wasn't a very good implementation, but you can see the generic cls.create() ref.

Where cls.create() was defined as:

    @classmethod
    def create(
        cls,
        *,
        configured_catalog_path: Path | str | None = None,
    ) -> SourceS3:
        """Create a new instance of the source.
        ...
        # A bunch of hacky stuff here.
        ...
        return cls(
            # These are the defaults for the source. No need for a caller to change them:
            stream_reader=SourceS3StreamReader(),
            spec_class=Config,
            cursor_cls=Cursor,
            # This is needed early. (We also will provide it again later.)
            catalog=configured_catalog,
            # These will be provided later, after we have wrapped proper error handling.
            config=None,
            state=None,
        )


Another problem with our current design is that we ask for config, catalog, etc in the constructor, but then pass it again during invocation of 'check', 'discover', etc. I think it would be better to not need these in the constructor at all, in which case we greatly simplify the process of creating a connector class, and we put all code that can fail in a code path that can properly message about any failures.

Again - hopefully this is helpful for long-term thinking, but I don't think it needs to block the current PR.

@maxi297 (author) replied:

classmethod makes sense, yes!

And I would prefer to have these as part of the constructor personally, because if we don't, each method will need to instantiate the object it needs every time it is called, or we'd need some kind of intelligent accessor that checks whether the field has already been instantiated and, if not, instantiates it. That feels complex compared to just instantiating the connector properly in launch. WDYT?

@github-actions github-actions bot added the enhancement New feature or request label May 6, 2025
@coderabbitai bot left a comment

Actionable comments posted: 1

🔭 Outside diff range comments (1)
unit_tests/destinations/test_destination.py (1)

1-1: ⚠️ Potential issue

Fix Ruff formatting issues

There's a pipeline failure indicating Ruff formatting issues. Could you run ruff format to fix the code style issues?

#!/bin/bash
# Run Ruff formatter to fix code style issues
ruff format unit_tests/destinations/test_destination.py
🧰 Tools
🪛 GitHub Actions: Linters

[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.

🧹 Nitpick comments (1)
unit_tests/destinations/test_destination.py (1)

49-54: Consider testing the debug flag when explicitly set to True

The test cases are verifying the default debug=False behavior, but there's no test case that verifies what happens when debug=True is explicitly set. Would it be helpful to add a test case like this to verify the flag works correctly when enabled, wdyt?

        [
            (["spec"], {"command": "spec", "debug": False}),
+           (["spec", "--debug"], {"command": "spec", "debug": True}),
            (["check", "--config", "bogus_path/"], {"command": "check", "config": "bogus_path/", "debug": False}),
            (
                ["write", "--config", "config_path1", "--catalog", "catalog_path1"],
                {"command": "write", "config": "config_path1", "catalog": "catalog_path1", "debug": False},
            ),
        ],
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f2c993e and c6c8d8f.

📒 Files selected for processing (1)
  • unit_tests/destinations/test_destination.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
unit_tests/destinations/test_destination.py

[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.

⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Analyze (python)

Comment on lines +49 to 54
        (["spec"], {"command": "spec", "debug": False}),
        (["check", "--config", "bogus_path/"], {"command": "check", "config": "bogus_path/", "debug": False}),
        (
            ["write", "--config", "config_path1", "--catalog", "catalog_path1"],
-           {"command": "write", "config": "config_path1", "catalog": "catalog_path1"},
+           {"command": "write", "config": "config_path1", "catalog": "catalog_path1", "debug": False},
        ),

🛠️ Refactor suggestion

Test coverage gap for the new "discover" command

I notice the test parameterization has been updated to include "debug": False in the expected outputs, which aligns with your new debug logging functionality. However, I don't see any test cases for the new discover command mentioned in the PR summary. Should we add a test case for this new functionality, wdyt?

        [
            (["spec"], {"command": "spec", "debug": False}),
            (["check", "--config", "bogus_path/"], {"command": "check", "config": "bogus_path/", "debug": False}),
            (
                ["write", "--config", "config_path1", "--catalog", "catalog_path1"],
                {"command": "write", "config": "config_path1", "catalog": "catalog_path1", "debug": False},
            ),
+           (
+               ["discover", "--config", "config_path1"],
+               {"command": "discover", "config": "config_path1", "debug": False},
+           ),
        ],
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
        (["spec"], {"command": "spec", "debug": False}),
        (["check", "--config", "bogus_path/"], {"command": "check", "config": "bogus_path/", "debug": False}),
        (
            ["write", "--config", "config_path1", "--catalog", "catalog_path1"],
-           {"command": "write", "config": "config_path1", "catalog": "catalog_path1"},
+           {"command": "write", "config": "config_path1", "catalog": "catalog_path1", "debug": False},
        ),
[
(["spec"], {"command": "spec", "debug": False}),
(["check", "--config", "bogus_path/"], {"command": "check", "config": "bogus_path/", "debug": False}),
(
["write", "--config", "config_path1", "--catalog", "catalog_path1"],
{"command": "write", "config": "config_path1", "catalog": "catalog_path1", "debug": False},
),
(
["discover", "--config", "config_path1"],
{"command": "discover", "config": "config_path1", "debug": False},
),
],


return {
"$schema": "https://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": True,

💎 Nice. Appreciate that we're following JSON Schema standards for communicating this. 👍
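As a sketch of how a builder might expose this toggle (hypothetical helper, not the CDK's API), the `allow_additional_properties` flag maps directly onto the draft-07 `additionalProperties` keyword:

```python
from typing import Any, Dict


def build_json_schema(
    properties: Dict[str, Dict[str, Any]],
    allow_additional_properties: bool = True,
) -> Dict[str, Any]:
    # additionalProperties is the standard JSON Schema draft-07 keyword:
    # True permits unknown fields, False rejects them at validation time.
    return {
        "$schema": "https://json-schema.org/draft-07/schema#",
        "type": "object",
        "additionalProperties": allow_additional_properties,
        "properties": properties,
    }
```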

@aaronsteers left a comment

@maxi297 - I have reviewed all the code here and it looks good. 👍

Thanks for responding to my comments - and again, nothing blocking there.

What I didn't find was a definition of DestinationSyncMode or any changes to sync modes. I did want to review those - can you point me to them?

Comment on lines +138 to +140
@staticmethod
def parse_args(args: List[str]) -> argparse.Namespace:
return parse_args(args)

@maxi297 - Here's the early implementation I wrote for S3 a while back...

    @classmethod
    def launch(cls, args: list[str] | None = None) -> None:
        """Launch the source using the provided CLI args.

        If no args are provided, the launch args will be inferred automatically.

        In the future, we should consider moving this method to the Connector base class,
        so that all sources and destinations can launch themselves and so none of this
        code needs to live in the connector itself.
        """
        args = args or sys.argv[1:]
        catalog_path = AirbyteEntrypoint.extract_catalog(args)
        # TODO: Delete if not needed:
        # config_path = AirbyteEntrypoint.extract_config(args)
        # state_path = AirbyteEntrypoint.extract_state(args)

        source = cls.create(
            configured_catalog_path=Path(catalog_path) if catalog_path else None,
        )
        # The following function will wrap the execution in proper error handling.
        # Failures prior to here may not emit proper Airbyte TRACE or CONNECTION_STATUS messages.
        launch(
            source=source,
            args=args,
        )

This wasn't a very good implementation, but you can see the generic cls.create() ref.

Where cls.create() was defined as:

    @classmethod
    def create(
        cls,
        *,
        configured_catalog_path: Path | str | None = None,
    ) -> SourceS3:
        """Create a new instance of the source.
        ...
        # A bunch of hacky stuff here.
        ...
        return cls(
            # These are the defaults for the source. No need for a caller to change them:
            stream_reader=SourceS3StreamReader(),
            spec_class=Config,
            cursor_cls=Cursor,
            # This is needed early. (We also will provide it again later.)
            catalog=configured_catalog,
            # These will be provided later, after we have wrapped proper error handling.
            config=None,
            state=None,
        )

Comment on lines +138 to +140
@staticmethod
def parse_args(args: List[str]) -> argparse.Namespace:
return parse_args(args)

Another problem with our current design is that we ask for config, catalog, etc. in the constructor, but then pass them again during invocation of 'check', 'discover', etc. I think it would be better to not need these in the constructor at all; that would greatly simplify creating a connector class and put all code that can fail in a code path that can properly message about any failures.

Again - hopefully this is helpful for long-term thinking, but I don't think it needs to block the current PR.
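One way to sketch that inversion (illustrative names only, assuming nothing about the CDK's actual classes): the constructor stays dependency-free, and config/catalog arrive with the command invocation, inside the error-handling boundary.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class ConfiguredCatalog:
    streams: List[str]


class Connector:
    # No config or catalog in the constructor: construction cannot fail on
    # user input, so every failure happens inside a command handler that
    # can emit proper TRACE / CONNECTION_STATUS messages.
    def check(self, config: Dict[str, Any]) -> bool:
        return bool(config.get("api_key"))

    def write(self, config: Dict[str, Any], catalog: ConfiguredCatalog) -> int:
        if not self.check(config):
            raise ValueError("invalid config")
        return len(catalog.streams)
```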

@coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
pyproject.toml (1)

94-98: Document Custom Source Rationale
The testpypi supplemental source is added solely to fetch the dev protocol-models-dataclasses package. Could you add a brief comment above this block explaining that it's temporary until the next official release? This will help future maintainers understand its purpose. wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c6c8d8f and af0640c.

⛔ Files ignored due to path filters (1)
  • poetry.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
  • airbyte_cdk/destinations/destination.py (4 hunks)
  • airbyte_cdk/models/__init__.py (1 hunks)
  • airbyte_cdk/test/catalog_builder.py (3 hunks)
  • pyproject.toml (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • airbyte_cdk/models/__init__.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • airbyte_cdk/test/catalog_builder.py
  • airbyte_cdk/destinations/destination.py
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: preview_docs
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
pyproject.toml (1)

33-33:

✅ Verification successful

Verify Dev Pre-release Version Usage
You've pinned airbyte-protocol-models-dataclasses to a specific dev build (0.15.0.dev1746621859) from the testpypi index. Would you consider confirming that this version is stable and available (and that it won't get yanked), or switching to a released SemVer once it's out? wdyt?


🏁 Script executed:

#!/bin/bash
# Check if the dev version exists on testpypi
pip index versions airbyte-protocol-models-dataclasses \
  --extra-index-url https://test.pypi.org/simple/ | grep "0.15.0.dev1746621859"

Length of output: 258


Double-check Test PyPI for the dev version
It looks like the pip index check didn’t surface 0.15.0.dev1746621859—let’s hit the Test PyPI JSON API directly to confirm whether this build is actually published (or if we should reconsider pinning to it). wdyt?


🏁 Script executed:

#!/bin/bash
# Verify if the specific dev version exists on Test PyPI’s JSON endpoint
curl -s https://test.pypi.org/pypi/airbyte-protocol-models-dataclasses/json \
  | grep '"0.15.0.dev1746621859"'

Length of output: 65646


Dev build confirmed—0.15.0.dev1746621859 is live and not yanked

  • pyproject.toml (line 33): Test PyPI JSON metadata shows "version":"0.15.0.dev1746621859" and "yanked":false.

Looks safe to pin—would you still like to switch to the first official 0.15.0 release once it’s out? wdyt?

@coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/models/airbyte_protocol_serializers.py (1)

29-39: The DestinationOperationType implementation follows good patterns, but could we clarify the schema field handling?

The class correctly mirrors the structure of AirbyteStateBlobType, and the special handling for the `schema_` to `schema` field mapping addresses what appears to be a dataclass naming constraint. However, would it be helpful to expand the comment to explain why this field mapping is necessary? For example, is schema a reserved word in the dataclass context, or does it conflict with a method name?

Also, should we consider adding some basic validation in the deserialize method to ensure the input dictionary has the expected structure, wdyt?

-        # one field is named `schema` in the DestinationOperation which renders it as `schema_`. We need to reserialize this properly
+        # The `schema` field is automatically renamed to `schema_` in the dataclass to avoid conflicts.
+        # During serialization, we need to map it back to the original `schema` field name.
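A minimal round-trip of that rename in plain Python (the real code delegates to serpyco_rs; the class and helpers here are illustrative):

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict


@dataclass
class DestinationOperation:
    object_name: str
    # The generated dataclass stores the protocol's `schema` field as
    # `schema_`, so (de)serialization has to map the name back and forth.
    schema_: Dict[str, Any] = field(default_factory=dict)


def serialize(op: DestinationOperation) -> Dict[str, Any]:
    data = asdict(op)
    data["schema"] = data.pop("schema_")  # emit the wire-format name
    return data


def deserialize(data: Dict[str, Any]) -> DestinationOperation:
    payload = dict(data)
    payload["schema_"] = payload.pop("schema")  # restore the attribute name
    return DestinationOperation(**payload)
```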
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8c412b4 and 94209aa.

📒 Files selected for processing (1)
  • airbyte_cdk/models/airbyte_protocol_serializers.py (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/models/airbyte_protocol_serializers.py (2)

4-14: Import reorganization looks clean!

The move to import from airbyte_protocol_dataclasses.models and the addition of DestinationOperation align well with the PR's objective to support destination functionality. The serpyco_rs imports are properly positioned.


41-46: Custom type resolver extension looks solid!

The addition of the DestinationOperation case maintains consistency with the existing AirbyteStateBlob handling and properly returns the new custom type instance.
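The dispatch pattern described here can be sketched generically: the resolver inspects a type and either returns a custom handler or None to fall back to the serializer's default behavior. All names below are illustrative, not the serpyco_rs API.

```python
from typing import Any, Optional, Type


class CustomHandler:
    def __init__(self, target: Type[Any]) -> None:
        self.target = target


class BlobHandler(CustomHandler): ...
class OperationHandler(CustomHandler): ...


class AirbyteStateBlob: ...
class DestinationOperation: ...


def custom_type_resolver(t: Type[Any]) -> Optional[CustomHandler]:
    # Each special-cased model gets its own handler; every other type
    # returns None so the serializer applies its default behavior.
    if t is AirbyteStateBlob:
        return BlobHandler(t)
    if t is DestinationOperation:
        return OperationHandler(t)
    return None
```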

@coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/models/airbyte_protocol.py (1)

42-47: Consider adding type validation for positional arguments?

The current implementation assumes that all positional arguments are dict-like objects that can be passed to update(). Would it be safer to add some type checking here, wdyt? For example:

 def __init__(self, *args: Any, **kwargs: Any) -> None:
     # Set any attribute passed in through kwargs
     for arg in args:
+        if not hasattr(arg, 'items'):
+            raise TypeError(f"Positional argument must be dict-like, got {type(arg)}")
         self.__dict__.update(arg)
     for key, value in kwargs.items():
         setattr(self, key, value)

This would prevent runtime errors if non-dict objects are passed as positional arguments.
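Concretely, the guarded constructor and the `__eq__` discussed in this review could look like the following standalone sketch (not the CDK source):

```python
from typing import Any


class StateBlob:
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        for arg in args:
            # Reject non-mapping positional args before they corrupt __dict__.
            if not hasattr(arg, "items"):
                raise TypeError(f"Positional argument must be dict-like, got {type(arg)}")
            self.__dict__.update(arg)
        for key, value in kwargs.items():
            setattr(self, key, value)

    def __eq__(self, other: object) -> bool:
        # Comparisons against other types are explicitly False.
        if not isinstance(other, StateBlob):
            return False
        return self.__dict__ == other.__dict__
```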

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f9a7ed2 and 0f785c1.

📒 Files selected for processing (2)
  • airbyte_cdk/models/airbyte_protocol.py (2 hunks)
  • airbyte_cdk/models/airbyte_protocol_serializers.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • airbyte_cdk/models/airbyte_protocol_serializers.py
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: Publish SDM to DockerHub
  • GitHub Check: Analyze (python)
🔇 Additional comments (3)
airbyte_cdk/models/airbyte_protocol.py (3)

5-8: Clear explanation for the workaround!

The comment effectively explains why this redefinition is necessary and provides helpful context about the TypeError issue. This will help future maintainers understand the reasoning behind this approach.


49-54: The equality method looks solid!

The __eq__ implementation correctly handles type checking and compares the internal dictionaries. The explicit False return for non-AirbyteStateBlob instances is a good defensive programming practice.


93-93: New destination_catalog field aligns well with PR objectives!

This addition supports the new destination discovery feature mentioned in the PR. The optional typing and type ignore comment are consistent with the existing pattern in this dataclass.

Labels: enhancement (New feature or request)
3 participants