diff --git a/airbyte_cdk/connector_builder/README.md b/airbyte_cdk/cli/connector_builder_test_read/README.md similarity index 100% rename from airbyte_cdk/connector_builder/README.md rename to airbyte_cdk/cli/connector_builder_test_read/README.md diff --git a/airbyte_cdk/cli/connector_builder_test_read/__init__.py b/airbyte_cdk/cli/connector_builder_test_read/__init__.py new file mode 100644 index 000000000..8e3879592 --- /dev/null +++ b/airbyte_cdk/cli/connector_builder_test_read/__init__.py @@ -0,0 +1,17 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +"""CLI for running test reads from the Airbyte Connector Builder. + +This CLI accepts a config file and an action to perform on the connector. + +Usage: + connector-builder-test-read [--config CONFIG] [--action ACTION] + +Options: + --action ACTION The action to perform (e.g., test, validate). + --config CONFIG Path to the config file. +""" +from airbyte_cdk.cli.connector_builder_test_read.run import run + +__all__ = [ + "run", +] diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/cli/connector_builder_test_read/connector_builder_handler.py similarity index 96% rename from airbyte_cdk/connector_builder/connector_builder_handler.py rename to airbyte_cdk/cli/connector_builder_test_read/connector_builder_handler.py index e6c9a3f3f..ea3137336 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/cli/connector_builder_test_read/connector_builder_handler.py @@ -16,6 +16,9 @@ from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import ( + INJECTED_MANIFEST, +) from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( ModelToComponentFactory, ) @@ -50,7 +53,7 @@ def 
get_limits(config: Mapping[str, Any]) -> TestReadLimits: def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> ManifestDeclarativeSource: - manifest = config["__injected_declarative_manifest"] + manifest = config[INJECTED_MANIFEST] return ManifestDeclarativeSource( config=config, emit_connector_builder_messages=True, diff --git a/airbyte_cdk/connector_builder/message_grouper.py b/airbyte_cdk/cli/connector_builder_test_read/message_grouper.py similarity index 100% rename from airbyte_cdk/connector_builder/message_grouper.py rename to airbyte_cdk/cli/connector_builder_test_read/message_grouper.py diff --git a/airbyte_cdk/connector_builder/models.py b/airbyte_cdk/cli/connector_builder_test_read/models.py similarity index 100% rename from airbyte_cdk/connector_builder/models.py rename to airbyte_cdk/cli/connector_builder_test_read/models.py diff --git a/airbyte_cdk/connector_builder/main.py b/airbyte_cdk/cli/connector_builder_test_read/run.py similarity index 96% rename from airbyte_cdk/connector_builder/main.py rename to airbyte_cdk/cli/connector_builder_test_read/run.py index e122cee8c..029a22966 100644 --- a/airbyte_cdk/connector_builder/main.py +++ b/airbyte_cdk/cli/connector_builder_test_read/run.py @@ -96,12 +96,17 @@ def handle_request(args: List[str]) -> str: ).decode() # type: ignore[no-any-return] # Serializer.dump() always returns AirbyteMessage -if __name__ == "__main__": +def run(args: List[str] | None) -> None: + args = args or sys.argv[1:] try: - print(handle_request(sys.argv[1:])) + print(handle_request(args)) except Exception as exc: error = AirbyteTracedException.from_exception( exc, message=f"Error handling request: {str(exc)}" ) m = error.as_airbyte_message() print(orjson.dumps(AirbyteMessageSerializer.dump(m)).decode()) + + +if __name__ == "__main__": + run() diff --git a/airbyte_cdk/connector_builder/__init__.py b/airbyte_cdk/connector_builder/__init__.py deleted file mode 100644 index c941b3045..000000000 --- 
a/airbyte_cdk/connector_builder/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index efc779464..6b75368c5 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -55,7 +55,11 @@ class ManifestDeclarativeSource(DeclarativeSource): - """Declarative source defined by a manifest of low-code components that define source connector behavior""" + """Declarative source defined by a manifest of low-code components that define source connector behavior. + + This class can also handle custom components if they are provided in the config using the + `__injected_components_py` config key. + """ def __init__( self, diff --git a/airbyte_cdk/test/declarative/__init__.py b/airbyte_cdk/test/declarative/__init__.py new file mode 100644 index 000000000..49ae12e5a --- /dev/null +++ b/airbyte_cdk/test/declarative/__init__.py @@ -0,0 +1,18 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Declarative tests framework. + +This module provides fixtures and utilities for testing Airbyte sources and destinations +in a declarative way. 
"""Models used by the declarative test framework."""

# Use the fully qualified module path: a bare `from scenario import ...` is an
# absolute import and would look for a top-level `scenario` module rather than
# the `scenario` module that lives next to this file.
from airbyte_cdk.test.declarative.models.scenario import (
    AcceptanceTestScenario,
)

__all__ = [
    "AcceptanceTestScenario",
]
class AcceptanceTestScenario(BaseModel):
    """Acceptance test instance, as a Pydantic model.

    This class represents an acceptance test instance, which is a single test case
    that can be run against a connector. It is used to deserialize and validate the
    acceptance test configuration file.
    """

    class AcceptanceTestExpectRecords(BaseModel):
        """Schema of the optional `expect_records` entry of a scenario."""

        path: Path
        exact_order: bool = False

    class AcceptanceTestFileTypes(BaseModel):
        """Schema of the optional `file_types` entry of a scenario."""

        skip_test: bool
        bypass_reason: str

    # Only `config_path` is required; every other field defaults to None.
    config_path: Path
    configured_catalog_path: Path | None = None
    timeout_seconds: int | None = None
    expect_records: AcceptanceTestExpectRecords | None = None
    file_types: AcceptanceTestFileTypes | None = None
    # Expected outcome of the scenario; None when the config does not state one.
    status: Literal["succeed", "failed"] | None = None

    @property
    def expect_exception(self) -> bool:
        """Return True only when the scenario is declared with status "failed".

        A plain comparison is used so that an unset status yields False rather
        than None, which keeps the result a real bool as annotated.
        """
        return self.status == "failed"

    @property
    def instance_name(self) -> str:
        """Return the config file name without its extension."""
        return self.config_path.stem
ACCEPTANCE_TEST_CONFIG_PATH = Path("acceptance-test-config.yml")


def _load_acceptance_tests(
    category: str,
    accept_test_config_path: Path = ACCEPTANCE_TEST_CONFIG_PATH,
) -> list[AcceptanceTestScenario]:
    """Load the scenarios declared for `category` in the acceptance test config.

    Args:
        category: Key under `acceptance_tests` whose `tests` list is loaded.
        accept_test_config_path: YAML file to read.

    Returns:
        One validated scenario per entry, or an empty list when the category is
        not present in the file.

    Raises:
        ValueError: If the file has no `acceptance_tests` key, or the category
            exists but has no `tests` key.
    """
    # An empty YAML file parses to None; treat it like a file without the key.
    all_tests_config = yaml.safe_load(accept_test_config_path.read_text()) or {}
    if "acceptance_tests" not in all_tests_config:
        raise ValueError(f"Acceptance tests config not found in {accept_test_config_path}")

    acceptance_tests = all_tests_config["acceptance_tests"]
    if category not in acceptance_tests:
        return []
    if "tests" not in acceptance_tests[category]:
        raise ValueError(f"No tests found for category {category}")

    return [
        AcceptanceTestScenario.model_validate(test)
        for test in acceptance_tests[category]["tests"]
        # Entries whose config path mentions "iam_role" are skipped.
        if "iam_role" not in test["config_path"]
    ]


class ConnectorTestSuiteBase(abc.ABC):
    """Base class for connector test suites."""

    # NOTE(review): this default names a `.json` file while the loader above reads
    # `acceptance-test-config.yml` - confirm which one is intended.
    acceptance_test_file_path = Path("./acceptance-test-config.json")
    """The path to the acceptance test config file.

    By default, this is set to the `acceptance-test-config.json` file in
    the root of the connector source directory.
    """

    connector_class: type[Connector]
    """The connector class to test."""

    # Public Methods - Subclasses may override these

    @abc.abstractmethod
    def new_connector(self, **kwargs: Any) -> Connector:
        """Create a new connector instance.

        The base implementation instantiates `connector_class` with no arguments.
        Because the method is abstract, subclasses must override it; an override
        may delegate to `super().new_connector()` or build the connector
        dynamically.
        """
        return self.connector_class()

    # Internal Methods - We don't expect subclasses to override these

    @classmethod
    def _get_acceptance_tests(
        cls,
        category: str,
        accept_test_config_path: Path = ACCEPTANCE_TEST_CONFIG_PATH,
    ) -> list[AcceptanceTestScenario]:
        """Return the scenarios for `category`; see `_load_acceptance_tests`."""
        return _load_acceptance_tests(category, accept_test_config_path)

    # Test Definitions

    @pytest.mark.parametrize(
        "test_input,expected",
        [
            ("3+5", 8),
            ("2+4", 6),
            ("6*9", 54),
        ],
    )
    def test_use_plugin_parametrized_test(
        self,
        test_input: str,
        expected: int,
    ) -> None:
        """Check that parametrized tests defined on the base class are run."""
        # The inputs are the hard-coded literals above, never external data.
        assert eval(test_input) == expected

    # The scenario list is computed while the class body executes, so neither
    # `self` nor `cls` exists yet; the module-level loader is called directly.
    # As a consequence the config file is read when this module is imported.
    @pytest.mark.parametrize(
        "instance",
        _load_acceptance_tests("connection"),
        ids=lambda instance: instance.instance_name,
    )
    def test_check(
        self,
        instance: AcceptanceTestScenario,
    ) -> None:
        """Run `connection` acceptance tests."""
        result: entrypoint_wrapper.EntrypointOutput = run_test_job(
            self.new_connector(),
            "check",
            test_instance=instance,
        )
        conn_status_messages: list[AirbyteMessage] = [
            msg for msg in result._messages if msg.type == Type.CONNECTION_STATUS
        ]  # noqa: SLF001  # Non-public API
        assert len(conn_status_messages) == 1, (
            "Expected exactly one CONNECTION_STATUS message. Got: \n"
            # Messages are objects, not strings, so convert before joining.
            + "\n".join(str(msg) for msg in result._messages)
        )
+ """ + + @pytest.mark.parametrize( + "instance", + get_acceptance_tests("full_refresh"), + ids=lambda instance: instance.instance_name, + ) + def test_full_refresh( + self, + instance: AcceptanceTestScenario, + ) -> None: + """Run acceptance tests.""" + result = run_test_job( + self.new_connector(), + "read", + test_instance=instance, + ) + if not result.records: + raise AssertionError("Expected records but got none.") # noqa: TRY003 + + @pytest.mark.parametrize( + "instance", + get_acceptance_tests("basic_read"), + ids=lambda instance: instance.instance_name, + ) + def test_basic_read( + self, + instance: AcceptanceTestScenario, + ) -> None: + """Run acceptance tests.""" + discover_result = run_test_job( + self.new_connector(), + "discover", + test_instance=instance, + ) + assert discover_result.catalog, "Expected a non-empty catalog." + configured_catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=stream, + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.append_dedup, + ) + for stream in discover_result.catalog.catalog.streams + ] + ) + result = run_test_job( + self.new_connector(), + "read", + test_instance=instance, + catalog=configured_catalog, + ) + + if not result.records: + raise AssertionError("Expected records but got none.") # noqa: TRY003 + + @pytest.mark.parametrize( + "instance", + get_acceptance_tests("basic_read"), + ids=lambda instance: instance.instance_name, + ) + def test_fail_with_bad_catalog( + self, + instance: AcceptanceTestScenario, + ) -> None: + """Test that a bad catalog fails.""" + invalid_configured_catalog = ConfiguredAirbyteCatalog( + streams=[ + # Create ConfiguredAirbyteStream which is deliberately invalid + # with regard to the Airbyte Protocol. + # This should cause the connector to fail. 
+ ConfiguredAirbyteStream( + stream=AirbyteStream( + name="__AIRBYTE__stream_that_does_not_exist", + json_schema={ + "type": "object", + "properties": {"f1": {"type": "string"}}, + }, + supported_sync_modes=[SyncMode.full_refresh], + ), + sync_mode="INVALID", + destination_sync_mode="INVALID", + ) + ] + ) + # Set expected status to "failed" to ensure the test fails if the connector. + instance.status = "failed" + result = run_test_job( + self.new_connector(), + "read", + test_instance=instance, + catalog=asdict(invalid_configured_catalog), + ) + assert result.errors, "Expected errors but got none." + assert result.trace_messages, "Expected trace messages but got none." + + @pytest.mark.parametrize( + "instance", + get_acceptance_tests("full_refresh"), + ids=lambda instance: instance.instance_name, + ) + def test_discover( + self, + instance: AcceptanceTestScenario, + ) -> None: + """Run acceptance tests.""" + run_test_job( + self.new_connector(), + "check", + test_instance=instance, + ) diff --git a/airbyte_cdk/test/declarative/utils/__init__.py b/airbyte_cdk/test/declarative/utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/airbyte_cdk/test/declarative/utils/job_runner.py b/airbyte_cdk/test/declarative/utils/job_runner.py new file mode 100644 index 000000000..c7d2c0c86 --- /dev/null +++ b/airbyte_cdk/test/declarative/utils/job_runner.py @@ -0,0 +1,82 @@ +import tempfile +import uuid +from pathlib import Path +from typing import Callable, Literal + +import orjson +from airbyte_connector_tester.instances import AcceptanceTestScenario + +from airbyte_cdk import Connector +from airbyte_cdk.test import entrypoint_wrapper + + +def run_test_job( + connector: Connector | type[Connector] | Callable[[], Connector], + verb: Literal["read", "check", "discover"], + test_instance: AcceptanceTestScenario, + *, + catalog: dict | None = None, +) -> entrypoint_wrapper.EntrypointOutput: + """Run a test job from provided CLI args and return the result.""" 
+ if not connector: + raise ValueError("Connector is required") + + connector_obj: Connector + if isinstance(connector, type): + connector_obj = connector() + elif isinstance(connector, Connector): + connector_obj = connector + elif isinstance(connector, Callable): + try: + connector_obj = connector() + except Exception as ex: + if not test_instance.expect_exception: + raise + + return entrypoint_wrapper.EntrypointOutput( + messages=[], + uncaught_exception=ex, + ) + else: + raise ValueError(f"Invalid source type: {type(connector)}") + + args = [verb] + if test_instance.config_path: + args += ["--config", str(test_instance.config_path)] + + catalog_path: Path | None = None + if verb not in ["discover", "check"]: + if catalog: + # Write the catalog to a temp json file and pass the path to the file as an argument. + catalog_path = ( + Path(tempfile.gettempdir()) + / "airbyte-test" + / f"temp_catalog_{uuid.uuid4().hex}.json" + ) + catalog_path.parent.mkdir(parents=True, exist_ok=True) + catalog_path.write_text(orjson.dumps(catalog).decode()) + elif test_instance.configured_catalog_path: + catalog_path = Path(test_instance.configured_catalog_path) + + if catalog_path: + args += ["--catalog", str(catalog_path)] + + # This is a bit of a hack because the source needs the catalog early. + # Because it *also* can fail, we have ot redundantly wrap it in a try/except block. 
+ + result: entrypoint_wrapper.EntrypointOutput = entrypoint_wrapper._run_command( # noqa: SLF001 # Non-public API + source=connector_obj, + args=args, + expecting_exception=test_instance.expect_exception, + ) + if result.errors and not test_instance.expect_exception: + raise AssertionError( + "\n\n".join( + [str(err.trace.error).replace("\\n", "\n") for err in result.errors], + ) + ) + + if test_instance.expect_exception and not result.errors: + raise AssertionError("Expected exception but got none.") # noqa: TRY003 + + return result diff --git a/airbyte_cdk/test/fixtures/__init__.py b/airbyte_cdk/test/fixtures/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/airbyte_cdk/test/fixtures/auto.py b/airbyte_cdk/test/fixtures/auto.py new file mode 100644 index 000000000..8f7a34403 --- /dev/null +++ b/airbyte_cdk/test/fixtures/auto.py @@ -0,0 +1,14 @@ +"""Auto-use fixtures for pytest. + +WARNING: Importing this module will automatically apply these fixtures. If you want to selectively +enable fixtures in a different context, you can import directly from the `fixtures.general` module. + + +Usage: + +```python +from airbyte_cdk.test.fixtures import auto +# OR +from airbyte_cdk.test.fixtures.auto import * +``` +""" diff --git a/airbyte_cdk/test/fixtures/general.py b/airbyte_cdk/test/fixtures/general.py new file mode 100644 index 000000000..fe3d7b317 --- /dev/null +++ b/airbyte_cdk/test/fixtures/general.py @@ -0,0 +1,14 @@ +"""General fixtures for pytest. 
def pytest_collect_file(parent, file_path):
    """Collect any file named `test_connector.py` as a pytest module.

    Args:
        parent: The collector node that will own the new module.
        file_path: `pathlib.Path` of the file being considered for collection.

    Returns:
        A `pytest.Module` for `test_connector.py` files, otherwise None so that
        other collectors can handle the file.
    """
    # The pathlib-based `file_path` hook argument is used because the result is
    # passed to `Module.from_parent(path=...)`, which expects a `pathlib.Path`;
    # the legacy `path` argument is a `py.path.local` object instead.
    if file_path.name == "test_connector.py":
        return pytest.Module.from_parent(parent, path=file_path)
    return None
https://github.com/airbytehq/airbyte-python-cdk/issues/12 diff --git a/unit_tests/test/test_declarative_test_suites.py b/unit_tests/test/test_declarative_test_suites.py new file mode 100644 index 000000000..e69de29bb