Skip to content

feat: Add unprivileged and config-free discover for declarative static schemas #559

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 32 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
cdd1ac9
feat: skip config validation during discovery for sources with Dynami…
devin-ai-integration[bot] Apr 8, 2025
7490ad1
style: fix formatting issues
devin-ai-integration[bot] Apr 8, 2025
9e84e1c
fix: update entrypoint to make --config optional for discovery
devin-ai-integration[bot] Apr 8, 2025
47bd67c
style: fix formatting issues
devin-ai-integration[bot] Apr 8, 2025
36d7f1f
fix: add type annotation for empty_config
devin-ai-integration[bot] Apr 8, 2025
b002218
refactor: use generator comprehension instead of list comprehension
devin-ai-integration[bot] Apr 8, 2025
acbab7e
Update airbyte_cdk/entrypoint.py
aaronsteers Apr 8, 2025
d33dcdd
Update CHANGELOG.md
aaronsteers Apr 8, 2025
4253f28
Update airbyte_cdk/sources/declarative/manifest_declarative_source.py
aaronsteers Apr 8, 2025
64610b9
feat: add check_config_during_discover flag for targeted config valid…
devin-ai-integration[bot] Apr 8, 2025
b228857
style: fix formatting issues
devin-ai-integration[bot] Apr 8, 2025
77772c3
Update CHANGELOG.md
aaronsteers Apr 8, 2025
6ca213c
refactor: push check_config_during_discover flag into connector base …
devin-ai-integration[bot] Apr 8, 2025
dce4f8c
style: fix formatting issues
devin-ai-integration[bot] Apr 8, 2025
24a0919
fix: resolve MyPy type checking issues with check_config_during_disco…
devin-ai-integration[bot] Apr 9, 2025
f920f04
refactor: move check_config_during_discover to BaseConnector class
devin-ai-integration[bot] Apr 9, 2025
f01525f
Update airbyte_cdk/connector.py
aaronsteers Apr 9, 2025
769d361
Update airbyte_cdk/connector.py
aaronsteers Apr 9, 2025
c3cbad8
Update airbyte_cdk/sources/declarative/manifest_declarative_source.py
aaronsteers Apr 9, 2025
3cb8faf
Auto-fix lint and format issues
Apr 9, 2025
08397ad
fix condition direction
aaronsteers Apr 9, 2025
4066c75
fix direction of comparison
aaronsteers May 20, 2025
943ea41
Merge remote-tracking branch 'origin/main' into aj/feat/unprivileged-…
aaronsteers May 20, 2025
fac853c
fix config parse
aaronsteers May 20, 2025
9b37bc5
handle case of missing config
aaronsteers May 20, 2025
640c811
fix: don't assume bad config will fail discover
aaronsteers May 20, 2025
5894a1f
improve read check
aaronsteers May 20, 2025
a79ad83
quote the name
aaronsteers May 20, 2025
46170ad
Auto-fix lint and format issues
May 20, 2025
c8dd910
default to not checking before discover
aaronsteers May 20, 2025
2ce3b99
don't raise on missing error in helper function
aaronsteers May 20, 2025
dd94e86
improve tests
aaronsteers May 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions airbyte_cdk/cli/source_declarative_manifest/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ def _parse_inputs_into_config_catalog_state(
]:
config = (
ConcurrentDeclarativeSource.read_config(parsed_args.config)
if hasattr(parsed_args, "config")
else None
if hasattr(parsed_args, "config") and parsed_args.config
else {}
)
catalog = (
ConcurrentDeclarativeSource.read_catalog(parsed_args.catalog)
Expand Down
9 changes: 8 additions & 1 deletion airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,15 @@ def load_optional_package_file(package: str, filename: str) -> Optional[bytes]:


class BaseConnector(ABC, Generic[TConfig]):
# configure whether the `check_config_against_spec_or_exit()` needs to be called
check_config_against_spec: bool = True
"""Configure whether `check_config_against_spec_or_exit()` needs to be called."""

check_config_during_discover: bool = True
"""Determines whether config validation should be skipped during discovery.

By default, config validation is not skipped during discovery. This can be overridden
by sources that can provide catalog information without requiring authentication.
"""

@abstractmethod
def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig:
Expand Down
30 changes: 24 additions & 6 deletions airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def parse_args(args: List[str]) -> argparse.Namespace:
)
required_discover_parser = discover_parser.add_argument_group("required named arguments")
required_discover_parser.add_argument(
"--config", type=str, required=True, help="path to the json configuration file"
"--config", type=str, required=False, help="path to the json configuration file"
)
discover_parser.add_argument(
"--manifest-path",
Expand Down Expand Up @@ -177,19 +177,37 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
)
if cmd == "spec":
message = AirbyteMessage(type=Type.SPEC, spec=source_spec)
yield from [
yield from (
self.airbyte_message_to_string(queued_message)
for queued_message in self._emit_queued_messages(self.source)
]
)
yield self.airbyte_message_to_string(message)
elif (
cmd == "discover"
and not parsed_args.config
and not self.source.check_config_during_discover
):
# Connector supports unprivileged discover
empty_config: dict[str, Any] = {}
yield from (
self.airbyte_message_to_string(queued_message)
for queued_message in self._emit_queued_messages(self.source)
)
yield from map(
AirbyteEntrypoint.airbyte_message_to_string,
self.discover(source_spec, empty_config),
)
elif parsed_args.config is None:
# Raise a helpful error message if we reach here with no config.
raise ValueError("The '--config' arg is required but was not provided.")
else:
raw_config = self.source.read_config(parsed_args.config)
config = self.source.configure(raw_config, temp_dir)

yield from [
yield from (
self.airbyte_message_to_string(queued_message)
for queued_message in self._emit_queued_messages(self.source)
]
)
if cmd == "check":
yield from map(
AirbyteEntrypoint.airbyte_message_to_string,
Expand Down Expand Up @@ -261,7 +279,7 @@ def discover(
self, source_spec: ConnectorSpecification, config: TConfig
) -> Iterable[AirbyteMessage]:
self.set_up_secret_filter(config, source_spec.connectionSpecification)
if self.source.check_config_against_spec:
if self.source.check_config_during_discover:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ Am I reading this right that this is a manifest level flag? Should it be a spec level definition instead?

Copy link
Contributor Author

@aaronsteers aaronsteers May 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bnchrch - I will think through this a bit more. As of now, it is a property of the connector base class (defaulting to requiring the check), overridden by the declarative source to not require it by default.

I think the desired behavior is that for declarative sources, we'd not validate config unless dynamic schemas are needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to approve if this is useful for this area.

Certainly would be great to enable for all connectors via something in the spec though

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like it would take too much to add a proposal

self.validate_connection(source_spec, config)
catalog = self.source.discover(self.logger, config)

Expand Down
38 changes: 38 additions & 0 deletions airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import Source
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.types import ConnectionDefinition
from airbyte_cdk.sources.utils.slice_logger import (
Expand Down Expand Up @@ -89,6 +90,9 @@ def _get_declarative_component_schema() -> Dict[str, Any]:
class ManifestDeclarativeSource(DeclarativeSource):
"""Declarative source defined by a manifest of low-code components that define source connector behavior"""

check_config_during_discover: bool = False
"""Declarative sources default to not checking config before discovery."""

def __init__(
self,
source_config: ConnectionDefinition,
Expand Down Expand Up @@ -139,6 +143,8 @@ def __init__(
# apply additional post-processing to the manifest
self._post_process_manifest()

self.check_config_during_discover = self._uses_dynamic_schema_loader()

@property
def resolved_manifest(self) -> Mapping[str, Any]:
"""
Expand Down Expand Up @@ -542,3 +548,35 @@ def _dynamic_stream_configs(

def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
self.logger.debug("declarative source created from manifest", extra=extra_args)

def _uses_dynamic_schema_loader(self) -> bool:
"""
Determines if any stream in the source uses a DynamicSchemaLoader.

DynamicSchemaLoader makes a separate call to retrieve schema information,
which might not require authentication, so we can skip config validation
during discovery when it's used.

Returns:
bool: True if any stream uses a DynamicSchemaLoader, False otherwise.
"""
for stream_config in self._stream_configs(self._source_config):
schema_loader = stream_config.get("schema_loader", {})
if (
isinstance(schema_loader, dict)
and schema_loader.get("type") == "DynamicSchemaLoader"
):
return True

dynamic_streams = self._source_config.get("dynamic_streams", [])
if dynamic_streams:
for dynamic_stream in dynamic_streams:
stream_template = dynamic_stream.get("stream_template", {})
schema_loader = stream_template.get("schema_loader", {})
if (
isinstance(schema_loader, dict)
and schema_loader.get("type") == "DynamicSchemaLoader"
):
return True

return False
14 changes: 4 additions & 10 deletions airbyte_cdk/test/standard_tests/_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,9 @@ def run_test_job(

return result

# For all other verbs, we assert check that an exception is raised (or not).
if test_scenario.expect_exception:
if not result.errors:
raise AssertionError("Expected exception but got none.")

return result

assert not result.errors, (
f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result)
)
if not test_scenario.expect_exception:
assert not result.errors, (
f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result)
)

return result
26 changes: 17 additions & 9 deletions airbyte_cdk/test/standard_tests/connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@
from typing import cast

import yaml
from airbyte_protocol_dataclasses.models.airbyte_protocol import AirbyteConnectionStatus
from boltons.typeutils import classproperty

from airbyte_cdk.models import (
AirbyteMessage,
Type,
)
from airbyte_cdk.models import Status
from airbyte_cdk.test import entrypoint_wrapper
from airbyte_cdk.test.standard_tests._job_runner import IConnector, run_test_job
from airbyte_cdk.test.standard_tests.models import (
Expand Down Expand Up @@ -117,12 +115,22 @@ def test_check(
"check",
test_scenario=scenario,
)
conn_status_messages: list[AirbyteMessage] = [
msg for msg in result._messages if msg.type == Type.CONNECTION_STATUS
] # noqa: SLF001 # Non-public API
assert len(conn_status_messages) == 1, (
f"Expected exactly one CONNECTION_STATUS message. Got: {result._messages}"
assert len(result.connection_status_messages) == 1, (
f"Expected exactly one CONNECTION_STATUS message. Got {len(result.connection_status_messages)}: \n"
+ "\n".join([str(m) for m in result._messages])
+ "\nErrors: "
+ str(result.errors)
or "None"
)
conn_status = cast(
AirbyteConnectionStatus, result.connection_status_messages[0].connectionStatus
)
if (
scenario.expect_exception
and conn_status.status == Status.SUCCEEDED
and not result.errors
):
raise AssertionError(f"Expected error in `check` but got success.")

@classmethod
def get_connector_root_dir(cls) -> Path:
Expand Down
28 changes: 25 additions & 3 deletions airbyte_cdk/test/standard_tests/source_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

from dataclasses import asdict

import pytest

from airbyte_cdk.models import (
AirbyteMessage,
AirbyteStream,
Expand Down Expand Up @@ -58,6 +60,10 @@ def test_discover(
scenario: ConnectorTestScenario,
) -> None:
"""Standard test for `discover`."""
if scenario.expect_exception:
pytest.skip(
"Skipping `discover` test because the scenario is expected to raise an exception."
)
run_test_job(
self.create_connector(scenario),
"discover",
Expand Down Expand Up @@ -99,13 +105,22 @@ def test_basic_read(
obtain the catalog of streams, and then it runs a `read` job to fetch
records from those streams.
"""
check_result: entrypoint_wrapper.EntrypointOutput = run_test_job(
self.create_connector(scenario),
"check",
test_scenario=scenario,
)
if scenario.expect_exception and check_result.errors:
# Expected failure and we got it. Return early.
return

discover_result = run_test_job(
self.create_connector(scenario),
"discover",
test_scenario=scenario,
)
if scenario.expect_exception:
assert discover_result.errors, "Expected exception but got none."
if scenario.expect_exception and check_result.errors:
# Expected failure and we got it. Return early.
return

configured_catalog = ConfiguredAirbyteCatalog(
Expand All @@ -124,6 +139,9 @@ def test_basic_read(
test_scenario=scenario,
catalog=configured_catalog,
)
if scenario.expect_exception and not result.errors:
# By now we should have raised an exception.
raise AssertionError("Expected an error but got none.")

if not result.records:
raise AssertionError("Expected records but got none.") # noqa: TRY003
Expand All @@ -133,6 +151,11 @@ def test_fail_read_with_bad_catalog(
scenario: ConnectorTestScenario,
) -> None:
"""Standard test for `read` when passed a bad catalog file."""
# Recreate the scenario with the same config but set the status to "failed".
scenario = ConnectorTestScenario(
config_dict=scenario.get_config_dict(empty_if_missing=False),
status="failed",
)
invalid_configured_catalog = ConfiguredAirbyteCatalog(
streams=[
# Create ConfiguredAirbyteStream which is deliberately invalid
Expand All @@ -153,7 +176,6 @@ def test_fail_read_with_bad_catalog(
]
)
# Set expected status to "failed" to ensure the test fails if the connector.
scenario.status = "failed"
result: entrypoint_wrapper.EntrypointOutput = run_test_job(
self.create_connector(scenario),
"read",
Expand Down
Loading
Loading