diff --git a/airbyte_cdk/test/entrypoint_wrapper.py b/airbyte_cdk/test/entrypoint_wrapper.py index 43c84204a..79c328203 100644 --- a/airbyte_cdk/test/entrypoint_wrapper.py +++ b/airbyte_cdk/test/entrypoint_wrapper.py @@ -82,6 +82,10 @@ def records(self) -> List[AirbyteMessage]: def state_messages(self) -> List[AirbyteMessage]: return self._get_message_by_types([Type.STATE]) + @property + def spec_messages(self) -> List[AirbyteMessage]: + return self._get_message_by_types([Type.SPEC]) + @property def connection_status_messages(self) -> List[AirbyteMessage]: return self._get_message_by_types([Type.CONNECTION_STATUS]) diff --git a/airbyte_cdk/test/standard_tests/_job_runner.py b/airbyte_cdk/test/standard_tests/_job_runner.py index bab170361..ad8316d78 100644 --- a/airbyte_cdk/test/standard_tests/_job_runner.py +++ b/airbyte_cdk/test/standard_tests/_job_runner.py @@ -56,12 +56,15 @@ def spec(self, logger: logging.Logger) -> Any: def run_test_job( connector: IConnector | type[IConnector] | Callable[[], IConnector], - verb: Literal["read", "check", "discover"], - test_scenario: ConnectorTestScenario, + verb: Literal["spec", "read", "check", "discover"], *, + test_scenario: ConnectorTestScenario | None = None, catalog: ConfiguredAirbyteCatalog | dict[str, Any] | None = None, ) -> entrypoint_wrapper.EntrypointOutput: """Run a test scenario from provided CLI args and return the result.""" + # Use default (empty) scenario if not provided: + test_scenario = test_scenario or ConnectorTestScenario() + if not connector: raise ValueError("Connector is required") @@ -81,14 +84,14 @@ def run_test_job( ) args: list[str] = [verb] - if test_scenario.config_path: - args += ["--config", str(test_scenario.config_path)] - elif test_scenario.config_dict: + config_dict = test_scenario.get_config_dict(empty_if_missing=True) + if config_dict and verb != "spec": + # Write the config to a temp json file and pass the path to the file as an argument. config_path = ( Path(tempfile.gettempdir()) / "airbyte-test" / f"temp_config_{uuid.uuid4().hex}.json" ) config_path.parent.mkdir(parents=True, exist_ok=True) - config_path.write_text(orjson.dumps(test_scenario.config_dict).decode()) + config_path.write_text(orjson.dumps(config_dict).decode()) args += ["--config", str(config_path)] catalog_path: Path | None = None diff --git a/airbyte_cdk/test/standard_tests/connector_base.py b/airbyte_cdk/test/standard_tests/connector_base.py index 35bcdbe8f..7d4c35132 100644 --- a/airbyte_cdk/test/standard_tests/connector_base.py +++ b/airbyte_cdk/test/standard_tests/connector_base.py @@ -89,7 +89,7 @@ def get_test_class_dir(cls) -> Path: @classmethod def create_connector( cls, - scenario: ConnectorTestScenario, + scenario: ConnectorTestScenario | None, ) -> IConnector: """Instantiate the connector class.""" connector = cls.connector # type: ignore @@ -147,28 +147,35 @@ def get_scenarios( This has to be a separate function because pytest does not allow parametrization of fixtures with arguments from the test class itself. """ - category = "connection" + categories = ["connection", "spec"] all_tests_config = yaml.safe_load(cls.acceptance_test_config_path.read_text()) if "acceptance_tests" not in all_tests_config: raise ValueError( f"Acceptance tests config not found in {cls.acceptance_test_config_path}." f" Found only: {str(all_tests_config)}." ) - if category not in all_tests_config["acceptance_tests"]: - return [] - if "tests" not in all_tests_config["acceptance_tests"][category]: - raise ValueError(f"No tests found for category {category}") - - tests_scenarios = [ - ConnectorTestScenario.model_validate(test) - for test in all_tests_config["acceptance_tests"][category]["tests"] - if "iam_role" not in test["config_path"] - ] + + test_scenarios: list[ConnectorTestScenario] = [] + for category in categories: + if ( + category not in all_tests_config["acceptance_tests"] + or "tests" not in all_tests_config["acceptance_tests"][category] + ): + continue + + test_scenarios.extend( + [ + ConnectorTestScenario.model_validate(test) + for test in all_tests_config["acceptance_tests"][category]["tests"] + if "config_path" in test and "iam_role" not in test["config_path"] + ] + ) + connector_root = cls.get_connector_root_dir().absolute() - for test in tests_scenarios: + for test in test_scenarios: if test.config_path: test.config_path = connector_root / test.config_path if test.configured_catalog_path: test.configured_catalog_path = connector_root / test.configured_catalog_path - return tests_scenarios + return test_scenarios diff --git a/airbyte_cdk/test/standard_tests/declarative_sources.py b/airbyte_cdk/test/standard_tests/declarative_sources.py index e1954246f..a105e26a4 100644 --- a/airbyte_cdk/test/standard_tests/declarative_sources.py +++ b/airbyte_cdk/test/standard_tests/declarative_sources.py @@ -64,7 +64,7 @@ def components_py_path(cls) -> Path | None: @classmethod def create_connector( cls, - scenario: ConnectorTestScenario, + scenario: ConnectorTestScenario | None, ) -> IConnector: """Create a connector scenario for the test suite. @@ -73,9 +73,13 @@ def create_connector( Subclasses should not need to override this method. """ - config: dict[str, Any] = scenario.get_config_dict() - + scenario = scenario or ConnectorTestScenario() # Use default (empty) scenario if None manifest_dict = yaml.safe_load(cls.manifest_yaml_path.read_text()) + config = { + "__injected_manifest": manifest_dict, + } + config.update(scenario.get_config_dict(empty_if_missing=True)) + if cls.components_py_path and cls.components_py_path.exists(): os.environ["AIRBYTE_ENABLE_UNSAFE_CODE"] = "true" config["__injected_components_py"] = cls.components_py_path.read_text() diff --git a/airbyte_cdk/test/standard_tests/models/scenario.py b/airbyte_cdk/test/standard_tests/models/scenario.py index 944b60921..0ace85d33 100644 --- a/airbyte_cdk/test/standard_tests/models/scenario.py +++ b/airbyte_cdk/test/standard_tests/models/scenario.py @@ -43,18 +43,29 @@ class AcceptanceTestFileTypes(BaseModel): file_types: AcceptanceTestFileTypes | None = None status: Literal["succeed", "failed"] | None = None - def get_config_dict(self) -> dict[str, Any]: + def get_config_dict( + self, + *, + empty_if_missing: bool, + ) -> dict[str, Any]: """Return the config dictionary. If a config dictionary has already been loaded, return it. Otherwise, load the config file and return the dictionary. + + If `self.config_dict` and `self.config_path` are both `None`: + - return an empty dictionary if `empty_if_missing` is True + - raise a ValueError if `empty_if_missing` is False """ - if self.config_dict: + if self.config_dict is not None: return self.config_dict - if self.config_path: + if self.config_path is not None: return cast(dict[str, Any], yaml.safe_load(self.config_path.read_text())) + if empty_if_missing: + return {} + raise ValueError("No config dictionary or path provided.") @property diff --git a/airbyte_cdk/test/standard_tests/source_base.py b/airbyte_cdk/test/standard_tests/source_base.py index 83cc7326f..a256fa04c 100644 --- a/airbyte_cdk/test/standard_tests/source_base.py +++ b/airbyte_cdk/test/standard_tests/source_base.py @@ -64,6 +64,30 @@ def test_discover( test_scenario=scenario, ) + def test_spec(self) -> None: + """Standard test for `spec`. + + This test does not require a `scenario` input, since `spec` + does not require any inputs. + + We assume `spec` should always succeed and it should always generate + a valid `SPEC` message. + + Note: the parsing of messages by type also implicitly validates that + the generated `SPEC` message is valid JSON. + """ + result = run_test_job( + verb="spec", + test_scenario=None, + connector=self.create_connector(scenario=None), + ) + # If an error occurs, it will be raised above. + + assert len(result.spec_messages) == 1, ( + "Expected exactly 1 spec message but got {len(result.spec_messages)}", + result.errors, + ) + def test_basic_read( self, scenario: ConnectorTestScenario,