Skip to content

feat(low-code cdk): add StateDelegatingStream #318

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 67 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
ad36c6e
Add PoC for state delegating retriever
lazebnyi Feb 6, 2025
1f01589
Auto-fix lint and format issues
Feb 6, 2025
a0e5d92
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Feb 6, 2025
3181ac2
Update annotations
lazebnyi Feb 6, 2025
5593d24
Merge master
lazebnyi Feb 6, 2025
f85a68e
Auto-fix lint and format issues
Feb 6, 2025
ff57a28
Update annotations for __getattr__
lazebnyi Feb 6, 2025
e46a88a
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Feb 6, 2025
1e71e63
Fix mypy
lazebnyi Feb 12, 2025
9706535
Add incremental_sync validation
lazebnyi Feb 12, 2025
63e9951
Move async retriever validation to quit faster
lazebnyi Feb 12, 2025
387cf09
Refactor stream slicer merge method
lazebnyi Feb 12, 2025
b78cc6e
Fix errors messages
lazebnyi Feb 12, 2025
53b2980
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Feb 12, 2025
14138ed
Auto-fix lint and format issues
Feb 12, 2025
bb3b176
Refactor _merge_stream_slicers
lazebnyi Feb 12, 2025
66001f1
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Feb 12, 2025
1a4b044
Auto-fix lint and format issues
Feb 12, 2025
8cbb9b2
Update retriever validation
lazebnyi Feb 12, 2025
407766d
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Feb 12, 2025
1c38282
Auto-fix lint and format issues
Feb 12, 2025
88d5adb
Rollback _merge_stream_slicers
lazebnyi Feb 12, 2025
d3a83a4
Merge master to branch
lazebnyi Feb 12, 2025
666c4fa
Auto-fix lint and format issues
Feb 12, 2025
8c1907a
Add ignore_first_request_options_provider and fix retriever in StateD…
lazebnyi Feb 14, 2025
8417712
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Feb 14, 2025
a05c391
Auto-fix lint and format issues
Feb 14, 2025
f0159de
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Feb 21, 2025
d7b0d25
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Mar 4, 2025
0cd7471
Fix mypy
lazebnyi Mar 4, 2025
8e7b2a3
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Mar 4, 2025
9af489d
Update StateDelegatingRetriever
lazebnyi Mar 6, 2025
06cccc5
Auto-fix lint and format issues
Mar 6, 2025
b218f3a
Update unit test for StateDelegatingRetriever
lazebnyi Mar 6, 2025
b35e1e9
Merge master to branch
lazebnyi Mar 6, 2025
d29bd30
Auto-fix lint and format issues
Mar 6, 2025
bf5c241
Fix mypy
lazebnyi Mar 6, 2025
c70913d
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Mar 6, 2025
6fb23f6
Auto-fix lint and format issues
Mar 6, 2025
43a56ed
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Mar 6, 2025
3481894
Rollback poetry.lock
lazebnyi Mar 6, 2025
35a83cd
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Mar 6, 2025
19d1b22
Fix unit test
lazebnyi Mar 6, 2025
4ef852e
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Mar 7, 2025
11382f9
Add full_refresh_ignore_min_max_datetime flag
lazebnyi Mar 7, 2025
4862ec1
Auto-fix lint and format issues
Mar 7, 2025
3f92617
Move to a two-retriever instances approach
lazebnyi Mar 7, 2025
9eccc14
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Mar 7, 2025
571ffa9
Auto-fix lint and format issues
Mar 7, 2025
63b156e
Fix mypy
lazebnyi Mar 7, 2025
5bf46a7
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Mar 7, 2025
b0d5689
Update cocurrent source
lazebnyi Mar 12, 2025
204726a
Add StateDelegatingStream to schema
lazebnyi Mar 12, 2025
c89bc24
Add component to constructor
lazebnyi Mar 12, 2025
cc759dc
Add model
lazebnyi Mar 12, 2025
35f359d
Update parents resolving
lazebnyi Mar 12, 2025
6632d29
Update stream test
lazebnyi Mar 12, 2025
d2f352a
Remove state delegation retriver implementation
lazebnyi Mar 12, 2025
2846522
Remove state delegation retriver import
lazebnyi Mar 12, 2025
02030b5
Auto-fix lint and format issues
Mar 12, 2025
69bc211
Fix mypy
lazebnyi Mar 12, 2025
c83dce6
Fix mypy
lazebnyi Mar 12, 2025
52ba2ec
Update comment to pass mypy check
lazebnyi Mar 12, 2025
8465c56
Auto-fix lint and format issues
Mar 12, 2025
9e6134d
Remove copy import
lazebnyi Mar 12, 2025
8f2554e
Split unit test to two
lazebnyi Mar 12, 2025
d8222f3
Update child_stat to has_parent_state
lazebnyi Mar 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ def read(
else:
filtered_catalog = catalog

# It is no need run read for synchronous streams if they are not exists.
if not filtered_catalog.streams:
return

yield from super().read(logger, config, filtered_catalog, state)

def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
Expand Down Expand Up @@ -201,6 +205,18 @@ def _group_streams(
# Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
# so we need to treat them as synchronous

if name_to_stream_mapping[declarative_stream.name]["type"] == "StateDelegatingStream":
stream_state = self._connector_state_manager.get_stream_state(
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
)

name_to_stream_mapping[declarative_stream.name] = (
name_to_stream_mapping[declarative_stream.name]["incremental_stream"]
if stream_state
else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
)

if isinstance(declarative_stream, DeclarativeStream) and (
name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
== "SimpleRetriever"
Expand Down
38 changes: 36 additions & 2 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ properties:
streams:
type: array
items:
"$ref": "#/definitions/DeclarativeStream"
anyOf:
- "$ref": "#/definitions/DeclarativeStream"
- "$ref": "#/definitions/StateDelegatingStream"
dynamic_streams:
type: array
items:
Expand Down Expand Up @@ -2877,7 +2879,9 @@ definitions:
stream:
title: Parent Stream
description: Reference to the parent stream.
"$ref": "#/definitions/DeclarativeStream"
anyOf:
- "$ref": "#/definitions/DeclarativeStream"
- "$ref": "#/definitions/StateDelegatingStream"
partition_field:
title: Current Parent Key Value Identifier
description: While iterating over parent records during a sync, the parent_key value can be referenced by using this field.
Expand Down Expand Up @@ -3150,6 +3154,36 @@ definitions:
$parameters:
type: object
additionalProperties: true
StateDelegatingStream:
description: (This component is experimental. Use at your own risk.) Orchestrate the retriever's usage based on the state value.
type: object
required:
- type
- name
- full_refresh_stream
- incremental_stream
properties:
type:
type: string
enum: [ StateDelegatingStream ]
name:
title: Name
description: The stream name.
type: string
default: ""
example:
- "Users"
full_refresh_stream:
title: Retriever
description: Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.
"$ref": "#/definitions/DeclarativeStream"
incremental_stream:
title: Retriever
description: Component used to coordinate how records are extracted across stream slices and request pages when the state provided.
"$ref": "#/definitions/DeclarativeStream"
$parameters:
type: object
additionalProperties: true
SimpleRetriever:
description: Retrieves records by synchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router.
type: object
Expand Down
27 changes: 24 additions & 3 deletions airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
DeclarativeStream as DeclarativeStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
StateDelegatingStream as StateDelegatingStreamModel,
)
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
get_registered_components_module,
)
Expand Down Expand Up @@ -143,7 +146,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:

source_streams = [
self._constructor.create_component(
DeclarativeStreamModel,
StateDelegatingStreamModel
if stream_config.get("type") == StateDelegatingStreamModel.__name__
else DeclarativeStreamModel,
stream_config,
config,
emit_connector_builder_messages=self._emit_connector_builder_messages,
Expand All @@ -162,7 +167,15 @@ def _initialize_cache_for_parent_streams(
def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None:
for parent_config in parent_configs:
parent_streams.add(parent_config["stream"]["name"])
parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
if parent_config["stream"]["type"] == "StateDelegatingStream":
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
"use_cache"
] = True
parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
"use_cache"
] = True
else:
parent_config["stream"]["retriever"]["requester"]["use_cache"] = True

for stream_config in stream_configs:
if stream_config.get("incremental_sync", {}).get("parent_stream"):
Expand All @@ -185,7 +198,15 @@ def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> No

for stream_config in stream_configs:
if stream_config["name"] in parent_streams:
stream_config["retriever"]["requester"]["use_cache"] = True
if stream_config["type"] == "StateDelegatingStream":
stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
True
)
stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
True
)
else:
stream_config["retriever"]["requester"]["use_cache"] = True

return stream_configs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1860,7 +1860,7 @@ class Config:

type: Literal["DeclarativeSource"]
check: Union[CheckStream, CheckDynamicStream]
streams: List[DeclarativeStream]
streams: List[Union[DeclarativeStream, StateDelegatingStream]]
dynamic_streams: Optional[List[DynamicDeclarativeStream]] = None
version: str = Field(
...,
Expand All @@ -1887,7 +1887,7 @@ class Config:

type: Literal["DeclarativeSource"]
check: Union[CheckStream, CheckDynamicStream]
streams: Optional[List[DeclarativeStream]] = None
streams: Optional[List[Union[DeclarativeStream, StateDelegatingStream]]] = None
dynamic_streams: List[DynamicDeclarativeStream]
version: str = Field(
...,
Expand Down Expand Up @@ -2201,7 +2201,7 @@ class ParentStreamConfig(BaseModel):
examples=["id", "{{ config['parent_record_id'] }}"],
title="Parent Key",
)
stream: DeclarativeStream = Field(
stream: Union[DeclarativeStream, StateDelegatingStream] = Field(
..., description="Reference to the parent stream.", title="Parent Stream"
)
partition_field: str = Field(
Expand All @@ -2228,6 +2228,22 @@ class ParentStreamConfig(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class StateDelegatingStream(BaseModel):
type: Literal["StateDelegatingStream"]
name: str = Field(..., description="The stream name.", example=["Users"], title="Name")
full_refresh_stream: DeclarativeStream = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.",
title="Retriever",
)
incremental_stream: DeclarativeStream = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages when the state provided.",
title="Retriever",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class SimpleRetriever(BaseModel):
type: Literal["SimpleRetriever"]
record_selector: RecordSelector = Field(
Expand Down Expand Up @@ -2413,5 +2429,6 @@ class DynamicDeclarativeStream(BaseModel):
DeclarativeStream.update_forward_refs()
SessionTokenAuthenticator.update_forward_refs()
DynamicSchemaLoader.update_forward_refs()
ParentStreamConfig.update_forward_refs()
SimpleRetriever.update_forward_refs()
AsyncRetriever.update_forward_refs()
Loading
Loading