Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 39 additions & 11 deletions airbyte_cdk/test/entrypoint_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
Type,
)
from airbyte_cdk.sources import Source
from airbyte_cdk.test.models.scenario import ExpectedOutcome


class EntrypointOutput:
Expand Down Expand Up @@ -157,8 +158,22 @@ def is_not_in_logs(self, pattern: str) -> bool:


def _run_command(
source: Source, args: List[str], expecting_exception: bool = False
source: Source,
args: List[str],
expecting_exception: bool | None = None, # Deprecated, use `expected_outcome` instead.
*,
expected_outcome: ExpectedOutcome | None = None,
) -> EntrypointOutput:
"""Internal function to run a command with the AirbyteEntrypoint.

Note: Even though this function is private, some connectors do call it directly.

Note: The `expecting_exception` arg is now deprecated in favor of the tri-state
`expected_outcome` arg. The old argument is supported (for now) for backwards compatibility.
"""
expected_outcome = expected_outcome or ExpectedOutcome.from_expecting_exception_bool(
expecting_exception,
)
log_capture_buffer = StringIO()
stream_handler = logging.StreamHandler(log_capture_buffer)
stream_handler.setLevel(logging.INFO)
Expand All @@ -175,35 +190,41 @@ def _run_command(
for message in source_entrypoint.run(parsed_args):
messages.append(message)
except Exception as exception:
if not expecting_exception:
if expected_outcome.expect_success():
print("Printing unexpected error from entrypoint_wrapper")
print("".join(traceback.format_exception(None, exception, exception.__traceback__)))

uncaught_exception = exception

captured_logs = log_capture_buffer.getvalue().split("\n")[:-1]

parent_logger.removeHandler(stream_handler)

return EntrypointOutput(messages + captured_logs, uncaught_exception)
return EntrypointOutput(messages + captured_logs, uncaught_exception=uncaught_exception)


def discover(
source: Source,
config: Mapping[str, Any],
expecting_exception: bool = False,
expecting_exception: bool | None = None, # Deprecated, use `expected_outcome` instead.
*,
expected_outcome: ExpectedOutcome | None = None,
) -> EntrypointOutput:
"""
config must be json serializable
:param expecting_exception: By default if there is an uncaught exception, the exception will be printed out. If this is expected, please
provide expecting_exception=True so that the test output logs are cleaner
:param expected_outcome: By default if there is an uncaught exception, the exception will be printed out. If this is expected, please
provide `expected_outcome=ExpectedOutcome.EXPECT_FAILURE` so that the test output logs are cleaner
"""

with tempfile.TemporaryDirectory() as tmp_directory:
tmp_directory_path = Path(tmp_directory)
config_file = make_file(tmp_directory_path / "config.json", config)

return _run_command(
source, ["discover", "--config", config_file, "--debug"], expecting_exception
source,
["discover", "--config", config_file, "--debug"],
expecting_exception=expecting_exception, # Deprecated, but still supported.
expected_outcome=expected_outcome,
)


Expand All @@ -212,13 +233,15 @@ def read(
config: Mapping[str, Any],
catalog: ConfiguredAirbyteCatalog,
state: Optional[List[AirbyteStateMessage]] = None,
expecting_exception: bool = False,
expecting_exception: bool | None = None, # Deprecated, use `expected_outcome` instead.
*,
expected_outcome: ExpectedOutcome | None = None,
) -> EntrypointOutput:
"""
config and state must be json serializable

:param expecting_exception: By default if there is an uncaught exception, the exception will be printed out. If this is expected, please
provide expecting_exception=True so that the test output logs are cleaner
:param expected_outcome: By default if there is an uncaught exception, the exception will be printed out. If this is expected, please
provide `expected_outcome=ExpectedOutcome.EXPECT_FAILURE` so that the test output logs are cleaner.
"""
with tempfile.TemporaryDirectory() as tmp_directory:
tmp_directory_path = Path(tmp_directory)
Expand All @@ -245,7 +268,12 @@ def read(
]
)

return _run_command(source, args, expecting_exception)
return _run_command(
source,
args,
expecting_exception=expecting_exception, # Deprecated, but still supported.
expected_outcome=expected_outcome,
)


def make_file(
Expand Down
10 changes: 10 additions & 0 deletions airbyte_cdk/test/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Models used for standard tests."""

from airbyte_cdk.test.models.outcome import ExpectedOutcome
from airbyte_cdk.test.models.scenario import ConnectorTestScenario

__all__ = [
"ConnectorTestScenario",
"ExpectedOutcome",
]
58 changes: 58 additions & 0 deletions airbyte_cdk/test/models/outcome.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Run acceptance tests in PyTest.

These tests leverage the same `acceptance-test-config.yml` configuration files as the
acceptance tests in CAT, but they run in PyTest instead of CAT. This allows us to run
the acceptance tests in the same local environment as we are developing in, speeding
up iteration cycles.
"""

from __future__ import annotations

from enum import Enum, auto


class ExpectedOutcome(Enum):
"""Enum to represent the expected outcome of a test scenario.

Class supports comparisons to a boolean or None.
"""

EXPECT_EXCEPTION = auto()
EXPECT_SUCCESS = auto()
ALLOW_ANY = auto()

@classmethod
def from_status_str(cls, status: str | None) -> ExpectedOutcome:
"""Convert a status string to an ExpectedOutcome."""
if status is None:
return ExpectedOutcome.ALLOW_ANY

try:
return {
"succeed": ExpectedOutcome.EXPECT_SUCCESS,
"failed": ExpectedOutcome.EXPECT_EXCEPTION,
}[status]
except KeyError as ex:
raise ValueError(f"Invalid status '{status}'. Expected 'succeed' or 'failed'.") from ex

@classmethod
def from_expecting_exception_bool(cls, expecting_exception: bool | None) -> ExpectedOutcome:
"""Convert a boolean indicating whether an exception is expected to an ExpectedOutcome."""
if expecting_exception is None:
# Align with legacy behavior where default would be 'False' (no exception expected)
return ExpectedOutcome.EXPECT_SUCCESS

return (
ExpectedOutcome.EXPECT_EXCEPTION
if expecting_exception
else ExpectedOutcome.EXPECT_SUCCESS
)

def expect_exception(self) -> bool:
"""Return whether the expectation is that an exception should be raised."""
return self == ExpectedOutcome.EXPECT_EXCEPTION

def expect_success(self) -> bool:
"""Return whether the expectation is that the test should succeed without exceptions."""
return self == ExpectedOutcome.EXPECT_SUCCESS
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@

from __future__ import annotations

from pathlib import Path
from pathlib import Path # noqa: TC003 # Pydantic needs this (don't move to 'if typing' block)
from typing import Any, Literal, cast

import yaml
from pydantic import BaseModel
from pydantic import BaseModel, ConfigDict

from airbyte_cdk.test.models.outcome import ExpectedOutcome


class ConnectorTestScenario(BaseModel):
Expand All @@ -24,6 +26,10 @@ class ConnectorTestScenario(BaseModel):
acceptance test configuration file.
"""

# Allows the class to be hashable, which PyTest will require
# when we use to parameterize tests.
model_config = ConfigDict(frozen=True)

class AcceptanceTestExpectRecords(BaseModel):
path: Path
exact_order: bool = False
Expand All @@ -46,6 +52,7 @@ class AcceptanceTestFileTypes(BaseModel):
def get_config_dict(
self,
*,
connector_root: Path,
empty_if_missing: bool,
) -> dict[str, Any]:
"""Return the config dictionary.
Expand All @@ -61,16 +68,29 @@ def get_config_dict(
return self.config_dict

if self.config_path is not None:
return cast(dict[str, Any], yaml.safe_load(self.config_path.read_text()))
config_path = self.config_path
if not config_path.is_absolute():
# We usually receive a relative path here. Let's resolve it.
config_path = (connector_root / self.config_path).resolve().absolute()

return cast(
dict[str, Any],
yaml.safe_load(config_path.read_text()),
)

if empty_if_missing:
return {}

raise ValueError("No config dictionary or path provided.")

@property
def expect_exception(self) -> bool:
return self.status and self.status == "failed" or False
def expected_outcome(self) -> ExpectedOutcome:
"""Whether the test scenario expects an exception to be raised.

Returns True if the scenario expects an exception, False if it does not,
and None if there is no set expectation.
"""
return ExpectedOutcome.from_status_str(self.status)

@property
def instance_name(self) -> str:
Expand All @@ -83,3 +103,38 @@ def __str__(self) -> str:
return f"'{self.config_path.name}' Test Scenario"

return f"'{hash(self)}' Test Scenario"

def without_expected_outcome(self) -> ConnectorTestScenario:
"""Return a copy of the scenario that does not expect failure or success.

This is useful when running multiple steps, to defer the expectations to a later step.
"""
return ConnectorTestScenario(
**self.model_dump(exclude={"status"}),
)

def with_expecting_failure(self) -> ConnectorTestScenario:
"""Return a copy of the scenario that expects failure.

This is useful when deriving new scenarios from existing ones.
"""
if self.status == "failed":
return self

return ConnectorTestScenario(
**self.model_dump(exclude={"status"}),
status="failed",
)

def with_expecting_success(self) -> ConnectorTestScenario:
"""Return a copy of the scenario that expects success.

This is useful when deriving new scenarios from existing ones.
"""
if self.status == "succeed":
return self

return ConnectorTestScenario(
**self.model_dump(exclude={"status"}),
status="succeed",
)
23 changes: 14 additions & 9 deletions airbyte_cdk/test/standard_tests/_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
Status,
)
from airbyte_cdk.test import entrypoint_wrapper
from airbyte_cdk.test.standard_tests.models import (
from airbyte_cdk.test.models import (
ConnectorTestScenario,
)

Expand Down Expand Up @@ -58,6 +58,7 @@ def run_test_job(
connector: IConnector | type[IConnector] | Callable[[], IConnector],
verb: Literal["spec", "read", "check", "discover"],
*,
connector_root: Path,
test_scenario: ConnectorTestScenario | None = None,
catalog: ConfiguredAirbyteCatalog | dict[str, Any] | None = None,
) -> entrypoint_wrapper.EntrypointOutput:
Expand All @@ -84,7 +85,10 @@ def run_test_job(
)

args: list[str] = [verb]
config_dict = test_scenario.get_config_dict(empty_if_missing=True)
config_dict = test_scenario.get_config_dict(
empty_if_missing=True,
connector_root=connector_root,
)
if config_dict and verb != "spec":
# Write the config to a temp json file and pass the path to the file as an argument.
config_path = (
Expand Down Expand Up @@ -118,9 +122,9 @@ def run_test_job(
result: entrypoint_wrapper.EntrypointOutput = entrypoint_wrapper._run_command( # noqa: SLF001 # Non-public API
source=connector_obj, # type: ignore [arg-type]
args=args,
expecting_exception=test_scenario.expect_exception,
expected_outcome=test_scenario.expected_outcome,
)
if result.errors and not test_scenario.expect_exception:
if result.errors and test_scenario.expected_outcome.expect_success():
raise AssertionError(
f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result)
)
Expand All @@ -135,7 +139,7 @@ def run_test_job(
+ "\n".join([str(msg) for msg in result.connection_status_messages])
+ _errors_to_str(result)
)
if test_scenario.expect_exception:
if test_scenario.expected_outcome.expect_exception():
conn_status = result.connection_status_messages[0].connectionStatus
assert conn_status, (
"Expected CONNECTION_STATUS message to be present. Got: \n"
Expand All @@ -149,14 +153,15 @@ def run_test_job(
return result

# For all other verbs, we assert check that an exception is raised (or not).
if test_scenario.expect_exception:
if test_scenario.expected_outcome.expect_exception():
if not result.errors:
raise AssertionError("Expected exception but got none.")

return result

assert not result.errors, (
f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result)
)
if test_scenario.expected_outcome.expect_success():
assert not result.errors, (
f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result)
)

return result
Loading
Loading