Skip to content
4 changes: 4 additions & 0 deletions airbyte_cdk/test/entrypoint_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ def records(self) -> List[AirbyteMessage]:
def state_messages(self) -> List[AirbyteMessage]:
return self._get_message_by_types([Type.STATE])

@property
def spec_messages(self) -> List[AirbyteMessage]:
return self._get_message_by_types([Type.SPEC])

@property
def connection_status_messages(self) -> List[AirbyteMessage]:
return self._get_message_by_types([Type.CONNECTION_STATUS])
Expand Down
15 changes: 9 additions & 6 deletions airbyte_cdk/test/standard_tests/_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,15 @@ def spec(self, logger: logging.Logger) -> Any:

def run_test_job(
connector: IConnector | type[IConnector] | Callable[[], IConnector],
verb: Literal["read", "check", "discover"],
test_scenario: ConnectorTestScenario,
verb: Literal["spec", "read", "check", "discover"],
*,
test_scenario: ConnectorTestScenario | None = None,
catalog: ConfiguredAirbyteCatalog | dict[str, Any] | None = None,
) -> entrypoint_wrapper.EntrypointOutput:
"""Run a test scenario from provided CLI args and return the result."""
# Use default (empty) scenario if not provided:
test_scenario = test_scenario or ConnectorTestScenario()

if not connector:
raise ValueError("Connector is required")

Expand All @@ -81,14 +84,14 @@ def run_test_job(
)

args: list[str] = [verb]
if test_scenario.config_path:
args += ["--config", str(test_scenario.config_path)]
elif test_scenario.config_dict:
config_dict = test_scenario.get_config_dict(empty_if_missing=True)
if config_dict and verb != "spec":
# Write the config to a temp json file and pass the path to the file as an argument.
config_path = (
Path(tempfile.gettempdir()) / "airbyte-test" / f"temp_config_{uuid.uuid4().hex}.json"
)
config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.write_text(orjson.dumps(test_scenario.config_dict).decode())
config_path.write_text(orjson.dumps(config_dict).decode())
args += ["--config", str(config_path)]

catalog_path: Path | None = None
Expand Down
35 changes: 21 additions & 14 deletions airbyte_cdk/test/standard_tests/connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def get_test_class_dir(cls) -> Path:
@classmethod
def create_connector(
cls,
scenario: ConnectorTestScenario,
scenario: ConnectorTestScenario | None,
) -> IConnector:
"""Instantiate the connector class."""
connector = cls.connector # type: ignore
Expand Down Expand Up @@ -147,28 +147,35 @@ def get_scenarios(
This has to be a separate function because pytest does not allow
parametrization of fixtures with arguments from the test class itself.
"""
category = "connection"
categories = ["connection", "spec"]
all_tests_config = yaml.safe_load(cls.acceptance_test_config_path.read_text())
if "acceptance_tests" not in all_tests_config:
raise ValueError(
f"Acceptance tests config not found in {cls.acceptance_test_config_path}."
f" Found only: {str(all_tests_config)}."
)
if category not in all_tests_config["acceptance_tests"]:
return []
if "tests" not in all_tests_config["acceptance_tests"][category]:
raise ValueError(f"No tests found for category {category}")

tests_scenarios = [
ConnectorTestScenario.model_validate(test)
for test in all_tests_config["acceptance_tests"][category]["tests"]
if "iam_role" not in test["config_path"]
]

test_scenarios: list[ConnectorTestScenario] = []
for category in categories:
if (
category not in all_tests_config["acceptance_tests"]
or "tests" not in all_tests_config["acceptance_tests"][category]
):
continue

test_scenarios.extend(
[
ConnectorTestScenario.model_validate(test)
for test in all_tests_config["acceptance_tests"][category]["tests"]
if "config_path" in test and "iam_role" not in test["config_path"]
]
)

connector_root = cls.get_connector_root_dir().absolute()
for test in tests_scenarios:
for test in test_scenarios:
if test.config_path:
test.config_path = connector_root / test.config_path
if test.configured_catalog_path:
test.configured_catalog_path = connector_root / test.configured_catalog_path

return tests_scenarios
return test_scenarios
10 changes: 7 additions & 3 deletions airbyte_cdk/test/standard_tests/declarative_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def components_py_path(cls) -> Path | None:
@classmethod
def create_connector(
cls,
scenario: ConnectorTestScenario,
scenario: ConnectorTestScenario | None,
) -> IConnector:
"""Create a connector scenario for the test suite.

Expand All @@ -73,9 +73,13 @@ def create_connector(

Subclasses should not need to override this method.
"""
config: dict[str, Any] = scenario.get_config_dict()

scenario = scenario or ConnectorTestScenario() # Use default (empty) scenario if None
manifest_dict = yaml.safe_load(cls.manifest_yaml_path.read_text())
config = {
"__injected_manifest": manifest_dict,
}
config.update(scenario.get_config_dict(empty_if_missing=True))

if cls.components_py_path and cls.components_py_path.exists():
os.environ["AIRBYTE_ENABLE_UNSAFE_CODE"] = "true"
config["__injected_components_py"] = cls.components_py_path.read_text()
Expand Down
17 changes: 14 additions & 3 deletions airbyte_cdk/test/standard_tests/models/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,29 @@ class AcceptanceTestFileTypes(BaseModel):
file_types: AcceptanceTestFileTypes | None = None
status: Literal["succeed", "failed"] | None = None

def get_config_dict(self) -> dict[str, Any]:
def get_config_dict(
self,
*,
empty_if_missing: bool,
) -> dict[str, Any]:
"""Return the config dictionary.

If a config dictionary has already been loaded, return it. Otherwise, load
the config file and return the dictionary.

If `self.config_dict` and `self.config_path` are both `None`:
- return an empty dictionary if `empty_if_missing` is True
- raise a ValueError if `empty_if_missing` is False
"""
if self.config_dict:
if self.config_dict is not None:
return self.config_dict

if self.config_path:
if self.config_path is not None:
return cast(dict[str, Any], yaml.safe_load(self.config_path.read_text()))

if empty_if_missing:
return {}

raise ValueError("No config dictionary or path provided.")

@property
Expand Down
24 changes: 24 additions & 0 deletions airbyte_cdk/test/standard_tests/source_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,30 @@ def test_discover(
test_scenario=scenario,
)

def test_spec(self) -> None:
"""Standard test for `spec`.

This test does not require a `scenario` input, since `spec`
does not require any inputs.

We assume `spec` should always succeed and it should always generate
a valid `SPEC` message.

Note: the parsing of messages by type also implicitly validates that
the generated `SPEC` message is valid JSON.
"""
result = run_test_job(
verb="spec",
test_scenario=None,
connector=self.create_connector(scenario=None),
)
# If an error occurs, it will be raised above.

assert len(result.spec_messages) == 1, (
"Expected exactly 1 spec message but got {len(result.spec_messages)}",
result.errors,
)

def test_basic_read(
self,
scenario: ConnectorTestScenario,
Expand Down
Loading