Skip to content

Commit caa1e7d

Browse files
lazebnyi
and
octavia-squidington-iii
authored
feat(low-code cdk): add dynamic stream config to check connection (#450)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 58be2b9 commit caa1e7d

File tree

6 files changed

+732
-24
lines changed

6 files changed

+732
-24
lines changed

airbyte_cdk/sources/declarative/checks/__init__.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77
from pydantic.v1 import BaseModel
88

99
from airbyte_cdk.sources.declarative.checks.check_dynamic_stream import CheckDynamicStream
10-
from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream
10+
from airbyte_cdk.sources.declarative.checks.check_stream import (
11+
CheckStream,
12+
DynamicStreamCheckConfig,
13+
)
1114
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
1215
from airbyte_cdk.sources.declarative.models import (
1316
CheckDynamicStream as CheckDynamicStreamModel,
@@ -21,4 +24,4 @@
2124
"CheckDynamicStream": CheckDynamicStreamModel,
2225
}
2326

24-
__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker"]
27+
__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker", "DynamicStreamCheckConfig"]

airbyte_cdk/sources/declarative/checks/check_stream.py

+113-11
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,23 @@
55
import logging
66
import traceback
77
from dataclasses import InitVar, dataclass
8-
from typing import Any, List, Mapping, Tuple
8+
from typing import Any, Dict, List, Mapping, Optional, Tuple
99

1010
from airbyte_cdk import AbstractSource
1111
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
1212
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
1313

1414

15+
@dataclass(frozen=True)
16+
class DynamicStreamCheckConfig:
17+
"""Defines the configuration for a dynamic stream during connection checking. This class specifies
18+
which dynamic stream (by name) should be checked and the maximum number of streams generated
19+
from it that should be tested for availability."""
20+
21+
dynamic_stream_name: str
22+
stream_count: int = 0
23+
24+
1525
@dataclass
1626
class CheckStream(ConnectionChecker):
1727
"""
@@ -23,34 +33,126 @@ class CheckStream(ConnectionChecker):
2333

2434
stream_names: List[str]
2535
parameters: InitVar[Mapping[str, Any]]
36+
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None
2637

2738
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
2839
self._parameters = parameters
40+
if self.dynamic_streams_check_configs is None:
41+
self.dynamic_streams_check_configs = []
42+
43+
def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> Tuple[bool, str]:
44+
"""Logs an error and returns a formatted error message."""
45+
error_message = f"Encountered an error while {action}. Error: {error}"
46+
logger.error(error_message + f"Error traceback: \n {traceback.format_exc()}", exc_info=True)
47+
return False, error_message
2948

3049
def check_connection(
3150
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
3251
) -> Tuple[bool, Any]:
33-
streams = source.streams(config=config)
52+
"""Checks the connection to the source and its streams."""
53+
try:
54+
streams = source.streams(config=config)
55+
if not streams:
56+
return False, f"No streams to connect to from source {source}"
57+
except Exception as error:
58+
return self._log_error(logger, "discovering streams", error)
59+
3460
stream_name_to_stream = {s.name: s for s in streams}
35-
if len(streams) == 0:
36-
return False, f"No streams to connect to from source {source}"
3761
for stream_name in self.stream_names:
38-
if stream_name not in stream_name_to_stream.keys():
62+
if stream_name not in stream_name_to_stream:
3963
raise ValueError(
40-
f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}."
64+
f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}."
4165
)
4266

67+
stream_availability, message = self._check_stream_availability(
68+
stream_name_to_stream, stream_name, logger
69+
)
70+
if not stream_availability:
71+
return stream_availability, message
72+
73+
should_check_dynamic_streams = (
74+
hasattr(source, "resolved_manifest")
75+
and hasattr(source, "dynamic_streams")
76+
and self.dynamic_streams_check_configs
77+
)
78+
79+
if should_check_dynamic_streams:
80+
return self._check_dynamic_streams_availability(source, stream_name_to_stream, logger)
81+
82+
return True, None
83+
84+
def _check_stream_availability(
85+
self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger
86+
) -> Tuple[bool, Any]:
87+
"""Checks if the given stream is available."""
88+
availability_strategy = HttpAvailabilityStrategy()
89+
try:
4390
stream = stream_name_to_stream[stream_name]
44-
availability_strategy = HttpAvailabilityStrategy()
91+
stream_is_available, reason = availability_strategy.check_availability(stream, logger)
92+
if not stream_is_available:
93+
message = f"Stream {stream_name} is not available: {reason}"
94+
logger.warning(message)
95+
return stream_is_available, message
96+
except Exception as error:
97+
return self._log_error(logger, f"checking availability of stream {stream_name}", error)
98+
return True, None
99+
100+
def _check_dynamic_streams_availability(
101+
self, source: AbstractSource, stream_name_to_stream: Dict[str, Any], logger: logging.Logger
102+
) -> Tuple[bool, Any]:
103+
"""Checks the availability of dynamic streams."""
104+
dynamic_streams = source.resolved_manifest.get("dynamic_streams", []) # type: ignore[attr-defined] # The source's resolved_manifest attribute is checked before calling this method
105+
dynamic_stream_name_to_dynamic_stream = {
106+
ds.get("name", f"dynamic_stream_{i}"): ds for i, ds in enumerate(dynamic_streams)
107+
}
108+
generated_streams = self._map_generated_streams(source.dynamic_streams) # type: ignore[attr-defined] # The source's dynamic_streams attribute is checked before calling this method
109+
110+
for check_config in self.dynamic_streams_check_configs: # type: ignore[union-attr] # None value for self.dynamic_streams_check_configs handled in __post_init__
111+
if check_config.dynamic_stream_name not in dynamic_stream_name_to_dynamic_stream:
112+
return (
113+
False,
114+
f"Dynamic stream {check_config.dynamic_stream_name} is not found in manifest.",
115+
)
116+
117+
generated = generated_streams.get(check_config.dynamic_stream_name, [])
118+
stream_availability, message = self._check_generated_streams_availability(
119+
generated, stream_name_to_stream, logger, check_config.stream_count
120+
)
121+
if not stream_availability:
122+
return stream_availability, message
123+
124+
return True, None
125+
126+
def _map_generated_streams(
127+
self, dynamic_streams: List[Dict[str, Any]]
128+
) -> Dict[str, List[Dict[str, Any]]]:
129+
"""Maps dynamic stream names to their corresponding generated streams."""
130+
mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
131+
for stream in dynamic_streams:
132+
mapped_streams.setdefault(stream["dynamic_stream_name"], []).append(stream)
133+
return mapped_streams
134+
135+
def _check_generated_streams_availability(
136+
self,
137+
generated_streams: List[Dict[str, Any]],
138+
stream_name_to_stream: Dict[str, Any],
139+
logger: logging.Logger,
140+
max_count: int,
141+
) -> Tuple[bool, Any]:
142+
"""Checks availability of generated dynamic streams."""
143+
availability_strategy = HttpAvailabilityStrategy()
144+
for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]:
145+
stream = stream_name_to_stream[declarative_stream["name"]]
45146
try:
46147
stream_is_available, reason = availability_strategy.check_availability(
47148
stream, logger
48149
)
49150
if not stream_is_available:
50-
return False, reason
151+
message = f"Dynamic Stream {stream.name} is not available: {reason}"
152+
logger.warning(message)
153+
return False, message
51154
except Exception as error:
52-
logger.error(
53-
f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}"
155+
return self._log_error(
156+
logger, f"checking availability of dynamic stream {stream.name}", error
54157
)
55-
return False, f"Unable to connect to stream {stream_name} - {error}"
56158
return True, None

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

+22-1
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,6 @@ definitions:
316316
type: object
317317
required:
318318
- type
319-
- stream_names
320319
properties:
321320
type:
322321
type: string
@@ -330,6 +329,28 @@ definitions:
330329
examples:
331330
- ["users"]
332331
- ["users", "contacts"]
332+
dynamic_streams_check_configs:
333+
type: array
334+
items:
335+
"$ref": "#/definitions/DynamicStreamCheckConfig"
336+
DynamicStreamCheckConfig:
337+
type: object
338+
required:
339+
- type
340+
- dynamic_stream_name
341+
properties:
342+
type:
343+
type: string
344+
enum: [ DynamicStreamCheckConfig ]
345+
dynamic_stream_name:
346+
title: Dynamic Stream Name
347+
description: The dynamic stream name.
348+
type: string
349+
stream_count:
350+
title: Stream Count
351+
description: The number of streams to attempt reading from during a check operation. If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used.
352+
type: integer
353+
default: 0
333354
CheckDynamicStream:
334355
title: Dynamic Streams to Check
335356
description: (This component is experimental. Use at your own risk.) Defines the dynamic streams to try reading when running a check operation.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

+20-7
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,15 @@ class BearerAuthenticator(BaseModel):
4242
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
4343

4444

45-
class CheckStream(BaseModel):
46-
type: Literal["CheckStream"]
47-
stream_names: List[str] = Field(
48-
...,
49-
description="Names of the streams to try reading from when running a check operation.",
50-
examples=[["users"], ["users", "contacts"]],
51-
title="Stream Names",
45+
class DynamicStreamCheckConfig(BaseModel):
46+
type: Literal["DynamicStreamCheckConfig"]
47+
dynamic_stream_name: str = Field(
48+
..., description="The dynamic stream name.", title="Dynamic Stream Name"
49+
)
50+
stream_count: Optional[int] = Field(
51+
0,
52+
description="Numbers of the streams to try reading from when running a check operation.",
53+
title="Stream Count",
5254
)
5355

5456

@@ -1523,6 +1525,17 @@ class AuthFlow(BaseModel):
15231525
oauth_config_specification: Optional[OAuthConfigSpecification] = None
15241526

15251527

1528+
class CheckStream(BaseModel):
1529+
type: Literal["CheckStream"]
1530+
stream_names: Optional[List[str]] = Field(
1531+
None,
1532+
description="Names of the streams to try reading from when running a check operation.",
1533+
examples=[["users"], ["users", "contacts"]],
1534+
title="Stream Names",
1535+
)
1536+
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None
1537+
1538+
15261539
class IncrementingCountCursor(BaseModel):
15271540
type: Literal["IncrementingCountCursor"]
15281541
cursor_field: str = Field(

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+39-3
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,11 @@
5454
SessionTokenProvider,
5555
TokenProvider,
5656
)
57-
from airbyte_cdk.sources.declarative.checks import CheckDynamicStream, CheckStream
57+
from airbyte_cdk.sources.declarative.checks import (
58+
CheckDynamicStream,
59+
CheckStream,
60+
DynamicStreamCheckConfig,
61+
)
5862
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
5963
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
6064
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
@@ -218,6 +222,9 @@
218222
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
219223
DynamicSchemaLoader as DynamicSchemaLoaderModel,
220224
)
225+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
226+
DynamicStreamCheckConfig as DynamicStreamCheckConfigModel,
227+
)
221228
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
222229
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
223230
)
@@ -559,6 +566,7 @@ def _init_mappings(self) -> None:
559566
BasicHttpAuthenticatorModel: self.create_basic_http_authenticator,
560567
BearerAuthenticatorModel: self.create_bearer_authenticator,
561568
CheckStreamModel: self.create_check_stream,
569+
DynamicStreamCheckConfigModel: self.create_dynamic_stream_check_config,
562570
CheckDynamicStreamModel: self.create_check_dynamic_stream,
563571
CompositeErrorHandlerModel: self.create_composite_error_handler,
564572
ConcurrencyLevelModel: self.create_concurrency_level,
@@ -936,8 +944,36 @@ def create_bearer_authenticator(
936944
)
937945

938946
@staticmethod
939-
def create_check_stream(model: CheckStreamModel, config: Config, **kwargs: Any) -> CheckStream:
940-
return CheckStream(stream_names=model.stream_names, parameters={})
947+
def create_dynamic_stream_check_config(
948+
model: DynamicStreamCheckConfigModel, config: Config, **kwargs: Any
949+
) -> DynamicStreamCheckConfig:
950+
return DynamicStreamCheckConfig(
951+
dynamic_stream_name=model.dynamic_stream_name,
952+
stream_count=model.stream_count or 0,
953+
)
954+
955+
def create_check_stream(
956+
self, model: CheckStreamModel, config: Config, **kwargs: Any
957+
) -> CheckStream:
958+
if model.dynamic_streams_check_configs is None and model.stream_names is None:
959+
raise ValueError(
960+
"Expected either stream_names or dynamic_streams_check_configs to be set for CheckStream"
961+
)
962+
963+
dynamic_streams_check_configs = (
964+
[
965+
self._create_component_from_model(model=dynamic_stream_check_config, config=config)
966+
for dynamic_stream_check_config in model.dynamic_streams_check_configs
967+
]
968+
if model.dynamic_streams_check_configs
969+
else []
970+
)
971+
972+
return CheckStream(
973+
stream_names=model.stream_names or [],
974+
dynamic_streams_check_configs=dynamic_streams_check_configs,
975+
parameters={},
976+
)
941977

942978
@staticmethod
943979
def create_check_dynamic_stream(

0 commit comments

Comments
 (0)