Skip to content

refactor: connectors can launch themselves; deprecate: AirbyteEntrypoint and launch(), add SerDe methods on critical protocol classes #562

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions airbyte_cdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@
# Once those issues are resolved, the below can be sorted with isort.
import dunamai as _dunamai

from airbyte_cdk.sources.abstract_source import AbstractSource
from airbyte_cdk.sources.source import Source

# from airbyte_cdk.destinations.destination import Destination
from .config_observation import (
create_connector_config_control_message,
emit_configuration_as_airbyte_control_message,
)
from .connector import BaseConnector, Connector
from .destinations import Destination
from .entrypoint import AirbyteEntrypoint, launch
from .logger import AirbyteLogFormatter, init_logger
from .models import (
Expand All @@ -75,7 +77,6 @@
SyncMode,
Type,
)
from .sources import AbstractSource, Source
from .sources.concurrent_source.concurrent_source import ConcurrentSource
from .sources.concurrent_source.concurrent_source_adapter import ConcurrentSourceAdapter
from .sources.config import BaseConfig
Expand Down Expand Up @@ -212,7 +213,6 @@
"AbstractSource",
"BaseConfig",
"BaseConnector",
"Connector",
"Destination",
"Source",
"TState",
Expand Down
15 changes: 5 additions & 10 deletions airbyte_cdk/cli/source_declarative_manifest/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
)
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.source import TState
from airbyte_cdk.utils.cli_arg_parse import parse_cli_args
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now


Expand Down Expand Up @@ -93,7 +94,7 @@ def handle_command(args: list[str]) -> None:

def _get_local_yaml_source(args: list[str]) -> SourceLocalYaml:
try:
parsed_args = AirbyteEntrypoint.parse_args(args)
parsed_args = parse_cli_args(args)
config, catalog, state = _parse_inputs_into_config_catalog_state(parsed_args)
return SourceLocalYaml(config=config, catalog=catalog, state=state)
except Exception as error:
Expand All @@ -119,10 +120,7 @@ def _get_local_yaml_source(args: list[str]) -> SourceLocalYaml:

def handle_local_manifest_command(args: list[str]) -> None:
source = _get_local_yaml_source(args)
launch(
source=source,
args=args,
)
source.launch_with_cli_args(args)


def handle_remote_manifest_command(args: list[str]) -> None:
Expand All @@ -149,10 +147,7 @@ def handle_remote_manifest_command(args: list[str]) -> None:
print(AirbyteEntrypoint.airbyte_message_to_string(message))
else:
source = create_declarative_source(args)
launch(
source=source,
args=args,
)
source.launch_with_cli_args(args=args)


def create_declarative_source(
Expand All @@ -169,7 +164,7 @@ def create_declarative_source(
catalog: ConfiguredAirbyteCatalog | None
state: list[AirbyteStateMessage]

parsed_args = AirbyteEntrypoint.parse_args(args)
parsed_args = parse_cli_args(args)
config, catalog, state = _parse_inputs_into_config_catalog_state(parsed_args)

if config is None:
Expand Down
161 changes: 111 additions & 50 deletions airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,64 +8,85 @@
import os
import pkgutil
from abc import ABC, abstractmethod
from typing import Any, Generic, Mapping, MutableMapping, Optional, Protocol, TypeVar
from collections.abc import MutableMapping
from pathlib import Path
from typing import Any, Generic, Mapping, Optional, TypeVar

import yaml
from typing_extensions import Self, deprecated

from airbyte_cdk.models import (
AirbyteConnectionStatus,
ConnectorSpecification,
ConnectorSpecificationSerializer,
)
from airbyte_cdk.models import AirbyteConnectionStatus
from airbyte_cdk.models.airbyte_protocol import AirbyteMessage, ConnectorSpecification, Type
from airbyte_cdk.sources.message.repository import MessageRepository, PassthroughMessageRepository
from airbyte_cdk.utils.cli_arg_parse import ConnectorCLIArgs, parse_cli_args


def load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
def _load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
"""Gets a resource from a package, returning None if it does not exist"""
try:
return pkgutil.get_data(package, filename)
except FileNotFoundError:
return None


def _write_config(config: Mapping[str, Any], config_path: str) -> None:
Path(config_path).write_text(json.dumps(config))


def _read_json_file(file_path: str) -> Any:
with open(file_path, "r") as file:
contents = file.read()

try:
return json.loads(contents)
except json.JSONDecodeError as error:
raise ValueError(
f"Could not read json file {file_path}: {error}. Please ensure that it is a valid JSON."
)


def _read_config(config_path: str) -> MutableMapping[str, Any]:
    """Load the JSON file at ``config_path`` and return it as a config mapping.

    Args:
        config_path: Path of the JSON config file.

    Returns:
        The parsed content, when it is a mutable mapping (a JSON object).

    Raises:
        ValueError: If the parsed content is anything other than a mutable mapping.
    """
    parsed = _read_json_file(config_path)
    # Guard clause: reject lists, scalars and null, which are valid JSON but not a config.
    if not isinstance(parsed, MutableMapping):
        raise ValueError(
            f"The content of {config_path} is not an object and therefore is not a valid config. Please ensure the file represent a config."
        )
    return parsed


TConfig = TypeVar("TConfig", bound=Mapping[str, Any])


class BaseConnector(ABC, Generic[TConfig]):
# configure whether the `check_config_against_spec_or_exit()` needs to be called
check_config_against_spec: bool = True

@abstractmethod
def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig:
"""
Persist config in temporary directory to run the Source job
"""
@classmethod
def to_typed_config(
cls,
config: Mapping[str, Any],
) -> TConfig:
"""Return a typed config object from a config dictionary."""
...

@staticmethod
def read_config(config_path: str) -> MutableMapping[str, Any]:
config = BaseConnector._read_json_file(config_path)
if isinstance(config, MutableMapping):
return config
else:
raise ValueError(
f"The content of {config_path} is not an object and therefore is not a valid config. Please ensure the file represent a config."
)
return _read_config(config_path)

@staticmethod
def _read_json_file(file_path: str) -> Any:
with open(file_path, "r") as file:
contents = file.read()

try:
return json.loads(contents)
except json.JSONDecodeError as error:
raise ValueError(
f"Could not read json file {file_path}: {error}. Please ensure that it is a valid JSON."
)
return _read_json_file(file_path)

@staticmethod
def write_config(config: TConfig, config_path: str) -> None:
with open(config_path, "w") as fh:
fh.write(json.dumps(config))
_write_config(config, config_path)

@classmethod
def configure(cls, config: Mapping[str, Any], temp_dir: str) -> TConfig:
config_path = os.path.join(temp_dir, "config.json")
_write_config(config, config_path)
return cls.to_typed_config(config)

def spec(self, logger: logging.Logger) -> ConnectorSpecification:
"""
Expand All @@ -75,8 +96,8 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:

package = self.__class__.__module__.split(".")[0]

yaml_spec = load_optional_package_file(package, "spec.yaml")
json_spec = load_optional_package_file(package, "spec.json")
yaml_spec = _load_optional_package_file(package, "spec.yaml")
json_spec = _load_optional_package_file(package, "spec.json")

if yaml_spec and json_spec:
raise RuntimeError(
Expand All @@ -95,7 +116,7 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
else:
raise FileNotFoundError("Unable to find spec.yaml or spec.json in the package.")

return ConnectorSpecificationSerializer.load(spec_obj)
return ConnectorSpecification.from_dict(spec_obj)

@abstractmethod
def check(self, logger: logging.Logger, config: TConfig) -> AirbyteConnectionStatus:
Expand All @@ -104,20 +125,60 @@ def check(self, logger: logging.Logger, config: TConfig) -> AirbyteConnectionSta
to the Stripe API.
"""


class _WriteConfigProtocol(Protocol):
@staticmethod
def write_config(config: Mapping[str, Any], config_path: str) -> None: ...


class DefaultConnectorMixin:
# can be overridden to change an input config
def configure(
self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str
) -> Mapping[str, Any]:
config_path = os.path.join(temp_dir, "config.json")
self.write_config(config, config_path)
return config


class Connector(DefaultConnectorMixin, BaseConnector[Mapping[str, Any]], ABC): ...
@classmethod
def create_with_cli_args(
cls,
cli_args: ConnectorCLIArgs,
) -> Self:
"""Return an instance of the connector, using the provided CLI args.

NOTE(review): the body here is only `...`, so as written this returns None
rather than an instance. Presumably concrete connectors are expected to
override it - TODO confirm, because `launch_with_cli_args` calls `.check()`
on the value returned from this method.
"""
...

@classmethod
def launch_with_cli_args(
    cls,
    args: list[str],
    *,
    logger: logging.Logger | None = None,
    message_repository: MessageRepository | None = None,
    # TODO: Add support for inputs:
    # stdin: StringIO | MessageRepository | None = None,
) -> None:
    """Parse the CLI arguments and run the requested connector command.

    Args:
        args: Raw command-line arguments, handed to ``parse_cli_args``.
        logger: Logger to use. Defaults to ``airbyte.<connector class name>``.
        message_repository: Repository that protocol messages are emitted to.
            Defaults to a new ``PassthroughMessageRepository``.

    Raises:
        ValueError: If the parsed command is not one of ``check``, ``spec``,
            ``discover``, ``read`` or ``write``.
    """
    if logger is None:
        # `cls` is already the connector class; `type(cls)` would be its
        # metaclass, which would give every connector the same logger name.
        logger = logging.getLogger(f"airbyte.{cls.__name__}")
    if message_repository is None:
        message_repository = PassthroughMessageRepository()

    # Only offer the verbs this connector class actually provides.
    parsed_cli_args: ConnectorCLIArgs = parse_cli_args(
        args,
        with_read=bool(getattr(cls, "read", False)),
        with_write=bool(getattr(cls, "write", False)),
        with_discover=bool(getattr(cls, "discover", False)),
    )
    logger.info("Launching connector with args: %s", parsed_cli_args)
    verb = parsed_cli_args.command

    if verb == "check":
        config = cls.to_typed_config(parsed_cli_args.get_config_dict())
        connector = cls.create_with_cli_args(parsed_cli_args)
        status = connector.check(logger, config)
        # Emit the result the same way the `spec` branch emits its message;
        # without this the outcome of `check` is silently dropped.
        message_repository.emit_message(
            AirbyteMessage(
                type=Type.CONNECTION_STATUS,
                connectionStatus=status,
            )
        )
    elif verb == "spec":
        connector = cls()
        message_repository.emit_message(
            AirbyteMessage(
                type=Type.SPEC,
                spec=connector.spec(logger),
            )
        )
    elif verb == "discover":
        # NOTE(review): this prints the connector *spec* as JSON to stdout and
        # never calls a `discover` method - looks like placeholder behaviour;
        # confirm before relying on it.
        connector = cls()
        spec = connector.spec(logger)
        print(json.dumps(spec.to_dict(), indent=2))
    elif verb == "read":
        # Implementation for reading data goes here
        pass
    elif verb == "write":
        # Implementation for writing data goes here
        pass
    else:
        raise ValueError(f"Unknown command: {verb}")
    # Implementation for launching the connector goes here
3 changes: 2 additions & 1 deletion airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.source import Source
from airbyte_cdk.utils.cli_arg_parse import parse_cli_args
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


Expand All @@ -35,7 +36,7 @@ def get_config_and_catalog_from_args(
) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
# TODO: Add functionality for the `debug` logger.
# Currently, no `debug`-level log is displayed while reading a stream for a connector created through `connector-builder`.
parsed_args = AirbyteEntrypoint.parse_args(args)
parsed_args = parse_cli_args(args)
config_path, catalog_path, state_path = (
parsed_args.config,
parsed_args.catalog,
Expand Down
4 changes: 1 addition & 3 deletions airbyte_cdk/connector_builder/test_reader/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,7 @@ def _read_stream(
# the generator can raise an exception
# iterate over the generated messages; if `next` raises an exception, catch it and yield it as an AirbyteLogMessage
try:
yield from AirbyteEntrypoint(source).read(
source.spec(self.logger), config, configured_catalog, state
)
yield from source.read(source.spec(self.logger), config, configured_catalog, state)
except AirbyteTracedException as traced_exception:
# Look for this message which indicates that it is the "final exception" raised by AbstractSource.
# If it matches, don't yield this as we don't need to show this in the Builder.
Expand Down
2 changes: 1 addition & 1 deletion airbyte_cdk/destinations/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
"""The destinations module provides classes for building destination connectors."""

from .destination import Destination
from airbyte_cdk.destinations.destination import Destination

__all__ = [
"Destination",
Expand Down
Loading