Skip to content

Feat: Add Hidden-Check Streams #585

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 71 additions & 28 deletions airbyte_cdk/sources/declarative/checks/check_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
import logging
import traceback
from dataclasses import InitVar, dataclass
from typing import Any, Dict, List, Mapping, Optional, Tuple
from typing import Any, Dict, List, Mapping, Optional, Tuple, Union

from airbyte_cdk import AbstractSource
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.models.declarative_component_schema import DeclarativeStream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy


Expand All @@ -25,13 +26,17 @@ class DynamicStreamCheckConfig:
@dataclass
class CheckStream(ConnectionChecker):
"""
Checks the connections by checking availability of one or many streams selected by the developer
Checks the connection by checking the availability of one or more streams specified by the developer.

Attributes:
stream_name (List[str]): names of streams to check
stream_names (List[Union[str, DeclarativeStream]]):
Names of streams to check. Each item can be:
- a string (referencing a stream in the manifest's streams block)
- a dict (an inline DeclarativeStream definition from YAML)
- a DeclarativeStream Pydantic model (from parsed manifest)
"""

stream_names: List[str]
stream_names: List[Union[str, DeclarativeStream]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Type annotation doesn't match docstring

The docstring mentions that stream_names can contain dicts (inline DeclarativeStream definitions from YAML), but the type annotation only includes str and DeclarativeStream. Should we update it to include dict as well? wdyt?

-stream_names: List[Union[str, DeclarativeStream]]
+stream_names: List[Union[str, Dict[str, Any], DeclarativeStream]]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
stream_names: List[Union[str, DeclarativeStream]]
- stream_names: List[Union[str, DeclarativeStream]]
+ stream_names: List[Union[str, Dict[str, Any], DeclarativeStream]]
🤖 Prompt for AI Agents
In airbyte_cdk/sources/declarative/checks/check_stream.py at line 39, the type
annotation for stream_names currently includes only str and DeclarativeStream,
but the docstring states it can also contain dicts representing inline
DeclarativeStream definitions from YAML. Update the type annotation to include
dict as well, changing it to List[Union[str, DeclarativeStream, dict]] to
accurately reflect the possible types.

parameters: InitVar[Mapping[str, Any]]
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None

Expand All @@ -49,37 +54,75 @@ def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> T
def check_connection(
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
) -> Tuple[bool, Any]:
"""Checks the connection to the source and its streams."""
"""
Checks the connection to the source and its streams.

Handles both:
- Referenced streams (by name)
- Inline check-only streams (as dicts or DeclarativeStream models)
"""
try:
streams = source.streams(config=config)
if not streams:
return False, f"No streams to connect to from source {source}"
except Exception as error:
return self._log_error(logger, "discovering streams", error)

stream_name_to_stream = {s.name: s for s in streams}
for stream_name in self.stream_names:
if stream_name not in stream_name_to_stream:
raise ValueError(
f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}."
stream_name_to_stream = {s.name: s for s in streams}

# Add inline check-only streams to the map
for stream_def in self.stream_names:
Copy link
Preview

Copilot AI Jun 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The code iterates over self.stream_names twice (once to instantiate inline streams, once to check availability). Consider consolidating these loops to reduce duplication and improve readability.

Suggested change
for stream_def in self.stream_names:
for stream_def in self.stream_names:
stream_obj = None

Copilot uses AI. Check for mistakes.

# Handle dicts (from YAML) and DeclarativeStream objects (from Pydantic)
if isinstance(stream_def, dict):
if hasattr(source, "_instantiate_stream_from_dict"):
stream_obj = source._instantiate_stream_from_dict(stream_def, config)
stream_name_to_stream[stream_obj.name] = stream_obj
else:
raise NotImplementedError(
f"Source {type(source)} does not support inline stream definitions for check-only streams."
)
elif isinstance(stream_def, DeclarativeStream):
# Convert the Pydantic model to dict before passing to the factory
if hasattr(source, "_instantiate_stream_from_dict"):
stream_obj = source._instantiate_stream_from_dict(stream_def.dict(), config)
stream_name_to_stream[stream_obj.name] = stream_obj
else:
raise NotImplementedError(
f"Source {type(source)} does not support inline stream definitions for check-only streams."
)
Comment on lines +71 to +87
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Refactor to eliminate code duplication

There's significant duplication between handling dict and DeclarativeStream types. What about extracting this into a helper method to make the code more DRY? wdyt?

-if isinstance(stream_def, dict):
-    if hasattr(source, "_instantiate_stream_from_dict"):
-        stream_obj = source._instantiate_stream_from_dict(stream_def, config)
-        stream_name_to_stream[stream_obj.name] = stream_obj
-    else:
-        raise NotImplementedError(
-            f"Source {type(source)} does not support inline stream definitions for check-only streams."
-        )
-elif isinstance(stream_def, DeclarativeStream):
-    # Convert the Pydantic model to dict before passing to the factory
-    if hasattr(source, "_instantiate_stream_from_dict"):
-        stream_obj = source._instantiate_stream_from_dict(stream_def.dict(), config)
-        stream_name_to_stream[stream_obj.name] = stream_obj
-    else:
-        raise NotImplementedError(
-            f"Source {type(source)} does not support inline stream definitions for check-only streams."
-        )
+if isinstance(stream_def, (dict, DeclarativeStream)):
+    if not hasattr(source, "_instantiate_stream_from_dict"):
+        raise NotImplementedError(
+            f"Source {type(source)} does not support inline stream definitions for check-only streams."
+        )
+    stream_dict = stream_def if isinstance(stream_def, dict) else stream_def.dict()
+    stream_obj = source._instantiate_stream_from_dict(stream_dict, config)
+    stream_name_to_stream[stream_obj.name] = stream_obj
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if isinstance(stream_def, dict):
if hasattr(source, "_instantiate_stream_from_dict"):
stream_obj = source._instantiate_stream_from_dict(stream_def, config)
stream_name_to_stream[stream_obj.name] = stream_obj
else:
raise NotImplementedError(
f"Source {type(source)} does not support inline stream definitions for check-only streams."
)
elif isinstance(stream_def, DeclarativeStream):
# Convert the Pydantic model to dict before passing to the factory
if hasattr(source, "_instantiate_stream_from_dict"):
stream_obj = source._instantiate_stream_from_dict(stream_def.dict(), config)
stream_name_to_stream[stream_obj.name] = stream_obj
else:
raise NotImplementedError(
f"Source {type(source)} does not support inline stream definitions for check-only streams."
)
if isinstance(stream_def, (dict, DeclarativeStream)):
if not hasattr(source, "_instantiate_stream_from_dict"):
raise NotImplementedError(
f"Source {type(source)} does not support inline stream definitions for check-only streams."
)
stream_dict = stream_def if isinstance(stream_def, dict) else stream_def.dict()
stream_obj = source._instantiate_stream_from_dict(stream_dict, config)
stream_name_to_stream[stream_obj.name] = stream_obj
🤖 Prompt for AI Agents
In airbyte_cdk/sources/declarative/checks/check_stream.py around lines 71 to 87,
the code handling stream_def when it is a dict or a DeclarativeStream is
duplicated. Refactor by extracting the common logic of checking for
_instantiate_stream_from_dict and creating the stream object into a helper
method that accepts the stream definition (converted to dict if needed) and
config. Replace the duplicated blocks with calls to this helper to make the code
DRY and easier to maintain.

# Optionally: warn if stream_def is an unexpected type
elif not isinstance(stream_def, str):
logger.warning(f"Unexpected stream definition type: {type(stream_def)}")

# Now check availability
for stream_def in self.stream_names:
if isinstance(stream_def, dict):
stream_name = stream_def.get("name")
elif hasattr(stream_def, "name"): # DeclarativeStream object
stream_name = stream_def.name
else:
stream_name = stream_def # string

if stream_name not in stream_name_to_stream:
raise ValueError(
f"{stream_name} is not part of the catalog or check-only streams. Expected one of {list(stream_name_to_stream.keys())}."
)

stream_availability, message = self._check_stream_availability(
stream_name_to_stream, stream_name, logger
)
if not stream_availability:
return stream_availability, message

stream_availability, message = self._check_stream_availability(
stream_name_to_stream, stream_name, logger
should_check_dynamic_streams = (
hasattr(source, "resolved_manifest")
and hasattr(source, "dynamic_streams")
and self.dynamic_streams_check_configs
)
if not stream_availability:
return stream_availability, message

should_check_dynamic_streams = (
hasattr(source, "resolved_manifest")
and hasattr(source, "dynamic_streams")
and self.dynamic_streams_check_configs
)

if should_check_dynamic_streams:
return self._check_dynamic_streams_availability(source, stream_name_to_stream, logger)
if should_check_dynamic_streams:
return self._check_dynamic_streams_availability(
source, stream_name_to_stream, logger
)

return True, None
return True, None
except Exception as error:
return self._log_error(logger, "discovering streams", error)
Copy link
Preview

Copilot AI Jun 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This catch-all block still logs its action as "discovering streams" even though it now wraps the entire check logic. Consider using a more accurate action string or narrowing the try/except scope.

Suggested change
return self._log_error(logger, "discovering streams", error)
return self._log_error(logger, "checking connection", error)

Copilot uses AI. Check for mistakes.


def _check_stream_availability(
self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger
Expand Down
18 changes: 16 additions & 2 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -322,13 +322,27 @@ definitions:
enum: [CheckStream]
stream_names:
title: Stream Names
description: Names of the streams to try reading from when running a check operation.
description: Names of the streams to try reading from when running a check operation. Each item can be a string (referencing a stream in the streams block) or an inline DeclarativeStream object (for check-only streams).
type: array
items:
type: string
anyOf:
- type: string
- $ref: "#/definitions/DeclarativeStream"
examples:
- ["users"]
- ["users", "contacts"]
- name: "check_only_stream"
type: DeclarativeStream
retriever:
type: SimpleRetriever
requester:
type: HttpRequester
url_base: "https://api.example.com"
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: []
dynamic_streams_check_configs:
type: array
items:
Expand Down
17 changes: 17 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
)
from airbyte_cdk.sources.abstract_source import AbstractSource
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DeclarativeStream as DeclarativeStreamModel,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
ModelToComponentFactory,
)


class DeclarativeSource(AbstractSource):
Expand Down Expand Up @@ -43,3 +49,14 @@ def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
Returns a list of deprecation warnings for the source.
"""
return []

def _instantiate_stream_from_dict(self, stream_def: dict, config: Mapping[str, Any]):
"""
Instantiates a stream from a stream definition dict (used for check-only streams).
"""
factory = ModelToComponentFactory()
return factory.create_component(
model_type=DeclarativeStreamModel,
component_definition=stream_def,
config=config,
)
Comment on lines +53 to +62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix type annotations for mypy compliance

The method is missing type annotations that are causing mypy failures. Also, since this is a private method that's used by CheckStream via hasattr, have you considered making it part of the public API to avoid the fragile feature detection pattern? wdyt?

-def _instantiate_stream_from_dict(self, stream_def: dict, config: Mapping[str, Any]):
+def _instantiate_stream_from_dict(self, stream_def: Dict[str, Any], config: Mapping[str, Any]) -> Any:

Also add the import at the top:

-from typing import Any, List, Mapping, Tuple
+from typing import Any, Dict, List, Mapping, Tuple
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _instantiate_stream_from_dict(self, stream_def: dict, config: Mapping[str, Any]):
"""
Instantiates a stream from a stream definition dict (used for check-only streams).
"""
factory = ModelToComponentFactory()
return factory.create_component(
model_type=DeclarativeStreamModel,
component_definition=stream_def,
config=config,
)
# at the top of the file
from typing import Any, Dict, List, Mapping, Tuple
def _instantiate_stream_from_dict(
self,
stream_def: Dict[str, Any],
config: Mapping[str, Any],
) -> Any:
"""
Instantiates a stream from a stream definition dict (used for check-only streams).
"""
factory = ModelToComponentFactory()
return factory.create_component(
model_type=DeclarativeStreamModel,
component_definition=stream_def,
config=config,
)
🧰 Tools
🪛 GitHub Actions: Linters

[error] 53-53: mypy: Function is missing a return type annotation. (no-untyped-def)


[error] 53-53: mypy: Missing type parameters for generic type "dict". (type-arg)

🤖 Prompt for AI Agents
In airbyte_cdk/sources/declarative/declarative_source.py around lines 53 to 62,
add precise type annotations to the _instantiate_stream_from_dict method
signature to satisfy mypy, specifying the return type and parameter types
clearly. Additionally, consider renaming this method to remove the leading
underscore to make it part of the public API, which will avoid fragile feature
detection via hasattr. Finally, add the necessary import statements at the top
of the file to support the type annotations used.

Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -1696,10 +1698,30 @@ class AuthFlow(BaseModel):

class CheckStream(BaseModel):
type: Literal["CheckStream"]
stream_names: Optional[List[str]] = Field(
stream_names: List[Union[str, "DeclarativeStream"]] = Field(
Copy link
Preview

Copilot AI Jun 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field has a default of None but is typed as a non-optional List, which can cause validation errors. Change the annotation to Optional[List[Union[str, DeclarativeStream]]] or remove the None default.

Suggested change
stream_names: List[Union[str, "DeclarativeStream"]] = Field(
stream_names: Optional[List[Union[str, "DeclarativeStream"]]] = Field(

Copilot uses AI. Check for mistakes.

None,
description="Names of the streams to try reading from when running a check operation.",
examples=[["users"], ["users", "contacts"]],
examples=[
["users"],
["users", "contacts"],
[
{
"name": "check_only_stream",
"type": "DeclarativeStream",
"retriever": {
"type": "SimpleRetriever",
"requester": {
"type": "HttpRequester",
"url_base": "https://api.example.com",
},
"record_selector": {
"type": "RecordSelector",
"extractor": {"type": "DpathExtractor", "field_path": []},
},
},
}
],
],
title="Stream Names",
)
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None
Expand Down Expand Up @@ -2945,3 +2967,4 @@ class DynamicDeclarativeStream(BaseModel):
PropertiesFromEndpoint.update_forward_refs()
SimpleRetriever.update_forward_refs()
AsyncRetriever.update_forward_refs()
CheckStream.update_forward_refs()
Loading