-
Notifications
You must be signed in to change notification settings - Fork 21
Feat: Add Hidden-Check Streams #585
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Adds support for defining “check-only” streams inline in the CheckStream
manifest, so they don’t appear in user-facing syncs.
- Updates the Pydantic schema to accept either a stream name or a full
DeclarativeStream
instream_names
. - Implements a factory method in
DeclarativeSource
to build inline stream objects from dicts. - Refactors
CheckStream.check_connection
to instantiate and validate inline streams alongside catalog streams.
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
File | Description |
---|---|
airbyte_cdk/sources/declarative/models/declarative_component_schema.py | Changed stream_names to List[Union[str, DeclarativeStream]] and updated examples |
airbyte_cdk/sources/declarative/declarative_source.py | Added _instantiate_stream_from_dict to create streams from inline definitions |
airbyte_cdk/sources/declarative/models/declarative_component_schema.yaml | Modified YAML schema to allow anyOf: string or DeclarativeStream for items |
airbyte_cdk/sources/declarative/checks/check_stream.py | Expanded check_connection to handle and validate inline stream definitions |
@@ -1696,10 +1698,30 @@ class AuthFlow(BaseModel): | |||
|
|||
class CheckStream(BaseModel): | |||
type: Literal["CheckStream"] | |||
stream_names: Optional[List[str]] = Field( | |||
stream_names: List[Union[str, "DeclarativeStream"]] = Field( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The field has a default of None but is typed as a non-optional List, which can cause validation errors. Change the annotation to Optional[List[Union[str, DeclarativeStream]]] or remove the None default.
stream_names: List[Union[str, "DeclarativeStream"]] = Field( | |
stream_names: Optional[List[Union[str, "DeclarativeStream"]]] = Field( |
Copilot uses AI. Check for mistakes.
return True, None | ||
return True, None | ||
except Exception as error: | ||
return self._log_error(logger, "discovering streams", error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This catch-all block still logs its action as "discovering streams" even though it now wraps the entire check logic. Consider using a more accurate action string or narrowing the try/except scope.
return self._log_error(logger, "discovering streams", error) | |
return self._log_error(logger, "checking connection", error) |
Copilot uses AI. Check for mistakes.
stream_name_to_stream = {s.name: s for s in streams} | ||
|
||
# Add inline check-only streams to the map | ||
for stream_def in self.stream_names: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The code iterates over self.stream_names
twice (once to instantiate inline streams, once to check availability). Consider consolidating these loops to reduce duplication and improve readability.
for stream_def in self.stream_names: | |
for stream_def in self.stream_names: | |
stream_obj = None |
Copilot uses AI. Check for mistakes.
PyTest Results (Fast)564 tests - 3 093 554 ✅ - 3 093 3m 5s ⏱️ - 2m 42s For more details on these failures, see this check. Results for commit 138fc60. ± Comparison against base commit acc1003. This pull request removes 3093 tests.
|
📝 WalkthroughWalkthroughThe updates expand the Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant CheckStream
participant DeclarativeSource
participant ModelToComponentFactory
User->>CheckStream: check_connection(source, logger, config)
CheckStream->>DeclarativeSource: get all streams (discovered + inline)
alt Inline stream (dict)
CheckStream->>DeclarativeSource: _instantiate_stream_from_dict(stream_def, config)
DeclarativeSource->>ModelToComponentFactory: create_component(DeclarativeStreamModel, ...)
ModelToComponentFactory-->>DeclarativeSource: DeclarativeStream instance
DeclarativeSource-->>CheckStream: DeclarativeStream instance
end
CheckStream->>CheckStream: Validate stream availability
CheckStream-->>User: Return check result (success/failure)
Suggested labels
Suggested reviewers
Would you like to add a test or example demonstrating the new inline stream definition capability in action, wdyt? ✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)
334-346
: Consider adding a mixed string/object example?
Would it help to show a sample list combining both string references and inline objects in onestream_names
array to illustrate realistic usage? wdyt?examples: - ["users"] - ["users", "contacts"] - name: "check_only_stream" type: DeclarativeStream retriever: type: SimpleRetriever requester: type: HttpRequester url_base: "https://api.example.com" record_selector: type: RecordSelector extractor: type: DpathExtractor field_path: [] + - ["users", + { name: "check_only_stream", + type: DeclarativeStream, + retriever: + type: SimpleRetriever, + requester: + type: HttpRequester, + url_base: "https://api.example.com", + record_selector: + type: RecordSelector, + extractor: + type: DpathExtractor, + field_path: [] + } + ]airbyte_cdk/sources/declarative/checks/check_stream.py (1)
72-72
: Consider alternatives to hasattr for feature detectionUsing
hasattr
to check for method existence is fragile and makes the code harder to maintain. Have you considered:
- Making
_instantiate_stream_from_dict
a public method in the base class- Using a protocol/interface to declare this capability
- Adding it as an abstract method with a default implementation
This would make the contract more explicit and type-safe. wdyt?
Also applies to: 81-81
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
airbyte_cdk/sources/declarative/checks/check_stream.py
(3 hunks)airbyte_cdk/sources/declarative/declarative_component_schema.yaml
(1 hunks)airbyte_cdk/sources/declarative/declarative_source.py
(2 hunks)airbyte_cdk/sources/declarative/models/declarative_component_schema.py
(3 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/declarative_source.py
[error] 53-53: mypy: Function is missing a return type annotation. (no-untyped-def)
[error] 53-53: mypy: Missing type parameters for generic type "dict". (type-arg)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Analyze (python)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2)
324-327
: CheckStreamstream_names
description is clear
The updated description accurately conveys that items can be string references or inlineDeclarativeStream
objects.
328-331
:items.anyOf
correctly allows strings or inline streams
The schema now properly accepts either a string or aDeclarativeStream
object for each entry.airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)
1-2
: LGTM! Standard copyright header addition.The copyright header follows the expected format and is appropriately placed at the top of the file.
1701-1726
: Verify the breaking change from optional to required field, wdyt?The type change from
Optional[List[str]]
toList[Union[str, "DeclarativeStream"]]
removes the optional nature of the field, which could be a breaking change for existing configurations. Is this intentional?The expanded functionality looks great for supporting inline stream definitions, and the examples are comprehensive. The forward reference to
"DeclarativeStream"
is correctly handled.Could you confirm that removing the optional nature aligns with the intended API design? If all CheckStream configurations should now specify stream_names, this makes sense. Otherwise, we might want to preserve the Optional wrapper.
2970-2970
: LGTM! Necessary forward reference update.The
CheckStream.update_forward_refs()
call is correctly added to resolve the forward reference to"DeclarativeStream"
introduced in the CheckStream class. This is the proper way to handle circular references in Pydantic models.
def _instantiate_stream_from_dict(self, stream_def: dict, config: Mapping[str, Any]): | ||
""" | ||
Instantiates a stream from a stream definition dict (used for check-only streams). | ||
""" | ||
factory = ModelToComponentFactory() | ||
return factory.create_component( | ||
model_type=DeclarativeStreamModel, | ||
component_definition=stream_def, | ||
config=config, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix type annotations for mypy compliance
The method is missing type annotations that are causing mypy failures. Also, since this is a private method that's used by CheckStream
via hasattr
, have you considered making it part of the public API to avoid the fragile feature detection pattern? wdyt?
-def _instantiate_stream_from_dict(self, stream_def: dict, config: Mapping[str, Any]):
+def _instantiate_stream_from_dict(self, stream_def: Dict[str, Any], config: Mapping[str, Any]) -> Any:
Also add the import at the top:
-from typing import Any, List, Mapping, Tuple
+from typing import Any, Dict, List, Mapping, Tuple
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def _instantiate_stream_from_dict(self, stream_def: dict, config: Mapping[str, Any]): | |
""" | |
Instantiates a stream from a stream definition dict (used for check-only streams). | |
""" | |
factory = ModelToComponentFactory() | |
return factory.create_component( | |
model_type=DeclarativeStreamModel, | |
component_definition=stream_def, | |
config=config, | |
) | |
# at the top of the file | |
from typing import Any, Dict, List, Mapping, Tuple | |
def _instantiate_stream_from_dict( | |
self, | |
stream_def: Dict[str, Any], | |
config: Mapping[str, Any], | |
) -> Any: | |
""" | |
Instantiates a stream from a stream definition dict (used for check-only streams). | |
""" | |
factory = ModelToComponentFactory() | |
return factory.create_component( | |
model_type=DeclarativeStreamModel, | |
component_definition=stream_def, | |
config=config, | |
) |
🧰 Tools
🪛 GitHub Actions: Linters
[error] 53-53: mypy: Function is missing a return type annotation. (no-untyped-def)
[error] 53-53: mypy: Missing type parameters for generic type "dict". (type-arg)
🤖 Prompt for AI Agents
In airbyte_cdk/sources/declarative/declarative_source.py around lines 53 to 62,
add precise type annotations to the _instantiate_stream_from_dict method
signature to satisfy mypy, specifying the return type and parameter types
clearly. Additionally, consider renaming this method to remove the leading
underscore to make it part of the public API, which will avoid fragile feature
detection via hasattr. Finally, add the necessary import statements at the top
of the file to support the type annotations used.
""" | ||
|
||
stream_names: List[str] | ||
stream_names: List[Union[str, DeclarativeStream]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Type annotation doesn't match docstring
The docstring mentions that stream_names
can contain dicts (inline DeclarativeStream definitions from YAML), but the type annotation only includes str
and DeclarativeStream
. Should we update it to include dict
as well? wdyt?
-stream_names: List[Union[str, DeclarativeStream]]
+stream_names: List[Union[str, Dict[str, Any], DeclarativeStream]]
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
stream_names: List[Union[str, DeclarativeStream]] | |
- stream_names: List[Union[str, DeclarativeStream]] | |
+ stream_names: List[Union[str, Dict[str, Any], DeclarativeStream]] |
🤖 Prompt for AI Agents
In airbyte_cdk/sources/declarative/checks/check_stream.py at line 39, the type
annotation for stream_names currently includes only str and DeclarativeStream,
but the docstring states it can also contain dicts representing inline
DeclarativeStream definitions from YAML. Update the type annotation to include
dict as well, changing it to List[Union[str, DeclarativeStream, dict]] to
accurately reflect the possible types.
if isinstance(stream_def, dict): | ||
if hasattr(source, "_instantiate_stream_from_dict"): | ||
stream_obj = source._instantiate_stream_from_dict(stream_def, config) | ||
stream_name_to_stream[stream_obj.name] = stream_obj | ||
else: | ||
raise NotImplementedError( | ||
f"Source {type(source)} does not support inline stream definitions for check-only streams." | ||
) | ||
elif isinstance(stream_def, DeclarativeStream): | ||
# Convert the Pydantic model to dict before passing to the factory | ||
if hasattr(source, "_instantiate_stream_from_dict"): | ||
stream_obj = source._instantiate_stream_from_dict(stream_def.dict(), config) | ||
stream_name_to_stream[stream_obj.name] = stream_obj | ||
else: | ||
raise NotImplementedError( | ||
f"Source {type(source)} does not support inline stream definitions for check-only streams." | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Refactor to eliminate code duplication
There's significant duplication between handling dict and DeclarativeStream types. What about extracting this into a helper method to make the code more DRY? wdyt?
-if isinstance(stream_def, dict):
- if hasattr(source, "_instantiate_stream_from_dict"):
- stream_obj = source._instantiate_stream_from_dict(stream_def, config)
- stream_name_to_stream[stream_obj.name] = stream_obj
- else:
- raise NotImplementedError(
- f"Source {type(source)} does not support inline stream definitions for check-only streams."
- )
-elif isinstance(stream_def, DeclarativeStream):
- # Convert the Pydantic model to dict before passing to the factory
- if hasattr(source, "_instantiate_stream_from_dict"):
- stream_obj = source._instantiate_stream_from_dict(stream_def.dict(), config)
- stream_name_to_stream[stream_obj.name] = stream_obj
- else:
- raise NotImplementedError(
- f"Source {type(source)} does not support inline stream definitions for check-only streams."
- )
+if isinstance(stream_def, (dict, DeclarativeStream)):
+ if not hasattr(source, "_instantiate_stream_from_dict"):
+ raise NotImplementedError(
+ f"Source {type(source)} does not support inline stream definitions for check-only streams."
+ )
+ stream_dict = stream_def if isinstance(stream_def, dict) else stream_def.dict()
+ stream_obj = source._instantiate_stream_from_dict(stream_dict, config)
+ stream_name_to_stream[stream_obj.name] = stream_obj
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
if isinstance(stream_def, dict): | |
if hasattr(source, "_instantiate_stream_from_dict"): | |
stream_obj = source._instantiate_stream_from_dict(stream_def, config) | |
stream_name_to_stream[stream_obj.name] = stream_obj | |
else: | |
raise NotImplementedError( | |
f"Source {type(source)} does not support inline stream definitions for check-only streams." | |
) | |
elif isinstance(stream_def, DeclarativeStream): | |
# Convert the Pydantic model to dict before passing to the factory | |
if hasattr(source, "_instantiate_stream_from_dict"): | |
stream_obj = source._instantiate_stream_from_dict(stream_def.dict(), config) | |
stream_name_to_stream[stream_obj.name] = stream_obj | |
else: | |
raise NotImplementedError( | |
f"Source {type(source)} does not support inline stream definitions for check-only streams." | |
) | |
if isinstance(stream_def, (dict, DeclarativeStream)): | |
if not hasattr(source, "_instantiate_stream_from_dict"): | |
raise NotImplementedError( | |
f"Source {type(source)} does not support inline stream definitions for check-only streams." | |
) | |
stream_dict = stream_def if isinstance(stream_def, dict) else stream_def.dict() | |
stream_obj = source._instantiate_stream_from_dict(stream_dict, config) | |
stream_name_to_stream[stream_obj.name] = stream_obj |
🤖 Prompt for AI Agents
In airbyte_cdk/sources/declarative/checks/check_stream.py around lines 71 to 87,
the code handling stream_def when it is a dict or a DeclarativeStream is
duplicated. Refactor by extracting the common logic of checking for
_instantiate_stream_from_dict and creating the stream object into a helper
method that accepts the stream definition (converted to dict if needed) and
config. Replace the duplicated blocks with calls to this helper to make the code
DRY and easier to maintain.
PyTest Results (Full)3 660 tests ±0 3 647 ✅ - 3 17m 24s ⏱️ +4s For more details on these failures, see this check. Results for commit 138fc60. ± Comparison against base commit acc1003. |
Context:
Some streams (in this case, HubSpot) have Dynamic Schema Loading, which makes them very resource-intensive to run CHECK. (See ref. OC Issue) We've temporarily increased the memory for CHECK, but this could incur a high cost for us if it remains in place permanently.
Solution: Create a less resource-intensive stream for the connector that can be used for CHECK only.
Issue we are trying to solve:
Currently, if the stream isn't part of the catalog,
streams
component of the manifest, it cannot be used for check.We need to determine a way to create 'check-only' streams without exposing them to the user as streams they can sync.
Proposed solution:
Currently,
CheckStream
allows for an Array of Strings. Expand this to allow an array of Strings or an inlineDeclarativeStream
object.Example current implementation (Source-Intercom used for this):
Example with the proposed solution:
This will keep the defined stream in
CheckStream
hidden from theStreams
component in the manifest, thus preventing users from syncing it.Summary by CodeRabbit
New Features
Documentation